From ceff5e1f6a08c12d84a04fcfa2e23f097bfa97fa Mon Sep 17 00:00:00 2001 From: Dmitrii Stepanov Date: Thu, 6 Feb 2025 17:24:23 +0300 Subject: [PATCH] [#1636] storage: Refactor shard rebuild Drop redundant interfaces. Rename fields. Signed-off-by: Dmitrii Stepanov --- .../blobstor/blobovniczatree/rebuild.go | 13 ++--- .../blobovniczatree/rebuild_failover_test.go | 6 +-- .../blobstor/blobovniczatree/rebuild_test.go | 33 ++++++------- .../blobstor/common/rebuild.go | 17 ++++--- pkg/local_object_storage/blobstor/rebuild.go | 13 ++--- pkg/local_object_storage/engine/rebuild.go | 22 ++++++++- pkg/local_object_storage/shard/rebuild.go | 47 ++++--------------- 7 files changed, 72 insertions(+), 79 deletions(-) diff --git a/pkg/local_object_storage/blobstor/blobovniczatree/rebuild.go b/pkg/local_object_storage/blobstor/blobovniczatree/rebuild.go index 16ef2b180..cbd45c3b4 100644 --- a/pkg/local_object_storage/blobstor/blobovniczatree/rebuild.go +++ b/pkg/local_object_storage/blobstor/blobovniczatree/rebuild.go @@ -79,7 +79,7 @@ func (b *Blobovniczas) migrateDBs(ctx context.Context, dbs []string, prm common. var completedDBCount uint32 for _, db := range dbs { b.log.Debug(ctx, logs.BlobovniczaTreeRebuildingBlobovnicza, zap.String("path", db)) - movedObjects, err := b.rebuildDB(ctx, db, prm.MetaStorage, prm.WorkerLimiter) + movedObjects, err := b.rebuildDB(ctx, db, prm.MetaStorage, prm.Limiter) res.ObjectsMoved += movedObjects if err != nil { b.log.Warn(ctx, logs.BlobovniczaTreeRebuildingBlobovniczaFailed, zap.String("path", db), zap.Uint64("moved_objects_count", movedObjects), zap.Error(err)) @@ -195,7 +195,7 @@ func (b *Blobovniczas) rebuildBySize(ctx context.Context, path string, targetFil return fp < targetFillPercent || fp > 100+(100-targetFillPercent), nil } -func (b *Blobovniczas) rebuildDB(ctx context.Context, path string, meta common.MetaStorage, limiter common.ConcurrentWorkersLimiter) (uint64, error) { +func (b *Blobovniczas) rebuildDB(ctx context.Context, path string, meta common.MetaStorage, concLimiter common.RebuildLimiter) (uint64, error) { shDB := b.getBlobovnicza(ctx, path) blz, err := shDB.Open(ctx) if err != nil { @@ -212,7 +212,7 @@ func (b *Blobovniczas) rebuildDB(ctx context.Context, path string, meta common.M if err != nil { return 0, err } - migratedObjects, err := b.moveObjects(ctx, blz, shDB.SystemPath(), meta, limiter) + migratedObjects, err := b.moveObjects(ctx, blz, shDB.SystemPath(), meta, concLimiter) if err != nil { return migratedObjects, err } @@ -238,7 +238,7 @@ func (b *Blobovniczas) addRebuildTempFile(ctx context.Context, path string) (fun }, nil } -func (b *Blobovniczas) moveObjects(ctx context.Context, blz *blobovnicza.Blobovnicza, blzPath string, meta common.MetaStorage, limiter common.ConcurrentWorkersLimiter) (uint64, error) { +func (b *Blobovniczas) moveObjects(ctx context.Context, blz *blobovnicza.Blobovnicza, blzPath string, meta common.MetaStorage, concLimiter common.RebuildLimiter) (uint64, error) { var result atomic.Uint64 batch := make(map[oid.Address][]byte) @@ -265,12 +265,13 @@ func (b *Blobovniczas) moveObjects(ctx context.Context, blz *blobovnicza.Blobovn eg, egCtx := errgroup.WithContext(ctx) for addr, data := range batch { - if err := limiter.AcquireWorkSlot(egCtx); err != nil { + release, err := concLimiter.AcquireWorkSlot(egCtx) + if err != nil { _ = eg.Wait() return result.Load(), err } eg.Go(func() error { - defer limiter.ReleaseWorkSlot() + defer release() err := b.moveObject(egCtx, blz, blzPath, addr, data, meta) if err == nil { 
result.Add(1) diff --git a/pkg/local_object_storage/blobstor/blobovniczatree/rebuild_failover_test.go b/pkg/local_object_storage/blobstor/blobovniczatree/rebuild_failover_test.go index 2f58624aa..91578d5e8 100644 --- a/pkg/local_object_storage/blobstor/blobovniczatree/rebuild_failover_test.go +++ b/pkg/local_object_storage/blobstor/blobovniczatree/rebuild_failover_test.go @@ -162,9 +162,9 @@ func testRebuildFailoverValidate(t *testing.T, dir string, obj *objectSDK.Object guard: &sync.Mutex{}, } rRes, err := b.Rebuild(context.Background(), common.RebuildPrm{ - MetaStorage: metaStub, - WorkerLimiter: &rebuildLimiterStub{}, - FillPercent: 1, + MetaStorage: metaStub, + Limiter: &rebuildLimiterStub{}, + FillPercent: 1, }) require.NoError(t, err) require.Equal(t, uint64(1), rRes.ObjectsMoved) diff --git a/pkg/local_object_storage/blobstor/blobovniczatree/rebuild_test.go b/pkg/local_object_storage/blobstor/blobovniczatree/rebuild_test.go index aae72b5ff..e26c485ba 100644 --- a/pkg/local_object_storage/blobstor/blobovniczatree/rebuild_test.go +++ b/pkg/local_object_storage/blobstor/blobovniczatree/rebuild_test.go @@ -77,9 +77,9 @@ func TestBlobovniczaTreeFillPercentRebuild(t *testing.T) { guard: &sync.Mutex{}, } rRes, err := b.Rebuild(context.Background(), common.RebuildPrm{ - MetaStorage: metaStub, - WorkerLimiter: &rebuildLimiterStub{}, - FillPercent: 60, + MetaStorage: metaStub, + Limiter: &rebuildLimiterStub{}, + FillPercent: 60, }) require.NoError(t, err) dataMigrated := rRes.ObjectsMoved > 0 || rRes.FilesRemoved > 0 || metaStub.updatedCount > 0 @@ -129,9 +129,9 @@ func TestBlobovniczaTreeFillPercentRebuild(t *testing.T) { guard: &sync.Mutex{}, } rRes, err := b.Rebuild(context.Background(), common.RebuildPrm{ - MetaStorage: metaStub, - WorkerLimiter: &rebuildLimiterStub{}, - FillPercent: 90, // 64KB / 100KB = 64% + MetaStorage: metaStub, + Limiter: &rebuildLimiterStub{}, + FillPercent: 90, // 64KB / 100KB = 64% }) require.NoError(t, err) dataMigrated := rRes.ObjectsMoved > 0 || rRes.FilesRemoved > 0 || metaStub.updatedCount > 0 @@ -194,9 +194,9 @@ func TestBlobovniczaTreeFillPercentRebuild(t *testing.T) { guard: &sync.Mutex{}, } rRes, err := b.Rebuild(context.Background(), common.RebuildPrm{ - MetaStorage: metaStub, - WorkerLimiter: &rebuildLimiterStub{}, - FillPercent: 80, + MetaStorage: metaStub, + Limiter: &rebuildLimiterStub{}, + FillPercent: 80, }) require.NoError(t, err) require.Equal(t, uint64(49), rRes.FilesRemoved) @@ -267,9 +267,9 @@ func TestBlobovniczaTreeFillPercentRebuild(t *testing.T) { require.NoError(t, b.Init()) rRes, err := b.Rebuild(context.Background(), common.RebuildPrm{ - MetaStorage: metaStub, - WorkerLimiter: &rebuildLimiterStub{}, - FillPercent: 80, + MetaStorage: metaStub, + Limiter: &rebuildLimiterStub{}, + FillPercent: 80, }) require.NoError(t, err) require.Equal(t, uint64(49), rRes.FilesRemoved) @@ -340,7 +340,7 @@ func TestBlobovniczaTreeRebuildLargeObject(t *testing.T) { } var rPrm common.RebuildPrm rPrm.MetaStorage = metaStub - rPrm.WorkerLimiter = &rebuildLimiterStub{} + rPrm.Limiter = &rebuildLimiterStub{} rPrm.FillPercent = 1 rRes, err := b.Rebuild(context.Background(), rPrm) require.NoError(t, err) @@ -429,7 +429,7 @@ func testBlobovniczaTreeRebuildHelper(t *testing.T, sourceDepth, sourceWidth, ta } var rPrm common.RebuildPrm rPrm.MetaStorage = metaStub - rPrm.WorkerLimiter = &rebuildLimiterStub{} + rPrm.Limiter = &rebuildLimiterStub{} rPrm.FillPercent = 1 rRes, err := b.Rebuild(context.Background(), rPrm) require.NoError(t, err) @@ -464,5 +464,6 @@ func 
(s *storageIDUpdateStub) UpdateStorageID(ctx context.Context, addr oid.Addr type rebuildLimiterStub struct{} -func (s *rebuildLimiterStub) AcquireWorkSlot(context.Context) error { return nil } -func (s *rebuildLimiterStub) ReleaseWorkSlot() {} +func (s *rebuildLimiterStub) AcquireWorkSlot(context.Context) (common.ReleaseFunc, error) { + return func() {}, nil +} diff --git a/pkg/local_object_storage/blobstor/common/rebuild.go b/pkg/local_object_storage/blobstor/common/rebuild.go index 19e181ee7..4615190f7 100644 --- a/pkg/local_object_storage/blobstor/common/rebuild.go +++ b/pkg/local_object_storage/blobstor/common/rebuild.go @@ -12,16 +12,21 @@ type RebuildRes struct { } type RebuildPrm struct { - MetaStorage MetaStorage - WorkerLimiter ConcurrentWorkersLimiter - FillPercent int + MetaStorage MetaStorage + Limiter RebuildLimiter + FillPercent int } type MetaStorage interface { UpdateStorageID(ctx context.Context, addr oid.Address, storageID []byte) error } -type ConcurrentWorkersLimiter interface { - AcquireWorkSlot(ctx context.Context) error - ReleaseWorkSlot() +type ReleaseFunc func() + +type ConcurrencyLimiter interface { + AcquireWorkSlot(ctx context.Context) (ReleaseFunc, error) +} + +type RebuildLimiter interface { + ConcurrencyLimiter } diff --git a/pkg/local_object_storage/blobstor/rebuild.go b/pkg/local_object_storage/blobstor/rebuild.go index 2a6b94789..f28816555 100644 --- a/pkg/local_object_storage/blobstor/rebuild.go +++ b/pkg/local_object_storage/blobstor/rebuild.go @@ -13,19 +13,14 @@ type StorageIDUpdate interface { UpdateStorageID(ctx context.Context, addr oid.Address, storageID []byte) error } -type ConcurrentWorkersLimiter interface { - AcquireWorkSlot(ctx context.Context) error - ReleaseWorkSlot() -} - -func (b *BlobStor) Rebuild(ctx context.Context, upd StorageIDUpdate, limiter ConcurrentWorkersLimiter, fillPercent int) error { +func (b *BlobStor) Rebuild(ctx context.Context, upd StorageIDUpdate, concLimiter common.RebuildLimiter, fillPercent int) error { var summary common.RebuildRes var rErr error for _, storage := range b.storage { res, err := storage.Storage.Rebuild(ctx, common.RebuildPrm{ - MetaStorage: upd, - WorkerLimiter: limiter, - FillPercent: fillPercent, + MetaStorage: upd, + Limiter: concLimiter, + FillPercent: fillPercent, }) summary.FilesRemoved += res.FilesRemoved summary.ObjectsMoved += res.ObjectsMoved diff --git a/pkg/local_object_storage/engine/rebuild.go b/pkg/local_object_storage/engine/rebuild.go index 83c6a54ed..a29dd7ed9 100644 --- a/pkg/local_object_storage/engine/rebuild.go +++ b/pkg/local_object_storage/engine/rebuild.go @@ -4,6 +4,7 @@ import ( "context" "sync" + "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard" "git.frostfs.info/TrueCloudLab/frostfs-observability/tracing" "go.opentelemetry.io/otel/attribute" @@ -41,7 +42,7 @@ func (e *StorageEngine) Rebuild(ctx context.Context, prm RebuildPrm) (RebuildRes } resGuard := &sync.Mutex{} - limiter := shard.NewRebuildLimiter(prm.ConcurrencyLimit) + concLimiter := &concurrencyLimiter{semaphore: make(chan struct{}, prm.ConcurrencyLimit)} eg, egCtx := errgroup.WithContext(ctx) for _, shardID := range prm.ShardIDs { @@ -61,7 +62,7 @@ func (e *StorageEngine) Rebuild(ctx context.Context, prm RebuildPrm) (RebuildRes } err := sh.ScheduleRebuild(egCtx, shard.RebuildPrm{ - ConcurrencyLimiter: limiter, + ConcurrencyLimiter: concLimiter, TargetFillPercent: prm.TargetFillPercent, }) @@ -88,3 +89,20 
@@ func (e *StorageEngine) Rebuild(ctx context.Context, prm RebuildPrm) (RebuildRes } return res, nil } + +type concurrencyLimiter struct { + semaphore chan struct{} +} + +func (l *concurrencyLimiter) AcquireWorkSlot(ctx context.Context) (common.ReleaseFunc, error) { + select { + case l.semaphore <- struct{}{}: + return l.releaseWorkSlot, nil + case <-ctx.Done(): + return nil, ctx.Err() + } +} + +func (l *concurrencyLimiter) releaseWorkSlot() { + <-l.semaphore +} diff --git a/pkg/local_object_storage/shard/rebuild.go b/pkg/local_object_storage/shard/rebuild.go index 9fe1bbe8c..3aa94d5a3 100644 --- a/pkg/local_object_storage/shard/rebuild.go +++ b/pkg/local_object_storage/shard/rebuild.go @@ -8,6 +8,7 @@ import ( "git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs" "git.frostfs.info/TrueCloudLab/frostfs-node/internal/qos" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor" + "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common" meta "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/metabase" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger" "git.frostfs.info/TrueCloudLab/frostfs-observability/tracing" @@ -20,37 +21,9 @@ import ( var ErrRebuildInProgress = errors.New("shard rebuild in progress") -type RebuildWorkerLimiter interface { - AcquireWorkSlot(ctx context.Context) error - ReleaseWorkSlot() -} - -type rebuildLimiter struct { - semaphore chan struct{} -} - -func NewRebuildLimiter(workersCount uint32) RebuildWorkerLimiter { - return &rebuildLimiter{ - semaphore: make(chan struct{}, workersCount), - } -} - -func (l *rebuildLimiter) AcquireWorkSlot(ctx context.Context) error { - select { - case l.semaphore <- struct{}{}: - return nil - case <-ctx.Done(): - return ctx.Err() - } -} - -func (l *rebuildLimiter) ReleaseWorkSlot() { - <-l.semaphore -} - type rebuildTask struct { - limiter RebuildWorkerLimiter - fillPercent int + concurrencyLimiter common.RebuildLimiter + fillPercent int } type rebuilder struct { @@ -90,14 +63,14 @@ func (r *rebuilder) Start(ctx context.Context, bs *blobstor.BlobStor, mb *meta.D if !ok { continue } - runRebuild(ctx, bs, mb, log, t.fillPercent, t.limiter) + runRebuild(ctx, bs, mb, log, t.fillPercent, t.concurrencyLimiter) } } }() } func runRebuild(ctx context.Context, bs *blobstor.BlobStor, mb *meta.DB, log *logger.Logger, - fillPercent int, limiter RebuildWorkerLimiter, + fillPercent int, concLimiter common.RebuildLimiter, ) { select { case <-ctx.Done(): @@ -107,21 +80,21 @@ func runRebuild(ctx context.Context, bs *blobstor.BlobStor, mb *meta.DB, log *lo log.Info(ctx, logs.BlobstoreRebuildStarted) ctx = tagging.ContextWithIOTag(ctx, qos.IOTagBackground.String()) // TODO use shard limiter - if err := bs.Rebuild(ctx, &mbStorageIDUpdate{mb: mb}, limiter, fillPercent); err != nil { + if err := bs.Rebuild(ctx, &mbStorageIDUpdate{mb: mb}, concLimiter, fillPercent); err != nil { log.Warn(ctx, logs.FailedToRebuildBlobstore, zap.Error(err)) } else { log.Info(ctx, logs.BlobstoreRebuildCompletedSuccessfully) } } -func (r *rebuilder) ScheduleRebuild(ctx context.Context, limiter RebuildWorkerLimiter, fillPercent int, +func (r *rebuilder) ScheduleRebuild(ctx context.Context, limiter common.RebuildLimiter, fillPercent int, ) error { select { case <-ctx.Done(): return ctx.Err() case r.tasks <- rebuildTask{ - limiter: limiter, - fillPercent: fillPercent, + concurrencyLimiter: limiter, + fillPercent: fillPercent, }: return nil default: @@ -170,7 +143,7 @@ func (u *mbStorageIDUpdate) 
UpdateStorageID(ctx context.Context, addr oid.Addres } type RebuildPrm struct { - ConcurrencyLimiter RebuildWorkerLimiter + ConcurrencyLimiter common.ConcurrencyLimiter TargetFillPercent uint32 }
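
Note (not part of the patch): the core of the refactor is visible in blobstor/common/rebuild.go and engine/rebuild.go above. The old AcquireWorkSlot/ReleaseWorkSlot method pair is collapsed into a single AcquireWorkSlot that hands back a ReleaseFunc, so the release is tied to the specific acquisition and the duplicated ConcurrentWorkersLimiter interfaces in blobstor and shard (plus shard.NewRebuildLimiter) can be dropped in favour of the single common.RebuildLimiter. Below is a minimal, self-contained sketch of that contract. The type and function names (semaphoreLimiter, newSemaphoreLimiter) and the worker loop are illustrative only; the ReleaseFunc signature, the buffered-channel semaphore, and the acquire-before-eg.Go pattern mirror what the diff introduces, but nothing here is copied verbatim from the frostfs packages.

// Sketch of the limiter contract introduced by this patch: AcquireWorkSlot
// returns a release callback instead of requiring a separate ReleaseWorkSlot
// call. Types are redeclared locally for illustration; the real ones live in
// blobstor/common and the engine.
package main

import (
	"context"
	"fmt"

	"golang.org/x/sync/errgroup"
)

// ReleaseFunc mirrors common.ReleaseFunc from the patch.
type ReleaseFunc func()

// ConcurrencyLimiter mirrors common.ConcurrencyLimiter.
type ConcurrencyLimiter interface {
	AcquireWorkSlot(ctx context.Context) (ReleaseFunc, error)
}

// semaphoreLimiter mirrors the engine's concurrencyLimiter: a buffered
// channel used as a counting semaphore. The name is hypothetical.
type semaphoreLimiter struct {
	semaphore chan struct{}
}

func newSemaphoreLimiter(n uint32) *semaphoreLimiter {
	return &semaphoreLimiter{semaphore: make(chan struct{}, n)}
}

// AcquireWorkSlot blocks until a slot is free or the context is cancelled,
// and returns the matching release callback on success.
func (l *semaphoreLimiter) AcquireWorkSlot(ctx context.Context) (ReleaseFunc, error) {
	select {
	case l.semaphore <- struct{}{}:
		return func() { <-l.semaphore }, nil
	case <-ctx.Done():
		return nil, ctx.Err()
	}
}

func main() {
	var limiter ConcurrencyLimiter = newSemaphoreLimiter(2)
	eg, egCtx := errgroup.WithContext(context.Background())

	for i := 0; i < 5; i++ {
		i := i // capture for the closure (pre-Go 1.22 loop semantics)

		// Acquire before spawning the goroutine, as moveObjects does, so the
		// number of in-flight workers never exceeds the semaphore capacity.
		release, err := limiter.AcquireWorkSlot(egCtx)
		if err != nil {
			_ = eg.Wait()
			fmt.Println("acquire failed:", err)
			return
		}
		eg.Go(func() error {
			defer release() // release is scoped to exactly this work item
			fmt.Println("processing item", i)
			return nil
		})
	}
	_ = eg.Wait()
}

Returning the release callback from the acquire call also simplifies the test stub: rebuildLimiterStub now needs only one method that returns a no-op func, as shown in rebuild_test.go above.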