From eada9bd28b8c2bc26e8786bb1f0d7ff531069a57 Mon Sep 17 00:00:00 2001 From: Anton Nikiforov Date: Wed, 18 Sep 2024 12:15:32 +0300 Subject: [PATCH] [#1356] engine: Evacuate object from shards concurrently Signed-off-by: Anton Nikiforov --- cmd/frostfs-node/config.go | 7 + cmd/frostfs-node/config/engine/config.go | 22 ++ config/example/node.env | 2 + config/example/node.json | 4 + config/example/node.yaml | 2 + docs/storage-node-configuration.md | 14 +- pkg/local_object_storage/engine/engine.go | 20 ++ pkg/local_object_storage/engine/evacuate.go | 229 ++++++++++-------- .../engine/evacuate_test.go | 14 +- pkg/local_object_storage/metabase/list.go | 168 +++++++++++++ pkg/local_object_storage/shard/list.go | 59 +++++ 11 files changed, 430 insertions(+), 111 deletions(-) diff --git a/cmd/frostfs-node/config.go b/cmd/frostfs-node/config.go index ed3a65c25..33ba5f2bc 100644 --- a/cmd/frostfs-node/config.go +++ b/cmd/frostfs-node/config.go @@ -110,6 +110,9 @@ type applicationConfiguration struct { shardPoolSize uint32 shards []shardCfg lowMem bool + + evacuationObjectWorkerCount uint32 + evacuationContainerWorkerCount uint32 } // if need to run node in compatibility with other versions mode @@ -228,6 +231,8 @@ func (a *applicationConfiguration) readConfig(c *config.Config) error { a.EngineCfg.errorThreshold = engineconfig.ShardErrorThreshold(c) a.EngineCfg.shardPoolSize = engineconfig.ShardPoolSize(c) a.EngineCfg.lowMem = engineconfig.EngineLowMemoryConsumption(c) + a.EngineCfg.evacuationObjectWorkerCount = engineconfig.EngineEvacuationObjectWorkerCount(c) + a.EngineCfg.evacuationContainerWorkerCount = engineconfig.EngineEvacuationContainerWorkerCount(c) return engineconfig.IterateShards(c, false, func(sc *shardconfig.Config) error { return a.updateShardConfig(c, sc) }) } @@ -829,6 +834,8 @@ func (c *cfg) engineOpts() []engine.Option { engine.WithErrorThreshold(c.EngineCfg.errorThreshold), engine.WithLogger(c.log), engine.WithLowMemoryConsumption(c.EngineCfg.lowMem), + engine.WithEvacuationObjectWorkerCount(c.EngineCfg.evacuationObjectWorkerCount), + engine.WithEvacuationContainerWorkerCount(c.EngineCfg.evacuationContainerWorkerCount), ) if c.metricsCollector != nil { diff --git a/cmd/frostfs-node/config/engine/config.go b/cmd/frostfs-node/config/engine/config.go index c944d1c58..e5fe638b1 100644 --- a/cmd/frostfs-node/config/engine/config.go +++ b/cmd/frostfs-node/config/engine/config.go @@ -15,6 +15,12 @@ const ( // ShardPoolSizeDefault is a default value of routine pool size per-shard to // process object PUT operations in a storage engine. ShardPoolSizeDefault = 20 + // EvacuationContainerWorkerCountDefault is a default value of the count of + // concurrent container evacuation workers. + EvacuationContainerWorkerCountDefault = 10 + // EvacuationObjectWorkerCountDefault is a default value of the count of + // concurrent object evacuation workers. + EvacuationObjectWorkerCountDefault = 10 ) // ErrNoShardConfigured is returned when at least 1 shard is required but none are found. @@ -88,3 +94,19 @@ func ShardErrorThreshold(c *config.Config) uint32 { func EngineLowMemoryConsumption(c *config.Config) bool { return config.BoolSafe(c.Sub(subsection), "low_mem") } + +// EngineEvacuationObjectWorkerCount returns value of "evacuation_object_worker_count" config parameter from "storage" section. +func EngineEvacuationObjectWorkerCount(c *config.Config) uint32 { + if v := config.Uint32Safe(c.Sub(subsection), "evacuation_object_worker_count"); v > 0 { + return v + } + return EvacuationObjectWorkerCountDefault +} + +// EngineEvacuationContainerWorkerCount returns value of "evacuation_container_worker_count" config parameter from "storage" section. +func EngineEvacuationContainerWorkerCount(c *config.Config) uint32 { + if v := config.Uint32Safe(c.Sub(subsection), "evacuation_container_worker_count"); v > 0 { + return v + } + return EvacuationContainerWorkerCountDefault +} diff --git a/config/example/node.env b/config/example/node.env index 6618a981a..6f43ce1bc 100644 --- a/config/example/node.env +++ b/config/example/node.env @@ -91,6 +91,8 @@ FROSTFS_OBJECT_DELETE_TOMBSTONE_LIFETIME=10 # Storage engine section FROSTFS_STORAGE_SHARD_POOL_SIZE=15 FROSTFS_STORAGE_SHARD_RO_ERROR_THRESHOLD=100 +EVACUATION_OBJECT_WORKER_COUNT=10 +EVACUATION_CONTAINER_WORKER_COUNT=10 ## 0 shard ### Flag to refill Metabase from BlobStor FROSTFS_STORAGE_SHARD_0_RESYNC_METABASE=false diff --git a/config/example/node.json b/config/example/node.json index 0d100ed80..605e39877 100644 --- a/config/example/node.json +++ b/config/example/node.json @@ -136,6 +136,10 @@ "storage": { "shard_pool_size": 15, "shard_ro_error_threshold": 100, + "evacuation_object_batch_size": 100, + "evacuation_container_worker_count": 10, + "evacuation_object_worker_count": 10, + "evacuation_shard_worker_count": 5, "shard": { "0": { "mode": "read-only", diff --git a/config/example/node.yaml b/config/example/node.yaml index 86be35ba8..db0f380da 100644 --- a/config/example/node.yaml +++ b/config/example/node.yaml @@ -118,6 +118,8 @@ storage: # note: shard configuration can be omitted for relay node (see `node.relay`) shard_pool_size: 15 # size of per-shard worker pools used for PUT operations shard_ro_error_threshold: 100 # amount of errors to occur before shard is made read-only (default: 0, ignore errors) + evacuation_container_worker_count: 10 # the count of the concurrent container evacuation workers + evacuation_object_worker_count: 10 # the count of the concurrent object evacuation workers shard: default: # section with the default shard parameters diff --git a/docs/storage-node-configuration.md b/docs/storage-node-configuration.md index c74695e2b..519ffab0a 100644 --- a/docs/storage-node-configuration.md +++ b/docs/storage-node-configuration.md @@ -167,12 +167,14 @@ morph: Local storage engine configuration. -| Parameter | Type | Default value | Description | -|----------------------------|-----------------------------------|---------------|------------------------------------------------------------------------------------------------------------------| -| `shard_pool_size` | `int` | `20` | Pool size for shard workers. Limits the amount of concurrent `PUT` operations on each shard. | -| `shard_ro_error_threshold` | `int` | `0` | Maximum amount of storage errors to encounter before shard automatically moves to `Degraded` or `ReadOnly` mode. | -| `low_mem` | `bool` | `false` | Reduce memory consumption by reducing performance. | -| `shard` | [Shard config](#shard-subsection) | | Configuration for separate shards. | +| Parameter | Type | Default value | Description | +|-------------------------------------|-----------------------------------|---------------|------------------------------------------------------------------------------------------------------------------| +| `shard_pool_size` | `int` | `20` | Pool size for shard workers. Limits the amount of concurrent `PUT` operations on each shard. | +| `shard_ro_error_threshold` | `int` | `0` | Maximum amount of storage errors to encounter before shard automatically moves to `Degraded` or `ReadOnly` mode. | +| `evacuation_container_worker_count` | `int` | `10` | The count of the concurrent container evacuation workers. | +| `evacuation_object_worker_count` | `int` | `10` | The count of the concurrent object evacuation workers. | +| `low_mem` | `bool` | `false` | Reduce memory consumption by reducing performance. | +| `shard` | [Shard config](#shard-subsection) | | Configuration for separate shards. | ## `shard` subsection diff --git a/pkg/local_object_storage/engine/engine.go b/pkg/local_object_storage/engine/engine.go index 5e883a641..4c78ee278 100644 --- a/pkg/local_object_storage/engine/engine.go +++ b/pkg/local_object_storage/engine/engine.go @@ -213,12 +213,18 @@ type cfg struct { lowMem bool containerSource atomic.Pointer[containerSource] + + evacuationObjectWorkerCount uint32 + evacuationContainerWorkerCount uint32 } func defaultCfg() *cfg { res := &cfg{ log: &logger.Logger{Logger: zap.L()}, shardPoolSize: 20, + + evacuationObjectWorkerCount: 10, + evacuationContainerWorkerCount: 10, } res.containerSource.Store(&containerSource{}) return res @@ -277,6 +283,20 @@ func WithLowMemoryConsumption(lowMemCons bool) Option { } } +// WithEvacuationObjectWorkerCount returns an option to set the count of the concurrent object evacuation workers per container. +func WithEvacuationObjectWorkerCount(count uint32) Option { + return func(c *cfg) { + c.evacuationObjectWorkerCount = count + } +} + +// WithEvacuationContainerWorkerCount returns an option to set the count of the concurrent container evacuation workers for each shard. +func WithEvacuationContainerWorkerCount(count uint32) Option { + return func(c *cfg) { + c.evacuationContainerWorkerCount = count + } +} + // SetContainerSource sets container source. func (e *StorageEngine) SetContainerSource(cs container.Source) { e.containerSource.Store(&containerSource{cs: cs}) diff --git a/pkg/local_object_storage/engine/evacuate.go b/pkg/local_object_storage/engine/evacuate.go index 7bef6edfb..3ac551720 100644 --- a/pkg/local_object_storage/engine/evacuate.go +++ b/pkg/local_object_storage/engine/evacuate.go @@ -10,7 +10,6 @@ import ( "git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object" - meta "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/metabase" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/pilorama" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/util/logicerr" @@ -24,6 +23,7 @@ import ( "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/trace" "go.uber.org/zap" + "golang.org/x/sync/errgroup" ) var ( @@ -189,8 +189,6 @@ func (p *EvacuateShardRes) DeepCopy() *EvacuateShardRes { return res } -const defaultEvacuateBatchSize = 100 - type pooledShard struct { hashedShard pool util.WorkerPool @@ -242,8 +240,16 @@ func (e *StorageEngine) Evacuate(ctx context.Context, prm EvacuateShardPrm) (*Ev return nil, err } + var mtx sync.RWMutex + copyShards := func() []pooledShard { + mtx.RLock() + defer mtx.RUnlock() + t := make([]pooledShard, len(shards)) + copy(t, shards) + return t + } eg.Go(func() error { - return e.evacuateShards(egCtx, shardIDs, prm, res, shards, shardsToEvacuate) + return e.evacuateShards(egCtx, shardIDs, prm, res, copyShards, shardsToEvacuate) }) if prm.Async { @@ -261,7 +267,7 @@ func ctxOrBackground(ctx context.Context, background bool) context.Context { } func (e *StorageEngine) evacuateShards(ctx context.Context, shardIDs []string, prm EvacuateShardPrm, res *EvacuateShardRes, - shards []pooledShard, shardsToEvacuate map[string]*shard.Shard, + shards func() []pooledShard, shardsToEvacuate map[string]*shard.Shard, ) error { var err error ctx, span := tracing.StartSpanFromContext(ctx, "StorageEngine.evacuateShards", @@ -287,12 +293,38 @@ func (e *StorageEngine) evacuateShards(ctx context.Context, shardIDs []string, p return err } + egShard, _ := errgroup.WithContext(ctx) + + egContainer, _ := errgroup.WithContext(ctx) + egContainer.SetLimit(int(e.cfg.evacuationContainerWorkerCount)) + + egObject, _ := errgroup.WithContext(ctx) + egObject.SetLimit(int(e.cfg.evacuationObjectWorkerCount)) + for _, shardID := range shardIDs { - if err = e.evacuateShard(ctx, shardID, prm, res, shards, shardsToEvacuate); err != nil { - e.log.Error(logs.EngineFinishedWithErrorShardsEvacuation, zap.Error(err), zap.Strings("shard_ids", shardIDs), evacuationOperationLogField, - zap.String("trace_id", tracingPkg.GetTraceID(ctx)), zap.Stringer("scope", prm.Scope)) - return err - } + egShard.Go(func() error { + if err := e.evacuateShard(ctx, shardID, prm, res, shards, shardsToEvacuate, egContainer, egObject); err != nil { + e.log.Error(logs.EngineFinishedWithErrorShardsEvacuation, zap.Error(err), zap.Strings("shard_ids", shardIDs), evacuationOperationLogField, + zap.String("trace_id", tracingPkg.GetTraceID(ctx)), zap.Stringer("scope", prm.Scope)) + return err + } + return nil + }) + } + errShard := egShard.Wait() + errContainer := egContainer.Wait() + errObject := egObject.Wait() + if errShard != nil { + err = errShard + return errShard + } + if errContainer != nil { + err = errContainer + return errContainer + } + if errObject != nil { + err = errObject + return errObject } e.log.Info(logs.EngineFinishedSuccessfullyShardsEvacuation, @@ -336,7 +368,8 @@ func (e *StorageEngine) getTotals(ctx context.Context, prm EvacuateShardPrm, sha } func (e *StorageEngine) evacuateShard(ctx context.Context, shardID string, prm EvacuateShardPrm, res *EvacuateShardRes, - shards []pooledShard, shardsToEvacuate map[string]*shard.Shard, + shards func() []pooledShard, shardsToEvacuate map[string]*shard.Shard, + egContainer *errgroup.Group, egObject *errgroup.Group, ) error { ctx, span := tracing.StartSpanFromContext(ctx, "StorageEngine.evacuateShard", trace.WithAttributes( @@ -344,59 +377,56 @@ func (e *StorageEngine) evacuateShard(ctx context.Context, shardID string, prm E )) defer span.End() + eg, egCtx := errgroup.WithContext(ctx) if prm.Scope.WithObjects() { - if err := e.evacuateShardObjects(ctx, shardID, prm, res, shards, shardsToEvacuate); err != nil { - return err - } + eg.Go(func() error { + return e.evacuateShardObjects(ctx, shardID, prm, res, shards, shardsToEvacuate, egContainer, egObject) + }) } - if prm.Scope.WithTrees() && shardsToEvacuate[shardID].PiloramaEnabled() { - if err := e.evacuateShardTrees(ctx, shardID, prm, res, shards, shardsToEvacuate); err != nil { - return err - } + eg.Go(func() error { + return e.evacuateShardTrees(egCtx, shardID, prm, res, shards, shardsToEvacuate) + }) } - return nil + return eg.Wait() } func (e *StorageEngine) evacuateShardObjects(ctx context.Context, shardID string, prm EvacuateShardPrm, res *EvacuateShardRes, - shards []pooledShard, shardsToEvacuate map[string]*shard.Shard, + shards func() []pooledShard, shardsToEvacuate map[string]*shard.Shard, + egContainer *errgroup.Group, egObject *errgroup.Group, ) error { - var listPrm shard.ListWithCursorPrm - listPrm.WithCount(defaultEvacuateBatchSize) - sh := shardsToEvacuate[shardID] - sh.SetEvacuationInProgress(true) - - var c *meta.Cursor - for { - listPrm.WithCursor(c) - - // TODO (@fyrchik): #1731 this approach doesn't work in degraded modes - // because ListWithCursor works only with the metabase. - listRes, err := sh.ListWithCursor(ctx, listPrm) - if err != nil { - if errors.Is(err, meta.ErrEndOfListing) || errors.Is(err, shard.ErrDegradedMode) { - break + var cntPrm shard.IterateOverContainersPrm + cntPrm.Handler = func(ctx context.Context, name []byte, _ cid.ID) error { + egContainer.Go(func() error { + var objPrm shard.IterateOverObjectsInContainerPrm + objPrm.BucketName = name + objPrm.Handler = func(ctx context.Context, objInfo *object.Info) error { + egObject.Go(func() error { + return e.evacuateObject(ctx, shardID, objInfo, prm, res, shards, shardsToEvacuate) + }) + return nil } - e.log.Error(logs.EngineShardsEvacuationFailedToListObjects, zap.String("shard_id", shardID), zap.Error(err), evacuationOperationLogField, - zap.String("trace_id", tracingPkg.GetTraceID(ctx))) - return err - } - - if err = e.evacuateObjects(ctx, sh, listRes.AddressList(), prm, res, shards, shardsToEvacuate); err != nil { - return err - } - - c = listRes.Cursor() + return sh.IterateOverObjectsInContainer(ctx, objPrm) + }) + return nil } - return nil + + sh.SetEvacuationInProgress(true) + err := sh.IterateOverContainers(ctx, cntPrm) + if err != nil { + e.log.Error(logs.EngineShardsEvacuationFailedToListObjects, zap.String("shard_id", shardID), zap.Error(err), evacuationOperationLogField, + zap.String("trace_id", tracingPkg.GetTraceID(ctx))) + } + return err } func (e *StorageEngine) evacuateShardTrees(ctx context.Context, shardID string, prm EvacuateShardPrm, res *EvacuateShardRes, - shards []pooledShard, shardsToEvacuate map[string]*shard.Shard, + getShards func() []pooledShard, shardsToEvacuate map[string]*shard.Shard, ) error { sh := shardsToEvacuate[shardID] + shards := getShards() var listPrm pilorama.TreeListTreesPrm first := true @@ -637,68 +667,65 @@ func (e *StorageEngine) getActualShards(shardIDs []string, prm EvacuateShardPrm) return shards, nil } -func (e *StorageEngine) evacuateObjects(ctx context.Context, sh *shard.Shard, toEvacuate []object.Info, prm EvacuateShardPrm, res *EvacuateShardRes, - shards []pooledShard, shardsToEvacuate map[string]*shard.Shard, +func (e *StorageEngine) evacuateObject(ctx context.Context, shardID string, objInfo *object.Info, prm EvacuateShardPrm, res *EvacuateShardRes, + getShards func() []pooledShard, shardsToEvacuate map[string]*shard.Shard, ) error { - ctx, span := tracing.StartSpanFromContext(ctx, "StorageEngine.evacuateObjects", - trace.WithAttributes( - attribute.Int("objects_count", len(toEvacuate)), - )) + ctx, span := tracing.StartSpanFromContext(ctx, "StorageEngine.evacuateObjects") defer span.End() - for i := range toEvacuate { - select { - case <-ctx.Done(): - return ctx.Err() - default: - } - addr := toEvacuate[i].Address + select { + case <-ctx.Done(): + return ctx.Err() + default: + } - var getPrm shard.GetPrm - getPrm.SetAddress(addr) - getPrm.SkipEvacCheck(true) + shards := getShards() + addr := objInfo.Address - getRes, err := sh.Get(ctx, getPrm) - if err != nil { - if prm.IgnoreErrors { - res.objFailed.Add(1) - continue - } - e.log.Error(logs.EngineShardsEvacuationFailedToReadObject, zap.String("address", addr.EncodeToString()), zap.Error(err), evacuationOperationLogField, - zap.String("trace_id", tracingPkg.GetTraceID(ctx))) - return err - } + var getPrm shard.GetPrm + getPrm.SetAddress(addr) + getPrm.SkipEvacCheck(true) - evacuatedLocal, err := e.tryEvacuateObjectLocal(ctx, addr, getRes.Object(), sh, shards, shardsToEvacuate, res) - if err != nil { - return err - } - - if evacuatedLocal { - continue - } - - if prm.ObjectsHandler == nil { - // Do not check ignoreErrors flag here because - // ignoring errors on put make this command kinda useless. - return fmt.Errorf("%w: %s", errPutShard, toEvacuate[i]) - } - - moved, err := prm.ObjectsHandler(ctx, addr, getRes.Object()) - if err != nil { - e.log.Error(logs.EngineShardsEvacuationFailedToMoveObject, zap.String("address", addr.EncodeToString()), zap.Error(err), evacuationOperationLogField, - zap.String("trace_id", tracingPkg.GetTraceID(ctx))) - return err - } - if moved { - res.objEvacuated.Add(1) - } else if prm.IgnoreErrors { + getRes, err := shardsToEvacuate[shardID].Get(ctx, getPrm) + if err != nil { + if prm.IgnoreErrors { res.objFailed.Add(1) - e.log.Warn(logs.EngineShardsEvacuationFailedToMoveObject, zap.String("address", addr.EncodeToString()), zap.Error(err), evacuationOperationLogField, - zap.String("trace_id", tracingPkg.GetTraceID(ctx))) - } else { - return fmt.Errorf("object %s was not replicated", addr) + return nil } + e.log.Error(logs.EngineShardsEvacuationFailedToReadObject, zap.String("address", addr.EncodeToString()), zap.Error(err), evacuationOperationLogField, + zap.String("trace_id", tracingPkg.GetTraceID(ctx))) + return err + } + + evacuatedLocal, err := e.tryEvacuateObjectLocal(ctx, addr, getRes.Object(), shardsToEvacuate[shardID], shards, shardsToEvacuate, res) + if err != nil { + return err + } + + if evacuatedLocal { + return nil + } + + if prm.ObjectsHandler == nil { + // Do not check ignoreErrors flag here because + // ignoring errors on put make this command kinda useless. + return fmt.Errorf("%w: %s", errPutShard, objInfo) + } + + moved, err := prm.ObjectsHandler(ctx, addr, getRes.Object()) + if err != nil { + e.log.Error(logs.EngineShardsEvacuationFailedToMoveObject, zap.String("address", addr.EncodeToString()), zap.Error(err), evacuationOperationLogField, + zap.String("trace_id", tracingPkg.GetTraceID(ctx))) + return err + } + if moved { + res.objEvacuated.Add(1) + } else if prm.IgnoreErrors { + res.objFailed.Add(1) + e.log.Warn(logs.EngineShardsEvacuationFailedToMoveObject, zap.String("address", addr.EncodeToString()), zap.Error(err), evacuationOperationLogField, + zap.String("trace_id", tracingPkg.GetTraceID(ctx))) + } else { + return fmt.Errorf("object %s was not replicated", addr) } return nil } diff --git a/pkg/local_object_storage/engine/evacuate_test.go b/pkg/local_object_storage/engine/evacuate_test.go index 28529fab9..8fdc4daae 100644 --- a/pkg/local_object_storage/engine/evacuate_test.go +++ b/pkg/local_object_storage/engine/evacuate_test.go @@ -6,6 +6,8 @@ import ( "fmt" "path/filepath" "strconv" + "sync" + "sync/atomic" "testing" "time" @@ -29,7 +31,8 @@ import ( func newEngineEvacuate(t *testing.T, shardNum int, objPerShard int) (*StorageEngine, []*shard.ID, []*objectSDK.Object) { dir := t.TempDir() - te := testNewEngine(t). + te := testNewEngine(t, + WithEvacuationContainerWorkerCount(2), WithEvacuationObjectWorkerCount(2)). setShardsNumOpts(t, shardNum, func(id int) []shard.Option { return []shard.Option{ shard.WithLogger(test.NewLogger(t)), @@ -174,13 +177,13 @@ func TestEvacuateObjectsNetwork(t *testing.T) { errReplication := errors.New("handler error") acceptOneOf := func(objects []*objectSDK.Object, max uint64) func(context.Context, oid.Address, *objectSDK.Object) (bool, error) { - var n uint64 + var n atomic.Uint64 return func(_ context.Context, addr oid.Address, obj *objectSDK.Object) (bool, error) { - if n == max { + if n.Load() == max { return false, errReplication } - n++ + n.Add(1) for i := range objects { if addr == objectCore.AddressOf(objects[i]) { require.Equal(t, objects[i], obj) @@ -531,6 +534,7 @@ func TestEvacuateTreesRemote(t *testing.T) { require.NoError(t, e.shards[ids[0].String()].SetMode(mode.ReadOnly)) require.NoError(t, e.shards[ids[1].String()].SetMode(mode.ReadOnly)) + mutex := sync.Mutex{} evacuatedTreeOps := make(map[string][]*pilorama.Move) var prm EvacuateShardPrm prm.ShardID = ids @@ -545,7 +549,9 @@ func TestEvacuateTreesRemote(t *testing.T) { if op.Time == 0 { return true, "", nil } + mutex.Lock() evacuatedTreeOps[key] = append(evacuatedTreeOps[key], &op) + mutex.Unlock() height = op.Time + 1 } } diff --git a/pkg/local_object_storage/metabase/list.go b/pkg/local_object_storage/metabase/list.go index b4326a92c..34fff6bf6 100644 --- a/pkg/local_object_storage/metabase/list.go +++ b/pkg/local_object_storage/metabase/list.go @@ -61,6 +61,20 @@ func (l ListRes) Cursor() *Cursor { return l.cursor } +// IterateOverContainersPrm contains parameters for IterateOverContainers operation. +type IterateOverContainersPrm struct { + // Handler function executed upon containers in db. + Handler func(context.Context, []byte, cid.ID) error +} + +// IterateOverObjectsInContainerPrm contains parameters for IterateOverObjectsInContainer operation. +type IterateOverObjectsInContainerPrm struct { + // BucketName container's bucket name + BucketName []byte + // Handler function executed upon objects in db. + Handler func(context.Context, *objectcore.Info) error +} + // ListWithCursor lists physical objects available in metabase starting from // cursor. Includes objects of all types. Does not include inhumed objects. // Use cursor value from response for consecutive requests. @@ -259,3 +273,157 @@ func parseContainerIDWithPrefix(containerID *cid.ID, name []byte) ([]byte, byte) return rawID, name[0] } + +// IterateOverContainers lists physical containers available in metabase starting from first. +func (db *DB) IterateOverContainers(ctx context.Context, prm IterateOverContainersPrm) error { + var ( + startedAt = time.Now() + success = false + ) + defer func() { + db.metrics.AddMethodDuration("ListContainers", time.Since(startedAt), success) + }() + _, span := tracing.StartSpanFromContext(ctx, "metabase.ListContainers", + trace.WithAttributes( + attribute.Bool("has_handler", prm.Handler != nil), + )) + defer span.End() + + db.modeMtx.RLock() + defer db.modeMtx.RUnlock() + + if db.mode.NoMetabase() { + return ErrDegradedMode + } + + err := db.boltDB.View(func(tx *bbolt.Tx) error { + return db.iterateOverContainers(ctx, tx, prm) + }) + success = err == nil + return metaerr.Wrap(err) +} + +func (db *DB) iterateOverContainers(ctx context.Context, tx *bbolt.Tx, prm IterateOverContainersPrm) error { + c := tx.Cursor() + name, _ := c.First() + + var containerID cid.ID + for ; name != nil; name, _ = c.Next() { + cidRaw, prefix := parseContainerIDWithPrefix(&containerID, name) + if cidRaw == nil { + continue + } + if prefix != primaryPrefix && prefix != lockersPrefix && prefix != tombstonePrefix { + continue + } + + bkt := tx.Bucket(name) + if bkt != nil { + bktName := make([]byte, len(name)) + copy(bktName, name) + var cnt cid.ID + copy(cnt[:], containerID[:]) + err := prm.Handler(ctx, bktName, cnt) + if err != nil { + return err + } + } + } + + return nil +} + +// IterateOverObjectsInContainer iterate over physical objects available in metabase starting from first. +func (db *DB) IterateOverObjectsInContainer(ctx context.Context, prm IterateOverObjectsInContainerPrm) error { + var ( + startedAt = time.Now() + success = false + ) + defer func() { + db.metrics.AddMethodDuration("ListContainers", time.Since(startedAt), success) + }() + _, span := tracing.StartSpanFromContext(ctx, "metabase.ListContainers", + trace.WithAttributes( + attribute.Bool("has_handler", prm.Handler != nil), + )) + defer span.End() + + db.modeMtx.RLock() + defer db.modeMtx.RUnlock() + + if db.mode.NoMetabase() { + return ErrDegradedMode + } + + err := db.boltDB.View(func(tx *bbolt.Tx) error { + return db.iterateOverObjectsInContainer(ctx, tx, prm) + }) + success = err == nil + return metaerr.Wrap(err) +} + +func (db *DB) iterateOverObjectsInContainer(ctx context.Context, tx *bbolt.Tx, prm IterateOverObjectsInContainerPrm) error { + bkt := tx.Bucket(prm.BucketName) + graveyardBkt := tx.Bucket(graveyardBucketName) + garbageBkt := tx.Bucket(garbageBucketName) + c := bkt.Cursor() + k, v := c.First() + + var containerID cid.ID + cidRaw, prefix := parseContainerIDWithPrefix(&containerID, prm.BucketName) + if cidRaw == nil { + return nil + } + + var objType objectSDK.Type + + switch prefix { + case primaryPrefix: + objType = objectSDK.TypeRegular + case lockersPrefix: + objType = objectSDK.TypeLock + case tombstonePrefix: + objType = objectSDK.TypeTombstone + default: + return nil + } + + for ; k != nil; k, v = c.Next() { + var obj oid.ID + if err := obj.Decode(k); err != nil { + break + } + + if inGraveyardWithKey(append(cidRaw, k...), graveyardBkt, garbageBkt) > 0 { + continue + } + + var isLinkingObj bool + var ecInfo *objectcore.ECInfo + if objType == objectSDK.TypeRegular { + var o objectSDK.Object + if err := o.Unmarshal(v); err != nil { + return err + } + isLinkingObj = isLinkObject(&o) + ecHeader := o.ECHeader() + if ecHeader != nil { + ecInfo = &objectcore.ECInfo{ + ParentID: ecHeader.Parent(), + Index: ecHeader.Index(), + Total: ecHeader.Total(), + } + } + } + + var a oid.Address + a.SetContainer(containerID) + a.SetObject(obj) + objInfo := objectcore.Info{Address: a, Type: objType, IsLinkingObject: isLinkingObj, ECInfo: ecInfo} + err := prm.Handler(ctx, &objInfo) + if err != nil { + return err + } + } + return nil +} diff --git a/pkg/local_object_storage/shard/list.go b/pkg/local_object_storage/shard/list.go index 08ea81a0c..143bf23d4 100644 --- a/pkg/local_object_storage/shard/list.go +++ b/pkg/local_object_storage/shard/list.go @@ -34,6 +34,20 @@ func (r ListContainersRes) Containers() []cid.ID { return r.containers } +// IterateOverContainersPrm contains parameters for IterateOverContainers operation. +type IterateOverContainersPrm struct { + // Handler function executed upon containers in db. + Handler func(context.Context, []byte, cid.ID) error +} + +// IterateOverObjectsInContainerPrm contains parameters for IterateOverObjectsInContainer operation. +type IterateOverObjectsInContainerPrm struct { + // BucketName container's bucket name + BucketName []byte + // Handler function executed upon containers in db. + Handler func(context.Context, *objectcore.Info) error +} + // ListWithCursorPrm contains parameters for ListWithCursor operation. type ListWithCursorPrm struct { count uint32 @@ -164,3 +178,48 @@ func (s *Shard) ListWithCursor(ctx context.Context, prm ListWithCursorPrm) (List cursor: res.Cursor(), }, nil } + +// IterateOverContainers lists physical containers presented in shard. +func (s *Shard) IterateOverContainers(ctx context.Context, prm IterateOverContainersPrm) error { + _, span := tracing.StartSpanFromContext(ctx, "shard.ListConcurrently", + trace.WithAttributes( + attribute.Bool("has_handler", prm.Handler != nil), + )) + defer span.End() + + if s.GetMode().NoMetabase() { + return ErrDegradedMode + } + + var metaPrm meta.IterateOverContainersPrm + metaPrm.Handler = prm.Handler + err := s.metaBase.IterateOverContainers(ctx, metaPrm) + if err != nil { + return fmt.Errorf("could not iterate over containers: %w", err) + } + + return nil +} + +// IterateOverObjectsInContainer lists physical objects presented in shard for provided container's bucket name. +func (s *Shard) IterateOverObjectsInContainer(ctx context.Context, prm IterateOverObjectsInContainerPrm) error { + _, span := tracing.StartSpanFromContext(ctx, "shard.ListConcurrently", + trace.WithAttributes( + attribute.Bool("has_handler", prm.Handler != nil), + )) + defer span.End() + + if s.GetMode().NoMetabase() { + return ErrDegradedMode + } + + var metaPrm meta.IterateOverObjectsInContainerPrm + metaPrm.BucketName = prm.BucketName + metaPrm.Handler = prm.Handler + err := s.metaBase.IterateOverObjectsInContainer(ctx, metaPrm) + if err != nil { + return fmt.Errorf("could not iterate over containers: %w", err) + } + + return nil +}