[#195] Add tags support
All checks were successful
DCO (pull_request) Successful in 37s
Vulncheck (pull_request) Successful in 51s
Builds (pull_request) Successful in 1m5s
OCI image (pull_request) Successful in 1m29s
Lint (pull_request) Successful in 1m51s
Tests (pull_request) Successful in 52s
Signed-off-by: Aleksey Kravchenko <al.kravchenko@yadro.com>
This commit is contained in:
parent 4b782cf124
commit a51358bb16
22 changed files with 577 additions and 311 deletions
@@ -44,6 +44,7 @@ import (
 	"github.com/valyala/fasthttp"
 	"go.opentelemetry.io/otel/trace"
 	"go.uber.org/zap"
+	"go.uber.org/zap/zapcore"
 	"golang.org/x/exp/slices"
 )
@@ -51,7 +52,6 @@ type (
 	app struct {
 		ctx      context.Context
 		log      *zap.Logger
-		logLevel zap.AtomicLevel
 		pool     *pool.Pool
 		treePool *treepool.Pool
 		key      *keys.PrivateKey
@@ -96,6 +96,7 @@ type (
 		workerPoolSize int
 
 		mu               sync.RWMutex
+		tagsConfig       *tagsConfig
 		defaultTimestamp bool
 		zipCompression   bool
 		clientCut        bool
@@ -113,6 +114,10 @@ type (
 		enableFilepathFallback bool
 	}
 
+	tagsConfig struct {
+		tagLogs sync.Map
+	}
+
 	CORS struct {
 		AllowOrigin  string
 		AllowMethods []string
@@ -123,9 +128,56 @@ type (
 	}
 )
 
+func newTagsConfig(v *viper.Viper) *tagsConfig {
+	var t tagsConfig
+	if err := t.update(v); err != nil {
+		// A panic here is an analogue of the similar panic during common log level initialization.
+		panic(err.Error())
+	}
+	return &t
+}
+
+func (t *tagsConfig) LevelEnabled(tag string, tgtLevel zapcore.Level) bool {
+	lvl, ok := t.tagLogs.Load(tag)
+	if !ok {
+		return false
+	}
+
+	return lvl.(zapcore.Level).Enabled(tgtLevel)
+}
+
+func (t *tagsConfig) update(cfg *viper.Viper) error {
+	tags, err := fetchLogTagsConfig(cfg)
+	if err != nil {
+		return err
+	}
+
+	t.tagLogs.Range(func(key, value any) bool {
+		k := key.(string)
+		v := value.(zapcore.Level)
+
+		if lvl, ok := tags[k]; ok {
+			if lvl != v {
+				t.tagLogs.Store(key, lvl)
+			}
+		} else {
+			t.tagLogs.Delete(key)
+			delete(tags, k)
+		}
+		return true
+	})
+
+	for k, v := range tags {
+		t.tagLogs.Store(k, v)
+	}
+
+	return nil
+}
+
 func newApp(ctx context.Context, v *viper.Viper) App {
 	logSettings := &loggerSettings{}
-	log := pickLogger(v, logSettings)
+	tagConfig := newTagsConfig(v)
+	log := pickLogger(v, logSettings, tagConfig)
 
 	a := &app{
 		ctx: ctx,
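For illustration, here is a minimal, self-contained sketch of the LevelEnabled semantics introduced above (the tagsDemo type and the tag values are hypothetical stand-ins for tagsConfig, not code from this change): a tag that was never configured is filtered out entirely, while a configured tag only passes entries at or above its level.

package main

import (
	"fmt"
	"sync"

	"go.uber.org/zap/zapcore"
)

// tagsDemo mirrors the tagsConfig shape from the diff: tag name -> zapcore.Level.
type tagsDemo struct{ tagLogs sync.Map }

func (t *tagsDemo) LevelEnabled(tag string, tgtLevel zapcore.Level) bool {
	lvl, ok := t.tagLogs.Load(tag)
	if !ok {
		return false // a tag absent from the config is dropped entirely
	}
	return lvl.(zapcore.Level).Enabled(tgtLevel)
}

func main() {
	var t tagsDemo
	t.tagLogs.Store("datapath", zapcore.WarnLevel)

	fmt.Println(t.LevelEnabled("datapath", zapcore.InfoLevel))  // false: info is below warn
	fmt.Println(t.LevelEnabled("datapath", zapcore.ErrorLevel)) // true: error passes the warn threshold
	fmt.Println(t.LevelEnabled("app", zapcore.ErrorLevel))      // false: tag "app" was never configured
}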
@@ -137,7 +189,7 @@ func newApp(ctx context.Context, v *viper.Viper) App {
 		bucketCache: cache.NewBucketCache(getBucketCacheOptions(v, log.logger), v.GetBool(cfgFeaturesTreePoolNetmapSupport)),
 	}
 
-	a.initAppSettings()
+	a.initAppSettings(tagConfig)
 
 	// -- setup FastHTTP server --
 	a.webServer.Name = "frost-http-gw"
@@ -167,11 +219,12 @@ func newApp(ctx context.Context, v *viper.Viper) App {
 	return a
 }
 
-func (a *app) initAppSettings() {
+func (a *app) initAppSettings(tc *tagsConfig) {
 	a.settings = &appSettings{
 		reconnectInterval: fetchReconnectInterval(a.cfg),
 		dialerSource:      getDialerSource(a.log, a.cfg),
 		workerPoolSize:    a.cfg.GetInt(cfgWorkerPoolSize),
+		tagsConfig:        tc,
 	}
 	a.settings.update(a.cfg, a.log)
 }
@@ -319,7 +372,7 @@ func (a *app) initResolver() {
 	var err error
 	a.resolver, err = resolver.NewContainerResolver(a.getResolverConfig())
 	if err != nil {
-		a.log.Fatal(logs.FailedToCreateResolver, zap.Error(err))
+		a.log.Fatal(logs.FailedToCreateResolver, zap.Error(err), logs.TagField(logs.TagApp))
 	}
 }
@@ -333,11 +386,12 @@ func (a *app) getResolverConfig() ([]string, *resolver.Config) {
 	order := a.cfg.GetStringSlice(cfgResolveOrder)
 	if resolveCfg.RPCAddress == "" {
 		order = remove(order, resolver.NNSResolver)
-		a.log.Warn(logs.ResolverNNSWontBeUsedSinceRPCEndpointIsntProvided)
+		a.log.Warn(logs.ResolverNNSWontBeUsedSinceRPCEndpointIsntProvided, logs.TagField(logs.TagConfig))
 	}
 
 	if len(order) == 0 {
-		a.log.Info(logs.ContainerResolverWillBeDisabledBecauseOfResolversResolverOrderIsEmpty)
+		a.log.Info(logs.ContainerResolverWillBeDisabledBecauseOfResolversResolverOrderIsEmpty,
+			logs.TagField(logs.TagConfig))
 	}
 
 	return order, resolveCfg
@@ -352,7 +406,7 @@ func (a *app) initMetrics() {
 
 func newGateMetrics(logger *zap.Logger, provider *metrics.GateMetrics, enabled bool) *gateMetrics {
 	if !enabled {
-		logger.Warn(logs.MetricsAreDisabled)
+		logger.Warn(logs.MetricsAreDisabled, logs.TagField(logs.TagConfig))
 	}
 	return &gateMetrics{
 		logger: logger,
@@ -370,7 +424,7 @@ func (m *gateMetrics) isEnabled() bool {
 
 func (m *gateMetrics) SetEnabled(enabled bool) {
 	if !enabled {
-		m.logger.Warn(logs.MetricsAreDisabled)
+		m.logger.Warn(logs.MetricsAreDisabled, logs.TagField(logs.TagConfig))
 	}
 
 	m.mu.Lock()
@@ -433,7 +487,7 @@ func getFrostFSKey(cfg *viper.Viper, log *zap.Logger) (*keys.PrivateKey, error)
 	walletPath := cfg.GetString(cfgWalletPath)
 
 	if len(walletPath) == 0 {
-		log.Info(logs.NoWalletPathSpecifiedCreatingEphemeralKeyAutomaticallyForThisRun)
+		log.Info(logs.NoWalletPathSpecifiedCreatingEphemeralKeyAutomaticallyForThisRun, logs.TagField(logs.TagConfig))
 		key, err := keys.NewPrivateKey()
 		if err != nil {
 			return nil, err
@@ -490,7 +544,10 @@ func getKeyFromWallet(w *wallet.Wallet, addrStr string, password *string) (*keys
 }
 
 func (a *app) Wait() {
-	a.log.Info(logs.StartingApplication, zap.String("app_name", "frostfs-http-gw"), zap.String("version", Version))
+	a.log.Info(logs.StartingApplication,
+		zap.String("app_name", "frostfs-http-gw"),
+		zap.String("version", Version),
+		logs.TagField(logs.TagApp))
 
 	a.metrics.SetVersion(Version)
 	a.setHealthStatus()
@@ -521,10 +578,10 @@ func (a *app) Serve() {
 
 	for i := range servs {
 		go func(i int) {
-			a.log.Info(logs.StartingServer, zap.String("address", servs[i].Address()))
+			a.log.Info(logs.StartingServer, zap.String("address", servs[i].Address()), logs.TagField(logs.TagApp))
 			if err := a.webServer.Serve(servs[i].Listener()); err != nil && err != http.ErrServerClosed {
 				a.metrics.MarkUnhealthy(servs[i].Address())
-				a.log.Fatal(logs.ListenAndServe, zap.Error(err))
+				a.log.Fatal(logs.ListenAndServe, zap.Error(err), logs.TagField(logs.TagApp))
 			}
 		}(i)
 	}
@@ -546,7 +603,7 @@ LOOP:
 		}
 	}
 
-	a.log.Info(logs.ShuttingDownWebServer, zap.Error(a.webServer.Shutdown()))
+	a.log.Info(logs.ShuttingDownWebServer, zap.Error(a.webServer.Shutdown()), logs.TagField(logs.TagApp))
 
 	a.metrics.Shutdown()
 	a.stopServices()
@@ -556,7 +613,7 @@
 func (a *app) initWorkerPool() *ants.Pool {
 	workerPool, err := ants.NewPool(a.settings.workerPoolSize)
 	if err != nil {
-		a.log.Fatal(logs.FailedToCreateWorkerPool, zap.Error(err))
+		a.log.Fatal(logs.FailedToCreateWorkerPool, zap.Error(err), logs.TagField(logs.TagApp))
 	}
 	return workerPool
 }
@@ -567,37 +624,37 @@ func (a *app) shutdownTracing() {
 	defer cancel()
 
 	if err := tracing.Shutdown(shdnCtx); err != nil {
-		a.log.Warn(logs.FailedToShutdownTracing, zap.Error(err))
+		a.log.Warn(logs.FailedToShutdownTracing, zap.Error(err), logs.TagField(logs.TagApp))
 	}
 }
 
 func (a *app) configReload(ctx context.Context) {
-	a.log.Info(logs.SIGHUPConfigReloadStarted)
+	log := a.log.With(logs.TagField(logs.TagConfig))
+
+	log.Info(logs.SIGHUPConfigReloadStarted)
 	if !a.cfg.IsSet(cmdConfig) && !a.cfg.IsSet(cmdConfigDir) {
-		a.log.Warn(logs.FailedToReloadConfigBecauseItsMissed)
+		log.Warn(logs.FailedToReloadConfigBecauseItsMissed)
 		return
 	}
 	if err := readInConfig(a.cfg); err != nil {
-		a.log.Warn(logs.FailedToReloadConfig, zap.Error(err))
+		log.Warn(logs.FailedToReloadConfig, zap.Error(err))
 		return
 	}
 
-	if lvl, err := getLogLevel(a.cfg); err != nil {
-		a.log.Warn(logs.LogLevelWontBeUpdated, zap.Error(err))
-	} else {
-		a.logLevel.SetLevel(lvl)
+	if err := a.settings.tagsConfig.update(a.cfg); err != nil {
+		log.Warn(logs.TagsLogConfigWontBeUpdated, zap.Error(err))
 	}
 
 	if err := a.settings.dialerSource.Update(fetchMultinetConfig(a.cfg, a.log)); err != nil {
-		a.log.Warn(logs.MultinetConfigWontBeUpdated, zap.Error(err))
+		log.Warn(logs.MultinetConfigWontBeUpdated, zap.Error(err))
 	}
 
 	if err := a.resolver.UpdateResolvers(a.getResolverConfig()); err != nil {
-		a.log.Warn(logs.FailedToUpdateResolvers, zap.Error(err))
+		log.Warn(logs.FailedToUpdateResolvers, zap.Error(err))
 	}
 
 	if err := a.updateServers(); err != nil {
-		a.log.Warn(logs.FailedToReloadServerParameters, zap.Error(err))
+		log.Warn(logs.FailedToReloadServerParameters, zap.Error(err))
 	}
 
 	a.setRuntimeParameters()
@@ -611,7 +668,7 @@ func (a *app) configReload(ctx context.Context) {
 	a.initTracing(ctx)
 	a.setHealthStatus()
 
-	a.log.Info(logs.SIGHUPConfigReloadCompleted)
+	log.Info(logs.SIGHUPConfigReloadCompleted)
 }
 
 func (a *app) startServices() {
@@ -647,18 +704,18 @@ func (a *app) configureRouter(h *handler.Handler) {
 
 	r.POST("/upload/{cid}", a.addMiddlewares(h.Upload))
 	r.OPTIONS("/upload/{cid}", a.addPreflight())
-	a.log.Info(logs.AddedPathUploadCid)
+	a.log.Info(logs.AddedPathUploadCid, logs.TagField(logs.TagApp))
 	r.GET("/get/{cid}/{oid:*}", a.addMiddlewares(h.DownloadByAddressOrBucketName))
 	r.HEAD("/get/{cid}/{oid:*}", a.addMiddlewares(h.HeadByAddressOrBucketName))
 	r.OPTIONS("/get/{cid}/{oid:*}", a.addPreflight())
-	a.log.Info(logs.AddedPathGetCidOid)
+	a.log.Info(logs.AddedPathGetCidOid, logs.TagField(logs.TagApp))
 	r.GET("/get_by_attribute/{cid}/{attr_key}/{attr_val:*}", a.addMiddlewares(h.DownloadByAttribute))
 	r.HEAD("/get_by_attribute/{cid}/{attr_key}/{attr_val:*}", a.addMiddlewares(h.HeadByAttribute))
 	r.OPTIONS("/get_by_attribute/{cid}/{attr_key}/{attr_val:*}", a.addPreflight())
-	a.log.Info(logs.AddedPathGetByAttributeCidAttrKeyAttrVal)
+	a.log.Info(logs.AddedPathGetByAttributeCidAttrKeyAttrVal, logs.TagField(logs.TagApp))
 	r.GET("/zip/{cid}/{prefix:*}", a.addMiddlewares(h.DownloadZipped))
 	r.OPTIONS("/zip/{cid}/{prefix:*}", a.addPreflight())
-	a.log.Info(logs.AddedPathZipCidPrefix)
+	a.log.Info(logs.AddedPathZipCidPrefix, logs.TagField(logs.TagApp))
 
 	a.webServer.Handler = r.Handler
 }
@@ -752,6 +809,7 @@ func (a *app) logger(h fasthttp.RequestHandler) fasthttp.RequestHandler {
 			zap.ByteString("method", req.Method()),
 			zap.ByteString("path", req.Path()),
 			zap.ByteString("query", req.QueryArgs().QueryString()),
+			logs.TagField(logs.TagDatapath),
 		}
 
 		log.Info(logs.Request, fields...)
@@ -798,7 +856,7 @@ func (a *app) tokenizer(h fasthttp.RequestHandler) fasthttp.RequestHandler {
 		if err != nil {
 			log := utils.GetReqLogOrDefault(reqCtx, a.log)
 
-			log.Error(logs.CouldNotFetchAndStoreBearerToken, zap.Error(err))
+			log.Error(logs.CouldNotFetchAndStoreBearerToken, zap.Error(err), logs.TagField(logs.TagDatapath))
 			handler.ResponseError(req, "could not fetch and store bearer token: "+err.Error(), fasthttp.StatusBadRequest)
 			return
 		}
@@ -852,6 +910,7 @@ func (a *app) initServers(ctx context.Context) {
 		fields := []zap.Field{
 			zap.String("address", serverInfo.Address), zap.Bool("tls enabled", serverInfo.TLS.Enabled),
 			zap.String("tls cert", serverInfo.TLS.CertFile), zap.String("tls key", serverInfo.TLS.KeyFile),
+			logs.TagField(logs.TagApp),
 		}
 		srv, err := newServer(ctx, serverInfo)
 		if err != nil {
@@ -867,7 +926,7 @@ func (a *app) initServers(ctx context.Context) {
 	}
 
 	if len(a.servers) == 0 {
-		a.log.Fatal(logs.NoHealthyServers)
+		a.log.Fatal(logs.NoHealthyServers, logs.TagField(logs.TagApp))
 	}
 }
@@ -941,13 +1000,14 @@ func (a *app) initTracing(ctx context.Context) {
 	if trustedCa := a.cfg.GetString(cfgTracingTrustedCa); trustedCa != "" {
 		caBytes, err := os.ReadFile(trustedCa)
 		if err != nil {
-			a.log.Warn(logs.FailedToInitializeTracing, zap.Error(err))
+			a.log.Warn(logs.FailedToInitializeTracing, zap.Error(err), logs.TagField(logs.TagApp))
 			return
 		}
 		certPool := x509.NewCertPool()
 		ok := certPool.AppendCertsFromPEM(caBytes)
 		if !ok {
-			a.log.Warn(logs.FailedToInitializeTracing, zap.String("error", "can't fill cert pool by ca cert"))
+			a.log.Warn(logs.FailedToInitializeTracing, zap.String("error", "can't fill cert pool by ca cert"),
+				logs.TagField(logs.TagApp))
 			return
 		}
 		cfg.ServerCaCertPool = certPool
@@ -955,24 +1015,24 @@ func (a *app) initTracing(ctx context.Context) {
 
 	attributes, err := fetchTracingAttributes(a.cfg)
 	if err != nil {
-		a.log.Warn(logs.FailedToInitializeTracing, zap.Error(err))
+		a.log.Warn(logs.FailedToInitializeTracing, zap.Error(err), logs.TagField(logs.TagConfig))
 		return
 	}
 	cfg.Attributes = attributes
 
 	updated, err := tracing.Setup(ctx, cfg)
 	if err != nil {
-		a.log.Warn(logs.FailedToInitializeTracing, zap.Error(err))
+		a.log.Warn(logs.FailedToInitializeTracing, zap.Error(err), logs.TagField(logs.TagApp))
 	}
 	if updated {
-		a.log.Info(logs.TracingConfigUpdated)
+		a.log.Info(logs.TracingConfigUpdated, logs.TagField(logs.TagConfig))
 	}
 }
 
 func (a *app) setRuntimeParameters() {
 	if len(os.Getenv("GOMEMLIMIT")) != 0 {
 		// default limit < yaml limit < app env limit < GOMEMLIMIT
-		a.log.Warn(logs.RuntimeSoftMemoryDefinedWithGOMEMLIMIT)
+		a.log.Warn(logs.RuntimeSoftMemoryDefinedWithGOMEMLIMIT, logs.TagField(logs.TagApp))
 		return
 	}
@@ -981,7 +1041,8 @@ func (a *app) setRuntimeParameters() {
 	if softMemoryLimit != previous {
 		a.log.Info(logs.RuntimeSoftMemoryLimitUpdated,
 			zap.Int64("new_value", softMemoryLimit),
-			zap.Int64("old_value", previous))
+			zap.Int64("old_value", previous),
+			logs.TagField(logs.TagApp))
 	}
 }
@@ -1007,28 +1068,29 @@ func (a *app) tryReconnect(ctx context.Context, sr *fasthttp.Server) bool {
 	a.mu.Lock()
 	defer a.mu.Unlock()
 
-	a.log.Info(logs.ServerReconnecting)
+	a.log.Info(logs.ServerReconnecting, logs.TagField(logs.TagApp))
 	var failedServers []ServerInfo
 
 	for _, serverInfo := range a.unbindServers {
 		fields := []zap.Field{
 			zap.String("address", serverInfo.Address), zap.Bool("tls enabled", serverInfo.TLS.Enabled),
 			zap.String("tls cert", serverInfo.TLS.CertFile), zap.String("tls key", serverInfo.TLS.KeyFile),
+			logs.TagField(logs.TagApp),
 		}
 
 		srv, err := newServer(ctx, serverInfo)
 		if err != nil {
-			a.log.Warn(logs.ServerReconnectFailed, zap.Error(err))
+			a.log.Warn(logs.ServerReconnectFailed, zap.Error(err), logs.TagField(logs.TagApp))
			failedServers = append(failedServers, serverInfo)
			a.metrics.MarkUnhealthy(serverInfo.Address)
			continue
		}
 
 		go func() {
-			a.log.Info(logs.StartingServer, zap.String("address", srv.Address()))
+			a.log.Info(logs.StartingServer, zap.String("address", srv.Address()), logs.TagField(logs.TagApp))
 			a.metrics.MarkHealthy(serverInfo.Address)
 			if err = sr.Serve(srv.Listener()); err != nil && !errors.Is(err, http.ErrServerClosed) {
-				a.log.Warn(logs.ListenAndServe, zap.Error(err))
+				a.log.Warn(logs.ListenAndServe, zap.Error(err), logs.TagField(logs.TagApp))
 				a.metrics.MarkUnhealthy(serverInfo.Address)
 			}
 		}()
cmd/http-gw/logger.go (new file, 182 lines added)
@@ -0,0 +1,182 @@
+package main
+
+import (
+	"fmt"
+	"os"
+
+	"git.frostfs.info/TrueCloudLab/frostfs-http-gw/internal/logs"
+	"git.frostfs.info/TrueCloudLab/zapjournald"
+	"github.com/spf13/viper"
+	"github.com/ssgreg/journald"
+	"go.uber.org/zap"
+	"go.uber.org/zap/zapcore"
+)
+
+func getLogLevel(v *viper.Viper) (zapcore.Level, error) {
+	var lvl zapcore.Level
+	lvlStr := v.GetString(cfgLoggerLevel)
+	err := lvl.UnmarshalText([]byte(lvlStr))
+	if err != nil {
+		return lvl, fmt.Errorf("incorrect logger level configuration %s (%v), "+
+			"value should be one of %v", lvlStr, err, [...]zapcore.Level{
+			zapcore.DebugLevel,
+			zapcore.InfoLevel,
+			zapcore.WarnLevel,
+			zapcore.ErrorLevel,
+			zapcore.DPanicLevel,
+			zapcore.PanicLevel,
+			zapcore.FatalLevel,
+		})
+	}
+	return lvl, nil
+}
+
+var _ zapcore.Core = (*zapCoreTagFilterWrapper)(nil)
+
+type zapCoreTagFilterWrapper struct {
+	core     zapcore.Core
+	settings TagFilterSettings
+	extra    []zap.Field
+}
+
+type TagFilterSettings interface {
+	LevelEnabled(tag string, lvl zapcore.Level) bool
+}
+
+func (c *zapCoreTagFilterWrapper) Enabled(level zapcore.Level) bool {
+	return c.core.Enabled(level)
+}
+
+func (c *zapCoreTagFilterWrapper) With(fields []zapcore.Field) zapcore.Core {
+	return &zapCoreTagFilterWrapper{
+		core:     c.core.With(fields),
+		settings: c.settings,
+		extra:    append(c.extra, fields...),
+	}
+}
+
+func (c *zapCoreTagFilterWrapper) Check(entry zapcore.Entry, checked *zapcore.CheckedEntry) *zapcore.CheckedEntry {
+	if c.core.Enabled(entry.Level) {
+		return checked.AddCore(entry, c)
+	}
+	return checked
+}
+
+func (c *zapCoreTagFilterWrapper) Write(entry zapcore.Entry, fields []zapcore.Field) error {
+	if c.shouldSkip(entry, fields) || c.shouldSkip(entry, c.extra) {
+		return nil
+	}
+
+	return c.core.Write(entry, fields)
+}
+
+func (c *zapCoreTagFilterWrapper) shouldSkip(entry zapcore.Entry, fields []zap.Field) bool {
+	for _, field := range fields {
+		if field.Key == logs.TagFieldName && field.Type == zapcore.StringType {
+			if !c.settings.LevelEnabled(field.String, entry.Level) {
+				return true
+			}
+			break
+		}
+	}
+
+	return false
+}
+
+func (c *zapCoreTagFilterWrapper) Sync() error {
+	return c.core.Sync()
+}
+
+func applyZapCoreMiddlewares(core zapcore.Core, v *viper.Viper, loggerSettings LoggerAppSettings, tagSetting TagFilterSettings) zapcore.Core {
+	core = &zapCoreTagFilterWrapper{
+		core:     core,
+		settings: tagSetting,
+	}
+
+	if v.GetBool(cfgLoggerSamplingEnabled) {
+		core = zapcore.NewSamplerWithOptions(core,
+			v.GetDuration(cfgLoggerSamplingInterval),
+			v.GetInt(cfgLoggerSamplingInitial),
+			v.GetInt(cfgLoggerSamplingThereafter),
+			zapcore.SamplerHook(func(_ zapcore.Entry, dec zapcore.SamplingDecision) {
+				if dec&zapcore.LogDropped > 0 {
+					loggerSettings.DroppedLogsInc()
+				}
+			}))
+	}
+
+	return core
+}
+
+func newLogEncoder() zapcore.Encoder {
+	c := zap.NewProductionEncoderConfig()
+	c.EncodeTime = zapcore.ISO8601TimeEncoder
+
+	return zapcore.NewConsoleEncoder(c)
+}
+
+// newStdoutLogger constructs a zap.Logger instance for current application.
+// Panics on failure.
+//
+// Logger is built from zap's production logging configuration with:
+//   - parameterized level (debug by default)
+//   - console encoding
+//   - ISO8601 time encoding
+//
+// Logger records a stack trace for all messages at or above fatal level.
+//
+// See also zapcore.Level, zap.NewProductionConfig, zap.AddStacktrace.
+func newStdoutLogger(v *viper.Viper, lvl zapcore.Level, loggerSettings LoggerAppSettings, tagSetting TagFilterSettings) *Logger {
+	stdout := zapcore.AddSync(os.Stderr)
+	level := zap.NewAtomicLevelAt(lvl)
+
+	consoleOutCore := zapcore.NewCore(newLogEncoder(), stdout, level)
+	consoleOutCore = applyZapCoreMiddlewares(consoleOutCore, v, loggerSettings, tagSetting)
+
+	return &Logger{
+		logger: zap.New(consoleOutCore, zap.AddStacktrace(zap.NewAtomicLevelAt(zap.FatalLevel))),
+		lvl:    level,
+	}
+}
+
+func newJournaldLogger(v *viper.Viper, lvl zapcore.Level, loggerSettings LoggerAppSettings, tagSetting TagFilterSettings) *Logger {
+	level := zap.NewAtomicLevelAt(lvl)
+
+	encoder := zapjournald.NewPartialEncoder(newLogEncoder(), zapjournald.SyslogFields)
+
+	core := zapjournald.NewCore(level, encoder, &journald.Journal{}, zapjournald.SyslogFields)
+	coreWithContext := core.With([]zapcore.Field{
+		zapjournald.SyslogFacility(zapjournald.LogDaemon),
+		zapjournald.SyslogIdentifier(),
+		zapjournald.SyslogPid(),
+	})
+
+	coreWithContext = applyZapCoreMiddlewares(coreWithContext, v, loggerSettings, tagSetting)
+
+	return &Logger{
+		logger: zap.New(coreWithContext, zap.AddStacktrace(zap.NewAtomicLevelAt(zap.FatalLevel))),
+		lvl:    level,
+	}
+}
+
+type LoggerAppSettings interface {
+	DroppedLogsInc()
+}
+
+func pickLogger(v *viper.Viper, loggerSettings LoggerAppSettings, tagSettings TagFilterSettings) *Logger {
+	lvl, err := getLogLevel(v)
+	if err != nil {
+		panic(err)
+	}
+
+	dest := v.GetString(cfgLoggerDestination)
+
+	switch dest {
+	case destinationStdout:
+		return newStdoutLogger(v, lvl, loggerSettings, tagSettings)
+	case destinationJournald:
+		return newJournaldLogger(v, lvl, loggerSettings, tagSettings)
+	default:
+		panic(fmt.Sprintf("wrong destination for logger: %s", dest))
+	}
+}
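As a rough illustration of the filtering mechanism in zapCoreTagFilterWrapper above, here is a self-contained sketch (filterCore, allowOnly, and the literal field key "tag" are simplified stand-ins of my own; the real code matches on logs.TagFieldName and consults the per-tag levels): a wrapping core inspects the tag field in Write and silently drops entries whose tag is not enabled.

package main

import (
	"os"

	"go.uber.org/zap"
	"go.uber.org/zap/zapcore"
)

// allowOnly enables exactly one tag, standing in for the real tagsConfig.
type allowOnly struct{ tag string }

func (s allowOnly) LevelEnabled(tag string, _ zapcore.Level) bool { return tag == s.tag }

// filterCore wraps another core, like zapCoreTagFilterWrapper does.
type filterCore struct {
	zapcore.Core
	settings allowOnly
}

func (c filterCore) Check(e zapcore.Entry, ce *zapcore.CheckedEntry) *zapcore.CheckedEntry {
	if c.Core.Enabled(e.Level) {
		return ce.AddCore(e, c) // register ourselves so Write is called
	}
	return ce
}

func (c filterCore) Write(e zapcore.Entry, fields []zapcore.Field) error {
	for _, f := range fields {
		if f.Key == "tag" && f.Type == zapcore.StringType && !c.settings.LevelEnabled(f.String, e.Level) {
			return nil // drop: this tag is disabled
		}
	}
	return c.Core.Write(e, fields)
}

func main() {
	base := zapcore.NewCore(zapcore.NewConsoleEncoder(zap.NewProductionEncoderConfig()),
		zapcore.AddSync(os.Stderr), zap.NewAtomicLevelAt(zap.DebugLevel))
	log := zap.New(filterCore{Core: base, settings: allowOnly{tag: "app"}})

	log.Info("kept", zap.String("tag", "app"))         // written to stderr
	log.Info("dropped", zap.String("tag", "datapath")) // suppressed by the wrapper
}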
@@ -22,10 +22,8 @@ import (
 	grpctracing "git.frostfs.info/TrueCloudLab/frostfs-observability/tracing/grpc"
 	"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/pool"
 	treepool "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/pool/tree"
-	"git.frostfs.info/TrueCloudLab/zapjournald"
 	"github.com/spf13/pflag"
 	"github.com/spf13/viper"
-	"github.com/ssgreg/journald"
 	"github.com/valyala/fasthttp"
 	"go.uber.org/zap"
 	"go.uber.org/zap/zapcore"
@@ -110,6 +108,11 @@ const (
 	cfgLoggerSamplingThereafter = "logger.sampling.thereafter"
 	cfgLoggerSamplingInterval   = "logger.sampling.interval"
 
+	cfgLoggerTags           = "logger.tags"
+	cfgLoggerTagsPrefixTmpl = cfgLoggerTags + ".%d."
+	cfgLoggerTagsNameTmpl   = cfgLoggerTagsPrefixTmpl + "name"
+	cfgLoggerTagsLevelTmpl  = cfgLoggerTagsPrefixTmpl + "level"
+
 	// Wallet.
 	cfgWalletPassphrase = "wallet.passphrase"
 	cfgWalletPath       = "wallet.path"
@@ -187,6 +190,8 @@ var ignore = map[string]struct{}{
 	cmdVersion: {},
 }
 
+var defaultTags = []string{logs.TagApp, logs.TagConfig, logs.TagDatapath}
+
 type Logger struct {
 	logger *zap.Logger
 	lvl    zap.AtomicLevel
@@ -428,112 +433,39 @@ func mergeConfig(v *viper.Viper, fileName string) error {
 	return v.MergeConfig(cfgFile)
 }
 
-type LoggerAppSettings interface {
-	DroppedLogsInc()
-}
-
-func pickLogger(v *viper.Viper, settings LoggerAppSettings) *Logger {
-	lvl, err := getLogLevel(v)
-	if err != nil {
-		panic(err)
-	}
-
-	dest := v.GetString(cfgLoggerDestination)
-
-	switch dest {
-	case destinationStdout:
-		return newStdoutLogger(v, lvl, settings)
-	case destinationJournald:
-		return newJournaldLogger(v, lvl, settings)
-	default:
-		panic(fmt.Sprintf("wrong destination for logger: %s", dest))
-	}
-}
-
-// newStdoutLogger constructs a zap.Logger instance for current application.
-// Panics on failure.
-//
-// Logger is built from zap's production logging configuration with:
-//   - parameterized level (debug by default)
-//   - console encoding
-//   - ISO8601 time encoding
-//
-// Logger records a stack trace for all messages at or above fatal level.
-//
-// See also zapcore.Level, zap.NewProductionConfig, zap.AddStacktrace.
-func newStdoutLogger(v *viper.Viper, lvl zapcore.Level, settings LoggerAppSettings) *Logger {
-	stdout := zapcore.AddSync(os.Stderr)
-	level := zap.NewAtomicLevelAt(lvl)
-
-	consoleOutCore := zapcore.NewCore(newLogEncoder(), stdout, level)
-	consoleOutCore = applyZapCoreMiddlewares(consoleOutCore, v, settings)
-
-	return &Logger{
-		logger: zap.New(consoleOutCore, zap.AddStacktrace(zap.NewAtomicLevelAt(zap.FatalLevel))),
-		lvl:    level,
-	}
-}
-
-func newJournaldLogger(v *viper.Viper, lvl zapcore.Level, settings LoggerAppSettings) *Logger {
-	level := zap.NewAtomicLevelAt(lvl)
-
-	encoder := zapjournald.NewPartialEncoder(newLogEncoder(), zapjournald.SyslogFields)
-
-	core := zapjournald.NewCore(level, encoder, &journald.Journal{}, zapjournald.SyslogFields)
-	coreWithContext := core.With([]zapcore.Field{
-		zapjournald.SyslogFacility(zapjournald.LogDaemon),
-		zapjournald.SyslogIdentifier(),
-		zapjournald.SyslogPid(),
-	})
-
-	coreWithContext = applyZapCoreMiddlewares(coreWithContext, v, settings)
-
-	return &Logger{
-		logger: zap.New(coreWithContext, zap.AddStacktrace(zap.NewAtomicLevelAt(zap.FatalLevel))),
-		lvl:    level,
-	}
-}
-
-func newLogEncoder() zapcore.Encoder {
-	c := zap.NewProductionEncoderConfig()
-	c.EncodeTime = zapcore.ISO8601TimeEncoder
-
-	return zapcore.NewConsoleEncoder(c)
-}
-
-func applyZapCoreMiddlewares(core zapcore.Core, v *viper.Viper, settings LoggerAppSettings) zapcore.Core {
-	if v.GetBool(cfgLoggerSamplingEnabled) {
-		core = zapcore.NewSamplerWithOptions(core,
-			v.GetDuration(cfgLoggerSamplingInterval),
-			v.GetInt(cfgLoggerSamplingInitial),
-			v.GetInt(cfgLoggerSamplingThereafter),
-			zapcore.SamplerHook(func(_ zapcore.Entry, dec zapcore.SamplingDecision) {
-				if dec&zapcore.LogDropped > 0 {
-					settings.DroppedLogsInc()
-				}
-			}))
-	}
-
-	return core
-}
-
-func getLogLevel(v *viper.Viper) (zapcore.Level, error) {
-	var lvl zapcore.Level
-	lvlStr := v.GetString(cfgLoggerLevel)
-	err := lvl.UnmarshalText([]byte(lvlStr))
-	if err != nil {
-		return lvl, fmt.Errorf("incorrect logger level configuration %s (%v), "+
-			"value should be one of %v", lvlStr, err, [...]zapcore.Level{
-			zapcore.DebugLevel,
-			zapcore.InfoLevel,
-			zapcore.WarnLevel,
-			zapcore.ErrorLevel,
-			zapcore.DPanicLevel,
-			zapcore.PanicLevel,
-			zapcore.FatalLevel,
-		})
-	}
-	return lvl, nil
-}
+func fetchLogTagsConfig(v *viper.Viper) (map[string]zapcore.Level, error) {
+	res := make(map[string]zapcore.Level)
+
+	defaultLevel := v.GetString(cfgLoggerLevel)
+	var defaultLvl zapcore.Level
+	if err := defaultLvl.Set(defaultLevel); err != nil {
+		return nil, fmt.Errorf("failed to parse log level, unknown level: '%s'", defaultLevel)
+	}
+
+	for i := 0; ; i++ {
+		name := v.GetString(fmt.Sprintf(cfgLoggerTagsNameTmpl, i))
+		if name == "" {
+			break
+		}
+
+		lvl := defaultLvl
+		level := v.GetString(fmt.Sprintf(cfgLoggerTagsLevelTmpl, i))
+		if level != "" {
+			if err := lvl.Set(level); err != nil {
+				return nil, fmt.Errorf("failed to parse log tags config, unknown level: '%s'", level)
+			}
+		}
+
+		res[name] = lvl
+	}
+
+	if len(res) == 0 && !v.IsSet(cfgLoggerTags) {
+		for _, tag := range defaultTags {
+			res[tag] = defaultLvl
+		}
+	}
+
+	return res, nil
+}
 
 func fetchReconnectInterval(cfg *viper.Viper) time.Duration {
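To make the key layout concrete, here is a hypothetical sketch (the inlined loop below mirrors the templates and fetch logic from this hunk; the key values are illustrative, not the gateway's actual config file): tags are listed as indexed logger.tags.N.name / logger.tags.N.level pairs, and a tag without an explicit level inherits the global logger.level.

package main

import (
	"fmt"

	"github.com/spf13/viper"
	"go.uber.org/zap/zapcore"
)

func main() {
	v := viper.New()
	v.Set("logger.level", "info")
	v.Set("logger.tags.0.name", "app") // no level set -> inherits "info"
	v.Set("logger.tags.1.name", "datapath")
	v.Set("logger.tags.1.level", "error")

	// Minimal re-implementation of the fetch loop from the diff.
	var def zapcore.Level
	_ = def.Set(v.GetString("logger.level"))

	res := map[string]zapcore.Level{}
	for i := 0; ; i++ {
		name := v.GetString(fmt.Sprintf("logger.tags.%d.name", i))
		if name == "" {
			break
		}
		lvl := def
		if s := v.GetString(fmt.Sprintf("logger.tags.%d.level", i)); s != "" {
			_ = lvl.Set(s)
		}
		res[name] = lvl
	}

	fmt.Println(res) // map[app:info datapath:error]
}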
@@ -552,17 +484,17 @@ func fetchIndexPageTemplate(v *viper.Viper, l *zap.Logger) (string, bool) {
 
 	reader, err := os.Open(v.GetString(cfgIndexPageTemplatePath))
 	if err != nil {
-		l.Warn(logs.FailedToReadIndexPageTemplate, zap.Error(err))
+		l.Warn(logs.FailedToReadIndexPageTemplate, zap.Error(err), logs.TagField(logs.TagApp))
 		return "", true
 	}
 
 	tmpl, err := io.ReadAll(reader)
 	if err != nil {
-		l.Warn(logs.FailedToReadIndexPageTemplate, zap.Error(err))
+		l.Warn(logs.FailedToReadIndexPageTemplate, zap.Error(err), logs.TagField(logs.TagApp))
 		return "", true
 	}
 
-	l.Info(logs.SetCustomIndexPageTemplate)
+	l.Info(logs.SetCustomIndexPageTemplate, logs.TagField(logs.TagApp))
 	return string(tmpl), true
 }
@@ -603,7 +535,7 @@ func fetchServers(v *viper.Viper, log *zap.Logger) []ServerInfo {
 		}
 
 		if _, ok := seen[serverInfo.Address]; ok {
-			log.Warn(logs.WarnDuplicateAddress, zap.String("address", serverInfo.Address))
+			log.Warn(logs.WarnDuplicateAddress, zap.String("address", serverInfo.Address), logs.TagField(logs.TagConfig))
 			continue
 		}
 		seen[serverInfo.Address] = struct{}{}
@@ -616,7 +548,7 @@
 func (a *app) initPools(ctx context.Context) {
 	key, err := getFrostFSKey(a.cfg, a.log)
 	if err != nil {
-		a.log.Fatal(logs.CouldNotLoadFrostFSPrivateKey, zap.Error(err))
+		a.log.Fatal(logs.CouldNotLoadFrostFSPrivateKey, zap.Error(err), logs.TagField(logs.TagApp))
 	}
 
 	var prm pool.InitParameters
@@ -624,7 +556,8 @@ func (a *app) initPools(ctx context.Context) {
 
 	prm.SetKey(&key.PrivateKey)
 	prmTree.SetKey(key)
-	a.log.Info(logs.UsingCredentials, zap.String("FrostFS", hex.EncodeToString(key.PublicKey().Bytes())))
+	a.log.Info(logs.UsingCredentials, zap.String("FrostFS", hex.EncodeToString(key.PublicKey().Bytes())),
+		logs.TagField(logs.TagApp))
 
 	for _, peer := range fetchPeers(a.log, a.cfg) {
 		prm.AddNode(peer)
@@ -679,11 +612,11 @@ func (a *app) initPools(ctx context.Context) {
 
 	p, err := pool.NewPool(prm)
 	if err != nil {
-		a.log.Fatal(logs.FailedToCreateConnectionPool, zap.Error(err))
+		a.log.Fatal(logs.FailedToCreateConnectionPool, zap.Error(err), logs.TagField(logs.TagApp))
 	}
 
 	if err = p.Dial(ctx); err != nil {
-		a.log.Fatal(logs.FailedToDialConnectionPool, zap.Error(err))
+		a.log.Fatal(logs.FailedToDialConnectionPool, zap.Error(err), logs.TagField(logs.TagApp))
 	}
 
 	if a.cfg.GetBool(cfgFeaturesTreePoolNetmapSupport) {
@@ -692,10 +625,10 @@ func (a *app) initPools(ctx context.Context) {
 
 	treePool, err := treepool.NewPool(prmTree)
 	if err != nil {
-		a.log.Fatal(logs.FailedToCreateTreePool, zap.Error(err))
+		a.log.Fatal(logs.FailedToCreateTreePool, zap.Error(err), logs.TagField(logs.TagApp))
 	}
 	if err = treePool.Dial(ctx); err != nil {
-		a.log.Fatal(logs.FailedToDialTreePool, zap.Error(err))
+		a.log.Fatal(logs.FailedToDialTreePool, zap.Error(err), logs.TagField(logs.TagApp))
 	}
 
 	a.pool = p
@@ -726,7 +659,8 @@ func fetchPeers(l *zap.Logger, v *viper.Viper) []pool.NodeParam {
 		l.Info(logs.AddedStoragePeer,
 			zap.Int("priority", priority),
 			zap.String("address", address),
-			zap.Float64("weight", weight))
+			zap.Float64("weight", weight),
+			logs.TagField(logs.TagConfig))
 	}
 
 	return nodes
@@ -765,7 +699,8 @@ func fetchCacheLifetime(v *viper.Viper, l *zap.Logger, cfgEntry string, defaultV
 			l.Error(logs.InvalidLifetimeUsingDefaultValue,
 				zap.String("parameter", cfgEntry),
 				zap.Duration("value in config", lifetime),
-				zap.Duration("default", defaultValue))
+				zap.Duration("default", defaultValue),
+				logs.TagField(logs.TagConfig))
 		} else {
 			return lifetime
 		}
@@ -781,7 +716,8 @@ func fetchCacheSize(v *viper.Viper, l *zap.Logger, cfgEntry string, defaultValue
 			l.Error(logs.InvalidCacheSizeUsingDefaultValue,
 				zap.String("parameter", cfgEntry),
 				zap.Int("value in config", size),
-				zap.Int("default", defaultValue))
+				zap.Int("default", defaultValue),
+				logs.TagField(logs.TagConfig))
 		} else {
 			return size
 		}
@@ -793,7 +729,7 @@ func fetchCacheSize(v *viper.Viper, l *zap.Logger, cfgEntry string, defaultValue
 func getDialerSource(logger *zap.Logger, cfg *viper.Viper) *internalnet.DialerSource {
 	source, err := internalnet.NewDialerSource(fetchMultinetConfig(cfg, logger))
 	if err != nil {
-		logger.Fatal(logs.FailedToLoadMultinetConfig, zap.Error(err))
+		logger.Fatal(logs.FailedToLoadMultinetConfig, zap.Error(err), logs.TagField(logs.TagConfig))
 	}
 	return source
 }