frostfs-node/cmd/neofs-node/config.go
Alex Vanin 44b19c145f [#493] cmd/node: Use new config for control service configuration
Signed-off-by: Alex Vanin <alexey@nspcc.ru>
2021-06-02 14:58:22 +03:00

583 lines
14 KiB
Go

package main
import (
"context"
"crypto/ecdsa"
"net"
"os"
"path"
"strings"
"sync"
"time"
"github.com/nspcc-dev/neo-go/pkg/util"
"github.com/nspcc-dev/neofs-api-go/pkg"
"github.com/nspcc-dev/neofs-api-go/pkg/netmap"
netmapV2 "github.com/nspcc-dev/neofs-api-go/v2/netmap"
crypto "github.com/nspcc-dev/neofs-crypto"
"github.com/nspcc-dev/neofs-node/cmd/neofs-node/config"
contractsconfig "github.com/nspcc-dev/neofs-node/cmd/neofs-node/config/contracts"
engineconfig "github.com/nspcc-dev/neofs-node/cmd/neofs-node/config/engine"
shardconfig "github.com/nspcc-dev/neofs-node/cmd/neofs-node/config/engine/shard"
grpcconfig "github.com/nspcc-dev/neofs-node/cmd/neofs-node/config/grpc"
loggerconfig "github.com/nspcc-dev/neofs-node/cmd/neofs-node/config/logger"
metricsconfig "github.com/nspcc-dev/neofs-node/cmd/neofs-node/config/metrics"
nodeconfig "github.com/nspcc-dev/neofs-node/cmd/neofs-node/config/node"
"github.com/nspcc-dev/neofs-node/misc"
"github.com/nspcc-dev/neofs-node/pkg/core/container"
netmapCore "github.com/nspcc-dev/neofs-node/pkg/core/netmap"
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobstor"
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/engine"
meta "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/metabase"
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/shard"
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/writecache"
"github.com/nspcc-dev/neofs-node/pkg/metrics"
"github.com/nspcc-dev/neofs-node/pkg/morph/client"
cntwrapper "github.com/nspcc-dev/neofs-node/pkg/morph/client/container/wrapper"
nmwrapper "github.com/nspcc-dev/neofs-node/pkg/morph/client/netmap/wrapper"
"github.com/nspcc-dev/neofs-node/pkg/morph/event"
netmap2 "github.com/nspcc-dev/neofs-node/pkg/morph/event/netmap"
"github.com/nspcc-dev/neofs-node/pkg/morph/timer"
"github.com/nspcc-dev/neofs-node/pkg/network"
"github.com/nspcc-dev/neofs-node/pkg/services/control"
trustcontroller "github.com/nspcc-dev/neofs-node/pkg/services/reputation/local/controller"
truststorage "github.com/nspcc-dev/neofs-node/pkg/services/reputation/local/storage"
tokenStorage "github.com/nspcc-dev/neofs-node/pkg/services/session/storage"
"github.com/nspcc-dev/neofs-node/pkg/services/util/response"
util2 "github.com/nspcc-dev/neofs-node/pkg/util"
"github.com/nspcc-dev/neofs-node/pkg/util/logger"
"github.com/panjf2000/ants/v2"
"github.com/spf13/viper"
"go.etcd.io/bbolt"
"go.uber.org/atomic"
"go.uber.org/zap"
"google.golang.org/grpc"
)
const (
// config keys for API client cache
cfgAPIClientDialTimeout = "apiclient.dial_timeout"
// config keys for cfgMorph
cfgMorphRPCAddress = "morph.rpc_endpoint"
cfgMorphNotifyRPCAddress = "morph.notification_endpoint"
cfgMorphNotifyDialTimeout = "morph.dial_timeout"
cfgMainChainRPCAddress = "mainchain.rpc_endpoint"
cfgMainChainDialTimeout = "mainchain.dial_timeout"
cfgPolicerHeadTimeout = "policer.head_timeout"
cfgReplicatorPutTimeout = "replicator.put_timeout"
cfgObjectPutPoolSize = "object.put.pool_size"
)
const (
addressSize = 72 // 32 bytes oid, 32 bytes cid, 8 bytes protobuf encoding
)
const maxMsgSize = 4 << 20 // transport msg limit 4 MiB
// capacity of the pools of the morph notification handlers
// for each contract listener.
const notificationHandlerPoolSize = 10
type cfg struct {
ctx context.Context
appCfg *config.Config
ctxCancel func()
internalErr chan error // channel for internal application errors at runtime
viper *viper.Viper
log *zap.Logger
wg *sync.WaitGroup
key *ecdsa.PrivateKey
apiVersion *pkg.Version
cfgGRPC cfgGRPC
cfgMorph cfgMorph
cfgAccounting cfgAccounting
cfgContainer cfgContainer
cfgNetmap cfgNetmap
privateTokenStore *tokenStorage.TokenStore
cfgNodeInfo cfgNodeInfo
localAddr *network.Address
cfgObject cfgObject
metricsCollector *metrics.StorageMetrics
workers []worker
respSvc *response.Service
cfgControlService cfgControlService
netStatus *atomic.Int32
healthStatus *atomic.Int32
closers []func()
cfgReputation cfgReputation
mainChainClient *client.Client
}
type cfgGRPC struct {
listener net.Listener
server *grpc.Server
maxChunkSize uint64
maxAddrAmount uint64
tlsEnabled bool
tlsCertFile string
tlsKeyFile string
}
type cfgMorph struct {
client *client.Client
blockTimers []*timer.BlockTimer // all combined timers
eigenTrustTimer *timer.BlockTimer // timer for EigenTrust iterations
}
type cfgAccounting struct {
scriptHash util.Uint160
}
type cfgContainer struct {
scriptHash util.Uint160
parsers map[event.Type]event.Parser
subscribers map[event.Type][]event.Handler
workerPool util2.WorkerPool // pool for asynchronous handlers
}
type cfgNetmap struct {
scriptHash util.Uint160
wrapper *nmwrapper.Wrapper
parsers map[event.Type]event.Parser
subscribers map[event.Type][]event.Handler
workerPool util2.WorkerPool // pool for asynchronous handlers
state *networkState
reBootstrapEnabled bool
reBoostrapTurnedOff *atomic.Bool // managed by control service in runtime
startEpoch uint64 // epoch number when application is started
}
type BootstrapType uint32
type cfgNodeInfo struct {
// values from config
bootType BootstrapType
attributes []*netmap.NodeAttribute
// values at runtime
infoMtx sync.RWMutex
info netmap.NodeInfo
}
type cfgObject struct {
netMapStorage netmapCore.Source
cnrStorage container.Source
cnrClient *cntwrapper.Wrapper
pool cfgObjectRoutines
cfgLocalStorage cfgLocalStorage
}
type cfgLocalStorage struct {
localStorage *engine.StorageEngine
shardOpts [][]shard.Option
}
type cfgObjectRoutines struct {
put *ants.Pool
}
type cfgControlService struct {
server *grpc.Server
}
type cfgReputation struct {
workerPool util2.WorkerPool // pool for EigenTrust algorithm's iterations
localTrustStorage *truststorage.Storage
localTrustCtrl *trustcontroller.Controller
scriptHash util.Uint160
}
const (
_ BootstrapType = iota
StorageNode
RelayNode
)
func initCfg(path string) *cfg {
var p config.Prm
appCfg := config.New(p,
config.WithConfigFile(path),
)
viperCfg := initViper(path)
key, err := crypto.LoadPrivateKey(nodeconfig.Key(appCfg))
fatalOnErr(err)
var logPrm logger.Prm
err = logPrm.SetLevelString(
loggerconfig.Level(appCfg),
)
fatalOnErr(err)
log, err := logger.NewLogger(logPrm)
fatalOnErr(err)
netAddr := nodeconfig.BootstrapAddress(appCfg)
maxChunkSize := uint64(maxMsgSize) * 3 / 4 // 25% to meta, 75% to payload
maxAddrAmount := uint64(maxChunkSize) / addressSize // each address is about 72 bytes
var (
tlsEnabled bool
tlsCertFile string
tlsKeyFile string
tlsConfig = grpcconfig.TLS(appCfg)
)
if tlsConfig.Enabled() {
tlsEnabled = true
tlsCertFile = tlsConfig.CertificateFile()
tlsKeyFile = tlsConfig.KeyFile()
}
if tlsEnabled {
netAddr.AddTLS()
}
state := newNetworkState()
containerWorkerPool, err := ants.NewPool(notificationHandlerPoolSize)
fatalOnErr(err)
netmapWorkerPool, err := ants.NewPool(notificationHandlerPoolSize)
fatalOnErr(err)
reputationWorkerPool, err := ants.NewPool(notificationHandlerPoolSize)
fatalOnErr(err)
relayOnly := nodeconfig.Relay(appCfg)
c := &cfg{
ctx: context.Background(),
appCfg: appCfg,
internalErr: make(chan error),
viper: viperCfg,
log: log,
wg: new(sync.WaitGroup),
key: key,
apiVersion: pkg.SDKVersion(),
cfgAccounting: cfgAccounting{
scriptHash: contractsconfig.Balance(appCfg),
},
cfgContainer: cfgContainer{
scriptHash: contractsconfig.Container(appCfg),
workerPool: containerWorkerPool,
},
cfgNetmap: cfgNetmap{
scriptHash: contractsconfig.Netmap(appCfg),
state: state,
workerPool: netmapWorkerPool,
reBootstrapEnabled: !relayOnly,
reBoostrapTurnedOff: atomic.NewBool(relayOnly),
},
cfgNodeInfo: cfgNodeInfo{
bootType: StorageNode,
attributes: parseAttributes(appCfg),
},
cfgGRPC: cfgGRPC{
maxChunkSize: maxChunkSize,
maxAddrAmount: maxAddrAmount,
tlsEnabled: tlsEnabled,
tlsCertFile: tlsCertFile,
tlsKeyFile: tlsKeyFile,
},
localAddr: netAddr,
respSvc: response.NewService(
response.WithNetworkState(state),
),
cfgObject: cfgObject{
pool: initObjectPool(viperCfg),
},
netStatus: atomic.NewInt32(int32(control.NetmapStatus_STATUS_UNDEFINED)),
healthStatus: atomic.NewInt32(int32(control.HealthStatus_HEALTH_STATUS_UNDEFINED)),
cfgReputation: cfgReputation{
scriptHash: contractsconfig.Reputation(appCfg),
workerPool: reputationWorkerPool,
},
}
if metricsconfig.Address(c.appCfg) != "" {
c.metricsCollector = metrics.NewStorageMetrics()
}
initLocalStorage(c)
return c
}
func initViper(path string) *viper.Viper {
v := viper.New()
v.SetEnvPrefix(misc.Prefix)
v.AutomaticEnv()
v.SetEnvKeyReplacer(strings.NewReplacer(".", "_"))
defaultConfiguration(v)
if path != "" {
v.SetConfigFile(path)
v.SetConfigType("yml")
fatalOnErr(v.ReadInConfig())
}
return v
}
func defaultConfiguration(v *viper.Viper) {
v.SetDefault(cfgMorphRPCAddress, []string{})
v.SetDefault(cfgMorphNotifyRPCAddress, []string{})
v.SetDefault(cfgMorphNotifyDialTimeout, 5*time.Second)
v.SetDefault(cfgAPIClientDialTimeout, 5*time.Second)
v.SetDefault(cfgPolicerHeadTimeout, 5*time.Second)
v.SetDefault(cfgReplicatorPutTimeout, 5*time.Second)
v.SetDefault(cfgObjectPutPoolSize, 10)
}
func (c *cfg) LocalAddress() *network.Address {
return c.localAddr
}
func initLocalStorage(c *cfg) {
initShardOptions(c)
engineOpts := []engine.Option{engine.WithLogger(c.log)}
if c.metricsCollector != nil {
engineOpts = append(engineOpts, engine.WithMetrics(c.metricsCollector))
}
ls := engine.New(engineOpts...)
for _, opts := range c.cfgObject.cfgLocalStorage.shardOpts {
id, err := ls.AddShard(opts...)
fatalOnErr(err)
c.log.Info("shard attached to engine",
zap.Stringer("id", id),
)
}
c.cfgObject.cfgLocalStorage.localStorage = ls
c.onShutdown(func() {
c.log.Info("closing components of the storage engine...")
err := ls.Close()
if err != nil {
c.log.Info("storage engine closing failure",
zap.String("error", err.Error()),
)
} else {
c.log.Info("all components of the storage engine closed successfully")
}
})
}
func initShardOptions(c *cfg) {
var opts [][]shard.Option
engineconfig.IterateShards(c.appCfg, func(sc *shardconfig.Config) {
var writeCacheOpts []writecache.Option
useWriteCache := sc.UseWriteCache()
if useWriteCache {
writeCacheCfg := sc.WriteCache()
writeCacheOpts = []writecache.Option{
writecache.WithPath(writeCacheCfg.Path()),
writecache.WithLogger(c.log),
writecache.WithMaxMemSize(writeCacheCfg.MemSize()),
writecache.WithMaxObjectSize(writeCacheCfg.MaxObjectSize()),
writecache.WithSmallObjectSize(writeCacheCfg.SmallObjectSize()),
writecache.WithMaxDBSize(writeCacheCfg.MaxDBSize()),
writecache.WithFlushWorkersCount(writeCacheCfg.WorkersNumber()),
}
}
blobStorCfg := sc.BlobStor()
blobovniczaCfg := blobStorCfg.Blobovnicza()
metabaseCfg := sc.Metabase()
gcCfg := sc.GC()
metaPath := metabaseCfg.Path()
metaPerm := metabaseCfg.Perm()
fatalOnErr(os.MkdirAll(path.Dir(metaPath), metaPerm))
opts = append(opts, []shard.Option{
shard.WithLogger(c.log),
shard.WithBlobStorOptions(
blobstor.WithRootPath(blobStorCfg.Path()),
blobstor.WithCompressObjects(blobStorCfg.Compress(), c.log),
blobstor.WithRootPerm(blobStorCfg.Perm()),
blobstor.WithShallowDepth(blobStorCfg.ShallowDepth()),
blobstor.WithSmallSizeLimit(blobStorCfg.SmallSizeLimit()),
blobstor.WithBlobovniczaSize(blobovniczaCfg.Size()),
blobstor.WithBlobovniczaShallowDepth(blobovniczaCfg.ShallowDepth()),
blobstor.WithBlobovniczaShallowWidth(blobovniczaCfg.ShallowWidth()),
blobstor.WithBlobovniczaOpenedCacheSize(blobovniczaCfg.OpenedCacheSize()),
blobstor.WithLogger(c.log),
),
shard.WithMetaBaseOptions(
meta.WithLogger(c.log),
meta.WithPath(metaPath),
meta.WithPermissions(metaPerm),
meta.WithBoltDBOptions(&bbolt.Options{
Timeout: 100 * time.Millisecond,
}),
),
shard.WithWriteCache(useWriteCache),
shard.WithWriteCacheOptions(writeCacheOpts...),
shard.WithRemoverBatchSize(gcCfg.RemoverBatchSize()),
shard.WithGCRemoverSleepInterval(gcCfg.RemoverSleepInterval()),
shard.WithGCWorkerPoolInitializer(func(sz int) util2.WorkerPool {
pool, err := ants.NewPool(sz)
fatalOnErr(err)
return pool
}),
shard.WithGCEventChannelInitializer(func() <-chan shard.Event {
ch := make(chan shard.Event)
addNewEpochNotificationHandler(c, func(ev event.Event) {
ch <- shard.EventNewEpoch(ev.(netmap2.NewEpoch).EpochNumber())
})
return ch
}),
})
})
c.cfgObject.cfgLocalStorage.shardOpts = opts
}
func initObjectPool(cfg *viper.Viper) (pool cfgObjectRoutines) {
var err error
optNonBlocking := ants.WithNonblocking(true)
pool.put, err = ants.NewPool(cfg.GetInt(cfgObjectPutPoolSize), optNonBlocking)
if err != nil {
fatalOnErr(err)
}
return pool
}
func (c *cfg) LocalNodeInfo() (*netmapV2.NodeInfo, error) {
ni := c.localNodeInfo()
return ni.ToV2(), nil
}
// handleLocalNodeInfo rewrites local node info
func (c *cfg) handleLocalNodeInfo(ni *netmap.NodeInfo) {
c.cfgNodeInfo.infoMtx.Lock()
if ni != nil {
c.cfgNodeInfo.info = *ni
}
c.updateStatusWithoutLock(ni)
c.cfgNodeInfo.infoMtx.Unlock()
}
// handleNodeInfoStatus updates node info status without rewriting whole local
// node info status
func (c *cfg) handleNodeInfoStatus(ni *netmap.NodeInfo) {
c.cfgNodeInfo.infoMtx.Lock()
c.updateStatusWithoutLock(ni)
c.cfgNodeInfo.infoMtx.Unlock()
}
func (c *cfg) localNodeInfo() netmap.NodeInfo {
c.cfgNodeInfo.infoMtx.RLock()
defer c.cfgNodeInfo.infoMtx.RUnlock()
return c.cfgNodeInfo.info
}
func (c *cfg) toOnlineLocalNodeInfo() *netmap.NodeInfo {
ni := c.localNodeInfo()
ni.SetState(netmap.NodeStateOnline)
return &ni
}
func (c *cfg) updateStatusWithoutLock(ni *netmap.NodeInfo) {
var nmState netmap.NodeState
if ni != nil {
nmState = ni.State()
} else {
nmState = netmap.NodeStateOffline
c.cfgNodeInfo.info.SetState(nmState)
}
switch nmState {
default:
c.setNetmapStatus(control.NetmapStatus_STATUS_UNDEFINED)
case netmap.NodeStateOnline:
c.setNetmapStatus(control.NetmapStatus_ONLINE)
case netmap.NodeStateOffline:
c.setNetmapStatus(control.NetmapStatus_OFFLINE)
}
}