Compare commits

..

10 commits

Author SHA1 Message Date
d8f1cc29f5 [#XX] container: Add ListStream method
Signed-off-by: Ekaterina Lebedeva <ekaterina.lebedeva@yadro.com>
2024-10-28 18:06:40 +03:00
5361f0eceb [#279] pool: Count errors in object search
Signed-off-by: Marina Biryukova <m.biryukova@yadro.com>
2024-10-22 12:41:11 +00:00
05aa3becae [#278] pool: Don't make maintenance node healthy in rebalance
Signed-off-by: Denis Kirillov <d.kirillov@yadro.com>
2024-10-17 16:31:49 +03:00
79f387317a [#283] pool: Mark node unhealthy if node is under maintenance
Signed-off-by: Denis Kirillov <d.kirillov@yadro.com>
2024-10-16 15:22:05 +03:00
3ea4741231 [#283] go.mod: Tidy
Signed-off-by: Denis Kirillov <d.kirillov@yadro.com>
2024-10-16 15:20:53 +03:00
d7872061f8
[#284] go.mod: Update api-go
Signed-off-by: Evgenii Stratonikov <e.stratonikov@yadro.com>
2024-10-11 15:17:23 +03:00
99c5c58365
[#282] client: Close connection on non-nil error in Dial
A particular status code does not imply that a connection has not been
established. However, `Dial()` requires user to call `Close()` only if
the error was nil. Thus, it is `Dial()` responsibility to close
everything if it returns an error.

Introduced after the gRPC update in #270 (6009d089fc).

Signed-off-by: Evgenii Stratonikov <e.stratonikov@yadro.com>
2024-10-10 14:03:44 +03:00
4c310ae1c7 [#280] client: Use DialTimeout for gRPC dial
After removing `grpc.Dial` from client `DialTimeout` used only if
custom dialer provided. Client uses `BalanceOf` instead of `grpc.Dial`,
so it is required to use `DialTimeout` to not to use RPC timeout.

Signed-off-by: Dmitrii Stepanov <d.stepanov@yadro.com>
2024-10-07 16:58:05 +03:00
997346ef95
[#274] client/status: Add missing test cases for commom statuses
Signed-off-by: Aleksey Savchuk <a.savchuk@yadro.com>
2024-10-03 17:05:15 +03:00
7f6eda566a
[#274] client/status: Fix check in TestNodeUnderMaintenance test
Signed-off-by: Aleksey Savchuk <a.savchuk@yadro.com>
2024-10-03 17:05:15 +03:00
10 changed files with 594 additions and 126 deletions

View file

@ -100,20 +100,27 @@ func (c *Client) Dial(ctx context.Context, prm PrmDial) error {
c.setFrostFSAPIServer((*coreServer)(&c.c))
ctx, cancel := context.WithTimeout(ctx, prm.DialTimeout)
defer cancel()
_, err := rpc.Balance(&c.c, new(v2accounting.BalanceRequest),
client.WithContext(ctx),
)
if err != nil {
var ctxErr error
// return context errors since they signal about dial problem
if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) {
return err
ctxErr = err
} else if st, ok := status.FromError(err); ok && st.Code() == codes.Canceled {
ctxErr = context.Canceled
} else if ok && st.Code() == codes.DeadlineExceeded {
ctxErr = context.DeadlineExceeded
}
st, ok := status.FromError(err)
if ok && st.Code() == codes.Canceled {
return context.Canceled
}
if ok && st.Code() == codes.DeadlineExceeded {
return context.DeadlineExceeded
if ctxErr != nil {
if conn := c.c.Conn(); conn != nil {
_ = conn.Close()
}
return ctxErr
}
}

View file

@ -0,0 +1,182 @@
package client
import (
"context"
"errors"
"fmt"
"io"
v2container "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/container"
v2refs "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/refs"
rpcapi "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/rpc"
"git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/rpc/client"
v2session "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/session"
"git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/signature"
apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/session"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/user"
)
// PrmContainerListStream groups parameters of ContainerListStream operation.
type PrmContainerListStream struct {
XHeaders []string
Account user.ID
Session *session.Container
}
// SetAccount sets identifier of the FrostFS account to list the containers.
// Required parameter.
//
// Deprecated: Use PrmContainerListStream.Account instead.
func (x *PrmContainerListStream) SetAccount(id user.ID) {
x.Account = id
}
func (x *PrmContainerListStream) buildRequest(c *Client) (*v2container.ListStreamRequest, error) {
if x.Account.IsEmpty() {
return nil, errorAccountNotSet
}
var ownerV2 v2refs.OwnerID
x.Account.WriteToV2(&ownerV2)
reqBody := new(v2container.ListStreamRequestBody)
reqBody.SetOwnerID(&ownerV2)
var meta v2session.RequestMetaHeader
writeXHeadersToMeta(x.XHeaders, &meta)
if x.Session != nil {
var tokv2 v2session.Token
x.Session.WriteToV2(&tokv2)
meta.SetSessionToken(&tokv2)
}
var req v2container.ListStreamRequest
req.SetBody(reqBody)
c.prepareRequest(&req, &meta)
return &req, nil
}
type ResContainerListStream struct {
statusRes
}
type ContainerListReader struct {
client *Client
cancelCtxStream context.CancelFunc
err error
res ResContainerListStream
stream interface {
Read(resp *v2container.ListStreamResponse) error
}
tail []v2refs.ContainerID
}
func (x *ContainerListReader) Read(buf []cid.ID) (int, bool) {
if len(buf) == 0 {
panic("empty buffer in ContainerListReader.ReadList")
}
read := copyCnrIDBuffers(buf, x.tail)
x.tail = x.tail[read:]
if len(buf) == read {
return read, true
}
for {
var resp v2container.ListStreamResponse
x.err = x.stream.Read(&resp)
if x.err != nil {
return read, false
}
x.res.st, x.err = x.client.processResponse(&resp)
if x.err != nil || !apistatus.IsSuccessful(x.res.st) {
return read, false
}
// read new chunk of objects
ids := resp.GetBody().GetContainerIDs()
if len(ids) == 0 {
// just skip empty lists since they are not prohibited by protocol
continue
}
ln := copyCnrIDBuffers(buf[read:], ids)
read += ln
if read == len(buf) {
// save the tail
x.tail = append(x.tail, ids[ln:]...)
return read, true
}
}
}
func copyCnrIDBuffers(dst []cid.ID, src []v2refs.ContainerID) int {
var i int
for ; i < len(dst) && i < len(src); i++ {
_ = dst[i].ReadFromV2(src[i])
}
return i
}
func (x *ContainerListReader) Iterate(f func(cid.ID) bool) error {
buf := make([]cid.ID, 1)
for {
// Do not check first return value because `len(buf) == 1`,
// so false means nothing was read.
_, ok := x.Read(buf)
if !ok {
res, err := x.Close()
if err != nil {
return err
}
return apistatus.ErrFromStatus(res.Status())
}
if f(buf[0]) {
return nil
}
}
}
func (x *ContainerListReader) Close() (*ResContainerListStream, error) {
defer x.cancelCtxStream()
if x.err != nil && !errors.Is(x.err, io.EOF) {
return nil, x.err
}
return &x.res, nil
}
func (c *Client) ContainerListInit(ctx context.Context, prm PrmContainerListStream) (*ContainerListReader, error) {
req, err := prm.buildRequest(c)
if err != nil {
return nil, err
}
err = signature.SignServiceMessage(&c.prm.Key, req)
if err != nil {
return nil, fmt.Errorf("sign request: %w", err)
}
var r ContainerListReader
ctx, r.cancelCtxStream = context.WithCancel(ctx)
r.stream, err = rpcapi.ListContainersStream(&c.c, req, client.WithContext(ctx))
if err != nil {
return nil, fmt.Errorf("open stream: %w", err)
}
r.client = c
return &r, nil
}

View file

@ -220,7 +220,7 @@ func (x *ObjectListReader) Close() (*ResObjectSearch, error) {
defer x.cancelCtxStream()
if x.err != nil && !errors.Is(x.err, io.EOF) {
return nil, x.err
return &x.res, x.err
}
return &x.res, nil

View file

@ -114,7 +114,7 @@ func TestNodeUnderMaintenance(t *testing.T) {
stV2 := st.ToStatusV2()
require.Empty(t, "", stV2.Message())
require.Equal(t, "node is under maintenance", stV2.Message())
})
t.Run("non-empty to V2", func(t *testing.T) {

View file

@ -61,6 +61,24 @@ func TestToStatusV2(t *testing.T) {
}),
codeV2: 1025,
},
{
status: (statusConstructor)(func() apistatus.Status {
return new(apistatus.SignatureVerification)
}),
codeV2: 1026,
},
{
status: (statusConstructor)(func() apistatus.Status {
return new(apistatus.NodeUnderMaintenance)
}),
codeV2: 1027,
},
{
status: (statusConstructor)(func() apistatus.Status {
return new(apistatus.InvalidArgument)
}),
codeV2: 1028,
},
{
status: (statusConstructor)(func() apistatus.Status {
return new(apistatus.ObjectLocked)
@ -131,18 +149,6 @@ func TestToStatusV2(t *testing.T) {
}),
codeV2: 5120,
},
{
status: (statusConstructor)(func() apistatus.Status {
return new(apistatus.NodeUnderMaintenance)
}),
codeV2: 1027,
},
{
status: (statusConstructor)(func() apistatus.Status {
return new(apistatus.InvalidArgument)
}),
codeV2: 1028,
},
} {
var st apistatus.Status

2
go.mod
View file

@ -3,7 +3,7 @@ module git.frostfs.info/TrueCloudLab/frostfs-sdk-go
go 1.22
require (
git.frostfs.info/TrueCloudLab/frostfs-api-go/v2 v2.16.1-0.20241002064811-3e705a3cbe84
git.frostfs.info/TrueCloudLab/frostfs-api-go/v2 v2.16.1-0.20241011114054-f0fc40e116d1
git.frostfs.info/TrueCloudLab/frostfs-contract v0.19.3-0.20240621131249-49e5270f673e
git.frostfs.info/TrueCloudLab/hrw v1.2.1
git.frostfs.info/TrueCloudLab/tzhash v1.8.0

4
go.sum
View file

@ -1,5 +1,5 @@
git.frostfs.info/TrueCloudLab/frostfs-api-go/v2 v2.16.1-0.20241002064811-3e705a3cbe84 h1:enycv8Uaji5Ic1+hk+F4BpYOQKV5U5t8A9CV8AmU2+M=
git.frostfs.info/TrueCloudLab/frostfs-api-go/v2 v2.16.1-0.20241002064811-3e705a3cbe84/go.mod h1:F5GS7hRb62PUy5sTYDC4ajVdeffoAfjHSSHTKUJEaYU=
git.frostfs.info/TrueCloudLab/frostfs-api-go/v2 v2.16.1-0.20241011114054-f0fc40e116d1 h1:ivcdxQeQDnx4srF2ezoaeVlF0FAycSAztwfIUJnUI4s=
git.frostfs.info/TrueCloudLab/frostfs-api-go/v2 v2.16.1-0.20241011114054-f0fc40e116d1/go.mod h1:F5GS7hRb62PUy5sTYDC4ajVdeffoAfjHSSHTKUJEaYU=
git.frostfs.info/TrueCloudLab/frostfs-contract v0.19.3-0.20240621131249-49e5270f673e h1:kcBqZBiFIUBATUqEuvVigtkJJWQ2Gug/eYXn967o3M4=
git.frostfs.info/TrueCloudLab/frostfs-contract v0.19.3-0.20240621131249-49e5270f673e/go.mod h1:F/fe1OoIDKr5Bz99q4sriuHDuf3aZefZy9ZsCqEtgxc=
git.frostfs.info/TrueCloudLab/frostfs-crypto v0.6.0 h1:FxqFDhQYYgpe41qsIHVOcdzSVCB8JNSfPG7Uk4r2oSk=

View file

@ -26,11 +26,15 @@ type mockClient struct {
errorOnDial bool
errorOnCreateSession bool
errorOnEndpointInfo bool
errorOnEndpointInfo error
resOnEndpointInfo netmap.NodeInfo
healthcheckFn func()
errorOnNetworkInfo bool
stOnGetObject apistatus.Status
}
var _ client = (*mockClient)(nil)
func newMockClient(addr string, key ecdsa.PrivateKey) *mockClient {
return &mockClient{
key: key,
@ -38,6 +42,16 @@ func newMockClient(addr string, key ecdsa.PrivateKey) *mockClient {
}
}
func newMockClientHealthy(addr string, key ecdsa.PrivateKey, healthy bool) *mockClient {
m := newMockClient(addr, key)
if healthy {
m.setHealthy()
} else {
m.setUnhealthy()
}
return m
}
func (m *mockClient) setThreshold(threshold uint32) {
m.errorThreshold = threshold
}
@ -47,11 +61,11 @@ func (m *mockClient) errOnCreateSession() {
}
func (m *mockClient) errOnEndpointInfo() {
m.errorOnEndpointInfo = true
m.errorOnEndpointInfo = errors.New("error")
}
func (m *mockClient) errOnNetworkInfo() {
m.errorOnEndpointInfo = true
m.errorOnEndpointInfo = errors.New("error")
}
func (m *mockClient) errOnDial() {
@ -94,27 +108,32 @@ func (m *mockClient) containerDelete(context.Context, PrmContainerDelete) error
return nil
}
func (c *mockClient) apeManagerAddChain(ctx context.Context, prm PrmAddAPEChain) error {
func (m *mockClient) apeManagerAddChain(context.Context, PrmAddAPEChain) error {
return nil
}
func (c *mockClient) apeManagerRemoveChain(ctx context.Context, prm PrmRemoveAPEChain) error {
func (m *mockClient) apeManagerRemoveChain(context.Context, PrmRemoveAPEChain) error {
return nil
}
func (c *mockClient) apeManagerListChains(ctx context.Context, prm PrmListAPEChains) ([]ape.Chain, error) {
func (m *mockClient) apeManagerListChains(context.Context, PrmListAPEChains) ([]ape.Chain, error) {
return []ape.Chain{}, nil
}
func (m *mockClient) endpointInfo(ctx context.Context, _ prmEndpointInfo) (netmap.NodeInfo, error) {
var ni netmap.NodeInfo
if m.errorOnEndpointInfo {
return ni, m.handleError(ctx, nil, errors.New("error"))
if m.errorOnEndpointInfo != nil {
return netmap.NodeInfo{}, m.handleError(ctx, nil, m.errorOnEndpointInfo)
}
ni.SetNetworkEndpoints(m.addr)
return ni, nil
m.resOnEndpointInfo.SetNetworkEndpoints(m.addr)
return m.resOnEndpointInfo, nil
}
func (m *mockClient) healthcheck(ctx context.Context) (netmap.NodeInfo, error) {
if m.healthcheckFn != nil {
m.healthcheckFn()
}
return m.endpointInfo(ctx, prmEndpointInfo{})
}
func (m *mockClient) networkInfo(ctx context.Context, _ prmNetworkInfo) (netmap.NetworkInfo, error) {
@ -190,16 +209,12 @@ func (m *mockClient) dial(context.Context) error {
return nil
}
func (m *mockClient) restartIfUnhealthy(ctx context.Context) (changed bool, err error) {
_, err = m.endpointInfo(ctx, prmEndpointInfo{})
healthy := err == nil
changed = healthy != m.isHealthy()
if healthy {
m.setHealthy()
} else {
m.setUnhealthy()
func (m *mockClient) restart(context.Context) error {
if m.errorOnDial {
return errors.New("restart dial error")
}
return
return nil
}
func (m *mockClient) close() error {

View file

@ -57,6 +57,8 @@ type client interface {
apeManagerListChains(context.Context, PrmListAPEChains) ([]ape.Chain, error)
// see clientWrapper.endpointInfo.
endpointInfo(context.Context, prmEndpointInfo) (netmap.NodeInfo, error)
// see clientWrapper.healthcheck.
healthcheck(ctx context.Context) (netmap.NodeInfo, error)
// see clientWrapper.networkInfo.
networkInfo(context.Context, prmNetworkInfo) (netmap.NetworkInfo, error)
// see clientWrapper.netMapSnapshot
@ -82,8 +84,8 @@ type client interface {
// see clientWrapper.dial.
dial(ctx context.Context) error
// see clientWrapper.restartIfUnhealthy.
restartIfUnhealthy(ctx context.Context) (bool, error)
// see clientWrapper.restart.
restart(ctx context.Context) error
// see clientWrapper.close.
close() error
}
@ -92,10 +94,10 @@ type client interface {
type clientStatus interface {
// isHealthy checks if the connection can handle requests.
isHealthy() bool
// isDialed checks if the connection was created.
isDialed() bool
// setUnhealthy marks client as unhealthy.
setUnhealthy()
// setHealthy marks client as healthy.
setHealthy()
// address return address of endpoint.
address() string
// currentErrorRate returns current errors rate.
@ -126,15 +128,10 @@ type clientStatusMonitor struct {
// values for healthy status of clientStatusMonitor.
const (
// statusUnhealthyOnDial is set when dialing to the endpoint is failed,
// so there is no connection to the endpoint, and pool should not close it
// before re-establishing connection once again.
statusUnhealthyOnDial = iota
// statusUnhealthyOnRequest is set when communication after dialing to the
// endpoint is failed due to immediate or accumulated errors, connection is
// available and pool should close it before re-establishing connection once again.
statusUnhealthyOnRequest
statusUnhealthyOnRequest = iota
// statusHealthy is set when connection is ready to be used by the pool.
statusHealthy
@ -233,6 +230,7 @@ func newClientStatusMonitor(logger *zap.Logger, addr string, errorThreshold uint
type clientWrapper struct {
clientMutex sync.RWMutex
client *sdkClient.Client
dialed bool
prm wrapperPrm
clientStatusMonitor
@ -342,30 +340,17 @@ func (c *clientWrapper) dial(ctx context.Context) error {
GRPCDialOptions: c.prm.dialOptions,
}
if err = cl.Dial(ctx, prmDial); err != nil {
c.setUnhealthyOnDial()
err = cl.Dial(ctx, prmDial)
c.setDialed(err == nil)
if err != nil {
return err
}
return nil
}
// restartIfUnhealthy checks healthy status of client and recreate it if status is unhealthy.
// Indicating if status was changed by this function call and returns error that caused unhealthy status.
func (c *clientWrapper) restartIfUnhealthy(ctx context.Context) (changed bool, err error) {
var wasHealthy bool
if _, err = c.endpointInfo(ctx, prmEndpointInfo{}); err == nil {
return false, nil
} else if !errors.Is(err, errPoolClientUnhealthy) {
wasHealthy = true
}
// if connection is dialed before, to avoid routine / connection leak,
// pool has to close it and then initialize once again.
if c.isDialed() {
c.scheduleGracefulClose()
}
// restart recreates and redial inner sdk client.
func (c *clientWrapper) restart(ctx context.Context) error {
var cl sdkClient.Client
prmInit := sdkClient.PrmInit{
Key: c.prm.key,
@ -381,22 +366,35 @@ func (c *clientWrapper) restartIfUnhealthy(ctx context.Context) (changed bool, e
GRPCDialOptions: c.prm.dialOptions,
}
if err = cl.Dial(ctx, prmDial); err != nil {
c.setUnhealthyOnDial()
return wasHealthy, err
// if connection is dialed before, to avoid routine / connection leak,
// pool has to close it and then initialize once again.
if c.isDialed() {
c.scheduleGracefulClose()
}
err := cl.Dial(ctx, prmDial)
c.setDialed(err == nil)
if err != nil {
return err
}
c.clientMutex.Lock()
c.client = &cl
c.clientMutex.Unlock()
if _, err = cl.EndpointInfo(ctx, sdkClient.PrmEndpointInfo{}); err != nil {
c.setUnhealthy()
return wasHealthy, err
}
return nil
}
c.setHealthy()
return !wasHealthy, nil
func (c *clientWrapper) isDialed() bool {
c.mu.RLock()
defer c.mu.RUnlock()
return c.dialed
}
func (c *clientWrapper) setDialed(dialed bool) {
c.mu.Lock()
c.dialed = dialed
c.mu.Unlock()
}
func (c *clientWrapper) getClient() (*sdkClient.Client, error) {
@ -654,6 +652,15 @@ func (c *clientWrapper) endpointInfo(ctx context.Context, _ prmEndpointInfo) (ne
return netmap.NodeInfo{}, err
}
return c.endpointInfoRaw(ctx, cl)
}
func (c *clientWrapper) healthcheck(ctx context.Context) (netmap.NodeInfo, error) {
cl := c.getClientRaw()
return c.endpointInfoRaw(ctx, cl)
}
func (c *clientWrapper) endpointInfoRaw(ctx context.Context, cl *sdkClient.Client) (netmap.NodeInfo, error) {
start := time.Now()
res, err := cl.EndpointInfo(ctx, sdkClient.PrmEndpointInfo{})
c.incRequests(time.Since(start), methodEndpointInfo)
@ -1085,7 +1092,7 @@ func (c *clientWrapper) objectSearch(ctx context.Context, prm PrmObjectSearch) (
return ResObjectSearch{}, fmt.Errorf("init object searching on client: %w", err)
}
return ResObjectSearch{r: res}, nil
return ResObjectSearch{r: res, handleError: c.handleError}, nil
}
// sessionCreate invokes sdkClient.SessionCreate parse response status to error and return result as is.
@ -1121,10 +1128,6 @@ func (c *clientStatusMonitor) isHealthy() bool {
return c.healthy.Load() == statusHealthy
}
func (c *clientStatusMonitor) isDialed() bool {
return c.healthy.Load() != statusUnhealthyOnDial
}
func (c *clientStatusMonitor) setHealthy() {
c.healthy.Store(statusHealthy)
}
@ -1133,10 +1136,6 @@ func (c *clientStatusMonitor) setUnhealthy() {
c.healthy.Store(statusUnhealthyOnRequest)
}
func (c *clientStatusMonitor) setUnhealthyOnDial() {
c.healthy.Store(statusUnhealthyOnDial)
}
func (c *clientStatusMonitor) address() string {
return c.addr
}
@ -1159,6 +1158,16 @@ func (c *clientStatusMonitor) incErrorRate() {
}
}
func (c *clientStatusMonitor) incErrorRateToUnhealthy(err error) {
c.mu.Lock()
c.currentErrorCount = 0
c.overallErrorCount++
c.setUnhealthy()
c.mu.Unlock()
c.log(zapcore.WarnLevel, "explicitly mark node unhealthy", zap.String("address", c.addr), zap.Error(err))
}
func (c *clientStatusMonitor) log(level zapcore.Level, msg string, fields ...zap.Field) {
if c.logger == nil {
return
@ -1201,6 +1210,9 @@ func (c *clientWrapper) incRequests(elapsed time.Duration, method MethodIndex) {
}
func (c *clientWrapper) close() error {
if !c.isDialed() {
return nil
}
if cl := c.getClientRaw(); cl != nil {
return cl.Close()
}
@ -1225,9 +1237,10 @@ func (c *clientStatusMonitor) handleError(ctx context.Context, st apistatus.Stat
switch stErr.(type) {
case *apistatus.ServerInternal,
*apistatus.WrongMagicNumber,
*apistatus.SignatureVerification,
*apistatus.NodeUnderMaintenance:
*apistatus.SignatureVerification:
c.incErrorRate()
case *apistatus.NodeUnderMaintenance:
c.incErrorRateToUnhealthy(stErr)
}
if err == nil {
@ -1239,7 +1252,11 @@ func (c *clientStatusMonitor) handleError(ctx context.Context, st apistatus.Stat
if err != nil {
if needCountError(ctx, err) {
c.incErrorRate()
if sdkClient.IsErrNodeUnderMaintenance(err) {
c.incErrorRateToUnhealthy(err)
} else {
c.incErrorRate()
}
}
return err
@ -1261,7 +1278,7 @@ func needCountError(ctx context.Context, err error) bool {
return false
}
if errors.Is(ctx.Err(), context.Canceled) {
if ctx != nil && errors.Is(ctx.Err(), context.Canceled) {
return false
}
@ -2138,7 +2155,9 @@ func adjustNodeParams(nodeParams []NodeParam) ([]*nodesParam, error) {
// startRebalance runs loop to monitor connection healthy status.
func (p *Pool) startRebalance(ctx context.Context) {
ticker := time.NewTimer(p.rebalanceParams.clientRebalanceInterval)
ticker := time.NewTicker(p.rebalanceParams.clientRebalanceInterval)
defer ticker.Stop()
buffers := make([][]float64, len(p.rebalanceParams.nodesParams))
for i, params := range p.rebalanceParams.nodesParams {
buffers[i] = make([]float64, len(params.weights))
@ -2188,7 +2207,7 @@ func (p *Pool) updateInnerNodesHealth(ctx context.Context, i int, bufferWeights
tctx, c := context.WithTimeout(ctx, options.nodeRequestTimeout)
defer c()
changed, err := cli.restartIfUnhealthy(tctx)
changed, err := restartIfUnhealthy(tctx, cli)
healthy := err == nil
if healthy {
bufferWeights[j] = options.nodesParams[i].weights[j]
@ -2219,6 +2238,43 @@ func (p *Pool) updateInnerNodesHealth(ctx context.Context, i int, bufferWeights
}
}
// restartIfUnhealthy checks healthy status of client and recreate it if status is unhealthy.
// Indicating if status was changed by this function call and returns error that caused unhealthy status.
func restartIfUnhealthy(ctx context.Context, c client) (changed bool, err error) {
defer func() {
if err != nil {
c.setUnhealthy()
} else {
c.setHealthy()
}
}()
wasHealthy := c.isHealthy()
if res, err := c.healthcheck(ctx); err == nil {
if res.Status().IsMaintenance() {
return wasHealthy, new(apistatus.NodeUnderMaintenance)
}
return !wasHealthy, nil
}
if err = c.restart(ctx); err != nil {
return wasHealthy, err
}
res, err := c.healthcheck(ctx)
if err != nil {
return wasHealthy, err
}
if res.Status().IsMaintenance() {
return wasHealthy, new(apistatus.NodeUnderMaintenance)
}
return !wasHealthy, nil
}
func adjustWeights(weights []float64) []float64 {
adjusted := make([]float64, len(weights))
sum := 0.0
@ -2708,18 +2764,25 @@ func (p *Pool) ObjectRange(ctx context.Context, prm PrmObjectRange) (ResObjectRa
//
// Must be initialized using Pool.SearchObjects, any other usage is unsafe.
type ResObjectSearch struct {
r *sdkClient.ObjectListReader
r *sdkClient.ObjectListReader
handleError func(context.Context, apistatus.Status, error) error
}
// Read reads another list of the object identifiers.
func (x *ResObjectSearch) Read(buf []oid.ID) (int, error) {
n, ok := x.r.Read(buf)
if !ok {
_, err := x.r.Close()
res, err := x.r.Close()
if err == nil {
return n, io.EOF
}
var status apistatus.Status
if res != nil {
status = res.Status()
}
err = x.handleError(nil, status, err)
return n, err
}
@ -3017,9 +3080,7 @@ func (p *Pool) Close() {
// close all clients
for _, pools := range p.innerPools {
for _, cli := range pools.clients {
if cli.isDialed() {
_ = cli.close()
}
_ = cli.close()
}
}
}

View file

@ -4,12 +4,13 @@ import (
"context"
"crypto/ecdsa"
"errors"
"strconv"
"math/rand"
"testing"
"time"
apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
frostfsecdsa "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/crypto/ecdsa"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/session"
@ -17,6 +18,8 @@ import (
"github.com/nspcc-dev/neo-go/pkg/crypto/keys"
"github.com/stretchr/testify/require"
"go.uber.org/zap"
"go.uber.org/zap/zaptest"
"go.uber.org/zap/zaptest/observer"
)
func TestBuildPoolClientFailed(t *testing.T) {
@ -230,6 +233,179 @@ func TestOneOfTwoFailed(t *testing.T) {
}
}
func TestUpdateNodesHealth(t *testing.T) {
ctx := context.Background()
key := newPrivateKey(t)
for _, tc := range []struct {
name string
wasHealthy bool
willHealthy bool
prepareCli func(*mockClient)
}{
{
name: "healthy, maintenance, unhealthy",
wasHealthy: true,
willHealthy: false,
prepareCli: func(c *mockClient) { c.resOnEndpointInfo.SetStatus(netmap.Maintenance) },
},
{
name: "unhealthy, maintenance, unhealthy",
wasHealthy: false,
willHealthy: false,
prepareCli: func(c *mockClient) { c.resOnEndpointInfo.SetStatus(netmap.Maintenance) },
},
{
name: "healthy, no error, healthy",
wasHealthy: true,
willHealthy: true,
prepareCli: func(c *mockClient) { c.resOnEndpointInfo.SetStatus(netmap.Online) },
},
{
name: "unhealthy, no error, healthy",
wasHealthy: false,
willHealthy: true,
prepareCli: func(c *mockClient) { c.resOnEndpointInfo.SetStatus(netmap.Online) },
},
{
name: "healthy, error, failed restart, unhealthy",
wasHealthy: true,
willHealthy: false,
prepareCli: func(c *mockClient) {
c.errOnEndpointInfo()
c.errorOnDial = true
},
},
{
name: "unhealthy, error, failed restart, unhealthy",
wasHealthy: false,
willHealthy: false,
prepareCli: func(c *mockClient) {
c.errOnEndpointInfo()
c.errorOnDial = true
},
},
{
name: "healthy, error, restart, error, unhealthy",
wasHealthy: true,
willHealthy: false,
prepareCli: func(c *mockClient) { c.errOnEndpointInfo() },
},
{
name: "unhealthy, error, restart, error, unhealthy",
wasHealthy: false,
willHealthy: false,
prepareCli: func(c *mockClient) { c.errOnEndpointInfo() },
},
{
name: "healthy, error, restart, maintenance, unhealthy",
wasHealthy: true,
willHealthy: false,
prepareCli: func(c *mockClient) {
healthError := true
c.healthcheckFn = func() {
if healthError {
c.errorOnEndpointInfo = errors.New("error")
} else {
c.errorOnEndpointInfo = nil
c.resOnEndpointInfo.SetStatus(netmap.Maintenance)
}
healthError = !healthError
}
},
},
{
name: "unhealthy, error, restart, maintenance, unhealthy",
wasHealthy: false,
willHealthy: false,
prepareCli: func(c *mockClient) {
healthError := true
c.healthcheckFn = func() {
if healthError {
c.errorOnEndpointInfo = errors.New("error")
} else {
c.errorOnEndpointInfo = nil
c.resOnEndpointInfo.SetStatus(netmap.Maintenance)
}
healthError = !healthError
}
},
},
{
name: "healthy, error, restart, healthy",
wasHealthy: true,
willHealthy: true,
prepareCli: func(c *mockClient) {
healthError := true
c.healthcheckFn = func() {
if healthError {
c.errorOnEndpointInfo = errors.New("error")
} else {
c.errorOnEndpointInfo = nil
}
healthError = !healthError
}
},
},
{
name: "unhealthy, error, restart, healthy",
wasHealthy: false,
willHealthy: true,
prepareCli: func(c *mockClient) {
healthError := true
c.healthcheckFn = func() {
if healthError {
c.errorOnEndpointInfo = errors.New("error")
} else {
c.errorOnEndpointInfo = nil
}
healthError = !healthError
}
},
},
} {
t.Run(tc.name, func(t *testing.T) {
cli := newMockClientHealthy("peer0", *key, tc.wasHealthy)
tc.prepareCli(cli)
p, log := newPool(t, cli)
p.updateNodesHealth(ctx, [][]float64{{1}})
changed := tc.wasHealthy != tc.willHealthy
require.Equalf(t, tc.willHealthy, cli.isHealthy(), "healthy status should be: %v", tc.willHealthy)
require.Equalf(t, changed, 1 == log.Len(), "healthy status should be changed: %v", changed)
})
}
}
func newPool(t *testing.T, cli *mockClient) (*Pool, *observer.ObservedLogs) {
log, observedLog := getObservedLogger()
cache, err := newCache(0)
require.NoError(t, err)
return &Pool{
innerPools: []*innerPool{{
sampler: newSampler([]float64{1}, rand.NewSource(0)),
clients: []client{cli},
}},
cache: cache,
key: newPrivateKey(t),
closedCh: make(chan struct{}),
rebalanceParams: rebalanceParameters{
nodesParams: []*nodesParam{{1, []string{"peer0"}, []float64{1}}},
nodeRequestTimeout: time.Second,
clientRebalanceInterval: 200 * time.Millisecond,
},
logger: log,
}, observedLog
}
func getObservedLogger() (*zap.Logger, *observer.ObservedLogs) {
loggerCore, observedLog := observer.New(zap.DebugLevel)
return zap.New(loggerCore), observedLog
}
func TestTwoFailed(t *testing.T) {
var clientKeys []*ecdsa.PrivateKey
mockClientBuilder := func(addr string) client {
@ -529,13 +705,6 @@ func TestStatusMonitor(t *testing.T) {
isHealthy bool
description string
}{
{
action: func(m *clientStatusMonitor) { m.setUnhealthyOnDial() },
status: statusUnhealthyOnDial,
isDialed: false,
isHealthy: false,
description: "set unhealthy on dial",
},
{
action: func(m *clientStatusMonitor) { m.setUnhealthy() },
status: statusUnhealthyOnRequest,
@ -554,7 +723,6 @@ func TestStatusMonitor(t *testing.T) {
for _, tc := range cases {
tc.action(&monitor)
require.Equal(t, tc.status, monitor.healthy.Load())
require.Equal(t, tc.isDialed, monitor.isDialed())
require.Equal(t, tc.isHealthy, monitor.isHealthy())
}
})
@ -562,19 +730,22 @@ func TestStatusMonitor(t *testing.T) {
func TestHandleError(t *testing.T) {
ctx := context.Background()
monitor := newClientStatusMonitor(zap.NewExample(), "", 10)
log := zaptest.NewLogger(t)
canceledCtx, cancel := context.WithCancel(context.Background())
cancel()
for i, tc := range []struct {
ctx context.Context
status apistatus.Status
err error
expectedError bool
countError bool
for _, tc := range []struct {
name string
ctx context.Context
status apistatus.Status
err error
expectedError bool
countError bool
markedUnhealthy bool
}{
{
name: "no error, no status",
ctx: ctx,
status: nil,
err: nil,
@ -582,6 +753,7 @@ func TestHandleError(t *testing.T) {
countError: false,
},
{
name: "no error, success status",
ctx: ctx,
status: new(apistatus.SuccessDefaultV2),
err: nil,
@ -589,6 +761,7 @@ func TestHandleError(t *testing.T) {
countError: false,
},
{
name: "error, success status",
ctx: ctx,
status: new(apistatus.SuccessDefaultV2),
err: errors.New("error"),
@ -596,6 +769,7 @@ func TestHandleError(t *testing.T) {
countError: true,
},
{
name: "error, no status",
ctx: ctx,
status: nil,
err: errors.New("error"),
@ -603,6 +777,7 @@ func TestHandleError(t *testing.T) {
countError: true,
},
{
name: "no error, object not found status",
ctx: ctx,
status: new(apistatus.ObjectNotFound),
err: nil,
@ -610,6 +785,7 @@ func TestHandleError(t *testing.T) {
countError: false,
},
{
name: "object not found error, object not found status",
ctx: ctx,
status: new(apistatus.ObjectNotFound),
err: &apistatus.ObjectNotFound{},
@ -617,6 +793,7 @@ func TestHandleError(t *testing.T) {
countError: false,
},
{
name: "eacl not found error, no status",
ctx: ctx,
status: nil,
err: &apistatus.EACLNotFound{},
@ -627,6 +804,7 @@ func TestHandleError(t *testing.T) {
countError: true,
},
{
name: "no error, internal status",
ctx: ctx,
status: new(apistatus.ServerInternal),
err: nil,
@ -634,6 +812,7 @@ func TestHandleError(t *testing.T) {
countError: true,
},
{
name: "no error, wrong magic status",
ctx: ctx,
status: new(apistatus.WrongMagicNumber),
err: nil,
@ -641,6 +820,7 @@ func TestHandleError(t *testing.T) {
countError: true,
},
{
name: "no error, signature verification status",
ctx: ctx,
status: new(apistatus.SignatureVerification),
err: nil,
@ -648,13 +828,25 @@ func TestHandleError(t *testing.T) {
countError: true,
},
{
ctx: ctx,
status: new(apistatus.NodeUnderMaintenance),
err: nil,
expectedError: true,
countError: true,
name: "no error, maintenance status",
ctx: ctx,
status: new(apistatus.NodeUnderMaintenance),
err: nil,
expectedError: true,
countError: true,
markedUnhealthy: true,
},
{
name: "maintenance error, no status",
ctx: ctx,
status: nil,
err: &apistatus.NodeUnderMaintenance{},
expectedError: true,
countError: true,
markedUnhealthy: true,
},
{
name: "no error, invalid argument status",
ctx: ctx,
status: new(apistatus.InvalidArgument),
err: nil,
@ -662,6 +854,7 @@ func TestHandleError(t *testing.T) {
countError: false,
},
{
name: "context canceled error, no status",
ctx: canceledCtx,
status: nil,
err: errors.New("error"),
@ -669,8 +862,9 @@ func TestHandleError(t *testing.T) {
countError: false,
},
} {
t.Run(strconv.Itoa(i), func(t *testing.T) {
errCount := monitor.currentErrorRate()
t.Run(tc.name, func(t *testing.T) {
monitor := newClientStatusMonitor(log, "", 10)
errCount := monitor.overallErrorRate()
err := monitor.handleError(tc.ctx, tc.status, tc.err)
if tc.expectedError {
require.Error(t, err)
@ -680,7 +874,10 @@ func TestHandleError(t *testing.T) {
if tc.countError {
errCount++
}
require.Equal(t, errCount, monitor.currentErrorRate())
require.Equal(t, errCount, monitor.overallErrorRate())
if tc.markedUnhealthy {
require.False(t, monitor.isHealthy())
}
})
}
}