From 2f7a73ee36de51d0671bf123030ef15ce9a5d96d Mon Sep 17 00:00:00 2001 From: Denis Kirillov Date: Mon, 23 Sep 2024 17:38:40 +0300 Subject: [PATCH] [#20] Support logging with tags Allow to log additionally "tagged" log entries. Signed-off-by: Denis Kirillov --- cmd/s3-lifecycler/logger.go | 48 +++++++++++++++++++++++++++++----- cmd/s3-lifecycler/settings.go | 1 + docs/configuration.md | 29 +++++++++++++++----- internal/lifecycle/executor.go | 10 ++++--- internal/logs/logs.go | 10 +++++-- 5 files changed, 80 insertions(+), 18 deletions(-) diff --git a/cmd/s3-lifecycler/logger.go b/cmd/s3-lifecycler/logger.go index 9ca5be3..8eb0f89 100644 --- a/cmd/s3-lifecycler/logger.go +++ b/cmd/s3-lifecycler/logger.go @@ -2,8 +2,10 @@ package main import ( "fmt" + "strings" "git.frostfs.info/TrueCloudLab/zapjournald" + "github.com/nspcc-dev/neo-go/cli/options" "github.com/spf13/viper" "github.com/ssgreg/journald" "go.uber.org/zap" @@ -33,18 +35,32 @@ func pickLogger(v *viper.Viper) *Logger { func newStdoutLogger(v *viper.Viper) *Logger { c := newZapLogConfig(v) - c.Encoding = "console" - l, err := c.Build( - zap.AddStacktrace(zap.NewAtomicLevelAt(zap.FatalLevel)), - ) + out, errSink, err := openZapSinks(c) if err != nil { - panic(fmt.Sprintf("build zap logger instance: %v", err)) + panic(fmt.Sprintf("open zap sinks: %v", err.Error())) } + core := zapcore.NewCore(zapcore.NewConsoleEncoder(c.EncoderConfig), out, c.Level) + core = options.NewFilteringCore(core, filteringLogOption(v)) + l := zap.New(core, zap.AddStacktrace(zap.NewAtomicLevelAt(zap.FatalLevel)), zap.ErrorOutput(errSink)) + return &Logger{logger: l, lvl: c.Level} } +func openZapSinks(cfg zap.Config) (zapcore.WriteSyncer, zapcore.WriteSyncer, error) { + sink, closeOut, err := zap.Open(cfg.OutputPaths...) + if err != nil { + return nil, nil, err + } + errSink, _, err := zap.Open(cfg.ErrorOutputPaths...) + if err != nil { + closeOut() + return nil, nil, err + } + return sink, errSink, nil +} + func newJournaldLogger(v *viper.Viper) *Logger { c := newZapLogConfig(v) @@ -53,7 +69,8 @@ func newJournaldLogger(v *viper.Viper) *Logger { encoder := zapjournald.NewPartialEncoder(zapcore.NewConsoleEncoder(c.EncoderConfig), zapjournald.SyslogFields) core := zapjournald.NewCore(c.Level, encoder, &journald.Journal{}, zapjournald.SyslogFields) - coreWithContext := core.With([]zapcore.Field{ + filteringCore := options.NewFilteringCore(core, filteringLogOption(v)) + coreWithContext := filteringCore.With([]zapcore.Field{ zapjournald.SyslogFacility(zapjournald.LogDaemon), zapjournald.SyslogIdentifier(), zapjournald.SyslogPid(), @@ -84,6 +101,25 @@ func newZapLogConfig(v *viper.Viper) zap.Config { return c } +func filteringLogOption(v *viper.Viper) options.FilterFunc { + tags := v.GetStringSlice(cfgLoggerTags) + + return func(entry zapcore.Entry) bool { + if !strings.HasPrefix(entry.Message, "tag:") { + return true + } + + msg := entry.Message[4:] // len("tag:") + for _, tag := range tags { + if msg == tag { + return true + } + } + + return false + } +} + func getLogLevel(lvlStr string) (zapcore.Level, error) { var lvl zapcore.Level err := lvl.UnmarshalText([]byte(lvlStr)) diff --git a/cmd/s3-lifecycler/settings.go b/cmd/s3-lifecycler/settings.go index cb650c1..2298b2b 100644 --- a/cmd/s3-lifecycler/settings.go +++ b/cmd/s3-lifecycler/settings.go @@ -41,6 +41,7 @@ const ( cfgLoggerSamplingEnabled = "logger.sampling.enabled" cfgLoggerSamplingInitial = "logger.sampling.initial" cfgLoggerSamplingThereafter = "logger.sampling.thereafter" + cfgLoggerTags = "logger.tags" // Morph. cfgMorphRPCEndpointPrefixTmpl = "morph.rpc_endpoint.%d." diff --git a/docs/configuration.md b/docs/configuration.md index 00c3f96..5f066b1 100644 --- a/docs/configuration.md +++ b/docs/configuration.md @@ -53,15 +53,30 @@ logger: enabled: false initial: 100 thereafter: 100 + tags: + - "expiration_delete_object" + - "multipart_delete_object" ``` -| Parameter | Type | SIGHUP reload | Default value | Description | -|-----------------------|----------|---------------|---------------|--------------------------------------------------------------------------------------------------| -| `level` | `string` | yes | `info` | Logging level. Possible values: `debug`, `info`, `warn`, `dpanic`, `panic`, `fatal`. | -| `destination` | `string` | no | `stdout` | Destination for logger: `stdout` or `journald` | -| `sampling.enabled` | `bool` | no | `false` | Enable sampling. | -| `sampling.initial` | `int` | no | `100` | Logs firth N of the same (level and message) log entries each second. | -| `sampling.thereafter` | `int` | no | `100` | Logs every Mth of the same (level and message) log entries after first N entries in that second. | +| Parameter | Type | SIGHUP reload | Default value | Description | +|-----------------------|------------|---------------|---------------|--------------------------------------------------------------------------------------------------| +| `level` | `string` | yes | `info` | Logging level. Possible values: `debug`, `info`, `warn`, `dpanic`, `panic`, `fatal`. | +| `destination` | `string` | no | `stdout` | Destination for logger: `stdout` or `journald` | +| `sampling.enabled` | `bool` | no | `false` | Enable sampling. | +| `sampling.initial` | `int` | no | `100` | Logs firth N of the same (level and message) log entries each second. | +| `sampling.thereafter` | `int` | no | `100` | Logs every Mth of the same (level and message) log entries after first N entries in that second. | +| `sampling.tags` | `[]string` | no | | Tagged log entries that should be additionally logged (available tags see in the next section). | + +## Tags + +There are additional log entries that can hurt performance and can be additionally logged by using `logger.tags` +parameter. Available tags: + +* `expiration_delete_object` +* `expiration_process_version` +* `expiration_remove_version` +* `multipart_delete_object` +* `multipart_process_upload` # `pprof` section diff --git a/internal/lifecycle/executor.go b/internal/lifecycle/executor.go index 78289b3..b14e820 100644 --- a/internal/lifecycle/executor.go +++ b/internal/lifecycle/executor.go @@ -179,6 +179,7 @@ func (e *Executor) abortMultiparts(ctx context.Context, ni *netmap.NetworkInfo, } for _, multipart := range multiparts { + e.log.Debug(logs.TagMultipartProcessUpload, zap.String("key", multipart.Key), zap.String("upload_id", multipart.UploadID), zap.Uint64("node_id", multipart.ID)) if !matcherFn(multipart) { continue } @@ -298,6 +299,9 @@ func (e *Executor) expireObjects(ctx context.Context, ni *netmap.NetworkInfo, ru return fmt.Errorf("get node version from stream: %w", err) } + e.log.Debug(logs.TagExpirationProcessVersion, zap.Stringer("cid", bktInfo.CID), zap.Stringer("oid", nodeVersion.OID), + zap.String("filepath", nodeVersion.FilePath), zap.Uint64("node_id", nodeVersion.ID)) + if nodeVersion.FilePath != latestObjName { if err = e.expireObject(ctx, versions, bktInfo, ni, matcherFn, settings); err != nil { e.log.Warn(logs.FailedToExpireObject, zap.String("object", latestObjName), zap.Error(err)) @@ -579,7 +583,7 @@ func (e *Executor) deleteObject(ctx context.Context, version *data.NodeVersion, addr.SetContainer(bktInfo.CID) addr.SetObject(version.OID) - e.log.Debug(logs.DeleteObject, zap.String("address", addr.EncodeToString()), zap.String("filepath", version.FilePath)) + e.log.Debug(logs.TagExpirationDeleteObject, zap.Stringer("address", addr), zap.String("filepath", version.FilePath)) if err := e.frostfs.DeleteObject(ctx, addr); err != nil && !isNotFound(err) { e.log.Warn(logs.DeleteObjectVersionFromStorage, zap.String("key", version.FilePath), zap.String("address", addr.EncodeToString()), zap.Error(err)) @@ -620,7 +624,7 @@ func (e *Executor) addDeleteMarker(ctx context.Context, version *data.NodeVersio } func (e *Executor) removeVersion(ctx context.Context, version *data.NodeVersion, bktInfo *data.BucketInfo) { - e.log.Debug(logs.RemoveVersion, zap.String("cid", bktInfo.CID.EncodeToString()), zap.String("oid", version.OID.EncodeToString()), + e.log.Debug(logs.TagExpirationRemoveVersion, zap.Stringer("cid", bktInfo.CID), zap.Stringer("oid", version.OID), zap.String("filepath", version.FilePath), zap.Uint64("node_id", version.ID)) if err := e.tree.RemoveVersion(ctx, bktInfo, version.ID); err != nil { e.log.Warn(logs.DeleteObjectVersionFromTree, zap.String("key", version.FilePath), @@ -706,7 +710,7 @@ func (e *Executor) abortMultipart(ctx context.Context, bktInfo *data.BucketInfo, addr.SetContainer(bktInfo.CID) for _, part := range parts { addr.SetObject(part.OID) - e.log.Debug(logs.DeleteObject, zap.String("address", addr.EncodeToString()), zap.Int("part", part.Number)) + e.log.Debug(logs.TagMultipartDeleteObject, zap.Stringer("address", addr), zap.Int("part", part.Number)) if err = e.frostfs.DeleteObject(ctx, addr); err != nil { return fmt.Errorf("delete object '%s': %w", addr.EncodeToString(), err) } diff --git a/internal/logs/logs.go b/internal/logs/logs.go index ff03a6d..65c28db 100644 --- a/internal/logs/logs.go +++ b/internal/logs/logs.go @@ -65,7 +65,13 @@ const ( EpochMismatched = "epoch mismatched" UnknownCredentialSource = "unknown credential source to use" AbortMultipart = "abort multipart" - DeleteObject = "delete object" - RemoveVersion = "remove version" ProcessingJob = "processing job" ) + +const ( + TagExpirationDeleteObject = "tag:expiration_delete_object" + TagExpirationProcessVersion = "tag:expiration_process_version" + TagExpirationRemoveVersion = "tag:expiration_remove_version" + TagMultipartDeleteObject = "tag:multipart_delete_object" + TagMultipartProcessUpload = "tag:multipart_process_upload" +)