[#242] pool: Log error that caused healthy status change
Signed-off-by: Marina Biryukova <m.biryukova@yadro.com>
This commit is contained in:
parent
7e94a6adf2
commit
93ef8a4202
2 changed files with 19 additions and 14 deletions
|
@ -195,8 +195,8 @@ func (m *mockClient) dial(context.Context) error {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (m *mockClient) restartIfUnhealthy(ctx context.Context) (healthy bool, changed bool) {
|
func (m *mockClient) restartIfUnhealthy(ctx context.Context) (healthy bool, changed bool, err error) {
|
||||||
_, err := m.endpointInfo(ctx, prmEndpointInfo{})
|
_, err = m.endpointInfo(ctx, prmEndpointInfo{})
|
||||||
healthy = err == nil
|
healthy = err == nil
|
||||||
changed = healthy != m.isHealthy()
|
changed = healthy != m.isHealthy()
|
||||||
if healthy {
|
if healthy {
|
||||||
|
|
29
pool/pool.go
29
pool/pool.go
|
@ -86,7 +86,7 @@ type client interface {
|
||||||
// see clientWrapper.dial.
|
// see clientWrapper.dial.
|
||||||
dial(ctx context.Context) error
|
dial(ctx context.Context) error
|
||||||
// see clientWrapper.restartIfUnhealthy.
|
// see clientWrapper.restartIfUnhealthy.
|
||||||
restartIfUnhealthy(ctx context.Context) (bool, bool)
|
restartIfUnhealthy(ctx context.Context) (bool, bool, error)
|
||||||
// see clientWrapper.close.
|
// see clientWrapper.close.
|
||||||
close() error
|
close() error
|
||||||
}
|
}
|
||||||
|
@ -374,10 +374,11 @@ func (c *clientWrapper) dial(ctx context.Context) error {
|
||||||
|
|
||||||
// restartIfUnhealthy checks healthy status of client and recreate it if status is unhealthy.
|
// restartIfUnhealthy checks healthy status of client and recreate it if status is unhealthy.
|
||||||
// Return current healthy status and indicating if status was changed by this function call.
|
// Return current healthy status and indicating if status was changed by this function call.
|
||||||
func (c *clientWrapper) restartIfUnhealthy(ctx context.Context) (healthy, changed bool) {
|
// Returns error that caused unhealthy status.
|
||||||
|
func (c *clientWrapper) restartIfUnhealthy(ctx context.Context) (healthy, changed bool, err error) {
|
||||||
var wasHealthy bool
|
var wasHealthy bool
|
||||||
if _, err := c.endpointInfo(ctx, prmEndpointInfo{}); err == nil {
|
if _, err = c.endpointInfo(ctx, prmEndpointInfo{}); err == nil {
|
||||||
return true, false
|
return true, false, nil
|
||||||
} else if !errors.Is(err, errPoolClientUnhealthy) {
|
} else if !errors.Is(err, errPoolClientUnhealthy) {
|
||||||
wasHealthy = true
|
wasHealthy = true
|
||||||
}
|
}
|
||||||
|
@ -403,22 +404,22 @@ func (c *clientWrapper) restartIfUnhealthy(ctx context.Context) (healthy, change
|
||||||
GRPCDialOptions: c.prm.dialOptions,
|
GRPCDialOptions: c.prm.dialOptions,
|
||||||
}
|
}
|
||||||
|
|
||||||
if err := cl.Dial(ctx, prmDial); err != nil {
|
if err = cl.Dial(ctx, prmDial); err != nil {
|
||||||
c.setUnhealthyOnDial()
|
c.setUnhealthyOnDial()
|
||||||
return false, wasHealthy
|
return false, wasHealthy, err
|
||||||
}
|
}
|
||||||
|
|
||||||
c.clientMutex.Lock()
|
c.clientMutex.Lock()
|
||||||
c.client = &cl
|
c.client = &cl
|
||||||
c.clientMutex.Unlock()
|
c.clientMutex.Unlock()
|
||||||
|
|
||||||
if _, err := cl.EndpointInfo(ctx, sdkClient.PrmEndpointInfo{}); err != nil {
|
if _, err = cl.EndpointInfo(ctx, sdkClient.PrmEndpointInfo{}); err != nil {
|
||||||
c.setUnhealthy()
|
c.setUnhealthy()
|
||||||
return false, wasHealthy
|
return false, wasHealthy, err
|
||||||
}
|
}
|
||||||
|
|
||||||
c.setHealthy()
|
c.setHealthy()
|
||||||
return true, !wasHealthy
|
return true, !wasHealthy, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *clientWrapper) getClient() (*sdkClient.Client, error) {
|
func (c *clientWrapper) getClient() (*sdkClient.Client, error) {
|
||||||
|
@ -2198,7 +2199,7 @@ func (p *Pool) updateInnerNodesHealth(ctx context.Context, i int, bufferWeights
|
||||||
tctx, c := context.WithTimeout(ctx, options.nodeRequestTimeout)
|
tctx, c := context.WithTimeout(ctx, options.nodeRequestTimeout)
|
||||||
defer c()
|
defer c()
|
||||||
|
|
||||||
healthy, changed := cli.restartIfUnhealthy(tctx)
|
healthy, changed, err := cli.restartIfUnhealthy(tctx)
|
||||||
if healthy {
|
if healthy {
|
||||||
bufferWeights[j] = options.nodesParams[i].weights[j]
|
bufferWeights[j] = options.nodesParams[i].weights[j]
|
||||||
} else {
|
} else {
|
||||||
|
@ -2207,8 +2208,12 @@ func (p *Pool) updateInnerNodesHealth(ctx context.Context, i int, bufferWeights
|
||||||
}
|
}
|
||||||
|
|
||||||
if changed {
|
if changed {
|
||||||
p.log(zap.DebugLevel, "health has changed",
|
fields := []zap.Field{zap.String("address", cli.address()), zap.Bool("healthy", healthy)}
|
||||||
zap.String("address", cli.address()), zap.Bool("healthy", healthy))
|
if err != nil {
|
||||||
|
fields = append(fields, zap.String("error", err.Error()))
|
||||||
|
}
|
||||||
|
|
||||||
|
p.log(zap.DebugLevel, "health has changed", fields...)
|
||||||
healthyChanged.Store(true)
|
healthyChanged.Store(true)
|
||||||
}
|
}
|
||||||
}(j, cli)
|
}(j, cli)
|
||||||
|
|
Loading…
Reference in a new issue