Compare commits

...

1 commit

Author SHA1 Message Date
0f7d3219b8 [#374] Add inner-ring event metrics
All checks were successful
ci/woodpecker/pr/pre-commit Pipeline was successful
Signed-off-by: Alejandro Lopez <a.lopez@yadro.com>
2023-05-29 12:03:49 +03:00
27 changed files with 287 additions and 113 deletions

View file

@ -52,6 +52,7 @@ func (s *Server) initNetmapProcessor(cfg *viper.Viper,
s.netmapProcessor, err = netmap.New(&netmap.Params{
Log: s.log,
Metrics: s.metrics,
PoolSize: cfg.GetInt("workers.netmap"),
NetmapClient: netmap.NewNetmapClient(s.netmapClient),
EpochTimer: s,
@ -162,6 +163,7 @@ func (s *Server) createAlphaSync(cfg *viper.Viper, frostfsCli *frostfsClient.Cli
// create governance processor
governanceProcessor, err := governance.New(&governance.Params{
Log: s.log,
Metrics: s.metrics,
FrostFSClient: frostfsCli,
NetmapClient: s.netmapClient,
AlphabetState: s,
@ -231,6 +233,7 @@ func (s *Server) initAlphabetProcessor(cfg *viper.Viper) error {
s.alphabetProcessor, err = alphabet.New(&alphabet.Params{
ParsedWallets: parsedWallets,
Log: s.log,
Metrics: s.metrics,
PoolSize: cfg.GetInt("workers.alphabet"),
AlphabetContracts: s.contracts.alphabet,
NetmapClient: s.netmapClient,
@ -255,6 +258,7 @@ func (s *Server) initContainerProcessor(cfg *viper.Viper, cnrClient *container.C
// container processor
containerProcessor, err := cont.New(&cont.Params{
Log: s.log,
Metrics: s.metrics,
PoolSize: cfg.GetInt("workers.container"),
AlphabetState: s,
ContainerClient: cnrClient,
@ -273,6 +277,7 @@ func (s *Server) initBalanceProcessor(cfg *viper.Viper, frostfsCli *frostfsClien
// create balance processor
balanceProcessor, err := balance.New(&balance.Params{
Log: s.log,
Metrics: s.metrics,
PoolSize: cfg.GetInt("workers.balance"),
FrostFSClient: frostfsCli,
BalanceSC: s.contracts.balance,
@ -293,6 +298,7 @@ func (s *Server) initFrostFSMainnetProcessor(cfg *viper.Viper, frostfsIDClient *
frostfsProcessor, err := frostfs.New(&frostfs.Params{
Log: s.log,
Metrics: s.metrics,
PoolSize: cfg.GetInt("workers.frostfs"),
FrostFSContract: s.contracts.frostfs,
FrostFSIDClient: frostfsIDClient,

View file

@ -0,0 +1,15 @@
package metrics
import "time"
type Register interface {
SetEpoch(epoch uint64)
SetHealth(s int32)
AddEvent(d time.Duration, typ string, success bool)
}
type DefaultRegister struct{}
func (DefaultRegister) SetEpoch(uint64) {}
func (DefaultRegister) SetHealth(int32) {}
func (DefaultRegister) AddEvent(time.Duration, string, bool) {}

View file

@ -2,6 +2,7 @@ package alphabet
import (
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/innerring/processors"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/innerring/timers"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/event"
"go.uber.org/zap"
@ -13,7 +14,7 @@ func (ap *Processor) HandleGasEmission(ev event.Event) {
// send event to the worker pool
err := ap.pool.Submit(func() { ap.processEmit() })
err := processors.SubmitEvent(ap.pool, ap.metrics, "alphabet_emit_gas", ap.processEmit)
if err != nil {
// there system can be moved into controlled degradation stage
ap.log.Warn(logs.AlphabetAlphabetProcessorWorkerPoolDrained,

View file

@ -13,12 +13,12 @@ import (
const emitMethod = "emit"
func (ap *Processor) processEmit() {
func (ap *Processor) processEmit() bool {
index := ap.irList.AlphabetIndex()
if index < 0 {
ap.log.Info(logs.AlphabetNonAlphabetModeIgnoreGasEmissionEvent)
return
return true
}
contract, ok := ap.alphabetContracts.GetByIndex(index)
@ -26,7 +26,7 @@ func (ap *Processor) processEmit() {
ap.log.Debug(logs.AlphabetNodeIsOutOfAlphabetRangeIgnoreGasEmissionEvent,
zap.Int("index", index))
return
return false
}
// there is no signature collecting, so we don't need extra fee
@ -34,13 +34,13 @@ func (ap *Processor) processEmit() {
if err != nil {
ap.log.Warn(logs.AlphabetCantInvokeAlphabetEmitMethod, zap.String("error", err.Error()))
return
return false
}
if ap.storageEmission == 0 {
ap.log.Info(logs.AlphabetStorageNodeEmissionIsOff)
return
return true
}
networkMap, err := ap.netmapClient.NetMap()
@ -48,7 +48,7 @@ func (ap *Processor) processEmit() {
ap.log.Warn(logs.AlphabetCantGetNetmapSnapshotToEmitGasToStorageNodes,
zap.String("error", err.Error()))
return
return false
}
nmNodes := networkMap.Nodes()
@ -63,7 +63,7 @@ func (ap *Processor) processEmit() {
zap.Int("extra_wallets", extraLen))
if nmLen+extraLen == 0 {
return
return true
}
gasPerNode := fixedn.Fixed8(ap.storageEmission / uint64(nmLen+extraLen))
@ -71,6 +71,8 @@ func (ap *Processor) processEmit() {
ap.transferGasToNetmapNodes(nmNodes, gasPerNode)
ap.transferGasToExtraNodes(pw, gasPerNode)
return true
}
func (ap *Processor) transferGasToNetmapNodes(nmNodes []netmap.NodeInfo, gasPerNode fixedn.Fixed8) {

View file

@ -7,6 +7,7 @@ import (
"time"
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/innerring/metrics"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/event"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap"
@ -50,6 +51,7 @@ type (
// protects parsedWallets from concurrent change
pwLock *sync.RWMutex
log *logger.Logger
metrics metrics.Register
pool *ants.Pool
alphabetContracts Contracts
netmapClient netmapClient
@ -62,6 +64,7 @@ type (
Params struct {
ParsedWallets []util.Uint160
Log *logger.Logger
Metrics metrics.Register
PoolSize int
AlphabetContracts Contracts
NetmapClient netmapClient
@ -89,10 +92,16 @@ func New(p *Params) (*Processor, error) {
return nil, fmt.Errorf("ir/frostfs: can't create worker pool: %w", err)
}
metricsRegister := p.Metrics
if metricsRegister == nil {
metricsRegister = metrics.DefaultRegister{}
}
return &Processor{
parsedWallets: p.ParsedWallets,
pwLock: new(sync.RWMutex),
log: p.Log,
metrics: metricsRegister,
pool: pool,
alphabetContracts: p.AlphabetContracts,
netmapClient: p.NetmapClient,

View file

@ -4,6 +4,7 @@ import (
"encoding/hex"
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/innerring/processors"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/event"
balanceEvent "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/event/balance"
"go.uber.org/zap"
@ -17,7 +18,9 @@ func (bp *Processor) handleLock(ev event.Event) {
// send an event to the worker pool
err := bp.pool.Submit(func() { bp.processLock(&lock) })
err := processors.SubmitEvent(bp.pool, bp.metrics, "lock", func() bool {
return bp.processLock(&lock)
})
if err != nil {
// there system can be moved into controlled degradation stage
bp.log.Warn(logs.BalanceBalanceWorkerPoolDrained,

View file

@ -9,10 +9,10 @@ import (
// Process lock event by invoking Cheque method in main net to send assets
// back to the withdraw issuer.
func (bp *Processor) processLock(lock *balanceEvent.Lock) {
func (bp *Processor) processLock(lock *balanceEvent.Lock) bool {
if !bp.alphabetState.IsAlphabet() {
bp.log.Info(logs.BalanceNonAlphabetModeIgnoreBalanceLock)
return
return true
}
prm := frostfsContract.ChequePrm{}
@ -26,5 +26,8 @@ func (bp *Processor) processLock(lock *balanceEvent.Lock) {
err := bp.frostfsClient.Cheque(prm)
if err != nil {
bp.log.Error(logs.BalanceCantSendLockAssetTx, zap.Error(err))
return false
}
return true
}

View file

@ -5,6 +5,7 @@ import (
"fmt"
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/innerring/metrics"
frostfscontract "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/client/frostfs"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/event"
balanceEvent "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/event/balance"
@ -32,6 +33,7 @@ type (
// Processor of events produced by balance contract in the morphchain.
Processor struct {
log *logger.Logger
metrics metrics.Register
pool *ants.Pool
frostfsClient FrostFSClient
balanceSC util.Uint160
@ -42,6 +44,7 @@ type (
// Params of the processor constructor.
Params struct {
Log *logger.Logger
Metrics metrics.Register
PoolSize int
FrostFSClient FrostFSClient
BalanceSC util.Uint160
@ -72,8 +75,14 @@ func New(p *Params) (*Processor, error) {
return nil, fmt.Errorf("ir/balance: can't create worker pool: %w", err)
}
metricsRegister := p.Metrics
if metricsRegister == nil {
metricsRegister = metrics.DefaultRegister{}
}
return &Processor{
log: p.Log,
metrics: metricsRegister,
pool: pool,
frostfsClient: p.FrostFSClient,
balanceSC: p.BalanceSC,

View file

@ -4,6 +4,7 @@ import (
"crypto/sha256"
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/innerring/processors"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/event"
containerEvent "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/event/container"
"github.com/mr-tron/base58"
@ -20,7 +21,9 @@ func (cp *Processor) handlePut(ev event.Event) {
// send an event to the worker pool
err := cp.pool.Submit(func() { cp.processContainerPut(put) })
err := processors.SubmitEvent(cp.pool, cp.metrics, "container_put", func() bool {
return cp.processContainerPut(put)
})
if err != nil {
// there system can be moved into controlled degradation stage
cp.log.Warn(logs.ContainerContainerProcessorWorkerPoolDrained,
@ -36,7 +39,9 @@ func (cp *Processor) handleDelete(ev event.Event) {
// send an event to the worker pool
err := cp.pool.Submit(func() { cp.processContainerDelete(del) })
err := processors.SubmitEvent(cp.pool, cp.metrics, "container_delete", func() bool {
return cp.processContainerDelete(del)
})
if err != nil {
// there system can be moved into controlled degradation stage
cp.log.Warn(logs.ContainerContainerProcessorWorkerPoolDrained,
@ -53,8 +58,8 @@ func (cp *Processor) handleSetEACL(ev event.Event) {
// send an event to the worker pool
err := cp.pool.Submit(func() {
cp.processSetEACL(e)
err := processors.SubmitEvent(cp.pool, cp.metrics, "container_set_eacl", func() bool {
return cp.processSetEACL(e)
})
if err != nil {
// there system can be moved into controlled degradation stage

View file

@ -31,10 +31,10 @@ type putContainerContext struct {
// Process a new container from the user by checking the container sanity
// and sending approve tx back to the morph.
func (cp *Processor) processContainerPut(put putEvent) {
func (cp *Processor) processContainerPut(put putEvent) bool {
if !cp.alphabetState.IsAlphabet() {
cp.log.Info(logs.ContainerNonAlphabetModeIgnoreContainerPut)
return
return true
}
ctx := &putContainerContext{
@ -47,10 +47,17 @@ func (cp *Processor) processContainerPut(put putEvent) {
zap.String("error", err.Error()),
)
return
return false
}
cp.approvePutContainer(ctx)
if err := cp.morphClient.NotarySignAndInvokeTX(ctx.e.NotaryRequest().MainTransaction); err != nil {
cp.log.Error(logs.ContainerCouldNotApprovePutContainer,
zap.String("error", err.Error()),
)
return false
}
return true
}
func (cp *Processor) checkPutContainer(ctx *putContainerContext) error {
@ -89,20 +96,12 @@ func (cp *Processor) checkPutContainer(ctx *putContainerContext) error {
return nil
}
func (cp *Processor) approvePutContainer(ctx *putContainerContext) {
if err := cp.morphClient.NotarySignAndInvokeTX(ctx.e.NotaryRequest().MainTransaction); err != nil {
cp.log.Error(logs.ContainerCouldNotApprovePutContainer,
zap.String("error", err.Error()),
)
}
}
// Process delete container operation from the user by checking container sanity
// and sending approve tx back to morph.
func (cp *Processor) processContainerDelete(e containerEvent.Delete) {
func (cp *Processor) processContainerDelete(e containerEvent.Delete) bool {
if !cp.alphabetState.IsAlphabet() {
cp.log.Info(logs.ContainerNonAlphabetModeIgnoreContainerDelete)
return
return true
}
err := cp.checkDeleteContainer(e)
@ -111,10 +110,18 @@ func (cp *Processor) processContainerDelete(e containerEvent.Delete) {
zap.String("error", err.Error()),
)
return
return false
}
cp.approveDeleteContainer(e)
if err := cp.morphClient.NotarySignAndInvokeTX(e.NotaryRequest().MainTransaction); err != nil {
cp.log.Error(logs.ContainerCouldNotApproveDeleteContainer,
zap.String("error", err.Error()),
)
return false
}
return true
}
func (cp *Processor) checkDeleteContainer(e containerEvent.Delete) error {
@ -149,14 +156,6 @@ func (cp *Processor) checkDeleteContainer(e containerEvent.Delete) error {
return nil
}
func (cp *Processor) approveDeleteContainer(e containerEvent.Delete) {
if err := cp.morphClient.NotarySignAndInvokeTX(e.NotaryRequest().MainTransaction); err != nil {
cp.log.Error(logs.ContainerCouldNotApproveDeleteContainer,
zap.String("error", err.Error()),
)
}
}
func checkNNS(ctx *putContainerContext, cnr containerSDK.Container) error {
// fetch domain info
ctx.d = containerSDK.ReadDomain(cnr)

View file

@ -12,10 +12,10 @@ import (
"go.uber.org/zap"
)
func (cp *Processor) processSetEACL(e containerEvent.SetEACL) {
func (cp *Processor) processSetEACL(e containerEvent.SetEACL) bool {
if !cp.alphabetState.IsAlphabet() {
cp.log.Info(logs.ContainerNonAlphabetModeIgnoreSetEACL)
return
return true
}
err := cp.checkSetEACL(e)
@ -24,10 +24,17 @@ func (cp *Processor) processSetEACL(e containerEvent.SetEACL) {
zap.String("error", err.Error()),
)
return
return false
}
cp.approveSetEACL(e)
if err := cp.morphClient.NotarySignAndInvokeTX(e.NotaryRequest().MainTransaction); err != nil {
cp.log.Error(logs.ContainerCouldNotApproveSetEACL,
zap.String("error", err.Error()),
)
return false
}
return true
}
func (cp *Processor) checkSetEACL(e containerEvent.SetEACL) error {
@ -73,11 +80,3 @@ func (cp *Processor) checkSetEACL(e containerEvent.SetEACL) error {
return nil
}
func (cp *Processor) approveSetEACL(e containerEvent.SetEACL) {
if err := cp.morphClient.NotarySignAndInvokeTX(e.NotaryRequest().MainTransaction); err != nil {
cp.log.Error(logs.ContainerCouldNotApproveSetEACL,
zap.String("error", err.Error()),
)
}
}

View file

@ -6,6 +6,7 @@ import (
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
containercore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/container"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/innerring/metrics"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/client/frostfsid"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/event"
containerEvent "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/event/container"
@ -40,6 +41,7 @@ type (
// Processor of events produced by container contract in the sidechain.
Processor struct {
log *logger.Logger
metrics metrics.Register
pool *ants.Pool
alphabetState AlphabetState
cnrClient ContClient // notary must be enabled
@ -51,6 +53,7 @@ type (
// Params of the processor constructor.
Params struct {
Log *logger.Logger
Metrics metrics.Register
PoolSize int
AlphabetState AlphabetState
ContainerClient ContClient
@ -102,8 +105,14 @@ func New(p *Params) (*Processor, error) {
return nil, fmt.Errorf("ir/container: can't create worker pool: %w", err)
}
metricsRegister := p.Metrics
if metricsRegister == nil {
metricsRegister = metrics.DefaultRegister{}
}
return &Processor{
log: p.Log,
metrics: metricsRegister,
pool: pool,
alphabetState: p.AlphabetState,
cnrClient: p.ContainerClient,

View file

@ -4,6 +4,7 @@ import (
"encoding/hex"
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/innerring/processors"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/event"
frostfsEvent "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/event/frostfs"
"github.com/nspcc-dev/neo-go/pkg/util/slice"
@ -18,7 +19,9 @@ func (np *Processor) handleDeposit(ev event.Event) {
// send event to the worker pool
err := np.pool.Submit(func() { np.processDeposit(deposit) })
err := processors.SubmitEvent(np.pool, np.metrics, "frostfs_deposit", func() bool {
return np.processDeposit(deposit)
})
if err != nil {
// there system can be moved into controlled degradation stage
np.log.Warn(logs.FrostFSFrostfsProcessorWorkerPoolDrained,
@ -34,7 +37,9 @@ func (np *Processor) handleWithdraw(ev event.Event) {
// send event to the worker pool
err := np.pool.Submit(func() { np.processWithdraw(withdraw) })
err := processors.SubmitEvent(np.pool, np.metrics, "frostfs_withdraw", func() bool {
return np.processWithdraw(withdraw)
})
if err != nil {
// there system can be moved into controlled degradation stage
np.log.Warn(logs.FrostFSFrostfsProcessorWorkerPoolDrained,
@ -50,7 +55,9 @@ func (np *Processor) handleCheque(ev event.Event) {
// send event to the worker pool
err := np.pool.Submit(func() { np.processCheque(cheque) })
err := processors.SubmitEvent(np.pool, np.metrics, "frostfs_cheque", func() bool {
return np.processCheque(cheque)
})
if err != nil {
// there system can be moved into controlled degradation stage
np.log.Warn(logs.FrostFSFrostfsProcessorWorkerPoolDrained,
@ -67,7 +74,9 @@ func (np *Processor) handleConfig(ev event.Event) {
// send event to the worker pool
err := np.pool.Submit(func() { np.processConfig(cfg) })
err := processors.SubmitEvent(np.pool, np.metrics, "frostfs_config", func() bool {
return np.processConfig(cfg)
})
if err != nil {
// there system can be moved into controlled degradation stage
np.log.Warn(logs.FrostFSFrostfsProcessorWorkerPoolDrained,
@ -83,7 +92,9 @@ func (np *Processor) handleBind(ev event.Event) {
// send event to the worker pool
err := np.pool.Submit(func() { np.processBind(e, true) })
err := processors.SubmitEvent(np.pool, np.metrics, "frostfs_bind", func() bool {
return np.processBind(e, true)
})
if err != nil {
// there system can be moved into controlled degradation stage
np.log.Warn(logs.FrostFSFrostfsProcessorWorkerPoolDrained,
@ -99,7 +110,9 @@ func (np *Processor) handleUnbind(ev event.Event) {
// send event to the worker pool
err := np.pool.Submit(func() { np.processBind(e, false) })
err := processors.SubmitEvent(np.pool, np.metrics, "frostfs_unbind", func() bool {
return np.processBind(e, false)
})
if err != nil {
// there system can be moved into controlled degradation stage
np.log.Warn(logs.FrostFSFrostfsProcessorWorkerPoolDrained,

View file

@ -15,10 +15,10 @@ const (
// Process deposit event by invoking a balance contract and sending native
// gas in the sidechain.
func (np *Processor) processDeposit(deposit frostfsEvent.Deposit) {
func (np *Processor) processDeposit(deposit frostfsEvent.Deposit) bool {
if !np.alphabetState.IsAlphabet() {
np.log.Info(logs.FrostFSNonAlphabetModeIgnoreDeposit)
return
return true
}
prm := balance.MintPrm{}
@ -49,7 +49,7 @@ func (np *Processor) processDeposit(deposit frostfsEvent.Deposit) {
zap.Uint64("last_emission", val),
zap.Uint64("current_epoch", curEpoch))
return
return false
}
// get gas balance of the node
@ -57,7 +57,7 @@ func (np *Processor) processDeposit(deposit frostfsEvent.Deposit) {
balance, err := np.morphClient.GasBalance()
if err != nil {
np.log.Error(logs.FrostFSCantGetGasBalanceOfTheNode, zap.Error(err))
return
return false
}
if balance < np.gasBalanceThreshold {
@ -65,7 +65,7 @@ func (np *Processor) processDeposit(deposit frostfsEvent.Deposit) {
zap.Int64("balance", balance),
zap.Int64("threshold", np.gasBalanceThreshold))
return
return false
}
err = np.morphClient.TransferGas(receiver, np.mintEmitValue)
@ -73,24 +73,26 @@ func (np *Processor) processDeposit(deposit frostfsEvent.Deposit) {
np.log.Error(logs.FrostFSCantTransferNativeGasToReceiver,
zap.String("error", err.Error()))
return
return false
}
np.mintEmitCache.Add(receiver.String(), curEpoch)
return true
}
// Process withdraw event by locking assets in the balance account.
func (np *Processor) processWithdraw(withdraw frostfsEvent.Withdraw) {
func (np *Processor) processWithdraw(withdraw frostfsEvent.Withdraw) bool {
if !np.alphabetState.IsAlphabet() {
np.log.Info(logs.FrostFSNonAlphabetModeIgnoreWithdraw)
return
return true
}
// create lock account
lock, err := util.Uint160DecodeBytesBE(withdraw.ID()[:util.Uint160Size])
if err != nil {
np.log.Error(logs.FrostFSCantCreateLockAccount, zap.Error(err))
return
return false
}
curEpoch := np.epochState.EpochCounter()
@ -106,15 +108,18 @@ func (np *Processor) processWithdraw(withdraw frostfsEvent.Withdraw) {
err = np.balanceClient.Lock(prm)
if err != nil {
np.log.Error(logs.FrostFSCantLockAssetsForWithdraw, zap.Error(err))
return false
}
return true
}
// Process cheque event by transferring assets from the lock account back to
// the reserve account.
func (np *Processor) processCheque(cheque frostfsEvent.Cheque) {
func (np *Processor) processCheque(cheque frostfsEvent.Cheque) bool {
if !np.alphabetState.IsAlphabet() {
np.log.Info(logs.FrostFSNonAlphabetModeIgnoreCheque)
return
return true
}
prm := balance.BurnPrm{}
@ -126,5 +131,8 @@ func (np *Processor) processCheque(cheque frostfsEvent.Cheque) {
err := np.balanceClient.Burn(prm)
if err != nil {
np.log.Error(logs.FrostFSCantTransferAssetsToFedContract, zap.Error(err))
return false
}
return true
}

View file

@ -18,10 +18,10 @@ type bindCommon interface {
TxHash() util.Uint256
}
func (np *Processor) processBind(e bindCommon, bind bool) {
func (np *Processor) processBind(e bindCommon, bind bool) bool {
if !np.alphabetState.IsAlphabet() {
np.log.Info(logs.FrostFSNonAlphabetModeIgnoreBind)
return
return true
}
c := &bindCommonContext{
@ -36,10 +36,10 @@ func (np *Processor) processBind(e bindCommon, bind bool) {
zap.String("error", err.Error()),
)
return
return false
}
np.approveBindCommon(c)
return np.approveBindCommon(c) == nil
}
type bindCommonContext struct {
@ -70,7 +70,7 @@ func (np *Processor) checkBindCommon(e *bindCommonContext) error {
return nil
}
func (np *Processor) approveBindCommon(e *bindCommonContext) {
func (np *Processor) approveBindCommon(e *bindCommonContext) error {
// calculate wallet address
scriptHash := e.User()
@ -80,7 +80,7 @@ func (np *Processor) approveBindCommon(e *bindCommonContext) {
zap.String("error", err.Error()),
)
return
return err
}
var id user.ID
@ -104,4 +104,6 @@ func (np *Processor) approveBindCommon(e *bindCommonContext) {
np.log.Error(fmt.Sprintf("could not approve %s", typ),
zap.String("error", err.Error()))
}
return err
}

View file

@ -9,10 +9,10 @@ import (
// Process config event by setting configuration value from the mainchain in
// the sidechain.
func (np *Processor) processConfig(config frostfsEvent.Config) {
func (np *Processor) processConfig(config frostfsEvent.Config) bool {
if !np.alphabetState.IsAlphabet() {
np.log.Info(logs.FrostFSNonAlphabetModeIgnoreConfig)
return
return true
}
prm := nmClient.SetConfigPrm{}
@ -25,5 +25,8 @@ func (np *Processor) processConfig(config frostfsEvent.Config) {
err := np.netmapClient.SetConfig(prm)
if err != nil {
np.log.Error(logs.FrostFSCantRelaySetConfigEvent, zap.Error(err))
return false
}
return true
}

View file

@ -6,6 +6,7 @@ import (
"sync"
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/innerring/metrics"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/client/balance"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/client/frostfsid"
nmClient "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/client/netmap"
@ -58,6 +59,7 @@ type (
// Processor of events produced by frostfs contract in main net.
Processor struct {
log *logger.Logger
metrics metrics.Register
pool *ants.Pool
frostfsContract util.Uint160
balanceClient BalanceClient
@ -77,6 +79,7 @@ type (
// Params of the processor constructor.
Params struct {
Log *logger.Logger
Metrics metrics.Register
PoolSize int
FrostFSContract util.Uint160
FrostFSIDClient IDClient
@ -129,8 +132,14 @@ func New(p *Params) (*Processor, error) {
return nil, fmt.Errorf("ir/frostfs: can't create LRU cache for gas emission: %w", err)
}
metricsRegister := p.Metrics
if metricsRegister == nil {
metricsRegister = metrics.DefaultRegister{}
}
return &Processor{
log: p.Log,
metrics: metricsRegister,
pool: pool,
frostfsContract: p.FrostFSContract,
balanceClient: p.BalanceClient,

View file

@ -2,6 +2,7 @@ package governance
import (
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/innerring/processors"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/event"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/event/rolemanagement"
"github.com/nspcc-dev/neo-go/pkg/core/native"
@ -35,7 +36,9 @@ func (gp *Processor) HandleAlphabetSync(e event.Event) {
// send event to the worker pool
err := gp.pool.Submit(func() { gp.processAlphabetSync(hash) })
err := processors.SubmitEvent(gp.pool, gp.metrics, "alphabet_sync", func() bool {
return gp.processAlphabetSync(hash)
})
if err != nil {
// there system can be moved into controlled degradation stage
gp.log.Warn(logs.GovernanceGovernanceWorkerPoolDrained,

View file

@ -18,36 +18,36 @@ const (
alphabetUpdateIDPrefix = "AlphabetUpdate"
)
func (gp *Processor) processAlphabetSync(txHash util.Uint256) {
func (gp *Processor) processAlphabetSync(txHash util.Uint256) bool {
if !gp.alphabetState.IsAlphabet() {
gp.log.Info(logs.GovernanceNonAlphabetModeIgnoreAlphabetSync)
return
return true
}
mainnetAlphabet, err := gp.mainnetClient.NeoFSAlphabetList()
if err != nil {
gp.log.Error(logs.GovernanceCantFetchAlphabetListFromMainNet,
zap.String("error", err.Error()))
return
return false
}
sidechainAlphabet, err := gp.morphClient.Committee()
if err != nil {
gp.log.Error(logs.GovernanceCantFetchAlphabetListFromSideChain,
zap.String("error", err.Error()))
return
return false
}
newAlphabet, err := newAlphabetList(sidechainAlphabet, mainnetAlphabet)
if err != nil {
gp.log.Error(logs.GovernanceCantMergeAlphabetListsFromMainNetAndSideChain,
zap.String("error", err.Error()))
return
return false
}
if newAlphabet == nil {
gp.log.Info(logs.GovernanceNoGovernanceUpdateAlphabetListHasNotBeenChanged)
return
return true
}
gp.log.Info(logs.GovernanceAlphabetListHasBeenChangedStartingUpdate,
@ -77,6 +77,8 @@ func (gp *Processor) processAlphabetSync(txHash util.Uint256) {
gp.updateFrostFSContractInMainnet(newAlphabet)
gp.log.Info(logs.GovernanceFinishedAlphabetListUpdate)
return true
}
func prettyKeys(keys keys.PublicKeys) string {

View file

@ -4,6 +4,7 @@ import (
"errors"
"fmt"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/innerring/metrics"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/client"
frostfscontract "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/client/frostfs"
nmClient "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/client/netmap"
@ -75,6 +76,7 @@ type (
// Processor of events related to governance in the network.
Processor struct {
log *logger.Logger
metrics metrics.Register
pool *ants.Pool
frostfsClient FrostFSClient
netmapClient NetmapClient
@ -93,6 +95,7 @@ type (
// Params of the processor constructor.
Params struct {
Log *logger.Logger
Metrics metrics.Register
AlphabetState AlphabetState
EpochState EpochState
@ -130,11 +133,17 @@ func New(p *Params) (*Processor, error) {
return nil, fmt.Errorf("ir/governance: can't create worker pool: %w", err)
}
metricsRegister := p.Metrics
if metricsRegister == nil {
metricsRegister = metrics.DefaultRegister{}
}
// result is cached by neo-go, so we can pre-calc it
designate := p.MainnetClient.GetDesignateHash()
return &Processor{
log: p.Log,
metrics: metricsRegister,
pool: pool,
frostfsClient: p.FrostFSClient,
netmapClient: p.NetmapClient,

View file

@ -4,6 +4,7 @@ import (
"encoding/hex"
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/innerring/processors"
timerEvent "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/innerring/timers"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/event"
netmapEvent "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/event/netmap"
@ -16,7 +17,7 @@ func (np *Processor) HandleNewEpochTick(ev event.Event) {
// send an event to the worker pool
err := np.pool.Submit(func() { np.processNewEpochTick() })
err := processors.SubmitEvent(np.pool, np.metrics, "netmap_new_epoch_tick", np.processNewEpochTick)
if err != nil {
// there system can be moved into controlled degradation stage
np.log.Warn(logs.NetmapNetmapWorkerPoolDrained,
@ -32,8 +33,8 @@ func (np *Processor) handleNewEpoch(ev event.Event) {
// send an event to the worker pool
err := np.pool.Submit(func() {
np.processNewEpoch(epochEvent)
err := processors.SubmitEvent(np.pool, np.metrics, "netmap_new_epoch", func() bool {
return np.processNewEpoch(epochEvent)
})
if err != nil {
// there system can be moved into controlled degradation stage
@ -51,8 +52,8 @@ func (np *Processor) handleAddPeer(ev event.Event) {
// send an event to the worker pool
err := np.pool.Submit(func() {
np.processAddPeer(newPeer)
err := processors.SubmitEvent(np.pool, np.metrics, "netmap_add_peer", func() bool {
return np.processAddPeer(newPeer)
})
if err != nil {
// there system can be moved into controlled degradation stage
@ -69,8 +70,8 @@ func (np *Processor) handleUpdateState(ev event.Event) {
// send event to the worker pool
err := np.pool.Submit(func() {
np.processUpdatePeer(updPeer)
err := processors.SubmitEvent(np.pool, np.metrics, "netmap_update_peer", func() bool {
return np.processUpdatePeer(updPeer)
})
if err != nil {
// there system can be moved into controlled degradation stage
@ -91,8 +92,8 @@ func (np *Processor) handleCleanupTick(ev event.Event) {
np.log.Info(logs.NetmapTick, zap.String("type", "netmap cleaner"))
// send event to the worker pool
err := np.pool.Submit(func() {
np.processNetmapCleanupTick(cleanup)
err := processors.SubmitEvent(np.pool, np.metrics, "netmap_cleanup_tick", func() bool {
return np.processNetmapCleanupTick(cleanup)
})
if err != nil {
// there system can be moved into controlled degradation stage

View file

@ -7,11 +7,11 @@ import (
"go.uber.org/zap"
)
func (np *Processor) processNetmapCleanupTick(ev netmapCleanupTick) {
func (np *Processor) processNetmapCleanupTick(ev netmapCleanupTick) bool {
if !np.alphabetState.IsAlphabet() {
np.log.Info(logs.NetmapNonAlphabetModeIgnoreNewNetmapCleanupTick)
return
return true
}
err := np.netmapSnapshot.forEachRemoveCandidate(ev.epoch, func(s string) error {
@ -47,5 +47,8 @@ func (np *Processor) processNetmapCleanupTick(ev netmapCleanupTick) {
if err != nil {
np.log.Warn(logs.NetmapCantIterateOnNetmapCleanerCache,
zap.String("error", err.Error()))
return false
}
return true
}

View file

@ -10,7 +10,7 @@ import (
// Process new epoch notification by setting global epoch value and resetting
// local epoch timer.
func (np *Processor) processNewEpoch(ev netmapEvent.NewEpoch) {
func (np *Processor) processNewEpoch(ev netmapEvent.NewEpoch) bool {
epoch := ev.EpochNumber()
epochDuration, err := np.netmapClient.EpochDuration()
@ -41,7 +41,7 @@ func (np *Processor) processNewEpoch(ev netmapEvent.NewEpoch) {
np.log.Warn(logs.NetmapCantGetNetmapSnapshotToPerformCleanup,
zap.String("error", err.Error()))
return
return false
}
prm := cntClient.StartEstimationPrm{}
@ -63,13 +63,15 @@ func (np *Processor) processNewEpoch(ev netmapEvent.NewEpoch) {
np.handleCleanupTick(netmapCleanupTick{epoch: epoch, txHash: ev.TxHash()})
np.handleAlphabetSync(governance.NewSyncEvent(ev.TxHash()))
np.handleNotaryDeposit(ev)
return true
}
// Process new epoch tick by invoking new epoch method in network map contract.
func (np *Processor) processNewEpochTick() {
func (np *Processor) processNewEpochTick() bool {
if !np.alphabetState.IsAlphabet() {
np.log.Info(logs.NetmapNonAlphabetModeIgnoreNewEpochTick)
return
return true
}
nextEpoch := np.epochState.EpochCounter() + 1
@ -78,5 +80,8 @@ func (np *Processor) processNewEpochTick() {
err := np.netmapClient.NewEpoch(nextEpoch, false)
if err != nil {
np.log.Error(logs.NetmapCantInvokeNetmapNewEpoch, zap.Error(err))
return false
}
return true
}

View file

@ -12,10 +12,10 @@ import (
// Process add peer notification by sanity check of new node
// local epoch timer.
func (np *Processor) processAddPeer(ev netmapEvent.AddPeer) {
func (np *Processor) processAddPeer(ev netmapEvent.AddPeer) bool {
if !np.alphabetState.IsAlphabet() {
np.log.Info(logs.NetmapNonAlphabetModeIgnoreNewPeerNotification)
return
return true
}
// check if notary transaction is valid, see #976
@ -26,7 +26,7 @@ func (np *Processor) processAddPeer(ev netmapEvent.AddPeer) {
zap.String("method", "netmap.AddPeer"),
zap.String("hash", tx.Hash().StringLE()),
zap.Error(err))
return
return false
}
// unmarshal node info
@ -34,7 +34,7 @@ func (np *Processor) processAddPeer(ev netmapEvent.AddPeer) {
if err := nodeInfo.Unmarshal(ev.Node()); err != nil {
// it will be nice to have tx id at event structure to log it
np.log.Warn(logs.NetmapCantParseNetworkMapCandidate)
return
return false
}
// validate and update node info
@ -44,7 +44,7 @@ func (np *Processor) processAddPeer(ev netmapEvent.AddPeer) {
zap.String("error", err.Error()),
)
return
return false
}
// sort attributes to make it consistent
@ -81,15 +81,18 @@ func (np *Processor) processAddPeer(ev netmapEvent.AddPeer) {
if err != nil {
np.log.Error(logs.NetmapCantInvokeNetmapAddPeer, zap.Error(err))
}
return false
}
}
return true
}
// Process update peer notification by sending approval tx to the smart contract.
func (np *Processor) processUpdatePeer(ev netmapEvent.UpdatePeer) {
func (np *Processor) processUpdatePeer(ev netmapEvent.UpdatePeer) bool {
if !np.alphabetState.IsAlphabet() {
np.log.Info(logs.NetmapNonAlphabetModeIgnoreUpdatePeerNotification)
return
return true
}
// flag node to remove from local view, so it can be re-bootstrapped
@ -105,11 +108,14 @@ func (np *Processor) processUpdatePeer(ev netmapEvent.UpdatePeer) {
zap.Error(err),
)
return
return false
}
}
if err = np.netmapClient.MorphNotarySignAndInvokeTX(ev.NotaryRequest().MainTransaction); err != nil {
np.log.Error(logs.NetmapCantInvokeNetmapUpdatePeer, zap.Error(err))
return false
}
return true
}

View file

@ -5,6 +5,7 @@ import (
"fmt"
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/innerring/metrics"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/innerring/processors/netmap/nodevalidation/state"
cntClient "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/client/container"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/event"
@ -72,6 +73,7 @@ type (
// and new epoch ticker, because it is related to contract.
Processor struct {
log *logger.Logger
metrics metrics.Register
pool *ants.Pool
epochTimer EpochTimerReseter
epochState EpochState
@ -93,6 +95,7 @@ type (
// Params of the processor constructor.
Params struct {
Log *logger.Logger
Metrics metrics.Register
PoolSize int
NetmapClient Client
EpochTimer EpochTimerReseter
@ -145,8 +148,14 @@ func New(p *Params) (*Processor, error) {
return nil, fmt.Errorf("ir/netmap: can't create worker pool: %w", err)
}
metricsRegister := p.Metrics
if metricsRegister == nil {
metricsRegister = metrics.DefaultRegister{}
}
return &Processor{
log: p.Log,
metrics: metricsRegister,
pool: pool,
epochTimer: p.EpochTimer,
epochState: p.EpochState,

View file

@ -0,0 +1,16 @@
package processors
import (
"time"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/innerring/metrics"
"github.com/panjf2000/ants/v2"
)
func SubmitEvent(pool *ants.Pool, metrics metrics.Register, eventLabel string, eventProcessor func() bool) error {
return pool.Submit(func() {
start := time.Now()
success := eventProcessor()
metrics.AddEvent(time.Since(start), eventLabel, success)
})
}

View file

@ -1,13 +1,23 @@
package metrics
import "github.com/prometheus/client_golang/prometheus"
import (
"strconv"
"time"
const innerRingSubsystem = "ir"
"github.com/prometheus/client_golang/prometheus"
)
const (
innerRingSubsystem = "ir"
innerRingLabelSuccess = "success"
innerRingLabelType = "type"
)
// InnerRingServiceMetrics contains metrics collected by inner ring.
type InnerRingServiceMetrics struct {
epoch metric[prometheus.Gauge]
health metric[prometheus.Gauge]
eventDuration metric[*prometheus.HistogramVec]
}
// NewInnerRingMetrics returns new instance of metrics collectors for inner ring.
@ -25,14 +35,22 @@ func NewInnerRingMetrics() *InnerRingServiceMetrics {
Name: "health",
Help: "Current inner-ring node state.",
})
eventDuration = newHistogramVec(prometheus.HistogramOpts{
Namespace: namespace,
Subsystem: innerRingSubsystem,
Name: "event_duration_seconds",
Help: "Duration of processing of inner-ring events",
}, []string{innerRingLabelType, innerRingLabelSuccess})
)
mustRegister(epoch)
mustRegister(health)
mustRegister(eventDuration)
return &InnerRingServiceMetrics{
epoch: epoch,
health: health,
eventDuration: eventDuration,
}
}
@ -45,3 +63,10 @@ func (m InnerRingServiceMetrics) SetEpoch(epoch uint64) {
func (m InnerRingServiceMetrics) SetHealth(s int32) {
m.health.value.Set(float64(s))
}
func (m InnerRingServiceMetrics) AddEvent(d time.Duration, typ string, success bool) {
m.eventDuration.value.With(prometheus.Labels{
innerRingLabelType: typ,
innerRingLabelSuccess: strconv.FormatBool(success),
}).Observe(d.Seconds())
}