[#283] pool: Add error threshold

Signed-off-by: Denis Kirillov <denis@nspcc.ru>
This commit is contained in:
Denis Kirillov 2022-07-13 09:57:04 +03:00 committed by fyrchik
parent 9d3a1835d1
commit 99e185690e
4 changed files with 144 additions and 68 deletions

View file

@ -8,6 +8,7 @@ import (
"github.com/google/uuid" "github.com/google/uuid"
sessionv2 "github.com/nspcc-dev/neofs-api-go/v2/session" sessionv2 "github.com/nspcc-dev/neofs-api-go/v2/session"
"github.com/nspcc-dev/neofs-sdk-go/accounting" "github.com/nspcc-dev/neofs-sdk-go/accounting"
apistatus "github.com/nspcc-dev/neofs-sdk-go/client/status"
"github.com/nspcc-dev/neofs-sdk-go/container" "github.com/nspcc-dev/neofs-sdk-go/container"
cid "github.com/nspcc-dev/neofs-sdk-go/container/id" cid "github.com/nspcc-dev/neofs-sdk-go/container/id"
neofsecdsa "github.com/nspcc-dev/neofs-sdk-go/crypto/ecdsa" neofsecdsa "github.com/nspcc-dev/neofs-sdk-go/crypto/ecdsa"
@ -16,30 +17,29 @@ import (
"github.com/nspcc-dev/neofs-sdk-go/object" "github.com/nspcc-dev/neofs-sdk-go/object"
oid "github.com/nspcc-dev/neofs-sdk-go/object/id" oid "github.com/nspcc-dev/neofs-sdk-go/object/id"
"github.com/nspcc-dev/neofs-sdk-go/session" "github.com/nspcc-dev/neofs-sdk-go/session"
"go.uber.org/atomic"
) )
type mockClient struct { type mockClient struct {
key ecdsa.PrivateKey key ecdsa.PrivateKey
addr string *clientStatusMonitor
healthy *atomic.Bool
errorCount *atomic.Uint32
errorOnCreateSession bool errorOnCreateSession bool
errorOnEndpointInfo bool errorOnEndpointInfo bool
errorOnNetworkInfo bool errorOnNetworkInfo bool
errorOnGetObject error stOnGetObject apistatus.Status
} }
func newMockClient(addr string, key ecdsa.PrivateKey) *mockClient { func newMockClient(addr string, key ecdsa.PrivateKey) *mockClient {
return &mockClient{ return &mockClient{
key: key, key: key,
addr: addr, clientStatusMonitor: newTestStatusMonitor(addr),
healthy: atomic.NewBool(true),
errorCount: atomic.NewUint32(0),
} }
} }
func (m *mockClient) setThreshold(threshold uint32) {
m.errorThreshold = threshold
}
func (m *mockClient) errOnCreateSession() { func (m *mockClient) errOnCreateSession() {
m.errorOnCreateSession = true m.errorOnCreateSession = true
} }
@ -52,8 +52,8 @@ func (m *mockClient) errOnNetworkInfo() {
m.errorOnEndpointInfo = true m.errorOnEndpointInfo = true
} }
func (m *mockClient) errOnGetObject(err error) { func (m *mockClient) statusOnGetObject(st apistatus.Status) {
m.errorOnGetObject = err m.stOnGetObject = st
} }
func newToken(key ecdsa.PrivateKey) *session.Object { func newToken(key ecdsa.PrivateKey) *session.Object {
@ -95,7 +95,7 @@ func (m *mockClient) containerSetEACL(context.Context, PrmContainerSetEACL) erro
func (m *mockClient) endpointInfo(context.Context, prmEndpointInfo) (*netmap.NodeInfo, error) { func (m *mockClient) endpointInfo(context.Context, prmEndpointInfo) (*netmap.NodeInfo, error) {
if m.errorOnEndpointInfo { if m.errorOnEndpointInfo {
return nil, errors.New("error") return nil, m.handleError(nil, errors.New("error"))
} }
var ni netmap.NodeInfo var ni netmap.NodeInfo
@ -105,7 +105,7 @@ func (m *mockClient) endpointInfo(context.Context, prmEndpointInfo) (*netmap.Nod
func (m *mockClient) networkInfo(context.Context, prmNetworkInfo) (*netmap.NetworkInfo, error) { func (m *mockClient) networkInfo(context.Context, prmNetworkInfo) (*netmap.NetworkInfo, error) {
if m.errorOnNetworkInfo { if m.errorOnNetworkInfo {
return nil, errors.New("error") return nil, m.handleError(nil, errors.New("error"))
} }
var ni netmap.NetworkInfo var ni netmap.NetworkInfo
@ -121,7 +121,12 @@ func (m *mockClient) objectDelete(context.Context, PrmObjectDelete) error {
} }
func (m *mockClient) objectGet(context.Context, PrmObjectGet) (*ResGetObject, error) { func (m *mockClient) objectGet(context.Context, PrmObjectGet) (*ResGetObject, error) {
return &ResGetObject{}, m.errorOnGetObject if m.stOnGetObject == nil {
return &ResGetObject{}, nil
}
status := apistatus.ErrFromStatus(m.stOnGetObject)
return &ResGetObject{}, m.handleError(status, nil)
} }
func (m *mockClient) objectHead(context.Context, PrmObjectHead) (*object.Object, error) { func (m *mockClient) objectHead(context.Context, PrmObjectHead) (*object.Object, error) {
@ -138,7 +143,7 @@ func (m *mockClient) objectSearch(context.Context, PrmObjectSearch) (*ResObjectS
func (m *mockClient) sessionCreate(context.Context, prmCreateSession) (*resCreateSession, error) { func (m *mockClient) sessionCreate(context.Context, prmCreateSession) (*resCreateSession, error) {
if m.errorOnCreateSession { if m.errorOnCreateSession {
return nil, errors.New("error") return nil, m.handleError(nil, errors.New("error"))
} }
tok := newToken(m.key) tok := newToken(m.key)
@ -151,23 +156,3 @@ func (m *mockClient) sessionCreate(context.Context, prmCreateSession) (*resCreat
sessionKey: v2tok.GetBody().GetSessionKey(), sessionKey: v2tok.GetBody().GetSessionKey(),
}, nil }, nil
} }
func (m *mockClient) isHealthy() bool {
return m.healthy.Load()
}
func (m *mockClient) setHealthy(b bool) bool {
return m.healthy.Swap(b) != b
}
func (m *mockClient) address() string {
return m.addr
}
func (m *mockClient) errorRate() uint32 {
return m.errorCount.Load()
}
func (m *mockClient) resetErrorCounter() {
m.errorCount.Store(0)
}

View file

@ -51,6 +51,10 @@ type client interface {
objectSearch(context.Context, PrmObjectSearch) (*ResObjectSearch, error) objectSearch(context.Context, PrmObjectSearch) (*ResObjectSearch, error)
sessionCreate(context.Context, prmCreateSession) (*resCreateSession, error) sessionCreate(context.Context, prmCreateSession) (*resCreateSession, error)
clientStatus
}
type clientStatus interface {
isHealthy() bool isHealthy() bool
setHealthy(bool) bool setHealthy(bool) bool
address() string address() string
@ -58,19 +62,25 @@ type client interface {
resetErrorCounter() resetErrorCounter()
} }
type clientStatusMonitor struct {
addr string
healthy *atomic.Bool
errorCount *atomic.Uint32
errorThreshold uint32
}
// clientWrapper is used by default, alternative implementations are intended for testing purposes only. // clientWrapper is used by default, alternative implementations are intended for testing purposes only.
type clientWrapper struct { type clientWrapper struct {
client sdkClient.Client client sdkClient.Client
key ecdsa.PrivateKey key ecdsa.PrivateKey
addr string *clientStatusMonitor
healthy *atomic.Bool
errorCount *atomic.Uint32
} }
type wrapperPrm struct { type wrapperPrm struct {
address string address string
key ecdsa.PrivateKey key ecdsa.PrivateKey
timeout time.Duration timeout time.Duration
errorThreshold uint32
responseInfoCallback func(sdkClient.ResponseMetaInfo) error responseInfoCallback func(sdkClient.ResponseMetaInfo) error
} }
@ -86,6 +96,10 @@ func (x *wrapperPrm) setTimeout(timeout time.Duration) {
x.timeout = timeout x.timeout = timeout
} }
func (x *wrapperPrm) setErrorThreshold(threshold uint32) {
x.errorThreshold = threshold
}
func (x *wrapperPrm) setResponseInfoCallback(f func(sdkClient.ResponseMetaInfo) error) { func (x *wrapperPrm) setResponseInfoCallback(f func(sdkClient.ResponseMetaInfo) error) {
x.responseInfoCallback = f x.responseInfoCallback = f
} }
@ -97,10 +111,13 @@ func newWrapper(prm wrapperPrm) (*clientWrapper, error) {
prmInit.SetResponseInfoCallback(prm.responseInfoCallback) prmInit.SetResponseInfoCallback(prm.responseInfoCallback)
res := &clientWrapper{ res := &clientWrapper{
addr: prm.address,
key: prm.key, key: prm.key,
clientStatusMonitor: &clientStatusMonitor{
addr: prm.address,
healthy: atomic.NewBool(true), healthy: atomic.NewBool(true),
errorCount: atomic.NewUint32(0), errorCount: atomic.NewUint32(0),
errorThreshold: prm.errorThreshold,
},
} }
res.client.Init(prmInit) res.client.Init(prmInit)
@ -476,27 +493,27 @@ func (c *clientWrapper) sessionCreate(ctx context.Context, prm prmCreateSession)
}, nil }, nil
} }
func (c *clientWrapper) isHealthy() bool { func (c *clientStatusMonitor) isHealthy() bool {
return c.healthy.Load() return c.healthy.Load()
} }
func (c *clientWrapper) setHealthy(val bool) bool { func (c *clientStatusMonitor) setHealthy(val bool) bool {
return c.healthy.Swap(val) != val return c.healthy.Swap(val) != val
} }
func (c *clientWrapper) address() string { func (c *clientStatusMonitor) address() string {
return c.addr return c.addr
} }
func (c *clientWrapper) errorRate() uint32 { func (c *clientStatusMonitor) errorRate() uint32 {
return c.errorCount.Load() return c.errorCount.Load()
} }
func (c *clientWrapper) resetErrorCounter() { func (c *clientStatusMonitor) resetErrorCounter() {
c.errorCount.Store(0) c.errorCount.Store(0)
} }
func (c *clientWrapper) handleError(st apistatus.Status, err error) error { func (c *clientStatusMonitor) handleError(st apistatus.Status, err error) error {
if err != nil { if err != nil {
c.errorCount.Inc() c.errorCount.Inc()
return err return err
@ -504,10 +521,14 @@ func (c *clientWrapper) handleError(st apistatus.Status, err error) error {
err = apistatus.ErrFromStatus(st) err = apistatus.ErrFromStatus(st)
switch err.(type) { switch err.(type) {
case apistatus.ServerInternal, case apistatus.ServerInternal, *apistatus.ServerInternal,
apistatus.WrongMagicNumber, apistatus.WrongMagicNumber, *apistatus.WrongMagicNumber,
apistatus.SignatureVerification: apistatus.SignatureVerification, *apistatus.SignatureVerification:
c.errorCount.Inc() c.errorCount.Inc()
if c.errorCount.Load() >= c.errorThreshold {
c.setHealthy(false)
c.resetErrorCounter()
}
} }
return err return err
@ -521,6 +542,7 @@ type InitParameters struct {
healthcheckTimeout time.Duration healthcheckTimeout time.Duration
clientRebalanceInterval time.Duration clientRebalanceInterval time.Duration
sessionExpirationDuration uint64 sessionExpirationDuration uint64
errorThreshold uint32
nodeParams []NodeParam nodeParams []NodeParam
clientBuilder func(endpoint string) (client, error) clientBuilder func(endpoint string) (client, error)
@ -560,6 +582,11 @@ func (x *InitParameters) SetSessionExpirationDuration(expirationDuration uint64)
x.sessionExpirationDuration = expirationDuration x.sessionExpirationDuration = expirationDuration
} }
// SetErrorThreshold specifies the number of errors on connection after which node is considered as unhealthy.
func (x *InitParameters) SetErrorThreshold(threshold uint32) {
x.errorThreshold = threshold
}
// AddNode append information about the node to which you want to connect. // AddNode append information about the node to which you want to connect.
func (x *InitParameters) AddNode(nodeParam NodeParam) { func (x *InitParameters) AddNode(nodeParam NodeParam) {
x.nodeParams = append(x.nodeParams, nodeParam) x.nodeParams = append(x.nodeParams, nodeParam)
@ -996,6 +1023,7 @@ type innerPool struct {
const ( const (
defaultSessionTokenExpirationDuration = 100 // in blocks defaultSessionTokenExpirationDuration = 100 // in blocks
defaultErrorThreshold = 100
defaultRebalanceInterval = 25 * time.Second defaultRebalanceInterval = 25 * time.Second
defaultRequestTimeout = 4 * time.Second defaultRequestTimeout = 4 * time.Second
@ -1096,6 +1124,10 @@ func fillDefaultInitParams(params *InitParameters, cache *sessionCache) {
params.sessionExpirationDuration = defaultSessionTokenExpirationDuration params.sessionExpirationDuration = defaultSessionTokenExpirationDuration
} }
if params.errorThreshold == 0 {
params.errorThreshold = defaultErrorThreshold
}
if params.clientRebalanceInterval <= 0 { if params.clientRebalanceInterval <= 0 {
params.clientRebalanceInterval = defaultRebalanceInterval params.clientRebalanceInterval = defaultRebalanceInterval
} }
@ -1110,6 +1142,7 @@ func fillDefaultInitParams(params *InitParameters, cache *sessionCache) {
prm.setAddress(addr) prm.setAddress(addr)
prm.setKey(*params.key) prm.setKey(*params.key)
prm.setTimeout(params.nodeDialTimeout) prm.setTimeout(params.nodeDialTimeout)
prm.setErrorThreshold(params.errorThreshold)
prm.setResponseInfoCallback(func(info sdkClient.ResponseMetaInfo) error { prm.setResponseInfoCallback(func(info sdkClient.ResponseMetaInfo) error {
cache.updateEpoch(info.Epoch()) cache.updateEpoch(info.Epoch())
return nil return nil

View file

@ -270,7 +270,7 @@ func TestSessionCache(t *testing.T) {
clientBuilder := func(addr string) (client, error) { clientBuilder := func(addr string) (client, error) {
mockCli := newMockClient(addr, *key) mockCli := newMockClient(addr, *key)
mockCli.errOnGetObject(apistatus.SessionTokenNotFound{}) mockCli.statusOnGetObject(apistatus.SessionTokenNotFound{})
return mockCli, nil return mockCli, nil
} }
@ -508,16 +508,17 @@ func TestWaitPresence(t *testing.T) {
}) })
} }
func newTestWrapper(addr string) *clientWrapper { func newTestStatusMonitor(addr string) *clientStatusMonitor {
return &clientWrapper{ return &clientStatusMonitor{
addr: addr, addr: addr,
healthy: atomic.NewBool(true), healthy: atomic.NewBool(true),
errorCount: atomic.NewUint32(0), errorCount: atomic.NewUint32(0),
errorThreshold: 10,
} }
} }
func TestHandleError(t *testing.T) { func TestHandleError(t *testing.T) {
wrapper := newTestWrapper("") monitor := newTestStatusMonitor("")
for i, tc := range []struct { for i, tc := range []struct {
status apistatus.Status status apistatus.Status
@ -573,10 +574,16 @@ func TestHandleError(t *testing.T) {
expectedError: true, expectedError: true,
countError: true, countError: true,
}, },
{
status: &apistatus.SignatureVerification{},
err: nil,
expectedError: true,
countError: true,
},
} { } {
t.Run(strconv.Itoa(i), func(t *testing.T) { t.Run(strconv.Itoa(i), func(t *testing.T) {
errCount := wrapper.errorCount.Load() errCount := monitor.errorRate()
err := wrapper.handleError(tc.status, tc.err) err := monitor.handleError(tc.status, tc.err)
if tc.expectedError { if tc.expectedError {
require.Error(t, err) require.Error(t, err)
} else { } else {
@ -585,7 +592,61 @@ func TestHandleError(t *testing.T) {
if tc.countError { if tc.countError {
errCount++ errCount++
} }
require.Equal(t, errCount, wrapper.errorCount.Load()) require.Equal(t, errCount, monitor.errorRate())
}) })
} }
} }
func TestSwitchAfterErrorThreshold(t *testing.T) {
nodes := []NodeParam{
{1, "peer0", 1},
{2, "peer1", 100},
}
errorThreshold := 5
var clientKeys []*ecdsa.PrivateKey
clientBuilder := func(addr string) (client, error) {
key := newPrivateKey(t)
clientKeys = append(clientKeys, key)
if addr == nodes[0].address {
mockCli := newMockClient(addr, *key)
mockCli.setThreshold(uint32(errorThreshold))
mockCli.statusOnGetObject(apistatus.ServerInternal{})
return mockCli, nil
}
return newMockClient(addr, *key), nil
}
opts := InitParameters{
key: newPrivateKey(t),
nodeParams: nodes,
clientRebalanceInterval: 30 * time.Second,
clientBuilder: clientBuilder,
}
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
pool, err := NewPool(opts)
require.NoError(t, err)
err = pool.Dial(ctx)
require.NoError(t, err)
t.Cleanup(pool.Close)
for i := 0; i < errorThreshold; i++ {
conn, err := pool.connection()
require.NoError(t, err)
require.Equal(t, nodes[0].address, conn.address())
_, err = conn.objectGet(ctx, PrmObjectGet{})
require.Error(t, err)
}
conn, err := pool.connection()
require.NoError(t, err)
require.Equal(t, nodes[1].address, conn.address())
_, err = conn.objectGet(ctx, PrmObjectGet{})
require.NoError(t, err)
}

View file

@ -8,7 +8,6 @@ import (
"github.com/nspcc-dev/neofs-sdk-go/netmap" "github.com/nspcc-dev/neofs-sdk-go/netmap"
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
"go.uber.org/atomic"
) )
func TestSamplerStability(t *testing.T) { func TestSamplerStability(t *testing.T) {
@ -64,9 +63,7 @@ func newNetmapMock(name string, needErr bool) *clientMock {
} }
return &clientMock{ return &clientMock{
clientWrapper: clientWrapper{ clientWrapper: clientWrapper{
addr: "", clientStatusMonitor: newTestStatusMonitor(""),
healthy: atomic.NewBool(true),
errorCount: atomic.NewUint32(0),
}, },
name: name, name: name,
err: err, err: err,