From c6a739e7467bc0fbbcdb1f813b38d1387f95d2c8 Mon Sep 17 00:00:00 2001
From: Dmitrii Stepanov <d.stepanov@yadro.com>
Date: Tue, 10 Oct 2023 16:01:35 +0300
Subject: [PATCH] [#661] blobovniczatree: Make Rebuild concurrent for objects

Signed-off-by: Dmitrii Stepanov <d.stepanov@yadro.com>
---
 .../blobstor/blobovniczatree/option.go        |  9 ++
 .../blobstor/blobovniczatree/rebuild.go       | 98 +++++++++++--------
 .../blobstor/blobovniczatree/rebuild_test.go  |  6 +-
 3 files changed, 72 insertions(+), 41 deletions(-)

diff --git a/pkg/local_object_storage/blobstor/blobovniczatree/option.go b/pkg/local_object_storage/blobstor/blobovniczatree/option.go
index f1eb1eb8e1..93dc487ef1 100644
--- a/pkg/local_object_storage/blobstor/blobovniczatree/option.go
+++ b/pkg/local_object_storage/blobstor/blobovniczatree/option.go
@@ -25,6 +25,7 @@ type cfg struct {
 	metrics            Metrics
 	waitBeforeDropDB   time.Duration
 	blzInitWorkerCount int
+	blzMoveBatchSize   int
 }
 
 type Option func(*cfg)
@@ -36,6 +37,7 @@ const (
 	defaultBlzShallowWidth    = 16
 	defaultWaitBeforeDropDB   = 10 * time.Second
 	defaultBlzInitWorkerCount = 5
+	defaulBlzMoveBatchSize    = 10000
 )
 
 func initConfig(c *cfg) {
@@ -49,6 +51,7 @@ func initConfig(c *cfg) {
 		metrics:            &noopMetrics{},
 		waitBeforeDropDB:   defaultWaitBeforeDropDB,
 		blzInitWorkerCount: defaultBlzInitWorkerCount,
+		blzMoveBatchSize:   defaulBlzMoveBatchSize,
 	}
 }
 
@@ -119,6 +122,12 @@ func WithWaitBeforeDropDB(t time.Duration) Option {
 	}
 }
 
+func WithMoveBatchSize(v int) Option {
+	return func(c *cfg) {
+		c.blzMoveBatchSize = v
+	}
+}
+
 // WithInitWorkerCount sets maximum workers count to init blobovnicza tree.
 //
 // Negative or zero value means no limit.
diff --git a/pkg/local_object_storage/blobstor/blobovniczatree/rebuild.go b/pkg/local_object_storage/blobstor/blobovniczatree/rebuild.go
index a0ddbfdeff..c7e44fdb35 100644
--- a/pkg/local_object_storage/blobstor/blobovniczatree/rebuild.go
+++ b/pkg/local_object_storage/blobstor/blobovniczatree/rebuild.go
@@ -1,6 +1,7 @@
 package blobovniczatree
 
 import (
+	"bytes"
 	"context"
 	"errors"
 	"os"
@@ -18,7 +19,10 @@ import (
 	"golang.org/x/sync/errgroup"
 )
 
-var errRebuildInProgress = errors.New("rebuild is in progress, the operation cannot be performed")
+var (
+	errRebuildInProgress = errors.New("rebuild is in progress, the operation cannot be performed")
+	errBatchFull         = errors.New("batch full")
+)
 
 func (b *Blobovniczas) Rebuild(ctx context.Context, prm common.RebuildPrm) (common.RebuildRes, error) {
 	if b.readOnly {
@@ -67,38 +71,18 @@ func (b *Blobovniczas) Rebuild(ctx context.Context, prm common.RebuildPrm) (comm
 }
 
 func (b *Blobovniczas) migrateDBs(ctx context.Context, dbs []string, prm common.RebuildPrm, res common.RebuildRes) (common.RebuildRes, error) {
-	eg, ctx := errgroup.WithContext(ctx)
-
-	var movedObjectsAcc atomic.Uint64
-	var filesMovedAcc atomic.Uint64
 	for _, db := range dbs {
-		db := db
-		if err := prm.WorkerLimiter.AcquireWorkSlot(ctx); err != nil {
-			_ = eg.Wait()
-			res.FilesRemoved += filesMovedAcc.Load()
-			res.ObjectsMoved += movedObjectsAcc.Load()
+		b.log.Debug(logs.BlobovniczaTreeRebuildingBlobovnicza, zap.String("path", db))
+		movedObjects, err := b.rebuildDB(ctx, db, prm.MetaStorage, prm.WorkerLimiter)
+		res.ObjectsMoved += movedObjects
+		if err != nil {
+			b.log.Warn(logs.BlobovniczaTreeRebuildingBlobovniczaFailed, zap.String("path", db), zap.Uint64("moved_objects_count", movedObjects), zap.Error(err))
 			return res, err
 		}
-		eg.Go(func() error {
-			defer prm.WorkerLimiter.ReleaseWorkSlot()
-
-			b.log.Debug(logs.BlobovniczaTreeRebuildingBlobovnicza, zap.String("path", db))
-			movedObjects, err := b.rebuildDB(ctx, db, prm.MetaStorage)
-			movedObjectsAcc.Add(movedObjects)
-			if err != nil {
-				b.log.Warn(logs.BlobovniczaTreeRebuildingBlobovniczaFailed, zap.String("path", db), zap.Uint64("moved_objects_count", movedObjects), zap.Error(err))
-				return err
-			}
-			b.log.Debug(logs.BlobovniczaTreeRebuildingBlobovniczaSuccess, zap.String("path", db), zap.Uint64("moved_objects_count", movedObjects))
-			filesMovedAcc.Add(1)
-			return nil
-		})
+		b.log.Debug(logs.BlobovniczaTreeRebuildingBlobovniczaSuccess, zap.String("path", db), zap.Uint64("moved_objects_count", movedObjects))
+		res.FilesRemoved++
 	}
-
-	err := eg.Wait()
-	res.FilesRemoved += filesMovedAcc.Load()
-	res.ObjectsMoved += movedObjectsAcc.Load()
-	return res, err
+	return res, nil
 }
 
 func (b *Blobovniczas) getDBsToRebuild(ctx context.Context) ([]string, error) {
@@ -122,7 +106,7 @@ func (b *Blobovniczas) getDBsToRebuild(ctx context.Context) ([]string, error) {
 	return result, nil
 }
 
-func (b *Blobovniczas) rebuildDB(ctx context.Context, path string, meta common.MetaStorage) (uint64, error) {
+func (b *Blobovniczas) rebuildDB(ctx context.Context, path string, meta common.MetaStorage, limiter common.ConcurrentWorkersLimiter) (uint64, error) {
 	shDB := b.getBlobovnicza(path)
 	blz, err := shDB.Open()
 	if err != nil {
@@ -136,7 +120,7 @@ func (b *Blobovniczas) rebuildDB(ctx context.Context, path string, meta common.M
 		shDB.Close()
 	}()
 
-	migratedObjects, err := b.moveObjects(ctx, blz, shDB.SystemPath(), meta)
+	migratedObjects, err := b.moveObjects(ctx, blz, shDB.SystemPath(), meta, limiter)
 	if err != nil {
 		return migratedObjects, err
 	}
@@ -144,21 +128,57 @@ func (b *Blobovniczas) rebuildDB(ctx context.Context, path string, meta common.M
 	return migratedObjects, err
 }
 
-func (b *Blobovniczas) moveObjects(ctx context.Context, blz *blobovnicza.Blobovnicza, blzPath string, meta common.MetaStorage) (uint64, error) {
-	var result uint64
+func (b *Blobovniczas) moveObjects(ctx context.Context, blz *blobovnicza.Blobovnicza, blzPath string, meta common.MetaStorage, limiter common.ConcurrentWorkersLimiter) (uint64, error) {
+	var result atomic.Uint64
+	batch := make(map[oid.Address][]byte)
 
 	var prm blobovnicza.IteratePrm
 	prm.DecodeAddresses()
 	prm.SetHandler(func(ie blobovnicza.IterationElement) error {
-		e := b.moveObject(ctx, blz, blzPath, ie.Address(), ie.ObjectData(), meta)
-		if e == nil {
-			result++
+		batch[ie.Address()] = bytes.Clone(ie.ObjectData())
+		if len(batch) == b.blzMoveBatchSize {
+			return errBatchFull
 		}
-		return e
+		return nil
 	})
 
-	_, err := blz.Iterate(ctx, prm)
-	return result, err
+	for {
+		_, err := blz.Iterate(ctx, prm)
+		if err != nil && !errors.Is(err, errBatchFull) {
+			return result.Load(), err
+		}
+
+		if len(batch) == 0 {
+			break
+		}
+
+		eg, egCtx := errgroup.WithContext(ctx)
+
+		for addr, data := range batch {
+			addr := addr
+			data := data
+
+			if err := limiter.AcquireWorkSlot(egCtx); err != nil {
+				_ = eg.Wait()
+				return result.Load(), err
+			}
+			eg.Go(func() error {
+				defer limiter.ReleaseWorkSlot()
+				err := b.moveObject(egCtx, blz, blzPath, addr, data, meta)
+				if err == nil {
+					result.Add(1)
+				}
+				return err
+			})
+		}
+		if err := eg.Wait(); err != nil {
+			return result.Load(), err
+		}
+
+		batch = make(map[oid.Address][]byte)
+	}
+
+	return result.Load(), nil
 }
 
 func (b *Blobovniczas) moveObject(ctx context.Context, source *blobovnicza.Blobovnicza, sourcePath string,
diff --git a/pkg/local_object_storage/blobstor/blobovniczatree/rebuild_test.go b/pkg/local_object_storage/blobstor/blobovniczatree/rebuild_test.go
index 34006b270e..921515a6ab 100644
--- a/pkg/local_object_storage/blobstor/blobovniczatree/rebuild_test.go
+++ b/pkg/local_object_storage/blobstor/blobovniczatree/rebuild_test.go
@@ -48,7 +48,8 @@ func testBlobovniczaTreeRebuildHelper(t *testing.T, sourceDepth, sourceWidth, ta
 		WithRootPath(dir),
 		WithBlobovniczaSize(100*1024*1024),
 		WithWaitBeforeDropDB(0),
-		WithOpenedCacheSize(1000))
+		WithOpenedCacheSize(1000),
+		WithMoveBatchSize(3))
 	require.NoError(t, b.Open(false))
 	require.NoError(t, b.Init())
 
@@ -87,7 +88,8 @@ func testBlobovniczaTreeRebuildHelper(t *testing.T, sourceDepth, sourceWidth, ta
 		WithRootPath(dir),
 		WithBlobovniczaSize(100*1024*1024),
 		WithWaitBeforeDropDB(0),
-		WithOpenedCacheSize(1000))
+		WithOpenedCacheSize(1000),
+		WithMoveBatchSize(3))
 	require.NoError(t, b.Open(false))
 	require.NoError(t, b.Init())