[#1356] engine: Evacuate objects from shards concurrently

Signed-off-by: Anton Nikiforov <an.nikiforov@yadro.com>
Anton Nikiforov 2024-09-06 11:56:34 +03:00
parent d3b209c8e1
commit aa21a71b61
4 changed files with 105 additions and 23 deletions


@@ -109,6 +109,10 @@ type applicationConfiguration struct {
shardPoolSize uint32
shards []shardCfg
lowMem bool
evacuationWorkers uint32
evacuationObjectWorkers uint32
evacuationObjectBatchSize uint32
}
// if the node needs to run in compatibility mode with other versions
@@ -229,6 +233,9 @@ func (a *applicationConfiguration) readConfig(c *config.Config) error {
a.EngineCfg.errorThreshold = engineconfig.ShardErrorThreshold(c)
a.EngineCfg.shardPoolSize = engineconfig.ShardPoolSize(c)
a.EngineCfg.lowMem = engineconfig.EngineLowMemoryConsumption(c)
a.EngineCfg.evacuationWorkers = engineconfig.EngineEvacuationWorkersCount(c)
a.EngineCfg.evacuationObjectWorkers = engineconfig.EngineEvacuationObjectWorkersCount(c)
a.EngineCfg.evacuationObjectBatchSize = engineconfig.EngineEvacuationObjectBatchSize(c)
return engineconfig.IterateShards(c, false, func(sc *shardconfig.Config) error { return a.updateShardConfig(c, sc) })
}
@@ -833,6 +840,9 @@ func (c *cfg) engineOpts() []engine.Option {
engine.WithErrorThreshold(c.EngineCfg.errorThreshold),
engine.WithLogger(c.log),
engine.WithLowMemoryConsumption(c.EngineCfg.lowMem),
engine.WithEvacuationWorkers(c.EngineCfg.evacuationWorkers),
engine.WithEvacuationObjectWorkers(c.EngineCfg.evacuationObjectWorkers),
engine.WithEvacuationObjectBatchSize(c.EngineCfg.evacuationObjectBatchSize),
)
if c.metricsCollector != nil {


@@ -15,6 +15,15 @@ const (
// ShardPoolSizeDefault is a default value of routine pool size per-shard to
// process object PUT operations in a storage engine.
ShardPoolSizeDefault = 20
// EvacuationWorkersCountDefault is a default value of the number of shards
// evacuated concurrently.
EvacuationWorkersCountDefault = 5
// EvacuationObjectWorkersCountDefault is a default value of the number of
// concurrent object evacuation workers.
EvacuationObjectWorkersCountDefault = 10
// EvacuationObjectBatchSizeDefault is a default value of the number of
// objects read from the metabase in one batch.
EvacuationObjectBatchSizeDefault = 100
)
// ErrNoShardConfigured is returned when at least 1 shard is required but none are found.
@@ -88,3 +97,27 @@ func ShardErrorThreshold(c *config.Config) uint32 {
func EngineLowMemoryConsumption(c *config.Config) bool {
return config.BoolSafe(c.Sub(subsection), "low_mem")
}
// EngineEvacuationWorkersCount returns the value of the "evacuation_workers_count" config parameter from the "storage" section.
func EngineEvacuationWorkersCount(c *config.Config) uint32 {
if v := config.Uint32Safe(c.Sub(subsection), "evacuation_workers_count"); v > 0 {
return v
}
return EvacuationWorkersCountDefault
}
// EngineEvacuationObjectWorkersCount returns the value of the "evacuation_object_workers_count" config parameter from the "storage" section.
func EngineEvacuationObjectWorkersCount(c *config.Config) uint32 {
if v := config.Uint32Safe(c.Sub(subsection), "evacuation_object_workers_count"); v > 0 {
return v
}
return EvacuationObjectWorkersCountDefault
}
// EngineEvacuationObjectBatchSize returns the value of the "evacuation_object_batch_size" config parameter from the "storage" section.
func EngineEvacuationObjectBatchSize(c *config.Config) uint32 {
if v := config.Uint32Safe(c.Sub(subsection), "evacuation_object_batch_size"); v > 0 {
return v
}
return EvacuationObjectBatchSizeDefault
}
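
For reference, this is how the three parameters might look in the node configuration. This is a minimal sketch assuming a YAML config file with the usual "storage" section; the key names come from the getters above, and the values shown are just the defaults:

storage:
  # number of shards evacuated concurrently
  evacuation_workers_count: 5
  # number of concurrent object evacuation workers
  evacuation_object_workers_count: 10
  # number of objects read from the metabase in one batch
  evacuation_object_batch_size: 100

Omitted or zero-valued keys fall back to the defaults via the v > 0 checks in the getters above.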


@@ -213,12 +213,20 @@ type cfg struct {
lowMem bool
containerSource atomic.Pointer[containerSource]
evacuationWorkers uint32
evacuationObjectWorkers uint32
evacuationObjectBatchSize uint32
}
func defaultCfg() *cfg {
res := &cfg{
log: &logger.Logger{Logger: zap.L()},
shardPoolSize: 20,
evacuationWorkers: 5,
evacuationObjectWorkers: 10,
evacuationObjectBatchSize: 100,
}
res.containerSource.Store(&containerSource{})
return res
@@ -277,6 +285,27 @@ func WithLowMemoryConsumption(lowMemCons bool) Option {
}
}
// WithEvacuationWorkers returns an option to set the number of shards evacuated concurrently.
func WithEvacuationWorkers(count uint32) Option {
return func(c *cfg) {
c.evacuationWorkers = count
}
}
// WithEvacuationObjectWorkers returns an option to set the number of concurrent object evacuation workers.
func WithEvacuationObjectWorkers(count uint32) Option {
return func(c *cfg) {
c.evacuationObjectWorkers = count
}
}
// WithEvacuationObjectBatchSize returns an option to set the number of objects read from the metabase in one batch.
func WithEvacuationObjectBatchSize(count uint32) Option {
return func(c *cfg) {
c.evacuationObjectBatchSize = count
}
}
// SetContainerSource sets container source.
func (e *StorageEngine) SetContainerSource(cs container.Source) {
e.containerSource.Store(&containerSource{cs: cs})
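
A short usage sketch for the new options. It assumes the package's New(opts ...Option) constructor and the import path from the frostfs-node layout, neither of which is shown in this diff; the literal values are illustrative:

package main

import (
	engine "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/engine"
)

func main() {
	// Tune evacuation concurrency explicitly; the defaults are 5, 10 and 100.
	e := engine.New(
		engine.WithEvacuationWorkers(5),           // shards evacuated in parallel
		engine.WithEvacuationObjectWorkers(10),    // concurrent object evacuation workers
		engine.WithEvacuationObjectBatchSize(100), // objects listed from the metabase per batch
	)
	_ = e
}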


@@ -24,6 +24,7 @@ import (
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"
"go.uber.org/zap"
"golang.org/x/sync/errgroup"
)
var (
@@ -189,8 +190,6 @@ func (p *EvacuateShardRes) DeepCopy() *EvacuateShardRes {
return res
}
const defaultEvacuateBatchSize = 100
type pooledShard struct {
hashedShard
pool util.WorkerPool
@@ -287,12 +286,21 @@ func (e *StorageEngine) evacuateShards(ctx context.Context, shardIDs []string, p
return err
}
eg, egCtx := errgroup.WithContext(ctx)
eg.SetLimit(int(e.cfg.evacuationWorkers))
for _, shardID := range shardIDs {
if err = e.evacuateShard(ctx, shardID, prm, res, shards, shardsToEvacuate); err != nil {
e.log.Error(logs.EngineFinishedWithErrorShardsEvacuation, zap.Error(err), zap.Strings("shard_ids", shardIDs), evacuationOperationLogField,
zap.String("trace_id", tracingPkg.GetTraceID(ctx)), zap.Stringer("scope", prm.Scope))
return err
}
eg.Go(func() error {
if err := e.evacuateShard(egCtx, shardID, prm, res, shards, shardsToEvacuate); err != nil {
e.log.Error(logs.EngineFinishedWithErrorShardsEvacuation, zap.Error(err), zap.Strings("shard_ids", shardIDs), evacuationOperationLogField,
zap.String("trace_id", tracingPkg.GetTraceID(ctx)), zap.Stringer("scope", prm.Scope))
return err
}
return nil
})
}
err = eg.Wait()
if err != nil {
return err
}
e.log.Info(logs.EngineFinishedSuccessfullyShardsEvacuation,
@@ -344,53 +352,55 @@ func (e *StorageEngine) evacuateShard(ctx context.Context, shardID string, prm E
))
defer span.End()
eg, egCtx := errgroup.WithContext(ctx)
if prm.Scope.WithObjects() {
if err := e.evacuateShardObjects(ctx, shardID, prm, res, shards, shardsToEvacuate); err != nil {
return err
}
eg.Go(func() error {
return e.evacuateShardObjects(egCtx, shardID, prm, res, shards, shardsToEvacuate)
})
}
if prm.Scope.WithTrees() && shardsToEvacuate[shardID].PiloramaEnabled() {
if err := e.evacuateShardTrees(ctx, shardID, prm, res, shards, shardsToEvacuate); err != nil {
return err
}
eg.Go(func() error {
return e.evacuateShardTrees(egCtx, shardID, prm, res, shards, shardsToEvacuate)
})
}
return nil
return eg.Wait()
}
func (e *StorageEngine) evacuateShardObjects(ctx context.Context, shardID string, prm EvacuateShardPrm, res *EvacuateShardRes,
shards []pooledShard, shardsToEvacuate map[string]*shard.Shard,
) error {
var listPrm shard.ListWithCursorPrm
listPrm.WithCount(defaultEvacuateBatchSize)
listPrm.WithCount(e.cfg.evacuationObjectBatchSize)
sh := shardsToEvacuate[shardID]
sh.SetEvacuationInProgress(true)
eg, egCtx := errgroup.WithContext(ctx)
eg.SetLimit(int(e.cfg.evacuationObjectWorkers))
var c *meta.Cursor
for {
listPrm.WithCursor(c)
// TODO (@fyrchik): #1731 this approach doesn't work in degraded modes
// because ListWithCursor works only with the metabase.
listRes, err := sh.ListWithCursor(ctx, listPrm)
listRes, err := sh.ListWithCursor(egCtx, listPrm)
if err != nil {
if errors.Is(err, meta.ErrEndOfListing) || errors.Is(err, shard.ErrDegradedMode) {
break
}
e.log.Error(logs.EngineShardsEvacuationFailedToListObjects, zap.String("shard_id", shardID), zap.Error(err), evacuationOperationLogField,
zap.String("trace_id", tracingPkg.GetTraceID(ctx)))
return err
}
if err = e.evacuateObjects(ctx, sh, listRes.AddressList(), prm, res, shards, shardsToEvacuate); err != nil {
zap.String("trace_id", tracingPkg.GetTraceID(egCtx)))
return err
}
eg.Go(func() error {
return e.evacuateObjects(egCtx, sh, listRes.AddressList(), prm, res, shards, shardsToEvacuate)
})
c = listRes.Cursor()
}
return nil
return eg.Wait()
}
func (e *StorageEngine) evacuateShardTrees(ctx context.Context, shardID string, prm EvacuateShardPrm, res *EvacuateShardRes,
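
The pattern used throughout this hunk, a sequential producer feeding an errgroup whose SetLimit bounds the number of in-flight workers, can be shown in isolation. Below is a minimal self-contained sketch with hypothetical names, not the engine's actual types:

package main

import (
	"context"
	"fmt"

	"golang.org/x/sync/errgroup"
)

// processBatches mimics the evacuation loop: batches are produced
// sequentially, each one is handed to a bounded pool of workers, and
// the first worker error cancels the shared context.
func processBatches(ctx context.Context, batches [][]string, workers int) error {
	eg, egCtx := errgroup.WithContext(ctx)
	eg.SetLimit(workers) // eg.Go blocks once `workers` goroutines are in flight
	for _, batch := range batches {
		batch := batch // per-iteration copy; redundant since Go 1.22
		eg.Go(func() error {
			if err := egCtx.Err(); err != nil {
				return err // another worker already failed, stop early
			}
			fmt.Println("evacuating batch of", len(batch), "objects")
			return nil
		})
	}
	// Wait returns the first non-nil error from any worker.
	return eg.Wait()
}

func main() {
	batches := [][]string{{"a", "b"}, {"c"}}
	if err := processBatches(context.Background(), batches, 2); err != nil {
		fmt.Println("evacuation failed:", err)
	}
}

Because eg.Go blocks once the limit is reached, the ListWithCursor producer loop in evacuateShardObjects is throttled automatically rather than buffering an unbounded number of batches.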