diff --git a/cmd/s3-gw/app.go b/cmd/s3-gw/app.go index f1e5ec6..47cd22f 100644 --- a/cmd/s3-gw/app.go +++ b/cmd/s3-gw/app.go @@ -9,6 +9,7 @@ import ( "os" "os/signal" "strconv" + "sync" "syscall" "time" @@ -39,13 +40,14 @@ type ( obj layer.Client api api.Handler - metrics GateMetricsCollector + metrics *appMetrics maxClients api.MaxClients webDone chan struct{} wrkDone chan struct{} + services []*Service settings *appSettings } @@ -63,8 +65,16 @@ type ( CertFile string } + appMetrics struct { + logger *zap.Logger + provider GateMetricsCollector + mu sync.RWMutex + enabled bool + } + GateMetricsCollector interface { SetHealth(int32) + Unregister() } ) @@ -80,8 +90,6 @@ func newApp(ctx context.Context, log *Logger, v *viper.Viper) *App { obj layer.Client nc *notifications.Controller - gateMetrics GateMetricsCollector - prmPool pool.InitParameters reBalance = defaultRebalanceInterval @@ -211,11 +219,7 @@ func newApp(ctx context.Context, log *Logger, v *viper.Viper) *App { l.Fatal("could not initialize API handler", zap.Error(err)) } - if v.GetBool(cfgPrometheusEnabled) { - gateMetrics = newGateMetrics(neofs.NewPoolStatistic(conns)) - } - - return &App{ + app := &App{ ctr: ctr, log: l, cfg: v, @@ -223,13 +227,61 @@ func newApp(ctx context.Context, log *Logger, v *viper.Viper) *App { tls: tls, api: caller, - metrics: gateMetrics, - webDone: make(chan struct{}, 1), wrkDone: make(chan struct{}, 1), maxClients: api.NewMaxClientsMiddleware(maxClientsCount, maxClientsDeadline), } + + app.initMetrics(neofs.NewPoolStatistic(conns)) + + return app +} + +func (a *App) initMetrics(scraper StatisticScraper) { + gateMetricsProvider := newGateMetrics(scraper) + a.metrics = newAppMetrics(a.log, gateMetricsProvider, a.cfg.GetBool(cfgPrometheusEnabled)) +} + +func newAppMetrics(logger *zap.Logger, provider GateMetricsCollector, enabled bool) *appMetrics { + if !enabled { + logger.Warn("metrics are disabled") + } + return &appMetrics{ + logger: logger, + provider: provider, + } +} + +func (m *appMetrics) SetEnabled(enabled bool) { + if !enabled { + m.logger.Warn("metrics are disabled") + } + + m.mu.Lock() + m.enabled = enabled + m.mu.Unlock() +} + +func (m *appMetrics) SetHealth(status int32) { + m.mu.RLock() + if !m.enabled { + m.mu.RUnlock() + return + } + m.mu.RUnlock() + + m.provider.SetHealth(status) +} + +func (m *appMetrics) Shutdown() { + m.mu.Lock() + if m.enabled { + m.provider.SetHealth(0) + m.enabled = false + } + m.provider.Unregister() + m.mu.Unlock() } func remove(list []string, element string) []string { @@ -252,15 +304,17 @@ func (a *App) Wait() { zap.String("version", version.Version), ) - if a.metrics != nil { - a.metrics.SetHealth(1) - } + a.setHealthStatus() <-a.webDone // wait for web-server to be stopped a.log.Info("application finished") } +func (a *App) setHealthStatus() { + a.metrics.SetHealth(1) +} + // Serve runs HTTP server to handle S3 API requests. func (a *App) Serve(ctx context.Context) { var ( @@ -276,9 +330,6 @@ func (a *App) Serve(ctx context.Context) { zap.Error(err)) } - pprof := NewPprofService(a.cfg, a.log) - prometheus := NewPrometheusService(a.cfg, a.log) - router := mux.NewRouter().SkipClean(true).UseEncodedPath() // Attach S3 API: domains := a.cfg.GetStringSlice(cfgListenDomains) @@ -290,8 +341,7 @@ func (a *App) Serve(ctx context.Context) { srv.Handler = router srv.ErrorLog = zap.NewStdLog(a.log) - go pprof.Start() - go prometheus.Start() + a.startServices() go func() { a.log.Info("starting server", @@ -328,17 +378,21 @@ LOOP: } } - ctx, cancel := context.WithTimeout(context.Background(), defaultShutdownTimeout) + ctx, cancel := shutdownContext() defer cancel() - a.log.Info("stopping server", - zap.Error(srv.Shutdown(ctx))) - pprof.ShutDown(ctx) - prometheus.ShutDown(ctx) + a.log.Info("stopping server", zap.Error(srv.Shutdown(ctx))) + + a.metrics.Shutdown() + a.stopServices() close(a.webDone) } +func shutdownContext() (context.Context, context.CancelFunc) { + return context.WithTimeout(context.Background(), defaultShutdownTimeout) +} + func (a *App) configReload() { a.log.Info("SIGHUP config reload started") @@ -351,8 +405,14 @@ func (a *App) configReload() { return } + a.stopServices() + a.startServices() + a.updateSettings() + a.metrics.SetEnabled(a.cfg.GetBool(cfgPrometheusEnabled)) + a.setHealthStatus() + a.log.Info("SIGHUP config reload completed") } @@ -364,6 +424,25 @@ func (a *App) updateSettings() { } } +func (a *App) startServices() { + pprofService := NewPprofService(a.cfg, a.log) + a.services = append(a.services, pprofService) + go pprofService.Start() + + prometheusService := NewPrometheusService(a.cfg, a.log) + a.services = append(a.services, prometheusService) + go prometheusService.Start() +} + +func (a *App) stopServices() { + ctx, cancel := shutdownContext() + defer cancel() + + for _, svc := range a.services { + svc.ShutDown(ctx) + } +} + func getNotificationsOptions(v *viper.Viper, l *zap.Logger) *notifications.Options { cfg := notifications.Options{} cfg.URL = v.GetString(cfgNATSEndpoint) diff --git a/cmd/s3-gw/app_metrics.go b/cmd/s3-gw/app_metrics.go index aa58199..9523990 100644 --- a/cmd/s3-gw/app_metrics.go +++ b/cmd/s3-gw/app_metrics.go @@ -67,6 +67,11 @@ func newGateMetrics(scraper StatisticScraper) *GateMetrics { } } +func (g *GateMetrics) Unregister() { + g.stateMetrics.unregister() + prometheus.Unregister(&g.poolMetricsCollector) +} + func newStateMetrics() *stateMetrics { return &stateMetrics{ healthCheck: prometheus.NewGauge(prometheus.GaugeOpts{ @@ -82,6 +87,10 @@ func (m stateMetrics) register() { prometheus.MustRegister(m.healthCheck) } +func (m stateMetrics) unregister() { + prometheus.Unregister(m.healthCheck) +} + func (m stateMetrics) SetHealth(s int32) { m.healthCheck.Set(float64(s)) } @@ -164,7 +173,7 @@ func (m *poolMetricsCollector) Collect(ch chan<- prometheus.Metric) { m.requestDuration.Collect(ch) } -func (m poolMetricsCollector) Describe(descs chan<- *prometheus.Desc) { +func (m *poolMetricsCollector) Describe(descs chan<- *prometheus.Desc) { m.overallErrors.Describe(descs) m.overallNodeErrors.Describe(descs) m.overallNodeRequests.Describe(descs)