From ddbfb758c978fbeb9d0bb359ee7e11ca94b53959 Mon Sep 17 00:00:00 2001 From: Alex Vanin Date: Tue, 3 Oct 2023 13:57:58 +0300 Subject: [PATCH] [#171] pool: Use dial status to close connections during restarts On every client restart, the pool creates a new client instance. If the client failed due to a dial error, there was no prior connection and no goroutine on the server side. If the client failed due to communication or business logic errors, then the server side maintains the connection and the client should close it to avoid goroutine and connection leaks. Dialing is a part of the healthcheck, so the health status is now an enum of three values: - unhealthy due to dial failure, - unhealthy due to transmission failure, - healthy. Signed-off-by: Alex Vanin --- pool/pool.go | 46 ++++++++++++++++++++++++++++++++++++++-------- 1 file changed, 38 insertions(+), 8 deletions(-) diff --git a/pool/pool.go b/pool/pool.go index e87cabb2..4d3cbf09 100644 --- a/pool/pool.go +++ b/pool/pool.go @@ -107,7 +107,7 @@ var errPoolClientUnhealthy = errors.New("pool client unhealthy") type clientStatusMonitor struct { logger *zap.Logger addr string - healthy *atomic.Bool + healthy *atomic.Uint32 errorThreshold uint32 mu sync.RWMutex // protect counters @@ -116,6 +116,22 @@ type clientStatusMonitor struct { methods []*methodStatus } +// values for healthy status of clientStatusMonitor. +const ( + // statusUnhealthyOnDial is set when dialing to the endpoint is failed, + // so there is no connection to the endpoint, and pool should not close it + // before re-establishing connection once again. + statusUnhealthyOnDial = iota + + // statusUnhealthyOnRequest is set when communication after dialing to the + // endpoint is failed due to immediate or accumulated errors, connection is + // available and pool should close it before re-establishing connection once again. + statusUnhealthyOnRequest + + // statusHealthy is set when connection is ready to be used by the pool. 
+ statusHealthy +) + // methodStatus provide statistic for specific method. type methodStatus struct { name string @@ -197,8 +213,8 @@ func newClientStatusMonitor(logger *zap.Logger, addr string, errorThreshold uint methods[i] = &methodStatus{name: i.String()} } - healthy := new(atomic.Bool) - healthy.Store(true) + healthy := new(atomic.Uint32) + healthy.Store(statusHealthy) return clientStatusMonitor{ logger: logger, @@ -324,7 +340,7 @@ func (c *clientWrapper) dial(ctx context.Context) error { prmDial.SetGRPCDialOptions(c.prm.dialOptions...) if err = cl.Dial(ctx, prmDial); err != nil { - c.setUnhealthy() + c.setUnhealthyOnDial() return err } @@ -341,6 +357,12 @@ func (c *clientWrapper) restartIfUnhealthy(ctx context.Context) (healthy, change wasHealthy = true } + // if connection is dialed before, to avoid routine / connection leak, + // pool has to close it and then initialize once again. + if c.isDialed() { + _ = c.close() + } + var cl sdkClient.Client var prmInit sdkClient.PrmInit prmInit.SetDefaultPrivateKey(c.prm.key) @@ -355,7 +377,7 @@ func (c *clientWrapper) restartIfUnhealthy(ctx context.Context) (healthy, change prmDial.SetGRPCDialOptions(c.prm.dialOptions...) 
if err := cl.Dial(ctx, prmDial); err != nil { - c.setUnhealthy() + c.setUnhealthyOnDial() return false, wasHealthy } @@ -1008,15 +1030,23 @@ func (c *clientWrapper) sessionCreate(ctx context.Context, prm prmCreateSession) } func (c *clientStatusMonitor) isHealthy() bool { - return c.healthy.Load() + return c.healthy.Load() == statusHealthy +} + +func (c *clientStatusMonitor) isDialed() bool { + return c.healthy.Load() != statusUnhealthyOnDial } func (c *clientStatusMonitor) setHealthy() { - c.healthy.Store(true) + c.healthy.Store(statusHealthy) } func (c *clientStatusMonitor) setUnhealthy() { - c.healthy.Store(false) + c.healthy.Store(statusUnhealthyOnRequest) +} + +func (c *clientStatusMonitor) setUnhealthyOnDial() { + c.healthy.Store(statusUnhealthyOnDial) } func (c *clientStatusMonitor) address() string {