Feature/6 monitoring pool node #7
3 changed files with 40 additions and 15 deletions
|
@ -4,6 +4,7 @@ import (
|
|||
"context"
|
||||
"crypto/ecdsa"
|
||||
"errors"
|
||||
"go.uber.org/zap"
|
||||
|
||||
sessionv2 "github.com/TrueCloudLab/frostfs-api-go/v2/session"
|
||||
"github.com/TrueCloudLab/frostfs-sdk-go/accounting"
|
||||
|
@ -33,7 +34,7 @@ type mockClient struct {
|
|||
func newMockClient(addr string, key ecdsa.PrivateKey) *mockClient {
|
||||
return &mockClient{
|
||||
key: key,
|
||||
clientStatusMonitor: newClientStatusMonitor(addr, 10),
|
||||
clientStatusMonitor: newClientStatusMonitor(zap.NewExample(), addr, 10),
|
||||
}
|
||||
}
|
||||
|
||||
|
|
48
pool/pool.go
48
pool/pool.go
|
@ -31,6 +31,7 @@ import (
|
|||
"github.com/nspcc-dev/neo-go/pkg/crypto/keys"
|
||||
"go.uber.org/atomic"
|
||||
"go.uber.org/zap"
|
||||
"go.uber.org/zap/zapcore"
|
||||
)
|
||||
|
||||
// client represents virtual connection to the single FrostFS network endpoint from which Pool is formed.
|
||||
|
@ -101,6 +102,7 @@ var errPoolClientUnhealthy = errors.New("pool client unhealthy")
|
|||
|
||||
// clientStatusMonitor count error rate and other statistics for connection.
|
||||
type clientStatusMonitor struct {
|
||||
logger *zap.Logger
|
||||
addr string
|
||||
healthy *atomic.Bool
|
||||
errorThreshold uint32
|
||||
|
@ -186,13 +188,14 @@ func (m MethodIndex) String() string {
|
|||
}
|
||||
}
|
||||
|
||||
func newClientStatusMonitor(addr string, errorThreshold uint32) clientStatusMonitor {
|
||||
func newClientStatusMonitor(logger *zap.Logger, addr string, errorThreshold uint32) clientStatusMonitor {
|
||||
methods := make([]*methodStatus, methodLast)
|
||||
for i := methodBalanceGet; i < methodLast; i++ {
|
||||
methods[i] = &methodStatus{name: i.String()}
|
||||
}
|
||||
|
||||
return clientStatusMonitor{
|
||||
logger: logger,
|
||||
addr: addr,
|
||||
healthy: atomic.NewBool(true),
|
||||
errorThreshold: errorThreshold,
|
||||
|
@ -224,6 +227,7 @@ type clientWrapper struct {
|
|||
|
||||
// wrapperPrm is params to create clientWrapper.
|
||||
type wrapperPrm struct {
|
||||
logger *zap.Logger
|
||||
address string
|
||||
key ecdsa.PrivateKey
|
||||
dialTimeout time.Duration
|
||||
|
@ -280,7 +284,7 @@ func newWrapper(prm wrapperPrm) *clientWrapper {
|
|||
|
||||
res := &clientWrapper{
|
||||
client: &cl,
|
||||
clientStatusMonitor: newClientStatusMonitor(prm.address, prm.errorThreshold),
|
||||
clientStatusMonitor: newClientStatusMonitor(prm.logger, prm.address, prm.errorThreshold),
|
||||
prm: prm,
|
||||
}
|
||||
|
||||
|
@ -940,13 +944,20 @@ func (c *clientStatusMonitor) address() string {
|
|||
|
||||
func (c *clientStatusMonitor) incErrorRate() {
|
||||
c.mu.Lock()
|
||||
defer c.mu.Unlock()
|
||||
c.currentErrorCount++
|
||||
c.overallErrorCount++
|
||||
if c.currentErrorCount >= c.errorThreshold {
|
||||
|
||||
thresholdReached := c.currentErrorCount >= c.errorThreshold
|
||||
if thresholdReached {
|
||||
c.setUnhealthy()
|
||||
c.currentErrorCount = 0
|
||||
}
|
||||
c.mu.Unlock()
|
||||
|
||||
if thresholdReached && c.logger != nil {
|
||||
c.logger.Warn("error threshold reached",
|
||||
zap.String("address", c.addr), zap.Uint32("threshold", c.errorThreshold))
|
||||
}
|
||||
}
|
||||
|
||||
func (c *clientStatusMonitor) currentErrorRate() uint32 {
|
||||
|
@ -1545,7 +1556,7 @@ const (
|
|||
defaultSessionTokenExpirationDuration = 100 // in blocks
|
||||
defaultErrorThreshold = 100
|
||||
|
||||
defaultRebalanceInterval = 25 * time.Second
|
||||
defaultRebalanceInterval = 15 * time.Second
|
||||
defaultHealthcheckTimeout = 4 * time.Second
|
||||
defaultDialTimeout = 5 * time.Second
|
||||
defaultStreamTimeout = 10 * time.Second
|
||||
|
@ -1603,9 +1614,7 @@ func (p *Pool) Dial(ctx context.Context) error {
|
|||
for j, addr := range params.addresses {
|
||||
clients[j] = p.clientBuilder(addr)
|
||||
if err := clients[j].dial(ctx); err != nil {
|
||||
if p.logger != nil {
|
||||
p.logger.Warn("failed to build client", zap.String("address", addr), zap.Error(err))
|
||||
}
|
||||
p.log(zap.WarnLevel, "failed to build client", zap.String("address", addr), zap.Error(err))
|
||||
continue
|
||||
}
|
||||
|
||||
|
@ -1613,10 +1622,8 @@ func (p *Pool) Dial(ctx context.Context) error {
|
|||
err := initSessionForDuration(ctx, &st, clients[j], p.rebalanceParams.sessionExpirationDuration, *p.key)
|
||||
if err != nil {
|
||||
clients[j].setUnhealthy()
|
||||
if p.logger != nil {
|
||||
p.logger.Warn("failed to create frostfs session token for client",
|
||||
zap.String("address", addr), zap.Error(err))
|
||||
}
|
||||
p.log(zap.WarnLevel, "failed to create frostfs session token for client",
|
||||
zap.String("address", addr), zap.Error(err))
|
||||
continue
|
||||
}
|
||||
|
||||
|
@ -1645,6 +1652,21 @@ func (p *Pool) Dial(ctx context.Context) error {
|
|||
return nil
|
||||
}
|
||||
|
||||
func (p *Pool) log(level zapcore.Level, msg string, fields ...zap.Field) {
|
||||
if p.logger == nil {
|
||||
return
|
||||
}
|
||||
|
||||
switch level {
|
||||
case zap.DebugLevel:
|
||||
p.logger.Debug(msg, fields...)
|
||||
case zap.WarnLevel:
|
||||
p.logger.Warn(msg, fields...)
|
||||
case zap.ErrorLevel:
|
||||
p.logger.Error(msg, fields...)
|
||||
}
|
||||
}
|
||||
|
||||
func fillDefaultInitParams(params *InitParameters, cache *sessionCache) {
|
||||
if params.sessionExpirationDuration == 0 {
|
||||
params.sessionExpirationDuration = defaultSessionTokenExpirationDuration
|
||||
|
@ -1778,6 +1800,8 @@ func (p *Pool) updateInnerNodesHealth(ctx context.Context, i int, bufferWeights
|
|||
}
|
||||
|
||||
if changed {
|
||||
p.log(zap.DebugLevel, "health has changed",
|
||||
zap.String("address", cli.address()), zap.Bool("healthy", healthy))
|
||||
healthyChanged.Store(true)
|
||||
}
|
||||
}(j, cli)
|
||||
|
|
|
@ -514,7 +514,7 @@ func TestWaitPresence(t *testing.T) {
|
|||
}
|
||||
|
||||
func TestStatusMonitor(t *testing.T) {
|
||||
monitor := newClientStatusMonitor("", 10)
|
||||
monitor := newClientStatusMonitor(zap.NewExample(), "", 10)
|
||||
monitor.errorThreshold = 3
|
||||
|
||||
count := 10
|
||||
|
@ -527,7 +527,7 @@ func TestStatusMonitor(t *testing.T) {
|
|||
}
|
||||
|
||||
func TestHandleError(t *testing.T) {
|
||||
monitor := newClientStatusMonitor("", 10)
|
||||
monitor := newClientStatusMonitor(zap.NewExample(), "", 10)
|
||||
|
||||
for i, tc := range []struct {
|
||||
status apistatus.Status
|
||||
|
|
Loading…
Reference in a new issue