[#150] Add dropped logs metric
All checks were successful
/ DCO (pull_request) Successful in 2m19s
/ Builds (pull_request) Successful in 1m54s
/ Vulncheck (pull_request) Successful in 2m32s
/ Lint (pull_request) Successful in 2m46s
/ Tests (pull_request) Successful in 1m44s
/ Vulncheck (push) Successful in 1m21s
/ Builds (push) Successful in 1m33s
/ Lint (push) Successful in 3m23s
/ Tests (push) Successful in 2m5s

Signed-off-by: Pavel Pogodaev <p.pogodaev@yadro.com>
This commit is contained in:
Pavel Pogodaev 2024-11-24 13:32:40 +03:00
parent a2f8cb6735
commit e81f01c2ab
6 changed files with 147 additions and 91 deletions

View file

@ -64,21 +64,24 @@ type (
metrics *gateMetrics metrics *gateMetrics
services []*metrics.Service services []*metrics.Service
settings *appSettings settings *appSettings
loggerSettings *loggerSettings
servers []Server servers []Server
unbindServers []ServerInfo unbindServers []ServerInfo
mu sync.RWMutex mu sync.RWMutex
} }
loggerSettings struct {
mu sync.RWMutex
appMetrics *metrics.GateMetrics
}
// App is an interface for the main gateway function. // App is an interface for the main gateway function.
App interface { App interface {
Wait() Wait()
Serve() Serve()
} }
// Option is an application option.
Option func(a *app)
gateMetrics struct { gateMetrics struct {
logger *zap.Logger logger *zap.Logger
provider *metrics.GateMetrics provider *metrics.GateMetrics
@ -119,38 +122,18 @@ type (
} }
) )
// WithLogger returns Option to set a specific logger. func newApp(ctx context.Context, v *viper.Viper) App {
func WithLogger(l *zap.Logger, lvl zap.AtomicLevel) Option { logSettings := &loggerSettings{}
return func(a *app) { log := pickLogger(v, logSettings)
if l == nil {
return
}
a.log = l
a.logLevel = lvl
}
}
// WithConfig returns Option to use specific Viper configuration.
func WithConfig(c *viper.Viper) Option {
return func(a *app) {
if c == nil {
return
}
a.cfg = c
}
}
func newApp(ctx context.Context, opt ...Option) App {
a := &app{ a := &app{
ctx: ctx, ctx: ctx,
log: zap.L(), log: log.logger,
cfg: viper.GetViper(), cfg: v,
loggerSettings: logSettings,
webServer: new(fasthttp.Server), webServer: new(fasthttp.Server),
webDone: make(chan struct{}), webDone: make(chan struct{}),
} }
for i := range opt {
opt[i](a)
}
a.initAppSettings() a.initAppSettings()
@ -227,6 +210,22 @@ func (s *appSettings) update(v *viper.Viper, l *zap.Logger) {
s.corsMaxAge = corsMaxAge s.corsMaxAge = corsMaxAge
} }
func (s *loggerSettings) DroppedLogsInc() {
s.mu.RLock()
defer s.mu.RUnlock()
if s.appMetrics != nil {
s.appMetrics.DroppedLogsInc()
}
}
func (s *loggerSettings) setMetrics(appMetrics *metrics.GateMetrics) {
s.mu.Lock()
defer s.mu.Unlock()
s.appMetrics = appMetrics
}
func (s *appSettings) DefaultTimestamp() bool { func (s *appSettings) DefaultTimestamp() bool {
s.mu.RLock() s.mu.RLock()
defer s.mu.RUnlock() defer s.mu.RUnlock()
@ -338,6 +337,7 @@ func (a *app) initMetrics() {
gateMetricsProvider := metrics.NewGateMetrics(a.pool) gateMetricsProvider := metrics.NewGateMetrics(a.pool)
a.metrics = newGateMetrics(a.log, gateMetricsProvider, a.cfg.GetBool(cfgPrometheusEnabled)) a.metrics = newGateMetrics(a.log, gateMetricsProvider, a.cfg.GetBool(cfgPrometheusEnabled))
a.metrics.SetHealth(metrics.HealthStatusStarting) a.metrics.SetHealth(metrics.HealthStatusStarting)
a.loggerSettings.setMetrics(a.metrics.provider)
} }
func newGateMetrics(logger *zap.Logger, provider *metrics.GateMetrics, enabled bool) *gateMetrics { func newGateMetrics(logger *zap.Logger, provider *metrics.GateMetrics, enabled bool) *gateMetrics {

View file

@ -34,7 +34,6 @@ import (
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
"github.com/testcontainers/testcontainers-go" "github.com/testcontainers/testcontainers-go"
"github.com/testcontainers/testcontainers-go/wait" "github.com/testcontainers/testcontainers-go/wait"
"go.uber.org/zap/zapcore"
) )
type putResponse struct { type putResponse struct {
@ -102,8 +101,7 @@ func runServer(pathToWallet string) (App, context.CancelFunc) {
v.Set(cfgWalletPath, pathToWallet) v.Set(cfgWalletPath, pathToWallet)
v.Set(cfgWalletPassphrase, "") v.Set(cfgWalletPassphrase, "")
l, lvl := newStdoutLogger(v, zapcore.DebugLevel) application := newApp(cancelCtx, v)
application := newApp(cancelCtx, WithConfig(v), WithLogger(l, lvl))
go application.Serve() go application.Serve()
return application, cancel return application, cancel

View file

@ -9,9 +9,8 @@ import (
func main() { func main() {
globalContext, _ := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM) globalContext, _ := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM)
v := settings() v := settings()
logger, atomicLevel := pickLogger(v)
application := newApp(globalContext, WithLogger(logger, atomicLevel), WithConfig(v)) application := newApp(globalContext, v)
go application.Serve() go application.Serve()
application.Wait() application.Wait()
} }

View file

@ -182,6 +182,11 @@ var ignore = map[string]struct{}{
cmdVersion: {}, cmdVersion: {},
} }
type Logger struct {
logger *zap.Logger
lvl zap.AtomicLevel
}
func settings() *viper.Viper { func settings() *viper.Viper {
v := viper.New() v := viper.New()
v.AutomaticEnv() v.AutomaticEnv()
@ -418,7 +423,11 @@ func mergeConfig(v *viper.Viper, fileName string) error {
return v.MergeConfig(cfgFile) return v.MergeConfig(cfgFile)
} }
func pickLogger(v *viper.Viper) (*zap.Logger, zap.AtomicLevel) { type LoggerAppSettings interface {
DroppedLogsInc()
}
func pickLogger(v *viper.Viper, settings LoggerAppSettings) *Logger {
lvl, err := getLogLevel(v) lvl, err := getLogLevel(v)
if err != nil { if err != nil {
panic(err) panic(err)
@ -428,9 +437,9 @@ func pickLogger(v *viper.Viper) (*zap.Logger, zap.AtomicLevel) {
switch dest { switch dest {
case destinationStdout: case destinationStdout:
return newStdoutLogger(v, lvl) return newStdoutLogger(v, lvl, settings)
case destinationJournald: case destinationJournald:
return newJournaldLogger(v, lvl) return newJournaldLogger(v, lvl, settings)
default: default:
panic(fmt.Sprintf("wrong destination for logger: %s", dest)) panic(fmt.Sprintf("wrong destination for logger: %s", dest))
} }
@ -447,18 +456,20 @@ func pickLogger(v *viper.Viper) (*zap.Logger, zap.AtomicLevel) {
// Logger records a stack trace for all messages at or above fatal level. // Logger records a stack trace for all messages at or above fatal level.
// //
// See also zapcore.Level, zap.NewProductionConfig, zap.AddStacktrace. // See also zapcore.Level, zap.NewProductionConfig, zap.AddStacktrace.
func newStdoutLogger(v *viper.Viper, lvl zapcore.Level) (*zap.Logger, zap.AtomicLevel) { func newStdoutLogger(v *viper.Viper, lvl zapcore.Level, settings LoggerAppSettings) *Logger {
stdout := zapcore.AddSync(os.Stderr) stdout := zapcore.AddSync(os.Stderr)
level := zap.NewAtomicLevelAt(lvl) level := zap.NewAtomicLevelAt(lvl)
consoleOutCore := zapcore.NewCore(newLogEncoder(), stdout, level) consoleOutCore := zapcore.NewCore(newLogEncoder(), stdout, level)
consoleOutCore = samplingEnabling(v, consoleOutCore) consoleOutCore = applyZapCoreMiddlewares(consoleOutCore, v, settings)
l := zap.New(consoleOutCore, zap.AddStacktrace(zap.NewAtomicLevelAt(zap.FatalLevel))) return &Logger{
return l, level logger: zap.New(consoleOutCore, zap.AddStacktrace(zap.NewAtomicLevelAt(zap.FatalLevel))),
lvl: level,
}
} }
func newJournaldLogger(v *viper.Viper, lvl zapcore.Level) (*zap.Logger, zap.AtomicLevel) { func newJournaldLogger(v *viper.Viper, lvl zapcore.Level, settings LoggerAppSettings) *Logger {
level := zap.NewAtomicLevelAt(lvl) level := zap.NewAtomicLevelAt(lvl)
encoder := zapjournald.NewPartialEncoder(newLogEncoder(), zapjournald.SyslogFields) encoder := zapjournald.NewPartialEncoder(newLogEncoder(), zapjournald.SyslogFields)
@ -470,11 +481,12 @@ func newJournaldLogger(v *viper.Viper, lvl zapcore.Level) (*zap.Logger, zap.Atom
zapjournald.SyslogPid(), zapjournald.SyslogPid(),
}) })
coreWithContext = samplingEnabling(v, coreWithContext) coreWithContext = applyZapCoreMiddlewares(coreWithContext, v, settings)
l := zap.New(coreWithContext, zap.AddStacktrace(zap.NewAtomicLevelAt(zap.FatalLevel))) return &Logger{
logger: zap.New(coreWithContext, zap.AddStacktrace(zap.NewAtomicLevelAt(zap.FatalLevel))),
return l, level lvl: level,
}
} }
func newLogEncoder() zapcore.Encoder { func newLogEncoder() zapcore.Encoder {
@ -484,19 +496,17 @@ func newLogEncoder() zapcore.Encoder {
return zapcore.NewConsoleEncoder(c) return zapcore.NewConsoleEncoder(c)
} }
func samplingEnabling(v *viper.Viper, core zapcore.Core) zapcore.Core { func applyZapCoreMiddlewares(core zapcore.Core, v *viper.Viper, settings LoggerAppSettings) zapcore.Core {
// Zap samples by logging the first cgfLoggerSamplingInitial entries with a given level
// and message within the specified time interval.
// In the above config, only the first cgfLoggerSamplingInitial log entries with the same level and message
// are recorded in cfgLoggerSamplingInterval interval. Every other log entry will be dropped within the interval since
// cfgLoggerSamplingThereafter is specified here.
if v.GetBool(cfgLoggerSamplingEnabled) { if v.GetBool(cfgLoggerSamplingEnabled) {
core = zapcore.NewSamplerWithOptions( core = zapcore.NewSamplerWithOptions(core,
core,
v.GetDuration(cfgLoggerSamplingInterval), v.GetDuration(cfgLoggerSamplingInterval),
v.GetInt(cfgLoggerSamplingInitial), v.GetInt(cfgLoggerSamplingInitial),
v.GetInt(cfgLoggerSamplingThereafter), v.GetInt(cfgLoggerSamplingThereafter),
) zapcore.SamplerHook(func(_ zapcore.Entry, dec zapcore.SamplingDecision) {
if dec&zapcore.LogDropped > 0 {
settings.DroppedLogsInc()
}
}))
} }
return core return core

View file

@ -76,6 +76,15 @@ var appMetricsDesc = map[string]map[string]Description{
VariableLabels: []string{"endpoint"}, VariableLabels: []string{"endpoint"},
}, },
}, },
statisticSubsystem: {
droppedLogs: Description{
Type: dto.MetricType_COUNTER,
Namespace: namespace,
Subsystem: statisticSubsystem,
Name: droppedLogs,
Help: "Dropped logs (by sampling) count",
},
},
} }
type Description struct { type Description struct {
@ -148,3 +157,12 @@ func mustNewGaugeVec(description Description) *prometheus.GaugeVec {
description.VariableLabels, description.VariableLabels,
) )
} }
func mustNewCounter(description Description) prometheus.Counter {
if description.Type != dto.MetricType_COUNTER {
panic("invalid metric type")
}
return prometheus.NewCounter(
prometheus.CounterOpts(newOpts(description)),
)
}

View file

@ -14,11 +14,13 @@ const (
stateSubsystem = "state" stateSubsystem = "state"
poolSubsystem = "pool" poolSubsystem = "pool"
serverSubsystem = "server" serverSubsystem = "server"
statisticSubsystem = "statistic"
) )
const ( const (
healthMetric = "health" healthMetric = "health"
versionInfoMetric = "version_info" versionInfoMetric = "version_info"
droppedLogs = "dropped_logs"
) )
const ( const (
@ -35,8 +37,6 @@ const (
methodGetContainer = "get_container" methodGetContainer = "get_container"
methodListContainer = "list_container" methodListContainer = "list_container"
methodDeleteContainer = "delete_container" methodDeleteContainer = "delete_container"
methodGetContainerEacl = "get_container_eacl"
methodSetContainerEacl = "set_container_eacl"
methodEndpointInfo = "endpoint_info" methodEndpointInfo = "endpoint_info"
methodNetworkInfo = "network_info" methodNetworkInfo = "network_info"
methodPutObject = "put_object" methodPutObject = "put_object"
@ -69,6 +69,7 @@ type GateMetrics struct {
stateMetrics stateMetrics
poolMetricsCollector poolMetricsCollector
serverMetrics serverMetrics
statisticMetrics
} }
type stateMetrics struct { type stateMetrics struct {
@ -76,6 +77,10 @@ type stateMetrics struct {
versionInfo *prometheus.GaugeVec versionInfo *prometheus.GaugeVec
} }
type statisticMetrics struct {
droppedLogs prometheus.Counter
}
type poolMetricsCollector struct { type poolMetricsCollector struct {
scraper StatisticScraper scraper StatisticScraper
overallErrors prometheus.Gauge overallErrors prometheus.Gauge
@ -96,10 +101,14 @@ func NewGateMetrics(p StatisticScraper) *GateMetrics {
serverMetric := newServerMetrics() serverMetric := newServerMetrics()
serverMetric.register() serverMetric.register()
statsMetric := newStatisticMetrics()
statsMetric.register()
return &GateMetrics{ return &GateMetrics{
stateMetrics: *stateMetric, stateMetrics: *stateMetric,
poolMetricsCollector: *poolMetric, poolMetricsCollector: *poolMetric,
serverMetrics: *serverMetric, serverMetrics: *serverMetric,
statisticMetrics: *statsMetric,
} }
} }
@ -107,6 +116,7 @@ func (g *GateMetrics) Unregister() {
g.stateMetrics.unregister() g.stateMetrics.unregister()
prometheus.Unregister(&g.poolMetricsCollector) prometheus.Unregister(&g.poolMetricsCollector)
g.serverMetrics.unregister() g.serverMetrics.unregister()
g.statisticMetrics.unregister()
} }
func newStateMetrics() *stateMetrics { func newStateMetrics() *stateMetrics {
@ -116,6 +126,20 @@ func newStateMetrics() *stateMetrics {
} }
} }
func newStatisticMetrics() *statisticMetrics {
return &statisticMetrics{
droppedLogs: mustNewCounter(appMetricsDesc[statisticSubsystem][droppedLogs]),
}
}
func (s *statisticMetrics) register() {
prometheus.MustRegister(s.droppedLogs)
}
func (s *statisticMetrics) unregister() {
prometheus.Unregister(s.droppedLogs)
}
func (m stateMetrics) register() { func (m stateMetrics) register() {
prometheus.MustRegister(m.healthCheck) prometheus.MustRegister(m.healthCheck)
prometheus.MustRegister(m.versionInfo) prometheus.MustRegister(m.versionInfo)
@ -134,6 +158,13 @@ func (m stateMetrics) SetVersion(ver string) {
m.versionInfo.WithLabelValues(ver).Set(1) m.versionInfo.WithLabelValues(ver).Set(1)
} }
func (s *statisticMetrics) DroppedLogsInc() {
if s == nil {
return
}
s.droppedLogs.Inc()
}
func newPoolMetricsCollector(p StatisticScraper) *poolMetricsCollector { func newPoolMetricsCollector(p StatisticScraper) *poolMetricsCollector {
return &poolMetricsCollector{ return &poolMetricsCollector{
scraper: p, scraper: p,