forked from TrueCloudLab/frostfs-s3-gw
[#357] Refactor delete objects
Signed-off-by: Angira Kekteeva <kira@nspcc.ru>
This commit is contained in:
parent
b7aac223df
commit
021f5d4dd0
4 changed files with 78 additions and 25 deletions
|
@ -7,8 +7,10 @@ import (
|
|||
"strings"
|
||||
|
||||
"github.com/nspcc-dev/neofs-s3-gw/api"
|
||||
"github.com/nspcc-dev/neofs-s3-gw/api/data"
|
||||
"github.com/nspcc-dev/neofs-s3-gw/api/errors"
|
||||
"github.com/nspcc-dev/neofs-s3-gw/api/layer"
|
||||
oid "github.com/nspcc-dev/neofs-sdk-go/object/id"
|
||||
"go.uber.org/zap"
|
||||
"go.uber.org/zap/zapcore"
|
||||
)
|
||||
|
@ -66,7 +68,18 @@ func (h *handler) DeleteObjectHandler(w http.ResponseWriter, r *http.Request) {
|
|||
return
|
||||
}
|
||||
|
||||
deletedObjects, err := h.obj.DeleteObjects(r.Context(), bktInfo, versionedObject)
|
||||
bktSettings, err := h.obj.GetBucketSettings(r.Context(), bktInfo)
|
||||
if err != nil {
|
||||
h.logAndSendError(w, "could not get bucket settings", reqInfo, err)
|
||||
return
|
||||
}
|
||||
|
||||
p := &layer.DeleteObjectParams{
|
||||
BktInfo: bktInfo,
|
||||
BktSettings: bktSettings,
|
||||
Objects: versionedObject,
|
||||
}
|
||||
deletedObjects, err := h.obj.DeleteObjects(r.Context(), p)
|
||||
deletedObject := deletedObjects[0]
|
||||
if err == nil {
|
||||
err = deletedObject.Error
|
||||
|
@ -145,7 +158,18 @@ func (h *handler) DeleteMultipleObjectsHandler(w http.ResponseWriter, r *http.Re
|
|||
return nil
|
||||
})
|
||||
|
||||
deletedObjects, err := h.obj.DeleteObjects(r.Context(), bktInfo, toRemove)
|
||||
bktSettings, err := h.obj.GetBucketSettings(r.Context(), bktInfo)
|
||||
if err != nil {
|
||||
h.logAndSendError(w, "could not get bucket settings", reqInfo, err)
|
||||
return
|
||||
}
|
||||
|
||||
p := &layer.DeleteObjectParams{
|
||||
BktInfo: bktInfo,
|
||||
BktSettings: bktSettings,
|
||||
Objects: toRemove,
|
||||
}
|
||||
deletedObjects, err := h.obj.DeleteObjects(r.Context(), p)
|
||||
if !requested.Quiet && err != nil {
|
||||
h.logAndSendError(w, "couldn't delete objects", reqInfo, err)
|
||||
return
|
||||
|
|
|
@ -436,7 +436,17 @@ func (h *handler) CompleteMultipartUploadHandler(w http.ResponseWriter, r *http.
|
|||
h.log.Error("couldn't send notification: %w", zap.Error(err))
|
||||
}
|
||||
|
||||
if _, err = h.obj.DeleteObjects(r.Context(), bktInfo, []*layer.VersionedObject{{Name: initPart.Name}}); err != nil {
|
||||
bktSettings, err := h.obj.GetBucketSettings(r.Context(), bktInfo)
|
||||
if err != nil {
|
||||
h.logAndSendError(w, "could not get bucket settings", reqInfo, err)
|
||||
}
|
||||
|
||||
p := &layer.DeleteObjectParams{
|
||||
BktInfo: bktInfo,
|
||||
BktSettings: bktSettings,
|
||||
Objects: []*layer.VersionedObject{{Name: initPart.Name}},
|
||||
}
|
||||
if _, err = h.obj.DeleteObjects(r.Context(), p); err != nil {
|
||||
h.logAndSendError(w, "could not delete init file of multipart upload", reqInfo, err, additional...)
|
||||
return
|
||||
}
|
||||
|
@ -447,9 +457,7 @@ func (h *handler) CompleteMultipartUploadHandler(w http.ResponseWriter, r *http.
|
|||
Key: objInfo.Name,
|
||||
}
|
||||
|
||||
if settings, err := h.obj.GetBucketSettings(r.Context(), bktInfo); err != nil {
|
||||
h.log.Warn("couldn't get bucket versioning", zap.String("bucket name", reqInfo.BucketName), zap.Error(err))
|
||||
} else if settings.VersioningEnabled {
|
||||
if bktSettings != nil && bktSettings.VersioningEnabled {
|
||||
w.Header().Set(api.AmzVersionID, objInfo.Version())
|
||||
}
|
||||
|
||||
|
|
|
@ -107,6 +107,12 @@ type (
|
|||
Lock *data.ObjectLock
|
||||
}
|
||||
|
||||
DeleteObjectParams struct {
|
||||
BktInfo *data.BucketInfo
|
||||
BktSettings *data.BucketSettings
|
||||
Objects []*VersionedObject
|
||||
}
|
||||
|
||||
// PutSettingsParams stores object copy request parameters.
|
||||
PutSettingsParams struct {
|
||||
BktInfo *data.BucketInfo
|
||||
|
@ -220,7 +226,7 @@ type (
|
|||
ListObjectsV2(ctx context.Context, p *ListObjectsParamsV2) (*ListObjectsInfoV2, error)
|
||||
ListObjectVersions(ctx context.Context, p *ListObjectVersionsParams) (*ListObjectVersionsInfo, error)
|
||||
|
||||
DeleteObjects(ctx context.Context, bktInfo *data.BucketInfo, objects []*VersionedObject) ([]*VersionedObject, error)
|
||||
DeleteObjects(ctx context.Context, p *DeleteObjectParams) ([]*VersionedObject, error)
|
||||
DeleteSystemObject(ctx context.Context, bktInfo *data.BucketInfo, name string) error
|
||||
DeleteObjectTagging(ctx context.Context, bktInfo *data.BucketInfo, objInfo *data.ObjectInfo) error
|
||||
DeleteBucketTagging(ctx context.Context, bktInfo *data.BucketInfo) error
|
||||
|
@ -536,12 +542,18 @@ func (n *layer) CopyObject(ctx context.Context, p *CopyObjectParams) (*data.Obje
|
|||
}
|
||||
|
||||
// DeleteObject removes all objects with passed nice name.
|
||||
func (n *layer) deleteObject(ctx context.Context, bkt *data.BucketInfo, obj *VersionedObject) *VersionedObject {
|
||||
func (n *layer) deleteObject(ctx context.Context, bkt *data.BucketInfo, settings *data.BucketSettings, obj *VersionedObject) *VersionedObject {
|
||||
var (
|
||||
err error
|
||||
ids []*oid.ID
|
||||
|
||||
versioningEnabled = false
|
||||
)
|
||||
|
||||
if settings != nil {
|
||||
versioningEnabled = settings.VersioningEnabled
|
||||
}
|
||||
|
||||
p := &PutObjectParams{
|
||||
BktInfo: bkt,
|
||||
Object: obj.Name,
|
||||
|
@ -549,8 +561,6 @@ func (n *layer) deleteObject(ctx context.Context, bkt *data.BucketInfo, obj *Ver
|
|||
Header: map[string]string{},
|
||||
}
|
||||
|
||||
versioningEnabled := n.isVersioningEnabled(ctx, bkt)
|
||||
|
||||
// Current implementation doesn't consider "unversioned" mode (so any deletion creates "delete-mark" object).
|
||||
// The reason is difficulties to determinate whether versioning mode is "unversioned" or "suspended".
|
||||
|
||||
|
@ -613,12 +623,12 @@ func (n *layer) deleteObject(ctx context.Context, bkt *data.BucketInfo, obj *Ver
|
|||
}
|
||||
|
||||
// DeleteObjects from the storage.
|
||||
func (n *layer) DeleteObjects(ctx context.Context, bktInfo *data.BucketInfo, objects []*VersionedObject) ([]*VersionedObject, error) {
|
||||
for i, obj := range objects {
|
||||
objects[i] = n.deleteObject(ctx, bktInfo, obj)
|
||||
func (n *layer) DeleteObjects(ctx context.Context, p *DeleteObjectParams) ([]*VersionedObject, error) {
|
||||
for i, obj := range p.Objects {
|
||||
p.Objects[i] = n.deleteObject(ctx, p.BktInfo, p.BktSettings, obj)
|
||||
}
|
||||
|
||||
return objects, nil
|
||||
return p.Objects, nil
|
||||
}
|
||||
|
||||
func (n *layer) CreateBucket(ctx context.Context, p *CreateBucketParams) (*data.BucketInfo, error) {
|
||||
|
|
|
@ -10,6 +10,7 @@ import (
|
|||
"github.com/nspcc-dev/neo-go/pkg/crypto/keys"
|
||||
"github.com/nspcc-dev/neofs-s3-gw/api"
|
||||
"github.com/nspcc-dev/neofs-s3-gw/api/data"
|
||||
"github.com/nspcc-dev/neofs-s3-gw/api/errors"
|
||||
"github.com/nspcc-dev/neofs-s3-gw/api/layer/neofs"
|
||||
"github.com/nspcc-dev/neofs-s3-gw/creds/accessbox"
|
||||
"github.com/nspcc-dev/neofs-s3-gw/internal/neofstest"
|
||||
|
@ -58,10 +59,15 @@ func (tc *testContext) getObject(objectName, versionID string, needError bool) (
|
|||
return objInfo, content.Bytes()
|
||||
}
|
||||
|
||||
func (tc *testContext) deleteObject(objectName, versionID string) {
|
||||
deletedObjects, err := tc.layer.DeleteObjects(tc.ctx, tc.bktInfo, []*VersionedObject{
|
||||
func (tc *testContext) deleteObject(objectName, versionID string, settings *data.BucketSettings) {
|
||||
p := &DeleteObjectParams{
|
||||
BktInfo: tc.bktInfo,
|
||||
BktSettings: settings,
|
||||
Objects: []*VersionedObject{
|
||||
{Name: objectName, VersionID: versionID},
|
||||
})
|
||||
},
|
||||
}
|
||||
deletedObjects, err := tc.layer.DeleteObjects(tc.ctx, p)
|
||||
require.NoError(tc.t, err)
|
||||
for _, obj := range deletedObjects {
|
||||
require.NoError(tc.t, obj.Error)
|
||||
|
@ -223,16 +229,17 @@ func TestSimpleNoVersioning(t *testing.T) {
|
|||
|
||||
func TestVersioningDeleteObject(t *testing.T) {
|
||||
tc := prepareContext(t)
|
||||
settings := &data.BucketSettings{VersioningEnabled: true}
|
||||
err := tc.layer.PutBucketSettings(tc.ctx, &PutSettingsParams{
|
||||
BktInfo: tc.bktInfo,
|
||||
Settings: &data.BucketSettings{VersioningEnabled: true},
|
||||
Settings: settings,
|
||||
})
|
||||
require.NoError(t, err)
|
||||
|
||||
tc.putObject([]byte("content obj1 v1"))
|
||||
tc.putObject([]byte("content obj1 v2"))
|
||||
|
||||
tc.deleteObject(tc.obj, "")
|
||||
tc.deleteObject(tc.obj, "", settings)
|
||||
tc.getObject(tc.obj, "", true)
|
||||
|
||||
tc.checkListObjects()
|
||||
|
@ -240,9 +247,10 @@ func TestVersioningDeleteObject(t *testing.T) {
|
|||
|
||||
func TestVersioningDeleteSpecificObjectVersion(t *testing.T) {
|
||||
tc := prepareContext(t)
|
||||
settings := &data.BucketSettings{VersioningEnabled: true}
|
||||
err := tc.layer.PutBucketSettings(tc.ctx, &PutSettingsParams{
|
||||
BktInfo: tc.bktInfo,
|
||||
Settings: &data.BucketSettings{VersioningEnabled: true},
|
||||
Settings: settings,
|
||||
})
|
||||
require.NoError(t, err)
|
||||
|
||||
|
@ -251,18 +259,18 @@ func TestVersioningDeleteSpecificObjectVersion(t *testing.T) {
|
|||
objV3Content := []byte("content obj1 v3")
|
||||
objV3Info := tc.putObject(objV3Content)
|
||||
|
||||
tc.deleteObject(tc.obj, objV2Info.Version())
|
||||
tc.deleteObject(tc.obj, objV2Info.Version(), settings)
|
||||
tc.getObject(tc.obj, objV2Info.Version(), true)
|
||||
|
||||
_, buffer3 := tc.getObject(tc.obj, "", false)
|
||||
require.Equal(t, objV3Content, buffer3)
|
||||
|
||||
tc.deleteObject(tc.obj, "")
|
||||
tc.deleteObject(tc.obj, "", settings)
|
||||
tc.getObject(tc.obj, "", true)
|
||||
|
||||
for _, ver := range tc.listVersions().DeleteMarker {
|
||||
if ver.IsLatest {
|
||||
tc.deleteObject(tc.obj, ver.Object.Version())
|
||||
tc.deleteObject(tc.obj, ver.Object.Version(), settings)
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -351,7 +359,10 @@ func TestNoVersioningDeleteObject(t *testing.T) {
|
|||
tc.putObject([]byte("content obj1 v1"))
|
||||
tc.putObject([]byte("content obj1 v2"))
|
||||
|
||||
tc.deleteObject(tc.obj, "")
|
||||
versioning, err := tc.layer.GetBucketSettings(tc.ctx, tc.bktInfo)
|
||||
require.Error(t, err, errors.GetAPIError(errors.ErrNoSuchKey))
|
||||
|
||||
tc.deleteObject(tc.obj, "", versioning)
|
||||
tc.getObject(tc.obj, "", true)
|
||||
tc.checkListObjects()
|
||||
}
|
||||
|
|
Loading…
Reference in a new issue