diff --git a/pkg/local_object_storage/engine/tree.go b/pkg/local_object_storage/engine/tree.go index 39122628f..18f411c97 100644 --- a/pkg/local_object_storage/engine/tree.go +++ b/pkg/local_object_storage/engine/tree.go @@ -110,6 +110,34 @@ func (e *StorageEngine) TreeApply(ctx context.Context, cnr cidSDK.ID, treeID str return nil } +// TreeApply implements the pilorama.Forest interface. +func (e *StorageEngine) TreeApplyBatch(ctx context.Context, cnr cidSDK.ID, treeID string, m []*pilorama.Move) error { + ctx, span := tracing.StartSpanFromContext(ctx, "StorageEngine.TreeApplyBatch", + trace.WithAttributes( + attribute.String("container_id", cnr.EncodeToString()), + attribute.String("tree_id", treeID), + ), + ) + defer span.End() + + index, lst, err := e.getTreeShard(ctx, cnr, treeID) + if err != nil && !errors.Is(err, pilorama.ErrTreeNotFound) { + return err + } + + err = lst[index].TreeApplyBatch(ctx, cnr, treeID, m) + if err != nil { + if !errors.Is(err, shard.ErrReadOnlyMode) && err != shard.ErrPiloramaDisabled { + e.reportShardError(lst[index], "can't perform `TreeApplyBatch`", err, + zap.Stringer("cid", cnr), + zap.String("tree", treeID), + zap.String("trace_id", tracingPkg.GetTraceID(ctx))) + } + return err + } + return nil +} + // TreeGetByPath implements the pilorama.Forest interface. func (e *StorageEngine) TreeGetByPath(ctx context.Context, cid cidSDK.ID, treeID string, attr string, path []string, latest bool) ([]pilorama.Node, error) { ctx, span := tracing.StartSpanFromContext(ctx, "StorageEngine.TreeGetByPath", diff --git a/pkg/local_object_storage/pilorama/batch.go b/pkg/local_object_storage/pilorama/batch.go index 520c6dfb4..d9f70d5f3 100644 --- a/pkg/local_object_storage/pilorama/batch.go +++ b/pkg/local_object_storage/pilorama/batch.go @@ -2,8 +2,10 @@ package pilorama import ( "encoding/binary" + "fmt" "slices" "sort" + "strconv" "sync" "time" @@ -71,12 +73,14 @@ func (b *batch) run() { _, _, _, inTree := b.forest.getState(bTree, stateKey(cKey[:], b.operations[i].Child)) if inTree { slow = true + fmt.Println("node in tree, slow = true") break } key := childrenKey(cKey[:], b.operations[i].Child, 0) k, _ := bTree.Cursor().Seek(key) if len(k) == childrenKeySize && binary.LittleEndian.Uint64(k[1:]) == b.operations[i].Child { + fmt.Println("node is parent, slow = true") slow = true break } @@ -94,6 +98,7 @@ func (b *batch) run() { p := b.operations[i].Parent _, ts, _, inTree := b.forest.getState(bTree, stateKey(cKey[:], p)) if !inTree || b.operations[0].Time < ts { + fmt.Printf("parent in tree: %s, b.operations[0].Time: %d, ts: %d\n", strconv.FormatBool(inTree), b.operations[0].Time, ts) slow = true break } diff --git a/pkg/local_object_storage/pilorama/boltdb.go b/pkg/local_object_storage/pilorama/boltdb.go index e2d69cafa..8fabbe0ed 100644 --- a/pkg/local_object_storage/pilorama/boltdb.go +++ b/pkg/local_object_storage/pilorama/boltdb.go @@ -559,6 +559,42 @@ func (t *boltForest) TreeApply(ctx context.Context, cnr cidSDK.ID, treeID string return metaerr.Wrap(err) } +func (t *boltForest) TreeApplyBatch(ctx context.Context, cnr cidSDK.ID, treeID string, m []*Move) error { + var ( + startedAt = time.Now() + success = false + ) + defer func() { + t.metrics.AddMethodDuration("TreeApplyBatch", time.Since(startedAt), success) + }() + + _, span := tracing.StartSpanFromContext(ctx, "boltForest.TreeApplyBatch", + trace.WithAttributes( + attribute.String("container_id", cnr.EncodeToString()), + attribute.String("tree_id", treeID), + ), + ) + defer span.End() + + if len(m) == 0 { + return nil + } + + ch := make(chan error) + b := &batch{ + forest: t, + cid: cnr, + treeID: treeID, + results: []chan<- error{ch}, + operations: m, + } + go func() { + b.run() + }() + err := <-ch + return metaerr.Wrap(err) +} + // TreeApplyStream should be used with caution: this method locks other write transactions while `source` is not closed. func (t *boltForest) TreeApplyStream(ctx context.Context, cnr cidSDK.ID, treeID string, source <-chan *Move) error { var ( @@ -692,6 +728,7 @@ func (t *boltForest) applyOperation(logBucket, treeBucket *bbolt.Bucket, ms []*M // 1. Undo up until the desired timestamp is here. for len(key) == 8 && ms[0].Time < binary.BigEndian.Uint64(key) { b.Reset(value) + fmt.Printf("applyOperation: time %d\n", binary.BigEndian.Uint64(key)) tmp.Child = r.ReadU64LE() tmp.Parent = r.ReadU64LE() diff --git a/pkg/local_object_storage/pilorama/forest.go b/pkg/local_object_storage/pilorama/forest.go index 78503bada..a5769af16 100644 --- a/pkg/local_object_storage/pilorama/forest.go +++ b/pkg/local_object_storage/pilorama/forest.go @@ -112,6 +112,15 @@ func (f *memoryForest) TreeApply(_ context.Context, cnr cid.ID, treeID string, o return s.Apply(op) } +func (f *memoryForest) TreeApplyBatch(ctx context.Context, cnr cid.ID, treeID string, ops []*Move) error { + for _, op := range ops { + if err := f.TreeApply(ctx, cnr, treeID, op, true); err != nil { + return err + } + } + return nil +} + func (f *memoryForest) Init() error { return nil } diff --git a/pkg/local_object_storage/pilorama/interface.go b/pkg/local_object_storage/pilorama/interface.go index 61a3849bf..b6ca246f2 100644 --- a/pkg/local_object_storage/pilorama/interface.go +++ b/pkg/local_object_storage/pilorama/interface.go @@ -21,6 +21,8 @@ type Forest interface { // TreeApply applies replicated operation from another node. // If background is true, TreeApply will first check whether an operation exists. TreeApply(ctx context.Context, cnr cidSDK.ID, treeID string, m *Move, backgroundSync bool) error + // TreeApplyBatch applies replicated operations from another node. + TreeApplyBatch(ctx context.Context, cnr cidSDK.ID, treeID string, m []*Move) error // TreeGetByPath returns all nodes corresponding to the path. // The path is constructed by descending from the root using the values of the // AttributeFilename in meta. diff --git a/pkg/local_object_storage/shard/tree.go b/pkg/local_object_storage/shard/tree.go index 26dc8ec1e..3af88aa7e 100644 --- a/pkg/local_object_storage/shard/tree.go +++ b/pkg/local_object_storage/shard/tree.go @@ -106,6 +106,33 @@ func (s *Shard) TreeApply(ctx context.Context, cnr cidSDK.ID, treeID string, m * return s.pilorama.TreeApply(ctx, cnr, treeID, m, backgroundSync) } +// TreeApply implements the pilorama.Forest interface. +func (s *Shard) TreeApplyBatch(ctx context.Context, cnr cidSDK.ID, treeID string, m []*pilorama.Move) error { + ctx, span := tracing.StartSpanFromContext(ctx, "Shard.TreeApplyBatch", + trace.WithAttributes( + attribute.String("shard_id", s.ID().String()), + attribute.String("container_id", cnr.EncodeToString()), + attribute.String("tree_id", treeID), + ), + ) + defer span.End() + + if s.pilorama == nil { + return ErrPiloramaDisabled + } + + s.m.RLock() + defer s.m.RUnlock() + + if s.info.Mode.ReadOnly() { + return ErrReadOnlyMode + } + if s.info.Mode.NoMetabase() { + return ErrDegradedMode + } + return s.pilorama.TreeApplyBatch(ctx, cnr, treeID, m) +} + // TreeGetByPath implements the pilorama.Forest interface. func (s *Shard) TreeGetByPath(ctx context.Context, cid cidSDK.ID, treeID string, attr string, path []string, latest bool) ([]pilorama.Node, error) { ctx, span := tracing.StartSpanFromContext(ctx, "Shard.TreeGetByPath", diff --git a/pkg/services/tree/sync.go b/pkg/services/tree/sync.go index ce1e72104..5a83dd161 100644 --- a/pkg/services/tree/sync.go +++ b/pkg/services/tree/sync.go @@ -177,37 +177,29 @@ func mergeOperationStreams(streams []chan *pilorama.Move, merged chan<- *piloram func (s *Service) applyOperationStream(ctx context.Context, cid cid.ID, treeID string, operationStream <-chan *pilorama.Move, ) uint64 { - errGroup, _ := errgroup.WithContext(ctx) - const workersCount = 1024 - errGroup.SetLimit(workersCount) - - // We run TreeApply concurrently for the operation batch. Let's consider two operations - // in the batch m1 and m2 such that m1.Time < m2.Time. The engine may apply m2 and fail - // on m1. That means the service must start sync from m1.Time in the next iteration and - // this height is stored in unappliedOperationHeight. - var unappliedOperationHeight uint64 = math.MaxUint64 - var heightMtx sync.Mutex - var prev *pilorama.Move + var batch []*pilorama.Move for m := range operationStream { // skip already applied op if prev != nil && prev.Time == m.Time { continue } prev = m + batch = append(batch, m) - errGroup.Go(func() error { - if err := s.forest.TreeApply(ctx, cid, treeID, m, true); err != nil { - heightMtx.Lock() - unappliedOperationHeight = min(unappliedOperationHeight, m.Time) - heightMtx.Unlock() - return err + if len(batch) == 1000 { + if err := s.forest.TreeApplyBatch(ctx, cid, treeID, batch); err != nil { + return batch[0].Time } - return nil - }) + batch = batch[:0] + } } - _ = errGroup.Wait() - return unappliedOperationHeight + if len(batch) > 0 { + if err := s.forest.TreeApplyBatch(ctx, cid, treeID, batch); err != nil { + return batch[0].Time + } + } + return math.MaxUint64 } func (s *Service) startStream(ctx context.Context, cid cid.ID, treeID string,