Add state.HealthCheck to isActive

This commit is contained in:
Evgeniy Kulikov 2020-02-13 17:31:33 +03:00
parent b8a6af43aa
commit fbfab8cae7
No known key found for this signature in database
GPG key ID: BF6AEE0A2A699BF2
3 changed files with 25 additions and 8 deletions

26
pool.go
View file

@ -11,6 +11,7 @@ import (
"sync" "sync"
"time" "time"
"github.com/nspcc-dev/neofs-api/state"
"github.com/spf13/viper" "github.com/spf13/viper"
"go.uber.org/zap" "go.uber.org/zap"
"google.golang.org/grpc" "google.golang.org/grpc"
@ -105,7 +106,7 @@ func newPool(ctx context.Context, l *zap.Logger, v *viper.Viper) *Pool {
p.reBalance(ctx) p.reBalance(ctx)
cur, err := p.getConnection() cur, err := p.getConnection(ctx)
if err != nil { if err != nil {
l.Panic("could get connection", zap.Error(err)) l.Panic("could get connection", zap.Error(err))
} }
@ -196,6 +197,14 @@ func (p *Pool) reBalance(ctx context.Context) {
// remove from connections // remove from connections
p.conns[weight] = append(p.conns[weight][:idx], p.conns[weight][idx+1:]...) p.conns[weight] = append(p.conns[weight][:idx], p.conns[weight][idx+1:]...)
} }
if err := conn.Close(); err != nil {
p.log.Warn("could not close bad connection", zap.Error(err))
}
if p.nodes[i].conn != nil {
p.nodes[i].conn = nil
}
} }
} }
@ -209,11 +218,11 @@ func (p *Pool) reBalance(ctx context.Context) {
}) })
} }
func (p *Pool) getConnection() (*grpc.ClientConn, error) { func (p *Pool) getConnection(ctx context.Context) (*grpc.ClientConn, error) {
p.RLock() p.RLock()
defer p.RUnlock() defer p.RUnlock()
if p.cur != nil && isAlive(p.cur) { if p.cur != nil && isAlive(ctx, p.log, p.cur) {
return p.cur, nil return p.cur, nil
} }
@ -234,10 +243,17 @@ func (p *Pool) getConnection() (*grpc.ClientConn, error) {
return nil, errNoHealthyConnections return nil, errNoHealthyConnections
} }
func isAlive(cur *grpc.ClientConn) bool { func isAlive(ctx context.Context, log *zap.Logger, cur *grpc.ClientConn) bool {
switch st := cur.GetState(); st { switch st := cur.GetState(); st {
case connectivity.Idle, connectivity.Ready, connectivity.Connecting: case connectivity.Idle, connectivity.Ready, connectivity.Connecting:
return true res, err := state.NewStatusClient(cur).HealthCheck(ctx, new(state.HealthRequest))
if err != nil {
log.Warn("could not fetch health-check", zap.Error(err))
return false
}
return res.Healthy
default: default:
return false return false
} }

View file

@ -22,8 +22,9 @@ func (r *router) receiveFile(c echo.Context) error {
cid refs.CID cid refs.CID
oid refs.ObjectID oid refs.ObjectID
obj *object.Object obj *object.Object
ctx = c.Request().Context()
con, err = r.pool.getConnection(ctx)
download = c.QueryParam("download") != "" download = c.QueryParam("download") != ""
con, err = r.pool.getConnection()
) )
if err != nil { if err != nil {
@ -51,7 +52,7 @@ func (r *router) receiveFile(c echo.Context) error {
) )
} }
ctx, cancel := context.WithTimeout(context.Background(), r.timeout) ctx, cancel := context.WithTimeout(ctx, r.timeout)
defer cancel() defer cancel()
req := &object.GetRequest{Address: refs.Address{ObjectID: oid, CID: cid}} req := &object.GetRequest{Address: refs.Address{ObjectID: oid, CID: cid}}

View file

@ -96,7 +96,7 @@ func settings() *viper.Viper {
// keepalive: // keepalive:
v.SetDefault("keepalive.timeout", time.Second*10) v.SetDefault("keepalive.timeout", time.Second*10)
v.SetDefault("keepalive.time", time.Millisecond*100) v.SetDefault("keepalive.time", time.Second*10) // If set below 10s, a minimum value of 10s will be used instead.
v.SetDefault("keepalive.permit_without_stream", true) v.SetDefault("keepalive.permit_without_stream", true)
if err := v.BindPFlags(flags); err != nil { if err := v.BindPFlags(flags); err != nil {