[#502] Add Dropped logs (by sampling) metric
All checks were successful
/ DCO (pull_request) Successful in 2m9s
/ Vulncheck (pull_request) Successful in 2m22s
/ Builds (pull_request) Successful in 2m0s
/ Lint (pull_request) Successful in 3m6s
/ Tests (pull_request) Successful in 2m2s
/ Vulncheck (push) Successful in 1m13s
/ Builds (push) Successful in 1m58s
/ Lint (push) Successful in 3m33s
/ Tests (push) Successful in 2m19s

Signed-off-by: Pavel Pogodaev <p.pogodaev@yadro.com>
This commit is contained in:
Pavel Pogodaev 2024-11-20 15:05:04 +03:00 committed by pogpp
parent 3cd88d6204
commit 51322cccdf
5 changed files with 75 additions and 21 deletions

View file

@ -83,11 +83,17 @@ type (
bucketResolver *resolver.BucketResolver bucketResolver *resolver.BucketResolver
services []*Service services []*Service
settings *appSettings settings *appSettings
loggerSettings *loggerSettings
webDone chan struct{} webDone chan struct{}
wrkDone chan struct{} wrkDone chan struct{}
} }
// loggerSettings lets the logger's sampling hook report dropped entries to the
// application metrics, which are attached only after the logger has been created.
loggerSettings struct {
// mu guards appMetrics.
mu sync.RWMutex
// appMetrics stays nil until setMetrics is called; DroppedLogsInc does nothing while it is nil.
appMetrics *metrics.AppMetrics
}
appSettings struct { appSettings struct {
logLevel zap.AtomicLevel logLevel zap.AtomicLevel
httpLogging s3middleware.LogHTTPConfig httpLogging s3middleware.LogHTTPConfig
@ -132,7 +138,25 @@ type (
} }
) )
func newApp(ctx context.Context, log *Logger, v *viper.Viper) *App { func (s *loggerSettings) DroppedLogsInc() {
s.mu.RLock()
defer s.mu.RUnlock()
if s.appMetrics != nil {
s.appMetrics.Statistic().DroppedLogsInc()
}
}
// setMetrics stores the application metrics that DroppedLogsInc reports to.
// The write is done under the exclusive lock so it does not race with
// concurrent readers holding the read lock.
func (s *loggerSettings) setMetrics(appMetrics *metrics.AppMetrics) {
	s.mu.Lock()
	s.appMetrics = appMetrics
	s.mu.Unlock()
}
func newApp(ctx context.Context, v *viper.Viper) *App {
logSettings := &loggerSettings{}
log := pickLogger(v, logSettings)
settings := newAppSettings(log, v) settings := newAppSettings(log, v)
objPool, treePool, key := getPools(ctx, log.logger, v, settings.dialerSource) objPool, treePool, key := getPools(ctx, log.logger, v, settings.dialerSource)
@ -148,6 +172,7 @@ func newApp(ctx context.Context, log *Logger, v *viper.Viper) *App {
wrkDone: make(chan struct{}, 1), wrkDone: make(chan struct{}, 1),
settings: settings, settings: settings,
loggerSettings: logSettings,
} }
app.init(ctx) app.init(ctx)
@ -521,6 +546,7 @@ func (a *App) initMetrics() {
a.metrics = metrics.NewAppMetrics(cfg) a.metrics = metrics.NewAppMetrics(cfg)
a.metrics.State().SetHealth(metrics.HealthStatusStarting) a.metrics.State().SetHealth(metrics.HealthStatusStarting)
a.loggerSettings.setMetrics(a.metrics)
} }
func (a *App) initFrostfsID(ctx context.Context) { func (a *App) initFrostfsID(ctx context.Context) {

View file

@ -1094,7 +1094,11 @@ func mergeConfig(v *viper.Viper, fileName string) error {
return v.MergeConfig(cfgFile) return v.MergeConfig(cfgFile)
} }
func pickLogger(v *viper.Viper) *Logger { type LoggerAppSettings interface {
DroppedLogsInc()
}
func pickLogger(v *viper.Viper, settings LoggerAppSettings) *Logger {
lvl, err := getLogLevel(v) lvl, err := getLogLevel(v)
if err != nil { if err != nil {
panic(err) panic(err)
@ -1104,9 +1108,9 @@ func pickLogger(v *viper.Viper) *Logger {
switch dest { switch dest {
case destinationStdout: case destinationStdout:
return newStdoutLogger(v, lvl) return newStdoutLogger(v, lvl, settings)
case destinationJournald: case destinationJournald:
return newJournaldLogger(v, lvl) return newJournaldLogger(v, lvl, settings)
default: default:
panic(fmt.Sprintf("wrong destination for logger: %s", dest)) panic(fmt.Sprintf("wrong destination for logger: %s", dest))
} }
@ -1126,13 +1130,13 @@ func pickLogger(v *viper.Viper) *Logger {
// Logger records a stack trace for all messages at or above fatal level. // Logger records a stack trace for all messages at or above fatal level.
// //
// See also zapcore.Level, zap.NewProductionConfig, zap.AddStacktrace. // See also zapcore.Level, zap.NewProductionConfig, zap.AddStacktrace.
func newStdoutLogger(v *viper.Viper, lvl zapcore.Level) *Logger { func newStdoutLogger(v *viper.Viper, lvl zapcore.Level, settings LoggerAppSettings) *Logger {
stdout := zapcore.AddSync(os.Stderr) stdout := zapcore.AddSync(os.Stderr)
level := zap.NewAtomicLevelAt(lvl) level := zap.NewAtomicLevelAt(lvl)
consoleOutCore := zapcore.NewCore(newLogEncoder(), stdout, level) consoleOutCore := zapcore.NewCore(newLogEncoder(), stdout, level)
consoleOutCore = samplingEnabling(v, consoleOutCore) consoleOutCore = applyZapCoreMiddlewares(consoleOutCore, v, settings)
return &Logger{ return &Logger{
logger: zap.New(consoleOutCore, zap.AddStacktrace(zap.NewAtomicLevelAt(zap.FatalLevel))), logger: zap.New(consoleOutCore, zap.AddStacktrace(zap.NewAtomicLevelAt(zap.FatalLevel))),
@ -1140,7 +1144,7 @@ func newStdoutLogger(v *viper.Viper, lvl zapcore.Level) *Logger {
} }
} }
func newJournaldLogger(v *viper.Viper, lvl zapcore.Level) *Logger { func newJournaldLogger(v *viper.Viper, lvl zapcore.Level, settings LoggerAppSettings) *Logger {
level := zap.NewAtomicLevelAt(lvl) level := zap.NewAtomicLevelAt(lvl)
encoder := zapjournald.NewPartialEncoder(newLogEncoder(), zapjournald.SyslogFields) encoder := zapjournald.NewPartialEncoder(newLogEncoder(), zapjournald.SyslogFields)
@ -1152,7 +1156,7 @@ func newJournaldLogger(v *viper.Viper, lvl zapcore.Level) *Logger {
zapjournald.SyslogPid(), zapjournald.SyslogPid(),
}) })
coreWithContext = samplingEnabling(v, coreWithContext) coreWithContext = applyZapCoreMiddlewares(coreWithContext, v, settings)
l := zap.New(coreWithContext, zap.AddStacktrace(zap.NewAtomicLevelAt(zap.FatalLevel))) l := zap.New(coreWithContext, zap.AddStacktrace(zap.NewAtomicLevelAt(zap.FatalLevel)))
@ -1169,19 +1173,17 @@ func newLogEncoder() zapcore.Encoder {
return zapcore.NewConsoleEncoder(c) return zapcore.NewConsoleEncoder(c)
} }
func samplingEnabling(v *viper.Viper, core zapcore.Core) zapcore.Core { func applyZapCoreMiddlewares(core zapcore.Core, v *viper.Viper, settings LoggerAppSettings) zapcore.Core {
// Zap samples by logging the first cfgLoggerSamplingInitial entries with a given level
// and message within the specified time interval.
// With this configuration, only the first cfgLoggerSamplingInitial log entries with the same level and message
// are recorded in each cfgLoggerSamplingInterval interval. After that, only every cfgLoggerSamplingThereafter-th
// such entry is logged within the interval; the rest are dropped.
if v.GetBool(cfgLoggerSamplingEnabled) { if v.GetBool(cfgLoggerSamplingEnabled) {
core = zapcore.NewSamplerWithOptions( core = zapcore.NewSamplerWithOptions(core,
core,
v.GetDuration(cfgLoggerSamplingInterval), v.GetDuration(cfgLoggerSamplingInterval),
v.GetInt(cfgLoggerSamplingInitial), v.GetInt(cfgLoggerSamplingInitial),
v.GetInt(cfgLoggerSamplingThereafter), v.GetInt(cfgLoggerSamplingThereafter),
) zapcore.SamplerHook(func(_ zapcore.Entry, dec zapcore.SamplingDecision) {
if dec&zapcore.LogDropped > 0 {
settings.DroppedLogsInc()
}
}))
} }
return core return core

View file

@ -8,11 +8,9 @@ import (
func main() { func main() {
g, _ := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM) g, _ := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM)
v := newSettings() v := newSettings()
l := pickLogger(v)
a := newApp(g, l, v) a := newApp(g, v)
go a.Serve(g) go a.Serve(g)

View file

@ -93,6 +93,13 @@ var appMetricsDesc = map[string]map[string]Description{
}, },
}, },
statisticSubsystem: { statisticSubsystem: {
droppedLogs: Description{
Type: dto.MetricType_COUNTER,
Namespace: namespace,
Subsystem: statisticSubsystem,
Name: droppedLogs,
Help: "Dropped logs (by sampling) count",
},
requestsSecondsMetric: Description{ requestsSecondsMetric: Description{
Type: dto.MetricType_HISTOGRAM, Type: dto.MetricType_HISTOGRAM,
Namespace: namespace, Namespace: namespace,
@ -252,3 +259,12 @@ func mustNewHistogramVec(description Description, buckets []float64) *prometheus
description.VariableLabels, description.VariableLabels,
) )
} }
// mustNewCounter builds a Prometheus counter from the given description.
// It panics if the description is not declared as a counter metric.
func mustNewCounter(description Description) prometheus.Counter {
	if description.Type != dto.MetricType_COUNTER {
		panic("invalid metric type")
	}

	counterOpts := prometheus.CounterOpts(newOpts(description))
	return prometheus.NewCounter(counterOpts)
}

View file

@ -34,6 +34,7 @@ type (
APIStatMetrics struct { APIStatMetrics struct {
stats *httpStats stats *httpStats
httpRequestsDuration *prometheus.HistogramVec httpRequestsDuration *prometheus.HistogramVec
droppedLogs prometheus.Counter
} }
) )
@ -47,6 +48,7 @@ const (
requestsTotalMetric = "requests_total" requestsTotalMetric = "requests_total"
errorsTotalMetric = "errors_total" errorsTotalMetric = "errors_total"
bytesTotalMetric = "bytes_total" bytesTotalMetric = "bytes_total"
droppedLogs = "dropped_logs"
) )
const ( const (
@ -61,6 +63,7 @@ func newAPIStatMetrics() *APIStatMetrics {
stats: newHTTPStats(), stats: newHTTPStats(),
httpRequestsDuration: mustNewHistogramVec(histogramDesc, httpRequestsDuration: mustNewHistogramVec(histogramDesc,
[]float64{.05, .1, .25, .5, 1, 2.5, 5, 10}), []float64{.05, .1, .25, .5, 1, 2.5, 5, 10}),
droppedLogs: mustNewCounter(appMetricsDesc[statisticSubsystem][droppedLogs]),
} }
} }
@ -119,6 +122,7 @@ func (a *APIStatMetrics) Describe(ch chan<- *prometheus.Desc) {
return return
} }
a.stats.Describe(ch) a.stats.Describe(ch)
a.droppedLogs.Describe(ch)
a.httpRequestsDuration.Describe(ch) a.httpRequestsDuration.Describe(ch)
} }
@ -127,9 +131,17 @@ func (a *APIStatMetrics) Collect(ch chan<- prometheus.Metric) {
return return
} }
a.stats.Collect(ch) a.stats.Collect(ch)
a.droppedLogs.Collect(ch)
a.httpRequestsDuration.Collect(ch) a.httpRequestsDuration.Collect(ch)
} }
// DroppedLogsInc increments the dropped-logs counter.
// Calling it on a nil receiver is a no-op.
func (a *APIStatMetrics) DroppedLogsInc() {
	if a != nil {
		a.droppedLogs.Inc()
	}
}
func newHTTPStats() *httpStats { func newHTTPStats() *httpStats {
return &httpStats{ return &httpStats{
currentS3RequestsDesc: newDesc(appMetricsDesc[statisticSubsystem][requestsCurrentMetric]), currentS3RequestsDesc: newDesc(appMetricsDesc[statisticSubsystem][requestsCurrentMetric]),