[#661] blobovniczatree: Make Rebuild concurrent for objects

Signed-off-by: Dmitrii Stepanov <d.stepanov@yadro.com>
pull/812/head
Dmitrii Stepanov 2023-10-10 16:01:35 +03:00
parent f1c7905263
commit c6a739e746
3 changed files with 72 additions and 41 deletions

View File

@ -25,6 +25,7 @@ type cfg struct {
metrics Metrics
waitBeforeDropDB time.Duration
blzInitWorkerCount int
blzMoveBatchSize int
}
type Option func(*cfg)
@ -36,6 +37,7 @@ const (
defaultBlzShallowWidth = 16
defaultWaitBeforeDropDB = 10 * time.Second
defaultBlzInitWorkerCount = 5
defaulBlzMoveBatchSize = 10000
)
func initConfig(c *cfg) {
@ -49,6 +51,7 @@ func initConfig(c *cfg) {
metrics: &noopMetrics{},
waitBeforeDropDB: defaultWaitBeforeDropDB,
blzInitWorkerCount: defaultBlzInitWorkerCount,
blzMoveBatchSize: defaulBlzMoveBatchSize,
}
}
@ -119,6 +122,12 @@ func WithWaitBeforeDropDB(t time.Duration) Option {
}
}
func WithMoveBatchSize(v int) Option {
return func(c *cfg) {
c.blzMoveBatchSize = v
}
}
// WithInitWorkerCount sets maximum workers count to init blobovnicza tree.
//
// Negative or zero value means no limit.

View File

@ -1,6 +1,7 @@
package blobovniczatree
import (
"bytes"
"context"
"errors"
"os"
@ -18,7 +19,10 @@ import (
"golang.org/x/sync/errgroup"
)
var errRebuildInProgress = errors.New("rebuild is in progress, the operation cannot be performed")
var (
errRebuildInProgress = errors.New("rebuild is in progress, the operation cannot be performed")
errBatchFull = errors.New("batch full")
)
func (b *Blobovniczas) Rebuild(ctx context.Context, prm common.RebuildPrm) (common.RebuildRes, error) {
if b.readOnly {
@ -67,38 +71,18 @@ func (b *Blobovniczas) Rebuild(ctx context.Context, prm common.RebuildPrm) (comm
}
func (b *Blobovniczas) migrateDBs(ctx context.Context, dbs []string, prm common.RebuildPrm, res common.RebuildRes) (common.RebuildRes, error) {
eg, ctx := errgroup.WithContext(ctx)
var movedObjectsAcc atomic.Uint64
var filesMovedAcc atomic.Uint64
for _, db := range dbs {
db := db
if err := prm.WorkerLimiter.AcquireWorkSlot(ctx); err != nil {
_ = eg.Wait()
res.FilesRemoved += filesMovedAcc.Load()
res.ObjectsMoved += movedObjectsAcc.Load()
b.log.Debug(logs.BlobovniczaTreeRebuildingBlobovnicza, zap.String("path", db))
movedObjects, err := b.rebuildDB(ctx, db, prm.MetaStorage, prm.WorkerLimiter)
res.ObjectsMoved += movedObjects
if err != nil {
b.log.Warn(logs.BlobovniczaTreeRebuildingBlobovniczaFailed, zap.String("path", db), zap.Uint64("moved_objects_count", movedObjects), zap.Error(err))
return res, err
}
eg.Go(func() error {
defer prm.WorkerLimiter.ReleaseWorkSlot()
b.log.Debug(logs.BlobovniczaTreeRebuildingBlobovnicza, zap.String("path", db))
movedObjects, err := b.rebuildDB(ctx, db, prm.MetaStorage)
movedObjectsAcc.Add(movedObjects)
if err != nil {
b.log.Warn(logs.BlobovniczaTreeRebuildingBlobovniczaFailed, zap.String("path", db), zap.Uint64("moved_objects_count", movedObjects), zap.Error(err))
return err
}
b.log.Debug(logs.BlobovniczaTreeRebuildingBlobovniczaSuccess, zap.String("path", db), zap.Uint64("moved_objects_count", movedObjects))
filesMovedAcc.Add(1)
return nil
})
b.log.Debug(logs.BlobovniczaTreeRebuildingBlobovniczaSuccess, zap.String("path", db), zap.Uint64("moved_objects_count", movedObjects))
res.FilesRemoved++
}
err := eg.Wait()
res.FilesRemoved += filesMovedAcc.Load()
res.ObjectsMoved += movedObjectsAcc.Load()
return res, err
return res, nil
}
func (b *Blobovniczas) getDBsToRebuild(ctx context.Context) ([]string, error) {
@ -122,7 +106,7 @@ func (b *Blobovniczas) getDBsToRebuild(ctx context.Context) ([]string, error) {
return result, nil
}
func (b *Blobovniczas) rebuildDB(ctx context.Context, path string, meta common.MetaStorage) (uint64, error) {
func (b *Blobovniczas) rebuildDB(ctx context.Context, path string, meta common.MetaStorage, limiter common.ConcurrentWorkersLimiter) (uint64, error) {
shDB := b.getBlobovnicza(path)
blz, err := shDB.Open()
if err != nil {
@ -136,7 +120,7 @@ func (b *Blobovniczas) rebuildDB(ctx context.Context, path string, meta common.M
shDB.Close()
}()
migratedObjects, err := b.moveObjects(ctx, blz, shDB.SystemPath(), meta)
migratedObjects, err := b.moveObjects(ctx, blz, shDB.SystemPath(), meta, limiter)
if err != nil {
return migratedObjects, err
}
@ -144,21 +128,57 @@ func (b *Blobovniczas) rebuildDB(ctx context.Context, path string, meta common.M
return migratedObjects, err
}
func (b *Blobovniczas) moveObjects(ctx context.Context, blz *blobovnicza.Blobovnicza, blzPath string, meta common.MetaStorage) (uint64, error) {
var result uint64
func (b *Blobovniczas) moveObjects(ctx context.Context, blz *blobovnicza.Blobovnicza, blzPath string, meta common.MetaStorage, limiter common.ConcurrentWorkersLimiter) (uint64, error) {
var result atomic.Uint64
batch := make(map[oid.Address][]byte)
var prm blobovnicza.IteratePrm
prm.DecodeAddresses()
prm.SetHandler(func(ie blobovnicza.IterationElement) error {
e := b.moveObject(ctx, blz, blzPath, ie.Address(), ie.ObjectData(), meta)
if e == nil {
result++
batch[ie.Address()] = bytes.Clone(ie.ObjectData())
if len(batch) == b.blzMoveBatchSize {
return errBatchFull
}
return e
return nil
})
_, err := blz.Iterate(ctx, prm)
return result, err
for {
_, err := blz.Iterate(ctx, prm)
if err != nil && !errors.Is(err, errBatchFull) {
return result.Load(), err
}
if len(batch) == 0 {
break
}
eg, egCtx := errgroup.WithContext(ctx)
for addr, data := range batch {
addr := addr
data := data
if err := limiter.AcquireWorkSlot(egCtx); err != nil {
_ = eg.Wait()
return result.Load(), err
}
eg.Go(func() error {
defer limiter.ReleaseWorkSlot()
err := b.moveObject(egCtx, blz, blzPath, addr, data, meta)
if err == nil {
result.Add(1)
}
return err
})
}
if err := eg.Wait(); err != nil {
return result.Load(), err
}
batch = make(map[oid.Address][]byte)
}
return result.Load(), nil
}
func (b *Blobovniczas) moveObject(ctx context.Context, source *blobovnicza.Blobovnicza, sourcePath string,

View File

@ -48,7 +48,8 @@ func testBlobovniczaTreeRebuildHelper(t *testing.T, sourceDepth, sourceWidth, ta
WithRootPath(dir),
WithBlobovniczaSize(100*1024*1024),
WithWaitBeforeDropDB(0),
WithOpenedCacheSize(1000))
WithOpenedCacheSize(1000),
WithMoveBatchSize(3))
require.NoError(t, b.Open(false))
require.NoError(t, b.Init())
@ -87,7 +88,8 @@ func testBlobovniczaTreeRebuildHelper(t *testing.T, sourceDepth, sourceWidth, ta
WithRootPath(dir),
WithBlobovniczaSize(100*1024*1024),
WithWaitBeforeDropDB(0),
WithOpenedCacheSize(1000))
WithOpenedCacheSize(1000),
WithMoveBatchSize(3))
require.NoError(t, b.Open(false))
require.NoError(t, b.Init())