From d5aef7566f5d3b15ebb32f05d2ac9f320b021265 Mon Sep 17 00:00:00 2001 From: Denis Kirillov Date: Tue, 10 Aug 2021 15:08:15 +0300 Subject: [PATCH] [#122] Add delete versioned object Signed-off-by: Denis Kirillov --- api/errors/errors.go | 7 ++ api/handler/delete.go | 31 +++++++-- api/handler/get.go | 2 +- api/headers.go | 2 +- api/layer/layer.go | 154 ++++++++++++++++++++++++++++++++--------- api/layer/object.go | 78 +++++++++------------ api/layer/util_test.go | 1 + 7 files changed, 189 insertions(+), 86 deletions(-) diff --git a/api/errors/errors.go b/api/errors/errors.go index 24424a6b..d1dd1126 100644 --- a/api/errors/errors.go +++ b/api/errors/errors.go @@ -66,6 +66,7 @@ const ( ErrNoSuchKey ErrNoSuchUpload ErrNoSuchVersion + ErrInvalidVersion ErrNotImplemented ErrPreconditionFailed ErrNotModified @@ -529,6 +530,12 @@ var errorCodes = errorCodeMap{ Description: "Indicates that the version ID specified in the request does not match an existing version.", HTTPStatusCode: http.StatusNotFound, }, + ErrInvalidVersion: { + ErrCode: ErrInvalidVersion, + Code: "InvalidArgument", + Description: "Invalid version id specified", + HTTPStatusCode: http.StatusBadRequest, + }, ErrNotImplemented: { ErrCode: ErrNotImplemented, Code: "NotImplemented", diff --git a/api/handler/delete.go b/api/handler/delete.go index 742a7bbb..a8b56bc6 100644 --- a/api/handler/delete.go +++ b/api/handler/delete.go @@ -8,6 +8,7 @@ import ( "github.com/nspcc-dev/neofs-s3-gw/api/errors" "github.com/nspcc-dev/neofs-s3-gw/api/layer" "go.uber.org/zap" + "go.uber.org/zap/zapcore" ) // DeleteObjectsRequest - xml carrying the object key names which needs to be deleted. @@ -21,6 +22,7 @@ type DeleteObjectsRequest struct { // ObjectIdentifier carries key name for the object to delete. type ObjectIdentifier struct { ObjectName string `xml:"Key"` + VersionID string `xml:"VersionId,omitempty"` } // DeleteError structure. @@ -43,18 +45,22 @@ type DeleteObjectsResponse struct { func (h *handler) DeleteObjectHandler(w http.ResponseWriter, r *http.Request) { reqInfo := api.GetReqInfo(r.Context()) + versionedObject := []*layer.VersionedObject{{ + Name: reqInfo.ObjectName, + VersionID: reqInfo.URL.Query().Get("versionId"), + }} if err := h.checkBucketOwner(r, reqInfo.BucketName); err != nil { h.logAndSendError(w, "expected owner doesn't match", reqInfo, err) return } - if err := h.obj.DeleteObject(r.Context(), reqInfo.BucketName, reqInfo.ObjectName); err != nil { + if errs := h.obj.DeleteObjects(r.Context(), reqInfo.BucketName, versionedObject); len(errs) != 0 && errs[0] != nil { h.log.Error("could not delete object", zap.String("request_id", reqInfo.RequestID), zap.String("bucket_name", reqInfo.BucketName), zap.String("object_name", reqInfo.ObjectName), - zap.Error(err)) + zap.Error(errs[0])) // Ignore delete errors: @@ -94,10 +100,14 @@ func (h *handler) DeleteMultipleObjectsHandler(w http.ResponseWriter, r *http.Re } removed := make(map[string]struct{}) - toRemove := make([]string, 0, len(requested.Objects)) + toRemove := make([]*layer.VersionedObject, 0, len(requested.Objects)) for _, obj := range requested.Objects { - removed[obj.ObjectName] = struct{}{} - toRemove = append(toRemove, obj.ObjectName) + versionedObj := &layer.VersionedObject{ + Name: obj.ObjectName, + VersionID: obj.VersionID, + } + toRemove = append(toRemove, versionedObj) + removed[versionedObj.String()] = struct{}{} } response := &DeleteObjectsResponse{ @@ -110,9 +120,16 @@ func (h *handler) DeleteMultipleObjectsHandler(w http.ResponseWriter, r *http.Re return } + marshaler := zapcore.ArrayMarshalerFunc(func(encoder zapcore.ArrayEncoder) error { + for _, obj := range toRemove { + encoder.AppendString(obj.String()) + } + return nil + }) + if errs := h.obj.DeleteObjects(r.Context(), reqInfo.BucketName, toRemove); errs != nil && !requested.Quiet { additional := []zap.Field{ - zap.Strings("objects_name", toRemove), + zap.Array("objects", marshaler), zap.Errors("errors", errs), } h.logAndSendError(w, "could not delete objects", reqInfo, nil, additional...) @@ -138,7 +155,7 @@ func (h *handler) DeleteMultipleObjectsHandler(w http.ResponseWriter, r *http.Re } if err := api.EncodeToResponse(w, response); err != nil { - h.logAndSendError(w, "could not write response", reqInfo, err, zap.Strings("objects_name", toRemove)) + h.logAndSendError(w, "could not write response", reqInfo, err, zap.Array("objects", marshaler)) return } } diff --git a/api/handler/get.go b/api/handler/get.go index d18c1751..9a1c62f0 100644 --- a/api/handler/get.go +++ b/api/handler/get.go @@ -72,7 +72,7 @@ func writeHeaders(h http.Header, info *layer.ObjectInfo) { h.Set(api.LastModified, info.Created.UTC().Format(http.TimeFormat)) h.Set(api.ContentLength, strconv.FormatInt(info.Size, 10)) h.Set(api.ETag, info.HashSum) - h.Set(api.AmzVersionId, info.ID().String()) + h.Set(api.AmzVersionID, info.ID().String()) for key, val := range info.Headers { h[api.MetadataPrefix+key] = []string{val} diff --git a/api/headers.go b/api/headers.go index 303b46da..adf81c5c 100644 --- a/api/headers.go +++ b/api/headers.go @@ -4,7 +4,7 @@ package api const ( MetadataPrefix = "X-Amz-Meta-" AmzMetadataDirective = "X-Amz-Metadata-Directive" - AmzVersionId = "X-Amz-Version-Id" + AmzVersionID = "X-Amz-Version-Id" LastModified = "Last-Modified" Date = "Date" diff --git a/api/layer/layer.go b/api/layer/layer.go index bbd6e0f7..a0a45a50 100644 --- a/api/layer/layer.go +++ b/api/layer/layer.go @@ -1,6 +1,7 @@ package layer import ( + "bytes" "context" "crypto/ecdsa" "fmt" @@ -8,6 +9,7 @@ import ( "net/url" "sort" "strconv" + "strings" "time" "github.com/nspcc-dev/neofs-api-go/pkg/acl/eacl" @@ -128,6 +130,12 @@ type ( Encode string } + // VersionedObject stores object name and version. + VersionedObject struct { + Name string + VersionID string + } + // NeoFS provides basic NeoFS interface. NeoFS interface { Get(ctx context.Context, address *object.Address) (*object.Object, error) @@ -158,8 +166,7 @@ type ( ListObjectsV2(ctx context.Context, p *ListObjectsParamsV2) (*ListObjectsInfoV2, error) ListObjectVersions(ctx context.Context, p *ListObjectVersionsParams) (*ListObjectVersionsInfo, error) - DeleteObject(ctx context.Context, bucket, object string) error - DeleteObjects(ctx context.Context, bucket string, objects []string) []error + DeleteObjects(ctx context.Context, bucket string, objects []*VersionedObject) []error } ) @@ -168,6 +175,10 @@ const ( bktVersionSettingsObject = ".s3-versioning-settings" ) +func (t *VersionedObject) String() string { + return t.Name + ":" + t.VersionID +} + // NewLayer creates instance of layer. It checks credentials // and establishes gRPC connection with node. func NewLayer(log *zap.Logger, conns pool.Pool, config *CacheConfig) Client { @@ -330,16 +341,21 @@ func (n *layer) GetObjectInfo(ctx context.Context, p *HeadObjectParams) (*Object } if len(p.VersionID) == 0 { - return n.headLastVersion(ctx, bkt, p.Object) + objInfo, err := n.headLastVersion(ctx, bkt, p.Object) + if err == nil { + if deleteMark, err2 := strconv.ParseBool(objInfo.Headers[versionsDeleteMarkAttr]); err2 == nil && deleteMark { + return nil, api.GetAPIError(api.ErrNoSuchKey) + } + } + return objInfo, err } - return n.headVersion(ctx, bkt, p.Object, p.VersionID) + return n.headVersion(ctx, bkt, p.VersionID) } func (n *layer) getSettingsObjectInfo(ctx context.Context, bkt *BucketInfo) (*ObjectInfo, error) { oid, err := n.objectFindID(ctx, &findParams{cid: bkt.CID, val: bktVersionSettingsObject}) if err != nil { - n.log.Error("could not find object id", zap.Error(err)) return nil, err } @@ -367,7 +383,12 @@ func (n *layer) getSettingsObjectInfo(ctx context.Context, bkt *BucketInfo) (*Ob // PutObject into storage. func (n *layer) PutObject(ctx context.Context, p *PutObjectParams) (*ObjectInfo, error) { - return n.objectPut(ctx, p) + bkt, err := n.GetBucketInfo(ctx, p.Bucket) + if err != nil { + return nil, err + } + + return n.objectPut(ctx, bkt, p) } // CopyObject from one bucket into another bucket. @@ -395,35 +416,96 @@ func (n *layer) CopyObject(ctx context.Context, p *CopyObjectParams) (*ObjectInf } // DeleteObject removes all objects with passed nice name. -func (n *layer) DeleteObject(ctx context.Context, bucket, filename string) error { +func (n *layer) deleteObject(ctx context.Context, bkt *BucketInfo, obj *VersionedObject) error { var ( err error ids []*object.ID - bkt *BucketInfo ) - if bkt, err = n.GetBucketInfo(ctx, bucket); err != nil { - return &errors.DeleteError{ - Err: err, - Object: filename, + versioningEnabled := n.isVersioningEnabled(ctx, bkt) + if !versioningEnabled && obj.VersionID != "null" && obj.VersionID != "" { + return errors.GetAPIError(errors.ErrInvalidVersion) + } + + if versioningEnabled { + if len(obj.VersionID) != 0 { + id := object.NewID() + if err := id.Parse(obj.VersionID); err != nil { + return &errors.DeleteError{Err: api.GetAPIError(api.ErrInvalidVersion), Object: obj.String()} + } + ids = []*object.ID{id} + + lastObject, err := n.headLastVersion(ctx, bkt, obj.Name) + if err != nil { + return &api.DeleteError{Err: err, Object: obj.String()} + } + if !strings.Contains(lastObject.Headers[versionsAddAttr], obj.VersionID) || + strings.Contains(lastObject.Headers[versionsDelAttr], obj.VersionID) { + return &api.DeleteError{Err: api.GetAPIError(api.ErrInvalidVersion), Object: obj.String()} + } + + if lastObject.ID().String() == obj.VersionID { + if added := lastObject.Headers[versionsAddAttr]; len(added) > 0 { + addedVersions := strings.Split(added, ",") + sourceCopyVersion, err := n.headVersion(ctx, bkt, addedVersions[len(addedVersions)-1]) + if err != nil { + return &api.DeleteError{Err: err, Object: obj.String()} + } + p := &CopyObjectParams{ + SrcObject: sourceCopyVersion, + DstBucket: bkt.Name, + DstObject: obj.Name, + SrcSize: sourceCopyVersion.Size, + Header: map[string]string{versionsDelAttr: obj.VersionID}, + } + if _, err := n.CopyObject(ctx, p); err != nil { + return err + } + } else { + p := &PutObjectParams{ + Object: obj.Name, + Reader: bytes.NewReader(nil), + Header: map[string]string{ + versionsDelAttr: obj.VersionID, + versionsDeleteMarkAttr: strconv.FormatBool(true), + }, + } + if _, err := n.objectPut(ctx, bkt, p); err != nil { + return &api.DeleteError{Err: err, Object: obj.String()} + } + } + } else { + p := &CopyObjectParams{ + SrcObject: lastObject, + DstBucket: bkt.Name, + DstObject: obj.Name, + SrcSize: lastObject.Size, + Header: map[string]string{versionsDelAttr: obj.VersionID}, + } + if _, err := n.CopyObject(ctx, p); err != nil { + return err + } + } + } else { + p := &PutObjectParams{ + Object: obj.Name, + Reader: bytes.NewReader(nil), + Header: map[string]string{versionsDeleteMarkAttr: strconv.FormatBool(true)}, + } + if _, err := n.objectPut(ctx, bkt, p); err != nil { + return &errors.DeleteError{Err: err, Object: obj.String()} + } } - } else if ids, err = n.objectSearch(ctx, &findParams{cid: bkt.CID, val: filename}); err != nil { - return &errors.DeleteError{ - Err: err, - Object: filename, + } else { + ids, err = n.objectSearch(ctx, &findParams{cid: bkt.CID, val: obj.Name}) + if err != nil { + return &errors.DeleteError{Err: err, Object: obj.String()} } } for _, id := range ids { - addr := object.NewAddress() - addr.SetObjectID(id) - addr.SetContainerID(bkt.CID) - - if err = n.objectDelete(ctx, addr); err != nil { - return &errors.DeleteError{ - Err: err, - Object: filename, - } + if err = n.objectDelete(ctx, bkt.CID, id); err != nil { + return &errors.DeleteError{Err: err, Object: obj.String()} } } @@ -431,11 +513,16 @@ func (n *layer) DeleteObject(ctx context.Context, bucket, filename string) error } // DeleteObjects from the storage. -func (n *layer) DeleteObjects(ctx context.Context, bucket string, objects []string) []error { +func (n *layer) DeleteObjects(ctx context.Context, bucket string, objects []*VersionedObject) []error { var errs = make([]error, 0, len(objects)) + bkt, err := n.GetBucketInfo(ctx, bucket) + if err != nil { + return append(errs, err) + } + for i := range objects { - if err := n.DeleteObject(ctx, bucket, objects[i]); err != nil { + if err := n.deleteObject(ctx, bkt, objects[i]); err != nil { errs = append(errs, err) } } @@ -461,6 +548,14 @@ func (n *layer) DeleteBucket(ctx context.Context, p *DeleteBucketParams) error { return err } + ids, err := n.objectSearch(ctx, &findParams{cid: bucketInfo.CID}) + if err != nil { + return err + } + if len(ids) != 0 { + return api.GetAPIError(api.ErrBucketNotEmpty) + } + return n.deleteContainer(ctx, bucketInfo.CID) } @@ -581,10 +676,7 @@ func (n *layer) PutBucketVersioning(ctx context.Context, p *PutVersioningParams) } if objectInfo != nil { - addr := object.NewAddress() - addr.SetObjectID(objectInfo.ID()) - addr.SetContainerID(bucketInfo.CID) - if err = n.objectDelete(ctx, addr); err != nil { + if err = n.objectDelete(ctx, bucketInfo.CID, objectInfo.ID()); err != nil { return nil, err } } diff --git a/api/layer/object.go b/api/layer/object.go index 44b2258d..75bfab91 100644 --- a/api/layer/object.go +++ b/api/layer/object.go @@ -64,8 +64,9 @@ type ( ) const ( - versionsDelAttr = "S3-Versions-del" - versionsAddAttr = "S3-Versions-add" + versionsDelAttr = "S3-Versions-del" + versionsAddAttr = "S3-Versions-add" + versionsDeleteMarkAttr = "S3-Versions-delete-mark" ) // objectSearch returns all available objects by search params. @@ -121,11 +122,10 @@ func (n *layer) objectRange(ctx context.Context, p *getParams) ([]byte, error) { } // objectPut into NeoFS, took payload from io.Reader. -func (n *layer) objectPut(ctx context.Context, p *PutObjectParams) (*ObjectInfo, error) { +func (n *layer) objectPut(ctx context.Context, bkt *BucketInfo, p *PutObjectParams) (*ObjectInfo, error) { var ( err error obj string - bkt *BucketInfo own = n.Owner(ctx) ) @@ -136,9 +136,6 @@ func (n *layer) objectPut(ctx context.Context, p *PutObjectParams) (*ObjectInfo, if obj, err = url.QueryUnescape(p.Object); err != nil { return nil, err } - if bkt, err = n.GetBucketInfo(ctx, p.Bucket); err != nil { - return nil, err - } versioningEnabled := n.isVersioningEnabled(ctx, bkt) lastVersionInfo, err := n.headLastVersion(ctx, bkt, p.Object) @@ -155,15 +152,18 @@ func (n *layer) objectPut(ctx context.Context, p *PutObjectParams) (*ObjectInfo, versionsAddedStr += "," } versionsAddedStr += lastVersionInfo.ID().String() - addedVersions := object.NewAttribute() - addedVersions.SetKey(versionsAddAttr) - addedVersions.SetValue(versionsAddedStr) - attributes = append(attributes, addedVersions) - if delVersions := lastVersionInfo.Headers[versionsDelAttr]; len(delVersions) > 0 { - deletedVersions := object.NewAttribute() - deletedVersions.SetKey(versionsDelAttr) - deletedVersions.SetValue(delVersions) - attributes = append(attributes, deletedVersions) + p.Header[versionsAddAttr] = versionsAddedStr + + deleted := p.Header[versionsDelAttr] + if delVersions := lastVersionInfo.Headers[versionsDelAttr]; len(delVersions) != 0 { + if len(deleted) == 0 { + deleted = delVersions + } else { + deleted = delVersions + "," + deleted + } + } + if len(deleted) != 0 { + p.Header[versionsDelAttr] = deleted } } else { versionsDeletedStr := lastVersionInfo.Headers[versionsDelAttr] @@ -171,24 +171,19 @@ func (n *layer) objectPut(ctx context.Context, p *PutObjectParams) (*ObjectInfo, versionsDeletedStr += "," } versionsDeletedStr += lastVersionInfo.ID().String() - deletedVersions := object.NewAttribute() - deletedVersions.SetKey(versionsDelAttr) - deletedVersions.SetValue(versionsDeletedStr) + p.Header[versionsDelAttr] = versionsDeletedStr - attributes = append(attributes, deletedVersions) idsToDeleteArr = append(idsToDeleteArr, lastVersionInfo.ID()) } } - unix := strconv.FormatInt(time.Now().UTC().Unix(), 10) - filename := object.NewAttribute() filename.SetKey(object.AttributeFileName) filename.SetValue(obj) createdAt := object.NewAttribute() createdAt.SetKey(object.AttributeTimestamp) - createdAt.SetValue(unix) + createdAt.SetValue(strconv.FormatInt(time.Now().UTC().Unix(), 10)) attributes = append(attributes, filename, createdAt) @@ -240,11 +235,7 @@ func (n *layer) objectPut(ctx context.Context, p *PutObjectParams) (*ObjectInfo, } for _, id := range idsToDeleteArr { - addr := object.NewAddress() - addr.SetObjectID(id) - addr.SetContainerID(bkt.CID) - - if err = n.objectDelete(ctx, addr); err != nil { + if err = n.objectDelete(ctx, bkt.CID, id); err != nil { n.log.Warn("couldn't delete object", zap.Stringer("version id", id), zap.Error(err)) @@ -284,33 +275,28 @@ func (n *layer) headLastVersion(ctx context.Context, bkt *BucketInfo, objectName return objectInfoFromMeta(bkt, infos[len(infos)-1], "", ""), nil } -func (n *layer) headVersion(ctx context.Context, bkt *BucketInfo, objectName, versionID string) (*ObjectInfo, error) { - ids, err := n.objectSearch(ctx, &findParams{cid: bkt.CID, val: objectName}) - if err != nil { +func (n *layer) headVersion(ctx context.Context, bkt *BucketInfo, versionID string) (*ObjectInfo, error) { + oid := object.NewID() + if err := oid.Parse(versionID); err != nil { return nil, err } - if len(ids) == 0 { - return nil, api.GetAPIError(api.ErrNoSuchVersion) - } - for _, id := range ids { - if id.String() == versionID { - meta, err := n.objectHead(ctx, bkt.CID, id) - if err != nil { - if strings.Contains(err.Error(), "not found") { - return nil, api.GetAPIError(api.ErrNoSuchVersion) - } - return nil, err - } - return objectInfoFromMeta(bkt, meta, "", ""), nil + meta, err := n.objectHead(ctx, bkt.CID, oid) + if err != nil { + if strings.Contains(err.Error(), "not found") { + return nil, api.GetAPIError(api.ErrNoSuchVersion) } + return nil, err } - return nil, api.GetAPIError(api.ErrNoSuchVersion) + return objectInfoFromMeta(bkt, meta, "", ""), nil } // objectDelete puts tombstone object into neofs. -func (n *layer) objectDelete(ctx context.Context, address *object.Address) error { +func (n *layer) objectDelete(ctx context.Context, cid *cid.ID, oid *object.ID) error { + address := object.NewAddress() + address.SetContainerID(cid) + address.SetObjectID(oid) dop := new(client.DeleteObjectParams) dop.WithAddress(address) n.objCache.Delete(address) diff --git a/api/layer/util_test.go b/api/layer/util_test.go index 9b44d854..d393b143 100644 --- a/api/layer/util_test.go +++ b/api/layer/util_test.go @@ -48,6 +48,7 @@ func newTestInfo(oid *object.ID, bkt *BucketInfo, name string, isDir bool) *Obje id: oid, Name: name, Bucket: bkt.Name, + bucketID: bkt.CID, Size: defaultTestPayloadLength, ContentType: defaultTestContentType, Created: time.Unix(defaultTestCreated.Unix(), 0),