From a24931205f4013c75f32a356a665ba3f495b19bc Mon Sep 17 00:00:00 2001 From: Dmitrii Stepanov Date: Wed, 29 Mar 2023 11:41:30 +0300 Subject: [PATCH 1/8] [#185] ir: Resolve funlen linter Resolve funlen linter for GlagoliticLetter.String method Signed-off-by: Dmitrii Stepanov --- pkg/innerring/alphabet.go | 134 +++++++++++++------------------------- 1 file changed, 47 insertions(+), 87 deletions(-) diff --git a/pkg/innerring/alphabet.go b/pkg/innerring/alphabet.go index 518c18191..78930db7d 100644 --- a/pkg/innerring/alphabet.go +++ b/pkg/innerring/alphabet.go @@ -52,96 +52,56 @@ const ( lastLetterNum ) +var glagolicLetterToString = map[GlagoliticLetter]string{ + az: "az", + buky: "buky", + vedi: "vedi", + glagoli: "glagoli", + dobro: "dobro", + yest: "yest", + zhivete: "zhivete", + dzelo: "dzelo", + zemlja: "zemlja", + izhe: "izhe", + izhei: "izhei", + gerv: "gerv", + kako: "kako", + ljudi: "ljudi", + mislete: "mislete", + nash: "nash", + on: "on", + pokoj: "pokoj", + rtsi: "rtsi", + slovo: "slovo", + tverdo: "tverdo", + uk: "uk", + fert: "fert", + kher: "kher", + oht: "oht", + shta: "shta", + tsi: "tsi", + cherv: "cherv", + sha: "sha", + yer: "yer", + yeri: "yeri", + yerj: "yerj", + yat: "yat", + jo: "jo", + yu: "yu", + smallYus: "small.yus", + smallIotatedYus: "small.iotated.yus", + bigYus: "big.yus", + bigIotatedYus: "big.iotated.yus", + fita: "fita", + izhitsa: "izhitsa", +} + // String returns l in config-compatible format. -// -// nolint: funlen func (l GlagoliticLetter) String() string { - switch l { - default: - return "unknown" - case az: - return "az" - case buky: - return "buky" - case vedi: - return "vedi" - case glagoli: - return "glagoli" - case dobro: - return "dobro" - case yest: - return "yest" - case zhivete: - return "zhivete" - case dzelo: - return "dzelo" - case zemlja: - return "zemlja" - case izhe: - return "izhe" - case izhei: - return "izhei" - case gerv: - return "gerv" - case kako: - return "kako" - case ljudi: - return "ljudi" - case mislete: - return "mislete" - case nash: - return "nash" - case on: - return "on" - case pokoj: - return "pokoj" - case rtsi: - return "rtsi" - case slovo: - return "slovo" - case tverdo: - return "tverdo" - case uk: - return "uk" - case fert: - return "fert" - case kher: - return "kher" - case oht: - return "oht" - case shta: - return "shta" - case tsi: - return "tsi" - case cherv: - return "cherv" - case sha: - return "sha" - case yer: - return "yer" - case yeri: - return "yeri" - case yerj: - return "yerj" - case yat: - return "yat" - case jo: - return "jo" - case yu: - return "yu" - case smallYus: - return "small.yus" - case smallIotatedYus: - return "small.iotated.yus" - case bigYus: - return "big.yus" - case bigIotatedYus: - return "big.iotated.yus" - case fita: - return "fita" - case izhitsa: - return "izhitsa" + if str, found := glagolicLetterToString[l]; found { + return str } + return "unknown" } type alphabetContracts map[GlagoliticLetter]util.Uint160 -- 2.45.3 From fe822dda3f1329634021e37feee14543e5bc4b4e Mon Sep 17 00:00:00 2001 From: Dmitrii Stepanov Date: Wed, 29 Mar 2023 12:20:22 +0300 Subject: [PATCH 2/8] [#185] ir: Refactor ir start Resolve funlen linter for Server.Start method Signed-off-by: Dmitrii Stepanov --- pkg/innerring/innerring.go | 153 ++++++++++++++++++++++--------------- 1 file changed, 93 insertions(+), 60 deletions(-) diff --git a/pkg/innerring/innerring.go b/pkg/innerring/innerring.go index 1c2d797ef..8f41c267b 100644 --- a/pkg/innerring/innerring.go +++ b/pkg/innerring/innerring.go @@ -158,8 +158,6 @@ var ( ) // Start runs all event providers. -// -// nolint: funlen func (s *Server) Start(ctx context.Context, intError chan<- error) (err error) { s.setHealthStatus(control.HealthStatus_STARTING) defer func() { @@ -168,10 +166,9 @@ func (s *Server) Start(ctx context.Context, intError chan<- error) (err error) { } }() - for _, starter := range s.starters { - if err := starter(); err != nil { - return err - } + err = s.launchStarters() + if err != nil { + return err } err = s.initConfigFromBlockchain() @@ -179,26 +176,14 @@ func (s *Server) Start(ctx context.Context, intError chan<- error) (err error) { return err } - if !s.mainNotaryConfig.disabled { - err = s.initNotary(ctx, - s.depositMainNotary, - s.awaitMainNotaryDeposit, - "waiting to accept main notary deposit", - ) - if err != nil { - return err - } + err = s.initMainNotary(ctx) + if err != nil { + return err } - if !s.sideNotaryConfig.disabled { - err = s.initNotary(ctx, - s.depositSideNotary, - s.awaitSideNotaryDeposit, - "waiting to accept side notary deposit", - ) - if err != nil { - return err - } + err = s.initSideNotary(ctx) + if err != nil { + return err } prm := governance.VoteValidatorPrm{} @@ -212,13 +197,7 @@ func (s *Server) Start(ctx context.Context, intError chan<- error) (err error) { zap.String("error", err.Error())) } - // tick initial epoch - initialEpochTicker := timer.NewOneTickTimer( - timer.StaticBlockMeter(s.initialEpochTickDelta), - func() { - s.netmapProcessor.HandleNewEpochTick(timerEvent.NewEpochTick{}) - }) - s.addBlockTimer(initialEpochTicker) + s.tickInitialExpoch() morphErr := make(chan error) mainnnetErr := make(chan error) @@ -235,36 +214,11 @@ func (s *Server) Start(ctx context.Context, intError chan<- error) (err error) { } }() - s.morphListener.RegisterBlockHandler(func(b *block.Block) { - s.log.Debug("new block", - zap.Uint32("index", b.Index), - ) + s.registerMorphNewBlockEventHandler() + s.registerMainnetNewBlockEventHandler() - err = s.persistate.SetUInt32(persistateSideChainLastBlockKey, b.Index) - if err != nil { - s.log.Warn("can't update persistent state", - zap.String("chain", "side"), - zap.Uint32("block_index", b.Index)) - } - - s.tickTimers(b.Index) - }) - - if !s.withoutMainNet { - s.mainnetListener.RegisterBlockHandler(func(b *block.Block) { - err = s.persistate.SetUInt32(persistateMainChainLastBlockKey, b.Index) - if err != nil { - s.log.Warn("can't update persistent state", - zap.String("chain", "main"), - zap.Uint32("block_index", b.Index)) - } - }) - } - - for _, runner := range s.runners { - if err := runner(intError); err != nil { - return err - } + if err := s.startRunners(intError); err != nil { + return err } go s.morphListener.ListenWithError(ctx, morphErr) // listen for neo:morph events @@ -279,6 +233,85 @@ func (s *Server) Start(ctx context.Context, intError chan<- error) (err error) { return nil } +func (s *Server) registerMorphNewBlockEventHandler() { + s.morphListener.RegisterBlockHandler(func(b *block.Block) { + s.log.Debug("new block", + zap.Uint32("index", b.Index), + ) + + err := s.persistate.SetUInt32(persistateSideChainLastBlockKey, b.Index) + if err != nil { + s.log.Warn("can't update persistent state", + zap.String("chain", "side"), + zap.Uint32("block_index", b.Index)) + } + + s.tickTimers(b.Index) + }) +} + +func (s *Server) registerMainnetNewBlockEventHandler() { + if !s.withoutMainNet { + s.mainnetListener.RegisterBlockHandler(func(b *block.Block) { + err := s.persistate.SetUInt32(persistateMainChainLastBlockKey, b.Index) + if err != nil { + s.log.Warn("can't update persistent state", + zap.String("chain", "main"), + zap.Uint32("block_index", b.Index)) + } + }) + } +} + +func (s *Server) startRunners(errCh chan<- error) error { + for _, runner := range s.runners { + if err := runner(errCh); err != nil { + return err + } + } + return nil +} + +func (s *Server) launchStarters() error { + for _, starter := range s.starters { + if err := starter(); err != nil { + return err + } + } + return nil +} + +func (s *Server) initMainNotary(ctx context.Context) error { + if !s.mainNotaryConfig.disabled { + return s.initNotary(ctx, + s.depositMainNotary, + s.awaitMainNotaryDeposit, + "waiting to accept main notary deposit", + ) + } + return nil +} + +func (s *Server) initSideNotary(ctx context.Context) error { + if !s.sideNotaryConfig.disabled { + return s.initNotary(ctx, + s.depositSideNotary, + s.awaitSideNotaryDeposit, + "waiting to accept side notary deposit", + ) + } + return nil +} + +func (s *Server) tickInitialExpoch() { + initialEpochTicker := timer.NewOneTickTimer( + timer.StaticBlockMeter(s.initialEpochTickDelta), + func() { + s.netmapProcessor.HandleNewEpochTick(timerEvent.NewEpochTick{}) + }) + s.addBlockTimer(initialEpochTicker) +} + func (s *Server) startWorkers(ctx context.Context) { for _, w := range s.workers { go w(ctx) -- 2.45.3 From 3c1796a3d15571092a2ce5e071c44052ad59fd00 Mon Sep 17 00:00:00 2001 From: Dmitrii Stepanov Date: Wed, 29 Mar 2023 16:54:15 +0300 Subject: [PATCH 3/8] [#185] ir: Refactor ir service creation Resolve funlen linter for New function Signed-off-by: Dmitrii Stepanov --- pkg/innerring/initialization.go | 756 ++++++++++++++++++++++++++++++++ pkg/innerring/innerring.go | 609 +------------------------ 2 files changed, 775 insertions(+), 590 deletions(-) create mode 100644 pkg/innerring/initialization.go diff --git a/pkg/innerring/initialization.go b/pkg/innerring/initialization.go new file mode 100644 index 000000000..31a1bcd60 --- /dev/null +++ b/pkg/innerring/initialization.go @@ -0,0 +1,756 @@ +package innerring + +import ( + "context" + "encoding/hex" + "fmt" + "net" + + "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/innerring/processors/alphabet" + "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/innerring/processors/audit" + "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/innerring/processors/balance" + cont "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/innerring/processors/container" + frostfs "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/innerring/processors/frostfs" + "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/innerring/processors/governance" + "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/innerring/processors/netmap" + nodevalidator "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/innerring/processors/netmap/nodevalidation" + addrvalidator "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/innerring/processors/netmap/nodevalidation/maddress" + statevalidation "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/innerring/processors/netmap/nodevalidation/state" + subnetvalidator "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/innerring/processors/netmap/nodevalidation/subnet" + "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/innerring/processors/reputation" + "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/innerring/processors/settlement" + auditSettlement "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/innerring/processors/settlement/audit" + "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/metrics" + "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/client" + auditClient "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/client/audit" + balanceClient "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/client/balance" + "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/client/container" + cntClient "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/client/container" + frostfsClient "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/client/frostfs" + "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/client/frostfsid" + nmClient "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/client/netmap" + repClient "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/client/reputation" + morphsubnet "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/client/subnet" + "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/event" + audittask "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/audit/taskmanager" + control "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/control/ir" + controlsrv "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/control/ir/server" + reputationcommon "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/reputation/common" + util2 "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util" + utilConfig "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/config" + "github.com/nspcc-dev/neo-go/pkg/core/transaction" + "github.com/nspcc-dev/neo-go/pkg/encoding/fixedn" + "github.com/panjf2000/ants/v2" + "github.com/spf13/viper" + "go.uber.org/zap" + "google.golang.org/grpc" +) + +func (s *Server) initNetmapProcessor(cfg *viper.Viper, + cnrClient *container.Client, + alphaSync event.Handler, + subnetClient *morphsubnet.Client, + auditProcessor *audit.Processor, + settlementProcessor *settlement.Processor) error { + locodeValidator, err := s.newLocodeValidator(cfg) + if err != nil { + return err + } + + subnetValidator, err := subnetvalidator.New( + subnetvalidator.Prm{ + SubnetClient: subnetClient, + }, + ) + if err != nil { + return err + } + + netSettings := (*networkSettings)(s.netmapClient) + + var netMapCandidateStateValidator statevalidation.NetMapCandidateValidator + netMapCandidateStateValidator.SetNetworkSettings(netSettings) + + s.netmapProcessor, err = netmap.New(&netmap.Params{ + Log: s.log, + PoolSize: cfg.GetInt("workers.netmap"), + NetmapClient: s.netmapClient, + EpochTimer: s, + EpochState: s, + AlphabetState: s, + CleanupEnabled: cfg.GetBool("netmap_cleaner.enabled"), + CleanupThreshold: cfg.GetUint64("netmap_cleaner.threshold"), + ContainerWrapper: cnrClient, + HandleAudit: s.onlyActiveEventHandler( + auditProcessor.StartAuditHandler(), + ), + NotaryDepositHandler: s.onlyAlphabetEventHandler( + s.notaryHandler, + ), + AuditSettlementsHandler: s.onlyAlphabetEventHandler( + settlementProcessor.HandleAuditEvent, + ), + AlphabetSyncHandler: alphaSync, + NodeValidator: nodevalidator.New( + &netMapCandidateStateValidator, + addrvalidator.New(), + locodeValidator, + subnetValidator, + ), + NotaryDisabled: s.sideNotaryConfig.disabled, + SubnetContract: &s.contracts.subnet, + + NodeStateSettings: netSettings, + }) + + if err != nil { + return err + } + + return bindMorphProcessor(s.netmapProcessor, s) +} + +func (s *Server) initMainnet(ctx context.Context, cfg *viper.Viper, morphChain *chainParams, errChan chan<- error) error { + s.withoutMainNet = cfg.GetBool("without_mainnet") + if s.withoutMainNet { + // This works as long as event Listener starts listening loop once, + // otherwise Server.Start will run two similar routines. + // This behavior most likely will not change. + s.mainnetListener = s.morphListener + s.mainnetClient = s.morphClient + return nil + } + + mainnetChain := morphChain + mainnetChain.name = mainnetPrefix + mainnetChain.sgn = &transaction.Signer{Scopes: transaction.CalledByEntry} + + fromMainChainBlock, err := s.persistate.UInt32(persistateMainChainLastBlockKey) + if err != nil { + fromMainChainBlock = 0 + s.log.Warn("can't get last processed main chain block number", zap.String("error", err.Error())) + } + mainnetChain.from = fromMainChainBlock + + // create mainnet client + s.mainnetClient, err = createClient(ctx, mainnetChain, errChan) + if err != nil { + return err + } + + // create mainnet listener + s.mainnetListener, err = createListener(ctx, s.mainnetClient, mainnetChain) + return err +} + +func (s *Server) enableNotarySupport() error { + if !s.sideNotaryConfig.disabled { + // enable notary support in the side client + err := s.morphClient.EnableNotarySupport( + client.WithProxyContract(s.contracts.proxy), + ) + if err != nil { + return fmt.Errorf("could not enable side chain notary support: %w", err) + } + + s.morphListener.EnableNotarySupport(s.contracts.proxy, s.morphClient.Committee, s.morphClient) + } + + if !s.mainNotaryConfig.disabled { + // enable notary support in the main client + err := s.mainnetClient.EnableNotarySupport( + client.WithProxyContract(s.contracts.processing), + client.WithAlphabetSource(s.morphClient.Committee), + ) + if err != nil { + return fmt.Errorf("could not enable main chain notary support: %w", err) + } + } + + return nil +} + +func (s *Server) initNotaryConfig(cfg *viper.Viper) { + s.mainNotaryConfig, s.sideNotaryConfig = notaryConfigs( + s.morphClient.ProbeNotary(), + !s.withoutMainNet && s.mainnetClient.ProbeNotary(), // if mainnet disabled then notary flag must be disabled too + ) + + s.log.Info("notary support", + zap.Bool("sidechain_enabled", !s.sideNotaryConfig.disabled), + zap.Bool("mainchain_enabled", !s.mainNotaryConfig.disabled), + ) +} + +func (s *Server) createAuditProcessor(cfg *viper.Viper, clientCache *ClientCache, cnrClient *container.Client) (*audit.Processor, error) { + auditPool, err := ants.NewPool(cfg.GetInt("audit.task.exec_pool_size")) + if err != nil { + return nil, err + } + + pdpPoolSize := cfg.GetInt("audit.pdp.pairs_pool_size") + porPoolSize := cfg.GetInt("audit.por.pool_size") + + // create audit processor dependencies + auditTaskManager := audittask.New( + audittask.WithQueueCapacity(cfg.GetUint32("audit.task.queue_capacity")), + audittask.WithWorkerPool(auditPool), + audittask.WithLogger(s.log), + audittask.WithContainerCommunicator(clientCache), + audittask.WithMaxPDPSleepInterval(cfg.GetDuration("audit.pdp.max_sleep_interval")), + audittask.WithPDPWorkerPoolGenerator(func() (util2.WorkerPool, error) { + return ants.NewPool(pdpPoolSize) + }), + audittask.WithPoRWorkerPoolGenerator(func() (util2.WorkerPool, error) { + return ants.NewPool(porPoolSize) + }), + ) + + s.workers = append(s.workers, auditTaskManager.Listen) + + // create audit processor + return audit.New(&audit.Params{ + Log: s.log, + NetmapClient: s.netmapClient, + ContainerClient: cnrClient, + IRList: s, + EpochSource: s, + SGSource: clientCache, + Key: &s.key.PrivateKey, + RPCSearchTimeout: cfg.GetDuration("audit.timeout.search"), + TaskManager: auditTaskManager, + Reporter: s, + }) +} + +func (s *Server) createSettlementProcessor(clientCache *ClientCache, cnrClient *container.Client) *settlement.Processor { + // create settlement processor dependencies + settlementDeps := settlementDeps{ + log: s.log, + cnrSrc: cntClient.AsContainerSource(cnrClient), + auditClient: s.auditClient, + nmClient: s.netmapClient, + clientCache: clientCache, + balanceClient: s.balanceClient, + } + + settlementDeps.settlementCtx = auditSettlementContext + auditCalcDeps := &auditSettlementDeps{ + settlementDeps: settlementDeps, + } + + settlementDeps.settlementCtx = basicIncomeSettlementContext + basicSettlementDeps := &basicIncomeSettlementDeps{ + settlementDeps: settlementDeps, + cnrClient: cnrClient, + } + + auditSettlementCalc := auditSettlement.NewCalculator( + &auditSettlement.CalculatorPrm{ + ResultStorage: auditCalcDeps, + ContainerStorage: auditCalcDeps, + PlacementCalculator: auditCalcDeps, + SGStorage: auditCalcDeps, + AccountStorage: auditCalcDeps, + Exchanger: auditCalcDeps, + AuditFeeFetcher: s.netmapClient, + }, + auditSettlement.WithLogger(s.log), + ) + + // create settlement processor + return settlement.New( + settlement.Prm{ + AuditProcessor: (*auditSettlementCalculator)(auditSettlementCalc), + BasicIncome: &basicSettlementConstructor{dep: basicSettlementDeps}, + State: s, + }, + settlement.WithLogger(s.log), + ) +} + +func (s *Server) createAlphaSync(cfg *viper.Viper, frostfsCli *frostfsClient.Client, irf irFetcher) (event.Handler, error) { + var alphaSync event.Handler + + if s.withoutMainNet || cfg.GetBool("governance.disable") { + alphaSync = func(event.Event) { + s.log.Debug("alphabet keys sync is disabled") + } + } else { + // create governance processor + governanceProcessor, err := governance.New(&governance.Params{ + Log: s.log, + FrostFSClient: frostfsCli, + NetmapClient: s.netmapClient, + AlphabetState: s, + EpochState: s, + Voter: s, + IRFetcher: irf, + MorphClient: s.morphClient, + MainnetClient: s.mainnetClient, + NotaryDisabled: s.sideNotaryConfig.disabled, + }) + if err != nil { + return nil, err + } + + alphaSync = governanceProcessor.HandleAlphabetSync + err = bindMainnetProcessor(governanceProcessor, s) + if err != nil { + return nil, err + } + } + + return alphaSync, nil +} + +func (s *Server) createIRFetcher() irFetcher { + var irf irFetcher + + if s.withoutMainNet || !s.mainNotaryConfig.disabled { + // if mainchain is disabled we should use NeoFSAlphabetList client method according to its docs + // (naming `...WithNotary` will not always be correct) + irf = NewIRFetcherWithNotary(s.morphClient) + } else { + irf = NewIRFetcherWithoutNotary(s.netmapClient) + } + + return irf +} + +func (s *Server) initTimers(cfg *viper.Viper, processors *serverProcessors, morphClients *serverMorphClients) { + s.epochTimer = newEpochTimer(&epochTimerArgs{ + l: s.log, + newEpochHandlers: s.newEpochTickHandlers(), + cnrWrapper: morphClients.CnrClient, + epoch: s, + stopEstimationDMul: cfg.GetUint32("timers.stop_estimation.mul"), + stopEstimationDDiv: cfg.GetUint32("timers.stop_estimation.div"), + collectBasicIncome: subEpochEventHandler{ + handler: processors.SettlementProcessor.HandleIncomeCollectionEvent, + durationMul: cfg.GetUint32("timers.collect_basic_income.mul"), + durationDiv: cfg.GetUint32("timers.collect_basic_income.div"), + }, + distributeBasicIncome: subEpochEventHandler{ + handler: processors.SettlementProcessor.HandleIncomeDistributionEvent, + durationMul: cfg.GetUint32("timers.distribute_basic_income.mul"), + durationDiv: cfg.GetUint32("timers.distribute_basic_income.div"), + }, + }) + + s.addBlockTimer(s.epochTimer) + + // initialize emission timer + emissionTimer := newEmissionTimer(&emitTimerArgs{ + ap: processors.AlphabetProcessor, + emitDuration: cfg.GetUint32("timers.emit"), + }) + + s.addBlockTimer(emissionTimer) +} + +func (s *Server) createMorphSubnetClient() (*morphsubnet.Client, error) { + // initialize morph client of Subnet contract + clientMode := morphsubnet.NotaryAlphabet + + if s.sideNotaryConfig.disabled { + clientMode = morphsubnet.NonNotary + } + + subnetInitPrm := morphsubnet.InitPrm{} + subnetInitPrm.SetBaseClient(s.morphClient) + subnetInitPrm.SetContractAddress(s.contracts.subnet) + subnetInitPrm.SetMode(clientMode) + + subnetClient := &morphsubnet.Client{} + err := subnetClient.Init(subnetInitPrm) + if err != nil { + return nil, fmt.Errorf("could not initialize subnet client: %w", err) + } + return subnetClient, nil +} + +func (s *Server) initAlphabetProcessor(cfg *viper.Viper) (*alphabet.Processor, error) { + parsedWallets, err := parseWalletAddressesFromStrings(cfg.GetStringSlice("emit.extra_wallets")) + if err != nil { + return nil, err + } + + // create alphabet processor + alphabetProcessor, err := alphabet.New(&alphabet.Params{ + ParsedWallets: parsedWallets, + Log: s.log, + PoolSize: cfg.GetInt("workers.alphabet"), + AlphabetContracts: s.contracts.alphabet, + NetmapClient: s.netmapClient, + MorphClient: s.morphClient, + IRList: s, + StorageEmission: cfg.GetUint64("emit.storage.amount"), + }) + if err != nil { + return nil, err + } + + err = bindMorphProcessor(alphabetProcessor, s) + if err != nil { + return nil, err + } + + return alphabetProcessor, nil +} + +func (s *Server) initContainerProcessor(cfg *viper.Viper, cnrClient *container.Client, + frostfsIDClient *frostfsid.Client, subnetClient *morphsubnet.Client) error { + // container processor + containerProcessor, err := cont.New(&cont.Params{ + Log: s.log, + PoolSize: cfg.GetInt("workers.container"), + AlphabetState: s, + ContainerClient: cnrClient, + FrostFSIDClient: frostfsIDClient, + NetworkState: s.netmapClient, + NotaryDisabled: s.sideNotaryConfig.disabled, + SubnetClient: subnetClient, + }) + if err != nil { + return err + } + + return bindMorphProcessor(containerProcessor, s) +} + +func (s *Server) initBalanceProcessor(cfg *viper.Viper, frostfsCli *frostfsClient.Client) error { + // create balance processor + balanceProcessor, err := balance.New(&balance.Params{ + Log: s.log, + PoolSize: cfg.GetInt("workers.balance"), + FrostFSClient: frostfsCli, + BalanceSC: s.contracts.balance, + AlphabetState: s, + Converter: &s.precision, + }) + if err != nil { + return err + } + + return bindMorphProcessor(balanceProcessor, s) +} + +func (s *Server) initFrostFSMainnetProcessor(cfg *viper.Viper, frostfsIDClient *frostfsid.Client) error { + if s.withoutMainNet { + return nil + } + + frostfsProcessor, err := frostfs.New(&frostfs.Params{ + Log: s.log, + PoolSize: cfg.GetInt("workers.frostfs"), + FrostFSContract: s.contracts.frostfs, + FrostFSIDClient: frostfsIDClient, + BalanceClient: s.balanceClient, + NetmapClient: s.netmapClient, + MorphClient: s.morphClient, + EpochState: s, + AlphabetState: s, + Converter: &s.precision, + MintEmitCacheSize: cfg.GetInt("emit.mint.cache_size"), + MintEmitThreshold: cfg.GetUint64("emit.mint.threshold"), + MintEmitValue: fixedn.Fixed8(cfg.GetInt64("emit.mint.value")), + GasBalanceThreshold: cfg.GetInt64("emit.gas.balance_threshold"), + }) + if err != nil { + return err + } + + return bindMainnetProcessor(frostfsProcessor, s) +} + +func (s *Server) initReputationProcessor(cfg *viper.Viper, sidechainFee fixedn.Fixed8) error { + repClient, err := repClient.NewFromMorph(s.morphClient, s.contracts.reputation, sidechainFee, repClient.TryNotary(), repClient.AsAlphabet()) + if err != nil { + return err + } + + // create reputation processor + reputationProcessor, err := reputation.New(&reputation.Params{ + Log: s.log, + PoolSize: cfg.GetInt("workers.reputation"), + EpochState: s, + AlphabetState: s, + ReputationWrapper: repClient, + ManagerBuilder: reputationcommon.NewManagerBuilder( + reputationcommon.ManagersPrm{ + NetMapSource: s.netmapClient, + }, + ), + NotaryDisabled: s.sideNotaryConfig.disabled, + }) + if err != nil { + return err + } + + return bindMorphProcessor(reputationProcessor, s) +} + +func (s *Server) initGRPCServer(cfg *viper.Viper) error { + controlSvcEndpoint := cfg.GetString("control.grpc.endpoint") + if controlSvcEndpoint == "" { + s.log.Info("no Control server endpoint specified, service is disabled") + return nil + } + + authKeysStr := cfg.GetStringSlice("control.authorized_keys") + authKeys := make([][]byte, 0, len(authKeysStr)) + + for i := range authKeysStr { + key, err := hex.DecodeString(authKeysStr[i]) + if err != nil { + return fmt.Errorf("could not parse Control authorized key %s: %w", + authKeysStr[i], + err, + ) + } + + authKeys = append(authKeys, key) + } + + var p controlsrv.Prm + + p.SetPrivateKey(*s.key) + p.SetHealthChecker(s) + + controlSvc := controlsrv.New(p, + controlsrv.WithAllowedKeys(authKeys), + ) + + grpcControlSrv := grpc.NewServer() + control.RegisterControlServiceServer(grpcControlSrv, controlSvc) + + s.runners = append(s.runners, func(ch chan<- error) error { + lis, err := net.Listen("tcp", controlSvcEndpoint) + if err != nil { + return err + } + + go func() { + ch <- grpcControlSrv.Serve(lis) + }() + return nil + }) + + s.registerNoErrCloser(grpcControlSrv.GracefulStop) + return nil +} + +type serverMorphClients struct { + CnrClient *cntClient.Client + FrostFSIDClient *frostfsid.Client + FrostFSClient *frostfsClient.Client + MorphSubnetClient *morphsubnet.Client +} + +func (s *Server) initClientsFromMorph() (*serverMorphClients, error) { + result := &serverMorphClients{} + var err error + + fee := s.feeConfig.SideChainFee() + // do not use TryNotary() in audit wrapper + // audit operations do not require multisignatures + s.auditClient, err = auditClient.NewFromMorph(s.morphClient, s.contracts.audit, fee) + if err != nil { + return nil, err + } + + // form morph container client's options + morphCnrOpts := make([]cntClient.Option, 0, 3) + morphCnrOpts = append(morphCnrOpts, + cntClient.TryNotary(), + cntClient.AsAlphabet(), + ) + + if s.sideNotaryConfig.disabled { + // in non-notary environments we customize fee for named container registration + // because it takes much more additional GAS than other operations. + morphCnrOpts = append(morphCnrOpts, + cntClient.WithCustomFeeForNamedPut(s.feeConfig.NamedContainerRegistrationFee()), + ) + } + + result.CnrClient, err = cntClient.NewFromMorph(s.morphClient, s.contracts.container, fee, morphCnrOpts...) + if err != nil { + return nil, err + } + + s.netmapClient, err = nmClient.NewFromMorph(s.morphClient, s.contracts.netmap, fee, nmClient.TryNotary(), nmClient.AsAlphabet()) + if err != nil { + return nil, err + } + + s.balanceClient, err = balanceClient.NewFromMorph(s.morphClient, s.contracts.balance, fee, balanceClient.TryNotary(), balanceClient.AsAlphabet()) + if err != nil { + return nil, err + } + + result.FrostFSIDClient, err = frostfsid.NewFromMorph(s.morphClient, s.contracts.frostfsID, fee, frostfsid.TryNotary(), frostfsid.AsAlphabet()) + if err != nil { + return nil, err + } + + result.FrostFSClient, err = frostfsClient.NewFromMorph(s.mainnetClient, s.contracts.frostfs, + s.feeConfig.MainChainFee(), frostfsClient.TryNotary(), frostfsClient.AsAlphabet()) + if err != nil { + return nil, err + } + + result.MorphSubnetClient, err = s.createMorphSubnetClient() + if err != nil { + return nil, err + } + + return result, nil +} + +type serverProcessors struct { + AlphabetProcessor *alphabet.Processor + SettlementProcessor *settlement.Processor +} + +func (s *Server) initProcessors(cfg *viper.Viper, morphClients *serverMorphClients) (*serverProcessors, error) { + result := &serverProcessors{} + + fee := s.feeConfig.SideChainFee() + + irf := s.createIRFetcher() + + s.statusIndex = newInnerRingIndexer( + s.morphClient, + irf, + s.key.PublicKey(), + cfg.GetDuration("indexer.cache_timeout"), + ) + + clientCache := newClientCache(&clientCacheParams{ + Log: s.log, + Key: &s.key.PrivateKey, + SGTimeout: cfg.GetDuration("audit.timeout.get"), + HeadTimeout: cfg.GetDuration("audit.timeout.head"), + RangeTimeout: cfg.GetDuration("audit.timeout.rangehash"), + AllowExternal: cfg.GetBool("audit.allow_external"), + }) + + s.registerNoErrCloser(clientCache.cache.CloseAll) + + // create audit processor + auditProcessor, err := s.createAuditProcessor(cfg, clientCache, morphClients.CnrClient) + if err != nil { + return nil, err + } + + result.SettlementProcessor = s.createSettlementProcessor(clientCache, morphClients.CnrClient) + + var alphaSync event.Handler + alphaSync, err = s.createAlphaSync(cfg, morphClients.FrostFSClient, irf) + if err != nil { + return nil, err + } + + err = s.initNetmapProcessor(cfg, morphClients.CnrClient, alphaSync, morphClients.MorphSubnetClient, auditProcessor, result.SettlementProcessor) + if err != nil { + return nil, err + } + + err = s.initContainerProcessor(cfg, morphClients.CnrClient, morphClients.FrostFSIDClient, morphClients.MorphSubnetClient) + if err != nil { + return nil, err + } + + err = s.initBalanceProcessor(cfg, morphClients.FrostFSClient) + if err != nil { + return nil, err + } + + err = s.initFrostFSMainnetProcessor(cfg, morphClients.FrostFSIDClient) + if err != nil { + return nil, err + } + + result.AlphabetProcessor, err = s.initAlphabetProcessor(cfg) + if err != nil { + return nil, err + } + + err = s.initReputationProcessor(cfg, fee) + if err != nil { + return nil, err + } + + return result, nil +} + +func (s *Server) initMorph(ctx context.Context, cfg *viper.Viper, errChan chan<- error) (*chainParams, error) { + fromSideChainBlock, err := s.persistate.UInt32(persistateSideChainLastBlockKey) + if err != nil { + fromSideChainBlock = 0 + s.log.Warn("can't get last processed side chain block number", zap.String("error", err.Error())) + } + + morphChain := &chainParams{ + log: s.log, + cfg: cfg, + key: s.key, + name: morphPrefix, + from: fromSideChainBlock, + } + + // create morph client + s.morphClient, err = createClient(ctx, morphChain, errChan) + if err != nil { + return nil, err + } + + // create morph listener + s.morphListener, err = createListener(ctx, s.morphClient, morphChain) + if err != nil { + return nil, err + } + if err := s.morphClient.SetGroupSignerScope(); err != nil { + morphChain.log.Info("failed to set group signer scope, continue with Global", zap.Error(err)) + } + + return morphChain, nil +} + +func (s *Server) initContracts(cfg *viper.Viper) error { + var err error + // get all script hashes of contracts + s.contracts, err = parseContracts( + cfg, + s.morphClient, + s.withoutMainNet, + s.mainNotaryConfig.disabled, + s.sideNotaryConfig.disabled, + ) + + return err +} + +func (s *Server) initKey(cfg *viper.Viper) error { + // prepare inner ring node private key + acc, err := utilConfig.LoadAccount( + cfg.GetString("wallet.path"), + cfg.GetString("wallet.address"), + cfg.GetString("wallet.password")) + if err != nil { + return fmt.Errorf("ir: %w", err) + } + + s.key = acc.PrivateKey() + return nil +} + +func (s *Server) initMetrics(cfg *viper.Viper) { + if cfg.GetString("prometheus.address") == "" { + return + } + m := metrics.NewInnerRingMetrics() + s.metrics = &m +} diff --git a/pkg/innerring/innerring.go b/pkg/innerring/innerring.go index 8f41c267b..063d0f7cd 100644 --- a/pkg/innerring/innerring.go +++ b/pkg/innerring/innerring.go @@ -2,47 +2,23 @@ package innerring import ( "context" - "encoding/hex" "errors" "fmt" "io" - "net" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/innerring/config" - "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/innerring/processors/alphabet" - "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/innerring/processors/audit" - "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/innerring/processors/balance" - "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/innerring/processors/container" - "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/innerring/processors/frostfs" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/innerring/processors/governance" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/innerring/processors/netmap" - nodevalidator "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/innerring/processors/netmap/nodevalidation" - addrvalidator "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/innerring/processors/netmap/nodevalidation/maddress" - statevalidation "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/innerring/processors/netmap/nodevalidation/state" - subnetvalidator "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/innerring/processors/netmap/nodevalidation/subnet" - "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/innerring/processors/reputation" - "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/innerring/processors/settlement" - auditSettlement "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/innerring/processors/settlement/audit" timerEvent "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/innerring/timers" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/metrics" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/client" auditClient "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/client/audit" balanceClient "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/client/balance" - cntClient "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/client/container" - frostfsClient "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/client/frostfs" - "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/client/frostfsid" nmClient "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/client/netmap" - repClient "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/client/reputation" - morphsubnet "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/client/subnet" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/event" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/subscriber" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/timer" - audittask "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/audit/taskmanager" control "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/control/ir" - controlsrv "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/control/ir/server" - reputationcommon "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/reputation/common" - util2 "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util" - utilConfig "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/config" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/precision" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/state" @@ -50,13 +26,10 @@ import ( "github.com/nspcc-dev/neo-go/pkg/core/transaction" "github.com/nspcc-dev/neo-go/pkg/crypto/keys" "github.com/nspcc-dev/neo-go/pkg/encoding/address" - "github.com/nspcc-dev/neo-go/pkg/encoding/fixedn" "github.com/nspcc-dev/neo-go/pkg/util" - "github.com/panjf2000/ants/v2" "github.com/spf13/viper" "go.uber.org/atomic" "go.uber.org/zap" - "google.golang.org/grpc" ) type ( @@ -354,8 +327,6 @@ func (s *Server) registerStarter(f func() error) { } // New creates instance of inner ring sever structure. -// -// nolint: funlen, gocognit func New(ctx context.Context, log *logger.Logger, cfg *viper.Viper, errChan chan<- error) (*Server, error) { var err error server := &Server{log: log} @@ -365,128 +336,38 @@ func New(ctx context.Context, log *logger.Logger, cfg *viper.Viper, errChan chan // parse notary support server.feeConfig = config.NewFeeConfig(cfg) - // prepare inner ring node private key - acc, err := utilConfig.LoadAccount( - cfg.GetString("wallet.path"), - cfg.GetString("wallet.address"), - cfg.GetString("wallet.password")) + err = server.initKey(cfg) if err != nil { - return nil, fmt.Errorf("ir: %w", err) + return nil, err } - server.key = acc.PrivateKey() - server.persistate, err = initPersistentStateStorage(cfg) if err != nil { return nil, err } server.registerCloser(server.persistate.Close) - fromSideChainBlock, err := server.persistate.UInt32(persistateSideChainLastBlockKey) - if err != nil { - fromSideChainBlock = 0 - log.Warn("can't get last processed side chain block number", zap.String("error", err.Error())) - } - - morphChain := &chainParams{ - log: log, - cfg: cfg, - key: server.key, - name: morphPrefix, - from: fromSideChainBlock, - } - - // create morph client - server.morphClient, err = createClient(ctx, morphChain, errChan) + var morphChain *chainParams + morphChain, err = server.initMorph(ctx, cfg, errChan) if err != nil { return nil, err } - // create morph listener - server.morphListener, err = createListener(ctx, server.morphClient, morphChain) - if err != nil { - return nil, err - } - if err := server.morphClient.SetGroupSignerScope(); err != nil { - morphChain.log.Info("failed to set group signer scope, continue with Global", zap.Error(err)) - } - - server.withoutMainNet = cfg.GetBool("without_mainnet") - - if server.withoutMainNet { - // This works as long as event Listener starts listening loop once, - // otherwise Server.Start will run two similar routines. - // This behavior most likely will not change. - server.mainnetListener = server.morphListener - server.mainnetClient = server.morphClient - } else { - mainnetChain := morphChain - mainnetChain.name = mainnetPrefix - mainnetChain.sgn = &transaction.Signer{Scopes: transaction.CalledByEntry} - - fromMainChainBlock, err := server.persistate.UInt32(persistateMainChainLastBlockKey) - if err != nil { - fromMainChainBlock = 0 - log.Warn("can't get last processed main chain block number", zap.String("error", err.Error())) - } - mainnetChain.from = fromMainChainBlock - - // create mainnet client - server.mainnetClient, err = createClient(ctx, mainnetChain, errChan) - if err != nil { - return nil, err - } - - // create mainnet listener - server.mainnetListener, err = createListener(ctx, server.mainnetClient, mainnetChain) - if err != nil { - return nil, err - } - } - - server.mainNotaryConfig, server.sideNotaryConfig = notaryConfigs( - server.morphClient.ProbeNotary(), - !server.withoutMainNet && server.mainnetClient.ProbeNotary(), // if mainnet disabled then notary flag must be disabled too - ) - - log.Info("notary support", - zap.Bool("sidechain_enabled", !server.sideNotaryConfig.disabled), - zap.Bool("mainchain_enabled", !server.mainNotaryConfig.disabled), - ) - - // get all script hashes of contracts - server.contracts, err = parseContracts( - cfg, - server.morphClient, - server.withoutMainNet, - server.mainNotaryConfig.disabled, - server.sideNotaryConfig.disabled, - ) + err = server.initMainnet(ctx, cfg, morphChain, errChan) if err != nil { return nil, err } - if !server.sideNotaryConfig.disabled { - // enable notary support in the side client - err = server.morphClient.EnableNotarySupport( - client.WithProxyContract(server.contracts.proxy), - ) - if err != nil { - return nil, fmt.Errorf("could not enable side chain notary support: %w", err) - } + server.initNotaryConfig(cfg) - server.morphListener.EnableNotarySupport(server.contracts.proxy, server.morphClient.Committee, server.morphClient) + err = server.initContracts(cfg) + if err != nil { + return nil, err } - if !server.mainNotaryConfig.disabled { - // enable notary support in the main client - err = server.mainnetClient.EnableNotarySupport( - client.WithProxyContract(server.contracts.processing), - client.WithAlphabetSource(server.morphClient.Committee), - ) - if err != nil { - return nil, fmt.Errorf("could not enable main chain notary support: %w", err) - } + err = server.enableNotarySupport() + if err != nil { + return nil, err } // parse default validators @@ -497,482 +378,30 @@ func New(ctx context.Context, log *logger.Logger, cfg *viper.Viper, errChan chan server.pubKey = server.key.PublicKey().Bytes() - auditPool, err := ants.NewPool(cfg.GetInt("audit.task.exec_pool_size")) + var morphClients *serverMorphClients + morphClients, err = server.initClientsFromMorph() if err != nil { return nil, err } - fee := server.feeConfig.SideChainFee() - - // do not use TryNotary() in audit wrapper - // audit operations do not require multisignatures - server.auditClient, err = auditClient.NewFromMorph(server.morphClient, server.contracts.audit, fee) + var processors *serverProcessors + processors, err = server.initProcessors(cfg, morphClients) if err != nil { return nil, err } - // form morph container client's options - morphCnrOpts := make([]cntClient.Option, 0, 3) - morphCnrOpts = append(morphCnrOpts, - cntClient.TryNotary(), - cntClient.AsAlphabet(), - ) + server.initTimers(cfg, processors, morphClients) - if server.sideNotaryConfig.disabled { - // in non-notary environments we customize fee for named container registration - // because it takes much more additional GAS than other operations. - morphCnrOpts = append(morphCnrOpts, - cntClient.WithCustomFeeForNamedPut(server.feeConfig.NamedContainerRegistrationFee()), - ) - } - - cnrClient, err := cntClient.NewFromMorph(server.morphClient, server.contracts.container, fee, morphCnrOpts...) + err = server.initGRPCServer(cfg) if err != nil { return nil, err } - server.netmapClient, err = nmClient.NewFromMorph(server.morphClient, server.contracts.netmap, fee, nmClient.TryNotary(), nmClient.AsAlphabet()) - if err != nil { - return nil, err - } - - server.balanceClient, err = balanceClient.NewFromMorph(server.morphClient, server.contracts.balance, fee, balanceClient.TryNotary(), balanceClient.AsAlphabet()) - if err != nil { - return nil, err - } - - repClient, err := repClient.NewFromMorph(server.morphClient, server.contracts.reputation, fee, repClient.TryNotary(), repClient.AsAlphabet()) - if err != nil { - return nil, err - } - - frostfsIDClient, err := frostfsid.NewFromMorph(server.morphClient, server.contracts.frostfsID, fee, frostfsid.TryNotary(), frostfsid.AsAlphabet()) - if err != nil { - return nil, err - } - - frostfsCli, err := frostfsClient.NewFromMorph(server.mainnetClient, server.contracts.frostfs, - server.feeConfig.MainChainFee(), frostfsClient.TryNotary(), frostfsClient.AsAlphabet()) - if err != nil { - return nil, err - } - - // initialize morph client of Subnet contract - clientMode := morphsubnet.NotaryAlphabet - - if server.sideNotaryConfig.disabled { - clientMode = morphsubnet.NonNotary - } - - subnetInitPrm := morphsubnet.InitPrm{} - subnetInitPrm.SetBaseClient(server.morphClient) - subnetInitPrm.SetContractAddress(server.contracts.subnet) - subnetInitPrm.SetMode(clientMode) - - subnetClient := &morphsubnet.Client{} - err = subnetClient.Init(subnetInitPrm) - if err != nil { - return nil, fmt.Errorf("could not initialize subnet client: %w", err) - } - - var irf irFetcher - - if server.withoutMainNet || !server.mainNotaryConfig.disabled { - // if mainchain is disabled we should use NeoFSAlphabetList client method according to its docs - // (naming `...WithNotary` will not always be correct) - irf = NewIRFetcherWithNotary(server.morphClient) - } else { - irf = NewIRFetcherWithoutNotary(server.netmapClient) - } - - server.statusIndex = newInnerRingIndexer( - server.morphClient, - irf, - server.key.PublicKey(), - cfg.GetDuration("indexer.cache_timeout"), - ) - - clientCache := newClientCache(&clientCacheParams{ - Log: log, - Key: &server.key.PrivateKey, - SGTimeout: cfg.GetDuration("audit.timeout.get"), - HeadTimeout: cfg.GetDuration("audit.timeout.head"), - RangeTimeout: cfg.GetDuration("audit.timeout.rangehash"), - AllowExternal: cfg.GetBool("audit.allow_external"), - }) - - server.registerNoErrCloser(clientCache.cache.CloseAll) - - pdpPoolSize := cfg.GetInt("audit.pdp.pairs_pool_size") - porPoolSize := cfg.GetInt("audit.por.pool_size") - - // create audit processor dependencies - auditTaskManager := audittask.New( - audittask.WithQueueCapacity(cfg.GetUint32("audit.task.queue_capacity")), - audittask.WithWorkerPool(auditPool), - audittask.WithLogger(log), - audittask.WithContainerCommunicator(clientCache), - audittask.WithMaxPDPSleepInterval(cfg.GetDuration("audit.pdp.max_sleep_interval")), - audittask.WithPDPWorkerPoolGenerator(func() (util2.WorkerPool, error) { - return ants.NewPool(pdpPoolSize) - }), - audittask.WithPoRWorkerPoolGenerator(func() (util2.WorkerPool, error) { - return ants.NewPool(porPoolSize) - }), - ) - - server.workers = append(server.workers, auditTaskManager.Listen) - - // create audit processor - auditProcessor, err := audit.New(&audit.Params{ - Log: log, - NetmapClient: server.netmapClient, - ContainerClient: cnrClient, - IRList: server, - EpochSource: server, - SGSource: clientCache, - Key: &server.key.PrivateKey, - RPCSearchTimeout: cfg.GetDuration("audit.timeout.search"), - TaskManager: auditTaskManager, - Reporter: server, - }) - if err != nil { - return nil, err - } - - // create settlement processor dependencies - settlementDeps := settlementDeps{ - log: server.log, - cnrSrc: cntClient.AsContainerSource(cnrClient), - auditClient: server.auditClient, - nmClient: server.netmapClient, - clientCache: clientCache, - balanceClient: server.balanceClient, - } - - settlementDeps.settlementCtx = auditSettlementContext - auditCalcDeps := &auditSettlementDeps{ - settlementDeps: settlementDeps, - } - - settlementDeps.settlementCtx = basicIncomeSettlementContext - basicSettlementDeps := &basicIncomeSettlementDeps{ - settlementDeps: settlementDeps, - cnrClient: cnrClient, - } - - auditSettlementCalc := auditSettlement.NewCalculator( - &auditSettlement.CalculatorPrm{ - ResultStorage: auditCalcDeps, - ContainerStorage: auditCalcDeps, - PlacementCalculator: auditCalcDeps, - SGStorage: auditCalcDeps, - AccountStorage: auditCalcDeps, - Exchanger: auditCalcDeps, - AuditFeeFetcher: server.netmapClient, - }, - auditSettlement.WithLogger(server.log), - ) - - // create settlement processor - settlementProcessor := settlement.New( - settlement.Prm{ - AuditProcessor: (*auditSettlementCalculator)(auditSettlementCalc), - BasicIncome: &basicSettlementConstructor{dep: basicSettlementDeps}, - State: server, - }, - settlement.WithLogger(server.log), - ) - - locodeValidator, err := server.newLocodeValidator(cfg) - if err != nil { - return nil, err - } - - subnetValidator, err := subnetvalidator.New( - subnetvalidator.Prm{ - SubnetClient: subnetClient, - }, - ) - if err != nil { - return nil, err - } - - var alphaSync event.Handler - - if server.withoutMainNet || cfg.GetBool("governance.disable") { - alphaSync = func(event.Event) { - log.Debug("alphabet keys sync is disabled") - } - } else { - // create governance processor - governanceProcessor, err := governance.New(&governance.Params{ - Log: log, - FrostFSClient: frostfsCli, - NetmapClient: server.netmapClient, - AlphabetState: server, - EpochState: server, - Voter: server, - IRFetcher: irf, - MorphClient: server.morphClient, - MainnetClient: server.mainnetClient, - NotaryDisabled: server.sideNotaryConfig.disabled, - }) - if err != nil { - return nil, err - } - - alphaSync = governanceProcessor.HandleAlphabetSync - err = bindMainnetProcessor(governanceProcessor, server) - if err != nil { - return nil, err - } - } - - netSettings := (*networkSettings)(server.netmapClient) - - var netMapCandidateStateValidator statevalidation.NetMapCandidateValidator - netMapCandidateStateValidator.SetNetworkSettings(netSettings) - - // create netmap processor - server.netmapProcessor, err = netmap.New(&netmap.Params{ - Log: log, - PoolSize: cfg.GetInt("workers.netmap"), - NetmapClient: server.netmapClient, - EpochTimer: server, - EpochState: server, - AlphabetState: server, - CleanupEnabled: cfg.GetBool("netmap_cleaner.enabled"), - CleanupThreshold: cfg.GetUint64("netmap_cleaner.threshold"), - ContainerWrapper: cnrClient, - HandleAudit: server.onlyActiveEventHandler( - auditProcessor.StartAuditHandler(), - ), - NotaryDepositHandler: server.onlyAlphabetEventHandler( - server.notaryHandler, - ), - AuditSettlementsHandler: server.onlyAlphabetEventHandler( - settlementProcessor.HandleAuditEvent, - ), - AlphabetSyncHandler: alphaSync, - NodeValidator: nodevalidator.New( - &netMapCandidateStateValidator, - addrvalidator.New(), - locodeValidator, - subnetValidator, - ), - NotaryDisabled: server.sideNotaryConfig.disabled, - SubnetContract: &server.contracts.subnet, - - NodeStateSettings: netSettings, - }) - if err != nil { - return nil, err - } - - err = bindMorphProcessor(server.netmapProcessor, server) - if err != nil { - return nil, err - } - - // container processor - containerProcessor, err := container.New(&container.Params{ - Log: log, - PoolSize: cfg.GetInt("workers.container"), - AlphabetState: server, - ContainerClient: cnrClient, - FrostFSIDClient: frostfsIDClient, - NetworkState: server.netmapClient, - NotaryDisabled: server.sideNotaryConfig.disabled, - SubnetClient: subnetClient, - }) - if err != nil { - return nil, err - } - - err = bindMorphProcessor(containerProcessor, server) - if err != nil { - return nil, err - } - - // create balance processor - balanceProcessor, err := balance.New(&balance.Params{ - Log: log, - PoolSize: cfg.GetInt("workers.balance"), - FrostFSClient: frostfsCli, - BalanceSC: server.contracts.balance, - AlphabetState: server, - Converter: &server.precision, - }) - if err != nil { - return nil, err - } - - err = bindMorphProcessor(balanceProcessor, server) - if err != nil { - return nil, err - } - - if !server.withoutMainNet { - // create mainnnet frostfs processor - frostfsProcessor, err := frostfs.New(&frostfs.Params{ - Log: log, - PoolSize: cfg.GetInt("workers.frostfs"), - FrostFSContract: server.contracts.frostfs, - FrostFSIDClient: frostfsIDClient, - BalanceClient: server.balanceClient, - NetmapClient: server.netmapClient, - MorphClient: server.morphClient, - EpochState: server, - AlphabetState: server, - Converter: &server.precision, - MintEmitCacheSize: cfg.GetInt("emit.mint.cache_size"), - MintEmitThreshold: cfg.GetUint64("emit.mint.threshold"), - MintEmitValue: fixedn.Fixed8(cfg.GetInt64("emit.mint.value")), - GasBalanceThreshold: cfg.GetInt64("emit.gas.balance_threshold"), - }) - if err != nil { - return nil, err - } - - err = bindMainnetProcessor(frostfsProcessor, server) - if err != nil { - return nil, err - } - } - - parsedWallets, err := parseWalletAddressesFromStrings(cfg.GetStringSlice("emit.extra_wallets")) - if err != nil { - return nil, err - } - - // create alphabet processor - alphabetProcessor, err := alphabet.New(&alphabet.Params{ - ParsedWallets: parsedWallets, - Log: log, - PoolSize: cfg.GetInt("workers.alphabet"), - AlphabetContracts: server.contracts.alphabet, - NetmapClient: server.netmapClient, - MorphClient: server.morphClient, - IRList: server, - StorageEmission: cfg.GetUint64("emit.storage.amount"), - }) - if err != nil { - return nil, err - } - - err = bindMorphProcessor(alphabetProcessor, server) - if err != nil { - return nil, err - } - - // create reputation processor - reputationProcessor, err := reputation.New(&reputation.Params{ - Log: log, - PoolSize: cfg.GetInt("workers.reputation"), - EpochState: server, - AlphabetState: server, - ReputationWrapper: repClient, - ManagerBuilder: reputationcommon.NewManagerBuilder( - reputationcommon.ManagersPrm{ - NetMapSource: server.netmapClient, - }, - ), - NotaryDisabled: server.sideNotaryConfig.disabled, - }) - if err != nil { - return nil, err - } - - err = bindMorphProcessor(reputationProcessor, server) - if err != nil { - return nil, err - } - - // initialize epoch timers - server.epochTimer = newEpochTimer(&epochTimerArgs{ - l: server.log, - newEpochHandlers: server.newEpochTickHandlers(), - cnrWrapper: cnrClient, - epoch: server, - stopEstimationDMul: cfg.GetUint32("timers.stop_estimation.mul"), - stopEstimationDDiv: cfg.GetUint32("timers.stop_estimation.div"), - collectBasicIncome: subEpochEventHandler{ - handler: settlementProcessor.HandleIncomeCollectionEvent, - durationMul: cfg.GetUint32("timers.collect_basic_income.mul"), - durationDiv: cfg.GetUint32("timers.collect_basic_income.div"), - }, - distributeBasicIncome: subEpochEventHandler{ - handler: settlementProcessor.HandleIncomeDistributionEvent, - durationMul: cfg.GetUint32("timers.distribute_basic_income.mul"), - durationDiv: cfg.GetUint32("timers.distribute_basic_income.div"), - }, - }) - - server.addBlockTimer(server.epochTimer) - - // initialize emission timer - emissionTimer := newEmissionTimer(&emitTimerArgs{ - ap: alphabetProcessor, - emitDuration: cfg.GetUint32("timers.emit"), - }) - - server.addBlockTimer(emissionTimer) - - controlSvcEndpoint := cfg.GetString("control.grpc.endpoint") - if controlSvcEndpoint != "" { - authKeysStr := cfg.GetStringSlice("control.authorized_keys") - authKeys := make([][]byte, 0, len(authKeysStr)) - - for i := range authKeysStr { - key, err := hex.DecodeString(authKeysStr[i]) - if err != nil { - return nil, fmt.Errorf("could not parse Control authorized key %s: %w", - authKeysStr[i], - err, - ) - } - - authKeys = append(authKeys, key) - } - - var p controlsrv.Prm - - p.SetPrivateKey(*server.key) - p.SetHealthChecker(server) - - controlSvc := controlsrv.New(p, - controlsrv.WithAllowedKeys(authKeys), - ) - - grpcControlSrv := grpc.NewServer() - control.RegisterControlServiceServer(grpcControlSrv, controlSvc) - - server.runners = append(server.runners, func(ch chan<- error) error { - lis, err := net.Listen("tcp", controlSvcEndpoint) - if err != nil { - return err - } - - go func() { - ch <- grpcControlSrv.Serve(lis) - }() - return nil - }) - - server.registerNoErrCloser(grpcControlSrv.GracefulStop) - } else { - log.Info("no Control server endpoint specified, service is disabled") - } - server.initSubnet(subnetConfig{ queueSize: cfg.GetUint32("workers.subnet"), }) - if cfg.GetString("prometheus.address") != "" { - m := metrics.NewInnerRingMetrics() - server.metrics = &m - } + server.initMetrics(cfg) return server, nil } -- 2.45.3 From 4c0c962b291789ab53fca99147dc6eaa5e040098 Mon Sep 17 00:00:00 2001 From: Dmitrii Stepanov Date: Wed, 29 Mar 2023 17:22:34 +0300 Subject: [PATCH 4/8] [#185] ir: Refactor gas emit Resolve funlen linter for processEmit method Signed-off-by: Dmitrii Stepanov --- pkg/innerring/processors/alphabet/process_emit.go | 12 ++++++++++-- 1 file changed, 10 insertions(+), 2 deletions(-) diff --git a/pkg/innerring/processors/alphabet/process_emit.go b/pkg/innerring/processors/alphabet/process_emit.go index 10dcf079d..90c484b88 100644 --- a/pkg/innerring/processors/alphabet/process_emit.go +++ b/pkg/innerring/processors/alphabet/process_emit.go @@ -3,6 +3,7 @@ package alphabet import ( "crypto/elliptic" + "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/client/netmap" "github.com/nspcc-dev/neo-go/pkg/crypto/keys" "github.com/nspcc-dev/neo-go/pkg/encoding/fixedn" "go.uber.org/zap" @@ -10,7 +11,6 @@ import ( const emitMethod = "emit" -// nolint: funlen func (ap *Processor) processEmit() { index := ap.irList.AlphabetIndex() if index < 0 { @@ -63,6 +63,12 @@ func (ap *Processor) processEmit() { gasPerNode := fixedn.Fixed8(ap.storageEmission / uint64(nmLen+extraLen)) + ap.transferGasToNetmapNodes(nmNodes, gasPerNode) + + ap.transferGasToExtraNodes(extraLen, gasPerNode) +} + +func (ap *Processor) transferGasToNetmapNodes(nmNodes []netmap.NodeInfo, gasPerNode fixedn.Fixed8) { for i := range nmNodes { keyBytes := nmNodes[i].PublicKey() @@ -83,9 +89,11 @@ func (ap *Processor) processEmit() { ) } } +} +func (ap *Processor) transferGasToExtraNodes(extraLen int, gasPerNode fixedn.Fixed8) { if extraLen != 0 { - err = ap.morphClient.BatchTransferGas(ap.parsedWallets, gasPerNode) + err := ap.morphClient.BatchTransferGas(ap.parsedWallets, gasPerNode) if err != nil { receiversLog := make([]string, extraLen) for i, addr := range ap.parsedWallets { -- 2.45.3 From c51388fabc99e600fec2d94f11d9f8a2d5995da4 Mon Sep 17 00:00:00 2001 From: Dmitrii Stepanov Date: Wed, 29 Mar 2023 17:33:49 +0300 Subject: [PATCH 5/8] [#185] ir: Refactor audit Resolve funlen linter for processStartAudit method Signed-off-by: Dmitrii Stepanov --- pkg/innerring/processors/audit/process.go | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/pkg/innerring/processors/audit/process.go b/pkg/innerring/processors/audit/process.go index 3bdb00108..7f148e57e 100644 --- a/pkg/innerring/processors/audit/process.go +++ b/pkg/innerring/processors/audit/process.go @@ -17,7 +17,6 @@ import ( "go.uber.org/zap" ) -// nolint: funlen func (ap *Processor) processStartAudit(epoch uint64) { log := ap.log.With(zap.Uint64("epoch", epoch)) @@ -52,6 +51,10 @@ func (ap *Processor) processStartAudit(epoch uint64) { pivot := make([]byte, sha256.Size) + ap.startAuditTasksOnContainers(auditCtx, containers, log, pivot, nm, epoch) +} + +func (ap *Processor) startAuditTasksOnContainers(ctx context.Context, containers []cid.ID, log *zap.Logger, pivot []byte, nm *netmap.NetMap, epoch uint64) { for i := range containers { cnr, err := cntClient.Get(ap.containerClient, containers[i]) // get container structure if err != nil { @@ -104,7 +107,7 @@ func (ap *Processor) processStartAudit(epoch uint64) { epoch: epoch, rep: ap.reporter, }). - WithAuditContext(auditCtx). + WithAuditContext(ctx). WithContainerID(containers[i]). WithStorageGroupList(storageGroups). WithContainerStructure(cnr.Value). -- 2.45.3 From dc42b0c18baf9e44b1caddd9feddb98daad4fa14 Mon Sep 17 00:00:00 2001 From: Dmitrii Stepanov Date: Thu, 30 Mar 2023 10:35:16 +0300 Subject: [PATCH 6/8] [#185] ir: Refactor alphabet update Resolve funlen linter for processAlphabetSync method Signed-off-by: Dmitrii Stepanov --- .../processors/governance/process_update.go | 156 ++++++++++-------- 1 file changed, 85 insertions(+), 71 deletions(-) diff --git a/pkg/innerring/processors/governance/process_update.go b/pkg/innerring/processors/governance/process_update.go index c9764fee4..3504e7a53 100644 --- a/pkg/innerring/processors/governance/process_update.go +++ b/pkg/innerring/processors/governance/process_update.go @@ -18,7 +18,6 @@ const ( alphabetUpdateIDPrefix = "AlphabetUpdate" ) -// nolint: funlen func (gp *Processor) processAlphabetSync(txHash util.Uint256) { if !gp.alphabetState.IsAlphabet() { gp.log.Info("non alphabet mode, ignore alphabet sync") @@ -69,79 +68,13 @@ func (gp *Processor) processAlphabetSync(txHash util.Uint256) { } // 2. Update NeoFSAlphabet role in the sidechain. - innerRing, err := gp.irFetcher.InnerRingKeys() - if err != nil { - gp.log.Error("can't fetch inner ring list from side chain", - zap.String("error", err.Error())) - } else { - newInnerRing, err := updateInnerRing(innerRing, sidechainAlphabet, newAlphabet) - if err != nil { - gp.log.Error("can't create new inner ring list with new alphabet keys", - zap.String("error", err.Error())) - } else { - sort.Sort(newInnerRing) + gp.updateNeoFSAlphabetRoleInSidechain(sidechainAlphabet, newAlphabet, txHash) - gp.log.Info("update of the inner ring list", - zap.String("before", prettyKeys(innerRing)), - zap.String("after", prettyKeys(newInnerRing)), - ) - - if gp.notaryDisabled { - updPrm := nmClient.UpdateIRPrm{} - - updPrm.SetKeys(newInnerRing) - updPrm.SetHash(txHash) - - err = gp.netmapClient.UpdateInnerRing(updPrm) - } else { - updPrm := client.UpdateAlphabetListPrm{} - - updPrm.SetList(newInnerRing) - updPrm.SetHash(txHash) - - err = gp.morphClient.UpdateNeoFSAlphabetList(updPrm) - } - - if err != nil { - gp.log.Error("can't update inner ring list with new alphabet keys", - zap.String("error", err.Error())) - } - } - } - - if !gp.notaryDisabled { - // 3. Update notary role in the sidechain. - - updPrm := client.UpdateNotaryListPrm{} - - updPrm.SetList(newAlphabet) - updPrm.SetHash(txHash) - - err = gp.morphClient.UpdateNotaryList(updPrm) - if err != nil { - gp.log.Error("can't update list of notary nodes in side chain", - zap.String("error", err.Error())) - } - } + // 3. Update notary role in the sidechain. + gp.updateNotaryRoleInSidechain(newAlphabet, txHash) // 4. Update FrostFS contract in the mainnet. - epoch := gp.epochState.EpochCounter() - - buf := make([]byte, 8) - binary.LittleEndian.PutUint64(buf, epoch) - - id := append([]byte(alphabetUpdateIDPrefix), buf...) - - prm := frostfscontract.AlphabetUpdatePrm{} - - prm.SetID(id) - prm.SetPubs(newAlphabet) - - err = gp.frostfsClient.AlphabetUpdate(prm) - if err != nil { - gp.log.Error("can't update list of alphabet nodes in frostfs contract", - zap.String("error", err.Error())) - } + gp.updateFrostFSContractInMainnet(newAlphabet) gp.log.Info("finished alphabet list update") } @@ -157,3 +90,84 @@ func prettyKeys(keys keys.PublicKeys) string { return strings.TrimRight(sb.String(), delimiter) } + +func (gp *Processor) updateNeoFSAlphabetRoleInSidechain(sidechainAlphabet, newAlphabet keys.PublicKeys, txHash util.Uint256) { + innerRing, err := gp.irFetcher.InnerRingKeys() + if err != nil { + gp.log.Error("can't fetch inner ring list from side chain", + zap.String("error", err.Error())) + return + } + + newInnerRing, err := updateInnerRing(innerRing, sidechainAlphabet, newAlphabet) + if err != nil { + gp.log.Error("can't create new inner ring list with new alphabet keys", + zap.String("error", err.Error())) + return + } + + sort.Sort(newInnerRing) + + gp.log.Info("update of the inner ring list", + zap.String("before", prettyKeys(innerRing)), + zap.String("after", prettyKeys(newInnerRing)), + ) + + if gp.notaryDisabled { + updPrm := nmClient.UpdateIRPrm{} + + updPrm.SetKeys(newInnerRing) + updPrm.SetHash(txHash) + + err = gp.netmapClient.UpdateInnerRing(updPrm) + } else { + updPrm := client.UpdateAlphabetListPrm{} + + updPrm.SetList(newInnerRing) + updPrm.SetHash(txHash) + + err = gp.morphClient.UpdateNeoFSAlphabetList(updPrm) + } + + if err != nil { + gp.log.Error("can't update inner ring list with new alphabet keys", + zap.String("error", err.Error())) + } +} + +func (gp *Processor) updateNotaryRoleInSidechain(newAlphabet keys.PublicKeys, txHash util.Uint256) { + if gp.notaryDisabled { + return + } + + updPrm := client.UpdateNotaryListPrm{} + + updPrm.SetList(newAlphabet) + updPrm.SetHash(txHash) + + err := gp.morphClient.UpdateNotaryList(updPrm) + if err != nil { + gp.log.Error("can't update list of notary nodes in side chain", + zap.String("error", err.Error())) + } +} + +func (gp *Processor) updateFrostFSContractInMainnet(newAlphabet keys.PublicKeys) { + epoch := gp.epochState.EpochCounter() + + buf := make([]byte, 8) + binary.LittleEndian.PutUint64(buf, epoch) + + id := append([]byte(alphabetUpdateIDPrefix), buf...) + + prm := frostfscontract.AlphabetUpdatePrm{} + + prm.SetID(id) + prm.SetPubs(newAlphabet) + + err := gp.frostfsClient.AlphabetUpdate(prm) + if err != nil { + gp.log.Error("can't update list of alphabet nodes in frostfs contract", + zap.String("error", err.Error())) + } +} -- 2.45.3 From b38858c8a652facce7b9ce98f76fb05e4a280406 Mon Sep 17 00:00:00 2001 From: Dmitrii Stepanov Date: Thu, 30 Mar 2023 10:46:43 +0300 Subject: [PATCH 7/8] [#185] ir: Refactor signature verification Resolve funlen linter for verifySignature method Signed-off-by: Dmitrii Stepanov --- pkg/innerring/processors/container/common.go | 84 ++++++++++---------- 1 file changed, 43 insertions(+), 41 deletions(-) diff --git a/pkg/innerring/processors/container/common.go b/pkg/innerring/processors/container/common.go index 56adc0ceb..375e4c179 100644 --- a/pkg/innerring/processors/container/common.go +++ b/pkg/innerring/processors/container/common.go @@ -46,8 +46,6 @@ type signatureVerificationData struct { // - v.binPublicKey is a public session key // - session context corresponds to the container and verb in v // - session is "alive" -// -// nolint: funlen func (cp *Processor) verifySignature(v signatureVerificationData) error { var err error var key frostfsecdsa.PublicKeyRFC6979 @@ -61,45 +59,7 @@ func (cp *Processor) verifySignature(v signatureVerificationData) error { } if len(v.binTokenSession) > 0 { - var tok session.Container - - err = tok.Unmarshal(v.binTokenSession) - if err != nil { - return fmt.Errorf("decode session token: %w", err) - } - - if !tok.VerifySignature() { - return errors.New("invalid session token signature") - } - - // FIXME(@cthulhu-rider): #1387 check token is signed by container owner, see neofs-sdk-go#233 - - if keyProvided && !tok.AssertAuthKey(&key) { - return errors.New("signed with a non-session key") - } - - if !tok.AssertVerb(v.verb) { - return errWrongSessionVerb - } - - if v.idContainerSet && !tok.AppliedTo(v.idContainer) { - return errWrongCID - } - - if !session.IssuedBy(tok, v.ownerContainer) { - return errors.New("owner differs with token owner") - } - - err = cp.checkTokenLifetime(tok) - if err != nil { - return fmt.Errorf("check session lifetime: %w", err) - } - - if !tok.VerifySessionDataSignature(v.signedData, v.signature) { - return errors.New("invalid signature calculated with session key") - } - - return nil + return cp.verifyByTokenSession(v, &key, keyProvided) } if keyProvided { @@ -145,3 +105,45 @@ func (cp *Processor) checkTokenLifetime(token session.Container) error { return nil } + +func (cp *Processor) verifyByTokenSession(v signatureVerificationData, key *frostfsecdsa.PublicKeyRFC6979, keyProvided bool) error { + var tok session.Container + + err := tok.Unmarshal(v.binTokenSession) + if err != nil { + return fmt.Errorf("decode session token: %w", err) + } + + if !tok.VerifySignature() { + return errors.New("invalid session token signature") + } + + // FIXME(@cthulhu-rider): #1387 check token is signed by container owner, see neofs-sdk-go#233 + + if keyProvided && !tok.AssertAuthKey(key) { + return errors.New("signed with a non-session key") + } + + if !tok.AssertVerb(v.verb) { + return errWrongSessionVerb + } + + if v.idContainerSet && !tok.AppliedTo(v.idContainer) { + return errWrongCID + } + + if !session.IssuedBy(tok, v.ownerContainer) { + return errors.New("owner differs with token owner") + } + + err = cp.checkTokenLifetime(tok) + if err != nil { + return fmt.Errorf("check session lifetime: %w", err) + } + + if !tok.VerifySessionDataSignature(v.signedData, v.signature) { + return errors.New("invalid signature calculated with session key") + } + + return nil +} -- 2.45.3 From d161f83613d121f368ec649ed83fe0a6161548a0 Mon Sep 17 00:00:00 2001 From: Dmitrii Stepanov Date: Thu, 30 Mar 2023 11:03:00 +0300 Subject: [PATCH 8/8] [#185] ir: Resolve containedctx linter Signed-off-by: Dmitrii Stepanov --- pkg/innerring/internal/client/client.go | 27 ++++++++++--------------- pkg/innerring/internal/client/prm.go | 13 ------------ pkg/innerring/rpc.go | 8 +++----- 3 files changed, 14 insertions(+), 34 deletions(-) diff --git a/pkg/innerring/internal/client/client.go b/pkg/innerring/internal/client/client.go index edcbc6ae5..3e95e9766 100644 --- a/pkg/innerring/internal/client/client.go +++ b/pkg/innerring/internal/client/client.go @@ -35,8 +35,6 @@ func (x *Client) SetPrivateKey(key *ecdsa.PrivateKey) { // SearchSGPrm groups parameters of SearchSG operation. type SearchSGPrm struct { - contextPrm - cnrID cid.ID } @@ -60,13 +58,13 @@ var sgFilter = storagegroup.SearchQuery() // SearchSG lists objects of storage group type in the container. // // Returns any error which prevented the operation from completing correctly in error return. -func (x Client) SearchSG(prm SearchSGPrm) (*SearchSGRes, error) { +func (x Client) SearchSG(ctx context.Context, prm SearchSGPrm) (*SearchSGRes, error) { var cliPrm client.PrmObjectSearch cliPrm.InContainer(prm.cnrID) cliPrm.SetFilters(sgFilter) cliPrm.UseKey(*x.key) - rdr, err := x.c.ObjectSearchInit(prm.ctx, cliPrm) + rdr, err := x.c.ObjectSearchInit(ctx, cliPrm) if err != nil { return nil, fmt.Errorf("init object search: %w", err) } @@ -119,13 +117,13 @@ func (x GetObjectRes) Object() *object.Object { // GetObject reads the object by address. // // Returns any error which prevented the operation from completing correctly in error return. -func (x Client) GetObject(prm GetObjectPrm) (*GetObjectRes, error) { +func (x Client) GetObject(ctx context.Context, prm GetObjectPrm) (*GetObjectRes, error) { var cliPrm client.PrmObjectGet cliPrm.FromContainer(prm.objAddr.Container()) cliPrm.ByID(prm.objAddr.Object()) cliPrm.UseKey(*x.key) - rdr, err := x.c.ObjectGetInit(prm.ctx, cliPrm) + rdr, err := x.c.ObjectGetInit(ctx, cliPrm) if err != nil { return nil, fmt.Errorf("init object search: %w", err) } @@ -189,7 +187,7 @@ func (x HeadObjectRes) Header() *object.Object { // // Returns any error which prevented the operation from completing correctly in error return. // For raw requests, returns *object.SplitInfoError error if the requested object is virtual. -func (x Client) HeadObject(prm HeadObjectPrm) (*HeadObjectRes, error) { +func (x Client) HeadObject(ctx context.Context, prm HeadObjectPrm) (*HeadObjectRes, error) { var cliPrm client.PrmObjectHead if prm.raw { @@ -204,7 +202,7 @@ func (x Client) HeadObject(prm HeadObjectPrm) (*HeadObjectRes, error) { cliPrm.ByID(prm.objAddr.Object()) cliPrm.UseKey(*x.key) - cliRes, err := x.c.ObjectHead(prm.ctx, cliPrm) + cliRes, err := x.c.ObjectHead(ctx, cliPrm) if err == nil { // pull out an error from status err = apistatus.ErrFromStatus(cliRes.Status()) @@ -231,10 +229,9 @@ func (x Client) HeadObject(prm HeadObjectPrm) (*HeadObjectRes, error) { func GetObjectPayload(ctx context.Context, c Client, addr oid.Address) ([]byte, error) { var prm GetObjectPrm - prm.SetContext(ctx) prm.SetAddress(addr) - obj, err := c.GetObject(prm) + obj, err := c.GetObject(ctx, prm) if err != nil { return nil, err } @@ -245,7 +242,6 @@ func GetObjectPayload(ctx context.Context, c Client, addr oid.Address) ([]byte, func headObject(ctx context.Context, c Client, addr oid.Address, raw bool, ttl uint32) (*object.Object, error) { var prm HeadObjectPrm - prm.SetContext(ctx) prm.SetAddress(addr) prm.SetTTL(ttl) @@ -253,7 +249,7 @@ func headObject(ctx context.Context, c Client, addr oid.Address, raw bool, ttl u prm.SetRawFlag() } - obj, err := c.HeadObject(prm) + obj, err := c.HeadObject(ctx, prm) if err != nil { return nil, err } @@ -298,14 +294,14 @@ func (x HashPayloadRangeRes) Hash() []byte { // from the remote server's local storage. // // Returns any error which prevented the operation from completing correctly in error return. -func (x Client) HashPayloadRange(prm HashPayloadRangePrm) (res HashPayloadRangeRes, err error) { +func (x Client) HashPayloadRange(ctx context.Context, prm HashPayloadRangePrm) (res HashPayloadRangeRes, err error) { var cliPrm client.PrmObjectHash cliPrm.FromContainer(prm.objAddr.Container()) cliPrm.ByID(prm.objAddr.Object()) cliPrm.SetRangeList(prm.rng.GetOffset(), prm.rng.GetLength()) cliPrm.TillichZemorAlgo() - cliRes, err := x.c.ObjectHash(prm.ctx, cliPrm) + cliRes, err := x.c.ObjectHash(ctx, cliPrm) if err == nil { // pull out an error from status err = apistatus.ErrFromStatus(cliRes.Status()) @@ -331,11 +327,10 @@ func (x Client) HashPayloadRange(prm HashPayloadRangePrm) (res HashPayloadRangeR func HashObjectRange(ctx context.Context, c Client, addr oid.Address, rng *object.Range) ([]byte, error) { var prm HashPayloadRangePrm - prm.SetContext(ctx) prm.SetAddress(addr) prm.SetRange(rng) - res, err := c.HashPayloadRange(prm) + res, err := c.HashPayloadRange(ctx, prm) if err != nil { return nil, err } diff --git a/pkg/innerring/internal/client/prm.go b/pkg/innerring/internal/client/prm.go index 5498eb379..9b5872434 100644 --- a/pkg/innerring/internal/client/prm.go +++ b/pkg/innerring/internal/client/prm.go @@ -1,21 +1,9 @@ package frostfsapiclient import ( - "context" - oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id" ) -// nolint: containedctx -type contextPrm struct { - ctx context.Context -} - -// SetContext sets context.Context used for network communication. -func (x *contextPrm) SetContext(ctx context.Context) { - x.ctx = ctx -} - type objectAddressPrm struct { objAddr oid.Address } @@ -26,6 +14,5 @@ func (x *objectAddressPrm) SetAddress(addr oid.Address) { } type getObjectPrm struct { - contextPrm objectAddressPrm } diff --git a/pkg/innerring/rpc.go b/pkg/innerring/rpc.go index 013023b09..665e94232 100644 --- a/pkg/innerring/rpc.go +++ b/pkg/innerring/rpc.go @@ -100,12 +100,11 @@ func (c *ClientCache) getSG(ctx context.Context, addr oid.Address, nm *netmap.Ne continue } - cctx, cancel := context.WithTimeout(ctx, c.sgTimeout) - getObjPrm.SetContext(cctx) + ctx, cancel := context.WithTimeout(ctx, c.sgTimeout) // NOTE: we use the function which does not verify object integrity (checksums, signature), // but it would be useful to do as part of a data audit. - res, err := cli.GetObject(getObjPrm) + res, err := cli.GetObject(ctx, getObjPrm) cancel() @@ -223,10 +222,9 @@ func (c ClientCache) ListSG(dst *storagegroup2.SearchSGDst, prm storagegroup2.Se var cliPrm frostfsapiclient.SearchSGPrm - cliPrm.SetContext(prm.Context) cliPrm.SetContainerID(prm.Container) - res, err := cli.SearchSG(cliPrm) + res, err := cli.SearchSG(prm.Context, cliPrm) if err != nil { return err } -- 2.45.3