From 1fd943ee88af166ed9233216151ea3e13f72c943 Mon Sep 17 00:00:00 2001
From: Denis Kirillov <denis@nspcc.ru>
Date: Mon, 25 Jul 2022 16:00:35 +0300
Subject: [PATCH] [#610] Fix deleted object removal

Signed-off-by: Denis Kirillov <denis@nspcc.ru>
---
 api/handler/delete.go      | 16 ++++---
 api/handler/delete_test.go | 91 ++++++++++++++++++++++++++++++++++----
 api/layer/layer.go         | 19 +++++---
 3 files changed, 103 insertions(+), 23 deletions(-)

diff --git a/api/handler/delete.go b/api/handler/delete.go
index 9ddbcc465..0711462c3 100644
--- a/api/handler/delete.go
+++ b/api/handler/delete.go
@@ -87,13 +87,10 @@ func (h *handler) DeleteObjectHandler(w http.ResponseWriter, r *http.Request) {
 	if deletedObject.Error != nil {
 		if isErrObjectLocked(deletedObject.Error) {
 			h.logAndSendError(w, "object is locked", reqInfo, errors.GetAPIError(errors.ErrAccessDenied))
-			return
+		} else {
+			h.logAndSendError(w, "could not delete object", reqInfo, deletedObject.Error)
 		}
-		h.log.Error("could not delete object",
-			zap.String("request_id", reqInfo.RequestID),
-			zap.String("bucket_name", reqInfo.BucketName),
-			zap.String("object_name", reqInfo.ObjectName),
-			zap.Error(deletedObject.Error))
+		return
 	}
 
 	var m *SendNotificationParams
@@ -131,9 +128,14 @@ func (h *handler) DeleteObjectHandler(w http.ResponseWriter, r *http.Request) {
 		h.log.Error("couldn't send notification: %w", zap.Error(err))
 	}
 
+	if deletedObject.VersionID != "" {
+		w.Header().Set(api.AmzVersionID, deletedObject.VersionID)
+	}
 	if deletedObject.DeleteMarkVersion != "" {
 		w.Header().Set(api.AmzDeleteMarker, strconv.FormatBool(true))
-		w.Header().Set(api.AmzVersionID, deletedObject.DeleteMarkVersion)
+		if deletedObject.VersionID == "" {
+			w.Header().Set(api.AmzVersionID, deletedObject.DeleteMarkVersion)
+		}
 	}
 
 	w.WriteHeader(http.StatusNoContent)
diff --git a/api/handler/delete_test.go b/api/handler/delete_test.go
index cd39d9755..ed6731da4 100644
--- a/api/handler/delete_test.go
+++ b/api/handler/delete_test.go
@@ -1,12 +1,14 @@
 package handler
 
 import (
+	"bytes"
 	"net/http"
 	"net/url"
 	"testing"
 
 	"github.com/nspcc-dev/neofs-s3-gw/api"
 	"github.com/nspcc-dev/neofs-s3-gw/api/data"
+	"github.com/nspcc-dev/neofs-s3-gw/api/layer"
 	"github.com/stretchr/testify/require"
 )
 
@@ -20,11 +22,13 @@ func TestDeleteBucket(t *testing.T) {
 	bktName, objName := "bucket-for-removal", "object-to-delete"
 	_, objInfo := createVersionedBucketAndObject(t, tc, bktName, objName)
 
-	deleteMarker := deleteObject(t, tc, bktName, objName, emptyVersion)
+	deleteMarkerVersion, isDeleteMarker := deleteObject(t, tc, bktName, objName, emptyVersion)
+	require.True(t, isDeleteMarker)
+
 	deleteBucket(t, tc, bktName, http.StatusConflict)
 	deleteObject(t, tc, bktName, objName, objInfo.Version())
 	deleteBucket(t, tc, bktName, http.StatusConflict)
-	deleteObject(t, tc, bktName, objName, deleteMarker)
+	deleteObject(t, tc, bktName, objName, deleteMarkerVersion)
 	deleteBucket(t, tc, bktName, http.StatusNoContent)
 }
 
@@ -41,6 +45,57 @@ func TestDeleteObject(t *testing.T) {
 	require.False(t, existInMockedNeoFS(tc, bktInfo, objInfo))
 }
 
+func TestDeleteObjectFromSuspended(t *testing.T) {
+	tc := prepareHandlerContext(t)
+	bktName, objName := "bucket-versioned-for-removal", "object-to-delete"
+
+	createSuspendedBucket(t, tc, bktName)
+	putObject(t, tc, bktName, objName)
+
+	versionID, isDeleteMarker := deleteObject(t, tc, bktName, objName, emptyVersion)
+	require.True(t, isDeleteMarker)
+	require.Equal(t, layer.UnversionedObjectVersionID, versionID)
+}
+
+func TestDeleteDeletedObject(t *testing.T) {
+	tc := prepareHandlerContext(t)
+
+	t.Run("unversioned bucket", func(t *testing.T) {
+		bktName, objName := "bucket-unversioned-removal", "object-to-delete"
+		createBucketAndObject(t, tc, bktName, objName)
+
+		versionID, isDeleteMarker := deleteObject(t, tc, bktName, objName, emptyVersion)
+		require.Empty(t, versionID)
+		require.False(t, isDeleteMarker)
+		versionID, isDeleteMarker = deleteObject(t, tc, bktName, objName, emptyVersion)
+		require.Empty(t, versionID)
+		require.False(t, isDeleteMarker)
+	})
+
+	t.Run("versioned bucket", func(t *testing.T) {
+		bktName, objName := "bucket-versioned-for-removal", "object-to-delete"
+		createVersionedBucketAndObject(t, tc, bktName, objName)
+
+		_, isDeleteMarker := deleteObject(t, tc, bktName, objName, emptyVersion)
+		require.True(t, isDeleteMarker)
+		_, isDeleteMarker = deleteObject(t, tc, bktName, objName, emptyVersion)
+		require.True(t, isDeleteMarker)
+	})
+
+	t.Run("versioned bucket not found obj", func(t *testing.T) {
+		bktName, objName := "bucket-versioned-for-removal", "object-to-delete"
+		_, objInfo := createVersionedBucketAndObject(t, tc, bktName, objName)
+
+		versionID, isDeleteMarker := deleteObject(t, tc, bktName, objName, objInfo.Version())
+		require.False(t, isDeleteMarker)
+		require.Equal(t, objInfo.Version(), versionID)
+
+		versionID2, isDeleteMarker := deleteObject(t, tc, bktName, objName, versionID)
+		require.False(t, isDeleteMarker)
+		require.Equal(t, objInfo.Version(), versionID2)
+	})
+}
+
 func TestDeleteObjectVersioned(t *testing.T) {
 	tc := prepareHandlerContext(t)
 
@@ -82,7 +137,8 @@ func TestRemoveDeleteMarker(t *testing.T) {
 	bktInfo, objInfo := createVersionedBucketAndObject(t, tc, bktName, objName)
 
 	checkFound(t, tc, bktName, objName, emptyVersion)
-	deleteMarkerVersion := deleteObject(t, tc, bktName, objName, emptyVersion)
+	deleteMarkerVersion, isDeleteMarker := deleteObject(t, tc, bktName, objName, emptyVersion)
+	require.True(t, isDeleteMarker)
 	checkNotFound(t, tc, bktName, objName, emptyVersion)
 
 	checkFound(t, tc, bktName, objName, objInfo.Version())
@@ -174,16 +230,18 @@ func TestDeleteObjectCheckMarkerReturn(t *testing.T) {
 	bktName, objName := "bucket-for-removal", "object-to-delete"
 	createVersionedBucketAndObject(t, tc, bktName, objName)
 
-	deleteMarker := deleteObject(t, tc, bktName, objName, emptyVersion)
+	deleteMarkerVersion, isDeleteMarker := deleteObject(t, tc, bktName, objName, emptyVersion)
+	require.True(t, isDeleteMarker)
 
 	versions := listVersions(t, tc, bktName)
 	require.Len(t, versions.DeleteMarker, 1)
-	require.Equal(t, deleteMarker, versions.DeleteMarker[0].VersionID)
+	require.Equal(t, deleteMarkerVersion, versions.DeleteMarker[0].VersionID)
 
-	deleteMarker2 := deleteObject(t, tc, bktName, objName, deleteMarker)
+	deleteMarkerVersion2, isDeleteMarker2 := deleteObject(t, tc, bktName, objName, deleteMarkerVersion)
+	require.True(t, isDeleteMarker2)
 	versions = listVersions(t, tc, bktName)
 	require.Len(t, versions.DeleteMarker, 0)
-	require.Equal(t, deleteMarker, deleteMarker2)
+	require.Equal(t, deleteMarkerVersion, deleteMarkerVersion2)
 }
 
 func createBucketAndObject(t *testing.T, tc *handlerContext, bktName, objName string) (*data.BucketInfo, *data.ObjectInfo) {
@@ -217,7 +275,7 @@ func putBucketVersioning(t *testing.T, tc *handlerContext, bktName string, enabl
 	assertStatus(t, w, http.StatusOK)
 }
 
-func deleteObject(t *testing.T, tc *handlerContext, bktName, objName, version string) string {
+func deleteObject(t *testing.T, tc *handlerContext, bktName, objName, version string) (string, bool) {
 	query := make(url.Values)
 	query.Add(api.QueryVersionID, version)
 
@@ -225,7 +283,7 @@ func deleteObject(t *testing.T, tc *handlerContext, bktName, objName, version st
 	tc.Handler().DeleteObjectHandler(w, r)
 	assertStatus(t, w, http.StatusNoContent)
 
-	return w.Header().Get(api.AmzVersionID)
+	return w.Header().Get(api.AmzVersionID), w.Header().Get(api.AmzDeleteMarker) != ""
 }
 
 func deleteBucket(t *testing.T, tc *handlerContext, bktName string, code int) {
@@ -269,3 +327,18 @@ func listObjectsV1(t *testing.T, tc *handlerContext, bktName string) *ListObject
 	parseTestResponse(t, w, res)
 	return res
 }
+
+func putObject(t *testing.T, tc *handlerContext, bktName, objName string) {
+	body := bytes.NewReader([]byte("content"))
+	w, r := prepareTestPayloadRequest(bktName, objName, body)
+	tc.Handler().PutObjectHandler(w, r)
+	assertStatus(t, w, http.StatusOK)
+}
+
+func createSuspendedBucket(t *testing.T, tc *handlerContext, bktName string) *data.BucketInfo {
+	createTestBucket(tc.Context(), t, tc, bktName)
+	bktInfo, err := tc.Layer().GetBucketInfo(tc.Context(), bktName)
+	require.NoError(t, err)
+	putBucketVersioning(t, tc, bktName, false)
+	return bktInfo
+}
diff --git a/api/layer/layer.go b/api/layer/layer.go
index dd0dcd0d5..1c43dab10 100644
--- a/api/layer/layer.go
+++ b/api/layer/layer.go
@@ -467,7 +467,7 @@ func (n *layer) deleteObject(ctx context.Context, bkt *data.BucketInfo, settings
 	if len(obj.VersionID) != 0 || settings.Unversioned() {
 		var nodeVersion *data.NodeVersion
 		if nodeVersion, obj.Error = n.getNodeVersionToDelete(ctx, bkt, obj); obj.Error != nil {
-			return obj
+			return dismissNotFoundError(obj)
 		}
 
 		if obj.DeleteMarkVersion, obj.Error = n.removeOldVersion(ctx, bkt, nodeVersion, obj); obj.Error != nil {
@@ -486,14 +486,10 @@ func (n *layer) deleteObject(ctx context.Context, bkt *data.BucketInfo, settings
 
 		var nodeVersion *data.NodeVersion
 		if nodeVersion, obj.Error = n.getNodeVersionToDelete(ctx, bkt, obj); obj.Error != nil {
-			return obj
+			return dismissNotFoundError(obj)
 		}
 
-		if obj.Error == nil {
-			if obj.DeleteMarkVersion, obj.Error = n.removeOldVersion(ctx, bkt, nodeVersion, obj); obj.Error != nil {
-				return obj
-			}
-		} else if !errors.IsS3Error(obj.Error, errors.ErrNoSuchKey) {
+		if obj.DeleteMarkVersion, obj.Error = n.removeOldVersion(ctx, bkt, nodeVersion, obj); obj.Error != nil {
 			return obj
 		}
 	}
@@ -528,6 +524,15 @@ func (n *layer) deleteObject(ctx context.Context, bkt *data.BucketInfo, settings
 	return obj
 }
 
+func dismissNotFoundError(obj *VersionedObject) *VersionedObject {
+	if errors.IsS3Error(obj.Error, errors.ErrNoSuchKey) ||
+		errors.IsS3Error(obj.Error, errors.ErrNoSuchVersion) {
+		obj.Error = nil
+	}
+
+	return obj
+}
+
 func (n *layer) getNodeVersionToDelete(ctx context.Context, bkt *data.BucketInfo, obj *VersionedObject) (*data.NodeVersion, error) {
 	objVersion := &ObjectVersion{
 		BktInfo:               bkt,