Feature/6 monitoring pool node #7

Merged
KirillovDenis merged 2 commits from feature/6-monitoring_pool_node into master 2023-01-13 10:16:23 +00:00
3 changed files with 40 additions and 15 deletions

View file

@ -4,6 +4,7 @@ import (
"context"
"crypto/ecdsa"
"errors"
"go.uber.org/zap"
sessionv2 "github.com/TrueCloudLab/frostfs-api-go/v2/session"
"github.com/TrueCloudLab/frostfs-sdk-go/accounting"
@ -33,7 +34,7 @@ type mockClient struct {
func newMockClient(addr string, key ecdsa.PrivateKey) *mockClient {
return &mockClient{
key: key,
clientStatusMonitor: newClientStatusMonitor(addr, 10),
clientStatusMonitor: newClientStatusMonitor(zap.NewExample(), addr, 10),
}
}

View file

@ -31,6 +31,7 @@ import (
"github.com/nspcc-dev/neo-go/pkg/crypto/keys"
"go.uber.org/atomic"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
)
// client represents a virtual connection to a single FrostFS network endpoint from which the Pool is formed.
@ -101,6 +102,7 @@ var errPoolClientUnhealthy = errors.New("pool client unhealthy")
// clientStatusMonitor counts the error rate and other statistics for a connection.
type clientStatusMonitor struct {
logger *zap.Logger
addr string
healthy *atomic.Bool
errorThreshold uint32
@ -186,13 +188,14 @@ func (m MethodIndex) String() string {
}
}
func newClientStatusMonitor(addr string, errorThreshold uint32) clientStatusMonitor {
func newClientStatusMonitor(logger *zap.Logger, addr string, errorThreshold uint32) clientStatusMonitor {
methods := make([]*methodStatus, methodLast)
for i := methodBalanceGet; i < methodLast; i++ {
methods[i] = &methodStatus{name: i.String()}
}
return clientStatusMonitor{
logger: logger,
addr: addr,
healthy: atomic.NewBool(true),
errorThreshold: errorThreshold,
@ -224,6 +227,7 @@ type clientWrapper struct {
// wrapperPrm holds the parameters used to create a clientWrapper.
type wrapperPrm struct {
logger *zap.Logger
address string
key ecdsa.PrivateKey
dialTimeout time.Duration
@ -280,7 +284,7 @@ func newWrapper(prm wrapperPrm) *clientWrapper {
res := &clientWrapper{
client: &cl,
clientStatusMonitor: newClientStatusMonitor(prm.address, prm.errorThreshold),
clientStatusMonitor: newClientStatusMonitor(prm.logger, prm.address, prm.errorThreshold),
prm: prm,
}
@ -940,13 +944,20 @@ func (c *clientStatusMonitor) address() string {
func (c *clientStatusMonitor) incErrorRate() {
c.mu.Lock()
defer c.mu.Unlock()
c.currentErrorCount++
c.overallErrorCount++
if c.currentErrorCount >= c.errorThreshold {
thresholdReached := c.currentErrorCount >= c.errorThreshold
if thresholdReached {
c.setUnhealthy()
c.currentErrorCount = 0
}
c.mu.Unlock()
if thresholdReached && c.logger != nil {
c.logger.Warn("error threshold reached",
zap.String("address", c.addr), zap.Uint32("threshold", c.errorThreshold))
}
}
func (c *clientStatusMonitor) currentErrorRate() uint32 {
@ -1545,7 +1556,7 @@ const (
defaultSessionTokenExpirationDuration = 100 // in blocks
defaultErrorThreshold = 100
defaultRebalanceInterval = 25 * time.Second
defaultRebalanceInterval = 15 * time.Second
defaultHealthcheckTimeout = 4 * time.Second
defaultDialTimeout = 5 * time.Second
defaultStreamTimeout = 10 * time.Second
@ -1603,9 +1614,7 @@ func (p *Pool) Dial(ctx context.Context) error {
for j, addr := range params.addresses {
clients[j] = p.clientBuilder(addr)
if err := clients[j].dial(ctx); err != nil {
if p.logger != nil {
p.logger.Warn("failed to build client", zap.String("address", addr), zap.Error(err))
}
p.log(zap.WarnLevel, "failed to build client", zap.String("address", addr), zap.Error(err))
continue
}
@ -1613,10 +1622,8 @@ func (p *Pool) Dial(ctx context.Context) error {
err := initSessionForDuration(ctx, &st, clients[j], p.rebalanceParams.sessionExpirationDuration, *p.key)
if err != nil {
clients[j].setUnhealthy()
if p.logger != nil {
p.logger.Warn("failed to create frostfs session token for client",
p.log(zap.WarnLevel, "failed to create frostfs session token for client",
zap.String("address", addr), zap.Error(err))
}
continue
}
@ -1645,6 +1652,21 @@ func (p *Pool) Dial(ctx context.Context) error {
return nil
}
// log writes msg with the given structured fields to the pool's logger at the
// requested level. It is a no-op when the pool has no logger configured.
//
// The level is forwarded to zap unchanged through Logger.Check, so every
// zapcore level is honoured rather than only Debug, Warn and Error. zap's own
// semantics for the DPanic, Panic and Fatal levels therefore apply.
func (p *Pool) log(level zapcore.Level, msg string, fields ...zap.Field) {
	if p.logger == nil {
		return
	}

	// Check returns nil when the level is disabled for this logger, so the
	// entry is only written when it would actually be emitted.
	if ce := p.logger.Check(level, msg); ce != nil {
		ce.Write(fields...)
	}
}
func fillDefaultInitParams(params *InitParameters, cache *sessionCache) {
if params.sessionExpirationDuration == 0 {
params.sessionExpirationDuration = defaultSessionTokenExpirationDuration
@ -1778,6 +1800,8 @@ func (p *Pool) updateInnerNodesHealth(ctx context.Context, i int, bufferWeights
}
if changed {
p.log(zap.DebugLevel, "health has changed",
zap.String("address", cli.address()), zap.Bool("healthy", healthy))
healthyChanged.Store(true)
}
}(j, cli)

View file

@ -514,7 +514,7 @@ func TestWaitPresence(t *testing.T) {
}
func TestStatusMonitor(t *testing.T) {
monitor := newClientStatusMonitor("", 10)
monitor := newClientStatusMonitor(zap.NewExample(), "", 10)
monitor.errorThreshold = 3
count := 10
@ -527,7 +527,7 @@ func TestStatusMonitor(t *testing.T) {
}
func TestHandleError(t *testing.T) {
monitor := newClientStatusMonitor("", 10)
monitor := newClientStatusMonitor(zap.NewExample(), "", 10)
for i, tc := range []struct {
status apistatus.Status