Fix connection leak and panic at close operation #171
2 changed files with 81 additions and 9 deletions
52
pool/pool.go
52
pool/pool.go
|
@ -86,6 +86,8 @@ type client interface {
|
||||||
type clientStatus interface {
|
type clientStatus interface {
|
||||||
// isHealthy checks if the connection can handle requests.
|
// isHealthy checks if the connection can handle requests.
|
||||||
isHealthy() bool
|
isHealthy() bool
|
||||||
|
// isDialed checks if the connection was created.
|
||||||
|
isDialed() bool
|
||||||
// setUnhealthy marks client as unhealthy.
|
// setUnhealthy marks client as unhealthy.
|
||||||
setUnhealthy()
|
setUnhealthy()
|
||||||
// address return address of endpoint.
|
// address return address of endpoint.
|
||||||
|
@ -107,7 +109,7 @@ var errPoolClientUnhealthy = errors.New("pool client unhealthy")
|
||||||
type clientStatusMonitor struct {
|
type clientStatusMonitor struct {
|
||||||
logger *zap.Logger
|
logger *zap.Logger
|
||||||
addr string
|
addr string
|
||||||
healthy *atomic.Bool
|
healthy *atomic.Uint32
|
||||||
errorThreshold uint32
|
errorThreshold uint32
|
||||||
|
|
||||||
mu sync.RWMutex // protect counters
|
mu sync.RWMutex // protect counters
|
||||||
|
@ -116,6 +118,22 @@ type clientStatusMonitor struct {
|
||||||
methods []*methodStatus
|
methods []*methodStatus
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// values for healthy status of clientStatusMonitor.
|
||||||
|
const (
|
||||||
|
// statusUnhealthyOnDial is set when dialing to the endpoint is failed,
|
||||||
|
// so there is no connection to the endpoint, and pool should not close it
|
||||||
|
// before re-establishing connection once again.
|
||||||
|
statusUnhealthyOnDial = iota
|
||||||
|
|
||||||
|
// statusUnhealthyOnRequest is set when communication after dialing to the
|
||||||
|
// endpoint is failed due to immediate or accumulated errors, connection is
|
||||||
|
// available and pool should close it before re-establishing connection once again.
|
||||||
|
statusUnhealthyOnRequest
|
||||||
|
|
||||||
|
// statusHealthy is set when connection is ready to be used by the pool.
|
||||||
|
statusHealthy
|
||||||
|
)
|
||||||
|
|
||||||
// methodStatus provide statistic for specific method.
|
// methodStatus provide statistic for specific method.
|
||||||
type methodStatus struct {
|
type methodStatus struct {
|
||||||
name string
|
name string
|
||||||
|
@ -197,8 +215,8 @@ func newClientStatusMonitor(logger *zap.Logger, addr string, errorThreshold uint
|
||||||
methods[i] = &methodStatus{name: i.String()}
|
methods[i] = &methodStatus{name: i.String()}
|
||||||
}
|
}
|
||||||
|
|
||||||
healthy := new(atomic.Bool)
|
healthy := new(atomic.Uint32)
|
||||||
healthy.Store(true)
|
healthy.Store(statusHealthy)
|
||||||
|
|
||||||
return clientStatusMonitor{
|
return clientStatusMonitor{
|
||||||
logger: logger,
|
logger: logger,
|
||||||
|
@ -324,7 +342,7 @@ func (c *clientWrapper) dial(ctx context.Context) error {
|
||||||
prmDial.SetGRPCDialOptions(c.prm.dialOptions...)
|
prmDial.SetGRPCDialOptions(c.prm.dialOptions...)
|
||||||
|
|
||||||
if err = cl.Dial(ctx, prmDial); err != nil {
|
if err = cl.Dial(ctx, prmDial); err != nil {
|
||||||
c.setUnhealthy()
|
c.setUnhealthyOnDial()
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -341,6 +359,12 @@ func (c *clientWrapper) restartIfUnhealthy(ctx context.Context) (healthy, change
|
||||||
wasHealthy = true
|
wasHealthy = true
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// if connection is dialed before, to avoid routine / connection leak,
|
||||||
|
// pool has to close it and then initialize once again.
|
||||||
|
if c.isDialed() {
|
||||||
|
_ = c.close()
|
||||||
|
}
|
||||||
|
|
||||||
var cl sdkClient.Client
|
var cl sdkClient.Client
|
||||||
var prmInit sdkClient.PrmInit
|
var prmInit sdkClient.PrmInit
|
||||||
prmInit.SetDefaultPrivateKey(c.prm.key)
|
prmInit.SetDefaultPrivateKey(c.prm.key)
|
||||||
|
@ -355,7 +379,7 @@ func (c *clientWrapper) restartIfUnhealthy(ctx context.Context) (healthy, change
|
||||||
prmDial.SetGRPCDialOptions(c.prm.dialOptions...)
|
prmDial.SetGRPCDialOptions(c.prm.dialOptions...)
|
||||||
|
|
||||||
if err := cl.Dial(ctx, prmDial); err != nil {
|
if err := cl.Dial(ctx, prmDial); err != nil {
|
||||||
c.setUnhealthy()
|
c.setUnhealthyOnDial()
|
||||||
return false, wasHealthy
|
return false, wasHealthy
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1008,15 +1032,23 @@ func (c *clientWrapper) sessionCreate(ctx context.Context, prm prmCreateSession)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *clientStatusMonitor) isHealthy() bool {
|
func (c *clientStatusMonitor) isHealthy() bool {
|
||||||
return c.healthy.Load()
|
return c.healthy.Load() == statusHealthy
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *clientStatusMonitor) isDialed() bool {
|
||||||
|
return c.healthy.Load() != statusUnhealthyOnDial
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *clientStatusMonitor) setHealthy() {
|
func (c *clientStatusMonitor) setHealthy() {
|
||||||
c.healthy.Store(true)
|
c.healthy.Store(statusHealthy)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *clientStatusMonitor) setUnhealthy() {
|
func (c *clientStatusMonitor) setUnhealthy() {
|
||||||
c.healthy.Store(false)
|
c.healthy.Store(statusUnhealthyOnRequest)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *clientStatusMonitor) setUnhealthyOnDial() {
|
||||||
|
c.healthy.Store(statusUnhealthyOnDial)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *clientStatusMonitor) address() string {
|
func (c *clientStatusMonitor) address() string {
|
||||||
|
@ -2779,7 +2811,9 @@ func (p *Pool) Close() {
|
||||||
// close all clients
|
// close all clients
|
||||||
for _, pools := range p.innerPools {
|
for _, pools := range p.innerPools {
|
||||||
for _, cli := range pools.clients {
|
for _, cli := range pools.clients {
|
||||||
_ = cli.close()
|
if cli.isDialed() {
|
||||||
|
_ = cli.close()
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -523,6 +523,44 @@ func TestStatusMonitor(t *testing.T) {
|
||||||
|
|
||||||
require.Equal(t, uint64(count), monitor.overallErrorRate())
|
require.Equal(t, uint64(count), monitor.overallErrorRate())
|
||||||
require.Equal(t, uint32(1), monitor.currentErrorRate())
|
require.Equal(t, uint32(1), monitor.currentErrorRate())
|
||||||
|
|
||||||
|
t.Run("healthy status", func(t *testing.T) {
|
||||||
|
cases := []struct {
|
||||||
|
action func(*clientStatusMonitor)
|
||||||
|
status uint32
|
||||||
|
isDialed bool
|
||||||
|
isHealthy bool
|
||||||
|
description string
|
||||||
|
}{
|
||||||
|
{
|
||||||
|
action: func(m *clientStatusMonitor) { m.setUnhealthyOnDial() },
|
||||||
|
status: statusUnhealthyOnDial,
|
||||||
|
isDialed: false,
|
||||||
|
isHealthy: false,
|
||||||
|
description: "set unhealthy on dial",
|
||||||
|
},
|
||||||
|
{
|
||||||
|
action: func(m *clientStatusMonitor) { m.setUnhealthy() },
|
||||||
|
status: statusUnhealthyOnRequest,
|
||||||
|
isDialed: true,
|
||||||
|
isHealthy: false,
|
||||||
|
description: "set unhealthy on request",
|
||||||
|
},
|
||||||
|
{
|
||||||
|
action: func(m *clientStatusMonitor) { m.setHealthy() },
|
||||||
|
status: statusHealthy,
|
||||||
|
isDialed: true,
|
||||||
|
isHealthy: true,
|
||||||
|
description: "set healthy",
|
||||||
|
},
|
||||||
|
}
|
||||||
|
for _, tc := range cases {
|
||||||
|
tc.action(&monitor)
|
||||||
|
require.Equal(t, tc.status, monitor.healthy.Load())
|
||||||
|
require.Equal(t, tc.isDialed, monitor.isDialed())
|
||||||
|
require.Equal(t, tc.isHealthy, monitor.isHealthy())
|
||||||
|
}
|
||||||
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestHandleError(t *testing.T) {
|
func TestHandleError(t *testing.T) {
|
||||||
|
|
Loading…
Add table
Reference in a new issue