feature/20-add_logs #21
12 changed files with 241 additions and 47 deletions
|
@ -49,13 +49,16 @@ const (
|
|||
HealthStatusShuttingDown int32 = 3
|
||||
)
|
||||
|
||||
func newApp(ctx context.Context, cfg *viper.Viper, log *Logger) *App {
|
||||
func newApp(ctx context.Context, cfg *viper.Viper) *App {
|
||||
appMetrics := metrics.NewAppMetrics()
|
||||
log := pickLogger(cfg, appMetrics)
|
||||
|
||||
a := &App{
|
||||
log: log.logger,
|
||||
logLevel: log.lvl,
|
||||
cfg: cfg,
|
||||
done: make(chan struct{}),
|
||||
appMetrics: metrics.NewAppMetrics(),
|
||||
appMetrics: appMetrics,
|
||||
settings: newAppSettings(cfg, log),
|
||||
}
|
||||
a.appMetrics.SetHealth(HealthStatusStarting)
|
||||
|
|
|
@ -2,8 +2,11 @@ package main
|
|||
|
||||
import (
|
||||
"fmt"
|
||||
"strings"
|
||||
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-s3-lifecycler/internal/metrics"
|
||||
"git.frostfs.info/TrueCloudLab/zapjournald"
|
||||
"github.com/nspcc-dev/neo-go/cli/options"
|
||||
"github.com/spf13/viper"
|
||||
"github.com/ssgreg/journald"
|
||||
"go.uber.org/zap"
|
||||
|
@ -20,60 +23,117 @@ type Logger struct {
|
|||
lvl zap.AtomicLevel
|
||||
}
|
||||
|
||||
func pickLogger(v *viper.Viper) *Logger {
|
||||
lvl, err := getLogLevel(v.GetString(cfgLoggerLevel))
|
||||
if err != nil {
|
||||
panic(err)
|
||||
func pickLogger(v *viper.Viper, appMetrics *metrics.AppMetrics) *Logger {
|
||||
switch dest := v.GetString(cfgLoggerDestination); dest {
|
||||
case destinationStdout:
|
||||
return newStdoutLogger(v, appMetrics)
|
||||
case destinationJournald:
|
||||
return newJournaldLogger(v, appMetrics)
|
||||
default:
|
||||
panic(fmt.Sprintf("wrong destination for logger: %s", dest))
|
||||
}
|
||||
|
||||
dest := v.GetString(cfgLoggerDestination)
|
||||
|
||||
if dest == destinationStdout {
|
||||
return newStdoutLogger(lvl)
|
||||
}
|
||||
|
||||
if dest == destinationJournald {
|
||||
return newJournaldLogger(lvl)
|
||||
}
|
||||
|
||||
panic(fmt.Sprintf("wrong destination for logger: %s", dest))
|
||||
}
|
||||
|
||||
func newStdoutLogger(lvl zapcore.Level) *Logger {
|
||||
c := zap.NewProductionConfig()
|
||||
c.Level = zap.NewAtomicLevelAt(lvl)
|
||||
c.Encoding = "console"
|
||||
c.EncoderConfig.EncodeTime = zapcore.ISO8601TimeEncoder
|
||||
func newStdoutLogger(v *viper.Viper, appMetrics *metrics.AppMetrics) *Logger {
|
||||
c := newZapLogConfig(v)
|
||||
|
||||
l, err := c.Build(
|
||||
zap.AddStacktrace(zap.NewAtomicLevelAt(zap.FatalLevel)),
|
||||
)
|
||||
out, errSink, err := openZapSinks(c)
|
||||
if err != nil {
|
||||
panic(fmt.Sprintf("build zap logger instance: %v", err))
|
||||
panic(fmt.Sprintf("open zap sinks: %v", err.Error()))
|
||||
}
|
||||
|
||||
core := zapcore.NewCore(zapcore.NewConsoleEncoder(c.EncoderConfig), out, c.Level)
|
||||
core = applyZapCoreMiddlewares(core, v, appMetrics)
|
||||
l := zap.New(core, zap.AddStacktrace(zap.NewAtomicLevelAt(zap.FatalLevel)), zap.ErrorOutput(errSink))
|
||||
|
||||
return &Logger{logger: l, lvl: c.Level}
|
||||
}
|
||||
|
||||
func newJournaldLogger(lvl zapcore.Level) *Logger {
|
||||
c := zap.NewProductionConfig()
|
||||
c.EncoderConfig.EncodeTime = zapcore.ISO8601TimeEncoder
|
||||
c.Level = zap.NewAtomicLevelAt(lvl)
|
||||
func newJournaldLogger(v *viper.Viper, appMetrics *metrics.AppMetrics) *Logger {
|
||||
c := newZapLogConfig(v)
|
||||
|
||||
// We can use NewJSONEncoder instead if, say, frontend
|
||||
// would like to access journald logs and parse them easily.
|
||||
encoder := zapjournald.NewPartialEncoder(zapcore.NewConsoleEncoder(c.EncoderConfig), zapjournald.SyslogFields)
|
||||
|
||||
core := zapjournald.NewCore(c.Level, encoder, &journald.Journal{}, zapjournald.SyslogFields)
|
||||
coreWithContext := core.With([]zapcore.Field{
|
||||
journalCore := zapjournald.NewCore(c.Level, encoder, &journald.Journal{}, zapjournald.SyslogFields)
|
||||
core := journalCore.With([]zapcore.Field{
|
||||
zapjournald.SyslogFacility(zapjournald.LogDaemon),
|
||||
zapjournald.SyslogIdentifier(),
|
||||
zapjournald.SyslogPid(),
|
||||
})
|
||||
l := zap.New(coreWithContext, zap.AddStacktrace(zap.NewAtomicLevelAt(zap.FatalLevel)))
|
||||
core = applyZapCoreMiddlewares(core, v, appMetrics)
|
||||
l := zap.New(core, zap.AddStacktrace(zap.NewAtomicLevelAt(zap.FatalLevel)))
|
||||
return &Logger{logger: l, lvl: c.Level}
|
||||
}
|
||||
|
||||
func openZapSinks(cfg zap.Config) (zapcore.WriteSyncer, zapcore.WriteSyncer, error) {
|
||||
sink, closeOut, err := zap.Open(cfg.OutputPaths...)
|
||||
if err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
errSink, _, err := zap.Open(cfg.ErrorOutputPaths...)
|
||||
if err != nil {
|
||||
closeOut()
|
||||
return nil, nil, err
|
||||
}
|
||||
return sink, errSink, nil
|
||||
}
|
||||
|
||||
func applyZapCoreMiddlewares(core zapcore.Core, v *viper.Viper, appMetrics *metrics.AppMetrics) zapcore.Core {
|
||||
core = options.NewFilteringCore(core, filteringLogOption(v))
|
||||
|
||||
if v.GetBool(cfgLoggerSamplingEnabled) {
|
||||
core = zapcore.NewSamplerWithOptions(core,
|
||||
v.GetDuration(cfgLoggerSamplingInterval),
|
||||
v.GetInt(cfgLoggerSamplingInitial),
|
||||
v.GetInt(cfgLoggerSamplingThereafter),
|
||||
zapcore.SamplerHook(func(_ zapcore.Entry, dec zapcore.SamplingDecision) {
|
||||
if dec&zapcore.LogDropped > 0 {
|
||||
appMetrics.DroppedLogsInc()
|
||||
}
|
||||
}))
|
||||
}
|
||||
|
||||
return core
|
||||
}
|
||||
|
||||
func newZapLogConfig(v *viper.Viper) zap.Config {
|
||||
lvl, err := getLogLevel(v.GetString(cfgLoggerLevel))
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
|
||||
c := zap.Config{
|
||||
Level: zap.NewAtomicLevelAt(lvl),
|
||||
EncoderConfig: zap.NewProductionEncoderConfig(),
|
||||
OutputPaths: []string{"stderr"},
|
||||
ErrorOutputPaths: []string{"stderr"},
|
||||
}
|
||||
c.EncoderConfig.EncodeTime = zapcore.ISO8601TimeEncoder
|
||||
|
||||
return c
|
||||
}
|
||||
|
||||
func filteringLogOption(v *viper.Viper) options.FilterFunc {
|
||||
tags := v.GetStringSlice(cfgLoggerTags)
|
||||
|
||||
return func(entry zapcore.Entry) bool {
|
||||
if !strings.HasPrefix(entry.Message, "tag:") {
|
||||
return true
|
||||
}
|
||||
|
||||
msg := entry.Message[4:] // len("tag:")
|
||||
for _, tag := range tags {
|
||||
if strings.HasPrefix(msg, tag) {
|
||||
return true
|
||||
}
|
||||
}
|
||||
|
||||
return false
|
||||
}
|
||||
}
|
||||
|
||||
func getLogLevel(lvlStr string) (zapcore.Level, error) {
|
||||
var lvl zapcore.Level
|
||||
err := lvl.UnmarshalText([]byte(lvlStr))
|
||||
|
|
|
@ -10,10 +10,7 @@ func main() {
|
|||
ctx, cancel := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM)
|
||||
defer cancel()
|
||||
|
||||
cfg := settings()
|
||||
log := pickLogger(cfg)
|
||||
|
||||
app := newApp(ctx, cfg, log)
|
||||
app := newApp(ctx, settings())
|
||||
go app.Serve(ctx)
|
||||
app.Wait()
|
||||
}
|
||||
|
|
|
@ -36,8 +36,13 @@ const (
|
|||
cfgPprofAddress = "pprof.address"
|
||||
|
||||
// Logger.
|
||||
cfgLoggerLevel = "logger.level"
|
||||
cfgLoggerDestination = "logger.destination"
|
||||
cfgLoggerLevel = "logger.level"
|
||||
cfgLoggerDestination = "logger.destination"
|
||||
cfgLoggerSamplingEnabled = "logger.sampling.enabled"
|
||||
cfgLoggerSamplingInitial = "logger.sampling.initial"
|
||||
cfgLoggerSamplingThereafter = "logger.sampling.thereafter"
|
||||
alexvanin marked this conversation as resolved
|
||||
cfgLoggerSamplingInterval = "logger.sampling.interval"
|
||||
cfgLoggerTags = "logger.tags"
|
||||
|
||||
// Morph.
|
||||
cfgMorphRPCEndpointPrefixTmpl = "morph.rpc_endpoint.%d."
|
||||
|
@ -125,6 +130,9 @@ func settings() *viper.Viper {
|
|||
// logger:
|
||||
v.SetDefault(cfgLoggerLevel, "info")
|
||||
v.SetDefault(cfgLoggerDestination, "stdout")
|
||||
v.SetDefault(cfgLoggerSamplingThereafter, 100)
|
||||
v.SetDefault(cfgLoggerSamplingInitial, 100)
|
||||
v.SetDefault(cfgLoggerSamplingInterval, time.Second)
|
||||
|
||||
// services:
|
||||
v.SetDefault(cfgPrometheusEnabled, false)
|
||||
|
|
|
@ -9,6 +9,10 @@ S3_LIFECYCLER_WALLET_PASSPHRASE=pwd
|
|||
# Logger
|
||||
S3_LIFECYCLER_LOGGER_LEVEL=debug
|
||||
S3_LIFECYCLER_LOGGER_DESTINATION=stdout
|
||||
S3_LIFECYCLER_LOGGER_SAMPLING_ENABLED=false
|
||||
S3_LIFECYCLER_LOGGER_SAMPLING_INITIAL=100
|
||||
S3_LIFECYCLER_LOGGER_SAMPLING_THEREAFTER=100
|
||||
S3_LIFECYCLER_LOGGER_SAMPLING_INTERVAL=1s
|
||||
|
||||
# Metrics
|
||||
S3_LIFECYCLER_PPROF_ENABLED=false
|
||||
|
|
|
@ -7,6 +7,11 @@ wallet:
|
|||
logger:
|
||||
level: info # Log level.
|
||||
destination: stdout # Logging destination.
|
||||
sampling:
|
||||
enabled: false
|
||||
initial: 100
|
||||
thereafter: 100
|
||||
interval: 1s
|
||||
|
||||
pprof:
|
||||
enabled: false
|
||||
|
|
|
@ -47,14 +47,38 @@ wallet:
|
|||
|
||||
```yaml
|
||||
logger:
|
||||
level: debug
|
||||
level: info
|
||||
destination: stdout
|
||||
sampling:
|
||||
enabled: false
|
||||
initial: 100
|
||||
thereafter: 100
|
||||
interval: 1s
|
||||
tags:
|
||||
- "expiration_delete_object"
|
||||
- "multipart_delete_object"
|
||||
```
|
||||
|
||||
| Parameter | Type | SIGHUP reload | Default value | Description |
|
||||
|---------------|----------|---------------|---------------|--------------------------------------------------------------------------------------|
|
||||
| `level` | `string` | yes | `info` | Logging level. Possible values: `debug`, `info`, `warn`, `dpanic`, `panic`, `fatal`. |
|
||||
| `destination` | `string` | no | `stdout` | Destination for logger: `stdout` or `journald` |
|
||||
| Parameter | Type | SIGHUP reload | Default value | Description |
|
||||
|-----------------------|------------|---------------|---------------|----------------------------------------------------------------------------------------------------|
|
||||
| `level` | `string` | yes | `info` | Logging level. Possible values: `debug`, `info`, `warn`, `dpanic`, `panic`, `fatal`. |
|
||||
| `destination` | `string` | no | `stdout` | Destination for logger: `stdout` or `journald` |
|
||||
| `sampling.enabled` | `bool` | no | `false` | Enable sampling. |
|
||||
| `sampling.initial` | `int` | no | `100` | Logs firth N of the same (level and message) log entries each interval. |
|
||||
| `sampling.thereafter` | `int` | no | `100` | Logs every Mth of the same (level and message) log entries after first N entries in that interval. |
|
||||
| `sampling.interval` | `duration` | no | `1s` | Sampling interval. |
|
||||
| `sampling.tags` | `[]string` | no | | Tagged log entries that should be additionally logged (available tags see in the next section). |
|
||||
|
||||
## Tags
|
||||
|
||||
There are additional log entries that can hurt performance and can be additionally logged by using `logger.tags`
|
||||
parameter. Available tags:
|
||||
|
||||
* `expiration_delete_object`
|
||||
* `expiration_process_version`
|
||||
* `expiration_remove_version`
|
||||
* `multipart_delete_object`
|
||||
* `multipart_process_upload`
|
||||
|
||||
# `pprof` section
|
||||
|
||||
|
|
22
go.sum
22
go.sum
|
@ -25,6 +25,8 @@ git.frostfs.info/TrueCloudLab/tzhash v1.8.0/go.mod h1:dhY+oy274hV8wGvGL4MwwMpdL3
|
|||
git.frostfs.info/TrueCloudLab/zapjournald v0.0.0-20240124114243-cb2e66427d02 h1:HeY8n27VyPRQe49l/fzyVMkWEB2fsLJYKp64pwA7tz4=
|
||||
git.frostfs.info/TrueCloudLab/zapjournald v0.0.0-20240124114243-cb2e66427d02/go.mod h1:rQFJJdEOV7KbbMtQYR2lNfiZk+ONRDJSbMCTWxKt8Fw=
|
||||
github.com/BurntSushi/toml v1.3.2/go.mod h1:CxXYINrC8qIiEnFrOxCa7Jy5BFHlXnUU2pbicEuybxQ=
|
||||
github.com/antlr/antlr4/runtime/Go/antlr/v4 v4.0.0-20221202181307-76fa05c21b12 h1:npHgfD4Tl2WJS3AJaMUi5ynGDPUBfkg3U3fCzDyXZ+4=
|
||||
github.com/antlr/antlr4/runtime/Go/antlr/v4 v4.0.0-20221202181307-76fa05c21b12/go.mod h1:pSwJ0fSY5KhvocuWSx4fz3BA8OrA1bQn+K1Eli3BRwM=
|
||||
github.com/antlr4-go/antlr/v4 v4.13.0 h1:lxCg3LAv+EUK6t1i0y1V6/SLeUi0eKEKdhQAlS8TVTI=
|
||||
github.com/antlr4-go/antlr/v4 v4.13.0/go.mod h1:pfChB/xh/Unjila75QW7+VU4TSnWnnk9UTnmpPaOR2g=
|
||||
github.com/aws/aws-sdk-go v1.44.6 h1:Y+uHxmZfhRTLX2X3khkdxCoTZAyGEX21aOUHe1U6geg=
|
||||
|
@ -41,6 +43,8 @@ github.com/cespare/xxhash/v2 v2.2.0 h1:DC2CZ1Ep5Y4k3ZQ899DldepgrayRUGE6BBZ/cd9Cj
|
|||
github.com/cespare/xxhash/v2 v2.2.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
|
||||
github.com/chzyer/logex v1.1.10/go.mod h1:+Ywpsq7O8HXn0nuIou7OrIPyXbp3wmkHB+jjWRnGsAI=
|
||||
github.com/chzyer/readline v0.0.0-20180603132655-2972be24d48e/go.mod h1:nSuG5e5PlCu98SY8svDHJxuZscDgtXS6KTTbou5AhLI=
|
||||
github.com/chzyer/readline v1.5.1 h1:upd/6fQk4src78LMRzh5vItIt361/o4uq553V8B5sGI=
|
||||
github.com/chzyer/readline v1.5.1/go.mod h1:Eh+b79XXUwfKfcPLepksvw2tcLE/Ct21YObkaSkeBlk=
|
||||
github.com/chzyer/test v0.0.0-20180213035817-a1ea475d72b1/go.mod h1:Q3SI9o4m/ZMnBNeIyt5eFwwo7qiLfzFZmjNmxjkiQlU=
|
||||
github.com/consensys/bavard v0.1.13 h1:oLhMLOFGTLdlda/kma4VOJazblc7IM5y5QPd2A/YjhQ=
|
||||
github.com/consensys/bavard v0.1.13/go.mod h1:9ItSMtA/dXMAiL7BG6bqW2m3NdSEObYWoH223nGHukI=
|
||||
|
@ -108,6 +112,8 @@ github.com/jmespath/go-jmespath/internal/testify v1.5.1 h1:shLQSRRSCCPj3f2gpwzGw
|
|||
github.com/jmespath/go-jmespath/internal/testify v1.5.1/go.mod h1:L3OGu8Wl2/fWfCI6z80xFu9LTZmf1ZRjMHUOPmWr69U=
|
||||
github.com/josharian/intern v1.0.0 h1:vlS4z54oSdjm0bgjRigI+G1HpF+tI+9rE5LLzOg8HmY=
|
||||
github.com/josharian/intern v1.0.0/go.mod h1:5DoeVV0s6jJacbCEi61lwdGj/aVlrQvzHFFd8Hwg//Y=
|
||||
github.com/kballard/go-shellquote v0.0.0-20180428030007-95032a82bc51 h1:Z9n2FFNUXsshfwJMBgNA0RU6/i7WVaAegv3PtuIHPMs=
|
||||
github.com/kballard/go-shellquote v0.0.0-20180428030007-95032a82bc51/go.mod h1:CzGEWj7cYgsdH8dAjBGEr58BoE7ScuLd+fwFZ44+/x8=
|
||||
github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE=
|
||||
github.com/kr/pretty v0.3.1/go.mod h1:hoEshYVHaxMs3cyo3Yncou5ZscifuDolrwPKZanG3xk=
|
||||
github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY=
|
||||
|
@ -124,12 +130,24 @@ github.com/mmcloughlin/addchain v0.4.0 h1:SobOdjm2xLj1KkXN5/n0xTIWyZA2+s99UCY1iP
|
|||
github.com/mmcloughlin/addchain v0.4.0/go.mod h1:A86O+tHqZLMNO4w6ZZ4FlVQEadcoqkyU72HC5wJ4RlU=
|
||||
github.com/mr-tron/base58 v1.2.0 h1:T/HDJBh4ZCPbU39/+c3rRvE0uKBQlU27+QI8LJ4t64o=
|
||||
github.com/mr-tron/base58 v1.2.0/go.mod h1:BinMc/sQntlIE1frQmRFPUoPA1Zkr8VRgBdjWI2mNwc=
|
||||
github.com/nspcc-dev/dbft v0.2.0 h1:sDwsQES600OSIMncV176t2SX5OvB14lzeOAyKFOkbMI=
|
||||
github.com/nspcc-dev/dbft v0.2.0/go.mod h1:oFE6paSC/yfFh9mcNU6MheMGOYXK9+sPiRk3YMoz49o=
|
||||
github.com/nspcc-dev/go-ordered-json v0.0.0-20240301084351-0246b013f8b2 h1:mD9hU3v+zJcnHAVmHnZKt3I++tvn30gBj2rP2PocZMk=
|
||||
github.com/nspcc-dev/go-ordered-json v0.0.0-20240301084351-0246b013f8b2/go.mod h1:U5VfmPNM88P4RORFb6KSUVBdJBDhlqggJZYGXGPxOcc=
|
||||
github.com/nspcc-dev/hrw v1.0.9 h1:17VcAuTtrstmFppBjfRiia4K2wA/ukXZhLFS8Y8rz5Y=
|
||||
github.com/nspcc-dev/hrw v1.0.9/go.mod h1:l/W2vx83vMQo6aStyx2AuZrJ+07lGv2JQGlVkPG06MU=
|
||||
github.com/nspcc-dev/neo-go/pkg/interop v0.0.0-20240521091047-78685785716d h1:Vcb7YkZuUSSIC+WF/xV3UDfHbAxZgyT2zGleJP3Ig5k=
|
||||
github.com/nspcc-dev/neo-go/pkg/interop v0.0.0-20240521091047-78685785716d/go.mod h1:/vrbWSHc7YS1KSYhVOyyeucXW/e+1DkVBOgnBEXUCeY=
|
||||
github.com/nspcc-dev/neofs-api-go/v2 v2.14.0 h1:jhuN8Ldqz7WApvUJRFY0bjRXE1R3iCkboMX5QVZhHVk=
|
||||
github.com/nspcc-dev/neofs-api-go/v2 v2.14.0/go.mod h1:DRIr0Ic1s+6QgdqmNFNLIqMqd7lNMJfYwkczlm1hDtM=
|
||||
github.com/nspcc-dev/neofs-crypto v0.4.0 h1:5LlrUAM5O0k1+sH/sktBtrgfWtq1pgpDs09fZo+KYi4=
|
||||
github.com/nspcc-dev/neofs-crypto v0.4.0/go.mod h1:6XJ8kbXgOfevbI2WMruOtI+qUJXNwSGM/E9eClXxPHs=
|
||||
github.com/nspcc-dev/neofs-sdk-go v1.0.0-rc.11 h1:QOc8ZRN5DXlAeRPh5QG9u8rMLgoeRNiZF5/vL7QupWg=
|
||||
github.com/nspcc-dev/neofs-sdk-go v1.0.0-rc.11/go.mod h1:W+ImTNRnSNMH8w43H1knCcIqwu7dLHePXtlJNZ7EFIs=
|
||||
github.com/nspcc-dev/rfc6979 v0.2.1 h1:8wWxkamHWFmO790GsewSoKUSJjVnL1fmdRpokU/RgRM=
|
||||
github.com/nspcc-dev/rfc6979 v0.2.1/go.mod h1:Tk7h5kyUWkhjyO3zUgFFhy1v2vQv3BvQEntakdtqrWc=
|
||||
github.com/nspcc-dev/tzhash v1.7.0 h1:/+aL33NC7y5OIGnY2kYgjZt8mg7LVGFMdj/KAJLndnk=
|
||||
github.com/nspcc-dev/tzhash v1.7.0/go.mod h1:Dnx9LUlOLr5paL2Rtc96x0PPs8D9eIkUtowt1n+KQus=
|
||||
github.com/nxadm/tail v1.4.4/go.mod h1:kenIhsEOeOJmVchQTgglprH7qJGnHDVpk1VPCcaMI8A=
|
||||
github.com/nxadm/tail v1.4.8 h1:nPr65rt6Y5JFSKQO7qToXr7pePgD6Gwiw05lkbyAQTE=
|
||||
github.com/nxadm/tail v1.4.8/go.mod h1:+ncqLTQzXmGhMZNUePPaPqPvBxHAIsmXswZKocGu+AU=
|
||||
|
@ -148,6 +166,8 @@ github.com/panjf2000/ants/v2 v2.9.0 h1:SztCLkVxBRigbg+vt0S5QvF5vxAbxbKt09/YfAJ0t
|
|||
github.com/panjf2000/ants/v2 v2.9.0/go.mod h1:7ZxyxsqE4vvW0M7LSD8aI3cKwgFhBHbxnlN8mDqHa1I=
|
||||
github.com/pelletier/go-toml/v2 v2.2.2 h1:aYUidT7k73Pcl9nb2gScu7NSrKCSHIDE89b3+6Wq+LM=
|
||||
github.com/pelletier/go-toml/v2 v2.2.2/go.mod h1:1t835xjRzz80PqgE6HHgN2JOsmgYu/h4qDAS4n929Rs=
|
||||
github.com/pierrec/lz4 v2.6.1+incompatible h1:9UY3+iC23yxF0UfGaYrGplQ+79Rg+h/q9FV9ix19jjM=
|
||||
github.com/pierrec/lz4 v2.6.1+incompatible/go.mod h1:pdkljMzZIN41W+lC3N2tnIh5sFi+IEE17M5jbnwPHcY=
|
||||
github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
|
||||
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
|
||||
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 h1:Jamvg5psRIccs7FGNTlIRMkT8wgtp5eCXdBlqhYGL6U=
|
||||
|
@ -170,6 +190,8 @@ github.com/sagikazarmark/slog-shim v0.1.0 h1:diDBnUNK9N/354PgrxMywXnAwEr1QZcOr6g
|
|||
github.com/sagikazarmark/slog-shim v0.1.0/go.mod h1:SrcSrq8aKtyuqEI1uvTDTK1arOWRIczQRv+GVI1AkeQ=
|
||||
github.com/sourcegraph/conc v0.3.0 h1:OQTbbt6P72L20UqAkXXuLOj79LfEanQ+YQFNpLA9ySo=
|
||||
github.com/sourcegraph/conc v0.3.0/go.mod h1:Sdozi7LEKbFPqYX2/J+iBAM6HpqSLTASQIKqDmF7Mt0=
|
||||
github.com/spaolacci/murmur3 v1.1.0 h1:7c1g84S4BPRrfL5Xrdp6fOJ206sU9y293DDHaoy0bLI=
|
||||
github.com/spaolacci/murmur3 v1.1.0/go.mod h1:JwIasOWyU6f++ZhiEuf87xNszmSA2myDM2Kzu9HwQUA=
|
||||
github.com/spf13/afero v1.11.0 h1:WJQKhtpdm3v2IzqG8VMqrr6Rf3UYpEF239Jy9wNepM8=
|
||||
github.com/spf13/afero v1.11.0/go.mod h1:GH9Y3pIexgf1MTIWtNGyogA5MwRIDXGUr+hbWNoBjkY=
|
||||
github.com/spf13/cast v1.6.0 h1:GEiTHELF+vaR5dhz3VqZfFSzZjYbgeKDpBxQVS4GYJ0=
|
||||
|
|
|
@ -122,6 +122,7 @@ LOOP:
|
|||
}
|
||||
|
||||
func (e *Executor) worker(ctx context.Context, job Job) error {
|
||||
e.log.Debug(logs.ProcessingJob, zap.String("user", job.PrivateKey.Address()), zap.String("cid", job.ContainerID.EncodeToString()))
|
||||
ctx = addBearerToContext(ctx, job.Bearer)
|
||||
|
||||
var userID user.ID
|
||||
|
@ -178,6 +179,7 @@ func (e *Executor) abortMultiparts(ctx context.Context, ni *netmap.NetworkInfo,
|
|||
}
|
||||
|
||||
for _, multipart := range multiparts {
|
||||
e.log.Debug(logs.TagMultipartProcessUpload, zap.String("key", multipart.Key), zap.String("upload_id", multipart.UploadID), zap.Uint64("node_id", multipart.ID))
|
||||
if !matcherFn(multipart) {
|
||||
continue
|
||||
}
|
||||
|
@ -277,7 +279,7 @@ func (e *Executor) expireObjects(ctx context.Context, ni *netmap.NetworkInfo, ru
|
|||
|
||||
objectStream, err := e.tree.InitVersionsByPrefixStream(ctx, bktInfo, "", false)
|
||||
if err != nil {
|
||||
return fmt.Errorf("list multiparts: %w", err)
|
||||
return fmt.Errorf("list versions: %w", err)
|
||||
}
|
||||
|
||||
var (
|
||||
|
@ -297,6 +299,9 @@ func (e *Executor) expireObjects(ctx context.Context, ni *netmap.NetworkInfo, ru
|
|||
return fmt.Errorf("get node version from stream: %w", err)
|
||||
}
|
||||
|
||||
e.log.Debug(logs.TagExpirationProcessVersion, zap.Stringer("cid", bktInfo.CID), zap.Stringer("oid", nodeVersion.OID),
|
||||
zap.String("filepath", nodeVersion.FilePath), zap.Uint64("node_id", nodeVersion.ID))
|
||||
|
||||
if nodeVersion.FilePath != latestObjName {
|
||||
if err = e.expireObject(ctx, versions, bktInfo, ni, matcherFn, settings); err != nil {
|
||||
e.log.Warn(logs.FailedToExpireObject, zap.String("object", latestObjName), zap.Error(err))
|
||||
|
@ -577,6 +582,8 @@ func (e *Executor) deleteObject(ctx context.Context, version *data.NodeVersion,
|
|||
var addr oid.Address
|
||||
addr.SetContainer(bktInfo.CID)
|
||||
addr.SetObject(version.OID)
|
||||
|
||||
e.log.Debug(logs.TagExpirationDeleteObject, zap.Stringer("address", addr), zap.String("filepath", version.FilePath))
|
||||
if err := e.frostfs.DeleteObject(ctx, addr); err != nil && !isNotFound(err) {
|
||||
e.log.Warn(logs.DeleteObjectVersionFromStorage, zap.String("key", version.FilePath),
|
||||
zap.String("address", addr.EncodeToString()), zap.Error(err))
|
||||
|
@ -594,6 +601,9 @@ func (e *Executor) addDeleteMarker(ctx context.Context, version *data.NodeVersio
|
|||
return
|
||||
}
|
||||
|
||||
e.log.Debug(logs.AddDeleteMarker, zap.String("cid", bktInfo.CID.EncodeToString()), zap.String("oid", randOID.EncodeToString()),
|
||||
zap.String("filepath", version.FilePath))
|
||||
|
||||
now := nowTime()
|
||||
newVersion := &data.NodeVersion{
|
||||
BaseNodeVersion: data.BaseNodeVersion{
|
||||
|
@ -614,6 +624,8 @@ func (e *Executor) addDeleteMarker(ctx context.Context, version *data.NodeVersio
|
|||
}
|
||||
|
||||
func (e *Executor) removeVersion(ctx context.Context, version *data.NodeVersion, bktInfo *data.BucketInfo) {
|
||||
e.log.Debug(logs.TagExpirationRemoveVersion, zap.Stringer("cid", bktInfo.CID), zap.Stringer("oid", version.OID),
|
||||
zap.String("filepath", version.FilePath), zap.Uint64("node_id", version.ID))
|
||||
if err := e.tree.RemoveVersion(ctx, bktInfo, version.ID); err != nil {
|
||||
e.log.Warn(logs.DeleteObjectVersionFromTree, zap.String("key", version.FilePath),
|
||||
zap.Uint64("id", version.ID), zap.Error(err))
|
||||
|
@ -687,6 +699,8 @@ func versionCreationEpoch(version *data.NodeVersion, ni *netmap.NetworkInfo) (ui
|
|||
}
|
||||
|
||||
func (e *Executor) abortMultipart(ctx context.Context, bktInfo *data.BucketInfo, multipart *data.MultipartInfo) error {
|
||||
e.log.Debug(logs.AbortMultipart, zap.String("cid", bktInfo.CID.EncodeToString()), zap.String("key", multipart.Key),
|
||||
zap.String("upload_id", multipart.UploadID), zap.Uint64("node_id", multipart.ID))
|
||||
parts, err := e.tree.GetParts(ctx, bktInfo, multipart.ID)
|
||||
if err != nil {
|
||||
return fmt.Errorf("get parts: %w", err)
|
||||
|
@ -696,6 +710,7 @@ func (e *Executor) abortMultipart(ctx context.Context, bktInfo *data.BucketInfo,
|
|||
addr.SetContainer(bktInfo.CID)
|
||||
for _, part := range parts {
|
||||
addr.SetObject(part.OID)
|
||||
e.log.Debug(logs.TagMultipartDeleteObject, zap.Stringer("address", addr), zap.Int("part", part.Number))
|
||||
if err = e.frostfs.DeleteObject(ctx, addr); err != nil {
|
||||
return fmt.Errorf("delete object '%s': %w", addr.EncodeToString(), err)
|
||||
}
|
||||
|
|
|
@ -64,4 +64,14 @@ const (
|
|||
DeleteObjectVersionFromTree = "delete object version from tree"
|
||||
EpochMismatched = "epoch mismatched"
|
||||
UnknownCredentialSource = "unknown credential source to use"
|
||||
AbortMultipart = "abort multipart"
|
||||
ProcessingJob = "processing job"
|
||||
)
|
||||
|
||||
const (
|
||||
TagExpirationDeleteObject = "tag:expiration_delete_object"
|
||||
TagExpirationProcessVersion = "tag:expiration_process_version"
|
||||
TagExpirationRemoveVersion = "tag:expiration_remove_version"
|
||||
TagMultipartDeleteObject = "tag:multipart_delete_object"
|
||||
TagMultipartProcessUpload = "tag:multipart_process_upload"
|
||||
)
|
||||
|
|
|
@ -25,6 +25,15 @@ var appMetricsDesc = map[string]map[string]Description{
|
|||
VariableLabels: []string{"version"},
|
||||
},
|
||||
},
|
||||
statisticSubsystem: {
|
||||
droppedLogs: Description{
|
||||
Type: dto.MetricType_COUNTER,
|
||||
Namespace: namespace,
|
||||
Subsystem: statisticSubsystem,
|
||||
Name: droppedLogs,
|
||||
Help: "Dropped logs (by sampling) count",
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
type Description struct {
|
||||
|
@ -97,3 +106,12 @@ func mustNewGaugeVec(description Description) *prometheus.GaugeVec {
|
|||
description.VariableLabels,
|
||||
)
|
||||
}
|
||||
|
||||
func mustNewCounter(description Description) prometheus.Counter {
|
||||
if description.Type != dto.MetricType_COUNTER {
|
||||
panic("invalid metric type")
|
||||
}
|
||||
return prometheus.NewCounter(
|
||||
prometheus.CounterOpts(newOpts(description)),
|
||||
)
|
||||
}
|
||||
|
|
|
@ -12,6 +12,7 @@ type (
|
|||
// AppMetrics is a metrics container for all app specific data.
|
||||
AppMetrics struct {
|
||||
stateMetrics
|
||||
statisticMetrics
|
||||
}
|
||||
|
||||
// stateMetrics are metrics of application state.
|
||||
|
@ -19,6 +20,10 @@ type (
|
|||
healthCheck prometheus.Gauge
|
||||
versionInfo *prometheus.GaugeVec
|
||||
}
|
||||
|
||||
statisticMetrics struct {
|
||||
droppedLogs prometheus.Counter
|
||||
}
|
||||
)
|
||||
|
||||
const (
|
||||
|
@ -27,10 +32,15 @@ const (
|
|||
|
||||
healthMetric = "health"
|
||||
versionInfoMetric = "version_info"
|
||||
|
||||
statisticSubsystem = "statistic"
|
||||
|
||||
droppedLogs = "dropped_logs"
|
||||
)
|
||||
|
||||
func (m stateMetrics) register() {
|
||||
prometheus.MustRegister(m.healthCheck)
|
||||
prometheus.MustRegister(m.versionInfo)
|
||||
}
|
||||
|
||||
func (m stateMetrics) SetHealth(s int32) {
|
||||
|
@ -41,13 +51,25 @@ func (m stateMetrics) SetVersion(ver string) {
|
|||
m.versionInfo.WithLabelValues(ver).Set(1)
|
||||
}
|
||||
|
||||
func (m statisticMetrics) register() {
|
||||
prometheus.MustRegister(m.droppedLogs)
|
||||
}
|
||||
|
||||
func (m statisticMetrics) DroppedLogsInc() {
|
||||
m.droppedLogs.Inc()
|
||||
}
|
||||
|
||||
// NewAppMetrics creates an instance of application.
|
||||
func NewAppMetrics() *AppMetrics {
|
||||
stateMetric := newStateMetrics()
|
||||
stateMetric.register()
|
||||
|
||||
statisticMetric := newStatisticMetrics()
|
||||
statisticMetric.register()
|
||||
|
||||
return &AppMetrics{
|
||||
stateMetrics: *stateMetric,
|
||||
stateMetrics: *stateMetric,
|
||||
statisticMetrics: *statisticMetric,
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -58,6 +80,12 @@ func newStateMetrics() *stateMetrics {
|
|||
}
|
||||
}
|
||||
|
||||
func newStatisticMetrics() *statisticMetrics {
|
||||
return &statisticMetrics{
|
||||
droppedLogs: mustNewCounter(appMetricsDesc[statisticSubsystem][droppedLogs]),
|
||||
}
|
||||
}
|
||||
|
||||
// NewPrometheusService creates a new service for gathering prometheus metrics.
|
||||
func NewPrometheusService(log *zap.Logger, cfg Config) *Service {
|
||||
if log == nil {
|
||||
|
|
Loading…
Reference in a new issue
https://git.frostfs.info/TrueCloudLab/frostfs-s3-gw/pulls/495/files introduces
interval
as well, what you think about it?