[#374] Add inner-ring event metrics #405
27 changed files with 287 additions and 113 deletions
|
@ -52,6 +52,7 @@ func (s *Server) initNetmapProcessor(cfg *viper.Viper,
|
||||||
|
|
||||||
s.netmapProcessor, err = netmap.New(&netmap.Params{
|
s.netmapProcessor, err = netmap.New(&netmap.Params{
|
||||||
Log: s.log,
|
Log: s.log,
|
||||||
|
Metrics: s.metrics,
|
||||||
PoolSize: cfg.GetInt("workers.netmap"),
|
PoolSize: cfg.GetInt("workers.netmap"),
|
||||||
NetmapClient: netmap.NewNetmapClient(s.netmapClient),
|
NetmapClient: netmap.NewNetmapClient(s.netmapClient),
|
||||||
EpochTimer: s,
|
EpochTimer: s,
|
||||||
|
@ -162,6 +163,7 @@ func (s *Server) createAlphaSync(cfg *viper.Viper, frostfsCli *frostfsClient.Cli
|
||||||
// create governance processor
|
// create governance processor
|
||||||
governanceProcessor, err := governance.New(&governance.Params{
|
governanceProcessor, err := governance.New(&governance.Params{
|
||||||
Log: s.log,
|
Log: s.log,
|
||||||
|
Metrics: s.metrics,
|
||||||
FrostFSClient: frostfsCli,
|
FrostFSClient: frostfsCli,
|
||||||
NetmapClient: s.netmapClient,
|
NetmapClient: s.netmapClient,
|
||||||
AlphabetState: s,
|
AlphabetState: s,
|
||||||
|
@ -231,6 +233,7 @@ func (s *Server) initAlphabetProcessor(cfg *viper.Viper) error {
|
||||||
s.alphabetProcessor, err = alphabet.New(&alphabet.Params{
|
s.alphabetProcessor, err = alphabet.New(&alphabet.Params{
|
||||||
ParsedWallets: parsedWallets,
|
ParsedWallets: parsedWallets,
|
||||||
Log: s.log,
|
Log: s.log,
|
||||||
|
Metrics: s.metrics,
|
||||||
PoolSize: cfg.GetInt("workers.alphabet"),
|
PoolSize: cfg.GetInt("workers.alphabet"),
|
||||||
AlphabetContracts: s.contracts.alphabet,
|
AlphabetContracts: s.contracts.alphabet,
|
||||||
NetmapClient: s.netmapClient,
|
NetmapClient: s.netmapClient,
|
||||||
|
@ -255,6 +258,7 @@ func (s *Server) initContainerProcessor(cfg *viper.Viper, cnrClient *container.C
|
||||||
// container processor
|
// container processor
|
||||||
containerProcessor, err := cont.New(&cont.Params{
|
containerProcessor, err := cont.New(&cont.Params{
|
||||||
Log: s.log,
|
Log: s.log,
|
||||||
|
Metrics: s.metrics,
|
||||||
PoolSize: cfg.GetInt("workers.container"),
|
PoolSize: cfg.GetInt("workers.container"),
|
||||||
AlphabetState: s,
|
AlphabetState: s,
|
||||||
ContainerClient: cnrClient,
|
ContainerClient: cnrClient,
|
||||||
|
@ -273,6 +277,7 @@ func (s *Server) initBalanceProcessor(cfg *viper.Viper, frostfsCli *frostfsClien
|
||||||
// create balance processor
|
// create balance processor
|
||||||
balanceProcessor, err := balance.New(&balance.Params{
|
balanceProcessor, err := balance.New(&balance.Params{
|
||||||
Log: s.log,
|
Log: s.log,
|
||||||
|
Metrics: s.metrics,
|
||||||
PoolSize: cfg.GetInt("workers.balance"),
|
PoolSize: cfg.GetInt("workers.balance"),
|
||||||
FrostFSClient: frostfsCli,
|
FrostFSClient: frostfsCli,
|
||||||
BalanceSC: s.contracts.balance,
|
BalanceSC: s.contracts.balance,
|
||||||
|
@ -293,6 +298,7 @@ func (s *Server) initFrostFSMainnetProcessor(cfg *viper.Viper, frostfsIDClient *
|
||||||
|
|
||||||
frostfsProcessor, err := frostfs.New(&frostfs.Params{
|
frostfsProcessor, err := frostfs.New(&frostfs.Params{
|
||||||
Log: s.log,
|
Log: s.log,
|
||||||
|
Metrics: s.metrics,
|
||||||
PoolSize: cfg.GetInt("workers.frostfs"),
|
PoolSize: cfg.GetInt("workers.frostfs"),
|
||||||
FrostFSContract: s.contracts.frostfs,
|
FrostFSContract: s.contracts.frostfs,
|
||||||
FrostFSIDClient: frostfsIDClient,
|
FrostFSIDClient: frostfsIDClient,
|
||||||
|
|
15
pkg/innerring/metrics/metrics.go
Normal file
15
pkg/innerring/metrics/metrics.go
Normal file
|
@ -0,0 +1,15 @@
|
||||||
|
package metrics
|
||||||
|
|
||||||
|
import "time"
|
||||||
|
|
||||||
|
type Register interface {
|
||||||
|
SetEpoch(epoch uint64)
|
||||||
|
SetHealth(s int32)
|
||||||
|
AddEvent(d time.Duration, typ string, success bool)
|
||||||
|
}
|
||||||
|
|
||||||
|
type DefaultRegister struct{}
|
||||||
|
|
||||||
|
func (DefaultRegister) SetEpoch(uint64) {}
|
||||||
|
func (DefaultRegister) SetHealth(int32) {}
|
||||||
|
func (DefaultRegister) AddEvent(time.Duration, string, bool) {}
|
|
@ -2,6 +2,7 @@ package alphabet
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/innerring/processors"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/innerring/timers"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/innerring/timers"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/event"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/event"
|
||||||
"go.uber.org/zap"
|
"go.uber.org/zap"
|
||||||
|
@ -13,7 +14,7 @@ func (ap *Processor) HandleGasEmission(ev event.Event) {
|
||||||
|
|
||||||
// send event to the worker pool
|
// send event to the worker pool
|
||||||
|
|
||||||
err := ap.pool.Submit(func() { ap.processEmit() })
|
err := processors.SubmitEvent(ap.pool, ap.metrics, "alphabet_emit_gas", ap.processEmit)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
// there system can be moved into controlled degradation stage
|
// there system can be moved into controlled degradation stage
|
||||||
ap.log.Warn(logs.AlphabetAlphabetProcessorWorkerPoolDrained,
|
ap.log.Warn(logs.AlphabetAlphabetProcessorWorkerPoolDrained,
|
||||||
|
|
|
@ -13,12 +13,12 @@ import (
|
||||||
|
|
||||||
const emitMethod = "emit"
|
const emitMethod = "emit"
|
||||||
|
|
||||||
func (ap *Processor) processEmit() {
|
func (ap *Processor) processEmit() bool {
|
||||||
index := ap.irList.AlphabetIndex()
|
index := ap.irList.AlphabetIndex()
|
||||||
if index < 0 {
|
if index < 0 {
|
||||||
ap.log.Info(logs.AlphabetNonAlphabetModeIgnoreGasEmissionEvent)
|
ap.log.Info(logs.AlphabetNonAlphabetModeIgnoreGasEmissionEvent)
|
||||||
|
|
||||||
return
|
return true
|
||||||
}
|
}
|
||||||
|
|
||||||
contract, ok := ap.alphabetContracts.GetByIndex(index)
|
contract, ok := ap.alphabetContracts.GetByIndex(index)
|
||||||
|
@ -26,7 +26,7 @@ func (ap *Processor) processEmit() {
|
||||||
ap.log.Debug(logs.AlphabetNodeIsOutOfAlphabetRangeIgnoreGasEmissionEvent,
|
ap.log.Debug(logs.AlphabetNodeIsOutOfAlphabetRangeIgnoreGasEmissionEvent,
|
||||||
zap.Int("index", index))
|
zap.Int("index", index))
|
||||||
|
|
||||||
return
|
return false
|
||||||
}
|
}
|
||||||
|
|
||||||
// there is no signature collecting, so we don't need extra fee
|
// there is no signature collecting, so we don't need extra fee
|
||||||
|
@ -34,13 +34,13 @@ func (ap *Processor) processEmit() {
|
||||||
if err != nil {
|
if err != nil {
|
||||||
ap.log.Warn(logs.AlphabetCantInvokeAlphabetEmitMethod, zap.String("error", err.Error()))
|
ap.log.Warn(logs.AlphabetCantInvokeAlphabetEmitMethod, zap.String("error", err.Error()))
|
||||||
|
|
||||||
return
|
return false
|
||||||
}
|
}
|
||||||
|
|
||||||
if ap.storageEmission == 0 {
|
if ap.storageEmission == 0 {
|
||||||
ap.log.Info(logs.AlphabetStorageNodeEmissionIsOff)
|
ap.log.Info(logs.AlphabetStorageNodeEmissionIsOff)
|
||||||
|
|
||||||
return
|
return true
|
||||||
}
|
}
|
||||||
|
|
||||||
networkMap, err := ap.netmapClient.NetMap()
|
networkMap, err := ap.netmapClient.NetMap()
|
||||||
|
@ -48,7 +48,7 @@ func (ap *Processor) processEmit() {
|
||||||
ap.log.Warn(logs.AlphabetCantGetNetmapSnapshotToEmitGasToStorageNodes,
|
ap.log.Warn(logs.AlphabetCantGetNetmapSnapshotToEmitGasToStorageNodes,
|
||||||
zap.String("error", err.Error()))
|
zap.String("error", err.Error()))
|
||||||
|
|
||||||
return
|
return false
|
||||||
}
|
}
|
||||||
|
|
||||||
nmNodes := networkMap.Nodes()
|
nmNodes := networkMap.Nodes()
|
||||||
|
@ -63,7 +63,7 @@ func (ap *Processor) processEmit() {
|
||||||
zap.Int("extra_wallets", extraLen))
|
zap.Int("extra_wallets", extraLen))
|
||||||
|
|
||||||
if nmLen+extraLen == 0 {
|
if nmLen+extraLen == 0 {
|
||||||
return
|
return true
|
||||||
}
|
}
|
||||||
|
|
||||||
gasPerNode := fixedn.Fixed8(ap.storageEmission / uint64(nmLen+extraLen))
|
gasPerNode := fixedn.Fixed8(ap.storageEmission / uint64(nmLen+extraLen))
|
||||||
|
@ -71,6 +71,8 @@ func (ap *Processor) processEmit() {
|
||||||
ap.transferGasToNetmapNodes(nmNodes, gasPerNode)
|
ap.transferGasToNetmapNodes(nmNodes, gasPerNode)
|
||||||
|
|
||||||
ap.transferGasToExtraNodes(pw, gasPerNode)
|
ap.transferGasToExtraNodes(pw, gasPerNode)
|
||||||
|
|
||||||
|
return true
|
||||||
}
|
}
|
||||||
|
|
||||||
func (ap *Processor) transferGasToNetmapNodes(nmNodes []netmap.NodeInfo, gasPerNode fixedn.Fixed8) {
|
func (ap *Processor) transferGasToNetmapNodes(nmNodes []netmap.NodeInfo, gasPerNode fixedn.Fixed8) {
|
||||||
|
|
|
@ -7,6 +7,7 @@ import (
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/innerring/metrics"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/event"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/event"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap"
|
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap"
|
||||||
|
@ -50,6 +51,7 @@ type (
|
||||||
// protects parsedWallets from concurrent change
|
// protects parsedWallets from concurrent change
|
||||||
pwLock *sync.RWMutex
|
pwLock *sync.RWMutex
|
||||||
log *logger.Logger
|
log *logger.Logger
|
||||||
|
metrics metrics.Register
|
||||||
pool *ants.Pool
|
pool *ants.Pool
|
||||||
alphabetContracts Contracts
|
alphabetContracts Contracts
|
||||||
netmapClient netmapClient
|
netmapClient netmapClient
|
||||||
|
@ -62,6 +64,7 @@ type (
|
||||||
Params struct {
|
Params struct {
|
||||||
ParsedWallets []util.Uint160
|
ParsedWallets []util.Uint160
|
||||||
Log *logger.Logger
|
Log *logger.Logger
|
||||||
|
Metrics metrics.Register
|
||||||
PoolSize int
|
PoolSize int
|
||||||
AlphabetContracts Contracts
|
AlphabetContracts Contracts
|
||||||
NetmapClient netmapClient
|
NetmapClient netmapClient
|
||||||
|
@ -89,10 +92,16 @@ func New(p *Params) (*Processor, error) {
|
||||||
return nil, fmt.Errorf("ir/frostfs: can't create worker pool: %w", err)
|
return nil, fmt.Errorf("ir/frostfs: can't create worker pool: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
metricsRegister := p.Metrics
|
||||||
|
if metricsRegister == nil {
|
||||||
|
metricsRegister = metrics.DefaultRegister{}
|
||||||
|
}
|
||||||
|
|
||||||
return &Processor{
|
return &Processor{
|
||||||
parsedWallets: p.ParsedWallets,
|
parsedWallets: p.ParsedWallets,
|
||||||
pwLock: new(sync.RWMutex),
|
pwLock: new(sync.RWMutex),
|
||||||
log: p.Log,
|
log: p.Log,
|
||||||
|
metrics: metricsRegister,
|
||||||
pool: pool,
|
pool: pool,
|
||||||
alphabetContracts: p.AlphabetContracts,
|
alphabetContracts: p.AlphabetContracts,
|
||||||
netmapClient: p.NetmapClient,
|
netmapClient: p.NetmapClient,
|
||||||
|
|
|
@ -4,6 +4,7 @@ import (
|
||||||
"encoding/hex"
|
"encoding/hex"
|
||||||
|
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/innerring/processors"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/event"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/event"
|
||||||
balanceEvent "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/event/balance"
|
balanceEvent "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/event/balance"
|
||||||
"go.uber.org/zap"
|
"go.uber.org/zap"
|
||||||
|
@ -17,7 +18,9 @@ func (bp *Processor) handleLock(ev event.Event) {
|
||||||
|
|
||||||
// send an event to the worker pool
|
// send an event to the worker pool
|
||||||
|
|
||||||
err := bp.pool.Submit(func() { bp.processLock(&lock) })
|
err := processors.SubmitEvent(bp.pool, bp.metrics, "lock", func() bool {
|
||||||
|
return bp.processLock(&lock)
|
||||||
|
})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
// there system can be moved into controlled degradation stage
|
// there system can be moved into controlled degradation stage
|
||||||
bp.log.Warn(logs.BalanceBalanceWorkerPoolDrained,
|
bp.log.Warn(logs.BalanceBalanceWorkerPoolDrained,
|
||||||
|
|
|
@ -9,10 +9,10 @@ import (
|
||||||
|
|
||||||
// Process lock event by invoking Cheque method in main net to send assets
|
// Process lock event by invoking Cheque method in main net to send assets
|
||||||
// back to the withdraw issuer.
|
// back to the withdraw issuer.
|
||||||
func (bp *Processor) processLock(lock *balanceEvent.Lock) {
|
func (bp *Processor) processLock(lock *balanceEvent.Lock) bool {
|
||||||
if !bp.alphabetState.IsAlphabet() {
|
if !bp.alphabetState.IsAlphabet() {
|
||||||
bp.log.Info(logs.BalanceNonAlphabetModeIgnoreBalanceLock)
|
bp.log.Info(logs.BalanceNonAlphabetModeIgnoreBalanceLock)
|
||||||
return
|
return true
|
||||||
}
|
}
|
||||||
|
|
||||||
prm := frostfsContract.ChequePrm{}
|
prm := frostfsContract.ChequePrm{}
|
||||||
|
@ -26,5 +26,8 @@ func (bp *Processor) processLock(lock *balanceEvent.Lock) {
|
||||||
err := bp.frostfsClient.Cheque(prm)
|
err := bp.frostfsClient.Cheque(prm)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
bp.log.Error(logs.BalanceCantSendLockAssetTx, zap.Error(err))
|
bp.log.Error(logs.BalanceCantSendLockAssetTx, zap.Error(err))
|
||||||
|
return false
|
||||||
}
|
}
|
||||||
|
|
||||||
|
return true
|
||||||
}
|
}
|
||||||
|
|
|
@ -5,6 +5,7 @@ import (
|
||||||
"fmt"
|
"fmt"
|
||||||
|
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/innerring/metrics"
|
||||||
frostfscontract "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/client/frostfs"
|
frostfscontract "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/client/frostfs"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/event"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/event"
|
||||||
balanceEvent "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/event/balance"
|
balanceEvent "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/event/balance"
|
||||||
|
@ -32,6 +33,7 @@ type (
|
||||||
// Processor of events produced by balance contract in the morphchain.
|
// Processor of events produced by balance contract in the morphchain.
|
||||||
Processor struct {
|
Processor struct {
|
||||||
log *logger.Logger
|
log *logger.Logger
|
||||||
|
metrics metrics.Register
|
||||||
pool *ants.Pool
|
pool *ants.Pool
|
||||||
frostfsClient FrostFSClient
|
frostfsClient FrostFSClient
|
||||||
balanceSC util.Uint160
|
balanceSC util.Uint160
|
||||||
|
@ -42,6 +44,7 @@ type (
|
||||||
// Params of the processor constructor.
|
// Params of the processor constructor.
|
||||||
Params struct {
|
Params struct {
|
||||||
Log *logger.Logger
|
Log *logger.Logger
|
||||||
|
Metrics metrics.Register
|
||||||
PoolSize int
|
PoolSize int
|
||||||
FrostFSClient FrostFSClient
|
FrostFSClient FrostFSClient
|
||||||
BalanceSC util.Uint160
|
BalanceSC util.Uint160
|
||||||
|
@ -72,8 +75,14 @@ func New(p *Params) (*Processor, error) {
|
||||||
return nil, fmt.Errorf("ir/balance: can't create worker pool: %w", err)
|
return nil, fmt.Errorf("ir/balance: can't create worker pool: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
metricsRegister := p.Metrics
|
||||||
|
if metricsRegister == nil {
|
||||||
|
metricsRegister = metrics.DefaultRegister{}
|
||||||
|
}
|
||||||
|
|
||||||
return &Processor{
|
return &Processor{
|
||||||
log: p.Log,
|
log: p.Log,
|
||||||
|
metrics: metricsRegister,
|
||||||
pool: pool,
|
pool: pool,
|
||||||
frostfsClient: p.FrostFSClient,
|
frostfsClient: p.FrostFSClient,
|
||||||
balanceSC: p.BalanceSC,
|
balanceSC: p.BalanceSC,
|
||||||
|
|
|
@ -4,6 +4,7 @@ import (
|
||||||
"crypto/sha256"
|
"crypto/sha256"
|
||||||
|
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/innerring/processors"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/event"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/event"
|
||||||
containerEvent "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/event/container"
|
containerEvent "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/event/container"
|
||||||
"github.com/mr-tron/base58"
|
"github.com/mr-tron/base58"
|
||||||
|
@ -20,7 +21,9 @@ func (cp *Processor) handlePut(ev event.Event) {
|
||||||
|
|
||||||
// send an event to the worker pool
|
// send an event to the worker pool
|
||||||
|
|
||||||
err := cp.pool.Submit(func() { cp.processContainerPut(put) })
|
err := processors.SubmitEvent(cp.pool, cp.metrics, "container_put", func() bool {
|
||||||
|
return cp.processContainerPut(put)
|
||||||
|
})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
// there system can be moved into controlled degradation stage
|
// there system can be moved into controlled degradation stage
|
||||||
cp.log.Warn(logs.ContainerContainerProcessorWorkerPoolDrained,
|
cp.log.Warn(logs.ContainerContainerProcessorWorkerPoolDrained,
|
||||||
|
@ -36,7 +39,9 @@ func (cp *Processor) handleDelete(ev event.Event) {
|
||||||
|
|
||||||
// send an event to the worker pool
|
// send an event to the worker pool
|
||||||
|
|
||||||
err := cp.pool.Submit(func() { cp.processContainerDelete(del) })
|
err := processors.SubmitEvent(cp.pool, cp.metrics, "container_delete", func() bool {
|
||||||
|
return cp.processContainerDelete(del)
|
||||||
|
})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
// there system can be moved into controlled degradation stage
|
// there system can be moved into controlled degradation stage
|
||||||
cp.log.Warn(logs.ContainerContainerProcessorWorkerPoolDrained,
|
cp.log.Warn(logs.ContainerContainerProcessorWorkerPoolDrained,
|
||||||
|
@ -53,8 +58,8 @@ func (cp *Processor) handleSetEACL(ev event.Event) {
|
||||||
|
|
||||||
// send an event to the worker pool
|
// send an event to the worker pool
|
||||||
|
|
||||||
err := cp.pool.Submit(func() {
|
err := processors.SubmitEvent(cp.pool, cp.metrics, "container_set_eacl", func() bool {
|
||||||
cp.processSetEACL(e)
|
return cp.processSetEACL(e)
|
||||||
})
|
})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
// there system can be moved into controlled degradation stage
|
// there system can be moved into controlled degradation stage
|
||||||
|
|
|
@ -31,10 +31,10 @@ type putContainerContext struct {
|
||||||
|
|
||||||
// Process a new container from the user by checking the container sanity
|
// Process a new container from the user by checking the container sanity
|
||||||
// and sending approve tx back to the morph.
|
// and sending approve tx back to the morph.
|
||||||
func (cp *Processor) processContainerPut(put putEvent) {
|
func (cp *Processor) processContainerPut(put putEvent) bool {
|
||||||
if !cp.alphabetState.IsAlphabet() {
|
if !cp.alphabetState.IsAlphabet() {
|
||||||
cp.log.Info(logs.ContainerNonAlphabetModeIgnoreContainerPut)
|
cp.log.Info(logs.ContainerNonAlphabetModeIgnoreContainerPut)
|
||||||
return
|
return true
|
||||||
}
|
}
|
||||||
|
|
||||||
ctx := &putContainerContext{
|
ctx := &putContainerContext{
|
||||||
|
@ -47,10 +47,17 @@ func (cp *Processor) processContainerPut(put putEvent) {
|
||||||
zap.String("error", err.Error()),
|
zap.String("error", err.Error()),
|
||||||
)
|
)
|
||||||
|
|
||||||
return
|
return false
|
||||||
}
|
}
|
||||||
|
|
||||||
cp.approvePutContainer(ctx)
|
if err := cp.morphClient.NotarySignAndInvokeTX(ctx.e.NotaryRequest().MainTransaction); err != nil {
|
||||||
|
cp.log.Error(logs.ContainerCouldNotApprovePutContainer,
|
||||||
|
zap.String("error", err.Error()),
|
||||||
|
)
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
|
||||||
|
return true
|
||||||
}
|
}
|
||||||
|
|
||||||
func (cp *Processor) checkPutContainer(ctx *putContainerContext) error {
|
func (cp *Processor) checkPutContainer(ctx *putContainerContext) error {
|
||||||
|
@ -89,20 +96,12 @@ func (cp *Processor) checkPutContainer(ctx *putContainerContext) error {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (cp *Processor) approvePutContainer(ctx *putContainerContext) {
|
|
||||||
if err := cp.morphClient.NotarySignAndInvokeTX(ctx.e.NotaryRequest().MainTransaction); err != nil {
|
|
||||||
cp.log.Error(logs.ContainerCouldNotApprovePutContainer,
|
|
||||||
zap.String("error", err.Error()),
|
|
||||||
)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Process delete container operation from the user by checking container sanity
|
// Process delete container operation from the user by checking container sanity
|
||||||
// and sending approve tx back to morph.
|
// and sending approve tx back to morph.
|
||||||
func (cp *Processor) processContainerDelete(e containerEvent.Delete) {
|
func (cp *Processor) processContainerDelete(e containerEvent.Delete) bool {
|
||||||
if !cp.alphabetState.IsAlphabet() {
|
if !cp.alphabetState.IsAlphabet() {
|
||||||
cp.log.Info(logs.ContainerNonAlphabetModeIgnoreContainerDelete)
|
cp.log.Info(logs.ContainerNonAlphabetModeIgnoreContainerDelete)
|
||||||
return
|
return true
|
||||||
}
|
}
|
||||||
|
|
||||||
err := cp.checkDeleteContainer(e)
|
err := cp.checkDeleteContainer(e)
|
||||||
|
@ -111,10 +110,18 @@ func (cp *Processor) processContainerDelete(e containerEvent.Delete) {
|
||||||
zap.String("error", err.Error()),
|
zap.String("error", err.Error()),
|
||||||
)
|
)
|
||||||
|
|
||||||
return
|
return false
|
||||||
}
|
}
|
||||||
|
|
||||||
cp.approveDeleteContainer(e)
|
if err := cp.morphClient.NotarySignAndInvokeTX(e.NotaryRequest().MainTransaction); err != nil {
|
||||||
|
cp.log.Error(logs.ContainerCouldNotApproveDeleteContainer,
|
||||||
|
zap.String("error", err.Error()),
|
||||||
|
)
|
||||||
|
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
|
||||||
|
return true
|
||||||
}
|
}
|
||||||
|
|
||||||
func (cp *Processor) checkDeleteContainer(e containerEvent.Delete) error {
|
func (cp *Processor) checkDeleteContainer(e containerEvent.Delete) error {
|
||||||
|
@ -149,14 +156,6 @@ func (cp *Processor) checkDeleteContainer(e containerEvent.Delete) error {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (cp *Processor) approveDeleteContainer(e containerEvent.Delete) {
|
|
||||||
if err := cp.morphClient.NotarySignAndInvokeTX(e.NotaryRequest().MainTransaction); err != nil {
|
|
||||||
cp.log.Error(logs.ContainerCouldNotApproveDeleteContainer,
|
|
||||||
zap.String("error", err.Error()),
|
|
||||||
)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func checkNNS(ctx *putContainerContext, cnr containerSDK.Container) error {
|
func checkNNS(ctx *putContainerContext, cnr containerSDK.Container) error {
|
||||||
// fetch domain info
|
// fetch domain info
|
||||||
ctx.d = containerSDK.ReadDomain(cnr)
|
ctx.d = containerSDK.ReadDomain(cnr)
|
||||||
|
|
|
@ -12,10 +12,10 @@ import (
|
||||||
"go.uber.org/zap"
|
"go.uber.org/zap"
|
||||||
)
|
)
|
||||||
|
|
||||||
func (cp *Processor) processSetEACL(e containerEvent.SetEACL) {
|
func (cp *Processor) processSetEACL(e containerEvent.SetEACL) bool {
|
||||||
if !cp.alphabetState.IsAlphabet() {
|
if !cp.alphabetState.IsAlphabet() {
|
||||||
cp.log.Info(logs.ContainerNonAlphabetModeIgnoreSetEACL)
|
cp.log.Info(logs.ContainerNonAlphabetModeIgnoreSetEACL)
|
||||||
return
|
return true
|
||||||
}
|
}
|
||||||
|
|
||||||
err := cp.checkSetEACL(e)
|
err := cp.checkSetEACL(e)
|
||||||
|
@ -24,10 +24,17 @@ func (cp *Processor) processSetEACL(e containerEvent.SetEACL) {
|
||||||
zap.String("error", err.Error()),
|
zap.String("error", err.Error()),
|
||||||
)
|
)
|
||||||
|
|
||||||
return
|
return false
|
||||||
}
|
}
|
||||||
|
|
||||||
cp.approveSetEACL(e)
|
if err := cp.morphClient.NotarySignAndInvokeTX(e.NotaryRequest().MainTransaction); err != nil {
|
||||||
|
cp.log.Error(logs.ContainerCouldNotApproveSetEACL,
|
||||||
|
zap.String("error", err.Error()),
|
||||||
|
)
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
|
||||||
|
return true
|
||||||
}
|
}
|
||||||
|
|
||||||
func (cp *Processor) checkSetEACL(e containerEvent.SetEACL) error {
|
func (cp *Processor) checkSetEACL(e containerEvent.SetEACL) error {
|
||||||
|
@ -73,11 +80,3 @@ func (cp *Processor) checkSetEACL(e containerEvent.SetEACL) error {
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (cp *Processor) approveSetEACL(e containerEvent.SetEACL) {
|
|
||||||
if err := cp.morphClient.NotarySignAndInvokeTX(e.NotaryRequest().MainTransaction); err != nil {
|
|
||||||
cp.log.Error(logs.ContainerCouldNotApproveSetEACL,
|
|
||||||
zap.String("error", err.Error()),
|
|
||||||
)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
|
@ -6,6 +6,7 @@ import (
|
||||||
|
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
|
||||||
containercore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/container"
|
containercore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/container"
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/innerring/metrics"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/client/frostfsid"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/client/frostfsid"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/event"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/event"
|
||||||
containerEvent "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/event/container"
|
containerEvent "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/event/container"
|
||||||
|
@ -40,6 +41,7 @@ type (
|
||||||
// Processor of events produced by container contract in the sidechain.
|
// Processor of events produced by container contract in the sidechain.
|
||||||
Processor struct {
|
Processor struct {
|
||||||
log *logger.Logger
|
log *logger.Logger
|
||||||
|
metrics metrics.Register
|
||||||
pool *ants.Pool
|
pool *ants.Pool
|
||||||
alphabetState AlphabetState
|
alphabetState AlphabetState
|
||||||
cnrClient ContClient // notary must be enabled
|
cnrClient ContClient // notary must be enabled
|
||||||
|
@ -51,6 +53,7 @@ type (
|
||||||
// Params of the processor constructor.
|
// Params of the processor constructor.
|
||||||
Params struct {
|
Params struct {
|
||||||
Log *logger.Logger
|
Log *logger.Logger
|
||||||
|
Metrics metrics.Register
|
||||||
PoolSize int
|
PoolSize int
|
||||||
AlphabetState AlphabetState
|
AlphabetState AlphabetState
|
||||||
ContainerClient ContClient
|
ContainerClient ContClient
|
||||||
|
@ -102,8 +105,14 @@ func New(p *Params) (*Processor, error) {
|
||||||
return nil, fmt.Errorf("ir/container: can't create worker pool: %w", err)
|
return nil, fmt.Errorf("ir/container: can't create worker pool: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
metricsRegister := p.Metrics
|
||||||
|
if metricsRegister == nil {
|
||||||
|
metricsRegister = metrics.DefaultRegister{}
|
||||||
|
}
|
||||||
|
|
||||||
return &Processor{
|
return &Processor{
|
||||||
log: p.Log,
|
log: p.Log,
|
||||||
|
metrics: metricsRegister,
|
||||||
pool: pool,
|
pool: pool,
|
||||||
alphabetState: p.AlphabetState,
|
alphabetState: p.AlphabetState,
|
||||||
cnrClient: p.ContainerClient,
|
cnrClient: p.ContainerClient,
|
||||||
|
|
|
@ -4,6 +4,7 @@ import (
|
||||||
"encoding/hex"
|
"encoding/hex"
|
||||||
|
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/innerring/processors"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/event"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/event"
|
||||||
frostfsEvent "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/event/frostfs"
|
frostfsEvent "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/event/frostfs"
|
||||||
"github.com/nspcc-dev/neo-go/pkg/util/slice"
|
"github.com/nspcc-dev/neo-go/pkg/util/slice"
|
||||||
|
@ -18,7 +19,9 @@ func (np *Processor) handleDeposit(ev event.Event) {
|
||||||
|
|
||||||
// send event to the worker pool
|
// send event to the worker pool
|
||||||
|
|
||||||
err := np.pool.Submit(func() { np.processDeposit(deposit) })
|
err := processors.SubmitEvent(np.pool, np.metrics, "frostfs_deposit", func() bool {
|
||||||
|
return np.processDeposit(deposit)
|
||||||
|
})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
// there system can be moved into controlled degradation stage
|
// there system can be moved into controlled degradation stage
|
||||||
np.log.Warn(logs.FrostFSFrostfsProcessorWorkerPoolDrained,
|
np.log.Warn(logs.FrostFSFrostfsProcessorWorkerPoolDrained,
|
||||||
|
@ -34,7 +37,9 @@ func (np *Processor) handleWithdraw(ev event.Event) {
|
||||||
|
|
||||||
// send event to the worker pool
|
// send event to the worker pool
|
||||||
|
|
||||||
err := np.pool.Submit(func() { np.processWithdraw(withdraw) })
|
err := processors.SubmitEvent(np.pool, np.metrics, "frostfs_withdraw", func() bool {
|
||||||
|
return np.processWithdraw(withdraw)
|
||||||
|
})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
// there system can be moved into controlled degradation stage
|
// there system can be moved into controlled degradation stage
|
||||||
np.log.Warn(logs.FrostFSFrostfsProcessorWorkerPoolDrained,
|
np.log.Warn(logs.FrostFSFrostfsProcessorWorkerPoolDrained,
|
||||||
|
@ -50,7 +55,9 @@ func (np *Processor) handleCheque(ev event.Event) {
|
||||||
|
|
||||||
// send event to the worker pool
|
// send event to the worker pool
|
||||||
|
|
||||||
err := np.pool.Submit(func() { np.processCheque(cheque) })
|
err := processors.SubmitEvent(np.pool, np.metrics, "frostfs_cheque", func() bool {
|
||||||
|
return np.processCheque(cheque)
|
||||||
|
})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
// there system can be moved into controlled degradation stage
|
// there system can be moved into controlled degradation stage
|
||||||
np.log.Warn(logs.FrostFSFrostfsProcessorWorkerPoolDrained,
|
np.log.Warn(logs.FrostFSFrostfsProcessorWorkerPoolDrained,
|
||||||
|
@ -67,7 +74,9 @@ func (np *Processor) handleConfig(ev event.Event) {
|
||||||
|
|
||||||
// send event to the worker pool
|
// send event to the worker pool
|
||||||
|
|
||||||
err := np.pool.Submit(func() { np.processConfig(cfg) })
|
err := processors.SubmitEvent(np.pool, np.metrics, "frostfs_config", func() bool {
|
||||||
|
return np.processConfig(cfg)
|
||||||
|
})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
// there system can be moved into controlled degradation stage
|
// there system can be moved into controlled degradation stage
|
||||||
np.log.Warn(logs.FrostFSFrostfsProcessorWorkerPoolDrained,
|
np.log.Warn(logs.FrostFSFrostfsProcessorWorkerPoolDrained,
|
||||||
|
@ -83,7 +92,9 @@ func (np *Processor) handleBind(ev event.Event) {
|
||||||
|
|
||||||
// send event to the worker pool
|
// send event to the worker pool
|
||||||
|
|
||||||
err := np.pool.Submit(func() { np.processBind(e, true) })
|
err := processors.SubmitEvent(np.pool, np.metrics, "frostfs_bind", func() bool {
|
||||||
|
return np.processBind(e, true)
|
||||||
|
})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
// there system can be moved into controlled degradation stage
|
// there system can be moved into controlled degradation stage
|
||||||
np.log.Warn(logs.FrostFSFrostfsProcessorWorkerPoolDrained,
|
np.log.Warn(logs.FrostFSFrostfsProcessorWorkerPoolDrained,
|
||||||
|
@ -99,7 +110,9 @@ func (np *Processor) handleUnbind(ev event.Event) {
|
||||||
|
|
||||||
// send event to the worker pool
|
// send event to the worker pool
|
||||||
|
|
||||||
err := np.pool.Submit(func() { np.processBind(e, false) })
|
err := processors.SubmitEvent(np.pool, np.metrics, "frostfs_unbind", func() bool {
|
||||||
|
return np.processBind(e, false)
|
||||||
|
})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
// there system can be moved into controlled degradation stage
|
// there system can be moved into controlled degradation stage
|
||||||
np.log.Warn(logs.FrostFSFrostfsProcessorWorkerPoolDrained,
|
np.log.Warn(logs.FrostFSFrostfsProcessorWorkerPoolDrained,
|
||||||
|
|
|
@ -15,10 +15,10 @@ const (
|
||||||
|
|
||||||
// Process deposit event by invoking a balance contract and sending native
|
// Process deposit event by invoking a balance contract and sending native
|
||||||
// gas in the sidechain.
|
// gas in the sidechain.
|
||||||
func (np *Processor) processDeposit(deposit frostfsEvent.Deposit) {
|
func (np *Processor) processDeposit(deposit frostfsEvent.Deposit) bool {
|
||||||
if !np.alphabetState.IsAlphabet() {
|
if !np.alphabetState.IsAlphabet() {
|
||||||
np.log.Info(logs.FrostFSNonAlphabetModeIgnoreDeposit)
|
np.log.Info(logs.FrostFSNonAlphabetModeIgnoreDeposit)
|
||||||
return
|
return true
|
||||||
}
|
}
|
||||||
|
|
||||||
prm := balance.MintPrm{}
|
prm := balance.MintPrm{}
|
||||||
|
@ -49,7 +49,7 @@ func (np *Processor) processDeposit(deposit frostfsEvent.Deposit) {
|
||||||
zap.Uint64("last_emission", val),
|
zap.Uint64("last_emission", val),
|
||||||
zap.Uint64("current_epoch", curEpoch))
|
zap.Uint64("current_epoch", curEpoch))
|
||||||
|
|
||||||
return
|
return false
|
||||||
}
|
}
|
||||||
|
|
||||||
// get gas balance of the node
|
// get gas balance of the node
|
||||||
|
@ -57,7 +57,7 @@ func (np *Processor) processDeposit(deposit frostfsEvent.Deposit) {
|
||||||
balance, err := np.morphClient.GasBalance()
|
balance, err := np.morphClient.GasBalance()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
np.log.Error(logs.FrostFSCantGetGasBalanceOfTheNode, zap.Error(err))
|
np.log.Error(logs.FrostFSCantGetGasBalanceOfTheNode, zap.Error(err))
|
||||||
return
|
return false
|
||||||
}
|
}
|
||||||
|
|
||||||
if balance < np.gasBalanceThreshold {
|
if balance < np.gasBalanceThreshold {
|
||||||
|
@ -65,7 +65,7 @@ func (np *Processor) processDeposit(deposit frostfsEvent.Deposit) {
|
||||||
zap.Int64("balance", balance),
|
zap.Int64("balance", balance),
|
||||||
zap.Int64("threshold", np.gasBalanceThreshold))
|
zap.Int64("threshold", np.gasBalanceThreshold))
|
||||||
|
|
||||||
return
|
return false
|
||||||
}
|
}
|
||||||
|
|
||||||
err = np.morphClient.TransferGas(receiver, np.mintEmitValue)
|
err = np.morphClient.TransferGas(receiver, np.mintEmitValue)
|
||||||
|
@ -73,24 +73,26 @@ func (np *Processor) processDeposit(deposit frostfsEvent.Deposit) {
|
||||||
np.log.Error(logs.FrostFSCantTransferNativeGasToReceiver,
|
np.log.Error(logs.FrostFSCantTransferNativeGasToReceiver,
|
||||||
zap.String("error", err.Error()))
|
zap.String("error", err.Error()))
|
||||||
|
|
||||||
return
|
return false
|
||||||
}
|
}
|
||||||
|
|
||||||
np.mintEmitCache.Add(receiver.String(), curEpoch)
|
np.mintEmitCache.Add(receiver.String(), curEpoch)
|
||||||
|
|
||||||
|
return true
|
||||||
}
|
}
|
||||||
|
|
||||||
// Process withdraw event by locking assets in the balance account.
|
// Process withdraw event by locking assets in the balance account.
|
||||||
func (np *Processor) processWithdraw(withdraw frostfsEvent.Withdraw) {
|
func (np *Processor) processWithdraw(withdraw frostfsEvent.Withdraw) bool {
|
||||||
if !np.alphabetState.IsAlphabet() {
|
if !np.alphabetState.IsAlphabet() {
|
||||||
np.log.Info(logs.FrostFSNonAlphabetModeIgnoreWithdraw)
|
np.log.Info(logs.FrostFSNonAlphabetModeIgnoreWithdraw)
|
||||||
return
|
return true
|
||||||
}
|
}
|
||||||
|
|
||||||
// create lock account
|
// create lock account
|
||||||
lock, err := util.Uint160DecodeBytesBE(withdraw.ID()[:util.Uint160Size])
|
lock, err := util.Uint160DecodeBytesBE(withdraw.ID()[:util.Uint160Size])
|
||||||
if err != nil {
|
if err != nil {
|
||||||
np.log.Error(logs.FrostFSCantCreateLockAccount, zap.Error(err))
|
np.log.Error(logs.FrostFSCantCreateLockAccount, zap.Error(err))
|
||||||
return
|
return false
|
||||||
}
|
}
|
||||||
|
|
||||||
curEpoch := np.epochState.EpochCounter()
|
curEpoch := np.epochState.EpochCounter()
|
||||||
|
@ -106,15 +108,18 @@ func (np *Processor) processWithdraw(withdraw frostfsEvent.Withdraw) {
|
||||||
err = np.balanceClient.Lock(prm)
|
err = np.balanceClient.Lock(prm)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
np.log.Error(logs.FrostFSCantLockAssetsForWithdraw, zap.Error(err))
|
np.log.Error(logs.FrostFSCantLockAssetsForWithdraw, zap.Error(err))
|
||||||
|
return false
|
||||||
}
|
}
|
||||||
|
|
||||||
|
return true
|
||||||
}
|
}
|
||||||
|
|
||||||
// Process cheque event by transferring assets from the lock account back to
|
// Process cheque event by transferring assets from the lock account back to
|
||||||
// the reserve account.
|
// the reserve account.
|
||||||
func (np *Processor) processCheque(cheque frostfsEvent.Cheque) {
|
func (np *Processor) processCheque(cheque frostfsEvent.Cheque) bool {
|
||||||
if !np.alphabetState.IsAlphabet() {
|
if !np.alphabetState.IsAlphabet() {
|
||||||
np.log.Info(logs.FrostFSNonAlphabetModeIgnoreCheque)
|
np.log.Info(logs.FrostFSNonAlphabetModeIgnoreCheque)
|
||||||
return
|
return true
|
||||||
}
|
}
|
||||||
|
|
||||||
prm := balance.BurnPrm{}
|
prm := balance.BurnPrm{}
|
||||||
|
@ -126,5 +131,8 @@ func (np *Processor) processCheque(cheque frostfsEvent.Cheque) {
|
||||||
err := np.balanceClient.Burn(prm)
|
err := np.balanceClient.Burn(prm)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
np.log.Error(logs.FrostFSCantTransferAssetsToFedContract, zap.Error(err))
|
np.log.Error(logs.FrostFSCantTransferAssetsToFedContract, zap.Error(err))
|
||||||
|
return false
|
||||||
}
|
}
|
||||||
|
|
||||||
|
return true
|
||||||
}
|
}
|
||||||
|
|
|
@ -18,10 +18,10 @@ type bindCommon interface {
|
||||||
TxHash() util.Uint256
|
TxHash() util.Uint256
|
||||||
}
|
}
|
||||||
|
|
||||||
func (np *Processor) processBind(e bindCommon, bind bool) {
|
func (np *Processor) processBind(e bindCommon, bind bool) bool {
|
||||||
if !np.alphabetState.IsAlphabet() {
|
if !np.alphabetState.IsAlphabet() {
|
||||||
np.log.Info(logs.FrostFSNonAlphabetModeIgnoreBind)
|
np.log.Info(logs.FrostFSNonAlphabetModeIgnoreBind)
|
||||||
return
|
return true
|
||||||
}
|
}
|
||||||
|
|
||||||
c := &bindCommonContext{
|
c := &bindCommonContext{
|
||||||
|
@ -36,10 +36,10 @@ func (np *Processor) processBind(e bindCommon, bind bool) {
|
||||||
zap.String("error", err.Error()),
|
zap.String("error", err.Error()),
|
||||||
)
|
)
|
||||||
|
|
||||||
return
|
return false
|
||||||
}
|
}
|
||||||
|
|
||||||
np.approveBindCommon(c)
|
return np.approveBindCommon(c) == nil
|
||||||
}
|
}
|
||||||
|
|
||||||
type bindCommonContext struct {
|
type bindCommonContext struct {
|
||||||
|
@ -70,7 +70,7 @@ func (np *Processor) checkBindCommon(e *bindCommonContext) error {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (np *Processor) approveBindCommon(e *bindCommonContext) {
|
func (np *Processor) approveBindCommon(e *bindCommonContext) error {
|
||||||
// calculate wallet address
|
// calculate wallet address
|
||||||
scriptHash := e.User()
|
scriptHash := e.User()
|
||||||
|
|
||||||
|
@ -80,7 +80,7 @@ func (np *Processor) approveBindCommon(e *bindCommonContext) {
|
||||||
zap.String("error", err.Error()),
|
zap.String("error", err.Error()),
|
||||||
)
|
)
|
||||||
|
|
||||||
return
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
var id user.ID
|
var id user.ID
|
||||||
|
@ -104,4 +104,6 @@ func (np *Processor) approveBindCommon(e *bindCommonContext) {
|
||||||
np.log.Error(fmt.Sprintf("could not approve %s", typ),
|
np.log.Error(fmt.Sprintf("could not approve %s", typ),
|
||||||
zap.String("error", err.Error()))
|
zap.String("error", err.Error()))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
return err
|
||||||
}
|
}
|
||||||
|
|
|
@ -9,10 +9,10 @@ import (
|
||||||
|
|
||||||
// Process config event by setting configuration value from the mainchain in
|
// Process config event by setting configuration value from the mainchain in
|
||||||
// the sidechain.
|
// the sidechain.
|
||||||
func (np *Processor) processConfig(config frostfsEvent.Config) {
|
func (np *Processor) processConfig(config frostfsEvent.Config) bool {
|
||||||
if !np.alphabetState.IsAlphabet() {
|
if !np.alphabetState.IsAlphabet() {
|
||||||
np.log.Info(logs.FrostFSNonAlphabetModeIgnoreConfig)
|
np.log.Info(logs.FrostFSNonAlphabetModeIgnoreConfig)
|
||||||
return
|
return true
|
||||||
}
|
}
|
||||||
|
|
||||||
prm := nmClient.SetConfigPrm{}
|
prm := nmClient.SetConfigPrm{}
|
||||||
|
@ -25,5 +25,8 @@ func (np *Processor) processConfig(config frostfsEvent.Config) {
|
||||||
err := np.netmapClient.SetConfig(prm)
|
err := np.netmapClient.SetConfig(prm)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
np.log.Error(logs.FrostFSCantRelaySetConfigEvent, zap.Error(err))
|
np.log.Error(logs.FrostFSCantRelaySetConfigEvent, zap.Error(err))
|
||||||
|
return false
|
||||||
}
|
}
|
||||||
|
|
||||||
|
return true
|
||||||
}
|
}
|
||||||
|
|
|
@ -6,6 +6,7 @@ import (
|
||||||
"sync"
|
"sync"
|
||||||
|
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/innerring/metrics"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/client/balance"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/client/balance"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/client/frostfsid"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/client/frostfsid"
|
||||||
nmClient "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/client/netmap"
|
nmClient "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/client/netmap"
|
||||||
|
@ -58,6 +59,7 @@ type (
|
||||||
// Processor of events produced by frostfs contract in main net.
|
// Processor of events produced by frostfs contract in main net.
|
||||||
Processor struct {
|
Processor struct {
|
||||||
log *logger.Logger
|
log *logger.Logger
|
||||||
|
metrics metrics.Register
|
||||||
pool *ants.Pool
|
pool *ants.Pool
|
||||||
frostfsContract util.Uint160
|
frostfsContract util.Uint160
|
||||||
balanceClient BalanceClient
|
balanceClient BalanceClient
|
||||||
|
@ -77,6 +79,7 @@ type (
|
||||||
// Params of the processor constructor.
|
// Params of the processor constructor.
|
||||||
Params struct {
|
Params struct {
|
||||||
Log *logger.Logger
|
Log *logger.Logger
|
||||||
|
Metrics metrics.Register
|
||||||
PoolSize int
|
PoolSize int
|
||||||
FrostFSContract util.Uint160
|
FrostFSContract util.Uint160
|
||||||
FrostFSIDClient IDClient
|
FrostFSIDClient IDClient
|
||||||
|
@ -129,8 +132,14 @@ func New(p *Params) (*Processor, error) {
|
||||||
return nil, fmt.Errorf("ir/frostfs: can't create LRU cache for gas emission: %w", err)
|
return nil, fmt.Errorf("ir/frostfs: can't create LRU cache for gas emission: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
metricsRegister := p.Metrics
|
||||||
|
if metricsRegister == nil {
|
||||||
|
metricsRegister = metrics.DefaultRegister{}
|
||||||
|
}
|
||||||
|
|
||||||
return &Processor{
|
return &Processor{
|
||||||
log: p.Log,
|
log: p.Log,
|
||||||
|
metrics: metricsRegister,
|
||||||
pool: pool,
|
pool: pool,
|
||||||
frostfsContract: p.FrostFSContract,
|
frostfsContract: p.FrostFSContract,
|
||||||
balanceClient: p.BalanceClient,
|
balanceClient: p.BalanceClient,
|
||||||
|
|
|
@ -2,6 +2,7 @@ package governance
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/innerring/processors"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/event"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/event"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/event/rolemanagement"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/event/rolemanagement"
|
||||||
"github.com/nspcc-dev/neo-go/pkg/core/native"
|
"github.com/nspcc-dev/neo-go/pkg/core/native"
|
||||||
|
@ -35,7 +36,9 @@ func (gp *Processor) HandleAlphabetSync(e event.Event) {
|
||||||
|
|
||||||
// send event to the worker pool
|
// send event to the worker pool
|
||||||
|
|
||||||
err := gp.pool.Submit(func() { gp.processAlphabetSync(hash) })
|
err := processors.SubmitEvent(gp.pool, gp.metrics, "alphabet_sync", func() bool {
|
||||||
|
return gp.processAlphabetSync(hash)
|
||||||
|
})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
// there system can be moved into controlled degradation stage
|
// there system can be moved into controlled degradation stage
|
||||||
gp.log.Warn(logs.GovernanceGovernanceWorkerPoolDrained,
|
gp.log.Warn(logs.GovernanceGovernanceWorkerPoolDrained,
|
||||||
|
|
|
@ -18,36 +18,36 @@ const (
|
||||||
alphabetUpdateIDPrefix = "AlphabetUpdate"
|
alphabetUpdateIDPrefix = "AlphabetUpdate"
|
||||||
)
|
)
|
||||||
|
|
||||||
func (gp *Processor) processAlphabetSync(txHash util.Uint256) {
|
func (gp *Processor) processAlphabetSync(txHash util.Uint256) bool {
|
||||||
if !gp.alphabetState.IsAlphabet() {
|
if !gp.alphabetState.IsAlphabet() {
|
||||||
gp.log.Info(logs.GovernanceNonAlphabetModeIgnoreAlphabetSync)
|
gp.log.Info(logs.GovernanceNonAlphabetModeIgnoreAlphabetSync)
|
||||||
return
|
return true
|
||||||
}
|
}
|
||||||
|
|
||||||
mainnetAlphabet, err := gp.mainnetClient.NeoFSAlphabetList()
|
mainnetAlphabet, err := gp.mainnetClient.NeoFSAlphabetList()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
gp.log.Error(logs.GovernanceCantFetchAlphabetListFromMainNet,
|
gp.log.Error(logs.GovernanceCantFetchAlphabetListFromMainNet,
|
||||||
zap.String("error", err.Error()))
|
zap.String("error", err.Error()))
|
||||||
return
|
return false
|
||||||
}
|
}
|
||||||
|
|
||||||
sidechainAlphabet, err := gp.morphClient.Committee()
|
sidechainAlphabet, err := gp.morphClient.Committee()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
gp.log.Error(logs.GovernanceCantFetchAlphabetListFromSideChain,
|
gp.log.Error(logs.GovernanceCantFetchAlphabetListFromSideChain,
|
||||||
zap.String("error", err.Error()))
|
zap.String("error", err.Error()))
|
||||||
return
|
return false
|
||||||
}
|
}
|
||||||
|
|
||||||
newAlphabet, err := newAlphabetList(sidechainAlphabet, mainnetAlphabet)
|
newAlphabet, err := newAlphabetList(sidechainAlphabet, mainnetAlphabet)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
gp.log.Error(logs.GovernanceCantMergeAlphabetListsFromMainNetAndSideChain,
|
gp.log.Error(logs.GovernanceCantMergeAlphabetListsFromMainNetAndSideChain,
|
||||||
zap.String("error", err.Error()))
|
zap.String("error", err.Error()))
|
||||||
return
|
return false
|
||||||
}
|
}
|
||||||
|
|
||||||
if newAlphabet == nil {
|
if newAlphabet == nil {
|
||||||
gp.log.Info(logs.GovernanceNoGovernanceUpdateAlphabetListHasNotBeenChanged)
|
gp.log.Info(logs.GovernanceNoGovernanceUpdateAlphabetListHasNotBeenChanged)
|
||||||
return
|
return true
|
||||||
}
|
}
|
||||||
|
|
||||||
gp.log.Info(logs.GovernanceAlphabetListHasBeenChangedStartingUpdate,
|
gp.log.Info(logs.GovernanceAlphabetListHasBeenChangedStartingUpdate,
|
||||||
|
@ -77,6 +77,8 @@ func (gp *Processor) processAlphabetSync(txHash util.Uint256) {
|
||||||
gp.updateFrostFSContractInMainnet(newAlphabet)
|
gp.updateFrostFSContractInMainnet(newAlphabet)
|
||||||
|
|
||||||
gp.log.Info(logs.GovernanceFinishedAlphabetListUpdate)
|
gp.log.Info(logs.GovernanceFinishedAlphabetListUpdate)
|
||||||
|
|
||||||
|
return true
|
||||||
}
|
}
|
||||||
|
|
||||||
func prettyKeys(keys keys.PublicKeys) string {
|
func prettyKeys(keys keys.PublicKeys) string {
|
||||||
|
|
|
@ -4,6 +4,7 @@ import (
|
||||||
"errors"
|
"errors"
|
||||||
"fmt"
|
"fmt"
|
||||||
|
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/innerring/metrics"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/client"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/client"
|
||||||
frostfscontract "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/client/frostfs"
|
frostfscontract "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/client/frostfs"
|
||||||
nmClient "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/client/netmap"
|
nmClient "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/client/netmap"
|
||||||
|
@ -75,6 +76,7 @@ type (
|
||||||
// Processor of events related to governance in the network.
|
// Processor of events related to governance in the network.
|
||||||
Processor struct {
|
Processor struct {
|
||||||
log *logger.Logger
|
log *logger.Logger
|
||||||
|
metrics metrics.Register
|
||||||
pool *ants.Pool
|
pool *ants.Pool
|
||||||
frostfsClient FrostFSClient
|
frostfsClient FrostFSClient
|
||||||
netmapClient NetmapClient
|
netmapClient NetmapClient
|
||||||
|
@ -93,6 +95,7 @@ type (
|
||||||
// Params of the processor constructor.
|
// Params of the processor constructor.
|
||||||
Params struct {
|
Params struct {
|
||||||
Log *logger.Logger
|
Log *logger.Logger
|
||||||
|
Metrics metrics.Register
|
||||||
|
|
||||||
AlphabetState AlphabetState
|
AlphabetState AlphabetState
|
||||||
EpochState EpochState
|
EpochState EpochState
|
||||||
|
@ -130,11 +133,17 @@ func New(p *Params) (*Processor, error) {
|
||||||
return nil, fmt.Errorf("ir/governance: can't create worker pool: %w", err)
|
return nil, fmt.Errorf("ir/governance: can't create worker pool: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
metricsRegister := p.Metrics
|
||||||
|
if metricsRegister == nil {
|
||||||
|
metricsRegister = metrics.DefaultRegister{}
|
||||||
|
}
|
||||||
|
|
||||||
// result is cached by neo-go, so we can pre-calc it
|
// result is cached by neo-go, so we can pre-calc it
|
||||||
designate := p.MainnetClient.GetDesignateHash()
|
designate := p.MainnetClient.GetDesignateHash()
|
||||||
|
|
||||||
return &Processor{
|
return &Processor{
|
||||||
log: p.Log,
|
log: p.Log,
|
||||||
|
metrics: metricsRegister,
|
||||||
pool: pool,
|
pool: pool,
|
||||||
frostfsClient: p.FrostFSClient,
|
frostfsClient: p.FrostFSClient,
|
||||||
netmapClient: p.NetmapClient,
|
netmapClient: p.NetmapClient,
|
||||||
|
|
|
@ -4,6 +4,7 @@ import (
|
||||||
"encoding/hex"
|
"encoding/hex"
|
||||||
|
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/innerring/processors"
|
||||||
timerEvent "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/innerring/timers"
|
timerEvent "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/innerring/timers"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/event"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/event"
|
||||||
netmapEvent "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/event/netmap"
|
netmapEvent "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/event/netmap"
|
||||||
|
@ -16,7 +17,7 @@ func (np *Processor) HandleNewEpochTick(ev event.Event) {
|
||||||
|
|
||||||
// send an event to the worker pool
|
// send an event to the worker pool
|
||||||
|
|
||||||
err := np.pool.Submit(func() { np.processNewEpochTick() })
|
err := processors.SubmitEvent(np.pool, np.metrics, "netmap_new_epoch_tick", np.processNewEpochTick)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
// there system can be moved into controlled degradation stage
|
// there system can be moved into controlled degradation stage
|
||||||
np.log.Warn(logs.NetmapNetmapWorkerPoolDrained,
|
np.log.Warn(logs.NetmapNetmapWorkerPoolDrained,
|
||||||
|
@ -32,8 +33,8 @@ func (np *Processor) handleNewEpoch(ev event.Event) {
|
||||||
|
|
||||||
// send an event to the worker pool
|
// send an event to the worker pool
|
||||||
|
|
||||||
err := np.pool.Submit(func() {
|
err := processors.SubmitEvent(np.pool, np.metrics, "netmap_new_epoch", func() bool {
|
||||||
np.processNewEpoch(epochEvent)
|
return np.processNewEpoch(epochEvent)
|
||||||
})
|
})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
// there system can be moved into controlled degradation stage
|
// there system can be moved into controlled degradation stage
|
||||||
|
@ -51,8 +52,8 @@ func (np *Processor) handleAddPeer(ev event.Event) {
|
||||||
|
|
||||||
// send an event to the worker pool
|
// send an event to the worker pool
|
||||||
|
|
||||||
err := np.pool.Submit(func() {
|
err := processors.SubmitEvent(np.pool, np.metrics, "netmap_add_peer", func() bool {
|
||||||
np.processAddPeer(newPeer)
|
return np.processAddPeer(newPeer)
|
||||||
})
|
})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
// there system can be moved into controlled degradation stage
|
// there system can be moved into controlled degradation stage
|
||||||
|
@ -69,8 +70,8 @@ func (np *Processor) handleUpdateState(ev event.Event) {
|
||||||
|
|
||||||
// send event to the worker pool
|
// send event to the worker pool
|
||||||
|
|
||||||
err := np.pool.Submit(func() {
|
err := processors.SubmitEvent(np.pool, np.metrics, "netmap_update_peer", func() bool {
|
||||||
np.processUpdatePeer(updPeer)
|
return np.processUpdatePeer(updPeer)
|
||||||
})
|
})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
// there system can be moved into controlled degradation stage
|
// there system can be moved into controlled degradation stage
|
||||||
|
@ -91,8 +92,8 @@ func (np *Processor) handleCleanupTick(ev event.Event) {
|
||||||
np.log.Info(logs.NetmapTick, zap.String("type", "netmap cleaner"))
|
np.log.Info(logs.NetmapTick, zap.String("type", "netmap cleaner"))
|
||||||
|
|
||||||
// send event to the worker pool
|
// send event to the worker pool
|
||||||
err := np.pool.Submit(func() {
|
err := processors.SubmitEvent(np.pool, np.metrics, "netmap_cleanup_tick", func() bool {
|
||||||
np.processNetmapCleanupTick(cleanup)
|
return np.processNetmapCleanupTick(cleanup)
|
||||||
})
|
})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
// there system can be moved into controlled degradation stage
|
// there system can be moved into controlled degradation stage
|
||||||
|
|
|
@ -7,11 +7,11 @@ import (
|
||||||
"go.uber.org/zap"
|
"go.uber.org/zap"
|
||||||
)
|
)
|
||||||
|
|
||||||
func (np *Processor) processNetmapCleanupTick(ev netmapCleanupTick) {
|
func (np *Processor) processNetmapCleanupTick(ev netmapCleanupTick) bool {
|
||||||
if !np.alphabetState.IsAlphabet() {
|
if !np.alphabetState.IsAlphabet() {
|
||||||
np.log.Info(logs.NetmapNonAlphabetModeIgnoreNewNetmapCleanupTick)
|
np.log.Info(logs.NetmapNonAlphabetModeIgnoreNewNetmapCleanupTick)
|
||||||
|
|
||||||
return
|
return true
|
||||||
}
|
}
|
||||||
|
|
||||||
err := np.netmapSnapshot.forEachRemoveCandidate(ev.epoch, func(s string) error {
|
err := np.netmapSnapshot.forEachRemoveCandidate(ev.epoch, func(s string) error {
|
||||||
|
@ -47,5 +47,8 @@ func (np *Processor) processNetmapCleanupTick(ev netmapCleanupTick) {
|
||||||
if err != nil {
|
if err != nil {
|
||||||
np.log.Warn(logs.NetmapCantIterateOnNetmapCleanerCache,
|
np.log.Warn(logs.NetmapCantIterateOnNetmapCleanerCache,
|
||||||
zap.String("error", err.Error()))
|
zap.String("error", err.Error()))
|
||||||
|
return false
|
||||||
}
|
}
|
||||||
|
|
||||||
|
return true
|
||||||
}
|
}
|
||||||
|
|
|
@ -10,7 +10,7 @@ import (
|
||||||
|
|
||||||
// Process new epoch notification by setting global epoch value and resetting
|
// Process new epoch notification by setting global epoch value and resetting
|
||||||
// local epoch timer.
|
// local epoch timer.
|
||||||
func (np *Processor) processNewEpoch(ev netmapEvent.NewEpoch) {
|
func (np *Processor) processNewEpoch(ev netmapEvent.NewEpoch) bool {
|
||||||
epoch := ev.EpochNumber()
|
epoch := ev.EpochNumber()
|
||||||
|
|
||||||
epochDuration, err := np.netmapClient.EpochDuration()
|
epochDuration, err := np.netmapClient.EpochDuration()
|
||||||
|
@ -41,7 +41,7 @@ func (np *Processor) processNewEpoch(ev netmapEvent.NewEpoch) {
|
||||||
np.log.Warn(logs.NetmapCantGetNetmapSnapshotToPerformCleanup,
|
np.log.Warn(logs.NetmapCantGetNetmapSnapshotToPerformCleanup,
|
||||||
zap.String("error", err.Error()))
|
zap.String("error", err.Error()))
|
||||||
|
|
||||||
return
|
return false
|
||||||
}
|
}
|
||||||
|
|
||||||
prm := cntClient.StartEstimationPrm{}
|
prm := cntClient.StartEstimationPrm{}
|
||||||
|
@ -63,13 +63,15 @@ func (np *Processor) processNewEpoch(ev netmapEvent.NewEpoch) {
|
||||||
np.handleCleanupTick(netmapCleanupTick{epoch: epoch, txHash: ev.TxHash()})
|
np.handleCleanupTick(netmapCleanupTick{epoch: epoch, txHash: ev.TxHash()})
|
||||||
np.handleAlphabetSync(governance.NewSyncEvent(ev.TxHash()))
|
np.handleAlphabetSync(governance.NewSyncEvent(ev.TxHash()))
|
||||||
np.handleNotaryDeposit(ev)
|
np.handleNotaryDeposit(ev)
|
||||||
|
|
||||||
|
return true
|
||||||
}
|
}
|
||||||
|
|
||||||
// Process new epoch tick by invoking new epoch method in network map contract.
|
// Process new epoch tick by invoking new epoch method in network map contract.
|
||||||
func (np *Processor) processNewEpochTick() {
|
func (np *Processor) processNewEpochTick() bool {
|
||||||
if !np.alphabetState.IsAlphabet() {
|
if !np.alphabetState.IsAlphabet() {
|
||||||
np.log.Info(logs.NetmapNonAlphabetModeIgnoreNewEpochTick)
|
np.log.Info(logs.NetmapNonAlphabetModeIgnoreNewEpochTick)
|
||||||
return
|
return true
|
||||||
}
|
}
|
||||||
|
|
||||||
nextEpoch := np.epochState.EpochCounter() + 1
|
nextEpoch := np.epochState.EpochCounter() + 1
|
||||||
|
@ -78,5 +80,8 @@ func (np *Processor) processNewEpochTick() {
|
||||||
err := np.netmapClient.NewEpoch(nextEpoch, false)
|
err := np.netmapClient.NewEpoch(nextEpoch, false)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
np.log.Error(logs.NetmapCantInvokeNetmapNewEpoch, zap.Error(err))
|
np.log.Error(logs.NetmapCantInvokeNetmapNewEpoch, zap.Error(err))
|
||||||
|
return false
|
||||||
}
|
}
|
||||||
|
|
||||||
|
return true
|
||||||
}
|
}
|
||||||
|
|
|
@ -12,10 +12,10 @@ import (
|
||||||
|
|
||||||
// Process add peer notification by sanity check of new node
|
// Process add peer notification by sanity check of new node
|
||||||
// local epoch timer.
|
// local epoch timer.
|
||||||
func (np *Processor) processAddPeer(ev netmapEvent.AddPeer) {
|
func (np *Processor) processAddPeer(ev netmapEvent.AddPeer) bool {
|
||||||
if !np.alphabetState.IsAlphabet() {
|
if !np.alphabetState.IsAlphabet() {
|
||||||
np.log.Info(logs.NetmapNonAlphabetModeIgnoreNewPeerNotification)
|
np.log.Info(logs.NetmapNonAlphabetModeIgnoreNewPeerNotification)
|
||||||
return
|
return true
|
||||||
}
|
}
|
||||||
|
|
||||||
// check if notary transaction is valid, see #976
|
// check if notary transaction is valid, see #976
|
||||||
|
@ -26,7 +26,7 @@ func (np *Processor) processAddPeer(ev netmapEvent.AddPeer) {
|
||||||
zap.String("method", "netmap.AddPeer"),
|
zap.String("method", "netmap.AddPeer"),
|
||||||
zap.String("hash", tx.Hash().StringLE()),
|
zap.String("hash", tx.Hash().StringLE()),
|
||||||
zap.Error(err))
|
zap.Error(err))
|
||||||
return
|
return false
|
||||||
}
|
}
|
||||||
|
|
||||||
// unmarshal node info
|
// unmarshal node info
|
||||||
|
@ -34,7 +34,7 @@ func (np *Processor) processAddPeer(ev netmapEvent.AddPeer) {
|
||||||
if err := nodeInfo.Unmarshal(ev.Node()); err != nil {
|
if err := nodeInfo.Unmarshal(ev.Node()); err != nil {
|
||||||
// it will be nice to have tx id at event structure to log it
|
// it will be nice to have tx id at event structure to log it
|
||||||
np.log.Warn(logs.NetmapCantParseNetworkMapCandidate)
|
np.log.Warn(logs.NetmapCantParseNetworkMapCandidate)
|
||||||
return
|
return false
|
||||||
}
|
}
|
||||||
|
|
||||||
// validate and update node info
|
// validate and update node info
|
||||||
|
@ -44,7 +44,7 @@ func (np *Processor) processAddPeer(ev netmapEvent.AddPeer) {
|
||||||
zap.String("error", err.Error()),
|
zap.String("error", err.Error()),
|
||||||
)
|
)
|
||||||
|
|
||||||
return
|
return false
|
||||||
}
|
}
|
||||||
|
|
||||||
// sort attributes to make it consistent
|
// sort attributes to make it consistent
|
||||||
|
@ -81,15 +81,18 @@ func (np *Processor) processAddPeer(ev netmapEvent.AddPeer) {
|
||||||
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
np.log.Error(logs.NetmapCantInvokeNetmapAddPeer, zap.Error(err))
|
np.log.Error(logs.NetmapCantInvokeNetmapAddPeer, zap.Error(err))
|
||||||
|
return false
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
return true
|
||||||
}
|
}
|
||||||
|
|
||||||
// Process update peer notification by sending approval tx to the smart contract.
|
// Process update peer notification by sending approval tx to the smart contract.
|
||||||
func (np *Processor) processUpdatePeer(ev netmapEvent.UpdatePeer) {
|
func (np *Processor) processUpdatePeer(ev netmapEvent.UpdatePeer) bool {
|
||||||
if !np.alphabetState.IsAlphabet() {
|
if !np.alphabetState.IsAlphabet() {
|
||||||
np.log.Info(logs.NetmapNonAlphabetModeIgnoreUpdatePeerNotification)
|
np.log.Info(logs.NetmapNonAlphabetModeIgnoreUpdatePeerNotification)
|
||||||
return
|
return true
|
||||||
}
|
}
|
||||||
|
|
||||||
// flag node to remove from local view, so it can be re-bootstrapped
|
// flag node to remove from local view, so it can be re-bootstrapped
|
||||||
|
@ -105,11 +108,14 @@ func (np *Processor) processUpdatePeer(ev netmapEvent.UpdatePeer) {
|
||||||
zap.Error(err),
|
zap.Error(err),
|
||||||
)
|
)
|
||||||
|
|
||||||
return
|
return false
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if err = np.netmapClient.MorphNotarySignAndInvokeTX(ev.NotaryRequest().MainTransaction); err != nil {
|
if err = np.netmapClient.MorphNotarySignAndInvokeTX(ev.NotaryRequest().MainTransaction); err != nil {
|
||||||
np.log.Error(logs.NetmapCantInvokeNetmapUpdatePeer, zap.Error(err))
|
np.log.Error(logs.NetmapCantInvokeNetmapUpdatePeer, zap.Error(err))
|
||||||
|
return false
|
||||||
}
|
}
|
||||||
|
|
||||||
|
return true
|
||||||
}
|
}
|
||||||
|
|
|
@ -5,6 +5,7 @@ import (
|
||||||
"fmt"
|
"fmt"
|
||||||
|
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/innerring/metrics"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/innerring/processors/netmap/nodevalidation/state"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/innerring/processors/netmap/nodevalidation/state"
|
||||||
cntClient "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/client/container"
|
cntClient "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/client/container"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/event"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/event"
|
||||||
|
@ -72,6 +73,7 @@ type (
|
||||||
// and new epoch ticker, because it is related to contract.
|
// and new epoch ticker, because it is related to contract.
|
||||||
Processor struct {
|
Processor struct {
|
||||||
log *logger.Logger
|
log *logger.Logger
|
||||||
|
metrics metrics.Register
|
||||||
pool *ants.Pool
|
pool *ants.Pool
|
||||||
epochTimer EpochTimerReseter
|
epochTimer EpochTimerReseter
|
||||||
epochState EpochState
|
epochState EpochState
|
||||||
|
@ -93,6 +95,7 @@ type (
|
||||||
// Params of the processor constructor.
|
// Params of the processor constructor.
|
||||||
Params struct {
|
Params struct {
|
||||||
Log *logger.Logger
|
Log *logger.Logger
|
||||||
|
Metrics metrics.Register
|
||||||
PoolSize int
|
PoolSize int
|
||||||
NetmapClient Client
|
NetmapClient Client
|
||||||
EpochTimer EpochTimerReseter
|
EpochTimer EpochTimerReseter
|
||||||
|
@ -145,8 +148,14 @@ func New(p *Params) (*Processor, error) {
|
||||||
return nil, fmt.Errorf("ir/netmap: can't create worker pool: %w", err)
|
return nil, fmt.Errorf("ir/netmap: can't create worker pool: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
metricsRegister := p.Metrics
|
||||||
|
if metricsRegister == nil {
|
||||||
|
metricsRegister = metrics.DefaultRegister{}
|
||||||
|
}
|
||||||
|
|
||||||
return &Processor{
|
return &Processor{
|
||||||
log: p.Log,
|
log: p.Log,
|
||||||
|
metrics: metricsRegister,
|
||||||
pool: pool,
|
pool: pool,
|
||||||
epochTimer: p.EpochTimer,
|
epochTimer: p.EpochTimer,
|
||||||
epochState: p.EpochState,
|
epochState: p.EpochState,
|
||||||
|
|
16
pkg/innerring/processors/util.go
Normal file
16
pkg/innerring/processors/util.go
Normal file
|
@ -0,0 +1,16 @@
|
||||||
|
package processors
|
||||||
|
|
||||||
|
import (
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/innerring/metrics"
|
||||||
|
"github.com/panjf2000/ants/v2"
|
||||||
|
)
|
||||||
|
|
||||||
|
func SubmitEvent(pool *ants.Pool, metrics metrics.Register, eventLabel string, eventProcessor func() bool) error {
|
||||||
|
return pool.Submit(func() {
|
||||||
|
start := time.Now()
|
||||||
|
|||||||
|
success := eventProcessor()
|
||||||
|
metrics.AddEvent(time.Since(start), eventLabel, success)
|
||||||
|
})
|
||||||
|
}
|
|
@ -1,13 +1,23 @@
|
||||||
package metrics
|
package metrics
|
||||||
|
|
||||||
import "github.com/prometheus/client_golang/prometheus"
|
import (
|
||||||
|
"strconv"
|
||||||
|
"time"
|
||||||
|
|
||||||
const innerRingSubsystem = "ir"
|
"github.com/prometheus/client_golang/prometheus"
|
||||||
|
)
|
||||||
|
|
||||||
|
const (
|
||||||
|
innerRingSubsystem = "ir"
|
||||||
|
innerRingLabelSuccess = "success"
|
||||||
|
innerRingLabelType = "type"
|
||||||
|
)
|
||||||
|
|
||||||
// InnerRingServiceMetrics contains metrics collected by inner ring.
|
// InnerRingServiceMetrics contains metrics collected by inner ring.
|
||||||
type InnerRingServiceMetrics struct {
|
type InnerRingServiceMetrics struct {
|
||||||
epoch metric[prometheus.Gauge]
|
epoch metric[prometheus.Gauge]
|
||||||
health metric[prometheus.Gauge]
|
health metric[prometheus.Gauge]
|
||||||
|
eventDuration metric[*prometheus.HistogramVec]
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewInnerRingMetrics returns new instance of metrics collectors for inner ring.
|
// NewInnerRingMetrics returns new instance of metrics collectors for inner ring.
|
||||||
|
@ -25,14 +35,22 @@ func NewInnerRingMetrics() *InnerRingServiceMetrics {
|
||||||
Name: "health",
|
Name: "health",
|
||||||
Help: "Current inner-ring node state.",
|
Help: "Current inner-ring node state.",
|
||||||
})
|
})
|
||||||
|
eventDuration = newHistogramVec(prometheus.HistogramOpts{
|
||||||
|
Namespace: namespace,
|
||||||
|
Subsystem: innerRingSubsystem,
|
||||||
|
Name: "event_duration_seconds",
|
||||||
|
Help: "Duration of processing of inner-ring events",
|
||||||
|
}, []string{innerRingLabelType, innerRingLabelSuccess})
|
||||||
)
|
)
|
||||||
|
|
||||||
mustRegister(epoch)
|
mustRegister(epoch)
|
||||||
mustRegister(health)
|
mustRegister(health)
|
||||||
|
mustRegister(eventDuration)
|
||||||
|
|
||||||
return &InnerRingServiceMetrics{
|
return &InnerRingServiceMetrics{
|
||||||
epoch: epoch,
|
epoch: epoch,
|
||||||
health: health,
|
health: health,
|
||||||
|
eventDuration: eventDuration,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -45,3 +63,10 @@ func (m InnerRingServiceMetrics) SetEpoch(epoch uint64) {
|
||||||
func (m InnerRingServiceMetrics) SetHealth(s int32) {
|
func (m InnerRingServiceMetrics) SetHealth(s int32) {
|
||||||
m.health.value.Set(float64(s))
|
m.health.value.Set(float64(s))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (m InnerRingServiceMetrics) AddEvent(d time.Duration, typ string, success bool) {
|
||||||
|
m.eventDuration.value.With(prometheus.Labels{
|
||||||
|
innerRingLabelType: typ,
|
||||||
|
innerRingLabelSuccess: strconv.FormatBool(success),
|
||||||
|
}).Observe(d.Seconds())
|
||||||
|
}
|
||||||
|
|
Loading…
Reference in a new issue
Why did you decide to have
start
here and not before we submit the task in pool?Just asking.
well, because my intention was to measure the actual event processor duration,
excluding whatever wait time it spends on the pool. The pool wait time is statistically unrelated to the event processor time for specific event types, and it can be measured separately if you think it's interesting.