Blobovnicza tree rebuild #812
8 changed files with 102 additions and 44 deletions
|
@ -181,6 +181,7 @@ type subStorageCfg struct {
|
|||
width uint64
|
||||
leafWidth uint64
|
||||
openedCacheSize int
|
||||
initWorkerCount int
|
||||
}
|
||||
|
||||
// readConfig fills applicationConfiguration with raw configuration values
|
||||
|
@ -298,6 +299,7 @@ func (a *applicationConfiguration) setShardStorageConfig(newConfig *shardCfg, ol
|
|||
sCfg.width = sub.ShallowWidth()
|
||||
sCfg.leafWidth = sub.LeafWidth()
|
||||
sCfg.openedCacheSize = sub.OpenedCacheSize()
|
||||
sCfg.initWorkerCount = sub.InitWorkerCount()
|
||||
case fstree.Type:
|
||||
sub := fstreeconfig.From((*config.Config)(storagesCfg[i]))
|
||||
sCfg.depth = sub.Depth()
|
||||
|
@ -796,6 +798,7 @@ func (c *cfg) getSubstorageOpts(shCfg shardCfg) []blobstor.SubStorage {
|
|||
blobovniczatree.WithBlobovniczaShallowWidth(sRead.width),
|
||||
blobovniczatree.WithBlobovniczaLeafWidth(sRead.leafWidth),
|
||||
blobovniczatree.WithOpenedCacheSize(sRead.openedCacheSize),
|
||||
blobovniczatree.WithInitWorkerCount(sRead.initWorkerCount),
|
||||
blobovniczatree.WithLogger(c.log),
|
||||
}
|
||||
|
||||
|
|
|
@ -97,6 +97,7 @@ func TestEngineSection(t *testing.T) {
|
|||
require.EqualValues(t, 4, blz.ShallowWidth())
|
||||
require.EqualValues(t, 50, blz.OpenedCacheSize())
|
||||
require.EqualValues(t, 10, blz.LeafWidth())
|
||||
require.EqualValues(t, 10, blz.InitWorkerCount())
|
||||
|
||||
require.Equal(t, "tmp/0/blob", ss[1].Path())
|
||||
require.EqualValues(t, 0o644, ss[1].Perm())
|
||||
|
@ -146,6 +147,7 @@ func TestEngineSection(t *testing.T) {
|
|||
require.EqualValues(t, 4, blz.ShallowWidth())
|
||||
require.EqualValues(t, 50, blz.OpenedCacheSize())
|
||||
require.EqualValues(t, 10, blz.LeafWidth())
|
||||
require.EqualValues(t, blobovniczaconfig.InitWorkerCountDefault, blz.InitWorkerCount())
|
||||
|
||||
require.Equal(t, "tmp/1/blob", ss[1].Path())
|
||||
require.EqualValues(t, 0o644, ss[1].Perm())
|
||||
|
|
|
@ -22,6 +22,9 @@ const (
|
|||
|
||||
// OpenedCacheSizeDefault is a default cache size of opened Blobovnicza's.
|
||||
OpenedCacheSizeDefault = 16
|
||||
|
||||
// InitWorkerCountDefault is a default workers count to initialize Blobovnicza's.
|
||||
InitWorkerCountDefault = 5
|
||||
)
|
||||
|
||||
// From wraps config section into Config.
|
||||
|
@ -112,3 +115,19 @@ func (x *Config) LeafWidth() uint64 {
|
|||
"leaf_width",
|
||||
)
|
||||
}
|
||||
|
||||
// InitWorkerCount returns the value of "init_worker_count" config parameter.
|
||||
//
|
||||
// Returns InitWorkerCountDefault if the value is not a positive number.
|
||||
func (x *Config) InitWorkerCount() int {
|
||||
d := config.IntSafe(
|
||||
(*config.Config)(x),
|
||||
"init_worker_count",
|
||||
)
|
||||
|
||||
if d > 0 {
|
||||
return int(d)
|
||||
}
|
||||
|
||||
return InitWorkerCountDefault
|
||||
}
|
||||
|
|
|
@ -125,6 +125,7 @@ FROSTFS_STORAGE_SHARD_0_BLOBSTOR_0_DEPTH=1
|
|||
FROSTFS_STORAGE_SHARD_0_BLOBSTOR_0_WIDTH=4
|
||||
FROSTFS_STORAGE_SHARD_0_BLOBSTOR_0_OPENED_CACHE_CAPACITY=50
|
||||
FROSTFS_STORAGE_SHARD_0_BLOBSTOR_0_LEAF_WIDTH=10
|
||||
FROSTFS_STORAGE_SHARD_0_BLOBSTOR_0_INIT_WORKER_COUNT=10
|
||||
### FSTree config
|
||||
FROSTFS_STORAGE_SHARD_0_BLOBSTOR_1_TYPE=fstree
|
||||
FROSTFS_STORAGE_SHARD_0_BLOBSTOR_1_PATH=tmp/0/blob
|
||||
|
|
|
@ -172,7 +172,8 @@
|
|||
"depth": 1,
|
||||
"width": 4,
|
||||
"opened_cache_capacity": 50,
|
||||
"leaf_width": 10
|
||||
"leaf_width": 10,
|
||||
"init_worker_count": 10
|
||||
},
|
||||
{
|
||||
"type": "fstree",
|
||||
|
|
|
@ -184,6 +184,7 @@ storage:
|
|||
blobstor:
|
||||
- type: blobovnicza
|
||||
path: tmp/0/blob/blobovnicza
|
||||
init_worker_count: 10 #count of workers to initialize blobovniczas
|
||||
- type: fstree
|
||||
path: tmp/0/blob # blobstor path
|
||||
|
||||
|
|
|
@ -10,6 +10,7 @@ import (
|
|||
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util"
|
||||
"go.uber.org/zap"
|
||||
"golang.org/x/sync/errgroup"
|
||||
)
|
||||
|
||||
var errFailedToChangeExtensionReadOnly = errors.New("failed to change blobovnicza extension: read only mode")
|
||||
|
@ -49,37 +50,53 @@ func (b *Blobovniczas) initializeDBs(ctx context.Context) error {
|
|||
return err
|
||||
}
|
||||
|
||||
eg, egCtx := errgroup.WithContext(ctx)
|
||||
eg.SetLimit(b.blzInitWorkerCount)
|
||||
visited := make(map[string]struct{})
|
||||
err = b.iterateExistingDBPaths(ctx, func(p string) (bool, error) {
|
||||
err = b.iterateExistingDBPaths(egCtx, func(p string) (bool, error) {
|
||||
visited[p] = struct{}{}
|
||||
eg.Go(func() error {
|
||||
shBlz := b.getBlobovniczaWithoutCaching(p)
|
||||
_, err := shBlz.Open()
|
||||
if err != nil {
|
||||
return true, err
|
||||
}
|
||||
defer shBlz.Close()
|
||||
|
||||
b.log.Debug(logs.BlobovniczatreeBlobovniczaSuccessfullyInitializedClosing, zap.String("id", p))
|
||||
return false, nil
|
||||
})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer shBlz.Close()
|
||||
|
||||
return b.iterateSortedLeaves(ctx, nil, func(p string) (bool, error) {
|
||||
b.log.Debug(logs.BlobovniczatreeBlobovniczaSuccessfullyInitializedClosing, zap.String("id", p))
|
||||
return nil
|
||||
})
|
||||
return false, nil
|
||||
})
|
||||
if err != nil {
|
||||
_ = eg.Wait()
|
||||
return err
|
||||
}
|
||||
|
||||
err = b.iterateSortedLeaves(egCtx, nil, func(p string) (bool, error) {
|
||||
if _, found := visited[p]; found {
|
||||
return false, nil
|
||||
}
|
||||
eg.Go(func() error {
|
||||
shBlz := b.getBlobovniczaWithoutCaching(p)
|
||||
_, err := shBlz.Open()
|
||||
if err != nil {
|
||||
return true, err
|
||||
return err
|
||||
}
|
||||
defer shBlz.Close()
|
||||
|
||||
b.log.Debug(logs.BlobovniczatreeBlobovniczaSuccessfullyInitializedClosing, zap.String("id", p))
|
||||
return nil
|
||||
})
|
||||
return false, nil
|
||||
})
|
||||
|
||||
if err != nil {
|
||||
_ = eg.Wait()
|
||||
return err
|
||||
}
|
||||
|
||||
return eg.Wait()
|
||||
}
|
||||
|
||||
func (b *Blobovniczas) openManagers() {
|
||||
|
|
|
@ -21,10 +21,10 @@ type cfg struct {
|
|||
blzLeafWidth uint64
|
||||
compression *compression.Config
|
||||
blzOpts []blobovnicza.Option
|
||||
// reportError is the function called when encountering disk errors.
|
||||
reportError func(string, error)
|
||||
reportError func(string, error) // reportError is the function called when encountering disk errors.
|
||||
metrics Metrics
|
||||
waitBeforeDropDB time.Duration
|
||||
blzInitWorkerCount int
|
||||
}
|
||||
|
||||
type Option func(*cfg)
|
||||
|
@ -35,6 +35,7 @@ const (
|
|||
defaultBlzShallowDepth = 2
|
||||
defaultBlzShallowWidth = 16
|
||||
defaultWaitBeforeDropDB = 10 * time.Second
|
||||
defaultBlzInitWorkerCount = 5
|
||||
)
|
||||
|
||||
func initConfig(c *cfg) {
|
||||
|
@ -47,6 +48,7 @@ func initConfig(c *cfg) {
|
|||
reportError: func(string, error) {},
|
||||
metrics: &noopMetrics{},
|
||||
waitBeforeDropDB: defaultWaitBeforeDropDB,
|
||||
blzInitWorkerCount: defaultBlzInitWorkerCount,
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -116,3 +118,15 @@ func WithWaitBeforeDropDB(t time.Duration) Option {
|
|||
c.waitBeforeDropDB = t
|
||||
}
|
||||
}
|
||||
|
||||
// WithInitWorkerCount sets maximum workers count to init blobovnicza tree.
|
||||
//
|
||||
// Negative or zero value means no limit.
|
||||
func WithInitWorkerCount(v int) Option {
|
||||
if v <= 0 {
|
||||
v = -1
|
||||
}
|
||||
return func(c *cfg) {
|
||||
c.blzInitWorkerCount = v
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in a new issue