[#185] ir: Refactor ir service creation
Resolve funlen linter for New function Signed-off-by: Dmitrii Stepanov <d.stepanov@yadro.com>
This commit is contained in:
parent
c8a6978563
commit
4d160bd4ab
2 changed files with 775 additions and 590 deletions
756
pkg/innerring/initialization.go
Normal file
756
pkg/innerring/initialization.go
Normal file
|
@ -0,0 +1,756 @@
|
||||||
|
package innerring
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"encoding/hex"
|
||||||
|
"fmt"
|
||||||
|
"net"
|
||||||
|
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/innerring/processors/alphabet"
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/innerring/processors/audit"
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/innerring/processors/balance"
|
||||||
|
cont "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/innerring/processors/container"
|
||||||
|
frostfs "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/innerring/processors/frostfs"
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/innerring/processors/governance"
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/innerring/processors/netmap"
|
||||||
|
nodevalidator "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/innerring/processors/netmap/nodevalidation"
|
||||||
|
addrvalidator "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/innerring/processors/netmap/nodevalidation/maddress"
|
||||||
|
statevalidation "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/innerring/processors/netmap/nodevalidation/state"
|
||||||
|
subnetvalidator "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/innerring/processors/netmap/nodevalidation/subnet"
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/innerring/processors/reputation"
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/innerring/processors/settlement"
|
||||||
|
auditSettlement "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/innerring/processors/settlement/audit"
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/metrics"
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/client"
|
||||||
|
auditClient "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/client/audit"
|
||||||
|
balanceClient "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/client/balance"
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/client/container"
|
||||||
|
cntClient "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/client/container"
|
||||||
|
frostfsClient "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/client/frostfs"
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/client/frostfsid"
|
||||||
|
nmClient "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/client/netmap"
|
||||||
|
repClient "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/client/reputation"
|
||||||
|
morphsubnet "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/client/subnet"
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/event"
|
||||||
|
audittask "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/audit/taskmanager"
|
||||||
|
control "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/control/ir"
|
||||||
|
controlsrv "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/control/ir/server"
|
||||||
|
reputationcommon "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/reputation/common"
|
||||||
|
util2 "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util"
|
||||||
|
utilConfig "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/config"
|
||||||
|
"github.com/nspcc-dev/neo-go/pkg/core/transaction"
|
||||||
|
"github.com/nspcc-dev/neo-go/pkg/encoding/fixedn"
|
||||||
|
"github.com/panjf2000/ants/v2"
|
||||||
|
"github.com/spf13/viper"
|
||||||
|
"go.uber.org/zap"
|
||||||
|
"google.golang.org/grpc"
|
||||||
|
)
|
||||||
|
|
||||||
|
func (s *Server) initNetmapProcessor(cfg *viper.Viper,
|
||||||
|
cnrClient *container.Client,
|
||||||
|
alphaSync event.Handler,
|
||||||
|
subnetClient *morphsubnet.Client,
|
||||||
|
auditProcessor *audit.Processor,
|
||||||
|
settlementProcessor *settlement.Processor) error {
|
||||||
|
locodeValidator, err := s.newLocodeValidator(cfg)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
subnetValidator, err := subnetvalidator.New(
|
||||||
|
subnetvalidator.Prm{
|
||||||
|
SubnetClient: subnetClient,
|
||||||
|
},
|
||||||
|
)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
netSettings := (*networkSettings)(s.netmapClient)
|
||||||
|
|
||||||
|
var netMapCandidateStateValidator statevalidation.NetMapCandidateValidator
|
||||||
|
netMapCandidateStateValidator.SetNetworkSettings(netSettings)
|
||||||
|
|
||||||
|
s.netmapProcessor, err = netmap.New(&netmap.Params{
|
||||||
|
Log: s.log,
|
||||||
|
PoolSize: cfg.GetInt("workers.netmap"),
|
||||||
|
NetmapClient: s.netmapClient,
|
||||||
|
EpochTimer: s,
|
||||||
|
EpochState: s,
|
||||||
|
AlphabetState: s,
|
||||||
|
CleanupEnabled: cfg.GetBool("netmap_cleaner.enabled"),
|
||||||
|
CleanupThreshold: cfg.GetUint64("netmap_cleaner.threshold"),
|
||||||
|
ContainerWrapper: cnrClient,
|
||||||
|
HandleAudit: s.onlyActiveEventHandler(
|
||||||
|
auditProcessor.StartAuditHandler(),
|
||||||
|
),
|
||||||
|
NotaryDepositHandler: s.onlyAlphabetEventHandler(
|
||||||
|
s.notaryHandler,
|
||||||
|
),
|
||||||
|
AuditSettlementsHandler: s.onlyAlphabetEventHandler(
|
||||||
|
settlementProcessor.HandleAuditEvent,
|
||||||
|
),
|
||||||
|
AlphabetSyncHandler: alphaSync,
|
||||||
|
NodeValidator: nodevalidator.New(
|
||||||
|
&netMapCandidateStateValidator,
|
||||||
|
addrvalidator.New(),
|
||||||
|
locodeValidator,
|
||||||
|
subnetValidator,
|
||||||
|
),
|
||||||
|
NotaryDisabled: s.sideNotaryConfig.disabled,
|
||||||
|
SubnetContract: &s.contracts.subnet,
|
||||||
|
|
||||||
|
NodeStateSettings: netSettings,
|
||||||
|
})
|
||||||
|
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
return bindMorphProcessor(s.netmapProcessor, s)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *Server) initMainnet(ctx context.Context, cfg *viper.Viper, morphChain *chainParams, errChan chan<- error) error {
|
||||||
|
s.withoutMainNet = cfg.GetBool("without_mainnet")
|
||||||
|
if s.withoutMainNet {
|
||||||
|
// This works as long as event Listener starts listening loop once,
|
||||||
|
// otherwise Server.Start will run two similar routines.
|
||||||
|
// This behavior most likely will not change.
|
||||||
|
s.mainnetListener = s.morphListener
|
||||||
|
s.mainnetClient = s.morphClient
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
mainnetChain := morphChain
|
||||||
|
mainnetChain.name = mainnetPrefix
|
||||||
|
mainnetChain.sgn = &transaction.Signer{Scopes: transaction.CalledByEntry}
|
||||||
|
|
||||||
|
fromMainChainBlock, err := s.persistate.UInt32(persistateMainChainLastBlockKey)
|
||||||
|
if err != nil {
|
||||||
|
fromMainChainBlock = 0
|
||||||
|
s.log.Warn("can't get last processed main chain block number", zap.String("error", err.Error()))
|
||||||
|
}
|
||||||
|
mainnetChain.from = fromMainChainBlock
|
||||||
|
|
||||||
|
// create mainnet client
|
||||||
|
s.mainnetClient, err = createClient(ctx, mainnetChain, errChan)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
// create mainnet listener
|
||||||
|
s.mainnetListener, err = createListener(ctx, s.mainnetClient, mainnetChain)
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *Server) enableNotarySupport() error {
|
||||||
|
if !s.sideNotaryConfig.disabled {
|
||||||
|
// enable notary support in the side client
|
||||||
|
err := s.morphClient.EnableNotarySupport(
|
||||||
|
client.WithProxyContract(s.contracts.proxy),
|
||||||
|
)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("could not enable side chain notary support: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
s.morphListener.EnableNotarySupport(s.contracts.proxy, s.morphClient.Committee, s.morphClient)
|
||||||
|
}
|
||||||
|
|
||||||
|
if !s.mainNotaryConfig.disabled {
|
||||||
|
// enable notary support in the main client
|
||||||
|
err := s.mainnetClient.EnableNotarySupport(
|
||||||
|
client.WithProxyContract(s.contracts.processing),
|
||||||
|
client.WithAlphabetSource(s.morphClient.Committee),
|
||||||
|
)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("could not enable main chain notary support: %w", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *Server) initNotaryConfig(cfg *viper.Viper) {
|
||||||
|
s.mainNotaryConfig, s.sideNotaryConfig = notaryConfigs(
|
||||||
|
s.morphClient.ProbeNotary(),
|
||||||
|
!s.withoutMainNet && s.mainnetClient.ProbeNotary(), // if mainnet disabled then notary flag must be disabled too
|
||||||
|
)
|
||||||
|
|
||||||
|
s.log.Info("notary support",
|
||||||
|
zap.Bool("sidechain_enabled", !s.sideNotaryConfig.disabled),
|
||||||
|
zap.Bool("mainchain_enabled", !s.mainNotaryConfig.disabled),
|
||||||
|
)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *Server) createAuditProcessor(cfg *viper.Viper, clientCache *ClientCache, cnrClient *container.Client) (*audit.Processor, error) {
|
||||||
|
auditPool, err := ants.NewPool(cfg.GetInt("audit.task.exec_pool_size"))
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
pdpPoolSize := cfg.GetInt("audit.pdp.pairs_pool_size")
|
||||||
|
porPoolSize := cfg.GetInt("audit.por.pool_size")
|
||||||
|
|
||||||
|
// create audit processor dependencies
|
||||||
|
auditTaskManager := audittask.New(
|
||||||
|
audittask.WithQueueCapacity(cfg.GetUint32("audit.task.queue_capacity")),
|
||||||
|
audittask.WithWorkerPool(auditPool),
|
||||||
|
audittask.WithLogger(s.log),
|
||||||
|
audittask.WithContainerCommunicator(clientCache),
|
||||||
|
audittask.WithMaxPDPSleepInterval(cfg.GetDuration("audit.pdp.max_sleep_interval")),
|
||||||
|
audittask.WithPDPWorkerPoolGenerator(func() (util2.WorkerPool, error) {
|
||||||
|
return ants.NewPool(pdpPoolSize)
|
||||||
|
}),
|
||||||
|
audittask.WithPoRWorkerPoolGenerator(func() (util2.WorkerPool, error) {
|
||||||
|
return ants.NewPool(porPoolSize)
|
||||||
|
}),
|
||||||
|
)
|
||||||
|
|
||||||
|
s.workers = append(s.workers, auditTaskManager.Listen)
|
||||||
|
|
||||||
|
// create audit processor
|
||||||
|
return audit.New(&audit.Params{
|
||||||
|
Log: s.log,
|
||||||
|
NetmapClient: s.netmapClient,
|
||||||
|
ContainerClient: cnrClient,
|
||||||
|
IRList: s,
|
||||||
|
EpochSource: s,
|
||||||
|
SGSource: clientCache,
|
||||||
|
Key: &s.key.PrivateKey,
|
||||||
|
RPCSearchTimeout: cfg.GetDuration("audit.timeout.search"),
|
||||||
|
TaskManager: auditTaskManager,
|
||||||
|
Reporter: s,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *Server) createSettlementProcessor(clientCache *ClientCache, cnrClient *container.Client) *settlement.Processor {
|
||||||
|
// create settlement processor dependencies
|
||||||
|
settlementDeps := settlementDeps{
|
||||||
|
log: s.log,
|
||||||
|
cnrSrc: cntClient.AsContainerSource(cnrClient),
|
||||||
|
auditClient: s.auditClient,
|
||||||
|
nmClient: s.netmapClient,
|
||||||
|
clientCache: clientCache,
|
||||||
|
balanceClient: s.balanceClient,
|
||||||
|
}
|
||||||
|
|
||||||
|
settlementDeps.settlementCtx = auditSettlementContext
|
||||||
|
auditCalcDeps := &auditSettlementDeps{
|
||||||
|
settlementDeps: settlementDeps,
|
||||||
|
}
|
||||||
|
|
||||||
|
settlementDeps.settlementCtx = basicIncomeSettlementContext
|
||||||
|
basicSettlementDeps := &basicIncomeSettlementDeps{
|
||||||
|
settlementDeps: settlementDeps,
|
||||||
|
cnrClient: cnrClient,
|
||||||
|
}
|
||||||
|
|
||||||
|
auditSettlementCalc := auditSettlement.NewCalculator(
|
||||||
|
&auditSettlement.CalculatorPrm{
|
||||||
|
ResultStorage: auditCalcDeps,
|
||||||
|
ContainerStorage: auditCalcDeps,
|
||||||
|
PlacementCalculator: auditCalcDeps,
|
||||||
|
SGStorage: auditCalcDeps,
|
||||||
|
AccountStorage: auditCalcDeps,
|
||||||
|
Exchanger: auditCalcDeps,
|
||||||
|
AuditFeeFetcher: s.netmapClient,
|
||||||
|
},
|
||||||
|
auditSettlement.WithLogger(s.log),
|
||||||
|
)
|
||||||
|
|
||||||
|
// create settlement processor
|
||||||
|
return settlement.New(
|
||||||
|
settlement.Prm{
|
||||||
|
AuditProcessor: (*auditSettlementCalculator)(auditSettlementCalc),
|
||||||
|
BasicIncome: &basicSettlementConstructor{dep: basicSettlementDeps},
|
||||||
|
State: s,
|
||||||
|
},
|
||||||
|
settlement.WithLogger(s.log),
|
||||||
|
)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *Server) createAlphaSync(cfg *viper.Viper, frostfsCli *frostfsClient.Client, irf irFetcher) (event.Handler, error) {
|
||||||
|
var alphaSync event.Handler
|
||||||
|
|
||||||
|
if s.withoutMainNet || cfg.GetBool("governance.disable") {
|
||||||
|
alphaSync = func(event.Event) {
|
||||||
|
s.log.Debug("alphabet keys sync is disabled")
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
// create governance processor
|
||||||
|
governanceProcessor, err := governance.New(&governance.Params{
|
||||||
|
Log: s.log,
|
||||||
|
FrostFSClient: frostfsCli,
|
||||||
|
NetmapClient: s.netmapClient,
|
||||||
|
AlphabetState: s,
|
||||||
|
EpochState: s,
|
||||||
|
Voter: s,
|
||||||
|
IRFetcher: irf,
|
||||||
|
MorphClient: s.morphClient,
|
||||||
|
MainnetClient: s.mainnetClient,
|
||||||
|
NotaryDisabled: s.sideNotaryConfig.disabled,
|
||||||
|
})
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
alphaSync = governanceProcessor.HandleAlphabetSync
|
||||||
|
err = bindMainnetProcessor(governanceProcessor, s)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return alphaSync, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *Server) createIRFetcher() irFetcher {
|
||||||
|
var irf irFetcher
|
||||||
|
|
||||||
|
if s.withoutMainNet || !s.mainNotaryConfig.disabled {
|
||||||
|
// if mainchain is disabled we should use NeoFSAlphabetList client method according to its docs
|
||||||
|
// (naming `...WithNotary` will not always be correct)
|
||||||
|
irf = NewIRFetcherWithNotary(s.morphClient)
|
||||||
|
} else {
|
||||||
|
irf = NewIRFetcherWithoutNotary(s.netmapClient)
|
||||||
|
}
|
||||||
|
|
||||||
|
return irf
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *Server) initTimers(cfg *viper.Viper, processors *serverProcessors, morphClients *serverMorphClients) {
|
||||||
|
s.epochTimer = newEpochTimer(&epochTimerArgs{
|
||||||
|
l: s.log,
|
||||||
|
newEpochHandlers: s.newEpochTickHandlers(),
|
||||||
|
cnrWrapper: morphClients.CnrClient,
|
||||||
|
epoch: s,
|
||||||
|
stopEstimationDMul: cfg.GetUint32("timers.stop_estimation.mul"),
|
||||||
|
stopEstimationDDiv: cfg.GetUint32("timers.stop_estimation.div"),
|
||||||
|
collectBasicIncome: subEpochEventHandler{
|
||||||
|
handler: processors.SettlementProcessor.HandleIncomeCollectionEvent,
|
||||||
|
durationMul: cfg.GetUint32("timers.collect_basic_income.mul"),
|
||||||
|
durationDiv: cfg.GetUint32("timers.collect_basic_income.div"),
|
||||||
|
},
|
||||||
|
distributeBasicIncome: subEpochEventHandler{
|
||||||
|
handler: processors.SettlementProcessor.HandleIncomeDistributionEvent,
|
||||||
|
durationMul: cfg.GetUint32("timers.distribute_basic_income.mul"),
|
||||||
|
durationDiv: cfg.GetUint32("timers.distribute_basic_income.div"),
|
||||||
|
},
|
||||||
|
})
|
||||||
|
|
||||||
|
s.addBlockTimer(s.epochTimer)
|
||||||
|
|
||||||
|
// initialize emission timer
|
||||||
|
emissionTimer := newEmissionTimer(&emitTimerArgs{
|
||||||
|
ap: processors.AlphabetProcessor,
|
||||||
|
emitDuration: cfg.GetUint32("timers.emit"),
|
||||||
|
})
|
||||||
|
|
||||||
|
s.addBlockTimer(emissionTimer)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *Server) createMorphSubnetClient() (*morphsubnet.Client, error) {
|
||||||
|
// initialize morph client of Subnet contract
|
||||||
|
clientMode := morphsubnet.NotaryAlphabet
|
||||||
|
|
||||||
|
if s.sideNotaryConfig.disabled {
|
||||||
|
clientMode = morphsubnet.NonNotary
|
||||||
|
}
|
||||||
|
|
||||||
|
subnetInitPrm := morphsubnet.InitPrm{}
|
||||||
|
subnetInitPrm.SetBaseClient(s.morphClient)
|
||||||
|
subnetInitPrm.SetContractAddress(s.contracts.subnet)
|
||||||
|
subnetInitPrm.SetMode(clientMode)
|
||||||
|
|
||||||
|
subnetClient := &morphsubnet.Client{}
|
||||||
|
err := subnetClient.Init(subnetInitPrm)
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("could not initialize subnet client: %w", err)
|
||||||
|
}
|
||||||
|
return subnetClient, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *Server) initAlphabetProcessor(cfg *viper.Viper) (*alphabet.Processor, error) {
|
||||||
|
parsedWallets, err := parseWalletAddressesFromStrings(cfg.GetStringSlice("emit.extra_wallets"))
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
// create alphabet processor
|
||||||
|
alphabetProcessor, err := alphabet.New(&alphabet.Params{
|
||||||
|
ParsedWallets: parsedWallets,
|
||||||
|
Log: s.log,
|
||||||
|
PoolSize: cfg.GetInt("workers.alphabet"),
|
||||||
|
AlphabetContracts: s.contracts.alphabet,
|
||||||
|
NetmapClient: s.netmapClient,
|
||||||
|
MorphClient: s.morphClient,
|
||||||
|
IRList: s,
|
||||||
|
StorageEmission: cfg.GetUint64("emit.storage.amount"),
|
||||||
|
})
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
err = bindMorphProcessor(alphabetProcessor, s)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
return alphabetProcessor, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *Server) initContainerProcessor(cfg *viper.Viper, cnrClient *container.Client,
|
||||||
|
frostfsIDClient *frostfsid.Client, subnetClient *morphsubnet.Client) error {
|
||||||
|
// container processor
|
||||||
|
containerProcessor, err := cont.New(&cont.Params{
|
||||||
|
Log: s.log,
|
||||||
|
PoolSize: cfg.GetInt("workers.container"),
|
||||||
|
AlphabetState: s,
|
||||||
|
ContainerClient: cnrClient,
|
||||||
|
FrostFSIDClient: frostfsIDClient,
|
||||||
|
NetworkState: s.netmapClient,
|
||||||
|
NotaryDisabled: s.sideNotaryConfig.disabled,
|
||||||
|
SubnetClient: subnetClient,
|
||||||
|
})
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
return bindMorphProcessor(containerProcessor, s)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *Server) initBalanceProcessor(cfg *viper.Viper, frostfsCli *frostfsClient.Client) error {
|
||||||
|
// create balance processor
|
||||||
|
balanceProcessor, err := balance.New(&balance.Params{
|
||||||
|
Log: s.log,
|
||||||
|
PoolSize: cfg.GetInt("workers.balance"),
|
||||||
|
FrostFSClient: frostfsCli,
|
||||||
|
BalanceSC: s.contracts.balance,
|
||||||
|
AlphabetState: s,
|
||||||
|
Converter: &s.precision,
|
||||||
|
})
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
return bindMorphProcessor(balanceProcessor, s)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *Server) initFrostFSMainnetProcessor(cfg *viper.Viper, frostfsIDClient *frostfsid.Client) error {
|
||||||
|
if s.withoutMainNet {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
frostfsProcessor, err := frostfs.New(&frostfs.Params{
|
||||||
|
Log: s.log,
|
||||||
|
PoolSize: cfg.GetInt("workers.frostfs"),
|
||||||
|
FrostFSContract: s.contracts.frostfs,
|
||||||
|
FrostFSIDClient: frostfsIDClient,
|
||||||
|
BalanceClient: s.balanceClient,
|
||||||
|
NetmapClient: s.netmapClient,
|
||||||
|
MorphClient: s.morphClient,
|
||||||
|
EpochState: s,
|
||||||
|
AlphabetState: s,
|
||||||
|
Converter: &s.precision,
|
||||||
|
MintEmitCacheSize: cfg.GetInt("emit.mint.cache_size"),
|
||||||
|
MintEmitThreshold: cfg.GetUint64("emit.mint.threshold"),
|
||||||
|
MintEmitValue: fixedn.Fixed8(cfg.GetInt64("emit.mint.value")),
|
||||||
|
GasBalanceThreshold: cfg.GetInt64("emit.gas.balance_threshold"),
|
||||||
|
})
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
return bindMainnetProcessor(frostfsProcessor, s)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *Server) initReputationProcessor(cfg *viper.Viper, sidechainFee fixedn.Fixed8) error {
|
||||||
|
repClient, err := repClient.NewFromMorph(s.morphClient, s.contracts.reputation, sidechainFee, repClient.TryNotary(), repClient.AsAlphabet())
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
// create reputation processor
|
||||||
|
reputationProcessor, err := reputation.New(&reputation.Params{
|
||||||
|
Log: s.log,
|
||||||
|
PoolSize: cfg.GetInt("workers.reputation"),
|
||||||
|
EpochState: s,
|
||||||
|
AlphabetState: s,
|
||||||
|
ReputationWrapper: repClient,
|
||||||
|
ManagerBuilder: reputationcommon.NewManagerBuilder(
|
||||||
|
reputationcommon.ManagersPrm{
|
||||||
|
NetMapSource: s.netmapClient,
|
||||||
|
},
|
||||||
|
),
|
||||||
|
NotaryDisabled: s.sideNotaryConfig.disabled,
|
||||||
|
})
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
return bindMorphProcessor(reputationProcessor, s)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *Server) initGRPCServer(cfg *viper.Viper) error {
|
||||||
|
controlSvcEndpoint := cfg.GetString("control.grpc.endpoint")
|
||||||
|
if controlSvcEndpoint == "" {
|
||||||
|
s.log.Info("no Control server endpoint specified, service is disabled")
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
authKeysStr := cfg.GetStringSlice("control.authorized_keys")
|
||||||
|
authKeys := make([][]byte, 0, len(authKeysStr))
|
||||||
|
|
||||||
|
for i := range authKeysStr {
|
||||||
|
key, err := hex.DecodeString(authKeysStr[i])
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("could not parse Control authorized key %s: %w",
|
||||||
|
authKeysStr[i],
|
||||||
|
err,
|
||||||
|
)
|
||||||
|
}
|
||||||
|
|
||||||
|
authKeys = append(authKeys, key)
|
||||||
|
}
|
||||||
|
|
||||||
|
var p controlsrv.Prm
|
||||||
|
|
||||||
|
p.SetPrivateKey(*s.key)
|
||||||
|
p.SetHealthChecker(s)
|
||||||
|
|
||||||
|
controlSvc := controlsrv.New(p,
|
||||||
|
controlsrv.WithAllowedKeys(authKeys),
|
||||||
|
)
|
||||||
|
|
||||||
|
grpcControlSrv := grpc.NewServer()
|
||||||
|
control.RegisterControlServiceServer(grpcControlSrv, controlSvc)
|
||||||
|
|
||||||
|
s.runners = append(s.runners, func(ch chan<- error) error {
|
||||||
|
lis, err := net.Listen("tcp", controlSvcEndpoint)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
go func() {
|
||||||
|
ch <- grpcControlSrv.Serve(lis)
|
||||||
|
}()
|
||||||
|
return nil
|
||||||
|
})
|
||||||
|
|
||||||
|
s.registerNoErrCloser(grpcControlSrv.GracefulStop)
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
type serverMorphClients struct {
|
||||||
|
CnrClient *cntClient.Client
|
||||||
|
FrostFSIDClient *frostfsid.Client
|
||||||
|
FrostFSClient *frostfsClient.Client
|
||||||
|
MorphSubnetClient *morphsubnet.Client
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *Server) initClientsFromMorph() (*serverMorphClients, error) {
|
||||||
|
result := &serverMorphClients{}
|
||||||
|
var err error
|
||||||
|
|
||||||
|
fee := s.feeConfig.SideChainFee()
|
||||||
|
// do not use TryNotary() in audit wrapper
|
||||||
|
// audit operations do not require multisignatures
|
||||||
|
s.auditClient, err = auditClient.NewFromMorph(s.morphClient, s.contracts.audit, fee)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
// form morph container client's options
|
||||||
|
morphCnrOpts := make([]cntClient.Option, 0, 3)
|
||||||
|
morphCnrOpts = append(morphCnrOpts,
|
||||||
|
cntClient.TryNotary(),
|
||||||
|
cntClient.AsAlphabet(),
|
||||||
|
)
|
||||||
|
|
||||||
|
if s.sideNotaryConfig.disabled {
|
||||||
|
// in non-notary environments we customize fee for named container registration
|
||||||
|
// because it takes much more additional GAS than other operations.
|
||||||
|
morphCnrOpts = append(morphCnrOpts,
|
||||||
|
cntClient.WithCustomFeeForNamedPut(s.feeConfig.NamedContainerRegistrationFee()),
|
||||||
|
)
|
||||||
|
}
|
||||||
|
|
||||||
|
result.CnrClient, err = cntClient.NewFromMorph(s.morphClient, s.contracts.container, fee, morphCnrOpts...)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
s.netmapClient, err = nmClient.NewFromMorph(s.morphClient, s.contracts.netmap, fee, nmClient.TryNotary(), nmClient.AsAlphabet())
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
s.balanceClient, err = balanceClient.NewFromMorph(s.morphClient, s.contracts.balance, fee, balanceClient.TryNotary(), balanceClient.AsAlphabet())
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
result.FrostFSIDClient, err = frostfsid.NewFromMorph(s.morphClient, s.contracts.frostfsID, fee, frostfsid.TryNotary(), frostfsid.AsAlphabet())
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
result.FrostFSClient, err = frostfsClient.NewFromMorph(s.mainnetClient, s.contracts.frostfs,
|
||||||
|
s.feeConfig.MainChainFee(), frostfsClient.TryNotary(), frostfsClient.AsAlphabet())
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
result.MorphSubnetClient, err = s.createMorphSubnetClient()
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
return result, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
type serverProcessors struct {
|
||||||
|
AlphabetProcessor *alphabet.Processor
|
||||||
|
SettlementProcessor *settlement.Processor
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *Server) initProcessors(cfg *viper.Viper, morphClients *serverMorphClients) (*serverProcessors, error) {
|
||||||
|
result := &serverProcessors{}
|
||||||
|
|
||||||
|
fee := s.feeConfig.SideChainFee()
|
||||||
|
|
||||||
|
irf := s.createIRFetcher()
|
||||||
|
|
||||||
|
s.statusIndex = newInnerRingIndexer(
|
||||||
|
s.morphClient,
|
||||||
|
irf,
|
||||||
|
s.key.PublicKey(),
|
||||||
|
cfg.GetDuration("indexer.cache_timeout"),
|
||||||
|
)
|
||||||
|
|
||||||
|
clientCache := newClientCache(&clientCacheParams{
|
||||||
|
Log: s.log,
|
||||||
|
Key: &s.key.PrivateKey,
|
||||||
|
SGTimeout: cfg.GetDuration("audit.timeout.get"),
|
||||||
|
HeadTimeout: cfg.GetDuration("audit.timeout.head"),
|
||||||
|
RangeTimeout: cfg.GetDuration("audit.timeout.rangehash"),
|
||||||
|
AllowExternal: cfg.GetBool("audit.allow_external"),
|
||||||
|
})
|
||||||
|
|
||||||
|
s.registerNoErrCloser(clientCache.cache.CloseAll)
|
||||||
|
|
||||||
|
// create audit processor
|
||||||
|
auditProcessor, err := s.createAuditProcessor(cfg, clientCache, morphClients.CnrClient)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
result.SettlementProcessor = s.createSettlementProcessor(clientCache, morphClients.CnrClient)
|
||||||
|
|
||||||
|
var alphaSync event.Handler
|
||||||
|
alphaSync, err = s.createAlphaSync(cfg, morphClients.FrostFSClient, irf)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
err = s.initNetmapProcessor(cfg, morphClients.CnrClient, alphaSync, morphClients.MorphSubnetClient, auditProcessor, result.SettlementProcessor)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
err = s.initContainerProcessor(cfg, morphClients.CnrClient, morphClients.FrostFSIDClient, morphClients.MorphSubnetClient)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
err = s.initBalanceProcessor(cfg, morphClients.FrostFSClient)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
err = s.initFrostFSMainnetProcessor(cfg, morphClients.FrostFSIDClient)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
result.AlphabetProcessor, err = s.initAlphabetProcessor(cfg)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
err = s.initReputationProcessor(cfg, fee)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
return result, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *Server) initMorph(ctx context.Context, cfg *viper.Viper, errChan chan<- error) (*chainParams, error) {
|
||||||
|
fromSideChainBlock, err := s.persistate.UInt32(persistateSideChainLastBlockKey)
|
||||||
|
if err != nil {
|
||||||
|
fromSideChainBlock = 0
|
||||||
|
s.log.Warn("can't get last processed side chain block number", zap.String("error", err.Error()))
|
||||||
|
}
|
||||||
|
|
||||||
|
morphChain := &chainParams{
|
||||||
|
log: s.log,
|
||||||
|
cfg: cfg,
|
||||||
|
key: s.key,
|
||||||
|
name: morphPrefix,
|
||||||
|
from: fromSideChainBlock,
|
||||||
|
}
|
||||||
|
|
||||||
|
// create morph client
|
||||||
|
s.morphClient, err = createClient(ctx, morphChain, errChan)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
// create morph listener
|
||||||
|
s.morphListener, err = createListener(ctx, s.morphClient, morphChain)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
if err := s.morphClient.SetGroupSignerScope(); err != nil {
|
||||||
|
morphChain.log.Info("failed to set group signer scope, continue with Global", zap.Error(err))
|
||||||
|
}
|
||||||
|
|
||||||
|
return morphChain, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *Server) initContracts(cfg *viper.Viper) error {
|
||||||
|
var err error
|
||||||
|
// get all script hashes of contracts
|
||||||
|
s.contracts, err = parseContracts(
|
||||||
|
cfg,
|
||||||
|
s.morphClient,
|
||||||
|
s.withoutMainNet,
|
||||||
|
s.mainNotaryConfig.disabled,
|
||||||
|
s.sideNotaryConfig.disabled,
|
||||||
|
)
|
||||||
|
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *Server) initKey(cfg *viper.Viper) error {
|
||||||
|
// prepare inner ring node private key
|
||||||
|
acc, err := utilConfig.LoadAccount(
|
||||||
|
cfg.GetString("wallet.path"),
|
||||||
|
cfg.GetString("wallet.address"),
|
||||||
|
cfg.GetString("wallet.password"))
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("ir: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
s.key = acc.PrivateKey()
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *Server) initMetrics(cfg *viper.Viper) {
|
||||||
|
if cfg.GetString("prometheus.address") == "" {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
m := metrics.NewInnerRingMetrics()
|
||||||
|
s.metrics = &m
|
||||||
|
}
|
|
@ -2,47 +2,23 @@ package innerring
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"encoding/hex"
|
|
||||||
"errors"
|
"errors"
|
||||||
"fmt"
|
"fmt"
|
||||||
"io"
|
"io"
|
||||||
"net"
|
|
||||||
|
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/innerring/config"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/innerring/config"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/innerring/processors/alphabet"
|
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/innerring/processors/audit"
|
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/innerring/processors/balance"
|
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/innerring/processors/container"
|
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/innerring/processors/frostfs"
|
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/innerring/processors/governance"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/innerring/processors/governance"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/innerring/processors/netmap"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/innerring/processors/netmap"
|
||||||
nodevalidator "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/innerring/processors/netmap/nodevalidation"
|
|
||||||
addrvalidator "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/innerring/processors/netmap/nodevalidation/maddress"
|
|
||||||
statevalidation "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/innerring/processors/netmap/nodevalidation/state"
|
|
||||||
subnetvalidator "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/innerring/processors/netmap/nodevalidation/subnet"
|
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/innerring/processors/reputation"
|
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/innerring/processors/settlement"
|
|
||||||
auditSettlement "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/innerring/processors/settlement/audit"
|
|
||||||
timerEvent "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/innerring/timers"
|
timerEvent "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/innerring/timers"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/metrics"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/metrics"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/client"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/client"
|
||||||
auditClient "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/client/audit"
|
auditClient "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/client/audit"
|
||||||
balanceClient "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/client/balance"
|
balanceClient "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/client/balance"
|
||||||
cntClient "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/client/container"
|
|
||||||
frostfsClient "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/client/frostfs"
|
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/client/frostfsid"
|
|
||||||
nmClient "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/client/netmap"
|
nmClient "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/client/netmap"
|
||||||
repClient "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/client/reputation"
|
|
||||||
morphsubnet "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/client/subnet"
|
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/event"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/event"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/subscriber"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/subscriber"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/timer"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/timer"
|
||||||
audittask "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/audit/taskmanager"
|
|
||||||
control "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/control/ir"
|
control "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/control/ir"
|
||||||
controlsrv "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/control/ir/server"
|
|
||||||
reputationcommon "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/reputation/common"
|
|
||||||
util2 "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util"
|
|
||||||
utilConfig "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/config"
|
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/precision"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/precision"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/state"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/state"
|
||||||
|
@ -50,13 +26,10 @@ import (
|
||||||
"github.com/nspcc-dev/neo-go/pkg/core/transaction"
|
"github.com/nspcc-dev/neo-go/pkg/core/transaction"
|
||||||
"github.com/nspcc-dev/neo-go/pkg/crypto/keys"
|
"github.com/nspcc-dev/neo-go/pkg/crypto/keys"
|
||||||
"github.com/nspcc-dev/neo-go/pkg/encoding/address"
|
"github.com/nspcc-dev/neo-go/pkg/encoding/address"
|
||||||
"github.com/nspcc-dev/neo-go/pkg/encoding/fixedn"
|
|
||||||
"github.com/nspcc-dev/neo-go/pkg/util"
|
"github.com/nspcc-dev/neo-go/pkg/util"
|
||||||
"github.com/panjf2000/ants/v2"
|
|
||||||
"github.com/spf13/viper"
|
"github.com/spf13/viper"
|
||||||
"go.uber.org/atomic"
|
"go.uber.org/atomic"
|
||||||
"go.uber.org/zap"
|
"go.uber.org/zap"
|
||||||
"google.golang.org/grpc"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
type (
|
type (
|
||||||
|
@ -354,8 +327,6 @@ func (s *Server) registerStarter(f func() error) {
|
||||||
}
|
}
|
||||||
|
|
||||||
// New creates instance of inner ring sever structure.
|
// New creates instance of inner ring sever structure.
|
||||||
//
|
|
||||||
// nolint: funlen, gocognit
|
|
||||||
func New(ctx context.Context, log *logger.Logger, cfg *viper.Viper, errChan chan<- error) (*Server, error) {
|
func New(ctx context.Context, log *logger.Logger, cfg *viper.Viper, errChan chan<- error) (*Server, error) {
|
||||||
var err error
|
var err error
|
||||||
server := &Server{log: log}
|
server := &Server{log: log}
|
||||||
|
@ -365,128 +336,38 @@ func New(ctx context.Context, log *logger.Logger, cfg *viper.Viper, errChan chan
|
||||||
// parse notary support
|
// parse notary support
|
||||||
server.feeConfig = config.NewFeeConfig(cfg)
|
server.feeConfig = config.NewFeeConfig(cfg)
|
||||||
|
|
||||||
// prepare inner ring node private key
|
err = server.initKey(cfg)
|
||||||
acc, err := utilConfig.LoadAccount(
|
|
||||||
cfg.GetString("wallet.path"),
|
|
||||||
cfg.GetString("wallet.address"),
|
|
||||||
cfg.GetString("wallet.password"))
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, fmt.Errorf("ir: %w", err)
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
server.key = acc.PrivateKey()
|
|
||||||
|
|
||||||
server.persistate, err = initPersistentStateStorage(cfg)
|
server.persistate, err = initPersistentStateStorage(cfg)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
server.registerCloser(server.persistate.Close)
|
server.registerCloser(server.persistate.Close)
|
||||||
|
|
||||||
fromSideChainBlock, err := server.persistate.UInt32(persistateSideChainLastBlockKey)
|
var morphChain *chainParams
|
||||||
if err != nil {
|
morphChain, err = server.initMorph(ctx, cfg, errChan)
|
||||||
fromSideChainBlock = 0
|
|
||||||
log.Warn("can't get last processed side chain block number", zap.String("error", err.Error()))
|
|
||||||
}
|
|
||||||
|
|
||||||
morphChain := &chainParams{
|
|
||||||
log: log,
|
|
||||||
cfg: cfg,
|
|
||||||
key: server.key,
|
|
||||||
name: morphPrefix,
|
|
||||||
from: fromSideChainBlock,
|
|
||||||
}
|
|
||||||
|
|
||||||
// create morph client
|
|
||||||
server.morphClient, err = createClient(ctx, morphChain, errChan)
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
// create morph listener
|
err = server.initMainnet(ctx, cfg, morphChain, errChan)
|
||||||
server.morphListener, err = createListener(ctx, server.morphClient, morphChain)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
if err := server.morphClient.SetGroupSignerScope(); err != nil {
|
|
||||||
morphChain.log.Info("failed to set group signer scope, continue with Global", zap.Error(err))
|
|
||||||
}
|
|
||||||
|
|
||||||
server.withoutMainNet = cfg.GetBool("without_mainnet")
|
|
||||||
|
|
||||||
if server.withoutMainNet {
|
|
||||||
// This works as long as event Listener starts listening loop once,
|
|
||||||
// otherwise Server.Start will run two similar routines.
|
|
||||||
// This behavior most likely will not change.
|
|
||||||
server.mainnetListener = server.morphListener
|
|
||||||
server.mainnetClient = server.morphClient
|
|
||||||
} else {
|
|
||||||
mainnetChain := morphChain
|
|
||||||
mainnetChain.name = mainnetPrefix
|
|
||||||
mainnetChain.sgn = &transaction.Signer{Scopes: transaction.CalledByEntry}
|
|
||||||
|
|
||||||
fromMainChainBlock, err := server.persistate.UInt32(persistateMainChainLastBlockKey)
|
|
||||||
if err != nil {
|
|
||||||
fromMainChainBlock = 0
|
|
||||||
log.Warn("can't get last processed main chain block number", zap.String("error", err.Error()))
|
|
||||||
}
|
|
||||||
mainnetChain.from = fromMainChainBlock
|
|
||||||
|
|
||||||
// create mainnet client
|
|
||||||
server.mainnetClient, err = createClient(ctx, mainnetChain, errChan)
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
// create mainnet listener
|
server.initNotaryConfig(cfg)
|
||||||
server.mainnetListener, err = createListener(ctx, server.mainnetClient, mainnetChain)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
server.mainNotaryConfig, server.sideNotaryConfig = notaryConfigs(
|
err = server.initContracts(cfg)
|
||||||
server.morphClient.ProbeNotary(),
|
|
||||||
!server.withoutMainNet && server.mainnetClient.ProbeNotary(), // if mainnet disabled then notary flag must be disabled too
|
|
||||||
)
|
|
||||||
|
|
||||||
log.Info("notary support",
|
|
||||||
zap.Bool("sidechain_enabled", !server.sideNotaryConfig.disabled),
|
|
||||||
zap.Bool("mainchain_enabled", !server.mainNotaryConfig.disabled),
|
|
||||||
)
|
|
||||||
|
|
||||||
// get all script hashes of contracts
|
|
||||||
server.contracts, err = parseContracts(
|
|
||||||
cfg,
|
|
||||||
server.morphClient,
|
|
||||||
server.withoutMainNet,
|
|
||||||
server.mainNotaryConfig.disabled,
|
|
||||||
server.sideNotaryConfig.disabled,
|
|
||||||
)
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
if !server.sideNotaryConfig.disabled {
|
err = server.enableNotarySupport()
|
||||||
// enable notary support in the side client
|
|
||||||
err = server.morphClient.EnableNotarySupport(
|
|
||||||
client.WithProxyContract(server.contracts.proxy),
|
|
||||||
)
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, fmt.Errorf("could not enable side chain notary support: %w", err)
|
return nil, err
|
||||||
}
|
|
||||||
|
|
||||||
server.morphListener.EnableNotarySupport(server.contracts.proxy, server.morphClient.Committee, server.morphClient)
|
|
||||||
}
|
|
||||||
|
|
||||||
if !server.mainNotaryConfig.disabled {
|
|
||||||
// enable notary support in the main client
|
|
||||||
err = server.mainnetClient.EnableNotarySupport(
|
|
||||||
client.WithProxyContract(server.contracts.processing),
|
|
||||||
client.WithAlphabetSource(server.morphClient.Committee),
|
|
||||||
)
|
|
||||||
if err != nil {
|
|
||||||
return nil, fmt.Errorf("could not enable main chain notary support: %w", err)
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// parse default validators
|
// parse default validators
|
||||||
|
@ -497,482 +378,30 @@ func New(ctx context.Context, log *logger.Logger, cfg *viper.Viper, errChan chan
|
||||||
|
|
||||||
server.pubKey = server.key.PublicKey().Bytes()
|
server.pubKey = server.key.PublicKey().Bytes()
|
||||||
|
|
||||||
auditPool, err := ants.NewPool(cfg.GetInt("audit.task.exec_pool_size"))
|
var morphClients *serverMorphClients
|
||||||
|
morphClients, err = server.initClientsFromMorph()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
fee := server.feeConfig.SideChainFee()
|
var processors *serverProcessors
|
||||||
|
processors, err = server.initProcessors(cfg, morphClients)
|
||||||
// do not use TryNotary() in audit wrapper
|
|
||||||
// audit operations do not require multisignatures
|
|
||||||
server.auditClient, err = auditClient.NewFromMorph(server.morphClient, server.contracts.audit, fee)
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
// form morph container client's options
|
server.initTimers(cfg, processors, morphClients)
|
||||||
morphCnrOpts := make([]cntClient.Option, 0, 3)
|
|
||||||
morphCnrOpts = append(morphCnrOpts,
|
|
||||||
cntClient.TryNotary(),
|
|
||||||
cntClient.AsAlphabet(),
|
|
||||||
)
|
|
||||||
|
|
||||||
if server.sideNotaryConfig.disabled {
|
err = server.initGRPCServer(cfg)
|
||||||
// in non-notary environments we customize fee for named container registration
|
|
||||||
// because it takes much more additional GAS than other operations.
|
|
||||||
morphCnrOpts = append(morphCnrOpts,
|
|
||||||
cntClient.WithCustomFeeForNamedPut(server.feeConfig.NamedContainerRegistrationFee()),
|
|
||||||
)
|
|
||||||
}
|
|
||||||
|
|
||||||
cnrClient, err := cntClient.NewFromMorph(server.morphClient, server.contracts.container, fee, morphCnrOpts...)
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
server.netmapClient, err = nmClient.NewFromMorph(server.morphClient, server.contracts.netmap, fee, nmClient.TryNotary(), nmClient.AsAlphabet())
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
server.balanceClient, err = balanceClient.NewFromMorph(server.morphClient, server.contracts.balance, fee, balanceClient.TryNotary(), balanceClient.AsAlphabet())
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
repClient, err := repClient.NewFromMorph(server.morphClient, server.contracts.reputation, fee, repClient.TryNotary(), repClient.AsAlphabet())
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
frostfsIDClient, err := frostfsid.NewFromMorph(server.morphClient, server.contracts.frostfsID, fee, frostfsid.TryNotary(), frostfsid.AsAlphabet())
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
frostfsCli, err := frostfsClient.NewFromMorph(server.mainnetClient, server.contracts.frostfs,
|
|
||||||
server.feeConfig.MainChainFee(), frostfsClient.TryNotary(), frostfsClient.AsAlphabet())
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
// initialize morph client of Subnet contract
|
|
||||||
clientMode := morphsubnet.NotaryAlphabet
|
|
||||||
|
|
||||||
if server.sideNotaryConfig.disabled {
|
|
||||||
clientMode = morphsubnet.NonNotary
|
|
||||||
}
|
|
||||||
|
|
||||||
subnetInitPrm := morphsubnet.InitPrm{}
|
|
||||||
subnetInitPrm.SetBaseClient(server.morphClient)
|
|
||||||
subnetInitPrm.SetContractAddress(server.contracts.subnet)
|
|
||||||
subnetInitPrm.SetMode(clientMode)
|
|
||||||
|
|
||||||
subnetClient := &morphsubnet.Client{}
|
|
||||||
err = subnetClient.Init(subnetInitPrm)
|
|
||||||
if err != nil {
|
|
||||||
return nil, fmt.Errorf("could not initialize subnet client: %w", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
var irf irFetcher
|
|
||||||
|
|
||||||
if server.withoutMainNet || !server.mainNotaryConfig.disabled {
|
|
||||||
// if mainchain is disabled we should use NeoFSAlphabetList client method according to its docs
|
|
||||||
// (naming `...WithNotary` will not always be correct)
|
|
||||||
irf = NewIRFetcherWithNotary(server.morphClient)
|
|
||||||
} else {
|
|
||||||
irf = NewIRFetcherWithoutNotary(server.netmapClient)
|
|
||||||
}
|
|
||||||
|
|
||||||
server.statusIndex = newInnerRingIndexer(
|
|
||||||
server.morphClient,
|
|
||||||
irf,
|
|
||||||
server.key.PublicKey(),
|
|
||||||
cfg.GetDuration("indexer.cache_timeout"),
|
|
||||||
)
|
|
||||||
|
|
||||||
clientCache := newClientCache(&clientCacheParams{
|
|
||||||
Log: log,
|
|
||||||
Key: &server.key.PrivateKey,
|
|
||||||
SGTimeout: cfg.GetDuration("audit.timeout.get"),
|
|
||||||
HeadTimeout: cfg.GetDuration("audit.timeout.head"),
|
|
||||||
RangeTimeout: cfg.GetDuration("audit.timeout.rangehash"),
|
|
||||||
AllowExternal: cfg.GetBool("audit.allow_external"),
|
|
||||||
})
|
|
||||||
|
|
||||||
server.registerNoErrCloser(clientCache.cache.CloseAll)
|
|
||||||
|
|
||||||
pdpPoolSize := cfg.GetInt("audit.pdp.pairs_pool_size")
|
|
||||||
porPoolSize := cfg.GetInt("audit.por.pool_size")
|
|
||||||
|
|
||||||
// create audit processor dependencies
|
|
||||||
auditTaskManager := audittask.New(
|
|
||||||
audittask.WithQueueCapacity(cfg.GetUint32("audit.task.queue_capacity")),
|
|
||||||
audittask.WithWorkerPool(auditPool),
|
|
||||||
audittask.WithLogger(log),
|
|
||||||
audittask.WithContainerCommunicator(clientCache),
|
|
||||||
audittask.WithMaxPDPSleepInterval(cfg.GetDuration("audit.pdp.max_sleep_interval")),
|
|
||||||
audittask.WithPDPWorkerPoolGenerator(func() (util2.WorkerPool, error) {
|
|
||||||
return ants.NewPool(pdpPoolSize)
|
|
||||||
}),
|
|
||||||
audittask.WithPoRWorkerPoolGenerator(func() (util2.WorkerPool, error) {
|
|
||||||
return ants.NewPool(porPoolSize)
|
|
||||||
}),
|
|
||||||
)
|
|
||||||
|
|
||||||
server.workers = append(server.workers, auditTaskManager.Listen)
|
|
||||||
|
|
||||||
// create audit processor
|
|
||||||
auditProcessor, err := audit.New(&audit.Params{
|
|
||||||
Log: log,
|
|
||||||
NetmapClient: server.netmapClient,
|
|
||||||
ContainerClient: cnrClient,
|
|
||||||
IRList: server,
|
|
||||||
EpochSource: server,
|
|
||||||
SGSource: clientCache,
|
|
||||||
Key: &server.key.PrivateKey,
|
|
||||||
RPCSearchTimeout: cfg.GetDuration("audit.timeout.search"),
|
|
||||||
TaskManager: auditTaskManager,
|
|
||||||
Reporter: server,
|
|
||||||
})
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
// create settlement processor dependencies
|
|
||||||
settlementDeps := settlementDeps{
|
|
||||||
log: server.log,
|
|
||||||
cnrSrc: cntClient.AsContainerSource(cnrClient),
|
|
||||||
auditClient: server.auditClient,
|
|
||||||
nmClient: server.netmapClient,
|
|
||||||
clientCache: clientCache,
|
|
||||||
balanceClient: server.balanceClient,
|
|
||||||
}
|
|
||||||
|
|
||||||
settlementDeps.settlementCtx = auditSettlementContext
|
|
||||||
auditCalcDeps := &auditSettlementDeps{
|
|
||||||
settlementDeps: settlementDeps,
|
|
||||||
}
|
|
||||||
|
|
||||||
settlementDeps.settlementCtx = basicIncomeSettlementContext
|
|
||||||
basicSettlementDeps := &basicIncomeSettlementDeps{
|
|
||||||
settlementDeps: settlementDeps,
|
|
||||||
cnrClient: cnrClient,
|
|
||||||
}
|
|
||||||
|
|
||||||
auditSettlementCalc := auditSettlement.NewCalculator(
|
|
||||||
&auditSettlement.CalculatorPrm{
|
|
||||||
ResultStorage: auditCalcDeps,
|
|
||||||
ContainerStorage: auditCalcDeps,
|
|
||||||
PlacementCalculator: auditCalcDeps,
|
|
||||||
SGStorage: auditCalcDeps,
|
|
||||||
AccountStorage: auditCalcDeps,
|
|
||||||
Exchanger: auditCalcDeps,
|
|
||||||
AuditFeeFetcher: server.netmapClient,
|
|
||||||
},
|
|
||||||
auditSettlement.WithLogger(server.log),
|
|
||||||
)
|
|
||||||
|
|
||||||
// create settlement processor
|
|
||||||
settlementProcessor := settlement.New(
|
|
||||||
settlement.Prm{
|
|
||||||
AuditProcessor: (*auditSettlementCalculator)(auditSettlementCalc),
|
|
||||||
BasicIncome: &basicSettlementConstructor{dep: basicSettlementDeps},
|
|
||||||
State: server,
|
|
||||||
},
|
|
||||||
settlement.WithLogger(server.log),
|
|
||||||
)
|
|
||||||
|
|
||||||
locodeValidator, err := server.newLocodeValidator(cfg)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
subnetValidator, err := subnetvalidator.New(
|
|
||||||
subnetvalidator.Prm{
|
|
||||||
SubnetClient: subnetClient,
|
|
||||||
},
|
|
||||||
)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
var alphaSync event.Handler
|
|
||||||
|
|
||||||
if server.withoutMainNet || cfg.GetBool("governance.disable") {
|
|
||||||
alphaSync = func(event.Event) {
|
|
||||||
log.Debug("alphabet keys sync is disabled")
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
// create governance processor
|
|
||||||
governanceProcessor, err := governance.New(&governance.Params{
|
|
||||||
Log: log,
|
|
||||||
FrostFSClient: frostfsCli,
|
|
||||||
NetmapClient: server.netmapClient,
|
|
||||||
AlphabetState: server,
|
|
||||||
EpochState: server,
|
|
||||||
Voter: server,
|
|
||||||
IRFetcher: irf,
|
|
||||||
MorphClient: server.morphClient,
|
|
||||||
MainnetClient: server.mainnetClient,
|
|
||||||
NotaryDisabled: server.sideNotaryConfig.disabled,
|
|
||||||
})
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
alphaSync = governanceProcessor.HandleAlphabetSync
|
|
||||||
err = bindMainnetProcessor(governanceProcessor, server)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
netSettings := (*networkSettings)(server.netmapClient)
|
|
||||||
|
|
||||||
var netMapCandidateStateValidator statevalidation.NetMapCandidateValidator
|
|
||||||
netMapCandidateStateValidator.SetNetworkSettings(netSettings)
|
|
||||||
|
|
||||||
// create netmap processor
|
|
||||||
server.netmapProcessor, err = netmap.New(&netmap.Params{
|
|
||||||
Log: log,
|
|
||||||
PoolSize: cfg.GetInt("workers.netmap"),
|
|
||||||
NetmapClient: server.netmapClient,
|
|
||||||
EpochTimer: server,
|
|
||||||
EpochState: server,
|
|
||||||
AlphabetState: server,
|
|
||||||
CleanupEnabled: cfg.GetBool("netmap_cleaner.enabled"),
|
|
||||||
CleanupThreshold: cfg.GetUint64("netmap_cleaner.threshold"),
|
|
||||||
ContainerWrapper: cnrClient,
|
|
||||||
HandleAudit: server.onlyActiveEventHandler(
|
|
||||||
auditProcessor.StartAuditHandler(),
|
|
||||||
),
|
|
||||||
NotaryDepositHandler: server.onlyAlphabetEventHandler(
|
|
||||||
server.notaryHandler,
|
|
||||||
),
|
|
||||||
AuditSettlementsHandler: server.onlyAlphabetEventHandler(
|
|
||||||
settlementProcessor.HandleAuditEvent,
|
|
||||||
),
|
|
||||||
AlphabetSyncHandler: alphaSync,
|
|
||||||
NodeValidator: nodevalidator.New(
|
|
||||||
&netMapCandidateStateValidator,
|
|
||||||
addrvalidator.New(),
|
|
||||||
locodeValidator,
|
|
||||||
subnetValidator,
|
|
||||||
),
|
|
||||||
NotaryDisabled: server.sideNotaryConfig.disabled,
|
|
||||||
SubnetContract: &server.contracts.subnet,
|
|
||||||
|
|
||||||
NodeStateSettings: netSettings,
|
|
||||||
})
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
err = bindMorphProcessor(server.netmapProcessor, server)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
// container processor
|
|
||||||
containerProcessor, err := container.New(&container.Params{
|
|
||||||
Log: log,
|
|
||||||
PoolSize: cfg.GetInt("workers.container"),
|
|
||||||
AlphabetState: server,
|
|
||||||
ContainerClient: cnrClient,
|
|
||||||
FrostFSIDClient: frostfsIDClient,
|
|
||||||
NetworkState: server.netmapClient,
|
|
||||||
NotaryDisabled: server.sideNotaryConfig.disabled,
|
|
||||||
SubnetClient: subnetClient,
|
|
||||||
})
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
err = bindMorphProcessor(containerProcessor, server)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
// create balance processor
|
|
||||||
balanceProcessor, err := balance.New(&balance.Params{
|
|
||||||
Log: log,
|
|
||||||
PoolSize: cfg.GetInt("workers.balance"),
|
|
||||||
FrostFSClient: frostfsCli,
|
|
||||||
BalanceSC: server.contracts.balance,
|
|
||||||
AlphabetState: server,
|
|
||||||
Converter: &server.precision,
|
|
||||||
})
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
err = bindMorphProcessor(balanceProcessor, server)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
if !server.withoutMainNet {
|
|
||||||
// create mainnnet frostfs processor
|
|
||||||
frostfsProcessor, err := frostfs.New(&frostfs.Params{
|
|
||||||
Log: log,
|
|
||||||
PoolSize: cfg.GetInt("workers.frostfs"),
|
|
||||||
FrostFSContract: server.contracts.frostfs,
|
|
||||||
FrostFSIDClient: frostfsIDClient,
|
|
||||||
BalanceClient: server.balanceClient,
|
|
||||||
NetmapClient: server.netmapClient,
|
|
||||||
MorphClient: server.morphClient,
|
|
||||||
EpochState: server,
|
|
||||||
AlphabetState: server,
|
|
||||||
Converter: &server.precision,
|
|
||||||
MintEmitCacheSize: cfg.GetInt("emit.mint.cache_size"),
|
|
||||||
MintEmitThreshold: cfg.GetUint64("emit.mint.threshold"),
|
|
||||||
MintEmitValue: fixedn.Fixed8(cfg.GetInt64("emit.mint.value")),
|
|
||||||
GasBalanceThreshold: cfg.GetInt64("emit.gas.balance_threshold"),
|
|
||||||
})
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
err = bindMainnetProcessor(frostfsProcessor, server)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
parsedWallets, err := parseWalletAddressesFromStrings(cfg.GetStringSlice("emit.extra_wallets"))
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
// create alphabet processor
|
|
||||||
alphabetProcessor, err := alphabet.New(&alphabet.Params{
|
|
||||||
ParsedWallets: parsedWallets,
|
|
||||||
Log: log,
|
|
||||||
PoolSize: cfg.GetInt("workers.alphabet"),
|
|
||||||
AlphabetContracts: server.contracts.alphabet,
|
|
||||||
NetmapClient: server.netmapClient,
|
|
||||||
MorphClient: server.morphClient,
|
|
||||||
IRList: server,
|
|
||||||
StorageEmission: cfg.GetUint64("emit.storage.amount"),
|
|
||||||
})
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
err = bindMorphProcessor(alphabetProcessor, server)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
// create reputation processor
|
|
||||||
reputationProcessor, err := reputation.New(&reputation.Params{
|
|
||||||
Log: log,
|
|
||||||
PoolSize: cfg.GetInt("workers.reputation"),
|
|
||||||
EpochState: server,
|
|
||||||
AlphabetState: server,
|
|
||||||
ReputationWrapper: repClient,
|
|
||||||
ManagerBuilder: reputationcommon.NewManagerBuilder(
|
|
||||||
reputationcommon.ManagersPrm{
|
|
||||||
NetMapSource: server.netmapClient,
|
|
||||||
},
|
|
||||||
),
|
|
||||||
NotaryDisabled: server.sideNotaryConfig.disabled,
|
|
||||||
})
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
err = bindMorphProcessor(reputationProcessor, server)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
// initialize epoch timers
|
|
||||||
server.epochTimer = newEpochTimer(&epochTimerArgs{
|
|
||||||
l: server.log,
|
|
||||||
newEpochHandlers: server.newEpochTickHandlers(),
|
|
||||||
cnrWrapper: cnrClient,
|
|
||||||
epoch: server,
|
|
||||||
stopEstimationDMul: cfg.GetUint32("timers.stop_estimation.mul"),
|
|
||||||
stopEstimationDDiv: cfg.GetUint32("timers.stop_estimation.div"),
|
|
||||||
collectBasicIncome: subEpochEventHandler{
|
|
||||||
handler: settlementProcessor.HandleIncomeCollectionEvent,
|
|
||||||
durationMul: cfg.GetUint32("timers.collect_basic_income.mul"),
|
|
||||||
durationDiv: cfg.GetUint32("timers.collect_basic_income.div"),
|
|
||||||
},
|
|
||||||
distributeBasicIncome: subEpochEventHandler{
|
|
||||||
handler: settlementProcessor.HandleIncomeDistributionEvent,
|
|
||||||
durationMul: cfg.GetUint32("timers.distribute_basic_income.mul"),
|
|
||||||
durationDiv: cfg.GetUint32("timers.distribute_basic_income.div"),
|
|
||||||
},
|
|
||||||
})
|
|
||||||
|
|
||||||
server.addBlockTimer(server.epochTimer)
|
|
||||||
|
|
||||||
// initialize emission timer
|
|
||||||
emissionTimer := newEmissionTimer(&emitTimerArgs{
|
|
||||||
ap: alphabetProcessor,
|
|
||||||
emitDuration: cfg.GetUint32("timers.emit"),
|
|
||||||
})
|
|
||||||
|
|
||||||
server.addBlockTimer(emissionTimer)
|
|
||||||
|
|
||||||
controlSvcEndpoint := cfg.GetString("control.grpc.endpoint")
|
|
||||||
if controlSvcEndpoint != "" {
|
|
||||||
authKeysStr := cfg.GetStringSlice("control.authorized_keys")
|
|
||||||
authKeys := make([][]byte, 0, len(authKeysStr))
|
|
||||||
|
|
||||||
for i := range authKeysStr {
|
|
||||||
key, err := hex.DecodeString(authKeysStr[i])
|
|
||||||
if err != nil {
|
|
||||||
return nil, fmt.Errorf("could not parse Control authorized key %s: %w",
|
|
||||||
authKeysStr[i],
|
|
||||||
err,
|
|
||||||
)
|
|
||||||
}
|
|
||||||
|
|
||||||
authKeys = append(authKeys, key)
|
|
||||||
}
|
|
||||||
|
|
||||||
var p controlsrv.Prm
|
|
||||||
|
|
||||||
p.SetPrivateKey(*server.key)
|
|
||||||
p.SetHealthChecker(server)
|
|
||||||
|
|
||||||
controlSvc := controlsrv.New(p,
|
|
||||||
controlsrv.WithAllowedKeys(authKeys),
|
|
||||||
)
|
|
||||||
|
|
||||||
grpcControlSrv := grpc.NewServer()
|
|
||||||
control.RegisterControlServiceServer(grpcControlSrv, controlSvc)
|
|
||||||
|
|
||||||
server.runners = append(server.runners, func(ch chan<- error) error {
|
|
||||||
lis, err := net.Listen("tcp", controlSvcEndpoint)
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
go func() {
|
|
||||||
ch <- grpcControlSrv.Serve(lis)
|
|
||||||
}()
|
|
||||||
return nil
|
|
||||||
})
|
|
||||||
|
|
||||||
server.registerNoErrCloser(grpcControlSrv.GracefulStop)
|
|
||||||
} else {
|
|
||||||
log.Info("no Control server endpoint specified, service is disabled")
|
|
||||||
}
|
|
||||||
|
|
||||||
server.initSubnet(subnetConfig{
|
server.initSubnet(subnetConfig{
|
||||||
queueSize: cfg.GetUint32("workers.subnet"),
|
queueSize: cfg.GetUint32("workers.subnet"),
|
||||||
})
|
})
|
||||||
|
|
||||||
if cfg.GetString("prometheus.address") != "" {
|
server.initMetrics(cfg)
|
||||||
m := metrics.NewInnerRingMetrics()
|
|
||||||
server.metrics = &m
|
|
||||||
}
|
|
||||||
|
|
||||||
return server, nil
|
return server, nil
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in a new issue