[#1621] pilorama: Batch related operations
Signed-off-by: Evgenii Stratonikov <evgeniy@nspcc.ru> Signed-off-by: Evgenii Stratonikov <evgeniy@morphbits.ru> Signed-off-by: Evgenii Stratonikov <e.stratonikov@yadro.com>
This commit is contained in:
parent
847a262ece
commit
38f9a5f73f
4 changed files with 307 additions and 128 deletions
|
@ -4,6 +4,8 @@ Changelog for FrostFS Node
|
|||
## [Unreleased]
|
||||
|
||||
### Added
|
||||
- Separate batching for replicated operations over the same container in pilorama (#1621)
|
||||
|
||||
### Changed
|
||||
### Fixed
|
||||
- Big object removal with non-local parts (#1978)
|
||||
|
|
49
pkg/local_object_storage/pilorama/batch.go
Normal file
49
pkg/local_object_storage/pilorama/batch.go
Normal file
|
@ -0,0 +1,49 @@
|
|||
package pilorama
|
||||
|
||||
import (
|
||||
"sort"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
cidSDK "github.com/TrueCloudLab/frostfs-sdk-go/container/id"
|
||||
"go.etcd.io/bbolt"
|
||||
)
|
||||
|
||||
type batch struct {
|
||||
forest *boltForest
|
||||
timer *time.Timer
|
||||
mtx sync.Mutex
|
||||
start sync.Once
|
||||
cid cidSDK.ID
|
||||
treeID string
|
||||
results []chan<- error
|
||||
operations []*Move
|
||||
}
|
||||
|
||||
func (b *batch) trigger() {
|
||||
b.mtx.Lock()
|
||||
if b.timer != nil {
|
||||
b.timer.Stop()
|
||||
b.timer = nil
|
||||
}
|
||||
b.mtx.Unlock()
|
||||
b.start.Do(b.run)
|
||||
}
|
||||
|
||||
func (b *batch) run() {
|
||||
sort.Slice(b.operations, func(i, j int) bool {
|
||||
return b.operations[i].Time < b.operations[j].Time
|
||||
})
|
||||
err := b.forest.db.Update(func(tx *bbolt.Tx) error {
|
||||
bLog, bTree, err := b.forest.getTreeBuckets(tx, b.cid, b.treeID)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
var lm LogMove
|
||||
return b.forest.applyOperation(bLog, bTree, b.operations, &lm)
|
||||
})
|
||||
for i := range b.operations {
|
||||
b.results[i] <- err
|
||||
}
|
||||
}
|
|
@ -24,6 +24,11 @@ type boltForest struct {
|
|||
|
||||
modeMtx sync.RWMutex
|
||||
mode mode.Mode
|
||||
|
||||
// mtx protects batches field.
|
||||
mtx sync.Mutex
|
||||
batches []*batch
|
||||
|
||||
cfg
|
||||
}
|
||||
|
||||
|
@ -318,17 +323,66 @@ func (t *boltForest) TreeApply(d CIDDescriptor, treeID string, m *Move, backgrou
|
|||
}
|
||||
}
|
||||
|
||||
return t.db.Batch(func(tx *bbolt.Tx) error {
|
||||
if t.db.MaxBatchSize == 1 {
|
||||
return t.db.Update(func(tx *bbolt.Tx) error {
|
||||
bLog, bTree, err := t.getTreeBuckets(tx, d.CID, treeID)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
lm := &LogMove{Move: *m}
|
||||
return t.applyOperation(bLog, bTree, lm)
|
||||
var lm LogMove
|
||||
return t.applyOperation(bLog, bTree, []*Move{m}, &lm)
|
||||
})
|
||||
}
|
||||
|
||||
ch := make(chan error, 1)
|
||||
t.addBatch(d, treeID, m, ch)
|
||||
return <-ch
|
||||
}
|
||||
|
||||
func (t *boltForest) addBatch(d CIDDescriptor, treeID string, m *Move, ch chan error) {
|
||||
t.mtx.Lock()
|
||||
for i := 0; i < len(t.batches); i++ {
|
||||
t.batches[i].mtx.Lock()
|
||||
if t.batches[i].timer == nil {
|
||||
t.batches[i].mtx.Unlock()
|
||||
copy(t.batches[i:], t.batches[i+1:])
|
||||
t.batches = t.batches[:len(t.batches)-1]
|
||||
i--
|
||||
continue
|
||||
}
|
||||
|
||||
found := t.batches[i].cid.Equals(d.CID) && t.batches[i].treeID == treeID
|
||||
if found {
|
||||
t.batches[i].results = append(t.batches[i].results, ch)
|
||||
t.batches[i].operations = append(t.batches[i].operations, m)
|
||||
if len(t.batches[i].operations) == t.db.MaxBatchSize {
|
||||
t.batches[i].timer.Stop()
|
||||
t.batches[i].timer = nil
|
||||
t.batches[i].mtx.Unlock()
|
||||
b := t.batches[i]
|
||||
t.mtx.Unlock()
|
||||
b.trigger()
|
||||
return
|
||||
}
|
||||
t.batches[i].mtx.Unlock()
|
||||
t.mtx.Unlock()
|
||||
return
|
||||
}
|
||||
t.batches[i].mtx.Unlock()
|
||||
}
|
||||
b := &batch{
|
||||
forest: t,
|
||||
cid: d.CID,
|
||||
treeID: treeID,
|
||||
results: []chan<- error{ch},
|
||||
operations: []*Move{m},
|
||||
}
|
||||
b.timer = time.AfterFunc(t.db.MaxBatchDelay, b.trigger)
|
||||
t.batches = append(t.batches, b)
|
||||
t.mtx.Unlock()
|
||||
}
|
||||
|
||||
func (t *boltForest) getTreeBuckets(tx *bbolt.Tx, cid cidSDK.ID, treeID string) (*bbolt.Bucket, *bbolt.Bucket, error) {
|
||||
treeRoot := bucketName(cid, treeID)
|
||||
child := tx.Bucket(treeRoot)
|
||||
|
@ -351,16 +405,11 @@ func (t *boltForest) getTreeBuckets(tx *bbolt.Tx, cid cidSDK.ID, treeID string)
|
|||
return bLog, bData, nil
|
||||
}
|
||||
|
||||
func (t *boltForest) applyOperation(logBucket, treeBucket *bbolt.Bucket, lm *LogMove) error {
|
||||
// applyOperations applies log operations. Assumes lm are sorted by timestamp.
|
||||
func (t *boltForest) applyOperation(logBucket, treeBucket *bbolt.Bucket, ms []*Move, lm *LogMove) error {
|
||||
var tmp LogMove
|
||||
var cKey [17]byte
|
||||
|
||||
var logKey [8]byte
|
||||
binary.BigEndian.PutUint64(logKey[:], lm.Time)
|
||||
if logBucket.Get(logKey[:]) != nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
c := logBucket.Cursor()
|
||||
|
||||
key, value := c.Last()
|
||||
|
@ -369,35 +418,33 @@ func (t *boltForest) applyOperation(logBucket, treeBucket *bbolt.Bucket, lm *Log
|
|||
r := io.NewBinReaderFromIO(b)
|
||||
|
||||
// 1. Undo up until the desired timestamp is here.
|
||||
for len(key) == 8 && binary.BigEndian.Uint64(key) > lm.Time {
|
||||
for len(key) == 8 && ms[0].Time < binary.BigEndian.Uint64(key) {
|
||||
b.Reset(value)
|
||||
if err := t.logFromBytes(&tmp, r); err != nil {
|
||||
return err
|
||||
}
|
||||
if err := t.undo(&tmp.Move, &tmp, treeBucket, cKey[:]); err != nil {
|
||||
if err := t.undo(&tmp, treeBucket, cKey[:]); err != nil {
|
||||
return err
|
||||
}
|
||||
key, value = c.Prev()
|
||||
}
|
||||
|
||||
key, _ = c.Next()
|
||||
for i := 0; i < len(ms); i++ {
|
||||
// Loop invariant: key represents the next stored timestamp after ms[i].Time.
|
||||
|
||||
// 2. Insert the operation.
|
||||
if len(key) != 8 || binary.BigEndian.Uint64(key) != lm.Time {
|
||||
lm.Move = *ms[i]
|
||||
if err := t.do(logBucket, treeBucket, cKey[:], lm); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
if key == nil {
|
||||
// The operation is inserted in the beginning, reposition the cursor.
|
||||
// Otherwise, `Next` call will return currently inserted operation.
|
||||
c.First()
|
||||
}
|
||||
key, value = c.Seek(key)
|
||||
// Cursor can be invalid, seek again.
|
||||
binary.BigEndian.PutUint64(cKey[:], lm.Time)
|
||||
_, _ = c.Seek(cKey[:8])
|
||||
key, value = c.Next()
|
||||
|
||||
// 3. Re-apply all other operations.
|
||||
for len(key) == 8 {
|
||||
for len(key) == 8 && (i == len(ms)-1 || binary.BigEndian.Uint64(key) < ms[i+1].Time) {
|
||||
b.Reset(value)
|
||||
if err := t.logFromBytes(&tmp, r); err != nil {
|
||||
return err
|
||||
|
@ -407,6 +454,7 @@ func (t *boltForest) applyOperation(logBucket, treeBucket *bbolt.Bucket, lm *Log
|
|||
}
|
||||
key, value = c.Next()
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
@ -511,15 +559,15 @@ func (t *boltForest) addNode(b *bbolt.Bucket, key []byte, child, parent Node, me
|
|||
return nil
|
||||
}
|
||||
|
||||
func (t *boltForest) undo(m *Move, lm *LogMove, b *bbolt.Bucket, key []byte) error {
|
||||
func (t *boltForest) undo(m *LogMove, b *bbolt.Bucket, key []byte) error {
|
||||
if err := b.Delete(childrenKey(key, m.Child, m.Parent)); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if !lm.HasOld {
|
||||
if !m.HasOld {
|
||||
return t.removeNode(b, key, m.Child, m.Parent)
|
||||
}
|
||||
return t.addNode(b, key, m.Child, lm.Old.Parent, lm.Old.Meta)
|
||||
return t.addNode(b, key, m.Child, m.Old.Parent, m.Old.Meta)
|
||||
}
|
||||
|
||||
func (t *boltForest) isAncestor(b *bbolt.Bucket, parent, child Node) bool {
|
||||
|
|
|
@ -6,6 +6,7 @@ import (
|
|||
"os"
|
||||
"path/filepath"
|
||||
"strconv"
|
||||
"sync"
|
||||
"testing"
|
||||
|
||||
cidSDK "github.com/TrueCloudLab/frostfs-sdk-go/container/id"
|
||||
|
@ -16,9 +17,9 @@ import (
|
|||
|
||||
var providers = []struct {
|
||||
name string
|
||||
construct func(t testing.TB) Forest
|
||||
construct func(t testing.TB, opts ...Option) Forest
|
||||
}{
|
||||
{"inmemory", func(t testing.TB) Forest {
|
||||
{"inmemory", func(t testing.TB, _ ...Option) Forest {
|
||||
f := NewMemoryForest()
|
||||
require.NoError(t, f.Open(false))
|
||||
require.NoError(t, f.Init())
|
||||
|
@ -28,14 +29,15 @@ var providers = []struct {
|
|||
|
||||
return f
|
||||
}},
|
||||
{"bbolt", func(t testing.TB) Forest {
|
||||
{"bbolt", func(t testing.TB, opts ...Option) Forest {
|
||||
// Use `os.TempDir` because we construct multiple times in the same test.
|
||||
tmpDir, err := os.MkdirTemp(os.TempDir(), "*")
|
||||
require.NoError(t, err)
|
||||
|
||||
f := NewBoltForest(
|
||||
append([]Option{
|
||||
WithPath(filepath.Join(tmpDir, "test.db")),
|
||||
WithMaxBatchSize(1))
|
||||
WithMaxBatchSize(1)}, opts...)...)
|
||||
require.NoError(t, f.Open(false))
|
||||
require.NoError(t, f.Init())
|
||||
t.Cleanup(func() {
|
||||
|
@ -407,7 +409,7 @@ func TestForest_Apply(t *testing.T) {
|
|||
}
|
||||
}
|
||||
|
||||
func testForestTreeApply(t *testing.T, constructor func(t testing.TB) Forest) {
|
||||
func testForestTreeApply(t *testing.T, constructor func(t testing.TB, _ ...Option) Forest) {
|
||||
cid := cidtest.ID()
|
||||
d := CIDDescriptor{cid, 0, 1}
|
||||
treeID := "version"
|
||||
|
@ -461,7 +463,7 @@ func TestForest_GetOpLog(t *testing.T) {
|
|||
}
|
||||
}
|
||||
|
||||
func testForestTreeGetOpLog(t *testing.T, constructor func(t testing.TB) Forest) {
|
||||
func testForestTreeGetOpLog(t *testing.T, constructor func(t testing.TB, _ ...Option) Forest) {
|
||||
cid := cidtest.ID()
|
||||
d := CIDDescriptor{cid, 0, 1}
|
||||
treeID := "version"
|
||||
|
@ -520,7 +522,7 @@ func TestForest_TreeExists(t *testing.T) {
|
|||
}
|
||||
}
|
||||
|
||||
func testForestTreeExists(t *testing.T, constructor func(t testing.TB) Forest) {
|
||||
func testForestTreeExists(t *testing.T, constructor func(t testing.TB, opts ...Option) Forest) {
|
||||
s := constructor(t)
|
||||
|
||||
checkExists := func(t *testing.T, expected bool, cid cidSDK.ID, treeID string) {
|
||||
|
@ -654,20 +656,20 @@ func TestForest_ApplyRandom(t *testing.T) {
|
|||
}
|
||||
}
|
||||
|
||||
func testForestTreeApplyRandom(t *testing.T, constructor func(t testing.TB) Forest) {
|
||||
rand.Seed(42)
|
||||
|
||||
const (
|
||||
nodeCount = 5
|
||||
opCount = 20
|
||||
iterCount = 200
|
||||
)
|
||||
|
||||
cid := cidtest.ID()
|
||||
d := CIDDescriptor{cid, 0, 1}
|
||||
treeID := "version"
|
||||
expected := constructor(t)
|
||||
func TestForest_ParallelApply(t *testing.T) {
|
||||
for i := range providers {
|
||||
if providers[i].name == "inmemory" {
|
||||
continue
|
||||
}
|
||||
t.Run(providers[i].name, func(t *testing.T) {
|
||||
testForestTreeParallelApply(t, providers[i].construct, 8, 128, 10)
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
// prepareRandomTree creates a random sequence of operation and applies them to tree.
|
||||
// The operations are guaranteed to be applied and returned sorted by `Time`.
|
||||
func prepareRandomTree(nodeCount, opCount int) []Move {
|
||||
ops := make([]Move, nodeCount+opCount)
|
||||
for i := 0; i < nodeCount; i++ {
|
||||
ops[i] = Move{
|
||||
|
@ -686,7 +688,7 @@ func testForestTreeApplyRandom(t *testing.T, constructor func(t testing.TB) Fore
|
|||
|
||||
for i := nodeCount; i < len(ops); i++ {
|
||||
ops[i] = Move{
|
||||
Parent: rand.Uint64() % (nodeCount + 12),
|
||||
Parent: rand.Uint64() % uint64(nodeCount+12),
|
||||
Meta: Meta{
|
||||
Time: Timestamp(i + nodeCount),
|
||||
Items: []KeyValue{
|
||||
|
@ -694,26 +696,19 @@ func testForestTreeApplyRandom(t *testing.T, constructor func(t testing.TB) Fore
|
|||
{Value: make([]byte, 10)},
|
||||
},
|
||||
},
|
||||
Child: rand.Uint64() % (nodeCount + 10),
|
||||
Child: rand.Uint64() % uint64(nodeCount+10),
|
||||
}
|
||||
if rand.Uint32()%5 == 0 {
|
||||
ops[i].Parent = TrashID
|
||||
}
|
||||
rand.Read(ops[i].Meta.Items[1].Value)
|
||||
}
|
||||
for i := range ops {
|
||||
require.NoError(t, expected.TreeApply(d, treeID, &ops[i], false))
|
||||
|
||||
return ops
|
||||
}
|
||||
|
||||
for i := 0; i < iterCount; i++ {
|
||||
// Shuffle random operations, leave initialization in place.
|
||||
rand.Shuffle(len(ops), func(i, j int) { ops[i], ops[j] = ops[j], ops[i] })
|
||||
|
||||
actual := constructor(t)
|
||||
for i := range ops {
|
||||
require.NoError(t, actual.TreeApply(d, treeID, &ops[i], false))
|
||||
}
|
||||
for i := uint64(0); i < nodeCount; i++ {
|
||||
func compareForests(t *testing.T, expected, actual Forest, cid cidSDK.ID, treeID string, nodeCount int) {
|
||||
for i := uint64(0); i < uint64(nodeCount); i++ {
|
||||
expectedMeta, expectedParent, err := expected.TreeGetMeta(cid, treeID, i)
|
||||
require.NoError(t, err)
|
||||
actualMeta, actualParent, err := actual.TreeGetMeta(cid, treeID, i)
|
||||
|
@ -742,17 +737,96 @@ func testForestTreeApplyRandom(t *testing.T, constructor func(t testing.TB) Fore
|
|||
}
|
||||
}
|
||||
}
|
||||
|
||||
func testForestTreeParallelApply(t *testing.T, constructor func(t testing.TB, _ ...Option) Forest, batchSize, opCount, iterCount int) {
|
||||
rand.Seed(42)
|
||||
|
||||
const nodeCount = 5
|
||||
|
||||
ops := prepareRandomTree(nodeCount, opCount)
|
||||
|
||||
cid := cidtest.ID()
|
||||
d := CIDDescriptor{cid, 0, 1}
|
||||
treeID := "version"
|
||||
|
||||
expected := constructor(t)
|
||||
for i := range ops {
|
||||
require.NoError(t, expected.TreeApply(d, treeID, &ops[i], false))
|
||||
}
|
||||
|
||||
for i := 0; i < iterCount; i++ {
|
||||
// Shuffle random operations, leave initialization in place.
|
||||
rand.Shuffle(len(ops), func(i, j int) { ops[i], ops[j] = ops[j], ops[i] })
|
||||
|
||||
actual := constructor(t, WithMaxBatchSize(batchSize))
|
||||
wg := new(sync.WaitGroup)
|
||||
ch := make(chan *Move, 0)
|
||||
for i := 0; i < batchSize; i++ {
|
||||
wg.Add(1)
|
||||
go func() {
|
||||
defer wg.Done()
|
||||
for op := range ch {
|
||||
require.NoError(t, actual.TreeApply(d, treeID, op, false))
|
||||
}
|
||||
}()
|
||||
}
|
||||
|
||||
for i := range ops {
|
||||
ch <- &ops[i]
|
||||
}
|
||||
close(ch)
|
||||
wg.Wait()
|
||||
|
||||
compareForests(t, expected, actual, cid, treeID, nodeCount)
|
||||
}
|
||||
}
|
||||
|
||||
func testForestTreeApplyRandom(t *testing.T, constructor func(t testing.TB, _ ...Option) Forest) {
|
||||
rand.Seed(42)
|
||||
|
||||
const (
|
||||
nodeCount = 5
|
||||
opCount = 20
|
||||
)
|
||||
|
||||
ops := prepareRandomTree(nodeCount, opCount)
|
||||
|
||||
cid := cidtest.ID()
|
||||
d := CIDDescriptor{cid, 0, 1}
|
||||
treeID := "version"
|
||||
|
||||
expected := constructor(t)
|
||||
for i := range ops {
|
||||
require.NoError(t, expected.TreeApply(d, treeID, &ops[i], false))
|
||||
}
|
||||
|
||||
const iterCount = 200
|
||||
for i := 0; i < iterCount; i++ {
|
||||
// Shuffle random operations, leave initialization in place.
|
||||
rand.Shuffle(len(ops), func(i, j int) { ops[i], ops[j] = ops[j], ops[i] })
|
||||
|
||||
actual := constructor(t)
|
||||
for i := range ops {
|
||||
require.NoError(t, actual.TreeApply(d, treeID, &ops[i], false))
|
||||
}
|
||||
compareForests(t, expected, actual, cid, treeID, nodeCount)
|
||||
}
|
||||
}
|
||||
|
||||
const benchNodeCount = 1000
|
||||
|
||||
var batchSizes = []int{1, 2, 4, 8, 16, 32}
|
||||
|
||||
func BenchmarkApplySequential(b *testing.B) {
|
||||
for i := range providers {
|
||||
if providers[i].name == "inmemory" { // memory backend is not thread-safe
|
||||
continue
|
||||
}
|
||||
b.Run(providers[i].name, func(b *testing.B) {
|
||||
benchmarkApply(b, providers[i].construct(b), func(opCount int) []Move {
|
||||
for _, bs := range batchSizes {
|
||||
b.Run("batchsize="+strconv.Itoa(bs), func(b *testing.B) {
|
||||
s := providers[i].construct(b, WithMaxBatchSize(bs))
|
||||
benchmarkApply(b, s, func(opCount int) []Move {
|
||||
ops := make([]Move, opCount)
|
||||
for i := range ops {
|
||||
ops[i] = Move{
|
||||
|
@ -768,6 +842,8 @@ func BenchmarkApplySequential(b *testing.B) {
|
|||
})
|
||||
})
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func BenchmarkApplyReorderLast(b *testing.B) {
|
||||
|
@ -780,7 +856,10 @@ func BenchmarkApplyReorderLast(b *testing.B) {
|
|||
continue
|
||||
}
|
||||
b.Run(providers[i].name, func(b *testing.B) {
|
||||
benchmarkApply(b, providers[i].construct(b), func(opCount int) []Move {
|
||||
for _, bs := range batchSizes {
|
||||
b.Run("batchsize="+strconv.Itoa(bs), func(b *testing.B) {
|
||||
s := providers[i].construct(b, WithMaxBatchSize(bs))
|
||||
benchmarkApply(b, s, func(opCount int) []Move {
|
||||
ops := make([]Move, opCount)
|
||||
for i := range ops {
|
||||
ops[i] = Move{
|
||||
|
@ -801,6 +880,8 @@ func BenchmarkApplyReorderLast(b *testing.B) {
|
|||
})
|
||||
})
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func benchmarkApply(b *testing.B, s Forest, genFunc func(int) []Move) {
|
||||
|
@ -810,18 +891,17 @@ func benchmarkApply(b *testing.B, s Forest, genFunc func(int) []Move) {
|
|||
cid := cidtest.ID()
|
||||
d := CIDDescriptor{cid, 0, 1}
|
||||
treeID := "version"
|
||||
ch := make(chan *Move, b.N)
|
||||
for i := range ops {
|
||||
ch <- &ops[i]
|
||||
ch := make(chan int, b.N)
|
||||
for i := 0; i < b.N; i++ {
|
||||
ch <- i
|
||||
}
|
||||
|
||||
b.ResetTimer()
|
||||
b.ReportAllocs()
|
||||
b.SetParallelism(50)
|
||||
b.SetParallelism(10)
|
||||
b.RunParallel(func(pb *testing.PB) {
|
||||
for pb.Next() {
|
||||
op := <-ch
|
||||
if err := s.TreeApply(d, treeID, op, false); err != nil {
|
||||
if err := s.TreeApply(d, treeID, &ops[<-ch], false); err != nil {
|
||||
b.Fatalf("error in `Apply`: %v", err)
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in a new issue