diff --git a/pkg/local_object_storage/engine/tree.go b/pkg/local_object_storage/engine/tree.go index 891806fdd..6215334f1 100644 --- a/pkg/local_object_storage/engine/tree.go +++ b/pkg/local_object_storage/engine/tree.go @@ -51,7 +51,7 @@ func (e *StorageEngine) TreeAddByPath(d pilorama.CIDDescriptor, treeID string, a } // TreeApply implements the pilorama.Forest interface. -func (e *StorageEngine) TreeApply(d pilorama.CIDDescriptor, treeID string, m *pilorama.Move) error { +func (e *StorageEngine) TreeApply(d pilorama.CIDDescriptor, treeID string, m []pilorama.Move) error { var err error for _, sh := range e.sortShardsByWeight(d.CID) { err = sh.TreeApply(d, treeID, m) diff --git a/pkg/local_object_storage/pilorama/boltdb.go b/pkg/local_object_storage/pilorama/boltdb.go index 0e35d86ca..d1d761420 100644 --- a/pkg/local_object_storage/pilorama/boltdb.go +++ b/pkg/local_object_storage/pilorama/boltdb.go @@ -7,6 +7,8 @@ import ( "math/rand" "os" "path/filepath" + "sort" + "sync" "time" "github.com/nspcc-dev/neo-go/pkg/io" @@ -17,10 +19,22 @@ import ( type boltForest struct { db *bbolt.DB + + // mtx protects batches field. + mtx sync.Mutex + batches []batch + batchesCh chan batch + closeCh chan struct{} + cfg } -const defaultMaxBatchSize = 10 +type batch struct { + cid cidSDK.ID + treeID string + ch []chan error + m []Move +} var ( dataBucket = []byte{0} @@ -60,7 +74,30 @@ func NewBoltForest(opts ...Option) ForestStorage { return &b } -func (t *boltForest) Init() error { return nil } +func (t *boltForest) Init() error { + t.closeCh = make(chan struct{}) + + batchWorkersCount := t.maxBatchSize + + t.batchesCh = make(chan batch, batchWorkersCount) + go func() { + tick := time.NewTicker(time.Millisecond * 20) + defer tick.Stop() + for { + select { + case <-t.closeCh: + return + case <-tick.C: + t.trigger() + } + } + }() + for i := 0; i < batchWorkersCount; i++ { + go t.applier() + } + return nil +} + func (t *boltForest) Open() error { err := util.MkdirAllX(filepath.Dir(t.path), t.perm) if err != nil { @@ -91,7 +128,13 @@ func (t *boltForest) Open() error { return nil }) } -func (t *boltForest) Close() error { return t.db.Close() } +func (t *boltForest) Close() error { + if t.closeCh != nil { + close(t.closeCh) + t.closeCh = nil + } + return t.db.Close() +} // TreeMove implements the Forest interface. func (t *boltForest) TreeMove(d CIDDescriptor, treeID string, m *Move) (*LogMove, error) { @@ -110,8 +153,7 @@ func (t *boltForest) TreeMove(d CIDDescriptor, treeID string, m *Move) (*LogMove if m.Child == RootID { m.Child = t.findSpareID(bTree) } - lm.Move = *m - return t.applyOperation(bLog, bTree, &lm) + return t.applyOperation(bLog, bTree, []Move{*m}, &lm) }) } @@ -203,22 +245,63 @@ func (t *boltForest) findSpareID(bTree *bbolt.Bucket) uint64 { } // TreeApply implements the Forest interface. -func (t *boltForest) TreeApply(d CIDDescriptor, treeID string, m *Move) error { +func (t *boltForest) TreeApply(d CIDDescriptor, treeID string, m []Move) error { if !d.checkValid() { return ErrInvalidCIDDescriptor } - return t.db.Batch(func(tx *bbolt.Tx) error { - bLog, bTree, err := t.getTreeBuckets(tx, d.CID, treeID) - if err != nil { - return err - } + ch := make(chan error, 1) + t.addBatch(d, treeID, m, ch) + return <-ch +} - lm := &LogMove{Move: *m} - return t.applyOperation(bLog, bTree, lm) +func (t *boltForest) addBatch(d CIDDescriptor, treeID string, m []Move, ch chan error) { + t.mtx.Lock() + defer t.mtx.Unlock() + for i := range t.batches { + if t.batches[i].cid.Equals(d.CID) && t.batches[i].treeID == treeID { + t.batches[i].ch = append(t.batches[i].ch, ch) + t.batches[i].m = append(t.batches[i].m, m...) + return + } + } + t.batches = append(t.batches, batch{ + cid: d.CID, + treeID: treeID, + ch: []chan error{ch}, + m: m, }) } +func (t *boltForest) trigger() { + t.mtx.Lock() + for i := range t.batches { + t.batchesCh <- t.batches[i] + } + t.batches = t.batches[:0] + t.mtx.Unlock() +} + +func (t *boltForest) applier() { + for b := range t.batchesCh { + sort.Slice(b.m, func(i, j int) bool { + return b.m[i].Time < b.m[j].Time + }) + err := t.db.Batch(func(tx *bbolt.Tx) error { + bLog, bTree, err := t.getTreeBuckets(tx, b.cid, b.treeID) + if err != nil { + return err + } + + var lm LogMove + return t.applyOperation(bLog, bTree, b.m, &lm) + }) + for i := range b.ch { + b.ch[i] <- err + } + } +} + func (t *boltForest) getTreeBuckets(tx *bbolt.Tx, cid cidSDK.ID, treeID string) (*bbolt.Bucket, *bbolt.Bucket, error) { treeRoot := bucketName(cid, treeID) child, err := tx.CreateBucket(treeRoot) @@ -243,7 +326,8 @@ func (t *boltForest) getTreeBuckets(tx *bbolt.Tx, cid cidSDK.ID, treeID string) return bLog, bData, nil } -func (t *boltForest) applyOperation(logBucket, treeBucket *bbolt.Bucket, lm *LogMove) error { +// applyOperations applies log operations. Assumes lm are sorted by timestamp. +func (t *boltForest) applyOperation(logBucket, treeBucket *bbolt.Bucket, ms []Move, lm *LogMove) error { var tmp LogMove var cKey [17]byte @@ -255,7 +339,7 @@ func (t *boltForest) applyOperation(logBucket, treeBucket *bbolt.Bucket, lm *Log r := io.NewBinReaderFromIO(b) // 1. Undo up until the desired timestamp is here. - for len(key) == 8 && binary.BigEndian.Uint64(key) > lm.Time { + for len(key) == 8 && binary.BigEndian.Uint64(key) > ms[0].Time { b.Reset(value) if err := t.logFromBytes(&tmp, r); err != nil { return err @@ -266,27 +350,34 @@ func (t *boltForest) applyOperation(logBucket, treeBucket *bbolt.Bucket, lm *Log key, value = c.Prev() } - // 2. Insert the operation. - if len(key) != 8 || binary.BigEndian.Uint64(key) != lm.Time { - if err := t.do(logBucket, treeBucket, cKey[:], lm); err != nil { - return err - } - } - key, value = c.Next() - - // 3. Re-apply all other operations. - for len(key) == 8 { - b.Reset(value) - if err := t.logFromBytes(&tmp, r); err != nil { - return err - } - if err := t.do(logBucket, treeBucket, cKey[:], &tmp); err != nil { - return err + var i int + for { + // 2. Insert the operation. + if len(key) != 8 || binary.BigEndian.Uint64(key) != ms[i].Time { + lm.Move = ms[i] + if err := t.do(logBucket, treeBucket, cKey[:], lm); err != nil { + return err + } } key, value = c.Next() - } - return nil + i++ + + // 3. Re-apply all other operations. + for len(key) == 8 && (i == len(ms) || binary.BigEndian.Uint64(key) < ms[i].Time) { + b.Reset(value) + if err := t.logFromBytes(&tmp, r); err != nil { + return err + } + if err := t.do(logBucket, treeBucket, cKey[:], &tmp); err != nil { + return err + } + key, value = c.Next() + } + if i == len(ms) { + return nil + } + } } func (t *boltForest) do(lb *bbolt.Bucket, b *bbolt.Bucket, key []byte, op *LogMove) error { diff --git a/pkg/local_object_storage/pilorama/forest.go b/pkg/local_object_storage/pilorama/forest.go index d5f674ae4..2f2a8a1e1 100644 --- a/pkg/local_object_storage/pilorama/forest.go +++ b/pkg/local_object_storage/pilorama/forest.go @@ -89,7 +89,7 @@ func (f *memoryForest) TreeAddByPath(d CIDDescriptor, treeID string, attr string } // TreeApply implements the Forest interface. -func (f *memoryForest) TreeApply(d CIDDescriptor, treeID string, op *Move) error { +func (f *memoryForest) TreeApply(d CIDDescriptor, treeID string, op []Move) error { if !d.checkValid() { return ErrInvalidCIDDescriptor } @@ -101,7 +101,14 @@ func (f *memoryForest) TreeApply(d CIDDescriptor, treeID string, op *Move) error f.treeMap[fullID] = s } - return s.Apply(op) + for i := range op { + err := s.Apply(&op[i]) + if err != nil { + return err + } + } + + return nil } func (f *memoryForest) Init() error { diff --git a/pkg/local_object_storage/pilorama/forest_test.go b/pkg/local_object_storage/pilorama/forest_test.go index 01f16573e..0adcdbe21 100644 --- a/pkg/local_object_storage/pilorama/forest_test.go +++ b/pkg/local_object_storage/pilorama/forest_test.go @@ -329,20 +329,20 @@ func testForestTreeApply(t *testing.T, constructor func(t testing.TB) Forest) { t.Run("invalid descriptor", func(t *testing.T) { s := constructor(t) - err := s.TreeApply(CIDDescriptor{cid, 0, 0}, treeID, &Move{ + err := s.TreeApply(CIDDescriptor{cid, 0, 0}, treeID, []Move{{ Child: 10, Parent: 0, Meta: Meta{Time: 1, Items: []KeyValue{{"grand", []byte{1}}}}, - }) + }}) require.ErrorIs(t, err, ErrInvalidCIDDescriptor) }) testApply := func(t *testing.T, s Forest, child, parent Node, meta Meta) { - require.NoError(t, s.TreeApply(d, treeID, &Move{ + require.NoError(t, s.TreeApply(d, treeID, []Move{{ Child: child, Parent: parent, Meta: meta, - })) + }})) } t.Run("add a child, then insert a parent removal", func(t *testing.T) { @@ -404,7 +404,7 @@ func testForestTreeGetOpLog(t *testing.T, constructor func(t testing.TB) Forest) }) for i := range logs { - require.NoError(t, s.TreeApply(d, treeID, &logs[i])) + require.NoError(t, s.TreeApply(d, treeID, logs[i:i+1])) } testGetOpLog := func(t *testing.T, height uint64, m Move) { @@ -483,7 +483,7 @@ func testForestTreeApplyRandom(t *testing.T, constructor func(t testing.TB) Fore rand.Read(ops[i].Meta.Items[1].Value) } for i := range ops { - require.NoError(t, expected.TreeApply(d, treeID, &ops[i])) + require.NoError(t, expected.TreeApply(d, treeID, ops[i:i+1])) } for i := 0; i < iterCount; i++ { @@ -492,7 +492,7 @@ func testForestTreeApplyRandom(t *testing.T, constructor func(t testing.TB) Fore actual := constructor(t) for i := range ops { - require.NoError(t, actual.TreeApply(d, treeID, &ops[i])) + require.NoError(t, actual.TreeApply(d, treeID, ops[i:i+1])) } for i := uint64(0); i < nodeCount; i++ { expectedMeta, expectedParent, err := expected.TreeGetMeta(cid, treeID, i) @@ -517,20 +517,24 @@ func BenchmarkApplySequential(b *testing.B) { continue } b.Run(providers[i].name, func(b *testing.B) { - benchmarkApply(b, providers[i].construct(b), func(opCount int) []Move { - ops := make([]Move, opCount) - for i := range ops { - ops[i] = Move{ - Parent: uint64(rand.Intn(benchNodeCount)), - Meta: Meta{ - Time: Timestamp(i), - Items: []KeyValue{{Value: []byte{0, 1, 2, 3, 4}}}, - }, - Child: uint64(rand.Intn(benchNodeCount)), - } - } - return ops - }) + for _, bs := range []int{1, 2, 4} { + b.Run("batchsize="+strconv.Itoa(bs), func(b *testing.B) { + benchmarkApply(b, providers[i].construct(b), bs, func(opCount int) []Move { + ops := make([]Move, opCount) + for i := range ops { + ops[i] = Move{ + Parent: uint64(rand.Intn(benchNodeCount)), + Meta: Meta{ + Time: Timestamp(i), + Items: []KeyValue{{Value: []byte{0, 1, 2, 3, 4}}}, + }, + Child: uint64(rand.Intn(benchNodeCount)), + } + } + return ops + }) + }) + } }) } } @@ -545,48 +549,61 @@ func BenchmarkApplyReorderLast(b *testing.B) { continue } b.Run(providers[i].name, func(b *testing.B) { - benchmarkApply(b, providers[i].construct(b), func(opCount int) []Move { - ops := make([]Move, opCount) - for i := range ops { - ops[i] = Move{ - Parent: uint64(rand.Intn(benchNodeCount)), - Meta: Meta{ - Time: Timestamp(i), - Items: []KeyValue{{Value: []byte{0, 1, 2, 3, 4}}}, - }, - Child: uint64(rand.Intn(benchNodeCount)), - } - if i != 0 && i%blockSize == 0 { - for j := 0; j < blockSize/2; j++ { - ops[i-j], ops[i+j-blockSize] = ops[i+j-blockSize], ops[i-j] + for _, bs := range []int{1, 2, 4} { + b.Run("batchsize="+strconv.Itoa(bs), func(b *testing.B) { + benchmarkApply(b, providers[i].construct(b), bs, func(opCount int) []Move { + ops := make([]Move, opCount) + for i := range ops { + ops[i] = Move{ + Parent: uint64(rand.Intn(benchNodeCount)), + Meta: Meta{ + Time: Timestamp(i), + Items: []KeyValue{{Value: []byte{0, 1, 2, 3, 4}}}, + }, + Child: uint64(rand.Intn(benchNodeCount)), + } + if i != 0 && i%blockSize == 0 { + for j := 0; j < blockSize/2; j++ { + ops[i-j], ops[i+j-blockSize] = ops[i+j-blockSize], ops[i-j] + } + } } - } - } - return ops - }) + return ops + }) + }) + } }) } } -func benchmarkApply(b *testing.B, s Forest, genFunc func(int) []Move) { +func benchmarkApply(b *testing.B, s Forest, batchSize int, genFunc func(int) []Move) { rand.Seed(42) ops := genFunc(b.N) cid := cidtest.ID() d := CIDDescriptor{cid, 0, 1} treeID := "version" - ch := make(chan *Move, b.N) - for i := range ops { - ch <- &ops[i] + ch := make(chan int, b.N) + for i := 0; i < b.N; i++ { + ch <- i } b.ResetTimer() b.ReportAllocs() - b.SetParallelism(50) + b.SetParallelism(20) b.RunParallel(func(pb *testing.PB) { + batch := make([]Move, 0, batchSize) for pb.Next() { - op := <-ch - if err := s.TreeApply(d, treeID, op); err != nil { + batch = append(batch, ops[<-ch]) + if len(batch) == batchSize { + if err := s.TreeApply(d, treeID, batch); err != nil { + b.Fatalf("error in `Apply`: %v", err) + } + batch = batch[:0] + } + } + if len(batch) > 0 { + if err := s.TreeApply(d, treeID, batch); err != nil { b.Fatalf("error in `Apply`: %v", err) } } @@ -662,12 +679,12 @@ func testMove(t *testing.T, s Forest, ts int, node, parent Node, d CIDDescriptor items = append(items, KeyValue{AttributeVersion, []byte(version)}) } - require.NoError(t, s.TreeApply(d, treeID, &Move{ + require.NoError(t, s.TreeApply(d, treeID, []Move{{ Parent: parent, Child: node, Meta: Meta{ Time: uint64(ts), Items: items, }, - })) + }})) } diff --git a/pkg/local_object_storage/pilorama/interface.go b/pkg/local_object_storage/pilorama/interface.go index 6d685e951..9b1e08d5f 100644 --- a/pkg/local_object_storage/pilorama/interface.go +++ b/pkg/local_object_storage/pilorama/interface.go @@ -17,7 +17,7 @@ type Forest interface { // Internal nodes in path should have exactly one attribute, otherwise a new node is created. TreeAddByPath(d CIDDescriptor, treeID string, attr string, path []string, meta []KeyValue) ([]LogMove, error) // TreeApply applies replicated operation from another node. - TreeApply(d CIDDescriptor, treeID string, m *Move) error + TreeApply(d CIDDescriptor, treeID string, m []Move) error // TreeGetByPath returns all nodes corresponding to the path. // The path is constructed by descending from the root using the values of the // AttributeFilename in meta. diff --git a/pkg/local_object_storage/shard/tree.go b/pkg/local_object_storage/shard/tree.go index e6d7c85a1..5ec08d6b4 100644 --- a/pkg/local_object_storage/shard/tree.go +++ b/pkg/local_object_storage/shard/tree.go @@ -24,7 +24,7 @@ func (s *Shard) TreeAddByPath(d pilorama.CIDDescriptor, treeID string, attr stri } // TreeApply implements the pilorama.Forest interface. -func (s *Shard) TreeApply(d pilorama.CIDDescriptor, treeID string, m *pilorama.Move) error { +func (s *Shard) TreeApply(d pilorama.CIDDescriptor, treeID string, m []pilorama.Move) error { if s.GetMode() == ModeReadOnly { return ErrReadOnlyMode } diff --git a/pkg/services/tree/service.go b/pkg/services/tree/service.go index 9b6b5e544..3d08be10c 100644 --- a/pkg/services/tree/service.go +++ b/pkg/services/tree/service.go @@ -456,11 +456,11 @@ func (s *Service) Apply(_ context.Context, req *ApplyRequest) (*ApplyResponse, e d := pilorama.CIDDescriptor{CID: cid, Position: pos, Size: size} resp := &ApplyResponse{Body: &ApplyResponse_Body{}, Signature: &Signature{}} - return resp, s.forest.TreeApply(d, req.GetBody().GetTreeId(), &pilorama.Move{ + return resp, s.forest.TreeApply(d, req.GetBody().GetTreeId(), []pilorama.Move{{ Parent: op.GetParentId(), Child: op.GetChildId(), Meta: meta, - }) + }}) } func (s *Service) GetOpLog(req *GetOpLogRequest, srv TreeService_GetOpLogServer) error { diff --git a/pkg/services/tree/sync.go b/pkg/services/tree/sync.go index e9a7d1899..eb399aed7 100644 --- a/pkg/services/tree/sync.go +++ b/pkg/services/tree/sync.go @@ -89,7 +89,7 @@ func (s *Service) synchronizeSingle(ctx context.Context, cid cid.ID, treeID stri return newHeight, err } d := pilorama.CIDDescriptor{CID: cid} - if err := s.forest.TreeApply(d, treeID, m); err != nil { + if err := s.forest.TreeApply(d, treeID, []pilorama.Move{*m}); err != nil { return newHeight, err } if m.Time > newHeight {