From 54696acf48e9c6352f35b34b03589c4b710a03f7 Mon Sep 17 00:00:00 2001 From: Denis Kirillov Date: Wed, 11 Jan 2023 15:48:42 +0300 Subject: [PATCH] [TrueCloudLab#6] pool: Log node healthy changing Signed-off-by: Denis Kirillov --- pool/mock_test.go | 3 ++- pool/pool.go | 46 +++++++++++++++++++++++++++++++++++----------- pool/pool_test.go | 4 ++-- 3 files changed, 39 insertions(+), 14 deletions(-) diff --git a/pool/mock_test.go b/pool/mock_test.go index da55a97a..632d3426 100644 --- a/pool/mock_test.go +++ b/pool/mock_test.go @@ -4,6 +4,7 @@ import ( "context" "crypto/ecdsa" "errors" + "go.uber.org/zap" sessionv2 "github.com/TrueCloudLab/frostfs-api-go/v2/session" "github.com/TrueCloudLab/frostfs-sdk-go/accounting" @@ -33,7 +34,7 @@ type mockClient struct { func newMockClient(addr string, key ecdsa.PrivateKey) *mockClient { return &mockClient{ key: key, - clientStatusMonitor: newClientStatusMonitor(addr, 10), + clientStatusMonitor: newClientStatusMonitor(zap.NewExample(), addr, 10), } } diff --git a/pool/pool.go b/pool/pool.go index 2bb45990..887abc8a 100644 --- a/pool/pool.go +++ b/pool/pool.go @@ -31,6 +31,7 @@ import ( "github.com/nspcc-dev/neo-go/pkg/crypto/keys" "go.uber.org/atomic" "go.uber.org/zap" + "go.uber.org/zap/zapcore" ) // client represents virtual connection to the single FrostFS network endpoint from which Pool is formed. @@ -101,6 +102,7 @@ var errPoolClientUnhealthy = errors.New("pool client unhealthy") // clientStatusMonitor count error rate and other statistics for connection. type clientStatusMonitor struct { + logger *zap.Logger addr string healthy *atomic.Bool errorThreshold uint32 @@ -186,13 +188,14 @@ func (m MethodIndex) String() string { } } -func newClientStatusMonitor(addr string, errorThreshold uint32) clientStatusMonitor { +func newClientStatusMonitor(logger *zap.Logger, addr string, errorThreshold uint32) clientStatusMonitor { methods := make([]*methodStatus, methodLast) for i := methodBalanceGet; i < methodLast; i++ { methods[i] = &methodStatus{name: i.String()} } return clientStatusMonitor{ + logger: logger, addr: addr, healthy: atomic.NewBool(true), errorThreshold: errorThreshold, @@ -224,6 +227,7 @@ type clientWrapper struct { // wrapperPrm is params to create clientWrapper. type wrapperPrm struct { + logger *zap.Logger address string key ecdsa.PrivateKey dialTimeout time.Duration @@ -280,7 +284,7 @@ func newWrapper(prm wrapperPrm) *clientWrapper { res := &clientWrapper{ client: &cl, - clientStatusMonitor: newClientStatusMonitor(prm.address, prm.errorThreshold), + clientStatusMonitor: newClientStatusMonitor(prm.logger, prm.address, prm.errorThreshold), prm: prm, } @@ -940,13 +944,20 @@ func (c *clientStatusMonitor) address() string { func (c *clientStatusMonitor) incErrorRate() { c.mu.Lock() - defer c.mu.Unlock() c.currentErrorCount++ c.overallErrorCount++ - if c.currentErrorCount >= c.errorThreshold { + + thresholdReached := c.currentErrorCount >= c.errorThreshold + if thresholdReached { c.setUnhealthy() c.currentErrorCount = 0 } + c.mu.Unlock() + + if thresholdReached && c.logger != nil { + c.logger.Warn("error threshold reached", + zap.String("address", c.addr), zap.Uint32("threshold", c.errorThreshold)) + } } func (c *clientStatusMonitor) currentErrorRate() uint32 { @@ -1603,9 +1614,7 @@ func (p *Pool) Dial(ctx context.Context) error { for j, addr := range params.addresses { clients[j] = p.clientBuilder(addr) if err := clients[j].dial(ctx); err != nil { - if p.logger != nil { - p.logger.Warn("failed to build client", zap.String("address", addr), zap.Error(err)) - } + p.log(zap.WarnLevel, "failed to build client", zap.String("address", addr), zap.Error(err)) continue } @@ -1613,10 +1622,8 @@ func (p *Pool) Dial(ctx context.Context) error { err := initSessionForDuration(ctx, &st, clients[j], p.rebalanceParams.sessionExpirationDuration, *p.key) if err != nil { clients[j].setUnhealthy() - if p.logger != nil { - p.logger.Warn("failed to create frostfs session token for client", - zap.String("address", addr), zap.Error(err)) - } + p.log(zap.WarnLevel, "failed to create frostfs session token for client", + zap.String("address", addr), zap.Error(err)) continue } @@ -1645,6 +1652,21 @@ func (p *Pool) Dial(ctx context.Context) error { return nil } +func (p *Pool) log(level zapcore.Level, msg string, fields ...zap.Field) { + if p.logger == nil { + return + } + + switch level { + case zap.DebugLevel: + p.logger.Debug(msg, fields...) + case zap.WarnLevel: + p.logger.Warn(msg, fields...) + case zap.ErrorLevel: + p.logger.Error(msg, fields...) + } +} + func fillDefaultInitParams(params *InitParameters, cache *sessionCache) { if params.sessionExpirationDuration == 0 { params.sessionExpirationDuration = defaultSessionTokenExpirationDuration @@ -1778,6 +1800,8 @@ func (p *Pool) updateInnerNodesHealth(ctx context.Context, i int, bufferWeights } if changed { + p.log(zap.DebugLevel, "health has changed", + zap.String("address", cli.address()), zap.Bool("healthy", healthy)) healthyChanged.Store(true) } }(j, cli) diff --git a/pool/pool_test.go b/pool/pool_test.go index 0ec94e8a..29a2cdaf 100644 --- a/pool/pool_test.go +++ b/pool/pool_test.go @@ -514,7 +514,7 @@ func TestWaitPresence(t *testing.T) { } func TestStatusMonitor(t *testing.T) { - monitor := newClientStatusMonitor("", 10) + monitor := newClientStatusMonitor(zap.NewExample(), "", 10) monitor.errorThreshold = 3 count := 10 @@ -527,7 +527,7 @@ func TestStatusMonitor(t *testing.T) { } func TestHandleError(t *testing.T) { - monitor := newClientStatusMonitor("", 10) + monitor := newClientStatusMonitor(zap.NewExample(), "", 10) for i, tc := range []struct { status apistatus.Status