From af732d294c5c512ae9e41083dae19f6e8dfe3152 Mon Sep 17 00:00:00 2001 From: Denis Kirillov Date: Wed, 27 Jul 2022 09:52:08 +0300 Subject: [PATCH] [#171] Sync metrics and pprof configuration Signed-off-by: Denis Kirillov --- app.go | 65 +++++++++++-------- config/config.env | 9 ++- config/config.yaml | 9 ++- docs/gate-configuration.md | 38 +++++++++-- metrics.go | 127 ------------------------------------- metrics/metrics.go | 68 ++++++++++++++++++++ metrics/pprof.go | 33 ++++++++++ metrics/service.go | 44 +++++++++++++ pprof.go | 44 ------------- settings.go | 20 ++++++ 10 files changed, 248 insertions(+), 209 deletions(-) delete mode 100644 metrics.go create mode 100644 metrics/metrics.go create mode 100644 metrics/pprof.go create mode 100644 metrics/service.go delete mode 100644 pprof.go diff --git a/app.go b/app.go index 55ab986..ce51cef 100644 --- a/app.go +++ b/app.go @@ -13,6 +13,7 @@ import ( "github.com/nspcc-dev/neo-go/pkg/util" "github.com/nspcc-dev/neo-go/pkg/wallet" "github.com/nspcc-dev/neofs-http-gw/downloader" + "github.com/nspcc-dev/neofs-http-gw/metrics" "github.com/nspcc-dev/neofs-http-gw/resolver" "github.com/nspcc-dev/neofs-http-gw/response" "github.com/nspcc-dev/neofs-http-gw/uploader" @@ -162,8 +163,8 @@ func newApp(ctx context.Context, opt ...Option) App { a.log.Info("container resolver is disabled") } - if a.cfg.GetBool(cmdMetrics) { - a.metrics = newGateMetrics() + if a.cfg.GetBool(cfgPrometheusEnabled) { + a.metrics = metrics.NewGateMetrics() } return a @@ -254,11 +255,6 @@ func (a *app) Wait() { } func (a *app) Serve(ctx context.Context) { - go func() { - <-ctx.Done() - a.log.Info("shutting down web server", zap.Error(a.webServer.Shutdown())) - close(a.webDone) - }() edts := a.cfg.GetBool(cfgUploaderHeaderEnableDefaultTimestamp) uploadRoutes := uploader.New(ctx, a.AppParams(), edts) downloadSettings := downloader.Settings{ZipCompression: a.cfg.GetBool(cfgZipCompression)} @@ -282,32 +278,45 @@ func (a *app) Serve(ctx context.Context) { a.log.Info("added path /get_by_attribute/{cid}/{attr_key}/{attr_val:*}") r.GET("/zip/{cid}/{prefix:*}", a.logger(downloadRoutes.DownloadZipped)) a.log.Info("added path /zip/{cid}/{prefix}") - // enable metrics - if a.cfg.GetBool(cmdMetrics) { - a.log.Info("added path /metrics/") - attachMetrics(r, a.log) - } - // enable pprof - if a.cfg.GetBool(cmdPprof) { - a.log.Info("added path /debug/pprof/") - attachProfiler(r) - } + + pprofConfig := metrics.Config{Enabled: a.cfg.GetBool(cfgPprofEnabled), Address: a.cfg.GetString(cfgPprofAddress)} + pprof := metrics.NewPprofService(a.log, pprofConfig) + prometheusConfig := metrics.Config{Enabled: a.cfg.GetBool(cfgPrometheusEnabled), Address: a.cfg.GetString(cfgPrometheusAddress)} + prometheus := metrics.NewPrometheusService(a.log, prometheusConfig) + bind := a.cfg.GetString(cfgListenAddress) tlsCertPath := a.cfg.GetString(cfgTLSCertificate) tlsKeyPath := a.cfg.GetString(cfgTLSKey) a.webServer.Handler = r.Handler - var err error - if tlsCertPath == "" && tlsKeyPath == "" { - a.log.Info("running web server", zap.String("address", bind)) - err = a.webServer.ListenAndServe(bind) - } else { - a.log.Info("running web server (TLS-enabled)", zap.String("address", bind)) - err = a.webServer.ListenAndServeTLS(bind, tlsCertPath, tlsKeyPath) - } - if err != nil { - a.log.Fatal("could not start server", zap.Error(err)) - } + + go pprof.Start() + go prometheus.Start() + + go func() { + var err error + if tlsCertPath == "" && tlsKeyPath == "" { + a.log.Info("running web server", zap.String("address", bind)) + err = a.webServer.ListenAndServe(bind) + } else { + a.log.Info("running web server (TLS-enabled)", zap.String("address", bind)) + err = a.webServer.ListenAndServeTLS(bind, tlsCertPath, tlsKeyPath) + } + if err != nil { + a.log.Fatal("could not start server", zap.Error(err)) + } + }() + + <-ctx.Done() + a.log.Info("shutting down web server", zap.Error(a.webServer.Shutdown())) + + ctx, cancel := context.WithTimeout(context.Background(), defaultShutdownTimeout) + defer cancel() + + pprof.ShutDown(ctx) + prometheus.ShutDown(ctx) + + close(a.webDone) } func (a *app) logger(h fasthttp.RequestHandler) fasthttp.RequestHandler { diff --git a/config/config.env b/config/config.env index f4f0f07..7f7b406 100644 --- a/config/config.env +++ b/config/config.env @@ -8,9 +8,12 @@ HTTP_GW_WALLET_ADDRESS=NfgHwwTi3wHAS8aFAN243C5vGbkYDpqLHP HTTP_GW_WALLET_PASSPHRASE=pwd # Enable metrics. -HTTP_GW_METRICS=true -# Enable pprof. -HTTP_GW_PPROF=true +HTTP_GW_PPROF_ENABLED=true +HTTP_GW_PPROF_ADDRESS=localhost:8083 + +HTTP_GW_PROMETHEUS_ENABLED=true +HTTP_GW_PROMETHEUS_ADDRESS=localhost:8084 + # Log level. HTTP_GW_LOGGER_LEVEL=debug diff --git a/config/config.yaml b/config/config.yaml index ef77302..d3b5e9a 100644 --- a/config/config.yaml +++ b/config/config.yaml @@ -3,8 +3,13 @@ wallet: address: NfgHwwTi3wHAS8aFAN243C5vGbkYDpqLHP # Account address. If omitted default one will be used. passphrase: pwd # Passphrase to decrypt wallet. If you're using a wallet without a password, place '' here. -metrics: true # Enable metrics. -pprof: true # Enable pprof. +pprof: + enabled: true # Enable pprof. + address: localhost:8083 +prometheus: + enabled: true # Enable metrics. + address: localhost:8084 + logger: level: debug # Log level. diff --git a/docs/gate-configuration.md b/docs/gate-configuration.md index 0950e5d..040ae1b 100644 --- a/docs/gate-configuration.md +++ b/docs/gate-configuration.md @@ -19,6 +19,9 @@ There are some custom types used for brevity: | `web` | [Web configuration](#web-section) | | `upload-header` | [Upload header configuration](#upload-header-section) | | `zip` | [ZIP configuration](#zip-section) | +| `pprof` | [Pprof configuration](#pprof-section) | +| `prometheus` | [Prometheus configuration](#prometheus-section) | + # General section @@ -32,9 +35,6 @@ resolve_order: - nns - dns -metrics: false -pprof: false - connect_timeout: 5s request_timeout: 5s rebalance_timer: 30s @@ -47,8 +47,6 @@ rebalance_timer: 30s | `tls_key` | `string` | | Path to the TLS key. | | `rpc_endpoint` | `string` | | The address of the RPC host to which the gateway connects to resolve bucket names. | | `resolve_order` | `[]string` | `[nns, dns]` | Order of bucket name resolvers to use. | -| `metrics` | `bool` | `false` | Flag to enable and expose the prometheus metrics. | -| `pprof` | `bool` | `false` | Flag to enable the profiler. | | `connect_timeout` | `duration` | `10s` | Timeout to connect to a node. | | `request_timeout` | `duration` | `15s` | Timeout to check node health during rebalance. | | `rebalance_timer` | `duration` | `60s` | Interval to check node health. | @@ -156,3 +154,33 @@ zip: |---------------|--------|---------------|--------------------------------------------------------------| | `compression` | `bool` | `false` | Enable zip compression when download files by common prefix. | + +# `pprof` section + +Contains configuration for the `pprof` profiler. + +```yaml +pprof: + enabled: true + address: localhost:8083 +``` + +| Parameter | Type | Default value | Description | +|-----------|----------|------------------|-----------------------------------------| +| `enabled` | `bool` | `false` | Flag to enable the service. | +| `address` | `string` | `localhost:8083` | Address that service listener binds to. | + +# `prometheus` section + +Contains configuration for the `prometheus` metrics service. + +```yaml +prometheus: + enabled: true + address: localhost:8084 +``` + +| Parameter | Type | Default value | Description | +|-----------|----------|------------------|-----------------------------------------| +| `enabled` | `bool` | `false` | Flag to enable the service. | +| `address` | `string` | `localhost:8084` | Address that service listener binds to. | diff --git a/metrics.go b/metrics.go deleted file mode 100644 index bb92b6a..0000000 --- a/metrics.go +++ /dev/null @@ -1,127 +0,0 @@ -package main - -import ( - "github.com/fasthttp/router" - "github.com/nspcc-dev/neofs-http-gw/response" - "github.com/prometheus/client_golang/prometheus" - "github.com/prometheus/common/expfmt" - "github.com/valyala/fasthttp" - "go.uber.org/zap" -) - -const ( - namespace = "neofs_http_gw" - stateSubsystem = "state" -) - -type GateMetrics struct { - stateMetrics -} - -type stateMetrics struct { - healthCheck prometheus.Gauge -} - -func newGateMetrics() *GateMetrics { - stateMetric := newStateMetrics() - stateMetric.register() - - return &GateMetrics{ - stateMetrics: *stateMetric, - } -} - -func newStateMetrics() *stateMetrics { - return &stateMetrics{ - healthCheck: prometheus.NewGauge(prometheus.GaugeOpts{ - Namespace: namespace, - Subsystem: stateSubsystem, - Name: "health", - Help: "Current HTTP gateway state", - }), - } -} - -func (m stateMetrics) register() { - prometheus.MustRegister(m.healthCheck) -} - -func (m stateMetrics) SetHealth(s int32) { - m.healthCheck.Set(float64(s)) -} - -func attachMetrics(r *router.Router, l *zap.Logger) { - r.GET("/metrics/", metricsHandler(prometheus.DefaultGatherer, l)) -} - -func metricsHandler(reg prometheus.Gatherer, logger *zap.Logger) fasthttp.RequestHandler { - var ( - inFlightSem chan struct{} - errCnt = prometheus.NewCounterVec( - prometheus.CounterOpts{ - Name: "promhttp_metric_handler_errors_total", - Help: "Total number of internal errors encountered by the promhttp metric handler.", - }, - []string{"cause"}, - ) - ) - - h := fasthttp.RequestHandler(func(c *fasthttp.RequestCtx) { - if inFlightSem != nil { - select { - case inFlightSem <- struct{}{}: // All good, carry on. - defer func() { <-inFlightSem }() - default: - response.Error(c, "Limit of concurrent requests reached, try again later.", fasthttp.StatusServiceUnavailable) - return - } - } - mfs, err := reg.Gather() - if err != nil { - if logger != nil { - panic("error gathering metrics:" + err.Error()) - } - - errCnt.WithLabelValues("gathering").Inc() - response.Error(c, err.Error(), fasthttp.StatusServiceUnavailable) - return - } - - contentType := expfmt.FmtText - c.SetContentType(string(contentType)) - enc := expfmt.NewEncoder(c, contentType) - - var lastErr error - - // handleError handles the error according to opts.ErrorHandling - // and returns true if we have to abort after the handling. - handleError := func(err error) bool { - if err == nil { - return false - } - lastErr = err - if logger != nil { - logger.Error("encoding and sending metric family", zap.Error(err)) - } - errCnt.WithLabelValues("encoding").Inc() - response.Error(c, err.Error(), fasthttp.StatusServiceUnavailable) - return true - } - - for _, mf := range mfs { - if handleError(enc.Encode(mf)) { - return - } - } - if closer, ok := enc.(expfmt.Closer); ok { - // This in particular takes care of the final "# EOF\n" line for OpenMetrics. - if handleError(closer.Close()) { - return - } - } - - handleError(lastErr) - }) - - return h -} diff --git a/metrics/metrics.go b/metrics/metrics.go new file mode 100644 index 0000000..480cd81 --- /dev/null +++ b/metrics/metrics.go @@ -0,0 +1,68 @@ +package metrics + +import ( + "net/http" + + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promhttp" + "go.uber.org/zap" +) + +const ( + namespace = "neofs_http_gw" + stateSubsystem = "state" +) + +type GateMetrics struct { + stateMetrics +} + +type stateMetrics struct { + healthCheck prometheus.Gauge +} + +// NewGateMetrics creates new metrics for http gate. +func NewGateMetrics() *GateMetrics { + stateMetric := newStateMetrics() + stateMetric.register() + + return &GateMetrics{ + stateMetrics: *stateMetric, + } +} + +func newStateMetrics() *stateMetrics { + return &stateMetrics{ + healthCheck: prometheus.NewGauge(prometheus.GaugeOpts{ + Namespace: namespace, + Subsystem: stateSubsystem, + Name: "health", + Help: "Current HTTP gateway state", + }), + } +} + +func (m stateMetrics) register() { + prometheus.MustRegister(m.healthCheck) +} + +func (m stateMetrics) SetHealth(s int32) { + m.healthCheck.Set(float64(s)) +} + +// NewPrometheusService creates a new service for gathering prometheus metrics. +func NewPrometheusService(log *zap.Logger, cfg Config) *Service { + if log == nil { + return nil + } + + return &Service{ + Server: &http.Server{ + Addr: cfg.Address, + Handler: promhttp.Handler(), + }, + enabled: cfg.Enabled, + serviceType: "Prometheus", + log: log.With(zap.String("service", "Prometheus")), + } +} diff --git a/metrics/pprof.go b/metrics/pprof.go new file mode 100644 index 0000000..4719a69 --- /dev/null +++ b/metrics/pprof.go @@ -0,0 +1,33 @@ +package metrics + +import ( + "net/http" + "net/http/pprof" + + "go.uber.org/zap" +) + +// NewPprofService creates a new service for gathering pprof metrics. +func NewPprofService(l *zap.Logger, cfg Config) *Service { + handler := http.NewServeMux() + handler.HandleFunc("/debug/pprof/", pprof.Index) + handler.HandleFunc("/debug/pprof/cmdline", pprof.Cmdline) + handler.HandleFunc("/debug/pprof/profile", pprof.Profile) + handler.HandleFunc("/debug/pprof/symbol", pprof.Symbol) + handler.HandleFunc("/debug/pprof/trace", pprof.Trace) + + // Manually add support for paths linked to by index page at /debug/pprof/ + for _, item := range []string{"allocs", "block", "heap", "goroutine", "mutex", "threadcreate"} { + handler.Handle("/debug/pprof/"+item, pprof.Handler(item)) + } + + return &Service{ + Server: &http.Server{ + Addr: cfg.Address, + Handler: handler, + }, + enabled: cfg.Enabled, + serviceType: "Pprof", + log: l.With(zap.String("service", "Pprof")), + } +} diff --git a/metrics/service.go b/metrics/service.go new file mode 100644 index 0000000..7cb3ca2 --- /dev/null +++ b/metrics/service.go @@ -0,0 +1,44 @@ +package metrics + +import ( + "context" + "net/http" + + "go.uber.org/zap" +) + +// Service serves metrics. +type Service struct { + *http.Server + enabled bool + log *zap.Logger + serviceType string +} + +// Config is a params to configure service. +type Config struct { + Address string + Enabled bool +} + +// Start runs http service with the exposed endpoint on the configured port. +func (ms *Service) Start() { + if ms.enabled { + ms.log.Info("service is running", zap.String("endpoint", ms.Addr)) + err := ms.ListenAndServe() + if err != nil && err != http.ErrServerClosed { + ms.log.Warn("service couldn't start on configured port") + } + } else { + ms.log.Info("service hasn't started since it's disabled") + } +} + +// ShutDown stops the service. +func (ms *Service) ShutDown(ctx context.Context) { + ms.log.Info("shutting down service", zap.String("endpoint", ms.Addr)) + err := ms.Shutdown(ctx) + if err != nil { + ms.log.Panic("can't shut down service") + } +} diff --git a/pprof.go b/pprof.go deleted file mode 100644 index 8831314..0000000 --- a/pprof.go +++ /dev/null @@ -1,44 +0,0 @@ -package main - -import ( - "net/http/pprof" - rtp "runtime/pprof" - - "github.com/fasthttp/router" - "github.com/nspcc-dev/neofs-http-gw/response" - "github.com/valyala/fasthttp" - "github.com/valyala/fasthttp/fasthttpadaptor" -) - -func attachProfiler(r *router.Router) { - r.GET("/debug/pprof/", pprofHandler()) - r.GET("/debug/pprof/{name}/", pprofHandler()) -} - -func pprofHandler() fasthttp.RequestHandler { - items := rtp.Profiles() - - profiles := map[string]fasthttp.RequestHandler{ - "": fasthttpadaptor.NewFastHTTPHandlerFunc(pprof.Index), - "cmdline": fasthttpadaptor.NewFastHTTPHandlerFunc(pprof.Cmdline), - "profile": fasthttpadaptor.NewFastHTTPHandlerFunc(pprof.Profile), - "symbol": fasthttpadaptor.NewFastHTTPHandlerFunc(pprof.Symbol), - "trace": fasthttpadaptor.NewFastHTTPHandlerFunc(pprof.Trace), - } - - for i := range items { - name := items[i].Name() - profiles[name] = fasthttpadaptor.NewFastHTTPHandler(pprof.Handler(name)) - } - - return func(ctx *fasthttp.RequestCtx) { - name, _ := ctx.UserValue("name").(string) - - if handler, ok := profiles[name]; ok { - handler(ctx) - return - } - - response.Error(ctx, "Not found", fasthttp.StatusNotFound) - } -} diff --git a/settings.go b/settings.go index 1b0c8b9..e99f701 100644 --- a/settings.go +++ b/settings.go @@ -20,6 +20,8 @@ const ( defaultRequestTimeout = 15 * time.Second defaultConnectTimeout = 10 * time.Second + defaultShutdownTimeout = 15 * time.Second + cfgListenAddress = "listen_address" cfgTLSCertificate = "tls_certificate" cfgTLSKey = "tls_key" @@ -32,6 +34,12 @@ const ( cfgWebStreamRequestBody = "web.stream_request_body" cfgWebMaxRequestBodySize = "web.max_request_body_size" + // Metrics / Profiler. + cfgPrometheusEnabled = "prometheus.enabled" + cfgPrometheusAddress = "prometheus.address" + cfgPprofEnabled = "pprof.enabled" + cfgPprofAddress = "pprof.address" + // Timeouts. cfgConTimeout = "connect_timeout" cfgReqTimeout = "request_timeout" @@ -128,6 +136,18 @@ func settings() *viper.Viper { // zip: v.SetDefault(cfgZipCompression, false) + // metrics + v.SetDefault(cfgPprofAddress, "localhost:8083") + v.SetDefault(cfgPrometheusAddress, "localhost:8084") + + // Binding flags + if err := v.BindPFlag(cfgPprofEnabled, flags.Lookup(cmdPprof)); err != nil { + panic(err) + } + if err := v.BindPFlag(cfgPrometheusEnabled, flags.Lookup(cmdMetrics)); err != nil { + panic(err) + } + if err := v.BindPFlags(flags); err != nil { panic(err) }