forked from TrueCloudLab/frostfs-s3-gw
[#417] List parts using tree service
Signed-off-by: Denis Kirillov <denis@nspcc.ru>
This commit is contained in:
parent
bc0bdc7767
commit
58f2bf44b1
4 changed files with 59 additions and 38 deletions
|
@ -15,6 +15,7 @@ import (
|
|||
"github.com/nspcc-dev/neofs-s3-gw/api/data"
|
||||
"github.com/nspcc-dev/neofs-s3-gw/api/errors"
|
||||
"github.com/nspcc-dev/neofs-s3-gw/internal/misc"
|
||||
oid "github.com/nspcc-dev/neofs-sdk-go/object/id"
|
||||
"github.com/nspcc-dev/neofs-sdk-go/user"
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
@ -321,7 +322,7 @@ func (n *layer) CompleteMultipartUpload(ctx context.Context, p *CompleteMultipar
|
|||
}
|
||||
}
|
||||
|
||||
objects, err := n.getUploadParts(ctx, p.Info)
|
||||
_, objects, err := n.getUploadParts(ctx, p.Info)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
@ -487,7 +488,7 @@ func (n *layer) ListMultipartUploads(ctx context.Context, p *ListMultipartUpload
|
|||
}
|
||||
|
||||
func (n *layer) AbortMultipartUpload(ctx context.Context, p *UploadInfoParams) error {
|
||||
objects, err := n.getUploadParts(ctx, p)
|
||||
_, objects, err := n.getUploadParts(ctx, p)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
@ -503,12 +504,12 @@ func (n *layer) AbortMultipartUpload(ctx context.Context, p *UploadInfoParams) e
|
|||
|
||||
func (n *layer) ListParts(ctx context.Context, p *ListPartsParams) (*ListPartsInfo, error) {
|
||||
var res ListPartsInfo
|
||||
objs, err := n.getUploadParts(ctx, p.Info)
|
||||
multipartInfo, objs, err := n.getUploadParts(ctx, p.Info) // todo consider listing without head object from NeoFS
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
res.Owner = objs[0].Owner
|
||||
res.Owner = multipartInfo.Owner
|
||||
|
||||
parts := make([]*Part, 0, len(objs))
|
||||
|
||||
|
@ -560,51 +561,45 @@ func (n *layer) GetUploadInitInfo(ctx context.Context, p *UploadInfoParams) (*da
|
|||
return info, nil
|
||||
}
|
||||
|
||||
func (n *layer) getUploadParts(ctx context.Context, p *UploadInfoParams) (map[int]*data.ObjectInfo, error) {
|
||||
// we search by UploadID attribute because parts are system objects which have system name not filename
|
||||
// and search in attributes by prefix is not supported
|
||||
f := &findParams{
|
||||
attr: [2]string{UploadIDAttributeName, p.UploadID},
|
||||
bkt: p.Bkt,
|
||||
func (n *layer) getUploadParts(ctx context.Context, p *UploadInfoParams) (*data.MultipartInfo, map[int]*data.ObjectInfo, error) {
|
||||
multipartInfo, err := n.treeService.GetMultipartUpload(ctx, &p.Bkt.CID, p.Key, p.UploadID)
|
||||
if err != nil {
|
||||
if stderrors.Is(err, ErrNodeNotFound) {
|
||||
return nil, nil, errors.GetAPIError(errors.ErrNoSuchUpload)
|
||||
}
|
||||
return nil, nil, err
|
||||
}
|
||||
|
||||
ids, err := n.objectSearch(ctx, f)
|
||||
parts, err := n.treeService.GetParts(ctx, &p.Bkt.CID, multipartInfo.ID)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
return nil, nil, err
|
||||
}
|
||||
|
||||
res := make(map[int]*data.ObjectInfo)
|
||||
var addr oid.Address
|
||||
addr.SetContainer(p.Bkt.CID)
|
||||
for _, part := range parts {
|
||||
addr.SetObject(part.OID)
|
||||
objInfo := n.objCache.GetObject(addr)
|
||||
if objInfo == nil {
|
||||
meta, err := n.objectHead(ctx, p.Bkt, part.OID)
|
||||
if err != nil {
|
||||
n.log.Warn("couldn't head a part of upload",
|
||||
zap.String("object id", part.OID.EncodeToString()),
|
||||
zap.String("bucket id", p.Bkt.CID.EncodeToString()),
|
||||
zap.Error(err))
|
||||
continue
|
||||
}
|
||||
objInfo = objInfoFromMeta(p.Bkt, meta)
|
||||
}
|
||||
|
||||
for i := range ids {
|
||||
meta, err := n.objectHead(ctx, p.Bkt, ids[i])
|
||||
if err != nil {
|
||||
n.log.Warn("couldn't head a part of upload",
|
||||
zap.Stringer("object id", &ids[i]),
|
||||
zap.Stringer("bucket id", p.Bkt.CID),
|
||||
zap.Error(err))
|
||||
continue
|
||||
}
|
||||
info := objInfoFromMeta(p.Bkt, meta)
|
||||
// skip objects which are completed by "complete-multipart-upload" because they have "s3-Upload-Id" attribute
|
||||
if !isSystem(info) {
|
||||
continue
|
||||
}
|
||||
numStr := info.Headers[UploadPartNumberAttributeName]
|
||||
num, err := strconv.Atoi(numStr)
|
||||
if err != nil {
|
||||
return nil, errors.GetAPIError(errors.ErrInternalError)
|
||||
}
|
||||
res[num] = info
|
||||
if err = n.systemCache.PutObject(systemObjectKey(p.Bkt, FormUploadPartName(p.UploadID, p.Key, num)), info); err != nil {
|
||||
res[part.Number] = objInfo
|
||||
if err = n.objCache.PutObject(objInfo); err != nil {
|
||||
n.log.Warn("couldn't cache upload part", zap.Error(err))
|
||||
}
|
||||
}
|
||||
|
||||
if len(res) == 0 {
|
||||
return nil, errors.GetAPIError(errors.ErrNoSuchUpload)
|
||||
}
|
||||
|
||||
return res, nil
|
||||
return multipartInfo, res, nil
|
||||
}
|
||||
|
||||
func FormUploadPartName(uploadID, key string, partNumber int) string {
|
||||
|
|
|
@ -58,6 +58,7 @@ type TreeService interface {
|
|||
// and returns objectID of a previous part which must be deleted in NeoFS.
|
||||
// If a part is being added for the first time, the previous part ID will be nil.
|
||||
AddPart(ctx context.Context, cnrID *cid.ID, multipartNodeID uint64, info *data.PartInfo) (oldObjIDToDelete *oid.ID, err error)
|
||||
GetParts(ctx context.Context, cnrID *cid.ID, multipartNodeID uint64) ([]*data.PartInfo, error)
|
||||
}
|
||||
|
||||
// ErrNodeNotFound is returned from Tree service in case of not found error.
|
||||
|
|
|
@ -848,6 +848,27 @@ func (c *TreeClient) AddPart(ctx context.Context, cnrID *cid.ID, multipartNodeID
|
|||
return oldObjIDToDelete, c.moveNode(ctx, cnrID, systemTree, foundPartID, multipartNodeID, meta)
|
||||
}
|
||||
|
||||
func (c *TreeClient) GetParts(ctx context.Context, cnrID *cid.ID, multipartNodeID uint64) ([]*data.PartInfo, error) {
|
||||
parts, err := c.getSubTree(ctx, cnrID, systemTree, multipartNodeID, 1)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
result := make([]*data.PartInfo, 0, len(parts))
|
||||
for _, part := range parts {
|
||||
if part.GetNodeId() == multipartNodeID {
|
||||
continue
|
||||
}
|
||||
partInfo, err := newPartInfo(part)
|
||||
if err != nil {
|
||||
continue
|
||||
}
|
||||
result = append(result, partInfo)
|
||||
}
|
||||
|
||||
return result, nil
|
||||
}
|
||||
|
||||
func (c *TreeClient) Close() error {
|
||||
if c.conn != nil {
|
||||
return c.conn.Close()
|
||||
|
|
|
@ -251,3 +251,7 @@ func (t *TreeServiceMock) GetMultipartUpload(ctx context.Context, cnrID *cid.ID,
|
|||
func (t *TreeServiceMock) AddPart(ctx context.Context, cnrID *cid.ID, multipartNodeID uint64, info *data.PartInfo) (oldObjIDToDelete *oid.ID, err error) {
|
||||
panic("implement me")
|
||||
}
|
||||
|
||||
func (t *TreeServiceMock) GetParts(ctx context.Context, cnrID *cid.ID, multipartNodeID uint64) ([]*data.PartInfo, error) {
|
||||
panic("implement me")
|
||||
}
|
||||
|
|
Loading…
Reference in a new issue