[#200] Reload config level and metrics on SIGHUP

Signed-off-by: Denis Kirillov <denis@nspcc.ru>
This commit is contained in:
Denis Kirillov 2022-09-08 17:57:22 +03:00 committed by Kirillov Denis
parent 25c9bc81fa
commit 939f5f0c65
4 changed files with 226 additions and 109 deletions

196
app.go
View file

@ -4,7 +4,11 @@ import (
"context"
"crypto/ecdsa"
"fmt"
"os"
"os/signal"
"strconv"
"sync"
"syscall"
"github.com/fasthttp/router"
"github.com/nspcc-dev/neo-go/cli/flags"
@ -28,13 +32,15 @@ import (
type (
app struct {
log *zap.Logger
logLevel zap.AtomicLevel
pool *pool.Pool
owner *user.ID
cfg *viper.Viper
webServer *fasthttp.Server
webDone chan struct{}
resolver *resolver.ContainerResolver
metrics GateMetricsProvider
metrics *gateMetrics
services []*metrics.Service
}
// App is an interface for the main gateway function.
@ -46,18 +52,26 @@ type (
// Option is an application option.
Option func(a *app)
gateMetrics struct {
logger *zap.Logger
provider GateMetricsProvider
mu sync.RWMutex
enabled bool
}
GateMetricsProvider interface {
SetHealth(int32)
}
)
// WithLogger returns Option to set a specific logger.
func WithLogger(l *zap.Logger) Option {
func WithLogger(l *zap.Logger, lvl zap.AtomicLevel) Option {
return func(a *app) {
if l == nil {
return
}
a.log = l
a.logLevel = lvl
}
}
@ -164,13 +178,47 @@ func newApp(ctx context.Context, opt ...Option) App {
a.log.Info("container resolver is disabled")
}
if a.cfg.GetBool(cfgPrometheusEnabled) {
a.metrics = metrics.NewGateMetrics(a.pool)
}
a.initMetrics()
return a
}
func (a *app) initMetrics() {
gateMetricsProvider := metrics.NewGateMetrics(a.pool)
a.metrics = newGateMetrics(a.log, gateMetricsProvider, a.cfg.GetBool(cfgPrometheusEnabled))
}
func newGateMetrics(logger *zap.Logger, provider GateMetricsProvider, enabled bool) *gateMetrics {
if !enabled {
logger.Warn("metrics are disabled")
}
return &gateMetrics{
logger: logger,
provider: provider,
}
}
func (m *gateMetrics) SetEnabled(enabled bool) {
if !enabled {
m.logger.Warn("metrics are disabled")
}
m.mu.Lock()
m.enabled = enabled
m.mu.Unlock()
}
func (m *gateMetrics) SetHealth(status int32) {
m.mu.RLock()
if !m.enabled {
m.mu.RUnlock()
return
}
m.mu.RUnlock()
m.provider.SetHealth(status)
}
func remove(list []string, element string) []string {
for i, item := range list {
if item == element {
@ -242,19 +290,110 @@ func getKeyFromWallet(w *wallet.Wallet, addrStr string, password *string) (*ecds
func (a *app) Wait() {
a.log.Info("starting application", zap.String("app_name", "neofs-http-gw"), zap.String("version", Version))
if a.metrics != nil {
a.metrics.SetHealth(1)
}
a.setHealthStatus()
<-a.webDone // wait for web-server to be stopped
}
func (a *app) setHealthStatus() {
a.metrics.SetHealth(1)
}
func (a *app) Serve(ctx context.Context) {
edts := a.cfg.GetBool(cfgUploaderHeaderEnableDefaultTimestamp)
uploadRoutes := uploader.New(ctx, a.AppParams(), edts)
downloadSettings := downloader.Settings{ZipCompression: a.cfg.GetBool(cfgZipCompression)}
downloadRoutes := downloader.New(ctx, a.AppParams(), downloadSettings)
// Configure router.
a.configureRouter(uploadRoutes, downloadRoutes)
a.startServices()
bind := a.cfg.GetString(cfgListenAddress)
tlsCertPath := a.cfg.GetString(cfgTLSCertificate)
tlsKeyPath := a.cfg.GetString(cfgTLSKey)
go func() {
var err error
if tlsCertPath == "" && tlsKeyPath == "" {
a.log.Info("running web server", zap.String("address", bind))
err = a.webServer.ListenAndServe(bind)
} else {
a.log.Info("running web server (TLS-enabled)", zap.String("address", bind))
err = a.webServer.ListenAndServeTLS(bind, tlsCertPath, tlsKeyPath)
}
if err != nil {
a.log.Fatal("could not start server", zap.Error(err))
}
}()
sigs := make(chan os.Signal, 1)
signal.Notify(sigs, syscall.SIGHUP)
LOOP:
for {
select {
case <-ctx.Done():
break LOOP
case <-sigs:
a.configReload()
}
}
a.log.Info("shutting down web server", zap.Error(a.webServer.Shutdown()))
a.stopServices()
close(a.webDone)
}
func (a *app) configReload() {
a.log.Info("SIGHUP config reload")
if !a.cfg.IsSet(cmdConfig) {
a.log.Warn("failed to reload config because it's missed")
return
}
if err := readConfig(a.cfg); err != nil {
a.log.Warn("failed to reload config", zap.Error(err))
return
}
if lvl, err := getLogLevel(a.cfg); err != nil {
a.log.Warn("log level won't be updated", zap.Error(err))
} else {
a.logLevel.SetLevel(lvl)
}
a.stopServices()
a.startServices()
a.metrics.SetEnabled(a.cfg.GetBool(cfgPrometheusEnabled))
a.setHealthStatus()
}
func (a *app) startServices() {
pprofConfig := metrics.Config{Enabled: a.cfg.GetBool(cfgPprofEnabled), Address: a.cfg.GetString(cfgPprofAddress)}
pprofService := metrics.NewPprofService(a.log, pprofConfig)
a.services = append(a.services, pprofService)
go pprofService.Start()
prometheusConfig := metrics.Config{Enabled: a.cfg.GetBool(cfgPrometheusEnabled), Address: a.cfg.GetString(cfgPrometheusAddress)}
prometheusService := metrics.NewPrometheusService(a.log, prometheusConfig)
a.services = append(a.services, prometheusService)
go prometheusService.Start()
}
func (a *app) stopServices() {
ctx, cancel := context.WithTimeout(context.Background(), defaultShutdownTimeout)
defer cancel()
for _, svc := range a.services {
svc.ShutDown(ctx)
}
}
func (a *app) configureRouter(uploadRoutes *uploader.Uploader, downloadRoutes *downloader.Downloader) {
r := router.New()
r.RedirectTrailingSlash = true
r.NotFound = func(r *fasthttp.RequestCtx) {
@ -274,55 +413,18 @@ func (a *app) Serve(ctx context.Context) {
r.GET("/zip/{cid}/{prefix:*}", a.logger(downloadRoutes.DownloadZipped))
a.log.Info("added path /zip/{cid}/{prefix}")
pprofConfig := metrics.Config{Enabled: a.cfg.GetBool(cfgPprofEnabled), Address: a.cfg.GetString(cfgPprofAddress)}
pprof := metrics.NewPprofService(a.log, pprofConfig)
prometheusConfig := metrics.Config{Enabled: a.cfg.GetBool(cfgPrometheusEnabled), Address: a.cfg.GetString(cfgPrometheusAddress)}
prometheus := metrics.NewPrometheusService(a.log, prometheusConfig)
bind := a.cfg.GetString(cfgListenAddress)
tlsCertPath := a.cfg.GetString(cfgTLSCertificate)
tlsKeyPath := a.cfg.GetString(cfgTLSKey)
a.webServer.Handler = r.Handler
go pprof.Start()
go prometheus.Start()
go func() {
var err error
if tlsCertPath == "" && tlsKeyPath == "" {
a.log.Info("running web server", zap.String("address", bind))
err = a.webServer.ListenAndServe(bind)
} else {
a.log.Info("running web server (TLS-enabled)", zap.String("address", bind))
err = a.webServer.ListenAndServeTLS(bind, tlsCertPath, tlsKeyPath)
}
if err != nil {
a.log.Fatal("could not start server", zap.Error(err))
}
}()
<-ctx.Done()
a.log.Info("shutting down web server", zap.Error(a.webServer.Shutdown()))
ctx, cancel := context.WithTimeout(context.Background(), defaultShutdownTimeout)
defer cancel()
pprof.ShutDown(ctx)
prometheus.ShutDown(ctx)
close(a.webDone)
}
func (a *app) logger(h fasthttp.RequestHandler) fasthttp.RequestHandler {
return fasthttp.RequestHandler(func(ctx *fasthttp.RequestCtx) {
return func(ctx *fasthttp.RequestCtx) {
a.log.Info("request", zap.String("remote", ctx.RemoteAddr().String()),
zap.ByteString("method", ctx.Method()),
zap.ByteString("path", ctx.Path()),
zap.ByteString("query", ctx.QueryArgs().QueryString()),
zap.Uint64("id", ctx.ID()))
h(ctx)
})
}
}
func (a *app) AppParams() *utils.AppParams {