From b9360be1dcd8b08ab20e2a73a150ceadeeeea966 Mon Sep 17 00:00:00 2001 From: Dmitrii Stepanov Date: Fri, 7 Feb 2025 17:25:47 +0300 Subject: [PATCH] [#1636] blobovniczatree: Use RebuildLimiter Signed-off-by: Dmitrii Stepanov --- .../blobstor/blobovniczatree/rebuild.go | 40 +++++++++++++++---- .../blobstor/blobovniczatree/rebuild_test.go | 8 ++++ .../blobstor/common/rebuild.go | 6 +++ pkg/local_object_storage/shard/rebuild.go | 28 ++++++++++++- 4 files changed, 73 insertions(+), 9 deletions(-) diff --git a/pkg/local_object_storage/blobstor/blobovniczatree/rebuild.go b/pkg/local_object_storage/blobstor/blobovniczatree/rebuild.go index cbd45c3b..7ef3317f 100644 --- a/pkg/local_object_storage/blobstor/blobovniczatree/rebuild.go +++ b/pkg/local_object_storage/blobstor/blobovniczatree/rebuild.go @@ -50,7 +50,7 @@ func (b *Blobovniczas) Rebuild(ctx context.Context, prm common.RebuildPrm) (comm var res common.RebuildRes b.log.Debug(ctx, logs.BlobovniczaTreeCompletingPreviousRebuild) - completedPreviosMoves, err := b.completeIncompletedMove(ctx, prm.MetaStorage) + completedPreviosMoves, err := b.completeIncompletedMove(ctx, prm.MetaStorage, prm.Limiter) res.ObjectsMoved += completedPreviosMoves if err != nil { b.log.Warn(ctx, logs.BlobovniczaTreeCompletedPreviousRebuildFailed, zap.Error(err)) @@ -238,7 +238,7 @@ func (b *Blobovniczas) addRebuildTempFile(ctx context.Context, path string) (fun }, nil } -func (b *Blobovniczas) moveObjects(ctx context.Context, blz *blobovnicza.Blobovnicza, blzPath string, meta common.MetaStorage, concLimiter common.RebuildLimiter) (uint64, error) { +func (b *Blobovniczas) moveObjects(ctx context.Context, blz *blobovnicza.Blobovnicza, blzPath string, meta common.MetaStorage, limiter common.RebuildLimiter) (uint64, error) { var result atomic.Uint64 batch := make(map[oid.Address][]byte) @@ -253,7 +253,12 @@ func (b *Blobovniczas) moveObjects(ctx context.Context, blz *blobovnicza.Blobovn }) for { - _, err := blz.Iterate(ctx, prm) + release, err := limiter.ReadRequest(ctx) + if err != nil { + return result.Load(), err + } + _, err = blz.Iterate(ctx, prm) + release() if err != nil && !errors.Is(err, errBatchFull) { return result.Load(), err } @@ -265,14 +270,19 @@ func (b *Blobovniczas) moveObjects(ctx context.Context, blz *blobovnicza.Blobovn eg, egCtx := errgroup.WithContext(ctx) for addr, data := range batch { - release, err := concLimiter.AcquireWorkSlot(egCtx) + release, err := limiter.AcquireWorkSlot(egCtx) if err != nil { _ = eg.Wait() return result.Load(), err } eg.Go(func() error { defer release() - err := b.moveObject(egCtx, blz, blzPath, addr, data, meta) + moveRelease, err := limiter.WriteRequest(ctx) + if err != nil { + return err + } + err = b.moveObject(egCtx, blz, blzPath, addr, data, meta) + moveRelease() if err == nil { result.Add(1) } @@ -360,7 +370,7 @@ func (b *Blobovniczas) dropDirectoryIfEmpty(path string) error { return b.dropDirectoryIfEmpty(filepath.Dir(path)) } -func (b *Blobovniczas) completeIncompletedMove(ctx context.Context, metaStore common.MetaStorage) (uint64, error) { +func (b *Blobovniczas) completeIncompletedMove(ctx context.Context, metaStore common.MetaStorage, rateLimiter common.RateLimiter) (uint64, error) { var count uint64 var rebuildTempFilesToRemove []string err := b.iterateIncompletedRebuildDBPaths(ctx, func(s string) (bool, error) { @@ -373,13 +383,24 @@ func (b *Blobovniczas) completeIncompletedMove(ctx context.Context, metaStore co } defer shDB.Close(ctx) + release, err := rateLimiter.ReadRequest(ctx) + if err != nil { + return false, err + } incompletedMoves, err := blz.ListMoveInfo(ctx) + release() if err != nil { return true, err } for _, move := range incompletedMoves { - if err := b.performMove(ctx, blz, shDB.SystemPath(), move, metaStore); err != nil { + release, err := rateLimiter.WriteRequest(ctx) + if err != nil { + return false, err + } + err = b.performMove(ctx, blz, shDB.SystemPath(), move, metaStore) + release() + if err != nil { return true, err } count++ @@ -389,9 +410,14 @@ func (b *Blobovniczas) completeIncompletedMove(ctx context.Context, metaStore co return false, nil }) for _, tmp := range rebuildTempFilesToRemove { + release, err := rateLimiter.WriteRequest(ctx) + if err != nil { + return count, err + } if err := os.Remove(filepath.Join(b.rootPath, tmp)); err != nil { b.log.Warn(ctx, logs.BlobovniczatreeFailedToRemoveRebuildTempFile, zap.Error(err)) } + release() } return count, err } diff --git a/pkg/local_object_storage/blobstor/blobovniczatree/rebuild_test.go b/pkg/local_object_storage/blobstor/blobovniczatree/rebuild_test.go index e26c485b..865d04a8 100644 --- a/pkg/local_object_storage/blobstor/blobovniczatree/rebuild_test.go +++ b/pkg/local_object_storage/blobstor/blobovniczatree/rebuild_test.go @@ -467,3 +467,11 @@ type rebuildLimiterStub struct{} func (s *rebuildLimiterStub) AcquireWorkSlot(context.Context) (common.ReleaseFunc, error) { return func() {}, nil } + +func (s *rebuildLimiterStub) ReadRequest(context.Context) (common.ReleaseFunc, error) { + return func() {}, nil +} + +func (s *rebuildLimiterStub) WriteRequest(context.Context) (common.ReleaseFunc, error) { + return func() {}, nil +} diff --git a/pkg/local_object_storage/blobstor/common/rebuild.go b/pkg/local_object_storage/blobstor/common/rebuild.go index 4615190f..788fe66f 100644 --- a/pkg/local_object_storage/blobstor/common/rebuild.go +++ b/pkg/local_object_storage/blobstor/common/rebuild.go @@ -27,6 +27,12 @@ type ConcurrencyLimiter interface { AcquireWorkSlot(ctx context.Context) (ReleaseFunc, error) } +type RateLimiter interface { + ReadRequest(context.Context) (ReleaseFunc, error) + WriteRequest(context.Context) (ReleaseFunc, error) +} + type RebuildLimiter interface { ConcurrencyLimiter + RateLimiter } diff --git a/pkg/local_object_storage/shard/rebuild.go b/pkg/local_object_storage/shard/rebuild.go index 3aa94d5a..20f1f2b6 100644 --- a/pkg/local_object_storage/shard/rebuild.go +++ b/pkg/local_object_storage/shard/rebuild.go @@ -79,7 +79,6 @@ func runRebuild(ctx context.Context, bs *blobstor.BlobStor, mb *meta.DB, log *lo } log.Info(ctx, logs.BlobstoreRebuildStarted) ctx = tagging.ContextWithIOTag(ctx, qos.IOTagBackground.String()) - // TODO use shard limiter if err := bs.Rebuild(ctx, &mbStorageIDUpdate{mb: mb}, concLimiter, fillPercent); err != nil { log.Warn(ctx, logs.FailedToRebuildBlobstore, zap.Error(err)) } else { @@ -165,5 +164,30 @@ func (s *Shard) ScheduleRebuild(ctx context.Context, p RebuildPrm) error { return ErrDegradedMode } - return s.rb.ScheduleRebuild(ctx, p.ConcurrencyLimiter, int(p.TargetFillPercent)) + limiter := &rebuildLimiter{ + concurrencyLimiter: p.ConcurrencyLimiter, + rateLimiter: s.opsLimiter, + } + return s.rb.ScheduleRebuild(ctx, limiter, int(p.TargetFillPercent)) +} + +var _ common.RebuildLimiter = (*rebuildLimiter)(nil) + +type rebuildLimiter struct { + concurrencyLimiter common.ConcurrencyLimiter + rateLimiter qos.Limiter +} + +func (r *rebuildLimiter) AcquireWorkSlot(ctx context.Context) (common.ReleaseFunc, error) { + return r.concurrencyLimiter.AcquireWorkSlot(ctx) +} + +func (r *rebuildLimiter) ReadRequest(ctx context.Context) (common.ReleaseFunc, error) { + release, err := r.rateLimiter.ReadRequest(ctx) + return common.ReleaseFunc(release), err +} + +func (r *rebuildLimiter) WriteRequest(ctx context.Context) (common.ReleaseFunc, error) { + release, err := r.rateLimiter.WriteRequest(ctx) + return common.ReleaseFunc(release), err }