[#502] Add Dropped logs (by sampling) metric #557
5 changed files with 75 additions and 21 deletions
|
@ -83,11 +83,17 @@ type (
|
|||
bucketResolver *resolver.BucketResolver
|
||||
services []*Service
|
||||
settings *appSettings
|
||||
loggerSettings *loggerSettings
|
||||
|
||||
webDone chan struct{}
|
||||
wrkDone chan struct{}
|
||||
}
|
||||
|
||||
loggerSettings struct {
|
||||
mu sync.RWMutex
|
||||
appMetrics *metrics.AppMetrics
|
||||
}
|
||||
|
||||
appSettings struct {
|
||||
logLevel zap.AtomicLevel
|
||||
httpLogging s3middleware.LogHTTPConfig
|
||||
|
@ -132,7 +138,25 @@ type (
|
|||
}
|
||||
)
|
||||
|
||||
func newApp(ctx context.Context, log *Logger, v *viper.Viper) *App {
|
||||
func (s *loggerSettings) DroppedLogsInc() {
|
||||
s.mu.RLock()
|
||||
defer s.mu.RUnlock()
|
||||
|
||||
if s.appMetrics != nil {
|
||||
s.appMetrics.Statistic().DroppedLogsInc()
|
||||
}
|
||||
}
|
||||
|
||||
func (s *loggerSettings) setMetrics(appMetrics *metrics.AppMetrics) {
|
||||
s.mu.Lock()
|
||||
defer s.mu.Unlock()
|
||||
|
||||
s.appMetrics = appMetrics
|
||||
}
|
||||
|
||||
func newApp(ctx context.Context, v *viper.Viper) *App {
|
||||
logSettings := &loggerSettings{}
|
||||
log := pickLogger(v, logSettings)
|
||||
settings := newAppSettings(log, v)
|
||||
|
||||
objPool, treePool, key := getPools(ctx, log.logger, v, settings.dialerSource)
|
||||
|
@ -147,7 +171,8 @@ func newApp(ctx context.Context, log *Logger, v *viper.Viper) *App {
|
|||
webDone: make(chan struct{}, 1),
|
||||
wrkDone: make(chan struct{}, 1),
|
||||
|
||||
settings: settings,
|
||||
settings: settings,
|
||||
loggerSettings: logSettings,
|
||||
}
|
||||
|
||||
app.init(ctx)
|
||||
|
@ -521,6 +546,7 @@ func (a *App) initMetrics() {
|
|||
|
||||
a.metrics = metrics.NewAppMetrics(cfg)
|
||||
a.metrics.State().SetHealth(metrics.HealthStatusStarting)
|
||||
a.loggerSettings.setMetrics(a.metrics)
|
||||
}
|
||||
|
||||
func (a *App) initFrostfsID(ctx context.Context) {
|
||||
|
|
|
@ -1094,7 +1094,11 @@ func mergeConfig(v *viper.Viper, fileName string) error {
|
|||
return v.MergeConfig(cfgFile)
|
||||
}
|
||||
|
||||
func pickLogger(v *viper.Viper) *Logger {
|
||||
type LoggerAppSettings interface {
|
||||
DroppedLogsInc()
|
||||
}
|
||||
|
||||
func pickLogger(v *viper.Viper, settings LoggerAppSettings) *Logger {
|
||||
lvl, err := getLogLevel(v)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
|
@ -1104,9 +1108,9 @@ func pickLogger(v *viper.Viper) *Logger {
|
|||
|
||||
switch dest {
|
||||
case destinationStdout:
|
||||
return newStdoutLogger(v, lvl)
|
||||
return newStdoutLogger(v, lvl, settings)
|
||||
case destinationJournald:
|
||||
return newJournaldLogger(v, lvl)
|
||||
return newJournaldLogger(v, lvl, settings)
|
||||
default:
|
||||
panic(fmt.Sprintf("wrong destination for logger: %s", dest))
|
||||
}
|
||||
|
@ -1126,13 +1130,13 @@ func pickLogger(v *viper.Viper) *Logger {
|
|||
// Logger records a stack trace for all messages at or above fatal level.
|
||||
//
|
||||
// See also zapcore.Level, zap.NewProductionConfig, zap.AddStacktrace.
|
||||
func newStdoutLogger(v *viper.Viper, lvl zapcore.Level) *Logger {
|
||||
func newStdoutLogger(v *viper.Viper, lvl zapcore.Level, settings LoggerAppSettings) *Logger {
|
||||
stdout := zapcore.AddSync(os.Stderr)
|
||||
level := zap.NewAtomicLevelAt(lvl)
|
||||
|
||||
consoleOutCore := zapcore.NewCore(newLogEncoder(), stdout, level)
|
||||
|
||||
consoleOutCore = samplingEnabling(v, consoleOutCore)
|
||||
consoleOutCore = applyZapCoreMiddlewares(consoleOutCore, v, settings)
|
||||
|
||||
return &Logger{
|
||||
logger: zap.New(consoleOutCore, zap.AddStacktrace(zap.NewAtomicLevelAt(zap.FatalLevel))),
|
||||
|
@ -1140,7 +1144,7 @@ func newStdoutLogger(v *viper.Viper, lvl zapcore.Level) *Logger {
|
|||
}
|
||||
}
|
||||
|
||||
func newJournaldLogger(v *viper.Viper, lvl zapcore.Level) *Logger {
|
||||
func newJournaldLogger(v *viper.Viper, lvl zapcore.Level, settings LoggerAppSettings) *Logger {
|
||||
level := zap.NewAtomicLevelAt(lvl)
|
||||
|
||||
encoder := zapjournald.NewPartialEncoder(newLogEncoder(), zapjournald.SyslogFields)
|
||||
|
@ -1152,7 +1156,7 @@ func newJournaldLogger(v *viper.Viper, lvl zapcore.Level) *Logger {
|
|||
zapjournald.SyslogPid(),
|
||||
})
|
||||
|
||||
coreWithContext = samplingEnabling(v, coreWithContext)
|
||||
coreWithContext = applyZapCoreMiddlewares(coreWithContext, v, settings)
|
||||
|
||||
l := zap.New(coreWithContext, zap.AddStacktrace(zap.NewAtomicLevelAt(zap.FatalLevel)))
|
||||
|
||||
|
@ -1169,19 +1173,17 @@ func newLogEncoder() zapcore.Encoder {
|
|||
return zapcore.NewConsoleEncoder(c)
|
||||
}
|
||||
|
||||
func samplingEnabling(v *viper.Viper, core zapcore.Core) zapcore.Core {
|
||||
// Zap samples by logging the first cgfLoggerSamplingInitial entries with a given level
|
||||
// and message within the specified time interval.
|
||||
// In the above config, only the first cgfLoggerSamplingInitial log entries with the same level and message
|
||||
// are recorded in cfgLoggerSamplingInterval interval. Every other log entry will be dropped within the interval since
|
||||
// cfgLoggerSamplingThereafter is specified here.
|
||||
func applyZapCoreMiddlewares(core zapcore.Core, v *viper.Viper, settings LoggerAppSettings) zapcore.Core {
|
||||
if v.GetBool(cfgLoggerSamplingEnabled) {
|
||||
core = zapcore.NewSamplerWithOptions(
|
||||
core,
|
||||
core = zapcore.NewSamplerWithOptions(core,
|
||||
v.GetDuration(cfgLoggerSamplingInterval),
|
||||
v.GetInt(cfgLoggerSamplingInitial),
|
||||
v.GetInt(cfgLoggerSamplingThereafter),
|
||||
)
|
||||
zapcore.SamplerHook(func(_ zapcore.Entry, dec zapcore.SamplingDecision) {
|
||||
if dec&zapcore.LogDropped > 0 {
|
||||
settings.DroppedLogsInc()
|
||||
}
|
||||
}))
|
||||
}
|
||||
|
||||
return core
|
||||
|
|
|
@ -8,11 +8,9 @@ import (
|
|||
|
||||
func main() {
|
||||
g, _ := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM)
|
||||
|
||||
v := newSettings()
|
||||
l := pickLogger(v)
|
||||
|
||||
a := newApp(g, l, v)
|
||||
a := newApp(g, v)
|
||||
|
||||
go a.Serve(g)
|
||||
|
||||
|
|
|
@ -93,6 +93,13 @@ var appMetricsDesc = map[string]map[string]Description{
|
|||
},
|
||||
},
|
||||
statisticSubsystem: {
|
||||
droppedLogs: Description{
|
||||
Type: dto.MetricType_COUNTER,
|
||||
Namespace: namespace,
|
||||
Subsystem: statisticSubsystem,
|
||||
Name: droppedLogs,
|
||||
Help: "Dropped logs (by sampling) count",
|
||||
},
|
||||
requestsSecondsMetric: Description{
|
||||
Type: dto.MetricType_HISTOGRAM,
|
||||
Namespace: namespace,
|
||||
|
@ -252,3 +259,12 @@ func mustNewHistogramVec(description Description, buckets []float64) *prometheus
|
|||
description.VariableLabels,
|
||||
)
|
||||
}
|
||||
|
||||
func mustNewCounter(description Description) prometheus.Counter {
|
||||
if description.Type != dto.MetricType_COUNTER {
|
||||
panic("invalid metric type")
|
||||
}
|
||||
return prometheus.NewCounter(
|
||||
prometheus.CounterOpts(newOpts(description)),
|
||||
)
|
||||
}
|
||||
|
|
|
@ -34,6 +34,7 @@ type (
|
|||
APIStatMetrics struct {
|
||||
stats *httpStats
|
||||
httpRequestsDuration *prometheus.HistogramVec
|
||||
droppedLogs prometheus.Counter
|
||||
}
|
||||
)
|
||||
|
||||
|
@ -47,6 +48,7 @@ const (
|
|||
requestsTotalMetric = "requests_total"
|
||||
errorsTotalMetric = "errors_total"
|
||||
bytesTotalMetric = "bytes_total"
|
||||
droppedLogs = "dropped_logs"
|
||||
)
|
||||
|
||||
const (
|
||||
|
@ -61,6 +63,7 @@ func newAPIStatMetrics() *APIStatMetrics {
|
|||
stats: newHTTPStats(),
|
||||
httpRequestsDuration: mustNewHistogramVec(histogramDesc,
|
||||
[]float64{.05, .1, .25, .5, 1, 2.5, 5, 10}),
|
||||
droppedLogs: mustNewCounter(appMetricsDesc[statisticSubsystem][droppedLogs]),
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -119,6 +122,7 @@ func (a *APIStatMetrics) Describe(ch chan<- *prometheus.Desc) {
|
|||
return
|
||||
}
|
||||
a.stats.Describe(ch)
|
||||
a.droppedLogs.Describe(ch)
|
||||
a.httpRequestsDuration.Describe(ch)
|
||||
}
|
||||
|
||||
|
@ -127,9 +131,17 @@ func (a *APIStatMetrics) Collect(ch chan<- prometheus.Metric) {
|
|||
return
|
||||
}
|
||||
a.stats.Collect(ch)
|
||||
a.droppedLogs.Collect(ch)
|
||||
a.httpRequestsDuration.Collect(ch)
|
||||
}
|
||||
|
||||
func (a *APIStatMetrics) DroppedLogsInc() {
|
||||
if a == nil {
|
||||
return
|
||||
}
|
||||
a.droppedLogs.Inc()
|
||||
}
|
||||
|
||||
func newHTTPStats() *httpStats {
|
||||
return &httpStats{
|
||||
currentS3RequestsDesc: newDesc(appMetricsDesc[statisticSubsystem][requestsCurrentMetric]),
|
||||
|
|
Loading…
Reference in a new issue