From e81f01c2abfb0958da757065c3d795a7b11e14bb Mon Sep 17 00:00:00 2001
From: Pavel Pogodaev
Date: Sun, 24 Nov 2024 13:32:40 +0300
Subject: [PATCH] [#150] Add dropped logs metric
Signed-off-by: Pavel Pogodaev
---
cmd/http-gw/app.go | 92 ++++++++++++++++-----------------
cmd/http-gw/integration_test.go | 4 +-
cmd/http-gw/main.go | 3 +-
cmd/http-gw/settings.go | 52 +++++++++++--------
metrics/desc.go | 18 +++++++
metrics/metrics.go | 69 ++++++++++++++++++-------
6 files changed, 147 insertions(+), 91 deletions(-)
diff --git a/cmd/http-gw/app.go b/cmd/http-gw/app.go
index 0dd53a6..6ac9b1c 100644
--- a/cmd/http-gw/app.go
+++ b/cmd/http-gw/app.go
@@ -50,35 +50,38 @@ import (
type (
app struct {
- ctx context.Context
- log *zap.Logger
- logLevel zap.AtomicLevel
- pool *pool.Pool
- treePool *treepool.Pool
- key *keys.PrivateKey
- owner *user.ID
- cfg *viper.Viper
- webServer *fasthttp.Server
- webDone chan struct{}
- resolver *resolver.ContainerResolver
- metrics *gateMetrics
- services []*metrics.Service
- settings *appSettings
+ ctx context.Context
+ log *zap.Logger
+ logLevel zap.AtomicLevel
+ pool *pool.Pool
+ treePool *treepool.Pool
+ key *keys.PrivateKey
+ owner *user.ID
+ cfg *viper.Viper
+ webServer *fasthttp.Server
+ webDone chan struct{}
+ resolver *resolver.ContainerResolver
+ metrics *gateMetrics
+ services []*metrics.Service
+ settings *appSettings
+ loggerSettings *loggerSettings
servers []Server
unbindServers []ServerInfo
mu sync.RWMutex
}
+ // loggerSettings carries the metrics provider used to count log entries
+ // dropped by sampling. appMetrics is read and written under mu.
+ loggerSettings struct {
+ mu sync.RWMutex
+ appMetrics *metrics.GateMetrics
+ }
+
// App is an interface for the main gateway function.
App interface {
Wait()
Serve()
}
- // Option is an application option.
- Option func(a *app)
-
gateMetrics struct {
logger *zap.Logger
provider *metrics.GateMetrics
@@ -119,37 +122,17 @@ type (
}
)
-// WithLogger returns Option to set a specific logger.
-func WithLogger(l *zap.Logger, lvl zap.AtomicLevel) Option {
- return func(a *app) {
- if l == nil {
- return
- }
- a.log = l
- a.logLevel = lvl
- }
-}
+func newApp(ctx context.Context, v *viper.Viper) App {
+ logSettings := &loggerSettings{}
+ log := pickLogger(v, logSettings)
-// WithConfig returns Option to use specific Viper configuration.
-func WithConfig(c *viper.Viper) Option {
- return func(a *app) {
- if c == nil {
- return
- }
- a.cfg = c
- }
-}
-
-func newApp(ctx context.Context, opt ...Option) App {
a := &app{
- ctx: ctx,
- log: zap.L(),
- cfg: viper.GetViper(),
- webServer: new(fasthttp.Server),
- webDone: make(chan struct{}),
- }
- for i := range opt {
- opt[i](a)
+ ctx: ctx,
+ log: log.logger,
+ // Keep the atomic level produced by pickLogger: the removed WithLogger
+ // option was the only place that populated logLevel, and a zero-value
+ // zap.AtomicLevel has no backing storage to read or adjust.
+ logLevel: log.lvl,
+ cfg: v,
+ loggerSettings: logSettings,
+ webServer: new(fasthttp.Server),
+ webDone: make(chan struct{}),
}
a.initAppSettings()
@@ -227,6 +210,22 @@ func (s *appSettings) update(v *viper.Viper, l *zap.Logger) {
s.corsMaxAge = corsMaxAge
}
+// DroppedLogsInc forwards a dropped-log event to the attached metrics
+// provider. It does nothing while no provider has been attached.
+func (s *loggerSettings) DroppedLogsInc() {
+	s.mu.RLock()
+	defer s.mu.RUnlock()
+
+	if s.appMetrics == nil {
+		return
+	}
+	s.appMetrics.DroppedLogsInc()
+}
+
+// setMetrics attaches the metrics provider that DroppedLogsInc reports to.
+// The write lock is held only for the assignment.
+func (s *loggerSettings) setMetrics(appMetrics *metrics.GateMetrics) {
+	s.mu.Lock()
+	s.appMetrics = appMetrics
+	s.mu.Unlock()
+}
+
func (s *appSettings) DefaultTimestamp() bool {
s.mu.RLock()
defer s.mu.RUnlock()
@@ -338,6 +337,7 @@ func (a *app) initMetrics() {
gateMetricsProvider := metrics.NewGateMetrics(a.pool)
a.metrics = newGateMetrics(a.log, gateMetricsProvider, a.cfg.GetBool(cfgPrometheusEnabled))
a.metrics.SetHealth(metrics.HealthStatusStarting)
+ a.loggerSettings.setMetrics(a.metrics.provider)
}
func newGateMetrics(logger *zap.Logger, provider *metrics.GateMetrics, enabled bool) *gateMetrics {
diff --git a/cmd/http-gw/integration_test.go b/cmd/http-gw/integration_test.go
index 79a2da5..1516f1d 100644
--- a/cmd/http-gw/integration_test.go
+++ b/cmd/http-gw/integration_test.go
@@ -34,7 +34,6 @@ import (
"github.com/stretchr/testify/require"
"github.com/testcontainers/testcontainers-go"
"github.com/testcontainers/testcontainers-go/wait"
- "go.uber.org/zap/zapcore"
)
type putResponse struct {
@@ -102,8 +101,7 @@ func runServer(pathToWallet string) (App, context.CancelFunc) {
v.Set(cfgWalletPath, pathToWallet)
v.Set(cfgWalletPassphrase, "")
- l, lvl := newStdoutLogger(v, zapcore.DebugLevel)
- application := newApp(cancelCtx, WithConfig(v), WithLogger(l, lvl))
+ application := newApp(cancelCtx, v)
go application.Serve()
return application, cancel
diff --git a/cmd/http-gw/main.go b/cmd/http-gw/main.go
index ea9fbd7..fdd148c 100644
--- a/cmd/http-gw/main.go
+++ b/cmd/http-gw/main.go
@@ -9,9 +9,8 @@ import (
func main() {
globalContext, _ := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM)
v := settings()
- logger, atomicLevel := pickLogger(v)
- application := newApp(globalContext, WithLogger(logger, atomicLevel), WithConfig(v))
+ application := newApp(globalContext, v)
go application.Serve()
application.Wait()
}
diff --git a/cmd/http-gw/settings.go b/cmd/http-gw/settings.go
index 316c500..2298124 100644
--- a/cmd/http-gw/settings.go
+++ b/cmd/http-gw/settings.go
@@ -182,6 +182,11 @@ var ignore = map[string]struct{}{
cmdVersion: {},
}
+// Logger bundles a zap logger with the zap.AtomicLevel that the logger
+// constructors in this file return alongside it.
+type Logger struct {
+ logger *zap.Logger
+ lvl zap.AtomicLevel
+}
+
func settings() *viper.Viper {
v := viper.New()
v.AutomaticEnv()
@@ -418,7 +423,11 @@ func mergeConfig(v *viper.Viper, fileName string) error {
return v.MergeConfig(cfgFile)
}
-func pickLogger(v *viper.Viper) (*zap.Logger, zap.AtomicLevel) {
+// LoggerAppSettings is notified each time the log sampler drops an entry.
+type LoggerAppSettings interface {
+ DroppedLogsInc()
+}
+
+func pickLogger(v *viper.Viper, settings LoggerAppSettings) *Logger {
lvl, err := getLogLevel(v)
if err != nil {
panic(err)
@@ -428,9 +437,9 @@ func pickLogger(v *viper.Viper) (*zap.Logger, zap.AtomicLevel) {
switch dest {
case destinationStdout:
- return newStdoutLogger(v, lvl)
+ return newStdoutLogger(v, lvl, settings)
case destinationJournald:
- return newJournaldLogger(v, lvl)
+ return newJournaldLogger(v, lvl, settings)
default:
panic(fmt.Sprintf("wrong destination for logger: %s", dest))
}
@@ -447,18 +456,20 @@ func pickLogger(v *viper.Viper) (*zap.Logger, zap.AtomicLevel) {
// Logger records a stack trace for all messages at or above fatal level.
//
// See also zapcore.Level, zap.NewProductionConfig, zap.AddStacktrace.
-func newStdoutLogger(v *viper.Viper, lvl zapcore.Level) (*zap.Logger, zap.AtomicLevel) {
+func newStdoutLogger(v *viper.Viper, lvl zapcore.Level, settings LoggerAppSettings) *Logger {
+ // NOTE: despite its name, stdout wraps os.Stderr, so entries go to standard error.
stdout := zapcore.AddSync(os.Stderr)
level := zap.NewAtomicLevelAt(lvl)
consoleOutCore := zapcore.NewCore(newLogEncoder(), stdout, level)
- consoleOutCore = samplingEnabling(v, consoleOutCore)
+ consoleOutCore = applyZapCoreMiddlewares(consoleOutCore, v, settings)
- l := zap.New(consoleOutCore, zap.AddStacktrace(zap.NewAtomicLevelAt(zap.FatalLevel)))
- return l, level
+ return &Logger{
+ logger: zap.New(consoleOutCore, zap.AddStacktrace(zap.NewAtomicLevelAt(zap.FatalLevel))),
+ lvl: level,
+ }
}
-func newJournaldLogger(v *viper.Viper, lvl zapcore.Level) (*zap.Logger, zap.AtomicLevel) {
+func newJournaldLogger(v *viper.Viper, lvl zapcore.Level, settings LoggerAppSettings) *Logger {
level := zap.NewAtomicLevelAt(lvl)
encoder := zapjournald.NewPartialEncoder(newLogEncoder(), zapjournald.SyslogFields)
@@ -470,11 +481,12 @@ func newJournaldLogger(v *viper.Viper, lvl zapcore.Level) (*zap.Logger, zap.Atom
zapjournald.SyslogPid(),
})
- coreWithContext = samplingEnabling(v, coreWithContext)
+ coreWithContext = applyZapCoreMiddlewares(coreWithContext, v, settings)
- l := zap.New(coreWithContext, zap.AddStacktrace(zap.NewAtomicLevelAt(zap.FatalLevel)))
-
- return l, level
+ return &Logger{
+ logger: zap.New(coreWithContext, zap.AddStacktrace(zap.NewAtomicLevelAt(zap.FatalLevel))),
+ lvl: level,
+ }
}
func newLogEncoder() zapcore.Encoder {
@@ -484,19 +496,17 @@ func newLogEncoder() zapcore.Encoder {
return zapcore.NewConsoleEncoder(c)
}
-func samplingEnabling(v *viper.Viper, core zapcore.Core) zapcore.Core {
- // Zap samples by logging the first cgfLoggerSamplingInitial entries with a given level
- // and message within the specified time interval.
- // In the above config, only the first cgfLoggerSamplingInitial log entries with the same level and message
- // are recorded in cfgLoggerSamplingInterval interval. Every other log entry will be dropped within the interval since
- // cfgLoggerSamplingThereafter is specified here.
+func applyZapCoreMiddlewares(core zapcore.Core, v *viper.Viper, settings LoggerAppSettings) zapcore.Core {
if v.GetBool(cfgLoggerSamplingEnabled) {
- core = zapcore.NewSamplerWithOptions(
- core,
+ core = zapcore.NewSamplerWithOptions(core,
v.GetDuration(cfgLoggerSamplingInterval),
v.GetInt(cfgLoggerSamplingInitial),
v.GetInt(cfgLoggerSamplingThereafter),
- )
+ zapcore.SamplerHook(func(_ zapcore.Entry, dec zapcore.SamplingDecision) {
+ if dec&zapcore.LogDropped > 0 {
+ settings.DroppedLogsInc()
+ }
+ }))
}
return core
diff --git a/metrics/desc.go b/metrics/desc.go
index e10050c..a00ab3e 100644
--- a/metrics/desc.go
+++ b/metrics/desc.go
@@ -76,6 +76,15 @@ var appMetricsDesc = map[string]map[string]Description{
VariableLabels: []string{"endpoint"},
},
},
+ statisticSubsystem: {
+ droppedLogs: Description{
+ Type: dto.MetricType_COUNTER,
+ Namespace: namespace,
+ Subsystem: statisticSubsystem,
+ Name: droppedLogs,
+ Help: "Dropped logs (by sampling) count",
+ },
+ },
}
type Description struct {
@@ -148,3 +157,12 @@ func mustNewGaugeVec(description Description) *prometheus.GaugeVec {
description.VariableLabels,
)
}
+
+// mustNewCounter builds a Prometheus counter from description.
+// It panics with "invalid metric type" when description.Type is not
+// dto.MetricType_COUNTER. The counter is not registered here.
+func mustNewCounter(description Description) prometheus.Counter {
+	if description.Type == dto.MetricType_COUNTER {
+		opts := prometheus.CounterOpts(newOpts(description))
+		return prometheus.NewCounter(opts)
+	}
+	panic("invalid metric type")
+}
diff --git a/metrics/metrics.go b/metrics/metrics.go
index b516477..1c06868 100644
--- a/metrics/metrics.go
+++ b/metrics/metrics.go
@@ -10,15 +10,17 @@ import (
)
const (
- namespace = "frostfs_http_gw"
- stateSubsystem = "state"
- poolSubsystem = "pool"
- serverSubsystem = "server"
+ namespace = "frostfs_http_gw"
+ stateSubsystem = "state"
+ poolSubsystem = "pool"
+ serverSubsystem = "server"
+ statisticSubsystem = "statistic"
)
const (
healthMetric = "health"
versionInfoMetric = "version_info"
+ droppedLogs = "dropped_logs"
)
const (
@@ -30,21 +32,19 @@ const (
)
const (
- methodGetBalance = "get_balance"
- methodPutContainer = "put_container"
- methodGetContainer = "get_container"
- methodListContainer = "list_container"
- methodDeleteContainer = "delete_container"
- methodGetContainerEacl = "get_container_eacl"
- methodSetContainerEacl = "set_container_eacl"
- methodEndpointInfo = "endpoint_info"
- methodNetworkInfo = "network_info"
- methodPutObject = "put_object"
- methodDeleteObject = "delete_object"
- methodGetObject = "get_object"
- methodHeadObject = "head_object"
- methodRangeObject = "range_object"
- methodCreateSession = "create_session"
+ methodGetBalance = "get_balance"
+ methodPutContainer = "put_container"
+ methodGetContainer = "get_container"
+ methodListContainer = "list_container"
+ methodDeleteContainer = "delete_container"
+ methodEndpointInfo = "endpoint_info"
+ methodNetworkInfo = "network_info"
+ methodPutObject = "put_object"
+ methodDeleteObject = "delete_object"
+ methodGetObject = "get_object"
+ methodHeadObject = "head_object"
+ methodRangeObject = "range_object"
+ methodCreateSession = "create_session"
)
// HealthStatus of the gate application.
@@ -69,6 +69,7 @@ type GateMetrics struct {
stateMetrics
poolMetricsCollector
serverMetrics
+ statisticMetrics
}
type stateMetrics struct {
@@ -76,6 +77,10 @@ type stateMetrics struct {
versionInfo *prometheus.GaugeVec
}
+// statisticMetrics groups the metrics of the "statistic" subsystem.
+type statisticMetrics struct {
+ // droppedLogs counts log entries dropped by sampling.
+ droppedLogs prometheus.Counter
+}
+
type poolMetricsCollector struct {
scraper StatisticScraper
overallErrors prometheus.Gauge
@@ -96,10 +101,14 @@ func NewGateMetrics(p StatisticScraper) *GateMetrics {
serverMetric := newServerMetrics()
serverMetric.register()
+ statsMetric := newStatisticMetrics()
+ statsMetric.register()
+
return &GateMetrics{
stateMetrics: *stateMetric,
poolMetricsCollector: *poolMetric,
serverMetrics: *serverMetric,
+ statisticMetrics: *statsMetric,
}
}
@@ -107,6 +116,7 @@ func (g *GateMetrics) Unregister() {
g.stateMetrics.unregister()
prometheus.Unregister(&g.poolMetricsCollector)
g.serverMetrics.unregister()
+ g.statisticMetrics.unregister()
}
func newStateMetrics() *stateMetrics {
@@ -116,6 +126,20 @@ func newStateMetrics() *stateMetrics {
}
}
+// newStatisticMetrics builds the statistic subsystem metrics from their
+// descriptions in appMetricsDesc. The metrics are not registered here.
+func newStatisticMetrics() *statisticMetrics {
+	desc := appMetricsDesc[statisticSubsystem][droppedLogs]
+	counter := mustNewCounter(desc)
+
+	return &statisticMetrics{droppedLogs: counter}
+}
+
+// register adds the dropped logs counter to the default Prometheus registry.
+// prometheus.MustRegister panics if the registration fails.
+func (s *statisticMetrics) register() {
+ prometheus.MustRegister(s.droppedLogs)
+}
+
+// unregister removes the dropped logs counter from the default Prometheus registry.
+func (s *statisticMetrics) unregister() {
+ prometheus.Unregister(s.droppedLogs)
+}
+
func (m stateMetrics) register() {
prometheus.MustRegister(m.healthCheck)
prometheus.MustRegister(m.versionInfo)
@@ -134,6 +158,13 @@ func (m stateMetrics) SetVersion(ver string) {
m.versionInfo.WithLabelValues(ver).Set(1)
}
+// DroppedLogsInc increments the dropped logs counter by one.
+// Calling it on a nil receiver does nothing.
+func (s *statisticMetrics) DroppedLogsInc() {
+	if s != nil {
+		s.droppedLogs.Inc()
+	}
+}
+
func newPoolMetricsCollector(p StatisticScraper) *poolMetricsCollector {
return &poolMetricsCollector{
scraper: p,