[#1636] blobovniczatree: Use RebuildLimiter
Signed-off-by: Dmitrii Stepanov <d.stepanov@yadro.com>
This commit is contained in:
parent
b010de88c2
commit
bf2dcd1e81
4 changed files with 73 additions and 9 deletions
|
@ -50,7 +50,7 @@ func (b *Blobovniczas) Rebuild(ctx context.Context, prm common.RebuildPrm) (comm
|
||||||
var res common.RebuildRes
|
var res common.RebuildRes
|
||||||
|
|
||||||
b.log.Debug(ctx, logs.BlobovniczaTreeCompletingPreviousRebuild)
|
b.log.Debug(ctx, logs.BlobovniczaTreeCompletingPreviousRebuild)
|
||||||
completedPreviosMoves, err := b.completeIncompletedMove(ctx, prm.MetaStorage)
|
completedPreviosMoves, err := b.completeIncompletedMove(ctx, prm.MetaStorage, prm.Limiter)
|
||||||
res.ObjectsMoved += completedPreviosMoves
|
res.ObjectsMoved += completedPreviosMoves
|
||||||
if err != nil {
|
if err != nil {
|
||||||
b.log.Warn(ctx, logs.BlobovniczaTreeCompletedPreviousRebuildFailed, zap.Error(err))
|
b.log.Warn(ctx, logs.BlobovniczaTreeCompletedPreviousRebuildFailed, zap.Error(err))
|
||||||
|
@ -238,7 +238,7 @@ func (b *Blobovniczas) addRebuildTempFile(ctx context.Context, path string) (fun
|
||||||
}, nil
|
}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (b *Blobovniczas) moveObjects(ctx context.Context, blz *blobovnicza.Blobovnicza, blzPath string, meta common.MetaStorage, concLimiter common.RebuildLimiter) (uint64, error) {
|
func (b *Blobovniczas) moveObjects(ctx context.Context, blz *blobovnicza.Blobovnicza, blzPath string, meta common.MetaStorage, limiter common.RebuildLimiter) (uint64, error) {
|
||||||
var result atomic.Uint64
|
var result atomic.Uint64
|
||||||
batch := make(map[oid.Address][]byte)
|
batch := make(map[oid.Address][]byte)
|
||||||
|
|
||||||
|
@ -253,7 +253,12 @@ func (b *Blobovniczas) moveObjects(ctx context.Context, blz *blobovnicza.Blobovn
|
||||||
})
|
})
|
||||||
|
|
||||||
for {
|
for {
|
||||||
_, err := blz.Iterate(ctx, prm)
|
release, err := limiter.ReadRequest(ctx)
|
||||||
|
if err != nil {
|
||||||
|
return result.Load(), err
|
||||||
|
}
|
||||||
|
_, err = blz.Iterate(ctx, prm)
|
||||||
|
release()
|
||||||
if err != nil && !errors.Is(err, errBatchFull) {
|
if err != nil && !errors.Is(err, errBatchFull) {
|
||||||
return result.Load(), err
|
return result.Load(), err
|
||||||
}
|
}
|
||||||
|
@ -265,14 +270,19 @@ func (b *Blobovniczas) moveObjects(ctx context.Context, blz *blobovnicza.Blobovn
|
||||||
eg, egCtx := errgroup.WithContext(ctx)
|
eg, egCtx := errgroup.WithContext(ctx)
|
||||||
|
|
||||||
for addr, data := range batch {
|
for addr, data := range batch {
|
||||||
release, err := concLimiter.AcquireWorkSlot(egCtx)
|
release, err := limiter.AcquireWorkSlot(egCtx)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
_ = eg.Wait()
|
_ = eg.Wait()
|
||||||
return result.Load(), err
|
return result.Load(), err
|
||||||
}
|
}
|
||||||
eg.Go(func() error {
|
eg.Go(func() error {
|
||||||
defer release()
|
defer release()
|
||||||
err := b.moveObject(egCtx, blz, blzPath, addr, data, meta)
|
moveRelease, err := limiter.WriteRequest(ctx)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
err = b.moveObject(egCtx, blz, blzPath, addr, data, meta)
|
||||||
|
moveRelease()
|
||||||
if err == nil {
|
if err == nil {
|
||||||
result.Add(1)
|
result.Add(1)
|
||||||
}
|
}
|
||||||
|
@ -360,7 +370,7 @@ func (b *Blobovniczas) dropDirectoryIfEmpty(path string) error {
|
||||||
return b.dropDirectoryIfEmpty(filepath.Dir(path))
|
return b.dropDirectoryIfEmpty(filepath.Dir(path))
|
||||||
}
|
}
|
||||||
|
|
||||||
func (b *Blobovniczas) completeIncompletedMove(ctx context.Context, metaStore common.MetaStorage) (uint64, error) {
|
func (b *Blobovniczas) completeIncompletedMove(ctx context.Context, metaStore common.MetaStorage, rateLimiter common.RateLimiter) (uint64, error) {
|
||||||
var count uint64
|
var count uint64
|
||||||
var rebuildTempFilesToRemove []string
|
var rebuildTempFilesToRemove []string
|
||||||
err := b.iterateIncompletedRebuildDBPaths(ctx, func(s string) (bool, error) {
|
err := b.iterateIncompletedRebuildDBPaths(ctx, func(s string) (bool, error) {
|
||||||
|
@ -373,13 +383,24 @@ func (b *Blobovniczas) completeIncompletedMove(ctx context.Context, metaStore co
|
||||||
}
|
}
|
||||||
defer shDB.Close(ctx)
|
defer shDB.Close(ctx)
|
||||||
|
|
||||||
|
release, err := rateLimiter.ReadRequest(ctx)
|
||||||
|
if err != nil {
|
||||||
|
return false, err
|
||||||
|
}
|
||||||
incompletedMoves, err := blz.ListMoveInfo(ctx)
|
incompletedMoves, err := blz.ListMoveInfo(ctx)
|
||||||
|
release()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return true, err
|
return true, err
|
||||||
}
|
}
|
||||||
|
|
||||||
for _, move := range incompletedMoves {
|
for _, move := range incompletedMoves {
|
||||||
if err := b.performMove(ctx, blz, shDB.SystemPath(), move, metaStore); err != nil {
|
release, err := rateLimiter.WriteRequest(ctx)
|
||||||
|
if err != nil {
|
||||||
|
return false, err
|
||||||
|
}
|
||||||
|
err = b.performMove(ctx, blz, shDB.SystemPath(), move, metaStore)
|
||||||
|
release()
|
||||||
|
if err != nil {
|
||||||
return true, err
|
return true, err
|
||||||
}
|
}
|
||||||
count++
|
count++
|
||||||
|
@ -389,9 +410,14 @@ func (b *Blobovniczas) completeIncompletedMove(ctx context.Context, metaStore co
|
||||||
return false, nil
|
return false, nil
|
||||||
})
|
})
|
||||||
for _, tmp := range rebuildTempFilesToRemove {
|
for _, tmp := range rebuildTempFilesToRemove {
|
||||||
|
release, err := rateLimiter.WriteRequest(ctx)
|
||||||
|
if err != nil {
|
||||||
|
return count, err
|
||||||
|
}
|
||||||
if err := os.Remove(filepath.Join(b.rootPath, tmp)); err != nil {
|
if err := os.Remove(filepath.Join(b.rootPath, tmp)); err != nil {
|
||||||
b.log.Warn(ctx, logs.BlobovniczatreeFailedToRemoveRebuildTempFile, zap.Error(err))
|
b.log.Warn(ctx, logs.BlobovniczatreeFailedToRemoveRebuildTempFile, zap.Error(err))
|
||||||
}
|
}
|
||||||
|
release()
|
||||||
}
|
}
|
||||||
return count, err
|
return count, err
|
||||||
}
|
}
|
||||||
|
|
|
@ -467,3 +467,11 @@ type rebuildLimiterStub struct{}
|
||||||
func (s *rebuildLimiterStub) AcquireWorkSlot(context.Context) (common.ReleaseFunc, error) {
|
func (s *rebuildLimiterStub) AcquireWorkSlot(context.Context) (common.ReleaseFunc, error) {
|
||||||
return func() {}, nil
|
return func() {}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (s *rebuildLimiterStub) ReadRequest(context.Context) (common.ReleaseFunc, error) {
|
||||||
|
return func() {}, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *rebuildLimiterStub) WriteRequest(context.Context) (common.ReleaseFunc, error) {
|
||||||
|
return func() {}, nil
|
||||||
|
}
|
||||||
|
|
|
@ -27,6 +27,12 @@ type ConcurrencyLimiter interface {
|
||||||
AcquireWorkSlot(ctx context.Context) (ReleaseFunc, error)
|
AcquireWorkSlot(ctx context.Context) (ReleaseFunc, error)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type RateLimiter interface {
|
||||||
|
ReadRequest(context.Context) (ReleaseFunc, error)
|
||||||
|
WriteRequest(context.Context) (ReleaseFunc, error)
|
||||||
|
}
|
||||||
|
|
||||||
type RebuildLimiter interface {
|
type RebuildLimiter interface {
|
||||||
ConcurrencyLimiter
|
ConcurrencyLimiter
|
||||||
|
RateLimiter
|
||||||
}
|
}
|
||||||
|
|
|
@ -79,7 +79,6 @@ func runRebuild(ctx context.Context, bs *blobstor.BlobStor, mb *meta.DB, log *lo
|
||||||
}
|
}
|
||||||
log.Info(ctx, logs.BlobstoreRebuildStarted)
|
log.Info(ctx, logs.BlobstoreRebuildStarted)
|
||||||
ctx = tagging.ContextWithIOTag(ctx, qos.IOTagBackground.String())
|
ctx = tagging.ContextWithIOTag(ctx, qos.IOTagBackground.String())
|
||||||
// TODO use shard limiter
|
|
||||||
if err := bs.Rebuild(ctx, &mbStorageIDUpdate{mb: mb}, concLimiter, fillPercent); err != nil {
|
if err := bs.Rebuild(ctx, &mbStorageIDUpdate{mb: mb}, concLimiter, fillPercent); err != nil {
|
||||||
log.Warn(ctx, logs.FailedToRebuildBlobstore, zap.Error(err))
|
log.Warn(ctx, logs.FailedToRebuildBlobstore, zap.Error(err))
|
||||||
} else {
|
} else {
|
||||||
|
@ -165,5 +164,30 @@ func (s *Shard) ScheduleRebuild(ctx context.Context, p RebuildPrm) error {
|
||||||
return ErrDegradedMode
|
return ErrDegradedMode
|
||||||
}
|
}
|
||||||
|
|
||||||
return s.rb.ScheduleRebuild(ctx, p.ConcurrencyLimiter, int(p.TargetFillPercent))
|
limiter := &rebuildLimiter{
|
||||||
|
concurrencyLimiter: p.ConcurrencyLimiter,
|
||||||
|
rateLimiter: s.opsLimiter,
|
||||||
|
}
|
||||||
|
return s.rb.ScheduleRebuild(ctx, limiter, int(p.TargetFillPercent))
|
||||||
|
}
|
||||||
|
|
||||||
|
var _ common.RebuildLimiter = (*rebuildLimiter)(nil)
|
||||||
|
|
||||||
|
type rebuildLimiter struct {
|
||||||
|
concurrencyLimiter common.ConcurrencyLimiter
|
||||||
|
rateLimiter qos.Limiter
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *rebuildLimiter) AcquireWorkSlot(ctx context.Context) (common.ReleaseFunc, error) {
|
||||||
|
return r.concurrencyLimiter.AcquireWorkSlot(ctx)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *rebuildLimiter) ReadRequest(ctx context.Context) (common.ReleaseFunc, error) {
|
||||||
|
release, err := r.rateLimiter.ReadRequest(ctx)
|
||||||
|
return common.ReleaseFunc(release), err
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *rebuildLimiter) WriteRequest(ctx context.Context) (common.ReleaseFunc, error) {
|
||||||
|
release, err := r.rateLimiter.WriteRequest(ctx)
|
||||||
|
return common.ReleaseFunc(release), err
|
||||||
}
|
}
|
||||||
|
|
Loading…
Add table
Reference in a new issue