forked from TrueCloudLab/frostfs-node
Evgenii Stratonikov
6a51086030
The notary is always enabled and this option does always work. Signed-off-by: Evgenii Stratonikov <e.stratonikov@yadro.com>
1494 lines
44 KiB
Go
1494 lines
44 KiB
Go
package main
|
|
|
|
import (
|
|
"context"
|
|
"errors"
|
|
"fmt"
|
|
"io/fs"
|
|
"net"
|
|
"os"
|
|
"os/signal"
|
|
"path/filepath"
|
|
"strings"
|
|
"sync"
|
|
"sync/atomic"
|
|
"syscall"
|
|
"time"
|
|
|
|
"git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config"
|
|
apiclientconfig "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/apiclient"
|
|
"git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/audit"
|
|
contractsconfig "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/contracts"
|
|
engineconfig "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/engine"
|
|
shardconfig "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/engine/shard"
|
|
blobovniczaconfig "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/engine/shard/blobstor/blobovnicza"
|
|
fstreeconfig "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/engine/shard/blobstor/fstree"
|
|
loggerconfig "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/logger"
|
|
morphconfig "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/morph"
|
|
"git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/multinet"
|
|
nodeconfig "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/node"
|
|
objectconfig "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/object"
|
|
replicatorconfig "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/replicator"
|
|
tracingconfig "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/tracing"
|
|
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
|
|
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/metrics"
|
|
internalNet "git.frostfs.info/TrueCloudLab/frostfs-node/internal/net"
|
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/ape/chainbase"
|
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/container"
|
|
frostfsidcore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/frostfsid"
|
|
netmapCore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/netmap"
|
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor"
|
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/blobovniczatree"
|
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/fstree"
|
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/engine"
|
|
meta "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/metabase"
|
|
lsmetrics "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/metrics"
|
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/pilorama"
|
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard"
|
|
shardmode "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard/mode"
|
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/writecache"
|
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/client"
|
|
containerClient "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/client/container"
|
|
nmClient "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/client/netmap"
|
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/event"
|
|
netmap2 "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/event/netmap"
|
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/network"
|
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/network/cache"
|
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/control"
|
|
objectService "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object"
|
|
getsvc "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/get"
|
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object_manager/placement"
|
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object_manager/tombstone"
|
|
tsourse "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object_manager/tombstone/source"
|
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/replicator"
|
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/tree"
|
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/util/response"
|
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util"
|
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger"
|
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/sdnotify"
|
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/state"
|
|
"git.frostfs.info/TrueCloudLab/frostfs-observability/logging/lokicore"
|
|
"git.frostfs.info/TrueCloudLab/frostfs-observability/tracing"
|
|
netmapV2 "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/api/netmap"
|
|
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap"
|
|
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
|
|
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/user"
|
|
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/version"
|
|
policy_engine "git.frostfs.info/TrueCloudLab/policy-engine/pkg/engine"
|
|
policy_client "git.frostfs.info/TrueCloudLab/policy-engine/pkg/morph/policy"
|
|
"github.com/nspcc-dev/neo-go/pkg/crypto/keys"
|
|
neogoutil "github.com/nspcc-dev/neo-go/pkg/util"
|
|
"github.com/panjf2000/ants/v2"
|
|
"go.etcd.io/bbolt"
|
|
"go.uber.org/zap"
|
|
"go.uber.org/zap/zapcore"
|
|
"google.golang.org/grpc"
|
|
)
|
|
|
|
const (
	// addressSize is the size of a single object address:
	// 32 bytes object ID, 32 bytes container ID, 8 bytes protobuf encoding.
	addressSize = 72

	// maxMsgSize is the transport message limit (4 MiB).
	maxMsgSize = 4 << 20

	// notificationHandlerPoolSize is the capacity of the pools of the morph
	// notification handlers for each contract listener.
	notificationHandlerPoolSize = 10
)
|
|
|
|
// applicationConfiguration reads and stores component-specific configuration
|
|
// values. It should not store any application helpers structs (pointers to shared
|
|
// structs).
|
|
// It must not be used concurrently.
|
|
type applicationConfiguration struct {
|
|
// _read indicated whether a config
|
|
// has already been read
|
|
_read bool
|
|
|
|
LoggerCfg struct {
|
|
level string
|
|
destination string
|
|
timestamp bool
|
|
}
|
|
|
|
ObjectCfg struct {
|
|
tombstoneLifetime uint64
|
|
priorityMetrics []placement.Metric
|
|
}
|
|
|
|
EngineCfg struct {
|
|
errorThreshold uint32
|
|
shardPoolSize uint32
|
|
shards []shardCfg
|
|
lowMem bool
|
|
}
|
|
|
|
// if need to run node in compatibility with other versions mode
|
|
cmode *atomic.Bool
|
|
}
|
|
|
|
type shardCfg struct {
|
|
compress bool
|
|
estimateCompressibility bool
|
|
estimateCompressibilityThreshold float64
|
|
|
|
smallSizeObjectLimit uint64
|
|
uncompressableContentType []string
|
|
refillMetabase bool
|
|
refillMetabaseWorkersCount int
|
|
mode shardmode.Mode
|
|
|
|
metaCfg struct {
|
|
path string
|
|
perm fs.FileMode
|
|
maxBatchSize int
|
|
maxBatchDelay time.Duration
|
|
}
|
|
|
|
subStorages []subStorageCfg
|
|
|
|
gcCfg struct {
|
|
removerBatchSize int
|
|
removerSleepInterval time.Duration
|
|
expiredCollectorBatchSize int
|
|
expiredCollectorWorkerCount int
|
|
}
|
|
|
|
writecacheCfg struct {
|
|
enabled bool
|
|
path string
|
|
maxObjSize uint64
|
|
flushWorkerCount int
|
|
sizeLimit uint64
|
|
countLimit uint64
|
|
noSync bool
|
|
flushSizeLimit uint64
|
|
}
|
|
|
|
piloramaCfg struct {
|
|
enabled bool
|
|
path string
|
|
perm fs.FileMode
|
|
noSync bool
|
|
maxBatchSize int
|
|
maxBatchDelay time.Duration
|
|
}
|
|
}
|
|
|
|
// id returns persistent id of a shard. It is different from the ID used in runtime
|
|
// and is primarily used to identify shards in the configuration.
|
|
func (c *shardCfg) id() string {
|
|
// This calculation should be kept in sync with
|
|
// pkg/local_object_storage/engine/control.go file.
|
|
var sb strings.Builder
|
|
for i := range c.subStorages {
|
|
sb.WriteString(filepath.Clean(c.subStorages[i].path))
|
|
}
|
|
return sb.String()
|
|
}
|
|
|
|
// subStorageCfg stores the raw configuration values of one blobstor
// sub-storage. Which fields are filled depends on the storage type.
type subStorageCfg struct {
	// Fields common for all storages.
	typ    string
	path   string
	perm   fs.FileMode
	depth  uint64
	noSync bool

	// Blobovnicza-specific fields.
	size                   uint64
	width                  uint64
	openedCacheSize        int
	initWorkerCount        int
	rebuildDropTimeout     time.Duration
	openedCacheTTL         time.Duration
	openedCacheExpInterval time.Duration
}
|
|
|
|
// readConfig fills applicationConfiguration with raw configuration values
|
|
// not modifying them.
|
|
func (a *applicationConfiguration) readConfig(c *config.Config) error {
|
|
if a._read {
|
|
err := c.Reload()
|
|
if err != nil {
|
|
return fmt.Errorf("could not reload configuration: %w", err)
|
|
}
|
|
|
|
err = validateConfig(c)
|
|
if err != nil {
|
|
return fmt.Errorf("configuration's validation: %w", err)
|
|
}
|
|
|
|
// clear if it is rereading
|
|
cmode := a.cmode
|
|
*a = applicationConfiguration{}
|
|
a.cmode = cmode
|
|
}
|
|
|
|
a._read = true
|
|
a.cmode.Store(nodeconfig.CompatibilityMode(c))
|
|
|
|
// Logger
|
|
|
|
a.LoggerCfg.level = loggerconfig.Level(c)
|
|
a.LoggerCfg.destination = loggerconfig.Destination(c)
|
|
a.LoggerCfg.timestamp = loggerconfig.Timestamp(c)
|
|
|
|
// Object
|
|
|
|
a.ObjectCfg.tombstoneLifetime = objectconfig.TombstoneLifetime(c)
|
|
var pm []placement.Metric
|
|
for _, raw := range objectconfig.Get(c).Priority() {
|
|
m, err := placement.ParseMetric(raw)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
pm = append(pm, m)
|
|
}
|
|
a.ObjectCfg.priorityMetrics = pm
|
|
|
|
// Storage Engine
|
|
|
|
a.EngineCfg.errorThreshold = engineconfig.ShardErrorThreshold(c)
|
|
a.EngineCfg.shardPoolSize = engineconfig.ShardPoolSize(c)
|
|
a.EngineCfg.lowMem = engineconfig.EngineLowMemoryConsumption(c)
|
|
|
|
return engineconfig.IterateShards(c, false, func(sc *shardconfig.Config) error { return a.updateShardConfig(c, sc) })
|
|
}
|
|
|
|
func (a *applicationConfiguration) updateShardConfig(c *config.Config, oldConfig *shardconfig.Config) error {
|
|
var newConfig shardCfg
|
|
|
|
newConfig.refillMetabase = oldConfig.RefillMetabase()
|
|
newConfig.refillMetabaseWorkersCount = oldConfig.RefillMetabaseWorkersCount()
|
|
newConfig.mode = oldConfig.Mode()
|
|
newConfig.compress = oldConfig.Compress()
|
|
newConfig.estimateCompressibility = oldConfig.EstimateCompressibility()
|
|
newConfig.estimateCompressibilityThreshold = oldConfig.EstimateCompressibilityThreshold()
|
|
newConfig.uncompressableContentType = oldConfig.UncompressableContentTypes()
|
|
newConfig.smallSizeObjectLimit = oldConfig.SmallSizeLimit()
|
|
|
|
a.setShardWriteCacheConfig(&newConfig, oldConfig)
|
|
|
|
a.setShardPiloramaConfig(c, &newConfig, oldConfig)
|
|
|
|
if err := a.setShardStorageConfig(&newConfig, oldConfig); err != nil {
|
|
return err
|
|
}
|
|
|
|
a.setMetabaseConfig(&newConfig, oldConfig)
|
|
|
|
a.setGCConfig(&newConfig, oldConfig)
|
|
|
|
a.EngineCfg.shards = append(a.EngineCfg.shards, newConfig)
|
|
|
|
return nil
|
|
}
|
|
|
|
func (a *applicationConfiguration) setShardWriteCacheConfig(newConfig *shardCfg, oldConfig *shardconfig.Config) {
|
|
writeCacheCfg := oldConfig.WriteCache()
|
|
if writeCacheCfg.Enabled() {
|
|
wc := &newConfig.writecacheCfg
|
|
|
|
wc.enabled = true
|
|
wc.path = writeCacheCfg.Path()
|
|
wc.maxObjSize = writeCacheCfg.MaxObjectSize()
|
|
wc.flushWorkerCount = writeCacheCfg.WorkerCount()
|
|
wc.sizeLimit = writeCacheCfg.SizeLimit()
|
|
wc.countLimit = writeCacheCfg.CountLimit()
|
|
wc.noSync = writeCacheCfg.NoSync()
|
|
wc.flushSizeLimit = writeCacheCfg.MaxFlushingObjectsSize()
|
|
}
|
|
}
|
|
|
|
func (a *applicationConfiguration) setShardPiloramaConfig(c *config.Config, newConfig *shardCfg, oldConfig *shardconfig.Config) {
|
|
if config.BoolSafe(c.Sub("tree"), "enabled") {
|
|
piloramaCfg := oldConfig.Pilorama()
|
|
pr := &newConfig.piloramaCfg
|
|
|
|
pr.enabled = true
|
|
pr.path = piloramaCfg.Path()
|
|
pr.perm = piloramaCfg.Perm()
|
|
pr.noSync = piloramaCfg.NoSync()
|
|
pr.maxBatchSize = piloramaCfg.MaxBatchSize()
|
|
pr.maxBatchDelay = piloramaCfg.MaxBatchDelay()
|
|
}
|
|
}
|
|
|
|
func (a *applicationConfiguration) setShardStorageConfig(newConfig *shardCfg, oldConfig *shardconfig.Config) error {
|
|
blobStorCfg := oldConfig.BlobStor()
|
|
storagesCfg := blobStorCfg.Storages()
|
|
|
|
ss := make([]subStorageCfg, 0, len(storagesCfg))
|
|
for i := range storagesCfg {
|
|
var sCfg subStorageCfg
|
|
|
|
sCfg.typ = storagesCfg[i].Type()
|
|
sCfg.path = storagesCfg[i].Path()
|
|
sCfg.perm = storagesCfg[i].Perm()
|
|
|
|
switch storagesCfg[i].Type() {
|
|
case blobovniczatree.Type:
|
|
sub := blobovniczaconfig.From((*config.Config)(storagesCfg[i]))
|
|
|
|
sCfg.size = sub.Size()
|
|
sCfg.depth = sub.ShallowDepth()
|
|
sCfg.width = sub.ShallowWidth()
|
|
sCfg.openedCacheSize = sub.OpenedCacheSize()
|
|
sCfg.openedCacheTTL = sub.OpenedCacheTTL()
|
|
sCfg.openedCacheExpInterval = sub.OpenedCacheExpInterval()
|
|
sCfg.initWorkerCount = sub.InitWorkerCount()
|
|
sCfg.rebuildDropTimeout = sub.RebuildDropTimeout()
|
|
case fstree.Type:
|
|
sub := fstreeconfig.From((*config.Config)(storagesCfg[i]))
|
|
sCfg.depth = sub.Depth()
|
|
sCfg.noSync = sub.NoSync()
|
|
default:
|
|
return fmt.Errorf("invalid storage type: %s", storagesCfg[i].Type())
|
|
}
|
|
|
|
ss = append(ss, sCfg)
|
|
}
|
|
|
|
newConfig.subStorages = ss
|
|
return nil
|
|
}
|
|
|
|
func (a *applicationConfiguration) setMetabaseConfig(newConfig *shardCfg, oldConfig *shardconfig.Config) {
|
|
metabaseCfg := oldConfig.Metabase()
|
|
m := &newConfig.metaCfg
|
|
|
|
m.path = metabaseCfg.Path()
|
|
m.perm = metabaseCfg.BoltDB().Perm()
|
|
m.maxBatchDelay = metabaseCfg.BoltDB().MaxBatchDelay()
|
|
m.maxBatchSize = metabaseCfg.BoltDB().MaxBatchSize()
|
|
}
|
|
|
|
func (a *applicationConfiguration) setGCConfig(newConfig *shardCfg, oldConfig *shardconfig.Config) {
|
|
gcCfg := oldConfig.GC()
|
|
newConfig.gcCfg.removerBatchSize = gcCfg.RemoverBatchSize()
|
|
newConfig.gcCfg.removerSleepInterval = gcCfg.RemoverSleepInterval()
|
|
newConfig.gcCfg.expiredCollectorBatchSize = gcCfg.ExpiredCollectorBatchSize()
|
|
newConfig.gcCfg.expiredCollectorWorkerCount = gcCfg.ExpiredCollectorWorkerCount()
|
|
}
|
|
|
|
// internals contains application-specific internals that are created
|
|
// on application startup and are shared b/w the components during
|
|
// the application life cycle.
|
|
// It should not contain any read configuration values, component-specific
|
|
// helpers and fields.
|
|
type internals struct {
|
|
done chan struct{}
|
|
ctxCancel func()
|
|
internalErr chan error // channel for internal application errors at runtime
|
|
|
|
appCfg *config.Config
|
|
|
|
log *logger.Logger
|
|
|
|
wg sync.WaitGroup
|
|
workers []worker
|
|
closers []closer
|
|
|
|
apiVersion version.Version
|
|
healthStatus *atomic.Int32
|
|
// is node under maintenance
|
|
isMaintenance atomic.Bool
|
|
audit *atomic.Bool
|
|
|
|
sdNotify bool
|
|
}
|
|
|
|
// starts node's maintenance.
|
|
func (c *cfg) startMaintenance(ctx context.Context) {
|
|
c.isMaintenance.Store(true)
|
|
c.cfgNetmap.state.setControlNetmapStatus(control.NetmapStatus_MAINTENANCE)
|
|
c.log.Info(ctx, logs.FrostFSNodeStartedLocalNodesMaintenance)
|
|
}
|
|
|
|
// stops node's maintenance.
|
|
func (c *internals) stopMaintenance(ctx context.Context) {
|
|
if c.isMaintenance.CompareAndSwap(true, false) {
|
|
c.log.Info(ctx, logs.FrostFSNodeStoppedLocalNodesMaintenance)
|
|
}
|
|
}
|
|
|
|
// IsMaintenance checks if storage node is under maintenance.
|
|
//
|
|
// Provides util.NodeState to Object service.
|
|
func (c *internals) IsMaintenance() bool {
|
|
return c.isMaintenance.Load()
|
|
}
|
|
|
|
// shared contains component-specific structs/helpers that should
|
|
// be shared during initialization of the application.
|
|
type shared struct {
|
|
privateTokenStore sessionStorage
|
|
persistate *state.PersistentStorage
|
|
|
|
clientCache *cache.ClientCache
|
|
bgClientCache *cache.ClientCache
|
|
putClientCache *cache.ClientCache
|
|
localAddr network.AddressGroup
|
|
|
|
key *keys.PrivateKey
|
|
binPublicKey []byte
|
|
ownerIDFromKey user.ID // user ID calculated from key
|
|
|
|
// current network map
|
|
netMap atomic.Value // type netmap.NetMap
|
|
netMapSource netmapCore.Source
|
|
|
|
cnrClient *containerClient.Client
|
|
|
|
frostfsidClient frostfsidcore.SubjectProvider
|
|
|
|
respSvc *response.Service
|
|
|
|
replicator *replicator.Replicator
|
|
|
|
treeService *tree.Service
|
|
|
|
metricsCollector *metrics.NodeMetrics
|
|
|
|
metricsSvc *objectService.MetricCollector
|
|
|
|
dialerSource *internalNet.DialerSource
|
|
}
|
|
|
|
// dynamicConfiguration stores parameters of the
|
|
// components that supports runtime reconfigurations.
|
|
type dynamicConfiguration struct {
|
|
logger *logger.Prm
|
|
pprof *httpComponent
|
|
metrics *httpComponent
|
|
}
|
|
|
|
// appConfigGuard wraps a readers-writer mutex that is exposed through
// lock methods returning the matching unlock function.
type appConfigGuard struct {
	mtx sync.RWMutex
}

// LockAppConfigShared acquires the guard for reading and returns the function
// that releases it. The returned function must be called exactly once.
func (g *appConfigGuard) LockAppConfigShared() func() {
	g.mtx.RLock()
	return g.mtx.RUnlock
}

// LockAppConfigExclusive acquires the guard for writing and returns the
// function that releases it. The returned function must be called exactly once.
func (g *appConfigGuard) LockAppConfigExclusive() func() {
	g.mtx.Lock()
	return g.mtx.Unlock
}
|
|
|
|
type cfg struct {
|
|
applicationConfiguration
|
|
internals
|
|
shared
|
|
dynamicConfiguration
|
|
appConfigGuard
|
|
|
|
// configuration of the internal
|
|
// services
|
|
cfgGRPC cfgGRPC
|
|
cfgMorph cfgMorph
|
|
cfgAccounting cfgAccounting
|
|
cfgContainer cfgContainer
|
|
cfgFrostfsID cfgFrostfsID
|
|
cfgNodeInfo cfgNodeInfo
|
|
cfgNetmap cfgNetmap
|
|
cfgControlService cfgControlService
|
|
cfgObject cfgObject
|
|
}
|
|
|
|
// ReadCurrentNetMap reads network map which has been cached at the
|
|
// latest epoch. Returns an error if value has not been cached yet.
|
|
//
|
|
// Provides interface for NetmapService server.
|
|
func (c *cfg) ReadCurrentNetMap(msg *netmapV2.NetMap) error {
|
|
val := c.netMap.Load()
|
|
if val == nil {
|
|
return errors.New("missing local network map")
|
|
}
|
|
|
|
val.(netmap.NetMap).WriteToV2(msg)
|
|
|
|
return nil
|
|
}
|
|
|
|
type grpcServer struct {
|
|
Listener net.Listener
|
|
Server *grpc.Server
|
|
Endpoint string
|
|
}
|
|
|
|
type cfgGRPC struct {
|
|
// guard protects connections and handlers
|
|
guard sync.RWMutex
|
|
// servers must be protected with guard
|
|
servers []grpcServer
|
|
// handlers must be protected with guard
|
|
handlers []func(e string, l net.Listener, s *grpc.Server)
|
|
|
|
maxChunkSize uint64
|
|
maxAddrAmount uint64
|
|
reconnectTimeout time.Duration
|
|
}
|
|
|
|
func (c *cfgGRPC) append(e string, l net.Listener, s *grpc.Server) {
|
|
c.guard.Lock()
|
|
defer c.guard.Unlock()
|
|
|
|
c.servers = append(c.servers, grpcServer{
|
|
Listener: l,
|
|
Server: s,
|
|
Endpoint: e,
|
|
})
|
|
}
|
|
|
|
func (c *cfgGRPC) appendAndHandle(e string, l net.Listener, s *grpc.Server) {
|
|
c.guard.Lock()
|
|
defer c.guard.Unlock()
|
|
|
|
c.servers = append(c.servers, grpcServer{
|
|
Listener: l,
|
|
Server: s,
|
|
Endpoint: e,
|
|
})
|
|
|
|
for _, h := range c.handlers {
|
|
h(e, l, s)
|
|
}
|
|
}
|
|
|
|
func (c *cfgGRPC) performAndSave(handler func(e string, l net.Listener, s *grpc.Server)) {
|
|
c.guard.Lock()
|
|
defer c.guard.Unlock()
|
|
|
|
for _, conn := range c.servers {
|
|
handler(conn.Endpoint, conn.Listener, conn.Server)
|
|
}
|
|
|
|
c.handlers = append(c.handlers, handler)
|
|
}
|
|
|
|
func (c *cfgGRPC) dropConnection(endpoint string) {
|
|
c.guard.Lock()
|
|
defer c.guard.Unlock()
|
|
|
|
pos := -1
|
|
for idx, srv := range c.servers {
|
|
if srv.Endpoint == endpoint {
|
|
pos = idx
|
|
break
|
|
}
|
|
}
|
|
if pos < 0 {
|
|
return
|
|
}
|
|
|
|
c.servers[pos].Server.Stop() // closes listener
|
|
c.servers = append(c.servers[0:pos], c.servers[pos+1:]...)
|
|
}
|
|
|
|
type cfgMorph struct {
|
|
initialized bool
|
|
guard sync.Mutex
|
|
|
|
client *client.Client
|
|
|
|
// TTL of Sidechain cached values. Non-positive value disables caching.
|
|
cacheTTL time.Duration
|
|
|
|
containerCacheSize uint32
|
|
|
|
proxyScriptHash neogoutil.Uint160
|
|
}
|
|
|
|
type cfgAccounting struct {
|
|
scriptHash neogoutil.Uint160
|
|
}
|
|
|
|
type cfgContainer struct {
|
|
scriptHash neogoutil.Uint160
|
|
|
|
parsers map[event.Type]event.NotificationParser
|
|
subscribers map[event.Type][]event.Handler
|
|
workerPool util.WorkerPool // pool for asynchronous handlers
|
|
}
|
|
|
|
type cfgFrostfsID struct {
|
|
scriptHash neogoutil.Uint160
|
|
}
|
|
|
|
type cfgNetmap struct {
|
|
scriptHash neogoutil.Uint160
|
|
wrapper *nmClient.Client
|
|
|
|
parsers map[event.Type]event.NotificationParser
|
|
|
|
subscribers map[event.Type][]event.Handler
|
|
workerPool util.WorkerPool // pool for asynchronous handlers
|
|
|
|
state *networkState
|
|
|
|
needBootstrap bool
|
|
reBoostrapTurnedOff *atomic.Bool // managed by control service in runtime
|
|
}
|
|
|
|
type cfgNodeInfo struct {
|
|
// values from config
|
|
localInfo netmap.NodeInfo
|
|
}
|
|
|
|
type cfgObject struct {
|
|
getSvc *getsvc.Service
|
|
|
|
cnrSource container.Source
|
|
|
|
cfgAccessPolicyEngine cfgAccessPolicyEngine
|
|
|
|
pool cfgObjectRoutines
|
|
|
|
cfgLocalStorage cfgLocalStorage
|
|
|
|
tombstoneLifetime *atomic.Uint64
|
|
|
|
skipSessionTokenIssuerVerification bool
|
|
}
|
|
|
|
type cfgLocalStorage struct {
|
|
localStorage *engine.StorageEngine
|
|
}
|
|
|
|
type cfgAccessPolicyEngine struct {
|
|
policyContractHash neogoutil.Uint160
|
|
|
|
accessPolicyEngine *accessPolicyEngine
|
|
}
|
|
|
|
type cfgObjectRoutines struct {
|
|
putRemote *ants.Pool
|
|
|
|
putLocal *ants.Pool
|
|
|
|
replication *ants.Pool
|
|
}
|
|
|
|
type cfgControlService struct {
|
|
server *grpc.Server
|
|
}
|
|
|
|
// persistateSideChainLastBlockKey is a byte-slice key.
// NOTE(review): presumably the key under which the last processed side chain
// block is kept in the persistent state storage; confirm against its users.
var persistateSideChainLastBlockKey = []byte("side_chain_last_processed_block")
|
|
|
|
func initCfg(appCfg *config.Config) *cfg {
|
|
c := &cfg{
|
|
applicationConfiguration: applicationConfiguration{
|
|
cmode: &atomic.Bool{},
|
|
},
|
|
}
|
|
|
|
err := c.readConfig(appCfg)
|
|
if err != nil {
|
|
panic(fmt.Errorf("config reading: %w", err))
|
|
}
|
|
|
|
key := nodeconfig.Key(appCfg)
|
|
|
|
relayOnly := nodeconfig.Relay(appCfg)
|
|
|
|
netState := newNetworkState()
|
|
|
|
c.shared = initShared(appCfg, key, netState, relayOnly)
|
|
|
|
netState.metrics = c.metricsCollector
|
|
|
|
logPrm, err := c.loggerPrm()
|
|
fatalOnErr(err)
|
|
logPrm.SamplingHook = c.metricsCollector.LogMetrics().GetSamplingHook()
|
|
log, err := logger.NewLogger(logPrm)
|
|
fatalOnErr(err)
|
|
if loggerconfig.ToLokiConfig(appCfg).Enabled {
|
|
log.WithOptions(zap.WrapCore(func(core zapcore.Core) zapcore.Core {
|
|
lokiCore := lokicore.New(core, loggerconfig.ToLokiConfig(appCfg))
|
|
return lokiCore
|
|
}))
|
|
}
|
|
|
|
c.internals = initInternals(appCfg, log)
|
|
|
|
c.cfgAccounting = cfgAccounting{
|
|
scriptHash: contractsconfig.Balance(appCfg),
|
|
}
|
|
c.cfgContainer = initContainer(appCfg)
|
|
|
|
c.cfgFrostfsID = initFrostfsID(appCfg)
|
|
|
|
c.cfgNetmap = initNetmap(appCfg, netState, relayOnly)
|
|
|
|
c.cfgGRPC = initCfgGRPC()
|
|
|
|
c.cfgMorph = cfgMorph{
|
|
proxyScriptHash: contractsconfig.Proxy(appCfg),
|
|
}
|
|
c.cfgObject = initCfgObject(appCfg)
|
|
|
|
user.IDFromKey(&c.ownerIDFromKey, key.PrivateKey.PublicKey)
|
|
|
|
c.onShutdown(c.clientCache.CloseAll) // clean up connections
|
|
c.onShutdown(c.bgClientCache.CloseAll) // clean up connections
|
|
c.onShutdown(c.putClientCache.CloseAll) // clean up connections
|
|
c.onShutdown(func() { _ = c.persistate.Close() })
|
|
|
|
return c
|
|
}
|
|
|
|
func initInternals(appCfg *config.Config, log *logger.Logger) internals {
|
|
var healthStatus atomic.Int32
|
|
healthStatus.Store(int32(control.HealthStatus_HEALTH_STATUS_UNDEFINED))
|
|
|
|
var auditRequests atomic.Bool
|
|
auditRequests.Store(audit.Enabled(appCfg))
|
|
|
|
return internals{
|
|
done: make(chan struct{}),
|
|
appCfg: appCfg,
|
|
internalErr: make(chan error),
|
|
log: log,
|
|
apiVersion: version.Current(),
|
|
healthStatus: &healthStatus,
|
|
sdNotify: initSdNotify(appCfg),
|
|
audit: &auditRequests,
|
|
}
|
|
}
|
|
|
|
func initSdNotify(appCfg *config.Config) bool {
|
|
if config.BoolSafe(appCfg.Sub("systemdnotify"), "enabled") {
|
|
fatalOnErr(sdnotify.InitSocket())
|
|
return true
|
|
}
|
|
return false
|
|
}
|
|
|
|
func initShared(appCfg *config.Config, key *keys.PrivateKey, netState *networkState, relayOnly bool) shared {
|
|
var netAddr network.AddressGroup
|
|
|
|
if !relayOnly {
|
|
netAddr = nodeconfig.BootstrapAddresses(appCfg)
|
|
}
|
|
|
|
persistate, err := state.NewPersistentStorage(nodeconfig.PersistentState(appCfg).Path())
|
|
fatalOnErr(err)
|
|
|
|
nodeMetrics := metrics.NewNodeMetrics()
|
|
|
|
ds, err := internalNet.NewDialerSource(internalNetConfig(appCfg, nodeMetrics.MultinetMetrics()))
|
|
fatalOnErr(err)
|
|
|
|
cacheOpts := cache.ClientCacheOpts{
|
|
DialTimeout: apiclientconfig.DialTimeout(appCfg),
|
|
StreamTimeout: apiclientconfig.StreamTimeout(appCfg),
|
|
Key: &key.PrivateKey,
|
|
AllowExternal: apiclientconfig.AllowExternal(appCfg),
|
|
ReconnectTimeout: apiclientconfig.ReconnectTimeout(appCfg),
|
|
DialerSource: ds,
|
|
}
|
|
|
|
return shared{
|
|
key: key,
|
|
binPublicKey: key.PublicKey().Bytes(),
|
|
localAddr: netAddr,
|
|
respSvc: response.NewService(netState),
|
|
clientCache: cache.NewSDKClientCache(cacheOpts),
|
|
bgClientCache: cache.NewSDKClientCache(cacheOpts),
|
|
putClientCache: cache.NewSDKClientCache(cacheOpts),
|
|
persistate: persistate,
|
|
metricsCollector: nodeMetrics,
|
|
dialerSource: ds,
|
|
}
|
|
}
|
|
|
|
func internalNetConfig(appCfg *config.Config, m metrics.MultinetMetrics) internalNet.Config {
|
|
result := internalNet.Config{
|
|
Enabled: multinet.Enabled(appCfg),
|
|
Balancer: multinet.Balancer(appCfg),
|
|
Restrict: multinet.Restrict(appCfg),
|
|
FallbackDelay: multinet.FallbackDelay(appCfg),
|
|
Metrics: m,
|
|
}
|
|
sn := multinet.Subnets(appCfg)
|
|
for _, s := range sn {
|
|
result.Subnets = append(result.Subnets, internalNet.Subnet{
|
|
Prefix: s.Mask,
|
|
SourceIPs: s.SourceIPs,
|
|
})
|
|
}
|
|
return result
|
|
}
|
|
|
|
func initNetmap(appCfg *config.Config, netState *networkState, relayOnly bool) cfgNetmap {
|
|
netmapWorkerPool, err := ants.NewPool(notificationHandlerPoolSize)
|
|
fatalOnErr(err)
|
|
|
|
var reBootstrapTurnedOff atomic.Bool
|
|
reBootstrapTurnedOff.Store(relayOnly)
|
|
return cfgNetmap{
|
|
scriptHash: contractsconfig.Netmap(appCfg),
|
|
state: netState,
|
|
workerPool: netmapWorkerPool,
|
|
needBootstrap: !relayOnly,
|
|
reBoostrapTurnedOff: &reBootstrapTurnedOff,
|
|
}
|
|
}
|
|
|
|
func initContainer(appCfg *config.Config) cfgContainer {
|
|
containerWorkerPool, err := ants.NewPool(notificationHandlerPoolSize)
|
|
fatalOnErr(err)
|
|
|
|
return cfgContainer{
|
|
scriptHash: contractsconfig.Container(appCfg),
|
|
workerPool: containerWorkerPool,
|
|
}
|
|
}
|
|
|
|
func initFrostfsID(appCfg *config.Config) cfgFrostfsID {
|
|
return cfgFrostfsID{
|
|
scriptHash: contractsconfig.FrostfsID(appCfg),
|
|
}
|
|
}
|
|
|
|
func initCfgGRPC() cfgGRPC {
|
|
maxChunkSize := uint64(maxMsgSize) * 3 / 4 // 25% to meta, 75% to payload
|
|
maxAddrAmount := uint64(maxChunkSize) / addressSize // each address is about 72 bytes
|
|
|
|
return cfgGRPC{
|
|
maxChunkSize: maxChunkSize,
|
|
maxAddrAmount: maxAddrAmount,
|
|
}
|
|
}
|
|
|
|
func initCfgObject(appCfg *config.Config) cfgObject {
|
|
var tsLifetime atomic.Uint64
|
|
tsLifetime.Store(objectconfig.TombstoneLifetime(appCfg))
|
|
return cfgObject{
|
|
pool: initObjectPool(appCfg),
|
|
tombstoneLifetime: &tsLifetime,
|
|
skipSessionTokenIssuerVerification: objectconfig.Put(appCfg).SkipSessionTokenIssuerVerification(),
|
|
}
|
|
}
|
|
|
|
func (c *cfg) engineOpts() []engine.Option {
|
|
var opts []engine.Option
|
|
|
|
opts = append(opts,
|
|
engine.WithShardPoolSize(c.EngineCfg.shardPoolSize),
|
|
engine.WithErrorThreshold(c.EngineCfg.errorThreshold),
|
|
engine.WithLogger(c.log),
|
|
engine.WithLowMemoryConsumption(c.EngineCfg.lowMem),
|
|
)
|
|
|
|
if c.metricsCollector != nil {
|
|
opts = append(opts, engine.WithMetrics(c.metricsCollector.Engine()))
|
|
}
|
|
|
|
return opts
|
|
}
|
|
|
|
type shardOptsWithID struct {
|
|
configID string
|
|
shOpts []shard.Option
|
|
}
|
|
|
|
func (c *cfg) shardOpts(ctx context.Context) []shardOptsWithID {
|
|
shards := make([]shardOptsWithID, 0, len(c.EngineCfg.shards))
|
|
|
|
for _, shCfg := range c.EngineCfg.shards {
|
|
shards = append(shards, c.getShardOpts(ctx, shCfg))
|
|
}
|
|
|
|
return shards
|
|
}
|
|
|
|
func (c *cfg) getWriteCacheOpts(shCfg shardCfg) []writecache.Option {
|
|
var writeCacheOpts []writecache.Option
|
|
if wcRead := shCfg.writecacheCfg; wcRead.enabled {
|
|
writeCacheOpts = append(writeCacheOpts,
|
|
writecache.WithPath(wcRead.path),
|
|
writecache.WithFlushSizeLimit(wcRead.flushSizeLimit),
|
|
writecache.WithMaxObjectSize(wcRead.maxObjSize),
|
|
writecache.WithFlushWorkersCount(wcRead.flushWorkerCount),
|
|
writecache.WithMaxCacheSize(wcRead.sizeLimit),
|
|
writecache.WithMaxCacheCount(wcRead.countLimit),
|
|
writecache.WithNoSync(wcRead.noSync),
|
|
writecache.WithLogger(c.log),
|
|
)
|
|
}
|
|
return writeCacheOpts
|
|
}
|
|
|
|
func (c *cfg) getPiloramaOpts(shCfg shardCfg) []pilorama.Option {
|
|
var piloramaOpts []pilorama.Option
|
|
if prRead := shCfg.piloramaCfg; prRead.enabled {
|
|
piloramaOpts = append(piloramaOpts,
|
|
pilorama.WithPath(prRead.path),
|
|
pilorama.WithPerm(prRead.perm),
|
|
pilorama.WithNoSync(prRead.noSync),
|
|
pilorama.WithMaxBatchSize(prRead.maxBatchSize),
|
|
pilorama.WithMaxBatchDelay(prRead.maxBatchDelay),
|
|
)
|
|
if c.metricsCollector != nil {
|
|
piloramaOpts = append(piloramaOpts, pilorama.WithMetrics(lsmetrics.NewPiloramaMetrics(c.metricsCollector.PiloramaMetrics())))
|
|
}
|
|
}
|
|
return piloramaOpts
|
|
}
|
|
|
|
func (c *cfg) getSubstorageOpts(ctx context.Context, shCfg shardCfg) []blobstor.SubStorage {
|
|
var ss []blobstor.SubStorage
|
|
for _, sRead := range shCfg.subStorages {
|
|
switch sRead.typ {
|
|
case blobovniczatree.Type:
|
|
blobTreeOpts := []blobovniczatree.Option{
|
|
blobovniczatree.WithRootPath(sRead.path),
|
|
blobovniczatree.WithPermissions(sRead.perm),
|
|
blobovniczatree.WithBlobovniczaSize(sRead.size),
|
|
blobovniczatree.WithBlobovniczaShallowDepth(sRead.depth),
|
|
blobovniczatree.WithBlobovniczaShallowWidth(sRead.width),
|
|
blobovniczatree.WithOpenedCacheSize(sRead.openedCacheSize),
|
|
blobovniczatree.WithOpenedCacheTTL(sRead.openedCacheTTL),
|
|
blobovniczatree.WithOpenedCacheExpInterval(sRead.openedCacheExpInterval),
|
|
blobovniczatree.WithInitWorkerCount(sRead.initWorkerCount),
|
|
blobovniczatree.WithWaitBeforeDropDB(sRead.rebuildDropTimeout),
|
|
blobovniczatree.WithLogger(c.log),
|
|
blobovniczatree.WithObjectSizeLimit(shCfg.smallSizeObjectLimit),
|
|
}
|
|
|
|
if c.metricsCollector != nil {
|
|
blobTreeOpts = append(blobTreeOpts,
|
|
blobovniczatree.WithMetrics(
|
|
lsmetrics.NewBlobovniczaTreeMetrics(sRead.path, c.metricsCollector.BlobobvnizcaTreeMetrics()),
|
|
),
|
|
)
|
|
}
|
|
ss = append(ss, blobstor.SubStorage{
|
|
Storage: blobovniczatree.NewBlobovniczaTree(ctx, blobTreeOpts...),
|
|
Policy: func(_ *objectSDK.Object, data []byte) bool {
|
|
return uint64(len(data)) < shCfg.smallSizeObjectLimit
|
|
},
|
|
})
|
|
case fstree.Type:
|
|
fstreeOpts := []fstree.Option{
|
|
fstree.WithPath(sRead.path),
|
|
fstree.WithPerm(sRead.perm),
|
|
fstree.WithDepth(sRead.depth),
|
|
fstree.WithNoSync(sRead.noSync),
|
|
fstree.WithLogger(c.log),
|
|
}
|
|
if c.metricsCollector != nil {
|
|
fstreeOpts = append(fstreeOpts,
|
|
fstree.WithMetrics(
|
|
lsmetrics.NewFSTreeMetricsWithoutShardID(sRead.path, c.metricsCollector.FSTree()),
|
|
),
|
|
)
|
|
}
|
|
|
|
ss = append(ss, blobstor.SubStorage{
|
|
Storage: fstree.New(fstreeOpts...),
|
|
Policy: func(_ *objectSDK.Object, _ []byte) bool {
|
|
return true
|
|
},
|
|
})
|
|
default:
|
|
// should never happen, that has already
|
|
// been handled: when the config was read
|
|
}
|
|
}
|
|
return ss
|
|
}
|
|
|
|
func (c *cfg) getShardOpts(ctx context.Context, shCfg shardCfg) shardOptsWithID {
|
|
writeCacheOpts := c.getWriteCacheOpts(shCfg)
|
|
piloramaOpts := c.getPiloramaOpts(shCfg)
|
|
ss := c.getSubstorageOpts(ctx, shCfg)
|
|
|
|
blobstoreOpts := []blobstor.Option{
|
|
blobstor.WithCompressObjects(shCfg.compress),
|
|
blobstor.WithUncompressableContentTypes(shCfg.uncompressableContentType),
|
|
blobstor.WithCompressibilityEstimate(shCfg.estimateCompressibility),
|
|
blobstor.WithCompressibilityEstimateThreshold(shCfg.estimateCompressibilityThreshold),
|
|
blobstor.WithStorages(ss),
|
|
blobstor.WithLogger(c.log),
|
|
}
|
|
if c.metricsCollector != nil {
|
|
blobstoreOpts = append(blobstoreOpts, blobstor.WithMetrics(lsmetrics.NewBlobstoreMetrics(c.metricsCollector.Blobstore())))
|
|
}
|
|
|
|
mbOptions := []meta.Option{
|
|
meta.WithPath(shCfg.metaCfg.path),
|
|
meta.WithPermissions(shCfg.metaCfg.perm),
|
|
meta.WithMaxBatchSize(shCfg.metaCfg.maxBatchSize),
|
|
meta.WithMaxBatchDelay(shCfg.metaCfg.maxBatchDelay),
|
|
meta.WithBoltDBOptions(&bbolt.Options{
|
|
Timeout: 100 * time.Millisecond,
|
|
}),
|
|
meta.WithLogger(c.log),
|
|
meta.WithEpochState(c.cfgNetmap.state),
|
|
}
|
|
if c.metricsCollector != nil {
|
|
mbOptions = append(mbOptions, meta.WithMetrics(lsmetrics.NewMetabaseMetrics(shCfg.metaCfg.path, c.metricsCollector.MetabaseMetrics())))
|
|
}
|
|
|
|
var sh shardOptsWithID
|
|
sh.configID = shCfg.id()
|
|
sh.shOpts = []shard.Option{
|
|
shard.WithLogger(c.log),
|
|
shard.WithRefillMetabase(shCfg.refillMetabase),
|
|
shard.WithRefillMetabaseWorkersCount(shCfg.refillMetabaseWorkersCount),
|
|
shard.WithMode(shCfg.mode),
|
|
shard.WithBlobStorOptions(blobstoreOpts...),
|
|
shard.WithMetaBaseOptions(mbOptions...),
|
|
shard.WithPiloramaOptions(piloramaOpts...),
|
|
shard.WithWriteCache(shCfg.writecacheCfg.enabled),
|
|
shard.WithWriteCacheOptions(writeCacheOpts),
|
|
shard.WithRemoverBatchSize(shCfg.gcCfg.removerBatchSize),
|
|
shard.WithGCRemoverSleepInterval(shCfg.gcCfg.removerSleepInterval),
|
|
shard.WithExpiredCollectorBatchSize(shCfg.gcCfg.expiredCollectorBatchSize),
|
|
shard.WithExpiredCollectorWorkerCount(shCfg.gcCfg.expiredCollectorWorkerCount),
|
|
shard.WithGCWorkerPoolInitializer(func(sz int) util.WorkerPool {
|
|
pool, err := ants.NewPool(sz)
|
|
fatalOnErr(err)
|
|
|
|
return pool
|
|
}),
|
|
}
|
|
return sh
|
|
}
|
|
|
|
func (c *cfg) loggerPrm() (*logger.Prm, error) {
|
|
// check if it has been inited before
|
|
if c.dynamicConfiguration.logger == nil {
|
|
c.dynamicConfiguration.logger = new(logger.Prm)
|
|
}
|
|
|
|
// (re)init read configuration
|
|
err := c.dynamicConfiguration.logger.SetLevelString(c.LoggerCfg.level)
|
|
if err != nil {
|
|
// not expected since validation should be performed before
|
|
panic("incorrect log level format: " + c.LoggerCfg.level)
|
|
}
|
|
err = c.dynamicConfiguration.logger.SetDestination(c.LoggerCfg.destination)
|
|
if err != nil {
|
|
// not expected since validation should be performed before
|
|
panic("incorrect log destination format: " + c.LoggerCfg.destination)
|
|
}
|
|
c.dynamicConfiguration.logger.PrependTimestamp = c.LoggerCfg.timestamp
|
|
|
|
return c.dynamicConfiguration.logger, nil
|
|
}
|
|
|
|
func (c *cfg) LocalAddress() network.AddressGroup {
|
|
return c.localAddr
|
|
}
|
|
|
|
func initLocalStorage(ctx context.Context, c *cfg) {
|
|
ls := engine.New(c.engineOpts()...)
|
|
|
|
addNewEpochAsyncNotificationHandler(c, func(ctx context.Context, ev event.Event) {
|
|
ls.HandleNewEpoch(ctx, ev.(netmap2.NewEpoch).EpochNumber())
|
|
})
|
|
|
|
// allocate memory for the service;
|
|
// service will be created later
|
|
c.cfgObject.getSvc = new(getsvc.Service)
|
|
|
|
var shardsAttached int
|
|
for _, optsWithMeta := range c.shardOpts(ctx) {
|
|
id, err := ls.AddShard(ctx, append(optsWithMeta.shOpts,
|
|
shard.WithTombstoneSource(c.createTombstoneSource()),
|
|
shard.WithContainerInfoProvider(c.createContainerInfoProvider(ctx)))...)
|
|
if err != nil {
|
|
c.log.Error(ctx, logs.FrostFSNodeFailedToAttachShardToEngine, zap.Error(err))
|
|
} else {
|
|
shardsAttached++
|
|
c.log.Info(ctx, logs.FrostFSNodeShardAttachedToEngine, zap.Stringer("id", id))
|
|
}
|
|
}
|
|
if shardsAttached == 0 {
|
|
fatalOnErr(engineconfig.ErrNoShardConfigured)
|
|
}
|
|
|
|
c.cfgObject.cfgLocalStorage.localStorage = ls
|
|
|
|
c.onShutdown(func() {
|
|
c.log.Info(ctx, logs.FrostFSNodeClosingComponentsOfTheStorageEngine)
|
|
|
|
err := ls.Close(context.WithoutCancel(ctx))
|
|
if err != nil {
|
|
c.log.Info(ctx, logs.FrostFSNodeStorageEngineClosingFailure,
|
|
zap.String("error", err.Error()),
|
|
)
|
|
} else {
|
|
c.log.Info(ctx, logs.FrostFSNodeAllComponentsOfTheStorageEngineClosedSuccessfully)
|
|
}
|
|
})
|
|
}
|
|
|
|
func initAccessPolicyEngine(ctx context.Context, c *cfg) {
|
|
var localOverrideDB chainbase.LocalOverrideDatabase
|
|
if nodeconfig.PersistentPolicyRules(c.appCfg).Path() == "" {
|
|
c.log.Warn(ctx, logs.FrostFSNodePersistentRuleStorageDBPathIsNotSetInmemoryWillBeUsed)
|
|
localOverrideDB = chainbase.NewInmemoryLocalOverrideDatabase()
|
|
} else {
|
|
localOverrideDB = chainbase.NewBoltLocalOverrideDatabase(
|
|
chainbase.WithPath(nodeconfig.PersistentPolicyRules(c.appCfg).Path()),
|
|
chainbase.WithPerm(nodeconfig.PersistentPolicyRules(c.appCfg).Perm()),
|
|
chainbase.WithNoSync(nodeconfig.PersistentPolicyRules(c.appCfg).NoSync()),
|
|
)
|
|
}
|
|
|
|
var morphRuleStorage policy_engine.MorphRuleChainStorageReader
|
|
morphRuleStorage = policy_client.NewContractStorage(
|
|
client.NewSwitchRPCGuardedActor(c.cfgMorph.client),
|
|
c.cfgObject.cfgAccessPolicyEngine.policyContractHash)
|
|
|
|
cacheSize := morphconfig.APEChainCacheSize(c.appCfg)
|
|
if cacheSize > 0 {
|
|
morphRuleStorage = newMorphCache(morphRuleStorage, int(cacheSize), c.cfgMorph.cacheTTL)
|
|
}
|
|
|
|
ape := newAccessPolicyEngine(morphRuleStorage, localOverrideDB)
|
|
c.cfgObject.cfgAccessPolicyEngine.accessPolicyEngine = ape
|
|
|
|
c.onShutdown(func() {
|
|
if err := ape.LocalOverrideDatabaseCore().Close(); err != nil {
|
|
c.log.Warn(ctx, logs.FrostFSNodeAccessPolicyEngineClosingFailure,
|
|
zap.Error(err),
|
|
)
|
|
}
|
|
})
|
|
}
|
|
|
|
func initObjectPool(cfg *config.Config) (pool cfgObjectRoutines) {
|
|
var err error
|
|
|
|
optNonBlocking := ants.WithNonblocking(true)
|
|
|
|
putRemoteCapacity := objectconfig.Put(cfg).PoolSizeRemote()
|
|
pool.putRemote, err = ants.NewPool(putRemoteCapacity, optNonBlocking)
|
|
fatalOnErr(err)
|
|
|
|
putLocalCapacity := objectconfig.Put(cfg).PoolSizeLocal()
|
|
pool.putLocal, err = ants.NewPool(putLocalCapacity, optNonBlocking)
|
|
fatalOnErr(err)
|
|
|
|
replicatorPoolSize := replicatorconfig.PoolSize(cfg)
|
|
if replicatorPoolSize <= 0 {
|
|
replicatorPoolSize = putRemoteCapacity
|
|
}
|
|
|
|
pool.replication, err = ants.NewPool(replicatorPoolSize)
|
|
fatalOnErr(err)
|
|
|
|
return pool
|
|
}
|
|
|
|
func (c *cfg) LocalNodeInfo() *netmap.NodeInfo {
|
|
var res netmap.NodeInfo
|
|
ni, ok := c.cfgNetmap.state.getNodeInfo()
|
|
if ok {
|
|
res = ni
|
|
} else {
|
|
res = c.cfgNodeInfo.localInfo
|
|
}
|
|
return &res
|
|
}
|
|
|
|
// setContractNodeInfo rewrites local node info from the FrostFS network map.
|
|
// Called with nil when storage node is outside the FrostFS network map
|
|
// (before entering the network and after leaving it).
|
|
func (c *cfg) setContractNodeInfo(ni *netmap.NodeInfo) {
|
|
c.cfgNetmap.state.setNodeInfo(ni)
|
|
}
|
|
|
|
func (c *cfg) updateContractNodeInfo(ctx context.Context, epoch uint64) {
|
|
ni, err := c.netmapLocalNodeState(epoch)
|
|
if err != nil {
|
|
c.log.Error(ctx, logs.FrostFSNodeCouldNotUpdateNodeStateOnNewEpoch,
|
|
zap.Uint64("epoch", epoch),
|
|
zap.String("error", err.Error()))
|
|
return
|
|
}
|
|
|
|
c.setContractNodeInfo(ni)
|
|
}
|
|
|
|
// bootstrapWithState calls "addPeer" method of the Sidechain Netmap contract
|
|
// with the binary-encoded information from the current node's configuration.
|
|
// The state is set using the provided setter which MUST NOT be nil.
|
|
func (c *cfg) bootstrapWithState(ctx context.Context, stateSetter func(*netmap.NodeInfo)) error {
|
|
ni := c.cfgNodeInfo.localInfo
|
|
stateSetter(&ni)
|
|
|
|
prm := nmClient.AddPeerPrm{}
|
|
prm.SetNodeInfo(ni)
|
|
|
|
return c.cfgNetmap.wrapper.AddPeer(ctx, prm)
|
|
}
|
|
|
|
// bootstrapOnline calls cfg.bootstrapWithState with "online" state.
|
|
func bootstrapOnline(ctx context.Context, c *cfg) error {
|
|
return c.bootstrapWithState(ctx, func(ni *netmap.NodeInfo) {
|
|
ni.SetStatus(netmap.Online)
|
|
})
|
|
}
|
|
|
|
// bootstrap calls bootstrapWithState with:
|
|
// - "maintenance" state if maintenance is in progress on the current node
|
|
// - "online", otherwise
|
|
func (c *cfg) bootstrap(ctx context.Context) error {
|
|
// switch to online except when under maintenance
|
|
st := c.cfgNetmap.state.controlNetmapStatus()
|
|
if st == control.NetmapStatus_MAINTENANCE {
|
|
c.log.Info(ctx, logs.FrostFSNodeBootstrappingWithTheMaintenanceState)
|
|
return c.bootstrapWithState(ctx, func(ni *netmap.NodeInfo) {
|
|
ni.SetStatus(netmap.Maintenance)
|
|
})
|
|
}
|
|
|
|
c.log.Info(ctx, logs.FrostFSNodeBootstrappingWithOnlineState,
|
|
zap.Stringer("previous", st),
|
|
)
|
|
|
|
return bootstrapOnline(ctx, c)
|
|
}
|
|
|
|
// needBootstrap checks if local node should be registered in network on bootup.
|
|
func (c *cfg) needBootstrap() bool {
|
|
return c.cfgNetmap.needBootstrap
|
|
}
|
|
|
|
// dCmp describes a component that can be reconfigured at runtime.
type dCmp struct {
	// name identifies the component in log messages.
	name string

	// reloadFunc applies the updated configuration to the component.
	reloadFunc func() error
}
|
|
|
|
func (c *cfg) signalWatcher(ctx context.Context) {
|
|
ch := make(chan os.Signal, 1)
|
|
signal.Notify(ch, syscall.SIGINT, syscall.SIGTERM)
|
|
|
|
sighupCh := make(chan os.Signal, 1)
|
|
signal.Notify(sighupCh, syscall.SIGHUP)
|
|
|
|
for {
|
|
select {
|
|
// signals causing application to shut down should have priority over
|
|
// reconfiguration signal
|
|
case <-ch:
|
|
c.log.Info(ctx, logs.FrostFSNodeTerminationSignalHasBeenReceivedStopping)
|
|
|
|
c.shutdown(ctx)
|
|
|
|
c.log.Info(ctx, logs.FrostFSNodeTerminationSignalProcessingIsComplete)
|
|
return
|
|
case err := <-c.internalErr: // internal application error
|
|
c.log.Warn(ctx, logs.FrostFSNodeInternalApplicationError,
|
|
zap.String("message", err.Error()))
|
|
|
|
c.shutdown(ctx)
|
|
|
|
c.log.Info(ctx, logs.FrostFSNodeInternalErrorProcessingIsComplete)
|
|
return
|
|
default:
|
|
// block until any signal is receieved
|
|
select {
|
|
case <-sighupCh:
|
|
c.reloadConfig(ctx)
|
|
case <-ch:
|
|
c.log.Info(ctx, logs.FrostFSNodeTerminationSignalHasBeenReceivedStopping)
|
|
|
|
c.shutdown(ctx)
|
|
|
|
c.log.Info(ctx, logs.FrostFSNodeTerminationSignalProcessingIsComplete)
|
|
return
|
|
case err := <-c.internalErr: // internal application error
|
|
c.log.Warn(ctx, logs.FrostFSNodeInternalApplicationError,
|
|
zap.String("message", err.Error()))
|
|
|
|
c.shutdown(ctx)
|
|
|
|
c.log.Info(ctx, logs.FrostFSNodeInternalErrorProcessingIsComplete)
|
|
return
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
func (c *cfg) reloadConfig(ctx context.Context) {
|
|
c.log.Info(ctx, logs.FrostFSNodeSIGHUPHasBeenReceivedRereadingConfiguration)
|
|
|
|
if !c.compareAndSwapHealthStatus(ctx, control.HealthStatus_READY, control.HealthStatus_RECONFIGURING) {
|
|
c.log.Info(ctx, logs.FrostFSNodeSIGHUPSkip)
|
|
return
|
|
}
|
|
defer c.compareAndSwapHealthStatus(ctx, control.HealthStatus_RECONFIGURING, control.HealthStatus_READY)
|
|
|
|
err := c.reloadAppConfig()
|
|
if err != nil {
|
|
c.log.Error(ctx, logs.FrostFSNodeConfigurationReading, zap.Error(err))
|
|
return
|
|
}
|
|
|
|
// all the components are expected to support
|
|
// Logger's dynamic reconfiguration approach
|
|
|
|
// Logger
|
|
|
|
logPrm, err := c.loggerPrm()
|
|
if err != nil {
|
|
c.log.Error(ctx, logs.FrostFSNodeLoggerConfigurationPreparation, zap.Error(err))
|
|
return
|
|
}
|
|
|
|
components := c.getComponents(ctx, logPrm)
|
|
|
|
// Object
|
|
c.cfgObject.tombstoneLifetime.Store(c.ObjectCfg.tombstoneLifetime)
|
|
|
|
// Storage Engine
|
|
|
|
var rcfg engine.ReConfiguration
|
|
for _, optsWithID := range c.shardOpts(ctx) {
|
|
rcfg.AddShard(optsWithID.configID, append(optsWithID.shOpts,
|
|
shard.WithTombstoneSource(c.createTombstoneSource()),
|
|
shard.WithContainerInfoProvider(c.createContainerInfoProvider(ctx)),
|
|
))
|
|
}
|
|
|
|
err = c.cfgObject.cfgLocalStorage.localStorage.Reload(ctx, rcfg)
|
|
if err != nil {
|
|
c.log.Error(ctx, logs.FrostFSNodeStorageEngineConfigurationUpdate, zap.Error(err))
|
|
return
|
|
}
|
|
|
|
for _, component := range components {
|
|
err = component.reloadFunc()
|
|
if err != nil {
|
|
c.log.Error(ctx, logs.FrostFSNodeUpdatedConfigurationApplying,
|
|
zap.String("component", component.name),
|
|
zap.Error(err))
|
|
}
|
|
}
|
|
|
|
if err := c.dialerSource.Update(internalNetConfig(c.appCfg, c.metricsCollector.MultinetMetrics())); err != nil {
|
|
c.log.Error(ctx, logs.FailedToUpdateMultinetConfiguration, zap.Error(err))
|
|
return
|
|
}
|
|
|
|
c.log.Info(ctx, logs.FrostFSNodeConfigurationHasBeenReloadedSuccessfully)
|
|
}
|
|
|
|
func (c *cfg) getComponents(ctx context.Context, logPrm *logger.Prm) []dCmp {
|
|
var components []dCmp
|
|
|
|
components = append(components, dCmp{"logger", logPrm.Reload})
|
|
components = append(components, dCmp{"runtime", func() error {
|
|
setRuntimeParameters(ctx, c)
|
|
return nil
|
|
}})
|
|
components = append(components, dCmp{"audit", func() error {
|
|
c.audit.Store(audit.Enabled(c.appCfg))
|
|
return nil
|
|
}})
|
|
components = append(components, dCmp{"pools", c.reloadPools})
|
|
components = append(components, dCmp{"tracing", func() error {
|
|
traceConfig, err := tracingconfig.ToTracingConfig(c.appCfg)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
updated, err := tracing.Setup(ctx, *traceConfig)
|
|
if updated {
|
|
c.log.Info(ctx, logs.FrostFSNodeTracingConfigationUpdated)
|
|
}
|
|
return err
|
|
}})
|
|
if cmp, updated := metricsComponent(c); updated {
|
|
if cmp.enabled {
|
|
cmp.preReload = enableMetricsSvc
|
|
} else {
|
|
cmp.preReload = disableMetricsSvc
|
|
}
|
|
components = append(components, dCmp{cmp.name, func() error { return cmp.reload(ctx) }})
|
|
}
|
|
if cmp, updated := pprofComponent(c); updated {
|
|
components = append(components, dCmp{cmp.name, func() error { return cmp.reload(ctx) }})
|
|
}
|
|
|
|
return components
|
|
}
|
|
|
|
func (c *cfg) reloadPools() error {
|
|
newSize := objectconfig.Put(c.appCfg).PoolSizeLocal()
|
|
c.reloadPool(c.cfgObject.pool.putLocal, newSize, "object.put.local_pool_size")
|
|
|
|
newSize = objectconfig.Put(c.appCfg).PoolSizeRemote()
|
|
c.reloadPool(c.cfgObject.pool.putRemote, newSize, "object.put.remote_pool_size")
|
|
|
|
newSize = replicatorconfig.PoolSize(c.appCfg)
|
|
c.reloadPool(c.cfgObject.pool.replication, newSize, "replicator.pool_size")
|
|
|
|
return nil
|
|
}
|
|
|
|
func (c *cfg) reloadPool(p *ants.Pool, newSize int, name string) {
|
|
oldSize := p.Cap()
|
|
if oldSize != newSize {
|
|
c.log.Info(context.Background(), logs.FrostFSNodePoolConfigurationUpdate, zap.String("field", name),
|
|
zap.Int("old", oldSize), zap.Int("new", newSize))
|
|
p.Tune(newSize)
|
|
}
|
|
}
|
|
|
|
func (c *cfg) reloadAppConfig() error {
|
|
unlock := c.LockAppConfigExclusive()
|
|
defer unlock()
|
|
|
|
return c.readConfig(c.appCfg)
|
|
}
|
|
|
|
func (c *cfg) createTombstoneSource() *tombstone.ExpirationChecker {
|
|
var tssPrm tsourse.TombstoneSourcePrm
|
|
tssPrm.SetGetService(c.cfgObject.getSvc)
|
|
tombstoneSrc := tsourse.NewSource(tssPrm)
|
|
|
|
tombstoneSource := tombstone.NewChecker(
|
|
tombstone.WithLogger(c.log),
|
|
tombstone.WithTombstoneSource(tombstoneSrc),
|
|
)
|
|
return tombstoneSource
|
|
}
|
|
|
|
func (c *cfg) createContainerInfoProvider(ctx context.Context) container.InfoProvider {
|
|
return container.NewInfoProvider(func() (container.Source, error) {
|
|
c.initMorphComponents(ctx)
|
|
cc, err := containerClient.NewFromMorph(c.cfgMorph.client, c.cfgContainer.scriptHash, 0)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
return containerClient.AsContainerSource(cc), nil
|
|
})
|
|
}
|
|
|
|
func (c *cfg) shutdown(ctx context.Context) {
|
|
old := c.swapHealthStatus(ctx, control.HealthStatus_SHUTTING_DOWN)
|
|
if old == control.HealthStatus_SHUTTING_DOWN {
|
|
c.log.Info(ctx, logs.FrostFSNodeShutdownSkip)
|
|
return
|
|
}
|
|
if old == control.HealthStatus_STARTING {
|
|
c.log.Warn(ctx, logs.FrostFSNodeShutdownWhenNotReady)
|
|
}
|
|
|
|
c.ctxCancel()
|
|
close(c.done)
|
|
for i := range c.closers {
|
|
c.closers[len(c.closers)-1-i].fn()
|
|
}
|
|
|
|
if err := sdnotify.ClearStatus(); err != nil {
|
|
c.log.Error(ctx, logs.FailedToReportStatusToSystemd, zap.Error(err))
|
|
}
|
|
}
|