diff --git a/pool/pool.go b/pool/pool.go index e87cabb2..4d3cbf09 100644 --- a/pool/pool.go +++ b/pool/pool.go @@ -107,7 +107,7 @@ var errPoolClientUnhealthy = errors.New("pool client unhealthy") type clientStatusMonitor struct { logger *zap.Logger addr string - healthy *atomic.Bool + healthy *atomic.Uint32 errorThreshold uint32 mu sync.RWMutex // protect counters @@ -116,6 +116,22 @@ type clientStatusMonitor struct { methods []*methodStatus } +// values for healthy status of clientStatusMonitor. +const ( + // statusUnhealthyOnDial is set when dialing to the endpoint is failed, + // so there is no connection to the endpoint, and pool should not close it + // before re-establishing connection once again. + statusUnhealthyOnDial = iota + + // statusUnhealthyOnRequest is set when communication after dialing to the + // endpoint is failed due to immediate or accumulated errors, connection is + // available and pool should close it before re-establishing connection once again. + statusUnhealthyOnRequest + + // statusHealthy is set when connection is ready to be used by the pool. + statusHealthy +) + // methodStatus provide statistic for specific method. type methodStatus struct { name string @@ -197,8 +213,8 @@ func newClientStatusMonitor(logger *zap.Logger, addr string, errorThreshold uint methods[i] = &methodStatus{name: i.String()} } - healthy := new(atomic.Bool) - healthy.Store(true) + healthy := new(atomic.Uint32) + healthy.Store(statusHealthy) return clientStatusMonitor{ logger: logger, @@ -324,7 +340,7 @@ func (c *clientWrapper) dial(ctx context.Context) error { prmDial.SetGRPCDialOptions(c.prm.dialOptions...) if err = cl.Dial(ctx, prmDial); err != nil { - c.setUnhealthy() + c.setUnhealthyOnDial() return err } @@ -341,6 +357,12 @@ func (c *clientWrapper) restartIfUnhealthy(ctx context.Context) (healthy, change wasHealthy = true } + // if connection is dialed before, to avoid routine / connection leak, + // pool has to close it and then initialize once again. + if c.isDialed() { + _ = c.close() + } + var cl sdkClient.Client var prmInit sdkClient.PrmInit prmInit.SetDefaultPrivateKey(c.prm.key) @@ -355,7 +377,7 @@ func (c *clientWrapper) restartIfUnhealthy(ctx context.Context) (healthy, change prmDial.SetGRPCDialOptions(c.prm.dialOptions...) if err := cl.Dial(ctx, prmDial); err != nil { - c.setUnhealthy() + c.setUnhealthyOnDial() return false, wasHealthy } @@ -1008,15 +1030,23 @@ func (c *clientWrapper) sessionCreate(ctx context.Context, prm prmCreateSession) } func (c *clientStatusMonitor) isHealthy() bool { - return c.healthy.Load() + return c.healthy.Load() == statusHealthy +} + +func (c *clientStatusMonitor) isDialed() bool { + return c.healthy.Load() != statusUnhealthyOnDial } func (c *clientStatusMonitor) setHealthy() { - c.healthy.Store(true) + c.healthy.Store(statusHealthy) } func (c *clientStatusMonitor) setUnhealthy() { - c.healthy.Store(false) + c.healthy.Store(statusUnhealthyOnRequest) +} + +func (c *clientStatusMonitor) setUnhealthyOnDial() { + c.healthy.Store(statusUnhealthyOnDial) } func (c *clientStatusMonitor) address() string {