From 57466594fbef35d014479e7d96ccd17db438fef5 Mon Sep 17 00:00:00 2001 From: Dmitrii Stepanov Date: Tue, 5 Mar 2024 15:39:50 +0300 Subject: [PATCH] [#1024] shard: Resync metabase concurrently Signed-off-by: Dmitrii Stepanov --- cmd/frostfs-node/config.go | 11 ++- cmd/frostfs-node/config/engine/config_test.go | 2 + .../config/engine/shard/config.go | 15 ++++ config/example/node.env | 1 + config/example/node.json | 1 + config/example/node.yaml | 1 + docs/storage-node-configuration.md | 9 ++- pkg/local_object_storage/shard/control.go | 76 ++++++++++++------- pkg/local_object_storage/shard/refill_test.go | 6 +- pkg/local_object_storage/shard/shard.go | 10 ++- 10 files changed, 93 insertions(+), 39 deletions(-) diff --git a/cmd/frostfs-node/config.go b/cmd/frostfs-node/config.go index d78a90cfc..c5ed8b50c 100644 --- a/cmd/frostfs-node/config.go +++ b/cmd/frostfs-node/config.go @@ -119,10 +119,11 @@ type shardCfg struct { estimateCompressibility bool estimateCompressibilityThreshold float64 - smallSizeObjectLimit uint64 - uncompressableContentType []string - refillMetabase bool - mode shardmode.Mode + smallSizeObjectLimit uint64 + uncompressableContentType []string + refillMetabase bool + refillMetabaseWorkersCount int + mode shardmode.Mode metaCfg struct { path string @@ -234,6 +235,7 @@ func (a *applicationConfiguration) updateShardConfig(c *config.Config, oldConfig var newConfig shardCfg newConfig.refillMetabase = oldConfig.RefillMetabase() + newConfig.refillMetabaseWorkersCount = oldConfig.RefillMetabaseWorkersCount() newConfig.mode = oldConfig.Mode() newConfig.compress = oldConfig.Compress() newConfig.estimateCompressibility = oldConfig.EstimateCompressibility() @@ -986,6 +988,7 @@ func (c *cfg) getShardOpts(shCfg shardCfg) shardOptsWithID { sh.shOpts = []shard.Option{ shard.WithLogger(c.log), shard.WithRefillMetabase(shCfg.refillMetabase), + shard.WithRefillMetabaseWorkersCount(shCfg.refillMetabaseWorkersCount), shard.WithMode(shCfg.mode), shard.WithBlobStorOptions(blobstoreOpts...), shard.WithMetaBaseOptions(mbOptions...), diff --git a/cmd/frostfs-node/config/engine/config_test.go b/cmd/frostfs-node/config/engine/config_test.go index 86c938309..b5c926fc3 100644 --- a/cmd/frostfs-node/config/engine/config_test.go +++ b/cmd/frostfs-node/config/engine/config_test.go @@ -117,6 +117,7 @@ func TestEngineSection(t *testing.T) { require.Equal(t, false, sc.RefillMetabase()) require.Equal(t, mode.ReadOnly, sc.Mode()) + require.Equal(t, 100, sc.RefillMetabaseWorkersCount()) case 1: require.Equal(t, "tmp/1/blob/pilorama.db", pl.Path()) require.Equal(t, fs.FileMode(0o644), pl.Perm()) @@ -168,6 +169,7 @@ func TestEngineSection(t *testing.T) { require.Equal(t, true, sc.RefillMetabase()) require.Equal(t, mode.ReadWrite, sc.Mode()) + require.Equal(t, shardconfig.RefillMetabaseWorkersCountDefault, sc.RefillMetabaseWorkersCount()) } return nil }) diff --git a/cmd/frostfs-node/config/engine/shard/config.go b/cmd/frostfs-node/config/engine/shard/config.go index f9cfdf2a8..0620c9f63 100644 --- a/cmd/frostfs-node/config/engine/shard/config.go +++ b/cmd/frostfs-node/config/engine/shard/config.go @@ -18,6 +18,7 @@ const ( // SmallSizeLimitDefault is a default limit of small objects payload in bytes. SmallSizeLimitDefault = 1 << 20 EstimateCompressibilityThresholdDefault = 0.1 + RefillMetabaseWorkersCountDefault = 500 ) // From wraps config section into Config. @@ -134,6 +135,20 @@ func (x *Config) RefillMetabase() bool { ) } +// RefillMetabaseWorkersCount returns the value of "resync_metabase_worker_count" config parameter. +// +// Returns RefillMetabaseWorkersCountDefault if the value is not a positive number. +func (x *Config) RefillMetabaseWorkersCount() int { + v := config.IntSafe( + (*config.Config)(x), + "resync_metabase_worker_count", + ) + if v > 0 { + return int(v) + } + return RefillMetabaseWorkersCountDefault +} + // Mode return the value of "mode" config parameter. // // Panics if read the value is not one of predefined diff --git a/config/example/node.env b/config/example/node.env index 976c42629..9068886d3 100644 --- a/config/example/node.env +++ b/config/example/node.env @@ -98,6 +98,7 @@ FROSTFS_STORAGE_REBUILD_WORKERS_COUNT=1000 ## 0 shard ### Flag to refill Metabase from BlobStor FROSTFS_STORAGE_SHARD_0_RESYNC_METABASE=false +FROSTFS_STORAGE_SHARD_0_RESYNC_METABASE_WORKER_COUNT=100 ### Flag to set shard mode FROSTFS_STORAGE_SHARD_0_MODE=read-only ### Write cache config diff --git a/config/example/node.json b/config/example/node.json index 648fb77b0..bae28720b 100644 --- a/config/example/node.json +++ b/config/example/node.json @@ -144,6 +144,7 @@ "0": { "mode": "read-only", "resync_metabase": false, + "resync_metabase_worker_count": 100, "writecache": { "enabled": false, "no_sync": true, diff --git a/config/example/node.yaml b/config/example/node.yaml index 2dcf7c4d9..7a824adbe 100644 --- a/config/example/node.yaml +++ b/config/example/node.yaml @@ -167,6 +167,7 @@ storage: # degraded-read-only # disabled (do not work with the shard, allows to not remove it from the config) resync_metabase: false # sync metabase with blobstor on start, expensive, leave false until complete understanding + resync_metabase_worker_count: 100 writecache: enabled: false diff --git a/docs/storage-node-configuration.md b/docs/storage-node-configuration.md index 02ead3020..aa3a17815 100644 --- a/docs/storage-node-configuration.md +++ b/docs/storage-node-configuration.md @@ -179,14 +179,15 @@ Contains configuration for each shard. Keys must be consecutive numbers starting `default` subsection has the same format and specifies defaults for missing values. The following table describes configuration for each shard. -| Parameter | Type | Default value | Description | -|-------------------------------------|---------------------------------------------|---------------|-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| +| Parameter | Type | Default value | Description | +| ------------------------------------------------ | ------------------------------------------- | ------------- | ----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- | | `compress` | `bool` | `false` | Flag to enable compression. | | `compression_exclude_content_types` | `[]string` | | List of content-types to disable compression for. Content-type is taken from `Content-Type` object attribute. Each element can contain a star `*` as a first (last) character, which matches any prefix (suffix). | -| `compression_estimate_compressibility` | `bool` | `false` | If `true`, then noramalized compressibility estimation is used to decide compress data or not. | -| `compression_estimate_compressibility_threshold` | `float` | `0.1` | Normilized compressibility estimate threshold: data will compress if estimation if greater than this value. | +| `compression_estimate_compressibility` | `bool` | `false` | If `true`, then noramalized compressibility estimation is used to decide compress data or not. | +| `compression_estimate_compressibility_threshold` | `float` | `0.1` | Normilized compressibility estimate threshold: data will compress if estimation if greater than this value. | | `mode` | `string` | `read-write` | Shard Mode.
Possible values: `read-write`, `read-only`, `degraded`, `degraded-read-only`, `disabled` | | `resync_metabase` | `bool` | `false` | Flag to enable metabase resync on start. | +| `resync_metabase_worker_count` | `int` | `1000` | Count of concurrent workers to resync metabase. | | `writecache` | [Writecache config](#writecache-subsection) | | Write-cache configuration. | | `metabase` | [Metabase config](#metabase-subsection) | | Metabase configuration. | | `blobstor` | [Blobstor config](#blobstor-subsection) | | Blobstor configuration. | diff --git a/pkg/local_object_storage/shard/control.go b/pkg/local_object_storage/shard/control.go index 774fe8045..d080e2919 100644 --- a/pkg/local_object_storage/shard/control.go +++ b/pkg/local_object_storage/shard/control.go @@ -15,6 +15,7 @@ import ( objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object" oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id" "go.uber.org/zap" + "golang.org/x/sync/errgroup" ) func (s *Shard) handleMetabaseFailure(stage string, err error) error { @@ -185,39 +186,56 @@ func (s *Shard) refillMetabase(ctx context.Context) error { return fmt.Errorf("could not reset metabase: %w", err) } - obj := objectSDK.New() + eg, egCtx := errgroup.WithContext(ctx) + if s.cfg.refillMetabaseWorkersCount > 0 { + eg.SetLimit(s.cfg.refillMetabaseWorkersCount) + } - err = blobstor.IterateBinaryObjects(ctx, s.blobStor, func(addr oid.Address, data []byte, descriptor []byte) error { - if err := obj.Unmarshal(data); err != nil { - s.log.Warn(logs.ShardCouldNotUnmarshalObject, - zap.Stringer("address", addr), - zap.String("err", err.Error())) + itErr := blobstor.IterateBinaryObjects(egCtx, s.blobStor, func(addr oid.Address, data []byte, descriptor []byte) error { + eg.Go(func() error { + obj := objectSDK.New() + if err := obj.Unmarshal(data); err != nil { + s.log.Warn(logs.ShardCouldNotUnmarshalObject, + zap.Stringer("address", addr), + zap.String("err", err.Error())) + return nil + } + + var err error + switch obj.Type() { + case objectSDK.TypeTombstone: + err = s.refillTombstoneObject(egCtx, obj) + case objectSDK.TypeLock: + err = s.refillLockObject(egCtx, obj) + default: + } + if err != nil { + return err + } + + var mPrm meta.PutPrm + mPrm.SetObject(obj) + mPrm.SetStorageID(descriptor) + + _, err = s.metaBase.Put(egCtx, mPrm) + if err != nil && !client.IsErrObjectAlreadyRemoved(err) && !errors.Is(err, meta.ErrObjectIsExpired) { + return err + } + + return nil + }) + + select { + case <-egCtx.Done(): + return egCtx.Err() + default: return nil } - - var err error - switch obj.Type() { - case objectSDK.TypeTombstone: - err = s.refillTombstoneObject(ctx, obj) - case objectSDK.TypeLock: - err = s.refillLockObject(ctx, obj) - default: - } - if err != nil { - return err - } - - var mPrm meta.PutPrm - mPrm.SetObject(obj) - mPrm.SetStorageID(descriptor) - - _, err = s.metaBase.Put(ctx, mPrm) - if err != nil && !client.IsErrObjectAlreadyRemoved(err) && !errors.Is(err, meta.ErrObjectIsExpired) { - return err - } - - return nil }) + + egErr := eg.Wait() + + err = errors.Join(egErr, itErr) if err != nil { return fmt.Errorf("could not put objects to the meta: %w", err) } diff --git a/pkg/local_object_storage/shard/refill_test.go b/pkg/local_object_storage/shard/refill_test.go index 07ec9180c..509ccaaa6 100644 --- a/pkg/local_object_storage/shard/refill_test.go +++ b/pkg/local_object_storage/shard/refill_test.go @@ -5,6 +5,7 @@ import ( "os" "testing" + shardconfig "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/engine/shard" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/testutil" oidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id/test" "github.com/stretchr/testify/require" @@ -29,7 +30,10 @@ func BenchmarkRefillMetabase(b *testing.B) { } func benchRefillMetabase(b *testing.B, objectsCount int) { - sh := newShard(b, false) + sh := newCustomShard(b, false, shardOptions{ + additionalShardOptions: []Option{WithRefillMetabaseWorkersCount(shardconfig.RefillMetabaseWorkersCountDefault)}, + }) + defer func() { require.NoError(b, sh.Close()) }() var putPrm PutPrm diff --git a/pkg/local_object_storage/shard/shard.go b/pkg/local_object_storage/shard/shard.go index 8368f6db4..81d854373 100644 --- a/pkg/local_object_storage/shard/shard.go +++ b/pkg/local_object_storage/shard/shard.go @@ -95,7 +95,8 @@ type MetricsWriter interface { type cfg struct { m sync.RWMutex - refillMetabase bool + refillMetabase bool + refillMetabaseWorkersCount int rmBatchSize int @@ -300,6 +301,13 @@ func WithRefillMetabase(v bool) Option { } } +// WithRefillMetabaseWorkersCount returns option to set count of workers to refill the Metabase on Shard's initialization step. +func WithRefillMetabaseWorkersCount(v int) Option { + return func(c *cfg) { + c.refillMetabaseWorkersCount = v + } +} + // WithMode returns option to set shard's mode. Mode must be one of the predefined: // - mode.ReadWrite; // - mode.ReadOnly.