This fixes shutdown panic: 1. Some morph connection gets error and passes it to internalErr channel. 2. Storage node starts to shutdow and closes internalErr channel. 3. Other morph connection gets error and tries to pass it to internalErr channel. Signed-off-by: Dmitrii Stepanov <d.stepanov@yadro.com>
1331 lines
39 KiB
Go
1331 lines
39 KiB
Go
package main
|
|
|
|
import (
|
|
"context"
|
|
"errors"
|
|
"fmt"
|
|
"io/fs"
|
|
"net"
|
|
"os"
|
|
"os/signal"
|
|
"path/filepath"
|
|
"strings"
|
|
"sync"
|
|
"sync/atomic"
|
|
"syscall"
|
|
"time"
|
|
|
|
netmapV2 "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/netmap"
|
|
"git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config"
|
|
apiclientconfig "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/apiclient"
|
|
contractsconfig "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/contracts"
|
|
engineconfig "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/engine"
|
|
shardconfig "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/engine/shard"
|
|
blobovniczaconfig "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/engine/shard/blobstor/blobovnicza"
|
|
fstreeconfig "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/engine/shard/blobstor/fstree"
|
|
loggerconfig "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/logger"
|
|
nodeconfig "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/node"
|
|
objectconfig "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/object"
|
|
replicatorconfig "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/replicator"
|
|
tracingconfig "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/tracing"
|
|
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
|
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/ape/chainbase"
|
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/container"
|
|
netmapCore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/netmap"
|
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor"
|
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/blobovniczatree"
|
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/fstree"
|
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/engine"
|
|
meta "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/metabase"
|
|
lsmetrics "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/metrics"
|
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/pilorama"
|
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard"
|
|
shardmode "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard/mode"
|
|
writecacheconfig "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/writecache/config"
|
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/writecache/writecachebadger"
|
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/writecache/writecachebbolt"
|
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/metrics"
|
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/client"
|
|
containerClient "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/client/container"
|
|
nmClient "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/client/netmap"
|
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/event"
|
|
netmap2 "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/event/netmap"
|
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/network"
|
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/network/cache"
|
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/control"
|
|
objectService "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object"
|
|
getsvc "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/get"
|
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object_manager/tombstone"
|
|
tsourse "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object_manager/tombstone/source"
|
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/replicator"
|
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/tree"
|
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/util/response"
|
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util"
|
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger"
|
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/sdnotify"
|
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/state"
|
|
"git.frostfs.info/TrueCloudLab/frostfs-observability/logging/lokicore"
|
|
"git.frostfs.info/TrueCloudLab/frostfs-observability/tracing"
|
|
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap"
|
|
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
|
|
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/user"
|
|
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/version"
|
|
policy_client "git.frostfs.info/TrueCloudLab/policy-engine/pkg/morph/policy"
|
|
"github.com/nspcc-dev/neo-go/pkg/crypto/keys"
|
|
neogoutil "github.com/nspcc-dev/neo-go/pkg/util"
|
|
"github.com/panjf2000/ants/v2"
|
|
"go.etcd.io/bbolt"
|
|
"go.uber.org/zap"
|
|
"go.uber.org/zap/zapcore"
|
|
"google.golang.org/grpc"
|
|
)
|
|
|
|
const addressSize = 72 // 32 bytes object ID, 32 bytes container ID, 8 bytes protobuf encoding
|
|
|
|
const maxMsgSize = 4 << 20 // transport msg limit 4 MiB
|
|
|
|
// capacity of the pools of the morph notification handlers
|
|
// for each contract listener.
|
|
const notificationHandlerPoolSize = 10
|
|
|
|
// applicationConfiguration reads and stores component-specific configuration
|
|
// values. It should not store any application helpers structs (pointers to shared
|
|
// structs).
|
|
// It must not be used concurrently.
|
|
type applicationConfiguration struct {
|
|
// _read indicated whether a config
|
|
// has already been read
|
|
_read bool
|
|
|
|
LoggerCfg struct {
|
|
level string
|
|
}
|
|
|
|
EngineCfg struct {
|
|
errorThreshold uint32
|
|
shardPoolSize uint32
|
|
shards []shardCfg
|
|
lowMem bool
|
|
rebuildWorkers uint32
|
|
}
|
|
}
|
|
|
|
type shardCfg struct {
|
|
compress bool
|
|
estimateCompressibility bool
|
|
estimateCompressibilityThreshold float64
|
|
|
|
smallSizeObjectLimit uint64
|
|
uncompressableContentType []string
|
|
refillMetabase bool
|
|
mode shardmode.Mode
|
|
|
|
metaCfg struct {
|
|
path string
|
|
perm fs.FileMode
|
|
maxBatchSize int
|
|
maxBatchDelay time.Duration
|
|
}
|
|
|
|
subStorages []subStorageCfg
|
|
|
|
gcCfg struct {
|
|
removerBatchSize int
|
|
removerSleepInterval time.Duration
|
|
expiredCollectorBatchSize int
|
|
expiredCollectorWorkerCount int
|
|
}
|
|
|
|
writecacheCfg struct {
|
|
enabled bool
|
|
typ writecacheconfig.Type
|
|
path string
|
|
maxBatchSize int
|
|
maxBatchDelay time.Duration
|
|
smallObjectSize uint64
|
|
maxObjSize uint64
|
|
flushWorkerCount int
|
|
sizeLimit uint64
|
|
noSync bool
|
|
gcInterval time.Duration
|
|
}
|
|
|
|
piloramaCfg struct {
|
|
enabled bool
|
|
path string
|
|
perm fs.FileMode
|
|
noSync bool
|
|
maxBatchSize int
|
|
maxBatchDelay time.Duration
|
|
}
|
|
}
|
|
|
|
// id returns persistent id of a shard. It is different from the ID used in runtime
|
|
// and is primarily used to identify shards in the configuration.
|
|
func (c *shardCfg) id() string {
|
|
// This calculation should be kept in sync with
|
|
// pkg/local_object_storage/engine/control.go file.
|
|
var sb strings.Builder
|
|
for i := range c.subStorages {
|
|
sb.WriteString(filepath.Clean(c.subStorages[i].path))
|
|
}
|
|
return sb.String()
|
|
}
|
|
|
|
type subStorageCfg struct {
|
|
// common for all storages
|
|
typ string
|
|
path string
|
|
perm fs.FileMode
|
|
depth uint64
|
|
noSync bool
|
|
|
|
// blobovnicza-specific
|
|
size uint64
|
|
width uint64
|
|
leafWidth uint64
|
|
openedCacheSize int
|
|
openedCacheTTL time.Duration
|
|
initWorkerCount int
|
|
initInAdvance bool
|
|
}
|
|
|
|
// readConfig fills applicationConfiguration with raw configuration values
|
|
// not modifying them.
|
|
func (a *applicationConfiguration) readConfig(c *config.Config) error {
|
|
if a._read {
|
|
err := c.Reload()
|
|
if err != nil {
|
|
return fmt.Errorf("could not reload configuration: %w", err)
|
|
}
|
|
|
|
err = validateConfig(c)
|
|
if err != nil {
|
|
return fmt.Errorf("configuration's validation: %w", err)
|
|
}
|
|
|
|
// clear if it is rereading
|
|
*a = applicationConfiguration{}
|
|
}
|
|
|
|
a._read = true
|
|
|
|
// Logger
|
|
|
|
a.LoggerCfg.level = loggerconfig.Level(c)
|
|
|
|
// Storage Engine
|
|
|
|
a.EngineCfg.errorThreshold = engineconfig.ShardErrorThreshold(c)
|
|
a.EngineCfg.shardPoolSize = engineconfig.ShardPoolSize(c)
|
|
a.EngineCfg.lowMem = engineconfig.EngineLowMemoryConsumption(c)
|
|
a.EngineCfg.rebuildWorkers = engineconfig.EngineRebuildWorkersCount(c)
|
|
|
|
return engineconfig.IterateShards(c, false, func(sc *shardconfig.Config) error { return a.updateShardConfig(c, sc) })
|
|
}
|
|
|
|
func (a *applicationConfiguration) updateShardConfig(c *config.Config, oldConfig *shardconfig.Config) error {
|
|
var newConfig shardCfg
|
|
|
|
newConfig.refillMetabase = oldConfig.RefillMetabase()
|
|
newConfig.mode = oldConfig.Mode()
|
|
newConfig.compress = oldConfig.Compress()
|
|
newConfig.estimateCompressibility = oldConfig.EstimateCompressibility()
|
|
newConfig.estimateCompressibilityThreshold = oldConfig.EstimateCompressibilityThreshold()
|
|
newConfig.uncompressableContentType = oldConfig.UncompressableContentTypes()
|
|
newConfig.smallSizeObjectLimit = oldConfig.SmallSizeLimit()
|
|
|
|
a.setShardWriteCacheConfig(&newConfig, oldConfig)
|
|
|
|
a.setShardPiloramaConfig(c, &newConfig, oldConfig)
|
|
|
|
if err := a.setShardStorageConfig(&newConfig, oldConfig); err != nil {
|
|
return err
|
|
}
|
|
|
|
a.setMetabaseConfig(&newConfig, oldConfig)
|
|
|
|
a.setGCConfig(&newConfig, oldConfig)
|
|
|
|
a.EngineCfg.shards = append(a.EngineCfg.shards, newConfig)
|
|
|
|
return nil
|
|
}
|
|
|
|
func (a *applicationConfiguration) setShardWriteCacheConfig(newConfig *shardCfg, oldConfig *shardconfig.Config) {
|
|
writeCacheCfg := oldConfig.WriteCache()
|
|
if writeCacheCfg.Enabled() {
|
|
wc := &newConfig.writecacheCfg
|
|
|
|
wc.enabled = true
|
|
wc.typ = writeCacheCfg.Type()
|
|
wc.path = writeCacheCfg.Path()
|
|
wc.maxBatchSize = writeCacheCfg.BoltDB().MaxBatchSize()
|
|
wc.maxBatchDelay = writeCacheCfg.BoltDB().MaxBatchDelay()
|
|
wc.maxObjSize = writeCacheCfg.MaxObjectSize()
|
|
wc.smallObjectSize = writeCacheCfg.SmallObjectSize()
|
|
wc.flushWorkerCount = writeCacheCfg.WorkerCount()
|
|
wc.sizeLimit = writeCacheCfg.SizeLimit()
|
|
wc.noSync = writeCacheCfg.NoSync()
|
|
wc.gcInterval = writeCacheCfg.GCInterval()
|
|
}
|
|
}
|
|
|
|
func (a *applicationConfiguration) setShardPiloramaConfig(c *config.Config, newConfig *shardCfg, oldConfig *shardconfig.Config) {
|
|
if config.BoolSafe(c.Sub("tree"), "enabled") {
|
|
piloramaCfg := oldConfig.Pilorama()
|
|
pr := &newConfig.piloramaCfg
|
|
|
|
pr.enabled = true
|
|
pr.path = piloramaCfg.Path()
|
|
pr.perm = piloramaCfg.Perm()
|
|
pr.noSync = piloramaCfg.NoSync()
|
|
pr.maxBatchSize = piloramaCfg.MaxBatchSize()
|
|
pr.maxBatchDelay = piloramaCfg.MaxBatchDelay()
|
|
}
|
|
}
|
|
|
|
func (a *applicationConfiguration) setShardStorageConfig(newConfig *shardCfg, oldConfig *shardconfig.Config) error {
|
|
blobStorCfg := oldConfig.BlobStor()
|
|
storagesCfg := blobStorCfg.Storages()
|
|
|
|
ss := make([]subStorageCfg, 0, len(storagesCfg))
|
|
for i := range storagesCfg {
|
|
var sCfg subStorageCfg
|
|
|
|
sCfg.typ = storagesCfg[i].Type()
|
|
sCfg.path = storagesCfg[i].Path()
|
|
sCfg.perm = storagesCfg[i].Perm()
|
|
|
|
switch storagesCfg[i].Type() {
|
|
case blobovniczatree.Type:
|
|
sub := blobovniczaconfig.From((*config.Config)(storagesCfg[i]))
|
|
|
|
sCfg.size = sub.Size()
|
|
sCfg.depth = sub.ShallowDepth()
|
|
sCfg.width = sub.ShallowWidth()
|
|
sCfg.leafWidth = sub.LeafWidth()
|
|
sCfg.openedCacheSize = sub.OpenedCacheSize()
|
|
sCfg.openedCacheTTL = sub.OpenedCacheTTL()
|
|
sCfg.initWorkerCount = sub.InitWorkerCount()
|
|
sCfg.initInAdvance = sub.InitInAdvance()
|
|
case fstree.Type:
|
|
sub := fstreeconfig.From((*config.Config)(storagesCfg[i]))
|
|
sCfg.depth = sub.Depth()
|
|
sCfg.noSync = sub.NoSync()
|
|
default:
|
|
return fmt.Errorf("invalid storage type: %s", storagesCfg[i].Type())
|
|
}
|
|
|
|
ss = append(ss, sCfg)
|
|
}
|
|
|
|
newConfig.subStorages = ss
|
|
return nil
|
|
}
|
|
|
|
func (a *applicationConfiguration) setMetabaseConfig(newConfig *shardCfg, oldConfig *shardconfig.Config) {
|
|
metabaseCfg := oldConfig.Metabase()
|
|
m := &newConfig.metaCfg
|
|
|
|
m.path = metabaseCfg.Path()
|
|
m.perm = metabaseCfg.BoltDB().Perm()
|
|
m.maxBatchDelay = metabaseCfg.BoltDB().MaxBatchDelay()
|
|
m.maxBatchSize = metabaseCfg.BoltDB().MaxBatchSize()
|
|
}
|
|
|
|
func (a *applicationConfiguration) setGCConfig(newConfig *shardCfg, oldConfig *shardconfig.Config) {
|
|
gcCfg := oldConfig.GC()
|
|
newConfig.gcCfg.removerBatchSize = gcCfg.RemoverBatchSize()
|
|
newConfig.gcCfg.removerSleepInterval = gcCfg.RemoverSleepInterval()
|
|
newConfig.gcCfg.expiredCollectorBatchSize = gcCfg.ExpiredCollectorBatchSize()
|
|
newConfig.gcCfg.expiredCollectorWorkerCount = gcCfg.ExpiredCollectorWorkerCount()
|
|
}
|
|
|
|
// internals contains application-specific internals that are created
|
|
// on application startup and are shared b/w the components during
|
|
// the application life cycle.
|
|
// It should not contain any read configuration values, component-specific
|
|
// helpers and fields.
|
|
type internals struct {
|
|
done chan struct{}
|
|
ctxCancel func()
|
|
internalErr chan error // channel for internal application errors at runtime
|
|
|
|
appCfg *config.Config
|
|
|
|
log *logger.Logger
|
|
|
|
wg sync.WaitGroup
|
|
workers []worker
|
|
closers []closer
|
|
|
|
apiVersion version.Version
|
|
healthStatus *atomic.Int32
|
|
// is node under maintenance
|
|
isMaintenance atomic.Bool
|
|
|
|
sdNotify bool
|
|
}
|
|
|
|
// starts node's maintenance.
|
|
func (c *cfg) startMaintenance() {
|
|
c.isMaintenance.Store(true)
|
|
c.cfgNetmap.state.setControlNetmapStatus(control.NetmapStatus_MAINTENANCE)
|
|
c.log.Info(logs.FrostFSNodeStartedLocalNodesMaintenance)
|
|
}
|
|
|
|
// stops node's maintenance.
|
|
func (c *internals) stopMaintenance() {
|
|
c.isMaintenance.Store(false)
|
|
c.log.Info(logs.FrostFSNodeStoppedLocalNodesMaintenance)
|
|
}
|
|
|
|
// IsMaintenance checks if storage node is under maintenance.
|
|
//
|
|
// Provides util.NodeState to Object service.
|
|
func (c *internals) IsMaintenance() bool {
|
|
return c.isMaintenance.Load()
|
|
}
|
|
|
|
// shared contains component-specific structs/helpers that should
|
|
// be shared during initialization of the application.
|
|
type shared struct {
|
|
privateTokenStore sessionStorage
|
|
persistate *state.PersistentStorage
|
|
|
|
clientCache *cache.ClientCache
|
|
bgClientCache *cache.ClientCache
|
|
putClientCache *cache.ClientCache
|
|
localAddr network.AddressGroup
|
|
|
|
key *keys.PrivateKey
|
|
binPublicKey []byte
|
|
ownerIDFromKey user.ID // user ID calculated from key
|
|
|
|
// current network map
|
|
netMap atomic.Value // type netmap.NetMap
|
|
netMapSource netmapCore.Source
|
|
|
|
cnrClient *containerClient.Client
|
|
|
|
respSvc *response.Service
|
|
|
|
replicator *replicator.Replicator
|
|
|
|
treeService *tree.Service
|
|
|
|
metricsCollector *metrics.NodeMetrics
|
|
|
|
metricsSvc *objectService.MetricCollector
|
|
}
|
|
|
|
// dynamicConfiguration stores parameters of the
|
|
// components that supports runtime reconfigurations.
|
|
type dynamicConfiguration struct {
|
|
logger *logger.Prm
|
|
pprof *httpComponent
|
|
metrics *httpComponent
|
|
}
|
|
|
|
type appConfigGuard struct {
|
|
mtx sync.RWMutex
|
|
}
|
|
|
|
func (g *appConfigGuard) LockAppConfigShared() func() {
|
|
g.mtx.RLock()
|
|
return func() { g.mtx.RUnlock() }
|
|
}
|
|
|
|
func (g *appConfigGuard) LockAppConfigExclusive() func() {
|
|
g.mtx.Lock()
|
|
return func() { g.mtx.Unlock() }
|
|
}
|
|
|
|
type cfg struct {
|
|
applicationConfiguration
|
|
internals
|
|
shared
|
|
dynamicConfiguration
|
|
appConfigGuard
|
|
|
|
// configuration of the internal
|
|
// services
|
|
cfgGRPC cfgGRPC
|
|
cfgMorph cfgMorph
|
|
cfgAccounting cfgAccounting
|
|
cfgContainer cfgContainer
|
|
cfgNodeInfo cfgNodeInfo
|
|
cfgNetmap cfgNetmap
|
|
cfgControlService cfgControlService
|
|
cfgObject cfgObject
|
|
cfgNotifications cfgNotifications
|
|
}
|
|
|
|
// ReadCurrentNetMap reads network map which has been cached at the
|
|
// latest epoch. Returns an error if value has not been cached yet.
|
|
//
|
|
// Provides interface for NetmapService server.
|
|
func (c *cfg) ReadCurrentNetMap(msg *netmapV2.NetMap) error {
|
|
val := c.netMap.Load()
|
|
if val == nil {
|
|
return errors.New("missing local network map")
|
|
}
|
|
|
|
val.(netmap.NetMap).WriteToV2(msg)
|
|
|
|
return nil
|
|
}
|
|
|
|
type grpcServer struct {
|
|
Listener net.Listener
|
|
Server *grpc.Server
|
|
Endpoint string
|
|
}
|
|
|
|
type cfgGRPC struct {
|
|
// guard protects connections and handlers
|
|
guard sync.RWMutex
|
|
// servers must be protected with guard
|
|
servers []grpcServer
|
|
// handlers must be protected with guard
|
|
handlers []func(e string, l net.Listener, s *grpc.Server)
|
|
|
|
maxChunkSize uint64
|
|
maxAddrAmount uint64
|
|
reconnectTimeout time.Duration
|
|
}
|
|
|
|
func (c *cfgGRPC) append(e string, l net.Listener, s *grpc.Server) {
|
|
c.guard.Lock()
|
|
defer c.guard.Unlock()
|
|
|
|
c.servers = append(c.servers, grpcServer{
|
|
Listener: l,
|
|
Server: s,
|
|
Endpoint: e,
|
|
})
|
|
}
|
|
|
|
func (c *cfgGRPC) appendAndHandle(e string, l net.Listener, s *grpc.Server) {
|
|
c.guard.Lock()
|
|
defer c.guard.Unlock()
|
|
|
|
c.servers = append(c.servers, grpcServer{
|
|
Listener: l,
|
|
Server: s,
|
|
Endpoint: e,
|
|
})
|
|
|
|
for _, h := range c.handlers {
|
|
h(e, l, s)
|
|
}
|
|
}
|
|
|
|
func (c *cfgGRPC) performAndSave(handler func(e string, l net.Listener, s *grpc.Server)) {
|
|
c.guard.Lock()
|
|
defer c.guard.Unlock()
|
|
|
|
for _, conn := range c.servers {
|
|
handler(conn.Endpoint, conn.Listener, conn.Server)
|
|
}
|
|
|
|
c.handlers = append(c.handlers, handler)
|
|
}
|
|
|
|
func (c *cfgGRPC) dropConnection(endpoint string) {
|
|
c.guard.Lock()
|
|
defer c.guard.Unlock()
|
|
|
|
pos := -1
|
|
for idx, srv := range c.servers {
|
|
if srv.Endpoint == endpoint {
|
|
pos = idx
|
|
break
|
|
}
|
|
}
|
|
if pos < 0 {
|
|
return
|
|
}
|
|
|
|
c.servers[pos].Server.Stop() // closes listener
|
|
c.servers = append(c.servers[0:pos], c.servers[pos+1:]...)
|
|
}
|
|
|
|
type cfgMorph struct {
|
|
client *client.Client
|
|
|
|
notaryEnabled bool
|
|
|
|
// TTL of Sidechain cached values. Non-positive value disables caching.
|
|
cacheTTL time.Duration
|
|
|
|
proxyScriptHash neogoutil.Uint160
|
|
}
|
|
|
|
type cfgAccounting struct {
|
|
scriptHash neogoutil.Uint160
|
|
}
|
|
|
|
type cfgContainer struct {
|
|
scriptHash neogoutil.Uint160
|
|
|
|
parsers map[event.Type]event.NotificationParser
|
|
subscribers map[event.Type][]event.Handler
|
|
workerPool util.WorkerPool // pool for asynchronous handlers
|
|
}
|
|
|
|
type cfgNetmap struct {
|
|
scriptHash neogoutil.Uint160
|
|
wrapper *nmClient.Client
|
|
|
|
parsers map[event.Type]event.NotificationParser
|
|
|
|
subscribers map[event.Type][]event.Handler
|
|
workerPool util.WorkerPool // pool for asynchronous handlers
|
|
|
|
state *networkState
|
|
|
|
needBootstrap bool
|
|
reBoostrapTurnedOff *atomic.Bool // managed by control service in runtime
|
|
startEpoch uint64 // epoch number when application is started
|
|
}
|
|
|
|
type cfgNodeInfo struct {
|
|
// values from config
|
|
localInfo netmap.NodeInfo
|
|
}
|
|
|
|
type cfgObject struct {
|
|
getSvc *getsvc.Service
|
|
|
|
cnrSource container.Source
|
|
|
|
eaclSource container.EACLSource
|
|
|
|
cfgAccessPolicyEngine cfgAccessPolicyEngine
|
|
|
|
pool cfgObjectRoutines
|
|
|
|
cfgLocalStorage cfgLocalStorage
|
|
|
|
tombstoneLifetime uint64
|
|
|
|
skipSessionTokenIssuerVerification bool
|
|
}
|
|
|
|
type cfgNotifications struct {
|
|
enabled bool
|
|
nw notificationWriter
|
|
defaultTopic string
|
|
}
|
|
|
|
type cfgLocalStorage struct {
|
|
localStorage *engine.StorageEngine
|
|
}
|
|
|
|
type cfgAccessPolicyEngine struct {
|
|
policyContractHash neogoutil.Uint160
|
|
|
|
accessPolicyEngine *accessPolicyEngine
|
|
}
|
|
|
|
type cfgObjectRoutines struct {
|
|
putRemote *ants.Pool
|
|
|
|
putRemoteCapacity int
|
|
|
|
putLocal *ants.Pool
|
|
|
|
putLocalCapacity int
|
|
|
|
replicatorPoolSize int
|
|
|
|
replication *ants.Pool
|
|
}
|
|
|
|
type cfgControlService struct {
|
|
server *grpc.Server
|
|
}
|
|
|
|
var persistateSideChainLastBlockKey = []byte("side_chain_last_processed_block")
|
|
|
|
func initCfg(appCfg *config.Config) *cfg {
|
|
c := &cfg{}
|
|
|
|
err := c.readConfig(appCfg)
|
|
if err != nil {
|
|
panic(fmt.Errorf("config reading: %w", err))
|
|
}
|
|
|
|
key := nodeconfig.Key(appCfg)
|
|
|
|
relayOnly := nodeconfig.Relay(appCfg)
|
|
|
|
netState := newNetworkState()
|
|
|
|
c.shared = initShared(appCfg, key, netState, relayOnly)
|
|
|
|
netState.metrics = c.metricsCollector
|
|
|
|
logPrm, err := c.loggerPrm()
|
|
fatalOnErr(err)
|
|
logPrm.SamplingHook = c.metricsCollector.LogMetrics().GetSamplingHook()
|
|
log, err := logger.NewLogger(logPrm)
|
|
fatalOnErr(err)
|
|
if loggerconfig.ToLokiConfig(appCfg).Enabled {
|
|
log.Logger = log.Logger.WithOptions(zap.WrapCore(func(core zapcore.Core) zapcore.Core {
|
|
lokiCore := lokicore.New(core, loggerconfig.ToLokiConfig(appCfg))
|
|
return lokiCore
|
|
}))
|
|
}
|
|
|
|
c.internals = initInternals(appCfg, log)
|
|
|
|
c.cfgAccounting = cfgAccounting{
|
|
scriptHash: contractsconfig.Balance(appCfg),
|
|
}
|
|
c.cfgContainer = initContainer(appCfg)
|
|
|
|
c.cfgNetmap = initNetmap(appCfg, netState, relayOnly)
|
|
|
|
c.cfgGRPC = initCfgGRPC()
|
|
|
|
c.cfgMorph = cfgMorph{
|
|
proxyScriptHash: contractsconfig.Proxy(appCfg),
|
|
}
|
|
c.cfgObject = initCfgObject(appCfg)
|
|
|
|
user.IDFromKey(&c.ownerIDFromKey, key.PrivateKey.PublicKey)
|
|
|
|
c.onShutdown(c.clientCache.CloseAll) // clean up connections
|
|
c.onShutdown(c.bgClientCache.CloseAll) // clean up connections
|
|
c.onShutdown(c.putClientCache.CloseAll) // clean up connections
|
|
c.onShutdown(func() { _ = c.persistate.Close() })
|
|
|
|
return c
|
|
}
|
|
|
|
func initInternals(appCfg *config.Config, log *logger.Logger) internals {
|
|
var healthStatus atomic.Int32
|
|
healthStatus.Store(int32(control.HealthStatus_HEALTH_STATUS_UNDEFINED))
|
|
|
|
return internals{
|
|
done: make(chan struct{}),
|
|
appCfg: appCfg,
|
|
internalErr: make(chan error),
|
|
log: log,
|
|
apiVersion: version.Current(),
|
|
healthStatus: &healthStatus,
|
|
sdNotify: initSdNotify(appCfg),
|
|
}
|
|
}
|
|
|
|
func initSdNotify(appCfg *config.Config) bool {
|
|
if config.BoolSafe(appCfg.Sub("systemdnotify"), "enabled") {
|
|
fatalOnErr(sdnotify.InitSocket())
|
|
return true
|
|
}
|
|
return false
|
|
}
|
|
|
|
func initShared(appCfg *config.Config, key *keys.PrivateKey, netState *networkState, relayOnly bool) shared {
|
|
var netAddr network.AddressGroup
|
|
|
|
if !relayOnly {
|
|
netAddr = nodeconfig.BootstrapAddresses(appCfg)
|
|
}
|
|
|
|
persistate, err := state.NewPersistentStorage(nodeconfig.PersistentState(appCfg).Path())
|
|
fatalOnErr(err)
|
|
|
|
cacheOpts := cache.ClientCacheOpts{
|
|
DialTimeout: apiclientconfig.DialTimeout(appCfg),
|
|
StreamTimeout: apiclientconfig.StreamTimeout(appCfg),
|
|
Key: &key.PrivateKey,
|
|
AllowExternal: apiclientconfig.AllowExternal(appCfg),
|
|
ReconnectTimeout: apiclientconfig.ReconnectTimeout(appCfg),
|
|
}
|
|
|
|
return shared{
|
|
key: key,
|
|
binPublicKey: key.PublicKey().Bytes(),
|
|
localAddr: netAddr,
|
|
respSvc: response.NewService(netState),
|
|
clientCache: cache.NewSDKClientCache(cacheOpts),
|
|
bgClientCache: cache.NewSDKClientCache(cacheOpts),
|
|
putClientCache: cache.NewSDKClientCache(cacheOpts),
|
|
persistate: persistate,
|
|
metricsCollector: metrics.NewNodeMetrics(),
|
|
}
|
|
}
|
|
|
|
func initNetmap(appCfg *config.Config, netState *networkState, relayOnly bool) cfgNetmap {
|
|
netmapWorkerPool, err := ants.NewPool(notificationHandlerPoolSize)
|
|
fatalOnErr(err)
|
|
|
|
var reBootstrapTurnedOff atomic.Bool
|
|
reBootstrapTurnedOff.Store(relayOnly)
|
|
return cfgNetmap{
|
|
scriptHash: contractsconfig.Netmap(appCfg),
|
|
state: netState,
|
|
workerPool: netmapWorkerPool,
|
|
needBootstrap: !relayOnly,
|
|
reBoostrapTurnedOff: &reBootstrapTurnedOff,
|
|
}
|
|
}
|
|
|
|
func initContainer(appCfg *config.Config) cfgContainer {
|
|
containerWorkerPool, err := ants.NewPool(notificationHandlerPoolSize)
|
|
fatalOnErr(err)
|
|
|
|
return cfgContainer{
|
|
scriptHash: contractsconfig.Container(appCfg),
|
|
workerPool: containerWorkerPool,
|
|
}
|
|
}
|
|
|
|
func initCfgGRPC() cfgGRPC {
|
|
maxChunkSize := uint64(maxMsgSize) * 3 / 4 // 25% to meta, 75% to payload
|
|
maxAddrAmount := uint64(maxChunkSize) / addressSize // each address is about 72 bytes
|
|
|
|
return cfgGRPC{
|
|
maxChunkSize: maxChunkSize,
|
|
maxAddrAmount: maxAddrAmount,
|
|
}
|
|
}
|
|
|
|
func initCfgObject(appCfg *config.Config) cfgObject {
|
|
return cfgObject{
|
|
pool: initObjectPool(appCfg),
|
|
tombstoneLifetime: objectconfig.TombstoneLifetime(appCfg),
|
|
skipSessionTokenIssuerVerification: objectconfig.Put(appCfg).SkipSessionTokenIssuerVerification(),
|
|
}
|
|
}
|
|
|
|
func (c *cfg) engineOpts() []engine.Option {
|
|
var opts []engine.Option
|
|
|
|
opts = append(opts,
|
|
engine.WithShardPoolSize(c.EngineCfg.shardPoolSize),
|
|
engine.WithErrorThreshold(c.EngineCfg.errorThreshold),
|
|
engine.WithLogger(c.log),
|
|
engine.WithLowMemoryConsumption(c.EngineCfg.lowMem),
|
|
engine.WithRebuildWorkersCount(c.EngineCfg.rebuildWorkers),
|
|
)
|
|
|
|
if c.metricsCollector != nil {
|
|
opts = append(opts, engine.WithMetrics(c.metricsCollector.Engine()))
|
|
}
|
|
|
|
return opts
|
|
}
|
|
|
|
type shardOptsWithID struct {
|
|
configID string
|
|
shOpts []shard.Option
|
|
}
|
|
|
|
func (c *cfg) shardOpts() []shardOptsWithID {
|
|
shards := make([]shardOptsWithID, 0, len(c.EngineCfg.shards))
|
|
|
|
for _, shCfg := range c.EngineCfg.shards {
|
|
shards = append(shards, c.getShardOpts(shCfg))
|
|
}
|
|
|
|
return shards
|
|
}
|
|
|
|
func (c *cfg) getWriteCacheOpts(shCfg shardCfg) writecacheconfig.Options {
|
|
var writeCacheOpts writecacheconfig.Options
|
|
if wcRead := shCfg.writecacheCfg; wcRead.enabled {
|
|
switch wcRead.typ {
|
|
case writecacheconfig.TypeBBolt:
|
|
writeCacheOpts.Type = writecacheconfig.TypeBBolt
|
|
writeCacheOpts.BBoltOptions = append(writeCacheOpts.BBoltOptions,
|
|
writecachebbolt.WithPath(wcRead.path),
|
|
writecachebbolt.WithMaxBatchSize(wcRead.maxBatchSize),
|
|
writecachebbolt.WithMaxBatchDelay(wcRead.maxBatchDelay),
|
|
writecachebbolt.WithMaxObjectSize(wcRead.maxObjSize),
|
|
writecachebbolt.WithSmallObjectSize(wcRead.smallObjectSize),
|
|
writecachebbolt.WithFlushWorkersCount(wcRead.flushWorkerCount),
|
|
writecachebbolt.WithMaxCacheSize(wcRead.sizeLimit),
|
|
writecachebbolt.WithNoSync(wcRead.noSync),
|
|
writecachebbolt.WithLogger(c.log),
|
|
)
|
|
case writecacheconfig.TypeBadger:
|
|
writeCacheOpts.Type = writecacheconfig.TypeBadger
|
|
writeCacheOpts.BadgerOptions = append(writeCacheOpts.BadgerOptions,
|
|
writecachebadger.WithPath(wcRead.path),
|
|
writecachebadger.WithMaxObjectSize(wcRead.maxObjSize),
|
|
writecachebadger.WithFlushWorkersCount(wcRead.flushWorkerCount),
|
|
writecachebadger.WithMaxCacheSize(wcRead.sizeLimit),
|
|
writecachebadger.WithLogger(c.log),
|
|
writecachebadger.WithGCInterval(wcRead.gcInterval),
|
|
)
|
|
default:
|
|
panic(fmt.Sprintf("unknown writecache type: %q", wcRead.typ))
|
|
}
|
|
}
|
|
return writeCacheOpts
|
|
}
|
|
|
|
func (c *cfg) getPiloramaOpts(shCfg shardCfg) []pilorama.Option {
|
|
var piloramaOpts []pilorama.Option
|
|
if prRead := shCfg.piloramaCfg; prRead.enabled {
|
|
piloramaOpts = append(piloramaOpts,
|
|
pilorama.WithPath(prRead.path),
|
|
pilorama.WithPerm(prRead.perm),
|
|
pilorama.WithNoSync(prRead.noSync),
|
|
pilorama.WithMaxBatchSize(prRead.maxBatchSize),
|
|
pilorama.WithMaxBatchDelay(prRead.maxBatchDelay),
|
|
)
|
|
if c.metricsCollector != nil {
|
|
piloramaOpts = append(piloramaOpts, pilorama.WithMetrics(lsmetrics.NewPiloramaMetrics(c.metricsCollector.PiloramaMetrics())))
|
|
}
|
|
}
|
|
return piloramaOpts
|
|
}
|
|
|
|
func (c *cfg) getSubstorageOpts(shCfg shardCfg) []blobstor.SubStorage {
|
|
var ss []blobstor.SubStorage
|
|
for _, sRead := range shCfg.subStorages {
|
|
switch sRead.typ {
|
|
case blobovniczatree.Type:
|
|
blobTreeOpts := []blobovniczatree.Option{
|
|
blobovniczatree.WithRootPath(sRead.path),
|
|
blobovniczatree.WithPermissions(sRead.perm),
|
|
blobovniczatree.WithBlobovniczaSize(sRead.size),
|
|
blobovniczatree.WithBlobovniczaShallowDepth(sRead.depth),
|
|
blobovniczatree.WithBlobovniczaShallowWidth(sRead.width),
|
|
blobovniczatree.WithBlobovniczaLeafWidth(sRead.leafWidth),
|
|
blobovniczatree.WithOpenedCacheSize(sRead.openedCacheSize),
|
|
blobovniczatree.WithOpenedCacheTTL(sRead.openedCacheTTL),
|
|
blobovniczatree.WithInitWorkerCount(sRead.initWorkerCount),
|
|
blobovniczatree.WithInitInAdvance(sRead.initInAdvance),
|
|
blobovniczatree.WithLogger(c.log),
|
|
blobovniczatree.WithObjectSizeLimit(shCfg.smallSizeObjectLimit),
|
|
}
|
|
|
|
if c.metricsCollector != nil {
|
|
blobTreeOpts = append(blobTreeOpts,
|
|
blobovniczatree.WithMetrics(
|
|
lsmetrics.NewBlobovniczaTreeMetrics(sRead.path, c.metricsCollector.BlobobvnizcaTreeMetrics()),
|
|
),
|
|
)
|
|
}
|
|
ss = append(ss, blobstor.SubStorage{
|
|
Storage: blobovniczatree.NewBlobovniczaTree(blobTreeOpts...),
|
|
Policy: func(_ *objectSDK.Object, data []byte) bool {
|
|
return uint64(len(data)) < shCfg.smallSizeObjectLimit
|
|
},
|
|
})
|
|
case fstree.Type:
|
|
fstreeOpts := []fstree.Option{
|
|
fstree.WithPath(sRead.path),
|
|
fstree.WithPerm(sRead.perm),
|
|
fstree.WithDepth(sRead.depth),
|
|
fstree.WithNoSync(sRead.noSync),
|
|
fstree.WithLogger(c.log),
|
|
}
|
|
if c.metricsCollector != nil {
|
|
fstreeOpts = append(fstreeOpts,
|
|
fstree.WithMetrics(
|
|
lsmetrics.NewFSTreeMetricsWithoutShardID(sRead.path, c.metricsCollector.FSTree()),
|
|
),
|
|
)
|
|
}
|
|
|
|
ss = append(ss, blobstor.SubStorage{
|
|
Storage: fstree.New(fstreeOpts...),
|
|
Policy: func(_ *objectSDK.Object, data []byte) bool {
|
|
return true
|
|
},
|
|
})
|
|
default:
|
|
// should never happen, that has already
|
|
// been handled: when the config was read
|
|
}
|
|
}
|
|
return ss
|
|
}
|
|
|
|
func (c *cfg) getShardOpts(shCfg shardCfg) shardOptsWithID {
|
|
writeCacheOpts := c.getWriteCacheOpts(shCfg)
|
|
piloramaOpts := c.getPiloramaOpts(shCfg)
|
|
ss := c.getSubstorageOpts(shCfg)
|
|
|
|
blobstoreOpts := []blobstor.Option{
|
|
blobstor.WithCompressObjects(shCfg.compress),
|
|
blobstor.WithUncompressableContentTypes(shCfg.uncompressableContentType),
|
|
blobstor.WithCompressibilityEstimate(shCfg.estimateCompressibility),
|
|
blobstor.WithCompressibilityEstimateThreshold(shCfg.estimateCompressibilityThreshold),
|
|
blobstor.WithStorages(ss),
|
|
blobstor.WithLogger(c.log),
|
|
}
|
|
if c.metricsCollector != nil {
|
|
blobstoreOpts = append(blobstoreOpts, blobstor.WithMetrics(lsmetrics.NewBlobstoreMetrics(c.metricsCollector.Blobstore())))
|
|
}
|
|
|
|
mbOptions := []meta.Option{
|
|
meta.WithPath(shCfg.metaCfg.path),
|
|
meta.WithPermissions(shCfg.metaCfg.perm),
|
|
meta.WithMaxBatchSize(shCfg.metaCfg.maxBatchSize),
|
|
meta.WithMaxBatchDelay(shCfg.metaCfg.maxBatchDelay),
|
|
meta.WithBoltDBOptions(&bbolt.Options{
|
|
Timeout: 100 * time.Millisecond,
|
|
}),
|
|
meta.WithLogger(c.log),
|
|
meta.WithEpochState(c.cfgNetmap.state),
|
|
}
|
|
if c.metricsCollector != nil {
|
|
mbOptions = append(mbOptions, meta.WithMetrics(lsmetrics.NewMetabaseMetrics(shCfg.metaCfg.path, c.metricsCollector.MetabaseMetrics())))
|
|
}
|
|
|
|
var sh shardOptsWithID
|
|
sh.configID = shCfg.id()
|
|
sh.shOpts = []shard.Option{
|
|
shard.WithLogger(c.log),
|
|
shard.WithRefillMetabase(shCfg.refillMetabase),
|
|
shard.WithMode(shCfg.mode),
|
|
shard.WithBlobStorOptions(blobstoreOpts...),
|
|
shard.WithMetaBaseOptions(mbOptions...),
|
|
shard.WithPiloramaOptions(piloramaOpts...),
|
|
shard.WithWriteCache(shCfg.writecacheCfg.enabled),
|
|
shard.WithWriteCacheOptions(writeCacheOpts),
|
|
shard.WithRemoverBatchSize(shCfg.gcCfg.removerBatchSize),
|
|
shard.WithGCRemoverSleepInterval(shCfg.gcCfg.removerSleepInterval),
|
|
shard.WithExpiredCollectorBatchSize(shCfg.gcCfg.expiredCollectorBatchSize),
|
|
shard.WithExpiredCollectorWorkerCount(shCfg.gcCfg.expiredCollectorWorkerCount),
|
|
shard.WithGCWorkerPoolInitializer(func(sz int) util.WorkerPool {
|
|
pool, err := ants.NewPool(sz)
|
|
fatalOnErr(err)
|
|
|
|
return pool
|
|
}),
|
|
}
|
|
return sh
|
|
}
|
|
|
|
func (c *cfg) loggerPrm() (*logger.Prm, error) {
|
|
// check if it has been inited before
|
|
if c.dynamicConfiguration.logger == nil {
|
|
c.dynamicConfiguration.logger = new(logger.Prm)
|
|
}
|
|
|
|
// (re)init read configuration
|
|
err := c.dynamicConfiguration.logger.SetLevelString(c.LoggerCfg.level)
|
|
if err != nil {
|
|
// not expected since validation should be performed before
|
|
panic(fmt.Sprintf("incorrect log level format: %s", c.LoggerCfg.level))
|
|
}
|
|
|
|
return c.dynamicConfiguration.logger, nil
|
|
}
|
|
|
|
func (c *cfg) LocalAddress() network.AddressGroup {
|
|
return c.localAddr
|
|
}
|
|
|
|
func initLocalStorage(ctx context.Context, c *cfg) {
|
|
ls := engine.New(c.engineOpts()...)
|
|
|
|
addNewEpochAsyncNotificationHandler(c, func(ev event.Event) {
|
|
ls.HandleNewEpoch(ev.(netmap2.NewEpoch).EpochNumber())
|
|
})
|
|
|
|
// allocate memory for the service;
|
|
// service will be created later
|
|
c.cfgObject.getSvc = new(getsvc.Service)
|
|
|
|
var shardsAttached int
|
|
for _, optsWithMeta := range c.shardOpts() {
|
|
id, err := ls.AddShard(ctx, append(optsWithMeta.shOpts, shard.WithTombstoneSource(c.createTombstoneSource()))...)
|
|
if err != nil {
|
|
c.log.Error(logs.FrostFSNodeFailedToAttachShardToEngine, zap.Error(err))
|
|
} else {
|
|
shardsAttached++
|
|
c.log.Info(logs.FrostFSNodeShardAttachedToEngine, zap.Stringer("id", id))
|
|
}
|
|
}
|
|
if shardsAttached == 0 {
|
|
fatalOnErr(engineconfig.ErrNoShardConfigured)
|
|
}
|
|
|
|
c.cfgObject.cfgLocalStorage.localStorage = ls
|
|
|
|
c.onShutdown(func() {
|
|
c.log.Info(logs.FrostFSNodeClosingComponentsOfTheStorageEngine)
|
|
|
|
err := ls.Close(context.Background())
|
|
if err != nil {
|
|
c.log.Info(logs.FrostFSNodeStorageEngineClosingFailure,
|
|
zap.String("error", err.Error()),
|
|
)
|
|
} else {
|
|
c.log.Info(logs.FrostFSNodeAllComponentsOfTheStorageEngineClosedSuccessfully)
|
|
}
|
|
})
|
|
}
|
|
|
|
func initAccessPolicyEngine(_ context.Context, c *cfg) {
|
|
var localOverrideDB chainbase.LocalOverrideDatabase
|
|
if nodeconfig.PersistentPolicyRules(c.appCfg).Path() == "" {
|
|
c.log.Warn(logs.FrostFSNodePersistentRuleStorageDBPathIsNotSetInmemoryWillBeUsed)
|
|
localOverrideDB = chainbase.NewInmemoryLocalOverrideDatabase()
|
|
} else {
|
|
localOverrideDB = chainbase.NewBoltLocalOverrideDatabase(
|
|
chainbase.WithLogger(c.log),
|
|
chainbase.WithPath(nodeconfig.PersistentPolicyRules(c.appCfg).Path()),
|
|
chainbase.WithPerm(nodeconfig.PersistentPolicyRules(c.appCfg).Perm()),
|
|
chainbase.WithNoSync(nodeconfig.PersistentPolicyRules(c.appCfg).NoSync()),
|
|
)
|
|
}
|
|
|
|
morphRuleStorage := policy_client.NewContractStorage(
|
|
c.cfgMorph.client.GetActor(),
|
|
c.cfgObject.cfgAccessPolicyEngine.policyContractHash)
|
|
|
|
ape := newAccessPolicyEngine(morphRuleStorage, localOverrideDB)
|
|
c.cfgObject.cfgAccessPolicyEngine.accessPolicyEngine = ape
|
|
|
|
c.onShutdown(func() {
|
|
if err := ape.LocalOverrideDatabaseCore().Close(); err != nil {
|
|
c.log.Warn(logs.FrostFSNodeAccessPolicyEngineClosingFailure,
|
|
zap.Error(err),
|
|
)
|
|
}
|
|
})
|
|
}
|
|
|
|
func initObjectPool(cfg *config.Config) (pool cfgObjectRoutines) {
|
|
var err error
|
|
|
|
optNonBlocking := ants.WithNonblocking(true)
|
|
|
|
pool.putRemoteCapacity = objectconfig.Put(cfg).PoolSizeRemote()
|
|
pool.putRemote, err = ants.NewPool(pool.putRemoteCapacity, optNonBlocking)
|
|
fatalOnErr(err)
|
|
|
|
pool.putLocalCapacity = objectconfig.Put(cfg).PoolSizeLocal()
|
|
pool.putLocal, err = ants.NewPool(pool.putLocalCapacity, optNonBlocking)
|
|
fatalOnErr(err)
|
|
|
|
pool.replicatorPoolSize = replicatorconfig.PoolSize(cfg)
|
|
if pool.replicatorPoolSize <= 0 {
|
|
pool.replicatorPoolSize = pool.putRemoteCapacity
|
|
}
|
|
|
|
pool.replication, err = ants.NewPool(pool.replicatorPoolSize)
|
|
fatalOnErr(err)
|
|
|
|
return pool
|
|
}
|
|
|
|
func (c *cfg) LocalNodeInfo() (*netmapV2.NodeInfo, error) {
|
|
var res netmapV2.NodeInfo
|
|
|
|
ni, ok := c.cfgNetmap.state.getNodeInfo()
|
|
if ok {
|
|
ni.WriteToV2(&res)
|
|
} else {
|
|
c.cfgNodeInfo.localInfo.WriteToV2(&res)
|
|
}
|
|
|
|
return &res, nil
|
|
}
|
|
|
|
// handleLocalNodeInfo rewrites local node info from the FrostFS network map.
|
|
// Called with nil when storage node is outside the FrostFS network map
|
|
// (before entering the network and after leaving it).
|
|
func (c *cfg) handleLocalNodeInfo(ni *netmap.NodeInfo) {
|
|
c.cfgNetmap.state.setNodeInfo(ni)
|
|
}
|
|
|
|
// bootstrapWithState calls "addPeer" method of the Sidechain Netmap contract
|
|
// with the binary-encoded information from the current node's configuration.
|
|
// The state is set using the provided setter which MUST NOT be nil.
|
|
func (c *cfg) bootstrapWithState(stateSetter func(*netmap.NodeInfo)) error {
|
|
ni := c.cfgNodeInfo.localInfo
|
|
stateSetter(&ni)
|
|
|
|
prm := nmClient.AddPeerPrm{}
|
|
prm.SetNodeInfo(ni)
|
|
|
|
return c.cfgNetmap.wrapper.AddPeer(prm)
|
|
}
|
|
|
|
// bootstrapOnline calls cfg.bootstrapWithState with "online" state.
|
|
func bootstrapOnline(c *cfg) error {
|
|
return c.bootstrapWithState((*netmap.NodeInfo).SetOnline)
|
|
}
|
|
|
|
// bootstrap calls bootstrapWithState with:
|
|
// - "maintenance" state if maintenance is in progress on the current node
|
|
// - "online", otherwise
|
|
func (c *cfg) bootstrap() error {
|
|
// switch to online except when under maintenance
|
|
st := c.cfgNetmap.state.controlNetmapStatus()
|
|
if st == control.NetmapStatus_MAINTENANCE {
|
|
c.log.Info(logs.FrostFSNodeBootstrappingWithTheMaintenanceState)
|
|
return c.bootstrapWithState((*netmap.NodeInfo).SetMaintenance)
|
|
}
|
|
|
|
c.log.Info(logs.FrostFSNodeBootstrappingWithOnlineState,
|
|
zap.Stringer("previous", st),
|
|
)
|
|
|
|
return bootstrapOnline(c)
|
|
}
|
|
|
|
// needBootstrap checks if local node should be registered in network on bootup.
|
|
func (c *cfg) needBootstrap() bool {
|
|
return c.cfgNetmap.needBootstrap
|
|
}
|
|
|
|
type dCmp struct {
|
|
name string
|
|
reloadFunc func() error
|
|
}
|
|
|
|
func (c *cfg) signalWatcher(ctx context.Context) {
|
|
ch := make(chan os.Signal, 1)
|
|
signal.Notify(ch, syscall.SIGHUP, syscall.SIGINT, syscall.SIGTERM)
|
|
|
|
for {
|
|
select {
|
|
case sig := <-ch:
|
|
switch sig {
|
|
case syscall.SIGHUP:
|
|
c.reloadConfig(ctx)
|
|
case syscall.SIGTERM, syscall.SIGINT:
|
|
c.log.Info(logs.FrostFSNodeTerminationSignalHasBeenReceivedStopping)
|
|
|
|
c.shutdown()
|
|
|
|
c.log.Info(logs.FrostFSNodeTerminationSignalProcessingIsComplete)
|
|
return
|
|
}
|
|
case err := <-c.internalErr: // internal application error
|
|
c.log.Warn(logs.FrostFSNodeInternalApplicationError,
|
|
zap.String("message", err.Error()))
|
|
|
|
c.shutdown()
|
|
|
|
c.log.Info(logs.FrostFSNodeInternalErrorProcessingIsComplete)
|
|
return
|
|
}
|
|
}
|
|
}
|
|
|
|
func (c *cfg) reloadConfig(ctx context.Context) {
|
|
c.log.Info(logs.FrostFSNodeSIGHUPHasBeenReceivedRereadingConfiguration)
|
|
|
|
if !c.compareAndSwapHealthStatus(control.HealthStatus_READY, control.HealthStatus_RECONFIGURING) {
|
|
c.log.Info(logs.FrostFSNodeSIGHUPSkip)
|
|
return
|
|
}
|
|
defer c.compareAndSwapHealthStatus(control.HealthStatus_RECONFIGURING, control.HealthStatus_READY)
|
|
|
|
err := c.reloadAppConfig()
|
|
if err != nil {
|
|
c.log.Error(logs.FrostFSNodeConfigurationReading, zap.Error(err))
|
|
return
|
|
}
|
|
|
|
// all the components are expected to support
|
|
// Logger's dynamic reconfiguration approach
|
|
var components []dCmp
|
|
|
|
// Logger
|
|
|
|
logPrm, err := c.loggerPrm()
|
|
if err != nil {
|
|
c.log.Error(logs.FrostFSNodeLoggerConfigurationPreparation, zap.Error(err))
|
|
return
|
|
}
|
|
|
|
components = append(components, dCmp{"logger", logPrm.Reload})
|
|
components = append(components, dCmp{"runtime", func() error {
|
|
setRuntimeParameters(c)
|
|
return nil
|
|
}})
|
|
components = append(components, dCmp{"tracing", func() error {
|
|
updated, err := tracing.Setup(ctx, *tracingconfig.ToTracingConfig(c.appCfg))
|
|
if updated {
|
|
c.log.Info(logs.FrostFSNodeTracingConfigationUpdated)
|
|
}
|
|
return err
|
|
}})
|
|
if cmp, updated := metricsComponent(c); updated {
|
|
if cmp.enabled {
|
|
cmp.preReload = enableMetricsSvc
|
|
} else {
|
|
cmp.preReload = disableMetricsSvc
|
|
}
|
|
components = append(components, dCmp{cmp.name, func() error { return cmp.reload(ctx) }})
|
|
}
|
|
if cmp, updated := pprofComponent(c); updated {
|
|
components = append(components, dCmp{cmp.name, func() error { return cmp.reload(ctx) }})
|
|
}
|
|
|
|
// Storage Engine
|
|
|
|
var rcfg engine.ReConfiguration
|
|
for _, optsWithID := range c.shardOpts() {
|
|
rcfg.AddShard(optsWithID.configID, append(optsWithID.shOpts, shard.WithTombstoneSource(c.createTombstoneSource())))
|
|
}
|
|
|
|
err = c.cfgObject.cfgLocalStorage.localStorage.Reload(ctx, rcfg)
|
|
if err != nil {
|
|
c.log.Error(logs.FrostFSNodeStorageEngineConfigurationUpdate, zap.Error(err))
|
|
return
|
|
}
|
|
|
|
for _, component := range components {
|
|
err = component.reloadFunc()
|
|
if err != nil {
|
|
c.log.Error(logs.FrostFSNodeUpdatedConfigurationApplying,
|
|
zap.String("component", component.name),
|
|
zap.Error(err))
|
|
}
|
|
}
|
|
|
|
c.log.Info(logs.FrostFSNodeConfigurationHasBeenReloadedSuccessfully)
|
|
}
|
|
|
|
func (c *cfg) reloadAppConfig() error {
|
|
unlock := c.LockAppConfigExclusive()
|
|
defer unlock()
|
|
|
|
return c.readConfig(c.appCfg)
|
|
}
|
|
|
|
func (c *cfg) createTombstoneSource() *tombstone.ExpirationChecker {
|
|
var tssPrm tsourse.TombstoneSourcePrm
|
|
tssPrm.SetGetService(c.cfgObject.getSvc)
|
|
tombstoneSrc := tsourse.NewSource(tssPrm)
|
|
|
|
tombstoneSource := tombstone.NewChecker(
|
|
tombstone.WithLogger(c.log),
|
|
tombstone.WithTombstoneSource(tombstoneSrc),
|
|
)
|
|
return tombstoneSource
|
|
}
|
|
|
|
func (c *cfg) shutdown() {
|
|
old := c.swapHealthStatus(control.HealthStatus_SHUTTING_DOWN)
|
|
if old == control.HealthStatus_SHUTTING_DOWN {
|
|
c.log.Info(logs.FrostFSNodeShutdownSkip)
|
|
return
|
|
}
|
|
if old == control.HealthStatus_STARTING {
|
|
c.log.Warn(logs.FrostFSNodeShutdownWhenNotReady)
|
|
}
|
|
|
|
c.ctxCancel()
|
|
close(c.done)
|
|
for i := range c.closers {
|
|
c.closers[len(c.closers)-1-i].fn()
|
|
}
|
|
}
|