From 6ca7f4511c1d9250fee6c384089a03f4e60c38be Mon Sep 17 00:00:00 2001 From: Leonard Lyubich Date: Tue, 1 Jun 2021 21:27:15 +0300 Subject: [PATCH] [#493] cmd/node: Use engineconfig pkg for storage engine construction Use `engineconfig.IterateShards` in order to compose options of the shards. Signed-off-by: Leonard Lyubich --- cmd/neofs-node/config.go | 210 +++++++-------------------------------- 1 file changed, 36 insertions(+), 174 deletions(-) diff --git a/cmd/neofs-node/config.go b/cmd/neofs-node/config.go index 74cdef6a4..819780f50 100644 --- a/cmd/neofs-node/config.go +++ b/cmd/neofs-node/config.go @@ -3,11 +3,9 @@ package main import ( "context" "crypto/ecdsa" - "errors" "net" "os" "path" - "strconv" "strings" "sync" "time" @@ -18,6 +16,8 @@ import ( netmapV2 "github.com/nspcc-dev/neofs-api-go/v2/netmap" crypto "github.com/nspcc-dev/neofs-crypto" "github.com/nspcc-dev/neofs-node/cmd/neofs-node/config" + engineconfig "github.com/nspcc-dev/neofs-node/cmd/neofs-node/config/engine" + shardconfig "github.com/nspcc-dev/neofs-node/cmd/neofs-node/config/engine/shard" loggerconfig "github.com/nspcc-dev/neofs-node/cmd/neofs-node/config/logger" metricsconfig "github.com/nspcc-dev/neofs-node/cmd/neofs-node/config/metrics" "github.com/nspcc-dev/neofs-node/misc" @@ -95,40 +95,6 @@ const ( cfgObjectPutPoolSize = "object.put.pool_size" ) -const ( - cfgLocalStorageSection = "storage" - cfgStorageShardSection = "shard" - - cfgShardUseWriteCache = "use_write_cache" - - cfgBlobStorSection = "blobstor" - cfgWriteCacheSection = "writecache" - cfgWriteCacheMemSize = "mem_size" - cfgWriteCacheDBSize = "db_size" - cfgWriteCacheSmallSize = "small_size" - cfgWriteCacheMaxSize = "max_size" - cfgWriteCacheWrkCount = "workers_count" - cfgBlobStorCompress = "compress" - cfgBlobStorShallowDepth = "shallow_depth" - cfgBlobStorTreePath = "path" - cfgBlobStorTreePerm = "perm" - cfgBlobStorSmallSzLimit = "small_size_limit" - - cfgBlobStorBlzSection = "blobovnicza" - cfgBlzSize = "size" - cfgBlzShallowDepth = "shallow_depth" - cfgBlzShallowWidth = "shallow_width" - cfgBlzOpenedCacheSize = "opened_cache_size" - - cfgMetaBaseSection = "metabase" - cfgMetaBasePath = "path" - cfgMetaBasePerm = "perm" - - cfgGCSection = "gc" - cfgGCRemoverBatchSize = "remover_batch_size" - cfgGCRemoverSleepInt = "remover_sleep_interval" -) - const ( addressSize = 72 // 32 bytes oid, 32 bytes cid, 8 bytes protobuf encoding ) @@ -520,119 +486,45 @@ func initLocalStorage(c *cfg) { func initShardOptions(c *cfg) { var opts [][]shard.Option - for i := 0; ; i++ { - prefix := configPath( - cfgLocalStorageSection, - cfgStorageShardSection, - strconv.Itoa(i), - ) + engineconfig.IterateShards(c.appCfg, func(sc *shardconfig.Config) { + var writeCacheOpts []writecache.Option - useCache := c.viper.GetBool( - configPath(prefix, cfgShardUseWriteCache), - ) + useWriteCache := sc.UseWriteCache() + if useWriteCache { + writeCacheCfg := sc.WriteCache() - writeCachePrefix := configPath(prefix, cfgWriteCacheSection) - - writeCachePath := c.viper.GetString( - configPath(writeCachePrefix, cfgBlobStorTreePath), - ) - if useCache && writeCachePath == "" { - c.log.Warn("incorrect writeCache path, ignore shard") - break - } - writeCacheMemSize := c.viper.GetUint64(configPath(writeCachePrefix, cfgWriteCacheMemSize)) - writeCacheDBSize := c.viper.GetUint64(configPath(writeCachePrefix, cfgWriteCacheDBSize)) - writeCacheSmallSize := c.viper.GetUint64(configPath(writeCachePrefix, cfgWriteCacheSmallSize)) - writeCacheMaxSize := c.viper.GetUint64(configPath(writeCachePrefix, cfgWriteCacheMaxSize)) - writeCacheWrkCount := c.viper.GetInt(configPath(writeCachePrefix, cfgWriteCacheWrkCount)) - - blobPrefix := configPath(prefix, cfgBlobStorSection) - - blobPath := c.viper.GetString( - configPath(blobPrefix, cfgBlobStorTreePath), - ) - if blobPath == "" { - c.log.Warn("incorrect blobStor path, ignore shard") - break + writeCacheOpts = []writecache.Option{ + writecache.WithPath(writeCacheCfg.Path()), + writecache.WithLogger(c.log), + writecache.WithMaxMemSize(writeCacheCfg.MemSize()), + writecache.WithMaxObjectSize(writeCacheCfg.MaxObjectSize()), + writecache.WithSmallObjectSize(writeCacheCfg.SmallObjectSize()), + writecache.WithMaxDBSize(writeCacheCfg.MaxDBSize()), + writecache.WithFlushWorkersCount(writeCacheCfg.WorkersNumber()), + } } - compressObjects := c.viper.GetBool( - configPath(blobPrefix, cfgBlobStorCompress), - ) - - blobPerm := os.FileMode(c.viper.GetInt( - configPath(blobPrefix, cfgBlobStorTreePerm), - )) - - shallowDepth := c.viper.GetInt( - configPath(blobPrefix, cfgBlobStorShallowDepth), - ) - - smallSzLimit := c.viper.GetUint64( - configPath(blobPrefix, cfgBlobStorSmallSzLimit), - ) - if smallSzLimit == 0 { - smallSzLimit = 1 << 20 // 1MB - } - if writeCacheMaxSize <= 0 { - writeCacheSmallSize = smallSzLimit - } - - blzPrefix := configPath(blobPrefix, cfgBlobStorBlzSection) - - blzSize := c.viper.GetUint64( - configPath(blzPrefix, cfgBlzSize), - ) - if blzSize == 0 { - blzSize = 1 << 30 // 1 GB - } - - blzShallowDepth := c.viper.GetUint64( - configPath(blzPrefix, cfgBlzShallowDepth), - ) - - blzShallowWidth := c.viper.GetUint64( - configPath(blzPrefix, cfgBlzShallowWidth), - ) - - blzCacheSize := c.viper.GetInt( - configPath(blzPrefix, cfgBlzOpenedCacheSize), - ) - - metaPrefix := configPath(prefix, cfgMetaBaseSection) - - metaPath := c.viper.GetString( - configPath(metaPrefix, cfgMetaBasePath), - ) - - metaPerm := os.FileMode(c.viper.GetUint32( - configPath(metaPrefix, cfgMetaBasePerm), - )) + blobStorCfg := sc.BlobStor() + blobovniczaCfg := blobStorCfg.Blobovnicza() + metabaseCfg := sc.Metabase() + gcCfg := sc.GC() + metaPath := metabaseCfg.Path() + metaPerm := metabaseCfg.Perm() fatalOnErr(os.MkdirAll(path.Dir(metaPath), metaPerm)) - gcPrefix := configPath(prefix, cfgGCSection) - - rmBatchSize := c.viper.GetInt( - configPath(gcPrefix, cfgGCRemoverBatchSize), - ) - - rmSleepInterval := c.viper.GetDuration( - configPath(gcPrefix, cfgGCRemoverSleepInt), - ) - opts = append(opts, []shard.Option{ shard.WithLogger(c.log), shard.WithBlobStorOptions( - blobstor.WithRootPath(blobPath), - blobstor.WithCompressObjects(compressObjects, c.log), - blobstor.WithRootPerm(blobPerm), - blobstor.WithShallowDepth(shallowDepth), - blobstor.WithSmallSizeLimit(smallSzLimit), - blobstor.WithBlobovniczaSize(blzSize), - blobstor.WithBlobovniczaShallowDepth(blzShallowDepth), - blobstor.WithBlobovniczaShallowWidth(blzShallowWidth), - blobstor.WithBlobovniczaOpenedCacheSize(blzCacheSize), + blobstor.WithRootPath(blobStorCfg.Path()), + blobstor.WithCompressObjects(blobStorCfg.Compress(), c.log), + blobstor.WithRootPerm(blobStorCfg.Perm()), + blobstor.WithShallowDepth(blobStorCfg.ShallowDepth()), + blobstor.WithSmallSizeLimit(blobStorCfg.SmallSizeLimit()), + blobstor.WithBlobovniczaSize(blobovniczaCfg.Size()), + blobstor.WithBlobovniczaShallowDepth(blobovniczaCfg.ShallowDepth()), + blobstor.WithBlobovniczaShallowWidth(blobovniczaCfg.ShallowWidth()), + blobstor.WithBlobovniczaOpenedCacheSize(blobovniczaCfg.OpenedCacheSize()), blobstor.WithLogger(c.log), ), shard.WithMetaBaseOptions( @@ -643,18 +535,10 @@ func initShardOptions(c *cfg) { Timeout: 100 * time.Millisecond, }), ), - shard.WithWriteCache(useCache), - shard.WithWriteCacheOptions( - writecache.WithPath(writeCachePath), - writecache.WithLogger(c.log), - writecache.WithMaxMemSize(writeCacheMemSize), - writecache.WithMaxObjectSize(writeCacheMaxSize), - writecache.WithSmallObjectSize(writeCacheSmallSize), - writecache.WithMaxDBSize(writeCacheDBSize), - writecache.WithFlushWorkersCount(writeCacheWrkCount), - ), - shard.WithRemoverBatchSize(rmBatchSize), - shard.WithGCRemoverSleepInterval(rmSleepInterval), + shard.WithWriteCache(useWriteCache), + shard.WithWriteCacheOptions(writeCacheOpts...), + shard.WithRemoverBatchSize(gcCfg.RemoverBatchSize()), + shard.WithGCRemoverSleepInterval(gcCfg.RemoverSleepInterval()), shard.WithGCWorkerPoolInitializer(func(sz int) util2.WorkerPool { pool, err := ants.NewPool(sz) fatalOnErr(err) @@ -671,33 +555,11 @@ func initShardOptions(c *cfg) { return ch }), }) - - c.log.Info("storage shard options", - zap.Bool("with write cache", useCache), - zap.String("with write cache path", writeCachePath), - zap.String("BLOB path", blobPath), - zap.Stringer("BLOB permissions", blobPerm), - zap.Bool("BLOB compress", compressObjects), - zap.Int("BLOB shallow depth", shallowDepth), - zap.Uint64("BLOB small size limit", smallSzLimit), - zap.String("metabase path", metaPath), - zap.Stringer("metabase permissions", metaPerm), - zap.Int("GC remover batch size", rmBatchSize), - zap.Duration("GC remover sleep interval", rmSleepInterval), - ) - } - - if len(opts) == 0 { - fatalOnErr(errors.New("no correctly set up shards, exit")) - } + }) c.cfgObject.cfgLocalStorage.shardOpts = opts } -func configPath(sections ...string) string { - return strings.Join(sections, ".") -} - func initObjectPool(cfg *viper.Viper) (pool cfgObjectRoutines) { var err error