[#532] Fix object removal

Signed-off-by: Denis Kirillov <denis@nspcc.ru>
This commit is contained in:
Denis Kirillov 2022-06-15 15:17:29 +03:00 committed by Alex Vanin
parent 12a2060dd0
commit 6ad7c988e6
7 changed files with 207 additions and 63 deletions

View file

@ -79,7 +79,6 @@ func (h *handler) DeleteObjectHandler(w http.ResponseWriter, r *http.Request) {
p := &layer.DeleteObjectParams{
BktInfo: bktInfo,
BktSettings: bktSettings,
Objects: versionedObject,
}
deletedObjects, err := h.obj.DeleteObjects(r.Context(), p)
@ -206,15 +205,8 @@ func (h *handler) DeleteMultipleObjectsHandler(w http.ResponseWriter, r *http.Re
return nil
})
bktSettings, err := h.obj.GetBucketSettings(r.Context(), bktInfo)
if err != nil {
h.logAndSendError(w, "could not get bucket settings", reqInfo, err)
return
}
p := &layer.DeleteObjectParams{
BktInfo: bktInfo,
BktSettings: bktSettings,
Objects: toRemove,
}
deletedObjects, err := h.obj.DeleteObjects(r.Context(), p)

142
api/handler/delete_test.go Normal file
View file

@ -0,0 +1,142 @@
package handler
import (
"context"
"io"
"net/http"
"net/url"
"testing"
"github.com/nspcc-dev/neofs-s3-gw/api"
"github.com/nspcc-dev/neofs-s3-gw/api/layer"
"github.com/stretchr/testify/require"
)
func TestDeleteObject(t *testing.T) {
ctx := context.Background()
tc := prepareHandlerContext(t)
bktName := "bucket-for-removal"
createTestBucket(ctx, t, tc, bktName)
bktInfo, err := tc.Layer().GetBucketInfo(ctx, bktName)
require.NoError(t, err)
objName := "object"
objInfo := createTestObject(ctx, t, tc, bktInfo, objName)
w, r := prepareTestRequest(t, bktName, objName, nil)
tc.Handler().HeadObjectHandler(w, r)
assertStatus(t, w, http.StatusOK)
w, r = prepareTestRequest(t, bktName, objName, nil)
tc.Handler().DeleteObjectHandler(w, r)
assertStatus(t, w, http.StatusNoContent)
w, r = prepareTestRequest(t, bktName, objName, nil)
tc.Handler().HeadObjectHandler(w, r)
assertStatus(t, w, http.StatusNotFound)
p := &layer.GetObjectParams{
BucketInfo: bktInfo,
ObjectInfo: objInfo,
Writer: io.Discard,
}
err = tc.Layer().GetObject(ctx, p)
require.Error(t, err)
}
func TestDeleteObjectVersioned(t *testing.T) {
ctx := context.Background()
tc := prepareHandlerContext(t)
bktName := "bucket-for-removal"
createTestBucket(ctx, t, tc, bktName)
bktInfo, err := tc.Layer().GetBucketInfo(ctx, bktName)
require.NoError(t, err)
cfg := &VersioningConfiguration{Status: "Enabled"}
w, r := prepareTestRequest(t, bktName, "", cfg)
tc.Handler().PutBucketVersioningHandler(w, r)
assertStatus(t, w, http.StatusOK)
objName := "object"
objInfo := createTestObject(ctx, t, tc, bktInfo, objName)
w, r = prepareTestRequest(t, bktName, objName, nil)
tc.Handler().HeadObjectHandler(w, r)
assertStatus(t, w, http.StatusOK)
w, r = prepareTestRequest(t, bktName, objName, nil)
tc.Handler().DeleteObjectHandler(w, r)
assertStatus(t, w, http.StatusNoContent)
w, r = prepareTestRequest(t, bktName, objName, nil)
tc.Handler().HeadObjectHandler(w, r)
assertStatus(t, w, http.StatusNotFound)
query := make(url.Values)
query.Add(api.QueryVersionID, objInfo.Version())
w, r = prepareTestFullRequest(t, bktName, objName, query, nil)
tc.Handler().HeadObjectHandler(w, r)
assertStatus(t, w, http.StatusOK)
w, r = prepareTestFullRequest(t, bktName, objName, query, nil)
r.URL.RawQuery = query.Encode()
tc.Handler().DeleteObjectHandler(w, r)
assertStatus(t, w, http.StatusNoContent)
p := &layer.GetObjectParams{
BucketInfo: bktInfo,
ObjectInfo: objInfo,
Writer: io.Discard,
}
err = tc.Layer().GetObject(ctx, p)
require.Error(t, err)
}
func TestDeleteObjectCombined(t *testing.T) {
ctx := context.Background()
tc := prepareHandlerContext(t)
bktName := "bucket-for-removal"
createTestBucket(ctx, t, tc, bktName)
bktInfo, err := tc.Layer().GetBucketInfo(ctx, bktName)
require.NoError(t, err)
objName := "object"
objInfo := createTestObject(ctx, t, tc, bktInfo, objName)
w, r := prepareTestRequest(t, bktName, objName, nil)
tc.Handler().HeadObjectHandler(w, r)
assertStatus(t, w, http.StatusOK)
cfg := &VersioningConfiguration{Status: "Enabled"}
w, r = prepareTestRequest(t, bktName, objName, cfg)
tc.Handler().PutBucketVersioningHandler(w, r)
assertStatus(t, w, http.StatusOK)
w, r = prepareTestRequest(t, bktName, objName, nil)
tc.Handler().DeleteObjectHandler(w, r)
assertStatus(t, w, http.StatusNoContent)
w, r = prepareTestRequest(t, bktName, objName, nil)
tc.Handler().HeadObjectHandler(w, r)
assertStatus(t, w, http.StatusNotFound)
query := make(url.Values)
query.Add(api.QueryVersionID, objInfo.Version())
w, r = prepareTestFullRequest(t, bktName, objName, query, nil)
tc.Handler().HeadObjectHandler(w, r)
assertStatus(t, w, http.StatusNotFound) // because we remove null version
p := &layer.GetObjectParams{
BucketInfo: bktInfo,
ObjectInfo: objInfo,
Writer: io.Discard,
}
err = tc.Layer().GetObject(ctx, p)
require.Error(t, err)
}

View file

@ -8,6 +8,7 @@ import (
"math/rand"
"net/http"
"net/http/httptest"
"net/url"
"strconv"
"testing"
"time"
@ -46,7 +47,7 @@ func prepareHandlerContext(t *testing.T) *handlerContext {
key, err := keys.NewPrivateKey()
require.NoError(t, err)
l := zap.NewNop()
l := zap.NewExample()
tp := layer.NewTestNeoFS()
testResolver := &resolver.BucketResolver{Name: "test_resolver"}
@ -112,7 +113,7 @@ func createTestBucketWithLock(ctx context.Context, t *testing.T, h *handlerConte
return bktInfo
}
func createTestObject(ctx context.Context, t *testing.T, h *handlerContext, bktInfo *data.BucketInfo, objName string) {
func createTestObject(ctx context.Context, t *testing.T, h *handlerContext, bktInfo *data.BucketInfo, objName string) *data.ObjectInfo {
content := make([]byte, 1024)
_, err := rand.Read(content)
require.NoError(t, err)
@ -121,7 +122,7 @@ func createTestObject(ctx context.Context, t *testing.T, h *handlerContext, bktI
object.AttributeTimestamp: strconv.FormatInt(time.Now().UTC().Unix(), 10),
}
_, err = h.Layer().PutObject(ctx, &layer.PutObjectParams{
objInfo, err := h.Layer().PutObject(ctx, &layer.PutObjectParams{
BktInfo: bktInfo,
Object: objName,
Size: int64(len(content)),
@ -129,14 +130,21 @@ func createTestObject(ctx context.Context, t *testing.T, h *handlerContext, bktI
Header: header,
})
require.NoError(t, err)
return objInfo
}
func prepareTestRequest(t *testing.T, bktName, objName string, body interface{}) (*httptest.ResponseRecorder, *http.Request) {
return prepareTestFullRequest(t, bktName, objName, make(url.Values), body)
}
func prepareTestFullRequest(t *testing.T, bktName, objName string, query url.Values, body interface{}) (*httptest.ResponseRecorder, *http.Request) {
rawBody, err := xml.Marshal(body)
require.NoError(t, err)
w := httptest.NewRecorder()
r := httptest.NewRequest(http.MethodPut, defaultURL, bytes.NewReader(rawBody))
r.URL.RawQuery = query.Encode()
reqInfo := api.NewReqInfo(w, r, api.ObjectRequest{Bucket: bktName, Object: objName})
r = r.WithContext(api.SetReqInfo(r.Context(), reqInfo))

View file

@ -594,6 +594,6 @@ func assertStatus(t *testing.T, w *httptest.ResponseRecorder, status int) {
if w.Code != status {
resp, err := io.ReadAll(w.Result().Body)
require.NoError(t, err)
require.Fail(t, string(resp))
require.Failf(t, "unexpected status", "expected: %d, actual: %d, resp: '%s'", status, w.Code, string(resp))
}
}

View file

@ -95,6 +95,7 @@ type (
BktInfo *data.BucketInfo
ObjectName string
VersionID string
NoErrorOnDeleteMarker bool
}
// RangeParams stores range header request parameters.
@ -115,7 +116,6 @@ type (
DeleteObjectParams struct {
BktInfo *data.BucketInfo
BktSettings *data.BucketSettings
Objects []*VersionedObject
}
@ -474,15 +474,35 @@ func getRandomOID() (*oid.ID, error) {
}
// DeleteObject removes all objects with the passed nice name.
func (n *layer) deleteObject(ctx context.Context, bkt *data.BucketInfo, settings *data.BucketSettings, obj *VersionedObject) *VersionedObject {
if len(obj.VersionID) == 0 || obj.VersionID == UnversionedObjectVersionID {
func (n *layer) deleteObject(ctx context.Context, bkt *data.BucketInfo, obj *VersionedObject) *VersionedObject {
if len(obj.VersionID) == 0 {
obj.VersionID = UnversionedObjectVersionID
}
objVersion := &ObjectVersion{
BktInfo: bkt,
ObjectName: obj.Name,
VersionID: obj.VersionID,
NoErrorOnDeleteMarker: true,
}
var nodeVersion *data.NodeVersion
nodeVersion, obj.Error = n.getNodeVersion(ctx, objVersion)
if obj.VersionID == UnversionedObjectVersionID {
if obj.Error == nil {
if obj.DeleteMarkVersion, obj.Error = n.removeOldVersion(ctx, bkt, nodeVersion, obj); obj.Error != nil {
return obj
}
} else if !errors.IsS3Error(obj.Error, errors.ErrNoSuchKey) {
return obj
}
randOID, err := getRandomOID()
if err != nil {
obj.Error = fmt.Errorf("couldn't get random oid: %w", err)
return obj
}
obj.DeleteMarkVersion = UnversionedObjectVersionID
newVersion := &data.NodeVersion{
BaseNodeVersion: data.BaseNodeVersion{
OID: *randOID,
@ -495,24 +515,21 @@ func (n *layer) deleteObject(ctx context.Context, bkt *data.BucketInfo, settings
IsUnversioned: true,
}
if len(obj.VersionID) == 0 && settings.VersioningEnabled {
newVersion.IsUnversioned = false
obj.DeleteMarkVersion = randOID.EncodeToString()
}
if obj.Error = n.treeService.AddVersion(ctx, &bkt.CID, newVersion); obj.Error != nil {
return obj
}
n.namesCache.Delete(bkt.Name + "/" + obj.Name)
} else {
versions, err := n.treeService.GetVersions(ctx, &bkt.CID, obj.Name)
if err != nil {
obj.Error = err
if obj.Error != nil {
return obj
}
if obj.DeleteMarkVersion, obj.Error = n.removeVersionIfFound(ctx, bkt, versions, obj); obj.Error != nil {
if obj.DeleteMarkVersion, obj.Error = n.removeOldVersion(ctx, bkt, nodeVersion, obj); obj.Error != nil {
return obj
}
if obj.Error = n.treeService.RemoveVersion(ctx, &bkt.CID, nodeVersion.ID); obj.Error != nil {
return obj
}
}
@ -522,29 +539,18 @@ func (n *layer) deleteObject(ctx context.Context, bkt *data.BucketInfo, settings
return obj
}
func (n *layer) removeVersionIfFound(ctx context.Context, bkt *data.BucketInfo, versions []*data.NodeVersion, obj *VersionedObject) (string, error) {
for _, version := range versions {
if version.OID.EncodeToString() != obj.VersionID {
continue
func (n *layer) removeOldVersion(ctx context.Context, bkt *data.BucketInfo, nodeVersion *data.NodeVersion, obj *VersionedObject) (string, error) {
if nodeVersion.DeleteMarker != nil {
return obj.VersionID, nil
}
var deleteMarkVersion string
if version.DeleteMarker != nil {
deleteMarkVersion = obj.VersionID
} else if err := n.objectDelete(ctx, bkt, version.OID); err != nil {
return deleteMarkVersion, err
}
return deleteMarkVersion, n.treeService.RemoveVersion(ctx, &bkt.CID, version.ID)
}
return "", errors.GetAPIError(errors.ErrNoSuchVersion)
return "", n.objectDelete(ctx, bkt, nodeVersion.OID)
}
// DeleteObjects from the storage.
func (n *layer) DeleteObjects(ctx context.Context, p *DeleteObjectParams) ([]*VersionedObject, error) {
for i, obj := range p.Objects {
p.Objects[i] = n.deleteObject(ctx, p.BktInfo, p.BktSettings, obj)
p.Objects[i] = n.deleteObject(ctx, p.BktInfo, obj)
}
return p.Objects, nil

View file

@ -150,11 +150,11 @@ func (n *layer) getNodeVersion(ctx context.Context, objVersion *ObjectVersion) (
}
}
if version == nil {
err = errors.GetAPIError(errors.ErrNoSuchKey)
err = errors.GetAPIError(errors.ErrNoSuchVersion)
}
}
if err == nil && version.DeleteMarker != nil || errorsStd.Is(err, ErrNodeNotFound) {
if err == nil && version.DeleteMarker != nil && !objVersion.NoErrorOnDeleteMarker || errorsStd.Is(err, ErrNodeNotFound) {
return nil, errors.GetAPIError(errors.ErrNoSuchKey)
}

View file

@ -53,10 +53,9 @@ func (tc *testContext) getObject(objectName, versionID string, needError bool) (
return objInfo, content.Bytes()
}
func (tc *testContext) deleteObject(objectName, versionID string, settings *data.BucketSettings) {
func (tc *testContext) deleteObject(objectName, versionID string) {
p := &DeleteObjectParams{
BktInfo: tc.bktInfo,
BktSettings: settings,
Objects: []*VersionedObject{
{Name: objectName, VersionID: versionID},
},
@ -231,7 +230,7 @@ func TestVersioningDeleteObject(t *testing.T) {
tc.putObject([]byte("content obj1 v1"))
tc.putObject([]byte("content obj1 v2"))
tc.deleteObject(tc.obj, "", settings)
tc.deleteObject(tc.obj, "")
tc.getObject(tc.obj, "", true)
tc.checkListObjects()
@ -269,19 +268,19 @@ func TestVersioningDeleteSpecificObjectVersion(t *testing.T) {
objV3Content := []byte("content obj1 v3")
objV3Info := tc.putObject(objV3Content)
tc.deleteObject(tc.obj, objV2Info.Version(), settings)
tc.deleteObject(tc.obj, objV2Info.Version())
tc.getObject(tc.obj, objV2Info.Version(), true)
_, buffer3 := tc.getObject(tc.obj, "", false)
require.Equal(t, objV3Content, buffer3)
tc.deleteObject(tc.obj, "", settings)
tc.deleteObject(tc.obj, "")
tc.getObject(tc.obj, "", true)
versions := tc.listVersions()
for _, ver := range versions.DeleteMarker {
if ver.IsLatest {
tc.deleteObject(tc.obj, ver.Object.Version(), settings)
tc.deleteObject(tc.obj, ver.Object.Version())
}
}
@ -296,10 +295,7 @@ func TestNoVersioningDeleteObject(t *testing.T) {
tc.putObject([]byte("content obj1 v1"))
tc.putObject([]byte("content obj1 v2"))
versioning, err := tc.layer.GetBucketSettings(tc.ctx, tc.bktInfo)
require.NoError(t, err)
tc.deleteObject(tc.obj, "", versioning)
tc.deleteObject(tc.obj, "")
tc.getObject(tc.obj, "", true)
tc.checkListObjects()
}