[#248] Support delete marker

Signed-off-by: Denis Kirillov <denis@nspcc.ru>
This commit is contained in:
Denis Kirillov 2021-09-07 09:17:12 +03:00 committed by Alex Vanin
parent 458f9cf17b
commit 3c2e25f977
6 changed files with 107 additions and 67 deletions

View file

@ -3,6 +3,7 @@ package handler
import (
"encoding/xml"
"net/http"
"strconv"
"github.com/nspcc-dev/neofs-s3-gw/api"
"github.com/nspcc-dev/neofs-s3-gw/api/errors"
@ -25,6 +26,13 @@ type ObjectIdentifier struct {
VersionID string `xml:"VersionId,omitempty"`
}
// DeletedObject carries key name for the object to delete.
type DeletedObject struct {
ObjectIdentifier
DeleteMarker bool `xml:"DeleteMarker,omitempty"`
DeleteMarkerVersionID string `xml:"DeleteMarkerVersionId,omitempty"`
}
// DeleteError structure.
type DeleteError struct {
Code string
@ -38,7 +46,7 @@ type DeleteObjectsResponse struct {
XMLName xml.Name `xml:"http://s3.amazonaws.com/doc/2006-03-01/ DeleteResult" json:"-"`
// Collection of all deleted objects
DeletedObjects []ObjectIdentifier `xml:"Deleted,omitempty"`
DeletedObjects []DeletedObject `xml:"Deleted,omitempty"`
// Collection of errors deleting certain objects.
Errors []DeleteError `xml:"Error,omitempty"`
@ -56,20 +64,22 @@ func (h *handler) DeleteObjectHandler(w http.ResponseWriter, r *http.Request) {
return
}
if errs := h.obj.DeleteObjects(r.Context(), reqInfo.BucketName, versionedObject); len(errs) != 0 && errs[0] != nil {
deletedObjects, err := h.obj.DeleteObjects(r.Context(), reqInfo.BucketName, versionedObject)
deletedObject := deletedObjects[0]
if err == nil {
err = deletedObject.Error
}
if err != nil {
h.log.Error("could not delete object",
zap.String("request_id", reqInfo.RequestID),
zap.String("bucket_name", reqInfo.BucketName),
zap.String("object_name", reqInfo.ObjectName),
zap.Error(errs[0]))
zap.Error(err))
}
// Ignore delete errors:
// api.WriteErrorResponse(r.Context(), w, api.Error{
// Code: api.GetAPIError(api.ErrInternalError).Code,
// Description: err.Error(),
// HTTPStatusCode: http.StatusInternalServerError,
// }, r.URL)
if deletedObject.DeleteMarkVersion != "" {
w.Header().Set(api.AmzDeleteMarker, strconv.FormatBool(true))
w.Header().Set(api.AmzVersionID, deletedObject.DeleteMarkVersion)
}
w.WriteHeader(http.StatusNoContent)
@ -113,7 +123,7 @@ func (h *handler) DeleteMultipleObjectsHandler(w http.ResponseWriter, r *http.Re
response := &DeleteObjectsResponse{
Errors: make([]DeleteError, 0, len(toRemove)),
DeletedObjects: make([]ObjectIdentifier, 0, len(toRemove)),
DeletedObjects: make([]DeletedObject, 0, len(toRemove)),
}
if err := h.checkBucketOwner(r, reqInfo.BucketName); err != nil {
@ -128,34 +138,47 @@ func (h *handler) DeleteMultipleObjectsHandler(w http.ResponseWriter, r *http.Re
return nil
})
if errs := h.obj.DeleteObjects(r.Context(), reqInfo.BucketName, toRemove); errs != nil && !requested.Quiet {
deletedObjects, err := h.obj.DeleteObjects(r.Context(), reqInfo.BucketName, toRemove)
if !requested.Quiet && err != nil {
h.logAndSendError(w, "couldn't delete objects", reqInfo, err)
return
}
var errs []error
for _, obj := range deletedObjects {
if obj.Error == nil {
deletedObj := DeletedObject{
ObjectIdentifier: ObjectIdentifier{
ObjectName: obj.Name,
VersionID: obj.VersionID,
},
DeleteMarkerVersionID: obj.DeleteMarkVersion,
}
if deletedObj.DeleteMarkerVersionID != "" {
deletedObj.DeleteMarker = true
}
response.DeletedObjects = append(response.DeletedObjects, deletedObj)
} else if !requested.Quiet {
code := "BadRequest"
if s3err, ok := obj.Error.(errors.Error); ok {
code = s3err.Code
}
response.Errors = append(response.Errors, DeleteError{
Code: code,
Message: obj.Error.Error(),
Key: obj.Name,
VersionID: obj.VersionID,
})
errs = append(errs, obj.Error)
}
}
if !requested.Quiet && len(errs) != 0 {
additional := []zap.Field{
zap.Array("objects", marshaler),
zap.Errors("errors", errs),
}
h.logAndSendError(w, "could not delete objects", reqInfo, nil, additional...)
for _, e := range errs {
if err, ok := e.(*errors.ObjectError); ok {
code := "BadRequest"
if s3err, ok := err.Err.(errors.Error); ok {
code = s3err.Code
}
response.Errors = append(response.Errors, DeleteError{
Code: code,
Message: err.Error(),
Key: err.Object,
VersionID: err.Version,
})
delete(removed, err.ObjectVersion())
}
}
}
for _, val := range removed {
response.DeletedObjects = append(response.DeletedObjects, ObjectIdentifier{ObjectName: val.Name, VersionID: val.VersionID})
}
if err := api.EncodeToResponse(w, response); err != nil {

View file

@ -7,6 +7,7 @@ const (
AmzVersionID = "X-Amz-Version-Id"
AmzTaggingCount = "X-Amz-Tagging-Count"
AmzTagging = "X-Amz-Tagging"
AmzDeleteMarker = "X-Amz-Delete-Marker"
LastModified = "Last-Modified"
Date = "Date"

View file

@ -131,10 +131,12 @@ type (
Encode string
}
// VersionedObject stores object name and version.
// VersionedObject stores info about objects to delete.
VersionedObject struct {
Name string
VersionID string
DeleteMarkVersion string
Error error
}
// PutTaggingParams stores tag set params.
@ -177,7 +179,7 @@ type (
ListObjectsV2(ctx context.Context, p *ListObjectsParamsV2) (*ListObjectsInfoV2, error)
ListObjectVersions(ctx context.Context, p *ListObjectVersionsParams) (*ListObjectVersionsInfo, error)
DeleteObjects(ctx context.Context, bucket string, objects []*VersionedObject) []error
DeleteObjects(ctx context.Context, bucket string, objects []*VersionedObject) ([]*VersionedObject, error)
DeleteObjectTagging(ctx context.Context, p *api.ObjectInfo) error
DeleteBucketTagging(ctx context.Context, bucket string) error
}
@ -586,7 +588,7 @@ func (n *layer) CopyObject(ctx context.Context, p *CopyObjectParams) (*api.Objec
}
// DeleteObject removes all objects with passed nice name.
func (n *layer) deleteObject(ctx context.Context, bkt *api.BucketInfo, obj *VersionedObject) error {
func (n *layer) deleteObject(ctx context.Context, bkt *api.BucketInfo, obj *VersionedObject) *VersionedObject {
var (
err error
ids []*object.ID
@ -594,7 +596,8 @@ func (n *layer) deleteObject(ctx context.Context, bkt *api.BucketInfo, obj *Vers
versioningEnabled := n.isVersioningEnabled(ctx, bkt)
if !versioningEnabled && obj.VersionID != unversionedObjectVersionID && obj.VersionID != "" {
return errors.GetAPIError(errors.ErrInvalidVersion)
obj.Error = errors.GetAPIError(errors.ErrInvalidVersion)
return obj
}
if versioningEnabled {
@ -604,55 +607,63 @@ func (n *layer) deleteObject(ctx context.Context, bkt *api.BucketInfo, obj *Vers
Header: map[string]string{versionsDeleteMarkAttr: obj.VersionID},
}
if len(obj.VersionID) != 0 {
id, err := n.checkVersionsExist(ctx, bkt, obj)
version, err := n.checkVersionsExist(ctx, bkt, obj)
if err != nil {
return err
obj.Error = err
return obj
}
ids = []*object.ID{version.ID}
if version.Headers[versionsDeleteMarkAttr] == delMarkFullObject {
obj.DeleteMarkVersion = version.Version()
}
ids = []*object.ID{id}
p.Header[versionsDelAttr] = obj.VersionID
} else {
p.Header[versionsDeleteMarkAttr] = delMarkFullObject
}
if _, err = n.objectPut(ctx, bkt, p); err != nil {
return err
objInfo, err := n.objectPut(ctx, bkt, p)
if err != nil {
obj.Error = err
return obj
}
if len(obj.VersionID) == 0 {
obj.DeleteMarkVersion = objInfo.Version()
}
} else {
ids, err = n.objectSearch(ctx, &findParams{cid: bkt.CID, val: obj.Name})
if err != nil {
return err
obj.Error = err
return obj
}
}
for _, id := range ids {
if err = n.objectDelete(ctx, bkt.CID, id); err != nil {
return err
obj.Error = err
return obj
}
if err = n.DeleteObjectTagging(ctx, &api.ObjectInfo{ID: id, Bucket: bkt.Name, Name: obj.Name}); err != nil {
return err
obj.Error = err
return obj
}
}
n.listsCache.CleanCacheEntriesContainingObject(obj.Name, bkt.CID)
return nil
return obj
}
// DeleteObjects from the storage.
func (n *layer) DeleteObjects(ctx context.Context, bucket string, objects []*VersionedObject) []error {
var errs = make([]error, 0, len(objects))
func (n *layer) DeleteObjects(ctx context.Context, bucket string, objects []*VersionedObject) ([]*VersionedObject, error) {
bkt, err := n.GetBucketInfo(ctx, bucket)
if err != nil {
return append(errs, err)
return nil, err
}
for _, obj := range objects {
if err := n.deleteObject(ctx, bkt, obj); err != nil {
errs = append(errs, &errors.ObjectError{Err: err, Object: obj.Name, Version: obj.VersionID})
}
for i, obj := range objects {
objects[i] = n.deleteObject(ctx, bkt, obj)
}
return errs
return objects, nil
}
func (n *layer) CreateBucket(ctx context.Context, p *CreateBucketParams) (*cid.ID, error) {

View file

@ -225,7 +225,10 @@ func (v *objectVersions) getDelHeader() string {
func (v *objectVersions) getVersion(oid *object.ID) *api.ObjectInfo {
for _, version := range v.objects {
if version.ID == oid {
if version.Version() == oid.String() {
if contains(v.delList, oid.String()) {
return nil
}
return version
}
}
@ -365,7 +368,7 @@ func objectInfoToBucketSettings(info *api.ObjectInfo) *BucketSettings {
return res
}
func (n *layer) checkVersionsExist(ctx context.Context, bkt *api.BucketInfo, obj *VersionedObject) (*object.ID, error) {
func (n *layer) checkVersionsExist(ctx context.Context, bkt *api.BucketInfo, obj *VersionedObject) (*api.ObjectInfo, error) {
id := object.NewID()
if err := id.Parse(obj.VersionID); err != nil {
return nil, errors.GetAPIError(errors.ErrInvalidVersion)
@ -375,9 +378,10 @@ func (n *layer) checkVersionsExist(ctx context.Context, bkt *api.BucketInfo, obj
if err != nil {
return nil, err
}
if !contains(versions.existedVersions(), obj.VersionID) {
version := versions.getVersion(id)
if version == nil {
return nil, errors.GetAPIError(errors.ErrInvalidVersion)
}
return id, nil
return version, nil
}

View file

@ -245,11 +245,12 @@ func (tc *testContext) getObject(objectName, versionID string, needError bool) (
}
func (tc *testContext) deleteObject(objectName, versionID string) {
errs := tc.layer.DeleteObjects(tc.ctx, tc.bkt, []*VersionedObject{
deletedObjects, err := tc.layer.DeleteObjects(tc.ctx, tc.bkt, []*VersionedObject{
{Name: objectName, VersionID: versionID},
})
for _, err := range errs {
require.NoError(tc.t, err)
for _, obj := range deletedObjects {
require.NoError(tc.t, obj.Error)
}
}

View file

@ -383,7 +383,7 @@ Compatibility: 11/24/26
|----|---------------------------------------------------------------------------------------------|-------|--------|
| 1 | s3tests_boto3.functional.test_s3.test_versioning_bucket_create_suspend | ok | ok |
| 2 | s3tests_boto3.functional.test_s3.test_versioning_obj_create_read_remove | ok | ok |
| 3 | s3tests_boto3.functional.test_s3.test_versioning_obj_create_read_remove_head | ERROR | ok |
| 3 | s3tests_boto3.functional.test_s3.test_versioning_obj_create_read_remove_head | ok | ok |
| 4 | s3tests_boto3.functional.test_s3.test_versioning_obj_plain_null_version_removal | ERROR | ok |
| 5 | s3tests_boto3.functional.test_s3.test_versioning_obj_plain_null_version_overwrite | ERROR | ok |
| 6 | s3tests_boto3.functional.test_s3.test_versioning_obj_plain_null_version_overwrite_suspended | ERROR | ok |
@ -395,7 +395,7 @@ Compatibility: 11/24/26
| 12 | s3tests_boto3.functional.test_s3.test_versioning_copy_obj_version | ok | ok |
| 13 | s3tests_boto3.functional.test_s3.test_versioning_multi_object_delete | ok | ok |
| 14 | s3tests_boto3.functional.test_s3.test_versioning_multi_object_delete_with_marker | ok | ok |
| 15 | s3tests_boto3.functional.test_s3.test_versioning_multi_object_delete_with_marker_create | ERROR | ok |
| 15 | s3tests_boto3.functional.test_s3.test_versioning_multi_object_delete_with_marker_create | ok | ok |
| 16 | s3tests_boto3.functional.test_s3.test_versioned_object_acl | ERROR | FAIL |
| 17 | s3tests_boto3.functional.test_s3.test_versioned_object_acl_no_version_specified | ERROR | FAIL |
| 18 | s3tests_boto3.functional.test_s3.test_versioned_concurrent_object_create_concurrent_remove | ERROR | ok |