[#1356] engine: Evacuate object from shards concurrently
Some checks failed
DCO action / DCO (pull_request) Successful in 1m1s
Tests and linters / Run gofumpt (pull_request) Successful in 1m11s
Vulncheck / Vulncheck (pull_request) Successful in 2m1s
Pre-commit hooks / Pre-commit (pull_request) Successful in 2m11s
Build / Build Components (pull_request) Successful in 2m25s
Tests and linters / gopls check (pull_request) Successful in 2m42s
Tests and linters / Staticcheck (pull_request) Successful in 2m46s
Tests and linters / Lint (pull_request) Successful in 3m37s
Tests and linters / Tests with -race (pull_request) Failing after 4m48s
Tests and linters / Tests (pull_request) Failing after 1m50s

Signed-off-by: Anton Nikiforov <an.nikiforov@yadro.com>
This commit is contained in:
Anton Nikiforov 2024-09-18 12:15:32 +03:00
parent d4bec24c9f
commit 5daff28e69
11 changed files with 356 additions and 48 deletions

View file

@ -110,6 +110,11 @@ type applicationConfiguration struct {
shardPoolSize uint32 shardPoolSize uint32
shards []shardCfg shards []shardCfg
lowMem bool lowMem bool
evacuationShardWorkerCount uint32
evacuationObjectWorkerCount uint32
evacuationContainerWorkerCount uint32
evacuationObjectBatchSize uint32
} }
// if need to run node in compatibility with other versions mode // if need to run node in compatibility with other versions mode
@ -228,6 +233,10 @@ func (a *applicationConfiguration) readConfig(c *config.Config) error {
a.EngineCfg.errorThreshold = engineconfig.ShardErrorThreshold(c) a.EngineCfg.errorThreshold = engineconfig.ShardErrorThreshold(c)
a.EngineCfg.shardPoolSize = engineconfig.ShardPoolSize(c) a.EngineCfg.shardPoolSize = engineconfig.ShardPoolSize(c)
a.EngineCfg.lowMem = engineconfig.EngineLowMemoryConsumption(c) a.EngineCfg.lowMem = engineconfig.EngineLowMemoryConsumption(c)
a.EngineCfg.evacuationShardWorkerCount = engineconfig.EngineEvacuationShardWorkerCount(c)
a.EngineCfg.evacuationObjectWorkerCount = engineconfig.EngineEvacuationObjectWorkerCount(c)
a.EngineCfg.evacuationObjectBatchSize = engineconfig.EngineEvacuationObjectBatchSize(c)
a.EngineCfg.evacuationContainerWorkerCount = engineconfig.EngineEvacuationContainerWorkerCount(c)
return engineconfig.IterateShards(c, false, func(sc *shardconfig.Config) error { return a.updateShardConfig(c, sc) }) return engineconfig.IterateShards(c, false, func(sc *shardconfig.Config) error { return a.updateShardConfig(c, sc) })
} }
@ -829,6 +838,10 @@ func (c *cfg) engineOpts() []engine.Option {
engine.WithErrorThreshold(c.EngineCfg.errorThreshold), engine.WithErrorThreshold(c.EngineCfg.errorThreshold),
engine.WithLogger(c.log), engine.WithLogger(c.log),
engine.WithLowMemoryConsumption(c.EngineCfg.lowMem), engine.WithLowMemoryConsumption(c.EngineCfg.lowMem),
engine.WithEvacuationShardWorkerCount(c.EngineCfg.evacuationShardWorkerCount),
engine.WithEvacuationObjectWorkerCount(c.EngineCfg.evacuationObjectWorkerCount),
engine.WithEvacuationObjectBatchSize(c.EngineCfg.evacuationObjectBatchSize),
engine.WithEvacuationContainerWorkerCount(c.EngineCfg.evacuationContainerWorkerCount),
) )
if c.metricsCollector != nil { if c.metricsCollector != nil {

View file

@ -15,6 +15,18 @@ const (
// ShardPoolSizeDefault is a default value of routine pool size per-shard to // ShardPoolSizeDefault is a default value of routine pool size per-shard to
// process object PUT operations in a storage engine. // process object PUT operations in a storage engine.
ShardPoolSizeDefault = 20 ShardPoolSizeDefault = 20
// EvacuationShardWorkerCountDefault is a default value of the count of shards
// evacuees concurrently.
EvacuationShardWorkerCountDefault = 5
// EvacuationContainerWorkerCountDefault is a default value of the count of
// concurrent container evacuation workers per shard.
EvacuationContainerWorkerCountDefault = 10
// EvacuationObjectWorkerCountDefault is a default value of the count of
// concurrent object evacuation workers for each container.
EvacuationObjectWorkerCountDefault = 10
// EvacuationObjectBatchSizeDefault is a default value of the count of
// objects reading from metabase.
EvacuationObjectBatchSizeDefault = 100
) )
// ErrNoShardConfigured is returned when at least 1 shard is required but none are found. // ErrNoShardConfigured is returned when at least 1 shard is required but none are found.
@ -88,3 +100,35 @@ func ShardErrorThreshold(c *config.Config) uint32 {
func EngineLowMemoryConsumption(c *config.Config) bool { func EngineLowMemoryConsumption(c *config.Config) bool {
return config.BoolSafe(c.Sub(subsection), "low_mem") return config.BoolSafe(c.Sub(subsection), "low_mem")
} }
// EngineEvacuationShardWorkerCount returns value of "evacuation_shard_worker_count" config parameter from "storage" section.
func EngineEvacuationShardWorkerCount(c *config.Config) uint32 {
if v := config.Uint32Safe(c.Sub(subsection), "evacuation_shard_worker_count"); v > 0 {
return v
}
return EvacuationShardWorkerCountDefault
}
// EngineEvacuationObjectWorkerCount returns value of "evacuation_object_worker_count" config parameter from "storage" section.
func EngineEvacuationObjectWorkerCount(c *config.Config) uint32 {
if v := config.Uint32Safe(c.Sub(subsection), "evacuation_object_worker_count"); v > 0 {
return v
}
return EvacuationObjectWorkerCountDefault
}
// EngineEvacuationContainerWorkerCount returns value of "evacuation_container_worker_count" config parameter from "storage" section.
func EngineEvacuationContainerWorkerCount(c *config.Config) uint32 {
if v := config.Uint32Safe(c.Sub(subsection), "evacuation_container_worker_count"); v > 0 {
return v
}
return EvacuationContainerWorkerCountDefault
}
// EngineEvacuationObjectBatchSize returns value of "evacuation_object_batch_size" config parameter from "storage" section.
func EngineEvacuationObjectBatchSize(c *config.Config) uint32 {
if v := config.Uint32Safe(c.Sub(subsection), "evacuation_object_batch_size"); v > 0 {
return v
}
return EvacuationObjectBatchSizeDefault
}

View file

@ -91,6 +91,10 @@ FROSTFS_OBJECT_DELETE_TOMBSTONE_LIFETIME=10
# Storage engine section # Storage engine section
FROSTFS_STORAGE_SHARD_POOL_SIZE=15 FROSTFS_STORAGE_SHARD_POOL_SIZE=15
FROSTFS_STORAGE_SHARD_RO_ERROR_THRESHOLD=100 FROSTFS_STORAGE_SHARD_RO_ERROR_THRESHOLD=100
EVACUATION_SHARD_WORKER_COUNT=5
EVACUATION_OBJECT_WORKER_COUNT=10
EVACUATION_CONTAINER_WORKER_COUNT=10
EVACUATION_OBJECT_BATCH_SIZE=100
## 0 shard ## 0 shard
### Flag to refill Metabase from BlobStor ### Flag to refill Metabase from BlobStor
FROSTFS_STORAGE_SHARD_0_RESYNC_METABASE=false FROSTFS_STORAGE_SHARD_0_RESYNC_METABASE=false

View file

@ -136,6 +136,10 @@
"storage": { "storage": {
"shard_pool_size": 15, "shard_pool_size": 15,
"shard_ro_error_threshold": 100, "shard_ro_error_threshold": 100,
"evacuation_object_batch_size": 100,
"evacuation_container_worker_count": 10,
"evacuation_object_worker_count": 10,
"evacuation_shard_worker_count": 5,
"shard": { "shard": {
"0": { "0": {
"mode": "read-only", "mode": "read-only",

View file

@ -118,6 +118,10 @@ storage:
# note: shard configuration can be omitted for relay node (see `node.relay`) # note: shard configuration can be omitted for relay node (see `node.relay`)
shard_pool_size: 15 # size of per-shard worker pools used for PUT operations shard_pool_size: 15 # size of per-shard worker pools used for PUT operations
shard_ro_error_threshold: 100 # amount of errors to occur before shard is made read-only (default: 0, ignore errors) shard_ro_error_threshold: 100 # amount of errors to occur before shard is made read-only (default: 0, ignore errors)
evacuation_object_batch_size: 100 # amount of objects reading from metabase at once for evacuation
evacuation_container_worker_count: 10 # the count of the concurrent container evacuation workers for each shard
evacuation_object_worker_count: 10 # the count of the concurrent object evacuation workers per container
evacuation_shard_worker_count: 5 # amount of shards evacuees concurrently
shard: shard:
default: # section with the default shard parameters default: # section with the default shard parameters

View file

@ -168,9 +168,13 @@ morph:
Local storage engine configuration. Local storage engine configuration.
| Parameter | Type | Default value | Description | | Parameter | Type | Default value | Description |
|----------------------------|-----------------------------------|---------------|------------------------------------------------------------------------------------------------------------------| |-------------------------------------|-----------------------------------|---------------|------------------------------------------------------------------------------------------------------------------|
| `shard_pool_size` | `int` | `20` | Pool size for shard workers. Limits the amount of concurrent `PUT` operations on each shard. | | `shard_pool_size` | `int` | `20` | Pool size for shard workers. Limits the amount of concurrent `PUT` operations on each shard. |
| `shard_ro_error_threshold` | `int` | `0` | Maximum amount of storage errors to encounter before shard automatically moves to `Degraded` or `ReadOnly` mode. | | `shard_ro_error_threshold` | `int` | `0` | Maximum amount of storage errors to encounter before shard automatically moves to `Degraded` or `ReadOnly` mode. |
| `evacuation_object_batch_size` | `int` | `100` | Amount of objects reading from metabase at once for evacuation. |
| `evacuation_container_worker_count` | `int` | `10` | The count of the concurrent container evacuation workers for each shard. |
| `evacuation_object_worker_count` | `int` | `10` | The count of the concurrent object evacuation workers per container. |
| `evacuation_shard_worker_count` | `int` | `5` | Amount of shards evacuees concurrently. |
| `low_mem` | `bool` | `false` | Reduce memory consumption by reducing performance. | | `low_mem` | `bool` | `false` | Reduce memory consumption by reducing performance. |
| `shard` | [Shard config](#shard-subsection) | | Configuration for separate shards. | | `shard` | [Shard config](#shard-subsection) | | Configuration for separate shards. |

View file

@ -213,12 +213,22 @@ type cfg struct {
lowMem bool lowMem bool
containerSource atomic.Pointer[containerSource] containerSource atomic.Pointer[containerSource]
evacuationShardWorkerCount uint32
evacuationObjectWorkerCount uint32
evacuationObjectBatchSize uint32
evacuationContainerWorkerCount uint32
} }
func defaultCfg() *cfg { func defaultCfg() *cfg {
res := &cfg{ res := &cfg{
log: &logger.Logger{Logger: zap.L()}, log: &logger.Logger{Logger: zap.L()},
shardPoolSize: 20, shardPoolSize: 20,
evacuationShardWorkerCount: 5,
evacuationObjectWorkerCount: 10,
evacuationObjectBatchSize: 100,
evacuationContainerWorkerCount: 10,
} }
res.containerSource.Store(&containerSource{}) res.containerSource.Store(&containerSource{})
return res return res
@ -277,6 +287,34 @@ func WithLowMemoryConsumption(lowMemCons bool) Option {
} }
} }
// WithEvacuationShardWorkerCount returns an option to set the count of shards evacuees concurrently.
func WithEvacuationShardWorkerCount(count uint32) Option {
return func(c *cfg) {
c.evacuationShardWorkerCount = count
}
}
// WithEvacuationObjectWorkerCount returns an option to set the count of the concurrent object evacuation workers per container.
func WithEvacuationObjectWorkerCount(count uint32) Option {
return func(c *cfg) {
c.evacuationObjectWorkerCount = count
}
}
// WithEvacuationObjectBatchSize returns an option to set the count of object reading from metabase.
func WithEvacuationObjectBatchSize(count uint32) Option {
return func(c *cfg) {
c.evacuationObjectBatchSize = count
}
}
// WithEvacuationContainerWorkerCount returns an option to set the count of the concurrent container evacuation workers for each shard.
func WithEvacuationContainerWorkerCount(count uint32) Option {
return func(c *cfg) {
c.evacuationContainerWorkerCount = count
}
}
// SetContainerSource sets container source. // SetContainerSource sets container source.
func (e *StorageEngine) SetContainerSource(cs container.Source) { func (e *StorageEngine) SetContainerSource(cs container.Source) {
e.containerSource.Store(&containerSource{cs: cs}) e.containerSource.Store(&containerSource{cs: cs})

View file

@ -10,7 +10,6 @@ import (
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs" "git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object"
meta "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/metabase"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/pilorama" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/pilorama"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/util/logicerr" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/util/logicerr"
@ -24,6 +23,7 @@ import (
"go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace" "go.opentelemetry.io/otel/trace"
"go.uber.org/zap" "go.uber.org/zap"
"golang.org/x/sync/errgroup"
) )
var ( var (
@ -189,8 +189,6 @@ func (p *EvacuateShardRes) DeepCopy() *EvacuateShardRes {
return res return res
} }
const defaultEvacuateBatchSize = 100
type pooledShard struct { type pooledShard struct {
hashedShard hashedShard
pool util.WorkerPool pool util.WorkerPool
@ -287,12 +285,21 @@ func (e *StorageEngine) evacuateShards(ctx context.Context, shardIDs []string, p
return err return err
} }
eg, egCtx := errgroup.WithContext(ctx)
eg.SetLimit(int(e.cfg.evacuationShardWorkerCount))
for _, shardID := range shardIDs { for _, shardID := range shardIDs {
if err = e.evacuateShard(ctx, shardID, prm, res, shards, shardsToEvacuate); err != nil { eg.Go(func() error {
if err = e.evacuateShard(egCtx, shardID, prm, res, shards, shardsToEvacuate); err != nil {
e.log.Error(logs.EngineFinishedWithErrorShardsEvacuation, zap.Error(err), zap.Strings("shard_ids", shardIDs), evacuationOperationLogField, e.log.Error(logs.EngineFinishedWithErrorShardsEvacuation, zap.Error(err), zap.Strings("shard_ids", shardIDs), evacuationOperationLogField,
zap.String("trace_id", tracingPkg.GetTraceID(ctx)), zap.Stringer("scope", prm.Scope)) zap.String("trace_id", tracingPkg.GetTraceID(ctx)), zap.Stringer("scope", prm.Scope))
return err return err
} }
return nil
})
}
err = eg.Wait()
if err != nil {
return err
} }
e.log.Info(logs.EngineFinishedSuccessfullyShardsEvacuation, e.log.Info(logs.EngineFinishedSuccessfullyShardsEvacuation,
@ -344,55 +351,43 @@ func (e *StorageEngine) evacuateShard(ctx context.Context, shardID string, prm E
)) ))
defer span.End() defer span.End()
eg, egCtx := errgroup.WithContext(ctx)
if prm.Scope.WithObjects() { if prm.Scope.WithObjects() {
if err := e.evacuateShardObjects(ctx, shardID, prm, res, shards, shardsToEvacuate); err != nil { eg.Go(func() error {
return err return e.evacuateShardObjects(egCtx, shardID, prm, res, shards, shardsToEvacuate)
})
} }
}
if prm.Scope.WithTrees() && shardsToEvacuate[shardID].PiloramaEnabled() { if prm.Scope.WithTrees() && shardsToEvacuate[shardID].PiloramaEnabled() {
if err := e.evacuateShardTrees(ctx, shardID, prm, res, shards, shardsToEvacuate); err != nil { eg.Go(func() error {
return err return e.evacuateShardTrees(egCtx, shardID, prm, res, shards, shardsToEvacuate)
} })
} }
return nil return eg.Wait()
} }
func (e *StorageEngine) evacuateShardObjects(ctx context.Context, shardID string, prm EvacuateShardPrm, res *EvacuateShardRes, func (e *StorageEngine) evacuateShardObjects(ctx context.Context, shardID string, prm EvacuateShardPrm, res *EvacuateShardRes,
shards []pooledShard, shardsToEvacuate map[string]*shard.Shard, shards []pooledShard, shardsToEvacuate map[string]*shard.Shard,
) error { ) error {
var listPrm shard.ListWithCursorPrm
listPrm.WithCount(defaultEvacuateBatchSize)
sh := shardsToEvacuate[shardID] sh := shardsToEvacuate[shardID]
sh.SetEvacuationInProgress(true) sh.SetEvacuationInProgress(true)
var c *meta.Cursor var listPrm shard.ListConcurrentlyPrm
for { listPrm.BatchSize = e.cfg.evacuationObjectBatchSize
listPrm.WithCursor(c) listPrm.Handler = func(ctx context.Context, addrList []object.Info) error {
return e.evacuateObjects(ctx, sh, addrList, prm, res, shards, shardsToEvacuate)
// TODO (@fyrchik): #1731 this approach doesn't work in degraded modes
// because ListWithCursor works only with the metabase.
listRes, err := sh.ListWithCursor(ctx, listPrm)
if err != nil {
if errors.Is(err, meta.ErrEndOfListing) || errors.Is(err, shard.ErrDegradedMode) {
break
} }
listPrm.ObjectsWorkers = e.cfg.evacuationObjectWorkerCount
listPrm.ContainersWorkers = e.cfg.evacuationContainerWorkerCount
err := sh.ListConcurrently(ctx, listPrm)
if err != nil {
e.log.Error(logs.EngineShardsEvacuationFailedToListObjects, zap.String("shard_id", shardID), zap.Error(err), evacuationOperationLogField, e.log.Error(logs.EngineShardsEvacuationFailedToListObjects, zap.String("shard_id", shardID), zap.Error(err), evacuationOperationLogField,
zap.String("trace_id", tracingPkg.GetTraceID(ctx))) zap.String("trace_id", tracingPkg.GetTraceID(ctx)))
}
return err return err
} }
if err = e.evacuateObjects(ctx, sh, listRes.AddressList(), prm, res, shards, shardsToEvacuate); err != nil {
return err
}
c = listRes.Cursor()
}
return nil
}
func (e *StorageEngine) evacuateShardTrees(ctx context.Context, shardID string, prm EvacuateShardPrm, res *EvacuateShardRes, func (e *StorageEngine) evacuateShardTrees(ctx context.Context, shardID string, prm EvacuateShardPrm, res *EvacuateShardRes,
shards []pooledShard, shardsToEvacuate map[string]*shard.Shard, shards []pooledShard, shardsToEvacuate map[string]*shard.Shard,
) error { ) error {

View file

@ -29,7 +29,9 @@ import (
func newEngineEvacuate(t *testing.T, shardNum int, objPerShard int) (*StorageEngine, []*shard.ID, []*objectSDK.Object) { func newEngineEvacuate(t *testing.T, shardNum int, objPerShard int) (*StorageEngine, []*shard.ID, []*objectSDK.Object) {
dir := t.TempDir() dir := t.TempDir()
te := testNewEngine(t). te := testNewEngine(t,
WithEvacuationObjectBatchSize(1), WithEvacuationShardWorkerCount(2),
WithEvacuationContainerWorkerCount(2), WithEvacuationObjectWorkerCount(2)).
setShardsNumOpts(t, shardNum, func(id int) []shard.Option { setShardsNumOpts(t, shardNum, func(id int) []shard.Option {
return []shard.Option{ return []shard.Option{
shard.WithLogger(test.NewLogger(t)), shard.WithLogger(test.NewLogger(t)),

View file

@ -14,6 +14,7 @@ import (
"go.etcd.io/bbolt" "go.etcd.io/bbolt"
"go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace" "go.opentelemetry.io/otel/trace"
"golang.org/x/sync/errgroup"
) )
// ErrEndOfListing is returned from object listing with cursor // ErrEndOfListing is returned from object listing with cursor
@ -61,6 +62,18 @@ func (l ListRes) Cursor() *Cursor {
return l.cursor return l.cursor
} }
// ListConcurrentlyPrm contains parameters for ListWithCursor operation.
type ListConcurrentlyPrm struct {
// Handler concurrently executed upon objects in db.
Handler func(context.Context, []objectcore.Info) error
// BatchSize maximum amount of addresses that will be passed to Handler.
BatchSize uint32
// ContainersWorkers amount of containers computed concurrently.
ContainersWorker uint32
// ObjectsWorkers amount of workers runs Handler concurrently for each container.
ObjectsWorker uint32
}
// ListWithCursor lists physical objects available in metabase starting from // ListWithCursor lists physical objects available in metabase starting from
// cursor. Includes objects of all types. Does not include inhumed objects. // cursor. Includes objects of all types. Does not include inhumed objects.
// Use cursor value from response for consecutive requests. // Use cursor value from response for consecutive requests.
@ -259,3 +272,149 @@ func parseContainerIDWithPrefix(containerID *cid.ID, name []byte) ([]byte, byte)
return rawID, name[0] return rawID, name[0]
} }
// ListConcurrently lists physical objects available in metabase starting from first.
// Includes objects of all types. Does not include inhumed objects.
func (db *DB) ListConcurrently(ctx context.Context, prm ListConcurrentlyPrm) error {
var (
startedAt = time.Now()
success = false
)
defer func() {
db.metrics.AddMethodDuration("ListConcurrently", time.Since(startedAt), success)
}()
_, span := tracing.StartSpanFromContext(ctx, "metabase.ListConcurrently",
trace.WithAttributes(
attribute.Int("batch_size", int(prm.BatchSize)),
attribute.Bool("has_handler", prm.Handler != nil),
attribute.Int("objects_worker", int(prm.ObjectsWorker)),
attribute.Int("containers_worker", int(prm.ContainersWorker)),
))
defer span.End()
db.modeMtx.RLock()
defer db.modeMtx.RUnlock()
if db.mode.NoMetabase() {
return ErrDegradedMode
}
err := db.boltDB.View(func(tx *bbolt.Tx) error {
return db.listConcurrently(ctx, tx, prm)
})
success = err == nil
return metaerr.Wrap(err)
}
func (db *DB) listConcurrently(ctx context.Context, tx *bbolt.Tx, prm ListConcurrentlyPrm) error {
c := tx.Cursor()
name, _ := c.First()
var containerID cid.ID
graveyardBkt := tx.Bucket(graveyardBucketName)
garbageBkt := tx.Bucket(garbageBucketName)
eg, egCtx := errgroup.WithContext(ctx)
eg.SetLimit(int(prm.ContainersWorker))
for ; name != nil; name, _ = c.Next() {
cidRaw, prefix := parseContainerIDWithPrefix(&containerID, name)
if cidRaw == nil {
continue
}
var objType objectSDK.Type
switch prefix {
case primaryPrefix:
objType = objectSDK.TypeRegular
case lockersPrefix:
objType = objectSDK.TypeLock
case tombstonePrefix:
objType = objectSDK.TypeTombstone
default:
continue
}
bkt := tx.Bucket(name)
if bkt != nil {
rawAddr := make([]byte, cidSize, addressKeySize)
copy(rawAddr, cidRaw)
var cnt cid.ID
copy(cnt[:], containerID[:])
eg.Go(func() error {
return selectConcurrentlyFromBucket(egCtx,
bkt, objType, graveyardBkt, garbageBkt, rawAddr, cnt, prm)
})
}
}
return eg.Wait()
}
// selectConcurrentlyFromBucket similar to selectAllFromBucket but process selected objects concurrently.
// Ignores inhumed objects.
func selectConcurrentlyFromBucket(ctx context.Context,
bkt *bbolt.Bucket, // main bucket
objType objectSDK.Type, // type of the objects stored in the main bucket
graveyardBkt, garbageBkt *bbolt.Bucket, // cached graveyard buckets
cidRaw []byte, // container ID prefix, optimization
cnt cid.ID, // container ID
prm ListConcurrentlyPrm,
) error {
c := bkt.Cursor()
k, v := c.First()
batch := make([]objectcore.Info, 0, prm.BatchSize)
eg, egCtx := errgroup.WithContext(ctx)
eg.SetLimit(int(prm.ObjectsWorker))
for ; k != nil; k, v = c.Next() {
var obj oid.ID
if err := obj.Decode(k); err != nil {
break
}
if inGraveyardWithKey(append(cidRaw, k...), graveyardBkt, garbageBkt) > 0 {
continue
}
var isLinkingObj bool
var ecInfo *objectcore.ECInfo
if objType == objectSDK.TypeRegular {
var o objectSDK.Object
if err := o.Unmarshal(v); err != nil {
return err
}
isLinkingObj = isLinkObject(&o)
ecHeader := o.ECHeader()
if ecHeader != nil {
ecInfo = &objectcore.ECInfo{
ParentID: ecHeader.Parent(),
Index: ecHeader.Index(),
Total: ecHeader.Total(),
}
}
}
var a oid.Address
a.SetContainer(cnt)
a.SetObject(obj)
batch = append(batch, objectcore.Info{Address: a, Type: objType, IsLinkingObject: isLinkingObj, ECInfo: ecInfo})
if len(batch) >= int(prm.BatchSize) {
addrs := make([]objectcore.Info, 0, len(batch))
addrs = append(addrs, batch...)
eg.Go(func() error {
return prm.Handler(egCtx, addrs)
})
batch = batch[:0]
}
}
if len(batch) > 0 {
eg.Go(func() error {
return prm.Handler(egCtx, batch)
})
}
return eg.Wait()
}

View file

@ -34,6 +34,18 @@ func (r ListContainersRes) Containers() []cid.ID {
return r.containers return r.containers
} }
// ListConcurrentlyPrm contains parameters for ListWithCursor operation.
type ListConcurrentlyPrm struct {
// Handler concurrently executed upon objects in db.
Handler func(context.Context, []objectcore.Info) error
// BatchSize maximum amount of addresses that will be passed to Handler.
BatchSize uint32
// ContainersWorkers amount of containers computed concurrently.
ContainersWorkers uint32
// ObjectsWorkers amount of workers runs Handler concurrently
ObjectsWorkers uint32
}
// ListWithCursorPrm contains parameters for ListWithCursor operation. // ListWithCursorPrm contains parameters for ListWithCursor operation.
type ListWithCursorPrm struct { type ListWithCursorPrm struct {
count uint32 count uint32
@ -164,3 +176,32 @@ func (s *Shard) ListWithCursor(ctx context.Context, prm ListWithCursorPrm) (List
cursor: res.Cursor(), cursor: res.Cursor(),
}, nil }, nil
} }
// ListConcurrently lists physical objects available in shard starting from first.
// Includes regular, tombstone and storage group objects. Does not include inhumed objects.
func (s *Shard) ListConcurrently(ctx context.Context, prm ListConcurrentlyPrm) error {
_, span := tracing.StartSpanFromContext(ctx, "shard.ListConcurrently",
trace.WithAttributes(
attribute.Int64("batch_size", int64(prm.BatchSize)),
attribute.Bool("has_handler", prm.Handler != nil),
attribute.Int("objects_workers", int(prm.ObjectsWorkers)),
attribute.Int("containers_workers", int(prm.ContainersWorkers)),
))
defer span.End()
if s.GetMode().NoMetabase() {
return ErrDegradedMode
}
var metaPrm meta.ListConcurrentlyPrm
metaPrm.BatchSize = prm.BatchSize
metaPrm.Handler = prm.Handler
metaPrm.ContainersWorker = prm.ContainersWorkers
metaPrm.ObjectsWorker = prm.ObjectsWorkers
err := s.metaBase.ListConcurrently(ctx, metaPrm)
if err != nil {
return fmt.Errorf("could not list objects concurrently: %w", err)
}
return nil
}