[#1636] blobovniczatree: Use RebuildLimiter

Signed-off-by: Dmitrii Stepanov <d.stepanov@yadro.com>
This commit is contained in:
Dmitrii Stepanov 2025-02-07 17:25:47 +03:00
parent 1c42753994
commit fce25f90eb
Signed by: dstepanov-yadro
GPG key ID: 237AF1A763293BC0
4 changed files with 73 additions and 9 deletions

View file

@ -50,7 +50,7 @@ func (b *Blobovniczas) Rebuild(ctx context.Context, prm common.RebuildPrm) (comm
var res common.RebuildRes var res common.RebuildRes
b.log.Debug(ctx, logs.BlobovniczaTreeCompletingPreviousRebuild) b.log.Debug(ctx, logs.BlobovniczaTreeCompletingPreviousRebuild)
completedPreviosMoves, err := b.completeIncompletedMove(ctx, prm.MetaStorage) completedPreviosMoves, err := b.completeIncompletedMove(ctx, prm.MetaStorage, prm.Limiter)
res.ObjectsMoved += completedPreviosMoves res.ObjectsMoved += completedPreviosMoves
if err != nil { if err != nil {
b.log.Warn(ctx, logs.BlobovniczaTreeCompletedPreviousRebuildFailed, zap.Error(err)) b.log.Warn(ctx, logs.BlobovniczaTreeCompletedPreviousRebuildFailed, zap.Error(err))
@ -238,7 +238,7 @@ func (b *Blobovniczas) addRebuildTempFile(ctx context.Context, path string) (fun
}, nil }, nil
} }
func (b *Blobovniczas) moveObjects(ctx context.Context, blz *blobovnicza.Blobovnicza, blzPath string, meta common.MetaStorage, concLimiter common.RebuildLimiter) (uint64, error) { func (b *Blobovniczas) moveObjects(ctx context.Context, blz *blobovnicza.Blobovnicza, blzPath string, meta common.MetaStorage, limiter common.RebuildLimiter) (uint64, error) {
var result atomic.Uint64 var result atomic.Uint64
batch := make(map[oid.Address][]byte) batch := make(map[oid.Address][]byte)
@ -253,7 +253,12 @@ func (b *Blobovniczas) moveObjects(ctx context.Context, blz *blobovnicza.Blobovn
}) })
for { for {
_, err := blz.Iterate(ctx, prm) release, err := limiter.ReadRequest(ctx)
if err != nil {
return result.Load(), err
}
_, err = blz.Iterate(ctx, prm)
release()
if err != nil && !errors.Is(err, errBatchFull) { if err != nil && !errors.Is(err, errBatchFull) {
return result.Load(), err return result.Load(), err
} }
@ -265,14 +270,19 @@ func (b *Blobovniczas) moveObjects(ctx context.Context, blz *blobovnicza.Blobovn
eg, egCtx := errgroup.WithContext(ctx) eg, egCtx := errgroup.WithContext(ctx)
for addr, data := range batch { for addr, data := range batch {
release, err := concLimiter.AcquireWorkSlot(egCtx) release, err := limiter.AcquireWorkSlot(egCtx)
if err != nil { if err != nil {
_ = eg.Wait() _ = eg.Wait()
return result.Load(), err return result.Load(), err
} }
eg.Go(func() error { eg.Go(func() error {
defer release() defer release()
err := b.moveObject(egCtx, blz, blzPath, addr, data, meta) moveRelease, err := limiter.WriteRequest(ctx)
if err != nil {
return err
}
err = b.moveObject(egCtx, blz, blzPath, addr, data, meta)
moveRelease()
if err == nil { if err == nil {
result.Add(1) result.Add(1)
} }
@ -360,7 +370,7 @@ func (b *Blobovniczas) dropDirectoryIfEmpty(path string) error {
return b.dropDirectoryIfEmpty(filepath.Dir(path)) return b.dropDirectoryIfEmpty(filepath.Dir(path))
} }
func (b *Blobovniczas) completeIncompletedMove(ctx context.Context, metaStore common.MetaStorage) (uint64, error) { func (b *Blobovniczas) completeIncompletedMove(ctx context.Context, metaStore common.MetaStorage, rateLimiter common.RateLimiter) (uint64, error) {
var count uint64 var count uint64
var rebuildTempFilesToRemove []string var rebuildTempFilesToRemove []string
err := b.iterateIncompletedRebuildDBPaths(ctx, func(s string) (bool, error) { err := b.iterateIncompletedRebuildDBPaths(ctx, func(s string) (bool, error) {
@ -373,13 +383,24 @@ func (b *Blobovniczas) completeIncompletedMove(ctx context.Context, metaStore co
} }
defer shDB.Close(ctx) defer shDB.Close(ctx)
release, err := rateLimiter.ReadRequest(ctx)
if err != nil {
return false, err
}
incompletedMoves, err := blz.ListMoveInfo(ctx) incompletedMoves, err := blz.ListMoveInfo(ctx)
release()
if err != nil { if err != nil {
return true, err return true, err
} }
for _, move := range incompletedMoves { for _, move := range incompletedMoves {
if err := b.performMove(ctx, blz, shDB.SystemPath(), move, metaStore); err != nil { release, err := rateLimiter.WriteRequest(ctx)
if err != nil {
return false, err
}
err = b.performMove(ctx, blz, shDB.SystemPath(), move, metaStore)
release()
if err != nil {
return true, err return true, err
} }
count++ count++
@ -389,9 +410,14 @@ func (b *Blobovniczas) completeIncompletedMove(ctx context.Context, metaStore co
return false, nil return false, nil
}) })
for _, tmp := range rebuildTempFilesToRemove { for _, tmp := range rebuildTempFilesToRemove {
release, err := rateLimiter.WriteRequest(ctx)
if err != nil {
return count, err
}
if err := os.Remove(filepath.Join(b.rootPath, tmp)); err != nil { if err := os.Remove(filepath.Join(b.rootPath, tmp)); err != nil {
b.log.Warn(ctx, logs.BlobovniczatreeFailedToRemoveRebuildTempFile, zap.Error(err)) b.log.Warn(ctx, logs.BlobovniczatreeFailedToRemoveRebuildTempFile, zap.Error(err))
} }
release()
} }
return count, err return count, err
} }

View file

@ -467,3 +467,11 @@ type rebuildLimiterStub struct{}
func (s *rebuildLimiterStub) AcquireWorkSlot(context.Context) (common.ReleaseFunc, error) { func (s *rebuildLimiterStub) AcquireWorkSlot(context.Context) (common.ReleaseFunc, error) {
return func() {}, nil return func() {}, nil
} }
func (s *rebuildLimiterStub) ReadRequest(context.Context) (common.ReleaseFunc, error) {
return func() {}, nil
}
func (s *rebuildLimiterStub) WriteRequest(context.Context) (common.ReleaseFunc, error) {
return func() {}, nil
}

View file

@ -27,6 +27,12 @@ type ConcurrencyLimiter interface {
AcquireWorkSlot(ctx context.Context) (ReleaseFunc, error) AcquireWorkSlot(ctx context.Context) (ReleaseFunc, error)
} }
type RateLimiter interface {
ReadRequest(context.Context) (ReleaseFunc, error)
WriteRequest(context.Context) (ReleaseFunc, error)
}
type RebuildLimiter interface { type RebuildLimiter interface {
ConcurrencyLimiter ConcurrencyLimiter
RateLimiter
} }

View file

@ -79,7 +79,6 @@ func runRebuild(ctx context.Context, bs *blobstor.BlobStor, mb *meta.DB, log *lo
} }
log.Info(ctx, logs.BlobstoreRebuildStarted) log.Info(ctx, logs.BlobstoreRebuildStarted)
ctx = tagging.ContextWithIOTag(ctx, qos.IOTagBackground.String()) ctx = tagging.ContextWithIOTag(ctx, qos.IOTagBackground.String())
// TODO use shard limiter
if err := bs.Rebuild(ctx, &mbStorageIDUpdate{mb: mb}, concLimiter, fillPercent); err != nil { if err := bs.Rebuild(ctx, &mbStorageIDUpdate{mb: mb}, concLimiter, fillPercent); err != nil {
log.Warn(ctx, logs.FailedToRebuildBlobstore, zap.Error(err)) log.Warn(ctx, logs.FailedToRebuildBlobstore, zap.Error(err))
} else { } else {
@ -165,5 +164,30 @@ func (s *Shard) ScheduleRebuild(ctx context.Context, p RebuildPrm) error {
return ErrDegradedMode return ErrDegradedMode
} }
return s.rb.ScheduleRebuild(ctx, p.ConcurrencyLimiter, int(p.TargetFillPercent)) limiter := &rebuildLimiter{
concurrencyLimiter: p.ConcurrencyLimiter,
rateLimiter: s.opsLimiter,
}
return s.rb.ScheduleRebuild(ctx, limiter, int(p.TargetFillPercent))
}
var _ common.RebuildLimiter = (*rebuildLimiter)(nil)
type rebuildLimiter struct {
concurrencyLimiter common.ConcurrencyLimiter
rateLimiter qos.Limiter
}
func (r *rebuildLimiter) AcquireWorkSlot(ctx context.Context) (common.ReleaseFunc, error) {
return r.concurrencyLimiter.AcquireWorkSlot(ctx)
}
func (r *rebuildLimiter) ReadRequest(ctx context.Context) (common.ReleaseFunc, error) {
release, err := r.rateLimiter.ReadRequest(ctx)
return common.ReleaseFunc(release), err
}
func (r *rebuildLimiter) WriteRequest(ctx context.Context) (common.ReleaseFunc, error) {
release, err := r.rateLimiter.WriteRequest(ctx)
return common.ReleaseFunc(release), err
} }