[#661] blobovniczatree: Make Rebuild concurrent

Different DBs can be rebuild concurrently.

Signed-off-by: Dmitrii Stepanov <d.stepanov@yadro.com>
This commit is contained in:
Dmitrii Stepanov 2023-10-03 11:58:35 +03:00
parent f325b9557c
commit 888f966eb4
18 changed files with 165 additions and 27 deletions

View file

@ -101,6 +101,7 @@ type applicationConfiguration struct {
shardPoolSize uint32 shardPoolSize uint32
shards []shardCfg shards []shardCfg
lowMem bool lowMem bool
rebuildWorkers uint32
} }
} }
@ -208,6 +209,7 @@ func (a *applicationConfiguration) readConfig(c *config.Config) error {
a.EngineCfg.errorThreshold = engineconfig.ShardErrorThreshold(c) a.EngineCfg.errorThreshold = engineconfig.ShardErrorThreshold(c)
a.EngineCfg.shardPoolSize = engineconfig.ShardPoolSize(c) a.EngineCfg.shardPoolSize = engineconfig.ShardPoolSize(c)
a.EngineCfg.lowMem = engineconfig.EngineLowMemoryConsumption(c) a.EngineCfg.lowMem = engineconfig.EngineLowMemoryConsumption(c)
a.EngineCfg.rebuildWorkers = engineconfig.EngineRebuildWorkersCount(c)
return engineconfig.IterateShards(c, false, func(sc *shardconfig.Config) error { return a.updateShardConfig(c, sc) }) return engineconfig.IterateShards(c, false, func(sc *shardconfig.Config) error { return a.updateShardConfig(c, sc) })
} }
@ -687,13 +689,14 @@ func initCfgObject(appCfg *config.Config) cfgObject {
} }
func (c *cfg) engineOpts() []engine.Option { func (c *cfg) engineOpts() []engine.Option {
opts := make([]engine.Option, 0, 4) var opts []engine.Option
opts = append(opts, opts = append(opts,
engine.WithShardPoolSize(c.EngineCfg.shardPoolSize), engine.WithShardPoolSize(c.EngineCfg.shardPoolSize),
engine.WithErrorThreshold(c.EngineCfg.errorThreshold), engine.WithErrorThreshold(c.EngineCfg.errorThreshold),
engine.WithLogger(c.log), engine.WithLogger(c.log),
engine.WithLowMemoryConsumption(c.EngineCfg.lowMem), engine.WithLowMemoryConsumption(c.EngineCfg.lowMem),
engine.WithRebuildWorkersCount(c.EngineCfg.rebuildWorkers),
) )
if c.metricsCollector != nil { if c.metricsCollector != nil {

View file

@ -15,6 +15,9 @@ const (
// ShardPoolSizeDefault is a default value of routine pool size per-shard to // ShardPoolSizeDefault is a default value of routine pool size per-shard to
// process object PUT operations in a storage engine. // process object PUT operations in a storage engine.
ShardPoolSizeDefault = 20 ShardPoolSizeDefault = 20
// RebuildWorkersCountDefault is a default value of the workers count to
// process storage rebuild operations in a storage engine.
RebuildWorkersCountDefault = 100
) )
// ErrNoShardConfigured is returned when at least 1 shard is required but none are found. // ErrNoShardConfigured is returned when at least 1 shard is required but none are found.
@ -88,3 +91,11 @@ func ShardErrorThreshold(c *config.Config) uint32 {
func EngineLowMemoryConsumption(c *config.Config) bool { func EngineLowMemoryConsumption(c *config.Config) bool {
return config.BoolSafe(c.Sub(subsection), "low_mem") return config.BoolSafe(c.Sub(subsection), "low_mem")
} }
// EngineRebuildWorkersCount returns value of "rebuild_workers_count" config parmeter from "storage" section.
func EngineRebuildWorkersCount(c *config.Config) uint32 {
if v := config.Uint32Safe(c.Sub(subsection), "rebuild_workers_count"); v > 0 {
return v
}
return RebuildWorkersCountDefault
}

View file

@ -38,6 +38,7 @@ func TestEngineSection(t *testing.T) {
require.EqualValues(t, 0, engineconfig.ShardErrorThreshold(empty)) require.EqualValues(t, 0, engineconfig.ShardErrorThreshold(empty))
require.EqualValues(t, engineconfig.ShardPoolSizeDefault, engineconfig.ShardPoolSize(empty)) require.EqualValues(t, engineconfig.ShardPoolSizeDefault, engineconfig.ShardPoolSize(empty))
require.EqualValues(t, mode.ReadWrite, shardconfig.From(empty).Mode()) require.EqualValues(t, mode.ReadWrite, shardconfig.From(empty).Mode())
require.EqualValues(t, engineconfig.RebuildWorkersCountDefault, engineconfig.EngineRebuildWorkersCount(empty))
}) })
const path = "../../../../config/example/node" const path = "../../../../config/example/node"
@ -47,6 +48,7 @@ func TestEngineSection(t *testing.T) {
require.EqualValues(t, 100, engineconfig.ShardErrorThreshold(c)) require.EqualValues(t, 100, engineconfig.ShardErrorThreshold(c))
require.EqualValues(t, 15, engineconfig.ShardPoolSize(c)) require.EqualValues(t, 15, engineconfig.ShardPoolSize(c))
require.EqualValues(t, uint32(1000), engineconfig.EngineRebuildWorkersCount(c))
err := engineconfig.IterateShards(c, true, func(sc *shardconfig.Config) error { err := engineconfig.IterateShards(c, true, func(sc *shardconfig.Config) error {
defer func() { defer func() {

View file

@ -92,6 +92,7 @@ FROSTFS_OBJECT_DELETE_TOMBSTONE_LIFETIME=10
# Storage engine section # Storage engine section
FROSTFS_STORAGE_SHARD_POOL_SIZE=15 FROSTFS_STORAGE_SHARD_POOL_SIZE=15
FROSTFS_STORAGE_SHARD_RO_ERROR_THRESHOLD=100 FROSTFS_STORAGE_SHARD_RO_ERROR_THRESHOLD=100
FROSTFS_STORAGE_REBUILD_WORKERS_COUNT=1000
## 0 shard ## 0 shard
### Flag to refill Metabase from BlobStor ### Flag to refill Metabase from BlobStor
FROSTFS_STORAGE_SHARD_0_RESYNC_METABASE=false FROSTFS_STORAGE_SHARD_0_RESYNC_METABASE=false

View file

@ -137,6 +137,7 @@
"storage": { "storage": {
"shard_pool_size": 15, "shard_pool_size": 15,
"shard_ro_error_threshold": 100, "shard_ro_error_threshold": 100,
"rebuild_workers_count": 1000,
"shard": { "shard": {
"0": { "0": {
"mode": "read-only", "mode": "read-only",

View file

@ -116,6 +116,7 @@ storage:
# note: shard configuration can be omitted for relay node (see `node.relay`) # note: shard configuration can be omitted for relay node (see `node.relay`)
shard_pool_size: 15 # size of per-shard worker pools used for PUT operations shard_pool_size: 15 # size of per-shard worker pools used for PUT operations
shard_ro_error_threshold: 100 # amount of errors to occur before shard is made read-only (default: 0, ignore errors) shard_ro_error_threshold: 100 # amount of errors to occur before shard is made read-only (default: 0, ignore errors)
rebuild_workers_count: 1000 # count of rebuild storage concurrent workers
shard: shard:
default: # section with the default shard parameters default: # section with the default shard parameters

View file

@ -6,6 +6,7 @@ import (
"os" "os"
"path/filepath" "path/filepath"
"sync" "sync"
"sync/atomic"
"time" "time"
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs" "git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
@ -14,6 +15,7 @@ import (
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client" "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client"
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id" oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
"go.uber.org/zap" "go.uber.org/zap"
"golang.org/x/sync/errgroup"
) )
var errRebuildInProgress = errors.New("rebuild is in progress, the operation cannot be performed") var errRebuildInProgress = errors.New("rebuild is in progress, the operation cannot be performed")
@ -55,21 +57,48 @@ func (b *Blobovniczas) Rebuild(ctx context.Context, prm common.RebuildPrm) (comm
success = false success = false
return res, err return res, err
} }
b.log.Info(logs.BlobovniczaTreeCollectingDBToRebuildSuccess, zap.Int("blobovniczas_to_rebuild", len(dbsToMigrate))) b.log.Info(logs.BlobovniczaTreeCollectingDBToRebuildSuccess, zap.Int("blobovniczas_to_rebuild", len(dbsToMigrate)))
for _, db := range dbsToMigrate { res, err = b.migrateDBs(ctx, dbsToMigrate, prm, res)
b.log.Debug(logs.BlobovniczaTreeRebuildingBlobovnicza, zap.String("path", db)) if err != nil {
movedObjects, err := b.rebuildDB(ctx, db, prm.MetaStorage) success = false
res.ObjectsMoved += movedObjects }
if err != nil { return res, err
b.log.Warn(logs.BlobovniczaTreeRebuildingBlobovniczaFailed, zap.String("path", db), zap.Uint64("moved_objects_count", movedObjects), zap.Error(err)) }
success = false
func (b *Blobovniczas) migrateDBs(ctx context.Context, dbs []string, prm common.RebuildPrm, res common.RebuildRes) (common.RebuildRes, error) {
eg, ctx := errgroup.WithContext(ctx)
var movedObjectsAcc atomic.Uint64
var filesMovedAcc atomic.Uint64
for _, db := range dbs {
db := db
if err := prm.WorkerLimiter.AcquireWorkSlot(ctx); err != nil {
_ = eg.Wait()
res.FilesRemoved += filesMovedAcc.Load()
res.ObjectsMoved += movedObjectsAcc.Load()
return res, err return res, err
} }
b.log.Debug(logs.BlobovniczaTreeRebuildingBlobovniczaSuccess, zap.String("path", db), zap.Uint64("moved_objects_count", movedObjects)) eg.Go(func() error {
res.FilesRemoved++ defer prm.WorkerLimiter.ReleaseWorkSlot()
b.log.Debug(logs.BlobovniczaTreeRebuildingBlobovnicza, zap.String("path", db))
movedObjects, err := b.rebuildDB(ctx, db, prm.MetaStorage)
movedObjectsAcc.Add(movedObjects)
if err != nil {
b.log.Warn(logs.BlobovniczaTreeRebuildingBlobovniczaFailed, zap.String("path", db), zap.Uint64("moved_objects_count", movedObjects), zap.Error(err))
return err
}
b.log.Debug(logs.BlobovniczaTreeRebuildingBlobovniczaSuccess, zap.String("path", db), zap.Uint64("moved_objects_count", movedObjects))
filesMovedAcc.Add(1)
return nil
})
} }
return res, nil err := eg.Wait()
res.FilesRemoved += filesMovedAcc.Load()
res.ObjectsMoved += movedObjectsAcc.Load()
return res, err
} }
func (b *Blobovniczas) getDBsToRebuild(ctx context.Context) ([]string, error) { func (b *Blobovniczas) getDBsToRebuild(ctx context.Context) ([]string, error) {

View file

@ -4,6 +4,7 @@ import (
"bytes" "bytes"
"context" "context"
"path/filepath" "path/filepath"
"sync"
"testing" "testing"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object"
@ -147,9 +148,11 @@ func testRebuildFailoverValidate(t *testing.T, dir string, obj *objectSDK.Object
metaStub := &storageIDUpdateStub{ metaStub := &storageIDUpdateStub{
storageIDs: make(map[oid.Address][]byte), storageIDs: make(map[oid.Address][]byte),
guard: &sync.Mutex{},
} }
rRes, err := b.Rebuild(context.Background(), common.RebuildPrm{ rRes, err := b.Rebuild(context.Background(), common.RebuildPrm{
MetaStorage: metaStub, MetaStorage: metaStub,
WorkerLimiter: &rebuildLimiterStub{},
}) })
require.NoError(t, err) require.NoError(t, err)
require.Equal(t, uint64(1), rRes.ObjectsMoved) require.Equal(t, uint64(1), rRes.ObjectsMoved)

View file

@ -101,9 +101,11 @@ func testBlobovniczaTreeRebuildHelper(t *testing.T, sourceDepth, sourceWidth, ta
metaStub := &storageIDUpdateStub{ metaStub := &storageIDUpdateStub{
storageIDs: storageIDs, storageIDs: storageIDs,
guard: &sync.Mutex{},
} }
var rPrm common.RebuildPrm var rPrm common.RebuildPrm
rPrm.MetaStorage = metaStub rPrm.MetaStorage = metaStub
rPrm.WorkerLimiter = &rebuildLimiterStub{}
rRes, err := b.Rebuild(context.Background(), rPrm) rRes, err := b.Rebuild(context.Background(), rPrm)
require.NoError(t, err) require.NoError(t, err)
dataMigrated := rRes.ObjectsMoved > 0 || rRes.FilesRemoved > 0 || metaStub.updatedCount > 0 dataMigrated := rRes.ObjectsMoved > 0 || rRes.FilesRemoved > 0 || metaStub.updatedCount > 0
@ -121,12 +123,21 @@ func testBlobovniczaTreeRebuildHelper(t *testing.T, sourceDepth, sourceWidth, ta
} }
type storageIDUpdateStub struct { type storageIDUpdateStub struct {
guard *sync.Mutex
storageIDs map[oid.Address][]byte storageIDs map[oid.Address][]byte
updatedCount uint64 updatedCount uint64
} }
func (s *storageIDUpdateStub) UpdateStorageID(ctx context.Context, addr oid.Address, storageID []byte) error { func (s *storageIDUpdateStub) UpdateStorageID(ctx context.Context, addr oid.Address, storageID []byte) error {
s.guard.Lock()
defer s.guard.Unlock()
s.storageIDs[addr] = storageID s.storageIDs[addr] = storageID
s.updatedCount++ s.updatedCount++
return nil return nil
} }
type rebuildLimiterStub struct{}
func (s *rebuildLimiterStub) AcquireWorkSlot(context.Context) error { return nil }
func (s *rebuildLimiterStub) ReleaseWorkSlot() {}

View file

@ -12,9 +12,15 @@ type RebuildRes struct {
} }
type RebuildPrm struct { type RebuildPrm struct {
MetaStorage MetaStorage MetaStorage MetaStorage
WorkerLimiter ConcurrentWorkersLimiter
} }
type MetaStorage interface { type MetaStorage interface {
UpdateStorageID(ctx context.Context, addr oid.Address, storageID []byte) error UpdateStorageID(ctx context.Context, addr oid.Address, storageID []byte) error
} }
type ConcurrentWorkersLimiter interface {
AcquireWorkSlot(ctx context.Context) error
ReleaseWorkSlot()
}

View file

@ -13,12 +13,18 @@ type StorageIDUpdate interface {
UpdateStorageID(ctx context.Context, addr oid.Address, storageID []byte) error UpdateStorageID(ctx context.Context, addr oid.Address, storageID []byte) error
} }
func (b *BlobStor) Rebuild(ctx context.Context, upd StorageIDUpdate) error { type ConcurrentWorkersLimiter interface {
AcquireWorkSlot(ctx context.Context) error
ReleaseWorkSlot()
}
func (b *BlobStor) Rebuild(ctx context.Context, upd StorageIDUpdate, limiter ConcurrentWorkersLimiter) error {
var summary common.RebuildRes var summary common.RebuildRes
var rErr error var rErr error
for _, storage := range b.storage { for _, storage := range b.storage {
res, err := storage.Storage.Rebuild(ctx, common.RebuildPrm{ res, err := storage.Storage.Rebuild(ctx, common.RebuildPrm{
MetaStorage: upd, MetaStorage: upd,
WorkerLimiter: limiter,
}) })
summary.FilesRemoved += res.FilesRemoved summary.FilesRemoved += res.FilesRemoved
summary.ObjectsMoved += res.ObjectsMoved summary.ObjectsMoved += res.ObjectsMoved

View file

@ -38,6 +38,7 @@ type StorageEngine struct {
err error err error
} }
evacuateLimiter *evacuationLimiter evacuateLimiter *evacuationLimiter
rebuildLimiter *rebuildLimiter
} }
type shardWrapper struct { type shardWrapper struct {
@ -213,13 +214,15 @@ type cfg struct {
shardPoolSize uint32 shardPoolSize uint32
lowMem bool lowMem bool
rebuildWorkersCount uint32
} }
func defaultCfg() *cfg { func defaultCfg() *cfg {
return &cfg{ return &cfg{
log: &logger.Logger{Logger: zap.L()}, log: &logger.Logger{Logger: zap.L()},
shardPoolSize: 20,
shardPoolSize: 20, rebuildWorkersCount: 100,
} }
} }
@ -238,6 +241,7 @@ func New(opts ...Option) *StorageEngine {
closeCh: make(chan struct{}), closeCh: make(chan struct{}),
setModeCh: make(chan setModeRequest), setModeCh: make(chan setModeRequest),
evacuateLimiter: &evacuationLimiter{}, evacuateLimiter: &evacuationLimiter{},
rebuildLimiter: newRebuildLimiter(c.rebuildWorkersCount),
} }
} }
@ -275,3 +279,10 @@ func WithLowMemoryConsumption(lowMemCons bool) Option {
c.lowMem = lowMemCons c.lowMem = lowMemCons
} }
} }
// WithRebuildWorkersCount returns an option to set the count of concurrent rebuild workers.
func WithRebuildWorkersCount(count uint32) Option {
return func(c *cfg) {
c.rebuildWorkersCount = count
}
}

View file

@ -0,0 +1,26 @@
package engine
import "context"
type rebuildLimiter struct {
semaphore chan struct{}
}
func newRebuildLimiter(workersCount uint32) *rebuildLimiter {
return &rebuildLimiter{
semaphore: make(chan struct{}, workersCount),
}
}
func (l *rebuildLimiter) AcquireWorkSlot(ctx context.Context) error {
select {
case l.semaphore <- struct{}{}:
return nil
case <-ctx.Done():
return ctx.Err()
}
}
func (l *rebuildLimiter) ReleaseWorkSlot() {
<-l.semaphore
}

View file

@ -110,6 +110,7 @@ func (e *StorageEngine) createShard(ctx context.Context, opts []shard.Option) (*
shard.WithExpiredLocksCallback(e.processExpiredLocks), shard.WithExpiredLocksCallback(e.processExpiredLocks),
shard.WithDeletedLockCallback(e.processDeletedLocks), shard.WithDeletedLockCallback(e.processDeletedLocks),
shard.WithReportErrorFunc(e.reportShardErrorBackground), shard.WithReportErrorFunc(e.reportShardErrorBackground),
shard.WithRebuildWorkerLimiter(e.rebuildLimiter),
)...) )...)
if err := sh.UpdateID(ctx); err != nil { if err := sh.UpdateID(ctx); err != nil {

View file

@ -162,7 +162,7 @@ func (s *Shard) Init(ctx context.Context) error {
s.gc.init(ctx) s.gc.init(ctx)
s.rb = newRebuilder() s.rb = newRebuilder(s.rebuildLimiter)
s.rb.Start(ctx, s.blobStor, s.metaBase, s.log) s.rb.Start(ctx, s.blobStor, s.metaBase, s.log)
return nil return nil

View file

@ -0,0 +1,13 @@
package shard
import "context"
type RebuildWorkerLimiter interface {
AcquireWorkSlot(ctx context.Context) error
ReleaseWorkSlot()
}
type noopRebuildLimiter struct{}
func (l *noopRebuildLimiter) AcquireWorkSlot(context.Context) error { return nil }
func (l *noopRebuildLimiter) ReleaseWorkSlot() {}

View file

@ -14,16 +14,18 @@ import (
) )
type rebuilder struct { type rebuilder struct {
mtx *sync.Mutex mtx *sync.Mutex
wg *sync.WaitGroup wg *sync.WaitGroup
cancel func() cancel func()
limiter RebuildWorkerLimiter
} }
func newRebuilder() *rebuilder { func newRebuilder(l RebuildWorkerLimiter) *rebuilder {
return &rebuilder{ return &rebuilder{
mtx: &sync.Mutex{}, mtx: &sync.Mutex{},
wg: &sync.WaitGroup{}, wg: &sync.WaitGroup{},
cancel: nil, cancel: nil,
limiter: l,
} }
} }
@ -45,7 +47,7 @@ func (r *rebuilder) start(ctx context.Context, bs *blobstor.BlobStor, mb *meta.D
defer r.wg.Done() defer r.wg.Done()
log.Info(logs.BlobstoreRebuildStarted) log.Info(logs.BlobstoreRebuildStarted)
if err := bs.Rebuild(ctx, &mbStorageIDUpdate{mb: mb}); err != nil { if err := bs.Rebuild(ctx, &mbStorageIDUpdate{mb: mb}, r.limiter); err != nil {
log.Warn(logs.FailedToRebuildBlobstore, zap.Error(err)) log.Warn(logs.FailedToRebuildBlobstore, zap.Error(err))
} else { } else {
log.Info(logs.BlobstoreRebuildCompletedSuccessfully) log.Info(logs.BlobstoreRebuildCompletedSuccessfully)

View file

@ -123,6 +123,8 @@ type cfg struct {
metricsWriter MetricsWriter metricsWriter MetricsWriter
reportErrorFunc func(selfID string, message string, err error) reportErrorFunc func(selfID string, message string, err error)
rebuildLimiter RebuildWorkerLimiter
} }
func defaultCfg() *cfg { func defaultCfg() *cfg {
@ -131,6 +133,7 @@ func defaultCfg() *cfg {
log: &logger.Logger{Logger: zap.L()}, log: &logger.Logger{Logger: zap.L()},
gcCfg: defaultGCCfg(), gcCfg: defaultGCCfg(),
reportErrorFunc: func(string, string, error) {}, reportErrorFunc: func(string, string, error) {},
rebuildLimiter: &noopRebuildLimiter{},
} }
} }
@ -368,6 +371,14 @@ func WithExpiredCollectorWorkersCount(count int) Option {
} }
} }
// WithRebuildWorkerLimiter return option to set concurrent
// workers count of storage rebuild operation.
func WithRebuildWorkerLimiter(l RebuildWorkerLimiter) Option {
return func(c *cfg) {
c.rebuildLimiter = l
}
}
func (s *Shard) fillInfo() { func (s *Shard) fillInfo() {
s.cfg.info.MetaBaseInfo = s.metaBase.DumpInfo() s.cfg.info.MetaBaseInfo = s.metaBase.DumpInfo()
s.cfg.info.BlobStorInfo = s.blobStor.DumpInfo() s.cfg.info.BlobStorInfo = s.blobStor.DumpInfo()