[#661] blobovniczatree: Make Rebuild concurrent for objects

Signed-off-by: Dmitrii Stepanov <d.stepanov@yadro.com>
This commit is contained in:
Dmitrii Stepanov 2023-10-10 16:01:35 +03:00
parent 888f966eb4
commit 52c625df0f
3 changed files with 73 additions and 42 deletions

View file

@ -25,6 +25,7 @@ type cfg struct {
metrics Metrics metrics Metrics
waitBeforeDropDB time.Duration waitBeforeDropDB time.Duration
blzInitWorkerCount int blzInitWorkerCount int
blzMoveBatchSize int
} }
type Option func(*cfg) type Option func(*cfg)
@ -36,6 +37,7 @@ const (
defaultBlzShallowWidth = 16 defaultBlzShallowWidth = 16
defaultWaitBeforeDropDB = 10 * time.Second defaultWaitBeforeDropDB = 10 * time.Second
defaultBlzInitWorkerCount = 5 defaultBlzInitWorkerCount = 5
defaulBlzMoveBatchSize = 10000
) )
func initConfig(c *cfg) { func initConfig(c *cfg) {
@ -49,6 +51,7 @@ func initConfig(c *cfg) {
metrics: &noopMetrics{}, metrics: &noopMetrics{},
waitBeforeDropDB: defaultWaitBeforeDropDB, waitBeforeDropDB: defaultWaitBeforeDropDB,
blzInitWorkerCount: defaultBlzInitWorkerCount, blzInitWorkerCount: defaultBlzInitWorkerCount,
blzMoveBatchSize: defaulBlzMoveBatchSize,
} }
} }
@ -119,7 +122,13 @@ func WithWaitBeforeDropDB(t time.Duration) Option {
} }
} }
// WithInitWorkerCount sets maximum workers count to init blobovnicza tree. func WithMoveBatchSize(v int) Option {
return func(c *cfg) {
c.blzMoveBatchSize = v
}
}
// WithInitWorkersCount sets maximum workers count to init blobovnicza tree.
// //
// Negative or zero value means no limit. // Negative or zero value means no limit.
func WithInitWorkerCount(v int) Option { func WithInitWorkerCount(v int) Option {

View file

@ -1,6 +1,7 @@
package blobovniczatree package blobovniczatree
import ( import (
"bytes"
"context" "context"
"errors" "errors"
"os" "os"
@ -18,7 +19,10 @@ import (
"golang.org/x/sync/errgroup" "golang.org/x/sync/errgroup"
) )
var errRebuildInProgress = errors.New("rebuild is in progress, the operation cannot be performed") var (
errRebuildInProgress = errors.New("rebuild is in progress, the operation cannot be performed")
errBatchFull = errors.New("batch full")
)
func (b *Blobovniczas) Rebuild(ctx context.Context, prm common.RebuildPrm) (common.RebuildRes, error) { func (b *Blobovniczas) Rebuild(ctx context.Context, prm common.RebuildPrm) (common.RebuildRes, error) {
if b.readOnly { if b.readOnly {
@ -67,38 +71,18 @@ func (b *Blobovniczas) Rebuild(ctx context.Context, prm common.RebuildPrm) (comm
} }
func (b *Blobovniczas) migrateDBs(ctx context.Context, dbs []string, prm common.RebuildPrm, res common.RebuildRes) (common.RebuildRes, error) { func (b *Blobovniczas) migrateDBs(ctx context.Context, dbs []string, prm common.RebuildPrm, res common.RebuildRes) (common.RebuildRes, error) {
eg, ctx := errgroup.WithContext(ctx)
var movedObjectsAcc atomic.Uint64
var filesMovedAcc atomic.Uint64
for _, db := range dbs { for _, db := range dbs {
db := db b.log.Debug(logs.BlobovniczaTreeRebuildingBlobovnicza, zap.String("path", db))
if err := prm.WorkerLimiter.AcquireWorkSlot(ctx); err != nil { movedObjects, err := b.rebuildDB(ctx, db, prm.MetaStorage, prm.WorkerLimiter)
_ = eg.Wait() res.ObjectsMoved += movedObjects
res.FilesRemoved += filesMovedAcc.Load() if err != nil {
res.ObjectsMoved += movedObjectsAcc.Load() b.log.Warn(logs.BlobovniczaTreeRebuildingBlobovniczaFailed, zap.String("path", db), zap.Uint64("moved_objects_count", movedObjects), zap.Error(err))
return res, err return res, err
} }
eg.Go(func() error { b.log.Debug(logs.BlobovniczaTreeRebuildingBlobovniczaSuccess, zap.String("path", db), zap.Uint64("moved_objects_count", movedObjects))
defer prm.WorkerLimiter.ReleaseWorkSlot() res.FilesRemoved++
b.log.Debug(logs.BlobovniczaTreeRebuildingBlobovnicza, zap.String("path", db))
movedObjects, err := b.rebuildDB(ctx, db, prm.MetaStorage)
movedObjectsAcc.Add(movedObjects)
if err != nil {
b.log.Warn(logs.BlobovniczaTreeRebuildingBlobovniczaFailed, zap.String("path", db), zap.Uint64("moved_objects_count", movedObjects), zap.Error(err))
return err
}
b.log.Debug(logs.BlobovniczaTreeRebuildingBlobovniczaSuccess, zap.String("path", db), zap.Uint64("moved_objects_count", movedObjects))
filesMovedAcc.Add(1)
return nil
})
} }
return res, nil
err := eg.Wait()
res.FilesRemoved += filesMovedAcc.Load()
res.ObjectsMoved += movedObjectsAcc.Load()
return res, err
} }
func (b *Blobovniczas) getDBsToRebuild(ctx context.Context) ([]string, error) { func (b *Blobovniczas) getDBsToRebuild(ctx context.Context) ([]string, error) {
@ -122,7 +106,7 @@ func (b *Blobovniczas) getDBsToRebuild(ctx context.Context) ([]string, error) {
return result, nil return result, nil
} }
func (b *Blobovniczas) rebuildDB(ctx context.Context, path string, meta common.MetaStorage) (uint64, error) { func (b *Blobovniczas) rebuildDB(ctx context.Context, path string, meta common.MetaStorage, limiter common.ConcurrentWorkersLimiter) (uint64, error) {
shDB := b.getBlobovnicza(path) shDB := b.getBlobovnicza(path)
blz, err := shDB.Open() blz, err := shDB.Open()
if err != nil { if err != nil {
@ -136,7 +120,7 @@ func (b *Blobovniczas) rebuildDB(ctx context.Context, path string, meta common.M
shDB.Close() shDB.Close()
}() }()
migratedObjects, err := b.moveObjects(ctx, blz, shDB.SystemPath(), meta) migratedObjects, err := b.moveObjects(ctx, blz, shDB.SystemPath(), meta, limiter)
if err != nil { if err != nil {
return migratedObjects, err return migratedObjects, err
} }
@ -144,21 +128,57 @@ func (b *Blobovniczas) rebuildDB(ctx context.Context, path string, meta common.M
return migratedObjects, err return migratedObjects, err
} }
func (b *Blobovniczas) moveObjects(ctx context.Context, blz *blobovnicza.Blobovnicza, blzPath string, meta common.MetaStorage) (uint64, error) { func (b *Blobovniczas) moveObjects(ctx context.Context, blz *blobovnicza.Blobovnicza, blzPath string, meta common.MetaStorage, limiter common.ConcurrentWorkersLimiter) (uint64, error) {
var result uint64 var result atomic.Uint64
batch := make(map[oid.Address][]byte)
var prm blobovnicza.IteratePrm var prm blobovnicza.IteratePrm
prm.DecodeAddresses() prm.DecodeAddresses()
prm.SetHandler(func(ie blobovnicza.IterationElement) error { prm.SetHandler(func(ie blobovnicza.IterationElement) error {
e := b.moveObject(ctx, blz, blzPath, ie.Address(), ie.ObjectData(), meta) batch[ie.Address()] = bytes.Clone(ie.ObjectData())
if e == nil { if len(batch) == b.blzMoveBatchSize {
result++ return errBatchFull
} }
return e return nil
}) })
_, err := blz.Iterate(ctx, prm) for {
return result, err _, err := blz.Iterate(ctx, prm)
if err != nil && !errors.Is(err, errBatchFull) {
return result.Load(), err
}
if len(batch) == 0 {
break
}
eg, egCtx := errgroup.WithContext(ctx)
for addr, data := range batch {
addr := addr
data := data
if err := limiter.AcquireWorkSlot(egCtx); err != nil {
_ = eg.Wait()
return result.Load(), err
}
eg.Go(func() error {
defer limiter.ReleaseWorkSlot()
err := b.moveObject(egCtx, blz, blzPath, addr, data, meta)
if err == nil {
result.Add(1)
}
return err
})
}
if err := eg.Wait(); err != nil {
return result.Load(), err
}
batch = make(map[oid.Address][]byte)
}
return result.Load(), nil
} }
func (b *Blobovniczas) moveObject(ctx context.Context, source *blobovnicza.Blobovnicza, sourcePath string, func (b *Blobovniczas) moveObject(ctx context.Context, source *blobovnicza.Blobovnicza, sourcePath string,

View file

@ -48,7 +48,8 @@ func testBlobovniczaTreeRebuildHelper(t *testing.T, sourceDepth, sourceWidth, ta
WithRootPath(dir), WithRootPath(dir),
WithBlobovniczaSize(100*1024*1024), WithBlobovniczaSize(100*1024*1024),
WithWaitBeforeDropDB(0), WithWaitBeforeDropDB(0),
WithOpenedCacheSize(1000)) WithOpenedCacheSize(1000),
WithMoveBatchSize(3))
require.NoError(t, b.Open(false)) require.NoError(t, b.Open(false))
require.NoError(t, b.Init()) require.NoError(t, b.Init())
@ -87,7 +88,8 @@ func testBlobovniczaTreeRebuildHelper(t *testing.T, sourceDepth, sourceWidth, ta
WithRootPath(dir), WithRootPath(dir),
WithBlobovniczaSize(100*1024*1024), WithBlobovniczaSize(100*1024*1024),
WithWaitBeforeDropDB(0), WithWaitBeforeDropDB(0),
WithOpenedCacheSize(1000)) WithOpenedCacheSize(1000),
WithMoveBatchSize(3))
require.NoError(t, b.Open(false)) require.NoError(t, b.Open(false))
require.NoError(t, b.Init()) require.NoError(t, b.Init())