frostfs-node/cmd/frostfs-node/config.go
Dmitrii Stepanov e515dd4582
[#1444] config: Fix data race on morph component init
It could be called for every shard on metabase resync concurrently and
it is possible to get state with initialized client but not initialized
contract hashes.

Signed-off-by: Dmitrii Stepanov <d.stepanov@yadro.com>
2024-10-23 10:41:36 +03:00

1489 lines
44 KiB
Go

package main
import (
"context"
"errors"
"fmt"
"io/fs"
"net"
"os"
"os/signal"
"path/filepath"
"strings"
"sync"
"sync/atomic"
"syscall"
"time"
netmapV2 "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/netmap"
"git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config"
apiclientconfig "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/apiclient"
"git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/audit"
contractsconfig "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/contracts"
engineconfig "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/engine"
shardconfig "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/engine/shard"
blobovniczaconfig "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/engine/shard/blobstor/blobovnicza"
fstreeconfig "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/engine/shard/blobstor/fstree"
loggerconfig "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/logger"
morphconfig "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/morph"
"git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/multinet"
nodeconfig "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/node"
objectconfig "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/object"
replicatorconfig "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/replicator"
tracingconfig "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/tracing"
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/metrics"
internalNet "git.frostfs.info/TrueCloudLab/frostfs-node/internal/net"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/ape/chainbase"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/container"
frostfsidcore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/frostfsid"
netmapCore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/netmap"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/blobovniczatree"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/fstree"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/engine"
meta "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/metabase"
lsmetrics "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/metrics"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/pilorama"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard"
shardmode "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard/mode"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/writecache"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/client"
containerClient "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/client/container"
nmClient "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/client/netmap"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/event"
netmap2 "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/event/netmap"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/network"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/network/cache"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/control"
objectService "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object"
getsvc "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/get"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object_manager/tombstone"
tsourse "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object_manager/tombstone/source"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/replicator"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/tree"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/util/response"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/sdnotify"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/state"
"git.frostfs.info/TrueCloudLab/frostfs-observability/logging/lokicore"
"git.frostfs.info/TrueCloudLab/frostfs-observability/tracing"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap"
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/user"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/version"
policy_engine "git.frostfs.info/TrueCloudLab/policy-engine/pkg/engine"
policy_client "git.frostfs.info/TrueCloudLab/policy-engine/pkg/morph/policy"
"github.com/nspcc-dev/neo-go/pkg/crypto/keys"
neogoutil "github.com/nspcc-dev/neo-go/pkg/util"
"github.com/panjf2000/ants/v2"
"go.etcd.io/bbolt"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
"google.golang.org/grpc"
)
const addressSize = 72 // 32 bytes object ID, 32 bytes container ID, 8 bytes protobuf encoding
const maxMsgSize = 4 << 20 // transport msg limit 4 MiB
// capacity of the pools of the morph notification handlers
// for each contract listener.
const notificationHandlerPoolSize = 10
// applicationConfiguration reads and stores component-specific configuration
// values. It should not store any application helpers structs (pointers to shared
// structs).
// It must not be used concurrently.
type applicationConfiguration struct {
// _read indicated whether a config
// has already been read
_read bool
LoggerCfg struct {
level string
destination string
timestamp bool
}
ObjectCfg struct {
tombstoneLifetime uint64
}
EngineCfg struct {
errorThreshold uint32
shardPoolSize uint32
shards []shardCfg
lowMem bool
}
// if need to run node in compatibility with other versions mode
cmode *atomic.Bool
}
type shardCfg struct {
compress bool
estimateCompressibility bool
estimateCompressibilityThreshold float64
smallSizeObjectLimit uint64
uncompressableContentType []string
refillMetabase bool
refillMetabaseWorkersCount int
mode shardmode.Mode
metaCfg struct {
path string
perm fs.FileMode
maxBatchSize int
maxBatchDelay time.Duration
}
subStorages []subStorageCfg
gcCfg struct {
removerBatchSize int
removerSleepInterval time.Duration
expiredCollectorBatchSize int
expiredCollectorWorkerCount int
}
writecacheCfg struct {
enabled bool
path string
maxObjSize uint64
flushWorkerCount int
sizeLimit uint64
countLimit uint64
noSync bool
flushSizeLimit uint64
}
piloramaCfg struct {
enabled bool
path string
perm fs.FileMode
noSync bool
maxBatchSize int
maxBatchDelay time.Duration
}
}
// id returns persistent id of a shard. It is different from the ID used in runtime
// and is primarily used to identify shards in the configuration.
func (c *shardCfg) id() string {
// This calculation should be kept in sync with
// pkg/local_object_storage/engine/control.go file.
var sb strings.Builder
for i := range c.subStorages {
sb.WriteString(filepath.Clean(c.subStorages[i].path))
}
return sb.String()
}
type subStorageCfg struct {
// common for all storages
typ string
path string
perm fs.FileMode
depth uint64
noSync bool
// blobovnicza-specific
size uint64
width uint64
openedCacheSize int
initWorkerCount int
rebuildDropTimeout time.Duration
openedCacheTTL time.Duration
openedCacheExpInterval time.Duration
}
// readConfig fills applicationConfiguration with raw configuration values
// not modifying them.
func (a *applicationConfiguration) readConfig(c *config.Config) error {
if a._read {
err := c.Reload()
if err != nil {
return fmt.Errorf("could not reload configuration: %w", err)
}
err = validateConfig(c)
if err != nil {
return fmt.Errorf("configuration's validation: %w", err)
}
// clear if it is rereading
cmode := a.cmode
*a = applicationConfiguration{}
a.cmode = cmode
}
a._read = true
a.cmode.Store(nodeconfig.CompatibilityMode(c))
// Logger
a.LoggerCfg.level = loggerconfig.Level(c)
a.LoggerCfg.destination = loggerconfig.Destination(c)
a.LoggerCfg.timestamp = loggerconfig.Timestamp(c)
// Object
a.ObjectCfg.tombstoneLifetime = objectconfig.TombstoneLifetime(c)
// Storage Engine
a.EngineCfg.errorThreshold = engineconfig.ShardErrorThreshold(c)
a.EngineCfg.shardPoolSize = engineconfig.ShardPoolSize(c)
a.EngineCfg.lowMem = engineconfig.EngineLowMemoryConsumption(c)
return engineconfig.IterateShards(c, false, func(sc *shardconfig.Config) error { return a.updateShardConfig(c, sc) })
}
func (a *applicationConfiguration) updateShardConfig(c *config.Config, oldConfig *shardconfig.Config) error {
var newConfig shardCfg
newConfig.refillMetabase = oldConfig.RefillMetabase()
newConfig.refillMetabaseWorkersCount = oldConfig.RefillMetabaseWorkersCount()
newConfig.mode = oldConfig.Mode()
newConfig.compress = oldConfig.Compress()
newConfig.estimateCompressibility = oldConfig.EstimateCompressibility()
newConfig.estimateCompressibilityThreshold = oldConfig.EstimateCompressibilityThreshold()
newConfig.uncompressableContentType = oldConfig.UncompressableContentTypes()
newConfig.smallSizeObjectLimit = oldConfig.SmallSizeLimit()
a.setShardWriteCacheConfig(&newConfig, oldConfig)
a.setShardPiloramaConfig(c, &newConfig, oldConfig)
if err := a.setShardStorageConfig(&newConfig, oldConfig); err != nil {
return err
}
a.setMetabaseConfig(&newConfig, oldConfig)
a.setGCConfig(&newConfig, oldConfig)
a.EngineCfg.shards = append(a.EngineCfg.shards, newConfig)
return nil
}
func (a *applicationConfiguration) setShardWriteCacheConfig(newConfig *shardCfg, oldConfig *shardconfig.Config) {
writeCacheCfg := oldConfig.WriteCache()
if writeCacheCfg.Enabled() {
wc := &newConfig.writecacheCfg
wc.enabled = true
wc.path = writeCacheCfg.Path()
wc.maxObjSize = writeCacheCfg.MaxObjectSize()
wc.flushWorkerCount = writeCacheCfg.WorkerCount()
wc.sizeLimit = writeCacheCfg.SizeLimit()
wc.countLimit = writeCacheCfg.CountLimit()
wc.noSync = writeCacheCfg.NoSync()
wc.flushSizeLimit = writeCacheCfg.MaxFlushingObjectsSize()
}
}
func (a *applicationConfiguration) setShardPiloramaConfig(c *config.Config, newConfig *shardCfg, oldConfig *shardconfig.Config) {
if config.BoolSafe(c.Sub("tree"), "enabled") {
piloramaCfg := oldConfig.Pilorama()
pr := &newConfig.piloramaCfg
pr.enabled = true
pr.path = piloramaCfg.Path()
pr.perm = piloramaCfg.Perm()
pr.noSync = piloramaCfg.NoSync()
pr.maxBatchSize = piloramaCfg.MaxBatchSize()
pr.maxBatchDelay = piloramaCfg.MaxBatchDelay()
}
}
func (a *applicationConfiguration) setShardStorageConfig(newConfig *shardCfg, oldConfig *shardconfig.Config) error {
blobStorCfg := oldConfig.BlobStor()
storagesCfg := blobStorCfg.Storages()
ss := make([]subStorageCfg, 0, len(storagesCfg))
for i := range storagesCfg {
var sCfg subStorageCfg
sCfg.typ = storagesCfg[i].Type()
sCfg.path = storagesCfg[i].Path()
sCfg.perm = storagesCfg[i].Perm()
switch storagesCfg[i].Type() {
case blobovniczatree.Type:
sub := blobovniczaconfig.From((*config.Config)(storagesCfg[i]))
sCfg.size = sub.Size()
sCfg.depth = sub.ShallowDepth()
sCfg.width = sub.ShallowWidth()
sCfg.openedCacheSize = sub.OpenedCacheSize()
sCfg.openedCacheTTL = sub.OpenedCacheTTL()
sCfg.openedCacheExpInterval = sub.OpenedCacheExpInterval()
sCfg.initWorkerCount = sub.InitWorkerCount()
sCfg.rebuildDropTimeout = sub.RebuildDropTimeout()
case fstree.Type:
sub := fstreeconfig.From((*config.Config)(storagesCfg[i]))
sCfg.depth = sub.Depth()
sCfg.noSync = sub.NoSync()
default:
return fmt.Errorf("invalid storage type: %s", storagesCfg[i].Type())
}
ss = append(ss, sCfg)
}
newConfig.subStorages = ss
return nil
}
func (a *applicationConfiguration) setMetabaseConfig(newConfig *shardCfg, oldConfig *shardconfig.Config) {
metabaseCfg := oldConfig.Metabase()
m := &newConfig.metaCfg
m.path = metabaseCfg.Path()
m.perm = metabaseCfg.BoltDB().Perm()
m.maxBatchDelay = metabaseCfg.BoltDB().MaxBatchDelay()
m.maxBatchSize = metabaseCfg.BoltDB().MaxBatchSize()
}
func (a *applicationConfiguration) setGCConfig(newConfig *shardCfg, oldConfig *shardconfig.Config) {
gcCfg := oldConfig.GC()
newConfig.gcCfg.removerBatchSize = gcCfg.RemoverBatchSize()
newConfig.gcCfg.removerSleepInterval = gcCfg.RemoverSleepInterval()
newConfig.gcCfg.expiredCollectorBatchSize = gcCfg.ExpiredCollectorBatchSize()
newConfig.gcCfg.expiredCollectorWorkerCount = gcCfg.ExpiredCollectorWorkerCount()
}
// internals contains application-specific internals that are created
// on application startup and are shared b/w the components during
// the application life cycle.
// It should not contain any read configuration values, component-specific
// helpers and fields.
type internals struct {
done chan struct{}
ctxCancel func()
internalErr chan error // channel for internal application errors at runtime
appCfg *config.Config
log *logger.Logger
wg sync.WaitGroup
workers []worker
closers []closer
apiVersion version.Version
healthStatus *atomic.Int32
// is node under maintenance
isMaintenance atomic.Bool
audit *atomic.Bool
sdNotify bool
}
// starts node's maintenance.
func (c *cfg) startMaintenance() {
c.isMaintenance.Store(true)
c.cfgNetmap.state.setControlNetmapStatus(control.NetmapStatus_MAINTENANCE)
c.log.Info(logs.FrostFSNodeStartedLocalNodesMaintenance)
}
// stops node's maintenance.
func (c *internals) stopMaintenance() {
if c.isMaintenance.CompareAndSwap(true, false) {
c.log.Info(logs.FrostFSNodeStoppedLocalNodesMaintenance)
}
}
// IsMaintenance checks if storage node is under maintenance.
//
// Provides util.NodeState to Object service.
func (c *internals) IsMaintenance() bool {
return c.isMaintenance.Load()
}
// shared contains component-specific structs/helpers that should
// be shared during initialization of the application.
type shared struct {
privateTokenStore sessionStorage
persistate *state.PersistentStorage
clientCache *cache.ClientCache
bgClientCache *cache.ClientCache
putClientCache *cache.ClientCache
localAddr network.AddressGroup
key *keys.PrivateKey
binPublicKey []byte
ownerIDFromKey user.ID // user ID calculated from key
// current network map
netMap atomic.Value // type netmap.NetMap
netMapSource netmapCore.Source
cnrClient *containerClient.Client
frostfsidClient frostfsidcore.SubjectProvider
respSvc *response.Service
replicator *replicator.Replicator
treeService *tree.Service
metricsCollector *metrics.NodeMetrics
metricsSvc *objectService.MetricCollector
dialerSource *internalNet.DialerSource
}
// dynamicConfiguration stores parameters of the
// components that supports runtime reconfigurations.
type dynamicConfiguration struct {
logger *logger.Prm
pprof *httpComponent
metrics *httpComponent
}
type appConfigGuard struct {
mtx sync.RWMutex
}
func (g *appConfigGuard) LockAppConfigShared() func() {
g.mtx.RLock()
return func() { g.mtx.RUnlock() }
}
func (g *appConfigGuard) LockAppConfigExclusive() func() {
g.mtx.Lock()
return func() { g.mtx.Unlock() }
}
type cfg struct {
applicationConfiguration
internals
shared
dynamicConfiguration
appConfigGuard
// configuration of the internal
// services
cfgGRPC cfgGRPC
cfgMorph cfgMorph
cfgAccounting cfgAccounting
cfgContainer cfgContainer
cfgFrostfsID cfgFrostfsID
cfgNodeInfo cfgNodeInfo
cfgNetmap cfgNetmap
cfgControlService cfgControlService
cfgObject cfgObject
}
// ReadCurrentNetMap reads network map which has been cached at the
// latest epoch. Returns an error if value has not been cached yet.
//
// Provides interface for NetmapService server.
func (c *cfg) ReadCurrentNetMap(msg *netmapV2.NetMap) error {
val := c.netMap.Load()
if val == nil {
return errors.New("missing local network map")
}
val.(netmap.NetMap).WriteToV2(msg)
return nil
}
type grpcServer struct {
Listener net.Listener
Server *grpc.Server
Endpoint string
}
type cfgGRPC struct {
// guard protects connections and handlers
guard sync.RWMutex
// servers must be protected with guard
servers []grpcServer
// handlers must be protected with guard
handlers []func(e string, l net.Listener, s *grpc.Server)
maxChunkSize uint64
maxAddrAmount uint64
reconnectTimeout time.Duration
}
func (c *cfgGRPC) append(e string, l net.Listener, s *grpc.Server) {
c.guard.Lock()
defer c.guard.Unlock()
c.servers = append(c.servers, grpcServer{
Listener: l,
Server: s,
Endpoint: e,
})
}
func (c *cfgGRPC) appendAndHandle(e string, l net.Listener, s *grpc.Server) {
c.guard.Lock()
defer c.guard.Unlock()
c.servers = append(c.servers, grpcServer{
Listener: l,
Server: s,
Endpoint: e,
})
for _, h := range c.handlers {
h(e, l, s)
}
}
func (c *cfgGRPC) performAndSave(handler func(e string, l net.Listener, s *grpc.Server)) {
c.guard.Lock()
defer c.guard.Unlock()
for _, conn := range c.servers {
handler(conn.Endpoint, conn.Listener, conn.Server)
}
c.handlers = append(c.handlers, handler)
}
func (c *cfgGRPC) dropConnection(endpoint string) {
c.guard.Lock()
defer c.guard.Unlock()
pos := -1
for idx, srv := range c.servers {
if srv.Endpoint == endpoint {
pos = idx
break
}
}
if pos < 0 {
return
}
c.servers[pos].Server.Stop() // closes listener
c.servers = append(c.servers[0:pos], c.servers[pos+1:]...)
}
type cfgMorph struct {
initialized bool
guard sync.Mutex
client *client.Client
notaryEnabled bool
// TTL of Sidechain cached values. Non-positive value disables caching.
cacheTTL time.Duration
containerCacheSize uint32
proxyScriptHash neogoutil.Uint160
}
type cfgAccounting struct {
scriptHash neogoutil.Uint160
}
type cfgContainer struct {
scriptHash neogoutil.Uint160
parsers map[event.Type]event.NotificationParser
subscribers map[event.Type][]event.Handler
workerPool util.WorkerPool // pool for asynchronous handlers
}
type cfgFrostfsID struct {
scriptHash neogoutil.Uint160
}
type cfgNetmap struct {
scriptHash neogoutil.Uint160
wrapper *nmClient.Client
parsers map[event.Type]event.NotificationParser
subscribers map[event.Type][]event.Handler
workerPool util.WorkerPool // pool for asynchronous handlers
state *networkState
needBootstrap bool
reBoostrapTurnedOff *atomic.Bool // managed by control service in runtime
}
type cfgNodeInfo struct {
// values from config
localInfo netmap.NodeInfo
}
type cfgObject struct {
getSvc *getsvc.Service
cnrSource container.Source
eaclSource container.EACLSource
cfgAccessPolicyEngine cfgAccessPolicyEngine
pool cfgObjectRoutines
cfgLocalStorage cfgLocalStorage
tombstoneLifetime *atomic.Uint64
skipSessionTokenIssuerVerification bool
}
type cfgLocalStorage struct {
localStorage *engine.StorageEngine
}
type cfgAccessPolicyEngine struct {
policyContractHash neogoutil.Uint160
accessPolicyEngine *accessPolicyEngine
}
type cfgObjectRoutines struct {
putRemote *ants.Pool
putLocal *ants.Pool
replication *ants.Pool
}
type cfgControlService struct {
server *grpc.Server
}
var persistateSideChainLastBlockKey = []byte("side_chain_last_processed_block")
func initCfg(appCfg *config.Config) *cfg {
c := &cfg{
applicationConfiguration: applicationConfiguration{
cmode: &atomic.Bool{},
},
}
err := c.readConfig(appCfg)
if err != nil {
panic(fmt.Errorf("config reading: %w", err))
}
key := nodeconfig.Key(appCfg)
relayOnly := nodeconfig.Relay(appCfg)
netState := newNetworkState()
c.shared = initShared(appCfg, key, netState, relayOnly)
netState.metrics = c.metricsCollector
logPrm, err := c.loggerPrm()
fatalOnErr(err)
logPrm.SamplingHook = c.metricsCollector.LogMetrics().GetSamplingHook()
log, err := logger.NewLogger(logPrm)
fatalOnErr(err)
if loggerconfig.ToLokiConfig(appCfg).Enabled {
log.Logger = log.Logger.WithOptions(zap.WrapCore(func(core zapcore.Core) zapcore.Core {
lokiCore := lokicore.New(core, loggerconfig.ToLokiConfig(appCfg))
return lokiCore
}))
}
c.internals = initInternals(appCfg, log)
c.cfgAccounting = cfgAccounting{
scriptHash: contractsconfig.Balance(appCfg),
}
c.cfgContainer = initContainer(appCfg)
c.cfgFrostfsID = initFrostfsID(appCfg)
c.cfgNetmap = initNetmap(appCfg, netState, relayOnly)
c.cfgGRPC = initCfgGRPC()
c.cfgMorph = cfgMorph{
proxyScriptHash: contractsconfig.Proxy(appCfg),
}
c.cfgObject = initCfgObject(appCfg)
user.IDFromKey(&c.ownerIDFromKey, key.PrivateKey.PublicKey)
c.onShutdown(c.clientCache.CloseAll) // clean up connections
c.onShutdown(c.bgClientCache.CloseAll) // clean up connections
c.onShutdown(c.putClientCache.CloseAll) // clean up connections
c.onShutdown(func() { _ = c.persistate.Close() })
return c
}
func initInternals(appCfg *config.Config, log *logger.Logger) internals {
var healthStatus atomic.Int32
healthStatus.Store(int32(control.HealthStatus_HEALTH_STATUS_UNDEFINED))
var auditRequests atomic.Bool
auditRequests.Store(audit.Enabled(appCfg))
return internals{
done: make(chan struct{}),
appCfg: appCfg,
internalErr: make(chan error),
log: log,
apiVersion: version.Current(),
healthStatus: &healthStatus,
sdNotify: initSdNotify(appCfg),
audit: &auditRequests,
}
}
func initSdNotify(appCfg *config.Config) bool {
if config.BoolSafe(appCfg.Sub("systemdnotify"), "enabled") {
fatalOnErr(sdnotify.InitSocket())
return true
}
return false
}
func initShared(appCfg *config.Config, key *keys.PrivateKey, netState *networkState, relayOnly bool) shared {
var netAddr network.AddressGroup
if !relayOnly {
netAddr = nodeconfig.BootstrapAddresses(appCfg)
}
persistate, err := state.NewPersistentStorage(nodeconfig.PersistentState(appCfg).Path())
fatalOnErr(err)
nodeMetrics := metrics.NewNodeMetrics()
ds, err := internalNet.NewDialerSource(internalNetConfig(appCfg, nodeMetrics.MultinetMetrics()))
fatalOnErr(err)
cacheOpts := cache.ClientCacheOpts{
DialTimeout: apiclientconfig.DialTimeout(appCfg),
StreamTimeout: apiclientconfig.StreamTimeout(appCfg),
Key: &key.PrivateKey,
AllowExternal: apiclientconfig.AllowExternal(appCfg),
ReconnectTimeout: apiclientconfig.ReconnectTimeout(appCfg),
DialerSource: ds,
}
return shared{
key: key,
binPublicKey: key.PublicKey().Bytes(),
localAddr: netAddr,
respSvc: response.NewService(netState),
clientCache: cache.NewSDKClientCache(cacheOpts),
bgClientCache: cache.NewSDKClientCache(cacheOpts),
putClientCache: cache.NewSDKClientCache(cacheOpts),
persistate: persistate,
metricsCollector: nodeMetrics,
dialerSource: ds,
}
}
func internalNetConfig(appCfg *config.Config, m metrics.MultinetMetrics) internalNet.Config {
result := internalNet.Config{
Enabled: multinet.Enabled(appCfg),
Balancer: multinet.Balancer(appCfg),
Restrict: multinet.Restrict(appCfg),
FallbackDelay: multinet.FallbackDelay(appCfg),
Metrics: m,
}
sn := multinet.Subnets(appCfg)
for _, s := range sn {
result.Subnets = append(result.Subnets, internalNet.Subnet{
Prefix: s.Mask,
SourceIPs: s.SourceIPs,
})
}
return result
}
func initNetmap(appCfg *config.Config, netState *networkState, relayOnly bool) cfgNetmap {
netmapWorkerPool, err := ants.NewPool(notificationHandlerPoolSize)
fatalOnErr(err)
var reBootstrapTurnedOff atomic.Bool
reBootstrapTurnedOff.Store(relayOnly)
return cfgNetmap{
scriptHash: contractsconfig.Netmap(appCfg),
state: netState,
workerPool: netmapWorkerPool,
needBootstrap: !relayOnly,
reBoostrapTurnedOff: &reBootstrapTurnedOff,
}
}
func initContainer(appCfg *config.Config) cfgContainer {
containerWorkerPool, err := ants.NewPool(notificationHandlerPoolSize)
fatalOnErr(err)
return cfgContainer{
scriptHash: contractsconfig.Container(appCfg),
workerPool: containerWorkerPool,
}
}
func initFrostfsID(appCfg *config.Config) cfgFrostfsID {
return cfgFrostfsID{
scriptHash: contractsconfig.FrostfsID(appCfg),
}
}
func initCfgGRPC() cfgGRPC {
maxChunkSize := uint64(maxMsgSize) * 3 / 4 // 25% to meta, 75% to payload
maxAddrAmount := uint64(maxChunkSize) / addressSize // each address is about 72 bytes
return cfgGRPC{
maxChunkSize: maxChunkSize,
maxAddrAmount: maxAddrAmount,
}
}
func initCfgObject(appCfg *config.Config) cfgObject {
var tsLifetime atomic.Uint64
tsLifetime.Store(objectconfig.TombstoneLifetime(appCfg))
return cfgObject{
pool: initObjectPool(appCfg),
tombstoneLifetime: &tsLifetime,
skipSessionTokenIssuerVerification: objectconfig.Put(appCfg).SkipSessionTokenIssuerVerification(),
}
}
func (c *cfg) engineOpts() []engine.Option {
var opts []engine.Option
opts = append(opts,
engine.WithShardPoolSize(c.EngineCfg.shardPoolSize),
engine.WithErrorThreshold(c.EngineCfg.errorThreshold),
engine.WithLogger(c.log),
engine.WithLowMemoryConsumption(c.EngineCfg.lowMem),
)
if c.metricsCollector != nil {
opts = append(opts, engine.WithMetrics(c.metricsCollector.Engine()))
}
return opts
}
type shardOptsWithID struct {
configID string
shOpts []shard.Option
}
func (c *cfg) shardOpts(ctx context.Context) []shardOptsWithID {
shards := make([]shardOptsWithID, 0, len(c.EngineCfg.shards))
for _, shCfg := range c.EngineCfg.shards {
shards = append(shards, c.getShardOpts(ctx, shCfg))
}
return shards
}
func (c *cfg) getWriteCacheOpts(shCfg shardCfg) []writecache.Option {
var writeCacheOpts []writecache.Option
if wcRead := shCfg.writecacheCfg; wcRead.enabled {
writeCacheOpts = append(writeCacheOpts,
writecache.WithPath(wcRead.path),
writecache.WithFlushSizeLimit(wcRead.flushSizeLimit),
writecache.WithMaxObjectSize(wcRead.maxObjSize),
writecache.WithFlushWorkersCount(wcRead.flushWorkerCount),
writecache.WithMaxCacheSize(wcRead.sizeLimit),
writecache.WithMaxCacheCount(wcRead.countLimit),
writecache.WithNoSync(wcRead.noSync),
writecache.WithLogger(c.log),
)
}
return writeCacheOpts
}
func (c *cfg) getPiloramaOpts(shCfg shardCfg) []pilorama.Option {
var piloramaOpts []pilorama.Option
if prRead := shCfg.piloramaCfg; prRead.enabled {
piloramaOpts = append(piloramaOpts,
pilorama.WithPath(prRead.path),
pilorama.WithPerm(prRead.perm),
pilorama.WithNoSync(prRead.noSync),
pilorama.WithMaxBatchSize(prRead.maxBatchSize),
pilorama.WithMaxBatchDelay(prRead.maxBatchDelay),
)
if c.metricsCollector != nil {
piloramaOpts = append(piloramaOpts, pilorama.WithMetrics(lsmetrics.NewPiloramaMetrics(c.metricsCollector.PiloramaMetrics())))
}
}
return piloramaOpts
}
func (c *cfg) getSubstorageOpts(ctx context.Context, shCfg shardCfg) []blobstor.SubStorage {
var ss []blobstor.SubStorage
for _, sRead := range shCfg.subStorages {
switch sRead.typ {
case blobovniczatree.Type:
blobTreeOpts := []blobovniczatree.Option{
blobovniczatree.WithRootPath(sRead.path),
blobovniczatree.WithPermissions(sRead.perm),
blobovniczatree.WithBlobovniczaSize(sRead.size),
blobovniczatree.WithBlobovniczaShallowDepth(sRead.depth),
blobovniczatree.WithBlobovniczaShallowWidth(sRead.width),
blobovniczatree.WithOpenedCacheSize(sRead.openedCacheSize),
blobovniczatree.WithOpenedCacheTTL(sRead.openedCacheTTL),
blobovniczatree.WithOpenedCacheExpInterval(sRead.openedCacheExpInterval),
blobovniczatree.WithInitWorkerCount(sRead.initWorkerCount),
blobovniczatree.WithWaitBeforeDropDB(sRead.rebuildDropTimeout),
blobovniczatree.WithLogger(c.log),
blobovniczatree.WithObjectSizeLimit(shCfg.smallSizeObjectLimit),
}
if c.metricsCollector != nil {
blobTreeOpts = append(blobTreeOpts,
blobovniczatree.WithMetrics(
lsmetrics.NewBlobovniczaTreeMetrics(sRead.path, c.metricsCollector.BlobobvnizcaTreeMetrics()),
),
)
}
ss = append(ss, blobstor.SubStorage{
Storage: blobovniczatree.NewBlobovniczaTree(ctx, blobTreeOpts...),
Policy: func(_ *objectSDK.Object, data []byte) bool {
return uint64(len(data)) < shCfg.smallSizeObjectLimit
},
})
case fstree.Type:
fstreeOpts := []fstree.Option{
fstree.WithPath(sRead.path),
fstree.WithPerm(sRead.perm),
fstree.WithDepth(sRead.depth),
fstree.WithNoSync(sRead.noSync),
fstree.WithLogger(c.log),
}
if c.metricsCollector != nil {
fstreeOpts = append(fstreeOpts,
fstree.WithMetrics(
lsmetrics.NewFSTreeMetricsWithoutShardID(sRead.path, c.metricsCollector.FSTree()),
),
)
}
ss = append(ss, blobstor.SubStorage{
Storage: fstree.New(fstreeOpts...),
Policy: func(_ *objectSDK.Object, _ []byte) bool {
return true
},
})
default:
// should never happen, that has already
// been handled: when the config was read
}
}
return ss
}
func (c *cfg) getShardOpts(ctx context.Context, shCfg shardCfg) shardOptsWithID {
writeCacheOpts := c.getWriteCacheOpts(shCfg)
piloramaOpts := c.getPiloramaOpts(shCfg)
ss := c.getSubstorageOpts(ctx, shCfg)
blobstoreOpts := []blobstor.Option{
blobstor.WithCompressObjects(shCfg.compress),
blobstor.WithUncompressableContentTypes(shCfg.uncompressableContentType),
blobstor.WithCompressibilityEstimate(shCfg.estimateCompressibility),
blobstor.WithCompressibilityEstimateThreshold(shCfg.estimateCompressibilityThreshold),
blobstor.WithStorages(ss),
blobstor.WithLogger(c.log),
}
if c.metricsCollector != nil {
blobstoreOpts = append(blobstoreOpts, blobstor.WithMetrics(lsmetrics.NewBlobstoreMetrics(c.metricsCollector.Blobstore())))
}
mbOptions := []meta.Option{
meta.WithPath(shCfg.metaCfg.path),
meta.WithPermissions(shCfg.metaCfg.perm),
meta.WithMaxBatchSize(shCfg.metaCfg.maxBatchSize),
meta.WithMaxBatchDelay(shCfg.metaCfg.maxBatchDelay),
meta.WithBoltDBOptions(&bbolt.Options{
Timeout: 100 * time.Millisecond,
}),
meta.WithLogger(c.log),
meta.WithEpochState(c.cfgNetmap.state),
}
if c.metricsCollector != nil {
mbOptions = append(mbOptions, meta.WithMetrics(lsmetrics.NewMetabaseMetrics(shCfg.metaCfg.path, c.metricsCollector.MetabaseMetrics())))
}
var sh shardOptsWithID
sh.configID = shCfg.id()
sh.shOpts = []shard.Option{
shard.WithLogger(c.log),
shard.WithRefillMetabase(shCfg.refillMetabase),
shard.WithRefillMetabaseWorkersCount(shCfg.refillMetabaseWorkersCount),
shard.WithMode(shCfg.mode),
shard.WithBlobStorOptions(blobstoreOpts...),
shard.WithMetaBaseOptions(mbOptions...),
shard.WithPiloramaOptions(piloramaOpts...),
shard.WithWriteCache(shCfg.writecacheCfg.enabled),
shard.WithWriteCacheOptions(writeCacheOpts),
shard.WithRemoverBatchSize(shCfg.gcCfg.removerBatchSize),
shard.WithGCRemoverSleepInterval(shCfg.gcCfg.removerSleepInterval),
shard.WithExpiredCollectorBatchSize(shCfg.gcCfg.expiredCollectorBatchSize),
shard.WithExpiredCollectorWorkerCount(shCfg.gcCfg.expiredCollectorWorkerCount),
shard.WithGCWorkerPoolInitializer(func(sz int) util.WorkerPool {
pool, err := ants.NewPool(sz)
fatalOnErr(err)
return pool
}),
}
return sh
}
func (c *cfg) loggerPrm() (*logger.Prm, error) {
// check if it has been inited before
if c.dynamicConfiguration.logger == nil {
c.dynamicConfiguration.logger = new(logger.Prm)
}
// (re)init read configuration
err := c.dynamicConfiguration.logger.SetLevelString(c.LoggerCfg.level)
if err != nil {
// not expected since validation should be performed before
panic("incorrect log level format: " + c.LoggerCfg.level)
}
err = c.dynamicConfiguration.logger.SetDestination(c.LoggerCfg.destination)
if err != nil {
// not expected since validation should be performed before
panic("incorrect log destination format: " + c.LoggerCfg.destination)
}
c.dynamicConfiguration.logger.PrependTimestamp = c.LoggerCfg.timestamp
return c.dynamicConfiguration.logger, nil
}
func (c *cfg) LocalAddress() network.AddressGroup {
return c.localAddr
}
func initLocalStorage(ctx context.Context, c *cfg) {
ls := engine.New(c.engineOpts()...)
addNewEpochAsyncNotificationHandler(c, func(ev event.Event) {
ls.HandleNewEpoch(ctx, ev.(netmap2.NewEpoch).EpochNumber())
})
// allocate memory for the service;
// service will be created later
c.cfgObject.getSvc = new(getsvc.Service)
var shardsAttached int
for _, optsWithMeta := range c.shardOpts(ctx) {
id, err := ls.AddShard(ctx, append(optsWithMeta.shOpts,
shard.WithTombstoneSource(c.createTombstoneSource()),
shard.WithContainerInfoProvider(c.createContainerInfoProvider(ctx)))...)
if err != nil {
c.log.Error(logs.FrostFSNodeFailedToAttachShardToEngine, zap.Error(err))
} else {
shardsAttached++
c.log.Info(logs.FrostFSNodeShardAttachedToEngine, zap.Stringer("id", id))
}
}
if shardsAttached == 0 {
fatalOnErr(engineconfig.ErrNoShardConfigured)
}
c.cfgObject.cfgLocalStorage.localStorage = ls
c.onShutdown(func() {
c.log.Info(logs.FrostFSNodeClosingComponentsOfTheStorageEngine)
err := ls.Close(context.WithoutCancel(ctx))
if err != nil {
c.log.Info(logs.FrostFSNodeStorageEngineClosingFailure,
zap.String("error", err.Error()),
)
} else {
c.log.Info(logs.FrostFSNodeAllComponentsOfTheStorageEngineClosedSuccessfully)
}
})
}
func initAccessPolicyEngine(_ context.Context, c *cfg) {
var localOverrideDB chainbase.LocalOverrideDatabase
if nodeconfig.PersistentPolicyRules(c.appCfg).Path() == "" {
c.log.Warn(logs.FrostFSNodePersistentRuleStorageDBPathIsNotSetInmemoryWillBeUsed)
localOverrideDB = chainbase.NewInmemoryLocalOverrideDatabase()
} else {
localOverrideDB = chainbase.NewBoltLocalOverrideDatabase(
chainbase.WithPath(nodeconfig.PersistentPolicyRules(c.appCfg).Path()),
chainbase.WithPerm(nodeconfig.PersistentPolicyRules(c.appCfg).Perm()),
chainbase.WithNoSync(nodeconfig.PersistentPolicyRules(c.appCfg).NoSync()),
)
}
var morphRuleStorage policy_engine.MorphRuleChainStorageReader
morphRuleStorage = policy_client.NewContractStorage(
client.NewSwitchRPCGuardedActor(c.cfgMorph.client),
c.cfgObject.cfgAccessPolicyEngine.policyContractHash)
cacheSize := morphconfig.APEChainCacheSize(c.appCfg)
if cacheSize > 0 {
morphRuleStorage = newMorphCache(morphRuleStorage, int(cacheSize), c.cfgMorph.cacheTTL)
}
ape := newAccessPolicyEngine(morphRuleStorage, localOverrideDB)
c.cfgObject.cfgAccessPolicyEngine.accessPolicyEngine = ape
c.onShutdown(func() {
if err := ape.LocalOverrideDatabaseCore().Close(); err != nil {
c.log.Warn(logs.FrostFSNodeAccessPolicyEngineClosingFailure,
zap.Error(err),
)
}
})
}
func initObjectPool(cfg *config.Config) (pool cfgObjectRoutines) {
var err error
optNonBlocking := ants.WithNonblocking(true)
putRemoteCapacity := objectconfig.Put(cfg).PoolSizeRemote()
pool.putRemote, err = ants.NewPool(putRemoteCapacity, optNonBlocking)
fatalOnErr(err)
putLocalCapacity := objectconfig.Put(cfg).PoolSizeLocal()
pool.putLocal, err = ants.NewPool(putLocalCapacity, optNonBlocking)
fatalOnErr(err)
replicatorPoolSize := replicatorconfig.PoolSize(cfg)
if replicatorPoolSize <= 0 {
replicatorPoolSize = putRemoteCapacity
}
pool.replication, err = ants.NewPool(replicatorPoolSize)
fatalOnErr(err)
return pool
}
func (c *cfg) LocalNodeInfo() (*netmapV2.NodeInfo, error) {
var res netmapV2.NodeInfo
ni, ok := c.cfgNetmap.state.getNodeInfo()
if ok {
ni.WriteToV2(&res)
} else {
c.cfgNodeInfo.localInfo.WriteToV2(&res)
}
return &res, nil
}
// setContractNodeInfo rewrites local node info from the FrostFS network map.
// Called with nil when storage node is outside the FrostFS network map
// (before entering the network and after leaving it).
func (c *cfg) setContractNodeInfo(ni *netmap.NodeInfo) {
c.cfgNetmap.state.setNodeInfo(ni)
}
func (c *cfg) updateContractNodeInfo(epoch uint64) {
ni, err := c.netmapLocalNodeState(epoch)
if err != nil {
c.log.Error(logs.FrostFSNodeCouldNotUpdateNodeStateOnNewEpoch,
zap.Uint64("epoch", epoch),
zap.String("error", err.Error()))
return
}
c.setContractNodeInfo(ni)
}
// bootstrapWithState calls "addPeer" method of the Sidechain Netmap contract
// with the binary-encoded information from the current node's configuration.
// The state is set using the provided setter which MUST NOT be nil.
func (c *cfg) bootstrapWithState(stateSetter func(*netmap.NodeInfo)) error {
ni := c.cfgNodeInfo.localInfo
stateSetter(&ni)
prm := nmClient.AddPeerPrm{}
prm.SetNodeInfo(ni)
return c.cfgNetmap.wrapper.AddPeer(prm)
}
// bootstrapOnline calls cfg.bootstrapWithState with "online" state.
func bootstrapOnline(c *cfg) error {
return c.bootstrapWithState(func(ni *netmap.NodeInfo) {
ni.SetStatus(netmap.Online)
})
}
// bootstrap calls bootstrapWithState with:
// - "maintenance" state if maintenance is in progress on the current node
// - "online", otherwise
func (c *cfg) bootstrap() error {
// switch to online except when under maintenance
st := c.cfgNetmap.state.controlNetmapStatus()
if st == control.NetmapStatus_MAINTENANCE {
c.log.Info(logs.FrostFSNodeBootstrappingWithTheMaintenanceState)
return c.bootstrapWithState(func(ni *netmap.NodeInfo) {
ni.SetStatus(netmap.Maintenance)
})
}
c.log.Info(logs.FrostFSNodeBootstrappingWithOnlineState,
zap.Stringer("previous", st),
)
return bootstrapOnline(c)
}
// needBootstrap checks if local node should be registered in network on bootup.
func (c *cfg) needBootstrap() bool {
return c.cfgNetmap.needBootstrap
}
type dCmp struct {
name string
reloadFunc func() error
}
func (c *cfg) signalWatcher(ctx context.Context) {
ch := make(chan os.Signal, 1)
signal.Notify(ch, syscall.SIGINT, syscall.SIGTERM)
sighupCh := make(chan os.Signal, 1)
signal.Notify(sighupCh, syscall.SIGHUP)
for {
select {
// signals causing application to shut down should have priority over
// reconfiguration signal
case <-ch:
c.log.Info(logs.FrostFSNodeTerminationSignalHasBeenReceivedStopping)
c.shutdown()
c.log.Info(logs.FrostFSNodeTerminationSignalProcessingIsComplete)
return
case err := <-c.internalErr: // internal application error
c.log.Warn(logs.FrostFSNodeInternalApplicationError,
zap.String("message", err.Error()))
c.shutdown()
c.log.Info(logs.FrostFSNodeInternalErrorProcessingIsComplete)
return
default:
// block until any signal is receieved
select {
case <-sighupCh:
c.reloadConfig(ctx)
case <-ch:
c.log.Info(logs.FrostFSNodeTerminationSignalHasBeenReceivedStopping)
c.shutdown()
c.log.Info(logs.FrostFSNodeTerminationSignalProcessingIsComplete)
return
case err := <-c.internalErr: // internal application error
c.log.Warn(logs.FrostFSNodeInternalApplicationError,
zap.String("message", err.Error()))
c.shutdown()
c.log.Info(logs.FrostFSNodeInternalErrorProcessingIsComplete)
return
}
}
}
}
func (c *cfg) reloadConfig(ctx context.Context) {
c.log.Info(logs.FrostFSNodeSIGHUPHasBeenReceivedRereadingConfiguration)
if !c.compareAndSwapHealthStatus(control.HealthStatus_READY, control.HealthStatus_RECONFIGURING) {
c.log.Info(logs.FrostFSNodeSIGHUPSkip)
return
}
defer c.compareAndSwapHealthStatus(control.HealthStatus_RECONFIGURING, control.HealthStatus_READY)
err := c.reloadAppConfig()
if err != nil {
c.log.Error(logs.FrostFSNodeConfigurationReading, zap.Error(err))
return
}
// all the components are expected to support
// Logger's dynamic reconfiguration approach
// Logger
logPrm, err := c.loggerPrm()
if err != nil {
c.log.Error(logs.FrostFSNodeLoggerConfigurationPreparation, zap.Error(err))
return
}
components := c.getComponents(ctx, logPrm)
// Object
c.cfgObject.tombstoneLifetime.Store(c.ObjectCfg.tombstoneLifetime)
// Storage Engine
var rcfg engine.ReConfiguration
for _, optsWithID := range c.shardOpts(ctx) {
rcfg.AddShard(optsWithID.configID, append(optsWithID.shOpts,
shard.WithTombstoneSource(c.createTombstoneSource()),
shard.WithContainerInfoProvider(c.createContainerInfoProvider(ctx)),
))
}
err = c.cfgObject.cfgLocalStorage.localStorage.Reload(ctx, rcfg)
if err != nil {
c.log.Error(logs.FrostFSNodeStorageEngineConfigurationUpdate, zap.Error(err))
return
}
for _, component := range components {
err = component.reloadFunc()
if err != nil {
c.log.Error(logs.FrostFSNodeUpdatedConfigurationApplying,
zap.String("component", component.name),
zap.Error(err))
}
}
if err := c.dialerSource.Update(internalNetConfig(c.appCfg, c.metricsCollector.MultinetMetrics())); err != nil {
c.log.Error(logs.FailedToUpdateMultinetConfiguration, zap.Error(err))
return
}
c.log.Info(logs.FrostFSNodeConfigurationHasBeenReloadedSuccessfully)
}
func (c *cfg) getComponents(ctx context.Context, logPrm *logger.Prm) []dCmp {
var components []dCmp
components = append(components, dCmp{"logger", logPrm.Reload})
components = append(components, dCmp{"runtime", func() error {
setRuntimeParameters(c)
return nil
}})
components = append(components, dCmp{"audit", func() error {
c.audit.Store(audit.Enabled(c.appCfg))
return nil
}})
components = append(components, dCmp{"pools", c.reloadPools})
components = append(components, dCmp{"tracing", func() error {
traceConfig, err := tracingconfig.ToTracingConfig(c.appCfg)
if err != nil {
return err
}
updated, err := tracing.Setup(ctx, *traceConfig)
if updated {
c.log.Info(logs.FrostFSNodeTracingConfigationUpdated)
}
return err
}})
if cmp, updated := metricsComponent(c); updated {
if cmp.enabled {
cmp.preReload = enableMetricsSvc
} else {
cmp.preReload = disableMetricsSvc
}
components = append(components, dCmp{cmp.name, func() error { return cmp.reload(ctx) }})
}
if cmp, updated := pprofComponent(c); updated {
components = append(components, dCmp{cmp.name, func() error { return cmp.reload(ctx) }})
}
return components
}
func (c *cfg) reloadPools() error {
newSize := objectconfig.Put(c.appCfg).PoolSizeLocal()
c.reloadPool(c.cfgObject.pool.putLocal, newSize, "object.put.local_pool_size")
newSize = objectconfig.Put(c.appCfg).PoolSizeRemote()
c.reloadPool(c.cfgObject.pool.putRemote, newSize, "object.put.remote_pool_size")
newSize = replicatorconfig.PoolSize(c.appCfg)
c.reloadPool(c.cfgObject.pool.replication, newSize, "replicator.pool_size")
return nil
}
func (c *cfg) reloadPool(p *ants.Pool, newSize int, name string) {
oldSize := p.Cap()
if oldSize != newSize {
c.log.Info(logs.FrostFSNodePoolConfigurationUpdate, zap.String("field", name),
zap.Int("old", oldSize), zap.Int("new", newSize))
p.Tune(newSize)
}
}
func (c *cfg) reloadAppConfig() error {
unlock := c.LockAppConfigExclusive()
defer unlock()
return c.readConfig(c.appCfg)
}
func (c *cfg) createTombstoneSource() *tombstone.ExpirationChecker {
var tssPrm tsourse.TombstoneSourcePrm
tssPrm.SetGetService(c.cfgObject.getSvc)
tombstoneSrc := tsourse.NewSource(tssPrm)
tombstoneSource := tombstone.NewChecker(
tombstone.WithLogger(c.log),
tombstone.WithTombstoneSource(tombstoneSrc),
)
return tombstoneSource
}
func (c *cfg) createContainerInfoProvider(ctx context.Context) container.InfoProvider {
return container.NewInfoProvider(func() (container.Source, error) {
c.initMorphComponents(ctx)
cc, err := containerClient.NewFromMorph(c.cfgMorph.client, c.cfgContainer.scriptHash, 0, containerClient.TryNotary())
if err != nil {
return nil, err
}
return containerClient.AsContainerSource(cc), nil
})
}
func (c *cfg) shutdown() {
old := c.swapHealthStatus(control.HealthStatus_SHUTTING_DOWN)
if old == control.HealthStatus_SHUTTING_DOWN {
c.log.Info(logs.FrostFSNodeShutdownSkip)
return
}
if old == control.HealthStatus_STARTING {
c.log.Warn(logs.FrostFSNodeShutdownWhenNotReady)
}
c.ctxCancel()
close(c.done)
for i := range c.closers {
c.closers[len(c.closers)-1-i].fn()
}
if err := sdnotify.ClearStatus(); err != nil {
c.log.Error(logs.FailedToReportStatusToSystemd, zap.Error(err))
}
}