From e1b9a4432aeee392e0c8f36d79f58163371a50d9 Mon Sep 17 00:00:00 2001 From: Denis Kirillov Date: Tue, 24 May 2022 11:41:10 +0300 Subject: [PATCH] [#417] List multipart uploads using tree service Signed-off-by: Denis Kirillov --- api/data/tree.go | 1 + api/layer/multipart_upload.go | 47 ++----- api/layer/tree_service.go | 3 +- internal/neofs/tree.go | 197 ++++++++++++++++++--------- internal/neofstest/tree/tree_mock.go | 8 ++ 5 files changed, 156 insertions(+), 100 deletions(-) diff --git a/api/data/tree.go b/api/data/tree.go index 91e85a1..8401159 100644 --- a/api/data/tree.go +++ b/api/data/tree.go @@ -45,6 +45,7 @@ type ObjectTaggingInfo struct { // MultipartInfo is multipart upload information. type MultipartInfo struct { + Key string UploadID string Owner user.ID Created time.Time diff --git a/api/layer/multipart_upload.go b/api/layer/multipart_upload.go index e8c74eb..acfcbcf 100644 --- a/api/layer/multipart_upload.go +++ b/api/layer/multipart_upload.go @@ -14,7 +14,6 @@ import ( "github.com/nspcc-dev/neofs-s3-gw/api/data" "github.com/nspcc-dev/neofs-s3-gw/api/errors" "github.com/nspcc-dev/neofs-s3-gw/internal/misc" - "github.com/nspcc-dev/neofs-sdk-go/object" "github.com/nspcc-dev/neofs-sdk-go/user" "go.uber.org/zap" ) @@ -124,6 +123,7 @@ type ( func (n *layer) CreateMultipartUpload(ctx context.Context, p *CreateMultipartParams) error { info := &data.MultipartInfo{ + Key: p.Info.Key, UploadID: p.Info.UploadID, Owner: n.Owner(ctx), Created: time.Now(), @@ -142,7 +142,7 @@ func (n *layer) CreateMultipartUpload(ctx context.Context, p *CreateMultipartPar info.Meta[tagPrefix+key] = val } - return n.treeService.CreateMultipart(ctx, &p.Info.Bkt.CID, p.Info.Key, info) + return n.treeService.CreateMultipart(ctx, &p.Info.Bkt.CID, info) } func (n *layer) UploadPart(ctx context.Context, p *UploadPartParams) (*data.ObjectInfo, error) { @@ -376,29 +376,16 @@ func (n *layer) ListMultipartUploads(ctx context.Context, p *ListMultipartUpload return &result, nil } - f := &findParams{ - attr: [2]string{UploadPartNumberAttributeName, "0"}, - bkt: p.Bkt, - } - - ids, err := n.objectSearch(ctx, f) + multipartInfos, err := n.treeService.GetMultipartUploadsByPrefix(ctx, &p.Bkt.CID, p.Prefix) if err != nil { return nil, err } - uploads := make([]*UploadInfo, 0, len(ids)) + uploads := make([]*UploadInfo, 0, len(multipartInfos)) uniqDirs := make(map[string]struct{}) - for i := range ids { - meta, err := n.objectHead(ctx, p.Bkt, ids[i]) - if err != nil { - n.log.Warn("couldn't head object", - zap.Stringer("object id", &ids[i]), - zap.Stringer("bucket id", p.Bkt.CID), - zap.Error(err)) - continue - } - info := uploadInfoFromMeta(meta, p.Prefix, p.Delimiter) + for _, multipartInfo := range multipartInfos { + info := uploadInfoFromMultipartInfo(multipartInfo, p.Prefix, p.Delimiter) if info != nil { if info.IsDir { if _, ok := uniqDirs[info.Key]; ok { @@ -598,24 +585,14 @@ func trimAfterUploadKey(key string, objects []*UploadInfo) []*UploadInfo { return result } -func uploadInfoFromMeta(meta *object.Object, prefix, delimiter string) *UploadInfo { - var ( - isDir bool - creation time.Time - userHeaders = userHeaders(meta.Attributes()) - key = userHeaders[UploadKeyAttributeName] - ) +func uploadInfoFromMultipartInfo(uploadInfo *data.MultipartInfo, prefix, delimiter string) *UploadInfo { + var isDir bool + key := uploadInfo.Key if !strings.HasPrefix(key, prefix) { return nil } - if val, ok := userHeaders[object.AttributeTimestamp]; ok { - if dt, err := strconv.ParseInt(val, 10, 64); err == nil { - creation = time.Unix(dt, 0) - } - } - if len(delimiter) > 0 { tail := strings.TrimPrefix(key, prefix) index := strings.Index(tail, delimiter) @@ -628,9 +605,9 @@ func uploadInfoFromMeta(meta *object.Object, prefix, delimiter string) *UploadIn return &UploadInfo{ IsDir: isDir, Key: key, - UploadID: userHeaders[UploadIDAttributeName], - Owner: *meta.OwnerID(), - Created: creation, + UploadID: uploadInfo.UploadID, + Owner: uploadInfo.Owner, + Created: uploadInfo.Created, } } diff --git a/api/layer/tree_service.go b/api/layer/tree_service.go index c1507cd..05e747b 100644 --- a/api/layer/tree_service.go +++ b/api/layer/tree_service.go @@ -50,7 +50,8 @@ type TreeService interface { GetSystemVersion(ctx context.Context, cnrID *cid.ID, objectName string) (*data.BaseNodeVersion, error) RemoveSystemVersion(ctx context.Context, cnrID *cid.ID, nodeID uint64) error - CreateMultipart(ctx context.Context, cnrID *cid.ID, objectName string, info *data.MultipartInfo) error + CreateMultipart(ctx context.Context, cnrID *cid.ID, info *data.MultipartInfo) error + GetMultipartUploadsByPrefix(ctx context.Context, cnrID *cid.ID, prefix string) ([]*data.MultipartInfo, error) } // ErrNodeNotFound is returned from Tree service in case of not found error. diff --git a/internal/neofs/tree.go b/internal/neofs/tree.go index 7a62ccf..8da9a78 100644 --- a/internal/neofs/tree.go +++ b/internal/neofs/tree.go @@ -177,6 +177,35 @@ func newNodeVersionFromTreeNode(treeNode *TreeNode) *data.NodeVersion { return version } +func newMultipartInfo(node NodeResponse) (*data.MultipartInfo, error) { + multipartInfo := &data.MultipartInfo{ + Meta: make(map[string]string, len(node.GetMeta())), + } + + for _, kv := range node.GetMeta() { + switch kv.GetKey() { + case uploadIDKV: + multipartInfo.UploadID = string(kv.GetValue()) + case systemNameKV: + multipartInfo.Key = strings.TrimSuffix(string(kv.GetValue()), emptyFileName) + case createdKV: + if utcMilli, err := strconv.ParseInt(string(kv.GetValue()), 10, 64); err == nil { + multipartInfo.Created = time.UnixMilli(utcMilli) + } + case ownerKV: + _ = multipartInfo.Owner.DecodeString(string(kv.GetValue())) + default: + multipartInfo.Meta[kv.GetKey()] = string(kv.GetValue()) + } + } + + if multipartInfo.UploadID == "" { + return nil, fmt.Errorf("it's not a multipart node") + } + + return multipartInfo, nil +} + func (c *TreeClient) GetSettingsNode(ctx context.Context, cnrID *cid.ID) (*data.BucketSettings, error) { keysToReturn := []string{versioningEnabledKV, lockConfigurationKV} node, err := c.getSystemNode(ctx, cnrID, []string{settingsFileName}, keysToReturn) @@ -442,51 +471,49 @@ func pathFromName(objectName string) []string { } func (c *TreeClient) GetLatestVersionsByPrefix(ctx context.Context, cnrID *cid.ID, prefix string) ([]oid.ID, error) { - var rootID uint64 - path := strings.Split(prefix, separator) - tailPrefix := path[len(path)-1] - - if len(path) > 1 { - var err error - rootID, err = c.getPrefixNodeID(ctx, cnrID, path[:len(path)-1]) - if err != nil { - if errors.Is(err, layer.ErrNodeNotFound) { - return nil, nil - } - return nil, err - } - } - - subTree, err := c.getSubTree(ctx, cnrID, versionTree, rootID, 1) + subTreeNodes, err := c.getSubTreeByPrefix(ctx, cnrID, versionTree, prefix) if err != nil { return nil, err } var result []oid.ID - for _, node := range subTree { - if node.GetNodeId() != rootID && hasPrefix(node, tailPrefix) { - latestNodes, err := c.getSubTreeVersions(ctx, cnrID, node.GetNodeId(), true) - if err != nil { - return nil, err - } + for _, node := range subTreeNodes { + latestNodes, err := c.getSubTreeVersions(ctx, cnrID, node.GetNodeId(), true) + if err != nil { + return nil, err + } - for _, latest := range latestNodes { - result = append(result, latest.OID) - } + for _, latest := range latestNodes { + result = append(result, latest.OID) } } return result, nil } -func (c *TreeClient) getPrefixNodeID(ctx context.Context, cnrID *cid.ID, prefixPath []string) (uint64, error) { +func (c *TreeClient) determinePrefixNode(ctx context.Context, cnrID *cid.ID, treeID, prefix string) (uint64, string, error) { + var rootID uint64 + path := strings.Split(prefix, separator) + tailPrefix := path[len(path)-1] + + if len(path) > 1 { + var err error + rootID, err = c.getPrefixNodeID(ctx, cnrID, treeID, path[:len(path)-1]) + if err != nil { + return 0, "", err + } + } + + return rootID, tailPrefix, nil +} + +func (c *TreeClient) getPrefixNodeID(ctx context.Context, cnrID *cid.ID, treeID string, prefixPath []string) (uint64, error) { p := &getNodesParams{ CnrID: cnrID, - TreeID: versionTree, + TreeID: treeID, Path: prefixPath, - Meta: []string{fileNameKV, oidKV}, LatestOnly: false, - AllAttrs: false, + AllAttrs: true, } nodes, err := c.getNodes(ctx, p) if err != nil { @@ -495,7 +522,7 @@ func (c *TreeClient) getPrefixNodeID(ctx context.Context, cnrID *cid.ID, prefixP var intermediateNodes []uint64 for _, node := range nodes { - if !hasOID(node) { + if !isIntermediate(node, pathAttributeFromTreeID(treeID)) { intermediateNodes = append(intermediateNodes, node.GetNodeId()) } } @@ -510,9 +537,33 @@ func (c *TreeClient) getPrefixNodeID(ctx context.Context, cnrID *cid.ID, prefixP return intermediateNodes[0], nil } -func hasPrefix(node *tree.GetSubTreeResponse_Body, prefix string) bool { +func (c *TreeClient) getSubTreeByPrefix(ctx context.Context, cnrID *cid.ID, treeID, prefix string) ([]*tree.GetSubTreeResponse_Body, error) { + rootID, tailPrefix, err := c.determinePrefixNode(ctx, cnrID, treeID, prefix) + if err != nil { + if errors.Is(err, layer.ErrNodeNotFound) { + return nil, nil + } + return nil, err + } + + subTree, err := c.getSubTree(ctx, cnrID, treeID, rootID, 1) + if err != nil { + return nil, err + } + + result := make([]*tree.GetSubTreeResponse_Body, 0, len(subTree)) + for _, node := range subTree { + if node.GetNodeId() != rootID && hasPrefix(node, pathAttributeFromTreeID(treeID), tailPrefix) { + result = append(result, node) + } + } + + return result, nil +} + +func hasPrefix(node *tree.GetSubTreeResponse_Body, key, prefix string) bool { for _, kv := range node.GetMeta() { - if kv.GetKey() == fileNameKV { + if kv.GetKey() == key { return strings.HasPrefix(string(kv.GetValue()), prefix) } } @@ -520,14 +571,12 @@ func hasPrefix(node *tree.GetSubTreeResponse_Body, prefix string) bool { return false } -func hasOID(node *tree.GetNodeByPathResponse_Info) bool { - for _, kv := range node.GetMeta() { - if kv.GetKey() == oidKV { - return true - } +func isIntermediate(node *tree.GetNodeByPathResponse_Info, key string) bool { + if len(node.GetMeta()) != 1 { + return false } - return false + return node.GetMeta()[0].GetKey() == key } func (c *TreeClient) getSubTreeVersions(ctx context.Context, cnrID *cid.ID, nodeID uint64, latestOnly bool) ([]*data.NodeVersion, error) { @@ -578,35 +627,18 @@ func formLatestNodeKey(parentID uint64, fileName string) string { } func (c *TreeClient) GetAllVersionsByPrefix(ctx context.Context, cnrID *cid.ID, prefix string) ([]*data.NodeVersion, error) { - var rootID uint64 - path := strings.Split(prefix, separator) - tailPrefix := path[len(path)-1] - - if len(path) > 1 { - var err error - rootID, err = c.getPrefixNodeID(ctx, cnrID, path[:len(path)-1]) - if err != nil { - if errors.Is(err, layer.ErrNodeNotFound) { - return nil, nil - } - return nil, err - } - } - - subTree, err := c.getSubTree(ctx, cnrID, versionTree, rootID, 1) + subTreeNodes, err := c.getSubTreeByPrefix(ctx, cnrID, versionTree, prefix) if err != nil { return nil, err } var result []*data.NodeVersion - for _, node := range subTree { - if node.GetNodeId() != rootID && hasPrefix(node, tailPrefix) { - versions, err := c.getSubTreeVersions(ctx, cnrID, node.GetNodeId(), false) - if err != nil { - return nil, err - } - result = append(result, versions...) + for _, node := range subTreeNodes { + versions, err := c.getSubTreeVersions(ctx, cnrID, node.GetNodeId(), false) + if err != nil { + return nil, err } + result = append(result, versions...) } return result, nil @@ -688,11 +720,47 @@ func (c *TreeClient) RemoveSystemVersion(ctx context.Context, cnrID *cid.ID, id return c.removeNode(ctx, cnrID, systemTree, id) } -func (c *TreeClient) CreateMultipart(ctx context.Context, cnrID *cid.ID, objectName string, info *data.MultipartInfo) error { - path := pathFromName(objectName) +func (c *TreeClient) CreateMultipart(ctx context.Context, cnrID *cid.ID, info *data.MultipartInfo) error { + path := pathFromName(info.Key) meta := metaFromMultipart(info) - return c.addNodeByPath(ctx, cnrID, systemTree, path, meta) + return c.addNodeByPath(ctx, cnrID, systemTree, path[:len(path)-1], meta) +} + +func (c *TreeClient) GetMultipartUploadsByPrefix(ctx context.Context, cnrID *cid.ID, prefix string) ([]*data.MultipartInfo, error) { + subTreeNodes, err := c.getSubTreeByPrefix(ctx, cnrID, systemTree, prefix) + if err != nil { + return nil, err + } + + var result []*data.MultipartInfo + for _, node := range subTreeNodes { + multipartUploads, err := c.getSubTreeMultipartUploads(ctx, cnrID, node.GetNodeId()) + if err != nil { + return nil, err + } + result = append(result, multipartUploads...) + } + + return result, nil +} + +func (c *TreeClient) getSubTreeMultipartUploads(ctx context.Context, cnrID *cid.ID, nodeID uint64) ([]*data.MultipartInfo, error) { + subTree, err := c.getSubTree(ctx, cnrID, systemTree, nodeID, maxGetSubTreeDepth) + if err != nil { + return nil, err + } + + result := make([]*data.MultipartInfo, 0, len(subTree)) + for _, node := range subTree { + multipartInfo, err := newMultipartInfo(node) + if err != nil { // missed uploadID (it's a part node) + continue + } + result = append(result, multipartInfo) + } + + return result, nil } func (c *TreeClient) Close() error { @@ -839,6 +907,7 @@ func metaFromSettings(settings *data.BucketSettings) map[string]string { } func metaFromMultipart(info *data.MultipartInfo) map[string]string { + info.Meta[systemNameKV] = info.Key info.Meta[uploadIDKV] = info.UploadID info.Meta[ownerKV] = info.Owner.EncodeToString() info.Meta[createdKV] = strconv.FormatInt(info.Created.UTC().UnixMilli(), 10) diff --git a/internal/neofstest/tree/tree_mock.go b/internal/neofstest/tree/tree_mock.go index a396740..ee9af90 100644 --- a/internal/neofstest/tree/tree_mock.go +++ b/internal/neofstest/tree/tree_mock.go @@ -235,3 +235,11 @@ func (t *TreeServiceMock) RemoveSystemVersion(ctx context.Context, cnrID *cid.ID func (t *TreeServiceMock) GetAllVersionsByPrefix(ctx context.Context, cnrID *cid.ID, prefix string) ([]*data.NodeVersion, error) { panic("implement me") } + +func (t *TreeServiceMock) CreateMultipart(ctx context.Context, cnrID *cid.ID, info *data.MultipartInfo) error { + panic("implement me") +} + +func (t *TreeServiceMock) GetMultipartUploadsByPrefix(ctx context.Context, cnrID *cid.ID, prefix string) ([]*data.MultipartInfo, error) { + panic("implement me") +}