From 4d160bd4ab864b4402f518468f5bee18890b4d4a Mon Sep 17 00:00:00 2001 From: Dmitrii Stepanov Date: Wed, 29 Mar 2023 16:54:15 +0300 Subject: [PATCH] [#185] ir: Refactor ir service creation Resolve funlen linter for New function Signed-off-by: Dmitrii Stepanov --- pkg/innerring/initialization.go | 756 ++++++++++++++++++++++++++++++++ pkg/innerring/innerring.go | 609 +------------------------ 2 files changed, 775 insertions(+), 590 deletions(-) create mode 100644 pkg/innerring/initialization.go diff --git a/pkg/innerring/initialization.go b/pkg/innerring/initialization.go new file mode 100644 index 0000000000..31a1bcd605 --- /dev/null +++ b/pkg/innerring/initialization.go @@ -0,0 +1,756 @@ +package innerring + +import ( + "context" + "encoding/hex" + "fmt" + "net" + + "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/innerring/processors/alphabet" + "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/innerring/processors/audit" + "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/innerring/processors/balance" + cont "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/innerring/processors/container" + frostfs "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/innerring/processors/frostfs" + "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/innerring/processors/governance" + "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/innerring/processors/netmap" + nodevalidator "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/innerring/processors/netmap/nodevalidation" + addrvalidator "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/innerring/processors/netmap/nodevalidation/maddress" + statevalidation "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/innerring/processors/netmap/nodevalidation/state" + subnetvalidator "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/innerring/processors/netmap/nodevalidation/subnet" + "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/innerring/processors/reputation" + "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/innerring/processors/settlement" + auditSettlement "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/innerring/processors/settlement/audit" + "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/metrics" + "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/client" + auditClient "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/client/audit" + balanceClient "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/client/balance" + "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/client/container" + cntClient "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/client/container" + frostfsClient "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/client/frostfs" + "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/client/frostfsid" + nmClient "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/client/netmap" + repClient "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/client/reputation" + morphsubnet "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/client/subnet" + "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/event" + audittask "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/audit/taskmanager" + control "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/control/ir" + controlsrv "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/control/ir/server" + reputationcommon "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/reputation/common" + util2 "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util" + utilConfig "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/config" + "github.com/nspcc-dev/neo-go/pkg/core/transaction" + "github.com/nspcc-dev/neo-go/pkg/encoding/fixedn" + "github.com/panjf2000/ants/v2" + "github.com/spf13/viper" + "go.uber.org/zap" + "google.golang.org/grpc" +) + +func (s *Server) initNetmapProcessor(cfg *viper.Viper, + cnrClient *container.Client, + alphaSync event.Handler, + subnetClient *morphsubnet.Client, + auditProcessor *audit.Processor, + settlementProcessor *settlement.Processor) error { + locodeValidator, err := s.newLocodeValidator(cfg) + if err != nil { + return err + } + + subnetValidator, err := subnetvalidator.New( + subnetvalidator.Prm{ + SubnetClient: subnetClient, + }, + ) + if err != nil { + return err + } + + netSettings := (*networkSettings)(s.netmapClient) + + var netMapCandidateStateValidator statevalidation.NetMapCandidateValidator + netMapCandidateStateValidator.SetNetworkSettings(netSettings) + + s.netmapProcessor, err = netmap.New(&netmap.Params{ + Log: s.log, + PoolSize: cfg.GetInt("workers.netmap"), + NetmapClient: s.netmapClient, + EpochTimer: s, + EpochState: s, + AlphabetState: s, + CleanupEnabled: cfg.GetBool("netmap_cleaner.enabled"), + CleanupThreshold: cfg.GetUint64("netmap_cleaner.threshold"), + ContainerWrapper: cnrClient, + HandleAudit: s.onlyActiveEventHandler( + auditProcessor.StartAuditHandler(), + ), + NotaryDepositHandler: s.onlyAlphabetEventHandler( + s.notaryHandler, + ), + AuditSettlementsHandler: s.onlyAlphabetEventHandler( + settlementProcessor.HandleAuditEvent, + ), + AlphabetSyncHandler: alphaSync, + NodeValidator: nodevalidator.New( + &netMapCandidateStateValidator, + addrvalidator.New(), + locodeValidator, + subnetValidator, + ), + NotaryDisabled: s.sideNotaryConfig.disabled, + SubnetContract: &s.contracts.subnet, + + NodeStateSettings: netSettings, + }) + + if err != nil { + return err + } + + return bindMorphProcessor(s.netmapProcessor, s) +} + +func (s *Server) initMainnet(ctx context.Context, cfg *viper.Viper, morphChain *chainParams, errChan chan<- error) error { + s.withoutMainNet = cfg.GetBool("without_mainnet") + if s.withoutMainNet { + // This works as long as event Listener starts listening loop once, + // otherwise Server.Start will run two similar routines. + // This behavior most likely will not change. + s.mainnetListener = s.morphListener + s.mainnetClient = s.morphClient + return nil + } + + mainnetChain := morphChain + mainnetChain.name = mainnetPrefix + mainnetChain.sgn = &transaction.Signer{Scopes: transaction.CalledByEntry} + + fromMainChainBlock, err := s.persistate.UInt32(persistateMainChainLastBlockKey) + if err != nil { + fromMainChainBlock = 0 + s.log.Warn("can't get last processed main chain block number", zap.String("error", err.Error())) + } + mainnetChain.from = fromMainChainBlock + + // create mainnet client + s.mainnetClient, err = createClient(ctx, mainnetChain, errChan) + if err != nil { + return err + } + + // create mainnet listener + s.mainnetListener, err = createListener(ctx, s.mainnetClient, mainnetChain) + return err +} + +func (s *Server) enableNotarySupport() error { + if !s.sideNotaryConfig.disabled { + // enable notary support in the side client + err := s.morphClient.EnableNotarySupport( + client.WithProxyContract(s.contracts.proxy), + ) + if err != nil { + return fmt.Errorf("could not enable side chain notary support: %w", err) + } + + s.morphListener.EnableNotarySupport(s.contracts.proxy, s.morphClient.Committee, s.morphClient) + } + + if !s.mainNotaryConfig.disabled { + // enable notary support in the main client + err := s.mainnetClient.EnableNotarySupport( + client.WithProxyContract(s.contracts.processing), + client.WithAlphabetSource(s.morphClient.Committee), + ) + if err != nil { + return fmt.Errorf("could not enable main chain notary support: %w", err) + } + } + + return nil +} + +func (s *Server) initNotaryConfig(cfg *viper.Viper) { + s.mainNotaryConfig, s.sideNotaryConfig = notaryConfigs( + s.morphClient.ProbeNotary(), + !s.withoutMainNet && s.mainnetClient.ProbeNotary(), // if mainnet disabled then notary flag must be disabled too + ) + + s.log.Info("notary support", + zap.Bool("sidechain_enabled", !s.sideNotaryConfig.disabled), + zap.Bool("mainchain_enabled", !s.mainNotaryConfig.disabled), + ) +} + +func (s *Server) createAuditProcessor(cfg *viper.Viper, clientCache *ClientCache, cnrClient *container.Client) (*audit.Processor, error) { + auditPool, err := ants.NewPool(cfg.GetInt("audit.task.exec_pool_size")) + if err != nil { + return nil, err + } + + pdpPoolSize := cfg.GetInt("audit.pdp.pairs_pool_size") + porPoolSize := cfg.GetInt("audit.por.pool_size") + + // create audit processor dependencies + auditTaskManager := audittask.New( + audittask.WithQueueCapacity(cfg.GetUint32("audit.task.queue_capacity")), + audittask.WithWorkerPool(auditPool), + audittask.WithLogger(s.log), + audittask.WithContainerCommunicator(clientCache), + audittask.WithMaxPDPSleepInterval(cfg.GetDuration("audit.pdp.max_sleep_interval")), + audittask.WithPDPWorkerPoolGenerator(func() (util2.WorkerPool, error) { + return ants.NewPool(pdpPoolSize) + }), + audittask.WithPoRWorkerPoolGenerator(func() (util2.WorkerPool, error) { + return ants.NewPool(porPoolSize) + }), + ) + + s.workers = append(s.workers, auditTaskManager.Listen) + + // create audit processor + return audit.New(&audit.Params{ + Log: s.log, + NetmapClient: s.netmapClient, + ContainerClient: cnrClient, + IRList: s, + EpochSource: s, + SGSource: clientCache, + Key: &s.key.PrivateKey, + RPCSearchTimeout: cfg.GetDuration("audit.timeout.search"), + TaskManager: auditTaskManager, + Reporter: s, + }) +} + +func (s *Server) createSettlementProcessor(clientCache *ClientCache, cnrClient *container.Client) *settlement.Processor { + // create settlement processor dependencies + settlementDeps := settlementDeps{ + log: s.log, + cnrSrc: cntClient.AsContainerSource(cnrClient), + auditClient: s.auditClient, + nmClient: s.netmapClient, + clientCache: clientCache, + balanceClient: s.balanceClient, + } + + settlementDeps.settlementCtx = auditSettlementContext + auditCalcDeps := &auditSettlementDeps{ + settlementDeps: settlementDeps, + } + + settlementDeps.settlementCtx = basicIncomeSettlementContext + basicSettlementDeps := &basicIncomeSettlementDeps{ + settlementDeps: settlementDeps, + cnrClient: cnrClient, + } + + auditSettlementCalc := auditSettlement.NewCalculator( + &auditSettlement.CalculatorPrm{ + ResultStorage: auditCalcDeps, + ContainerStorage: auditCalcDeps, + PlacementCalculator: auditCalcDeps, + SGStorage: auditCalcDeps, + AccountStorage: auditCalcDeps, + Exchanger: auditCalcDeps, + AuditFeeFetcher: s.netmapClient, + }, + auditSettlement.WithLogger(s.log), + ) + + // create settlement processor + return settlement.New( + settlement.Prm{ + AuditProcessor: (*auditSettlementCalculator)(auditSettlementCalc), + BasicIncome: &basicSettlementConstructor{dep: basicSettlementDeps}, + State: s, + }, + settlement.WithLogger(s.log), + ) +} + +func (s *Server) createAlphaSync(cfg *viper.Viper, frostfsCli *frostfsClient.Client, irf irFetcher) (event.Handler, error) { + var alphaSync event.Handler + + if s.withoutMainNet || cfg.GetBool("governance.disable") { + alphaSync = func(event.Event) { + s.log.Debug("alphabet keys sync is disabled") + } + } else { + // create governance processor + governanceProcessor, err := governance.New(&governance.Params{ + Log: s.log, + FrostFSClient: frostfsCli, + NetmapClient: s.netmapClient, + AlphabetState: s, + EpochState: s, + Voter: s, + IRFetcher: irf, + MorphClient: s.morphClient, + MainnetClient: s.mainnetClient, + NotaryDisabled: s.sideNotaryConfig.disabled, + }) + if err != nil { + return nil, err + } + + alphaSync = governanceProcessor.HandleAlphabetSync + err = bindMainnetProcessor(governanceProcessor, s) + if err != nil { + return nil, err + } + } + + return alphaSync, nil +} + +func (s *Server) createIRFetcher() irFetcher { + var irf irFetcher + + if s.withoutMainNet || !s.mainNotaryConfig.disabled { + // if mainchain is disabled we should use NeoFSAlphabetList client method according to its docs + // (naming `...WithNotary` will not always be correct) + irf = NewIRFetcherWithNotary(s.morphClient) + } else { + irf = NewIRFetcherWithoutNotary(s.netmapClient) + } + + return irf +} + +func (s *Server) initTimers(cfg *viper.Viper, processors *serverProcessors, morphClients *serverMorphClients) { + s.epochTimer = newEpochTimer(&epochTimerArgs{ + l: s.log, + newEpochHandlers: s.newEpochTickHandlers(), + cnrWrapper: morphClients.CnrClient, + epoch: s, + stopEstimationDMul: cfg.GetUint32("timers.stop_estimation.mul"), + stopEstimationDDiv: cfg.GetUint32("timers.stop_estimation.div"), + collectBasicIncome: subEpochEventHandler{ + handler: processors.SettlementProcessor.HandleIncomeCollectionEvent, + durationMul: cfg.GetUint32("timers.collect_basic_income.mul"), + durationDiv: cfg.GetUint32("timers.collect_basic_income.div"), + }, + distributeBasicIncome: subEpochEventHandler{ + handler: processors.SettlementProcessor.HandleIncomeDistributionEvent, + durationMul: cfg.GetUint32("timers.distribute_basic_income.mul"), + durationDiv: cfg.GetUint32("timers.distribute_basic_income.div"), + }, + }) + + s.addBlockTimer(s.epochTimer) + + // initialize emission timer + emissionTimer := newEmissionTimer(&emitTimerArgs{ + ap: processors.AlphabetProcessor, + emitDuration: cfg.GetUint32("timers.emit"), + }) + + s.addBlockTimer(emissionTimer) +} + +func (s *Server) createMorphSubnetClient() (*morphsubnet.Client, error) { + // initialize morph client of Subnet contract + clientMode := morphsubnet.NotaryAlphabet + + if s.sideNotaryConfig.disabled { + clientMode = morphsubnet.NonNotary + } + + subnetInitPrm := morphsubnet.InitPrm{} + subnetInitPrm.SetBaseClient(s.morphClient) + subnetInitPrm.SetContractAddress(s.contracts.subnet) + subnetInitPrm.SetMode(clientMode) + + subnetClient := &morphsubnet.Client{} + err := subnetClient.Init(subnetInitPrm) + if err != nil { + return nil, fmt.Errorf("could not initialize subnet client: %w", err) + } + return subnetClient, nil +} + +func (s *Server) initAlphabetProcessor(cfg *viper.Viper) (*alphabet.Processor, error) { + parsedWallets, err := parseWalletAddressesFromStrings(cfg.GetStringSlice("emit.extra_wallets")) + if err != nil { + return nil, err + } + + // create alphabet processor + alphabetProcessor, err := alphabet.New(&alphabet.Params{ + ParsedWallets: parsedWallets, + Log: s.log, + PoolSize: cfg.GetInt("workers.alphabet"), + AlphabetContracts: s.contracts.alphabet, + NetmapClient: s.netmapClient, + MorphClient: s.morphClient, + IRList: s, + StorageEmission: cfg.GetUint64("emit.storage.amount"), + }) + if err != nil { + return nil, err + } + + err = bindMorphProcessor(alphabetProcessor, s) + if err != nil { + return nil, err + } + + return alphabetProcessor, nil +} + +func (s *Server) initContainerProcessor(cfg *viper.Viper, cnrClient *container.Client, + frostfsIDClient *frostfsid.Client, subnetClient *morphsubnet.Client) error { + // container processor + containerProcessor, err := cont.New(&cont.Params{ + Log: s.log, + PoolSize: cfg.GetInt("workers.container"), + AlphabetState: s, + ContainerClient: cnrClient, + FrostFSIDClient: frostfsIDClient, + NetworkState: s.netmapClient, + NotaryDisabled: s.sideNotaryConfig.disabled, + SubnetClient: subnetClient, + }) + if err != nil { + return err + } + + return bindMorphProcessor(containerProcessor, s) +} + +func (s *Server) initBalanceProcessor(cfg *viper.Viper, frostfsCli *frostfsClient.Client) error { + // create balance processor + balanceProcessor, err := balance.New(&balance.Params{ + Log: s.log, + PoolSize: cfg.GetInt("workers.balance"), + FrostFSClient: frostfsCli, + BalanceSC: s.contracts.balance, + AlphabetState: s, + Converter: &s.precision, + }) + if err != nil { + return err + } + + return bindMorphProcessor(balanceProcessor, s) +} + +func (s *Server) initFrostFSMainnetProcessor(cfg *viper.Viper, frostfsIDClient *frostfsid.Client) error { + if s.withoutMainNet { + return nil + } + + frostfsProcessor, err := frostfs.New(&frostfs.Params{ + Log: s.log, + PoolSize: cfg.GetInt("workers.frostfs"), + FrostFSContract: s.contracts.frostfs, + FrostFSIDClient: frostfsIDClient, + BalanceClient: s.balanceClient, + NetmapClient: s.netmapClient, + MorphClient: s.morphClient, + EpochState: s, + AlphabetState: s, + Converter: &s.precision, + MintEmitCacheSize: cfg.GetInt("emit.mint.cache_size"), + MintEmitThreshold: cfg.GetUint64("emit.mint.threshold"), + MintEmitValue: fixedn.Fixed8(cfg.GetInt64("emit.mint.value")), + GasBalanceThreshold: cfg.GetInt64("emit.gas.balance_threshold"), + }) + if err != nil { + return err + } + + return bindMainnetProcessor(frostfsProcessor, s) +} + +func (s *Server) initReputationProcessor(cfg *viper.Viper, sidechainFee fixedn.Fixed8) error { + repClient, err := repClient.NewFromMorph(s.morphClient, s.contracts.reputation, sidechainFee, repClient.TryNotary(), repClient.AsAlphabet()) + if err != nil { + return err + } + + // create reputation processor + reputationProcessor, err := reputation.New(&reputation.Params{ + Log: s.log, + PoolSize: cfg.GetInt("workers.reputation"), + EpochState: s, + AlphabetState: s, + ReputationWrapper: repClient, + ManagerBuilder: reputationcommon.NewManagerBuilder( + reputationcommon.ManagersPrm{ + NetMapSource: s.netmapClient, + }, + ), + NotaryDisabled: s.sideNotaryConfig.disabled, + }) + if err != nil { + return err + } + + return bindMorphProcessor(reputationProcessor, s) +} + +func (s *Server) initGRPCServer(cfg *viper.Viper) error { + controlSvcEndpoint := cfg.GetString("control.grpc.endpoint") + if controlSvcEndpoint == "" { + s.log.Info("no Control server endpoint specified, service is disabled") + return nil + } + + authKeysStr := cfg.GetStringSlice("control.authorized_keys") + authKeys := make([][]byte, 0, len(authKeysStr)) + + for i := range authKeysStr { + key, err := hex.DecodeString(authKeysStr[i]) + if err != nil { + return fmt.Errorf("could not parse Control authorized key %s: %w", + authKeysStr[i], + err, + ) + } + + authKeys = append(authKeys, key) + } + + var p controlsrv.Prm + + p.SetPrivateKey(*s.key) + p.SetHealthChecker(s) + + controlSvc := controlsrv.New(p, + controlsrv.WithAllowedKeys(authKeys), + ) + + grpcControlSrv := grpc.NewServer() + control.RegisterControlServiceServer(grpcControlSrv, controlSvc) + + s.runners = append(s.runners, func(ch chan<- error) error { + lis, err := net.Listen("tcp", controlSvcEndpoint) + if err != nil { + return err + } + + go func() { + ch <- grpcControlSrv.Serve(lis) + }() + return nil + }) + + s.registerNoErrCloser(grpcControlSrv.GracefulStop) + return nil +} + +type serverMorphClients struct { + CnrClient *cntClient.Client + FrostFSIDClient *frostfsid.Client + FrostFSClient *frostfsClient.Client + MorphSubnetClient *morphsubnet.Client +} + +func (s *Server) initClientsFromMorph() (*serverMorphClients, error) { + result := &serverMorphClients{} + var err error + + fee := s.feeConfig.SideChainFee() + // do not use TryNotary() in audit wrapper + // audit operations do not require multisignatures + s.auditClient, err = auditClient.NewFromMorph(s.morphClient, s.contracts.audit, fee) + if err != nil { + return nil, err + } + + // form morph container client's options + morphCnrOpts := make([]cntClient.Option, 0, 3) + morphCnrOpts = append(morphCnrOpts, + cntClient.TryNotary(), + cntClient.AsAlphabet(), + ) + + if s.sideNotaryConfig.disabled { + // in non-notary environments we customize fee for named container registration + // because it takes much more additional GAS than other operations. + morphCnrOpts = append(morphCnrOpts, + cntClient.WithCustomFeeForNamedPut(s.feeConfig.NamedContainerRegistrationFee()), + ) + } + + result.CnrClient, err = cntClient.NewFromMorph(s.morphClient, s.contracts.container, fee, morphCnrOpts...) + if err != nil { + return nil, err + } + + s.netmapClient, err = nmClient.NewFromMorph(s.morphClient, s.contracts.netmap, fee, nmClient.TryNotary(), nmClient.AsAlphabet()) + if err != nil { + return nil, err + } + + s.balanceClient, err = balanceClient.NewFromMorph(s.morphClient, s.contracts.balance, fee, balanceClient.TryNotary(), balanceClient.AsAlphabet()) + if err != nil { + return nil, err + } + + result.FrostFSIDClient, err = frostfsid.NewFromMorph(s.morphClient, s.contracts.frostfsID, fee, frostfsid.TryNotary(), frostfsid.AsAlphabet()) + if err != nil { + return nil, err + } + + result.FrostFSClient, err = frostfsClient.NewFromMorph(s.mainnetClient, s.contracts.frostfs, + s.feeConfig.MainChainFee(), frostfsClient.TryNotary(), frostfsClient.AsAlphabet()) + if err != nil { + return nil, err + } + + result.MorphSubnetClient, err = s.createMorphSubnetClient() + if err != nil { + return nil, err + } + + return result, nil +} + +type serverProcessors struct { + AlphabetProcessor *alphabet.Processor + SettlementProcessor *settlement.Processor +} + +func (s *Server) initProcessors(cfg *viper.Viper, morphClients *serverMorphClients) (*serverProcessors, error) { + result := &serverProcessors{} + + fee := s.feeConfig.SideChainFee() + + irf := s.createIRFetcher() + + s.statusIndex = newInnerRingIndexer( + s.morphClient, + irf, + s.key.PublicKey(), + cfg.GetDuration("indexer.cache_timeout"), + ) + + clientCache := newClientCache(&clientCacheParams{ + Log: s.log, + Key: &s.key.PrivateKey, + SGTimeout: cfg.GetDuration("audit.timeout.get"), + HeadTimeout: cfg.GetDuration("audit.timeout.head"), + RangeTimeout: cfg.GetDuration("audit.timeout.rangehash"), + AllowExternal: cfg.GetBool("audit.allow_external"), + }) + + s.registerNoErrCloser(clientCache.cache.CloseAll) + + // create audit processor + auditProcessor, err := s.createAuditProcessor(cfg, clientCache, morphClients.CnrClient) + if err != nil { + return nil, err + } + + result.SettlementProcessor = s.createSettlementProcessor(clientCache, morphClients.CnrClient) + + var alphaSync event.Handler + alphaSync, err = s.createAlphaSync(cfg, morphClients.FrostFSClient, irf) + if err != nil { + return nil, err + } + + err = s.initNetmapProcessor(cfg, morphClients.CnrClient, alphaSync, morphClients.MorphSubnetClient, auditProcessor, result.SettlementProcessor) + if err != nil { + return nil, err + } + + err = s.initContainerProcessor(cfg, morphClients.CnrClient, morphClients.FrostFSIDClient, morphClients.MorphSubnetClient) + if err != nil { + return nil, err + } + + err = s.initBalanceProcessor(cfg, morphClients.FrostFSClient) + if err != nil { + return nil, err + } + + err = s.initFrostFSMainnetProcessor(cfg, morphClients.FrostFSIDClient) + if err != nil { + return nil, err + } + + result.AlphabetProcessor, err = s.initAlphabetProcessor(cfg) + if err != nil { + return nil, err + } + + err = s.initReputationProcessor(cfg, fee) + if err != nil { + return nil, err + } + + return result, nil +} + +func (s *Server) initMorph(ctx context.Context, cfg *viper.Viper, errChan chan<- error) (*chainParams, error) { + fromSideChainBlock, err := s.persistate.UInt32(persistateSideChainLastBlockKey) + if err != nil { + fromSideChainBlock = 0 + s.log.Warn("can't get last processed side chain block number", zap.String("error", err.Error())) + } + + morphChain := &chainParams{ + log: s.log, + cfg: cfg, + key: s.key, + name: morphPrefix, + from: fromSideChainBlock, + } + + // create morph client + s.morphClient, err = createClient(ctx, morphChain, errChan) + if err != nil { + return nil, err + } + + // create morph listener + s.morphListener, err = createListener(ctx, s.morphClient, morphChain) + if err != nil { + return nil, err + } + if err := s.morphClient.SetGroupSignerScope(); err != nil { + morphChain.log.Info("failed to set group signer scope, continue with Global", zap.Error(err)) + } + + return morphChain, nil +} + +func (s *Server) initContracts(cfg *viper.Viper) error { + var err error + // get all script hashes of contracts + s.contracts, err = parseContracts( + cfg, + s.morphClient, + s.withoutMainNet, + s.mainNotaryConfig.disabled, + s.sideNotaryConfig.disabled, + ) + + return err +} + +func (s *Server) initKey(cfg *viper.Viper) error { + // prepare inner ring node private key + acc, err := utilConfig.LoadAccount( + cfg.GetString("wallet.path"), + cfg.GetString("wallet.address"), + cfg.GetString("wallet.password")) + if err != nil { + return fmt.Errorf("ir: %w", err) + } + + s.key = acc.PrivateKey() + return nil +} + +func (s *Server) initMetrics(cfg *viper.Viper) { + if cfg.GetString("prometheus.address") == "" { + return + } + m := metrics.NewInnerRingMetrics() + s.metrics = &m +} diff --git a/pkg/innerring/innerring.go b/pkg/innerring/innerring.go index 8f41c267b3..063d0f7cd6 100644 --- a/pkg/innerring/innerring.go +++ b/pkg/innerring/innerring.go @@ -2,47 +2,23 @@ package innerring import ( "context" - "encoding/hex" "errors" "fmt" "io" - "net" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/innerring/config" - "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/innerring/processors/alphabet" - "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/innerring/processors/audit" - "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/innerring/processors/balance" - "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/innerring/processors/container" - "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/innerring/processors/frostfs" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/innerring/processors/governance" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/innerring/processors/netmap" - nodevalidator "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/innerring/processors/netmap/nodevalidation" - addrvalidator "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/innerring/processors/netmap/nodevalidation/maddress" - statevalidation "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/innerring/processors/netmap/nodevalidation/state" - subnetvalidator "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/innerring/processors/netmap/nodevalidation/subnet" - "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/innerring/processors/reputation" - "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/innerring/processors/settlement" - auditSettlement "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/innerring/processors/settlement/audit" timerEvent "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/innerring/timers" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/metrics" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/client" auditClient "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/client/audit" balanceClient "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/client/balance" - cntClient "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/client/container" - frostfsClient "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/client/frostfs" - "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/client/frostfsid" nmClient "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/client/netmap" - repClient "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/client/reputation" - morphsubnet "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/client/subnet" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/event" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/subscriber" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/timer" - audittask "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/audit/taskmanager" control "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/control/ir" - controlsrv "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/control/ir/server" - reputationcommon "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/reputation/common" - util2 "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util" - utilConfig "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/config" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/precision" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/state" @@ -50,13 +26,10 @@ import ( "github.com/nspcc-dev/neo-go/pkg/core/transaction" "github.com/nspcc-dev/neo-go/pkg/crypto/keys" "github.com/nspcc-dev/neo-go/pkg/encoding/address" - "github.com/nspcc-dev/neo-go/pkg/encoding/fixedn" "github.com/nspcc-dev/neo-go/pkg/util" - "github.com/panjf2000/ants/v2" "github.com/spf13/viper" "go.uber.org/atomic" "go.uber.org/zap" - "google.golang.org/grpc" ) type ( @@ -354,8 +327,6 @@ func (s *Server) registerStarter(f func() error) { } // New creates instance of inner ring sever structure. -// -// nolint: funlen, gocognit func New(ctx context.Context, log *logger.Logger, cfg *viper.Viper, errChan chan<- error) (*Server, error) { var err error server := &Server{log: log} @@ -365,128 +336,38 @@ func New(ctx context.Context, log *logger.Logger, cfg *viper.Viper, errChan chan // parse notary support server.feeConfig = config.NewFeeConfig(cfg) - // prepare inner ring node private key - acc, err := utilConfig.LoadAccount( - cfg.GetString("wallet.path"), - cfg.GetString("wallet.address"), - cfg.GetString("wallet.password")) + err = server.initKey(cfg) if err != nil { - return nil, fmt.Errorf("ir: %w", err) + return nil, err } - server.key = acc.PrivateKey() - server.persistate, err = initPersistentStateStorage(cfg) if err != nil { return nil, err } server.registerCloser(server.persistate.Close) - fromSideChainBlock, err := server.persistate.UInt32(persistateSideChainLastBlockKey) - if err != nil { - fromSideChainBlock = 0 - log.Warn("can't get last processed side chain block number", zap.String("error", err.Error())) - } - - morphChain := &chainParams{ - log: log, - cfg: cfg, - key: server.key, - name: morphPrefix, - from: fromSideChainBlock, - } - - // create morph client - server.morphClient, err = createClient(ctx, morphChain, errChan) + var morphChain *chainParams + morphChain, err = server.initMorph(ctx, cfg, errChan) if err != nil { return nil, err } - // create morph listener - server.morphListener, err = createListener(ctx, server.morphClient, morphChain) - if err != nil { - return nil, err - } - if err := server.morphClient.SetGroupSignerScope(); err != nil { - morphChain.log.Info("failed to set group signer scope, continue with Global", zap.Error(err)) - } - - server.withoutMainNet = cfg.GetBool("without_mainnet") - - if server.withoutMainNet { - // This works as long as event Listener starts listening loop once, - // otherwise Server.Start will run two similar routines. - // This behavior most likely will not change. - server.mainnetListener = server.morphListener - server.mainnetClient = server.morphClient - } else { - mainnetChain := morphChain - mainnetChain.name = mainnetPrefix - mainnetChain.sgn = &transaction.Signer{Scopes: transaction.CalledByEntry} - - fromMainChainBlock, err := server.persistate.UInt32(persistateMainChainLastBlockKey) - if err != nil { - fromMainChainBlock = 0 - log.Warn("can't get last processed main chain block number", zap.String("error", err.Error())) - } - mainnetChain.from = fromMainChainBlock - - // create mainnet client - server.mainnetClient, err = createClient(ctx, mainnetChain, errChan) - if err != nil { - return nil, err - } - - // create mainnet listener - server.mainnetListener, err = createListener(ctx, server.mainnetClient, mainnetChain) - if err != nil { - return nil, err - } - } - - server.mainNotaryConfig, server.sideNotaryConfig = notaryConfigs( - server.morphClient.ProbeNotary(), - !server.withoutMainNet && server.mainnetClient.ProbeNotary(), // if mainnet disabled then notary flag must be disabled too - ) - - log.Info("notary support", - zap.Bool("sidechain_enabled", !server.sideNotaryConfig.disabled), - zap.Bool("mainchain_enabled", !server.mainNotaryConfig.disabled), - ) - - // get all script hashes of contracts - server.contracts, err = parseContracts( - cfg, - server.morphClient, - server.withoutMainNet, - server.mainNotaryConfig.disabled, - server.sideNotaryConfig.disabled, - ) + err = server.initMainnet(ctx, cfg, morphChain, errChan) if err != nil { return nil, err } - if !server.sideNotaryConfig.disabled { - // enable notary support in the side client - err = server.morphClient.EnableNotarySupport( - client.WithProxyContract(server.contracts.proxy), - ) - if err != nil { - return nil, fmt.Errorf("could not enable side chain notary support: %w", err) - } + server.initNotaryConfig(cfg) - server.morphListener.EnableNotarySupport(server.contracts.proxy, server.morphClient.Committee, server.morphClient) + err = server.initContracts(cfg) + if err != nil { + return nil, err } - if !server.mainNotaryConfig.disabled { - // enable notary support in the main client - err = server.mainnetClient.EnableNotarySupport( - client.WithProxyContract(server.contracts.processing), - client.WithAlphabetSource(server.morphClient.Committee), - ) - if err != nil { - return nil, fmt.Errorf("could not enable main chain notary support: %w", err) - } + err = server.enableNotarySupport() + if err != nil { + return nil, err } // parse default validators @@ -497,482 +378,30 @@ func New(ctx context.Context, log *logger.Logger, cfg *viper.Viper, errChan chan server.pubKey = server.key.PublicKey().Bytes() - auditPool, err := ants.NewPool(cfg.GetInt("audit.task.exec_pool_size")) + var morphClients *serverMorphClients + morphClients, err = server.initClientsFromMorph() if err != nil { return nil, err } - fee := server.feeConfig.SideChainFee() - - // do not use TryNotary() in audit wrapper - // audit operations do not require multisignatures - server.auditClient, err = auditClient.NewFromMorph(server.morphClient, server.contracts.audit, fee) + var processors *serverProcessors + processors, err = server.initProcessors(cfg, morphClients) if err != nil { return nil, err } - // form morph container client's options - morphCnrOpts := make([]cntClient.Option, 0, 3) - morphCnrOpts = append(morphCnrOpts, - cntClient.TryNotary(), - cntClient.AsAlphabet(), - ) + server.initTimers(cfg, processors, morphClients) - if server.sideNotaryConfig.disabled { - // in non-notary environments we customize fee for named container registration - // because it takes much more additional GAS than other operations. - morphCnrOpts = append(morphCnrOpts, - cntClient.WithCustomFeeForNamedPut(server.feeConfig.NamedContainerRegistrationFee()), - ) - } - - cnrClient, err := cntClient.NewFromMorph(server.morphClient, server.contracts.container, fee, morphCnrOpts...) + err = server.initGRPCServer(cfg) if err != nil { return nil, err } - server.netmapClient, err = nmClient.NewFromMorph(server.morphClient, server.contracts.netmap, fee, nmClient.TryNotary(), nmClient.AsAlphabet()) - if err != nil { - return nil, err - } - - server.balanceClient, err = balanceClient.NewFromMorph(server.morphClient, server.contracts.balance, fee, balanceClient.TryNotary(), balanceClient.AsAlphabet()) - if err != nil { - return nil, err - } - - repClient, err := repClient.NewFromMorph(server.morphClient, server.contracts.reputation, fee, repClient.TryNotary(), repClient.AsAlphabet()) - if err != nil { - return nil, err - } - - frostfsIDClient, err := frostfsid.NewFromMorph(server.morphClient, server.contracts.frostfsID, fee, frostfsid.TryNotary(), frostfsid.AsAlphabet()) - if err != nil { - return nil, err - } - - frostfsCli, err := frostfsClient.NewFromMorph(server.mainnetClient, server.contracts.frostfs, - server.feeConfig.MainChainFee(), frostfsClient.TryNotary(), frostfsClient.AsAlphabet()) - if err != nil { - return nil, err - } - - // initialize morph client of Subnet contract - clientMode := morphsubnet.NotaryAlphabet - - if server.sideNotaryConfig.disabled { - clientMode = morphsubnet.NonNotary - } - - subnetInitPrm := morphsubnet.InitPrm{} - subnetInitPrm.SetBaseClient(server.morphClient) - subnetInitPrm.SetContractAddress(server.contracts.subnet) - subnetInitPrm.SetMode(clientMode) - - subnetClient := &morphsubnet.Client{} - err = subnetClient.Init(subnetInitPrm) - if err != nil { - return nil, fmt.Errorf("could not initialize subnet client: %w", err) - } - - var irf irFetcher - - if server.withoutMainNet || !server.mainNotaryConfig.disabled { - // if mainchain is disabled we should use NeoFSAlphabetList client method according to its docs - // (naming `...WithNotary` will not always be correct) - irf = NewIRFetcherWithNotary(server.morphClient) - } else { - irf = NewIRFetcherWithoutNotary(server.netmapClient) - } - - server.statusIndex = newInnerRingIndexer( - server.morphClient, - irf, - server.key.PublicKey(), - cfg.GetDuration("indexer.cache_timeout"), - ) - - clientCache := newClientCache(&clientCacheParams{ - Log: log, - Key: &server.key.PrivateKey, - SGTimeout: cfg.GetDuration("audit.timeout.get"), - HeadTimeout: cfg.GetDuration("audit.timeout.head"), - RangeTimeout: cfg.GetDuration("audit.timeout.rangehash"), - AllowExternal: cfg.GetBool("audit.allow_external"), - }) - - server.registerNoErrCloser(clientCache.cache.CloseAll) - - pdpPoolSize := cfg.GetInt("audit.pdp.pairs_pool_size") - porPoolSize := cfg.GetInt("audit.por.pool_size") - - // create audit processor dependencies - auditTaskManager := audittask.New( - audittask.WithQueueCapacity(cfg.GetUint32("audit.task.queue_capacity")), - audittask.WithWorkerPool(auditPool), - audittask.WithLogger(log), - audittask.WithContainerCommunicator(clientCache), - audittask.WithMaxPDPSleepInterval(cfg.GetDuration("audit.pdp.max_sleep_interval")), - audittask.WithPDPWorkerPoolGenerator(func() (util2.WorkerPool, error) { - return ants.NewPool(pdpPoolSize) - }), - audittask.WithPoRWorkerPoolGenerator(func() (util2.WorkerPool, error) { - return ants.NewPool(porPoolSize) - }), - ) - - server.workers = append(server.workers, auditTaskManager.Listen) - - // create audit processor - auditProcessor, err := audit.New(&audit.Params{ - Log: log, - NetmapClient: server.netmapClient, - ContainerClient: cnrClient, - IRList: server, - EpochSource: server, - SGSource: clientCache, - Key: &server.key.PrivateKey, - RPCSearchTimeout: cfg.GetDuration("audit.timeout.search"), - TaskManager: auditTaskManager, - Reporter: server, - }) - if err != nil { - return nil, err - } - - // create settlement processor dependencies - settlementDeps := settlementDeps{ - log: server.log, - cnrSrc: cntClient.AsContainerSource(cnrClient), - auditClient: server.auditClient, - nmClient: server.netmapClient, - clientCache: clientCache, - balanceClient: server.balanceClient, - } - - settlementDeps.settlementCtx = auditSettlementContext - auditCalcDeps := &auditSettlementDeps{ - settlementDeps: settlementDeps, - } - - settlementDeps.settlementCtx = basicIncomeSettlementContext - basicSettlementDeps := &basicIncomeSettlementDeps{ - settlementDeps: settlementDeps, - cnrClient: cnrClient, - } - - auditSettlementCalc := auditSettlement.NewCalculator( - &auditSettlement.CalculatorPrm{ - ResultStorage: auditCalcDeps, - ContainerStorage: auditCalcDeps, - PlacementCalculator: auditCalcDeps, - SGStorage: auditCalcDeps, - AccountStorage: auditCalcDeps, - Exchanger: auditCalcDeps, - AuditFeeFetcher: server.netmapClient, - }, - auditSettlement.WithLogger(server.log), - ) - - // create settlement processor - settlementProcessor := settlement.New( - settlement.Prm{ - AuditProcessor: (*auditSettlementCalculator)(auditSettlementCalc), - BasicIncome: &basicSettlementConstructor{dep: basicSettlementDeps}, - State: server, - }, - settlement.WithLogger(server.log), - ) - - locodeValidator, err := server.newLocodeValidator(cfg) - if err != nil { - return nil, err - } - - subnetValidator, err := subnetvalidator.New( - subnetvalidator.Prm{ - SubnetClient: subnetClient, - }, - ) - if err != nil { - return nil, err - } - - var alphaSync event.Handler - - if server.withoutMainNet || cfg.GetBool("governance.disable") { - alphaSync = func(event.Event) { - log.Debug("alphabet keys sync is disabled") - } - } else { - // create governance processor - governanceProcessor, err := governance.New(&governance.Params{ - Log: log, - FrostFSClient: frostfsCli, - NetmapClient: server.netmapClient, - AlphabetState: server, - EpochState: server, - Voter: server, - IRFetcher: irf, - MorphClient: server.morphClient, - MainnetClient: server.mainnetClient, - NotaryDisabled: server.sideNotaryConfig.disabled, - }) - if err != nil { - return nil, err - } - - alphaSync = governanceProcessor.HandleAlphabetSync - err = bindMainnetProcessor(governanceProcessor, server) - if err != nil { - return nil, err - } - } - - netSettings := (*networkSettings)(server.netmapClient) - - var netMapCandidateStateValidator statevalidation.NetMapCandidateValidator - netMapCandidateStateValidator.SetNetworkSettings(netSettings) - - // create netmap processor - server.netmapProcessor, err = netmap.New(&netmap.Params{ - Log: log, - PoolSize: cfg.GetInt("workers.netmap"), - NetmapClient: server.netmapClient, - EpochTimer: server, - EpochState: server, - AlphabetState: server, - CleanupEnabled: cfg.GetBool("netmap_cleaner.enabled"), - CleanupThreshold: cfg.GetUint64("netmap_cleaner.threshold"), - ContainerWrapper: cnrClient, - HandleAudit: server.onlyActiveEventHandler( - auditProcessor.StartAuditHandler(), - ), - NotaryDepositHandler: server.onlyAlphabetEventHandler( - server.notaryHandler, - ), - AuditSettlementsHandler: server.onlyAlphabetEventHandler( - settlementProcessor.HandleAuditEvent, - ), - AlphabetSyncHandler: alphaSync, - NodeValidator: nodevalidator.New( - &netMapCandidateStateValidator, - addrvalidator.New(), - locodeValidator, - subnetValidator, - ), - NotaryDisabled: server.sideNotaryConfig.disabled, - SubnetContract: &server.contracts.subnet, - - NodeStateSettings: netSettings, - }) - if err != nil { - return nil, err - } - - err = bindMorphProcessor(server.netmapProcessor, server) - if err != nil { - return nil, err - } - - // container processor - containerProcessor, err := container.New(&container.Params{ - Log: log, - PoolSize: cfg.GetInt("workers.container"), - AlphabetState: server, - ContainerClient: cnrClient, - FrostFSIDClient: frostfsIDClient, - NetworkState: server.netmapClient, - NotaryDisabled: server.sideNotaryConfig.disabled, - SubnetClient: subnetClient, - }) - if err != nil { - return nil, err - } - - err = bindMorphProcessor(containerProcessor, server) - if err != nil { - return nil, err - } - - // create balance processor - balanceProcessor, err := balance.New(&balance.Params{ - Log: log, - PoolSize: cfg.GetInt("workers.balance"), - FrostFSClient: frostfsCli, - BalanceSC: server.contracts.balance, - AlphabetState: server, - Converter: &server.precision, - }) - if err != nil { - return nil, err - } - - err = bindMorphProcessor(balanceProcessor, server) - if err != nil { - return nil, err - } - - if !server.withoutMainNet { - // create mainnnet frostfs processor - frostfsProcessor, err := frostfs.New(&frostfs.Params{ - Log: log, - PoolSize: cfg.GetInt("workers.frostfs"), - FrostFSContract: server.contracts.frostfs, - FrostFSIDClient: frostfsIDClient, - BalanceClient: server.balanceClient, - NetmapClient: server.netmapClient, - MorphClient: server.morphClient, - EpochState: server, - AlphabetState: server, - Converter: &server.precision, - MintEmitCacheSize: cfg.GetInt("emit.mint.cache_size"), - MintEmitThreshold: cfg.GetUint64("emit.mint.threshold"), - MintEmitValue: fixedn.Fixed8(cfg.GetInt64("emit.mint.value")), - GasBalanceThreshold: cfg.GetInt64("emit.gas.balance_threshold"), - }) - if err != nil { - return nil, err - } - - err = bindMainnetProcessor(frostfsProcessor, server) - if err != nil { - return nil, err - } - } - - parsedWallets, err := parseWalletAddressesFromStrings(cfg.GetStringSlice("emit.extra_wallets")) - if err != nil { - return nil, err - } - - // create alphabet processor - alphabetProcessor, err := alphabet.New(&alphabet.Params{ - ParsedWallets: parsedWallets, - Log: log, - PoolSize: cfg.GetInt("workers.alphabet"), - AlphabetContracts: server.contracts.alphabet, - NetmapClient: server.netmapClient, - MorphClient: server.morphClient, - IRList: server, - StorageEmission: cfg.GetUint64("emit.storage.amount"), - }) - if err != nil { - return nil, err - } - - err = bindMorphProcessor(alphabetProcessor, server) - if err != nil { - return nil, err - } - - // create reputation processor - reputationProcessor, err := reputation.New(&reputation.Params{ - Log: log, - PoolSize: cfg.GetInt("workers.reputation"), - EpochState: server, - AlphabetState: server, - ReputationWrapper: repClient, - ManagerBuilder: reputationcommon.NewManagerBuilder( - reputationcommon.ManagersPrm{ - NetMapSource: server.netmapClient, - }, - ), - NotaryDisabled: server.sideNotaryConfig.disabled, - }) - if err != nil { - return nil, err - } - - err = bindMorphProcessor(reputationProcessor, server) - if err != nil { - return nil, err - } - - // initialize epoch timers - server.epochTimer = newEpochTimer(&epochTimerArgs{ - l: server.log, - newEpochHandlers: server.newEpochTickHandlers(), - cnrWrapper: cnrClient, - epoch: server, - stopEstimationDMul: cfg.GetUint32("timers.stop_estimation.mul"), - stopEstimationDDiv: cfg.GetUint32("timers.stop_estimation.div"), - collectBasicIncome: subEpochEventHandler{ - handler: settlementProcessor.HandleIncomeCollectionEvent, - durationMul: cfg.GetUint32("timers.collect_basic_income.mul"), - durationDiv: cfg.GetUint32("timers.collect_basic_income.div"), - }, - distributeBasicIncome: subEpochEventHandler{ - handler: settlementProcessor.HandleIncomeDistributionEvent, - durationMul: cfg.GetUint32("timers.distribute_basic_income.mul"), - durationDiv: cfg.GetUint32("timers.distribute_basic_income.div"), - }, - }) - - server.addBlockTimer(server.epochTimer) - - // initialize emission timer - emissionTimer := newEmissionTimer(&emitTimerArgs{ - ap: alphabetProcessor, - emitDuration: cfg.GetUint32("timers.emit"), - }) - - server.addBlockTimer(emissionTimer) - - controlSvcEndpoint := cfg.GetString("control.grpc.endpoint") - if controlSvcEndpoint != "" { - authKeysStr := cfg.GetStringSlice("control.authorized_keys") - authKeys := make([][]byte, 0, len(authKeysStr)) - - for i := range authKeysStr { - key, err := hex.DecodeString(authKeysStr[i]) - if err != nil { - return nil, fmt.Errorf("could not parse Control authorized key %s: %w", - authKeysStr[i], - err, - ) - } - - authKeys = append(authKeys, key) - } - - var p controlsrv.Prm - - p.SetPrivateKey(*server.key) - p.SetHealthChecker(server) - - controlSvc := controlsrv.New(p, - controlsrv.WithAllowedKeys(authKeys), - ) - - grpcControlSrv := grpc.NewServer() - control.RegisterControlServiceServer(grpcControlSrv, controlSvc) - - server.runners = append(server.runners, func(ch chan<- error) error { - lis, err := net.Listen("tcp", controlSvcEndpoint) - if err != nil { - return err - } - - go func() { - ch <- grpcControlSrv.Serve(lis) - }() - return nil - }) - - server.registerNoErrCloser(grpcControlSrv.GracefulStop) - } else { - log.Info("no Control server endpoint specified, service is disabled") - } - server.initSubnet(subnetConfig{ queueSize: cfg.GetUint32("workers.subnet"), }) - if cfg.GetString("prometheus.address") != "" { - m := metrics.NewInnerRingMetrics() - server.metrics = &m - } + server.initMetrics(cfg) return server, nil }