From 70957d75fdf9bbf7ec2cc86061f18a4f8b4570e1 Mon Sep 17 00:00:00 2001 From: Denis Kirillov Date: Tue, 24 May 2022 17:55:56 +0300 Subject: [PATCH] [#417] Complete multipart upload using tree service Signed-off-by: Denis Kirillov --- api/handler/multipart_upload.go | 61 +++++---------- api/layer/layer.go | 3 +- api/layer/multipart_upload.go | 127 ++++++++++++-------------------- api/layer/object.go | 17 ++++- internal/neofs/tree.go | 13 ++-- 5 files changed, 86 insertions(+), 135 deletions(-) diff --git a/api/handler/multipart_upload.go b/api/handler/multipart_upload.go index 05a6cec..107d7d3 100644 --- a/api/handler/multipart_upload.go +++ b/api/handler/multipart_upload.go @@ -1,8 +1,6 @@ package handler import ( - "bytes" - "encoding/json" "encoding/xml" "net/http" "net/url" @@ -87,11 +85,6 @@ type ( ETag string `xml:"ETag"` LastModified string `xml:"LastModified"` } - - UploadData struct { - TagSet map[string]string - ACL *AccessControlPolicy - } ) const ( @@ -120,6 +113,7 @@ func (h *handler) CreateMultipartUploadHandler(w http.ResponseWriter, r *http.Re Bkt: bktInfo, Key: reqInfo.ObjectName, }, + Data: &layer.UploadData{}, } if containsACLHeaders(r) { @@ -132,11 +126,11 @@ func (h *handler) CreateMultipartUploadHandler(w http.ResponseWriter, r *http.Re h.logAndSendError(w, "could not parse acl", reqInfo, err) return } - p.ACLHeaders = formACLHeadersForMultipart(r.Header) + p.Data.ACLHeaders = formACLHeadersForMultipart(r.Header) } if len(r.Header.Get(api.AmzTagging)) > 0 { - p.TagSet, err = parseTaggingHeader(r.Header) + p.Data.TagSet, err = parseTaggingHeader(r.Header) if err != nil { h.logAndSendError(w, "could not parse tagging", reqInfo, err, additional...) return @@ -352,7 +346,6 @@ func (h *handler) CompleteMultipartUploadHandler(w http.ResponseWriter, r *http. Key: reqInfo.ObjectName, } additional = []zap.Field{zap.String("uploadID", uploadID), zap.String("Key", reqInfo.ObjectName)} - uploadData = &UploadData{} ) reqBody := new(CompleteMultipartUpload) @@ -366,42 +359,11 @@ func (h *handler) CompleteMultipartUploadHandler(w http.ResponseWriter, r *http. return } - initPart, err := h.obj.GetUploadInitInfo(r.Context(), uploadInfo) - if err != nil { - h.logAndSendError(w, "could not get multipart upload info", reqInfo, err, additional...) - return - } - - if initPart.Size > 0 { - initPartPayload := bytes.NewBuffer(make([]byte, 0, initPart.Size)) - p := &layer.GetObjectParams{ - ObjectInfo: initPart, - Writer: initPartPayload, - BucketInfo: bktInfo, - } - if err = h.obj.GetObject(r.Context(), p); err != nil { - h.logAndSendError(w, "could not get multipart upload acl and/or tagging", reqInfo, err, additional...) - return - } - - if err = json.Unmarshal(initPartPayload.Bytes(), uploadData); err != nil { - h.logAndSendError(w, "could not unmarshal multipart upload acl and/or tagging", reqInfo, err, additional...) - return - } - - if uploadData.ACL != nil { - if sessionTokenSetEACL, err = getSessionTokenSetEACL(r.Context()); err != nil { - h.logAndSendError(w, "couldn't get eacl token", reqInfo, err) - return - } - } - } - c := &layer.CompleteMultipartParams{ Info: uploadInfo, Parts: reqBody.Parts, } - objInfo, err := h.obj.CompleteMultipartUpload(r.Context(), c) + uploadData, objInfo, err := h.obj.CompleteMultipartUpload(r.Context(), c) if err != nil { h.logAndSendError(w, "could not complete multipart upload", reqInfo, err, additional...) return @@ -419,12 +381,23 @@ func (h *handler) CompleteMultipartUploadHandler(w http.ResponseWriter, r *http. } } - if uploadData.ACL != nil { + if len(uploadData.ACLHeaders) != 0 { + key, err := h.bearerTokenIssuerKey(r.Context()) + if err != nil { + h.logAndSendError(w, "couldn't get gate key", reqInfo, err) + return + } + acl, err := parseACLHeaders(r.Header, key) + if err != nil { + h.logAndSendError(w, "could not parse acl", reqInfo, err) + return + } + resInfo := &resourceInfo{ Bucket: objInfo.Bucket, Object: objInfo.Name, } - astObject, err := aclToAst(uploadData.ACL, resInfo) + astObject, err := aclToAst(acl, resInfo) if err != nil { h.logAndSendError(w, "could not translate acl of completed multipart upload to ast", reqInfo, err, additional...) return diff --git a/api/layer/layer.go b/api/layer/layer.go index cd8e3e1..9adb7d0 100644 --- a/api/layer/layer.go +++ b/api/layer/layer.go @@ -232,13 +232,12 @@ type ( DeleteSystemObject(ctx context.Context, bktInfo *data.BucketInfo, name string) error CreateMultipartUpload(ctx context.Context, p *CreateMultipartParams) error - CompleteMultipartUpload(ctx context.Context, p *CompleteMultipartParams) (*data.ObjectInfo, error) + CompleteMultipartUpload(ctx context.Context, p *CompleteMultipartParams) (*UploadData, *data.ObjectInfo, error) UploadPart(ctx context.Context, p *UploadPartParams) (string, error) UploadPartCopy(ctx context.Context, p *UploadCopyParams) (*data.ObjectInfo, error) ListMultipartUploads(ctx context.Context, p *ListMultipartUploadsParams) (*ListMultipartUploadsInfo, error) AbortMultipartUpload(ctx context.Context, p *UploadInfoParams) error ListParts(ctx context.Context, p *ListPartsParams) (*ListPartsInfo, error) - GetUploadInitInfo(ctx context.Context, p *UploadInfoParams) (*data.ObjectInfo, error) PutBucketNotificationConfiguration(ctx context.Context, p *PutBucketNotificationConfigurationParams) error GetBucketNotificationConfiguration(ctx context.Context, bktInfo *data.BucketInfo) (*data.NotificationConfiguration, error) diff --git a/api/layer/multipart_upload.go b/api/layer/multipart_upload.go index 1a55cd0..734cfa6 100644 --- a/api/layer/multipart_upload.go +++ b/api/layer/multipart_upload.go @@ -11,7 +11,6 @@ import ( "strings" "time" - "github.com/nspcc-dev/neofs-s3-gw/api" "github.com/nspcc-dev/neofs-s3-gw/api/data" "github.com/nspcc-dev/neofs-s3-gw/api/errors" "github.com/nspcc-dev/neofs-s3-gw/internal/misc" @@ -46,8 +45,12 @@ type ( } CreateMultipartParams struct { - Info *UploadInfoParams - Header map[string]string + Info *UploadInfoParams + Header map[string]string + Data *UploadData + } + + UploadData struct { TagSet map[string]string ACLHeaders map[string]string } @@ -124,24 +127,32 @@ type ( ) func (n *layer) CreateMultipartUpload(ctx context.Context, p *CreateMultipartParams) error { + metaSize := len(p.Header) + if p.Data != nil { + metaSize += len(p.Data.ACLHeaders) + metaSize += len(p.Data.TagSet) + } + info := &data.MultipartInfo{ Key: p.Info.Key, UploadID: p.Info.UploadID, Owner: n.Owner(ctx), Created: time.Now(), - Meta: make(map[string]string, len(p.Header)+len(p.ACLHeaders)+len(p.TagSet)), + Meta: make(map[string]string, metaSize), } for key, val := range p.Header { info.Meta[metaPrefix+key] = val } - for key, val := range p.ACLHeaders { - info.Meta[aclPrefix+key] = val - } + if p.Data != nil { + for key, val := range p.Data.ACLHeaders { + info.Meta[aclPrefix+key] = val + } - for key, val := range p.TagSet { - info.Meta[tagPrefix+key] = val + for key, val := range p.Data.TagSet { + info.Meta[tagPrefix+key] = val + } } return n.treeService.CreateMultipartUpload(ctx, &p.Info.Bkt.CID, info) @@ -310,51 +321,20 @@ func (x *multiObjectReader) Read(p []byte) (n int, err error) { return n + next, err } -func (n *layer) CompleteMultipartUpload(ctx context.Context, p *CompleteMultipartParams) (*data.ObjectInfo, error) { - var ( - obj *data.ObjectInfo - partsAttrValue string - ) - +func (n *layer) CompleteMultipartUpload(ctx context.Context, p *CompleteMultipartParams) (*UploadData, *data.ObjectInfo, error) { for i := 1; i < len(p.Parts); i++ { if p.Parts[i].PartNumber <= p.Parts[i-1].PartNumber { - return nil, errors.GetAPIError(errors.ErrInvalidPartOrder) + return nil, nil, errors.GetAPIError(errors.ErrInvalidPartOrder) } } - _, objects, err := n.getUploadParts(ctx, p.Info) + multipartInfo, objects, err := n.getUploadParts(ctx, p.Info) // todo consider avoid heading objects if err != nil { - return nil, err + return nil, nil, err } - if len(objects) == 1 { - obj, err = n.headLastVersionIfNotDeleted(ctx, p.Info.Bkt, p.Info.Key) - if err != nil { - if errors.IsS3Error(err, errors.ErrNoSuchKey) { - return nil, errors.GetAPIError(errors.ErrInvalidPart) - } - return nil, err - } - if obj != nil && obj.Headers[UploadIDAttributeName] == p.Info.UploadID { - return obj, nil - } - return nil, errors.GetAPIError(errors.ErrInvalidPart) - } - - if _, ok := objects[0]; !ok { - n.log.Error("could not get init multipart upload", - zap.Stringer("bucket id", p.Info.Bkt.CID), - zap.String("uploadID", misc.SanitizeString(p.Info.UploadID)), - zap.String("uploadKey", p.Info.Key), - ) - // we return InternalError because if we are here it means we've checked InitPart in handler before and - // received successful result, it's strange we didn't get the InitPart again - return nil, errors.GetAPIError(errors.ErrInternalError) - } - - // keep in mind objects[0] is the init part - if len(objects) <= len(p.Parts) { - return nil, errors.GetAPIError(errors.ErrInvalidPart) + if len(objects) < len(p.Parts) { + return nil, nil, errors.GetAPIError(errors.ErrInvalidPart) } parts := make([]*data.ObjectInfo, 0, len(p.Parts)) @@ -362,31 +342,29 @@ func (n *layer) CompleteMultipartUpload(ctx context.Context, p *CompleteMultipar for i, part := range p.Parts { info := objects[part.PartNumber] if info == nil || part.ETag != info.HashSum { - return nil, errors.GetAPIError(errors.ErrInvalidPart) + return nil, nil, errors.GetAPIError(errors.ErrInvalidPart) } // for the last part we have no minimum size limit if i != len(p.Parts)-1 && info.Size < uploadMinSize { - return nil, errors.GetAPIError(errors.ErrEntityTooSmall) + return nil, nil, errors.GetAPIError(errors.ErrEntityTooSmall) } parts = append(parts, info) - partsAttrValue += strconv.Itoa(part.PartNumber) + "=" + strconv.FormatInt(info.Size, 10) + "," } - initMetadata := objects[0].Headers - if len(objects[0].ContentType) != 0 { - initMetadata[api.ContentType] = objects[0].ContentType + initMetadata := make(map[string]string, len(multipartInfo.Meta)) + uploadData := &UploadData{ + TagSet: make(map[string]string), + ACLHeaders: make(map[string]string), + } + for key, val := range multipartInfo.Meta { + if strings.HasPrefix(key, metaPrefix) { + initMetadata[strings.TrimPrefix(key, metaPrefix)] = val + } else if strings.HasPrefix(key, tagPrefix) { + uploadData.TagSet[strings.TrimPrefix(key, tagPrefix)] = val + } else if strings.HasPrefix(key, aclPrefix) { + uploadData.ACLHeaders[strings.TrimPrefix(key, aclPrefix)] = val + } } - - /* We will keep "S3-Upload-Id" attribute in a completed object to determine if it is a "common" object or a completed object. - We will need to differ these objects if something goes wrong during completing multipart upload. - I.e. we had completed the object but didn't put tagging/acl for some reason */ - delete(initMetadata, UploadPartNumberAttributeName) - delete(initMetadata, UploadKeyAttributeName) - delete(initMetadata, attrVersionsIgnore) - delete(initMetadata, objectSystemAttributeName) - delete(initMetadata, versionsUnversionedAttr) - - initMetadata[UploadCompletedParts] = partsAttrValue[:len(partsAttrValue)-1] r := &multiObjectReader{ ctx: ctx, @@ -396,7 +374,7 @@ func (n *layer) CompleteMultipartUpload(ctx context.Context, p *CompleteMultipar r.prm.bktInfo = p.Info.Bkt - obj, err = n.PutObject(ctx, &PutObjectParams{ + obj, err := n.PutObject(ctx, &PutObjectParams{ BktInfo: p.Info.Bkt, Object: p.Info.Key, Reader: r, @@ -408,9 +386,11 @@ func (n *layer) CompleteMultipartUpload(ctx context.Context, p *CompleteMultipar zap.String("uploadKey", p.Info.Key), zap.Error(err)) - return nil, errors.GetAPIError(errors.ErrInternalError) + return nil, nil, errors.GetAPIError(errors.ErrInternalError) } + var addr oid.Address + addr.SetContainer(p.Info.Bkt.CID) for partNum, objInfo := range objects { if partNum == 0 { continue @@ -421,10 +401,11 @@ func (n *layer) CompleteMultipartUpload(ctx context.Context, p *CompleteMultipar zap.Stringer("bucket id", p.Info.Bkt.CID), zap.Error(err)) } - n.systemCache.Delete(systemObjectKey(p.Info.Bkt, FormUploadPartName(p.Info.UploadID, p.Info.Key, partNum))) + addr.SetObject(objInfo.ID) + n.objCache.Delete(addr) } - return obj, nil + return uploadData, obj, n.treeService.DeleteMultipartUpload(ctx, &p.Info.Bkt.CID, multipartInfo.ID) } func (n *layer) ListMultipartUploads(ctx context.Context, p *ListMultipartUploadsParams) (*ListMultipartUploadsInfo, error) { @@ -549,18 +530,6 @@ func (n *layer) ListParts(ctx context.Context, p *ListPartsParams) (*ListPartsIn return &res, nil } -func (n *layer) GetUploadInitInfo(ctx context.Context, p *UploadInfoParams) (*data.ObjectInfo, error) { - info, err := n.HeadSystemObject(ctx, p.Bkt, FormUploadPartName(p.UploadID, p.Key, 0)) - if err != nil { - if errors.IsS3Error(err, errors.ErrNoSuchKey) { - return nil, errors.GetAPIError(errors.ErrNoSuchUpload) - } - return nil, err - } - - return info, nil -} - func (n *layer) getUploadParts(ctx context.Context, p *UploadInfoParams) (*data.MultipartInfo, map[int]*data.ObjectInfo, error) { multipartInfo, err := n.treeService.GetMultipartUpload(ctx, &p.Bkt.CID, p.Key, p.UploadID) if err != nil { diff --git a/api/layer/object.go b/api/layer/object.go index 3b0c4ce..417e08d 100644 --- a/api/layer/object.go +++ b/api/layer/object.go @@ -237,7 +237,7 @@ func (n *layer) PutObject(ctx context.Context, p *PutObjectParams) (*data.Object n.listsCache.CleanCacheEntriesContainingObject(p.Object, p.BktInfo.CID) - return &data.ObjectInfo{ + objInfo := &data.ObjectInfo{ ID: *id, CID: p.BktInfo.CID, @@ -250,7 +250,20 @@ func (n *layer) PutObject(ctx context.Context, p *PutObjectParams) (*data.Object Headers: p.Header, ContentType: p.Header[api.ContentType], HashSum: hex.EncodeToString(hash), - }, nil + } + + if err = n.objCache.PutObject(objInfo); err != nil { + n.log.Warn("couldn't add object to cache", zap.Error(err), + zap.String("object_name", p.Object), zap.String("bucket_name", p.BktInfo.Name), + zap.String("cid", objInfo.CID.EncodeToString()), zap.String("oid", objInfo.ID.EncodeToString())) + } + if err = n.namesCache.Put(objInfo.NiceName(), objInfo.Address()); err != nil { + n.log.Warn("couldn't put obj address to name cache", + zap.String("obj nice name", objInfo.NiceName()), + zap.Error(err)) + } + + return objInfo, nil } func (n *layer) putLockObject(ctx context.Context, bktInfo *data.BucketInfo, objName string, lock *data.ObjectLock) error { diff --git a/internal/neofs/tree.go b/internal/neofs/tree.go index 27f90f5..407e91e 100644 --- a/internal/neofs/tree.go +++ b/internal/neofs/tree.go @@ -689,9 +689,6 @@ func (c *TreeClient) getLatestVersion(ctx context.Context, cnrID *cid.ID, treeID } nodes, err := c.getNodes(ctx, p) if err != nil { - if strings.Contains(err.Error(), "not found") { - return nil, layer.ErrNodeNotFound - } return nil, fmt.Errorf("couldn't get nodes: %w", err) } @@ -984,7 +981,7 @@ func (c *TreeClient) getSubTree(ctx context.Context, cnrID *cid.ID, treeID strin cli, err := c.service.GetSubTree(ctx, request) if err != nil { if strings.Contains(err.Error(), "not found") { - return nil, nil + return nil, layer.ErrNodeNotFound } return nil, fmt.Errorf("failed to get sub tree client: %w", err) } @@ -996,7 +993,7 @@ func (c *TreeClient) getSubTree(ctx context.Context, cnrID *cid.ID, treeID strin break } else if err != nil { if strings.Contains(err.Error(), "not found") { - return nil, nil + return nil, layer.ErrNodeNotFound } return nil, fmt.Errorf("failed to get sub tree: %w", err) } @@ -1044,9 +1041,6 @@ func (c *TreeClient) getNode(ctx context.Context, cnrID *cid.ID, treeID string, } nodes, err := c.getNodes(ctx, p) if err != nil { - if strings.Contains(err.Error(), "not found") { - return nil, layer.ErrNodeNotFound - } return nil, fmt.Errorf("couldn't get nodes: %w", err) } if len(nodes) == 0 { @@ -1084,6 +1078,9 @@ func (c *TreeClient) getNodes(ctx context.Context, p *getNodesParams) ([]*tree.G resp, err := c.service.GetNodeByPath(ctx, request) if err != nil { + if strings.Contains(err.Error(), "not found") { + return nil, layer.ErrNodeNotFound + } return nil, fmt.Errorf("failed to get node path: %w", err) }