[#374] Add inner-ring event metrics

Signed-off-by: Alejandro Lopez <a.lopez@yadro.com>
This commit is contained in:
Alejandro Lopez 2023-05-26 13:24:41 +03:00 committed by Evgenii Stratonikov
parent 8dcd06c587
commit ebcc8afbee
27 changed files with 287 additions and 113 deletions

View file

@ -52,6 +52,7 @@ func (s *Server) initNetmapProcessor(cfg *viper.Viper,
s.netmapProcessor, err = netmap.New(&netmap.Params{ s.netmapProcessor, err = netmap.New(&netmap.Params{
Log: s.log, Log: s.log,
Metrics: s.metrics,
PoolSize: cfg.GetInt("workers.netmap"), PoolSize: cfg.GetInt("workers.netmap"),
NetmapClient: netmap.NewNetmapClient(s.netmapClient), NetmapClient: netmap.NewNetmapClient(s.netmapClient),
EpochTimer: s, EpochTimer: s,
@ -162,6 +163,7 @@ func (s *Server) createAlphaSync(cfg *viper.Viper, frostfsCli *frostfsClient.Cli
// create governance processor // create governance processor
governanceProcessor, err := governance.New(&governance.Params{ governanceProcessor, err := governance.New(&governance.Params{
Log: s.log, Log: s.log,
Metrics: s.metrics,
FrostFSClient: frostfsCli, FrostFSClient: frostfsCli,
NetmapClient: s.netmapClient, NetmapClient: s.netmapClient,
AlphabetState: s, AlphabetState: s,
@ -231,6 +233,7 @@ func (s *Server) initAlphabetProcessor(cfg *viper.Viper) error {
s.alphabetProcessor, err = alphabet.New(&alphabet.Params{ s.alphabetProcessor, err = alphabet.New(&alphabet.Params{
ParsedWallets: parsedWallets, ParsedWallets: parsedWallets,
Log: s.log, Log: s.log,
Metrics: s.metrics,
PoolSize: cfg.GetInt("workers.alphabet"), PoolSize: cfg.GetInt("workers.alphabet"),
AlphabetContracts: s.contracts.alphabet, AlphabetContracts: s.contracts.alphabet,
NetmapClient: s.netmapClient, NetmapClient: s.netmapClient,
@ -255,6 +258,7 @@ func (s *Server) initContainerProcessor(cfg *viper.Viper, cnrClient *container.C
// container processor // container processor
containerProcessor, err := cont.New(&cont.Params{ containerProcessor, err := cont.New(&cont.Params{
Log: s.log, Log: s.log,
Metrics: s.metrics,
PoolSize: cfg.GetInt("workers.container"), PoolSize: cfg.GetInt("workers.container"),
AlphabetState: s, AlphabetState: s,
ContainerClient: cnrClient, ContainerClient: cnrClient,
@ -273,6 +277,7 @@ func (s *Server) initBalanceProcessor(cfg *viper.Viper, frostfsCli *frostfsClien
// create balance processor // create balance processor
balanceProcessor, err := balance.New(&balance.Params{ balanceProcessor, err := balance.New(&balance.Params{
Log: s.log, Log: s.log,
Metrics: s.metrics,
PoolSize: cfg.GetInt("workers.balance"), PoolSize: cfg.GetInt("workers.balance"),
FrostFSClient: frostfsCli, FrostFSClient: frostfsCli,
BalanceSC: s.contracts.balance, BalanceSC: s.contracts.balance,
@ -293,6 +298,7 @@ func (s *Server) initFrostFSMainnetProcessor(cfg *viper.Viper, frostfsIDClient *
frostfsProcessor, err := frostfs.New(&frostfs.Params{ frostfsProcessor, err := frostfs.New(&frostfs.Params{
Log: s.log, Log: s.log,
Metrics: s.metrics,
PoolSize: cfg.GetInt("workers.frostfs"), PoolSize: cfg.GetInt("workers.frostfs"),
FrostFSContract: s.contracts.frostfs, FrostFSContract: s.contracts.frostfs,
FrostFSIDClient: frostfsIDClient, FrostFSIDClient: frostfsIDClient,

View file

@ -0,0 +1,15 @@
package metrics
import "time"
type Register interface {
SetEpoch(epoch uint64)
SetHealth(s int32)
AddEvent(d time.Duration, typ string, success bool)
}
type DefaultRegister struct{}
func (DefaultRegister) SetEpoch(uint64) {}
func (DefaultRegister) SetHealth(int32) {}
func (DefaultRegister) AddEvent(time.Duration, string, bool) {}

View file

@ -2,6 +2,7 @@ package alphabet
import ( import (
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs" "git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/innerring/processors"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/innerring/timers" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/innerring/timers"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/event" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/event"
"go.uber.org/zap" "go.uber.org/zap"
@ -13,7 +14,7 @@ func (ap *Processor) HandleGasEmission(ev event.Event) {
// send event to the worker pool // send event to the worker pool
err := ap.pool.Submit(func() { ap.processEmit() }) err := processors.SubmitEvent(ap.pool, ap.metrics, "alphabet_emit_gas", ap.processEmit)
if err != nil { if err != nil {
// there system can be moved into controlled degradation stage // there system can be moved into controlled degradation stage
ap.log.Warn(logs.AlphabetAlphabetProcessorWorkerPoolDrained, ap.log.Warn(logs.AlphabetAlphabetProcessorWorkerPoolDrained,

View file

@ -13,12 +13,12 @@ import (
const emitMethod = "emit" const emitMethod = "emit"
func (ap *Processor) processEmit() { func (ap *Processor) processEmit() bool {
index := ap.irList.AlphabetIndex() index := ap.irList.AlphabetIndex()
if index < 0 { if index < 0 {
ap.log.Info(logs.AlphabetNonAlphabetModeIgnoreGasEmissionEvent) ap.log.Info(logs.AlphabetNonAlphabetModeIgnoreGasEmissionEvent)
return return true
} }
contract, ok := ap.alphabetContracts.GetByIndex(index) contract, ok := ap.alphabetContracts.GetByIndex(index)
@ -26,7 +26,7 @@ func (ap *Processor) processEmit() {
ap.log.Debug(logs.AlphabetNodeIsOutOfAlphabetRangeIgnoreGasEmissionEvent, ap.log.Debug(logs.AlphabetNodeIsOutOfAlphabetRangeIgnoreGasEmissionEvent,
zap.Int("index", index)) zap.Int("index", index))
return return false
} }
// there is no signature collecting, so we don't need extra fee // there is no signature collecting, so we don't need extra fee
@ -34,13 +34,13 @@ func (ap *Processor) processEmit() {
if err != nil { if err != nil {
ap.log.Warn(logs.AlphabetCantInvokeAlphabetEmitMethod, zap.String("error", err.Error())) ap.log.Warn(logs.AlphabetCantInvokeAlphabetEmitMethod, zap.String("error", err.Error()))
return return false
} }
if ap.storageEmission == 0 { if ap.storageEmission == 0 {
ap.log.Info(logs.AlphabetStorageNodeEmissionIsOff) ap.log.Info(logs.AlphabetStorageNodeEmissionIsOff)
return return true
} }
networkMap, err := ap.netmapClient.NetMap() networkMap, err := ap.netmapClient.NetMap()
@ -48,7 +48,7 @@ func (ap *Processor) processEmit() {
ap.log.Warn(logs.AlphabetCantGetNetmapSnapshotToEmitGasToStorageNodes, ap.log.Warn(logs.AlphabetCantGetNetmapSnapshotToEmitGasToStorageNodes,
zap.String("error", err.Error())) zap.String("error", err.Error()))
return return false
} }
nmNodes := networkMap.Nodes() nmNodes := networkMap.Nodes()
@ -63,7 +63,7 @@ func (ap *Processor) processEmit() {
zap.Int("extra_wallets", extraLen)) zap.Int("extra_wallets", extraLen))
if nmLen+extraLen == 0 { if nmLen+extraLen == 0 {
return return true
} }
gasPerNode := fixedn.Fixed8(ap.storageEmission / uint64(nmLen+extraLen)) gasPerNode := fixedn.Fixed8(ap.storageEmission / uint64(nmLen+extraLen))
@ -71,6 +71,8 @@ func (ap *Processor) processEmit() {
ap.transferGasToNetmapNodes(nmNodes, gasPerNode) ap.transferGasToNetmapNodes(nmNodes, gasPerNode)
ap.transferGasToExtraNodes(pw, gasPerNode) ap.transferGasToExtraNodes(pw, gasPerNode)
return true
} }
func (ap *Processor) transferGasToNetmapNodes(nmNodes []netmap.NodeInfo, gasPerNode fixedn.Fixed8) { func (ap *Processor) transferGasToNetmapNodes(nmNodes []netmap.NodeInfo, gasPerNode fixedn.Fixed8) {

View file

@ -7,6 +7,7 @@ import (
"time" "time"
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs" "git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/innerring/metrics"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/event" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/event"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap" "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap"
@ -50,6 +51,7 @@ type (
// protects parsedWallets from concurrent change // protects parsedWallets from concurrent change
pwLock *sync.RWMutex pwLock *sync.RWMutex
log *logger.Logger log *logger.Logger
metrics metrics.Register
pool *ants.Pool pool *ants.Pool
alphabetContracts Contracts alphabetContracts Contracts
netmapClient netmapClient netmapClient netmapClient
@ -62,6 +64,7 @@ type (
Params struct { Params struct {
ParsedWallets []util.Uint160 ParsedWallets []util.Uint160
Log *logger.Logger Log *logger.Logger
Metrics metrics.Register
PoolSize int PoolSize int
AlphabetContracts Contracts AlphabetContracts Contracts
NetmapClient netmapClient NetmapClient netmapClient
@ -89,10 +92,16 @@ func New(p *Params) (*Processor, error) {
return nil, fmt.Errorf("ir/frostfs: can't create worker pool: %w", err) return nil, fmt.Errorf("ir/frostfs: can't create worker pool: %w", err)
} }
metricsRegister := p.Metrics
if metricsRegister == nil {
metricsRegister = metrics.DefaultRegister{}
}
return &Processor{ return &Processor{
parsedWallets: p.ParsedWallets, parsedWallets: p.ParsedWallets,
pwLock: new(sync.RWMutex), pwLock: new(sync.RWMutex),
log: p.Log, log: p.Log,
metrics: metricsRegister,
pool: pool, pool: pool,
alphabetContracts: p.AlphabetContracts, alphabetContracts: p.AlphabetContracts,
netmapClient: p.NetmapClient, netmapClient: p.NetmapClient,

View file

@ -4,6 +4,7 @@ import (
"encoding/hex" "encoding/hex"
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs" "git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/innerring/processors"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/event" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/event"
balanceEvent "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/event/balance" balanceEvent "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/event/balance"
"go.uber.org/zap" "go.uber.org/zap"
@ -17,7 +18,9 @@ func (bp *Processor) handleLock(ev event.Event) {
// send an event to the worker pool // send an event to the worker pool
err := bp.pool.Submit(func() { bp.processLock(&lock) }) err := processors.SubmitEvent(bp.pool, bp.metrics, "lock", func() bool {
return bp.processLock(&lock)
})
if err != nil { if err != nil {
// there system can be moved into controlled degradation stage // there system can be moved into controlled degradation stage
bp.log.Warn(logs.BalanceBalanceWorkerPoolDrained, bp.log.Warn(logs.BalanceBalanceWorkerPoolDrained,

View file

@ -9,10 +9,10 @@ import (
// Process lock event by invoking Cheque method in main net to send assets // Process lock event by invoking Cheque method in main net to send assets
// back to the withdraw issuer. // back to the withdraw issuer.
func (bp *Processor) processLock(lock *balanceEvent.Lock) { func (bp *Processor) processLock(lock *balanceEvent.Lock) bool {
if !bp.alphabetState.IsAlphabet() { if !bp.alphabetState.IsAlphabet() {
bp.log.Info(logs.BalanceNonAlphabetModeIgnoreBalanceLock) bp.log.Info(logs.BalanceNonAlphabetModeIgnoreBalanceLock)
return return true
} }
prm := frostfsContract.ChequePrm{} prm := frostfsContract.ChequePrm{}
@ -26,5 +26,8 @@ func (bp *Processor) processLock(lock *balanceEvent.Lock) {
err := bp.frostfsClient.Cheque(prm) err := bp.frostfsClient.Cheque(prm)
if err != nil { if err != nil {
bp.log.Error(logs.BalanceCantSendLockAssetTx, zap.Error(err)) bp.log.Error(logs.BalanceCantSendLockAssetTx, zap.Error(err))
return false
} }
return true
} }

View file

@ -5,6 +5,7 @@ import (
"fmt" "fmt"
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs" "git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/innerring/metrics"
frostfscontract "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/client/frostfs" frostfscontract "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/client/frostfs"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/event" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/event"
balanceEvent "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/event/balance" balanceEvent "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/event/balance"
@ -32,6 +33,7 @@ type (
// Processor of events produced by balance contract in the morphchain. // Processor of events produced by balance contract in the morphchain.
Processor struct { Processor struct {
log *logger.Logger log *logger.Logger
metrics metrics.Register
pool *ants.Pool pool *ants.Pool
frostfsClient FrostFSClient frostfsClient FrostFSClient
balanceSC util.Uint160 balanceSC util.Uint160
@ -42,6 +44,7 @@ type (
// Params of the processor constructor. // Params of the processor constructor.
Params struct { Params struct {
Log *logger.Logger Log *logger.Logger
Metrics metrics.Register
PoolSize int PoolSize int
FrostFSClient FrostFSClient FrostFSClient FrostFSClient
BalanceSC util.Uint160 BalanceSC util.Uint160
@ -72,8 +75,14 @@ func New(p *Params) (*Processor, error) {
return nil, fmt.Errorf("ir/balance: can't create worker pool: %w", err) return nil, fmt.Errorf("ir/balance: can't create worker pool: %w", err)
} }
metricsRegister := p.Metrics
if metricsRegister == nil {
metricsRegister = metrics.DefaultRegister{}
}
return &Processor{ return &Processor{
log: p.Log, log: p.Log,
metrics: metricsRegister,
pool: pool, pool: pool,
frostfsClient: p.FrostFSClient, frostfsClient: p.FrostFSClient,
balanceSC: p.BalanceSC, balanceSC: p.BalanceSC,

View file

@ -4,6 +4,7 @@ import (
"crypto/sha256" "crypto/sha256"
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs" "git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/innerring/processors"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/event" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/event"
containerEvent "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/event/container" containerEvent "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/event/container"
"github.com/mr-tron/base58" "github.com/mr-tron/base58"
@ -20,7 +21,9 @@ func (cp *Processor) handlePut(ev event.Event) {
// send an event to the worker pool // send an event to the worker pool
err := cp.pool.Submit(func() { cp.processContainerPut(put) }) err := processors.SubmitEvent(cp.pool, cp.metrics, "container_put", func() bool {
return cp.processContainerPut(put)
})
if err != nil { if err != nil {
// there system can be moved into controlled degradation stage // there system can be moved into controlled degradation stage
cp.log.Warn(logs.ContainerContainerProcessorWorkerPoolDrained, cp.log.Warn(logs.ContainerContainerProcessorWorkerPoolDrained,
@ -36,7 +39,9 @@ func (cp *Processor) handleDelete(ev event.Event) {
// send an event to the worker pool // send an event to the worker pool
err := cp.pool.Submit(func() { cp.processContainerDelete(del) }) err := processors.SubmitEvent(cp.pool, cp.metrics, "container_delete", func() bool {
return cp.processContainerDelete(del)
})
if err != nil { if err != nil {
// there system can be moved into controlled degradation stage // there system can be moved into controlled degradation stage
cp.log.Warn(logs.ContainerContainerProcessorWorkerPoolDrained, cp.log.Warn(logs.ContainerContainerProcessorWorkerPoolDrained,
@ -53,8 +58,8 @@ func (cp *Processor) handleSetEACL(ev event.Event) {
// send an event to the worker pool // send an event to the worker pool
err := cp.pool.Submit(func() { err := processors.SubmitEvent(cp.pool, cp.metrics, "container_set_eacl", func() bool {
cp.processSetEACL(e) return cp.processSetEACL(e)
}) })
if err != nil { if err != nil {
// there system can be moved into controlled degradation stage // there system can be moved into controlled degradation stage

View file

@ -31,10 +31,10 @@ type putContainerContext struct {
// Process a new container from the user by checking the container sanity // Process a new container from the user by checking the container sanity
// and sending approve tx back to the morph. // and sending approve tx back to the morph.
func (cp *Processor) processContainerPut(put putEvent) { func (cp *Processor) processContainerPut(put putEvent) bool {
if !cp.alphabetState.IsAlphabet() { if !cp.alphabetState.IsAlphabet() {
cp.log.Info(logs.ContainerNonAlphabetModeIgnoreContainerPut) cp.log.Info(logs.ContainerNonAlphabetModeIgnoreContainerPut)
return return true
} }
ctx := &putContainerContext{ ctx := &putContainerContext{
@ -47,10 +47,17 @@ func (cp *Processor) processContainerPut(put putEvent) {
zap.String("error", err.Error()), zap.String("error", err.Error()),
) )
return return false
} }
cp.approvePutContainer(ctx) if err := cp.morphClient.NotarySignAndInvokeTX(ctx.e.NotaryRequest().MainTransaction); err != nil {
cp.log.Error(logs.ContainerCouldNotApprovePutContainer,
zap.String("error", err.Error()),
)
return false
}
return true
} }
func (cp *Processor) checkPutContainer(ctx *putContainerContext) error { func (cp *Processor) checkPutContainer(ctx *putContainerContext) error {
@ -89,20 +96,12 @@ func (cp *Processor) checkPutContainer(ctx *putContainerContext) error {
return nil return nil
} }
func (cp *Processor) approvePutContainer(ctx *putContainerContext) {
if err := cp.morphClient.NotarySignAndInvokeTX(ctx.e.NotaryRequest().MainTransaction); err != nil {
cp.log.Error(logs.ContainerCouldNotApprovePutContainer,
zap.String("error", err.Error()),
)
}
}
// Process delete container operation from the user by checking container sanity // Process delete container operation from the user by checking container sanity
// and sending approve tx back to morph. // and sending approve tx back to morph.
func (cp *Processor) processContainerDelete(e containerEvent.Delete) { func (cp *Processor) processContainerDelete(e containerEvent.Delete) bool {
if !cp.alphabetState.IsAlphabet() { if !cp.alphabetState.IsAlphabet() {
cp.log.Info(logs.ContainerNonAlphabetModeIgnoreContainerDelete) cp.log.Info(logs.ContainerNonAlphabetModeIgnoreContainerDelete)
return return true
} }
err := cp.checkDeleteContainer(e) err := cp.checkDeleteContainer(e)
@ -111,10 +110,18 @@ func (cp *Processor) processContainerDelete(e containerEvent.Delete) {
zap.String("error", err.Error()), zap.String("error", err.Error()),
) )
return return false
} }
cp.approveDeleteContainer(e) if err := cp.morphClient.NotarySignAndInvokeTX(e.NotaryRequest().MainTransaction); err != nil {
cp.log.Error(logs.ContainerCouldNotApproveDeleteContainer,
zap.String("error", err.Error()),
)
return false
}
return true
} }
func (cp *Processor) checkDeleteContainer(e containerEvent.Delete) error { func (cp *Processor) checkDeleteContainer(e containerEvent.Delete) error {
@ -149,14 +156,6 @@ func (cp *Processor) checkDeleteContainer(e containerEvent.Delete) error {
return nil return nil
} }
func (cp *Processor) approveDeleteContainer(e containerEvent.Delete) {
if err := cp.morphClient.NotarySignAndInvokeTX(e.NotaryRequest().MainTransaction); err != nil {
cp.log.Error(logs.ContainerCouldNotApproveDeleteContainer,
zap.String("error", err.Error()),
)
}
}
func checkNNS(ctx *putContainerContext, cnr containerSDK.Container) error { func checkNNS(ctx *putContainerContext, cnr containerSDK.Container) error {
// fetch domain info // fetch domain info
ctx.d = containerSDK.ReadDomain(cnr) ctx.d = containerSDK.ReadDomain(cnr)

View file

@ -12,10 +12,10 @@ import (
"go.uber.org/zap" "go.uber.org/zap"
) )
func (cp *Processor) processSetEACL(e containerEvent.SetEACL) { func (cp *Processor) processSetEACL(e containerEvent.SetEACL) bool {
if !cp.alphabetState.IsAlphabet() { if !cp.alphabetState.IsAlphabet() {
cp.log.Info(logs.ContainerNonAlphabetModeIgnoreSetEACL) cp.log.Info(logs.ContainerNonAlphabetModeIgnoreSetEACL)
return return true
} }
err := cp.checkSetEACL(e) err := cp.checkSetEACL(e)
@ -24,10 +24,17 @@ func (cp *Processor) processSetEACL(e containerEvent.SetEACL) {
zap.String("error", err.Error()), zap.String("error", err.Error()),
) )
return return false
} }
cp.approveSetEACL(e) if err := cp.morphClient.NotarySignAndInvokeTX(e.NotaryRequest().MainTransaction); err != nil {
cp.log.Error(logs.ContainerCouldNotApproveSetEACL,
zap.String("error", err.Error()),
)
return false
}
return true
} }
func (cp *Processor) checkSetEACL(e containerEvent.SetEACL) error { func (cp *Processor) checkSetEACL(e containerEvent.SetEACL) error {
@ -73,11 +80,3 @@ func (cp *Processor) checkSetEACL(e containerEvent.SetEACL) error {
return nil return nil
} }
func (cp *Processor) approveSetEACL(e containerEvent.SetEACL) {
if err := cp.morphClient.NotarySignAndInvokeTX(e.NotaryRequest().MainTransaction); err != nil {
cp.log.Error(logs.ContainerCouldNotApproveSetEACL,
zap.String("error", err.Error()),
)
}
}

View file

@ -6,6 +6,7 @@ import (
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs" "git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
containercore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/container" containercore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/container"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/innerring/metrics"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/client/frostfsid" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/client/frostfsid"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/event" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/event"
containerEvent "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/event/container" containerEvent "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/event/container"
@ -40,6 +41,7 @@ type (
// Processor of events produced by container contract in the sidechain. // Processor of events produced by container contract in the sidechain.
Processor struct { Processor struct {
log *logger.Logger log *logger.Logger
metrics metrics.Register
pool *ants.Pool pool *ants.Pool
alphabetState AlphabetState alphabetState AlphabetState
cnrClient ContClient // notary must be enabled cnrClient ContClient // notary must be enabled
@ -51,6 +53,7 @@ type (
// Params of the processor constructor. // Params of the processor constructor.
Params struct { Params struct {
Log *logger.Logger Log *logger.Logger
Metrics metrics.Register
PoolSize int PoolSize int
AlphabetState AlphabetState AlphabetState AlphabetState
ContainerClient ContClient ContainerClient ContClient
@ -102,8 +105,14 @@ func New(p *Params) (*Processor, error) {
return nil, fmt.Errorf("ir/container: can't create worker pool: %w", err) return nil, fmt.Errorf("ir/container: can't create worker pool: %w", err)
} }
metricsRegister := p.Metrics
if metricsRegister == nil {
metricsRegister = metrics.DefaultRegister{}
}
return &Processor{ return &Processor{
log: p.Log, log: p.Log,
metrics: metricsRegister,
pool: pool, pool: pool,
alphabetState: p.AlphabetState, alphabetState: p.AlphabetState,
cnrClient: p.ContainerClient, cnrClient: p.ContainerClient,

View file

@ -4,6 +4,7 @@ import (
"encoding/hex" "encoding/hex"
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs" "git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/innerring/processors"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/event" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/event"
frostfsEvent "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/event/frostfs" frostfsEvent "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/event/frostfs"
"github.com/nspcc-dev/neo-go/pkg/util/slice" "github.com/nspcc-dev/neo-go/pkg/util/slice"
@ -18,7 +19,9 @@ func (np *Processor) handleDeposit(ev event.Event) {
// send event to the worker pool // send event to the worker pool
err := np.pool.Submit(func() { np.processDeposit(deposit) }) err := processors.SubmitEvent(np.pool, np.metrics, "frostfs_deposit", func() bool {
return np.processDeposit(deposit)
})
if err != nil { if err != nil {
// there system can be moved into controlled degradation stage // there system can be moved into controlled degradation stage
np.log.Warn(logs.FrostFSFrostfsProcessorWorkerPoolDrained, np.log.Warn(logs.FrostFSFrostfsProcessorWorkerPoolDrained,
@ -34,7 +37,9 @@ func (np *Processor) handleWithdraw(ev event.Event) {
// send event to the worker pool // send event to the worker pool
err := np.pool.Submit(func() { np.processWithdraw(withdraw) }) err := processors.SubmitEvent(np.pool, np.metrics, "frostfs_withdraw", func() bool {
return np.processWithdraw(withdraw)
})
if err != nil { if err != nil {
// there system can be moved into controlled degradation stage // there system can be moved into controlled degradation stage
np.log.Warn(logs.FrostFSFrostfsProcessorWorkerPoolDrained, np.log.Warn(logs.FrostFSFrostfsProcessorWorkerPoolDrained,
@ -50,7 +55,9 @@ func (np *Processor) handleCheque(ev event.Event) {
// send event to the worker pool // send event to the worker pool
err := np.pool.Submit(func() { np.processCheque(cheque) }) err := processors.SubmitEvent(np.pool, np.metrics, "frostfs_cheque", func() bool {
return np.processCheque(cheque)
})
if err != nil { if err != nil {
// there system can be moved into controlled degradation stage // there system can be moved into controlled degradation stage
np.log.Warn(logs.FrostFSFrostfsProcessorWorkerPoolDrained, np.log.Warn(logs.FrostFSFrostfsProcessorWorkerPoolDrained,
@ -67,7 +74,9 @@ func (np *Processor) handleConfig(ev event.Event) {
// send event to the worker pool // send event to the worker pool
err := np.pool.Submit(func() { np.processConfig(cfg) }) err := processors.SubmitEvent(np.pool, np.metrics, "frostfs_config", func() bool {
return np.processConfig(cfg)
})
if err != nil { if err != nil {
// there system can be moved into controlled degradation stage // there system can be moved into controlled degradation stage
np.log.Warn(logs.FrostFSFrostfsProcessorWorkerPoolDrained, np.log.Warn(logs.FrostFSFrostfsProcessorWorkerPoolDrained,
@ -83,7 +92,9 @@ func (np *Processor) handleBind(ev event.Event) {
// send event to the worker pool // send event to the worker pool
err := np.pool.Submit(func() { np.processBind(e, true) }) err := processors.SubmitEvent(np.pool, np.metrics, "frostfs_bind", func() bool {
return np.processBind(e, true)
})
if err != nil { if err != nil {
// there system can be moved into controlled degradation stage // there system can be moved into controlled degradation stage
np.log.Warn(logs.FrostFSFrostfsProcessorWorkerPoolDrained, np.log.Warn(logs.FrostFSFrostfsProcessorWorkerPoolDrained,
@ -99,7 +110,9 @@ func (np *Processor) handleUnbind(ev event.Event) {
// send event to the worker pool // send event to the worker pool
err := np.pool.Submit(func() { np.processBind(e, false) }) err := processors.SubmitEvent(np.pool, np.metrics, "frostfs_unbind", func() bool {
return np.processBind(e, false)
})
if err != nil { if err != nil {
// there system can be moved into controlled degradation stage // there system can be moved into controlled degradation stage
np.log.Warn(logs.FrostFSFrostfsProcessorWorkerPoolDrained, np.log.Warn(logs.FrostFSFrostfsProcessorWorkerPoolDrained,

View file

@ -15,10 +15,10 @@ const (
// Process deposit event by invoking a balance contract and sending native // Process deposit event by invoking a balance contract and sending native
// gas in the sidechain. // gas in the sidechain.
func (np *Processor) processDeposit(deposit frostfsEvent.Deposit) { func (np *Processor) processDeposit(deposit frostfsEvent.Deposit) bool {
if !np.alphabetState.IsAlphabet() { if !np.alphabetState.IsAlphabet() {
np.log.Info(logs.FrostFSNonAlphabetModeIgnoreDeposit) np.log.Info(logs.FrostFSNonAlphabetModeIgnoreDeposit)
return return true
} }
prm := balance.MintPrm{} prm := balance.MintPrm{}
@ -49,7 +49,7 @@ func (np *Processor) processDeposit(deposit frostfsEvent.Deposit) {
zap.Uint64("last_emission", val), zap.Uint64("last_emission", val),
zap.Uint64("current_epoch", curEpoch)) zap.Uint64("current_epoch", curEpoch))
return return false
} }
// get gas balance of the node // get gas balance of the node
@ -57,7 +57,7 @@ func (np *Processor) processDeposit(deposit frostfsEvent.Deposit) {
balance, err := np.morphClient.GasBalance() balance, err := np.morphClient.GasBalance()
if err != nil { if err != nil {
np.log.Error(logs.FrostFSCantGetGasBalanceOfTheNode, zap.Error(err)) np.log.Error(logs.FrostFSCantGetGasBalanceOfTheNode, zap.Error(err))
return return false
} }
if balance < np.gasBalanceThreshold { if balance < np.gasBalanceThreshold {
@ -65,7 +65,7 @@ func (np *Processor) processDeposit(deposit frostfsEvent.Deposit) {
zap.Int64("balance", balance), zap.Int64("balance", balance),
zap.Int64("threshold", np.gasBalanceThreshold)) zap.Int64("threshold", np.gasBalanceThreshold))
return return false
} }
err = np.morphClient.TransferGas(receiver, np.mintEmitValue) err = np.morphClient.TransferGas(receiver, np.mintEmitValue)
@ -73,24 +73,26 @@ func (np *Processor) processDeposit(deposit frostfsEvent.Deposit) {
np.log.Error(logs.FrostFSCantTransferNativeGasToReceiver, np.log.Error(logs.FrostFSCantTransferNativeGasToReceiver,
zap.String("error", err.Error())) zap.String("error", err.Error()))
return return false
} }
np.mintEmitCache.Add(receiver.String(), curEpoch) np.mintEmitCache.Add(receiver.String(), curEpoch)
return true
} }
// Process withdraw event by locking assets in the balance account. // Process withdraw event by locking assets in the balance account.
func (np *Processor) processWithdraw(withdraw frostfsEvent.Withdraw) { func (np *Processor) processWithdraw(withdraw frostfsEvent.Withdraw) bool {
if !np.alphabetState.IsAlphabet() { if !np.alphabetState.IsAlphabet() {
np.log.Info(logs.FrostFSNonAlphabetModeIgnoreWithdraw) np.log.Info(logs.FrostFSNonAlphabetModeIgnoreWithdraw)
return return true
} }
// create lock account // create lock account
lock, err := util.Uint160DecodeBytesBE(withdraw.ID()[:util.Uint160Size]) lock, err := util.Uint160DecodeBytesBE(withdraw.ID()[:util.Uint160Size])
if err != nil { if err != nil {
np.log.Error(logs.FrostFSCantCreateLockAccount, zap.Error(err)) np.log.Error(logs.FrostFSCantCreateLockAccount, zap.Error(err))
return return false
} }
curEpoch := np.epochState.EpochCounter() curEpoch := np.epochState.EpochCounter()
@ -106,15 +108,18 @@ func (np *Processor) processWithdraw(withdraw frostfsEvent.Withdraw) {
err = np.balanceClient.Lock(prm) err = np.balanceClient.Lock(prm)
if err != nil { if err != nil {
np.log.Error(logs.FrostFSCantLockAssetsForWithdraw, zap.Error(err)) np.log.Error(logs.FrostFSCantLockAssetsForWithdraw, zap.Error(err))
return false
} }
return true
} }
// Process cheque event by transferring assets from the lock account back to // Process cheque event by transferring assets from the lock account back to
// the reserve account. // the reserve account.
func (np *Processor) processCheque(cheque frostfsEvent.Cheque) { func (np *Processor) processCheque(cheque frostfsEvent.Cheque) bool {
if !np.alphabetState.IsAlphabet() { if !np.alphabetState.IsAlphabet() {
np.log.Info(logs.FrostFSNonAlphabetModeIgnoreCheque) np.log.Info(logs.FrostFSNonAlphabetModeIgnoreCheque)
return return true
} }
prm := balance.BurnPrm{} prm := balance.BurnPrm{}
@ -126,5 +131,8 @@ func (np *Processor) processCheque(cheque frostfsEvent.Cheque) {
err := np.balanceClient.Burn(prm) err := np.balanceClient.Burn(prm)
if err != nil { if err != nil {
np.log.Error(logs.FrostFSCantTransferAssetsToFedContract, zap.Error(err)) np.log.Error(logs.FrostFSCantTransferAssetsToFedContract, zap.Error(err))
return false
} }
return true
} }

View file

@ -18,10 +18,10 @@ type bindCommon interface {
TxHash() util.Uint256 TxHash() util.Uint256
} }
func (np *Processor) processBind(e bindCommon, bind bool) { func (np *Processor) processBind(e bindCommon, bind bool) bool {
if !np.alphabetState.IsAlphabet() { if !np.alphabetState.IsAlphabet() {
np.log.Info(logs.FrostFSNonAlphabetModeIgnoreBind) np.log.Info(logs.FrostFSNonAlphabetModeIgnoreBind)
return return true
} }
c := &bindCommonContext{ c := &bindCommonContext{
@ -36,10 +36,10 @@ func (np *Processor) processBind(e bindCommon, bind bool) {
zap.String("error", err.Error()), zap.String("error", err.Error()),
) )
return return false
} }
np.approveBindCommon(c) return np.approveBindCommon(c) == nil
} }
type bindCommonContext struct { type bindCommonContext struct {
@ -70,7 +70,7 @@ func (np *Processor) checkBindCommon(e *bindCommonContext) error {
return nil return nil
} }
func (np *Processor) approveBindCommon(e *bindCommonContext) { func (np *Processor) approveBindCommon(e *bindCommonContext) error {
// calculate wallet address // calculate wallet address
scriptHash := e.User() scriptHash := e.User()
@ -80,7 +80,7 @@ func (np *Processor) approveBindCommon(e *bindCommonContext) {
zap.String("error", err.Error()), zap.String("error", err.Error()),
) )
return return err
} }
var id user.ID var id user.ID
@ -104,4 +104,6 @@ func (np *Processor) approveBindCommon(e *bindCommonContext) {
np.log.Error(fmt.Sprintf("could not approve %s", typ), np.log.Error(fmt.Sprintf("could not approve %s", typ),
zap.String("error", err.Error())) zap.String("error", err.Error()))
} }
return err
} }

View file

@ -9,10 +9,10 @@ import (
// Process config event by setting configuration value from the mainchain in // Process config event by setting configuration value from the mainchain in
// the sidechain. // the sidechain.
func (np *Processor) processConfig(config frostfsEvent.Config) { func (np *Processor) processConfig(config frostfsEvent.Config) bool {
if !np.alphabetState.IsAlphabet() { if !np.alphabetState.IsAlphabet() {
np.log.Info(logs.FrostFSNonAlphabetModeIgnoreConfig) np.log.Info(logs.FrostFSNonAlphabetModeIgnoreConfig)
return return true
} }
prm := nmClient.SetConfigPrm{} prm := nmClient.SetConfigPrm{}
@ -25,5 +25,8 @@ func (np *Processor) processConfig(config frostfsEvent.Config) {
err := np.netmapClient.SetConfig(prm) err := np.netmapClient.SetConfig(prm)
if err != nil { if err != nil {
np.log.Error(logs.FrostFSCantRelaySetConfigEvent, zap.Error(err)) np.log.Error(logs.FrostFSCantRelaySetConfigEvent, zap.Error(err))
return false
} }
return true
} }

View file

@ -6,6 +6,7 @@ import (
"sync" "sync"
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs" "git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/innerring/metrics"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/client/balance" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/client/balance"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/client/frostfsid" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/client/frostfsid"
nmClient "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/client/netmap" nmClient "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/client/netmap"
@ -58,6 +59,7 @@ type (
// Processor of events produced by frostfs contract in main net. // Processor of events produced by frostfs contract in main net.
Processor struct { Processor struct {
log *logger.Logger log *logger.Logger
metrics metrics.Register
pool *ants.Pool pool *ants.Pool
frostfsContract util.Uint160 frostfsContract util.Uint160
balanceClient BalanceClient balanceClient BalanceClient
@ -77,6 +79,7 @@ type (
// Params of the processor constructor. // Params of the processor constructor.
Params struct { Params struct {
Log *logger.Logger Log *logger.Logger
Metrics metrics.Register
PoolSize int PoolSize int
FrostFSContract util.Uint160 FrostFSContract util.Uint160
FrostFSIDClient IDClient FrostFSIDClient IDClient
@ -129,8 +132,14 @@ func New(p *Params) (*Processor, error) {
return nil, fmt.Errorf("ir/frostfs: can't create LRU cache for gas emission: %w", err) return nil, fmt.Errorf("ir/frostfs: can't create LRU cache for gas emission: %w", err)
} }
metricsRegister := p.Metrics
if metricsRegister == nil {
metricsRegister = metrics.DefaultRegister{}
}
return &Processor{ return &Processor{
log: p.Log, log: p.Log,
metrics: metricsRegister,
pool: pool, pool: pool,
frostfsContract: p.FrostFSContract, frostfsContract: p.FrostFSContract,
balanceClient: p.BalanceClient, balanceClient: p.BalanceClient,

View file

@ -2,6 +2,7 @@ package governance
import ( import (
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs" "git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/innerring/processors"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/event" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/event"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/event/rolemanagement" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/event/rolemanagement"
"github.com/nspcc-dev/neo-go/pkg/core/native" "github.com/nspcc-dev/neo-go/pkg/core/native"
@ -35,7 +36,9 @@ func (gp *Processor) HandleAlphabetSync(e event.Event) {
// send event to the worker pool // send event to the worker pool
err := gp.pool.Submit(func() { gp.processAlphabetSync(hash) }) err := processors.SubmitEvent(gp.pool, gp.metrics, "alphabet_sync", func() bool {
return gp.processAlphabetSync(hash)
})
if err != nil { if err != nil {
// there system can be moved into controlled degradation stage // there system can be moved into controlled degradation stage
gp.log.Warn(logs.GovernanceGovernanceWorkerPoolDrained, gp.log.Warn(logs.GovernanceGovernanceWorkerPoolDrained,

View file

@ -18,36 +18,36 @@ const (
alphabetUpdateIDPrefix = "AlphabetUpdate" alphabetUpdateIDPrefix = "AlphabetUpdate"
) )
func (gp *Processor) processAlphabetSync(txHash util.Uint256) { func (gp *Processor) processAlphabetSync(txHash util.Uint256) bool {
if !gp.alphabetState.IsAlphabet() { if !gp.alphabetState.IsAlphabet() {
gp.log.Info(logs.GovernanceNonAlphabetModeIgnoreAlphabetSync) gp.log.Info(logs.GovernanceNonAlphabetModeIgnoreAlphabetSync)
return return true
} }
mainnetAlphabet, err := gp.mainnetClient.NeoFSAlphabetList() mainnetAlphabet, err := gp.mainnetClient.NeoFSAlphabetList()
if err != nil { if err != nil {
gp.log.Error(logs.GovernanceCantFetchAlphabetListFromMainNet, gp.log.Error(logs.GovernanceCantFetchAlphabetListFromMainNet,
zap.String("error", err.Error())) zap.String("error", err.Error()))
return return false
} }
sidechainAlphabet, err := gp.morphClient.Committee() sidechainAlphabet, err := gp.morphClient.Committee()
if err != nil { if err != nil {
gp.log.Error(logs.GovernanceCantFetchAlphabetListFromSideChain, gp.log.Error(logs.GovernanceCantFetchAlphabetListFromSideChain,
zap.String("error", err.Error())) zap.String("error", err.Error()))
return return false
} }
newAlphabet, err := newAlphabetList(sidechainAlphabet, mainnetAlphabet) newAlphabet, err := newAlphabetList(sidechainAlphabet, mainnetAlphabet)
if err != nil { if err != nil {
gp.log.Error(logs.GovernanceCantMergeAlphabetListsFromMainNetAndSideChain, gp.log.Error(logs.GovernanceCantMergeAlphabetListsFromMainNetAndSideChain,
zap.String("error", err.Error())) zap.String("error", err.Error()))
return return false
} }
if newAlphabet == nil { if newAlphabet == nil {
gp.log.Info(logs.GovernanceNoGovernanceUpdateAlphabetListHasNotBeenChanged) gp.log.Info(logs.GovernanceNoGovernanceUpdateAlphabetListHasNotBeenChanged)
return return true
} }
gp.log.Info(logs.GovernanceAlphabetListHasBeenChangedStartingUpdate, gp.log.Info(logs.GovernanceAlphabetListHasBeenChangedStartingUpdate,
@ -77,6 +77,8 @@ func (gp *Processor) processAlphabetSync(txHash util.Uint256) {
gp.updateFrostFSContractInMainnet(newAlphabet) gp.updateFrostFSContractInMainnet(newAlphabet)
gp.log.Info(logs.GovernanceFinishedAlphabetListUpdate) gp.log.Info(logs.GovernanceFinishedAlphabetListUpdate)
return true
} }
func prettyKeys(keys keys.PublicKeys) string { func prettyKeys(keys keys.PublicKeys) string {

View file

@ -4,6 +4,7 @@ import (
"errors" "errors"
"fmt" "fmt"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/innerring/metrics"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/client" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/client"
frostfscontract "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/client/frostfs" frostfscontract "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/client/frostfs"
nmClient "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/client/netmap" nmClient "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/client/netmap"
@ -75,6 +76,7 @@ type (
// Processor of events related to governance in the network. // Processor of events related to governance in the network.
Processor struct { Processor struct {
log *logger.Logger log *logger.Logger
metrics metrics.Register
pool *ants.Pool pool *ants.Pool
frostfsClient FrostFSClient frostfsClient FrostFSClient
netmapClient NetmapClient netmapClient NetmapClient
@ -92,7 +94,8 @@ type (
// Params of the processor constructor. // Params of the processor constructor.
Params struct { Params struct {
Log *logger.Logger Log *logger.Logger
Metrics metrics.Register
AlphabetState AlphabetState AlphabetState AlphabetState
EpochState EpochState EpochState EpochState
@ -130,11 +133,17 @@ func New(p *Params) (*Processor, error) {
return nil, fmt.Errorf("ir/governance: can't create worker pool: %w", err) return nil, fmt.Errorf("ir/governance: can't create worker pool: %w", err)
} }
metricsRegister := p.Metrics
if metricsRegister == nil {
metricsRegister = metrics.DefaultRegister{}
}
// result is cached by neo-go, so we can pre-calc it // result is cached by neo-go, so we can pre-calc it
designate := p.MainnetClient.GetDesignateHash() designate := p.MainnetClient.GetDesignateHash()
return &Processor{ return &Processor{
log: p.Log, log: p.Log,
metrics: metricsRegister,
pool: pool, pool: pool,
frostfsClient: p.FrostFSClient, frostfsClient: p.FrostFSClient,
netmapClient: p.NetmapClient, netmapClient: p.NetmapClient,

View file

@ -4,6 +4,7 @@ import (
"encoding/hex" "encoding/hex"
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs" "git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/innerring/processors"
timerEvent "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/innerring/timers" timerEvent "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/innerring/timers"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/event" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/event"
netmapEvent "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/event/netmap" netmapEvent "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/event/netmap"
@ -16,7 +17,7 @@ func (np *Processor) HandleNewEpochTick(ev event.Event) {
// send an event to the worker pool // send an event to the worker pool
err := np.pool.Submit(func() { np.processNewEpochTick() }) err := processors.SubmitEvent(np.pool, np.metrics, "netmap_new_epoch_tick", np.processNewEpochTick)
if err != nil { if err != nil {
// there system can be moved into controlled degradation stage // there system can be moved into controlled degradation stage
np.log.Warn(logs.NetmapNetmapWorkerPoolDrained, np.log.Warn(logs.NetmapNetmapWorkerPoolDrained,
@ -32,8 +33,8 @@ func (np *Processor) handleNewEpoch(ev event.Event) {
// send an event to the worker pool // send an event to the worker pool
err := np.pool.Submit(func() { err := processors.SubmitEvent(np.pool, np.metrics, "netmap_new_epoch", func() bool {
np.processNewEpoch(epochEvent) return np.processNewEpoch(epochEvent)
}) })
if err != nil { if err != nil {
// there system can be moved into controlled degradation stage // there system can be moved into controlled degradation stage
@ -51,8 +52,8 @@ func (np *Processor) handleAddPeer(ev event.Event) {
// send an event to the worker pool // send an event to the worker pool
err := np.pool.Submit(func() { err := processors.SubmitEvent(np.pool, np.metrics, "netmap_add_peer", func() bool {
np.processAddPeer(newPeer) return np.processAddPeer(newPeer)
}) })
if err != nil { if err != nil {
// there system can be moved into controlled degradation stage // there system can be moved into controlled degradation stage
@ -69,8 +70,8 @@ func (np *Processor) handleUpdateState(ev event.Event) {
// send event to the worker pool // send event to the worker pool
err := np.pool.Submit(func() { err := processors.SubmitEvent(np.pool, np.metrics, "netmap_update_peer", func() bool {
np.processUpdatePeer(updPeer) return np.processUpdatePeer(updPeer)
}) })
if err != nil { if err != nil {
// there system can be moved into controlled degradation stage // there system can be moved into controlled degradation stage
@ -91,8 +92,8 @@ func (np *Processor) handleCleanupTick(ev event.Event) {
np.log.Info(logs.NetmapTick, zap.String("type", "netmap cleaner")) np.log.Info(logs.NetmapTick, zap.String("type", "netmap cleaner"))
// send event to the worker pool // send event to the worker pool
err := np.pool.Submit(func() { err := processors.SubmitEvent(np.pool, np.metrics, "netmap_cleanup_tick", func() bool {
np.processNetmapCleanupTick(cleanup) return np.processNetmapCleanupTick(cleanup)
}) })
if err != nil { if err != nil {
// there system can be moved into controlled degradation stage // there system can be moved into controlled degradation stage

View file

@ -7,11 +7,11 @@ import (
"go.uber.org/zap" "go.uber.org/zap"
) )
func (np *Processor) processNetmapCleanupTick(ev netmapCleanupTick) { func (np *Processor) processNetmapCleanupTick(ev netmapCleanupTick) bool {
if !np.alphabetState.IsAlphabet() { if !np.alphabetState.IsAlphabet() {
np.log.Info(logs.NetmapNonAlphabetModeIgnoreNewNetmapCleanupTick) np.log.Info(logs.NetmapNonAlphabetModeIgnoreNewNetmapCleanupTick)
return return true
} }
err := np.netmapSnapshot.forEachRemoveCandidate(ev.epoch, func(s string) error { err := np.netmapSnapshot.forEachRemoveCandidate(ev.epoch, func(s string) error {
@ -47,5 +47,8 @@ func (np *Processor) processNetmapCleanupTick(ev netmapCleanupTick) {
if err != nil { if err != nil {
np.log.Warn(logs.NetmapCantIterateOnNetmapCleanerCache, np.log.Warn(logs.NetmapCantIterateOnNetmapCleanerCache,
zap.String("error", err.Error())) zap.String("error", err.Error()))
return false
} }
return true
} }

View file

@ -10,7 +10,7 @@ import (
// Process new epoch notification by setting global epoch value and resetting // Process new epoch notification by setting global epoch value and resetting
// local epoch timer. // local epoch timer.
func (np *Processor) processNewEpoch(ev netmapEvent.NewEpoch) { func (np *Processor) processNewEpoch(ev netmapEvent.NewEpoch) bool {
epoch := ev.EpochNumber() epoch := ev.EpochNumber()
epochDuration, err := np.netmapClient.EpochDuration() epochDuration, err := np.netmapClient.EpochDuration()
@ -41,7 +41,7 @@ func (np *Processor) processNewEpoch(ev netmapEvent.NewEpoch) {
np.log.Warn(logs.NetmapCantGetNetmapSnapshotToPerformCleanup, np.log.Warn(logs.NetmapCantGetNetmapSnapshotToPerformCleanup,
zap.String("error", err.Error())) zap.String("error", err.Error()))
return return false
} }
prm := cntClient.StartEstimationPrm{} prm := cntClient.StartEstimationPrm{}
@ -63,13 +63,15 @@ func (np *Processor) processNewEpoch(ev netmapEvent.NewEpoch) {
np.handleCleanupTick(netmapCleanupTick{epoch: epoch, txHash: ev.TxHash()}) np.handleCleanupTick(netmapCleanupTick{epoch: epoch, txHash: ev.TxHash()})
np.handleAlphabetSync(governance.NewSyncEvent(ev.TxHash())) np.handleAlphabetSync(governance.NewSyncEvent(ev.TxHash()))
np.handleNotaryDeposit(ev) np.handleNotaryDeposit(ev)
return true
} }
// Process new epoch tick by invoking new epoch method in network map contract. // Process new epoch tick by invoking new epoch method in network map contract.
func (np *Processor) processNewEpochTick() { func (np *Processor) processNewEpochTick() bool {
if !np.alphabetState.IsAlphabet() { if !np.alphabetState.IsAlphabet() {
np.log.Info(logs.NetmapNonAlphabetModeIgnoreNewEpochTick) np.log.Info(logs.NetmapNonAlphabetModeIgnoreNewEpochTick)
return return true
} }
nextEpoch := np.epochState.EpochCounter() + 1 nextEpoch := np.epochState.EpochCounter() + 1
@ -78,5 +80,8 @@ func (np *Processor) processNewEpochTick() {
err := np.netmapClient.NewEpoch(nextEpoch, false) err := np.netmapClient.NewEpoch(nextEpoch, false)
if err != nil { if err != nil {
np.log.Error(logs.NetmapCantInvokeNetmapNewEpoch, zap.Error(err)) np.log.Error(logs.NetmapCantInvokeNetmapNewEpoch, zap.Error(err))
return false
} }
return true
} }

View file

@ -12,10 +12,10 @@ import (
// Process add peer notification by sanity check of new node // Process add peer notification by sanity check of new node
// local epoch timer. // local epoch timer.
func (np *Processor) processAddPeer(ev netmapEvent.AddPeer) { func (np *Processor) processAddPeer(ev netmapEvent.AddPeer) bool {
if !np.alphabetState.IsAlphabet() { if !np.alphabetState.IsAlphabet() {
np.log.Info(logs.NetmapNonAlphabetModeIgnoreNewPeerNotification) np.log.Info(logs.NetmapNonAlphabetModeIgnoreNewPeerNotification)
return return true
} }
// check if notary transaction is valid, see #976 // check if notary transaction is valid, see #976
@ -26,7 +26,7 @@ func (np *Processor) processAddPeer(ev netmapEvent.AddPeer) {
zap.String("method", "netmap.AddPeer"), zap.String("method", "netmap.AddPeer"),
zap.String("hash", tx.Hash().StringLE()), zap.String("hash", tx.Hash().StringLE()),
zap.Error(err)) zap.Error(err))
return return false
} }
// unmarshal node info // unmarshal node info
@ -34,7 +34,7 @@ func (np *Processor) processAddPeer(ev netmapEvent.AddPeer) {
if err := nodeInfo.Unmarshal(ev.Node()); err != nil { if err := nodeInfo.Unmarshal(ev.Node()); err != nil {
// it will be nice to have tx id at event structure to log it // it will be nice to have tx id at event structure to log it
np.log.Warn(logs.NetmapCantParseNetworkMapCandidate) np.log.Warn(logs.NetmapCantParseNetworkMapCandidate)
return return false
} }
// validate and update node info // validate and update node info
@ -44,7 +44,7 @@ func (np *Processor) processAddPeer(ev netmapEvent.AddPeer) {
zap.String("error", err.Error()), zap.String("error", err.Error()),
) )
return return false
} }
// sort attributes to make it consistent // sort attributes to make it consistent
@ -81,15 +81,18 @@ func (np *Processor) processAddPeer(ev netmapEvent.AddPeer) {
if err != nil { if err != nil {
np.log.Error(logs.NetmapCantInvokeNetmapAddPeer, zap.Error(err)) np.log.Error(logs.NetmapCantInvokeNetmapAddPeer, zap.Error(err))
return false
} }
} }
return true
} }
// Process update peer notification by sending approval tx to the smart contract. // Process update peer notification by sending approval tx to the smart contract.
func (np *Processor) processUpdatePeer(ev netmapEvent.UpdatePeer) { func (np *Processor) processUpdatePeer(ev netmapEvent.UpdatePeer) bool {
if !np.alphabetState.IsAlphabet() { if !np.alphabetState.IsAlphabet() {
np.log.Info(logs.NetmapNonAlphabetModeIgnoreUpdatePeerNotification) np.log.Info(logs.NetmapNonAlphabetModeIgnoreUpdatePeerNotification)
return return true
} }
// flag node to remove from local view, so it can be re-bootstrapped // flag node to remove from local view, so it can be re-bootstrapped
@ -105,11 +108,14 @@ func (np *Processor) processUpdatePeer(ev netmapEvent.UpdatePeer) {
zap.Error(err), zap.Error(err),
) )
return return false
} }
} }
if err = np.netmapClient.MorphNotarySignAndInvokeTX(ev.NotaryRequest().MainTransaction); err != nil { if err = np.netmapClient.MorphNotarySignAndInvokeTX(ev.NotaryRequest().MainTransaction); err != nil {
np.log.Error(logs.NetmapCantInvokeNetmapUpdatePeer, zap.Error(err)) np.log.Error(logs.NetmapCantInvokeNetmapUpdatePeer, zap.Error(err))
return false
} }
return true
} }

View file

@ -5,6 +5,7 @@ import (
"fmt" "fmt"
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs" "git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/innerring/metrics"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/innerring/processors/netmap/nodevalidation/state" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/innerring/processors/netmap/nodevalidation/state"
cntClient "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/client/container" cntClient "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/client/container"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/event" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/event"
@ -72,6 +73,7 @@ type (
// and new epoch ticker, because it is related to contract. // and new epoch ticker, because it is related to contract.
Processor struct { Processor struct {
log *logger.Logger log *logger.Logger
metrics metrics.Register
pool *ants.Pool pool *ants.Pool
epochTimer EpochTimerReseter epochTimer EpochTimerReseter
epochState EpochState epochState EpochState
@ -93,6 +95,7 @@ type (
// Params of the processor constructor. // Params of the processor constructor.
Params struct { Params struct {
Log *logger.Logger Log *logger.Logger
Metrics metrics.Register
PoolSize int PoolSize int
NetmapClient Client NetmapClient Client
EpochTimer EpochTimerReseter EpochTimer EpochTimerReseter
@ -145,8 +148,14 @@ func New(p *Params) (*Processor, error) {
return nil, fmt.Errorf("ir/netmap: can't create worker pool: %w", err) return nil, fmt.Errorf("ir/netmap: can't create worker pool: %w", err)
} }
metricsRegister := p.Metrics
if metricsRegister == nil {
metricsRegister = metrics.DefaultRegister{}
}
return &Processor{ return &Processor{
log: p.Log, log: p.Log,
metrics: metricsRegister,
pool: pool, pool: pool,
epochTimer: p.EpochTimer, epochTimer: p.EpochTimer,
epochState: p.EpochState, epochState: p.EpochState,

View file

@ -0,0 +1,16 @@
package processors
import (
"time"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/innerring/metrics"
"github.com/panjf2000/ants/v2"
)
func SubmitEvent(pool *ants.Pool, metrics metrics.Register, eventLabel string, eventProcessor func() bool) error {
return pool.Submit(func() {
start := time.Now()
success := eventProcessor()
metrics.AddEvent(time.Since(start), eventLabel, success)
})
}

View file

@ -1,13 +1,23 @@
package metrics package metrics
import "github.com/prometheus/client_golang/prometheus" import (
"strconv"
"time"
const innerRingSubsystem = "ir" "github.com/prometheus/client_golang/prometheus"
)
const (
innerRingSubsystem = "ir"
innerRingLabelSuccess = "success"
innerRingLabelType = "type"
)
// InnerRingServiceMetrics contains metrics collected by inner ring. // InnerRingServiceMetrics contains metrics collected by inner ring.
type InnerRingServiceMetrics struct { type InnerRingServiceMetrics struct {
epoch metric[prometheus.Gauge] epoch metric[prometheus.Gauge]
health metric[prometheus.Gauge] health metric[prometheus.Gauge]
eventDuration metric[*prometheus.HistogramVec]
} }
// NewInnerRingMetrics returns new instance of metrics collectors for inner ring. // NewInnerRingMetrics returns new instance of metrics collectors for inner ring.
@ -25,14 +35,22 @@ func NewInnerRingMetrics() *InnerRingServiceMetrics {
Name: "health", Name: "health",
Help: "Current inner-ring node state.", Help: "Current inner-ring node state.",
}) })
eventDuration = newHistogramVec(prometheus.HistogramOpts{
Namespace: namespace,
Subsystem: innerRingSubsystem,
Name: "event_duration_seconds",
Help: "Duration of processing of inner-ring events",
}, []string{innerRingLabelType, innerRingLabelSuccess})
) )
mustRegister(epoch) mustRegister(epoch)
mustRegister(health) mustRegister(health)
mustRegister(eventDuration)
return &InnerRingServiceMetrics{ return &InnerRingServiceMetrics{
epoch: epoch, epoch: epoch,
health: health, health: health,
eventDuration: eventDuration,
} }
} }
@ -45,3 +63,10 @@ func (m InnerRingServiceMetrics) SetEpoch(epoch uint64) {
func (m InnerRingServiceMetrics) SetHealth(s int32) { func (m InnerRingServiceMetrics) SetHealth(s int32) {
m.health.value.Set(float64(s)) m.health.value.Set(float64(s))
} }
func (m InnerRingServiceMetrics) AddEvent(d time.Duration, typ string, success bool) {
m.eventDuration.value.With(prometheus.Labels{
innerRingLabelType: typ,
innerRingLabelSuccess: strconv.FormatBool(success),
}).Observe(d.Seconds())
}