[#42] Support expiration lifecycle
Signed-off-by: Marina Biryukova <m.biryukova@yadro.com>
This commit is contained in:
parent
28723f4a68
commit
481520705a
22 changed files with 1207 additions and 60 deletions
148
api/layer/lifecycle.go
Normal file
148
api/layer/lifecycle.go
Normal file
|
@ -0,0 +1,148 @@
|
|||
package layer
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"encoding/base64"
|
||||
"encoding/xml"
|
||||
"errors"
|
||||
"fmt"
|
||||
"io"
|
||||
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/data"
|
||||
apiErr "git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/errors"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/internal/logs"
|
||||
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
type PutBucketLifecycleParams struct {
|
||||
BktInfo *data.BucketInfo
|
||||
LifecycleCfg *data.LifecycleConfiguration
|
||||
LifecycleReader io.Reader
|
||||
CopiesNumbers []uint32
|
||||
MD5Hash string
|
||||
}
|
||||
|
||||
func (n *Layer) PutBucketLifecycleConfiguration(ctx context.Context, p *PutBucketLifecycleParams) error {
|
||||
prm := PrmObjectCreate{
|
||||
Payload: p.LifecycleReader,
|
||||
Filepath: p.BktInfo.LifecycleConfigurationObjectName(),
|
||||
CreationTime: TimeNow(ctx),
|
||||
}
|
||||
|
||||
var lifecycleBkt *data.BucketInfo
|
||||
if n.lifecycleCnrInfo == nil {
|
||||
lifecycleBkt = p.BktInfo
|
||||
prm.CopiesNumber = p.CopiesNumbers
|
||||
} else {
|
||||
lifecycleBkt = n.lifecycleCnrInfo
|
||||
prm.PrmAuth.PrivateKey = &n.gateKey.PrivateKey
|
||||
}
|
||||
|
||||
prm.Container = lifecycleBkt.CID
|
||||
|
||||
_, objID, _, md5, err := n.objectPutAndHash(ctx, prm, lifecycleBkt)
|
||||
if err != nil {
|
||||
return fmt.Errorf("put lifecycle object: %w", err)
|
||||
}
|
||||
|
||||
hashBytes, err := base64.StdEncoding.DecodeString(p.MD5Hash)
|
||||
if err != nil {
|
||||
return apiErr.GetAPIError(apiErr.ErrInvalidDigest)
|
||||
}
|
||||
|
||||
if !bytes.Equal(hashBytes, md5) {
|
||||
n.deleteLifecycleObject(ctx, p.BktInfo, newAddress(lifecycleBkt.CID, objID))
|
||||
|
||||
return apiErr.GetAPIError(apiErr.ErrInvalidDigest)
|
||||
}
|
||||
|
||||
objsToDelete, err := n.treeService.PutBucketLifecycleConfiguration(ctx, p.BktInfo, newAddress(lifecycleBkt.CID, objID))
|
||||
objsToDeleteNotFound := errors.Is(err, ErrNoNodeToRemove)
|
||||
if err != nil && !objsToDeleteNotFound {
|
||||
return err
|
||||
}
|
||||
|
||||
if !objsToDeleteNotFound {
|
||||
for _, addr := range objsToDelete {
|
||||
n.deleteLifecycleObject(ctx, p.BktInfo, addr)
|
||||
}
|
||||
}
|
||||
|
||||
n.cache.PutLifecycleConfiguration(n.BearerOwner(ctx), p.BktInfo, p.LifecycleCfg)
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// deleteLifecycleObject removes object and logs in case of error.
|
||||
func (n *Layer) deleteLifecycleObject(ctx context.Context, bktInfo *data.BucketInfo, addr oid.Address) {
|
||||
var prmAuth PrmAuth
|
||||
lifecycleBkt := bktInfo
|
||||
if !addr.Container().Equals(bktInfo.CID) {
|
||||
lifecycleBkt = &data.BucketInfo{CID: addr.Container()}
|
||||
prmAuth.PrivateKey = &n.gateKey.PrivateKey
|
||||
}
|
||||
|
||||
if err := n.objectDeleteWithAuth(ctx, lifecycleBkt, addr.Object(), prmAuth); err != nil {
|
||||
n.reqLogger(ctx).Error(logs.CouldntDeleteLifecycleObject, zap.Error(err),
|
||||
zap.String("cid", lifecycleBkt.CID.EncodeToString()),
|
||||
zap.String("oid", addr.Object().EncodeToString()))
|
||||
}
|
||||
}
|
||||
|
||||
func (n *Layer) GetBucketLifecycleConfiguration(ctx context.Context, bktInfo *data.BucketInfo) (*data.LifecycleConfiguration, error) {
|
||||
owner := n.BearerOwner(ctx)
|
||||
if cfg := n.cache.GetLifecycleConfiguration(owner, bktInfo); cfg != nil {
|
||||
return cfg, nil
|
||||
}
|
||||
|
||||
addr, err := n.treeService.GetBucketLifecycleConfiguration(ctx, bktInfo)
|
||||
objNotFound := errors.Is(err, ErrNodeNotFound)
|
||||
if err != nil && !objNotFound {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if objNotFound {
|
||||
return nil, fmt.Errorf("%w: %s", apiErr.GetAPIError(apiErr.ErrNoSuchLifecycleConfiguration), err.Error())
|
||||
}
|
||||
|
||||
var prmAuth PrmAuth
|
||||
lifecycleBkt := bktInfo
|
||||
if !addr.Container().Equals(bktInfo.CID) {
|
||||
lifecycleBkt = &data.BucketInfo{CID: addr.Container()}
|
||||
prmAuth.PrivateKey = &n.gateKey.PrivateKey
|
||||
}
|
||||
|
||||
obj, err := n.objectGetWithAuth(ctx, lifecycleBkt, addr.Object(), prmAuth)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("get lifecycle object: %w", err)
|
||||
}
|
||||
|
||||
lifecycleCfg := &data.LifecycleConfiguration{}
|
||||
|
||||
if err = xml.NewDecoder(obj.Payload).Decode(&lifecycleCfg); err != nil {
|
||||
return nil, fmt.Errorf("unmarshal lifecycle configuration: %w", err)
|
||||
}
|
||||
|
||||
n.cache.PutLifecycleConfiguration(owner, bktInfo, lifecycleCfg)
|
||||
|
||||
return lifecycleCfg, nil
|
||||
}
|
||||
|
||||
func (n *Layer) DeleteBucketLifecycleConfiguration(ctx context.Context, bktInfo *data.BucketInfo) error {
|
||||
objs, err := n.treeService.DeleteBucketLifecycleConfiguration(ctx, bktInfo)
|
||||
objsNotFound := errors.Is(err, ErrNoNodeToRemove)
|
||||
if err != nil && !objsNotFound {
|
||||
return err
|
||||
}
|
||||
if !objsNotFound {
|
||||
for _, addr := range objs {
|
||||
n.deleteLifecycleObject(ctx, bktInfo, addr)
|
||||
}
|
||||
}
|
||||
|
||||
n.cache.DeleteLifecycleConfiguration(bktInfo)
|
||||
|
||||
return nil
|
||||
}
|
Loading…
Add table
Add a link
Reference in a new issue