frostfs-node/cmd/neofs-node/config.go

780 lines
19 KiB
Go
Raw Normal View History

package main
import (
"context"
"crypto/ecdsa"
"errors"
"net"
"os"
"path"
"strconv"
"strings"
"sync"
"time"
"github.com/nspcc-dev/neo-go/pkg/util"
"github.com/nspcc-dev/neofs-api-go/pkg"
"github.com/nspcc-dev/neofs-api-go/pkg/netmap"
netmapV2 "github.com/nspcc-dev/neofs-api-go/v2/netmap"
crypto "github.com/nspcc-dev/neofs-crypto"
"github.com/nspcc-dev/neofs-node/misc"
"github.com/nspcc-dev/neofs-node/pkg/core/container"
netmapCore "github.com/nspcc-dev/neofs-node/pkg/core/netmap"
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobstor"
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/engine"
meta "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/metabase"
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/shard"
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/writecache"
"github.com/nspcc-dev/neofs-node/pkg/metrics"
"github.com/nspcc-dev/neofs-node/pkg/morph/client"
cntwrapper "github.com/nspcc-dev/neofs-node/pkg/morph/client/container/wrapper"
nmwrapper "github.com/nspcc-dev/neofs-node/pkg/morph/client/netmap/wrapper"
"github.com/nspcc-dev/neofs-node/pkg/morph/event"
netmap2 "github.com/nspcc-dev/neofs-node/pkg/morph/event/netmap"
"github.com/nspcc-dev/neofs-node/pkg/morph/timer"
"github.com/nspcc-dev/neofs-node/pkg/network"
"github.com/nspcc-dev/neofs-node/pkg/services/control"
trustcontroller "github.com/nspcc-dev/neofs-node/pkg/services/reputation/local/controller"
truststorage "github.com/nspcc-dev/neofs-node/pkg/services/reputation/local/storage"
tokenStorage "github.com/nspcc-dev/neofs-node/pkg/services/session/storage"
"github.com/nspcc-dev/neofs-node/pkg/services/util/response"
util2 "github.com/nspcc-dev/neofs-node/pkg/util"
"github.com/nspcc-dev/neofs-node/pkg/util/logger"
"github.com/panjf2000/ants/v2"
"github.com/spf13/viper"
"go.etcd.io/bbolt"
"go.uber.org/atomic"
"go.uber.org/zap"
"google.golang.org/grpc"
)
const (
// logger keys
cfgLogLevel = "logger.level"
// pprof keys
cfgProfilerAddr = "profiler.address"
cfgProfilerShutdownTimeout = "profiler.shutdown_timeout"
// metrics keys
cfgMetricsAddr = "metrics.address"
cfgMetricsShutdownTimeout = "metrics.shutdown_timeout"
// config keys for cfgNodeInfo
cfgNodeKey = "node.key"
cfgBootstrapAddress = "node.address"
cfgNodeAttributePrefix = "node.attribute"
cfgNodeRelay = "node.relay"
// config keys for cfgGRPC
cfgListenAddress = "grpc.endpoint"
cfgTLSEnabled = "grpc.tls.enabled"
cfgTLSCertFile = "grpc.tls.certificate"
cfgTLSKeyFile = "grpc.tls.key"
// config keys for API client cache
cfgAPIClientDialTimeout = "apiclient.dial_timeout"
// config keys for cfgMorph
cfgMorphRPCAddress = "morph.rpc_endpoint"
cfgMorphNotifyRPCAddress = "morph.notification_endpoint"
cfgMorphNotifyDialTimeout = "morph.dial_timeout"
cfgMainChainRPCAddress = "mainchain.rpc_endpoint"
cfgMainChainDialTimeout = "mainchain.dial_timeout"
// config keys for cfgAccounting
cfgAccountingContract = "accounting.scripthash"
// config keys for cfgNetmap
cfgNetmapContract = "netmap.scripthash"
// config keys for cfgContainer
cfgContainerContract = "container.scripthash"
// config keys for cfgReputation
cfgReputationContract = "reputation.scripthash"
cfgPolicerHeadTimeout = "policer.head_timeout"
cfgReplicatorPutTimeout = "replicator.put_timeout"
cfgObjectPutPoolSize = "object.put.pool_size"
)
const (
cfgLocalStorageSection = "storage"
cfgStorageShardSection = "shard"
cfgShardUseWriteCache = "use_write_cache"
cfgBlobStorSection = "blobstor"
cfgWriteCacheSection = "writecache"
cfgWriteCacheMemSize = "mem_size"
cfgWriteCacheDBSize = "db_size"
cfgWriteCacheSmallSize = "small_size"
cfgWriteCacheMaxSize = "max_size"
cfgWriteCacheWrkCount = "workers_count"
cfgBlobStorCompress = "compress"
cfgBlobStorShallowDepth = "shallow_depth"
cfgBlobStorTreePath = "path"
cfgBlobStorTreePerm = "perm"
cfgBlobStorSmallSzLimit = "small_size_limit"
cfgBlobStorBlzSection = "blobovnicza"
cfgBlzSize = "size"
cfgBlzShallowDepth = "shallow_depth"
cfgBlzShallowWidth = "shallow_width"
cfgBlzOpenedCacheSize = "opened_cache_size"
cfgMetaBaseSection = "metabase"
cfgMetaBasePath = "path"
cfgMetaBasePerm = "perm"
cfgGCSection = "gc"
cfgGCRemoverBatchSize = "remover_batch_size"
cfgGCRemoverSleepInt = "remover_sleep_interval"
)
const (
addressSize = 72 // 32 bytes oid, 32 bytes cid, 8 bytes protobuf encoding
)
const maxMsgSize = 4 << 20 // transport msg limit 4 MiB
// capacity of the pools of the morph notification handlers
// for each contract listener.
const notificationHandlerPoolSize = 10
type cfg struct {
ctx context.Context
ctxCancel func()
internalErr chan error // channel for internal application errors at runtime
viper *viper.Viper
log *zap.Logger
wg *sync.WaitGroup
key *ecdsa.PrivateKey
apiVersion *pkg.Version
cfgGRPC cfgGRPC
cfgMorph cfgMorph
cfgAccounting cfgAccounting
cfgContainer cfgContainer
cfgNetmap cfgNetmap
privateTokenStore *tokenStorage.TokenStore
cfgNodeInfo cfgNodeInfo
localAddr *network.Address
cfgObject cfgObject
metricsCollector *metrics.StorageMetrics
workers []worker
respSvc *response.Service
cfgControlService cfgControlService
netStatus *atomic.Int32
healthStatus *atomic.Int32
closers []func()
cfgReputation cfgReputation
mainChainClient *client.Client
}
type cfgGRPC struct {
listener net.Listener
server *grpc.Server
maxChunkSize uint64
maxAddrAmount uint64
tlsEnabled bool
tlsCertFile string
tlsKeyFile string
}
type cfgMorph struct {
client *client.Client
blockTimers []*timer.BlockTimer // all combined timers
eigenTrustTimer *timer.BlockTimer // timer for EigenTrust iterations
}
type cfgAccounting struct {
scriptHash util.Uint160
}
type cfgContainer struct {
scriptHash util.Uint160
parsers map[event.Type]event.Parser
subscribers map[event.Type][]event.Handler
workerPool util2.WorkerPool // pool for asynchronous handlers
}
type cfgNetmap struct {
scriptHash util.Uint160
wrapper *nmwrapper.Wrapper
parsers map[event.Type]event.Parser
subscribers map[event.Type][]event.Handler
workerPool util2.WorkerPool // pool for asynchronous handlers
state *networkState
reBootstrapEnabled bool
reBoostrapTurnedOff *atomic.Bool // managed by control service in runtime
startEpoch uint64 // epoch number when application is started
}
type BootstrapType uint32
type cfgNodeInfo struct {
// values from config
bootType BootstrapType
attributes []*netmap.NodeAttribute
// values at runtime
infoMtx sync.RWMutex
info netmap.NodeInfo
}
type cfgObject struct {
netMapStorage netmapCore.Source
cnrStorage container.Source
cnrClient *cntwrapper.Wrapper
pool cfgObjectRoutines
cfgLocalStorage cfgLocalStorage
}
type cfgLocalStorage struct {
localStorage *engine.StorageEngine
shardOpts [][]shard.Option
}
type cfgObjectRoutines struct {
put *ants.Pool
}
type cfgControlService struct {
server *grpc.Server
}
type cfgReputation struct {
workerPool util2.WorkerPool // pool for EigenTrust algorithm's iterations
localTrustStorage *truststorage.Storage
localTrustCtrl *trustcontroller.Controller
scriptHash util.Uint160
}
const (
_ BootstrapType = iota
StorageNode
RelayNode
)
func initCfg(path string) *cfg {
viperCfg := initViper(path)
key, err := crypto.LoadPrivateKey(viperCfg.GetString(cfgNodeKey))
fatalOnErr(err)
u160Accounting, err := util.Uint160DecodeStringLE(
viperCfg.GetString(cfgAccountingContract))
fatalOnErr(err)
u160Netmap, err := util.Uint160DecodeStringLE(
viperCfg.GetString(cfgNetmapContract))
fatalOnErr(err)
u160Container, err := util.Uint160DecodeStringLE(
viperCfg.GetString(cfgContainerContract))
fatalOnErr(err)
u160Reputation, err := util.Uint160DecodeStringLE(
viperCfg.GetString(cfgReputationContract))
fatalOnErr(err)
var logPrm logger.Prm
err = logPrm.SetLevelString(
viperCfg.GetString(cfgLogLevel),
)
fatalOnErr(err)
log, err := logger.NewLogger(logPrm)
fatalOnErr(err)
netAddr, err := network.AddressFromString(viperCfg.GetString(cfgBootstrapAddress))
fatalOnErr(err)
maxChunkSize := uint64(maxMsgSize) * 3 / 4 // 25% to meta, 75% to payload
maxAddrAmount := uint64(maxChunkSize) / addressSize // each address is about 72 bytes
var (
tlsEnabled bool
tlsCertFile string
tlsKeyFile string
)
if viperCfg.GetBool(cfgTLSEnabled) {
tlsEnabled = true
tlsCertFile = viperCfg.GetString(cfgTLSCertFile)
tlsKeyFile = viperCfg.GetString(cfgTLSKeyFile)
}
if tlsEnabled {
netAddr.AddTLS()
}
state := newNetworkState()
containerWorkerPool, err := ants.NewPool(notificationHandlerPoolSize)
fatalOnErr(err)
netmapWorkerPool, err := ants.NewPool(notificationHandlerPoolSize)
fatalOnErr(err)
reputationWorkerPool, err := ants.NewPool(notificationHandlerPoolSize)
fatalOnErr(err)
relayOnly := viperCfg.GetBool(cfgNodeRelay)
c := &cfg{
ctx: context.Background(),
internalErr: make(chan error),
viper: viperCfg,
log: log,
wg: new(sync.WaitGroup),
key: key,
apiVersion: pkg.SDKVersion(),
cfgAccounting: cfgAccounting{
scriptHash: u160Accounting,
},
cfgContainer: cfgContainer{
scriptHash: u160Container,
workerPool: containerWorkerPool,
},
cfgNetmap: cfgNetmap{
scriptHash: u160Netmap,
state: state,
workerPool: netmapWorkerPool,
reBootstrapEnabled: !relayOnly,
reBoostrapTurnedOff: atomic.NewBool(relayOnly),
},
cfgNodeInfo: cfgNodeInfo{
bootType: StorageNode,
attributes: parseAttributes(viperCfg),
},
cfgGRPC: cfgGRPC{
maxChunkSize: maxChunkSize,
maxAddrAmount: maxAddrAmount,
tlsEnabled: tlsEnabled,
tlsCertFile: tlsCertFile,
tlsKeyFile: tlsKeyFile,
},
localAddr: netAddr,
respSvc: response.NewService(
response.WithNetworkState(state),
),
cfgObject: cfgObject{
pool: initObjectPool(viperCfg),
},
netStatus: atomic.NewInt32(int32(control.NetmapStatus_STATUS_UNDEFINED)),
healthStatus: atomic.NewInt32(int32(control.HealthStatus_HEALTH_STATUS_UNDEFINED)),
cfgReputation: cfgReputation{
scriptHash: u160Reputation,
workerPool: reputationWorkerPool,
},
}
if c.viper.GetString(cfgMetricsAddr) != "" {
c.metricsCollector = metrics.NewStorageMetrics()
}
initLocalStorage(c)
return c
}
func initViper(path string) *viper.Viper {
v := viper.New()
v.SetEnvPrefix(misc.Prefix)
v.AutomaticEnv()
v.SetEnvKeyReplacer(strings.NewReplacer(".", "_"))
defaultConfiguration(v)
if path != "" {
v.SetConfigFile(path)
v.SetConfigType("yml")
fatalOnErr(v.ReadInConfig())
}
return v
}
func defaultConfiguration(v *viper.Viper) {
v.SetDefault(cfgNodeKey, "") // node key
v.SetDefault(cfgBootstrapAddress, "") // announced address of the node
v.SetDefault(cfgNodeRelay, false)
v.SetDefault(cfgMorphRPCAddress, []string{})
v.SetDefault(cfgMorphNotifyRPCAddress, []string{})
v.SetDefault(cfgMorphNotifyDialTimeout, 5*time.Second)
v.SetDefault(cfgListenAddress, "127.0.0.1:50501") // listen address
v.SetDefault(cfgTLSEnabled, false)
v.SetDefault(cfgTLSCertFile, "")
v.SetDefault(cfgTLSKeyFile, "")
v.SetDefault(cfgAPIClientDialTimeout, 5*time.Second)
v.SetDefault(cfgAccountingContract, "")
v.SetDefault(cfgContainerContract, "")
v.SetDefault(cfgNetmapContract, "")
v.SetDefault(cfgLogLevel, "info")
v.SetDefault(cfgProfilerShutdownTimeout, "30s")
v.SetDefault(cfgMetricsShutdownTimeout, "30s")
v.SetDefault(cfgPolicerHeadTimeout, 5*time.Second)
v.SetDefault(cfgReplicatorPutTimeout, 5*time.Second)
v.SetDefault(cfgObjectPutPoolSize, 10)
v.SetDefault(cfgCtrlSvcAuthorizedKeys, []string{})
}
func (c *cfg) LocalAddress() *network.Address {
return c.localAddr
}
func initLocalStorage(c *cfg) {
initShardOptions(c)
engineOpts := []engine.Option{engine.WithLogger(c.log)}
if c.metricsCollector != nil {
engineOpts = append(engineOpts, engine.WithMetrics(c.metricsCollector))
}
ls := engine.New(engineOpts...)
for _, opts := range c.cfgObject.cfgLocalStorage.shardOpts {
id, err := ls.AddShard(opts...)
fatalOnErr(err)
c.log.Info("shard attached to engine",
zap.Stringer("id", id),
)
}
c.cfgObject.cfgLocalStorage.localStorage = ls
c.onShutdown(func() {
c.log.Info("closing components of the storage engine...")
err := ls.Close()
if err != nil {
c.log.Info("storage engine closing failure",
zap.String("error", err.Error()),
)
} else {
c.log.Info("all components of the storage engine closed successfully")
}
})
}
func initShardOptions(c *cfg) {
var opts [][]shard.Option
for i := 0; ; i++ {
prefix := configPath(
cfgLocalStorageSection,
cfgStorageShardSection,
strconv.Itoa(i),
)
useCache := c.viper.GetBool(
configPath(prefix, cfgShardUseWriteCache),
)
writeCachePrefix := configPath(prefix, cfgWriteCacheSection)
writeCachePath := c.viper.GetString(
configPath(writeCachePrefix, cfgBlobStorTreePath),
)
if useCache && writeCachePath == "" {
c.log.Warn("incorrect writeCache path, ignore shard")
break
}
writeCacheMemSize := c.viper.GetUint64(configPath(writeCachePrefix, cfgWriteCacheMemSize))
writeCacheDBSize := c.viper.GetUint64(configPath(writeCachePrefix, cfgWriteCacheDBSize))
writeCacheSmallSize := c.viper.GetUint64(configPath(writeCachePrefix, cfgWriteCacheSmallSize))
writeCacheMaxSize := c.viper.GetUint64(configPath(writeCachePrefix, cfgWriteCacheMaxSize))
writeCacheWrkCount := c.viper.GetInt(configPath(writeCachePrefix, cfgWriteCacheWrkCount))
blobPrefix := configPath(prefix, cfgBlobStorSection)
blobPath := c.viper.GetString(
configPath(blobPrefix, cfgBlobStorTreePath),
)
if blobPath == "" {
c.log.Warn("incorrect blobStor path, ignore shard")
break
}
compressObjects := c.viper.GetBool(
configPath(blobPrefix, cfgBlobStorCompress),
)
blobPerm := os.FileMode(c.viper.GetInt(
configPath(blobPrefix, cfgBlobStorTreePerm),
))
shallowDepth := c.viper.GetInt(
configPath(blobPrefix, cfgBlobStorShallowDepth),
)
smallSzLimit := c.viper.GetUint64(
configPath(blobPrefix, cfgBlobStorSmallSzLimit),
)
if smallSzLimit == 0 {
smallSzLimit = 1 << 20 // 1MB
}
if writeCacheMaxSize <= 0 {
writeCacheSmallSize = smallSzLimit
}
blzPrefix := configPath(blobPrefix, cfgBlobStorBlzSection)
blzSize := c.viper.GetUint64(
configPath(blzPrefix, cfgBlzSize),
)
if blzSize == 0 {
blzSize = 1 << 30 // 1 GB
}
blzShallowDepth := c.viper.GetUint64(
configPath(blzPrefix, cfgBlzShallowDepth),
)
blzShallowWidth := c.viper.GetUint64(
configPath(blzPrefix, cfgBlzShallowWidth),
)
blzCacheSize := c.viper.GetInt(
configPath(blzPrefix, cfgBlzOpenedCacheSize),
)
metaPrefix := configPath(prefix, cfgMetaBaseSection)
metaPath := c.viper.GetString(
configPath(metaPrefix, cfgMetaBasePath),
)
metaPerm := os.FileMode(c.viper.GetUint32(
configPath(metaPrefix, cfgMetaBasePerm),
))
fatalOnErr(os.MkdirAll(path.Dir(metaPath), metaPerm))
gcPrefix := configPath(prefix, cfgGCSection)
rmBatchSize := c.viper.GetInt(
configPath(gcPrefix, cfgGCRemoverBatchSize),
)
rmSleepInterval := c.viper.GetDuration(
configPath(gcPrefix, cfgGCRemoverSleepInt),
)
opts = append(opts, []shard.Option{
shard.WithLogger(c.log),
shard.WithBlobStorOptions(
blobstor.WithRootPath(blobPath),
blobstor.WithCompressObjects(compressObjects, c.log),
blobstor.WithRootPerm(blobPerm),
blobstor.WithShallowDepth(shallowDepth),
blobstor.WithSmallSizeLimit(smallSzLimit),
blobstor.WithBlobovniczaSize(blzSize),
blobstor.WithBlobovniczaShallowDepth(blzShallowDepth),
blobstor.WithBlobovniczaShallowWidth(blzShallowWidth),
blobstor.WithBlobovniczaOpenedCacheSize(blzCacheSize),
blobstor.WithLogger(c.log),
),
shard.WithMetaBaseOptions(
meta.WithLogger(c.log),
meta.WithPath(metaPath),
meta.WithPermissions(metaPerm),
meta.WithBoltDBOptions(&bbolt.Options{
Timeout: 100 * time.Millisecond,
}),
),
shard.WithWriteCache(useCache),
shard.WithWriteCacheOptions(
writecache.WithPath(writeCachePath),
writecache.WithLogger(c.log),
writecache.WithMaxMemSize(writeCacheMemSize),
writecache.WithMaxObjectSize(writeCacheMaxSize),
writecache.WithSmallObjectSize(writeCacheSmallSize),
writecache.WithMaxDBSize(writeCacheDBSize),
writecache.WithFlushWorkersCount(writeCacheWrkCount),
),
shard.WithRemoverBatchSize(rmBatchSize),
shard.WithGCRemoverSleepInterval(rmSleepInterval),
shard.WithGCWorkerPoolInitializer(func(sz int) util2.WorkerPool {
pool, err := ants.NewPool(sz)
fatalOnErr(err)
return pool
}),
shard.WithGCEventChannelInitializer(func() <-chan shard.Event {
ch := make(chan shard.Event)
addNewEpochNotificationHandler(c, func(ev event.Event) {
ch <- shard.EventNewEpoch(ev.(netmap2.NewEpoch).EpochNumber())
})
return ch
}),
})
c.log.Info("storage shard options",
zap.Bool("with write cache", useCache),
zap.String("with write cache path", writeCachePath),
zap.String("BLOB path", blobPath),
zap.Stringer("BLOB permissions", blobPerm),
zap.Bool("BLOB compress", compressObjects),
zap.Int("BLOB shallow depth", shallowDepth),
zap.Uint64("BLOB small size limit", smallSzLimit),
zap.String("metabase path", metaPath),
zap.Stringer("metabase permissions", metaPerm),
zap.Int("GC remover batch size", rmBatchSize),
zap.Duration("GC remover sleep interval", rmSleepInterval),
)
}
if len(opts) == 0 {
fatalOnErr(errors.New("no correctly set up shards, exit"))
}
c.cfgObject.cfgLocalStorage.shardOpts = opts
}
func configPath(sections ...string) string {
return strings.Join(sections, ".")
}
func initObjectPool(cfg *viper.Viper) (pool cfgObjectRoutines) {
var err error
optNonBlocking := ants.WithNonblocking(true)
pool.put, err = ants.NewPool(cfg.GetInt(cfgObjectPutPoolSize), optNonBlocking)
if err != nil {
fatalOnErr(err)
}
return pool
}
func (c *cfg) LocalNodeInfo() (*netmapV2.NodeInfo, error) {
ni := c.localNodeInfo()
return ni.ToV2(), nil
}
// handleLocalNodeInfo rewrites local node info
func (c *cfg) handleLocalNodeInfo(ni *netmap.NodeInfo) {
c.cfgNodeInfo.infoMtx.Lock()
if ni != nil {
c.cfgNodeInfo.info = *ni
}
c.updateStatusWithoutLock(ni)
c.cfgNodeInfo.infoMtx.Unlock()
}
// handleNodeInfoStatus updates node info status without rewriting whole local
// node info status
func (c *cfg) handleNodeInfoStatus(ni *netmap.NodeInfo) {
c.cfgNodeInfo.infoMtx.Lock()
c.updateStatusWithoutLock(ni)
c.cfgNodeInfo.infoMtx.Unlock()
}
func (c *cfg) localNodeInfo() netmap.NodeInfo {
c.cfgNodeInfo.infoMtx.RLock()
defer c.cfgNodeInfo.infoMtx.RUnlock()
return c.cfgNodeInfo.info
}
func (c *cfg) toOnlineLocalNodeInfo() *netmap.NodeInfo {
ni := c.localNodeInfo()
ni.SetState(netmap.NodeStateOnline)
return &ni
}
func (c *cfg) updateStatusWithoutLock(ni *netmap.NodeInfo) {
var nmState netmap.NodeState
if ni != nil {
nmState = ni.State()
} else {
nmState = netmap.NodeStateOffline
c.cfgNodeInfo.info.SetState(nmState)
}
switch nmState {
default:
c.setNetmapStatus(control.NetmapStatus_STATUS_UNDEFINED)
case netmap.NodeStateOnline:
c.setNetmapStatus(control.NetmapStatus_ONLINE)
case netmap.NodeStateOffline:
c.setNetmapStatus(control.NetmapStatus_OFFLINE)
}
}