From 7705035e8b84b49c3571aa4366be24dfc1e6609b Mon Sep 17 00:00:00 2001 From: Aleksey Kravchenko Date: Tue, 21 Jan 2025 09:44:13 +0300 Subject: [PATCH] [#195] Add tags support Signed-off-by: Aleksey Kravchenko --- cmd/http-gw/app.go | 146 ++++++++++++++++------- cmd/http-gw/logger.go | 182 +++++++++++++++++++++++++++++ cmd/http-gw/settings.go | 164 ++++++++------------------ config/config.yaml | 4 + docs/gate-configuration.md | 31 +++++ internal/cache/buckets.go | 4 +- internal/cache/netmap.go | 2 +- internal/handler/browse.go | 17 ++- internal/handler/download.go | 20 ++-- internal/handler/filter.go | 3 +- internal/handler/handler.go | 27 +++-- internal/handler/head.go | 9 +- internal/handler/multipart.go | 5 +- internal/handler/multipart_test.go | 5 +- internal/handler/reader.go | 5 +- internal/handler/upload.go | 15 ++- internal/handler/utils.go | 5 +- internal/logs/logs.go | 18 ++- internal/net/event_handler.go | 6 +- internal/service/frostfs/source.go | 2 +- metrics/service.go | 12 +- 21 files changed, 465 insertions(+), 217 deletions(-) create mode 100644 cmd/http-gw/logger.go diff --git a/cmd/http-gw/app.go b/cmd/http-gw/app.go index e34386c..449c6d7 100644 --- a/cmd/http-gw/app.go +++ b/cmd/http-gw/app.go @@ -44,6 +44,7 @@ import ( "github.com/valyala/fasthttp" "go.opentelemetry.io/otel/trace" "go.uber.org/zap" + "go.uber.org/zap/zapcore" "golang.org/x/exp/slices" ) @@ -51,7 +52,7 @@ type ( app struct { ctx context.Context log *zap.Logger - logLevel zap.AtomicLevel + tagsConfig *tagsConfig pool *pool.Pool treePool *treepool.Pool key *keys.PrivateKey @@ -113,6 +114,10 @@ type ( enableFilepathFallback bool } + tagsConfig struct { + tagLogs sync.Map + } + CORS struct { AllowOrigin string AllowMethods []string @@ -123,13 +128,61 @@ type ( } ) +func newTagsConfig(v *viper.Viper) *tagsConfig { + var t tagsConfig + if err := t.update(v); err != nil { + // panic here is analogue of the similar panic during common log level initialization. + panic(err.Error()) + } + return &t +} + +func (t *tagsConfig) LevelEnabled(tag string, tgtLevel zapcore.Level) bool { + lvl, ok := t.tagLogs.Load(tag) + if !ok { + return false + } + + return lvl.(zapcore.Level).Enabled(tgtLevel) +} + +func (t *tagsConfig) update(cfg *viper.Viper) error { + tags, err := fetchLogTagsConfig(cfg) + if err != nil { + return err + } + + t.tagLogs.Range(func(key, value any) bool { + k := key.(string) + v := value.(zapcore.Level) + + if lvl, ok := tags[k]; ok { + if lvl != v { + t.tagLogs.Store(key, lvl) + } + } else { + t.tagLogs.Delete(key) + delete(tags, k) + } + return true + }) + + for k, v := range tags { + t.tagLogs.Store(k, v) + } + + return nil +} + func newApp(ctx context.Context, v *viper.Viper) App { logSettings := &loggerSettings{} - log := pickLogger(v, logSettings) + tagConfig := newTagsConfig(v) + log := pickLogger(v, logSettings, tagConfig) a := &app{ ctx: ctx, log: log.logger, + tagsConfig: tagConfig, cfg: v, loggerSettings: logSettings, webServer: new(fasthttp.Server), @@ -319,7 +372,7 @@ func (a *app) initResolver() { var err error a.resolver, err = resolver.NewContainerResolver(a.getResolverConfig()) if err != nil { - a.log.Fatal(logs.FailedToCreateResolver, zap.Error(err)) + a.log.Fatal(logs.FailedToCreateResolver, zap.Error(err), logs.TagField(logs.TagApp)) } } @@ -333,11 +386,12 @@ func (a *app) getResolverConfig() ([]string, *resolver.Config) { order := a.cfg.GetStringSlice(cfgResolveOrder) if resolveCfg.RPCAddress == "" { order = remove(order, resolver.NNSResolver) - a.log.Warn(logs.ResolverNNSWontBeUsedSinceRPCEndpointIsntProvided) + a.log.Warn(logs.ResolverNNSWontBeUsedSinceRPCEndpointIsntProvided, logs.TagField(logs.TagConfig)) } if len(order) == 0 { - a.log.Info(logs.ContainerResolverWillBeDisabledBecauseOfResolversResolverOrderIsEmpty) + a.log.Info(logs.ContainerResolverWillBeDisabledBecauseOfResolversResolverOrderIsEmpty, + logs.TagField(logs.TagConfig)) } return order, resolveCfg @@ -352,7 +406,7 @@ func (a *app) initMetrics() { func newGateMetrics(logger *zap.Logger, provider *metrics.GateMetrics, enabled bool) *gateMetrics { if !enabled { - logger.Warn(logs.MetricsAreDisabled) + logger.Warn(logs.MetricsAreDisabled, logs.TagField(logs.TagConfig)) } return &gateMetrics{ logger: logger, @@ -370,7 +424,7 @@ func (m *gateMetrics) isEnabled() bool { func (m *gateMetrics) SetEnabled(enabled bool) { if !enabled { - m.logger.Warn(logs.MetricsAreDisabled) + m.logger.Warn(logs.MetricsAreDisabled, logs.TagField(logs.TagConfig)) } m.mu.Lock() @@ -433,7 +487,7 @@ func getFrostFSKey(cfg *viper.Viper, log *zap.Logger) (*keys.PrivateKey, error) walletPath := cfg.GetString(cfgWalletPath) if len(walletPath) == 0 { - log.Info(logs.NoWalletPathSpecifiedCreatingEphemeralKeyAutomaticallyForThisRun) + log.Info(logs.NoWalletPathSpecifiedCreatingEphemeralKeyAutomaticallyForThisRun, logs.TagField(logs.TagConfig)) key, err := keys.NewPrivateKey() if err != nil { return nil, err @@ -490,7 +544,10 @@ func getKeyFromWallet(w *wallet.Wallet, addrStr string, password *string) (*keys } func (a *app) Wait() { - a.log.Info(logs.StartingApplication, zap.String("app_name", "frostfs-http-gw"), zap.String("version", Version)) + a.log.Info(logs.StartingApplication, + zap.String("app_name", "frostfs-http-gw"), + zap.String("version", Version), + logs.TagField(logs.TagApp)) a.metrics.SetVersion(Version) a.setHealthStatus() @@ -521,10 +578,10 @@ func (a *app) Serve() { for i := range servs { go func(i int) { - a.log.Info(logs.StartingServer, zap.String("address", servs[i].Address())) + a.log.Info(logs.StartingServer, zap.String("address", servs[i].Address()), logs.TagField(logs.TagApp)) if err := a.webServer.Serve(servs[i].Listener()); err != nil && err != http.ErrServerClosed { a.metrics.MarkUnhealthy(servs[i].Address()) - a.log.Fatal(logs.ListenAndServe, zap.Error(err)) + a.log.Fatal(logs.ListenAndServe, zap.Error(err), logs.TagField(logs.TagApp)) } }(i) } @@ -546,7 +603,7 @@ LOOP: } } - a.log.Info(logs.ShuttingDownWebServer, zap.Error(a.webServer.Shutdown())) + a.log.Info(logs.ShuttingDownWebServer, zap.Error(a.webServer.Shutdown()), logs.TagField(logs.TagApp)) a.metrics.Shutdown() a.stopServices() @@ -556,7 +613,7 @@ LOOP: func (a *app) initWorkerPool() *ants.Pool { workerPool, err := ants.NewPool(a.settings.workerPoolSize) if err != nil { - a.log.Fatal(logs.FailedToCreateWorkerPool, zap.Error(err)) + a.log.Fatal(logs.FailedToCreateWorkerPool, zap.Error(err), logs.TagField(logs.TagApp)) } return workerPool } @@ -567,37 +624,37 @@ func (a *app) shutdownTracing() { defer cancel() if err := tracing.Shutdown(shdnCtx); err != nil { - a.log.Warn(logs.FailedToShutdownTracing, zap.Error(err)) + a.log.Warn(logs.FailedToShutdownTracing, zap.Error(err), logs.TagField(logs.TagApp)) } } func (a *app) configReload(ctx context.Context) { - a.log.Info(logs.SIGHUPConfigReloadStarted) + log := a.log.With(logs.TagField(logs.TagConfig)) + + log.Info(logs.SIGHUPConfigReloadStarted) if !a.cfg.IsSet(cmdConfig) && !a.cfg.IsSet(cmdConfigDir) { - a.log.Warn(logs.FailedToReloadConfigBecauseItsMissed) + log.Warn(logs.FailedToReloadConfigBecauseItsMissed) return } if err := readInConfig(a.cfg); err != nil { - a.log.Warn(logs.FailedToReloadConfig, zap.Error(err)) + log.Warn(logs.FailedToReloadConfig, zap.Error(err)) return } - if lvl, err := getLogLevel(a.cfg); err != nil { - a.log.Warn(logs.LogLevelWontBeUpdated, zap.Error(err)) - } else { - a.logLevel.SetLevel(lvl) + if err := a.tagsConfig.update(a.cfg); err != nil { + log.Warn(logs.TagsLogConfigWontBeUpdated, zap.Error(err)) } if err := a.settings.dialerSource.Update(fetchMultinetConfig(a.cfg, a.log)); err != nil { - a.log.Warn(logs.MultinetConfigWontBeUpdated, zap.Error(err)) + log.Warn(logs.MultinetConfigWontBeUpdated, zap.Error(err)) } if err := a.resolver.UpdateResolvers(a.getResolverConfig()); err != nil { - a.log.Warn(logs.FailedToUpdateResolvers, zap.Error(err)) + log.Warn(logs.FailedToUpdateResolvers, zap.Error(err)) } if err := a.updateServers(); err != nil { - a.log.Warn(logs.FailedToReloadServerParameters, zap.Error(err)) + log.Warn(logs.FailedToReloadServerParameters, zap.Error(err)) } a.setRuntimeParameters() @@ -611,7 +668,7 @@ func (a *app) configReload(ctx context.Context) { a.initTracing(ctx) a.setHealthStatus() - a.log.Info(logs.SIGHUPConfigReloadCompleted) + log.Info(logs.SIGHUPConfigReloadCompleted) } func (a *app) startServices() { @@ -647,18 +704,18 @@ func (a *app) configureRouter(h *handler.Handler) { r.POST("/upload/{cid}", a.addMiddlewares(h.Upload)) r.OPTIONS("/upload/{cid}", a.addPreflight()) - a.log.Info(logs.AddedPathUploadCid) + a.log.Info(logs.AddedPathUploadCid, logs.TagField(logs.TagApp)) r.GET("/get/{cid}/{oid:*}", a.addMiddlewares(h.DownloadByAddressOrBucketName)) r.HEAD("/get/{cid}/{oid:*}", a.addMiddlewares(h.HeadByAddressOrBucketName)) r.OPTIONS("/get/{cid}/{oid:*}", a.addPreflight()) - a.log.Info(logs.AddedPathGetCidOid) + a.log.Info(logs.AddedPathGetCidOid, logs.TagField(logs.TagApp)) r.GET("/get_by_attribute/{cid}/{attr_key}/{attr_val:*}", a.addMiddlewares(h.DownloadByAttribute)) r.HEAD("/get_by_attribute/{cid}/{attr_key}/{attr_val:*}", a.addMiddlewares(h.HeadByAttribute)) r.OPTIONS("/get_by_attribute/{cid}/{attr_key}/{attr_val:*}", a.addPreflight()) - a.log.Info(logs.AddedPathGetByAttributeCidAttrKeyAttrVal) + a.log.Info(logs.AddedPathGetByAttributeCidAttrKeyAttrVal, logs.TagField(logs.TagApp)) r.GET("/zip/{cid}/{prefix:*}", a.addMiddlewares(h.DownloadZipped)) r.OPTIONS("/zip/{cid}/{prefix:*}", a.addPreflight()) - a.log.Info(logs.AddedPathZipCidPrefix) + a.log.Info(logs.AddedPathZipCidPrefix, logs.TagField(logs.TagApp)) a.webServer.Handler = r.Handler } @@ -752,6 +809,7 @@ func (a *app) logger(h fasthttp.RequestHandler) fasthttp.RequestHandler { zap.ByteString("method", req.Method()), zap.ByteString("path", req.Path()), zap.ByteString("query", req.QueryArgs().QueryString()), + logs.TagField(logs.TagDatapath), } log.Info(logs.Request, fields...) @@ -798,7 +856,7 @@ func (a *app) tokenizer(h fasthttp.RequestHandler) fasthttp.RequestHandler { if err != nil { log := utils.GetReqLogOrDefault(reqCtx, a.log) - log.Error(logs.CouldNotFetchAndStoreBearerToken, zap.Error(err)) + log.Error(logs.CouldNotFetchAndStoreBearerToken, zap.Error(err), logs.TagField(logs.TagDatapath)) handler.ResponseError(req, "could not fetch and store bearer token: "+err.Error(), fasthttp.StatusBadRequest) return } @@ -852,6 +910,7 @@ func (a *app) initServers(ctx context.Context) { fields := []zap.Field{ zap.String("address", serverInfo.Address), zap.Bool("tls enabled", serverInfo.TLS.Enabled), zap.String("tls cert", serverInfo.TLS.CertFile), zap.String("tls key", serverInfo.TLS.KeyFile), + logs.TagField(logs.TagApp), } srv, err := newServer(ctx, serverInfo) if err != nil { @@ -867,7 +926,7 @@ func (a *app) initServers(ctx context.Context) { } if len(a.servers) == 0 { - a.log.Fatal(logs.NoHealthyServers) + a.log.Fatal(logs.NoHealthyServers, logs.TagField(logs.TagApp)) } } @@ -941,13 +1000,14 @@ func (a *app) initTracing(ctx context.Context) { if trustedCa := a.cfg.GetString(cfgTracingTrustedCa); trustedCa != "" { caBytes, err := os.ReadFile(trustedCa) if err != nil { - a.log.Warn(logs.FailedToInitializeTracing, zap.Error(err)) + a.log.Warn(logs.FailedToInitializeTracing, zap.Error(err), logs.TagField(logs.TagApp)) return } certPool := x509.NewCertPool() ok := certPool.AppendCertsFromPEM(caBytes) if !ok { - a.log.Warn(logs.FailedToInitializeTracing, zap.String("error", "can't fill cert pool by ca cert")) + a.log.Warn(logs.FailedToInitializeTracing, zap.String("error", "can't fill cert pool by ca cert"), + logs.TagField(logs.TagApp)) return } cfg.ServerCaCertPool = certPool @@ -955,24 +1015,24 @@ func (a *app) initTracing(ctx context.Context) { attributes, err := fetchTracingAttributes(a.cfg) if err != nil { - a.log.Warn(logs.FailedToInitializeTracing, zap.Error(err)) + a.log.Warn(logs.FailedToInitializeTracing, zap.Error(err), logs.TagField(logs.TagConfig)) return } cfg.Attributes = attributes updated, err := tracing.Setup(ctx, cfg) if err != nil { - a.log.Warn(logs.FailedToInitializeTracing, zap.Error(err)) + a.log.Warn(logs.FailedToInitializeTracing, zap.Error(err), logs.TagField(logs.TagApp)) } if updated { - a.log.Info(logs.TracingConfigUpdated) + a.log.Info(logs.TracingConfigUpdated, logs.TagField(logs.TagConfig)) } } func (a *app) setRuntimeParameters() { if len(os.Getenv("GOMEMLIMIT")) != 0 { // default limit < yaml limit < app env limit < GOMEMLIMIT - a.log.Warn(logs.RuntimeSoftMemoryDefinedWithGOMEMLIMIT) + a.log.Warn(logs.RuntimeSoftMemoryDefinedWithGOMEMLIMIT, logs.TagField(logs.TagApp)) return } @@ -981,7 +1041,8 @@ func (a *app) setRuntimeParameters() { if softMemoryLimit != previous { a.log.Info(logs.RuntimeSoftMemoryLimitUpdated, zap.Int64("new_value", softMemoryLimit), - zap.Int64("old_value", previous)) + zap.Int64("old_value", previous), + logs.TagField(logs.TagApp)) } } @@ -1007,28 +1068,29 @@ func (a *app) tryReconnect(ctx context.Context, sr *fasthttp.Server) bool { a.mu.Lock() defer a.mu.Unlock() - a.log.Info(logs.ServerReconnecting) + a.log.Info(logs.ServerReconnecting, logs.TagField(logs.TagApp)) var failedServers []ServerInfo for _, serverInfo := range a.unbindServers { fields := []zap.Field{ zap.String("address", serverInfo.Address), zap.Bool("tls enabled", serverInfo.TLS.Enabled), zap.String("tls cert", serverInfo.TLS.CertFile), zap.String("tls key", serverInfo.TLS.KeyFile), + logs.TagField(logs.TagApp), } srv, err := newServer(ctx, serverInfo) if err != nil { - a.log.Warn(logs.ServerReconnectFailed, zap.Error(err)) + a.log.Warn(logs.ServerReconnectFailed, zap.Error(err), logs.TagField(logs.TagApp)) failedServers = append(failedServers, serverInfo) a.metrics.MarkUnhealthy(serverInfo.Address) continue } go func() { - a.log.Info(logs.StartingServer, zap.String("address", srv.Address())) + a.log.Info(logs.StartingServer, zap.String("address", srv.Address()), logs.TagField(logs.TagApp)) a.metrics.MarkHealthy(serverInfo.Address) if err = sr.Serve(srv.Listener()); err != nil && !errors.Is(err, http.ErrServerClosed) { - a.log.Warn(logs.ListenAndServe, zap.Error(err)) + a.log.Warn(logs.ListenAndServe, zap.Error(err), logs.TagField(logs.TagApp)) a.metrics.MarkUnhealthy(serverInfo.Address) } }() diff --git a/cmd/http-gw/logger.go b/cmd/http-gw/logger.go new file mode 100644 index 0000000..b328a24 --- /dev/null +++ b/cmd/http-gw/logger.go @@ -0,0 +1,182 @@ +package main + +import ( + "fmt" + "os" + + "git.frostfs.info/TrueCloudLab/frostfs-http-gw/internal/logs" + "git.frostfs.info/TrueCloudLab/zapjournald" + "github.com/spf13/viper" + "github.com/ssgreg/journald" + "go.uber.org/zap" + "go.uber.org/zap/zapcore" +) + +func getLogLevel(v *viper.Viper) (zapcore.Level, error) { + var lvl zapcore.Level + lvlStr := v.GetString(cfgLoggerLevel) + err := lvl.UnmarshalText([]byte(lvlStr)) + if err != nil { + return lvl, fmt.Errorf("incorrect logger level configuration %s (%v), "+ + "value should be one of %v", lvlStr, err, [...]zapcore.Level{ + zapcore.DebugLevel, + zapcore.InfoLevel, + zapcore.WarnLevel, + zapcore.ErrorLevel, + zapcore.DPanicLevel, + zapcore.PanicLevel, + zapcore.FatalLevel, + }) + } + return lvl, nil +} + +var _ zapcore.Core = (*zapCoreTagFilterWrapper)(nil) + +type zapCoreTagFilterWrapper struct { + core zapcore.Core + settings TagFilterSettings + extra []zap.Field +} + +type TagFilterSettings interface { + LevelEnabled(tag string, lvl zapcore.Level) bool +} + +func (c *zapCoreTagFilterWrapper) Enabled(level zapcore.Level) bool { + return c.core.Enabled(level) +} + +func (c *zapCoreTagFilterWrapper) With(fields []zapcore.Field) zapcore.Core { + return &zapCoreTagFilterWrapper{ + core: c.core.With(fields), + settings: c.settings, + extra: append(c.extra, fields...), + } +} + +func (c *zapCoreTagFilterWrapper) Check(entry zapcore.Entry, checked *zapcore.CheckedEntry) *zapcore.CheckedEntry { + if c.core.Enabled(entry.Level) { + return checked.AddCore(entry, c) + } + return checked +} + +func (c *zapCoreTagFilterWrapper) Write(entry zapcore.Entry, fields []zapcore.Field) error { + if c.shouldSkip(entry, fields) || c.shouldSkip(entry, c.extra) { + return nil + } + + return c.core.Write(entry, fields) +} + +func (c *zapCoreTagFilterWrapper) shouldSkip(entry zapcore.Entry, fields []zap.Field) bool { + for _, field := range fields { + if field.Key == logs.TagFieldName && field.Type == zapcore.StringType { + if !c.settings.LevelEnabled(field.String, entry.Level) { + return true + } + break + } + } + + return false +} + +func (c *zapCoreTagFilterWrapper) Sync() error { + return c.core.Sync() +} + +func applyZapCoreMiddlewares(core zapcore.Core, v *viper.Viper, loggerSettings LoggerAppSettings, tagSetting TagFilterSettings) zapcore.Core { + core = &zapCoreTagFilterWrapper{ + core: core, + settings: tagSetting, + } + + if v.GetBool(cfgLoggerSamplingEnabled) { + core = zapcore.NewSamplerWithOptions(core, + v.GetDuration(cfgLoggerSamplingInterval), + v.GetInt(cfgLoggerSamplingInitial), + v.GetInt(cfgLoggerSamplingThereafter), + zapcore.SamplerHook(func(_ zapcore.Entry, dec zapcore.SamplingDecision) { + if dec&zapcore.LogDropped > 0 { + loggerSettings.DroppedLogsInc() + } + })) + } + + return core +} + +func newLogEncoder() zapcore.Encoder { + c := zap.NewProductionEncoderConfig() + c.EncodeTime = zapcore.ISO8601TimeEncoder + + return zapcore.NewConsoleEncoder(c) +} + +// newStdoutLogger constructs a zap.Logger instance for current application. +// Panics on failure. +// +// Logger is built from zap's production logging configuration with: +// - parameterized level (debug by default) +// - console encoding +// - ISO8601 time encoding +// +// Logger records a stack trace for all messages at or above fatal level. +// +// See also zapcore.Level, zap.NewProductionConfig, zap.AddStacktrace. +func newStdoutLogger(v *viper.Viper, lvl zapcore.Level, loggerSettings LoggerAppSettings, tagSetting TagFilterSettings) *Logger { + stdout := zapcore.AddSync(os.Stderr) + level := zap.NewAtomicLevelAt(lvl) + + consoleOutCore := zapcore.NewCore(newLogEncoder(), stdout, level) + consoleOutCore = applyZapCoreMiddlewares(consoleOutCore, v, loggerSettings, tagSetting) + + return &Logger{ + logger: zap.New(consoleOutCore, zap.AddStacktrace(zap.NewAtomicLevelAt(zap.FatalLevel))), + lvl: level, + } +} + +func newJournaldLogger(v *viper.Viper, lvl zapcore.Level, loggerSettings LoggerAppSettings, tagSetting TagFilterSettings) *Logger { + level := zap.NewAtomicLevelAt(lvl) + + encoder := zapjournald.NewPartialEncoder(newLogEncoder(), zapjournald.SyslogFields) + + core := zapjournald.NewCore(level, encoder, &journald.Journal{}, zapjournald.SyslogFields) + coreWithContext := core.With([]zapcore.Field{ + zapjournald.SyslogFacility(zapjournald.LogDaemon), + zapjournald.SyslogIdentifier(), + zapjournald.SyslogPid(), + }) + + coreWithContext = applyZapCoreMiddlewares(coreWithContext, v, loggerSettings, tagSetting) + + return &Logger{ + logger: zap.New(coreWithContext, zap.AddStacktrace(zap.NewAtomicLevelAt(zap.FatalLevel))), + lvl: level, + } +} + +type LoggerAppSettings interface { + DroppedLogsInc() +} + +func pickLogger(v *viper.Viper, loggerSettings LoggerAppSettings, tagSettings TagFilterSettings) *Logger { + lvl, err := getLogLevel(v) + if err != nil { + panic(err) + } + + dest := v.GetString(cfgLoggerDestination) + + switch dest { + case destinationStdout: + return newStdoutLogger(v, lvl, loggerSettings, tagSettings) + case destinationJournald: + return newJournaldLogger(v, lvl, loggerSettings, tagSettings) + default: + panic(fmt.Sprintf("wrong destination for logger: %s", dest)) + } +} diff --git a/cmd/http-gw/settings.go b/cmd/http-gw/settings.go index 691e9ba..b3d169a 100644 --- a/cmd/http-gw/settings.go +++ b/cmd/http-gw/settings.go @@ -22,10 +22,8 @@ import ( grpctracing "git.frostfs.info/TrueCloudLab/frostfs-observability/tracing/grpc" "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/pool" treepool "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/pool/tree" - "git.frostfs.info/TrueCloudLab/zapjournald" "github.com/spf13/pflag" "github.com/spf13/viper" - "github.com/ssgreg/journald" "github.com/valyala/fasthttp" "go.uber.org/zap" "go.uber.org/zap/zapcore" @@ -110,6 +108,11 @@ const ( cfgLoggerSamplingThereafter = "logger.sampling.thereafter" cfgLoggerSamplingInterval = "logger.sampling.interval" + cfgLoggerTags = "logger.tags" + cfgLoggerTagsPrefixTmpl = cfgLoggerTags + ".%d." + cfgLoggerTagsNameTmpl = cfgLoggerTagsPrefixTmpl + "name" + cfgLoggerTagsLevelTmpl = cfgLoggerTagsPrefixTmpl + "level" + // Wallet. cfgWalletPassphrase = "wallet.passphrase" cfgWalletPath = "wallet.path" @@ -187,6 +190,8 @@ var ignore = map[string]struct{}{ cmdVersion: {}, } +var defaultTags = []string{logs.TagApp, logs.TagConfig, logs.TagDatapath} + type Logger struct { logger *zap.Logger lvl zap.AtomicLevel @@ -428,112 +433,39 @@ func mergeConfig(v *viper.Viper, fileName string) error { return v.MergeConfig(cfgFile) } -type LoggerAppSettings interface { - DroppedLogsInc() -} +func fetchLogTagsConfig(v *viper.Viper) (map[string]zapcore.Level, error) { + res := make(map[string]zapcore.Level) -func pickLogger(v *viper.Viper, settings LoggerAppSettings) *Logger { - lvl, err := getLogLevel(v) - if err != nil { - panic(err) + defaultLevel := v.GetString(cfgLoggerLevel) + var defaultLvl zapcore.Level + if err := defaultLvl.Set(defaultLevel); err != nil { + return nil, fmt.Errorf("failed to parse log level, unknown level: '%s'", defaultLevel) } - dest := v.GetString(cfgLoggerDestination) + for i := 0; ; i++ { + name := v.GetString(fmt.Sprintf(cfgLoggerTagsNameTmpl, i)) + if name == "" { + break + } - switch dest { - case destinationStdout: - return newStdoutLogger(v, lvl, settings) - case destinationJournald: - return newJournaldLogger(v, lvl, settings) - default: - panic(fmt.Sprintf("wrong destination for logger: %s", dest)) - } -} + lvl := defaultLvl + level := v.GetString(fmt.Sprintf(cfgLoggerTagsLevelTmpl, i)) + if level != "" { + if err := lvl.Set(level); err != nil { + return nil, fmt.Errorf("failed to parse log tags config, unknown level: '%s'", level) + } + } -// newStdoutLogger constructs a zap.Logger instance for current application. -// Panics on failure. -// -// Logger is built from zap's production logging configuration with: -// - parameterized level (debug by default) -// - console encoding -// - ISO8601 time encoding -// -// Logger records a stack trace for all messages at or above fatal level. -// -// See also zapcore.Level, zap.NewProductionConfig, zap.AddStacktrace. -func newStdoutLogger(v *viper.Viper, lvl zapcore.Level, settings LoggerAppSettings) *Logger { - stdout := zapcore.AddSync(os.Stderr) - level := zap.NewAtomicLevelAt(lvl) - - consoleOutCore := zapcore.NewCore(newLogEncoder(), stdout, level) - consoleOutCore = applyZapCoreMiddlewares(consoleOutCore, v, settings) - - return &Logger{ - logger: zap.New(consoleOutCore, zap.AddStacktrace(zap.NewAtomicLevelAt(zap.FatalLevel))), - lvl: level, - } -} - -func newJournaldLogger(v *viper.Viper, lvl zapcore.Level, settings LoggerAppSettings) *Logger { - level := zap.NewAtomicLevelAt(lvl) - - encoder := zapjournald.NewPartialEncoder(newLogEncoder(), zapjournald.SyslogFields) - - core := zapjournald.NewCore(level, encoder, &journald.Journal{}, zapjournald.SyslogFields) - coreWithContext := core.With([]zapcore.Field{ - zapjournald.SyslogFacility(zapjournald.LogDaemon), - zapjournald.SyslogIdentifier(), - zapjournald.SyslogPid(), - }) - - coreWithContext = applyZapCoreMiddlewares(coreWithContext, v, settings) - - return &Logger{ - logger: zap.New(coreWithContext, zap.AddStacktrace(zap.NewAtomicLevelAt(zap.FatalLevel))), - lvl: level, - } -} - -func newLogEncoder() zapcore.Encoder { - c := zap.NewProductionEncoderConfig() - c.EncodeTime = zapcore.ISO8601TimeEncoder - - return zapcore.NewConsoleEncoder(c) -} - -func applyZapCoreMiddlewares(core zapcore.Core, v *viper.Viper, settings LoggerAppSettings) zapcore.Core { - if v.GetBool(cfgLoggerSamplingEnabled) { - core = zapcore.NewSamplerWithOptions(core, - v.GetDuration(cfgLoggerSamplingInterval), - v.GetInt(cfgLoggerSamplingInitial), - v.GetInt(cfgLoggerSamplingThereafter), - zapcore.SamplerHook(func(_ zapcore.Entry, dec zapcore.SamplingDecision) { - if dec&zapcore.LogDropped > 0 { - settings.DroppedLogsInc() - } - })) + res[name] = lvl } - return core -} - -func getLogLevel(v *viper.Viper) (zapcore.Level, error) { - var lvl zapcore.Level - lvlStr := v.GetString(cfgLoggerLevel) - err := lvl.UnmarshalText([]byte(lvlStr)) - if err != nil { - return lvl, fmt.Errorf("incorrect logger level configuration %s (%v), "+ - "value should be one of %v", lvlStr, err, [...]zapcore.Level{ - zapcore.DebugLevel, - zapcore.InfoLevel, - zapcore.WarnLevel, - zapcore.ErrorLevel, - zapcore.DPanicLevel, - zapcore.PanicLevel, - zapcore.FatalLevel, - }) + if len(res) == 0 && !v.IsSet(cfgLoggerTags) { + for _, tag := range defaultTags { + res[tag] = defaultLvl + } } - return lvl, nil + + return res, nil } func fetchReconnectInterval(cfg *viper.Viper) time.Duration { @@ -552,17 +484,17 @@ func fetchIndexPageTemplate(v *viper.Viper, l *zap.Logger) (string, bool) { reader, err := os.Open(v.GetString(cfgIndexPageTemplatePath)) if err != nil { - l.Warn(logs.FailedToReadIndexPageTemplate, zap.Error(err)) + l.Warn(logs.FailedToReadIndexPageTemplate, zap.Error(err), logs.TagField(logs.TagApp)) return "", true } tmpl, err := io.ReadAll(reader) if err != nil { - l.Warn(logs.FailedToReadIndexPageTemplate, zap.Error(err)) + l.Warn(logs.FailedToReadIndexPageTemplate, zap.Error(err), logs.TagField(logs.TagApp)) return "", true } - l.Info(logs.SetCustomIndexPageTemplate) + l.Info(logs.SetCustomIndexPageTemplate, logs.TagField(logs.TagApp)) return string(tmpl), true } @@ -603,7 +535,7 @@ func fetchServers(v *viper.Viper, log *zap.Logger) []ServerInfo { } if _, ok := seen[serverInfo.Address]; ok { - log.Warn(logs.WarnDuplicateAddress, zap.String("address", serverInfo.Address)) + log.Warn(logs.WarnDuplicateAddress, zap.String("address", serverInfo.Address), logs.TagField(logs.TagConfig)) continue } seen[serverInfo.Address] = struct{}{} @@ -616,7 +548,7 @@ func fetchServers(v *viper.Viper, log *zap.Logger) []ServerInfo { func (a *app) initPools(ctx context.Context) { key, err := getFrostFSKey(a.cfg, a.log) if err != nil { - a.log.Fatal(logs.CouldNotLoadFrostFSPrivateKey, zap.Error(err)) + a.log.Fatal(logs.CouldNotLoadFrostFSPrivateKey, zap.Error(err), logs.TagField(logs.TagApp)) } var prm pool.InitParameters @@ -624,7 +556,8 @@ func (a *app) initPools(ctx context.Context) { prm.SetKey(&key.PrivateKey) prmTree.SetKey(key) - a.log.Info(logs.UsingCredentials, zap.String("FrostFS", hex.EncodeToString(key.PublicKey().Bytes()))) + a.log.Info(logs.UsingCredentials, zap.String("FrostFS", hex.EncodeToString(key.PublicKey().Bytes())), + logs.TagField(logs.TagApp)) for _, peer := range fetchPeers(a.log, a.cfg) { prm.AddNode(peer) @@ -679,11 +612,11 @@ func (a *app) initPools(ctx context.Context) { p, err := pool.NewPool(prm) if err != nil { - a.log.Fatal(logs.FailedToCreateConnectionPool, zap.Error(err)) + a.log.Fatal(logs.FailedToCreateConnectionPool, zap.Error(err), logs.TagField(logs.TagApp)) } if err = p.Dial(ctx); err != nil { - a.log.Fatal(logs.FailedToDialConnectionPool, zap.Error(err)) + a.log.Fatal(logs.FailedToDialConnectionPool, zap.Error(err), logs.TagField(logs.TagApp)) } if a.cfg.GetBool(cfgFeaturesTreePoolNetmapSupport) { @@ -692,10 +625,10 @@ func (a *app) initPools(ctx context.Context) { treePool, err := treepool.NewPool(prmTree) if err != nil { - a.log.Fatal(logs.FailedToCreateTreePool, zap.Error(err)) + a.log.Fatal(logs.FailedToCreateTreePool, zap.Error(err), logs.TagField(logs.TagApp)) } if err = treePool.Dial(ctx); err != nil { - a.log.Fatal(logs.FailedToDialTreePool, zap.Error(err)) + a.log.Fatal(logs.FailedToDialTreePool, zap.Error(err), logs.TagField(logs.TagApp)) } a.pool = p @@ -726,7 +659,8 @@ func fetchPeers(l *zap.Logger, v *viper.Viper) []pool.NodeParam { l.Info(logs.AddedStoragePeer, zap.Int("priority", priority), zap.String("address", address), - zap.Float64("weight", weight)) + zap.Float64("weight", weight), + logs.TagField(logs.TagConfig)) } return nodes @@ -765,7 +699,8 @@ func fetchCacheLifetime(v *viper.Viper, l *zap.Logger, cfgEntry string, defaultV l.Error(logs.InvalidLifetimeUsingDefaultValue, zap.String("parameter", cfgEntry), zap.Duration("value in config", lifetime), - zap.Duration("default", defaultValue)) + zap.Duration("default", defaultValue), + logs.TagField(logs.TagConfig)) } else { return lifetime } @@ -781,7 +716,8 @@ func fetchCacheSize(v *viper.Viper, l *zap.Logger, cfgEntry string, defaultValue l.Error(logs.InvalidCacheSizeUsingDefaultValue, zap.String("parameter", cfgEntry), zap.Int("value in config", size), - zap.Int("default", defaultValue)) + zap.Int("default", defaultValue), + logs.TagField(logs.TagConfig)) } else { return size } @@ -793,7 +729,7 @@ func fetchCacheSize(v *viper.Viper, l *zap.Logger, cfgEntry string, defaultValue func getDialerSource(logger *zap.Logger, cfg *viper.Viper) *internalnet.DialerSource { source, err := internalnet.NewDialerSource(fetchMultinetConfig(cfg, logger)) if err != nil { - logger.Fatal(logs.FailedToLoadMultinetConfig, zap.Error(err)) + logger.Fatal(logs.FailedToLoadMultinetConfig, zap.Error(err), logs.TagField(logs.TagConfig)) } return source } diff --git a/config/config.yaml b/config/config.yaml index eee84e5..200890f 100644 --- a/config/config.yaml +++ b/config/config.yaml @@ -29,6 +29,10 @@ logger: initial: 100 thereafter: 100 interval: 1s + tags: + - name: "app" + - name: "config" + - name: "datapath" server: - address: 0.0.0.0:8080 diff --git a/docs/gate-configuration.md b/docs/gate-configuration.md index ce7c0c7..93ebdc2 100644 --- a/docs/gate-configuration.md +++ b/docs/gate-configuration.md @@ -174,6 +174,12 @@ logger: initial: 100 thereafter: 100 interval: 1s + tags: + - name: "app" + level: info + - name: "config" + - name: "datapath" + - name: "external_storage_tree" ``` | Parameter | Type | SIGHUP reload | Default value | Description | @@ -184,6 +190,31 @@ logger: | `sampling.initial` | `int` | no | '100' | Sampling count of first log entries. | | `sampling.thereafter` | `int` | no | '100' | Sampling count of entries after an `interval`. | | `sampling.interval` | `duration` | no | '1s' | Sampling interval of messaging similar entries. | +| `sampling.tags` | `[]Tag` | yes | | Tagged log entries that should be additionally logged (available tags see in the next section). | + +## Tags + +There are additional log entries that can hurt performance and can be additionally logged by using `logger.tags` +parameter. Available tags: + +```yaml +tags: + - name: "app" + level: info +``` + +| Parameter | Type | SIGHUP reload | Default value | Description | +|-----------------------|------------|---------------|---------------------------|-------------------------------------------------------------------------------------------------------| +| `name` | `string` | yes | | Tag name. Possible values see below in `Tag values` section. | +| `level` | `string` | yes | Value from `logger.level` | Logging level for specific tag. Possible values: `debug`, `info`, `warn`, `dpanic`, `panic`, `fatal`. | + +### Tag values + +* `app` - common application logs (enabled by default). +* `config` - application configuration, SIGHUP etc (enabled by default). +* `datapath` - main logic of application (enabled by default). +* `external_storage` - external interaction with storage node. +* `external_storage_tree` - external interaction with tree service in storage node. # `web` section diff --git a/internal/cache/buckets.go b/internal/cache/buckets.go index 2fa8f25..91ae5b2 100644 --- a/internal/cache/buckets.go +++ b/internal/cache/buckets.go @@ -72,7 +72,7 @@ func (o *BucketCache) GetByCID(cnrID cid.ID) *data.BucketInfo { key, ok := entry.(string) if !ok { o.logger.Warn(logs.InvalidCacheEntryType, zap.String("actual", fmt.Sprintf("%T", entry)), - zap.String("expected", fmt.Sprintf("%T", key))) + zap.String("expected", fmt.Sprintf("%T", key)), logs.TagField(logs.TagDatapath)) return nil } @@ -88,7 +88,7 @@ func (o *BucketCache) get(key string) *data.BucketInfo { result, ok := entry.(*data.BucketInfo) if !ok { o.logger.Warn(logs.InvalidCacheEntryType, zap.String("actual", fmt.Sprintf("%T", entry)), - zap.String("expected", fmt.Sprintf("%T", result))) + zap.String("expected", fmt.Sprintf("%T", result)), logs.TagField(logs.TagDatapath)) return nil } diff --git a/internal/cache/netmap.go b/internal/cache/netmap.go index 6d91fe7..ce01b47 100644 --- a/internal/cache/netmap.go +++ b/internal/cache/netmap.go @@ -53,7 +53,7 @@ func (c *NetmapCache) Get() *netmap.NetMap { result, ok := entry.(netmap.NetMap) if !ok { c.logger.Warn(logs.InvalidCacheEntryType, zap.String("actual", fmt.Sprintf("%T", entry)), - zap.String("expected", fmt.Sprintf("%T", result))) + zap.String("expected", fmt.Sprintf("%T", result)), logs.TagField(logs.TagDatapath)) return nil } diff --git a/internal/handler/browse.go b/internal/handler/browse.go index 64ad1f5..56ffbc1 100644 --- a/internal/handler/browse.go +++ b/internal/handler/browse.go @@ -230,7 +230,7 @@ func (h *Handler) getDirObjectsNative(ctx context.Context, bucketInfo *data.Buck } for objExt := range resp { if objExt.Error != nil { - log.Error(logs.FailedToHeadObject, zap.Error(objExt.Error)) + log.Error(logs.FailedToHeadObject, zap.Error(objExt.Error), logs.TagField(logs.TagExternalStorage)) result.hasErrors = true continue } @@ -273,7 +273,7 @@ func (h *Handler) headDirObjects(ctx context.Context, cnrID cid.ID, objectIDs Re }) if err != nil { wg.Done() - log.Warn(logs.FailedToSumbitTaskToPool, zap.Error(err)) + log.Warn(logs.FailedToSumbitTaskToPool, zap.Error(err), logs.TagField(logs.TagDatapath)) } select { case <-ctx.Done(): @@ -283,7 +283,7 @@ func (h *Handler) headDirObjects(ctx context.Context, cnrID cid.ID, objectIDs Re } }) if err != nil { - log.Error(logs.FailedToIterateOverResponse, zap.Error(err)) + log.Error(logs.FailedToIterateOverResponse, zap.Error(err), logs.TagField(logs.TagDatapath)) } wg.Wait() }() @@ -332,6 +332,11 @@ func (h *Handler) browseObjects(c *fasthttp.RequestCtx, p browseParams) { const S3Protocol = "s3" const FrostfsProtocol = "frostfs" + logTag := logs.TagExternalStorageTree + if p.isNative { + logTag = logs.TagExternalStorage + } + ctx := utils.GetContextFromRequest(c) reqLog := utils.GetReqLogOrDefault(ctx, h.log) log := reqLog.With( @@ -341,7 +346,7 @@ func (h *Handler) browseObjects(c *fasthttp.RequestCtx, p browseParams) { ) resp, err := p.listObjects(ctx, p.bucketInfo, p.prefix) if err != nil { - logAndSendBucketError(c, log, err) + logAndSendBucketError(c, log, err, logTag) return } @@ -360,7 +365,7 @@ func (h *Handler) browseObjects(c *fasthttp.RequestCtx, p browseParams) { "parentDir": parentDir, }).Parse(h.config.IndexPageTemplate()) if err != nil { - logAndSendBucketError(c, log, err) + logAndSendBucketError(c, log, err, logTag) return } bucketName := p.bucketInfo.Name @@ -376,7 +381,7 @@ func (h *Handler) browseObjects(c *fasthttp.RequestCtx, p browseParams) { Protocol: protocol, HasErrors: resp.hasErrors, }); err != nil { - logAndSendBucketError(c, log, err) + logAndSendBucketError(c, log, err, logTag) return } } diff --git a/internal/handler/download.go b/internal/handler/download.go index 8766f0c..a9650da 100644 --- a/internal/handler/download.go +++ b/internal/handler/download.go @@ -36,13 +36,13 @@ func (h *Handler) DownloadByAddressOrBucketName(c *fasthttp.RequestCtx) { bktInfo, err := h.getBucketInfo(ctx, cidParam, log) if err != nil { - logAndSendBucketError(c, log, err) + logAndSendBucketError(c, log, err, logs.TagExternalStorage) return } checkS3Err := h.tree.CheckSettingsNodeExists(ctx, bktInfo) if checkS3Err != nil && !errors.Is(checkS3Err, layer.ErrNodeNotFound) { - logAndSendBucketError(c, log, checkS3Err) + logAndSendBucketError(c, log, checkS3Err, logs.TagExternalStorageTree) return } @@ -118,7 +118,8 @@ func (h *Handler) DownloadZipped(c *fasthttp.RequestCtx) { prefix, err := url.QueryUnescape(prefix) if err != nil { - log.Error(logs.FailedToUnescapeQuery, zap.String("cid", scid), zap.String("prefix", prefix), zap.Error(err)) + log.Error(logs.FailedToUnescapeQuery, zap.String("cid", scid), zap.String("prefix", prefix), + zap.Error(err), logs.TagField(logs.TagDatapath)) ResponseError(c, "could not unescape prefix: "+err.Error(), fasthttp.StatusBadRequest) return } @@ -127,13 +128,13 @@ func (h *Handler) DownloadZipped(c *fasthttp.RequestCtx) { bktInfo, err := h.getBucketInfo(ctx, scid, log) if err != nil { - logAndSendBucketError(c, log, err) + logAndSendBucketError(c, log, err, logs.TagExternalStorage) return } resSearch, err := h.search(ctx, bktInfo.CID, object.AttributeFilePath, prefix, object.MatchCommonPrefix) if err != nil { - log.Error(logs.CouldNotSearchForObjects, zap.Error(err)) + log.Error(logs.CouldNotSearchForObjects, zap.Error(err), logs.TagField(logs.TagExternalStorage)) ResponseError(c, "could not search for objects: "+err.Error(), fasthttp.StatusBadRequest) return } @@ -165,19 +166,20 @@ func (h *Handler) DownloadZipped(c *fasthttp.RequestCtx) { addr.SetObject(id) if err = h.zipObject(ctx, zipWriter, addr, btoken, bufZip); err != nil { - log.Error(logs.FailedToAddObjectToArchive, zap.String("oid", id.EncodeToString()), zap.Error(err)) + log.Error(logs.FailedToAddObjectToArchive, zap.String("oid", id.EncodeToString()), zap.Error(err), + logs.TagField(logs.TagDatapath)) } return false }) if errIter != nil { - log.Error(logs.IteratingOverSelectedObjectsFailed, zap.Error(errIter)) + log.Error(logs.IteratingOverSelectedObjectsFailed, zap.Error(errIter), logs.TagField(logs.TagExternalStorage)) } else if !called { - log.Error(logs.ObjectsNotFound) + log.Error(logs.ObjectsNotFound, logs.TagField(logs.TagExternalStorage)) } if err = zipWriter.Close(); err != nil { - log.Error(logs.CloseZipWriter, zap.Error(err)) + log.Error(logs.CloseZipWriter, zap.Error(err), logs.TagField(logs.TagDatapath)) } }) } diff --git a/internal/handler/filter.go b/internal/handler/filter.go index 745718a..da99db7 100644 --- a/internal/handler/filter.go +++ b/internal/handler/filter.go @@ -50,7 +50,8 @@ func filterHeaders(l *zap.Logger, header *fasthttp.RequestHeader) (map[string]st l.Debug(logs.AddAttributeToResultObject, zap.String("key", k), - zap.String("val", v)) + zap.String("val", v), + logs.TagField(logs.TagDatapath)) }) return result, err diff --git a/internal/handler/handler.go b/internal/handler/handler.go index 2f1c6ad..9c43ad8 100644 --- a/internal/handler/handler.go +++ b/internal/handler/handler.go @@ -206,11 +206,11 @@ func (h *Handler) byS3Path(ctx context.Context, req request, cnrID cid.ID, path foundOID, err := h.tree.GetLatestVersion(ctx, &cnrID, path) if err != nil { - logAndSendBucketError(c, log, err) + logAndSendBucketError(c, log, err, logs.TagExternalStorageTree) return } if foundOID.IsDeleteMarker { - log.Error(logs.ObjectWasDeleted) + log.Error(logs.ObjectWasDeleted, logs.TagField(logs.TagExternalStorageTree)) ResponseError(c, "object deleted", fasthttp.StatusNotFound) return } @@ -230,14 +230,16 @@ func (h *Handler) byAttribute(c *fasthttp.RequestCtx, handler func(context.Conte key, err := url.QueryUnescape(key) if err != nil { - log.Error(logs.FailedToUnescapeQuery, zap.String("cid", cidParam), zap.String("attr_key", key), zap.Error(err)) + log.Error(logs.FailedToUnescapeQuery, zap.String("cid", cidParam), zap.String("attr_key", key), + zap.Error(err), logs.TagField(logs.TagDatapath)) ResponseError(c, "could not unescape attr_key: "+err.Error(), fasthttp.StatusBadRequest) return } val, err = url.QueryUnescape(val) if err != nil { - log.Error(logs.FailedToUnescapeQuery, zap.String("cid", cidParam), zap.String("attr_val", val), zap.Error(err)) + log.Error(logs.FailedToUnescapeQuery, zap.String("cid", cidParam), zap.String("attr_val", val), + zap.Error(err), logs.TagField(logs.TagDatapath)) ResponseError(c, "could not unescape attr_val: "+err.Error(), fasthttp.StatusBadRequest) return } @@ -246,7 +248,7 @@ func (h *Handler) byAttribute(c *fasthttp.RequestCtx, handler func(context.Conte bktInfo, err := h.getBucketInfo(ctx, cidParam, log) if err != nil { - logAndSendBucketError(c, log, err) + logAndSendBucketError(c, log, err, logs.TagExternalStorage) return } @@ -271,7 +273,7 @@ func (h *Handler) byAttribute(c *fasthttp.RequestCtx, handler func(context.Conte func (h *Handler) findObjectByAttribute(ctx context.Context, log *zap.Logger, cnrID cid.ID, attrKey, attrVal string) (oid.ID, error) { res, err := h.search(ctx, cnrID, attrKey, attrVal, object.MatchStringEqual) if err != nil { - log.Error(logs.CouldNotSearchForObjects, zap.Error(err)) + log.Error(logs.CouldNotSearchForObjects, zap.Error(err), logs.TagField(logs.TagExternalStorage)) return oid.ID{}, fmt.Errorf("could not search for objects: %w", err) } defer res.Close() @@ -282,13 +284,13 @@ func (h *Handler) findObjectByAttribute(ctx context.Context, log *zap.Logger, cn if n == 0 { switch { case errors.Is(err, io.EOF) && h.needSearchByFileName(attrKey, attrVal): - log.Debug(logs.ObjectNotFoundByFilePathTrySearchByFileName) + log.Debug(logs.ObjectNotFoundByFilePathTrySearchByFileName, logs.TagField(logs.TagExternalStorage)) return h.findObjectByAttribute(ctx, log, cnrID, attrFileName, attrVal) case errors.Is(err, io.EOF): - log.Error(logs.ObjectNotFound, zap.Error(err)) + log.Error(logs.ObjectNotFound, zap.Error(err), logs.TagField(logs.TagExternalStorage)) return oid.ID{}, fmt.Errorf("object not found: %w", err) default: - log.Error(logs.ReadObjectListFailed, zap.Error(err)) + log.Error(logs.ReadObjectListFailed, zap.Error(err), logs.TagField(logs.TagExternalStorage)) return oid.ID{}, fmt.Errorf("read object list failed: %w", err) } } @@ -342,7 +344,8 @@ func (h *Handler) getBucketInfo(ctx context.Context, containerName string, log * log.Warn(logs.CouldntPutBucketIntoCache, zap.String("bucket name", bktInfo.Name), zap.Stringer("bucket cid", bktInfo.CID), - zap.Error(err)) + zap.Error(err), + logs.TagField(logs.TagDatapath)) } return bktInfo, nil @@ -386,13 +389,13 @@ func (h *Handler) browseIndex(c *fasthttp.RequestCtx, isNativeList bool) { unescapedKey, err := url.QueryUnescape(oidURLParam) if err != nil { - logAndSendBucketError(c, log, err) + logAndSendBucketError(c, log, err, logs.TagExternalStorage) return } bktInfo, err := h.getBucketInfo(ctx, cidURLParam, log) if err != nil { - logAndSendBucketError(c, log, err) + logAndSendBucketError(c, log, err, logs.TagExternalStorage) return } diff --git a/internal/handler/head.go b/internal/handler/head.go index da96eff..181f08a 100644 --- a/internal/handler/head.go +++ b/internal/handler/head.go @@ -67,7 +67,8 @@ func (h *Handler) headObject(ctx context.Context, req request, objectAddress oid req.log.Info(logs.CouldntParseCreationDate, zap.String("key", key), zap.String("val", val), - zap.Error(err)) + zap.Error(err), + logs.TagField(logs.TagDatapath)) continue } req.Response.Header.Set(fasthttp.HeaderLastModified, time.Unix(value, 0).UTC().Format(http.TimeFormat)) @@ -126,12 +127,12 @@ func (h *Handler) HeadByAddressOrBucketName(c *fasthttp.RequestCtx) { bktInfo, err := h.getBucketInfo(ctx, cidParam, log) if err != nil { - logAndSendBucketError(c, log, err) + logAndSendBucketError(c, log, err, logs.TagExternalStorage) return } checkS3Err := h.tree.CheckSettingsNodeExists(ctx, bktInfo) if checkS3Err != nil && !errors.Is(checkS3Err, layer.ErrNodeNotFound) { - logAndSendBucketError(c, log, checkS3Err) + logAndSendBucketError(c, log, checkS3Err, logs.TagExternalStorageTree) return } @@ -143,7 +144,7 @@ func (h *Handler) HeadByAddressOrBucketName(c *fasthttp.RequestCtx) { } else if err = objID.DecodeString(oidParam); err == nil { h.byNativeAddress(ctx, req, bktInfo.CID, objID, h.headObject) } else { - logAndSendBucketError(c, log, checkS3Err) + logAndSendBucketError(c, log, checkS3Err, logs.TagExternalStorageTree) return } } diff --git a/internal/handler/multipart.go b/internal/handler/multipart.go index 213286c..d89ef07 100644 --- a/internal/handler/multipart.go +++ b/internal/handler/multipart.go @@ -33,7 +33,7 @@ func fetchMultipartFile(l *zap.Logger, r io.Reader, boundary string) (MultipartF name := part.FormName() if name == "" { - l.Debug(logs.IgnorePartEmptyFormName) + l.Debug(logs.IgnorePartEmptyFormName, logs.TagField(logs.TagDatapath)) continue } @@ -41,8 +41,7 @@ func fetchMultipartFile(l *zap.Logger, r io.Reader, boundary string) (MultipartF // ignore multipart/form-data values if filename == "" { - l.Debug(logs.IgnorePartEmptyFilename, zap.String("form", name)) - + l.Debug(logs.IgnorePartEmptyFilename, zap.String("form", name), logs.TagField(logs.TagDatapath)) continue } diff --git a/internal/handler/multipart_test.go b/internal/handler/multipart_test.go index 2c50a87..431d0d6 100644 --- a/internal/handler/multipart_test.go +++ b/internal/handler/multipart_test.go @@ -112,7 +112,7 @@ func fetchMultipartFileDefault(l *zap.Logger, r io.Reader, boundary string) (Mul name := part.FormName() if name == "" { - l.Debug(logs.IgnorePartEmptyFormName) + l.Debug(logs.IgnorePartEmptyFormName, logs.TagField(logs.TagDatapath)) continue } @@ -120,8 +120,7 @@ func fetchMultipartFileDefault(l *zap.Logger, r io.Reader, boundary string) (Mul // ignore multipart/form-data values if filename == "" { - l.Debug(logs.IgnorePartEmptyFilename, zap.String("form", name)) - + l.Debug(logs.IgnorePartEmptyFilename, zap.String("form", name), logs.TagField(logs.TagDatapath)) continue } diff --git a/internal/handler/reader.go b/internal/handler/reader.go index cbd8294..e8ac098 100644 --- a/internal/handler/reader.go +++ b/internal/handler/reader.go @@ -110,7 +110,8 @@ func (h *Handler) receiveFile(ctx context.Context, req request, objAddress oid.A if err = req.setTimestamp(val); err != nil { req.log.Error(logs.CouldntParseCreationDate, zap.String("val", val), - zap.Error(err)) + zap.Error(err), + logs.TagField(logs.TagDatapath)) } case object.AttributeContentType: contentType = val @@ -144,7 +145,7 @@ func (h *Handler) receiveFile(ctx context.Context, req request, objAddress oid.A return payload, nil }, filename) if err != nil && err != io.EOF { - req.log.Error(logs.CouldNotDetectContentTypeFromPayload, zap.Error(err)) + req.log.Error(logs.CouldNotDetectContentTypeFromPayload, zap.Error(err), logs.TagField(logs.TagDatapath)) ResponseError(req.RequestCtx, "could not detect Content-Type from payload: "+err.Error(), fasthttp.StatusBadRequest) return } diff --git a/internal/handler/upload.go b/internal/handler/upload.go index 9493635..85b779a 100644 --- a/internal/handler/upload.go +++ b/internal/handler/upload.go @@ -59,7 +59,7 @@ func (h *Handler) Upload(c *fasthttp.RequestCtx) { bktInfo, err := h.getBucketInfo(ctx, scid, log) if err != nil { - logAndSendBucketError(c, log, err) + logAndSendBucketError(c, log, err, logs.TagExternalStorageTree) return } @@ -74,19 +74,20 @@ func (h *Handler) Upload(c *fasthttp.RequestCtx) { zap.Stringer("address", addr), zap.String("filename", file.FileName()), zap.Error(err), + logs.TagField(logs.TagDatapath), ) }() boundary := string(c.Request.Header.MultipartFormBoundary()) if file, err = fetchMultipartFile(log, bodyStream, boundary); err != nil { - log.Error(logs.CouldNotReceiveMultipartForm, zap.Error(err)) + log.Error(logs.CouldNotReceiveMultipartForm, zap.Error(err), logs.TagField(logs.TagDatapath)) ResponseError(c, "could not receive multipart/form: "+err.Error(), fasthttp.StatusBadRequest) return } filtered, err := filterHeaders(log, &c.Request.Header) if err != nil { - log.Error(logs.CouldNotProcessHeaders, zap.Error(err)) + log.Error(logs.CouldNotProcessHeaders, zap.Error(err), logs.TagField(logs.TagDatapath)) ResponseError(c, err.Error(), fasthttp.StatusBadRequest) return } @@ -94,14 +95,15 @@ func (h *Handler) Upload(c *fasthttp.RequestCtx) { now := time.Now() if rawHeader := c.Request.Header.Peek(fasthttp.HeaderDate); rawHeader != nil { if parsed, err := time.Parse(http.TimeFormat, string(rawHeader)); err != nil { - log.Warn(logs.CouldNotParseClientTime, zap.String("Date header", string(rawHeader)), zap.Error(err)) + log.Warn(logs.CouldNotParseClientTime, zap.String("Date header", string(rawHeader)), zap.Error(err), + logs.TagField(logs.TagDatapath)) } else { now = parsed } } if err = utils.PrepareExpirationHeader(c, h.frostfs, filtered, now); err != nil { - log.Error(logs.CouldNotPrepareExpirationHeader, zap.Error(err)) + log.Error(logs.CouldNotPrepareExpirationHeader, zap.Error(err), logs.TagField(logs.TagDatapath)) ResponseError(c, "could not prepare expiration header: "+err.Error(), fasthttp.StatusBadRequest) return } @@ -155,7 +157,7 @@ func (h *Handler) Upload(c *fasthttp.RequestCtx) { // Try to return the response, otherwise, if something went wrong, throw an error. if err = newPutResponse(addr).encode(c); err != nil { - log.Error(logs.CouldNotEncodeResponse, zap.Error(err)) + log.Error(logs.CouldNotEncodeResponse, zap.Error(err), logs.TagField(logs.TagDatapath)) ResponseError(c, "could not encode response", fasthttp.StatusBadRequest) return @@ -180,6 +182,7 @@ func (h *Handler) Upload(c *fasthttp.RequestCtx) { func (h *Handler) handlePutFrostFSErr(r *fasthttp.RequestCtx, err error, log *zap.Logger) { statusCode, msg, additionalFields := formErrorResponse("could not store file in frostfs", err) logFields := append([]zap.Field{zap.Error(err)}, additionalFields...) + logFields = append(logFields, logs.TagField(logs.TagExternalStorage)) log.Error(logs.CouldNotStoreFileInFrostfs, logFields...) ResponseError(r, msg, statusCode) diff --git a/internal/handler/utils.go b/internal/handler/utils.go index 7fdd396..69638d6 100644 --- a/internal/handler/utils.go +++ b/internal/handler/utils.go @@ -28,6 +28,7 @@ func (r *request) handleFrostFSErr(err error, start time.Time) { logFields := []zap.Field{ zap.Stringer("elapsed", time.Since(start)), zap.Error(err), + logs.TagField(logs.TagExternalStorage), } statusCode, msg, additionalFields := formErrorResponse("could not receive object", err) logFields = append(logFields, additionalFields...) @@ -77,8 +78,8 @@ func isValidValue(s string) bool { return true } -func logAndSendBucketError(c *fasthttp.RequestCtx, log *zap.Logger, err error) { - log.Error(logs.CouldntGetBucket, zap.Error(err)) +func logAndSendBucketError(c *fasthttp.RequestCtx, log *zap.Logger, err error, tag string) { + log.Error(logs.CouldntGetBucket, zap.Error(err), logs.TagField(tag)) if client.IsErrContainerNotFound(err) { ResponseError(c, "Not Found", fasthttp.StatusNotFound) diff --git a/internal/logs/logs.go b/internal/logs/logs.go index f9b13b1..e000beb 100644 --- a/internal/logs/logs.go +++ b/internal/logs/logs.go @@ -1,5 +1,21 @@ package logs +import "go.uber.org/zap" + +const ( + TagFieldName = "tag" + + TagApp = "app" + TagConfig = "config" + TagDatapath = "datapath" + TagExternalStorage = "external_storage" + TagExternalStorageTree = "external_storage_tree" +) + +func TagField(tag string) zap.Field { + return zap.String(TagFieldName, tag) +} + const ( CouldntParseCreationDate = "couldn't parse creation date" // Info in ../../downloader/* CouldNotDetectContentTypeFromPayload = "could not detect Content-Type from payload" // Error in ../../downloader/download.go @@ -45,7 +61,7 @@ const ( SIGHUPConfigReloadStarted = "SIGHUP config reload started" // Info in ../../app.go FailedToReloadConfigBecauseItsMissed = "failed to reload config because it's missed" // Warn in ../../app.go FailedToReloadConfig = "failed to reload config" // Warn in ../../app.go - LogLevelWontBeUpdated = "log level won't be updated" // Warn in ../../app.go + TagsLogConfigWontBeUpdated = "tags log config won't be updated" // Warn in ../../app.go FailedToUpdateResolvers = "failed to update resolvers" // Warn in ../../app.go FailedToReloadServerParameters = "failed to reload server parameters" // Warn in ../../app.go SIGHUPConfigReloadCompleted = "SIGHUP config reload completed" // Info in ../../app.go diff --git a/internal/net/event_handler.go b/internal/net/event_handler.go index 9520c01..5d775b8 100644 --- a/internal/net/event_handler.go +++ b/internal/net/event_handler.go @@ -17,9 +17,11 @@ func (l LogEventHandler) DialPerformed(sourceIP net.Addr, _, address string, err sourceIPString = sourceIP.Network() + "://" + sourceIP.String() } if err == nil { - l.logger.Debug(logs.MultinetDialSuccess, zap.String("source", sourceIPString), zap.String("destination", address)) + l.logger.Debug(logs.MultinetDialSuccess, zap.String("source", sourceIPString), + zap.String("destination", address), logs.TagField(logs.TagDatapath)) } else { - l.logger.Debug(logs.MultinetDialFail, zap.String("source", sourceIPString), zap.String("destination", address), zap.Error(err)) + l.logger.Debug(logs.MultinetDialFail, zap.String("source", sourceIPString), + zap.String("destination", address), zap.Error(err), logs.TagField(logs.TagDatapath)) } } diff --git a/internal/service/frostfs/source.go b/internal/service/frostfs/source.go index de6c681..84f7b74 100644 --- a/internal/service/frostfs/source.go +++ b/internal/service/frostfs/source.go @@ -40,7 +40,7 @@ func (s *Source) NetMapSnapshot(ctx context.Context) (netmap.NetMap, error) { } if err = s.netmapCache.Put(netmapSnapshot); err != nil { - s.log.Warn(logs.CouldntCacheNetmap, zap.Error(err)) + s.log.Warn(logs.CouldntCacheNetmap, zap.Error(err), logs.TagField(logs.TagDatapath)) } return netmapSnapshot, nil diff --git a/metrics/service.go b/metrics/service.go index dea5ac0..e6b803b 100644 --- a/metrics/service.go +++ b/metrics/service.go @@ -25,24 +25,24 @@ type Config struct { // Start runs http service with the exposed endpoint on the configured port. func (ms *Service) Start() { if ms.enabled { - ms.log.Info(logs.ServiceIsRunning, zap.String("endpoint", ms.Addr)) + ms.log.Info(logs.ServiceIsRunning, zap.String("endpoint", ms.Addr), logs.TagField(logs.TagApp)) err := ms.ListenAndServe() if err != nil && err != http.ErrServerClosed { - ms.log.Warn(logs.ServiceCouldntStartOnConfiguredPort) + ms.log.Warn(logs.ServiceCouldntStartOnConfiguredPort, logs.TagField(logs.TagApp)) } } else { - ms.log.Info(logs.ServiceHasntStartedSinceItsDisabled) + ms.log.Info(logs.ServiceHasntStartedSinceItsDisabled, logs.TagField(logs.TagApp)) } } // ShutDown stops the service. func (ms *Service) ShutDown(ctx context.Context) { - ms.log.Info(logs.ShuttingDownService, zap.String("endpoint", ms.Addr)) + ms.log.Info(logs.ShuttingDownService, zap.String("endpoint", ms.Addr), logs.TagField(logs.TagApp)) err := ms.Shutdown(ctx) if err != nil { - ms.log.Error(logs.CantGracefullyShutDownService, zap.Error(err)) + ms.log.Error(logs.CantGracefullyShutDownService, zap.Error(err), logs.TagField(logs.TagApp)) if err = ms.Close(); err != nil { - ms.log.Panic(logs.CantShutDownService, zap.Error(err)) + ms.log.Panic(logs.CantShutDownService, zap.Error(err), logs.TagField(logs.TagApp)) } } }