diff --git a/app.go b/app.go index dc56ddc..b255c9e 100644 --- a/app.go +++ b/app.go @@ -7,14 +7,10 @@ import ( "time" "github.com/fasthttp/router" - - "google.golang.org/grpc/grpclog" - - "github.com/nspcc-dev/neofs-api-go/service" - "github.com/nspcc-dev/neofs-api-go/state" "github.com/spf13/viper" "github.com/valyala/fasthttp" "go.uber.org/zap" + "google.golang.org/grpc/grpclog" ) type ( @@ -34,9 +30,6 @@ type ( nodes []string - reqHealth *state.HealthRequest - reqNetmap *state.NetmapRequest - conTimeout time.Duration reqTimeout time.Duration } @@ -105,21 +98,7 @@ func newApp(opt ...Option) App { a.web.NoDefaultContentType = true // -- -- -- -- -- -- -- -- -- -- - a.reqHealth = new(state.HealthRequest) - a.reqHealth.SetTTL(service.NonForwardingTTL) - - if err := service.SignDataWithSessionToken(a.key, a.reqHealth); err != nil { - a.log.Fatal("could not sign `HealthRequest`", zap.Error(err)) - } - - a.reqNetmap = new(state.NetmapRequest) - a.reqNetmap.SetTTL(service.SingleForwardingTTL) - - if err := service.SignDataWithSessionToken(a.key, a.reqNetmap); err != nil { - a.log.Fatal("could not sign `NetmapRequest`", zap.Error(err)) - } - - a.pool = newPool(a.log, a.cfg) + a.pool = newPool(a.log, a.cfg, a.key) return a } diff --git a/pool.go b/pool.go index 15f9f47..d45cd44 100644 --- a/pool.go +++ b/pool.go @@ -2,6 +2,7 @@ package main import ( "context" + "crypto/ecdsa" crand "crypto/rand" "encoding/binary" "errors" @@ -35,9 +36,11 @@ type ( ttl time.Duration - connectTimeout time.Duration - requestTimeout time.Duration - opts keepalive.ClientParameters + conTimeout time.Duration + reqTimeout time.Duration + opts keepalive.ClientParameters + + reqHealth *state.HealthRequest currentIdx *atomic.Int32 currentConn *grpc.ClientConn @@ -57,7 +60,7 @@ var ( errNoHealthyConnections = errors.New("no active connections") ) -func newPool(l *zap.Logger, v *viper.Viper) *Pool { +func newPool(l *zap.Logger, v *viper.Viper, key *ecdsa.PrivateKey) *Pool { p := &Pool{ log: l, Mutex: new(sync.Mutex), @@ -70,8 +73,8 @@ func newPool(l *zap.Logger, v *viper.Viper) *Pool { currentIdx: atomic.NewInt32(-1), // fill with defaults: - requestTimeout: defaultRequestTimeout, - connectTimeout: defaultConnectTimeout, + reqTimeout: defaultRequestTimeout, + conTimeout: defaultConnectTimeout, opts: keepalive.ClientParameters{ Time: defaultKeepaliveTime, Timeout: defaultKeepaliveTimeout, @@ -90,12 +93,19 @@ func newPool(l *zap.Logger, v *viper.Viper) *Pool { rand.Seed(int64(seed)) l.Info("used random seed", zap.Uint64("seed", seed)) + p.reqHealth = new(state.HealthRequest) + p.reqHealth.SetTTL(service.NonForwardingTTL) + + if err := service.SignDataWithSessionToken(key, p.reqHealth); err != nil { + l.Fatal("could not sign `HealthRequest`", zap.Error(err)) + } + if val := v.GetDuration("conn_ttl"); val > 0 { p.ttl = val } if val := v.GetDuration("connect_timeout"); val > 0 { - p.connectTimeout = val + p.conTimeout = val } if val := v.GetDuration("keepalive.time"); val > 0 { @@ -149,29 +159,6 @@ func (p *Pool) close() { } } -func (a *app) checkHealth(ctx context.Context, conn *grpc.ClientConn) error { - //a.log.Info("try to fetch node health status", - // zap.String("node", conn.Target()), - // zap.Stringer("timeout", a.reqTimeout)) - - ctx, cancel := context.WithTimeout(ctx, a.reqTimeout) - result, err := state.NewStatusClient(conn).HealthCheck(ctx, a.reqHealth) - cancel() - - if err != nil { - result = &state.HealthResponse{Status: err.Error()} - } else if !result.Healthy { - err = errors.New(result.Status) - } - - a.log.Debug("received node health status", - zap.String("node", conn.Target()), - zap.String("status", result.Status), - zap.Error(err)) - - return err -} - func (p *Pool) reBalance(ctx context.Context) { p.Lock() defer func() { @@ -206,7 +193,7 @@ func (p *Pool) reBalance(ctx context.Context) { p.log.Warn("empty connection, try to connect", zap.String("address", p.nodes[i].address)) - ctx, cancel := context.WithTimeout(ctx, p.connectTimeout) + ctx, cancel := context.WithTimeout(ctx, p.conTimeout) conn, err = grpc.DialContext(ctx, p.nodes[i].address, grpc.WithBlock(), grpc.WithInsecure(), @@ -329,13 +316,10 @@ func (p *Pool) isAlive(ctx context.Context, cur *grpc.ClientConn) error { switch st := cur.GetState(); st { case connectivity.Idle, connectivity.Ready, connectivity.Connecting: - req := new(state.HealthRequest) - req.SetTTL(service.NonForwardingTTL) - - ctx, cancel := context.WithTimeout(ctx, p.requestTimeout) + ctx, cancel := context.WithTimeout(ctx, p.reqTimeout) defer cancel() - res, err := state.NewStatusClient(cur).HealthCheck(ctx, req) + res, err := state.NewStatusClient(cur).HealthCheck(ctx, p.reqHealth) if err != nil { p.log.Warn("could not fetch health-check", zap.Error(err))