[#1356] engine: Evacuate object from shards concurrently
All checks were successful
DCO action / DCO (pull_request) Successful in 42s
Tests and linters / Run gofumpt (pull_request) Successful in 39s
Pre-commit hooks / Pre-commit (pull_request) Successful in 1m55s
Vulncheck / Vulncheck (pull_request) Successful in 1m51s
Build / Build Components (pull_request) Successful in 2m11s
Tests and linters / Staticcheck (pull_request) Successful in 2m11s
Tests and linters / gopls check (pull_request) Successful in 2m52s
Tests and linters / Lint (pull_request) Successful in 3m19s
Tests and linters / Tests (pull_request) Successful in 3m31s
Tests and linters / Tests with -race (pull_request) Successful in 5m11s
All checks were successful
DCO action / DCO (pull_request) Successful in 42s
Tests and linters / Run gofumpt (pull_request) Successful in 39s
Pre-commit hooks / Pre-commit (pull_request) Successful in 1m55s
Vulncheck / Vulncheck (pull_request) Successful in 1m51s
Build / Build Components (pull_request) Successful in 2m11s
Tests and linters / Staticcheck (pull_request) Successful in 2m11s
Tests and linters / gopls check (pull_request) Successful in 2m52s
Tests and linters / Lint (pull_request) Successful in 3m19s
Tests and linters / Tests (pull_request) Successful in 3m31s
Tests and linters / Tests with -race (pull_request) Successful in 5m11s
Signed-off-by: Anton Nikiforov <an.nikiforov@yadro.com>
This commit is contained in:
parent
c34b8acedd
commit
51e64c3101
15 changed files with 463 additions and 120 deletions
|
@ -21,6 +21,9 @@ const (
|
|||
noProgressFlag = "no-progress"
|
||||
scopeFlag = "scope"
|
||||
|
||||
containerWorkerCountFlag = "container-worker-count"
|
||||
objectWorkerCountFlag = "object-worker-count"
|
||||
|
||||
scopeAll = "all"
|
||||
scopeObjects = "objects"
|
||||
scopeTrees = "trees"
|
||||
|
@ -64,12 +67,16 @@ func startEvacuateShard(cmd *cobra.Command, _ []string) {
|
|||
pk := key.Get(cmd)
|
||||
|
||||
ignoreErrors, _ := cmd.Flags().GetBool(ignoreErrorsFlag)
|
||||
containerWorkerCount, _ := cmd.Flags().GetUint32(containerWorkerCountFlag)
|
||||
objectWorkerCount, _ := cmd.Flags().GetUint32(objectWorkerCountFlag)
|
||||
|
||||
req := &control.StartShardEvacuationRequest{
|
||||
Body: &control.StartShardEvacuationRequest_Body{
|
||||
Shard_ID: getShardIDList(cmd),
|
||||
IgnoreErrors: ignoreErrors,
|
||||
Scope: getEvacuationScope(cmd),
|
||||
Shard_ID: getShardIDList(cmd),
|
||||
IgnoreErrors: ignoreErrors,
|
||||
Scope: getEvacuationScope(cmd),
|
||||
ContainerWorkerCount: containerWorkerCount,
|
||||
ObjectWorkerCount: objectWorkerCount,
|
||||
},
|
||||
}
|
||||
|
||||
|
@ -371,6 +378,8 @@ func initControlStartEvacuationShardCmd() {
|
|||
flags.String(scopeFlag, scopeAll, fmt.Sprintf("Evacuation scope; possible values: %s, %s, %s", scopeTrees, scopeObjects, scopeAll))
|
||||
flags.Bool(awaitFlag, false, "Block execution until evacuation is completed")
|
||||
flags.Bool(noProgressFlag, false, fmt.Sprintf("Print progress if %s provided", awaitFlag))
|
||||
flags.Uint32(containerWorkerCountFlag, 0, "Count of concurrent container evacuation workers")
|
||||
flags.Uint32(objectWorkerCountFlag, 0, "Count of concurrent object evacuation workers")
|
||||
|
||||
startEvacuationShardCmd.MarkFlagsMutuallyExclusive(shardIDFlag, shardAllFlag)
|
||||
}
|
||||
|
|
|
@ -110,6 +110,9 @@ type applicationConfiguration struct {
|
|||
shardPoolSize uint32
|
||||
shards []shardCfg
|
||||
lowMem bool
|
||||
|
||||
evacuationObjectWorkerCount uint32
|
||||
evacuationContainerWorkerCount uint32
|
||||
}
|
||||
|
||||
// if need to run node in compatibility with other versions mode
|
||||
|
@ -228,6 +231,8 @@ func (a *applicationConfiguration) readConfig(c *config.Config) error {
|
|||
a.EngineCfg.errorThreshold = engineconfig.ShardErrorThreshold(c)
|
||||
a.EngineCfg.shardPoolSize = engineconfig.ShardPoolSize(c)
|
||||
a.EngineCfg.lowMem = engineconfig.EngineLowMemoryConsumption(c)
|
||||
a.EngineCfg.evacuationObjectWorkerCount = engineconfig.EngineEvacuationObjectWorkerCount(c)
|
||||
a.EngineCfg.evacuationContainerWorkerCount = engineconfig.EngineEvacuationContainerWorkerCount(c)
|
||||
|
||||
return engineconfig.IterateShards(c, false, func(sc *shardconfig.Config) error { return a.updateShardConfig(c, sc) })
|
||||
}
|
||||
|
@ -829,6 +834,8 @@ func (c *cfg) engineOpts() []engine.Option {
|
|||
engine.WithErrorThreshold(c.EngineCfg.errorThreshold),
|
||||
engine.WithLogger(c.log),
|
||||
engine.WithLowMemoryConsumption(c.EngineCfg.lowMem),
|
||||
engine.WithEvacuationObjectWorkerCount(c.EngineCfg.evacuationObjectWorkerCount),
|
||||
engine.WithEvacuationContainerWorkerCount(c.EngineCfg.evacuationContainerWorkerCount),
|
||||
)
|
||||
|
||||
if c.metricsCollector != nil {
|
||||
|
|
|
@ -15,6 +15,12 @@ const (
|
|||
// ShardPoolSizeDefault is a default value of routine pool size per-shard to
|
||||
// process object PUT operations in a storage engine.
|
||||
ShardPoolSizeDefault = 20
|
||||
// EvacuationContainerWorkerCountDefault is a default value of the count of
|
||||
// concurrent container evacuation workers.
|
||||
EvacuationContainerWorkerCountDefault = 10
|
||||
// EvacuationObjectWorkerCountDefault is a default value of the count of
|
||||
// concurrent object evacuation workers.
|
||||
EvacuationObjectWorkerCountDefault = 10
|
||||
)
|
||||
|
||||
// ErrNoShardConfigured is returned when at least 1 shard is required but none are found.
|
||||
|
@ -88,3 +94,19 @@ func ShardErrorThreshold(c *config.Config) uint32 {
|
|||
func EngineLowMemoryConsumption(c *config.Config) bool {
|
||||
return config.BoolSafe(c.Sub(subsection), "low_mem")
|
||||
}
|
||||
|
||||
// EngineEvacuationObjectWorkerCount returns value of "evacuation_object_worker_count" config parameter from "storage" section.
|
||||
func EngineEvacuationObjectWorkerCount(c *config.Config) uint32 {
|
||||
if v := config.Uint32Safe(c.Sub(subsection), "evacuation_object_worker_count"); v > 0 {
|
||||
return v
|
||||
}
|
||||
return EvacuationObjectWorkerCountDefault
|
||||
}
|
||||
|
||||
// EngineEvacuationContainerWorkerCount returns value of "evacuation_container_worker_count" config parameter from "storage" section.
|
||||
func EngineEvacuationContainerWorkerCount(c *config.Config) uint32 {
|
||||
if v := config.Uint32Safe(c.Sub(subsection), "evacuation_container_worker_count"); v > 0 {
|
||||
return v
|
||||
}
|
||||
return EvacuationContainerWorkerCountDefault
|
||||
}
|
||||
|
|
|
@ -91,6 +91,8 @@ FROSTFS_OBJECT_DELETE_TOMBSTONE_LIFETIME=10
|
|||
# Storage engine section
|
||||
FROSTFS_STORAGE_SHARD_POOL_SIZE=15
|
||||
FROSTFS_STORAGE_SHARD_RO_ERROR_THRESHOLD=100
|
||||
EVACUATION_OBJECT_WORKER_COUNT=10
|
||||
EVACUATION_CONTAINER_WORKER_COUNT=10
|
||||
## 0 shard
|
||||
### Flag to refill Metabase from BlobStor
|
||||
FROSTFS_STORAGE_SHARD_0_RESYNC_METABASE=false
|
||||
|
|
|
@ -136,6 +136,8 @@
|
|||
"storage": {
|
||||
"shard_pool_size": 15,
|
||||
"shard_ro_error_threshold": 100,
|
||||
"evacuation_container_worker_count": 10,
|
||||
"evacuation_object_worker_count": 10,
|
||||
"shard": {
|
||||
"0": {
|
||||
"mode": "read-only",
|
||||
|
|
|
@ -118,6 +118,8 @@ storage:
|
|||
# note: shard configuration can be omitted for relay node (see `node.relay`)
|
||||
shard_pool_size: 15 # size of per-shard worker pools used for PUT operations
|
||||
shard_ro_error_threshold: 100 # amount of errors to occur before shard is made read-only (default: 0, ignore errors)
|
||||
evacuation_container_worker_count: 10 # the count of the concurrent container evacuation workers
|
||||
evacuation_object_worker_count: 10 # the count of the concurrent object evacuation workers
|
||||
|
||||
shard:
|
||||
default: # section with the default shard parameters
|
||||
|
|
|
@ -167,12 +167,14 @@ morph:
|
|||
|
||||
Local storage engine configuration.
|
||||
|
||||
| Parameter | Type | Default value | Description |
|
||||
|----------------------------|-----------------------------------|---------------|------------------------------------------------------------------------------------------------------------------|
|
||||
| `shard_pool_size` | `int` | `20` | Pool size for shard workers. Limits the amount of concurrent `PUT` operations on each shard. |
|
||||
| `shard_ro_error_threshold` | `int` | `0` | Maximum amount of storage errors to encounter before shard automatically moves to `Degraded` or `ReadOnly` mode. |
|
||||
| `low_mem` | `bool` | `false` | Reduce memory consumption by reducing performance. |
|
||||
| `shard` | [Shard config](#shard-subsection) | | Configuration for separate shards. |
|
||||
| Parameter | Type | Default value | Description |
|
||||
|-------------------------------------|-----------------------------------|---------------|------------------------------------------------------------------------------------------------------------------|
|
||||
| `shard_pool_size` | `int` | `20` | Pool size for shard workers. Limits the amount of concurrent `PUT` operations on each shard. |
|
||||
| `shard_ro_error_threshold` | `int` | `0` | Maximum amount of storage errors to encounter before shard automatically moves to `Degraded` or `ReadOnly` mode. |
|
||||
| `evacuation_container_worker_count` | `int` | `10` | The count of the concurrent container evacuation workers. |
|
||||
| `evacuation_object_worker_count` | `int` | `10` | The count of the concurrent object evacuation workers. |
|
||||
| `low_mem` | `bool` | `false` | Reduce memory consumption by reducing performance. |
|
||||
| `shard` | [Shard config](#shard-subsection) | | Configuration for separate shards. |
|
||||
|
||||
## `shard` subsection
|
||||
|
||||
|
|
|
@ -213,12 +213,18 @@ type cfg struct {
|
|||
lowMem bool
|
||||
|
||||
containerSource atomic.Pointer[containerSource]
|
||||
|
||||
evacuationObjectWorkerCount uint32
|
||||
evacuationContainerWorkerCount uint32
|
||||
}
|
||||
|
||||
func defaultCfg() *cfg {
|
||||
res := &cfg{
|
||||
log: &logger.Logger{Logger: zap.L()},
|
||||
shardPoolSize: 20,
|
||||
|
||||
evacuationObjectWorkerCount: 10,
|
||||
evacuationContainerWorkerCount: 10,
|
||||
}
|
||||
res.containerSource.Store(&containerSource{})
|
||||
return res
|
||||
|
@ -277,6 +283,20 @@ func WithLowMemoryConsumption(lowMemCons bool) Option {
|
|||
}
|
||||
}
|
||||
|
||||
// WithEvacuationObjectWorkerCount returns an option to set the count of the concurrent object evacuation workers per container.
|
||||
func WithEvacuationObjectWorkerCount(count uint32) Option {
|
||||
return func(c *cfg) {
|
||||
c.evacuationObjectWorkerCount = count
|
||||
}
|
||||
}
|
||||
|
||||
// WithEvacuationContainerWorkerCount returns an option to set the count of the concurrent container evacuation workers for each shard.
|
||||
func WithEvacuationContainerWorkerCount(count uint32) Option {
|
||||
return func(c *cfg) {
|
||||
c.evacuationContainerWorkerCount = count
|
||||
}
|
||||
}
|
||||
|
||||
// SetContainerSource sets container source.
|
||||
func (e *StorageEngine) SetContainerSource(cs container.Source) {
|
||||
e.containerSource.Store(&containerSource{cs: cs})
|
||||
|
|
|
@ -10,7 +10,6 @@ import (
|
|||
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object"
|
||||
meta "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/metabase"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/pilorama"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/util/logicerr"
|
||||
|
@ -24,6 +23,7 @@ import (
|
|||
"go.opentelemetry.io/otel/attribute"
|
||||
"go.opentelemetry.io/otel/trace"
|
||||
"go.uber.org/zap"
|
||||
"golang.org/x/sync/errgroup"
|
||||
)
|
||||
|
||||
var (
|
||||
|
@ -79,6 +79,9 @@ type EvacuateShardPrm struct {
|
|||
IgnoreErrors bool
|
||||
Async bool
|
||||
Scope EvacuateScope
|
||||
|
||||
ContainerWorkerCount uint32
|
||||
ObjectWorkerCount uint32
|
||||
}
|
||||
|
||||
// EvacuateShardRes represents result of the EvacuateShard operation.
|
||||
|
@ -189,8 +192,6 @@ func (p *EvacuateShardRes) DeepCopy() *EvacuateShardRes {
|
|||
return res
|
||||
}
|
||||
|
||||
const defaultEvacuateBatchSize = 100
|
||||
|
||||
type pooledShard struct {
|
||||
hashedShard
|
||||
pool util.WorkerPool
|
||||
|
@ -242,8 +243,16 @@ func (e *StorageEngine) Evacuate(ctx context.Context, prm EvacuateShardPrm) (*Ev
|
|||
return nil, err
|
||||
}
|
||||
|
||||
var mtx sync.RWMutex
|
||||
copyShards := func() []pooledShard {
|
||||
mtx.RLock()
|
||||
defer mtx.RUnlock()
|
||||
t := make([]pooledShard, len(shards))
|
||||
copy(t, shards)
|
||||
return t
|
||||
}
|
||||
eg.Go(func() error {
|
||||
return e.evacuateShards(egCtx, shardIDs, prm, res, shards, shardsToEvacuate)
|
||||
return e.evacuateShards(egCtx, shardIDs, prm, res, copyShards, shardsToEvacuate)
|
||||
})
|
||||
|
||||
if prm.Async {
|
||||
|
@ -261,7 +270,7 @@ func ctxOrBackground(ctx context.Context, background bool) context.Context {
|
|||
}
|
||||
|
||||
func (e *StorageEngine) evacuateShards(ctx context.Context, shardIDs []string, prm EvacuateShardPrm, res *EvacuateShardRes,
|
||||
shards []pooledShard, shardsToEvacuate map[string]*shard.Shard,
|
||||
shards func() []pooledShard, shardsToEvacuate map[string]*shard.Shard,
|
||||
) error {
|
||||
var err error
|
||||
ctx, span := tracing.StartSpanFromContext(ctx, "StorageEngine.evacuateShards",
|
||||
|
@ -287,12 +296,46 @@ func (e *StorageEngine) evacuateShards(ctx context.Context, shardIDs []string, p
|
|||
return err
|
||||
}
|
||||
|
||||
egShard, _ := errgroup.WithContext(ctx)
|
||||
|
||||
egContainer, _ := errgroup.WithContext(ctx)
|
||||
containerWorkerCount := prm.ContainerWorkerCount
|
||||
if containerWorkerCount == 0 {
|
||||
containerWorkerCount = e.cfg.evacuationContainerWorkerCount
|
||||
}
|
||||
egContainer.SetLimit(int(containerWorkerCount))
|
||||
|
||||
egObject, _ := errgroup.WithContext(ctx)
|
||||
objectWorkerCount := prm.ObjectWorkerCount
|
||||
if objectWorkerCount == 0 {
|
||||
objectWorkerCount = e.cfg.evacuationObjectWorkerCount
|
||||
}
|
||||
egObject.SetLimit(int(objectWorkerCount))
|
||||
|
||||
for _, shardID := range shardIDs {
|
||||
if err = e.evacuateShard(ctx, shardID, prm, res, shards, shardsToEvacuate); err != nil {
|
||||
e.log.Error(logs.EngineFinishedWithErrorShardsEvacuation, zap.Error(err), zap.Strings("shard_ids", shardIDs), evacuationOperationLogField,
|
||||
zap.String("trace_id", tracingPkg.GetTraceID(ctx)), zap.Stringer("scope", prm.Scope))
|
||||
return err
|
||||
}
|
||||
egShard.Go(func() error {
|
||||
if err := e.evacuateShard(ctx, shardID, prm, res, shards, shardsToEvacuate, egContainer, egObject); err != nil {
|
||||
e.log.Error(logs.EngineFinishedWithErrorShardsEvacuation, zap.Error(err), zap.Strings("shard_ids", shardIDs), evacuationOperationLogField,
|
||||
zap.String("trace_id", tracingPkg.GetTraceID(ctx)), zap.Stringer("scope", prm.Scope))
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
})
|
||||
}
|
||||
errShard := egShard.Wait()
|
||||
errContainer := egContainer.Wait()
|
||||
errObject := egObject.Wait()
|
||||
if errShard != nil {
|
||||
err = errShard
|
||||
return errShard
|
||||
}
|
||||
if errContainer != nil {
|
||||
err = errContainer
|
||||
return errContainer
|
||||
}
|
||||
if errObject != nil {
|
||||
err = errObject
|
||||
return errObject
|
||||
}
|
||||
|
||||
e.log.Info(logs.EngineFinishedSuccessfullyShardsEvacuation,
|
||||
|
@ -336,7 +379,8 @@ func (e *StorageEngine) getTotals(ctx context.Context, prm EvacuateShardPrm, sha
|
|||
}
|
||||
|
||||
func (e *StorageEngine) evacuateShard(ctx context.Context, shardID string, prm EvacuateShardPrm, res *EvacuateShardRes,
|
||||
shards []pooledShard, shardsToEvacuate map[string]*shard.Shard,
|
||||
shards func() []pooledShard, shardsToEvacuate map[string]*shard.Shard,
|
||||
egContainer *errgroup.Group, egObject *errgroup.Group,
|
||||
) error {
|
||||
ctx, span := tracing.StartSpanFromContext(ctx, "StorageEngine.evacuateShard",
|
||||
trace.WithAttributes(
|
||||
|
@ -344,59 +388,56 @@ func (e *StorageEngine) evacuateShard(ctx context.Context, shardID string, prm E
|
|||
))
|
||||
defer span.End()
|
||||
|
||||
eg, egCtx := errgroup.WithContext(ctx)
|
||||
if prm.Scope.WithObjects() {
|
||||
if err := e.evacuateShardObjects(ctx, shardID, prm, res, shards, shardsToEvacuate); err != nil {
|
||||
return err
|
||||
}
|
||||
eg.Go(func() error {
|
||||
return e.evacuateShardObjects(ctx, shardID, prm, res, shards, shardsToEvacuate, egContainer, egObject)
|
||||
})
|
||||
}
|
||||
|
||||
if prm.Scope.WithTrees() && shardsToEvacuate[shardID].PiloramaEnabled() {
|
||||
if err := e.evacuateShardTrees(ctx, shardID, prm, res, shards, shardsToEvacuate); err != nil {
|
||||
return err
|
||||
}
|
||||
eg.Go(func() error {
|
||||
return e.evacuateShardTrees(egCtx, shardID, prm, res, shards, shardsToEvacuate)
|
||||
})
|
||||
}
|
||||
|
||||
return nil
|
||||
return eg.Wait()
|
||||
}
|
||||
|
||||
func (e *StorageEngine) evacuateShardObjects(ctx context.Context, shardID string, prm EvacuateShardPrm, res *EvacuateShardRes,
|
||||
shards []pooledShard, shardsToEvacuate map[string]*shard.Shard,
|
||||
shards func() []pooledShard, shardsToEvacuate map[string]*shard.Shard,
|
||||
egContainer *errgroup.Group, egObject *errgroup.Group,
|
||||
) error {
|
||||
var listPrm shard.ListWithCursorPrm
|
||||
listPrm.WithCount(defaultEvacuateBatchSize)
|
||||
|
||||
sh := shardsToEvacuate[shardID]
|
||||
sh.SetEvacuationInProgress(true)
|
||||
|
||||
var c *meta.Cursor
|
||||
for {
|
||||
listPrm.WithCursor(c)
|
||||
|
||||
// TODO (@fyrchik): #1731 this approach doesn't work in degraded modes
|
||||
// because ListWithCursor works only with the metabase.
|
||||
listRes, err := sh.ListWithCursor(ctx, listPrm)
|
||||
if err != nil {
|
||||
if errors.Is(err, meta.ErrEndOfListing) || errors.Is(err, shard.ErrDegradedMode) {
|
||||
break
|
||||
var cntPrm shard.IterateOverContainersPrm
|
||||
cntPrm.Handler = func(ctx context.Context, name []byte, _ cid.ID) error {
|
||||
egContainer.Go(func() error {
|
||||
var objPrm shard.IterateOverObjectsInContainerPrm
|
||||
objPrm.BucketName = name
|
||||
objPrm.Handler = func(ctx context.Context, objInfo *object.Info) error {
|
||||
egObject.Go(func() error {
|
||||
return e.evacuateObject(ctx, shardID, objInfo, prm, res, shards, shardsToEvacuate)
|
||||
})
|
||||
return nil
|
||||
}
|
||||
e.log.Error(logs.EngineShardsEvacuationFailedToListObjects, zap.String("shard_id", shardID), zap.Error(err), evacuationOperationLogField,
|
||||
zap.String("trace_id", tracingPkg.GetTraceID(ctx)))
|
||||
return err
|
||||
}
|
||||
|
||||
if err = e.evacuateObjects(ctx, sh, listRes.AddressList(), prm, res, shards, shardsToEvacuate); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
c = listRes.Cursor()
|
||||
return sh.IterateOverObjectsInContainer(ctx, objPrm)
|
||||
})
|
||||
return nil
|
||||
}
|
||||
return nil
|
||||
|
||||
sh.SetEvacuationInProgress(true)
|
||||
err := sh.IterateOverContainers(ctx, cntPrm)
|
||||
if err != nil {
|
||||
e.log.Error(logs.EngineShardsEvacuationFailedToListObjects, zap.String("shard_id", shardID), zap.Error(err), evacuationOperationLogField,
|
||||
zap.String("trace_id", tracingPkg.GetTraceID(ctx)))
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
func (e *StorageEngine) evacuateShardTrees(ctx context.Context, shardID string, prm EvacuateShardPrm, res *EvacuateShardRes,
|
||||
shards []pooledShard, shardsToEvacuate map[string]*shard.Shard,
|
||||
getShards func() []pooledShard, shardsToEvacuate map[string]*shard.Shard,
|
||||
) error {
|
||||
sh := shardsToEvacuate[shardID]
|
||||
shards := getShards()
|
||||
|
||||
var listPrm pilorama.TreeListTreesPrm
|
||||
first := true
|
||||
|
@ -637,68 +678,65 @@ func (e *StorageEngine) getActualShards(shardIDs []string, prm EvacuateShardPrm)
|
|||
return shards, nil
|
||||
}
|
||||
|
||||
func (e *StorageEngine) evacuateObjects(ctx context.Context, sh *shard.Shard, toEvacuate []object.Info, prm EvacuateShardPrm, res *EvacuateShardRes,
|
||||
shards []pooledShard, shardsToEvacuate map[string]*shard.Shard,
|
||||
func (e *StorageEngine) evacuateObject(ctx context.Context, shardID string, objInfo *object.Info, prm EvacuateShardPrm, res *EvacuateShardRes,
|
||||
getShards func() []pooledShard, shardsToEvacuate map[string]*shard.Shard,
|
||||
) error {
|
||||
ctx, span := tracing.StartSpanFromContext(ctx, "StorageEngine.evacuateObjects",
|
||||
trace.WithAttributes(
|
||||
attribute.Int("objects_count", len(toEvacuate)),
|
||||
))
|
||||
ctx, span := tracing.StartSpanFromContext(ctx, "StorageEngine.evacuateObjects")
|
||||
defer span.End()
|
||||
|
||||
for i := range toEvacuate {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return ctx.Err()
|
||||
default:
|
||||
}
|
||||
addr := toEvacuate[i].Address
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return ctx.Err()
|
||||
default:
|
||||
}
|
||||
|
||||
var getPrm shard.GetPrm
|
||||
getPrm.SetAddress(addr)
|
||||
getPrm.SkipEvacCheck(true)
|
||||
shards := getShards()
|
||||
addr := objInfo.Address
|
||||
|
||||
getRes, err := sh.Get(ctx, getPrm)
|
||||
if err != nil {
|
||||
if prm.IgnoreErrors {
|
||||
res.objFailed.Add(1)
|
||||
continue
|
||||
}
|
||||
e.log.Error(logs.EngineShardsEvacuationFailedToReadObject, zap.String("address", addr.EncodeToString()), zap.Error(err), evacuationOperationLogField,
|
||||
zap.String("trace_id", tracingPkg.GetTraceID(ctx)))
|
||||
return err
|
||||
}
|
||||
var getPrm shard.GetPrm
|
||||
getPrm.SetAddress(addr)
|
||||
getPrm.SkipEvacCheck(true)
|
||||
|
||||
evacuatedLocal, err := e.tryEvacuateObjectLocal(ctx, addr, getRes.Object(), sh, shards, shardsToEvacuate, res)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if evacuatedLocal {
|
||||
continue
|
||||
}
|
||||
|
||||
if prm.ObjectsHandler == nil {
|
||||
// Do not check ignoreErrors flag here because
|
||||
// ignoring errors on put make this command kinda useless.
|
||||
return fmt.Errorf("%w: %s", errPutShard, toEvacuate[i])
|
||||
}
|
||||
|
||||
moved, err := prm.ObjectsHandler(ctx, addr, getRes.Object())
|
||||
if err != nil {
|
||||
e.log.Error(logs.EngineShardsEvacuationFailedToMoveObject, zap.String("address", addr.EncodeToString()), zap.Error(err), evacuationOperationLogField,
|
||||
zap.String("trace_id", tracingPkg.GetTraceID(ctx)))
|
||||
return err
|
||||
}
|
||||
if moved {
|
||||
res.objEvacuated.Add(1)
|
||||
} else if prm.IgnoreErrors {
|
||||
getRes, err := shardsToEvacuate[shardID].Get(ctx, getPrm)
|
||||
if err != nil {
|
||||
if prm.IgnoreErrors {
|
||||
res.objFailed.Add(1)
|
||||
e.log.Warn(logs.EngineShardsEvacuationFailedToMoveObject, zap.String("address", addr.EncodeToString()), zap.Error(err), evacuationOperationLogField,
|
||||
zap.String("trace_id", tracingPkg.GetTraceID(ctx)))
|
||||
} else {
|
||||
return fmt.Errorf("object %s was not replicated", addr)
|
||||
return nil
|
||||
}
|
||||
e.log.Error(logs.EngineShardsEvacuationFailedToReadObject, zap.String("address", addr.EncodeToString()), zap.Error(err), evacuationOperationLogField,
|
||||
zap.String("trace_id", tracingPkg.GetTraceID(ctx)))
|
||||
return err
|
||||
}
|
||||
|
||||
evacuatedLocal, err := e.tryEvacuateObjectLocal(ctx, addr, getRes.Object(), shardsToEvacuate[shardID], shards, shardsToEvacuate, res)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if evacuatedLocal {
|
||||
return nil
|
||||
}
|
||||
|
||||
if prm.ObjectsHandler == nil {
|
||||
// Do not check ignoreErrors flag here because
|
||||
// ignoring errors on put make this command kinda useless.
|
||||
return fmt.Errorf("%w: %s", errPutShard, objInfo)
|
||||
}
|
||||
|
||||
moved, err := prm.ObjectsHandler(ctx, addr, getRes.Object())
|
||||
if err != nil {
|
||||
e.log.Error(logs.EngineShardsEvacuationFailedToMoveObject, zap.String("address", addr.EncodeToString()), zap.Error(err), evacuationOperationLogField,
|
||||
zap.String("trace_id", tracingPkg.GetTraceID(ctx)))
|
||||
return err
|
||||
}
|
||||
if moved {
|
||||
res.objEvacuated.Add(1)
|
||||
} else if prm.IgnoreErrors {
|
||||
res.objFailed.Add(1)
|
||||
e.log.Warn(logs.EngineShardsEvacuationFailedToMoveObject, zap.String("address", addr.EncodeToString()), zap.Error(err), evacuationOperationLogField,
|
||||
zap.String("trace_id", tracingPkg.GetTraceID(ctx)))
|
||||
} else {
|
||||
return fmt.Errorf("object %s was not replicated", addr)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
|
|
@ -6,6 +6,8 @@ import (
|
|||
"fmt"
|
||||
"path/filepath"
|
||||
"strconv"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
|
@ -29,7 +31,8 @@ import (
|
|||
func newEngineEvacuate(t *testing.T, shardNum int, objPerShard int) (*StorageEngine, []*shard.ID, []*objectSDK.Object) {
|
||||
dir := t.TempDir()
|
||||
|
||||
te := testNewEngine(t).
|
||||
te := testNewEngine(t,
|
||||
WithEvacuationContainerWorkerCount(2), WithEvacuationObjectWorkerCount(2)).
|
||||
setShardsNumOpts(t, shardNum, func(id int) []shard.Option {
|
||||
return []shard.Option{
|
||||
shard.WithLogger(test.NewLogger(t)),
|
||||
|
@ -174,13 +177,13 @@ func TestEvacuateObjectsNetwork(t *testing.T) {
|
|||
errReplication := errors.New("handler error")
|
||||
|
||||
acceptOneOf := func(objects []*objectSDK.Object, max uint64) func(context.Context, oid.Address, *objectSDK.Object) (bool, error) {
|
||||
var n uint64
|
||||
var n atomic.Uint64
|
||||
return func(_ context.Context, addr oid.Address, obj *objectSDK.Object) (bool, error) {
|
||||
if n == max {
|
||||
if n.Load() == max {
|
||||
return false, errReplication
|
||||
}
|
||||
|
||||
n++
|
||||
n.Add(1)
|
||||
for i := range objects {
|
||||
if addr == objectCore.AddressOf(objects[i]) {
|
||||
require.Equal(t, objects[i], obj)
|
||||
|
@ -531,6 +534,7 @@ func TestEvacuateTreesRemote(t *testing.T) {
|
|||
require.NoError(t, e.shards[ids[0].String()].SetMode(mode.ReadOnly))
|
||||
require.NoError(t, e.shards[ids[1].String()].SetMode(mode.ReadOnly))
|
||||
|
||||
mutex := sync.Mutex{}
|
||||
evacuatedTreeOps := make(map[string][]*pilorama.Move)
|
||||
var prm EvacuateShardPrm
|
||||
prm.ShardID = ids
|
||||
|
@ -545,7 +549,9 @@ func TestEvacuateTreesRemote(t *testing.T) {
|
|||
if op.Time == 0 {
|
||||
return true, "", nil
|
||||
}
|
||||
mutex.Lock()
|
||||
evacuatedTreeOps[key] = append(evacuatedTreeOps[key], &op)
|
||||
mutex.Unlock()
|
||||
height = op.Time + 1
|
||||
}
|
||||
}
|
||||
|
|
|
@ -61,6 +61,20 @@ func (l ListRes) Cursor() *Cursor {
|
|||
return l.cursor
|
||||
}
|
||||
|
||||
// IterateOverContainersPrm contains parameters for IterateOverContainers operation.
|
||||
type IterateOverContainersPrm struct {
|
||||
// Handler function executed upon containers in db.
|
||||
Handler func(context.Context, []byte, cid.ID) error
|
||||
}
|
||||
|
||||
// IterateOverObjectsInContainerPrm contains parameters for IterateOverObjectsInContainer operation.
|
||||
type IterateOverObjectsInContainerPrm struct {
|
||||
// BucketName container's bucket name
|
||||
BucketName []byte
|
||||
// Handler function executed upon objects in db.
|
||||
Handler func(context.Context, *objectcore.Info) error
|
||||
}
|
||||
|
||||
// ListWithCursor lists physical objects available in metabase starting from
|
||||
// cursor. Includes objects of all types. Does not include inhumed objects.
|
||||
// Use cursor value from response for consecutive requests.
|
||||
|
@ -259,3 +273,157 @@ func parseContainerIDWithPrefix(containerID *cid.ID, name []byte) ([]byte, byte)
|
|||
|
||||
return rawID, name[0]
|
||||
}
|
||||
|
||||
// IterateOverContainers lists physical containers available in metabase starting from first.
|
||||
func (db *DB) IterateOverContainers(ctx context.Context, prm IterateOverContainersPrm) error {
|
||||
var (
|
||||
startedAt = time.Now()
|
||||
success = false
|
||||
)
|
||||
defer func() {
|
||||
db.metrics.AddMethodDuration("IterateOverContainers", time.Since(startedAt), success)
|
||||
}()
|
||||
_, span := tracing.StartSpanFromContext(ctx, "metabase.IterateOverContainers",
|
||||
trace.WithAttributes(
|
||||
attribute.Bool("has_handler", prm.Handler != nil),
|
||||
))
|
||||
defer span.End()
|
||||
|
||||
db.modeMtx.RLock()
|
||||
defer db.modeMtx.RUnlock()
|
||||
|
||||
if db.mode.NoMetabase() {
|
||||
return ErrDegradedMode
|
||||
}
|
||||
|
||||
err := db.boltDB.View(func(tx *bbolt.Tx) error {
|
||||
return db.iterateOverContainers(ctx, tx, prm)
|
||||
})
|
||||
success = err == nil
|
||||
return metaerr.Wrap(err)
|
||||
}
|
||||
|
||||
func (db *DB) iterateOverContainers(ctx context.Context, tx *bbolt.Tx, prm IterateOverContainersPrm) error {
|
||||
c := tx.Cursor()
|
||||
name, _ := c.First()
|
||||
|
||||
var containerID cid.ID
|
||||
for ; name != nil; name, _ = c.Next() {
|
||||
cidRaw, prefix := parseContainerIDWithPrefix(&containerID, name)
|
||||
if cidRaw == nil {
|
||||
continue
|
||||
}
|
||||
if prefix != primaryPrefix && prefix != lockersPrefix && prefix != tombstonePrefix {
|
||||
continue
|
||||
}
|
||||
|
||||
bkt := tx.Bucket(name)
|
||||
if bkt != nil {
|
||||
bktName := make([]byte, len(name))
|
||||
copy(bktName, name)
|
||||
var cnt cid.ID
|
||||
copy(cnt[:], containerID[:])
|
||||
err := prm.Handler(ctx, bktName, cnt)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// IterateOverObjectsInContainer iterate over physical objects available in metabase starting from first.
|
||||
func (db *DB) IterateOverObjectsInContainer(ctx context.Context, prm IterateOverObjectsInContainerPrm) error {
|
||||
var (
|
||||
startedAt = time.Now()
|
||||
success = false
|
||||
)
|
||||
defer func() {
|
||||
db.metrics.AddMethodDuration("IterateOverObjectsInContainer", time.Since(startedAt), success)
|
||||
}()
|
||||
_, span := tracing.StartSpanFromContext(ctx, "metabase.IterateOverObjectsInContainer",
|
||||
trace.WithAttributes(
|
||||
attribute.Bool("has_handler", prm.Handler != nil),
|
||||
))
|
||||
defer span.End()
|
||||
|
||||
db.modeMtx.RLock()
|
||||
defer db.modeMtx.RUnlock()
|
||||
|
||||
if db.mode.NoMetabase() {
|
||||
return ErrDegradedMode
|
||||
}
|
||||
|
||||
err := db.boltDB.View(func(tx *bbolt.Tx) error {
|
||||
return db.iterateOverObjectsInContainer(ctx, tx, prm)
|
||||
})
|
||||
success = err == nil
|
||||
return metaerr.Wrap(err)
|
||||
}
|
||||
|
||||
func (db *DB) iterateOverObjectsInContainer(ctx context.Context, tx *bbolt.Tx, prm IterateOverObjectsInContainerPrm) error {
|
||||
bkt := tx.Bucket(prm.BucketName)
|
||||
graveyardBkt := tx.Bucket(graveyardBucketName)
|
||||
garbageBkt := tx.Bucket(garbageBucketName)
|
||||
c := bkt.Cursor()
|
||||
k, v := c.First()
|
||||
|
||||
var containerID cid.ID
|
||||
cidRaw, prefix := parseContainerIDWithPrefix(&containerID, prm.BucketName)
|
||||
if cidRaw == nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
var objType objectSDK.Type
|
||||
|
||||
switch prefix {
|
||||
case primaryPrefix:
|
||||
objType = objectSDK.TypeRegular
|
||||
case lockersPrefix:
|
||||
objType = objectSDK.TypeLock
|
||||
case tombstonePrefix:
|
||||
objType = objectSDK.TypeTombstone
|
||||
default:
|
||||
return nil
|
||||
}
|
||||
|
||||
for ; k != nil; k, v = c.Next() {
|
||||
var obj oid.ID
|
||||
if err := obj.Decode(k); err != nil {
|
||||
break
|
||||
}
|
||||
|
||||
if inGraveyardWithKey(append(cidRaw, k...), graveyardBkt, garbageBkt) > 0 {
|
||||
continue
|
||||
}
|
||||
|
||||
var isLinkingObj bool
|
||||
var ecInfo *objectcore.ECInfo
|
||||
if objType == objectSDK.TypeRegular {
|
||||
var o objectSDK.Object
|
||||
if err := o.Unmarshal(v); err != nil {
|
||||
return err
|
||||
}
|
||||
isLinkingObj = isLinkObject(&o)
|
||||
ecHeader := o.ECHeader()
|
||||
if ecHeader != nil {
|
||||
ecInfo = &objectcore.ECInfo{
|
||||
ParentID: ecHeader.Parent(),
|
||||
Index: ecHeader.Index(),
|
||||
Total: ecHeader.Total(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
var a oid.Address
|
||||
a.SetContainer(containerID)
|
||||
a.SetObject(obj)
|
||||
objInfo := objectcore.Info{Address: a, Type: objType, IsLinkingObject: isLinkingObj, ECInfo: ecInfo}
|
||||
err := prm.Handler(ctx, &objInfo)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
|
|
@ -34,6 +34,20 @@ func (r ListContainersRes) Containers() []cid.ID {
|
|||
return r.containers
|
||||
}
|
||||
|
||||
// IterateOverContainersPrm contains parameters for IterateOverContainers operation.
|
||||
type IterateOverContainersPrm struct {
|
||||
// Handler function executed upon containers in db.
|
||||
Handler func(context.Context, []byte, cid.ID) error
|
||||
}
|
||||
|
||||
// IterateOverObjectsInContainerPrm contains parameters for IterateOverObjectsInContainer operation.
|
||||
type IterateOverObjectsInContainerPrm struct {
|
||||
// BucketName container's bucket name
|
||||
BucketName []byte
|
||||
// Handler function executed upon containers in db.
|
||||
Handler func(context.Context, *objectcore.Info) error
|
||||
}
|
||||
|
||||
// ListWithCursorPrm contains parameters for ListWithCursor operation.
|
||||
type ListWithCursorPrm struct {
|
||||
count uint32
|
||||
|
@ -164,3 +178,48 @@ func (s *Shard) ListWithCursor(ctx context.Context, prm ListWithCursorPrm) (List
|
|||
cursor: res.Cursor(),
|
||||
}, nil
|
||||
}
|
||||
|
||||
// IterateOverContainers lists physical containers presented in shard.
|
||||
func (s *Shard) IterateOverContainers(ctx context.Context, prm IterateOverContainersPrm) error {
|
||||
_, span := tracing.StartSpanFromContext(ctx, "shard.IterateOverContainers",
|
||||
trace.WithAttributes(
|
||||
attribute.Bool("has_handler", prm.Handler != nil),
|
||||
))
|
||||
defer span.End()
|
||||
|
||||
if s.GetMode().NoMetabase() {
|
||||
return ErrDegradedMode
|
||||
}
|
||||
|
||||
var metaPrm meta.IterateOverContainersPrm
|
||||
metaPrm.Handler = prm.Handler
|
||||
err := s.metaBase.IterateOverContainers(ctx, metaPrm)
|
||||
if err != nil {
|
||||
return fmt.Errorf("could not iterate over containers: %w", err)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// IterateOverObjectsInContainer lists physical objects presented in shard for provided container's bucket name.
|
||||
func (s *Shard) IterateOverObjectsInContainer(ctx context.Context, prm IterateOverObjectsInContainerPrm) error {
|
||||
_, span := tracing.StartSpanFromContext(ctx, "shard.IterateOverObjectsInContainer",
|
||||
trace.WithAttributes(
|
||||
attribute.Bool("has_handler", prm.Handler != nil),
|
||||
))
|
||||
defer span.End()
|
||||
|
||||
if s.GetMode().NoMetabase() {
|
||||
return ErrDegradedMode
|
||||
}
|
||||
|
||||
var metaPrm meta.IterateOverObjectsInContainerPrm
|
||||
metaPrm.BucketName = prm.BucketName
|
||||
metaPrm.Handler = prm.Handler
|
||||
err := s.metaBase.IterateOverObjectsInContainer(ctx, metaPrm)
|
||||
if err != nil {
|
||||
return fmt.Errorf("could not iterate over objects: %w", err)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
|
|
@ -23,12 +23,14 @@ func (s *Server) StartShardEvacuation(ctx context.Context, req *control.StartSha
|
|||
}
|
||||
|
||||
prm := engine.EvacuateShardPrm{
|
||||
ShardID: s.getShardIDList(req.GetBody().GetShard_ID()),
|
||||
IgnoreErrors: req.GetBody().GetIgnoreErrors(),
|
||||
ObjectsHandler: s.replicateObject,
|
||||
TreeHandler: s.replicateTree,
|
||||
Async: true,
|
||||
Scope: engine.EvacuateScope(req.GetBody().GetScope()),
|
||||
ShardID: s.getShardIDList(req.GetBody().GetShard_ID()),
|
||||
IgnoreErrors: req.GetBody().GetIgnoreErrors(),
|
||||
ObjectsHandler: s.replicateObject,
|
||||
TreeHandler: s.replicateTree,
|
||||
Async: true,
|
||||
Scope: engine.EvacuateScope(req.GetBody().GetScope()),
|
||||
ContainerWorkerCount: req.GetBody().GetContainerWorkerCount(),
|
||||
ObjectWorkerCount: req.GetBody().GetObjectWorkerCount(),
|
||||
}
|
||||
|
||||
_, err = s.s.Evacuate(ctx, prm)
|
||||
|
|
|
@ -394,6 +394,10 @@ message StartShardEvacuationRequest {
|
|||
bool ignore_errors = 2;
|
||||
// Evacuation scope.
|
||||
uint32 scope = 3;
|
||||
// Count of concurrent container evacuation workers.
|
||||
uint32 container_worker_count = 4;
|
||||
// Count of concurrent object evacuation workers.
|
||||
uint32 object_worker_count = 5;
|
||||
}
|
||||
|
||||
Body body = 1;
|
||||
|
|
BIN
pkg/services/control/service_frostfs.pb.go
generated
BIN
pkg/services/control/service_frostfs.pb.go
generated
Binary file not shown.
Loading…
Reference in a new issue