Blobovnicza tree rebuild #812
18 changed files with 170 additions and 30 deletions
|
@ -103,6 +103,7 @@ type applicationConfiguration struct {
|
|||
shardPoolSize uint32
|
||||
shards []shardCfg
|
||||
lowMem bool
|
||||
rebuildWorkers uint32
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -213,6 +214,7 @@ func (a *applicationConfiguration) readConfig(c *config.Config) error {
|
|||
a.EngineCfg.errorThreshold = engineconfig.ShardErrorThreshold(c)
|
||||
a.EngineCfg.shardPoolSize = engineconfig.ShardPoolSize(c)
|
||||
a.EngineCfg.lowMem = engineconfig.EngineLowMemoryConsumption(c)
|
||||
a.EngineCfg.rebuildWorkers = engineconfig.EngineRebuildWorkersCount(c)
|
||||
|
||||
return engineconfig.IterateShards(c, false, func(sc *shardconfig.Config) error { return a.updateShardConfig(c, sc) })
|
||||
}
|
||||
|
@ -703,13 +705,14 @@ func initCfgObject(appCfg *config.Config) cfgObject {
|
|||
}
|
||||
|
||||
func (c *cfg) engineOpts() []engine.Option {
|
||||
opts := make([]engine.Option, 0, 4)
|
||||
var opts []engine.Option
|
||||
|
||||
opts = append(opts,
|
||||
engine.WithShardPoolSize(c.EngineCfg.shardPoolSize),
|
||||
engine.WithErrorThreshold(c.EngineCfg.errorThreshold),
|
||||
engine.WithLogger(c.log),
|
||||
engine.WithLowMemoryConsumption(c.EngineCfg.lowMem),
|
||||
engine.WithRebuildWorkersCount(c.EngineCfg.rebuildWorkers),
|
||||
)
|
||||
|
||||
if c.metricsCollector != nil {
|
||||
|
|
|
@ -15,6 +15,9 @@ const (
|
|||
// ShardPoolSizeDefault is a default value of routine pool size per-shard to
|
||||
// process object PUT operations in a storage engine.
|
||||
ShardPoolSizeDefault = 20
|
||||
// RebuildWorkersCountDefault is a default value of the workers count to
|
||||
// process storage rebuild operations in a storage engine.
|
||||
RebuildWorkersCountDefault = 100
|
||||
)
|
||||
|
||||
// ErrNoShardConfigured is returned when at least 1 shard is required but none are found.
|
||||
|
@ -88,3 +91,11 @@ func ShardErrorThreshold(c *config.Config) uint32 {
|
|||
func EngineLowMemoryConsumption(c *config.Config) bool {
|
||||
return config.BoolSafe(c.Sub(subsection), "low_mem")
|
||||
}
|
||||
|
||||
// EngineRebuildWorkersCount returns value of "rebuild_workers_count" config parmeter from "storage" section.
|
||||
func EngineRebuildWorkersCount(c *config.Config) uint32 {
|
||||
if v := config.Uint32Safe(c.Sub(subsection), "rebuild_workers_count"); v > 0 {
|
||||
return v
|
||||
}
|
||||
return RebuildWorkersCountDefault
|
||||
}
|
||||
|
|
|
@ -38,6 +38,7 @@ func TestEngineSection(t *testing.T) {
|
|||
require.EqualValues(t, 0, engineconfig.ShardErrorThreshold(empty))
|
||||
require.EqualValues(t, engineconfig.ShardPoolSizeDefault, engineconfig.ShardPoolSize(empty))
|
||||
require.EqualValues(t, mode.ReadWrite, shardconfig.From(empty).Mode())
|
||||
require.EqualValues(t, engineconfig.RebuildWorkersCountDefault, engineconfig.EngineRebuildWorkersCount(empty))
|
||||
})
|
||||
|
||||
const path = "../../../../config/example/node"
|
||||
|
@ -47,6 +48,7 @@ func TestEngineSection(t *testing.T) {
|
|||
|
||||
require.EqualValues(t, 100, engineconfig.ShardErrorThreshold(c))
|
||||
require.EqualValues(t, 15, engineconfig.ShardPoolSize(c))
|
||||
require.EqualValues(t, uint32(1000), engineconfig.EngineRebuildWorkersCount(c))
|
||||
|
||||
err := engineconfig.IterateShards(c, true, func(sc *shardconfig.Config) error {
|
||||
defer func() {
|
||||
|
|
|
@ -92,6 +92,7 @@ FROSTFS_OBJECT_DELETE_TOMBSTONE_LIFETIME=10
|
|||
# Storage engine section
|
||||
FROSTFS_STORAGE_SHARD_POOL_SIZE=15
|
||||
FROSTFS_STORAGE_SHARD_RO_ERROR_THRESHOLD=100
|
||||
FROSTFS_STORAGE_REBUILD_WORKERS_COUNT=1000
|
||||
## 0 shard
|
||||
### Flag to refill Metabase from BlobStor
|
||||
FROSTFS_STORAGE_SHARD_0_RESYNC_METABASE=false
|
||||
|
|
|
@ -137,6 +137,7 @@
|
|||
"storage": {
|
||||
"shard_pool_size": 15,
|
||||
"shard_ro_error_threshold": 100,
|
||||
"rebuild_workers_count": 1000,
|
||||
"shard": {
|
||||
"0": {
|
||||
"mode": "read-only",
|
||||
|
|
|
@ -116,6 +116,7 @@ storage:
|
|||
# note: shard configuration can be omitted for relay node (see `node.relay`)
|
||||
shard_pool_size: 15 # size of per-shard worker pools used for PUT operations
|
||||
shard_ro_error_threshold: 100 # amount of errors to occur before shard is made read-only (default: 0, ignore errors)
|
||||
rebuild_workers_count: 1000 # count of rebuild storage concurrent workers
|
||||
|
||||
shard:
|
||||
default: # section with the default shard parameters
|
||||
|
|
|
@ -6,6 +6,7 @@ import (
|
|||
"os"
|
||||
"path/filepath"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
|
||||
|
@ -14,6 +15,7 @@ import (
|
|||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client"
|
||||
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
|
||||
"go.uber.org/zap"
|
||||
"golang.org/x/sync/errgroup"
|
||||
)
|
||||
|
||||
var errRebuildInProgress = errors.New("rebuild is in progress, the operation cannot be performed")
|
||||
|
@ -55,21 +57,48 @@ func (b *Blobovniczas) Rebuild(ctx context.Context, prm common.RebuildPrm) (comm
|
|||
success = false
|
||||
return res, err
|
||||
}
|
||||
|
||||
b.log.Info(logs.BlobovniczaTreeCollectingDBToRebuildSuccess, zap.Int("blobovniczas_to_rebuild", len(dbsToMigrate)))
|
||||
for _, db := range dbsToMigrate {
|
||||
b.log.Debug(logs.BlobovniczaTreeRebuildingBlobovnicza, zap.String("path", db))
|
||||
movedObjects, err := b.rebuildDB(ctx, db, prm.MetaStorage)
|
||||
res.ObjectsMoved += movedObjects
|
||||
res, err = b.migrateDBs(ctx, dbsToMigrate, prm, res)
|
||||
if err != nil {
|
||||
b.log.Warn(logs.BlobovniczaTreeRebuildingBlobovniczaFailed, zap.String("path", db), zap.Uint64("moved_objects_count", movedObjects), zap.Error(err))
|
||||
success = false
|
||||
}
|
||||
return res, err
|
||||
}
|
||||
|
||||
func (b *Blobovniczas) migrateDBs(ctx context.Context, dbs []string, prm common.RebuildPrm, res common.RebuildRes) (common.RebuildRes, error) {
|
||||
eg, ctx := errgroup.WithContext(ctx)
|
||||
|
||||
var movedObjectsAcc atomic.Uint64
|
||||
var filesMovedAcc atomic.Uint64
|
||||
for _, db := range dbs {
|
||||
db := db
|
||||
if err := prm.WorkerLimiter.AcquireWorkSlot(ctx); err != nil {
|
||||
_ = eg.Wait()
|
||||
res.FilesRemoved += filesMovedAcc.Load()
|
||||
res.ObjectsMoved += movedObjectsAcc.Load()
|
||||
return res, err
|
||||
}
|
||||
eg.Go(func() error {
|
||||
defer prm.WorkerLimiter.ReleaseWorkSlot()
|
||||
|
||||
b.log.Debug(logs.BlobovniczaTreeRebuildingBlobovnicza, zap.String("path", db))
|
||||
movedObjects, err := b.rebuildDB(ctx, db, prm.MetaStorage)
|
||||
movedObjectsAcc.Add(movedObjects)
|
||||
if err != nil {
|
||||
b.log.Warn(logs.BlobovniczaTreeRebuildingBlobovniczaFailed, zap.String("path", db), zap.Uint64("moved_objects_count", movedObjects), zap.Error(err))
|
||||
return err
|
||||
}
|
||||
b.log.Debug(logs.BlobovniczaTreeRebuildingBlobovniczaSuccess, zap.String("path", db), zap.Uint64("moved_objects_count", movedObjects))
|
||||
res.FilesRemoved++
|
||||
filesMovedAcc.Add(1)
|
||||
return nil
|
||||
})
|
||||
}
|
||||
|
||||
return res, nil
|
||||
err := eg.Wait()
|
||||
res.FilesRemoved += filesMovedAcc.Load()
|
||||
res.ObjectsMoved += movedObjectsAcc.Load()
|
||||
return res, err
|
||||
}
|
||||
|
||||
func (b *Blobovniczas) getDBsToRebuild(ctx context.Context) ([]string, error) {
|
||||
|
@ -133,7 +162,8 @@ func (b *Blobovniczas) moveObjects(ctx context.Context, blz *blobovnicza.Blobovn
|
|||
}
|
||||
|
||||
func (b *Blobovniczas) moveObject(ctx context.Context, source *blobovnicza.Blobovnicza, sourcePath string,
|
||||
addr oid.Address, data []byte, metaStore common.MetaStorage) error {
|
||||
addr oid.Address, data []byte, metaStore common.MetaStorage,
|
||||
) error {
|
||||
startedAt := time.Now()
|
||||
defer func() {
|
||||
b.metrics.ObjectMoved(time.Since(startedAt))
|
||||
|
@ -229,7 +259,8 @@ func (b *Blobovniczas) completeIncompletedMove(ctx context.Context, metaStore co
|
|||
}
|
||||
|
||||
func (b *Blobovniczas) performMove(ctx context.Context, source *blobovnicza.Blobovnicza, sourcePath string,
|
||||
move blobovnicza.MoveInfo, metaStore common.MetaStorage) error {
|
||||
move blobovnicza.MoveInfo, metaStore common.MetaStorage,
|
||||
) error {
|
||||
targetDB := b.getBlobovnicza(NewIDFromBytes(move.TargetStorageID).Path())
|
||||
target, err := targetDB.Open()
|
||||
if err != nil {
|
||||
|
@ -250,7 +281,7 @@ func (b *Blobovniczas) performMove(ctx context.Context, source *blobovnicza.Blob
|
|||
}
|
||||
}
|
||||
|
||||
if !existsInSource { //object was deleted by Rebuild, need to delete move info
|
||||
if !existsInSource { // object was deleted by Rebuild, need to delete move info
|
||||
if err = source.DropMoveInfo(ctx, move.Address); err != nil {
|
||||
b.log.Warn(logs.BlobovniczatreeCouldNotDropMoveInfo, zap.String("path", sourcePath), zap.Error(err))
|
||||
return err
|
||||
|
|
|
@ -4,6 +4,7 @@ import (
|
|||
"bytes"
|
||||
"context"
|
||||
"path/filepath"
|
||||
"sync"
|
||||
"testing"
|
||||
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object"
|
||||
|
@ -147,9 +148,11 @@ func testRebuildFailoverValidate(t *testing.T, dir string, obj *objectSDK.Object
|
|||
|
||||
metaStub := &storageIDUpdateStub{
|
||||
storageIDs: make(map[oid.Address][]byte),
|
||||
guard: &sync.Mutex{},
|
||||
}
|
||||
rRes, err := b.Rebuild(context.Background(), common.RebuildPrm{
|
||||
MetaStorage: metaStub,
|
||||
WorkerLimiter: &rebuildLimiterStub{},
|
||||
})
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, uint64(1), rRes.ObjectsMoved)
|
||||
|
|
|
@ -101,9 +101,11 @@ func testBlobovniczaTreeRebuildHelper(t *testing.T, sourceDepth, sourceWidth, ta
|
|||
|
||||
metaStub := &storageIDUpdateStub{
|
||||
storageIDs: storageIDs,
|
||||
guard: &sync.Mutex{},
|
||||
}
|
||||
var rPrm common.RebuildPrm
|
||||
rPrm.MetaStorage = metaStub
|
||||
rPrm.WorkerLimiter = &rebuildLimiterStub{}
|
||||
rRes, err := b.Rebuild(context.Background(), rPrm)
|
||||
require.NoError(t, err)
|
||||
dataMigrated := rRes.ObjectsMoved > 0 || rRes.FilesRemoved > 0 || metaStub.updatedCount > 0
|
||||
|
@ -121,12 +123,21 @@ func testBlobovniczaTreeRebuildHelper(t *testing.T, sourceDepth, sourceWidth, ta
|
|||
}
|
||||
|
||||
type storageIDUpdateStub struct {
|
||||
guard *sync.Mutex
|
||||
storageIDs map[oid.Address][]byte
|
||||
updatedCount uint64
|
||||
}
|
||||
|
||||
func (s *storageIDUpdateStub) UpdateStorageID(ctx context.Context, addr oid.Address, storageID []byte) error {
|
||||
s.guard.Lock()
|
||||
defer s.guard.Unlock()
|
||||
|
||||
s.storageIDs[addr] = storageID
|
||||
s.updatedCount++
|
||||
return nil
|
||||
}
|
||||
|
||||
type rebuildLimiterStub struct{}
|
||||
|
||||
func (s *rebuildLimiterStub) AcquireWorkSlot(context.Context) error { return nil }
|
||||
func (s *rebuildLimiterStub) ReleaseWorkSlot() {}
|
||||
|
|
|
@ -13,8 +13,14 @@ type RebuildRes struct {
|
|||
|
||||
type RebuildPrm struct {
|
||||
MetaStorage MetaStorage
|
||||
WorkerLimiter ConcurrentWorkersLimiter
|
||||
}
|
||||
|
||||
type MetaStorage interface {
|
||||
UpdateStorageID(ctx context.Context, addr oid.Address, storageID []byte) error
|
||||
}
|
||||
|
||||
type ConcurrentWorkersLimiter interface {
|
||||
AcquireWorkSlot(ctx context.Context) error
|
||||
ReleaseWorkSlot()
|
||||
}
|
||||
|
|
|
@ -13,12 +13,18 @@ type StorageIDUpdate interface {
|
|||
UpdateStorageID(ctx context.Context, addr oid.Address, storageID []byte) error
|
||||
}
|
||||
|
||||
func (b *BlobStor) Rebuild(ctx context.Context, upd StorageIDUpdate) error {
|
||||
type ConcurrentWorkersLimiter interface {
|
||||
AcquireWorkSlot(ctx context.Context) error
|
||||
ReleaseWorkSlot()
|
||||
}
|
||||
|
||||
func (b *BlobStor) Rebuild(ctx context.Context, upd StorageIDUpdate, limiter ConcurrentWorkersLimiter) error {
|
||||
var summary common.RebuildRes
|
||||
var rErr error
|
||||
for _, storage := range b.storage {
|
||||
res, err := storage.Storage.Rebuild(ctx, common.RebuildPrm{
|
||||
MetaStorage: upd,
|
||||
WorkerLimiter: limiter,
|
||||
})
|
||||
summary.FilesRemoved += res.FilesRemoved
|
||||
summary.ObjectsMoved += res.ObjectsMoved
|
||||
|
|
|
@ -38,6 +38,7 @@ type StorageEngine struct {
|
|||
err error
|
||||
}
|
||||
evacuateLimiter *evacuationLimiter
|
||||
rebuildLimiter *rebuildLimiter
|
||||
}
|
||||
|
||||
type shardWrapper struct {
|
||||
|
@ -215,13 +216,15 @@ type cfg struct {
|
|||
shardPoolSize uint32
|
||||
|
||||
lowMem bool
|
||||
|
||||
rebuildWorkersCount uint32
|
||||
}
|
||||
|
||||
func defaultCfg() *cfg {
|
||||
return &cfg{
|
||||
log: &logger.Logger{Logger: zap.L()},
|
||||
|
||||
shardPoolSize: 20,
|
||||
rebuildWorkersCount: 100,
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -240,6 +243,7 @@ func New(opts ...Option) *StorageEngine {
|
|||
closeCh: make(chan struct{}),
|
||||
setModeCh: make(chan setModeRequest),
|
||||
evacuateLimiter: &evacuationLimiter{},
|
||||
rebuildLimiter: newRebuildLimiter(c.rebuildWorkersCount),
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -277,3 +281,10 @@ func WithLowMemoryConsumption(lowMemCons bool) Option {
|
|||
c.lowMem = lowMemCons
|
||||
}
|
||||
}
|
||||
|
||||
// WithRebuildWorkersCount returns an option to set the count of concurrent rebuild workers.
|
||||
func WithRebuildWorkersCount(count uint32) Option {
|
||||
return func(c *cfg) {
|
||||
c.rebuildWorkersCount = count
|
||||
}
|
||||
}
|
||||
|
|
26
pkg/local_object_storage/engine/rebuild_limiter.go
Normal file
26
pkg/local_object_storage/engine/rebuild_limiter.go
Normal file
|
@ -0,0 +1,26 @@
|
|||
package engine
|
||||
|
||||
import "context"
|
||||
|
||||
type rebuildLimiter struct {
|
||||
semaphore chan struct{}
|
||||
}
|
||||
|
||||
func newRebuildLimiter(workersCount uint32) *rebuildLimiter {
|
||||
return &rebuildLimiter{
|
||||
semaphore: make(chan struct{}, workersCount),
|
||||
}
|
||||
}
|
||||
|
||||
func (l *rebuildLimiter) AcquireWorkSlot(ctx context.Context) error {
|
||||
select {
|
||||
case l.semaphore <- struct{}{}:
|
||||
return nil
|
||||
case <-ctx.Done():
|
||||
return ctx.Err()
|
||||
}
|
||||
}
|
||||
|
||||
func (l *rebuildLimiter) ReleaseWorkSlot() {
|
||||
<-l.semaphore
|
||||
}
|
|
@ -118,6 +118,7 @@ func (e *StorageEngine) createShard(ctx context.Context, opts []shard.Option) (*
|
|||
shard.WithExpiredLocksCallback(e.processExpiredLocks),
|
||||
shard.WithDeletedLockCallback(e.processDeletedLocks),
|
||||
shard.WithReportErrorFunc(e.reportShardErrorBackground),
|
||||
shard.WithRebuildWorkerLimiter(e.rebuildLimiter),
|
||||
)...)
|
||||
|
||||
if err := sh.UpdateID(ctx); err != nil {
|
||||
|
|
|
@ -162,7 +162,7 @@ func (s *Shard) Init(ctx context.Context) error {
|
|||
|
||||
s.gc.init(ctx)
|
||||
|
||||
s.rb = newRebuilder()
|
||||
s.rb = newRebuilder(s.rebuildLimiter)
|
||||
s.rb.Start(ctx, s.blobStor, s.metaBase, s.log)
|
||||
|
||||
return nil
|
||||
|
|
13
pkg/local_object_storage/shard/rebuild_limiter.go
Normal file
13
pkg/local_object_storage/shard/rebuild_limiter.go
Normal file
|
@ -0,0 +1,13 @@
|
|||
package shard
|
||||
|
||||
import "context"
|
||||
|
||||
type RebuildWorkerLimiter interface {
|
||||
AcquireWorkSlot(ctx context.Context) error
|
||||
ReleaseWorkSlot()
|
||||
}
|
||||
|
||||
type noopRebuildLimiter struct{}
|
||||
|
||||
func (l *noopRebuildLimiter) AcquireWorkSlot(context.Context) error { return nil }
|
||||
func (l *noopRebuildLimiter) ReleaseWorkSlot() {}
|
|
@ -17,13 +17,15 @@ type rebuilder struct {
|
|||
mtx *sync.Mutex
|
||||
wg *sync.WaitGroup
|
||||
cancel func()
|
||||
limiter RebuildWorkerLimiter
|
||||
}
|
||||
|
||||
func newRebuilder() *rebuilder {
|
||||
func newRebuilder(l RebuildWorkerLimiter) *rebuilder {
|
||||
return &rebuilder{
|
||||
mtx: &sync.Mutex{},
|
||||
wg: &sync.WaitGroup{},
|
||||
cancel: nil,
|
||||
limiter: l,
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -45,7 +47,7 @@ func (r *rebuilder) start(ctx context.Context, bs *blobstor.BlobStor, mb *meta.D
|
|||
defer r.wg.Done()
|
||||
|
||||
log.Info(logs.BlobstoreRebuildStarted)
|
||||
if err := bs.Rebuild(ctx, &mbStorageIDUpdate{mb: mb}); err != nil {
|
||||
if err := bs.Rebuild(ctx, &mbStorageIDUpdate{mb: mb}, r.limiter); err != nil {
|
||||
log.Warn(logs.FailedToRebuildBlobstore, zap.Error(err))
|
||||
} else {
|
||||
log.Info(logs.BlobstoreRebuildCompletedSuccessfully)
|
||||
|
|
|
@ -127,6 +127,8 @@ type cfg struct {
|
|||
metricsWriter MetricsWriter
|
||||
|
||||
reportErrorFunc func(selfID string, message string, err error)
|
||||
|
||||
rebuildLimiter RebuildWorkerLimiter
|
||||
}
|
||||
|
||||
func defaultCfg() *cfg {
|
||||
|
@ -135,6 +137,7 @@ func defaultCfg() *cfg {
|
|||
log: &logger.Logger{Logger: zap.L()},
|
||||
gcCfg: defaultGCCfg(),
|
||||
reportErrorFunc: func(string, string, error) {},
|
||||
rebuildLimiter: &noopRebuildLimiter{},
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -372,6 +375,14 @@ func WithExpiredCollectorWorkerCount(count int) Option {
|
|||
}
|
||||
}
|
||||
|
||||
// WithRebuildWorkerLimiter return option to set concurrent
|
||||
// workers count of storage rebuild operation.
|
||||
func WithRebuildWorkerLimiter(l RebuildWorkerLimiter) Option {
|
||||
return func(c *cfg) {
|
||||
c.rebuildLimiter = l
|
||||
}
|
||||
}
|
||||
|
||||
func (s *Shard) fillInfo() {
|
||||
s.cfg.info.MetaBaseInfo = s.metaBase.DumpInfo()
|
||||
s.cfg.info.BlobStorInfo = s.blobStor.DumpInfo()
|
||||
|
|
Loading…
Reference in a new issue