From 91b56ad3e8ae6c221ca3281f25493dc549f23ba0 Mon Sep 17 00:00:00 2001 From: Pavel Karpy Date: Mon, 26 Sep 2022 21:31:45 +0300 Subject: [PATCH] [#1770] node: Read storage config in a separate struct It will allow rereading config values and will simplify distinguishing them from the custom values in the `cfg` structure. Signed-off-by: Pavel Karpy --- cmd/neofs-node/config.go | 433 +++++++++++++++++++++++++++------------ 1 file changed, 299 insertions(+), 134 deletions(-) diff --git a/cmd/neofs-node/config.go b/cmd/neofs-node/config.go index 39f6c135e..d786cddc4 100644 --- a/cmd/neofs-node/config.go +++ b/cmd/neofs-node/config.go @@ -4,6 +4,7 @@ import ( "context" "errors" "fmt" + "io/fs" "net" "sync" atomicstd "sync/atomic" @@ -32,6 +33,7 @@ import ( meta "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/metabase" "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/pilorama" "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/shard" + shardmode "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/shard/mode" "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/writecache" "github.com/nspcc-dev/neofs-node/pkg/metrics" "github.com/nspcc-dev/neofs-node/pkg/morph/client" @@ -71,7 +73,175 @@ const maxMsgSize = 4 << 20 // transport msg limit 4 MiB // for each contract listener. const notificationHandlerPoolSize = 10 +// applicationConfiguration reads and stores component-specific configuration +// values. It should not store any application helpers structs (pointers to shared +// structs). +// It must not be used concurrently. +type applicationConfiguration struct { + EngineCfg struct { + errorThreshold uint32 + shardPoolSize uint32 + shards []shardCfg + } +} + +type shardCfg struct { + compress bool + smallSizeObjectLimit uint64 + uncompressableContentType []string + refillMetabase bool + mode shardmode.Mode + + metaCfg struct { + path string + perm fs.FileMode + maxBatchSize int + maxBatchDelay time.Duration + } + + subStorages []subStorageCfg + + gcCfg struct { + removerBatchSize int + removerSleepInterval time.Duration + } + + writecacheCfg struct { + enabled bool + path string + maxBatchSize int + maxBatchDelay time.Duration + smallObjectSize uint64 + maxObjSize uint64 + flushWorkerCount int + maxCacheSize uint64 + sizeLimit uint64 + } + + piloramaCfg struct { + enabled bool + path string + perm fs.FileMode + noSync bool + maxBatchSize int + maxBatchDelay time.Duration + } +} + +type subStorageCfg struct { + // common for all storages + typ string + path string + perm fs.FileMode + depth uint64 + + // blobovnicza-specific + size uint64 + width uint64 + openedCacheSize int +} + +// readConfig fills applicationConfiguration with raw configuration values +// not modifying them. +func (a *applicationConfiguration) readConfig(c *config.Config) error { + a.EngineCfg.errorThreshold = engineconfig.ShardErrorThreshold(c) + a.EngineCfg.shardPoolSize = engineconfig.ShardPoolSize(c) + + return engineconfig.IterateShards(c, false, func(sc *shardconfig.Config) error { + var sh shardCfg + + sh.refillMetabase = sc.RefillMetabase() + sh.mode = sc.Mode() + sh.compress = sc.Compress() + sh.uncompressableContentType = sc.UncompressableContentTypes() + sh.smallSizeObjectLimit = sc.SmallSizeLimit() + + // write-cache + + writeCacheCfg := sc.WriteCache() + if writeCacheCfg.Enabled() { + wc := &sh.writecacheCfg + + wc.enabled = true + wc.path = writeCacheCfg.Path() + wc.maxBatchSize = writeCacheCfg.BoltDB().MaxBatchSize() + wc.maxBatchDelay = writeCacheCfg.BoltDB().MaxBatchDelay() + wc.maxCacheSize = writeCacheCfg.MaxObjectSize() + wc.smallObjectSize = writeCacheCfg.SmallObjectSize() + wc.flushWorkerCount = writeCacheCfg.WorkersNumber() + wc.sizeLimit = writeCacheCfg.SizeLimit() + } + + // blobstor with substorages + + blobStorCfg := sc.BlobStor() + storagesCfg := blobStorCfg.Storages() + metabaseCfg := sc.Metabase() + gcCfg := sc.GC() + + if config.BoolSafe(c.Sub("tree"), "enabled") { + piloramaCfg := sc.Pilorama() + pr := &sh.piloramaCfg + + pr.enabled = true + pr.path = piloramaCfg.Path() + pr.perm = piloramaCfg.Perm() + pr.noSync = piloramaCfg.NoSync() + pr.maxBatchSize = piloramaCfg.MaxBatchSize() + pr.maxBatchDelay = piloramaCfg.MaxBatchDelay() + } + + ss := make([]subStorageCfg, 0, len(storagesCfg)) + for i := range storagesCfg { + var sCfg subStorageCfg + + sCfg.typ = storagesCfg[i].Type() + sCfg.path = storagesCfg[i].Path() + sCfg.perm = storagesCfg[i].Perm() + + switch storagesCfg[i].Type() { + case blobovniczatree.Type: + sub := blobovniczaconfig.From((*config.Config)(storagesCfg[i])) + + sCfg.size = sub.Size() + sCfg.depth = sub.ShallowDepth() + sCfg.width = sub.ShallowWidth() + sCfg.openedCacheSize = sub.OpenedCacheSize() + case fstree.Type: + sub := fstreeconfig.From((*config.Config)(storagesCfg[i])) + sCfg.depth = sub.Depth() + default: + return fmt.Errorf("invalid storage type: %s", storagesCfg[i].Type()) + } + + ss = append(ss, sCfg) + } + + sh.subStorages = ss + + // meta + + m := &sh.metaCfg + + m.path = metabaseCfg.Path() + m.perm = metabaseCfg.BoltDB().Perm() + m.maxBatchDelay = metabaseCfg.BoltDB().MaxBatchDelay() + m.maxBatchSize = metabaseCfg.BoltDB().MaxBatchSize() + + // GC + + sh.gcCfg.removerBatchSize = gcCfg.RemoverBatchSize() + sh.gcCfg.removerSleepInterval = gcCfg.RemoverSleepInterval() + + a.EngineCfg.shards = append(a.EngineCfg.shards, sh) + + return nil + }) +} + type cfg struct { + applicationConfiguration + ctx context.Context appCfg *config.Config @@ -231,8 +401,6 @@ type cfgNotifications struct { type cfgLocalStorage struct { localStorage *engine.StorageEngine - - shardOpts [][]shard.Option } type cfgObjectRoutines struct { @@ -346,6 +514,12 @@ func initCfg(appCfg *config.Config) *cfg { persistate: persistate, } + // returned err must be nil during first time read + err = c.readConfig(appCfg) + if err != nil { + panic(fmt.Errorf("config reading: %w", err)) + } + user.IDFromKey(&c.ownerIDFromKey, key.PrivateKey.PublicKey) if metricsconfig.Enabled(c.appCfg) { @@ -359,23 +533,133 @@ func initCfg(appCfg *config.Config) *cfg { return c } +func (c *cfg) engineOpts() []engine.Option { + opts := make([]engine.Option, 0, 4) + + opts = append(opts, + engine.WithShardPoolSize(c.EngineCfg.shardPoolSize), + engine.WithErrorThreshold(c.EngineCfg.errorThreshold), + + engine.WithLogger(c.log), + ) + + if c.metricsCollector != nil { + opts = append(opts, engine.WithMetrics(c.metricsCollector)) + } + + return opts +} + +func (c *cfg) shardOpts() [][]shard.Option { + oo := make([][]shard.Option, 0, len(c.EngineCfg.shards)) + + for _, shCfg := range c.EngineCfg.shards { + var writeCacheOpts []writecache.Option + if wcRead := shCfg.writecacheCfg; wcRead.enabled { + writeCacheOpts = append(writeCacheOpts, + writecache.WithPath(wcRead.path), + writecache.WithMaxBatchSize(wcRead.maxBatchSize), + writecache.WithMaxBatchDelay(wcRead.maxBatchDelay), + writecache.WithMaxObjectSize(wcRead.maxObjSize), + writecache.WithSmallObjectSize(wcRead.smallObjectSize), + writecache.WithFlushWorkersCount(wcRead.flushWorkerCount), + writecache.WithMaxCacheSize(wcRead.sizeLimit), + + writecache.WithLogger(c.log), + ) + } + + var piloramaOpts []pilorama.Option + if prRead := shCfg.piloramaCfg; prRead.enabled { + piloramaOpts = append(piloramaOpts, + pilorama.WithPath(prRead.path), + pilorama.WithPerm(prRead.perm), + pilorama.WithNoSync(prRead.noSync), + pilorama.WithMaxBatchSize(prRead.maxBatchSize), + pilorama.WithMaxBatchDelay(prRead.maxBatchDelay), + ) + } + + var ss []blobstor.SubStorage + for _, sRead := range shCfg.subStorages { + switch sRead.typ { + case blobovniczatree.Type: + ss = append(ss, blobstor.SubStorage{ + Storage: blobovniczatree.NewBlobovniczaTree( + blobovniczatree.WithRootPath(sRead.path), + blobovniczatree.WithPermissions(sRead.perm), + blobovniczatree.WithBlobovniczaSize(sRead.size), + blobovniczatree.WithBlobovniczaShallowDepth(sRead.depth), + blobovniczatree.WithBlobovniczaShallowWidth(sRead.width), + blobovniczatree.WithOpenedCacheSize(sRead.openedCacheSize), + + blobovniczatree.WithLogger(c.log)), + Policy: func(_ *objectSDK.Object, data []byte) bool { + return uint64(len(data)) < shCfg.smallSizeObjectLimit + }, + }) + case fstree.Type: + ss = append(ss, blobstor.SubStorage{ + Storage: fstree.New( + fstree.WithPath(sRead.path), + fstree.WithPerm(sRead.perm), + fstree.WithDepth(sRead.depth)), + Policy: func(_ *objectSDK.Object, data []byte) bool { + return true + }, + }) + default: + // should never happen, that has already + // been handled: when the config was read + } + } + + oo = append(oo, []shard.Option{ + shard.WithLogger(c.log), + shard.WithRefillMetabase(shCfg.refillMetabase), + shard.WithMode(shCfg.mode), + shard.WithBlobStorOptions( + blobstor.WithCompressObjects(shCfg.compress), + blobstor.WithUncompressableContentTypes(shCfg.uncompressableContentType), + blobstor.WithStorages(ss), + + blobstor.WithLogger(c.log), + ), + shard.WithMetaBaseOptions( + meta.WithPath(shCfg.metaCfg.path), + meta.WithPermissions(shCfg.metaCfg.perm), + meta.WithMaxBatchSize(shCfg.metaCfg.maxBatchSize), + meta.WithMaxBatchDelay(shCfg.metaCfg.maxBatchDelay), + meta.WithBoltDBOptions(&bbolt.Options{ + Timeout: 100 * time.Millisecond, + }), + + meta.WithLogger(c.log), + meta.WithEpochState(c.cfgNetmap.state), + ), + shard.WithPiloramaOptions(piloramaOpts...), + shard.WithWriteCache(shCfg.writecacheCfg.enabled), + shard.WithWriteCacheOptions(writeCacheOpts...), + shard.WithRemoverBatchSize(shCfg.gcCfg.removerBatchSize), + shard.WithGCRemoverSleepInterval(shCfg.gcCfg.removerSleepInterval), + shard.WithGCWorkerPoolInitializer(func(sz int) util.WorkerPool { + pool, err := ants.NewPool(sz) + fatalOnErr(err) + + return pool + }), + }) + } + + return oo +} + func (c *cfg) LocalAddress() network.AddressGroup { return c.localAddr } func initLocalStorage(c *cfg) { - initShardOptions(c) - - engineOpts := []engine.Option{ - engine.WithLogger(c.log), - engine.WithShardPoolSize(engineconfig.ShardPoolSize(c.appCfg)), - engine.WithErrorThreshold(engineconfig.ShardErrorThreshold(c.appCfg)), - } - if c.metricsCollector != nil { - engineOpts = append(engineOpts, engine.WithMetrics(c.metricsCollector)) - } - - ls := engine.New(engineOpts...) + ls := engine.New(c.engineOpts()...) addNewEpochAsyncNotificationHandler(c, func(ev event.Event) { ls.HandleNewEpoch(ev.(netmap2.NewEpoch).EpochNumber()) @@ -394,7 +678,7 @@ func initLocalStorage(c *cfg) { tombstone.WithTombstoneSource(tombstoneSrc), ) - for _, opts := range c.cfgObject.cfgLocalStorage.shardOpts { + for _, opts := range c.shardOpts() { id, err := ls.AddShard(append(opts, shard.WithTombstoneSource(tombstoneSource))...) fatalOnErr(err) @@ -419,125 +703,6 @@ func initLocalStorage(c *cfg) { }) } -func initShardOptions(c *cfg) { - var opts [][]shard.Option - - require := !nodeconfig.Relay(c.appCfg) // relay node does not require shards - - err := engineconfig.IterateShards(c.appCfg, require, func(sc *shardconfig.Config) error { - var writeCacheOpts []writecache.Option - - writeCacheCfg := sc.WriteCache() - if writeCacheCfg.Enabled() { - writeCacheOpts = []writecache.Option{ - writecache.WithPath(writeCacheCfg.Path()), - writecache.WithLogger(c.log), - writecache.WithMaxBatchSize(writeCacheCfg.BoltDB().MaxBatchSize()), - writecache.WithMaxBatchDelay(writeCacheCfg.BoltDB().MaxBatchDelay()), - writecache.WithMaxObjectSize(writeCacheCfg.MaxObjectSize()), - writecache.WithSmallObjectSize(writeCacheCfg.SmallObjectSize()), - writecache.WithFlushWorkersCount(writeCacheCfg.WorkersNumber()), - writecache.WithMaxCacheSize(writeCacheCfg.SizeLimit()), - } - } - - blobStorCfg := sc.BlobStor() - storages := blobStorCfg.Storages() - metabaseCfg := sc.Metabase() - gcCfg := sc.GC() - - var piloramaOpts []pilorama.Option - - piloramaCfg := sc.Pilorama() - if config.BoolSafe(c.appCfg.Sub("tree"), "enabled") { - piloramaOpts = []pilorama.Option{ - pilorama.WithPath(piloramaCfg.Path()), - pilorama.WithPerm(piloramaCfg.Perm()), - pilorama.WithNoSync(piloramaCfg.NoSync()), - pilorama.WithMaxBatchSize(piloramaCfg.MaxBatchSize()), - pilorama.WithMaxBatchDelay(piloramaCfg.MaxBatchDelay())} - } - - var st []blobstor.SubStorage - for i := range storages { - switch storages[i].Type() { - case blobovniczatree.Type: - sub := blobovniczaconfig.From((*config.Config)(storages[i])) - lim := sc.SmallSizeLimit() - st = append(st, blobstor.SubStorage{ - Storage: blobovniczatree.NewBlobovniczaTree( - blobovniczatree.WithLogger(c.log), - blobovniczatree.WithRootPath(storages[i].Path()), - blobovniczatree.WithPermissions(storages[i].Perm()), - blobovniczatree.WithBlobovniczaSize(sub.Size()), - blobovniczatree.WithBlobovniczaShallowDepth(sub.ShallowDepth()), - blobovniczatree.WithBlobovniczaShallowWidth(sub.ShallowWidth()), - blobovniczatree.WithOpenedCacheSize(sub.OpenedCacheSize())), - Policy: func(_ *objectSDK.Object, data []byte) bool { - return uint64(len(data)) < lim - }, - }) - case fstree.Type: - sub := fstreeconfig.From((*config.Config)(storages[i])) - st = append(st, blobstor.SubStorage{ - Storage: fstree.New( - fstree.WithPath(storages[i].Path()), - fstree.WithPerm(storages[i].Perm()), - fstree.WithDepth(sub.Depth())), - Policy: func(_ *objectSDK.Object, data []byte) bool { - return true - }, - }) - default: - return fmt.Errorf("invalid storage type: %s", storages[i].Type()) - } - } - - metaPath := metabaseCfg.Path() - metaPerm := metabaseCfg.BoltDB().Perm() - - opts = append(opts, []shard.Option{ - shard.WithLogger(c.log), - shard.WithRefillMetabase(sc.RefillMetabase()), - shard.WithMode(sc.Mode()), - shard.WithBlobStorOptions( - blobstor.WithCompressObjects(sc.Compress()), - blobstor.WithUncompressableContentTypes(sc.UncompressableContentTypes()), - blobstor.WithStorages(st), - blobstor.WithLogger(c.log), - ), - shard.WithMetaBaseOptions( - meta.WithLogger(c.log), - meta.WithPath(metaPath), - meta.WithPermissions(metaPerm), - meta.WithMaxBatchSize(metabaseCfg.BoltDB().MaxBatchSize()), - meta.WithMaxBatchDelay(metabaseCfg.BoltDB().MaxBatchDelay()), - meta.WithBoltDBOptions(&bbolt.Options{ - Timeout: 100 * time.Millisecond, - }), - meta.WithEpochState(c.cfgNetmap.state), - ), - shard.WithPiloramaOptions(piloramaOpts...), - shard.WithWriteCache(writeCacheCfg.Enabled()), - shard.WithWriteCacheOptions(writeCacheOpts...), - shard.WithRemoverBatchSize(gcCfg.RemoverBatchSize()), - shard.WithGCRemoverSleepInterval(gcCfg.RemoverSleepInterval()), - shard.WithGCWorkerPoolInitializer(func(sz int) util.WorkerPool { - pool, err := ants.NewPool(sz) - fatalOnErr(err) - - return pool - }), - }) - return nil - }) - if err != nil { - panic(err) - } - - c.cfgObject.cfgLocalStorage.shardOpts = opts -} - func initObjectPool(cfg *config.Config) (pool cfgObjectRoutines) { var err error