diff --git a/pkg/local_object_storage/blobstor/blobovniczatree/option.go b/pkg/local_object_storage/blobstor/blobovniczatree/option.go index f1eb1eb8..93dc487e 100644 --- a/pkg/local_object_storage/blobstor/blobovniczatree/option.go +++ b/pkg/local_object_storage/blobstor/blobovniczatree/option.go @@ -25,6 +25,7 @@ type cfg struct { metrics Metrics waitBeforeDropDB time.Duration blzInitWorkerCount int + blzMoveBatchSize int } type Option func(*cfg) @@ -36,6 +37,7 @@ const ( defaultBlzShallowWidth = 16 defaultWaitBeforeDropDB = 10 * time.Second defaultBlzInitWorkerCount = 5 + defaulBlzMoveBatchSize = 10000 ) func initConfig(c *cfg) { @@ -49,6 +51,7 @@ func initConfig(c *cfg) { metrics: &noopMetrics{}, waitBeforeDropDB: defaultWaitBeforeDropDB, blzInitWorkerCount: defaultBlzInitWorkerCount, + blzMoveBatchSize: defaulBlzMoveBatchSize, } } @@ -119,6 +122,12 @@ func WithWaitBeforeDropDB(t time.Duration) Option { } } +func WithMoveBatchSize(v int) Option { + return func(c *cfg) { + c.blzMoveBatchSize = v + } +} + // WithInitWorkerCount sets maximum workers count to init blobovnicza tree. // // Negative or zero value means no limit. diff --git a/pkg/local_object_storage/blobstor/blobovniczatree/rebuild.go b/pkg/local_object_storage/blobstor/blobovniczatree/rebuild.go index a0ddbfde..c7e44fdb 100644 --- a/pkg/local_object_storage/blobstor/blobovniczatree/rebuild.go +++ b/pkg/local_object_storage/blobstor/blobovniczatree/rebuild.go @@ -1,6 +1,7 @@ package blobovniczatree import ( + "bytes" "context" "errors" "os" @@ -18,7 +19,10 @@ import ( "golang.org/x/sync/errgroup" ) -var errRebuildInProgress = errors.New("rebuild is in progress, the operation cannot be performed") +var ( + errRebuildInProgress = errors.New("rebuild is in progress, the operation cannot be performed") + errBatchFull = errors.New("batch full") +) func (b *Blobovniczas) Rebuild(ctx context.Context, prm common.RebuildPrm) (common.RebuildRes, error) { if b.readOnly { @@ -67,38 +71,18 @@ func (b *Blobovniczas) Rebuild(ctx context.Context, prm common.RebuildPrm) (comm } func (b *Blobovniczas) migrateDBs(ctx context.Context, dbs []string, prm common.RebuildPrm, res common.RebuildRes) (common.RebuildRes, error) { - eg, ctx := errgroup.WithContext(ctx) - - var movedObjectsAcc atomic.Uint64 - var filesMovedAcc atomic.Uint64 for _, db := range dbs { - db := db - if err := prm.WorkerLimiter.AcquireWorkSlot(ctx); err != nil { - _ = eg.Wait() - res.FilesRemoved += filesMovedAcc.Load() - res.ObjectsMoved += movedObjectsAcc.Load() + b.log.Debug(logs.BlobovniczaTreeRebuildingBlobovnicza, zap.String("path", db)) + movedObjects, err := b.rebuildDB(ctx, db, prm.MetaStorage, prm.WorkerLimiter) + res.ObjectsMoved += movedObjects + if err != nil { + b.log.Warn(logs.BlobovniczaTreeRebuildingBlobovniczaFailed, zap.String("path", db), zap.Uint64("moved_objects_count", movedObjects), zap.Error(err)) return res, err } - eg.Go(func() error { - defer prm.WorkerLimiter.ReleaseWorkSlot() - - b.log.Debug(logs.BlobovniczaTreeRebuildingBlobovnicza, zap.String("path", db)) - movedObjects, err := b.rebuildDB(ctx, db, prm.MetaStorage) - movedObjectsAcc.Add(movedObjects) - if err != nil { - b.log.Warn(logs.BlobovniczaTreeRebuildingBlobovniczaFailed, zap.String("path", db), zap.Uint64("moved_objects_count", movedObjects), zap.Error(err)) - return err - } - b.log.Debug(logs.BlobovniczaTreeRebuildingBlobovniczaSuccess, zap.String("path", db), zap.Uint64("moved_objects_count", movedObjects)) - filesMovedAcc.Add(1) - return nil - }) + b.log.Debug(logs.BlobovniczaTreeRebuildingBlobovniczaSuccess, zap.String("path", db), zap.Uint64("moved_objects_count", movedObjects)) + res.FilesRemoved++ } - - err := eg.Wait() - res.FilesRemoved += filesMovedAcc.Load() - res.ObjectsMoved += movedObjectsAcc.Load() - return res, err + return res, nil } func (b *Blobovniczas) getDBsToRebuild(ctx context.Context) ([]string, error) { @@ -122,7 +106,7 @@ func (b *Blobovniczas) getDBsToRebuild(ctx context.Context) ([]string, error) { return result, nil } -func (b *Blobovniczas) rebuildDB(ctx context.Context, path string, meta common.MetaStorage) (uint64, error) { +func (b *Blobovniczas) rebuildDB(ctx context.Context, path string, meta common.MetaStorage, limiter common.ConcurrentWorkersLimiter) (uint64, error) { shDB := b.getBlobovnicza(path) blz, err := shDB.Open() if err != nil { @@ -136,7 +120,7 @@ func (b *Blobovniczas) rebuildDB(ctx context.Context, path string, meta common.M shDB.Close() }() - migratedObjects, err := b.moveObjects(ctx, blz, shDB.SystemPath(), meta) + migratedObjects, err := b.moveObjects(ctx, blz, shDB.SystemPath(), meta, limiter) if err != nil { return migratedObjects, err } @@ -144,21 +128,57 @@ func (b *Blobovniczas) rebuildDB(ctx context.Context, path string, meta common.M return migratedObjects, err } -func (b *Blobovniczas) moveObjects(ctx context.Context, blz *blobovnicza.Blobovnicza, blzPath string, meta common.MetaStorage) (uint64, error) { - var result uint64 +func (b *Blobovniczas) moveObjects(ctx context.Context, blz *blobovnicza.Blobovnicza, blzPath string, meta common.MetaStorage, limiter common.ConcurrentWorkersLimiter) (uint64, error) { + var result atomic.Uint64 + batch := make(map[oid.Address][]byte) var prm blobovnicza.IteratePrm prm.DecodeAddresses() prm.SetHandler(func(ie blobovnicza.IterationElement) error { - e := b.moveObject(ctx, blz, blzPath, ie.Address(), ie.ObjectData(), meta) - if e == nil { - result++ + batch[ie.Address()] = bytes.Clone(ie.ObjectData()) + if len(batch) == b.blzMoveBatchSize { + return errBatchFull } - return e + return nil }) - _, err := blz.Iterate(ctx, prm) - return result, err + for { + _, err := blz.Iterate(ctx, prm) + if err != nil && !errors.Is(err, errBatchFull) { + return result.Load(), err + } + + if len(batch) == 0 { + break + } + + eg, egCtx := errgroup.WithContext(ctx) + + for addr, data := range batch { + addr := addr + data := data + + if err := limiter.AcquireWorkSlot(egCtx); err != nil { + _ = eg.Wait() + return result.Load(), err + } + eg.Go(func() error { + defer limiter.ReleaseWorkSlot() + err := b.moveObject(egCtx, blz, blzPath, addr, data, meta) + if err == nil { + result.Add(1) + } + return err + }) + } + if err := eg.Wait(); err != nil { + return result.Load(), err + } + + batch = make(map[oid.Address][]byte) + } + + return result.Load(), nil } func (b *Blobovniczas) moveObject(ctx context.Context, source *blobovnicza.Blobovnicza, sourcePath string, diff --git a/pkg/local_object_storage/blobstor/blobovniczatree/rebuild_test.go b/pkg/local_object_storage/blobstor/blobovniczatree/rebuild_test.go index 34006b27..921515a6 100644 --- a/pkg/local_object_storage/blobstor/blobovniczatree/rebuild_test.go +++ b/pkg/local_object_storage/blobstor/blobovniczatree/rebuild_test.go @@ -48,7 +48,8 @@ func testBlobovniczaTreeRebuildHelper(t *testing.T, sourceDepth, sourceWidth, ta WithRootPath(dir), WithBlobovniczaSize(100*1024*1024), WithWaitBeforeDropDB(0), - WithOpenedCacheSize(1000)) + WithOpenedCacheSize(1000), + WithMoveBatchSize(3)) require.NoError(t, b.Open(false)) require.NoError(t, b.Init()) @@ -87,7 +88,8 @@ func testBlobovniczaTreeRebuildHelper(t *testing.T, sourceDepth, sourceWidth, ta WithRootPath(dir), WithBlobovniczaSize(100*1024*1024), WithWaitBeforeDropDB(0), - WithOpenedCacheSize(1000)) + WithOpenedCacheSize(1000), + WithMoveBatchSize(3)) require.NoError(t, b.Open(false)) require.NoError(t, b.Init())