From 5daff28e69afcc5a3144a7adfd554b0fd727c4d7 Mon Sep 17 00:00:00 2001
From: Anton Nikiforov
Date: Wed, 18 Sep 2024 12:15:32 +0300
Subject: [PATCH] [#1356] engine: Evacuate objects from shards concurrently

Signed-off-by: Anton Nikiforov
---
 cmd/frostfs-node/config.go                    |  13 ++
 cmd/frostfs-node/config/engine/config.go      |  44 +++++
 config/example/node.env                       |   4 +
 config/example/node.json                      |   4 +
 config/example/node.yaml                      |   4 +
 docs/storage-node-configuration.md            |  16 +-
 pkg/local_object_storage/engine/engine.go     |  38 +++++
 pkg/local_object_storage/engine/evacuate.go   |  77 ++++-----
 .../engine/evacuate_test.go                   |   4 +-
 pkg/local_object_storage/metabase/list.go     | 159 ++++++++++++++++++
 pkg/local_object_storage/shard/list.go        |  41 +++++
 11 files changed, 356 insertions(+), 48 deletions(-)

diff --git a/cmd/frostfs-node/config.go b/cmd/frostfs-node/config.go
index ed3a65c25..e8adc9855 100644
--- a/cmd/frostfs-node/config.go
+++ b/cmd/frostfs-node/config.go
@@ -110,6 +110,11 @@ type applicationConfiguration struct {
 		shardPoolSize uint32
 		shards        []shardCfg
 		lowMem        bool
+
+		evacuationShardWorkerCount     uint32
+		evacuationObjectWorkerCount    uint32
+		evacuationContainerWorkerCount uint32
+		evacuationObjectBatchSize      uint32
 	}
 
 	// if need to run node in compatibility with other versions mode
@@ -228,6 +233,10 @@ func (a *applicationConfiguration) readConfig(c *config.Config) error {
 	a.EngineCfg.errorThreshold = engineconfig.ShardErrorThreshold(c)
 	a.EngineCfg.shardPoolSize = engineconfig.ShardPoolSize(c)
 	a.EngineCfg.lowMem = engineconfig.EngineLowMemoryConsumption(c)
+	a.EngineCfg.evacuationShardWorkerCount = engineconfig.EngineEvacuationShardWorkerCount(c)
+	a.EngineCfg.evacuationObjectWorkerCount = engineconfig.EngineEvacuationObjectWorkerCount(c)
+	a.EngineCfg.evacuationObjectBatchSize = engineconfig.EngineEvacuationObjectBatchSize(c)
+	a.EngineCfg.evacuationContainerWorkerCount = engineconfig.EngineEvacuationContainerWorkerCount(c)
 
 	return engineconfig.IterateShards(c, false, func(sc *shardconfig.Config) error { return a.updateShardConfig(c, sc) })
 }
@@ -829,6 +838,10 @@ func (c *cfg) engineOpts() []engine.Option {
 		engine.WithErrorThreshold(c.EngineCfg.errorThreshold),
 		engine.WithLogger(c.log),
 		engine.WithLowMemoryConsumption(c.EngineCfg.lowMem),
+		engine.WithEvacuationShardWorkerCount(c.EngineCfg.evacuationShardWorkerCount),
+		engine.WithEvacuationObjectWorkerCount(c.EngineCfg.evacuationObjectWorkerCount),
+		engine.WithEvacuationObjectBatchSize(c.EngineCfg.evacuationObjectBatchSize),
+		engine.WithEvacuationContainerWorkerCount(c.EngineCfg.evacuationContainerWorkerCount),
 	)
 
 	if c.metricsCollector != nil {
diff --git a/cmd/frostfs-node/config/engine/config.go b/cmd/frostfs-node/config/engine/config.go
index c944d1c58..318508ea1 100644
--- a/cmd/frostfs-node/config/engine/config.go
+++ b/cmd/frostfs-node/config/engine/config.go
@@ -15,6 +15,18 @@ const (
 	// ShardPoolSizeDefault is a default value of routine pool size per-shard to
 	// process object PUT operations in a storage engine.
 	ShardPoolSizeDefault = 20
+	// EvacuationShardWorkerCountDefault is a default value of the number of
+	// shards evacuated concurrently.
+	EvacuationShardWorkerCountDefault = 5
+	// EvacuationContainerWorkerCountDefault is a default value of the number of
+	// concurrent container evacuation workers per shard.
+	EvacuationContainerWorkerCountDefault = 10
+	// EvacuationObjectWorkerCountDefault is a default value of the number of
+	// concurrent object evacuation workers for each container.
+	EvacuationObjectWorkerCountDefault = 10
+	// EvacuationObjectBatchSizeDefault is a default value of the number of
+	// objects read from the metabase at once.
+	EvacuationObjectBatchSizeDefault = 100
 )
 
 // ErrNoShardConfigured is returned when at least 1 shard is required but none are found.
@@ -88,3 +100,35 @@ func ShardErrorThreshold(c *config.Config) uint32 {
 func EngineLowMemoryConsumption(c *config.Config) bool {
 	return config.BoolSafe(c.Sub(subsection), "low_mem")
 }
+
+// EngineEvacuationShardWorkerCount returns value of "evacuation_shard_worker_count" config parameter from "storage" section.
+func EngineEvacuationShardWorkerCount(c *config.Config) uint32 {
+	if v := config.Uint32Safe(c.Sub(subsection), "evacuation_shard_worker_count"); v > 0 {
+		return v
+	}
+	return EvacuationShardWorkerCountDefault
+}
+
+// EngineEvacuationObjectWorkerCount returns value of "evacuation_object_worker_count" config parameter from "storage" section.
+func EngineEvacuationObjectWorkerCount(c *config.Config) uint32 {
+	if v := config.Uint32Safe(c.Sub(subsection), "evacuation_object_worker_count"); v > 0 {
+		return v
+	}
+	return EvacuationObjectWorkerCountDefault
+}
+
+// EngineEvacuationContainerWorkerCount returns value of "evacuation_container_worker_count" config parameter from "storage" section.
+func EngineEvacuationContainerWorkerCount(c *config.Config) uint32 {
+	if v := config.Uint32Safe(c.Sub(subsection), "evacuation_container_worker_count"); v > 0 {
+		return v
+	}
+	return EvacuationContainerWorkerCountDefault
+}
+
+// EngineEvacuationObjectBatchSize returns value of "evacuation_object_batch_size" config parameter from "storage" section.
+func EngineEvacuationObjectBatchSize(c *config.Config) uint32 {
+	if v := config.Uint32Safe(c.Sub(subsection), "evacuation_object_batch_size"); v > 0 {
+		return v
+	}
+	return EvacuationObjectBatchSizeDefault
+}
diff --git a/config/example/node.env b/config/example/node.env
index 6618a981a..33e1710ed 100644
--- a/config/example/node.env
+++ b/config/example/node.env
@@ -91,6 +91,10 @@ FROSTFS_OBJECT_DELETE_TOMBSTONE_LIFETIME=10
 # Storage engine section
 FROSTFS_STORAGE_SHARD_POOL_SIZE=15
 FROSTFS_STORAGE_SHARD_RO_ERROR_THRESHOLD=100
+FROSTFS_STORAGE_EVACUATION_SHARD_WORKER_COUNT=5
+FROSTFS_STORAGE_EVACUATION_OBJECT_WORKER_COUNT=10
+FROSTFS_STORAGE_EVACUATION_CONTAINER_WORKER_COUNT=10
+FROSTFS_STORAGE_EVACUATION_OBJECT_BATCH_SIZE=100
 ## 0 shard
 ### Flag to refill Metabase from BlobStor
 FROSTFS_STORAGE_SHARD_0_RESYNC_METABASE=false
diff --git a/config/example/node.json b/config/example/node.json
index 0d100ed80..605e39877 100644
--- a/config/example/node.json
+++ b/config/example/node.json
@@ -136,6 +136,10 @@
   "storage": {
     "shard_pool_size": 15,
     "shard_ro_error_threshold": 100,
+    "evacuation_object_batch_size": 100,
+    "evacuation_container_worker_count": 10,
+    "evacuation_object_worker_count": 10,
+    "evacuation_shard_worker_count": 5,
     "shard": {
       "0": {
         "mode": "read-only",
diff --git a/config/example/node.yaml b/config/example/node.yaml
index 86be35ba8..39c2e7c9d 100644
--- a/config/example/node.yaml
+++ b/config/example/node.yaml
@@ -118,6 +118,10 @@ storage:
   # note: shard configuration can be omitted for relay node (see `node.relay`)
   shard_pool_size: 15 # size of per-shard worker pools used for PUT operations
   shard_ro_error_threshold: 100 # amount of errors to occur before shard is made read-only (default: 0, ignore errors)
+  evacuation_object_batch_size: 100 # number of objects read from the metabase at once during evacuation
+  evacuation_container_worker_count: 10 # number of concurrent container evacuation workers per shard
+  evacuation_object_worker_count: 10 # number of concurrent object evacuation workers per container
+  evacuation_shard_worker_count: 5 # number of shards evacuated concurrently
 
   shard:
     default: # section with the default shard parameters
diff --git a/docs/storage-node-configuration.md b/docs/storage-node-configuration.md
index c74695e2b..1a8de5847 100644
--- a/docs/storage-node-configuration.md
+++ b/docs/storage-node-configuration.md
@@ -167,12 +167,16 @@ morph:
 
 Local storage engine configuration.
 
-| Parameter                  | Type                              | Default value | Description                                                                                                        |
-|----------------------------|-----------------------------------|---------------|------------------------------------------------------------------------------------------------------------------|
-| `shard_pool_size`          | `int`                             | `20`          | Pool size for shard workers. Limits the amount of concurrent `PUT` operations on each shard.                      |
-| `shard_ro_error_threshold` | `int`                             | `0`           | Maximum amount of storage errors to encounter before shard automatically moves to `Degraded` or `ReadOnly` mode.  |
-| `low_mem`                  | `bool`                            | `false`       | Reduce memory consumption by reducing performance.                                                                |
-| `shard`                    | [Shard config](#shard-subsection) |               | Configuration for separate shards.                                                                                |
+| Parameter                           | Type                              | Default value | Description                                                                                                        |
+|-------------------------------------|-----------------------------------|---------------|------------------------------------------------------------------------------------------------------------------|
+| `shard_pool_size`                   | `int`                             | `20`          | Pool size for shard workers. Limits the amount of concurrent `PUT` operations on each shard.                      |
+| `shard_ro_error_threshold`          | `int`                             | `0`           | Maximum amount of storage errors to encounter before shard automatically moves to `Degraded` or `ReadOnly` mode.  |
+| `evacuation_object_batch_size`      | `int`                             | `100`         | Number of objects read from the metabase at once during evacuation.                                                |
+| `evacuation_container_worker_count` | `int`                             | `10`          | Number of concurrent container evacuation workers per shard.                                                       |
+| `evacuation_object_worker_count`    | `int`                             | `10`          | Number of concurrent object evacuation workers per container.                                                      |
+| `evacuation_shard_worker_count`     | `int`                             | `5`           | Number of shards evacuated concurrently.                                                                           |
+| `low_mem`                           | `bool`                            | `false`       | Reduce memory consumption by reducing performance.                                                                |
+| `shard`                             | [Shard config](#shard-subsection) |               | Configuration for separate shards.                                                                                |
 
 ## `shard` subsection
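
For intuition, the three worker counts multiply: each concurrently evacuated shard runs its own container workers, and each container its own object workers, with every handler call carrying at most one batch of addresses. A back-of-the-envelope sketch of the bounds implied by the defaults above (the constant names here are illustrative, not identifiers from this patch):

package main

import "fmt"

func main() {
	const (
		shardWorkers     = 5   // evacuation_shard_worker_count
		containerWorkers = 10  // evacuation_container_worker_count, per shard
		objectWorkers    = 10  // evacuation_object_worker_count, per container
		batchSize        = 100 // evacuation_object_batch_size
	)
	// Upper bound on concurrently running evacuation handlers.
	fmt.Println(shardWorkers * containerWorkers * objectWorkers) // 500
	// Upper bound on object addresses held by in-flight batches.
	fmt.Println(shardWorkers * containerWorkers * objectWorkers * batchSize) // 50000
}

With the defaults this permits up to 500 concurrent handler invocations holding up to 50,000 addresses in flight, which is the main memory/IO trade-off to weigh before raising these values.
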
diff --git a/pkg/local_object_storage/engine/engine.go b/pkg/local_object_storage/engine/engine.go
index 5e883a641..62f2bcd43 100644
--- a/pkg/local_object_storage/engine/engine.go
+++ b/pkg/local_object_storage/engine/engine.go
@@ -213,12 +213,22 @@ type cfg struct {
 	lowMem bool
 
 	containerSource atomic.Pointer[containerSource]
+
+	evacuationShardWorkerCount     uint32
+	evacuationObjectWorkerCount    uint32
+	evacuationObjectBatchSize      uint32
+	evacuationContainerWorkerCount uint32
 }
 
 func defaultCfg() *cfg {
 	res := &cfg{
 		log:           &logger.Logger{Logger: zap.L()},
 		shardPoolSize: 20,
+
+		evacuationShardWorkerCount:     5,
+		evacuationObjectWorkerCount:    10,
+		evacuationObjectBatchSize:      100,
+		evacuationContainerWorkerCount: 10,
 	}
 	res.containerSource.Store(&containerSource{})
 	return res
@@ -277,6 +287,34 @@ func WithLowMemoryConsumption(lowMemCons bool) Option {
 	}
 }
 
+// WithEvacuationShardWorkerCount returns an option to set the number of shards evacuated concurrently.
+func WithEvacuationShardWorkerCount(count uint32) Option {
+	return func(c *cfg) {
+		c.evacuationShardWorkerCount = count
+	}
+}
+
+// WithEvacuationObjectWorkerCount returns an option to set the number of concurrent object evacuation workers per container.
+func WithEvacuationObjectWorkerCount(count uint32) Option {
+	return func(c *cfg) {
+		c.evacuationObjectWorkerCount = count
+	}
+}
+
+// WithEvacuationObjectBatchSize returns an option to set the number of objects read from the metabase at once.
+func WithEvacuationObjectBatchSize(count uint32) Option {
+	return func(c *cfg) {
+		c.evacuationObjectBatchSize = count
+	}
+}
+
+// WithEvacuationContainerWorkerCount returns an option to set the number of concurrent container evacuation workers per shard.
+func WithEvacuationContainerWorkerCount(count uint32) Option {
+	return func(c *cfg) {
+		c.evacuationContainerWorkerCount = count
+	}
+}
+
 // SetContainerSource sets container source.
 func (e *StorageEngine) SetContainerSource(cs container.Source) {
 	e.containerSource.Store(&containerSource{cs: cs})
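
A minimal usage sketch of the new options, assuming the package's existing engine.New(opts ...Option) constructor; the helper name and the values are arbitrary, not part of this patch:

package enginecfg

import (
	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/engine"
)

// newEvacuationTunedEngine is a hypothetical helper that sets the new
// evacuation options explicitly (values mirror the defaults).
func newEvacuationTunedEngine() *engine.StorageEngine {
	return engine.New(
		engine.WithEvacuationShardWorkerCount(5),
		engine.WithEvacuationContainerWorkerCount(10),
		engine.WithEvacuationObjectWorkerCount(10),
		engine.WithEvacuationObjectBatchSize(100),
	)
}
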
diff --git a/pkg/local_object_storage/engine/evacuate.go b/pkg/local_object_storage/engine/evacuate.go
index 7bef6edfb..8ca2f851c 100644
--- a/pkg/local_object_storage/engine/evacuate.go
+++ b/pkg/local_object_storage/engine/evacuate.go
@@ -10,7 +10,6 @@ import (
 
 	"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
 	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object"
-	meta "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/metabase"
 	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/pilorama"
 	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard"
 	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/util/logicerr"
@@ -24,6 +23,7 @@ import (
 	"go.opentelemetry.io/otel/attribute"
 	"go.opentelemetry.io/otel/trace"
 	"go.uber.org/zap"
+	"golang.org/x/sync/errgroup"
 )
 
 var (
@@ -189,8 +189,6 @@ func (p *EvacuateShardRes) DeepCopy() *EvacuateShardRes {
 	return res
 }
 
-const defaultEvacuateBatchSize = 100
-
 type pooledShard struct {
 	hashedShard
 	pool util.WorkerPool
@@ -287,12 +285,21 @@ func (e *StorageEngine) evacuateShards(ctx context.Context, shardIDs []string, p
 		return err
 	}
 
+	eg, egCtx := errgroup.WithContext(ctx)
+	eg.SetLimit(int(e.cfg.evacuationShardWorkerCount))
 	for _, shardID := range shardIDs {
-		if err = e.evacuateShard(ctx, shardID, prm, res, shards, shardsToEvacuate); err != nil {
-			e.log.Error(logs.EngineFinishedWithErrorShardsEvacuation, zap.Error(err), zap.Strings("shard_ids", shardIDs), evacuationOperationLogField,
-				zap.String("trace_id", tracingPkg.GetTraceID(ctx)), zap.Stringer("scope", prm.Scope))
-			return err
-		}
+		eg.Go(func() error {
+			// Use a goroutine-local error: writing the outer `err` from
+			// several workers would be a data race.
+			if err := e.evacuateShard(egCtx, shardID, prm, res, shards, shardsToEvacuate); err != nil {
+				e.log.Error(logs.EngineFinishedWithErrorShardsEvacuation, zap.Error(err), zap.Strings("shard_ids", shardIDs), evacuationOperationLogField,
+					zap.String("trace_id", tracingPkg.GetTraceID(ctx)), zap.Stringer("scope", prm.Scope))
+				return err
+			}
+			return nil
+		})
 	}
+	err = eg.Wait()
+	if err != nil {
+		return err
+	}
 
 	e.log.Info(logs.EngineFinishedSuccessfullyShardsEvacuation,
@@ -344,53 +351,42 @@ func (e *StorageEngine) evacuateShard(ctx context.Context, shardID string, prm E
 	))
 	defer span.End()
 
+	eg, egCtx := errgroup.WithContext(ctx)
 	if prm.Scope.WithObjects() {
-		if err := e.evacuateShardObjects(ctx, shardID, prm, res, shards, shardsToEvacuate); err != nil {
-			return err
-		}
+		eg.Go(func() error {
+			return e.evacuateShardObjects(egCtx, shardID, prm, res, shards, shardsToEvacuate)
+		})
 	}
 	if prm.Scope.WithTrees() && shardsToEvacuate[shardID].PiloramaEnabled() {
-		if err := e.evacuateShardTrees(ctx, shardID, prm, res, shards, shardsToEvacuate); err != nil {
-			return err
-		}
+		eg.Go(func() error {
+			return e.evacuateShardTrees(egCtx, shardID, prm, res, shards, shardsToEvacuate)
+		})
 	}
-	return nil
+	return eg.Wait()
 }
 
 func (e *StorageEngine) evacuateShardObjects(ctx context.Context, shardID string, prm EvacuateShardPrm, res *EvacuateShardRes,
 	shards []pooledShard, shardsToEvacuate map[string]*shard.Shard,
 ) error {
-	var listPrm shard.ListWithCursorPrm
-	listPrm.WithCount(defaultEvacuateBatchSize)
-
 	sh := shardsToEvacuate[shardID]
 	sh.SetEvacuationInProgress(true)
 
-	var c *meta.Cursor
-	for {
-		listPrm.WithCursor(c)
-
-		// TODO (@fyrchik): #1731 this approach doesn't work in degraded modes
-		// because ListWithCursor works only with the metabase.
-		listRes, err := sh.ListWithCursor(ctx, listPrm)
-		if err != nil {
-			if errors.Is(err, meta.ErrEndOfListing) || errors.Is(err, shard.ErrDegradedMode) {
-				break
-			}
-			e.log.Error(logs.EngineShardsEvacuationFailedToListObjects, zap.String("shard_id", shardID), zap.Error(err), evacuationOperationLogField,
-				zap.String("trace_id", tracingPkg.GetTraceID(ctx)))
-			return err
-		}
-
-		if err = e.evacuateObjects(ctx, sh, listRes.AddressList(), prm, res, shards, shardsToEvacuate); err != nil {
-			return err
-		}
-
-		c = listRes.Cursor()
+	var listPrm shard.ListConcurrentlyPrm
+	listPrm.BatchSize = e.cfg.evacuationObjectBatchSize
+	listPrm.Handler = func(ctx context.Context, addrList []object.Info) error {
+		return e.evacuateObjects(ctx, sh, addrList, prm, res, shards, shardsToEvacuate)
 	}
-	return nil
+	listPrm.ObjectsWorkers = e.cfg.evacuationObjectWorkerCount
+	listPrm.ContainersWorkers = e.cfg.evacuationContainerWorkerCount
+
+	err := sh.ListConcurrently(ctx, listPrm)
+	if err != nil {
+		e.log.Error(logs.EngineShardsEvacuationFailedToListObjects, zap.String("shard_id", shardID), zap.Error(err), evacuationOperationLogField,
+			zap.String("trace_id", tracingPkg.GetTraceID(ctx)))
+	}
+	return err
 }
 
 func (e *StorageEngine) evacuateShardTrees(ctx context.Context, shardID string, prm EvacuateShardPrm, res *EvacuateShardRes,
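
The orchestration above leans on two errgroup properties: SetLimit makes eg.Go block once the limit of active goroutines is reached, and the derived context is cancelled by the first goroutine that fails. A standalone sketch of the pattern, assuming Go 1.22+ per-iteration loop variables (which the patch itself relies on when capturing shardID):

package main

import (
	"context"
	"fmt"

	"golang.org/x/sync/errgroup"
)

func main() {
	eg, egCtx := errgroup.WithContext(context.Background())
	eg.SetLimit(2) // at most two workers run at once; further eg.Go calls block

	for _, id := range []string{"shard-a", "shard-b", "shard-c"} {
		eg.Go(func() error {
			if err := egCtx.Err(); err != nil {
				return err // a sibling already failed; stop early
			}
			fmt.Println("evacuating", id)
			return nil
		})
	}

	// Wait returns the first non-nil error after all started workers finish.
	if err := eg.Wait(); err != nil {
		fmt.Println("evacuation failed:", err)
	}
}
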
diff --git a/pkg/local_object_storage/engine/evacuate_test.go b/pkg/local_object_storage/engine/evacuate_test.go
index 28529fab9..803079a6d 100644
--- a/pkg/local_object_storage/engine/evacuate_test.go
+++ b/pkg/local_object_storage/engine/evacuate_test.go
@@ -29,7 +29,9 @@ import (
 func newEngineEvacuate(t *testing.T, shardNum int, objPerShard int) (*StorageEngine, []*shard.ID, []*objectSDK.Object) {
 	dir := t.TempDir()
 
-	te := testNewEngine(t).
+	te := testNewEngine(t,
+		WithEvacuationObjectBatchSize(1), WithEvacuationShardWorkerCount(2),
+		WithEvacuationContainerWorkerCount(2), WithEvacuationObjectWorkerCount(2)).
 		setShardsNumOpts(t, shardNum, func(id int) []shard.Option {
 			return []shard.Option{
 				shard.WithLogger(test.NewLogger(t)),
diff --git a/pkg/local_object_storage/metabase/list.go b/pkg/local_object_storage/metabase/list.go
index b4326a92c..9b63f8faa 100644
--- a/pkg/local_object_storage/metabase/list.go
+++ b/pkg/local_object_storage/metabase/list.go
@@ -14,6 +14,7 @@ import (
 	"go.etcd.io/bbolt"
 	"go.opentelemetry.io/otel/attribute"
 	"go.opentelemetry.io/otel/trace"
+	"golang.org/x/sync/errgroup"
 )
 
 // ErrEndOfListing is returned from object listing with cursor
@@ -61,6 +62,18 @@ func (l ListRes) Cursor() *Cursor {
 	return l.cursor
 }
 
+// ListConcurrentlyPrm contains parameters for ListConcurrently operation.
+type ListConcurrentlyPrm struct {
+	// Handler is concurrently executed upon objects in the db.
+	Handler func(context.Context, []objectcore.Info) error
+	// BatchSize is the maximum number of addresses passed to Handler at once.
+	BatchSize uint32
+	// ContainersWorker is the number of containers processed concurrently.
+	ContainersWorker uint32
+	// ObjectsWorker is the number of workers running Handler concurrently for each container.
+	ObjectsWorker uint32
+}
+
 // ListWithCursor lists physical objects available in metabase starting from
 // cursor. Includes objects of all types. Does not include inhumed objects.
 // Use cursor value from response for consecutive requests.
@@ -259,3 +272,149 @@ func parseContainerIDWithPrefix(containerID *cid.ID, name []byte) ([]byte, byte)
 
 	return rawID, name[0]
 }
+
+// ListConcurrently lists physical objects available in the metabase starting
+// from the beginning. Includes objects of all types. Does not include inhumed objects.
+func (db *DB) ListConcurrently(ctx context.Context, prm ListConcurrentlyPrm) error {
+	var (
+		startedAt = time.Now()
+		success   = false
+	)
+	defer func() {
+		db.metrics.AddMethodDuration("ListConcurrently", time.Since(startedAt), success)
+	}()
+
+	_, span := tracing.StartSpanFromContext(ctx, "metabase.ListConcurrently",
+		trace.WithAttributes(
+			attribute.Int("batch_size", int(prm.BatchSize)),
+			attribute.Bool("has_handler", prm.Handler != nil),
+			attribute.Int("objects_worker", int(prm.ObjectsWorker)),
+			attribute.Int("containers_worker", int(prm.ContainersWorker)),
+		))
+	defer span.End()
+
+	db.modeMtx.RLock()
+	defer db.modeMtx.RUnlock()
+
+	if db.mode.NoMetabase() {
+		return ErrDegradedMode
+	}
+
+	err := db.boltDB.View(func(tx *bbolt.Tx) error {
+		return db.listConcurrently(ctx, tx, prm)
+	})
+	success = err == nil
+	return metaerr.Wrap(err)
+}
+
+func (db *DB) listConcurrently(ctx context.Context, tx *bbolt.Tx, prm ListConcurrentlyPrm) error {
+	c := tx.Cursor()
+	name, _ := c.First()
+
+	var containerID cid.ID
+	graveyardBkt := tx.Bucket(graveyardBucketName)
+	garbageBkt := tx.Bucket(garbageBucketName)
+
+	eg, egCtx := errgroup.WithContext(ctx)
+	eg.SetLimit(int(prm.ContainersWorker))
+
+	for ; name != nil; name, _ = c.Next() {
+		cidRaw, prefix := parseContainerIDWithPrefix(&containerID, name)
+		if cidRaw == nil {
+			continue
+		}
+
+		var objType objectSDK.Type
+
+		switch prefix {
+		case primaryPrefix:
+			objType = objectSDK.TypeRegular
+		case lockersPrefix:
+			objType = objectSDK.TypeLock
+		case tombstonePrefix:
+			objType = objectSDK.TypeTombstone
+		default:
+			continue
+		}
+
+		bkt := tx.Bucket(name)
+		if bkt != nil {
+			rawAddr := make([]byte, cidSize, addressKeySize)
+			copy(rawAddr, cidRaw)
+			var cnt cid.ID
+			copy(cnt[:], containerID[:])
+			eg.Go(func() error {
+				return selectConcurrentlyFromBucket(egCtx,
+					bkt, objType, graveyardBkt, garbageBkt, rawAddr, cnt, prm)
+			})
+		}
+	}
+
+	return eg.Wait()
+}
+
+// selectConcurrentlyFromBucket is similar to selectAllFromBucket, but processes
+// the selected objects concurrently. Ignores inhumed objects.
+func selectConcurrentlyFromBucket(ctx context.Context,
+	bkt *bbolt.Bucket, // main bucket
+	objType objectSDK.Type, // type of the objects stored in the main bucket
+	graveyardBkt, garbageBkt *bbolt.Bucket, // cached graveyard buckets
+	cidRaw []byte, // container ID prefix, optimization
+	cnt cid.ID, // container ID
+	prm ListConcurrentlyPrm,
+) error {
+	c := bkt.Cursor()
+	k, v := c.First()
+	batch := make([]objectcore.Info, 0, prm.BatchSize)
+
+	eg, egCtx := errgroup.WithContext(ctx)
+	eg.SetLimit(int(prm.ObjectsWorker))
+
+	for ; k != nil; k, v = c.Next() {
+		var obj oid.ID
+		if err := obj.Decode(k); err != nil {
+			break
+		}
+
+		if inGraveyardWithKey(append(cidRaw, k...), graveyardBkt, garbageBkt) > 0 {
+			continue
+		}
+
+		var isLinkingObj bool
+		var ecInfo *objectcore.ECInfo
+		if objType == objectSDK.TypeRegular {
+			var o objectSDK.Object
+			if err := o.Unmarshal(v); err != nil {
+				return err
+			}
+			isLinkingObj = isLinkObject(&o)
+			ecHeader := o.ECHeader()
+			if ecHeader != nil {
+				ecInfo = &objectcore.ECInfo{
+					ParentID: ecHeader.Parent(),
+					Index:    ecHeader.Index(),
+					Total:    ecHeader.Total(),
+				}
+			}
+		}
+
+		var a oid.Address
+		a.SetContainer(cnt)
+		a.SetObject(obj)
+		batch = append(batch, objectcore.Info{Address: a, Type: objType, IsLinkingObject: isLinkingObj, ECInfo: ecInfo})
+		if len(batch) >= int(prm.BatchSize) {
+			// Copy the batch before handing it off: the slice is reused below.
+			addrs := make([]objectcore.Info, 0, len(batch))
+			addrs = append(addrs, batch...)
+			eg.Go(func() error {
+				return prm.Handler(egCtx, addrs)
+			})
+			batch = batch[:0]
+		}
+	}
+	if len(batch) > 0 {
+		eg.Go(func() error {
+			return prm.Handler(egCtx, batch)
+		})
+	}
+
+	return eg.Wait()
+}
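
One subtlety in selectConcurrentlyFromBucket above: batch is reset with batch = batch[:0] and refilled while earlier handler goroutines may still be reading, so each full batch is copied into a fresh slice before being handed to eg.Go; only the final partial batch may be passed as-is. A distilled sketch of that copy-before-reuse pattern (names are illustrative):

package main

import (
	"context"
	"fmt"

	"golang.org/x/sync/errgroup"
)

func flush(eg *errgroup.Group, ctx context.Context, batch []int, handler func(context.Context, []int) error) {
	// Copy first: the caller reuses batch's backing array via batch[:0],
	// which would otherwise race with the handler goroutine.
	addrs := make([]int, len(batch))
	copy(addrs, batch)
	eg.Go(func() error { return handler(ctx, addrs) })
}

func main() {
	eg, egCtx := errgroup.WithContext(context.Background())
	eg.SetLimit(2)
	print := func(_ context.Context, b []int) error { fmt.Println(b); return nil }

	batch := make([]int, 0, 3)
	for i := range 10 {
		batch = append(batch, i)
		if len(batch) == cap(batch) {
			flush(eg, egCtx, batch, print)
			batch = batch[:0] // reuse the backing array for the next batch
		}
	}
	if len(batch) > 0 {
		flush(eg, egCtx, batch, print) // final partial batch
	}
	_ = eg.Wait()
}
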
diff --git a/pkg/local_object_storage/shard/list.go b/pkg/local_object_storage/shard/list.go
index 08ea81a0c..4f6076d6e 100644
--- a/pkg/local_object_storage/shard/list.go
+++ b/pkg/local_object_storage/shard/list.go
@@ -34,6 +34,18 @@ func (r ListContainersRes) Containers() []cid.ID {
 	return r.containers
 }
 
+// ListConcurrentlyPrm contains parameters for ListConcurrently operation.
+type ListConcurrentlyPrm struct {
+	// Handler is concurrently executed upon objects in the db.
+	Handler func(context.Context, []objectcore.Info) error
+	// BatchSize is the maximum number of addresses passed to Handler at once.
+	BatchSize uint32
+	// ContainersWorkers is the number of containers processed concurrently.
+	ContainersWorkers uint32
+	// ObjectsWorkers is the number of workers running Handler concurrently for each container.
+	ObjectsWorkers uint32
+}
+
 // ListWithCursorPrm contains parameters for ListWithCursor operation.
 type ListWithCursorPrm struct {
 	count  uint32
@@ -164,3 +176,32 @@ func (s *Shard) ListWithCursor(ctx context.Context, prm ListWithCursorPrm) (List
 		cursor:  res.Cursor(),
 	}, nil
 }
+
+// ListConcurrently lists physical objects available in the shard starting from
+// the beginning. Includes objects of all types. Does not include inhumed objects.
+func (s *Shard) ListConcurrently(ctx context.Context, prm ListConcurrentlyPrm) error {
+	_, span := tracing.StartSpanFromContext(ctx, "shard.ListConcurrently",
+		trace.WithAttributes(
+			attribute.Int64("batch_size", int64(prm.BatchSize)),
+			attribute.Bool("has_handler", prm.Handler != nil),
+			attribute.Int("objects_workers", int(prm.ObjectsWorkers)),
+			attribute.Int("containers_workers", int(prm.ContainersWorkers)),
+		))
+	defer span.End()
+
+	if s.GetMode().NoMetabase() {
+		return ErrDegradedMode
+	}
+
+	var metaPrm meta.ListConcurrentlyPrm
+	metaPrm.BatchSize = prm.BatchSize
+	metaPrm.Handler = prm.Handler
+	metaPrm.ContainersWorker = prm.ContainersWorkers
+	metaPrm.ObjectsWorker = prm.ObjectsWorkers
+	err := s.metaBase.ListConcurrently(ctx, metaPrm)
+	if err != nil {
+		return fmt.Errorf("could not list objects concurrently: %w", err)
+	}
+
+	return nil
+}
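
To close, a hedged sketch of how a caller other than evacuation might drive the new shard API. It assumes an already constructed *shard.Shard; the counting helper is illustrative, and the handler must tolerate concurrent invocation, since object workers call it in parallel:

package shardutil

import (
	"context"
	"sync/atomic"

	objectcore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object"
	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard"
)

// countObjects tallies all physical objects in a shard via the concurrent
// listing added by this patch; atomic.Uint64 keeps the handler race-free.
func countObjects(ctx context.Context, sh *shard.Shard) (uint64, error) {
	var total atomic.Uint64

	var prm shard.ListConcurrentlyPrm
	prm.BatchSize = 100
	prm.ContainersWorkers = 10
	prm.ObjectsWorkers = 10
	prm.Handler = func(_ context.Context, addrs []objectcore.Info) error {
		total.Add(uint64(len(addrs)))
		return nil
	}

	if err := sh.ListConcurrently(ctx, prm); err != nil {
		return 0, err
	}
	return total.Load(), nil
}
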