[#446] innerring: Use alphabet state in processors

Signed-off-by: Alex Vanin <alexey@nspcc.ru>
This commit is contained in:
Alex Vanin 2021-03-23 18:20:44 +03:00 committed by Alex Vanin
parent 1332db883e
commit e05f1e1394
17 changed files with 80 additions and 69 deletions

View file

@ -431,14 +431,14 @@ func New(ctx context.Context, log *zap.Logger, cfg *viper.Viper) (*Server, error
EpochTimer: server, EpochTimer: server,
MorphClient: server.morphClient, MorphClient: server.morphClient,
EpochState: server, EpochState: server,
ActiveState: server, AlphabetState: server,
CleanupEnabled: cfg.GetBool("netmap_cleaner.enabled"), CleanupEnabled: cfg.GetBool("netmap_cleaner.enabled"),
CleanupThreshold: cfg.GetUint64("netmap_cleaner.threshold"), CleanupThreshold: cfg.GetUint64("netmap_cleaner.threshold"),
ContainerWrapper: cnrClient, ContainerWrapper: cnrClient,
HandleAudit: server.onlyActiveEventHandler( HandleAudit: server.onlyActiveEventHandler(
auditProcessor.StartAuditHandler(), auditProcessor.StartAuditHandler(),
), ),
AuditSettlementsHandler: server.onlyActiveEventHandler( AuditSettlementsHandler: server.onlyAlphabetEventHandler(
settlementProcessor.HandleAuditEvent, settlementProcessor.HandleAuditEvent,
), ),
NodeValidator: locodeValidator, NodeValidator: locodeValidator,
@ -458,7 +458,7 @@ func New(ctx context.Context, log *zap.Logger, cfg *viper.Viper) (*Server, error
PoolSize: cfg.GetInt("workers.container"), PoolSize: cfg.GetInt("workers.container"),
ContainerContract: server.contracts.container, ContainerContract: server.contracts.container,
MorphClient: server.morphClient, MorphClient: server.morphClient,
ActiveState: server, AlphabetState: server,
}) })
if err != nil { if err != nil {
return nil, err return nil, err
@ -476,7 +476,7 @@ func New(ctx context.Context, log *zap.Logger, cfg *viper.Viper) (*Server, error
NeoFSContract: server.contracts.neofs, NeoFSContract: server.contracts.neofs,
BalanceContract: server.contracts.balance, BalanceContract: server.contracts.balance,
MainnetClient: server.mainnetClient, MainnetClient: server.mainnetClient,
ActiveState: server, AlphabetState: server,
Converter: &server.precision, Converter: &server.precision,
}) })
if err != nil { if err != nil {
@ -499,7 +499,7 @@ func New(ctx context.Context, log *zap.Logger, cfg *viper.Viper) (*Server, error
NetmapContract: server.contracts.netmap, NetmapContract: server.contracts.netmap,
MorphClient: server.morphClient, MorphClient: server.morphClient,
EpochState: server, EpochState: server,
ActiveState: server, AlphabetState: server,
Converter: &server.precision, Converter: &server.precision,
MintEmitCacheSize: cfg.GetInt("emit.mint.cache_size"), MintEmitCacheSize: cfg.GetInt("emit.mint.cache_size"),
MintEmitThreshold: cfg.GetUint64("emit.mint.threshold"), MintEmitThreshold: cfg.GetUint64("emit.mint.threshold"),
@ -727,6 +727,7 @@ func (s *Server) initConfigFromBlockchain() error {
s.log.Debug("read config from blockchain", s.log.Debug("read config from blockchain",
zap.Bool("active", s.IsActive()), zap.Bool("active", s.IsActive()),
zap.Bool("alphabet", s.IsAlphabet()),
zap.Int64("epoch", epoch), zap.Int64("epoch", epoch),
zap.Uint32("precision", balancePrecision), zap.Uint32("precision", balancePrecision),
) )
@ -744,6 +745,16 @@ func (s *Server) onlyActiveEventHandler(f event.Handler) event.Handler {
} }
} }
// onlyAlphabet wrapper around event handler that executes it
// only if inner ring node is alphabet node.
func (s *Server) onlyAlphabetEventHandler(f event.Handler) event.Handler {
return func(ev event.Event) {
if s.IsAlphabet() {
f(ev)
}
}
}
func (s *Server) depositNotary() error { func (s *Server) depositNotary() error {
return s.morphClient.DepositNotary( return s.morphClient.DepositNotary(
s.notaryDepositAmount, s.notaryDepositAmount,

View file

@ -10,9 +10,9 @@ import (
) )
func (np *Processor) processEmit() { func (np *Processor) processEmit() {
index := np.irList.InnerRingIndex() index := np.irList.AlphabetIndex()
if index < 0 { if index < 0 {
np.log.Info("passive mode, ignore gas emission event") np.log.Info("non alphabet mode, ignore gas emission event")
return return
} }

View file

@ -12,7 +12,7 @@ import (
type ( type (
// Indexer is a callback interface for inner ring global state. // Indexer is a callback interface for inner ring global state.
Indexer interface { Indexer interface {
InnerRingIndex() int AlphabetIndex() int
} }
// Contracts is an interface of the storage // Contracts is an interface of the storage

View file

@ -9,8 +9,8 @@ import (
// Process lock event by invoking Cheque method in main net to send assets // Process lock event by invoking Cheque method in main net to send assets
// back to the withdraw issuer. // back to the withdraw issuer.
func (bp *Processor) processLock(lock *balanceEvent.Lock) { func (bp *Processor) processLock(lock *balanceEvent.Lock) {
if !bp.activeState.IsActive() { if !bp.alphabetState.IsAlphabet() {
bp.log.Info("passive mode, ignore balance lock") bp.log.Info("non alphabet mode, ignore balance lock")
return return
} }

View file

@ -11,9 +11,9 @@ import (
) )
type ( type (
// ActiveState is a callback interface for inner ring global state // AlphabetState is a callback interface for inner ring global state
ActiveState interface { AlphabetState interface {
IsActive() bool IsAlphabet() bool
} }
// PrecisionConverter converts balance amount values. // PrecisionConverter converts balance amount values.
@ -28,7 +28,7 @@ type (
neofsContract util.Uint160 neofsContract util.Uint160
balanceContract util.Uint160 balanceContract util.Uint160
mainnetClient *client.Client mainnetClient *client.Client
activeState ActiveState alphabetState AlphabetState
converter PrecisionConverter converter PrecisionConverter
} }
@ -39,7 +39,7 @@ type (
NeoFSContract util.Uint160 NeoFSContract util.Uint160
BalanceContract util.Uint160 BalanceContract util.Uint160
MainnetClient *client.Client MainnetClient *client.Client
ActiveState ActiveState AlphabetState AlphabetState
Converter PrecisionConverter Converter PrecisionConverter
} }
) )
@ -55,7 +55,7 @@ func New(p *Params) (*Processor, error) {
return nil, errors.New("ir/balance: logger is not set") return nil, errors.New("ir/balance: logger is not set")
case p.MainnetClient == nil: case p.MainnetClient == nil:
return nil, errors.New("ir/balance: neo:mainnet client is not set") return nil, errors.New("ir/balance: neo:mainnet client is not set")
case p.ActiveState == nil: case p.AlphabetState == nil:
return nil, errors.New("ir/balance: global state is not set") return nil, errors.New("ir/balance: global state is not set")
case p.Converter == nil: case p.Converter == nil:
return nil, errors.New("ir/balance: balance precision converter is not set") return nil, errors.New("ir/balance: balance precision converter is not set")
@ -74,7 +74,7 @@ func New(p *Params) (*Processor, error) {
neofsContract: p.NeoFSContract, neofsContract: p.NeoFSContract,
balanceContract: p.BalanceContract, balanceContract: p.BalanceContract,
mainnetClient: p.MainnetClient, mainnetClient: p.MainnetClient,
activeState: p.ActiveState, alphabetState: p.AlphabetState,
converter: p.Converter, converter: p.Converter,
}, nil }, nil
} }

View file

@ -11,8 +11,8 @@ import (
// Process new container from the user by checking container sanity // Process new container from the user by checking container sanity
// and sending approve tx back to morph. // and sending approve tx back to morph.
func (cp *Processor) processContainerPut(put *containerEvent.Put) { func (cp *Processor) processContainerPut(put *containerEvent.Put) {
if !cp.activeState.IsActive() { if !cp.alphabetState.IsAlphabet() {
cp.log.Info("passive mode, ignore container put") cp.log.Info("non alphabet mode, ignore container put")
return return
} }
@ -51,8 +51,8 @@ func (cp *Processor) processContainerPut(put *containerEvent.Put) {
// Process delete container operation from the user by checking container sanity // Process delete container operation from the user by checking container sanity
// and sending approve tx back to morph. // and sending approve tx back to morph.
func (cp *Processor) processContainerDelete(delete *containerEvent.Delete) { func (cp *Processor) processContainerDelete(delete *containerEvent.Delete) {
if !cp.activeState.IsActive() { if !cp.alphabetState.IsAlphabet() {
cp.log.Info("passive mode, ignore container put") cp.log.Info("non alphabet mode, ignore container delete")
return return
} }

View file

@ -11,9 +11,9 @@ import (
) )
type ( type (
// ActiveState is a callback interface for inner ring global state. // AlphabetState is a callback interface for inner ring global state.
ActiveState interface { AlphabetState interface {
IsActive() bool IsAlphabet() bool
} }
// Processor of events produced by container contract in morph chain. // Processor of events produced by container contract in morph chain.
@ -22,7 +22,7 @@ type (
pool *ants.Pool pool *ants.Pool
containerContract util.Uint160 containerContract util.Uint160
morphClient *client.Client morphClient *client.Client
activeState ActiveState alphabetState AlphabetState
} }
// Params of the processor constructor. // Params of the processor constructor.
@ -31,7 +31,7 @@ type (
PoolSize int PoolSize int
ContainerContract util.Uint160 ContainerContract util.Uint160
MorphClient *client.Client MorphClient *client.Client
ActiveState ActiveState AlphabetState AlphabetState
} }
) )
@ -47,7 +47,7 @@ func New(p *Params) (*Processor, error) {
return nil, errors.New("ir/container: logger is not set") return nil, errors.New("ir/container: logger is not set")
case p.MorphClient == nil: case p.MorphClient == nil:
return nil, errors.New("ir/container: neo:morph client is not set") return nil, errors.New("ir/container: neo:morph client is not set")
case p.ActiveState == nil: case p.AlphabetState == nil:
return nil, errors.New("ir/container: global state is not set") return nil, errors.New("ir/container: global state is not set")
} }
@ -63,7 +63,7 @@ func New(p *Params) (*Processor, error) {
pool: pool, pool: pool,
containerContract: p.ContainerContract, containerContract: p.ContainerContract,
morphClient: p.MorphClient, morphClient: p.MorphClient,
activeState: p.ActiveState, alphabetState: p.AlphabetState,
}, nil }, nil
} }

View file

@ -18,8 +18,8 @@ const (
// Process deposit event by invoking balance contract and sending native // Process deposit event by invoking balance contract and sending native
// gas in morph chain. // gas in morph chain.
func (np *Processor) processDeposit(deposit *neofsEvent.Deposit) { func (np *Processor) processDeposit(deposit *neofsEvent.Deposit) {
if !np.activeState.IsActive() { if !np.alphabetState.IsAlphabet() {
np.log.Info("passive mode, ignore deposit") np.log.Info("non alphabet mode, ignore deposit")
return return
} }
@ -82,8 +82,8 @@ func (np *Processor) processDeposit(deposit *neofsEvent.Deposit) {
// Process withdraw event by locking assets in balance account. // Process withdraw event by locking assets in balance account.
func (np *Processor) processWithdraw(withdraw *neofsEvent.Withdraw) { func (np *Processor) processWithdraw(withdraw *neofsEvent.Withdraw) {
if !np.activeState.IsActive() { if !np.alphabetState.IsAlphabet() {
np.log.Info("passive mode, ignore withdraw") np.log.Info("non alphabet mode, ignore withdraw")
return return
} }
@ -118,8 +118,8 @@ func (np *Processor) processWithdraw(withdraw *neofsEvent.Withdraw) {
// Process cheque event by transferring assets from lock account back to // Process cheque event by transferring assets from lock account back to
// reserve account. // reserve account.
func (np *Processor) processCheque(cheque *neofsEvent.Cheque) { func (np *Processor) processCheque(cheque *neofsEvent.Cheque) {
if !np.activeState.IsActive() { if !np.alphabetState.IsAlphabet() {
np.log.Info("passive mode, ignore cheque") np.log.Info("non alphabet mode, ignore cheque")
return return
} }

View file

@ -9,8 +9,8 @@ import (
// Process config event by setting configuration value from main chain in // Process config event by setting configuration value from main chain in
// side chain. // side chain.
func (np *Processor) processConfig(config *neofsEvent.Config) { func (np *Processor) processConfig(config *neofsEvent.Config) {
if !np.activeState.IsActive() { if !np.alphabetState.IsAlphabet() {
np.log.Info("passive mode, ignore deposit") np.log.Info("non alphabet mode, ignore config")
return return
} }

View file

@ -9,8 +9,8 @@ import (
// Process update inner ring event by setting inner ring list value from // Process update inner ring event by setting inner ring list value from
// main chain in side chain. // main chain in side chain.
func (np *Processor) processUpdateInnerRing(list *neofsEvent.UpdateInnerRing) { func (np *Processor) processUpdateInnerRing(list *neofsEvent.UpdateInnerRing) {
if !np.activeState.IsActive() { if !np.alphabetState.IsAlphabet() {
np.log.Info("passive mode, ignore deposit") np.log.Info("non alphabet mode, ignore inner ring update")
return return
} }

View file

@ -20,9 +20,9 @@ type (
EpochCounter() uint64 EpochCounter() uint64
} }
// ActiveState is a callback interface for inner ring global state // AlphabetState is a callback interface for inner ring global state
ActiveState interface { AlphabetState interface {
IsActive() bool IsAlphabet() bool
} }
// PrecisionConverter converts balance amount values. // PrecisionConverter converts balance amount values.
@ -39,7 +39,7 @@ type (
netmapContract util.Uint160 netmapContract util.Uint160
morphClient *client.Client morphClient *client.Client
epochState EpochState epochState EpochState
activeState ActiveState alphabetState AlphabetState
converter PrecisionConverter converter PrecisionConverter
mintEmitLock *sync.Mutex mintEmitLock *sync.Mutex
mintEmitCache *lru.Cache mintEmitCache *lru.Cache
@ -57,7 +57,7 @@ type (
NetmapContract util.Uint160 NetmapContract util.Uint160
MorphClient *client.Client MorphClient *client.Client
EpochState EpochState EpochState EpochState
ActiveState ActiveState AlphabetState AlphabetState
Converter PrecisionConverter Converter PrecisionConverter
MintEmitCacheSize int MintEmitCacheSize int
MintEmitThreshold uint64 // in epochs MintEmitThreshold uint64 // in epochs
@ -83,7 +83,7 @@ func New(p *Params) (*Processor, error) {
return nil, errors.New("ir/neofs: neo:morph client is not set") return nil, errors.New("ir/neofs: neo:morph client is not set")
case p.EpochState == nil: case p.EpochState == nil:
return nil, errors.New("ir/neofs: global state is not set") return nil, errors.New("ir/neofs: global state is not set")
case p.ActiveState == nil: case p.AlphabetState == nil:
return nil, errors.New("ir/neofs: global state is not set") return nil, errors.New("ir/neofs: global state is not set")
case p.Converter == nil: case p.Converter == nil:
return nil, errors.New("ir/neofs: balance precision converter is not set") return nil, errors.New("ir/neofs: balance precision converter is not set")
@ -109,7 +109,7 @@ func New(p *Params) (*Processor, error) {
netmapContract: p.NetmapContract, netmapContract: p.NetmapContract,
morphClient: p.MorphClient, morphClient: p.MorphClient,
epochState: p.EpochState, epochState: p.EpochState,
activeState: p.ActiveState, alphabetState: p.AlphabetState,
converter: p.Converter, converter: p.Converter,
mintEmitLock: new(sync.Mutex), mintEmitLock: new(sync.Mutex),
mintEmitCache: lruCache, mintEmitCache: lruCache,

View file

@ -8,8 +8,8 @@ import (
) )
func (np *Processor) processNetmapCleanupTick(epoch uint64) { func (np *Processor) processNetmapCleanupTick(epoch uint64) {
if !np.activeState.IsActive() { if !np.alphabetState.IsAlphabet() {
np.log.Info("passive mode, ignore new netmap cleanup tick") np.log.Info("non alphabet mode, ignore new netmap cleanup tick")
return return
} }

View file

@ -42,8 +42,8 @@ func (np *Processor) processNewEpoch(epoch uint64) {
// Process new epoch tick by invoking new epoch method in network map contract. // Process new epoch tick by invoking new epoch method in network map contract.
func (np *Processor) processNewEpochTick() { func (np *Processor) processNewEpochTick() {
if !np.activeState.IsActive() { if !np.alphabetState.IsAlphabet() {
np.log.Info("passive mode, ignore new epoch tick") np.log.Info("non alphabet mode, ignore new epoch tick")
return return
} }

View file

@ -14,8 +14,8 @@ import (
// Process add peer notification by sanity check of new node // Process add peer notification by sanity check of new node
// local epoch timer. // local epoch timer.
func (np *Processor) processAddPeer(node []byte) { func (np *Processor) processAddPeer(node []byte) {
if !np.activeState.IsActive() { if !np.alphabetState.IsAlphabet() {
np.log.Info("passive mode, ignore new peer notification") np.log.Info("non alphabet mode, ignore new peer notification")
return return
} }
@ -74,8 +74,8 @@ func (np *Processor) processAddPeer(node []byte) {
// Process update peer notification by sending approval tx to the smart contract. // Process update peer notification by sending approval tx to the smart contract.
func (np *Processor) processUpdatePeer(ev netmapEvent.UpdatePeer) { func (np *Processor) processUpdatePeer(ev netmapEvent.UpdatePeer) {
if !np.activeState.IsActive() { if !np.alphabetState.IsAlphabet() {
np.log.Info("passive mode, ignore new epoch tick") np.log.Info("non alphabet mode, ignore update peer notification")
return return
} }

View file

@ -24,9 +24,9 @@ type (
EpochCounter() uint64 EpochCounter() uint64
} }
// ActiveState is a callback interface for inner ring global state. // AlphabetState is a callback interface for inner ring global state.
ActiveState interface { AlphabetState interface {
IsActive() bool IsAlphabet() bool
} }
// NodeValidator wraps basic method of checking the correctness // NodeValidator wraps basic method of checking the correctness
@ -52,7 +52,7 @@ type (
netmapContract util.Uint160 netmapContract util.Uint160
epochTimer EpochTimerReseter epochTimer EpochTimerReseter
epochState EpochState epochState EpochState
activeState ActiveState alphabetState AlphabetState
morphClient *client.Client morphClient *client.Client
containerWrp *container.Wrapper containerWrp *container.Wrapper
@ -73,7 +73,7 @@ type (
EpochTimer EpochTimerReseter EpochTimer EpochTimerReseter
MorphClient *client.Client MorphClient *client.Client
EpochState EpochState EpochState EpochState
ActiveState ActiveState AlphabetState AlphabetState
CleanupEnabled bool CleanupEnabled bool
CleanupThreshold uint64 // in epochs CleanupThreshold uint64 // in epochs
ContainerWrapper *container.Wrapper ContainerWrapper *container.Wrapper
@ -102,7 +102,7 @@ func New(p *Params) (*Processor, error) {
return nil, errors.New("ir/netmap: epoch itmer is not set") return nil, errors.New("ir/netmap: epoch itmer is not set")
case p.EpochState == nil: case p.EpochState == nil:
return nil, errors.New("ir/netmap: global state is not set") return nil, errors.New("ir/netmap: global state is not set")
case p.ActiveState == nil: case p.AlphabetState == nil:
return nil, errors.New("ir/netmap: global state is not set") return nil, errors.New("ir/netmap: global state is not set")
case p.HandleAudit == nil: case p.HandleAudit == nil:
return nil, errors.New("ir/netmap: audit handler is not set") return nil, errors.New("ir/netmap: audit handler is not set")
@ -127,7 +127,7 @@ func New(p *Params) (*Processor, error) {
netmapContract: p.NetmapContract, netmapContract: p.NetmapContract,
epochTimer: p.EpochTimer, epochTimer: p.EpochTimer,
epochState: p.EpochState, epochState: p.EpochState,
activeState: p.ActiveState, alphabetState: p.AlphabetState,
morphClient: p.MorphClient, morphClient: p.MorphClient,
containerWrp: p.ContainerWrapper, containerWrp: p.ContainerWrapper,
netmapSnapshot: newCleanupTable(p.CleanupEnabled, p.CleanupThreshold), netmapSnapshot: newCleanupTable(p.CleanupEnabled, p.CleanupThreshold),

View file

@ -45,8 +45,8 @@ func (p *Processor) HandleIncomeCollectionEvent(e event.Event) {
ev := e.(BasicIncomeCollectEvent) ev := e.(BasicIncomeCollectEvent)
epoch := ev.Epoch() epoch := ev.Epoch()
if !p.state.IsActive() { if !p.state.IsAlphabet() {
p.log.Info("passive mode, ignore income collection event") p.log.Info("non alphabet mode, ignore income collection event")
return return
} }
@ -90,8 +90,8 @@ func (p *Processor) HandleIncomeDistributionEvent(e event.Event) {
ev := e.(BasicIncomeDistributeEvent) ev := e.(BasicIncomeDistributeEvent)
epoch := ev.Epoch() epoch := ev.Epoch()
if !p.state.IsActive() { if !p.state.IsAlphabet() {
p.log.Info("passive mode, ignore income distribution event") p.log.Info("non alphabet mode, ignore income distribution event")
return return
} }

View file

@ -13,16 +13,16 @@ import (
) )
type ( type (
// ActiveState is a callback interface for inner ring global state // AlphabetState is a callback interface for inner ring global state
ActiveState interface { AlphabetState interface {
IsActive() bool IsAlphabet() bool
} }
// Processor is an event handler for payments in the system. // Processor is an event handler for payments in the system.
Processor struct { Processor struct {
log *logger.Logger log *logger.Logger
state ActiveState state AlphabetState
pool nodeutil.WorkerPool pool nodeutil.WorkerPool
@ -38,7 +38,7 @@ type (
Prm struct { Prm struct {
AuditProcessor AuditProcessor AuditProcessor AuditProcessor
BasicIncome BasicIncomeInitializer BasicIncome BasicIncomeInitializer
State ActiveState State AlphabetState
} }
) )