diff --git a/cmd/s3-lifecycler/app.go b/cmd/s3-lifecycler/app.go index 7a92268..0c010b1 100644 --- a/cmd/s3-lifecycler/app.go +++ b/cmd/s3-lifecycler/app.go @@ -49,13 +49,16 @@ const ( HealthStatusShuttingDown int32 = 3 ) -func newApp(ctx context.Context, cfg *viper.Viper, log *Logger) *App { +func newApp(ctx context.Context, cfg *viper.Viper) *App { + appMetrics := metrics.NewAppMetrics() + log := pickLogger(cfg, appMetrics) + a := &App{ log: log.logger, logLevel: log.lvl, cfg: cfg, done: make(chan struct{}), - appMetrics: metrics.NewAppMetrics(), + appMetrics: appMetrics, settings: newAppSettings(cfg, log), } a.appMetrics.SetHealth(HealthStatusStarting) diff --git a/cmd/s3-lifecycler/logger.go b/cmd/s3-lifecycler/logger.go index cdeb817..f87194d 100644 --- a/cmd/s3-lifecycler/logger.go +++ b/cmd/s3-lifecycler/logger.go @@ -4,6 +4,7 @@ import ( "fmt" "strings" + "git.frostfs.info/TrueCloudLab/frostfs-s3-lifecycler/internal/metrics" "git.frostfs.info/TrueCloudLab/zapjournald" "github.com/nspcc-dev/neo-go/cli/options" "github.com/spf13/viper" @@ -22,18 +23,18 @@ type Logger struct { lvl zap.AtomicLevel } -func pickLogger(v *viper.Viper) *Logger { +func pickLogger(v *viper.Viper, appMetrics *metrics.AppMetrics) *Logger { switch dest := v.GetString(cfgLoggerDestination); dest { case destinationStdout: - return newStdoutLogger(v) + return newStdoutLogger(v, appMetrics) case destinationJournald: - return newJournaldLogger(v) + return newJournaldLogger(v, appMetrics) default: panic(fmt.Sprintf("wrong destination for logger: %s", dest)) } } -func newStdoutLogger(v *viper.Viper) *Logger { +func newStdoutLogger(v *viper.Viper, appMetrics *metrics.AppMetrics) *Logger { c := newZapLogConfig(v) out, errSink, err := openZapSinks(c) @@ -42,27 +43,27 @@ func newStdoutLogger(v *viper.Viper) *Logger { } core := zapcore.NewCore(zapcore.NewConsoleEncoder(c.EncoderConfig), out, c.Level) - core = options.NewFilteringCore(core, filteringLogOption(v)) + core = 
applyZapCoreMiddlewares(core, v, appMetrics) l := zap.New(core, zap.AddStacktrace(zap.NewAtomicLevelAt(zap.FatalLevel)), zap.ErrorOutput(errSink)) return &Logger{logger: l, lvl: c.Level} } -func newJournaldLogger(v *viper.Viper) *Logger { +func newJournaldLogger(v *viper.Viper, appMetrics *metrics.AppMetrics) *Logger { c := newZapLogConfig(v) // We can use NewJSONEncoder instead if, say, frontend // would like to access journald logs and parse them easily. encoder := zapjournald.NewPartialEncoder(zapcore.NewConsoleEncoder(c.EncoderConfig), zapjournald.SyslogFields) - core := zapjournald.NewCore(c.Level, encoder, &journald.Journal{}, zapjournald.SyslogFields) - filteringCore := options.NewFilteringCore(core, filteringLogOption(v)) - coreWithContext := filteringCore.With([]zapcore.Field{ + journalCore := zapjournald.NewCore(c.Level, encoder, &journald.Journal{}, zapjournald.SyslogFields) + core := journalCore.With([]zapcore.Field{ zapjournald.SyslogFacility(zapjournald.LogDaemon), zapjournald.SyslogIdentifier(), zapjournald.SyslogPid(), }) - l := zap.New(coreWithContext, zap.AddStacktrace(zap.NewAtomicLevelAt(zap.FatalLevel))) + core = applyZapCoreMiddlewares(core, v, appMetrics) + l := zap.New(core, zap.AddStacktrace(zap.NewAtomicLevelAt(zap.FatalLevel))) return &Logger{logger: l, lvl: c.Level} } @@ -79,24 +80,37 @@ func openZapSinks(cfg zap.Config) (zapcore.WriteSyncer, zapcore.WriteSyncer, err return sink, errSink, nil } +func applyZapCoreMiddlewares(core zapcore.Core, v *viper.Viper, appMetrics *metrics.AppMetrics) zapcore.Core { + core = options.NewFilteringCore(core, filteringLogOption(v)) + + if v.GetBool(cfgLoggerSamplingEnabled) { + core = zapcore.NewSamplerWithOptions(core, + v.GetDuration(cfgLoggerSamplingInterval), + v.GetInt(cfgLoggerSamplingInitial), + v.GetInt(cfgLoggerSamplingThereafter), + zapcore.SamplerHook(func(_ zapcore.Entry, dec zapcore.SamplingDecision) { + if dec&zapcore.LogDropped > 0 { + appMetrics.DroppedLogsInc() + } + })) + } + + return 
core +} + func newZapLogConfig(v *viper.Viper) zap.Config { lvl, err := getLogLevel(v.GetString(cfgLoggerLevel)) if err != nil { panic(err) } - c := zap.NewProductionConfig() - c.Level = zap.NewAtomicLevelAt(lvl) - c.EncoderConfig.EncodeTime = zapcore.ISO8601TimeEncoder - - if v.GetBool(cfgLoggerSamplingEnabled) { - c.Sampling = &zap.SamplingConfig{ - Initial: v.GetInt(cfgLoggerSamplingInitial), - Thereafter: v.GetInt(cfgLoggerSamplingThereafter), - } - } else { - c.Sampling = nil + c := zap.Config{ + Level: zap.NewAtomicLevelAt(lvl), + EncoderConfig: zap.NewProductionEncoderConfig(), + OutputPaths: []string{"stderr"}, + ErrorOutputPaths: []string{"stderr"}, } + c.EncoderConfig.EncodeTime = zapcore.ISO8601TimeEncoder return c } diff --git a/cmd/s3-lifecycler/main.go b/cmd/s3-lifecycler/main.go index ccfb655..ed43b73 100644 --- a/cmd/s3-lifecycler/main.go +++ b/cmd/s3-lifecycler/main.go @@ -10,10 +10,7 @@ func main() { ctx, cancel := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM) defer cancel() - cfg := settings() - log := pickLogger(cfg) - - app := newApp(ctx, cfg, log) + app := newApp(ctx, settings()) go app.Serve(ctx) app.Wait() } diff --git a/cmd/s3-lifecycler/settings.go b/cmd/s3-lifecycler/settings.go index 2298b2b..6d5950f 100644 --- a/cmd/s3-lifecycler/settings.go +++ b/cmd/s3-lifecycler/settings.go @@ -41,6 +41,7 @@ const ( cfgLoggerSamplingEnabled = "logger.sampling.enabled" cfgLoggerSamplingInitial = "logger.sampling.initial" cfgLoggerSamplingThereafter = "logger.sampling.thereafter" + cfgLoggerSamplingInterval = "logger.sampling.interval" cfgLoggerTags = "logger.tags" // Morph. 
@@ -131,6 +132,7 @@ func settings() *viper.Viper { v.SetDefault(cfgLoggerDestination, "stdout") v.SetDefault(cfgLoggerSamplingThereafter, 100) v.SetDefault(cfgLoggerSamplingInitial, 100) + v.SetDefault(cfgLoggerSamplingInterval, time.Second) // services: v.SetDefault(cfgPrometheusEnabled, false) diff --git a/config/config.env b/config/config.env index 26d7da9..64c841f 100644 --- a/config/config.env +++ b/config/config.env @@ -12,6 +12,7 @@ S3_LIFECYCLER_LOGGER_DESTINATION=stdout S3_LIFECYCLER_LOGGER_SAMPLING_ENABLED=false S3_LIFECYCLER_LOGGER_SAMPLING_INITIAL=100 S3_LIFECYCLER_LOGGER_SAMPLING_THEREAFTER=100 +S3_LIFECYCLER_LOGGER_SAMPLING_INTERVAL=1s # Metrics S3_LIFECYCLER_PPROF_ENABLED=false diff --git a/config/config.yaml b/config/config.yaml index 2f8c48d..721abb9 100644 --- a/config/config.yaml +++ b/config/config.yaml @@ -11,6 +11,7 @@ logger: enabled: false initial: 100 thereafter: 100 + interval: 1s pprof: enabled: false diff --git a/docs/configuration.md b/docs/configuration.md index 5f066b1..33bc0f0 100644 --- a/docs/configuration.md +++ b/docs/configuration.md @@ -53,19 +53,21 @@ logger: enabled: false initial: 100 thereafter: 100 + interval: 1s tags: - "expiration_delete_object" - "multipart_delete_object" ``` -| Parameter | Type | SIGHUP reload | Default value | Description | -|-----------------------|------------|---------------|---------------|--------------------------------------------------------------------------------------------------| -| `level` | `string` | yes | `info` | Logging level. Possible values: `debug`, `info`, `warn`, `dpanic`, `panic`, `fatal`. | -| `destination` | `string` | no | `stdout` | Destination for logger: `stdout` or `journald` | -| `sampling.enabled` | `bool` | no | `false` | Enable sampling. | -| `sampling.initial` | `int` | no | `100` | Logs firth N of the same (level and message) log entries each second. 
| -| `sampling.thereafter` | `int` | no | `100` | Logs every Mth of the same (level and message) log entries after first N entries in that second. | -| `sampling.tags` | `[]string` | no | | Tagged log entries that should be additionally logged (available tags see in the next section). | +| Parameter | Type | SIGHUP reload | Default value | Description | +|-----------------------|------------|---------------|---------------|----------------------------------------------------------------------------------------------------| +| `level` | `string` | yes | `info` | Logging level. Possible values: `debug`, `info`, `warn`, `dpanic`, `panic`, `fatal`. | +| `destination` | `string` | no | `stdout` | Destination for logger: `stdout` or `journald` | +| `sampling.enabled` | `bool` | no | `false` | Enable sampling. | +| `sampling.initial` | `int` | no | `100` | Logs first N of the same (level and message) log entries each interval. | +| `sampling.thereafter` | `int` | no | `100` | Logs every Mth of the same (level and message) log entries after first N entries in that interval. | +| `sampling.interval` | `duration` | no | `1s` | Sampling interval. | +| `sampling.tags` | `[]string` | no | | Tagged log entries that should be additionally logged (see the next section for available tags). 
| ## Tags diff --git a/internal/metrics/desc.go b/internal/metrics/desc.go index 4f33014..4de61a4 100644 --- a/internal/metrics/desc.go +++ b/internal/metrics/desc.go @@ -25,6 +25,15 @@ var appMetricsDesc = map[string]map[string]Description{ VariableLabels: []string{"version"}, }, }, + statisticSubsystem: { + droppedLogs: Description{ + Type: dto.MetricType_COUNTER, + Namespace: namespace, + Subsystem: statisticSubsystem, + Name: droppedLogs, + Help: "Dropped logs (by sampling) count", + }, + }, } type Description struct { @@ -97,3 +106,12 @@ func mustNewGaugeVec(description Description) *prometheus.GaugeVec { description.VariableLabels, ) } + +func mustNewCounter(description Description) prometheus.Counter { + if description.Type != dto.MetricType_COUNTER { + panic("invalid metric type") + } + return prometheus.NewCounter( + prometheus.CounterOpts(newOpts(description)), + ) +} diff --git a/internal/metrics/metrics.go b/internal/metrics/metrics.go index 9474d8b..240b081 100644 --- a/internal/metrics/metrics.go +++ b/internal/metrics/metrics.go @@ -12,6 +12,7 @@ type ( // AppMetrics is a metrics container for all app specific data. AppMetrics struct { stateMetrics + statisticMetrics } // stateMetrics are metrics of application state. 
@@ -19,6 +20,10 @@ type ( healthCheck prometheus.Gauge versionInfo *prometheus.GaugeVec } + + statisticMetrics struct { + droppedLogs prometheus.Counter + } ) const ( @@ -27,10 +32,15 @@ const ( healthMetric = "health" versionInfoMetric = "version_info" + + statisticSubsystem = "statistic" + + droppedLogs = "dropped_logs" ) func (m stateMetrics) register() { prometheus.MustRegister(m.healthCheck) + prometheus.MustRegister(m.versionInfo) } func (m stateMetrics) SetHealth(s int32) { @@ -41,13 +51,25 @@ func (m stateMetrics) SetVersion(ver string) { m.versionInfo.WithLabelValues(ver).Set(1) } +func (m statisticMetrics) register() { + prometheus.MustRegister(m.droppedLogs) +} + +func (m statisticMetrics) DroppedLogsInc() { + m.droppedLogs.Inc() +} + // NewAppMetrics creates an instance of application. func NewAppMetrics() *AppMetrics { stateMetric := newStateMetrics() stateMetric.register() + statisticMetric := newStatisticMetrics() + statisticMetric.register() + return &AppMetrics{ - stateMetrics: *stateMetric, + stateMetrics: *stateMetric, + statisticMetrics: *statisticMetric, } } @@ -58,6 +80,12 @@ func newStateMetrics() *stateMetrics { } } +func newStatisticMetrics() *statisticMetrics { + return &statisticMetrics{ + droppedLogs: mustNewCounter(appMetricsDesc[statisticSubsystem][droppedLogs]), + } +} + // NewPrometheusService creates a new service for gathering prometheus metrics. func NewPrometheusService(log *zap.Logger, cfg Config) *Service { if log == nil {