[#698] blobovniczatree: Init blobovniczas concurrently

Signed-off-by: Dmitrii Stepanov <d.stepanov@yadro.com>
This commit is contained in:
Dmitrii Stepanov 2023-09-22 17:08:48 +03:00
parent 71fac2b9c3
commit 8afb42aae8
8 changed files with 111 additions and 53 deletions

View file

@ -176,6 +176,7 @@ type subStorageCfg struct {
width uint64 width uint64
leafWidth uint64 leafWidth uint64
openedCacheSize int openedCacheSize int
initWorkerCount int
} }
// readConfig fills applicationConfiguration with raw configuration values // readConfig fills applicationConfiguration with raw configuration values
@ -291,6 +292,7 @@ func (a *applicationConfiguration) setShardStorageConfig(newConfig *shardCfg, ol
sCfg.width = sub.ShallowWidth() sCfg.width = sub.ShallowWidth()
sCfg.leafWidth = sub.LeafWidth() sCfg.leafWidth = sub.LeafWidth()
sCfg.openedCacheSize = sub.OpenedCacheSize() sCfg.openedCacheSize = sub.OpenedCacheSize()
sCfg.initWorkerCount = sub.InitWorkerCount()
case fstree.Type: case fstree.Type:
sub := fstreeconfig.From((*config.Config)(storagesCfg[i])) sub := fstreeconfig.From((*config.Config)(storagesCfg[i]))
sCfg.depth = sub.Depth() sCfg.depth = sub.Depth()
@ -780,6 +782,7 @@ func (c *cfg) getSubstorageOpts(shCfg shardCfg) []blobstor.SubStorage {
blobovniczatree.WithBlobovniczaShallowWidth(sRead.width), blobovniczatree.WithBlobovniczaShallowWidth(sRead.width),
blobovniczatree.WithBlobovniczaLeafWidth(sRead.leafWidth), blobovniczatree.WithBlobovniczaLeafWidth(sRead.leafWidth),
blobovniczatree.WithOpenedCacheSize(sRead.openedCacheSize), blobovniczatree.WithOpenedCacheSize(sRead.openedCacheSize),
blobovniczatree.WithInitWorkerCount(sRead.initWorkerCount),
blobovniczatree.WithLogger(c.log), blobovniczatree.WithLogger(c.log),
} }

View file

@ -42,7 +42,7 @@ func TestEngineSection(t *testing.T) {
const path = "../../../../config/example/node" const path = "../../../../config/example/node"
var fileConfigTest = func(c *config.Config) { fileConfigTest := func(c *config.Config) {
num := 0 num := 0
require.EqualValues(t, 100, engineconfig.ShardErrorThreshold(c)) require.EqualValues(t, 100, engineconfig.ShardErrorThreshold(c))
@ -78,7 +78,7 @@ func TestEngineSection(t *testing.T) {
require.EqualValues(t, 3221225472, wc.SizeLimit()) require.EqualValues(t, 3221225472, wc.SizeLimit())
require.Equal(t, "tmp/0/meta", meta.Path()) require.Equal(t, "tmp/0/meta", meta.Path())
require.Equal(t, fs.FileMode(0644), meta.BoltDB().Perm()) require.Equal(t, fs.FileMode(0o644), meta.BoltDB().Perm())
require.Equal(t, 100, meta.BoltDB().MaxBatchSize()) require.Equal(t, 100, meta.BoltDB().MaxBatchSize())
require.Equal(t, 10*time.Millisecond, meta.BoltDB().MaxBatchDelay()) require.Equal(t, 10*time.Millisecond, meta.BoltDB().MaxBatchDelay())
@ -89,15 +89,16 @@ func TestEngineSection(t *testing.T) {
require.Equal(t, 2, len(ss)) require.Equal(t, 2, len(ss))
blz := blobovniczaconfig.From((*config.Config)(ss[0])) blz := blobovniczaconfig.From((*config.Config)(ss[0]))
require.Equal(t, "tmp/0/blob/blobovnicza", ss[0].Path()) require.Equal(t, "tmp/0/blob/blobovnicza", ss[0].Path())
require.EqualValues(t, 0644, blz.BoltDB().Perm()) require.EqualValues(t, 0o644, blz.BoltDB().Perm())
require.EqualValues(t, 4194304, blz.Size()) require.EqualValues(t, 4194304, blz.Size())
require.EqualValues(t, 1, blz.ShallowDepth()) require.EqualValues(t, 1, blz.ShallowDepth())
require.EqualValues(t, 4, blz.ShallowWidth()) require.EqualValues(t, 4, blz.ShallowWidth())
require.EqualValues(t, 50, blz.OpenedCacheSize()) require.EqualValues(t, 50, blz.OpenedCacheSize())
require.EqualValues(t, 10, blz.LeafWidth()) require.EqualValues(t, 10, blz.LeafWidth())
require.EqualValues(t, 10, blz.InitWorkerCount())
require.Equal(t, "tmp/0/blob", ss[1].Path()) require.Equal(t, "tmp/0/blob", ss[1].Path())
require.EqualValues(t, 0644, ss[1].Perm()) require.EqualValues(t, 0o644, ss[1].Perm())
fst := fstreeconfig.From((*config.Config)(ss[1])) fst := fstreeconfig.From((*config.Config)(ss[1]))
require.EqualValues(t, 5, fst.Depth()) require.EqualValues(t, 5, fst.Depth())
@ -112,7 +113,7 @@ func TestEngineSection(t *testing.T) {
require.Equal(t, mode.ReadOnly, sc.Mode()) require.Equal(t, mode.ReadOnly, sc.Mode())
case 1: case 1:
require.Equal(t, "tmp/1/blob/pilorama.db", pl.Path()) require.Equal(t, "tmp/1/blob/pilorama.db", pl.Path())
require.Equal(t, fs.FileMode(0644), pl.Perm()) require.Equal(t, fs.FileMode(0o644), pl.Perm())
require.True(t, pl.NoSync()) require.True(t, pl.NoSync())
require.Equal(t, 5*time.Millisecond, pl.MaxBatchDelay()) require.Equal(t, 5*time.Millisecond, pl.MaxBatchDelay())
require.Equal(t, 100, pl.MaxBatchSize()) require.Equal(t, 100, pl.MaxBatchSize())
@ -127,7 +128,7 @@ func TestEngineSection(t *testing.T) {
require.EqualValues(t, 4294967296, wc.SizeLimit()) require.EqualValues(t, 4294967296, wc.SizeLimit())
require.Equal(t, "tmp/1/meta", meta.Path()) require.Equal(t, "tmp/1/meta", meta.Path())
require.Equal(t, fs.FileMode(0644), meta.BoltDB().Perm()) require.Equal(t, fs.FileMode(0o644), meta.BoltDB().Perm())
require.Equal(t, 200, meta.BoltDB().MaxBatchSize()) require.Equal(t, 200, meta.BoltDB().MaxBatchSize())
require.Equal(t, 20*time.Millisecond, meta.BoltDB().MaxBatchDelay()) require.Equal(t, 20*time.Millisecond, meta.BoltDB().MaxBatchDelay())
@ -144,9 +145,10 @@ func TestEngineSection(t *testing.T) {
require.EqualValues(t, 4, blz.ShallowWidth()) require.EqualValues(t, 4, blz.ShallowWidth())
require.EqualValues(t, 50, blz.OpenedCacheSize()) require.EqualValues(t, 50, blz.OpenedCacheSize())
require.EqualValues(t, 10, blz.LeafWidth()) require.EqualValues(t, 10, blz.LeafWidth())
require.EqualValues(t, blobovniczaconfig.InitWorkerCountDefault, blz.InitWorkerCount())
require.Equal(t, "tmp/1/blob", ss[1].Path()) require.Equal(t, "tmp/1/blob", ss[1].Path())
require.EqualValues(t, 0644, ss[1].Perm()) require.EqualValues(t, 0o644, ss[1].Perm())
fst := fstreeconfig.From((*config.Config)(ss[1])) fst := fstreeconfig.From((*config.Config)(ss[1]))
require.EqualValues(t, 5, fst.Depth()) require.EqualValues(t, 5, fst.Depth())

View file

@ -22,6 +22,9 @@ const (
// OpenedCacheSizeDefault is a default cache size of opened Blobovnicza's. // OpenedCacheSizeDefault is a default cache size of opened Blobovnicza's.
OpenedCacheSizeDefault = 16 OpenedCacheSizeDefault = 16
// InitWorkerCountDefault is a default workers count to initialize Blobovnicza's.
InitWorkerCountDefault = 5
) )
// From wraps config section into Config. // From wraps config section into Config.
@ -112,3 +115,19 @@ func (x *Config) LeafWidth() uint64 {
"leaf_width", "leaf_width",
) )
} }
// InitWorkersCount returns the value of "init_worker_count" config parameter.
//
// Returns InitWorkerCountDefault if the value is not a positive number.
func (x *Config) InitWorkerCount() int {
d := config.IntSafe(
(*config.Config)(x),
"init_worker_count",
)
if d > 0 {
return int(d)
}
return InitWorkerCountDefault
}

View file

@ -123,6 +123,7 @@ FROSTFS_STORAGE_SHARD_0_BLOBSTOR_0_DEPTH=1
FROSTFS_STORAGE_SHARD_0_BLOBSTOR_0_WIDTH=4 FROSTFS_STORAGE_SHARD_0_BLOBSTOR_0_WIDTH=4
FROSTFS_STORAGE_SHARD_0_BLOBSTOR_0_OPENED_CACHE_CAPACITY=50 FROSTFS_STORAGE_SHARD_0_BLOBSTOR_0_OPENED_CACHE_CAPACITY=50
FROSTFS_STORAGE_SHARD_0_BLOBSTOR_0_LEAF_WIDTH=10 FROSTFS_STORAGE_SHARD_0_BLOBSTOR_0_LEAF_WIDTH=10
FROSTFS_STORAGE_SHARD_0_BLOBSTOR_0_INIT_WORKER_COUNT=10
### FSTree config ### FSTree config
FROSTFS_STORAGE_SHARD_0_BLOBSTOR_1_TYPE=fstree FROSTFS_STORAGE_SHARD_0_BLOBSTOR_1_TYPE=fstree
FROSTFS_STORAGE_SHARD_0_BLOBSTOR_1_PATH=tmp/0/blob FROSTFS_STORAGE_SHARD_0_BLOBSTOR_1_PATH=tmp/0/blob

View file

@ -170,7 +170,8 @@
"depth": 1, "depth": 1,
"width": 4, "width": 4,
"opened_cache_capacity": 50, "opened_cache_capacity": 50,
"leaf_width": 10 "leaf_width": 10,
"init_worker_count": 10
}, },
{ {
"type": "fstree", "type": "fstree",

View file

@ -182,6 +182,7 @@ storage:
blobstor: blobstor:
- type: blobovnicza - type: blobovnicza
path: tmp/0/blob/blobovnicza path: tmp/0/blob/blobovnicza
init_worker_count: 10 #count of workers to initialize blobovniczas
- type: fstree - type: fstree
path: tmp/0/blob # blobstor path path: tmp/0/blob # blobstor path

View file

@ -10,6 +10,7 @@ import (
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs" "git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util"
"go.uber.org/zap" "go.uber.org/zap"
"golang.org/x/sync/errgroup"
) )
var errFailedToChangeExtensionReadOnly = errors.New("failed to change blobovnicza extension: read only mode") var errFailedToChangeExtensionReadOnly = errors.New("failed to change blobovnicza extension: read only mode")
@ -49,48 +50,64 @@ func (b *Blobovniczas) initializeDBs(ctx context.Context) error {
return err return err
} }
eg, egCtx := errgroup.WithContext(ctx)
eg.SetLimit(b.blzInitWorkerCount)
visited := make(map[string]struct{}) visited := make(map[string]struct{})
err = b.iterateExistingDBPaths(ctx, func(p string) (bool, error) { err = b.iterateExistingDBPaths(egCtx, func(p string) (bool, error) {
visited[p] = struct{}{} visited[p] = struct{}{}
shBlz := b.getBlobovniczaWithoutCaching(p) eg.Go(func() error {
_, err := shBlz.Open() shBlz := b.getBlobovniczaWithoutCaching(p)
if err != nil { _, err := shBlz.Open()
return true, err if err != nil {
} return err
defer shBlz.Close() }
defer shBlz.Close()
b.log.Debug(logs.BlobovniczatreeBlobovniczaSuccessfullyInitializedClosing, zap.String("id", p)) b.log.Debug(logs.BlobovniczatreeBlobovniczaSuccessfullyInitializedClosing, zap.String("id", p))
return nil
})
return false, nil return false, nil
}) })
if err != nil { if err != nil {
_ = eg.Wait()
return err return err
} }
return b.iterateSortedLeaves(ctx, nil, func(p string) (bool, error) { err = b.iterateSortedLeaves(egCtx, nil, func(p string) (bool, error) {
if _, found := visited[p]; found { if _, found := visited[p]; found {
return false, nil return false, nil
} }
shBlz := b.getBlobovniczaWithoutCaching(p) eg.Go(func() error {
_, err := shBlz.Open() shBlz := b.getBlobovniczaWithoutCaching(p)
if err != nil { _, err := shBlz.Open()
return true, err if err != nil {
} return err
defer shBlz.Close() }
defer shBlz.Close()
b.log.Debug(logs.BlobovniczatreeBlobovniczaSuccessfullyInitializedClosing, zap.String("id", p)) b.log.Debug(logs.BlobovniczatreeBlobovniczaSuccessfullyInitializedClosing, zap.String("id", p))
return nil
})
return false, nil return false, nil
}) })
if err != nil {
_ = eg.Wait()
return err
}
return eg.Wait()
} }
func (b *Blobovniczas) openManagers() { func (b *Blobovniczas) openManagers() {
b.commondbManager.Open() //order important b.commondbManager.Open() // order important
b.activeDBManager.Open() b.activeDBManager.Open()
b.dbCache.Open() b.dbCache.Open()
} }
// Close implements common.Storage. // Close implements common.Storage.
func (b *Blobovniczas) Close() error { func (b *Blobovniczas) Close() error {
b.dbCache.Close() //order important b.dbCache.Close() // order important
b.activeDBManager.Close() b.activeDBManager.Close()
b.commondbManager.Close() b.commondbManager.Close()

View file

@ -11,42 +11,44 @@ import (
) )
type cfg struct { type cfg struct {
log *logger.Logger log *logger.Logger
perm fs.FileMode perm fs.FileMode
readOnly bool readOnly bool
rootPath string rootPath string
openedCacheSize int openedCacheSize int
blzShallowDepth uint64 blzShallowDepth uint64
blzShallowWidth uint64 blzShallowWidth uint64
blzLeafWidth uint64 blzLeafWidth uint64
compression *compression.Config compression *compression.Config
blzOpts []blobovnicza.Option blzOpts []blobovnicza.Option
// reportError is the function called when encountering disk errors. reportError func(string, error) // reportError is the function called when encountering disk errors.
reportError func(string, error) metrics Metrics
metrics Metrics waitBeforeDropDB time.Duration
waitBeforeDropDB time.Duration blzInitWorkerCount int
} }
type Option func(*cfg) type Option func(*cfg)
const ( const (
defaultPerm = 0700 defaultPerm = 0o700
defaultOpenedCacheSize = 50 defaultOpenedCacheSize = 50
defaultBlzShallowDepth = 2 defaultBlzShallowDepth = 2
defaultBlzShallowWidth = 16 defaultBlzShallowWidth = 16
defaultWaitBeforeDropDB = 10 * time.Second defaultWaitBeforeDropDB = 10 * time.Second
defaultBlzInitWorkerCount = 5
) )
func initConfig(c *cfg) { func initConfig(c *cfg) {
*c = cfg{ *c = cfg{
log: &logger.Logger{Logger: zap.L()}, log: &logger.Logger{Logger: zap.L()},
perm: defaultPerm, perm: defaultPerm,
openedCacheSize: defaultOpenedCacheSize, openedCacheSize: defaultOpenedCacheSize,
blzShallowDepth: defaultBlzShallowDepth, blzShallowDepth: defaultBlzShallowDepth,
blzShallowWidth: defaultBlzShallowWidth, blzShallowWidth: defaultBlzShallowWidth,
reportError: func(string, error) {}, reportError: func(string, error) {},
metrics: &noopMetrics{}, metrics: &noopMetrics{},
waitBeforeDropDB: defaultWaitBeforeDropDB, waitBeforeDropDB: defaultWaitBeforeDropDB,
blzInitWorkerCount: defaultBlzInitWorkerCount,
} }
} }
@ -116,3 +118,15 @@ func WithWaitBeforeDropDB(t time.Duration) Option {
c.waitBeforeDropDB = t c.waitBeforeDropDB = t
} }
} }
// WithInitWorkerCount sets maximum workers count to init blobovnicza tree.
//
// Negative or zero value means no limit.
func WithInitWorkerCount(v int) Option {
if v <= 0 {
v = -1
}
return func(c *cfg) {
c.blzInitWorkerCount = v
}
}