forked from TrueCloudLab/frostfs-node
WIP: Morph: Add unit tests #2
11 changed files with 182 additions and 15 deletions
|
@ -9,6 +9,7 @@ import (
|
|||
morphconfig "git.frostfs.info/TrueCloudLab/frostfs-node/cmd/frostfs-node/config/morph"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/netmap"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/metrics"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/client"
|
||||
nmClient "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/client/netmap"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/event"
|
||||
|
@ -43,6 +44,7 @@ func initMorphComponents(ctx context.Context, c *cfg) {
|
|||
c.key,
|
||||
client.WithDialTimeout(morphconfig.DialTimeout(c.appCfg)),
|
||||
client.WithLogger(c.log),
|
||||
client.WithMetrics(metrics.NewMorphClientMetrics()),
|
||||
client.WithEndpoints(addresses...),
|
||||
client.WithConnLostCallback(func() {
|
||||
c.internalErr <- errors.New("morph connection has been lost")
|
||||
|
|
|
@ -52,7 +52,7 @@ func (s *Server) initNetmapProcessor(cfg *viper.Viper,
|
|||
|
||||
s.netmapProcessor, err = netmap.New(&netmap.Params{
|
||||
Log: s.log,
|
||||
Metrics: s.metrics,
|
||||
Metrics: s.irMetrics,
|
||||
PoolSize: cfg.GetInt("workers.netmap"),
|
||||
NetmapClient: netmap.NewNetmapClient(s.netmapClient),
|
||||
EpochTimer: s,
|
||||
|
@ -163,7 +163,7 @@ func (s *Server) createAlphaSync(cfg *viper.Viper, frostfsCli *frostfsClient.Cli
|
|||
// create governance processor
|
||||
governanceProcessor, err := governance.New(&governance.Params{
|
||||
Log: s.log,
|
||||
Metrics: s.metrics,
|
||||
Metrics: s.irMetrics,
|
||||
FrostFSClient: frostfsCli,
|
||||
NetmapClient: s.netmapClient,
|
||||
AlphabetState: s,
|
||||
|
@ -233,7 +233,7 @@ func (s *Server) initAlphabetProcessor(cfg *viper.Viper) error {
|
|||
s.alphabetProcessor, err = alphabet.New(&alphabet.Params{
|
||||
ParsedWallets: parsedWallets,
|
||||
Log: s.log,
|
||||
Metrics: s.metrics,
|
||||
Metrics: s.irMetrics,
|
||||
PoolSize: cfg.GetInt("workers.alphabet"),
|
||||
AlphabetContracts: s.contracts.alphabet,
|
||||
NetmapClient: s.netmapClient,
|
||||
|
@ -258,7 +258,7 @@ func (s *Server) initContainerProcessor(cfg *viper.Viper, cnrClient *container.C
|
|||
// container processor
|
||||
containerProcessor, err := cont.New(&cont.Params{
|
||||
Log: s.log,
|
||||
Metrics: s.metrics,
|
||||
Metrics: s.irMetrics,
|
||||
PoolSize: cfg.GetInt("workers.container"),
|
||||
AlphabetState: s,
|
||||
ContainerClient: cnrClient,
|
||||
|
@ -277,7 +277,7 @@ func (s *Server) initBalanceProcessor(cfg *viper.Viper, frostfsCli *frostfsClien
|
|||
// create balance processor
|
||||
balanceProcessor, err := balance.New(&balance.Params{
|
||||
Log: s.log,
|
||||
Metrics: s.metrics,
|
||||
Metrics: s.irMetrics,
|
||||
PoolSize: cfg.GetInt("workers.balance"),
|
||||
FrostFSClient: frostfsCli,
|
||||
BalanceSC: s.contracts.balance,
|
||||
|
@ -298,7 +298,7 @@ func (s *Server) initFrostFSMainnetProcessor(cfg *viper.Viper, frostfsIDClient *
|
|||
|
||||
frostfsProcessor, err := frostfs.New(&frostfs.Params{
|
||||
Log: s.log,
|
||||
Metrics: s.metrics,
|
||||
Metrics: s.irMetrics,
|
||||
PoolSize: cfg.GetInt("workers.frostfs"),
|
||||
FrostFSContract: s.contracts.frostfs,
|
||||
FrostFSIDClient: frostfsIDClient,
|
||||
|
@ -474,7 +474,7 @@ func (s *Server) initMorph(ctx context.Context, cfg *viper.Viper, errChan chan<-
|
|||
key: s.key,
|
||||
name: morphPrefix,
|
||||
from: fromSideChainBlock,
|
||||
morphCacheMetric: s.metrics.MorphCacheMetrics(),
|
||||
morphCacheMetric: s.irMetrics.MorphCacheMetrics(),
|
||||
}
|
||||
|
||||
// create morph client
|
||||
|
|
|
@ -58,7 +58,7 @@ type (
|
|||
persistate *state.PersistentStorage
|
||||
|
||||
// metrics
|
||||
metrics *metrics.InnerRingServiceMetrics
|
||||
irMetrics *metrics.InnerRingServiceMetrics
|
||||
|
||||
// notary configuration
|
||||
feeConfig *config.FeeConfig
|
||||
|
@ -328,8 +328,8 @@ func (s *Server) registerStarter(f func() error) {
|
|||
func New(ctx context.Context, log *logger.Logger, cfg *viper.Viper, errChan chan<- error) (*Server, error) {
|
||||
var err error
|
||||
server := &Server{
|
||||
log: log,
|
||||
metrics: metrics.NewInnerRingMetrics(),
|
||||
log: log,
|
||||
irMetrics: metrics.NewInnerRingMetrics(),
|
||||
}
|
||||
|
||||
server.setHealthStatus(control.HealthStatus_HEALTH_STATUS_UNDEFINED)
|
||||
|
|
|
@ -29,8 +29,8 @@ func (s *Server) EpochCounter() uint64 {
|
|||
// epoch counter.
|
||||
func (s *Server) SetEpochCounter(val uint64) {
|
||||
s.epochCounter.Store(val)
|
||||
if s.metrics != nil {
|
||||
s.metrics.SetEpoch(val)
|
||||
if s.irMetrics != nil {
|
||||
s.irMetrics.SetEpoch(val)
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -154,8 +154,8 @@ func (s *Server) ResetEpochTimer(h uint32) error {
|
|||
|
||||
func (s *Server) setHealthStatus(hs control.HealthStatus) {
|
||||
s.healthStatus.Store(int32(hs))
|
||||
if s.metrics != nil {
|
||||
s.metrics.SetHealth(int32(hs))
|
||||
if s.irMetrics != nil {
|
||||
s.irMetrics.SetHealth(int32(hs))
|
||||
}
|
||||
}
|
||||
|
||||
|
|
82
pkg/metrics/morph.go
Normal file
82
pkg/metrics/morph.go
Normal file
|
@ -0,0 +1,82 @@
|
|||
package metrics
|
||||
|
||||
import (
|
||||
"strconv"
|
||||
"time"
|
||||
|
||||
morphmetrics "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/metrics"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-observability/metrics"
|
||||
"github.com/prometheus/client_golang/prometheus"
|
||||
)
|
||||
|
||||
const (
|
||||
morphSubsystem = "morph"
|
||||
morphNotificationTypeLabel = "notification_type"
|
||||
morphInvokeTypeLabel = "invoke_type"
|
||||
morphContractLabel = "contract"
|
||||
morphMethodLabel = "method"
|
||||
morphSuccessLabel = "success"
|
||||
)
|
||||
|
||||
type morphClientMetrics struct {
|
||||
switchCount prometheus.Counter
|
||||
lastBlock prometheus.Gauge
|
||||
notificationCount *prometheus.CounterVec
|
||||
invokeDuration *prometheus.HistogramVec
|
||||
}
|
||||
|
||||
func NewMorphClientMetrics() morphmetrics.Register {
|
||||
return &morphClientMetrics{
|
||||
switchCount: metrics.NewCounter(prometheus.CounterOpts{
|
||||
Namespace: namespace,
|
||||
Subsystem: morphSubsystem,
|
||||
Name: "switch_count",
|
||||
Help: "Number of endpoint switches",
|
||||
}),
|
||||
lastBlock: metrics.NewGauge(prometheus.GaugeOpts{
|
||||
Namespace: namespace,
|
||||
Subsystem: morphSubsystem,
|
||||
Name: "last_block",
|
||||
Help: "Index of the last received block",
|
||||
}),
|
||||
notificationCount: metrics.NewCounterVec(prometheus.CounterOpts{
|
||||
Namespace: namespace,
|
||||
Subsystem: morphSubsystem,
|
||||
Name: "notification_count",
|
||||
Help: "Number of notifications received by notification type",
|
||||
}, []string{morphNotificationTypeLabel}),
|
||||
invokeDuration: metrics.NewHistogramVec(prometheus.HistogramOpts{
|
||||
Namespace: namespace,
|
||||
Subsystem: morphSubsystem,
|
||||
Name: "invoke_duration_seconds",
|
||||
Help: "Cummulative duration of contract invocations",
|
||||
}, []string{morphInvokeTypeLabel, morphContractLabel, morphMethodLabel, morphSuccessLabel}),
|
||||
}
|
||||
}
|
||||
|
||||
func (m *morphClientMetrics) IncSwitchCount() {
|
||||
m.switchCount.Inc()
|
||||
}
|
||||
|
||||
func (m *morphClientMetrics) SetLastBlock(index uint32) {
|
||||
m.lastBlock.Set(float64(index))
|
||||
}
|
||||
|
||||
func (m *morphClientMetrics) IncNotificationCount(typ string) {
|
||||
m.notificationCount.With(
|
||||
prometheus.Labels{
|
||||
morphNotificationTypeLabel: typ,
|
||||
},
|
||||
).Inc()
|
||||
}
|
||||
|
||||
func (m *morphClientMetrics) ObserveInvoke(typ string, contract string, method string, success bool, d time.Duration) {
|
||||
m.invokeDuration.With(
|
||||
prometheus.Labels{
|
||||
morphInvokeTypeLabel: typ,
|
||||
morphContractLabel: contract,
|
||||
morphMethodLabel: method,
|
||||
morphSuccessLabel: strconv.FormatBool(success),
|
||||
},
|
||||
).Observe(d.Seconds())
|
||||
}
|
|
@ -11,6 +11,7 @@ import (
|
|||
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/metrics"
|
||||
morphmetrics "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/metrics"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger"
|
||||
lru "github.com/hashicorp/golang-lru/v2"
|
||||
"github.com/nspcc-dev/neo-go/pkg/core/native/noderoles"
|
||||
|
@ -48,7 +49,8 @@ import (
|
|||
type Client struct {
|
||||
cache cache
|
||||
|
||||
logger *logger.Logger // logging component
|
||||
logger *logger.Logger // logging component
|
||||
metrics morphmetrics.Register
|
||||
|
||||
client *rpcclient.WSClient // neo-go websocket client
|
||||
rpcActor *actor.Actor // neo-go RPC actor
|
||||
|
@ -172,6 +174,12 @@ func wrapFrostFSError(err error) error {
|
|||
// Invoke invokes contract method by sending transaction into blockchain.
|
||||
// Supported args types: int64, string, util.Uint160, []byte and bool.
|
||||
func (c *Client) Invoke(contract util.Uint160, fee fixedn.Fixed8, method string, args ...any) error {
|
||||
start := time.Now()
|
||||
success := false
|
||||
defer func() {
|
||||
c.metrics.ObserveInvoke("Invoke", contract.String(), method, success, time.Since(start))
|
||||
}()
|
||||
|
||||
c.switchLock.RLock()
|
||||
defer c.switchLock.RUnlock()
|
||||
|
||||
|
@ -189,6 +197,7 @@ func (c *Client) Invoke(contract util.Uint160, fee fixedn.Fixed8, method string,
|
|||
zap.Uint32("vub", vub),
|
||||
zap.Stringer("tx_hash", txHash.Reverse()))
|
||||
|
||||
success = true
|
||||
return nil
|
||||
}
|
||||
|
||||
|
@ -196,6 +205,12 @@ func (c *Client) Invoke(contract util.Uint160, fee fixedn.Fixed8, method string,
|
|||
// If cb returns an error, the session is closed and this error is returned as-is.
|
||||
// If the remove neo-go node does not support sessions, `unwrap.ErrNoSessionID` is returned.
|
||||
func (c *Client) TestInvokeIterator(cb func(stackitem.Item) error, contract util.Uint160, method string, args ...interface{}) error {
|
||||
start := time.Now()
|
||||
success := false
|
||||
defer func() {
|
||||
c.metrics.ObserveInvoke("TestInvokeIterator", contract.String(), method, success, time.Since(start))
|
||||
}()
|
||||
|
||||
c.switchLock.RLock()
|
||||
defer c.switchLock.RUnlock()
|
||||
|
||||
|
@ -228,12 +243,20 @@ func (c *Client) TestInvokeIterator(cb func(stackitem.Item) error, contract util
|
|||
}
|
||||
items, err = c.rpcActor.TraverseIterator(sid, &r, 0)
|
||||
}
|
||||
|
||||
success = err == nil
|
||||
return err
|
||||
}
|
||||
|
||||
// TestInvoke invokes contract method locally in neo-go node. This method should
|
||||
// be used to read data from smart-contract.
|
||||
func (c *Client) TestInvoke(contract util.Uint160, method string, args ...any) (res []stackitem.Item, err error) {
|
||||
start := time.Now()
|
||||
success := false
|
||||
defer func() {
|
||||
c.metrics.ObserveInvoke("TestInvoke", contract.String(), method, success, time.Since(start))
|
||||
}()
|
||||
|
||||
c.switchLock.RLock()
|
||||
defer c.switchLock.RUnlock()
|
||||
|
||||
|
@ -250,6 +273,7 @@ func (c *Client) TestInvoke(contract util.Uint160, method string, args ...any) (
|
|||
return nil, wrapFrostFSError(¬HaltStateError{state: val.State, exception: val.FaultException})
|
||||
}
|
||||
|
||||
success = true
|
||||
return val.Stack, nil
|
||||
}
|
||||
|
||||
|
@ -512,6 +536,10 @@ func (c *Client) NotificationChannel() <-chan rpcclient.Notification {
|
|||
return c.client.Notifications //lint:ignore SA1019 waits for neo-go v0.102.0 https://github.com/nspcc-dev/neo-go/pull/2980
|
||||
}
|
||||
|
||||
func (c *Client) Metrics() morphmetrics.Register {
|
||||
return c.metrics
|
||||
}
|
||||
|
||||
func (c *Client) setActor(act *actor.Actor) {
|
||||
c.rpcActor = act
|
||||
c.gasToken = nep17.New(act, gas.Hash)
|
||||
|
|
|
@ -8,6 +8,7 @@ import (
|
|||
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/metrics"
|
||||
morphmetrics "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/metrics"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger"
|
||||
lru "github.com/hashicorp/golang-lru/v2"
|
||||
"github.com/nspcc-dev/neo-go/pkg/core/transaction"
|
||||
|
@ -32,6 +33,8 @@ type cfg struct {
|
|||
|
||||
logger *logger.Logger // logging component
|
||||
|
||||
metrics morphmetrics.Register
|
||||
|
||||
waitInterval time.Duration
|
||||
|
||||
signer *transaction.Signer
|
||||
|
@ -60,6 +63,7 @@ func defaultConfig() *cfg {
|
|||
return &cfg{
|
||||
dialTimeout: defaultDialTimeout,
|
||||
logger: &logger.Logger{Logger: zap.L()},
|
||||
metrics: morphmetrics.NoopRegister{},
|
||||
waitInterval: defaultWaitInterval,
|
||||
signer: &transaction.Signer{
|
||||
Scopes: transaction.Global,
|
||||
|
@ -80,6 +84,7 @@ func defaultConfig() *cfg {
|
|||
// - signer with the global scope;
|
||||
// - wait interval: 500ms;
|
||||
// - logger: &logger.Logger{Logger: zap.L()}.
|
||||
// - metrics: metrics.NoopRegister
|
||||
//
|
||||
// If desired option satisfies the default value, it can be omitted.
|
||||
// If multiple options of the same config value are supplied,
|
||||
|
@ -109,6 +114,7 @@ func New(ctx context.Context, key *keys.PrivateKey, opts ...Option) (*Client, er
|
|||
cli := &Client{
|
||||
cache: newClientCache(cfg.morphCacheMetrics),
|
||||
logger: cfg.logger,
|
||||
metrics: cfg.metrics,
|
||||
acc: acc,
|
||||
accAddr: accAddr,
|
||||
cfg: *cfg,
|
||||
|
@ -235,6 +241,20 @@ func WithLogger(logger *logger.Logger) Option {
|
|||
}
|
||||
}
|
||||
|
||||
// WithMetrics returns a client constructor option
|
||||
// that specifies the component for reporting metrics.
|
||||
//
|
||||
// Ignores nil value.
|
||||
//
|
||||
// If option not provided, NoopMetrics is used.
|
||||
func WithMetrics(metrics morphmetrics.Register) Option {
|
||||
return func(c *cfg) {
|
||||
if metrics != nil {
|
||||
c.metrics = metrics
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// WithSigner returns a client constructor option
|
||||
// that specifies the signer and the scope of the transaction.
|
||||
//
|
||||
|
|
|
@ -444,6 +444,12 @@ func (c *Client) notaryInvokeAsCommittee(method string, nonce, vub uint32, args
|
|||
}
|
||||
|
||||
func (c *Client) notaryInvoke(committee, invokedByAlpha bool, contract util.Uint160, nonce uint32, vub *uint32, method string, args ...any) error {
|
||||
start := time.Now()
|
||||
success := false
|
||||
defer func() {
|
||||
c.metrics.ObserveInvoke("notaryInvoke", contract.String(), method, success, time.Since(start))
|
||||
}()
|
||||
|
||||
alphabetList, err := c.notary.alphabetSource()
|
||||
if err != nil {
|
||||
return err
|
||||
|
@ -485,6 +491,7 @@ func (c *Client) notaryInvoke(committee, invokedByAlpha bool, contract util.Uint
|
|||
zap.String("tx_hash", mainH.StringLE()),
|
||||
zap.String("fallback_hash", fbH.StringLE()))
|
||||
|
||||
success = true
|
||||
return nil
|
||||
}
|
||||
|
||||
|
|
17
pkg/morph/metrics/metrics.go
Normal file
17
pkg/morph/metrics/metrics.go
Normal file
|
@ -0,0 +1,17 @@
|
|||
package metrics
|
||||
|
||||
import "time"
|
||||
|
||||
type Register interface {
|
||||
IncSwitchCount()
|
||||
SetLastBlock(uint32)
|
||||
IncNotificationCount(notificationType string)
|
||||
ObserveInvoke(typ string, contract string, method string, success bool, d time.Duration)
|
||||
}
|
||||
|
||||
type NoopRegister struct{}
|
||||
|
||||
func (NoopRegister) IncSwitchCount() {}
|
||||
func (NoopRegister) SetLastBlock(uint32) {}
|
||||
func (NoopRegister) IncNotificationCount(string) {}
|
||||
func (NoopRegister) ObserveInvoke(string, string, string, bool, time.Duration) {}
|
|
@ -200,18 +200,22 @@ routeloop:
|
|||
break routeloop
|
||||
case ev, ok := <-curr.NotifyChan:
|
||||
if ok {
|
||||
s.client.Metrics().IncNotificationCount("notify")
|
||||
s.notifyChan <- ev
|
||||
} else {
|
||||
connLost = true
|
||||
}
|
||||
case ev, ok := <-curr.BlockChan:
|
||||
if ok {
|
||||
s.client.Metrics().IncNotificationCount("block")
|
||||
s.client.Metrics().SetLastBlock(ev.Index)
|
||||
s.blockChan <- ev
|
||||
} else {
|
||||
connLost = true
|
||||
}
|
||||
case ev, ok := <-curr.NotaryChan:
|
||||
if ok {
|
||||
s.client.Metrics().IncNotificationCount("notary")
|
||||
s.notaryChan <- ev
|
||||
} else {
|
||||
connLost = true
|
||||
|
@ -262,6 +266,7 @@ func (s *subscriber) switchEndpoint(ctx context.Context, finishCh chan<- bool) (
|
|||
s.current = chs
|
||||
s.Unlock()
|
||||
|
||||
s.client.Metrics().IncSwitchCount()
|
||||
return true, cliCh
|
||||
}
|
||||
|
||||
|
|
|
@ -2,6 +2,12 @@ package tree
|
|||
|
||||
import "time"
|
||||
|
||||
type MetricsRegister interface {
|
||||
AddReplicateTaskDuration(time.Duration, bool)
|
||||
AddReplicateWaitDuration(time.Duration, bool)
|
||||
AddSyncDuration(time.Duration, bool)
|
||||
}
|
||||
|
||||
type defaultMetricsRegister struct{}
|
||||
|
||||
func (defaultMetricsRegister) AddReplicateTaskDuration(time.Duration, bool) {}
|
||||
|
|
Loading…
Reference in a new issue