package main

import (
	"context"
	"errors"
	"fmt"
	"io/fs"
	"net"
	"os"
	"os/signal"
	"path/filepath"
	"strings"
	"sync"
	"sync/atomic"
	"syscall"
	"time"

	netmapV2 "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/netmap"
	"git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config"
	apiclientconfig "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/apiclient"
	contractsconfig "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/contracts"
	engineconfig "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/engine"
	shardconfig "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/engine/shard"
	blobovniczaconfig "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/engine/shard/blobstor/blobovnicza"
	fstreeconfig "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/engine/shard/blobstor/fstree"
	loggerconfig "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/logger"
	nodeconfig "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/node"
	objectconfig "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/object"
	replicatorconfig "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/replicator"
	tracingconfig "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/tracing"
	"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/container"
	netmapCore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/netmap"
	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor"
	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/blobovniczatree"
	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/fstree"
	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/engine"
	meta "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/metabase"
	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/pilorama"
	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard"
	shardmode "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard/mode"
	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/writecache"
	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/metrics"
	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/client"
	containerClient "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/client/container"
	nmClient "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/client/netmap"
	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/event"
	netmap2 "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/event/netmap"
	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/network"
	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/network/cache"
	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/control"
	objectService "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object"
	getsvc "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/get"
	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object_manager/tombstone"
	tsourse "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object_manager/tombstone/source"
	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/replicator"
	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/tree"
	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/util/response"
	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util"
	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger"
	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/state"
	"git.frostfs.info/TrueCloudLab/frostfs-observability/tracing"
	"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap"
	objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
	"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/user"
	"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/version"
	"github.com/nspcc-dev/neo-go/pkg/crypto/keys"
	neogoutil "github.com/nspcc-dev/neo-go/pkg/util"
	"github.com/panjf2000/ants/v2"
	"go.etcd.io/bbolt"
	"go.uber.org/zap"
	"google.golang.org/grpc"
)

const addressSize = 72 // 32 bytes object ID, 32 bytes container ID, 8 bytes protobuf encoding

const maxMsgSize = 4 << 20 // transport msg limit 4 MiB

// capacity of the pools of the morph notification handlers
// for each contract listener.
const notificationHandlerPoolSize = 10

// applicationConfiguration reads and stores component-specific configuration
// values. It should not store any application helpers structs (pointers to shared
// structs).
// It must not be used concurrently.
type applicationConfiguration struct {
	// _read indicates whether a config
	// has already been read
	_read bool

	// LoggerCfg holds the logger settings read from the config.
	LoggerCfg struct {
		level string
	}

	// EngineCfg holds the storage engine settings and one entry
	// per shard visited while iterating the config.
	EngineCfg struct {
		errorThreshold uint32
		shardPoolSize  uint32
		shards         []shardCfg
	}
}

// shardCfg is a plain copy of the values read for a single shard.
// It is filled by applicationConfiguration.updateShardConfig.
type shardCfg struct {
	compress                  bool
	smallSizeObjectLimit      uint64
	uncompressableContentType []string
	refillMetabase            bool
	mode                      shardmode.Mode

	// metabase settings
	metaCfg struct {
		path          string
		perm          fs.FileMode
		maxBatchSize  int
		maxBatchDelay time.Duration
	}

	// blobstor sub-storages in the order they are listed in the config
	subStorages []subStorageCfg

	// garbage collector settings
	gcCfg struct {
		removerBatchSize             int
		removerSleepInterval         time.Duration
		expiredCollectorBatchSize    int
		expiredCollectorWorkersCount int
	}

	// write-cache settings; the fields other than enabled are
	// filled only when the write-cache is enabled in the config
	writecacheCfg struct {
		enabled          bool
		path             string
		maxBatchSize     int
		maxBatchDelay    time.Duration
		smallObjectSize  uint64
		maxObjSize       uint64
		flushWorkerCount int
		sizeLimit        uint64
		noSync           bool
	}

	// pilorama settings; filled only when the "tree" config
	// section is enabled
	piloramaCfg struct {
		enabled       bool
		path          string
		perm          fs.FileMode
		noSync        bool
		maxBatchSize  int
		maxBatchDelay time.Duration
	}
}

// id returns persistent id of a shard. It is different from the ID used in runtime
// and is primarily used to identify shards in the configuration.
//
// The id is the concatenation of the cleaned paths of all sub-storages,
// in their configured order.
func (c *shardCfg) id() string {
	// This calculation should be kept in sync with
	// pkg/local_object_storage/engine/control.go file.
	var sb strings.Builder
	for i := range c.subStorages {
		sb.WriteString(filepath.Clean(c.subStorages[i].path))
	}
	return sb.String()
}

// subStorageCfg is a plain copy of the values read for a single
// blobstor sub-storage.
type subStorageCfg struct {
	// common for all storages
	typ    string
	path   string
	perm   fs.FileMode
	depth  uint64
	noSync bool

	// blobovnicza-specific
	size            uint64
	width           uint64
	openedCacheSize int
}

// readConfig fills applicationConfiguration with raw configuration values
// not modifying them.
//
// On every call after the first one the underlying config is reloaded and
// validated, and the receiver is reset before the values are read again.
// Returns an error if reloading, validation or reading of a shard fails.
func (a *applicationConfiguration) readConfig(c *config.Config) error {
	if a._read {
		err := c.Reload()
		if err != nil {
			return fmt.Errorf("could not reload configuration: %w", err)
		}

		err = validateConfig(c)
		if err != nil {
			return fmt.Errorf("configuration's validation: %w", err)
		}

		// clear if it is rereading
		*a = applicationConfiguration{}
	}

	a._read = true

	// Logger

	a.LoggerCfg.level = loggerconfig.Level(c)

	// Storage Engine

	a.EngineCfg.errorThreshold = engineconfig.ShardErrorThreshold(c)
	a.EngineCfg.shardPoolSize = engineconfig.ShardPoolSize(c)

	return engineconfig.IterateShards(c, false, func(sc *shardconfig.Config) error { return a.updateShardConfig(c, sc) })
}

// updateShardConfig copies the settings of one shard from oldConfig (the
// shard's config section) into a fresh shardCfg and appends it to
// a.EngineCfg.shards. Returns an error if a sub-storage has an unknown type.
func (a *applicationConfiguration) updateShardConfig(c *config.Config, oldConfig *shardconfig.Config) error {
	var newConfig shardCfg

	newConfig.refillMetabase = oldConfig.RefillMetabase()
	newConfig.mode = oldConfig.Mode()
	newConfig.compress = oldConfig.Compress()
	newConfig.uncompressableContentType = oldConfig.UncompressableContentTypes()
	newConfig.smallSizeObjectLimit = oldConfig.SmallSizeLimit()

	a.setShardWriteCacheConfig(&newConfig, oldConfig)

	a.setShardPiloramaConfig(c, &newConfig, oldConfig)

	if err := a.setShardStorageConfig(&newConfig, oldConfig); err != nil {
		return err
	}

	a.setMetabaseConfig(&newConfig, oldConfig)

	a.setGCConfig(&newConfig, oldConfig)

	a.EngineCfg.shards = append(a.EngineCfg.shards, newConfig)

	return nil
}

// setShardWriteCacheConfig fills newConfig.writecacheCfg from oldConfig.
// Nothing is written when the write-cache is disabled in the config.
func (a *applicationConfiguration) setShardWriteCacheConfig(newConfig *shardCfg, oldConfig *shardconfig.Config) {
	writeCacheCfg := oldConfig.WriteCache()
	if writeCacheCfg.Enabled() {
		wc := &newConfig.writecacheCfg

		wc.enabled = true
		wc.path = writeCacheCfg.Path()
		wc.maxBatchSize = writeCacheCfg.BoltDB().MaxBatchSize()
		wc.maxBatchDelay = writeCacheCfg.BoltDB().MaxBatchDelay()
		wc.maxObjSize = writeCacheCfg.MaxObjectSize()
		wc.smallObjectSize = writeCacheCfg.SmallObjectSize()
		wc.flushWorkerCount = writeCacheCfg.WorkersNumber()
		wc.sizeLimit = writeCacheCfg.SizeLimit()
		wc.noSync = writeCacheCfg.NoSync()
	}
}

// setShardPiloramaConfig fills newConfig.piloramaCfg from oldConfig.
// Nothing is written unless the "enabled" flag of the application-level
// "tree" section is set.
func (a *applicationConfiguration) setShardPiloramaConfig(c *config.Config, newConfig *shardCfg, oldConfig *shardconfig.Config) {
	if config.BoolSafe(c.Sub("tree"), "enabled") {
		piloramaCfg := oldConfig.Pilorama()
		pr := &newConfig.piloramaCfg

		pr.enabled = true
		pr.path = piloramaCfg.Path()
		pr.perm = piloramaCfg.Perm()
		pr.noSync = piloramaCfg.NoSync()
		pr.maxBatchSize = piloramaCfg.MaxBatchSize()
		pr.maxBatchDelay = piloramaCfg.MaxBatchDelay()
	}
}

// setShardStorageConfig fills newConfig.subStorages with one entry per
// blobstor sub-storage of oldConfig, preserving their order.
// Returns an error for a storage type other than blobovniczatree.Type
// or fstree.Type; newConfig is left untouched in that case.
func (a *applicationConfiguration) setShardStorageConfig(newConfig *shardCfg, oldConfig *shardconfig.Config) error {
	blobStorCfg := oldConfig.BlobStor()
	storagesCfg := blobStorCfg.Storages()

	ss := make([]subStorageCfg, 0, len(storagesCfg))
	for i := range storagesCfg {
		var sCfg subStorageCfg

		sCfg.typ = storagesCfg[i].Type()
		sCfg.path = storagesCfg[i].Path()
		sCfg.perm = storagesCfg[i].Perm()

		// type-specific settings
		switch storagesCfg[i].Type() {
		case blobovniczatree.Type:
			sub := blobovniczaconfig.From((*config.Config)(storagesCfg[i]))

			sCfg.size = sub.Size()
			sCfg.depth = sub.ShallowDepth()
			sCfg.width = sub.ShallowWidth()
			sCfg.openedCacheSize = sub.OpenedCacheSize()
		case fstree.Type:
			sub := fstreeconfig.From((*config.Config)(storagesCfg[i]))
			sCfg.depth = sub.Depth()
			sCfg.noSync = sub.NoSync()
		default:
			return fmt.Errorf("invalid storage type: %s", storagesCfg[i].Type())
		}

		ss = append(ss, sCfg)
	}

	newConfig.subStorages = ss
	return nil
}

// setMetabaseConfig fills newConfig.metaCfg from the metabase section
// of oldConfig.
func (a *applicationConfiguration) setMetabaseConfig(newConfig *shardCfg, oldConfig *shardconfig.Config) {
	metabaseCfg := oldConfig.Metabase()
	m := &newConfig.metaCfg

	m.path = metabaseCfg.Path()
	m.perm = metabaseCfg.BoltDB().Perm()
	m.maxBatchDelay = metabaseCfg.BoltDB().MaxBatchDelay()
	m.maxBatchSize = metabaseCfg.BoltDB().MaxBatchSize()
}

// setGCConfig fills newConfig.gcCfg from the GC section of oldConfig.
func (a *applicationConfiguration) setGCConfig(newConfig *shardCfg, oldConfig *shardconfig.Config) {
	gcCfg := oldConfig.GC()
	newConfig.gcCfg.removerBatchSize = gcCfg.RemoverBatchSize()
	newConfig.gcCfg.removerSleepInterval = gcCfg.RemoverSleepInterval()
	newConfig.gcCfg.expiredCollectorBatchSize = gcCfg.ExpiredCollectorBatchSize()
	newConfig.gcCfg.expiredCollectorWorkersCount = gcCfg.ExpiredCollectorWorkersCount()
}

// internals contains application-specific internals that are created
// on application startup and are shared b/w the components during
// the application life cycle.
// It should not contain any read configuration values, component-specific
// helpers and fields.
type internals struct {
	done        chan struct{}
	ctxCancel   func()
	internalErr chan error // channel for internal application errors at runtime

	appCfg *config.Config

	log *logger.Logger

	wg      sync.WaitGroup
	workers []worker
	closers []closer

	apiVersion   version.Version
	healthStatus *atomic.Int32
	// is node under maintenance
	isMaintenance atomic.Bool
}

// starts node's maintenance.
//
// Sets the maintenance flag, switches the control netmap status
// to MAINTENANCE and logs the transition.
func (c *cfg) startMaintenance() {
	c.isMaintenance.Store(true)
	c.cfgNetmap.state.setControlNetmapStatus(control.NetmapStatus_MAINTENANCE)
	c.log.Info(logs.FrostFSNodeStartedLocalNodesMaintenance)
}

// stops node's maintenance.
//
// Only clears the maintenance flag and logs; the control netmap
// status is not changed here.
func (c *internals) stopMaintenance() {
	c.isMaintenance.Store(false)
	c.log.Info(logs.FrostFSNodeStoppedLocalNodesMaintenance)
}

// IsMaintenance checks if storage node is under maintenance.
//
// Provides util.NodeState to Object service.
func (c *internals) IsMaintenance() bool {
	return c.isMaintenance.Load()
}

// shared contains component-specific structs/helpers that should
// be shared during initialization of the application.
type shared struct {
	privateTokenStore sessionStorage
	persistate        *state.PersistentStorage

	// three separately created client caches; all of them are
	// built from the same options in initShared
	clientCache    *cache.ClientCache
	bgClientCache  *cache.ClientCache
	putClientCache *cache.ClientCache
	localAddr      network.AddressGroup

	key            *keys.PrivateKey
	binPublicKey   []byte
	ownerIDFromKey user.ID // user ID calculated from key

	// current network map
	netMap       atomic.Value // type netmap.NetMap
	netMapSource netmapCore.Source

	cnrClient *containerClient.Client

	respSvc *response.Service

	replicator *replicator.Replicator

	treeService *tree.Service

	metricsCollector *metrics.NodeMetrics

	metricsSvc *objectService.MetricCollector
}

// dynamicConfiguration stores parameters of the
// components that support runtime reconfigurations.
type dynamicConfiguration struct {
	logger  *logger.Prm
	pprof   *httpComponent
	metrics *httpComponent
}

// cfg is the root application structure: it embeds the read configuration,
// the application internals, the shared helpers and the dynamically
// reconfigurable parameters, and holds per-service configuration.
type cfg struct {
	applicationConfiguration
	internals
	shared
	dynamicConfiguration

	// configuration of the internal
	// services
	cfgGRPC           cfgGRPC
	cfgMorph          cfgMorph
	cfgAccounting     cfgAccounting
	cfgContainer      cfgContainer
	cfgNodeInfo       cfgNodeInfo
	cfgNetmap         cfgNetmap
	cfgControlService cfgControlService
	cfgObject         cfgObject
	cfgNotifications  cfgNotifications
}

// ReadCurrentNetMap reads network map which has been cached at the
// latest epoch. Returns an error if value has not been cached yet.
//
// Provides interface for NetmapService server.
func (c *cfg) ReadCurrentNetMap(msg *netmapV2.NetMap) error {
	val := c.netMap.Load()
	if val == nil {
		return errors.New("missing local network map")
	}

	// the stored value is expected to be a netmap.NetMap;
	// any other type makes this assertion panic
	val.(netmap.NetMap).WriteToV2(msg)

	return nil
}

// cfgGRPC holds gRPC listeners/servers and transport limits.
type cfgGRPC struct {
	listeners []net.Listener

	servers []*grpc.Server

	maxChunkSize uint64

	maxAddrAmount uint64
}

// cfgMorph holds the morph (sidechain) client and its settings.
type cfgMorph struct {
	client *client.Client

	notaryEnabled bool

	// TTL of Sidechain cached values. Non-positive value disables caching.
	cacheTTL time.Duration

	proxyScriptHash neogoutil.Uint160
}

// cfgAccounting holds the script hash read via contractsconfig.Balance.
type cfgAccounting struct {
	scriptHash neogoutil.Uint160
}

// cfgContainer holds the Container contract script hash and
// notification handling helpers.
type cfgContainer struct {
	scriptHash neogoutil.Uint160

	parsers     map[event.Type]event.NotificationParser
	subscribers map[event.Type][]event.Handler
	workerPool  util.WorkerPool // pool for asynchronous handlers
}

// cfgNetmap holds the Netmap contract script hash, its client wrapper,
// notification handling helpers and the bootstrap-related state.
type cfgNetmap struct {
	scriptHash neogoutil.Uint160
	wrapper    *nmClient.Client

	parsers map[event.Type]event.NotificationParser

	subscribers map[event.Type][]event.Handler
	workerPool  util.WorkerPool // pool for asynchronous handlers

	state *networkState

	needBootstrap       bool
	reBoostrapTurnedOff *atomic.Bool // managed by control service in runtime
	startEpoch          uint64       // epoch number when application is started
}

// cfgNodeInfo holds the node information built from the configuration.
type cfgNodeInfo struct {
	// values from config
	localInfo netmap.NodeInfo
}

// cfgObject holds helpers and settings of the Object service.
type cfgObject struct {
	getSvc *getsvc.Service

	cnrSource container.Source

	eaclSource container.EACLSource

	pool cfgObjectRoutines

	cfgLocalStorage cfgLocalStorage

	tombstoneLifetime uint64
}

// cfgNotifications holds settings of the notification subsystem.
type cfgNotifications struct {
	enabled      bool
	nw           notificationWriter
	defaultTopic string
}

// cfgLocalStorage holds the storage engine created by initLocalStorage.
type cfgLocalStorage struct {
	localStorage *engine.StorageEngine
}

// cfgObjectRoutines holds worker pools of the Object service together
// with the capacities they were created with.
type cfgObjectRoutines struct {
	putRemote *ants.Pool

	putRemoteCapacity int

	putLocal *ants.Pool

	putLocalCapacity int

	replicatorPoolSize int

	replication *ants.Pool
}

// cfgControlService holds the gRPC server of the Control service.
type cfgControlService struct {
	server *grpc.Server
}

var persistateSideChainLastBlockKey = []byte("side_chain_last_processed_block")

// initCfg builds the application cfg from appCfg.
//
// It reads the configuration (panicking if that fails), creates the logger,
// initializes internals, shared helpers and per-service configuration, and
// registers shutdown hooks that close the client caches and the persistent
// state. Logger and storage errors are passed to fatalOnErr.
func initCfg(appCfg *config.Config) *cfg {
	c := &cfg{}

	err := c.readConfig(appCfg)
	if err != nil {
		panic(fmt.Errorf("config reading: %w", err))
	}

	key := nodeconfig.Key(appCfg)

	logPrm, err := c.loggerPrm()
	fatalOnErr(err)

	logPrm.MetricsNamespace = "frostfs_node"

	log, err := logger.NewLogger(logPrm)
	fatalOnErr(err)

	c.internals = initInternals(appCfg, log)

	relayOnly := nodeconfig.Relay(appCfg)

	netState := newNetworkState()

	c.shared = initShared(appCfg, key, netState, relayOnly)

	c.cfgAccounting = cfgAccounting{
		scriptHash: contractsconfig.Balance(appCfg),
	}
	c.cfgContainer = initContainer(appCfg)

	c.cfgNetmap = initNetmap(appCfg, netState, relayOnly)

	c.cfgGRPC = initCfgGRPC()

	c.cfgMorph = cfgMorph{
		proxyScriptHash: contractsconfig.Proxy(appCfg),
	}
	c.cfgObject = initCfgObject(appCfg)

	user.IDFromKey(&c.ownerIDFromKey, key.PrivateKey.PublicKey)

	// the metrics collector is created after initShared, so it is
	// assigned to both cfg and the network state here
	c.metricsCollector = metrics.NewNodeMetrics()
	netState.metrics = c.metricsCollector

	c.onShutdown(c.clientCache.CloseAll)    // clean up connections
	c.onShutdown(c.bgClientCache.CloseAll)  // clean up connections
	c.onShutdown(c.putClientCache.CloseAll) // clean up connections
	// closing error of the persistent state is deliberately ignored
	c.onShutdown(func() { _ = c.persistate.Close() })

	return c
}

// initInternals creates internals with fresh unbuffered done/internalErr
// channels, the current API version and the health status set to
// HEALTH_STATUS_UNDEFINED. ctxCancel is not set here.
func initInternals(appCfg *config.Config, log *logger.Logger) internals {
	var healthStatus atomic.Int32
	healthStatus.Store(int32(control.HealthStatus_HEALTH_STATUS_UNDEFINED))

	return internals{
		done:         make(chan struct{}),
		appCfg:       appCfg,
		internalErr:  make(chan error),
		log:          log,
		apiVersion:   version.Current(),
		healthStatus: &healthStatus,
	}
}

// initShared creates the shared helpers: the persistent state storage,
// the response service bound to netState and three client caches built
// from the same options. In relay-only mode localAddr is left empty.
// A failure to open the persistent storage is passed to fatalOnErr.
func initShared(appCfg *config.Config, key *keys.PrivateKey, netState *networkState, relayOnly bool) shared {
	var netAddr network.AddressGroup

	if !relayOnly {
		netAddr = nodeconfig.BootstrapAddresses(appCfg)
	}

	persistate, err := state.NewPersistentStorage(nodeconfig.PersistentState(appCfg).Path())
	fatalOnErr(err)

	cacheOpts := cache.ClientCacheOpts{
		DialTimeout:      apiclientconfig.DialTimeout(appCfg),
		StreamTimeout:    apiclientconfig.StreamTimeout(appCfg),
		Key:              &key.PrivateKey,
		AllowExternal:    apiclientconfig.AllowExternal(appCfg),
		ReconnectTimeout: apiclientconfig.ReconnectTimeout(appCfg),
	}

	return shared{
		key:            key,
		binPublicKey:   key.PublicKey().Bytes(),
		localAddr:      netAddr,
		respSvc:        response.NewService(response.WithNetworkState(netState)),
		clientCache:    cache.NewSDKClientCache(cacheOpts),
		bgClientCache:  cache.NewSDKClientCache(cacheOpts),
		putClientCache: cache.NewSDKClientCache(cacheOpts),
		persistate:     persistate,
	}
}

// initNetmap creates cfgNetmap with a worker pool of
// notificationHandlerPoolSize workers. In relay-only mode bootstrap is not
// needed and re-bootstrap is initially turned off.
func initNetmap(appCfg *config.Config, netState *networkState, relayOnly bool) cfgNetmap {
	netmapWorkerPool, err := ants.NewPool(notificationHandlerPoolSize)
	fatalOnErr(err)

	var reBootstrapTurnedOff atomic.Bool
	reBootstrapTurnedOff.Store(relayOnly)
	return cfgNetmap{
		scriptHash:          contractsconfig.Netmap(appCfg),
		state:               netState,
		workerPool:          netmapWorkerPool,
		needBootstrap:       !relayOnly,
		reBoostrapTurnedOff: &reBootstrapTurnedOff,
	}
}

// initContainer creates cfgContainer with a worker pool of
// notificationHandlerPoolSize workers.
func initContainer(appCfg *config.Config) cfgContainer {
	containerWorkerPool, err := ants.NewPool(notificationHandlerPoolSize)
	fatalOnErr(err)

	return cfgContainer{
		scriptHash: contractsconfig.Container(appCfg),
		workerPool: containerWorkerPool,
	}
}

// initCfgGRPC derives transport limits from maxMsgSize and addressSize.
func initCfgGRPC() cfgGRPC {
	maxChunkSize := uint64(maxMsgSize) * 3 / 4          // 25% to meta, 75% to payload
	maxAddrAmount := uint64(maxChunkSize) / addressSize // each address is about 72 bytes

	return cfgGRPC{
		maxChunkSize:  maxChunkSize,
		maxAddrAmount: maxAddrAmount,
	}
}

// initCfgObject creates cfgObject with worker pools and the tombstone
// lifetime read from appCfg.
func initCfgObject(appCfg *config.Config) cfgObject {
	return cfgObject{
		pool:              initObjectPool(appCfg),
		tombstoneLifetime: objectconfig.TombstoneLifetime(appCfg),
	}
}

// engineOpts returns storage engine options built from the read
// configuration. The metrics option is added only when the metrics
// collector is set.
func (c *cfg) engineOpts() []engine.Option {
	opts := make([]engine.Option, 0, 4)

	opts = append(opts,
		engine.WithShardPoolSize(c.EngineCfg.shardPoolSize),
		engine.WithErrorThreshold(c.EngineCfg.errorThreshold),
		engine.WithLogger(c.log),
	)

	if c.metricsCollector != nil {
		opts = append(opts, engine.WithMetrics(c.metricsCollector.Engine()))
	}

	return opts
}

// shardOptsWithID couples shard options with the persistent
// configuration ID of the shard (see shardCfg.id).
type shardOptsWithID struct {
	configID string
	shOpts   []shard.Option
}

// shardOpts returns options for every shard of the read configuration,
// in configuration order.
func (c *cfg) shardOpts() []shardOptsWithID {
	shards := make([]shardOptsWithID, 0, len(c.EngineCfg.shards))

	for _, shCfg := range c.EngineCfg.shards {
		shards = append(shards, c.getShardOpts(shCfg))
	}

	return shards
}

// getWriteCacheOpts returns write-cache options for the shard,
// or nil when the write-cache is disabled.
func (c *cfg) getWriteCacheOpts(shCfg shardCfg) []writecache.Option {
	var writeCacheOpts []writecache.Option
	if wcRead := shCfg.writecacheCfg; wcRead.enabled {
		writeCacheOpts = append(writeCacheOpts,
			writecache.WithPath(wcRead.path),
			writecache.WithMaxBatchSize(wcRead.maxBatchSize),
			writecache.WithMaxBatchDelay(wcRead.maxBatchDelay),
			writecache.WithMaxObjectSize(wcRead.maxObjSize),
			writecache.WithSmallObjectSize(wcRead.smallObjectSize),
			writecache.WithFlushWorkersCount(wcRead.flushWorkerCount),
			writecache.WithMaxCacheSize(wcRead.sizeLimit),
			writecache.WithNoSync(wcRead.noSync),
			writecache.WithLogger(c.log),
		)
	}
	return writeCacheOpts
}

// getPiloramaOpts returns pilorama options for the shard,
// or nil when pilorama is disabled.
func (c *cfg) getPiloramaOpts(shCfg shardCfg) []pilorama.Option {
	var piloramaOpts []pilorama.Option
	if prRead := shCfg.piloramaCfg; prRead.enabled {
		piloramaOpts = append(piloramaOpts,
			pilorama.WithPath(prRead.path),
			pilorama.WithPerm(prRead.perm),
			pilorama.WithNoSync(prRead.noSync),
			pilorama.WithMaxBatchSize(prRead.maxBatchSize),
			pilorama.WithMaxBatchDelay(prRead.maxBatchDelay),
		)
	}
	return piloramaOpts
}

// getSubstorageOpts creates blobstor sub-storages for the shard in the
// configured order. A blobovnicza tree gets a policy that selects data
// strictly shorter than the shard's small-object limit; an fstree gets
// a policy that always returns true. Unknown types are skipped.
func (c *cfg) getSubstorageOpts(shCfg shardCfg) []blobstor.SubStorage {
	var ss []blobstor.SubStorage
	for _, sRead := range shCfg.subStorages {
		switch sRead.typ {
		case blobovniczatree.Type:
			ss = append(ss, blobstor.SubStorage{
				Storage: blobovniczatree.NewBlobovniczaTree(
					blobovniczatree.WithRootPath(sRead.path),
					blobovniczatree.WithPermissions(sRead.perm),
					blobovniczatree.WithBlobovniczaSize(sRead.size),
					blobovniczatree.WithBlobovniczaShallowDepth(sRead.depth),
					blobovniczatree.WithBlobovniczaShallowWidth(sRead.width),
					blobovniczatree.WithOpenedCacheSize(sRead.openedCacheSize),

					blobovniczatree.WithLogger(c.log)),
				Policy: func(_ *objectSDK.Object, data []byte) bool {
					return uint64(len(data)) < shCfg.smallSizeObjectLimit
				},
			})
		case fstree.Type:
			ss = append(ss, blobstor.SubStorage{
				Storage: fstree.New(
					fstree.WithPath(sRead.path),
					fstree.WithPerm(sRead.perm),
					fstree.WithDepth(sRead.depth),
					fstree.WithNoSync(sRead.noSync)),
				Policy: func(_ *objectSDK.Object, data []byte) bool {
					return true
				},
			})
		default:
			// should never happen, that has already
			// been handled: when the config was read
		}
	}
	return ss
}

// getShardOpts assembles the full option list of one shard together with
// its persistent configuration ID.
func (c *cfg) getShardOpts(shCfg shardCfg) shardOptsWithID {
	writeCacheOpts := c.getWriteCacheOpts(shCfg)
	piloramaOpts := c.getPiloramaOpts(shCfg)
	ss := c.getSubstorageOpts(shCfg)

	var sh shardOptsWithID
	sh.configID = shCfg.id()
	sh.shOpts = []shard.Option{
		shard.WithLogger(c.log),
		shard.WithRefillMetabase(shCfg.refillMetabase),
		shard.WithMode(shCfg.mode),
		shard.WithBlobStorOptions(
			blobstor.WithCompressObjects(shCfg.compress),
			blobstor.WithUncompressableContentTypes(shCfg.uncompressableContentType),
			blobstor.WithStorages(ss),

			blobstor.WithLogger(c.log),
		),
		shard.WithMetaBaseOptions(
			meta.WithPath(shCfg.metaCfg.path),
			meta.WithPermissions(shCfg.metaCfg.perm),
			meta.WithMaxBatchSize(shCfg.metaCfg.maxBatchSize),
			meta.WithMaxBatchDelay(shCfg.metaCfg.maxBatchDelay),
			meta.WithBoltDBOptions(&bbolt.Options{
				Timeout: 100 * time.Millisecond,
			}),

			meta.WithLogger(c.log),
			meta.WithEpochState(c.cfgNetmap.state),
		),
		shard.WithPiloramaOptions(piloramaOpts...),
		shard.WithWriteCache(shCfg.writecacheCfg.enabled),
		shard.WithWriteCacheOptions(writeCacheOpts...),
		shard.WithRemoverBatchSize(shCfg.gcCfg.removerBatchSize),
		shard.WithGCRemoverSleepInterval(shCfg.gcCfg.removerSleepInterval),
		shard.WithExpiredCollectorBatchSize(shCfg.gcCfg.expiredCollectorBatchSize),
		shard.WithExpiredCollectorWorkersCount(shCfg.gcCfg.expiredCollectorWorkersCount),
		shard.WithGCWorkerPoolInitializer(func(sz int) util.WorkerPool {
			pool, err := ants.NewPool(sz)
			fatalOnErr(err)

			return pool
		}),
	}
	return sh
}

// loggerPrm returns the logger parameters with the level taken from the
// read configuration, allocating them on first use.
//
// The returned error is always nil: an invalid level string makes the
// function panic instead.
func (c *cfg) loggerPrm() (*logger.Prm, error) {
	// check if it has been inited before
	if c.dynamicConfiguration.logger == nil {
		c.dynamicConfiguration.logger = new(logger.Prm)
	}

	// (re)init read configuration
	err := c.dynamicConfiguration.logger.SetLevelString(c.LoggerCfg.level)
	if err != nil {
		// not expected since validation should be performed before
		panic(fmt.Sprintf("incorrect log level format: %s", c.LoggerCfg.level))
	}

	return c.dynamicConfiguration.logger, nil
}

// LocalAddress returns the address group stored in cfg.localAddr.
func (c *cfg) LocalAddress() network.AddressGroup {
	return c.localAddr
}

// initLocalStorage creates the storage engine, subscribes it to new epoch
// notifications, attaches all configured shards and registers a shutdown
// hook that closes the engine.
//
// A shard that fails to attach is logged and skipped; if no shard has been
// attached, engineconfig.ErrNoShardConfigured is passed to fatalOnErr.
func initLocalStorage(c *cfg) {
	ls := engine.New(c.engineOpts()...)

	addNewEpochAsyncNotificationHandler(c, func(ev event.Event) {
		ls.HandleNewEpoch(ev.(netmap2.NewEpoch).EpochNumber())
	})

	// allocate memory for the service;
	// service will be created later
	c.cfgObject.getSvc = new(getsvc.Service)

	var shardsAttached int
	for _, optsWithMeta := range c.shardOpts() {
		id, err := ls.AddShard(append(optsWithMeta.shOpts, shard.WithTombstoneSource(c.createTombstoneSource()))...)
		if err != nil {
			c.log.Error(logs.FrostFSNodeFailedToAttachShardToEngine, zap.Error(err))
		} else {
			shardsAttached++
			c.log.Info(logs.FrostFSNodeShardAttachedToEngine, zap.Stringer("id", id))
		}
	}
	if shardsAttached == 0 {
		fatalOnErr(engineconfig.ErrNoShardConfigured)
	}

	c.cfgObject.cfgLocalStorage.localStorage = ls

	c.onShutdown(func() {
		c.log.Info(logs.FrostFSNodeClosingComponentsOfTheStorageEngine)

		err := ls.Close()
		if err != nil {
			// NOTE(review): the closing failure is reported at Info level
			c.log.Info(logs.FrostFSNodeStorageEngineClosingFailure,
				zap.String("error", err.Error()),
			)
		} else {
			c.log.Info(logs.FrostFSNodeAllComponentsOfTheStorageEngineClosedSuccessfully)
		}
	})
}

// initObjectPool creates worker pools of the Object service.
//
// The "put remote" and "put local" pools are created in non-blocking mode
// with the configured sizes. The replication pool uses the configured
// replicator pool size, falling back to the "put remote" capacity when that
// size is not positive. Pool creation errors are passed to fatalOnErr.
func initObjectPool(cfg *config.Config) (pool cfgObjectRoutines) {
	var err error

	optNonBlocking := ants.WithNonblocking(true)

	pool.putRemoteCapacity = objectconfig.Put(cfg).PoolSizeRemote()
	pool.putRemote, err = ants.NewPool(pool.putRemoteCapacity, optNonBlocking)
	fatalOnErr(err)

	pool.putLocalCapacity = objectconfig.Put(cfg).PoolSizeLocal()
	pool.putLocal, err = ants.NewPool(pool.putLocalCapacity, optNonBlocking)
	fatalOnErr(err)

	pool.replicatorPoolSize = replicatorconfig.PoolSize(cfg)
	if pool.replicatorPoolSize <= 0 {
		pool.replicatorPoolSize = pool.putRemoteCapacity
	}

	pool.replication, err = ants.NewPool(pool.replicatorPoolSize)
	fatalOnErr(err)

	return pool
}

// LocalNodeInfo returns the node information in API v2 format: the one
// kept in the network state if it is available, otherwise the one built
// from the configuration. The returned error is always nil.
func (c *cfg) LocalNodeInfo() (*netmapV2.NodeInfo, error) {
	var res netmapV2.NodeInfo

	ni, ok := c.cfgNetmap.state.getNodeInfo()
	if ok {
		ni.WriteToV2(&res)
	} else {
		c.cfgNodeInfo.localInfo.WriteToV2(&res)
	}

	return &res, nil
}

// handleLocalNodeInfo rewrites local node info from the FrostFS network map.
// Called with nil when storage node is outside the FrostFS network map // (before entering the network and after leaving it). func (c *cfg) handleLocalNodeInfo(ni *netmap.NodeInfo) { c.cfgNetmap.state.setNodeInfo(ni) } // bootstrapWithState calls "addPeer" method of the Sidechain Netmap contract // with the binary-encoded information from the current node's configuration. // The state is set using the provided setter which MUST NOT be nil. func (c *cfg) bootstrapWithState(stateSetter func(*netmap.NodeInfo)) error { ni := c.cfgNodeInfo.localInfo stateSetter(&ni) prm := nmClient.AddPeerPrm{} prm.SetNodeInfo(ni) return c.cfgNetmap.wrapper.AddPeer(prm) } // bootstrapOnline calls cfg.bootstrapWithState with "online" state. func bootstrapOnline(c *cfg) error { return c.bootstrapWithState((*netmap.NodeInfo).SetOnline) } // bootstrap calls bootstrapWithState with: // - "maintenance" state if maintenance is in progress on the current node // - "online", otherwise func (c *cfg) bootstrap() error { // switch to online except when under maintenance st := c.cfgNetmap.state.controlNetmapStatus() if st == control.NetmapStatus_MAINTENANCE { c.log.Info(logs.FrostFSNodeBootstrappingWithTheMaintenanceState) return c.bootstrapWithState((*netmap.NodeInfo).SetMaintenance) } c.log.Info(logs.FrostFSNodeBootstrappingWithOnlineState, zap.Stringer("previous", st), ) return bootstrapOnline(c) } // needBootstrap checks if local node should be registered in network on bootup. func (c *cfg) needBootstrap() bool { return c.cfgNetmap.needBootstrap } // ObjectServiceLoad implements system loader interface for policer component. // It is calculated as size/capacity ratio of "remote object put" worker. // Returns float value between 0.0 and 1.0. 
func (c *cfg) ObjectServiceLoad() float64 { return float64(c.cfgObject.pool.putRemote.Running()) / float64(c.cfgObject.pool.putRemoteCapacity) } type dCmp struct { name string reloadFunc func() error } func (c *cfg) signalWatcher(ctx context.Context) { ch := make(chan os.Signal, 1) signal.Notify(ch, syscall.SIGHUP, syscall.SIGINT, syscall.SIGTERM) for { select { case sig := <-ch: switch sig { case syscall.SIGHUP: c.reloadConfig(ctx) case syscall.SIGTERM, syscall.SIGINT: c.log.Info(logs.FrostFSNodeTerminationSignalHasBeenReceivedStopping) // TODO (@acid-ant): #49 need to cover case when stuck at the middle(node health UNDEFINED or STARTING) c.shutdown() c.log.Info(logs.FrostFSNodeTerminationSignalProcessingIsComplete) return } case err := <-c.internalErr: // internal application error c.log.Warn(logs.FrostFSNodeInternalApplicationError, zap.String("message", err.Error())) c.shutdown() c.log.Info(logs.FrostFSNodeInternalErrorProcessingIsComplete) return } } } func (c *cfg) reloadConfig(ctx context.Context) { c.log.Info(logs.FrostFSNodeSIGHUPHasBeenReceivedRereadingConfiguration) err := c.readConfig(c.appCfg) if err != nil { c.log.Error(logs.FrostFSNodeConfigurationReading, zap.Error(err)) return } // all the components are expected to support // Logger's dynamic reconfiguration approach var components []dCmp // Logger logPrm, err := c.loggerPrm() if err != nil { c.log.Error(logs.FrostFSNodeLoggerConfigurationPreparation, zap.Error(err)) return } components = append(components, dCmp{"logger", logPrm.Reload}) components = append(components, dCmp{"tracing", func() error { updated, err := tracing.Setup(ctx, *tracingconfig.ToTracingConfig(c.appCfg)) if updated { c.log.Info(logs.FrostFSNodeTracingConfigationUpdated) } return err }}) if cmp, updated := metricsComponent(c); updated { if cmp.enabled { cmp.preReload = enableMetricsSvc } else { cmp.preReload = disableMetricsSvc } components = append(components, dCmp{cmp.name, func() error { return cmp.reload(ctx) }}) } if cmp, 
updated := pprofComponent(c); updated { components = append(components, dCmp{cmp.name, func() error { return cmp.reload(ctx) }}) } // Storage Engine var rcfg engine.ReConfiguration for _, optsWithID := range c.shardOpts() { rcfg.AddShard(optsWithID.configID, append(optsWithID.shOpts, shard.WithTombstoneSource(c.createTombstoneSource()))) } err = c.cfgObject.cfgLocalStorage.localStorage.Reload(ctx, rcfg) if err != nil { c.log.Error(logs.FrostFSNodeStorageEngineConfigurationUpdate, zap.Error(err)) return } for _, component := range components { err = component.reloadFunc() if err != nil { c.log.Error(logs.FrostFSNodeUpdatedConfigurationApplying, zap.String("component", component.name), zap.Error(err)) } } c.log.Info(logs.FrostFSNodeConfigurationHasBeenReloadedSuccessfully) } func (c *cfg) createTombstoneSource() *tombstone.ExpirationChecker { var tssPrm tsourse.TombstoneSourcePrm tssPrm.SetGetService(c.cfgObject.getSvc) tombstoneSrc := tsourse.NewSource(tssPrm) tombstoneSource := tombstone.NewChecker( tombstone.WithLogger(c.log), tombstone.WithTombstoneSource(tombstoneSrc), ) return tombstoneSource } func (c *cfg) shutdown() { c.setHealthStatus(control.HealthStatus_SHUTTING_DOWN) c.ctxCancel() c.done <- struct{}{} for i := range c.closers { c.closers[len(c.closers)-1-i].fn() } close(c.internalErr) }