diff --git a/cmd/frostfs-node/config/tree/config.go b/cmd/frostfs-node/config/tree/config.go
index 8a8919999..da877791e 100644
--- a/cmd/frostfs-node/config/tree/config.go
+++ b/cmd/frostfs-node/config/tree/config.go
@@ -10,6 +10,8 @@ import (
 
 const (
 	subsection = "tree"
+
+	SyncBatchSizeDefault = 1000
 )
 
 // TreeConfig is a wrapper over "tree" config section
@@ -74,6 +76,17 @@ func (c TreeConfig) SyncInterval() time.Duration {
 	return config.DurationSafe(c.cfg, "sync_interval")
 }
 
+// SyncBatchSize returns the value of the "sync_batch_size"
+// config parameter from the "tree" section.
+//
+// Returns `SyncBatchSizeDefault` if the value is not specified or is non-positive.
+func (c TreeConfig) SyncBatchSize() int {
+	if v := config.IntSafe(c.cfg, "sync_batch_size"); v > 0 {
+		return int(v)
+	}
+	return SyncBatchSizeDefault
+}
+
 // AuthorizedKeys parses and returns an array of "authorized_keys" config
 // parameter from "tree" section.
 //
diff --git a/cmd/frostfs-node/config/tree/config_test.go b/cmd/frostfs-node/config/tree/config_test.go
index 285ea0725..6628b8878 100644
--- a/cmd/frostfs-node/config/tree/config_test.go
+++ b/cmd/frostfs-node/config/tree/config_test.go
@@ -44,6 +44,7 @@ func TestTreeSection(t *testing.T) {
 		require.Equal(t, 32, treeSec.ReplicationWorkerCount())
 		require.Equal(t, 5*time.Second, treeSec.ReplicationTimeout())
 		require.Equal(t, time.Hour, treeSec.SyncInterval())
+		require.Equal(t, 2000, treeSec.SyncBatchSize())
 		require.Equal(t, expectedKeys, treeSec.AuthorizedKeys())
 	}
 
diff --git a/cmd/frostfs-node/tree.go b/cmd/frostfs-node/tree.go
index f188e2fbc..a92979daf 100644
--- a/cmd/frostfs-node/tree.go
+++ b/cmd/frostfs-node/tree.go
@@ -62,6 +62,7 @@ func initTreeService(c *cfg) {
 		tree.WithReplicationTimeout(treeConfig.ReplicationTimeout()),
 		tree.WithReplicationChannelCapacity(treeConfig.ReplicationChannelCapacity()),
 		tree.WithReplicationWorkerCount(treeConfig.ReplicationWorkerCount()),
+		tree.WithSyncBatchSize(treeConfig.SyncBatchSize()),
 		tree.WithAuthorizedKeys(treeConfig.AuthorizedKeys()),
 		tree.WithMetrics(c.metricsCollector.TreeService()),
 		tree.WithAPELocalOverrideStorage(c.cfgObject.cfgAccessPolicyEngine.accessPolicyEngine.LocalStorage()),
diff --git a/config/example/node.env b/config/example/node.env
index 3979eb18f..e21328b60 100644
--- a/config/example/node.env
+++ b/config/example/node.env
@@ -31,6 +31,7 @@ FROSTFS_TREE_REPLICATION_CHANNEL_CAPACITY=32
 FROSTFS_TREE_REPLICATION_WORKER_COUNT=32
 FROSTFS_TREE_REPLICATION_TIMEOUT=5s
 FROSTFS_TREE_SYNC_INTERVAL=1h
+FROSTFS_TREE_SYNC_BATCH_SIZE=2000
 FROSTFS_TREE_AUTHORIZED_KEYS="0397d207ea77909f7d66fa6f36d08daae22ace672be7ea4f53513484dde8a142a0 02053819235c20d784132deba10bb3061629e3a5c819a039ef091841d9d35dad56"
 
 # gRPC section
diff --git a/config/example/node.json b/config/example/node.json
index 1ea28de6c..ac7125949 100644
--- a/config/example/node.json
+++ b/config/example/node.json
@@ -69,6 +69,7 @@
     "replication_worker_count": 32,
     "replication_timeout": "5s",
     "sync_interval": "1h",
+    "sync_batch_size": 2000,
     "authorized_keys": [
       "0397d207ea77909f7d66fa6f36d08daae22ace672be7ea4f53513484dde8a142a0",
       "02053819235c20d784132deba10bb3061629e3a5c819a039ef091841d9d35dad56"
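The `SyncBatchSize` getter above treats any missing, zero, or negative `sync_batch_size` as unset and falls back to `SyncBatchSizeDefault`. A minimal, self-contained sketch of that fallback logic, with the `config.IntSafe` lookup replaced by a plain `int64` argument purely for illustration:

```go
package main

import "fmt"

// syncBatchSizeDefault mirrors SyncBatchSizeDefault from the tree config section.
const syncBatchSizeDefault = 1000

// syncBatchSize models TreeConfig.SyncBatchSize: a positive configured value
// wins; anything else (unset, zero, negative) falls back to the default.
func syncBatchSize(raw int64) int {
	if raw > 0 {
		return int(raw)
	}
	return syncBatchSizeDefault
}

func main() {
	fmt.Println(syncBatchSize(2000)) // configured, as in the examples: 2000
	fmt.Println(syncBatchSize(0))    // unset: 1000
	fmt.Println(syncBatchSize(-5))   // invalid: 1000
}
```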
diff --git a/config/example/node.yaml b/config/example/node.yaml
index 4a418dfcb..d547f5cee 100644
--- a/config/example/node.yaml
+++ b/config/example/node.yaml
@@ -59,6 +59,7 @@ tree:
   replication_channel_capacity: 32
   replication_timeout: 5s
   sync_interval: 1h
+  sync_batch_size: 2000
   authorized_keys: # list of hex-encoded public keys that have rights to use the Tree Service with frostfs-cli
     - 0397d207ea77909f7d66fa6f36d08daae22ace672be7ea4f53513484dde8a142a0
     - 02053819235c20d784132deba10bb3061629e3a5c819a039ef091841d9d35dad56
diff --git a/pkg/local_object_storage/engine/tree.go b/pkg/local_object_storage/engine/tree.go
index 39122628f..6bb5e3a41 100644
--- a/pkg/local_object_storage/engine/tree.go
+++ b/pkg/local_object_storage/engine/tree.go
@@ -110,6 +110,34 @@ func (e *StorageEngine) TreeApply(ctx context.Context, cnr cidSDK.ID, treeID str
 	return nil
 }
 
+// TreeApplyBatch implements the pilorama.Forest interface.
+func (e *StorageEngine) TreeApplyBatch(ctx context.Context, cnr cidSDK.ID, treeID string, m []*pilorama.Move) error {
+	ctx, span := tracing.StartSpanFromContext(ctx, "StorageEngine.TreeApplyBatch",
+		trace.WithAttributes(
+			attribute.String("container_id", cnr.EncodeToString()),
+			attribute.String("tree_id", treeID),
+		),
+	)
+	defer span.End()
+
+	index, lst, err := e.getTreeShard(ctx, cnr, treeID)
+	if err != nil && !errors.Is(err, pilorama.ErrTreeNotFound) {
+		return err
+	}
+
+	err = lst[index].TreeApplyBatch(ctx, cnr, treeID, m)
+	if err != nil {
+		if !errors.Is(err, shard.ErrReadOnlyMode) && err != shard.ErrPiloramaDisabled {
+			e.reportShardError(lst[index], "can't perform `TreeApplyBatch`", err,
+				zap.Stringer("cid", cnr),
+				zap.String("tree", treeID),
+				zap.String("trace_id", tracingPkg.GetTraceID(ctx)))
+		}
+		return err
+	}
+	return nil
+}
+
 // TreeGetByPath implements the pilorama.Forest interface.
 func (e *StorageEngine) TreeGetByPath(ctx context.Context, cid cidSDK.ID, treeID string, attr string, path []string, latest bool) ([]pilorama.Node, error) {
 	ctx, span := tracing.StartSpanFromContext(ctx, "StorageEngine.TreeGetByPath",
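`StorageEngine.TreeApplyBatch` above forwards the whole batch to a single shard and deliberately tolerates `pilorama.ErrTreeNotFound`, so the batch can create the tree on the shard that `getTreeShard` selects. A loose, self-contained model of that dispatch (the real shard selection uses hashing over sorted shards, which this map-lookup stand-in does not reproduce):

```go
package main

import (
	"errors"
	"fmt"
)

var errTreeNotFound = errors.New("tree not found")

// shard is a stand-in for the real shard type; a tree is just a list of
// operation timestamps here.
type shard struct{ trees map[string][]uint64 }

// getTreeShard loosely models StorageEngine.getTreeShard: prefer a shard that
// already holds the tree, otherwise return the first candidate together with
// errTreeNotFound so the caller can still create the tree there.
func getTreeShard(shards []*shard, treeID string) (int, []*shard, error) {
	for i, s := range shards {
		if _, ok := s.trees[treeID]; ok {
			return i, shards, nil
		}
	}
	return 0, shards, errTreeNotFound
}

func main() {
	shards := []*shard{
		{trees: map[string][]uint64{}},
		{trees: map[string][]uint64{}},
	}
	idx, lst, err := getTreeShard(shards, "system")
	if err != nil && !errors.Is(err, errTreeNotFound) {
		panic(err)
	}
	// The whole batch lands on the single selected shard, as in TreeApplyBatch.
	lst[idx].trees["system"] = append(lst[idx].trees["system"], 1, 2, 3)
	fmt.Println(idx, len(lst[idx].trees["system"])) // 0 3
}
```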
diff --git a/pkg/local_object_storage/pilorama/boltdb.go b/pkg/local_object_storage/pilorama/boltdb.go
index 7bce1f340..a778434dd 100644
--- a/pkg/local_object_storage/pilorama/boltdb.go
+++ b/pkg/local_object_storage/pilorama/boltdb.go
@@ -558,6 +558,80 @@ func (t *boltForest) TreeApply(ctx context.Context, cnr cidSDK.ID, treeID string
 	return metaerr.Wrap(err)
 }
 
+func (t *boltForest) TreeApplyBatch(ctx context.Context, cnr cidSDK.ID, treeID string, m []*Move) error {
+	var (
+		startedAt = time.Now()
+		success   = false
+	)
+	defer func() {
+		t.metrics.AddMethodDuration("TreeApplyBatch", time.Since(startedAt), success)
+	}()
+
+	_, span := tracing.StartSpanFromContext(ctx, "boltForest.TreeApplyBatch",
+		trace.WithAttributes(
+			attribute.String("container_id", cnr.EncodeToString()),
+			attribute.String("tree_id", treeID),
+		),
+	)
+	defer span.End()
+
+	m, err := t.filterSeen(cnr, treeID, m)
+	if err != nil {
+		return err
+	}
+	if len(m) == 0 {
+		success = true
+		return nil
+	}
+
+	ch := make(chan error)
+	b := &batch{
+		forest:     t,
+		cid:        cnr,
+		treeID:     treeID,
+		results:    []chan<- error{ch},
+		operations: m,
+	}
+	go func() {
+		b.run()
+	}()
+	err = <-ch
+	success = err == nil
+	return metaerr.Wrap(err)
+}
+
+func (t *boltForest) filterSeen(cnr cidSDK.ID, treeID string, m []*Move) ([]*Move, error) {
+	t.modeMtx.RLock()
+	defer t.modeMtx.RUnlock()
+
+	if t.mode.NoMetabase() {
+		return nil, ErrDegradedMode
+	}
+
+	ops := make([]*Move, 0, len(m))
+	err := t.db.View(func(tx *bbolt.Tx) error {
+		treeRoot := tx.Bucket(bucketName(cnr, treeID))
+		if treeRoot == nil {
+			ops = m
+			return nil
+		}
+		b := treeRoot.Bucket(logBucket)
+		for _, op := range m {
+			var logKey [8]byte
+			binary.BigEndian.PutUint64(logKey[:], op.Time)
+			seen := b.Get(logKey[:]) != nil
+			if !seen {
+				ops = append(ops, op)
+			}
+		}
+		return nil
+	})
+	if err != nil {
+		return nil, metaerr.Wrap(err)
+	}
+	return ops, nil
+}
+
 // TreeApplyStream should be used with caution: this method locks other write transactions while `source` is not closed.
 func (t *boltForest) TreeApplyStream(ctx context.Context, cnr cidSDK.ID, treeID string, source <-chan *Move) error {
 	var (
diff --git a/pkg/local_object_storage/pilorama/forest.go b/pkg/local_object_storage/pilorama/forest.go
index bb5c22e51..374943745 100644
--- a/pkg/local_object_storage/pilorama/forest.go
+++ b/pkg/local_object_storage/pilorama/forest.go
@@ -111,6 +111,15 @@ func (f *memoryForest) TreeApply(_ context.Context, cnr cid.ID, treeID string, o
 	return s.Apply(op)
 }
 
+func (f *memoryForest) TreeApplyBatch(ctx context.Context, cnr cid.ID, treeID string, ops []*Move) error {
+	for _, op := range ops {
+		if err := f.TreeApply(ctx, cnr, treeID, op, true); err != nil {
+			return err
+		}
+	}
+	return nil
+}
+
 func (f *memoryForest) Init() error {
 	return nil
 }
diff --git a/pkg/local_object_storage/pilorama/interface.go b/pkg/local_object_storage/pilorama/interface.go
index 61a3849bf..b6ca246f2 100644
--- a/pkg/local_object_storage/pilorama/interface.go
+++ b/pkg/local_object_storage/pilorama/interface.go
@@ -21,6 +21,8 @@ type Forest interface {
 	// TreeApply applies replicated operation from another node.
 	// If background is true, TreeApply will first check whether an operation exists.
 	TreeApply(ctx context.Context, cnr cidSDK.ID, treeID string, m *Move, backgroundSync bool) error
+	// TreeApplyBatch applies replicated operations from another node.
+	TreeApplyBatch(ctx context.Context, cnr cidSDK.ID, treeID string, m []*Move) error
 	// TreeGetByPath returns all nodes corresponding to the path.
 	// The path is constructed by descending from the root using the values of the
 	// AttributeFilename in meta.
diff --git a/pkg/local_object_storage/shard/tree.go b/pkg/local_object_storage/shard/tree.go
index 26dc8ec1e..01a014cec 100644
--- a/pkg/local_object_storage/shard/tree.go
+++ b/pkg/local_object_storage/shard/tree.go
@@ -106,6 +106,33 @@ func (s *Shard) TreeApply(ctx context.Context, cnr cidSDK.ID, treeID string, m *
 	return s.pilorama.TreeApply(ctx, cnr, treeID, m, backgroundSync)
 }
 
+// TreeApplyBatch implements the pilorama.Forest interface.
+func (s *Shard) TreeApplyBatch(ctx context.Context, cnr cidSDK.ID, treeID string, m []*pilorama.Move) error {
+	ctx, span := tracing.StartSpanFromContext(ctx, "Shard.TreeApplyBatch",
+		trace.WithAttributes(
+			attribute.String("shard_id", s.ID().String()),
+			attribute.String("container_id", cnr.EncodeToString()),
+			attribute.String("tree_id", treeID),
+		),
+	)
+	defer span.End()
+
+	if s.pilorama == nil {
+		return ErrPiloramaDisabled
+	}
+
+	s.m.RLock()
+	defer s.m.RUnlock()
+
+	if s.info.Mode.ReadOnly() {
+		return ErrReadOnlyMode
+	}
+	if s.info.Mode.NoMetabase() {
+		return ErrDegradedMode
+	}
+	return s.pilorama.TreeApplyBatch(ctx, cnr, treeID, m)
+}
+
 // TreeGetByPath implements the pilorama.Forest interface.
 func (s *Shard) TreeGetByPath(ctx context.Context, cid cidSDK.ID, treeID string, attr string, path []string, latest bool) ([]pilorama.Node, error) {
 	ctx, span := tracing.StartSpanFromContext(ctx, "Shard.TreeGetByPath",
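The core trick in `filterSeen` above is that the pilorama log bucket is keyed by the operation's logical time encoded as a big-endian `uint64`, so a single point lookup per operation answers "already applied?" without replaying anything. A sketch of the same filtering with the bbolt bucket modeled as an in-memory set:

```go
package main

import (
	"encoding/binary"
	"fmt"
)

// move is a stand-in for pilorama.Move; only the logical timestamp matters here.
type move struct{ Time uint64 }

// filterSeen mirrors boltForest.filterSeen with the log bucket replaced by a
// set: an operation whose big-endian-encoded Time already exists as a key has
// been applied before and is dropped from the batch.
func filterSeen(applied map[[8]byte]struct{}, ops []*move) []*move {
	out := make([]*move, 0, len(ops))
	for _, op := range ops {
		var logKey [8]byte
		binary.BigEndian.PutUint64(logKey[:], op.Time)
		if _, seen := applied[logKey]; !seen {
			out = append(out, op)
		}
	}
	return out
}

func main() {
	applied := make(map[[8]byte]struct{})
	var k [8]byte
	binary.BigEndian.PutUint64(k[:], 2)
	applied[k] = struct{}{} // the operation with Time == 2 was applied earlier

	ops := []*move{{Time: 1}, {Time: 2}, {Time: 3}}
	fmt.Println(len(filterSeen(applied, ops))) // 2: the duplicate is filtered out
}
```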
diff --git a/pkg/services/tree/options.go b/pkg/services/tree/options.go
index 1633ae557..a3f488009 100644
--- a/pkg/services/tree/options.go
+++ b/pkg/services/tree/options.go
@@ -41,6 +41,7 @@ type cfg struct {
 	replicatorTimeout  time.Duration
 	containerCacheSize int
 	authorizedKeys     [][]byte
+	syncBatchSize      int
 
 	localOverrideStorage policyengine.LocalOverrideStorage
 	morphChainStorage    policyengine.MorphRuleChainStorageReader
@@ -113,6 +114,12 @@ func WithReplicationWorkerCount(n int) Option {
 	}
 }
 
+func WithSyncBatchSize(n int) Option {
+	return func(c *cfg) {
+		c.syncBatchSize = n
+	}
+}
+
 func WithContainerCacheSize(n int) Option {
 	return func(c *cfg) {
 		if n > 0 {
diff --git a/pkg/services/tree/replicator.go b/pkg/services/tree/replicator.go
index 95c8f8013..84e376cf7 100644
--- a/pkg/services/tree/replicator.go
+++ b/pkg/services/tree/replicator.go
@@ -40,6 +40,7 @@ const (
 	defaultReplicatorCapacity    = 64
 	defaultReplicatorWorkerCount = 64
 	defaultReplicatorSendTimeout = time.Second * 5
+	defaultSyncBatchSize         = 1000
 )
 
 func (s *Service) localReplicationWorker(ctx context.Context) {
diff --git a/pkg/services/tree/service.go b/pkg/services/tree/service.go
index 8097d545c..b63338d25 100644
--- a/pkg/services/tree/service.go
+++ b/pkg/services/tree/service.go
@@ -55,6 +55,7 @@ func New(opts ...Option) *Service {
 	s.replicatorChannelCapacity = defaultReplicatorCapacity
 	s.replicatorWorkerCount = defaultReplicatorWorkerCount
 	s.replicatorTimeout = defaultReplicatorSendTimeout
+	s.syncBatchSize = defaultSyncBatchSize
 	s.metrics = defaultMetricsRegister{}
 
 	for i := range opts {
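`WithSyncBatchSize` follows the service's existing functional-options pattern: `New` seeds `defaultSyncBatchSize` first and then applies the options, so callers that omit the option keep the default. A stripped-down sketch of that precedence (types reduced to the bare shape, not the real `Service`):

```go
package main

import "fmt"

// cfg and Option mirror the shapes in pkg/services/tree/options.go.
type cfg struct{ syncBatchSize int }

type Option func(*cfg)

func WithSyncBatchSize(n int) Option {
	return func(c *cfg) { c.syncBatchSize = n }
}

// New seeds the default before applying options, as Service.New does with
// defaultSyncBatchSize = 1000.
func New(opts ...Option) *cfg {
	c := &cfg{syncBatchSize: 1000}
	for _, o := range opts {
		o(c)
	}
	return c
}

func main() {
	fmt.Println(New().syncBatchSize)                        // 1000
	fmt.Println(New(WithSyncBatchSize(2000)).syncBatchSize) // 2000
}
```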
diff --git a/pkg/services/tree/sync.go b/pkg/services/tree/sync.go
index ce1e72104..b93410616 100644
--- a/pkg/services/tree/sync.go
+++ b/pkg/services/tree/sync.go
@@ -177,37 +177,29 @@ func mergeOperationStreams(streams []chan *pilorama.Move, merged chan<- *piloram
 func (s *Service) applyOperationStream(ctx context.Context, cid cid.ID, treeID string,
 	operationStream <-chan *pilorama.Move,
 ) uint64 {
-	errGroup, _ := errgroup.WithContext(ctx)
-	const workersCount = 1024
-	errGroup.SetLimit(workersCount)
-
-	// We run TreeApply concurrently for the operation batch. Let's consider two operations
-	// in the batch m1 and m2 such that m1.Time < m2.Time. The engine may apply m2 and fail
-	// on m1. That means the service must start sync from m1.Time in the next iteration and
-	// this height is stored in unappliedOperationHeight.
-	var unappliedOperationHeight uint64 = math.MaxUint64
-	var heightMtx sync.Mutex
-
 	var prev *pilorama.Move
+	var batch []*pilorama.Move
 	for m := range operationStream {
 		// skip already applied op
 		if prev != nil && prev.Time == m.Time {
 			continue
 		}
 		prev = m
+		batch = append(batch, m)
 
-		errGroup.Go(func() error {
-			if err := s.forest.TreeApply(ctx, cid, treeID, m, true); err != nil {
-				heightMtx.Lock()
-				unappliedOperationHeight = min(unappliedOperationHeight, m.Time)
-				heightMtx.Unlock()
-				return err
+		if len(batch) == s.syncBatchSize {
+			if err := s.forest.TreeApplyBatch(ctx, cid, treeID, batch); err != nil {
+				return batch[0].Time
 			}
-			return nil
-		})
+			batch = batch[:0]
+		}
 	}
-	_ = errGroup.Wait()
-	return unappliedOperationHeight
+	if len(batch) > 0 {
+		if err := s.forest.TreeApplyBatch(ctx, cid, treeID, batch); err != nil {
+			return batch[0].Time
+		}
+	}
+	return math.MaxUint64
 }
 
 func (s *Service) startStream(ctx context.Context, cid cid.ID, treeID string,
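For reference, the control flow of the rewritten `applyOperationStream`, extracted into a self-contained sketch: drain the merged stream, skip consecutive operations with equal `Time` (already applied), flush every `syncBatchSize` operations, and on failure return the first timestamp of the failed batch so the next sync round restarts from that height; `math.MaxUint64` signals that everything was applied:

```go
package main

import (
	"fmt"
	"math"
)

// move is a stand-in for pilorama.Move.
type move struct{ Time uint64 }

// applyStream mirrors the rewritten applyOperationStream: drain the channel,
// skip consecutive duplicates, flush full batches, and report the first
// timestamp of a failed batch as the height to resume sync from.
func applyStream(stream <-chan *move, batchSize int, apply func([]*move) error) uint64 {
	var prev *move
	var batch []*move
	for m := range stream {
		if prev != nil && prev.Time == m.Time { // already applied
			continue
		}
		prev = m
		batch = append(batch, m)

		if len(batch) == batchSize {
			if err := apply(batch); err != nil {
				return batch[0].Time
			}
			batch = batch[:0]
		}
	}
	if len(batch) > 0 { // flush the final partial batch
		if err := apply(batch); err != nil {
			return batch[0].Time
		}
	}
	return math.MaxUint64 // everything applied
}

func main() {
	ch := make(chan *move, 4)
	for _, t := range []uint64{1, 1, 2, 3} {
		ch <- &move{Time: t}
	}
	close(ch)
	h := applyStream(ch, 2, func([]*move) error { return nil })
	fmt.Println(h == math.MaxUint64) // true: all batches applied
}
```

Compared with the removed errgroup version, ordering within a batch is preserved, so the resume height is simply the first operation of the failed batch instead of a mutex-guarded minimum across concurrent workers.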