From ebcc8afbeeeb97c66cdfefab3cde227081b6c59e Mon Sep 17 00:00:00 2001 From: Alejandro Lopez Date: Fri, 26 May 2023 13:24:41 +0300 Subject: [PATCH] [#374] Add inner-ring event metrics Signed-off-by: Alejandro Lopez --- pkg/innerring/initialization.go | 6 +++ pkg/innerring/metrics/metrics.go | 15 ++++++ pkg/innerring/processors/alphabet/handlers.go | 3 +- .../processors/alphabet/process_emit.go | 16 ++++--- .../processors/alphabet/processor.go | 9 ++++ pkg/innerring/processors/balance/handlers.go | 5 +- .../processors/balance/process_assets.go | 7 ++- pkg/innerring/processors/balance/processor.go | 9 ++++ .../processors/container/handlers.go | 13 +++-- .../processors/container/process_container.go | 47 +++++++++---------- .../processors/container/process_eacl.go | 23 +++++---- .../processors/container/processor.go | 9 ++++ pkg/innerring/processors/frostfs/handlers.go | 25 +++++++--- .../processors/frostfs/process_assets.go | 30 +++++++----- .../processors/frostfs/process_bind.go | 14 +++--- .../processors/frostfs/process_config.go | 7 ++- pkg/innerring/processors/frostfs/processor.go | 9 ++++ .../processors/governance/handlers.go | 5 +- .../processors/governance/process_update.go | 14 +++--- .../processors/governance/processor.go | 11 ++++- pkg/innerring/processors/netmap/handlers.go | 19 ++++---- .../processors/netmap/process_cleanup.go | 7 ++- .../processors/netmap/process_epoch.go | 13 +++-- .../processors/netmap/process_peers.go | 22 +++++---- pkg/innerring/processors/netmap/processor.go | 9 ++++ pkg/innerring/processors/util.go | 16 +++++++ pkg/metrics/innerring.go | 37 ++++++++++++--- 27 files changed, 287 insertions(+), 113 deletions(-) create mode 100644 pkg/innerring/metrics/metrics.go create mode 100644 pkg/innerring/processors/util.go diff --git a/pkg/innerring/initialization.go b/pkg/innerring/initialization.go index 8e5aef95..c49d2250 100644 --- a/pkg/innerring/initialization.go +++ b/pkg/innerring/initialization.go @@ -52,6 +52,7 @@ func (s *Server) initNetmapProcessor(cfg *viper.Viper, s.netmapProcessor, err = netmap.New(&netmap.Params{ Log: s.log, + Metrics: s.metrics, PoolSize: cfg.GetInt("workers.netmap"), NetmapClient: netmap.NewNetmapClient(s.netmapClient), EpochTimer: s, @@ -162,6 +163,7 @@ func (s *Server) createAlphaSync(cfg *viper.Viper, frostfsCli *frostfsClient.Cli // create governance processor governanceProcessor, err := governance.New(&governance.Params{ Log: s.log, + Metrics: s.metrics, FrostFSClient: frostfsCli, NetmapClient: s.netmapClient, AlphabetState: s, @@ -231,6 +233,7 @@ func (s *Server) initAlphabetProcessor(cfg *viper.Viper) error { s.alphabetProcessor, err = alphabet.New(&alphabet.Params{ ParsedWallets: parsedWallets, Log: s.log, + Metrics: s.metrics, PoolSize: cfg.GetInt("workers.alphabet"), AlphabetContracts: s.contracts.alphabet, NetmapClient: s.netmapClient, @@ -255,6 +258,7 @@ func (s *Server) initContainerProcessor(cfg *viper.Viper, cnrClient *container.C // container processor containerProcessor, err := cont.New(&cont.Params{ Log: s.log, + Metrics: s.metrics, PoolSize: cfg.GetInt("workers.container"), AlphabetState: s, ContainerClient: cnrClient, @@ -273,6 +277,7 @@ func (s *Server) initBalanceProcessor(cfg *viper.Viper, frostfsCli *frostfsClien // create balance processor balanceProcessor, err := balance.New(&balance.Params{ Log: s.log, + Metrics: s.metrics, PoolSize: cfg.GetInt("workers.balance"), FrostFSClient: frostfsCli, BalanceSC: s.contracts.balance, @@ -293,6 +298,7 @@ func (s *Server) initFrostFSMainnetProcessor(cfg *viper.Viper, frostfsIDClient * frostfsProcessor, err := frostfs.New(&frostfs.Params{ Log: s.log, + Metrics: s.metrics, PoolSize: cfg.GetInt("workers.frostfs"), FrostFSContract: s.contracts.frostfs, FrostFSIDClient: frostfsIDClient, diff --git a/pkg/innerring/metrics/metrics.go b/pkg/innerring/metrics/metrics.go new file mode 100644 index 00000000..002f3afe --- /dev/null +++ b/pkg/innerring/metrics/metrics.go @@ -0,0 +1,15 @@ +package metrics + +import "time" + +type Register interface { + SetEpoch(epoch uint64) + SetHealth(s int32) + AddEvent(d time.Duration, typ string, success bool) +} + +type DefaultRegister struct{} + +func (DefaultRegister) SetEpoch(uint64) {} +func (DefaultRegister) SetHealth(int32) {} +func (DefaultRegister) AddEvent(time.Duration, string, bool) {} diff --git a/pkg/innerring/processors/alphabet/handlers.go b/pkg/innerring/processors/alphabet/handlers.go index c0668a4f..9de075f1 100644 --- a/pkg/innerring/processors/alphabet/handlers.go +++ b/pkg/innerring/processors/alphabet/handlers.go @@ -2,6 +2,7 @@ package alphabet import ( "git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs" + "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/innerring/processors" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/innerring/timers" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/event" "go.uber.org/zap" @@ -13,7 +14,7 @@ func (ap *Processor) HandleGasEmission(ev event.Event) { // send event to the worker pool - err := ap.pool.Submit(func() { ap.processEmit() }) + err := processors.SubmitEvent(ap.pool, ap.metrics, "alphabet_emit_gas", ap.processEmit) if err != nil { // there system can be moved into controlled degradation stage ap.log.Warn(logs.AlphabetAlphabetProcessorWorkerPoolDrained, diff --git a/pkg/innerring/processors/alphabet/process_emit.go b/pkg/innerring/processors/alphabet/process_emit.go index 7a268ac5..8a233601 100644 --- a/pkg/innerring/processors/alphabet/process_emit.go +++ b/pkg/innerring/processors/alphabet/process_emit.go @@ -13,12 +13,12 @@ import ( const emitMethod = "emit" -func (ap *Processor) processEmit() { +func (ap *Processor) processEmit() bool { index := ap.irList.AlphabetIndex() if index < 0 { ap.log.Info(logs.AlphabetNonAlphabetModeIgnoreGasEmissionEvent) - return + return true } contract, ok := ap.alphabetContracts.GetByIndex(index) @@ -26,7 +26,7 @@ func (ap *Processor) processEmit() { ap.log.Debug(logs.AlphabetNodeIsOutOfAlphabetRangeIgnoreGasEmissionEvent, zap.Int("index", index)) - return + return false } // there is no signature collecting, so we don't need extra fee @@ -34,13 +34,13 @@ func (ap *Processor) processEmit() { if err != nil { ap.log.Warn(logs.AlphabetCantInvokeAlphabetEmitMethod, zap.String("error", err.Error())) - return + return false } if ap.storageEmission == 0 { ap.log.Info(logs.AlphabetStorageNodeEmissionIsOff) - return + return true } networkMap, err := ap.netmapClient.NetMap() @@ -48,7 +48,7 @@ func (ap *Processor) processEmit() { ap.log.Warn(logs.AlphabetCantGetNetmapSnapshotToEmitGasToStorageNodes, zap.String("error", err.Error())) - return + return false } nmNodes := networkMap.Nodes() @@ -63,7 +63,7 @@ func (ap *Processor) processEmit() { zap.Int("extra_wallets", extraLen)) if nmLen+extraLen == 0 { - return + return true } gasPerNode := fixedn.Fixed8(ap.storageEmission / uint64(nmLen+extraLen)) @@ -71,6 +71,8 @@ func (ap *Processor) processEmit() { ap.transferGasToNetmapNodes(nmNodes, gasPerNode) ap.transferGasToExtraNodes(pw, gasPerNode) + + return true } func (ap *Processor) transferGasToNetmapNodes(nmNodes []netmap.NodeInfo, gasPerNode fixedn.Fixed8) { diff --git a/pkg/innerring/processors/alphabet/processor.go b/pkg/innerring/processors/alphabet/processor.go index cd9088e0..972f8428 100644 --- a/pkg/innerring/processors/alphabet/processor.go +++ b/pkg/innerring/processors/alphabet/processor.go @@ -7,6 +7,7 @@ import ( "time" "git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs" + "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/innerring/metrics" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/event" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger" "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap" @@ -50,6 +51,7 @@ type ( // protects parsedWallets from concurrent change pwLock *sync.RWMutex log *logger.Logger + metrics metrics.Register pool *ants.Pool alphabetContracts Contracts netmapClient netmapClient @@ -62,6 +64,7 @@ type ( Params struct { ParsedWallets []util.Uint160 Log *logger.Logger + Metrics metrics.Register PoolSize int AlphabetContracts Contracts NetmapClient netmapClient @@ -89,10 +92,16 @@ func New(p *Params) (*Processor, error) { return nil, fmt.Errorf("ir/frostfs: can't create worker pool: %w", err) } + metricsRegister := p.Metrics + if metricsRegister == nil { + metricsRegister = metrics.DefaultRegister{} + } + return &Processor{ parsedWallets: p.ParsedWallets, pwLock: new(sync.RWMutex), log: p.Log, + metrics: metricsRegister, pool: pool, alphabetContracts: p.AlphabetContracts, netmapClient: p.NetmapClient, diff --git a/pkg/innerring/processors/balance/handlers.go b/pkg/innerring/processors/balance/handlers.go index e325da1f..e39f3abb 100644 --- a/pkg/innerring/processors/balance/handlers.go +++ b/pkg/innerring/processors/balance/handlers.go @@ -4,6 +4,7 @@ import ( "encoding/hex" "git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs" + "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/innerring/processors" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/event" balanceEvent "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/event/balance" "go.uber.org/zap" @@ -17,7 +18,9 @@ func (bp *Processor) handleLock(ev event.Event) { // send an event to the worker pool - err := bp.pool.Submit(func() { bp.processLock(&lock) }) + err := processors.SubmitEvent(bp.pool, bp.metrics, "lock", func() bool { + return bp.processLock(&lock) + }) if err != nil { // there system can be moved into controlled degradation stage bp.log.Warn(logs.BalanceBalanceWorkerPoolDrained, diff --git a/pkg/innerring/processors/balance/process_assets.go b/pkg/innerring/processors/balance/process_assets.go index 3f86a3cb..1d94fa45 100644 --- a/pkg/innerring/processors/balance/process_assets.go +++ b/pkg/innerring/processors/balance/process_assets.go @@ -9,10 +9,10 @@ import ( // Process lock event by invoking Cheque method in main net to send assets // back to the withdraw issuer. -func (bp *Processor) processLock(lock *balanceEvent.Lock) { +func (bp *Processor) processLock(lock *balanceEvent.Lock) bool { if !bp.alphabetState.IsAlphabet() { bp.log.Info(logs.BalanceNonAlphabetModeIgnoreBalanceLock) - return + return true } prm := frostfsContract.ChequePrm{} @@ -26,5 +26,8 @@ func (bp *Processor) processLock(lock *balanceEvent.Lock) { err := bp.frostfsClient.Cheque(prm) if err != nil { bp.log.Error(logs.BalanceCantSendLockAssetTx, zap.Error(err)) + return false } + + return true } diff --git a/pkg/innerring/processors/balance/processor.go b/pkg/innerring/processors/balance/processor.go index 356754cf..5cc849b5 100644 --- a/pkg/innerring/processors/balance/processor.go +++ b/pkg/innerring/processors/balance/processor.go @@ -5,6 +5,7 @@ import ( "fmt" "git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs" + "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/innerring/metrics" frostfscontract "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/client/frostfs" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/event" balanceEvent "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/event/balance" @@ -32,6 +33,7 @@ type ( // Processor of events produced by balance contract in the morphchain. Processor struct { log *logger.Logger + metrics metrics.Register pool *ants.Pool frostfsClient FrostFSClient balanceSC util.Uint160 @@ -42,6 +44,7 @@ type ( // Params of the processor constructor. Params struct { Log *logger.Logger + Metrics metrics.Register PoolSize int FrostFSClient FrostFSClient BalanceSC util.Uint160 @@ -72,8 +75,14 @@ func New(p *Params) (*Processor, error) { return nil, fmt.Errorf("ir/balance: can't create worker pool: %w", err) } + metricsRegister := p.Metrics + if metricsRegister == nil { + metricsRegister = metrics.DefaultRegister{} + } + return &Processor{ log: p.Log, + metrics: metricsRegister, pool: pool, frostfsClient: p.FrostFSClient, balanceSC: p.BalanceSC, diff --git a/pkg/innerring/processors/container/handlers.go b/pkg/innerring/processors/container/handlers.go index 2ab1147c..3ec10b88 100644 --- a/pkg/innerring/processors/container/handlers.go +++ b/pkg/innerring/processors/container/handlers.go @@ -4,6 +4,7 @@ import ( "crypto/sha256" "git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs" + "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/innerring/processors" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/event" containerEvent "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/event/container" "github.com/mr-tron/base58" @@ -20,7 +21,9 @@ func (cp *Processor) handlePut(ev event.Event) { // send an event to the worker pool - err := cp.pool.Submit(func() { cp.processContainerPut(put) }) + err := processors.SubmitEvent(cp.pool, cp.metrics, "container_put", func() bool { + return cp.processContainerPut(put) + }) if err != nil { // there system can be moved into controlled degradation stage cp.log.Warn(logs.ContainerContainerProcessorWorkerPoolDrained, @@ -36,7 +39,9 @@ func (cp *Processor) handleDelete(ev event.Event) { // send an event to the worker pool - err := cp.pool.Submit(func() { cp.processContainerDelete(del) }) + err := processors.SubmitEvent(cp.pool, cp.metrics, "container_delete", func() bool { + return cp.processContainerDelete(del) + }) if err != nil { // there system can be moved into controlled degradation stage cp.log.Warn(logs.ContainerContainerProcessorWorkerPoolDrained, @@ -53,8 +58,8 @@ func (cp *Processor) handleSetEACL(ev event.Event) { // send an event to the worker pool - err := cp.pool.Submit(func() { - cp.processSetEACL(e) + err := processors.SubmitEvent(cp.pool, cp.metrics, "container_set_eacl", func() bool { + return cp.processSetEACL(e) }) if err != nil { // there system can be moved into controlled degradation stage diff --git a/pkg/innerring/processors/container/process_container.go b/pkg/innerring/processors/container/process_container.go index 3bee1c4d..33ef9003 100644 --- a/pkg/innerring/processors/container/process_container.go +++ b/pkg/innerring/processors/container/process_container.go @@ -31,10 +31,10 @@ type putContainerContext struct { // Process a new container from the user by checking the container sanity // and sending approve tx back to the morph. -func (cp *Processor) processContainerPut(put putEvent) { +func (cp *Processor) processContainerPut(put putEvent) bool { if !cp.alphabetState.IsAlphabet() { cp.log.Info(logs.ContainerNonAlphabetModeIgnoreContainerPut) - return + return true } ctx := &putContainerContext{ @@ -47,10 +47,17 @@ func (cp *Processor) processContainerPut(put putEvent) { zap.String("error", err.Error()), ) - return + return false } - cp.approvePutContainer(ctx) + if err := cp.morphClient.NotarySignAndInvokeTX(ctx.e.NotaryRequest().MainTransaction); err != nil { + cp.log.Error(logs.ContainerCouldNotApprovePutContainer, + zap.String("error", err.Error()), + ) + return false + } + + return true } func (cp *Processor) checkPutContainer(ctx *putContainerContext) error { @@ -89,20 +96,12 @@ func (cp *Processor) checkPutContainer(ctx *putContainerContext) error { return nil } -func (cp *Processor) approvePutContainer(ctx *putContainerContext) { - if err := cp.morphClient.NotarySignAndInvokeTX(ctx.e.NotaryRequest().MainTransaction); err != nil { - cp.log.Error(logs.ContainerCouldNotApprovePutContainer, - zap.String("error", err.Error()), - ) - } -} - // Process delete container operation from the user by checking container sanity // and sending approve tx back to morph. -func (cp *Processor) processContainerDelete(e containerEvent.Delete) { +func (cp *Processor) processContainerDelete(e containerEvent.Delete) bool { if !cp.alphabetState.IsAlphabet() { cp.log.Info(logs.ContainerNonAlphabetModeIgnoreContainerDelete) - return + return true } err := cp.checkDeleteContainer(e) @@ -111,10 +110,18 @@ func (cp *Processor) processContainerDelete(e containerEvent.Delete) { zap.String("error", err.Error()), ) - return + return false } - cp.approveDeleteContainer(e) + if err := cp.morphClient.NotarySignAndInvokeTX(e.NotaryRequest().MainTransaction); err != nil { + cp.log.Error(logs.ContainerCouldNotApproveDeleteContainer, + zap.String("error", err.Error()), + ) + + return false + } + + return true } func (cp *Processor) checkDeleteContainer(e containerEvent.Delete) error { @@ -149,14 +156,6 @@ func (cp *Processor) checkDeleteContainer(e containerEvent.Delete) error { return nil } -func (cp *Processor) approveDeleteContainer(e containerEvent.Delete) { - if err := cp.morphClient.NotarySignAndInvokeTX(e.NotaryRequest().MainTransaction); err != nil { - cp.log.Error(logs.ContainerCouldNotApproveDeleteContainer, - zap.String("error", err.Error()), - ) - } -} - func checkNNS(ctx *putContainerContext, cnr containerSDK.Container) error { // fetch domain info ctx.d = containerSDK.ReadDomain(cnr) diff --git a/pkg/innerring/processors/container/process_eacl.go b/pkg/innerring/processors/container/process_eacl.go index 43a59e22..8ab0d5c3 100644 --- a/pkg/innerring/processors/container/process_eacl.go +++ b/pkg/innerring/processors/container/process_eacl.go @@ -12,10 +12,10 @@ import ( "go.uber.org/zap" ) -func (cp *Processor) processSetEACL(e containerEvent.SetEACL) { +func (cp *Processor) processSetEACL(e containerEvent.SetEACL) bool { if !cp.alphabetState.IsAlphabet() { cp.log.Info(logs.ContainerNonAlphabetModeIgnoreSetEACL) - return + return true } err := cp.checkSetEACL(e) @@ -24,10 +24,17 @@ func (cp *Processor) processSetEACL(e containerEvent.SetEACL) { zap.String("error", err.Error()), ) - return + return false } - cp.approveSetEACL(e) + if err := cp.morphClient.NotarySignAndInvokeTX(e.NotaryRequest().MainTransaction); err != nil { + cp.log.Error(logs.ContainerCouldNotApproveSetEACL, + zap.String("error", err.Error()), + ) + return false + } + + return true } func (cp *Processor) checkSetEACL(e containerEvent.SetEACL) error { @@ -73,11 +80,3 @@ func (cp *Processor) checkSetEACL(e containerEvent.SetEACL) error { return nil } - -func (cp *Processor) approveSetEACL(e containerEvent.SetEACL) { - if err := cp.morphClient.NotarySignAndInvokeTX(e.NotaryRequest().MainTransaction); err != nil { - cp.log.Error(logs.ContainerCouldNotApproveSetEACL, - zap.String("error", err.Error()), - ) - } -} diff --git a/pkg/innerring/processors/container/processor.go b/pkg/innerring/processors/container/processor.go index ec82ace7..fd5348c6 100644 --- a/pkg/innerring/processors/container/processor.go +++ b/pkg/innerring/processors/container/processor.go @@ -6,6 +6,7 @@ import ( "git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs" containercore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/container" + "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/innerring/metrics" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/client/frostfsid" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/event" containerEvent "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/event/container" @@ -40,6 +41,7 @@ type ( // Processor of events produced by container contract in the sidechain. Processor struct { log *logger.Logger + metrics metrics.Register pool *ants.Pool alphabetState AlphabetState cnrClient ContClient // notary must be enabled @@ -51,6 +53,7 @@ type ( // Params of the processor constructor. Params struct { Log *logger.Logger + Metrics metrics.Register PoolSize int AlphabetState AlphabetState ContainerClient ContClient @@ -102,8 +105,14 @@ func New(p *Params) (*Processor, error) { return nil, fmt.Errorf("ir/container: can't create worker pool: %w", err) } + metricsRegister := p.Metrics + if metricsRegister == nil { + metricsRegister = metrics.DefaultRegister{} + } + return &Processor{ log: p.Log, + metrics: metricsRegister, pool: pool, alphabetState: p.AlphabetState, cnrClient: p.ContainerClient, diff --git a/pkg/innerring/processors/frostfs/handlers.go b/pkg/innerring/processors/frostfs/handlers.go index 574cf057..ab53d5c4 100644 --- a/pkg/innerring/processors/frostfs/handlers.go +++ b/pkg/innerring/processors/frostfs/handlers.go @@ -4,6 +4,7 @@ import ( "encoding/hex" "git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs" + "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/innerring/processors" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/event" frostfsEvent "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/event/frostfs" "github.com/nspcc-dev/neo-go/pkg/util/slice" @@ -18,7 +19,9 @@ func (np *Processor) handleDeposit(ev event.Event) { // send event to the worker pool - err := np.pool.Submit(func() { np.processDeposit(deposit) }) + err := processors.SubmitEvent(np.pool, np.metrics, "frostfs_deposit", func() bool { + return np.processDeposit(deposit) + }) if err != nil { // there system can be moved into controlled degradation stage np.log.Warn(logs.FrostFSFrostfsProcessorWorkerPoolDrained, @@ -34,7 +37,9 @@ func (np *Processor) handleWithdraw(ev event.Event) { // send event to the worker pool - err := np.pool.Submit(func() { np.processWithdraw(withdraw) }) + err := processors.SubmitEvent(np.pool, np.metrics, "frostfs_withdraw", func() bool { + return np.processWithdraw(withdraw) + }) if err != nil { // there system can be moved into controlled degradation stage np.log.Warn(logs.FrostFSFrostfsProcessorWorkerPoolDrained, @@ -50,7 +55,9 @@ func (np *Processor) handleCheque(ev event.Event) { // send event to the worker pool - err := np.pool.Submit(func() { np.processCheque(cheque) }) + err := processors.SubmitEvent(np.pool, np.metrics, "frostfs_cheque", func() bool { + return np.processCheque(cheque) + }) if err != nil { // there system can be moved into controlled degradation stage np.log.Warn(logs.FrostFSFrostfsProcessorWorkerPoolDrained, @@ -67,7 +74,9 @@ func (np *Processor) handleConfig(ev event.Event) { // send event to the worker pool - err := np.pool.Submit(func() { np.processConfig(cfg) }) + err := processors.SubmitEvent(np.pool, np.metrics, "frostfs_config", func() bool { + return np.processConfig(cfg) + }) if err != nil { // there system can be moved into controlled degradation stage np.log.Warn(logs.FrostFSFrostfsProcessorWorkerPoolDrained, @@ -83,7 +92,9 @@ func (np *Processor) handleBind(ev event.Event) { // send event to the worker pool - err := np.pool.Submit(func() { np.processBind(e, true) }) + err := processors.SubmitEvent(np.pool, np.metrics, "frostfs_bind", func() bool { + return np.processBind(e, true) + }) if err != nil { // there system can be moved into controlled degradation stage np.log.Warn(logs.FrostFSFrostfsProcessorWorkerPoolDrained, @@ -99,7 +110,9 @@ func (np *Processor) handleUnbind(ev event.Event) { // send event to the worker pool - err := np.pool.Submit(func() { np.processBind(e, false) }) + err := processors.SubmitEvent(np.pool, np.metrics, "frostfs_unbind", func() bool { + return np.processBind(e, false) + }) if err != nil { // there system can be moved into controlled degradation stage np.log.Warn(logs.FrostFSFrostfsProcessorWorkerPoolDrained, diff --git a/pkg/innerring/processors/frostfs/process_assets.go b/pkg/innerring/processors/frostfs/process_assets.go index cfbf21b0..327a4a3a 100644 --- a/pkg/innerring/processors/frostfs/process_assets.go +++ b/pkg/innerring/processors/frostfs/process_assets.go @@ -15,10 +15,10 @@ const ( // Process deposit event by invoking a balance contract and sending native // gas in the sidechain. -func (np *Processor) processDeposit(deposit frostfsEvent.Deposit) { +func (np *Processor) processDeposit(deposit frostfsEvent.Deposit) bool { if !np.alphabetState.IsAlphabet() { np.log.Info(logs.FrostFSNonAlphabetModeIgnoreDeposit) - return + return true } prm := balance.MintPrm{} @@ -49,7 +49,7 @@ func (np *Processor) processDeposit(deposit frostfsEvent.Deposit) { zap.Uint64("last_emission", val), zap.Uint64("current_epoch", curEpoch)) - return + return false } // get gas balance of the node @@ -57,7 +57,7 @@ func (np *Processor) processDeposit(deposit frostfsEvent.Deposit) { balance, err := np.morphClient.GasBalance() if err != nil { np.log.Error(logs.FrostFSCantGetGasBalanceOfTheNode, zap.Error(err)) - return + return false } if balance < np.gasBalanceThreshold { @@ -65,7 +65,7 @@ func (np *Processor) processDeposit(deposit frostfsEvent.Deposit) { zap.Int64("balance", balance), zap.Int64("threshold", np.gasBalanceThreshold)) - return + return false } err = np.morphClient.TransferGas(receiver, np.mintEmitValue) @@ -73,24 +73,26 @@ func (np *Processor) processDeposit(deposit frostfsEvent.Deposit) { np.log.Error(logs.FrostFSCantTransferNativeGasToReceiver, zap.String("error", err.Error())) - return + return false } np.mintEmitCache.Add(receiver.String(), curEpoch) + + return true } // Process withdraw event by locking assets in the balance account. -func (np *Processor) processWithdraw(withdraw frostfsEvent.Withdraw) { +func (np *Processor) processWithdraw(withdraw frostfsEvent.Withdraw) bool { if !np.alphabetState.IsAlphabet() { np.log.Info(logs.FrostFSNonAlphabetModeIgnoreWithdraw) - return + return true } // create lock account lock, err := util.Uint160DecodeBytesBE(withdraw.ID()[:util.Uint160Size]) if err != nil { np.log.Error(logs.FrostFSCantCreateLockAccount, zap.Error(err)) - return + return false } curEpoch := np.epochState.EpochCounter() @@ -106,15 +108,18 @@ func (np *Processor) processWithdraw(withdraw frostfsEvent.Withdraw) { err = np.balanceClient.Lock(prm) if err != nil { np.log.Error(logs.FrostFSCantLockAssetsForWithdraw, zap.Error(err)) + return false } + + return true } // Process cheque event by transferring assets from the lock account back to // the reserve account. -func (np *Processor) processCheque(cheque frostfsEvent.Cheque) { +func (np *Processor) processCheque(cheque frostfsEvent.Cheque) bool { if !np.alphabetState.IsAlphabet() { np.log.Info(logs.FrostFSNonAlphabetModeIgnoreCheque) - return + return true } prm := balance.BurnPrm{} @@ -126,5 +131,8 @@ func (np *Processor) processCheque(cheque frostfsEvent.Cheque) { err := np.balanceClient.Burn(prm) if err != nil { np.log.Error(logs.FrostFSCantTransferAssetsToFedContract, zap.Error(err)) + return false } + + return true } diff --git a/pkg/innerring/processors/frostfs/process_bind.go b/pkg/innerring/processors/frostfs/process_bind.go index a9b523a7..50c6bf5f 100644 --- a/pkg/innerring/processors/frostfs/process_bind.go +++ b/pkg/innerring/processors/frostfs/process_bind.go @@ -18,10 +18,10 @@ type bindCommon interface { TxHash() util.Uint256 } -func (np *Processor) processBind(e bindCommon, bind bool) { +func (np *Processor) processBind(e bindCommon, bind bool) bool { if !np.alphabetState.IsAlphabet() { np.log.Info(logs.FrostFSNonAlphabetModeIgnoreBind) - return + return true } c := &bindCommonContext{ @@ -36,10 +36,10 @@ func (np *Processor) processBind(e bindCommon, bind bool) { zap.String("error", err.Error()), ) - return + return false } - np.approveBindCommon(c) + return np.approveBindCommon(c) == nil } type bindCommonContext struct { @@ -70,7 +70,7 @@ func (np *Processor) checkBindCommon(e *bindCommonContext) error { return nil } -func (np *Processor) approveBindCommon(e *bindCommonContext) { +func (np *Processor) approveBindCommon(e *bindCommonContext) error { // calculate wallet address scriptHash := e.User() @@ -80,7 +80,7 @@ func (np *Processor) approveBindCommon(e *bindCommonContext) { zap.String("error", err.Error()), ) - return + return err } var id user.ID @@ -104,4 +104,6 @@ func (np *Processor) approveBindCommon(e *bindCommonContext) { np.log.Error(fmt.Sprintf("could not approve %s", typ), zap.String("error", err.Error())) } + + return err } diff --git a/pkg/innerring/processors/frostfs/process_config.go b/pkg/innerring/processors/frostfs/process_config.go index ce2dabfd..2ae3e6ce 100644 --- a/pkg/innerring/processors/frostfs/process_config.go +++ b/pkg/innerring/processors/frostfs/process_config.go @@ -9,10 +9,10 @@ import ( // Process config event by setting configuration value from the mainchain in // the sidechain. -func (np *Processor) processConfig(config frostfsEvent.Config) { +func (np *Processor) processConfig(config frostfsEvent.Config) bool { if !np.alphabetState.IsAlphabet() { np.log.Info(logs.FrostFSNonAlphabetModeIgnoreConfig) - return + return true } prm := nmClient.SetConfigPrm{} @@ -25,5 +25,8 @@ func (np *Processor) processConfig(config frostfsEvent.Config) { err := np.netmapClient.SetConfig(prm) if err != nil { np.log.Error(logs.FrostFSCantRelaySetConfigEvent, zap.Error(err)) + return false } + + return true } diff --git a/pkg/innerring/processors/frostfs/processor.go b/pkg/innerring/processors/frostfs/processor.go index 2af15a81..e6ee7883 100644 --- a/pkg/innerring/processors/frostfs/processor.go +++ b/pkg/innerring/processors/frostfs/processor.go @@ -6,6 +6,7 @@ import ( "sync" "git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs" + "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/innerring/metrics" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/client/balance" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/client/frostfsid" nmClient "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/client/netmap" @@ -58,6 +59,7 @@ type ( // Processor of events produced by frostfs contract in main net. Processor struct { log *logger.Logger + metrics metrics.Register pool *ants.Pool frostfsContract util.Uint160 balanceClient BalanceClient @@ -77,6 +79,7 @@ type ( // Params of the processor constructor. Params struct { Log *logger.Logger + Metrics metrics.Register PoolSize int FrostFSContract util.Uint160 FrostFSIDClient IDClient @@ -129,8 +132,14 @@ func New(p *Params) (*Processor, error) { return nil, fmt.Errorf("ir/frostfs: can't create LRU cache for gas emission: %w", err) } + metricsRegister := p.Metrics + if metricsRegister == nil { + metricsRegister = metrics.DefaultRegister{} + } + return &Processor{ log: p.Log, + metrics: metricsRegister, pool: pool, frostfsContract: p.FrostFSContract, balanceClient: p.BalanceClient, diff --git a/pkg/innerring/processors/governance/handlers.go b/pkg/innerring/processors/governance/handlers.go index 727acc21..fd7f539c 100644 --- a/pkg/innerring/processors/governance/handlers.go +++ b/pkg/innerring/processors/governance/handlers.go @@ -2,6 +2,7 @@ package governance import ( "git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs" + "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/innerring/processors" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/event" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/event/rolemanagement" "github.com/nspcc-dev/neo-go/pkg/core/native" @@ -35,7 +36,9 @@ func (gp *Processor) HandleAlphabetSync(e event.Event) { // send event to the worker pool - err := gp.pool.Submit(func() { gp.processAlphabetSync(hash) }) + err := processors.SubmitEvent(gp.pool, gp.metrics, "alphabet_sync", func() bool { + return gp.processAlphabetSync(hash) + }) if err != nil { // there system can be moved into controlled degradation stage gp.log.Warn(logs.GovernanceGovernanceWorkerPoolDrained, diff --git a/pkg/innerring/processors/governance/process_update.go b/pkg/innerring/processors/governance/process_update.go index 3eae676d..50ba58e7 100644 --- a/pkg/innerring/processors/governance/process_update.go +++ b/pkg/innerring/processors/governance/process_update.go @@ -18,36 +18,36 @@ const ( alphabetUpdateIDPrefix = "AlphabetUpdate" ) -func (gp *Processor) processAlphabetSync(txHash util.Uint256) { +func (gp *Processor) processAlphabetSync(txHash util.Uint256) bool { if !gp.alphabetState.IsAlphabet() { gp.log.Info(logs.GovernanceNonAlphabetModeIgnoreAlphabetSync) - return + return true } mainnetAlphabet, err := gp.mainnetClient.NeoFSAlphabetList() if err != nil { gp.log.Error(logs.GovernanceCantFetchAlphabetListFromMainNet, zap.String("error", err.Error())) - return + return false } sidechainAlphabet, err := gp.morphClient.Committee() if err != nil { gp.log.Error(logs.GovernanceCantFetchAlphabetListFromSideChain, zap.String("error", err.Error())) - return + return false } newAlphabet, err := newAlphabetList(sidechainAlphabet, mainnetAlphabet) if err != nil { gp.log.Error(logs.GovernanceCantMergeAlphabetListsFromMainNetAndSideChain, zap.String("error", err.Error())) - return + return false } if newAlphabet == nil { gp.log.Info(logs.GovernanceNoGovernanceUpdateAlphabetListHasNotBeenChanged) - return + return true } gp.log.Info(logs.GovernanceAlphabetListHasBeenChangedStartingUpdate, @@ -77,6 +77,8 @@ func (gp *Processor) processAlphabetSync(txHash util.Uint256) { gp.updateFrostFSContractInMainnet(newAlphabet) gp.log.Info(logs.GovernanceFinishedAlphabetListUpdate) + + return true } func prettyKeys(keys keys.PublicKeys) string { diff --git a/pkg/innerring/processors/governance/processor.go b/pkg/innerring/processors/governance/processor.go index 07b5b5ce..fa267ead 100644 --- a/pkg/innerring/processors/governance/processor.go +++ b/pkg/innerring/processors/governance/processor.go @@ -4,6 +4,7 @@ import ( "errors" "fmt" + "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/innerring/metrics" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/client" frostfscontract "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/client/frostfs" nmClient "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/client/netmap" @@ -75,6 +76,7 @@ type ( // Processor of events related to governance in the network. Processor struct { log *logger.Logger + metrics metrics.Register pool *ants.Pool frostfsClient FrostFSClient netmapClient NetmapClient @@ -92,7 +94,8 @@ type ( // Params of the processor constructor. Params struct { - Log *logger.Logger + Log *logger.Logger + Metrics metrics.Register AlphabetState AlphabetState EpochState EpochState @@ -130,11 +133,17 @@ func New(p *Params) (*Processor, error) { return nil, fmt.Errorf("ir/governance: can't create worker pool: %w", err) } + metricsRegister := p.Metrics + if metricsRegister == nil { + metricsRegister = metrics.DefaultRegister{} + } + // result is cached by neo-go, so we can pre-calc it designate := p.MainnetClient.GetDesignateHash() return &Processor{ log: p.Log, + metrics: metricsRegister, pool: pool, frostfsClient: p.FrostFSClient, netmapClient: p.NetmapClient, diff --git a/pkg/innerring/processors/netmap/handlers.go b/pkg/innerring/processors/netmap/handlers.go index 6adeac56..c6053e28 100644 --- a/pkg/innerring/processors/netmap/handlers.go +++ b/pkg/innerring/processors/netmap/handlers.go @@ -4,6 +4,7 @@ import ( "encoding/hex" "git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs" + "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/innerring/processors" timerEvent "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/innerring/timers" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/event" netmapEvent "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/event/netmap" @@ -16,7 +17,7 @@ func (np *Processor) HandleNewEpochTick(ev event.Event) { // send an event to the worker pool - err := np.pool.Submit(func() { np.processNewEpochTick() }) + err := processors.SubmitEvent(np.pool, np.metrics, "netmap_new_epoch_tick", np.processNewEpochTick) if err != nil { // there system can be moved into controlled degradation stage np.log.Warn(logs.NetmapNetmapWorkerPoolDrained, @@ -32,8 +33,8 @@ func (np *Processor) handleNewEpoch(ev event.Event) { // send an event to the worker pool - err := np.pool.Submit(func() { - np.processNewEpoch(epochEvent) + err := processors.SubmitEvent(np.pool, np.metrics, "netmap_new_epoch", func() bool { + return np.processNewEpoch(epochEvent) }) if err != nil { // there system can be moved into controlled degradation stage @@ -51,8 +52,8 @@ func (np *Processor) handleAddPeer(ev event.Event) { // send an event to the worker pool - err := np.pool.Submit(func() { - np.processAddPeer(newPeer) + err := processors.SubmitEvent(np.pool, np.metrics, "netmap_add_peer", func() bool { + return np.processAddPeer(newPeer) }) if err != nil { // there system can be moved into controlled degradation stage @@ -69,8 +70,8 @@ func (np *Processor) handleUpdateState(ev event.Event) { // send event to the worker pool - err := np.pool.Submit(func() { - np.processUpdatePeer(updPeer) + err := processors.SubmitEvent(np.pool, np.metrics, "netmap_update_peer", func() bool { + return np.processUpdatePeer(updPeer) }) if err != nil { // there system can be moved into controlled degradation stage @@ -91,8 +92,8 @@ func (np *Processor) handleCleanupTick(ev event.Event) { np.log.Info(logs.NetmapTick, zap.String("type", "netmap cleaner")) // send event to the worker pool - err := np.pool.Submit(func() { - np.processNetmapCleanupTick(cleanup) + err := processors.SubmitEvent(np.pool, np.metrics, "netmap_cleanup_tick", func() bool { + return np.processNetmapCleanupTick(cleanup) }) if err != nil { // there system can be moved into controlled degradation stage diff --git a/pkg/innerring/processors/netmap/process_cleanup.go b/pkg/innerring/processors/netmap/process_cleanup.go index 287844a6..170c39e2 100644 --- a/pkg/innerring/processors/netmap/process_cleanup.go +++ b/pkg/innerring/processors/netmap/process_cleanup.go @@ -7,11 +7,11 @@ import ( "go.uber.org/zap" ) -func (np *Processor) processNetmapCleanupTick(ev netmapCleanupTick) { +func (np *Processor) processNetmapCleanupTick(ev netmapCleanupTick) bool { if !np.alphabetState.IsAlphabet() { np.log.Info(logs.NetmapNonAlphabetModeIgnoreNewNetmapCleanupTick) - return + return true } err := np.netmapSnapshot.forEachRemoveCandidate(ev.epoch, func(s string) error { @@ -47,5 +47,8 @@ func (np *Processor) processNetmapCleanupTick(ev netmapCleanupTick) { if err != nil { np.log.Warn(logs.NetmapCantIterateOnNetmapCleanerCache, zap.String("error", err.Error())) + return false } + + return true } diff --git a/pkg/innerring/processors/netmap/process_epoch.go b/pkg/innerring/processors/netmap/process_epoch.go index b655db9a..01bfbae6 100644 --- a/pkg/innerring/processors/netmap/process_epoch.go +++ b/pkg/innerring/processors/netmap/process_epoch.go @@ -10,7 +10,7 @@ import ( // Process new epoch notification by setting global epoch value and resetting // local epoch timer. -func (np *Processor) processNewEpoch(ev netmapEvent.NewEpoch) { +func (np *Processor) processNewEpoch(ev netmapEvent.NewEpoch) bool { epoch := ev.EpochNumber() epochDuration, err := np.netmapClient.EpochDuration() @@ -41,7 +41,7 @@ func (np *Processor) processNewEpoch(ev netmapEvent.NewEpoch) { np.log.Warn(logs.NetmapCantGetNetmapSnapshotToPerformCleanup, zap.String("error", err.Error())) - return + return false } prm := cntClient.StartEstimationPrm{} @@ -63,13 +63,15 @@ func (np *Processor) processNewEpoch(ev netmapEvent.NewEpoch) { np.handleCleanupTick(netmapCleanupTick{epoch: epoch, txHash: ev.TxHash()}) np.handleAlphabetSync(governance.NewSyncEvent(ev.TxHash())) np.handleNotaryDeposit(ev) + + return true } // Process new epoch tick by invoking new epoch method in network map contract. -func (np *Processor) processNewEpochTick() { +func (np *Processor) processNewEpochTick() bool { if !np.alphabetState.IsAlphabet() { np.log.Info(logs.NetmapNonAlphabetModeIgnoreNewEpochTick) - return + return true } nextEpoch := np.epochState.EpochCounter() + 1 @@ -78,5 +80,8 @@ func (np *Processor) processNewEpochTick() { err := np.netmapClient.NewEpoch(nextEpoch, false) if err != nil { np.log.Error(logs.NetmapCantInvokeNetmapNewEpoch, zap.Error(err)) + return false } + + return true } diff --git a/pkg/innerring/processors/netmap/process_peers.go b/pkg/innerring/processors/netmap/process_peers.go index e4f1a4d6..96b8c8e9 100644 --- a/pkg/innerring/processors/netmap/process_peers.go +++ b/pkg/innerring/processors/netmap/process_peers.go @@ -12,10 +12,10 @@ import ( // Process add peer notification by sanity check of new node // local epoch timer. -func (np *Processor) processAddPeer(ev netmapEvent.AddPeer) { +func (np *Processor) processAddPeer(ev netmapEvent.AddPeer) bool { if !np.alphabetState.IsAlphabet() { np.log.Info(logs.NetmapNonAlphabetModeIgnoreNewPeerNotification) - return + return true } // check if notary transaction is valid, see #976 @@ -26,7 +26,7 @@ func (np *Processor) processAddPeer(ev netmapEvent.AddPeer) { zap.String("method", "netmap.AddPeer"), zap.String("hash", tx.Hash().StringLE()), zap.Error(err)) - return + return false } // unmarshal node info @@ -34,7 +34,7 @@ func (np *Processor) processAddPeer(ev netmapEvent.AddPeer) { if err := nodeInfo.Unmarshal(ev.Node()); err != nil { // it will be nice to have tx id at event structure to log it np.log.Warn(logs.NetmapCantParseNetworkMapCandidate) - return + return false } // validate and update node info @@ -44,7 +44,7 @@ func (np *Processor) processAddPeer(ev netmapEvent.AddPeer) { zap.String("error", err.Error()), ) - return + return false } // sort attributes to make it consistent @@ -81,15 +81,18 @@ func (np *Processor) processAddPeer(ev netmapEvent.AddPeer) { if err != nil { np.log.Error(logs.NetmapCantInvokeNetmapAddPeer, zap.Error(err)) + return false } } + + return true } // Process update peer notification by sending approval tx to the smart contract. -func (np *Processor) processUpdatePeer(ev netmapEvent.UpdatePeer) { +func (np *Processor) processUpdatePeer(ev netmapEvent.UpdatePeer) bool { if !np.alphabetState.IsAlphabet() { np.log.Info(logs.NetmapNonAlphabetModeIgnoreUpdatePeerNotification) - return + return true } // flag node to remove from local view, so it can be re-bootstrapped @@ -105,11 +108,14 @@ func (np *Processor) processUpdatePeer(ev netmapEvent.UpdatePeer) { zap.Error(err), ) - return + return false } } if err = np.netmapClient.MorphNotarySignAndInvokeTX(ev.NotaryRequest().MainTransaction); err != nil { np.log.Error(logs.NetmapCantInvokeNetmapUpdatePeer, zap.Error(err)) + return false } + + return true } diff --git a/pkg/innerring/processors/netmap/processor.go b/pkg/innerring/processors/netmap/processor.go index 5984cbbe..6b8a24a6 100644 --- a/pkg/innerring/processors/netmap/processor.go +++ b/pkg/innerring/processors/netmap/processor.go @@ -5,6 +5,7 @@ import ( "fmt" "git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs" + "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/innerring/metrics" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/innerring/processors/netmap/nodevalidation/state" cntClient "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/client/container" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/event" @@ -72,6 +73,7 @@ type ( // and new epoch ticker, because it is related to contract. Processor struct { log *logger.Logger + metrics metrics.Register pool *ants.Pool epochTimer EpochTimerReseter epochState EpochState @@ -93,6 +95,7 @@ type ( // Params of the processor constructor. Params struct { Log *logger.Logger + Metrics metrics.Register PoolSize int NetmapClient Client EpochTimer EpochTimerReseter @@ -145,8 +148,14 @@ func New(p *Params) (*Processor, error) { return nil, fmt.Errorf("ir/netmap: can't create worker pool: %w", err) } + metricsRegister := p.Metrics + if metricsRegister == nil { + metricsRegister = metrics.DefaultRegister{} + } + return &Processor{ log: p.Log, + metrics: metricsRegister, pool: pool, epochTimer: p.EpochTimer, epochState: p.EpochState, diff --git a/pkg/innerring/processors/util.go b/pkg/innerring/processors/util.go new file mode 100644 index 00000000..364ffe25 --- /dev/null +++ b/pkg/innerring/processors/util.go @@ -0,0 +1,16 @@ +package processors + +import ( + "time" + + "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/innerring/metrics" + "github.com/panjf2000/ants/v2" +) + +func SubmitEvent(pool *ants.Pool, metrics metrics.Register, eventLabel string, eventProcessor func() bool) error { + return pool.Submit(func() { + start := time.Now() + success := eventProcessor() + metrics.AddEvent(time.Since(start), eventLabel, success) + }) +} diff --git a/pkg/metrics/innerring.go b/pkg/metrics/innerring.go index 79db424b..bff9184e 100644 --- a/pkg/metrics/innerring.go +++ b/pkg/metrics/innerring.go @@ -1,13 +1,23 @@ package metrics -import "github.com/prometheus/client_golang/prometheus" +import ( + "strconv" + "time" -const innerRingSubsystem = "ir" + "github.com/prometheus/client_golang/prometheus" +) + +const ( + innerRingSubsystem = "ir" + innerRingLabelSuccess = "success" + innerRingLabelType = "type" +) // InnerRingServiceMetrics contains metrics collected by inner ring. type InnerRingServiceMetrics struct { - epoch metric[prometheus.Gauge] - health metric[prometheus.Gauge] + epoch metric[prometheus.Gauge] + health metric[prometheus.Gauge] + eventDuration metric[*prometheus.HistogramVec] } // NewInnerRingMetrics returns new instance of metrics collectors for inner ring. @@ -25,14 +35,22 @@ func NewInnerRingMetrics() *InnerRingServiceMetrics { Name: "health", Help: "Current inner-ring node state.", }) + eventDuration = newHistogramVec(prometheus.HistogramOpts{ + Namespace: namespace, + Subsystem: innerRingSubsystem, + Name: "event_duration_seconds", + Help: "Duration of processing of inner-ring events", + }, []string{innerRingLabelType, innerRingLabelSuccess}) ) mustRegister(epoch) mustRegister(health) + mustRegister(eventDuration) return &InnerRingServiceMetrics{ - epoch: epoch, - health: health, + epoch: epoch, + health: health, + eventDuration: eventDuration, } } @@ -45,3 +63,10 @@ func (m InnerRingServiceMetrics) SetEpoch(epoch uint64) { func (m InnerRingServiceMetrics) SetHealth(s int32) { m.health.value.Set(float64(s)) } + +func (m InnerRingServiceMetrics) AddEvent(d time.Duration, typ string, success bool) { + m.eventDuration.value.With(prometheus.Labels{ + innerRingLabelType: typ, + innerRingLabelSuccess: strconv.FormatBool(success), + }).Observe(d.Seconds()) +}