[#1356] engine: Evacuate object from shards concurrently
Some checks failed
DCO action / DCO (pull_request) Successful in 1m18s
Vulncheck / Vulncheck (pull_request) Successful in 2m47s
Tests and linters / Run gofumpt (pull_request) Successful in 2m52s
Tests and linters / Staticcheck (pull_request) Successful in 3m11s
Pre-commit hooks / Pre-commit (pull_request) Successful in 3m23s
Tests and linters / Tests (pull_request) Successful in 3m38s
Build / Build Components (pull_request) Successful in 3m44s
Tests and linters / gopls check (pull_request) Successful in 3m53s
Tests and linters / Tests with -race (pull_request) Failing after 4m8s
Tests and linters / Lint (pull_request) Successful in 4m59s

Signed-off-by: Anton Nikiforov <an.nikiforov@yadro.com>
This commit is contained in:
Anton Nikiforov 2024-09-06 11:56:34 +03:00
parent 4b8956e7ca
commit 15fba7a29c
5 changed files with 112 additions and 20 deletions

View file

@ -110,6 +110,10 @@ type applicationConfiguration struct {
shards []shardCfg
lowMem bool
rebuildWorkers uint32
evacuationWorkers uint32
evacuationObjectWorkers uint32
evacuationObjectBatchSize uint32
}
// if need to run node in compatibility with other versions mode
@ -231,6 +235,9 @@ func (a *applicationConfiguration) readConfig(c *config.Config) error {
a.EngineCfg.shardPoolSize = engineconfig.ShardPoolSize(c)
a.EngineCfg.lowMem = engineconfig.EngineLowMemoryConsumption(c)
a.EngineCfg.rebuildWorkers = engineconfig.EngineRebuildWorkersCount(c)
a.EngineCfg.evacuationWorkers = engineconfig.EngineEvacuationWorkersCount(c)
a.EngineCfg.evacuationObjectWorkers = engineconfig.EngineEvacuationObjectWorkersCount(c)
a.EngineCfg.evacuationObjectBatchSize = engineconfig.EngineEvacuationObjectBatchSize(c)
return engineconfig.IterateShards(c, false, func(sc *shardconfig.Config) error { return a.updateShardConfig(c, sc) })
}
@ -836,6 +843,9 @@ func (c *cfg) engineOpts() []engine.Option {
engine.WithLogger(c.log),
engine.WithLowMemoryConsumption(c.EngineCfg.lowMem),
engine.WithRebuildWorkersCount(c.EngineCfg.rebuildWorkers),
engine.WithEvacuationWorkers(c.EngineCfg.evacuationWorkers),
engine.WithEvacuationObjectWorkers(c.EngineCfg.evacuationObjectWorkers),
engine.WithEvacuationObjectBatchSize(c.EngineCfg.evacuationObjectBatchSize),
)
if c.metricsCollector != nil {

View file

@ -18,6 +18,15 @@ const (
// RebuildWorkersCountDefault is a default value of the workers count to
// process storage rebuild operations in a storage engine.
RebuildWorkersCountDefault = 100
// EvacuationWorkersCountDefault is a default value of the count of shards
// evacuees concurrently.
EvacuationWorkersCountDefault = 5
// EvacuationObjectWorkersCountDefault is a default value of the count of
// concurrent object evacuation workers.
EvacuationObjectWorkersCountDefault = 10
// EvacuationObjectBatchSizeDefault is a default value of the count of
// objects reading from metabase.
EvacuationObjectBatchSizeDefault = 100
)
// ErrNoShardConfigured is returned when at least 1 shard is required but none are found.
@ -99,3 +108,27 @@ func EngineRebuildWorkersCount(c *config.Config) uint32 {
}
return RebuildWorkersCountDefault
}
// EngineEvacuationWorkersCount returns value of "evacuation_workers_count" config parmeter from "storage" section.
func EngineEvacuationWorkersCount(c *config.Config) uint32 {
if v := config.Uint32Safe(c.Sub(subsection), "evacuation_workers_count"); v > 0 {
return v
}
return EvacuationWorkersCountDefault
}
// EngineEvacuationObjectWorkersCount returns value of "evacuation_object_workers_count" config parmeter from "storage" section.
func EngineEvacuationObjectWorkersCount(c *config.Config) uint32 {
if v := config.Uint32Safe(c.Sub(subsection), "evacuation_object_workers_count"); v > 0 {
return v
}
return EvacuationObjectWorkersCountDefault
}
// EngineEvacuationObjectBatchSize returns value of "evacuation_object_batch_size" config parmeter from "storage" section.
func EngineEvacuationObjectBatchSize(c *config.Config) uint32 {
if v := config.Uint32Safe(c.Sub(subsection), "evacuation_object_batch_size"); v > 0 {
return v
}
return EvacuationObjectBatchSizeDefault
}

View file

@ -216,6 +216,10 @@ type cfg struct {
rebuildWorkersCount uint32
containerSource atomic.Pointer[containerSource]
evacuationWorkers uint32
evacuationObjectWorkers uint32
evacuationObjectBatchSize uint32
}
func defaultCfg() *cfg {
@ -223,6 +227,10 @@ func defaultCfg() *cfg {
log: &logger.Logger{Logger: zap.L()},
shardPoolSize: 20,
rebuildWorkersCount: 100,
evacuationWorkers: 5,
evacuationObjectWorkers: 10,
evacuationObjectBatchSize: 100,
}
res.containerSource.Store(&containerSource{})
return res
@ -289,6 +297,27 @@ func WithRebuildWorkersCount(count uint32) Option {
}
}
// WithEvacuationWorkers returns an option to set the count of shards evacuees concurrently.
func WithEvacuationWorkers(count uint32) Option {
return func(c *cfg) {
c.evacuationWorkers = count
}
}
// WithEvacuationObjectWorkers returns an option to set the count of the concurrent object evacuation workers.
func WithEvacuationObjectWorkers(count uint32) Option {
return func(c *cfg) {
c.evacuationObjectWorkers = count
}
}
// WithEvacuationObjectBatchSize returns an option to set the count of object reading from metabase.
func WithEvacuationObjectBatchSize(count uint32) Option {
return func(c *cfg) {
c.evacuationObjectBatchSize = count
}
}
// SetContainerSource sets container source.
func (e *StorageEngine) SetContainerSource(cs container.Source) {
e.containerSource.Store(&containerSource{cs: cs})

View file

@ -24,6 +24,7 @@ import (
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"
"go.uber.org/zap"
"golang.org/x/sync/errgroup"
)
var (
@ -189,8 +190,6 @@ func (p *EvacuateShardRes) DeepCopy() *EvacuateShardRes {
return res
}
const defaultEvacuateBatchSize = 100
type pooledShard struct {
hashedShard
pool util.WorkerPool
@ -287,12 +286,26 @@ func (e *StorageEngine) evacuateShards(ctx context.Context, shardIDs []string, p
return err
}
limiter := newRoutineLimiter(e.cfg.evacuationWorkers)
eg, egCtx := errgroup.WithContext(ctx)
for _, shardID := range shardIDs {
if err = e.evacuateShard(ctx, shardID, prm, res, shards, shardsToEvacuate); err != nil {
e.log.Error(logs.EngineFinishedWithErrorShardsEvacuation, zap.Error(err), zap.Strings("shard_ids", shardIDs), evacuationOperationLogField,
zap.String("trace_id", tracingPkg.GetTraceID(ctx)), zap.Stringer("scope", prm.Scope))
if err := limiter.AcquireWorkSlot(egCtx); err != nil {
_ = eg.Wait()
return err
}
eg.Go(func() error {
defer limiter.ReleaseWorkSlot()
if err = e.evacuateShard(egCtx, shardID, prm, res, shards, shardsToEvacuate); err != nil {
e.log.Error(logs.EngineFinishedWithErrorShardsEvacuation, zap.Error(err), zap.Strings("shard_ids", shardIDs), evacuationOperationLogField,
zap.String("trace_id", tracingPkg.GetTraceID(ctx)), zap.Stringer("scope", prm.Scope))
return err
}
return nil
})
}
err = eg.Wait()
if err != nil {
return err
}
e.log.Info(logs.EngineFinishedSuccessfullyShardsEvacuation,
@ -344,53 +357,60 @@ func (e *StorageEngine) evacuateShard(ctx context.Context, shardID string, prm E
))
defer span.End()
eg, egCtx := errgroup.WithContext(ctx)
if prm.Scope.WithObjects() {
if err := e.evacuateShardObjects(ctx, shardID, prm, res, shards, shardsToEvacuate); err != nil {
return err
}
eg.Go(func() error {
return e.evacuateShardObjects(egCtx, shardID, prm, res, shards, shardsToEvacuate)
})
}
if prm.Scope.WithTrees() && shardsToEvacuate[shardID].PiloramaEnabled() {
if err := e.evacuateShardTrees(ctx, shardID, prm, res, shards, shardsToEvacuate); err != nil {
return err
}
eg.Go(func() error {
return e.evacuateShardTrees(egCtx, shardID, prm, res, shards, shardsToEvacuate)
})
}
return nil
return eg.Wait()
}
func (e *StorageEngine) evacuateShardObjects(ctx context.Context, shardID string, prm EvacuateShardPrm, res *EvacuateShardRes,
shards []pooledShard, shardsToEvacuate map[string]*shard.Shard,
) error {
var listPrm shard.ListWithCursorPrm
listPrm.WithCount(defaultEvacuateBatchSize)
listPrm.WithCount(e.cfg.evacuationObjectBatchSize)
sh := shardsToEvacuate[shardID]
sh.SetEvacuationInProgress(true)
eg, egCtx := errgroup.WithContext(ctx)
limiter := newRoutineLimiter(e.cfg.evacuationObjectWorkers)
var c *meta.Cursor
for {
listPrm.WithCursor(c)
// TODO (@fyrchik): #1731 this approach doesn't work in degraded modes
// because ListWithCursor works only with the metabase.
listRes, err := sh.ListWithCursor(ctx, listPrm)
listRes, err := sh.ListWithCursor(egCtx, listPrm)
if err != nil {
if errors.Is(err, meta.ErrEndOfListing) || errors.Is(err, shard.ErrDegradedMode) {
break
}
e.log.Error(logs.EngineShardsEvacuationFailedToListObjects, zap.String("shard_id", shardID), zap.Error(err), evacuationOperationLogField,
zap.String("trace_id", tracingPkg.GetTraceID(ctx)))
zap.String("trace_id", tracingPkg.GetTraceID(egCtx)))
return err
}
if err = e.evacuateObjects(ctx, sh, listRes.AddressList(), prm, res, shards, shardsToEvacuate); err != nil {
if err := limiter.AcquireWorkSlot(egCtx); err != nil {
_ = eg.Wait()
return err
}
eg.Go(func() error {
defer limiter.ReleaseWorkSlot()
return e.evacuateObjects(egCtx, sh, listRes.AddressList(), prm, res, shards, shardsToEvacuate)
})
c = listRes.Cursor()
}
return nil
return eg.Wait()
}
func (e *StorageEngine) evacuateShardTrees(ctx context.Context, shardID string, prm EvacuateShardPrm, res *EvacuateShardRes,

View file

@ -169,7 +169,7 @@ func (s *Server) getContainerNodes(contID cid.ID) ([]netmap.NodeInfo, error) {
nodes := placement.FlattenNodes(ns)
bs := (*keys.PublicKey)(&s.key.PublicKey).Bytes()
for i := range len(nodes) {
for i := 0; i < len(nodes); i++ {
if bytes.Equal(nodes[i].PublicKey(), bs) {
copy(nodes[i:], nodes[i+1:])
nodes = nodes[:len(nodes)-1]