[#493] Refactor serving of prometheus and pprof services

Rename `util/profiler` package to `httputil` and refactor it:

  * simplify utility HTTP server;

  * make more generic server's parameters in order to remove `viper.Viper`
    dependency;

  * use single constructor for creating the pprof and prometheus servers;

  * replace `enabled` config value with empty-check of the network address.

Signed-off-by: Leonard Lyubich <leonard@nspcc.ru>
This commit is contained in:
Leonard Lyubich 2021-05-12 01:59:05 +03:00 committed by Alex Vanin
parent 6339f1a468
commit 8d17dab86e
15 changed files with 294 additions and 304 deletions

View file

@ -32,13 +32,9 @@ func newConfig(path string) (*viper.Viper, error) {
func defaultConfiguration(cfg *viper.Viper) {
cfg.SetDefault("logger.level", "info")
cfg.SetDefault("pprof.enabled", false)
cfg.SetDefault("pprof.address", ":6060")
cfg.SetDefault("pprof.shutdown_ttl", "30s")
cfg.SetDefault("profiler.shutdown_timeout", "30s")
cfg.SetDefault("metrics.enabled", false)
cfg.SetDefault("metrics.address", ":9090")
cfg.SetDefault("metrics.shutdown_ttl", "30s")
cfg.SetDefault("metrics.shutdown_timeout", "30s")
cfg.SetDefault("without_mainnet", false)
cfg.SetDefault("without_notary", false)

View file

@ -3,6 +3,7 @@ package main
import (
"flag"
"fmt"
"net/http"
"os"
"strings"
@ -10,8 +11,9 @@ import (
"github.com/nspcc-dev/neofs-node/misc"
"github.com/nspcc-dev/neofs-node/pkg/innerring"
"github.com/nspcc-dev/neofs-node/pkg/util/grace"
httputil "github.com/nspcc-dev/neofs-node/pkg/util/http"
"github.com/nspcc-dev/neofs-node/pkg/util/logger"
"github.com/nspcc-dev/neofs-node/pkg/util/profiler"
"github.com/prometheus/client_golang/prometheus/promhttp"
"go.uber.org/zap"
)
@ -57,8 +59,33 @@ func main() {
ctx := grace.NewGracefulContext(log)
intErr := make(chan error) // internal inner ring errors
pprof := profiler.NewProfiler(log, cfg)
prometheus := profiler.NewMetrics(log, cfg)
var httpServers []*httputil.Server
for _, item := range []struct {
cfgPrefix string
handler func() http.Handler
}{
{"profiler", httputil.Handler},
{"metrics", promhttp.Handler},
} {
addr := cfg.GetString(item.cfgPrefix + ".address")
if addr == "" {
continue
}
var prm httputil.Prm
prm.Address = addr
prm.Handler = item.handler()
httpServers = append(httpServers,
httputil.New(prm,
httputil.WithShutdownTimeout(
cfg.GetDuration(item.cfgPrefix+".shutdown_timeout"),
),
),
)
}
innerRing, err := innerring.New(ctx, log, cfg)
if err != nil {
@ -75,16 +102,12 @@ func main() {
return
}
// start pprof if enabled
if pprof != nil {
pprof.Start(ctx)
defer pprof.Stop()
}
// start prometheus if enabled
if prometheus != nil {
prometheus.Start(ctx)
defer prometheus.Stop()
// start HTTP servers
for i := range httpServers {
srv := httpServers[i]
go func() {
exitErr(srv.Serve())
}()
}
// start inner ring
@ -108,6 +131,20 @@ func main() {
innerRing.Stop()
// shut down HTTP servers
for i := range httpServers {
srv := httpServers[i]
go func() {
err := srv.Shutdown()
if err != nil {
log.Debug("could not shutdown HTTP server",
zap.String("error", err.Error()),
)
}
}()
}
log.Info("application stopped")
}

View file

@ -40,7 +40,6 @@ import (
"github.com/nspcc-dev/neofs-node/pkg/services/util/response"
util2 "github.com/nspcc-dev/neofs-node/pkg/util"
"github.com/nspcc-dev/neofs-node/pkg/util/logger"
"github.com/nspcc-dev/neofs-node/pkg/util/profiler"
"github.com/panjf2000/ants/v2"
"github.com/pkg/errors"
"github.com/spf13/viper"
@ -55,13 +54,12 @@ const (
cfgLogLevel = "logger.level"
// pprof keys
cfgProfilerEnable = "pprof.enabled"
cfgProfilerAddr = "pprof.address"
cfgProfilerTTL = "pprof.shutdown_ttl"
cfgProfilerAddr = "profiler.address"
cfgProfilerShutdownTimeout = "profiler.shutdown_timeout"
// metrics keys
cfgMetricsEnable = "metrics.enabled"
cfgMetricsAddr = "metrics.address"
cfgMetricsShutdownTimeout = "metrics.shutdown_timeout"
// config keys for cfgNodeInfo
cfgNodeKey = "node.key"
@ -203,10 +201,6 @@ type cfg struct {
cfgObject cfgObject
profiler profiler.Profiler
metricsServer profiler.Metrics
metricsCollector *metrics.StorageMetrics
workers []worker
@ -438,7 +432,7 @@ func initCfg(path string) *cfg {
},
}
if viperCfg.GetBool(cfgMetricsEnable) {
if c.viper.GetString(cfgMetricsAddr) != "" {
c.metricsCollector = metrics.NewStorageMetrics()
}
@ -496,12 +490,9 @@ func defaultConfiguration(v *viper.Viper) {
v.SetDefault(cfgLogLevel, "info")
v.SetDefault(cfgProfilerEnable, false)
v.SetDefault(cfgProfilerAddr, ":6060")
v.SetDefault(cfgProfilerTTL, "30s")
v.SetDefault(cfgProfilerShutdownTimeout, "30s")
v.SetDefault(cfgMetricsEnable, false)
v.SetDefault(cfgMetricsAddr, ":9090")
v.SetDefault(cfgMetricsShutdownTimeout, "30s")
v.SetDefault(cfgGCQueueSize, 1000)
v.SetDefault(cfgGCQueueTick, "5s")

View file

@ -60,12 +60,10 @@ func initApp(c *cfg) {
}
func bootUp(c *cfg) {
serveProfiler(c)
serveGRPC(c)
bootstrapNode(c)
startWorkers(c)
startBlockTimers(c)
serveMetrics(c)
}
func wait(c *cfg) {

View file

@ -1,17 +1,40 @@
package main
import (
"github.com/nspcc-dev/neofs-node/pkg/util/profiler"
"context"
httputil "github.com/nspcc-dev/neofs-node/pkg/util/http"
"github.com/prometheus/client_golang/prometheus/promhttp"
"go.uber.org/zap"
)
func initMetrics(c *cfg) {
if c.metricsCollector != nil {
c.metricsServer = profiler.NewMetrics(c.log, c.viper)
}
addr := c.viper.GetString(cfgMetricsAddr)
if addr == "" {
return
}
func serveMetrics(c *cfg) {
if c.metricsServer != nil {
c.metricsServer.Start(c.ctx)
var prm httputil.Prm
prm.Address = addr
prm.Handler = promhttp.Handler()
srv := httputil.New(prm,
httputil.WithShutdownTimeout(
c.viper.GetDuration(cfgMetricsShutdownTimeout),
),
)
c.workers = append(c.workers, newWorkerFromFunc(func(context.Context) {
fatalOnErr(srv.Serve())
}))
c.closers = append(c.closers, func() {
err := srv.Shutdown()
if err != nil {
c.log.Debug("could not shutdown metrics server",
zap.String("error", err.Error()),
)
}
})
}

View file

@ -1,15 +1,39 @@
package main
import (
"github.com/nspcc-dev/neofs-node/pkg/util/profiler"
"context"
httputil "github.com/nspcc-dev/neofs-node/pkg/util/http"
"go.uber.org/zap"
)
func initProfiler(c *cfg) {
c.profiler = profiler.NewProfiler(c.log, c.viper)
addr := c.viper.GetString(cfgProfilerAddr)
if addr == "" {
return
}
func serveProfiler(c *cfg) {
if c.profiler != nil {
c.profiler.Start(c.ctx)
var prm httputil.Prm
prm.Address = addr
prm.Handler = httputil.Handler()
srv := httputil.New(prm,
httputil.WithShutdownTimeout(
c.viper.GetDuration(cfgProfilerShutdownTimeout),
),
)
c.workers = append(c.workers, newWorkerFromFunc(func(context.Context) {
fatalOnErr(srv.Serve())
}))
c.closers = append(c.closers, func() {
err := srv.Shutdown()
if err != nil {
c.log.Debug("could not shutdown pprof server",
zap.String("error", err.Error()),
)
}
})
}

BIN
go.sum

Binary file not shown.

44
pkg/util/http/calls.go Normal file
View file

@ -0,0 +1,44 @@
package httputil
import (
"context"
"net/http"
"github.com/pkg/errors"
)
// Serve listens and serves internal HTTP server.
//
// Returns any error returned by internal server
// except http.ErrServerClosed.
//
// After Shutdown call, Serve has no effect and
// returned error is always nil.
func (x *Server) Serve() error {
err := x.srv.ListenAndServe()
// http.ErrServerClosed is returned on server shutdown
// so we ignore this error.
if err != nil && errors.Is(err, http.ErrServerClosed) {
err = nil
}
return err
}
// Shutdown gracefully shuts down internal HTTP server.
//
// Shutdown is called with context which expires after
// configured timeout.
//
// Once Shutdown has been called on a server, it may not be reused;
// future calls to Serve method will have no effect.
func (x *Server) Shutdown() error {
ctx, cancel := context.WithTimeout(context.Background(), x.shutdownTimeout)
err := x.srv.Shutdown(ctx)
cancel()
return err
}

26
pkg/util/http/opts.go Normal file
View file

@ -0,0 +1,26 @@
package httputil
import (
"time"
)
// Option sets an optional parameter of Server.
type Option func(*cfg)
type cfg struct {
shutdownTimeout time.Duration
}
func defaultCfg() *cfg {
return &cfg{
shutdownTimeout: 15 * time.Second,
}
}
// WithShutdownTimeout returns option to set shutdown timeout
// of the internal HTTP server.
func WithShutdownTimeout(dur time.Duration) Option {
return func(c *cfg) {
c.shutdownTimeout = dur
}
}

16
pkg/util/http/pprof.go Normal file
View file

@ -0,0 +1,16 @@
package httputil
import (
"net/http"
"net/http/pprof"
)
// initializes pprof package in order to
// register Prometheus handlers on http.DefaultServeMux.
var _ = pprof.Handler("")
// Handler returns http.Handler for the
// Prometheus metrics collector.
func Handler() http.Handler {
return http.DefaultServeMux
}

87
pkg/util/http/server.go Normal file
View file

@ -0,0 +1,87 @@
package httputil
import (
"fmt"
"net/http"
"time"
)
// Prm groups the required parameters of the Server's constructor.
//
// All values must comply with the requirements imposed on them.
// Passing incorrect parameter values will result in constructor
// failure (error or panic depending on the implementation).
type Prm struct {
// TCP address for the server to listen on.
//
// Must be a valid TCP address.
Address string
// Must not be nil.
Handler http.Handler
}
// Server represents a wrapper over http.Server
// that provides interface to start and stop
// listening routine.
//
// For correct operation, Server must be created
// using the constructor (New) based on the required parameters
// and optional components. After successful creation,
// Server is immediately ready to work through API.
type Server struct {
shutdownTimeout time.Duration
srv *http.Server
}
const invalidValFmt = "invalid %s %s (%T): %v"
func panicOnPrmValue(n string, v interface{}) {
panicOnValue("parameter", n, v)
}
func panicOnOptValue(n string, v interface{}) {
panicOnValue("option", n, v)
}
func panicOnValue(t, n string, v interface{}) {
panic(fmt.Sprintf(invalidValFmt, t, n, v, v))
}
// New creates a new instance of the Server.
//
// Panics if at least one value of the parameters is invalid.
//
// Panics if at least one of next optinal parameters is invalid:
// * shutdown timeout is non-positive.
//
// The created Server does not require additional
// initialization and is completely ready for work.
func New(prm Prm, opts ...Option) *Server {
switch {
case prm.Address == "":
panicOnPrmValue("Address", prm.Address)
case prm.Handler == nil:
panicOnPrmValue("Handler", prm.Handler)
}
c := defaultCfg()
for _, o := range opts {
o(c)
}
switch {
case c.shutdownTimeout <= 0:
panicOnOptValue("shutdown timeout", c.shutdownTimeout)
}
return &Server{
shutdownTimeout: c.shutdownTimeout,
srv: &http.Server{
Addr: prm.Address,
Handler: prm.Handler,
},
}
}

View file

@ -1,114 +0,0 @@
package profiler
import (
"context"
"net/http"
"sync/atomic"
"time"
"github.com/spf13/viper"
"go.uber.org/zap"
)
type (
httpParams struct {
Key string
Viper *viper.Viper
Logger *zap.Logger
Handler http.Handler
}
httpServer struct {
name string
started *int32
logger *zap.Logger
shutdownTTL time.Duration
server server
}
)
func (h *httpServer) Start(ctx context.Context) {
if h == nil {
return
}
if !atomic.CompareAndSwapInt32(h.started, 0, 1) {
h.logger.Info("http: already started",
zap.String("server", h.name))
return
}
go func() {
if err := h.server.serve(ctx); err != nil {
if err != http.ErrServerClosed {
h.logger.Error("http: could not start server",
zap.Error(err))
}
}
}()
}
func (h *httpServer) Stop() {
if h == nil {
return
}
if !atomic.CompareAndSwapInt32(h.started, 1, 0) {
h.logger.Info("http: already stopped",
zap.String("server", h.name))
return
}
ctx, cancel := context.WithTimeout(context.Background(), h.shutdownTTL)
defer cancel()
h.logger.Debug("http: try to stop server",
zap.String("server", h.name))
if err := h.server.shutdown(ctx); err != nil {
h.logger.Error("http: could not stop server",
zap.Error(err))
}
}
const defaultShutdownTTL = 30 * time.Second
func newHTTPServer(p httpParams) *httpServer {
var (
address string
shutdown time.Duration
)
if address = p.Viper.GetString(p.Key + ".address"); address == "" {
p.Logger.Info("Empty bind address, skip",
zap.String("server", p.Key))
return nil
}
if p.Handler == nil {
p.Logger.Info("Empty handler, skip",
zap.String("server", p.Key))
return nil
}
p.Logger.Info("Create http.Server",
zap.String("server", p.Key),
zap.String("address", address))
if shutdown = p.Viper.GetDuration(p.Key + ".shutdown_ttl"); shutdown <= 0 {
shutdown = defaultShutdownTTL
}
return &httpServer{
name: p.Key,
started: new(int32),
logger: p.Logger,
shutdownTTL: shutdown,
server: newServer(params{
Address: address,
Name: p.Key,
Config: p.Viper,
Logger: p.Logger,
Handler: p.Handler,
}),
}
}

View file

@ -1,32 +0,0 @@
package profiler
import (
"context"
"github.com/prometheus/client_golang/prometheus/promhttp"
"github.com/spf13/viper"
"go.uber.org/zap"
)
// Metrics is an interface of metric tool.
type Metrics interface {
Start(ctx context.Context)
Stop()
}
const metricsKey = "metrics"
// NewMetrics is a metric tool's constructor.
func NewMetrics(l *zap.Logger, v *viper.Viper) Metrics {
if !v.GetBool(metricsKey + ".enabled") {
l.Debug("metrics server disabled")
return nil
}
return newHTTPServer(httpParams{
Key: metricsKey,
Viper: v,
Logger: l,
Handler: promhttp.Handler(),
})
}

View file

@ -1,44 +0,0 @@
package profiler
import (
"context"
"expvar"
"net/http"
"net/http/pprof"
"github.com/spf13/viper"
"go.uber.org/zap"
)
// Profiler is an interface of profiler.
type Profiler interface {
Start(ctx context.Context)
Stop()
}
const profilerKey = "pprof"
// NewProfiler is a profiler's constructor.
func NewProfiler(l *zap.Logger, v *viper.Viper) Profiler {
if !v.GetBool(profilerKey + ".enabled") {
l.Debug("pprof server disabled")
return nil
}
mux := http.NewServeMux()
mux.Handle("/debug/vars", expvar.Handler())
mux.HandleFunc("/debug/pprof/", pprof.Index)
mux.HandleFunc("/debug/pprof/cmdline", pprof.Cmdline)
mux.HandleFunc("/debug/pprof/profile", pprof.Profile)
mux.HandleFunc("/debug/pprof/symbol", pprof.Symbol)
mux.HandleFunc("/debug/pprof/trace", pprof.Trace)
return newHTTPServer(httpParams{
Key: profilerKey,
Viper: v,
Logger: l,
Handler: mux,
})
}

View file

@ -1,62 +0,0 @@
package profiler
import (
"context"
"net/http"
"github.com/spf13/viper"
"go.uber.org/zap"
)
type (
// Server is an interface of server.
server interface {
serve(ctx context.Context) error
shutdown(ctx context.Context) error
}
contextServer struct {
logger *zap.Logger
server *http.Server
}
params struct {
Address string
Name string
Config *viper.Viper
Logger *zap.Logger
Handler http.Handler
}
)
func newServer(p params) server {
return &contextServer{
logger: p.Logger,
server: &http.Server{
Addr: p.Address,
Handler: p.Handler,
ReadTimeout: p.Config.GetDuration(p.Name + ".read_timeout"),
ReadHeaderTimeout: p.Config.GetDuration(p.Name + ".read_header_timeout"),
WriteTimeout: p.Config.GetDuration(p.Name + ".write_timeout"),
IdleTimeout: p.Config.GetDuration(p.Name + ".idle_timeout"),
MaxHeaderBytes: p.Config.GetInt(p.Name + ".max_header_bytes"),
},
}
}
func (cs *contextServer) serve(ctx context.Context) error {
go func() {
<-ctx.Done()
if err := cs.server.Close(); err != nil {
cs.logger.Info("something went wrong",
zap.Error(err))
}
}()
return cs.server.ListenAndServe()
}
func (cs *contextServer) shutdown(ctx context.Context) error {
return cs.server.Shutdown(ctx)
}