frostfs-node/pkg/innerring/innerring.go
Leonard Lyubich f64ae55806 [#1681] ir/netmap: Require MAINTENANCE mode to be allowed by network
There is a need to prevent limitless abuse of MAINTENANCE status of the
storage nodes. To do this, configuration of the NeoFS network is going
to be extended with the flag which allows the state. Until this is done,
it makes sense to prepare a site for this in the code.

Define `state.NetworkSettings` interface as an abstraction of global
network configuration within the `state` package. Make
`NetMapCandidateValidator` to depend on `NetworkSettings` and provide
corresponding field setter. Change `VerifyAndUpdate` method's behavior
to return an error for candidates with MAINTENANCE state if this state
is disallowed by the network configuration. Provide `NetworkSettings`
from the wrapper over Netmap contract's client on Inner Ring application
side. The provider is implemented to statically disallow MAINTENANCE
mode in order to save previous behavior.

Signed-off-by: Leonard Lyubich <leonard@nspcc.ru>
Signed-off-by: Leonard Lyubich <ctulhurider@gmail.com>
2022-10-05 11:41:49 +03:00

1118 lines
32 KiB
Go

package innerring
import (
"context"
"encoding/hex"
"errors"
"fmt"
"io"
"net"
"github.com/nspcc-dev/neo-go/pkg/core/block"
"github.com/nspcc-dev/neo-go/pkg/core/transaction"
"github.com/nspcc-dev/neo-go/pkg/crypto/keys"
"github.com/nspcc-dev/neo-go/pkg/encoding/fixedn"
"github.com/nspcc-dev/neofs-node/pkg/innerring/config"
"github.com/nspcc-dev/neofs-node/pkg/innerring/processors/alphabet"
"github.com/nspcc-dev/neofs-node/pkg/innerring/processors/audit"
"github.com/nspcc-dev/neofs-node/pkg/innerring/processors/balance"
"github.com/nspcc-dev/neofs-node/pkg/innerring/processors/container"
"github.com/nspcc-dev/neofs-node/pkg/innerring/processors/governance"
"github.com/nspcc-dev/neofs-node/pkg/innerring/processors/neofs"
"github.com/nspcc-dev/neofs-node/pkg/innerring/processors/netmap"
nodevalidator "github.com/nspcc-dev/neofs-node/pkg/innerring/processors/netmap/nodevalidation"
addrvalidator "github.com/nspcc-dev/neofs-node/pkg/innerring/processors/netmap/nodevalidation/maddress"
statevalidation "github.com/nspcc-dev/neofs-node/pkg/innerring/processors/netmap/nodevalidation/state"
subnetvalidator "github.com/nspcc-dev/neofs-node/pkg/innerring/processors/netmap/nodevalidation/subnet"
"github.com/nspcc-dev/neofs-node/pkg/innerring/processors/reputation"
"github.com/nspcc-dev/neofs-node/pkg/innerring/processors/settlement"
auditSettlement "github.com/nspcc-dev/neofs-node/pkg/innerring/processors/settlement/audit"
timerEvent "github.com/nspcc-dev/neofs-node/pkg/innerring/timers"
"github.com/nspcc-dev/neofs-node/pkg/metrics"
"github.com/nspcc-dev/neofs-node/pkg/morph/client"
auditClient "github.com/nspcc-dev/neofs-node/pkg/morph/client/audit"
balanceClient "github.com/nspcc-dev/neofs-node/pkg/morph/client/balance"
cntClient "github.com/nspcc-dev/neofs-node/pkg/morph/client/container"
neofsClient "github.com/nspcc-dev/neofs-node/pkg/morph/client/neofs"
"github.com/nspcc-dev/neofs-node/pkg/morph/client/neofsid"
nmClient "github.com/nspcc-dev/neofs-node/pkg/morph/client/netmap"
repClient "github.com/nspcc-dev/neofs-node/pkg/morph/client/reputation"
morphsubnet "github.com/nspcc-dev/neofs-node/pkg/morph/client/subnet"
"github.com/nspcc-dev/neofs-node/pkg/morph/event"
"github.com/nspcc-dev/neofs-node/pkg/morph/subscriber"
"github.com/nspcc-dev/neofs-node/pkg/morph/timer"
audittask "github.com/nspcc-dev/neofs-node/pkg/services/audit/taskmanager"
control "github.com/nspcc-dev/neofs-node/pkg/services/control/ir"
controlsrv "github.com/nspcc-dev/neofs-node/pkg/services/control/ir/server"
reputationcommon "github.com/nspcc-dev/neofs-node/pkg/services/reputation/common"
util2 "github.com/nspcc-dev/neofs-node/pkg/util"
utilConfig "github.com/nspcc-dev/neofs-node/pkg/util/config"
"github.com/nspcc-dev/neofs-node/pkg/util/precision"
"github.com/nspcc-dev/neofs-node/pkg/util/state"
"github.com/panjf2000/ants/v2"
"github.com/spf13/viper"
"go.uber.org/atomic"
"go.uber.org/zap"
"google.golang.org/grpc"
)
type (
// Server is the inner ring application structure, that contains all event
// processors, shared variables and event handlers.
Server struct {
log *zap.Logger
// event producers
morphListener event.Listener
mainnetListener event.Listener
blockTimers []*timer.BlockTimer
epochTimer *timer.BlockTimer
// global state
morphClient *client.Client
mainnetClient *client.Client
epochCounter atomic.Uint64
epochDuration atomic.Uint64
statusIndex *innerRingIndexer
precision precision.Fixed8Converter
auditClient *auditClient.Client
healthStatus atomic.Value
balanceClient *balanceClient.Client
netmapClient *nmClient.Client
persistate *state.PersistentStorage
// metrics
metrics *metrics.InnerRingServiceMetrics
// notary configuration
feeConfig *config.FeeConfig
mainNotaryConfig *notaryConfig
sideNotaryConfig *notaryConfig
// internal variables
key *keys.PrivateKey
pubKey []byte
contracts *contracts
predefinedValidators keys.PublicKeys
initialEpochTickDelta uint32
withoutMainNet bool
// runtime processors
netmapProcessor *netmap.Processor
workers []func(context.Context)
// Set of local resources that must be
// initialized at the very beginning of
// Server's work, (e.g. opening files).
//
// If any starter returns an error, Server's
// starting fails immediately.
starters []func() error
// Set of local resources that must be
// released at Server's work completion
// (e.g closing files).
//
// Closer's wrong outcome shouldn't be critical.
//
// Errors are logged.
closers []func() error
// Set of component runners which
// should report start errors
// to the application.
runners []func(chan<- error) error
subnetHandler
}
chainParams struct {
log *zap.Logger
cfg *viper.Viper
key *keys.PrivateKey
name string
sgn *transaction.Signer
from uint32 // block height
}
)
const (
morphPrefix = "morph"
mainnetPrefix = "mainnet"
// extra blocks to overlap two deposits, we do that to make sure that
// there won't be any blocks without deposited assets in notary contract;
// make sure it is bigger than any extra rounding value in notary client.
notaryExtraBlocks = 300
// amount of tries before notary deposit timeout.
notaryDepositTimeout = 100
)
var (
errDepositTimeout = errors.New("notary deposit didn't appear in the network")
errDepositFail = errors.New("notary tx has faulted")
)
// Start runs all event providers.
func (s *Server) Start(ctx context.Context, intError chan<- error) (err error) {
s.setHealthStatus(control.HealthStatus_STARTING)
defer func() {
if err == nil {
s.setHealthStatus(control.HealthStatus_READY)
}
}()
for _, starter := range s.starters {
if err := starter(); err != nil {
return err
}
}
err = s.initConfigFromBlockchain()
if err != nil {
return err
}
if !s.mainNotaryConfig.disabled {
err = s.initNotary(ctx,
s.depositMainNotary,
s.awaitMainNotaryDeposit,
"waiting to accept main notary deposit",
)
if err != nil {
return err
}
}
if !s.sideNotaryConfig.disabled {
err = s.initNotary(ctx,
s.depositSideNotary,
s.awaitSideNotaryDeposit,
"waiting to accept side notary deposit",
)
if err != nil {
return err
}
}
prm := governance.VoteValidatorPrm{}
prm.Validators = s.predefinedValidators
// vote for sidechain validator if it is prepared in config
err = s.voteForSidechainValidator(prm)
if err != nil {
// we don't stop inner ring execution on this error
s.log.Warn("can't vote for prepared validators",
zap.String("error", err.Error()))
}
// tick initial epoch
initialEpochTicker := timer.NewOneTickTimer(
timer.StaticBlockMeter(s.initialEpochTickDelta),
func() {
s.netmapProcessor.HandleNewEpochTick(timerEvent.NewEpochTick{})
})
s.addBlockTimer(initialEpochTicker)
morphErr := make(chan error)
mainnnetErr := make(chan error)
// anonymous function to multiplex error channels
go func() {
select {
case <-ctx.Done():
return
case err := <-morphErr:
intError <- fmt.Errorf("sidechain: %w", err)
case err := <-mainnnetErr:
intError <- fmt.Errorf("mainnet: %w", err)
}
}()
s.morphListener.RegisterBlockHandler(func(b *block.Block) {
s.log.Debug("new block",
zap.Uint32("index", b.Index),
)
err = s.persistate.SetUInt32(persistateSideChainLastBlockKey, b.Index)
if err != nil {
s.log.Warn("can't update persistent state",
zap.String("chain", "side"),
zap.Uint32("block_index", b.Index))
}
s.tickTimers(b.Index)
})
if !s.withoutMainNet {
s.mainnetListener.RegisterBlockHandler(func(b *block.Block) {
err = s.persistate.SetUInt32(persistateMainChainLastBlockKey, b.Index)
if err != nil {
s.log.Warn("can't update persistent state",
zap.String("chain", "main"),
zap.Uint32("block_index", b.Index))
}
})
}
for _, runner := range s.runners {
if err := runner(intError); err != nil {
return err
}
}
go s.morphListener.ListenWithError(ctx, morphErr) // listen for neo:morph events
go s.mainnetListener.ListenWithError(ctx, mainnnetErr) // listen for neo:mainnet events
if err := s.startBlockTimers(); err != nil {
return fmt.Errorf("could not start block timers: %w", err)
}
s.startWorkers(ctx)
return nil
}
func (s *Server) startWorkers(ctx context.Context) {
for _, w := range s.workers {
go w(ctx)
}
}
// Stop closes all subscription channels.
func (s *Server) Stop() {
s.setHealthStatus(control.HealthStatus_SHUTTING_DOWN)
go s.morphListener.Stop()
go s.mainnetListener.Stop()
for _, c := range s.closers {
if err := c(); err != nil {
s.log.Warn("closer error",
zap.String("error", err.Error()),
)
}
}
}
func (s *Server) registerNoErrCloser(c func()) {
s.registerCloser(func() error {
c()
return nil
})
}
func (s *Server) registerIOCloser(c io.Closer) {
s.registerCloser(c.Close)
}
func (s *Server) registerCloser(f func() error) {
s.closers = append(s.closers, f)
}
func (s *Server) registerStarter(f func() error) {
s.starters = append(s.starters, f)
}
// New creates instance of inner ring sever structure.
func New(ctx context.Context, log *zap.Logger, cfg *viper.Viper, errChan chan<- error) (*Server, error) {
var err error
server := &Server{log: log}
server.setHealthStatus(control.HealthStatus_HEALTH_STATUS_UNDEFINED)
// parse notary support
server.feeConfig = config.NewFeeConfig(cfg)
// prepare inner ring node private key
acc, err := utilConfig.LoadAccount(
cfg.GetString("wallet.path"),
cfg.GetString("wallet.address"),
cfg.GetString("wallet.password"))
if err != nil {
return nil, fmt.Errorf("ir: %w", err)
}
server.key = acc.PrivateKey()
server.persistate, err = initPersistentStateStorage(cfg)
if err != nil {
return nil, err
}
server.registerCloser(server.persistate.Close)
fromSideChainBlock, err := server.persistate.UInt32(persistateSideChainLastBlockKey)
if err != nil {
fromSideChainBlock = 0
log.Warn("can't get last processed side chain block number", zap.String("error", err.Error()))
}
morphChain := &chainParams{
log: log,
cfg: cfg,
key: server.key,
name: morphPrefix,
from: fromSideChainBlock,
}
// create morph client
server.morphClient, err = createClient(ctx, morphChain, errChan)
if err != nil {
return nil, err
}
// create morph listener
server.morphListener, err = createListener(ctx, server.morphClient, morphChain)
if err != nil {
return nil, err
}
if err := server.morphClient.SetGroupSignerScope(); err != nil {
morphChain.log.Info("failed to set group signer scope, continue with Global", zap.Error(err))
}
server.withoutMainNet = cfg.GetBool("without_mainnet")
if server.withoutMainNet {
// This works as long as event Listener starts listening loop once,
// otherwise Server.Start will run two similar routines.
// This behavior most likely will not change.
server.mainnetListener = server.morphListener
server.mainnetClient = server.morphClient
} else {
mainnetChain := morphChain
mainnetChain.name = mainnetPrefix
mainnetChain.sgn = &transaction.Signer{Scopes: transaction.CalledByEntry}
fromMainChainBlock, err := server.persistate.UInt32(persistateMainChainLastBlockKey)
if err != nil {
fromMainChainBlock = 0
log.Warn("can't get last processed main chain block number", zap.String("error", err.Error()))
}
mainnetChain.from = fromMainChainBlock
// create mainnet client
server.mainnetClient, err = createClient(ctx, mainnetChain, errChan)
if err != nil {
return nil, err
}
// create mainnet listener
server.mainnetListener, err = createListener(ctx, server.mainnetClient, mainnetChain)
if err != nil {
return nil, err
}
}
server.mainNotaryConfig, server.sideNotaryConfig = parseNotaryConfigs(
cfg,
server.morphClient.ProbeNotary(),
!server.withoutMainNet && server.mainnetClient.ProbeNotary(), // if mainnet disabled then notary flag must be disabled too
)
log.Debug("notary support",
zap.Bool("sidechain_enabled", !server.sideNotaryConfig.disabled),
zap.Bool("mainchain_enabled", !server.mainNotaryConfig.disabled),
)
// get all script hashes of contracts
server.contracts, err = parseContracts(
cfg,
server.morphClient,
server.withoutMainNet,
server.mainNotaryConfig.disabled,
server.sideNotaryConfig.disabled,
)
if err != nil {
return nil, err
}
if !server.sideNotaryConfig.disabled {
// enable notary support in the side client
err = server.morphClient.EnableNotarySupport(
client.WithProxyContract(server.contracts.proxy),
)
if err != nil {
return nil, fmt.Errorf("could not enable side chain notary support: %w", err)
}
server.morphListener.EnableNotarySupport(server.contracts.proxy, server.morphClient.Committee, server.morphClient)
}
if !server.mainNotaryConfig.disabled {
// enable notary support in the main client
err = server.mainnetClient.EnableNotarySupport(
client.WithProxyContract(server.contracts.processing),
client.WithAlphabetSource(server.morphClient.Committee),
)
if err != nil {
return nil, fmt.Errorf("could not enable main chain notary support: %w", err)
}
}
// parse default validators
server.predefinedValidators, err = parsePredefinedValidators(cfg)
if err != nil {
return nil, fmt.Errorf("ir: can't parse predefined validators list: %w", err)
}
server.pubKey = server.key.PublicKey().Bytes()
auditPool, err := ants.NewPool(cfg.GetInt("audit.task.exec_pool_size"))
if err != nil {
return nil, err
}
fee := server.feeConfig.SideChainFee()
// do not use TryNotary() in audit wrapper
// audit operations do not require multisignatures
server.auditClient, err = auditClient.NewFromMorph(server.morphClient, server.contracts.audit, fee)
if err != nil {
return nil, err
}
// form morph container client's options
morphCnrOpts := make([]cntClient.Option, 0, 3)
morphCnrOpts = append(morphCnrOpts,
cntClient.TryNotary(),
cntClient.AsAlphabet(),
)
if server.sideNotaryConfig.disabled {
// in non-notary environments we customize fee for named container registration
// because it takes much more additional GAS than other operations.
morphCnrOpts = append(morphCnrOpts,
cntClient.WithCustomFeeForNamedPut(server.feeConfig.NamedContainerRegistrationFee()),
)
}
cnrClient, err := cntClient.NewFromMorph(server.morphClient, server.contracts.container, fee, morphCnrOpts...)
if err != nil {
return nil, err
}
server.netmapClient, err = nmClient.NewFromMorph(server.morphClient, server.contracts.netmap, fee, nmClient.TryNotary(), nmClient.AsAlphabet())
if err != nil {
return nil, err
}
server.balanceClient, err = balanceClient.NewFromMorph(server.morphClient, server.contracts.balance, fee, balanceClient.TryNotary(), balanceClient.AsAlphabet())
if err != nil {
return nil, err
}
repClient, err := repClient.NewFromMorph(server.morphClient, server.contracts.reputation, fee, repClient.TryNotary(), repClient.AsAlphabet())
if err != nil {
return nil, err
}
neofsIDClient, err := neofsid.NewFromMorph(server.morphClient, server.contracts.neofsID, fee, neofsid.TryNotary(), neofsid.AsAlphabet())
if err != nil {
return nil, err
}
neofsCli, err := neofsClient.NewFromMorph(server.mainnetClient, server.contracts.neofs,
server.feeConfig.MainChainFee(), neofsClient.TryNotary(), neofsClient.AsAlphabet())
if err != nil {
return nil, err
}
// initialize morph client of Subnet contract
clientMode := morphsubnet.NotaryAlphabet
if server.sideNotaryConfig.disabled {
clientMode = morphsubnet.NonNotary
}
subnetInitPrm := morphsubnet.InitPrm{}
subnetInitPrm.SetBaseClient(server.morphClient)
subnetInitPrm.SetContractAddress(server.contracts.subnet)
subnetInitPrm.SetMode(clientMode)
subnetClient := &morphsubnet.Client{}
err = subnetClient.Init(subnetInitPrm)
if err != nil {
return nil, fmt.Errorf("could not initialize subnet client: %w", err)
}
var irf irFetcher
if server.withoutMainNet || !server.mainNotaryConfig.disabled {
// if mainchain is disabled we should use NeoFSAlphabetList client method according to its docs
// (naming `...WithNotary` will not always be correct)
irf = NewIRFetcherWithNotary(server.morphClient)
} else {
irf = NewIRFetcherWithoutNotary(server.netmapClient)
}
server.statusIndex = newInnerRingIndexer(
server.morphClient,
irf,
server.key.PublicKey(),
cfg.GetDuration("indexer.cache_timeout"),
)
clientCache := newClientCache(&clientCacheParams{
Log: log,
Key: &server.key.PrivateKey,
SGTimeout: cfg.GetDuration("audit.timeout.get"),
HeadTimeout: cfg.GetDuration("audit.timeout.head"),
RangeTimeout: cfg.GetDuration("audit.timeout.rangehash"),
AllowExternal: cfg.GetBool("audit.allow_external"),
})
server.registerNoErrCloser(clientCache.cache.CloseAll)
pdpPoolSize := cfg.GetInt("audit.pdp.pairs_pool_size")
porPoolSize := cfg.GetInt("audit.por.pool_size")
// create audit processor dependencies
auditTaskManager := audittask.New(
audittask.WithQueueCapacity(cfg.GetUint32("audit.task.queue_capacity")),
audittask.WithWorkerPool(auditPool),
audittask.WithLogger(log),
audittask.WithContainerCommunicator(clientCache),
audittask.WithMaxPDPSleepInterval(cfg.GetDuration("audit.pdp.max_sleep_interval")),
audittask.WithPDPWorkerPoolGenerator(func() (util2.WorkerPool, error) {
return ants.NewPool(pdpPoolSize)
}),
audittask.WithPoRWorkerPoolGenerator(func() (util2.WorkerPool, error) {
return ants.NewPool(porPoolSize)
}),
)
server.workers = append(server.workers, auditTaskManager.Listen)
// create audit processor
auditProcessor, err := audit.New(&audit.Params{
Log: log,
NetmapClient: server.netmapClient,
ContainerClient: cnrClient,
IRList: server,
EpochSource: server,
SGSource: clientCache,
Key: &server.key.PrivateKey,
RPCSearchTimeout: cfg.GetDuration("audit.timeout.search"),
TaskManager: auditTaskManager,
Reporter: server,
})
if err != nil {
return nil, err
}
// create settlement processor dependencies
settlementDeps := settlementDeps{
log: server.log,
cnrSrc: cntClient.AsContainerSource(cnrClient),
auditClient: server.auditClient,
nmClient: server.netmapClient,
clientCache: clientCache,
balanceClient: server.balanceClient,
}
settlementDeps.settlementCtx = auditSettlementContext
auditCalcDeps := &auditSettlementDeps{
settlementDeps: settlementDeps,
}
settlementDeps.settlementCtx = basicIncomeSettlementContext
basicSettlementDeps := &basicIncomeSettlementDeps{
settlementDeps: settlementDeps,
cnrClient: cnrClient,
}
auditSettlementCalc := auditSettlement.NewCalculator(
&auditSettlement.CalculatorPrm{
ResultStorage: auditCalcDeps,
ContainerStorage: auditCalcDeps,
PlacementCalculator: auditCalcDeps,
SGStorage: auditCalcDeps,
AccountStorage: auditCalcDeps,
Exchanger: auditCalcDeps,
AuditFeeFetcher: server.netmapClient,
},
auditSettlement.WithLogger(server.log),
)
// create settlement processor
settlementProcessor := settlement.New(
settlement.Prm{
AuditProcessor: (*auditSettlementCalculator)(auditSettlementCalc),
BasicIncome: &basicSettlementConstructor{dep: basicSettlementDeps},
State: server,
},
settlement.WithLogger(server.log),
)
locodeValidator, err := server.newLocodeValidator(cfg)
if err != nil {
return nil, err
}
subnetValidator, err := subnetvalidator.New(
subnetvalidator.Prm{
SubnetClient: subnetClient,
},
)
if err != nil {
return nil, err
}
var alphaSync event.Handler
if server.withoutMainNet || cfg.GetBool("governance.disable") {
alphaSync = func(event.Event) {
log.Debug("alphabet keys sync is disabled")
}
} else {
// create governance processor
governanceProcessor, err := governance.New(&governance.Params{
Log: log,
NeoFSClient: neofsCli,
NetmapClient: server.netmapClient,
AlphabetState: server,
EpochState: server,
Voter: server,
IRFetcher: irf,
MorphClient: server.morphClient,
MainnetClient: server.mainnetClient,
NotaryDisabled: server.sideNotaryConfig.disabled,
})
if err != nil {
return nil, err
}
alphaSync = governanceProcessor.HandleAlphabetSync
err = bindMainnetProcessor(governanceProcessor, server)
if err != nil {
return nil, err
}
}
var netMapCandidateStateValidator statevalidation.NetMapCandidateValidator
netMapCandidateStateValidator.SetNetworkSettings((*networkSettings)(server.netmapClient))
// create netmap processor
server.netmapProcessor, err = netmap.New(&netmap.Params{
Log: log,
PoolSize: cfg.GetInt("workers.netmap"),
NetmapClient: server.netmapClient,
EpochTimer: server,
EpochState: server,
AlphabetState: server,
CleanupEnabled: cfg.GetBool("netmap_cleaner.enabled"),
CleanupThreshold: cfg.GetUint64("netmap_cleaner.threshold"),
ContainerWrapper: cnrClient,
HandleAudit: server.onlyActiveEventHandler(
auditProcessor.StartAuditHandler(),
),
NotaryDepositHandler: server.onlyAlphabetEventHandler(
server.notaryHandler,
),
AuditSettlementsHandler: server.onlyAlphabetEventHandler(
settlementProcessor.HandleAuditEvent,
),
AlphabetSyncHandler: alphaSync,
NodeValidator: nodevalidator.New(
&netMapCandidateStateValidator,
addrvalidator.New(),
locodeValidator,
subnetValidator,
),
NotaryDisabled: server.sideNotaryConfig.disabled,
SubnetContract: &server.contracts.subnet,
})
if err != nil {
return nil, err
}
err = bindMorphProcessor(server.netmapProcessor, server)
if err != nil {
return nil, err
}
// container processor
containerProcessor, err := container.New(&container.Params{
Log: log,
PoolSize: cfg.GetInt("workers.container"),
AlphabetState: server,
ContainerClient: cnrClient,
NeoFSIDClient: neofsIDClient,
NetworkState: server.netmapClient,
NotaryDisabled: server.sideNotaryConfig.disabled,
SubnetClient: subnetClient,
})
if err != nil {
return nil, err
}
err = bindMorphProcessor(containerProcessor, server)
if err != nil {
return nil, err
}
// create balance processor
balanceProcessor, err := balance.New(&balance.Params{
Log: log,
PoolSize: cfg.GetInt("workers.balance"),
NeoFSClient: neofsCli,
BalanceSC: server.contracts.balance,
AlphabetState: server,
Converter: &server.precision,
})
if err != nil {
return nil, err
}
err = bindMorphProcessor(balanceProcessor, server)
if err != nil {
return nil, err
}
if !server.withoutMainNet {
// create mainnnet neofs processor
neofsProcessor, err := neofs.New(&neofs.Params{
Log: log,
PoolSize: cfg.GetInt("workers.neofs"),
NeoFSContract: server.contracts.neofs,
NeoFSIDClient: neofsIDClient,
BalanceClient: server.balanceClient,
NetmapClient: server.netmapClient,
MorphClient: server.morphClient,
EpochState: server,
AlphabetState: server,
Converter: &server.precision,
MintEmitCacheSize: cfg.GetInt("emit.mint.cache_size"),
MintEmitThreshold: cfg.GetUint64("emit.mint.threshold"),
MintEmitValue: fixedn.Fixed8(cfg.GetInt64("emit.mint.value")),
GasBalanceThreshold: cfg.GetInt64("emit.gas.balance_threshold"),
})
if err != nil {
return nil, err
}
err = bindMainnetProcessor(neofsProcessor, server)
if err != nil {
return nil, err
}
}
// create alphabet processor
alphabetProcessor, err := alphabet.New(&alphabet.Params{
Log: log,
PoolSize: cfg.GetInt("workers.alphabet"),
AlphabetContracts: server.contracts.alphabet,
NetmapClient: server.netmapClient,
MorphClient: server.morphClient,
IRList: server,
StorageEmission: cfg.GetUint64("emit.storage.amount"),
})
if err != nil {
return nil, err
}
err = bindMorphProcessor(alphabetProcessor, server)
if err != nil {
return nil, err
}
// create reputation processor
reputationProcessor, err := reputation.New(&reputation.Params{
Log: log,
PoolSize: cfg.GetInt("workers.reputation"),
EpochState: server,
AlphabetState: server,
ReputationWrapper: repClient,
ManagerBuilder: reputationcommon.NewManagerBuilder(
reputationcommon.ManagersPrm{
NetMapSource: server.netmapClient,
},
),
NotaryDisabled: server.sideNotaryConfig.disabled,
})
if err != nil {
return nil, err
}
err = bindMorphProcessor(reputationProcessor, server)
if err != nil {
return nil, err
}
// initialize epoch timers
server.epochTimer = newEpochTimer(&epochTimerArgs{
l: server.log,
newEpochHandlers: server.newEpochTickHandlers(),
cnrWrapper: cnrClient,
epoch: server,
stopEstimationDMul: cfg.GetUint32("timers.stop_estimation.mul"),
stopEstimationDDiv: cfg.GetUint32("timers.stop_estimation.div"),
collectBasicIncome: subEpochEventHandler{
handler: settlementProcessor.HandleIncomeCollectionEvent,
durationMul: cfg.GetUint32("timers.collect_basic_income.mul"),
durationDiv: cfg.GetUint32("timers.collect_basic_income.div"),
},
distributeBasicIncome: subEpochEventHandler{
handler: settlementProcessor.HandleIncomeDistributionEvent,
durationMul: cfg.GetUint32("timers.distribute_basic_income.mul"),
durationDiv: cfg.GetUint32("timers.distribute_basic_income.div"),
},
})
server.addBlockTimer(server.epochTimer)
// initialize emission timer
emissionTimer := newEmissionTimer(&emitTimerArgs{
ap: alphabetProcessor,
emitDuration: cfg.GetUint32("timers.emit"),
})
server.addBlockTimer(emissionTimer)
controlSvcEndpoint := cfg.GetString("control.grpc.endpoint")
if controlSvcEndpoint != "" {
authKeysStr := cfg.GetStringSlice("control.authorized_keys")
authKeys := make([][]byte, 0, len(authKeysStr))
for i := range authKeysStr {
key, err := hex.DecodeString(authKeysStr[i])
if err != nil {
return nil, fmt.Errorf("could not parse Control authorized key %s: %w",
authKeysStr[i],
err,
)
}
authKeys = append(authKeys, key)
}
var p controlsrv.Prm
p.SetPrivateKey(*server.key)
p.SetHealthChecker(server)
controlSvc := controlsrv.New(p,
controlsrv.WithAllowedKeys(authKeys),
)
grpcControlSrv := grpc.NewServer()
control.RegisterControlServiceServer(grpcControlSrv, controlSvc)
server.runners = append(server.runners, func(ch chan<- error) error {
lis, err := net.Listen("tcp", controlSvcEndpoint)
if err != nil {
return err
}
go func() {
ch <- grpcControlSrv.Serve(lis)
}()
return nil
})
server.registerNoErrCloser(grpcControlSrv.GracefulStop)
} else {
log.Info("no Control server endpoint specified, service is disabled")
}
server.initSubnet(subnetConfig{
queueSize: cfg.GetUint32("workers.subnet"),
})
if cfg.GetString("metrics.address") != "" {
m := metrics.NewInnerRingMetrics()
server.metrics = &m
}
return server, nil
}
func createListener(ctx context.Context, cli *client.Client, p *chainParams) (event.Listener, error) {
// listenerPoolCap is a capacity of a
// worker pool inside the listener. It
// is used to prevent blocking in neo-go:
// the client cannot make RPC requests if
// the notification channel is not being
// read by another goroutine.
const listenerPoolCap = 10
var (
sub subscriber.Subscriber
err error
)
sub, err = subscriber.New(ctx, &subscriber.Params{
Log: p.log,
StartFromBlock: p.from,
Client: cli,
})
if err != nil {
return nil, err
}
listener, err := event.NewListener(event.ListenerParams{
Logger: p.log.With(zap.String("chain", p.name)),
Subscriber: sub,
WorkerPoolCapacity: listenerPoolCap,
})
if err != nil {
return nil, err
}
return listener, err
}
func createClient(ctx context.Context, p *chainParams, errChan chan<- error) (*client.Client, error) {
// config name left unchanged for compatibility, may be its better to rename it to "endpoints" or "clients"
var endpoints []client.Endpoint
// defaultPriority is a default endpoint priority
const defaultPriority = 1
section := p.name + ".endpoint.client"
for i := 0; ; i++ {
addr := p.cfg.GetString(fmt.Sprintf("%s.%d.%s", section, i, "address"))
if addr == "" {
break
}
priority := p.cfg.GetInt(section + ".priority")
if priority <= 0 {
priority = defaultPriority
}
endpoints = append(endpoints, client.Endpoint{
Address: addr,
Priority: priority,
})
}
if len(endpoints) == 0 {
return nil, fmt.Errorf("%s chain client endpoints not provided", p.name)
}
return client.New(
p.key,
client.WithContext(ctx),
client.WithLogger(p.log),
client.WithDialTimeout(p.cfg.GetDuration(p.name+".dial_timeout")),
client.WithSigner(p.sgn),
client.WithEndpoints(endpoints...),
client.WithConnLostCallback(func() {
errChan <- fmt.Errorf("%s chain connection has been lost", p.name)
}),
)
}
func parsePredefinedValidators(cfg *viper.Viper) (keys.PublicKeys, error) {
publicKeyStrings := cfg.GetStringSlice("morph.validators")
return ParsePublicKeysFromStrings(publicKeyStrings)
}
// ParsePublicKeysFromStrings returns slice of neo public keys from slice
// of hex encoded strings.
func ParsePublicKeysFromStrings(pubKeys []string) (keys.PublicKeys, error) {
publicKeys := make(keys.PublicKeys, 0, len(pubKeys))
for i := range pubKeys {
key, err := keys.NewPublicKeyFromString(pubKeys[i])
if err != nil {
return nil, fmt.Errorf("can't decode public key: %w", err)
}
publicKeys = append(publicKeys, key)
}
return publicKeys, nil
}
func (s *Server) initConfigFromBlockchain() error {
// get current epoch
epoch, err := s.netmapClient.Epoch()
if err != nil {
return fmt.Errorf("can't read epoch number: %w", err)
}
// get current epoch duration
epochDuration, err := s.netmapClient.EpochDuration()
if err != nil {
return fmt.Errorf("can't read epoch duration: %w", err)
}
// get balance precision
balancePrecision, err := s.balanceClient.Decimals()
if err != nil {
return fmt.Errorf("can't read balance contract precision: %w", err)
}
s.epochCounter.Store(epoch)
s.epochDuration.Store(epochDuration)
s.precision.SetBalancePrecision(balancePrecision)
// get next epoch delta tick
s.initialEpochTickDelta, err = s.nextEpochBlockDelta()
if err != nil {
return err
}
s.log.Debug("read config from blockchain",
zap.Bool("active", s.IsActive()),
zap.Bool("alphabet", s.IsAlphabet()),
zap.Uint64("epoch", epoch),
zap.Uint32("precision", balancePrecision),
zap.Uint32("init_epoch_tick_delta", s.initialEpochTickDelta),
)
return nil
}
func (s *Server) nextEpochBlockDelta() (uint32, error) {
epochBlock, err := s.netmapClient.LastEpochBlock()
if err != nil {
return 0, fmt.Errorf("can't read last epoch block: %w", err)
}
blockHeight, err := s.morphClient.BlockCount()
if err != nil {
return 0, fmt.Errorf("can't get side chain height: %w", err)
}
delta := uint32(s.epochDuration.Load()) + epochBlock
if delta < blockHeight {
return 0, nil
}
return delta - blockHeight, nil
}
// onlyActiveHandler wrapper around event handler that executes it
// only if inner ring node state is active.
func (s *Server) onlyActiveEventHandler(f event.Handler) event.Handler {
return func(ev event.Event) {
if s.IsActive() {
f(ev)
}
}
}
// onlyAlphabet wrapper around event handler that executes it
// only if inner ring node is alphabet node.
func (s *Server) onlyAlphabetEventHandler(f event.Handler) event.Handler {
return func(ev event.Event) {
if s.IsAlphabet() {
f(ev)
}
}
}
func (s *Server) newEpochTickHandlers() []newEpochHandler {
newEpochHandlers := []newEpochHandler{
func() {
s.netmapProcessor.HandleNewEpochTick(timerEvent.NewEpochTick{})
},
}
return newEpochHandlers
}