Fix pilorama sorted listing, support/v0.42 #1328

Merged
fyrchik merged 1 commit from fyrchik/frostfs-node:fix-pochtabank-160 into support/v0.42 2024-09-04 19:51:11 +00:00
3 changed files with 32 additions and 10 deletions

View file

@ -1161,6 +1161,7 @@ func (t *boltForest) fillSortedChildren(b *bbolt.Bucket, nodeIDs MultiNode, h *f
lastFilename = nil lastFilename = nil
nodes = nil nodes = nil
length = actualLength + 1 length = actualLength + 1
count = 0
c.Seek(append(prefix, byte(length), byte(length>>8))) c.Seek(append(prefix, byte(length), byte(length>>8)))
c.Prev() // c.Next() will be performed by for loop c.Prev() // c.Next() will be performed by for loop
} }

View file

@ -237,9 +237,8 @@ func BenchmarkForestSortedIteration(b *testing.B) {
// The issue which we call "BugWithSkip" is easiest to understand when filenames are // The issue which we call "BugWithSkip" is easiest to understand when filenames are
// monotonically increasing numbers. We want the list of sorted filenames to have different length interleaved. // monotonically increasing numbers. We want the list of sorted filenames to have different length interleaved.
// The bug happens when we switch between length during listing. // The bug happens when we switch between length during listing.
// Thus this test contains numbers from 1 to 1000 and batch size of size 100. // Thus this test contains numbers from 1 to 2000 and batch size of size 10.
func TestForest_TreeSortedIterationBugWithSkip(t *testing.T) { func TestForest_TreeSortedIterationBugWithSkip(t *testing.T) {
t.Skip()
for i := range providers { for i := range providers {
t.Run(providers[i].name, func(t *testing.T) { t.Run(providers[i].name, func(t *testing.T) {
testForestTreeSortedIterationBugWithSkip(t, providers[i].construct(t)) testForestTreeSortedIterationBugWithSkip(t, providers[i].construct(t))

View file

@ -2,6 +2,8 @@ package pilorama
import ( import (
"container/heap" "container/heap"
"slices"
"strings"
) )
type heapInfo struct { type heapInfo struct {
@ -29,6 +31,7 @@ func (h *filenameHeap) Pop() any {
// fixedHeap maintains a fixed number of smallest elements started at some point. // fixedHeap maintains a fixed number of smallest elements started at some point.
type fixedHeap struct { type fixedHeap struct {
start *string start *string
sorted bool
count int count int
h *filenameHeap h *filenameHeap
} }
@ -44,20 +47,39 @@ func newHeap(start *string, count int) *fixedHeap {
} }
} }
const amortizationMultiplier = 5
func (h *fixedHeap) push(id MultiNode, filename string) bool { func (h *fixedHeap) push(id MultiNode, filename string) bool {
if h.start != nil && filename <= *h.start { if h.start != nil && filename <= *h.start {
return false return false
} }
heap.Push(h.h, heapInfo{id: id, filename: filename})
if h.h.Len() > h.count { *h.h = append(*h.h, heapInfo{id: id, filename: filename})
heap.Remove(h.h, h.h.Len()-1) h.sorted = false
if h.h.Len() > h.count*amortizationMultiplier {
slices.SortFunc(*h.h, func(a, b heapInfo) int {
return strings.Compare(a.filename, b.filename)
})
*h.h = (*h.h)[:h.count]
} }
return true return true
} }
func (h *fixedHeap) pop() (heapInfo, bool) { func (h *fixedHeap) pop() (heapInfo, bool) {
if h.h.Len() != 0 { if !h.sorted {
return heap.Pop(h.h).(heapInfo), true slices.SortFunc(*h.h, func(a, b heapInfo) int {
return strings.Compare(a.filename, b.filename)
})
if len(*h.h) > h.count {
*h.h = (*h.h)[:h.count]
}
h.sorted = true
}
if len(*h.h) != 0 {
info := (*h.h)[0]
*h.h = (*h.h)[1:]
return info, true
} }
return heapInfo{}, false return heapInfo{}, false
} }