From 8afb42aae850865bd0792c4397d595d813984941 Mon Sep 17 00:00:00 2001 From: Dmitrii Stepanov Date: Fri, 22 Sep 2023 17:08:48 +0300 Subject: [PATCH] [#698] blobovniczatree: Init blobovniczas concurrently Signed-off-by: Dmitrii Stepanov --- cmd/frostfs-node/config.go | 3 + cmd/frostfs-node/config/engine/config_test.go | 16 +++-- .../shard/blobstor/blobovnicza/config.go | 19 ++++++ config/example/node.env | 1 + config/example/node.json | 3 +- config/example/node.yaml | 1 + .../blobstor/blobovniczatree/control.go | 53 ++++++++++----- .../blobstor/blobovniczatree/option.go | 68 +++++++++++-------- 8 files changed, 111 insertions(+), 53 deletions(-) diff --git a/cmd/frostfs-node/config.go b/cmd/frostfs-node/config.go index 60e567c5a..c4eb81c81 100644 --- a/cmd/frostfs-node/config.go +++ b/cmd/frostfs-node/config.go @@ -176,6 +176,7 @@ type subStorageCfg struct { width uint64 leafWidth uint64 openedCacheSize int + initWorkerCount int } // readConfig fills applicationConfiguration with raw configuration values @@ -291,6 +292,7 @@ func (a *applicationConfiguration) setShardStorageConfig(newConfig *shardCfg, ol sCfg.width = sub.ShallowWidth() sCfg.leafWidth = sub.LeafWidth() sCfg.openedCacheSize = sub.OpenedCacheSize() + sCfg.initWorkerCount = sub.InitWorkerCount() case fstree.Type: sub := fstreeconfig.From((*config.Config)(storagesCfg[i])) sCfg.depth = sub.Depth() @@ -780,6 +782,7 @@ func (c *cfg) getSubstorageOpts(shCfg shardCfg) []blobstor.SubStorage { blobovniczatree.WithBlobovniczaShallowWidth(sRead.width), blobovniczatree.WithBlobovniczaLeafWidth(sRead.leafWidth), blobovniczatree.WithOpenedCacheSize(sRead.openedCacheSize), + blobovniczatree.WithInitWorkerCount(sRead.initWorkerCount), blobovniczatree.WithLogger(c.log), } diff --git a/cmd/frostfs-node/config/engine/config_test.go b/cmd/frostfs-node/config/engine/config_test.go index b8e95db6d..2f47229bc 100644 --- a/cmd/frostfs-node/config/engine/config_test.go +++ b/cmd/frostfs-node/config/engine/config_test.go @@ -42,7 +42,7 @@ func TestEngineSection(t *testing.T) { const path = "../../../../config/example/node" - var fileConfigTest = func(c *config.Config) { + fileConfigTest := func(c *config.Config) { num := 0 require.EqualValues(t, 100, engineconfig.ShardErrorThreshold(c)) @@ -78,7 +78,7 @@ func TestEngineSection(t *testing.T) { require.EqualValues(t, 3221225472, wc.SizeLimit()) require.Equal(t, "tmp/0/meta", meta.Path()) - require.Equal(t, fs.FileMode(0644), meta.BoltDB().Perm()) + require.Equal(t, fs.FileMode(0o644), meta.BoltDB().Perm()) require.Equal(t, 100, meta.BoltDB().MaxBatchSize()) require.Equal(t, 10*time.Millisecond, meta.BoltDB().MaxBatchDelay()) @@ -89,15 +89,16 @@ func TestEngineSection(t *testing.T) { require.Equal(t, 2, len(ss)) blz := blobovniczaconfig.From((*config.Config)(ss[0])) require.Equal(t, "tmp/0/blob/blobovnicza", ss[0].Path()) - require.EqualValues(t, 0644, blz.BoltDB().Perm()) + require.EqualValues(t, 0o644, blz.BoltDB().Perm()) require.EqualValues(t, 4194304, blz.Size()) require.EqualValues(t, 1, blz.ShallowDepth()) require.EqualValues(t, 4, blz.ShallowWidth()) require.EqualValues(t, 50, blz.OpenedCacheSize()) require.EqualValues(t, 10, blz.LeafWidth()) + require.EqualValues(t, 10, blz.InitWorkerCount()) require.Equal(t, "tmp/0/blob", ss[1].Path()) - require.EqualValues(t, 0644, ss[1].Perm()) + require.EqualValues(t, 0o644, ss[1].Perm()) fst := fstreeconfig.From((*config.Config)(ss[1])) require.EqualValues(t, 5, fst.Depth()) @@ -112,7 +113,7 @@ func TestEngineSection(t *testing.T) { require.Equal(t, mode.ReadOnly, sc.Mode()) case 1: require.Equal(t, "tmp/1/blob/pilorama.db", pl.Path()) - require.Equal(t, fs.FileMode(0644), pl.Perm()) + require.Equal(t, fs.FileMode(0o644), pl.Perm()) require.True(t, pl.NoSync()) require.Equal(t, 5*time.Millisecond, pl.MaxBatchDelay()) require.Equal(t, 100, pl.MaxBatchSize()) @@ -127,7 +128,7 @@ func TestEngineSection(t *testing.T) { require.EqualValues(t, 4294967296, wc.SizeLimit()) require.Equal(t, "tmp/1/meta", meta.Path()) - require.Equal(t, fs.FileMode(0644), meta.BoltDB().Perm()) + require.Equal(t, fs.FileMode(0o644), meta.BoltDB().Perm()) require.Equal(t, 200, meta.BoltDB().MaxBatchSize()) require.Equal(t, 20*time.Millisecond, meta.BoltDB().MaxBatchDelay()) @@ -144,9 +145,10 @@ func TestEngineSection(t *testing.T) { require.EqualValues(t, 4, blz.ShallowWidth()) require.EqualValues(t, 50, blz.OpenedCacheSize()) require.EqualValues(t, 10, blz.LeafWidth()) + require.EqualValues(t, blobovniczaconfig.InitWorkerCountDefault, blz.InitWorkerCount()) require.Equal(t, "tmp/1/blob", ss[1].Path()) - require.EqualValues(t, 0644, ss[1].Perm()) + require.EqualValues(t, 0o644, ss[1].Perm()) fst := fstreeconfig.From((*config.Config)(ss[1])) require.EqualValues(t, 5, fst.Depth()) diff --git a/cmd/frostfs-node/config/engine/shard/blobstor/blobovnicza/config.go b/cmd/frostfs-node/config/engine/shard/blobstor/blobovnicza/config.go index a780ea927..a4dbf6fae 100644 --- a/cmd/frostfs-node/config/engine/shard/blobstor/blobovnicza/config.go +++ b/cmd/frostfs-node/config/engine/shard/blobstor/blobovnicza/config.go @@ -22,6 +22,9 @@ const ( // OpenedCacheSizeDefault is a default cache size of opened Blobovnicza's. OpenedCacheSizeDefault = 16 + + // InitWorkerCountDefault is a default workers count to initialize Blobovnicza's. + InitWorkerCountDefault = 5 ) // From wraps config section into Config. @@ -112,3 +115,19 @@ func (x *Config) LeafWidth() uint64 { "leaf_width", ) } + +// InitWorkersCount returns the value of "init_worker_count" config parameter. +// +// Returns InitWorkerCountDefault if the value is not a positive number. +func (x *Config) InitWorkerCount() int { + d := config.IntSafe( + (*config.Config)(x), + "init_worker_count", + ) + + if d > 0 { + return int(d) + } + + return InitWorkerCountDefault +} diff --git a/config/example/node.env b/config/example/node.env index fde65173b..b2e694582 100644 --- a/config/example/node.env +++ b/config/example/node.env @@ -123,6 +123,7 @@ FROSTFS_STORAGE_SHARD_0_BLOBSTOR_0_DEPTH=1 FROSTFS_STORAGE_SHARD_0_BLOBSTOR_0_WIDTH=4 FROSTFS_STORAGE_SHARD_0_BLOBSTOR_0_OPENED_CACHE_CAPACITY=50 FROSTFS_STORAGE_SHARD_0_BLOBSTOR_0_LEAF_WIDTH=10 +FROSTFS_STORAGE_SHARD_0_BLOBSTOR_0_INIT_WORKER_COUNT=10 ### FSTree config FROSTFS_STORAGE_SHARD_0_BLOBSTOR_1_TYPE=fstree FROSTFS_STORAGE_SHARD_0_BLOBSTOR_1_PATH=tmp/0/blob diff --git a/config/example/node.json b/config/example/node.json index e8455ee55..5beefd334 100644 --- a/config/example/node.json +++ b/config/example/node.json @@ -170,7 +170,8 @@ "depth": 1, "width": 4, "opened_cache_capacity": 50, - "leaf_width": 10 + "leaf_width": 10, + "init_worker_count": 10 }, { "type": "fstree", diff --git a/config/example/node.yaml b/config/example/node.yaml index 2ca1b426c..d603f0d89 100644 --- a/config/example/node.yaml +++ b/config/example/node.yaml @@ -182,6 +182,7 @@ storage: blobstor: - type: blobovnicza path: tmp/0/blob/blobovnicza + init_worker_count: 10 #count of workers to initialize blobovniczas - type: fstree path: tmp/0/blob # blobstor path diff --git a/pkg/local_object_storage/blobstor/blobovniczatree/control.go b/pkg/local_object_storage/blobstor/blobovniczatree/control.go index 8a1c7e8f5..ad463c113 100644 --- a/pkg/local_object_storage/blobstor/blobovniczatree/control.go +++ b/pkg/local_object_storage/blobstor/blobovniczatree/control.go @@ -10,6 +10,7 @@ import ( "git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util" "go.uber.org/zap" + "golang.org/x/sync/errgroup" ) var errFailedToChangeExtensionReadOnly = errors.New("failed to change blobovnicza extension: read only mode") @@ -49,48 +50,64 @@ func (b *Blobovniczas) initializeDBs(ctx context.Context) error { return err } + eg, egCtx := errgroup.WithContext(ctx) + eg.SetLimit(b.blzInitWorkerCount) visited := make(map[string]struct{}) - err = b.iterateExistingDBPaths(ctx, func(p string) (bool, error) { + err = b.iterateExistingDBPaths(egCtx, func(p string) (bool, error) { visited[p] = struct{}{} - shBlz := b.getBlobovniczaWithoutCaching(p) - _, err := shBlz.Open() - if err != nil { - return true, err - } - defer shBlz.Close() + eg.Go(func() error { + shBlz := b.getBlobovniczaWithoutCaching(p) + _, err := shBlz.Open() + if err != nil { + return err + } + defer shBlz.Close() - b.log.Debug(logs.BlobovniczatreeBlobovniczaSuccessfullyInitializedClosing, zap.String("id", p)) + b.log.Debug(logs.BlobovniczatreeBlobovniczaSuccessfullyInitializedClosing, zap.String("id", p)) + return nil + }) return false, nil }) if err != nil { + _ = eg.Wait() return err } - return b.iterateSortedLeaves(ctx, nil, func(p string) (bool, error) { + err = b.iterateSortedLeaves(egCtx, nil, func(p string) (bool, error) { if _, found := visited[p]; found { return false, nil } - shBlz := b.getBlobovniczaWithoutCaching(p) - _, err := shBlz.Open() - if err != nil { - return true, err - } - defer shBlz.Close() + eg.Go(func() error { + shBlz := b.getBlobovniczaWithoutCaching(p) + _, err := shBlz.Open() + if err != nil { + return err + } + defer shBlz.Close() - b.log.Debug(logs.BlobovniczatreeBlobovniczaSuccessfullyInitializedClosing, zap.String("id", p)) + b.log.Debug(logs.BlobovniczatreeBlobovniczaSuccessfullyInitializedClosing, zap.String("id", p)) + return nil + }) return false, nil }) + + if err != nil { + _ = eg.Wait() + return err + } + + return eg.Wait() } func (b *Blobovniczas) openManagers() { - b.commondbManager.Open() //order important + b.commondbManager.Open() // order important b.activeDBManager.Open() b.dbCache.Open() } // Close implements common.Storage. func (b *Blobovniczas) Close() error { - b.dbCache.Close() //order important + b.dbCache.Close() // order important b.activeDBManager.Close() b.commondbManager.Close() diff --git a/pkg/local_object_storage/blobstor/blobovniczatree/option.go b/pkg/local_object_storage/blobstor/blobovniczatree/option.go index 0b444b4fc..f1eb1eb8e 100644 --- a/pkg/local_object_storage/blobstor/blobovniczatree/option.go +++ b/pkg/local_object_storage/blobstor/blobovniczatree/option.go @@ -11,42 +11,44 @@ import ( ) type cfg struct { - log *logger.Logger - perm fs.FileMode - readOnly bool - rootPath string - openedCacheSize int - blzShallowDepth uint64 - blzShallowWidth uint64 - blzLeafWidth uint64 - compression *compression.Config - blzOpts []blobovnicza.Option - // reportError is the function called when encountering disk errors. - reportError func(string, error) - metrics Metrics - waitBeforeDropDB time.Duration + log *logger.Logger + perm fs.FileMode + readOnly bool + rootPath string + openedCacheSize int + blzShallowDepth uint64 + blzShallowWidth uint64 + blzLeafWidth uint64 + compression *compression.Config + blzOpts []blobovnicza.Option + reportError func(string, error) // reportError is the function called when encountering disk errors. + metrics Metrics + waitBeforeDropDB time.Duration + blzInitWorkerCount int } type Option func(*cfg) const ( - defaultPerm = 0700 - defaultOpenedCacheSize = 50 - defaultBlzShallowDepth = 2 - defaultBlzShallowWidth = 16 - defaultWaitBeforeDropDB = 10 * time.Second + defaultPerm = 0o700 + defaultOpenedCacheSize = 50 + defaultBlzShallowDepth = 2 + defaultBlzShallowWidth = 16 + defaultWaitBeforeDropDB = 10 * time.Second + defaultBlzInitWorkerCount = 5 ) func initConfig(c *cfg) { *c = cfg{ - log: &logger.Logger{Logger: zap.L()}, - perm: defaultPerm, - openedCacheSize: defaultOpenedCacheSize, - blzShallowDepth: defaultBlzShallowDepth, - blzShallowWidth: defaultBlzShallowWidth, - reportError: func(string, error) {}, - metrics: &noopMetrics{}, - waitBeforeDropDB: defaultWaitBeforeDropDB, + log: &logger.Logger{Logger: zap.L()}, + perm: defaultPerm, + openedCacheSize: defaultOpenedCacheSize, + blzShallowDepth: defaultBlzShallowDepth, + blzShallowWidth: defaultBlzShallowWidth, + reportError: func(string, error) {}, + metrics: &noopMetrics{}, + waitBeforeDropDB: defaultWaitBeforeDropDB, + blzInitWorkerCount: defaultBlzInitWorkerCount, } } @@ -116,3 +118,15 @@ func WithWaitBeforeDropDB(t time.Duration) Option { c.waitBeforeDropDB = t } } + +// WithInitWorkerCount sets maximum workers count to init blobovnicza tree. +// +// Negative or zero value means no limit. +func WithInitWorkerCount(v int) Option { + if v <= 0 { + v = -1 + } + return func(c *cfg) { + c.blzInitWorkerCount = v + } +}