Fix connection leak and panic at close operation #171
2 changed files with 81 additions and 9 deletions
50
pool/pool.go
50
pool/pool.go
|
@ -86,6 +86,8 @@ type client interface {
|
|||
type clientStatus interface {
|
||||
// isHealthy checks if the connection can handle requests.
|
||||
isHealthy() bool
|
||||
// isDialed checks if the connection was created.
|
||||
|
||||
isDialed() bool
|
||||
// setUnhealthy marks client as unhealthy.
|
||||
setUnhealthy()
|
||||
// address return address of endpoint.
|
||||
|
@ -107,7 +109,7 @@ var errPoolClientUnhealthy = errors.New("pool client unhealthy")
|
|||
type clientStatusMonitor struct {
|
||||
logger *zap.Logger
|
||||
addr string
|
||||
healthy *atomic.Bool
|
||||
healthy *atomic.Uint32
|
||||
errorThreshold uint32
|
||||
|
||||
mu sync.RWMutex // protect counters
|
||||
|
@ -116,6 +118,22 @@ type clientStatusMonitor struct {
|
|||
methods []*methodStatus
|
||||
}
|
||||
|
||||
// values for healthy status of clientStatusMonitor.
|
||||
const (
|
||||
// statusUnhealthyOnDial is set when dialing to the endpoint is failed,
|
||||
// so there is no connection to the endpoint, and pool should not close it
|
||||
// before re-establishing connection once again.
|
||||
statusUnhealthyOnDial = iota
|
||||
|
||||
// statusUnhealthyOnRequest is set when communication after dialing to the
|
||||
// endpoint is failed due to immediate or accumulated errors, connection is
|
||||
// available and pool should close it before re-establishing connection once again.
|
||||
statusUnhealthyOnRequest
|
||||
|
||||
// statusHealthy is set when connection is ready to be used by the pool.
|
||||
statusHealthy
|
||||
)
|
||||
|
||||
// methodStatus provide statistic for specific method.
|
||||
type methodStatus struct {
|
||||
name string
|
||||
|
@ -197,8 +215,8 @@ func newClientStatusMonitor(logger *zap.Logger, addr string, errorThreshold uint
|
|||
methods[i] = &methodStatus{name: i.String()}
|
||||
}
|
||||
|
||||
healthy := new(atomic.Bool)
|
||||
healthy.Store(true)
|
||||
healthy := new(atomic.Uint32)
|
||||
healthy.Store(statusHealthy)
|
||||
|
||||
return clientStatusMonitor{
|
||||
logger: logger,
|
||||
|
@ -324,7 +342,7 @@ func (c *clientWrapper) dial(ctx context.Context) error {
|
|||
prmDial.SetGRPCDialOptions(c.prm.dialOptions...)
|
||||
|
||||
if err = cl.Dial(ctx, prmDial); err != nil {
|
||||
c.setUnhealthy()
|
||||
c.setUnhealthyOnDial()
|
||||
return err
|
||||
}
|
||||
|
||||
|
@ -341,6 +359,12 @@ func (c *clientWrapper) restartIfUnhealthy(ctx context.Context) (healthy, change
|
|||
wasHealthy = true
|
||||
}
|
||||
|
||||
// if connection is dialed before, to avoid routine / connection leak,
|
||||
// pool has to close it and then initialize once again.
|
||||
if c.isDialed() {
|
||||
_ = c.close()
|
||||
}
|
||||
|
||||
var cl sdkClient.Client
|
||||
var prmInit sdkClient.PrmInit
|
||||
prmInit.SetDefaultPrivateKey(c.prm.key)
|
||||
|
@ -355,7 +379,7 @@ func (c *clientWrapper) restartIfUnhealthy(ctx context.Context) (healthy, change
|
|||
prmDial.SetGRPCDialOptions(c.prm.dialOptions...)
|
||||
|
||||
if err := cl.Dial(ctx, prmDial); err != nil {
|
||||
c.setUnhealthy()
|
||||
c.setUnhealthyOnDial()
|
||||
return false, wasHealthy
|
||||
}
|
||||
|
||||
|
@ -1008,15 +1032,23 @@ func (c *clientWrapper) sessionCreate(ctx context.Context, prm prmCreateSession)
|
|||
}
|
||||
|
||||
func (c *clientStatusMonitor) isHealthy() bool {
|
||||
return c.healthy.Load()
|
||||
return c.healthy.Load() == statusHealthy
|
||||
}
|
||||
|
||||
func (c *clientStatusMonitor) isDialed() bool {
|
||||
return c.healthy.Load() != statusUnhealthyOnDial
|
||||
}
|
||||
|
||||
func (c *clientStatusMonitor) setHealthy() {
|
||||
c.healthy.Store(true)
|
||||
c.healthy.Store(statusHealthy)
|
||||
}
|
||||
|
||||
func (c *clientStatusMonitor) setUnhealthy() {
|
||||
c.healthy.Store(false)
|
||||
c.healthy.Store(statusUnhealthyOnRequest)
|
||||
}
|
||||
|
||||
func (c *clientStatusMonitor) setUnhealthyOnDial() {
|
||||
c.healthy.Store(statusUnhealthyOnDial)
|
||||
}
|
||||
|
||||
func (c *clientStatusMonitor) address() string {
|
||||
|
@ -2779,10 +2811,12 @@ func (p *Pool) Close() {
|
|||
// close all clients
|
||||
for _, pools := range p.innerPools {
|
||||
for _, cli := range pools.clients {
|
||||
if cli.isDialed() {
|
||||
fyrchik
commented
Wouldn't it be better from the client POV to just allow closing even undialed clients? Wouldn't it be better from the client POV to just allow closing even undialed clients?
alexvanin
commented
It would, but simple solution down below didn't work, so I suppose changes should be done on api-go level as well.
Function comment clearly tells us It would, but simple solution down below didn't work, so I suppose changes should be done on api-go level as well.
```go
func (c *Client) Close() error {
if c.c.Conn() != nil {
return c.c.Conn().Close()
}
return nil
```
Function comment clearly tells us `MUST NOT be called before successful Dial`. So invoker (pool) should check dial state, which it does now.
|
||||
_ = cli.close()
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// SyncContainerWithNetwork applies network configuration received via
|
||||
// the Pool to the container. Changes the container if it does not satisfy
|
||||
|
|
|
@ -523,6 +523,44 @@ func TestStatusMonitor(t *testing.T) {
|
|||
|
||||
require.Equal(t, uint64(count), monitor.overallErrorRate())
|
||||
require.Equal(t, uint32(1), monitor.currentErrorRate())
|
||||
|
||||
t.Run("healthy status", func(t *testing.T) {
|
||||
cases := []struct {
|
||||
action func(*clientStatusMonitor)
|
||||
status uint32
|
||||
isDialed bool
|
||||
isHealthy bool
|
||||
description string
|
||||
}{
|
||||
{
|
||||
action: func(m *clientStatusMonitor) { m.setUnhealthyOnDial() },
|
||||
status: statusUnhealthyOnDial,
|
||||
isDialed: false,
|
||||
isHealthy: false,
|
||||
description: "set unhealthy on dial",
|
||||
},
|
||||
{
|
||||
action: func(m *clientStatusMonitor) { m.setUnhealthy() },
|
||||
status: statusUnhealthyOnRequest,
|
||||
isDialed: true,
|
||||
isHealthy: false,
|
||||
description: "set unhealthy on request",
|
||||
},
|
||||
{
|
||||
action: func(m *clientStatusMonitor) { m.setHealthy() },
|
||||
status: statusHealthy,
|
||||
isDialed: true,
|
||||
isHealthy: true,
|
||||
description: "set healthy",
|
||||
},
|
||||
}
|
||||
for _, tc := range cases {
|
||||
tc.action(&monitor)
|
||||
require.Equal(t, tc.status, monitor.healthy.Load())
|
||||
require.Equal(t, tc.isDialed, monitor.isDialed())
|
||||
require.Equal(t, tc.isHealthy, monitor.isHealthy())
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
func TestHandleError(t *testing.T) {
|
||||
|
|
Loading…
Reference in a new issue
isDialled -> isDialed