[#374] Add inner-ring event metrics #405

Merged
fyrchik merged 1 commit from ale64bit/frostfs-node:feature/374-inner-ring-metrics into master 2023-05-31 10:11:49 +00:00
27 changed files with 287 additions and 113 deletions

View file

@ -52,6 +52,7 @@ func (s *Server) initNetmapProcessor(cfg *viper.Viper,
s.netmapProcessor, err = netmap.New(&netmap.Params{
Log: s.log,
Metrics: s.metrics,
PoolSize: cfg.GetInt("workers.netmap"),
NetmapClient: netmap.NewNetmapClient(s.netmapClient),
EpochTimer: s,
@ -162,6 +163,7 @@ func (s *Server) createAlphaSync(cfg *viper.Viper, frostfsCli *frostfsClient.Cli
// create governance processor
governanceProcessor, err := governance.New(&governance.Params{
Log: s.log,
Metrics: s.metrics,
FrostFSClient: frostfsCli,
NetmapClient: s.netmapClient,
AlphabetState: s,
@ -231,6 +233,7 @@ func (s *Server) initAlphabetProcessor(cfg *viper.Viper) error {
s.alphabetProcessor, err = alphabet.New(&alphabet.Params{
ParsedWallets: parsedWallets,
Log: s.log,
Metrics: s.metrics,
PoolSize: cfg.GetInt("workers.alphabet"),
AlphabetContracts: s.contracts.alphabet,
NetmapClient: s.netmapClient,
@ -255,6 +258,7 @@ func (s *Server) initContainerProcessor(cfg *viper.Viper, cnrClient *container.C
// container processor
containerProcessor, err := cont.New(&cont.Params{
Log: s.log,
Metrics: s.metrics,
PoolSize: cfg.GetInt("workers.container"),
AlphabetState: s,
ContainerClient: cnrClient,
@ -273,6 +277,7 @@ func (s *Server) initBalanceProcessor(cfg *viper.Viper, frostfsCli *frostfsClien
// create balance processor
balanceProcessor, err := balance.New(&balance.Params{
Log: s.log,
Metrics: s.metrics,
PoolSize: cfg.GetInt("workers.balance"),
FrostFSClient: frostfsCli,
BalanceSC: s.contracts.balance,
@ -293,6 +298,7 @@ func (s *Server) initFrostFSMainnetProcessor(cfg *viper.Viper, frostfsIDClient *
frostfsProcessor, err := frostfs.New(&frostfs.Params{
Log: s.log,
Metrics: s.metrics,
PoolSize: cfg.GetInt("workers.frostfs"),
FrostFSContract: s.contracts.frostfs,
FrostFSIDClient: frostfsIDClient,

View file

@ -0,0 +1,15 @@
package metrics
import "time"
type Register interface {
SetEpoch(epoch uint64)
SetHealth(s int32)
AddEvent(d time.Duration, typ string, success bool)
}
type DefaultRegister struct{}
func (DefaultRegister) SetEpoch(uint64) {}
func (DefaultRegister) SetHealth(int32) {}
func (DefaultRegister) AddEvent(time.Duration, string, bool) {}

View file

@ -2,6 +2,7 @@ package alphabet
import (
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/innerring/processors"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/innerring/timers"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/event"
"go.uber.org/zap"
@ -13,7 +14,7 @@ func (ap *Processor) HandleGasEmission(ev event.Event) {
// send event to the worker pool
err := ap.pool.Submit(func() { ap.processEmit() })
err := processors.SubmitEvent(ap.pool, ap.metrics, "alphabet_emit_gas", ap.processEmit)
if err != nil {
// there system can be moved into controlled degradation stage
ap.log.Warn(logs.AlphabetAlphabetProcessorWorkerPoolDrained,

View file

@ -13,12 +13,12 @@ import (
const emitMethod = "emit"
func (ap *Processor) processEmit() {
func (ap *Processor) processEmit() bool {
index := ap.irList.AlphabetIndex()
if index < 0 {
ap.log.Info(logs.AlphabetNonAlphabetModeIgnoreGasEmissionEvent)
return
return true
}
contract, ok := ap.alphabetContracts.GetByIndex(index)
@ -26,7 +26,7 @@ func (ap *Processor) processEmit() {
ap.log.Debug(logs.AlphabetNodeIsOutOfAlphabetRangeIgnoreGasEmissionEvent,
zap.Int("index", index))
return
return false
}
// there is no signature collecting, so we don't need extra fee
@ -34,13 +34,13 @@ func (ap *Processor) processEmit() {
if err != nil {
ap.log.Warn(logs.AlphabetCantInvokeAlphabetEmitMethod, zap.String("error", err.Error()))
return
return false
}
if ap.storageEmission == 0 {
ap.log.Info(logs.AlphabetStorageNodeEmissionIsOff)
return
return true
}
networkMap, err := ap.netmapClient.NetMap()
@ -48,7 +48,7 @@ func (ap *Processor) processEmit() {
ap.log.Warn(logs.AlphabetCantGetNetmapSnapshotToEmitGasToStorageNodes,
zap.String("error", err.Error()))
return
return false
}
nmNodes := networkMap.Nodes()
@ -63,7 +63,7 @@ func (ap *Processor) processEmit() {
zap.Int("extra_wallets", extraLen))
if nmLen+extraLen == 0 {
return
return true
}
gasPerNode := fixedn.Fixed8(ap.storageEmission / uint64(nmLen+extraLen))
@ -71,6 +71,8 @@ func (ap *Processor) processEmit() {
ap.transferGasToNetmapNodes(nmNodes, gasPerNode)
ap.transferGasToExtraNodes(pw, gasPerNode)
return true
}
func (ap *Processor) transferGasToNetmapNodes(nmNodes []netmap.NodeInfo, gasPerNode fixedn.Fixed8) {

View file

@ -7,6 +7,7 @@ import (
"time"
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/innerring/metrics"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/event"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap"
@ -50,6 +51,7 @@ type (
// protects parsedWallets from concurrent change
pwLock *sync.RWMutex
log *logger.Logger
metrics metrics.Register
pool *ants.Pool
alphabetContracts Contracts
netmapClient netmapClient
@ -62,6 +64,7 @@ type (
Params struct {
ParsedWallets []util.Uint160
Log *logger.Logger
Metrics metrics.Register
PoolSize int
AlphabetContracts Contracts
NetmapClient netmapClient
@ -89,10 +92,16 @@ func New(p *Params) (*Processor, error) {
return nil, fmt.Errorf("ir/frostfs: can't create worker pool: %w", err)
}
metricsRegister := p.Metrics
if metricsRegister == nil {
metricsRegister = metrics.DefaultRegister{}
}
return &Processor{
parsedWallets: p.ParsedWallets,
pwLock: new(sync.RWMutex),
log: p.Log,
metrics: metricsRegister,
pool: pool,
alphabetContracts: p.AlphabetContracts,
netmapClient: p.NetmapClient,

View file

@ -4,6 +4,7 @@ import (
"encoding/hex"
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/innerring/processors"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/event"
balanceEvent "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/event/balance"
"go.uber.org/zap"
@ -17,7 +18,9 @@ func (bp *Processor) handleLock(ev event.Event) {
// send an event to the worker pool
err := bp.pool.Submit(func() { bp.processLock(&lock) })
err := processors.SubmitEvent(bp.pool, bp.metrics, "lock", func() bool {
return bp.processLock(&lock)
})
if err != nil {
// there system can be moved into controlled degradation stage
bp.log.Warn(logs.BalanceBalanceWorkerPoolDrained,

View file

@ -9,10 +9,10 @@ import (
// Process lock event by invoking Cheque method in main net to send assets
// back to the withdraw issuer.
func (bp *Processor) processLock(lock *balanceEvent.Lock) {
func (bp *Processor) processLock(lock *balanceEvent.Lock) bool {
if !bp.alphabetState.IsAlphabet() {
bp.log.Info(logs.BalanceNonAlphabetModeIgnoreBalanceLock)
return
return true
}
prm := frostfsContract.ChequePrm{}
@ -26,5 +26,8 @@ func (bp *Processor) processLock(lock *balanceEvent.Lock) {
err := bp.frostfsClient.Cheque(prm)
if err != nil {
bp.log.Error(logs.BalanceCantSendLockAssetTx, zap.Error(err))
return false
}
return true
}

View file

@ -5,6 +5,7 @@ import (
"fmt"
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/innerring/metrics"
frostfscontract "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/client/frostfs"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/event"
balanceEvent "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/event/balance"
@ -32,6 +33,7 @@ type (
// Processor of events produced by balance contract in the morphchain.
Processor struct {
log *logger.Logger
metrics metrics.Register
pool *ants.Pool
frostfsClient FrostFSClient
balanceSC util.Uint160
@ -42,6 +44,7 @@ type (
// Params of the processor constructor.
Params struct {
Log *logger.Logger
Metrics metrics.Register
PoolSize int
FrostFSClient FrostFSClient
BalanceSC util.Uint160
@ -72,8 +75,14 @@ func New(p *Params) (*Processor, error) {
return nil, fmt.Errorf("ir/balance: can't create worker pool: %w", err)
}
metricsRegister := p.Metrics
if metricsRegister == nil {
metricsRegister = metrics.DefaultRegister{}
}
return &Processor{
log: p.Log,
metrics: metricsRegister,
pool: pool,
frostfsClient: p.FrostFSClient,
balanceSC: p.BalanceSC,

View file

@ -4,6 +4,7 @@ import (
"crypto/sha256"
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/innerring/processors"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/event"
containerEvent "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/event/container"
"github.com/mr-tron/base58"
@ -20,7 +21,9 @@ func (cp *Processor) handlePut(ev event.Event) {
// send an event to the worker pool
err := cp.pool.Submit(func() { cp.processContainerPut(put) })
err := processors.SubmitEvent(cp.pool, cp.metrics, "container_put", func() bool {
return cp.processContainerPut(put)
})
if err != nil {
// there system can be moved into controlled degradation stage
cp.log.Warn(logs.ContainerContainerProcessorWorkerPoolDrained,
@ -36,7 +39,9 @@ func (cp *Processor) handleDelete(ev event.Event) {
// send an event to the worker pool
err := cp.pool.Submit(func() { cp.processContainerDelete(del) })
err := processors.SubmitEvent(cp.pool, cp.metrics, "container_delete", func() bool {
return cp.processContainerDelete(del)
})
if err != nil {
// there system can be moved into controlled degradation stage
cp.log.Warn(logs.ContainerContainerProcessorWorkerPoolDrained,
@ -53,8 +58,8 @@ func (cp *Processor) handleSetEACL(ev event.Event) {
// send an event to the worker pool
err := cp.pool.Submit(func() {
cp.processSetEACL(e)
err := processors.SubmitEvent(cp.pool, cp.metrics, "container_set_eacl", func() bool {
return cp.processSetEACL(e)
})
if err != nil {
// there system can be moved into controlled degradation stage

View file

@ -31,10 +31,10 @@ type putContainerContext struct {
// Process a new container from the user by checking the container sanity
// and sending approve tx back to the morph.
func (cp *Processor) processContainerPut(put putEvent) {
func (cp *Processor) processContainerPut(put putEvent) bool {
if !cp.alphabetState.IsAlphabet() {
cp.log.Info(logs.ContainerNonAlphabetModeIgnoreContainerPut)
return
return true
}
ctx := &putContainerContext{
@ -47,10 +47,17 @@ func (cp *Processor) processContainerPut(put putEvent) {
zap.String("error", err.Error()),
)
return
return false
}
cp.approvePutContainer(ctx)
if err := cp.morphClient.NotarySignAndInvokeTX(ctx.e.NotaryRequest().MainTransaction); err != nil {
cp.log.Error(logs.ContainerCouldNotApprovePutContainer,
zap.String("error", err.Error()),
)
return false
}
return true
}
func (cp *Processor) checkPutContainer(ctx *putContainerContext) error {
@ -89,20 +96,12 @@ func (cp *Processor) checkPutContainer(ctx *putContainerContext) error {
return nil
}
func (cp *Processor) approvePutContainer(ctx *putContainerContext) {
if err := cp.morphClient.NotarySignAndInvokeTX(ctx.e.NotaryRequest().MainTransaction); err != nil {
cp.log.Error(logs.ContainerCouldNotApprovePutContainer,
zap.String("error", err.Error()),
)
}
}
// Process delete container operation from the user by checking container sanity
// and sending approve tx back to morph.
func (cp *Processor) processContainerDelete(e containerEvent.Delete) {
func (cp *Processor) processContainerDelete(e containerEvent.Delete) bool {
if !cp.alphabetState.IsAlphabet() {
cp.log.Info(logs.ContainerNonAlphabetModeIgnoreContainerDelete)
return
return true
}
err := cp.checkDeleteContainer(e)
@ -111,10 +110,18 @@ func (cp *Processor) processContainerDelete(e containerEvent.Delete) {
zap.String("error", err.Error()),
)
return
return false
}
cp.approveDeleteContainer(e)
if err := cp.morphClient.NotarySignAndInvokeTX(e.NotaryRequest().MainTransaction); err != nil {
cp.log.Error(logs.ContainerCouldNotApproveDeleteContainer,
zap.String("error", err.Error()),
)
return false
}
return true
}
func (cp *Processor) checkDeleteContainer(e containerEvent.Delete) error {
@ -149,14 +156,6 @@ func (cp *Processor) checkDeleteContainer(e containerEvent.Delete) error {
return nil
}
func (cp *Processor) approveDeleteContainer(e containerEvent.Delete) {
if err := cp.morphClient.NotarySignAndInvokeTX(e.NotaryRequest().MainTransaction); err != nil {
cp.log.Error(logs.ContainerCouldNotApproveDeleteContainer,
zap.String("error", err.Error()),
)
}
}
func checkNNS(ctx *putContainerContext, cnr containerSDK.Container) error {
// fetch domain info
ctx.d = containerSDK.ReadDomain(cnr)

View file

@ -12,10 +12,10 @@ import (
"go.uber.org/zap"
)
func (cp *Processor) processSetEACL(e containerEvent.SetEACL) {
func (cp *Processor) processSetEACL(e containerEvent.SetEACL) bool {
if !cp.alphabetState.IsAlphabet() {
cp.log.Info(logs.ContainerNonAlphabetModeIgnoreSetEACL)
return
return true
}
err := cp.checkSetEACL(e)
@ -24,10 +24,17 @@ func (cp *Processor) processSetEACL(e containerEvent.SetEACL) {
zap.String("error", err.Error()),
)
return
return false
}
cp.approveSetEACL(e)
if err := cp.morphClient.NotarySignAndInvokeTX(e.NotaryRequest().MainTransaction); err != nil {
cp.log.Error(logs.ContainerCouldNotApproveSetEACL,
zap.String("error", err.Error()),
)
return false
}
return true
}
func (cp *Processor) checkSetEACL(e containerEvent.SetEACL) error {
@ -73,11 +80,3 @@ func (cp *Processor) checkSetEACL(e containerEvent.SetEACL) error {
return nil
}
func (cp *Processor) approveSetEACL(e containerEvent.SetEACL) {
if err := cp.morphClient.NotarySignAndInvokeTX(e.NotaryRequest().MainTransaction); err != nil {
cp.log.Error(logs.ContainerCouldNotApproveSetEACL,
zap.String("error", err.Error()),
)
}
}

View file

@ -6,6 +6,7 @@ import (
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
containercore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/container"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/innerring/metrics"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/client/frostfsid"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/event"
containerEvent "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/event/container"
@ -40,6 +41,7 @@ type (
// Processor of events produced by container contract in the sidechain.
Processor struct {
log *logger.Logger
metrics metrics.Register
pool *ants.Pool
alphabetState AlphabetState
cnrClient ContClient // notary must be enabled
@ -51,6 +53,7 @@ type (
// Params of the processor constructor.
Params struct {
Log *logger.Logger
Metrics metrics.Register
PoolSize int
AlphabetState AlphabetState
ContainerClient ContClient
@ -102,8 +105,14 @@ func New(p *Params) (*Processor, error) {
return nil, fmt.Errorf("ir/container: can't create worker pool: %w", err)
}
metricsRegister := p.Metrics
if metricsRegister == nil {
metricsRegister = metrics.DefaultRegister{}
}
return &Processor{
log: p.Log,
metrics: metricsRegister,
pool: pool,
alphabetState: p.AlphabetState,
cnrClient: p.ContainerClient,

View file

@ -4,6 +4,7 @@ import (
"encoding/hex"
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/innerring/processors"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/event"
frostfsEvent "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/event/frostfs"
"github.com/nspcc-dev/neo-go/pkg/util/slice"
@ -18,7 +19,9 @@ func (np *Processor) handleDeposit(ev event.Event) {
// send event to the worker pool
err := np.pool.Submit(func() { np.processDeposit(deposit) })
err := processors.SubmitEvent(np.pool, np.metrics, "frostfs_deposit", func() bool {
return np.processDeposit(deposit)
})
if err != nil {
// there system can be moved into controlled degradation stage
np.log.Warn(logs.FrostFSFrostfsProcessorWorkerPoolDrained,
@ -34,7 +37,9 @@ func (np *Processor) handleWithdraw(ev event.Event) {
// send event to the worker pool
err := np.pool.Submit(func() { np.processWithdraw(withdraw) })
err := processors.SubmitEvent(np.pool, np.metrics, "frostfs_withdraw", func() bool {
return np.processWithdraw(withdraw)
})
if err != nil {
// there system can be moved into controlled degradation stage
np.log.Warn(logs.FrostFSFrostfsProcessorWorkerPoolDrained,
@ -50,7 +55,9 @@ func (np *Processor) handleCheque(ev event.Event) {
// send event to the worker pool
err := np.pool.Submit(func() { np.processCheque(cheque) })
err := processors.SubmitEvent(np.pool, np.metrics, "frostfs_cheque", func() bool {
return np.processCheque(cheque)
})
if err != nil {
// there system can be moved into controlled degradation stage
np.log.Warn(logs.FrostFSFrostfsProcessorWorkerPoolDrained,
@ -67,7 +74,9 @@ func (np *Processor) handleConfig(ev event.Event) {
// send event to the worker pool
err := np.pool.Submit(func() { np.processConfig(cfg) })
err := processors.SubmitEvent(np.pool, np.metrics, "frostfs_config", func() bool {
return np.processConfig(cfg)
})
if err != nil {
// there system can be moved into controlled degradation stage
np.log.Warn(logs.FrostFSFrostfsProcessorWorkerPoolDrained,
@ -83,7 +92,9 @@ func (np *Processor) handleBind(ev event.Event) {
// send event to the worker pool
err := np.pool.Submit(func() { np.processBind(e, true) })
err := processors.SubmitEvent(np.pool, np.metrics, "frostfs_bind", func() bool {
return np.processBind(e, true)
})
if err != nil {
// there system can be moved into controlled degradation stage
np.log.Warn(logs.FrostFSFrostfsProcessorWorkerPoolDrained,
@ -99,7 +110,9 @@ func (np *Processor) handleUnbind(ev event.Event) {
// send event to the worker pool
err := np.pool.Submit(func() { np.processBind(e, false) })
err := processors.SubmitEvent(np.pool, np.metrics, "frostfs_unbind", func() bool {
return np.processBind(e, false)
})
if err != nil {
// there system can be moved into controlled degradation stage
np.log.Warn(logs.FrostFSFrostfsProcessorWorkerPoolDrained,

View file

@ -15,10 +15,10 @@ const (
// Process deposit event by invoking a balance contract and sending native
// gas in the sidechain.
func (np *Processor) processDeposit(deposit frostfsEvent.Deposit) {
func (np *Processor) processDeposit(deposit frostfsEvent.Deposit) bool {
if !np.alphabetState.IsAlphabet() {
np.log.Info(logs.FrostFSNonAlphabetModeIgnoreDeposit)
return
return true
}
prm := balance.MintPrm{}
@ -49,7 +49,7 @@ func (np *Processor) processDeposit(deposit frostfsEvent.Deposit) {
zap.Uint64("last_emission", val),
zap.Uint64("current_epoch", curEpoch))
return
return false
}
// get gas balance of the node
@ -57,7 +57,7 @@ func (np *Processor) processDeposit(deposit frostfsEvent.Deposit) {
balance, err := np.morphClient.GasBalance()
if err != nil {
np.log.Error(logs.FrostFSCantGetGasBalanceOfTheNode, zap.Error(err))
return
return false
}
if balance < np.gasBalanceThreshold {
@ -65,7 +65,7 @@ func (np *Processor) processDeposit(deposit frostfsEvent.Deposit) {
zap.Int64("balance", balance),
zap.Int64("threshold", np.gasBalanceThreshold))
return
return false
}
err = np.morphClient.TransferGas(receiver, np.mintEmitValue)
@ -73,24 +73,26 @@ func (np *Processor) processDeposit(deposit frostfsEvent.Deposit) {
np.log.Error(logs.FrostFSCantTransferNativeGasToReceiver,
zap.String("error", err.Error()))
return
return false
}
np.mintEmitCache.Add(receiver.String(), curEpoch)
return true
}
// Process withdraw event by locking assets in the balance account.
func (np *Processor) processWithdraw(withdraw frostfsEvent.Withdraw) {
func (np *Processor) processWithdraw(withdraw frostfsEvent.Withdraw) bool {
if !np.alphabetState.IsAlphabet() {
np.log.Info(logs.FrostFSNonAlphabetModeIgnoreWithdraw)
return
return true
}
// create lock account
lock, err := util.Uint160DecodeBytesBE(withdraw.ID()[:util.Uint160Size])
if err != nil {
np.log.Error(logs.FrostFSCantCreateLockAccount, zap.Error(err))
return
return false
}
curEpoch := np.epochState.EpochCounter()
@ -106,15 +108,18 @@ func (np *Processor) processWithdraw(withdraw frostfsEvent.Withdraw) {
err = np.balanceClient.Lock(prm)
if err != nil {
np.log.Error(logs.FrostFSCantLockAssetsForWithdraw, zap.Error(err))
return false
}
return true
}
// Process cheque event by transferring assets from the lock account back to
// the reserve account.
func (np *Processor) processCheque(cheque frostfsEvent.Cheque) {
func (np *Processor) processCheque(cheque frostfsEvent.Cheque) bool {
if !np.alphabetState.IsAlphabet() {
np.log.Info(logs.FrostFSNonAlphabetModeIgnoreCheque)
return
return true
}
prm := balance.BurnPrm{}
@ -126,5 +131,8 @@ func (np *Processor) processCheque(cheque frostfsEvent.Cheque) {
err := np.balanceClient.Burn(prm)
if err != nil {
np.log.Error(logs.FrostFSCantTransferAssetsToFedContract, zap.Error(err))
return false
}
return true
}

View file

@ -18,10 +18,10 @@ type bindCommon interface {
TxHash() util.Uint256
}
func (np *Processor) processBind(e bindCommon, bind bool) {
func (np *Processor) processBind(e bindCommon, bind bool) bool {
if !np.alphabetState.IsAlphabet() {
np.log.Info(logs.FrostFSNonAlphabetModeIgnoreBind)
return
return true
}
c := &bindCommonContext{
@ -36,10 +36,10 @@ func (np *Processor) processBind(e bindCommon, bind bool) {
zap.String("error", err.Error()),
)
return
return false
}
np.approveBindCommon(c)
return np.approveBindCommon(c) == nil
}
type bindCommonContext struct {
@ -70,7 +70,7 @@ func (np *Processor) checkBindCommon(e *bindCommonContext) error {
return nil
}
func (np *Processor) approveBindCommon(e *bindCommonContext) {
func (np *Processor) approveBindCommon(e *bindCommonContext) error {
// calculate wallet address
scriptHash := e.User()
@ -80,7 +80,7 @@ func (np *Processor) approveBindCommon(e *bindCommonContext) {
zap.String("error", err.Error()),
)
return
return err
}
var id user.ID
@ -104,4 +104,6 @@ func (np *Processor) approveBindCommon(e *bindCommonContext) {
np.log.Error(fmt.Sprintf("could not approve %s", typ),
zap.String("error", err.Error()))
}
return err
}

View file

@ -9,10 +9,10 @@ import (
// Process config event by setting configuration value from the mainchain in
// the sidechain.
func (np *Processor) processConfig(config frostfsEvent.Config) {
func (np *Processor) processConfig(config frostfsEvent.Config) bool {
if !np.alphabetState.IsAlphabet() {
np.log.Info(logs.FrostFSNonAlphabetModeIgnoreConfig)
return
return true
}
prm := nmClient.SetConfigPrm{}
@ -25,5 +25,8 @@ func (np *Processor) processConfig(config frostfsEvent.Config) {
err := np.netmapClient.SetConfig(prm)
if err != nil {
np.log.Error(logs.FrostFSCantRelaySetConfigEvent, zap.Error(err))
return false
}
return true
}

View file

@ -6,6 +6,7 @@ import (
"sync"
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/innerring/metrics"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/client/balance"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/client/frostfsid"
nmClient "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/client/netmap"
@ -58,6 +59,7 @@ type (
// Processor of events produced by frostfs contract in main net.
Processor struct {
log *logger.Logger
metrics metrics.Register
pool *ants.Pool
frostfsContract util.Uint160
balanceClient BalanceClient
@ -77,6 +79,7 @@ type (
// Params of the processor constructor.
Params struct {
Log *logger.Logger
Metrics metrics.Register
PoolSize int
FrostFSContract util.Uint160
FrostFSIDClient IDClient
@ -129,8 +132,14 @@ func New(p *Params) (*Processor, error) {
return nil, fmt.Errorf("ir/frostfs: can't create LRU cache for gas emission: %w", err)
}
metricsRegister := p.Metrics
if metricsRegister == nil {
metricsRegister = metrics.DefaultRegister{}
}
return &Processor{
log: p.Log,
metrics: metricsRegister,
pool: pool,
frostfsContract: p.FrostFSContract,
balanceClient: p.BalanceClient,

View file

@ -2,6 +2,7 @@ package governance
import (
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/innerring/processors"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/event"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/event/rolemanagement"
"github.com/nspcc-dev/neo-go/pkg/core/native"
@ -35,7 +36,9 @@ func (gp *Processor) HandleAlphabetSync(e event.Event) {
// send event to the worker pool
err := gp.pool.Submit(func() { gp.processAlphabetSync(hash) })
err := processors.SubmitEvent(gp.pool, gp.metrics, "alphabet_sync", func() bool {
return gp.processAlphabetSync(hash)
})
if err != nil {
// there system can be moved into controlled degradation stage
gp.log.Warn(logs.GovernanceGovernanceWorkerPoolDrained,

View file

@ -18,36 +18,36 @@ const (
alphabetUpdateIDPrefix = "AlphabetUpdate"
)
func (gp *Processor) processAlphabetSync(txHash util.Uint256) {
func (gp *Processor) processAlphabetSync(txHash util.Uint256) bool {
if !gp.alphabetState.IsAlphabet() {
gp.log.Info(logs.GovernanceNonAlphabetModeIgnoreAlphabetSync)
return
return true
}
mainnetAlphabet, err := gp.mainnetClient.NeoFSAlphabetList()
if err != nil {
gp.log.Error(logs.GovernanceCantFetchAlphabetListFromMainNet,
zap.String("error", err.Error()))
return
return false
}
sidechainAlphabet, err := gp.morphClient.Committee()
if err != nil {
gp.log.Error(logs.GovernanceCantFetchAlphabetListFromSideChain,
zap.String("error", err.Error()))
return
return false
}
newAlphabet, err := newAlphabetList(sidechainAlphabet, mainnetAlphabet)
if err != nil {
gp.log.Error(logs.GovernanceCantMergeAlphabetListsFromMainNetAndSideChain,
zap.String("error", err.Error()))
return
return false
}
if newAlphabet == nil {
gp.log.Info(logs.GovernanceNoGovernanceUpdateAlphabetListHasNotBeenChanged)
return
return true
}
gp.log.Info(logs.GovernanceAlphabetListHasBeenChangedStartingUpdate,
@ -77,6 +77,8 @@ func (gp *Processor) processAlphabetSync(txHash util.Uint256) {
gp.updateFrostFSContractInMainnet(newAlphabet)
gp.log.Info(logs.GovernanceFinishedAlphabetListUpdate)
return true
}
func prettyKeys(keys keys.PublicKeys) string {

View file

@ -4,6 +4,7 @@ import (
"errors"
"fmt"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/innerring/metrics"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/client"
frostfscontract "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/client/frostfs"
nmClient "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/client/netmap"
@ -75,6 +76,7 @@ type (
// Processor of events related to governance in the network.
Processor struct {
log *logger.Logger
metrics metrics.Register
pool *ants.Pool
frostfsClient FrostFSClient
netmapClient NetmapClient
@ -93,6 +95,7 @@ type (
// Params of the processor constructor.
Params struct {
Log *logger.Logger
Metrics metrics.Register
AlphabetState AlphabetState
EpochState EpochState
@ -130,11 +133,17 @@ func New(p *Params) (*Processor, error) {
return nil, fmt.Errorf("ir/governance: can't create worker pool: %w", err)
}
metricsRegister := p.Metrics
if metricsRegister == nil {
metricsRegister = metrics.DefaultRegister{}
}
// result is cached by neo-go, so we can pre-calc it
designate := p.MainnetClient.GetDesignateHash()
return &Processor{
log: p.Log,
metrics: metricsRegister,
pool: pool,
frostfsClient: p.FrostFSClient,
netmapClient: p.NetmapClient,

View file

@ -4,6 +4,7 @@ import (
"encoding/hex"
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/innerring/processors"
timerEvent "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/innerring/timers"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/event"
netmapEvent "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/event/netmap"
@ -16,7 +17,7 @@ func (np *Processor) HandleNewEpochTick(ev event.Event) {
// send an event to the worker pool
err := np.pool.Submit(func() { np.processNewEpochTick() })
err := processors.SubmitEvent(np.pool, np.metrics, "netmap_new_epoch_tick", np.processNewEpochTick)
if err != nil {
// there system can be moved into controlled degradation stage
np.log.Warn(logs.NetmapNetmapWorkerPoolDrained,
@ -32,8 +33,8 @@ func (np *Processor) handleNewEpoch(ev event.Event) {
// send an event to the worker pool
err := np.pool.Submit(func() {
np.processNewEpoch(epochEvent)
err := processors.SubmitEvent(np.pool, np.metrics, "netmap_new_epoch", func() bool {
return np.processNewEpoch(epochEvent)
})
if err != nil {
// there system can be moved into controlled degradation stage
@ -51,8 +52,8 @@ func (np *Processor) handleAddPeer(ev event.Event) {
// send an event to the worker pool
err := np.pool.Submit(func() {
np.processAddPeer(newPeer)
err := processors.SubmitEvent(np.pool, np.metrics, "netmap_add_peer", func() bool {
return np.processAddPeer(newPeer)
})
if err != nil {
// there system can be moved into controlled degradation stage
@ -69,8 +70,8 @@ func (np *Processor) handleUpdateState(ev event.Event) {
// send event to the worker pool
err := np.pool.Submit(func() {
np.processUpdatePeer(updPeer)
err := processors.SubmitEvent(np.pool, np.metrics, "netmap_update_peer", func() bool {
return np.processUpdatePeer(updPeer)
})
if err != nil {
// there system can be moved into controlled degradation stage
@ -91,8 +92,8 @@ func (np *Processor) handleCleanupTick(ev event.Event) {
np.log.Info(logs.NetmapTick, zap.String("type", "netmap cleaner"))
// send event to the worker pool
err := np.pool.Submit(func() {
np.processNetmapCleanupTick(cleanup)
err := processors.SubmitEvent(np.pool, np.metrics, "netmap_cleanup_tick", func() bool {
return np.processNetmapCleanupTick(cleanup)
})
if err != nil {
// there system can be moved into controlled degradation stage

View file

@ -7,11 +7,11 @@ import (
"go.uber.org/zap"
)
func (np *Processor) processNetmapCleanupTick(ev netmapCleanupTick) {
func (np *Processor) processNetmapCleanupTick(ev netmapCleanupTick) bool {
if !np.alphabetState.IsAlphabet() {
np.log.Info(logs.NetmapNonAlphabetModeIgnoreNewNetmapCleanupTick)
return
return true
}
err := np.netmapSnapshot.forEachRemoveCandidate(ev.epoch, func(s string) error {
@ -47,5 +47,8 @@ func (np *Processor) processNetmapCleanupTick(ev netmapCleanupTick) {
if err != nil {
np.log.Warn(logs.NetmapCantIterateOnNetmapCleanerCache,
zap.String("error", err.Error()))
return false
}
return true
}

View file

@ -10,7 +10,7 @@ import (
// Process new epoch notification by setting global epoch value and resetting
// local epoch timer.
func (np *Processor) processNewEpoch(ev netmapEvent.NewEpoch) {
func (np *Processor) processNewEpoch(ev netmapEvent.NewEpoch) bool {
epoch := ev.EpochNumber()
epochDuration, err := np.netmapClient.EpochDuration()
@ -41,7 +41,7 @@ func (np *Processor) processNewEpoch(ev netmapEvent.NewEpoch) {
np.log.Warn(logs.NetmapCantGetNetmapSnapshotToPerformCleanup,
zap.String("error", err.Error()))
return
return false
}
prm := cntClient.StartEstimationPrm{}
@ -63,13 +63,15 @@ func (np *Processor) processNewEpoch(ev netmapEvent.NewEpoch) {
np.handleCleanupTick(netmapCleanupTick{epoch: epoch, txHash: ev.TxHash()})
np.handleAlphabetSync(governance.NewSyncEvent(ev.TxHash()))
np.handleNotaryDeposit(ev)
return true
}
// Process new epoch tick by invoking new epoch method in network map contract.
func (np *Processor) processNewEpochTick() {
func (np *Processor) processNewEpochTick() bool {
if !np.alphabetState.IsAlphabet() {
np.log.Info(logs.NetmapNonAlphabetModeIgnoreNewEpochTick)
return
return true
}
nextEpoch := np.epochState.EpochCounter() + 1
@ -78,5 +80,8 @@ func (np *Processor) processNewEpochTick() {
err := np.netmapClient.NewEpoch(nextEpoch, false)
if err != nil {
np.log.Error(logs.NetmapCantInvokeNetmapNewEpoch, zap.Error(err))
return false
}
return true
}

View file

@ -12,10 +12,10 @@ import (
// Process add peer notification by sanity check of new node
// local epoch timer.
func (np *Processor) processAddPeer(ev netmapEvent.AddPeer) {
func (np *Processor) processAddPeer(ev netmapEvent.AddPeer) bool {
if !np.alphabetState.IsAlphabet() {
np.log.Info(logs.NetmapNonAlphabetModeIgnoreNewPeerNotification)
return
return true
}
// check if notary transaction is valid, see #976
@ -26,7 +26,7 @@ func (np *Processor) processAddPeer(ev netmapEvent.AddPeer) {
zap.String("method", "netmap.AddPeer"),
zap.String("hash", tx.Hash().StringLE()),
zap.Error(err))
return
return false
}
// unmarshal node info
@ -34,7 +34,7 @@ func (np *Processor) processAddPeer(ev netmapEvent.AddPeer) {
if err := nodeInfo.Unmarshal(ev.Node()); err != nil {
// it will be nice to have tx id at event structure to log it
np.log.Warn(logs.NetmapCantParseNetworkMapCandidate)
return
return false
}
// validate and update node info
@ -44,7 +44,7 @@ func (np *Processor) processAddPeer(ev netmapEvent.AddPeer) {
zap.String("error", err.Error()),
)
return
return false
}
// sort attributes to make it consistent
@ -81,15 +81,18 @@ func (np *Processor) processAddPeer(ev netmapEvent.AddPeer) {
if err != nil {
np.log.Error(logs.NetmapCantInvokeNetmapAddPeer, zap.Error(err))
return false
}
}
return true
}
// Process update peer notification by sending approval tx to the smart contract.
func (np *Processor) processUpdatePeer(ev netmapEvent.UpdatePeer) {
func (np *Processor) processUpdatePeer(ev netmapEvent.UpdatePeer) bool {
if !np.alphabetState.IsAlphabet() {
np.log.Info(logs.NetmapNonAlphabetModeIgnoreUpdatePeerNotification)
return
return true
}
// flag node to remove from local view, so it can be re-bootstrapped
@ -105,11 +108,14 @@ func (np *Processor) processUpdatePeer(ev netmapEvent.UpdatePeer) {
zap.Error(err),
)
return
return false
}
}
if err = np.netmapClient.MorphNotarySignAndInvokeTX(ev.NotaryRequest().MainTransaction); err != nil {
np.log.Error(logs.NetmapCantInvokeNetmapUpdatePeer, zap.Error(err))
return false
}
return true
}

View file

@ -5,6 +5,7 @@ import (
"fmt"
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/innerring/metrics"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/innerring/processors/netmap/nodevalidation/state"
cntClient "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/client/container"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/event"
@ -72,6 +73,7 @@ type (
// and new epoch ticker, because it is related to contract.
Processor struct {
log *logger.Logger
metrics metrics.Register
pool *ants.Pool
epochTimer EpochTimerReseter
epochState EpochState
@ -93,6 +95,7 @@ type (
// Params of the processor constructor.
Params struct {
Log *logger.Logger
Metrics metrics.Register
PoolSize int
NetmapClient Client
EpochTimer EpochTimerReseter
@ -145,8 +148,14 @@ func New(p *Params) (*Processor, error) {
return nil, fmt.Errorf("ir/netmap: can't create worker pool: %w", err)
}
metricsRegister := p.Metrics
if metricsRegister == nil {
metricsRegister = metrics.DefaultRegister{}
}
return &Processor{
log: p.Log,
metrics: metricsRegister,
pool: pool,
epochTimer: p.EpochTimer,
epochState: p.EpochState,

View file

@ -0,0 +1,16 @@
package processors
import (
"time"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/innerring/metrics"
"github.com/panjf2000/ants/v2"
)
func SubmitEvent(pool *ants.Pool, metrics metrics.Register, eventLabel string, eventProcessor func() bool) error {
return pool.Submit(func() {
start := time.Now()
Review

Why did you decide to have start here and not before we submit the task in pool?
Just asking.

Why did you decide to have `start` here and not _before_ we submit the task in pool? Just asking.
Review

well, because my intention was to measure the actual event processor duration,
excluding whatever wait time it spends on the pool. The pool wait time is statistically unrelated to the event processor time for specific event types, and it can be measured separately if you think it's interesting.

well, because my intention was to measure the actual event processor duration, excluding whatever wait time it spends on the pool. The pool wait time is statistically unrelated to the event processor time for specific event types, and it can be measured separately if you think it's interesting.
success := eventProcessor()
metrics.AddEvent(time.Since(start), eventLabel, success)
})
}

View file

@ -1,13 +1,23 @@
package metrics
import "github.com/prometheus/client_golang/prometheus"
import (
"strconv"
"time"
const innerRingSubsystem = "ir"
"github.com/prometheus/client_golang/prometheus"
)
const (
innerRingSubsystem = "ir"
innerRingLabelSuccess = "success"
innerRingLabelType = "type"
)
// InnerRingServiceMetrics contains metrics collected by inner ring.
type InnerRingServiceMetrics struct {
epoch metric[prometheus.Gauge]
health metric[prometheus.Gauge]
eventDuration metric[*prometheus.HistogramVec]
}
// NewInnerRingMetrics returns new instance of metrics collectors for inner ring.
@ -25,14 +35,22 @@ func NewInnerRingMetrics() *InnerRingServiceMetrics {
Name: "health",
Help: "Current inner-ring node state.",
})
eventDuration = newHistogramVec(prometheus.HistogramOpts{
Namespace: namespace,
Subsystem: innerRingSubsystem,
Name: "event_duration_seconds",
Help: "Duration of processing of inner-ring events",
}, []string{innerRingLabelType, innerRingLabelSuccess})
)
mustRegister(epoch)
mustRegister(health)
mustRegister(eventDuration)
return &InnerRingServiceMetrics{
epoch: epoch,
health: health,
eventDuration: eventDuration,
}
}
@ -45,3 +63,10 @@ func (m InnerRingServiceMetrics) SetEpoch(epoch uint64) {
func (m InnerRingServiceMetrics) SetHealth(s int32) {
m.health.value.Set(float64(s))
}
func (m InnerRingServiceMetrics) AddEvent(d time.Duration, typ string, success bool) {
m.eventDuration.value.With(prometheus.Labels{
innerRingLabelType: typ,
innerRingLabelSuccess: strconv.FormatBool(success),
}).Observe(d.Seconds())
}