forked from TrueCloudLab/frostfs-s3-gw
[#417] List multipart uploads using tree service
Signed-off-by: Denis Kirillov <denis@nspcc.ru>
This commit is contained in:
parent
13e01164d7
commit
e1b9a4432a
5 changed files with 156 additions and 100 deletions
|
@ -45,6 +45,7 @@ type ObjectTaggingInfo struct {
|
|||
|
||||
// MultipartInfo is multipart upload information.
|
||||
type MultipartInfo struct {
|
||||
Key string
|
||||
UploadID string
|
||||
Owner user.ID
|
||||
Created time.Time
|
||||
|
|
|
@ -14,7 +14,6 @@ import (
|
|||
"github.com/nspcc-dev/neofs-s3-gw/api/data"
|
||||
"github.com/nspcc-dev/neofs-s3-gw/api/errors"
|
||||
"github.com/nspcc-dev/neofs-s3-gw/internal/misc"
|
||||
"github.com/nspcc-dev/neofs-sdk-go/object"
|
||||
"github.com/nspcc-dev/neofs-sdk-go/user"
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
@ -124,6 +123,7 @@ type (
|
|||
|
||||
func (n *layer) CreateMultipartUpload(ctx context.Context, p *CreateMultipartParams) error {
|
||||
info := &data.MultipartInfo{
|
||||
Key: p.Info.Key,
|
||||
UploadID: p.Info.UploadID,
|
||||
Owner: n.Owner(ctx),
|
||||
Created: time.Now(),
|
||||
|
@ -142,7 +142,7 @@ func (n *layer) CreateMultipartUpload(ctx context.Context, p *CreateMultipartPar
|
|||
info.Meta[tagPrefix+key] = val
|
||||
}
|
||||
|
||||
return n.treeService.CreateMultipart(ctx, &p.Info.Bkt.CID, p.Info.Key, info)
|
||||
return n.treeService.CreateMultipart(ctx, &p.Info.Bkt.CID, info)
|
||||
}
|
||||
|
||||
func (n *layer) UploadPart(ctx context.Context, p *UploadPartParams) (*data.ObjectInfo, error) {
|
||||
|
@ -376,29 +376,16 @@ func (n *layer) ListMultipartUploads(ctx context.Context, p *ListMultipartUpload
|
|||
return &result, nil
|
||||
}
|
||||
|
||||
f := &findParams{
|
||||
attr: [2]string{UploadPartNumberAttributeName, "0"},
|
||||
bkt: p.Bkt,
|
||||
}
|
||||
|
||||
ids, err := n.objectSearch(ctx, f)
|
||||
multipartInfos, err := n.treeService.GetMultipartUploadsByPrefix(ctx, &p.Bkt.CID, p.Prefix)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
uploads := make([]*UploadInfo, 0, len(ids))
|
||||
uploads := make([]*UploadInfo, 0, len(multipartInfos))
|
||||
uniqDirs := make(map[string]struct{})
|
||||
|
||||
for i := range ids {
|
||||
meta, err := n.objectHead(ctx, p.Bkt, ids[i])
|
||||
if err != nil {
|
||||
n.log.Warn("couldn't head object",
|
||||
zap.Stringer("object id", &ids[i]),
|
||||
zap.Stringer("bucket id", p.Bkt.CID),
|
||||
zap.Error(err))
|
||||
continue
|
||||
}
|
||||
info := uploadInfoFromMeta(meta, p.Prefix, p.Delimiter)
|
||||
for _, multipartInfo := range multipartInfos {
|
||||
info := uploadInfoFromMultipartInfo(multipartInfo, p.Prefix, p.Delimiter)
|
||||
if info != nil {
|
||||
if info.IsDir {
|
||||
if _, ok := uniqDirs[info.Key]; ok {
|
||||
|
@ -598,24 +585,14 @@ func trimAfterUploadKey(key string, objects []*UploadInfo) []*UploadInfo {
|
|||
return result
|
||||
}
|
||||
|
||||
func uploadInfoFromMeta(meta *object.Object, prefix, delimiter string) *UploadInfo {
|
||||
var (
|
||||
isDir bool
|
||||
creation time.Time
|
||||
userHeaders = userHeaders(meta.Attributes())
|
||||
key = userHeaders[UploadKeyAttributeName]
|
||||
)
|
||||
func uploadInfoFromMultipartInfo(uploadInfo *data.MultipartInfo, prefix, delimiter string) *UploadInfo {
|
||||
var isDir bool
|
||||
key := uploadInfo.Key
|
||||
|
||||
if !strings.HasPrefix(key, prefix) {
|
||||
return nil
|
||||
}
|
||||
|
||||
if val, ok := userHeaders[object.AttributeTimestamp]; ok {
|
||||
if dt, err := strconv.ParseInt(val, 10, 64); err == nil {
|
||||
creation = time.Unix(dt, 0)
|
||||
}
|
||||
}
|
||||
|
||||
if len(delimiter) > 0 {
|
||||
tail := strings.TrimPrefix(key, prefix)
|
||||
index := strings.Index(tail, delimiter)
|
||||
|
@ -628,9 +605,9 @@ func uploadInfoFromMeta(meta *object.Object, prefix, delimiter string) *UploadIn
|
|||
return &UploadInfo{
|
||||
IsDir: isDir,
|
||||
Key: key,
|
||||
UploadID: userHeaders[UploadIDAttributeName],
|
||||
Owner: *meta.OwnerID(),
|
||||
Created: creation,
|
||||
UploadID: uploadInfo.UploadID,
|
||||
Owner: uploadInfo.Owner,
|
||||
Created: uploadInfo.Created,
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -50,7 +50,8 @@ type TreeService interface {
|
|||
GetSystemVersion(ctx context.Context, cnrID *cid.ID, objectName string) (*data.BaseNodeVersion, error)
|
||||
RemoveSystemVersion(ctx context.Context, cnrID *cid.ID, nodeID uint64) error
|
||||
|
||||
CreateMultipart(ctx context.Context, cnrID *cid.ID, objectName string, info *data.MultipartInfo) error
|
||||
CreateMultipart(ctx context.Context, cnrID *cid.ID, info *data.MultipartInfo) error
|
||||
GetMultipartUploadsByPrefix(ctx context.Context, cnrID *cid.ID, prefix string) ([]*data.MultipartInfo, error)
|
||||
}
|
||||
|
||||
// ErrNodeNotFound is returned from Tree service in case of not found error.
|
||||
|
|
|
@ -177,6 +177,35 @@ func newNodeVersionFromTreeNode(treeNode *TreeNode) *data.NodeVersion {
|
|||
return version
|
||||
}
|
||||
|
||||
func newMultipartInfo(node NodeResponse) (*data.MultipartInfo, error) {
|
||||
multipartInfo := &data.MultipartInfo{
|
||||
Meta: make(map[string]string, len(node.GetMeta())),
|
||||
}
|
||||
|
||||
for _, kv := range node.GetMeta() {
|
||||
switch kv.GetKey() {
|
||||
case uploadIDKV:
|
||||
multipartInfo.UploadID = string(kv.GetValue())
|
||||
case systemNameKV:
|
||||
multipartInfo.Key = strings.TrimSuffix(string(kv.GetValue()), emptyFileName)
|
||||
case createdKV:
|
||||
if utcMilli, err := strconv.ParseInt(string(kv.GetValue()), 10, 64); err == nil {
|
||||
multipartInfo.Created = time.UnixMilli(utcMilli)
|
||||
}
|
||||
case ownerKV:
|
||||
_ = multipartInfo.Owner.DecodeString(string(kv.GetValue()))
|
||||
default:
|
||||
multipartInfo.Meta[kv.GetKey()] = string(kv.GetValue())
|
||||
}
|
||||
}
|
||||
|
||||
if multipartInfo.UploadID == "" {
|
||||
return nil, fmt.Errorf("it's not a multipart node")
|
||||
}
|
||||
|
||||
return multipartInfo, nil
|
||||
}
|
||||
|
||||
func (c *TreeClient) GetSettingsNode(ctx context.Context, cnrID *cid.ID) (*data.BucketSettings, error) {
|
||||
keysToReturn := []string{versioningEnabledKV, lockConfigurationKV}
|
||||
node, err := c.getSystemNode(ctx, cnrID, []string{settingsFileName}, keysToReturn)
|
||||
|
@ -442,29 +471,13 @@ func pathFromName(objectName string) []string {
|
|||
}
|
||||
|
||||
func (c *TreeClient) GetLatestVersionsByPrefix(ctx context.Context, cnrID *cid.ID, prefix string) ([]oid.ID, error) {
|
||||
var rootID uint64
|
||||
path := strings.Split(prefix, separator)
|
||||
tailPrefix := path[len(path)-1]
|
||||
|
||||
if len(path) > 1 {
|
||||
var err error
|
||||
rootID, err = c.getPrefixNodeID(ctx, cnrID, path[:len(path)-1])
|
||||
if err != nil {
|
||||
if errors.Is(err, layer.ErrNodeNotFound) {
|
||||
return nil, nil
|
||||
}
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
|
||||
subTree, err := c.getSubTree(ctx, cnrID, versionTree, rootID, 1)
|
||||
subTreeNodes, err := c.getSubTreeByPrefix(ctx, cnrID, versionTree, prefix)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
var result []oid.ID
|
||||
for _, node := range subTree {
|
||||
if node.GetNodeId() != rootID && hasPrefix(node, tailPrefix) {
|
||||
for _, node := range subTreeNodes {
|
||||
latestNodes, err := c.getSubTreeVersions(ctx, cnrID, node.GetNodeId(), true)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
|
@ -474,19 +487,33 @@ func (c *TreeClient) GetLatestVersionsByPrefix(ctx context.Context, cnrID *cid.I
|
|||
result = append(result, latest.OID)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return result, nil
|
||||
}
|
||||
|
||||
func (c *TreeClient) getPrefixNodeID(ctx context.Context, cnrID *cid.ID, prefixPath []string) (uint64, error) {
|
||||
func (c *TreeClient) determinePrefixNode(ctx context.Context, cnrID *cid.ID, treeID, prefix string) (uint64, string, error) {
|
||||
var rootID uint64
|
||||
path := strings.Split(prefix, separator)
|
||||
tailPrefix := path[len(path)-1]
|
||||
|
||||
if len(path) > 1 {
|
||||
var err error
|
||||
rootID, err = c.getPrefixNodeID(ctx, cnrID, treeID, path[:len(path)-1])
|
||||
if err != nil {
|
||||
return 0, "", err
|
||||
}
|
||||
}
|
||||
|
||||
return rootID, tailPrefix, nil
|
||||
}
|
||||
|
||||
func (c *TreeClient) getPrefixNodeID(ctx context.Context, cnrID *cid.ID, treeID string, prefixPath []string) (uint64, error) {
|
||||
p := &getNodesParams{
|
||||
CnrID: cnrID,
|
||||
TreeID: versionTree,
|
||||
TreeID: treeID,
|
||||
Path: prefixPath,
|
||||
Meta: []string{fileNameKV, oidKV},
|
||||
LatestOnly: false,
|
||||
AllAttrs: false,
|
||||
AllAttrs: true,
|
||||
}
|
||||
nodes, err := c.getNodes(ctx, p)
|
||||
if err != nil {
|
||||
|
@ -495,7 +522,7 @@ func (c *TreeClient) getPrefixNodeID(ctx context.Context, cnrID *cid.ID, prefixP
|
|||
|
||||
var intermediateNodes []uint64
|
||||
for _, node := range nodes {
|
||||
if !hasOID(node) {
|
||||
if !isIntermediate(node, pathAttributeFromTreeID(treeID)) {
|
||||
intermediateNodes = append(intermediateNodes, node.GetNodeId())
|
||||
}
|
||||
}
|
||||
|
@ -510,9 +537,33 @@ func (c *TreeClient) getPrefixNodeID(ctx context.Context, cnrID *cid.ID, prefixP
|
|||
return intermediateNodes[0], nil
|
||||
}
|
||||
|
||||
func hasPrefix(node *tree.GetSubTreeResponse_Body, prefix string) bool {
|
||||
func (c *TreeClient) getSubTreeByPrefix(ctx context.Context, cnrID *cid.ID, treeID, prefix string) ([]*tree.GetSubTreeResponse_Body, error) {
|
||||
rootID, tailPrefix, err := c.determinePrefixNode(ctx, cnrID, treeID, prefix)
|
||||
if err != nil {
|
||||
if errors.Is(err, layer.ErrNodeNotFound) {
|
||||
return nil, nil
|
||||
}
|
||||
return nil, err
|
||||
}
|
||||
|
||||
subTree, err := c.getSubTree(ctx, cnrID, treeID, rootID, 1)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
result := make([]*tree.GetSubTreeResponse_Body, 0, len(subTree))
|
||||
for _, node := range subTree {
|
||||
if node.GetNodeId() != rootID && hasPrefix(node, pathAttributeFromTreeID(treeID), tailPrefix) {
|
||||
result = append(result, node)
|
||||
}
|
||||
}
|
||||
|
||||
return result, nil
|
||||
}
|
||||
|
||||
func hasPrefix(node *tree.GetSubTreeResponse_Body, key, prefix string) bool {
|
||||
for _, kv := range node.GetMeta() {
|
||||
if kv.GetKey() == fileNameKV {
|
||||
if kv.GetKey() == key {
|
||||
return strings.HasPrefix(string(kv.GetValue()), prefix)
|
||||
}
|
||||
}
|
||||
|
@ -520,14 +571,12 @@ func hasPrefix(node *tree.GetSubTreeResponse_Body, prefix string) bool {
|
|||
return false
|
||||
}
|
||||
|
||||
func hasOID(node *tree.GetNodeByPathResponse_Info) bool {
|
||||
for _, kv := range node.GetMeta() {
|
||||
if kv.GetKey() == oidKV {
|
||||
return true
|
||||
}
|
||||
func isIntermediate(node *tree.GetNodeByPathResponse_Info, key string) bool {
|
||||
if len(node.GetMeta()) != 1 {
|
||||
return false
|
||||
}
|
||||
|
||||
return false
|
||||
return node.GetMeta()[0].GetKey() == key
|
||||
}
|
||||
|
||||
func (c *TreeClient) getSubTreeVersions(ctx context.Context, cnrID *cid.ID, nodeID uint64, latestOnly bool) ([]*data.NodeVersion, error) {
|
||||
|
@ -578,36 +627,19 @@ func formLatestNodeKey(parentID uint64, fileName string) string {
|
|||
}
|
||||
|
||||
func (c *TreeClient) GetAllVersionsByPrefix(ctx context.Context, cnrID *cid.ID, prefix string) ([]*data.NodeVersion, error) {
|
||||
var rootID uint64
|
||||
path := strings.Split(prefix, separator)
|
||||
tailPrefix := path[len(path)-1]
|
||||
|
||||
if len(path) > 1 {
|
||||
var err error
|
||||
rootID, err = c.getPrefixNodeID(ctx, cnrID, path[:len(path)-1])
|
||||
if err != nil {
|
||||
if errors.Is(err, layer.ErrNodeNotFound) {
|
||||
return nil, nil
|
||||
}
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
|
||||
subTree, err := c.getSubTree(ctx, cnrID, versionTree, rootID, 1)
|
||||
subTreeNodes, err := c.getSubTreeByPrefix(ctx, cnrID, versionTree, prefix)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
var result []*data.NodeVersion
|
||||
for _, node := range subTree {
|
||||
if node.GetNodeId() != rootID && hasPrefix(node, tailPrefix) {
|
||||
for _, node := range subTreeNodes {
|
||||
versions, err := c.getSubTreeVersions(ctx, cnrID, node.GetNodeId(), false)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
result = append(result, versions...)
|
||||
}
|
||||
}
|
||||
|
||||
return result, nil
|
||||
}
|
||||
|
@ -688,11 +720,47 @@ func (c *TreeClient) RemoveSystemVersion(ctx context.Context, cnrID *cid.ID, id
|
|||
return c.removeNode(ctx, cnrID, systemTree, id)
|
||||
}
|
||||
|
||||
func (c *TreeClient) CreateMultipart(ctx context.Context, cnrID *cid.ID, objectName string, info *data.MultipartInfo) error {
|
||||
path := pathFromName(objectName)
|
||||
func (c *TreeClient) CreateMultipart(ctx context.Context, cnrID *cid.ID, info *data.MultipartInfo) error {
|
||||
path := pathFromName(info.Key)
|
||||
meta := metaFromMultipart(info)
|
||||
|
||||
return c.addNodeByPath(ctx, cnrID, systemTree, path, meta)
|
||||
return c.addNodeByPath(ctx, cnrID, systemTree, path[:len(path)-1], meta)
|
||||
}
|
||||
|
||||
func (c *TreeClient) GetMultipartUploadsByPrefix(ctx context.Context, cnrID *cid.ID, prefix string) ([]*data.MultipartInfo, error) {
|
||||
subTreeNodes, err := c.getSubTreeByPrefix(ctx, cnrID, systemTree, prefix)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
var result []*data.MultipartInfo
|
||||
for _, node := range subTreeNodes {
|
||||
multipartUploads, err := c.getSubTreeMultipartUploads(ctx, cnrID, node.GetNodeId())
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
result = append(result, multipartUploads...)
|
||||
}
|
||||
|
||||
return result, nil
|
||||
}
|
||||
|
||||
func (c *TreeClient) getSubTreeMultipartUploads(ctx context.Context, cnrID *cid.ID, nodeID uint64) ([]*data.MultipartInfo, error) {
|
||||
subTree, err := c.getSubTree(ctx, cnrID, systemTree, nodeID, maxGetSubTreeDepth)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
result := make([]*data.MultipartInfo, 0, len(subTree))
|
||||
for _, node := range subTree {
|
||||
multipartInfo, err := newMultipartInfo(node)
|
||||
if err != nil { // missed uploadID (it's a part node)
|
||||
continue
|
||||
}
|
||||
result = append(result, multipartInfo)
|
||||
}
|
||||
|
||||
return result, nil
|
||||
}
|
||||
|
||||
func (c *TreeClient) Close() error {
|
||||
|
@ -839,6 +907,7 @@ func metaFromSettings(settings *data.BucketSettings) map[string]string {
|
|||
}
|
||||
|
||||
func metaFromMultipart(info *data.MultipartInfo) map[string]string {
|
||||
info.Meta[systemNameKV] = info.Key
|
||||
info.Meta[uploadIDKV] = info.UploadID
|
||||
info.Meta[ownerKV] = info.Owner.EncodeToString()
|
||||
info.Meta[createdKV] = strconv.FormatInt(info.Created.UTC().UnixMilli(), 10)
|
||||
|
|
|
@ -235,3 +235,11 @@ func (t *TreeServiceMock) RemoveSystemVersion(ctx context.Context, cnrID *cid.ID
|
|||
func (t *TreeServiceMock) GetAllVersionsByPrefix(ctx context.Context, cnrID *cid.ID, prefix string) ([]*data.NodeVersion, error) {
|
||||
panic("implement me")
|
||||
}
|
||||
|
||||
func (t *TreeServiceMock) CreateMultipart(ctx context.Context, cnrID *cid.ID, info *data.MultipartInfo) error {
|
||||
panic("implement me")
|
||||
}
|
||||
|
||||
func (t *TreeServiceMock) GetMultipartUploadsByPrefix(ctx context.Context, cnrID *cid.ID, prefix string) ([]*data.MultipartInfo, error) {
|
||||
panic("implement me")
|
||||
}
|
||||
|
|
Loading…
Reference in a new issue