From 164b0870deefc0763779116736a1c566db4b9b2f Mon Sep 17 00:00:00 2001 From: Evgeniy Kulikov Date: Tue, 31 Mar 2020 11:37:10 +0300 Subject: [PATCH] refactoring + moved to neofs-api-go --- app.go | 207 ++++++++++++++++++++++++++++++++++++++++++++++++++++ go.mod | 2 +- go.sum | 4 +- health.go | 4 +- main.go | 118 ++---------------------------- pool.go | 27 ++++++- pprof.go | 2 +- receive.go | 12 +-- settings.go | 2 +- 9 files changed, 252 insertions(+), 126 deletions(-) create mode 100644 app.go diff --git a/app.go b/app.go new file mode 100644 index 0000000..2b0b0e0 --- /dev/null +++ b/app.go @@ -0,0 +1,207 @@ +package main + +import ( + "context" + "crypto/ecdsa" + "errors" + "time" + + "github.com/fasthttp/router" + + "google.golang.org/grpc/grpclog" + + "github.com/nspcc-dev/neofs-api-go/service" + "github.com/nspcc-dev/neofs-api-go/state" + "github.com/spf13/viper" + "github.com/valyala/fasthttp" + "go.uber.org/zap" +) + +type ( + app struct { + pool *Pool + log *zap.Logger + cfg *viper.Viper + key *ecdsa.PrivateKey + + wlog logger + web *fasthttp.Server + + jobDone chan struct{} + webDone chan struct{} + + rebalanceTimer time.Duration + + nodes []string + + reqHealth *state.HealthRequest + reqNetmap *state.NetmapRequest + + conTimeout time.Duration + reqTimeout time.Duration + } + + App interface { + Wait() + Worker(context.Context) + Serve(context.Context) + } + + Option func(a *app) +) + +func WithLogger(l *zap.Logger) Option { + return func(a *app) { + if l == nil { + return + } + a.log = l + } +} + +func WithConfig(c *viper.Viper) Option { + return func(a *app) { + if c == nil { + return + } + a.cfg = c + } +} + +func newApp(opt ...Option) App { + a := &app{ + log: zap.L(), + cfg: viper.GetViper(), + web: new(fasthttp.Server), + + jobDone: make(chan struct{}), + webDone: make(chan struct{}), + } + + for i := range opt { + opt[i](a) + } + + a.wlog = gRPCLogger(a.log) + + if a.cfg.GetBool("verbose") { + grpclog.SetLoggerV2(a.wlog) + } + + a.key = fetchKey(a.log, a.cfg) + a.rebalanceTimer = a.cfg.GetDuration("rebalance_timer") + a.conTimeout = a.cfg.GetDuration("connect_timeout") + a.reqTimeout = a.cfg.GetDuration("request_timeout") + + // -- setup FastHTTP server: -- + a.web.Name = "neofs-http-gate" + a.web.ReadBufferSize = a.cfg.GetInt("web.read_buffer_size") + a.web.WriteBufferSize = a.cfg.GetInt("web.write_buffer_size") + a.web.ReadTimeout = a.cfg.GetDuration("web.read_timeout") + a.web.WriteTimeout = a.cfg.GetDuration("web.write_timeout") + a.web.GetOnly = true + a.web.DisableHeaderNamesNormalizing = true + a.web.NoDefaultServerHeader = true + a.web.NoDefaultContentType = true + // -- -- -- -- -- -- -- -- -- -- + + a.reqHealth = new(state.HealthRequest) + a.reqHealth.SetTTL(service.NonForwardingTTL) + + if err := service.SignRequestHeader(a.key, a.reqHealth); err != nil { + a.log.Fatal("could not sign `HealthRequest`", zap.Error(err)) + } + + a.reqNetmap = new(state.NetmapRequest) + a.reqNetmap.SetTTL(service.SingleForwardingTTL) + + if err := service.SignRequestHeader(a.key, a.reqNetmap); err != nil { + a.log.Fatal("could not sign `NetmapRequest`", zap.Error(err)) + } + + a.pool = newPool(a.log, a.cfg) + + return a +} + +func (a *app) Wait() { + a.log.Info("application started") + + select { + case <-a.jobDone: // wait for job is stopped + <-a.webDone + case <-a.webDone: // wait for web-server is stopped + <-a.jobDone + } +} + +func (a *app) Worker(ctx context.Context) { + dur := a.rebalanceTimer + tick := time.NewTimer(dur) + + a.pool.reBalance(ctx) + + switch _, err := a.pool.getConnection(ctx); { + case err == nil: + // ignore + case errors.Is(err, context.Canceled): + // ignore + // l.Info("context canceled") + default: + a.log.Fatal("could get connection", zap.Error(err)) + } + +loop: + for { + select { + case <-ctx.Done(): + break loop + case <-tick.C: + a.pool.reBalance(ctx) + tick.Reset(dur) + } + } + + a.pool.close() + tick.Stop() + + a.log.Info("connection worker stopped") + + close(a.jobDone) +} + +func (a *app) Serve(ctx context.Context) { + go func() { + <-ctx.Done() + a.log.Info("stop web-server", zap.Error(a.web.Shutdown())) + close(a.webDone) + }() + + r := router.New() + r.RedirectTrailingSlash = true + r.GET("/get/:cid/:oid/", a.receiveFile) + + // attaching /-/(ready,healthy) + attachHealthy(r, a.pool.unhealthy) + + // enable metrics + if a.cfg.GetBool("metrics") { + a.log.Info("enabled /metrics") + attachMetrics(r, a.wlog) + } + + // enable pprof + if a.cfg.GetBool("pprof") { + a.log.Info("enabled /debug/pprof") + attachProfiler(r) + } + + bind := a.cfg.GetString("listen_address") + a.log.Info("run gateway server", + zap.String("address", bind)) + + a.web.Handler = r.Handler + if err := a.web.ListenAndServe(bind); err != nil { + a.log.Fatal("could not start server", zap.Error(err)) + } +} diff --git a/go.mod b/go.mod index cd245b7..3a214b6 100644 --- a/go.mod +++ b/go.mod @@ -4,7 +4,7 @@ go 1.13 require ( github.com/fasthttp/router v0.6.1 - github.com/nspcc-dev/neofs-api v0.4.1 + github.com/nspcc-dev/neofs-api-go v0.5.0 github.com/nspcc-dev/neofs-crypto v0.3.0 github.com/prometheus/client_golang v1.5.0 github.com/prometheus/common v0.9.1 diff --git a/go.sum b/go.sum index b18ac63..bd7466b 100644 --- a/go.sum +++ b/go.sum @@ -116,8 +116,8 @@ github.com/mr-tron/base58 v1.1.3/go.mod h1:BinMc/sQntlIE1frQmRFPUoPA1Zkr8VRgBdjW github.com/mwitkow/go-conntrack v0.0.0-20161129095857-cc309e4a2223/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U= github.com/nspcc-dev/hrw v1.0.8 h1:vwRuJXZXgkMvf473vFzeWGCfY1WBVeSHAEHvR4u3/Cg= github.com/nspcc-dev/hrw v1.0.8/go.mod h1:l/W2vx83vMQo6aStyx2AuZrJ+07lGv2JQGlVkPG06MU= -github.com/nspcc-dev/neofs-api v0.4.1 h1:Kcr9mVmtCQPoPeHIL5z6dvE8P0zsispP2c2z6HA5bLM= -github.com/nspcc-dev/neofs-api v0.4.1/go.mod h1:kkoWJ6YnTRwRHI+h2kXbx21i/sZZs0LheF+T/33xzr4= +github.com/nspcc-dev/neofs-api-go v0.5.0 h1:YGX2lEjFmiFYmk8gCkLINJjSh6hToKp2oo0sG5u8G+4= +github.com/nspcc-dev/neofs-api-go v0.5.0/go.mod h1:RlP++uLPHmXN81KWmdgBujo6FEeTn/b3aoQjHgKuNd4= github.com/nspcc-dev/neofs-crypto v0.3.0 h1:zlr3pgoxuzrmGCxc5W8dGVfA9Rro8diFvVnBg0L4ifM= github.com/nspcc-dev/neofs-crypto v0.3.0/go.mod h1:8w16GEJbH6791ktVqHN9YRNH3s9BEEKYxGhlFnp0cDw= github.com/nspcc-dev/netmap v1.6.1 h1:Pigqpqi6QSdRiusbq5XlO20A18k6Eyu7j9MzOfAE3CM= diff --git a/health.go b/health.go index 708bbb7..3141a0f 100644 --- a/health.go +++ b/health.go @@ -12,12 +12,12 @@ const ( ) func attachHealthy(r *router.Router, e *atomic.Error) { - r.GET("/-/ready", func(ctx *fasthttp.RequestCtx) { + r.GET("/-/ready/", func(ctx *fasthttp.RequestCtx) { ctx.SetStatusCode(fasthttp.StatusOK) ctx.SetBodyString(healthyState + "ready") }) - r.GET("/-/healthy", func(c *fasthttp.RequestCtx) { + r.GET("/-/healthy/", func(c *fasthttp.RequestCtx) { code := fasthttp.StatusOK msg := "healthy" diff --git a/main.go b/main.go index 8a66199..1d9c832 100644 --- a/main.go +++ b/main.go @@ -1,122 +1,18 @@ package main -import ( - "context" - "crypto/ecdsa" - "errors" - "time" - - "github.com/fasthttp/router" - "github.com/valyala/fasthttp" - "go.uber.org/zap" - "google.golang.org/grpc/grpclog" -) - -type app struct { - pool *Pool - log *zap.Logger - timeout time.Duration - key *ecdsa.PrivateKey -} - func main() { var ( - err error - v = settings() l = newLogger(v) - z = gRPCLogger(l) g = newGracefulContext(l) - d = v.GetDuration("rebalance_timer") + + a = newApp( + WithLogger(l), + WithConfig(v)) ) - if v.GetBool("verbose") { - grpclog.SetLoggerV2(z) - } + go a.Serve(g) + go a.Worker(g) - a := &app{ - log: l, - pool: newPool(l, v), - key: fetchKey(l, v), - timeout: v.GetDuration("request_timeout"), - } - - r := router.New() - r.RedirectTrailingSlash = true - r.GET("/get/:cid/:oid", a.receiveFile) - - // attaching /-/(ready,healthy) - attachHealthy(r, a.pool.unhealthy) - - // enable metrics - if v.GetBool("metrics") { - l.Info("enabled /metrics") - attachMetrics(r, z) - } - - // enable pprof - if v.GetBool("pprof") { - l.Info("enabled /debug/pprof") - attachProfiler(r) - } - - en := &fasthttp.Server{ - Name: "neofs-http-gate", - Handler: r.Handler, - ReadBufferSize: 4096, - ReadTimeout: time.Second * 15, - GetOnly: true, - DisableHeaderNamesNormalizing: true, - NoDefaultServerHeader: true, - NoDefaultContentType: true, - } - - go func() { - bind := v.GetString("listen_address") - l.Info("run gateway server", - zap.String("address", bind)) - - if err := en.ListenAndServe(bind); err != nil { - l.Panic("could not start server", zap.Error(err)) - } - }() - - go checkConnection(g, d, a.pool) - - a.pool.reBalance(g) - - switch _, err = a.pool.getConnection(g); { - case err == nil: - // ignore - case errors.Is(err, context.Canceled): - // ignore - // l.Info("context canceled") - default: - l.Error("could get connection", zap.Error(err)) - return - } - - <-g.Done() - - l.Info("web server stopped", zap.Error(en.Shutdown())) -} - -func checkConnection(ctx context.Context, dur time.Duration, p *Pool) { - tick := time.NewTimer(dur) - -loop: - for { - select { - case <-ctx.Done(): - break loop - case <-tick.C: - p.reBalance(ctx) - tick.Reset(dur) - } - } - - p.close() - tick.Stop() - - p.log.Info("connection worker stopped") + a.Wait() } diff --git a/pool.go b/pool.go index 7baa273..15f9f47 100644 --- a/pool.go +++ b/pool.go @@ -11,8 +11,8 @@ import ( "sync" "time" - "github.com/nspcc-dev/neofs-api/service" - "github.com/nspcc-dev/neofs-api/state" + "github.com/nspcc-dev/neofs-api-go/service" + "github.com/nspcc-dev/neofs-api-go/state" "github.com/spf13/viper" "go.uber.org/atomic" "go.uber.org/zap" @@ -149,6 +149,29 @@ func (p *Pool) close() { } } +func (a *app) checkHealth(ctx context.Context, conn *grpc.ClientConn) error { + //a.log.Info("try to fetch node health status", + // zap.String("node", conn.Target()), + // zap.Stringer("timeout", a.reqTimeout)) + + ctx, cancel := context.WithTimeout(ctx, a.reqTimeout) + result, err := state.NewStatusClient(conn).HealthCheck(ctx, a.reqHealth) + cancel() + + if err != nil { + result = &state.HealthResponse{Status: err.Error()} + } else if !result.Healthy { + err = errors.New(result.Status) + } + + a.log.Debug("received node health status", + zap.String("node", conn.Target()), + zap.String("status", result.Status), + zap.Error(err)) + + return err +} + func (p *Pool) reBalance(ctx context.Context) { p.Lock() defer func() { diff --git a/pprof.go b/pprof.go index ef5457a..7400ea5 100644 --- a/pprof.go +++ b/pprof.go @@ -11,7 +11,7 @@ import ( func attachProfiler(r *router.Router) { r.GET("/debug/pprof/", pprofHandler()) - r.GET("/debug/pprof/:name", pprofHandler()) + r.GET("/debug/pprof/:name/", pprofHandler()) } func pprofHandler() fasthttp.RequestHandler { diff --git a/receive.go b/receive.go index a3d001b..de2d148 100644 --- a/receive.go +++ b/receive.go @@ -9,10 +9,10 @@ import ( "strings" "time" - "github.com/nspcc-dev/neofs-api/container" - "github.com/nspcc-dev/neofs-api/object" - "github.com/nspcc-dev/neofs-api/refs" - "github.com/nspcc-dev/neofs-api/service" + "github.com/nspcc-dev/neofs-api-go/container" + "github.com/nspcc-dev/neofs-api-go/object" + "github.com/nspcc-dev/neofs-api-go/refs" + "github.com/nspcc-dev/neofs-api-go/service" "github.com/valyala/fasthttp" "go.uber.org/zap" "google.golang.org/grpc" @@ -49,7 +49,7 @@ func (a *app) receiveFile(c *fasthttp.RequestCtx) { } { // try to connect or throw http error: - ctx, cancel := context.WithTimeout(c, a.timeout) + ctx, cancel := context.WithTimeout(c, a.reqTimeout) defer cancel() if con, err = a.pool.getConnection(ctx); err != nil { @@ -59,7 +59,7 @@ func (a *app) receiveFile(c *fasthttp.RequestCtx) { } } - ctx, cancel := context.WithTimeout(c, a.timeout) + ctx, cancel := context.WithTimeout(c, a.reqTimeout) defer cancel() log = log.With(zap.String("node", con.Target())) diff --git a/settings.go b/settings.go index f21bfd9..02cba71 100644 --- a/settings.go +++ b/settings.go @@ -11,7 +11,7 @@ import ( "strings" "time" - "github.com/nspcc-dev/neofs-api/refs" + "github.com/nspcc-dev/neofs-api-go/refs" crypto "github.com/nspcc-dev/neofs-crypto" "github.com/spf13/pflag" "github.com/spf13/viper"