From 0f656b1471d6317a0a722471094ab360688e7814 Mon Sep 17 00:00:00 2001 From: Denis Kirillov Date: Thu, 16 Jan 2025 14:01:14 +0300 Subject: [PATCH] [#601] Use tombstone batching during delete-objects Signed-off-by: Denis Kirillov --- api/handler/delete.go | 3 + api/handler/delete_test.go | 893 +++++++++++++++++++++------ api/handler/multipart_upload_test.go | 86 ++- api/layer/delete_test.go | 80 +++ api/layer/frostfs_mock.go | 15 +- api/layer/layer.go | 221 ++++++- api/layer/tombstone.go | 87 +++ api/layer/tree_mock.go | 4 + api/layer/versioning_test.go | 13 +- 9 files changed, 1184 insertions(+), 218 deletions(-) create mode 100644 api/layer/delete_test.go diff --git a/api/handler/delete.go b/api/handler/delete.go index 5c08e9fc..26e69bf2 100644 --- a/api/handler/delete.go +++ b/api/handler/delete.go @@ -223,6 +223,9 @@ func (h *handler) DeleteMultipleObjectsHandler(w http.ResponseWriter, r *http.Re } if deletedObj.DeleteMarkerVersionID != "" { deletedObj.DeleteMarker = true + if obj.VersionID != "" { + deletedObj.DeleteMarkerVersionID = obj.VersionID + } } response.DeletedObjects = append(response.DeletedObjects, deletedObj) } diff --git a/api/handler/delete_test.go b/api/handler/delete_test.go index ed0c992b..be2dfd4d 100644 --- a/api/handler/delete_test.go +++ b/api/handler/delete_test.go @@ -3,6 +3,7 @@ package handler import ( "bytes" "encoding/xml" + "errors" "io" "net/http" "net/http/httptest" @@ -12,6 +13,7 @@ import ( "git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api" "git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/data" apierr "git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/errors" + "git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/layer" "git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/layer/encryption" apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status" oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id" @@ -154,310 +156,825 @@ func TestDeleteObjectsError(t *testing.T) { func TestDeleteObject(t *testing.T) { tc := prepareHandlerContext(t) - bktName, objName := "bucket-for-removal", "object-to-delete" - bktInfo, objInfo := createBucketAndObject(tc, bktName, objName) + t.Run("single delete", func(t *testing.T) { + bktName, objName := "bucket-for-removal", "object-to-delete" + bktInfo, objInfo := createBucketAndObject(tc, bktName, objName) - checkFound(t, tc, bktName, objName, emptyVersion) - deleteObject(t, tc, bktName, objName, emptyVersion) - checkNotFound(t, tc, bktName, objName, emptyVersion) + checkFound(t, tc, bktName, objName, emptyVersion) + deleteObject(t, tc, bktName, objName, emptyVersion) + checkNotFound(t, tc, bktName, objName, emptyVersion) - require.False(t, existInMockedFrostFS(tc, bktInfo, objInfo)) + require.False(t, existInMockedFrostFS(tc, bktInfo, objInfo)) + }) + + t.Run("multi delete", func(t *testing.T) { + bktName, objName := "bucket-for-removal-multi", "object-to-delete" + bktInfo, objInfo := createBucketAndObject(tc, bktName, objName) + + checkFound(t, tc, bktName, objName, emptyVersion) + deleteObjects(t, tc, bktName, [][2]string{{objName, emptyVersion}}) + checkNotFound(t, tc, bktName, objName, emptyVersion) + + require.False(t, existInMockedFrostFS(tc, bktInfo, objInfo)) + }) } func TestDeleteObjectFromSuspended(t *testing.T) { tc := prepareHandlerContext(t) - bktName, objName := "bucket-versioned-for-removal", "object-to-delete" - createSuspendedBucket(t, tc, bktName) - putObject(tc, bktName, objName) + t.Run("single delete", func(t *testing.T) { + bktName, objName := "bucket-versioned-for-removal", "object-to-delete" - versionID, isDeleteMarker := deleteObject(t, tc, bktName, objName, emptyVersion) - require.True(t, isDeleteMarker) - require.Equal(t, data.UnversionedObjectVersionID, versionID) + createSuspendedBucket(t, tc, bktName) + putObject(tc, bktName, objName) + + versionID, isDeleteMarker := deleteObject(t, tc, bktName, objName, emptyVersion) + require.True(t, isDeleteMarker) + require.Equal(t, data.UnversionedObjectVersionID, versionID) + }) + + t.Run("multi delete", func(t *testing.T) { + bktName, objName := "bucket-versioned-for-removal-multi", "object-to-delete" + + createSuspendedBucket(t, tc, bktName) + putObject(tc, bktName, objName) + + res := deleteObjects(t, tc, bktName, [][2]string{{objName, emptyVersion}}) + require.Len(t, res.DeletedObjects, 1) + require.True(t, res.DeletedObjects[0].DeleteMarker) + require.Equal(t, data.UnversionedObjectVersionID, res.DeletedObjects[0].DeleteMarkerVersionID) + }) } func TestDeleteDeletedObject(t *testing.T) { tc := prepareHandlerContext(t) t.Run("unversioned bucket", func(t *testing.T) { - bktName, objName := "bucket-unversioned-removal", "object-to-delete" - createBucketAndObject(tc, bktName, objName) + t.Run("single delete", func(t *testing.T) { + bktName, objName := "bucket-unversioned-removal", "object-to-delete" + createBucketAndObject(tc, bktName, objName) - versionID, isDeleteMarker := deleteObject(t, tc, bktName, objName, emptyVersion) - require.Empty(t, versionID) - require.False(t, isDeleteMarker) - versionID, isDeleteMarker = deleteObject(t, tc, bktName, objName, emptyVersion) - require.Empty(t, versionID) - require.False(t, isDeleteMarker) + versionID, isDeleteMarker := deleteObject(t, tc, bktName, objName, emptyVersion) + require.Empty(t, versionID) + require.False(t, isDeleteMarker) + versionID, isDeleteMarker = deleteObject(t, tc, bktName, objName, emptyVersion) + require.Empty(t, versionID) + require.False(t, isDeleteMarker) + }) + + t.Run("multi delete", func(t *testing.T) { + bktName, objName := "bucket-unversioned-removal-multi", "object-to-delete" + createBucketAndObject(tc, bktName, objName) + + res := deleteObjects(t, tc, bktName, [][2]string{{objName, emptyVersion}}) + require.Len(t, res.DeletedObjects, 1) + require.False(t, res.DeletedObjects[0].DeleteMarker) + require.Empty(t, res.DeletedObjects[0].VersionID) + res = deleteObjects(t, tc, bktName, [][2]string{{objName, emptyVersion}}) + require.Len(t, res.DeletedObjects, 1) + require.False(t, res.DeletedObjects[0].DeleteMarker) + require.Empty(t, res.DeletedObjects[0].VersionID) + }) }) t.Run("versioned bucket", func(t *testing.T) { - bktName, objName := "bucket-versioned-for-removal", "object-to-delete" - createVersionedBucketAndObject(t, tc, bktName, objName) + t.Run("single delete", func(t *testing.T) { + bktName, objName := "bucket-versioned-for-removal", "object-to-delete" + createVersionedBucketAndObject(t, tc, bktName, objName) - _, isDeleteMarker := deleteObject(t, tc, bktName, objName, emptyVersion) - require.True(t, isDeleteMarker) - _, isDeleteMarker = deleteObject(t, tc, bktName, objName, emptyVersion) - require.True(t, isDeleteMarker) + _, isDeleteMarker := deleteObject(t, tc, bktName, objName, emptyVersion) + require.True(t, isDeleteMarker) + _, isDeleteMarker = deleteObject(t, tc, bktName, objName, emptyVersion) + require.True(t, isDeleteMarker) + }) + + t.Run("multi delete", func(t *testing.T) { + bktName, objName := "bucket-versioned-for-removal-multi", "object-to-delete" + createVersionedBucketAndObject(t, tc, bktName, objName) + + res := deleteObjects(t, tc, bktName, [][2]string{{objName, emptyVersion}}) + require.Len(t, res.DeletedObjects, 1) + require.True(t, res.DeletedObjects[0].DeleteMarker) + res = deleteObjects(t, tc, bktName, [][2]string{{objName, emptyVersion}}) + require.Len(t, res.DeletedObjects, 1) + require.True(t, res.DeletedObjects[0].DeleteMarker) + }) }) t.Run("versioned bucket not found obj", func(t *testing.T) { - bktName, objName := "bucket-versioned-for-removal-not-found", "object-to-delete" - _, objInfo := createVersionedBucketAndObject(t, tc, bktName, objName) + t.Run("single delete", func(t *testing.T) { + bktName, objName := "bucket-versioned-for-removal-not-found", "object-to-delete" + _, objInfo := createVersionedBucketAndObject(t, tc, bktName, objName) - versionID, isDeleteMarker := deleteObject(t, tc, bktName, objName, objInfo.VersionID()) - require.False(t, isDeleteMarker) - require.Equal(t, objInfo.VersionID(), versionID) + versionID, isDeleteMarker := deleteObject(t, tc, bktName, objName, objInfo.VersionID()) + require.False(t, isDeleteMarker) + require.Equal(t, objInfo.VersionID(), versionID) - versionID2, isDeleteMarker := deleteObject(t, tc, bktName, objName, versionID) - require.False(t, isDeleteMarker) - require.Equal(t, objInfo.VersionID(), versionID2) + versionID2, isDeleteMarker := deleteObject(t, tc, bktName, objName, versionID) + require.False(t, isDeleteMarker) + require.Equal(t, objInfo.VersionID(), versionID2) + }) + + t.Run("multi delete", func(t *testing.T) { + bktName, objName := "bucket-versioned-for-removal-not-found-multi", "object-to-delete" + _, objInfo := createVersionedBucketAndObject(t, tc, bktName, objName) + + res := deleteObjects(t, tc, bktName, [][2]string{{objName, objInfo.VersionID()}}) + require.Len(t, res.DeletedObjects, 1) + require.False(t, res.DeletedObjects[0].DeleteMarker) + require.Equal(t, objInfo.VersionID(), res.DeletedObjects[0].VersionID) + + res = deleteObjects(t, tc, bktName, [][2]string{{objName, res.DeletedObjects[0].VersionID}}) + require.Len(t, res.DeletedObjects, 1) + require.False(t, res.DeletedObjects[0].DeleteMarker) + require.Equal(t, objInfo.VersionID(), res.DeletedObjects[0].VersionID) + }) }) } func TestDeleteObjectVersioned(t *testing.T) { tc := prepareHandlerContext(t) - bktName, objName := "bucket-for-removal", "object-to-delete" - bktInfo, objInfo := createVersionedBucketAndObject(t, tc, bktName, objName) + t.Run("single delete", func(t *testing.T) { + bktName, objName := "bucket-for-removal", "object-to-delete" + bktInfo, objInfo := createVersionedBucketAndObject(t, tc, bktName, objName) - checkFound(t, tc, bktName, objName, emptyVersion) - deleteObject(t, tc, bktName, objName, emptyVersion) - checkNotFound(t, tc, bktName, objName, emptyVersion) + checkFound(t, tc, bktName, objName, emptyVersion) + deleteObject(t, tc, bktName, objName, emptyVersion) + checkNotFound(t, tc, bktName, objName, emptyVersion) - checkFound(t, tc, bktName, objName, objInfo.VersionID()) - deleteObject(t, tc, bktName, objName, objInfo.VersionID()) - checkNotFound(t, tc, bktName, objName, objInfo.VersionID()) + checkFound(t, tc, bktName, objName, objInfo.VersionID()) + deleteObject(t, tc, bktName, objName, objInfo.VersionID()) + checkNotFound(t, tc, bktName, objName, objInfo.VersionID()) - require.False(t, existInMockedFrostFS(tc, bktInfo, objInfo), "object exists but shouldn't") + require.False(t, existInMockedFrostFS(tc, bktInfo, objInfo), "object exists but shouldn't") + }) + + t.Run("multi delete", func(t *testing.T) { + bktName, objName := "bucket-for-removal-multi", "object-to-delete" + bktInfo, objInfo := createVersionedBucketAndObject(t, tc, bktName, objName) + + checkFound(t, tc, bktName, objName, emptyVersion) + deleteObjects(t, tc, bktName, [][2]string{{objName, emptyVersion}}) + checkNotFound(t, tc, bktName, objName, emptyVersion) + + checkFound(t, tc, bktName, objName, objInfo.VersionID()) + deleteObjects(t, tc, bktName, [][2]string{{objName, objInfo.VersionID()}}) + checkNotFound(t, tc, bktName, objName, objInfo.VersionID()) + + require.False(t, existInMockedFrostFS(tc, bktInfo, objInfo), "object exists but shouldn't") + }) } func TestDeleteObjectUnversioned(t *testing.T) { tc := prepareHandlerContext(t) - bktName, objName := "bucket-for-removal-unversioned", "object-to-delete-unversioned" - bktInfo, objInfo := createBucketAndObject(tc, bktName, objName) + t.Run("single delete", func(t *testing.T) { + bktName, objName := "bucket-for-removal-unversioned", "object-to-delete-unversioned" + bktInfo, objInfo := createBucketAndObject(tc, bktName, objName) - checkFound(t, tc, bktName, objName, emptyVersion) - deleteObject(t, tc, bktName, objName, emptyVersion) - checkNotFound(t, tc, bktName, objName, emptyVersion) + checkFound(t, tc, bktName, objName, emptyVersion) + deleteObject(t, tc, bktName, objName, emptyVersion) + checkNotFound(t, tc, bktName, objName, emptyVersion) - versions := listVersions(t, tc, bktName) - require.Len(t, versions.DeleteMarker, 0, "delete markers must be empty") - require.Len(t, versions.Version, 0, "versions must be empty") + versions := listVersions(t, tc, bktName) + require.Len(t, versions.DeleteMarker, 0, "delete markers must be empty") + require.Len(t, versions.Version, 0, "versions must be empty") - require.False(t, existInMockedFrostFS(tc, bktInfo, objInfo), "object exists but shouldn't") + require.False(t, existInMockedFrostFS(tc, bktInfo, objInfo), "object exists but shouldn't") + }) + + t.Run("multi delete", func(t *testing.T) { + bktName, objName := "bucket-for-removal-unversioned-multi", "object-to-delete-unversioned" + bktInfo, objInfo := createBucketAndObject(tc, bktName, objName) + + checkFound(t, tc, bktName, objName, emptyVersion) + deleteObjects(t, tc, bktName, [][2]string{{objName, emptyVersion}}) + checkNotFound(t, tc, bktName, objName, emptyVersion) + + versions := listVersions(t, tc, bktName) + require.Len(t, versions.DeleteMarker, 0, "delete markers must be empty") + require.Len(t, versions.Version, 0, "versions must be empty") + + require.False(t, existInMockedFrostFS(tc, bktInfo, objInfo), "object exists but shouldn't") + }) } func TestRemoveDeleteMarker(t *testing.T) { tc := prepareHandlerContext(t) - bktName, objName := "bucket-for-removal", "object-to-delete" - bktInfo, objInfo := createVersionedBucketAndObject(t, tc, bktName, objName) + t.Run("single delete", func(t *testing.T) { + bktName, objName := "bucket-for-removal", "object-to-delete" + bktInfo, objInfo := createVersionedBucketAndObject(t, tc, bktName, objName) - checkFound(t, tc, bktName, objName, emptyVersion) - deleteMarkerVersion, isDeleteMarker := deleteObject(t, tc, bktName, objName, emptyVersion) - require.True(t, isDeleteMarker) - checkNotFound(t, tc, bktName, objName, emptyVersion) + checkFound(t, tc, bktName, objName, emptyVersion) + deleteMarkerVersion, isDeleteMarker := deleteObject(t, tc, bktName, objName, emptyVersion) + require.True(t, isDeleteMarker) + checkNotFound(t, tc, bktName, objName, emptyVersion) - checkFound(t, tc, bktName, objName, objInfo.VersionID()) - deleteObject(t, tc, bktName, objName, deleteMarkerVersion) - checkFound(t, tc, bktName, objName, emptyVersion) + checkFound(t, tc, bktName, objName, objInfo.VersionID()) + deleteObject(t, tc, bktName, objName, deleteMarkerVersion) + checkFound(t, tc, bktName, objName, emptyVersion) - require.True(t, existInMockedFrostFS(tc, bktInfo, objInfo), "object doesn't exist but should") + require.True(t, existInMockedFrostFS(tc, bktInfo, objInfo), "object doesn't exist but should") + }) + + t.Run("multi delete", func(t *testing.T) { + bktName, objName := "bucket-for-removal-multi", "object-to-delete" + bktInfo, objInfo := createVersionedBucketAndObject(t, tc, bktName, objName) + + checkFound(t, tc, bktName, objName, emptyVersion) + res := deleteObjects(t, tc, bktName, [][2]string{{objName, emptyVersion}}) + require.Len(t, res.DeletedObjects, 1) + require.True(t, res.DeletedObjects[0].DeleteMarker) + checkNotFound(t, tc, bktName, objName, emptyVersion) + + checkFound(t, tc, bktName, objName, objInfo.VersionID()) + deleteObjects(t, tc, bktName, [][2]string{{objName, res.DeletedObjects[0].DeleteMarkerVersionID}}) + checkFound(t, tc, bktName, objName, emptyVersion) + + require.True(t, existInMockedFrostFS(tc, bktInfo, objInfo), "object doesn't exist but should") + }) } func TestDeleteMarkerVersioned(t *testing.T) { tc := prepareHandlerContext(t) - bktName, objName := "bucket-for-removal", "object-to-delete" - createVersionedBucketAndObject(t, tc, bktName, objName) + t.Run("single delete", func(t *testing.T) { + bktName, objName := "bucket-for-removal", "object-to-delete" + createVersionedBucketAndObject(t, tc, bktName, objName) - t.Run("not create new delete marker if last version is delete marker", func(t *testing.T) { - deleteMarkerVersion, isDeleteMarker := deleteObject(t, tc, bktName, objName, emptyVersion) - require.True(t, isDeleteMarker) - versions := listVersions(t, tc, bktName) - require.Len(t, versions.DeleteMarker, 1) - require.Equal(t, deleteMarkerVersion, versions.DeleteMarker[0].VersionID) + t.Run("not create new delete marker if last version is delete marker", func(t *testing.T) { + deleteMarkerVersion, isDeleteMarker := deleteObject(t, tc, bktName, objName, emptyVersion) + require.True(t, isDeleteMarker) + versions := listVersions(t, tc, bktName) + require.Len(t, versions.DeleteMarker, 1) + require.Equal(t, deleteMarkerVersion, versions.DeleteMarker[0].VersionID) - _, isDeleteMarker = deleteObject(t, tc, bktName, objName, emptyVersion) - require.True(t, isDeleteMarker) - versions = listVersions(t, tc, bktName) - require.Len(t, versions.DeleteMarker, 1) - require.Equal(t, deleteMarkerVersion, versions.DeleteMarker[0].VersionID) + _, isDeleteMarker = deleteObject(t, tc, bktName, objName, emptyVersion) + require.True(t, isDeleteMarker) + versions = listVersions(t, tc, bktName) + require.Len(t, versions.DeleteMarker, 1) + require.Equal(t, deleteMarkerVersion, versions.DeleteMarker[0].VersionID) + }) + + t.Run("do not create delete marker if object does not exist", func(t *testing.T) { + versionsBefore := listVersions(t, tc, bktName) + _, isDeleteMarker := deleteObject(t, tc, bktName, "dummy", emptyVersion) + require.False(t, isDeleteMarker) + versionsAfter := listVersions(t, tc, bktName) + require.Equal(t, versionsBefore, versionsAfter) + }) }) - t.Run("do not create delete marker if object does not exist", func(t *testing.T) { - versionsBefore := listVersions(t, tc, bktName) - _, isDeleteMarker := deleteObject(t, tc, bktName, "dummy", emptyVersion) - require.False(t, isDeleteMarker) - versionsAfter := listVersions(t, tc, bktName) - require.Equal(t, versionsBefore, versionsAfter) + t.Run("multi delete", func(t *testing.T) { + bktName, objName := "bucket-for-removal-multi", "object-to-delete" + createVersionedBucketAndObject(t, tc, bktName, objName) + + t.Run("not create new delete marker if last version is delete marker", func(t *testing.T) { + res1 := deleteObjects(t, tc, bktName, [][2]string{{objName, emptyVersion}}) + require.Len(t, res1.DeletedObjects, 1) + require.True(t, res1.DeletedObjects[0].DeleteMarker) + versions := listVersions(t, tc, bktName) + require.Len(t, versions.DeleteMarker, 1) + require.Equal(t, res1.DeletedObjects[0].DeleteMarkerVersionID, versions.DeleteMarker[0].VersionID) + + res2 := deleteObjects(t, tc, bktName, [][2]string{{objName, emptyVersion}}) + require.Len(t, res2.DeletedObjects, 1) + require.True(t, res2.DeletedObjects[0].DeleteMarker) + versions = listVersions(t, tc, bktName) + require.Len(t, versions.DeleteMarker, 1) + // we use delete marker from res1 + require.Equal(t, res1.DeletedObjects[0].DeleteMarkerVersionID, versions.DeleteMarker[0].VersionID) + }) + + t.Run("do not create delete marker if object does not exist", func(t *testing.T) { + versionsBefore := listVersions(t, tc, bktName) + res := deleteObjects(t, tc, bktName, [][2]string{{"dummy", emptyVersion}}) + require.Len(t, res.DeletedObjects, 1) + require.False(t, res.DeletedObjects[0].DeleteMarker) + versionsAfter := listVersions(t, tc, bktName) + require.Equal(t, versionsBefore, versionsAfter) + }) }) } func TestDeleteMarkerSuspended(t *testing.T) { tc := prepareHandlerContext(t) - bktName, objName := "bucket-for-removal", "object-to-delete" - bktInfo, _ := createVersionedBucketAndObject(t, tc, bktName, objName) - putBucketVersioning(t, tc, bktName, false) + t.Run("single delete", func(t *testing.T) { + bktName, objName := "bucket-for-removal", "object-to-delete" + bktInfo, _ := createVersionedBucketAndObject(t, tc, bktName, objName) + putBucketVersioning(t, tc, bktName, false) - t.Run("not create new delete marker if last version is delete marker", func(t *testing.T) { - deleteMarkerVersion, isDeleteMarker := deleteObject(t, tc, bktName, objName, emptyVersion) + t.Run("not create new delete marker if last version is delete marker", func(t *testing.T) { + deleteMarkerVersion, isDeleteMarker := deleteObject(t, tc, bktName, objName, emptyVersion) + require.True(t, isDeleteMarker) + require.Equal(t, data.UnversionedObjectVersionID, deleteMarkerVersion) + + deleteMarkerVersion, isDeleteMarker = deleteObject(t, tc, bktName, objName, emptyVersion) + require.True(t, isDeleteMarker) + require.Equal(t, data.UnversionedObjectVersionID, deleteMarkerVersion) + + versions := listVersions(t, tc, bktName) + require.Len(t, versions.DeleteMarker, 1) + require.Equal(t, deleteMarkerVersion, versions.DeleteMarker[0].VersionID) + }) + + t.Run("do not create delete marker if object does not exist", func(t *testing.T) { + versionsBefore := listVersions(t, tc, bktName) + _, isDeleteMarker := deleteObject(t, tc, bktName, "dummy", emptyVersion) + require.False(t, isDeleteMarker) + versionsAfter := listVersions(t, tc, bktName) + require.Equal(t, versionsBefore, versionsAfter) + }) + + t.Run("remove last unversioned non delete marker", func(t *testing.T) { + objName := "obj3" + putObject(tc, bktName, objName) + + nodeVersion, err := tc.tree.GetUnversioned(tc.Context(), bktInfo, objName) + require.NoError(t, err) + + deleteMarkerVersion, isDeleteMarker := deleteObject(t, tc, bktName, objName, emptyVersion) + require.True(t, isDeleteMarker) + require.Equal(t, data.UnversionedObjectVersionID, deleteMarkerVersion) + + objVersions := getVersion(listVersions(t, tc, bktName), objName) + require.Len(t, objVersions, 0) + + require.False(t, tc.MockedPool().ObjectExists(nodeVersion.OID)) + }) + }) + + t.Run("multi delete", func(t *testing.T) { + bktName, objName := "bucket-for-removal-multi", "object-to-delete" + bktInfo, _ := createVersionedBucketAndObject(t, tc, bktName, objName) + putBucketVersioning(t, tc, bktName, false) + + t.Run("not create new delete marker if last version is delete marker", func(t *testing.T) { + res := deleteObjects(t, tc, bktName, [][2]string{{objName, emptyVersion}}) + require.Len(t, res.DeletedObjects, 1) + require.True(t, res.DeletedObjects[0].DeleteMarker) + require.Equal(t, data.UnversionedObjectVersionID, res.DeletedObjects[0].DeleteMarkerVersionID) + + res = deleteObjects(t, tc, bktName, [][2]string{{objName, emptyVersion}}) + require.Len(t, res.DeletedObjects, 1) + require.True(t, res.DeletedObjects[0].DeleteMarker) + require.Equal(t, data.UnversionedObjectVersionID, res.DeletedObjects[0].DeleteMarkerVersionID) + + versions := listVersions(t, tc, bktName) + require.Len(t, versions.DeleteMarker, 1) + require.Equal(t, res.DeletedObjects[0].DeleteMarkerVersionID, versions.DeleteMarker[0].VersionID) + }) + + t.Run("do not create delete marker if object does not exist", func(t *testing.T) { + versionsBefore := listVersions(t, tc, bktName) + res := deleteObjects(t, tc, bktName, [][2]string{{"dummy", emptyVersion}}) + require.Len(t, res.DeletedObjects, 1) + require.False(t, res.DeletedObjects[0].DeleteMarker) + versionsAfter := listVersions(t, tc, bktName) + require.Equal(t, versionsBefore, versionsAfter) + }) + + t.Run("remove last unversioned non delete marker", func(t *testing.T) { + objName := "obj3" + putObject(tc, bktName, objName) + + nodeVersion, err := tc.tree.GetUnversioned(tc.Context(), bktInfo, objName) + require.NoError(t, err) + + res := deleteObjects(t, tc, bktName, [][2]string{{objName, emptyVersion}}) + require.Len(t, res.DeletedObjects, 1) + require.True(t, res.DeletedObjects[0].DeleteMarker) + require.Equal(t, data.UnversionedObjectVersionID, res.DeletedObjects[0].DeleteMarkerVersionID) + + objVersions := getVersion(listVersions(t, tc, bktName), objName) + require.Len(t, objVersions, 0) + + require.False(t, tc.MockedPool().ObjectExists(nodeVersion.OID)) + }) + }) +} + +func TestDeleteMarkerSuspendedComplex(t *testing.T) { + hc := prepareHandlerContext(t) + + t.Run("single delete", func(t *testing.T) { + bktName, objName := "bucket-for-removal", "object-to-delete" + createBucket(hc, bktName) + + putObject(hc, bktName, objName) + putBucketVersioning(t, hc, bktName, true) + deleteObject(t, hc, bktName, objName, emptyVersion) + putBucketVersioning(t, hc, bktName, false) + + deleteMarkerVersion, isDeleteMarker := deleteObject(t, hc, bktName, objName, emptyVersion) require.True(t, isDeleteMarker) require.Equal(t, data.UnversionedObjectVersionID, deleteMarkerVersion) - deleteMarkerVersion, isDeleteMarker = deleteObject(t, tc, bktName, objName, emptyVersion) - require.True(t, isDeleteMarker) - require.Equal(t, data.UnversionedObjectVersionID, deleteMarkerVersion) - - versions := listVersions(t, tc, bktName) + versions := listVersions(t, hc, bktName) require.Len(t, versions.DeleteMarker, 1) - require.Equal(t, deleteMarkerVersion, versions.DeleteMarker[0].VersionID) }) - t.Run("do not create delete marker if object does not exist", func(t *testing.T) { - versionsBefore := listVersions(t, tc, bktName) - _, isDeleteMarker := deleteObject(t, tc, bktName, "dummy", emptyVersion) - require.False(t, isDeleteMarker) - versionsAfter := listVersions(t, tc, bktName) - require.Equal(t, versionsBefore, versionsAfter) - }) + t.Run("multi delete", func(t *testing.T) { + bktName, objName := "bucket-for-removal-multi", "object-to-delete" + createBucket(hc, bktName) - t.Run("remove last unversioned non delete marker", func(t *testing.T) { - objName := "obj3" - putObject(tc, bktName, objName) + putObject(hc, bktName, objName) + putBucketVersioning(t, hc, bktName, true) + deleteObject(t, hc, bktName, objName, emptyVersion) + putBucketVersioning(t, hc, bktName, false) - nodeVersion, err := tc.tree.GetUnversioned(tc.Context(), bktInfo, objName) - require.NoError(t, err) + deleteObjects(t, hc, bktName, [][2]string{{objName, emptyVersion}}) - deleteMarkerVersion, isDeleteMarker := deleteObject(t, tc, bktName, objName, emptyVersion) - require.True(t, isDeleteMarker) - require.Equal(t, data.UnversionedObjectVersionID, deleteMarkerVersion) - - objVersions := getVersion(listVersions(t, tc, bktName), objName) - require.Len(t, objVersions, 0) - - require.False(t, tc.MockedPool().ObjectExists(nodeVersion.OID)) + versions := listVersions(t, hc, bktName) + require.Len(t, versions.DeleteMarker, 1) }) } func TestDeleteObjectCombined(t *testing.T) { tc := prepareHandlerContext(t) - bktName, objName := "bucket-for-removal", "object-to-delete" - bktInfo, objInfo := createBucketAndObject(tc, bktName, objName) + t.Run("single delete", func(t *testing.T) { + bktName, objName := "bucket-for-removal", "object-to-delete" + bktInfo, objInfo := createBucketAndObject(tc, bktName, objName) - putBucketVersioning(t, tc, bktName, true) + putBucketVersioning(t, tc, bktName, true) - checkFound(t, tc, bktName, objName, emptyVersion) - deleteObject(t, tc, bktName, objName, emptyVersion) - checkNotFound(t, tc, bktName, objName, emptyVersion) + checkFound(t, tc, bktName, objName, emptyVersion) + deleteObject(t, tc, bktName, objName, emptyVersion) + checkNotFound(t, tc, bktName, objName, emptyVersion) - checkFound(t, tc, bktName, objName, objInfo.VersionID()) + checkFound(t, tc, bktName, objName, objInfo.VersionID()) - require.True(t, existInMockedFrostFS(tc, bktInfo, objInfo), "object doesn't exist but should") + require.True(t, existInMockedFrostFS(tc, bktInfo, objInfo), "object doesn't exist but should") + }) + + t.Run("multi delete", func(t *testing.T) { + bktName, objName := "bucket-for-removal-multi", "object-to-delete" + bktInfo, objInfo := createBucketAndObject(tc, bktName, objName) + + putBucketVersioning(t, tc, bktName, true) + + checkFound(t, tc, bktName, objName, emptyVersion) + deleteObjects(t, tc, bktName, [][2]string{{objName, emptyVersion}}) + checkNotFound(t, tc, bktName, objName, emptyVersion) + + checkFound(t, tc, bktName, objName, objInfo.VersionID()) + + require.True(t, existInMockedFrostFS(tc, bktInfo, objInfo), "object doesn't exist but should") + }) +} + +func TestDeleteObjectCombinedMultipart(t *testing.T) { + hc := prepareHandlerContext(t) + + partSize := layer.UploadMinSize + objLen := 2 * partSize + + setObjectError := func(bktInfo *data.BucketInfo, versionID string) { + var objID oid.ID + require.NoError(t, objID.DecodeString(versionID)) + var addr oid.Address + addr.SetContainer(bktInfo.CID) + addr.SetObject(objID) + hc.tp.SetObjectError(addr, &apistatus.ObjectNotFound{}) + } + + t.Run("single delete", func(t *testing.T) { + bktName, objName := "bucket-for-removal", "object-to-delete" + bktInfo := createBucket(hc, bktName) + putBucketVersioning(t, hc, bktName, true) + + multipartUpload(hc, bktName, objName, nil, objLen, partSize) + + resp := headObjectBase(hc, bktName, objName, emptyVersion) + require.Equal(t, http.StatusOK, resp.Code) + versionID := resp.Header().Get(api.AmzVersionID) + + setObjectError(bktInfo.BktInfo, versionID) + + deleteObject(t, hc, bktName, objName, versionID) + checkNotFound(t, hc, bktName, objName, versionID) + checkNotFound(t, hc, bktName, objName, emptyVersion) + }) + + t.Run("multi delete", func(t *testing.T) { + bktName, objName := "bucket-for-removal-multi", "object-to-delete" + bktInfo := createBucket(hc, bktName) + putBucketVersioning(t, hc, bktName, true) + + multipartUpload(hc, bktName, objName, nil, objLen, partSize) + + resp := headObjectBase(hc, bktName, objName, emptyVersion) + require.Equal(t, http.StatusOK, resp.Code) + versionID := resp.Header().Get(api.AmzVersionID) + + setObjectError(bktInfo.BktInfo, versionID) + + deleteObjects(t, hc, bktName, [][2]string{{objName, versionID}}) + checkNotFound(t, hc, bktName, objName, versionID) + checkNotFound(t, hc, bktName, objName, emptyVersion) + }) +} + +func TestDeleteObjectCombinedMultipartFailed(t *testing.T) { + hc := prepareHandlerContext(t) + + partSize := layer.UploadMinSize + objLen := 2 * partSize + + setObjectError := func(bktInfo *data.BucketInfo, versionID oid.ID) { + var addr oid.Address + addr.SetContainer(bktInfo.CID) + addr.SetObject(versionID) + hc.tp.SetObjectError(addr, errors.New("test error")) + } + + t.Run("single delete", func(t *testing.T) { + bktName, objName := "bucket-for-removal", "object-to-delete" + bktInfo := createBucket(hc, bktName) + + multipartUpload(hc, bktName, objName, nil, objLen, partSize) + nodeVersion, err := hc.tree.GetUnversioned(hc.Context(), bktInfo.BktInfo, objName) + require.NoError(t, err) + versionID := nodeVersion.OID + + setObjectError(bktInfo.BktInfo, versionID) + + deleteObjectErr(hc, bktName, objName, emptyVersion, apierr.ErrInternalError) + + versions := listVersions(t, hc, bktName) + require.Len(t, versions.DeleteMarker, 0) + require.Len(t, versions.Version, 1) + }) + + t.Run("multi delete", func(t *testing.T) { + bktName, objName := "bucket-for-removal-multi", "object-to-delete" + bktInfo := createBucket(hc, bktName) + + multipartUpload(hc, bktName, objName, nil, objLen, partSize) + nodeVersion, err := hc.tree.GetUnversioned(hc.Context(), bktInfo.BktInfo, objName) + require.NoError(t, err) + versionID := nodeVersion.OID + + setObjectError(bktInfo.BktInfo, versionID) + + res := deleteObjects(t, hc, bktName, [][2]string{{objName, emptyVersion}}) + require.Len(t, res.Errors, 1) + require.Equal(t, "BadRequest", res.Errors[0].Code) + require.Len(t, res.DeletedObjects, 0) + + versions := listVersions(t, hc, bktName) + require.Len(t, versions.DeleteMarker, 0) + require.Len(t, versions.Version, 1) + }) +} + +func TestDeleteObjectCleanCache(t *testing.T) { + hc := prepareHandlerContext(t) + + t.Run("single delete", func(t *testing.T) { + bktName, objName := "bucket-for-removal", "object-to-delete" + bktInfo := createBucket(hc, bktName) + putBucketVersioning(t, hc, bktName, true) + + versionID := putObject(hc, bktName, objName) + + var objID oid.ID + require.NoError(t, objID.DecodeString(versionID)) + var addr oid.Address + addr.SetContainer(bktInfo.BktInfo.CID) + addr.SetObject(objID) + + require.NotNil(t, hc.cache.GetObject(hc.owner, addr)) + deleteObject(t, hc, bktName, objName, versionID) + require.Nil(t, hc.cache.GetObject(hc.owner, addr)) + }) + + t.Run("multi delete", func(t *testing.T) { + bktName, objName := "bucket-for-removal-multi", "object-to-delete" + bktInfo := createBucket(hc, bktName) + putBucketVersioning(t, hc, bktName, true) + + versionID := putObject(hc, bktName, objName) + + var objID oid.ID + require.NoError(t, objID.DecodeString(versionID)) + var addr oid.Address + addr.SetContainer(bktInfo.BktInfo.CID) + addr.SetObject(objID) + + require.NotNil(t, hc.cache.GetObject(hc.owner, addr)) + deleteObjects(t, hc, bktName, [][2]string{{objName, versionID}}) + require.Nil(t, hc.cache.GetObject(hc.owner, addr)) + }) } func TestDeleteObjectSuspended(t *testing.T) { tc := prepareHandlerContext(t) - bktName, objName := "bucket-for-removal", "object-to-delete" - bktInfo, objInfo := createBucketAndObject(tc, bktName, objName) + t.Run("single delete", func(t *testing.T) { + bktName, objName := "bucket-for-removal", "object-to-delete" + bktInfo, objInfo := createBucketAndObject(tc, bktName, objName) - putBucketVersioning(t, tc, bktName, true) + putBucketVersioning(t, tc, bktName, true) - checkFound(t, tc, bktName, objName, emptyVersion) - deleteObject(t, tc, bktName, objName, emptyVersion) - checkNotFound(t, tc, bktName, objName, emptyVersion) + checkFound(t, tc, bktName, objName, emptyVersion) + deleteObject(t, tc, bktName, objName, emptyVersion) + checkNotFound(t, tc, bktName, objName, emptyVersion) - putBucketVersioning(t, tc, bktName, false) + putBucketVersioning(t, tc, bktName, false) - deleteObject(t, tc, bktName, objName, emptyVersion) - checkNotFound(t, tc, bktName, objName, objInfo.VersionID()) + deleteObject(t, tc, bktName, objName, emptyVersion) + checkNotFound(t, tc, bktName, objName, objInfo.VersionID()) - require.False(t, existInMockedFrostFS(tc, bktInfo, objInfo), "object exists but shouldn't") + require.False(t, existInMockedFrostFS(tc, bktInfo, objInfo), "object exists but shouldn't") + }) + + t.Run("multi delete", func(t *testing.T) { + bktName, objName := "bucket-for-removal-multi", "object-to-delete" + bktInfo, objInfo := createBucketAndObject(tc, bktName, objName) + + putBucketVersioning(t, tc, bktName, true) + + checkFound(t, tc, bktName, objName, emptyVersion) + deleteObjects(t, tc, bktName, [][2]string{{objName, emptyVersion}}) + checkNotFound(t, tc, bktName, objName, emptyVersion) + + putBucketVersioning(t, tc, bktName, false) + + deleteObjects(t, tc, bktName, [][2]string{{objName, emptyVersion}}) + checkNotFound(t, tc, bktName, objName, objInfo.VersionID()) + + require.False(t, existInMockedFrostFS(tc, bktInfo, objInfo), "object exists but shouldn't") + }) } func TestDeleteMarkers(t *testing.T) { tc := prepareHandlerContext(t) - bktName, objName := "bucket-for-removal", "object-to-delete" - createTestBucket(tc, bktName) - putBucketVersioning(t, tc, bktName, true) + t.Run("single delete", func(t *testing.T) { + bktName, objName := "bucket-for-removal", "object-to-delete" + createTestBucket(tc, bktName) + putBucketVersioning(t, tc, bktName, true) - checkNotFound(t, tc, bktName, objName, emptyVersion) - deleteObject(t, tc, bktName, objName, emptyVersion) - deleteObject(t, tc, bktName, objName, emptyVersion) - deleteObject(t, tc, bktName, objName, emptyVersion) + checkNotFound(t, tc, bktName, objName, emptyVersion) + deleteObject(t, tc, bktName, objName, emptyVersion) + deleteObject(t, tc, bktName, objName, emptyVersion) + deleteObject(t, tc, bktName, objName, emptyVersion) - versions := listVersions(t, tc, bktName) - require.Len(t, versions.DeleteMarker, 0, "invalid delete markers length") - require.Len(t, versions.Version, 0, "versions must be empty") + versions := listVersions(t, tc, bktName) + require.Len(t, versions.DeleteMarker, 0, "invalid delete markers length") + require.Len(t, versions.Version, 0, "versions must be empty") - require.Len(t, listOIDsFromMockedFrostFS(t, tc, bktName), 0, "shouldn't be any object in frostfs") + require.Len(t, listOIDsFromMockedFrostFS(t, tc, bktName), 0, "shouldn't be any object in frostfs") + }) + + t.Run("multi delete", func(t *testing.T) { + bktName, objName := "bucket-for-removal-multi", "object-to-delete" + createTestBucket(tc, bktName) + putBucketVersioning(t, tc, bktName, true) + + checkNotFound(t, tc, bktName, objName, emptyVersion) + deleteObjects(t, tc, bktName, [][2]string{{objName, emptyVersion}}) + deleteObjects(t, tc, bktName, [][2]string{{objName, emptyVersion}}) + deleteObjects(t, tc, bktName, [][2]string{{objName, emptyVersion}}) + + versions := listVersions(t, tc, bktName) + require.Len(t, versions.DeleteMarker, 0, "invalid delete markers length") + require.Len(t, versions.Version, 0, "versions must be empty") + + require.Len(t, listOIDsFromMockedFrostFS(t, tc, bktName), 0, "shouldn't be any object in frostfs") + }) } func TestGetHeadDeleteMarker(t *testing.T) { hc := prepareHandlerContext(t) - bktName, objName := "bucket-for-removal", "object-to-delete" - createTestBucket(hc, bktName) - putBucketVersioning(t, hc, bktName, true) + t.Run("single delete", func(t *testing.T) { + bktName, objName := "bucket-for-removal", "object-to-delete" + createTestBucket(hc, bktName) + putBucketVersioning(t, hc, bktName, true) - putObject(hc, bktName, objName) + putObject(hc, bktName, objName) - deleteMarkerVersionID, _ := deleteObject(t, hc, bktName, objName, emptyVersion) + deleteMarkerVersionID, _ := deleteObject(t, hc, bktName, objName, emptyVersion) - w := headObjectBase(hc, bktName, objName, deleteMarkerVersionID) - require.Equal(t, w.Code, http.StatusMethodNotAllowed) - require.Equal(t, w.Result().Header.Get(api.AmzDeleteMarker), "true") + w := headObjectBase(hc, bktName, objName, deleteMarkerVersionID) + require.Equal(t, w.Code, http.StatusMethodNotAllowed) + require.Equal(t, w.Result().Header.Get(api.AmzDeleteMarker), "true") - w, r := prepareTestRequest(hc, bktName, objName, nil) - hc.Handler().GetObjectHandler(w, r) - assertStatus(hc.t, w, http.StatusNotFound) - require.Equal(t, w.Result().Header.Get(api.AmzDeleteMarker), "true") + w, r := prepareTestRequest(hc, bktName, objName, nil) + hc.Handler().GetObjectHandler(w, r) + assertStatus(hc.t, w, http.StatusNotFound) + require.Equal(t, w.Result().Header.Get(api.AmzDeleteMarker), "true") + }) + + t.Run("multi delete", func(t *testing.T) { + bktName, objName := "bucket-for-removal-multi", "object-to-delete" + createTestBucket(hc, bktName) + putBucketVersioning(t, hc, bktName, true) + + putObject(hc, bktName, objName) + + res := deleteObjects(t, hc, bktName, [][2]string{{objName, emptyVersion}}) + require.Len(t, res.DeletedObjects, 1) + require.True(t, res.DeletedObjects[0].DeleteMarker) + + w := headObjectBase(hc, bktName, objName, res.DeletedObjects[0].DeleteMarkerVersionID) + require.Equal(t, w.Code, http.StatusMethodNotAllowed) + require.Equal(t, w.Result().Header.Get(api.AmzDeleteMarker), "true") + + w, r := prepareTestRequest(hc, bktName, objName, nil) + hc.Handler().GetObjectHandler(w, r) + assertStatus(hc.t, w, http.StatusNotFound) + require.Equal(t, w.Result().Header.Get(api.AmzDeleteMarker), "true") + }) } func TestDeleteObjectFromListCache(t *testing.T) { tc := prepareHandlerContext(t) - bktName, objName := "bucket-for-removal", "object-to-delete" - bktInfo, objInfo := createVersionedBucketAndObject(t, tc, bktName, objName) + t.Run("single delete", func(t *testing.T) { + bktName, objName := "bucket-for-removal", "object-to-delete" + bktInfo, objInfo := createVersionedBucketAndObject(t, tc, bktName, objName) - versions := listObjectsV1(tc, bktName, "", "", "", -1) - require.Len(t, versions.Contents, 1) + versions := listObjectsV1(tc, bktName, "", "", "", -1) + require.Len(t, versions.Contents, 1) - checkFound(t, tc, bktName, objName, objInfo.VersionID()) - deleteObject(t, tc, bktName, objName, objInfo.VersionID()) - checkNotFound(t, tc, bktName, objName, objInfo.VersionID()) + checkFound(t, tc, bktName, objName, objInfo.VersionID()) + deleteObject(t, tc, bktName, objName, objInfo.VersionID()) + checkNotFound(t, tc, bktName, objName, objInfo.VersionID()) - // check cache is clean after object removal - versions = listObjectsV1(tc, bktName, "", "", "", -1) - require.Len(t, versions.Contents, 0) + // check cache is clean after object removal + versions = listObjectsV1(tc, bktName, "", "", "", -1) + require.Len(t, versions.Contents, 0) - require.False(t, existInMockedFrostFS(tc, bktInfo, objInfo)) + require.False(t, existInMockedFrostFS(tc, bktInfo, objInfo)) + }) + + t.Run("multi delete", func(t *testing.T) { + bktName, objName := "bucket-for-removal-multi", "object-to-delete" + bktInfo, objInfo := createVersionedBucketAndObject(t, tc, bktName, objName) + + versions := listObjectsV1(tc, bktName, "", "", "", -1) + require.Len(t, versions.Contents, 1) + + checkFound(t, tc, bktName, objName, objInfo.VersionID()) + deleteObjects(t, tc, bktName, [][2]string{{objName, objInfo.VersionID()}}) + checkNotFound(t, tc, bktName, objName, objInfo.VersionID()) + + // check cache is clean after object removal + versions = listObjectsV1(tc, bktName, "", "", "", -1) + require.Len(t, versions.Contents, 0) + + require.False(t, existInMockedFrostFS(tc, bktInfo, objInfo)) + }) } func TestDeleteObjectCheckMarkerReturn(t *testing.T) { tc := prepareHandlerContext(t) - bktName, objName := "bucket-for-removal", "object-to-delete" - createVersionedBucketAndObject(t, tc, bktName, objName) + t.Run("single delete", func(t *testing.T) { + bktName, objName := "bucket-for-removal", "object-to-delete" + createVersionedBucketAndObject(t, tc, bktName, objName) - deleteMarkerVersion, isDeleteMarker := deleteObject(t, tc, bktName, objName, emptyVersion) - require.True(t, isDeleteMarker) + deleteMarkerVersion, isDeleteMarker := deleteObject(t, tc, bktName, objName, emptyVersion) + require.True(t, isDeleteMarker) - versions := listVersions(t, tc, bktName) - require.Len(t, versions.DeleteMarker, 1) - require.Equal(t, deleteMarkerVersion, versions.DeleteMarker[0].VersionID) + versions := listVersions(t, tc, bktName) + require.Len(t, versions.DeleteMarker, 1) + require.Equal(t, deleteMarkerVersion, versions.DeleteMarker[0].VersionID) - deleteMarkerVersion2, isDeleteMarker2 := deleteObject(t, tc, bktName, objName, deleteMarkerVersion) - require.True(t, isDeleteMarker2) - versions = listVersions(t, tc, bktName) - require.Len(t, versions.DeleteMarker, 0) - require.Equal(t, deleteMarkerVersion, deleteMarkerVersion2) + deleteMarkerVersion2, isDeleteMarker2 := deleteObject(t, tc, bktName, objName, deleteMarkerVersion) + require.True(t, isDeleteMarker2) + versions = listVersions(t, tc, bktName) + require.Len(t, versions.DeleteMarker, 0) + require.Equal(t, deleteMarkerVersion, deleteMarkerVersion2) + }) + + t.Run("multi delete", func(t *testing.T) { + bktName, objName := "bucket-for-removal-multi", "object-to-delete" + createVersionedBucketAndObject(t, tc, bktName, objName) + + res := deleteObjects(t, tc, bktName, [][2]string{{objName, emptyVersion}}) + require.Len(t, res.DeletedObjects, 1) + require.True(t, res.DeletedObjects[0].DeleteMarker) + + versions := listVersions(t, tc, bktName) + require.Len(t, versions.DeleteMarker, 1) + require.Equal(t, res.DeletedObjects[0].DeleteMarkerVersionID, versions.DeleteMarker[0].VersionID) + + res2 := deleteObjects(t, tc, bktName, [][2]string{{objName, res.DeletedObjects[0].DeleteMarkerVersionID}}) + require.Len(t, res2.DeletedObjects, 1) + require.True(t, res2.DeletedObjects[0].DeleteMarker) + + versions = listVersions(t, tc, bktName) + require.Len(t, versions.DeleteMarker, 0) + require.Equal(t, res.DeletedObjects[0].DeleteMarkerVersionID, res2.DeletedObjects[0].DeleteMarkerVersionID) + }) } func TestDeleteBucketByNotOwner(t *testing.T) { @@ -514,16 +1031,27 @@ func getBucketVersioning(hc *handlerContext, bktName string) *VersioningConfigur } func deleteObject(t *testing.T, tc *handlerContext, bktName, objName, version string) (string, bool) { - query := make(url.Values) - query.Add(api.QueryVersionID, version) - - w, r := prepareTestFullRequest(tc, bktName, objName, query, nil) - tc.Handler().DeleteObjectHandler(w, r) + w := deleteObjectBase(tc, bktName, objName, version) assertStatus(t, w, http.StatusNoContent) return w.Header().Get(api.AmzVersionID), w.Header().Get(api.AmzDeleteMarker) != "" } +func deleteObjectErr(hc *handlerContext, bktName, objName, version string, code apierr.ErrorCode) { + w := deleteObjectBase(hc, bktName, objName, version) + assertS3Error(hc.t, w, apierr.GetAPIError(code)) +} + +func deleteObjectBase(hc *handlerContext, bktName, objName, version string) *httptest.ResponseRecorder { + query := make(url.Values) + query.Add(api.QueryVersionID, version) + + w, r := prepareTestFullRequest(hc, bktName, objName, query, nil) + hc.Handler().DeleteObjectHandler(w, r) + + return w +} + func deleteObjects(t *testing.T, tc *handlerContext, bktName string, objVersions [][2]string) *DeleteObjectsResponse { w := deleteObjectsBase(tc, bktName, objVersions) @@ -611,11 +1139,12 @@ func getVersion(resp *ListObjectsVersionsResponse, objName string) []*ObjectVers return res } -func putObject(hc *handlerContext, bktName, objName string) { +func putObject(hc *handlerContext, bktName, objName string) string { body := bytes.NewReader([]byte("content")) w, r := prepareTestPayloadRequest(hc, bktName, objName, body) hc.Handler().PutObjectHandler(w, r) assertStatus(hc.t, w, http.StatusOK) + return w.Header().Get(api.AmzVersionID) } func createSuspendedBucket(t *testing.T, tc *handlerContext, bktName string) *data.BucketInfo { diff --git a/api/handler/multipart_upload_test.go b/api/handler/multipart_upload_test.go index 576a4e7c..7edfee0a 100644 --- a/api/handler/multipart_upload_test.go +++ b/api/handler/multipart_upload_test.go @@ -50,35 +50,69 @@ func TestDeleteMultipartAllParts(t *testing.T) { partSize := layer.UploadMinSize objLen := 6 * partSize - bktName, bktName2, objName := "bucket", "bucket2", "object" + t.Run("single delete", func(t *testing.T) { + bktName, bktName2, objName := "bucket", "bucket2", "object" - // unversioned bucket - createTestBucket(hc, bktName) - multipartUpload(hc, bktName, objName, nil, objLen, partSize) - hc.tp.ClearTombstoneOIDCount() - deleteObject(t, hc, bktName, objName, emptyVersion) - require.Empty(t, hc.tp.Objects()) - require.Equal(t, objLen/partSize+1, hc.tp.TombstoneOIDCount()) + // unversioned bucket + createTestBucket(hc, bktName) + multipartUpload(hc, bktName, objName, nil, objLen, partSize) + hc.tp.ClearTombstoneOIDCount() + deleteObject(t, hc, bktName, objName, emptyVersion) + require.Empty(t, hc.tp.Objects()) + require.Equal(t, objLen/partSize+1, hc.tp.TombstoneOIDCount()) - // encrypted multipart - multipartUploadEncrypted(hc, bktName, objName, nil, objLen, partSize) - hc.tp.ClearTombstoneOIDCount() - deleteObject(t, hc, bktName, objName, emptyVersion) - require.Empty(t, hc.tp.Objects()) - require.Equal(t, objLen/partSize+1, hc.tp.TombstoneOIDCount()) + // encrypted multipart + multipartUploadEncrypted(hc, bktName, objName, nil, objLen, partSize) + hc.tp.ClearTombstoneOIDCount() + deleteObject(t, hc, bktName, objName, emptyVersion) + require.Empty(t, hc.tp.Objects()) + require.Equal(t, objLen/partSize+1, hc.tp.TombstoneOIDCount()) - // versions bucket - createTestBucket(hc, bktName2) - putBucketVersioning(t, hc, bktName2, true) - multipartUpload(hc, bktName2, objName, nil, objLen, partSize) - _, hdr := getObject(hc, bktName2, objName) - versionID := hdr.Get("X-Amz-Version-Id") - hc.tp.ClearTombstoneOIDCount() - deleteObject(t, hc, bktName2, objName, emptyVersion) - require.Equal(t, 0, hc.tp.TombstoneOIDCount()) - deleteObject(t, hc, bktName2, objName, versionID) - require.Equal(t, objLen/partSize+1, hc.tp.TombstoneOIDCount()) - require.Empty(t, hc.tp.Objects()) + // versions bucket + createTestBucket(hc, bktName2) + putBucketVersioning(t, hc, bktName2, true) + multipartUpload(hc, bktName2, objName, nil, objLen, partSize) + _, hdr := getObject(hc, bktName2, objName) + versionID := hdr.Get("X-Amz-Version-Id") + hc.tp.ClearTombstoneOIDCount() + deleteObject(t, hc, bktName2, objName, emptyVersion) + require.Equal(t, 0, hc.tp.TombstoneOIDCount()) + deleteObject(t, hc, bktName2, objName, versionID) + require.Equal(t, objLen/partSize+1, hc.tp.TombstoneOIDCount()) + require.Empty(t, hc.tp.Objects()) + }) + + t.Run("multi delete", func(t *testing.T) { + bktName, bktName2, objName := "bucket-multi", "bucket2-multi", "object" + + // unversioned bucket + createTestBucket(hc, bktName) + multipartUpload(hc, bktName, objName, nil, objLen, partSize) + hc.tp.ClearTombstoneOIDCount() + deleteObjects(t, hc, bktName, [][2]string{{objName, emptyVersion}}) + require.Empty(t, hc.tp.Objects()) + require.Equal(t, objLen/partSize+1, hc.tp.TombstoneOIDCount()) + + // encrypted multipart + multipartUploadEncrypted(hc, bktName, objName, nil, objLen, partSize) + hc.tp.ClearTombstoneOIDCount() + deleteObjects(t, hc, bktName, [][2]string{{objName, emptyVersion}}) + require.Empty(t, hc.tp.Objects()) + require.Equal(t, objLen/partSize+1, hc.tp.TombstoneOIDCount()) + + // versions bucket + createTestBucket(hc, bktName2) + putBucketVersioning(t, hc, bktName2, true) + multipartUpload(hc, bktName2, objName, nil, objLen, partSize) + _, hdr := getObject(hc, bktName2, objName) + versionID := hdr.Get("X-Amz-Version-Id") + hc.tp.ClearTombstoneOIDCount() + deleteObjects(t, hc, bktName2, [][2]string{{objName, emptyVersion}}) + require.Equal(t, 0, hc.tp.TombstoneOIDCount()) + deleteObjects(t, hc, bktName2, [][2]string{{objName, versionID}}) + require.Equal(t, objLen/partSize+1, hc.tp.TombstoneOIDCount()) + require.Empty(t, hc.tp.Objects()) + }) } func TestMultipartCopiesNumber(t *testing.T) { diff --git a/api/layer/delete_test.go b/api/layer/delete_test.go new file mode 100644 index 00000000..40840fcf --- /dev/null +++ b/api/layer/delete_test.go @@ -0,0 +1,80 @@ +package layer + +import ( + "errors" + "testing" + + "git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/data" + "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap" + "github.com/stretchr/testify/require" +) + +func TestDelete(t *testing.T) { + t.Run("single delete", func(t *testing.T) { + tc := prepareContext(t) + netInfo := netmap.NetworkInfo{} + settings := &data.BucketSettings{Versioning: data.VersioningSuspended} + + obj1 := tc.putObjectNamed(tc.bktInfo, "obj1", []byte("obj1")) + obj2 := tc.putObjectNamed(tc.bktInfo, "obj2", []byte("obj2")) + + cnrID := tc.bktInfo.CID.EncodeToString() + + tmock := tc.layer.treeService.(*TreeServiceMock) + list := tmock.versions[cnrID][obj1.Name] + tmock.versions[cnrID][obj1.Name] = append(list, tmock.versions[cnrID][obj2.Name]...) + + tc.testFrostFS.SetObjectError(obj2.Address(), errors.New("obj error")) + + prm := &DeleteObjectParams{ + BktInfo: tc.bktInfo, + Objects: []*VersionedObject{{Name: obj1.Name}}, + Settings: settings, + NetworkInfo: netInfo, + IsMultiple: false, + } + + vos := tc.layer.DeleteObjects(tc.ctx, prm) + require.Len(t, vos, 1) + require.Error(t, vos[0].Error) + + list = tmock.versions[cnrID][obj1.Name] + require.Len(t, list, 1) + require.False(t, list[0].IsDeleteMarker) + require.Equal(t, obj2.Name, list[0].FilePath) + }) + + t.Run("multiple delete", func(t *testing.T) { + tc := prepareContext(t) + netInfo := netmap.NetworkInfo{} + settings := &data.BucketSettings{Versioning: data.VersioningSuspended} + + obj3 := tc.putObjectNamed(tc.bktInfo, "obj3", []byte("obj3")) + obj4 := tc.putObjectNamed(tc.bktInfo, "obj4", []byte("obj4")) + + cnrID := tc.bktInfo.CID.EncodeToString() + + tmock := tc.layer.treeService.(*TreeServiceMock) + list := tmock.versions[cnrID][obj3.Name] + tmock.versions[cnrID][obj3.Name] = append(list, tmock.versions[cnrID][obj4.Name]...) + + tc.testFrostFS.SetObjectError(obj4.Address(), errors.New("obj error")) + + prm := &DeleteObjectParams{ + BktInfo: tc.bktInfo, + Objects: []*VersionedObject{{Name: obj3.Name}}, + Settings: settings, + NetworkInfo: netInfo, + IsMultiple: true, + } + + vos := tc.layer.DeleteObjects(tc.ctx, prm) + require.Len(t, vos, 1) + require.Error(t, vos[0].Error) + + list = tmock.versions[cnrID][obj3.Name] + require.Len(t, list, 1) + require.False(t, list[0].IsDeleteMarker) + require.Equal(t, obj4.Name, list[0].FilePath) + }) +} diff --git a/api/layer/frostfs_mock.go b/api/layer/frostfs_mock.go index c18268fc..193e0452 100644 --- a/api/layer/frostfs_mock.go +++ b/api/layer/frostfs_mock.go @@ -524,7 +524,9 @@ func (t *TestFrostFS) PatchObject(ctx context.Context, prm frostfs.PrmObjectPatc } func (t *TestFrostFS) Relations() relations.Relations { - return &RelationsMock{} + return &RelationsMock{ + objectErrors: t.objectErrors, + } } func (t *TestFrostFS) AddContainerPolicyChain(_ context.Context, prm frostfs.PrmAddContainerPolicyChain) error { @@ -568,9 +570,16 @@ func isMatched(attributes []object.Attribute, filter object.SearchFilter) bool { return false } -type RelationsMock struct{} +type RelationsMock struct { + objectErrors map[string]error +} + +func (r *RelationsMock) GetSplitInfo(_ context.Context, cnrID cid.ID, objID oid.ID, _ relations.Tokens) (*object.SplitInfo, error) { + addr := newAddress(cnrID, objID) + if err := r.objectErrors[addr.EncodeToString()]; err != nil { + return nil, err + } -func (r *RelationsMock) GetSplitInfo(context.Context, cid.ID, oid.ID, relations.Tokens) (*object.SplitInfo, error) { return nil, relations.ErrNoSplitInfo } diff --git a/api/layer/layer.go b/api/layer/layer.go index 2dc2f993..e9e5f21b 100644 --- a/api/layer/layer.go +++ b/api/layer/layer.go @@ -13,6 +13,7 @@ import ( "sort" "strconv" "strings" + "sync" "time" "git.frostfs.info/TrueCloudLab/frostfs-observability/tracing" @@ -29,6 +30,7 @@ import ( cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id" "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap" oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id" + "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/relations" "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/session" "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/user" "github.com/nspcc-dev/neo-go/pkg/crypto/keys" @@ -844,14 +846,223 @@ func (n *Layer) DeleteObjects(ctx context.Context, p *DeleteObjectParams) []*Ver ctx, span := tracing.StartSpanFromContext(ctx, "layer.DeleteObjects") defer span.End() - for i, obj := range p.Objects { - p.Objects[i] = n.deleteObject(ctx, p.BktInfo, p.Settings, obj, p.NetworkInfo) - if p.IsMultiple && p.Objects[i].Error != nil { - n.reqLogger(ctx).Error(logs.CouldntDeleteObject, zap.String("object", obj.String()), zap.Error(p.Objects[i].Error), logs.TagField(logs.TagExternalStorage)) + if !p.IsMultiple { + for i, obj := range p.Objects { + p.Objects[i] = n.deleteObject(ctx, p.BktInfo, p.Settings, obj, p.NetworkInfo) } + return p.Objects } - return p.Objects + maxLifetime := n.features.TombstoneLifetime() + inputCh := make(chan tombstoneData, len(p.Objects)) + outputCh := n.submitPutTombstoneMultipleDelete(ctx, p.BktInfo, p.NetworkInfo.CurrentEpoch()+maxLifetime, inputCh) + + var wg sync.WaitGroup + wg.Add(1) + + resObjects := make([]*VersionedObject, 0, len(p.Objects)) + + go func() { + defer wg.Done() + for { + select { + case <-ctx.Done(): + return + case r, ok := <-outputCh: + if !ok { + return + } + + if r.err != nil && !client.IsErrObjectAlreadyRemoved(r.err) && !client.IsErrObjectNotFound(r.err) { + n.reqLogger(ctx).Error(logs.FailedToPutTombstones, zap.Error(r.err), logs.TagField(logs.TagExternalStorage)) + + for _, obj := range r.objs { + obj.obj.Error = r.err + resObjects = append(resObjects, obj.obj) + } + + continue + } + + LOOP: + for _, obj := range r.objs { + for _, node := range obj.nodes { + if err := n.treeService.RemoveVersion(ctx, p.BktInfo, node.ID); err != nil { + n.reqLogger(ctx).Error(logs.CouldntDeleteObject, zap.String("object", node.OID.EncodeToString()), + zap.Error(err), logs.TagField(logs.TagExternalStorage)) + obj.obj.Error = r.err + resObjects = append(resObjects, obj.obj) + continue LOOP + } + if !node.IsDeleteMarker { + n.cache.DeleteObject(newAddress(p.BktInfo.CID, node.OID)) + } + } + + if obj.needDeleteMarker && obj.obj.Error == nil { + obj.obj = n.createDeleteMarker(ctx, p, obj.obj) + } + + resObjects = append(resObjects, obj.obj) + } + } + } + }() + + tokens := prepareTokensParameter(ctx, p.BktInfo.Owner) + for _, obj := range p.Objects { + n.deleteObjectUsingBatchTombstone(ctx, p, obj, tokens, inputCh) + } + + close(inputCh) + wg.Wait() + + return resObjects +} + +// deleteObjectUsingBatchTombstone schedule object removing. +// Method logic structure is similar to Layer.deleteObject. +func (n *Layer) deleteObjectUsingBatchTombstone(ctx context.Context, p *DeleteObjectParams, obj *VersionedObject, tokens relations.Tokens, inputCh chan<- tombstoneData) { + if len(obj.VersionID) != 0 || p.Settings.Unversioned() { + var nodeVersions []*data.NodeVersion + if nodeVersions, obj.Error = n.getNodeVersionsToDelete(ctx, p.BktInfo, obj); obj.Error != nil { + inputCh <- tombstoneData{obj: n.handleNotFoundError(p.BktInfo, obj)} + return + } + + for _, nodeVersion := range nodeVersions { + if nodeVersion.IsDeleteMarker { + obj.DeleteMarkVersion = obj.VersionID + } + } + if !n.removeOldVersionUsingBatchTombstone(ctx, p.BktInfo, obj, nodeVersions, p.NetworkInfo, tokens, inputCh, false) { + return + } + + n.cache.DeleteObjectName(p.BktInfo.CID, p.BktInfo.Name, obj.Name) + return + } + + lastVersion, err := n.getLastNodeVersion(ctx, p.BktInfo, obj) + if err != nil { + obj.Error = err + inputCh <- tombstoneData{obj: n.handleNotFoundError(p.BktInfo, obj)} + return + } + + if p.Settings.VersioningSuspended() { + obj.VersionID = data.UnversionedObjectVersionID + + var nodeVersions []*data.NodeVersion + if nodeVersions, obj.Error = n.getNodeVersionsToDelete(ctx, p.BktInfo, obj); obj.Error != nil { + if !isNotFoundError(obj.Error) { + inputCh <- tombstoneData{obj: obj} + return + } + obj.Error = nil + } + + for i, nodeVersion := range nodeVersions { + if nodeVersion.ID == lastVersion.ID && nodeVersion.IsDeleteMarker { + nodeVersions = append(nodeVersions[:i], nodeVersions[i+1:]...) + break + } + } + + if lastVersion.IsDeleteMarker { + obj.DeleteMarkVersion = lastVersion.OID.EncodeToString() + } + + if !n.removeOldVersionUsingBatchTombstone(ctx, p.BktInfo, obj, nodeVersions, p.NetworkInfo, tokens, inputCh, !lastVersion.IsDeleteMarker) { + return + } + + n.cache.DeleteObjectName(p.BktInfo.CID, p.BktInfo.Name, obj.Name) + return + } + + if lastVersion.IsDeleteMarker { + obj.DeleteMarkVersion = lastVersion.OID.EncodeToString() + inputCh <- tombstoneData{obj: obj} + return + } + + inputCh <- tombstoneData{obj: n.createDeleteMarker(ctx, p, obj)} + n.cache.DeleteObjectName(p.BktInfo.CID, p.BktInfo.Name, obj.Name) +} + +func (n *Layer) createDeleteMarker(ctx context.Context, p *DeleteObjectParams, obj *VersionedObject) *VersionedObject { + randOID, err := getRandomOID() + if err != nil { + obj.Error = fmt.Errorf("couldn't get random oid: %w", err) + return obj + } + + obj.DeleteMarkVersion = randOID.EncodeToString() + now := TimeNow(ctx) + newVersion := &data.NodeVersion{ + BaseNodeVersion: data.BaseNodeVersion{ + OID: randOID, + FilePath: obj.Name, + Created: &now, + Owner: &n.gateOwner, + IsDeleteMarker: true, + CreationEpoch: p.NetworkInfo.CurrentEpoch(), + }, + IsUnversioned: p.Settings.VersioningSuspended(), + } + + _, obj.Error = n.treeService.AddVersion(ctx, p.BktInfo, newVersion) + return obj +} + +func (n *Layer) removeOldVersionUsingBatchTombstone(ctx context.Context, bkt *data.BucketInfo, obj *VersionedObject, nodeVersions []*data.NodeVersion, networkInfo netmap.NetworkInfo, tokens relations.Tokens, in chan<- tombstoneData, needDeleteMarker bool) bool { + td := tombstoneData{ + needDeleteMarker: needDeleteMarker, + obj: obj, + nodes: make([]*data.NodeVersion, 0, len(nodeVersions)), + members: make([]oid.ID, 0, len(nodeVersions)), + } + + for _, nodeVersion := range nodeVersions { + if nodeVersion.IsDeleteMarker { + td.nodes = append(td.nodes, nodeVersion) + continue + } + + if nodeVersion.IsCombined { + err := n.removeCombinedObject(ctx, bkt, nodeVersion, networkInfo) + if err != nil && !client.IsErrObjectAlreadyRemoved(err) && !client.IsErrObjectNotFound(err) { + obj.Error = err + in <- td + return false + } + + td.nodes = append(td.nodes, nodeVersion) + continue + } + + oids, err := relations.ListAllRelations(ctx, n.frostFS.Relations(), bkt.CID, nodeVersion.OID, tokens) + if err != nil { + n.reqLogger(ctx).Warn(logs.FailedToListAllObjectRelations, zap.String("cid", bkt.CID.EncodeToString()), + zap.String("oid", nodeVersion.OID.EncodeToString()), zap.Error(err)) + + if client.IsErrObjectAlreadyRemoved(err) || client.IsErrObjectNotFound(err) { + td.nodes = append(td.nodes, nodeVersion) + continue + } + + obj.Error = err + in <- td + return false + } + + td.nodes = append(td.nodes, nodeVersion) + td.members = append(td.members, append(oids, nodeVersion.OID)...) + } + + in <- td + return true } func (n *Layer) CreateBucket(ctx context.Context, p *CreateBucketParams) (*data.BucketInfo, error) { diff --git a/api/layer/tombstone.go b/api/layer/tombstone.go index 5aac279d..dfcfeda4 100644 --- a/api/layer/tombstone.go +++ b/api/layer/tombstone.go @@ -76,6 +76,93 @@ func (n *Layer) submitPutTombstone(ctx context.Context, bkt *data.BucketInfo, me } } +type tombstoneData struct { + needDeleteMarker bool + obj *VersionedObject + nodes []*data.NodeVersion + members []oid.ID +} + +type tombstoneResp struct { + objs []tombstoneRespObj + err error +} + +type tombstoneRespObj struct { + needDeleteMarker bool + obj *VersionedObject + nodes []*data.NodeVersion +} + +func (n *Layer) submitPutTombstoneMultipleDelete(ctx context.Context, bkt *data.BucketInfo, expEpoch uint64, ch <-chan tombstoneData) <-chan tombstoneResp { + res := make(chan tombstoneResp, cap(ch)) + maxMembers := n.features.TombstoneMembersSize() + + go func() { + var wg sync.WaitGroup + defer func() { + wg.Wait() + close(res) + }() + + var tr tombstoneResp + var members []oid.ID + + for { + select { + case <-ctx.Done(): + return + case td, ok := <-ch: + if !ok { + if len(members) != 0 { + n.submitPutTombstoneBatch(ctx, bkt, expEpoch, res, &wg, tr, members) + } + return + } + + if len(td.members) == 0 { + res <- tombstoneResp{objs: []tombstoneRespObj{{needDeleteMarker: td.needDeleteMarker, obj: td.obj, nodes: td.nodes}}, err: td.obj.Error} + continue + } + + members = append(members, td.members...) + tr.objs = append(tr.objs, tombstoneRespObj{needDeleteMarker: td.needDeleteMarker, obj: td.obj, nodes: td.nodes}) + + if len(members) > maxMembers { + n.submitPutTombstoneBatch(ctx, bkt, expEpoch, res, &wg, tr, members) + tr = tombstoneResp{} + members = nil + } + } + } + }() + + return res +} + +func (n *Layer) submitPutTombstoneBatch(ctx context.Context, bkt *data.BucketInfo, expEpoch uint64, res chan<- tombstoneResp, wg *sync.WaitGroup, tr tombstoneResp, members []oid.ID) { + tomb := object.NewTombstone() + tomb.SetExpirationEpoch(expEpoch) + tomb.SetMembers(members) + + wg.Add(1) + err := n.workerPool.Submit(func() { + defer wg.Done() + + if tr.err = n.putTombstoneObject(ctx, tomb, bkt); tr.err != nil { + n.reqLogger(ctx).Warn(logs.FailedToPutTombstoneObject, zap.String("cid", bkt.CID.EncodeToString()), zap.Error(tr.err)) + } + res <- tr + }) + + if err != nil { + wg.Done() + n.reqLogger(ctx).Warn(logs.FailedToSubmitTaskToPool, zap.Error(err)) + tr.err = fmt.Errorf("submit task to pool: %w", err) + res <- tr + } +} + func (n *Layer) putTombstoneObject(ctx context.Context, tomb *object.Tombstone, bktInfo *data.BucketInfo) error { payload, err := tomb.Marshal() if err != nil { diff --git a/api/layer/tree_mock.go b/api/layer/tree_mock.go index 577f4068..ff33f41a 100644 --- a/api/layer/tree_mock.go +++ b/api/layer/tree_mock.go @@ -4,6 +4,7 @@ import ( "context" "fmt" "io" + "math/rand" "sort" "strings" "time" @@ -248,6 +249,7 @@ func (t *TreeServiceMock) AddVersion(_ context.Context, bktInfo *data.BucketInfo versions, ok := cnrVersionsMap[newVersion.FilePath] if !ok { cnrVersionsMap[newVersion.FilePath] = []*data.NodeVersion{newVersion} + newVersion.ID = rand.Uint64() return newVersion.ID, nil } @@ -258,6 +260,8 @@ func (t *TreeServiceMock) AddVersion(_ context.Context, bktInfo *data.BucketInfo if len(versions) != 0 { newVersion.ID = versions[len(versions)-1].ID + 1 newVersion.Timestamp = versions[len(versions)-1].Timestamp + 1 + } else { + newVersion.ID = rand.Uint64() } result := versions diff --git a/api/layer/versioning_test.go b/api/layer/versioning_test.go index 51d35cd6..56eaf133 100644 --- a/api/layer/versioning_test.go +++ b/api/layer/versioning_test.go @@ -17,14 +17,19 @@ import ( oidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id/test" "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/user" "github.com/nspcc-dev/neo-go/pkg/crypto/keys" + "github.com/panjf2000/ants/v2" "github.com/stretchr/testify/require" "go.uber.org/zap/zaptest" ) func (tc *testContext) putObject(content []byte) *data.ObjectInfo { + return tc.putObjectNamed(tc.bktInfo, tc.obj, content) +} + +func (tc *testContext) putObjectNamed(bktInfo *data.BucketInfo, objName string, content []byte) *data.ObjectInfo { extObjInfo, err := tc.layer.PutObject(tc.ctx, &PutObjectParams{ - BktInfo: tc.bktInfo, - Object: tc.obj, + BktInfo: bktInfo, + Object: objName, Size: ptr(uint64(len(content))), Reader: bytes.NewReader(content), Header: make(map[string]string), @@ -170,12 +175,16 @@ func prepareContext(t *testing.T, cachesConfig ...*CachesConfig) *testContext { var owner user.ID user.IDFromKey(&owner, key.PrivateKey.PublicKey) + antPool, err := ants.NewPool(10) + require.NoError(t, err) + layerCfg := &Config{ Cache: NewCache(config), AnonKey: AnonymousKey{Key: key}, TreeService: NewTreeService(), Features: &FeatureSettingsMock{}, GateOwner: owner, + WorkerPool: antPool, } return &testContext{