forked from TrueCloudLab/frostfs-sdk-go

Compare commits: d00892f418 ... ce086fc234 (10 commits)

Commits (SHA1):
ce086fc234
5361f0eceb
05aa3becae
79f387317a
3ea4741231
d7872061f8
99c5c58365
4c310ae1c7
997346ef95
7f6eda566a

10 changed files with 439 additions and 130 deletions
@@ -100,20 +100,27 @@ func (c *Client) Dial(ctx context.Context, prm PrmDial) error {
 	c.setFrostFSAPIServer((*coreServer)(&c.c))
 
 	ctx, cancel := context.WithTimeout(ctx, prm.DialTimeout)
 	defer cancel()
 	_, err := rpc.Balance(&c.c, new(v2accounting.BalanceRequest),
 		client.WithContext(ctx),
 	)
 	if err != nil {
+		var ctxErr error
+
 		// return context errors since they signal about dial problem
 		if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) {
-			return err
-		}
-		st, ok := status.FromError(err)
-		if ok && st.Code() == codes.Canceled {
-			return context.Canceled
-		}
-		if ok && st.Code() == codes.DeadlineExceeded {
-			return context.DeadlineExceeded
+			ctxErr = err
+		} else if st, ok := status.FromError(err); ok && st.Code() == codes.Canceled {
+			ctxErr = context.Canceled
+		} else if ok && st.Code() == codes.DeadlineExceeded {
+			ctxErr = context.DeadlineExceeded
+		}
+		if ctxErr != nil {
+			if conn := c.c.Conn(); conn != nil {
+				_ = conn.Close()
+			}
+			return ctxErr
 		}
 	}
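The reworked branch above keeps one idea: treat genuine context errors and gRPC Canceled/DeadlineExceeded codes alike as dial-level failures, and close the half-open connection before returning them. A standalone sketch of that classification, using only the grpc status and codes packages (the helper name is illustrative, not part of the SDK):

package example

import (
	"context"
	"errors"

	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/status"
)

// classifyDialErr mirrors the logic added to Dial: real context errors are
// passed through, gRPC Canceled/DeadlineExceeded codes are mapped back to
// the matching context errors, anything else yields nil (not a dial problem).
func classifyDialErr(err error) error {
	if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) {
		return err
	}
	if st, ok := status.FromError(err); ok {
		switch st.Code() {
		case codes.Canceled:
			return context.Canceled
		case codes.DeadlineExceeded:
			return context.DeadlineExceeded
		}
	}
	return nil
}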
@@ -220,7 +220,7 @@ func (x *ObjectListReader) Close() (*ResObjectSearch, error) {
 	defer x.cancelCtxStream()
 
 	if x.err != nil && !errors.Is(x.err, io.EOF) {
-		return nil, x.err
+		return &x.res, x.err
 	}
 
 	return &x.res, nil
@@ -114,7 +114,7 @@ func TestNodeUnderMaintenance(t *testing.T) {
 
 		stV2 := st.ToStatusV2()
 
-		require.Empty(t, "", stV2.Message())
+		require.Equal(t, "node is under maintenance", stV2.Message())
 	})
 
 	t.Run("non-empty to V2", func(t *testing.T) {
@@ -61,6 +61,24 @@ func TestToStatusV2(t *testing.T) {
 			}),
 			codeV2: 1025,
 		},
+		{
+			status: (statusConstructor)(func() apistatus.Status {
+				return new(apistatus.SignatureVerification)
+			}),
+			codeV2: 1026,
+		},
+		{
+			status: (statusConstructor)(func() apistatus.Status {
+				return new(apistatus.NodeUnderMaintenance)
+			}),
+			codeV2: 1027,
+		},
+		{
+			status: (statusConstructor)(func() apistatus.Status {
+				return new(apistatus.InvalidArgument)
+			}),
+			codeV2: 1028,
+		},
 		{
 			status: (statusConstructor)(func() apistatus.Status {
 				return new(apistatus.ObjectLocked)
@@ -131,18 +149,6 @@ func TestToStatusV2(t *testing.T) {
 			}),
 			codeV2: 5120,
 		},
-		{
-			status: (statusConstructor)(func() apistatus.Status {
-				return new(apistatus.NodeUnderMaintenance)
-			}),
-			codeV2: 1027,
-		},
-		{
-			status: (statusConstructor)(func() apistatus.Status {
-				return new(apistatus.InvalidArgument)
-			}),
-			codeV2: 1028,
-		},
 	} {
 		var st apistatus.Status
 
go.mod (2 lines changed)

@@ -3,7 +3,7 @@ module git.frostfs.info/TrueCloudLab/frostfs-sdk-go
 go 1.22
 
 require (
-	git.frostfs.info/TrueCloudLab/frostfs-api-go/v2 v2.16.1-0.20241002064811-3e705a3cbe84
+	git.frostfs.info/TrueCloudLab/frostfs-api-go/v2 v2.16.1-0.20241011114054-f0fc40e116d1
 	git.frostfs.info/TrueCloudLab/frostfs-contract v0.19.3-0.20240621131249-49e5270f673e
 	git.frostfs.info/TrueCloudLab/hrw v1.2.1
 	git.frostfs.info/TrueCloudLab/tzhash v1.8.0
go.sum (4 lines changed)

@@ -1,5 +1,5 @@
-git.frostfs.info/TrueCloudLab/frostfs-api-go/v2 v2.16.1-0.20241002064811-3e705a3cbe84 h1:enycv8Uaji5Ic1+hk+F4BpYOQKV5U5t8A9CV8AmU2+M=
-git.frostfs.info/TrueCloudLab/frostfs-api-go/v2 v2.16.1-0.20241002064811-3e705a3cbe84/go.mod h1:F5GS7hRb62PUy5sTYDC4ajVdeffoAfjHSSHTKUJEaYU=
+git.frostfs.info/TrueCloudLab/frostfs-api-go/v2 v2.16.1-0.20241011114054-f0fc40e116d1 h1:ivcdxQeQDnx4srF2ezoaeVlF0FAycSAztwfIUJnUI4s=
+git.frostfs.info/TrueCloudLab/frostfs-api-go/v2 v2.16.1-0.20241011114054-f0fc40e116d1/go.mod h1:F5GS7hRb62PUy5sTYDC4ajVdeffoAfjHSSHTKUJEaYU=
 git.frostfs.info/TrueCloudLab/frostfs-contract v0.19.3-0.20240621131249-49e5270f673e h1:kcBqZBiFIUBATUqEuvVigtkJJWQ2Gug/eYXn967o3M4=
 git.frostfs.info/TrueCloudLab/frostfs-contract v0.19.3-0.20240621131249-49e5270f673e/go.mod h1:F/fe1OoIDKr5Bz99q4sriuHDuf3aZefZy9ZsCqEtgxc=
 git.frostfs.info/TrueCloudLab/frostfs-crypto v0.6.0 h1:FxqFDhQYYgpe41qsIHVOcdzSVCB8JNSfPG7Uk4r2oSk=
@@ -26,11 +26,15 @@ type mockClient struct {
 
 	errorOnDial bool
 	errorOnCreateSession bool
-	errorOnEndpointInfo bool
+	errorOnEndpointInfo error
+	resOnEndpointInfo netmap.NodeInfo
+	healthcheckFn func()
 	errorOnNetworkInfo bool
 	stOnGetObject apistatus.Status
 }
 
+var _ client = (*mockClient)(nil)
+
 func newMockClient(addr string, key ecdsa.PrivateKey) *mockClient {
 	return &mockClient{
 		key: key,
@@ -38,6 +42,16 @@ func newMockClient(addr string, key ecdsa.PrivateKey) *mockClient {
 	}
 }
 
+func newMockClientHealthy(addr string, key ecdsa.PrivateKey, healthy bool) *mockClient {
+	m := newMockClient(addr, key)
+	if healthy {
+		m.setHealthy()
+	} else {
+		m.setUnhealthy()
+	}
+	return m
+}
+
 func (m *mockClient) setThreshold(threshold uint32) {
 	m.errorThreshold = threshold
 }
@@ -47,11 +61,11 @@ func (m *mockClient) errOnCreateSession() {
 }
 
 func (m *mockClient) errOnEndpointInfo() {
-	m.errorOnEndpointInfo = true
+	m.errorOnEndpointInfo = errors.New("error")
 }
 
 func (m *mockClient) errOnNetworkInfo() {
-	m.errorOnEndpointInfo = true
+	m.errorOnEndpointInfo = errors.New("error")
 }
 
 func (m *mockClient) errOnDial() {
@@ -94,27 +108,32 @@ func (m *mockClient) containerDelete(context.Context, PrmContainerDelete) error {
 	return nil
 }
 
-func (c *mockClient) apeManagerAddChain(ctx context.Context, prm PrmAddAPEChain) error {
+func (m *mockClient) apeManagerAddChain(context.Context, PrmAddAPEChain) error {
 	return nil
 }
 
-func (c *mockClient) apeManagerRemoveChain(ctx context.Context, prm PrmRemoveAPEChain) error {
+func (m *mockClient) apeManagerRemoveChain(context.Context, PrmRemoveAPEChain) error {
 	return nil
 }
 
-func (c *mockClient) apeManagerListChains(ctx context.Context, prm PrmListAPEChains) ([]ape.Chain, error) {
+func (m *mockClient) apeManagerListChains(context.Context, PrmListAPEChains) ([]ape.Chain, error) {
 	return []ape.Chain{}, nil
 }
 
 func (m *mockClient) endpointInfo(ctx context.Context, _ prmEndpointInfo) (netmap.NodeInfo, error) {
-	var ni netmap.NodeInfo
-
-	if m.errorOnEndpointInfo {
-		return ni, m.handleError(ctx, nil, errors.New("error"))
+	if m.errorOnEndpointInfo != nil {
+		return netmap.NodeInfo{}, m.handleError(ctx, nil, m.errorOnEndpointInfo)
 	}
 
-	ni.SetNetworkEndpoints(m.addr)
-	return ni, nil
+	m.resOnEndpointInfo.SetNetworkEndpoints(m.addr)
+	return m.resOnEndpointInfo, nil
+}
+
+func (m *mockClient) healthcheck(ctx context.Context) (netmap.NodeInfo, error) {
+	if m.healthcheckFn != nil {
+		m.healthcheckFn()
+	}
+	return m.endpointInfo(ctx, prmEndpointInfo{})
 }
 
 func (m *mockClient) networkInfo(ctx context.Context, _ prmNetworkInfo) (netmap.NetworkInfo, error) {
@@ -190,16 +209,12 @@ func (m *mockClient) dial(context.Context) error {
 	return nil
 }
 
-func (m *mockClient) restartIfUnhealthy(ctx context.Context) (changed bool, err error) {
-	_, err = m.endpointInfo(ctx, prmEndpointInfo{})
-	healthy := err == nil
-	changed = healthy != m.isHealthy()
-	if healthy {
-		m.setHealthy()
-	} else {
-		m.setUnhealthy()
+func (m *mockClient) restart(context.Context) error {
+	if m.errorOnDial {
+		return errors.New("restart dial error")
 	}
-	return
+
+	return nil
 }
 
 func (m *mockClient) close() error {
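The healthcheckFn hook added above lets a test change the mock's answer between successive health probes, for example fail the probe once so the pool restarts the client and then let it succeed. A reduced sketch of the same toggle pattern, independent of the pool types (all names here are illustrative):

package example

import "errors"

// fakeProbe models a client whose next healthcheck outcome can be adjusted
// by a callback, the way healthcheckFn is used in the tests above.
type fakeProbe struct {
	err    error  // returned by the next healthcheck
	onCall func() // runs before every healthcheck and may change err
}

func (f *fakeProbe) healthcheck() error {
	if f.onCall != nil {
		f.onCall()
	}
	return f.err
}

// failOnceThenRecover makes the first probe fail and later ones succeed.
func failOnceThenRecover(f *fakeProbe) {
	first := true
	f.onCall = func() {
		if first {
			f.err = errors.New("probe error")
		} else {
			f.err = nil
		}
		first = false
	}
}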
pool/pool.go (175 lines changed)
@@ -57,6 +57,8 @@ type client interface {
 	apeManagerListChains(context.Context, PrmListAPEChains) ([]ape.Chain, error)
 	// see clientWrapper.endpointInfo.
 	endpointInfo(context.Context, prmEndpointInfo) (netmap.NodeInfo, error)
+	// see clientWrapper.healthcheck.
+	healthcheck(ctx context.Context) (netmap.NodeInfo, error)
 	// see clientWrapper.networkInfo.
 	networkInfo(context.Context, prmNetworkInfo) (netmap.NetworkInfo, error)
 	// see clientWrapper.netMapSnapshot

@@ -82,8 +84,8 @@ type client interface {
 
 	// see clientWrapper.dial.
 	dial(ctx context.Context) error
-	// see clientWrapper.restartIfUnhealthy.
-	restartIfUnhealthy(ctx context.Context) (bool, error)
+	// see clientWrapper.restart.
+	restart(ctx context.Context) error
 	// see clientWrapper.close.
 	close() error
 }

@@ -92,10 +94,10 @@ type client interface {
 type clientStatus interface {
 	// isHealthy checks if the connection can handle requests.
 	isHealthy() bool
-	// isDialed checks if the connection was created.
-	isDialed() bool
 	// setUnhealthy marks client as unhealthy.
 	setUnhealthy()
+	// setHealthy marks client as healthy.
+	setHealthy()
 	// address return address of endpoint.
 	address() string
 	// currentErrorRate returns current errors rate.

@@ -126,15 +128,10 @@ type clientStatusMonitor struct {
 
 // values for healthy status of clientStatusMonitor.
 const (
-	// statusUnhealthyOnDial is set when dialing to the endpoint is failed,
-	// so there is no connection to the endpoint, and pool should not close it
-	// before re-establishing connection once again.
-	statusUnhealthyOnDial = iota
-
 	// statusUnhealthyOnRequest is set when communication after dialing to the
 	// endpoint is failed due to immediate or accumulated errors, connection is
 	// available and pool should close it before re-establishing connection once again.
-	statusUnhealthyOnRequest
+	statusUnhealthyOnRequest = iota
 
 	// statusHealthy is set when connection is ready to be used by the pool.
 	statusHealthy

@@ -233,6 +230,7 @@ func newClientStatusMonitor(logger *zap.Logger, addr string, errorThreshold uint32) clientStatusMonitor {
 type clientWrapper struct {
 	clientMutex sync.RWMutex
 	client *sdkClient.Client
+	dialed bool
 	prm wrapperPrm
 
 	clientStatusMonitor
@@ -342,30 +340,17 @@ func (c *clientWrapper) dial(ctx context.Context) error {
 		GRPCDialOptions: c.prm.dialOptions,
 	}
 
-	if err = cl.Dial(ctx, prmDial); err != nil {
-		c.setUnhealthyOnDial()
+	err = cl.Dial(ctx, prmDial)
+	c.setDialed(err == nil)
+	if err != nil {
 		return err
 	}
 
 	return nil
 }
 
-// restartIfUnhealthy checks healthy status of client and recreate it if status is unhealthy.
-// Indicating if status was changed by this function call and returns error that caused unhealthy status.
-func (c *clientWrapper) restartIfUnhealthy(ctx context.Context) (changed bool, err error) {
-	var wasHealthy bool
-	if _, err = c.endpointInfo(ctx, prmEndpointInfo{}); err == nil {
-		return false, nil
-	} else if !errors.Is(err, errPoolClientUnhealthy) {
-		wasHealthy = true
-	}
-
-	// if connection is dialed before, to avoid routine / connection leak,
-	// pool has to close it and then initialize once again.
-	if c.isDialed() {
-		c.scheduleGracefulClose()
-	}
-
+// restart recreates and redial inner sdk client.
+func (c *clientWrapper) restart(ctx context.Context) error {
 	var cl sdkClient.Client
 	prmInit := sdkClient.PrmInit{
 		Key: c.prm.key,

@@ -381,22 +366,35 @@ func (c *clientWrapper) restartIfUnhealthy(ctx context.Context) (changed bool, err error) {
 		GRPCDialOptions: c.prm.dialOptions,
 	}
 
-	if err = cl.Dial(ctx, prmDial); err != nil {
-		c.setUnhealthyOnDial()
-		return wasHealthy, err
+	// if connection is dialed before, to avoid routine / connection leak,
+	// pool has to close it and then initialize once again.
+	if c.isDialed() {
+		c.scheduleGracefulClose()
+	}
+
+	err := cl.Dial(ctx, prmDial)
+	c.setDialed(err == nil)
+	if err != nil {
+		return err
 	}
 
 	c.clientMutex.Lock()
 	c.client = &cl
 	c.clientMutex.Unlock()
 
-	if _, err = cl.EndpointInfo(ctx, sdkClient.PrmEndpointInfo{}); err != nil {
-		c.setUnhealthy()
-		return wasHealthy, err
-	}
-
-	c.setHealthy()
-	return !wasHealthy, nil
+	return nil
 }
 
+func (c *clientWrapper) isDialed() bool {
+	c.mu.RLock()
+	defer c.mu.RUnlock()
+	return c.dialed
+}
+
+func (c *clientWrapper) setDialed(dialed bool) {
+	c.mu.Lock()
+	c.dialed = dialed
+	c.mu.Unlock()
+}
+
 func (c *clientWrapper) getClient() (*sdkClient.Client, error) {
@@ -654,6 +652,15 @@ func (c *clientWrapper) endpointInfo(ctx context.Context, _ prmEndpointInfo) (netmap.NodeInfo, error) {
 		return netmap.NodeInfo{}, err
 	}
 
+	return c.endpointInfoRaw(ctx, cl)
+}
+
+func (c *clientWrapper) healthcheck(ctx context.Context) (netmap.NodeInfo, error) {
+	cl := c.getClientRaw()
+	return c.endpointInfoRaw(ctx, cl)
+}
+
+func (c *clientWrapper) endpointInfoRaw(ctx context.Context, cl *sdkClient.Client) (netmap.NodeInfo, error) {
 	start := time.Now()
 	res, err := cl.EndpointInfo(ctx, sdkClient.PrmEndpointInfo{})
 	c.incRequests(time.Since(start), methodEndpointInfo)

@@ -1085,7 +1092,7 @@ func (c *clientWrapper) objectSearch(ctx context.Context, prm PrmObjectSearch) (ResObjectSearch, error) {
 		return ResObjectSearch{}, fmt.Errorf("init object searching on client: %w", err)
 	}
 
-	return ResObjectSearch{r: res}, nil
+	return ResObjectSearch{r: res, handleError: c.handleError}, nil
 }
 
 // sessionCreate invokes sdkClient.SessionCreate parse response status to error and return result as is.
@@ -1121,10 +1128,6 @@ func (c *clientStatusMonitor) isHealthy() bool {
 	return c.healthy.Load() == statusHealthy
 }
 
-func (c *clientStatusMonitor) isDialed() bool {
-	return c.healthy.Load() != statusUnhealthyOnDial
-}
-
 func (c *clientStatusMonitor) setHealthy() {
 	c.healthy.Store(statusHealthy)
 }

@@ -1133,10 +1136,6 @@ func (c *clientStatusMonitor) setUnhealthy() {
 	c.healthy.Store(statusUnhealthyOnRequest)
 }
 
-func (c *clientStatusMonitor) setUnhealthyOnDial() {
-	c.healthy.Store(statusUnhealthyOnDial)
-}
-
 func (c *clientStatusMonitor) address() string {
 	return c.addr
 }

@@ -1159,6 +1158,16 @@ func (c *clientStatusMonitor) incErrorRate() {
 	}
 }
 
+func (c *clientStatusMonitor) incErrorRateToUnhealthy(err error) {
+	c.mu.Lock()
+	c.currentErrorCount = 0
+	c.overallErrorCount++
+	c.setUnhealthy()
+	c.mu.Unlock()
+
+	c.log(zapcore.WarnLevel, "explicitly mark node unhealthy", zap.String("address", c.addr), zap.Error(err))
+}
+
 func (c *clientStatusMonitor) log(level zapcore.Level, msg string, fields ...zap.Field) {
 	if c.logger == nil {
 		return

@@ -1201,6 +1210,9 @@ func (c *clientWrapper) incRequests(elapsed time.Duration, method MethodIndex) {
 }
 
 func (c *clientWrapper) close() error {
+	if !c.isDialed() {
+		return nil
+	}
 	if cl := c.getClientRaw(); cl != nil {
 		return cl.Close()
 	}
@@ -1225,9 +1237,10 @@ func (c *clientStatusMonitor) handleError(ctx context.Context, st apistatus.Status, err error) error {
 	switch stErr.(type) {
 	case *apistatus.ServerInternal,
 		*apistatus.WrongMagicNumber,
-		*apistatus.SignatureVerification,
-		*apistatus.NodeUnderMaintenance:
+		*apistatus.SignatureVerification:
 		c.incErrorRate()
+	case *apistatus.NodeUnderMaintenance:
+		c.incErrorRateToUnhealthy(stErr)
 	}
 
 	if err == nil {

@@ -1239,7 +1252,11 @@ func (c *clientStatusMonitor) handleError(ctx context.Context, st apistatus.Status, err error) error {
 
 	if err != nil {
 		if needCountError(ctx, err) {
-			c.incErrorRate()
+			if sdkClient.IsErrNodeUnderMaintenance(err) {
+				c.incErrorRateToUnhealthy(err)
+			} else {
+				c.incErrorRate()
+			}
 		}
 
 		return err

@@ -1261,7 +1278,7 @@ func needCountError(ctx context.Context, err error) bool {
 		return false
 	}
 
-	if errors.Is(ctx.Err(), context.Canceled) {
+	if ctx != nil && errors.Is(ctx.Err(), context.Canceled) {
 		return false
 	}
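The net effect of the handleError changes: ordinary failures still count toward the error threshold, while a NodeUnderMaintenance status or error marks the node unhealthy immediately (and the nil-context guard in needCountError lets handleError be called without a request context, as ResObjectSearch.Read now does). A reduced model of the two accounting paths (types and names are illustrative, not the pool's internals):

package example

import "sync"

// monitor is a simplified model of the pool's per-client error accounting.
type monitor struct {
	mu        sync.Mutex
	current   uint32 // consecutive errors since the last healthy mark
	overall   uint64 // total errors ever observed
	threshold uint32 // reaching it flips the client to unhealthy
	healthy   bool
}

// incErrorRate counts a regular error; the client becomes unhealthy only
// after `threshold` consecutive errors.
func (m *monitor) incErrorRate() {
	m.mu.Lock()
	defer m.mu.Unlock()
	m.current++
	m.overall++
	if m.current >= m.threshold {
		m.current = 0
		m.healthy = false
	}
}

// incErrorRateToUnhealthy models the maintenance path: a single occurrence
// marks the client unhealthy at once and still counts in the overall total.
func (m *monitor) incErrorRateToUnhealthy() {
	m.mu.Lock()
	defer m.mu.Unlock()
	m.current = 0
	m.overall++
	m.healthy = false
}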
@@ -2138,7 +2155,9 @@ func adjustNodeParams(nodeParams []NodeParam) ([]*nodesParam, error) {
 
 // startRebalance runs loop to monitor connection healthy status.
 func (p *Pool) startRebalance(ctx context.Context) {
-	ticker := time.NewTimer(p.rebalanceParams.clientRebalanceInterval)
+	ticker := time.NewTicker(p.rebalanceParams.clientRebalanceInterval)
+	defer ticker.Stop()
 
 	buffers := make([][]float64, len(p.rebalanceParams.nodesParams))
 	for i, params := range p.rebalanceParams.nodesParams {
 		buffers[i] = make([]float64, len(params.weights))
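time.NewTimer fires once and has to be re-armed by hand, while time.NewTicker keeps delivering on every interval until it is stopped, which is what a periodic rebalance loop needs; the deferred Stop also releases the ticker when the loop exits. A minimal sketch of that loop shape (doRebalance is an illustrative callback):

package example

import (
	"context"
	"time"
)

// rebalanceLoop runs doRebalance on every tick until the context is canceled.
// A Ticker fires repeatedly; a Timer would fire only once unless reset.
func rebalanceLoop(ctx context.Context, interval time.Duration, doRebalance func(context.Context)) {
	ticker := time.NewTicker(interval)
	defer ticker.Stop()

	for {
		select {
		case <-ctx.Done():
			return
		case <-ticker.C:
			doRebalance(ctx)
		}
	}
}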
@@ -2188,7 +2207,7 @@ func (p *Pool) updateInnerNodesHealth(ctx context.Context, i int, bufferWeights []float64) {
 	tctx, c := context.WithTimeout(ctx, options.nodeRequestTimeout)
 	defer c()
 
-	changed, err := cli.restartIfUnhealthy(tctx)
+	changed, err := restartIfUnhealthy(tctx, cli)
 	healthy := err == nil
 	if healthy {
 		bufferWeights[j] = options.nodesParams[i].weights[j]
@@ -2219,6 +2238,43 @@ func (p *Pool) updateInnerNodesHealth(ctx context.Context, i int, bufferWeights []float64) {
 	}
 }
 
+// restartIfUnhealthy checks healthy status of client and recreate it if status is unhealthy.
+// Indicating if status was changed by this function call and returns error that caused unhealthy status.
+func restartIfUnhealthy(ctx context.Context, c client) (changed bool, err error) {
+	defer func() {
+		if err != nil {
+			c.setUnhealthy()
+		} else {
+			c.setHealthy()
+		}
+	}()
+
+	wasHealthy := c.isHealthy()
+
+	if res, err := c.healthcheck(ctx); err == nil {
+		if res.Status().IsMaintenance() {
+			return wasHealthy, new(apistatus.NodeUnderMaintenance)
+		}
+
+		return !wasHealthy, nil
+	}
+
+	if err = c.restart(ctx); err != nil {
+		return wasHealthy, err
+	}
+
+	res, err := c.healthcheck(ctx)
+	if err != nil {
+		return wasHealthy, err
+	}
+
+	if res.Status().IsMaintenance() {
+		return wasHealthy, new(apistatus.NodeUnderMaintenance)
+	}
+
+	return !wasHealthy, nil
+}
+
 func adjustWeights(weights []float64) []float64 {
 	adjusted := make([]float64, len(weights))
 	sum := 0.0
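restartIfUnhealthy now funnels every outcome through the deferred handler: any returned error, including the synthetic NodeUnderMaintenance, marks the client unhealthy, and only a clean healthcheck keeps or restores the healthy flag; the client is restarted solely after a failed probe. A compressed model of that decision flow over a narrowed interface (the probe interface and names are illustrative, not the pool's):

package example

import (
	"context"
	"errors"
)

// probe is the narrow slice of the pool's client interface this flow needs:
// a healthcheck that also reports maintenance, and a way to recreate the
// underlying connection.
type probe interface {
	healthcheck(ctx context.Context) (maintenance bool, err error)
	restart(ctx context.Context) error
}

var errMaintenance = errors.New("node is under maintenance")

// checkAndRestart returns nil when the node should be considered healthy.
// Any non-nil result (probe failure, failed restart, or maintenance) means
// the caller marks the node unhealthy, mirroring restartIfUnhealthy above.
func checkAndRestart(ctx context.Context, p probe) error {
	if maint, err := p.healthcheck(ctx); err == nil {
		if maint {
			return errMaintenance
		}
		return nil
	}

	if err := p.restart(ctx); err != nil {
		return err
	}

	maint, err := p.healthcheck(ctx)
	if err != nil {
		return err
	}
	if maint {
		return errMaintenance
	}
	return nil
}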
@@ -2708,18 +2764,25 @@ func (p *Pool) ObjectRange(ctx context.Context, prm PrmObjectRange) (ResObjectRange, error) {
 //
 // Must be initialized using Pool.SearchObjects, any other usage is unsafe.
 type ResObjectSearch struct {
-	r *sdkClient.ObjectListReader
+	r           *sdkClient.ObjectListReader
+	handleError func(context.Context, apistatus.Status, error) error
 }
 
 // Read reads another list of the object identifiers.
 func (x *ResObjectSearch) Read(buf []oid.ID) (int, error) {
 	n, ok := x.r.Read(buf)
 	if !ok {
-		_, err := x.r.Close()
+		res, err := x.r.Close()
 		if err == nil {
 			return n, io.EOF
 		}
 
+		var status apistatus.Status
+		if res != nil {
+			status = res.Status()
+		}
+		err = x.handleError(nil, status, err)
+
 		return n, err
 	}
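For callers, the read loop over a search result stays the same; the change only routes a failed Close through the pool's handleError, so search failures now also feed the node's error accounting. A typical drain loop over the Read method shown above, for reference (the helper is illustrative):

package example

import (
	"errors"
	"io"

	oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
)

// objectLister is the part of ResObjectSearch this helper relies on.
type objectLister interface {
	Read(buf []oid.ID) (int, error)
}

// drainSearch collects object IDs until the reader reports io.EOF.
func drainSearch(r objectLister) ([]oid.ID, error) {
	var ids []oid.ID
	buf := make([]oid.ID, 32)
	for {
		n, err := r.Read(buf)
		ids = append(ids, buf[:n]...)
		if err != nil {
			if errors.Is(err, io.EOF) {
				return ids, nil
			}
			return ids, err
		}
	}
}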
@@ -3017,9 +3080,7 @@ func (p *Pool) Close() {
 	// close all clients
 	for _, pools := range p.innerPools {
 		for _, cli := range pools.clients {
-			if cli.isDialed() {
-				_ = cli.close()
-			}
+			_ = cli.close()
 		}
 	}
 }
@@ -4,12 +4,13 @@ import (
 	"context"
 	"crypto/ecdsa"
 	"errors"
-	"strconv"
+	"math/rand"
 	"testing"
 	"time"
 
 	apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
 	frostfsecdsa "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/crypto/ecdsa"
+	"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap"
 	"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
 	oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
 	"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/session"

@@ -17,6 +18,8 @@ import (
 	"github.com/nspcc-dev/neo-go/pkg/crypto/keys"
 	"github.com/stretchr/testify/require"
 	"go.uber.org/zap"
+	"go.uber.org/zap/zaptest"
+	"go.uber.org/zap/zaptest/observer"
 )
 
 func TestBuildPoolClientFailed(t *testing.T) {
@@ -230,6 +233,179 @@ func TestOneOfTwoFailed(t *testing.T) {
 	}
 }
 
+func TestUpdateNodesHealth(t *testing.T) {
+	ctx := context.Background()
+	key := newPrivateKey(t)
+
+	for _, tc := range []struct {
+		name string
+		wasHealthy bool
+		willHealthy bool
+		prepareCli func(*mockClient)
+	}{
+		{
+			name: "healthy, maintenance, unhealthy",
+			wasHealthy: true,
+			willHealthy: false,
+			prepareCli: func(c *mockClient) { c.resOnEndpointInfo.SetStatus(netmap.Maintenance) },
+		},
+		{
+			name: "unhealthy, maintenance, unhealthy",
+			wasHealthy: false,
+			willHealthy: false,
+			prepareCli: func(c *mockClient) { c.resOnEndpointInfo.SetStatus(netmap.Maintenance) },
+		},
+		{
+			name: "healthy, no error, healthy",
+			wasHealthy: true,
+			willHealthy: true,
+			prepareCli: func(c *mockClient) { c.resOnEndpointInfo.SetStatus(netmap.Online) },
+		},
+		{
+			name: "unhealthy, no error, healthy",
+			wasHealthy: false,
+			willHealthy: true,
+			prepareCli: func(c *mockClient) { c.resOnEndpointInfo.SetStatus(netmap.Online) },
+		},
+		{
+			name: "healthy, error, failed restart, unhealthy",
+			wasHealthy: true,
+			willHealthy: false,
+			prepareCli: func(c *mockClient) {
+				c.errOnEndpointInfo()
+				c.errorOnDial = true
+			},
+		},
+		{
+			name: "unhealthy, error, failed restart, unhealthy",
+			wasHealthy: false,
+			willHealthy: false,
+			prepareCli: func(c *mockClient) {
+				c.errOnEndpointInfo()
+				c.errorOnDial = true
+			},
+		},
+		{
+			name: "healthy, error, restart, error, unhealthy",
+			wasHealthy: true,
+			willHealthy: false,
+			prepareCli: func(c *mockClient) { c.errOnEndpointInfo() },
+		},
+		{
+			name: "unhealthy, error, restart, error, unhealthy",
+			wasHealthy: false,
+			willHealthy: false,
+			prepareCli: func(c *mockClient) { c.errOnEndpointInfo() },
+		},
+		{
+			name: "healthy, error, restart, maintenance, unhealthy",
+			wasHealthy: true,
+			willHealthy: false,
+			prepareCli: func(c *mockClient) {
+				healthError := true
+				c.healthcheckFn = func() {
+					if healthError {
+						c.errorOnEndpointInfo = errors.New("error")
+					} else {
+						c.errorOnEndpointInfo = nil
+						c.resOnEndpointInfo.SetStatus(netmap.Maintenance)
+					}
+					healthError = !healthError
+				}
+			},
+		},
+		{
+			name: "unhealthy, error, restart, maintenance, unhealthy",
+			wasHealthy: false,
+			willHealthy: false,
+			prepareCli: func(c *mockClient) {
+				healthError := true
+				c.healthcheckFn = func() {
+					if healthError {
+						c.errorOnEndpointInfo = errors.New("error")
+					} else {
+						c.errorOnEndpointInfo = nil
+						c.resOnEndpointInfo.SetStatus(netmap.Maintenance)
+					}
+					healthError = !healthError
+				}
+			},
+		},
+		{
+			name: "healthy, error, restart, healthy",
+			wasHealthy: true,
+			willHealthy: true,
+			prepareCli: func(c *mockClient) {
+				healthError := true
+				c.healthcheckFn = func() {
+					if healthError {
+						c.errorOnEndpointInfo = errors.New("error")
+					} else {
+						c.errorOnEndpointInfo = nil
+					}
+					healthError = !healthError
+				}
+			},
+		},
+		{
+			name: "unhealthy, error, restart, healthy",
+			wasHealthy: false,
+			willHealthy: true,
+			prepareCli: func(c *mockClient) {
+				healthError := true
+				c.healthcheckFn = func() {
+					if healthError {
+						c.errorOnEndpointInfo = errors.New("error")
+					} else {
+						c.errorOnEndpointInfo = nil
+					}
+					healthError = !healthError
+				}
+			},
+		},
+	} {
+		t.Run(tc.name, func(t *testing.T) {
+			cli := newMockClientHealthy("peer0", *key, tc.wasHealthy)
+			tc.prepareCli(cli)
+			p, log := newPool(t, cli)
+
+			p.updateNodesHealth(ctx, [][]float64{{1}})
+
+			changed := tc.wasHealthy != tc.willHealthy
+			require.Equalf(t, tc.willHealthy, cli.isHealthy(), "healthy status should be: %v", tc.willHealthy)
+			require.Equalf(t, changed, 1 == log.Len(), "healthy status should be changed: %v", changed)
+		})
+	}
+}
+
+func newPool(t *testing.T, cli *mockClient) (*Pool, *observer.ObservedLogs) {
+	log, observedLog := getObservedLogger()
+
+	cache, err := newCache(0)
+	require.NoError(t, err)
+
+	return &Pool{
+		innerPools: []*innerPool{{
+			sampler: newSampler([]float64{1}, rand.NewSource(0)),
+			clients: []client{cli},
+		}},
+		cache: cache,
+		key: newPrivateKey(t),
+		closedCh: make(chan struct{}),
+		rebalanceParams: rebalanceParameters{
+			nodesParams: []*nodesParam{{1, []string{"peer0"}, []float64{1}}},
+			nodeRequestTimeout: time.Second,
+			clientRebalanceInterval: 200 * time.Millisecond,
+		},
+		logger: log,
+	}, observedLog
+}
+
+func getObservedLogger() (*zap.Logger, *observer.ObservedLogs) {
+	loggerCore, observedLog := observer.New(zap.DebugLevel)
+	return zap.New(loggerCore), observedLog
+}
+
 func TestTwoFailed(t *testing.T) {
 	var clientKeys []*ecdsa.PrivateKey
 	mockClientBuilder := func(addr string) client {
@@ -529,13 +705,6 @@ func TestStatusMonitor(t *testing.T) {
 		isHealthy bool
 		description string
 	}{
-		{
-			action: func(m *clientStatusMonitor) { m.setUnhealthyOnDial() },
-			status: statusUnhealthyOnDial,
-			isDialed: false,
-			isHealthy: false,
-			description: "set unhealthy on dial",
-		},
 		{
 			action: func(m *clientStatusMonitor) { m.setUnhealthy() },
 			status: statusUnhealthyOnRequest,

@@ -554,7 +723,6 @@ func TestStatusMonitor(t *testing.T) {
 		for _, tc := range cases {
 			tc.action(&monitor)
 			require.Equal(t, tc.status, monitor.healthy.Load())
-			require.Equal(t, tc.isDialed, monitor.isDialed())
 			require.Equal(t, tc.isHealthy, monitor.isHealthy())
 		}
 	})
@@ -562,19 +730,22 @@ func TestStatusMonitor(t *testing.T) {
 
 func TestHandleError(t *testing.T) {
 	ctx := context.Background()
-	monitor := newClientStatusMonitor(zap.NewExample(), "", 10)
+	log := zaptest.NewLogger(t)
 
 	canceledCtx, cancel := context.WithCancel(context.Background())
 	cancel()
 
-	for i, tc := range []struct {
-		ctx context.Context
-		status apistatus.Status
-		err error
-		expectedError bool
-		countError bool
+	for _, tc := range []struct {
+		name string
+		ctx context.Context
+		status apistatus.Status
+		err error
+		expectedError bool
+		countError bool
+		markedUnhealthy bool
 	}{
 		{
+			name: "no error, no status",
 			ctx: ctx,
 			status: nil,
 			err: nil,
@@ -582,6 +753,7 @@ func TestHandleError(t *testing.T) {
 			countError: false,
 		},
 		{
+			name: "no error, success status",
 			ctx: ctx,
 			status: new(apistatus.SuccessDefaultV2),
 			err: nil,

@@ -589,6 +761,7 @@ func TestHandleError(t *testing.T) {
 			countError: false,
 		},
 		{
+			name: "error, success status",
 			ctx: ctx,
 			status: new(apistatus.SuccessDefaultV2),
 			err: errors.New("error"),

@@ -596,6 +769,7 @@ func TestHandleError(t *testing.T) {
 			countError: true,
 		},
 		{
+			name: "error, no status",
 			ctx: ctx,
 			status: nil,
 			err: errors.New("error"),

@@ -603,6 +777,7 @@ func TestHandleError(t *testing.T) {
 			countError: true,
 		},
 		{
+			name: "no error, object not found status",
 			ctx: ctx,
 			status: new(apistatus.ObjectNotFound),
 			err: nil,

@@ -610,6 +785,7 @@ func TestHandleError(t *testing.T) {
 			countError: false,
 		},
 		{
+			name: "object not found error, object not found status",
 			ctx: ctx,
 			status: new(apistatus.ObjectNotFound),
 			err: &apistatus.ObjectNotFound{},

@@ -617,6 +793,7 @@ func TestHandleError(t *testing.T) {
 			countError: false,
 		},
 		{
+			name: "eacl not found error, no status",
 			ctx: ctx,
 			status: nil,
 			err: &apistatus.EACLNotFound{},

@@ -627,6 +804,7 @@ func TestHandleError(t *testing.T) {
 			countError: true,
 		},
 		{
+			name: "no error, internal status",
 			ctx: ctx,
 			status: new(apistatus.ServerInternal),
 			err: nil,

@@ -634,6 +812,7 @@ func TestHandleError(t *testing.T) {
 			countError: true,
 		},
 		{
+			name: "no error, wrong magic status",
 			ctx: ctx,
 			status: new(apistatus.WrongMagicNumber),
 			err: nil,

@@ -641,6 +820,7 @@ func TestHandleError(t *testing.T) {
 			countError: true,
 		},
 		{
+			name: "no error, signature verification status",
 			ctx: ctx,
 			status: new(apistatus.SignatureVerification),
 			err: nil,
@@ -648,13 +828,25 @@ func TestHandleError(t *testing.T) {
 			countError: true,
 		},
 		{
-			ctx: ctx,
-			status: new(apistatus.NodeUnderMaintenance),
-			err: nil,
-			expectedError: true,
-			countError: true,
+			name: "no error, maintenance status",
+			ctx: ctx,
+			status: new(apistatus.NodeUnderMaintenance),
+			err: nil,
+			expectedError: true,
+			countError: true,
+			markedUnhealthy: true,
+		},
+		{
+			name: "maintenance error, no status",
+			ctx: ctx,
+			status: nil,
+			err: &apistatus.NodeUnderMaintenance{},
+			expectedError: true,
+			countError: true,
+			markedUnhealthy: true,
 		},
 		{
+			name: "no error, invalid argument status",
 			ctx: ctx,
 			status: new(apistatus.InvalidArgument),
 			err: nil,
@@ -662,6 +854,7 @@ func TestHandleError(t *testing.T) {
 			countError: false,
 		},
 		{
+			name: "context canceled error, no status",
 			ctx: canceledCtx,
 			status: nil,
 			err: errors.New("error"),

@@ -669,8 +862,9 @@ func TestHandleError(t *testing.T) {
 			countError: false,
 		},
 	} {
-		t.Run(strconv.Itoa(i), func(t *testing.T) {
-			errCount := monitor.currentErrorRate()
+		t.Run(tc.name, func(t *testing.T) {
+			monitor := newClientStatusMonitor(log, "", 10)
+			errCount := monitor.overallErrorRate()
 			err := monitor.handleError(tc.ctx, tc.status, tc.err)
 			if tc.expectedError {
 				require.Error(t, err)
@@ -680,7 +874,10 @@ func TestHandleError(t *testing.T) {
 			if tc.countError {
 				errCount++
 			}
-			require.Equal(t, errCount, monitor.currentErrorRate())
+			require.Equal(t, errCount, monitor.overallErrorRate())
+			if tc.markedUnhealthy {
+				require.False(t, monitor.isHealthy())
+			}
 		})
 	}
 }
@@ -66,6 +66,7 @@ type InitParameters struct {
 	logger *zap.Logger
 	nodeDialTimeout time.Duration
 	nodeStreamTimeout time.Duration
+	connTimeout time.Duration
 	healthcheckTimeout time.Duration
 	clientRebalanceInterval time.Duration
 	nodeParams []pool.NodeParam

@@ -91,6 +92,7 @@ type Pool struct {
 	methods []*pool.MethodStatus
 
 	maxRequestAttempts int
+	connTimeout time.Duration
 
 	startIndicesMtx sync.RWMutex
 	// startIndices points to the client from which the next request will be executed.

@@ -231,6 +233,7 @@ func NewPool(options InitParameters) (*Pool, error) {
 			clientRebalanceInterval: options.clientRebalanceInterval,
 		},
 		maxRequestAttempts: options.maxRequestAttempts,
+		connTimeout: options.connTimeout,
 		methods: methods,
 	}

@@ -306,6 +309,11 @@ func (x *InitParameters) SetHealthcheckTimeout(timeout time.Duration) {
 	x.healthcheckTimeout = timeout
 }
 
+// SetConnectionTimeout specifies the timeout for operation in each request retry
+func (x *InitParameters) SetConnectionTimeout(timeout time.Duration) {
+	x.connTimeout = timeout
+}
+
 // SetClientRebalanceInterval specifies the interval for updating nodes health status.
 //
 // See also Pool.Dial.
@@ -489,7 +497,9 @@ func (p *Pool) AddNode(ctx context.Context, prm AddNodeParams) (uint64, error) {
 
 	var resp *grpcService.AddResponse
 	err := p.requestWithRetry(ctx, func(client grpcService.TreeServiceClient) (inErr error) {
-		resp, inErr = client.Add(ctx, request)
+		reqCtx, cancel := getTimeoutContext(ctx, p.connTimeout)
+		defer cancel()
+		resp, inErr = client.Add(reqCtx, request)
 		return handleError("failed to add node", inErr)
 	})
 	p.methods[methodAddNode].IncRequests(time.Since(start))

@@ -524,7 +534,9 @@ func (p *Pool) AddNodeByPath(ctx context.Context, prm AddNodeByPathParams) (uint64, error) {
 
 	var resp *grpcService.AddByPathResponse
 	err := p.requestWithRetry(ctx, func(client grpcService.TreeServiceClient) (inErr error) {
-		resp, inErr = client.AddByPath(ctx, request)
+		reqCtx, cancel := getTimeoutContext(ctx, p.connTimeout)
+		defer cancel()
+		resp, inErr = client.AddByPath(reqCtx, request)
 		return handleError("failed to add node by path", inErr)
 	})
 	p.methods[methodAddNodeByPath].IncRequests(time.Since(start))

@@ -566,7 +578,9 @@ func (p *Pool) MoveNode(ctx context.Context, prm MoveNodeParams) error {
 	}
 
 	err := p.requestWithRetry(ctx, func(client grpcService.TreeServiceClient) error {
-		if _, err := client.Move(ctx, request); err != nil {
+		reqCtx, cancel := getTimeoutContext(ctx, p.connTimeout)
+		defer cancel()
+		if _, err := client.Move(reqCtx, request); err != nil {
 			return handleError("failed to move node", err)
 		}
 		return nil

@@ -597,7 +611,9 @@ func (p *Pool) RemoveNode(ctx context.Context, prm RemoveNodeParams) error {
 	}
 
 	err := p.requestWithRetry(ctx, func(client grpcService.TreeServiceClient) error {
-		if _, err := client.Remove(ctx, request); err != nil {
+		reqCtx, cancel := getTimeoutContext(ctx, p.connTimeout)
+		defer cancel()
+		if _, err := client.Remove(reqCtx, request); err != nil {
 			return handleError("failed to remove node", err)
 		}
 		return nil
@@ -887,6 +903,13 @@ func finalError(current, candidate error) error {
 	return current
 }
 
+func getTimeoutContext(ctx context.Context, timeout time.Duration) (context.Context, context.CancelFunc) {
+	if timeout != 0 {
+		return context.WithTimeout(ctx, timeout)
+	}
+	return ctx, func() {}
+}
+
 type reqKeyType struct{}
 
 // SetRequestID sets request identifier to context so when some operations are logged in tree pool
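getTimeoutContext keeps the new per-attempt timeout opt-in: a zero timeout hands back the parent context with a no-op cancel, so pools that never call SetConnectionTimeout behave as before, while a non-zero value bounds each retry attempt separately. A small standalone usage sketch (the helper is copied from above; the select stands in for a gRPC call):

package main

import (
	"context"
	"fmt"
	"time"
)

func getTimeoutContext(ctx context.Context, timeout time.Duration) (context.Context, context.CancelFunc) {
	if timeout != 0 {
		return context.WithTimeout(ctx, timeout)
	}
	return ctx, func() {}
}

func main() {
	// A 50ms per-attempt timeout cuts the simulated call short.
	reqCtx, cancel := getTimeoutContext(context.Background(), 50*time.Millisecond)
	defer cancel()
	select {
	case <-time.After(time.Second):
		fmt.Println("call finished")
	case <-reqCtx.Done():
		fmt.Println("attempt timed out:", reqCtx.Err())
	}

	// A zero timeout leaves the parent context untouched (no deadline is set).
	noCtx, noCancel := getTimeoutContext(context.Background(), 0)
	defer noCancel()
	_, hasDeadline := noCtx.Deadline()
	fmt.Println("deadline set:", hasDeadline)
}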