diff --git a/CHANGELOG.md b/CHANGELOG.md index a52ef9a..894273f 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -8,6 +8,7 @@ This document outlines major changes between releases. - Clean up List and Name caches when object is missing in Tree service (#57) - Get empty bucket CORS from frostfs (TrueCloudLab#36) - Don't count pool error on client abort (#35) +- Don't create unnecessary delete-markers (#83) ### Added - Return `X-Owner-Id` in `head-bucket` response (#79) diff --git a/api/handler/delete_test.go b/api/handler/delete_test.go index 7ee1318..2153935 100644 --- a/api/handler/delete_test.go +++ b/api/handler/delete_test.go @@ -147,6 +147,81 @@ func TestRemoveDeleteMarker(t *testing.T) { require.True(t, existInMockedFrostFS(tc, bktInfo, objInfo), "object doesn't exist but should") } +func TestDeleteMarkerVersioned(t *testing.T) { + tc := prepareHandlerContext(t) + + bktName, objName := "bucket-for-removal", "object-to-delete" + createVersionedBucketAndObject(t, tc, bktName, objName) + + t.Run("not create new delete marker if last version is delete marker", func(t *testing.T) { + deleteMarkerVersion, isDeleteMarker := deleteObject(t, tc, bktName, objName, emptyVersion) + require.True(t, isDeleteMarker) + versions := listVersions(t, tc, bktName) + require.Equal(t, deleteMarkerVersion, versions.DeleteMarker[0].VersionID) + + _, isDeleteMarker = deleteObject(t, tc, bktName, objName, emptyVersion) + require.True(t, isDeleteMarker) + versions = listVersions(t, tc, bktName) + require.Len(t, versions.DeleteMarker, 1) + require.Equal(t, deleteMarkerVersion, versions.DeleteMarker[0].VersionID) + }) + + t.Run("do not create delete marker if object does not exist", func(t *testing.T) { + versionsBefore := listVersions(t, tc, bktName) + _, isDeleteMarker := deleteObject(t, tc, bktName, "dummy", emptyVersion) + require.False(t, isDeleteMarker) + versionsAfter := listVersions(t, tc, bktName) + require.Equal(t, versionsBefore, versionsAfter) + }) +} + +func TestDeleteMarkerSuspended(t *testing.T) { + tc := prepareHandlerContext(t) + + bktName, objName := "bucket-for-removal", "object-to-delete" + bktInfo, _ := createVersionedBucketAndObject(t, tc, bktName, objName) + putBucketVersioning(t, tc, bktName, false) + + t.Run("not create new delete marker if last version is delete marker", func(t *testing.T) { + deleteMarkerVersion, isDeleteMarker := deleteObject(t, tc, bktName, objName, emptyVersion) + require.True(t, isDeleteMarker) + require.Equal(t, data.UnversionedObjectVersionID, deleteMarkerVersion) + + deleteMarkerVersion, isDeleteMarker = deleteObject(t, tc, bktName, objName, emptyVersion) + require.True(t, isDeleteMarker) + require.Equal(t, data.UnversionedObjectVersionID, deleteMarkerVersion) + + versions := listVersions(t, tc, bktName) + require.Len(t, versions.DeleteMarker, 1) + require.Equal(t, deleteMarkerVersion, versions.DeleteMarker[0].VersionID) + }) + + t.Run("do not create delete marker if object does not exist", func(t *testing.T) { + versionsBefore := listVersions(t, tc, bktName) + _, isDeleteMarker := deleteObject(t, tc, bktName, "dummy", emptyVersion) + require.False(t, isDeleteMarker) + versionsAfter := listVersions(t, tc, bktName) + require.Equal(t, versionsBefore, versionsAfter) + }) + + t.Run("remove last unversioned non delete marker", func(t *testing.T) { + objName := "obj3" + putObject(t, tc, bktName, objName) + + nodeVersion, err := tc.tree.GetUnversioned(tc.Context(), bktInfo, objName) + require.NoError(t, err) + + deleteMarkerVersion, isDeleteMarker := deleteObject(t, tc, bktName, objName, emptyVersion) + require.True(t, isDeleteMarker) + require.Equal(t, data.UnversionedObjectVersionID, deleteMarkerVersion) + + objVersions := getVersion(listVersions(t, tc, bktName), objName) + require.Len(t, objVersions, 0) + + require.False(t, tc.MockedPool().ObjectExists(nodeVersion.OID)) + }) +} + func TestDeleteObjectCombined(t *testing.T) { tc := prepareHandlerContext(t) @@ -197,7 +272,7 @@ func TestDeleteMarkers(t *testing.T) { deleteObject(t, tc, bktName, objName, emptyVersion) versions := listVersions(t, tc, bktName) - require.Len(t, versions.DeleteMarker, 3, "invalid delete markers length") + require.Len(t, versions.DeleteMarker, 0, "invalid delete markers length") require.Len(t, versions.Version, 0, "versions must be empty") require.Len(t, listOIDsFromMockedFrostFS(t, tc, bktName), 0, "shouldn't be any object in frostfs") @@ -316,6 +391,16 @@ func listVersions(t *testing.T, tc *handlerContext, bktName string) *ListObjects return res } +func getVersion(resp *ListObjectsVersionsResponse, objName string) []*ObjectVersionResponse { + var res []*ObjectVersionResponse + for i, version := range resp.Version { + if version.Key == objName { + res = append(res, &resp.Version[i]) + } + } + return res +} + func putObject(t *testing.T, tc *handlerContext, bktName, objName string) { body := bytes.NewReader([]byte("content")) w, r := prepareTestPayloadRequest(tc, bktName, objName, body) diff --git a/api/handler/handlers_test.go b/api/handler/handlers_test.go index 8c1816f..4fb911b 100644 --- a/api/handler/handlers_test.go +++ b/api/handler/handlers_test.go @@ -33,6 +33,7 @@ type handlerContext struct { t *testing.T h *handler tp *layer.TestFrostFS + tree *tree.Tree context context.Context } @@ -85,11 +86,13 @@ func prepareHandlerContext(t *testing.T) *handlerContext { var owner user.ID user.IDFromKey(&owner, key.PrivateKey.PublicKey) + treeMock := NewTreeServiceMock(t) + layerCfg := &layer.Config{ Caches: layer.DefaultCachesConfigs(zap.NewExample()), AnonKey: layer.AnonymousKey{Key: key}, Resolver: testResolver, - TreeService: NewTreeServiceMock(t), + TreeService: treeMock, } var pp netmap.PlacementPolicy @@ -110,6 +113,7 @@ func prepareHandlerContext(t *testing.T) *handlerContext { t: t, h: h, tp: tp, + tree: treeMock, context: context.WithValue(context.Background(), api.BoxData, newTestAccessBox(t, key)), } } diff --git a/api/layer/layer.go b/api/layer/layer.go index 165d159..72505d5 100644 --- a/api/layer/layer.go +++ b/api/layer/layer.go @@ -561,21 +561,38 @@ func (n *layer) deleteObject(ctx context.Context, bkt *data.BucketInfo, settings return obj } - var newVersion *data.NodeVersion + lastVersion, err := n.getLastNodeVersion(ctx, bkt, obj) + if err != nil { + obj.Error = err + return n.handleNotFoundError(bkt, obj) + } if settings.VersioningSuspended() { obj.VersionID = data.UnversionedObjectVersionID - var nodeVersion *data.NodeVersion - if nodeVersion, obj.Error = n.getNodeVersionToDelete(ctx, bkt, obj); obj.Error != nil { - return n.handleNotFoundError(bkt, obj) + var nullVersionToDelete *data.NodeVersion + if lastVersion.IsUnversioned { + if !lastVersion.IsDeleteMarker() { + nullVersionToDelete = lastVersion + } + } else if nullVersionToDelete, obj.Error = n.getNodeVersionToDelete(ctx, bkt, obj); obj.Error != nil { + if !isNotFoundError(obj.Error) { + return obj + } } - if obj.DeleteMarkVersion, obj.Error = n.removeOldVersion(ctx, bkt, nodeVersion, obj); obj.Error != nil { - return obj + if nullVersionToDelete != nil { + if obj.DeleteMarkVersion, obj.Error = n.removeOldVersion(ctx, bkt, nullVersionToDelete, obj); obj.Error != nil { + return obj + } } } + if lastVersion.IsDeleteMarker() { + obj.DeleteMarkVersion = lastVersion.OID.EncodeToString() + return obj + } + randOID, err := getRandomOID() if err != nil { obj.Error = fmt.Errorf("couldn't get random oid: %w", err) @@ -584,7 +601,7 @@ func (n *layer) deleteObject(ctx context.Context, bkt *data.BucketInfo, settings obj.DeleteMarkVersion = randOID.EncodeToString() - newVersion = &data.NodeVersion{ + newVersion := &data.NodeVersion{ BaseNodeVersion: data.BaseNodeVersion{ OID: randOID, FilePath: obj.Name, @@ -606,8 +623,7 @@ func (n *layer) deleteObject(ctx context.Context, bkt *data.BucketInfo, settings } func (n *layer) handleNotFoundError(bkt *data.BucketInfo, obj *VersionedObject) *VersionedObject { - if errors.IsS3Error(obj.Error, errors.ErrNoSuchKey) || - errors.IsS3Error(obj.Error, errors.ErrNoSuchVersion) { + if isNotFoundError(obj.Error) { obj.Error = nil n.cache.CleanListCacheEntriesContainingObject(obj.Name, bkt.CID) n.cache.DeleteObjectName(bkt.CID, bkt.Name, obj.Name) @@ -616,6 +632,11 @@ func (n *layer) handleNotFoundError(bkt *data.BucketInfo, obj *VersionedObject) return obj } +func isNotFoundError(err error) bool { + return errors.IsS3Error(err, errors.ErrNoSuchKey) || + errors.IsS3Error(err, errors.ErrNoSuchVersion) +} + func (n *layer) getNodeVersionToDelete(ctx context.Context, bkt *data.BucketInfo, obj *VersionedObject) (*data.NodeVersion, error) { objVersion := &ObjectVersion{ BktInfo: bkt, @@ -627,6 +648,17 @@ func (n *layer) getNodeVersionToDelete(ctx context.Context, bkt *data.BucketInfo return n.getNodeVersion(ctx, objVersion) } +func (n *layer) getLastNodeVersion(ctx context.Context, bkt *data.BucketInfo, obj *VersionedObject) (*data.NodeVersion, error) { + objVersion := &ObjectVersion{ + BktInfo: bkt, + ObjectName: obj.Name, + VersionID: "", + NoErrorOnDeleteMarker: true, + } + + return n.getNodeVersion(ctx, objVersion) +} + func (n *layer) removeOldVersion(ctx context.Context, bkt *data.BucketInfo, nodeVersion *data.NodeVersion, obj *VersionedObject) (string, error) { if nodeVersion.IsDeleteMarker() { return obj.VersionID, nil diff --git a/api/layer/neofs_mock.go b/api/layer/neofs_mock.go index dc27104..809a7df 100644 --- a/api/layer/neofs_mock.go +++ b/api/layer/neofs_mock.go @@ -15,6 +15,7 @@ import ( "git.frostfs.info/TrueCloudLab/frostfs-s3-gw/creds/accessbox" "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/bearer" "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/checksum" + apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status" "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container" cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id" "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/eacl" @@ -55,6 +56,16 @@ func (t *TestFrostFS) Objects() []*object.Object { return res } +func (t *TestFrostFS) ObjectExists(objID oid.ID) bool { + for _, obj := range t.objects { + if id, _ := obj.ID(); id.Equals(objID) { + return true + } + } + + return false +} + func (t *TestFrostFS) AddObject(key string, obj *object.Object) { t.objects[key] = obj } @@ -161,7 +172,7 @@ func (t *TestFrostFS) ReadObject(ctx context.Context, prm PrmObjectRead) (*Objec }, nil } - return nil, fmt.Errorf("object not found %s", addr) + return nil, fmt.Errorf("%w: %s", apistatus.ObjectNotFound{}, addr) } func (t *TestFrostFS) CreateObject(ctx context.Context, prm PrmObjectCreate) (oid.ID, error) {