[#283] pool: Add counter for errors

Signed-off-by: Denis Kirillov <denis@nspcc.ru>
Denis Kirillov 2022-07-12 21:55:59 +03:00 committed by fyrchik
parent 7d10b432d1
commit e6cb5f2ee1
2 changed files with 139 additions and 79 deletions

go.mod

@ -14,5 +14,6 @@ require (
github.com/nspcc-dev/neofs-contract v0.15.1
github.com/nspcc-dev/tzhash v1.6.1
github.com/stretchr/testify v1.7.0
go.uber.org/atomic v1.9.0
go.uber.org/zap v1.18.1
)
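The go.uber.org/atomic dependency added above supplies the lock-free primitives the pool now keeps per client: a health flag and an error counter. Below is a minimal standalone sketch of how these types behave; the variable names are illustrative, not taken from the SDK.

package main

import (
	"fmt"

	"go.uber.org/atomic"
)

func main() {
	healthy := atomic.NewBool(true)   // per-client health flag
	errorCount := atomic.NewUint32(0) // per-client error counter

	errorCount.Inc() // bump on a failed request

	// Swap returns the previous value, so "Swap(val) != val" reports whether
	// the stored value actually changed; this is the idiom setHealthy relies on.
	val := false
	changed := healthy.Swap(val) != val
	fmt.Println(changed)           // true
	fmt.Println(errorCount.Load()) // 1

	errorCount.Store(0) // reset, as resetErrorCounter does
}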

pool/pool.go

@ -18,6 +18,7 @@ import (
"github.com/nspcc-dev/neofs-sdk-go/accounting"
"github.com/nspcc-dev/neofs-sdk-go/bearer"
sdkClient "github.com/nspcc-dev/neofs-sdk-go/client"
apistatus "github.com/nspcc-dev/neofs-sdk-go/client/status"
"github.com/nspcc-dev/neofs-sdk-go/container"
cid "github.com/nspcc-dev/neofs-sdk-go/container/id"
neofsecdsa "github.com/nspcc-dev/neofs-sdk-go/crypto/ecdsa"
@ -27,6 +28,7 @@ import (
oid "github.com/nspcc-dev/neofs-sdk-go/object/id"
"github.com/nspcc-dev/neofs-sdk-go/session"
"github.com/nspcc-dev/neofs-sdk-go/user"
"go.uber.org/atomic"
"go.uber.org/zap"
)
@ -48,12 +50,21 @@ type client interface {
objectRange(context.Context, PrmObjectRange) (*ResObjectRange, error)
objectSearch(context.Context, PrmObjectSearch) (*ResObjectSearch, error)
sessionCreate(context.Context, prmCreateSession) (*resCreateSession, error)
isHealthy() bool
setHealthy(bool) bool
address() string
errorRate() uint32
resetErrorCounter()
}
// clientWrapper is used by default; alternative implementations are intended for testing purposes only.
type clientWrapper struct {
client sdkClient.Client
key ecdsa.PrivateKey
client sdkClient.Client
key ecdsa.PrivateKey
addr string
healthy *atomic.Bool
errorCount *atomic.Uint32
}
type wrapperPrm struct {
@ -81,11 +92,16 @@ func (x *wrapperPrm) setResponseInfoCallback(f func(sdkClient.ResponseMetaInfo)
func newWrapper(prm wrapperPrm) (*clientWrapper, error) {
var prmInit sdkClient.PrmInit
prmInit.ResolveNeoFSFailures()
//prmInit.ResolveNeoFSFailures()
prmInit.SetDefaultPrivateKey(prm.key)
prmInit.SetResponseInfoCallback(prm.responseInfoCallback)
res := &clientWrapper{key: prm.key}
res := &clientWrapper{
addr: prm.address,
key: prm.key,
healthy: atomic.NewBool(true),
errorCount: atomic.NewUint32(0),
}
res.client.Init(prmInit)
@ -106,23 +122,28 @@ func (c *clientWrapper) balanceGet(ctx context.Context, prm PrmBalanceGet) (*acc
cliPrm.SetAccount(prm.account)
res, err := c.client.BalanceGet(ctx, cliPrm)
if err != nil {
return nil, err
if err = c.handleError(res.Status(), err); err != nil {
return nil, fmt.Errorf("balance get on client: %w", err)
}
return res.Amount(), nil
}
func (c *clientWrapper) containerPut(ctx context.Context, prm PrmContainerPut) (*cid.ID, error) {
res, err := c.client.ContainerPut(ctx, prm.prmClient)
if err != nil {
return nil, err
if err = c.handleError(res.Status(), err); err != nil {
return nil, fmt.Errorf("container put on client: %w", err)
}
if !prm.waitParamsSet {
prm.waitParams.setDefaults()
}
return res.ID(), waitForContainerPresence(ctx, c, res.ID(), &prm.waitParams)
err = waitForContainerPresence(ctx, c, res.ID(), &prm.waitParams)
if err = c.handleError(nil, err); err != nil {
return nil, fmt.Errorf("wait container presence on client: %w", err)
}
return res.ID(), nil
}
func (c *clientWrapper) containerGet(ctx context.Context, prm PrmContainerGet) (*container.Container, error) {
@ -130,8 +151,8 @@ func (c *clientWrapper) containerGet(ctx context.Context, prm PrmContainerGet) (
cliPrm.SetContainer(prm.cnrID)
res, err := c.client.ContainerGet(ctx, cliPrm)
if err != nil {
return nil, err
if err = c.handleError(res.Status(), err); err != nil {
return nil, fmt.Errorf("container get on client: %w", err)
}
cnr := res.Container()
@ -144,8 +165,8 @@ func (c *clientWrapper) containerList(ctx context.Context, prm PrmContainerList)
cliPrm.SetAccount(prm.ownerID)
res, err := c.client.ContainerList(ctx, cliPrm)
if err != nil {
return nil, err
if err = c.handleError(res.Status(), err); err != nil {
return nil, fmt.Errorf("container list on client: %w", err)
}
return res.Containers(), nil
}
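Each call site above now wraps the counted error with fmt.Errorf and the %w verb, so callers can still match the underlying error after the context string is added. A short standalone illustration of why %w matters; the sentinel error here is hypothetical, not an SDK error.

package main

import (
	"errors"
	"fmt"
)

// errNodeUnavailable is a made-up low-level error used only for this example.
var errNodeUnavailable = errors.New("node unavailable")

func containerList() error {
	// Mimic the pattern: annotate with context, keep the original error reachable.
	return fmt.Errorf("container list on client: %w", errNodeUnavailable)
}

func main() {
	err := containerList()
	fmt.Println(err)                                // container list on client: node unavailable
	fmt.Println(errors.Is(err, errNodeUnavailable)) // true, thanks to %w
}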
@ -157,8 +178,9 @@ func (c *clientWrapper) containerDelete(ctx context.Context, prm PrmContainerDel
cliPrm.WithinSession(prm.stoken)
}
if _, err := c.client.ContainerDelete(ctx, cliPrm); err != nil {
return err
res, err := c.client.ContainerDelete(ctx, cliPrm)
if err = c.handleError(res.Status(), err); err != nil {
return fmt.Errorf("container delete on client: %w", err)
}
if !prm.waitParamsSet {
@ -173,8 +195,8 @@ func (c *clientWrapper) containerEACL(ctx context.Context, prm PrmContainerEACL)
cliPrm.SetContainer(prm.cnrID)
res, err := c.client.ContainerEACL(ctx, cliPrm)
if err != nil {
return nil, err
if err = c.handleError(res.Status(), err); err != nil {
return nil, fmt.Errorf("get eacl on client: %w", err)
}
return res.Table(), nil
}
@ -187,8 +209,9 @@ func (c *clientWrapper) containerSetEACL(ctx context.Context, prm PrmContainerSe
cliPrm.WithinSession(prm.session)
}
if _, err := c.client.ContainerSetEACL(ctx, cliPrm); err != nil {
return err
res, err := c.client.ContainerSetEACL(ctx, cliPrm)
if err = c.handleError(res.Status(), err); err != nil {
return fmt.Errorf("set eacl on client: %w", err)
}
if !prm.waitParamsSet {
@ -200,21 +223,26 @@ func (c *clientWrapper) containerSetEACL(ctx context.Context, prm PrmContainerSe
cIDp = &cID
}
return waitForEACLPresence(ctx, c, cIDp, &prm.table, &prm.waitParams)
err = waitForEACLPresence(ctx, c, cIDp, &prm.table, &prm.waitParams)
if err = c.handleError(nil, err); err != nil {
return fmt.Errorf("wait eacl presence on client: %w", err)
}
return nil
}
func (c *clientWrapper) endpointInfo(ctx context.Context, _ prmEndpointInfo) (*netmap.NodeInfo, error) {
res, err := c.client.EndpointInfo(ctx, sdkClient.PrmEndpointInfo{})
if err != nil {
return nil, err
if err = c.handleError(res.Status(), err); err != nil {
return nil, fmt.Errorf("endpoint info on client: %w", err)
}
return res.NodeInfo(), nil
}
func (c *clientWrapper) networkInfo(ctx context.Context, _ prmNetworkInfo) (*netmap.NetworkInfo, error) {
res, err := c.client.NetworkInfo(ctx, sdkClient.PrmNetworkInfo{})
if err != nil {
return nil, err
if err = c.handleError(res.Status(), err); err != nil {
return nil, fmt.Errorf("network info on client: %w", err)
}
return res.Info(), nil
}
@ -222,7 +250,7 @@ func (c *clientWrapper) networkInfo(ctx context.Context, _ prmNetworkInfo) (*net
func (c *clientWrapper) objectPut(ctx context.Context, prm PrmObjectPut) (*oid.ID, error) {
var cliPrm sdkClient.PrmObjectPutInit
wObj, err := c.client.ObjectPutInit(ctx, cliPrm)
if err != nil {
if err = c.handleError(nil, err); err != nil {
return nil, fmt.Errorf("init writing on API client: %w", err)
}
@ -274,13 +302,13 @@ func (c *clientWrapper) objectPut(ctx context.Context, prm PrmObjectPut) (*oid.I
break
}
return nil, fmt.Errorf("read payload: %w", err)
return nil, fmt.Errorf("read payload: %w", c.handleError(nil, err))
}
}
}
res, err := wObj.Close()
if err != nil { // here err already carries both status and client errors
if err = c.handleError(res.Status(), err); err != nil { // here err already carries both status and client errors
return nil, fmt.Errorf("client failure: %w", err)
}
@ -309,8 +337,11 @@ func (c *clientWrapper) objectDelete(ctx context.Context, prm PrmObjectDelete) e
if prm.key != nil {
cliPrm.UseKey(*prm.key)
}
_, err := c.client.ObjectDelete(ctx, cliPrm)
return err
res, err := c.client.ObjectDelete(ctx, cliPrm)
if err = c.handleError(res.Status(), err); err != nil {
return fmt.Errorf("delete object on client: %w", err)
}
return nil
}
func (c *clientWrapper) objectGet(ctx context.Context, prm PrmObjectGet) (*ResGetObject, error) {
@ -329,7 +360,7 @@ func (c *clientWrapper) objectGet(ctx context.Context, prm PrmObjectGet) (*ResGe
var res ResGetObject
rObj, err := c.client.ObjectGetInit(ctx, cliPrm)
if err != nil {
if err = c.handleError(nil, err); err != nil {
return nil, fmt.Errorf("init object reading on client: %w", err)
}
@ -338,7 +369,8 @@ func (c *clientWrapper) objectGet(ctx context.Context, prm PrmObjectGet) (*ResGe
}
if !rObj.ReadHeader(&res.Header) {
_, err = rObj.Close()
rObjRes, err := rObj.Close()
err = c.handleError(rObjRes.Status(), err)
return nil, fmt.Errorf("read header: %w", err)
}
@ -367,7 +399,7 @@ func (c *clientWrapper) objectHead(ctx context.Context, prm PrmObjectHead) (*obj
var obj object.Object
res, err := c.client.ObjectHead(ctx, cliPrm)
if err != nil {
if err = c.handleError(res.Status(), err); err != nil {
return nil, fmt.Errorf("read object header via client: %w", err)
}
if !res.ReadHeader(&obj) {
@ -393,7 +425,7 @@ func (c *clientWrapper) objectRange(ctx context.Context, prm PrmObjectRange) (*R
}
res, err := c.client.ObjectRangeInit(ctx, cliPrm)
if err != nil {
if err = c.handleError(nil, err); err != nil {
return nil, fmt.Errorf("init payload range reading on client: %w", err)
}
if prm.key != nil {
@ -418,7 +450,7 @@ func (c *clientWrapper) objectSearch(ctx context.Context, prm PrmObjectSearch) (
}
res, err := c.client.ObjectSearchInit(ctx, cliPrm)
if err != nil {
if err = c.handleError(nil, err); err != nil {
return nil, fmt.Errorf("init object searching on client: %w", err)
}
if prm.key != nil {
@ -434,7 +466,7 @@ func (c *clientWrapper) sessionCreate(ctx context.Context, prm prmCreateSession)
cliPrm.UseKey(prm.key)
res, err := c.client.SessionCreate(ctx, cliPrm)
if err != nil {
if err = c.handleError(res.Status(), err); err != nil {
return nil, fmt.Errorf("session creation on client: %w", err)
}
@ -444,6 +476,43 @@ func (c *clientWrapper) sessionCreate(ctx context.Context, prm prmCreateSession)
}, nil
}
func (c *clientWrapper) isHealthy() bool {
return c.healthy.Load()
}
func (c *clientWrapper) setHealthy(val bool) bool {
return c.healthy.Swap(val) != val
}
func (c *clientWrapper) address() string {
return c.addr
}
func (c *clientWrapper) errorRate() uint32 {
return c.errorCount.Load()
}
func (c *clientWrapper) resetErrorCounter() {
c.errorCount.Store(0)
}
func (c *clientWrapper) handleError(st apistatus.Status, err error) error {
if err != nil {
c.errorCount.Inc()
return err
}
err = apistatus.ErrFromStatus(st)
switch err.(type) {
case apistatus.ServerInternal,
apistatus.WrongMagicNumber,
apistatus.SignatureVerification:
c.errorCount.Inc()
}
return err
}
// InitParameters contains values used to initialize connection Pool.
type InitParameters struct {
key *ecdsa.PrivateKey
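handleError above is the single place where the counter grows: any transport error is counted, while status responses are counted only when they indicate a severe server-side condition (internal error, wrong magic number, failed signature verification). The sketch below shows that classification in a simplified, self-contained form; the status type and the severity set are stand-ins for the SDK's apistatus package, not its real API.

package main

import (
	"errors"
	"fmt"

	"go.uber.org/atomic"
)

// status stands in for apistatus.Status in this sketch.
type status int

const (
	statusOK status = iota
	statusServerInternal
	statusWrongMagicNumber
	statusSignatureVerification
)

type wrapper struct {
	errorCount *atomic.Uint32
}

func (w *wrapper) handleError(st status, err error) error {
	if err != nil { // transport-level failure: always counted
		w.errorCount.Inc()
		return err
	}
	switch st { // severe statuses are counted as well
	case statusServerInternal, statusWrongMagicNumber, statusSignatureVerification:
		w.errorCount.Inc()
		return fmt.Errorf("status error: %d", st)
	}
	return nil
}

func main() {
	w := &wrapper{errorCount: atomic.NewUint32(0)}
	_ = w.handleError(statusOK, errors.New("connection refused")) // counted
	_ = w.handleError(statusServerInternal, nil)                  // counted
	_ = w.handleError(statusOK, nil)                              // not counted
	fmt.Println(w.errorCount.Load())                              // 2
}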
@ -570,12 +639,6 @@ func (x *WaitParams) checkForPositive() {
}
}
type clientPack struct {
client client
healthy bool
address string
}
type prmContext struct {
defaultSession bool
verb session.ObjectVerb
@ -926,9 +989,9 @@ type Pool struct {
}
type innerPool struct {
lock sync.RWMutex
sampler *sampler
clientPacks []*clientPack
lock sync.RWMutex
sampler *sampler
clients []client
}
const (
@ -986,7 +1049,7 @@ func (p *Pool) Dial(ctx context.Context) error {
var atLeastOneHealthy bool
for i, params := range p.rebalanceParams.nodesParams {
clientPacks := make([]*clientPack, len(params.weights))
clients := make([]client, len(params.weights))
for j, addr := range params.addresses {
c, err := p.clientBuilder(addr)
if err != nil {
@ -1003,14 +1066,15 @@ func (p *Pool) Dial(ctx context.Context) error {
healthy, atLeastOneHealthy = true, true
_ = p.cache.Put(formCacheKey(addr, p.key), st)
}
clientPacks[j] = &clientPack{client: c, healthy: healthy, address: addr}
c.setHealthy(healthy)
clients[j] = c
}
source := rand.NewSource(time.Now().UnixNano())
sampl := newSampler(params.weights, source)
inner[i] = &innerPool{
sampler: sampl,
clientPacks: clientPacks,
sampler: sampl,
clients: clients,
}
}
@ -1124,12 +1188,12 @@ func (p *Pool) updateInnerNodesHealth(ctx context.Context, i int, bufferWeights
pool := p.innerPools[i]
options := p.rebalanceParams
healthyChanged := false
healthyChanged := atomic.NewBool(false)
wg := sync.WaitGroup{}
var prmEndpoint prmEndpointInfo
for j, cPack := range pool.clientPacks {
for j, cli := range pool.clients {
wg.Add(1)
go func(j int, cli client) {
defer wg.Done()
@ -1137,31 +1201,26 @@ func (p *Pool) updateInnerNodesHealth(ctx context.Context, i int, bufferWeights
tctx, c := context.WithTimeout(ctx, options.nodeRequestTimeout)
defer c()
// TODO (@kirillovdenis): #283 consider reconnecting to the node on failure
if _, err := cli.endpointInfo(tctx, prmEndpoint); err != nil {
ok = false
bufferWeights[j] = 0
}
pool.lock.RLock()
cp := *pool.clientPacks[j]
pool.lock.RUnlock()
if ok {
bufferWeights[j] = options.nodesParams[i].weights[j]
} else {
p.cache.DeleteByPrefix(cp.address)
p.cache.DeleteByPrefix(cli.address())
}
pool.lock.Lock()
if pool.clientPacks[j].healthy != ok {
pool.clientPacks[j].healthy = ok
healthyChanged = true
if cli.setHealthy(ok) {
healthyChanged.Store(true)
}
pool.lock.Unlock()
}(j, cPack.client)
}(j, cli)
}
wg.Wait()
if healthyChanged {
if healthyChanged.Load() {
probabilities := adjustWeights(bufferWeights)
source := rand.NewSource(time.Now().UnixNano())
pool.lock.Lock()
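The rebalance loop above probes every client concurrently and only needs to know whether any health flag flipped; replacing the mutex-guarded bool with atomic values lets each goroutine report a change without taking pool.lock. A reduced sketch of that pattern follows, with a fake probe standing in for endpointInfo.

package main

import (
	"fmt"
	"sync"

	"go.uber.org/atomic"
)

type node struct {
	healthy *atomic.Bool
}

// setHealthy mirrors clientWrapper.setHealthy: it reports whether the state changed.
func (n *node) setHealthy(ok bool) bool { return n.healthy.Swap(ok) != ok }

func main() {
	nodes := []*node{
		{healthy: atomic.NewBool(true)},
		{healthy: atomic.NewBool(true)},
	}
	probe := func(i int) bool { return i != 1 } // fake check: node 1 is down

	healthyChanged := atomic.NewBool(false)
	var wg sync.WaitGroup
	for i, n := range nodes {
		wg.Add(1)
		go func(i int, n *node) {
			defer wg.Done()
			if n.setHealthy(probe(i)) {
				healthyChanged.Store(true)
			}
		}(i, n)
	}
	wg.Wait()

	if healthyChanged.Load() {
		fmt.Println("health state changed; weights would be recalculated here")
	}
}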
@ -1185,7 +1244,7 @@ func adjustWeights(weights []float64) []float64 {
return adjusted
}
func (p *Pool) connection() (*clientPack, error) {
func (p *Pool) connection() (client, error) {
for _, inner := range p.innerPools {
cp, err := inner.connection()
if err == nil {
@ -1196,20 +1255,20 @@ func (p *Pool) connection() (*clientPack, error) {
return nil, errors.New("no healthy client")
}
func (p *innerPool) connection() (*clientPack, error) {
p.lock.RLock()
func (p *innerPool) connection() (client, error) {
p.lock.RLock() // TODO(@kirillovdenis): #283 consider removing this lock because the client should be thread-safe
defer p.lock.RUnlock()
if len(p.clientPacks) == 1 {
cp := p.clientPacks[0]
if cp.healthy {
if len(p.clients) == 1 {
cp := p.clients[0]
if cp.isHealthy() {
return cp, nil
}
return nil, errors.New("no healthy client")
}
attempts := 3 * len(p.clientPacks)
attempts := 3 * len(p.clients)
for k := 0; k < attempts; k++ {
i := p.sampler.Next()
if cp := p.clientPacks[i]; cp.healthy {
if cp := p.clients[i]; cp.isHealthy() {
return cp, nil
}
}
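innerPool.connection above keeps the existing selection strategy: with a single client, return it if it is healthy; otherwise sample up to 3*len(clients) candidates and return the first healthy one. A standalone sketch of that loop, with a uniform random pick standing in for the pool's weighted sampler and made-up addresses:

package main

import (
	"errors"
	"fmt"
	"math/rand"
)

type client struct {
	addr    string
	healthy bool
}

func (c *client) isHealthy() bool { return c.healthy }

// connection picks a healthy client, retrying a bounded number of samples.
func connection(clients []*client) (*client, error) {
	if len(clients) == 1 {
		if clients[0].isHealthy() {
			return clients[0], nil
		}
		return nil, errors.New("no healthy client")
	}
	attempts := 3 * len(clients)
	for k := 0; k < attempts; k++ {
		i := rand.Intn(len(clients)) // stand-in for the weighted sampler.Next()
		if c := clients[i]; c.isHealthy() {
			return c, nil
		}
	}
	return nil, errors.New("no healthy client")
}

func main() {
	clients := []*client{
		{addr: "s01.example:8080", healthy: false},
		{addr: "s02.example:8080", healthy: true},
	}
	c, err := connection(clients)
	if err != nil {
		fmt.Println(err)
		return
	}
	fmt.Println("picked", c.addr)
}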
@ -1312,8 +1371,8 @@ func (p *Pool) initCallContext(ctx *callContext, cfg prmCommon, prmCtx prmContex
ctx.key = p.key
}
ctx.endpoint = cp.address
ctx.client = cp.client
ctx.endpoint = cp.address()
ctx.client = cp
if ctx.sessionTarget != nil && cfg.stoken != nil {
ctx.sessionTarget(*cfg.stoken)
@ -1656,7 +1715,7 @@ func (p *Pool) PutContainer(ctx context.Context, prm PrmContainerPut) (*cid.ID,
return nil, err
}
return cp.client.containerPut(ctx, prm)
return cp.containerPut(ctx, prm)
}
// GetContainer reads NeoFS container by ID.
@ -1666,7 +1725,7 @@ func (p *Pool) GetContainer(ctx context.Context, prm PrmContainerGet) (*containe
return nil, err
}
return cp.client.containerGet(ctx, prm)
return cp.containerGet(ctx, prm)
}
// ListContainers requests identifiers of the account-owned containers.
@ -1676,7 +1735,7 @@ func (p *Pool) ListContainers(ctx context.Context, prm PrmContainerList) ([]cid.
return nil, err
}
return cp.client.containerList(ctx, prm)
return cp.containerList(ctx, prm)
}
// DeleteContainer sends request to remove the NeoFS container and waits for the operation to complete.
@ -1692,7 +1751,7 @@ func (p *Pool) DeleteContainer(ctx context.Context, prm PrmContainerDelete) erro
return err
}
return cp.client.containerDelete(ctx, prm)
return cp.containerDelete(ctx, prm)
}
// GetEACL reads eACL table of the NeoFS container.
@ -1702,7 +1761,7 @@ func (p *Pool) GetEACL(ctx context.Context, prm PrmContainerEACL) (*eacl.Table,
return nil, err
}
return cp.client.containerEACL(ctx, prm)
return cp.containerEACL(ctx, prm)
}
// SetEACL sends request to update eACL table of the NeoFS container and waits for the operation to complete.
@ -1718,7 +1777,7 @@ func (p *Pool) SetEACL(ctx context.Context, prm PrmContainerSetEACL) error {
return err
}
return cp.client.containerSetEACL(ctx, prm)
return cp.containerSetEACL(ctx, prm)
}
// Balance requests current balance of the NeoFS account.
@ -1728,7 +1787,7 @@ func (p *Pool) Balance(ctx context.Context, prm PrmBalanceGet) (*accounting.Deci
return nil, err
}
return cp.client.balanceGet(ctx, prm)
return cp.balanceGet(ctx, prm)
}
// waitForContainerPresence waits until the container is found on the NeoFS network.
@ -1803,7 +1862,7 @@ func (p *Pool) NetworkInfo(ctx context.Context) (*netmap.NetworkInfo, error) {
return nil, err
}
return cp.client.networkInfo(ctx, prmNetworkInfo{})
return cp.networkInfo(ctx, prmNetworkInfo{})
}
// Close closes the Pool and releases all the associated resources.