[#9999] tree: Add ApplyBatch method
Some checks failed
DCO action / DCO (pull_request) Successful in 1m6s
Tests and linters / Run gofumpt (pull_request) Successful in 1m27s
Tests and linters / Lint (pull_request) Failing after 2m17s
Pre-commit hooks / Pre-commit (pull_request) Successful in 2m19s
Vulncheck / Vulncheck (pull_request) Successful in 2m12s
Build / Build Components (pull_request) Successful in 2m28s
Tests and linters / gopls check (pull_request) Successful in 2m48s
Tests and linters / Staticcheck (pull_request) Successful in 2m58s
Tests and linters / Tests (pull_request) Successful in 4m19s
Tests and linters / Tests with -race (pull_request) Successful in 6m6s
Some checks failed
DCO action / DCO (pull_request) Successful in 1m6s
Tests and linters / Run gofumpt (pull_request) Successful in 1m27s
Tests and linters / Lint (pull_request) Failing after 2m17s
Pre-commit hooks / Pre-commit (pull_request) Successful in 2m19s
Vulncheck / Vulncheck (pull_request) Successful in 2m12s
Build / Build Components (pull_request) Successful in 2m28s
Tests and linters / gopls check (pull_request) Successful in 2m48s
Tests and linters / Staticcheck (pull_request) Successful in 2m58s
Tests and linters / Tests (pull_request) Successful in 4m19s
Tests and linters / Tests with -race (pull_request) Successful in 6m6s
Concurrent Apply can lead to child node applies before parent, so undo/redo operations will perform. This leads to performance degradation in case of tree with many sublevels. Signed-off-by: Dmitrii Stepanov <d.stepanov@yadro.com>
This commit is contained in:
parent
bc8d79ddf9
commit
5dd4f0afd7
7 changed files with 121 additions and 21 deletions
|
@ -110,6 +110,34 @@ func (e *StorageEngine) TreeApply(ctx context.Context, cnr cidSDK.ID, treeID str
|
|||
return nil
|
||||
}
|
||||
|
||||
// TreeApply implements the pilorama.Forest interface.
|
||||
func (e *StorageEngine) TreeApplyBatch(ctx context.Context, cnr cidSDK.ID, treeID string, m []*pilorama.Move) error {
|
||||
ctx, span := tracing.StartSpanFromContext(ctx, "StorageEngine.TreeApplyBatch",
|
||||
trace.WithAttributes(
|
||||
attribute.String("container_id", cnr.EncodeToString()),
|
||||
attribute.String("tree_id", treeID),
|
||||
),
|
||||
)
|
||||
defer span.End()
|
||||
|
||||
index, lst, err := e.getTreeShard(ctx, cnr, treeID)
|
||||
if err != nil && !errors.Is(err, pilorama.ErrTreeNotFound) {
|
||||
return err
|
||||
}
|
||||
|
||||
err = lst[index].TreeApplyBatch(ctx, cnr, treeID, m)
|
||||
if err != nil {
|
||||
if !errors.Is(err, shard.ErrReadOnlyMode) && err != shard.ErrPiloramaDisabled {
|
||||
e.reportShardError(lst[index], "can't perform `TreeApplyBatch`", err,
|
||||
zap.Stringer("cid", cnr),
|
||||
zap.String("tree", treeID),
|
||||
zap.String("trace_id", tracingPkg.GetTraceID(ctx)))
|
||||
}
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// TreeGetByPath implements the pilorama.Forest interface.
|
||||
func (e *StorageEngine) TreeGetByPath(ctx context.Context, cid cidSDK.ID, treeID string, attr string, path []string, latest bool) ([]pilorama.Node, error) {
|
||||
ctx, span := tracing.StartSpanFromContext(ctx, "StorageEngine.TreeGetByPath",
|
||||
|
|
|
@ -2,8 +2,10 @@ package pilorama
|
|||
|
||||
import (
|
||||
"encoding/binary"
|
||||
"fmt"
|
||||
"slices"
|
||||
"sort"
|
||||
"strconv"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
|
@ -71,12 +73,14 @@ func (b *batch) run() {
|
|||
_, _, _, inTree := b.forest.getState(bTree, stateKey(cKey[:], b.operations[i].Child))
|
||||
if inTree {
|
||||
slow = true
|
||||
fmt.Println("node in tree, slow = true")
|
||||
break
|
||||
}
|
||||
|
||||
key := childrenKey(cKey[:], b.operations[i].Child, 0)
|
||||
k, _ := bTree.Cursor().Seek(key)
|
||||
if len(k) == childrenKeySize && binary.LittleEndian.Uint64(k[1:]) == b.operations[i].Child {
|
||||
fmt.Println("node is parent, slow = true")
|
||||
slow = true
|
||||
break
|
||||
}
|
||||
|
@ -94,6 +98,7 @@ func (b *batch) run() {
|
|||
p := b.operations[i].Parent
|
||||
_, ts, _, inTree := b.forest.getState(bTree, stateKey(cKey[:], p))
|
||||
if !inTree || b.operations[0].Time < ts {
|
||||
fmt.Printf("parent in tree: %s, b.operations[0].Time: %d, ts: %d\n", strconv.FormatBool(inTree), b.operations[0].Time, ts)
|
||||
slow = true
|
||||
break
|
||||
}
|
||||
|
|
|
@ -559,6 +559,42 @@ func (t *boltForest) TreeApply(ctx context.Context, cnr cidSDK.ID, treeID string
|
|||
return metaerr.Wrap(err)
|
||||
}
|
||||
|
||||
func (t *boltForest) TreeApplyBatch(ctx context.Context, cnr cidSDK.ID, treeID string, m []*Move) error {
|
||||
var (
|
||||
startedAt = time.Now()
|
||||
success = false
|
||||
)
|
||||
defer func() {
|
||||
t.metrics.AddMethodDuration("TreeApplyBatch", time.Since(startedAt), success)
|
||||
}()
|
||||
|
||||
_, span := tracing.StartSpanFromContext(ctx, "boltForest.TreeApplyBatch",
|
||||
trace.WithAttributes(
|
||||
attribute.String("container_id", cnr.EncodeToString()),
|
||||
attribute.String("tree_id", treeID),
|
||||
),
|
||||
)
|
||||
defer span.End()
|
||||
|
||||
if len(m) == 0 {
|
||||
return nil
|
||||
}
|
||||
|
||||
ch := make(chan error)
|
||||
b := &batch{
|
||||
forest: t,
|
||||
cid: cnr,
|
||||
treeID: treeID,
|
||||
results: []chan<- error{ch},
|
||||
operations: m,
|
||||
}
|
||||
go func() {
|
||||
b.run()
|
||||
}()
|
||||
err := <-ch
|
||||
return metaerr.Wrap(err)
|
||||
}
|
||||
|
||||
// TreeApplyStream should be used with caution: this method locks other write transactions while `source` is not closed.
|
||||
func (t *boltForest) TreeApplyStream(ctx context.Context, cnr cidSDK.ID, treeID string, source <-chan *Move) error {
|
||||
var (
|
||||
|
@ -692,6 +728,7 @@ func (t *boltForest) applyOperation(logBucket, treeBucket *bbolt.Bucket, ms []*M
|
|||
// 1. Undo up until the desired timestamp is here.
|
||||
for len(key) == 8 && ms[0].Time < binary.BigEndian.Uint64(key) {
|
||||
b.Reset(value)
|
||||
fmt.Printf("applyOperation: time %d\n", binary.BigEndian.Uint64(key))
|
||||
|
||||
tmp.Child = r.ReadU64LE()
|
||||
tmp.Parent = r.ReadU64LE()
|
||||
|
|
|
@ -112,6 +112,15 @@ func (f *memoryForest) TreeApply(_ context.Context, cnr cid.ID, treeID string, o
|
|||
return s.Apply(op)
|
||||
}
|
||||
|
||||
func (f *memoryForest) TreeApplyBatch(ctx context.Context, cnr cid.ID, treeID string, ops []*Move) error {
|
||||
for _, op := range ops {
|
||||
if err := f.TreeApply(ctx, cnr, treeID, op, true); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (f *memoryForest) Init() error {
|
||||
return nil
|
||||
}
|
||||
|
|
|
@ -21,6 +21,8 @@ type Forest interface {
|
|||
// TreeApply applies replicated operation from another node.
|
||||
// If background is true, TreeApply will first check whether an operation exists.
|
||||
TreeApply(ctx context.Context, cnr cidSDK.ID, treeID string, m *Move, backgroundSync bool) error
|
||||
// TreeApplyBatch applies replicated operations from another node.
|
||||
TreeApplyBatch(ctx context.Context, cnr cidSDK.ID, treeID string, m []*Move) error
|
||||
// TreeGetByPath returns all nodes corresponding to the path.
|
||||
// The path is constructed by descending from the root using the values of the
|
||||
// AttributeFilename in meta.
|
||||
|
|
|
@ -106,6 +106,33 @@ func (s *Shard) TreeApply(ctx context.Context, cnr cidSDK.ID, treeID string, m *
|
|||
return s.pilorama.TreeApply(ctx, cnr, treeID, m, backgroundSync)
|
||||
}
|
||||
|
||||
// TreeApply implements the pilorama.Forest interface.
|
||||
func (s *Shard) TreeApplyBatch(ctx context.Context, cnr cidSDK.ID, treeID string, m []*pilorama.Move) error {
|
||||
ctx, span := tracing.StartSpanFromContext(ctx, "Shard.TreeApplyBatch",
|
||||
trace.WithAttributes(
|
||||
attribute.String("shard_id", s.ID().String()),
|
||||
attribute.String("container_id", cnr.EncodeToString()),
|
||||
attribute.String("tree_id", treeID),
|
||||
),
|
||||
)
|
||||
defer span.End()
|
||||
|
||||
if s.pilorama == nil {
|
||||
return ErrPiloramaDisabled
|
||||
}
|
||||
|
||||
s.m.RLock()
|
||||
defer s.m.RUnlock()
|
||||
|
||||
if s.info.Mode.ReadOnly() {
|
||||
return ErrReadOnlyMode
|
||||
}
|
||||
if s.info.Mode.NoMetabase() {
|
||||
return ErrDegradedMode
|
||||
}
|
||||
return s.pilorama.TreeApplyBatch(ctx, cnr, treeID, m)
|
||||
}
|
||||
|
||||
// TreeGetByPath implements the pilorama.Forest interface.
|
||||
func (s *Shard) TreeGetByPath(ctx context.Context, cid cidSDK.ID, treeID string, attr string, path []string, latest bool) ([]pilorama.Node, error) {
|
||||
ctx, span := tracing.StartSpanFromContext(ctx, "Shard.TreeGetByPath",
|
||||
|
|
|
@ -177,37 +177,29 @@ func mergeOperationStreams(streams []chan *pilorama.Move, merged chan<- *piloram
|
|||
func (s *Service) applyOperationStream(ctx context.Context, cid cid.ID, treeID string,
|
||||
operationStream <-chan *pilorama.Move,
|
||||
) uint64 {
|
||||
errGroup, _ := errgroup.WithContext(ctx)
|
||||
const workersCount = 1024
|
||||
errGroup.SetLimit(workersCount)
|
||||
|
||||
// We run TreeApply concurrently for the operation batch. Let's consider two operations
|
||||
// in the batch m1 and m2 such that m1.Time < m2.Time. The engine may apply m2 and fail
|
||||
// on m1. That means the service must start sync from m1.Time in the next iteration and
|
||||
// this height is stored in unappliedOperationHeight.
|
||||
var unappliedOperationHeight uint64 = math.MaxUint64
|
||||
var heightMtx sync.Mutex
|
||||
|
||||
var prev *pilorama.Move
|
||||
var batch []*pilorama.Move
|
||||
for m := range operationStream {
|
||||
// skip already applied op
|
||||
if prev != nil && prev.Time == m.Time {
|
||||
continue
|
||||
}
|
||||
prev = m
|
||||
batch = append(batch, m)
|
||||
|
||||
errGroup.Go(func() error {
|
||||
if err := s.forest.TreeApply(ctx, cid, treeID, m, true); err != nil {
|
||||
heightMtx.Lock()
|
||||
unappliedOperationHeight = min(unappliedOperationHeight, m.Time)
|
||||
heightMtx.Unlock()
|
||||
return err
|
||||
if len(batch) == 1000 {
|
||||
if err := s.forest.TreeApplyBatch(ctx, cid, treeID, batch); err != nil {
|
||||
return batch[0].Time
|
||||
}
|
||||
return nil
|
||||
})
|
||||
batch = batch[:0]
|
||||
}
|
||||
_ = errGroup.Wait()
|
||||
return unappliedOperationHeight
|
||||
}
|
||||
if len(batch) > 0 {
|
||||
if err := s.forest.TreeApplyBatch(ctx, cid, treeID, batch); err != nil {
|
||||
return batch[0].Time
|
||||
}
|
||||
}
|
||||
return math.MaxUint64
|
||||
}
|
||||
|
||||
func (s *Service) startStream(ctx context.Context, cid cid.ID, treeID string,
|
||||
|
|
Loading…
Reference in a new issue