[#471] tree: Don't use sorted GetSubTree for nodes without FileName #471

Merged
alexvanin merged 1 commit from dkirillov/frostfs-s3-gw:bugfix/sorted_get_sub_tree_support into support/v0.30 2024-08-27 09:41:50 +00:00
4 changed files with 22 additions and 10 deletions
Showing only changes of commit a7ae88e1fb - Show all commits

View file

@ -94,7 +94,7 @@ func TestListObjectsWithOldTreeNodes(t *testing.T) {
}
func makeAllTreeObjectsOld(hc *handlerContext, bktInfo *data.BucketInfo) {
nodes, err := hc.treeMock.GetSubTree(hc.Context(), bktInfo, "version", []uint64{0}, 0)
nodes, err := hc.treeMock.GetSubTree(hc.Context(), bktInfo, "version", []uint64{0}, 0, true)
require.NoError(hc.t, err)
for _, node := range nodes {

View file

@ -102,14 +102,18 @@ func (w *PoolWrapper) GetNodes(ctx context.Context, prm *tree.GetNodesParams) ([
return res, nil
}
func (w *PoolWrapper) GetSubTree(ctx context.Context, bktInfo *data.BucketInfo, treeID string, rootID []uint64, depth uint32) ([]tree.NodeResponse, error) {
func (w *PoolWrapper) GetSubTree(ctx context.Context, bktInfo *data.BucketInfo, treeID string, rootID []uint64, depth uint32, sort bool) ([]tree.NodeResponse, error) {
order := treepool.NoneOrder
if sort {
order = treepool.AscendingOrder
}
poolPrm := treepool.GetSubTreeParams{
CID: bktInfo.CID,
TreeID: treeID,
RootID: rootID,
Depth: depth,
BearerToken: getBearer(ctx, bktInfo),
Order: treepool.AscendingOrder,
Order: order,
}
if len(rootID) == 1 && rootID[0] == 0 {
// storage node interprets 'nil' value as []uint64{0}

View file

@ -32,7 +32,7 @@ type (
// Each method must return ErrNodeNotFound or ErrNodeAccessDenied if relevant.
ServiceClient interface {
GetNodes(ctx context.Context, p *GetNodesParams) ([]NodeResponse, error)
GetSubTree(ctx context.Context, bktInfo *data.BucketInfo, treeID string, rootID []uint64, depth uint32) ([]NodeResponse, error)
GetSubTree(ctx context.Context, bktInfo *data.BucketInfo, treeID string, rootID []uint64, depth uint32, sort bool) ([]NodeResponse, error)
GetSubTreeStream(ctx context.Context, bktInfo *data.BucketInfo, treeID string, rootID []uint64, depth uint32) (SubTreeStream, error)
AddNode(ctx context.Context, bktInfo *data.BucketInfo, treeID string, parent uint64, meta map[string]string) (uint64, error)
AddNodeByPath(ctx context.Context, bktInfo *data.BucketInfo, treeID string, path []string, meta map[string]string) (uint64, error)
@ -699,7 +699,7 @@ func (c *Tree) getTreeNode(ctx context.Context, bktInfo *data.BucketInfo, nodeID
}
func (c *Tree) getTreeNodes(ctx context.Context, bktInfo *data.BucketInfo, nodeID uint64, keys ...string) (map[string]*treeNode, error) {
subtree, err := c.service.GetSubTree(ctx, bktInfo, versionTree, []uint64{nodeID}, 2)
subtree, err := c.service.GetSubTree(ctx, bktInfo, versionTree, []uint64{nodeID}, 2, false)
if err != nil {
return nil, err
}
@ -1093,7 +1093,7 @@ func (c *Tree) getSubTreeByPrefix(ctx context.Context, bktInfo *data.BucketInfo,
return nil, "", err
}
subTree, err := c.service.GetSubTree(ctx, bktInfo, treeID, rootID, 2)
subTree, err := c.service.GetSubTree(ctx, bktInfo, treeID, rootID, 2, false)
if err != nil {
if errors.Is(err, layer.ErrNodeNotFound) {
return nil, "", nil
@ -1251,7 +1251,11 @@ func (c *Tree) GetMultipartUploadsByPrefix(ctx context.Context, bktInfo *data.Bu
}
func (c *Tree) getSubTreeMultipartUploads(ctx context.Context, bktInfo *data.BucketInfo, nodeID []uint64, parentFilePath string) ([]*data.MultipartInfo, error) {
subTree, err := c.service.GetSubTree(ctx, bktInfo, systemTree, nodeID, maxGetSubTreeDepth)
// sorting in getSubTree leads to skipping nodes that doesn't have FileName attribute
// so when we are only interested in multipart nodes, we can set this flag
// (despite we sort multiparts in above layer anyway)
// to skip its children (parts) that don't have FileName
subTree, err := c.service.GetSubTree(ctx, bktInfo, systemTree, nodeID, maxGetSubTreeDepth, true)
if err != nil {
return nil, err
}
@ -1340,7 +1344,7 @@ func (c *Tree) GetMultipartUpload(ctx context.Context, bktInfo *data.BucketInfo,
}
func (c *Tree) AddPart(ctx context.Context, bktInfo *data.BucketInfo, multipartNodeID uint64, info *data.PartInfo) (oldObjIDToDelete oid.ID, err error) {
parts, err := c.service.GetSubTree(ctx, bktInfo, systemTree, []uint64{multipartNodeID}, 2)
parts, err := c.service.GetSubTree(ctx, bktInfo, systemTree, []uint64{multipartNodeID}, 2, false)
if err != nil {
return oid.ID{}, err
}
@ -1390,7 +1394,7 @@ func (c *Tree) AddPart(ctx context.Context, bktInfo *data.BucketInfo, multipartN
}
func (c *Tree) GetParts(ctx context.Context, bktInfo *data.BucketInfo, multipartNodeID uint64) ([]*data.PartInfo, error) {
parts, err := c.service.GetSubTree(ctx, bktInfo, systemTree, []uint64{multipartNodeID}, 2)
parts, err := c.service.GetSubTree(ctx, bktInfo, systemTree, []uint64{multipartNodeID}, 2, false)
if err != nil {
return nil, err
}

View file

@ -234,7 +234,7 @@ func (c *ServiceClientMemory) GetNodes(_ context.Context, p *GetNodesParams) ([]
return res2, nil
}
func (c *ServiceClientMemory) GetSubTree(_ context.Context, bktInfo *data.BucketInfo, treeID string, rootID []uint64, depth uint32) ([]NodeResponse, error) {
func (c *ServiceClientMemory) GetSubTree(_ context.Context, bktInfo *data.BucketInfo, treeID string, rootID []uint64, depth uint32, sort bool) ([]NodeResponse, error) {
cnr, ok := c.containers[bktInfo.CID.EncodeToString()]
if !ok {
return nil, nil
@ -254,6 +254,10 @@ func (c *ServiceClientMemory) GetSubTree(_ context.Context, bktInfo *data.Bucket
return nil, ErrNodeNotFound
}
if sort {
sortNode(tr.treeData)
}
// we depth-1 in case of uint32 and 0 as mark to get all subtree leads to overflow and depth is getting quite big to walk all tree levels
return node.listNodes(nil, depth-1), nil
}