All checks were successful
ci/woodpecker/push/pre-commit Pipeline was successful
Fix NPE, introduced in f09ee27a
Signed-off-by: Evgenii Stratonikov <>
759 lines
23 KiB
759 lines
23 KiB
package innerring
import (
cont ""
nodevalidator ""
addrvalidator ""
statevalidation ""
subnetvalidator ""
auditSettlement ""
auditClient ""
balanceClient ""
cntClient ""
frostfsClient ""
nmClient ""
repClient ""
morphsubnet ""
audittask ""
control ""
controlsrv ""
reputationcommon ""
util2 ""
utilConfig ""
func (s *Server) initNetmapProcessor(cfg *viper.Viper,
cnrClient *container.Client,
alphaSync event.Handler,
subnetClient *morphsubnet.Client,
auditProcessor *audit.Processor,
settlementProcessor *settlement.Processor) error {
locodeValidator, err := s.newLocodeValidator(cfg)
if err != nil {
return err
subnetValidator, err := subnetvalidator.New(
SubnetClient: subnetClient,
if err != nil {
return err
netSettings := (*networkSettings)(s.netmapClient)
var netMapCandidateStateValidator statevalidation.NetMapCandidateValidator
s.netmapProcessor, err = netmap.New(&netmap.Params{
Log: s.log,
PoolSize: cfg.GetInt("workers.netmap"),
NetmapClient: s.netmapClient,
EpochTimer: s,
EpochState: s,
AlphabetState: s,
CleanupEnabled: cfg.GetBool("netmap_cleaner.enabled"),
CleanupThreshold: cfg.GetUint64("netmap_cleaner.threshold"),
ContainerWrapper: cnrClient,
HandleAudit: s.onlyActiveEventHandler(
NotaryDepositHandler: s.onlyAlphabetEventHandler(
AuditSettlementsHandler: s.onlyAlphabetEventHandler(
AlphabetSyncHandler: s.onlyAlphabetEventHandler(
NodeValidator: nodevalidator.New(
NotaryDisabled: s.sideNotaryConfig.disabled,
SubnetContract: &s.contracts.subnet,
NodeStateSettings: netSettings,
if err != nil {
return err
return bindMorphProcessor(s.netmapProcessor, s)
func (s *Server) initMainnet(ctx context.Context, cfg *viper.Viper, morphChain *chainParams, errChan chan<- error) error {
s.withoutMainNet = cfg.GetBool("without_mainnet")
if s.withoutMainNet {
// This works as long as event Listener starts listening loop once,
// otherwise Server.Start will run two similar routines.
// This behavior most likely will not change.
s.mainnetListener = s.morphListener
s.mainnetClient = s.morphClient
return nil
mainnetChain := morphChain
| = mainnetPrefix
mainnetChain.sgn = &transaction.Signer{Scopes: transaction.CalledByEntry}
fromMainChainBlock, err := s.persistate.UInt32(persistateMainChainLastBlockKey)
if err != nil {
fromMainChainBlock = 0
s.log.Warn("can't get last processed main chain block number", zap.String("error", err.Error()))
mainnetChain.from = fromMainChainBlock
// create mainnet client
s.mainnetClient, err = createClient(ctx, mainnetChain, errChan)
if err != nil {
return err
// create mainnet listener
s.mainnetListener, err = createListener(ctx, s.mainnetClient, mainnetChain)
return err
func (s *Server) enableNotarySupport() error {
if !s.sideNotaryConfig.disabled {
// enable notary support in the side client
err := s.morphClient.EnableNotarySupport(
if err != nil {
return fmt.Errorf("could not enable side chain notary support: %w", err)
s.morphListener.EnableNotarySupport(s.contracts.proxy, s.morphClient.Committee, s.morphClient)
if !s.mainNotaryConfig.disabled {
// enable notary support in the main client
err := s.mainnetClient.EnableNotarySupport(
if err != nil {
return fmt.Errorf("could not enable main chain notary support: %w", err)
return nil
func (s *Server) initNotaryConfig(cfg *viper.Viper) {
s.mainNotaryConfig, s.sideNotaryConfig = notaryConfigs(
!s.withoutMainNet && s.mainnetClient.ProbeNotary(), // if mainnet disabled then notary flag must be disabled too
s.log.Info("notary support",
zap.Bool("sidechain_enabled", !s.sideNotaryConfig.disabled),
zap.Bool("mainchain_enabled", !s.mainNotaryConfig.disabled),
func (s *Server) createAuditProcessor(cfg *viper.Viper, clientCache *ClientCache, cnrClient *container.Client) (*audit.Processor, error) {
auditPool, err := ants.NewPool(cfg.GetInt("audit.task.exec_pool_size"))
if err != nil {
return nil, err
pdpPoolSize := cfg.GetInt("audit.pdp.pairs_pool_size")
porPoolSize := cfg.GetInt("audit.por.pool_size")
// create audit processor dependencies
auditTaskManager := audittask.New(
audittask.WithPDPWorkerPoolGenerator(func() (util2.WorkerPool, error) {
return ants.NewPool(pdpPoolSize)
audittask.WithPoRWorkerPoolGenerator(func() (util2.WorkerPool, error) {
return ants.NewPool(porPoolSize)
s.workers = append(s.workers, auditTaskManager.Listen)
// create audit processor
return audit.New(&audit.Params{
Log: s.log,
NetmapClient: s.netmapClient,
ContainerClient: cnrClient,
IRList: s,
EpochSource: s,
SGSource: clientCache,
Key: &s.key.PrivateKey,
RPCSearchTimeout: cfg.GetDuration(""),
TaskManager: auditTaskManager,
Reporter: s,
func (s *Server) createSettlementProcessor(clientCache *ClientCache, cnrClient *container.Client) *settlement.Processor {
// create settlement processor dependencies
settlementDeps := settlementDeps{
log: s.log,
cnrSrc: cntClient.AsContainerSource(cnrClient),
auditClient: s.auditClient,
nmClient: s.netmapClient,
clientCache: clientCache,
balanceClient: s.balanceClient,
settlementDeps.settlementCtx = auditSettlementContext
auditCalcDeps := &auditSettlementDeps{
settlementDeps: settlementDeps,
settlementDeps.settlementCtx = basicIncomeSettlementContext
basicSettlementDeps := &basicIncomeSettlementDeps{
settlementDeps: settlementDeps,
cnrClient: cnrClient,
auditSettlementCalc := auditSettlement.NewCalculator(
ResultStorage: auditCalcDeps,
ContainerStorage: auditCalcDeps,
PlacementCalculator: auditCalcDeps,
SGStorage: auditCalcDeps,
AccountStorage: auditCalcDeps,
Exchanger: auditCalcDeps,
AuditFeeFetcher: s.netmapClient,
// create settlement processor
return settlement.New(
AuditProcessor: (*auditSettlementCalculator)(auditSettlementCalc),
BasicIncome: &basicSettlementConstructor{dep: basicSettlementDeps},
State: s,
func (s *Server) createAlphaSync(cfg *viper.Viper, frostfsCli *frostfsClient.Client, irf irFetcher) (event.Handler, error) {
var alphaSync event.Handler
if s.withoutMainNet || cfg.GetBool("governance.disable") {
alphaSync = func(event.Event) {
s.log.Debug("alphabet keys sync is disabled")
} else {
// create governance processor
governanceProcessor, err := governance.New(&governance.Params{
Log: s.log,
FrostFSClient: frostfsCli,
NetmapClient: s.netmapClient,
AlphabetState: s,
EpochState: s,
Voter: s,
IRFetcher: irf,
MorphClient: s.morphClient,
MainnetClient: s.mainnetClient,
NotaryDisabled: s.sideNotaryConfig.disabled,
if err != nil {
return nil, err
alphaSync = governanceProcessor.HandleAlphabetSync
err = bindMainnetProcessor(governanceProcessor, s)
if err != nil {
return nil, err
return alphaSync, nil
func (s *Server) createIRFetcher() irFetcher {
var irf irFetcher
if s.withoutMainNet || !s.mainNotaryConfig.disabled {
// if mainchain is disabled we should use NeoFSAlphabetList client method according to its docs
// (naming `...WithNotary` will not always be correct)
irf = NewIRFetcherWithNotary(s.morphClient)
} else {
irf = NewIRFetcherWithoutNotary(s.netmapClient)
return irf
func (s *Server) initTimers(cfg *viper.Viper, processors *serverProcessors, morphClients *serverMorphClients) {
s.epochTimer = newEpochTimer(&epochTimerArgs{
l: s.log,
alphabetState: s,
newEpochHandlers: s.newEpochTickHandlers(),
cnrWrapper: morphClients.CnrClient,
epoch: s,
stopEstimationDMul: cfg.GetUint32("timers.stop_estimation.mul"),
stopEstimationDDiv: cfg.GetUint32("timers.stop_estimation.div"),
collectBasicIncome: subEpochEventHandler{
handler: processors.SettlementProcessor.HandleIncomeCollectionEvent,
durationMul: cfg.GetUint32("timers.collect_basic_income.mul"),
durationDiv: cfg.GetUint32("timers.collect_basic_income.div"),
distributeBasicIncome: subEpochEventHandler{
handler: processors.SettlementProcessor.HandleIncomeDistributionEvent,
durationMul: cfg.GetUint32("timers.distribute_basic_income.mul"),
durationDiv: cfg.GetUint32("timers.distribute_basic_income.div"),
// initialize emission timer
emissionTimer := newEmissionTimer(&emitTimerArgs{
ap: processors.AlphabetProcessor,
emitDuration: cfg.GetUint32("timers.emit"),
func (s *Server) createMorphSubnetClient() (*morphsubnet.Client, error) {
// initialize morph client of Subnet contract
clientMode := morphsubnet.NotaryAlphabet
if s.sideNotaryConfig.disabled {
clientMode = morphsubnet.NonNotary
subnetInitPrm := morphsubnet.InitPrm{}
subnetClient := &morphsubnet.Client{}
err := subnetClient.Init(subnetInitPrm)
if err != nil {
return nil, fmt.Errorf("could not initialize subnet client: %w", err)
return subnetClient, nil
func (s *Server) initAlphabetProcessor(cfg *viper.Viper) (*alphabet.Processor, error) {
parsedWallets, err := parseWalletAddressesFromStrings(cfg.GetStringSlice("emit.extra_wallets"))
if err != nil {
return nil, err
// create alphabet processor
alphabetProcessor, err := alphabet.New(&alphabet.Params{
ParsedWallets: parsedWallets,
Log: s.log,
PoolSize: cfg.GetInt("workers.alphabet"),
AlphabetContracts: s.contracts.alphabet,
NetmapClient: s.netmapClient,
MorphClient: s.morphClient,
IRList: s,
StorageEmission: cfg.GetUint64(""),
if err != nil {
return nil, err
err = bindMorphProcessor(alphabetProcessor, s)
if err != nil {
return nil, err
return alphabetProcessor, nil
func (s *Server) initContainerProcessor(cfg *viper.Viper, cnrClient *container.Client,
frostfsIDClient *frostfsid.Client, subnetClient *morphsubnet.Client) error {
// container processor
containerProcessor, err := cont.New(&cont.Params{
Log: s.log,
PoolSize: cfg.GetInt("workers.container"),
AlphabetState: s,
ContainerClient: cnrClient,
FrostFSIDClient: frostfsIDClient,
NetworkState: s.netmapClient,
NotaryDisabled: s.sideNotaryConfig.disabled,
SubnetClient: subnetClient,
if err != nil {
return err
return bindMorphProcessor(containerProcessor, s)
func (s *Server) initBalanceProcessor(cfg *viper.Viper, frostfsCli *frostfsClient.Client) error {
// create balance processor
balanceProcessor, err := balance.New(&balance.Params{
Log: s.log,
PoolSize: cfg.GetInt("workers.balance"),
FrostFSClient: frostfsCli,
BalanceSC: s.contracts.balance,
AlphabetState: s,
Converter: &s.precision,
if err != nil {
return err
return bindMorphProcessor(balanceProcessor, s)
func (s *Server) initFrostFSMainnetProcessor(cfg *viper.Viper, frostfsIDClient *frostfsid.Client) error {
if s.withoutMainNet {
return nil
frostfsProcessor, err := frostfs.New(&frostfs.Params{
Log: s.log,
PoolSize: cfg.GetInt("workers.frostfs"),
FrostFSContract: s.contracts.frostfs,
FrostFSIDClient: frostfsIDClient,
BalanceClient: s.balanceClient,
NetmapClient: s.netmapClient,
MorphClient: s.morphClient,
EpochState: s,
AlphabetState: s,
Converter: &s.precision,
MintEmitCacheSize: cfg.GetInt(""),
MintEmitThreshold: cfg.GetUint64(""),
MintEmitValue: fixedn.Fixed8(cfg.GetInt64("")),
GasBalanceThreshold: cfg.GetInt64("emit.gas.balance_threshold"),
if err != nil {
return err
return bindMainnetProcessor(frostfsProcessor, s)
func (s *Server) initReputationProcessor(cfg *viper.Viper, sidechainFee fixedn.Fixed8) error {
repClient, err := repClient.NewFromMorph(s.morphClient, s.contracts.reputation, sidechainFee, repClient.TryNotary(), repClient.AsAlphabet())
if err != nil {
return err
// create reputation processor
reputationProcessor, err := reputation.New(&reputation.Params{
Log: s.log,
PoolSize: cfg.GetInt("workers.reputation"),
EpochState: s,
AlphabetState: s,
ReputationWrapper: repClient,
ManagerBuilder: reputationcommon.NewManagerBuilder(
NetMapSource: s.netmapClient,
NotaryDisabled: s.sideNotaryConfig.disabled,
if err != nil {
return err
return bindMorphProcessor(reputationProcessor, s)
func (s *Server) initGRPCServer(cfg *viper.Viper) error {
controlSvcEndpoint := cfg.GetString("control.grpc.endpoint")
if controlSvcEndpoint == "" {
s.log.Info("no Control server endpoint specified, service is disabled")
return nil
authKeysStr := cfg.GetStringSlice("control.authorized_keys")
authKeys := make([][]byte, 0, len(authKeysStr))
for i := range authKeysStr {
key, err := hex.DecodeString(authKeysStr[i])
if err != nil {
return fmt.Errorf("could not parse Control authorized key %s: %w",
authKeys = append(authKeys, key)
var p controlsrv.Prm
controlSvc := controlsrv.New(p,
grpcControlSrv := grpc.NewServer()
control.RegisterControlServiceServer(grpcControlSrv, controlSvc)
s.runners = append(s.runners, func(ch chan<- error) error {
lis, err := net.Listen("tcp", controlSvcEndpoint)
if err != nil {
return err
go func() {
ch <- grpcControlSrv.Serve(lis)
return nil
return nil
type serverMorphClients struct {
CnrClient *cntClient.Client
FrostFSIDClient *frostfsid.Client
FrostFSClient *frostfsClient.Client
MorphSubnetClient *morphsubnet.Client
func (s *Server) initClientsFromMorph() (*serverMorphClients, error) {
result := &serverMorphClients{}
var err error
fee := s.feeConfig.SideChainFee()
// do not use TryNotary() in audit wrapper
// audit operations do not require multisignatures
s.auditClient, err = auditClient.NewFromMorph(s.morphClient, s.contracts.audit, fee)
if err != nil {
return nil, err
// form morph container client's options
morphCnrOpts := make([]cntClient.Option, 0, 3)
morphCnrOpts = append(morphCnrOpts,
if s.sideNotaryConfig.disabled {
// in non-notary environments we customize fee for named container registration
// because it takes much more additional GAS than other operations.
morphCnrOpts = append(morphCnrOpts,
result.CnrClient, err = cntClient.NewFromMorph(s.morphClient, s.contracts.container, fee, morphCnrOpts...)
if err != nil {
return nil, err
s.netmapClient, err = nmClient.NewFromMorph(s.morphClient, s.contracts.netmap, fee, nmClient.TryNotary(), nmClient.AsAlphabet())
if err != nil {
return nil, err
s.balanceClient, err = balanceClient.NewFromMorph(s.morphClient, s.contracts.balance, fee, balanceClient.TryNotary(), balanceClient.AsAlphabet())
if err != nil {
return nil, err
result.FrostFSIDClient, err = frostfsid.NewFromMorph(s.morphClient, s.contracts.frostfsID, fee, frostfsid.TryNotary(), frostfsid.AsAlphabet())
if err != nil {
return nil, err
result.FrostFSClient, err = frostfsClient.NewFromMorph(s.mainnetClient, s.contracts.frostfs,
s.feeConfig.MainChainFee(), frostfsClient.TryNotary(), frostfsClient.AsAlphabet())
if err != nil {
return nil, err
result.MorphSubnetClient, err = s.createMorphSubnetClient()
if err != nil {
return nil, err
return result, nil
type serverProcessors struct {
AlphabetProcessor *alphabet.Processor
SettlementProcessor *settlement.Processor
func (s *Server) initProcessors(cfg *viper.Viper, morphClients *serverMorphClients) (*serverProcessors, error) {
result := &serverProcessors{}
fee := s.feeConfig.SideChainFee()
irf := s.createIRFetcher()
s.statusIndex = newInnerRingIndexer(
clientCache := newClientCache(&clientCacheParams{
Log: s.log,
Key: &s.key.PrivateKey,
SGTimeout: cfg.GetDuration("audit.timeout.get"),
HeadTimeout: cfg.GetDuration("audit.timeout.head"),
RangeTimeout: cfg.GetDuration("audit.timeout.rangehash"),
AllowExternal: cfg.GetBool("audit.allow_external"),
// create audit processor
auditProcessor, err := s.createAuditProcessor(cfg, clientCache, morphClients.CnrClient)
if err != nil {
return nil, err
result.SettlementProcessor = s.createSettlementProcessor(clientCache, morphClients.CnrClient)
var alphaSync event.Handler
alphaSync, err = s.createAlphaSync(cfg, morphClients.FrostFSClient, irf)
if err != nil {
return nil, err
err = s.initNetmapProcessor(cfg, morphClients.CnrClient, alphaSync, morphClients.MorphSubnetClient, auditProcessor, result.SettlementProcessor)
if err != nil {
return nil, err
err = s.initContainerProcessor(cfg, morphClients.CnrClient, morphClients.FrostFSIDClient, morphClients.MorphSubnetClient)
if err != nil {
return nil, err
err = s.initBalanceProcessor(cfg, morphClients.FrostFSClient)
if err != nil {
return nil, err
err = s.initFrostFSMainnetProcessor(cfg, morphClients.FrostFSIDClient)
if err != nil {
return nil, err
result.AlphabetProcessor, err = s.initAlphabetProcessor(cfg)
if err != nil {
return nil, err
err = s.initReputationProcessor(cfg, fee)
if err != nil {
return nil, err
return result, nil
func (s *Server) initMorph(ctx context.Context, cfg *viper.Viper, errChan chan<- error) (*chainParams, error) {
fromSideChainBlock, err := s.persistate.UInt32(persistateSideChainLastBlockKey)
if err != nil {
fromSideChainBlock = 0
s.log.Warn("can't get last processed side chain block number", zap.String("error", err.Error()))
morphChain := &chainParams{
log: s.log,
cfg: cfg,
key: s.key,
name: morphPrefix,
from: fromSideChainBlock,
// create morph client
s.morphClient, err = createClient(ctx, morphChain, errChan)
if err != nil {
return nil, err
// create morph listener
s.morphListener, err = createListener(ctx, s.morphClient, morphChain)
if err != nil {
return nil, err
if err := s.morphClient.SetGroupSignerScope(); err != nil {
morphChain.log.Info("failed to set group signer scope, continue with Global", zap.Error(err))
return morphChain, nil
func (s *Server) initContracts(cfg *viper.Viper) error {
var err error
// get all script hashes of contracts
s.contracts, err = parseContracts(
return err
func (s *Server) initKey(cfg *viper.Viper) error {
// prepare inner ring node private key
acc, err := utilConfig.LoadAccount(
if err != nil {
return fmt.Errorf("ir: %w", err)
s.key = acc.PrivateKey()
return nil
func (s *Server) initMetrics(cfg *viper.Viper) {
if cfg.GetString("prometheus.address") == "" {
m := metrics.NewInnerRingMetrics()
s.metrics = &m