package main

import (
	"context"
	"errors"
	"fmt"
	"io/fs"
	"net"
	"os"
	"os/signal"
	"path/filepath"
	"strings"
	"sync"
	"sync/atomic"
	"syscall"
	"time"

	netmapV2 "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/netmap"
	"git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config"
	apiclientconfig "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/apiclient"
	contractsconfig "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/contracts"
	engineconfig "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/engine"
	shardconfig "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/engine/shard"
	blobovniczaconfig "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/engine/shard/blobstor/blobovnicza"
	fstreeconfig "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/engine/shard/blobstor/fstree"
	loggerconfig "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/logger"
	morphconfig "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/morph"
	nodeconfig "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/node"
	objectconfig "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/object"
	replicatorconfig "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/replicator"
	tracingconfig "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/tracing"
	"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/ape/chainbase"
	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/container"
	netmapCore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/netmap"
	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor"
	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/blobovniczatree"
	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/fstree"
	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/engine"
	meta "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/metabase"
	lsmetrics "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/metrics"
	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/pilorama"
	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard"
	shardmode "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard/mode"
	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/writecache"
	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/metrics"
	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/client"
	containerClient "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/client/container"
	nmClient "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/client/netmap"
	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/event"
	netmap2 "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/event/netmap"
	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/network"
	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/network/cache"
	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/control"
	objectService "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object"
	getsvc "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/get"
	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object_manager/tombstone"
	tsourse "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object_manager/tombstone/source"
	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/replicator"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/tree" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/util/response" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/sdnotify" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/state" "git.frostfs.info/TrueCloudLab/frostfs-observability/logging/lokicore" "git.frostfs.info/TrueCloudLab/frostfs-observability/tracing" "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap" objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object" "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/user" "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/version" policy_engine "git.frostfs.info/TrueCloudLab/policy-engine/pkg/engine" policy_client "git.frostfs.info/TrueCloudLab/policy-engine/pkg/morph/policy" "github.com/nspcc-dev/neo-go/pkg/crypto/keys" neogoutil "github.com/nspcc-dev/neo-go/pkg/util" "github.com/panjf2000/ants/v2" "go.etcd.io/bbolt" "go.uber.org/zap" "go.uber.org/zap/zapcore" "google.golang.org/grpc" ) const addressSize = 72 // 32 bytes object ID, 32 bytes container ID, 8 bytes protobuf encoding const maxMsgSize = 4 << 20 // transport msg limit 4 MiB // capacity of the pools of the morph notification handlers // for each contract listener. const notificationHandlerPoolSize = 10 // applicationConfiguration reads and stores component-specific configuration // values. It should not store any application helpers structs (pointers to shared // structs). // It must not be used concurrently. type applicationConfiguration struct { // _read indicated whether a config // has already been read _read bool LoggerCfg struct { level string destination string } EngineCfg struct { errorThreshold uint32 shardPoolSize uint32 shards []shardCfg lowMem bool rebuildWorkers uint32 } // if need to run node in compatibility with other versions mode cmode *atomic.Bool } type shardCfg struct { compress bool estimateCompressibility bool estimateCompressibilityThreshold float64 smallSizeObjectLimit uint64 uncompressableContentType []string refillMetabase bool mode shardmode.Mode metaCfg struct { path string perm fs.FileMode maxBatchSize int maxBatchDelay time.Duration } subStorages []subStorageCfg gcCfg struct { removerBatchSize int removerSleepInterval time.Duration expiredCollectorBatchSize int expiredCollectorWorkerCount int } writecacheCfg struct { enabled bool path string maxBatchSize int maxBatchDelay time.Duration smallObjectSize uint64 maxObjSize uint64 flushWorkerCount int sizeLimit uint64 noSync bool } piloramaCfg struct { enabled bool path string perm fs.FileMode noSync bool maxBatchSize int maxBatchDelay time.Duration } } // id returns persistent id of a shard. It is different from the ID used in runtime // and is primarily used to identify shards in the configuration. func (c *shardCfg) id() string { // This calculation should be kept in sync with // pkg/local_object_storage/engine/control.go file. var sb strings.Builder for i := range c.subStorages { sb.WriteString(filepath.Clean(c.subStorages[i].path)) } return sb.String() } type subStorageCfg struct { // common for all storages typ string path string perm fs.FileMode depth uint64 noSync bool // blobovnicza-specific size uint64 width uint64 leafWidth uint64 openedCacheSize int initWorkerCount int initInAdvance bool rebuildDropTimeout time.Duration } // readConfig fills applicationConfiguration with raw configuration values // not modifying them. 
type subStorageCfg struct {
	// common for all storages
	typ    string
	path   string
	perm   fs.FileMode
	depth  uint64
	noSync bool

	// blobovnicza-specific
	size               uint64
	width              uint64
	leafWidth          uint64
	openedCacheSize    int
	initWorkerCount    int
	initInAdvance      bool
	rebuildDropTimeout time.Duration
}

// readConfig fills applicationConfiguration with raw configuration values
// without modifying them.
func (a *applicationConfiguration) readConfig(c *config.Config) error {
	if a._read {
		err := c.Reload()
		if err != nil {
			return fmt.Errorf("could not reload configuration: %w", err)
		}

		err = validateConfig(c)
		if err != nil {
			return fmt.Errorf("configuration's validation: %w", err)
		}

		// clear if it is rereading
		cmode := a.cmode
		*a = applicationConfiguration{}
		a.cmode = cmode
	}

	a._read = true
	a.cmode.Store(nodeconfig.CompatibilityMode(c))

	// Logger

	a.LoggerCfg.level = loggerconfig.Level(c)
	a.LoggerCfg.destination = loggerconfig.Destination(c)

	// Storage Engine

	a.EngineCfg.errorThreshold = engineconfig.ShardErrorThreshold(c)
	a.EngineCfg.shardPoolSize = engineconfig.ShardPoolSize(c)
	a.EngineCfg.lowMem = engineconfig.EngineLowMemoryConsumption(c)
	a.EngineCfg.rebuildWorkers = engineconfig.EngineRebuildWorkersCount(c)

	return engineconfig.IterateShards(c, false, func(sc *shardconfig.Config) error {
		return a.updateShardConfig(c, sc)
	})
}

func (a *applicationConfiguration) updateShardConfig(c *config.Config, oldConfig *shardconfig.Config) error {
	var newConfig shardCfg

	newConfig.refillMetabase = oldConfig.RefillMetabase()
	newConfig.mode = oldConfig.Mode()
	newConfig.compress = oldConfig.Compress()
	newConfig.estimateCompressibility = oldConfig.EstimateCompressibility()
	newConfig.estimateCompressibilityThreshold = oldConfig.EstimateCompressibilityThreshold()
	newConfig.uncompressableContentType = oldConfig.UncompressableContentTypes()
	newConfig.smallSizeObjectLimit = oldConfig.SmallSizeLimit()

	a.setShardWriteCacheConfig(&newConfig, oldConfig)

	a.setShardPiloramaConfig(c, &newConfig, oldConfig)

	if err := a.setShardStorageConfig(&newConfig, oldConfig); err != nil {
		return err
	}

	a.setMetabaseConfig(&newConfig, oldConfig)

	a.setGCConfig(&newConfig, oldConfig)

	a.EngineCfg.shards = append(a.EngineCfg.shards, newConfig)

	return nil
}

func (a *applicationConfiguration) setShardWriteCacheConfig(newConfig *shardCfg, oldConfig *shardconfig.Config) {
	writeCacheCfg := oldConfig.WriteCache()
	if writeCacheCfg.Enabled() {
		wc := &newConfig.writecacheCfg

		wc.enabled = true
		wc.path = writeCacheCfg.Path()
		wc.maxBatchSize = writeCacheCfg.BoltDB().MaxBatchSize()
		wc.maxBatchDelay = writeCacheCfg.BoltDB().MaxBatchDelay()
		wc.maxObjSize = writeCacheCfg.MaxObjectSize()
		wc.smallObjectSize = writeCacheCfg.SmallObjectSize()
		wc.flushWorkerCount = writeCacheCfg.WorkerCount()
		wc.sizeLimit = writeCacheCfg.SizeLimit()
		wc.noSync = writeCacheCfg.NoSync()
	}
}

func (a *applicationConfiguration) setShardPiloramaConfig(c *config.Config, newConfig *shardCfg, oldConfig *shardconfig.Config) {
	if config.BoolSafe(c.Sub("tree"), "enabled") {
		piloramaCfg := oldConfig.Pilorama()
		pr := &newConfig.piloramaCfg

		pr.enabled = true
		pr.path = piloramaCfg.Path()
		pr.perm = piloramaCfg.Perm()
		pr.noSync = piloramaCfg.NoSync()
		pr.maxBatchSize = piloramaCfg.MaxBatchSize()
		pr.maxBatchDelay = piloramaCfg.MaxBatchDelay()
	}
}
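// For reference, setShardStorageConfig below consumes the per-shard
// "blobstor" section of the node configuration. A minimal sketch of such a
// section (paths and values are hypothetical; field names follow the
// accessors used below):
//
//	blobstor:
//	  - type: blobovnicza
//	    path: /srv/frostfs/blobovnicza
//	    depth: 1
//	    width: 4
//	  - type: fstree
//	    path: /srv/frostfs/fstree
//	    depth: 4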
func (a *applicationConfiguration) setShardStorageConfig(newConfig *shardCfg, oldConfig *shardconfig.Config) error {
	blobStorCfg := oldConfig.BlobStor()
	storagesCfg := blobStorCfg.Storages()

	ss := make([]subStorageCfg, 0, len(storagesCfg))
	for i := range storagesCfg {
		var sCfg subStorageCfg

		sCfg.typ = storagesCfg[i].Type()
		sCfg.path = storagesCfg[i].Path()
		sCfg.perm = storagesCfg[i].Perm()

		switch storagesCfg[i].Type() {
		case blobovniczatree.Type:
			sub := blobovniczaconfig.From((*config.Config)(storagesCfg[i]))

			sCfg.size = sub.Size()
			sCfg.depth = sub.ShallowDepth()
			sCfg.width = sub.ShallowWidth()
			sCfg.leafWidth = sub.LeafWidth()
			sCfg.openedCacheSize = sub.OpenedCacheSize()
			sCfg.initWorkerCount = sub.InitWorkerCount()
			sCfg.initInAdvance = sub.InitInAdvance()
			sCfg.rebuildDropTimeout = sub.RebuildDropTimeout()
		case fstree.Type:
			sub := fstreeconfig.From((*config.Config)(storagesCfg[i]))
			sCfg.depth = sub.Depth()
			sCfg.noSync = sub.NoSync()
		default:
			return fmt.Errorf("invalid storage type: %s", storagesCfg[i].Type())
		}

		ss = append(ss, sCfg)
	}

	newConfig.subStorages = ss
	return nil
}

func (a *applicationConfiguration) setMetabaseConfig(newConfig *shardCfg, oldConfig *shardconfig.Config) {
	metabaseCfg := oldConfig.Metabase()
	m := &newConfig.metaCfg

	m.path = metabaseCfg.Path()
	m.perm = metabaseCfg.BoltDB().Perm()
	m.maxBatchDelay = metabaseCfg.BoltDB().MaxBatchDelay()
	m.maxBatchSize = metabaseCfg.BoltDB().MaxBatchSize()
}

func (a *applicationConfiguration) setGCConfig(newConfig *shardCfg, oldConfig *shardconfig.Config) {
	gcCfg := oldConfig.GC()
	newConfig.gcCfg.removerBatchSize = gcCfg.RemoverBatchSize()
	newConfig.gcCfg.removerSleepInterval = gcCfg.RemoverSleepInterval()
	newConfig.gcCfg.expiredCollectorBatchSize = gcCfg.ExpiredCollectorBatchSize()
	newConfig.gcCfg.expiredCollectorWorkerCount = gcCfg.ExpiredCollectorWorkerCount()
}

// internals contains application-specific internals that are created
// on application startup and are shared between the components during
// the application life cycle.
// It should not contain any read configuration values, component-specific
// helpers or fields.
type internals struct {
	done        chan struct{}
	ctxCancel   func()
	internalErr chan error // channel for internal application errors at runtime

	appCfg *config.Config

	log *logger.Logger

	wg      sync.WaitGroup
	workers []worker
	closers []closer

	apiVersion   version.Version
	healthStatus *atomic.Int32
	// isMaintenance is set while the node is under maintenance
	isMaintenance atomic.Bool

	sdNotify bool
}

// startMaintenance starts the node's maintenance.
func (c *cfg) startMaintenance() {
	c.isMaintenance.Store(true)
	c.cfgNetmap.state.setControlNetmapStatus(control.NetmapStatus_MAINTENANCE)
	c.log.Info(logs.FrostFSNodeStartedLocalNodesMaintenance)
}

// stopMaintenance stops the node's maintenance.
func (c *internals) stopMaintenance() {
	c.isMaintenance.Store(false)
	c.log.Info(logs.FrostFSNodeStoppedLocalNodesMaintenance)
}

// IsMaintenance checks if the storage node is under maintenance.
//
// Provides util.NodeState to Object service.
func (c *internals) IsMaintenance() bool {
	return c.isMaintenance.Load()
}
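// A rough sketch of how the maintenance flag propagates (assumed from the
// code in this file; the control service wiring lives elsewhere): a control
// RPC switches the node to maintenance via startMaintenance, and the Object
// service then observes the flag through IsMaintenance (util.NodeState)
// until stopMaintenance clears it.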
// shared contains component-specific structs/helpers that should
// be shared during initialization of the application.
type shared struct {
	privateTokenStore sessionStorage
	persistate        *state.PersistentStorage

	clientCache    *cache.ClientCache
	bgClientCache  *cache.ClientCache
	putClientCache *cache.ClientCache
	localAddr      network.AddressGroup

	key            *keys.PrivateKey
	binPublicKey   []byte
	ownerIDFromKey user.ID // user ID calculated from key

	// current network map
	netMap       atomic.Value // type netmap.NetMap
	netMapSource netmapCore.Source

	cnrClient *containerClient.Client

	respSvc *response.Service

	replicator *replicator.Replicator

	treeService *tree.Service

	metricsCollector *metrics.NodeMetrics

	metricsSvc *objectService.MetricCollector
}

// dynamicConfiguration stores parameters of the
// components that support runtime reconfiguration.
type dynamicConfiguration struct {
	logger  *logger.Prm
	pprof   *httpComponent
	metrics *httpComponent
}

type appConfigGuard struct {
	mtx sync.RWMutex
}

func (g *appConfigGuard) LockAppConfigShared() func() {
	g.mtx.RLock()
	return func() { g.mtx.RUnlock() }
}

func (g *appConfigGuard) LockAppConfigExclusive() func() {
	g.mtx.Lock()
	return func() { g.mtx.Unlock() }
}

type cfg struct {
	applicationConfiguration
	internals
	shared
	dynamicConfiguration
	appConfigGuard

	// configuration of the internal
	// services
	cfgGRPC           cfgGRPC
	cfgMorph          cfgMorph
	cfgAccounting     cfgAccounting
	cfgContainer      cfgContainer
	cfgFrostfsID      cfgFrostfsID
	cfgNodeInfo       cfgNodeInfo
	cfgNetmap         cfgNetmap
	cfgControlService cfgControlService
	cfgObject         cfgObject
	cfgNotifications  cfgNotifications
}

// ReadCurrentNetMap reads the network map that has been cached at the
// latest epoch. Returns an error if the value has not been cached yet.
//
// Provides an interface for the NetmapService server.
func (c *cfg) ReadCurrentNetMap(msg *netmapV2.NetMap) error {
	val := c.netMap.Load()
	if val == nil {
		return errors.New("missing local network map")
	}

	val.(netmap.NetMap).WriteToV2(msg)

	return nil
}

type grpcServer struct {
	Listener net.Listener
	Server   *grpc.Server
	Endpoint string
}

type cfgGRPC struct {
	// guard protects connections and handlers
	guard sync.RWMutex
	// servers must be protected with guard
	servers []grpcServer
	// handlers must be protected with guard
	handlers []func(e string, l net.Listener, s *grpc.Server)

	maxChunkSize     uint64
	maxAddrAmount    uint64
	reconnectTimeout time.Duration
}

func (c *cfgGRPC) append(e string, l net.Listener, s *grpc.Server) {
	c.guard.Lock()
	defer c.guard.Unlock()

	c.servers = append(c.servers, grpcServer{
		Listener: l,
		Server:   s,
		Endpoint: e,
	})
}

func (c *cfgGRPC) appendAndHandle(e string, l net.Listener, s *grpc.Server) {
	c.guard.Lock()
	defer c.guard.Unlock()

	c.servers = append(c.servers, grpcServer{
		Listener: l,
		Server:   s,
		Endpoint: e,
	})

	for _, h := range c.handlers {
		h(e, l, s)
	}
}

func (c *cfgGRPC) performAndSave(handler func(e string, l net.Listener, s *grpc.Server)) {
	c.guard.Lock()
	defer c.guard.Unlock()

	for _, conn := range c.servers {
		handler(conn.Endpoint, conn.Listener, conn.Server)
	}

	c.handlers = append(c.handlers, handler)
}

func (c *cfgGRPC) dropConnection(endpoint string) {
	c.guard.Lock()
	defer c.guard.Unlock()

	pos := -1
	for idx, srv := range c.servers {
		if srv.Endpoint == endpoint {
			pos = idx
			break
		}
	}
	if pos < 0 {
		return
	}

	c.servers[pos].Server.Stop() // closes listener
	c.servers = append(c.servers[0:pos], c.servers[pos+1:]...)
}
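// A minimal usage sketch for the helpers above: performAndSave both applies
// a handler to every server registered so far and remembers it, while
// appendAndHandle replays all remembered handlers on a newly added server,
// so registration order does not matter. The caller below is hypothetical:
//
//	c.cfgGRPC.performAndSave(func(e string, l net.Listener, s *grpc.Server) {
//		registerServices(s) // hypothetical registration helper
//	})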
type cfgMorph struct {
	client *client.Client

	notaryEnabled bool

	// TTL of Sidechain cached values. A non-positive value disables caching.
	cacheTTL time.Duration

	proxyScriptHash neogoutil.Uint160
}

type cfgAccounting struct {
	scriptHash neogoutil.Uint160
}

type cfgContainer struct {
	scriptHash neogoutil.Uint160

	parsers     map[event.Type]event.NotificationParser
	subscribers map[event.Type][]event.Handler
	workerPool  util.WorkerPool // pool for asynchronous handlers
}

type cfgFrostfsID struct {
	scriptHash neogoutil.Uint160
}

type cfgNetmap struct {
	scriptHash neogoutil.Uint160
	wrapper    *nmClient.Client

	parsers map[event.Type]event.NotificationParser

	subscribers map[event.Type][]event.Handler
	workerPool  util.WorkerPool // pool for asynchronous handlers

	state *networkState

	needBootstrap       bool
	reBoostrapTurnedOff *atomic.Bool // managed by control service at runtime
	startEpoch          uint64       // epoch number when the application was started
}

type cfgNodeInfo struct {
	// values from config
	localInfo netmap.NodeInfo
}

type cfgObject struct {
	getSvc     *getsvc.Service
	cnrSource  container.Source
	eaclSource container.EACLSource

	cfgAccessPolicyEngine cfgAccessPolicyEngine

	pool cfgObjectRoutines

	cfgLocalStorage cfgLocalStorage

	tombstoneLifetime uint64

	skipSessionTokenIssuerVerification bool
}

type cfgNotifications struct {
	enabled      bool
	nw           notificationWriter
	defaultTopic string
}

type cfgLocalStorage struct {
	localStorage *engine.StorageEngine
}

type cfgAccessPolicyEngine struct {
	policyContractHash neogoutil.Uint160

	accessPolicyEngine *accessPolicyEngine
}

type cfgObjectRoutines struct {
	putRemote *ants.Pool

	putLocal *ants.Pool

	replication *ants.Pool
}

type cfgControlService struct {
	server *grpc.Server
}

var persistateSideChainLastBlockKey = []byte("side_chain_last_processed_block")

func initCfg(appCfg *config.Config) *cfg {
	c := &cfg{
		applicationConfiguration: applicationConfiguration{
			cmode: &atomic.Bool{},
		},
	}

	err := c.readConfig(appCfg)
	if err != nil {
		panic(fmt.Errorf("config reading: %w", err))
	}

	key := nodeconfig.Key(appCfg)

	relayOnly := nodeconfig.Relay(appCfg)

	netState := newNetworkState()

	c.shared = initShared(appCfg, key, netState, relayOnly)

	netState.metrics = c.metricsCollector

	logPrm, err := c.loggerPrm()
	fatalOnErr(err)
	logPrm.SamplingHook = c.metricsCollector.LogMetrics().GetSamplingHook()
	log, err := logger.NewLogger(logPrm)
	fatalOnErr(err)
	if loggerconfig.ToLokiConfig(appCfg).Enabled {
		log.Logger = log.Logger.WithOptions(zap.WrapCore(func(core zapcore.Core) zapcore.Core {
			lokiCore := lokicore.New(core, loggerconfig.ToLokiConfig(appCfg))
			return lokiCore
		}))
	}

	c.internals = initInternals(appCfg, log)

	c.cfgAccounting = cfgAccounting{
		scriptHash: contractsconfig.Balance(appCfg),
	}
	c.cfgContainer = initContainer(appCfg)

	c.cfgFrostfsID = initFrostfsID(appCfg)

	c.cfgNetmap = initNetmap(appCfg, netState, relayOnly)

	c.cfgGRPC = initCfgGRPC()

	c.cfgMorph = cfgMorph{
		proxyScriptHash: contractsconfig.Proxy(appCfg),
	}

	c.cfgObject = initCfgObject(appCfg)

	user.IDFromKey(&c.ownerIDFromKey, key.PrivateKey.PublicKey)

	c.onShutdown(c.clientCache.CloseAll)    // clean up connections
	c.onShutdown(c.bgClientCache.CloseAll)  // clean up connections
	c.onShutdown(c.putClientCache.CloseAll) // clean up connections
	c.onShutdown(func() { _ = c.persistate.Close() })

	return c
}

func initInternals(appCfg *config.Config, log *logger.Logger) internals {
	var healthStatus atomic.Int32
	healthStatus.Store(int32(control.HealthStatus_HEALTH_STATUS_UNDEFINED))

	return internals{
		done:         make(chan struct{}),
		appCfg:       appCfg,
		internalErr:  make(chan error),
		log:          log,
		apiVersion:   version.Current(),
		healthStatus: &healthStatus,
		sdNotify:     initSdNotify(appCfg),
	}
}
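// initSdNotify below reads the "systemdnotify.enabled" flag. A hypothetical
// configuration fragment that would turn the integration on:
//
//	systemdnotify:
//	  enabled: true
//
// When enabled, the process is expected to run under a systemd unit with
// Type=notify so that the notification socket is available to it.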
func initSdNotify(appCfg *config.Config) bool {
	if config.BoolSafe(appCfg.Sub("systemdnotify"), "enabled") {
		fatalOnErr(sdnotify.InitSocket())
		return true
	}
	return false
}

func initShared(appCfg *config.Config, key *keys.PrivateKey, netState *networkState, relayOnly bool) shared {
	var netAddr network.AddressGroup

	if !relayOnly {
		netAddr = nodeconfig.BootstrapAddresses(appCfg)
	}

	persistate, err := state.NewPersistentStorage(nodeconfig.PersistentState(appCfg).Path())
	fatalOnErr(err)

	cacheOpts := cache.ClientCacheOpts{
		DialTimeout:      apiclientconfig.DialTimeout(appCfg),
		StreamTimeout:    apiclientconfig.StreamTimeout(appCfg),
		Key:              &key.PrivateKey,
		AllowExternal:    apiclientconfig.AllowExternal(appCfg),
		ReconnectTimeout: apiclientconfig.ReconnectTimeout(appCfg),
	}

	return shared{
		key:              key,
		binPublicKey:     key.PublicKey().Bytes(),
		localAddr:        netAddr,
		respSvc:          response.NewService(netState),
		clientCache:      cache.NewSDKClientCache(cacheOpts),
		bgClientCache:    cache.NewSDKClientCache(cacheOpts),
		putClientCache:   cache.NewSDKClientCache(cacheOpts),
		persistate:       persistate,
		metricsCollector: metrics.NewNodeMetrics(),
	}
}

func initNetmap(appCfg *config.Config, netState *networkState, relayOnly bool) cfgNetmap {
	netmapWorkerPool, err := ants.NewPool(notificationHandlerPoolSize)
	fatalOnErr(err)

	var reBootstrapTurnedOff atomic.Bool
	reBootstrapTurnedOff.Store(relayOnly)
	return cfgNetmap{
		scriptHash:          contractsconfig.Netmap(appCfg),
		state:               netState,
		workerPool:          netmapWorkerPool,
		needBootstrap:       !relayOnly,
		reBoostrapTurnedOff: &reBootstrapTurnedOff,
	}
}

func initContainer(appCfg *config.Config) cfgContainer {
	containerWorkerPool, err := ants.NewPool(notificationHandlerPoolSize)
	fatalOnErr(err)

	return cfgContainer{
		scriptHash: contractsconfig.Container(appCfg),
		workerPool: containerWorkerPool,
	}
}

func initFrostfsID(appCfg *config.Config) cfgFrostfsID {
	return cfgFrostfsID{
		scriptHash: contractsconfig.FrostfsID(appCfg),
	}
}

func initCfgGRPC() cfgGRPC {
	maxChunkSize := uint64(maxMsgSize) * 3 / 4  // 25% to meta, 75% to payload
	maxAddrAmount := maxChunkSize / addressSize // each address is about 72 bytes

	return cfgGRPC{
		maxChunkSize:  maxChunkSize,
		maxAddrAmount: maxAddrAmount,
	}
}
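// Worked numbers for the limits above, derived from the constants at the top
// of this file: maxMsgSize = 4 MiB = 4194304 bytes, so maxChunkSize =
// 4194304 * 3 / 4 = 3145728 bytes (3 MiB) and maxAddrAmount = 3145728 / 72 =
// 43690 addresses per request.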
func initCfgObject(appCfg *config.Config) cfgObject {
	return cfgObject{
		pool:                               initObjectPool(appCfg),
		tombstoneLifetime:                  objectconfig.TombstoneLifetime(appCfg),
		skipSessionTokenIssuerVerification: objectconfig.Put(appCfg).SkipSessionTokenIssuerVerification(),
	}
}

func (c *cfg) engineOpts() []engine.Option {
	var opts []engine.Option

	opts = append(opts,
		engine.WithShardPoolSize(c.EngineCfg.shardPoolSize),
		engine.WithErrorThreshold(c.EngineCfg.errorThreshold),
		engine.WithLogger(c.log),
		engine.WithLowMemoryConsumption(c.EngineCfg.lowMem),
		engine.WithRebuildWorkersCount(c.EngineCfg.rebuildWorkers),
	)

	if c.metricsCollector != nil {
		opts = append(opts, engine.WithMetrics(c.metricsCollector.Engine()))
	}

	return opts
}

type shardOptsWithID struct {
	configID string
	shOpts   []shard.Option
}

func (c *cfg) shardOpts() []shardOptsWithID {
	shards := make([]shardOptsWithID, 0, len(c.EngineCfg.shards))

	for _, shCfg := range c.EngineCfg.shards {
		shards = append(shards, c.getShardOpts(shCfg))
	}

	return shards
}

func (c *cfg) getWriteCacheOpts(shCfg shardCfg) []writecache.Option {
	var writeCacheOpts []writecache.Option
	if wcRead := shCfg.writecacheCfg; wcRead.enabled {
		writeCacheOpts = append(writeCacheOpts,
			writecache.WithPath(wcRead.path),
			writecache.WithMaxBatchSize(wcRead.maxBatchSize),
			writecache.WithMaxBatchDelay(wcRead.maxBatchDelay),
			writecache.WithMaxObjectSize(wcRead.maxObjSize),
			writecache.WithSmallObjectSize(wcRead.smallObjectSize),
			writecache.WithFlushWorkersCount(wcRead.flushWorkerCount),
			writecache.WithMaxCacheSize(wcRead.sizeLimit),
			writecache.WithNoSync(wcRead.noSync),
			writecache.WithLogger(c.log),
		)
	}
	return writeCacheOpts
}

func (c *cfg) getPiloramaOpts(shCfg shardCfg) []pilorama.Option {
	var piloramaOpts []pilorama.Option
	if prRead := shCfg.piloramaCfg; prRead.enabled {
		piloramaOpts = append(piloramaOpts,
			pilorama.WithPath(prRead.path),
			pilorama.WithPerm(prRead.perm),
			pilorama.WithNoSync(prRead.noSync),
			pilorama.WithMaxBatchSize(prRead.maxBatchSize),
			pilorama.WithMaxBatchDelay(prRead.maxBatchDelay),
		)
		if c.metricsCollector != nil {
			piloramaOpts = append(piloramaOpts, pilorama.WithMetrics(lsmetrics.NewPiloramaMetrics(c.metricsCollector.PiloramaMetrics())))
		}
	}
	return piloramaOpts
}

func (c *cfg) getSubstorageOpts(shCfg shardCfg) []blobstor.SubStorage {
	var ss []blobstor.SubStorage
	for _, sRead := range shCfg.subStorages {
		switch sRead.typ {
		case blobovniczatree.Type:
			blobTreeOpts := []blobovniczatree.Option{
				blobovniczatree.WithRootPath(sRead.path),
				blobovniczatree.WithPermissions(sRead.perm),
				blobovniczatree.WithBlobovniczaSize(sRead.size),
				blobovniczatree.WithBlobovniczaShallowDepth(sRead.depth),
				blobovniczatree.WithBlobovniczaShallowWidth(sRead.width),
				blobovniczatree.WithBlobovniczaLeafWidth(sRead.leafWidth),
				blobovniczatree.WithOpenedCacheSize(sRead.openedCacheSize),
				blobovniczatree.WithInitWorkerCount(sRead.initWorkerCount),
				blobovniczatree.WithInitInAdvance(sRead.initInAdvance),
				blobovniczatree.WithWaitBeforeDropDB(sRead.rebuildDropTimeout),
				blobovniczatree.WithLogger(c.log),
				blobovniczatree.WithObjectSizeLimit(shCfg.smallSizeObjectLimit),
			}

			if c.metricsCollector != nil {
				blobTreeOpts = append(blobTreeOpts,
					blobovniczatree.WithMetrics(
						lsmetrics.NewBlobovniczaTreeMetrics(sRead.path, c.metricsCollector.BlobobvnizcaTreeMetrics()),
					),
				)
			}
			ss = append(ss, blobstor.SubStorage{
				Storage: blobovniczatree.NewBlobovniczaTree(blobTreeOpts...),
				Policy: func(_ *objectSDK.Object, data []byte) bool {
					return uint64(len(data)) < shCfg.smallSizeObjectLimit
				},
			})
		case fstree.Type:
			fstreeOpts := []fstree.Option{
				fstree.WithPath(sRead.path),
				fstree.WithPerm(sRead.perm),
				fstree.WithDepth(sRead.depth),
				fstree.WithNoSync(sRead.noSync),
				fstree.WithLogger(c.log),
			}
			if c.metricsCollector != nil {
				fstreeOpts = append(fstreeOpts,
					fstree.WithMetrics(
						lsmetrics.NewFSTreeMetricsWithoutShardID(sRead.path, c.metricsCollector.FSTree()),
					),
				)
			}

			ss = append(ss, blobstor.SubStorage{
				Storage: fstree.New(fstreeOpts...),
				Policy: func(_ *objectSDK.Object, _ []byte) bool {
					return true
				},
			})
		default:
			// should never happen; unknown storage types
			// are rejected when the config is read
		}
	}
	return ss
}
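// Substorage selection above is policy-driven: an object goes to the first
// substorage whose Policy accepts it. With the typical two-entry layout
// (blobovnicza tree first, fstree second), an object whose payload is
// smaller than smallSizeObjectLimit lands in the blobovnicza tree, and
// everything else falls through to the fstree, whose policy accepts any
// object.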
func (c *cfg) getShardOpts(shCfg shardCfg) shardOptsWithID {
	writeCacheOpts := c.getWriteCacheOpts(shCfg)
	piloramaOpts := c.getPiloramaOpts(shCfg)
	ss := c.getSubstorageOpts(shCfg)

	blobstoreOpts := []blobstor.Option{
		blobstor.WithCompressObjects(shCfg.compress),
		blobstor.WithUncompressableContentTypes(shCfg.uncompressableContentType),
		blobstor.WithCompressibilityEstimate(shCfg.estimateCompressibility),
		blobstor.WithCompressibilityEstimateThreshold(shCfg.estimateCompressibilityThreshold),
		blobstor.WithStorages(ss),
		blobstor.WithLogger(c.log),
	}
	if c.metricsCollector != nil {
		blobstoreOpts = append(blobstoreOpts, blobstor.WithMetrics(lsmetrics.NewBlobstoreMetrics(c.metricsCollector.Blobstore())))
	}

	mbOptions := []meta.Option{
		meta.WithPath(shCfg.metaCfg.path),
		meta.WithPermissions(shCfg.metaCfg.perm),
		meta.WithMaxBatchSize(shCfg.metaCfg.maxBatchSize),
		meta.WithMaxBatchDelay(shCfg.metaCfg.maxBatchDelay),
		meta.WithBoltDBOptions(&bbolt.Options{
			// bound the wait for the metabase file lock;
			// a zero timeout would make bbolt wait indefinitely
			Timeout: 100 * time.Millisecond,
		}),
		meta.WithLogger(c.log),
		meta.WithEpochState(c.cfgNetmap.state),
	}
	if c.metricsCollector != nil {
		mbOptions = append(mbOptions, meta.WithMetrics(lsmetrics.NewMetabaseMetrics(shCfg.metaCfg.path, c.metricsCollector.MetabaseMetrics())))
	}

	var sh shardOptsWithID
	sh.configID = shCfg.id()
	sh.shOpts = []shard.Option{
		shard.WithLogger(c.log),
		shard.WithRefillMetabase(shCfg.refillMetabase),
		shard.WithMode(shCfg.mode),
		shard.WithBlobStorOptions(blobstoreOpts...),
		shard.WithMetaBaseOptions(mbOptions...),
		shard.WithPiloramaOptions(piloramaOpts...),
		shard.WithWriteCache(shCfg.writecacheCfg.enabled),
		shard.WithWriteCacheOptions(writeCacheOpts),
		shard.WithRemoverBatchSize(shCfg.gcCfg.removerBatchSize),
		shard.WithGCRemoverSleepInterval(shCfg.gcCfg.removerSleepInterval),
		shard.WithExpiredCollectorBatchSize(shCfg.gcCfg.expiredCollectorBatchSize),
		shard.WithExpiredCollectorWorkerCount(shCfg.gcCfg.expiredCollectorWorkerCount),
		shard.WithGCWorkerPoolInitializer(func(sz int) util.WorkerPool {
			pool, err := ants.NewPool(sz)
			fatalOnErr(err)
			return pool
		}),
	}
	return sh
}

func (c *cfg) loggerPrm() (*logger.Prm, error) {
	// check if it has been initialized before
	if c.dynamicConfiguration.logger == nil {
		c.dynamicConfiguration.logger = new(logger.Prm)
	}

	// (re)init read configuration
	err := c.dynamicConfiguration.logger.SetLevelString(c.LoggerCfg.level)
	if err != nil {
		// not expected since validation should be performed before
		panic("incorrect log level format: " + c.LoggerCfg.level)
	}
	err = c.dynamicConfiguration.logger.SetDestination(c.LoggerCfg.destination)
	if err != nil {
		// not expected since validation should be performed before
		panic("incorrect log destination format: " + c.LoggerCfg.destination)
	}

	return c.dynamicConfiguration.logger, nil
}

func (c *cfg) LocalAddress() network.AddressGroup {
	return c.localAddr
}
func initLocalStorage(ctx context.Context, c *cfg) {
	ls := engine.New(c.engineOpts()...)

	addNewEpochAsyncNotificationHandler(c, func(ev event.Event) {
		ls.HandleNewEpoch(ctx, ev.(netmap2.NewEpoch).EpochNumber())
	})

	// allocate memory for the service;
	// service will be created later
	c.cfgObject.getSvc = new(getsvc.Service)

	var shardsAttached int
	for _, optsWithMeta := range c.shardOpts() {
		id, err := ls.AddShard(ctx, append(optsWithMeta.shOpts, shard.WithTombstoneSource(c.createTombstoneSource()))...)
		if err != nil {
			c.log.Error(logs.FrostFSNodeFailedToAttachShardToEngine, zap.Error(err))
		} else {
			shardsAttached++
			c.log.Info(logs.FrostFSNodeShardAttachedToEngine, zap.Stringer("id", id))
		}
	}
	if shardsAttached == 0 {
		fatalOnErr(engineconfig.ErrNoShardConfigured)
	}

	c.cfgObject.cfgLocalStorage.localStorage = ls

	c.onShutdown(func() {
		c.log.Info(logs.FrostFSNodeClosingComponentsOfTheStorageEngine)

		err := ls.Close(context.Background())
		if err != nil {
			c.log.Info(logs.FrostFSNodeStorageEngineClosingFailure,
				zap.String("error", err.Error()),
			)
		} else {
			c.log.Info(logs.FrostFSNodeAllComponentsOfTheStorageEngineClosedSuccessfully)
		}
	})
}

func initAccessPolicyEngine(_ context.Context, c *cfg) {
	var localOverrideDB chainbase.LocalOverrideDatabase
	if nodeconfig.PersistentPolicyRules(c.appCfg).Path() == "" {
		c.log.Warn(logs.FrostFSNodePersistentRuleStorageDBPathIsNotSetInmemoryWillBeUsed)
		localOverrideDB = chainbase.NewInmemoryLocalOverrideDatabase()
	} else {
		localOverrideDB = chainbase.NewBoltLocalOverrideDatabase(
			chainbase.WithLogger(c.log),
			chainbase.WithPath(nodeconfig.PersistentPolicyRules(c.appCfg).Path()),
			chainbase.WithPerm(nodeconfig.PersistentPolicyRules(c.appCfg).Perm()),
			chainbase.WithNoSync(nodeconfig.PersistentPolicyRules(c.appCfg).NoSync()),
		)
	}

	var morphRuleStorage policy_engine.MorphRuleChainStorageReader
	morphRuleStorage = policy_client.NewContractStorage(
		client.NewSwitchRPCGuardedActor(c.cfgMorph.client),
		c.cfgObject.cfgAccessPolicyEngine.policyContractHash)

	cacheSize := morphconfig.APEChainCacheSize(c.appCfg)
	if cacheSize > 0 {
		morphRuleStorage = newMorphCache(morphRuleStorage, int(cacheSize), c.cfgMorph.cacheTTL)
	}

	ape := newAccessPolicyEngine(morphRuleStorage, localOverrideDB)
	c.cfgObject.cfgAccessPolicyEngine.accessPolicyEngine = ape

	c.onShutdown(func() {
		if err := ape.LocalOverrideDatabaseCore().Close(); err != nil {
			c.log.Warn(logs.FrostFSNodeAccessPolicyEngineClosingFailure,
				zap.Error(err),
			)
		}
	})
}

func initObjectPool(cfg *config.Config) (pool cfgObjectRoutines) {
	var err error

	optNonBlocking := ants.WithNonblocking(true)

	putRemoteCapacity := objectconfig.Put(cfg).PoolSizeRemote()
	pool.putRemote, err = ants.NewPool(putRemoteCapacity, optNonBlocking)
	fatalOnErr(err)

	putLocalCapacity := objectconfig.Put(cfg).PoolSizeLocal()
	pool.putLocal, err = ants.NewPool(putLocalCapacity, optNonBlocking)
	fatalOnErr(err)

	replicatorPoolSize := replicatorconfig.PoolSize(cfg)
	if replicatorPoolSize <= 0 {
		// fall back to the remote PUT pool capacity
		replicatorPoolSize = putRemoteCapacity
	}
	pool.replication, err = ants.NewPool(replicatorPoolSize)
	fatalOnErr(err)

	return pool
}

func (c *cfg) LocalNodeInfo() (*netmapV2.NodeInfo, error) {
	var res netmapV2.NodeInfo

	ni, ok := c.cfgNetmap.state.getNodeInfo()
	if ok {
		ni.WriteToV2(&res)
	} else {
		c.cfgNodeInfo.localInfo.WriteToV2(&res)
	}

	return &res, nil
}

// handleLocalNodeInfo rewrites the local node info from the FrostFS network map.
// Called with nil when the storage node is outside the FrostFS network map
// (before entering the network and after leaving it).
func (c *cfg) handleLocalNodeInfo(ni *netmap.NodeInfo) {
	c.cfgNetmap.state.setNodeInfo(ni)
}
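// The bootstrap helpers below register the node in the netmap. A rough
// sketch of the flow, assumed from the code in this file: on startup,
// bootstrap() announces either the "online" or the "maintenance" state
// depending on the control status, and handleLocalNodeInfo above later
// receives the resulting network map entry, or nil once the node leaves
// the map.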
// bootstrapWithState calls the "addPeer" method of the Sidechain Netmap contract
// with the binary-encoded information from the current node's configuration.
// The state is set using the provided setter which MUST NOT be nil.
func (c *cfg) bootstrapWithState(stateSetter func(*netmap.NodeInfo)) error {
	ni := c.cfgNodeInfo.localInfo
	stateSetter(&ni)

	prm := nmClient.AddPeerPrm{}
	prm.SetNodeInfo(ni)

	return c.cfgNetmap.wrapper.AddPeer(prm)
}

// bootstrapOnline calls cfg.bootstrapWithState with the "online" state.
func bootstrapOnline(c *cfg) error {
	return c.bootstrapWithState((*netmap.NodeInfo).SetOnline)
}

// bootstrap calls bootstrapWithState with:
//   - "maintenance" state if maintenance is in progress on the current node
//   - "online", otherwise
func (c *cfg) bootstrap() error {
	// switch to online except when under maintenance
	st := c.cfgNetmap.state.controlNetmapStatus()
	if st == control.NetmapStatus_MAINTENANCE {
		c.log.Info(logs.FrostFSNodeBootstrappingWithTheMaintenanceState)
		return c.bootstrapWithState((*netmap.NodeInfo).SetMaintenance)
	}

	c.log.Info(logs.FrostFSNodeBootstrappingWithOnlineState,
		zap.Stringer("previous", st),
	)

	return bootstrapOnline(c)
}

// needBootstrap checks if the local node should be registered in the network on bootup.
func (c *cfg) needBootstrap() bool {
	return c.cfgNetmap.needBootstrap
}

type dCmp struct {
	name       string
	reloadFunc func() error
}

func (c *cfg) signalWatcher(ctx context.Context) {
	ch := make(chan os.Signal, 1)
	signal.Notify(ch, syscall.SIGHUP, syscall.SIGINT, syscall.SIGTERM)

	for {
		select {
		case sig := <-ch:
			switch sig {
			case syscall.SIGHUP:
				c.reloadConfig(ctx)
			case syscall.SIGTERM, syscall.SIGINT:
				c.log.Info(logs.FrostFSNodeTerminationSignalHasBeenReceivedStopping)

				c.shutdown()

				c.log.Info(logs.FrostFSNodeTerminationSignalProcessingIsComplete)
				return
			}
		case err := <-c.internalErr: // internal application error
			c.log.Warn(logs.FrostFSNodeInternalApplicationError,
				zap.String("message", err.Error()))

			c.shutdown()

			c.log.Info(logs.FrostFSNodeInternalErrorProcessingIsComplete)
			return
		}
	}
}
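// Operationally, a live reconfiguration is therefore triggered by sending
// SIGHUP to the process, e.g. (hypothetical invocation):
//
//	kill -HUP "$(pidof frostfs-node)"
//
// while SIGINT/SIGTERM trigger a graceful shutdown.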
func (c *cfg) reloadConfig(ctx context.Context) {
	c.log.Info(logs.FrostFSNodeSIGHUPHasBeenReceivedRereadingConfiguration)

	if !c.compareAndSwapHealthStatus(control.HealthStatus_READY, control.HealthStatus_RECONFIGURING) {
		c.log.Info(logs.FrostFSNodeSIGHUPSkip)
		return
	}
	defer c.compareAndSwapHealthStatus(control.HealthStatus_RECONFIGURING, control.HealthStatus_READY)

	err := c.reloadAppConfig()
	if err != nil {
		c.log.Error(logs.FrostFSNodeConfigurationReading, zap.Error(err))
		return
	}

	// all the components are expected to support
	// the Logger's dynamic reconfiguration approach

	var components []dCmp

	// Logger

	logPrm, err := c.loggerPrm()
	if err != nil {
		c.log.Error(logs.FrostFSNodeLoggerConfigurationPreparation, zap.Error(err))
		return
	}

	components = append(components, dCmp{"logger", logPrm.Reload})
	components = append(components, dCmp{"runtime", func() error {
		setRuntimeParameters(c)
		return nil
	}})
	components = append(components, dCmp{"pools", c.reloadPools})
	components = append(components, dCmp{"tracing", func() error {
		updated, err := tracing.Setup(ctx, *tracingconfig.ToTracingConfig(c.appCfg))
		if updated {
			c.log.Info(logs.FrostFSNodeTracingConfigationUpdated)
		}
		return err
	}})
	if cmp, updated := metricsComponent(c); updated {
		if cmp.enabled {
			cmp.preReload = enableMetricsSvc
		} else {
			cmp.preReload = disableMetricsSvc
		}
		components = append(components, dCmp{cmp.name, func() error { return cmp.reload(ctx) }})
	}
	if cmp, updated := pprofComponent(c); updated {
		components = append(components, dCmp{cmp.name, func() error { return cmp.reload(ctx) }})
	}

	// Storage Engine

	var rcfg engine.ReConfiguration
	for _, optsWithID := range c.shardOpts() {
		rcfg.AddShard(optsWithID.configID, append(optsWithID.shOpts, shard.WithTombstoneSource(c.createTombstoneSource())))
	}

	err = c.cfgObject.cfgLocalStorage.localStorage.Reload(ctx, rcfg)
	if err != nil {
		c.log.Error(logs.FrostFSNodeStorageEngineConfigurationUpdate, zap.Error(err))
		return
	}

	for _, component := range components {
		err = component.reloadFunc()
		if err != nil {
			c.log.Error(logs.FrostFSNodeUpdatedConfigurationApplying,
				zap.String("component", component.name),
				zap.Error(err))
		}
	}

	c.log.Info(logs.FrostFSNodeConfigurationHasBeenReloadedSuccessfully)
}

func (c *cfg) reloadPools() error {
	newSize := objectconfig.Put(c.appCfg).PoolSizeLocal()
	c.reloadPool(c.cfgObject.pool.putLocal, newSize, "object.put.local_pool_size")

	newSize = objectconfig.Put(c.appCfg).PoolSizeRemote()
	c.reloadPool(c.cfgObject.pool.putRemote, newSize, "object.put.remote_pool_size")

	newSize = replicatorconfig.PoolSize(c.appCfg)
	c.reloadPool(c.cfgObject.pool.replication, newSize, "replicator.pool_size")

	return nil
}

func (c *cfg) reloadPool(p *ants.Pool, newSize int, name string) {
	oldSize := p.Cap()
	if oldSize != newSize {
		c.log.Info(logs.FrostFSNodePoolConfigurationUpdate, zap.String("field", name),
			zap.Int("old", oldSize), zap.Int("new", newSize))
		p.Tune(newSize)
	}
}

func (c *cfg) reloadAppConfig() error {
	unlock := c.LockAppConfigExclusive()
	defer unlock()

	return c.readConfig(c.appCfg)
}

func (c *cfg) createTombstoneSource() *tombstone.ExpirationChecker {
	var tssPrm tsourse.TombstoneSourcePrm
	tssPrm.SetGetService(c.cfgObject.getSvc)
	tombstoneSrc := tsourse.NewSource(tssPrm)

	tombstoneSource := tombstone.NewChecker(
		tombstone.WithLogger(c.log),
		tombstone.WithTombstoneSource(tombstoneSrc),
	)

	return tombstoneSource
}

func (c *cfg) shutdown() {
	old := c.swapHealthStatus(control.HealthStatus_SHUTTING_DOWN)
	if old == control.HealthStatus_SHUTTING_DOWN {
		c.log.Info(logs.FrostFSNodeShutdownSkip)
		return
	}
	if old == control.HealthStatus_STARTING {
		c.log.Warn(logs.FrostFSNodeShutdownWhenNotReady)
	}

	c.ctxCancel()
	close(c.done)

	// run closers in reverse registration order (LIFO),
	// so dependents are closed before their dependencies
	for i := range c.closers {
		c.closers[len(c.closers)-1-i].fn()
	}
}
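// Illustration of the LIFO closer order in shutdown (names hypothetical):
// if components were registered during startup as
//
//	c.onShutdown(closeClientCaches)
//	c.onShutdown(closePersistentState)
//
// then shutdown invokes closePersistentState first and closeClientCaches
// last, mirroring the reverse of the startup order.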