[#1024] shard: Resync metabase concurrently
Signed-off-by: Dmitrii Stepanov <d.stepanov@yadro.com>
This commit is contained in:
parent
1005bf4f56
commit
57466594fb
10 changed files with 93 additions and 39 deletions
|
@ -122,6 +122,7 @@ type shardCfg struct {
|
||||||
smallSizeObjectLimit uint64
|
smallSizeObjectLimit uint64
|
||||||
uncompressableContentType []string
|
uncompressableContentType []string
|
||||||
refillMetabase bool
|
refillMetabase bool
|
||||||
|
refillMetabaseWorkersCount int
|
||||||
mode shardmode.Mode
|
mode shardmode.Mode
|
||||||
|
|
||||||
metaCfg struct {
|
metaCfg struct {
|
||||||
|
@ -234,6 +235,7 @@ func (a *applicationConfiguration) updateShardConfig(c *config.Config, oldConfig
|
||||||
var newConfig shardCfg
|
var newConfig shardCfg
|
||||||
|
|
||||||
newConfig.refillMetabase = oldConfig.RefillMetabase()
|
newConfig.refillMetabase = oldConfig.RefillMetabase()
|
||||||
|
newConfig.refillMetabaseWorkersCount = oldConfig.RefillMetabaseWorkersCount()
|
||||||
newConfig.mode = oldConfig.Mode()
|
newConfig.mode = oldConfig.Mode()
|
||||||
newConfig.compress = oldConfig.Compress()
|
newConfig.compress = oldConfig.Compress()
|
||||||
newConfig.estimateCompressibility = oldConfig.EstimateCompressibility()
|
newConfig.estimateCompressibility = oldConfig.EstimateCompressibility()
|
||||||
|
@ -986,6 +988,7 @@ func (c *cfg) getShardOpts(shCfg shardCfg) shardOptsWithID {
|
||||||
sh.shOpts = []shard.Option{
|
sh.shOpts = []shard.Option{
|
||||||
shard.WithLogger(c.log),
|
shard.WithLogger(c.log),
|
||||||
shard.WithRefillMetabase(shCfg.refillMetabase),
|
shard.WithRefillMetabase(shCfg.refillMetabase),
|
||||||
|
shard.WithRefillMetabaseWorkersCount(shCfg.refillMetabaseWorkersCount),
|
||||||
shard.WithMode(shCfg.mode),
|
shard.WithMode(shCfg.mode),
|
||||||
shard.WithBlobStorOptions(blobstoreOpts...),
|
shard.WithBlobStorOptions(blobstoreOpts...),
|
||||||
shard.WithMetaBaseOptions(mbOptions...),
|
shard.WithMetaBaseOptions(mbOptions...),
|
||||||
|
|
|
@ -117,6 +117,7 @@ func TestEngineSection(t *testing.T) {
|
||||||
|
|
||||||
require.Equal(t, false, sc.RefillMetabase())
|
require.Equal(t, false, sc.RefillMetabase())
|
||||||
require.Equal(t, mode.ReadOnly, sc.Mode())
|
require.Equal(t, mode.ReadOnly, sc.Mode())
|
||||||
|
require.Equal(t, 100, sc.RefillMetabaseWorkersCount())
|
||||||
case 1:
|
case 1:
|
||||||
require.Equal(t, "tmp/1/blob/pilorama.db", pl.Path())
|
require.Equal(t, "tmp/1/blob/pilorama.db", pl.Path())
|
||||||
require.Equal(t, fs.FileMode(0o644), pl.Perm())
|
require.Equal(t, fs.FileMode(0o644), pl.Perm())
|
||||||
|
@ -168,6 +169,7 @@ func TestEngineSection(t *testing.T) {
|
||||||
|
|
||||||
require.Equal(t, true, sc.RefillMetabase())
|
require.Equal(t, true, sc.RefillMetabase())
|
||||||
require.Equal(t, mode.ReadWrite, sc.Mode())
|
require.Equal(t, mode.ReadWrite, sc.Mode())
|
||||||
|
require.Equal(t, shardconfig.RefillMetabaseWorkersCountDefault, sc.RefillMetabaseWorkersCount())
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
})
|
})
|
||||||
|
|
|
@ -18,6 +18,7 @@ const (
|
||||||
// SmallSizeLimitDefault is a default limit of small objects payload in bytes.
|
// SmallSizeLimitDefault is a default limit of small objects payload in bytes.
|
||||||
SmallSizeLimitDefault = 1 << 20
|
SmallSizeLimitDefault = 1 << 20
|
||||||
EstimateCompressibilityThresholdDefault = 0.1
|
EstimateCompressibilityThresholdDefault = 0.1
|
||||||
|
RefillMetabaseWorkersCountDefault = 500
|
||||||
)
|
)
|
||||||
|
|
||||||
// From wraps config section into Config.
|
// From wraps config section into Config.
|
||||||
|
@ -134,6 +135,20 @@ func (x *Config) RefillMetabase() bool {
|
||||||
)
|
)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// RefillMetabaseWorkersCount returns the value of "resync_metabase_worker_count" config parameter.
|
||||||
|
//
|
||||||
|
// Returns RefillMetabaseWorkersCountDefault if the value is not a positive number.
|
||||||
|
func (x *Config) RefillMetabaseWorkersCount() int {
|
||||||
|
v := config.IntSafe(
|
||||||
|
(*config.Config)(x),
|
||||||
|
"resync_metabase_worker_count",
|
||||||
|
)
|
||||||
|
if v > 0 {
|
||||||
|
return int(v)
|
||||||
|
}
|
||||||
|
return RefillMetabaseWorkersCountDefault
|
||||||
|
}
|
||||||
|
|
||||||
// Mode return the value of "mode" config parameter.
|
// Mode return the value of "mode" config parameter.
|
||||||
//
|
//
|
||||||
// Panics if read the value is not one of predefined
|
// Panics if read the value is not one of predefined
|
||||||
|
|
|
@ -98,6 +98,7 @@ FROSTFS_STORAGE_REBUILD_WORKERS_COUNT=1000
|
||||||
## 0 shard
|
## 0 shard
|
||||||
### Flag to refill Metabase from BlobStor
|
### Flag to refill Metabase from BlobStor
|
||||||
FROSTFS_STORAGE_SHARD_0_RESYNC_METABASE=false
|
FROSTFS_STORAGE_SHARD_0_RESYNC_METABASE=false
|
||||||
|
FROSTFS_STORAGE_SHARD_0_RESYNC_METABASE_WORKER_COUNT=100
|
||||||
### Flag to set shard mode
|
### Flag to set shard mode
|
||||||
FROSTFS_STORAGE_SHARD_0_MODE=read-only
|
FROSTFS_STORAGE_SHARD_0_MODE=read-only
|
||||||
### Write cache config
|
### Write cache config
|
||||||
|
|
|
@ -144,6 +144,7 @@
|
||||||
"0": {
|
"0": {
|
||||||
"mode": "read-only",
|
"mode": "read-only",
|
||||||
"resync_metabase": false,
|
"resync_metabase": false,
|
||||||
|
"resync_metabase_worker_count": 100,
|
||||||
"writecache": {
|
"writecache": {
|
||||||
"enabled": false,
|
"enabled": false,
|
||||||
"no_sync": true,
|
"no_sync": true,
|
||||||
|
|
|
@ -167,6 +167,7 @@ storage:
|
||||||
# degraded-read-only
|
# degraded-read-only
|
||||||
# disabled (do not work with the shard, allows to not remove it from the config)
|
# disabled (do not work with the shard, allows to not remove it from the config)
|
||||||
resync_metabase: false # sync metabase with blobstor on start, expensive, leave false until complete understanding
|
resync_metabase: false # sync metabase with blobstor on start, expensive, leave false until complete understanding
|
||||||
|
resync_metabase_worker_count: 100
|
||||||
|
|
||||||
writecache:
|
writecache:
|
||||||
enabled: false
|
enabled: false
|
||||||
|
|
|
@ -180,13 +180,14 @@ Contains configuration for each shard. Keys must be consecutive numbers starting
|
||||||
The following table describes configuration for each shard.
|
The following table describes configuration for each shard.
|
||||||
|
|
||||||
| Parameter | Type | Default value | Description |
|
| Parameter | Type | Default value | Description |
|
||||||
|-------------------------------------|---------------------------------------------|---------------|-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
|
| ------------------------------------------------ | ------------------------------------------- | ------------- | ----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- |
|
||||||
| `compress` | `bool` | `false` | Flag to enable compression. |
|
| `compress` | `bool` | `false` | Flag to enable compression. |
|
||||||
| `compression_exclude_content_types` | `[]string` | | List of content-types to disable compression for. Content-type is taken from `Content-Type` object attribute. Each element can contain a star `*` as a first (last) character, which matches any prefix (suffix). |
|
| `compression_exclude_content_types` | `[]string` | | List of content-types to disable compression for. Content-type is taken from `Content-Type` object attribute. Each element can contain a star `*` as a first (last) character, which matches any prefix (suffix). |
|
||||||
| `compression_estimate_compressibility` | `bool` | `false` | If `true`, then noramalized compressibility estimation is used to decide compress data or not. |
|
| `compression_estimate_compressibility` | `bool` | `false` | If `true`, then noramalized compressibility estimation is used to decide compress data or not. |
|
||||||
| `compression_estimate_compressibility_threshold` | `float` | `0.1` | Normilized compressibility estimate threshold: data will compress if estimation if greater than this value. |
|
| `compression_estimate_compressibility_threshold` | `float` | `0.1` | Normilized compressibility estimate threshold: data will compress if estimation if greater than this value. |
|
||||||
| `mode` | `string` | `read-write` | Shard Mode.<br/>Possible values: `read-write`, `read-only`, `degraded`, `degraded-read-only`, `disabled` |
|
| `mode` | `string` | `read-write` | Shard Mode.<br/>Possible values: `read-write`, `read-only`, `degraded`, `degraded-read-only`, `disabled` |
|
||||||
| `resync_metabase` | `bool` | `false` | Flag to enable metabase resync on start. |
|
| `resync_metabase` | `bool` | `false` | Flag to enable metabase resync on start. |
|
||||||
|
| `resync_metabase_worker_count` | `int` | `1000` | Count of concurrent workers to resync metabase. |
|
||||||
| `writecache` | [Writecache config](#writecache-subsection) | | Write-cache configuration. |
|
| `writecache` | [Writecache config](#writecache-subsection) | | Write-cache configuration. |
|
||||||
| `metabase` | [Metabase config](#metabase-subsection) | | Metabase configuration. |
|
| `metabase` | [Metabase config](#metabase-subsection) | | Metabase configuration. |
|
||||||
| `blobstor` | [Blobstor config](#blobstor-subsection) | | Blobstor configuration. |
|
| `blobstor` | [Blobstor config](#blobstor-subsection) | | Blobstor configuration. |
|
||||||
|
|
|
@ -15,6 +15,7 @@ import (
|
||||||
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
|
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
|
||||||
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
|
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
|
||||||
"go.uber.org/zap"
|
"go.uber.org/zap"
|
||||||
|
"golang.org/x/sync/errgroup"
|
||||||
)
|
)
|
||||||
|
|
||||||
func (s *Shard) handleMetabaseFailure(stage string, err error) error {
|
func (s *Shard) handleMetabaseFailure(stage string, err error) error {
|
||||||
|
@ -185,9 +186,14 @@ func (s *Shard) refillMetabase(ctx context.Context) error {
|
||||||
return fmt.Errorf("could not reset metabase: %w", err)
|
return fmt.Errorf("could not reset metabase: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
obj := objectSDK.New()
|
eg, egCtx := errgroup.WithContext(ctx)
|
||||||
|
if s.cfg.refillMetabaseWorkersCount > 0 {
|
||||||
|
eg.SetLimit(s.cfg.refillMetabaseWorkersCount)
|
||||||
|
}
|
||||||
|
|
||||||
err = blobstor.IterateBinaryObjects(ctx, s.blobStor, func(addr oid.Address, data []byte, descriptor []byte) error {
|
itErr := blobstor.IterateBinaryObjects(egCtx, s.blobStor, func(addr oid.Address, data []byte, descriptor []byte) error {
|
||||||
|
eg.Go(func() error {
|
||||||
|
obj := objectSDK.New()
|
||||||
if err := obj.Unmarshal(data); err != nil {
|
if err := obj.Unmarshal(data); err != nil {
|
||||||
s.log.Warn(logs.ShardCouldNotUnmarshalObject,
|
s.log.Warn(logs.ShardCouldNotUnmarshalObject,
|
||||||
zap.Stringer("address", addr),
|
zap.Stringer("address", addr),
|
||||||
|
@ -198,9 +204,9 @@ func (s *Shard) refillMetabase(ctx context.Context) error {
|
||||||
var err error
|
var err error
|
||||||
switch obj.Type() {
|
switch obj.Type() {
|
||||||
case objectSDK.TypeTombstone:
|
case objectSDK.TypeTombstone:
|
||||||
err = s.refillTombstoneObject(ctx, obj)
|
err = s.refillTombstoneObject(egCtx, obj)
|
||||||
case objectSDK.TypeLock:
|
case objectSDK.TypeLock:
|
||||||
err = s.refillLockObject(ctx, obj)
|
err = s.refillLockObject(egCtx, obj)
|
||||||
default:
|
default:
|
||||||
}
|
}
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -211,13 +217,25 @@ func (s *Shard) refillMetabase(ctx context.Context) error {
|
||||||
mPrm.SetObject(obj)
|
mPrm.SetObject(obj)
|
||||||
mPrm.SetStorageID(descriptor)
|
mPrm.SetStorageID(descriptor)
|
||||||
|
|
||||||
_, err = s.metaBase.Put(ctx, mPrm)
|
_, err = s.metaBase.Put(egCtx, mPrm)
|
||||||
if err != nil && !client.IsErrObjectAlreadyRemoved(err) && !errors.Is(err, meta.ErrObjectIsExpired) {
|
if err != nil && !client.IsErrObjectAlreadyRemoved(err) && !errors.Is(err, meta.ErrObjectIsExpired) {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
})
|
})
|
||||||
|
|
||||||
|
select {
|
||||||
|
case <-egCtx.Done():
|
||||||
|
return egCtx.Err()
|
||||||
|
default:
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
})
|
||||||
|
|
||||||
|
egErr := eg.Wait()
|
||||||
|
|
||||||
|
err = errors.Join(egErr, itErr)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("could not put objects to the meta: %w", err)
|
return fmt.Errorf("could not put objects to the meta: %w", err)
|
||||||
}
|
}
|
||||||
|
|
|
@ -5,6 +5,7 @@ import (
|
||||||
"os"
|
"os"
|
||||||
"testing"
|
"testing"
|
||||||
|
|
||||||
|
shardconfig "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/engine/shard"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/testutil"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/testutil"
|
||||||
oidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id/test"
|
oidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id/test"
|
||||||
"github.com/stretchr/testify/require"
|
"github.com/stretchr/testify/require"
|
||||||
|
@ -29,7 +30,10 @@ func BenchmarkRefillMetabase(b *testing.B) {
|
||||||
}
|
}
|
||||||
|
|
||||||
func benchRefillMetabase(b *testing.B, objectsCount int) {
|
func benchRefillMetabase(b *testing.B, objectsCount int) {
|
||||||
sh := newShard(b, false)
|
sh := newCustomShard(b, false, shardOptions{
|
||||||
|
additionalShardOptions: []Option{WithRefillMetabaseWorkersCount(shardconfig.RefillMetabaseWorkersCountDefault)},
|
||||||
|
})
|
||||||
|
|
||||||
defer func() { require.NoError(b, sh.Close()) }()
|
defer func() { require.NoError(b, sh.Close()) }()
|
||||||
|
|
||||||
var putPrm PutPrm
|
var putPrm PutPrm
|
||||||
|
|
|
@ -96,6 +96,7 @@ type cfg struct {
|
||||||
m sync.RWMutex
|
m sync.RWMutex
|
||||||
|
|
||||||
refillMetabase bool
|
refillMetabase bool
|
||||||
|
refillMetabaseWorkersCount int
|
||||||
|
|
||||||
rmBatchSize int
|
rmBatchSize int
|
||||||
|
|
||||||
|
@ -300,6 +301,13 @@ func WithRefillMetabase(v bool) Option {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// WithRefillMetabaseWorkersCount returns option to set count of workers to refill the Metabase on Shard's initialization step.
|
||||||
|
func WithRefillMetabaseWorkersCount(v int) Option {
|
||||||
|
return func(c *cfg) {
|
||||||
|
c.refillMetabaseWorkersCount = v
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// WithMode returns option to set shard's mode. Mode must be one of the predefined:
|
// WithMode returns option to set shard's mode. Mode must be one of the predefined:
|
||||||
// - mode.ReadWrite;
|
// - mode.ReadWrite;
|
||||||
// - mode.ReadOnly.
|
// - mode.ReadOnly.
|
||||||
|
|
Loading…
Reference in a new issue