Fix connection leak and panic at close operation #171

Merged
alexvanin merged 3 commits from alexvanin/frostfs-sdk-go:fix/pool-panic-leak into master 2024-09-04 19:51:15 +00:00
2 changed files with 81 additions and 9 deletions

View file

@ -86,6 +86,8 @@ type client interface {
type clientStatus interface { type clientStatus interface {
// isHealthy checks if the connection can handle requests. // isHealthy checks if the connection can handle requests.
isHealthy() bool isHealthy() bool
// isDialed checks if the connection was created.

isDialled -> isDialed

isDialled -> isDialed
isDialed() bool
// setUnhealthy marks client as unhealthy. // setUnhealthy marks client as unhealthy.
setUnhealthy() setUnhealthy()
// address return address of endpoint. // address return address of endpoint.
@ -107,7 +109,7 @@ var errPoolClientUnhealthy = errors.New("pool client unhealthy")
type clientStatusMonitor struct { type clientStatusMonitor struct {
logger *zap.Logger logger *zap.Logger
addr string addr string
healthy *atomic.Bool healthy *atomic.Uint32
errorThreshold uint32 errorThreshold uint32
mu sync.RWMutex // protect counters mu sync.RWMutex // protect counters
@ -116,6 +118,22 @@ type clientStatusMonitor struct {
methods []*methodStatus methods []*methodStatus
} }
// values for healthy status of clientStatusMonitor.
const (
// statusUnhealthyOnDial is set when dialing to the endpoint is failed,
// so there is no connection to the endpoint, and pool should not close it
// before re-establishing connection once again.
statusUnhealthyOnDial = iota
// statusUnhealthyOnRequest is set when communication after dialing to the
// endpoint is failed due to immediate or accumulated errors, connection is
// available and pool should close it before re-establishing connection once again.
statusUnhealthyOnRequest
// statusHealthy is set when connection is ready to be used by the pool.
statusHealthy
)
// methodStatus provide statistic for specific method. // methodStatus provide statistic for specific method.
type methodStatus struct { type methodStatus struct {
name string name string
@ -197,8 +215,8 @@ func newClientStatusMonitor(logger *zap.Logger, addr string, errorThreshold uint
methods[i] = &methodStatus{name: i.String()} methods[i] = &methodStatus{name: i.String()}
} }
healthy := new(atomic.Bool) healthy := new(atomic.Uint32)
healthy.Store(true) healthy.Store(statusHealthy)
return clientStatusMonitor{ return clientStatusMonitor{
logger: logger, logger: logger,
@ -324,7 +342,7 @@ func (c *clientWrapper) dial(ctx context.Context) error {
prmDial.SetGRPCDialOptions(c.prm.dialOptions...) prmDial.SetGRPCDialOptions(c.prm.dialOptions...)
if err = cl.Dial(ctx, prmDial); err != nil { if err = cl.Dial(ctx, prmDial); err != nil {
c.setUnhealthy() c.setUnhealthyOnDial()
return err return err
} }
@ -341,6 +359,12 @@ func (c *clientWrapper) restartIfUnhealthy(ctx context.Context) (healthy, change
wasHealthy = true wasHealthy = true
} }
// if connection is dialed before, to avoid routine / connection leak,
// pool has to close it and then initialize once again.
if c.isDialed() {
_ = c.close()
}
var cl sdkClient.Client var cl sdkClient.Client
var prmInit sdkClient.PrmInit var prmInit sdkClient.PrmInit
prmInit.SetDefaultPrivateKey(c.prm.key) prmInit.SetDefaultPrivateKey(c.prm.key)
@ -355,7 +379,7 @@ func (c *clientWrapper) restartIfUnhealthy(ctx context.Context) (healthy, change
prmDial.SetGRPCDialOptions(c.prm.dialOptions...) prmDial.SetGRPCDialOptions(c.prm.dialOptions...)
if err := cl.Dial(ctx, prmDial); err != nil { if err := cl.Dial(ctx, prmDial); err != nil {
c.setUnhealthy() c.setUnhealthyOnDial()
return false, wasHealthy return false, wasHealthy
} }
@ -1008,15 +1032,23 @@ func (c *clientWrapper) sessionCreate(ctx context.Context, prm prmCreateSession)
} }
func (c *clientStatusMonitor) isHealthy() bool { func (c *clientStatusMonitor) isHealthy() bool {
return c.healthy.Load() return c.healthy.Load() == statusHealthy
}
func (c *clientStatusMonitor) isDialed() bool {
return c.healthy.Load() != statusUnhealthyOnDial
} }
func (c *clientStatusMonitor) setHealthy() { func (c *clientStatusMonitor) setHealthy() {
c.healthy.Store(true) c.healthy.Store(statusHealthy)
} }
func (c *clientStatusMonitor) setUnhealthy() { func (c *clientStatusMonitor) setUnhealthy() {
c.healthy.Store(false) c.healthy.Store(statusUnhealthyOnRequest)
}
func (c *clientStatusMonitor) setUnhealthyOnDial() {
c.healthy.Store(statusUnhealthyOnDial)
} }
func (c *clientStatusMonitor) address() string { func (c *clientStatusMonitor) address() string {
@ -2779,9 +2811,11 @@ func (p *Pool) Close() {
// close all clients // close all clients
for _, pools := range p.innerPools { for _, pools := range p.innerPools {
for _, cli := range pools.clients { for _, cli := range pools.clients {
if cli.isDialed() {

Wouldn't it be better from the client POV to just allow closing even undialed clients?

Wouldn't it be better from the client POV to just allow closing even undialed clients?

It would, but simple solution down below didn't work, so I suppose changes should be done on api-go level as well.

func (c *Client) Close() error {
	if c.c.Conn() != nil {
		return c.c.Conn().Close()
	}
	return nil

Function comment clearly tells us MUST NOT be called before successful Dial. So invoker (pool) should check dial state, which it does now.

It would, but simple solution down below didn't work, so I suppose changes should be done on api-go level as well. ```go func (c *Client) Close() error { if c.c.Conn() != nil { return c.c.Conn().Close() } return nil ``` Function comment clearly tells us `MUST NOT be called before successful Dial`. So invoker (pool) should check dial state, which it does now.
_ = cli.close() _ = cli.close()
} }
} }
}
} }
// SyncContainerWithNetwork applies network configuration received via // SyncContainerWithNetwork applies network configuration received via

View file

@ -523,6 +523,44 @@ func TestStatusMonitor(t *testing.T) {
require.Equal(t, uint64(count), monitor.overallErrorRate()) require.Equal(t, uint64(count), monitor.overallErrorRate())
require.Equal(t, uint32(1), monitor.currentErrorRate()) require.Equal(t, uint32(1), monitor.currentErrorRate())
t.Run("healthy status", func(t *testing.T) {
cases := []struct {
action func(*clientStatusMonitor)
status uint32
isDialed bool
isHealthy bool
description string
}{
{
action: func(m *clientStatusMonitor) { m.setUnhealthyOnDial() },
status: statusUnhealthyOnDial,
isDialed: false,
isHealthy: false,
description: "set unhealthy on dial",
},
{
action: func(m *clientStatusMonitor) { m.setUnhealthy() },
status: statusUnhealthyOnRequest,
isDialed: true,
isHealthy: false,
description: "set unhealthy on request",
},
{
action: func(m *clientStatusMonitor) { m.setHealthy() },
status: statusHealthy,
isDialed: true,
isHealthy: true,
description: "set healthy",
},
}
for _, tc := range cases {
tc.action(&monitor)
require.Equal(t, tc.status, monitor.healthy.Load())
require.Equal(t, tc.isDialed, monitor.isDialed())
require.Equal(t, tc.isHealthy, monitor.isHealthy())
}
})
} }
func TestHandleError(t *testing.T) { func TestHandleError(t *testing.T) {