[#1642] tree: Fix sorted getSubtree for multiversion filenames

Signed-off-by: Airat Arifullin <a.arifullin@yadro.com>
This commit is contained in:
Airat Arifullin 2025-02-28 15:47:29 +03:00 committed by Evgenii Stratonikov
parent a11b2d27e4
commit 760b6a44ea
4 changed files with 29 additions and 6 deletions

View file

@ -1128,6 +1128,7 @@ func (t *boltForest) TreeSortedByFilename(ctx context.Context, cid cidSDK.ID, tr
}
t.fillSortedChildren(b, nodeIDs, h)
for info, ok := h.pop(); ok; info, ok = h.pop() {
for _, id := range info.id {
childInfo, err := t.getChildInfo(b, key, id)
@ -1154,7 +1155,7 @@ func (t *boltForest) TreeSortedByFilename(ctx context.Context, cid cidSDK.ID, tr
}
if len(res) != 0 {
s := string(findAttr(res[len(res)-1].Meta, AttributeFilename))
last = NewCursor(s)
last = NewCursor(s, res[len(res)-1].Children[len(res[len(res)-1].Children)-1])
}
return res, last, metaerr.Wrap(err)
}

View file

@ -207,11 +207,11 @@ func (f *memoryForest) TreeSortedByFilename(_ context.Context, cid cid.ID, treeI
if start == nil || string(findAttr(r[i].Meta, AttributeFilename)) > start.GetFilename() {
finish := min(len(res), i+count)
last := string(findAttr(r[finish-1].Meta, AttributeFilename))
return r[i:finish], NewCursor(last), nil
return r[i:finish], NewCursor(last, 0), nil
}
}
last := string(res[len(res)-1].Meta.GetAttr(AttributeFilename))
return nil, NewCursor(last), nil
return nil, NewCursor(last, 0), nil
}
// TreeGetChildren implements the Forest interface.

View file

@ -50,8 +50,19 @@ func newHeap(start *Cursor, count int) *fixedHeap {
const amortizationMultiplier = 5
func (h *fixedHeap) push(id MultiNode, filename string) bool {
if h.start != nil && filename <= (*h.start).GetFilename() {
return false
if h.start != nil {
if filename < h.start.GetFilename() {
return false
} else if filename == h.start.GetFilename() {
// A tree may have a lot of nodes with the same filename but different versions so that
// len(nodes) > batch_size. The cut nodes should be pushed into the result on repeated call
// with the same filename.
pos := slices.Index(id, h.start.GetNode())
if pos == -1 || pos+1 >= len(id) {
return false
}
id = id[pos+1:]
}
}
*h.h = append(*h.h, heapInfo{id: id, filename: filename})

View file

@ -85,11 +85,15 @@ const (
type Cursor struct {
// Last traversed filename.
filename string
// Last traversed node.
node Node
}
func NewCursor(filename string) *Cursor {
func NewCursor(filename string, node Node) *Cursor {
return &Cursor{
filename: filename,
node: node,
}
}
@ -100,6 +104,13 @@ func (c *Cursor) GetFilename() string {
return c.filename
}
func (c *Cursor) GetNode() Node {
if c == nil {
return Node(0)
}
return c.node
}
// CIDDescriptor contains container ID and information about the node position
// in the list of container nodes.
type CIDDescriptor struct {