[#9999] tree: Add ApplyBatch method
Some checks failed
Tests and linters / Run gofumpt (pull_request) Successful in 1m26s
DCO action / DCO (pull_request) Successful in 1m38s
Vulncheck / Vulncheck (pull_request) Successful in 2m10s
Pre-commit hooks / Pre-commit (pull_request) Successful in 2m21s
Build / Build Components (pull_request) Successful in 2m35s
Tests and linters / Lint (pull_request) Failing after 2m35s
Tests and linters / gopls check (pull_request) Successful in 2m50s
Tests and linters / Staticcheck (pull_request) Successful in 2m58s
Tests and linters / Tests (pull_request) Successful in 4m18s
Tests and linters / Tests with -race (pull_request) Successful in 5m58s

Concurrent Apply can lead to child node applies before parent, so
undo/redo operations will perform. This leads to performance degradation
in case of tree with many sublevels.

Signed-off-by: Dmitrii Stepanov <d.stepanov@yadro.com>
This commit is contained in:
Dmitrii Stepanov 2024-10-25 11:13:33 +03:00
parent bc8d79ddf9
commit 87ce7722c7
Signed by: dstepanov-yadro
GPG key ID: 237AF1A763293BC0
7 changed files with 168 additions and 23 deletions

View file

@ -110,6 +110,34 @@ func (e *StorageEngine) TreeApply(ctx context.Context, cnr cidSDK.ID, treeID str
return nil return nil
} }
// TreeApply implements the pilorama.Forest interface.
func (e *StorageEngine) TreeApplyBatch(ctx context.Context, cnr cidSDK.ID, treeID string, m []*pilorama.Move) error {
ctx, span := tracing.StartSpanFromContext(ctx, "StorageEngine.TreeApplyBatch",
trace.WithAttributes(
attribute.String("container_id", cnr.EncodeToString()),
attribute.String("tree_id", treeID),
),
)
defer span.End()
index, lst, err := e.getTreeShard(ctx, cnr, treeID)
if err != nil && !errors.Is(err, pilorama.ErrTreeNotFound) {
return err
}
err = lst[index].TreeApplyBatch(ctx, cnr, treeID, m)
if err != nil {
if !errors.Is(err, shard.ErrReadOnlyMode) && err != shard.ErrPiloramaDisabled {
e.reportShardError(lst[index], "can't perform `TreeApplyBatch`", err,
zap.Stringer("cid", cnr),
zap.String("tree", treeID),
zap.String("trace_id", tracingPkg.GetTraceID(ctx)))
}
return err
}
return nil
}
// TreeGetByPath implements the pilorama.Forest interface. // TreeGetByPath implements the pilorama.Forest interface.
func (e *StorageEngine) TreeGetByPath(ctx context.Context, cid cidSDK.ID, treeID string, attr string, path []string, latest bool) ([]pilorama.Node, error) { func (e *StorageEngine) TreeGetByPath(ctx context.Context, cid cidSDK.ID, treeID string, attr string, path []string, latest bool) ([]pilorama.Node, error) {
ctx, span := tracing.StartSpanFromContext(ctx, "StorageEngine.TreeGetByPath", ctx, span := tracing.StartSpanFromContext(ctx, "StorageEngine.TreeGetByPath",

View file

@ -2,8 +2,10 @@ package pilorama
import ( import (
"encoding/binary" "encoding/binary"
"fmt"
"slices" "slices"
"sort" "sort"
"strconv"
"sync" "sync"
"time" "time"
@ -35,6 +37,10 @@ func (b *batch) trigger() {
} }
func (b *batch) run() { func (b *batch) run() {
fmt.Printf("batch.run started %s\n", time.Now().Format(time.RFC3339))
defer func() {
fmt.Printf("batch.run completed %s\n", time.Now().Format(time.RFC3339))
}()
fullID := bucketName(b.cid, b.treeID) fullID := bucketName(b.cid, b.treeID)
err := b.forest.db.Update(func(tx *bbolt.Tx) error { err := b.forest.db.Update(func(tx *bbolt.Tx) error {
bLog, bTree, err := b.forest.getTreeBuckets(tx, fullID) bLog, bTree, err := b.forest.getTreeBuckets(tx, fullID)
@ -71,12 +77,14 @@ func (b *batch) run() {
_, _, _, inTree := b.forest.getState(bTree, stateKey(cKey[:], b.operations[i].Child)) _, _, _, inTree := b.forest.getState(bTree, stateKey(cKey[:], b.operations[i].Child))
if inTree { if inTree {
slow = true slow = true
fmt.Println("node in tree, slow = true")
break break
} }
key := childrenKey(cKey[:], b.operations[i].Child, 0) key := childrenKey(cKey[:], b.operations[i].Child, 0)
k, _ := bTree.Cursor().Seek(key) k, _ := bTree.Cursor().Seek(key)
if len(k) == childrenKeySize && binary.LittleEndian.Uint64(k[1:]) == b.operations[i].Child { if len(k) == childrenKeySize && binary.LittleEndian.Uint64(k[1:]) == b.operations[i].Child {
fmt.Println("node is parent, slow = true")
slow = true slow = true
break break
} }
@ -94,6 +102,7 @@ func (b *batch) run() {
p := b.operations[i].Parent p := b.operations[i].Parent
_, ts, _, inTree := b.forest.getState(bTree, stateKey(cKey[:], p)) _, ts, _, inTree := b.forest.getState(bTree, stateKey(cKey[:], p))
if !inTree || b.operations[0].Time < ts { if !inTree || b.operations[0].Time < ts {
fmt.Printf("parent in tree: %s, b.operations[0].Time: %d, ts: %d\n", strconv.FormatBool(inTree), b.operations[0].Time, ts)
slow = true slow = true
break break
} }

View file

@ -559,6 +559,82 @@ func (t *boltForest) TreeApply(ctx context.Context, cnr cidSDK.ID, treeID string
return metaerr.Wrap(err) return metaerr.Wrap(err)
} }
func (t *boltForest) TreeApplyBatch(ctx context.Context, cnr cidSDK.ID, treeID string, m []*Move) error {
var (
startedAt = time.Now()
success = false
)
defer func() {
t.metrics.AddMethodDuration("TreeApplyBatch", time.Since(startedAt), success)
}()
_, span := tracing.StartSpanFromContext(ctx, "boltForest.TreeApplyBatch",
trace.WithAttributes(
attribute.String("container_id", cnr.EncodeToString()),
attribute.String("tree_id", treeID),
),
)
defer span.End()
m, err := t.filterSeen(cnr, treeID, m)
if err != nil {
return err
}
if len(m) == 0 {
success = true
return nil
}
ch := make(chan error)
b := &batch{
forest: t,
cid: cnr,
treeID: treeID,
results: []chan<- error{ch},
operations: m,
}
go func() {
b.run()
}()
err = <-ch
success = err == nil
return metaerr.Wrap(err)
}
func (t *boltForest) filterSeen(cnr cidSDK.ID, treeID string, m []*Move) ([]*Move, error) {
t.modeMtx.RLock()
defer t.modeMtx.RUnlock()
if t.mode.NoMetabase() {
return nil, ErrDegradedMode
}
ops := make([]*Move, 0, len(m))
for _, op := range m {
var seen bool
err := t.db.View(func(tx *bbolt.Tx) error {
treeRoot := tx.Bucket(bucketName(cnr, treeID))
if treeRoot == nil {
return nil
}
b := treeRoot.Bucket(logBucket)
var logKey [8]byte
binary.BigEndian.PutUint64(logKey[:], op.Time)
seen = b.Get(logKey[:]) != nil
return nil
})
if err != nil {
return nil, metaerr.Wrap(err)
}
if !seen {
ops = append(ops, op)
}
}
return ops, nil
}
// TreeApplyStream should be used with caution: this method locks other write transactions while `source` is not closed. // TreeApplyStream should be used with caution: this method locks other write transactions while `source` is not closed.
func (t *boltForest) TreeApplyStream(ctx context.Context, cnr cidSDK.ID, treeID string, source <-chan *Move) error { func (t *boltForest) TreeApplyStream(ctx context.Context, cnr cidSDK.ID, treeID string, source <-chan *Move) error {
var ( var (
@ -689,6 +765,7 @@ func (t *boltForest) applyOperation(logBucket, treeBucket *bbolt.Bucket, ms []*M
b := bytes.NewReader(nil) b := bytes.NewReader(nil)
r := io.NewBinReaderFromIO(b) r := io.NewBinReaderFromIO(b)
fmt.Printf("batch.run.undo started %s, time %d\n", time.Now().Format(time.RFC3339), ms[0].Time)
// 1. Undo up until the desired timestamp is here. // 1. Undo up until the desired timestamp is here.
for len(key) == 8 && ms[0].Time < binary.BigEndian.Uint64(key) { for len(key) == 8 && ms[0].Time < binary.BigEndian.Uint64(key) {
b.Reset(value) b.Reset(value)
@ -705,6 +782,7 @@ func (t *boltForest) applyOperation(logBucket, treeBucket *bbolt.Bucket, ms []*M
key, value = c.Prev() key, value = c.Prev()
} }
fmt.Printf("batch.run.insert/redo started %s\n", time.Now().Format(time.RFC3339))
for i := range len(ms) { for i := range len(ms) {
// Loop invariant: key represents the next stored timestamp after ms[i].Time. // Loop invariant: key represents the next stored timestamp after ms[i].Time.

View file

@ -112,6 +112,15 @@ func (f *memoryForest) TreeApply(_ context.Context, cnr cid.ID, treeID string, o
return s.Apply(op) return s.Apply(op)
} }
func (f *memoryForest) TreeApplyBatch(ctx context.Context, cnr cid.ID, treeID string, ops []*Move) error {
for _, op := range ops {
if err := f.TreeApply(ctx, cnr, treeID, op, true); err != nil {
return err
}
}
return nil
}
func (f *memoryForest) Init() error { func (f *memoryForest) Init() error {
return nil return nil
} }

View file

@ -21,6 +21,8 @@ type Forest interface {
// TreeApply applies replicated operation from another node. // TreeApply applies replicated operation from another node.
// If background is true, TreeApply will first check whether an operation exists. // If background is true, TreeApply will first check whether an operation exists.
TreeApply(ctx context.Context, cnr cidSDK.ID, treeID string, m *Move, backgroundSync bool) error TreeApply(ctx context.Context, cnr cidSDK.ID, treeID string, m *Move, backgroundSync bool) error
// TreeApplyBatch applies replicated operations from another node.
TreeApplyBatch(ctx context.Context, cnr cidSDK.ID, treeID string, m []*Move) error
// TreeGetByPath returns all nodes corresponding to the path. // TreeGetByPath returns all nodes corresponding to the path.
// The path is constructed by descending from the root using the values of the // The path is constructed by descending from the root using the values of the
// AttributeFilename in meta. // AttributeFilename in meta.

View file

@ -106,6 +106,33 @@ func (s *Shard) TreeApply(ctx context.Context, cnr cidSDK.ID, treeID string, m *
return s.pilorama.TreeApply(ctx, cnr, treeID, m, backgroundSync) return s.pilorama.TreeApply(ctx, cnr, treeID, m, backgroundSync)
} }
// TreeApply implements the pilorama.Forest interface.
func (s *Shard) TreeApplyBatch(ctx context.Context, cnr cidSDK.ID, treeID string, m []*pilorama.Move) error {
ctx, span := tracing.StartSpanFromContext(ctx, "Shard.TreeApplyBatch",
trace.WithAttributes(
attribute.String("shard_id", s.ID().String()),
attribute.String("container_id", cnr.EncodeToString()),
attribute.String("tree_id", treeID),
),
)
defer span.End()
if s.pilorama == nil {
return ErrPiloramaDisabled
}
s.m.RLock()
defer s.m.RUnlock()
if s.info.Mode.ReadOnly() {
return ErrReadOnlyMode
}
if s.info.Mode.NoMetabase() {
return ErrDegradedMode
}
return s.pilorama.TreeApplyBatch(ctx, cnr, treeID, m)
}
// TreeGetByPath implements the pilorama.Forest interface. // TreeGetByPath implements the pilorama.Forest interface.
func (s *Shard) TreeGetByPath(ctx context.Context, cid cidSDK.ID, treeID string, attr string, path []string, latest bool) ([]pilorama.Node, error) { func (s *Shard) TreeGetByPath(ctx context.Context, cid cidSDK.ID, treeID string, attr string, path []string, latest bool) ([]pilorama.Node, error) {
ctx, span := tracing.StartSpanFromContext(ctx, "Shard.TreeGetByPath", ctx, span := tracing.StartSpanFromContext(ctx, "Shard.TreeGetByPath",

View file

@ -177,37 +177,29 @@ func mergeOperationStreams(streams []chan *pilorama.Move, merged chan<- *piloram
func (s *Service) applyOperationStream(ctx context.Context, cid cid.ID, treeID string, func (s *Service) applyOperationStream(ctx context.Context, cid cid.ID, treeID string,
operationStream <-chan *pilorama.Move, operationStream <-chan *pilorama.Move,
) uint64 { ) uint64 {
errGroup, _ := errgroup.WithContext(ctx)
const workersCount = 1024
errGroup.SetLimit(workersCount)
// We run TreeApply concurrently for the operation batch. Let's consider two operations
// in the batch m1 and m2 such that m1.Time < m2.Time. The engine may apply m2 and fail
// on m1. That means the service must start sync from m1.Time in the next iteration and
// this height is stored in unappliedOperationHeight.
var unappliedOperationHeight uint64 = math.MaxUint64
var heightMtx sync.Mutex
var prev *pilorama.Move var prev *pilorama.Move
var batch []*pilorama.Move
for m := range operationStream { for m := range operationStream {
// skip already applied op // skip already applied op
if prev != nil && prev.Time == m.Time { if prev != nil && prev.Time == m.Time {
continue continue
} }
prev = m prev = m
batch = append(batch, m)
errGroup.Go(func() error { if len(batch) == 1000 {
if err := s.forest.TreeApply(ctx, cid, treeID, m, true); err != nil { if err := s.forest.TreeApplyBatch(ctx, cid, treeID, batch); err != nil {
heightMtx.Lock() return batch[0].Time
unappliedOperationHeight = min(unappliedOperationHeight, m.Time)
heightMtx.Unlock()
return err
} }
return nil batch = batch[:0]
}) }
} }
_ = errGroup.Wait() if len(batch) > 0 {
return unappliedOperationHeight if err := s.forest.TreeApplyBatch(ctx, cid, treeID, batch); err != nil {
return batch[0].Time
}
}
return math.MaxUint64
} }
func (s *Service) startStream(ctx context.Context, cid cid.ID, treeID string, func (s *Service) startStream(ctx context.Context, cid cid.ID, treeID string,
@ -384,7 +376,7 @@ func (s *Service) syncLoop(ctx context.Context) {
return return
case <-s.syncChan: case <-s.syncChan:
ctx, span := tracing.StartSpanFromContext(ctx, "TreeService.sync") ctx, span := tracing.StartSpanFromContext(ctx, "TreeService.sync")
s.log.Debug(logs.TreeSyncingTrees) s.log.Info(logs.TreeSyncingTrees)
start := time.Now() start := time.Now()
@ -402,7 +394,7 @@ func (s *Service) syncLoop(ctx context.Context) {
s.removeContainers(ctx, newMap) s.removeContainers(ctx, newMap)
s.log.Debug(logs.TreeTreesHaveBeenSynchronized) s.log.Info(logs.TreeTreesHaveBeenSynchronized)
s.metrics.AddSyncDuration(time.Since(start), true) s.metrics.AddSyncDuration(time.Since(start), true)
span.End() span.End()