package innerring import ( "context" "encoding/hex" "fmt" "net" "git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/innerring/processors/alphabet" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/innerring/processors/balance" cont "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/innerring/processors/container" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/innerring/processors/frostfs" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/innerring/processors/governance" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/innerring/processors/netmap" nodevalidator "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/innerring/processors/netmap/nodevalidation" addrvalidator "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/innerring/processors/netmap/nodevalidation/maddress" statevalidation "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/innerring/processors/netmap/nodevalidation/state" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/client" balanceClient "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/client/balance" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/client/container" frostfsClient "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/client/frostfs" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/client/frostfsid" nmClient "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/client/netmap" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/event" control "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/control/ir" controlsrv "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/control/ir/server" utilConfig "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/config" "github.com/nspcc-dev/neo-go/pkg/core/transaction" "github.com/nspcc-dev/neo-go/pkg/encoding/fixedn" "github.com/spf13/viper" "go.uber.org/zap" "google.golang.org/grpc" ) func (s *Server) initNetmapProcessor(cfg *viper.Viper, cnrClient *container.Client, alphaSync event.Handler, ) error { locodeValidator, err := s.newLocodeValidator(cfg) if err != nil { return err } netSettings := (*networkSettings)(s.netmapClient) var netMapCandidateStateValidator statevalidation.NetMapCandidateValidator netMapCandidateStateValidator.SetNetworkSettings(netSettings) s.netmapProcessor, err = netmap.New(&netmap.Params{ Log: s.log, Metrics: s.irMetrics, PoolSize: cfg.GetInt("workers.netmap"), NetmapClient: netmap.NewNetmapClient(s.netmapClient), EpochTimer: s, EpochState: s, AlphabetState: s, CleanupEnabled: cfg.GetBool("netmap_cleaner.enabled"), CleanupThreshold: cfg.GetUint64("netmap_cleaner.threshold"), ContainerWrapper: cnrClient, NotaryDepositHandler: s.onlyAlphabetEventHandler( s.notaryHandler, ), AlphabetSyncHandler: s.onlyAlphabetEventHandler( alphaSync, ), NodeValidator: nodevalidator.New( &netMapCandidateStateValidator, addrvalidator.New(), locodeValidator, ), NodeStateSettings: netSettings, }) if err != nil { return err } return bindMorphProcessor(s.netmapProcessor, s) } func (s *Server) initMainnet(ctx context.Context, cfg *viper.Viper, morphChain *chainParams, errChan chan<- error) error { s.withoutMainNet = cfg.GetBool("without_mainnet") if s.withoutMainNet { // This works as long as event Listener starts listening loop once, // otherwise Server.Start will run two similar routines. // This behavior most likely will not change. s.mainnetListener = s.morphListener s.mainnetClient = s.morphClient return nil } mainnetChain := morphChain mainnetChain.name = mainnetPrefix mainnetChain.sgn = &transaction.Signer{Scopes: transaction.CalledByEntry} fromMainChainBlock, err := s.persistate.UInt32(persistateMainChainLastBlockKey) if err != nil { fromMainChainBlock = 0 s.log.Warn(logs.InnerringCantGetLastProcessedMainChainBlockNumber, zap.String("error", err.Error())) } mainnetChain.from = fromMainChainBlock // create mainnet client s.mainnetClient, err = createClient(ctx, mainnetChain, errChan) if err != nil { return err } // create mainnet listener s.mainnetListener, err = createListener(ctx, s.mainnetClient, mainnetChain) return err } func (s *Server) enableNotarySupport() error { // enable notary support in the side client err := s.morphClient.EnableNotarySupport( client.WithProxyContract(s.contracts.proxy), ) if err != nil { return fmt.Errorf("could not enable side chain notary support: %w", err) } s.morphListener.EnableNotarySupport(s.contracts.proxy, s.morphClient.Committee, s.morphClient) if !s.mainNotaryConfig.disabled { // enable notary support in the main client err := s.mainnetClient.EnableNotarySupport( client.WithProxyContract(s.contracts.processing), client.WithAlphabetSource(s.morphClient.Committee), ) if err != nil { return fmt.Errorf("could not enable main chain notary support: %w", err) } } return nil } func (s *Server) initNotaryConfig() { s.mainNotaryConfig = notaryConfigs( !s.withoutMainNet && s.mainnetClient.ProbeNotary(), // if mainnet disabled then notary flag must be disabled too ) s.log.Info(logs.InnerringNotarySupport, zap.Bool("sidechain_enabled", true), zap.Bool("mainchain_enabled", !s.mainNotaryConfig.disabled), ) } func (s *Server) createAlphaSync(cfg *viper.Viper, frostfsCli *frostfsClient.Client, irf irFetcher) (event.Handler, error) { var alphaSync event.Handler if s.withoutMainNet || cfg.GetBool("governance.disable") { alphaSync = func(event.Event) { s.log.Debug(logs.InnerringAlphabetKeysSyncIsDisabled) } } else { // create governance processor governanceProcessor, err := governance.New(&governance.Params{ Log: s.log, Metrics: s.irMetrics, FrostFSClient: frostfsCli, NetmapClient: s.netmapClient, AlphabetState: s, EpochState: s, Voter: s, IRFetcher: irf, MorphClient: s.morphClient, MainnetClient: s.mainnetClient, }) if err != nil { return nil, err } alphaSync = governanceProcessor.HandleAlphabetSync err = bindMainnetProcessor(governanceProcessor, s) if err != nil { return nil, err } } return alphaSync, nil } func (s *Server) createIRFetcher() irFetcher { var irf irFetcher if s.withoutMainNet || !s.mainNotaryConfig.disabled { // if mainchain is disabled we should use NeoFSAlphabetList client method according to its docs // (naming `...WithNotary` will not always be correct) irf = NewIRFetcherWithNotary(s.morphClient) } else { irf = NewIRFetcherWithoutNotary(s.netmapClient) } return irf } func (s *Server) initTimers(cfg *viper.Viper, morphClients *serverMorphClients) { s.epochTimer = newEpochTimer(&epochTimerArgs{ l: s.log, alphabetState: s, newEpochHandlers: s.newEpochTickHandlers(), cnrWrapper: morphClients.CnrClient, epoch: s, stopEstimationDMul: cfg.GetUint32("timers.stop_estimation.mul"), stopEstimationDDiv: cfg.GetUint32("timers.stop_estimation.div"), }) s.addBlockTimer(s.epochTimer) // initialize emission timer emissionTimer := newEmissionTimer(&emitTimerArgs{ ap: s.alphabetProcessor, emitDuration: cfg.GetUint32("timers.emit"), }) s.addBlockTimer(emissionTimer) } func (s *Server) initAlphabetProcessor(cfg *viper.Viper) error { parsedWallets, err := parseWalletAddressesFromStrings(cfg.GetStringSlice("emit.extra_wallets")) if err != nil { return err } // create alphabet processor s.alphabetProcessor, err = alphabet.New(&alphabet.Params{ ParsedWallets: parsedWallets, Log: s.log, Metrics: s.irMetrics, PoolSize: cfg.GetInt("workers.alphabet"), AlphabetContracts: s.contracts.alphabet, NetmapClient: s.netmapClient, MorphClient: s.morphClient, IRList: s, StorageEmission: cfg.GetUint64("emit.storage.amount"), }) if err != nil { return err } err = bindMorphProcessor(s.alphabetProcessor, s) return err } func (s *Server) initContainerProcessor(cfg *viper.Viper, cnrClient *container.Client, frostfsIDClient *frostfsid.Client) error { // container processor containerProcessor, err := cont.New(&cont.Params{ Log: s.log, Metrics: s.irMetrics, PoolSize: cfg.GetInt("workers.container"), AlphabetState: s, ContainerClient: cnrClient, MorphClient: cnrClient.Morph(), FrostFSIDClient: frostfsIDClient, NetworkState: s.netmapClient, }) if err != nil { return err } return bindMorphProcessor(containerProcessor, s) } func (s *Server) initBalanceProcessor(cfg *viper.Viper, frostfsCli *frostfsClient.Client) error { // create balance processor balanceProcessor, err := balance.New(&balance.Params{ Log: s.log, Metrics: s.irMetrics, PoolSize: cfg.GetInt("workers.balance"), FrostFSClient: frostfsCli, BalanceSC: s.contracts.balance, AlphabetState: s, Converter: &s.precision, }) if err != nil { return err } return bindMorphProcessor(balanceProcessor, s) } func (s *Server) initFrostFSMainnetProcessor(cfg *viper.Viper) error { if s.withoutMainNet { return nil } frostfsProcessor, err := frostfs.New(&frostfs.Params{ Log: s.log, Metrics: s.irMetrics, PoolSize: cfg.GetInt("workers.frostfs"), FrostFSContract: s.contracts.frostfs, BalanceClient: s.balanceClient, NetmapClient: s.netmapClient, MorphClient: s.morphClient, EpochState: s, AlphabetState: s, Converter: &s.precision, MintEmitCacheSize: cfg.GetInt("emit.mint.cache_size"), MintEmitThreshold: cfg.GetUint64("emit.mint.threshold"), MintEmitValue: fixedn.Fixed8(cfg.GetInt64("emit.mint.value")), GasBalanceThreshold: cfg.GetInt64("emit.gas.balance_threshold"), }) if err != nil { return err } return bindMainnetProcessor(frostfsProcessor, s) } func (s *Server) initGRPCServer(cfg *viper.Viper) error { controlSvcEndpoint := cfg.GetString("control.grpc.endpoint") if controlSvcEndpoint == "" { s.log.Info(logs.InnerringNoControlServerEndpointSpecified) return nil } authKeysStr := cfg.GetStringSlice("control.authorized_keys") authKeys := make([][]byte, 0, len(authKeysStr)) for i := range authKeysStr { key, err := hex.DecodeString(authKeysStr[i]) if err != nil { return fmt.Errorf("could not parse Control authorized key %s: %w", authKeysStr[i], err, ) } authKeys = append(authKeys, key) } var p controlsrv.Prm p.SetPrivateKey(*s.key) p.SetHealthChecker(s) controlSvc := controlsrv.New(p, s.netmapClient, s.containerClient, controlsrv.WithAllowedKeys(authKeys), ) grpcControlSrv := grpc.NewServer() control.RegisterControlServiceServer(grpcControlSrv, controlSvc) s.runners = append(s.runners, func(ch chan<- error) error { lis, err := net.Listen("tcp", controlSvcEndpoint) if err != nil { return err } go func() { ch <- grpcControlSrv.Serve(lis) }() return nil }) s.registerNoErrCloser(grpcControlSrv.GracefulStop) return nil } type serverMorphClients struct { CnrClient *container.Client FrostFSIDClient *frostfsid.Client FrostFSClient *frostfsClient.Client } func (s *Server) initClientsFromMorph() (*serverMorphClients, error) { result := &serverMorphClients{} var err error fee := s.feeConfig.SideChainFee() // form morph container client's options morphCnrOpts := make([]container.Option, 0, 3) morphCnrOpts = append(morphCnrOpts, container.TryNotary(), container.AsAlphabet(), ) result.CnrClient, err = container.NewFromMorph(s.morphClient, s.contracts.container, fee, morphCnrOpts...) if err != nil { return nil, err } s.containerClient = result.CnrClient s.netmapClient, err = nmClient.NewFromMorph(s.morphClient, s.contracts.netmap, fee, nmClient.TryNotary(), nmClient.AsAlphabet()) if err != nil { return nil, err } s.balanceClient, err = balanceClient.NewFromMorph(s.morphClient, s.contracts.balance, fee, balanceClient.TryNotary(), balanceClient.AsAlphabet()) if err != nil { return nil, err } result.FrostFSIDClient, err = frostfsid.NewFromMorph(s.morphClient, s.contracts.frostfsID, fee) if err != nil { return nil, err } result.FrostFSClient, err = frostfsClient.NewFromMorph(s.mainnetClient, s.contracts.frostfs, s.feeConfig.MainChainFee(), frostfsClient.TryNotary(), frostfsClient.AsAlphabet()) if err != nil { return nil, err } return result, nil } func (s *Server) initProcessors(cfg *viper.Viper, morphClients *serverMorphClients) error { irf := s.createIRFetcher() s.statusIndex = newInnerRingIndexer( s.morphClient, irf, s.key.PublicKey(), cfg.GetDuration("indexer.cache_timeout"), ) alphaSync, err := s.createAlphaSync(cfg, morphClients.FrostFSClient, irf) if err != nil { return err } err = s.initNetmapProcessor(cfg, morphClients.CnrClient, alphaSync) if err != nil { return err } err = s.initContainerProcessor(cfg, morphClients.CnrClient, morphClients.FrostFSIDClient) if err != nil { return err } err = s.initBalanceProcessor(cfg, morphClients.FrostFSClient) if err != nil { return err } err = s.initFrostFSMainnetProcessor(cfg) if err != nil { return err } err = s.initAlphabetProcessor(cfg) return err } func (s *Server) initMorph(ctx context.Context, cfg *viper.Viper, errChan chan<- error) (*chainParams, error) { fromSideChainBlock, err := s.persistate.UInt32(persistateSideChainLastBlockKey) if err != nil { fromSideChainBlock = 0 s.log.Warn(logs.InnerringCantGetLastProcessedSideChainBlockNumber, zap.String("error", err.Error())) } morphChain := &chainParams{ log: s.log, cfg: cfg, key: s.key, name: morphPrefix, from: fromSideChainBlock, morphCacheMetric: s.irMetrics.MorphCacheMetrics(), cmode: s.cmode, } // create morph client s.morphClient, err = createClient(ctx, morphChain, errChan) if err != nil { return nil, err } // create morph listener s.morphListener, err = createListener(ctx, s.morphClient, morphChain) if err != nil { return nil, err } if err := s.morphClient.SetGroupSignerScope(); err != nil { morphChain.log.Info(logs.InnerringFailedToSetGroupSignerScope, zap.Error(err)) } return morphChain, nil } func (s *Server) initContracts(cfg *viper.Viper) error { var err error // get all script hashes of contracts s.contracts, err = parseContracts( cfg, s.morphClient, s.withoutMainNet, s.mainNotaryConfig.disabled, ) return err } func (s *Server) initKey(cfg *viper.Viper) error { // prepare inner ring node private key acc, err := utilConfig.LoadAccount( cfg.GetString("wallet.path"), cfg.GetString("wallet.address"), cfg.GetString("wallet.password")) if err != nil { return fmt.Errorf("ir: %w", err) } s.key = acc.PrivateKey() return nil }