[#374] Add inner-ring event metrics #405
27 changed files with 287 additions and 113 deletions
|
@ -52,6 +52,7 @@ func (s *Server) initNetmapProcessor(cfg *viper.Viper,
|
|||
|
||||
s.netmapProcessor, err = netmap.New(&netmap.Params{
|
||||
Log: s.log,
|
||||
Metrics: s.metrics,
|
||||
PoolSize: cfg.GetInt("workers.netmap"),
|
||||
NetmapClient: netmap.NewNetmapClient(s.netmapClient),
|
||||
EpochTimer: s,
|
||||
|
@ -162,6 +163,7 @@ func (s *Server) createAlphaSync(cfg *viper.Viper, frostfsCli *frostfsClient.Cli
|
|||
// create governance processor
|
||||
governanceProcessor, err := governance.New(&governance.Params{
|
||||
Log: s.log,
|
||||
Metrics: s.metrics,
|
||||
FrostFSClient: frostfsCli,
|
||||
NetmapClient: s.netmapClient,
|
||||
AlphabetState: s,
|
||||
|
@ -231,6 +233,7 @@ func (s *Server) initAlphabetProcessor(cfg *viper.Viper) error {
|
|||
s.alphabetProcessor, err = alphabet.New(&alphabet.Params{
|
||||
ParsedWallets: parsedWallets,
|
||||
Log: s.log,
|
||||
Metrics: s.metrics,
|
||||
PoolSize: cfg.GetInt("workers.alphabet"),
|
||||
AlphabetContracts: s.contracts.alphabet,
|
||||
NetmapClient: s.netmapClient,
|
||||
|
@ -255,6 +258,7 @@ func (s *Server) initContainerProcessor(cfg *viper.Viper, cnrClient *container.C
|
|||
// container processor
|
||||
containerProcessor, err := cont.New(&cont.Params{
|
||||
Log: s.log,
|
||||
Metrics: s.metrics,
|
||||
PoolSize: cfg.GetInt("workers.container"),
|
||||
AlphabetState: s,
|
||||
ContainerClient: cnrClient,
|
||||
|
@ -273,6 +277,7 @@ func (s *Server) initBalanceProcessor(cfg *viper.Viper, frostfsCli *frostfsClien
|
|||
// create balance processor
|
||||
balanceProcessor, err := balance.New(&balance.Params{
|
||||
Log: s.log,
|
||||
Metrics: s.metrics,
|
||||
PoolSize: cfg.GetInt("workers.balance"),
|
||||
FrostFSClient: frostfsCli,
|
||||
BalanceSC: s.contracts.balance,
|
||||
|
@ -293,6 +298,7 @@ func (s *Server) initFrostFSMainnetProcessor(cfg *viper.Viper, frostfsIDClient *
|
|||
|
||||
frostfsProcessor, err := frostfs.New(&frostfs.Params{
|
||||
Log: s.log,
|
||||
Metrics: s.metrics,
|
||||
PoolSize: cfg.GetInt("workers.frostfs"),
|
||||
FrostFSContract: s.contracts.frostfs,
|
||||
FrostFSIDClient: frostfsIDClient,
|
||||
|
|
15
pkg/innerring/metrics/metrics.go
Normal file
15
pkg/innerring/metrics/metrics.go
Normal file
|
@ -0,0 +1,15 @@
|
|||
package metrics
|
||||
|
||||
import "time"
|
||||
|
||||
type Register interface {
|
||||
SetEpoch(epoch uint64)
|
||||
SetHealth(s int32)
|
||||
AddEvent(d time.Duration, typ string, success bool)
|
||||
}
|
||||
|
||||
type DefaultRegister struct{}
|
||||
|
||||
func (DefaultRegister) SetEpoch(uint64) {}
|
||||
func (DefaultRegister) SetHealth(int32) {}
|
||||
func (DefaultRegister) AddEvent(time.Duration, string, bool) {}
|
|
@ -2,6 +2,7 @@ package alphabet
|
|||
|
||||
import (
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/innerring/processors"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/innerring/timers"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/event"
|
||||
"go.uber.org/zap"
|
||||
|
@ -13,7 +14,7 @@ func (ap *Processor) HandleGasEmission(ev event.Event) {
|
|||
|
||||
// send event to the worker pool
|
||||
|
||||
err := ap.pool.Submit(func() { ap.processEmit() })
|
||||
err := processors.SubmitEvent(ap.pool, ap.metrics, "alphabet_emit_gas", ap.processEmit)
|
||||
if err != nil {
|
||||
// there system can be moved into controlled degradation stage
|
||||
ap.log.Warn(logs.AlphabetAlphabetProcessorWorkerPoolDrained,
|
||||
|
|
|
@ -13,12 +13,12 @@ import (
|
|||
|
||||
const emitMethod = "emit"
|
||||
|
||||
func (ap *Processor) processEmit() {
|
||||
func (ap *Processor) processEmit() bool {
|
||||
index := ap.irList.AlphabetIndex()
|
||||
if index < 0 {
|
||||
ap.log.Info(logs.AlphabetNonAlphabetModeIgnoreGasEmissionEvent)
|
||||
|
||||
return
|
||||
return true
|
||||
}
|
||||
|
||||
contract, ok := ap.alphabetContracts.GetByIndex(index)
|
||||
|
@ -26,7 +26,7 @@ func (ap *Processor) processEmit() {
|
|||
ap.log.Debug(logs.AlphabetNodeIsOutOfAlphabetRangeIgnoreGasEmissionEvent,
|
||||
zap.Int("index", index))
|
||||
|
||||
return
|
||||
return false
|
||||
}
|
||||
|
||||
// there is no signature collecting, so we don't need extra fee
|
||||
|
@ -34,13 +34,13 @@ func (ap *Processor) processEmit() {
|
|||
if err != nil {
|
||||
ap.log.Warn(logs.AlphabetCantInvokeAlphabetEmitMethod, zap.String("error", err.Error()))
|
||||
|
||||
return
|
||||
return false
|
||||
}
|
||||
|
||||
if ap.storageEmission == 0 {
|
||||
ap.log.Info(logs.AlphabetStorageNodeEmissionIsOff)
|
||||
|
||||
return
|
||||
return true
|
||||
}
|
||||
|
||||
networkMap, err := ap.netmapClient.NetMap()
|
||||
|
@ -48,7 +48,7 @@ func (ap *Processor) processEmit() {
|
|||
ap.log.Warn(logs.AlphabetCantGetNetmapSnapshotToEmitGasToStorageNodes,
|
||||
zap.String("error", err.Error()))
|
||||
|
||||
return
|
||||
return false
|
||||
}
|
||||
|
||||
nmNodes := networkMap.Nodes()
|
||||
|
@ -63,7 +63,7 @@ func (ap *Processor) processEmit() {
|
|||
zap.Int("extra_wallets", extraLen))
|
||||
|
||||
if nmLen+extraLen == 0 {
|
||||
return
|
||||
return true
|
||||
}
|
||||
|
||||
gasPerNode := fixedn.Fixed8(ap.storageEmission / uint64(nmLen+extraLen))
|
||||
|
@ -71,6 +71,8 @@ func (ap *Processor) processEmit() {
|
|||
ap.transferGasToNetmapNodes(nmNodes, gasPerNode)
|
||||
|
||||
ap.transferGasToExtraNodes(pw, gasPerNode)
|
||||
|
||||
return true
|
||||
}
|
||||
|
||||
func (ap *Processor) transferGasToNetmapNodes(nmNodes []netmap.NodeInfo, gasPerNode fixedn.Fixed8) {
|
||||
|
|
|
@ -7,6 +7,7 @@ import (
|
|||
"time"
|
||||
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/innerring/metrics"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/event"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap"
|
||||
|
@ -50,6 +51,7 @@ type (
|
|||
// protects parsedWallets from concurrent change
|
||||
pwLock *sync.RWMutex
|
||||
log *logger.Logger
|
||||
metrics metrics.Register
|
||||
pool *ants.Pool
|
||||
alphabetContracts Contracts
|
||||
netmapClient netmapClient
|
||||
|
@ -62,6 +64,7 @@ type (
|
|||
Params struct {
|
||||
ParsedWallets []util.Uint160
|
||||
Log *logger.Logger
|
||||
Metrics metrics.Register
|
||||
PoolSize int
|
||||
AlphabetContracts Contracts
|
||||
NetmapClient netmapClient
|
||||
|
@ -89,10 +92,16 @@ func New(p *Params) (*Processor, error) {
|
|||
return nil, fmt.Errorf("ir/frostfs: can't create worker pool: %w", err)
|
||||
}
|
||||
|
||||
metricsRegister := p.Metrics
|
||||
if metricsRegister == nil {
|
||||
metricsRegister = metrics.DefaultRegister{}
|
||||
}
|
||||
|
||||
return &Processor{
|
||||
parsedWallets: p.ParsedWallets,
|
||||
pwLock: new(sync.RWMutex),
|
||||
log: p.Log,
|
||||
metrics: metricsRegister,
|
||||
pool: pool,
|
||||
alphabetContracts: p.AlphabetContracts,
|
||||
netmapClient: p.NetmapClient,
|
||||
|
|
|
@ -4,6 +4,7 @@ import (
|
|||
"encoding/hex"
|
||||
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/innerring/processors"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/event"
|
||||
balanceEvent "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/event/balance"
|
||||
"go.uber.org/zap"
|
||||
|
@ -17,7 +18,9 @@ func (bp *Processor) handleLock(ev event.Event) {
|
|||
|
||||
// send an event to the worker pool
|
||||
|
||||
err := bp.pool.Submit(func() { bp.processLock(&lock) })
|
||||
err := processors.SubmitEvent(bp.pool, bp.metrics, "lock", func() bool {
|
||||
return bp.processLock(&lock)
|
||||
})
|
||||
if err != nil {
|
||||
// there system can be moved into controlled degradation stage
|
||||
bp.log.Warn(logs.BalanceBalanceWorkerPoolDrained,
|
||||
|
|
|
@ -9,10 +9,10 @@ import (
|
|||
|
||||
// Process lock event by invoking Cheque method in main net to send assets
|
||||
// back to the withdraw issuer.
|
||||
func (bp *Processor) processLock(lock *balanceEvent.Lock) {
|
||||
func (bp *Processor) processLock(lock *balanceEvent.Lock) bool {
|
||||
if !bp.alphabetState.IsAlphabet() {
|
||||
bp.log.Info(logs.BalanceNonAlphabetModeIgnoreBalanceLock)
|
||||
return
|
||||
return true
|
||||
}
|
||||
|
||||
prm := frostfsContract.ChequePrm{}
|
||||
|
@ -26,5 +26,8 @@ func (bp *Processor) processLock(lock *balanceEvent.Lock) {
|
|||
err := bp.frostfsClient.Cheque(prm)
|
||||
if err != nil {
|
||||
bp.log.Error(logs.BalanceCantSendLockAssetTx, zap.Error(err))
|
||||
return false
|
||||
}
|
||||
|
||||
return true
|
||||
}
|
||||
|
|
|
@ -5,6 +5,7 @@ import (
|
|||
"fmt"
|
||||
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/innerring/metrics"
|
||||
frostfscontract "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/client/frostfs"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/event"
|
||||
balanceEvent "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/event/balance"
|
||||
|
@ -32,6 +33,7 @@ type (
|
|||
// Processor of events produced by balance contract in the morphchain.
|
||||
Processor struct {
|
||||
log *logger.Logger
|
||||
metrics metrics.Register
|
||||
pool *ants.Pool
|
||||
frostfsClient FrostFSClient
|
||||
balanceSC util.Uint160
|
||||
|
@ -42,6 +44,7 @@ type (
|
|||
// Params of the processor constructor.
|
||||
Params struct {
|
||||
Log *logger.Logger
|
||||
Metrics metrics.Register
|
||||
PoolSize int
|
||||
FrostFSClient FrostFSClient
|
||||
BalanceSC util.Uint160
|
||||
|
@ -72,8 +75,14 @@ func New(p *Params) (*Processor, error) {
|
|||
return nil, fmt.Errorf("ir/balance: can't create worker pool: %w", err)
|
||||
}
|
||||
|
||||
metricsRegister := p.Metrics
|
||||
if metricsRegister == nil {
|
||||
metricsRegister = metrics.DefaultRegister{}
|
||||
}
|
||||
|
||||
return &Processor{
|
||||
log: p.Log,
|
||||
metrics: metricsRegister,
|
||||
pool: pool,
|
||||
frostfsClient: p.FrostFSClient,
|
||||
balanceSC: p.BalanceSC,
|
||||
|
|
|
@ -4,6 +4,7 @@ import (
|
|||
"crypto/sha256"
|
||||
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/innerring/processors"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/event"
|
||||
containerEvent "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/event/container"
|
||||
"github.com/mr-tron/base58"
|
||||
|
@ -20,7 +21,9 @@ func (cp *Processor) handlePut(ev event.Event) {
|
|||
|
||||
// send an event to the worker pool
|
||||
|
||||
err := cp.pool.Submit(func() { cp.processContainerPut(put) })
|
||||
err := processors.SubmitEvent(cp.pool, cp.metrics, "container_put", func() bool {
|
||||
return cp.processContainerPut(put)
|
||||
})
|
||||
if err != nil {
|
||||
// there system can be moved into controlled degradation stage
|
||||
cp.log.Warn(logs.ContainerContainerProcessorWorkerPoolDrained,
|
||||
|
@ -36,7 +39,9 @@ func (cp *Processor) handleDelete(ev event.Event) {
|
|||
|
||||
// send an event to the worker pool
|
||||
|
||||
err := cp.pool.Submit(func() { cp.processContainerDelete(del) })
|
||||
err := processors.SubmitEvent(cp.pool, cp.metrics, "container_delete", func() bool {
|
||||
return cp.processContainerDelete(del)
|
||||
})
|
||||
if err != nil {
|
||||
// there system can be moved into controlled degradation stage
|
||||
cp.log.Warn(logs.ContainerContainerProcessorWorkerPoolDrained,
|
||||
|
@ -53,8 +58,8 @@ func (cp *Processor) handleSetEACL(ev event.Event) {
|
|||
|
||||
// send an event to the worker pool
|
||||
|
||||
err := cp.pool.Submit(func() {
|
||||
cp.processSetEACL(e)
|
||||
err := processors.SubmitEvent(cp.pool, cp.metrics, "container_set_eacl", func() bool {
|
||||
return cp.processSetEACL(e)
|
||||
})
|
||||
if err != nil {
|
||||
// there system can be moved into controlled degradation stage
|
||||
|
|
|
@ -31,10 +31,10 @@ type putContainerContext struct {
|
|||
|
||||
// Process a new container from the user by checking the container sanity
|
||||
// and sending approve tx back to the morph.
|
||||
func (cp *Processor) processContainerPut(put putEvent) {
|
||||
func (cp *Processor) processContainerPut(put putEvent) bool {
|
||||
if !cp.alphabetState.IsAlphabet() {
|
||||
cp.log.Info(logs.ContainerNonAlphabetModeIgnoreContainerPut)
|
||||
return
|
||||
return true
|
||||
}
|
||||
|
||||
ctx := &putContainerContext{
|
||||
|
@ -47,10 +47,17 @@ func (cp *Processor) processContainerPut(put putEvent) {
|
|||
zap.String("error", err.Error()),
|
||||
)
|
||||
|
||||
return
|
||||
return false
|
||||
}
|
||||
|
||||
cp.approvePutContainer(ctx)
|
||||
if err := cp.morphClient.NotarySignAndInvokeTX(ctx.e.NotaryRequest().MainTransaction); err != nil {
|
||||
cp.log.Error(logs.ContainerCouldNotApprovePutContainer,
|
||||
zap.String("error", err.Error()),
|
||||
)
|
||||
return false
|
||||
}
|
||||
|
||||
return true
|
||||
}
|
||||
|
||||
func (cp *Processor) checkPutContainer(ctx *putContainerContext) error {
|
||||
|
@ -89,20 +96,12 @@ func (cp *Processor) checkPutContainer(ctx *putContainerContext) error {
|
|||
return nil
|
||||
}
|
||||
|
||||
func (cp *Processor) approvePutContainer(ctx *putContainerContext) {
|
||||
if err := cp.morphClient.NotarySignAndInvokeTX(ctx.e.NotaryRequest().MainTransaction); err != nil {
|
||||
cp.log.Error(logs.ContainerCouldNotApprovePutContainer,
|
||||
zap.String("error", err.Error()),
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
// Process delete container operation from the user by checking container sanity
|
||||
// and sending approve tx back to morph.
|
||||
func (cp *Processor) processContainerDelete(e containerEvent.Delete) {
|
||||
func (cp *Processor) processContainerDelete(e containerEvent.Delete) bool {
|
||||
if !cp.alphabetState.IsAlphabet() {
|
||||
cp.log.Info(logs.ContainerNonAlphabetModeIgnoreContainerDelete)
|
||||
return
|
||||
return true
|
||||
}
|
||||
|
||||
err := cp.checkDeleteContainer(e)
|
||||
|
@ -111,10 +110,18 @@ func (cp *Processor) processContainerDelete(e containerEvent.Delete) {
|
|||
zap.String("error", err.Error()),
|
||||
)
|
||||
|
||||
return
|
||||
return false
|
||||
}
|
||||
|
||||
cp.approveDeleteContainer(e)
|
||||
if err := cp.morphClient.NotarySignAndInvokeTX(e.NotaryRequest().MainTransaction); err != nil {
|
||||
cp.log.Error(logs.ContainerCouldNotApproveDeleteContainer,
|
||||
zap.String("error", err.Error()),
|
||||
)
|
||||
|
||||
return false
|
||||
}
|
||||
|
||||
return true
|
||||
}
|
||||
|
||||
func (cp *Processor) checkDeleteContainer(e containerEvent.Delete) error {
|
||||
|
@ -149,14 +156,6 @@ func (cp *Processor) checkDeleteContainer(e containerEvent.Delete) error {
|
|||
return nil
|
||||
}
|
||||
|
||||
func (cp *Processor) approveDeleteContainer(e containerEvent.Delete) {
|
||||
if err := cp.morphClient.NotarySignAndInvokeTX(e.NotaryRequest().MainTransaction); err != nil {
|
||||
cp.log.Error(logs.ContainerCouldNotApproveDeleteContainer,
|
||||
zap.String("error", err.Error()),
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
func checkNNS(ctx *putContainerContext, cnr containerSDK.Container) error {
|
||||
// fetch domain info
|
||||
ctx.d = containerSDK.ReadDomain(cnr)
|
||||
|
|
|
@ -12,10 +12,10 @@ import (
|
|||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
func (cp *Processor) processSetEACL(e containerEvent.SetEACL) {
|
||||
func (cp *Processor) processSetEACL(e containerEvent.SetEACL) bool {
|
||||
if !cp.alphabetState.IsAlphabet() {
|
||||
cp.log.Info(logs.ContainerNonAlphabetModeIgnoreSetEACL)
|
||||
return
|
||||
return true
|
||||
}
|
||||
|
||||
err := cp.checkSetEACL(e)
|
||||
|
@ -24,10 +24,17 @@ func (cp *Processor) processSetEACL(e containerEvent.SetEACL) {
|
|||
zap.String("error", err.Error()),
|
||||
)
|
||||
|
||||
return
|
||||
return false
|
||||
}
|
||||
|
||||
cp.approveSetEACL(e)
|
||||
if err := cp.morphClient.NotarySignAndInvokeTX(e.NotaryRequest().MainTransaction); err != nil {
|
||||
cp.log.Error(logs.ContainerCouldNotApproveSetEACL,
|
||||
zap.String("error", err.Error()),
|
||||
)
|
||||
return false
|
||||
}
|
||||
|
||||
return true
|
||||
}
|
||||
|
||||
func (cp *Processor) checkSetEACL(e containerEvent.SetEACL) error {
|
||||
|
@ -73,11 +80,3 @@ func (cp *Processor) checkSetEACL(e containerEvent.SetEACL) error {
|
|||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (cp *Processor) approveSetEACL(e containerEvent.SetEACL) {
|
||||
if err := cp.morphClient.NotarySignAndInvokeTX(e.NotaryRequest().MainTransaction); err != nil {
|
||||
cp.log.Error(logs.ContainerCouldNotApproveSetEACL,
|
||||
zap.String("error", err.Error()),
|
||||
)
|
||||
}
|
||||
}
|
||||
|
|
|
@ -6,6 +6,7 @@ import (
|
|||
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
|
||||
containercore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/container"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/innerring/metrics"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/client/frostfsid"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/event"
|
||||
containerEvent "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/event/container"
|
||||
|
@ -40,6 +41,7 @@ type (
|
|||
// Processor of events produced by container contract in the sidechain.
|
||||
Processor struct {
|
||||
log *logger.Logger
|
||||
metrics metrics.Register
|
||||
pool *ants.Pool
|
||||
alphabetState AlphabetState
|
||||
cnrClient ContClient // notary must be enabled
|
||||
|
@ -51,6 +53,7 @@ type (
|
|||
// Params of the processor constructor.
|
||||
Params struct {
|
||||
Log *logger.Logger
|
||||
Metrics metrics.Register
|
||||
PoolSize int
|
||||
AlphabetState AlphabetState
|
||||
ContainerClient ContClient
|
||||
|
@ -102,8 +105,14 @@ func New(p *Params) (*Processor, error) {
|
|||
return nil, fmt.Errorf("ir/container: can't create worker pool: %w", err)
|
||||
}
|
||||
|
||||
metricsRegister := p.Metrics
|
||||
if metricsRegister == nil {
|
||||
metricsRegister = metrics.DefaultRegister{}
|
||||
}
|
||||
|
||||
return &Processor{
|
||||
log: p.Log,
|
||||
metrics: metricsRegister,
|
||||
pool: pool,
|
||||
alphabetState: p.AlphabetState,
|
||||
cnrClient: p.ContainerClient,
|
||||
|
|
|
@ -4,6 +4,7 @@ import (
|
|||
"encoding/hex"
|
||||
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/innerring/processors"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/event"
|
||||
frostfsEvent "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/event/frostfs"
|
||||
"github.com/nspcc-dev/neo-go/pkg/util/slice"
|
||||
|
@ -18,7 +19,9 @@ func (np *Processor) handleDeposit(ev event.Event) {
|
|||
|
||||
// send event to the worker pool
|
||||
|
||||
err := np.pool.Submit(func() { np.processDeposit(deposit) })
|
||||
err := processors.SubmitEvent(np.pool, np.metrics, "frostfs_deposit", func() bool {
|
||||
return np.processDeposit(deposit)
|
||||
})
|
||||
if err != nil {
|
||||
// there system can be moved into controlled degradation stage
|
||||
np.log.Warn(logs.FrostFSFrostfsProcessorWorkerPoolDrained,
|
||||
|
@ -34,7 +37,9 @@ func (np *Processor) handleWithdraw(ev event.Event) {
|
|||
|
||||
// send event to the worker pool
|
||||
|
||||
err := np.pool.Submit(func() { np.processWithdraw(withdraw) })
|
||||
err := processors.SubmitEvent(np.pool, np.metrics, "frostfs_withdraw", func() bool {
|
||||
return np.processWithdraw(withdraw)
|
||||
})
|
||||
if err != nil {
|
||||
// there system can be moved into controlled degradation stage
|
||||
np.log.Warn(logs.FrostFSFrostfsProcessorWorkerPoolDrained,
|
||||
|
@ -50,7 +55,9 @@ func (np *Processor) handleCheque(ev event.Event) {
|
|||
|
||||
// send event to the worker pool
|
||||
|
||||
err := np.pool.Submit(func() { np.processCheque(cheque) })
|
||||
err := processors.SubmitEvent(np.pool, np.metrics, "frostfs_cheque", func() bool {
|
||||
return np.processCheque(cheque)
|
||||
})
|
||||
if err != nil {
|
||||
// there system can be moved into controlled degradation stage
|
||||
np.log.Warn(logs.FrostFSFrostfsProcessorWorkerPoolDrained,
|
||||
|
@ -67,7 +74,9 @@ func (np *Processor) handleConfig(ev event.Event) {
|
|||
|
||||
// send event to the worker pool
|
||||
|
||||
err := np.pool.Submit(func() { np.processConfig(cfg) })
|
||||
err := processors.SubmitEvent(np.pool, np.metrics, "frostfs_config", func() bool {
|
||||
return np.processConfig(cfg)
|
||||
})
|
||||
if err != nil {
|
||||
// there system can be moved into controlled degradation stage
|
||||
np.log.Warn(logs.FrostFSFrostfsProcessorWorkerPoolDrained,
|
||||
|
@ -83,7 +92,9 @@ func (np *Processor) handleBind(ev event.Event) {
|
|||
|
||||
// send event to the worker pool
|
||||
|
||||
err := np.pool.Submit(func() { np.processBind(e, true) })
|
||||
err := processors.SubmitEvent(np.pool, np.metrics, "frostfs_bind", func() bool {
|
||||
return np.processBind(e, true)
|
||||
})
|
||||
if err != nil {
|
||||
// there system can be moved into controlled degradation stage
|
||||
np.log.Warn(logs.FrostFSFrostfsProcessorWorkerPoolDrained,
|
||||
|
@ -99,7 +110,9 @@ func (np *Processor) handleUnbind(ev event.Event) {
|
|||
|
||||
// send event to the worker pool
|
||||
|
||||
err := np.pool.Submit(func() { np.processBind(e, false) })
|
||||
err := processors.SubmitEvent(np.pool, np.metrics, "frostfs_unbind", func() bool {
|
||||
return np.processBind(e, false)
|
||||
})
|
||||
if err != nil {
|
||||
// there system can be moved into controlled degradation stage
|
||||
np.log.Warn(logs.FrostFSFrostfsProcessorWorkerPoolDrained,
|
||||
|
|
|
@ -15,10 +15,10 @@ const (
|
|||
|
||||
// Process deposit event by invoking a balance contract and sending native
|
||||
// gas in the sidechain.
|
||||
func (np *Processor) processDeposit(deposit frostfsEvent.Deposit) {
|
||||
func (np *Processor) processDeposit(deposit frostfsEvent.Deposit) bool {
|
||||
if !np.alphabetState.IsAlphabet() {
|
||||
np.log.Info(logs.FrostFSNonAlphabetModeIgnoreDeposit)
|
||||
return
|
||||
return true
|
||||
}
|
||||
|
||||
prm := balance.MintPrm{}
|
||||
|
@ -49,7 +49,7 @@ func (np *Processor) processDeposit(deposit frostfsEvent.Deposit) {
|
|||
zap.Uint64("last_emission", val),
|
||||
zap.Uint64("current_epoch", curEpoch))
|
||||
|
||||
return
|
||||
return false
|
||||
}
|
||||
|
||||
// get gas balance of the node
|
||||
|
@ -57,7 +57,7 @@ func (np *Processor) processDeposit(deposit frostfsEvent.Deposit) {
|
|||
balance, err := np.morphClient.GasBalance()
|
||||
if err != nil {
|
||||
np.log.Error(logs.FrostFSCantGetGasBalanceOfTheNode, zap.Error(err))
|
||||
return
|
||||
return false
|
||||
}
|
||||
|
||||
if balance < np.gasBalanceThreshold {
|
||||
|
@ -65,7 +65,7 @@ func (np *Processor) processDeposit(deposit frostfsEvent.Deposit) {
|
|||
zap.Int64("balance", balance),
|
||||
zap.Int64("threshold", np.gasBalanceThreshold))
|
||||
|
||||
return
|
||||
return false
|
||||
}
|
||||
|
||||
err = np.morphClient.TransferGas(receiver, np.mintEmitValue)
|
||||
|
@ -73,24 +73,26 @@ func (np *Processor) processDeposit(deposit frostfsEvent.Deposit) {
|
|||
np.log.Error(logs.FrostFSCantTransferNativeGasToReceiver,
|
||||
zap.String("error", err.Error()))
|
||||
|
||||
return
|
||||
return false
|
||||
}
|
||||
|
||||
np.mintEmitCache.Add(receiver.String(), curEpoch)
|
||||
|
||||
return true
|
||||
}
|
||||
|
||||
// Process withdraw event by locking assets in the balance account.
|
||||
func (np *Processor) processWithdraw(withdraw frostfsEvent.Withdraw) {
|
||||
func (np *Processor) processWithdraw(withdraw frostfsEvent.Withdraw) bool {
|
||||
if !np.alphabetState.IsAlphabet() {
|
||||
np.log.Info(logs.FrostFSNonAlphabetModeIgnoreWithdraw)
|
||||
return
|
||||
return true
|
||||
}
|
||||
|
||||
// create lock account
|
||||
lock, err := util.Uint160DecodeBytesBE(withdraw.ID()[:util.Uint160Size])
|
||||
if err != nil {
|
||||
np.log.Error(logs.FrostFSCantCreateLockAccount, zap.Error(err))
|
||||
return
|
||||
return false
|
||||
}
|
||||
|
||||
curEpoch := np.epochState.EpochCounter()
|
||||
|
@ -106,15 +108,18 @@ func (np *Processor) processWithdraw(withdraw frostfsEvent.Withdraw) {
|
|||
err = np.balanceClient.Lock(prm)
|
||||
if err != nil {
|
||||
np.log.Error(logs.FrostFSCantLockAssetsForWithdraw, zap.Error(err))
|
||||
return false
|
||||
}
|
||||
|
||||
return true
|
||||
}
|
||||
|
||||
// Process cheque event by transferring assets from the lock account back to
|
||||
// the reserve account.
|
||||
func (np *Processor) processCheque(cheque frostfsEvent.Cheque) {
|
||||
func (np *Processor) processCheque(cheque frostfsEvent.Cheque) bool {
|
||||
if !np.alphabetState.IsAlphabet() {
|
||||
np.log.Info(logs.FrostFSNonAlphabetModeIgnoreCheque)
|
||||
return
|
||||
return true
|
||||
}
|
||||
|
||||
prm := balance.BurnPrm{}
|
||||
|
@ -126,5 +131,8 @@ func (np *Processor) processCheque(cheque frostfsEvent.Cheque) {
|
|||
err := np.balanceClient.Burn(prm)
|
||||
if err != nil {
|
||||
np.log.Error(logs.FrostFSCantTransferAssetsToFedContract, zap.Error(err))
|
||||
return false
|
||||
}
|
||||
|
||||
return true
|
||||
}
|
||||
|
|
|
@ -18,10 +18,10 @@ type bindCommon interface {
|
|||
TxHash() util.Uint256
|
||||
}
|
||||
|
||||
func (np *Processor) processBind(e bindCommon, bind bool) {
|
||||
func (np *Processor) processBind(e bindCommon, bind bool) bool {
|
||||
if !np.alphabetState.IsAlphabet() {
|
||||
np.log.Info(logs.FrostFSNonAlphabetModeIgnoreBind)
|
||||
return
|
||||
return true
|
||||
}
|
||||
|
||||
c := &bindCommonContext{
|
||||
|
@ -36,10 +36,10 @@ func (np *Processor) processBind(e bindCommon, bind bool) {
|
|||
zap.String("error", err.Error()),
|
||||
)
|
||||
|
||||
return
|
||||
return false
|
||||
}
|
||||
|
||||
np.approveBindCommon(c)
|
||||
return np.approveBindCommon(c) == nil
|
||||
}
|
||||
|
||||
type bindCommonContext struct {
|
||||
|
@ -70,7 +70,7 @@ func (np *Processor) checkBindCommon(e *bindCommonContext) error {
|
|||
return nil
|
||||
}
|
||||
|
||||
func (np *Processor) approveBindCommon(e *bindCommonContext) {
|
||||
func (np *Processor) approveBindCommon(e *bindCommonContext) error {
|
||||
// calculate wallet address
|
||||
scriptHash := e.User()
|
||||
|
||||
|
@ -80,7 +80,7 @@ func (np *Processor) approveBindCommon(e *bindCommonContext) {
|
|||
zap.String("error", err.Error()),
|
||||
)
|
||||
|
||||
return
|
||||
return err
|
||||
}
|
||||
|
||||
var id user.ID
|
||||
|
@ -104,4 +104,6 @@ func (np *Processor) approveBindCommon(e *bindCommonContext) {
|
|||
np.log.Error(fmt.Sprintf("could not approve %s", typ),
|
||||
zap.String("error", err.Error()))
|
||||
}
|
||||
|
||||
return err
|
||||
}
|
||||
|
|
|
@ -9,10 +9,10 @@ import (
|
|||
|
||||
// Process config event by setting configuration value from the mainchain in
|
||||
// the sidechain.
|
||||
func (np *Processor) processConfig(config frostfsEvent.Config) {
|
||||
func (np *Processor) processConfig(config frostfsEvent.Config) bool {
|
||||
if !np.alphabetState.IsAlphabet() {
|
||||
np.log.Info(logs.FrostFSNonAlphabetModeIgnoreConfig)
|
||||
return
|
||||
return true
|
||||
}
|
||||
|
||||
prm := nmClient.SetConfigPrm{}
|
||||
|
@ -25,5 +25,8 @@ func (np *Processor) processConfig(config frostfsEvent.Config) {
|
|||
err := np.netmapClient.SetConfig(prm)
|
||||
if err != nil {
|
||||
np.log.Error(logs.FrostFSCantRelaySetConfigEvent, zap.Error(err))
|
||||
return false
|
||||
}
|
||||
|
||||
return true
|
||||
}
|
||||
|
|
|
@ -6,6 +6,7 @@ import (
|
|||
"sync"
|
||||
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/innerring/metrics"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/client/balance"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/client/frostfsid"
|
||||
nmClient "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/client/netmap"
|
||||
|
@ -58,6 +59,7 @@ type (
|
|||
// Processor of events produced by frostfs contract in main net.
|
||||
Processor struct {
|
||||
log *logger.Logger
|
||||
metrics metrics.Register
|
||||
pool *ants.Pool
|
||||
frostfsContract util.Uint160
|
||||
balanceClient BalanceClient
|
||||
|
@ -77,6 +79,7 @@ type (
|
|||
// Params of the processor constructor.
|
||||
Params struct {
|
||||
Log *logger.Logger
|
||||
Metrics metrics.Register
|
||||
PoolSize int
|
||||
FrostFSContract util.Uint160
|
||||
FrostFSIDClient IDClient
|
||||
|
@ -129,8 +132,14 @@ func New(p *Params) (*Processor, error) {
|
|||
return nil, fmt.Errorf("ir/frostfs: can't create LRU cache for gas emission: %w", err)
|
||||
}
|
||||
|
||||
metricsRegister := p.Metrics
|
||||
if metricsRegister == nil {
|
||||
metricsRegister = metrics.DefaultRegister{}
|
||||
}
|
||||
|
||||
return &Processor{
|
||||
log: p.Log,
|
||||
metrics: metricsRegister,
|
||||
pool: pool,
|
||||
frostfsContract: p.FrostFSContract,
|
||||
balanceClient: p.BalanceClient,
|
||||
|
|
|
@ -2,6 +2,7 @@ package governance
|
|||
|
||||
import (
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/innerring/processors"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/event"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/event/rolemanagement"
|
||||
"github.com/nspcc-dev/neo-go/pkg/core/native"
|
||||
|
@ -35,7 +36,9 @@ func (gp *Processor) HandleAlphabetSync(e event.Event) {
|
|||
|
||||
// send event to the worker pool
|
||||
|
||||
err := gp.pool.Submit(func() { gp.processAlphabetSync(hash) })
|
||||
err := processors.SubmitEvent(gp.pool, gp.metrics, "alphabet_sync", func() bool {
|
||||
return gp.processAlphabetSync(hash)
|
||||
})
|
||||
if err != nil {
|
||||
// there system can be moved into controlled degradation stage
|
||||
gp.log.Warn(logs.GovernanceGovernanceWorkerPoolDrained,
|
||||
|
|
|
@ -18,36 +18,36 @@ const (
|
|||
alphabetUpdateIDPrefix = "AlphabetUpdate"
|
||||
)
|
||||
|
||||
func (gp *Processor) processAlphabetSync(txHash util.Uint256) {
|
||||
func (gp *Processor) processAlphabetSync(txHash util.Uint256) bool {
|
||||
if !gp.alphabetState.IsAlphabet() {
|
||||
gp.log.Info(logs.GovernanceNonAlphabetModeIgnoreAlphabetSync)
|
||||
return
|
||||
return true
|
||||
}
|
||||
|
||||
mainnetAlphabet, err := gp.mainnetClient.NeoFSAlphabetList()
|
||||
if err != nil {
|
||||
gp.log.Error(logs.GovernanceCantFetchAlphabetListFromMainNet,
|
||||
zap.String("error", err.Error()))
|
||||
return
|
||||
return false
|
||||
}
|
||||
|
||||
sidechainAlphabet, err := gp.morphClient.Committee()
|
||||
if err != nil {
|
||||
gp.log.Error(logs.GovernanceCantFetchAlphabetListFromSideChain,
|
||||
zap.String("error", err.Error()))
|
||||
return
|
||||
return false
|
||||
}
|
||||
|
||||
newAlphabet, err := newAlphabetList(sidechainAlphabet, mainnetAlphabet)
|
||||
if err != nil {
|
||||
gp.log.Error(logs.GovernanceCantMergeAlphabetListsFromMainNetAndSideChain,
|
||||
zap.String("error", err.Error()))
|
||||
return
|
||||
return false
|
||||
}
|
||||
|
||||
if newAlphabet == nil {
|
||||
gp.log.Info(logs.GovernanceNoGovernanceUpdateAlphabetListHasNotBeenChanged)
|
||||
return
|
||||
return true
|
||||
}
|
||||
|
||||
gp.log.Info(logs.GovernanceAlphabetListHasBeenChangedStartingUpdate,
|
||||
|
@ -77,6 +77,8 @@ func (gp *Processor) processAlphabetSync(txHash util.Uint256) {
|
|||
gp.updateFrostFSContractInMainnet(newAlphabet)
|
||||
|
||||
gp.log.Info(logs.GovernanceFinishedAlphabetListUpdate)
|
||||
|
||||
return true
|
||||
}
|
||||
|
||||
func prettyKeys(keys keys.PublicKeys) string {
|
||||
|
|
|
@ -4,6 +4,7 @@ import (
|
|||
"errors"
|
||||
"fmt"
|
||||
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/innerring/metrics"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/client"
|
||||
frostfscontract "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/client/frostfs"
|
||||
nmClient "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/client/netmap"
|
||||
|
@ -75,6 +76,7 @@ type (
|
|||
// Processor of events related to governance in the network.
|
||||
Processor struct {
|
||||
log *logger.Logger
|
||||
metrics metrics.Register
|
||||
pool *ants.Pool
|
||||
frostfsClient FrostFSClient
|
||||
netmapClient NetmapClient
|
||||
|
@ -93,6 +95,7 @@ type (
|
|||
// Params of the processor constructor.
|
||||
Params struct {
|
||||
Log *logger.Logger
|
||||
Metrics metrics.Register
|
||||
|
||||
AlphabetState AlphabetState
|
||||
EpochState EpochState
|
||||
|
@ -130,11 +133,17 @@ func New(p *Params) (*Processor, error) {
|
|||
return nil, fmt.Errorf("ir/governance: can't create worker pool: %w", err)
|
||||
}
|
||||
|
||||
metricsRegister := p.Metrics
|
||||
if metricsRegister == nil {
|
||||
metricsRegister = metrics.DefaultRegister{}
|
||||
}
|
||||
|
||||
// result is cached by neo-go, so we can pre-calc it
|
||||
designate := p.MainnetClient.GetDesignateHash()
|
||||
|
||||
return &Processor{
|
||||
log: p.Log,
|
||||
metrics: metricsRegister,
|
||||
pool: pool,
|
||||
frostfsClient: p.FrostFSClient,
|
||||
netmapClient: p.NetmapClient,
|
||||
|
|
|
@ -4,6 +4,7 @@ import (
|
|||
"encoding/hex"
|
||||
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/innerring/processors"
|
||||
timerEvent "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/innerring/timers"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/event"
|
||||
netmapEvent "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/event/netmap"
|
||||
|
@ -16,7 +17,7 @@ func (np *Processor) HandleNewEpochTick(ev event.Event) {
|
|||
|
||||
// send an event to the worker pool
|
||||
|
||||
err := np.pool.Submit(func() { np.processNewEpochTick() })
|
||||
err := processors.SubmitEvent(np.pool, np.metrics, "netmap_new_epoch_tick", np.processNewEpochTick)
|
||||
if err != nil {
|
||||
// there system can be moved into controlled degradation stage
|
||||
np.log.Warn(logs.NetmapNetmapWorkerPoolDrained,
|
||||
|
@ -32,8 +33,8 @@ func (np *Processor) handleNewEpoch(ev event.Event) {
|
|||
|
||||
// send an event to the worker pool
|
||||
|
||||
err := np.pool.Submit(func() {
|
||||
np.processNewEpoch(epochEvent)
|
||||
err := processors.SubmitEvent(np.pool, np.metrics, "netmap_new_epoch", func() bool {
|
||||
return np.processNewEpoch(epochEvent)
|
||||
})
|
||||
if err != nil {
|
||||
// there system can be moved into controlled degradation stage
|
||||
|
@ -51,8 +52,8 @@ func (np *Processor) handleAddPeer(ev event.Event) {
|
|||
|
||||
// send an event to the worker pool
|
||||
|
||||
err := np.pool.Submit(func() {
|
||||
np.processAddPeer(newPeer)
|
||||
err := processors.SubmitEvent(np.pool, np.metrics, "netmap_add_peer", func() bool {
|
||||
return np.processAddPeer(newPeer)
|
||||
})
|
||||
if err != nil {
|
||||
// there system can be moved into controlled degradation stage
|
||||
|
@ -69,8 +70,8 @@ func (np *Processor) handleUpdateState(ev event.Event) {
|
|||
|
||||
// send event to the worker pool
|
||||
|
||||
err := np.pool.Submit(func() {
|
||||
np.processUpdatePeer(updPeer)
|
||||
err := processors.SubmitEvent(np.pool, np.metrics, "netmap_update_peer", func() bool {
|
||||
return np.processUpdatePeer(updPeer)
|
||||
})
|
||||
if err != nil {
|
||||
// there system can be moved into controlled degradation stage
|
||||
|
@ -91,8 +92,8 @@ func (np *Processor) handleCleanupTick(ev event.Event) {
|
|||
np.log.Info(logs.NetmapTick, zap.String("type", "netmap cleaner"))
|
||||
|
||||
// send event to the worker pool
|
||||
err := np.pool.Submit(func() {
|
||||
np.processNetmapCleanupTick(cleanup)
|
||||
err := processors.SubmitEvent(np.pool, np.metrics, "netmap_cleanup_tick", func() bool {
|
||||
return np.processNetmapCleanupTick(cleanup)
|
||||
})
|
||||
if err != nil {
|
||||
// there system can be moved into controlled degradation stage
|
||||
|
|
|
@ -7,11 +7,11 @@ import (
|
|||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
func (np *Processor) processNetmapCleanupTick(ev netmapCleanupTick) {
|
||||
func (np *Processor) processNetmapCleanupTick(ev netmapCleanupTick) bool {
|
||||
if !np.alphabetState.IsAlphabet() {
|
||||
np.log.Info(logs.NetmapNonAlphabetModeIgnoreNewNetmapCleanupTick)
|
||||
|
||||
return
|
||||
return true
|
||||
}
|
||||
|
||||
err := np.netmapSnapshot.forEachRemoveCandidate(ev.epoch, func(s string) error {
|
||||
|
@ -47,5 +47,8 @@ func (np *Processor) processNetmapCleanupTick(ev netmapCleanupTick) {
|
|||
if err != nil {
|
||||
np.log.Warn(logs.NetmapCantIterateOnNetmapCleanerCache,
|
||||
zap.String("error", err.Error()))
|
||||
return false
|
||||
}
|
||||
|
||||
return true
|
||||
}
|
||||
|
|
|
@ -10,7 +10,7 @@ import (
|
|||
|
||||
// Process new epoch notification by setting global epoch value and resetting
|
||||
// local epoch timer.
|
||||
func (np *Processor) processNewEpoch(ev netmapEvent.NewEpoch) {
|
||||
func (np *Processor) processNewEpoch(ev netmapEvent.NewEpoch) bool {
|
||||
epoch := ev.EpochNumber()
|
||||
|
||||
epochDuration, err := np.netmapClient.EpochDuration()
|
||||
|
@ -41,7 +41,7 @@ func (np *Processor) processNewEpoch(ev netmapEvent.NewEpoch) {
|
|||
np.log.Warn(logs.NetmapCantGetNetmapSnapshotToPerformCleanup,
|
||||
zap.String("error", err.Error()))
|
||||
|
||||
return
|
||||
return false
|
||||
}
|
||||
|
||||
prm := cntClient.StartEstimationPrm{}
|
||||
|
@ -63,13 +63,15 @@ func (np *Processor) processNewEpoch(ev netmapEvent.NewEpoch) {
|
|||
np.handleCleanupTick(netmapCleanupTick{epoch: epoch, txHash: ev.TxHash()})
|
||||
np.handleAlphabetSync(governance.NewSyncEvent(ev.TxHash()))
|
||||
np.handleNotaryDeposit(ev)
|
||||
|
||||
return true
|
||||
}
|
||||
|
||||
// Process new epoch tick by invoking new epoch method in network map contract.
|
||||
func (np *Processor) processNewEpochTick() {
|
||||
func (np *Processor) processNewEpochTick() bool {
|
||||
if !np.alphabetState.IsAlphabet() {
|
||||
np.log.Info(logs.NetmapNonAlphabetModeIgnoreNewEpochTick)
|
||||
return
|
||||
return true
|
||||
}
|
||||
|
||||
nextEpoch := np.epochState.EpochCounter() + 1
|
||||
|
@ -78,5 +80,8 @@ func (np *Processor) processNewEpochTick() {
|
|||
err := np.netmapClient.NewEpoch(nextEpoch, false)
|
||||
if err != nil {
|
||||
np.log.Error(logs.NetmapCantInvokeNetmapNewEpoch, zap.Error(err))
|
||||
return false
|
||||
}
|
||||
|
||||
return true
|
||||
}
|
||||
|
|
|
@ -12,10 +12,10 @@ import (
|
|||
|
||||
// Process add peer notification by sanity check of new node
|
||||
// local epoch timer.
|
||||
func (np *Processor) processAddPeer(ev netmapEvent.AddPeer) {
|
||||
func (np *Processor) processAddPeer(ev netmapEvent.AddPeer) bool {
|
||||
if !np.alphabetState.IsAlphabet() {
|
||||
np.log.Info(logs.NetmapNonAlphabetModeIgnoreNewPeerNotification)
|
||||
return
|
||||
return true
|
||||
}
|
||||
|
||||
// check if notary transaction is valid, see #976
|
||||
|
@ -26,7 +26,7 @@ func (np *Processor) processAddPeer(ev netmapEvent.AddPeer) {
|
|||
zap.String("method", "netmap.AddPeer"),
|
||||
zap.String("hash", tx.Hash().StringLE()),
|
||||
zap.Error(err))
|
||||
return
|
||||
return false
|
||||
}
|
||||
|
||||
// unmarshal node info
|
||||
|
@ -34,7 +34,7 @@ func (np *Processor) processAddPeer(ev netmapEvent.AddPeer) {
|
|||
if err := nodeInfo.Unmarshal(ev.Node()); err != nil {
|
||||
// it will be nice to have tx id at event structure to log it
|
||||
np.log.Warn(logs.NetmapCantParseNetworkMapCandidate)
|
||||
return
|
||||
return false
|
||||
}
|
||||
|
||||
// validate and update node info
|
||||
|
@ -44,7 +44,7 @@ func (np *Processor) processAddPeer(ev netmapEvent.AddPeer) {
|
|||
zap.String("error", err.Error()),
|
||||
)
|
||||
|
||||
return
|
||||
return false
|
||||
}
|
||||
|
||||
// sort attributes to make it consistent
|
||||
|
@ -81,15 +81,18 @@ func (np *Processor) processAddPeer(ev netmapEvent.AddPeer) {
|
|||
|
||||
if err != nil {
|
||||
np.log.Error(logs.NetmapCantInvokeNetmapAddPeer, zap.Error(err))
|
||||
return false
|
||||
}
|
||||
}
|
||||
|
||||
return true
|
||||
}
|
||||
|
||||
// Process update peer notification by sending approval tx to the smart contract.
|
||||
func (np *Processor) processUpdatePeer(ev netmapEvent.UpdatePeer) {
|
||||
func (np *Processor) processUpdatePeer(ev netmapEvent.UpdatePeer) bool {
|
||||
if !np.alphabetState.IsAlphabet() {
|
||||
np.log.Info(logs.NetmapNonAlphabetModeIgnoreUpdatePeerNotification)
|
||||
return
|
||||
return true
|
||||
}
|
||||
|
||||
// flag node to remove from local view, so it can be re-bootstrapped
|
||||
|
@ -105,11 +108,14 @@ func (np *Processor) processUpdatePeer(ev netmapEvent.UpdatePeer) {
|
|||
zap.Error(err),
|
||||
)
|
||||
|
||||
return
|
||||
return false
|
||||
}
|
||||
}
|
||||
|
||||
if err = np.netmapClient.MorphNotarySignAndInvokeTX(ev.NotaryRequest().MainTransaction); err != nil {
|
||||
np.log.Error(logs.NetmapCantInvokeNetmapUpdatePeer, zap.Error(err))
|
||||
return false
|
||||
}
|
||||
|
||||
return true
|
||||
}
|
||||
|
|
|
@ -5,6 +5,7 @@ import (
|
|||
"fmt"
|
||||
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/innerring/metrics"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/innerring/processors/netmap/nodevalidation/state"
|
||||
cntClient "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/client/container"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/event"
|
||||
|
@ -72,6 +73,7 @@ type (
|
|||
// and new epoch ticker, because it is related to contract.
|
||||
Processor struct {
|
||||
log *logger.Logger
|
||||
metrics metrics.Register
|
||||
pool *ants.Pool
|
||||
epochTimer EpochTimerReseter
|
||||
epochState EpochState
|
||||
|
@ -93,6 +95,7 @@ type (
|
|||
// Params of the processor constructor.
|
||||
Params struct {
|
||||
Log *logger.Logger
|
||||
Metrics metrics.Register
|
||||
PoolSize int
|
||||
NetmapClient Client
|
||||
EpochTimer EpochTimerReseter
|
||||
|
@ -145,8 +148,14 @@ func New(p *Params) (*Processor, error) {
|
|||
return nil, fmt.Errorf("ir/netmap: can't create worker pool: %w", err)
|
||||
}
|
||||
|
||||
metricsRegister := p.Metrics
|
||||
if metricsRegister == nil {
|
||||
metricsRegister = metrics.DefaultRegister{}
|
||||
}
|
||||
|
||||
return &Processor{
|
||||
log: p.Log,
|
||||
metrics: metricsRegister,
|
||||
pool: pool,
|
||||
epochTimer: p.EpochTimer,
|
||||
epochState: p.EpochState,
|
||||
|
|
16
pkg/innerring/processors/util.go
Normal file
16
pkg/innerring/processors/util.go
Normal file
|
@ -0,0 +1,16 @@
|
|||
package processors
|
||||
|
||||
import (
|
||||
"time"
|
||||
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/innerring/metrics"
|
||||
"github.com/panjf2000/ants/v2"
|
||||
)
|
||||
|
||||
func SubmitEvent(pool *ants.Pool, metrics metrics.Register, eventLabel string, eventProcessor func() bool) error {
|
||||
return pool.Submit(func() {
|
||||
start := time.Now()
|
||||
|
||||
success := eventProcessor()
|
||||
metrics.AddEvent(time.Since(start), eventLabel, success)
|
||||
})
|
||||
}
|
|
@ -1,13 +1,23 @@
|
|||
package metrics
|
||||
|
||||
import "github.com/prometheus/client_golang/prometheus"
|
||||
import (
|
||||
"strconv"
|
||||
"time"
|
||||
|
||||
const innerRingSubsystem = "ir"
|
||||
"github.com/prometheus/client_golang/prometheus"
|
||||
)
|
||||
|
||||
const (
|
||||
innerRingSubsystem = "ir"
|
||||
innerRingLabelSuccess = "success"
|
||||
innerRingLabelType = "type"
|
||||
)
|
||||
|
||||
// InnerRingServiceMetrics contains metrics collected by inner ring.
|
||||
type InnerRingServiceMetrics struct {
|
||||
epoch metric[prometheus.Gauge]
|
||||
health metric[prometheus.Gauge]
|
||||
eventDuration metric[*prometheus.HistogramVec]
|
||||
}
|
||||
|
||||
// NewInnerRingMetrics returns new instance of metrics collectors for inner ring.
|
||||
|
@ -25,14 +35,22 @@ func NewInnerRingMetrics() *InnerRingServiceMetrics {
|
|||
Name: "health",
|
||||
Help: "Current inner-ring node state.",
|
||||
})
|
||||
eventDuration = newHistogramVec(prometheus.HistogramOpts{
|
||||
Namespace: namespace,
|
||||
Subsystem: innerRingSubsystem,
|
||||
Name: "event_duration_seconds",
|
||||
Help: "Duration of processing of inner-ring events",
|
||||
}, []string{innerRingLabelType, innerRingLabelSuccess})
|
||||
)
|
||||
|
||||
mustRegister(epoch)
|
||||
mustRegister(health)
|
||||
mustRegister(eventDuration)
|
||||
|
||||
return &InnerRingServiceMetrics{
|
||||
epoch: epoch,
|
||||
health: health,
|
||||
eventDuration: eventDuration,
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -45,3 +63,10 @@ func (m InnerRingServiceMetrics) SetEpoch(epoch uint64) {
|
|||
func (m InnerRingServiceMetrics) SetHealth(s int32) {
|
||||
m.health.value.Set(float64(s))
|
||||
}
|
||||
|
||||
func (m InnerRingServiceMetrics) AddEvent(d time.Duration, typ string, success bool) {
|
||||
m.eventDuration.value.With(prometheus.Labels{
|
||||
innerRingLabelType: typ,
|
||||
innerRingLabelSuccess: strconv.FormatBool(success),
|
||||
}).Observe(d.Seconds())
|
||||
}
|
||||
|
|
Loading…
Reference in a new issue
Why did you decide to have
start
here and not before we submit the task in pool?Just asking.
well, because my intention was to measure the actual event processor duration,
excluding whatever wait time it spends on the pool. The pool wait time is statistically unrelated to the event processor time for specific event types, and it can be measured separately if you think it's interesting.