Refactoring connection pool

- replace RWMutex with Mutex
- conns now contains `map[int32][]*node`
- add index field to node struct
- add usedAt field to node struct
- `New` don't store current connection twice
- writes `used_at` of connection into the log
- we check that conn is alive and, that connection used not a long time ago, otherwise, close it
- `getConnection` reset current connection and index on errors
-  `IsAlive` now is a method of `Pool`
-  `IsAlive` creates context with timeout for every health check call of node
This commit is contained in:
Evgeniy Kulikov 2020-02-19 13:55:42 +03:00
parent 65a2da6995
commit 75113f8195
No known key found for this signature in database
GPG key ID: BF6AEE0A2A699BF2

90
pool.go
View file

@ -14,6 +14,7 @@ import (
"github.com/nspcc-dev/neofs-api/service" "github.com/nspcc-dev/neofs-api/service"
"github.com/nspcc-dev/neofs-api/state" "github.com/nspcc-dev/neofs-api/state"
"github.com/spf13/viper" "github.com/spf13/viper"
"go.uber.org/atomic"
"go.uber.org/zap" "go.uber.org/zap"
"google.golang.org/grpc" "google.golang.org/grpc"
"google.golang.org/grpc/connectivity" "google.golang.org/grpc/connectivity"
@ -22,23 +23,29 @@ import (
type ( type (
node struct { node struct {
index int32
address string address string
weight uint32 weight uint32
usedAt time.Time
conn *grpc.ClientConn conn *grpc.ClientConn
} }
Pool struct { Pool struct {
log *zap.Logger log *zap.Logger
ttl time.Duration
connectTimeout time.Duration connectTimeout time.Duration
requestTimeout time.Duration
opts keepalive.ClientParameters opts keepalive.ClientParameters
cur *grpc.ClientConn currentIdx *atomic.Int32
currentConn *grpc.ClientConn
*sync.RWMutex *sync.Mutex
nodes []*node nodes []*node
keys []uint32 keys []uint32
conns map[uint32][]*grpc.ClientConn conns map[uint32][]*node
} }
) )
@ -50,19 +57,25 @@ var (
func newPool(ctx context.Context, l *zap.Logger, v *viper.Viper) *Pool { func newPool(ctx context.Context, l *zap.Logger, v *viper.Viper) *Pool {
p := &Pool{ p := &Pool{
log: l, log: l,
RWMutex: new(sync.RWMutex), Mutex: new(sync.Mutex),
keys: make([]uint32, 0), keys: make([]uint32, 0),
nodes: make([]*node, 0), nodes: make([]*node, 0),
conns: make(map[uint32][]*grpc.ClientConn), conns: make(map[uint32][]*node),
ttl: defaultTTL,
currentIdx: atomic.NewInt32(-1),
// fill with defaults: // fill with defaults:
connectTimeout: time.Second * 15, requestTimeout: defaultRequestTimeout,
connectTimeout: defaultConnectTimeout,
opts: keepalive.ClientParameters{ opts: keepalive.ClientParameters{
Time: time.Second * 10, Time: defaultKeepaliveTime,
Timeout: time.Minute * 5, Timeout: defaultKeepaliveTimeout,
PermitWithoutStream: true, PermitWithoutStream: true,
}, },
} }
buf := make([]byte, 8) buf := make([]byte, 8)
if _, err := crand.Read(buf); err != nil { if _, err := crand.Read(buf); err != nil {
l.Panic("could not read seed", zap.Error(err)) l.Panic("could not read seed", zap.Error(err))
@ -72,6 +85,10 @@ func newPool(ctx context.Context, l *zap.Logger, v *viper.Viper) *Pool {
rand.Seed(int64(seed)) rand.Seed(int64(seed))
l.Info("used random seed", zap.Uint64("seed", seed)) l.Info("used random seed", zap.Uint64("seed", seed))
if val := v.GetDuration("conn_ttl"); val > 0 {
p.ttl = val
}
if val := v.GetDuration("connect_timeout"); val > 0 { if val := v.GetDuration("connect_timeout"); val > 0 {
p.connectTimeout = val p.connectTimeout = val
} }
@ -99,6 +116,7 @@ func newPool(ctx context.Context, l *zap.Logger, v *viper.Viper) *Pool {
} }
p.nodes = append(p.nodes, &node{ p.nodes = append(p.nodes, &node{
index: int32(i),
address: address, address: address,
weight: uint32(weight * 100), weight: uint32(weight * 100),
}) })
@ -110,13 +128,10 @@ func newPool(ctx context.Context, l *zap.Logger, v *viper.Viper) *Pool {
p.reBalance(ctx) p.reBalance(ctx)
cur, err := p.getConnection(ctx) if _, err := p.getConnection(ctx); err != nil {
if err != nil {
l.Panic("could get connection", zap.Error(err)) l.Panic("could get connection", zap.Error(err))
} }
p.cur = cur
return p return p
} }
@ -173,21 +188,25 @@ func (p *Pool) reBalance(ctx context.Context) {
} }
p.nodes[i].conn = conn p.nodes[i].conn = conn
p.nodes[i].usedAt = time.Now()
p.log.Info("connected to node", zap.String("address", p.nodes[i].address)) p.log.Info("connected to node", zap.String("address", p.nodes[i].address))
} }
for j := range p.conns[weight] { for j := range p.conns[weight] {
if p.conns[weight][j] == conn { if p.conns[weight][j] != nil && p.conns[weight][j].conn == conn {
idx = j idx = j
exists = true exists = true
break break
} }
} }
// if something wrong with connection (bad state or unhealthy), try to close it and remove usedAt := time.Since(p.nodes[i].usedAt)
if err = isAlive(ctx, p.log, conn); err != nil {
// if something wrong with connection (bad state, unhealthy or not used a long time), try to close it and remove
if err = p.isAlive(ctx, conn); err != nil || usedAt > p.ttl {
p.log.Warn("connection not alive", p.log.Warn("connection not alive",
zap.String("address", p.nodes[i].address), zap.String("address", p.nodes[i].address),
zap.Duration("used_at", usedAt),
zap.Error(err)) zap.Error(err))
if exists { if exists {
@ -198,6 +217,7 @@ func (p *Pool) reBalance(ctx context.Context) {
if err = conn.Close(); err != nil { if err = conn.Close(); err != nil {
p.log.Warn("could not close bad connection", p.log.Warn("could not close bad connection",
zap.String("address", p.nodes[i].address), zap.String("address", p.nodes[i].address),
zap.Duration("used_at", usedAt),
zap.Error(err)) zap.Error(err))
} }
@ -209,8 +229,12 @@ func (p *Pool) reBalance(ctx context.Context) {
keys[weight] = struct{}{} keys[weight] = struct{}{}
p.log.Info("connection alive",
zap.String("address", p.nodes[i].address),
zap.Duration("used_at", usedAt))
if !exists { if !exists {
p.conns[weight] = append(p.conns[weight], conn) p.conns[weight] = append(p.conns[weight], p.nodes[i])
} }
} }
@ -225,11 +249,15 @@ func (p *Pool) reBalance(ctx context.Context) {
} }
func (p *Pool) getConnection(ctx context.Context) (*grpc.ClientConn, error) { func (p *Pool) getConnection(ctx context.Context) (*grpc.ClientConn, error) {
p.RLock() p.Lock()
defer p.RUnlock() defer p.Unlock()
if err := isAlive(ctx, p.log, p.cur); err == nil { if err := p.isAlive(ctx, p.currentConn); err == nil {
return p.cur, nil if id := p.currentIdx.Load(); id != -1 && p.nodes[id] != nil {
p.nodes[id].usedAt = time.Now()
}
return p.currentConn, nil
} }
for _, w := range p.keys { for _, w := range p.keys {
@ -237,19 +265,26 @@ func (p *Pool) getConnection(ctx context.Context) (*grpc.ClientConn, error) {
case 0: case 0:
continue continue
case 1: case 1:
p.cur = p.conns[w][0] p.currentConn = p.conns[w][0].conn
return p.cur, nil p.conns[w][0].usedAt = time.Now()
p.currentIdx.Store(p.conns[w][0].index)
return p.currentConn, nil
default: // > 1 default: // > 1
i := rand.Intn(ln) i := rand.Intn(ln)
p.cur = p.conns[w][i] p.currentConn = p.conns[w][i].conn
return p.cur, nil p.conns[w][i].usedAt = time.Now()
p.currentIdx.Store(p.conns[w][i].index)
return p.currentConn, nil
} }
} }
p.currentConn = nil
p.currentIdx.Store(-1)
return nil, errNoHealthyConnections return nil, errNoHealthyConnections
} }
func isAlive(ctx context.Context, log *zap.Logger, cur *grpc.ClientConn) error { func (p *Pool) isAlive(ctx context.Context, cur *grpc.ClientConn) error {
if cur == nil { if cur == nil {
return errEmptyConnection return errEmptyConnection
} }
@ -259,9 +294,12 @@ func isAlive(ctx context.Context, log *zap.Logger, cur *grpc.ClientConn) error {
req := new(state.HealthRequest) req := new(state.HealthRequest)
req.SetTTL(service.NonForwardingTTL) req.SetTTL(service.NonForwardingTTL)
ctx, cancel := context.WithTimeout(ctx, p.requestTimeout)
defer cancel()
res, err := state.NewStatusClient(cur).HealthCheck(ctx, req) res, err := state.NewStatusClient(cur).HealthCheck(ctx, req)
if err != nil { if err != nil {
log.Warn("could not fetch health-check", zap.Error(err)) p.log.Warn("could not fetch health-check", zap.Error(err))
return err return err
} else if !res.Healthy { } else if !res.Healthy {