[#185] tree: Fix getSubTreeMultipartUploads

Every tree node contains only FileName
but key in multipart info must contain FilePath

Signed-off-by: Denis Kirillov <d.kirillov@yadro.com>
This commit is contained in:
Denis Kirillov 2023-08-16 13:59:19 +03:00
parent 8898c2ec08
commit adec93af54

View file

@ -221,6 +221,30 @@ func newNodeVersionFromTreeNode(filePath string, treeNode *treeNode) *data.NodeV
return version return version
} }
func newMultipartInfoFromTreeNode(filePath string, treeNode *treeNode) (*data.MultipartInfo, error) {
uploadID, _ := treeNode.Get(uploadIDKV)
if uploadID == "" {
return nil, fmt.Errorf("it's not a multipart node")
}
multipartInfo := &data.MultipartInfo{
ID: treeNode.ID,
Key: filePath,
UploadID: uploadID,
Meta: treeNode.Meta,
}
ownerID, _ := treeNode.Get(ownerKV)
_ = multipartInfo.Owner.DecodeString(ownerID)
created, _ := treeNode.Get(createdKV)
if utcMilli, err := strconv.ParseInt(created, 10, 64); err == nil {
multipartInfo.Created = time.UnixMilli(utcMilli)
}
return multipartInfo, nil
}
func newMultipartInfo(node NodeResponse) (*data.MultipartInfo, error) { func newMultipartInfo(node NodeResponse) (*data.MultipartInfo, error) {
multipartInfo := &data.MultipartInfo{ multipartInfo := &data.MultipartInfo{
ID: node.GetNodeID(), ID: node.GetNodeID(),
@ -858,14 +882,14 @@ func (c *Tree) CreateMultipartUpload(ctx context.Context, bktInfo *data.BucketIn
} }
func (c *Tree) GetMultipartUploadsByPrefix(ctx context.Context, bktInfo *data.BucketInfo, prefix string) ([]*data.MultipartInfo, error) { func (c *Tree) GetMultipartUploadsByPrefix(ctx context.Context, bktInfo *data.BucketInfo, prefix string) ([]*data.MultipartInfo, error) {
subTreeNodes, _, err := c.getSubTreeByPrefix(ctx, bktInfo, systemTree, prefix, false) subTreeNodes, headPrefix, err := c.getSubTreeByPrefix(ctx, bktInfo, systemTree, prefix, false)
if err != nil { if err != nil {
return nil, err return nil, err
} }
var result []*data.MultipartInfo var result []*data.MultipartInfo
for _, node := range subTreeNodes { for _, node := range subTreeNodes {
multipartUploads, err := c.getSubTreeMultipartUploads(ctx, bktInfo, node.GetNodeID()) multipartUploads, err := c.getSubTreeMultipartUploads(ctx, bktInfo, node.GetNodeID(), headPrefix)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -875,19 +899,55 @@ func (c *Tree) GetMultipartUploadsByPrefix(ctx context.Context, bktInfo *data.Bu
return result, nil return result, nil
} }
func (c *Tree) getSubTreeMultipartUploads(ctx context.Context, bktInfo *data.BucketInfo, nodeID uint64) ([]*data.MultipartInfo, error) { func (c *Tree) getSubTreeMultipartUploads(ctx context.Context, bktInfo *data.BucketInfo, nodeID uint64, parentFilePath string) ([]*data.MultipartInfo, error) {
subTree, err := c.service.GetSubTree(ctx, bktInfo, systemTree, nodeID, maxGetSubTreeDepth) subTree, err := c.service.GetSubTree(ctx, bktInfo, systemTree, nodeID, maxGetSubTreeDepth)
if err != nil { if err != nil {
return nil, err return nil, err
} }
result := make([]*data.MultipartInfo, 0, len(subTree)) var parentPrefix string
for _, node := range subTree { if parentFilePath != "" { // The root of subTree can also have a parent
multipartInfo, err := newMultipartInfo(node) parentPrefix = strings.TrimSuffix(parentFilePath, separator) + separator // To avoid 'foo//bar'
if err != nil { // missed uploadID (it's a part node) }
var filepath string
namesMap := make(map[uint64]string, len(subTree))
multiparts := make(map[string][]*data.MultipartInfo, len(subTree))
for i, node := range subTree {
treeNode, fileName, err := parseTreeNode(node)
if err != nil {
continue continue
} }
result = append(result, multipartInfo)
if i != 0 {
if filepath, err = formFilePath(node, fileName, namesMap); err != nil {
return nil, fmt.Errorf("invalid node order: %w", err)
}
} else {
filepath = parentPrefix + fileName
namesMap[treeNode.ID] = filepath
}
multipartInfo, err := newMultipartInfoFromTreeNode(filepath, treeNode)
if err != nil {
continue
}
key := formLatestNodeKey(node.GetParentID(), fileName)
multipartInfos, ok := multiparts[key]
if !ok {
multipartInfos = []*data.MultipartInfo{multipartInfo}
} else {
multipartInfos = append(multipartInfos, multipartInfo)
}
multiparts[key] = multipartInfos
}
result := make([]*data.MultipartInfo, 0, len(multiparts))
for _, multipartInfo := range multiparts {
result = append(result, multipartInfo...)
} }
return result, nil return result, nil