[#9999] storage: Refactor shard rebuild

Drop redundant interfaces.
Rename fields.

Signed-off-by: Dmitrii Stepanov <d.stepanov@yadro.com>
Dmitrii Stepanov 2025-02-06 17:24:23 +03:00
parent 36fd4d8dfd
commit c82056f963
Signed by: dstepanov-yadro
GPG key ID: 237AF1A763293BC0
7 changed files with 72 additions and 79 deletions
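
In short, the commit collapses the paired acquire/release limiter contract into a single call that hands back its own release callback, and renames the RebuildPrm field from WorkerLimiter to Limiter to match. Both signatures below are taken directly from the hunks that follow:

// Old contract: every AcquireWorkSlot had to be paired manually with ReleaseWorkSlot.
type ConcurrentWorkersLimiter interface {
	AcquireWorkSlot(ctx context.Context) error
	ReleaseWorkSlot()
}

// New contract: a slot can only be released via the callback returned by the
// acquire call, so acquire and release cannot get mismatched.
type ReleaseFunc func()

type ConcurrencyLimiter interface {
	AcquireWorkSlot(ctx context.Context) (ReleaseFunc, error)
}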


@@ -79,7 +79,7 @@ func (b *Blobovniczas) migrateDBs(ctx context.Context, dbs []string, prm common.
 	var completedDBCount uint32
 	for _, db := range dbs {
 		b.log.Debug(ctx, logs.BlobovniczaTreeRebuildingBlobovnicza, zap.String("path", db))
-		movedObjects, err := b.rebuildDB(ctx, db, prm.MetaStorage, prm.WorkerLimiter)
+		movedObjects, err := b.rebuildDB(ctx, db, prm.MetaStorage, prm.Limiter)
 		res.ObjectsMoved += movedObjects
 		if err != nil {
 			b.log.Warn(ctx, logs.BlobovniczaTreeRebuildingBlobovniczaFailed, zap.String("path", db), zap.Uint64("moved_objects_count", movedObjects), zap.Error(err))
@@ -195,7 +195,7 @@ func (b *Blobovniczas) rebuildBySize(ctx context.Context, path string, targetFil
 	return fp < targetFillPercent || fp > 100+(100-targetFillPercent), nil
 }

-func (b *Blobovniczas) rebuildDB(ctx context.Context, path string, meta common.MetaStorage, limiter common.ConcurrentWorkersLimiter) (uint64, error) {
+func (b *Blobovniczas) rebuildDB(ctx context.Context, path string, meta common.MetaStorage, concLimiter common.RebuildLimiter) (uint64, error) {
 	shDB := b.getBlobovnicza(ctx, path)
 	blz, err := shDB.Open(ctx)
 	if err != nil {
@@ -212,7 +212,7 @@ func (b *Blobovniczas) rebuildDB(ctx context.Context, path string, meta common.M
 	if err != nil {
 		return 0, err
 	}
-	migratedObjects, err := b.moveObjects(ctx, blz, shDB.SystemPath(), meta, limiter)
+	migratedObjects, err := b.moveObjects(ctx, blz, shDB.SystemPath(), meta, concLimiter)
 	if err != nil {
 		return migratedObjects, err
 	}
@@ -238,7 +238,7 @@ func (b *Blobovniczas) addRebuildTempFile(ctx context.Context, path string) (fun
 	}, nil
 }

-func (b *Blobovniczas) moveObjects(ctx context.Context, blz *blobovnicza.Blobovnicza, blzPath string, meta common.MetaStorage, limiter common.ConcurrentWorkersLimiter) (uint64, error) {
+func (b *Blobovniczas) moveObjects(ctx context.Context, blz *blobovnicza.Blobovnicza, blzPath string, meta common.MetaStorage, concLimiter common.RebuildLimiter) (uint64, error) {
 	var result atomic.Uint64
 	batch := make(map[oid.Address][]byte)
@@ -265,12 +265,13 @@ func (b *Blobovniczas) moveObjects(ctx context.Context, blz *blobovnicza.Blobovn

 	eg, egCtx := errgroup.WithContext(ctx)
 	for addr, data := range batch {
-		if err := limiter.AcquireWorkSlot(egCtx); err != nil {
+		release, err := concLimiter.AcquireWorkSlot(egCtx)
+		if err != nil {
 			_ = eg.Wait()
 			return result.Load(), err
 		}
 		eg.Go(func() error {
-			defer limiter.ReleaseWorkSlot()
+			defer release()
 			err := b.moveObject(egCtx, blz, blzPath, addr, data, meta)
 			if err == nil {
 				result.Add(1)
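
The hunk above shows the pattern the new contract enables: a slot is acquired in the loop, and the returned release callback travels into the worker goroutine that uses it. A reduced, self-contained sketch of the same pattern (runBatch, items, and process are hypothetical names, assuming the standard context and golang.org/x/sync/errgroup imports):

func runBatch(ctx context.Context, limiter common.RebuildLimiter, items []string,
	process func(context.Context, string) error,
) error {
	eg, egCtx := errgroup.WithContext(ctx)
	for _, item := range items {
		// Acquire before spawning; on failure, drain the workers that
		// already hold slots, then report the acquire error.
		release, err := limiter.AcquireWorkSlot(egCtx)
		if err != nil {
			_ = eg.Wait()
			return err
		}
		eg.Go(func() error {
			defer release() // the slot is freed by the goroutine that used it
			return process(egCtx, item)
		})
	}
	return eg.Wait()
}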


@@ -162,9 +162,9 @@ func testRebuildFailoverValidate(t *testing.T, dir string, obj *objectSDK.Object
 		guard: &sync.Mutex{},
 	}
 	rRes, err := b.Rebuild(context.Background(), common.RebuildPrm{
-		MetaStorage:   metaStub,
-		WorkerLimiter: &rebuildLimiterStub{},
-		FillPercent:   1,
+		MetaStorage: metaStub,
+		Limiter:     &rebuildLimiterStub{},
+		FillPercent: 1,
 	})
 	require.NoError(t, err)
 	require.Equal(t, uint64(1), rRes.ObjectsMoved)


@@ -77,9 +77,9 @@ func TestBlobovniczaTreeFillPercentRebuild(t *testing.T) {
 		guard: &sync.Mutex{},
 	}
 	rRes, err := b.Rebuild(context.Background(), common.RebuildPrm{
-		MetaStorage:   metaStub,
-		WorkerLimiter: &rebuildLimiterStub{},
-		FillPercent:   60,
+		MetaStorage: metaStub,
+		Limiter:     &rebuildLimiterStub{},
+		FillPercent: 60,
 	})
 	require.NoError(t, err)
 	dataMigrated := rRes.ObjectsMoved > 0 || rRes.FilesRemoved > 0 || metaStub.updatedCount > 0
@@ -129,9 +129,9 @@ func TestBlobovniczaTreeFillPercentRebuild(t *testing.T) {
 		guard: &sync.Mutex{},
 	}
 	rRes, err := b.Rebuild(context.Background(), common.RebuildPrm{
-		MetaStorage:   metaStub,
-		WorkerLimiter: &rebuildLimiterStub{},
-		FillPercent:   90, // 64KB / 100KB = 64%
+		MetaStorage: metaStub,
+		Limiter:     &rebuildLimiterStub{},
+		FillPercent: 90, // 64KB / 100KB = 64%
 	})
 	require.NoError(t, err)
 	dataMigrated := rRes.ObjectsMoved > 0 || rRes.FilesRemoved > 0 || metaStub.updatedCount > 0
@@ -194,9 +194,9 @@ func TestBlobovniczaTreeFillPercentRebuild(t *testing.T) {
 		guard: &sync.Mutex{},
 	}
 	rRes, err := b.Rebuild(context.Background(), common.RebuildPrm{
-		MetaStorage:   metaStub,
-		WorkerLimiter: &rebuildLimiterStub{},
-		FillPercent:   80,
+		MetaStorage: metaStub,
+		Limiter:     &rebuildLimiterStub{},
+		FillPercent: 80,
 	})
 	require.NoError(t, err)
 	require.Equal(t, uint64(49), rRes.FilesRemoved)
@@ -267,9 +267,9 @@ func TestBlobovniczaTreeFillPercentRebuild(t *testing.T) {
 	require.NoError(t, b.Init())

 	rRes, err := b.Rebuild(context.Background(), common.RebuildPrm{
-		MetaStorage:   metaStub,
-		WorkerLimiter: &rebuildLimiterStub{},
-		FillPercent:   80,
+		MetaStorage: metaStub,
+		Limiter:     &rebuildLimiterStub{},
+		FillPercent: 80,
 	})
 	require.NoError(t, err)
 	require.Equal(t, uint64(49), rRes.FilesRemoved)
@@ -340,7 +340,7 @@ func TestBlobovniczaTreeRebuildLargeObject(t *testing.T) {
 	}
 	var rPrm common.RebuildPrm
 	rPrm.MetaStorage = metaStub
-	rPrm.WorkerLimiter = &rebuildLimiterStub{}
+	rPrm.Limiter = &rebuildLimiterStub{}
 	rPrm.FillPercent = 1
 	rRes, err := b.Rebuild(context.Background(), rPrm)
 	require.NoError(t, err)
@@ -429,7 +429,7 @@ func testBlobovniczaTreeRebuildHelper(t *testing.T, sourceDepth, sourceWidth, ta
 	}
 	var rPrm common.RebuildPrm
 	rPrm.MetaStorage = metaStub
-	rPrm.WorkerLimiter = &rebuildLimiterStub{}
+	rPrm.Limiter = &rebuildLimiterStub{}
 	rPrm.FillPercent = 1
 	rRes, err := b.Rebuild(context.Background(), rPrm)
 	require.NoError(t, err)
@@ -464,5 +464,6 @@ func (s *storageIDUpdateStub) UpdateStorageID(ctx context.Context, addr oid.Addr

 type rebuildLimiterStub struct{}

-func (s *rebuildLimiterStub) AcquireWorkSlot(context.Context) error { return nil }
-func (s *rebuildLimiterStub) ReleaseWorkSlot()                      {}
+func (s *rebuildLimiterStub) AcquireWorkSlot(context.Context) (common.ReleaseFunc, error) {
+	return func() {}, nil
+}


@@ -12,16 +12,21 @@ type RebuildRes struct {
 }

 type RebuildPrm struct {
-	MetaStorage   MetaStorage
-	WorkerLimiter ConcurrentWorkersLimiter
-	FillPercent   int
+	MetaStorage MetaStorage
+	Limiter     RebuildLimiter
+	FillPercent int
 }

 type MetaStorage interface {
 	UpdateStorageID(ctx context.Context, addr oid.Address, storageID []byte) error
 }

-type ConcurrentWorkersLimiter interface {
-	AcquireWorkSlot(ctx context.Context) error
-	ReleaseWorkSlot()
+type ReleaseFunc func()
+
+type ConcurrencyLimiter interface {
+	AcquireWorkSlot(ctx context.Context) (ReleaseFunc, error)
+}
+
+type RebuildLimiter interface {
+	ConcurrencyLimiter
 }
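
A typical call site under the new contract reads as follows (a minimal sketch against the interfaces above; doWork is a hypothetical helper, compare moveObjects earlier in this diff):

func doWork(ctx context.Context, limiter RebuildLimiter) error {
	release, err := limiter.AcquireWorkSlot(ctx)
	if err != nil {
		return err // the context was cancelled while waiting for a slot
	}
	defer release() // the release is tied to this particular acquisition
	// ... perform one unit of rebuild work ...
	return nil
}

RebuildLimiter currently only embeds ConcurrencyLimiter, which leaves room to attach further rebuild-specific limits to the same parameter later.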


@@ -13,19 +13,14 @@ type StorageIDUpdate interface {
 	UpdateStorageID(ctx context.Context, addr oid.Address, storageID []byte) error
 }

-type ConcurrentWorkersLimiter interface {
-	AcquireWorkSlot(ctx context.Context) error
-	ReleaseWorkSlot()
-}
-
-func (b *BlobStor) Rebuild(ctx context.Context, upd StorageIDUpdate, limiter ConcurrentWorkersLimiter, fillPercent int) error {
+func (b *BlobStor) Rebuild(ctx context.Context, upd StorageIDUpdate, concLimiter common.RebuildLimiter, fillPercent int) error {
 	var summary common.RebuildRes
 	var rErr error
 	for _, storage := range b.storage {
 		res, err := storage.Storage.Rebuild(ctx, common.RebuildPrm{
-			MetaStorage:   upd,
-			WorkerLimiter: limiter,
-			FillPercent:   fillPercent,
+			MetaStorage: upd,
+			Limiter:     concLimiter,
+			FillPercent: fillPercent,
 		})
 		summary.FilesRemoved += res.FilesRemoved
 		summary.ObjectsMoved += res.ObjectsMoved


@@ -4,6 +4,7 @@ import (
 	"context"
 	"sync"

+	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common"
 	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard"
 	"git.frostfs.info/TrueCloudLab/frostfs-observability/tracing"
 	"go.opentelemetry.io/otel/attribute"
@@ -41,7 +42,7 @@ func (e *StorageEngine) Rebuild(ctx context.Context, prm RebuildPrm) (RebuildRes
 	}
 	resGuard := &sync.Mutex{}

-	limiter := shard.NewRebuildLimiter(prm.ConcurrencyLimit)
+	concLimiter := &concurrencyLimiter{semaphore: make(chan struct{}, prm.ConcurrencyLimit)}

 	eg, egCtx := errgroup.WithContext(ctx)
 	for _, shardID := range prm.ShardIDs {
@@ -61,7 +62,7 @@ func (e *StorageEngine) Rebuild(ctx context.Context, prm RebuildPrm) (RebuildRes
 		}

 		err := sh.ScheduleRebuild(egCtx, shard.RebuildPrm{
-			ConcurrencyLimiter: limiter,
+			ConcurrencyLimiter: concLimiter,
 			TargetFillPercent:  prm.TargetFillPercent,
 		})
@@ -88,3 +89,20 @@ func (e *StorageEngine) Rebuild(ctx context.Context, prm RebuildPrm) (RebuildRes
 	}
 	return res, nil
 }
+
+type concurrencyLimiter struct {
+	semaphore chan struct{}
+}
+
+func (l *concurrencyLimiter) AcquireWorkSlot(ctx context.Context) (common.ReleaseFunc, error) {
+	select {
+	case l.semaphore <- struct{}{}:
+		return l.releaseWorkSlot, nil
+	case <-ctx.Done():
+		return nil, ctx.Err()
+	}
+}
+
+func (l *concurrencyLimiter) releaseWorkSlot() {
+	<-l.semaphore
+}


@@ -7,6 +7,7 @@ import (
 	"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
 	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor"
+	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common"
 	meta "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/metabase"
 	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger"
 	"git.frostfs.info/TrueCloudLab/frostfs-observability/tracing"
@@ -18,37 +19,9 @@ import (
 var ErrRebuildInProgress = errors.New("shard rebuild in progress")

-type RebuildWorkerLimiter interface {
-	AcquireWorkSlot(ctx context.Context) error
-	ReleaseWorkSlot()
-}
-
-type rebuildLimiter struct {
-	semaphore chan struct{}
-}
-
-func NewRebuildLimiter(workersCount uint32) RebuildWorkerLimiter {
-	return &rebuildLimiter{
-		semaphore: make(chan struct{}, workersCount),
-	}
-}
-
-func (l *rebuildLimiter) AcquireWorkSlot(ctx context.Context) error {
-	select {
-	case l.semaphore <- struct{}{}:
-		return nil
-	case <-ctx.Done():
-		return ctx.Err()
-	}
-}
-
-func (l *rebuildLimiter) ReleaseWorkSlot() {
-	<-l.semaphore
-}
-
 type rebuildTask struct {
-	limiter     RebuildWorkerLimiter
-	fillPercent int
+	concurrencyLimiter common.RebuildLimiter
+	fillPercent        int
 }
type rebuilder struct {
@@ -88,14 +61,14 @@ func (r *rebuilder) Start(ctx context.Context, bs *blobstor.BlobStor, mb *meta.D
 				if !ok {
 					continue
 				}
-				runRebuild(ctx, bs, mb, log, t.fillPercent, t.limiter)
+				runRebuild(ctx, bs, mb, log, t.fillPercent, t.concurrencyLimiter)
 			}
 		}
 	}()
 }

 func runRebuild(ctx context.Context, bs *blobstor.BlobStor, mb *meta.DB, log *logger.Logger,
-	fillPercent int, limiter RebuildWorkerLimiter,
+	fillPercent int, concLimiter common.RebuildLimiter,
 ) {
 	select {
 	case <-ctx.Done():
@@ -104,21 +77,21 @@ func runRebuild(ctx context.Context, bs *blobstor.BlobStor, mb *meta.DB, log *lo
 	}
 	log.Info(ctx, logs.BlobstoreRebuildStarted)
 	// TODO use shard limiter
-	if err := bs.Rebuild(ctx, &mbStorageIDUpdate{mb: mb}, limiter, fillPercent); err != nil {
+	if err := bs.Rebuild(ctx, &mbStorageIDUpdate{mb: mb}, concLimiter, fillPercent); err != nil {
 		log.Warn(ctx, logs.FailedToRebuildBlobstore, zap.Error(err))
 	} else {
 		log.Info(ctx, logs.BlobstoreRebuildCompletedSuccessfully)
 	}
 }

-func (r *rebuilder) ScheduleRebuild(ctx context.Context, limiter RebuildWorkerLimiter, fillPercent int,
+func (r *rebuilder) ScheduleRebuild(ctx context.Context, limiter common.RebuildLimiter, fillPercent int,
 ) error {
 	select {
 	case <-ctx.Done():
 		return ctx.Err()
 	case r.tasks <- rebuildTask{
-		limiter:     limiter,
-		fillPercent: fillPercent,
+		concurrencyLimiter: limiter,
+		fillPercent:        fillPercent,
 	}:
 		return nil
 	default:
@@ -167,7 +140,7 @@ func (u *mbStorageIDUpdate) UpdateStorageID(ctx context.Context, addr oid.Addres
 }

 type RebuildPrm struct {
-	ConcurrencyLimiter RebuildWorkerLimiter
+	ConcurrencyLimiter common.ConcurrencyLimiter
 	TargetFillPercent  uint32
 }