[#374] Add inner-ring event metrics #405

Merged
fyrchik merged 1 commit from ale64bit/frostfs-node:feature/374-inner-ring-metrics into master 2023-05-31 10:11:49 +00:00
27 changed files with 287 additions and 113 deletions

View file

@ -52,6 +52,7 @@ func (s *Server) initNetmapProcessor(cfg *viper.Viper,
s.netmapProcessor, err = netmap.New(&netmap.Params{ s.netmapProcessor, err = netmap.New(&netmap.Params{
Log: s.log, Log: s.log,
Metrics: s.metrics,
PoolSize: cfg.GetInt("workers.netmap"), PoolSize: cfg.GetInt("workers.netmap"),
NetmapClient: netmap.NewNetmapClient(s.netmapClient), NetmapClient: netmap.NewNetmapClient(s.netmapClient),
EpochTimer: s, EpochTimer: s,
@ -162,6 +163,7 @@ func (s *Server) createAlphaSync(cfg *viper.Viper, frostfsCli *frostfsClient.Cli
// create governance processor // create governance processor
governanceProcessor, err := governance.New(&governance.Params{ governanceProcessor, err := governance.New(&governance.Params{
Log: s.log, Log: s.log,
Metrics: s.metrics,
FrostFSClient: frostfsCli, FrostFSClient: frostfsCli,
NetmapClient: s.netmapClient, NetmapClient: s.netmapClient,
AlphabetState: s, AlphabetState: s,
@ -231,6 +233,7 @@ func (s *Server) initAlphabetProcessor(cfg *viper.Viper) error {
s.alphabetProcessor, err = alphabet.New(&alphabet.Params{ s.alphabetProcessor, err = alphabet.New(&alphabet.Params{
ParsedWallets: parsedWallets, ParsedWallets: parsedWallets,
Log: s.log, Log: s.log,
Metrics: s.metrics,
PoolSize: cfg.GetInt("workers.alphabet"), PoolSize: cfg.GetInt("workers.alphabet"),
AlphabetContracts: s.contracts.alphabet, AlphabetContracts: s.contracts.alphabet,
NetmapClient: s.netmapClient, NetmapClient: s.netmapClient,
@ -255,6 +258,7 @@ func (s *Server) initContainerProcessor(cfg *viper.Viper, cnrClient *container.C
// container processor // container processor
containerProcessor, err := cont.New(&cont.Params{ containerProcessor, err := cont.New(&cont.Params{
Log: s.log, Log: s.log,
Metrics: s.metrics,
PoolSize: cfg.GetInt("workers.container"), PoolSize: cfg.GetInt("workers.container"),
AlphabetState: s, AlphabetState: s,
ContainerClient: cnrClient, ContainerClient: cnrClient,
@ -273,6 +277,7 @@ func (s *Server) initBalanceProcessor(cfg *viper.Viper, frostfsCli *frostfsClien
// create balance processor // create balance processor
balanceProcessor, err := balance.New(&balance.Params{ balanceProcessor, err := balance.New(&balance.Params{
Log: s.log, Log: s.log,
Metrics: s.metrics,
PoolSize: cfg.GetInt("workers.balance"), PoolSize: cfg.GetInt("workers.balance"),
FrostFSClient: frostfsCli, FrostFSClient: frostfsCli,
BalanceSC: s.contracts.balance, BalanceSC: s.contracts.balance,
@ -293,6 +298,7 @@ func (s *Server) initFrostFSMainnetProcessor(cfg *viper.Viper, frostfsIDClient *
frostfsProcessor, err := frostfs.New(&frostfs.Params{ frostfsProcessor, err := frostfs.New(&frostfs.Params{
Log: s.log, Log: s.log,
Metrics: s.metrics,
PoolSize: cfg.GetInt("workers.frostfs"), PoolSize: cfg.GetInt("workers.frostfs"),
FrostFSContract: s.contracts.frostfs, FrostFSContract: s.contracts.frostfs,
FrostFSIDClient: frostfsIDClient, FrostFSIDClient: frostfsIDClient,

View file

@ -0,0 +1,15 @@
package metrics
import "time"
type Register interface {
SetEpoch(epoch uint64)
SetHealth(s int32)
AddEvent(d time.Duration, typ string, success bool)
}
type DefaultRegister struct{}
func (DefaultRegister) SetEpoch(uint64) {}
func (DefaultRegister) SetHealth(int32) {}
func (DefaultRegister) AddEvent(time.Duration, string, bool) {}

View file

@ -2,6 +2,7 @@ package alphabet
import ( import (
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs" "git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/innerring/processors"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/innerring/timers" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/innerring/timers"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/event" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/event"
"go.uber.org/zap" "go.uber.org/zap"
@ -13,7 +14,7 @@ func (ap *Processor) HandleGasEmission(ev event.Event) {
// send event to the worker pool // send event to the worker pool
err := ap.pool.Submit(func() { ap.processEmit() }) err := processors.SubmitEvent(ap.pool, ap.metrics, "alphabet_emit_gas", ap.processEmit)
if err != nil { if err != nil {
// there system can be moved into controlled degradation stage // there system can be moved into controlled degradation stage
ap.log.Warn(logs.AlphabetAlphabetProcessorWorkerPoolDrained, ap.log.Warn(logs.AlphabetAlphabetProcessorWorkerPoolDrained,

View file

@ -13,12 +13,12 @@ import (
const emitMethod = "emit" const emitMethod = "emit"
func (ap *Processor) processEmit() { func (ap *Processor) processEmit() bool {
index := ap.irList.AlphabetIndex() index := ap.irList.AlphabetIndex()
if index < 0 { if index < 0 {
ap.log.Info(logs.AlphabetNonAlphabetModeIgnoreGasEmissionEvent) ap.log.Info(logs.AlphabetNonAlphabetModeIgnoreGasEmissionEvent)
return return true
} }
contract, ok := ap.alphabetContracts.GetByIndex(index) contract, ok := ap.alphabetContracts.GetByIndex(index)
@ -26,7 +26,7 @@ func (ap *Processor) processEmit() {
ap.log.Debug(logs.AlphabetNodeIsOutOfAlphabetRangeIgnoreGasEmissionEvent, ap.log.Debug(logs.AlphabetNodeIsOutOfAlphabetRangeIgnoreGasEmissionEvent,
zap.Int("index", index)) zap.Int("index", index))
return return false
} }
// there is no signature collecting, so we don't need extra fee // there is no signature collecting, so we don't need extra fee
@ -34,13 +34,13 @@ func (ap *Processor) processEmit() {
if err != nil { if err != nil {
ap.log.Warn(logs.AlphabetCantInvokeAlphabetEmitMethod, zap.String("error", err.Error())) ap.log.Warn(logs.AlphabetCantInvokeAlphabetEmitMethod, zap.String("error", err.Error()))
return return false
} }
if ap.storageEmission == 0 { if ap.storageEmission == 0 {
ap.log.Info(logs.AlphabetStorageNodeEmissionIsOff) ap.log.Info(logs.AlphabetStorageNodeEmissionIsOff)
return return true
} }
networkMap, err := ap.netmapClient.NetMap() networkMap, err := ap.netmapClient.NetMap()
@ -48,7 +48,7 @@ func (ap *Processor) processEmit() {
ap.log.Warn(logs.AlphabetCantGetNetmapSnapshotToEmitGasToStorageNodes, ap.log.Warn(logs.AlphabetCantGetNetmapSnapshotToEmitGasToStorageNodes,
zap.String("error", err.Error())) zap.String("error", err.Error()))
return return false
} }
nmNodes := networkMap.Nodes() nmNodes := networkMap.Nodes()
@ -63,7 +63,7 @@ func (ap *Processor) processEmit() {
zap.Int("extra_wallets", extraLen)) zap.Int("extra_wallets", extraLen))
if nmLen+extraLen == 0 { if nmLen+extraLen == 0 {
return return true
} }
gasPerNode := fixedn.Fixed8(ap.storageEmission / uint64(nmLen+extraLen)) gasPerNode := fixedn.Fixed8(ap.storageEmission / uint64(nmLen+extraLen))
@ -71,6 +71,8 @@ func (ap *Processor) processEmit() {
ap.transferGasToNetmapNodes(nmNodes, gasPerNode) ap.transferGasToNetmapNodes(nmNodes, gasPerNode)
ap.transferGasToExtraNodes(pw, gasPerNode) ap.transferGasToExtraNodes(pw, gasPerNode)
return true
} }
func (ap *Processor) transferGasToNetmapNodes(nmNodes []netmap.NodeInfo, gasPerNode fixedn.Fixed8) { func (ap *Processor) transferGasToNetmapNodes(nmNodes []netmap.NodeInfo, gasPerNode fixedn.Fixed8) {

View file

@ -7,6 +7,7 @@ import (
"time" "time"
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs" "git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/innerring/metrics"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/event" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/event"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap" "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap"
@ -50,6 +51,7 @@ type (
// protects parsedWallets from concurrent change // protects parsedWallets from concurrent change
pwLock *sync.RWMutex pwLock *sync.RWMutex
log *logger.Logger log *logger.Logger
metrics metrics.Register
pool *ants.Pool pool *ants.Pool
alphabetContracts Contracts alphabetContracts Contracts
netmapClient netmapClient netmapClient netmapClient
@ -62,6 +64,7 @@ type (
Params struct { Params struct {
ParsedWallets []util.Uint160 ParsedWallets []util.Uint160
Log *logger.Logger Log *logger.Logger
Metrics metrics.Register
PoolSize int PoolSize int
AlphabetContracts Contracts AlphabetContracts Contracts
NetmapClient netmapClient NetmapClient netmapClient
@ -89,10 +92,16 @@ func New(p *Params) (*Processor, error) {
return nil, fmt.Errorf("ir/frostfs: can't create worker pool: %w", err) return nil, fmt.Errorf("ir/frostfs: can't create worker pool: %w", err)
} }
metricsRegister := p.Metrics
if metricsRegister == nil {
metricsRegister = metrics.DefaultRegister{}
}
return &Processor{ return &Processor{
parsedWallets: p.ParsedWallets, parsedWallets: p.ParsedWallets,
pwLock: new(sync.RWMutex), pwLock: new(sync.RWMutex),
log: p.Log, log: p.Log,
metrics: metricsRegister,
pool: pool, pool: pool,
alphabetContracts: p.AlphabetContracts, alphabetContracts: p.AlphabetContracts,
netmapClient: p.NetmapClient, netmapClient: p.NetmapClient,

View file

@ -4,6 +4,7 @@ import (
"encoding/hex" "encoding/hex"
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs" "git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/innerring/processors"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/event" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/event"
balanceEvent "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/event/balance" balanceEvent "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/event/balance"
"go.uber.org/zap" "go.uber.org/zap"
@ -17,7 +18,9 @@ func (bp *Processor) handleLock(ev event.Event) {
// send an event to the worker pool // send an event to the worker pool
err := bp.pool.Submit(func() { bp.processLock(&lock) }) err := processors.SubmitEvent(bp.pool, bp.metrics, "lock", func() bool {
return bp.processLock(&lock)
})
if err != nil { if err != nil {
// there system can be moved into controlled degradation stage // there system can be moved into controlled degradation stage
bp.log.Warn(logs.BalanceBalanceWorkerPoolDrained, bp.log.Warn(logs.BalanceBalanceWorkerPoolDrained,

View file

@ -9,10 +9,10 @@ import (
// Process lock event by invoking Cheque method in main net to send assets // Process lock event by invoking Cheque method in main net to send assets
// back to the withdraw issuer. // back to the withdraw issuer.
func (bp *Processor) processLock(lock *balanceEvent.Lock) { func (bp *Processor) processLock(lock *balanceEvent.Lock) bool {
if !bp.alphabetState.IsAlphabet() { if !bp.alphabetState.IsAlphabet() {
bp.log.Info(logs.BalanceNonAlphabetModeIgnoreBalanceLock) bp.log.Info(logs.BalanceNonAlphabetModeIgnoreBalanceLock)
return return true
} }
prm := frostfsContract.ChequePrm{} prm := frostfsContract.ChequePrm{}
@ -26,5 +26,8 @@ func (bp *Processor) processLock(lock *balanceEvent.Lock) {
err := bp.frostfsClient.Cheque(prm) err := bp.frostfsClient.Cheque(prm)
if err != nil { if err != nil {
bp.log.Error(logs.BalanceCantSendLockAssetTx, zap.Error(err)) bp.log.Error(logs.BalanceCantSendLockAssetTx, zap.Error(err))
return false
} }
return true
} }

View file

@ -5,6 +5,7 @@ import (
"fmt" "fmt"
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs" "git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/innerring/metrics"
frostfscontract "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/client/frostfs" frostfscontract "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/client/frostfs"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/event" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/event"
balanceEvent "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/event/balance" balanceEvent "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/event/balance"
@ -32,6 +33,7 @@ type (
// Processor of events produced by balance contract in the morphchain. // Processor of events produced by balance contract in the morphchain.
Processor struct { Processor struct {
log *logger.Logger log *logger.Logger
metrics metrics.Register
pool *ants.Pool pool *ants.Pool
frostfsClient FrostFSClient frostfsClient FrostFSClient
balanceSC util.Uint160 balanceSC util.Uint160
@ -42,6 +44,7 @@ type (
// Params of the processor constructor. // Params of the processor constructor.
Params struct { Params struct {
Log *logger.Logger Log *logger.Logger
Metrics metrics.Register
PoolSize int PoolSize int
FrostFSClient FrostFSClient FrostFSClient FrostFSClient
BalanceSC util.Uint160 BalanceSC util.Uint160
@ -72,8 +75,14 @@ func New(p *Params) (*Processor, error) {
return nil, fmt.Errorf("ir/balance: can't create worker pool: %w", err) return nil, fmt.Errorf("ir/balance: can't create worker pool: %w", err)
} }
metricsRegister := p.Metrics
if metricsRegister == nil {
metricsRegister = metrics.DefaultRegister{}
}
return &Processor{ return &Processor{
log: p.Log, log: p.Log,
metrics: metricsRegister,
pool: pool, pool: pool,
frostfsClient: p.FrostFSClient, frostfsClient: p.FrostFSClient,
balanceSC: p.BalanceSC, balanceSC: p.BalanceSC,

View file

@ -4,6 +4,7 @@ import (
"crypto/sha256" "crypto/sha256"
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs" "git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/innerring/processors"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/event" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/event"
containerEvent "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/event/container" containerEvent "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/event/container"
"github.com/mr-tron/base58" "github.com/mr-tron/base58"
@ -20,7 +21,9 @@ func (cp *Processor) handlePut(ev event.Event) {
// send an event to the worker pool // send an event to the worker pool
err := cp.pool.Submit(func() { cp.processContainerPut(put) }) err := processors.SubmitEvent(cp.pool, cp.metrics, "container_put", func() bool {
return cp.processContainerPut(put)
})
if err != nil { if err != nil {
// there system can be moved into controlled degradation stage // there system can be moved into controlled degradation stage
cp.log.Warn(logs.ContainerContainerProcessorWorkerPoolDrained, cp.log.Warn(logs.ContainerContainerProcessorWorkerPoolDrained,
@ -36,7 +39,9 @@ func (cp *Processor) handleDelete(ev event.Event) {
// send an event to the worker pool // send an event to the worker pool
err := cp.pool.Submit(func() { cp.processContainerDelete(del) }) err := processors.SubmitEvent(cp.pool, cp.metrics, "container_delete", func() bool {
return cp.processContainerDelete(del)
})
if err != nil { if err != nil {
// there system can be moved into controlled degradation stage // there system can be moved into controlled degradation stage
cp.log.Warn(logs.ContainerContainerProcessorWorkerPoolDrained, cp.log.Warn(logs.ContainerContainerProcessorWorkerPoolDrained,
@ -53,8 +58,8 @@ func (cp *Processor) handleSetEACL(ev event.Event) {
// send an event to the worker pool // send an event to the worker pool
err := cp.pool.Submit(func() { err := processors.SubmitEvent(cp.pool, cp.metrics, "container_set_eacl", func() bool {
cp.processSetEACL(e) return cp.processSetEACL(e)
}) })
if err != nil { if err != nil {
// there system can be moved into controlled degradation stage // there system can be moved into controlled degradation stage

View file

@ -31,10 +31,10 @@ type putContainerContext struct {
// Process a new container from the user by checking the container sanity // Process a new container from the user by checking the container sanity
// and sending approve tx back to the morph. // and sending approve tx back to the morph.
func (cp *Processor) processContainerPut(put putEvent) { func (cp *Processor) processContainerPut(put putEvent) bool {
if !cp.alphabetState.IsAlphabet() { if !cp.alphabetState.IsAlphabet() {
cp.log.Info(logs.ContainerNonAlphabetModeIgnoreContainerPut) cp.log.Info(logs.ContainerNonAlphabetModeIgnoreContainerPut)
return return true
} }
ctx := &putContainerContext{ ctx := &putContainerContext{
@ -47,10 +47,17 @@ func (cp *Processor) processContainerPut(put putEvent) {
zap.String("error", err.Error()), zap.String("error", err.Error()),
) )
return return false
} }
cp.approvePutContainer(ctx) if err := cp.morphClient.NotarySignAndInvokeTX(ctx.e.NotaryRequest().MainTransaction); err != nil {
cp.log.Error(logs.ContainerCouldNotApprovePutContainer,
zap.String("error", err.Error()),
)
return false
}
return true
} }
func (cp *Processor) checkPutContainer(ctx *putContainerContext) error { func (cp *Processor) checkPutContainer(ctx *putContainerContext) error {
@ -89,20 +96,12 @@ func (cp *Processor) checkPutContainer(ctx *putContainerContext) error {
return nil return nil
} }
func (cp *Processor) approvePutContainer(ctx *putContainerContext) {
if err := cp.morphClient.NotarySignAndInvokeTX(ctx.e.NotaryRequest().MainTransaction); err != nil {
cp.log.Error(logs.ContainerCouldNotApprovePutContainer,
zap.String("error", err.Error()),
)
}
}
// Process delete container operation from the user by checking container sanity // Process delete container operation from the user by checking container sanity
// and sending approve tx back to morph. // and sending approve tx back to morph.
func (cp *Processor) processContainerDelete(e containerEvent.Delete) { func (cp *Processor) processContainerDelete(e containerEvent.Delete) bool {
if !cp.alphabetState.IsAlphabet() { if !cp.alphabetState.IsAlphabet() {
cp.log.Info(logs.ContainerNonAlphabetModeIgnoreContainerDelete) cp.log.Info(logs.ContainerNonAlphabetModeIgnoreContainerDelete)
return return true
} }
err := cp.checkDeleteContainer(e) err := cp.checkDeleteContainer(e)
@ -111,10 +110,18 @@ func (cp *Processor) processContainerDelete(e containerEvent.Delete) {
zap.String("error", err.Error()), zap.String("error", err.Error()),
) )
return return false
} }
cp.approveDeleteContainer(e) if err := cp.morphClient.NotarySignAndInvokeTX(e.NotaryRequest().MainTransaction); err != nil {
cp.log.Error(logs.ContainerCouldNotApproveDeleteContainer,
zap.String("error", err.Error()),
)
return false
}
return true
} }
func (cp *Processor) checkDeleteContainer(e containerEvent.Delete) error { func (cp *Processor) checkDeleteContainer(e containerEvent.Delete) error {
@ -149,14 +156,6 @@ func (cp *Processor) checkDeleteContainer(e containerEvent.Delete) error {
return nil return nil
} }
func (cp *Processor) approveDeleteContainer(e containerEvent.Delete) {
if err := cp.morphClient.NotarySignAndInvokeTX(e.NotaryRequest().MainTransaction); err != nil {
cp.log.Error(logs.ContainerCouldNotApproveDeleteContainer,
zap.String("error", err.Error()),
)
}
}
func checkNNS(ctx *putContainerContext, cnr containerSDK.Container) error { func checkNNS(ctx *putContainerContext, cnr containerSDK.Container) error {
// fetch domain info // fetch domain info
ctx.d = containerSDK.ReadDomain(cnr) ctx.d = containerSDK.ReadDomain(cnr)

View file

@ -12,10 +12,10 @@ import (
"go.uber.org/zap" "go.uber.org/zap"
) )
func (cp *Processor) processSetEACL(e containerEvent.SetEACL) { func (cp *Processor) processSetEACL(e containerEvent.SetEACL) bool {
if !cp.alphabetState.IsAlphabet() { if !cp.alphabetState.IsAlphabet() {
cp.log.Info(logs.ContainerNonAlphabetModeIgnoreSetEACL) cp.log.Info(logs.ContainerNonAlphabetModeIgnoreSetEACL)
return return true
} }
err := cp.checkSetEACL(e) err := cp.checkSetEACL(e)
@ -24,10 +24,17 @@ func (cp *Processor) processSetEACL(e containerEvent.SetEACL) {
zap.String("error", err.Error()), zap.String("error", err.Error()),
) )
return return false
} }
cp.approveSetEACL(e) if err := cp.morphClient.NotarySignAndInvokeTX(e.NotaryRequest().MainTransaction); err != nil {
cp.log.Error(logs.ContainerCouldNotApproveSetEACL,
zap.String("error", err.Error()),
)
return false
}
return true
} }
func (cp *Processor) checkSetEACL(e containerEvent.SetEACL) error { func (cp *Processor) checkSetEACL(e containerEvent.SetEACL) error {
@ -73,11 +80,3 @@ func (cp *Processor) checkSetEACL(e containerEvent.SetEACL) error {
return nil return nil
} }
func (cp *Processor) approveSetEACL(e containerEvent.SetEACL) {
if err := cp.morphClient.NotarySignAndInvokeTX(e.NotaryRequest().MainTransaction); err != nil {
cp.log.Error(logs.ContainerCouldNotApproveSetEACL,
zap.String("error", err.Error()),
)
}
}

View file

@ -6,6 +6,7 @@ import (
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs" "git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
containercore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/container" containercore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/container"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/innerring/metrics"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/client/frostfsid" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/client/frostfsid"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/event" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/event"
containerEvent "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/event/container" containerEvent "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/event/container"
@ -40,6 +41,7 @@ type (
// Processor of events produced by container contract in the sidechain. // Processor of events produced by container contract in the sidechain.
Processor struct { Processor struct {
log *logger.Logger log *logger.Logger
metrics metrics.Register
pool *ants.Pool pool *ants.Pool
alphabetState AlphabetState alphabetState AlphabetState
cnrClient ContClient // notary must be enabled cnrClient ContClient // notary must be enabled
@ -51,6 +53,7 @@ type (
// Params of the processor constructor. // Params of the processor constructor.
Params struct { Params struct {
Log *logger.Logger Log *logger.Logger
Metrics metrics.Register
PoolSize int PoolSize int
AlphabetState AlphabetState AlphabetState AlphabetState
ContainerClient ContClient ContainerClient ContClient
@ -102,8 +105,14 @@ func New(p *Params) (*Processor, error) {
return nil, fmt.Errorf("ir/container: can't create worker pool: %w", err) return nil, fmt.Errorf("ir/container: can't create worker pool: %w", err)
} }
metricsRegister := p.Metrics
if metricsRegister == nil {
metricsRegister = metrics.DefaultRegister{}
}
return &Processor{ return &Processor{
log: p.Log, log: p.Log,
metrics: metricsRegister,
pool: pool, pool: pool,
alphabetState: p.AlphabetState, alphabetState: p.AlphabetState,
cnrClient: p.ContainerClient, cnrClient: p.ContainerClient,

View file

@ -4,6 +4,7 @@ import (
"encoding/hex" "encoding/hex"
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs" "git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/innerring/processors"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/event" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/event"
frostfsEvent "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/event/frostfs" frostfsEvent "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/event/frostfs"
"github.com/nspcc-dev/neo-go/pkg/util/slice" "github.com/nspcc-dev/neo-go/pkg/util/slice"
@ -18,7 +19,9 @@ func (np *Processor) handleDeposit(ev event.Event) {
// send event to the worker pool // send event to the worker pool
err := np.pool.Submit(func() { np.processDeposit(deposit) }) err := processors.SubmitEvent(np.pool, np.metrics, "frostfs_deposit", func() bool {
return np.processDeposit(deposit)
})
if err != nil { if err != nil {
// there system can be moved into controlled degradation stage // there system can be moved into controlled degradation stage
np.log.Warn(logs.FrostFSFrostfsProcessorWorkerPoolDrained, np.log.Warn(logs.FrostFSFrostfsProcessorWorkerPoolDrained,
@ -34,7 +37,9 @@ func (np *Processor) handleWithdraw(ev event.Event) {
// send event to the worker pool // send event to the worker pool
err := np.pool.Submit(func() { np.processWithdraw(withdraw) }) err := processors.SubmitEvent(np.pool, np.metrics, "frostfs_withdraw", func() bool {
return np.processWithdraw(withdraw)
})
if err != nil { if err != nil {
// there system can be moved into controlled degradation stage // there system can be moved into controlled degradation stage
np.log.Warn(logs.FrostFSFrostfsProcessorWorkerPoolDrained, np.log.Warn(logs.FrostFSFrostfsProcessorWorkerPoolDrained,
@ -50,7 +55,9 @@ func (np *Processor) handleCheque(ev event.Event) {
// send event to the worker pool // send event to the worker pool
err := np.pool.Submit(func() { np.processCheque(cheque) }) err := processors.SubmitEvent(np.pool, np.metrics, "frostfs_cheque", func() bool {
return np.processCheque(cheque)
})
if err != nil { if err != nil {
// there system can be moved into controlled degradation stage // there system can be moved into controlled degradation stage
np.log.Warn(logs.FrostFSFrostfsProcessorWorkerPoolDrained, np.log.Warn(logs.FrostFSFrostfsProcessorWorkerPoolDrained,
@ -67,7 +74,9 @@ func (np *Processor) handleConfig(ev event.Event) {
// send event to the worker pool // send event to the worker pool
err := np.pool.Submit(func() { np.processConfig(cfg) }) err := processors.SubmitEvent(np.pool, np.metrics, "frostfs_config", func() bool {
return np.processConfig(cfg)
})
if err != nil { if err != nil {
// there system can be moved into controlled degradation stage // there system can be moved into controlled degradation stage
np.log.Warn(logs.FrostFSFrostfsProcessorWorkerPoolDrained, np.log.Warn(logs.FrostFSFrostfsProcessorWorkerPoolDrained,
@ -83,7 +92,9 @@ func (np *Processor) handleBind(ev event.Event) {
// send event to the worker pool // send event to the worker pool
err := np.pool.Submit(func() { np.processBind(e, true) }) err := processors.SubmitEvent(np.pool, np.metrics, "frostfs_bind", func() bool {
return np.processBind(e, true)
})
if err != nil { if err != nil {
// there system can be moved into controlled degradation stage // there system can be moved into controlled degradation stage
np.log.Warn(logs.FrostFSFrostfsProcessorWorkerPoolDrained, np.log.Warn(logs.FrostFSFrostfsProcessorWorkerPoolDrained,
@ -99,7 +110,9 @@ func (np *Processor) handleUnbind(ev event.Event) {
// send event to the worker pool // send event to the worker pool
err := np.pool.Submit(func() { np.processBind(e, false) }) err := processors.SubmitEvent(np.pool, np.metrics, "frostfs_unbind", func() bool {
return np.processBind(e, false)
})
if err != nil { if err != nil {
// there system can be moved into controlled degradation stage // there system can be moved into controlled degradation stage
np.log.Warn(logs.FrostFSFrostfsProcessorWorkerPoolDrained, np.log.Warn(logs.FrostFSFrostfsProcessorWorkerPoolDrained,

View file

@ -15,10 +15,10 @@ const (
// Process deposit event by invoking a balance contract and sending native // Process deposit event by invoking a balance contract and sending native
// gas in the sidechain. // gas in the sidechain.
func (np *Processor) processDeposit(deposit frostfsEvent.Deposit) { func (np *Processor) processDeposit(deposit frostfsEvent.Deposit) bool {
if !np.alphabetState.IsAlphabet() { if !np.alphabetState.IsAlphabet() {
np.log.Info(logs.FrostFSNonAlphabetModeIgnoreDeposit) np.log.Info(logs.FrostFSNonAlphabetModeIgnoreDeposit)
return return true
} }
prm := balance.MintPrm{} prm := balance.MintPrm{}
@ -49,7 +49,7 @@ func (np *Processor) processDeposit(deposit frostfsEvent.Deposit) {
zap.Uint64("last_emission", val), zap.Uint64("last_emission", val),
zap.Uint64("current_epoch", curEpoch)) zap.Uint64("current_epoch", curEpoch))
return return false
} }
// get gas balance of the node // get gas balance of the node
@ -57,7 +57,7 @@ func (np *Processor) processDeposit(deposit frostfsEvent.Deposit) {
balance, err := np.morphClient.GasBalance() balance, err := np.morphClient.GasBalance()
if err != nil { if err != nil {
np.log.Error(logs.FrostFSCantGetGasBalanceOfTheNode, zap.Error(err)) np.log.Error(logs.FrostFSCantGetGasBalanceOfTheNode, zap.Error(err))
return return false
} }
if balance < np.gasBalanceThreshold { if balance < np.gasBalanceThreshold {
@ -65,7 +65,7 @@ func (np *Processor) processDeposit(deposit frostfsEvent.Deposit) {
zap.Int64("balance", balance), zap.Int64("balance", balance),
zap.Int64("threshold", np.gasBalanceThreshold)) zap.Int64("threshold", np.gasBalanceThreshold))
return return false
} }
err = np.morphClient.TransferGas(receiver, np.mintEmitValue) err = np.morphClient.TransferGas(receiver, np.mintEmitValue)
@ -73,24 +73,26 @@ func (np *Processor) processDeposit(deposit frostfsEvent.Deposit) {
np.log.Error(logs.FrostFSCantTransferNativeGasToReceiver, np.log.Error(logs.FrostFSCantTransferNativeGasToReceiver,
zap.String("error", err.Error())) zap.String("error", err.Error()))
return return false
} }
np.mintEmitCache.Add(receiver.String(), curEpoch) np.mintEmitCache.Add(receiver.String(), curEpoch)
return true
} }
// Process withdraw event by locking assets in the balance account. // Process withdraw event by locking assets in the balance account.
func (np *Processor) processWithdraw(withdraw frostfsEvent.Withdraw) { func (np *Processor) processWithdraw(withdraw frostfsEvent.Withdraw) bool {
if !np.alphabetState.IsAlphabet() { if !np.alphabetState.IsAlphabet() {
np.log.Info(logs.FrostFSNonAlphabetModeIgnoreWithdraw) np.log.Info(logs.FrostFSNonAlphabetModeIgnoreWithdraw)
return return true
} }
// create lock account // create lock account
lock, err := util.Uint160DecodeBytesBE(withdraw.ID()[:util.Uint160Size]) lock, err := util.Uint160DecodeBytesBE(withdraw.ID()[:util.Uint160Size])
if err != nil { if err != nil {
np.log.Error(logs.FrostFSCantCreateLockAccount, zap.Error(err)) np.log.Error(logs.FrostFSCantCreateLockAccount, zap.Error(err))
return return false
} }
curEpoch := np.epochState.EpochCounter() curEpoch := np.epochState.EpochCounter()
@ -106,15 +108,18 @@ func (np *Processor) processWithdraw(withdraw frostfsEvent.Withdraw) {
err = np.balanceClient.Lock(prm) err = np.balanceClient.Lock(prm)
if err != nil { if err != nil {
np.log.Error(logs.FrostFSCantLockAssetsForWithdraw, zap.Error(err)) np.log.Error(logs.FrostFSCantLockAssetsForWithdraw, zap.Error(err))
return false
} }
return true
} }
// Process cheque event by transferring assets from the lock account back to // Process cheque event by transferring assets from the lock account back to
// the reserve account. // the reserve account.
func (np *Processor) processCheque(cheque frostfsEvent.Cheque) { func (np *Processor) processCheque(cheque frostfsEvent.Cheque) bool {
if !np.alphabetState.IsAlphabet() { if !np.alphabetState.IsAlphabet() {
np.log.Info(logs.FrostFSNonAlphabetModeIgnoreCheque) np.log.Info(logs.FrostFSNonAlphabetModeIgnoreCheque)
return return true
} }
prm := balance.BurnPrm{} prm := balance.BurnPrm{}
@ -126,5 +131,8 @@ func (np *Processor) processCheque(cheque frostfsEvent.Cheque) {
err := np.balanceClient.Burn(prm) err := np.balanceClient.Burn(prm)
if err != nil { if err != nil {
np.log.Error(logs.FrostFSCantTransferAssetsToFedContract, zap.Error(err)) np.log.Error(logs.FrostFSCantTransferAssetsToFedContract, zap.Error(err))
return false
} }
return true
} }

View file

@ -18,10 +18,10 @@ type bindCommon interface {
TxHash() util.Uint256 TxHash() util.Uint256
} }
func (np *Processor) processBind(e bindCommon, bind bool) { func (np *Processor) processBind(e bindCommon, bind bool) bool {
if !np.alphabetState.IsAlphabet() { if !np.alphabetState.IsAlphabet() {
np.log.Info(logs.FrostFSNonAlphabetModeIgnoreBind) np.log.Info(logs.FrostFSNonAlphabetModeIgnoreBind)
return return true
} }
c := &bindCommonContext{ c := &bindCommonContext{
@ -36,10 +36,10 @@ func (np *Processor) processBind(e bindCommon, bind bool) {
zap.String("error", err.Error()), zap.String("error", err.Error()),
) )
return return false
} }
np.approveBindCommon(c) return np.approveBindCommon(c) == nil
} }
type bindCommonContext struct { type bindCommonContext struct {
@ -70,7 +70,7 @@ func (np *Processor) checkBindCommon(e *bindCommonContext) error {
return nil return nil
} }
func (np *Processor) approveBindCommon(e *bindCommonContext) { func (np *Processor) approveBindCommon(e *bindCommonContext) error {
// calculate wallet address // calculate wallet address
scriptHash := e.User() scriptHash := e.User()
@ -80,7 +80,7 @@ func (np *Processor) approveBindCommon(e *bindCommonContext) {
zap.String("error", err.Error()), zap.String("error", err.Error()),
) )
return return err
} }
var id user.ID var id user.ID
@ -104,4 +104,6 @@ func (np *Processor) approveBindCommon(e *bindCommonContext) {
np.log.Error(fmt.Sprintf("could not approve %s", typ), np.log.Error(fmt.Sprintf("could not approve %s", typ),
zap.String("error", err.Error())) zap.String("error", err.Error()))
} }
return err
} }

View file

@ -9,10 +9,10 @@ import (
// Process config event by setting configuration value from the mainchain in // Process config event by setting configuration value from the mainchain in
// the sidechain. // the sidechain.
func (np *Processor) processConfig(config frostfsEvent.Config) { func (np *Processor) processConfig(config frostfsEvent.Config) bool {
if !np.alphabetState.IsAlphabet() { if !np.alphabetState.IsAlphabet() {
np.log.Info(logs.FrostFSNonAlphabetModeIgnoreConfig) np.log.Info(logs.FrostFSNonAlphabetModeIgnoreConfig)
return return true
} }
prm := nmClient.SetConfigPrm{} prm := nmClient.SetConfigPrm{}
@ -25,5 +25,8 @@ func (np *Processor) processConfig(config frostfsEvent.Config) {
err := np.netmapClient.SetConfig(prm) err := np.netmapClient.SetConfig(prm)
if err != nil { if err != nil {
np.log.Error(logs.FrostFSCantRelaySetConfigEvent, zap.Error(err)) np.log.Error(logs.FrostFSCantRelaySetConfigEvent, zap.Error(err))
return false
} }
return true
} }

View file

@ -6,6 +6,7 @@ import (
"sync" "sync"
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs" "git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/innerring/metrics"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/client/balance" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/client/balance"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/client/frostfsid" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/client/frostfsid"
nmClient "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/client/netmap" nmClient "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/client/netmap"
@ -58,6 +59,7 @@ type (
// Processor of events produced by frostfs contract in main net. // Processor of events produced by frostfs contract in main net.
Processor struct { Processor struct {
log *logger.Logger log *logger.Logger
metrics metrics.Register
pool *ants.Pool pool *ants.Pool
frostfsContract util.Uint160 frostfsContract util.Uint160
balanceClient BalanceClient balanceClient BalanceClient
@ -77,6 +79,7 @@ type (
// Params of the processor constructor. // Params of the processor constructor.
Params struct { Params struct {
Log *logger.Logger Log *logger.Logger
Metrics metrics.Register
PoolSize int PoolSize int
FrostFSContract util.Uint160 FrostFSContract util.Uint160
FrostFSIDClient IDClient FrostFSIDClient IDClient
@ -129,8 +132,14 @@ func New(p *Params) (*Processor, error) {
return nil, fmt.Errorf("ir/frostfs: can't create LRU cache for gas emission: %w", err) return nil, fmt.Errorf("ir/frostfs: can't create LRU cache for gas emission: %w", err)
} }
metricsRegister := p.Metrics
if metricsRegister == nil {
metricsRegister = metrics.DefaultRegister{}
}
return &Processor{ return &Processor{
log: p.Log, log: p.Log,
metrics: metricsRegister,
pool: pool, pool: pool,
frostfsContract: p.FrostFSContract, frostfsContract: p.FrostFSContract,
balanceClient: p.BalanceClient, balanceClient: p.BalanceClient,

View file

@ -2,6 +2,7 @@ package governance
import ( import (
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs" "git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/innerring/processors"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/event" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/event"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/event/rolemanagement" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/event/rolemanagement"
"github.com/nspcc-dev/neo-go/pkg/core/native" "github.com/nspcc-dev/neo-go/pkg/core/native"
@ -35,7 +36,9 @@ func (gp *Processor) HandleAlphabetSync(e event.Event) {
// send event to the worker pool // send event to the worker pool
err := gp.pool.Submit(func() { gp.processAlphabetSync(hash) }) err := processors.SubmitEvent(gp.pool, gp.metrics, "alphabet_sync", func() bool {
return gp.processAlphabetSync(hash)
})
if err != nil { if err != nil {
// there system can be moved into controlled degradation stage // there system can be moved into controlled degradation stage
gp.log.Warn(logs.GovernanceGovernanceWorkerPoolDrained, gp.log.Warn(logs.GovernanceGovernanceWorkerPoolDrained,

View file

@ -18,36 +18,36 @@ const (
alphabetUpdateIDPrefix = "AlphabetUpdate" alphabetUpdateIDPrefix = "AlphabetUpdate"
) )
func (gp *Processor) processAlphabetSync(txHash util.Uint256) { func (gp *Processor) processAlphabetSync(txHash util.Uint256) bool {
if !gp.alphabetState.IsAlphabet() { if !gp.alphabetState.IsAlphabet() {
gp.log.Info(logs.GovernanceNonAlphabetModeIgnoreAlphabetSync) gp.log.Info(logs.GovernanceNonAlphabetModeIgnoreAlphabetSync)
return return true
} }
mainnetAlphabet, err := gp.mainnetClient.NeoFSAlphabetList() mainnetAlphabet, err := gp.mainnetClient.NeoFSAlphabetList()
if err != nil { if err != nil {
gp.log.Error(logs.GovernanceCantFetchAlphabetListFromMainNet, gp.log.Error(logs.GovernanceCantFetchAlphabetListFromMainNet,
zap.String("error", err.Error())) zap.String("error", err.Error()))
return return false
} }
sidechainAlphabet, err := gp.morphClient.Committee() sidechainAlphabet, err := gp.morphClient.Committee()
if err != nil { if err != nil {
gp.log.Error(logs.GovernanceCantFetchAlphabetListFromSideChain, gp.log.Error(logs.GovernanceCantFetchAlphabetListFromSideChain,
zap.String("error", err.Error())) zap.String("error", err.Error()))
return return false
} }
newAlphabet, err := newAlphabetList(sidechainAlphabet, mainnetAlphabet) newAlphabet, err := newAlphabetList(sidechainAlphabet, mainnetAlphabet)
if err != nil { if err != nil {
gp.log.Error(logs.GovernanceCantMergeAlphabetListsFromMainNetAndSideChain, gp.log.Error(logs.GovernanceCantMergeAlphabetListsFromMainNetAndSideChain,
zap.String("error", err.Error())) zap.String("error", err.Error()))
return return false
} }
if newAlphabet == nil { if newAlphabet == nil {
gp.log.Info(logs.GovernanceNoGovernanceUpdateAlphabetListHasNotBeenChanged) gp.log.Info(logs.GovernanceNoGovernanceUpdateAlphabetListHasNotBeenChanged)
return return true
} }
gp.log.Info(logs.GovernanceAlphabetListHasBeenChangedStartingUpdate, gp.log.Info(logs.GovernanceAlphabetListHasBeenChangedStartingUpdate,
@ -77,6 +77,8 @@ func (gp *Processor) processAlphabetSync(txHash util.Uint256) {
gp.updateFrostFSContractInMainnet(newAlphabet) gp.updateFrostFSContractInMainnet(newAlphabet)
gp.log.Info(logs.GovernanceFinishedAlphabetListUpdate) gp.log.Info(logs.GovernanceFinishedAlphabetListUpdate)
return true
} }
func prettyKeys(keys keys.PublicKeys) string { func prettyKeys(keys keys.PublicKeys) string {

View file

@ -4,6 +4,7 @@ import (
"errors" "errors"
"fmt" "fmt"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/innerring/metrics"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/client" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/client"
frostfscontract "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/client/frostfs" frostfscontract "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/client/frostfs"
nmClient "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/client/netmap" nmClient "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/client/netmap"
@ -75,6 +76,7 @@ type (
// Processor of events related to governance in the network. // Processor of events related to governance in the network.
Processor struct { Processor struct {
log *logger.Logger log *logger.Logger
metrics metrics.Register
pool *ants.Pool pool *ants.Pool
frostfsClient FrostFSClient frostfsClient FrostFSClient
netmapClient NetmapClient netmapClient NetmapClient
@ -93,6 +95,7 @@ type (
// Params of the processor constructor. // Params of the processor constructor.
Params struct { Params struct {
Log *logger.Logger Log *logger.Logger
Metrics metrics.Register
AlphabetState AlphabetState AlphabetState AlphabetState
EpochState EpochState EpochState EpochState
@ -130,11 +133,17 @@ func New(p *Params) (*Processor, error) {
return nil, fmt.Errorf("ir/governance: can't create worker pool: %w", err) return nil, fmt.Errorf("ir/governance: can't create worker pool: %w", err)
} }
metricsRegister := p.Metrics
if metricsRegister == nil {
metricsRegister = metrics.DefaultRegister{}
}
// result is cached by neo-go, so we can pre-calc it // result is cached by neo-go, so we can pre-calc it
designate := p.MainnetClient.GetDesignateHash() designate := p.MainnetClient.GetDesignateHash()
return &Processor{ return &Processor{
log: p.Log, log: p.Log,
metrics: metricsRegister,
pool: pool, pool: pool,
frostfsClient: p.FrostFSClient, frostfsClient: p.FrostFSClient,
netmapClient: p.NetmapClient, netmapClient: p.NetmapClient,

View file

@ -4,6 +4,7 @@ import (
"encoding/hex" "encoding/hex"
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs" "git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/innerring/processors"
timerEvent "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/innerring/timers" timerEvent "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/innerring/timers"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/event" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/event"
netmapEvent "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/event/netmap" netmapEvent "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/event/netmap"
@ -16,7 +17,7 @@ func (np *Processor) HandleNewEpochTick(ev event.Event) {
// send an event to the worker pool // send an event to the worker pool
err := np.pool.Submit(func() { np.processNewEpochTick() }) err := processors.SubmitEvent(np.pool, np.metrics, "netmap_new_epoch_tick", np.processNewEpochTick)
if err != nil { if err != nil {
// there system can be moved into controlled degradation stage // there system can be moved into controlled degradation stage
np.log.Warn(logs.NetmapNetmapWorkerPoolDrained, np.log.Warn(logs.NetmapNetmapWorkerPoolDrained,
@ -32,8 +33,8 @@ func (np *Processor) handleNewEpoch(ev event.Event) {
// send an event to the worker pool // send an event to the worker pool
err := np.pool.Submit(func() { err := processors.SubmitEvent(np.pool, np.metrics, "netmap_new_epoch", func() bool {
np.processNewEpoch(epochEvent) return np.processNewEpoch(epochEvent)
}) })
if err != nil { if err != nil {
// there system can be moved into controlled degradation stage // there system can be moved into controlled degradation stage
@ -51,8 +52,8 @@ func (np *Processor) handleAddPeer(ev event.Event) {
// send an event to the worker pool // send an event to the worker pool
err := np.pool.Submit(func() { err := processors.SubmitEvent(np.pool, np.metrics, "netmap_add_peer", func() bool {
np.processAddPeer(newPeer) return np.processAddPeer(newPeer)
}) })
if err != nil { if err != nil {
// there system can be moved into controlled degradation stage // there system can be moved into controlled degradation stage
@ -69,8 +70,8 @@ func (np *Processor) handleUpdateState(ev event.Event) {
// send event to the worker pool // send event to the worker pool
err := np.pool.Submit(func() { err := processors.SubmitEvent(np.pool, np.metrics, "netmap_update_peer", func() bool {
np.processUpdatePeer(updPeer) return np.processUpdatePeer(updPeer)
}) })
if err != nil { if err != nil {
// there system can be moved into controlled degradation stage // there system can be moved into controlled degradation stage
@ -91,8 +92,8 @@ func (np *Processor) handleCleanupTick(ev event.Event) {
np.log.Info(logs.NetmapTick, zap.String("type", "netmap cleaner")) np.log.Info(logs.NetmapTick, zap.String("type", "netmap cleaner"))
// send event to the worker pool // send event to the worker pool
err := np.pool.Submit(func() { err := processors.SubmitEvent(np.pool, np.metrics, "netmap_cleanup_tick", func() bool {
np.processNetmapCleanupTick(cleanup) return np.processNetmapCleanupTick(cleanup)
}) })
if err != nil { if err != nil {
// there system can be moved into controlled degradation stage // there system can be moved into controlled degradation stage

View file

@ -7,11 +7,11 @@ import (
"go.uber.org/zap" "go.uber.org/zap"
) )
func (np *Processor) processNetmapCleanupTick(ev netmapCleanupTick) { func (np *Processor) processNetmapCleanupTick(ev netmapCleanupTick) bool {
if !np.alphabetState.IsAlphabet() { if !np.alphabetState.IsAlphabet() {
np.log.Info(logs.NetmapNonAlphabetModeIgnoreNewNetmapCleanupTick) np.log.Info(logs.NetmapNonAlphabetModeIgnoreNewNetmapCleanupTick)
return return true
} }
err := np.netmapSnapshot.forEachRemoveCandidate(ev.epoch, func(s string) error { err := np.netmapSnapshot.forEachRemoveCandidate(ev.epoch, func(s string) error {
@ -47,5 +47,8 @@ func (np *Processor) processNetmapCleanupTick(ev netmapCleanupTick) {
if err != nil { if err != nil {
np.log.Warn(logs.NetmapCantIterateOnNetmapCleanerCache, np.log.Warn(logs.NetmapCantIterateOnNetmapCleanerCache,
zap.String("error", err.Error())) zap.String("error", err.Error()))
return false
} }
return true
} }

View file

@ -10,7 +10,7 @@ import (
// Process new epoch notification by setting global epoch value and resetting // Process new epoch notification by setting global epoch value and resetting
// local epoch timer. // local epoch timer.
func (np *Processor) processNewEpoch(ev netmapEvent.NewEpoch) { func (np *Processor) processNewEpoch(ev netmapEvent.NewEpoch) bool {
epoch := ev.EpochNumber() epoch := ev.EpochNumber()
epochDuration, err := np.netmapClient.EpochDuration() epochDuration, err := np.netmapClient.EpochDuration()
@ -41,7 +41,7 @@ func (np *Processor) processNewEpoch(ev netmapEvent.NewEpoch) {
np.log.Warn(logs.NetmapCantGetNetmapSnapshotToPerformCleanup, np.log.Warn(logs.NetmapCantGetNetmapSnapshotToPerformCleanup,
zap.String("error", err.Error())) zap.String("error", err.Error()))
return return false
} }
prm := cntClient.StartEstimationPrm{} prm := cntClient.StartEstimationPrm{}
@ -63,13 +63,15 @@ func (np *Processor) processNewEpoch(ev netmapEvent.NewEpoch) {
np.handleCleanupTick(netmapCleanupTick{epoch: epoch, txHash: ev.TxHash()}) np.handleCleanupTick(netmapCleanupTick{epoch: epoch, txHash: ev.TxHash()})
np.handleAlphabetSync(governance.NewSyncEvent(ev.TxHash())) np.handleAlphabetSync(governance.NewSyncEvent(ev.TxHash()))
np.handleNotaryDeposit(ev) np.handleNotaryDeposit(ev)
return true
} }
// Process new epoch tick by invoking new epoch method in network map contract. // Process new epoch tick by invoking new epoch method in network map contract.
func (np *Processor) processNewEpochTick() { func (np *Processor) processNewEpochTick() bool {
if !np.alphabetState.IsAlphabet() { if !np.alphabetState.IsAlphabet() {
np.log.Info(logs.NetmapNonAlphabetModeIgnoreNewEpochTick) np.log.Info(logs.NetmapNonAlphabetModeIgnoreNewEpochTick)
return return true
} }
nextEpoch := np.epochState.EpochCounter() + 1 nextEpoch := np.epochState.EpochCounter() + 1
@ -78,5 +80,8 @@ func (np *Processor) processNewEpochTick() {
err := np.netmapClient.NewEpoch(nextEpoch, false) err := np.netmapClient.NewEpoch(nextEpoch, false)
if err != nil { if err != nil {
np.log.Error(logs.NetmapCantInvokeNetmapNewEpoch, zap.Error(err)) np.log.Error(logs.NetmapCantInvokeNetmapNewEpoch, zap.Error(err))
return false
} }
return true
} }

View file

@ -12,10 +12,10 @@ import (
// Process add peer notification by sanity check of new node // Process add peer notification by sanity check of new node
// local epoch timer. // local epoch timer.
func (np *Processor) processAddPeer(ev netmapEvent.AddPeer) { func (np *Processor) processAddPeer(ev netmapEvent.AddPeer) bool {
if !np.alphabetState.IsAlphabet() { if !np.alphabetState.IsAlphabet() {
np.log.Info(logs.NetmapNonAlphabetModeIgnoreNewPeerNotification) np.log.Info(logs.NetmapNonAlphabetModeIgnoreNewPeerNotification)
return return true
} }
// check if notary transaction is valid, see #976 // check if notary transaction is valid, see #976
@ -26,7 +26,7 @@ func (np *Processor) processAddPeer(ev netmapEvent.AddPeer) {
zap.String("method", "netmap.AddPeer"), zap.String("method", "netmap.AddPeer"),
zap.String("hash", tx.Hash().StringLE()), zap.String("hash", tx.Hash().StringLE()),
zap.Error(err)) zap.Error(err))
return return false
} }
// unmarshal node info // unmarshal node info
@ -34,7 +34,7 @@ func (np *Processor) processAddPeer(ev netmapEvent.AddPeer) {
if err := nodeInfo.Unmarshal(ev.Node()); err != nil { if err := nodeInfo.Unmarshal(ev.Node()); err != nil {
// it will be nice to have tx id at event structure to log it // it will be nice to have tx id at event structure to log it
np.log.Warn(logs.NetmapCantParseNetworkMapCandidate) np.log.Warn(logs.NetmapCantParseNetworkMapCandidate)
return return false
} }
// validate and update node info // validate and update node info
@ -44,7 +44,7 @@ func (np *Processor) processAddPeer(ev netmapEvent.AddPeer) {
zap.String("error", err.Error()), zap.String("error", err.Error()),
) )
return return false
} }
// sort attributes to make it consistent // sort attributes to make it consistent
@ -81,15 +81,18 @@ func (np *Processor) processAddPeer(ev netmapEvent.AddPeer) {
if err != nil { if err != nil {
np.log.Error(logs.NetmapCantInvokeNetmapAddPeer, zap.Error(err)) np.log.Error(logs.NetmapCantInvokeNetmapAddPeer, zap.Error(err))
return false
} }
} }
return true
} }
// Process update peer notification by sending approval tx to the smart contract. // Process update peer notification by sending approval tx to the smart contract.
func (np *Processor) processUpdatePeer(ev netmapEvent.UpdatePeer) { func (np *Processor) processUpdatePeer(ev netmapEvent.UpdatePeer) bool {
if !np.alphabetState.IsAlphabet() { if !np.alphabetState.IsAlphabet() {
np.log.Info(logs.NetmapNonAlphabetModeIgnoreUpdatePeerNotification) np.log.Info(logs.NetmapNonAlphabetModeIgnoreUpdatePeerNotification)
return return true
} }
// flag node to remove from local view, so it can be re-bootstrapped // flag node to remove from local view, so it can be re-bootstrapped
@ -105,11 +108,14 @@ func (np *Processor) processUpdatePeer(ev netmapEvent.UpdatePeer) {
zap.Error(err), zap.Error(err),
) )
return return false
} }
} }
if err = np.netmapClient.MorphNotarySignAndInvokeTX(ev.NotaryRequest().MainTransaction); err != nil { if err = np.netmapClient.MorphNotarySignAndInvokeTX(ev.NotaryRequest().MainTransaction); err != nil {
np.log.Error(logs.NetmapCantInvokeNetmapUpdatePeer, zap.Error(err)) np.log.Error(logs.NetmapCantInvokeNetmapUpdatePeer, zap.Error(err))
return false
} }
return true
} }

View file

@ -5,6 +5,7 @@ import (
"fmt" "fmt"
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs" "git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/innerring/metrics"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/innerring/processors/netmap/nodevalidation/state" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/innerring/processors/netmap/nodevalidation/state"
cntClient "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/client/container" cntClient "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/client/container"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/event" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/event"
@ -72,6 +73,7 @@ type (
// and new epoch ticker, because it is related to contract. // and new epoch ticker, because it is related to contract.
Processor struct { Processor struct {
log *logger.Logger log *logger.Logger
metrics metrics.Register
pool *ants.Pool pool *ants.Pool
epochTimer EpochTimerReseter epochTimer EpochTimerReseter
epochState EpochState epochState EpochState
@ -93,6 +95,7 @@ type (
// Params of the processor constructor. // Params of the processor constructor.
Params struct { Params struct {
Log *logger.Logger Log *logger.Logger
Metrics metrics.Register
PoolSize int PoolSize int
NetmapClient Client NetmapClient Client
EpochTimer EpochTimerReseter EpochTimer EpochTimerReseter
@ -145,8 +148,14 @@ func New(p *Params) (*Processor, error) {
return nil, fmt.Errorf("ir/netmap: can't create worker pool: %w", err) return nil, fmt.Errorf("ir/netmap: can't create worker pool: %w", err)
} }
metricsRegister := p.Metrics
if metricsRegister == nil {
metricsRegister = metrics.DefaultRegister{}
}
return &Processor{ return &Processor{
log: p.Log, log: p.Log,
metrics: metricsRegister,
pool: pool, pool: pool,
epochTimer: p.EpochTimer, epochTimer: p.EpochTimer,
epochState: p.EpochState, epochState: p.EpochState,

View file

@ -0,0 +1,16 @@
package processors
import (
"time"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/innerring/metrics"
"github.com/panjf2000/ants/v2"
)
func SubmitEvent(pool *ants.Pool, metrics metrics.Register, eventLabel string, eventProcessor func() bool) error {
return pool.Submit(func() {
start := time.Now()
Review

Why did you decide to have start here and not before we submit the task in pool?
Just asking.

Why did you decide to have `start` here and not _before_ we submit the task in pool? Just asking.
Review

well, because my intention was to measure the actual event processor duration,
excluding whatever wait time it spends on the pool. The pool wait time is statistically unrelated to the event processor time for specific event types, and it can be measured separately if you think it's interesting.

well, because my intention was to measure the actual event processor duration, excluding whatever wait time it spends on the pool. The pool wait time is statistically unrelated to the event processor time for specific event types, and it can be measured separately if you think it's interesting.
success := eventProcessor()
metrics.AddEvent(time.Since(start), eventLabel, success)
})
}

View file

@ -1,13 +1,23 @@
package metrics package metrics
import "github.com/prometheus/client_golang/prometheus" import (
"strconv"
"time"
const innerRingSubsystem = "ir" "github.com/prometheus/client_golang/prometheus"
)
const (
innerRingSubsystem = "ir"
innerRingLabelSuccess = "success"
innerRingLabelType = "type"
)
// InnerRingServiceMetrics contains metrics collected by inner ring. // InnerRingServiceMetrics contains metrics collected by inner ring.
type InnerRingServiceMetrics struct { type InnerRingServiceMetrics struct {
epoch metric[prometheus.Gauge] epoch metric[prometheus.Gauge]
health metric[prometheus.Gauge] health metric[prometheus.Gauge]
eventDuration metric[*prometheus.HistogramVec]
} }
// NewInnerRingMetrics returns new instance of metrics collectors for inner ring. // NewInnerRingMetrics returns new instance of metrics collectors for inner ring.
@ -25,14 +35,22 @@ func NewInnerRingMetrics() *InnerRingServiceMetrics {
Name: "health", Name: "health",
Help: "Current inner-ring node state.", Help: "Current inner-ring node state.",
}) })
eventDuration = newHistogramVec(prometheus.HistogramOpts{
Namespace: namespace,
Subsystem: innerRingSubsystem,
Name: "event_duration_seconds",
Help: "Duration of processing of inner-ring events",
}, []string{innerRingLabelType, innerRingLabelSuccess})
) )
mustRegister(epoch) mustRegister(epoch)
mustRegister(health) mustRegister(health)
mustRegister(eventDuration)
return &InnerRingServiceMetrics{ return &InnerRingServiceMetrics{
epoch: epoch, epoch: epoch,
health: health, health: health,
eventDuration: eventDuration,
} }
} }
@ -45,3 +63,10 @@ func (m InnerRingServiceMetrics) SetEpoch(epoch uint64) {
func (m InnerRingServiceMetrics) SetHealth(s int32) { func (m InnerRingServiceMetrics) SetHealth(s int32) {
m.health.value.Set(float64(s)) m.health.value.Set(float64(s))
} }
func (m InnerRingServiceMetrics) AddEvent(d time.Duration, typ string, success bool) {
m.eventDuration.value.With(prometheus.Labels{
innerRingLabelType: typ,
innerRingLabelSuccess: strconv.FormatBool(success),
}).Observe(d.Seconds())
}