refactoring application: use fasthttp instead echo, add healthcheck handlers, run web server immidiently, close pool connections when stop

This commit is contained in:
Evgeniy Kulikov 2020-02-28 20:07:50 +03:00
parent 7790684726
commit 8aef59cdbd
No known key found for this signature in database
GPG key ID: BF6AEE0A2A699BF2

115
main.go
View file

@ -4,92 +4,134 @@ import (
"context" "context"
"crypto/ecdsa" "crypto/ecdsa"
"errors" "errors"
"net/http"
_ "net/http/pprof"
"time" "time"
"github.com/labstack/echo/v4" "github.com/fasthttp/router"
"github.com/prometheus/client_golang/prometheus/promhttp" "github.com/prometheus/client_golang/prometheus"
http "github.com/prometheus/client_golang/prometheus/promhttp"
"github.com/valyala/fasthttp"
"go.uber.org/zap" "go.uber.org/zap"
"google.golang.org/grpc/grpclog" "google.golang.org/grpc/grpclog"
) )
type router struct { type app struct {
pool *Pool pool *Pool
log *zap.Logger log *zap.Logger
timeout time.Duration timeout time.Duration
key *ecdsa.PrivateKey key *ecdsa.PrivateKey
} }
const (
defaultHealthyMsg = "NeoFS HTTP Gateway is "
defaultContentType = "text/plain; charset=utf-8"
)
func main() { func main() {
var ( var (
err error err error
pool *Pool
v = settings() v = settings()
l = newLogger(v) l = newLogger(v)
z = gRPCLogger(l)
g = newGracefulContext(l) g = newGracefulContext(l)
d = v.GetDuration("rebalance_timer") d = v.GetDuration("rebalance_timer")
) )
if v.GetBool("verbose") { if v.GetBool("verbose") {
grpclog.SetLoggerV2(gRPCLogger(l)) grpclog.SetLoggerV2(z)
} }
switch pool, err = newPool(g, l, v); { a := &app{
case err == nil:
// ignore
case errors.Is(err, context.Canceled):
l.Info("close application")
return
default:
l.Error("could get connection", zap.Error(err))
return
}
r := &router{
log: l, log: l,
pool: pool, pool: newPool(l, v),
key: fetchKey(l, v), key: fetchKey(l, v),
timeout: v.GetDuration("request_timeout"), timeout: v.GetDuration("request_timeout"),
} }
go checkConnection(g, d, r.pool) r := router.New()
r.RedirectTrailingSlash = true
r.GET("/get/:cid/:oid", a.receiveFile)
e := echo.New() r.GET("/-/ready", func(ctx *fasthttp.RequestCtx) {
e.Debug = false ctx.SetStatusCode(fasthttp.StatusOK)
e.HidePort = true ctx.SetBodyString("NeoFS HTTP Gateway is ready")
e.HideBanner = true })
e.GET("/:cid/:oid", r.receiveFile) r.GET("/-/healthy", func(c *fasthttp.RequestCtx) {
code := fasthttp.StatusOK
msg := "healthy"
if err := a.pool.unhealthy.Load(); err != nil {
msg = "unhealthy: " + err.Error()
code = fasthttp.StatusBadRequest
}
c.Response.Reset()
c.SetStatusCode(code)
c.SetContentType(defaultContentType)
c.SetBodyString(defaultHealthyMsg + msg)
})
// enable metrics // enable metrics
if v.GetBool("metrics") { if v.GetBool("metrics") {
l.Info("enabled /metrics") l.Info("enabled /metrics")
e.GET("/metrics", echo.WrapHandler(promhttp.Handler())) r.GET("/metrics/", metricsHandler(prometheus.DefaultGatherer, http.HandlerOpts{
ErrorLog: z.(http.Logger),
//ErrorHandling: 0,
//Registry: nil,
//DisableCompression: false,
//MaxRequestsInFlight: 0,
//Timeout: 0,
//EnableOpenMetrics: false,
}))
} }
// enable pprof // enable pprof
if v.GetBool("pprof") { if v.GetBool("pprof") {
l.Info("enabled /debug/pprof") l.Info("enabled /debug/pprof")
e.Any("/debug/pprof*", echo.WrapHandler(http.DefaultServeMux)) r.GET("/debug/pprof/", pprofHandler())
r.GET("/debug/pprof/:name", pprofHandler())
}
en := &fasthttp.Server{
Name: "neofs-http-gate",
Handler: r.Handler,
ReadBufferSize: 4096,
ReadTimeout: time.Second * 15,
GetOnly: true,
DisableHeaderNamesNormalizing: true,
NoDefaultServerHeader: true,
NoDefaultContentType: true,
} }
go func() { go func() {
bind := v.GetString("listen_address")
l.Info("run gateway server", l.Info("run gateway server",
zap.String("address", v.GetString("listen_address"))) zap.String("address", bind))
if err := e.Start(v.GetString("listen_address")); err != nil && !errors.Is(err, http.ErrServerClosed) { if err := en.ListenAndServe(bind); err != nil {
l.Panic("could not start server", zap.Error(err)) l.Panic("could not start server", zap.Error(err))
} }
}() }()
go checkConnection(g, d, a.pool)
a.pool.reBalance(g)
switch _, err = a.pool.getConnection(g); {
case err == nil:
// ignore
case errors.Is(err, context.Canceled):
// ignore
// l.Info("context canceled")
default:
l.Error("could get connection", zap.Error(err))
return
}
<-g.Done() <-g.Done()
ctx, cancel := context.WithTimeout(context.TODO(), time.Second*30) l.Info("web server stopped", zap.Error(en.Shutdown()))
defer cancel()
l.Info("stopping server", zap.Error(e.Shutdown(ctx)))
} }
func checkConnection(ctx context.Context, dur time.Duration, p *Pool) { func checkConnection(ctx context.Context, dur time.Duration, p *Pool) {
@ -106,7 +148,8 @@ loop:
} }
} }
p.close()
tick.Stop() tick.Stop()
p.log.Info("stop connection worker") p.log.Info("connection worker stopped")
} }