From 58f2bf44b11f1dce84e5e1d6b97891fe9dae5f77 Mon Sep 17 00:00:00 2001 From: Denis Kirillov Date: Tue, 24 May 2022 16:07:47 +0300 Subject: [PATCH] [#417] List parts using tree service Signed-off-by: Denis Kirillov --- api/layer/multipart_upload.go | 71 +++++++++++++--------------- api/layer/tree_service.go | 1 + internal/neofs/tree.go | 21 ++++++++ internal/neofstest/tree/tree_mock.go | 4 ++ 4 files changed, 59 insertions(+), 38 deletions(-) diff --git a/api/layer/multipart_upload.go b/api/layer/multipart_upload.go index b5288b0b..0c847852 100644 --- a/api/layer/multipart_upload.go +++ b/api/layer/multipart_upload.go @@ -15,6 +15,7 @@ import ( "github.com/nspcc-dev/neofs-s3-gw/api/data" "github.com/nspcc-dev/neofs-s3-gw/api/errors" "github.com/nspcc-dev/neofs-s3-gw/internal/misc" + oid "github.com/nspcc-dev/neofs-sdk-go/object/id" "github.com/nspcc-dev/neofs-sdk-go/user" "go.uber.org/zap" ) @@ -321,7 +322,7 @@ func (n *layer) CompleteMultipartUpload(ctx context.Context, p *CompleteMultipar } } - objects, err := n.getUploadParts(ctx, p.Info) + _, objects, err := n.getUploadParts(ctx, p.Info) if err != nil { return nil, err } @@ -487,7 +488,7 @@ func (n *layer) ListMultipartUploads(ctx context.Context, p *ListMultipartUpload } func (n *layer) AbortMultipartUpload(ctx context.Context, p *UploadInfoParams) error { - objects, err := n.getUploadParts(ctx, p) + _, objects, err := n.getUploadParts(ctx, p) if err != nil { return err } @@ -503,12 +504,12 @@ func (n *layer) AbortMultipartUpload(ctx context.Context, p *UploadInfoParams) e func (n *layer) ListParts(ctx context.Context, p *ListPartsParams) (*ListPartsInfo, error) { var res ListPartsInfo - objs, err := n.getUploadParts(ctx, p.Info) + multipartInfo, objs, err := n.getUploadParts(ctx, p.Info) // todo consider listing without head object from NeoFS if err != nil { return nil, err } - res.Owner = objs[0].Owner + res.Owner = multipartInfo.Owner parts := make([]*Part, 0, len(objs)) @@ -560,51 +561,45 @@ func (n *layer) GetUploadInitInfo(ctx context.Context, p *UploadInfoParams) (*da return info, nil } -func (n *layer) getUploadParts(ctx context.Context, p *UploadInfoParams) (map[int]*data.ObjectInfo, error) { - // we search by UploadID attribute because parts are system objects which have system name not filename - // and search in attributes by prefix is not supported - f := &findParams{ - attr: [2]string{UploadIDAttributeName, p.UploadID}, - bkt: p.Bkt, +func (n *layer) getUploadParts(ctx context.Context, p *UploadInfoParams) (*data.MultipartInfo, map[int]*data.ObjectInfo, error) { + multipartInfo, err := n.treeService.GetMultipartUpload(ctx, &p.Bkt.CID, p.Key, p.UploadID) + if err != nil { + if stderrors.Is(err, ErrNodeNotFound) { + return nil, nil, errors.GetAPIError(errors.ErrNoSuchUpload) + } + return nil, nil, err } - ids, err := n.objectSearch(ctx, f) + parts, err := n.treeService.GetParts(ctx, &p.Bkt.CID, multipartInfo.ID) if err != nil { - return nil, err + return nil, nil, err } res := make(map[int]*data.ObjectInfo) + var addr oid.Address + addr.SetContainer(p.Bkt.CID) + for _, part := range parts { + addr.SetObject(part.OID) + objInfo := n.objCache.GetObject(addr) + if objInfo == nil { + meta, err := n.objectHead(ctx, p.Bkt, part.OID) + if err != nil { + n.log.Warn("couldn't head a part of upload", + zap.String("object id", part.OID.EncodeToString()), + zap.String("bucket id", p.Bkt.CID.EncodeToString()), + zap.Error(err)) + continue + } + objInfo = objInfoFromMeta(p.Bkt, meta) + } - for i := range ids { - meta, err := n.objectHead(ctx, p.Bkt, ids[i]) - if err != nil { - n.log.Warn("couldn't head a part of upload", - zap.Stringer("object id", &ids[i]), - zap.Stringer("bucket id", p.Bkt.CID), - zap.Error(err)) - continue - } - info := objInfoFromMeta(p.Bkt, meta) - // skip objects which are completed by "complete-multipart-upload" because they have "s3-Upload-Id" attribute - if !isSystem(info) { - continue - } - numStr := info.Headers[UploadPartNumberAttributeName] - num, err := strconv.Atoi(numStr) - if err != nil { - return nil, errors.GetAPIError(errors.ErrInternalError) - } - res[num] = info - if err = n.systemCache.PutObject(systemObjectKey(p.Bkt, FormUploadPartName(p.UploadID, p.Key, num)), info); err != nil { + res[part.Number] = objInfo + if err = n.objCache.PutObject(objInfo); err != nil { n.log.Warn("couldn't cache upload part", zap.Error(err)) } } - if len(res) == 0 { - return nil, errors.GetAPIError(errors.ErrNoSuchUpload) - } - - return res, nil + return multipartInfo, res, nil } func FormUploadPartName(uploadID, key string, partNumber int) string { diff --git a/api/layer/tree_service.go b/api/layer/tree_service.go index ae615dba..52edc940 100644 --- a/api/layer/tree_service.go +++ b/api/layer/tree_service.go @@ -58,6 +58,7 @@ type TreeService interface { // and returns objectID of a previous part which must be deleted in NeoFS. // If a part is being added for the first time, the previous part ID will be nil. AddPart(ctx context.Context, cnrID *cid.ID, multipartNodeID uint64, info *data.PartInfo) (oldObjIDToDelete *oid.ID, err error) + GetParts(ctx context.Context, cnrID *cid.ID, multipartNodeID uint64) ([]*data.PartInfo, error) } // ErrNodeNotFound is returned from Tree service in case of not found error. diff --git a/internal/neofs/tree.go b/internal/neofs/tree.go index 4a240de4..71fe767e 100644 --- a/internal/neofs/tree.go +++ b/internal/neofs/tree.go @@ -848,6 +848,27 @@ func (c *TreeClient) AddPart(ctx context.Context, cnrID *cid.ID, multipartNodeID return oldObjIDToDelete, c.moveNode(ctx, cnrID, systemTree, foundPartID, multipartNodeID, meta) } +func (c *TreeClient) GetParts(ctx context.Context, cnrID *cid.ID, multipartNodeID uint64) ([]*data.PartInfo, error) { + parts, err := c.getSubTree(ctx, cnrID, systemTree, multipartNodeID, 1) + if err != nil { + return nil, err + } + + result := make([]*data.PartInfo, 0, len(parts)) + for _, part := range parts { + if part.GetNodeId() == multipartNodeID { + continue + } + partInfo, err := newPartInfo(part) + if err != nil { + continue + } + result = append(result, partInfo) + } + + return result, nil +} + func (c *TreeClient) Close() error { if c.conn != nil { return c.conn.Close() diff --git a/internal/neofstest/tree/tree_mock.go b/internal/neofstest/tree/tree_mock.go index f10e976b..ead70912 100644 --- a/internal/neofstest/tree/tree_mock.go +++ b/internal/neofstest/tree/tree_mock.go @@ -251,3 +251,7 @@ func (t *TreeServiceMock) GetMultipartUpload(ctx context.Context, cnrID *cid.ID, func (t *TreeServiceMock) AddPart(ctx context.Context, cnrID *cid.ID, multipartNodeID uint64, info *data.PartInfo) (oldObjIDToDelete *oid.ID, err error) { panic("implement me") } + +func (t *TreeServiceMock) GetParts(ctx context.Context, cnrID *cid.ID, multipartNodeID uint64) ([]*data.PartInfo, error) { + panic("implement me") +}