[#472] tree: Don't use sorted GetSubTree for nodes without FileName #472

Merged
alexvanin merged 1 commit from dkirillov/frostfs-s3-gw:bugfix/sorted_get_sub_tree into master 2024-08-27 09:43:29 +00:00
4 changed files with 22 additions and 10 deletions

View file

@ -119,7 +119,7 @@ func TestListObjectsVersionsSkipLogTaggingNodesError(t *testing.T) {
} }
func makeAllTreeObjectsOld(hc *handlerContext, bktInfo *data.BucketInfo) { func makeAllTreeObjectsOld(hc *handlerContext, bktInfo *data.BucketInfo) {
nodes, err := hc.treeMock.GetSubTree(hc.Context(), bktInfo, "version", []uint64{0}, 0) nodes, err := hc.treeMock.GetSubTree(hc.Context(), bktInfo, "version", []uint64{0}, 0, true)
require.NoError(hc.t, err) require.NoError(hc.t, err)
for _, node := range nodes { for _, node := range nodes {

View file

@ -102,14 +102,18 @@ func (w *PoolWrapper) GetNodes(ctx context.Context, prm *tree.GetNodesParams) ([
return res, nil return res, nil
} }
func (w *PoolWrapper) GetSubTree(ctx context.Context, bktInfo *data.BucketInfo, treeID string, rootID []uint64, depth uint32) ([]tree.NodeResponse, error) { func (w *PoolWrapper) GetSubTree(ctx context.Context, bktInfo *data.BucketInfo, treeID string, rootID []uint64, depth uint32, sort bool) ([]tree.NodeResponse, error) {
order := treepool.NoneOrder
if sort {
order = treepool.AscendingOrder
}
poolPrm := treepool.GetSubTreeParams{ poolPrm := treepool.GetSubTreeParams{
CID: bktInfo.CID, CID: bktInfo.CID,
TreeID: treeID, TreeID: treeID,
RootID: rootID, RootID: rootID,
Depth: depth, Depth: depth,
BearerToken: getBearer(ctx, bktInfo), BearerToken: getBearer(ctx, bktInfo),
Order: treepool.AscendingOrder, Order: order,
} }
if len(rootID) == 1 && rootID[0] == 0 { if len(rootID) == 1 && rootID[0] == 0 {
// storage node interprets 'nil' value as []uint64{0} // storage node interprets 'nil' value as []uint64{0}

View file

@ -33,7 +33,7 @@ type (
// Each method must return ErrNodeNotFound or ErrNodeAccessDenied if relevant. // Each method must return ErrNodeNotFound or ErrNodeAccessDenied if relevant.
ServiceClient interface { ServiceClient interface {
GetNodes(ctx context.Context, p *GetNodesParams) ([]NodeResponse, error) GetNodes(ctx context.Context, p *GetNodesParams) ([]NodeResponse, error)
GetSubTree(ctx context.Context, bktInfo *data.BucketInfo, treeID string, rootID []uint64, depth uint32) ([]NodeResponse, error) GetSubTree(ctx context.Context, bktInfo *data.BucketInfo, treeID string, rootID []uint64, depth uint32, sort bool) ([]NodeResponse, error)
GetSubTreeStream(ctx context.Context, bktInfo *data.BucketInfo, treeID string, rootID []uint64, depth uint32) (SubTreeStream, error) GetSubTreeStream(ctx context.Context, bktInfo *data.BucketInfo, treeID string, rootID []uint64, depth uint32) (SubTreeStream, error)
alexvanin marked this conversation as resolved
Review

If we going to use streaming later in #469, shouldn't we adopt sort flag in streaming method too? Or it won't be used for anything besides getSubTreeMultipartUpload?

If we going to use streaming later in https://git.frostfs.info/TrueCloudLab/frostfs-s3-gw/issues/469, shouldn't we adopt `sort` flag in streaming method too? Or it won't be used for anything besides `getSubTreeMultipartUpload`?
Review

We should adopt this flag only if we start streaming parts and not only multiparts

We should adopt this flag only if we start streaming parts and not only multiparts
AddNode(ctx context.Context, bktInfo *data.BucketInfo, treeID string, parent uint64, meta map[string]string) (uint64, error) AddNode(ctx context.Context, bktInfo *data.BucketInfo, treeID string, parent uint64, meta map[string]string) (uint64, error)
AddNodeByPath(ctx context.Context, bktInfo *data.BucketInfo, treeID string, path []string, meta map[string]string) (uint64, error) AddNodeByPath(ctx context.Context, bktInfo *data.BucketInfo, treeID string, path []string, meta map[string]string) (uint64, error)
@ -751,7 +751,7 @@ func (c *Tree) getTreeNode(ctx context.Context, bktInfo *data.BucketInfo, nodeID
} }
func (c *Tree) getTreeNodes(ctx context.Context, bktInfo *data.BucketInfo, nodeID uint64, keys ...string) (map[string]*treeNode, error) { func (c *Tree) getTreeNodes(ctx context.Context, bktInfo *data.BucketInfo, nodeID uint64, keys ...string) (map[string]*treeNode, error) {
subtree, err := c.service.GetSubTree(ctx, bktInfo, versionTree, []uint64{nodeID}, 2) subtree, err := c.service.GetSubTree(ctx, bktInfo, versionTree, []uint64{nodeID}, 2, false)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -1147,7 +1147,7 @@ func (c *Tree) getSubTreeByPrefix(ctx context.Context, bktInfo *data.BucketInfo,
return nil, "", err return nil, "", err
} }
subTree, err := c.service.GetSubTree(ctx, bktInfo, treeID, rootID, 2) subTree, err := c.service.GetSubTree(ctx, bktInfo, treeID, rootID, 2, false)
if err != nil { if err != nil {
if errors.Is(err, layer.ErrNodeNotFound) { if errors.Is(err, layer.ErrNodeNotFound) {
return nil, "", nil return nil, "", nil
@ -1305,7 +1305,11 @@ func (c *Tree) GetMultipartUploadsByPrefix(ctx context.Context, bktInfo *data.Bu
} }
func (c *Tree) getSubTreeMultipartUploads(ctx context.Context, bktInfo *data.BucketInfo, nodeID []uint64, parentFilePath string) ([]*data.MultipartInfo, error) { func (c *Tree) getSubTreeMultipartUploads(ctx context.Context, bktInfo *data.BucketInfo, nodeID []uint64, parentFilePath string) ([]*data.MultipartInfo, error) {
subTree, err := c.service.GetSubTree(ctx, bktInfo, systemTree, nodeID, maxGetSubTreeDepth) // sorting in getSubTree leads to skipping nodes that doesn't have FileName attribute
// so when we are only interested in multipart nodes, we can set this flag
// (despite we sort multiparts in above layer anyway)
// to skip its children (parts) that don't have FileName
subTree, err := c.service.GetSubTree(ctx, bktInfo, systemTree, nodeID, maxGetSubTreeDepth, true)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -1394,7 +1398,7 @@ func (c *Tree) GetMultipartUpload(ctx context.Context, bktInfo *data.BucketInfo,
} }
func (c *Tree) AddPart(ctx context.Context, bktInfo *data.BucketInfo, multipartNodeID uint64, info *data.PartInfo) (oldObjIDToDelete oid.ID, err error) { func (c *Tree) AddPart(ctx context.Context, bktInfo *data.BucketInfo, multipartNodeID uint64, info *data.PartInfo) (oldObjIDToDelete oid.ID, err error) {
parts, err := c.service.GetSubTree(ctx, bktInfo, systemTree, []uint64{multipartNodeID}, 2) parts, err := c.service.GetSubTree(ctx, bktInfo, systemTree, []uint64{multipartNodeID}, 2, false)
if err != nil { if err != nil {
return oid.ID{}, err return oid.ID{}, err
} }
@ -1444,7 +1448,7 @@ func (c *Tree) AddPart(ctx context.Context, bktInfo *data.BucketInfo, multipartN
} }
func (c *Tree) GetParts(ctx context.Context, bktInfo *data.BucketInfo, multipartNodeID uint64) ([]*data.PartInfo, error) { func (c *Tree) GetParts(ctx context.Context, bktInfo *data.BucketInfo, multipartNodeID uint64) ([]*data.PartInfo, error) {
parts, err := c.service.GetSubTree(ctx, bktInfo, systemTree, []uint64{multipartNodeID}, 2) parts, err := c.service.GetSubTree(ctx, bktInfo, systemTree, []uint64{multipartNodeID}, 2, false)
if err != nil { if err != nil {
return nil, err return nil, err
} }

View file

@ -234,7 +234,7 @@ func (c *ServiceClientMemory) GetNodes(_ context.Context, p *GetNodesParams) ([]
return res2, nil return res2, nil
} }
func (c *ServiceClientMemory) GetSubTree(_ context.Context, bktInfo *data.BucketInfo, treeID string, rootID []uint64, depth uint32) ([]NodeResponse, error) { func (c *ServiceClientMemory) GetSubTree(_ context.Context, bktInfo *data.BucketInfo, treeID string, rootID []uint64, depth uint32, sort bool) ([]NodeResponse, error) {
cnr, ok := c.containers[bktInfo.CID.EncodeToString()] cnr, ok := c.containers[bktInfo.CID.EncodeToString()]
if !ok { if !ok {
return nil, nil return nil, nil
@ -254,6 +254,10 @@ func (c *ServiceClientMemory) GetSubTree(_ context.Context, bktInfo *data.Bucket
return nil, ErrNodeNotFound return nil, ErrNodeNotFound
} }
if sort {
sortNode(tr.treeData)
}
// we depth-1 in case of uint32 and 0 as mark to get all subtree leads to overflow and depth is getting quite big to walk all tree levels // we depth-1 in case of uint32 and 0 as mark to get all subtree leads to overflow and depth is getting quite big to walk all tree levels
return node.listNodes(nil, depth-1), nil return node.listNodes(nil, depth-1), nil
} }