[#1636] storage: Refactor shard rebuild

Drop redundant interfaces.
Rename fields.

Signed-off-by: Dmitrii Stepanov <d.stepanov@yadro.com>
This commit is contained in:
Dmitrii Stepanov 2025-02-06 17:24:23 +03:00
parent e0dc3c3d0c
commit ceff5e1f6a
Signed by: dstepanov-yadro
GPG key ID: 237AF1A763293BC0
7 changed files with 72 additions and 79 deletions

View file

@ -79,7 +79,7 @@ func (b *Blobovniczas) migrateDBs(ctx context.Context, dbs []string, prm common.
var completedDBCount uint32
for _, db := range dbs {
b.log.Debug(ctx, logs.BlobovniczaTreeRebuildingBlobovnicza, zap.String("path", db))
movedObjects, err := b.rebuildDB(ctx, db, prm.MetaStorage, prm.WorkerLimiter)
movedObjects, err := b.rebuildDB(ctx, db, prm.MetaStorage, prm.Limiter)
res.ObjectsMoved += movedObjects
if err != nil {
b.log.Warn(ctx, logs.BlobovniczaTreeRebuildingBlobovniczaFailed, zap.String("path", db), zap.Uint64("moved_objects_count", movedObjects), zap.Error(err))
@ -195,7 +195,7 @@ func (b *Blobovniczas) rebuildBySize(ctx context.Context, path string, targetFil
return fp < targetFillPercent || fp > 100+(100-targetFillPercent), nil
}
func (b *Blobovniczas) rebuildDB(ctx context.Context, path string, meta common.MetaStorage, limiter common.ConcurrentWorkersLimiter) (uint64, error) {
func (b *Blobovniczas) rebuildDB(ctx context.Context, path string, meta common.MetaStorage, concLimiter common.RebuildLimiter) (uint64, error) {
shDB := b.getBlobovnicza(ctx, path)
blz, err := shDB.Open(ctx)
if err != nil {
@ -212,7 +212,7 @@ func (b *Blobovniczas) rebuildDB(ctx context.Context, path string, meta common.M
if err != nil {
return 0, err
}
migratedObjects, err := b.moveObjects(ctx, blz, shDB.SystemPath(), meta, limiter)
migratedObjects, err := b.moveObjects(ctx, blz, shDB.SystemPath(), meta, concLimiter)
if err != nil {
return migratedObjects, err
}
@ -238,7 +238,7 @@ func (b *Blobovniczas) addRebuildTempFile(ctx context.Context, path string) (fun
}, nil
}
func (b *Blobovniczas) moveObjects(ctx context.Context, blz *blobovnicza.Blobovnicza, blzPath string, meta common.MetaStorage, limiter common.ConcurrentWorkersLimiter) (uint64, error) {
func (b *Blobovniczas) moveObjects(ctx context.Context, blz *blobovnicza.Blobovnicza, blzPath string, meta common.MetaStorage, concLimiter common.RebuildLimiter) (uint64, error) {
var result atomic.Uint64
batch := make(map[oid.Address][]byte)
@ -265,12 +265,13 @@ func (b *Blobovniczas) moveObjects(ctx context.Context, blz *blobovnicza.Blobovn
eg, egCtx := errgroup.WithContext(ctx)
for addr, data := range batch {
if err := limiter.AcquireWorkSlot(egCtx); err != nil {
release, err := concLimiter.AcquireWorkSlot(egCtx)
if err != nil {
_ = eg.Wait()
return result.Load(), err
}
eg.Go(func() error {
defer limiter.ReleaseWorkSlot()
defer release()
err := b.moveObject(egCtx, blz, blzPath, addr, data, meta)
if err == nil {
result.Add(1)