[#278] pool: Don't make maintenance node healthy in rebalance

Signed-off-by: Denis Kirillov <d.kirillov@yadro.com>
This commit is contained in:
Denis Kirillov 2024-10-17 16:26:13 +03:00
parent 79f387317a
commit 05aa3becae
3 changed files with 301 additions and 79 deletions

View file

@ -26,11 +26,15 @@ type mockClient struct {
errorOnDial bool errorOnDial bool
errorOnCreateSession bool errorOnCreateSession bool
errorOnEndpointInfo bool errorOnEndpointInfo error
resOnEndpointInfo netmap.NodeInfo
healthcheckFn func()
errorOnNetworkInfo bool errorOnNetworkInfo bool
stOnGetObject apistatus.Status stOnGetObject apistatus.Status
} }
var _ client = (*mockClient)(nil)
func newMockClient(addr string, key ecdsa.PrivateKey) *mockClient { func newMockClient(addr string, key ecdsa.PrivateKey) *mockClient {
return &mockClient{ return &mockClient{
key: key, key: key,
@ -38,6 +42,16 @@ func newMockClient(addr string, key ecdsa.PrivateKey) *mockClient {
} }
} }
func newMockClientHealthy(addr string, key ecdsa.PrivateKey, healthy bool) *mockClient {
m := newMockClient(addr, key)
if healthy {
m.setHealthy()
} else {
m.setUnhealthy()
}
return m
}
func (m *mockClient) setThreshold(threshold uint32) { func (m *mockClient) setThreshold(threshold uint32) {
m.errorThreshold = threshold m.errorThreshold = threshold
} }
@ -47,11 +61,11 @@ func (m *mockClient) errOnCreateSession() {
} }
func (m *mockClient) errOnEndpointInfo() { func (m *mockClient) errOnEndpointInfo() {
m.errorOnEndpointInfo = true m.errorOnEndpointInfo = errors.New("error")
} }
func (m *mockClient) errOnNetworkInfo() { func (m *mockClient) errOnNetworkInfo() {
m.errorOnEndpointInfo = true m.errorOnEndpointInfo = errors.New("error")
} }
func (m *mockClient) errOnDial() { func (m *mockClient) errOnDial() {
@ -94,27 +108,32 @@ func (m *mockClient) containerDelete(context.Context, PrmContainerDelete) error
return nil return nil
} }
func (c *mockClient) apeManagerAddChain(ctx context.Context, prm PrmAddAPEChain) error { func (m *mockClient) apeManagerAddChain(context.Context, PrmAddAPEChain) error {
return nil return nil
} }
func (c *mockClient) apeManagerRemoveChain(ctx context.Context, prm PrmRemoveAPEChain) error { func (m *mockClient) apeManagerRemoveChain(context.Context, PrmRemoveAPEChain) error {
return nil return nil
} }
func (c *mockClient) apeManagerListChains(ctx context.Context, prm PrmListAPEChains) ([]ape.Chain, error) { func (m *mockClient) apeManagerListChains(context.Context, PrmListAPEChains) ([]ape.Chain, error) {
return []ape.Chain{}, nil return []ape.Chain{}, nil
} }
func (m *mockClient) endpointInfo(ctx context.Context, _ prmEndpointInfo) (netmap.NodeInfo, error) { func (m *mockClient) endpointInfo(ctx context.Context, _ prmEndpointInfo) (netmap.NodeInfo, error) {
var ni netmap.NodeInfo if m.errorOnEndpointInfo != nil {
return netmap.NodeInfo{}, m.handleError(ctx, nil, m.errorOnEndpointInfo)
if m.errorOnEndpointInfo {
return ni, m.handleError(ctx, nil, errors.New("error"))
} }
ni.SetNetworkEndpoints(m.addr) m.resOnEndpointInfo.SetNetworkEndpoints(m.addr)
return ni, nil return m.resOnEndpointInfo, nil
}
func (m *mockClient) healthcheck(ctx context.Context) (netmap.NodeInfo, error) {
if m.healthcheckFn != nil {
m.healthcheckFn()
}
return m.endpointInfo(ctx, prmEndpointInfo{})
} }
func (m *mockClient) networkInfo(ctx context.Context, _ prmNetworkInfo) (netmap.NetworkInfo, error) { func (m *mockClient) networkInfo(ctx context.Context, _ prmNetworkInfo) (netmap.NetworkInfo, error) {
@ -190,16 +209,12 @@ func (m *mockClient) dial(context.Context) error {
return nil return nil
} }
func (m *mockClient) restartIfUnhealthy(ctx context.Context) (changed bool, err error) { func (m *mockClient) restart(context.Context) error {
_, err = m.endpointInfo(ctx, prmEndpointInfo{}) if m.errorOnDial {
healthy := err == nil return errors.New("restart dial error")
changed = healthy != m.isHealthy()
if healthy {
m.setHealthy()
} else {
m.setUnhealthy()
} }
return
return nil
} }
func (m *mockClient) close() error { func (m *mockClient) close() error {

View file

@ -57,6 +57,8 @@ type client interface {
apeManagerListChains(context.Context, PrmListAPEChains) ([]ape.Chain, error) apeManagerListChains(context.Context, PrmListAPEChains) ([]ape.Chain, error)
// see clientWrapper.endpointInfo. // see clientWrapper.endpointInfo.
endpointInfo(context.Context, prmEndpointInfo) (netmap.NodeInfo, error) endpointInfo(context.Context, prmEndpointInfo) (netmap.NodeInfo, error)
// see clientWrapper.healthcheck.
healthcheck(ctx context.Context) (netmap.NodeInfo, error)
// see clientWrapper.networkInfo. // see clientWrapper.networkInfo.
networkInfo(context.Context, prmNetworkInfo) (netmap.NetworkInfo, error) networkInfo(context.Context, prmNetworkInfo) (netmap.NetworkInfo, error)
// see clientWrapper.netMapSnapshot // see clientWrapper.netMapSnapshot
@ -82,8 +84,8 @@ type client interface {
// see clientWrapper.dial. // see clientWrapper.dial.
dial(ctx context.Context) error dial(ctx context.Context) error
// see clientWrapper.restartIfUnhealthy. // see clientWrapper.restart.
restartIfUnhealthy(ctx context.Context) (bool, error) restart(ctx context.Context) error
// see clientWrapper.close. // see clientWrapper.close.
close() error close() error
} }
@ -92,10 +94,10 @@ type client interface {
type clientStatus interface { type clientStatus interface {
// isHealthy checks if the connection can handle requests. // isHealthy checks if the connection can handle requests.
isHealthy() bool isHealthy() bool
// isDialed checks if the connection was created.
isDialed() bool
// setUnhealthy marks client as unhealthy. // setUnhealthy marks client as unhealthy.
setUnhealthy() setUnhealthy()
// setHealthy marks client as healthy.
setHealthy()
// address return address of endpoint. // address return address of endpoint.
address() string address() string
// currentErrorRate returns current errors rate. // currentErrorRate returns current errors rate.
@ -126,15 +128,10 @@ type clientStatusMonitor struct {
// values for healthy status of clientStatusMonitor. // values for healthy status of clientStatusMonitor.
const ( const (
// statusUnhealthyOnDial is set when dialing to the endpoint is failed,
// so there is no connection to the endpoint, and pool should not close it
// before re-establishing connection once again.
statusUnhealthyOnDial = iota
// statusUnhealthyOnRequest is set when communication after dialing to the // statusUnhealthyOnRequest is set when communication after dialing to the
// endpoint is failed due to immediate or accumulated errors, connection is // endpoint is failed due to immediate or accumulated errors, connection is
// available and pool should close it before re-establishing connection once again. // available and pool should close it before re-establishing connection once again.
statusUnhealthyOnRequest statusUnhealthyOnRequest = iota
// statusHealthy is set when connection is ready to be used by the pool. // statusHealthy is set when connection is ready to be used by the pool.
statusHealthy statusHealthy
@ -233,6 +230,7 @@ func newClientStatusMonitor(logger *zap.Logger, addr string, errorThreshold uint
type clientWrapper struct { type clientWrapper struct {
clientMutex sync.RWMutex clientMutex sync.RWMutex
client *sdkClient.Client client *sdkClient.Client
dialed bool
prm wrapperPrm prm wrapperPrm
clientStatusMonitor clientStatusMonitor
@ -342,30 +340,17 @@ func (c *clientWrapper) dial(ctx context.Context) error {
GRPCDialOptions: c.prm.dialOptions, GRPCDialOptions: c.prm.dialOptions,
} }
if err = cl.Dial(ctx, prmDial); err != nil { err = cl.Dial(ctx, prmDial)
c.setUnhealthyOnDial() c.setDialed(err == nil)
if err != nil {
return err return err
} }
return nil return nil
} }
// restartIfUnhealthy checks healthy status of client and recreate it if status is unhealthy. // restart recreates and redial inner sdk client.
// Indicating if status was changed by this function call and returns error that caused unhealthy status. func (c *clientWrapper) restart(ctx context.Context) error {
func (c *clientWrapper) restartIfUnhealthy(ctx context.Context) (changed bool, err error) {
var wasHealthy bool
if _, err = c.endpointInfo(ctx, prmEndpointInfo{}); err == nil {
return false, nil
} else if !errors.Is(err, errPoolClientUnhealthy) {
wasHealthy = true
}
// if connection is dialed before, to avoid routine / connection leak,
// pool has to close it and then initialize once again.
if c.isDialed() {
c.scheduleGracefulClose()
}
var cl sdkClient.Client var cl sdkClient.Client
prmInit := sdkClient.PrmInit{ prmInit := sdkClient.PrmInit{
Key: c.prm.key, Key: c.prm.key,
@ -381,22 +366,35 @@ func (c *clientWrapper) restartIfUnhealthy(ctx context.Context) (changed bool, e
GRPCDialOptions: c.prm.dialOptions, GRPCDialOptions: c.prm.dialOptions,
} }
if err = cl.Dial(ctx, prmDial); err != nil { // if connection is dialed before, to avoid routine / connection leak,
c.setUnhealthyOnDial() // pool has to close it and then initialize once again.
return wasHealthy, err if c.isDialed() {
c.scheduleGracefulClose()
}
err := cl.Dial(ctx, prmDial)
c.setDialed(err == nil)
if err != nil {
return err
} }
c.clientMutex.Lock() c.clientMutex.Lock()
c.client = &cl c.client = &cl
c.clientMutex.Unlock() c.clientMutex.Unlock()
if _, err = cl.EndpointInfo(ctx, sdkClient.PrmEndpointInfo{}); err != nil { return nil
c.setUnhealthy() }
return wasHealthy, err
}
c.setHealthy() func (c *clientWrapper) isDialed() bool {
return !wasHealthy, nil c.mu.RLock()
defer c.mu.RUnlock()
return c.dialed
}
func (c *clientWrapper) setDialed(dialed bool) {
c.mu.Lock()
c.dialed = dialed
c.mu.Unlock()
} }
func (c *clientWrapper) getClient() (*sdkClient.Client, error) { func (c *clientWrapper) getClient() (*sdkClient.Client, error) {
@ -654,6 +652,15 @@ func (c *clientWrapper) endpointInfo(ctx context.Context, _ prmEndpointInfo) (ne
return netmap.NodeInfo{}, err return netmap.NodeInfo{}, err
} }
return c.endpointInfoRaw(ctx, cl)
}
func (c *clientWrapper) healthcheck(ctx context.Context) (netmap.NodeInfo, error) {
cl := c.getClientRaw()
return c.endpointInfoRaw(ctx, cl)
}
func (c *clientWrapper) endpointInfoRaw(ctx context.Context, cl *sdkClient.Client) (netmap.NodeInfo, error) {
start := time.Now() start := time.Now()
res, err := cl.EndpointInfo(ctx, sdkClient.PrmEndpointInfo{}) res, err := cl.EndpointInfo(ctx, sdkClient.PrmEndpointInfo{})
c.incRequests(time.Since(start), methodEndpointInfo) c.incRequests(time.Since(start), methodEndpointInfo)
@ -1121,10 +1128,6 @@ func (c *clientStatusMonitor) isHealthy() bool {
return c.healthy.Load() == statusHealthy return c.healthy.Load() == statusHealthy
} }
func (c *clientStatusMonitor) isDialed() bool {
return c.healthy.Load() != statusUnhealthyOnDial
}
func (c *clientStatusMonitor) setHealthy() { func (c *clientStatusMonitor) setHealthy() {
c.healthy.Store(statusHealthy) c.healthy.Store(statusHealthy)
} }
@ -1133,10 +1136,6 @@ func (c *clientStatusMonitor) setUnhealthy() {
c.healthy.Store(statusUnhealthyOnRequest) c.healthy.Store(statusUnhealthyOnRequest)
} }
func (c *clientStatusMonitor) setUnhealthyOnDial() {
c.healthy.Store(statusUnhealthyOnDial)
}
func (c *clientStatusMonitor) address() string { func (c *clientStatusMonitor) address() string {
return c.addr return c.addr
} }
@ -1211,6 +1210,9 @@ func (c *clientWrapper) incRequests(elapsed time.Duration, method MethodIndex) {
} }
func (c *clientWrapper) close() error { func (c *clientWrapper) close() error {
if !c.isDialed() {
return nil
}
if cl := c.getClientRaw(); cl != nil { if cl := c.getClientRaw(); cl != nil {
return cl.Close() return cl.Close()
} }
@ -2153,7 +2155,9 @@ func adjustNodeParams(nodeParams []NodeParam) ([]*nodesParam, error) {
// startRebalance runs loop to monitor connection healthy status. // startRebalance runs loop to monitor connection healthy status.
func (p *Pool) startRebalance(ctx context.Context) { func (p *Pool) startRebalance(ctx context.Context) {
ticker := time.NewTimer(p.rebalanceParams.clientRebalanceInterval) ticker := time.NewTicker(p.rebalanceParams.clientRebalanceInterval)
defer ticker.Stop()
buffers := make([][]float64, len(p.rebalanceParams.nodesParams)) buffers := make([][]float64, len(p.rebalanceParams.nodesParams))
for i, params := range p.rebalanceParams.nodesParams { for i, params := range p.rebalanceParams.nodesParams {
buffers[i] = make([]float64, len(params.weights)) buffers[i] = make([]float64, len(params.weights))
@ -2203,7 +2207,7 @@ func (p *Pool) updateInnerNodesHealth(ctx context.Context, i int, bufferWeights
tctx, c := context.WithTimeout(ctx, options.nodeRequestTimeout) tctx, c := context.WithTimeout(ctx, options.nodeRequestTimeout)
defer c() defer c()
changed, err := cli.restartIfUnhealthy(tctx) changed, err := restartIfUnhealthy(tctx, cli)
healthy := err == nil healthy := err == nil
if healthy { if healthy {
bufferWeights[j] = options.nodesParams[i].weights[j] bufferWeights[j] = options.nodesParams[i].weights[j]
@ -2234,6 +2238,43 @@ func (p *Pool) updateInnerNodesHealth(ctx context.Context, i int, bufferWeights
} }
} }
// restartIfUnhealthy checks healthy status of client and recreate it if status is unhealthy.
// Indicating if status was changed by this function call and returns error that caused unhealthy status.
func restartIfUnhealthy(ctx context.Context, c client) (changed bool, err error) {
defer func() {
if err != nil {
c.setUnhealthy()
} else {
c.setHealthy()
}
}()
wasHealthy := c.isHealthy()
if res, err := c.healthcheck(ctx); err == nil {
if res.Status().IsMaintenance() {
return wasHealthy, new(apistatus.NodeUnderMaintenance)
}
return !wasHealthy, nil
}
if err = c.restart(ctx); err != nil {
return wasHealthy, err
}
res, err := c.healthcheck(ctx)
if err != nil {
return wasHealthy, err
}
if res.Status().IsMaintenance() {
return wasHealthy, new(apistatus.NodeUnderMaintenance)
}
return !wasHealthy, nil
}
func adjustWeights(weights []float64) []float64 { func adjustWeights(weights []float64) []float64 {
adjusted := make([]float64, len(weights)) adjusted := make([]float64, len(weights))
sum := 0.0 sum := 0.0
@ -3032,9 +3073,7 @@ func (p *Pool) Close() {
// close all clients // close all clients
for _, pools := range p.innerPools { for _, pools := range p.innerPools {
for _, cli := range pools.clients { for _, cli := range pools.clients {
if cli.isDialed() { _ = cli.close()
_ = cli.close()
}
} }
} }
} }

View file

@ -4,11 +4,13 @@ import (
"context" "context"
"crypto/ecdsa" "crypto/ecdsa"
"errors" "errors"
"math/rand"
"testing" "testing"
"time" "time"
apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status" apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
frostfsecdsa "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/crypto/ecdsa" frostfsecdsa "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/crypto/ecdsa"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object" "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id" oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/session" "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/session"
@ -17,6 +19,7 @@ import (
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
"go.uber.org/zap" "go.uber.org/zap"
"go.uber.org/zap/zaptest" "go.uber.org/zap/zaptest"
"go.uber.org/zap/zaptest/observer"
) )
func TestBuildPoolClientFailed(t *testing.T) { func TestBuildPoolClientFailed(t *testing.T) {
@ -230,6 +233,179 @@ func TestOneOfTwoFailed(t *testing.T) {
} }
} }
func TestUpdateNodesHealth(t *testing.T) {
ctx := context.Background()
key := newPrivateKey(t)
for _, tc := range []struct {
name string
wasHealthy bool
willHealthy bool
prepareCli func(*mockClient)
}{
{
name: "healthy, maintenance, unhealthy",
wasHealthy: true,
willHealthy: false,
prepareCli: func(c *mockClient) { c.resOnEndpointInfo.SetStatus(netmap.Maintenance) },
},
{
name: "unhealthy, maintenance, unhealthy",
wasHealthy: false,
willHealthy: false,
prepareCli: func(c *mockClient) { c.resOnEndpointInfo.SetStatus(netmap.Maintenance) },
},
{
name: "healthy, no error, healthy",
wasHealthy: true,
willHealthy: true,
prepareCli: func(c *mockClient) { c.resOnEndpointInfo.SetStatus(netmap.Online) },
},
{
name: "unhealthy, no error, healthy",
wasHealthy: false,
willHealthy: true,
prepareCli: func(c *mockClient) { c.resOnEndpointInfo.SetStatus(netmap.Online) },
},
{
name: "healthy, error, failed restart, unhealthy",
wasHealthy: true,
willHealthy: false,
prepareCli: func(c *mockClient) {
c.errOnEndpointInfo()
c.errorOnDial = true
},
},
{
name: "unhealthy, error, failed restart, unhealthy",
wasHealthy: false,
willHealthy: false,
prepareCli: func(c *mockClient) {
c.errOnEndpointInfo()
c.errorOnDial = true
},
},
{
name: "healthy, error, restart, error, unhealthy",
wasHealthy: true,
willHealthy: false,
prepareCli: func(c *mockClient) { c.errOnEndpointInfo() },
},
{
name: "unhealthy, error, restart, error, unhealthy",
wasHealthy: false,
willHealthy: false,
prepareCli: func(c *mockClient) { c.errOnEndpointInfo() },
},
{
name: "healthy, error, restart, maintenance, unhealthy",
wasHealthy: true,
willHealthy: false,
prepareCli: func(c *mockClient) {
healthError := true
c.healthcheckFn = func() {
if healthError {
c.errorOnEndpointInfo = errors.New("error")
} else {
c.errorOnEndpointInfo = nil
c.resOnEndpointInfo.SetStatus(netmap.Maintenance)
}
healthError = !healthError
}
},
},
{
name: "unhealthy, error, restart, maintenance, unhealthy",
wasHealthy: false,
willHealthy: false,
prepareCli: func(c *mockClient) {
healthError := true
c.healthcheckFn = func() {
if healthError {
c.errorOnEndpointInfo = errors.New("error")
} else {
c.errorOnEndpointInfo = nil
c.resOnEndpointInfo.SetStatus(netmap.Maintenance)
}
healthError = !healthError
}
},
},
{
name: "healthy, error, restart, healthy",
wasHealthy: true,
willHealthy: true,
prepareCli: func(c *mockClient) {
healthError := true
c.healthcheckFn = func() {
if healthError {
c.errorOnEndpointInfo = errors.New("error")
} else {
c.errorOnEndpointInfo = nil
}
healthError = !healthError
}
},
},
{
name: "unhealthy, error, restart, healthy",
wasHealthy: false,
willHealthy: true,
prepareCli: func(c *mockClient) {
healthError := true
c.healthcheckFn = func() {
if healthError {
c.errorOnEndpointInfo = errors.New("error")
} else {
c.errorOnEndpointInfo = nil
}
healthError = !healthError
}
},
},
} {
t.Run(tc.name, func(t *testing.T) {
cli := newMockClientHealthy("peer0", *key, tc.wasHealthy)
tc.prepareCli(cli)
p, log := newPool(t, cli)
p.updateNodesHealth(ctx, [][]float64{{1}})
changed := tc.wasHealthy != tc.willHealthy
require.Equalf(t, tc.willHealthy, cli.isHealthy(), "healthy status should be: %v", tc.willHealthy)
require.Equalf(t, changed, 1 == log.Len(), "healthy status should be changed: %v", changed)
})
}
}
func newPool(t *testing.T, cli *mockClient) (*Pool, *observer.ObservedLogs) {
log, observedLog := getObservedLogger()
cache, err := newCache(0)
require.NoError(t, err)
return &Pool{
innerPools: []*innerPool{{
sampler: newSampler([]float64{1}, rand.NewSource(0)),
clients: []client{cli},
}},
cache: cache,
key: newPrivateKey(t),
closedCh: make(chan struct{}),
rebalanceParams: rebalanceParameters{
nodesParams: []*nodesParam{{1, []string{"peer0"}, []float64{1}}},
nodeRequestTimeout: time.Second,
clientRebalanceInterval: 200 * time.Millisecond,
},
logger: log,
}, observedLog
}
func getObservedLogger() (*zap.Logger, *observer.ObservedLogs) {
loggerCore, observedLog := observer.New(zap.DebugLevel)
return zap.New(loggerCore), observedLog
}
func TestTwoFailed(t *testing.T) { func TestTwoFailed(t *testing.T) {
var clientKeys []*ecdsa.PrivateKey var clientKeys []*ecdsa.PrivateKey
mockClientBuilder := func(addr string) client { mockClientBuilder := func(addr string) client {
@ -529,13 +705,6 @@ func TestStatusMonitor(t *testing.T) {
isHealthy bool isHealthy bool
description string description string
}{ }{
{
action: func(m *clientStatusMonitor) { m.setUnhealthyOnDial() },
status: statusUnhealthyOnDial,
isDialed: false,
isHealthy: false,
description: "set unhealthy on dial",
},
{ {
action: func(m *clientStatusMonitor) { m.setUnhealthy() }, action: func(m *clientStatusMonitor) { m.setUnhealthy() },
status: statusUnhealthyOnRequest, status: statusUnhealthyOnRequest,
@ -554,7 +723,6 @@ func TestStatusMonitor(t *testing.T) {
for _, tc := range cases { for _, tc := range cases {
tc.action(&monitor) tc.action(&monitor)
require.Equal(t, tc.status, monitor.healthy.Load()) require.Equal(t, tc.status, monitor.healthy.Load())
require.Equal(t, tc.isDialed, monitor.isDialed())
require.Equal(t, tc.isHealthy, monitor.isHealthy()) require.Equal(t, tc.isHealthy, monitor.isHealthy())
} }
}) })