From 888f966eb487a05f4e5bb46877f9897fc36608a9 Mon Sep 17 00:00:00 2001
From: Dmitrii Stepanov
Date: Tue, 3 Oct 2023 11:58:35 +0300
Subject: [PATCH] [#661] blobovniczatree: Make Rebuild concurrent

Different DBs can be rebuilt concurrently.

Signed-off-by: Dmitrii Stepanov
---
 cmd/frostfs-node/config.go                    |  5 +-
 cmd/frostfs-node/config/engine/config.go      | 11 +++++
 cmd/frostfs-node/config/engine/config_test.go |  2 +
 config/example/node.env                       |  1 +
 config/example/node.json                      |  1 +
 config/example/node.yaml                      |  1 +
 .../blobstor/blobovniczatree/rebuild.go       | 49 +++++++++++++++----
 .../blobovniczatree/rebuild_failover_test.go  |  5 +-
 .../blobstor/blobovniczatree/rebuild_test.go  | 11 +++++
 .../blobstor/common/rebuild.go                |  8 ++-
 pkg/local_object_storage/blobstor/rebuild.go  | 10 +++-
 pkg/local_object_storage/engine/engine.go     | 17 +++++--
 .../engine/rebuild_limiter.go                 | 26 ++++++++++
 pkg/local_object_storage/engine/shards.go     |  1 +
 pkg/local_object_storage/shard/control.go     |  2 +-
 .../shard/rebuild_limiter.go                  | 13 +++++
 pkg/local_object_storage/shard/rebuilder.go   | 18 ++++---
 pkg/local_object_storage/shard/shard.go       | 11 +++++
 18 files changed, 165 insertions(+), 27 deletions(-)
 create mode 100644 pkg/local_object_storage/engine/rebuild_limiter.go
 create mode 100644 pkg/local_object_storage/shard/rebuild_limiter.go

diff --git a/cmd/frostfs-node/config.go b/cmd/frostfs-node/config.go
index c4eb81c81..fd3ee4b74 100644
--- a/cmd/frostfs-node/config.go
+++ b/cmd/frostfs-node/config.go
@@ -101,6 +101,7 @@ type applicationConfiguration struct {
 		shardPoolSize  uint32
 		shards         []shardCfg
 		lowMem         bool
+		rebuildWorkers uint32
 	}
 }
 
@@ -208,6 +209,7 @@ func (a *applicationConfiguration) readConfig(c *config.Config) error {
 	a.EngineCfg.errorThreshold = engineconfig.ShardErrorThreshold(c)
 	a.EngineCfg.shardPoolSize = engineconfig.ShardPoolSize(c)
 	a.EngineCfg.lowMem = engineconfig.EngineLowMemoryConsumption(c)
+	a.EngineCfg.rebuildWorkers = engineconfig.EngineRebuildWorkersCount(c)
 
 	return engineconfig.IterateShards(c, false, func(sc *shardconfig.Config) error { return a.updateShardConfig(c, sc) })
 }
@@ -687,13 +689,14 @@ func initCfgObject(appCfg *config.Config) cfgObject {
 }
 
 func (c *cfg) engineOpts() []engine.Option {
-	opts := make([]engine.Option, 0, 4)
+	var opts []engine.Option
 
 	opts = append(opts,
 		engine.WithShardPoolSize(c.EngineCfg.shardPoolSize),
 		engine.WithErrorThreshold(c.EngineCfg.errorThreshold),
 		engine.WithLogger(c.log),
 		engine.WithLowMemoryConsumption(c.EngineCfg.lowMem),
+		engine.WithRebuildWorkersCount(c.EngineCfg.rebuildWorkers),
 	)
 
 	if c.metricsCollector != nil {
diff --git a/cmd/frostfs-node/config/engine/config.go b/cmd/frostfs-node/config/engine/config.go
index c944d1c58..baa4e3c9d 100644
--- a/cmd/frostfs-node/config/engine/config.go
+++ b/cmd/frostfs-node/config/engine/config.go
@@ -15,6 +15,9 @@ const (
 	// ShardPoolSizeDefault is a default value of routine pool size per-shard to
 	// process object PUT operations in a storage engine.
 	ShardPoolSizeDefault = 20
+	// RebuildWorkersCountDefault is a default value of the workers count to
+	// process storage rebuild operations in a storage engine.
+	RebuildWorkersCountDefault = 100
 )
 
 // ErrNoShardConfigured is returned when at least 1 shard is required but none are found.
@@ -88,3 +91,11 @@ func ShardErrorThreshold(c *config.Config) uint32 {
 func EngineLowMemoryConsumption(c *config.Config) bool {
 	return config.BoolSafe(c.Sub(subsection), "low_mem")
 }
+
+// EngineRebuildWorkersCount returns the value of the "rebuild_workers_count" config parameter from the "storage" section.
+func EngineRebuildWorkersCount(c *config.Config) uint32 {
+	if v := config.Uint32Safe(c.Sub(subsection), "rebuild_workers_count"); v > 0 {
+		return v
+	}
+	return RebuildWorkersCountDefault
+}
diff --git a/cmd/frostfs-node/config/engine/config_test.go b/cmd/frostfs-node/config/engine/config_test.go
index 2f47229bc..14e623407 100644
--- a/cmd/frostfs-node/config/engine/config_test.go
+++ b/cmd/frostfs-node/config/engine/config_test.go
@@ -38,6 +38,7 @@ func TestEngineSection(t *testing.T) {
 		require.EqualValues(t, 0, engineconfig.ShardErrorThreshold(empty))
 		require.EqualValues(t, engineconfig.ShardPoolSizeDefault, engineconfig.ShardPoolSize(empty))
 		require.EqualValues(t, mode.ReadWrite, shardconfig.From(empty).Mode())
+		require.EqualValues(t, engineconfig.RebuildWorkersCountDefault, engineconfig.EngineRebuildWorkersCount(empty))
 	})
 
 	const path = "../../../../config/example/node"
@@ -47,6 +48,7 @@ func TestEngineSection(t *testing.T) {
 
 		require.EqualValues(t, 100, engineconfig.ShardErrorThreshold(c))
 		require.EqualValues(t, 15, engineconfig.ShardPoolSize(c))
+		require.EqualValues(t, uint32(1000), engineconfig.EngineRebuildWorkersCount(c))
 
 		err := engineconfig.IterateShards(c, true, func(sc *shardconfig.Config) error {
 			defer func() {
diff --git a/config/example/node.env b/config/example/node.env
index b2e694582..1cff449ee 100644
--- a/config/example/node.env
+++ b/config/example/node.env
@@ -92,6 +92,7 @@ FROSTFS_OBJECT_DELETE_TOMBSTONE_LIFETIME=10
 # Storage engine section
 FROSTFS_STORAGE_SHARD_POOL_SIZE=15
 FROSTFS_STORAGE_SHARD_RO_ERROR_THRESHOLD=100
+FROSTFS_STORAGE_REBUILD_WORKERS_COUNT=1000
 ## 0 shard
 ### Flag to refill Metabase from BlobStor
 FROSTFS_STORAGE_SHARD_0_RESYNC_METABASE=false
diff --git a/config/example/node.json b/config/example/node.json
index 5beefd334..0281f46df 100644
--- a/config/example/node.json
+++ b/config/example/node.json
@@ -137,6 +137,7 @@
   "storage": {
     "shard_pool_size": 15,
     "shard_ro_error_threshold": 100,
+    "rebuild_workers_count": 1000,
     "shard": {
       "0": {
         "mode": "read-only",
diff --git a/config/example/node.yaml b/config/example/node.yaml
index d603f0d89..7d2ed3e29 100644
--- a/config/example/node.yaml
+++ b/config/example/node.yaml
@@ -116,6 +116,7 @@ storage:
   # note: shard configuration can be omitted for relay node (see `node.relay`)
   shard_pool_size: 15 # size of per-shard worker pools used for PUT operations
   shard_ro_error_threshold: 100 # amount of errors to occur before shard is made read-only (default: 0, ignore errors)
+  rebuild_workers_count: 1000 # number of concurrent storage rebuild workers
   shard:
     default: # section with the default shard parameters
diff --git a/pkg/local_object_storage/blobstor/blobovniczatree/rebuild.go b/pkg/local_object_storage/blobstor/blobovniczatree/rebuild.go
index c1f21703c..94e5ec0d1 100644
--- a/pkg/local_object_storage/blobstor/blobovniczatree/rebuild.go
+++ b/pkg/local_object_storage/blobstor/blobovniczatree/rebuild.go
@@ -6,6 +6,7 @@ import (
 	"os"
 	"path/filepath"
 	"sync"
+	"sync/atomic"
 	"time"
 
 	"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
@@ -14,6 +15,7 @@ import (
 	"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client"
 	oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
 	"go.uber.org/zap"
+	"golang.org/x/sync/errgroup"
 )
 
 var errRebuildInProgress = errors.New("rebuild is in progress, the operation cannot be performed")
@@ -55,21 +57,48 @@ func (b *Blobovniczas) Rebuild(ctx context.Context, prm common.RebuildPrm) (comm
 		success = false
 		return res, err
 	}
+
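
Note (illustrative sketch, not part of this change): the hunk below caps rebuild concurrency by taking a slot from `prm.WorkerLimiter` before each `eg.Go` call and releasing it when the goroutine finishes. The same errgroup + channel-semaphore pattern in a self-contained form, with hypothetical names (`limiter`, and a print statement standing in for `rebuildDB`):

```go
package main

import (
	"context"
	"fmt"

	"golang.org/x/sync/errgroup"
)

// limiter is a counting semaphore: at most cap(slots) workers run at once.
type limiter struct{ slots chan struct{} }

func (l *limiter) AcquireWorkSlot(ctx context.Context) error {
	select {
	case l.slots <- struct{}{}:
		return nil
	case <-ctx.Done():
		return ctx.Err()
	}
}

func (l *limiter) ReleaseWorkSlot() { <-l.slots }

func main() {
	eg, ctx := errgroup.WithContext(context.Background())
	lim := &limiter{slots: make(chan struct{}, 2)} // at most 2 concurrent workers

	for i := 0; i < 5; i++ {
		i := i // capture the loop variable for the goroutine (pre-Go 1.22 semantics)
		if err := lim.AcquireWorkSlot(ctx); err != nil {
			break // context cancelled: stop scheduling new databases
		}
		eg.Go(func() error {
			defer lim.ReleaseWorkSlot()
			fmt.Println("rebuilding db", i) // stand-in for rebuildDB
			return nil
		})
	}
	if err := eg.Wait(); err != nil {
		fmt.Println("rebuild finished with error:", err)
	}
}
```

Acquiring the slot in the scheduling loop rather than inside the goroutine also bounds how many goroutines exist at once, not just how many are doing work.
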
 	b.log.Info(logs.BlobovniczaTreeCollectingDBToRebuildSuccess, zap.Int("blobovniczas_to_rebuild", len(dbsToMigrate)))
 
-	for _, db := range dbsToMigrate {
-		b.log.Debug(logs.BlobovniczaTreeRebuildingBlobovnicza, zap.String("path", db))
-		movedObjects, err := b.rebuildDB(ctx, db, prm.MetaStorage)
-		res.ObjectsMoved += movedObjects
-		if err != nil {
-			b.log.Warn(logs.BlobovniczaTreeRebuildingBlobovniczaFailed, zap.String("path", db), zap.Uint64("moved_objects_count", movedObjects), zap.Error(err))
-			success = false
+	res, err = b.migrateDBs(ctx, dbsToMigrate, prm, res)
+	if err != nil {
+		success = false
+	}
+	return res, err
+}
+
+func (b *Blobovniczas) migrateDBs(ctx context.Context, dbs []string, prm common.RebuildPrm, res common.RebuildRes) (common.RebuildRes, error) {
+	eg, ctx := errgroup.WithContext(ctx)
+
+	var movedObjectsAcc atomic.Uint64
+	var filesMovedAcc atomic.Uint64
+	for _, db := range dbs {
+		db := db
+		if err := prm.WorkerLimiter.AcquireWorkSlot(ctx); err != nil {
+			_ = eg.Wait()
+			res.FilesRemoved += filesMovedAcc.Load()
+			res.ObjectsMoved += movedObjectsAcc.Load()
 			return res, err
 		}
-		b.log.Debug(logs.BlobovniczaTreeRebuildingBlobovniczaSuccess, zap.String("path", db), zap.Uint64("moved_objects_count", movedObjects))
-		res.FilesRemoved++
+		eg.Go(func() error {
+			defer prm.WorkerLimiter.ReleaseWorkSlot()
+
+			b.log.Debug(logs.BlobovniczaTreeRebuildingBlobovnicza, zap.String("path", db))
+			movedObjects, err := b.rebuildDB(ctx, db, prm.MetaStorage)
+			movedObjectsAcc.Add(movedObjects)
+			if err != nil {
+				b.log.Warn(logs.BlobovniczaTreeRebuildingBlobovniczaFailed, zap.String("path", db), zap.Uint64("moved_objects_count", movedObjects), zap.Error(err))
+				return err
+			}
+			b.log.Debug(logs.BlobovniczaTreeRebuildingBlobovniczaSuccess, zap.String("path", db), zap.Uint64("moved_objects_count", movedObjects))
+			filesMovedAcc.Add(1)
+			return nil
+		})
 	}
-	return res, nil
+	err := eg.Wait()
+	res.FilesRemoved += filesMovedAcc.Load()
+	res.ObjectsMoved += movedObjectsAcc.Load()
+	return res, err
 }
 
 func (b *Blobovniczas) getDBsToRebuild(ctx context.Context) ([]string, error) {
diff --git a/pkg/local_object_storage/blobstor/blobovniczatree/rebuild_failover_test.go b/pkg/local_object_storage/blobstor/blobovniczatree/rebuild_failover_test.go
index c9ce564c9..ff86c036a 100644
--- a/pkg/local_object_storage/blobstor/blobovniczatree/rebuild_failover_test.go
+++ b/pkg/local_object_storage/blobstor/blobovniczatree/rebuild_failover_test.go
@@ -4,6 +4,7 @@ import (
 	"bytes"
 	"context"
 	"path/filepath"
+	"sync"
 	"testing"
 
 	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object"
@@ -147,9 +148,11 @@ func testRebuildFailoverValidate(t *testing.T, dir string, obj *objectSDK.Object
 
 	metaStub := &storageIDUpdateStub{
 		storageIDs: make(map[oid.Address][]byte),
+		guard:      &sync.Mutex{},
 	}
 	rRes, err := b.Rebuild(context.Background(), common.RebuildPrm{
-		MetaStorage: metaStub,
+		MetaStorage:   metaStub,
+		WorkerLimiter: &rebuildLimiterStub{},
 	})
 	require.NoError(t, err)
 	require.Equal(t, uint64(1), rRes.ObjectsMoved)
diff --git a/pkg/local_object_storage/blobstor/blobovniczatree/rebuild_test.go b/pkg/local_object_storage/blobstor/blobovniczatree/rebuild_test.go
index 00b38f69a..34006b270 100644
--- a/pkg/local_object_storage/blobstor/blobovniczatree/rebuild_test.go
+++ b/pkg/local_object_storage/blobstor/blobovniczatree/rebuild_test.go
@@ -101,9 +101,11 @@ func testBlobovniczaTreeRebuildHelper(t *testing.T, sourceDepth, sourceWidth, ta
 	metaStub := &storageIDUpdateStub{
 		storageIDs: storageIDs,
+		guard:      &sync.Mutex{},
 	}
 
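
Note (illustrative, not part of this change): once `migrateDBs` calls `UpdateStorageID` from several goroutines, the test stubs' map and counter need a mutex, which is why `guard` is added to them. A minimal sketch of the same idea with hypothetical names (`counterStub`, `Update`); the unguarded version would be flagged by `go test -race`:

```go
package main

import (
	"fmt"
	"sync"
)

// counterStub mimics storageIDUpdateStub: concurrent writers share one mutex.
type counterStub struct {
	mu           sync.Mutex
	storageIDs   map[string][]byte
	updatedCount uint64
}

func (s *counterStub) Update(addr string, id []byte) {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.storageIDs[addr] = id
	s.updatedCount++
}

func main() {
	s := &counterStub{storageIDs: make(map[string][]byte)}
	var wg sync.WaitGroup
	for i := 0; i < 10; i++ {
		i := i
		wg.Add(1)
		go func() {
			defer wg.Done()
			s.Update(fmt.Sprintf("addr-%d", i), []byte{byte(i)})
		}()
	}
	wg.Wait()
	fmt.Println("updates:", s.updatedCount) // always 10 with the mutex in place
}
```
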
 	var rPrm common.RebuildPrm
 	rPrm.MetaStorage = metaStub
+	rPrm.WorkerLimiter = &rebuildLimiterStub{}
 	rRes, err := b.Rebuild(context.Background(), rPrm)
 	require.NoError(t, err)
 	dataMigrated := rRes.ObjectsMoved > 0 || rRes.FilesRemoved > 0 || metaStub.updatedCount > 0
@@ -121,12 +123,21 @@ func testBlobovniczaTreeRebuildHelper(t *testing.T, sourceDepth, sourceWidth, ta
 }
 
 type storageIDUpdateStub struct {
+	guard        *sync.Mutex
 	storageIDs   map[oid.Address][]byte
 	updatedCount uint64
 }
 
 func (s *storageIDUpdateStub) UpdateStorageID(ctx context.Context, addr oid.Address, storageID []byte) error {
+	s.guard.Lock()
+	defer s.guard.Unlock()
+
 	s.storageIDs[addr] = storageID
 	s.updatedCount++
 	return nil
 }
+
+type rebuildLimiterStub struct{}
+
+func (s *rebuildLimiterStub) AcquireWorkSlot(context.Context) error { return nil }
+func (s *rebuildLimiterStub) ReleaseWorkSlot()                      {}
diff --git a/pkg/local_object_storage/blobstor/common/rebuild.go b/pkg/local_object_storage/blobstor/common/rebuild.go
index 896ecbb33..9f629ef8c 100644
--- a/pkg/local_object_storage/blobstor/common/rebuild.go
+++ b/pkg/local_object_storage/blobstor/common/rebuild.go
@@ -12,9 +12,15 @@ type RebuildRes struct {
 }
 
 type RebuildPrm struct {
-	MetaStorage MetaStorage
+	MetaStorage   MetaStorage
+	WorkerLimiter ConcurrentWorkersLimiter
 }
 
 type MetaStorage interface {
 	UpdateStorageID(ctx context.Context, addr oid.Address, storageID []byte) error
 }
+
+type ConcurrentWorkersLimiter interface {
+	AcquireWorkSlot(ctx context.Context) error
+	ReleaseWorkSlot()
+}
diff --git a/pkg/local_object_storage/blobstor/rebuild.go b/pkg/local_object_storage/blobstor/rebuild.go
index 882381dcf..101c60752 100644
--- a/pkg/local_object_storage/blobstor/rebuild.go
+++ b/pkg/local_object_storage/blobstor/rebuild.go
@@ -13,12 +13,18 @@ type StorageIDUpdate interface {
 	UpdateStorageID(ctx context.Context, addr oid.Address, storageID []byte) error
 }
 
-func (b *BlobStor) Rebuild(ctx context.Context, upd StorageIDUpdate) error {
+type ConcurrentWorkersLimiter interface {
+	AcquireWorkSlot(ctx context.Context) error
+	ReleaseWorkSlot()
+}
+
+func (b *BlobStor) Rebuild(ctx context.Context, upd StorageIDUpdate, limiter ConcurrentWorkersLimiter) error {
 	var summary common.RebuildRes
 	var rErr error
 	for _, storage := range b.storage {
 		res, err := storage.Storage.Rebuild(ctx, common.RebuildPrm{
-			MetaStorage: upd,
+			MetaStorage:   upd,
+			WorkerLimiter: limiter,
 		})
 		summary.FilesRemoved += res.FilesRemoved
 		summary.ObjectsMoved += res.ObjectsMoved
diff --git a/pkg/local_object_storage/engine/engine.go b/pkg/local_object_storage/engine/engine.go
index bb0b682d6..6f62be5f7 100644
--- a/pkg/local_object_storage/engine/engine.go
+++ b/pkg/local_object_storage/engine/engine.go
@@ -38,6 +38,7 @@ type StorageEngine struct {
 		err error
 	}
 	evacuateLimiter *evacuationLimiter
+	rebuildLimiter  *rebuildLimiter
 }
 
 type shardWrapper struct {
@@ -213,13 +214,15 @@ type cfg struct {
 	shardPoolSize uint32
 
 	lowMem bool
+
+	rebuildWorkersCount uint32
 }
 
 func defaultCfg() *cfg {
 	return &cfg{
-		log: &logger.Logger{Logger: zap.L()},
-
-		shardPoolSize: 20,
+		log:                 &logger.Logger{Logger: zap.L()},
+		shardPoolSize:       20,
+		rebuildWorkersCount: 100,
 	}
 }
 
@@ -238,6 +241,7 @@ func New(opts ...Option) *StorageEngine {
 		closeCh:         make(chan struct{}),
 		setModeCh:       make(chan setModeRequest),
 		evacuateLimiter: &evacuationLimiter{},
+		rebuildLimiter:  newRebuildLimiter(c.rebuildWorkersCount),
 	}
 }
 
@@ -275,3 +279,10 @@ func WithLowMemoryConsumption(lowMemCons bool) Option {
 		c.lowMem = lowMemCons
 	}
 }
+
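
Note (illustrative sketch, not part of this change): the option added below is how the configured worker count reaches the engine. A hedged wiring example, assuming only the options visible in this patch:

```go
package main

import (
	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/engine"
)

func main() {
	// rebuild_workers_count from the "storage" config section ends up here;
	// when the option is omitted, defaultCfg() keeps the engine default of 100.
	e := engine.New(
		engine.WithShardPoolSize(15),
		engine.WithRebuildWorkersCount(1000),
	)
	_ = e
}
```

New() then builds a single rebuildLimiter from this count and hands it to every shard (see shards.go below), so the limit applies engine-wide rather than per shard.
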
+// WithRebuildWorkersCount returns an option to set the count of concurrent rebuild workers.
+func WithRebuildWorkersCount(count uint32) Option {
+	return func(c *cfg) {
+		c.rebuildWorkersCount = count
+	}
+}
diff --git a/pkg/local_object_storage/engine/rebuild_limiter.go b/pkg/local_object_storage/engine/rebuild_limiter.go
new file mode 100644
index 000000000..28b02b0a3
--- /dev/null
+++ b/pkg/local_object_storage/engine/rebuild_limiter.go
@@ -0,0 +1,26 @@
+package engine
+
+import "context"
+
+type rebuildLimiter struct {
+	semaphore chan struct{}
+}
+
+func newRebuildLimiter(workersCount uint32) *rebuildLimiter {
+	return &rebuildLimiter{
+		semaphore: make(chan struct{}, workersCount),
+	}
+}
+
+func (l *rebuildLimiter) AcquireWorkSlot(ctx context.Context) error {
+	select {
+	case l.semaphore <- struct{}{}:
+		return nil
+	case <-ctx.Done():
+		return ctx.Err()
+	}
+}
+
+func (l *rebuildLimiter) ReleaseWorkSlot() {
+	<-l.semaphore
+}
diff --git a/pkg/local_object_storage/engine/shards.go b/pkg/local_object_storage/engine/shards.go
index 4b9d8752a..71ef8e6fb 100644
--- a/pkg/local_object_storage/engine/shards.go
+++ b/pkg/local_object_storage/engine/shards.go
@@ -110,6 +110,7 @@ func (e *StorageEngine) createShard(ctx context.Context, opts []shard.Option) (*
 		shard.WithExpiredLocksCallback(e.processExpiredLocks),
 		shard.WithDeletedLockCallback(e.processDeletedLocks),
 		shard.WithReportErrorFunc(e.reportShardErrorBackground),
+		shard.WithRebuildWorkerLimiter(e.rebuildLimiter),
 	)...)
 
 	if err := sh.UpdateID(ctx); err != nil {
diff --git a/pkg/local_object_storage/shard/control.go b/pkg/local_object_storage/shard/control.go
index a988683ad..f103ebc2b 100644
--- a/pkg/local_object_storage/shard/control.go
+++ b/pkg/local_object_storage/shard/control.go
@@ -162,7 +162,7 @@ func (s *Shard) Init(ctx context.Context) error {
 
 	s.gc.init(ctx)
 
-	s.rb = newRebuilder()
+	s.rb = newRebuilder(s.rebuildLimiter)
 	s.rb.Start(ctx, s.blobStor, s.metaBase, s.log)
 	return nil
diff --git a/pkg/local_object_storage/shard/rebuild_limiter.go b/pkg/local_object_storage/shard/rebuild_limiter.go
new file mode 100644
index 000000000..efc21837c
--- /dev/null
+++ b/pkg/local_object_storage/shard/rebuild_limiter.go
@@ -0,0 +1,13 @@
+package shard
+
+import "context"
+
+type RebuildWorkerLimiter interface {
+	AcquireWorkSlot(ctx context.Context) error
+	ReleaseWorkSlot()
+}
+
+type noopRebuildLimiter struct{}
+
+func (l *noopRebuildLimiter) AcquireWorkSlot(context.Context) error { return nil }
+func (l *noopRebuildLimiter) ReleaseWorkSlot()                      {}
diff --git a/pkg/local_object_storage/shard/rebuilder.go b/pkg/local_object_storage/shard/rebuilder.go
index f46488c82..1ae90869b 100644
--- a/pkg/local_object_storage/shard/rebuilder.go
+++ b/pkg/local_object_storage/shard/rebuilder.go
@@ -14,16 +14,18 @@ import (
 )
 
 type rebuilder struct {
-	mtx    *sync.Mutex
-	wg     *sync.WaitGroup
-	cancel func()
+	mtx     *sync.Mutex
+	wg      *sync.WaitGroup
+	cancel  func()
+	limiter RebuildWorkerLimiter
 }
 
-func newRebuilder() *rebuilder {
+func newRebuilder(l RebuildWorkerLimiter) *rebuilder {
 	return &rebuilder{
-		mtx:    &sync.Mutex{},
-		wg:     &sync.WaitGroup{},
-		cancel: nil,
+		mtx:     &sync.Mutex{},
+		wg:      &sync.WaitGroup{},
+		cancel:  nil,
+		limiter: l,
 	}
 }
 
@@ -45,7 +47,7 @@ func (r *rebuilder) start(ctx context.Context, bs *blobstor.BlobStor, mb *meta.D
 		defer r.wg.Done()
 		log.Info(logs.BlobstoreRebuildStarted)
-		if err := bs.Rebuild(ctx, &mbStorageIDUpdate{mb: mb}); err != nil {
+		if err := bs.Rebuild(ctx, &mbStorageIDUpdate{mb: mb}, r.limiter); err != nil {
 			log.Warn(logs.FailedToRebuildBlobstore,
 				zap.Error(err))
 		} else {
 			log.Info(logs.BlobstoreRebuildCompletedSuccessfully)
diff --git a/pkg/local_object_storage/shard/shard.go b/pkg/local_object_storage/shard/shard.go
index 3edf50731..2aa01469b 100644
--- a/pkg/local_object_storage/shard/shard.go
+++ b/pkg/local_object_storage/shard/shard.go
@@ -123,6 +123,8 @@ type cfg struct {
 	metricsWriter MetricsWriter
 
 	reportErrorFunc func(selfID string, message string, err error)
+
+	rebuildLimiter RebuildWorkerLimiter
 }
 
 func defaultCfg() *cfg {
@@ -131,6 +133,7 @@ func defaultCfg() *cfg {
 		log:             &logger.Logger{Logger: zap.L()},
 		gcCfg:           defaultGCCfg(),
 		reportErrorFunc: func(string, string, error) {},
+		rebuildLimiter:  &noopRebuildLimiter{},
 	}
 }
 
@@ -368,6 +371,14 @@ func WithExpiredCollectorWorkersCount(count int) Option {
 	}
 }
 
+// WithRebuildWorkerLimiter returns an option to set the limiter of concurrent
+// workers for the storage rebuild operation.
+func WithRebuildWorkerLimiter(l RebuildWorkerLimiter) Option {
+	return func(c *cfg) {
+		c.rebuildLimiter = l
+	}
+}
+
 func (s *Shard) fillInfo() {
 	s.cfg.info.MetaBaseInfo = s.metaBase.DumpInfo()
 	s.cfg.info.BlobStorInfo = s.blobStor.DumpInfo()
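
Note (illustrative sketch, not part of this change): outside the engine, a shard can be given any RebuildWorkerLimiter implementation via the option added above; omitting it keeps the noopRebuildLimiter default, i.e. unbounded rebuild concurrency. A minimal example with a hypothetical channel-based limiter, assuming shard.New(opts ...Option) as used elsewhere in this codebase:

```go
package main

import (
	"context"

	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard"
)

// sharedLimiter is a hypothetical stand-in for the engine's rebuildLimiter:
// a buffered channel used as a counting semaphore.
type sharedLimiter struct{ sem chan struct{} }

func (l *sharedLimiter) AcquireWorkSlot(ctx context.Context) error {
	select {
	case l.sem <- struct{}{}:
		return nil
	case <-ctx.Done():
		return ctx.Err()
	}
}

func (l *sharedLimiter) ReleaseWorkSlot() { <-l.sem }

func main() {
	lim := &sharedLimiter{sem: make(chan struct{}, 100)}

	// Every shard receives the same limiter, so the 100-worker budget is
	// shared across all shards instead of being multiplied per shard.
	sh := shard.New(shard.WithRebuildWorkerLimiter(lim))
	_ = sh
}
```

Passing a cancelled context to AcquireWorkSlot returns ctx.Err() instead of blocking, which is what lets an in-progress Rebuild stop scheduling new databases early.
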