662 lines
17 KiB
662 lines
17 KiB
package innerring
import (
internalNet "git.frostfs.info/TrueCloudLab/frostfs-node/internal/net"
timerEvent "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/innerring/timers"
balanceClient "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/client/balance"
nmClient "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/client/netmap"
control "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/control/ir"
type (
// Server is the inner ring application structure that contains all event
// processors, shared variables and event handlers.
Server struct {
// log is the server logger; New derives it with logger.TagIr.
log *logger.Logger
// event producers
morphListener event.Listener
mainnetListener event.Listener
blockTimers []*timer.BlockTimer
epochTimer *timer.BlockTimer
// global state
morphClient *client.Client
mainnetClient *client.Client
epochCounter atomic.Uint64
// epochDuration is read by nextEpochBlockDelta to locate the next epoch block.
epochDuration atomic.Uint64
statusIndex *innerRingIndexer
precision precision.Fixed8Converter
healthStatus atomic.Int32
balanceClient *balanceClient.Client
netmapClient *nmClient.Client
// persistate stores the last seen block index of each chain
// (see the new-block event handlers).
persistate *state.PersistentStorage
containerClient *container.Client
// metrics
irMetrics *metrics.InnerRingServiceMetrics
// notary configuration
feeConfig *config.FeeConfig
mainNotaryConfig *notaryConfig
// internal variables
key *keys.PrivateKey
contracts *contracts
// predefinedValidators is parsed from the "morph.validators" config key
// and voted for in Start.
predefinedValidators keys.PublicKeys
// initialEpochTickDelta is computed by nextEpochBlockDelta in
// initConfigFromBlockchain.
initialEpochTickDelta uint32
// withoutMainNet disables registration of the mainnet new-block handler.
withoutMainNet bool
// sdNotify mirrors the "systemdnotify.enabled" config flag (see initSdNotify).
sdNotify bool
// runtime processors
netmapProcessor *netmap.Processor
alphabetProcessor *alphabet.Processor
// workers are launched by startWorkers, each in its own goroutine.
workers []func(context.Context)
// Set of local resources that must be
// initialized at the very beginning of
// Server's work (e.g. opening files).
// If any starter returns an error, Server's
// starting fails immediately.
starters []func() error
// Set of local resources that must be
// released at Server's work completion
// (e.g. closing files).
// Closer's wrong outcome shouldn't be critical.
// Errors are logged.
closers []func() error
// Set of component runners which
// should report start errors
// to the application.
runners []func(chan<- error) error
// cmode used for upgrade scenario.
// nolint:unused
cmode *atomic.Bool
// chainParams groups the per-chain values consumed by createClient and
// createListener.
chainParams struct {
log *logger.Logger
cfg *viper.Viper
key *keys.PrivateKey
// name is used as the config section prefix in createClient and as the
// "chain" log field in createListener.
name string
sgn *transaction.Signer
from uint32 // block height; passed to the subscriber as StartFromBlock
morphCacheMetric metrics.MorphCacheMetrics
multinetMetrics metrics.MultinetMetrics
const (
// chain name prefixes; presumably used as chainParams.name and therefore as
// config section names — TODO confirm against the init code.
morphPrefix = "morph"
mainnetPrefix = "mainnet"
// extra blocks to overlap two deposits, we do that to make sure that
// there won't be any blocks without deposited assets in notary contract;
// make sure it is bigger than any extra rounding value in notary client.
notaryExtraBlocks = 300
// number of attempts before the notary deposit wait times out.
notaryDepositTimeout = 100
var (
// Sentinel errors for notary deposit handling. The code that returns them
// is not part of this view; presumably the notary deposit wait — TODO confirm.
errDepositTimeout = errors.New("notary deposit didn't appear in the network")
errDepositFail = errors.New("notary tx has faulted")
// Start runs all event providers.
//
// It sets the health status to STARTING, then in order: launches the
// registered starters, reads configuration from the blockchain, initializes
// the main notary (alphabet nodes only) and the side notary, votes for the
// predefined sidechain validators, starts the runners, both chain listeners
// and the block timers. Any failing step except the validator vote aborts
// the start and its error is returned.
//
// Errors reported by the listeners after a successful start are forwarded to
// intError, prefixed with "sidechain: " or "mainnet: ".
func (s *Server) Start(ctx context.Context, intError chan<- error) (err error) {
s.setHealthStatus(ctx, control.HealthStatus_STARTING)
// The named result lets the deferred func report READY only on success.
defer func() {
if err == nil {
s.setHealthStatus(ctx, control.HealthStatus_READY)
err = s.launchStarters()
if err != nil {
return err
err = s.initConfigFromBlockchain(ctx)
if err != nil {
return err
// main notary is initialized only on alphabet nodes
if s.IsAlphabet(ctx) {
err = s.initMainNotary(ctx)
if err != nil {
return err
err = s.initSideNotary(ctx)
if err != nil {
return err
prm := governance.VoteValidatorPrm{}
prm.Validators = s.predefinedValidators
// vote for sidechain validator if it is prepared in config
err = s.voteForSidechainValidator(ctx, prm)
if err != nil {
// we don't stop inner ring execution on this error
s.log.Warn(ctx, logs.InnerringCantVoteForPreparedValidators,
// unbuffered channels the listeners report their errors to
morphErr := make(chan error)
mainnnetErr := make(chan error)
// anonymous function to multiplex error channels
// NOTE(review): as visible here the select is not wrapped in a loop, so at
// most one listener error is forwarded — confirm this is intended.
go func() {
select {
case <-ctx.Done():
case err := <-morphErr:
intError <- fmt.Errorf("sidechain: %w", err)
case err := <-mainnnetErr:
intError <- fmt.Errorf("mainnet: %w", err)
if err := s.startRunners(intError); err != nil {
return err
go s.morphListener.ListenWithError(ctx, morphErr) // listen for neo:morph events
go s.mainnetListener.ListenWithError(ctx, mainnnetErr) // listen for neo:mainnet events
if err := s.startBlockTimers(); err != nil {
return fmt.Errorf("could not start block timers: %w", err)
return nil
// registerMorphNewBlockEventHandler registers a block handler on the morph
// (side chain) listener. For each block the handler logs the block index at
// debug level and saves it to persistent state under
// persistateSideChainLastBlockKey.
func (s *Server) registerMorphNewBlockEventHandler() {
s.morphListener.RegisterBlockHandler(func(ctx context.Context, b *block.Block) {
s.log.Debug(ctx, logs.InnerringNewBlock,
zap.Uint32("index", b.Index),
err := s.persistate.SetUInt32(persistateSideChainLastBlockKey, b.Index)
if err != nil {
// a failed state update is reported as a warning only
s.log.Warn(ctx, logs.InnerringCantUpdatePersistentState,
zap.String("chain", "side"),
zap.Uint32("block_index", b.Index))
// registerMainnetNewBlockEventHandler registers a block handler on the mainnet
// listener unless the server runs without mainnet. For each block the handler
// saves the block index to persistent state under
// persistateMainChainLastBlockKey.
func (s *Server) registerMainnetNewBlockEventHandler() {
if !s.withoutMainNet {
s.mainnetListener.RegisterBlockHandler(func(ctx context.Context, b *block.Block) {
err := s.persistate.SetUInt32(persistateMainChainLastBlockKey, b.Index)
if err != nil {
// a failed state update is reported as a warning only
s.log.Warn(ctx, logs.InnerringCantUpdatePersistentState,
zap.String("chain", "main"),
zap.Uint32("block_index", b.Index))
func (s *Server) startRunners(errCh chan<- error) error {
for _, runner := range s.runners {
if err := runner(errCh); err != nil {
return err
return nil
func (s *Server) launchStarters() error {
for _, starter := range s.starters {
if err := starter(); err != nil {
return err
return nil
// initMainNotary runs initNotary for the main chain unless main notary is
// disabled in mainNotaryConfig, in which case it does nothing and returns nil.
// NOTE(review): only the first two initNotary arguments are visible in this
// view.
func (s *Server) initMainNotary(ctx context.Context) error {
if !s.mainNotaryConfig.disabled {
return s.initNotary(ctx,
"waiting to accept main notary deposit",
return nil
// initSideNotary runs initNotary for the side chain and returns its result.
// NOTE(review): only the first two initNotary arguments are visible in this
// view.
func (s *Server) initSideNotary(ctx context.Context) error {
return s.initNotary(ctx,
"waiting to accept side notary deposit",
// tickInitialExpoch creates a one-tick timer whose handler passes a
// NewEpochTick event to the netmap processor.
// NOTE(review): the remaining timer arguments and the code that starts the
// timer are not visible in this view; "Expoch" looks like a typo for "Epoch",
// but renaming requires updating the callers.
func (s *Server) tickInitialExpoch(ctx context.Context) {
initialEpochTicker := timer.NewOneTickTimer(
func() {
s.netmapProcessor.HandleNewEpochTick(ctx, timerEvent.NewEpochTick{})
func (s *Server) startWorkers(ctx context.Context) {
for _, w := range s.workers {
go w(ctx)
// Stop closes all subscription channels.
//
// It sets the health status to SHUTTING_DOWN, stops both chain listeners
// (each in a separate goroutine, without waiting for them) and then runs the
// registered closers in registration order. A closer error is logged as a
// warning.
func (s *Server) Stop(ctx context.Context) {
s.setHealthStatus(ctx, control.HealthStatus_SHUTTING_DOWN)
go s.morphListener.Stop()
go s.mainnetListener.Stop()
for _, c := range s.closers {
if err := c(); err != nil {
s.log.Warn(ctx, logs.InnerringCloserError,
func (s *Server) registerNoErrCloser(c func()) {
s.registerCloser(func() error {
return nil
func (s *Server) registerIOCloser(c io.Closer) {
func (s *Server) registerCloser(f func() error) {
s.closers = append(s.closers, f)
func (s *Server) registerStarter(f func() error) {
s.starters = append(s.starters, f)
// New creates an instance of the inner ring server structure.
//
// Initialization order: systemd notification, fee config, key, persistent
// state storage, morph chain, mainnet chain, contracts, notary support,
// predefined validators, morph clients, processors, timers and finally the
// gRPC server. The first failing step aborts construction and its error is
// returned together with a nil Server.
//
// errChan is handed to the chain initializers; metrics and cmode are stored on
// the Server; audit is passed to the gRPC server initializer.
func New(ctx context.Context, log *logger.Logger, cfg *viper.Viper, errChan chan<- error,
metrics *metrics.InnerRingServiceMetrics, cmode *atomic.Bool, audit *atomic.Bool,
) (*Server, error) {
var err error
server := &Server{
log: log.WithTag(logger.TagIr),
irMetrics: metrics,
cmode: cmode,
server.sdNotify, err = server.initSdNotify(cfg)
if err != nil {
return nil, err
server.setHealthStatus(ctx, control.HealthStatus_HEALTH_STATUS_UNDEFINED)
// parse notary support
server.feeConfig = config.NewFeeConfig(cfg)
err = server.initKey(cfg)
if err != nil {
return nil, err
server.persistate, err = initPersistentStateStorage(cfg)
if err != nil {
return nil, err
// morph chain parameters are reused by the mainnet initialization below
var morphChain *chainParams
morphChain, err = server.initMorph(ctx, cfg, errChan)
if err != nil {
return nil, err
err = server.initMainnet(ctx, cfg, morphChain, errChan)
if err != nil {
return nil, err
err = server.initContracts(cfg)
if err != nil {
return nil, err
err = server.enableNotarySupport()
if err != nil {
return nil, err
// parse default validators
server.predefinedValidators, err = parsePredefinedValidators(cfg)
if err != nil {
return nil, fmt.Errorf("ir: can't parse predefined validators list: %w", err)
var morphClients *serverMorphClients
morphClients, err = server.initClientsFromMorph()
if err != nil {
return nil, err
err = server.initProcessors(ctx, cfg, morphClients)
if err != nil {
return nil, err
server.initTimers(ctx, cfg)
// note: the gRPC server receives the original log, not the tagged server.log
err = server.initGRPCServer(ctx, cfg, log, audit)
if err != nil {
return nil, err
return server, nil
func (s *Server) initSdNotify(cfg *viper.Viper) (bool, error) {
if cfg.GetBool("systemdnotify.enabled") {
return true, sdnotify.InitSocket()
return false, nil
func createListener(ctx context.Context, cli *client.Client, p *chainParams) (event.Listener, error) {
var (
sub subscriber.Subscriber
err error
sub, err = subscriber.New(ctx, &subscriber.Params{
Log: p.log,
StartFromBlock: p.from,
Client: cli,
if err != nil {
return nil, err
listener, err := event.NewListener(event.ListenerParams{
Logger: p.log.With(zap.String("chain", p.name)),
Subscriber: sub,
if err != nil {
return nil, err
return listener, err
// createClient builds a chain client for the chain described by p.
//
// Endpoints are read from the indexed config section
// "<p.name>.endpoint.client.<i>": "address" is mandatory, while
// "trusted_ca_list", "key" and "certificate" configure optional mTLS. An
// error is returned when no endpoint is configured or when the dialer source
// cannot be created. A lost connection is reported to errChan by the
// connection-lost callback.
// NOTE(review): the loop exit on an empty address and most client.New options
// are not visible in this view.
func createClient(ctx context.Context, p *chainParams, errChan chan<- error) (*client.Client, error) {
// config name left unchanged for compatibility; maybe it's better to rename it to "endpoints" or "clients"
var endpoints []client.Endpoint
// defaultPriority is a default endpoint priority
const defaultPriority = 1
section := p.name + ".endpoint.client"
for i := 0; ; i++ {
addr := p.cfg.GetString(fmt.Sprintf("%s.%d.%s", section, i, "address"))
if addr == "" {
// NOTE(review): unlike every other per-endpoint key, the priority key is
// built without the endpoint index i, so all endpoints read the same
// "<section>.priority" value — confirm whether "%s.%d.priority" was intended.
priority := p.cfg.GetInt(section + ".priority")
if priority <= 0 {
priority = defaultPriority
// mTLS is configured only when a trusted CA list is present
var mtlsConfig *client.MTLSConfig
rootCAs := p.cfg.GetStringSlice(fmt.Sprintf("%s.%d.trusted_ca_list", section, i))
if len(rootCAs) != 0 {
mtlsConfig = &client.MTLSConfig{
TrustedCAList: rootCAs,
KeyFile: p.cfg.GetString(fmt.Sprintf("%s.%d.key", section, i)),
CertFile: p.cfg.GetString(fmt.Sprintf("%s.%d.certificate", section, i)),
endpoints = append(endpoints, client.Endpoint{
Address: addr,
Priority: priority,
MTLSConfig: mtlsConfig,
if len(endpoints) == 0 {
return nil, fmt.Errorf("%s chain client endpoints not provided", p.name)
nc := parseMultinetConfig(p.cfg, p.multinetMetrics)
ds, err := internalNet.NewDialerSource(nc)
if err != nil {
return nil, fmt.Errorf("dialer source: %w", err)
return client.New(
client.WithConnLostCallback(func() {
errChan <- fmt.Errorf("%s chain connection has been lost", p.name)
func parsePredefinedValidators(cfg *viper.Viper) (keys.PublicKeys, error) {
publicKeyStrings := cfg.GetStringSlice("morph.validators")
return ParsePublicKeysFromStrings(publicKeyStrings)
// ParsePublicKeysFromStrings returns slice of neo public keys from slice
// of hex encoded strings.
func ParsePublicKeysFromStrings(pubKeys []string) (keys.PublicKeys, error) {
publicKeys := make(keys.PublicKeys, 0, len(pubKeys))
for i := range pubKeys {
key, err := keys.NewPublicKeyFromString(pubKeys[i])
if err != nil {
return nil, fmt.Errorf("can't decode public key: %w", err)
publicKeys = append(publicKeys, key)
return publicKeys, nil
// parseWalletAddressesFromStrings returns a slice of util.Uint160 from a slice
// of strings.
func parseWalletAddressesFromStrings(wallets []string) ([]util.Uint160, error) {
if len(wallets) == 0 {
return nil, nil
var err error
extraWallets := make([]util.Uint160, len(wallets))
for i := range wallets {
extraWallets[i], err = address.StringToUint160(wallets[i])
if err != nil {
return nil, err
return extraWallets, nil
func parseMultinetConfig(cfg *viper.Viper, m metrics.MultinetMetrics) internalNet.Config {
nc := internalNet.Config{
Enabled: cfg.GetBool("multinet.enabled"),
Balancer: cfg.GetString("multinet.balancer"),
Restrict: cfg.GetBool("multinet.restrict"),
FallbackDelay: cfg.GetDuration("multinet.fallback_delay"),
Metrics: m,
for i := 0; ; i++ {
mask := cfg.GetString(fmt.Sprintf("multinet.subnets.%d.mask", i))
if mask == "" {
sourceIPs := cfg.GetStringSlice(fmt.Sprintf("multinet.subnets.%d.source_ips", i))
nc.Subnets = append(nc.Subnets, internalNet.Subnet{
Prefix: mask,
SourceIPs: sourceIPs,
return nc
// initConfigFromBlockchain reads the current epoch and the epoch duration from
// the netmap contract and the balance precision from the balance contract,
// computes initialEpochTickDelta via nextEpochBlockDelta, and logs the result
// at debug level. Any read failure is returned; the contract read errors are
// wrapped with a short description.
// NOTE(review): in this view epoch, epochDuration and balancePrecision are only
// read (and partly logged); the code that stores them on the Server, if any,
// is not visible here.
func (s *Server) initConfigFromBlockchain(ctx context.Context) error {
// get current epoch
epoch, err := s.netmapClient.Epoch(ctx)
if err != nil {
return fmt.Errorf("can't read epoch number: %w", err)
// get current epoch duration
epochDuration, err := s.netmapClient.EpochDuration(ctx)
if err != nil {
return fmt.Errorf("can't read epoch duration: %w", err)
// get balance precision
balancePrecision, err := s.balanceClient.Decimals(ctx)
if err != nil {
return fmt.Errorf("can't read balance contract precision: %w", err)
// get next epoch delta tick
s.initialEpochTickDelta, err = s.nextEpochBlockDelta(ctx)
if err != nil {
return err
s.log.Debug(ctx, logs.InnerringReadConfigFromBlockchain,
zap.Bool("active", s.IsActive(ctx)),
zap.Bool("alphabet", s.IsAlphabet(ctx)),
zap.Uint64("epoch", epoch),
zap.Uint32("precision", balancePrecision),
zap.Uint32("init_epoch_tick_delta", s.initialEpochTickDelta),
return nil
func (s *Server) nextEpochBlockDelta(ctx context.Context) (uint32, error) {
epochBlock, err := s.netmapClient.LastEpochBlock(ctx)
if err != nil {
return 0, fmt.Errorf("can't read last epoch block: %w", err)
blockHeight, err := s.morphClient.BlockCount()
if err != nil {
return 0, fmt.Errorf("can't get side chain height: %w", err)
delta := uint32(s.epochDuration.Load()) + epochBlock
if delta < blockHeight {
return 0, nil
return delta - blockHeight, nil
// onlyAlphabet wrapper around event handler that executes it
// only if inner ring node is alphabet node.
func (s *Server) onlyAlphabetEventHandler(f event.Handler) event.Handler {
return func(ctx context.Context, ev event.Event) {
if s.IsAlphabet(ctx) {
f(ctx, ev)
func (s *Server) newEpochTickHandlers(ctx context.Context) []newEpochHandler {
newEpochHandlers := []newEpochHandler{
func() {
s.netmapProcessor.HandleNewEpochTick(ctx, timerEvent.NewEpochTick{})
return newEpochHandlers
// SetExtraWallets parses the wallet addresses listed under the
// "emit.extra_wallets" config key and returns the parse error, if any.
// NOTE(review): parsedWallets has no visible consumer in this view; the
// statement that applies the parsed wallets is presumably missing here —
// confirm against the full file.
func (s *Server) SetExtraWallets(cfg *viper.Viper) error {
parsedWallets, err := parseWalletAddressesFromStrings(cfg.GetStringSlice("emit.extra_wallets"))
if err != nil {
return err
return nil