[#42] Fix using separate container for lifecycles

Signed-off-by: Marina Biryukova <m.biryukova@yadro.com>
This commit is contained in:
Marina Biryukova 2024-07-23 14:53:34 +03:00
parent d8ed320781
commit ccaf048b9d
8 changed files with 131 additions and 90 deletions

View file

@ -165,18 +165,14 @@ func prepareHandlerContextBase(t *testing.T, cacheCfg *layer.CachesConfig) *hand
features := &layer.FeatureSettingsMock{}
res, err := tp.CreateContainer(context.Background(), layer.PrmContainerCreate{Name: ".bucket-lifecycles"})
require.NoError(t, err)
layerCfg := &layer.Config{
Cache: layer.NewCache(cacheCfg),
AnonKey: layer.AnonymousKey{Key: key},
Resolver: testResolver,
TreeService: treeMock,
Features: features,
GateOwner: owner,
LifecycleCnrInfo: &data.BucketInfo{CID: res.ContainerID},
GateKey: key,
Cache: layer.NewCache(cacheCfg),
AnonKey: layer.AnonymousKey{Key: key},
Resolver: testResolver,
TreeService: treeMock,
Features: features,
GateOwner: owner,
GateKey: key,
}
var pp netmap.PlacementPolicy

View file

@ -820,7 +820,22 @@ func (n *Layer) DeleteBucket(ctx context.Context, p *DeleteBucketParams) error {
}
n.cache.DeleteBucket(p.BktInfo)
return n.frostFS.DeleteContainer(ctx, p.BktInfo.CID, p.SessionToken)
lifecycleObj, treeErr := n.treeService.GetBucketLifecycleConfiguration(ctx, p.BktInfo)
if treeErr != nil {
n.reqLogger(ctx).Error(logs.GetBucketLifecycle, zap.Error(treeErr))
}
err = n.frostFS.DeleteContainer(ctx, p.BktInfo.CID, p.SessionToken)
if err != nil {
return fmt.Errorf("delete container: %w", err)
}
if treeErr == nil && !lifecycleObj.Container().Equals(p.BktInfo.CID) {
n.deleteLifecycleObject(ctx, p.BktInfo, lifecycleObj)
}
return nil
}
func (n *Layer) GetNetworkInfo(ctx context.Context) (netmap.NetworkInfo, error) {

View file

@ -12,7 +12,6 @@ import (
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/data"
apiErr "git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/errors"
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/internal/logs"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
"go.uber.org/zap"
)
@ -45,7 +44,7 @@ func (n *Layer) PutBucketLifecycleConfiguration(ctx context.Context, p *PutBucke
createdObj, err := n.objectPutAndHash(ctx, prm, lifecycleBkt)
if err != nil {
return err
return fmt.Errorf("put lifecycle object: %w", err)
}
hashBytes, err := base64.StdEncoding.DecodeString(p.MD5Hash)
@ -54,19 +53,19 @@ func (n *Layer) PutBucketLifecycleConfiguration(ctx context.Context, p *PutBucke
}
if !bytes.Equal(hashBytes, createdObj.MD5Sum) {
n.deleteLifecycleObject(ctx, lifecycleBkt, createdObj.ID)
n.deleteLifecycleObject(ctx, p.BktInfo, newAddress(lifecycleBkt.CID, createdObj.ID))
return apiErr.GetAPIError(apiErr.ErrInvalidDigest)
}
objIDToDelete, err := n.treeService.PutBucketLifecycleConfiguration(ctx, p.BktInfo, createdObj.ID)
objIDToDeleteNotFound := errors.Is(err, ErrNoNodeToRemove)
if err != nil && !objIDToDeleteNotFound {
objToDelete, err := n.treeService.PutBucketLifecycleConfiguration(ctx, p.BktInfo, newAddress(lifecycleBkt.CID, createdObj.ID))
objToDeleteNotFound := errors.Is(err, ErrNoNodeToRemove)
if err != nil && !objToDeleteNotFound {
return err
}
if !objIDToDeleteNotFound {
n.deleteLifecycleObject(ctx, lifecycleBkt, objIDToDelete)
if !objToDeleteNotFound {
n.deleteLifecycleObject(ctx, p.BktInfo, objToDelete)
}
n.cache.PutLifecycleConfiguration(n.BearerOwner(ctx), p.BktInfo, p.LifecycleCfg)
@ -75,18 +74,18 @@ func (n *Layer) PutBucketLifecycleConfiguration(ctx context.Context, p *PutBucke
}
// deleteLifecycleObject removes object and logs in case of error.
func (n *Layer) deleteLifecycleObject(ctx context.Context, lifecycleBkt *data.BucketInfo, objID oid.ID) {
var err error
if n.lifecycleCnrInfo == nil {
err = n.objectDelete(ctx, lifecycleBkt, objID)
} else {
err = n.objectDeleteWithAuth(ctx, lifecycleBkt, objID, PrmAuth{PrivateKey: &n.gateKey.PrivateKey})
func (n *Layer) deleteLifecycleObject(ctx context.Context, bktInfo *data.BucketInfo, addr oid.Address) {
var prmAuth PrmAuth
lifecycleBkt := bktInfo
if !addr.Container().Equals(bktInfo.CID) {
lifecycleBkt = &data.BucketInfo{CID: addr.Container()}
prmAuth.PrivateKey = &n.gateKey.PrivateKey
}
if err != nil {
if err := n.objectDeleteWithAuth(ctx, lifecycleBkt, addr.Object(), prmAuth); err != nil {
n.reqLogger(ctx).Error(logs.CouldntDeleteLifecycleObject, zap.Error(err),
zap.String("cid", lifecycleBkt.CID.EncodeToString()),
zap.String("oid", objID.EncodeToString()))
zap.String("oid", addr.Object().EncodeToString()))
}
}
@ -96,25 +95,26 @@ func (n *Layer) GetBucketLifecycleConfiguration(ctx context.Context, bktInfo *da
return cfg, nil
}
objID, err := n.treeService.GetBucketLifecycleConfiguration(ctx, bktInfo)
objIDNotFound := errors.Is(err, ErrNodeNotFound)
if err != nil && !objIDNotFound {
addr, err := n.treeService.GetBucketLifecycleConfiguration(ctx, bktInfo)
objNotFound := errors.Is(err, ErrNodeNotFound)
if err != nil && !objNotFound {
return nil, err
}
if objIDNotFound {
if objNotFound {
return nil, fmt.Errorf("%w: %s", apiErr.GetAPIError(apiErr.ErrNoSuchLifecycleConfiguration), err.Error())
}
var obj *object.Object
if n.lifecycleCnrInfo == nil {
if obj, err = n.objectGet(ctx, bktInfo, objID); err != nil {
return nil, err
}
} else {
if obj, err = n.objectGetWithAuth(ctx, n.lifecycleCnrInfo, objID, PrmAuth{PrivateKey: &n.gateKey.PrivateKey}); err != nil {
return nil, err
}
var prmAuth PrmAuth
lifecycleBkt := bktInfo
if !addr.Container().Equals(bktInfo.CID) {
lifecycleBkt = &data.BucketInfo{CID: addr.Container()}
prmAuth.PrivateKey = &n.gateKey.PrivateKey
}
obj, err := n.objectGetWithAuth(ctx, lifecycleBkt, addr.Object(), prmAuth)
if err != nil {
return nil, fmt.Errorf("get lifecycle object: %w", err)
}
lifecycleCfg := &data.LifecycleConfiguration{}
@ -129,20 +129,22 @@ func (n *Layer) GetBucketLifecycleConfiguration(ctx context.Context, bktInfo *da
}
func (n *Layer) DeleteBucketLifecycleConfiguration(ctx context.Context, bktInfo *data.BucketInfo) error {
objID, err := n.treeService.DeleteBucketLifecycleConfiguration(ctx, bktInfo)
objIDNotFound := errors.Is(err, ErrNoNodeToRemove)
if err != nil && !objIDNotFound {
obj, err := n.treeService.DeleteBucketLifecycleConfiguration(ctx, bktInfo)
objNotFound := errors.Is(err, ErrNoNodeToRemove)
if err != nil && !objNotFound {
return err
}
if !objIDNotFound {
if n.lifecycleCnrInfo == nil {
if err = n.objectDelete(ctx, bktInfo, objID); err != nil {
return err
}
} else {
if err = n.objectDeleteWithAuth(ctx, n.lifecycleCnrInfo, objID, PrmAuth{PrivateKey: &n.gateKey.PrivateKey}); err != nil {
return err
}
if !objNotFound {
var prmAuth PrmAuth
lifecycleBkt := bktInfo
if !obj.Container().Equals(bktInfo.CID) {
lifecycleBkt = &data.BucketInfo{CID: obj.Container()}
prmAuth.PrivateKey = &n.gateKey.PrivateKey
}
if err = n.objectDeleteWithAuth(ctx, lifecycleBkt, obj.Object(), prmAuth); err != nil {
return fmt.Errorf("delete lifecycle object: %w", err)
}
}

View file

@ -392,49 +392,49 @@ LOOP:
return result, nil
}
func (t *TreeServiceMock) PutBucketLifecycleConfiguration(_ context.Context, bktInfo *data.BucketInfo, objID oid.ID) (oid.ID, error) {
func (t *TreeServiceMock) PutBucketLifecycleConfiguration(_ context.Context, bktInfo *data.BucketInfo, addr oid.Address) (oid.Address, error) {
systemMap, ok := t.system[bktInfo.CID.EncodeToString()]
if !ok {
systemMap = make(map[string]*data.BaseNodeVersion)
}
systemMap["lifecycle"] = &data.BaseNodeVersion{
OID: objID,
OID: addr.Object(),
}
t.system[bktInfo.CID.EncodeToString()] = systemMap
return oid.ID{}, ErrNoNodeToRemove
return oid.Address{}, ErrNoNodeToRemove
}
func (t *TreeServiceMock) GetBucketLifecycleConfiguration(_ context.Context, bktInfo *data.BucketInfo) (oid.ID, error) {
func (t *TreeServiceMock) GetBucketLifecycleConfiguration(_ context.Context, bktInfo *data.BucketInfo) (oid.Address, error) {
systemMap, ok := t.system[bktInfo.CID.EncodeToString()]
if !ok {
return oid.ID{}, ErrNodeNotFound
return oid.Address{}, ErrNodeNotFound
}
node, ok := systemMap["lifecycle"]
if !ok {
return oid.ID{}, ErrNodeNotFound
return oid.Address{}, ErrNodeNotFound
}
return node.OID, nil
return newAddress(bktInfo.CID, node.OID), nil
}
func (t *TreeServiceMock) DeleteBucketLifecycleConfiguration(_ context.Context, bktInfo *data.BucketInfo) (oid.ID, error) {
func (t *TreeServiceMock) DeleteBucketLifecycleConfiguration(_ context.Context, bktInfo *data.BucketInfo) (oid.Address, error) {
systemMap, ok := t.system[bktInfo.CID.EncodeToString()]
if !ok {
return oid.ID{}, ErrNoNodeToRemove
return oid.Address{}, ErrNoNodeToRemove
}
node, ok := systemMap["lifecycle"]
if !ok {
return oid.ID{}, ErrNoNodeToRemove
return oid.Address{}, ErrNoNodeToRemove
}
delete(systemMap, "lifecycle")
return node.OID, nil
return newAddress(bktInfo.CID, node.OID), nil
}
func (t *TreeServiceMock) DeleteMultipartUpload(_ context.Context, bktInfo *data.BucketInfo, multipartInfo *data.MultipartInfo) error {

View file

@ -63,9 +63,9 @@ type TreeService interface {
AddPart(ctx context.Context, bktInfo *data.BucketInfo, multipartNodeID uint64, info *data.PartInfo) (oldObjIDToDelete oid.ID, err error)
GetParts(ctx context.Context, bktInfo *data.BucketInfo, multipartNodeID uint64) ([]*data.PartInfo, error)
PutBucketLifecycleConfiguration(ctx context.Context, bktInfo *data.BucketInfo, objID oid.ID) (oldObjIDToDelete oid.ID, err error)
GetBucketLifecycleConfiguration(ctx context.Context, bktInfo *data.BucketInfo) (oid.ID, error)
DeleteBucketLifecycleConfiguration(ctx context.Context, bktInfo *data.BucketInfo) (oid.ID, error)
PutBucketLifecycleConfiguration(ctx context.Context, bktInfo *data.BucketInfo, addr oid.Address) (oldObjToDelete oid.Address, err error)
GetBucketLifecycleConfiguration(ctx context.Context, bktInfo *data.BucketInfo) (oid.Address, error)
DeleteBucketLifecycleConfiguration(ctx context.Context, bktInfo *data.BucketInfo) (oid.Address, error)
// Compound methods for optimizations

View file

@ -168,13 +168,12 @@ func prepareContext(t *testing.T, cachesConfig ...*CachesConfig) *testContext {
user.IDFromKey(&owner, key.PrivateKey.PublicKey)
layerCfg := &Config{
Cache: NewCache(config),
AnonKey: AnonymousKey{Key: key},
TreeService: NewTreeService(),
Features: &FeatureSettingsMock{},
GateOwner: owner,
LifecycleCnrInfo: &data.BucketInfo{CID: res.ContainerID},
GateKey: key,
Cache: NewCache(config),
AnonKey: AnonymousKey{Key: key},
TreeService: NewTreeService(),
Features: &FeatureSettingsMock{},
GateOwner: owner,
GateKey: key,
}
return &testContext{

View file

@ -153,4 +153,5 @@ const (
CouldntCacheLifecycleConfiguration = "couldn't cache lifecycle configuration"
CouldNotFetchLifecycleContainerInfo = "couldn't fetch lifecycle container info"
FoundSeveralBucketLifecycleNodes = "found several bucket lifecycle nodes, latest be used"
GetBucketLifecycle = "get bucket lifecycle"
)

View file

@ -15,6 +15,7 @@ import (
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/layer"
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/middleware"
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/internal/logs"
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/user"
"github.com/nspcc-dev/neo-go/pkg/crypto/keys"
@ -84,6 +85,7 @@ const (
ownerKeyKV = "ownerKey"
lockConfigurationKV = "LockConfiguration"
oidKV = "OID"
cidKV = "CID"
isCombinedKV = "IsCombined"
isUnversionedKV = "IsUnversioned"
@ -1374,22 +1376,23 @@ func (c *Tree) GetParts(ctx context.Context, bktInfo *data.BucketInfo, multipart
return result, nil
}
func (c *Tree) PutBucketLifecycleConfiguration(ctx context.Context, bktInfo *data.BucketInfo, objID oid.ID) (oid.ID, error) {
func (c *Tree) PutBucketLifecycleConfiguration(ctx context.Context, bktInfo *data.BucketInfo, addr oid.Address) (oid.Address, error) {
node, err := c.getSystemNode(ctx, bktInfo, []string{bucketLifecycleFilename})
isErrNotFound := errors.Is(err, layer.ErrNodeNotFound)
if err != nil && !isErrNotFound {
return oid.ID{}, fmt.Errorf("couldn't get node: %w", err)
return oid.Address{}, fmt.Errorf("couldn't get node: %w", err)
}
meta := make(map[string]string)
meta[FileNameKey] = bucketLifecycleFilename
meta[oidKV] = objID.EncodeToString()
meta[oidKV] = addr.Object().EncodeToString()
meta[cidKV] = addr.Container().EncodeToString()
if isErrNotFound {
if _, err = c.service.AddNode(ctx, bktInfo, systemTree, 0, meta); err != nil {
return oid.ID{}, err
return oid.Address{}, err
}
return oid.ID{}, layer.ErrNoNodeToRemove
return oid.Address{}, layer.ErrNoNodeToRemove
}
ind := node.GetLatestNodeIndex()
@ -1397,22 +1400,27 @@ func (c *Tree) PutBucketLifecycleConfiguration(ctx context.Context, bktInfo *dat
c.reqLogger(ctx).Warn(logs.FoundSeveralBucketLifecycleNodes)
}
return node.ObjID, c.service.MoveNode(ctx, bktInfo, systemTree, node.ID[ind], 0, meta)
}
func (c *Tree) GetBucketLifecycleConfiguration(ctx context.Context, bktInfo *data.BucketInfo) (oid.ID, error) {
node, err := c.getSystemNode(ctx, bktInfo, []string{bucketLifecycleFilename})
prevAddr, err := getAddress(node)
if err != nil {
return oid.ID{}, err
return oid.Address{}, fmt.Errorf("couldn't get prev lifecycle object addr: %w", err)
}
return node.ObjID, nil
return prevAddr, c.service.MoveNode(ctx, bktInfo, systemTree, node.ID[ind], 0, meta)
}
func (c *Tree) DeleteBucketLifecycleConfiguration(ctx context.Context, bktInfo *data.BucketInfo) (oid.ID, error) {
func (c *Tree) GetBucketLifecycleConfiguration(ctx context.Context, bktInfo *data.BucketInfo) (oid.Address, error) {
node, err := c.getSystemNode(ctx, bktInfo, []string{bucketLifecycleFilename})
if err != nil {
return oid.Address{}, err
}
return getAddress(node)
}
func (c *Tree) DeleteBucketLifecycleConfiguration(ctx context.Context, bktInfo *data.BucketInfo) (oid.Address, error) {
node, err := c.getSystemNode(ctx, bktInfo, []string{bucketLifecycleFilename})
if err != nil && !errors.Is(err, layer.ErrNodeNotFound) {
return oid.ID{}, err
return oid.Address{}, err
}
if node != nil {
@ -1421,10 +1429,30 @@ func (c *Tree) DeleteBucketLifecycleConfiguration(ctx context.Context, bktInfo *
c.reqLogger(ctx).Warn(logs.FoundSeveralBucketLifecycleNodes)
}
return node.ObjID, c.service.RemoveNode(ctx, bktInfo, systemTree, node.ID[ind])
addr, err := getAddress(node)
if err != nil {
return oid.Address{}, fmt.Errorf("couldn't get lifecycle object addr: %w", err)
}
return addr, c.service.RemoveNode(ctx, bktInfo, systemTree, node.ID[ind])
}
return oid.ID{}, layer.ErrNoNodeToRemove
return oid.Address{}, layer.ErrNoNodeToRemove
}
func getAddress(node *treeNode) (oid.Address, error) {
var addr oid.Address
addr.SetObject(node.ObjID)
if cidStr, ok := node.Get(cidKV); ok {
var cnrID cid.ID
if err := cnrID.DecodeString(cidStr); err != nil {
return oid.Address{}, fmt.Errorf("couldn't decode cid: %w", err)
}
addr.SetContainer(cnrID)
}
return addr, nil
}
func (c *Tree) DeleteMultipartUpload(ctx context.Context, bktInfo *data.BucketInfo, multipartInfo *data.MultipartInfo) error {