[#1074] pilorama: Allow empty filenames in SortedByFilename()
All checks were successful
DCO action / DCO (pull_request) Successful in 5m38s
Vulncheck / Vulncheck (pull_request) Successful in 5m57s
Tests and linters / gopls check (pull_request) Successful in 6m57s
Build / Build Components (1.22) (pull_request) Successful in 6m48s
Build / Build Components (1.21) (pull_request) Successful in 6m52s
Tests and linters / Lint (pull_request) Successful in 8m26s
Tests and linters / Staticcheck (pull_request) Successful in 8m48s
Tests and linters / Tests (1.22) (pull_request) Successful in 11m39s
Tests and linters / Tests (1.21) (pull_request) Successful in 11m47s
Tests and linters / Tests with -race (pull_request) Successful in 13m49s
Pre-commit hooks / Pre-commit (pull_request) Successful in 17m24s

Signed-off-by: Evgenii Stratonikov <e.stratonikov@yadro.com>
This commit is contained in:
Evgenii Stratonikov 2024-04-04 10:40:21 +03:00
parent 4c7ff159ec
commit 7085723c6b
8 changed files with 53 additions and 34 deletions

View file

@ -210,7 +210,7 @@ func (e *StorageEngine) TreeGetChildren(ctx context.Context, cid cidSDK.ID, tree
}
// TreeSortedByFilename implements the pilorama.Forest interface.
func (e *StorageEngine) TreeSortedByFilename(ctx context.Context, cid cidSDK.ID, treeID string, nodeID pilorama.Node, last string, count int) ([]pilorama.NodeInfo, string, error) {
func (e *StorageEngine) TreeSortedByFilename(ctx context.Context, cid cidSDK.ID, treeID string, nodeID pilorama.Node, last *string, count int) ([]pilorama.NodeInfo, *string, error) {
ctx, span := tracing.StartSpanFromContext(ctx, "StorageEngine.TreeSortedByFilename",
trace.WithAttributes(
attribute.String("container_id", cid.EncodeToString()),
@ -222,7 +222,7 @@ func (e *StorageEngine) TreeSortedByFilename(ctx context.Context, cid cidSDK.ID,
var err error
var nodes []pilorama.NodeInfo
var cursor string
var cursor *string
for _, sh := range e.sortShards(cid) {
nodes, cursor, err = sh.TreeSortedByFilename(ctx, cid, treeID, nodeID, last, count)
if err != nil {

View file

@ -1004,7 +1004,7 @@ func (t *boltForest) hasFewChildren(b *bbolt.Bucket, nodeID Node, threshold int)
}
// TreeSortedByFilename implements the Forest interface.
func (t *boltForest) TreeSortedByFilename(ctx context.Context, cid cidSDK.ID, treeID string, nodeID Node, last string, count int) ([]NodeInfo, string, error) {
func (t *boltForest) TreeSortedByFilename(ctx context.Context, cid cidSDK.ID, treeID string, nodeID Node, last *string, count int) ([]NodeInfo, *string, error) {
var (
startedAt = time.Now()
success = false
@ -1026,7 +1026,7 @@ func (t *boltForest) TreeSortedByFilename(ctx context.Context, cid cidSDK.ID, tr
defer t.modeMtx.RUnlock()
if t.mode.NoMetabase() {
return nil, "", ErrDegradedMode
return nil, last, ErrDegradedMode
}
h := newHeap(last, count)
@ -1070,20 +1070,25 @@ func (t *boltForest) TreeSortedByFilename(ctx context.Context, cid cidSDK.ID, tr
}
if fewChildren {
result = sortAndCut(result, []byte(last))
result = sortAndCut(result, last)
}
if len(result) != 0 {
last = string(result[len(result)-1].Meta.GetAttr(AttributeFilename))
s := string(result[len(result)-1].Meta.GetAttr(AttributeFilename))
last = &s
}
return result, last, metaerr.Wrap(err)
}
func sortAndCut(result []NodeInfo, last []byte) []NodeInfo {
func sortAndCut(result []NodeInfo, last *string) []NodeInfo {
var lastBytes []byte
if last != nil {
lastBytes = []byte(*last)
}
sort.Slice(result, func(i, j int) bool {
return bytes.Compare(result[i].Meta.GetAttr(AttributeFilename), result[j].Meta.GetAttr(AttributeFilename)) == -1
})
for i := range result {
if bytes.Compare(last, result[i].Meta.GetAttr(AttributeFilename)) == -1 {
if lastBytes == nil || bytes.Compare(lastBytes, result[i].Meta.GetAttr(AttributeFilename)) == -1 {
return result[i:]
}
}

View file

@ -156,11 +156,11 @@ func (f *memoryForest) TreeGetMeta(_ context.Context, cid cid.ID, treeID string,
}
// TreeSortedByFilename implements the Forest interface.
func (f *memoryForest) TreeSortedByFilename(_ context.Context, cid cid.ID, treeID string, nodeID Node, start string, count int) ([]NodeInfo, string, error) {
func (f *memoryForest) TreeSortedByFilename(_ context.Context, cid cid.ID, treeID string, nodeID Node, start *string, count int) ([]NodeInfo, *string, error) {
fullID := cid.String() + "/" + treeID
s, ok := f.treeMap[fullID]
if !ok {
return nil, "", ErrTreeNotFound
return nil, start, ErrTreeNotFound
}
if count == 0 {
return nil, start, nil
@ -169,7 +169,14 @@ func (f *memoryForest) TreeSortedByFilename(_ context.Context, cid cid.ID, treeI
children := s.tree.getChildren(nodeID)
res := make([]NodeInfo, 0, len(children))
for _, childID := range children {
if len(s.infoMap[childID].Meta.GetAttr(AttributeFilename)) == 0 {
var found bool
for _, kv := range s.infoMap[childID].Meta.Items {
if kv.Key == AttributeFilename {
found = true
break
}
}
if !found {
continue
}
res = append(res, NodeInfo{
@ -179,22 +186,24 @@ func (f *memoryForest) TreeSortedByFilename(_ context.Context, cid cid.ID, treeI
})
}
if len(res) == 0 {
return res, "", nil
return res, start, nil
}
sort.Slice(res, func(i, j int) bool {
return bytes.Compare(res[i].Meta.GetAttr(AttributeFilename), res[j].Meta.GetAttr(AttributeFilename)) == -1
})
for i := range res {
if string(res[i].Meta.GetAttr(AttributeFilename)) > start {
if start == nil || string(res[i].Meta.GetAttr(AttributeFilename)) > *start {
finish := i + count
if len(res) < finish {
finish = len(res)
}
return res[i:finish], string(res[finish-1].Meta.GetAttr(AttributeFilename)), nil
last := string(res[finish-1].Meta.GetAttr(AttributeFilename))
return res[i:finish], &last, nil
}
}
return nil, string(res[len(res)-1].Meta.GetAttr(AttributeFilename)), nil
last := string(res[len(res)-1].Meta.GetAttr(AttributeFilename))
return nil, &last, nil
}
// TreeGetChildren implements the Forest interface.

View file

@ -215,7 +215,7 @@ func BenchmarkForestSortedIteration(b *testing.B) {
b.Run(providers[i].name+",root", func(b *testing.B) {
for i := 0; i < b.N; i++ {
res, _, err := f.TreeSortedByFilename(context.Background(), cnr, treeID, RootID, "", 100)
res, _, err := f.TreeSortedByFilename(context.Background(), cnr, treeID, RootID, nil, 100)
if err != nil || len(res) != 100 {
b.Fatalf("err %v, count %d", err, len(res))
}
@ -223,7 +223,7 @@ func BenchmarkForestSortedIteration(b *testing.B) {
})
b.Run(providers[i].name+",leaf", func(b *testing.B) {
for i := 0; i < b.N; i++ {
res, _, err := f.TreeSortedByFilename(context.Background(), cnr, treeID, 1, "", 100)
res, _, err := f.TreeSortedByFilename(context.Background(), cnr, treeID, 1, nil, 100)
if err != nil || len(res) != 0 {
b.FailNow()
}
@ -246,14 +246,14 @@ func testForestTreeSortedIteration(t *testing.T, s ForestStorage) {
cid := cidtest.ID()
d := CIDDescriptor{cid, 0, 1}
treeID := "version"
treeAdd := func(t *testing.T, ts int) {
treeAdd := func(t *testing.T, ts int, filename string) {
_, err := s.TreeMove(context.Background(), d, treeID, &Move{
Child: RootID + uint64(ts),
Parent: RootID,
Meta: Meta{
Time: Timestamp(ts),
Items: []KeyValue{
{Key: AttributeFilename, Value: []byte(strconv.Itoa(ts))},
{Key: AttributeFilename, Value: []byte(filename)},
},
},
})
@ -261,19 +261,20 @@ func testForestTreeSortedIteration(t *testing.T, s ForestStorage) {
}
const count = 9
for i := 0; i < count; i++ {
treeAdd(t, i+1)
treeAdd(t, 1, "")
for i := 1; i < count; i++ {
treeAdd(t, i+1, strconv.Itoa(i+1))
}
var result []NodeInfo
treeAppend := func(t *testing.T, last string, count int) string {
treeAppend := func(t *testing.T, last *string, count int) *string {
res, cursor, err := s.TreeSortedByFilename(context.Background(), d.CID, treeID, RootID, last, count)
require.NoError(t, err)
result = append(result, res...)
return cursor
}
last := treeAppend(t, "", 2)
last := treeAppend(t, nil, 2)
last = treeAppend(t, last, 3)
last = treeAppend(t, last, 0)
last = treeAppend(t, last, 1)
@ -282,8 +283,12 @@ func testForestTreeSortedIteration(t *testing.T, s ForestStorage) {
require.Len(t, result, count)
for i := range result {
require.Equal(t, RootID+uint64(i+1), result[i].ID)
if i == 0 {
require.Equal(t, "", string(result[i].Meta.GetAttr(AttributeFilename)))
} else {
require.Equal(t, strconv.Itoa(RootID+i+1), string(result[i].Meta.GetAttr(AttributeFilename)))
}
}
}
func TestForest_TreeSortedFilename(t *testing.T) {
@ -341,7 +346,7 @@ func testForestTreeSortedByFilename(t *testing.T, s ForestStorage) {
}
getChildren := func(t *testing.T, id Node) []NodeInfo {
res, _, err := s.TreeSortedByFilename(context.Background(), d.CID, treeID, id, "", len(items))
res, _, err := s.TreeSortedByFilename(context.Background(), d.CID, treeID, id, nil, len(items))
require.NoError(t, err)
return res
}

View file

@ -28,12 +28,12 @@ func (h *filenameHeap) Pop() any {
// fixedHeap maintains a fixed number of smallest elements started at some point.
type fixedHeap struct {
start string
start *string
count int
h *filenameHeap
}
func newHeap(start string, count int) *fixedHeap {
func newHeap(start *string, count int) *fixedHeap {
h := new(filenameHeap)
heap.Init(h)
@ -45,7 +45,7 @@ func newHeap(start string, count int) *fixedHeap {
}
func (h *fixedHeap) push(id Node, filename string) bool {
if filename == "" || filename <= h.start {
if h.start != nil && filename <= *h.start {
return false
}
heap.Push(h.h, heapInfo{id: id, filename: filename})

View file

@ -35,7 +35,7 @@ type Forest interface {
TreeGetChildren(ctx context.Context, cid cidSDK.ID, treeID string, nodeID Node) ([]NodeInfo, error)
// TreeSortedByFilename returns children of the node with the specified ID. The nodes are sorted by the filename attribute..
// Should return ErrTreeNotFound if the tree is not found, and empty result if the node is not in the tree.
TreeSortedByFilename(ctx context.Context, cid cidSDK.ID, treeID string, nodeID Node, last string, count int) ([]NodeInfo, string, error)
TreeSortedByFilename(ctx context.Context, cid cidSDK.ID, treeID string, nodeID Node, last *string, count int) ([]NodeInfo, *string, error)
// TreeGetOpLog returns first log operation stored at or above the height.
// In case no such operation is found, empty Move and nil error should be returned.
TreeGetOpLog(ctx context.Context, cid cidSDK.ID, treeID string, height uint64) (Move, error)

View file

@ -184,7 +184,7 @@ func (s *Shard) TreeGetChildren(ctx context.Context, cid cidSDK.ID, treeID strin
}
// TreeSortedByFilename implements the pilorama.Forest interface.
func (s *Shard) TreeSortedByFilename(ctx context.Context, cid cidSDK.ID, treeID string, nodeID pilorama.Node, last string, count int) ([]pilorama.NodeInfo, string, error) {
func (s *Shard) TreeSortedByFilename(ctx context.Context, cid cidSDK.ID, treeID string, nodeID pilorama.Node, last *string, count int) ([]pilorama.NodeInfo, *string, error) {
ctx, span := tracing.StartSpanFromContext(ctx, "Shard.TreeSortedByFilename",
trace.WithAttributes(
attribute.String("shard_id", s.ID().String()),
@ -196,14 +196,14 @@ func (s *Shard) TreeSortedByFilename(ctx context.Context, cid cidSDK.ID, treeID
defer span.End()
if s.pilorama == nil {
return nil, "", ErrPiloramaDisabled
return nil, last, ErrPiloramaDisabled
}
s.m.RLock()
defer s.m.RUnlock()
if s.info.Mode.NoMetabase() {
return nil, "", ErrDegradedMode
return nil, last, ErrDegradedMode
}
return s.pilorama.TreeSortedByFilename(ctx, cid, treeID, nodeID, last, count)
}

View file

@ -446,7 +446,7 @@ func getSortedSubTree(ctx context.Context, srv TreeService_GetSubTreeServer, cid
type stackItem struct {
values []pilorama.NodeInfo
parent pilorama.Node
last string
last *string
}
// Traverse the tree in a DFS manner. Because we need to support arbitrary depth,
@ -502,7 +502,7 @@ func getSortedSubTree(ctx context.Context, srv TreeService_GetSubTreeServer, cid
}
if b.GetDepth() == 0 || uint32(len(stack)) < b.GetDepth() {
children, last, err := forest.TreeSortedByFilename(ctx, cid, b.GetTreeId(), node.ID, "", batchSize)
children, last, err := forest.TreeSortedByFilename(ctx, cid, b.GetTreeId(), node.ID, nil, batchSize)
if err != nil {
return err
}