[#610] Fix deleted object removal

Signed-off-by: Denis Kirillov <denis@nspcc.ru>
This commit is contained in:
Denis Kirillov 2022-07-25 16:00:35 +03:00 committed by Alex Vanin
parent 685a5f0ce8
commit 1fd943ee88
3 changed files with 103 additions and 23 deletions

View file

@ -87,13 +87,10 @@ func (h *handler) DeleteObjectHandler(w http.ResponseWriter, r *http.Request) {
if deletedObject.Error != nil {
if isErrObjectLocked(deletedObject.Error) {
h.logAndSendError(w, "object is locked", reqInfo, errors.GetAPIError(errors.ErrAccessDenied))
return
} else {
h.logAndSendError(w, "could not delete object", reqInfo, deletedObject.Error)
}
h.log.Error("could not delete object",
zap.String("request_id", reqInfo.RequestID),
zap.String("bucket_name", reqInfo.BucketName),
zap.String("object_name", reqInfo.ObjectName),
zap.Error(deletedObject.Error))
return
}
var m *SendNotificationParams
@ -131,9 +128,14 @@ func (h *handler) DeleteObjectHandler(w http.ResponseWriter, r *http.Request) {
h.log.Error("couldn't send notification: %w", zap.Error(err))
}
if deletedObject.VersionID != "" {
w.Header().Set(api.AmzVersionID, deletedObject.VersionID)
}
if deletedObject.DeleteMarkVersion != "" {
w.Header().Set(api.AmzDeleteMarker, strconv.FormatBool(true))
w.Header().Set(api.AmzVersionID, deletedObject.DeleteMarkVersion)
if deletedObject.VersionID == "" {
w.Header().Set(api.AmzVersionID, deletedObject.DeleteMarkVersion)
}
}
w.WriteHeader(http.StatusNoContent)

View file

@ -1,12 +1,14 @@
package handler
import (
"bytes"
"net/http"
"net/url"
"testing"
"github.com/nspcc-dev/neofs-s3-gw/api"
"github.com/nspcc-dev/neofs-s3-gw/api/data"
"github.com/nspcc-dev/neofs-s3-gw/api/layer"
"github.com/stretchr/testify/require"
)
@ -20,11 +22,13 @@ func TestDeleteBucket(t *testing.T) {
bktName, objName := "bucket-for-removal", "object-to-delete"
_, objInfo := createVersionedBucketAndObject(t, tc, bktName, objName)
deleteMarker := deleteObject(t, tc, bktName, objName, emptyVersion)
deleteMarkerVersion, isDeleteMarker := deleteObject(t, tc, bktName, objName, emptyVersion)
require.True(t, isDeleteMarker)
deleteBucket(t, tc, bktName, http.StatusConflict)
deleteObject(t, tc, bktName, objName, objInfo.Version())
deleteBucket(t, tc, bktName, http.StatusConflict)
deleteObject(t, tc, bktName, objName, deleteMarker)
deleteObject(t, tc, bktName, objName, deleteMarkerVersion)
deleteBucket(t, tc, bktName, http.StatusNoContent)
}
@ -41,6 +45,57 @@ func TestDeleteObject(t *testing.T) {
require.False(t, existInMockedNeoFS(tc, bktInfo, objInfo))
}
func TestDeleteObjectFromSuspended(t *testing.T) {
tc := prepareHandlerContext(t)
bktName, objName := "bucket-versioned-for-removal", "object-to-delete"
createSuspendedBucket(t, tc, bktName)
putObject(t, tc, bktName, objName)
versionID, isDeleteMarker := deleteObject(t, tc, bktName, objName, emptyVersion)
require.True(t, isDeleteMarker)
require.Equal(t, layer.UnversionedObjectVersionID, versionID)
}
func TestDeleteDeletedObject(t *testing.T) {
tc := prepareHandlerContext(t)
t.Run("unversioned bucket", func(t *testing.T) {
bktName, objName := "bucket-unversioned-removal", "object-to-delete"
createBucketAndObject(t, tc, bktName, objName)
versionID, isDeleteMarker := deleteObject(t, tc, bktName, objName, emptyVersion)
require.Empty(t, versionID)
require.False(t, isDeleteMarker)
versionID, isDeleteMarker = deleteObject(t, tc, bktName, objName, emptyVersion)
require.Empty(t, versionID)
require.False(t, isDeleteMarker)
})
t.Run("versioned bucket", func(t *testing.T) {
bktName, objName := "bucket-versioned-for-removal", "object-to-delete"
createVersionedBucketAndObject(t, tc, bktName, objName)
_, isDeleteMarker := deleteObject(t, tc, bktName, objName, emptyVersion)
require.True(t, isDeleteMarker)
_, isDeleteMarker = deleteObject(t, tc, bktName, objName, emptyVersion)
require.True(t, isDeleteMarker)
})
t.Run("versioned bucket not found obj", func(t *testing.T) {
bktName, objName := "bucket-versioned-for-removal", "object-to-delete"
_, objInfo := createVersionedBucketAndObject(t, tc, bktName, objName)
versionID, isDeleteMarker := deleteObject(t, tc, bktName, objName, objInfo.Version())
require.False(t, isDeleteMarker)
require.Equal(t, objInfo.Version(), versionID)
versionID2, isDeleteMarker := deleteObject(t, tc, bktName, objName, versionID)
require.False(t, isDeleteMarker)
require.Equal(t, objInfo.Version(), versionID2)
})
}
func TestDeleteObjectVersioned(t *testing.T) {
tc := prepareHandlerContext(t)
@ -82,7 +137,8 @@ func TestRemoveDeleteMarker(t *testing.T) {
bktInfo, objInfo := createVersionedBucketAndObject(t, tc, bktName, objName)
checkFound(t, tc, bktName, objName, emptyVersion)
deleteMarkerVersion := deleteObject(t, tc, bktName, objName, emptyVersion)
deleteMarkerVersion, isDeleteMarker := deleteObject(t, tc, bktName, objName, emptyVersion)
require.True(t, isDeleteMarker)
checkNotFound(t, tc, bktName, objName, emptyVersion)
checkFound(t, tc, bktName, objName, objInfo.Version())
@ -174,16 +230,18 @@ func TestDeleteObjectCheckMarkerReturn(t *testing.T) {
bktName, objName := "bucket-for-removal", "object-to-delete"
createVersionedBucketAndObject(t, tc, bktName, objName)
deleteMarker := deleteObject(t, tc, bktName, objName, emptyVersion)
deleteMarkerVersion, isDeleteMarker := deleteObject(t, tc, bktName, objName, emptyVersion)
require.True(t, isDeleteMarker)
versions := listVersions(t, tc, bktName)
require.Len(t, versions.DeleteMarker, 1)
require.Equal(t, deleteMarker, versions.DeleteMarker[0].VersionID)
require.Equal(t, deleteMarkerVersion, versions.DeleteMarker[0].VersionID)
deleteMarker2 := deleteObject(t, tc, bktName, objName, deleteMarker)
deleteMarkerVersion2, isDeleteMarker2 := deleteObject(t, tc, bktName, objName, deleteMarkerVersion)
require.True(t, isDeleteMarker2)
versions = listVersions(t, tc, bktName)
require.Len(t, versions.DeleteMarker, 0)
require.Equal(t, deleteMarker, deleteMarker2)
require.Equal(t, deleteMarkerVersion, deleteMarkerVersion2)
}
func createBucketAndObject(t *testing.T, tc *handlerContext, bktName, objName string) (*data.BucketInfo, *data.ObjectInfo) {
@ -217,7 +275,7 @@ func putBucketVersioning(t *testing.T, tc *handlerContext, bktName string, enabl
assertStatus(t, w, http.StatusOK)
}
func deleteObject(t *testing.T, tc *handlerContext, bktName, objName, version string) string {
func deleteObject(t *testing.T, tc *handlerContext, bktName, objName, version string) (string, bool) {
query := make(url.Values)
query.Add(api.QueryVersionID, version)
@ -225,7 +283,7 @@ func deleteObject(t *testing.T, tc *handlerContext, bktName, objName, version st
tc.Handler().DeleteObjectHandler(w, r)
assertStatus(t, w, http.StatusNoContent)
return w.Header().Get(api.AmzVersionID)
return w.Header().Get(api.AmzVersionID), w.Header().Get(api.AmzDeleteMarker) != ""
}
func deleteBucket(t *testing.T, tc *handlerContext, bktName string, code int) {
@ -269,3 +327,18 @@ func listObjectsV1(t *testing.T, tc *handlerContext, bktName string) *ListObject
parseTestResponse(t, w, res)
return res
}
func putObject(t *testing.T, tc *handlerContext, bktName, objName string) {
body := bytes.NewReader([]byte("content"))
w, r := prepareTestPayloadRequest(bktName, objName, body)
tc.Handler().PutObjectHandler(w, r)
assertStatus(t, w, http.StatusOK)
}
func createSuspendedBucket(t *testing.T, tc *handlerContext, bktName string) *data.BucketInfo {
createTestBucket(tc.Context(), t, tc, bktName)
bktInfo, err := tc.Layer().GetBucketInfo(tc.Context(), bktName)
require.NoError(t, err)
putBucketVersioning(t, tc, bktName, false)
return bktInfo
}

View file

@ -467,7 +467,7 @@ func (n *layer) deleteObject(ctx context.Context, bkt *data.BucketInfo, settings
if len(obj.VersionID) != 0 || settings.Unversioned() {
var nodeVersion *data.NodeVersion
if nodeVersion, obj.Error = n.getNodeVersionToDelete(ctx, bkt, obj); obj.Error != nil {
return obj
return dismissNotFoundError(obj)
}
if obj.DeleteMarkVersion, obj.Error = n.removeOldVersion(ctx, bkt, nodeVersion, obj); obj.Error != nil {
@ -486,14 +486,10 @@ func (n *layer) deleteObject(ctx context.Context, bkt *data.BucketInfo, settings
var nodeVersion *data.NodeVersion
if nodeVersion, obj.Error = n.getNodeVersionToDelete(ctx, bkt, obj); obj.Error != nil {
return obj
return dismissNotFoundError(obj)
}
if obj.Error == nil {
if obj.DeleteMarkVersion, obj.Error = n.removeOldVersion(ctx, bkt, nodeVersion, obj); obj.Error != nil {
return obj
}
} else if !errors.IsS3Error(obj.Error, errors.ErrNoSuchKey) {
if obj.DeleteMarkVersion, obj.Error = n.removeOldVersion(ctx, bkt, nodeVersion, obj); obj.Error != nil {
return obj
}
}
@ -528,6 +524,15 @@ func (n *layer) deleteObject(ctx context.Context, bkt *data.BucketInfo, settings
return obj
}
func dismissNotFoundError(obj *VersionedObject) *VersionedObject {
if errors.IsS3Error(obj.Error, errors.ErrNoSuchKey) ||
errors.IsS3Error(obj.Error, errors.ErrNoSuchVersion) {
obj.Error = nil
}
return obj
}
func (n *layer) getNodeVersionToDelete(ctx context.Context, bkt *data.BucketInfo, obj *VersionedObject) (*data.NodeVersion, error) {
objVersion := &ObjectVersion{
BktInfo: bkt,