[#1770] node: Read storage config in a separate struct

It will allow rereading config values and will simplify distinguishing them
from the custom values in the `cfg` structure.

Signed-off-by: Pavel Karpy <carpawell@nspcc.ru>
This commit is contained in:
Pavel Karpy 2022-09-26 21:31:45 +03:00 committed by fyrchik
parent 5c69e19016
commit 91b56ad3e8

View file

@ -4,6 +4,7 @@ import (
"context"
"errors"
"fmt"
"io/fs"
"net"
"sync"
atomicstd "sync/atomic"
@ -32,6 +33,7 @@ import (
meta "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/metabase"
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/pilorama"
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/shard"
shardmode "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/shard/mode"
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/writecache"
"github.com/nspcc-dev/neofs-node/pkg/metrics"
"github.com/nspcc-dev/neofs-node/pkg/morph/client"
@ -71,7 +73,175 @@ const maxMsgSize = 4 << 20 // transport msg limit 4 MiB
// for each contract listener.
const notificationHandlerPoolSize = 10
// applicationConfiguration reads and stores component-specific configuration
// values. It should not store any application helpers structs (pointers to shared
// structs).
// It must not be used concurrently.
type applicationConfiguration struct {
EngineCfg struct {
errorThreshold uint32
shardPoolSize uint32
shards []shardCfg
}
}
type shardCfg struct {
compress bool
smallSizeObjectLimit uint64
uncompressableContentType []string
refillMetabase bool
mode shardmode.Mode
metaCfg struct {
path string
perm fs.FileMode
maxBatchSize int
maxBatchDelay time.Duration
}
subStorages []subStorageCfg
gcCfg struct {
removerBatchSize int
removerSleepInterval time.Duration
}
writecacheCfg struct {
enabled bool
path string
maxBatchSize int
maxBatchDelay time.Duration
smallObjectSize uint64
maxObjSize uint64
flushWorkerCount int
maxCacheSize uint64
sizeLimit uint64
}
piloramaCfg struct {
enabled bool
path string
perm fs.FileMode
noSync bool
maxBatchSize int
maxBatchDelay time.Duration
}
}
type subStorageCfg struct {
// common for all storages
typ string
path string
perm fs.FileMode
depth uint64
// blobovnicza-specific
size uint64
width uint64
openedCacheSize int
}
// readConfig fills applicationConfiguration with raw configuration values
// not modifying them.
func (a *applicationConfiguration) readConfig(c *config.Config) error {
a.EngineCfg.errorThreshold = engineconfig.ShardErrorThreshold(c)
a.EngineCfg.shardPoolSize = engineconfig.ShardPoolSize(c)
return engineconfig.IterateShards(c, false, func(sc *shardconfig.Config) error {
var sh shardCfg
sh.refillMetabase = sc.RefillMetabase()
sh.mode = sc.Mode()
sh.compress = sc.Compress()
sh.uncompressableContentType = sc.UncompressableContentTypes()
sh.smallSizeObjectLimit = sc.SmallSizeLimit()
// write-cache
writeCacheCfg := sc.WriteCache()
if writeCacheCfg.Enabled() {
wc := &sh.writecacheCfg
wc.enabled = true
wc.path = writeCacheCfg.Path()
wc.maxBatchSize = writeCacheCfg.BoltDB().MaxBatchSize()
wc.maxBatchDelay = writeCacheCfg.BoltDB().MaxBatchDelay()
wc.maxCacheSize = writeCacheCfg.MaxObjectSize()
wc.smallObjectSize = writeCacheCfg.SmallObjectSize()
wc.flushWorkerCount = writeCacheCfg.WorkersNumber()
wc.sizeLimit = writeCacheCfg.SizeLimit()
}
// blobstor with substorages
blobStorCfg := sc.BlobStor()
storagesCfg := blobStorCfg.Storages()
metabaseCfg := sc.Metabase()
gcCfg := sc.GC()
if config.BoolSafe(c.Sub("tree"), "enabled") {
piloramaCfg := sc.Pilorama()
pr := &sh.piloramaCfg
pr.enabled = true
pr.path = piloramaCfg.Path()
pr.perm = piloramaCfg.Perm()
pr.noSync = piloramaCfg.NoSync()
pr.maxBatchSize = piloramaCfg.MaxBatchSize()
pr.maxBatchDelay = piloramaCfg.MaxBatchDelay()
}
ss := make([]subStorageCfg, 0, len(storagesCfg))
for i := range storagesCfg {
var sCfg subStorageCfg
sCfg.typ = storagesCfg[i].Type()
sCfg.path = storagesCfg[i].Path()
sCfg.perm = storagesCfg[i].Perm()
switch storagesCfg[i].Type() {
case blobovniczatree.Type:
sub := blobovniczaconfig.From((*config.Config)(storagesCfg[i]))
sCfg.size = sub.Size()
sCfg.depth = sub.ShallowDepth()
sCfg.width = sub.ShallowWidth()
sCfg.openedCacheSize = sub.OpenedCacheSize()
case fstree.Type:
sub := fstreeconfig.From((*config.Config)(storagesCfg[i]))
sCfg.depth = sub.Depth()
default:
return fmt.Errorf("invalid storage type: %s", storagesCfg[i].Type())
}
ss = append(ss, sCfg)
}
sh.subStorages = ss
// meta
m := &sh.metaCfg
m.path = metabaseCfg.Path()
m.perm = metabaseCfg.BoltDB().Perm()
m.maxBatchDelay = metabaseCfg.BoltDB().MaxBatchDelay()
m.maxBatchSize = metabaseCfg.BoltDB().MaxBatchSize()
// GC
sh.gcCfg.removerBatchSize = gcCfg.RemoverBatchSize()
sh.gcCfg.removerSleepInterval = gcCfg.RemoverSleepInterval()
a.EngineCfg.shards = append(a.EngineCfg.shards, sh)
return nil
})
}
type cfg struct {
applicationConfiguration
ctx context.Context
appCfg *config.Config
@ -231,8 +401,6 @@ type cfgNotifications struct {
type cfgLocalStorage struct {
localStorage *engine.StorageEngine
shardOpts [][]shard.Option
}
type cfgObjectRoutines struct {
@ -346,6 +514,12 @@ func initCfg(appCfg *config.Config) *cfg {
persistate: persistate,
}
// returned err must be nil during first time read
err = c.readConfig(appCfg)
if err != nil {
panic(fmt.Errorf("config reading: %w", err))
}
user.IDFromKey(&c.ownerIDFromKey, key.PrivateKey.PublicKey)
if metricsconfig.Enabled(c.appCfg) {
@ -359,23 +533,133 @@ func initCfg(appCfg *config.Config) *cfg {
return c
}
func (c *cfg) engineOpts() []engine.Option {
opts := make([]engine.Option, 0, 4)
opts = append(opts,
engine.WithShardPoolSize(c.EngineCfg.shardPoolSize),
engine.WithErrorThreshold(c.EngineCfg.errorThreshold),
engine.WithLogger(c.log),
)
if c.metricsCollector != nil {
opts = append(opts, engine.WithMetrics(c.metricsCollector))
}
return opts
}
func (c *cfg) shardOpts() [][]shard.Option {
oo := make([][]shard.Option, 0, len(c.EngineCfg.shards))
for _, shCfg := range c.EngineCfg.shards {
var writeCacheOpts []writecache.Option
if wcRead := shCfg.writecacheCfg; wcRead.enabled {
writeCacheOpts = append(writeCacheOpts,
writecache.WithPath(wcRead.path),
writecache.WithMaxBatchSize(wcRead.maxBatchSize),
writecache.WithMaxBatchDelay(wcRead.maxBatchDelay),
writecache.WithMaxObjectSize(wcRead.maxObjSize),
writecache.WithSmallObjectSize(wcRead.smallObjectSize),
writecache.WithFlushWorkersCount(wcRead.flushWorkerCount),
writecache.WithMaxCacheSize(wcRead.sizeLimit),
writecache.WithLogger(c.log),
)
}
var piloramaOpts []pilorama.Option
if prRead := shCfg.piloramaCfg; prRead.enabled {
piloramaOpts = append(piloramaOpts,
pilorama.WithPath(prRead.path),
pilorama.WithPerm(prRead.perm),
pilorama.WithNoSync(prRead.noSync),
pilorama.WithMaxBatchSize(prRead.maxBatchSize),
pilorama.WithMaxBatchDelay(prRead.maxBatchDelay),
)
}
var ss []blobstor.SubStorage
for _, sRead := range shCfg.subStorages {
switch sRead.typ {
case blobovniczatree.Type:
ss = append(ss, blobstor.SubStorage{
Storage: blobovniczatree.NewBlobovniczaTree(
blobovniczatree.WithRootPath(sRead.path),
blobovniczatree.WithPermissions(sRead.perm),
blobovniczatree.WithBlobovniczaSize(sRead.size),
blobovniczatree.WithBlobovniczaShallowDepth(sRead.depth),
blobovniczatree.WithBlobovniczaShallowWidth(sRead.width),
blobovniczatree.WithOpenedCacheSize(sRead.openedCacheSize),
blobovniczatree.WithLogger(c.log)),
Policy: func(_ *objectSDK.Object, data []byte) bool {
return uint64(len(data)) < shCfg.smallSizeObjectLimit
},
})
case fstree.Type:
ss = append(ss, blobstor.SubStorage{
Storage: fstree.New(
fstree.WithPath(sRead.path),
fstree.WithPerm(sRead.perm),
fstree.WithDepth(sRead.depth)),
Policy: func(_ *objectSDK.Object, data []byte) bool {
return true
},
})
default:
// should never happen, that has already
// been handled: when the config was read
}
}
oo = append(oo, []shard.Option{
shard.WithLogger(c.log),
shard.WithRefillMetabase(shCfg.refillMetabase),
shard.WithMode(shCfg.mode),
shard.WithBlobStorOptions(
blobstor.WithCompressObjects(shCfg.compress),
blobstor.WithUncompressableContentTypes(shCfg.uncompressableContentType),
blobstor.WithStorages(ss),
blobstor.WithLogger(c.log),
),
shard.WithMetaBaseOptions(
meta.WithPath(shCfg.metaCfg.path),
meta.WithPermissions(shCfg.metaCfg.perm),
meta.WithMaxBatchSize(shCfg.metaCfg.maxBatchSize),
meta.WithMaxBatchDelay(shCfg.metaCfg.maxBatchDelay),
meta.WithBoltDBOptions(&bbolt.Options{
Timeout: 100 * time.Millisecond,
}),
meta.WithLogger(c.log),
meta.WithEpochState(c.cfgNetmap.state),
),
shard.WithPiloramaOptions(piloramaOpts...),
shard.WithWriteCache(shCfg.writecacheCfg.enabled),
shard.WithWriteCacheOptions(writeCacheOpts...),
shard.WithRemoverBatchSize(shCfg.gcCfg.removerBatchSize),
shard.WithGCRemoverSleepInterval(shCfg.gcCfg.removerSleepInterval),
shard.WithGCWorkerPoolInitializer(func(sz int) util.WorkerPool {
pool, err := ants.NewPool(sz)
fatalOnErr(err)
return pool
}),
})
}
return oo
}
func (c *cfg) LocalAddress() network.AddressGroup {
return c.localAddr
}
func initLocalStorage(c *cfg) {
initShardOptions(c)
engineOpts := []engine.Option{
engine.WithLogger(c.log),
engine.WithShardPoolSize(engineconfig.ShardPoolSize(c.appCfg)),
engine.WithErrorThreshold(engineconfig.ShardErrorThreshold(c.appCfg)),
}
if c.metricsCollector != nil {
engineOpts = append(engineOpts, engine.WithMetrics(c.metricsCollector))
}
ls := engine.New(engineOpts...)
ls := engine.New(c.engineOpts()...)
addNewEpochAsyncNotificationHandler(c, func(ev event.Event) {
ls.HandleNewEpoch(ev.(netmap2.NewEpoch).EpochNumber())
@ -394,7 +678,7 @@ func initLocalStorage(c *cfg) {
tombstone.WithTombstoneSource(tombstoneSrc),
)
for _, opts := range c.cfgObject.cfgLocalStorage.shardOpts {
for _, opts := range c.shardOpts() {
id, err := ls.AddShard(append(opts, shard.WithTombstoneSource(tombstoneSource))...)
fatalOnErr(err)
@ -419,125 +703,6 @@ func initLocalStorage(c *cfg) {
})
}
func initShardOptions(c *cfg) {
var opts [][]shard.Option
require := !nodeconfig.Relay(c.appCfg) // relay node does not require shards
err := engineconfig.IterateShards(c.appCfg, require, func(sc *shardconfig.Config) error {
var writeCacheOpts []writecache.Option
writeCacheCfg := sc.WriteCache()
if writeCacheCfg.Enabled() {
writeCacheOpts = []writecache.Option{
writecache.WithPath(writeCacheCfg.Path()),
writecache.WithLogger(c.log),
writecache.WithMaxBatchSize(writeCacheCfg.BoltDB().MaxBatchSize()),
writecache.WithMaxBatchDelay(writeCacheCfg.BoltDB().MaxBatchDelay()),
writecache.WithMaxObjectSize(writeCacheCfg.MaxObjectSize()),
writecache.WithSmallObjectSize(writeCacheCfg.SmallObjectSize()),
writecache.WithFlushWorkersCount(writeCacheCfg.WorkersNumber()),
writecache.WithMaxCacheSize(writeCacheCfg.SizeLimit()),
}
}
blobStorCfg := sc.BlobStor()
storages := blobStorCfg.Storages()
metabaseCfg := sc.Metabase()
gcCfg := sc.GC()
var piloramaOpts []pilorama.Option
piloramaCfg := sc.Pilorama()
if config.BoolSafe(c.appCfg.Sub("tree"), "enabled") {
piloramaOpts = []pilorama.Option{
pilorama.WithPath(piloramaCfg.Path()),
pilorama.WithPerm(piloramaCfg.Perm()),
pilorama.WithNoSync(piloramaCfg.NoSync()),
pilorama.WithMaxBatchSize(piloramaCfg.MaxBatchSize()),
pilorama.WithMaxBatchDelay(piloramaCfg.MaxBatchDelay())}
}
var st []blobstor.SubStorage
for i := range storages {
switch storages[i].Type() {
case blobovniczatree.Type:
sub := blobovniczaconfig.From((*config.Config)(storages[i]))
lim := sc.SmallSizeLimit()
st = append(st, blobstor.SubStorage{
Storage: blobovniczatree.NewBlobovniczaTree(
blobovniczatree.WithLogger(c.log),
blobovniczatree.WithRootPath(storages[i].Path()),
blobovniczatree.WithPermissions(storages[i].Perm()),
blobovniczatree.WithBlobovniczaSize(sub.Size()),
blobovniczatree.WithBlobovniczaShallowDepth(sub.ShallowDepth()),
blobovniczatree.WithBlobovniczaShallowWidth(sub.ShallowWidth()),
blobovniczatree.WithOpenedCacheSize(sub.OpenedCacheSize())),
Policy: func(_ *objectSDK.Object, data []byte) bool {
return uint64(len(data)) < lim
},
})
case fstree.Type:
sub := fstreeconfig.From((*config.Config)(storages[i]))
st = append(st, blobstor.SubStorage{
Storage: fstree.New(
fstree.WithPath(storages[i].Path()),
fstree.WithPerm(storages[i].Perm()),
fstree.WithDepth(sub.Depth())),
Policy: func(_ *objectSDK.Object, data []byte) bool {
return true
},
})
default:
return fmt.Errorf("invalid storage type: %s", storages[i].Type())
}
}
metaPath := metabaseCfg.Path()
metaPerm := metabaseCfg.BoltDB().Perm()
opts = append(opts, []shard.Option{
shard.WithLogger(c.log),
shard.WithRefillMetabase(sc.RefillMetabase()),
shard.WithMode(sc.Mode()),
shard.WithBlobStorOptions(
blobstor.WithCompressObjects(sc.Compress()),
blobstor.WithUncompressableContentTypes(sc.UncompressableContentTypes()),
blobstor.WithStorages(st),
blobstor.WithLogger(c.log),
),
shard.WithMetaBaseOptions(
meta.WithLogger(c.log),
meta.WithPath(metaPath),
meta.WithPermissions(metaPerm),
meta.WithMaxBatchSize(metabaseCfg.BoltDB().MaxBatchSize()),
meta.WithMaxBatchDelay(metabaseCfg.BoltDB().MaxBatchDelay()),
meta.WithBoltDBOptions(&bbolt.Options{
Timeout: 100 * time.Millisecond,
}),
meta.WithEpochState(c.cfgNetmap.state),
),
shard.WithPiloramaOptions(piloramaOpts...),
shard.WithWriteCache(writeCacheCfg.Enabled()),
shard.WithWriteCacheOptions(writeCacheOpts...),
shard.WithRemoverBatchSize(gcCfg.RemoverBatchSize()),
shard.WithGCRemoverSleepInterval(gcCfg.RemoverSleepInterval()),
shard.WithGCWorkerPoolInitializer(func(sz int) util.WorkerPool {
pool, err := ants.NewPool(sz)
fatalOnErr(err)
return pool
}),
})
return nil
})
if err != nil {
panic(err)
}
c.cfgObject.cfgLocalStorage.shardOpts = opts
}
func initObjectPool(cfg *config.Config) (pool cfgObjectRoutines) {
var err error