Blobovnicza tree optimizations #1337
16 changed files with 49 additions and 138 deletions
|
@ -126,8 +126,6 @@ type shardCfg struct {
|
||||||
refillMetabaseWorkersCount int
|
refillMetabaseWorkersCount int
|
||||||
mode shardmode.Mode
|
mode shardmode.Mode
|
||||||
|
|
||||||
rebuildWorkersCount uint32
|
|
||||||
|
|
||||||
metaCfg struct {
|
metaCfg struct {
|
||||||
path string
|
path string
|
||||||
perm fs.FileMode
|
perm fs.FileMode
|
||||||
|
@ -240,7 +238,6 @@ func (a *applicationConfiguration) updateShardConfig(c *config.Config, oldConfig
|
||||||
|
|
||||||
newConfig.refillMetabase = oldConfig.RefillMetabase()
|
newConfig.refillMetabase = oldConfig.RefillMetabase()
|
||||||
newConfig.refillMetabaseWorkersCount = oldConfig.RefillMetabaseWorkersCount()
|
newConfig.refillMetabaseWorkersCount = oldConfig.RefillMetabaseWorkersCount()
|
||||||
newConfig.rebuildWorkersCount = oldConfig.RebuildWorkerCount()
|
|
||||||
newConfig.mode = oldConfig.Mode()
|
newConfig.mode = oldConfig.Mode()
|
||||||
newConfig.compress = oldConfig.Compress()
|
newConfig.compress = oldConfig.Compress()
|
||||||
newConfig.estimateCompressibility = oldConfig.EstimateCompressibility()
|
newConfig.estimateCompressibility = oldConfig.EstimateCompressibility()
|
||||||
|
@ -998,7 +995,6 @@ func (c *cfg) getShardOpts(ctx context.Context, shCfg shardCfg) shardOptsWithID
|
||||||
shard.WithLogger(c.log),
|
shard.WithLogger(c.log),
|
||||||
shard.WithRefillMetabase(shCfg.refillMetabase),
|
shard.WithRefillMetabase(shCfg.refillMetabase),
|
||||||
shard.WithRefillMetabaseWorkersCount(shCfg.refillMetabaseWorkersCount),
|
shard.WithRefillMetabaseWorkersCount(shCfg.refillMetabaseWorkersCount),
|
||||||
shard.WithRebuildWorkersCount(shCfg.rebuildWorkersCount),
|
|
||||||
shard.WithMode(shCfg.mode),
|
shard.WithMode(shCfg.mode),
|
||||||
shard.WithBlobStorOptions(blobstoreOpts...),
|
shard.WithBlobStorOptions(blobstoreOpts...),
|
||||||
shard.WithMetaBaseOptions(mbOptions...),
|
shard.WithMetaBaseOptions(mbOptions...),
|
||||||
|
|
|
@ -119,7 +119,6 @@ func TestEngineSection(t *testing.T) {
|
||||||
require.Equal(t, false, sc.RefillMetabase())
|
require.Equal(t, false, sc.RefillMetabase())
|
||||||
require.Equal(t, mode.ReadOnly, sc.Mode())
|
require.Equal(t, mode.ReadOnly, sc.Mode())
|
||||||
require.Equal(t, 100, sc.RefillMetabaseWorkersCount())
|
require.Equal(t, 100, sc.RefillMetabaseWorkersCount())
|
||||||
require.Equal(t, uint32(1000), sc.RebuildWorkerCount())
|
|
||||||
case 1:
|
case 1:
|
||||||
require.Equal(t, "tmp/1/blob/pilorama.db", pl.Path())
|
require.Equal(t, "tmp/1/blob/pilorama.db", pl.Path())
|
||||||
require.Equal(t, fs.FileMode(0o644), pl.Perm())
|
require.Equal(t, fs.FileMode(0o644), pl.Perm())
|
||||||
|
@ -175,7 +174,6 @@ func TestEngineSection(t *testing.T) {
|
||||||
require.Equal(t, true, sc.RefillMetabase())
|
require.Equal(t, true, sc.RefillMetabase())
|
||||||
require.Equal(t, mode.ReadWrite, sc.Mode())
|
require.Equal(t, mode.ReadWrite, sc.Mode())
|
||||||
require.Equal(t, shardconfig.RefillMetabaseWorkersCountDefault, sc.RefillMetabaseWorkersCount())
|
require.Equal(t, shardconfig.RefillMetabaseWorkersCountDefault, sc.RefillMetabaseWorkersCount())
|
||||||
require.Equal(t, uint32(shardconfig.RebuildWorkersCountDefault), sc.RebuildWorkerCount())
|
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
})
|
})
|
||||||
|
|
|
@ -19,7 +19,6 @@ const (
|
||||||
SmallSizeLimitDefault = 1 << 20
|
SmallSizeLimitDefault = 1 << 20
|
||||||
EstimateCompressibilityThresholdDefault = 0.1
|
EstimateCompressibilityThresholdDefault = 0.1
|
||||||
RefillMetabaseWorkersCountDefault = 500
|
RefillMetabaseWorkersCountDefault = 500
|
||||||
RebuildWorkersCountDefault = 5
|
|
||||||
)
|
)
|
||||||
|
|
||||||
// From wraps config section into Config.
|
// From wraps config section into Config.
|
||||||
|
@ -150,20 +149,6 @@ func (x *Config) RefillMetabaseWorkersCount() int {
|
||||||
return RefillMetabaseWorkersCountDefault
|
return RefillMetabaseWorkersCountDefault
|
||||||
}
|
}
|
||||||
|
|
||||||
// RebuildWorkersCount returns the value of "resync_metabase_worker_count" config parameter.
|
|
||||||
//
|
|
||||||
// Returns RebuildWorkersCountDefault if the value is not a positive number.
|
|
||||||
func (x *Config) RebuildWorkerCount() uint32 {
|
|
||||||
v := config.Uint32Safe(
|
|
||||||
(*config.Config)(x),
|
|
||||||
"rebuild_worker_count",
|
|
||||||
)
|
|
||||||
if v > 0 {
|
|
||||||
return v
|
|
||||||
}
|
|
||||||
return RebuildWorkersCountDefault
|
|
||||||
}
|
|
||||||
|
|
||||||
// Mode return the value of "mode" config parameter.
|
// Mode return the value of "mode" config parameter.
|
||||||
//
|
//
|
||||||
// Panics if read the value is not one of predefined
|
// Panics if read the value is not one of predefined
|
||||||
|
|
|
@ -94,7 +94,6 @@ FROSTFS_STORAGE_SHARD_RO_ERROR_THRESHOLD=100
|
||||||
### Flag to refill Metabase from BlobStor
|
### Flag to refill Metabase from BlobStor
|
||||||
FROSTFS_STORAGE_SHARD_0_RESYNC_METABASE=false
|
FROSTFS_STORAGE_SHARD_0_RESYNC_METABASE=false
|
||||||
FROSTFS_STORAGE_SHARD_0_RESYNC_METABASE_WORKER_COUNT=100
|
FROSTFS_STORAGE_SHARD_0_RESYNC_METABASE_WORKER_COUNT=100
|
||||||
FROSTFS_STORAGE_SHARD_0_REBUILD_WORKER_COUNT=1000
|
|
||||||
### Flag to set shard mode
|
### Flag to set shard mode
|
||||||
FROSTFS_STORAGE_SHARD_0_MODE=read-only
|
FROSTFS_STORAGE_SHARD_0_MODE=read-only
|
||||||
### Write cache config
|
### Write cache config
|
||||||
|
|
|
@ -140,7 +140,6 @@
|
||||||
"mode": "read-only",
|
"mode": "read-only",
|
||||||
"resync_metabase": false,
|
"resync_metabase": false,
|
||||||
"resync_metabase_worker_count": 100,
|
"resync_metabase_worker_count": 100,
|
||||||
"rebuild_worker_count": 1000,
|
|
||||||
"writecache": {
|
"writecache": {
|
||||||
"enabled": false,
|
"enabled": false,
|
||||||
"no_sync": true,
|
"no_sync": true,
|
||||||
|
|
|
@ -164,7 +164,6 @@ storage:
|
||||||
# disabled (do not work with the shard, allows to not remove it from the config)
|
# disabled (do not work with the shard, allows to not remove it from the config)
|
||||||
resync_metabase: false # sync metabase with blobstor on start, expensive, leave false until complete understanding
|
resync_metabase: false # sync metabase with blobstor on start, expensive, leave false until complete understanding
|
||||||
resync_metabase_worker_count: 100
|
resync_metabase_worker_count: 100
|
||||||
rebuild_worker_count: 1000 # count of rebuild storage concurrent workers
|
|
||||||
|
|
||||||
writecache:
|
writecache:
|
||||||
enabled: false
|
enabled: false
|
||||||
|
|
|
@ -189,7 +189,6 @@ The following table describes configuration for each shard.
|
||||||
| `mode` | `string` | `read-write` | Shard Mode.<br/>Possible values: `read-write`, `read-only`, `degraded`, `degraded-read-only`, `disabled` |
|
| `mode` | `string` | `read-write` | Shard Mode.<br/>Possible values: `read-write`, `read-only`, `degraded`, `degraded-read-only`, `disabled` |
|
||||||
| `resync_metabase` | `bool` | `false` | Flag to enable metabase resync on start. |
|
| `resync_metabase` | `bool` | `false` | Flag to enable metabase resync on start. |
|
||||||
| `resync_metabase_worker_count` | `int` | `1000` | Count of concurrent workers to resync metabase. |
|
| `resync_metabase_worker_count` | `int` | `1000` | Count of concurrent workers to resync metabase. |
|
||||||
| `rebuild_worker_count` | `int` | `5` | Count of concurrent workers to rebuild blobstore. |
|
|
||||||
| `writecache` | [Writecache config](#writecache-subsection) | | Write-cache configuration. |
|
| `writecache` | [Writecache config](#writecache-subsection) | | Write-cache configuration. |
|
||||||
| `metabase` | [Metabase config](#metabase-subsection) | | Metabase configuration. |
|
| `metabase` | [Metabase config](#metabase-subsection) | | Metabase configuration. |
|
||||||
| `blobstor` | [Blobstor config](#blobstor-subsection) | | Blobstor configuration. |
|
| `blobstor` | [Blobstor config](#blobstor-subsection) | | Blobstor configuration. |
|
||||||
|
|
|
@ -60,7 +60,7 @@ func (b *Blobovniczas) Rebuild(ctx context.Context, prm common.RebuildPrm) (comm
|
||||||
b.log.Debug(logs.BlobovniczaTreeCompletedPreviousRebuildSuccess)
|
b.log.Debug(logs.BlobovniczaTreeCompletedPreviousRebuildSuccess)
|
||||||
|
|
||||||
b.log.Debug(logs.BlobovniczaTreeCollectingDBToRebuild)
|
b.log.Debug(logs.BlobovniczaTreeCollectingDBToRebuild)
|
||||||
dbsToMigrate, err := b.getDBsToRebuild(ctx, prm.Action)
|
dbsToMigrate, err := b.getDBsToRebuild(ctx, prm.FillPercent)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
b.log.Warn(logs.BlobovniczaTreeCollectingDBToRebuildFailed, zap.Error(err))
|
b.log.Warn(logs.BlobovniczaTreeCollectingDBToRebuildFailed, zap.Error(err))
|
||||||
success = false
|
success = false
|
||||||
|
@ -94,27 +94,20 @@ func (b *Blobovniczas) migrateDBs(ctx context.Context, dbs []string, prm common.
|
||||||
return res, nil
|
return res, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (b *Blobovniczas) getDBsToRebuild(ctx context.Context, action common.RebuildAction) ([]string, error) {
|
func (b *Blobovniczas) getDBsToRebuild(ctx context.Context, fillPercent int) ([]string, error) {
|
||||||
schemaChange := make(map[string]struct{})
|
withSchemaChange, err := b.selectDBsDoNotMatchSchema(ctx)
|
||||||
fillPercent := make(map[string]struct{})
|
if err != nil {
|
||||||
var err error
|
return nil, err
|
||||||
if action.SchemaChange {
|
|
||||||
schemaChange, err = b.selectDBsDoNotMatchSchema(ctx)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
if action.FillPercent {
|
withFillPercent, err := b.selectDBsDoNotMatchFillPercent(ctx, fillPercent)
|
||||||
fillPercent, err = b.selectDBsDoNotMatchFillPercent(ctx, action.FillPercentValue)
|
if err != nil {
|
||||||
if err != nil {
|
return nil, err
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
for k := range fillPercent {
|
for k := range withFillPercent {
|
||||||
schemaChange[k] = struct{}{}
|
withSchemaChange[k] = struct{}{}
|
||||||
}
|
}
|
||||||
result := make([]string, 0, len(schemaChange))
|
result := make([]string, 0, len(withSchemaChange))
|
||||||
for db := range schemaChange {
|
for db := range withSchemaChange {
|
||||||
result = append(result, db)
|
result = append(result, db)
|
||||||
}
|
}
|
||||||
return result, nil
|
return result, nil
|
||||||
|
|
|
@ -145,7 +145,7 @@ func testRebuildFailoverValidate(t *testing.T, dir string, obj *objectSDK.Object
|
||||||
WithBlobovniczaShallowWidth(2),
|
WithBlobovniczaShallowWidth(2),
|
||||||
WithBlobovniczaShallowDepth(2),
|
WithBlobovniczaShallowDepth(2),
|
||||||
WithRootPath(dir),
|
WithRootPath(dir),
|
||||||
WithBlobovniczaSize(100*1024*1024),
|
WithBlobovniczaSize(10*1024),
|
||||||
WithWaitBeforeDropDB(0),
|
WithWaitBeforeDropDB(0),
|
||||||
WithOpenedCacheSize(1000))
|
WithOpenedCacheSize(1000))
|
||||||
require.NoError(t, b.Open(mode.ComponentReadWrite))
|
require.NoError(t, b.Open(mode.ComponentReadWrite))
|
||||||
|
@ -164,6 +164,7 @@ func testRebuildFailoverValidate(t *testing.T, dir string, obj *objectSDK.Object
|
||||||
rRes, err := b.Rebuild(context.Background(), common.RebuildPrm{
|
rRes, err := b.Rebuild(context.Background(), common.RebuildPrm{
|
||||||
MetaStorage: metaStub,
|
MetaStorage: metaStub,
|
||||||
WorkerLimiter: &rebuildLimiterStub{},
|
WorkerLimiter: &rebuildLimiterStub{},
|
||||||
|
FillPercent: 1,
|
||||||
fyrchik marked this conversation as resolved
Outdated
|
|||||||
})
|
})
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
require.Equal(t, uint64(1), rRes.ObjectsMoved)
|
require.Equal(t, uint64(1), rRes.ObjectsMoved)
|
||||||
|
|
|
@ -79,11 +79,7 @@ func TestBlobovniczaTreeFillPercentRebuild(t *testing.T) {
|
||||||
rRes, err := b.Rebuild(context.Background(), common.RebuildPrm{
|
rRes, err := b.Rebuild(context.Background(), common.RebuildPrm{
|
||||||
MetaStorage: metaStub,
|
MetaStorage: metaStub,
|
||||||
WorkerLimiter: &rebuildLimiterStub{},
|
WorkerLimiter: &rebuildLimiterStub{},
|
||||||
Action: common.RebuildAction{
|
FillPercent: 60,
|
||||||
SchemaChange: false,
|
|
||||||
FillPercent: true,
|
|
||||||
FillPercentValue: 60,
|
|
||||||
},
|
|
||||||
})
|
})
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
dataMigrated := rRes.ObjectsMoved > 0 || rRes.FilesRemoved > 0 || metaStub.updatedCount > 0
|
dataMigrated := rRes.ObjectsMoved > 0 || rRes.FilesRemoved > 0 || metaStub.updatedCount > 0
|
||||||
|
@ -135,11 +131,7 @@ func TestBlobovniczaTreeFillPercentRebuild(t *testing.T) {
|
||||||
rRes, err := b.Rebuild(context.Background(), common.RebuildPrm{
|
rRes, err := b.Rebuild(context.Background(), common.RebuildPrm{
|
||||||
MetaStorage: metaStub,
|
MetaStorage: metaStub,
|
||||||
WorkerLimiter: &rebuildLimiterStub{},
|
WorkerLimiter: &rebuildLimiterStub{},
|
||||||
Action: common.RebuildAction{
|
FillPercent: 90, // 64KB / 100KB = 64%
|
||||||
SchemaChange: false,
|
|
||||||
FillPercent: true,
|
|
||||||
FillPercentValue: 90, // 64KB / 100KB = 64%
|
|
||||||
},
|
|
||||||
})
|
})
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
dataMigrated := rRes.ObjectsMoved > 0 || rRes.FilesRemoved > 0 || metaStub.updatedCount > 0
|
dataMigrated := rRes.ObjectsMoved > 0 || rRes.FilesRemoved > 0 || metaStub.updatedCount > 0
|
||||||
|
@ -204,11 +196,7 @@ func TestBlobovniczaTreeFillPercentRebuild(t *testing.T) {
|
||||||
rRes, err := b.Rebuild(context.Background(), common.RebuildPrm{
|
rRes, err := b.Rebuild(context.Background(), common.RebuildPrm{
|
||||||
MetaStorage: metaStub,
|
MetaStorage: metaStub,
|
||||||
WorkerLimiter: &rebuildLimiterStub{},
|
WorkerLimiter: &rebuildLimiterStub{},
|
||||||
Action: common.RebuildAction{
|
FillPercent: 80,
|
||||||
SchemaChange: false,
|
|
||||||
FillPercent: true,
|
|
||||||
FillPercentValue: 80,
|
|
||||||
},
|
|
||||||
})
|
})
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
require.Equal(t, uint64(49), rRes.FilesRemoved)
|
require.Equal(t, uint64(49), rRes.FilesRemoved)
|
||||||
|
@ -281,11 +269,7 @@ func TestBlobovniczaTreeFillPercentRebuild(t *testing.T) {
|
||||||
rRes, err := b.Rebuild(context.Background(), common.RebuildPrm{
|
rRes, err := b.Rebuild(context.Background(), common.RebuildPrm{
|
||||||
MetaStorage: metaStub,
|
MetaStorage: metaStub,
|
||||||
WorkerLimiter: &rebuildLimiterStub{},
|
WorkerLimiter: &rebuildLimiterStub{},
|
||||||
Action: common.RebuildAction{
|
FillPercent: 80,
|
||||||
SchemaChange: false,
|
|
||||||
FillPercent: true,
|
|
||||||
FillPercentValue: 80,
|
|
||||||
},
|
|
||||||
})
|
})
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
require.Equal(t, uint64(49), rRes.FilesRemoved)
|
require.Equal(t, uint64(49), rRes.FilesRemoved)
|
||||||
|
@ -357,7 +341,7 @@ func TestBlobovniczaTreeRebuildLargeObject(t *testing.T) {
|
||||||
var rPrm common.RebuildPrm
|
var rPrm common.RebuildPrm
|
||||||
rPrm.MetaStorage = metaStub
|
rPrm.MetaStorage = metaStub
|
||||||
rPrm.WorkerLimiter = &rebuildLimiterStub{}
|
rPrm.WorkerLimiter = &rebuildLimiterStub{}
|
||||||
rPrm.Action = common.RebuildAction{SchemaChange: true}
|
rPrm.FillPercent = 1
|
||||||
rRes, err := b.Rebuild(context.Background(), rPrm)
|
rRes, err := b.Rebuild(context.Background(), rPrm)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
dataMigrated := rRes.ObjectsMoved > 0 || rRes.FilesRemoved > 0 || metaStub.updatedCount > 0
|
dataMigrated := rRes.ObjectsMoved > 0 || rRes.FilesRemoved > 0 || metaStub.updatedCount > 0
|
||||||
|
@ -446,7 +430,7 @@ func testBlobovniczaTreeRebuildHelper(t *testing.T, sourceDepth, sourceWidth, ta
|
||||||
var rPrm common.RebuildPrm
|
var rPrm common.RebuildPrm
|
||||||
rPrm.MetaStorage = metaStub
|
rPrm.MetaStorage = metaStub
|
||||||
rPrm.WorkerLimiter = &rebuildLimiterStub{}
|
rPrm.WorkerLimiter = &rebuildLimiterStub{}
|
||||||
rPrm.Action = common.RebuildAction{SchemaChange: true}
|
rPrm.FillPercent = 1
|
||||||
rRes, err := b.Rebuild(context.Background(), rPrm)
|
rRes, err := b.Rebuild(context.Background(), rPrm)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
dataMigrated := rRes.ObjectsMoved > 0 || rRes.FilesRemoved > 0 || metaStub.updatedCount > 0
|
dataMigrated := rRes.ObjectsMoved > 0 || rRes.FilesRemoved > 0 || metaStub.updatedCount > 0
|
||||||
|
|
|
@ -11,17 +11,10 @@ type RebuildRes struct {
|
||||||
FilesRemoved uint64
|
FilesRemoved uint64
|
||||||
}
|
}
|
||||||
|
|
||||||
type RebuildAction struct {
|
|
||||||
SchemaChange bool
|
|
||||||
|
|
||||||
FillPercent bool
|
|
||||||
FillPercentValue int
|
|
||||||
}
|
|
||||||
|
|
||||||
type RebuildPrm struct {
|
type RebuildPrm struct {
|
||||||
MetaStorage MetaStorage
|
MetaStorage MetaStorage
|
||||||
WorkerLimiter ConcurrentWorkersLimiter
|
WorkerLimiter ConcurrentWorkersLimiter
|
||||||
Action RebuildAction
|
FillPercent int
|
||||||
}
|
}
|
||||||
|
|
||||||
type MetaStorage interface {
|
type MetaStorage interface {
|
||||||
|
|
|
@ -18,14 +18,14 @@ type ConcurrentWorkersLimiter interface {
|
||||||
ReleaseWorkSlot()
|
ReleaseWorkSlot()
|
||||||
}
|
}
|
||||||
|
|
||||||
func (b *BlobStor) Rebuild(ctx context.Context, upd StorageIDUpdate, limiter ConcurrentWorkersLimiter, action common.RebuildAction) error {
|
func (b *BlobStor) Rebuild(ctx context.Context, upd StorageIDUpdate, limiter ConcurrentWorkersLimiter, fillPercent int) error {
|
||||||
var summary common.RebuildRes
|
var summary common.RebuildRes
|
||||||
var rErr error
|
var rErr error
|
||||||
for _, storage := range b.storage {
|
for _, storage := range b.storage {
|
||||||
res, err := storage.Storage.Rebuild(ctx, common.RebuildPrm{
|
res, err := storage.Storage.Rebuild(ctx, common.RebuildPrm{
|
||||||
MetaStorage: upd,
|
MetaStorage: upd,
|
||||||
WorkerLimiter: limiter,
|
WorkerLimiter: limiter,
|
||||||
Action: action,
|
FillPercent: fillPercent,
|
||||||
})
|
})
|
||||||
summary.FilesRemoved += res.FilesRemoved
|
summary.FilesRemoved += res.FilesRemoved
|
||||||
summary.ObjectsMoved += res.ObjectsMoved
|
summary.ObjectsMoved += res.ObjectsMoved
|
||||||
|
|
|
@ -129,8 +129,8 @@ func (s *Shard) Init(ctx context.Context) error {
|
||||||
|
|
||||||
s.gc.init(ctx)
|
s.gc.init(ctx)
|
||||||
|
|
||||||
s.rb = newRebuilder(NewRebuildLimiter(s.rebuildWorkersCount))
|
s.rb = newRebuilder()
|
||||||
if !m.NoMetabase() && !s.rebuildDisabled {
|
if !m.NoMetabase() {
|
||||||
s.rb.Start(ctx, s.blobStor, s.metaBase, s.log)
|
s.rb.Start(ctx, s.blobStor, s.metaBase, s.log)
|
||||||
}
|
}
|
||||||
s.writecacheSealCancel.Store(dummyCancel)
|
s.writecacheSealCancel.Store(dummyCancel)
|
||||||
|
@ -398,7 +398,7 @@ func (s *Shard) Reload(ctx context.Context, opts ...Option) error {
|
||||||
defer unlock()
|
defer unlock()
|
||||||
|
|
||||||
s.rb.Stop(s.log)
|
s.rb.Stop(s.log)
|
||||||
if !s.info.Mode.NoMetabase() && !s.rebuildDisabled {
|
if !s.info.Mode.NoMetabase() {
|
||||||
defer func() {
|
defer func() {
|
||||||
s.rb.Start(ctx, s.blobStor, s.metaBase, s.log)
|
s.rb.Start(ctx, s.blobStor, s.metaBase, s.log)
|
||||||
}()
|
}()
|
||||||
|
|
|
@ -187,7 +187,7 @@ func TestGCDropsObjectInhumedFromWritecache(t *testing.T) {
|
||||||
|
|
||||||
func testGCDropsObjectInhumedFromWritecache(t *testing.T, flushbeforeInhume bool) {
|
func testGCDropsObjectInhumedFromWritecache(t *testing.T, flushbeforeInhume bool) {
|
||||||
sh := newCustomShard(t, true, shardOptions{
|
sh := newCustomShard(t, true, shardOptions{
|
||||||
additionalShardOptions: []Option{WithDisabledGC(), WithDisabledRebuild()},
|
additionalShardOptions: []Option{WithDisabledGC()},
|
||||||
wcOpts: []writecache.Option{writecache.WithDisableBackgroundFlush()},
|
wcOpts: []writecache.Option{writecache.WithDisableBackgroundFlush()},
|
||||||
})
|
})
|
||||||
defer func() { require.NoError(t, sh.Close()) }()
|
defer func() { require.NoError(t, sh.Close()) }()
|
||||||
|
|
|
@ -7,7 +7,6 @@ import (
|
||||||
|
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common"
|
|
||||||
meta "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/metabase"
|
meta "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/metabase"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-observability/tracing"
|
"git.frostfs.info/TrueCloudLab/frostfs-observability/tracing"
|
||||||
|
@ -17,6 +16,8 @@ import (
|
||||||
"go.uber.org/zap"
|
"go.uber.org/zap"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
var ErrRebuildInProgress = errors.New("shard rebuild in progress")
|
||||||
|
|
||||||
type RebuildWorkerLimiter interface {
|
type RebuildWorkerLimiter interface {
|
||||||
AcquireWorkSlot(ctx context.Context) error
|
AcquireWorkSlot(ctx context.Context) error
|
||||||
ReleaseWorkSlot()
|
ReleaseWorkSlot()
|
||||||
|
@ -46,25 +47,23 @@ func (l *rebuildLimiter) ReleaseWorkSlot() {
|
||||||
}
|
}
|
||||||
|
|
||||||
type rebuildTask struct {
|
type rebuildTask struct {
|
||||||
limiter RebuildWorkerLimiter
|
limiter RebuildWorkerLimiter
|
||||||
action common.RebuildAction
|
fillPercent int
|
||||||
}
|
}
|
||||||
|
|
||||||
type rebuilder struct {
|
type rebuilder struct {
|
||||||
mtx *sync.Mutex
|
mtx *sync.Mutex
|
||||||
wg *sync.WaitGroup
|
wg *sync.WaitGroup
|
||||||
cancel func()
|
cancel func()
|
||||||
limiter RebuildWorkerLimiter
|
done chan struct{}
|
||||||
done chan struct{}
|
tasks chan rebuildTask
|
||||||
tasks chan rebuildTask
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func newRebuilder(l RebuildWorkerLimiter) *rebuilder {
|
func newRebuilder() *rebuilder {
|
||||||
return &rebuilder{
|
return &rebuilder{
|
||||||
mtx: &sync.Mutex{},
|
mtx: &sync.Mutex{},
|
||||||
wg: &sync.WaitGroup{},
|
wg: &sync.WaitGroup{},
|
||||||
limiter: l,
|
tasks: make(chan rebuildTask),
|
||||||
tasks: make(chan rebuildTask, 10),
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -89,25 +88,14 @@ func (r *rebuilder) Start(ctx context.Context, bs *blobstor.BlobStor, mb *meta.D
|
||||||
if !ok {
|
if !ok {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
runRebuild(ctx, bs, mb, log, t.action, t.limiter)
|
runRebuild(ctx, bs, mb, log, t.fillPercent, t.limiter)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
select {
|
|
||||||
case <-ctx.Done():
|
|
||||||
return
|
|
||||||
case r.tasks <- rebuildTask{
|
|
||||||
limiter: r.limiter,
|
|
||||||
action: common.RebuildAction{
|
|
||||||
SchemaChange: true,
|
|
||||||
},
|
|
||||||
}:
|
|
||||||
return
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func runRebuild(ctx context.Context, bs *blobstor.BlobStor, mb *meta.DB, log *logger.Logger,
|
func runRebuild(ctx context.Context, bs *blobstor.BlobStor, mb *meta.DB, log *logger.Logger,
|
||||||
action common.RebuildAction, limiter RebuildWorkerLimiter,
|
fillPercent int, limiter RebuildWorkerLimiter,
|
||||||
) {
|
) {
|
||||||
select {
|
select {
|
||||||
case <-ctx.Done():
|
case <-ctx.Done():
|
||||||
|
@ -115,23 +103,25 @@ func runRebuild(ctx context.Context, bs *blobstor.BlobStor, mb *meta.DB, log *lo
|
||||||
default:
|
default:
|
||||||
}
|
}
|
||||||
log.Info(logs.BlobstoreRebuildStarted)
|
log.Info(logs.BlobstoreRebuildStarted)
|
||||||
if err := bs.Rebuild(ctx, &mbStorageIDUpdate{mb: mb}, limiter, action); err != nil {
|
if err := bs.Rebuild(ctx, &mbStorageIDUpdate{mb: mb}, limiter, fillPercent); err != nil {
|
||||||
log.Warn(logs.FailedToRebuildBlobstore, zap.Error(err))
|
log.Warn(logs.FailedToRebuildBlobstore, zap.Error(err))
|
||||||
} else {
|
} else {
|
||||||
log.Info(logs.BlobstoreRebuildCompletedSuccessfully)
|
log.Info(logs.BlobstoreRebuildCompletedSuccessfully)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (r *rebuilder) ScheduleRebuild(ctx context.Context, limiter RebuildWorkerLimiter, action common.RebuildAction,
|
func (r *rebuilder) ScheduleRebuild(ctx context.Context, limiter RebuildWorkerLimiter, fillPercent int,
|
||||||
) error {
|
) error {
|
||||||
select {
|
select {
|
||||||
case <-ctx.Done():
|
case <-ctx.Done():
|
||||||
return ctx.Err()
|
return ctx.Err()
|
||||||
case r.tasks <- rebuildTask{
|
case r.tasks <- rebuildTask{
|
||||||
limiter: limiter,
|
limiter: limiter,
|
||||||
action: action,
|
fillPercent: fillPercent,
|
||||||
}:
|
}:
|
||||||
return nil
|
return nil
|
||||||
|
default:
|
||||||
|
return ErrRebuildInProgress
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -198,9 +188,5 @@ func (s *Shard) ScheduleRebuild(ctx context.Context, p RebuildPrm) error {
|
||||||
return ErrDegradedMode
|
return ErrDegradedMode
|
||||||
}
|
}
|
||||||
|
|
||||||
return s.rb.ScheduleRebuild(ctx, p.ConcurrencyLimiter, common.RebuildAction{
|
return s.rb.ScheduleRebuild(ctx, p.ConcurrencyLimiter, int(p.TargetFillPercent))
|
||||||
SchemaChange: true,
|
|
||||||
FillPercent: true,
|
|
||||||
FillPercentValue: int(p.TargetFillPercent),
|
|
||||||
})
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -139,10 +139,6 @@ type cfg struct {
|
||||||
metricsWriter MetricsWriter
|
metricsWriter MetricsWriter
|
||||||
|
|
||||||
reportErrorFunc func(selfID string, message string, err error)
|
reportErrorFunc func(selfID string, message string, err error)
|
||||||
|
|
||||||
rebuildWorkersCount uint32
|
|
||||||
|
|
||||||
rebuildDisabled bool
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func defaultCfg() *cfg {
|
func defaultCfg() *cfg {
|
||||||
|
@ -151,7 +147,6 @@ func defaultCfg() *cfg {
|
||||||
log: &logger.Logger{Logger: zap.L()},
|
log: &logger.Logger{Logger: zap.L()},
|
||||||
gcCfg: defaultGCCfg(),
|
gcCfg: defaultGCCfg(),
|
||||||
reportErrorFunc: func(string, string, error) {},
|
reportErrorFunc: func(string, string, error) {},
|
||||||
rebuildWorkersCount: 1,
|
|
||||||
zeroSizeContainersCallback: func(context.Context, []cid.ID) {},
|
zeroSizeContainersCallback: func(context.Context, []cid.ID) {},
|
||||||
zeroCountContainersCallback: func(context.Context, []cid.ID) {},
|
zeroCountContainersCallback: func(context.Context, []cid.ID) {},
|
||||||
}
|
}
|
||||||
|
@ -384,14 +379,6 @@ func WithExpiredCollectorWorkerCount(count int) Option {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// WithRebuildWorkersCount return option to set concurrent
|
|
||||||
// workers count of storage rebuild operation.
|
|
||||||
func WithRebuildWorkersCount(count uint32) Option {
|
|
||||||
return func(c *cfg) {
|
|
||||||
c.rebuildWorkersCount = count
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// WithDisabledGC disables GC.
|
// WithDisabledGC disables GC.
|
||||||
// For testing purposes only.
|
// For testing purposes only.
|
||||||
func WithDisabledGC() Option {
|
func WithDisabledGC() Option {
|
||||||
|
@ -414,14 +401,6 @@ func WithZeroCountCallback(cb EmptyContainersCallback) Option {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// WithDisabledRebuild returns an option to disable a shard rebuild.
|
|
||||||
dstepanov-yadro
commented
Background rebuild disabled now. Background rebuild disabled now.
|
|||||||
// For testing purposes only.
|
|
||||||
func WithDisabledRebuild() Option {
|
|
||||||
return func(c *cfg) {
|
|
||||||
c.rebuildDisabled = true
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func (s *Shard) fillInfo() {
|
func (s *Shard) fillInfo() {
|
||||||
s.cfg.info.MetaBaseInfo = s.metaBase.DumpInfo()
|
s.cfg.info.MetaBaseInfo = s.metaBase.DumpInfo()
|
||||||
s.cfg.info.BlobStorInfo = s.blobStor.DumpInfo()
|
s.cfg.info.BlobStorInfo = s.blobStor.DumpInfo()
|
||||||
|
|
Loading…
Reference in a new issue
How is this change related to
disable rebuild
commit message?Describe it in the commit message.
Fixed