From 13e01164d72c32fb34a42b53d6fd9b2fd4435fd9 Mon Sep 17 00:00:00 2001 From: Denis Kirillov Date: Mon, 23 May 2022 17:34:13 +0300 Subject: [PATCH] [#417] Create multipart upload using tree service Signed-off-by: Denis Kirillov Signed-off-by: Alex Vanin --- api/data/tree.go | 8 ++++ api/handler/multipart_upload.go | 78 +++++++++++++++---------------- api/layer/layer.go | 1 + api/layer/multipart_upload.go | 45 ++++++++++++++---- api/layer/tree_service.go | 2 + internal/neofs/tree.go | 82 ++++++++++++++++++++------------- 6 files changed, 135 insertions(+), 81 deletions(-) diff --git a/api/data/tree.go b/api/data/tree.go index f2312a8..91e85a1 100644 --- a/api/data/tree.go +++ b/api/data/tree.go @@ -42,3 +42,11 @@ type ObjectTaggingInfo struct { ObjName string VersionID string } + +// MultipartInfo is multipart upload information. +type MultipartInfo struct { + UploadID string + Owner user.ID + Created time.Time + Meta map[string]string +} diff --git a/api/handler/multipart_upload.go b/api/handler/multipart_upload.go index ed1f2f0..a419a36 100644 --- a/api/handler/multipart_upload.go +++ b/api/handler/multipart_upload.go @@ -100,8 +100,6 @@ const ( ) func (h *handler) CreateMultipartUploadHandler(w http.ResponseWriter, r *http.Request) { - /* initiation of multipart uploads is implemented via creation of "system" upload part with 0 part number - (min value of partNumber of a common part is 1) and holding data: metadata, acl, tagging */ reqInfo := api.GetReqInfo(r.Context()) bktInfo, err := h.getBucketAndCheckOwner(r, reqInfo.BucketName) @@ -110,22 +108,19 @@ func (h *handler) CreateMultipartUploadHandler(w http.ResponseWriter, r *http.Re return } - var ( - hasData bool - b []byte + uploadID := uuid.New() + additional := []zap.Field{ + zap.String("uploadID", uploadID.String()), + zap.String("Key", reqInfo.ObjectName), + } - uploadID = uuid.New() - data = &UploadData{} - additional = []zap.Field{ - zap.String("uploadID", uploadID.String()), - zap.String("Key", reqInfo.ObjectName), - } - uploadInfo = &layer.UploadInfoParams{ + p := &layer.CreateMultipartParams{ + Info: &layer.UploadInfoParams{ UploadID: uploadID.String(), Bkt: bktInfo, Key: reqInfo.ObjectName, - } - ) + }, + } if containsACLHeaders(r) { key, err := h.bearerTokenIssuerKey(r.Context()) @@ -133,53 +128,35 @@ func (h *handler) CreateMultipartUploadHandler(w http.ResponseWriter, r *http.Re h.logAndSendError(w, "couldn't get gate key", reqInfo, err) return } - data.ACL, err = parseACLHeaders(r.Header, key) - if err != nil { + if _, err = parseACLHeaders(r.Header, key); err != nil { h.logAndSendError(w, "could not parse acl", reqInfo, err) return } - hasData = true + p.ACLHeaders = formACLHeadersForMultipart(r.Header) } if len(r.Header.Get(api.AmzTagging)) > 0 { - data.TagSet, err = parseTaggingHeader(r.Header) + p.TagSet, err = parseTaggingHeader(r.Header) if err != nil { h.logAndSendError(w, "could not parse tagging", reqInfo, err, additional...) return } - hasData = true } - metadata := parseMetadata(r) + p.Header = parseMetadata(r) if contentType := r.Header.Get(api.ContentType); len(contentType) > 0 { - metadata[api.ContentType] = contentType + p.Header[api.ContentType] = contentType } - p := &layer.UploadPartParams{ - Info: uploadInfo, - PartNumber: 0, - Header: metadata, - } - - if hasData { - b, err = json.Marshal(data) - if err != nil { - h.logAndSendError(w, "could not marshal json with acl and/or tagging", reqInfo, err, additional...) - return - } - p.Reader = bytes.NewReader(b) - } - - info, err := h.obj.UploadPart(r.Context(), p) - if err != nil { + if err = h.obj.CreateMultipartUpload(r.Context(), p); err != nil { h.logAndSendError(w, "could not upload a part", reqInfo, err, additional...) return } resp := InitiateMultipartUploadResponse{ - Bucket: info.Bucket, - Key: info.Headers[layer.UploadKeyAttributeName], - UploadID: info.Headers[layer.UploadIDAttributeName], + Bucket: reqInfo.BucketName, + Key: reqInfo.ObjectName, + UploadID: uploadID.String(), } if err = api.EncodeToResponse(w, resp); err != nil { @@ -188,6 +165,25 @@ func (h *handler) CreateMultipartUploadHandler(w http.ResponseWriter, r *http.Re } } +func formACLHeadersForMultipart(header http.Header) map[string]string { + result := make(map[string]string) + + if value := header.Get(api.AmzACL); value != "" { + result[api.AmzACL] = value + } + if value := header.Get(api.AmzGrantRead); value != "" { + result[api.AmzGrantRead] = value + } + if value := header.Get(api.AmzGrantFullControl); value != "" { + result[api.AmzGrantFullControl] = value + } + if value := header.Get(api.AmzGrantWrite); value != "" { + result[api.AmzGrantWrite] = value + } + + return result +} + func (h *handler) UploadPartHandler(w http.ResponseWriter, r *http.Request) { reqInfo := api.GetReqInfo(r.Context()) diff --git a/api/layer/layer.go b/api/layer/layer.go index 3ecbd9a..c0cc831 100644 --- a/api/layer/layer.go +++ b/api/layer/layer.go @@ -231,6 +231,7 @@ type ( DeleteObjects(ctx context.Context, p *DeleteObjectParams) ([]*VersionedObject, error) DeleteSystemObject(ctx context.Context, bktInfo *data.BucketInfo, name string) error + CreateMultipartUpload(ctx context.Context, p *CreateMultipartParams) error CompleteMultipartUpload(ctx context.Context, p *CompleteMultipartParams) (*data.ObjectInfo, error) UploadPart(ctx context.Context, p *UploadPartParams) (*data.ObjectInfo, error) UploadPartCopy(ctx context.Context, p *UploadCopyParams) (*data.ObjectInfo, error) diff --git a/api/layer/multipart_upload.go b/api/layer/multipart_upload.go index ebe4a7d..e8c74eb 100644 --- a/api/layer/multipart_upload.go +++ b/api/layer/multipart_upload.go @@ -26,6 +26,9 @@ const ( UploadCompletedParts = "S3-Completed-Parts" UploadPartKeyPrefix = ".upload-" + metaPrefix = "meta-" + aclPrefix = "acl-" + MaxSizeUploadsList = 1000 MaxSizePartsList = 1000 UploadMinPartNumber = 1 @@ -41,12 +44,18 @@ type ( Key string } + CreateMultipartParams struct { + Info *UploadInfoParams + Header map[string]string + TagSet map[string]string + ACLHeaders map[string]string + } + UploadPartParams struct { Info *UploadInfoParams PartNumber int Size int64 Reader io.Reader - Header map[string]string } UploadCopyParams struct { @@ -113,6 +122,29 @@ type ( } ) +func (n *layer) CreateMultipartUpload(ctx context.Context, p *CreateMultipartParams) error { + info := &data.MultipartInfo{ + UploadID: p.Info.UploadID, + Owner: n.Owner(ctx), + Created: time.Now(), + Meta: make(map[string]string, len(p.Header)+len(p.ACLHeaders)+len(p.TagSet)), + } + + for key, val := range p.Header { + info.Meta[metaPrefix+key] = val + } + + for key, val := range p.ACLHeaders { + info.Meta[aclPrefix+key] = val + } + + for key, val := range p.TagSet { + info.Meta[tagPrefix+key] = val + } + + return n.treeService.CreateMultipart(ctx, &p.Info.Bkt.CID, p.Info.Key, info) +} + func (n *layer) UploadPart(ctx context.Context, p *UploadPartParams) (*data.ObjectInfo, error) { if p.PartNumber != 0 { if _, err := n.GetUploadInitInfo(ctx, p.Info); err != nil { @@ -124,17 +156,13 @@ func (n *layer) UploadPart(ctx context.Context, p *UploadPartParams) (*data.Obje return nil, errors.GetAPIError(errors.ErrEntityTooLarge) } - if p.Header == nil { - p.Header = make(map[string]string) - } - - appendUploadHeaders(p.Header, p.Info.UploadID, p.Info.Key, p.PartNumber) + header := make(map[string]string) + appendUploadHeaders(header, p.Info.UploadID, p.Info.Key, p.PartNumber) params := &PutSystemObjectParams{ BktInfo: p.Info.Bkt, ObjName: FormUploadPartName(p.Info.UploadID, p.Info.Key, p.PartNumber), - Metadata: p.Header, - Prefix: "", + Metadata: header, Reader: p.Reader, Size: p.Size, } @@ -609,5 +637,4 @@ func uploadInfoFromMeta(meta *object.Object, prefix, delimiter string) *UploadIn func appendUploadHeaders(metadata map[string]string, uploadID, key string, partNumber int) { metadata[UploadIDAttributeName] = uploadID metadata[UploadPartNumberAttributeName] = strconv.Itoa(partNumber) - metadata[UploadKeyAttributeName] = key } diff --git a/api/layer/tree_service.go b/api/layer/tree_service.go index 6203b06..c1507cd 100644 --- a/api/layer/tree_service.go +++ b/api/layer/tree_service.go @@ -49,6 +49,8 @@ type TreeService interface { AddSystemVersion(ctx context.Context, cnrID *cid.ID, objectName string, newVersion *data.BaseNodeVersion) error GetSystemVersion(ctx context.Context, cnrID *cid.ID, objectName string) (*data.BaseNodeVersion, error) RemoveSystemVersion(ctx context.Context, cnrID *cid.ID, nodeID uint64) error + + CreateMultipart(ctx context.Context, cnrID *cid.ID, objectName string, info *data.MultipartInfo) error } // ErrNodeNotFound is returned from Tree service in case of not found error. diff --git a/internal/neofs/tree.go b/internal/neofs/tree.go index 910e5be..7a62ccf 100644 --- a/internal/neofs/tree.go +++ b/internal/neofs/tree.go @@ -39,7 +39,6 @@ type ( getNodesParams struct { CnrID *cid.ID TreeID string - PathAttr string Path []string Meta []string LatestOnly bool @@ -55,8 +54,9 @@ const ( systemNameKV = "SystemName" isUnversionedKV = "IsUnversioned" isTagKV = "isTag" + uploadIDKV = "UploadId" - // keys for delete marker nodes + // keys for delete marker nodes. isDeleteMarkerKV = "IdDeleteMarker" filePathKV = "FilePath" ownerKV = "Owner" @@ -179,7 +179,7 @@ func newNodeVersionFromTreeNode(treeNode *TreeNode) *data.NodeVersion { func (c *TreeClient) GetSettingsNode(ctx context.Context, cnrID *cid.ID) (*data.BucketSettings, error) { keysToReturn := []string{versioningEnabledKV, lockConfigurationKV} - node, err := c.getSystemNode(ctx, cnrID, systemTree, []string{settingsFileName}, keysToReturn) + node, err := c.getSystemNode(ctx, cnrID, []string{settingsFileName}, keysToReturn) if err != nil { return nil, fmt.Errorf("couldn't get node: %w", err) } @@ -202,7 +202,7 @@ func (c *TreeClient) GetSettingsNode(ctx context.Context, cnrID *cid.ID) (*data. } func (c *TreeClient) PutSettingsNode(ctx context.Context, cnrID *cid.ID, settings *data.BucketSettings) error { - node, err := c.getSystemNode(ctx, cnrID, systemTree, []string{settingsFileName}, []string{}) + node, err := c.getSystemNode(ctx, cnrID, []string{settingsFileName}, []string{}) isErrNotFound := errors.Is(err, layer.ErrNodeNotFound) if err != nil && !isErrNotFound { return fmt.Errorf("couldn't get node: %w", err) @@ -219,7 +219,7 @@ func (c *TreeClient) PutSettingsNode(ctx context.Context, cnrID *cid.ID, setting } func (c *TreeClient) GetNotificationConfigurationNode(ctx context.Context, cnrID *cid.ID) (*oid.ID, error) { - node, err := c.getSystemNode(ctx, cnrID, systemTree, []string{notifConfFileName}, []string{oidKV}) + node, err := c.getSystemNode(ctx, cnrID, []string{notifConfFileName}, []string{oidKV}) if err != nil { return nil, err } @@ -228,7 +228,7 @@ func (c *TreeClient) GetNotificationConfigurationNode(ctx context.Context, cnrID } func (c *TreeClient) PutNotificationConfigurationNode(ctx context.Context, cnrID *cid.ID, objID *oid.ID) (*oid.ID, error) { - node, err := c.getSystemNode(ctx, cnrID, systemTree, []string{notifConfFileName}, []string{oidKV}) + node, err := c.getSystemNode(ctx, cnrID, []string{notifConfFileName}, []string{oidKV}) isErrNotFound := errors.Is(err, layer.ErrNodeNotFound) if err != nil && !isErrNotFound { return nil, fmt.Errorf("couldn't get node: %w", err) @@ -247,7 +247,7 @@ func (c *TreeClient) PutNotificationConfigurationNode(ctx context.Context, cnrID } func (c *TreeClient) GetBucketCORS(ctx context.Context, cnrID *cid.ID) (*oid.ID, error) { - node, err := c.getSystemNode(ctx, cnrID, systemTree, []string{corsFilename}, []string{oidKV}) + node, err := c.getSystemNode(ctx, cnrID, []string{corsFilename}, []string{oidKV}) if err != nil { return nil, err } @@ -256,7 +256,7 @@ func (c *TreeClient) GetBucketCORS(ctx context.Context, cnrID *cid.ID) (*oid.ID, } func (c *TreeClient) PutBucketCORS(ctx context.Context, cnrID *cid.ID, objID *oid.ID) (*oid.ID, error) { - node, err := c.getSystemNode(ctx, cnrID, systemTree, []string{corsFilename}, []string{oidKV}) + node, err := c.getSystemNode(ctx, cnrID, []string{corsFilename}, []string{oidKV}) isErrNotFound := errors.Is(err, layer.ErrNodeNotFound) if err != nil && !isErrNotFound { return nil, fmt.Errorf("couldn't get node: %w", err) @@ -275,7 +275,7 @@ func (c *TreeClient) PutBucketCORS(ctx context.Context, cnrID *cid.ID, objID *oi } func (c *TreeClient) DeleteBucketCORS(ctx context.Context, cnrID *cid.ID) (*oid.ID, error) { - node, err := c.getSystemNode(ctx, cnrID, systemTree, []string{corsFilename}, []string{oidKV}) + node, err := c.getSystemNode(ctx, cnrID, []string{corsFilename}, []string{oidKV}) if err != nil && !errors.Is(err, layer.ErrNodeNotFound) { return nil, err } @@ -344,7 +344,7 @@ func (c *TreeClient) DeleteObjectTagging(ctx context.Context, cnrID *cid.ID, obj } func (c *TreeClient) GetBucketTagging(ctx context.Context, cnrID *cid.ID) (map[string]string, error) { - node, err := c.getSystemNodeWithAllAttributes(ctx, cnrID, systemTree, []string{bucketTaggingFilename}) + node, err := c.getSystemNodeWithAllAttributes(ctx, cnrID, []string{bucketTaggingFilename}) if err != nil { if strings.Contains(err.Error(), "not found") { return nil, layer.ErrNodeNotFound @@ -364,7 +364,7 @@ func (c *TreeClient) GetBucketTagging(ctx context.Context, cnrID *cid.ID) (map[s } func (c *TreeClient) PutBucketTagging(ctx context.Context, cnrID *cid.ID, tagSet map[string]string) error { - node, err := c.getSystemNode(ctx, cnrID, systemTree, []string{bucketTaggingFilename}, []string{}) + node, err := c.getSystemNode(ctx, cnrID, []string{bucketTaggingFilename}, []string{}) isErrNotFound := errors.Is(err, layer.ErrNodeNotFound) if err != nil && !isErrNotFound { return fmt.Errorf("couldn't get node: %w", err) @@ -387,7 +387,7 @@ func (c *TreeClient) PutBucketTagging(ctx context.Context, cnrID *cid.ID, tagSet } func (c *TreeClient) DeleteBucketTagging(ctx context.Context, cnrID *cid.ID) error { - node, err := c.getSystemNode(ctx, cnrID, systemTree, []string{bucketTaggingFilename}, nil) + node, err := c.getSystemNode(ctx, cnrID, []string{bucketTaggingFilename}, nil) if err != nil && !errors.Is(err, layer.ErrNodeNotFound) { return err } @@ -429,7 +429,7 @@ func (c *TreeClient) GetLatestVersion(ctx context.Context, cnrID *cid.ID, object meta := []string{oidKV, isUnversionedKV, isDeleteMarkerKV} path := pathFromName(objectName) - return c.getLatestVersion(ctx, cnrID, versionTree, fileNameKV, path, meta) + return c.getLatestVersion(ctx, cnrID, versionTree, path, meta) } // pathFromName splits name by '/' and add an empty marker if name has trailing slash. @@ -483,7 +483,6 @@ func (c *TreeClient) getPrefixNodeID(ctx context.Context, cnrID *cid.ID, prefixP p := &getNodesParams{ CnrID: cnrID, TreeID: versionTree, - PathAttr: fileNameKV, Path: prefixPath, Meta: []string{fileNameKV, oidKV}, LatestOnly: false, @@ -617,18 +616,17 @@ func (c *TreeClient) GetSystemVersion(ctx context.Context, cnrID *cid.ID, object meta := []string{oidKV} path := pathFromName(objectName) - node, err := c.getLatestVersion(ctx, cnrID, systemTree, systemNameKV, path, meta) + node, err := c.getLatestVersion(ctx, cnrID, systemTree, path, meta) if err != nil { return nil, err } return &node.BaseNodeVersion, nil } -func (c *TreeClient) getLatestVersion(ctx context.Context, cnrID *cid.ID, treeID, attrPath string, path, meta []string) (*data.NodeVersion, error) { +func (c *TreeClient) getLatestVersion(ctx context.Context, cnrID *cid.ID, treeID string, path, meta []string) (*data.NodeVersion, error) { p := &getNodesParams{ CnrID: cnrID, TreeID: treeID, - PathAttr: attrPath, Path: path, Meta: meta, LatestOnly: true, @@ -671,7 +669,7 @@ func (c *TreeClient) getUnversioned(ctx context.Context, cnrID *cid.ID, treeID, } func (c *TreeClient) AddVersion(ctx context.Context, cnrID *cid.ID, filepath string, version *data.NodeVersion) error { - return c.addVersion(ctx, cnrID, versionTree, fileNameKV, filepath, version) + return c.addVersion(ctx, cnrID, versionTree, filepath, version) } func (c *TreeClient) AddSystemVersion(ctx context.Context, cnrID *cid.ID, filepath string, version *data.BaseNodeVersion) error { @@ -679,7 +677,7 @@ func (c *TreeClient) AddSystemVersion(ctx context.Context, cnrID *cid.ID, filepa BaseNodeVersion: *version, IsUnversioned: true, } - return c.addVersion(ctx, cnrID, systemTree, systemNameKV, filepath, newVersion) + return c.addVersion(ctx, cnrID, systemTree, filepath, newVersion) } func (c *TreeClient) RemoveVersion(ctx context.Context, cnrID *cid.ID, id uint64) error { @@ -690,6 +688,13 @@ func (c *TreeClient) RemoveSystemVersion(ctx context.Context, cnrID *cid.ID, id return c.removeNode(ctx, cnrID, systemTree, id) } +func (c *TreeClient) CreateMultipart(ctx context.Context, cnrID *cid.ID, objectName string, info *data.MultipartInfo) error { + path := pathFromName(objectName) + meta := metaFromMultipart(info) + + return c.addNodeByPath(ctx, cnrID, systemTree, path, meta) +} + func (c *TreeClient) Close() error { if c.conn != nil { return c.conn.Close() @@ -698,11 +703,11 @@ func (c *TreeClient) Close() error { return nil } -func (c *TreeClient) addVersion(ctx context.Context, cnrID *cid.ID, treeID, attrPath, filepath string, version *data.NodeVersion) error { +func (c *TreeClient) addVersion(ctx context.Context, cnrID *cid.ID, treeID, filepath string, version *data.NodeVersion) error { path := pathFromName(filepath) meta := map[string]string{ - oidKV: version.OID.EncodeToString(), - attrPath: path[len(path)-1], + oidKV: version.OID.EncodeToString(), + pathAttributeFromTreeID(treeID): path[len(path)-1], } if version.DeleteMarker != nil { @@ -739,7 +744,6 @@ func (c *TreeClient) getVersions(ctx context.Context, cnrID *cid.ID, treeID, fil p := &getNodesParams{ CnrID: cnrID, TreeID: treeID, - PathAttr: fileNameKV, Path: path, Meta: keysToReturn, LatestOnly: false, @@ -834,19 +838,26 @@ func metaFromSettings(settings *data.BucketSettings) map[string]string { return results } -func (c *TreeClient) getSystemNode(ctx context.Context, cnrID *cid.ID, treeID string, path, meta []string) (*TreeNode, error) { - return c.getNode(ctx, cnrID, treeID, systemNameKV, path, meta, false) +func metaFromMultipart(info *data.MultipartInfo) map[string]string { + info.Meta[uploadIDKV] = info.UploadID + info.Meta[ownerKV] = info.Owner.EncodeToString() + info.Meta[createdKV] = strconv.FormatInt(info.Created.UTC().UnixMilli(), 10) + + return info.Meta } -func (c *TreeClient) getSystemNodeWithAllAttributes(ctx context.Context, cnrID *cid.ID, treeID string, path []string) (*TreeNode, error) { - return c.getNode(ctx, cnrID, treeID, systemNameKV, path, []string{}, true) +func (c *TreeClient) getSystemNode(ctx context.Context, cnrID *cid.ID, path, meta []string) (*TreeNode, error) { + return c.getNode(ctx, cnrID, systemTree, path, meta, false) } -func (c *TreeClient) getNode(ctx context.Context, cnrID *cid.ID, treeID, pathAttr string, path, meta []string, allAttrs bool) (*TreeNode, error) { +func (c *TreeClient) getSystemNodeWithAllAttributes(ctx context.Context, cnrID *cid.ID, path []string) (*TreeNode, error) { + return c.getNode(ctx, cnrID, systemTree, path, []string{}, true) +} + +func (c *TreeClient) getNode(ctx context.Context, cnrID *cid.ID, treeID string, path, meta []string, allAttrs bool) (*TreeNode, error) { p := &getNodesParams{ CnrID: cnrID, TreeID: treeID, - PathAttr: pathAttr, Path: path, Meta: meta, LatestOnly: false, @@ -876,7 +887,7 @@ func (c *TreeClient) getNodes(ctx context.Context, p *getNodesParams) ([]*tree.G TreeId: p.TreeID, Path: p.Path, Attributes: p.Meta, - PathAttribute: p.PathAttr, + PathAttribute: pathAttributeFromTreeID(p.TreeID), LatestOnly: p.LatestOnly, AllAttributes: p.AllAttrs, BearerToken: getBearer(ctx), @@ -943,7 +954,7 @@ func (c *TreeClient) addNodeByPath(ctx context.Context, cnrID *cid.ID, treeID st TreeId: treeID, Path: path, Meta: metaToKV(meta), - PathAttribute: fileNameKV, + PathAttribute: pathAttributeFromTreeID(treeID), BearerToken: getBearer(ctx), }, } @@ -961,6 +972,15 @@ func (c *TreeClient) addNodeByPath(ctx context.Context, cnrID *cid.ID, treeID st return err } +func pathAttributeFromTreeID(treeID string) string { + switch treeID { + case systemTree: + return systemNameKV + default: + return fileNameKV + } +} + func (c *TreeClient) moveNode(ctx context.Context, cnrID *cid.ID, treeID string, nodeID, parentID uint64, meta map[string]string) error { request := &tree.MoveRequest{ Body: &tree.MoveRequest_Body{