forked from TrueCloudLab/frostfs-sdk-go
pool: Refactor handleError to updateErrorRate
Signed-off-by: Evgenii Baidakov <evgenii@nspcc.io>
This commit is contained in:
parent
e377b3b4f6
commit
49bc3b7202
3 changed files with 59 additions and 34 deletions
|
@ -107,7 +107,9 @@ func (m *mockClient) endpointInfo(context.Context, prmEndpointInfo) (netmap.Node
|
||||||
var ni netmap.NodeInfo
|
var ni netmap.NodeInfo
|
||||||
|
|
||||||
if m.errorOnEndpointInfo {
|
if m.errorOnEndpointInfo {
|
||||||
return ni, m.handleError(errors.New("error"))
|
err := errors.New("endpoint info")
|
||||||
|
m.updateErrorRate(err)
|
||||||
|
return ni, err
|
||||||
}
|
}
|
||||||
|
|
||||||
ni.SetNetworkEndpoints(m.addr)
|
ni.SetNetworkEndpoints(m.addr)
|
||||||
|
@ -118,7 +120,9 @@ func (m *mockClient) networkInfo(context.Context, prmNetworkInfo) (netmap.Networ
|
||||||
var ni netmap.NetworkInfo
|
var ni netmap.NetworkInfo
|
||||||
|
|
||||||
if m.errorOnNetworkInfo {
|
if m.errorOnNetworkInfo {
|
||||||
return ni, m.handleError(errors.New("error"))
|
err := errors.New("network info")
|
||||||
|
m.updateErrorRate(err)
|
||||||
|
return ni, err
|
||||||
}
|
}
|
||||||
|
|
||||||
return ni, nil
|
return ni, nil
|
||||||
|
@ -140,7 +144,8 @@ func (m *mockClient) objectGet(context.Context, PrmObjectGet) (ResGetObject, err
|
||||||
}
|
}
|
||||||
|
|
||||||
err := apistatus.ErrFromStatus(m.stOnGetObject)
|
err := apistatus.ErrFromStatus(m.stOnGetObject)
|
||||||
return res, m.handleError(err)
|
m.updateErrorRate(err)
|
||||||
|
return res, err
|
||||||
}
|
}
|
||||||
|
|
||||||
func (m *mockClient) objectHead(context.Context, PrmObjectHead) (object.Object, error) {
|
func (m *mockClient) objectHead(context.Context, PrmObjectHead) (object.Object, error) {
|
||||||
|
@ -157,7 +162,9 @@ func (m *mockClient) objectSearch(context.Context, PrmObjectSearch) (ResObjectSe
|
||||||
|
|
||||||
func (m *mockClient) sessionCreate(context.Context, prmCreateSession) (resCreateSession, error) {
|
func (m *mockClient) sessionCreate(context.Context, prmCreateSession) (resCreateSession, error) {
|
||||||
if m.errorOnCreateSession {
|
if m.errorOnCreateSession {
|
||||||
return resCreateSession{}, m.handleError(errors.New("error"))
|
err := errors.New("create session")
|
||||||
|
m.updateErrorRate(err)
|
||||||
|
return resCreateSession{}, err
|
||||||
}
|
}
|
||||||
|
|
||||||
tok := newToken(m.signer)
|
tok := newToken(m.signer)
|
||||||
|
|
72
pool/pool.go
72
pool/pool.go
|
@ -374,7 +374,8 @@ func (c *clientWrapper) balanceGet(ctx context.Context, prm PrmBalanceGet) (acco
|
||||||
start := time.Now()
|
start := time.Now()
|
||||||
res, err := cl.BalanceGet(ctx, cliPrm)
|
res, err := cl.BalanceGet(ctx, cliPrm)
|
||||||
c.incRequests(time.Since(start), methodBalanceGet)
|
c.incRequests(time.Since(start), methodBalanceGet)
|
||||||
if err = c.handleError(err); err != nil {
|
c.updateErrorRate(err)
|
||||||
|
if err != nil {
|
||||||
return accounting.Decimal{}, fmt.Errorf("balance get on client: %w", err)
|
return accounting.Decimal{}, fmt.Errorf("balance get on client: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -392,7 +393,8 @@ func (c *clientWrapper) containerPut(ctx context.Context, prm PrmContainerPut) (
|
||||||
start := time.Now()
|
start := time.Now()
|
||||||
res, err := cl.ContainerPut(ctx, prm.prmClient)
|
res, err := cl.ContainerPut(ctx, prm.prmClient)
|
||||||
c.incRequests(time.Since(start), methodContainerPut)
|
c.incRequests(time.Since(start), methodContainerPut)
|
||||||
if err = c.handleError(err); err != nil {
|
c.updateErrorRate(err)
|
||||||
|
if err != nil {
|
||||||
return cid.ID{}, fmt.Errorf("container put on client: %w", err)
|
return cid.ID{}, fmt.Errorf("container put on client: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -403,7 +405,8 @@ func (c *clientWrapper) containerPut(ctx context.Context, prm PrmContainerPut) (
|
||||||
idCnr := res.ID()
|
idCnr := res.ID()
|
||||||
|
|
||||||
err = waitForContainerPresence(ctx, c, idCnr, &prm.waitParams)
|
err = waitForContainerPresence(ctx, c, idCnr, &prm.waitParams)
|
||||||
if err = c.handleError(err); err != nil {
|
c.updateErrorRate(err)
|
||||||
|
if err != nil {
|
||||||
return cid.ID{}, fmt.Errorf("wait container presence on client: %w", err)
|
return cid.ID{}, fmt.Errorf("wait container presence on client: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -423,7 +426,8 @@ func (c *clientWrapper) containerGet(ctx context.Context, prm PrmContainerGet) (
|
||||||
start := time.Now()
|
start := time.Now()
|
||||||
res, err := cl.ContainerGet(ctx, cliPrm)
|
res, err := cl.ContainerGet(ctx, cliPrm)
|
||||||
c.incRequests(time.Since(start), methodContainerGet)
|
c.incRequests(time.Since(start), methodContainerGet)
|
||||||
if err = c.handleError(err); err != nil {
|
c.updateErrorRate(err)
|
||||||
|
if err != nil {
|
||||||
return container.Container{}, fmt.Errorf("container get on client: %w", err)
|
return container.Container{}, fmt.Errorf("container get on client: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -443,7 +447,8 @@ func (c *clientWrapper) containerList(ctx context.Context, prm PrmContainerList)
|
||||||
start := time.Now()
|
start := time.Now()
|
||||||
res, err := cl.ContainerList(ctx, cliPrm)
|
res, err := cl.ContainerList(ctx, cliPrm)
|
||||||
c.incRequests(time.Since(start), methodContainerList)
|
c.incRequests(time.Since(start), methodContainerList)
|
||||||
if err = c.handleError(err); err != nil {
|
c.updateErrorRate(err)
|
||||||
|
if err != nil {
|
||||||
return nil, fmt.Errorf("container list on client: %w", err)
|
return nil, fmt.Errorf("container list on client: %w", err)
|
||||||
}
|
}
|
||||||
return res.Containers(), nil
|
return res.Containers(), nil
|
||||||
|
@ -466,7 +471,8 @@ func (c *clientWrapper) containerDelete(ctx context.Context, prm PrmContainerDel
|
||||||
start := time.Now()
|
start := time.Now()
|
||||||
err = cl.ContainerDelete(ctx, cliPrm)
|
err = cl.ContainerDelete(ctx, cliPrm)
|
||||||
c.incRequests(time.Since(start), methodContainerDelete)
|
c.incRequests(time.Since(start), methodContainerDelete)
|
||||||
if err = c.handleError(err); err != nil {
|
c.updateErrorRate(err)
|
||||||
|
if err != nil {
|
||||||
return fmt.Errorf("container delete on client: %w", err)
|
return fmt.Errorf("container delete on client: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -490,7 +496,8 @@ func (c *clientWrapper) containerEACL(ctx context.Context, prm PrmContainerEACL)
|
||||||
start := time.Now()
|
start := time.Now()
|
||||||
res, err := cl.ContainerEACL(ctx, cliPrm)
|
res, err := cl.ContainerEACL(ctx, cliPrm)
|
||||||
c.incRequests(time.Since(start), methodContainerEACL)
|
c.incRequests(time.Since(start), methodContainerEACL)
|
||||||
if err = c.handleError(err); err != nil {
|
c.updateErrorRate(err)
|
||||||
|
if err != nil {
|
||||||
return eacl.Table{}, fmt.Errorf("get eacl on client: %w", err)
|
return eacl.Table{}, fmt.Errorf("get eacl on client: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -515,7 +522,8 @@ func (c *clientWrapper) containerSetEACL(ctx context.Context, prm PrmContainerSe
|
||||||
start := time.Now()
|
start := time.Now()
|
||||||
err = cl.ContainerSetEACL(ctx, cliPrm)
|
err = cl.ContainerSetEACL(ctx, cliPrm)
|
||||||
c.incRequests(time.Since(start), methodContainerSetEACL)
|
c.incRequests(time.Since(start), methodContainerSetEACL)
|
||||||
if err = c.handleError(err); err != nil {
|
c.updateErrorRate(err)
|
||||||
|
if err != nil {
|
||||||
return fmt.Errorf("set eacl on client: %w", err)
|
return fmt.Errorf("set eacl on client: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -529,7 +537,8 @@ func (c *clientWrapper) containerSetEACL(ctx context.Context, prm PrmContainerSe
|
||||||
}
|
}
|
||||||
|
|
||||||
err = waitForEACLPresence(ctx, c, cIDp, &prm.table, &prm.waitParams)
|
err = waitForEACLPresence(ctx, c, cIDp, &prm.table, &prm.waitParams)
|
||||||
if err = c.handleError(err); err != nil {
|
c.updateErrorRate(err)
|
||||||
|
if err != nil {
|
||||||
return fmt.Errorf("wait eacl presence on client: %w", err)
|
return fmt.Errorf("wait eacl presence on client: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -546,7 +555,8 @@ func (c *clientWrapper) endpointInfo(ctx context.Context, _ prmEndpointInfo) (ne
|
||||||
start := time.Now()
|
start := time.Now()
|
||||||
res, err := cl.EndpointInfo(ctx, sdkClient.PrmEndpointInfo{})
|
res, err := cl.EndpointInfo(ctx, sdkClient.PrmEndpointInfo{})
|
||||||
c.incRequests(time.Since(start), methodEndpointInfo)
|
c.incRequests(time.Since(start), methodEndpointInfo)
|
||||||
if err = c.handleError(err); err != nil {
|
c.updateErrorRate(err)
|
||||||
|
if err != nil {
|
||||||
return netmap.NodeInfo{}, fmt.Errorf("endpoint info on client: %w", err)
|
return netmap.NodeInfo{}, fmt.Errorf("endpoint info on client: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -563,7 +573,8 @@ func (c *clientWrapper) networkInfo(ctx context.Context, _ prmNetworkInfo) (netm
|
||||||
start := time.Now()
|
start := time.Now()
|
||||||
res, err := cl.NetworkInfo(ctx, sdkClient.PrmNetworkInfo{})
|
res, err := cl.NetworkInfo(ctx, sdkClient.PrmNetworkInfo{})
|
||||||
c.incRequests(time.Since(start), methodNetworkInfo)
|
c.incRequests(time.Since(start), methodNetworkInfo)
|
||||||
if err = c.handleError(err); err != nil {
|
c.updateErrorRate(err)
|
||||||
|
if err != nil {
|
||||||
return netmap.NetworkInfo{}, fmt.Errorf("network info on client: %w", err)
|
return netmap.NetworkInfo{}, fmt.Errorf("network info on client: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -592,7 +603,8 @@ func (c *clientWrapper) objectPut(ctx context.Context, prm PrmObjectPut) (oid.ID
|
||||||
start := time.Now()
|
start := time.Now()
|
||||||
wObj, err := cl.ObjectPutInit(ctx, cliPrm)
|
wObj, err := cl.ObjectPutInit(ctx, cliPrm)
|
||||||
c.incRequests(time.Since(start), methodObjectPut)
|
c.incRequests(time.Since(start), methodObjectPut)
|
||||||
if err = c.handleError(err); err != nil {
|
c.updateErrorRate(err)
|
||||||
|
if err != nil {
|
||||||
return oid.ID{}, fmt.Errorf("init writing on API client: %w", err)
|
return oid.ID{}, fmt.Errorf("init writing on API client: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -636,13 +648,15 @@ func (c *clientWrapper) objectPut(ctx context.Context, prm PrmObjectPut) (oid.ID
|
||||||
break
|
break
|
||||||
}
|
}
|
||||||
|
|
||||||
return oid.ID{}, fmt.Errorf("read payload: %w", c.handleError(err))
|
c.updateErrorRate(err)
|
||||||
|
return oid.ID{}, fmt.Errorf("read payload: %w", err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
res, err := wObj.Close()
|
res, err := wObj.Close()
|
||||||
if err = c.handleError(err); err != nil { // here err already carries both status and client errors
|
c.updateErrorRate(err)
|
||||||
|
if err != nil { // here err already carries both status and client errors
|
||||||
return oid.ID{}, fmt.Errorf("client failure: %w", err)
|
return oid.ID{}, fmt.Errorf("client failure: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -674,7 +688,8 @@ func (c *clientWrapper) objectDelete(ctx context.Context, prm PrmObjectDelete) e
|
||||||
start := time.Now()
|
start := time.Now()
|
||||||
_, err = cl.ObjectDelete(ctx, cliPrm)
|
_, err = cl.ObjectDelete(ctx, cliPrm)
|
||||||
c.incRequests(time.Since(start), methodObjectDelete)
|
c.incRequests(time.Since(start), methodObjectDelete)
|
||||||
if err = c.handleError(err); err != nil {
|
c.updateErrorRate(err)
|
||||||
|
if err != nil {
|
||||||
return fmt.Errorf("delete object on client: %w", err)
|
return fmt.Errorf("delete object on client: %w", err)
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
|
@ -705,7 +720,8 @@ func (c *clientWrapper) objectGet(ctx context.Context, prm PrmObjectGet) (ResGet
|
||||||
var res ResGetObject
|
var res ResGetObject
|
||||||
|
|
||||||
rObj, err := cl.ObjectGetInit(ctx, cliPrm)
|
rObj, err := cl.ObjectGetInit(ctx, cliPrm)
|
||||||
if err = c.handleError(err); err != nil {
|
c.updateErrorRate(err)
|
||||||
|
if err != nil {
|
||||||
return ResGetObject{}, fmt.Errorf("init object reading on client: %w", err)
|
return ResGetObject{}, fmt.Errorf("init object reading on client: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -714,7 +730,7 @@ func (c *clientWrapper) objectGet(ctx context.Context, prm PrmObjectGet) (ResGet
|
||||||
c.incRequests(time.Since(start), methodObjectGet)
|
c.incRequests(time.Since(start), methodObjectGet)
|
||||||
if !successReadHeader {
|
if !successReadHeader {
|
||||||
err = rObj.Close()
|
err = rObj.Close()
|
||||||
err = c.handleError(err)
|
c.updateErrorRate(err)
|
||||||
return res, fmt.Errorf("read header: %w", err)
|
return res, fmt.Errorf("read header: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -758,7 +774,8 @@ func (c *clientWrapper) objectHead(ctx context.Context, prm PrmObjectHead) (obje
|
||||||
start := time.Now()
|
start := time.Now()
|
||||||
res, err := cl.ObjectHead(ctx, cliPrm)
|
res, err := cl.ObjectHead(ctx, cliPrm)
|
||||||
c.incRequests(time.Since(start), methodObjectHead)
|
c.incRequests(time.Since(start), methodObjectHead)
|
||||||
if err = c.handleError(err); err != nil {
|
c.updateErrorRate(err)
|
||||||
|
if err != nil {
|
||||||
return obj, fmt.Errorf("read object header via client: %w", err)
|
return obj, fmt.Errorf("read object header via client: %w", err)
|
||||||
}
|
}
|
||||||
if !res.ReadHeader(&obj) {
|
if !res.ReadHeader(&obj) {
|
||||||
|
@ -794,7 +811,8 @@ func (c *clientWrapper) objectRange(ctx context.Context, prm PrmObjectRange) (Re
|
||||||
start := time.Now()
|
start := time.Now()
|
||||||
res, err := cl.ObjectRangeInit(ctx, cliPrm)
|
res, err := cl.ObjectRangeInit(ctx, cliPrm)
|
||||||
c.incRequests(time.Since(start), methodObjectRange)
|
c.incRequests(time.Since(start), methodObjectRange)
|
||||||
if err = c.handleError(err); err != nil {
|
c.updateErrorRate(err)
|
||||||
|
if err != nil {
|
||||||
return ResObjectRange{}, fmt.Errorf("init payload range reading on client: %w", err)
|
return ResObjectRange{}, fmt.Errorf("init payload range reading on client: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -831,7 +849,8 @@ func (c *clientWrapper) objectSearch(ctx context.Context, prm PrmObjectSearch) (
|
||||||
}
|
}
|
||||||
|
|
||||||
res, err := cl.ObjectSearchInit(ctx, cliPrm)
|
res, err := cl.ObjectSearchInit(ctx, cliPrm)
|
||||||
if err = c.handleError(err); err != nil {
|
c.updateErrorRate(err)
|
||||||
|
if err != nil {
|
||||||
return ResObjectSearch{}, fmt.Errorf("init object searching on client: %w", err)
|
return ResObjectSearch{}, fmt.Errorf("init object searching on client: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -852,7 +871,8 @@ func (c *clientWrapper) sessionCreate(ctx context.Context, prm prmCreateSession)
|
||||||
start := time.Now()
|
start := time.Now()
|
||||||
res, err := cl.SessionCreate(ctx, cliPrm)
|
res, err := cl.SessionCreate(ctx, cliPrm)
|
||||||
c.incRequests(time.Since(start), methodSessionCreate)
|
c.incRequests(time.Since(start), methodSessionCreate)
|
||||||
if err = c.handleError(err); err != nil {
|
c.updateErrorRate(err)
|
||||||
|
if err != nil {
|
||||||
return resCreateSession{}, fmt.Errorf("session creation on client: %w", err)
|
return resCreateSession{}, fmt.Errorf("session creation on client: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -922,9 +942,9 @@ func (c *clientWrapper) incRequests(elapsed time.Duration, method MethodIndex) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *clientStatusMonitor) handleError(err error) error {
|
func (c *clientStatusMonitor) updateErrorRate(err error) {
|
||||||
if err == nil {
|
if err == nil {
|
||||||
return nil
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
// count only this API errors
|
// count only this API errors
|
||||||
|
@ -933,12 +953,12 @@ func (c *clientStatusMonitor) handleError(err error) error {
|
||||||
errors.Is(err, apistatus.ErrSignatureVerification) ||
|
errors.Is(err, apistatus.ErrSignatureVerification) ||
|
||||||
errors.Is(err, apistatus.ErrNodeUnderMaintenance) {
|
errors.Is(err, apistatus.ErrNodeUnderMaintenance) {
|
||||||
c.incErrorRate()
|
c.incErrorRate()
|
||||||
return err
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
// don't count another API errors
|
// don't count another API errors
|
||||||
if errors.Is(err, apistatus.Error) {
|
if errors.Is(err, apistatus.Error) {
|
||||||
return err
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
// non-status logic error that could be returned
|
// non-status logic error that could be returned
|
||||||
|
@ -948,8 +968,6 @@ func (c *clientStatusMonitor) handleError(err error) error {
|
||||||
if !errors.As(err, &siErr) {
|
if !errors.As(err, &siErr) {
|
||||||
c.incErrorRate()
|
c.incErrorRate()
|
||||||
}
|
}
|
||||||
|
|
||||||
return err
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// clientBuilder is a type alias of client constructors which open connection
|
// clientBuilder is a type alias of client constructors which open connection
|
||||||
|
|
|
@ -578,11 +578,11 @@ func TestHandleError(t *testing.T) {
|
||||||
} {
|
} {
|
||||||
t.Run(strconv.Itoa(i), func(t *testing.T) {
|
t.Run(strconv.Itoa(i), func(t *testing.T) {
|
||||||
errCount := monitor.currentErrorRate()
|
errCount := monitor.currentErrorRate()
|
||||||
err := monitor.handleError(tc.err)
|
monitor.updateErrorRate(tc.err)
|
||||||
if tc.expectedError {
|
if tc.expectedError {
|
||||||
require.Error(t, err)
|
require.Error(t, tc.err)
|
||||||
} else {
|
} else {
|
||||||
require.NoError(t, err)
|
require.NoError(t, tc.err)
|
||||||
}
|
}
|
||||||
if tc.countError {
|
if tc.countError {
|
||||||
errCount++
|
errCount++
|
||||||
|
|
Loading…
Reference in a new issue