[TrueCloudLab#6] pool: Log node healthy changing

Signed-off-by: Denis Kirillov <d.kirillov@yadro.com>
This commit is contained in:
Denis Kirillov 2023-01-11 15:48:42 +03:00 committed by Alex Vanin
parent dd88a5c5e0
commit 54696acf48
3 changed files with 39 additions and 14 deletions

View file

@ -4,6 +4,7 @@ import (
"context" "context"
"crypto/ecdsa" "crypto/ecdsa"
"errors" "errors"
"go.uber.org/zap"
sessionv2 "github.com/TrueCloudLab/frostfs-api-go/v2/session" sessionv2 "github.com/TrueCloudLab/frostfs-api-go/v2/session"
"github.com/TrueCloudLab/frostfs-sdk-go/accounting" "github.com/TrueCloudLab/frostfs-sdk-go/accounting"
@ -33,7 +34,7 @@ type mockClient struct {
func newMockClient(addr string, key ecdsa.PrivateKey) *mockClient { func newMockClient(addr string, key ecdsa.PrivateKey) *mockClient {
return &mockClient{ return &mockClient{
key: key, key: key,
clientStatusMonitor: newClientStatusMonitor(addr, 10), clientStatusMonitor: newClientStatusMonitor(zap.NewExample(), addr, 10),
} }
} }

View file

@ -31,6 +31,7 @@ import (
"github.com/nspcc-dev/neo-go/pkg/crypto/keys" "github.com/nspcc-dev/neo-go/pkg/crypto/keys"
"go.uber.org/atomic" "go.uber.org/atomic"
"go.uber.org/zap" "go.uber.org/zap"
"go.uber.org/zap/zapcore"
) )
// client represents virtual connection to the single FrostFS network endpoint from which Pool is formed. // client represents virtual connection to the single FrostFS network endpoint from which Pool is formed.
@ -101,6 +102,7 @@ var errPoolClientUnhealthy = errors.New("pool client unhealthy")
// clientStatusMonitor count error rate and other statistics for connection. // clientStatusMonitor count error rate and other statistics for connection.
type clientStatusMonitor struct { type clientStatusMonitor struct {
logger *zap.Logger
addr string addr string
healthy *atomic.Bool healthy *atomic.Bool
errorThreshold uint32 errorThreshold uint32
@ -186,13 +188,14 @@ func (m MethodIndex) String() string {
} }
} }
func newClientStatusMonitor(addr string, errorThreshold uint32) clientStatusMonitor { func newClientStatusMonitor(logger *zap.Logger, addr string, errorThreshold uint32) clientStatusMonitor {
methods := make([]*methodStatus, methodLast) methods := make([]*methodStatus, methodLast)
for i := methodBalanceGet; i < methodLast; i++ { for i := methodBalanceGet; i < methodLast; i++ {
methods[i] = &methodStatus{name: i.String()} methods[i] = &methodStatus{name: i.String()}
} }
return clientStatusMonitor{ return clientStatusMonitor{
logger: logger,
addr: addr, addr: addr,
healthy: atomic.NewBool(true), healthy: atomic.NewBool(true),
errorThreshold: errorThreshold, errorThreshold: errorThreshold,
@ -224,6 +227,7 @@ type clientWrapper struct {
// wrapperPrm is params to create clientWrapper. // wrapperPrm is params to create clientWrapper.
type wrapperPrm struct { type wrapperPrm struct {
logger *zap.Logger
address string address string
key ecdsa.PrivateKey key ecdsa.PrivateKey
dialTimeout time.Duration dialTimeout time.Duration
@ -280,7 +284,7 @@ func newWrapper(prm wrapperPrm) *clientWrapper {
res := &clientWrapper{ res := &clientWrapper{
client: &cl, client: &cl,
clientStatusMonitor: newClientStatusMonitor(prm.address, prm.errorThreshold), clientStatusMonitor: newClientStatusMonitor(prm.logger, prm.address, prm.errorThreshold),
prm: prm, prm: prm,
} }
@ -940,13 +944,20 @@ func (c *clientStatusMonitor) address() string {
func (c *clientStatusMonitor) incErrorRate() { func (c *clientStatusMonitor) incErrorRate() {
c.mu.Lock() c.mu.Lock()
defer c.mu.Unlock()
c.currentErrorCount++ c.currentErrorCount++
c.overallErrorCount++ c.overallErrorCount++
if c.currentErrorCount >= c.errorThreshold {
thresholdReached := c.currentErrorCount >= c.errorThreshold
if thresholdReached {
c.setUnhealthy() c.setUnhealthy()
c.currentErrorCount = 0 c.currentErrorCount = 0
} }
c.mu.Unlock()
if thresholdReached && c.logger != nil {
c.logger.Warn("error threshold reached",
zap.String("address", c.addr), zap.Uint32("threshold", c.errorThreshold))
}
} }
func (c *clientStatusMonitor) currentErrorRate() uint32 { func (c *clientStatusMonitor) currentErrorRate() uint32 {
@ -1603,9 +1614,7 @@ func (p *Pool) Dial(ctx context.Context) error {
for j, addr := range params.addresses { for j, addr := range params.addresses {
clients[j] = p.clientBuilder(addr) clients[j] = p.clientBuilder(addr)
if err := clients[j].dial(ctx); err != nil { if err := clients[j].dial(ctx); err != nil {
if p.logger != nil { p.log(zap.WarnLevel, "failed to build client", zap.String("address", addr), zap.Error(err))
p.logger.Warn("failed to build client", zap.String("address", addr), zap.Error(err))
}
continue continue
} }
@ -1613,10 +1622,8 @@ func (p *Pool) Dial(ctx context.Context) error {
err := initSessionForDuration(ctx, &st, clients[j], p.rebalanceParams.sessionExpirationDuration, *p.key) err := initSessionForDuration(ctx, &st, clients[j], p.rebalanceParams.sessionExpirationDuration, *p.key)
if err != nil { if err != nil {
clients[j].setUnhealthy() clients[j].setUnhealthy()
if p.logger != nil { p.log(zap.WarnLevel, "failed to create frostfs session token for client",
p.logger.Warn("failed to create frostfs session token for client", zap.String("address", addr), zap.Error(err))
zap.String("address", addr), zap.Error(err))
}
continue continue
} }
@ -1645,6 +1652,21 @@ func (p *Pool) Dial(ctx context.Context) error {
return nil return nil
} }
func (p *Pool) log(level zapcore.Level, msg string, fields ...zap.Field) {
if p.logger == nil {
return
}
switch level {
case zap.DebugLevel:
p.logger.Debug(msg, fields...)
case zap.WarnLevel:
p.logger.Warn(msg, fields...)
case zap.ErrorLevel:
p.logger.Error(msg, fields...)
}
}
func fillDefaultInitParams(params *InitParameters, cache *sessionCache) { func fillDefaultInitParams(params *InitParameters, cache *sessionCache) {
if params.sessionExpirationDuration == 0 { if params.sessionExpirationDuration == 0 {
params.sessionExpirationDuration = defaultSessionTokenExpirationDuration params.sessionExpirationDuration = defaultSessionTokenExpirationDuration
@ -1778,6 +1800,8 @@ func (p *Pool) updateInnerNodesHealth(ctx context.Context, i int, bufferWeights
} }
if changed { if changed {
p.log(zap.DebugLevel, "health has changed",
zap.String("address", cli.address()), zap.Bool("healthy", healthy))
healthyChanged.Store(true) healthyChanged.Store(true)
} }
}(j, cli) }(j, cli)

View file

@ -514,7 +514,7 @@ func TestWaitPresence(t *testing.T) {
} }
func TestStatusMonitor(t *testing.T) { func TestStatusMonitor(t *testing.T) {
monitor := newClientStatusMonitor("", 10) monitor := newClientStatusMonitor(zap.NewExample(), "", 10)
monitor.errorThreshold = 3 monitor.errorThreshold = 3
count := 10 count := 10
@ -527,7 +527,7 @@ func TestStatusMonitor(t *testing.T) {
} }
func TestHandleError(t *testing.T) { func TestHandleError(t *testing.T) {
monitor := newClientStatusMonitor("", 10) monitor := newClientStatusMonitor(zap.NewExample(), "", 10)
for i, tc := range []struct { for i, tc := range []struct {
status apistatus.Status status apistatus.Status