From 8aef59cdbd49b2cabf5a5d0d1322d7c96673936e Mon Sep 17 00:00:00 2001 From: Evgeniy Kulikov Date: Fri, 28 Feb 2020 20:07:50 +0300 Subject: [PATCH] refactoring application: use fasthttp instead echo, add healthcheck handlers, run web server immidiently, close pool connections when stop --- main.go | 115 ++++++++++++++++++++++++++++++++++++++------------------ 1 file changed, 79 insertions(+), 36 deletions(-) diff --git a/main.go b/main.go index 3974b88..ad17d6e 100644 --- a/main.go +++ b/main.go @@ -4,92 +4,134 @@ import ( "context" "crypto/ecdsa" "errors" - "net/http" - _ "net/http/pprof" "time" - "github.com/labstack/echo/v4" - "github.com/prometheus/client_golang/prometheus/promhttp" + "github.com/fasthttp/router" + "github.com/prometheus/client_golang/prometheus" + http "github.com/prometheus/client_golang/prometheus/promhttp" + "github.com/valyala/fasthttp" "go.uber.org/zap" "google.golang.org/grpc/grpclog" ) -type router struct { +type app struct { pool *Pool log *zap.Logger timeout time.Duration key *ecdsa.PrivateKey } +const ( + defaultHealthyMsg = "NeoFS HTTP Gateway is " + defaultContentType = "text/plain; charset=utf-8" +) + func main() { var ( - err error - pool *Pool + err error v = settings() l = newLogger(v) + z = gRPCLogger(l) g = newGracefulContext(l) d = v.GetDuration("rebalance_timer") ) if v.GetBool("verbose") { - grpclog.SetLoggerV2(gRPCLogger(l)) + grpclog.SetLoggerV2(z) } - switch pool, err = newPool(g, l, v); { - case err == nil: - // ignore - case errors.Is(err, context.Canceled): - l.Info("close application") - return - default: - l.Error("could get connection", zap.Error(err)) - return - } - - r := &router{ + a := &app{ log: l, - pool: pool, + pool: newPool(l, v), key: fetchKey(l, v), timeout: v.GetDuration("request_timeout"), } - go checkConnection(g, d, r.pool) + r := router.New() + r.RedirectTrailingSlash = true + r.GET("/get/:cid/:oid", a.receiveFile) - e := echo.New() - e.Debug = false - e.HidePort = true - e.HideBanner = true + r.GET("/-/ready", func(ctx *fasthttp.RequestCtx) { + ctx.SetStatusCode(fasthttp.StatusOK) + ctx.SetBodyString("NeoFS HTTP Gateway is ready") + }) - e.GET("/:cid/:oid", r.receiveFile) + r.GET("/-/healthy", func(c *fasthttp.RequestCtx) { + code := fasthttp.StatusOK + msg := "healthy" + + if err := a.pool.unhealthy.Load(); err != nil { + msg = "unhealthy: " + err.Error() + code = fasthttp.StatusBadRequest + } + + c.Response.Reset() + c.SetStatusCode(code) + c.SetContentType(defaultContentType) + c.SetBodyString(defaultHealthyMsg + msg) + }) // enable metrics if v.GetBool("metrics") { l.Info("enabled /metrics") - e.GET("/metrics", echo.WrapHandler(promhttp.Handler())) + r.GET("/metrics/", metricsHandler(prometheus.DefaultGatherer, http.HandlerOpts{ + ErrorLog: z.(http.Logger), + //ErrorHandling: 0, + //Registry: nil, + //DisableCompression: false, + //MaxRequestsInFlight: 0, + //Timeout: 0, + //EnableOpenMetrics: false, + })) } // enable pprof if v.GetBool("pprof") { l.Info("enabled /debug/pprof") - e.Any("/debug/pprof*", echo.WrapHandler(http.DefaultServeMux)) + r.GET("/debug/pprof/", pprofHandler()) + r.GET("/debug/pprof/:name", pprofHandler()) + } + + en := &fasthttp.Server{ + Name: "neofs-http-gate", + Handler: r.Handler, + ReadBufferSize: 4096, + ReadTimeout: time.Second * 15, + GetOnly: true, + DisableHeaderNamesNormalizing: true, + NoDefaultServerHeader: true, + NoDefaultContentType: true, } go func() { + bind := v.GetString("listen_address") l.Info("run gateway server", - zap.String("address", v.GetString("listen_address"))) + zap.String("address", bind)) - if err := e.Start(v.GetString("listen_address")); err != nil && !errors.Is(err, http.ErrServerClosed) { + if err := en.ListenAndServe(bind); err != nil { l.Panic("could not start server", zap.Error(err)) } }() + go checkConnection(g, d, a.pool) + + a.pool.reBalance(g) + + switch _, err = a.pool.getConnection(g); { + case err == nil: + // ignore + case errors.Is(err, context.Canceled): + // ignore + // l.Info("context canceled") + default: + l.Error("could get connection", zap.Error(err)) + return + } + <-g.Done() - ctx, cancel := context.WithTimeout(context.TODO(), time.Second*30) - defer cancel() - - l.Info("stopping server", zap.Error(e.Shutdown(ctx))) + l.Info("web server stopped", zap.Error(en.Shutdown())) } func checkConnection(ctx context.Context, dur time.Duration, p *Pool) { @@ -106,7 +148,8 @@ loop: } } + p.close() tick.Stop() - p.log.Info("stop connection worker") + p.log.Info("connection worker stopped") }