From e6cb5f2ee171a56a52394d456bc46d9ab4e9aa27 Mon Sep 17 00:00:00 2001
From: Denis Kirillov
Date: Tue, 12 Jul 2022 21:55:59 +0300
Subject: [PATCH] [#283] pool: Add counter for errors

Signed-off-by: Denis Kirillov
---
 go.mod       |   1 +
 pool/pool.go | 217 ++++++++++++++++++++++++++++++++-------------------
 2 files changed, 139 insertions(+), 79 deletions(-)

diff --git a/go.mod b/go.mod
index 9e00e9a..b2890d0 100644
--- a/go.mod
+++ b/go.mod
@@ -14,5 +14,6 @@ require (
 	github.com/nspcc-dev/neofs-contract v0.15.1
 	github.com/nspcc-dev/tzhash v1.6.1
 	github.com/stretchr/testify v1.7.0
+	go.uber.org/atomic v1.9.0
 	go.uber.org/zap v1.18.1
 )
diff --git a/pool/pool.go b/pool/pool.go
index 42caaf5..03aaccc 100644
--- a/pool/pool.go
+++ b/pool/pool.go
@@ -18,6 +18,7 @@ import (
 	"github.com/nspcc-dev/neofs-sdk-go/accounting"
 	"github.com/nspcc-dev/neofs-sdk-go/bearer"
 	sdkClient "github.com/nspcc-dev/neofs-sdk-go/client"
+	apistatus "github.com/nspcc-dev/neofs-sdk-go/client/status"
 	"github.com/nspcc-dev/neofs-sdk-go/container"
 	cid "github.com/nspcc-dev/neofs-sdk-go/container/id"
 	neofsecdsa "github.com/nspcc-dev/neofs-sdk-go/crypto/ecdsa"
@@ -27,6 +28,7 @@ import (
 	oid "github.com/nspcc-dev/neofs-sdk-go/object/id"
 	"github.com/nspcc-dev/neofs-sdk-go/session"
 	"github.com/nspcc-dev/neofs-sdk-go/user"
+	"go.uber.org/atomic"
 	"go.uber.org/zap"
 )
 
@@ -48,12 +50,21 @@ type client interface {
 	objectRange(context.Context, PrmObjectRange) (*ResObjectRange, error)
 	objectSearch(context.Context, PrmObjectSearch) (*ResObjectSearch, error)
 	sessionCreate(context.Context, prmCreateSession) (*resCreateSession, error)
+
+	isHealthy() bool
+	setHealthy(bool) bool
+	address() string
+	errorRate() uint32
+	resetErrorCounter()
 }
 
 // clientWrapper is used by default, alternative implementations are intended for testing purposes only.
 type clientWrapper struct {
-	client sdkClient.Client
-	key    ecdsa.PrivateKey
+	client     sdkClient.Client
+	key        ecdsa.PrivateKey
+	addr       string
+	healthy    *atomic.Bool
+	errorCount *atomic.Uint32
 }
 
 type wrapperPrm struct {
@@ -81,11 +92,16 @@ func (x *wrapperPrm) setResponseInfoCallback(f func(sdkClient.ResponseMetaInfo)
 func newWrapper(prm wrapperPrm) (*clientWrapper, error) {
 	var prmInit sdkClient.PrmInit
-	prmInit.ResolveNeoFSFailures()
+	//prmInit.ResolveNeoFSFailures()
 	prmInit.SetDefaultPrivateKey(prm.key)
 	prmInit.SetResponseInfoCallback(prm.responseInfoCallback)
 
-	res := &clientWrapper{key: prm.key}
+	res := &clientWrapper{
+		addr:       prm.address,
+		key:        prm.key,
+		healthy:    atomic.NewBool(true),
+		errorCount: atomic.NewUint32(0),
+	}
 
 	res.client.Init(prmInit)
 
@@ -106,23 +122,28 @@ func (c *clientWrapper) balanceGet(ctx context.Context, prm PrmBalanceGet) (*acc
 	cliPrm.SetAccount(prm.account)
 
 	res, err := c.client.BalanceGet(ctx, cliPrm)
-	if err != nil {
-		return nil, err
+	if err = c.handleError(res.Status(), err); err != nil {
+		return nil, fmt.Errorf("balance get on client: %w", err)
 	}
 
 	return res.Amount(), nil
 }
 
 func (c *clientWrapper) containerPut(ctx context.Context, prm PrmContainerPut) (*cid.ID, error) {
 	res, err := c.client.ContainerPut(ctx, prm.prmClient)
-	if err != nil {
-		return nil, err
+	if err = c.handleError(res.Status(), err); err != nil {
+		return nil, fmt.Errorf("container put on client: %w", err)
 	}
 
 	if !prm.waitParamsSet {
 		prm.waitParams.setDefaults()
 	}
 
-	return res.ID(), waitForContainerPresence(ctx, c, res.ID(), &prm.waitParams)
+	err = waitForContainerPresence(ctx, c, res.ID(), &prm.waitParams)
+	if err = c.handleError(nil, err); err != nil {
+		return nil, fmt.Errorf("wait container presence on client: %w", err)
+	}
+
+	return res.ID(), nil
 }
 
 func (c *clientWrapper) containerGet(ctx context.Context, prm PrmContainerGet) (*container.Container, error) {
@@ -130,8 +151,8 @@ func (c *clientWrapper) containerGet(ctx context.Context, prm PrmContainerGet) (
 	cliPrm.SetContainer(prm.cnrID)
 
 	res, err := c.client.ContainerGet(ctx, cliPrm)
-	if err != nil {
-		return nil, err
+	if err = c.handleError(res.Status(), err); err != nil {
+		return nil, fmt.Errorf("container get on client: %w", err)
 	}
 
 	cnr := res.Container()
@@ -144,8 +165,8 @@ func (c *clientWrapper) containerList(ctx context.Context, prm PrmContainerList)
 	cliPrm.SetAccount(prm.ownerID)
 
 	res, err := c.client.ContainerList(ctx, cliPrm)
-	if err != nil {
-		return nil, err
+	if err = c.handleError(res.Status(), err); err != nil {
+		return nil, fmt.Errorf("container list on client: %w", err)
 	}
 
 	return res.Containers(), nil
 }
@@ -157,8 +178,9 @@ func (c *clientWrapper) containerDelete(ctx context.Context, prm PrmContainerDel
 		cliPrm.WithinSession(prm.stoken)
 	}
 
-	if _, err := c.client.ContainerDelete(ctx, cliPrm); err != nil {
-		return err
+	res, err := c.client.ContainerDelete(ctx, cliPrm)
+	if err = c.handleError(res.Status(), err); err != nil {
+		return fmt.Errorf("container delete on client: %w", err)
 	}
 
 	if !prm.waitParamsSet {
@@ -173,8 +195,8 @@ func (c *clientWrapper) containerEACL(ctx context.Context, prm PrmContainerEACL)
 	cliPrm.SetContainer(prm.cnrID)
 
 	res, err := c.client.ContainerEACL(ctx, cliPrm)
-	if err != nil {
-		return nil, err
+	if err = c.handleError(res.Status(), err); err != nil {
+		return nil, fmt.Errorf("get eacl on client: %w", err)
 	}
 
 	return res.Table(), nil
 }
@@ -187,8 +209,9 @@ func (c *clientWrapper) containerSetEACL(ctx context.Context, prm PrmContainerSe
 		cliPrm.WithinSession(prm.session)
 	}
 
-	if _, err := c.client.ContainerSetEACL(ctx, cliPrm); err != nil {
-		return err
+	res, err := c.client.ContainerSetEACL(ctx, cliPrm)
+	if err = c.handleError(res.Status(), err); err != nil {
+		return fmt.Errorf("set eacl on client: %w", err)
 	}
 
 	if !prm.waitParamsSet {
@@ -200,21 +223,26 @@ func (c *clientWrapper) containerSetEACL(ctx context.Context, prm PrmContainerSe
 		cIDp = &cID
 	}
 
-	return waitForEACLPresence(ctx, c, cIDp, &prm.table, &prm.waitParams)
+	err = waitForEACLPresence(ctx, c, cIDp, &prm.table, &prm.waitParams)
+	if err = c.handleError(nil, err); err != nil {
+		return fmt.Errorf("wait eacl presence on client: %w", err)
+	}
+
+	return nil
 }
 
 func (c *clientWrapper) endpointInfo(ctx context.Context, _ prmEndpointInfo) (*netmap.NodeInfo, error) {
 	res, err := c.client.EndpointInfo(ctx, sdkClient.PrmEndpointInfo{})
-	if err != nil {
-		return nil, err
+	if err = c.handleError(res.Status(), err); err != nil {
+		return nil, fmt.Errorf("endpoint info on client: %w", err)
 	}
 
 	return res.NodeInfo(), nil
 }
 
 func (c *clientWrapper) networkInfo(ctx context.Context, _ prmNetworkInfo) (*netmap.NetworkInfo, error) {
 	res, err := c.client.NetworkInfo(ctx, sdkClient.PrmNetworkInfo{})
-	if err != nil {
-		return nil, err
+	if err = c.handleError(res.Status(), err); err != nil {
+		return nil, fmt.Errorf("network info on client: %w", err)
 	}
 
 	return res.Info(), nil
 }
@@ -222,7 +250,7 @@ func (c *clientWrapper) networkInfo(ctx context.Context, _ prmNetworkInfo) (*net
 func (c *clientWrapper) objectPut(ctx context.Context, prm PrmObjectPut) (*oid.ID, error) {
 	var cliPrm sdkClient.PrmObjectPutInit
 	wObj, err := c.client.ObjectPutInit(ctx, cliPrm)
-	if err != nil {
+	if err = c.handleError(nil, err); err != nil {
 		return nil, fmt.Errorf("init writing on API client: %w", err)
 	}
 
@@ -274,13 +302,13 @@ func (c *clientWrapper) objectPut(ctx context.Context, prm PrmObjectPut) (*oid.I
 					break
 				}
 
-				return nil, fmt.Errorf("read payload: %w", err)
+				return nil, fmt.Errorf("read payload: %w", c.handleError(nil, err))
 			}
 		}
 	}
 
 	res, err := wObj.Close()
-	if err != nil { // here err already carries both status and client errors
+	if err = c.handleError(res.Status(), err); err != nil { // here err already carries both status and client errors
 		return nil, fmt.Errorf("client failure: %w", err)
 	}
 
@@ -309,8 +337,11 @@ func (c *clientWrapper) objectDelete(ctx context.Context, prm PrmObjectDelete) e
 	if prm.key != nil {
 		cliPrm.UseKey(*prm.key)
 	}
-	_, err := c.client.ObjectDelete(ctx, cliPrm)
-	return err
+	res, err := c.client.ObjectDelete(ctx, cliPrm)
+	if err = c.handleError(res.Status(), err); err != nil {
+		return fmt.Errorf("delete object on client: %w", err)
+	}
+	return nil
 }
 
 func (c *clientWrapper) objectGet(ctx context.Context, prm PrmObjectGet) (*ResGetObject, error) {
@@ -329,7 +360,7 @@ func (c *clientWrapper) objectGet(ctx context.Context, prm PrmObjectGet) (*ResGe
 	var res ResGetObject
 
 	rObj, err := c.client.ObjectGetInit(ctx, cliPrm)
-	if err != nil {
+	if err = c.handleError(nil, err); err != nil {
 		return nil, fmt.Errorf("init object reading on client: %w", err)
 	}
 
@@ -338,7 +369,8 @@ func (c *clientWrapper) objectGet(ctx context.Context, prm PrmObjectGet) (*ResGe
 	}
 
 	if !rObj.ReadHeader(&res.Header) {
-		_, err = rObj.Close()
+		rObjRes, err := rObj.Close()
+		err = c.handleError(rObjRes.Status(), err)
 		return nil, fmt.Errorf("read header: %w", err)
 	}
 
@@ -367,7 +399,7 @@ func (c *clientWrapper) objectHead(ctx context.Context, prm PrmObjectHead) (*obj
 	var obj object.Object
 
 	res, err := c.client.ObjectHead(ctx, cliPrm)
-	if err != nil {
+	if err = c.handleError(res.Status(), err); err != nil {
 		return nil, fmt.Errorf("read object header via client: %w", err)
 	}
 	if !res.ReadHeader(&obj) {
@@ -393,7 +425,7 @@ func (c *clientWrapper) objectRange(ctx context.Context, prm PrmObjectRange) (*R
 	}
 
 	res, err := c.client.ObjectRangeInit(ctx, cliPrm)
-	if err != nil {
+	if err = c.handleError(nil, err); err != nil {
 		return nil, fmt.Errorf("init payload range reading on client: %w", err)
 	}
 	if prm.key != nil {
@@ -418,7 +450,7 @@ func (c *clientWrapper) objectSearch(ctx context.Context, prm PrmObjectSearch) (
 	}
 
 	res, err := c.client.ObjectSearchInit(ctx, cliPrm)
-	if err != nil {
+	if err = c.handleError(nil, err); err != nil {
 		return nil, fmt.Errorf("init object searching on client: %w", err)
 	}
 	if prm.key != nil {
@@ -434,7 +466,7 @@ func (c *clientWrapper) sessionCreate(ctx context.Context, prm prmCreateSession)
 	cliPrm.UseKey(prm.key)
 
 	res, err := c.client.SessionCreate(ctx, cliPrm)
-	if err != nil {
+	if err = c.handleError(res.Status(), err); err != nil {
 		return nil, fmt.Errorf("session creation on client: %w", err)
 	}
 
@@ -444,6 +476,43 @@ func (c *clientWrapper) sessionCreate(ctx context.Context, prm prmCreateSession)
 	}, nil
 }
 
+func (c *clientWrapper) isHealthy() bool {
+	return c.healthy.Load()
+}
+
+func (c *clientWrapper) setHealthy(val bool) bool {
+	return c.healthy.Swap(val) != val
+}
+
+func (c *clientWrapper) address() string {
+	return c.addr
+}
+
+func (c *clientWrapper) errorRate() uint32 {
+	return c.errorCount.Load()
+}
+
+func (c *clientWrapper) resetErrorCounter() {
+	c.errorCount.Store(0)
+}
+
+func (c *clientWrapper) handleError(st apistatus.Status, err error) error {
+	if err != nil {
+		c.errorCount.Inc()
+		return err
+	}
+
+	err = apistatus.ErrFromStatus(st)
+	switch err.(type) {
+	case apistatus.ServerInternal,
+		apistatus.WrongMagicNumber,
+		apistatus.SignatureVerification:
+		c.errorCount.Inc()
+	}
+
+	return err
+}
+
 // InitParameters contains values used to initialize connection Pool.
 type InitParameters struct {
 	key *ecdsa.PrivateKey
@@ -570,12 +639,6 @@ func (x *WaitParams) checkForPositive() {
 	}
 }
 
-type clientPack struct {
-	client  client
-	healthy bool
-	address string
-}
-
 type prmContext struct {
 	defaultSession bool
 	verb           session.ObjectVerb
@@ -926,9 +989,9 @@ type Pool struct {
 }
 
 type innerPool struct {
-	lock        sync.RWMutex
-	sampler     *sampler
-	clientPacks []*clientPack
+	lock    sync.RWMutex
+	sampler *sampler
+	clients []client
 }
 
 const (
@@ -986,7 +1049,7 @@ func (p *Pool) Dial(ctx context.Context) error {
 	var atLeastOneHealthy bool
 
 	for i, params := range p.rebalanceParams.nodesParams {
-		clientPacks := make([]*clientPack, len(params.weights))
+		clients := make([]client, len(params.weights))
 		for j, addr := range params.addresses {
 			c, err := p.clientBuilder(addr)
 			if err != nil {
@@ -1003,14 +1066,15 @@ func (p *Pool) Dial(ctx context.Context) error {
 				healthy, atLeastOneHealthy = true, true
 				_ = p.cache.Put(formCacheKey(addr, p.key), st)
 			}
-			clientPacks[j] = &clientPack{client: c, healthy: healthy, address: addr}
+			c.setHealthy(healthy)
+			clients[j] = c
 		}
 
 		source := rand.NewSource(time.Now().UnixNano())
 		sampl := newSampler(params.weights, source)
 
 		inner[i] = &innerPool{
-			sampler:     sampl,
-			clientPacks: clientPacks,
+			sampler: sampl,
+			clients: clients,
 		}
 	}
 
@@ -1124,12 +1188,12 @@ func (p *Pool) updateInnerNodesHealth(ctx context.Context, i int, bufferWeights
 	pool := p.innerPools[i]
 	options := p.rebalanceParams
 
-	healthyChanged := false
+	healthyChanged := atomic.NewBool(false)
 	wg := sync.WaitGroup{}
 
 	var prmEndpoint prmEndpointInfo
 
-	for j, cPack := range pool.clientPacks {
+	for j, cli := range pool.clients {
 		wg.Add(1)
 		go func(j int, cli client) {
 			defer wg.Done()
@@ -1137,31 +1201,26 @@ func (p *Pool) updateInnerNodesHealth(ctx context.Context, i int, bufferWeights
 			tctx, c := context.WithTimeout(ctx, options.nodeRequestTimeout)
 			defer c()
 
+			// TODO (@kirillovdenis) : #283 consider reconnect to the node on failure
 			if _, err := cli.endpointInfo(tctx, prmEndpoint); err != nil {
 				ok = false
 				bufferWeights[j] = 0
 			}
-			pool.lock.RLock()
-			cp := *pool.clientPacks[j]
-			pool.lock.RUnlock()
 
 			if ok {
 				bufferWeights[j] = options.nodesParams[i].weights[j]
 			} else {
-				p.cache.DeleteByPrefix(cp.address)
+				p.cache.DeleteByPrefix(cli.address())
 			}
 
-			pool.lock.Lock()
-			if pool.clientPacks[j].healthy != ok {
-				pool.clientPacks[j].healthy = ok
-				healthyChanged = true
+			if cli.setHealthy(ok) {
+				healthyChanged.Store(true)
 			}
-			pool.lock.Unlock()
-		}(j, cPack.client)
+		}(j, cli)
 	}
 	wg.Wait()
 
-	if healthyChanged {
+	if healthyChanged.Load() {
 		probabilities := adjustWeights(bufferWeights)
 		source := rand.NewSource(time.Now().UnixNano())
 		pool.lock.Lock()
@@ -1185,7 +1244,7 @@ func adjustWeights(weights []float64) []float64 {
 	return adjusted
 }
 
-func (p *Pool) connection() (*clientPack, error) {
+func (p *Pool) connection() (client, error) {
 	for _, inner := range p.innerPools {
 		cp, err := inner.connection()
 		if err == nil {
@@ -1196,20 +1255,20 @@ func (p *Pool) connection() (*clientPack, error) {
 	return nil, errors.New("no healthy client")
 }
 
-func (p *innerPool) connection() (*clientPack, error) {
-	p.lock.RLock()
+func (p *innerPool) connection() (client, error) {
+	p.lock.RLock() // TODO(@kirillovdenis): #283 consider remove this lock because using client should be thread safe
 	defer p.lock.RUnlock()
-	if len(p.clientPacks) == 1 {
-		cp := p.clientPacks[0]
-		if cp.healthy {
+	if len(p.clients) == 1 {
+		cp := p.clients[0]
+		if cp.isHealthy() {
 			return cp, nil
 		}
 		return nil, errors.New("no healthy client")
 	}
-	attempts := 3 * len(p.clientPacks)
+	attempts := 3 * len(p.clients)
 	for k := 0; k < attempts; k++ {
 		i := p.sampler.Next()
-		if cp := p.clientPacks[i]; cp.healthy {
+		if cp := p.clients[i]; cp.isHealthy() {
 			return cp, nil
 		}
 	}
@@ -1312,8 +1371,8 @@ func (p *Pool) initCallContext(ctx *callContext, cfg prmCommon, prmCtx prmContex
 		ctx.key = p.key
 	}
 
-	ctx.endpoint = cp.address
-	ctx.client = cp.client
+	ctx.endpoint = cp.address()
+	ctx.client = cp
 
 	if ctx.sessionTarget != nil && cfg.stoken != nil {
 		ctx.sessionTarget(*cfg.stoken)
@@ -1656,7 +1715,7 @@ func (p *Pool) PutContainer(ctx context.Context, prm PrmContainerPut) (*cid.ID,
 		return nil, err
 	}
 
-	return cp.client.containerPut(ctx, prm)
+	return cp.containerPut(ctx, prm)
 }
 
 // GetContainer reads NeoFS container by ID.
@@ -1666,7 +1725,7 @@ func (p *Pool) GetContainer(ctx context.Context, prm PrmContainerGet) (*containe
 		return nil, err
 	}
 
-	return cp.client.containerGet(ctx, prm)
+	return cp.containerGet(ctx, prm)
 }
 
 // ListContainers requests identifiers of the account-owned containers.
@@ -1676,7 +1735,7 @@ func (p *Pool) ListContainers(ctx context.Context, prm PrmContainerList) ([]cid.
 		return nil, err
 	}
 
-	return cp.client.containerList(ctx, prm)
+	return cp.containerList(ctx, prm)
 }
 
 // DeleteContainer sends request to remove the NeoFS container and waits for the operation to complete.
@@ -1692,7 +1751,7 @@ func (p *Pool) DeleteContainer(ctx context.Context, prm PrmContainerDelete) erro
 		return err
 	}
 
-	return cp.client.containerDelete(ctx, prm)
+	return cp.containerDelete(ctx, prm)
 }
 
 // GetEACL reads eACL table of the NeoFS container.
@@ -1702,7 +1761,7 func (p *Pool) GetEACL(ctx context.Context, prm PrmContainerEACL) (*eacl.Table,
 		return nil, err
 	}
 
-	return cp.client.containerEACL(ctx, prm)
+	return cp.containerEACL(ctx, prm)
 }
 
 // SetEACL sends request to update eACL table of the NeoFS container and waits for the operation to complete.
@@ -1718,7 +1777,7 @@ func (p *Pool) SetEACL(ctx context.Context, prm PrmContainerSetEACL) error {
 		return err
 	}
 
-	return cp.client.containerSetEACL(ctx, prm)
+	return cp.containerSetEACL(ctx, prm)
 }
 
 // Balance requests current balance of the NeoFS account.
@@ -1728,7 +1787,7 @@ func (p *Pool) Balance(ctx context.Context, prm PrmBalanceGet) (*accounting.Deci
 		return nil, err
 	}
 
-	return cp.client.balanceGet(ctx, prm)
+	return cp.balanceGet(ctx, prm)
 }
 
 // waitForContainerPresence waits until the container is found on the NeoFS network.
@@ -1803,7 +1862,7 @@ func (p *Pool) NetworkInfo(ctx context.Context) (*netmap.NetworkInfo, error) {
 		return nil, err
	}
 
-	return cp.client.networkInfo(ctx, prmNetworkInfo{})
+	return cp.networkInfo(ctx, prmNetworkInfo{})
 }
 
 // Close closes the Pool and releases all the associated resources.
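
Reviewer note (not part of the patch): the diff replaces the mutex-guarded clientPack bookkeeping with an atomic health flag and error counter on each client wrapper. Below is a minimal standalone sketch of that pattern, assuming go.uber.org/atomic v1.9.0; the nodeState type and its methods are illustrative only and are not part of the SDK API.

package main

import (
	"fmt"

	"go.uber.org/atomic"
)

// nodeState mirrors the new clientWrapper fields: a health flag plus an error counter.
type nodeState struct {
	healthy    *atomic.Bool
	errorCount *atomic.Uint32
}

// setHealthy stores the new value and reports whether it actually changed,
// letting the rebalancer rebuild the sampler only on real health transitions.
func (s *nodeState) setHealthy(val bool) bool {
	return s.healthy.Swap(val) != val
}

func main() {
	s := &nodeState{healthy: atomic.NewBool(true), errorCount: atomic.NewUint32(0)}

	s.errorCount.Inc() // a failed request increments the counter
	fmt.Println("errors:", s.errorCount.Load())

	fmt.Println("changed:", s.setHealthy(false)) // true -> false: reports true
	fmt.Println("changed:", s.setHealthy(false)) // false -> false: reports false

	s.errorCount.Store(0) // equivalent of resetErrorCounter
}

Because Swap returns the previous value, detecting a health transition needs no lock, which is what lets the patch drop the pool.lock.Lock()/Unlock() pair in updateInnerNodesHealth.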