[#1356] engine: Evacuate object from shards concurrently
Some checks failed
DCO action / DCO (pull_request) Successful in 56s
Tests and linters / Run gofumpt (pull_request) Successful in 1m13s
Vulncheck / Vulncheck (pull_request) Successful in 1m20s
Pre-commit hooks / Pre-commit (pull_request) Successful in 2m9s
Build / Build Components (pull_request) Successful in 2m17s
Tests and linters / gopls check (pull_request) Successful in 2m36s
Tests and linters / Staticcheck (pull_request) Successful in 2m42s
Tests and linters / Lint (pull_request) Successful in 3m22s
Tests and linters / Tests (pull_request) Successful in 4m11s
Tests and linters / Tests with -race (pull_request) Failing after 4m46s
Signed-off-by: Anton Nikiforov <an.nikiforov@yadro.com>
parent d4bec24c9f · commit af6139b981
11 changed files with 360 additions and 51 deletions
@@ -110,6 +110,11 @@ type applicationConfiguration struct {
 		shardPoolSize uint32
 		shards        []shardCfg
 		lowMem        bool
+
+		evacuationShardWorkerCount     uint32
+		evacuationObjectWorkerCount    uint32
+		evacuationContainerWorkerCount uint32
+		evacuationObjectBatchSize      uint32
 	}

 	// if need to run node in compatibility with other versions mode
@@ -228,6 +233,10 @@ func (a *applicationConfiguration) readConfig(c *config.Config) error {
 	a.EngineCfg.errorThreshold = engineconfig.ShardErrorThreshold(c)
 	a.EngineCfg.shardPoolSize = engineconfig.ShardPoolSize(c)
 	a.EngineCfg.lowMem = engineconfig.EngineLowMemoryConsumption(c)
+	a.EngineCfg.evacuationShardWorkerCount = engineconfig.EngineEvacuationShardWorkerCount(c)
+	a.EngineCfg.evacuationObjectWorkerCount = engineconfig.EngineEvacuationObjectWorkerCount(c)
+	a.EngineCfg.evacuationObjectBatchSize = engineconfig.EngineEvacuationObjectBatchSize(c)
+	a.EngineCfg.evacuationContainerWorkerCount = engineconfig.EngineEvacuationContainerWorkerCount(c)

 	return engineconfig.IterateShards(c, false, func(sc *shardconfig.Config) error { return a.updateShardConfig(c, sc) })
 }
@@ -829,6 +838,10 @@ func (c *cfg) engineOpts() []engine.Option {
 		engine.WithErrorThreshold(c.EngineCfg.errorThreshold),
 		engine.WithLogger(c.log),
 		engine.WithLowMemoryConsumption(c.EngineCfg.lowMem),
+		engine.WithEvacuationShardWorkerCount(c.EngineCfg.evacuationShardWorkerCount),
+		engine.WithEvacuationObjectWorkerCount(c.EngineCfg.evacuationObjectWorkerCount),
+		engine.WithEvacuationObjectBatchSize(c.EngineCfg.evacuationObjectBatchSize),
+		engine.WithEvacuationContainerWorkerCount(c.EngineCfg.evacuationContainerWorkerCount),
 	)

 	if c.metricsCollector != nil {
@@ -15,6 +15,18 @@ const (
 	// ShardPoolSizeDefault is a default value of routine pool size per-shard to
 	// process object PUT operations in a storage engine.
 	ShardPoolSizeDefault = 20
+	// EvacuationShardWorkerCountDefault is a default value of the count of shards
+	// evacuated concurrently.
+	EvacuationShardWorkerCountDefault = 5
+	// EvacuationContainerWorkerCountDefault is a default value of the count of
+	// concurrent container evacuation workers per shard.
+	EvacuationContainerWorkerCountDefault = 10
+	// EvacuationObjectWorkerCountDefault is a default value of the count of
+	// concurrent object evacuation workers for each container.
+	EvacuationObjectWorkerCountDefault = 10
+	// EvacuationObjectBatchSizeDefault is a default value of the count of
+	// objects read from the metabase at once.
+	EvacuationObjectBatchSizeDefault = 100
 )

 // ErrNoShardConfigured is returned when at least 1 shard is required but none are found.
@@ -88,3 +100,35 @@ func ShardErrorThreshold(c *config.Config) uint32 {
 func EngineLowMemoryConsumption(c *config.Config) bool {
 	return config.BoolSafe(c.Sub(subsection), "low_mem")
 }
+
+// EngineEvacuationShardWorkerCount returns value of "evacuation_shard_worker_count" config parameter from "storage" section.
+func EngineEvacuationShardWorkerCount(c *config.Config) uint32 {
+	if v := config.Uint32Safe(c.Sub(subsection), "evacuation_shard_worker_count"); v > 0 {
+		return v
+	}
+	return EvacuationShardWorkerCountDefault
+}
+
+// EngineEvacuationObjectWorkerCount returns value of "evacuation_object_worker_count" config parameter from "storage" section.
+func EngineEvacuationObjectWorkerCount(c *config.Config) uint32 {
+	if v := config.Uint32Safe(c.Sub(subsection), "evacuation_object_worker_count"); v > 0 {
+		return v
+	}
+	return EvacuationObjectWorkerCountDefault
+}
+
+// EngineEvacuationContainerWorkerCount returns value of "evacuation_container_worker_count" config parameter from "storage" section.
+func EngineEvacuationContainerWorkerCount(c *config.Config) uint32 {
+	if v := config.Uint32Safe(c.Sub(subsection), "evacuation_container_worker_count"); v > 0 {
+		return v
+	}
+	return EvacuationContainerWorkerCountDefault
+}
+
+// EngineEvacuationObjectBatchSize returns value of "evacuation_object_batch_size" config parameter from "storage" section.
+func EngineEvacuationObjectBatchSize(c *config.Config) uint32 {
+	if v := config.Uint32Safe(c.Sub(subsection), "evacuation_object_batch_size"); v > 0 {
+		return v
+	}
+	return EvacuationObjectBatchSizeDefault
+}
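All four getters above follow the same shape: read a uint32 from the "storage" subsection and fall back to the package default when the key is missing or zero. Below is a minimal, self-contained sketch of that shape; the map-backed lookup only stands in for the real `config.Config` tree and is purely illustrative.

```go
package main

import "fmt"

// evacuationShardWorkerCountDefault mirrors EvacuationShardWorkerCountDefault from the diff.
const evacuationShardWorkerCountDefault = 5

// uint32Safe is a stand-in for config.Uint32Safe: absent keys yield zero.
func uint32Safe(values map[string]uint32, key string) uint32 {
	return values[key]
}

// engineEvacuationShardWorkerCount mimics the getter pattern:
// a positive configured value wins, otherwise the default applies.
func engineEvacuationShardWorkerCount(values map[string]uint32) uint32 {
	if v := uint32Safe(values, "evacuation_shard_worker_count"); v > 0 {
		return v
	}
	return evacuationShardWorkerCountDefault
}

func main() {
	fmt.Println(engineEvacuationShardWorkerCount(nil))                                                   // 5, the default
	fmt.Println(engineEvacuationShardWorkerCount(map[string]uint32{"evacuation_shard_worker_count": 8})) // 8
}
```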
@@ -91,6 +91,10 @@ FROSTFS_OBJECT_DELETE_TOMBSTONE_LIFETIME=10
 # Storage engine section
 FROSTFS_STORAGE_SHARD_POOL_SIZE=15
 FROSTFS_STORAGE_SHARD_RO_ERROR_THRESHOLD=100
+EVACUATION_SHARD_WORKER_COUNT=5
+EVACUATION_OBJECT_WORKER_COUNT=10
+EVACUATION_CONTAINER_WORKER_COUNT=10
+EVACUATION_OBJECT_BATCH_SIZE=100
 ## 0 shard
 ### Flag to refill Metabase from BlobStor
 FROSTFS_STORAGE_SHARD_0_RESYNC_METABASE=false
@@ -136,6 +136,10 @@
   "storage": {
     "shard_pool_size": 15,
     "shard_ro_error_threshold": 100,
+    "evacuation_object_batch_size": 100,
+    "evacuation_container_worker_count": 10,
+    "evacuation_object_worker_count": 10,
+    "evacuation_shard_worker_count": 5,
     "shard": {
       "0": {
         "mode": "read-only",
@@ -118,6 +118,10 @@ storage:
   # note: shard configuration can be omitted for relay node (see `node.relay`)
   shard_pool_size: 15 # size of per-shard worker pools used for PUT operations
   shard_ro_error_threshold: 100 # amount of errors to occur before shard is made read-only (default: 0, ignore errors)
+  evacuation_object_batch_size: 100 # number of objects read from the metabase at once during evacuation
+  evacuation_container_worker_count: 10 # number of concurrent container evacuation workers per shard
+  evacuation_object_worker_count: 10 # number of concurrent object evacuation workers per container
+  evacuation_shard_worker_count: 5 # number of shards evacuated concurrently

   shard:
     default: # section with the default shard parameters
@@ -167,12 +167,16 @@ morph:

 Local storage engine configuration.

 | Parameter                            | Type                              | Default value | Description                                                                                                       |
-|----------------------------|-----------------------------------|---------------|------------------------------------------------------------------------------------------------------------------|
+|--------------------------------------|-----------------------------------|---------------|-------------------------------------------------------------------------------------------------------------------|
 | `shard_pool_size`                    | `int`                             | `20`          | Pool size for shard workers. Limits the amount of concurrent `PUT` operations on each shard.                      |
 | `shard_ro_error_threshold`           | `int`                             | `0`           | Maximum amount of storage errors to encounter before shard automatically moves to `Degraded` or `ReadOnly` mode.  |
-| `low_mem`                  | `bool`                            | `false`       | Reduce memory consumption by reducing performance.                                                                |
-| `shard`                    | [Shard config](#shard-subsection) |               | Configuration for separate shards.                                                                                |
+| `evacuation_object_batch_size`       | `int`                             | `100`         | Number of objects read from the metabase at once during evacuation.                                               |
+| `evacuation_container_worker_count`  | `int`                             | `10`          | Number of concurrent container evacuation workers per shard.                                                      |
+| `evacuation_object_worker_count`     | `int`                             | `10`          | Number of concurrent object evacuation workers per container.                                                     |
+| `evacuation_shard_worker_count`      | `int`                             | `5`           | Number of shards evacuated concurrently.                                                                          |
+| `low_mem`                            | `bool`                            | `false`       | Reduce memory consumption by reducing performance.                                                                |
+| `shard`                              | [Shard config](#shard-subsection) |               | Configuration for separate shards.                                                                                |

 ## `shard` subsection
@@ -213,12 +213,22 @@ type cfg struct {
 	lowMem bool

 	containerSource atomic.Pointer[containerSource]
+
+	evacuationShardWorkerCount     uint32
+	evacuationObjectWorkerCount    uint32
+	evacuationObjectBatchSize      uint32
+	evacuationContainerWorkerCount uint32
 }

 func defaultCfg() *cfg {
 	res := &cfg{
 		log:           &logger.Logger{Logger: zap.L()},
 		shardPoolSize: 20,
+
+		evacuationShardWorkerCount:     5,
+		evacuationObjectWorkerCount:    10,
+		evacuationObjectBatchSize:      100,
+		evacuationContainerWorkerCount: 10,
 	}
 	res.containerSource.Store(&containerSource{})
 	return res
@@ -277,6 +287,34 @@ func WithLowMemoryConsumption(lowMemCons bool) Option {
 	}
 }
+
+// WithEvacuationShardWorkerCount returns an option to set the count of shards evacuated concurrently.
+func WithEvacuationShardWorkerCount(count uint32) Option {
+	return func(c *cfg) {
+		c.evacuationShardWorkerCount = count
+	}
+}
+
+// WithEvacuationObjectWorkerCount returns an option to set the count of the concurrent object evacuation workers per container.
+func WithEvacuationObjectWorkerCount(count uint32) Option {
+	return func(c *cfg) {
+		c.evacuationObjectWorkerCount = count
+	}
+}
+
+// WithEvacuationObjectBatchSize returns an option to set the count of objects read from the metabase at once.
+func WithEvacuationObjectBatchSize(count uint32) Option {
+	return func(c *cfg) {
+		c.evacuationObjectBatchSize = count
+	}
+}
+
+// WithEvacuationContainerWorkerCount returns an option to set the count of the concurrent container evacuation workers for each shard.
+func WithEvacuationContainerWorkerCount(count uint32) Option {
+	return func(c *cfg) {
+		c.evacuationContainerWorkerCount = count
+	}
+}
+
 // SetContainerSource sets container source.
 func (e *StorageEngine) SetContainerSource(cs container.Source) {
 	e.containerSource.Store(&containerSource{cs: cs})
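The four options above follow the engine's functional-options pattern: each returns a closure that overrides one field of the internal configuration before the engine is built. A self-contained sketch of the same pattern is below; the cfg, Option and constructor names are simplified stand-ins, not the engine's actual types.

```go
package main

import "fmt"

// cfg holds only the evacuation knobs introduced by this change.
type cfg struct {
	evacuationShardWorkerCount     uint32
	evacuationObjectWorkerCount    uint32
	evacuationObjectBatchSize      uint32
	evacuationContainerWorkerCount uint32
}

// Option mutates the configuration, mirroring the shape of engine.Option.
type Option func(*cfg)

// WithEvacuationShardWorkerCount sets how many shards are evacuated concurrently.
func WithEvacuationShardWorkerCount(count uint32) Option {
	return func(c *cfg) { c.evacuationShardWorkerCount = count }
}

// defaultCfg mirrors the defaults set in the diff (5 / 10 / 100 / 10).
func defaultCfg() *cfg {
	return &cfg{
		evacuationShardWorkerCount:     5,
		evacuationObjectWorkerCount:    10,
		evacuationObjectBatchSize:      100,
		evacuationContainerWorkerCount: 10,
	}
}

// newCfg applies the options over the defaults, as an engine constructor would.
func newCfg(opts ...Option) *cfg {
	c := defaultCfg()
	for _, opt := range opts {
		opt(c)
	}
	return c
}

func main() {
	c := newCfg(WithEvacuationShardWorkerCount(2))
	fmt.Printf("%+v\n", *c)
}
```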
@@ -10,7 +10,6 @@ import (

 	"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
 	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object"
-	meta "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/metabase"
 	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/pilorama"
 	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard"
 	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/util/logicerr"
@@ -24,6 +23,7 @@ import (
 	"go.opentelemetry.io/otel/attribute"
 	"go.opentelemetry.io/otel/trace"
 	"go.uber.org/zap"
+	"golang.org/x/sync/errgroup"
 )

 var (
|
@ -189,8 +189,6 @@ func (p *EvacuateShardRes) DeepCopy() *EvacuateShardRes {
|
||||||
return res
|
return res
|
||||||
}
|
}
|
||||||
|
|
||||||
const defaultEvacuateBatchSize = 100
|
|
||||||
|
|
||||||
type pooledShard struct {
|
type pooledShard struct {
|
||||||
hashedShard
|
hashedShard
|
||||||
pool util.WorkerPool
|
pool util.WorkerPool
|
||||||
|
@@ -287,12 +285,21 @@ func (e *StorageEngine) evacuateShards(ctx context.Context, shardIDs []string, p
 		return err
 	}

+	eg, egCtx := errgroup.WithContext(ctx)
+	eg.SetLimit(int(e.cfg.evacuationShardWorkerCount))
 	for _, shardID := range shardIDs {
-		if err = e.evacuateShard(ctx, shardID, prm, res, shards, shardsToEvacuate); err != nil {
-			e.log.Error(logs.EngineFinishedWithErrorShardsEvacuation, zap.Error(err), zap.Strings("shard_ids", shardIDs), evacuationOperationLogField,
-				zap.String("trace_id", tracingPkg.GetTraceID(ctx)), zap.Stringer("scope", prm.Scope))
-			return err
-		}
+		eg.Go(func() error {
+			if err = e.evacuateShard(egCtx, shardID, prm, res, shards, shardsToEvacuate); err != nil {
+				e.log.Error(logs.EngineFinishedWithErrorShardsEvacuation, zap.Error(err), zap.Strings("shard_ids", shardIDs), evacuationOperationLogField,
+					zap.String("trace_id", tracingPkg.GetTraceID(ctx)), zap.Stringer("scope", prm.Scope))
+				return err
+			}
+			return nil
+		})
 	}
+	err = eg.Wait()
+	if err != nil {
+		return err
+	}

 	e.log.Info(logs.EngineFinishedSuccessfullyShardsEvacuation,
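The shard-level concurrency above is plain errgroup fan-out: the limit comes from evacuation_shard_worker_count and the first failing shard cancels the group's context for the rest. A runnable sketch of that pattern in isolation (the shard IDs and the evacuate function are placeholders, not the engine API; per-iteration loop variables assume Go 1.22+):

```go
package main

import (
	"context"
	"fmt"

	"golang.org/x/sync/errgroup"
)

// evacuateOne stands in for e.evacuateShard; it only checks for cancellation.
func evacuateOne(ctx context.Context, shardID string) error {
	select {
	case <-ctx.Done():
		return ctx.Err()
	default:
		fmt.Println("evacuated", shardID)
		return nil
	}
}

func main() {
	shardIDs := []string{"shard-0", "shard-1", "shard-2", "shard-3"}

	eg, egCtx := errgroup.WithContext(context.Background())
	eg.SetLimit(2) // at most two shards in flight, like evacuation_shard_worker_count

	for _, shardID := range shardIDs {
		eg.Go(func() error {
			return evacuateOne(egCtx, shardID)
		})
	}

	// Wait returns the first non-nil error and cancels egCtx for the rest.
	if err := eg.Wait(); err != nil {
		fmt.Println("evacuation failed:", err)
	}
}
```

One detail worth flagging in the hunk above: each closure assigns the shared err variable captured from the enclosing function, so with several workers running at once that write is made from multiple goroutines. That is the kind of data race the failing "Tests with -race" check tends to catch, though the check output itself is not shown here.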
@@ -344,53 +351,41 @@ func (e *StorageEngine) evacuateShard(ctx context.Context, shardID string, prm E
 	))
 	defer span.End()

+	eg, egCtx := errgroup.WithContext(ctx)
 	if prm.Scope.WithObjects() {
-		if err := e.evacuateShardObjects(ctx, shardID, prm, res, shards, shardsToEvacuate); err != nil {
-			return err
-		}
+		eg.Go(func() error {
+			return e.evacuateShardObjects(egCtx, shardID, prm, res, shards, shardsToEvacuate)
+		})
 	}

 	if prm.Scope.WithTrees() && shardsToEvacuate[shardID].PiloramaEnabled() {
-		if err := e.evacuateShardTrees(ctx, shardID, prm, res, shards, shardsToEvacuate); err != nil {
-			return err
-		}
+		eg.Go(func() error {
+			return e.evacuateShardTrees(egCtx, shardID, prm, res, shards, shardsToEvacuate)
+		})
 	}

-	return nil
+	return eg.Wait()
 }

 func (e *StorageEngine) evacuateShardObjects(ctx context.Context, shardID string, prm EvacuateShardPrm, res *EvacuateShardRes,
 	shards []pooledShard, shardsToEvacuate map[string]*shard.Shard,
 ) error {
-	var listPrm shard.ListWithCursorPrm
-	listPrm.WithCount(defaultEvacuateBatchSize)
-
 	sh := shardsToEvacuate[shardID]
 	sh.SetEvacuationInProgress(true)

-	var c *meta.Cursor
-	for {
-		listPrm.WithCursor(c)
-
-		// TODO (@fyrchik): #1731 this approach doesn't work in degraded modes
-		// because ListWithCursor works only with the metabase.
-		listRes, err := sh.ListWithCursor(ctx, listPrm)
-		if err != nil {
-			if errors.Is(err, meta.ErrEndOfListing) || errors.Is(err, shard.ErrDegradedMode) {
-				break
-			}
-			e.log.Error(logs.EngineShardsEvacuationFailedToListObjects, zap.String("shard_id", shardID), zap.Error(err), evacuationOperationLogField,
-				zap.String("trace_id", tracingPkg.GetTraceID(ctx)))
-			return err
-		}
-
-		if err = e.evacuateObjects(ctx, sh, listRes.AddressList(), prm, res, shards, shardsToEvacuate); err != nil {
-			return err
-		}
-
-		c = listRes.Cursor()
-	}
-	return nil
+	var listPrm shard.ListConcurrentlyPrm
+	listPrm.BatchSize = e.cfg.evacuationObjectBatchSize
+	listPrm.Handler = func(ctx context.Context, addrList []object.Info) error {
+		return e.evacuateObjects(ctx, sh, addrList, prm, res, shards, shardsToEvacuate)
+	}
+	listPrm.ObjectWorkerCount = e.cfg.evacuationObjectWorkerCount
+	listPrm.ContainerWorkerCount = e.cfg.evacuationContainerWorkerCount

+	err := sh.ListConcurrently(ctx, listPrm)
+	if err != nil {
+		e.log.Error(logs.EngineShardsEvacuationFailedToListObjects, zap.String("shard_id", shardID), zap.Error(err), evacuationOperationLogField,
+			zap.String("trace_id", tracingPkg.GetTraceID(ctx)))
+	}
+	return err
 }

 func (e *StorageEngine) evacuateShardTrees(ctx context.Context, shardID string, prm EvacuateShardPrm, res *EvacuateShardRes,
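Object listing is now push-based: instead of pulling batches through a cursor loop, the caller fills ListConcurrentlyPrm with a batch size, worker counts and a handler, and the shard invokes the handler once per batch. A toy sketch of that callback shape (the types and the producer below are simplified placeholders, not shard.ListConcurrentlyPrm itself, and the producer runs sequentially for clarity):

```go
package main

import (
	"context"
	"fmt"
)

// listPrm is a simplified analogue of the ListConcurrentlyPrm parameters.
type listPrm struct {
	BatchSize uint32
	Handler   func(ctx context.Context, batch []string) error
}

// listConcurrently is a toy producer: it cuts a fixed address list into
// batches and hands each batch to the handler, the way the metabase walker does.
func listConcurrently(ctx context.Context, addrs []string, prm listPrm) error {
	for start := 0; start < len(addrs); start += int(prm.BatchSize) {
		end := min(start+int(prm.BatchSize), len(addrs))
		if err := prm.Handler(ctx, addrs[start:end]); err != nil {
			return err
		}
	}
	return nil
}

func main() {
	addrs := []string{"obj-1", "obj-2", "obj-3", "obj-4", "obj-5"}
	prm := listPrm{
		BatchSize: 2,
		Handler: func(_ context.Context, batch []string) error {
			fmt.Println("evacuating batch:", batch)
			return nil
		},
	}
	if err := listConcurrently(context.Background(), addrs, prm); err != nil {
		fmt.Println("error:", err)
	}
}
```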
@@ -6,6 +6,7 @@ import (
 	"fmt"
 	"path/filepath"
 	"strconv"
+	"sync/atomic"
 	"testing"
 	"time"
@@ -29,7 +30,9 @@ import (
 func newEngineEvacuate(t *testing.T, shardNum int, objPerShard int) (*StorageEngine, []*shard.ID, []*objectSDK.Object) {
 	dir := t.TempDir()

-	te := testNewEngine(t).
+	te := testNewEngine(t,
+		WithEvacuationObjectBatchSize(1), WithEvacuationShardWorkerCount(2),
+		WithEvacuationContainerWorkerCount(2), WithEvacuationObjectWorkerCount(2)).
 		setShardsNumOpts(t, shardNum, func(id int) []shard.Option {
 			return []shard.Option{
 				shard.WithLogger(test.NewLogger(t)),
@@ -174,13 +177,13 @@ func TestEvacuateObjectsNetwork(t *testing.T) {
 	errReplication := errors.New("handler error")

 	acceptOneOf := func(objects []*objectSDK.Object, max uint64) func(context.Context, oid.Address, *objectSDK.Object) (bool, error) {
-		var n uint64
+		var n atomic.Uint64
 		return func(_ context.Context, addr oid.Address, obj *objectSDK.Object) (bool, error) {
-			if n == max {
+			if n.Load() == max {
 				return false, errReplication
 			}

-			n++
+			n.Add(1)
 			for i := range objects {
 				if addr == objectCore.AddressOf(objects[i]) {
 					require.Equal(t, objects[i], obj)
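With several evacuation workers active, the test's replication counter is shared between goroutines, hence the switch from a plain uint64 to sync/atomic's Uint64. A minimal illustration of why (not the actual test):

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

func main() {
	var n atomic.Uint64
	var wg sync.WaitGroup

	// Several goroutines bump the counter, the way several evacuation
	// workers would hit the acceptance handler concurrently.
	for i := 0; i < 8; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			n.Add(1) // a plain n++ on a shared uint64 here would be a data race
		}()
	}
	wg.Wait()

	fmt.Println(n.Load()) // 8
}
```

Note that the Load-then-Add pair in the test is still two separate atomic operations, so it removes the data race but not the check-then-act window; that looks acceptable for the test's purpose.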
@@ -14,6 +14,7 @@ import (
 	"go.etcd.io/bbolt"
 	"go.opentelemetry.io/otel/attribute"
 	"go.opentelemetry.io/otel/trace"
+	"golang.org/x/sync/errgroup"
 )

 // ErrEndOfListing is returned from object listing with cursor
@@ -61,6 +62,18 @@ func (l ListRes) Cursor() *Cursor {
 	return l.cursor
 }

+// ListConcurrentlyPrm contains parameters for the ListConcurrently operation.
+type ListConcurrentlyPrm struct {
+	// Handler is concurrently executed upon batches of objects read from the db.
+	Handler func(context.Context, []objectcore.Info) error
+	// BatchSize is the maximum amount of addresses that will be passed to Handler at once.
+	BatchSize uint32
+	// ContainerWorkerCount is the amount of containers processed concurrently.
+	ContainerWorkerCount uint32
+	// ObjectWorkerCount is the amount of workers that run Handler concurrently for each container.
+	ObjectWorkerCount uint32
+}
+
 // ListWithCursor lists physical objects available in metabase starting from
 // cursor. Includes objects of all types. Does not include inhumed objects.
 // Use cursor value from response for consecutive requests.
@@ -259,3 +272,149 @@ func parseContainerIDWithPrefix(containerID *cid.ID, name []byte) ([]byte, byte)

 	return rawID, name[0]
 }
+
+// ListConcurrently lists physical objects available in metabase starting from the first one.
+// Includes objects of all types. Does not include inhumed objects.
+func (db *DB) ListConcurrently(ctx context.Context, prm ListConcurrentlyPrm) error {
+	var (
+		startedAt = time.Now()
+		success   = false
+	)
+	defer func() {
+		db.metrics.AddMethodDuration("ListConcurrently", time.Since(startedAt), success)
+	}()
+	_, span := tracing.StartSpanFromContext(ctx, "metabase.ListConcurrently",
+		trace.WithAttributes(
+			attribute.Int("batch_size", int(prm.BatchSize)),
+			attribute.Bool("has_handler", prm.Handler != nil),
+			attribute.Int("object_worker_count", int(prm.ObjectWorkerCount)),
+			attribute.Int("container_worker_count", int(prm.ContainerWorkerCount)),
+		))
+	defer span.End()
+
+	db.modeMtx.RLock()
+	defer db.modeMtx.RUnlock()
+
+	if db.mode.NoMetabase() {
+		return ErrDegradedMode
+	}
+
+	err := db.boltDB.View(func(tx *bbolt.Tx) error {
+		return db.listConcurrently(ctx, tx, prm)
+	})
+	success = err == nil
+	return metaerr.Wrap(err)
+}
+
+func (db *DB) listConcurrently(ctx context.Context, tx *bbolt.Tx, prm ListConcurrentlyPrm) error {
+	c := tx.Cursor()
+	name, _ := c.First()
+
+	var containerID cid.ID
+	graveyardBkt := tx.Bucket(graveyardBucketName)
+	garbageBkt := tx.Bucket(garbageBucketName)
+
+	eg, egCtx := errgroup.WithContext(ctx)
+	eg.SetLimit(int(prm.ContainerWorkerCount))
+
+	for ; name != nil; name, _ = c.Next() {
+		cidRaw, prefix := parseContainerIDWithPrefix(&containerID, name)
+		if cidRaw == nil {
+			continue
+		}
+
+		var objType objectSDK.Type
+
+		switch prefix {
+		case primaryPrefix:
+			objType = objectSDK.TypeRegular
+		case lockersPrefix:
+			objType = objectSDK.TypeLock
+		case tombstonePrefix:
+			objType = objectSDK.TypeTombstone
+		default:
+			continue
+		}
+
+		bkt := tx.Bucket(name)
+		if bkt != nil {
+			rawAddr := make([]byte, cidSize, addressKeySize)
+			copy(rawAddr, cidRaw)
+			var cnt cid.ID
+			copy(cnt[:], containerID[:])
+			eg.Go(func() error {
+				return selectConcurrentlyFromBucket(egCtx,
+					bkt, objType, graveyardBkt, garbageBkt, rawAddr, cnt, prm)
+			})
+		}
+	}
+
+	return eg.Wait()
+}
+
+// selectConcurrentlyFromBucket is similar to selectAllFromBucket but processes the selected objects concurrently.
+// Ignores inhumed objects.
+func selectConcurrentlyFromBucket(ctx context.Context,
+	bkt *bbolt.Bucket, // main bucket
+	objType objectSDK.Type, // type of the objects stored in the main bucket
+	graveyardBkt, garbageBkt *bbolt.Bucket, // cached graveyard buckets
+	cidRaw []byte, // container ID prefix, optimization
+	cnt cid.ID, // container ID
+	prm ListConcurrentlyPrm,
+) error {
+	c := bkt.Cursor()
+	k, v := c.First()
+	batch := make([]objectcore.Info, 0, prm.BatchSize)
+
+	eg, egCtx := errgroup.WithContext(ctx)
+	eg.SetLimit(int(prm.ObjectWorkerCount))
+
+	for ; k != nil; k, v = c.Next() {
+		var obj oid.ID
+		if err := obj.Decode(k); err != nil {
+			break
+		}
+
+		if inGraveyardWithKey(append(cidRaw, k...), graveyardBkt, garbageBkt) > 0 {
+			continue
+		}
+
+		var isLinkingObj bool
+		var ecInfo *objectcore.ECInfo
+		if objType == objectSDK.TypeRegular {
+			var o objectSDK.Object
+			if err := o.Unmarshal(v); err != nil {
+				return err
+			}
+			isLinkingObj = isLinkObject(&o)
+			ecHeader := o.ECHeader()
+			if ecHeader != nil {
+				ecInfo = &objectcore.ECInfo{
+					ParentID: ecHeader.Parent(),
+					Index:    ecHeader.Index(),
+					Total:    ecHeader.Total(),
+				}
+			}
+		}
+
+		var a oid.Address
+		a.SetContainer(cnt)
+		a.SetObject(obj)
+		batch = append(batch, objectcore.Info{Address: a, Type: objType, IsLinkingObject: isLinkingObj, ECInfo: ecInfo})
+		if len(batch) >= int(prm.BatchSize) {
+			addrs := make([]objectcore.Info, 0, len(batch))
+			addrs = append(addrs, batch...)
+			eg.Go(func() error {
+				return prm.Handler(egCtx, addrs)
+			})
+			batch = batch[:0]
+		}
+	}
+	if len(batch) > 0 {
+		eg.Go(func() error {
+			return prm.Handler(egCtx, batch)
+		})
+	}
+
+	return eg.Wait()
+}
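The metabase walker is a two-level fan-out: listConcurrently limits how many containers are processed at once, and selectConcurrentlyFromBucket limits concurrent handler calls within a container, each with its own errgroup. A stripped-down sketch of the same nesting (in-memory data instead of bbolt buckets; a fixed batch size of 2; Go 1.22+ per-iteration loop variables assumed):

```go
package main

import (
	"context"
	"fmt"

	"golang.org/x/sync/errgroup"
)

// walk mirrors the shape of listConcurrently (outer, per container) and
// selectConcurrentlyFromBucket (inner, per batch of objects).
func walk(ctx context.Context, containers map[string][]string, containerWorkers, objectWorkers int, handle func(cnr string, batch []string) error) error {
	eg, egCtx := errgroup.WithContext(ctx)
	eg.SetLimit(containerWorkers)

	for cnr, objs := range containers {
		eg.Go(func() error {
			inner, _ := errgroup.WithContext(egCtx)
			inner.SetLimit(objectWorkers)
			// Hand the objects to the handler in fixed-size batches of 2.
			for start := 0; start < len(objs); start += 2 {
				end := min(start+2, len(objs))
				batch := objs[start:end]
				inner.Go(func() error { return handle(cnr, batch) })
			}
			return inner.Wait()
		})
	}
	return eg.Wait()
}

func main() {
	containers := map[string][]string{
		"cnr-A": {"obj-1", "obj-2", "obj-3"},
		"cnr-B": {"obj-4"},
	}
	err := walk(context.Background(), containers, 2, 2, func(cnr string, batch []string) error {
		fmt.Println(cnr, batch)
		return nil
	})
	if err != nil {
		fmt.Println("walk failed:", err)
	}
}
```

One design detail worth noting in the real code: the reusable batch slice is copied into a fresh addrs slice before eg.Go launches the handler and is then reset with batch = batch[:0], so each handler goroutine works on its own backing array.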
@@ -34,6 +34,18 @@ func (r ListContainersRes) Containers() []cid.ID {
 	return r.containers
 }

+// ListConcurrentlyPrm contains parameters for the ListConcurrently operation.
+type ListConcurrentlyPrm struct {
+	// Handler is concurrently executed upon batches of objects read from the metabase.
+	Handler func(context.Context, []objectcore.Info) error
+	// BatchSize is the maximum amount of addresses that will be passed to Handler at once.
+	BatchSize uint32
+	// ContainerWorkerCount is the amount of containers processed concurrently.
+	ContainerWorkerCount uint32
+	// ObjectWorkerCount is the amount of workers that run Handler concurrently.
+	ObjectWorkerCount uint32
+}
+
 // ListWithCursorPrm contains parameters for ListWithCursor operation.
 type ListWithCursorPrm struct {
 	count uint32
@@ -164,3 +176,32 @@ func (s *Shard) ListWithCursor(ctx context.Context, prm ListWithCursorPrm) (List
 		cursor: res.Cursor(),
 	}, nil
 }
+
+// ListConcurrently lists physical objects available in shard starting from the first one.
+// Includes regular, tombstone and storage group objects. Does not include inhumed objects.
+func (s *Shard) ListConcurrently(ctx context.Context, prm ListConcurrentlyPrm) error {
+	_, span := tracing.StartSpanFromContext(ctx, "shard.ListConcurrently",
+		trace.WithAttributes(
+			attribute.Int64("batch_size", int64(prm.BatchSize)),
+			attribute.Bool("has_handler", prm.Handler != nil),
+			attribute.Int("object_worker_count", int(prm.ObjectWorkerCount)),
+			attribute.Int("container_worker_count", int(prm.ContainerWorkerCount)),
+		))
+	defer span.End()
+
+	if s.GetMode().NoMetabase() {
+		return ErrDegradedMode
+	}
+
+	var metaPrm meta.ListConcurrentlyPrm
+	metaPrm.BatchSize = prm.BatchSize
+	metaPrm.Handler = prm.Handler
+	metaPrm.ContainerWorkerCount = prm.ContainerWorkerCount
+	metaPrm.ObjectWorkerCount = prm.ObjectWorkerCount
+	err := s.metaBase.ListConcurrently(ctx, metaPrm)
+	if err != nil {
+		return fmt.Errorf("could not list objects concurrently: %w", err)
+	}
+
+	return nil
+}