frostfs-node/cmd/frostfs-node/config.go
Dmitrii Stepanov b01f05c951
[#1689] shard: Use single qos.Limiter instance for shard and writecache
Before this fix, the shard and the writecache used the same instance of qos.Limiter.
On SIGHUP the shard closed its qos.Limiter but did not update the writecache's pointer.

Now the shard holds an atomic pointer to the qos.Limiter and shares it with the writecache.
On SIGHUP the shard updates the atomic pointer value and closes the old qos.Limiter.

Change-Id: Ic2ab62441d3872e71c5771f54d070e0ca48fe375
Signed-off-by: Dmitrii Stepanov <d.stepanov@yadro.com>
2025-05-06 12:46:48 +03:00
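
A minimal sketch of the shared-limiter approach described above, using hypothetical names (Limiter, limiterHolder) and a stand-in interface rather than the real qos.Limiter or the actual shard/writecache wiring:

package sketch

import "sync/atomic"

// Limiter is a stand-in for the real qos.Limiter interface.
type Limiter interface {
	Close()
}

// limiterHolder shares a single Limiter between a shard and its writecache.
// Both components read through the same atomic pointer, so a limiter
// replacement performed by the shard is immediately visible to the writecache.
type limiterHolder struct {
	ptr atomic.Pointer[Limiter]
}

func newLimiterHolder(l Limiter) *limiterHolder {
	h := &limiterHolder{}
	h.ptr.Store(&l)
	return h
}

// Current returns the limiter to use for the next request.
func (h *limiterHolder) Current() Limiter {
	return *h.ptr.Load()
}

// Replace installs a new limiter and closes the old one (the SIGHUP path).
func (h *limiterHolder) Replace(l Limiter) {
	old := h.ptr.Swap(&l)
	(*old).Close()
}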


package main
import (
"context"
"errors"
"fmt"
"io/fs"
"net"
"os"
"os/signal"
"path/filepath"
"strings"
"sync"
"sync/atomic"
"syscall"
"time"
"git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config"
apiclientconfig "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/apiclient"
"git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/audit"
contractsconfig "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/contracts"
engineconfig "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/engine"
shardconfig "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/engine/shard"
blobovniczaconfig "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/engine/shard/blobstor/blobovnicza"
fstreeconfig "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/engine/shard/blobstor/fstree"
loggerconfig "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/logger"
morphconfig "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/morph"
"git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/multinet"
nodeconfig "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/node"
objectconfig "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/object"
replicatorconfig "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/replicator"
tracingconfig "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/tracing"
treeconfig "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/tree"
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/metrics"
internalNet "git.frostfs.info/TrueCloudLab/frostfs-node/internal/net"
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/qos"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/ape/chainbase"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/container"
frostfsidcore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/frostfsid"
netmapCore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/netmap"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/blobovniczatree"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/compression"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/fstree"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/engine"
meta "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/metabase"
lsmetrics "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/metrics"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/pilorama"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard"
shardmode "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard/mode"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/writecache"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/client"
containerClient "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/client/container"
nmClient "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/client/netmap"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/event"
netmap2 "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/event/netmap"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/network"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/network/cache"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/control"
objectService "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object"
getsvc "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/get"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object_manager/placement"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object_manager/tombstone"
tsourse "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object_manager/tombstone/source"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/replicator"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/tree"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/util/response"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/sdnotify"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/state"
"git.frostfs.info/TrueCloudLab/frostfs-observability/logging/lokicore"
"git.frostfs.info/TrueCloudLab/frostfs-observability/tracing"
"git.frostfs.info/TrueCloudLab/frostfs-qos/limiting"
netmapV2 "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/api/netmap"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap"
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/user"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/version"
policy_engine "git.frostfs.info/TrueCloudLab/policy-engine/pkg/engine"
policy_client "git.frostfs.info/TrueCloudLab/policy-engine/pkg/morph/policy"
"github.com/nspcc-dev/neo-go/pkg/crypto/keys"
neogoutil "github.com/nspcc-dev/neo-go/pkg/util"
"github.com/panjf2000/ants/v2"
"go.etcd.io/bbolt"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
"google.golang.org/grpc"
)
const addressSize = 72 // 32 bytes object ID, 32 bytes container ID, 8 bytes protobuf encoding
const maxMsgSize = 4 << 20 // transport msg limit 4 MiB
// capacity of the pools of the morph notification handlers
// for each contract listener.
const notificationHandlerPoolSize = 10
// applicationConfiguration reads and stores component-specific configuration
// values. It should not store any application helper structs (pointers to shared
// structs).
// It must not be used concurrently.
type applicationConfiguration struct {
// _read indicates whether the config
// has already been read
_read bool
LoggerCfg struct {
level string
destination string
timestamp bool
options []zap.Option
tags [][]string
}
ObjectCfg struct {
tombstoneLifetime uint64
priorityMetrics []placement.Metric
}
EngineCfg struct {
errorThreshold uint32
shards []shardCfg
lowMem bool
}
// whether the node should run in compatibility mode with other versions
cmode *atomic.Bool
}
type shardCfg struct {
compression compression.Config
smallSizeObjectLimit uint64
refillMetabase bool
refillMetabaseWorkersCount int
mode shardmode.Mode
limiter qos.Limiter
metaCfg struct {
path string
perm fs.FileMode
maxBatchSize int
maxBatchDelay time.Duration
}
subStorages []subStorageCfg
gcCfg struct {
removerBatchSize int
removerSleepInterval time.Duration
expiredCollectorBatchSize int
expiredCollectorWorkerCount int
}
writecacheCfg struct {
enabled bool
path string
maxObjSize uint64
flushWorkerCount int
sizeLimit uint64
countLimit uint64
noSync bool
flushSizeLimit uint64
}
piloramaCfg struct {
enabled bool
path string
perm fs.FileMode
noSync bool
maxBatchSize int
maxBatchDelay time.Duration
}
}
// id returns the persistent id of a shard. It is different from the ID used at runtime
// and is primarily used to identify shards in the configuration.
func (c *shardCfg) id() string {
// This calculation should be kept in sync with
// pkg/local_object_storage/engine/control.go file.
var sb strings.Builder
for i := range c.subStorages {
sb.WriteString(filepath.Clean(c.subStorages[i].path))
}
return sb.String()
}
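// For example (hypothetical paths), a shard with substorages at
// "/srv/blobovnicza/" and "/srv/fstree" gets the persistent id
// "/srv/blobovnicza/srv/fstree": each path is cleaned and the results
// are concatenated in configuration order.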
type subStorageCfg struct {
// common for all storages
typ string
path string
perm fs.FileMode
depth uint64
noSync bool
// blobovnicza-specific
size uint64
width uint64
openedCacheSize int
initWorkerCount int
rebuildDropTimeout time.Duration
openedCacheTTL time.Duration
openedCacheExpInterval time.Duration
}
// readConfig fills applicationConfiguration with raw configuration values
// without modifying them.
func (a *applicationConfiguration) readConfig(c *config.Config) error {
if a._read {
err := c.Reload()
if err != nil {
return fmt.Errorf("could not reload configuration: %w", err)
}
err = validateConfig(c)
if err != nil {
return fmt.Errorf("configuration's validation: %w", err)
}
// reset the struct when rereading
cmode := a.cmode
*a = applicationConfiguration{}
a.cmode = cmode
}
a._read = true
a.cmode.Store(nodeconfig.CompatibilityMode(c))
// Logger
a.LoggerCfg.level = loggerconfig.Level(c)
a.LoggerCfg.destination = loggerconfig.Destination(c)
a.LoggerCfg.timestamp = loggerconfig.Timestamp(c)
var opts []zap.Option
if loggerconfig.ToLokiConfig(c).Enabled {
opts = []zap.Option{zap.WrapCore(func(core zapcore.Core) zapcore.Core {
lokiCore := lokicore.New(core, loggerconfig.ToLokiConfig(c))
return lokiCore
})}
}
a.LoggerCfg.options = opts
a.LoggerCfg.tags = loggerconfig.Tags(c)
// Object
a.ObjectCfg.tombstoneLifetime = objectconfig.TombstoneLifetime(c)
locodeDBPath := nodeconfig.LocodeDBPath(c)
parser, err := placement.NewMetricsParser(locodeDBPath)
if err != nil {
return fmt.Errorf("metrics parser creation: %w", err)
}
m, err := parser.ParseMetrics(objectconfig.Get(c).Priority())
if err != nil {
return fmt.Errorf("parse metrics: %w", err)
}
a.ObjectCfg.priorityMetrics = m
// Storage Engine
a.EngineCfg.errorThreshold = engineconfig.ShardErrorThreshold(c)
a.EngineCfg.lowMem = engineconfig.EngineLowMemoryConsumption(c)
return engineconfig.IterateShards(c, false, func(sc *shardconfig.Config) error { return a.updateShardConfig(c, sc) })
}
func (a *applicationConfiguration) updateShardConfig(c *config.Config, source *shardconfig.Config) error {
var target shardCfg
target.refillMetabase = source.RefillMetabase()
target.refillMetabaseWorkersCount = source.RefillMetabaseWorkersCount()
target.mode = source.Mode()
target.compression = source.Compression()
target.smallSizeObjectLimit = source.SmallSizeLimit()
a.setShardWriteCacheConfig(&target, source)
a.setShardPiloramaConfig(c, &target, source)
if err := a.setShardStorageConfig(&target, source); err != nil {
return err
}
a.setMetabaseConfig(&target, source)
a.setGCConfig(&target, source)
if err := a.setLimiter(&target, source); err != nil {
return err
}
a.EngineCfg.shards = append(a.EngineCfg.shards, target)
return nil
}
func (a *applicationConfiguration) setShardWriteCacheConfig(target *shardCfg, source *shardconfig.Config) {
writeCacheCfg := source.WriteCache()
if writeCacheCfg.Enabled() {
wc := &target.writecacheCfg
wc.enabled = true
wc.path = writeCacheCfg.Path()
wc.maxObjSize = writeCacheCfg.MaxObjectSize()
wc.flushWorkerCount = writeCacheCfg.WorkerCount()
wc.sizeLimit = writeCacheCfg.SizeLimit()
wc.countLimit = writeCacheCfg.CountLimit()
wc.noSync = writeCacheCfg.NoSync()
wc.flushSizeLimit = writeCacheCfg.MaxFlushingObjectsSize()
}
}
func (a *applicationConfiguration) setShardPiloramaConfig(c *config.Config, target *shardCfg, source *shardconfig.Config) {
if config.BoolSafe(c.Sub("tree"), "enabled") {
piloramaCfg := source.Pilorama()
pr := &target.piloramaCfg
pr.enabled = true
pr.path = piloramaCfg.Path()
pr.perm = piloramaCfg.Perm()
pr.noSync = piloramaCfg.NoSync()
pr.maxBatchSize = piloramaCfg.MaxBatchSize()
pr.maxBatchDelay = piloramaCfg.MaxBatchDelay()
}
}
func (a *applicationConfiguration) setShardStorageConfig(target *shardCfg, source *shardconfig.Config) error {
blobStorCfg := source.BlobStor()
storagesCfg := blobStorCfg.Storages()
ss := make([]subStorageCfg, 0, len(storagesCfg))
for i := range storagesCfg {
var sCfg subStorageCfg
sCfg.typ = storagesCfg[i].Type()
sCfg.path = storagesCfg[i].Path()
sCfg.perm = storagesCfg[i].Perm()
switch storagesCfg[i].Type() {
case blobovniczatree.Type:
sub := blobovniczaconfig.From((*config.Config)(storagesCfg[i]))
sCfg.size = sub.Size()
sCfg.depth = sub.ShallowDepth()
sCfg.width = sub.ShallowWidth()
sCfg.openedCacheSize = sub.OpenedCacheSize()
sCfg.openedCacheTTL = sub.OpenedCacheTTL()
sCfg.openedCacheExpInterval = sub.OpenedCacheExpInterval()
sCfg.initWorkerCount = sub.InitWorkerCount()
sCfg.rebuildDropTimeout = sub.RebuildDropTimeout()
case fstree.Type:
sub := fstreeconfig.From((*config.Config)(storagesCfg[i]))
sCfg.depth = sub.Depth()
sCfg.noSync = sub.NoSync()
default:
return fmt.Errorf("invalid storage type: %s", storagesCfg[i].Type())
}
ss = append(ss, sCfg)
}
target.subStorages = ss
return nil
}
func (a *applicationConfiguration) setMetabaseConfig(target *shardCfg, source *shardconfig.Config) {
metabaseCfg := source.Metabase()
m := &target.metaCfg
m.path = metabaseCfg.Path()
m.perm = metabaseCfg.BoltDB().Perm()
m.maxBatchDelay = metabaseCfg.BoltDB().MaxBatchDelay()
m.maxBatchSize = metabaseCfg.BoltDB().MaxBatchSize()
}
func (a *applicationConfiguration) setGCConfig(target *shardCfg, source *shardconfig.Config) {
gcCfg := source.GC()
target.gcCfg.removerBatchSize = gcCfg.RemoverBatchSize()
target.gcCfg.removerSleepInterval = gcCfg.RemoverSleepInterval()
target.gcCfg.expiredCollectorBatchSize = gcCfg.ExpiredCollectorBatchSize()
target.gcCfg.expiredCollectorWorkerCount = gcCfg.ExpiredCollectorWorkerCount()
}
func (a *applicationConfiguration) setLimiter(target *shardCfg, source *shardconfig.Config) error {
limitsConfig := source.Limits().ToConfig()
limiter, err := qos.NewLimiter(limitsConfig)
if err != nil {
return err
}
target.limiter = limiter
return nil
}
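// Note: the limiter stored here is handed to the shard via shard.WithLimiter
// in getShardOpts; when metrics are enabled, it also receives the QoS metrics
// collector there.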
// internals contains application-specific internals that are created
// on application startup and are shared between the components during
// the application life cycle.
// It should not contain any read configuration values, component-specific
// helpers, or fields.
type internals struct {
done chan struct{}
ctxCancel func()
internalErr chan error // channel for internal application errors at runtime
appCfg *config.Config
log *logger.Logger
wg sync.WaitGroup
workers []worker
closers []closer
apiVersion version.Version
healthStatus *atomic.Int32
// is node under maintenance
isMaintenance atomic.Bool
audit *atomic.Bool
sdNotify bool
}
// starts node's maintenance.
func (c *cfg) startMaintenance(ctx context.Context) {
c.isMaintenance.Store(true)
c.cfgNetmap.state.setControlNetmapStatus(control.NetmapStatus_MAINTENANCE)
c.log.Info(ctx, logs.FrostFSNodeStartedLocalNodesMaintenance)
}
// stops node's maintenance.
func (c *internals) stopMaintenance(ctx context.Context) {
if c.isMaintenance.CompareAndSwap(true, false) {
c.log.Info(ctx, logs.FrostFSNodeStoppedLocalNodesMaintenance)
}
}
// IsMaintenance checks if storage node is under maintenance.
//
// Provides util.NodeState to Object service.
func (c *internals) IsMaintenance() bool {
return c.isMaintenance.Load()
}
// shared contains component-specific structs/helpers that should
// be shared during initialization of the application.
type shared struct {
privateTokenStore sessionStorage
persistate *state.PersistentStorage
clientCache *cache.ClientCache
bgClientCache *cache.ClientCache
putClientCache *cache.ClientCache
localAddr network.AddressGroup
key *keys.PrivateKey
binPublicKey []byte
ownerIDFromKey user.ID // user ID calculated from key
// current network map
netMap atomic.Value // type netmap.NetMap
netMapSource netmapCore.Source
cnrClient *containerClient.Client
frostfsidClient frostfsidcore.SubjectProvider
respSvc *response.Service
replicator *replicator.Replicator
treeService *tree.Service
metricsCollector *metrics.NodeMetrics
metricsSvc *objectService.MetricCollector
dialerSource *internalNet.DialerSource
}
// dynamicConfiguration stores parameters of the
// components that support runtime reconfiguration.
type dynamicConfiguration struct {
pprof *httpComponent
metrics *httpComponent
}
type appConfigGuard struct {
mtx sync.RWMutex
}
func (g *appConfigGuard) LockAppConfigShared() func() {
g.mtx.RLock()
return func() { g.mtx.RUnlock() }
}
func (g *appConfigGuard) LockAppConfigExclusive() func() {
g.mtx.Lock()
return func() { g.mtx.Unlock() }
}
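// Typical usage, as in reloadAppConfig below: the caller holds the lock for
// the duration of a config read, e.g.
//
//	unlock := c.LockAppConfigExclusive()
//	defer unlock()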
type cfg struct {
applicationConfiguration
internals
shared
dynamicConfiguration
appConfigGuard
// configuration of the internal
// services
cfgGRPC cfgGRPC
cfgMorph cfgMorph
cfgAccounting cfgAccounting
cfgContainer cfgContainer
cfgFrostfsID cfgFrostfsID
cfgNodeInfo cfgNodeInfo
cfgNetmap cfgNetmap
cfgControlService cfgControlService
cfgObject cfgObject
cfgQoSService cfgQoSService
}
// ReadCurrentNetMap reads the network map cached at the
// latest epoch. Returns an error if no value has been cached yet.
//
// Provides interface for NetmapService server.
func (c *cfg) ReadCurrentNetMap(msg *netmapV2.NetMap) error {
val := c.netMap.Load()
if val == nil {
return errors.New("missing local network map")
}
val.(netmap.NetMap).WriteToV2(msg)
return nil
}
type grpcServer struct {
Listener net.Listener
Server *grpc.Server
Endpoint string
}
type cfgGRPC struct {
// guard protects connections and handlers
guard sync.RWMutex
// servers must be protected with guard
servers []grpcServer
// handlers must be protected with guard
handlers []func(e string, l net.Listener, s *grpc.Server)
maxChunkSize uint64
maxAddrAmount uint64
reconnectTimeout time.Duration
limiter atomic.Pointer[limiting.SemaphoreLimiter]
}
func (c *cfgGRPC) append(e string, l net.Listener, s *grpc.Server) {
c.guard.Lock()
defer c.guard.Unlock()
c.servers = append(c.servers, grpcServer{
Listener: l,
Server: s,
Endpoint: e,
})
}
func (c *cfgGRPC) appendAndHandle(e string, l net.Listener, s *grpc.Server) {
c.guard.Lock()
defer c.guard.Unlock()
c.servers = append(c.servers, grpcServer{
Listener: l,
Server: s,
Endpoint: e,
})
for _, h := range c.handlers {
h(e, l, s)
}
}
func (c *cfgGRPC) performAndSave(handler func(e string, l net.Listener, s *grpc.Server)) {
c.guard.Lock()
defer c.guard.Unlock()
for _, conn := range c.servers {
handler(conn.Endpoint, conn.Listener, conn.Server)
}
c.handlers = append(c.handlers, handler)
}
func (c *cfgGRPC) dropConnection(endpoint string) {
c.guard.Lock()
defer c.guard.Unlock()
pos := -1
for idx, srv := range c.servers {
if srv.Endpoint == endpoint {
pos = idx
break
}
}
if pos < 0 {
return
}
c.servers[pos].Server.Stop() // closes listener
c.servers = append(c.servers[0:pos], c.servers[pos+1:]...)
}
type cfgMorph struct {
initialized bool
guard sync.Mutex
client *client.Client
// TTL of Sidechain cached values. Non-positive value disables caching.
cacheTTL time.Duration
containerCacheSize uint32
proxyScriptHash neogoutil.Uint160
}
type cfgAccounting struct {
scriptHash neogoutil.Uint160
}
type cfgContainer struct {
scriptHash neogoutil.Uint160
parsers map[event.Type]event.NotificationParser
subscribers map[event.Type][]event.Handler
workerPool util.WorkerPool // pool for asynchronous handlers
containerBatchSize uint32
}
type cfgFrostfsID struct {
scriptHash neogoutil.Uint160
}
type cfgNetmap struct {
scriptHash neogoutil.Uint160
wrapper *nmClient.Client
parsers map[event.Type]event.NotificationParser
subscribers map[event.Type][]event.Handler
workerPool util.WorkerPool // pool for asynchronous handlers
state *networkState
reBoostrapTurnedOff *atomic.Bool // managed by control service in runtime
}
type cfgNodeInfo struct {
// values from config
localInfo netmap.NodeInfo
}
type cfgObject struct {
getSvc *getsvc.Service
cnrSource container.Source
cfgAccessPolicyEngine cfgAccessPolicyEngine
pool cfgObjectRoutines
cfgLocalStorage cfgLocalStorage
tombstoneLifetime *atomic.Uint64
skipSessionTokenIssuerVerification bool
}
type cfgLocalStorage struct {
localStorage *engine.StorageEngine
}
type cfgAccessPolicyEngine struct {
policyContractHash neogoutil.Uint160
accessPolicyEngine *accessPolicyEngine
}
type cfgObjectRoutines struct {
replication *ants.Pool
}
type cfgControlService struct {
server *grpc.Server
}
var persistateSideChainLastBlockKey = []byte("side_chain_last_processed_block")
func initCfg(appCfg *config.Config) *cfg {
c := &cfg{
applicationConfiguration: applicationConfiguration{
cmode: &atomic.Bool{},
},
}
err := c.readConfig(appCfg)
if err != nil {
panic(fmt.Errorf("config reading: %w", err))
}
key := nodeconfig.Key(appCfg)
netState := newNetworkState()
c.shared = initShared(appCfg, key, netState)
netState.metrics = c.metricsCollector
logPrm, err := c.loggerPrm()
fatalOnErr(err)
logPrm.SamplingHook = c.metricsCollector.LogMetrics().GetSamplingHook()
log, err := logger.NewLogger(logPrm)
fatalOnErr(err)
logger.UpdateLevelForTags(logPrm)
c.internals = initInternals(appCfg, log)
c.cfgAccounting = cfgAccounting{
scriptHash: contractsconfig.Balance(appCfg),
}
c.cfgContainer = initContainer(appCfg)
c.cfgFrostfsID = initFrostfsID(appCfg)
c.cfgNetmap = initNetmap(appCfg, netState)
c.cfgGRPC = initCfgGRPC()
c.cfgMorph = cfgMorph{
proxyScriptHash: contractsconfig.Proxy(appCfg),
}
c.cfgObject = initCfgObject(appCfg)
user.IDFromKey(&c.ownerIDFromKey, key.PrivateKey.PublicKey)
c.onShutdown(c.clientCache.CloseAll) // clean up connections
c.onShutdown(c.bgClientCache.CloseAll) // clean up connections
c.onShutdown(c.putClientCache.CloseAll) // clean up connections
c.onShutdown(func() { _ = c.persistate.Close() })
return c
}
func initInternals(appCfg *config.Config, log *logger.Logger) internals {
var healthStatus atomic.Int32
healthStatus.Store(int32(control.HealthStatus_HEALTH_STATUS_UNDEFINED))
var auditRequests atomic.Bool
auditRequests.Store(audit.Enabled(appCfg))
return internals{
done: make(chan struct{}),
appCfg: appCfg,
internalErr: make(chan error),
log: log,
apiVersion: version.Current(),
healthStatus: &healthStatus,
sdNotify: initSdNotify(appCfg),
audit: &auditRequests,
}
}
func initSdNotify(appCfg *config.Config) bool {
if config.BoolSafe(appCfg.Sub("systemdnotify"), "enabled") {
fatalOnErr(sdnotify.InitSocket())
return true
}
return false
}
func initShared(appCfg *config.Config, key *keys.PrivateKey, netState *networkState) shared {
netAddr := nodeconfig.BootstrapAddresses(appCfg)
persistate, err := state.NewPersistentStorage(nodeconfig.PersistentState(appCfg).Path())
fatalOnErr(err)
nodeMetrics := metrics.NewNodeMetrics()
ds, err := internalNet.NewDialerSource(internalNetConfig(appCfg, nodeMetrics.MultinetMetrics()))
fatalOnErr(err)
cacheOpts := cache.ClientCacheOpts{
DialTimeout: apiclientconfig.DialTimeout(appCfg),
StreamTimeout: apiclientconfig.StreamTimeout(appCfg),
Key: &key.PrivateKey,
AllowExternal: apiclientconfig.AllowExternal(appCfg),
ReconnectTimeout: apiclientconfig.ReconnectTimeout(appCfg),
DialerSource: ds,
}
return shared{
key: key,
binPublicKey: key.PublicKey().Bytes(),
localAddr: netAddr,
respSvc: response.NewService(netState),
clientCache: cache.NewSDKClientCache(cacheOpts),
bgClientCache: cache.NewSDKClientCache(cacheOpts),
putClientCache: cache.NewSDKClientCache(cacheOpts),
persistate: persistate,
metricsCollector: nodeMetrics,
dialerSource: ds,
}
}
func internalNetConfig(appCfg *config.Config, m metrics.MultinetMetrics) internalNet.Config {
result := internalNet.Config{
Enabled: multinet.Enabled(appCfg),
Balancer: multinet.Balancer(appCfg),
Restrict: multinet.Restrict(appCfg),
FallbackDelay: multinet.FallbackDelay(appCfg),
Metrics: m,
}
sn := multinet.Subnets(appCfg)
for _, s := range sn {
result.Subnets = append(result.Subnets, internalNet.Subnet{
Prefix: s.Mask,
SourceIPs: s.SourceIPs,
})
}
return result
}
func initNetmap(appCfg *config.Config, netState *networkState) cfgNetmap {
netmapWorkerPool, err := ants.NewPool(notificationHandlerPoolSize)
fatalOnErr(err)
return cfgNetmap{
scriptHash: contractsconfig.Netmap(appCfg),
state: netState,
workerPool: netmapWorkerPool,
reBoostrapTurnedOff: &atomic.Bool{},
}
}
func initContainer(appCfg *config.Config) cfgContainer {
containerWorkerPool, err := ants.NewPool(notificationHandlerPoolSize)
fatalOnErr(err)
return cfgContainer{
scriptHash: contractsconfig.Container(appCfg),
workerPool: containerWorkerPool,
}
}
func initFrostfsID(appCfg *config.Config) cfgFrostfsID {
return cfgFrostfsID{
scriptHash: contractsconfig.FrostfsID(appCfg),
}
}
func initCfgGRPC() (cfg cfgGRPC) {
maxChunkSize := uint64(maxMsgSize) * 3 / 4 // 25% to meta, 75% to payload
maxAddrAmount := maxChunkSize / addressSize // each address is about 72 bytes
cfg.maxChunkSize = maxChunkSize
cfg.maxAddrAmount = maxAddrAmount
return
}
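// Worked example from the constants above: maxMsgSize is 4 MiB (4,194,304 bytes),
// so maxChunkSize = 4,194,304 * 3 / 4 = 3,145,728 bytes, and
// maxAddrAmount = 3,145,728 / 72 = 43,690 addresses per message (integer division).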
func initCfgObject(appCfg *config.Config) cfgObject {
var tsLifetime atomic.Uint64
tsLifetime.Store(objectconfig.TombstoneLifetime(appCfg))
return cfgObject{
pool: initObjectPool(appCfg),
tombstoneLifetime: &tsLifetime,
skipSessionTokenIssuerVerification: objectconfig.Put(appCfg).SkipSessionTokenIssuerVerification(),
}
}
func (c *cfg) engineOpts() []engine.Option {
var opts []engine.Option
opts = append(opts,
engine.WithErrorThreshold(c.EngineCfg.errorThreshold),
engine.WithLogger(c.log.WithTag(logger.TagEngine)),
engine.WithLowMemoryConsumption(c.EngineCfg.lowMem),
)
if c.metricsCollector != nil {
opts = append(opts, engine.WithMetrics(c.metricsCollector.Engine()))
}
return opts
}
type shardOptsWithID struct {
configID string
shOpts []shard.Option
}
func (c *cfg) shardOpts(ctx context.Context) []shardOptsWithID {
shards := make([]shardOptsWithID, 0, len(c.EngineCfg.shards))
for _, shCfg := range c.EngineCfg.shards {
shards = append(shards, c.getShardOpts(ctx, shCfg))
}
return shards
}
func (c *cfg) getWriteCacheOpts(shCfg shardCfg) []writecache.Option {
var writeCacheOpts []writecache.Option
if wcRead := shCfg.writecacheCfg; wcRead.enabled {
writeCacheOpts = append(writeCacheOpts,
writecache.WithPath(wcRead.path),
writecache.WithFlushSizeLimit(wcRead.flushSizeLimit),
writecache.WithMaxObjectSize(wcRead.maxObjSize),
writecache.WithFlushWorkersCount(wcRead.flushWorkerCount),
writecache.WithMaxCacheSize(wcRead.sizeLimit),
writecache.WithMaxCacheCount(wcRead.countLimit),
writecache.WithNoSync(wcRead.noSync),
writecache.WithLogger(c.log.WithTag(logger.TagWriteCache)),
)
}
return writeCacheOpts
}
func (c *cfg) getPiloramaOpts(shCfg shardCfg) []pilorama.Option {
var piloramaOpts []pilorama.Option
if prRead := shCfg.piloramaCfg; prRead.enabled {
piloramaOpts = append(piloramaOpts,
pilorama.WithPath(prRead.path),
pilorama.WithPerm(prRead.perm),
pilorama.WithNoSync(prRead.noSync),
pilorama.WithMaxBatchSize(prRead.maxBatchSize),
pilorama.WithMaxBatchDelay(prRead.maxBatchDelay),
)
if c.metricsCollector != nil {
piloramaOpts = append(piloramaOpts, pilorama.WithMetrics(lsmetrics.NewPiloramaMetrics(c.metricsCollector.PiloramaMetrics())))
}
}
return piloramaOpts
}
func (c *cfg) getSubstorageOpts(ctx context.Context, shCfg shardCfg) []blobstor.SubStorage {
var ss []blobstor.SubStorage
for _, sRead := range shCfg.subStorages {
switch sRead.typ {
case blobovniczatree.Type:
blobTreeOpts := []blobovniczatree.Option{
blobovniczatree.WithRootPath(sRead.path),
blobovniczatree.WithPermissions(sRead.perm),
blobovniczatree.WithBlobovniczaSize(sRead.size),
blobovniczatree.WithBlobovniczaShallowDepth(sRead.depth),
blobovniczatree.WithBlobovniczaShallowWidth(sRead.width),
blobovniczatree.WithOpenedCacheSize(sRead.openedCacheSize),
blobovniczatree.WithOpenedCacheTTL(sRead.openedCacheTTL),
blobovniczatree.WithOpenedCacheExpInterval(sRead.openedCacheExpInterval),
blobovniczatree.WithInitWorkerCount(sRead.initWorkerCount),
blobovniczatree.WithWaitBeforeDropDB(sRead.rebuildDropTimeout),
blobovniczatree.WithBlobovniczaLogger(c.log.WithTag(logger.TagBlobovnicza)),
blobovniczatree.WithBlobovniczaTreeLogger(c.log.WithTag(logger.TagBlobovniczaTree)),
blobovniczatree.WithObjectSizeLimit(shCfg.smallSizeObjectLimit),
}
if c.metricsCollector != nil {
blobTreeOpts = append(blobTreeOpts,
blobovniczatree.WithMetrics(
lsmetrics.NewBlobovniczaTreeMetrics(sRead.path, c.metricsCollector.BlobobvnizcaTreeMetrics()),
),
)
}
ss = append(ss, blobstor.SubStorage{
Storage: blobovniczatree.NewBlobovniczaTree(ctx, blobTreeOpts...),
Policy: func(_ *objectSDK.Object, data []byte) bool {
return uint64(len(data)) < shCfg.smallSizeObjectLimit
},
})
case fstree.Type:
fstreeOpts := []fstree.Option{
fstree.WithPath(sRead.path),
fstree.WithPerm(sRead.perm),
fstree.WithDepth(sRead.depth),
fstree.WithNoSync(sRead.noSync),
fstree.WithLogger(c.log.WithTag(logger.TagFSTree)),
}
if c.metricsCollector != nil {
fstreeOpts = append(fstreeOpts,
fstree.WithMetrics(
lsmetrics.NewFSTreeMetricsWithoutShardID(sRead.path, c.metricsCollector.FSTree()),
),
)
}
ss = append(ss, blobstor.SubStorage{
Storage: fstree.New(fstreeOpts...),
Policy: func(_ *objectSDK.Object, _ []byte) bool {
return true
},
})
default:
// should never happen: this case has already
// been handled when the config was read
}
}
return ss
}
func (c *cfg) getShardOpts(ctx context.Context, shCfg shardCfg) shardOptsWithID {
writeCacheOpts := c.getWriteCacheOpts(shCfg)
piloramaOpts := c.getPiloramaOpts(shCfg)
ss := c.getSubstorageOpts(ctx, shCfg)
blobstoreOpts := []blobstor.Option{
blobstor.WithCompression(shCfg.compression),
blobstor.WithStorages(ss),
blobstor.WithLogger(c.log.WithTag(logger.TagBlobstor)),
}
if c.metricsCollector != nil {
blobstoreOpts = append(blobstoreOpts, blobstor.WithMetrics(lsmetrics.NewBlobstoreMetrics(c.metricsCollector.Blobstore())))
}
mbOptions := []meta.Option{
meta.WithPath(shCfg.metaCfg.path),
meta.WithPermissions(shCfg.metaCfg.perm),
meta.WithMaxBatchSize(shCfg.metaCfg.maxBatchSize),
meta.WithMaxBatchDelay(shCfg.metaCfg.maxBatchDelay),
meta.WithBoltDBOptions(&bbolt.Options{
Timeout: 100 * time.Millisecond,
}),
meta.WithLogger(c.log),
meta.WithEpochState(c.cfgNetmap.state),
}
if c.metricsCollector != nil {
mbOptions = append(mbOptions, meta.WithMetrics(lsmetrics.NewMetabaseMetrics(shCfg.metaCfg.path, c.metricsCollector.MetabaseMetrics())))
shCfg.limiter.SetMetrics(c.metricsCollector.QoSMetrics())
}
var sh shardOptsWithID
sh.configID = shCfg.id()
sh.shOpts = []shard.Option{
shard.WithLogger(c.log.WithTag(logger.TagShard)),
shard.WithRefillMetabase(shCfg.refillMetabase),
shard.WithRefillMetabaseWorkersCount(shCfg.refillMetabaseWorkersCount),
shard.WithMode(shCfg.mode),
shard.WithBlobStorOptions(blobstoreOpts...),
shard.WithMetaBaseOptions(mbOptions...),
shard.WithPiloramaOptions(piloramaOpts...),
shard.WithWriteCache(shCfg.writecacheCfg.enabled),
shard.WithWriteCacheOptions(writeCacheOpts),
shard.WithRemoverBatchSize(shCfg.gcCfg.removerBatchSize),
shard.WithGCRemoverSleepInterval(shCfg.gcCfg.removerSleepInterval),
shard.WithExpiredCollectorBatchSize(shCfg.gcCfg.expiredCollectorBatchSize),
shard.WithExpiredCollectorWorkerCount(shCfg.gcCfg.expiredCollectorWorkerCount),
shard.WithGCWorkerPoolInitializer(func(sz int) util.WorkerPool {
pool, err := ants.NewPool(sz)
fatalOnErr(err)
return pool
}),
shard.WithLimiter(shCfg.limiter),
}
return sh
}
func (c *cfg) loggerPrm() (logger.Prm, error) {
var prm logger.Prm
// (re)init read configuration
err := prm.SetLevelString(c.LoggerCfg.level)
if err != nil {
// not expected since validation should be performed before
return logger.Prm{}, errors.New("incorrect log level format: " + c.LoggerCfg.level)
}
err = prm.SetDestination(c.LoggerCfg.destination)
if err != nil {
// not expected since validation should be performed before
return logger.Prm{}, errors.New("incorrect log destination format: " + c.LoggerCfg.destination)
}
prm.PrependTimestamp = c.LoggerCfg.timestamp
prm.Options = c.LoggerCfg.options
err = prm.SetTags(c.LoggerCfg.tags)
if err != nil {
// not expected since validation should be performed before
return logger.Prm{}, errors.New("incorrect allowed tags format: " + c.LoggerCfg.destination)
}
return prm, nil
}
func (c *cfg) LocalAddress() network.AddressGroup {
return c.localAddr
}
func initLocalStorage(ctx context.Context, c *cfg) {
ls := engine.New(c.engineOpts()...)
addNewEpochAsyncNotificationHandler(c, func(ctx context.Context, ev event.Event) {
ls.HandleNewEpoch(ctx, ev.(netmap2.NewEpoch).EpochNumber())
})
// allocate memory for the service;
// service will be created later
c.cfgObject.getSvc = new(getsvc.Service)
var shardsAttached int
for _, optsWithMeta := range c.shardOpts(ctx) {
id, err := ls.AddShard(ctx, append(optsWithMeta.shOpts,
shard.WithTombstoneSource(c.createTombstoneSource()),
shard.WithContainerInfoProvider(c.createContainerInfoProvider(ctx)))...)
if err != nil {
c.log.Error(ctx, logs.FrostFSNodeFailedToAttachShardToEngine, zap.Error(err))
} else {
shardsAttached++
c.log.Info(ctx, logs.FrostFSNodeShardAttachedToEngine, zap.Stringer("id", id))
}
}
if shardsAttached == 0 {
fatalOnErr(engineconfig.ErrNoShardConfigured)
}
c.cfgObject.cfgLocalStorage.localStorage = ls
c.onShutdown(func() {
c.log.Info(ctx, logs.FrostFSNodeClosingComponentsOfTheStorageEngine)
err := ls.Close(context.WithoutCancel(ctx))
if err != nil {
c.log.Info(ctx, logs.FrostFSNodeStorageEngineClosingFailure,
zap.Error(err),
)
} else {
c.log.Info(ctx, logs.FrostFSNodeAllComponentsOfTheStorageEngineClosedSuccessfully)
}
})
}
func initAccessPolicyEngine(ctx context.Context, c *cfg) {
var localOverrideDB chainbase.LocalOverrideDatabase
if nodeconfig.PersistentPolicyRules(c.appCfg).Path() == "" {
c.log.Warn(ctx, logs.FrostFSNodePersistentRuleStorageDBPathIsNotSetInmemoryWillBeUsed)
localOverrideDB = chainbase.NewInmemoryLocalOverrideDatabase()
} else {
localOverrideDB = chainbase.NewBoltLocalOverrideDatabase(
chainbase.WithPath(nodeconfig.PersistentPolicyRules(c.appCfg).Path()),
chainbase.WithPerm(nodeconfig.PersistentPolicyRules(c.appCfg).Perm()),
chainbase.WithNoSync(nodeconfig.PersistentPolicyRules(c.appCfg).NoSync()),
)
}
var morphRuleStorage policy_engine.MorphRuleChainStorageReader
morphRuleStorage = policy_client.NewContractStorage(
client.NewSwitchRPCGuardedActor(c.cfgMorph.client),
c.cfgObject.cfgAccessPolicyEngine.policyContractHash)
cacheSize := morphconfig.APEChainCacheSize(c.appCfg)
if cacheSize > 0 && c.cfgMorph.cacheTTL > 0 {
morphRuleStorage = newMorphCache(morphRuleStorage, int(cacheSize), c.cfgMorph.cacheTTL)
}
ape := newAccessPolicyEngine(morphRuleStorage, localOverrideDB)
c.cfgObject.cfgAccessPolicyEngine.accessPolicyEngine = ape
c.onShutdown(func() {
if err := ape.LocalOverrideDatabaseCore().Close(); err != nil {
c.log.Warn(ctx, logs.FrostFSNodeAccessPolicyEngineClosingFailure,
zap.Error(err),
)
}
})
}
func initObjectPool(cfg *config.Config) (pool cfgObjectRoutines) {
var err error
replicatorPoolSize := replicatorconfig.PoolSize(cfg)
pool.replication, err = ants.NewPool(replicatorPoolSize)
fatalOnErr(err)
return pool
}
func (c *cfg) LocalNodeInfo() *netmap.NodeInfo {
var res netmap.NodeInfo
ni, ok := c.cfgNetmap.state.getNodeInfo()
if ok {
res = ni
} else {
res = c.cfgNodeInfo.localInfo
}
return &res
}
// setContractNodeInfo rewrites local node info from the FrostFS network map.
// Called with nil when storage node is outside the FrostFS network map
// (before entering the network and after leaving it).
func (c *cfg) setContractNodeInfo(ni *netmap.NodeInfo) {
c.cfgNetmap.state.setNodeInfo(ni)
}
func (c *cfg) updateContractNodeInfo(ctx context.Context, epoch uint64) {
ni, err := c.netmapLocalNodeState(ctx, epoch)
if err != nil {
c.log.Error(ctx, logs.FrostFSNodeCouldNotUpdateNodeStateOnNewEpoch,
zap.Uint64("epoch", epoch),
zap.Error(err))
return
}
c.setContractNodeInfo(ni)
}
// bootstrapWithState calls "addPeer" method of the Sidechain Netmap contract
// with the binary-encoded information from the current node's configuration.
// The state is set using the provided setter which MUST NOT be nil.
func (c *cfg) bootstrapWithState(ctx context.Context, state netmap.NodeState) error {
ni := c.cfgNodeInfo.localInfo
ni.SetStatus(state)
prm := nmClient.AddPeerPrm{}
prm.SetNodeInfo(ni)
return c.cfgNetmap.wrapper.AddPeer(ctx, prm)
}
// bootstrapOnline calls cfg.bootstrapWithState with "online" state.
func bootstrapOnline(ctx context.Context, c *cfg) error {
return c.bootstrapWithState(ctx, netmap.Online)
}
// bootstrap calls bootstrapWithState with:
// - "maintenance" state if maintenance is in progress on the current node
// - "online", otherwise
func (c *cfg) bootstrap(ctx context.Context) error {
// switch to online except when under maintenance
st := c.cfgNetmap.state.controlNetmapStatus()
if st == control.NetmapStatus_MAINTENANCE {
c.log.Info(ctx, logs.FrostFSNodeBootstrappingWithTheMaintenanceState)
return c.bootstrapWithState(ctx, netmap.Maintenance)
}
c.log.Info(ctx, logs.FrostFSNodeBootstrappingWithOnlineState,
zap.Stringer("previous", st),
)
return bootstrapOnline(ctx, c)
}
type dCmp struct {
name string
reloadFunc func() error
}
func (c *cfg) signalWatcher(ctx context.Context) {
ch := make(chan os.Signal, 1)
signal.Notify(ch, syscall.SIGINT, syscall.SIGTERM)
sighupCh := make(chan os.Signal, 1)
signal.Notify(sighupCh, syscall.SIGHUP)
for {
select {
// signals causing the application to shut down should take priority
// over the reconfiguration signal
case <-ch:
c.log.Info(ctx, logs.FrostFSNodeTerminationSignalHasBeenReceivedStopping)
c.shutdown(ctx)
c.log.Info(ctx, logs.FrostFSNodeTerminationSignalProcessingIsComplete)
return
case err := <-c.internalErr: // internal application error
c.log.Warn(ctx, logs.FrostFSNodeInternalApplicationError,
zap.String("message", err.Error()))
c.shutdown(ctx)
c.log.Info(ctx, logs.FrostFSNodeInternalErrorProcessingIsComplete)
return
default:
// block until any signal is received
select {
case <-sighupCh:
c.reloadConfig(ctx)
case <-ch:
c.log.Info(ctx, logs.FrostFSNodeTerminationSignalHasBeenReceivedStopping)
c.shutdown(ctx)
c.log.Info(ctx, logs.FrostFSNodeTerminationSignalProcessingIsComplete)
return
case err := <-c.internalErr: // internal application error
c.log.Warn(ctx, logs.FrostFSNodeInternalApplicationError,
zap.String("message", err.Error()))
c.shutdown(ctx)
c.log.Info(ctx, logs.FrostFSNodeInternalErrorProcessingIsComplete)
return
}
}
}
}
func (c *cfg) reloadConfig(ctx context.Context) {
c.log.Info(ctx, logs.FrostFSNodeSIGHUPHasBeenReceivedRereadingConfiguration)
if !c.compareAndSwapHealthStatus(ctx, control.HealthStatus_READY, control.HealthStatus_RECONFIGURING) {
c.log.Info(ctx, logs.FrostFSNodeSIGHUPSkip)
return
}
defer c.compareAndSwapHealthStatus(ctx, control.HealthStatus_RECONFIGURING, control.HealthStatus_READY)
err := c.reloadAppConfig()
if err != nil {
c.log.Error(ctx, logs.FrostFSNodeConfigurationReading, zap.Error(err))
return
}
// all the components are expected to support
// Logger's dynamic reconfiguration approach
components := c.getComponents(ctx)
// Object
c.cfgObject.tombstoneLifetime.Store(c.ObjectCfg.tombstoneLifetime)
// Storage Engine
var rcfg engine.ReConfiguration
for _, optsWithID := range c.shardOpts(ctx) {
rcfg.AddShard(optsWithID.configID, append(optsWithID.shOpts,
shard.WithTombstoneSource(c.createTombstoneSource()),
shard.WithContainerInfoProvider(c.createContainerInfoProvider(ctx)),
))
}
err = c.cfgObject.cfgLocalStorage.localStorage.Reload(ctx, rcfg)
if err != nil {
c.log.Error(ctx, logs.FrostFSNodeStorageEngineConfigurationUpdate, zap.Error(err))
return
}
for _, component := range components {
err = component.reloadFunc()
if err != nil {
c.log.Error(ctx, logs.FrostFSNodeUpdatedConfigurationApplying,
zap.String("component", component.name),
zap.Error(err))
}
}
if err := c.dialerSource.Update(internalNetConfig(c.appCfg, c.metricsCollector.MultinetMetrics())); err != nil {
c.log.Error(ctx, logs.FailedToUpdateMultinetConfiguration, zap.Error(err))
return
}
c.log.Info(ctx, logs.FrostFSNodeConfigurationHasBeenReloadedSuccessfully)
}
func (c *cfg) getComponents(ctx context.Context) []dCmp {
var components []dCmp
components = append(components, dCmp{"logger", func() error {
prm, err := c.loggerPrm()
if err != nil {
return err
}
logger.UpdateLevelForTags(prm)
return nil
}})
components = append(components, dCmp{"runtime", func() error {
setRuntimeParameters(ctx, c)
return nil
}})
components = append(components, dCmp{"audit", func() error {
c.audit.Store(audit.Enabled(c.appCfg))
return nil
}})
components = append(components, dCmp{"pools", c.reloadPools})
components = append(components, dCmp{"tracing", func() error {
traceConfig, err := tracingconfig.ToTracingConfig(c.appCfg)
if err != nil {
return err
}
updated, err := tracing.Setup(ctx, *traceConfig)
if updated {
c.log.Info(ctx, logs.FrostFSNodeTracingConfigationUpdated)
}
return err
}})
if c.treeService != nil {
components = append(components, dCmp{"tree", func() error {
c.treeService.ReloadAuthorizedKeys(treeconfig.Tree(c.appCfg).AuthorizedKeys())
return nil
}})
}
if cmp, updated := metricsComponent(c); updated {
if cmp.enabled {
cmp.preReload = enableMetricsSvc
} else {
cmp.preReload = disableMetricsSvc
}
components = append(components, dCmp{cmp.name, func() error { return cmp.reload(ctx) }})
}
if cmp, updated := pprofComponent(c); updated {
components = append(components, dCmp{cmp.name, func() error { return cmp.reload(ctx) }})
}
components = append(components, dCmp{"rpc_limiter", func() error { return initRPCLimiter(c) }})
return components
}
func (c *cfg) reloadPools() error {
newSize := replicatorconfig.PoolSize(c.appCfg)
c.reloadPool(c.cfgObject.pool.replication, newSize, "replicator.pool_size")
return nil
}
func (c *cfg) reloadPool(p *ants.Pool, newSize int, name string) {
oldSize := p.Cap()
if oldSize != newSize {
c.log.Info(context.Background(), logs.FrostFSNodePoolConfigurationUpdate, zap.String("field", name),
zap.Int("old", oldSize), zap.Int("new", newSize))
p.Tune(newSize)
}
}
func (c *cfg) reloadAppConfig() error {
unlock := c.LockAppConfigExclusive()
defer unlock()
return c.readConfig(c.appCfg)
}
func (c *cfg) createTombstoneSource() *tombstone.ExpirationChecker {
var tssPrm tsourse.TombstoneSourcePrm
tssPrm.SetGetService(c.cfgObject.getSvc)
tombstoneSrc := tsourse.NewSource(tssPrm)
tombstoneSource := tombstone.NewChecker(
tombstone.WithLogger(c.log),
tombstone.WithTombstoneSource(tombstoneSrc),
)
return tombstoneSource
}
func (c *cfg) createContainerInfoProvider(ctx context.Context) container.InfoProvider {
return container.NewInfoProvider(func() (container.Source, error) {
c.initMorphComponents(ctx)
cc, err := containerClient.NewFromMorph(c.cfgMorph.client, c.cfgContainer.scriptHash, 0)
if err != nil {
return nil, err
}
return containerClient.AsContainerSource(cc), nil
})
}
func (c *cfg) shutdown(ctx context.Context) {
old := c.swapHealthStatus(ctx, control.HealthStatus_SHUTTING_DOWN)
if old == control.HealthStatus_SHUTTING_DOWN {
c.log.Info(ctx, logs.FrostFSNodeShutdownSkip)
return
}
if old == control.HealthStatus_STARTING {
c.log.Warn(ctx, logs.FrostFSNodeShutdownWhenNotReady)
}
c.ctxCancel()
close(c.done)
for i := range c.closers {
c.closers[len(c.closers)-1-i].fn()
}
if err := sdnotify.ClearStatus(); err != nil {
c.log.Error(ctx, logs.FailedToReportStatusToSystemd, zap.Error(err))
}
}