[#1621] pilorama: Batch related operations

Signed-off-by: Evgenii Stratonikov <evgeniy@nspcc.ru>
Signed-off-by: Evgenii Stratonikov <evgeniy@morphbits.ru>
Signed-off-by: Evgenii Stratonikov <e.stratonikov@yadro.com>
This commit is contained in:
Evgenii Stratonikov 2022-07-22 11:53:05 +03:00 committed by fyrchik
parent 9009612a82
commit ac81c70c09
4 changed files with 307 additions and 128 deletions

View file

@ -4,6 +4,8 @@ Changelog for FrostFS Node
## [Unreleased]
### Added
- Separate batching for replicated operations over the same container in pilorama (#1621)
### Changed
### Fixed
- Big object removal with non-local parts (#1978)

View file

@ -0,0 +1,49 @@
package pilorama
import (
"sort"
"sync"
"time"
cidSDK "github.com/TrueCloudLab/frostfs-sdk-go/container/id"
"go.etcd.io/bbolt"
)
type batch struct {
forest *boltForest
timer *time.Timer
mtx sync.Mutex
start sync.Once
cid cidSDK.ID
treeID string
results []chan<- error
operations []*Move
}
func (b *batch) trigger() {
b.mtx.Lock()
if b.timer != nil {
b.timer.Stop()
b.timer = nil
}
b.mtx.Unlock()
b.start.Do(b.run)
}
func (b *batch) run() {
sort.Slice(b.operations, func(i, j int) bool {
return b.operations[i].Time < b.operations[j].Time
})
err := b.forest.db.Update(func(tx *bbolt.Tx) error {
bLog, bTree, err := b.forest.getTreeBuckets(tx, b.cid, b.treeID)
if err != nil {
return err
}
var lm LogMove
return b.forest.applyOperation(bLog, bTree, b.operations, &lm)
})
for i := range b.operations {
b.results[i] <- err
}
}

View file

@ -24,6 +24,11 @@ type boltForest struct {
modeMtx sync.RWMutex
mode mode.Mode
// mtx protects batches field.
mtx sync.Mutex
batches []*batch
cfg
}
@ -318,15 +323,64 @@ func (t *boltForest) TreeApply(d CIDDescriptor, treeID string, m *Move, backgrou
}
}
return t.db.Batch(func(tx *bbolt.Tx) error {
if t.db.MaxBatchSize == 1 {
return t.db.Update(func(tx *bbolt.Tx) error {
bLog, bTree, err := t.getTreeBuckets(tx, d.CID, treeID)
if err != nil {
return err
}
lm := &LogMove{Move: *m}
return t.applyOperation(bLog, bTree, lm)
var lm LogMove
return t.applyOperation(bLog, bTree, []*Move{m}, &lm)
})
}
ch := make(chan error, 1)
t.addBatch(d, treeID, m, ch)
return <-ch
}
func (t *boltForest) addBatch(d CIDDescriptor, treeID string, m *Move, ch chan error) {
t.mtx.Lock()
for i := 0; i < len(t.batches); i++ {
t.batches[i].mtx.Lock()
if t.batches[i].timer == nil {
t.batches[i].mtx.Unlock()
copy(t.batches[i:], t.batches[i+1:])
t.batches = t.batches[:len(t.batches)-1]
i--
continue
}
found := t.batches[i].cid.Equals(d.CID) && t.batches[i].treeID == treeID
if found {
t.batches[i].results = append(t.batches[i].results, ch)
t.batches[i].operations = append(t.batches[i].operations, m)
if len(t.batches[i].operations) == t.db.MaxBatchSize {
t.batches[i].timer.Stop()
t.batches[i].timer = nil
t.batches[i].mtx.Unlock()
b := t.batches[i]
t.mtx.Unlock()
b.trigger()
return
}
t.batches[i].mtx.Unlock()
t.mtx.Unlock()
return
}
t.batches[i].mtx.Unlock()
}
b := &batch{
forest: t,
cid: d.CID,
treeID: treeID,
results: []chan<- error{ch},
operations: []*Move{m},
}
b.timer = time.AfterFunc(t.db.MaxBatchDelay, b.trigger)
t.batches = append(t.batches, b)
t.mtx.Unlock()
}
func (t *boltForest) getTreeBuckets(tx *bbolt.Tx, cid cidSDK.ID, treeID string) (*bbolt.Bucket, *bbolt.Bucket, error) {
@ -351,16 +405,11 @@ func (t *boltForest) getTreeBuckets(tx *bbolt.Tx, cid cidSDK.ID, treeID string)
return bLog, bData, nil
}
func (t *boltForest) applyOperation(logBucket, treeBucket *bbolt.Bucket, lm *LogMove) error {
// applyOperations applies log operations. Assumes lm are sorted by timestamp.
func (t *boltForest) applyOperation(logBucket, treeBucket *bbolt.Bucket, ms []*Move, lm *LogMove) error {
var tmp LogMove
var cKey [17]byte
var logKey [8]byte
binary.BigEndian.PutUint64(logKey[:], lm.Time)
if logBucket.Get(logKey[:]) != nil {
return nil
}
c := logBucket.Cursor()
key, value := c.Last()
@ -369,35 +418,33 @@ func (t *boltForest) applyOperation(logBucket, treeBucket *bbolt.Bucket, lm *Log
r := io.NewBinReaderFromIO(b)
// 1. Undo up until the desired timestamp is here.
for len(key) == 8 && binary.BigEndian.Uint64(key) > lm.Time {
for len(key) == 8 && ms[0].Time < binary.BigEndian.Uint64(key) {
b.Reset(value)
if err := t.logFromBytes(&tmp, r); err != nil {
return err
}
if err := t.undo(&tmp.Move, &tmp, treeBucket, cKey[:]); err != nil {
if err := t.undo(&tmp, treeBucket, cKey[:]); err != nil {
return err
}
key, value = c.Prev()
}
key, _ = c.Next()
for i := 0; i < len(ms); i++ {
// Loop invariant: key represents the next stored timestamp after ms[i].Time.
// 2. Insert the operation.
if len(key) != 8 || binary.BigEndian.Uint64(key) != lm.Time {
lm.Move = *ms[i]
if err := t.do(logBucket, treeBucket, cKey[:], lm); err != nil {
return err
}
}
if key == nil {
// The operation is inserted in the beginning, reposition the cursor.
// Otherwise, `Next` call will return currently inserted operation.
c.First()
}
key, value = c.Seek(key)
// Cursor can be invalid, seek again.
binary.BigEndian.PutUint64(cKey[:], lm.Time)
_, _ = c.Seek(cKey[:8])
key, value = c.Next()
// 3. Re-apply all other operations.
for len(key) == 8 {
for len(key) == 8 && (i == len(ms)-1 || binary.BigEndian.Uint64(key) < ms[i+1].Time) {
b.Reset(value)
if err := t.logFromBytes(&tmp, r); err != nil {
return err
@ -407,6 +454,7 @@ func (t *boltForest) applyOperation(logBucket, treeBucket *bbolt.Bucket, lm *Log
}
key, value = c.Next()
}
}
return nil
}
@ -511,15 +559,15 @@ func (t *boltForest) addNode(b *bbolt.Bucket, key []byte, child, parent Node, me
return nil
}
func (t *boltForest) undo(m *Move, lm *LogMove, b *bbolt.Bucket, key []byte) error {
func (t *boltForest) undo(m *LogMove, b *bbolt.Bucket, key []byte) error {
if err := b.Delete(childrenKey(key, m.Child, m.Parent)); err != nil {
return err
}
if !lm.HasOld {
if !m.HasOld {
return t.removeNode(b, key, m.Child, m.Parent)
}
return t.addNode(b, key, m.Child, lm.Old.Parent, lm.Old.Meta)
return t.addNode(b, key, m.Child, m.Old.Parent, m.Old.Meta)
}
func (t *boltForest) isAncestor(b *bbolt.Bucket, parent, child Node) bool {

View file

@ -6,6 +6,7 @@ import (
"os"
"path/filepath"
"strconv"
"sync"
"testing"
cidSDK "github.com/TrueCloudLab/frostfs-sdk-go/container/id"
@ -16,9 +17,9 @@ import (
var providers = []struct {
name string
construct func(t testing.TB) Forest
construct func(t testing.TB, opts ...Option) Forest
}{
{"inmemory", func(t testing.TB) Forest {
{"inmemory", func(t testing.TB, _ ...Option) Forest {
f := NewMemoryForest()
require.NoError(t, f.Open(false))
require.NoError(t, f.Init())
@ -28,14 +29,15 @@ var providers = []struct {
return f
}},
{"bbolt", func(t testing.TB) Forest {
{"bbolt", func(t testing.TB, opts ...Option) Forest {
// Use `os.TempDir` because we construct multiple times in the same test.
tmpDir, err := os.MkdirTemp(os.TempDir(), "*")
require.NoError(t, err)
f := NewBoltForest(
append([]Option{
WithPath(filepath.Join(tmpDir, "test.db")),
WithMaxBatchSize(1))
WithMaxBatchSize(1)}, opts...)...)
require.NoError(t, f.Open(false))
require.NoError(t, f.Init())
t.Cleanup(func() {
@ -407,7 +409,7 @@ func TestForest_Apply(t *testing.T) {
}
}
func testForestTreeApply(t *testing.T, constructor func(t testing.TB) Forest) {
func testForestTreeApply(t *testing.T, constructor func(t testing.TB, _ ...Option) Forest) {
cid := cidtest.ID()
d := CIDDescriptor{cid, 0, 1}
treeID := "version"
@ -461,7 +463,7 @@ func TestForest_GetOpLog(t *testing.T) {
}
}
func testForestTreeGetOpLog(t *testing.T, constructor func(t testing.TB) Forest) {
func testForestTreeGetOpLog(t *testing.T, constructor func(t testing.TB, _ ...Option) Forest) {
cid := cidtest.ID()
d := CIDDescriptor{cid, 0, 1}
treeID := "version"
@ -520,7 +522,7 @@ func TestForest_TreeExists(t *testing.T) {
}
}
func testForestTreeExists(t *testing.T, constructor func(t testing.TB) Forest) {
func testForestTreeExists(t *testing.T, constructor func(t testing.TB, opts ...Option) Forest) {
s := constructor(t)
checkExists := func(t *testing.T, expected bool, cid cidSDK.ID, treeID string) {
@ -654,20 +656,20 @@ func TestForest_ApplyRandom(t *testing.T) {
}
}
func testForestTreeApplyRandom(t *testing.T, constructor func(t testing.TB) Forest) {
rand.Seed(42)
const (
nodeCount = 5
opCount = 20
iterCount = 200
)
cid := cidtest.ID()
d := CIDDescriptor{cid, 0, 1}
treeID := "version"
expected := constructor(t)
func TestForest_ParallelApply(t *testing.T) {
for i := range providers {
if providers[i].name == "inmemory" {
continue
}
t.Run(providers[i].name, func(t *testing.T) {
testForestTreeParallelApply(t, providers[i].construct, 8, 128, 10)
})
}
}
// prepareRandomTree creates a random sequence of operation and applies them to tree.
// The operations are guaranteed to be applied and returned sorted by `Time`.
func prepareRandomTree(nodeCount, opCount int) []Move {
ops := make([]Move, nodeCount+opCount)
for i := 0; i < nodeCount; i++ {
ops[i] = Move{
@ -686,7 +688,7 @@ func testForestTreeApplyRandom(t *testing.T, constructor func(t testing.TB) Fore
for i := nodeCount; i < len(ops); i++ {
ops[i] = Move{
Parent: rand.Uint64() % (nodeCount + 12),
Parent: rand.Uint64() % uint64(nodeCount+12),
Meta: Meta{
Time: Timestamp(i + nodeCount),
Items: []KeyValue{
@ -694,26 +696,19 @@ func testForestTreeApplyRandom(t *testing.T, constructor func(t testing.TB) Fore
{Value: make([]byte, 10)},
},
},
Child: rand.Uint64() % (nodeCount + 10),
Child: rand.Uint64() % uint64(nodeCount+10),
}
if rand.Uint32()%5 == 0 {
ops[i].Parent = TrashID
}
rand.Read(ops[i].Meta.Items[1].Value)
}
for i := range ops {
require.NoError(t, expected.TreeApply(d, treeID, &ops[i], false))
}
for i := 0; i < iterCount; i++ {
// Shuffle random operations, leave initialization in place.
rand.Shuffle(len(ops), func(i, j int) { ops[i], ops[j] = ops[j], ops[i] })
return ops
}
actual := constructor(t)
for i := range ops {
require.NoError(t, actual.TreeApply(d, treeID, &ops[i], false))
}
for i := uint64(0); i < nodeCount; i++ {
func compareForests(t *testing.T, expected, actual Forest, cid cidSDK.ID, treeID string, nodeCount int) {
for i := uint64(0); i < uint64(nodeCount); i++ {
expectedMeta, expectedParent, err := expected.TreeGetMeta(cid, treeID, i)
require.NoError(t, err)
actualMeta, actualParent, err := actual.TreeGetMeta(cid, treeID, i)
@ -741,18 +736,97 @@ func testForestTreeApplyRandom(t *testing.T, constructor func(t testing.TB) Fore
require.Equal(t, expected, actual, i)
}
}
}
func testForestTreeParallelApply(t *testing.T, constructor func(t testing.TB, _ ...Option) Forest, batchSize, opCount, iterCount int) {
rand.Seed(42)
const nodeCount = 5
ops := prepareRandomTree(nodeCount, opCount)
cid := cidtest.ID()
d := CIDDescriptor{cid, 0, 1}
treeID := "version"
expected := constructor(t)
for i := range ops {
require.NoError(t, expected.TreeApply(d, treeID, &ops[i], false))
}
for i := 0; i < iterCount; i++ {
// Shuffle random operations, leave initialization in place.
rand.Shuffle(len(ops), func(i, j int) { ops[i], ops[j] = ops[j], ops[i] })
actual := constructor(t, WithMaxBatchSize(batchSize))
wg := new(sync.WaitGroup)
ch := make(chan *Move, 0)
for i := 0; i < batchSize; i++ {
wg.Add(1)
go func() {
defer wg.Done()
for op := range ch {
require.NoError(t, actual.TreeApply(d, treeID, op, false))
}
}()
}
for i := range ops {
ch <- &ops[i]
}
close(ch)
wg.Wait()
compareForests(t, expected, actual, cid, treeID, nodeCount)
}
}
func testForestTreeApplyRandom(t *testing.T, constructor func(t testing.TB, _ ...Option) Forest) {
rand.Seed(42)
const (
nodeCount = 5
opCount = 20
)
ops := prepareRandomTree(nodeCount, opCount)
cid := cidtest.ID()
d := CIDDescriptor{cid, 0, 1}
treeID := "version"
expected := constructor(t)
for i := range ops {
require.NoError(t, expected.TreeApply(d, treeID, &ops[i], false))
}
const iterCount = 200
for i := 0; i < iterCount; i++ {
// Shuffle random operations, leave initialization in place.
rand.Shuffle(len(ops), func(i, j int) { ops[i], ops[j] = ops[j], ops[i] })
actual := constructor(t)
for i := range ops {
require.NoError(t, actual.TreeApply(d, treeID, &ops[i], false))
}
compareForests(t, expected, actual, cid, treeID, nodeCount)
}
}
const benchNodeCount = 1000
var batchSizes = []int{1, 2, 4, 8, 16, 32}
func BenchmarkApplySequential(b *testing.B) {
for i := range providers {
if providers[i].name == "inmemory" { // memory backend is not thread-safe
continue
}
b.Run(providers[i].name, func(b *testing.B) {
benchmarkApply(b, providers[i].construct(b), func(opCount int) []Move {
for _, bs := range batchSizes {
b.Run("batchsize="+strconv.Itoa(bs), func(b *testing.B) {
s := providers[i].construct(b, WithMaxBatchSize(bs))
benchmarkApply(b, s, func(opCount int) []Move {
ops := make([]Move, opCount)
for i := range ops {
ops[i] = Move{
@ -768,6 +842,8 @@ func BenchmarkApplySequential(b *testing.B) {
})
})
}
})
}
}
func BenchmarkApplyReorderLast(b *testing.B) {
@ -780,7 +856,10 @@ func BenchmarkApplyReorderLast(b *testing.B) {
continue
}
b.Run(providers[i].name, func(b *testing.B) {
benchmarkApply(b, providers[i].construct(b), func(opCount int) []Move {
for _, bs := range batchSizes {
b.Run("batchsize="+strconv.Itoa(bs), func(b *testing.B) {
s := providers[i].construct(b, WithMaxBatchSize(bs))
benchmarkApply(b, s, func(opCount int) []Move {
ops := make([]Move, opCount)
for i := range ops {
ops[i] = Move{
@ -801,6 +880,8 @@ func BenchmarkApplyReorderLast(b *testing.B) {
})
})
}
})
}
}
func benchmarkApply(b *testing.B, s Forest, genFunc func(int) []Move) {
@ -810,18 +891,17 @@ func benchmarkApply(b *testing.B, s Forest, genFunc func(int) []Move) {
cid := cidtest.ID()
d := CIDDescriptor{cid, 0, 1}
treeID := "version"
ch := make(chan *Move, b.N)
for i := range ops {
ch <- &ops[i]
ch := make(chan int, b.N)
for i := 0; i < b.N; i++ {
ch <- i
}
b.ResetTimer()
b.ReportAllocs()
b.SetParallelism(50)
b.SetParallelism(10)
b.RunParallel(func(pb *testing.PB) {
for pb.Next() {
op := <-ch
if err := s.TreeApply(d, treeID, op, false); err != nil {
if err := s.TreeApply(d, treeID, &ops[<-ch], false); err != nil {
b.Fatalf("error in `Apply`: %v", err)
}
}