NFSSVC-16 Sign health-check requests

This commit is contained in:
Evgeniy Kulikov 2020-07-02 11:34:54 +03:00
parent 6b3344bd41
commit bbb2a057c7
No known key found for this signature in database
GPG key ID: BF6AEE0A2A699BF2
2 changed files with 22 additions and 59 deletions

25
app.go
View file

@ -7,14 +7,10 @@ import (
"time" "time"
"github.com/fasthttp/router" "github.com/fasthttp/router"
"google.golang.org/grpc/grpclog"
"github.com/nspcc-dev/neofs-api-go/service"
"github.com/nspcc-dev/neofs-api-go/state"
"github.com/spf13/viper" "github.com/spf13/viper"
"github.com/valyala/fasthttp" "github.com/valyala/fasthttp"
"go.uber.org/zap" "go.uber.org/zap"
"google.golang.org/grpc/grpclog"
) )
type ( type (
@ -34,9 +30,6 @@ type (
nodes []string nodes []string
reqHealth *state.HealthRequest
reqNetmap *state.NetmapRequest
conTimeout time.Duration conTimeout time.Duration
reqTimeout time.Duration reqTimeout time.Duration
} }
@ -105,21 +98,7 @@ func newApp(opt ...Option) App {
a.web.NoDefaultContentType = true a.web.NoDefaultContentType = true
// -- -- -- -- -- -- -- -- -- -- // -- -- -- -- -- -- -- -- -- --
a.reqHealth = new(state.HealthRequest) a.pool = newPool(a.log, a.cfg, a.key)
a.reqHealth.SetTTL(service.NonForwardingTTL)
if err := service.SignDataWithSessionToken(a.key, a.reqHealth); err != nil {
a.log.Fatal("could not sign `HealthRequest`", zap.Error(err))
}
a.reqNetmap = new(state.NetmapRequest)
a.reqNetmap.SetTTL(service.SingleForwardingTTL)
if err := service.SignDataWithSessionToken(a.key, a.reqNetmap); err != nil {
a.log.Fatal("could not sign `NetmapRequest`", zap.Error(err))
}
a.pool = newPool(a.log, a.cfg)
return a return a
} }

54
pool.go
View file

@ -2,6 +2,7 @@ package main
import ( import (
"context" "context"
"crypto/ecdsa"
crand "crypto/rand" crand "crypto/rand"
"encoding/binary" "encoding/binary"
"errors" "errors"
@ -35,10 +36,12 @@ type (
ttl time.Duration ttl time.Duration
connectTimeout time.Duration conTimeout time.Duration
requestTimeout time.Duration reqTimeout time.Duration
opts keepalive.ClientParameters opts keepalive.ClientParameters
reqHealth *state.HealthRequest
currentIdx *atomic.Int32 currentIdx *atomic.Int32
currentConn *grpc.ClientConn currentConn *grpc.ClientConn
@ -57,7 +60,7 @@ var (
errNoHealthyConnections = errors.New("no active connections") errNoHealthyConnections = errors.New("no active connections")
) )
func newPool(l *zap.Logger, v *viper.Viper) *Pool { func newPool(l *zap.Logger, v *viper.Viper, key *ecdsa.PrivateKey) *Pool {
p := &Pool{ p := &Pool{
log: l, log: l,
Mutex: new(sync.Mutex), Mutex: new(sync.Mutex),
@ -70,8 +73,8 @@ func newPool(l *zap.Logger, v *viper.Viper) *Pool {
currentIdx: atomic.NewInt32(-1), currentIdx: atomic.NewInt32(-1),
// fill with defaults: // fill with defaults:
requestTimeout: defaultRequestTimeout, reqTimeout: defaultRequestTimeout,
connectTimeout: defaultConnectTimeout, conTimeout: defaultConnectTimeout,
opts: keepalive.ClientParameters{ opts: keepalive.ClientParameters{
Time: defaultKeepaliveTime, Time: defaultKeepaliveTime,
Timeout: defaultKeepaliveTimeout, Timeout: defaultKeepaliveTimeout,
@ -90,12 +93,19 @@ func newPool(l *zap.Logger, v *viper.Viper) *Pool {
rand.Seed(int64(seed)) rand.Seed(int64(seed))
l.Info("used random seed", zap.Uint64("seed", seed)) l.Info("used random seed", zap.Uint64("seed", seed))
p.reqHealth = new(state.HealthRequest)
p.reqHealth.SetTTL(service.NonForwardingTTL)
if err := service.SignDataWithSessionToken(key, p.reqHealth); err != nil {
l.Fatal("could not sign `HealthRequest`", zap.Error(err))
}
if val := v.GetDuration("conn_ttl"); val > 0 { if val := v.GetDuration("conn_ttl"); val > 0 {
p.ttl = val p.ttl = val
} }
if val := v.GetDuration("connect_timeout"); val > 0 { if val := v.GetDuration("connect_timeout"); val > 0 {
p.connectTimeout = val p.conTimeout = val
} }
if val := v.GetDuration("keepalive.time"); val > 0 { if val := v.GetDuration("keepalive.time"); val > 0 {
@ -149,29 +159,6 @@ func (p *Pool) close() {
} }
} }
func (a *app) checkHealth(ctx context.Context, conn *grpc.ClientConn) error {
//a.log.Info("try to fetch node health status",
// zap.String("node", conn.Target()),
// zap.Stringer("timeout", a.reqTimeout))
ctx, cancel := context.WithTimeout(ctx, a.reqTimeout)
result, err := state.NewStatusClient(conn).HealthCheck(ctx, a.reqHealth)
cancel()
if err != nil {
result = &state.HealthResponse{Status: err.Error()}
} else if !result.Healthy {
err = errors.New(result.Status)
}
a.log.Debug("received node health status",
zap.String("node", conn.Target()),
zap.String("status", result.Status),
zap.Error(err))
return err
}
func (p *Pool) reBalance(ctx context.Context) { func (p *Pool) reBalance(ctx context.Context) {
p.Lock() p.Lock()
defer func() { defer func() {
@ -206,7 +193,7 @@ func (p *Pool) reBalance(ctx context.Context) {
p.log.Warn("empty connection, try to connect", p.log.Warn("empty connection, try to connect",
zap.String("address", p.nodes[i].address)) zap.String("address", p.nodes[i].address))
ctx, cancel := context.WithTimeout(ctx, p.connectTimeout) ctx, cancel := context.WithTimeout(ctx, p.conTimeout)
conn, err = grpc.DialContext(ctx, p.nodes[i].address, conn, err = grpc.DialContext(ctx, p.nodes[i].address,
grpc.WithBlock(), grpc.WithBlock(),
grpc.WithInsecure(), grpc.WithInsecure(),
@ -329,13 +316,10 @@ func (p *Pool) isAlive(ctx context.Context, cur *grpc.ClientConn) error {
switch st := cur.GetState(); st { switch st := cur.GetState(); st {
case connectivity.Idle, connectivity.Ready, connectivity.Connecting: case connectivity.Idle, connectivity.Ready, connectivity.Connecting:
req := new(state.HealthRequest) ctx, cancel := context.WithTimeout(ctx, p.reqTimeout)
req.SetTTL(service.NonForwardingTTL)
ctx, cancel := context.WithTimeout(ctx, p.requestTimeout)
defer cancel() defer cancel()
res, err := state.NewStatusClient(cur).HealthCheck(ctx, req) res, err := state.NewStatusClient(cur).HealthCheck(ctx, p.reqHealth)
if err != nil { if err != nil {
p.log.Warn("could not fetch health-check", zap.Error(err)) p.log.Warn("could not fetch health-check", zap.Error(err))