[#1074] pilorama: Allow empty filenames in SortedByFilename()

Signed-off-by: Evgenii Stratonikov <e.stratonikov@yadro.com>
This commit is contained in:
Evgenii Stratonikov 2024-04-04 10:40:21 +03:00 committed by Evgenii Stratonikov
parent 064e18b277
commit c1d90f018b
8 changed files with 53 additions and 34 deletions

View file

@ -210,7 +210,7 @@ func (e *StorageEngine) TreeGetChildren(ctx context.Context, cid cidSDK.ID, tree
}
// TreeSortedByFilename implements the pilorama.Forest interface.
func (e *StorageEngine) TreeSortedByFilename(ctx context.Context, cid cidSDK.ID, treeID string, nodeID pilorama.Node, last string, count int) ([]pilorama.NodeInfo, string, error) {
func (e *StorageEngine) TreeSortedByFilename(ctx context.Context, cid cidSDK.ID, treeID string, nodeID pilorama.Node, last *string, count int) ([]pilorama.NodeInfo, *string, error) {
ctx, span := tracing.StartSpanFromContext(ctx, "StorageEngine.TreeSortedByFilename",
trace.WithAttributes(
attribute.String("container_id", cid.EncodeToString()),
@ -222,7 +222,7 @@ func (e *StorageEngine) TreeSortedByFilename(ctx context.Context, cid cidSDK.ID,
var err error
var nodes []pilorama.NodeInfo
var cursor string
var cursor *string
for _, sh := range e.sortShards(cid) {
nodes, cursor, err = sh.TreeSortedByFilename(ctx, cid, treeID, nodeID, last, count)
if err != nil {

View file

@ -1003,7 +1003,7 @@ func (t *boltForest) hasFewChildren(b *bbolt.Bucket, nodeID Node, threshold int)
}
// TreeSortedByFilename implements the Forest interface.
func (t *boltForest) TreeSortedByFilename(ctx context.Context, cid cidSDK.ID, treeID string, nodeID Node, last string, count int) ([]NodeInfo, string, error) {
func (t *boltForest) TreeSortedByFilename(ctx context.Context, cid cidSDK.ID, treeID string, nodeID Node, last *string, count int) ([]NodeInfo, *string, error) {
var (
startedAt = time.Now()
success = false
@ -1025,7 +1025,7 @@ func (t *boltForest) TreeSortedByFilename(ctx context.Context, cid cidSDK.ID, tr
defer t.modeMtx.RUnlock()
if t.mode.NoMetabase() {
return nil, "", ErrDegradedMode
return nil, last, ErrDegradedMode
}
h := newHeap(last, count)
@ -1069,20 +1069,25 @@ func (t *boltForest) TreeSortedByFilename(ctx context.Context, cid cidSDK.ID, tr
}
if fewChildren {
result = sortAndCut(result, []byte(last))
result = sortAndCut(result, last)
}
if len(result) != 0 {
last = string(result[len(result)-1].Meta.GetAttr(AttributeFilename))
s := string(result[len(result)-1].Meta.GetAttr(AttributeFilename))
last = &s
}
return result, last, metaerr.Wrap(err)
}
func sortAndCut(result []NodeInfo, last []byte) []NodeInfo {
func sortAndCut(result []NodeInfo, last *string) []NodeInfo {
var lastBytes []byte
if last != nil {
lastBytes = []byte(*last)
}
sort.Slice(result, func(i, j int) bool {
return bytes.Compare(result[i].Meta.GetAttr(AttributeFilename), result[j].Meta.GetAttr(AttributeFilename)) == -1
})
for i := range result {
if bytes.Compare(last, result[i].Meta.GetAttr(AttributeFilename)) == -1 {
if lastBytes == nil || bytes.Compare(lastBytes, result[i].Meta.GetAttr(AttributeFilename)) == -1 {
return result[i:]
}
}

View file

@ -156,11 +156,11 @@ func (f *memoryForest) TreeGetMeta(_ context.Context, cid cid.ID, treeID string,
}
// TreeSortedByFilename implements the Forest interface.
func (f *memoryForest) TreeSortedByFilename(_ context.Context, cid cid.ID, treeID string, nodeID Node, start string, count int) ([]NodeInfo, string, error) {
func (f *memoryForest) TreeSortedByFilename(_ context.Context, cid cid.ID, treeID string, nodeID Node, start *string, count int) ([]NodeInfo, *string, error) {
fullID := cid.String() + "/" + treeID
s, ok := f.treeMap[fullID]
if !ok {
return nil, "", ErrTreeNotFound
return nil, start, ErrTreeNotFound
}
if count == 0 {
return nil, start, nil
@ -169,7 +169,14 @@ func (f *memoryForest) TreeSortedByFilename(_ context.Context, cid cid.ID, treeI
children := s.tree.getChildren(nodeID)
res := make([]NodeInfo, 0, len(children))
for _, childID := range children {
if len(s.infoMap[childID].Meta.GetAttr(AttributeFilename)) == 0 {
var found bool
for _, kv := range s.infoMap[childID].Meta.Items {
if kv.Key == AttributeFilename {
found = true
break
}
}
if !found {
continue
}
res = append(res, NodeInfo{
@ -179,22 +186,24 @@ func (f *memoryForest) TreeSortedByFilename(_ context.Context, cid cid.ID, treeI
})
}
if len(res) == 0 {
return res, "", nil
return res, start, nil
}
sort.Slice(res, func(i, j int) bool {
return bytes.Compare(res[i].Meta.GetAttr(AttributeFilename), res[j].Meta.GetAttr(AttributeFilename)) == -1
})
for i := range res {
if string(res[i].Meta.GetAttr(AttributeFilename)) > start {
if start == nil || string(res[i].Meta.GetAttr(AttributeFilename)) > *start {
finish := i + count
if len(res) < finish {
finish = len(res)
}
return res[i:finish], string(res[finish-1].Meta.GetAttr(AttributeFilename)), nil
last := string(res[finish-1].Meta.GetAttr(AttributeFilename))
return res[i:finish], &last, nil
}
}
return nil, string(res[len(res)-1].Meta.GetAttr(AttributeFilename)), nil
last := string(res[len(res)-1].Meta.GetAttr(AttributeFilename))
return nil, &last, nil
}
// TreeGetChildren implements the Forest interface.

View file

@ -215,7 +215,7 @@ func BenchmarkForestSortedIteration(b *testing.B) {
b.Run(providers[i].name+",root", func(b *testing.B) {
for i := 0; i < b.N; i++ {
res, _, err := f.TreeSortedByFilename(context.Background(), cnr, treeID, RootID, "", 100)
res, _, err := f.TreeSortedByFilename(context.Background(), cnr, treeID, RootID, nil, 100)
if err != nil || len(res) != 100 {
b.Fatalf("err %v, count %d", err, len(res))
}
@ -223,7 +223,7 @@ func BenchmarkForestSortedIteration(b *testing.B) {
})
b.Run(providers[i].name+",leaf", func(b *testing.B) {
for i := 0; i < b.N; i++ {
res, _, err := f.TreeSortedByFilename(context.Background(), cnr, treeID, 1, "", 100)
res, _, err := f.TreeSortedByFilename(context.Background(), cnr, treeID, 1, nil, 100)
if err != nil || len(res) != 0 {
b.FailNow()
}
@ -246,14 +246,14 @@ func testForestTreeSortedIteration(t *testing.T, s ForestStorage) {
cid := cidtest.ID()
d := CIDDescriptor{cid, 0, 1}
treeID := "version"
treeAdd := func(t *testing.T, ts int) {
treeAdd := func(t *testing.T, ts int, filename string) {
_, err := s.TreeMove(context.Background(), d, treeID, &Move{
Child: RootID + uint64(ts),
Parent: RootID,
Meta: Meta{
Time: Timestamp(ts),
Items: []KeyValue{
{Key: AttributeFilename, Value: []byte(strconv.Itoa(ts))},
{Key: AttributeFilename, Value: []byte(filename)},
},
},
})
@ -261,19 +261,20 @@ func testForestTreeSortedIteration(t *testing.T, s ForestStorage) {
}
const count = 9
for i := 0; i < count; i++ {
treeAdd(t, i+1)
treeAdd(t, 1, "")
for i := 1; i < count; i++ {
treeAdd(t, i+1, strconv.Itoa(i+1))
}
var result []NodeInfo
treeAppend := func(t *testing.T, last string, count int) string {
treeAppend := func(t *testing.T, last *string, count int) *string {
res, cursor, err := s.TreeSortedByFilename(context.Background(), d.CID, treeID, RootID, last, count)
require.NoError(t, err)
result = append(result, res...)
return cursor
}
last := treeAppend(t, "", 2)
last := treeAppend(t, nil, 2)
last = treeAppend(t, last, 3)
last = treeAppend(t, last, 0)
last = treeAppend(t, last, 1)
@ -282,7 +283,11 @@ func testForestTreeSortedIteration(t *testing.T, s ForestStorage) {
require.Len(t, result, count)
for i := range result {
require.Equal(t, RootID+uint64(i+1), result[i].ID)
require.Equal(t, strconv.Itoa(RootID+i+1), string(result[i].Meta.GetAttr(AttributeFilename)))
if i == 0 {
require.Equal(t, "", string(result[i].Meta.GetAttr(AttributeFilename)))
} else {
require.Equal(t, strconv.Itoa(RootID+i+1), string(result[i].Meta.GetAttr(AttributeFilename)))
}
}
}
@ -341,7 +346,7 @@ func testForestTreeSortedByFilename(t *testing.T, s ForestStorage) {
}
getChildren := func(t *testing.T, id Node) []NodeInfo {
res, _, err := s.TreeSortedByFilename(context.Background(), d.CID, treeID, id, "", len(items))
res, _, err := s.TreeSortedByFilename(context.Background(), d.CID, treeID, id, nil, len(items))
require.NoError(t, err)
return res
}

View file

@ -28,13 +28,13 @@ func (h *filenameHeap) Pop() any {
// fixedHeap maintains a fixed number of smallest elements started at some point.
type fixedHeap struct {
start string
start *string
max string
count int
h *filenameHeap
}
func newHeap(start string, count int) *fixedHeap {
func newHeap(start *string, count int) *fixedHeap {
h := new(filenameHeap)
heap.Init(h)
@ -47,7 +47,7 @@ func newHeap(start string, count int) *fixedHeap {
}
func (h *fixedHeap) push(id Node, filename string) bool {
if filename == "" || filename <= h.start {
if h.start != nil && filename <= *h.start {
return false
}
heap.Push(h.h, heapInfo{id: id, filename: filename})

View file

@ -35,7 +35,7 @@ type Forest interface {
TreeGetChildren(ctx context.Context, cid cidSDK.ID, treeID string, nodeID Node) ([]NodeInfo, error)
// TreeSortedByFilename returns children of the node with the specified ID. The nodes are sorted by the filename attribute..
// Should return ErrTreeNotFound if the tree is not found, and empty result if the node is not in the tree.
TreeSortedByFilename(ctx context.Context, cid cidSDK.ID, treeID string, nodeID Node, last string, count int) ([]NodeInfo, string, error)
TreeSortedByFilename(ctx context.Context, cid cidSDK.ID, treeID string, nodeID Node, last *string, count int) ([]NodeInfo, *string, error)
// TreeGetOpLog returns first log operation stored at or above the height.
// In case no such operation is found, empty Move and nil error should be returned.
TreeGetOpLog(ctx context.Context, cid cidSDK.ID, treeID string, height uint64) (Move, error)

View file

@ -184,7 +184,7 @@ func (s *Shard) TreeGetChildren(ctx context.Context, cid cidSDK.ID, treeID strin
}
// TreeSortedByFilename implements the pilorama.Forest interface.
func (s *Shard) TreeSortedByFilename(ctx context.Context, cid cidSDK.ID, treeID string, nodeID pilorama.Node, last string, count int) ([]pilorama.NodeInfo, string, error) {
func (s *Shard) TreeSortedByFilename(ctx context.Context, cid cidSDK.ID, treeID string, nodeID pilorama.Node, last *string, count int) ([]pilorama.NodeInfo, *string, error) {
ctx, span := tracing.StartSpanFromContext(ctx, "Shard.TreeSortedByFilename",
trace.WithAttributes(
attribute.String("shard_id", s.ID().String()),
@ -196,14 +196,14 @@ func (s *Shard) TreeSortedByFilename(ctx context.Context, cid cidSDK.ID, treeID
defer span.End()
if s.pilorama == nil {
return nil, "", ErrPiloramaDisabled
return nil, last, ErrPiloramaDisabled
}
s.m.RLock()
defer s.m.RUnlock()
if s.info.Mode.NoMetabase() {
return nil, "", ErrDegradedMode
return nil, last, ErrDegradedMode
}
return s.pilorama.TreeSortedByFilename(ctx, cid, treeID, nodeID, last, count)
}

View file

@ -446,7 +446,7 @@ func getSortedSubTree(ctx context.Context, srv TreeService_GetSubTreeServer, cid
type stackItem struct {
values []pilorama.NodeInfo
parent pilorama.Node
last string
last *string
}
// Traverse the tree in a DFS manner. Because we need to support arbitrary depth,
@ -502,7 +502,7 @@ func getSortedSubTree(ctx context.Context, srv TreeService_GetSubTreeServer, cid
}
if b.GetDepth() == 0 || uint32(len(stack)) < b.GetDepth() {
children, last, err := forest.TreeSortedByFilename(ctx, cid, b.GetTreeId(), node.ID, "", batchSize)
children, last, err := forest.TreeSortedByFilename(ctx, cid, b.GetTreeId(), node.ID, nil, batchSize)
if err != nil {
return err
}