From 51322cccdf54e0e7bd4fd3fb0acd5553606498f1 Mon Sep 17 00:00:00 2001
From: Pavel Pogodaev
Date: Wed, 20 Nov 2024 15:05:04 +0300
Subject: [PATCH] [#502] Add Dropped logs (by sampling) metric
Signed-off-by: Pavel Pogodaev
---
cmd/s3-gw/app.go | 30 ++++++++++++++++++++++++++++--
cmd/s3-gw/app_settings.go | 34 ++++++++++++++++++----------------
cmd/s3-gw/main.go | 4 +---
metrics/desc.go | 16 ++++++++++++++++
metrics/stats.go | 12 ++++++++++++
5 files changed, 75 insertions(+), 21 deletions(-)
diff --git a/cmd/s3-gw/app.go b/cmd/s3-gw/app.go
index 1ac2b30..8404022 100644
--- a/cmd/s3-gw/app.go
+++ b/cmd/s3-gw/app.go
@@ -83,11 +83,17 @@ type (
bucketResolver *resolver.BucketResolver
services []*Service
settings *appSettings
+ loggerSettings *loggerSettings
webDone chan struct{}
wrkDone chan struct{}
}
+ loggerSettings struct {
+ mu sync.RWMutex
+ appMetrics *metrics.AppMetrics
+ }
+
appSettings struct {
logLevel zap.AtomicLevel
httpLogging s3middleware.LogHTTPConfig
@@ -132,7 +138,25 @@ type (
}
)
-func newApp(ctx context.Context, log *Logger, v *viper.Viper) *App {
+func (s *loggerSettings) DroppedLogsInc() {
+ s.mu.RLock()
+ defer s.mu.RUnlock()
+
+ if s.appMetrics != nil {
+ s.appMetrics.Statistic().DroppedLogsInc()
+ }
+}
+
+func (s *loggerSettings) setMetrics(appMetrics *metrics.AppMetrics) {
+ s.mu.Lock()
+ defer s.mu.Unlock()
+
+ s.appMetrics = appMetrics
+}
+
+func newApp(ctx context.Context, v *viper.Viper) *App {
+ logSettings := &loggerSettings{}
+ log := pickLogger(v, logSettings)
settings := newAppSettings(log, v)
objPool, treePool, key := getPools(ctx, log.logger, v, settings.dialerSource)
@@ -147,7 +171,8 @@ func newApp(ctx context.Context, log *Logger, v *viper.Viper) *App {
webDone: make(chan struct{}, 1),
wrkDone: make(chan struct{}, 1),
- settings: settings,
+ settings: settings,
+ loggerSettings: logSettings,
}
app.init(ctx)
@@ -521,6 +546,7 @@ func (a *App) initMetrics() {
a.metrics = metrics.NewAppMetrics(cfg)
a.metrics.State().SetHealth(metrics.HealthStatusStarting)
+ a.loggerSettings.setMetrics(a.metrics)
}
func (a *App) initFrostfsID(ctx context.Context) {
diff --git a/cmd/s3-gw/app_settings.go b/cmd/s3-gw/app_settings.go
index 691e231..f33de2e 100644
--- a/cmd/s3-gw/app_settings.go
+++ b/cmd/s3-gw/app_settings.go
@@ -1094,7 +1094,11 @@ func mergeConfig(v *viper.Viper, fileName string) error {
return v.MergeConfig(cfgFile)
}
-func pickLogger(v *viper.Viper) *Logger {
+type LoggerAppSettings interface {
+ DroppedLogsInc()
+}
+
+func pickLogger(v *viper.Viper, settings LoggerAppSettings) *Logger {
lvl, err := getLogLevel(v)
if err != nil {
panic(err)
@@ -1104,9 +1108,9 @@ func pickLogger(v *viper.Viper) *Logger {
switch dest {
case destinationStdout:
- return newStdoutLogger(v, lvl)
+ return newStdoutLogger(v, lvl, settings)
case destinationJournald:
- return newJournaldLogger(v, lvl)
+ return newJournaldLogger(v, lvl, settings)
default:
panic(fmt.Sprintf("wrong destination for logger: %s", dest))
}
@@ -1126,13 +1130,13 @@ func pickLogger(v *viper.Viper) *Logger {
// Logger records a stack trace for all messages at or above fatal level.
//
// See also zapcore.Level, zap.NewProductionConfig, zap.AddStacktrace.
-func newStdoutLogger(v *viper.Viper, lvl zapcore.Level) *Logger {
+func newStdoutLogger(v *viper.Viper, lvl zapcore.Level, settings LoggerAppSettings) *Logger {
stdout := zapcore.AddSync(os.Stderr)
level := zap.NewAtomicLevelAt(lvl)
consoleOutCore := zapcore.NewCore(newLogEncoder(), stdout, level)
- consoleOutCore = samplingEnabling(v, consoleOutCore)
+ consoleOutCore = applyZapCoreMiddlewares(consoleOutCore, v, settings)
return &Logger{
logger: zap.New(consoleOutCore, zap.AddStacktrace(zap.NewAtomicLevelAt(zap.FatalLevel))),
@@ -1140,7 +1144,7 @@ func newStdoutLogger(v *viper.Viper, lvl zapcore.Level) *Logger {
}
}
-func newJournaldLogger(v *viper.Viper, lvl zapcore.Level) *Logger {
+func newJournaldLogger(v *viper.Viper, lvl zapcore.Level, settings LoggerAppSettings) *Logger {
level := zap.NewAtomicLevelAt(lvl)
encoder := zapjournald.NewPartialEncoder(newLogEncoder(), zapjournald.SyslogFields)
@@ -1152,7 +1156,7 @@ func newJournaldLogger(v *viper.Viper, lvl zapcore.Level) *Logger {
zapjournald.SyslogPid(),
})
- coreWithContext = samplingEnabling(v, coreWithContext)
+ coreWithContext = applyZapCoreMiddlewares(coreWithContext, v, settings)
l := zap.New(coreWithContext, zap.AddStacktrace(zap.NewAtomicLevelAt(zap.FatalLevel)))
@@ -1169,19 +1173,17 @@ func newLogEncoder() zapcore.Encoder {
return zapcore.NewConsoleEncoder(c)
}
-func samplingEnabling(v *viper.Viper, core zapcore.Core) zapcore.Core {
- // Zap samples by logging the first cgfLoggerSamplingInitial entries with a given level
- // and message within the specified time interval.
- // In the above config, only the first cgfLoggerSamplingInitial log entries with the same level and message
- // are recorded in cfgLoggerSamplingInterval interval. Every other log entry will be dropped within the interval since
- // cfgLoggerSamplingThereafter is specified here.
+func applyZapCoreMiddlewares(core zapcore.Core, v *viper.Viper, settings LoggerAppSettings) zapcore.Core {
if v.GetBool(cfgLoggerSamplingEnabled) {
- core = zapcore.NewSamplerWithOptions(
- core,
+ core = zapcore.NewSamplerWithOptions(core,
v.GetDuration(cfgLoggerSamplingInterval),
v.GetInt(cfgLoggerSamplingInitial),
v.GetInt(cfgLoggerSamplingThereafter),
- )
+ zapcore.SamplerHook(func(_ zapcore.Entry, dec zapcore.SamplingDecision) {
+ if dec&zapcore.LogDropped > 0 {
+ settings.DroppedLogsInc()
+ }
+ }))
}
return core
diff --git a/cmd/s3-gw/main.go b/cmd/s3-gw/main.go
index f76bced..927ceb9 100644
--- a/cmd/s3-gw/main.go
+++ b/cmd/s3-gw/main.go
@@ -8,11 +8,9 @@ import (
func main() {
g, _ := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM)
-
v := newSettings()
- l := pickLogger(v)
- a := newApp(g, l, v)
+ a := newApp(g, v)
go a.Serve(g)
diff --git a/metrics/desc.go b/metrics/desc.go
index 6affeef..e3827ec 100644
--- a/metrics/desc.go
+++ b/metrics/desc.go
@@ -93,6 +93,13 @@ var appMetricsDesc = map[string]map[string]Description{
},
},
statisticSubsystem: {
+ droppedLogs: Description{
+ Type: dto.MetricType_COUNTER,
+ Namespace: namespace,
+ Subsystem: statisticSubsystem,
+ Name: droppedLogs,
+ Help: "Dropped logs (by sampling) count",
+ },
requestsSecondsMetric: Description{
Type: dto.MetricType_HISTOGRAM,
Namespace: namespace,
@@ -252,3 +259,12 @@ func mustNewHistogramVec(description Description, buckets []float64) *prometheus
description.VariableLabels,
)
}
+
+func mustNewCounter(description Description) prometheus.Counter {
+ if description.Type != dto.MetricType_COUNTER {
+ panic("invalid metric type")
+ }
+ return prometheus.NewCounter(
+ prometheus.CounterOpts(newOpts(description)),
+ )
+}
diff --git a/metrics/stats.go b/metrics/stats.go
index 7f23c62..5f4f408 100644
--- a/metrics/stats.go
+++ b/metrics/stats.go
@@ -34,6 +34,7 @@ type (
APIStatMetrics struct {
stats *httpStats
httpRequestsDuration *prometheus.HistogramVec
+ droppedLogs prometheus.Counter
}
)
@@ -47,6 +48,7 @@ const (
requestsTotalMetric = "requests_total"
errorsTotalMetric = "errors_total"
bytesTotalMetric = "bytes_total"
+ droppedLogs = "dropped_logs"
)
const (
@@ -61,6 +63,7 @@ func newAPIStatMetrics() *APIStatMetrics {
stats: newHTTPStats(),
httpRequestsDuration: mustNewHistogramVec(histogramDesc,
[]float64{.05, .1, .25, .5, 1, 2.5, 5, 10}),
+ droppedLogs: mustNewCounter(appMetricsDesc[statisticSubsystem][droppedLogs]),
}
}
@@ -119,6 +122,7 @@ func (a *APIStatMetrics) Describe(ch chan<- *prometheus.Desc) {
return
}
a.stats.Describe(ch)
+ a.droppedLogs.Describe(ch)
a.httpRequestsDuration.Describe(ch)
}
@@ -127,9 +131,17 @@ func (a *APIStatMetrics) Collect(ch chan<- prometheus.Metric) {
return
}
a.stats.Collect(ch)
+ a.droppedLogs.Collect(ch)
a.httpRequestsDuration.Collect(ch)
}
+func (a *APIStatMetrics) DroppedLogsInc() {
+ if a == nil {
+ return
+ }
+ a.droppedLogs.Inc()
+}
+
func newHTTPStats() *httpStats {
return &httpStats{
currentS3RequestsDesc: newDesc(appMetricsDesc[statisticSubsystem][requestsCurrentMetric]),