forked from TrueCloudLab/frostfs-node
[#168] node: Refactor shard opts initialization
Resolve funlen linter for shardOpts method Signed-off-by: Dmitrii Stepanov <d.stepanov@yadro.com>
This commit is contained in:
parent
c94372e6f9
commit
dcd39f8fdd
1 changed files with 118 additions and 103 deletions
|
@ -708,118 +708,133 @@ type shardOptsWithID struct {
|
|||
shOpts []shard.Option
|
||||
}
|
||||
|
||||
// nolint: funlen
|
||||
func (c *cfg) shardOpts() []shardOptsWithID {
|
||||
shards := make([]shardOptsWithID, 0, len(c.EngineCfg.shards))
|
||||
|
||||
for _, shCfg := range c.EngineCfg.shards {
|
||||
var writeCacheOpts []writecache.Option
|
||||
if wcRead := shCfg.writecacheCfg; wcRead.enabled {
|
||||
writeCacheOpts = append(writeCacheOpts,
|
||||
writecache.WithPath(wcRead.path),
|
||||
writecache.WithMaxBatchSize(wcRead.maxBatchSize),
|
||||
writecache.WithMaxBatchDelay(wcRead.maxBatchDelay),
|
||||
writecache.WithMaxObjectSize(wcRead.maxObjSize),
|
||||
writecache.WithSmallObjectSize(wcRead.smallObjectSize),
|
||||
writecache.WithFlushWorkersCount(wcRead.flushWorkerCount),
|
||||
writecache.WithMaxCacheSize(wcRead.sizeLimit),
|
||||
writecache.WithNoSync(wcRead.noSync),
|
||||
writecache.WithLogger(c.log),
|
||||
)
|
||||
}
|
||||
|
||||
var piloramaOpts []pilorama.Option
|
||||
if prRead := shCfg.piloramaCfg; prRead.enabled {
|
||||
piloramaOpts = append(piloramaOpts,
|
||||
pilorama.WithPath(prRead.path),
|
||||
pilorama.WithPerm(prRead.perm),
|
||||
pilorama.WithNoSync(prRead.noSync),
|
||||
pilorama.WithMaxBatchSize(prRead.maxBatchSize),
|
||||
pilorama.WithMaxBatchDelay(prRead.maxBatchDelay),
|
||||
)
|
||||
}
|
||||
|
||||
var ss []blobstor.SubStorage
|
||||
for _, sRead := range shCfg.subStorages {
|
||||
switch sRead.typ {
|
||||
case blobovniczatree.Type:
|
||||
ss = append(ss, blobstor.SubStorage{
|
||||
Storage: blobovniczatree.NewBlobovniczaTree(
|
||||
blobovniczatree.WithRootPath(sRead.path),
|
||||
blobovniczatree.WithPermissions(sRead.perm),
|
||||
blobovniczatree.WithBlobovniczaSize(sRead.size),
|
||||
blobovniczatree.WithBlobovniczaShallowDepth(sRead.depth),
|
||||
blobovniczatree.WithBlobovniczaShallowWidth(sRead.width),
|
||||
blobovniczatree.WithOpenedCacheSize(sRead.openedCacheSize),
|
||||
|
||||
blobovniczatree.WithLogger(c.log)),
|
||||
Policy: func(_ *objectSDK.Object, data []byte) bool {
|
||||
return uint64(len(data)) < shCfg.smallSizeObjectLimit
|
||||
},
|
||||
})
|
||||
case fstree.Type:
|
||||
ss = append(ss, blobstor.SubStorage{
|
||||
Storage: fstree.New(
|
||||
fstree.WithPath(sRead.path),
|
||||
fstree.WithPerm(sRead.perm),
|
||||
fstree.WithDepth(sRead.depth),
|
||||
fstree.WithNoSync(sRead.noSync)),
|
||||
Policy: func(_ *objectSDK.Object, data []byte) bool {
|
||||
return true
|
||||
},
|
||||
})
|
||||
default:
|
||||
// should never happen, that has already
|
||||
// been handled: when the config was read
|
||||
}
|
||||
}
|
||||
|
||||
var sh shardOptsWithID
|
||||
sh.configID = shCfg.id()
|
||||
sh.shOpts = []shard.Option{
|
||||
shard.WithLogger(c.log),
|
||||
shard.WithRefillMetabase(shCfg.refillMetabase),
|
||||
shard.WithMode(shCfg.mode),
|
||||
shard.WithBlobStorOptions(
|
||||
blobstor.WithCompressObjects(shCfg.compress),
|
||||
blobstor.WithUncompressableContentTypes(shCfg.uncompressableContentType),
|
||||
blobstor.WithStorages(ss),
|
||||
|
||||
blobstor.WithLogger(c.log),
|
||||
),
|
||||
shard.WithMetaBaseOptions(
|
||||
meta.WithPath(shCfg.metaCfg.path),
|
||||
meta.WithPermissions(shCfg.metaCfg.perm),
|
||||
meta.WithMaxBatchSize(shCfg.metaCfg.maxBatchSize),
|
||||
meta.WithMaxBatchDelay(shCfg.metaCfg.maxBatchDelay),
|
||||
meta.WithBoltDBOptions(&bbolt.Options{
|
||||
Timeout: 100 * time.Millisecond,
|
||||
}),
|
||||
|
||||
meta.WithLogger(c.log),
|
||||
meta.WithEpochState(c.cfgNetmap.state),
|
||||
),
|
||||
shard.WithPiloramaOptions(piloramaOpts...),
|
||||
shard.WithWriteCache(shCfg.writecacheCfg.enabled),
|
||||
shard.WithWriteCacheOptions(writeCacheOpts...),
|
||||
shard.WithRemoverBatchSize(shCfg.gcCfg.removerBatchSize),
|
||||
shard.WithGCRemoverSleepInterval(shCfg.gcCfg.removerSleepInterval),
|
||||
shard.WithExpiredCollectorBatchSize(shCfg.gcCfg.expiredCollectorBatchSize),
|
||||
shard.WithExpiredCollectorWorkersCount(shCfg.gcCfg.expiredCollectorWorkersCount),
|
||||
shard.WithGCWorkerPoolInitializer(func(sz int) util.WorkerPool {
|
||||
pool, err := ants.NewPool(sz)
|
||||
fatalOnErr(err)
|
||||
|
||||
return pool
|
||||
}),
|
||||
}
|
||||
|
||||
shards = append(shards, sh)
|
||||
shards = append(shards, c.getShardOpts(shCfg))
|
||||
}
|
||||
|
||||
return shards
|
||||
}
|
||||
|
||||
func (c *cfg) getWriteCacheOpts(shCfg shardCfg) []writecache.Option {
|
||||
var writeCacheOpts []writecache.Option
|
||||
if wcRead := shCfg.writecacheCfg; wcRead.enabled {
|
||||
writeCacheOpts = append(writeCacheOpts,
|
||||
writecache.WithPath(wcRead.path),
|
||||
writecache.WithMaxBatchSize(wcRead.maxBatchSize),
|
||||
writecache.WithMaxBatchDelay(wcRead.maxBatchDelay),
|
||||
writecache.WithMaxObjectSize(wcRead.maxObjSize),
|
||||
writecache.WithSmallObjectSize(wcRead.smallObjectSize),
|
||||
writecache.WithFlushWorkersCount(wcRead.flushWorkerCount),
|
||||
writecache.WithMaxCacheSize(wcRead.sizeLimit),
|
||||
writecache.WithNoSync(wcRead.noSync),
|
||||
writecache.WithLogger(c.log),
|
||||
)
|
||||
}
|
||||
return writeCacheOpts
|
||||
}
|
||||
|
||||
func (c *cfg) getPiloramaOpts(shCfg shardCfg) []pilorama.Option {
|
||||
var piloramaOpts []pilorama.Option
|
||||
if prRead := shCfg.piloramaCfg; prRead.enabled {
|
||||
piloramaOpts = append(piloramaOpts,
|
||||
pilorama.WithPath(prRead.path),
|
||||
pilorama.WithPerm(prRead.perm),
|
||||
pilorama.WithNoSync(prRead.noSync),
|
||||
pilorama.WithMaxBatchSize(prRead.maxBatchSize),
|
||||
pilorama.WithMaxBatchDelay(prRead.maxBatchDelay),
|
||||
)
|
||||
}
|
||||
return piloramaOpts
|
||||
}
|
||||
|
||||
func (c *cfg) getSubstorageOpts(shCfg shardCfg) []blobstor.SubStorage {
|
||||
var ss []blobstor.SubStorage
|
||||
for _, sRead := range shCfg.subStorages {
|
||||
switch sRead.typ {
|
||||
case blobovniczatree.Type:
|
||||
ss = append(ss, blobstor.SubStorage{
|
||||
Storage: blobovniczatree.NewBlobovniczaTree(
|
||||
blobovniczatree.WithRootPath(sRead.path),
|
||||
blobovniczatree.WithPermissions(sRead.perm),
|
||||
blobovniczatree.WithBlobovniczaSize(sRead.size),
|
||||
blobovniczatree.WithBlobovniczaShallowDepth(sRead.depth),
|
||||
blobovniczatree.WithBlobovniczaShallowWidth(sRead.width),
|
||||
blobovniczatree.WithOpenedCacheSize(sRead.openedCacheSize),
|
||||
|
||||
blobovniczatree.WithLogger(c.log)),
|
||||
Policy: func(_ *objectSDK.Object, data []byte) bool {
|
||||
return uint64(len(data)) < shCfg.smallSizeObjectLimit
|
||||
},
|
||||
})
|
||||
case fstree.Type:
|
||||
ss = append(ss, blobstor.SubStorage{
|
||||
Storage: fstree.New(
|
||||
fstree.WithPath(sRead.path),
|
||||
fstree.WithPerm(sRead.perm),
|
||||
fstree.WithDepth(sRead.depth),
|
||||
fstree.WithNoSync(sRead.noSync)),
|
||||
Policy: func(_ *objectSDK.Object, data []byte) bool {
|
||||
return true
|
||||
},
|
||||
})
|
||||
default:
|
||||
// should never happen, that has already
|
||||
// been handled: when the config was read
|
||||
}
|
||||
}
|
||||
return ss
|
||||
}
|
||||
|
||||
func (c *cfg) getShardOpts(shCfg shardCfg) shardOptsWithID {
|
||||
writeCacheOpts := c.getWriteCacheOpts(shCfg)
|
||||
piloramaOpts := c.getPiloramaOpts(shCfg)
|
||||
ss := c.getSubstorageOpts(shCfg)
|
||||
|
||||
var sh shardOptsWithID
|
||||
sh.configID = shCfg.id()
|
||||
sh.shOpts = []shard.Option{
|
||||
shard.WithLogger(c.log),
|
||||
shard.WithRefillMetabase(shCfg.refillMetabase),
|
||||
shard.WithMode(shCfg.mode),
|
||||
shard.WithBlobStorOptions(
|
||||
blobstor.WithCompressObjects(shCfg.compress),
|
||||
blobstor.WithUncompressableContentTypes(shCfg.uncompressableContentType),
|
||||
blobstor.WithStorages(ss),
|
||||
|
||||
blobstor.WithLogger(c.log),
|
||||
),
|
||||
shard.WithMetaBaseOptions(
|
||||
meta.WithPath(shCfg.metaCfg.path),
|
||||
meta.WithPermissions(shCfg.metaCfg.perm),
|
||||
meta.WithMaxBatchSize(shCfg.metaCfg.maxBatchSize),
|
||||
meta.WithMaxBatchDelay(shCfg.metaCfg.maxBatchDelay),
|
||||
meta.WithBoltDBOptions(&bbolt.Options{
|
||||
Timeout: 100 * time.Millisecond,
|
||||
}),
|
||||
|
||||
meta.WithLogger(c.log),
|
||||
meta.WithEpochState(c.cfgNetmap.state),
|
||||
),
|
||||
shard.WithPiloramaOptions(piloramaOpts...),
|
||||
shard.WithWriteCache(shCfg.writecacheCfg.enabled),
|
||||
shard.WithWriteCacheOptions(writeCacheOpts...),
|
||||
shard.WithRemoverBatchSize(shCfg.gcCfg.removerBatchSize),
|
||||
shard.WithGCRemoverSleepInterval(shCfg.gcCfg.removerSleepInterval),
|
||||
shard.WithExpiredCollectorBatchSize(shCfg.gcCfg.expiredCollectorBatchSize),
|
||||
shard.WithExpiredCollectorWorkersCount(shCfg.gcCfg.expiredCollectorWorkersCount),
|
||||
shard.WithGCWorkerPoolInitializer(func(sz int) util.WorkerPool {
|
||||
pool, err := ants.NewPool(sz)
|
||||
fatalOnErr(err)
|
||||
|
||||
return pool
|
||||
}),
|
||||
}
|
||||
return sh
|
||||
}
|
||||
|
||||
func (c *cfg) loggerPrm() (*logger.Prm, error) {
|
||||
// check if it has been inited before
|
||||
if c.dynamicConfiguration.logger == nil {
|
||||
|
|
Loading…
Reference in a new issue