[#493] cmd/node: Use engineconfig pkg for storage engine construction

Use `engineconfig.IterateShards` in order to compose options of the shards.

Signed-off-by: Leonard Lyubich <leonard@nspcc.ru>
This commit is contained in:
Leonard Lyubich 2021-06-01 21:27:15 +03:00 committed by Leonard Lyubich
parent e149eae7b2
commit 6ca7f4511c

View file

@ -3,11 +3,9 @@ package main
import (
"context"
"crypto/ecdsa"
"errors"
"net"
"os"
"path"
"strconv"
"strings"
"sync"
"time"
@ -18,6 +16,8 @@ import (
netmapV2 "github.com/nspcc-dev/neofs-api-go/v2/netmap"
crypto "github.com/nspcc-dev/neofs-crypto"
"github.com/nspcc-dev/neofs-node/cmd/neofs-node/config"
engineconfig "github.com/nspcc-dev/neofs-node/cmd/neofs-node/config/engine"
shardconfig "github.com/nspcc-dev/neofs-node/cmd/neofs-node/config/engine/shard"
loggerconfig "github.com/nspcc-dev/neofs-node/cmd/neofs-node/config/logger"
metricsconfig "github.com/nspcc-dev/neofs-node/cmd/neofs-node/config/metrics"
"github.com/nspcc-dev/neofs-node/misc"
@ -95,40 +95,6 @@ const (
cfgObjectPutPoolSize = "object.put.pool_size"
)
const (
cfgLocalStorageSection = "storage"
cfgStorageShardSection = "shard"
cfgShardUseWriteCache = "use_write_cache"
cfgBlobStorSection = "blobstor"
cfgWriteCacheSection = "writecache"
cfgWriteCacheMemSize = "mem_size"
cfgWriteCacheDBSize = "db_size"
cfgWriteCacheSmallSize = "small_size"
cfgWriteCacheMaxSize = "max_size"
cfgWriteCacheWrkCount = "workers_count"
cfgBlobStorCompress = "compress"
cfgBlobStorShallowDepth = "shallow_depth"
cfgBlobStorTreePath = "path"
cfgBlobStorTreePerm = "perm"
cfgBlobStorSmallSzLimit = "small_size_limit"
cfgBlobStorBlzSection = "blobovnicza"
cfgBlzSize = "size"
cfgBlzShallowDepth = "shallow_depth"
cfgBlzShallowWidth = "shallow_width"
cfgBlzOpenedCacheSize = "opened_cache_size"
cfgMetaBaseSection = "metabase"
cfgMetaBasePath = "path"
cfgMetaBasePerm = "perm"
cfgGCSection = "gc"
cfgGCRemoverBatchSize = "remover_batch_size"
cfgGCRemoverSleepInt = "remover_sleep_interval"
)
const (
addressSize = 72 // 32 bytes oid, 32 bytes cid, 8 bytes protobuf encoding
)
@ -520,119 +486,45 @@ func initLocalStorage(c *cfg) {
func initShardOptions(c *cfg) {
var opts [][]shard.Option
for i := 0; ; i++ {
prefix := configPath(
cfgLocalStorageSection,
cfgStorageShardSection,
strconv.Itoa(i),
)
engineconfig.IterateShards(c.appCfg, func(sc *shardconfig.Config) {
var writeCacheOpts []writecache.Option
useCache := c.viper.GetBool(
configPath(prefix, cfgShardUseWriteCache),
)
useWriteCache := sc.UseWriteCache()
if useWriteCache {
writeCacheCfg := sc.WriteCache()
writeCachePrefix := configPath(prefix, cfgWriteCacheSection)
writeCachePath := c.viper.GetString(
configPath(writeCachePrefix, cfgBlobStorTreePath),
)
if useCache && writeCachePath == "" {
c.log.Warn("incorrect writeCache path, ignore shard")
break
}
writeCacheMemSize := c.viper.GetUint64(configPath(writeCachePrefix, cfgWriteCacheMemSize))
writeCacheDBSize := c.viper.GetUint64(configPath(writeCachePrefix, cfgWriteCacheDBSize))
writeCacheSmallSize := c.viper.GetUint64(configPath(writeCachePrefix, cfgWriteCacheSmallSize))
writeCacheMaxSize := c.viper.GetUint64(configPath(writeCachePrefix, cfgWriteCacheMaxSize))
writeCacheWrkCount := c.viper.GetInt(configPath(writeCachePrefix, cfgWriteCacheWrkCount))
blobPrefix := configPath(prefix, cfgBlobStorSection)
blobPath := c.viper.GetString(
configPath(blobPrefix, cfgBlobStorTreePath),
)
if blobPath == "" {
c.log.Warn("incorrect blobStor path, ignore shard")
break
writeCacheOpts = []writecache.Option{
writecache.WithPath(writeCacheCfg.Path()),
writecache.WithLogger(c.log),
writecache.WithMaxMemSize(writeCacheCfg.MemSize()),
writecache.WithMaxObjectSize(writeCacheCfg.MaxObjectSize()),
writecache.WithSmallObjectSize(writeCacheCfg.SmallObjectSize()),
writecache.WithMaxDBSize(writeCacheCfg.MaxDBSize()),
writecache.WithFlushWorkersCount(writeCacheCfg.WorkersNumber()),
}
}
compressObjects := c.viper.GetBool(
configPath(blobPrefix, cfgBlobStorCompress),
)
blobPerm := os.FileMode(c.viper.GetInt(
configPath(blobPrefix, cfgBlobStorTreePerm),
))
shallowDepth := c.viper.GetInt(
configPath(blobPrefix, cfgBlobStorShallowDepth),
)
smallSzLimit := c.viper.GetUint64(
configPath(blobPrefix, cfgBlobStorSmallSzLimit),
)
if smallSzLimit == 0 {
smallSzLimit = 1 << 20 // 1MB
}
if writeCacheMaxSize <= 0 {
writeCacheSmallSize = smallSzLimit
}
blzPrefix := configPath(blobPrefix, cfgBlobStorBlzSection)
blzSize := c.viper.GetUint64(
configPath(blzPrefix, cfgBlzSize),
)
if blzSize == 0 {
blzSize = 1 << 30 // 1 GB
}
blzShallowDepth := c.viper.GetUint64(
configPath(blzPrefix, cfgBlzShallowDepth),
)
blzShallowWidth := c.viper.GetUint64(
configPath(blzPrefix, cfgBlzShallowWidth),
)
blzCacheSize := c.viper.GetInt(
configPath(blzPrefix, cfgBlzOpenedCacheSize),
)
metaPrefix := configPath(prefix, cfgMetaBaseSection)
metaPath := c.viper.GetString(
configPath(metaPrefix, cfgMetaBasePath),
)
metaPerm := os.FileMode(c.viper.GetUint32(
configPath(metaPrefix, cfgMetaBasePerm),
))
blobStorCfg := sc.BlobStor()
blobovniczaCfg := blobStorCfg.Blobovnicza()
metabaseCfg := sc.Metabase()
gcCfg := sc.GC()
metaPath := metabaseCfg.Path()
metaPerm := metabaseCfg.Perm()
fatalOnErr(os.MkdirAll(path.Dir(metaPath), metaPerm))
gcPrefix := configPath(prefix, cfgGCSection)
rmBatchSize := c.viper.GetInt(
configPath(gcPrefix, cfgGCRemoverBatchSize),
)
rmSleepInterval := c.viper.GetDuration(
configPath(gcPrefix, cfgGCRemoverSleepInt),
)
opts = append(opts, []shard.Option{
shard.WithLogger(c.log),
shard.WithBlobStorOptions(
blobstor.WithRootPath(blobPath),
blobstor.WithCompressObjects(compressObjects, c.log),
blobstor.WithRootPerm(blobPerm),
blobstor.WithShallowDepth(shallowDepth),
blobstor.WithSmallSizeLimit(smallSzLimit),
blobstor.WithBlobovniczaSize(blzSize),
blobstor.WithBlobovniczaShallowDepth(blzShallowDepth),
blobstor.WithBlobovniczaShallowWidth(blzShallowWidth),
blobstor.WithBlobovniczaOpenedCacheSize(blzCacheSize),
blobstor.WithRootPath(blobStorCfg.Path()),
blobstor.WithCompressObjects(blobStorCfg.Compress(), c.log),
blobstor.WithRootPerm(blobStorCfg.Perm()),
blobstor.WithShallowDepth(blobStorCfg.ShallowDepth()),
blobstor.WithSmallSizeLimit(blobStorCfg.SmallSizeLimit()),
blobstor.WithBlobovniczaSize(blobovniczaCfg.Size()),
blobstor.WithBlobovniczaShallowDepth(blobovniczaCfg.ShallowDepth()),
blobstor.WithBlobovniczaShallowWidth(blobovniczaCfg.ShallowWidth()),
blobstor.WithBlobovniczaOpenedCacheSize(blobovniczaCfg.OpenedCacheSize()),
blobstor.WithLogger(c.log),
),
shard.WithMetaBaseOptions(
@ -643,18 +535,10 @@ func initShardOptions(c *cfg) {
Timeout: 100 * time.Millisecond,
}),
),
shard.WithWriteCache(useCache),
shard.WithWriteCacheOptions(
writecache.WithPath(writeCachePath),
writecache.WithLogger(c.log),
writecache.WithMaxMemSize(writeCacheMemSize),
writecache.WithMaxObjectSize(writeCacheMaxSize),
writecache.WithSmallObjectSize(writeCacheSmallSize),
writecache.WithMaxDBSize(writeCacheDBSize),
writecache.WithFlushWorkersCount(writeCacheWrkCount),
),
shard.WithRemoverBatchSize(rmBatchSize),
shard.WithGCRemoverSleepInterval(rmSleepInterval),
shard.WithWriteCache(useWriteCache),
shard.WithWriteCacheOptions(writeCacheOpts...),
shard.WithRemoverBatchSize(gcCfg.RemoverBatchSize()),
shard.WithGCRemoverSleepInterval(gcCfg.RemoverSleepInterval()),
shard.WithGCWorkerPoolInitializer(func(sz int) util2.WorkerPool {
pool, err := ants.NewPool(sz)
fatalOnErr(err)
@ -671,33 +555,11 @@ func initShardOptions(c *cfg) {
return ch
}),
})
c.log.Info("storage shard options",
zap.Bool("with write cache", useCache),
zap.String("with write cache path", writeCachePath),
zap.String("BLOB path", blobPath),
zap.Stringer("BLOB permissions", blobPerm),
zap.Bool("BLOB compress", compressObjects),
zap.Int("BLOB shallow depth", shallowDepth),
zap.Uint64("BLOB small size limit", smallSzLimit),
zap.String("metabase path", metaPath),
zap.Stringer("metabase permissions", metaPerm),
zap.Int("GC remover batch size", rmBatchSize),
zap.Duration("GC remover sleep interval", rmSleepInterval),
)
}
if len(opts) == 0 {
fatalOnErr(errors.New("no correctly set up shards, exit"))
}
})
c.cfgObject.cfgLocalStorage.shardOpts = opts
}
func configPath(sections ...string) string {
return strings.Join(sections, ".")
}
func initObjectPool(cfg *viper.Viper) (pool cfgObjectRoutines) {
var err error