diff --git a/cmd/frostfs-node/config.go b/cmd/frostfs-node/config.go index 04dd1bfd..ab615d34 100644 --- a/cmd/frostfs-node/config.go +++ b/cmd/frostfs-node/config.go @@ -708,118 +708,133 @@ type shardOptsWithID struct { shOpts []shard.Option } -// nolint: funlen func (c *cfg) shardOpts() []shardOptsWithID { shards := make([]shardOptsWithID, 0, len(c.EngineCfg.shards)) for _, shCfg := range c.EngineCfg.shards { - var writeCacheOpts []writecache.Option - if wcRead := shCfg.writecacheCfg; wcRead.enabled { - writeCacheOpts = append(writeCacheOpts, - writecache.WithPath(wcRead.path), - writecache.WithMaxBatchSize(wcRead.maxBatchSize), - writecache.WithMaxBatchDelay(wcRead.maxBatchDelay), - writecache.WithMaxObjectSize(wcRead.maxObjSize), - writecache.WithSmallObjectSize(wcRead.smallObjectSize), - writecache.WithFlushWorkersCount(wcRead.flushWorkerCount), - writecache.WithMaxCacheSize(wcRead.sizeLimit), - writecache.WithNoSync(wcRead.noSync), - writecache.WithLogger(c.log), - ) - } - - var piloramaOpts []pilorama.Option - if prRead := shCfg.piloramaCfg; prRead.enabled { - piloramaOpts = append(piloramaOpts, - pilorama.WithPath(prRead.path), - pilorama.WithPerm(prRead.perm), - pilorama.WithNoSync(prRead.noSync), - pilorama.WithMaxBatchSize(prRead.maxBatchSize), - pilorama.WithMaxBatchDelay(prRead.maxBatchDelay), - ) - } - - var ss []blobstor.SubStorage - for _, sRead := range shCfg.subStorages { - switch sRead.typ { - case blobovniczatree.Type: - ss = append(ss, blobstor.SubStorage{ - Storage: blobovniczatree.NewBlobovniczaTree( - blobovniczatree.WithRootPath(sRead.path), - blobovniczatree.WithPermissions(sRead.perm), - blobovniczatree.WithBlobovniczaSize(sRead.size), - blobovniczatree.WithBlobovniczaShallowDepth(sRead.depth), - blobovniczatree.WithBlobovniczaShallowWidth(sRead.width), - blobovniczatree.WithOpenedCacheSize(sRead.openedCacheSize), - - blobovniczatree.WithLogger(c.log)), - Policy: func(_ *objectSDK.Object, data []byte) bool { - return uint64(len(data)) < shCfg.smallSizeObjectLimit - }, - }) - case fstree.Type: - ss = append(ss, blobstor.SubStorage{ - Storage: fstree.New( - fstree.WithPath(sRead.path), - fstree.WithPerm(sRead.perm), - fstree.WithDepth(sRead.depth), - fstree.WithNoSync(sRead.noSync)), - Policy: func(_ *objectSDK.Object, data []byte) bool { - return true - }, - }) - default: - // should never happen, that has already - // been handled: when the config was read - } - } - - var sh shardOptsWithID - sh.configID = shCfg.id() - sh.shOpts = []shard.Option{ - shard.WithLogger(c.log), - shard.WithRefillMetabase(shCfg.refillMetabase), - shard.WithMode(shCfg.mode), - shard.WithBlobStorOptions( - blobstor.WithCompressObjects(shCfg.compress), - blobstor.WithUncompressableContentTypes(shCfg.uncompressableContentType), - blobstor.WithStorages(ss), - - blobstor.WithLogger(c.log), - ), - shard.WithMetaBaseOptions( - meta.WithPath(shCfg.metaCfg.path), - meta.WithPermissions(shCfg.metaCfg.perm), - meta.WithMaxBatchSize(shCfg.metaCfg.maxBatchSize), - meta.WithMaxBatchDelay(shCfg.metaCfg.maxBatchDelay), - meta.WithBoltDBOptions(&bbolt.Options{ - Timeout: 100 * time.Millisecond, - }), - - meta.WithLogger(c.log), - meta.WithEpochState(c.cfgNetmap.state), - ), - shard.WithPiloramaOptions(piloramaOpts...), - shard.WithWriteCache(shCfg.writecacheCfg.enabled), - shard.WithWriteCacheOptions(writeCacheOpts...), - shard.WithRemoverBatchSize(shCfg.gcCfg.removerBatchSize), - shard.WithGCRemoverSleepInterval(shCfg.gcCfg.removerSleepInterval), - shard.WithExpiredCollectorBatchSize(shCfg.gcCfg.expiredCollectorBatchSize), - shard.WithExpiredCollectorWorkersCount(shCfg.gcCfg.expiredCollectorWorkersCount), - shard.WithGCWorkerPoolInitializer(func(sz int) util.WorkerPool { - pool, err := ants.NewPool(sz) - fatalOnErr(err) - - return pool - }), - } - - shards = append(shards, sh) + shards = append(shards, c.getShardOpts(shCfg)) } return shards } +func (c *cfg) getWriteCacheOpts(shCfg shardCfg) []writecache.Option { + var writeCacheOpts []writecache.Option + if wcRead := shCfg.writecacheCfg; wcRead.enabled { + writeCacheOpts = append(writeCacheOpts, + writecache.WithPath(wcRead.path), + writecache.WithMaxBatchSize(wcRead.maxBatchSize), + writecache.WithMaxBatchDelay(wcRead.maxBatchDelay), + writecache.WithMaxObjectSize(wcRead.maxObjSize), + writecache.WithSmallObjectSize(wcRead.smallObjectSize), + writecache.WithFlushWorkersCount(wcRead.flushWorkerCount), + writecache.WithMaxCacheSize(wcRead.sizeLimit), + writecache.WithNoSync(wcRead.noSync), + writecache.WithLogger(c.log), + ) + } + return writeCacheOpts +} + +func (c *cfg) getPiloramaOpts(shCfg shardCfg) []pilorama.Option { + var piloramaOpts []pilorama.Option + if prRead := shCfg.piloramaCfg; prRead.enabled { + piloramaOpts = append(piloramaOpts, + pilorama.WithPath(prRead.path), + pilorama.WithPerm(prRead.perm), + pilorama.WithNoSync(prRead.noSync), + pilorama.WithMaxBatchSize(prRead.maxBatchSize), + pilorama.WithMaxBatchDelay(prRead.maxBatchDelay), + ) + } + return piloramaOpts +} + +func (c *cfg) getSubstorageOpts(shCfg shardCfg) []blobstor.SubStorage { + var ss []blobstor.SubStorage + for _, sRead := range shCfg.subStorages { + switch sRead.typ { + case blobovniczatree.Type: + ss = append(ss, blobstor.SubStorage{ + Storage: blobovniczatree.NewBlobovniczaTree( + blobovniczatree.WithRootPath(sRead.path), + blobovniczatree.WithPermissions(sRead.perm), + blobovniczatree.WithBlobovniczaSize(sRead.size), + blobovniczatree.WithBlobovniczaShallowDepth(sRead.depth), + blobovniczatree.WithBlobovniczaShallowWidth(sRead.width), + blobovniczatree.WithOpenedCacheSize(sRead.openedCacheSize), + + blobovniczatree.WithLogger(c.log)), + Policy: func(_ *objectSDK.Object, data []byte) bool { + return uint64(len(data)) < shCfg.smallSizeObjectLimit + }, + }) + case fstree.Type: + ss = append(ss, blobstor.SubStorage{ + Storage: fstree.New( + fstree.WithPath(sRead.path), + fstree.WithPerm(sRead.perm), + fstree.WithDepth(sRead.depth), + fstree.WithNoSync(sRead.noSync)), + Policy: func(_ *objectSDK.Object, data []byte) bool { + return true + }, + }) + default: + // should never happen, that has already + // been handled: when the config was read + } + } + return ss +} + +func (c *cfg) getShardOpts(shCfg shardCfg) shardOptsWithID { + writeCacheOpts := c.getWriteCacheOpts(shCfg) + piloramaOpts := c.getPiloramaOpts(shCfg) + ss := c.getSubstorageOpts(shCfg) + + var sh shardOptsWithID + sh.configID = shCfg.id() + sh.shOpts = []shard.Option{ + shard.WithLogger(c.log), + shard.WithRefillMetabase(shCfg.refillMetabase), + shard.WithMode(shCfg.mode), + shard.WithBlobStorOptions( + blobstor.WithCompressObjects(shCfg.compress), + blobstor.WithUncompressableContentTypes(shCfg.uncompressableContentType), + blobstor.WithStorages(ss), + + blobstor.WithLogger(c.log), + ), + shard.WithMetaBaseOptions( + meta.WithPath(shCfg.metaCfg.path), + meta.WithPermissions(shCfg.metaCfg.perm), + meta.WithMaxBatchSize(shCfg.metaCfg.maxBatchSize), + meta.WithMaxBatchDelay(shCfg.metaCfg.maxBatchDelay), + meta.WithBoltDBOptions(&bbolt.Options{ + Timeout: 100 * time.Millisecond, + }), + + meta.WithLogger(c.log), + meta.WithEpochState(c.cfgNetmap.state), + ), + shard.WithPiloramaOptions(piloramaOpts...), + shard.WithWriteCache(shCfg.writecacheCfg.enabled), + shard.WithWriteCacheOptions(writeCacheOpts...), + shard.WithRemoverBatchSize(shCfg.gcCfg.removerBatchSize), + shard.WithGCRemoverSleepInterval(shCfg.gcCfg.removerSleepInterval), + shard.WithExpiredCollectorBatchSize(shCfg.gcCfg.expiredCollectorBatchSize), + shard.WithExpiredCollectorWorkersCount(shCfg.gcCfg.expiredCollectorWorkersCount), + shard.WithGCWorkerPoolInitializer(func(sz int) util.WorkerPool { + pool, err := ants.NewPool(sz) + fatalOnErr(err) + + return pool + }), + } + return sh +} + func (c *cfg) loggerPrm() (*logger.Prm, error) { // check if it has been inited before if c.dynamicConfiguration.logger == nil {