From ac81c70c09cd9109c44345a94c96ed339153d187 Mon Sep 17 00:00:00 2001 From: Evgenii Stratonikov Date: Fri, 22 Jul 2022 11:53:05 +0300 Subject: [PATCH] [#1621] pilorama: Batch related operations Signed-off-by: Evgenii Stratonikov Signed-off-by: Evgenii Stratonikov Signed-off-by: Evgenii Stratonikov --- CHANGELOG.md | 2 + pkg/local_object_storage/pilorama/batch.go | 49 ++++ pkg/local_object_storage/pilorama/boltdb.go | 126 ++++++--- .../pilorama/forest_test.go | 258 ++++++++++++------ 4 files changed, 307 insertions(+), 128 deletions(-) create mode 100644 pkg/local_object_storage/pilorama/batch.go diff --git a/CHANGELOG.md b/CHANGELOG.md index 67fa8a418..ac236d9ad 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -4,6 +4,8 @@ Changelog for FrostFS Node ## [Unreleased] ### Added +- Separate batching for replicated operations over the same container in pilorama (#1621) + ### Changed ### Fixed - Big object removal with non-local parts (#1978) diff --git a/pkg/local_object_storage/pilorama/batch.go b/pkg/local_object_storage/pilorama/batch.go new file mode 100644 index 000000000..c4c3574ba --- /dev/null +++ b/pkg/local_object_storage/pilorama/batch.go @@ -0,0 +1,49 @@ +package pilorama + +import ( + "sort" + "sync" + "time" + + cidSDK "github.com/TrueCloudLab/frostfs-sdk-go/container/id" + "go.etcd.io/bbolt" +) + +type batch struct { + forest *boltForest + timer *time.Timer + mtx sync.Mutex + start sync.Once + cid cidSDK.ID + treeID string + results []chan<- error + operations []*Move +} + +func (b *batch) trigger() { + b.mtx.Lock() + if b.timer != nil { + b.timer.Stop() + b.timer = nil + } + b.mtx.Unlock() + b.start.Do(b.run) +} + +func (b *batch) run() { + sort.Slice(b.operations, func(i, j int) bool { + return b.operations[i].Time < b.operations[j].Time + }) + err := b.forest.db.Update(func(tx *bbolt.Tx) error { + bLog, bTree, err := b.forest.getTreeBuckets(tx, b.cid, b.treeID) + if err != nil { + return err + } + + var lm LogMove + return b.forest.applyOperation(bLog, bTree, b.operations, &lm) + }) + for i := range b.operations { + b.results[i] <- err + } +} diff --git a/pkg/local_object_storage/pilorama/boltdb.go b/pkg/local_object_storage/pilorama/boltdb.go index d3c13942e..6df455be0 100644 --- a/pkg/local_object_storage/pilorama/boltdb.go +++ b/pkg/local_object_storage/pilorama/boltdb.go @@ -24,6 +24,11 @@ type boltForest struct { modeMtx sync.RWMutex mode mode.Mode + + // mtx protects batches field. + mtx sync.Mutex + batches []*batch + cfg } @@ -318,15 +323,64 @@ func (t *boltForest) TreeApply(d CIDDescriptor, treeID string, m *Move, backgrou } } - return t.db.Batch(func(tx *bbolt.Tx) error { - bLog, bTree, err := t.getTreeBuckets(tx, d.CID, treeID) - if err != nil { - return err + if t.db.MaxBatchSize == 1 { + return t.db.Update(func(tx *bbolt.Tx) error { + bLog, bTree, err := t.getTreeBuckets(tx, d.CID, treeID) + if err != nil { + return err + } + + var lm LogMove + return t.applyOperation(bLog, bTree, []*Move{m}, &lm) + }) + } + + ch := make(chan error, 1) + t.addBatch(d, treeID, m, ch) + return <-ch +} + +func (t *boltForest) addBatch(d CIDDescriptor, treeID string, m *Move, ch chan error) { + t.mtx.Lock() + for i := 0; i < len(t.batches); i++ { + t.batches[i].mtx.Lock() + if t.batches[i].timer == nil { + t.batches[i].mtx.Unlock() + copy(t.batches[i:], t.batches[i+1:]) + t.batches = t.batches[:len(t.batches)-1] + i-- + continue } - lm := &LogMove{Move: *m} - return t.applyOperation(bLog, bTree, lm) - }) + found := t.batches[i].cid.Equals(d.CID) && t.batches[i].treeID == treeID + if found { + t.batches[i].results = append(t.batches[i].results, ch) + t.batches[i].operations = append(t.batches[i].operations, m) + if len(t.batches[i].operations) == t.db.MaxBatchSize { + t.batches[i].timer.Stop() + t.batches[i].timer = nil + t.batches[i].mtx.Unlock() + b := t.batches[i] + t.mtx.Unlock() + b.trigger() + return + } + t.batches[i].mtx.Unlock() + t.mtx.Unlock() + return + } + t.batches[i].mtx.Unlock() + } + b := &batch{ + forest: t, + cid: d.CID, + treeID: treeID, + results: []chan<- error{ch}, + operations: []*Move{m}, + } + b.timer = time.AfterFunc(t.db.MaxBatchDelay, b.trigger) + t.batches = append(t.batches, b) + t.mtx.Unlock() } func (t *boltForest) getTreeBuckets(tx *bbolt.Tx, cid cidSDK.ID, treeID string) (*bbolt.Bucket, *bbolt.Bucket, error) { @@ -351,16 +405,11 @@ func (t *boltForest) getTreeBuckets(tx *bbolt.Tx, cid cidSDK.ID, treeID string) return bLog, bData, nil } -func (t *boltForest) applyOperation(logBucket, treeBucket *bbolt.Bucket, lm *LogMove) error { +// applyOperations applies log operations. Assumes lm are sorted by timestamp. +func (t *boltForest) applyOperation(logBucket, treeBucket *bbolt.Bucket, ms []*Move, lm *LogMove) error { var tmp LogMove var cKey [17]byte - var logKey [8]byte - binary.BigEndian.PutUint64(logKey[:], lm.Time) - if logBucket.Get(logKey[:]) != nil { - return nil - } - c := logBucket.Cursor() key, value := c.Last() @@ -369,43 +418,42 @@ func (t *boltForest) applyOperation(logBucket, treeBucket *bbolt.Bucket, lm *Log r := io.NewBinReaderFromIO(b) // 1. Undo up until the desired timestamp is here. - for len(key) == 8 && binary.BigEndian.Uint64(key) > lm.Time { + for len(key) == 8 && ms[0].Time < binary.BigEndian.Uint64(key) { b.Reset(value) if err := t.logFromBytes(&tmp, r); err != nil { return err } - if err := t.undo(&tmp.Move, &tmp, treeBucket, cKey[:]); err != nil { + if err := t.undo(&tmp, treeBucket, cKey[:]); err != nil { return err } key, value = c.Prev() } - key, _ = c.Next() + for i := 0; i < len(ms); i++ { + // Loop invariant: key represents the next stored timestamp after ms[i].Time. - // 2. Insert the operation. - if len(key) != 8 || binary.BigEndian.Uint64(key) != lm.Time { + // 2. Insert the operation. + lm.Move = *ms[i] if err := t.do(logBucket, treeBucket, cKey[:], lm); err != nil { return err } - } - if key == nil { - // The operation is inserted in the beginning, reposition the cursor. - // Otherwise, `Next` call will return currently inserted operation. - c.First() - } - key, value = c.Seek(key) - - // 3. Re-apply all other operations. - for len(key) == 8 { - b.Reset(value) - if err := t.logFromBytes(&tmp, r); err != nil { - return err - } - if err := t.do(logBucket, treeBucket, cKey[:], &tmp); err != nil { - return err - } + // Cursor can be invalid, seek again. + binary.BigEndian.PutUint64(cKey[:], lm.Time) + _, _ = c.Seek(cKey[:8]) key, value = c.Next() + + // 3. Re-apply all other operations. + for len(key) == 8 && (i == len(ms)-1 || binary.BigEndian.Uint64(key) < ms[i+1].Time) { + b.Reset(value) + if err := t.logFromBytes(&tmp, r); err != nil { + return err + } + if err := t.do(logBucket, treeBucket, cKey[:], &tmp); err != nil { + return err + } + key, value = c.Next() + } } return nil @@ -511,15 +559,15 @@ func (t *boltForest) addNode(b *bbolt.Bucket, key []byte, child, parent Node, me return nil } -func (t *boltForest) undo(m *Move, lm *LogMove, b *bbolt.Bucket, key []byte) error { +func (t *boltForest) undo(m *LogMove, b *bbolt.Bucket, key []byte) error { if err := b.Delete(childrenKey(key, m.Child, m.Parent)); err != nil { return err } - if !lm.HasOld { + if !m.HasOld { return t.removeNode(b, key, m.Child, m.Parent) } - return t.addNode(b, key, m.Child, lm.Old.Parent, lm.Old.Meta) + return t.addNode(b, key, m.Child, m.Old.Parent, m.Old.Meta) } func (t *boltForest) isAncestor(b *bbolt.Bucket, parent, child Node) bool { diff --git a/pkg/local_object_storage/pilorama/forest_test.go b/pkg/local_object_storage/pilorama/forest_test.go index 54ae94123..cbd7f5143 100644 --- a/pkg/local_object_storage/pilorama/forest_test.go +++ b/pkg/local_object_storage/pilorama/forest_test.go @@ -6,6 +6,7 @@ import ( "os" "path/filepath" "strconv" + "sync" "testing" cidSDK "github.com/TrueCloudLab/frostfs-sdk-go/container/id" @@ -16,9 +17,9 @@ import ( var providers = []struct { name string - construct func(t testing.TB) Forest + construct func(t testing.TB, opts ...Option) Forest }{ - {"inmemory", func(t testing.TB) Forest { + {"inmemory", func(t testing.TB, _ ...Option) Forest { f := NewMemoryForest() require.NoError(t, f.Open(false)) require.NoError(t, f.Init()) @@ -28,14 +29,15 @@ var providers = []struct { return f }}, - {"bbolt", func(t testing.TB) Forest { + {"bbolt", func(t testing.TB, opts ...Option) Forest { // Use `os.TempDir` because we construct multiple times in the same test. tmpDir, err := os.MkdirTemp(os.TempDir(), "*") require.NoError(t, err) f := NewBoltForest( - WithPath(filepath.Join(tmpDir, "test.db")), - WithMaxBatchSize(1)) + append([]Option{ + WithPath(filepath.Join(tmpDir, "test.db")), + WithMaxBatchSize(1)}, opts...)...) require.NoError(t, f.Open(false)) require.NoError(t, f.Init()) t.Cleanup(func() { @@ -407,7 +409,7 @@ func TestForest_Apply(t *testing.T) { } } -func testForestTreeApply(t *testing.T, constructor func(t testing.TB) Forest) { +func testForestTreeApply(t *testing.T, constructor func(t testing.TB, _ ...Option) Forest) { cid := cidtest.ID() d := CIDDescriptor{cid, 0, 1} treeID := "version" @@ -461,7 +463,7 @@ func TestForest_GetOpLog(t *testing.T) { } } -func testForestTreeGetOpLog(t *testing.T, constructor func(t testing.TB) Forest) { +func testForestTreeGetOpLog(t *testing.T, constructor func(t testing.TB, _ ...Option) Forest) { cid := cidtest.ID() d := CIDDescriptor{cid, 0, 1} treeID := "version" @@ -520,7 +522,7 @@ func TestForest_TreeExists(t *testing.T) { } } -func testForestTreeExists(t *testing.T, constructor func(t testing.TB) Forest) { +func testForestTreeExists(t *testing.T, constructor func(t testing.TB, opts ...Option) Forest) { s := constructor(t) checkExists := func(t *testing.T, expected bool, cid cidSDK.ID, treeID string) { @@ -654,20 +656,20 @@ func TestForest_ApplyRandom(t *testing.T) { } } -func testForestTreeApplyRandom(t *testing.T, constructor func(t testing.TB) Forest) { - rand.Seed(42) - - const ( - nodeCount = 5 - opCount = 20 - iterCount = 200 - ) - - cid := cidtest.ID() - d := CIDDescriptor{cid, 0, 1} - treeID := "version" - expected := constructor(t) +func TestForest_ParallelApply(t *testing.T) { + for i := range providers { + if providers[i].name == "inmemory" { + continue + } + t.Run(providers[i].name, func(t *testing.T) { + testForestTreeParallelApply(t, providers[i].construct, 8, 128, 10) + }) + } +} +// prepareRandomTree creates a random sequence of operation and applies them to tree. +// The operations are guaranteed to be applied and returned sorted by `Time`. +func prepareRandomTree(nodeCount, opCount int) []Move { ops := make([]Move, nodeCount+opCount) for i := 0; i < nodeCount; i++ { ops[i] = Move{ @@ -686,7 +688,7 @@ func testForestTreeApplyRandom(t *testing.T, constructor func(t testing.TB) Fore for i := nodeCount; i < len(ops); i++ { ops[i] = Move{ - Parent: rand.Uint64() % (nodeCount + 12), + Parent: rand.Uint64() % uint64(nodeCount+12), Meta: Meta{ Time: Timestamp(i + nodeCount), Items: []KeyValue{ @@ -694,17 +696,111 @@ func testForestTreeApplyRandom(t *testing.T, constructor func(t testing.TB) Fore {Value: make([]byte, 10)}, }, }, - Child: rand.Uint64() % (nodeCount + 10), + Child: rand.Uint64() % uint64(nodeCount+10), } if rand.Uint32()%5 == 0 { ops[i].Parent = TrashID } rand.Read(ops[i].Meta.Items[1].Value) } + + return ops +} + +func compareForests(t *testing.T, expected, actual Forest, cid cidSDK.ID, treeID string, nodeCount int) { + for i := uint64(0); i < uint64(nodeCount); i++ { + expectedMeta, expectedParent, err := expected.TreeGetMeta(cid, treeID, i) + require.NoError(t, err) + actualMeta, actualParent, err := actual.TreeGetMeta(cid, treeID, i) + require.NoError(t, err) + require.Equal(t, expectedParent, actualParent, "node id: %d", i) + require.Equal(t, expectedMeta, actualMeta, "node id: %d", i) + + if ma, ok := actual.(*memoryForest); ok { + me := expected.(*memoryForest) + require.Equal(t, len(me.treeMap), len(ma.treeMap)) + + for k, sa := range ma.treeMap { + se, ok := me.treeMap[k] + require.True(t, ok) + require.Equal(t, se.operations, sa.operations) + require.Equal(t, se.infoMap, sa.infoMap) + + require.Equal(t, len(se.childMap), len(sa.childMap)) + for ck, la := range sa.childMap { + le, ok := se.childMap[ck] + require.True(t, ok) + require.ElementsMatch(t, le, la) + } + } + require.Equal(t, expected, actual, i) + } + } +} + +func testForestTreeParallelApply(t *testing.T, constructor func(t testing.TB, _ ...Option) Forest, batchSize, opCount, iterCount int) { + rand.Seed(42) + + const nodeCount = 5 + + ops := prepareRandomTree(nodeCount, opCount) + + cid := cidtest.ID() + d := CIDDescriptor{cid, 0, 1} + treeID := "version" + + expected := constructor(t) for i := range ops { require.NoError(t, expected.TreeApply(d, treeID, &ops[i], false)) } + for i := 0; i < iterCount; i++ { + // Shuffle random operations, leave initialization in place. + rand.Shuffle(len(ops), func(i, j int) { ops[i], ops[j] = ops[j], ops[i] }) + + actual := constructor(t, WithMaxBatchSize(batchSize)) + wg := new(sync.WaitGroup) + ch := make(chan *Move, 0) + for i := 0; i < batchSize; i++ { + wg.Add(1) + go func() { + defer wg.Done() + for op := range ch { + require.NoError(t, actual.TreeApply(d, treeID, op, false)) + } + }() + } + + for i := range ops { + ch <- &ops[i] + } + close(ch) + wg.Wait() + + compareForests(t, expected, actual, cid, treeID, nodeCount) + } +} + +func testForestTreeApplyRandom(t *testing.T, constructor func(t testing.TB, _ ...Option) Forest) { + rand.Seed(42) + + const ( + nodeCount = 5 + opCount = 20 + ) + + ops := prepareRandomTree(nodeCount, opCount) + + cid := cidtest.ID() + d := CIDDescriptor{cid, 0, 1} + treeID := "version" + + expected := constructor(t) + for i := range ops { + require.NoError(t, expected.TreeApply(d, treeID, &ops[i], false)) + } + + const iterCount = 200 for i := 0; i < iterCount; i++ { // Shuffle random operations, leave initialization in place. rand.Shuffle(len(ops), func(i, j int) { ops[i], ops[j] = ops[j], ops[i] }) @@ -713,59 +809,39 @@ func testForestTreeApplyRandom(t *testing.T, constructor func(t testing.TB) Fore for i := range ops { require.NoError(t, actual.TreeApply(d, treeID, &ops[i], false)) } - for i := uint64(0); i < nodeCount; i++ { - expectedMeta, expectedParent, err := expected.TreeGetMeta(cid, treeID, i) - require.NoError(t, err) - actualMeta, actualParent, err := actual.TreeGetMeta(cid, treeID, i) - require.NoError(t, err) - require.Equal(t, expectedParent, actualParent, "node id: %d", i) - require.Equal(t, expectedMeta, actualMeta, "node id: %d", i) - - if ma, ok := actual.(*memoryForest); ok { - me := expected.(*memoryForest) - require.Equal(t, len(me.treeMap), len(ma.treeMap)) - - for k, sa := range ma.treeMap { - se, ok := me.treeMap[k] - require.True(t, ok) - require.Equal(t, se.operations, sa.operations) - require.Equal(t, se.infoMap, sa.infoMap) - - require.Equal(t, len(se.childMap), len(sa.childMap)) - for ck, la := range sa.childMap { - le, ok := se.childMap[ck] - require.True(t, ok) - require.ElementsMatch(t, le, la) - } - } - require.Equal(t, expected, actual, i) - } - } + compareForests(t, expected, actual, cid, treeID, nodeCount) } } const benchNodeCount = 1000 +var batchSizes = []int{1, 2, 4, 8, 16, 32} + func BenchmarkApplySequential(b *testing.B) { for i := range providers { if providers[i].name == "inmemory" { // memory backend is not thread-safe continue } b.Run(providers[i].name, func(b *testing.B) { - benchmarkApply(b, providers[i].construct(b), func(opCount int) []Move { - ops := make([]Move, opCount) - for i := range ops { - ops[i] = Move{ - Parent: uint64(rand.Intn(benchNodeCount)), - Meta: Meta{ - Time: Timestamp(i), - Items: []KeyValue{{Value: []byte{0, 1, 2, 3, 4}}}, - }, - Child: uint64(rand.Intn(benchNodeCount)), - } - } - return ops - }) + for _, bs := range batchSizes { + b.Run("batchsize="+strconv.Itoa(bs), func(b *testing.B) { + s := providers[i].construct(b, WithMaxBatchSize(bs)) + benchmarkApply(b, s, func(opCount int) []Move { + ops := make([]Move, opCount) + for i := range ops { + ops[i] = Move{ + Parent: uint64(rand.Intn(benchNodeCount)), + Meta: Meta{ + Time: Timestamp(i), + Items: []KeyValue{{Value: []byte{0, 1, 2, 3, 4}}}, + }, + Child: uint64(rand.Intn(benchNodeCount)), + } + } + return ops + }) + }) + } }) } } @@ -780,25 +856,30 @@ func BenchmarkApplyReorderLast(b *testing.B) { continue } b.Run(providers[i].name, func(b *testing.B) { - benchmarkApply(b, providers[i].construct(b), func(opCount int) []Move { - ops := make([]Move, opCount) - for i := range ops { - ops[i] = Move{ - Parent: uint64(rand.Intn(benchNodeCount)), - Meta: Meta{ - Time: Timestamp(i), - Items: []KeyValue{{Value: []byte{0, 1, 2, 3, 4}}}, - }, - Child: uint64(rand.Intn(benchNodeCount)), - } - if i != 0 && i%blockSize == 0 { - for j := 0; j < blockSize/2; j++ { - ops[i-j], ops[i+j-blockSize] = ops[i+j-blockSize], ops[i-j] + for _, bs := range batchSizes { + b.Run("batchsize="+strconv.Itoa(bs), func(b *testing.B) { + s := providers[i].construct(b, WithMaxBatchSize(bs)) + benchmarkApply(b, s, func(opCount int) []Move { + ops := make([]Move, opCount) + for i := range ops { + ops[i] = Move{ + Parent: uint64(rand.Intn(benchNodeCount)), + Meta: Meta{ + Time: Timestamp(i), + Items: []KeyValue{{Value: []byte{0, 1, 2, 3, 4}}}, + }, + Child: uint64(rand.Intn(benchNodeCount)), + } + if i != 0 && i%blockSize == 0 { + for j := 0; j < blockSize/2; j++ { + ops[i-j], ops[i+j-blockSize] = ops[i+j-blockSize], ops[i-j] + } + } } - } - } - return ops - }) + return ops + }) + }) + } }) } } @@ -810,18 +891,17 @@ func benchmarkApply(b *testing.B, s Forest, genFunc func(int) []Move) { cid := cidtest.ID() d := CIDDescriptor{cid, 0, 1} treeID := "version" - ch := make(chan *Move, b.N) - for i := range ops { - ch <- &ops[i] + ch := make(chan int, b.N) + for i := 0; i < b.N; i++ { + ch <- i } b.ResetTimer() b.ReportAllocs() - b.SetParallelism(50) + b.SetParallelism(10) b.RunParallel(func(pb *testing.PB) { for pb.Next() { - op := <-ch - if err := s.TreeApply(d, treeID, op, false); err != nil { + if err := s.TreeApply(d, treeID, &ops[<-ch], false); err != nil { b.Fatalf("error in `Apply`: %v", err) } }