[#20] Support logging with tags
Allow to log additionally "tagged" log entries. Signed-off-by: Denis Kirillov <d.kirillov@yadro.com>
This commit is contained in:
parent
4f250eba42
commit
2f7a73ee36
5 changed files with 80 additions and 18 deletions
|
@ -2,8 +2,10 @@ package main
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"fmt"
|
"fmt"
|
||||||
|
"strings"
|
||||||
|
|
||||||
"git.frostfs.info/TrueCloudLab/zapjournald"
|
"git.frostfs.info/TrueCloudLab/zapjournald"
|
||||||
|
"github.com/nspcc-dev/neo-go/cli/options"
|
||||||
"github.com/spf13/viper"
|
"github.com/spf13/viper"
|
||||||
"github.com/ssgreg/journald"
|
"github.com/ssgreg/journald"
|
||||||
"go.uber.org/zap"
|
"go.uber.org/zap"
|
||||||
|
@ -33,18 +35,32 @@ func pickLogger(v *viper.Viper) *Logger {
|
||||||
|
|
||||||
func newStdoutLogger(v *viper.Viper) *Logger {
|
func newStdoutLogger(v *viper.Viper) *Logger {
|
||||||
c := newZapLogConfig(v)
|
c := newZapLogConfig(v)
|
||||||
c.Encoding = "console"
|
|
||||||
|
|
||||||
l, err := c.Build(
|
out, errSink, err := openZapSinks(c)
|
||||||
zap.AddStacktrace(zap.NewAtomicLevelAt(zap.FatalLevel)),
|
|
||||||
)
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
panic(fmt.Sprintf("build zap logger instance: %v", err))
|
panic(fmt.Sprintf("open zap sinks: %v", err.Error()))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
core := zapcore.NewCore(zapcore.NewConsoleEncoder(c.EncoderConfig), out, c.Level)
|
||||||
|
core = options.NewFilteringCore(core, filteringLogOption(v))
|
||||||
|
l := zap.New(core, zap.AddStacktrace(zap.NewAtomicLevelAt(zap.FatalLevel)), zap.ErrorOutput(errSink))
|
||||||
|
|
||||||
return &Logger{logger: l, lvl: c.Level}
|
return &Logger{logger: l, lvl: c.Level}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func openZapSinks(cfg zap.Config) (zapcore.WriteSyncer, zapcore.WriteSyncer, error) {
|
||||||
|
sink, closeOut, err := zap.Open(cfg.OutputPaths...)
|
||||||
|
if err != nil {
|
||||||
|
return nil, nil, err
|
||||||
|
}
|
||||||
|
errSink, _, err := zap.Open(cfg.ErrorOutputPaths...)
|
||||||
|
if err != nil {
|
||||||
|
closeOut()
|
||||||
|
return nil, nil, err
|
||||||
|
}
|
||||||
|
return sink, errSink, nil
|
||||||
|
}
|
||||||
|
|
||||||
func newJournaldLogger(v *viper.Viper) *Logger {
|
func newJournaldLogger(v *viper.Viper) *Logger {
|
||||||
c := newZapLogConfig(v)
|
c := newZapLogConfig(v)
|
||||||
|
|
||||||
|
@ -53,7 +69,8 @@ func newJournaldLogger(v *viper.Viper) *Logger {
|
||||||
encoder := zapjournald.NewPartialEncoder(zapcore.NewConsoleEncoder(c.EncoderConfig), zapjournald.SyslogFields)
|
encoder := zapjournald.NewPartialEncoder(zapcore.NewConsoleEncoder(c.EncoderConfig), zapjournald.SyslogFields)
|
||||||
|
|
||||||
core := zapjournald.NewCore(c.Level, encoder, &journald.Journal{}, zapjournald.SyslogFields)
|
core := zapjournald.NewCore(c.Level, encoder, &journald.Journal{}, zapjournald.SyslogFields)
|
||||||
coreWithContext := core.With([]zapcore.Field{
|
filteringCore := options.NewFilteringCore(core, filteringLogOption(v))
|
||||||
|
coreWithContext := filteringCore.With([]zapcore.Field{
|
||||||
zapjournald.SyslogFacility(zapjournald.LogDaemon),
|
zapjournald.SyslogFacility(zapjournald.LogDaemon),
|
||||||
zapjournald.SyslogIdentifier(),
|
zapjournald.SyslogIdentifier(),
|
||||||
zapjournald.SyslogPid(),
|
zapjournald.SyslogPid(),
|
||||||
|
@ -84,6 +101,25 @@ func newZapLogConfig(v *viper.Viper) zap.Config {
|
||||||
return c
|
return c
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func filteringLogOption(v *viper.Viper) options.FilterFunc {
|
||||||
|
tags := v.GetStringSlice(cfgLoggerTags)
|
||||||
|
|
||||||
|
return func(entry zapcore.Entry) bool {
|
||||||
|
if !strings.HasPrefix(entry.Message, "tag:") {
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
|
||||||
|
msg := entry.Message[4:] // len("tag:")
|
||||||
|
for _, tag := range tags {
|
||||||
|
if msg == tag {
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func getLogLevel(lvlStr string) (zapcore.Level, error) {
|
func getLogLevel(lvlStr string) (zapcore.Level, error) {
|
||||||
var lvl zapcore.Level
|
var lvl zapcore.Level
|
||||||
err := lvl.UnmarshalText([]byte(lvlStr))
|
err := lvl.UnmarshalText([]byte(lvlStr))
|
||||||
|
|
|
@ -41,6 +41,7 @@ const (
|
||||||
cfgLoggerSamplingEnabled = "logger.sampling.enabled"
|
cfgLoggerSamplingEnabled = "logger.sampling.enabled"
|
||||||
cfgLoggerSamplingInitial = "logger.sampling.initial"
|
cfgLoggerSamplingInitial = "logger.sampling.initial"
|
||||||
cfgLoggerSamplingThereafter = "logger.sampling.thereafter"
|
cfgLoggerSamplingThereafter = "logger.sampling.thereafter"
|
||||||
|
cfgLoggerTags = "logger.tags"
|
||||||
|
|
||||||
// Morph.
|
// Morph.
|
||||||
cfgMorphRPCEndpointPrefixTmpl = "morph.rpc_endpoint.%d."
|
cfgMorphRPCEndpointPrefixTmpl = "morph.rpc_endpoint.%d."
|
||||||
|
|
|
@ -53,15 +53,30 @@ logger:
|
||||||
enabled: false
|
enabled: false
|
||||||
initial: 100
|
initial: 100
|
||||||
thereafter: 100
|
thereafter: 100
|
||||||
|
tags:
|
||||||
|
- "expiration_delete_object"
|
||||||
|
- "multipart_delete_object"
|
||||||
```
|
```
|
||||||
|
|
||||||
| Parameter | Type | SIGHUP reload | Default value | Description |
|
| Parameter | Type | SIGHUP reload | Default value | Description |
|
||||||
|-----------------------|----------|---------------|---------------|--------------------------------------------------------------------------------------------------|
|
|-----------------------|------------|---------------|---------------|--------------------------------------------------------------------------------------------------|
|
||||||
| `level` | `string` | yes | `info` | Logging level. Possible values: `debug`, `info`, `warn`, `dpanic`, `panic`, `fatal`. |
|
| `level` | `string` | yes | `info` | Logging level. Possible values: `debug`, `info`, `warn`, `dpanic`, `panic`, `fatal`. |
|
||||||
| `destination` | `string` | no | `stdout` | Destination for logger: `stdout` or `journald` |
|
| `destination` | `string` | no | `stdout` | Destination for logger: `stdout` or `journald` |
|
||||||
| `sampling.enabled` | `bool` | no | `false` | Enable sampling. |
|
| `sampling.enabled` | `bool` | no | `false` | Enable sampling. |
|
||||||
| `sampling.initial` | `int` | no | `100` | Logs firth N of the same (level and message) log entries each second. |
|
| `sampling.initial` | `int` | no | `100` | Logs firth N of the same (level and message) log entries each second. |
|
||||||
| `sampling.thereafter` | `int` | no | `100` | Logs every Mth of the same (level and message) log entries after first N entries in that second. |
|
| `sampling.thereafter` | `int` | no | `100` | Logs every Mth of the same (level and message) log entries after first N entries in that second. |
|
||||||
|
| `sampling.tags` | `[]string` | no | | Tagged log entries that should be additionally logged (available tags see in the next section). |
|
||||||
|
|
||||||
|
## Tags
|
||||||
|
|
||||||
|
There are additional log entries that can hurt performance and can be additionally logged by using `logger.tags`
|
||||||
|
parameter. Available tags:
|
||||||
|
|
||||||
|
* `expiration_delete_object`
|
||||||
|
* `expiration_process_version`
|
||||||
|
* `expiration_remove_version`
|
||||||
|
* `multipart_delete_object`
|
||||||
|
* `multipart_process_upload`
|
||||||
|
|
||||||
# `pprof` section
|
# `pprof` section
|
||||||
|
|
||||||
|
|
|
@ -179,6 +179,7 @@ func (e *Executor) abortMultiparts(ctx context.Context, ni *netmap.NetworkInfo,
|
||||||
}
|
}
|
||||||
|
|
||||||
for _, multipart := range multiparts {
|
for _, multipart := range multiparts {
|
||||||
|
e.log.Debug(logs.TagMultipartProcessUpload, zap.String("key", multipart.Key), zap.String("upload_id", multipart.UploadID), zap.Uint64("node_id", multipart.ID))
|
||||||
if !matcherFn(multipart) {
|
if !matcherFn(multipart) {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
@ -298,6 +299,9 @@ func (e *Executor) expireObjects(ctx context.Context, ni *netmap.NetworkInfo, ru
|
||||||
return fmt.Errorf("get node version from stream: %w", err)
|
return fmt.Errorf("get node version from stream: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
e.log.Debug(logs.TagExpirationProcessVersion, zap.Stringer("cid", bktInfo.CID), zap.Stringer("oid", nodeVersion.OID),
|
||||||
|
zap.String("filepath", nodeVersion.FilePath), zap.Uint64("node_id", nodeVersion.ID))
|
||||||
|
|
||||||
if nodeVersion.FilePath != latestObjName {
|
if nodeVersion.FilePath != latestObjName {
|
||||||
if err = e.expireObject(ctx, versions, bktInfo, ni, matcherFn, settings); err != nil {
|
if err = e.expireObject(ctx, versions, bktInfo, ni, matcherFn, settings); err != nil {
|
||||||
e.log.Warn(logs.FailedToExpireObject, zap.String("object", latestObjName), zap.Error(err))
|
e.log.Warn(logs.FailedToExpireObject, zap.String("object", latestObjName), zap.Error(err))
|
||||||
|
@ -579,7 +583,7 @@ func (e *Executor) deleteObject(ctx context.Context, version *data.NodeVersion,
|
||||||
addr.SetContainer(bktInfo.CID)
|
addr.SetContainer(bktInfo.CID)
|
||||||
addr.SetObject(version.OID)
|
addr.SetObject(version.OID)
|
||||||
|
|
||||||
e.log.Debug(logs.DeleteObject, zap.String("address", addr.EncodeToString()), zap.String("filepath", version.FilePath))
|
e.log.Debug(logs.TagExpirationDeleteObject, zap.Stringer("address", addr), zap.String("filepath", version.FilePath))
|
||||||
if err := e.frostfs.DeleteObject(ctx, addr); err != nil && !isNotFound(err) {
|
if err := e.frostfs.DeleteObject(ctx, addr); err != nil && !isNotFound(err) {
|
||||||
e.log.Warn(logs.DeleteObjectVersionFromStorage, zap.String("key", version.FilePath),
|
e.log.Warn(logs.DeleteObjectVersionFromStorage, zap.String("key", version.FilePath),
|
||||||
zap.String("address", addr.EncodeToString()), zap.Error(err))
|
zap.String("address", addr.EncodeToString()), zap.Error(err))
|
||||||
|
@ -620,7 +624,7 @@ func (e *Executor) addDeleteMarker(ctx context.Context, version *data.NodeVersio
|
||||||
}
|
}
|
||||||
|
|
||||||
func (e *Executor) removeVersion(ctx context.Context, version *data.NodeVersion, bktInfo *data.BucketInfo) {
|
func (e *Executor) removeVersion(ctx context.Context, version *data.NodeVersion, bktInfo *data.BucketInfo) {
|
||||||
e.log.Debug(logs.RemoveVersion, zap.String("cid", bktInfo.CID.EncodeToString()), zap.String("oid", version.OID.EncodeToString()),
|
e.log.Debug(logs.TagExpirationRemoveVersion, zap.Stringer("cid", bktInfo.CID), zap.Stringer("oid", version.OID),
|
||||||
zap.String("filepath", version.FilePath), zap.Uint64("node_id", version.ID))
|
zap.String("filepath", version.FilePath), zap.Uint64("node_id", version.ID))
|
||||||
if err := e.tree.RemoveVersion(ctx, bktInfo, version.ID); err != nil {
|
if err := e.tree.RemoveVersion(ctx, bktInfo, version.ID); err != nil {
|
||||||
e.log.Warn(logs.DeleteObjectVersionFromTree, zap.String("key", version.FilePath),
|
e.log.Warn(logs.DeleteObjectVersionFromTree, zap.String("key", version.FilePath),
|
||||||
|
@ -706,7 +710,7 @@ func (e *Executor) abortMultipart(ctx context.Context, bktInfo *data.BucketInfo,
|
||||||
addr.SetContainer(bktInfo.CID)
|
addr.SetContainer(bktInfo.CID)
|
||||||
for _, part := range parts {
|
for _, part := range parts {
|
||||||
addr.SetObject(part.OID)
|
addr.SetObject(part.OID)
|
||||||
e.log.Debug(logs.DeleteObject, zap.String("address", addr.EncodeToString()), zap.Int("part", part.Number))
|
e.log.Debug(logs.TagMultipartDeleteObject, zap.Stringer("address", addr), zap.Int("part", part.Number))
|
||||||
if err = e.frostfs.DeleteObject(ctx, addr); err != nil {
|
if err = e.frostfs.DeleteObject(ctx, addr); err != nil {
|
||||||
return fmt.Errorf("delete object '%s': %w", addr.EncodeToString(), err)
|
return fmt.Errorf("delete object '%s': %w", addr.EncodeToString(), err)
|
||||||
}
|
}
|
||||||
|
|
|
@ -65,7 +65,13 @@ const (
|
||||||
EpochMismatched = "epoch mismatched"
|
EpochMismatched = "epoch mismatched"
|
||||||
UnknownCredentialSource = "unknown credential source to use"
|
UnknownCredentialSource = "unknown credential source to use"
|
||||||
AbortMultipart = "abort multipart"
|
AbortMultipart = "abort multipart"
|
||||||
DeleteObject = "delete object"
|
|
||||||
RemoveVersion = "remove version"
|
|
||||||
ProcessingJob = "processing job"
|
ProcessingJob = "processing job"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
const (
|
||||||
|
TagExpirationDeleteObject = "tag:expiration_delete_object"
|
||||||
|
TagExpirationProcessVersion = "tag:expiration_process_version"
|
||||||
|
TagExpirationRemoveVersion = "tag:expiration_remove_version"
|
||||||
|
TagMultipartDeleteObject = "tag:multipart_delete_object"
|
||||||
|
TagMultipartProcessUpload = "tag:multipart_process_upload"
|
||||||
|
)
|
||||||
|
|
Loading…
Reference in a new issue