From bc0bdc7767638ed422bd767a3fe1fb9456ea8641 Mon Sep 17 00:00:00 2001 From: Denis Kirillov Date: Tue, 24 May 2022 14:30:37 +0300 Subject: [PATCH] [#417] Upload part using tree service Signed-off-by: Denis Kirillov --- api/cache/objects.go | 23 ++++++ api/data/tree.go | 11 +++ api/handler/multipart_upload.go | 4 +- api/layer/layer.go | 2 +- api/layer/multipart_upload.go | 108 ++++++++++++++++++++------- api/layer/tree_service.go | 8 +- internal/neofs/tree.go | 87 ++++++++++++++++++++- internal/neofstest/tree/tree_mock.go | 10 ++- 8 files changed, 221 insertions(+), 32 deletions(-) diff --git a/api/cache/objects.go b/api/cache/objects.go index 6628852..cedcc39 100644 --- a/api/cache/objects.go +++ b/api/cache/objects.go @@ -5,6 +5,7 @@ import ( "time" "github.com/bluele/gcache" + "github.com/nspcc-dev/neofs-s3-gw/api/data" "github.com/nspcc-dev/neofs-sdk-go/object" oid "github.com/nspcc-dev/neofs-sdk-go/object/id" "go.uber.org/zap" @@ -55,6 +56,21 @@ func (o *ObjectsCache) Get(address oid.Address) *object.Object { return &result } +// GetObject returns a cached object info. +func (o *ObjectsCache) GetObject(address oid.Address) *data.ObjectInfo { + entry, err := o.cache.Get(address.EncodeToString()) + if err != nil { + return nil + } + + result, ok := entry.(*data.ObjectInfo) + if !ok { + return nil + } + + return result +} + // Put puts an object to cache. func (o *ObjectsCache) Put(obj object.Object) error { cnrID, ok := obj.ContainerID() @@ -73,6 +89,13 @@ func (o *ObjectsCache) Put(obj object.Object) error { return o.cache.Set(addr.EncodeToString(), obj) } +// PutObject puts an object info to cache. +func (o *ObjectsCache) PutObject(obj *data.ObjectInfo) error { + cnrID := obj.CID.EncodeToString() + objID := obj.ID.EncodeToString() + return o.cache.Set(cnrID+"/"+objID, obj) +} + // Delete deletes an object from cache. func (o *ObjectsCache) Delete(address oid.Address) bool { return o.cache.Remove(address.EncodeToString()) diff --git a/api/data/tree.go b/api/data/tree.go index 8401159..1563044 100644 --- a/api/data/tree.go +++ b/api/data/tree.go @@ -45,9 +45,20 @@ type ObjectTaggingInfo struct { // MultipartInfo is multipart upload information. type MultipartInfo struct { + // ID is node id in tree service. + // It's ignored when creating a new multipart upload. + ID uint64 Key string UploadID string Owner user.ID Created time.Time Meta map[string]string } + +// PartInfo is upload information about part. +type PartInfo struct { + Key string + UploadID string + Number int + OID oid.ID +} diff --git a/api/handler/multipart_upload.go b/api/handler/multipart_upload.go index a419a36..d92a9cc 100644 --- a/api/handler/multipart_upload.go +++ b/api/handler/multipart_upload.go @@ -216,13 +216,13 @@ func (h *handler) UploadPartHandler(w http.ResponseWriter, r *http.Request) { Reader: r.Body, } - info, err := h.obj.UploadPart(r.Context(), p) + hash, err := h.obj.UploadPart(r.Context(), p) if err != nil { h.logAndSendError(w, "could not upload a part", reqInfo, err, additional...) return } - w.Header().Set(api.ETag, info.HashSum) + w.Header().Set(api.ETag, hash) api.WriteSuccessResponseHeadersOnly(w) } diff --git a/api/layer/layer.go b/api/layer/layer.go index c0cc831..cd8e3e1 100644 --- a/api/layer/layer.go +++ b/api/layer/layer.go @@ -233,7 +233,7 @@ type ( CreateMultipartUpload(ctx context.Context, p *CreateMultipartParams) error CompleteMultipartUpload(ctx context.Context, p *CompleteMultipartParams) (*data.ObjectInfo, error) - UploadPart(ctx context.Context, p *UploadPartParams) (*data.ObjectInfo, error) + UploadPart(ctx context.Context, p *UploadPartParams) (string, error) UploadPartCopy(ctx context.Context, p *UploadCopyParams) (*data.ObjectInfo, error) ListMultipartUploads(ctx context.Context, p *ListMultipartUploadsParams) (*ListMultipartUploadsInfo, error) AbortMultipartUpload(ctx context.Context, p *UploadInfoParams) error diff --git a/api/layer/multipart_upload.go b/api/layer/multipart_upload.go index acfcbcf..b5288b0 100644 --- a/api/layer/multipart_upload.go +++ b/api/layer/multipart_upload.go @@ -2,6 +2,7 @@ package layer import ( "context" + "encoding/hex" stderrors "errors" "fmt" "io" @@ -142,36 +143,91 @@ func (n *layer) CreateMultipartUpload(ctx context.Context, p *CreateMultipartPar info.Meta[tagPrefix+key] = val } - return n.treeService.CreateMultipart(ctx, &p.Info.Bkt.CID, info) + return n.treeService.CreateMultipartUpload(ctx, &p.Info.Bkt.CID, info) } -func (n *layer) UploadPart(ctx context.Context, p *UploadPartParams) (*data.ObjectInfo, error) { - if p.PartNumber != 0 { - if _, err := n.GetUploadInitInfo(ctx, p.Info); err != nil { - return nil, err +func (n *layer) UploadPart(ctx context.Context, p *UploadPartParams) (string, error) { + multipartInfo, err := n.treeService.GetMultipartUpload(ctx, &p.Info.Bkt.CID, p.Info.Key, p.Info.UploadID) + if err != nil { + if stderrors.Is(err, ErrNodeNotFound) { + return "", errors.GetAPIError(errors.ErrNoSuchUpload) } + return "", err } if p.Size > uploadMaxSize { - return nil, errors.GetAPIError(errors.ErrEntityTooLarge) + return "", errors.GetAPIError(errors.ErrEntityTooLarge) } - header := make(map[string]string) - appendUploadHeaders(header, p.Info.UploadID, p.Info.Key, p.PartNumber) - - params := &PutSystemObjectParams{ - BktInfo: p.Info.Bkt, - ObjName: FormUploadPartName(p.Info.UploadID, p.Info.Key, p.PartNumber), - Metadata: header, - Reader: p.Reader, - Size: p.Size, + objInfo, err := n.uploadPart(ctx, multipartInfo, p) + if err != nil { + return "", err } - return n.PutSystemObject(ctx, params) + return objInfo.HashSum, nil +} + +func (n *layer) uploadPart(ctx context.Context, multipartInfo *data.MultipartInfo, p *UploadPartParams) (*data.ObjectInfo, error) { + bktInfo := p.Info.Bkt + prm := PrmObjectCreate{ + Container: bktInfo.CID, + Creator: bktInfo.Owner, + Attributes: make([][2]string, 2), + Payload: p.Reader, + } + + prm.Attributes[0][0], prm.Attributes[0][1] = UploadIDAttributeName, p.Info.UploadID + prm.Attributes[1][0], prm.Attributes[1][1] = UploadPartNumberAttributeName, strconv.Itoa(p.PartNumber) + + id, hash, err := n.objectPutAndHash(ctx, prm, bktInfo) + if err != nil { + return nil, err + } + + partInfo := &data.PartInfo{ + Key: p.Info.Key, + UploadID: p.Info.UploadID, + Number: p.PartNumber, + OID: *id, + } + + oldPartID, err := n.treeService.AddPart(ctx, &bktInfo.CID, multipartInfo.ID, partInfo) + if err != nil { + return nil, err + } + if oldPartID != nil { + if err = n.objectDelete(ctx, bktInfo, *oldPartID); err != nil { + n.log.Error("couldn't delete old part object", zap.Error(err), + zap.String("cnrID", bktInfo.CID.EncodeToString()), + zap.String("bucket name", bktInfo.Name), + zap.String("objID", oldPartID.EncodeToString())) + } + } + + objInfo := &data.ObjectInfo{ + ID: *id, + CID: bktInfo.CID, + + Owner: bktInfo.Owner, + Bucket: bktInfo.Name, + Size: p.Size, + Created: time.Now(), + HashSum: hex.EncodeToString(hash), + } + + if err = n.objCache.PutObject(objInfo); err != nil { + n.log.Error("couldn't cache system object", zap.Error(err)) + } + + return objInfo, nil } func (n *layer) UploadPartCopy(ctx context.Context, p *UploadCopyParams) (*data.ObjectInfo, error) { - if _, err := n.GetUploadInitInfo(ctx, p.Info); err != nil { + multipartInfo, err := n.treeService.GetMultipartUpload(ctx, &p.Info.Bkt.CID, p.Info.Key, p.Info.UploadID) + if err != nil { + if stderrors.Is(err, ErrNodeNotFound) { + return nil, errors.GetAPIError(errors.ErrNoSuchUpload) + } return nil, err } @@ -192,7 +248,7 @@ func (n *layer) UploadPartCopy(ctx context.Context, p *UploadCopyParams) (*data. pr, pw := io.Pipe() go func() { - err := n.GetObject(ctx, &GetObjectParams{ + err = n.GetObject(ctx, &GetObjectParams{ ObjectInfo: p.SrcObjInfo, Writer: pw, Range: p.Range, @@ -204,14 +260,14 @@ func (n *layer) UploadPartCopy(ctx context.Context, p *UploadCopyParams) (*data. } }() - return n.PutSystemObject(ctx, &PutSystemObjectParams{ - BktInfo: p.Info.Bkt, - ObjName: FormUploadPartName(p.Info.UploadID, p.Info.Key, p.PartNumber), - Metadata: metadata, - Prefix: "", - Reader: pr, - Size: size, - }) + params := &UploadPartParams{ + Info: p.Info, + PartNumber: p.PartNumber, + Size: size, + Reader: pr, + } + + return n.uploadPart(ctx, multipartInfo, params) } // implements io.Reader of payloads of the object list stored in the NeoFS network. diff --git a/api/layer/tree_service.go b/api/layer/tree_service.go index 05e747b..ae615db 100644 --- a/api/layer/tree_service.go +++ b/api/layer/tree_service.go @@ -50,8 +50,14 @@ type TreeService interface { GetSystemVersion(ctx context.Context, cnrID *cid.ID, objectName string) (*data.BaseNodeVersion, error) RemoveSystemVersion(ctx context.Context, cnrID *cid.ID, nodeID uint64) error - CreateMultipart(ctx context.Context, cnrID *cid.ID, info *data.MultipartInfo) error + CreateMultipartUpload(ctx context.Context, cnrID *cid.ID, info *data.MultipartInfo) error GetMultipartUploadsByPrefix(ctx context.Context, cnrID *cid.ID, prefix string) ([]*data.MultipartInfo, error) + GetMultipartUpload(ctx context.Context, cnrID *cid.ID, objectName, uploadID string) (*data.MultipartInfo, error) + + // AddPart puts a node to a system tree as a child of appropriate multipart upload + // and returns objectID of a previous part which must be deleted in NeoFS. + // If a part is being added for the first time, the previous part ID will be nil. + AddPart(ctx context.Context, cnrID *cid.ID, multipartNodeID uint64, info *data.PartInfo) (oldObjIDToDelete *oid.ID, err error) } // ErrNodeNotFound is returned from Tree service in case of not found error. diff --git a/internal/neofs/tree.go b/internal/neofs/tree.go index 8da9a78..4a240de 100644 --- a/internal/neofs/tree.go +++ b/internal/neofs/tree.go @@ -55,6 +55,7 @@ const ( isUnversionedKV = "IsUnversioned" isTagKV = "isTag" uploadIDKV = "UploadId" + partNumberKV = "Number" // keys for delete marker nodes. isDeleteMarkerKV = "IdDeleteMarker" @@ -179,6 +180,7 @@ func newNodeVersionFromTreeNode(treeNode *TreeNode) *data.NodeVersion { func newMultipartInfo(node NodeResponse) (*data.MultipartInfo, error) { multipartInfo := &data.MultipartInfo{ + ID: node.GetNodeId(), Meta: make(map[string]string, len(node.GetMeta())), } @@ -206,6 +208,27 @@ func newMultipartInfo(node NodeResponse) (*data.MultipartInfo, error) { return multipartInfo, nil } +func newPartInfo(node NodeResponse) (*data.PartInfo, error) { + partInfo := &data.PartInfo{} + + for _, kv := range node.GetMeta() { + switch kv.GetKey() { + case partNumberKV: + partInfo.Number, _ = strconv.Atoi(string(kv.GetValue())) + case oidKV: + if err := partInfo.OID.DecodeString(string(kv.GetValue())); err != nil { + return nil, fmt.Errorf("invalid oid: %w", err) + } + } + } + + if partInfo.Number <= 0 { + return nil, fmt.Errorf("it's not a part node") + } + + return partInfo, nil +} + func (c *TreeClient) GetSettingsNode(ctx context.Context, cnrID *cid.ID) (*data.BucketSettings, error) { keysToReturn := []string{versioningEnabledKV, lockConfigurationKV} node, err := c.getSystemNode(ctx, cnrID, []string{settingsFileName}, keysToReturn) @@ -720,7 +743,7 @@ func (c *TreeClient) RemoveSystemVersion(ctx context.Context, cnrID *cid.ID, id return c.removeNode(ctx, cnrID, systemTree, id) } -func (c *TreeClient) CreateMultipart(ctx context.Context, cnrID *cid.ID, info *data.MultipartInfo) error { +func (c *TreeClient) CreateMultipartUpload(ctx context.Context, cnrID *cid.ID, info *data.MultipartInfo) error { path := pathFromName(info.Key) meta := metaFromMultipart(info) @@ -763,6 +786,68 @@ func (c *TreeClient) getSubTreeMultipartUploads(ctx context.Context, cnrID *cid. return result, nil } +func (c *TreeClient) GetMultipartUpload(ctx context.Context, cnrID *cid.ID, objectName, uploadID string) (*data.MultipartInfo, error) { + path := pathFromName(objectName) + p := &getNodesParams{ + CnrID: cnrID, + TreeID: systemTree, + Path: path, + AllAttrs: true, + } + + nodes, err := c.getNodes(ctx, p) + if err != nil { + return nil, err + } + + for _, node := range nodes { + info, err := newMultipartInfo(node) + if err != nil { + continue + } + if info.UploadID == uploadID { + return info, nil + } + } + + return nil, layer.ErrNodeNotFound +} + +func (c *TreeClient) AddPart(ctx context.Context, cnrID *cid.ID, multipartNodeID uint64, info *data.PartInfo) (oldObjIDToDelete *oid.ID, err error) { + parts, err := c.getSubTree(ctx, cnrID, systemTree, multipartNodeID, 1) + if err != nil { + return nil, err + } + + meta := map[string]string{ + partNumberKV: strconv.Itoa(info.Number), + oidKV: info.OID.EncodeToString(), + } + + var foundPartID uint64 + for _, part := range parts { + if part.GetNodeId() == multipartNodeID { + continue + } + partInfo, err := newPartInfo(part) + if err != nil { + continue + } + if partInfo.Number == info.Number { + foundPartID = part.GetNodeId() + oldObjIDToDelete = &partInfo.OID + break + } + } + + if oldObjIDToDelete == nil { + _, err = c.addNode(ctx, cnrID, systemTree, multipartNodeID, meta) + return nil, err + } + + return oldObjIDToDelete, c.moveNode(ctx, cnrID, systemTree, foundPartID, multipartNodeID, meta) +} + func (c *TreeClient) Close() error { if c.conn != nil { return c.conn.Close() diff --git a/internal/neofstest/tree/tree_mock.go b/internal/neofstest/tree/tree_mock.go index ee9af90..f10e976 100644 --- a/internal/neofstest/tree/tree_mock.go +++ b/internal/neofstest/tree/tree_mock.go @@ -236,10 +236,18 @@ func (t *TreeServiceMock) GetAllVersionsByPrefix(ctx context.Context, cnrID *cid panic("implement me") } -func (t *TreeServiceMock) CreateMultipart(ctx context.Context, cnrID *cid.ID, info *data.MultipartInfo) error { +func (t *TreeServiceMock) CreateMultipartUpload(ctx context.Context, cnrID *cid.ID, info *data.MultipartInfo) error { panic("implement me") } func (t *TreeServiceMock) GetMultipartUploadsByPrefix(ctx context.Context, cnrID *cid.ID, prefix string) ([]*data.MultipartInfo, error) { panic("implement me") } + +func (t *TreeServiceMock) GetMultipartUpload(ctx context.Context, cnrID *cid.ID, objectName, uploadID string) (*data.MultipartInfo, error) { + panic("implement me") +} + +func (t *TreeServiceMock) AddPart(ctx context.Context, cnrID *cid.ID, multipartNodeID uint64, info *data.PartInfo) (oldObjIDToDelete *oid.ID, err error) { + panic("implement me") +}