[#20] Add metric to see number of dropped logs
Some checks failed
/ DCO (pull_request) Successful in 39s
/ Builds (pull_request) Successful in 1m14s
/ Vulncheck (pull_request) Successful in 1m16s
/ Lint (pull_request) Failing after 1m4s
/ Tests (pull_request) Successful in 1m14s

Add new metric frostfs_s3_lifecycler_statistic_dropped_logs
Also, a configurable sampling interval is added.

Signed-off-by: Denis Kirillov <d.kirillov@yadro.com>
This commit is contained in:
Denis Kirillov 2024-09-24 15:56:15 +03:00
parent f3359f98f4
commit 1827d365c7
9 changed files with 102 additions and 36 deletions

View file

@ -49,13 +49,16 @@ const (
HealthStatusShuttingDown int32 = 3 HealthStatusShuttingDown int32 = 3
) )
func newApp(ctx context.Context, cfg *viper.Viper, log *Logger) *App { func newApp(ctx context.Context, cfg *viper.Viper) *App {
appMetrics := metrics.NewAppMetrics()
log := pickLogger(cfg, appMetrics)
a := &App{ a := &App{
log: log.logger, log: log.logger,
logLevel: log.lvl, logLevel: log.lvl,
cfg: cfg, cfg: cfg,
done: make(chan struct{}), done: make(chan struct{}),
appMetrics: metrics.NewAppMetrics(), appMetrics: appMetrics,
settings: newAppSettings(cfg, log), settings: newAppSettings(cfg, log),
} }
a.appMetrics.SetHealth(HealthStatusStarting) a.appMetrics.SetHealth(HealthStatusStarting)

View file

@ -4,6 +4,7 @@ import (
"fmt" "fmt"
"strings" "strings"
"git.frostfs.info/TrueCloudLab/frostfs-s3-lifecycler/internal/metrics"
"git.frostfs.info/TrueCloudLab/zapjournald" "git.frostfs.info/TrueCloudLab/zapjournald"
"github.com/nspcc-dev/neo-go/cli/options" "github.com/nspcc-dev/neo-go/cli/options"
"github.com/spf13/viper" "github.com/spf13/viper"
@ -22,18 +23,18 @@ type Logger struct {
lvl zap.AtomicLevel lvl zap.AtomicLevel
} }
func pickLogger(v *viper.Viper) *Logger { func pickLogger(v *viper.Viper, appMetrics *metrics.AppMetrics) *Logger {
switch dest := v.GetString(cfgLoggerDestination); dest { switch dest := v.GetString(cfgLoggerDestination); dest {
case destinationStdout: case destinationStdout:
return newStdoutLogger(v) return newStdoutLogger(v, appMetrics)
case destinationJournald: case destinationJournald:
return newJournaldLogger(v) return newJournaldLogger(v, appMetrics)
default: default:
panic(fmt.Sprintf("wrong destination for logger: %s", dest)) panic(fmt.Sprintf("wrong destination for logger: %s", dest))
} }
} }
func newStdoutLogger(v *viper.Viper) *Logger { func newStdoutLogger(v *viper.Viper, appMetrics *metrics.AppMetrics) *Logger {
c := newZapLogConfig(v) c := newZapLogConfig(v)
out, errSink, err := openZapSinks(c) out, errSink, err := openZapSinks(c)
@ -42,27 +43,27 @@ func newStdoutLogger(v *viper.Viper) *Logger {
} }
core := zapcore.NewCore(zapcore.NewConsoleEncoder(c.EncoderConfig), out, c.Level) core := zapcore.NewCore(zapcore.NewConsoleEncoder(c.EncoderConfig), out, c.Level)
core = options.NewFilteringCore(core, filteringLogOption(v)) core = applyZapCoreMiddlewares(core, v, appMetrics)
l := zap.New(core, zap.AddStacktrace(zap.NewAtomicLevelAt(zap.FatalLevel)), zap.ErrorOutput(errSink)) l := zap.New(core, zap.AddStacktrace(zap.NewAtomicLevelAt(zap.FatalLevel)), zap.ErrorOutput(errSink))
return &Logger{logger: l, lvl: c.Level} return &Logger{logger: l, lvl: c.Level}
} }
func newJournaldLogger(v *viper.Viper) *Logger { func newJournaldLogger(v *viper.Viper, appMetrics *metrics.AppMetrics) *Logger {
c := newZapLogConfig(v) c := newZapLogConfig(v)
// We can use NewJSONEncoder instead if, say, frontend // We can use NewJSONEncoder instead if, say, frontend
// would like to access journald logs and parse them easily. // would like to access journald logs and parse them easily.
encoder := zapjournald.NewPartialEncoder(zapcore.NewConsoleEncoder(c.EncoderConfig), zapjournald.SyslogFields) encoder := zapjournald.NewPartialEncoder(zapcore.NewConsoleEncoder(c.EncoderConfig), zapjournald.SyslogFields)
core := zapjournald.NewCore(c.Level, encoder, &journald.Journal{}, zapjournald.SyslogFields) journalCore := zapjournald.NewCore(c.Level, encoder, &journald.Journal{}, zapjournald.SyslogFields)
filteringCore := options.NewFilteringCore(core, filteringLogOption(v)) core := journalCore.With([]zapcore.Field{
coreWithContext := filteringCore.With([]zapcore.Field{
zapjournald.SyslogFacility(zapjournald.LogDaemon), zapjournald.SyslogFacility(zapjournald.LogDaemon),
zapjournald.SyslogIdentifier(), zapjournald.SyslogIdentifier(),
zapjournald.SyslogPid(), zapjournald.SyslogPid(),
}) })
l := zap.New(coreWithContext, zap.AddStacktrace(zap.NewAtomicLevelAt(zap.FatalLevel))) core = applyZapCoreMiddlewares(core, v, appMetrics)
l := zap.New(core, zap.AddStacktrace(zap.NewAtomicLevelAt(zap.FatalLevel)))
return &Logger{logger: l, lvl: c.Level} return &Logger{logger: l, lvl: c.Level}
} }
@ -79,25 +80,38 @@ func openZapSinks(cfg zap.Config) (zapcore.WriteSyncer, zapcore.WriteSyncer, err
return sink, errSink, nil return sink, errSink, nil
} }
func applyZapCoreMiddlewares(core zapcore.Core, v *viper.Viper, appMetrics *metrics.AppMetrics) zapcore.Core {
core = options.NewFilteringCore(core, filteringLogOption(v))
if v.GetBool(cfgLoggerSamplingEnabled) {
core = zapcore.NewSamplerWithOptions(core,
v.GetDuration(cfgLoggerSamplingInterval),
v.GetInt(cfgLoggerSamplingInitial),
v.GetInt(cfgLoggerSamplingThereafter),
zapcore.SamplerHook(func(entry zapcore.Entry, dec zapcore.SamplingDecision) {
if dec&zapcore.LogDropped > 0 {
appMetrics.DroppedLogsInc()
}
}))
}
return core
}
func newZapLogConfig(v *viper.Viper) zap.Config { func newZapLogConfig(v *viper.Viper) zap.Config {
lvl, err := getLogLevel(v.GetString(cfgLoggerLevel)) lvl, err := getLogLevel(v.GetString(cfgLoggerLevel))
if err != nil { if err != nil {
panic(err) panic(err)
} }
c := zap.NewProductionConfig() c := zap.Config{
c.Level = zap.NewAtomicLevelAt(lvl) Level: zap.NewAtomicLevelAt(lvl),
EncoderConfig: zap.NewProductionEncoderConfig(),
OutputPaths: []string{"stderr"},
ErrorOutputPaths: []string{"stderr"},
}
c.EncoderConfig.EncodeTime = zapcore.ISO8601TimeEncoder c.EncoderConfig.EncodeTime = zapcore.ISO8601TimeEncoder
if v.GetBool(cfgLoggerSamplingEnabled) {
c.Sampling = &zap.SamplingConfig{
Initial: v.GetInt(cfgLoggerSamplingInitial),
Thereafter: v.GetInt(cfgLoggerSamplingThereafter),
}
} else {
c.Sampling = nil
}
return c return c
} }

View file

@ -10,10 +10,7 @@ func main() {
ctx, cancel := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM) ctx, cancel := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM)
defer cancel() defer cancel()
cfg := settings() app := newApp(ctx, settings())
log := pickLogger(cfg)
app := newApp(ctx, cfg, log)
go app.Serve(ctx) go app.Serve(ctx)
app.Wait() app.Wait()
} }

View file

@ -41,6 +41,7 @@ const (
cfgLoggerSamplingEnabled = "logger.sampling.enabled" cfgLoggerSamplingEnabled = "logger.sampling.enabled"
cfgLoggerSamplingInitial = "logger.sampling.initial" cfgLoggerSamplingInitial = "logger.sampling.initial"
cfgLoggerSamplingThereafter = "logger.sampling.thereafter" cfgLoggerSamplingThereafter = "logger.sampling.thereafter"
cfgLoggerSamplingInterval = "logger.sampling.interval"
cfgLoggerTags = "logger.tags" cfgLoggerTags = "logger.tags"
// Morph. // Morph.
@ -131,6 +132,7 @@ func settings() *viper.Viper {
v.SetDefault(cfgLoggerDestination, "stdout") v.SetDefault(cfgLoggerDestination, "stdout")
v.SetDefault(cfgLoggerSamplingThereafter, 100) v.SetDefault(cfgLoggerSamplingThereafter, 100)
v.SetDefault(cfgLoggerSamplingInitial, 100) v.SetDefault(cfgLoggerSamplingInitial, 100)
v.SetDefault(cfgLoggerSamplingInterval, time.Second)
// services: // services:
v.SetDefault(cfgPrometheusEnabled, false) v.SetDefault(cfgPrometheusEnabled, false)

View file

@ -12,6 +12,7 @@ S3_LIFECYCLER_LOGGER_DESTINATION=stdout
S3_LIFECYCLER_LOGGER_SAMPLING_ENABLED=false S3_LIFECYCLER_LOGGER_SAMPLING_ENABLED=false
S3_LIFECYCLER_LOGGER_SAMPLING_INITIAL=100 S3_LIFECYCLER_LOGGER_SAMPLING_INITIAL=100
S3_LIFECYCLER_LOGGER_SAMPLING_THEREAFTER=100 S3_LIFECYCLER_LOGGER_SAMPLING_THEREAFTER=100
S3_LIFECYCLER_LOGGER_SAMPLING_INTERVAL=1s
# Metrics # Metrics
S3_LIFECYCLER_PPROF_ENABLED=false S3_LIFECYCLER_PPROF_ENABLED=false

View file

@ -11,6 +11,7 @@ logger:
enabled: false enabled: false
initial: 100 initial: 100
thereafter: 100 thereafter: 100
interval: 1s
pprof: pprof:
enabled: false enabled: false

View file

@ -53,18 +53,20 @@ logger:
enabled: false enabled: false
initial: 100 initial: 100
thereafter: 100 thereafter: 100
interval: 1s
tags: tags:
- "expiration_delete_object" - "expiration_delete_object"
- "multipart_delete_object" - "multipart_delete_object"
``` ```
| Parameter | Type | SIGHUP reload | Default value | Description | | Parameter | Type | SIGHUP reload | Default value | Description |
|-----------------------|------------|---------------|---------------|--------------------------------------------------------------------------------------------------| |-----------------------|------------|---------------|---------------|----------------------------------------------------------------------------------------------------|
| `level` | `string` | yes | `info` | Logging level. Possible values: `debug`, `info`, `warn`, `dpanic`, `panic`, `fatal`. | | `level` | `string` | yes | `info` | Logging level. Possible values: `debug`, `info`, `warn`, `dpanic`, `panic`, `fatal`. |
| `destination` | `string` | no | `stdout` | Destination for logger: `stdout` or `journald` | | `destination` | `string` | no | `stdout` | Destination for logger: `stdout` or `journald` |
| `sampling.enabled` | `bool` | no | `false` | Enable sampling. | | `sampling.enabled` | `bool` | no | `false` | Enable sampling. |
| `sampling.initial` | `int` | no | `100` | Logs first N of the same (level and message) log entries each second. | | `sampling.initial` | `int` | no | `100` | Logs first N of the same (level and message) log entries each interval. |
| `sampling.thereafter` | `int` | no | `100` | Logs every Mth of the same (level and message) log entries after first N entries in that second. | | `sampling.thereafter` | `int` | no | `100` | Logs every Mth of the same (level and message) log entries after first N entries in that interval. |
| `sampling.interval` | `duration` | no | `1s` | Sampling interval. |
| `tags` | `[]string` | no | | Tagged log entries that should be additionally logged (see the next section for available tags). | | `tags` | `[]string` | no | | Tagged log entries that should be additionally logged (see the next section for available tags). |
## Tags ## Tags

View file

@ -25,6 +25,15 @@ var appMetricsDesc = map[string]map[string]Description{
VariableLabels: []string{"version"}, VariableLabels: []string{"version"},
}, },
}, },
statisticSubsystem: {
droppedLogs: Description{
Type: dto.MetricType_COUNTER,
Namespace: namespace,
Subsystem: statisticSubsystem,
Name: droppedLogs,
Help: "Dropped logs (by sampling) count",
},
},
} }
type Description struct { type Description struct {
@ -97,3 +106,12 @@ func mustNewGaugeVec(description Description) *prometheus.GaugeVec {
description.VariableLabels, description.VariableLabels,
) )
} }
func mustNewCounter(description Description) prometheus.Counter {
if description.Type != dto.MetricType_COUNTER {
panic("invalid metric type")
}
return prometheus.NewCounter(
prometheus.CounterOpts(newOpts(description)),
)
}

View file

@ -12,6 +12,7 @@ type (
// AppMetrics is a metrics container for all app specific data. // AppMetrics is a metrics container for all app specific data.
AppMetrics struct { AppMetrics struct {
stateMetrics stateMetrics
statisticMetrics
} }
// stateMetrics are metrics of application state. // stateMetrics are metrics of application state.
@ -19,6 +20,10 @@ type (
healthCheck prometheus.Gauge healthCheck prometheus.Gauge
versionInfo *prometheus.GaugeVec versionInfo *prometheus.GaugeVec
} }
statisticMetrics struct {
droppedLogs prometheus.Counter
}
) )
const ( const (
@ -27,10 +32,15 @@ const (
healthMetric = "health" healthMetric = "health"
versionInfoMetric = "version_info" versionInfoMetric = "version_info"
statisticSubsystem = "statistic"
droppedLogs = "dropped_logs"
) )
func (m stateMetrics) register() { func (m stateMetrics) register() {
prometheus.MustRegister(m.healthCheck) prometheus.MustRegister(m.healthCheck)
prometheus.MustRegister(m.versionInfo)
} }
func (m stateMetrics) SetHealth(s int32) { func (m stateMetrics) SetHealth(s int32) {
@ -41,13 +51,25 @@ func (m stateMetrics) SetVersion(ver string) {
m.versionInfo.WithLabelValues(ver).Set(1) m.versionInfo.WithLabelValues(ver).Set(1)
} }
func (m statisticMetrics) register() {
prometheus.MustRegister(m.droppedLogs)
}
func (m statisticMetrics) DroppedLogsInc() {
m.droppedLogs.Inc()
}
// NewAppMetrics creates an instance of application metrics. // NewAppMetrics creates an instance of application metrics.
func NewAppMetrics() *AppMetrics { func NewAppMetrics() *AppMetrics {
stateMetric := newStateMetrics() stateMetric := newStateMetrics()
stateMetric.register() stateMetric.register()
statisticMetric := newStatisticMetrics()
statisticMetric.register()
return &AppMetrics{ return &AppMetrics{
stateMetrics: *stateMetric, stateMetrics: *stateMetric,
statisticMetrics: *statisticMetric,
} }
} }
@ -58,6 +80,12 @@ func newStateMetrics() *stateMetrics {
} }
} }
func newStatisticMetrics() *statisticMetrics {
return &statisticMetrics{
droppedLogs: mustNewCounter(appMetricsDesc[statisticSubsystem][droppedLogs]),
}
}
// NewPrometheusService creates a new service for gathering prometheus metrics. // NewPrometheusService creates a new service for gathering prometheus metrics.
func NewPrometheusService(log *zap.Logger, cfg Config) *Service { func NewPrometheusService(log *zap.Logger, cfg Config) *Service {
if log == nil { if log == nil {