From fbfab8cae76130aa74f7813862668c1ba6a275ea Mon Sep 17 00:00:00 2001 From: Evgeniy Kulikov Date: Thu, 13 Feb 2020 17:31:33 +0300 Subject: [PATCH] Add state.HealthCheck to isActive --- pool.go | 26 +++++++++++++++++++++----- receive.go | 5 +++-- settings.go | 2 +- 3 files changed, 25 insertions(+), 8 deletions(-) diff --git a/pool.go b/pool.go index fcc57a7..1fdcc8d 100644 --- a/pool.go +++ b/pool.go @@ -11,6 +11,7 @@ import ( "sync" "time" + "github.com/nspcc-dev/neofs-api/state" "github.com/spf13/viper" "go.uber.org/zap" "google.golang.org/grpc" @@ -105,7 +106,7 @@ func newPool(ctx context.Context, l *zap.Logger, v *viper.Viper) *Pool { p.reBalance(ctx) - cur, err := p.getConnection() + cur, err := p.getConnection(ctx) if err != nil { l.Panic("could get connection", zap.Error(err)) } @@ -196,6 +197,14 @@ func (p *Pool) reBalance(ctx context.Context) { // remove from connections p.conns[weight] = append(p.conns[weight][:idx], p.conns[weight][idx+1:]...) } + + if err := conn.Close(); err != nil { + p.log.Warn("could not close bad connection", zap.Error(err)) + } + + if p.nodes[i].conn != nil { + p.nodes[i].conn = nil + } } } @@ -209,11 +218,11 @@ func (p *Pool) reBalance(ctx context.Context) { }) } -func (p *Pool) getConnection() (*grpc.ClientConn, error) { +func (p *Pool) getConnection(ctx context.Context) (*grpc.ClientConn, error) { p.RLock() defer p.RUnlock() - if p.cur != nil && isAlive(p.cur) { + if p.cur != nil && isAlive(ctx, p.log, p.cur) { return p.cur, nil } @@ -234,10 +243,17 @@ func (p *Pool) getConnection() (*grpc.ClientConn, error) { return nil, errNoHealthyConnections } -func isAlive(cur *grpc.ClientConn) bool { +func isAlive(ctx context.Context, log *zap.Logger, cur *grpc.ClientConn) bool { switch st := cur.GetState(); st { case connectivity.Idle, connectivity.Ready, connectivity.Connecting: - return true + res, err := state.NewStatusClient(cur).HealthCheck(ctx, new(state.HealthRequest)) + if err != nil { + log.Warn("could not fetch health-check", zap.Error(err)) + + return false + } + + return res.Healthy default: return false } diff --git a/receive.go b/receive.go index 78c79c1..136156e 100644 --- a/receive.go +++ b/receive.go @@ -22,8 +22,9 @@ func (r *router) receiveFile(c echo.Context) error { cid refs.CID oid refs.ObjectID obj *object.Object + ctx = c.Request().Context() + con, err = r.pool.getConnection(ctx) download = c.QueryParam("download") != "" - con, err = r.pool.getConnection() ) if err != nil { @@ -51,7 +52,7 @@ func (r *router) receiveFile(c echo.Context) error { ) } - ctx, cancel := context.WithTimeout(context.Background(), r.timeout) + ctx, cancel := context.WithTimeout(ctx, r.timeout) defer cancel() req := &object.GetRequest{Address: refs.Address{ObjectID: oid, CID: cid}} diff --git a/settings.go b/settings.go index 50f8774..fa024fe 100644 --- a/settings.go +++ b/settings.go @@ -96,7 +96,7 @@ func settings() *viper.Viper { // keepalive: v.SetDefault("keepalive.timeout", time.Second*10) - v.SetDefault("keepalive.time", time.Millisecond*100) + v.SetDefault("keepalive.time", time.Second*10) // If set below 10s, a minimum value of 10s will be used instead. v.SetDefault("keepalive.permit_without_stream", true) if err := v.BindPFlags(flags); err != nil {