[#16] pool: Fix counting context canceled error #43

Merged
alexvanin merged 1 commits from dkirillov/frostfs-sdk-go:feature/16-dont_count_context_canceled_as_error into master 2023-03-30 12:42:17 +00:00
3 changed files with 60 additions and 38 deletions

View File

@ -103,22 +103,22 @@ func (m *mockClient) containerSetEACL(context.Context, PrmContainerSetEACL) erro
return nil
}
func (m *mockClient) endpointInfo(context.Context, prmEndpointInfo) (netmap.NodeInfo, error) {
func (m *mockClient) endpointInfo(ctx context.Context, _ prmEndpointInfo) (netmap.NodeInfo, error) {
var ni netmap.NodeInfo
if m.errorOnEndpointInfo {
return ni, m.handleError(nil, errors.New("error"))
return ni, m.handleError(ctx, nil, errors.New("error"))
}
ni.SetNetworkEndpoints(m.addr)
return ni, nil
}
func (m *mockClient) networkInfo(context.Context, prmNetworkInfo) (netmap.NetworkInfo, error) {
func (m *mockClient) networkInfo(ctx context.Context, _ prmNetworkInfo) (netmap.NetworkInfo, error) {
var ni netmap.NetworkInfo
if m.errorOnNetworkInfo {
return ni, m.handleError(nil, errors.New("error"))
return ni, m.handleError(ctx, nil, errors.New("error"))
}
return ni, nil
@ -132,7 +132,7 @@ func (m *mockClient) objectDelete(context.Context, PrmObjectDelete) error {
return nil
}
func (m *mockClient) objectGet(context.Context, PrmObjectGet) (ResGetObject, error) {
func (m *mockClient) objectGet(ctx context.Context, _ PrmObjectGet) (ResGetObject, error) {
var res ResGetObject
if m.stOnGetObject == nil {
@ -140,7 +140,7 @@ func (m *mockClient) objectGet(context.Context, PrmObjectGet) (ResGetObject, err
}
status := apistatus.ErrFromStatus(m.stOnGetObject)
return res, m.handleError(status, nil)
return res, m.handleError(ctx, status, nil)
}
func (m *mockClient) objectHead(context.Context, PrmObjectHead) (object.Object, error) {
@ -155,9 +155,9 @@ func (m *mockClient) objectSearch(context.Context, PrmObjectSearch) (ResObjectSe
return ResObjectSearch{}, nil
}
func (m *mockClient) sessionCreate(context.Context, prmCreateSession) (resCreateSession, error) {
func (m *mockClient) sessionCreate(ctx context.Context, _ prmCreateSession) (resCreateSession, error) {
if m.errorOnCreateSession {
return resCreateSession{}, m.handleError(nil, errors.New("error"))
return resCreateSession{}, m.handleError(ctx, nil, errors.New("error"))
}
tok := newToken(m.key)

View File

@ -32,8 +32,6 @@ import (
"go.uber.org/atomic"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)
// client represents virtual connection to the single FrostFS network endpoint from which Pool is formed.
@ -383,7 +381,7 @@ func (c *clientWrapper) balanceGet(ctx context.Context, prm PrmBalanceGet) (acco
if res != nil {
st = res.Status()
}
if err = c.handleError(st, err); err != nil {
if err = c.handleError(ctx, st, err); err != nil {
return accounting.Decimal{}, fmt.Errorf("balance get on client: %w", err)
}
@ -405,7 +403,7 @@ func (c *clientWrapper) containerPut(ctx context.Context, prm PrmContainerPut) (
if res != nil {
st = res.Status()
}
if err = c.handleError(st, err); err != nil {
if err = c.handleError(ctx, st, err); err != nil {
return cid.ID{}, fmt.Errorf("container put on client: %w", err)
}
@ -416,7 +414,7 @@ func (c *clientWrapper) containerPut(ctx context.Context, prm PrmContainerPut) (
idCnr := res.ID()
err = waitForContainerPresence(ctx, c, idCnr, &prm.waitParams)
if err = c.handleError(nil, err); err != nil {
if err = c.handleError(ctx, nil, err); err != nil {
return cid.ID{}, fmt.Errorf("wait container presence on client: %w", err)
}
@ -440,7 +438,7 @@ func (c *clientWrapper) containerGet(ctx context.Context, prm PrmContainerGet) (
if res != nil {
st = res.Status()
}
if err = c.handleError(st, err); err != nil {
if err = c.handleError(ctx, st, err); err != nil {
return container.Container{}, fmt.Errorf("container get on client: %w", err)
}
@ -464,7 +462,7 @@ func (c *clientWrapper) containerList(ctx context.Context, prm PrmContainerList)
if res != nil {
st = res.Status()
}
if err = c.handleError(st, err); err != nil {
if err = c.handleError(ctx, st, err); err != nil {
return nil, fmt.Errorf("container list on client: %w", err)
}
return res.Containers(), nil
@ -491,7 +489,7 @@ func (c *clientWrapper) containerDelete(ctx context.Context, prm PrmContainerDel
if res != nil {
st = res.Status()
}
if err = c.handleError(st, err); err != nil {
if err = c.handleError(ctx, st, err); err != nil {
return fmt.Errorf("container delete on client: %w", err)
}
@ -519,7 +517,7 @@ func (c *clientWrapper) containerEACL(ctx context.Context, prm PrmContainerEACL)
if res != nil {
st = res.Status()
}
if err = c.handleError(st, err); err != nil {
if err = c.handleError(ctx, st, err); err != nil {
return eacl.Table{}, fmt.Errorf("get eacl on client: %w", err)
}
@ -548,7 +546,7 @@ func (c *clientWrapper) containerSetEACL(ctx context.Context, prm PrmContainerSe
if res != nil {
st = res.Status()
}
if err = c.handleError(st, err); err != nil {
if err = c.handleError(ctx, st, err); err != nil {
return fmt.Errorf("set eacl on client: %w", err)
}
@ -562,7 +560,7 @@ func (c *clientWrapper) containerSetEACL(ctx context.Context, prm PrmContainerSe
}
err = waitForEACLPresence(ctx, c, cIDp, &prm.table, &prm.waitParams)
if err = c.handleError(nil, err); err != nil {
if err = c.handleError(ctx, nil, err); err != nil {
return fmt.Errorf("wait eacl presence on client: %w", err)
}
@ -583,7 +581,7 @@ func (c *clientWrapper) endpointInfo(ctx context.Context, _ prmEndpointInfo) (ne
if res != nil {
st = res.Status()
}
if err = c.handleError(st, err); err != nil {
if err = c.handleError(ctx, st, err); err != nil {
return netmap.NodeInfo{}, fmt.Errorf("endpoint info on client: %w", err)
}
@ -604,7 +602,7 @@ func (c *clientWrapper) networkInfo(ctx context.Context, _ prmNetworkInfo) (netm
if res != nil {
st = res.Status()
}
if err = c.handleError(st, err); err != nil {
if err = c.handleError(ctx, st, err); err != nil {
return netmap.NetworkInfo{}, fmt.Errorf("network info on client: %w", err)
}
@ -633,7 +631,7 @@ func (c *clientWrapper) objectPut(ctx context.Context, prm PrmObjectPut) (oid.ID
start := time.Now()
wObj, err := cl.ObjectPutInit(ctx, cliPrm)
c.incRequests(time.Since(start), methodObjectPut)
if err = c.handleError(nil, err); err != nil {
if err = c.handleError(ctx, nil, err); err != nil {
return oid.ID{}, fmt.Errorf("init writing on API client: %w", err)
}
@ -677,7 +675,7 @@ func (c *clientWrapper) objectPut(ctx context.Context, prm PrmObjectPut) (oid.ID
break
}
return oid.ID{}, fmt.Errorf("read payload: %w", c.handleError(nil, err))
return oid.ID{}, fmt.Errorf("read payload: %w", c.handleError(ctx, nil, err))
}
}
}
@ -687,7 +685,7 @@ func (c *clientWrapper) objectPut(ctx context.Context, prm PrmObjectPut) (oid.ID
if res != nil {
st = res.Status()
}
if err = c.handleError(st, err); err != nil { // here err already carries both status and client errors
if err = c.handleError(ctx, st, err); err != nil { // here err already carries both status and client errors
return oid.ID{}, fmt.Errorf("client failure: %w", err)
}
@ -724,7 +722,7 @@ func (c *clientWrapper) objectDelete(ctx context.Context, prm PrmObjectDelete) e
if res != nil {
st = res.Status()
}
if err = c.handleError(st, err); err != nil {
if err = c.handleError(ctx, st, err); err != nil {
return fmt.Errorf("delete object on client: %w", err)
}
return nil
@ -756,7 +754,7 @@ func (c *clientWrapper) objectGet(ctx context.Context, prm PrmObjectGet) (ResGet
var res ResGetObject
rObj, err := cl.ObjectGetInit(ctx, cliPrm)
if err = c.handleError(nil, err); err != nil {
if err = c.handleError(ctx, nil, err); err != nil {
return ResGetObject{}, fmt.Errorf("init object reading on client: %w", err)
}
@ -769,7 +767,7 @@ func (c *clientWrapper) objectGet(ctx context.Context, prm PrmObjectGet) (ResGet
if rObjRes != nil {
st = rObjRes.Status()
}
err = c.handleError(st, err)
err = c.handleError(ctx, st, err)
return res, fmt.Errorf("read header: %w", err)
}
@ -818,7 +816,7 @@ func (c *clientWrapper) objectHead(ctx context.Context, prm PrmObjectHead) (obje
if res != nil {
st = res.Status()
}
if err = c.handleError(st, err); err != nil {
if err = c.handleError(ctx, st, err); err != nil {
return obj, fmt.Errorf("read object header via client: %w", err)
}
if !res.ReadHeader(&obj) {
@ -856,7 +854,7 @@ func (c *clientWrapper) objectRange(ctx context.Context, prm PrmObjectRange) (Re
start := time.Now()
res, err := cl.ObjectRangeInit(ctx, cliPrm)
c.incRequests(time.Since(start), methodObjectRange)
if err = c.handleError(nil, err); err != nil {
if err = c.handleError(ctx, nil, err); err != nil {
return ResObjectRange{}, fmt.Errorf("init payload range reading on client: %w", err)
}
@ -893,7 +891,7 @@ func (c *clientWrapper) objectSearch(ctx context.Context, prm PrmObjectSearch) (
}
res, err := cl.ObjectSearchInit(ctx, cliPrm)
if err = c.handleError(nil, err); err != nil {
if err = c.handleError(ctx, nil, err); err != nil {
return ResObjectSearch{}, fmt.Errorf("init object searching on client: %w", err)
}
@ -918,7 +916,7 @@ func (c *clientWrapper) sessionCreate(ctx context.Context, prm prmCreateSession)
if res != nil {
st = res.Status()
}
if err = c.handleError(st, err); err != nil {
if err = c.handleError(ctx, st, err); err != nil {
return resCreateSession{}, fmt.Errorf("session creation on client: %w", err)
}
@ -995,9 +993,9 @@ func (c *clientWrapper) incRequests(elapsed time.Duration, method MethodIndex) {
}
}
func (c *clientStatusMonitor) handleError(st apistatus.Status, err error) error {
func (c *clientStatusMonitor) handleError(ctx context.Context, st apistatus.Status, err error) error {
if err != nil {
if needCountError(err) {
if needCountError(ctx, err) {
c.incErrorRate()
}
@ -1016,7 +1014,7 @@ func (c *clientStatusMonitor) handleError(st apistatus.Status, err error) error
return err
}
func needCountError(err error) bool {
func needCountError(ctx context.Context, err error) bool {
// non-status logic error that could be returned
// from the SDK client; should not be considered
// as a connection error
@ -1025,9 +1023,11 @@ func needCountError(err error) bool {
return false
}
// we can't use errors.Is(err, context.Canceled)
// https://github.com/grpc/grpc-go/issues/4375
return status.Code(err) != codes.Canceled

In case of client canceling - yes.
As I understand we want to handle client canceling and not inner timeout/deadling/canceling. Am I wrong?

In case of client canceling - yes. As I understand we want to handle client canceling and not inner timeout/deadling/canceling. Am I wrong?
if errors.Is(ctx.Err(), context.Canceled) {

Why condition status.Code(err) != codes.Canceled dropped? As I understand it, the type of error depends on where the cancellation event is handled. For example, this function wraps ctx.Done() to status.Status with status.Code(err) == codes.Canceled

Why condition ```status.Code(err) != codes.Canceled``` dropped? As I understand it, the type of error depends on where the cancellation event is handled. For example, [this function](https://github.com/grpc/grpc-go/blob/master/clientconn.go#L577) wraps ctx.Done() to status.Status with ```status.Code(err) == codes.Canceled```

If we want to handle only client canceling we can rely on error in context. Error in context will be context.Canceled no matter what error will be returned from sdk client

If we want to handle only client canceling we can rely on error in context. Error in context will be `context.Canceled` no matter what error will be returned from sdk client
return false
}
return true
}
// clientBuilder is a type alias of client constructors which open connection

View File

@ -527,78 +527,100 @@ func TestStatusMonitor(t *testing.T) {
}
func TestHandleError(t *testing.T) {
ctx := context.Background()
monitor := newClientStatusMonitor(zap.NewExample(), "", 10)
canceledCtx, cancel := context.WithCancel(context.Background())
cancel()
for i, tc := range []struct {
ctx context.Context
status apistatus.Status
err error
expectedError bool
countError bool
}{
{
ctx: ctx,
status: nil,
err: nil,
expectedError: false,
countError: false,
},
{
ctx: ctx,
status: apistatus.SuccessDefaultV2{},
err: nil,
expectedError: false,
countError: false,
},
{
ctx: ctx,
status: apistatus.SuccessDefaultV2{},
err: errors.New("error"),
expectedError: true,
countError: true,
},
{
ctx: ctx,
status: nil,
err: errors.New("error"),
expectedError: true,
countError: true,
},
{
ctx: ctx,
status: apistatus.ObjectNotFound{},
err: nil,
expectedError: true,
countError: false,
},
{
ctx: ctx,
status: apistatus.ServerInternal{},
err: nil,
expectedError: true,
countError: true,
},
{
ctx: ctx,
status: apistatus.WrongMagicNumber{},
err: nil,
expectedError: true,
countError: true,
},
{
ctx: ctx,
status: apistatus.SignatureVerification{},
err: nil,
expectedError: true,
countError: true,
},
{
ctx: ctx,
status: &apistatus.SignatureVerification{},
err: nil,
expectedError: true,
countError: true,
},
{
ctx: ctx,
status: apistatus.NodeUnderMaintenance{},
err: nil,
expectedError: true,
countError: true,
},
{
ctx: canceledCtx,
status: nil,
err: errors.New("error"),
expectedError: true,
countError: false,
},
} {
t.Run(strconv.Itoa(i), func(t *testing.T) {
errCount := monitor.currentErrorRate()
err := monitor.handleError(tc.status, tc.err)
err := monitor.handleError(tc.ctx, tc.status, tc.err)
if tc.expectedError {
require.Error(t, err)
} else {