[#1337] shard: Disable background rebuild
All checks were successful
Vulncheck / Vulncheck (pull_request) Successful in 1m3s
DCO action / DCO (pull_request) Successful in 1m28s
Pre-commit hooks / Pre-commit (pull_request) Successful in 2m2s
Build / Build Components (pull_request) Successful in 2m15s
Tests and linters / Run gofumpt (pull_request) Successful in 2m10s
Tests and linters / gopls check (pull_request) Successful in 2m21s
Tests and linters / Staticcheck (pull_request) Successful in 2m55s
Tests and linters / Lint (pull_request) Successful in 3m29s
Tests and linters / Tests (pull_request) Successful in 3m47s
Tests and linters / Tests with -race (pull_request) Successful in 3m58s

Since `frostfs-cli control shards rebuild` command was added,
there is no need for background rebuild now.
For failover tests used used value 1 to rebuild only schema change.

Signed-off-by: Dmitrii Stepanov <d.stepanov@yadro.com>
This commit is contained in:
Dmitrii Stepanov 2024-09-04 09:30:56 +03:00
parent edb1747af7
commit d3b209c8e1
16 changed files with 49 additions and 138 deletions

View file

@ -126,8 +126,6 @@ type shardCfg struct {
refillMetabaseWorkersCount int refillMetabaseWorkersCount int
mode shardmode.Mode mode shardmode.Mode
rebuildWorkersCount uint32
metaCfg struct { metaCfg struct {
path string path string
perm fs.FileMode perm fs.FileMode
@ -240,7 +238,6 @@ func (a *applicationConfiguration) updateShardConfig(c *config.Config, oldConfig
newConfig.refillMetabase = oldConfig.RefillMetabase() newConfig.refillMetabase = oldConfig.RefillMetabase()
newConfig.refillMetabaseWorkersCount = oldConfig.RefillMetabaseWorkersCount() newConfig.refillMetabaseWorkersCount = oldConfig.RefillMetabaseWorkersCount()
newConfig.rebuildWorkersCount = oldConfig.RebuildWorkerCount()
newConfig.mode = oldConfig.Mode() newConfig.mode = oldConfig.Mode()
newConfig.compress = oldConfig.Compress() newConfig.compress = oldConfig.Compress()
newConfig.estimateCompressibility = oldConfig.EstimateCompressibility() newConfig.estimateCompressibility = oldConfig.EstimateCompressibility()
@ -998,7 +995,6 @@ func (c *cfg) getShardOpts(ctx context.Context, shCfg shardCfg) shardOptsWithID
shard.WithLogger(c.log), shard.WithLogger(c.log),
shard.WithRefillMetabase(shCfg.refillMetabase), shard.WithRefillMetabase(shCfg.refillMetabase),
shard.WithRefillMetabaseWorkersCount(shCfg.refillMetabaseWorkersCount), shard.WithRefillMetabaseWorkersCount(shCfg.refillMetabaseWorkersCount),
shard.WithRebuildWorkersCount(shCfg.rebuildWorkersCount),
shard.WithMode(shCfg.mode), shard.WithMode(shCfg.mode),
shard.WithBlobStorOptions(blobstoreOpts...), shard.WithBlobStorOptions(blobstoreOpts...),
shard.WithMetaBaseOptions(mbOptions...), shard.WithMetaBaseOptions(mbOptions...),

View file

@ -119,7 +119,6 @@ func TestEngineSection(t *testing.T) {
require.Equal(t, false, sc.RefillMetabase()) require.Equal(t, false, sc.RefillMetabase())
require.Equal(t, mode.ReadOnly, sc.Mode()) require.Equal(t, mode.ReadOnly, sc.Mode())
require.Equal(t, 100, sc.RefillMetabaseWorkersCount()) require.Equal(t, 100, sc.RefillMetabaseWorkersCount())
require.Equal(t, uint32(1000), sc.RebuildWorkerCount())
case 1: case 1:
require.Equal(t, "tmp/1/blob/pilorama.db", pl.Path()) require.Equal(t, "tmp/1/blob/pilorama.db", pl.Path())
require.Equal(t, fs.FileMode(0o644), pl.Perm()) require.Equal(t, fs.FileMode(0o644), pl.Perm())
@ -175,7 +174,6 @@ func TestEngineSection(t *testing.T) {
require.Equal(t, true, sc.RefillMetabase()) require.Equal(t, true, sc.RefillMetabase())
require.Equal(t, mode.ReadWrite, sc.Mode()) require.Equal(t, mode.ReadWrite, sc.Mode())
require.Equal(t, shardconfig.RefillMetabaseWorkersCountDefault, sc.RefillMetabaseWorkersCount()) require.Equal(t, shardconfig.RefillMetabaseWorkersCountDefault, sc.RefillMetabaseWorkersCount())
require.Equal(t, uint32(shardconfig.RebuildWorkersCountDefault), sc.RebuildWorkerCount())
} }
return nil return nil
}) })

View file

@ -19,7 +19,6 @@ const (
SmallSizeLimitDefault = 1 << 20 SmallSizeLimitDefault = 1 << 20
EstimateCompressibilityThresholdDefault = 0.1 EstimateCompressibilityThresholdDefault = 0.1
RefillMetabaseWorkersCountDefault = 500 RefillMetabaseWorkersCountDefault = 500
RebuildWorkersCountDefault = 5
) )
// From wraps config section into Config. // From wraps config section into Config.
@ -150,20 +149,6 @@ func (x *Config) RefillMetabaseWorkersCount() int {
return RefillMetabaseWorkersCountDefault return RefillMetabaseWorkersCountDefault
} }
// RebuildWorkersCount returns the value of "resync_metabase_worker_count" config parameter.
//
// Returns RebuildWorkersCountDefault if the value is not a positive number.
func (x *Config) RebuildWorkerCount() uint32 {
v := config.Uint32Safe(
(*config.Config)(x),
"rebuild_worker_count",
)
if v > 0 {
return v
}
return RebuildWorkersCountDefault
}
// Mode return the value of "mode" config parameter. // Mode return the value of "mode" config parameter.
// //
// Panics if read the value is not one of predefined // Panics if read the value is not one of predefined

View file

@ -94,7 +94,6 @@ FROSTFS_STORAGE_SHARD_RO_ERROR_THRESHOLD=100
### Flag to refill Metabase from BlobStor ### Flag to refill Metabase from BlobStor
FROSTFS_STORAGE_SHARD_0_RESYNC_METABASE=false FROSTFS_STORAGE_SHARD_0_RESYNC_METABASE=false
FROSTFS_STORAGE_SHARD_0_RESYNC_METABASE_WORKER_COUNT=100 FROSTFS_STORAGE_SHARD_0_RESYNC_METABASE_WORKER_COUNT=100
FROSTFS_STORAGE_SHARD_0_REBUILD_WORKER_COUNT=1000
### Flag to set shard mode ### Flag to set shard mode
FROSTFS_STORAGE_SHARD_0_MODE=read-only FROSTFS_STORAGE_SHARD_0_MODE=read-only
### Write cache config ### Write cache config

View file

@ -140,7 +140,6 @@
"mode": "read-only", "mode": "read-only",
"resync_metabase": false, "resync_metabase": false,
"resync_metabase_worker_count": 100, "resync_metabase_worker_count": 100,
"rebuild_worker_count": 1000,
"writecache": { "writecache": {
"enabled": false, "enabled": false,
"no_sync": true, "no_sync": true,

View file

@ -164,7 +164,6 @@ storage:
# disabled (do not work with the shard, allows to not remove it from the config) # disabled (do not work with the shard, allows to not remove it from the config)
resync_metabase: false # sync metabase with blobstor on start, expensive, leave false until complete understanding resync_metabase: false # sync metabase with blobstor on start, expensive, leave false until complete understanding
resync_metabase_worker_count: 100 resync_metabase_worker_count: 100
rebuild_worker_count: 1000 # count of rebuild storage concurrent workers
writecache: writecache:
enabled: false enabled: false

View file

@ -189,7 +189,6 @@ The following table describes configuration for each shard.
| `mode` | `string` | `read-write` | Shard Mode.<br/>Possible values: `read-write`, `read-only`, `degraded`, `degraded-read-only`, `disabled` | | `mode` | `string` | `read-write` | Shard Mode.<br/>Possible values: `read-write`, `read-only`, `degraded`, `degraded-read-only`, `disabled` |
| `resync_metabase` | `bool` | `false` | Flag to enable metabase resync on start. | | `resync_metabase` | `bool` | `false` | Flag to enable metabase resync on start. |
| `resync_metabase_worker_count` | `int` | `1000` | Count of concurrent workers to resync metabase. | | `resync_metabase_worker_count` | `int` | `1000` | Count of concurrent workers to resync metabase. |
| `rebuild_worker_count` | `int` | `5` | Count of concurrent workers to rebuild blobstore. |
| `writecache` | [Writecache config](#writecache-subsection) | | Write-cache configuration. | | `writecache` | [Writecache config](#writecache-subsection) | | Write-cache configuration. |
| `metabase` | [Metabase config](#metabase-subsection) | | Metabase configuration. | | `metabase` | [Metabase config](#metabase-subsection) | | Metabase configuration. |
| `blobstor` | [Blobstor config](#blobstor-subsection) | | Blobstor configuration. | | `blobstor` | [Blobstor config](#blobstor-subsection) | | Blobstor configuration. |

View file

@ -60,7 +60,7 @@ func (b *Blobovniczas) Rebuild(ctx context.Context, prm common.RebuildPrm) (comm
b.log.Debug(logs.BlobovniczaTreeCompletedPreviousRebuildSuccess) b.log.Debug(logs.BlobovniczaTreeCompletedPreviousRebuildSuccess)
b.log.Debug(logs.BlobovniczaTreeCollectingDBToRebuild) b.log.Debug(logs.BlobovniczaTreeCollectingDBToRebuild)
dbsToMigrate, err := b.getDBsToRebuild(ctx, prm.Action) dbsToMigrate, err := b.getDBsToRebuild(ctx, prm.FillPercent)
if err != nil { if err != nil {
b.log.Warn(logs.BlobovniczaTreeCollectingDBToRebuildFailed, zap.Error(err)) b.log.Warn(logs.BlobovniczaTreeCollectingDBToRebuildFailed, zap.Error(err))
success = false success = false
@ -94,27 +94,20 @@ func (b *Blobovniczas) migrateDBs(ctx context.Context, dbs []string, prm common.
return res, nil return res, nil
} }
func (b *Blobovniczas) getDBsToRebuild(ctx context.Context, action common.RebuildAction) ([]string, error) { func (b *Blobovniczas) getDBsToRebuild(ctx context.Context, fillPercent int) ([]string, error) {
schemaChange := make(map[string]struct{}) withSchemaChange, err := b.selectDBsDoNotMatchSchema(ctx)
fillPercent := make(map[string]struct{})
var err error
if action.SchemaChange {
schemaChange, err = b.selectDBsDoNotMatchSchema(ctx)
if err != nil { if err != nil {
return nil, err return nil, err
} }
} withFillPercent, err := b.selectDBsDoNotMatchFillPercent(ctx, fillPercent)
if action.FillPercent {
fillPercent, err = b.selectDBsDoNotMatchFillPercent(ctx, action.FillPercentValue)
if err != nil { if err != nil {
return nil, err return nil, err
} }
for k := range withFillPercent {
withSchemaChange[k] = struct{}{}
} }
for k := range fillPercent { result := make([]string, 0, len(withSchemaChange))
schemaChange[k] = struct{}{} for db := range withSchemaChange {
}
result := make([]string, 0, len(schemaChange))
for db := range schemaChange {
result = append(result, db) result = append(result, db)
} }
return result, nil return result, nil

View file

@ -145,7 +145,7 @@ func testRebuildFailoverValidate(t *testing.T, dir string, obj *objectSDK.Object
WithBlobovniczaShallowWidth(2), WithBlobovniczaShallowWidth(2),
WithBlobovniczaShallowDepth(2), WithBlobovniczaShallowDepth(2),
WithRootPath(dir), WithRootPath(dir),
WithBlobovniczaSize(100*1024*1024), WithBlobovniczaSize(10*1024),
WithWaitBeforeDropDB(0), WithWaitBeforeDropDB(0),
WithOpenedCacheSize(1000)) WithOpenedCacheSize(1000))
require.NoError(t, b.Open(mode.ComponentReadWrite)) require.NoError(t, b.Open(mode.ComponentReadWrite))
@ -164,6 +164,7 @@ func testRebuildFailoverValidate(t *testing.T, dir string, obj *objectSDK.Object
rRes, err := b.Rebuild(context.Background(), common.RebuildPrm{ rRes, err := b.Rebuild(context.Background(), common.RebuildPrm{
MetaStorage: metaStub, MetaStorage: metaStub,
WorkerLimiter: &rebuildLimiterStub{}, WorkerLimiter: &rebuildLimiterStub{},
FillPercent: 1,
}) })
require.NoError(t, err) require.NoError(t, err)
require.Equal(t, uint64(1), rRes.ObjectsMoved) require.Equal(t, uint64(1), rRes.ObjectsMoved)

View file

@ -79,11 +79,7 @@ func TestBlobovniczaTreeFillPercentRebuild(t *testing.T) {
rRes, err := b.Rebuild(context.Background(), common.RebuildPrm{ rRes, err := b.Rebuild(context.Background(), common.RebuildPrm{
MetaStorage: metaStub, MetaStorage: metaStub,
WorkerLimiter: &rebuildLimiterStub{}, WorkerLimiter: &rebuildLimiterStub{},
Action: common.RebuildAction{ FillPercent: 60,
SchemaChange: false,
FillPercent: true,
FillPercentValue: 60,
},
}) })
require.NoError(t, err) require.NoError(t, err)
dataMigrated := rRes.ObjectsMoved > 0 || rRes.FilesRemoved > 0 || metaStub.updatedCount > 0 dataMigrated := rRes.ObjectsMoved > 0 || rRes.FilesRemoved > 0 || metaStub.updatedCount > 0
@ -135,11 +131,7 @@ func TestBlobovniczaTreeFillPercentRebuild(t *testing.T) {
rRes, err := b.Rebuild(context.Background(), common.RebuildPrm{ rRes, err := b.Rebuild(context.Background(), common.RebuildPrm{
MetaStorage: metaStub, MetaStorage: metaStub,
WorkerLimiter: &rebuildLimiterStub{}, WorkerLimiter: &rebuildLimiterStub{},
Action: common.RebuildAction{ FillPercent: 90, // 64KB / 100KB = 64%
SchemaChange: false,
FillPercent: true,
FillPercentValue: 90, // 64KB / 100KB = 64%
},
}) })
require.NoError(t, err) require.NoError(t, err)
dataMigrated := rRes.ObjectsMoved > 0 || rRes.FilesRemoved > 0 || metaStub.updatedCount > 0 dataMigrated := rRes.ObjectsMoved > 0 || rRes.FilesRemoved > 0 || metaStub.updatedCount > 0
@ -204,11 +196,7 @@ func TestBlobovniczaTreeFillPercentRebuild(t *testing.T) {
rRes, err := b.Rebuild(context.Background(), common.RebuildPrm{ rRes, err := b.Rebuild(context.Background(), common.RebuildPrm{
MetaStorage: metaStub, MetaStorage: metaStub,
WorkerLimiter: &rebuildLimiterStub{}, WorkerLimiter: &rebuildLimiterStub{},
Action: common.RebuildAction{ FillPercent: 80,
SchemaChange: false,
FillPercent: true,
FillPercentValue: 80,
},
}) })
require.NoError(t, err) require.NoError(t, err)
require.Equal(t, uint64(49), rRes.FilesRemoved) require.Equal(t, uint64(49), rRes.FilesRemoved)
@ -281,11 +269,7 @@ func TestBlobovniczaTreeFillPercentRebuild(t *testing.T) {
rRes, err := b.Rebuild(context.Background(), common.RebuildPrm{ rRes, err := b.Rebuild(context.Background(), common.RebuildPrm{
MetaStorage: metaStub, MetaStorage: metaStub,
WorkerLimiter: &rebuildLimiterStub{}, WorkerLimiter: &rebuildLimiterStub{},
Action: common.RebuildAction{ FillPercent: 80,
SchemaChange: false,
FillPercent: true,
FillPercentValue: 80,
},
}) })
require.NoError(t, err) require.NoError(t, err)
require.Equal(t, uint64(49), rRes.FilesRemoved) require.Equal(t, uint64(49), rRes.FilesRemoved)
@ -357,7 +341,7 @@ func TestBlobovniczaTreeRebuildLargeObject(t *testing.T) {
var rPrm common.RebuildPrm var rPrm common.RebuildPrm
rPrm.MetaStorage = metaStub rPrm.MetaStorage = metaStub
rPrm.WorkerLimiter = &rebuildLimiterStub{} rPrm.WorkerLimiter = &rebuildLimiterStub{}
rPrm.Action = common.RebuildAction{SchemaChange: true} rPrm.FillPercent = 1
rRes, err := b.Rebuild(context.Background(), rPrm) rRes, err := b.Rebuild(context.Background(), rPrm)
require.NoError(t, err) require.NoError(t, err)
dataMigrated := rRes.ObjectsMoved > 0 || rRes.FilesRemoved > 0 || metaStub.updatedCount > 0 dataMigrated := rRes.ObjectsMoved > 0 || rRes.FilesRemoved > 0 || metaStub.updatedCount > 0
@ -446,7 +430,7 @@ func testBlobovniczaTreeRebuildHelper(t *testing.T, sourceDepth, sourceWidth, ta
var rPrm common.RebuildPrm var rPrm common.RebuildPrm
rPrm.MetaStorage = metaStub rPrm.MetaStorage = metaStub
rPrm.WorkerLimiter = &rebuildLimiterStub{} rPrm.WorkerLimiter = &rebuildLimiterStub{}
rPrm.Action = common.RebuildAction{SchemaChange: true} rPrm.FillPercent = 1
rRes, err := b.Rebuild(context.Background(), rPrm) rRes, err := b.Rebuild(context.Background(), rPrm)
require.NoError(t, err) require.NoError(t, err)
dataMigrated := rRes.ObjectsMoved > 0 || rRes.FilesRemoved > 0 || metaStub.updatedCount > 0 dataMigrated := rRes.ObjectsMoved > 0 || rRes.FilesRemoved > 0 || metaStub.updatedCount > 0

View file

@ -11,17 +11,10 @@ type RebuildRes struct {
FilesRemoved uint64 FilesRemoved uint64
} }
type RebuildAction struct {
SchemaChange bool
FillPercent bool
FillPercentValue int
}
type RebuildPrm struct { type RebuildPrm struct {
MetaStorage MetaStorage MetaStorage MetaStorage
WorkerLimiter ConcurrentWorkersLimiter WorkerLimiter ConcurrentWorkersLimiter
Action RebuildAction FillPercent int
} }
type MetaStorage interface { type MetaStorage interface {

View file

@ -18,14 +18,14 @@ type ConcurrentWorkersLimiter interface {
ReleaseWorkSlot() ReleaseWorkSlot()
} }
func (b *BlobStor) Rebuild(ctx context.Context, upd StorageIDUpdate, limiter ConcurrentWorkersLimiter, action common.RebuildAction) error { func (b *BlobStor) Rebuild(ctx context.Context, upd StorageIDUpdate, limiter ConcurrentWorkersLimiter, fillPercent int) error {
var summary common.RebuildRes var summary common.RebuildRes
var rErr error var rErr error
for _, storage := range b.storage { for _, storage := range b.storage {
res, err := storage.Storage.Rebuild(ctx, common.RebuildPrm{ res, err := storage.Storage.Rebuild(ctx, common.RebuildPrm{
MetaStorage: upd, MetaStorage: upd,
WorkerLimiter: limiter, WorkerLimiter: limiter,
Action: action, FillPercent: fillPercent,
}) })
summary.FilesRemoved += res.FilesRemoved summary.FilesRemoved += res.FilesRemoved
summary.ObjectsMoved += res.ObjectsMoved summary.ObjectsMoved += res.ObjectsMoved

View file

@ -129,8 +129,8 @@ func (s *Shard) Init(ctx context.Context) error {
s.gc.init(ctx) s.gc.init(ctx)
s.rb = newRebuilder(NewRebuildLimiter(s.rebuildWorkersCount)) s.rb = newRebuilder()
if !m.NoMetabase() && !s.rebuildDisabled { if !m.NoMetabase() {
s.rb.Start(ctx, s.blobStor, s.metaBase, s.log) s.rb.Start(ctx, s.blobStor, s.metaBase, s.log)
} }
s.writecacheSealCancel.Store(dummyCancel) s.writecacheSealCancel.Store(dummyCancel)
@ -398,7 +398,7 @@ func (s *Shard) Reload(ctx context.Context, opts ...Option) error {
defer unlock() defer unlock()
s.rb.Stop(s.log) s.rb.Stop(s.log)
if !s.info.Mode.NoMetabase() && !s.rebuildDisabled { if !s.info.Mode.NoMetabase() {
defer func() { defer func() {
s.rb.Start(ctx, s.blobStor, s.metaBase, s.log) s.rb.Start(ctx, s.blobStor, s.metaBase, s.log)
}() }()

View file

@ -187,7 +187,7 @@ func TestGCDropsObjectInhumedFromWritecache(t *testing.T) {
func testGCDropsObjectInhumedFromWritecache(t *testing.T, flushbeforeInhume bool) { func testGCDropsObjectInhumedFromWritecache(t *testing.T, flushbeforeInhume bool) {
sh := newCustomShard(t, true, shardOptions{ sh := newCustomShard(t, true, shardOptions{
additionalShardOptions: []Option{WithDisabledGC(), WithDisabledRebuild()}, additionalShardOptions: []Option{WithDisabledGC()},
wcOpts: []writecache.Option{writecache.WithDisableBackgroundFlush()}, wcOpts: []writecache.Option{writecache.WithDisableBackgroundFlush()},
}) })
defer func() { require.NoError(t, sh.Close()) }() defer func() { require.NoError(t, sh.Close()) }()

View file

@ -7,7 +7,6 @@ import (
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs" "git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common"
meta "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/metabase" meta "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/metabase"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger"
"git.frostfs.info/TrueCloudLab/frostfs-observability/tracing" "git.frostfs.info/TrueCloudLab/frostfs-observability/tracing"
@ -17,6 +16,8 @@ import (
"go.uber.org/zap" "go.uber.org/zap"
) )
var ErrRebuildInProgress = errors.New("shard rebuild in progress")
type RebuildWorkerLimiter interface { type RebuildWorkerLimiter interface {
AcquireWorkSlot(ctx context.Context) error AcquireWorkSlot(ctx context.Context) error
ReleaseWorkSlot() ReleaseWorkSlot()
@ -47,24 +48,22 @@ func (l *rebuildLimiter) ReleaseWorkSlot() {
type rebuildTask struct { type rebuildTask struct {
limiter RebuildWorkerLimiter limiter RebuildWorkerLimiter
action common.RebuildAction fillPercent int
} }
type rebuilder struct { type rebuilder struct {
mtx *sync.Mutex mtx *sync.Mutex
wg *sync.WaitGroup wg *sync.WaitGroup
cancel func() cancel func()
limiter RebuildWorkerLimiter
done chan struct{} done chan struct{}
tasks chan rebuildTask tasks chan rebuildTask
} }
func newRebuilder(l RebuildWorkerLimiter) *rebuilder { func newRebuilder() *rebuilder {
return &rebuilder{ return &rebuilder{
mtx: &sync.Mutex{}, mtx: &sync.Mutex{},
wg: &sync.WaitGroup{}, wg: &sync.WaitGroup{},
limiter: l, tasks: make(chan rebuildTask),
tasks: make(chan rebuildTask, 10),
} }
} }
@ -89,25 +88,14 @@ func (r *rebuilder) Start(ctx context.Context, bs *blobstor.BlobStor, mb *meta.D
if !ok { if !ok {
continue continue
} }
runRebuild(ctx, bs, mb, log, t.action, t.limiter) runRebuild(ctx, bs, mb, log, t.fillPercent, t.limiter)
} }
} }
}() }()
select {
case <-ctx.Done():
return
case r.tasks <- rebuildTask{
limiter: r.limiter,
action: common.RebuildAction{
SchemaChange: true,
},
}:
return
}
} }
func runRebuild(ctx context.Context, bs *blobstor.BlobStor, mb *meta.DB, log *logger.Logger, func runRebuild(ctx context.Context, bs *blobstor.BlobStor, mb *meta.DB, log *logger.Logger,
action common.RebuildAction, limiter RebuildWorkerLimiter, fillPercent int, limiter RebuildWorkerLimiter,
) { ) {
select { select {
case <-ctx.Done(): case <-ctx.Done():
@ -115,23 +103,25 @@ func runRebuild(ctx context.Context, bs *blobstor.BlobStor, mb *meta.DB, log *lo
default: default:
} }
log.Info(logs.BlobstoreRebuildStarted) log.Info(logs.BlobstoreRebuildStarted)
if err := bs.Rebuild(ctx, &mbStorageIDUpdate{mb: mb}, limiter, action); err != nil { if err := bs.Rebuild(ctx, &mbStorageIDUpdate{mb: mb}, limiter, fillPercent); err != nil {
log.Warn(logs.FailedToRebuildBlobstore, zap.Error(err)) log.Warn(logs.FailedToRebuildBlobstore, zap.Error(err))
} else { } else {
log.Info(logs.BlobstoreRebuildCompletedSuccessfully) log.Info(logs.BlobstoreRebuildCompletedSuccessfully)
} }
} }
func (r *rebuilder) ScheduleRebuild(ctx context.Context, limiter RebuildWorkerLimiter, action common.RebuildAction, func (r *rebuilder) ScheduleRebuild(ctx context.Context, limiter RebuildWorkerLimiter, fillPercent int,
) error { ) error {
select { select {
case <-ctx.Done(): case <-ctx.Done():
return ctx.Err() return ctx.Err()
case r.tasks <- rebuildTask{ case r.tasks <- rebuildTask{
limiter: limiter, limiter: limiter,
action: action, fillPercent: fillPercent,
}: }:
return nil return nil
default:
return ErrRebuildInProgress
} }
} }
@ -198,9 +188,5 @@ func (s *Shard) ScheduleRebuild(ctx context.Context, p RebuildPrm) error {
return ErrDegradedMode return ErrDegradedMode
} }
return s.rb.ScheduleRebuild(ctx, p.ConcurrencyLimiter, common.RebuildAction{ return s.rb.ScheduleRebuild(ctx, p.ConcurrencyLimiter, int(p.TargetFillPercent))
SchemaChange: true,
FillPercent: true,
FillPercentValue: int(p.TargetFillPercent),
})
} }

View file

@ -139,10 +139,6 @@ type cfg struct {
metricsWriter MetricsWriter metricsWriter MetricsWriter
reportErrorFunc func(selfID string, message string, err error) reportErrorFunc func(selfID string, message string, err error)
rebuildWorkersCount uint32
rebuildDisabled bool
} }
func defaultCfg() *cfg { func defaultCfg() *cfg {
@ -151,7 +147,6 @@ func defaultCfg() *cfg {
log: &logger.Logger{Logger: zap.L()}, log: &logger.Logger{Logger: zap.L()},
gcCfg: defaultGCCfg(), gcCfg: defaultGCCfg(),
reportErrorFunc: func(string, string, error) {}, reportErrorFunc: func(string, string, error) {},
rebuildWorkersCount: 1,
zeroSizeContainersCallback: func(context.Context, []cid.ID) {}, zeroSizeContainersCallback: func(context.Context, []cid.ID) {},
zeroCountContainersCallback: func(context.Context, []cid.ID) {}, zeroCountContainersCallback: func(context.Context, []cid.ID) {},
} }
@ -384,14 +379,6 @@ func WithExpiredCollectorWorkerCount(count int) Option {
} }
} }
// WithRebuildWorkersCount return option to set concurrent
// workers count of storage rebuild operation.
func WithRebuildWorkersCount(count uint32) Option {
return func(c *cfg) {
c.rebuildWorkersCount = count
}
}
// WithDisabledGC disables GC. // WithDisabledGC disables GC.
// For testing purposes only. // For testing purposes only.
func WithDisabledGC() Option { func WithDisabledGC() Option {
@ -414,14 +401,6 @@ func WithZeroCountCallback(cb EmptyContainersCallback) Option {
} }
} }
// WithDisabledRebuild returns an option to disable a shard rebuild.
// For testing purposes only.
func WithDisabledRebuild() Option {
return func(c *cfg) {
c.rebuildDisabled = true
}
}
func (s *Shard) fillInfo() { func (s *Shard) fillInfo() {
s.cfg.info.MetaBaseInfo = s.metaBase.DumpInfo() s.cfg.info.MetaBaseInfo = s.metaBase.DumpInfo()
s.cfg.info.BlobStorInfo = s.blobStor.DumpInfo() s.cfg.info.BlobStorInfo = s.blobStor.DumpInfo()