[#171] Sync metrics and pprof configuration

Signed-off-by: Denis Kirillov <denis@nspcc.ru>
This commit is contained in:
Denis Kirillov 2022-07-27 09:52:08 +03:00 committed by Alex Vanin
parent f0e8bde761
commit af732d294c
10 changed files with 248 additions and 209 deletions

65
app.go
View file

@ -13,6 +13,7 @@ import (
"github.com/nspcc-dev/neo-go/pkg/util" "github.com/nspcc-dev/neo-go/pkg/util"
"github.com/nspcc-dev/neo-go/pkg/wallet" "github.com/nspcc-dev/neo-go/pkg/wallet"
"github.com/nspcc-dev/neofs-http-gw/downloader" "github.com/nspcc-dev/neofs-http-gw/downloader"
"github.com/nspcc-dev/neofs-http-gw/metrics"
"github.com/nspcc-dev/neofs-http-gw/resolver" "github.com/nspcc-dev/neofs-http-gw/resolver"
"github.com/nspcc-dev/neofs-http-gw/response" "github.com/nspcc-dev/neofs-http-gw/response"
"github.com/nspcc-dev/neofs-http-gw/uploader" "github.com/nspcc-dev/neofs-http-gw/uploader"
@ -162,8 +163,8 @@ func newApp(ctx context.Context, opt ...Option) App {
a.log.Info("container resolver is disabled") a.log.Info("container resolver is disabled")
} }
if a.cfg.GetBool(cmdMetrics) { if a.cfg.GetBool(cfgPrometheusEnabled) {
a.metrics = newGateMetrics() a.metrics = metrics.NewGateMetrics()
} }
return a return a
@ -254,11 +255,6 @@ func (a *app) Wait() {
} }
func (a *app) Serve(ctx context.Context) { func (a *app) Serve(ctx context.Context) {
go func() {
<-ctx.Done()
a.log.Info("shutting down web server", zap.Error(a.webServer.Shutdown()))
close(a.webDone)
}()
edts := a.cfg.GetBool(cfgUploaderHeaderEnableDefaultTimestamp) edts := a.cfg.GetBool(cfgUploaderHeaderEnableDefaultTimestamp)
uploadRoutes := uploader.New(ctx, a.AppParams(), edts) uploadRoutes := uploader.New(ctx, a.AppParams(), edts)
downloadSettings := downloader.Settings{ZipCompression: a.cfg.GetBool(cfgZipCompression)} downloadSettings := downloader.Settings{ZipCompression: a.cfg.GetBool(cfgZipCompression)}
@ -282,32 +278,45 @@ func (a *app) Serve(ctx context.Context) {
a.log.Info("added path /get_by_attribute/{cid}/{attr_key}/{attr_val:*}") a.log.Info("added path /get_by_attribute/{cid}/{attr_key}/{attr_val:*}")
r.GET("/zip/{cid}/{prefix:*}", a.logger(downloadRoutes.DownloadZipped)) r.GET("/zip/{cid}/{prefix:*}", a.logger(downloadRoutes.DownloadZipped))
a.log.Info("added path /zip/{cid}/{prefix}") a.log.Info("added path /zip/{cid}/{prefix}")
// enable metrics
if a.cfg.GetBool(cmdMetrics) { pprofConfig := metrics.Config{Enabled: a.cfg.GetBool(cfgPprofEnabled), Address: a.cfg.GetString(cfgPprofAddress)}
a.log.Info("added path /metrics/") pprof := metrics.NewPprofService(a.log, pprofConfig)
attachMetrics(r, a.log) prometheusConfig := metrics.Config{Enabled: a.cfg.GetBool(cfgPrometheusEnabled), Address: a.cfg.GetString(cfgPrometheusAddress)}
} prometheus := metrics.NewPrometheusService(a.log, prometheusConfig)
// enable pprof
if a.cfg.GetBool(cmdPprof) {
a.log.Info("added path /debug/pprof/")
attachProfiler(r)
}
bind := a.cfg.GetString(cfgListenAddress) bind := a.cfg.GetString(cfgListenAddress)
tlsCertPath := a.cfg.GetString(cfgTLSCertificate) tlsCertPath := a.cfg.GetString(cfgTLSCertificate)
tlsKeyPath := a.cfg.GetString(cfgTLSKey) tlsKeyPath := a.cfg.GetString(cfgTLSKey)
a.webServer.Handler = r.Handler a.webServer.Handler = r.Handler
var err error
if tlsCertPath == "" && tlsKeyPath == "" { go pprof.Start()
a.log.Info("running web server", zap.String("address", bind)) go prometheus.Start()
err = a.webServer.ListenAndServe(bind)
} else { go func() {
a.log.Info("running web server (TLS-enabled)", zap.String("address", bind)) var err error
err = a.webServer.ListenAndServeTLS(bind, tlsCertPath, tlsKeyPath) if tlsCertPath == "" && tlsKeyPath == "" {
} a.log.Info("running web server", zap.String("address", bind))
if err != nil { err = a.webServer.ListenAndServe(bind)
a.log.Fatal("could not start server", zap.Error(err)) } else {
} a.log.Info("running web server (TLS-enabled)", zap.String("address", bind))
err = a.webServer.ListenAndServeTLS(bind, tlsCertPath, tlsKeyPath)
}
if err != nil {
a.log.Fatal("could not start server", zap.Error(err))
}
}()
<-ctx.Done()
a.log.Info("shutting down web server", zap.Error(a.webServer.Shutdown()))
ctx, cancel := context.WithTimeout(context.Background(), defaultShutdownTimeout)
defer cancel()
pprof.ShutDown(ctx)
prometheus.ShutDown(ctx)
close(a.webDone)
} }
func (a *app) logger(h fasthttp.RequestHandler) fasthttp.RequestHandler { func (a *app) logger(h fasthttp.RequestHandler) fasthttp.RequestHandler {

View file

@ -8,9 +8,12 @@ HTTP_GW_WALLET_ADDRESS=NfgHwwTi3wHAS8aFAN243C5vGbkYDpqLHP
HTTP_GW_WALLET_PASSPHRASE=pwd HTTP_GW_WALLET_PASSPHRASE=pwd
# Enable metrics. # Enable metrics.
HTTP_GW_METRICS=true HTTP_GW_PPROF_ENABLED=true
# Enable pprof. HTTP_GW_PPROF_ADDRESS=localhost:8083
HTTP_GW_PPROF=true
HTTP_GW_PROMETHEUS_ENABLED=true
HTTP_GW_PROMETHEUS_ADDRESS=localhost:8084
# Log level. # Log level.
HTTP_GW_LOGGER_LEVEL=debug HTTP_GW_LOGGER_LEVEL=debug

View file

@ -3,8 +3,13 @@ wallet:
address: NfgHwwTi3wHAS8aFAN243C5vGbkYDpqLHP # Account address. If omitted default one will be used. address: NfgHwwTi3wHAS8aFAN243C5vGbkYDpqLHP # Account address. If omitted default one will be used.
passphrase: pwd # Passphrase to decrypt wallet. If you're using a wallet without a password, place '' here. passphrase: pwd # Passphrase to decrypt wallet. If you're using a wallet without a password, place '' here.
metrics: true # Enable metrics. pprof:
pprof: true # Enable pprof. enabled: true # Enable pprof.
address: localhost:8083
prometheus:
enabled: true # Enable metrics.
address: localhost:8084
logger: logger:
level: debug # Log level. level: debug # Log level.

View file

@ -19,6 +19,9 @@ There are some custom types used for brevity:
| `web` | [Web configuration](#web-section) | | `web` | [Web configuration](#web-section) |
| `upload-header` | [Upload header configuration](#upload-header-section) | | `upload-header` | [Upload header configuration](#upload-header-section) |
| `zip` | [ZIP configuration](#zip-section) | | `zip` | [ZIP configuration](#zip-section) |
| `pprof` | [Pprof configuration](#pprof-section) |
| `prometheus` | [Prometheus configuration](#prometheus-section) |
# General section # General section
@ -32,9 +35,6 @@ resolve_order:
- nns - nns
- dns - dns
metrics: false
pprof: false
connect_timeout: 5s connect_timeout: 5s
request_timeout: 5s request_timeout: 5s
rebalance_timer: 30s rebalance_timer: 30s
@ -47,8 +47,6 @@ rebalance_timer: 30s
| `tls_key` | `string` | | Path to the TLS key. | | `tls_key` | `string` | | Path to the TLS key. |
| `rpc_endpoint` | `string` | | The address of the RPC host to which the gateway connects to resolve bucket names. | | `rpc_endpoint` | `string` | | The address of the RPC host to which the gateway connects to resolve bucket names. |
| `resolve_order` | `[]string` | `[nns, dns]` | Order of bucket name resolvers to use. | | `resolve_order` | `[]string` | `[nns, dns]` | Order of bucket name resolvers to use. |
| `metrics` | `bool` | `false` | Flag to enable and expose the prometheus metrics. |
| `pprof` | `bool` | `false` | Flag to enable the profiler. |
| `connect_timeout` | `duration` | `10s` | Timeout to connect to a node. | | `connect_timeout` | `duration` | `10s` | Timeout to connect to a node. |
| `request_timeout` | `duration` | `15s` | Timeout to check node health during rebalance. | | `request_timeout` | `duration` | `15s` | Timeout to check node health during rebalance. |
| `rebalance_timer` | `duration` | `60s` | Interval to check node health. | | `rebalance_timer` | `duration` | `60s` | Interval to check node health. |
@ -156,3 +154,33 @@ zip:
|---------------|--------|---------------|--------------------------------------------------------------| |---------------|--------|---------------|--------------------------------------------------------------|
| `compression` | `bool` | `false` | Enable zip compression when download files by common prefix. | | `compression` | `bool` | `false` | Enable zip compression when download files by common prefix. |
# `pprof` section
Contains configuration for the `pprof` profiler.
```yaml
pprof:
enabled: true
address: localhost:8083
```
| Parameter | Type | Default value | Description |
|-----------|----------|------------------|-----------------------------------------|
| `enabled` | `bool` | `false` | Flag to enable the service. |
| `address` | `string` | `localhost:8083` | Address that service listener binds to. |
# `prometheus` section
Contains configuration for the `prometheus` metrics service.
```yaml
prometheus:
enabled: true
address: localhost:8084
```
| Parameter | Type | Default value | Description |
|-----------|----------|------------------|-----------------------------------------|
| `enabled` | `bool` | `false` | Flag to enable the service. |
| `address` | `string` | `localhost:8084` | Address that service listener binds to. |

View file

@ -1,127 +0,0 @@
package main
import (
"github.com/fasthttp/router"
"github.com/nspcc-dev/neofs-http-gw/response"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/common/expfmt"
"github.com/valyala/fasthttp"
"go.uber.org/zap"
)
const (
namespace = "neofs_http_gw"
stateSubsystem = "state"
)
type GateMetrics struct {
stateMetrics
}
type stateMetrics struct {
healthCheck prometheus.Gauge
}
func newGateMetrics() *GateMetrics {
stateMetric := newStateMetrics()
stateMetric.register()
return &GateMetrics{
stateMetrics: *stateMetric,
}
}
func newStateMetrics() *stateMetrics {
return &stateMetrics{
healthCheck: prometheus.NewGauge(prometheus.GaugeOpts{
Namespace: namespace,
Subsystem: stateSubsystem,
Name: "health",
Help: "Current HTTP gateway state",
}),
}
}
func (m stateMetrics) register() {
prometheus.MustRegister(m.healthCheck)
}
func (m stateMetrics) SetHealth(s int32) {
m.healthCheck.Set(float64(s))
}
func attachMetrics(r *router.Router, l *zap.Logger) {
r.GET("/metrics/", metricsHandler(prometheus.DefaultGatherer, l))
}
func metricsHandler(reg prometheus.Gatherer, logger *zap.Logger) fasthttp.RequestHandler {
var (
inFlightSem chan struct{}
errCnt = prometheus.NewCounterVec(
prometheus.CounterOpts{
Name: "promhttp_metric_handler_errors_total",
Help: "Total number of internal errors encountered by the promhttp metric handler.",
},
[]string{"cause"},
)
)
h := fasthttp.RequestHandler(func(c *fasthttp.RequestCtx) {
if inFlightSem != nil {
select {
case inFlightSem <- struct{}{}: // All good, carry on.
defer func() { <-inFlightSem }()
default:
response.Error(c, "Limit of concurrent requests reached, try again later.", fasthttp.StatusServiceUnavailable)
return
}
}
mfs, err := reg.Gather()
if err != nil {
if logger != nil {
panic("error gathering metrics:" + err.Error())
}
errCnt.WithLabelValues("gathering").Inc()
response.Error(c, err.Error(), fasthttp.StatusServiceUnavailable)
return
}
contentType := expfmt.FmtText
c.SetContentType(string(contentType))
enc := expfmt.NewEncoder(c, contentType)
var lastErr error
// handleError handles the error according to opts.ErrorHandling
// and returns true if we have to abort after the handling.
handleError := func(err error) bool {
if err == nil {
return false
}
lastErr = err
if logger != nil {
logger.Error("encoding and sending metric family", zap.Error(err))
}
errCnt.WithLabelValues("encoding").Inc()
response.Error(c, err.Error(), fasthttp.StatusServiceUnavailable)
return true
}
for _, mf := range mfs {
if handleError(enc.Encode(mf)) {
return
}
}
if closer, ok := enc.(expfmt.Closer); ok {
// This in particular takes care of the final "# EOF\n" line for OpenMetrics.
if handleError(closer.Close()) {
return
}
}
handleError(lastErr)
})
return h
}

68
metrics/metrics.go Normal file
View file

@ -0,0 +1,68 @@
package metrics
import (
"net/http"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promhttp"
"go.uber.org/zap"
)
const (
namespace = "neofs_http_gw"
stateSubsystem = "state"
)
type GateMetrics struct {
stateMetrics
}
type stateMetrics struct {
healthCheck prometheus.Gauge
}
// NewGateMetrics creates new metrics for http gate.
func NewGateMetrics() *GateMetrics {
stateMetric := newStateMetrics()
stateMetric.register()
return &GateMetrics{
stateMetrics: *stateMetric,
}
}
func newStateMetrics() *stateMetrics {
return &stateMetrics{
healthCheck: prometheus.NewGauge(prometheus.GaugeOpts{
Namespace: namespace,
Subsystem: stateSubsystem,
Name: "health",
Help: "Current HTTP gateway state",
}),
}
}
func (m stateMetrics) register() {
prometheus.MustRegister(m.healthCheck)
}
func (m stateMetrics) SetHealth(s int32) {
m.healthCheck.Set(float64(s))
}
// NewPrometheusService creates a new service for gathering prometheus metrics.
func NewPrometheusService(log *zap.Logger, cfg Config) *Service {
if log == nil {
return nil
}
return &Service{
Server: &http.Server{
Addr: cfg.Address,
Handler: promhttp.Handler(),
},
enabled: cfg.Enabled,
serviceType: "Prometheus",
log: log.With(zap.String("service", "Prometheus")),
}
}

33
metrics/pprof.go Normal file
View file

@ -0,0 +1,33 @@
package metrics
import (
"net/http"
"net/http/pprof"
"go.uber.org/zap"
)
// NewPprofService creates a new service for gathering pprof metrics.
func NewPprofService(l *zap.Logger, cfg Config) *Service {
handler := http.NewServeMux()
handler.HandleFunc("/debug/pprof/", pprof.Index)
handler.HandleFunc("/debug/pprof/cmdline", pprof.Cmdline)
handler.HandleFunc("/debug/pprof/profile", pprof.Profile)
handler.HandleFunc("/debug/pprof/symbol", pprof.Symbol)
handler.HandleFunc("/debug/pprof/trace", pprof.Trace)
// Manually add support for paths linked to by index page at /debug/pprof/
for _, item := range []string{"allocs", "block", "heap", "goroutine", "mutex", "threadcreate"} {
handler.Handle("/debug/pprof/"+item, pprof.Handler(item))
}
return &Service{
Server: &http.Server{
Addr: cfg.Address,
Handler: handler,
},
enabled: cfg.Enabled,
serviceType: "Pprof",
log: l.With(zap.String("service", "Pprof")),
}
}

44
metrics/service.go Normal file
View file

@ -0,0 +1,44 @@
package metrics
import (
"context"
"net/http"
"go.uber.org/zap"
)
// Service serves metrics.
type Service struct {
*http.Server
enabled bool
log *zap.Logger
serviceType string
}
// Config is a params to configure service.
type Config struct {
Address string
Enabled bool
}
// Start runs http service with the exposed endpoint on the configured port.
func (ms *Service) Start() {
if ms.enabled {
ms.log.Info("service is running", zap.String("endpoint", ms.Addr))
err := ms.ListenAndServe()
if err != nil && err != http.ErrServerClosed {
ms.log.Warn("service couldn't start on configured port")
}
} else {
ms.log.Info("service hasn't started since it's disabled")
}
}
// ShutDown stops the service.
func (ms *Service) ShutDown(ctx context.Context) {
ms.log.Info("shutting down service", zap.String("endpoint", ms.Addr))
err := ms.Shutdown(ctx)
if err != nil {
ms.log.Panic("can't shut down service")
}
}

View file

@ -1,44 +0,0 @@
package main
import (
"net/http/pprof"
rtp "runtime/pprof"
"github.com/fasthttp/router"
"github.com/nspcc-dev/neofs-http-gw/response"
"github.com/valyala/fasthttp"
"github.com/valyala/fasthttp/fasthttpadaptor"
)
func attachProfiler(r *router.Router) {
r.GET("/debug/pprof/", pprofHandler())
r.GET("/debug/pprof/{name}/", pprofHandler())
}
func pprofHandler() fasthttp.RequestHandler {
items := rtp.Profiles()
profiles := map[string]fasthttp.RequestHandler{
"": fasthttpadaptor.NewFastHTTPHandlerFunc(pprof.Index),
"cmdline": fasthttpadaptor.NewFastHTTPHandlerFunc(pprof.Cmdline),
"profile": fasthttpadaptor.NewFastHTTPHandlerFunc(pprof.Profile),
"symbol": fasthttpadaptor.NewFastHTTPHandlerFunc(pprof.Symbol),
"trace": fasthttpadaptor.NewFastHTTPHandlerFunc(pprof.Trace),
}
for i := range items {
name := items[i].Name()
profiles[name] = fasthttpadaptor.NewFastHTTPHandler(pprof.Handler(name))
}
return func(ctx *fasthttp.RequestCtx) {
name, _ := ctx.UserValue("name").(string)
if handler, ok := profiles[name]; ok {
handler(ctx)
return
}
response.Error(ctx, "Not found", fasthttp.StatusNotFound)
}
}

View file

@ -20,6 +20,8 @@ const (
defaultRequestTimeout = 15 * time.Second defaultRequestTimeout = 15 * time.Second
defaultConnectTimeout = 10 * time.Second defaultConnectTimeout = 10 * time.Second
defaultShutdownTimeout = 15 * time.Second
cfgListenAddress = "listen_address" cfgListenAddress = "listen_address"
cfgTLSCertificate = "tls_certificate" cfgTLSCertificate = "tls_certificate"
cfgTLSKey = "tls_key" cfgTLSKey = "tls_key"
@ -32,6 +34,12 @@ const (
cfgWebStreamRequestBody = "web.stream_request_body" cfgWebStreamRequestBody = "web.stream_request_body"
cfgWebMaxRequestBodySize = "web.max_request_body_size" cfgWebMaxRequestBodySize = "web.max_request_body_size"
// Metrics / Profiler.
cfgPrometheusEnabled = "prometheus.enabled"
cfgPrometheusAddress = "prometheus.address"
cfgPprofEnabled = "pprof.enabled"
cfgPprofAddress = "pprof.address"
// Timeouts. // Timeouts.
cfgConTimeout = "connect_timeout" cfgConTimeout = "connect_timeout"
cfgReqTimeout = "request_timeout" cfgReqTimeout = "request_timeout"
@ -128,6 +136,18 @@ func settings() *viper.Viper {
// zip: // zip:
v.SetDefault(cfgZipCompression, false) v.SetDefault(cfgZipCompression, false)
// metrics
v.SetDefault(cfgPprofAddress, "localhost:8083")
v.SetDefault(cfgPrometheusAddress, "localhost:8084")
// Binding flags
if err := v.BindPFlag(cfgPprofEnabled, flags.Lookup(cmdPprof)); err != nil {
panic(err)
}
if err := v.BindPFlag(cfgPrometheusEnabled, flags.Lookup(cmdMetrics)); err != nil {
panic(err)
}
if err := v.BindPFlags(flags); err != nil { if err := v.BindPFlags(flags); err != nil {
panic(err) panic(err)
} }