From da4fee2d0b693500bf917b3d850570ca889f0bbd Mon Sep 17 00:00:00 2001 From: Dmitrii Stepanov Date: Fri, 22 Sep 2023 17:08:48 +0300 Subject: [PATCH] [#698] blobovniczatree: Init blobovniczas concurrently Signed-off-by: Dmitrii Stepanov --- cmd/frostfs-node/config.go | 3 + cmd/frostfs-node/config/engine/config_test.go | 2 + .../shard/blobstor/blobovnicza/config.go | 19 ++++++ config/example/node.env | 1 + config/example/node.json | 3 +- config/example/node.yaml | 1 + .../blobstor/blobovniczatree/control.go | 49 ++++++++----- .../blobstor/blobovniczatree/option.go | 68 +++++++++++-------- 8 files changed, 102 insertions(+), 44 deletions(-) diff --git a/cmd/frostfs-node/config.go b/cmd/frostfs-node/config.go index cc106cf9..840dbf33 100644 --- a/cmd/frostfs-node/config.go +++ b/cmd/frostfs-node/config.go @@ -181,6 +181,7 @@ type subStorageCfg struct { width uint64 leafWidth uint64 openedCacheSize int + initWorkerCount int } // readConfig fills applicationConfiguration with raw configuration values @@ -298,6 +299,7 @@ func (a *applicationConfiguration) setShardStorageConfig(newConfig *shardCfg, ol sCfg.width = sub.ShallowWidth() sCfg.leafWidth = sub.LeafWidth() sCfg.openedCacheSize = sub.OpenedCacheSize() + sCfg.initWorkerCount = sub.InitWorkerCount() case fstree.Type: sub := fstreeconfig.From((*config.Config)(storagesCfg[i])) sCfg.depth = sub.Depth() @@ -796,6 +798,7 @@ func (c *cfg) getSubstorageOpts(shCfg shardCfg) []blobstor.SubStorage { blobovniczatree.WithBlobovniczaShallowWidth(sRead.width), blobovniczatree.WithBlobovniczaLeafWidth(sRead.leafWidth), blobovniczatree.WithOpenedCacheSize(sRead.openedCacheSize), + blobovniczatree.WithInitWorkerCount(sRead.initWorkerCount), blobovniczatree.WithLogger(c.log), } diff --git a/cmd/frostfs-node/config/engine/config_test.go b/cmd/frostfs-node/config/engine/config_test.go index 665f70bc..6b2b3722 100644 --- a/cmd/frostfs-node/config/engine/config_test.go +++ b/cmd/frostfs-node/config/engine/config_test.go @@ -97,6 +97,7 @@ func TestEngineSection(t *testing.T) { require.EqualValues(t, 4, blz.ShallowWidth()) require.EqualValues(t, 50, blz.OpenedCacheSize()) require.EqualValues(t, 10, blz.LeafWidth()) + require.EqualValues(t, 10, blz.InitWorkerCount()) require.Equal(t, "tmp/0/blob", ss[1].Path()) require.EqualValues(t, 0o644, ss[1].Perm()) @@ -146,6 +147,7 @@ func TestEngineSection(t *testing.T) { require.EqualValues(t, 4, blz.ShallowWidth()) require.EqualValues(t, 50, blz.OpenedCacheSize()) require.EqualValues(t, 10, blz.LeafWidth()) + require.EqualValues(t, blobovniczaconfig.InitWorkerCountDefault, blz.InitWorkerCount()) require.Equal(t, "tmp/1/blob", ss[1].Path()) require.EqualValues(t, 0o644, ss[1].Perm()) diff --git a/cmd/frostfs-node/config/engine/shard/blobstor/blobovnicza/config.go b/cmd/frostfs-node/config/engine/shard/blobstor/blobovnicza/config.go index a780ea92..1f7fc77f 100644 --- a/cmd/frostfs-node/config/engine/shard/blobstor/blobovnicza/config.go +++ b/cmd/frostfs-node/config/engine/shard/blobstor/blobovnicza/config.go @@ -22,6 +22,9 @@ const ( // OpenedCacheSizeDefault is a default cache size of opened Blobovnicza's. OpenedCacheSizeDefault = 16 + + // InitWorkerCountDefault is a default workers count to initialize Blobovnicza's. + InitWorkerCountDefault = 5 ) // From wraps config section into Config. @@ -112,3 +115,19 @@ func (x *Config) LeafWidth() uint64 { "leaf_width", ) } + +// InitWorkerCount returns the value of "init_worker_count" config parameter. +// +// Returns InitWorkerCountDefault if the value is not a positive number. +func (x *Config) InitWorkerCount() int { + d := config.IntSafe( + (*config.Config)(x), + "init_worker_count", + ) + + if d > 0 { + return int(d) + } + + return InitWorkerCountDefault +} diff --git a/config/example/node.env b/config/example/node.env index f4dc218a..9d36b510 100644 --- a/config/example/node.env +++ b/config/example/node.env @@ -125,6 +125,7 @@ FROSTFS_STORAGE_SHARD_0_BLOBSTOR_0_DEPTH=1 FROSTFS_STORAGE_SHARD_0_BLOBSTOR_0_WIDTH=4 FROSTFS_STORAGE_SHARD_0_BLOBSTOR_0_OPENED_CACHE_CAPACITY=50 FROSTFS_STORAGE_SHARD_0_BLOBSTOR_0_LEAF_WIDTH=10 +FROSTFS_STORAGE_SHARD_0_BLOBSTOR_0_INIT_WORKER_COUNT=10 ### FSTree config FROSTFS_STORAGE_SHARD_0_BLOBSTOR_1_TYPE=fstree FROSTFS_STORAGE_SHARD_0_BLOBSTOR_1_PATH=tmp/0/blob diff --git a/config/example/node.json b/config/example/node.json index c2e3d0a5..b008009d 100644 --- a/config/example/node.json +++ b/config/example/node.json @@ -172,7 +172,8 @@ "depth": 1, "width": 4, "opened_cache_capacity": 50, - "leaf_width": 10 + "leaf_width": 10, + "init_worker_count": 10 }, { "type": "fstree", diff --git a/config/example/node.yaml b/config/example/node.yaml index d04e122b..bf92388a 100644 --- a/config/example/node.yaml +++ b/config/example/node.yaml @@ -184,6 +184,7 @@ storage: blobstor: - type: blobovnicza path: tmp/0/blob/blobovnicza + init_worker_count: 10 #count of workers to initialize blobovniczas - type: fstree path: tmp/0/blob # blobstor path diff --git a/pkg/local_object_storage/blobstor/blobovniczatree/control.go b/pkg/local_object_storage/blobstor/blobovniczatree/control.go index 69f3ca97..ad463c11 100644 --- a/pkg/local_object_storage/blobstor/blobovniczatree/control.go +++ b/pkg/local_object_storage/blobstor/blobovniczatree/control.go @@ -10,6 +10,7 @@ import ( "git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util" "go.uber.org/zap" + "golang.org/x/sync/errgroup" ) var errFailedToChangeExtensionReadOnly = errors.New("failed to change blobovnicza extension: read only mode") @@ -49,37 +50,53 @@ func (b *Blobovniczas) initializeDBs(ctx context.Context) error { return err } + eg, egCtx := errgroup.WithContext(ctx) + eg.SetLimit(b.blzInitWorkerCount) visited := make(map[string]struct{}) - err = b.iterateExistingDBPaths(ctx, func(p string) (bool, error) { + err = b.iterateExistingDBPaths(egCtx, func(p string) (bool, error) { visited[p] = struct{}{} - shBlz := b.getBlobovniczaWithoutCaching(p) - _, err := shBlz.Open() - if err != nil { - return true, err - } - defer shBlz.Close() + eg.Go(func() error { + shBlz := b.getBlobovniczaWithoutCaching(p) + _, err := shBlz.Open() + if err != nil { + return err + } + defer shBlz.Close() - b.log.Debug(logs.BlobovniczatreeBlobovniczaSuccessfullyInitializedClosing, zap.String("id", p)) + b.log.Debug(logs.BlobovniczatreeBlobovniczaSuccessfullyInitializedClosing, zap.String("id", p)) + return nil + }) return false, nil }) if err != nil { + _ = eg.Wait() return err } - return b.iterateSortedLeaves(ctx, nil, func(p string) (bool, error) { + err = b.iterateSortedLeaves(egCtx, nil, func(p string) (bool, error) { if _, found := visited[p]; found { return false, nil } - shBlz := b.getBlobovniczaWithoutCaching(p) - _, err := shBlz.Open() - if err != nil { - return true, err - } - defer shBlz.Close() + eg.Go(func() error { + shBlz := b.getBlobovniczaWithoutCaching(p) + _, err := shBlz.Open() + if err != nil { + return err + } + defer shBlz.Close() - b.log.Debug(logs.BlobovniczatreeBlobovniczaSuccessfullyInitializedClosing, zap.String("id", p)) + b.log.Debug(logs.BlobovniczatreeBlobovniczaSuccessfullyInitializedClosing, zap.String("id", p)) + return nil + }) return false, nil }) + + if err != nil { + _ = eg.Wait() + return err + } + + return eg.Wait() } func (b *Blobovniczas) openManagers() { diff --git a/pkg/local_object_storage/blobstor/blobovniczatree/option.go b/pkg/local_object_storage/blobstor/blobovniczatree/option.go index 54c52118..f1eb1eb8 100644 --- a/pkg/local_object_storage/blobstor/blobovniczatree/option.go +++ b/pkg/local_object_storage/blobstor/blobovniczatree/option.go @@ -11,42 +11,44 @@ import ( ) type cfg struct { - log *logger.Logger - perm fs.FileMode - readOnly bool - rootPath string - openedCacheSize int - blzShallowDepth uint64 - blzShallowWidth uint64 - blzLeafWidth uint64 - compression *compression.Config - blzOpts []blobovnicza.Option - // reportError is the function called when encountering disk errors. - reportError func(string, error) - metrics Metrics - waitBeforeDropDB time.Duration + log *logger.Logger + perm fs.FileMode + readOnly bool + rootPath string + openedCacheSize int + blzShallowDepth uint64 + blzShallowWidth uint64 + blzLeafWidth uint64 + compression *compression.Config + blzOpts []blobovnicza.Option + reportError func(string, error) // reportError is the function called when encountering disk errors. + metrics Metrics + waitBeforeDropDB time.Duration + blzInitWorkerCount int } type Option func(*cfg) const ( - defaultPerm = 0o700 - defaultOpenedCacheSize = 50 - defaultBlzShallowDepth = 2 - defaultBlzShallowWidth = 16 - defaultWaitBeforeDropDB = 10 * time.Second + defaultPerm = 0o700 + defaultOpenedCacheSize = 50 + defaultBlzShallowDepth = 2 + defaultBlzShallowWidth = 16 + defaultWaitBeforeDropDB = 10 * time.Second + defaultBlzInitWorkerCount = 5 ) func initConfig(c *cfg) { *c = cfg{ - log: &logger.Logger{Logger: zap.L()}, - perm: defaultPerm, - openedCacheSize: defaultOpenedCacheSize, - blzShallowDepth: defaultBlzShallowDepth, - blzShallowWidth: defaultBlzShallowWidth, - reportError: func(string, error) {}, - metrics: &noopMetrics{}, - waitBeforeDropDB: defaultWaitBeforeDropDB, + log: &logger.Logger{Logger: zap.L()}, + perm: defaultPerm, + openedCacheSize: defaultOpenedCacheSize, + blzShallowDepth: defaultBlzShallowDepth, + blzShallowWidth: defaultBlzShallowWidth, + reportError: func(string, error) {}, + metrics: &noopMetrics{}, + waitBeforeDropDB: defaultWaitBeforeDropDB, + blzInitWorkerCount: defaultBlzInitWorkerCount, } } @@ -116,3 +118,15 @@ func WithWaitBeforeDropDB(t time.Duration) Option { c.waitBeforeDropDB = t } } + +// WithInitWorkerCount sets maximum workers count to init blobovnicza tree. +// +// Negative or zero value means no limit. +func WithInitWorkerCount(v int) Option { + if v <= 0 { + v = -1 + } + return func(c *cfg) { + c.blzInitWorkerCount = v + } +}