WIP: pilorama: add custom batches

Signed-off-by: Evgenii Stratonikov <evgeniy@nspcc.ru>
This commit is contained in:
Evgenii Stratonikov 2022-07-20 12:11:11 +03:00
parent 34d20fd592
commit 9426fd5046
8 changed files with 204 additions and 89 deletions

View file

@ -51,7 +51,7 @@ func (e *StorageEngine) TreeAddByPath(d pilorama.CIDDescriptor, treeID string, a
}
// TreeApply implements the pilorama.Forest interface.
func (e *StorageEngine) TreeApply(d pilorama.CIDDescriptor, treeID string, m *pilorama.Move) error {
func (e *StorageEngine) TreeApply(d pilorama.CIDDescriptor, treeID string, m []pilorama.Move) error {
var err error
for _, sh := range e.sortShardsByWeight(d.CID) {
err = sh.TreeApply(d, treeID, m)

View file

@ -7,6 +7,8 @@ import (
"math/rand"
"os"
"path/filepath"
"sort"
"sync"
"time"
"github.com/nspcc-dev/neo-go/pkg/io"
@ -17,10 +19,22 @@ import (
type boltForest struct {
db *bbolt.DB
// mtx protects batches field.
mtx sync.Mutex
batches []batch
batchesCh chan batch
closeCh chan struct{}
cfg
}
const defaultMaxBatchSize = 10
type batch struct {
cid cidSDK.ID
treeID string
ch []chan error
m []Move
}
var (
dataBucket = []byte{0}
@ -60,7 +74,30 @@ func NewBoltForest(opts ...Option) ForestStorage {
return &b
}
func (t *boltForest) Init() error { return nil }
func (t *boltForest) Init() error {
t.closeCh = make(chan struct{})
batchWorkersCount := t.maxBatchSize
t.batchesCh = make(chan batch, batchWorkersCount)
go func() {
tick := time.NewTicker(time.Millisecond * 20)
defer tick.Stop()
for {
select {
case <-t.closeCh:
return
case <-tick.C:
t.trigger()
}
}
}()
for i := 0; i < batchWorkersCount; i++ {
go t.applier()
}
return nil
}
func (t *boltForest) Open() error {
err := util.MkdirAllX(filepath.Dir(t.path), t.perm)
if err != nil {
@ -91,7 +128,13 @@ func (t *boltForest) Open() error {
return nil
})
}
func (t *boltForest) Close() error { return t.db.Close() }
func (t *boltForest) Close() error {
if t.closeCh != nil {
close(t.closeCh)
t.closeCh = nil
}
return t.db.Close()
}
// TreeMove implements the Forest interface.
func (t *boltForest) TreeMove(d CIDDescriptor, treeID string, m *Move) (*LogMove, error) {
@ -110,8 +153,7 @@ func (t *boltForest) TreeMove(d CIDDescriptor, treeID string, m *Move) (*LogMove
if m.Child == RootID {
m.Child = t.findSpareID(bTree)
}
lm.Move = *m
return t.applyOperation(bLog, bTree, &lm)
return t.applyOperation(bLog, bTree, []Move{*m}, &lm)
})
}
@ -203,22 +245,63 @@ func (t *boltForest) findSpareID(bTree *bbolt.Bucket) uint64 {
}
// TreeApply implements the Forest interface.
func (t *boltForest) TreeApply(d CIDDescriptor, treeID string, m *Move) error {
func (t *boltForest) TreeApply(d CIDDescriptor, treeID string, m []Move) error {
if !d.checkValid() {
return ErrInvalidCIDDescriptor
}
return t.db.Batch(func(tx *bbolt.Tx) error {
bLog, bTree, err := t.getTreeBuckets(tx, d.CID, treeID)
if err != nil {
return err
}
ch := make(chan error, 1)
t.addBatch(d, treeID, m, ch)
return <-ch
}
lm := &LogMove{Move: *m}
return t.applyOperation(bLog, bTree, lm)
func (t *boltForest) addBatch(d CIDDescriptor, treeID string, m []Move, ch chan error) {
t.mtx.Lock()
defer t.mtx.Unlock()
for i := range t.batches {
if t.batches[i].cid.Equals(d.CID) && t.batches[i].treeID == treeID {
t.batches[i].ch = append(t.batches[i].ch, ch)
t.batches[i].m = append(t.batches[i].m, m...)
return
}
}
t.batches = append(t.batches, batch{
cid: d.CID,
treeID: treeID,
ch: []chan error{ch},
m: m,
})
}
func (t *boltForest) trigger() {
t.mtx.Lock()
for i := range t.batches {
t.batchesCh <- t.batches[i]
}
t.batches = t.batches[:0]
t.mtx.Unlock()
}
func (t *boltForest) applier() {
for b := range t.batchesCh {
sort.Slice(b.m, func(i, j int) bool {
return b.m[i].Time < b.m[j].Time
})
err := t.db.Batch(func(tx *bbolt.Tx) error {
bLog, bTree, err := t.getTreeBuckets(tx, b.cid, b.treeID)
if err != nil {
return err
}
var lm LogMove
return t.applyOperation(bLog, bTree, b.m, &lm)
})
for i := range b.ch {
b.ch[i] <- err
}
}
}
func (t *boltForest) getTreeBuckets(tx *bbolt.Tx, cid cidSDK.ID, treeID string) (*bbolt.Bucket, *bbolt.Bucket, error) {
treeRoot := bucketName(cid, treeID)
child, err := tx.CreateBucket(treeRoot)
@ -243,7 +326,8 @@ func (t *boltForest) getTreeBuckets(tx *bbolt.Tx, cid cidSDK.ID, treeID string)
return bLog, bData, nil
}
func (t *boltForest) applyOperation(logBucket, treeBucket *bbolt.Bucket, lm *LogMove) error {
// applyOperations applies log operations. Assumes lm are sorted by timestamp.
func (t *boltForest) applyOperation(logBucket, treeBucket *bbolt.Bucket, ms []Move, lm *LogMove) error {
var tmp LogMove
var cKey [17]byte
@ -255,7 +339,7 @@ func (t *boltForest) applyOperation(logBucket, treeBucket *bbolt.Bucket, lm *Log
r := io.NewBinReaderFromIO(b)
// 1. Undo up until the desired timestamp is here.
for len(key) == 8 && binary.BigEndian.Uint64(key) > lm.Time {
for len(key) == 8 && binary.BigEndian.Uint64(key) > ms[0].Time {
b.Reset(value)
if err := t.logFromBytes(&tmp, r); err != nil {
return err
@ -266,27 +350,34 @@ func (t *boltForest) applyOperation(logBucket, treeBucket *bbolt.Bucket, lm *Log
key, value = c.Prev()
}
// 2. Insert the operation.
if len(key) != 8 || binary.BigEndian.Uint64(key) != lm.Time {
if err := t.do(logBucket, treeBucket, cKey[:], lm); err != nil {
return err
}
}
key, value = c.Next()
// 3. Re-apply all other operations.
for len(key) == 8 {
b.Reset(value)
if err := t.logFromBytes(&tmp, r); err != nil {
return err
}
if err := t.do(logBucket, treeBucket, cKey[:], &tmp); err != nil {
return err
var i int
for {
// 2. Insert the operation.
if len(key) != 8 || binary.BigEndian.Uint64(key) != ms[i].Time {
lm.Move = ms[i]
if err := t.do(logBucket, treeBucket, cKey[:], lm); err != nil {
return err
}
}
key, value = c.Next()
}
return nil
i++
// 3. Re-apply all other operations.
for len(key) == 8 && (i == len(ms) || binary.BigEndian.Uint64(key) < ms[i].Time) {
b.Reset(value)
if err := t.logFromBytes(&tmp, r); err != nil {
return err
}
if err := t.do(logBucket, treeBucket, cKey[:], &tmp); err != nil {
return err
}
key, value = c.Next()
}
if i == len(ms) {
return nil
}
}
}
func (t *boltForest) do(lb *bbolt.Bucket, b *bbolt.Bucket, key []byte, op *LogMove) error {

View file

@ -89,7 +89,7 @@ func (f *memoryForest) TreeAddByPath(d CIDDescriptor, treeID string, attr string
}
// TreeApply implements the Forest interface.
func (f *memoryForest) TreeApply(d CIDDescriptor, treeID string, op *Move) error {
func (f *memoryForest) TreeApply(d CIDDescriptor, treeID string, op []Move) error {
if !d.checkValid() {
return ErrInvalidCIDDescriptor
}
@ -101,7 +101,14 @@ func (f *memoryForest) TreeApply(d CIDDescriptor, treeID string, op *Move) error
f.treeMap[fullID] = s
}
return s.Apply(op)
for i := range op {
err := s.Apply(&op[i])
if err != nil {
return err
}
}
return nil
}
func (f *memoryForest) Init() error {

View file

@ -329,20 +329,20 @@ func testForestTreeApply(t *testing.T, constructor func(t testing.TB) Forest) {
t.Run("invalid descriptor", func(t *testing.T) {
s := constructor(t)
err := s.TreeApply(CIDDescriptor{cid, 0, 0}, treeID, &Move{
err := s.TreeApply(CIDDescriptor{cid, 0, 0}, treeID, []Move{{
Child: 10,
Parent: 0,
Meta: Meta{Time: 1, Items: []KeyValue{{"grand", []byte{1}}}},
})
}})
require.ErrorIs(t, err, ErrInvalidCIDDescriptor)
})
testApply := func(t *testing.T, s Forest, child, parent Node, meta Meta) {
require.NoError(t, s.TreeApply(d, treeID, &Move{
require.NoError(t, s.TreeApply(d, treeID, []Move{{
Child: child,
Parent: parent,
Meta: meta,
}))
}}))
}
t.Run("add a child, then insert a parent removal", func(t *testing.T) {
@ -404,7 +404,7 @@ func testForestTreeGetOpLog(t *testing.T, constructor func(t testing.TB) Forest)
})
for i := range logs {
require.NoError(t, s.TreeApply(d, treeID, &logs[i]))
require.NoError(t, s.TreeApply(d, treeID, logs[i:i+1]))
}
testGetOpLog := func(t *testing.T, height uint64, m Move) {
@ -483,7 +483,7 @@ func testForestTreeApplyRandom(t *testing.T, constructor func(t testing.TB) Fore
rand.Read(ops[i].Meta.Items[1].Value)
}
for i := range ops {
require.NoError(t, expected.TreeApply(d, treeID, &ops[i]))
require.NoError(t, expected.TreeApply(d, treeID, ops[i:i+1]))
}
for i := 0; i < iterCount; i++ {
@ -492,7 +492,7 @@ func testForestTreeApplyRandom(t *testing.T, constructor func(t testing.TB) Fore
actual := constructor(t)
for i := range ops {
require.NoError(t, actual.TreeApply(d, treeID, &ops[i]))
require.NoError(t, actual.TreeApply(d, treeID, ops[i:i+1]))
}
for i := uint64(0); i < nodeCount; i++ {
expectedMeta, expectedParent, err := expected.TreeGetMeta(cid, treeID, i)
@ -517,20 +517,24 @@ func BenchmarkApplySequential(b *testing.B) {
continue
}
b.Run(providers[i].name, func(b *testing.B) {
benchmarkApply(b, providers[i].construct(b), func(opCount int) []Move {
ops := make([]Move, opCount)
for i := range ops {
ops[i] = Move{
Parent: uint64(rand.Intn(benchNodeCount)),
Meta: Meta{
Time: Timestamp(i),
Items: []KeyValue{{Value: []byte{0, 1, 2, 3, 4}}},
},
Child: uint64(rand.Intn(benchNodeCount)),
}
}
return ops
})
for _, bs := range []int{1, 2, 4} {
b.Run("batchsize="+strconv.Itoa(bs), func(b *testing.B) {
benchmarkApply(b, providers[i].construct(b), bs, func(opCount int) []Move {
ops := make([]Move, opCount)
for i := range ops {
ops[i] = Move{
Parent: uint64(rand.Intn(benchNodeCount)),
Meta: Meta{
Time: Timestamp(i),
Items: []KeyValue{{Value: []byte{0, 1, 2, 3, 4}}},
},
Child: uint64(rand.Intn(benchNodeCount)),
}
}
return ops
})
})
}
})
}
}
@ -545,48 +549,61 @@ func BenchmarkApplyReorderLast(b *testing.B) {
continue
}
b.Run(providers[i].name, func(b *testing.B) {
benchmarkApply(b, providers[i].construct(b), func(opCount int) []Move {
ops := make([]Move, opCount)
for i := range ops {
ops[i] = Move{
Parent: uint64(rand.Intn(benchNodeCount)),
Meta: Meta{
Time: Timestamp(i),
Items: []KeyValue{{Value: []byte{0, 1, 2, 3, 4}}},
},
Child: uint64(rand.Intn(benchNodeCount)),
}
if i != 0 && i%blockSize == 0 {
for j := 0; j < blockSize/2; j++ {
ops[i-j], ops[i+j-blockSize] = ops[i+j-blockSize], ops[i-j]
for _, bs := range []int{1, 2, 4} {
b.Run("batchsize="+strconv.Itoa(bs), func(b *testing.B) {
benchmarkApply(b, providers[i].construct(b), bs, func(opCount int) []Move {
ops := make([]Move, opCount)
for i := range ops {
ops[i] = Move{
Parent: uint64(rand.Intn(benchNodeCount)),
Meta: Meta{
Time: Timestamp(i),
Items: []KeyValue{{Value: []byte{0, 1, 2, 3, 4}}},
},
Child: uint64(rand.Intn(benchNodeCount)),
}
if i != 0 && i%blockSize == 0 {
for j := 0; j < blockSize/2; j++ {
ops[i-j], ops[i+j-blockSize] = ops[i+j-blockSize], ops[i-j]
}
}
}
}
}
return ops
})
return ops
})
})
}
})
}
}
func benchmarkApply(b *testing.B, s Forest, genFunc func(int) []Move) {
func benchmarkApply(b *testing.B, s Forest, batchSize int, genFunc func(int) []Move) {
rand.Seed(42)
ops := genFunc(b.N)
cid := cidtest.ID()
d := CIDDescriptor{cid, 0, 1}
treeID := "version"
ch := make(chan *Move, b.N)
for i := range ops {
ch <- &ops[i]
ch := make(chan int, b.N)
for i := 0; i < b.N; i++ {
ch <- i
}
b.ResetTimer()
b.ReportAllocs()
b.SetParallelism(50)
b.SetParallelism(20)
b.RunParallel(func(pb *testing.PB) {
batch := make([]Move, 0, batchSize)
for pb.Next() {
op := <-ch
if err := s.TreeApply(d, treeID, op); err != nil {
batch = append(batch, ops[<-ch])
if len(batch) == batchSize {
if err := s.TreeApply(d, treeID, batch); err != nil {
b.Fatalf("error in `Apply`: %v", err)
}
batch = batch[:0]
}
}
if len(batch) > 0 {
if err := s.TreeApply(d, treeID, batch); err != nil {
b.Fatalf("error in `Apply`: %v", err)
}
}
@ -662,12 +679,12 @@ func testMove(t *testing.T, s Forest, ts int, node, parent Node, d CIDDescriptor
items = append(items, KeyValue{AttributeVersion, []byte(version)})
}
require.NoError(t, s.TreeApply(d, treeID, &Move{
require.NoError(t, s.TreeApply(d, treeID, []Move{{
Parent: parent,
Child: node,
Meta: Meta{
Time: uint64(ts),
Items: items,
},
}))
}}))
}

View file

@ -17,7 +17,7 @@ type Forest interface {
// Internal nodes in path should have exactly one attribute, otherwise a new node is created.
TreeAddByPath(d CIDDescriptor, treeID string, attr string, path []string, meta []KeyValue) ([]LogMove, error)
// TreeApply applies replicated operation from another node.
TreeApply(d CIDDescriptor, treeID string, m *Move) error
TreeApply(d CIDDescriptor, treeID string, m []Move) error
// TreeGetByPath returns all nodes corresponding to the path.
// The path is constructed by descending from the root using the values of the
// AttributeFilename in meta.

View file

@ -24,7 +24,7 @@ func (s *Shard) TreeAddByPath(d pilorama.CIDDescriptor, treeID string, attr stri
}
// TreeApply implements the pilorama.Forest interface.
func (s *Shard) TreeApply(d pilorama.CIDDescriptor, treeID string, m *pilorama.Move) error {
func (s *Shard) TreeApply(d pilorama.CIDDescriptor, treeID string, m []pilorama.Move) error {
if s.GetMode() == ModeReadOnly {
return ErrReadOnlyMode
}

View file

@ -456,11 +456,11 @@ func (s *Service) Apply(_ context.Context, req *ApplyRequest) (*ApplyResponse, e
d := pilorama.CIDDescriptor{CID: cid, Position: pos, Size: size}
resp := &ApplyResponse{Body: &ApplyResponse_Body{}, Signature: &Signature{}}
return resp, s.forest.TreeApply(d, req.GetBody().GetTreeId(), &pilorama.Move{
return resp, s.forest.TreeApply(d, req.GetBody().GetTreeId(), []pilorama.Move{{
Parent: op.GetParentId(),
Child: op.GetChildId(),
Meta: meta,
})
}})
}
func (s *Service) GetOpLog(req *GetOpLogRequest, srv TreeService_GetOpLogServer) error {

View file

@ -89,7 +89,7 @@ func (s *Service) synchronizeSingle(ctx context.Context, cid cid.ID, treeID stri
return newHeight, err
}
d := pilorama.CIDDescriptor{CID: cid}
if err := s.forest.TreeApply(d, treeID, m); err != nil {
if err := s.forest.TreeApply(d, treeID, []pilorama.Move{*m}); err != nil {
return newHeight, err
}
if m.Time > newHeight {