diff --git a/CHANGELOG.md b/CHANGELOG.md index 1ba822f09..498328ceb 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -24,6 +24,7 @@ This document outlines major changes between releases. - Support impersonate bearer token (#81) ### Changed +- Remove object from tree and reset its cache on object deletion when it is already removed from storage (#78) - Update prometheus to v1.15.0 (#94) - Update syncTree.sh due to recent renaming (#73) - Update neo-go to v0.101.0 (#14) diff --git a/api/handler/delete_test.go b/api/handler/delete_test.go index 2153935e9..e9fb0e287 100644 --- a/api/handler/delete_test.go +++ b/api/handler/delete_test.go @@ -8,6 +8,8 @@ import ( "git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api" "git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/data" + apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status" + oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id" "github.com/stretchr/testify/require" ) @@ -15,6 +17,26 @@ const ( emptyVersion = "" ) +func TestDeleteBucketOnAlreadyRemovedError(t *testing.T) { + hc := prepareHandlerContext(t) + + bktName, objName := "bucket-for-removal", "object-to-delete" + bktInfo := createTestBucket(hc, bktName) + + putObject(t, hc, bktName, objName) + + nodeVersion, err := hc.tree.GetUnversioned(hc.context, bktInfo, objName) + require.NoError(t, err) + var addr oid.Address + addr.SetContainer(bktInfo.CID) + addr.SetObject(nodeVersion.OID) + hc.tp.SetObjectError(addr, apistatus.ObjectAlreadyRemoved{}) + + deleteObjects(t, hc, bktName, [][2]string{{objName, emptyVersion}}) + + deleteBucket(t, hc, bktName, http.StatusNoContent) +} + func TestDeleteBucket(t *testing.T) { tc := prepareHandlerContext(t) @@ -358,6 +380,25 @@ func deleteObject(t *testing.T, tc *handlerContext, bktName, objName, version st return w.Header().Get(api.AmzVersionID), w.Header().Get(api.AmzDeleteMarker) != "" } +func deleteObjects(t *testing.T, tc *handlerContext, bktName string, objVersions [][2]string) *DeleteObjectsResponse { + req := &DeleteObjectsRequest{} + for _, version := range objVersions { + req.Objects = append(req.Objects, ObjectIdentifier{ + ObjectName: version[0], + VersionID: version[1], + }) + } + + w, r := prepareTestRequest(tc, bktName, "", req) + r.Header.Set(api.ContentMD5, "") + tc.Handler().DeleteMultipleObjectsHandler(w, r) + assertStatus(t, w, http.StatusOK) + + res := &DeleteObjectsResponse{} + parseTestResponse(t, w, res) + return res +} + func deleteBucket(t *testing.T, tc *handlerContext, bktName string, code int) { w, r := prepareTestRequest(tc, bktName, "", nil) tc.Handler().DeleteBucketHandler(w, r) diff --git a/api/layer/frostfs_mock.go b/api/layer/frostfs_mock.go index 809a7dff2..1b1c57c2b 100644 --- a/api/layer/frostfs_mock.go +++ b/api/layer/frostfs_mock.go @@ -29,6 +29,7 @@ type TestFrostFS struct { FrostFS objects map[string]*object.Object + objectErrors map[string]error containers map[string]*container.Container eaclTables map[string]*eacl.Table currentEpoch uint64 @@ -36,9 +37,10 @@ type TestFrostFS struct { func NewTestFrostFS() *TestFrostFS { return &TestFrostFS{ - objects: make(map[string]*object.Object), - containers: make(map[string]*container.Container), - eaclTables: make(map[string]*eacl.Table), + objects: make(map[string]*object.Object), + objectErrors: make(map[string]error), + containers: make(map[string]*container.Container), + eaclTables: make(map[string]*eacl.Table), } } @@ -46,6 +48,14 @@ func (t *TestFrostFS) CurrentEpoch() uint64 { return t.currentEpoch } +func (t *TestFrostFS) SetObjectError(addr oid.Address, err error) { + if err == nil { + delete(t.objectErrors, addr.EncodeToString()) + } else { + t.objectErrors[addr.EncodeToString()] = err + } +} + func (t *TestFrostFS) Objects() []*object.Object { res := make([]*object.Object, 0, len(t.objects)) @@ -153,6 +163,10 @@ func (t *TestFrostFS) ReadObject(ctx context.Context, prm PrmObjectRead) (*Objec sAddr := addr.EncodeToString() + if err := t.objectErrors[sAddr]; err != nil { + return nil, err + } + if obj, ok := t.objects[sAddr]; ok { owner := getOwner(ctx) if !obj.OwnerID().Equals(owner) && !t.isPublicRead(prm.Container) { @@ -239,6 +253,10 @@ func (t *TestFrostFS) DeleteObject(ctx context.Context, prm PrmObjectDelete) err addr.SetContainer(prm.Container) addr.SetObject(prm.Object) + if err := t.objectErrors[addr.EncodeToString()]; err != nil { + return err + } + if obj, ok := t.objects[addr.EncodeToString()]; ok { owner := getOwner(ctx) if !obj.OwnerID().Equals(owner) { diff --git a/api/layer/layer.go b/api/layer/layer.go index e66f36243..4ee38b50c 100644 --- a/api/layer/layer.go +++ b/api/layer/layer.go @@ -17,6 +17,7 @@ import ( "git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/layer/encryption" "git.frostfs.info/TrueCloudLab/frostfs-s3-gw/creds/accessbox" "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/bearer" + "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client" cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id" "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/eacl" "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap" @@ -553,7 +554,7 @@ func (n *layer) deleteObject(ctx context.Context, bkt *data.BucketInfo, settings } if obj.DeleteMarkVersion, obj.Error = n.removeOldVersion(ctx, bkt, nodeVersion, obj); obj.Error != nil { - return obj + return n.handleObjectDeleteErrors(ctx, bkt, obj, nodeVersion.ID) } obj.Error = n.treeService.RemoveVersion(ctx, bkt, nodeVersion.ID) @@ -583,7 +584,7 @@ func (n *layer) deleteObject(ctx context.Context, bkt *data.BucketInfo, settings if nullVersionToDelete != nil { if obj.DeleteMarkVersion, obj.Error = n.removeOldVersion(ctx, bkt, nullVersionToDelete, obj); obj.Error != nil { - return obj + return n.handleObjectDeleteErrors(ctx, bkt, obj, nullVersionToDelete.ID) } } } @@ -632,6 +633,22 @@ func (n *layer) handleNotFoundError(bkt *data.BucketInfo, obj *VersionedObject) return obj } +func (n *layer) handleObjectDeleteErrors(ctx context.Context, bkt *data.BucketInfo, obj *VersionedObject, nodeID uint64) *VersionedObject { + if client.IsErrObjectAlreadyRemoved(obj.Error) { + n.log.Debug("object already removed", zap.String("bucket", bkt.Name), zap.Stringer("cid", bkt.CID), + zap.String("object", obj.Name), zap.String("oid", obj.VersionID)) + + obj.Error = n.treeService.RemoveVersion(ctx, bkt, nodeID) + if obj.Error != nil { + return obj + } + + n.cache.DeleteObjectName(bkt.CID, bkt.Name, obj.Name) + } + + return obj +} + func isNotFoundError(err error) bool { return errors.IsS3Error(err, errors.ErrNoSuchKey) || errors.IsS3Error(err, errors.ErrNoSuchVersion)