diff --git a/pkg/local_object_storage/engine/tree.go b/pkg/local_object_storage/engine/tree.go index 39122628f..18f411c97 100644 --- a/pkg/local_object_storage/engine/tree.go +++ b/pkg/local_object_storage/engine/tree.go @@ -110,6 +110,34 @@ func (e *StorageEngine) TreeApply(ctx context.Context, cnr cidSDK.ID, treeID str return nil } +// TreeApply implements the pilorama.Forest interface. +func (e *StorageEngine) TreeApplyBatch(ctx context.Context, cnr cidSDK.ID, treeID string, m []*pilorama.Move) error { + ctx, span := tracing.StartSpanFromContext(ctx, "StorageEngine.TreeApplyBatch", + trace.WithAttributes( + attribute.String("container_id", cnr.EncodeToString()), + attribute.String("tree_id", treeID), + ), + ) + defer span.End() + + index, lst, err := e.getTreeShard(ctx, cnr, treeID) + if err != nil && !errors.Is(err, pilorama.ErrTreeNotFound) { + return err + } + + err = lst[index].TreeApplyBatch(ctx, cnr, treeID, m) + if err != nil { + if !errors.Is(err, shard.ErrReadOnlyMode) && err != shard.ErrPiloramaDisabled { + e.reportShardError(lst[index], "can't perform `TreeApplyBatch`", err, + zap.Stringer("cid", cnr), + zap.String("tree", treeID), + zap.String("trace_id", tracingPkg.GetTraceID(ctx))) + } + return err + } + return nil +} + // TreeGetByPath implements the pilorama.Forest interface. func (e *StorageEngine) TreeGetByPath(ctx context.Context, cid cidSDK.ID, treeID string, attr string, path []string, latest bool) ([]pilorama.Node, error) { ctx, span := tracing.StartSpanFromContext(ctx, "StorageEngine.TreeGetByPath", diff --git a/pkg/local_object_storage/pilorama/batch.go b/pkg/local_object_storage/pilorama/batch.go index 520c6dfb4..5164f2261 100644 --- a/pkg/local_object_storage/pilorama/batch.go +++ b/pkg/local_object_storage/pilorama/batch.go @@ -2,8 +2,10 @@ package pilorama import ( "encoding/binary" + "fmt" "slices" "sort" + "strconv" "sync" "time" @@ -35,6 +37,10 @@ func (b *batch) trigger() { } func (b *batch) run() { + fmt.Printf("batch.run started %s\n", time.Now().Format(time.RFC3339)) + defer func() { + fmt.Printf("batch.run completed %s\n", time.Now().Format(time.RFC3339)) + }() fullID := bucketName(b.cid, b.treeID) err := b.forest.db.Update(func(tx *bbolt.Tx) error { bLog, bTree, err := b.forest.getTreeBuckets(tx, fullID) @@ -71,12 +77,14 @@ func (b *batch) run() { _, _, _, inTree := b.forest.getState(bTree, stateKey(cKey[:], b.operations[i].Child)) if inTree { slow = true + fmt.Println("node in tree, slow = true") break } key := childrenKey(cKey[:], b.operations[i].Child, 0) k, _ := bTree.Cursor().Seek(key) if len(k) == childrenKeySize && binary.LittleEndian.Uint64(k[1:]) == b.operations[i].Child { + fmt.Println("node is parent, slow = true") slow = true break } @@ -94,6 +102,7 @@ func (b *batch) run() { p := b.operations[i].Parent _, ts, _, inTree := b.forest.getState(bTree, stateKey(cKey[:], p)) if !inTree || b.operations[0].Time < ts { + fmt.Printf("parent in tree: %s, b.operations[0].Time: %d, ts: %d\n", strconv.FormatBool(inTree), b.operations[0].Time, ts) slow = true break } diff --git a/pkg/local_object_storage/pilorama/boltdb.go b/pkg/local_object_storage/pilorama/boltdb.go index e2d69cafa..9e8391b21 100644 --- a/pkg/local_object_storage/pilorama/boltdb.go +++ b/pkg/local_object_storage/pilorama/boltdb.go @@ -559,6 +559,82 @@ func (t *boltForest) TreeApply(ctx context.Context, cnr cidSDK.ID, treeID string return metaerr.Wrap(err) } +func (t *boltForest) TreeApplyBatch(ctx context.Context, cnr cidSDK.ID, treeID string, m []*Move) error { + var ( + startedAt = time.Now() + success = false + ) + defer func() { + t.metrics.AddMethodDuration("TreeApplyBatch", time.Since(startedAt), success) + }() + + _, span := tracing.StartSpanFromContext(ctx, "boltForest.TreeApplyBatch", + trace.WithAttributes( + attribute.String("container_id", cnr.EncodeToString()), + attribute.String("tree_id", treeID), + ), + ) + defer span.End() + + m, err := t.filterSeen(cnr, treeID, m) + if err != nil { + return err + } + if len(m) == 0 { + success = true + return nil + } + + ch := make(chan error) + b := &batch{ + forest: t, + cid: cnr, + treeID: treeID, + results: []chan<- error{ch}, + operations: m, + } + go func() { + b.run() + }() + err = <-ch + success = err == nil + return metaerr.Wrap(err) +} + +func (t *boltForest) filterSeen(cnr cidSDK.ID, treeID string, m []*Move) ([]*Move, error) { + t.modeMtx.RLock() + defer t.modeMtx.RUnlock() + + if t.mode.NoMetabase() { + return nil, ErrDegradedMode + } + + ops := make([]*Move, 0, len(m)) + for _, op := range m { + var seen bool + err := t.db.View(func(tx *bbolt.Tx) error { + treeRoot := tx.Bucket(bucketName(cnr, treeID)) + if treeRoot == nil { + return nil + } + + b := treeRoot.Bucket(logBucket) + + var logKey [8]byte + binary.BigEndian.PutUint64(logKey[:], op.Time) + seen = b.Get(logKey[:]) != nil + return nil + }) + if err != nil { + return nil, metaerr.Wrap(err) + } + if !seen { + ops = append(ops, op) + } + } + return ops, nil +} + // TreeApplyStream should be used with caution: this method locks other write transactions while `source` is not closed. func (t *boltForest) TreeApplyStream(ctx context.Context, cnr cidSDK.ID, treeID string, source <-chan *Move) error { var ( @@ -689,6 +765,7 @@ func (t *boltForest) applyOperation(logBucket, treeBucket *bbolt.Bucket, ms []*M b := bytes.NewReader(nil) r := io.NewBinReaderFromIO(b) + fmt.Printf("batch.run.undo started %s, time %d\n", time.Now().Format(time.RFC3339), ms[0].Time) // 1. Undo up until the desired timestamp is here. for len(key) == 8 && ms[0].Time < binary.BigEndian.Uint64(key) { b.Reset(value) @@ -705,6 +782,7 @@ func (t *boltForest) applyOperation(logBucket, treeBucket *bbolt.Bucket, ms []*M key, value = c.Prev() } + fmt.Printf("batch.run.insert/redo started %s\n", time.Now().Format(time.RFC3339)) for i := range len(ms) { // Loop invariant: key represents the next stored timestamp after ms[i].Time. diff --git a/pkg/local_object_storage/pilorama/forest.go b/pkg/local_object_storage/pilorama/forest.go index 78503bada..a5769af16 100644 --- a/pkg/local_object_storage/pilorama/forest.go +++ b/pkg/local_object_storage/pilorama/forest.go @@ -112,6 +112,15 @@ func (f *memoryForest) TreeApply(_ context.Context, cnr cid.ID, treeID string, o return s.Apply(op) } +func (f *memoryForest) TreeApplyBatch(ctx context.Context, cnr cid.ID, treeID string, ops []*Move) error { + for _, op := range ops { + if err := f.TreeApply(ctx, cnr, treeID, op, true); err != nil { + return err + } + } + return nil +} + func (f *memoryForest) Init() error { return nil } diff --git a/pkg/local_object_storage/pilorama/interface.go b/pkg/local_object_storage/pilorama/interface.go index 61a3849bf..b6ca246f2 100644 --- a/pkg/local_object_storage/pilorama/interface.go +++ b/pkg/local_object_storage/pilorama/interface.go @@ -21,6 +21,8 @@ type Forest interface { // TreeApply applies replicated operation from another node. // If background is true, TreeApply will first check whether an operation exists. TreeApply(ctx context.Context, cnr cidSDK.ID, treeID string, m *Move, backgroundSync bool) error + // TreeApplyBatch applies replicated operations from another node. + TreeApplyBatch(ctx context.Context, cnr cidSDK.ID, treeID string, m []*Move) error // TreeGetByPath returns all nodes corresponding to the path. // The path is constructed by descending from the root using the values of the // AttributeFilename in meta. diff --git a/pkg/local_object_storage/shard/tree.go b/pkg/local_object_storage/shard/tree.go index 26dc8ec1e..3af88aa7e 100644 --- a/pkg/local_object_storage/shard/tree.go +++ b/pkg/local_object_storage/shard/tree.go @@ -106,6 +106,33 @@ func (s *Shard) TreeApply(ctx context.Context, cnr cidSDK.ID, treeID string, m * return s.pilorama.TreeApply(ctx, cnr, treeID, m, backgroundSync) } +// TreeApply implements the pilorama.Forest interface. +func (s *Shard) TreeApplyBatch(ctx context.Context, cnr cidSDK.ID, treeID string, m []*pilorama.Move) error { + ctx, span := tracing.StartSpanFromContext(ctx, "Shard.TreeApplyBatch", + trace.WithAttributes( + attribute.String("shard_id", s.ID().String()), + attribute.String("container_id", cnr.EncodeToString()), + attribute.String("tree_id", treeID), + ), + ) + defer span.End() + + if s.pilorama == nil { + return ErrPiloramaDisabled + } + + s.m.RLock() + defer s.m.RUnlock() + + if s.info.Mode.ReadOnly() { + return ErrReadOnlyMode + } + if s.info.Mode.NoMetabase() { + return ErrDegradedMode + } + return s.pilorama.TreeApplyBatch(ctx, cnr, treeID, m) +} + // TreeGetByPath implements the pilorama.Forest interface. func (s *Shard) TreeGetByPath(ctx context.Context, cid cidSDK.ID, treeID string, attr string, path []string, latest bool) ([]pilorama.Node, error) { ctx, span := tracing.StartSpanFromContext(ctx, "Shard.TreeGetByPath", diff --git a/pkg/services/tree/sync.go b/pkg/services/tree/sync.go index ce1e72104..157de2c1b 100644 --- a/pkg/services/tree/sync.go +++ b/pkg/services/tree/sync.go @@ -177,37 +177,29 @@ func mergeOperationStreams(streams []chan *pilorama.Move, merged chan<- *piloram func (s *Service) applyOperationStream(ctx context.Context, cid cid.ID, treeID string, operationStream <-chan *pilorama.Move, ) uint64 { - errGroup, _ := errgroup.WithContext(ctx) - const workersCount = 1024 - errGroup.SetLimit(workersCount) - - // We run TreeApply concurrently for the operation batch. Let's consider two operations - // in the batch m1 and m2 such that m1.Time < m2.Time. The engine may apply m2 and fail - // on m1. That means the service must start sync from m1.Time in the next iteration and - // this height is stored in unappliedOperationHeight. - var unappliedOperationHeight uint64 = math.MaxUint64 - var heightMtx sync.Mutex - var prev *pilorama.Move + var batch []*pilorama.Move for m := range operationStream { // skip already applied op if prev != nil && prev.Time == m.Time { continue } prev = m + batch = append(batch, m) - errGroup.Go(func() error { - if err := s.forest.TreeApply(ctx, cid, treeID, m, true); err != nil { - heightMtx.Lock() - unappliedOperationHeight = min(unappliedOperationHeight, m.Time) - heightMtx.Unlock() - return err + if len(batch) == 1000 { + if err := s.forest.TreeApplyBatch(ctx, cid, treeID, batch); err != nil { + return batch[0].Time } - return nil - }) + batch = batch[:0] + } } - _ = errGroup.Wait() - return unappliedOperationHeight + if len(batch) > 0 { + if err := s.forest.TreeApplyBatch(ctx, cid, treeID, batch); err != nil { + return batch[0].Time + } + } + return math.MaxUint64 } func (s *Service) startStream(ctx context.Context, cid cid.ID, treeID string, @@ -384,7 +376,7 @@ func (s *Service) syncLoop(ctx context.Context) { return case <-s.syncChan: ctx, span := tracing.StartSpanFromContext(ctx, "TreeService.sync") - s.log.Debug(logs.TreeSyncingTrees) + s.log.Info(logs.TreeSyncingTrees) start := time.Now() @@ -402,7 +394,7 @@ func (s *Service) syncLoop(ctx context.Context) { s.removeContainers(ctx, newMap) - s.log.Debug(logs.TreeTreesHaveBeenSynchronized) + s.log.Info(logs.TreeTreesHaveBeenSynchronized) s.metrics.AddSyncDuration(time.Since(start), true) span.End()