From 8d17dab86e9fea7121c9bb04d2ef148077a92a8d Mon Sep 17 00:00:00 2001 From: Leonard Lyubich Date: Wed, 12 May 2021 01:59:05 +0300 Subject: [PATCH] [#493] Refactor serving of prometheus and pprof services Rename `util/profiler` package to `httputil` and refactor it: * simplify utility HTTP server; * make more generic server's parameters in order to remove `viper.Viper` dependency; * use single constructor for creating the pprof and prometheus servers; * replace `enabled` config value with empty-check of the network address. Signed-off-by: Leonard Lyubich --- cmd/neofs-ir/defaults.go | 8 +-- cmd/neofs-ir/main.go | 63 +++++++++++++++---- cmd/neofs-node/config.go | 23 +++---- cmd/neofs-node/main.go | 2 - cmd/neofs-node/metrics.go | 39 +++++++++--- cmd/neofs-node/pprof.go | 38 +++++++++--- go.sum | Bin 60131 -> 60225 bytes pkg/util/http/calls.go | 44 ++++++++++++++ pkg/util/http/opts.go | 26 ++++++++ pkg/util/http/pprof.go | 16 +++++ pkg/util/http/server.go | 87 ++++++++++++++++++++++++++ pkg/util/profiler/http.go | 114 ----------------------------------- pkg/util/profiler/metrics.go | 32 ---------- pkg/util/profiler/pprof.go | 44 -------------- pkg/util/profiler/server.go | 62 ------------------- 15 files changed, 294 insertions(+), 304 deletions(-) create mode 100644 pkg/util/http/calls.go create mode 100644 pkg/util/http/opts.go create mode 100644 pkg/util/http/pprof.go create mode 100644 pkg/util/http/server.go delete mode 100644 pkg/util/profiler/http.go delete mode 100644 pkg/util/profiler/metrics.go delete mode 100644 pkg/util/profiler/pprof.go delete mode 100644 pkg/util/profiler/server.go diff --git a/cmd/neofs-ir/defaults.go b/cmd/neofs-ir/defaults.go index 3c1c247b..5fc79c73 100644 --- a/cmd/neofs-ir/defaults.go +++ b/cmd/neofs-ir/defaults.go @@ -32,13 +32,9 @@ func newConfig(path string) (*viper.Viper, error) { func defaultConfiguration(cfg *viper.Viper) { cfg.SetDefault("logger.level", "info") - cfg.SetDefault("pprof.enabled", false) - cfg.SetDefault("pprof.address", ":6060") - cfg.SetDefault("pprof.shutdown_ttl", "30s") + cfg.SetDefault("profiler.shutdown_timeout", "30s") - cfg.SetDefault("metrics.enabled", false) - cfg.SetDefault("metrics.address", ":9090") - cfg.SetDefault("metrics.shutdown_ttl", "30s") + cfg.SetDefault("metrics.shutdown_timeout", "30s") cfg.SetDefault("without_mainnet", false) cfg.SetDefault("without_notary", false) diff --git a/cmd/neofs-ir/main.go b/cmd/neofs-ir/main.go index 47e15c62..62de3e0c 100644 --- a/cmd/neofs-ir/main.go +++ b/cmd/neofs-ir/main.go @@ -3,6 +3,7 @@ package main import ( "flag" "fmt" + "net/http" "os" "strings" @@ -10,8 +11,9 @@ import ( "github.com/nspcc-dev/neofs-node/misc" "github.com/nspcc-dev/neofs-node/pkg/innerring" "github.com/nspcc-dev/neofs-node/pkg/util/grace" + httputil "github.com/nspcc-dev/neofs-node/pkg/util/http" "github.com/nspcc-dev/neofs-node/pkg/util/logger" - "github.com/nspcc-dev/neofs-node/pkg/util/profiler" + "github.com/prometheus/client_golang/prometheus/promhttp" "go.uber.org/zap" ) @@ -57,8 +59,33 @@ func main() { ctx := grace.NewGracefulContext(log) intErr := make(chan error) // internal inner ring errors - pprof := profiler.NewProfiler(log, cfg) - prometheus := profiler.NewMetrics(log, cfg) + var httpServers []*httputil.Server + + for _, item := range []struct { + cfgPrefix string + handler func() http.Handler + }{ + {"profiler", httputil.Handler}, + {"metrics", promhttp.Handler}, + } { + addr := cfg.GetString(item.cfgPrefix + ".address") + if addr == "" { + continue + } + + var prm httputil.Prm + + prm.Address = addr + prm.Handler = item.handler() + + httpServers = append(httpServers, + httputil.New(prm, + httputil.WithShutdownTimeout( + cfg.GetDuration(item.cfgPrefix+".shutdown_timeout"), + ), + ), + ) + } innerRing, err := innerring.New(ctx, log, cfg) if err != nil { @@ -75,16 +102,12 @@ func main() { return } - // start pprof if enabled - if pprof != nil { - pprof.Start(ctx) - defer pprof.Stop() - } - - // start prometheus if enabled - if prometheus != nil { - prometheus.Start(ctx) - defer prometheus.Stop() + // start HTTP servers + for i := range httpServers { + srv := httpServers[i] + go func() { + exitErr(srv.Serve()) + }() } // start inner ring @@ -108,6 +131,20 @@ func main() { innerRing.Stop() + // shut down HTTP servers + for i := range httpServers { + srv := httpServers[i] + + go func() { + err := srv.Shutdown() + if err != nil { + log.Debug("could not shutdown HTTP server", + zap.String("error", err.Error()), + ) + } + }() + } + log.Info("application stopped") } diff --git a/cmd/neofs-node/config.go b/cmd/neofs-node/config.go index a34c5bee..a46e2f1b 100644 --- a/cmd/neofs-node/config.go +++ b/cmd/neofs-node/config.go @@ -40,7 +40,6 @@ import ( "github.com/nspcc-dev/neofs-node/pkg/services/util/response" util2 "github.com/nspcc-dev/neofs-node/pkg/util" "github.com/nspcc-dev/neofs-node/pkg/util/logger" - "github.com/nspcc-dev/neofs-node/pkg/util/profiler" "github.com/panjf2000/ants/v2" "github.com/pkg/errors" "github.com/spf13/viper" @@ -55,13 +54,12 @@ const ( cfgLogLevel = "logger.level" // pprof keys - cfgProfilerEnable = "pprof.enabled" - cfgProfilerAddr = "pprof.address" - cfgProfilerTTL = "pprof.shutdown_ttl" + cfgProfilerAddr = "profiler.address" + cfgProfilerShutdownTimeout = "profiler.shutdown_timeout" // metrics keys - cfgMetricsEnable = "metrics.enabled" - cfgMetricsAddr = "metrics.address" + cfgMetricsAddr = "metrics.address" + cfgMetricsShutdownTimeout = "metrics.shutdown_timeout" // config keys for cfgNodeInfo cfgNodeKey = "node.key" @@ -203,10 +201,6 @@ type cfg struct { cfgObject cfgObject - profiler profiler.Profiler - - metricsServer profiler.Metrics - metricsCollector *metrics.StorageMetrics workers []worker @@ -438,7 +432,7 @@ func initCfg(path string) *cfg { }, } - if viperCfg.GetBool(cfgMetricsEnable) { + if c.viper.GetString(cfgMetricsAddr) != "" { c.metricsCollector = metrics.NewStorageMetrics() } @@ -496,12 +490,9 @@ func defaultConfiguration(v *viper.Viper) { v.SetDefault(cfgLogLevel, "info") - v.SetDefault(cfgProfilerEnable, false) - v.SetDefault(cfgProfilerAddr, ":6060") - v.SetDefault(cfgProfilerTTL, "30s") + v.SetDefault(cfgProfilerShutdownTimeout, "30s") - v.SetDefault(cfgMetricsEnable, false) - v.SetDefault(cfgMetricsAddr, ":9090") + v.SetDefault(cfgMetricsShutdownTimeout, "30s") v.SetDefault(cfgGCQueueSize, 1000) v.SetDefault(cfgGCQueueTick, "5s") diff --git a/cmd/neofs-node/main.go b/cmd/neofs-node/main.go index ba2dcbae..a277d2e8 100644 --- a/cmd/neofs-node/main.go +++ b/cmd/neofs-node/main.go @@ -60,12 +60,10 @@ func initApp(c *cfg) { } func bootUp(c *cfg) { - serveProfiler(c) serveGRPC(c) bootstrapNode(c) startWorkers(c) startBlockTimers(c) - serveMetrics(c) } func wait(c *cfg) { diff --git a/cmd/neofs-node/metrics.go b/cmd/neofs-node/metrics.go index b0056b4f..3b83fcf6 100644 --- a/cmd/neofs-node/metrics.go +++ b/cmd/neofs-node/metrics.go @@ -1,17 +1,40 @@ package main import ( - "github.com/nspcc-dev/neofs-node/pkg/util/profiler" + "context" + + httputil "github.com/nspcc-dev/neofs-node/pkg/util/http" + "github.com/prometheus/client_golang/prometheus/promhttp" + "go.uber.org/zap" ) func initMetrics(c *cfg) { - if c.metricsCollector != nil { - c.metricsServer = profiler.NewMetrics(c.log, c.viper) + addr := c.viper.GetString(cfgMetricsAddr) + if addr == "" { + return } -} -func serveMetrics(c *cfg) { - if c.metricsServer != nil { - c.metricsServer.Start(c.ctx) - } + var prm httputil.Prm + + prm.Address = addr + prm.Handler = promhttp.Handler() + + srv := httputil.New(prm, + httputil.WithShutdownTimeout( + c.viper.GetDuration(cfgMetricsShutdownTimeout), + ), + ) + + c.workers = append(c.workers, newWorkerFromFunc(func(context.Context) { + fatalOnErr(srv.Serve()) + })) + + c.closers = append(c.closers, func() { + err := srv.Shutdown() + if err != nil { + c.log.Debug("could not shutdown metrics server", + zap.String("error", err.Error()), + ) + } + }) } diff --git a/cmd/neofs-node/pprof.go b/cmd/neofs-node/pprof.go index ba4b4950..9135484b 100644 --- a/cmd/neofs-node/pprof.go +++ b/cmd/neofs-node/pprof.go @@ -1,15 +1,39 @@ package main import ( - "github.com/nspcc-dev/neofs-node/pkg/util/profiler" + "context" + + httputil "github.com/nspcc-dev/neofs-node/pkg/util/http" + "go.uber.org/zap" ) func initProfiler(c *cfg) { - c.profiler = profiler.NewProfiler(c.log, c.viper) -} - -func serveProfiler(c *cfg) { - if c.profiler != nil { - c.profiler.Start(c.ctx) + addr := c.viper.GetString(cfgProfilerAddr) + if addr == "" { + return } + + var prm httputil.Prm + + prm.Address = addr + prm.Handler = httputil.Handler() + + srv := httputil.New(prm, + httputil.WithShutdownTimeout( + c.viper.GetDuration(cfgProfilerShutdownTimeout), + ), + ) + + c.workers = append(c.workers, newWorkerFromFunc(func(context.Context) { + fatalOnErr(srv.Serve()) + })) + + c.closers = append(c.closers, func() { + err := srv.Shutdown() + if err != nil { + c.log.Debug("could not shutdown pprof server", + zap.String("error", err.Error()), + ) + } + }) } diff --git a/go.sum b/go.sum index 79877c9a4942dc6b15b0a8daf0b989c76d9f4cc6..bfd4228b475854cf949e7dd5d3e46765c6c4916e 100644 GIT binary patch delta 66 zcmaESmHFT`<_&x{`U)9_R{G8Xrs)=?`e9xknOW(nDbD&nsp)yfPL4^AiQxh1smbLL WPM$7R<)(T0lOI}%Z2oNhVI2Szdl>-$ delta 18 acmX?jjrs9a<_&x{ll@A$H<#O-S_c4Aw+N2_ diff --git a/pkg/util/http/calls.go b/pkg/util/http/calls.go new file mode 100644 index 00000000..acb5ce7d --- /dev/null +++ b/pkg/util/http/calls.go @@ -0,0 +1,44 @@ +package httputil + +import ( + "context" + "net/http" + + "github.com/pkg/errors" +) + +// Serve listens and serves internal HTTP server. +// +// Returns any error returned by internal server +// except http.ErrServerClosed. +// +// After Shutdown call, Serve has no effect and +// returned error is always nil. +func (x *Server) Serve() error { + err := x.srv.ListenAndServe() + + // http.ErrServerClosed is returned on server shutdown + // so we ignore this error. + if err != nil && errors.Is(err, http.ErrServerClosed) { + err = nil + } + + return err +} + +// Shutdown gracefully shuts down internal HTTP server. +// +// Shutdown is called with context which expires after +// configured timeout. +// +// Once Shutdown has been called on a server, it may not be reused; +// future calls to Serve method will have no effect. +func (x *Server) Shutdown() error { + ctx, cancel := context.WithTimeout(context.Background(), x.shutdownTimeout) + + err := x.srv.Shutdown(ctx) + + cancel() + + return err +} diff --git a/pkg/util/http/opts.go b/pkg/util/http/opts.go new file mode 100644 index 00000000..83209373 --- /dev/null +++ b/pkg/util/http/opts.go @@ -0,0 +1,26 @@ +package httputil + +import ( + "time" +) + +// Option sets an optional parameter of Server. +type Option func(*cfg) + +type cfg struct { + shutdownTimeout time.Duration +} + +func defaultCfg() *cfg { + return &cfg{ + shutdownTimeout: 15 * time.Second, + } +} + +// WithShutdownTimeout returns option to set shutdown timeout +// of the internal HTTP server. +func WithShutdownTimeout(dur time.Duration) Option { + return func(c *cfg) { + c.shutdownTimeout = dur + } +} diff --git a/pkg/util/http/pprof.go b/pkg/util/http/pprof.go new file mode 100644 index 00000000..7a041300 --- /dev/null +++ b/pkg/util/http/pprof.go @@ -0,0 +1,16 @@ +package httputil + +import ( + "net/http" + "net/http/pprof" +) + +// initializes pprof package in order to +// register Prometheus handlers on http.DefaultServeMux. +var _ = pprof.Handler("") + +// Handler returns http.Handler for the +// Prometheus metrics collector. +func Handler() http.Handler { + return http.DefaultServeMux +} diff --git a/pkg/util/http/server.go b/pkg/util/http/server.go new file mode 100644 index 00000000..797f8a5c --- /dev/null +++ b/pkg/util/http/server.go @@ -0,0 +1,87 @@ +package httputil + +import ( + "fmt" + "net/http" + "time" +) + +// Prm groups the required parameters of the Server's constructor. +// +// All values must comply with the requirements imposed on them. +// Passing incorrect parameter values will result in constructor +// failure (error or panic depending on the implementation). +type Prm struct { + // TCP address for the server to listen on. + // + // Must be a valid TCP address. + Address string + + // Must not be nil. + Handler http.Handler +} + +// Server represents a wrapper over http.Server +// that provides interface to start and stop +// listening routine. +// +// For correct operation, Server must be created +// using the constructor (New) based on the required parameters +// and optional components. After successful creation, +// Server is immediately ready to work through API. +type Server struct { + shutdownTimeout time.Duration + + srv *http.Server +} + +const invalidValFmt = "invalid %s %s (%T): %v" + +func panicOnPrmValue(n string, v interface{}) { + panicOnValue("parameter", n, v) +} + +func panicOnOptValue(n string, v interface{}) { + panicOnValue("option", n, v) +} + +func panicOnValue(t, n string, v interface{}) { + panic(fmt.Sprintf(invalidValFmt, t, n, v, v)) +} + +// New creates a new instance of the Server. +// +// Panics if at least one value of the parameters is invalid. +// +// Panics if at least one of next optinal parameters is invalid: +// * shutdown timeout is non-positive. +// +// The created Server does not require additional +// initialization and is completely ready for work. +func New(prm Prm, opts ...Option) *Server { + switch { + case prm.Address == "": + panicOnPrmValue("Address", prm.Address) + case prm.Handler == nil: + panicOnPrmValue("Handler", prm.Handler) + } + + c := defaultCfg() + + for _, o := range opts { + o(c) + } + + switch { + case c.shutdownTimeout <= 0: + panicOnOptValue("shutdown timeout", c.shutdownTimeout) + } + + return &Server{ + shutdownTimeout: c.shutdownTimeout, + srv: &http.Server{ + Addr: prm.Address, + Handler: prm.Handler, + }, + } +} diff --git a/pkg/util/profiler/http.go b/pkg/util/profiler/http.go deleted file mode 100644 index 30264d66..00000000 --- a/pkg/util/profiler/http.go +++ /dev/null @@ -1,114 +0,0 @@ -package profiler - -import ( - "context" - "net/http" - "sync/atomic" - "time" - - "github.com/spf13/viper" - "go.uber.org/zap" -) - -type ( - httpParams struct { - Key string - Viper *viper.Viper - Logger *zap.Logger - Handler http.Handler - } - - httpServer struct { - name string - started *int32 - logger *zap.Logger - shutdownTTL time.Duration - server server - } -) - -func (h *httpServer) Start(ctx context.Context) { - if h == nil { - return - } - - if !atomic.CompareAndSwapInt32(h.started, 0, 1) { - h.logger.Info("http: already started", - zap.String("server", h.name)) - return - } - - go func() { - if err := h.server.serve(ctx); err != nil { - if err != http.ErrServerClosed { - h.logger.Error("http: could not start server", - zap.Error(err)) - } - } - }() -} - -func (h *httpServer) Stop() { - if h == nil { - return - } - - if !atomic.CompareAndSwapInt32(h.started, 1, 0) { - h.logger.Info("http: already stopped", - zap.String("server", h.name)) - return - } - - ctx, cancel := context.WithTimeout(context.Background(), h.shutdownTTL) - defer cancel() - - h.logger.Debug("http: try to stop server", - zap.String("server", h.name)) - - if err := h.server.shutdown(ctx); err != nil { - h.logger.Error("http: could not stop server", - zap.Error(err)) - } -} - -const defaultShutdownTTL = 30 * time.Second - -func newHTTPServer(p httpParams) *httpServer { - var ( - address string - shutdown time.Duration - ) - - if address = p.Viper.GetString(p.Key + ".address"); address == "" { - p.Logger.Info("Empty bind address, skip", - zap.String("server", p.Key)) - return nil - } - if p.Handler == nil { - p.Logger.Info("Empty handler, skip", - zap.String("server", p.Key)) - return nil - } - - p.Logger.Info("Create http.Server", - zap.String("server", p.Key), - zap.String("address", address)) - - if shutdown = p.Viper.GetDuration(p.Key + ".shutdown_ttl"); shutdown <= 0 { - shutdown = defaultShutdownTTL - } - - return &httpServer{ - name: p.Key, - started: new(int32), - logger: p.Logger, - shutdownTTL: shutdown, - server: newServer(params{ - Address: address, - Name: p.Key, - Config: p.Viper, - Logger: p.Logger, - Handler: p.Handler, - }), - } -} diff --git a/pkg/util/profiler/metrics.go b/pkg/util/profiler/metrics.go deleted file mode 100644 index e165d6dc..00000000 --- a/pkg/util/profiler/metrics.go +++ /dev/null @@ -1,32 +0,0 @@ -package profiler - -import ( - "context" - - "github.com/prometheus/client_golang/prometheus/promhttp" - "github.com/spf13/viper" - "go.uber.org/zap" -) - -// Metrics is an interface of metric tool. -type Metrics interface { - Start(ctx context.Context) - Stop() -} - -const metricsKey = "metrics" - -// NewMetrics is a metric tool's constructor. -func NewMetrics(l *zap.Logger, v *viper.Viper) Metrics { - if !v.GetBool(metricsKey + ".enabled") { - l.Debug("metrics server disabled") - return nil - } - - return newHTTPServer(httpParams{ - Key: metricsKey, - Viper: v, - Logger: l, - Handler: promhttp.Handler(), - }) -} diff --git a/pkg/util/profiler/pprof.go b/pkg/util/profiler/pprof.go deleted file mode 100644 index e863a353..00000000 --- a/pkg/util/profiler/pprof.go +++ /dev/null @@ -1,44 +0,0 @@ -package profiler - -import ( - "context" - "expvar" - "net/http" - "net/http/pprof" - - "github.com/spf13/viper" - "go.uber.org/zap" -) - -// Profiler is an interface of profiler. -type Profiler interface { - Start(ctx context.Context) - Stop() -} - -const profilerKey = "pprof" - -// NewProfiler is a profiler's constructor. -func NewProfiler(l *zap.Logger, v *viper.Viper) Profiler { - if !v.GetBool(profilerKey + ".enabled") { - l.Debug("pprof server disabled") - return nil - } - - mux := http.NewServeMux() - - mux.Handle("/debug/vars", expvar.Handler()) - - mux.HandleFunc("/debug/pprof/", pprof.Index) - mux.HandleFunc("/debug/pprof/cmdline", pprof.Cmdline) - mux.HandleFunc("/debug/pprof/profile", pprof.Profile) - mux.HandleFunc("/debug/pprof/symbol", pprof.Symbol) - mux.HandleFunc("/debug/pprof/trace", pprof.Trace) - - return newHTTPServer(httpParams{ - Key: profilerKey, - Viper: v, - Logger: l, - Handler: mux, - }) -} diff --git a/pkg/util/profiler/server.go b/pkg/util/profiler/server.go deleted file mode 100644 index ab5549a3..00000000 --- a/pkg/util/profiler/server.go +++ /dev/null @@ -1,62 +0,0 @@ -package profiler - -import ( - "context" - "net/http" - - "github.com/spf13/viper" - "go.uber.org/zap" -) - -type ( - // Server is an interface of server. - server interface { - serve(ctx context.Context) error - shutdown(ctx context.Context) error - } - - contextServer struct { - logger *zap.Logger - server *http.Server - } - - params struct { - Address string - Name string - Config *viper.Viper - Logger *zap.Logger - Handler http.Handler - } -) - -func newServer(p params) server { - return &contextServer{ - logger: p.Logger, - server: &http.Server{ - Addr: p.Address, - Handler: p.Handler, - ReadTimeout: p.Config.GetDuration(p.Name + ".read_timeout"), - ReadHeaderTimeout: p.Config.GetDuration(p.Name + ".read_header_timeout"), - WriteTimeout: p.Config.GetDuration(p.Name + ".write_timeout"), - IdleTimeout: p.Config.GetDuration(p.Name + ".idle_timeout"), - MaxHeaderBytes: p.Config.GetInt(p.Name + ".max_header_bytes"), - }, - } -} - -func (cs *contextServer) serve(ctx context.Context) error { - go func() { - <-ctx.Done() - - if err := cs.server.Close(); err != nil { - cs.logger.Info("something went wrong", - zap.Error(err)) - } - }() - - return cs.server.ListenAndServe() -} - -func (cs *contextServer) shutdown(ctx context.Context) error { - return cs.server.Shutdown(ctx) -}