From 329cff1bb1b5d90578a97dfdd0cf900137d9bf78 Mon Sep 17 00:00:00 2001
From: Pavel Pogodaev
Date: Sun, 24 Nov 2024 13:32:40 +0300
Subject: [PATCH] [#150] Add dropped logs metric
Signed-off-by: Pavel Pogodaev
---
cmd/http-gw/app.go | 112 ++++++++++++++++++++--------------------
cmd/http-gw/main.go | 3 +-
cmd/http-gw/server.go | 7 +++
cmd/http-gw/settings.go | 47 +++++++++--------
metrics/desc.go | 18 +++++++
metrics/metrics.go | 69 ++++++++++++++++++-------
6 files changed, 158 insertions(+), 98 deletions(-)
diff --git a/cmd/http-gw/app.go b/cmd/http-gw/app.go
index 0dd53a6..623772f 100644
--- a/cmd/http-gw/app.go
+++ b/cmd/http-gw/app.go
@@ -50,36 +50,39 @@ import (
type (
app struct {
- ctx context.Context
- log *zap.Logger
- logLevel zap.AtomicLevel
- pool *pool.Pool
- treePool *treepool.Pool
- key *keys.PrivateKey
- owner *user.ID
- cfg *viper.Viper
- webServer *fasthttp.Server
- webDone chan struct{}
- resolver *resolver.ContainerResolver
- metrics *gateMetrics
- services []*metrics.Service
- settings *appSettings
+ ctx context.Context
+ log *zap.Logger
+ logLevel zap.AtomicLevel
+ pool *pool.Pool
+ treePool *treepool.Pool
+ key *keys.PrivateKey
+ owner *user.ID
+ cfg *viper.Viper
+ webServer *fasthttp.Server
+ webDone chan struct{}
+ resolver *resolver.ContainerResolver
+ metrics *AppMetrics
+ services []*metrics.Service
+ settings *appSettings
+ loggerSettings *loggerSettings
servers []Server
unbindServers []ServerInfo
mu sync.RWMutex
}
+ loggerSettings struct {
+ mu sync.RWMutex
+ appMetrics *AppMetrics
+ }
+
// App is an interface for the main gateway function.
App interface {
Wait()
Serve()
}
- // Option is an application option.
- Option func(a *app)
-
- gateMetrics struct {
+ AppMetrics struct {
logger *zap.Logger
provider *metrics.GateMetrics
mu sync.RWMutex
@@ -119,37 +122,19 @@ type (
}
)
-// WithLogger returns Option to set a specific logger.
-func WithLogger(l *zap.Logger, lvl zap.AtomicLevel) Option {
- return func(a *app) {
- if l == nil {
- return
- }
- a.log = l
- a.logLevel = lvl
- }
-}
+func newApp(ctx context.Context, v *viper.Viper) App {
+ logSettings := &loggerSettings{}
+ log := pickLogger(v, logSettings)
-// WithConfig returns Option to use specific Viper configuration.
-func WithConfig(c *viper.Viper) Option {
- return func(a *app) {
- if c == nil {
- return
- }
- a.cfg = c
- }
-}
-
-func newApp(ctx context.Context, opt ...Option) App {
a := &app{
- ctx: ctx,
- log: zap.L(),
- cfg: viper.GetViper(),
- webServer: new(fasthttp.Server),
- webDone: make(chan struct{}),
- }
- for i := range opt {
- opt[i](a)
+ ctx: ctx,
+ log: log.logger,
+ // A zero-value zap.AtomicLevel is unusable, so keep the level the logger was built with.
+ logLevel: log.lvl,
+ cfg: v,
+ loggerSettings: logSettings,
+ webServer: new(fasthttp.Server),
+ webDone: make(chan struct{}),
}
a.initAppSettings()
@@ -227,6 +210,22 @@ func (s *appSettings) update(v *viper.Viper, l *zap.Logger) {
s.corsMaxAge = corsMaxAge
}
+func (s *loggerSettings) DroppedLogsInc() {
+ s.mu.RLock()
+ defer s.mu.RUnlock()
+
+ if s.appMetrics != nil {
+ s.appMetrics.provider.DroppedLogsInc()
+ }
+}
+
+func (s *loggerSettings) setMetrics(appMetrics *AppMetrics) {
+ s.mu.Lock()
+ defer s.mu.Unlock()
+
+ s.appMetrics = appMetrics
+}
+
func (s *appSettings) DefaultTimestamp() bool {
s.mu.RLock()
defer s.mu.RUnlock()
@@ -338,27 +337,28 @@ func (a *app) initMetrics() {
gateMetricsProvider := metrics.NewGateMetrics(a.pool)
a.metrics = newGateMetrics(a.log, gateMetricsProvider, a.cfg.GetBool(cfgPrometheusEnabled))
a.metrics.SetHealth(metrics.HealthStatusStarting)
+ a.loggerSettings.setMetrics(a.metrics)
}
-func newGateMetrics(logger *zap.Logger, provider *metrics.GateMetrics, enabled bool) *gateMetrics {
+func newGateMetrics(logger *zap.Logger, provider *metrics.GateMetrics, enabled bool) *AppMetrics {
if !enabled {
logger.Warn(logs.MetricsAreDisabled)
}
- return &gateMetrics{
+ return &AppMetrics{
logger: logger,
provider: provider,
enabled: enabled,
}
}
-func (m *gateMetrics) isEnabled() bool {
+func (m *AppMetrics) isEnabled() bool {
m.mu.RLock()
defer m.mu.RUnlock()
return m.enabled
}
-func (m *gateMetrics) SetEnabled(enabled bool) {
+func (m *AppMetrics) SetEnabled(enabled bool) {
if !enabled {
m.logger.Warn(logs.MetricsAreDisabled)
}
@@ -368,7 +368,7 @@ func (m *gateMetrics) SetEnabled(enabled bool) {
m.mu.Unlock()
}
-func (m *gateMetrics) SetHealth(status metrics.HealthStatus) {
+func (m *AppMetrics) SetHealth(status metrics.HealthStatus) {
if !m.isEnabled() {
return
}
@@ -376,7 +376,7 @@ func (m *gateMetrics) SetHealth(status metrics.HealthStatus) {
m.provider.SetHealth(status)
}
-func (m *gateMetrics) SetVersion(ver string) {
+func (m *AppMetrics) SetVersion(ver string) {
if !m.isEnabled() {
return
}
@@ -384,7 +384,7 @@ func (m *gateMetrics) SetVersion(ver string) {
m.provider.SetVersion(ver)
}
-func (m *gateMetrics) Shutdown() {
+func (m *AppMetrics) Shutdown() {
m.mu.Lock()
if m.enabled {
m.provider.SetHealth(metrics.HealthStatusShuttingDown)
@@ -394,7 +394,7 @@ func (m *gateMetrics) Shutdown() {
m.mu.Unlock()
}
-func (m *gateMetrics) MarkHealthy(endpoint string) {
+func (m *AppMetrics) MarkHealthy(endpoint string) {
if !m.isEnabled() {
return
}
@@ -402,7 +402,7 @@ func (m *gateMetrics) MarkHealthy(endpoint string) {
m.provider.MarkHealthy(endpoint)
}
-func (m *gateMetrics) MarkUnhealthy(endpoint string) {
+func (m *AppMetrics) MarkUnhealthy(endpoint string) {
if !m.isEnabled() {
return
}
diff --git a/cmd/http-gw/main.go b/cmd/http-gw/main.go
index ea9fbd7..fdd148c 100644
--- a/cmd/http-gw/main.go
+++ b/cmd/http-gw/main.go
@@ -9,9 +9,8 @@ import (
func main() {
globalContext, _ := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM)
v := settings()
- logger, atomicLevel := pickLogger(v)
- application := newApp(globalContext, WithLogger(logger, atomicLevel), WithConfig(v))
+ application := newApp(globalContext, v)
go application.Serve()
application.Wait()
}
diff --git a/cmd/http-gw/server.go b/cmd/http-gw/server.go
index 694e9ee..e4370da 100644
--- a/cmd/http-gw/server.go
+++ b/cmd/http-gw/server.go
@@ -7,6 +7,8 @@ import (
"fmt"
"net"
"sync"
+
+ "go.uber.org/zap"
)
type (
@@ -41,6 +43,11 @@ type (
keyPath string
cert *tls.Certificate
}
+
+ Logger struct {
+ logger *zap.Logger
+ lvl zap.AtomicLevel
+ }
)
func (s *server) Address() string {
diff --git a/cmd/http-gw/settings.go b/cmd/http-gw/settings.go
index 316c500..9f1dd02 100644
--- a/cmd/http-gw/settings.go
+++ b/cmd/http-gw/settings.go
@@ -418,7 +418,11 @@ func mergeConfig(v *viper.Viper, fileName string) error {
return v.MergeConfig(cfgFile)
}
-func pickLogger(v *viper.Viper) (*zap.Logger, zap.AtomicLevel) {
+type LoggerAppSettings interface {
+ DroppedLogsInc()
+}
+
+func pickLogger(v *viper.Viper, settings LoggerAppSettings) *Logger {
lvl, err := getLogLevel(v)
if err != nil {
panic(err)
@@ -428,9 +432,9 @@ func pickLogger(v *viper.Viper) (*zap.Logger, zap.AtomicLevel) {
switch dest {
case destinationStdout:
- return newStdoutLogger(v, lvl)
+ return newStdoutLogger(v, lvl, settings)
case destinationJournald:
- return newJournaldLogger(v, lvl)
+ return newJournaldLogger(v, lvl, settings)
default:
panic(fmt.Sprintf("wrong destination for logger: %s", dest))
}
@@ -447,18 +451,20 @@ func pickLogger(v *viper.Viper) (*zap.Logger, zap.AtomicLevel) {
// Logger records a stack trace for all messages at or above fatal level.
//
// See also zapcore.Level, zap.NewProductionConfig, zap.AddStacktrace.
-func newStdoutLogger(v *viper.Viper, lvl zapcore.Level) (*zap.Logger, zap.AtomicLevel) {
+func newStdoutLogger(v *viper.Viper, lvl zapcore.Level, settings LoggerAppSettings) *Logger {
stdout := zapcore.AddSync(os.Stderr)
level := zap.NewAtomicLevelAt(lvl)
consoleOutCore := zapcore.NewCore(newLogEncoder(), stdout, level)
- consoleOutCore = samplingEnabling(v, consoleOutCore)
+ consoleOutCore = applyZapCoreMiddlewares(consoleOutCore, v, settings)
- l := zap.New(consoleOutCore, zap.AddStacktrace(zap.NewAtomicLevelAt(zap.FatalLevel)))
- return l, level
+ return &Logger{
+ logger: zap.New(consoleOutCore, zap.AddStacktrace(zap.NewAtomicLevelAt(zap.FatalLevel))),
+ lvl: level,
+ }
}
-func newJournaldLogger(v *viper.Viper, lvl zapcore.Level) (*zap.Logger, zap.AtomicLevel) {
+func newJournaldLogger(v *viper.Viper, lvl zapcore.Level, settings LoggerAppSettings) *Logger {
level := zap.NewAtomicLevelAt(lvl)
encoder := zapjournald.NewPartialEncoder(newLogEncoder(), zapjournald.SyslogFields)
@@ -470,11 +476,12 @@ func newJournaldLogger(v *viper.Viper, lvl zapcore.Level) (*zap.Logger, zap.Atom
zapjournald.SyslogPid(),
})
- coreWithContext = samplingEnabling(v, coreWithContext)
+ coreWithContext = applyZapCoreMiddlewares(coreWithContext, v, settings)
- l := zap.New(coreWithContext, zap.AddStacktrace(zap.NewAtomicLevelAt(zap.FatalLevel)))
-
- return l, level
+ return &Logger{
+ logger: zap.New(coreWithContext, zap.AddStacktrace(zap.NewAtomicLevelAt(zap.FatalLevel))),
+ lvl: level,
+ }
}
func newLogEncoder() zapcore.Encoder {
@@ -484,19 +491,23 @@ func newLogEncoder() zapcore.Encoder {
return zapcore.NewConsoleEncoder(c)
}
-func samplingEnabling(v *viper.Viper, core zapcore.Core) zapcore.Core {
- // Zap samples by logging the first cgfLoggerSamplingInitial entries with a given level
- // and message within the specified time interval.
- // In the above config, only the first cgfLoggerSamplingInitial log entries with the same level and message
- // are recorded in cfgLoggerSamplingInterval interval. Every other log entry will be dropped within the interval since
- // cfgLoggerSamplingThereafter is specified here.
+// applyZapCoreMiddlewares wraps core with the optional zap sampler and returns the result.
+//
+// When cfgLoggerSamplingEnabled is set, the first cfgLoggerSamplingInitial entries with the
+// same level and message are logged within every cfgLoggerSamplingInterval; after that only
+// every cfgLoggerSamplingThereafter-th such entry is logged and the rest are dropped.
+// Each entry dropped by the sampler is reported through settings.DroppedLogsInc.
+func applyZapCoreMiddlewares(core zapcore.Core, v *viper.Viper, settings LoggerAppSettings) zapcore.Core {
if v.GetBool(cfgLoggerSamplingEnabled) {
- core = zapcore.NewSamplerWithOptions(
- core,
+ core = zapcore.NewSamplerWithOptions(core,
v.GetDuration(cfgLoggerSamplingInterval),
v.GetInt(cfgLoggerSamplingInitial),
v.GetInt(cfgLoggerSamplingThereafter),
- )
+ zapcore.SamplerHook(func(_ zapcore.Entry, dec zapcore.SamplingDecision) {
+ if dec&zapcore.LogDropped > 0 {
+ settings.DroppedLogsInc()
+ }
+ }))
}
return core
diff --git a/metrics/desc.go b/metrics/desc.go
index e10050c..a00ab3e 100644
--- a/metrics/desc.go
+++ b/metrics/desc.go
@@ -76,6 +76,15 @@ var appMetricsDesc = map[string]map[string]Description{
VariableLabels: []string{"endpoint"},
},
},
+ statisticSubsystem: {
+ droppedLogs: Description{
+ Type: dto.MetricType_COUNTER,
+ Namespace: namespace,
+ Subsystem: statisticSubsystem,
+ Name: droppedLogs,
+ Help: "Dropped logs (by sampling) count",
+ },
+ },
}
type Description struct {
@@ -148,3 +157,12 @@ func mustNewGaugeVec(description Description) *prometheus.GaugeVec {
description.VariableLabels,
)
}
+
+func mustNewCounter(description Description) prometheus.Counter {
+ if description.Type != dto.MetricType_COUNTER {
+ panic("invalid metric type")
+ }
+ return prometheus.NewCounter(
+ prometheus.CounterOpts(newOpts(description)),
+ )
+}
diff --git a/metrics/metrics.go b/metrics/metrics.go
index b516477..fd0db66 100644
--- a/metrics/metrics.go
+++ b/metrics/metrics.go
@@ -10,15 +10,17 @@ import (
)
const (
- namespace = "frostfs_http_gw"
- stateSubsystem = "state"
- poolSubsystem = "pool"
- serverSubsystem = "server"
+ namespace = "frostfs_http_gw"
+ stateSubsystem = "state"
+ poolSubsystem = "pool"
+ serverSubsystem = "server"
+ statisticSubsystem = "statistic"
)
const (
healthMetric = "health"
versionInfoMetric = "version_info"
+ droppedLogs = "dropped_logs"
)
const (
@@ -30,21 +32,19 @@ const (
)
const (
- methodGetBalance = "get_balance"
- methodPutContainer = "put_container"
- methodGetContainer = "get_container"
- methodListContainer = "list_container"
- methodDeleteContainer = "delete_container"
- methodGetContainerEacl = "get_container_eacl"
- methodSetContainerEacl = "set_container_eacl"
- methodEndpointInfo = "endpoint_info"
- methodNetworkInfo = "network_info"
- methodPutObject = "put_object"
- methodDeleteObject = "delete_object"
- methodGetObject = "get_object"
- methodHeadObject = "head_object"
- methodRangeObject = "range_object"
- methodCreateSession = "create_session"
+ methodGetBalance = "get_balance"
+ methodPutContainer = "put_container"
+ methodGetContainer = "get_container"
+ methodListContainer = "list_container"
+ methodDeleteContainer = "delete_container"
+ methodEndpointInfo = "endpoint_info"
+ methodNetworkInfo = "network_info"
+ methodPutObject = "put_object"
+ methodDeleteObject = "delete_object"
+ methodGetObject = "get_object"
+ methodHeadObject = "head_object"
+ methodRangeObject = "range_object"
+ methodCreateSession = "create_session"
)
// HealthStatus of the gate application.
@@ -69,6 +69,7 @@ type GateMetrics struct {
stateMetrics
poolMetricsCollector
serverMetrics
+ StatisticMetrics
}
type stateMetrics struct {
@@ -76,6 +77,10 @@ type stateMetrics struct {
versionInfo *prometheus.GaugeVec
}
+type StatisticMetrics struct {
+ droppedLogs prometheus.Counter
+}
+
type poolMetricsCollector struct {
scraper StatisticScraper
overallErrors prometheus.Gauge
@@ -96,10 +101,14 @@ func NewGateMetrics(p StatisticScraper) *GateMetrics {
serverMetric := newServerMetrics()
serverMetric.register()
+ statsMetric := newStatisticMetrics()
+ statsMetric.register()
+
return &GateMetrics{
stateMetrics: *stateMetric,
poolMetricsCollector: *poolMetric,
serverMetrics: *serverMetric,
+ StatisticMetrics: *statsMetric,
}
}
@@ -107,6 +116,7 @@ func (g *GateMetrics) Unregister() {
g.stateMetrics.unregister()
prometheus.Unregister(&g.poolMetricsCollector)
g.serverMetrics.unregister()
+ g.StatisticMetrics.unregister()
}
func newStateMetrics() *stateMetrics {
@@ -116,6 +126,20 @@ func newStateMetrics() *stateMetrics {
}
}
+func newStatisticMetrics() *StatisticMetrics {
+ return &StatisticMetrics{
+ droppedLogs: mustNewCounter(appMetricsDesc[statisticSubsystem][droppedLogs]),
+ }
+}
+
+func (s *StatisticMetrics) register() {
+ prometheus.MustRegister(s.droppedLogs)
+}
+
+func (s *StatisticMetrics) unregister() {
+ prometheus.Unregister(s.droppedLogs)
+}
+
func (m stateMetrics) register() {
prometheus.MustRegister(m.healthCheck)
prometheus.MustRegister(m.versionInfo)
@@ -134,6 +158,13 @@ func (m stateMetrics) SetVersion(ver string) {
m.versionInfo.WithLabelValues(ver).Set(1)
}
+func (s *StatisticMetrics) DroppedLogsInc() {
+ if s == nil {
+ return
+ }
+ s.droppedLogs.Inc()
+}
+
func newPoolMetricsCollector(p StatisticScraper) *poolMetricsCollector {
return &poolMetricsCollector{
scraper: p,