[#195] Add tags support
All checks were successful
/ DCO (pull_request) Successful in 31s
/ Vulncheck (pull_request) Successful in 46s
/ Builds (pull_request) Successful in 1m7s
/ OCI image (pull_request) Successful in 1m26s
/ Lint (pull_request) Successful in 2m14s
/ Tests (pull_request) Successful in 54s
/ Integration tests (pull_request) Successful in 5m41s
All checks were successful
/ DCO (pull_request) Successful in 31s
/ Vulncheck (pull_request) Successful in 46s
/ Builds (pull_request) Successful in 1m7s
/ OCI image (pull_request) Successful in 1m26s
/ Lint (pull_request) Successful in 2m14s
/ Tests (pull_request) Successful in 54s
/ Integration tests (pull_request) Successful in 5m41s
Signed-off-by: Aleksey Kravchenko <al.kravchenko@yadro.com>
This commit is contained in:
parent
36bd3e2d43
commit
d97dd52939
22 changed files with 597 additions and 303 deletions
|
@ -44,6 +44,7 @@ import (
|
||||||
"github.com/valyala/fasthttp"
|
"github.com/valyala/fasthttp"
|
||||||
"go.opentelemetry.io/otel/trace"
|
"go.opentelemetry.io/otel/trace"
|
||||||
"go.uber.org/zap"
|
"go.uber.org/zap"
|
||||||
|
"go.uber.org/zap/zapcore"
|
||||||
"golang.org/x/exp/slices"
|
"golang.org/x/exp/slices"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -51,7 +52,6 @@ type (
|
||||||
app struct {
|
app struct {
|
||||||
ctx context.Context
|
ctx context.Context
|
||||||
log *zap.Logger
|
log *zap.Logger
|
||||||
logLevel zap.AtomicLevel
|
|
||||||
pool *pool.Pool
|
pool *pool.Pool
|
||||||
treePool *treepool.Pool
|
treePool *treepool.Pool
|
||||||
key *keys.PrivateKey
|
key *keys.PrivateKey
|
||||||
|
@ -94,6 +94,7 @@ type (
|
||||||
reconnectInterval time.Duration
|
reconnectInterval time.Duration
|
||||||
dialerSource *internalnet.DialerSource
|
dialerSource *internalnet.DialerSource
|
||||||
workerPoolSize int
|
workerPoolSize int
|
||||||
|
logLevelConfig *logLevelConfig
|
||||||
|
|
||||||
mu sync.RWMutex
|
mu sync.RWMutex
|
||||||
defaultTimestamp bool
|
defaultTimestamp bool
|
||||||
|
@ -113,6 +114,15 @@ type (
|
||||||
enableFilepathFallback bool
|
enableFilepathFallback bool
|
||||||
}
|
}
|
||||||
|
|
||||||
|
tagsConfig struct {
|
||||||
|
tagLogs sync.Map
|
||||||
|
}
|
||||||
|
|
||||||
|
logLevelConfig struct {
|
||||||
|
logLevel zap.AtomicLevel
|
||||||
|
tagsConfig *tagsConfig
|
||||||
|
}
|
||||||
|
|
||||||
CORS struct {
|
CORS struct {
|
||||||
AllowOrigin string
|
AllowOrigin string
|
||||||
AllowMethods []string
|
AllowMethods []string
|
||||||
|
@ -123,9 +133,87 @@ type (
|
||||||
}
|
}
|
||||||
)
|
)
|
||||||
|
|
||||||
|
func newLogLevel(v *viper.Viper) zap.AtomicLevel {
|
||||||
|
ll, err := getLogLevel(v)
|
||||||
|
if err != nil {
|
||||||
|
panic(err.Error())
|
||||||
|
}
|
||||||
|
atomicLogLevel := zap.NewAtomicLevel()
|
||||||
|
atomicLogLevel.SetLevel(ll)
|
||||||
|
return atomicLogLevel
|
||||||
|
}
|
||||||
|
|
||||||
|
func newTagsConfig(v *viper.Viper, ll zapcore.Level) *tagsConfig {
|
||||||
|
var t tagsConfig
|
||||||
|
if err := t.update(v, ll); err != nil {
|
||||||
|
// panic here is analogue of the similar panic during common log level initialization.
|
||||||
|
panic(err.Error())
|
||||||
|
}
|
||||||
|
return &t
|
||||||
|
}
|
||||||
|
|
||||||
|
func newLogLevelConfig(lvl zap.AtomicLevel, tagsConfig *tagsConfig) *logLevelConfig {
|
||||||
|
return &logLevelConfig{
|
||||||
|
logLevel: lvl,
|
||||||
|
tagsConfig: tagsConfig,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (l *logLevelConfig) update(cfg *viper.Viper, log *zap.Logger) {
|
||||||
|
if lvl, err := getLogLevel(cfg); err != nil {
|
||||||
|
log.Warn(logs.LogLevelWontBeUpdated, zap.Error(err), logs.TagField(logs.TagConfig))
|
||||||
|
} else {
|
||||||
|
l.logLevel.SetLevel(lvl)
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := l.tagsConfig.update(cfg, l.logLevel.Level()); err != nil {
|
||||||
|
log.Warn(logs.TagsLogConfigWontBeUpdated, zap.Error(err), logs.TagField(logs.TagConfig))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (t *tagsConfig) LevelEnabled(tag string, tgtLevel zapcore.Level) bool {
|
||||||
|
lvl, ok := t.tagLogs.Load(tag)
|
||||||
|
if !ok {
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
|
||||||
|
return lvl.(zapcore.Level).Enabled(tgtLevel)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (t *tagsConfig) update(cfg *viper.Viper, ll zapcore.Level) error {
|
||||||
|
tags, err := fetchLogTagsConfig(cfg, ll)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
t.tagLogs.Range(func(key, value any) bool {
|
||||||
|
k := key.(string)
|
||||||
|
v := value.(zapcore.Level)
|
||||||
|
|
||||||
|
if lvl, ok := tags[k]; ok {
|
||||||
|
if lvl != v {
|
||||||
|
t.tagLogs.Store(key, lvl)
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
t.tagLogs.Delete(key)
|
||||||
|
delete(tags, k)
|
||||||
|
}
|
||||||
|
return true
|
||||||
|
})
|
||||||
|
|
||||||
|
for k, v := range tags {
|
||||||
|
t.tagLogs.Store(k, v)
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
func newApp(ctx context.Context, v *viper.Viper) App {
|
func newApp(ctx context.Context, v *viper.Viper) App {
|
||||||
logSettings := &loggerSettings{}
|
logSettings := &loggerSettings{}
|
||||||
log := pickLogger(v, logSettings)
|
logLevel := newLogLevel(v)
|
||||||
|
tagConfig := newTagsConfig(v, logLevel.Level())
|
||||||
|
logConfig := newLogLevelConfig(logLevel, tagConfig)
|
||||||
|
log := pickLogger(v, logConfig.logLevel, logSettings, tagConfig)
|
||||||
|
|
||||||
a := &app{
|
a := &app{
|
||||||
ctx: ctx,
|
ctx: ctx,
|
||||||
|
@ -137,7 +225,7 @@ func newApp(ctx context.Context, v *viper.Viper) App {
|
||||||
bucketCache: cache.NewBucketCache(getBucketCacheOptions(v, log.logger), v.GetBool(cfgFeaturesTreePoolNetmapSupport)),
|
bucketCache: cache.NewBucketCache(getBucketCacheOptions(v, log.logger), v.GetBool(cfgFeaturesTreePoolNetmapSupport)),
|
||||||
}
|
}
|
||||||
|
|
||||||
a.initAppSettings()
|
a.initAppSettings(logConfig)
|
||||||
|
|
||||||
// -- setup FastHTTP server --
|
// -- setup FastHTTP server --
|
||||||
a.webServer.Name = "frost-http-gw"
|
a.webServer.Name = "frost-http-gw"
|
||||||
|
@ -167,11 +255,12 @@ func newApp(ctx context.Context, v *viper.Viper) App {
|
||||||
return a
|
return a
|
||||||
}
|
}
|
||||||
|
|
||||||
func (a *app) initAppSettings() {
|
func (a *app) initAppSettings(lc *logLevelConfig) {
|
||||||
a.settings = &appSettings{
|
a.settings = &appSettings{
|
||||||
reconnectInterval: fetchReconnectInterval(a.cfg),
|
reconnectInterval: fetchReconnectInterval(a.cfg),
|
||||||
dialerSource: getDialerSource(a.log, a.cfg),
|
dialerSource: getDialerSource(a.log, a.cfg),
|
||||||
workerPoolSize: a.cfg.GetInt(cfgWorkerPoolSize),
|
workerPoolSize: a.cfg.GetInt(cfgWorkerPoolSize),
|
||||||
|
logLevelConfig: lc,
|
||||||
}
|
}
|
||||||
a.settings.update(a.cfg, a.log)
|
a.settings.update(a.cfg, a.log)
|
||||||
}
|
}
|
||||||
|
@ -319,7 +408,7 @@ func (a *app) initResolver() {
|
||||||
var err error
|
var err error
|
||||||
a.resolver, err = resolver.NewContainerResolver(a.getResolverConfig())
|
a.resolver, err = resolver.NewContainerResolver(a.getResolverConfig())
|
||||||
if err != nil {
|
if err != nil {
|
||||||
a.log.Fatal(logs.FailedToCreateResolver, zap.Error(err))
|
a.log.Fatal(logs.FailedToCreateResolver, zap.Error(err), logs.TagField(logs.TagApp))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -333,11 +422,12 @@ func (a *app) getResolverConfig() ([]string, *resolver.Config) {
|
||||||
order := a.cfg.GetStringSlice(cfgResolveOrder)
|
order := a.cfg.GetStringSlice(cfgResolveOrder)
|
||||||
if resolveCfg.RPCAddress == "" {
|
if resolveCfg.RPCAddress == "" {
|
||||||
order = remove(order, resolver.NNSResolver)
|
order = remove(order, resolver.NNSResolver)
|
||||||
a.log.Warn(logs.ResolverNNSWontBeUsedSinceRPCEndpointIsntProvided)
|
a.log.Warn(logs.ResolverNNSWontBeUsedSinceRPCEndpointIsntProvided, logs.TagField(logs.TagConfig))
|
||||||
}
|
}
|
||||||
|
|
||||||
if len(order) == 0 {
|
if len(order) == 0 {
|
||||||
a.log.Info(logs.ContainerResolverWillBeDisabledBecauseOfResolversResolverOrderIsEmpty)
|
a.log.Info(logs.ContainerResolverWillBeDisabledBecauseOfResolversResolverOrderIsEmpty,
|
||||||
|
logs.TagField(logs.TagConfig))
|
||||||
}
|
}
|
||||||
|
|
||||||
return order, resolveCfg
|
return order, resolveCfg
|
||||||
|
@ -352,7 +442,7 @@ func (a *app) initMetrics() {
|
||||||
|
|
||||||
func newGateMetrics(logger *zap.Logger, provider *metrics.GateMetrics, enabled bool) *gateMetrics {
|
func newGateMetrics(logger *zap.Logger, provider *metrics.GateMetrics, enabled bool) *gateMetrics {
|
||||||
if !enabled {
|
if !enabled {
|
||||||
logger.Warn(logs.MetricsAreDisabled)
|
logger.Warn(logs.MetricsAreDisabled, logs.TagField(logs.TagConfig))
|
||||||
}
|
}
|
||||||
return &gateMetrics{
|
return &gateMetrics{
|
||||||
logger: logger,
|
logger: logger,
|
||||||
|
@ -370,7 +460,7 @@ func (m *gateMetrics) isEnabled() bool {
|
||||||
|
|
||||||
func (m *gateMetrics) SetEnabled(enabled bool) {
|
func (m *gateMetrics) SetEnabled(enabled bool) {
|
||||||
if !enabled {
|
if !enabled {
|
||||||
m.logger.Warn(logs.MetricsAreDisabled)
|
m.logger.Warn(logs.MetricsAreDisabled, logs.TagField(logs.TagConfig))
|
||||||
}
|
}
|
||||||
|
|
||||||
m.mu.Lock()
|
m.mu.Lock()
|
||||||
|
@ -433,7 +523,7 @@ func getFrostFSKey(cfg *viper.Viper, log *zap.Logger) (*keys.PrivateKey, error)
|
||||||
walletPath := cfg.GetString(cfgWalletPath)
|
walletPath := cfg.GetString(cfgWalletPath)
|
||||||
|
|
||||||
if len(walletPath) == 0 {
|
if len(walletPath) == 0 {
|
||||||
log.Info(logs.NoWalletPathSpecifiedCreatingEphemeralKeyAutomaticallyForThisRun)
|
log.Info(logs.NoWalletPathSpecifiedCreatingEphemeralKeyAutomaticallyForThisRun, logs.TagField(logs.TagConfig))
|
||||||
key, err := keys.NewPrivateKey()
|
key, err := keys.NewPrivateKey()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
|
@ -490,7 +580,10 @@ func getKeyFromWallet(w *wallet.Wallet, addrStr string, password *string) (*keys
|
||||||
}
|
}
|
||||||
|
|
||||||
func (a *app) Wait() {
|
func (a *app) Wait() {
|
||||||
a.log.Info(logs.StartingApplication, zap.String("app_name", "frostfs-http-gw"), zap.String("version", Version))
|
a.log.Info(logs.StartingApplication,
|
||||||
|
zap.String("app_name", "frostfs-http-gw"),
|
||||||
|
zap.String("version", Version),
|
||||||
|
logs.TagField(logs.TagApp))
|
||||||
|
|
||||||
a.metrics.SetVersion(Version)
|
a.metrics.SetVersion(Version)
|
||||||
a.setHealthStatus()
|
a.setHealthStatus()
|
||||||
|
@ -521,10 +614,10 @@ func (a *app) Serve() {
|
||||||
|
|
||||||
for i := range servs {
|
for i := range servs {
|
||||||
go func(i int) {
|
go func(i int) {
|
||||||
a.log.Info(logs.StartingServer, zap.String("address", servs[i].Address()))
|
a.log.Info(logs.StartingServer, zap.String("address", servs[i].Address()), logs.TagField(logs.TagApp))
|
||||||
if err := a.webServer.Serve(servs[i].Listener()); err != nil && err != http.ErrServerClosed {
|
if err := a.webServer.Serve(servs[i].Listener()); err != nil && err != http.ErrServerClosed {
|
||||||
a.metrics.MarkUnhealthy(servs[i].Address())
|
a.metrics.MarkUnhealthy(servs[i].Address())
|
||||||
a.log.Fatal(logs.ListenAndServe, zap.Error(err))
|
a.log.Fatal(logs.ListenAndServe, zap.Error(err), logs.TagField(logs.TagApp))
|
||||||
}
|
}
|
||||||
}(i)
|
}(i)
|
||||||
}
|
}
|
||||||
|
@ -546,7 +639,7 @@ LOOP:
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
a.log.Info(logs.ShuttingDownWebServer, zap.Error(a.webServer.Shutdown()))
|
a.log.Info(logs.ShuttingDownWebServer, zap.Error(a.webServer.Shutdown()), logs.TagField(logs.TagApp))
|
||||||
|
|
||||||
a.metrics.Shutdown()
|
a.metrics.Shutdown()
|
||||||
a.stopServices()
|
a.stopServices()
|
||||||
|
@ -556,7 +649,7 @@ LOOP:
|
||||||
func (a *app) initWorkerPool() *ants.Pool {
|
func (a *app) initWorkerPool() *ants.Pool {
|
||||||
workerPool, err := ants.NewPool(a.settings.workerPoolSize)
|
workerPool, err := ants.NewPool(a.settings.workerPoolSize)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
a.log.Fatal(logs.FailedToCreateWorkerPool, zap.Error(err))
|
a.log.Fatal(logs.FailedToCreateWorkerPool, zap.Error(err), logs.TagField(logs.TagApp))
|
||||||
}
|
}
|
||||||
return workerPool
|
return workerPool
|
||||||
}
|
}
|
||||||
|
@ -567,37 +660,35 @@ func (a *app) shutdownTracing() {
|
||||||
defer cancel()
|
defer cancel()
|
||||||
|
|
||||||
if err := tracing.Shutdown(shdnCtx); err != nil {
|
if err := tracing.Shutdown(shdnCtx); err != nil {
|
||||||
a.log.Warn(logs.FailedToShutdownTracing, zap.Error(err))
|
a.log.Warn(logs.FailedToShutdownTracing, zap.Error(err), logs.TagField(logs.TagApp))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (a *app) configReload(ctx context.Context) {
|
func (a *app) configReload(ctx context.Context) {
|
||||||
a.log.Info(logs.SIGHUPConfigReloadStarted)
|
log := a.log.With(logs.TagField(logs.TagConfig))
|
||||||
|
|
||||||
|
log.Info(logs.SIGHUPConfigReloadStarted)
|
||||||
if !a.cfg.IsSet(cmdConfig) && !a.cfg.IsSet(cmdConfigDir) {
|
if !a.cfg.IsSet(cmdConfig) && !a.cfg.IsSet(cmdConfigDir) {
|
||||||
a.log.Warn(logs.FailedToReloadConfigBecauseItsMissed)
|
log.Warn(logs.FailedToReloadConfigBecauseItsMissed)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
if err := readInConfig(a.cfg); err != nil {
|
if err := readInConfig(a.cfg); err != nil {
|
||||||
a.log.Warn(logs.FailedToReloadConfig, zap.Error(err))
|
log.Warn(logs.FailedToReloadConfig, zap.Error(err))
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
if lvl, err := getLogLevel(a.cfg); err != nil {
|
a.settings.logLevelConfig.update(a.cfg, a.log)
|
||||||
a.log.Warn(logs.LogLevelWontBeUpdated, zap.Error(err))
|
|
||||||
} else {
|
|
||||||
a.logLevel.SetLevel(lvl)
|
|
||||||
}
|
|
||||||
|
|
||||||
if err := a.settings.dialerSource.Update(fetchMultinetConfig(a.cfg, a.log)); err != nil {
|
if err := a.settings.dialerSource.Update(fetchMultinetConfig(a.cfg, a.log)); err != nil {
|
||||||
a.log.Warn(logs.MultinetConfigWontBeUpdated, zap.Error(err))
|
log.Warn(logs.MultinetConfigWontBeUpdated, zap.Error(err))
|
||||||
}
|
}
|
||||||
|
|
||||||
if err := a.resolver.UpdateResolvers(a.getResolverConfig()); err != nil {
|
if err := a.resolver.UpdateResolvers(a.getResolverConfig()); err != nil {
|
||||||
a.log.Warn(logs.FailedToUpdateResolvers, zap.Error(err))
|
log.Warn(logs.FailedToUpdateResolvers, zap.Error(err))
|
||||||
}
|
}
|
||||||
|
|
||||||
if err := a.updateServers(); err != nil {
|
if err := a.updateServers(); err != nil {
|
||||||
a.log.Warn(logs.FailedToReloadServerParameters, zap.Error(err))
|
log.Warn(logs.FailedToReloadServerParameters, zap.Error(err))
|
||||||
}
|
}
|
||||||
|
|
||||||
a.setRuntimeParameters()
|
a.setRuntimeParameters()
|
||||||
|
@ -611,7 +702,7 @@ func (a *app) configReload(ctx context.Context) {
|
||||||
a.initTracing(ctx)
|
a.initTracing(ctx)
|
||||||
a.setHealthStatus()
|
a.setHealthStatus()
|
||||||
|
|
||||||
a.log.Info(logs.SIGHUPConfigReloadCompleted)
|
log.Info(logs.SIGHUPConfigReloadCompleted)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (a *app) startServices() {
|
func (a *app) startServices() {
|
||||||
|
@ -647,20 +738,20 @@ func (a *app) configureRouter(h *handler.Handler) {
|
||||||
|
|
||||||
r.POST("/upload/{cid}", a.addMiddlewares(h.Upload))
|
r.POST("/upload/{cid}", a.addMiddlewares(h.Upload))
|
||||||
r.OPTIONS("/upload/{cid}", a.addPreflight())
|
r.OPTIONS("/upload/{cid}", a.addPreflight())
|
||||||
a.log.Info(logs.AddedPathUploadCid)
|
a.log.Info(logs.AddedPathUploadCid, logs.TagField(logs.TagApp))
|
||||||
r.GET("/get/{cid}/{oid:*}", a.addMiddlewares(h.DownloadByAddressOrBucketName))
|
r.GET("/get/{cid}/{oid:*}", a.addMiddlewares(h.DownloadByAddressOrBucketName))
|
||||||
r.HEAD("/get/{cid}/{oid:*}", a.addMiddlewares(h.HeadByAddressOrBucketName))
|
r.HEAD("/get/{cid}/{oid:*}", a.addMiddlewares(h.HeadByAddressOrBucketName))
|
||||||
r.OPTIONS("/get/{cid}/{oid:*}", a.addPreflight())
|
r.OPTIONS("/get/{cid}/{oid:*}", a.addPreflight())
|
||||||
a.log.Info(logs.AddedPathGetCidOid)
|
a.log.Info(logs.AddedPathGetCidOid, logs.TagField(logs.TagApp))
|
||||||
r.GET("/get_by_attribute/{cid}/{attr_key}/{attr_val:*}", a.addMiddlewares(h.DownloadByAttribute))
|
r.GET("/get_by_attribute/{cid}/{attr_key}/{attr_val:*}", a.addMiddlewares(h.DownloadByAttribute))
|
||||||
r.HEAD("/get_by_attribute/{cid}/{attr_key}/{attr_val:*}", a.addMiddlewares(h.HeadByAttribute))
|
r.HEAD("/get_by_attribute/{cid}/{attr_key}/{attr_val:*}", a.addMiddlewares(h.HeadByAttribute))
|
||||||
r.OPTIONS("/get_by_attribute/{cid}/{attr_key}/{attr_val:*}", a.addPreflight())
|
r.OPTIONS("/get_by_attribute/{cid}/{attr_key}/{attr_val:*}", a.addPreflight())
|
||||||
a.log.Info(logs.AddedPathGetByAttributeCidAttrKeyAttrVal)
|
a.log.Info(logs.AddedPathGetByAttributeCidAttrKeyAttrVal, logs.TagField(logs.TagApp))
|
||||||
r.GET("/zip/{cid}/{prefix:*}", a.addMiddlewares(h.DownloadZip))
|
r.GET("/zip/{cid}/{prefix:*}", a.addMiddlewares(h.DownloadZip))
|
||||||
r.OPTIONS("/zip/{cid}/{prefix:*}", a.addPreflight())
|
r.OPTIONS("/zip/{cid}/{prefix:*}", a.addPreflight())
|
||||||
r.GET("/tar/{cid}/{prefix:*}", a.addMiddlewares(h.DownloadTar))
|
r.GET("/tar/{cid}/{prefix:*}", a.addMiddlewares(h.DownloadTar))
|
||||||
r.OPTIONS("/tar/{cid}/{prefix:*}", a.addPreflight())
|
r.OPTIONS("/tar/{cid}/{prefix:*}", a.addPreflight())
|
||||||
a.log.Info(logs.AddedPathZipCidPrefix)
|
a.log.Info(logs.AddedPathZipCidPrefix, logs.TagField(logs.TagApp))
|
||||||
|
|
||||||
a.webServer.Handler = r.Handler
|
a.webServer.Handler = r.Handler
|
||||||
}
|
}
|
||||||
|
@ -754,6 +845,7 @@ func (a *app) logger(h fasthttp.RequestHandler) fasthttp.RequestHandler {
|
||||||
zap.ByteString("method", req.Method()),
|
zap.ByteString("method", req.Method()),
|
||||||
zap.ByteString("path", req.Path()),
|
zap.ByteString("path", req.Path()),
|
||||||
zap.ByteString("query", req.QueryArgs().QueryString()),
|
zap.ByteString("query", req.QueryArgs().QueryString()),
|
||||||
|
logs.TagField(logs.TagDatapath),
|
||||||
}
|
}
|
||||||
|
|
||||||
log.Info(logs.Request, fields...)
|
log.Info(logs.Request, fields...)
|
||||||
|
@ -800,7 +892,7 @@ func (a *app) tokenizer(h fasthttp.RequestHandler) fasthttp.RequestHandler {
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log := utils.GetReqLogOrDefault(reqCtx, a.log)
|
log := utils.GetReqLogOrDefault(reqCtx, a.log)
|
||||||
|
|
||||||
log.Error(logs.CouldNotFetchAndStoreBearerToken, zap.Error(err))
|
log.Error(logs.CouldNotFetchAndStoreBearerToken, zap.Error(err), logs.TagField(logs.TagDatapath))
|
||||||
handler.ResponseError(req, "could not fetch and store bearer token: "+err.Error(), fasthttp.StatusBadRequest)
|
handler.ResponseError(req, "could not fetch and store bearer token: "+err.Error(), fasthttp.StatusBadRequest)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
@ -854,6 +946,7 @@ func (a *app) initServers(ctx context.Context) {
|
||||||
fields := []zap.Field{
|
fields := []zap.Field{
|
||||||
zap.String("address", serverInfo.Address), zap.Bool("tls enabled", serverInfo.TLS.Enabled),
|
zap.String("address", serverInfo.Address), zap.Bool("tls enabled", serverInfo.TLS.Enabled),
|
||||||
zap.String("tls cert", serverInfo.TLS.CertFile), zap.String("tls key", serverInfo.TLS.KeyFile),
|
zap.String("tls cert", serverInfo.TLS.CertFile), zap.String("tls key", serverInfo.TLS.KeyFile),
|
||||||
|
logs.TagField(logs.TagApp),
|
||||||
}
|
}
|
||||||
srv, err := newServer(ctx, serverInfo)
|
srv, err := newServer(ctx, serverInfo)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -869,7 +962,7 @@ func (a *app) initServers(ctx context.Context) {
|
||||||
}
|
}
|
||||||
|
|
||||||
if len(a.servers) == 0 {
|
if len(a.servers) == 0 {
|
||||||
a.log.Fatal(logs.NoHealthyServers)
|
a.log.Fatal(logs.NoHealthyServers, logs.TagField(logs.TagApp))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -943,13 +1036,14 @@ func (a *app) initTracing(ctx context.Context) {
|
||||||
if trustedCa := a.cfg.GetString(cfgTracingTrustedCa); trustedCa != "" {
|
if trustedCa := a.cfg.GetString(cfgTracingTrustedCa); trustedCa != "" {
|
||||||
caBytes, err := os.ReadFile(trustedCa)
|
caBytes, err := os.ReadFile(trustedCa)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
a.log.Warn(logs.FailedToInitializeTracing, zap.Error(err))
|
a.log.Warn(logs.FailedToInitializeTracing, zap.Error(err), logs.TagField(logs.TagApp))
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
certPool := x509.NewCertPool()
|
certPool := x509.NewCertPool()
|
||||||
ok := certPool.AppendCertsFromPEM(caBytes)
|
ok := certPool.AppendCertsFromPEM(caBytes)
|
||||||
if !ok {
|
if !ok {
|
||||||
a.log.Warn(logs.FailedToInitializeTracing, zap.String("error", "can't fill cert pool by ca cert"))
|
a.log.Warn(logs.FailedToInitializeTracing, zap.String("error", "can't fill cert pool by ca cert"),
|
||||||
|
logs.TagField(logs.TagApp))
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
cfg.ServerCaCertPool = certPool
|
cfg.ServerCaCertPool = certPool
|
||||||
|
@ -957,24 +1051,24 @@ func (a *app) initTracing(ctx context.Context) {
|
||||||
|
|
||||||
attributes, err := fetchTracingAttributes(a.cfg)
|
attributes, err := fetchTracingAttributes(a.cfg)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
a.log.Warn(logs.FailedToInitializeTracing, zap.Error(err))
|
a.log.Warn(logs.FailedToInitializeTracing, zap.Error(err), logs.TagField(logs.TagConfig))
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
cfg.Attributes = attributes
|
cfg.Attributes = attributes
|
||||||
|
|
||||||
updated, err := tracing.Setup(ctx, cfg)
|
updated, err := tracing.Setup(ctx, cfg)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
a.log.Warn(logs.FailedToInitializeTracing, zap.Error(err))
|
a.log.Warn(logs.FailedToInitializeTracing, zap.Error(err), logs.TagField(logs.TagApp))
|
||||||
}
|
}
|
||||||
if updated {
|
if updated {
|
||||||
a.log.Info(logs.TracingConfigUpdated)
|
a.log.Info(logs.TracingConfigUpdated, logs.TagField(logs.TagConfig))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (a *app) setRuntimeParameters() {
|
func (a *app) setRuntimeParameters() {
|
||||||
if len(os.Getenv("GOMEMLIMIT")) != 0 {
|
if len(os.Getenv("GOMEMLIMIT")) != 0 {
|
||||||
// default limit < yaml limit < app env limit < GOMEMLIMIT
|
// default limit < yaml limit < app env limit < GOMEMLIMIT
|
||||||
a.log.Warn(logs.RuntimeSoftMemoryDefinedWithGOMEMLIMIT)
|
a.log.Warn(logs.RuntimeSoftMemoryDefinedWithGOMEMLIMIT, logs.TagField(logs.TagApp))
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -983,7 +1077,8 @@ func (a *app) setRuntimeParameters() {
|
||||||
if softMemoryLimit != previous {
|
if softMemoryLimit != previous {
|
||||||
a.log.Info(logs.RuntimeSoftMemoryLimitUpdated,
|
a.log.Info(logs.RuntimeSoftMemoryLimitUpdated,
|
||||||
zap.Int64("new_value", softMemoryLimit),
|
zap.Int64("new_value", softMemoryLimit),
|
||||||
zap.Int64("old_value", previous))
|
zap.Int64("old_value", previous),
|
||||||
|
logs.TagField(logs.TagConfig))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1009,28 +1104,29 @@ func (a *app) tryReconnect(ctx context.Context, sr *fasthttp.Server) bool {
|
||||||
a.mu.Lock()
|
a.mu.Lock()
|
||||||
defer a.mu.Unlock()
|
defer a.mu.Unlock()
|
||||||
|
|
||||||
a.log.Info(logs.ServerReconnecting)
|
a.log.Info(logs.ServerReconnecting, logs.TagField(logs.TagApp))
|
||||||
var failedServers []ServerInfo
|
var failedServers []ServerInfo
|
||||||
|
|
||||||
for _, serverInfo := range a.unbindServers {
|
for _, serverInfo := range a.unbindServers {
|
||||||
fields := []zap.Field{
|
fields := []zap.Field{
|
||||||
zap.String("address", serverInfo.Address), zap.Bool("tls enabled", serverInfo.TLS.Enabled),
|
zap.String("address", serverInfo.Address), zap.Bool("tls enabled", serverInfo.TLS.Enabled),
|
||||||
zap.String("tls cert", serverInfo.TLS.CertFile), zap.String("tls key", serverInfo.TLS.KeyFile),
|
zap.String("tls cert", serverInfo.TLS.CertFile), zap.String("tls key", serverInfo.TLS.KeyFile),
|
||||||
|
logs.TagField(logs.TagApp),
|
||||||
}
|
}
|
||||||
|
|
||||||
srv, err := newServer(ctx, serverInfo)
|
srv, err := newServer(ctx, serverInfo)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
a.log.Warn(logs.ServerReconnectFailed, zap.Error(err))
|
a.log.Warn(logs.ServerReconnectFailed, zap.Error(err), logs.TagField(logs.TagApp))
|
||||||
failedServers = append(failedServers, serverInfo)
|
failedServers = append(failedServers, serverInfo)
|
||||||
a.metrics.MarkUnhealthy(serverInfo.Address)
|
a.metrics.MarkUnhealthy(serverInfo.Address)
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
go func() {
|
go func() {
|
||||||
a.log.Info(logs.StartingServer, zap.String("address", srv.Address()))
|
a.log.Info(logs.StartingServer, zap.String("address", srv.Address()), logs.TagField(logs.TagApp))
|
||||||
a.metrics.MarkHealthy(serverInfo.Address)
|
a.metrics.MarkHealthy(serverInfo.Address)
|
||||||
if err = sr.Serve(srv.Listener()); err != nil && !errors.Is(err, http.ErrServerClosed) {
|
if err = sr.Serve(srv.Listener()); err != nil && !errors.Is(err, http.ErrServerClosed) {
|
||||||
a.log.Warn(logs.ListenAndServe, zap.Error(err))
|
a.log.Warn(logs.ListenAndServe, zap.Error(err), logs.TagField(logs.TagApp))
|
||||||
a.metrics.MarkUnhealthy(serverInfo.Address)
|
a.metrics.MarkUnhealthy(serverInfo.Address)
|
||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
|
|
174
cmd/http-gw/logger.go
Normal file
174
cmd/http-gw/logger.go
Normal file
|
@ -0,0 +1,174 @@
|
||||||
|
package main
|
||||||
|
|
||||||
|
import (
|
||||||
|
"fmt"
|
||||||
|
"os"
|
||||||
|
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/internal/logs"
|
||||||
|
"git.frostfs.info/TrueCloudLab/zapjournald"
|
||||||
|
"github.com/spf13/viper"
|
||||||
|
"github.com/ssgreg/journald"
|
||||||
|
"go.uber.org/zap"
|
||||||
|
"go.uber.org/zap/zapcore"
|
||||||
|
)
|
||||||
|
|
||||||
|
func getLogLevel(v *viper.Viper) (zapcore.Level, error) {
|
||||||
|
var lvl zapcore.Level
|
||||||
|
lvlStr := v.GetString(cfgLoggerLevel)
|
||||||
|
err := lvl.UnmarshalText([]byte(lvlStr))
|
||||||
|
if err != nil {
|
||||||
|
return lvl, fmt.Errorf("incorrect logger level configuration %s (%v), "+
|
||||||
|
"value should be one of %v", lvlStr, err, [...]zapcore.Level{
|
||||||
|
zapcore.DebugLevel,
|
||||||
|
zapcore.InfoLevel,
|
||||||
|
zapcore.WarnLevel,
|
||||||
|
zapcore.ErrorLevel,
|
||||||
|
zapcore.DPanicLevel,
|
||||||
|
zapcore.PanicLevel,
|
||||||
|
zapcore.FatalLevel,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
return lvl, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
var _ zapcore.Core = (*zapCoreTagFilterWrapper)(nil)
|
||||||
|
|
||||||
|
type zapCoreTagFilterWrapper struct {
|
||||||
|
core zapcore.Core
|
||||||
|
settings TagFilterSettings
|
||||||
|
extra []zap.Field
|
||||||
|
}
|
||||||
|
|
||||||
|
type TagFilterSettings interface {
|
||||||
|
LevelEnabled(tag string, lvl zapcore.Level) bool
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *zapCoreTagFilterWrapper) Enabled(level zapcore.Level) bool {
|
||||||
|
return c.core.Enabled(level)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *zapCoreTagFilterWrapper) With(fields []zapcore.Field) zapcore.Core {
|
||||||
|
return &zapCoreTagFilterWrapper{
|
||||||
|
core: c.core.With(fields),
|
||||||
|
settings: c.settings,
|
||||||
|
extra: append(c.extra, fields...),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *zapCoreTagFilterWrapper) Check(entry zapcore.Entry, checked *zapcore.CheckedEntry) *zapcore.CheckedEntry {
|
||||||
|
if c.core.Enabled(entry.Level) {
|
||||||
|
return checked.AddCore(entry, c)
|
||||||
|
}
|
||||||
|
return checked
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *zapCoreTagFilterWrapper) Write(entry zapcore.Entry, fields []zapcore.Field) error {
|
||||||
|
if c.shouldSkip(entry, fields) || c.shouldSkip(entry, c.extra) {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
return c.core.Write(entry, fields)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *zapCoreTagFilterWrapper) shouldSkip(entry zapcore.Entry, fields []zap.Field) bool {
|
||||||
|
for _, field := range fields {
|
||||||
|
if field.Key == logs.TagFieldName && field.Type == zapcore.StringType {
|
||||||
|
if !c.settings.LevelEnabled(field.String, entry.Level) {
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *zapCoreTagFilterWrapper) Sync() error {
|
||||||
|
return c.core.Sync()
|
||||||
|
}
|
||||||
|
|
||||||
|
func applyZapCoreMiddlewares(core zapcore.Core, v *viper.Viper, loggerSettings LoggerAppSettings, tagSetting TagFilterSettings) zapcore.Core {
|
||||||
|
core = &zapCoreTagFilterWrapper{
|
||||||
|
core: core,
|
||||||
|
settings: tagSetting,
|
||||||
|
}
|
||||||
|
|
||||||
|
if v.GetBool(cfgLoggerSamplingEnabled) {
|
||||||
|
core = zapcore.NewSamplerWithOptions(core,
|
||||||
|
v.GetDuration(cfgLoggerSamplingInterval),
|
||||||
|
v.GetInt(cfgLoggerSamplingInitial),
|
||||||
|
v.GetInt(cfgLoggerSamplingThereafter),
|
||||||
|
zapcore.SamplerHook(func(_ zapcore.Entry, dec zapcore.SamplingDecision) {
|
||||||
|
if dec&zapcore.LogDropped > 0 {
|
||||||
|
loggerSettings.DroppedLogsInc()
|
||||||
|
}
|
||||||
|
}))
|
||||||
|
}
|
||||||
|
|
||||||
|
return core
|
||||||
|
}
|
||||||
|
|
||||||
|
func newLogEncoder() zapcore.Encoder {
|
||||||
|
c := zap.NewProductionEncoderConfig()
|
||||||
|
c.EncodeTime = zapcore.ISO8601TimeEncoder
|
||||||
|
|
||||||
|
return zapcore.NewConsoleEncoder(c)
|
||||||
|
}
|
||||||
|
|
||||||
|
// newStdoutLogger constructs a zap.Logger instance for current application.
|
||||||
|
// Panics on failure.
|
||||||
|
//
|
||||||
|
// Logger is built from zap's production logging configuration with:
|
||||||
|
// - parameterized level (debug by default)
|
||||||
|
// - console encoding
|
||||||
|
// - ISO8601 time encoding
|
||||||
|
//
|
||||||
|
// Logger records a stack trace for all messages at or above fatal level.
|
||||||
|
//
|
||||||
|
// See also zapcore.Level, zap.NewProductionConfig, zap.AddStacktrace.
|
||||||
|
func newStdoutLogger(v *viper.Viper, lvl zap.AtomicLevel, loggerSettings LoggerAppSettings, tagSetting TagFilterSettings) *Logger {
|
||||||
|
stdout := zapcore.AddSync(os.Stderr)
|
||||||
|
|
||||||
|
consoleOutCore := zapcore.NewCore(newLogEncoder(), stdout, lvl)
|
||||||
|
consoleOutCore = applyZapCoreMiddlewares(consoleOutCore, v, loggerSettings, tagSetting)
|
||||||
|
|
||||||
|
return &Logger{
|
||||||
|
logger: zap.New(consoleOutCore, zap.AddStacktrace(zap.NewAtomicLevelAt(zap.FatalLevel))),
|
||||||
|
lvl: lvl,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func newJournaldLogger(v *viper.Viper, lvl zap.AtomicLevel, loggerSettings LoggerAppSettings, tagSetting TagFilterSettings) *Logger {
|
||||||
|
encoder := zapjournald.NewPartialEncoder(newLogEncoder(), zapjournald.SyslogFields)
|
||||||
|
|
||||||
|
core := zapjournald.NewCore(lvl, encoder, &journald.Journal{}, zapjournald.SyslogFields)
|
||||||
|
coreWithContext := core.With([]zapcore.Field{
|
||||||
|
zapjournald.SyslogFacility(zapjournald.LogDaemon),
|
||||||
|
zapjournald.SyslogIdentifier(),
|
||||||
|
zapjournald.SyslogPid(),
|
||||||
|
})
|
||||||
|
|
||||||
|
coreWithContext = applyZapCoreMiddlewares(coreWithContext, v, loggerSettings, tagSetting)
|
||||||
|
|
||||||
|
return &Logger{
|
||||||
|
logger: zap.New(coreWithContext, zap.AddStacktrace(zap.NewAtomicLevelAt(zap.FatalLevel))),
|
||||||
|
lvl: lvl,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
type LoggerAppSettings interface {
|
||||||
|
DroppedLogsInc()
|
||||||
|
}
|
||||||
|
|
||||||
|
func pickLogger(v *viper.Viper, lvl zap.AtomicLevel, loggerSettings LoggerAppSettings, tagSettings TagFilterSettings) *Logger {
|
||||||
|
dest := v.GetString(cfgLoggerDestination)
|
||||||
|
|
||||||
|
switch dest {
|
||||||
|
case destinationStdout:
|
||||||
|
return newStdoutLogger(v, lvl, loggerSettings, tagSettings)
|
||||||
|
case destinationJournald:
|
||||||
|
return newJournaldLogger(v, lvl, loggerSettings, tagSettings)
|
||||||
|
default:
|
||||||
|
panic(fmt.Sprintf("wrong destination for logger: %s", dest))
|
||||||
|
}
|
||||||
|
}
|
|
@ -22,10 +22,8 @@ import (
|
||||||
grpctracing "git.frostfs.info/TrueCloudLab/frostfs-observability/tracing/grpc"
|
grpctracing "git.frostfs.info/TrueCloudLab/frostfs-observability/tracing/grpc"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/pool"
|
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/pool"
|
||||||
treepool "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/pool/tree"
|
treepool "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/pool/tree"
|
||||||
"git.frostfs.info/TrueCloudLab/zapjournald"
|
|
||||||
"github.com/spf13/pflag"
|
"github.com/spf13/pflag"
|
||||||
"github.com/spf13/viper"
|
"github.com/spf13/viper"
|
||||||
"github.com/ssgreg/journald"
|
|
||||||
"github.com/valyala/fasthttp"
|
"github.com/valyala/fasthttp"
|
||||||
"go.uber.org/zap"
|
"go.uber.org/zap"
|
||||||
"go.uber.org/zap/zapcore"
|
"go.uber.org/zap/zapcore"
|
||||||
|
@ -110,6 +108,11 @@ const (
|
||||||
cfgLoggerSamplingThereafter = "logger.sampling.thereafter"
|
cfgLoggerSamplingThereafter = "logger.sampling.thereafter"
|
||||||
cfgLoggerSamplingInterval = "logger.sampling.interval"
|
cfgLoggerSamplingInterval = "logger.sampling.interval"
|
||||||
|
|
||||||
|
cfgLoggerTags = "logger.tags"
|
||||||
|
cfgLoggerTagsPrefixTmpl = cfgLoggerTags + ".%d."
|
||||||
|
cfgLoggerTagsNameTmpl = cfgLoggerTagsPrefixTmpl + "name"
|
||||||
|
cfgLoggerTagsLevelTmpl = cfgLoggerTagsPrefixTmpl + "level"
|
||||||
|
|
||||||
// Wallet.
|
// Wallet.
|
||||||
cfgWalletPassphrase = "wallet.passphrase"
|
cfgWalletPassphrase = "wallet.passphrase"
|
||||||
cfgWalletPath = "wallet.path"
|
cfgWalletPath = "wallet.path"
|
||||||
|
@ -192,6 +195,8 @@ var ignore = map[string]struct{}{
|
||||||
cmdVersion: {},
|
cmdVersion: {},
|
||||||
}
|
}
|
||||||
|
|
||||||
|
var defaultTags = []string{logs.TagApp, logs.TagConfig, logs.TagDatapath}
|
||||||
|
|
||||||
type Logger struct {
|
type Logger struct {
|
||||||
logger *zap.Logger
|
logger *zap.Logger
|
||||||
lvl zap.AtomicLevel
|
lvl zap.AtomicLevel
|
||||||
|
@ -430,112 +435,33 @@ func mergeConfig(v *viper.Viper, fileName string) error {
|
||||||
return v.MergeConfig(cfgFile)
|
return v.MergeConfig(cfgFile)
|
||||||
}
|
}
|
||||||
|
|
||||||
type LoggerAppSettings interface {
|
func fetchLogTagsConfig(v *viper.Viper, defaultLvl zapcore.Level) (map[string]zapcore.Level, error) {
|
||||||
DroppedLogsInc()
|
res := make(map[string]zapcore.Level)
|
||||||
|
|
||||||
|
for i := 0; ; i++ {
|
||||||
|
name := v.GetString(fmt.Sprintf(cfgLoggerTagsNameTmpl, i))
|
||||||
|
if name == "" {
|
||||||
|
break
|
||||||
}
|
}
|
||||||
|
|
||||||
func pickLogger(v *viper.Viper, settings LoggerAppSettings) *Logger {
|
lvl := defaultLvl
|
||||||
lvl, err := getLogLevel(v)
|
level := v.GetString(fmt.Sprintf(cfgLoggerTagsLevelTmpl, i))
|
||||||
if err != nil {
|
if level != "" {
|
||||||
panic(err)
|
if err := lvl.Set(level); err != nil {
|
||||||
}
|
return nil, fmt.Errorf("failed to parse log tags config, unknown level: '%s'", level)
|
||||||
|
|
||||||
dest := v.GetString(cfgLoggerDestination)
|
|
||||||
|
|
||||||
switch dest {
|
|
||||||
case destinationStdout:
|
|
||||||
return newStdoutLogger(v, lvl, settings)
|
|
||||||
case destinationJournald:
|
|
||||||
return newJournaldLogger(v, lvl, settings)
|
|
||||||
default:
|
|
||||||
panic(fmt.Sprintf("wrong destination for logger: %s", dest))
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// newStdoutLogger constructs a zap.Logger instance for current application.
|
res[name] = lvl
|
||||||
// Panics on failure.
|
}
|
||||||
//
|
|
||||||
// Logger is built from zap's production logging configuration with:
|
|
||||||
// - parameterized level (debug by default)
|
|
||||||
// - console encoding
|
|
||||||
// - ISO8601 time encoding
|
|
||||||
//
|
|
||||||
// Logger records a stack trace for all messages at or above fatal level.
|
|
||||||
//
|
|
||||||
// See also zapcore.Level, zap.NewProductionConfig, zap.AddStacktrace.
|
|
||||||
func newStdoutLogger(v *viper.Viper, lvl zapcore.Level, settings LoggerAppSettings) *Logger {
|
|
||||||
stdout := zapcore.AddSync(os.Stderr)
|
|
||||||
level := zap.NewAtomicLevelAt(lvl)
|
|
||||||
|
|
||||||
consoleOutCore := zapcore.NewCore(newLogEncoder(), stdout, level)
|
if len(res) == 0 && !v.IsSet(cfgLoggerTags) {
|
||||||
consoleOutCore = applyZapCoreMiddlewares(consoleOutCore, v, settings)
|
for _, tag := range defaultTags {
|
||||||
|
res[tag] = defaultLvl
|
||||||
return &Logger{
|
|
||||||
logger: zap.New(consoleOutCore, zap.AddStacktrace(zap.NewAtomicLevelAt(zap.FatalLevel))),
|
|
||||||
lvl: level,
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func newJournaldLogger(v *viper.Viper, lvl zapcore.Level, settings LoggerAppSettings) *Logger {
|
return res, nil
|
||||||
level := zap.NewAtomicLevelAt(lvl)
|
|
||||||
|
|
||||||
encoder := zapjournald.NewPartialEncoder(newLogEncoder(), zapjournald.SyslogFields)
|
|
||||||
|
|
||||||
core := zapjournald.NewCore(level, encoder, &journald.Journal{}, zapjournald.SyslogFields)
|
|
||||||
coreWithContext := core.With([]zapcore.Field{
|
|
||||||
zapjournald.SyslogFacility(zapjournald.LogDaemon),
|
|
||||||
zapjournald.SyslogIdentifier(),
|
|
||||||
zapjournald.SyslogPid(),
|
|
||||||
})
|
|
||||||
|
|
||||||
coreWithContext = applyZapCoreMiddlewares(coreWithContext, v, settings)
|
|
||||||
|
|
||||||
return &Logger{
|
|
||||||
logger: zap.New(coreWithContext, zap.AddStacktrace(zap.NewAtomicLevelAt(zap.FatalLevel))),
|
|
||||||
lvl: level,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func newLogEncoder() zapcore.Encoder {
|
|
||||||
c := zap.NewProductionEncoderConfig()
|
|
||||||
c.EncodeTime = zapcore.ISO8601TimeEncoder
|
|
||||||
|
|
||||||
return zapcore.NewConsoleEncoder(c)
|
|
||||||
}
|
|
||||||
|
|
||||||
func applyZapCoreMiddlewares(core zapcore.Core, v *viper.Viper, settings LoggerAppSettings) zapcore.Core {
|
|
||||||
if v.GetBool(cfgLoggerSamplingEnabled) {
|
|
||||||
core = zapcore.NewSamplerWithOptions(core,
|
|
||||||
v.GetDuration(cfgLoggerSamplingInterval),
|
|
||||||
v.GetInt(cfgLoggerSamplingInitial),
|
|
||||||
v.GetInt(cfgLoggerSamplingThereafter),
|
|
||||||
zapcore.SamplerHook(func(_ zapcore.Entry, dec zapcore.SamplingDecision) {
|
|
||||||
if dec&zapcore.LogDropped > 0 {
|
|
||||||
settings.DroppedLogsInc()
|
|
||||||
}
|
|
||||||
}))
|
|
||||||
}
|
|
||||||
|
|
||||||
return core
|
|
||||||
}
|
|
||||||
|
|
||||||
func getLogLevel(v *viper.Viper) (zapcore.Level, error) {
|
|
||||||
var lvl zapcore.Level
|
|
||||||
lvlStr := v.GetString(cfgLoggerLevel)
|
|
||||||
err := lvl.UnmarshalText([]byte(lvlStr))
|
|
||||||
if err != nil {
|
|
||||||
return lvl, fmt.Errorf("incorrect logger level configuration %s (%v), "+
|
|
||||||
"value should be one of %v", lvlStr, err, [...]zapcore.Level{
|
|
||||||
zapcore.DebugLevel,
|
|
||||||
zapcore.InfoLevel,
|
|
||||||
zapcore.WarnLevel,
|
|
||||||
zapcore.ErrorLevel,
|
|
||||||
zapcore.DPanicLevel,
|
|
||||||
zapcore.PanicLevel,
|
|
||||||
zapcore.FatalLevel,
|
|
||||||
})
|
|
||||||
}
|
|
||||||
return lvl, nil
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func fetchReconnectInterval(cfg *viper.Viper) time.Duration {
|
func fetchReconnectInterval(cfg *viper.Viper) time.Duration {
|
||||||
|
@ -552,6 +478,7 @@ func fetchIndexPageTemplate(v *viper.Viper, l *zap.Logger) (string, bool) {
|
||||||
return "", false
|
return "", false
|
||||||
}
|
}
|
||||||
|
|
||||||
|
l = l.With(logs.TagField(logs.TagConfig))
|
||||||
reader, err := os.Open(v.GetString(cfgIndexPageTemplatePath))
|
reader, err := os.Open(v.GetString(cfgIndexPageTemplatePath))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
l.Warn(logs.FailedToReadIndexPageTemplate, zap.Error(err))
|
l.Warn(logs.FailedToReadIndexPageTemplate, zap.Error(err))
|
||||||
|
@ -605,7 +532,7 @@ func fetchServers(v *viper.Viper, log *zap.Logger) []ServerInfo {
|
||||||
}
|
}
|
||||||
|
|
||||||
if _, ok := seen[serverInfo.Address]; ok {
|
if _, ok := seen[serverInfo.Address]; ok {
|
||||||
log.Warn(logs.WarnDuplicateAddress, zap.String("address", serverInfo.Address))
|
log.Warn(logs.WarnDuplicateAddress, zap.String("address", serverInfo.Address), logs.TagField(logs.TagConfig))
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
seen[serverInfo.Address] = struct{}{}
|
seen[serverInfo.Address] = struct{}{}
|
||||||
|
@ -618,7 +545,7 @@ func fetchServers(v *viper.Viper, log *zap.Logger) []ServerInfo {
|
||||||
func (a *app) initPools(ctx context.Context) {
|
func (a *app) initPools(ctx context.Context) {
|
||||||
key, err := getFrostFSKey(a.cfg, a.log)
|
key, err := getFrostFSKey(a.cfg, a.log)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
a.log.Fatal(logs.CouldNotLoadFrostFSPrivateKey, zap.Error(err))
|
a.log.Fatal(logs.CouldNotLoadFrostFSPrivateKey, zap.Error(err), logs.TagField(logs.TagApp))
|
||||||
}
|
}
|
||||||
|
|
||||||
var prm pool.InitParameters
|
var prm pool.InitParameters
|
||||||
|
@ -626,7 +553,8 @@ func (a *app) initPools(ctx context.Context) {
|
||||||
|
|
||||||
prm.SetKey(&key.PrivateKey)
|
prm.SetKey(&key.PrivateKey)
|
||||||
prmTree.SetKey(key)
|
prmTree.SetKey(key)
|
||||||
a.log.Info(logs.UsingCredentials, zap.String("FrostFS", hex.EncodeToString(key.PublicKey().Bytes())))
|
a.log.Info(logs.UsingCredentials, zap.String("FrostFS", hex.EncodeToString(key.PublicKey().Bytes())),
|
||||||
|
logs.TagField(logs.TagApp))
|
||||||
|
|
||||||
for _, peer := range fetchPeers(a.log, a.cfg) {
|
for _, peer := range fetchPeers(a.log, a.cfg) {
|
||||||
prm.AddNode(peer)
|
prm.AddNode(peer)
|
||||||
|
@ -681,11 +609,11 @@ func (a *app) initPools(ctx context.Context) {
|
||||||
|
|
||||||
p, err := pool.NewPool(prm)
|
p, err := pool.NewPool(prm)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
a.log.Fatal(logs.FailedToCreateConnectionPool, zap.Error(err))
|
a.log.Fatal(logs.FailedToCreateConnectionPool, zap.Error(err), logs.TagField(logs.TagApp))
|
||||||
}
|
}
|
||||||
|
|
||||||
if err = p.Dial(ctx); err != nil {
|
if err = p.Dial(ctx); err != nil {
|
||||||
a.log.Fatal(logs.FailedToDialConnectionPool, zap.Error(err))
|
a.log.Fatal(logs.FailedToDialConnectionPool, zap.Error(err), logs.TagField(logs.TagApp))
|
||||||
}
|
}
|
||||||
|
|
||||||
if a.cfg.GetBool(cfgFeaturesTreePoolNetmapSupport) {
|
if a.cfg.GetBool(cfgFeaturesTreePoolNetmapSupport) {
|
||||||
|
@ -694,10 +622,10 @@ func (a *app) initPools(ctx context.Context) {
|
||||||
|
|
||||||
treePool, err := treepool.NewPool(prmTree)
|
treePool, err := treepool.NewPool(prmTree)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
a.log.Fatal(logs.FailedToCreateTreePool, zap.Error(err))
|
a.log.Fatal(logs.FailedToCreateTreePool, zap.Error(err), logs.TagField(logs.TagApp))
|
||||||
}
|
}
|
||||||
if err = treePool.Dial(ctx); err != nil {
|
if err = treePool.Dial(ctx); err != nil {
|
||||||
a.log.Fatal(logs.FailedToDialTreePool, zap.Error(err))
|
a.log.Fatal(logs.FailedToDialTreePool, zap.Error(err), logs.TagField(logs.TagApp))
|
||||||
}
|
}
|
||||||
|
|
||||||
a.pool = p
|
a.pool = p
|
||||||
|
@ -728,7 +656,8 @@ func fetchPeers(l *zap.Logger, v *viper.Viper) []pool.NodeParam {
|
||||||
l.Info(logs.AddedStoragePeer,
|
l.Info(logs.AddedStoragePeer,
|
||||||
zap.Int("priority", priority),
|
zap.Int("priority", priority),
|
||||||
zap.String("address", address),
|
zap.String("address", address),
|
||||||
zap.Float64("weight", weight))
|
zap.Float64("weight", weight),
|
||||||
|
logs.TagField(logs.TagConfig))
|
||||||
}
|
}
|
||||||
|
|
||||||
return nodes
|
return nodes
|
||||||
|
@ -767,7 +696,8 @@ func fetchCacheLifetime(v *viper.Viper, l *zap.Logger, cfgEntry string, defaultV
|
||||||
l.Error(logs.InvalidLifetimeUsingDefaultValue,
|
l.Error(logs.InvalidLifetimeUsingDefaultValue,
|
||||||
zap.String("parameter", cfgEntry),
|
zap.String("parameter", cfgEntry),
|
||||||
zap.Duration("value in config", lifetime),
|
zap.Duration("value in config", lifetime),
|
||||||
zap.Duration("default", defaultValue))
|
zap.Duration("default", defaultValue),
|
||||||
|
logs.TagField(logs.TagConfig))
|
||||||
} else {
|
} else {
|
||||||
return lifetime
|
return lifetime
|
||||||
}
|
}
|
||||||
|
@ -783,7 +713,8 @@ func fetchCacheSize(v *viper.Viper, l *zap.Logger, cfgEntry string, defaultValue
|
||||||
l.Error(logs.InvalidCacheSizeUsingDefaultValue,
|
l.Error(logs.InvalidCacheSizeUsingDefaultValue,
|
||||||
zap.String("parameter", cfgEntry),
|
zap.String("parameter", cfgEntry),
|
||||||
zap.Int("value in config", size),
|
zap.Int("value in config", size),
|
||||||
zap.Int("default", defaultValue))
|
zap.Int("default", defaultValue),
|
||||||
|
logs.TagField(logs.TagConfig))
|
||||||
} else {
|
} else {
|
||||||
return size
|
return size
|
||||||
}
|
}
|
||||||
|
@ -795,7 +726,7 @@ func fetchCacheSize(v *viper.Viper, l *zap.Logger, cfgEntry string, defaultValue
|
||||||
func getDialerSource(logger *zap.Logger, cfg *viper.Viper) *internalnet.DialerSource {
|
func getDialerSource(logger *zap.Logger, cfg *viper.Viper) *internalnet.DialerSource {
|
||||||
source, err := internalnet.NewDialerSource(fetchMultinetConfig(cfg, logger))
|
source, err := internalnet.NewDialerSource(fetchMultinetConfig(cfg, logger))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
logger.Fatal(logs.FailedToLoadMultinetConfig, zap.Error(err))
|
logger.Fatal(logs.FailedToLoadMultinetConfig, zap.Error(err), logs.TagField(logs.TagConfig))
|
||||||
}
|
}
|
||||||
return source
|
return source
|
||||||
}
|
}
|
||||||
|
|
|
@ -20,6 +20,9 @@ HTTP_GW_LOGGER_SAMPLING_ENABLED=false
|
||||||
HTTP_GW_LOGGER_SAMPLING_INITIAL=100
|
HTTP_GW_LOGGER_SAMPLING_INITIAL=100
|
||||||
HTTP_GW_LOGGER_SAMPLING_THEREAFTER=100
|
HTTP_GW_LOGGER_SAMPLING_THEREAFTER=100
|
||||||
HTTP_GW_LOGGER_SAMPLING_INTERVAL=1s
|
HTTP_GW_LOGGER_SAMPLING_INTERVAL=1s
|
||||||
|
HTTP_GW_LOGGER_TAGS_0_NAME=app
|
||||||
|
HTTP_GW_LOGGER_TAGS_1_NAME=config
|
||||||
|
HTTP_GW_LOGGER_TAGS_2_NAME=datapath
|
||||||
|
|
||||||
HTTP_GW_SERVER_0_ADDRESS=0.0.0.0:443
|
HTTP_GW_SERVER_0_ADDRESS=0.0.0.0:443
|
||||||
HTTP_GW_SERVER_0_TLS_ENABLED=false
|
HTTP_GW_SERVER_0_TLS_ENABLED=false
|
||||||
|
|
|
@ -29,6 +29,11 @@ logger:
|
||||||
initial: 100
|
initial: 100
|
||||||
thereafter: 100
|
thereafter: 100
|
||||||
interval: 1s
|
interval: 1s
|
||||||
|
tags:
|
||||||
|
- name: app
|
||||||
|
- name: config
|
||||||
|
- name: datapath
|
||||||
|
level: debug
|
||||||
|
|
||||||
server:
|
server:
|
||||||
- address: 0.0.0.0:8080
|
- address: 0.0.0.0:8080
|
||||||
|
|
|
@ -174,6 +174,12 @@ logger:
|
||||||
initial: 100
|
initial: 100
|
||||||
thereafter: 100
|
thereafter: 100
|
||||||
interval: 1s
|
interval: 1s
|
||||||
|
tags:
|
||||||
|
- name: "app"
|
||||||
|
level: info
|
||||||
|
- name: "config"
|
||||||
|
- name: "datapath"
|
||||||
|
- name: "external_storage_tree"
|
||||||
```
|
```
|
||||||
|
|
||||||
| Parameter | Type | SIGHUP reload | Default value | Description |
|
| Parameter | Type | SIGHUP reload | Default value | Description |
|
||||||
|
@ -184,6 +190,31 @@ logger:
|
||||||
| `sampling.initial` | `int` | no | '100' | Sampling count of first log entries. |
|
| `sampling.initial` | `int` | no | '100' | Sampling count of first log entries. |
|
||||||
| `sampling.thereafter` | `int` | no | '100' | Sampling count of entries after an `interval`. |
|
| `sampling.thereafter` | `int` | no | '100' | Sampling count of entries after an `interval`. |
|
||||||
| `sampling.interval` | `duration` | no | '1s' | Sampling interval of messaging similar entries. |
|
| `sampling.interval` | `duration` | no | '1s' | Sampling interval of messaging similar entries. |
|
||||||
|
| `sampling.tags` | `[]Tag` | yes | | Tagged log entries that should be additionally logged (available tags see in the next section). |
|
||||||
|
|
||||||
|
## Tags
|
||||||
|
|
||||||
|
There are additional log entries that can hurt performance and can be additionally logged by using `logger.tags`
|
||||||
|
parameter. Available tags:
|
||||||
|
|
||||||
|
```yaml
|
||||||
|
tags:
|
||||||
|
- name: "app"
|
||||||
|
level: info
|
||||||
|
```
|
||||||
|
|
||||||
|
| Parameter | Type | SIGHUP reload | Default value | Description |
|
||||||
|
|-----------------------|------------|---------------|---------------------------|-------------------------------------------------------------------------------------------------------|
|
||||||
|
| `name` | `string` | yes | | Tag name. Possible values see below in `Tag values` section. |
|
||||||
|
| `level` | `string` | yes | Value from `logger.level` | Logging level for specific tag. Possible values: `debug`, `info`, `warn`, `dpanic`, `panic`, `fatal`. |
|
||||||
|
|
||||||
|
### Tag values
|
||||||
|
|
||||||
|
* `app` - common application logs (enabled by default).
|
||||||
|
* `config` - application configuration, SIGHUP etc (enabled by default).
|
||||||
|
* `datapath` - main logic of application (enabled by default).
|
||||||
|
* `external_storage` - external interaction with storage node.
|
||||||
|
* `external_storage_tree` - external interaction with tree service in storage node.
|
||||||
|
|
||||||
# `web` section
|
# `web` section
|
||||||
|
|
||||||
|
|
4
internal/cache/buckets.go
vendored
4
internal/cache/buckets.go
vendored
|
@ -72,7 +72,7 @@ func (o *BucketCache) GetByCID(cnrID cid.ID) *data.BucketInfo {
|
||||||
key, ok := entry.(string)
|
key, ok := entry.(string)
|
||||||
if !ok {
|
if !ok {
|
||||||
o.logger.Warn(logs.InvalidCacheEntryType, zap.String("actual", fmt.Sprintf("%T", entry)),
|
o.logger.Warn(logs.InvalidCacheEntryType, zap.String("actual", fmt.Sprintf("%T", entry)),
|
||||||
zap.String("expected", fmt.Sprintf("%T", key)))
|
zap.String("expected", fmt.Sprintf("%T", key)), logs.TagField(logs.TagDatapath))
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -88,7 +88,7 @@ func (o *BucketCache) get(key string) *data.BucketInfo {
|
||||||
result, ok := entry.(*data.BucketInfo)
|
result, ok := entry.(*data.BucketInfo)
|
||||||
if !ok {
|
if !ok {
|
||||||
o.logger.Warn(logs.InvalidCacheEntryType, zap.String("actual", fmt.Sprintf("%T", entry)),
|
o.logger.Warn(logs.InvalidCacheEntryType, zap.String("actual", fmt.Sprintf("%T", entry)),
|
||||||
zap.String("expected", fmt.Sprintf("%T", result)))
|
zap.String("expected", fmt.Sprintf("%T", result)), logs.TagField(logs.TagDatapath))
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
2
internal/cache/netmap.go
vendored
2
internal/cache/netmap.go
vendored
|
@ -53,7 +53,7 @@ func (c *NetmapCache) Get() *netmap.NetMap {
|
||||||
result, ok := entry.(netmap.NetMap)
|
result, ok := entry.(netmap.NetMap)
|
||||||
if !ok {
|
if !ok {
|
||||||
c.logger.Warn(logs.InvalidCacheEntryType, zap.String("actual", fmt.Sprintf("%T", entry)),
|
c.logger.Warn(logs.InvalidCacheEntryType, zap.String("actual", fmt.Sprintf("%T", entry)),
|
||||||
zap.String("expected", fmt.Sprintf("%T", result)))
|
zap.String("expected", fmt.Sprintf("%T", result)), logs.TagField(logs.TagDatapath))
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -230,7 +230,7 @@ func (h *Handler) getDirObjectsNative(ctx context.Context, bucketInfo *data.Buck
|
||||||
}
|
}
|
||||||
for objExt := range resp {
|
for objExt := range resp {
|
||||||
if objExt.Error != nil {
|
if objExt.Error != nil {
|
||||||
log.Error(logs.FailedToHeadObject, zap.Error(objExt.Error))
|
log.Error(logs.FailedToHeadObject, zap.Error(objExt.Error), logs.TagField(logs.TagExternalStorage))
|
||||||
result.hasErrors = true
|
result.hasErrors = true
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
@ -273,7 +273,7 @@ func (h *Handler) headDirObjects(ctx context.Context, cnrID cid.ID, objectIDs Re
|
||||||
})
|
})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
wg.Done()
|
wg.Done()
|
||||||
log.Warn(logs.FailedToSumbitTaskToPool, zap.Error(err))
|
log.Warn(logs.FailedToSumbitTaskToPool, zap.Error(err), logs.TagField(logs.TagApp))
|
||||||
}
|
}
|
||||||
select {
|
select {
|
||||||
case <-ctx.Done():
|
case <-ctx.Done():
|
||||||
|
@ -283,7 +283,7 @@ func (h *Handler) headDirObjects(ctx context.Context, cnrID cid.ID, objectIDs Re
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Error(logs.FailedToIterateOverResponse, zap.Error(err))
|
log.Error(logs.FailedToIterateOverResponse, zap.Error(err), logs.TagField(logs.TagDatapath))
|
||||||
}
|
}
|
||||||
wg.Wait()
|
wg.Wait()
|
||||||
}()
|
}()
|
||||||
|
@ -332,12 +332,18 @@ func (h *Handler) browseObjects(c *fasthttp.RequestCtx, p browseParams) {
|
||||||
const S3Protocol = "s3"
|
const S3Protocol = "s3"
|
||||||
const FrostfsProtocol = "frostfs"
|
const FrostfsProtocol = "frostfs"
|
||||||
|
|
||||||
|
logTag := logs.TagExternalStorageTree
|
||||||
|
if p.isNative {
|
||||||
|
logTag = logs.TagExternalStorage
|
||||||
|
}
|
||||||
|
|
||||||
ctx := utils.GetContextFromRequest(c)
|
ctx := utils.GetContextFromRequest(c)
|
||||||
reqLog := utils.GetReqLogOrDefault(ctx, h.log)
|
reqLog := utils.GetReqLogOrDefault(ctx, h.log)
|
||||||
log := reqLog.With(
|
log := reqLog.With(
|
||||||
zap.String("bucket", p.bucketInfo.Name),
|
zap.String("bucket", p.bucketInfo.Name),
|
||||||
zap.String("container", p.bucketInfo.CID.EncodeToString()),
|
zap.String("container", p.bucketInfo.CID.EncodeToString()),
|
||||||
zap.String("prefix", p.prefix),
|
zap.String("prefix", p.prefix),
|
||||||
|
logs.TagField(logTag),
|
||||||
)
|
)
|
||||||
resp, err := p.listObjects(ctx, p.bucketInfo, p.prefix)
|
resp, err := p.listObjects(ctx, p.bucketInfo, p.prefix)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
|
|
@ -37,13 +37,13 @@ func (h *Handler) DownloadByAddressOrBucketName(c *fasthttp.RequestCtx) {
|
||||||
|
|
||||||
bktInfo, err := h.getBucketInfo(ctx, cidParam, log)
|
bktInfo, err := h.getBucketInfo(ctx, cidParam, log)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
logAndSendBucketError(c, log, err)
|
logAndSendBucketError(c, log.With(logs.TagField(logs.TagExternalStorage)), err)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
checkS3Err := h.tree.CheckSettingsNodeExists(ctx, bktInfo)
|
checkS3Err := h.tree.CheckSettingsNodeExists(ctx, bktInfo)
|
||||||
if checkS3Err != nil && !errors.Is(checkS3Err, layer.ErrNodeNotFound) {
|
if checkS3Err != nil && !errors.Is(checkS3Err, layer.ErrNodeNotFound) {
|
||||||
logAndSendBucketError(c, log, checkS3Err)
|
logAndSendBucketError(c, log.With(logs.TagField(logs.TagExternalStorageTree)), checkS3Err)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -121,13 +121,13 @@ func (h *Handler) getZipResponseWriter(ctx context.Context, log *zap.Logger, res
|
||||||
}),
|
}),
|
||||||
)
|
)
|
||||||
if errIter != nil {
|
if errIter != nil {
|
||||||
log.Error(logs.IteratingOverSelectedObjectsFailed, zap.Error(errIter))
|
log.Error(logs.IteratingOverSelectedObjectsFailed, zap.Error(errIter), logs.TagField(logs.TagDatapath))
|
||||||
return
|
return
|
||||||
} else if objectsWritten == 0 {
|
} else if objectsWritten == 0 {
|
||||||
log.Warn(logs.ObjectsNotFound)
|
log.Warn(logs.ObjectsNotFound, logs.TagField(logs.TagDatapath))
|
||||||
}
|
}
|
||||||
if err := zipWriter.Close(); err != nil {
|
if err := zipWriter.Close(); err != nil {
|
||||||
log.Error(logs.CloseZipWriter, zap.Error(err))
|
log.Error(logs.CloseZipWriter, zap.Error(err), logs.TagField(logs.TagDatapath))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -158,7 +158,7 @@ func (h *Handler) DownloadTar(c *fasthttp.RequestCtx) {
|
||||||
log := utils.GetReqLogOrDefault(ctx, h.log)
|
log := utils.GetReqLogOrDefault(ctx, h.log)
|
||||||
bktInfo, err := h.getBucketInfo(ctx, scid, log)
|
bktInfo, err := h.getBucketInfo(ctx, scid, log)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
logAndSendBucketError(c, log, err)
|
logAndSendBucketError(c, log.With(logs.TagField(logs.TagExternalStorage)), err)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
resSearch, err := h.searchObjectsByPrefix(c, log, bktInfo.CID)
|
resSearch, err := h.searchObjectsByPrefix(c, log, bktInfo.CID)
|
||||||
|
@ -187,10 +187,10 @@ func (h *Handler) getTarResponseWriter(ctx context.Context, log *zap.Logger, res
|
||||||
|
|
||||||
defer func() {
|
defer func() {
|
||||||
if err := tarWriter.Close(); err != nil {
|
if err := tarWriter.Close(); err != nil {
|
||||||
log.Error(logs.CloseTarWriter, zap.Error(err))
|
log.Error(logs.CloseTarWriter, zap.Error(err), logs.TagField(logs.TagDatapath))
|
||||||
}
|
}
|
||||||
if err := gzipWriter.Close(); err != nil {
|
if err := gzipWriter.Close(); err != nil {
|
||||||
log.Error(logs.CloseGzipWriter, zap.Error(err))
|
log.Error(logs.CloseGzipWriter, zap.Error(err), logs.TagField(logs.TagDatapath))
|
||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
|
|
||||||
|
@ -204,9 +204,9 @@ func (h *Handler) getTarResponseWriter(ctx context.Context, log *zap.Logger, res
|
||||||
}),
|
}),
|
||||||
)
|
)
|
||||||
if errIter != nil {
|
if errIter != nil {
|
||||||
log.Error(logs.IteratingOverSelectedObjectsFailed, zap.Error(errIter))
|
log.Error(logs.IteratingOverSelectedObjectsFailed, zap.Error(errIter), logs.TagField(logs.TagDatapath))
|
||||||
} else if objectsWritten == 0 {
|
} else if objectsWritten == 0 {
|
||||||
log.Warn(logs.ObjectsNotFound)
|
log.Warn(logs.ObjectsNotFound, logs.TagField(logs.TagDatapath))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -237,18 +237,18 @@ func (h *Handler) putObjectToArchive(ctx context.Context, log *zap.Logger, cnrID
|
||||||
|
|
||||||
resGet, err := h.frostfs.GetObject(ctx, prm)
|
resGet, err := h.frostfs.GetObject(ctx, prm)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Error(logs.FailedToGetObject, zap.Error(err))
|
log.Error(logs.FailedToGetObject, zap.Error(err), logs.TagField(logs.TagExternalStorage))
|
||||||
return false
|
return false
|
||||||
}
|
}
|
||||||
|
|
||||||
fileWriter, err := createArchiveHeader(&resGet.Header)
|
fileWriter, err := createArchiveHeader(&resGet.Header)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Error(logs.FailedToAddObjectToArchive, zap.Error(err))
|
log.Error(logs.FailedToAddObjectToArchive, zap.Error(err), logs.TagField(logs.TagDatapath))
|
||||||
return false
|
return false
|
||||||
}
|
}
|
||||||
|
|
||||||
if err = writeToArchive(resGet, fileWriter, buf); err != nil {
|
if err = writeToArchive(resGet, fileWriter, buf); err != nil {
|
||||||
log.Error(logs.FailedToAddObjectToArchive, zap.Error(err))
|
log.Error(logs.FailedToAddObjectToArchive, zap.Error(err), logs.TagField(logs.TagDatapath))
|
||||||
return false
|
return false
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -264,7 +264,8 @@ func (h *Handler) searchObjectsByPrefix(c *fasthttp.RequestCtx, log *zap.Logger,
|
||||||
|
|
||||||
prefix, err := url.QueryUnescape(prefix)
|
prefix, err := url.QueryUnescape(prefix)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Error(logs.FailedToUnescapeQuery, zap.String("cid", scid), zap.String("prefix", prefix), zap.Error(err))
|
log.Error(logs.FailedToUnescapeQuery, zap.String("cid", scid), zap.String("prefix", prefix),
|
||||||
|
zap.Error(err), logs.TagField(logs.TagDatapath))
|
||||||
ResponseError(c, "could not unescape prefix: "+err.Error(), fasthttp.StatusBadRequest)
|
ResponseError(c, "could not unescape prefix: "+err.Error(), fasthttp.StatusBadRequest)
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
@ -273,7 +274,7 @@ func (h *Handler) searchObjectsByPrefix(c *fasthttp.RequestCtx, log *zap.Logger,
|
||||||
|
|
||||||
resSearch, err := h.search(ctx, cnrID, object.AttributeFilePath, prefix, object.MatchCommonPrefix)
|
resSearch, err := h.search(ctx, cnrID, object.AttributeFilePath, prefix, object.MatchCommonPrefix)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Error(logs.CouldNotSearchForObjects, zap.Error(err))
|
log.Error(logs.CouldNotSearchForObjects, zap.Error(err), logs.TagField(logs.TagExternalStorage))
|
||||||
ResponseError(c, "could not search for objects: "+err.Error(), fasthttp.StatusBadRequest)
|
ResponseError(c, "could not search for objects: "+err.Error(), fasthttp.StatusBadRequest)
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
|
@ -50,7 +50,8 @@ func filterHeaders(l *zap.Logger, header *fasthttp.RequestHeader) (map[string]st
|
||||||
|
|
||||||
l.Debug(logs.AddAttributeToResultObject,
|
l.Debug(logs.AddAttributeToResultObject,
|
||||||
zap.String("key", k),
|
zap.String("key", k),
|
||||||
zap.String("val", v))
|
zap.String("val", v),
|
||||||
|
logs.TagField(logs.TagDatapath))
|
||||||
})
|
})
|
||||||
|
|
||||||
return result, err
|
return result, err
|
||||||
|
|
|
@ -206,11 +206,11 @@ func (h *Handler) byS3Path(ctx context.Context, req request, cnrID cid.ID, path
|
||||||
|
|
||||||
foundOID, err := h.tree.GetLatestVersion(ctx, &cnrID, path)
|
foundOID, err := h.tree.GetLatestVersion(ctx, &cnrID, path)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
logAndSendBucketError(c, log, err)
|
logAndSendBucketError(c, log.With(logs.TagField(logs.TagExternalStorageTree)), err)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
if foundOID.IsDeleteMarker {
|
if foundOID.IsDeleteMarker {
|
||||||
log.Error(logs.ObjectWasDeleted)
|
log.Error(logs.ObjectWasDeleted, logs.TagField(logs.TagExternalStorageTree))
|
||||||
ResponseError(c, "object deleted", fasthttp.StatusNotFound)
|
ResponseError(c, "object deleted", fasthttp.StatusNotFound)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
@ -230,14 +230,16 @@ func (h *Handler) byAttribute(c *fasthttp.RequestCtx, handler func(context.Conte
|
||||||
|
|
||||||
key, err := url.QueryUnescape(key)
|
key, err := url.QueryUnescape(key)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Error(logs.FailedToUnescapeQuery, zap.String("cid", cidParam), zap.String("attr_key", key), zap.Error(err))
|
log.Error(logs.FailedToUnescapeQuery, zap.String("cid", cidParam), zap.String("attr_key", key),
|
||||||
|
zap.Error(err), logs.TagField(logs.TagDatapath))
|
||||||
ResponseError(c, "could not unescape attr_key: "+err.Error(), fasthttp.StatusBadRequest)
|
ResponseError(c, "could not unescape attr_key: "+err.Error(), fasthttp.StatusBadRequest)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
val, err = url.QueryUnescape(val)
|
val, err = url.QueryUnescape(val)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Error(logs.FailedToUnescapeQuery, zap.String("cid", cidParam), zap.String("attr_val", val), zap.Error(err))
|
log.Error(logs.FailedToUnescapeQuery, zap.String("cid", cidParam), zap.String("attr_val", val),
|
||||||
|
zap.Error(err), logs.TagField(logs.TagDatapath))
|
||||||
ResponseError(c, "could not unescape attr_val: "+err.Error(), fasthttp.StatusBadRequest)
|
ResponseError(c, "could not unescape attr_val: "+err.Error(), fasthttp.StatusBadRequest)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
@ -246,7 +248,7 @@ func (h *Handler) byAttribute(c *fasthttp.RequestCtx, handler func(context.Conte
|
||||||
|
|
||||||
bktInfo, err := h.getBucketInfo(ctx, cidParam, log)
|
bktInfo, err := h.getBucketInfo(ctx, cidParam, log)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
logAndSendBucketError(c, log, err)
|
logAndSendBucketError(c, log.With(logs.TagField(logs.TagExternalStorage)), err)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -271,7 +273,7 @@ func (h *Handler) byAttribute(c *fasthttp.RequestCtx, handler func(context.Conte
|
||||||
func (h *Handler) findObjectByAttribute(ctx context.Context, log *zap.Logger, cnrID cid.ID, attrKey, attrVal string) (oid.ID, error) {
|
func (h *Handler) findObjectByAttribute(ctx context.Context, log *zap.Logger, cnrID cid.ID, attrKey, attrVal string) (oid.ID, error) {
|
||||||
res, err := h.search(ctx, cnrID, attrKey, attrVal, object.MatchStringEqual)
|
res, err := h.search(ctx, cnrID, attrKey, attrVal, object.MatchStringEqual)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Error(logs.CouldNotSearchForObjects, zap.Error(err))
|
log.Error(logs.CouldNotSearchForObjects, zap.Error(err), logs.TagField(logs.TagExternalStorage))
|
||||||
return oid.ID{}, fmt.Errorf("could not search for objects: %w", err)
|
return oid.ID{}, fmt.Errorf("could not search for objects: %w", err)
|
||||||
}
|
}
|
||||||
defer res.Close()
|
defer res.Close()
|
||||||
|
@ -282,13 +284,13 @@ func (h *Handler) findObjectByAttribute(ctx context.Context, log *zap.Logger, cn
|
||||||
if n == 0 {
|
if n == 0 {
|
||||||
switch {
|
switch {
|
||||||
case errors.Is(err, io.EOF) && h.needSearchByFileName(attrKey, attrVal):
|
case errors.Is(err, io.EOF) && h.needSearchByFileName(attrKey, attrVal):
|
||||||
log.Debug(logs.ObjectNotFoundByFilePathTrySearchByFileName)
|
log.Debug(logs.ObjectNotFoundByFilePathTrySearchByFileName, logs.TagField(logs.TagExternalStorage))
|
||||||
return h.findObjectByAttribute(ctx, log, cnrID, attrFileName, attrVal)
|
return h.findObjectByAttribute(ctx, log, cnrID, attrFileName, attrVal)
|
||||||
case errors.Is(err, io.EOF):
|
case errors.Is(err, io.EOF):
|
||||||
log.Error(logs.ObjectNotFound, zap.Error(err))
|
log.Error(logs.ObjectNotFound, zap.Error(err), logs.TagField(logs.TagExternalStorage))
|
||||||
return oid.ID{}, fmt.Errorf("object not found: %w", err)
|
return oid.ID{}, fmt.Errorf("object not found: %w", err)
|
||||||
default:
|
default:
|
||||||
log.Error(logs.ReadObjectListFailed, zap.Error(err))
|
log.Error(logs.ReadObjectListFailed, zap.Error(err), logs.TagField(logs.TagExternalStorage))
|
||||||
return oid.ID{}, fmt.Errorf("read object list failed: %w", err)
|
return oid.ID{}, fmt.Errorf("read object list failed: %w", err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -342,7 +344,8 @@ func (h *Handler) getBucketInfo(ctx context.Context, containerName string, log *
|
||||||
log.Warn(logs.CouldntPutBucketIntoCache,
|
log.Warn(logs.CouldntPutBucketIntoCache,
|
||||||
zap.String("bucket name", bktInfo.Name),
|
zap.String("bucket name", bktInfo.Name),
|
||||||
zap.Stringer("bucket cid", bktInfo.CID),
|
zap.Stringer("bucket cid", bktInfo.CID),
|
||||||
zap.Error(err))
|
zap.Error(err),
|
||||||
|
logs.TagField(logs.TagDatapath))
|
||||||
}
|
}
|
||||||
|
|
||||||
return bktInfo, nil
|
return bktInfo, nil
|
||||||
|
@ -386,13 +389,13 @@ func (h *Handler) browseIndex(c *fasthttp.RequestCtx, isNativeList bool) {
|
||||||
|
|
||||||
unescapedKey, err := url.QueryUnescape(oidURLParam)
|
unescapedKey, err := url.QueryUnescape(oidURLParam)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
logAndSendBucketError(c, log, err)
|
logAndSendBucketError(c, log.With(logs.TagField(logs.TagDatapath)), err)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
bktInfo, err := h.getBucketInfo(ctx, cidURLParam, log)
|
bktInfo, err := h.getBucketInfo(ctx, cidURLParam, log)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
logAndSendBucketError(c, log, err)
|
logAndSendBucketError(c, log.With(logs.TagField(logs.TagExternalStorage)), err)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -67,7 +67,8 @@ func (h *Handler) headObject(ctx context.Context, req request, objectAddress oid
|
||||||
req.log.Info(logs.CouldntParseCreationDate,
|
req.log.Info(logs.CouldntParseCreationDate,
|
||||||
zap.String("key", key),
|
zap.String("key", key),
|
||||||
zap.String("val", val),
|
zap.String("val", val),
|
||||||
zap.Error(err))
|
zap.Error(err),
|
||||||
|
logs.TagField(logs.TagDatapath))
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
req.Response.Header.Set(fasthttp.HeaderLastModified, time.Unix(value, 0).UTC().Format(http.TimeFormat))
|
req.Response.Header.Set(fasthttp.HeaderLastModified, time.Unix(value, 0).UTC().Format(http.TimeFormat))
|
||||||
|
@ -126,12 +127,12 @@ func (h *Handler) HeadByAddressOrBucketName(c *fasthttp.RequestCtx) {
|
||||||
|
|
||||||
bktInfo, err := h.getBucketInfo(ctx, cidParam, log)
|
bktInfo, err := h.getBucketInfo(ctx, cidParam, log)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
logAndSendBucketError(c, log, err)
|
logAndSendBucketError(c, log.With(logs.TagField(logs.TagExternalStorage)), err)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
checkS3Err := h.tree.CheckSettingsNodeExists(ctx, bktInfo)
|
checkS3Err := h.tree.CheckSettingsNodeExists(ctx, bktInfo)
|
||||||
if checkS3Err != nil && !errors.Is(checkS3Err, layer.ErrNodeNotFound) {
|
if checkS3Err != nil && !errors.Is(checkS3Err, layer.ErrNodeNotFound) {
|
||||||
logAndSendBucketError(c, log, checkS3Err)
|
logAndSendBucketError(c, log.With(logs.TagField(logs.TagExternalStorageTree)), checkS3Err)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -143,7 +144,7 @@ func (h *Handler) HeadByAddressOrBucketName(c *fasthttp.RequestCtx) {
|
||||||
} else if err = objID.DecodeString(oidParam); err == nil {
|
} else if err = objID.DecodeString(oidParam); err == nil {
|
||||||
h.byNativeAddress(ctx, req, bktInfo.CID, objID, h.headObject)
|
h.byNativeAddress(ctx, req, bktInfo.CID, objID, h.headObject)
|
||||||
} else {
|
} else {
|
||||||
logAndSendBucketError(c, log, checkS3Err)
|
logAndSendBucketError(c, log.With(logs.TagField(logs.TagExternalStorageTree)), checkS3Err)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -33,7 +33,7 @@ func fetchMultipartFile(l *zap.Logger, r io.Reader, boundary string) (MultipartF
|
||||||
|
|
||||||
name := part.FormName()
|
name := part.FormName()
|
||||||
if name == "" {
|
if name == "" {
|
||||||
l.Debug(logs.IgnorePartEmptyFormName)
|
l.Debug(logs.IgnorePartEmptyFormName, logs.TagField(logs.TagDatapath))
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -41,9 +41,9 @@ func fetchMultipartFile(l *zap.Logger, r io.Reader, boundary string) (MultipartF
|
||||||
|
|
||||||
// ignore multipart/form-data values
|
// ignore multipart/form-data values
|
||||||
if filename == "" {
|
if filename == "" {
|
||||||
l.Debug(logs.IgnorePartEmptyFilename, zap.String("form", name))
|
l.Debug(logs.IgnorePartEmptyFilename, zap.String("form", name), logs.TagField(logs.TagDatapath))
|
||||||
if err = part.Close(); err != nil {
|
if err = part.Close(); err != nil {
|
||||||
l.Warn(logs.FailedToCloseReader, zap.Error(err))
|
l.Warn(logs.FailedToCloseReader, zap.Error(err), logs.TagField(logs.TagDatapath))
|
||||||
}
|
}
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
|
@ -112,7 +112,7 @@ func fetchMultipartFileDefault(l *zap.Logger, r io.Reader, boundary string) (Mul
|
||||||
|
|
||||||
name := part.FormName()
|
name := part.FormName()
|
||||||
if name == "" {
|
if name == "" {
|
||||||
l.Debug(logs.IgnorePartEmptyFormName)
|
l.Debug(logs.IgnorePartEmptyFormName, logs.TagField(logs.TagDatapath))
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -120,8 +120,7 @@ func fetchMultipartFileDefault(l *zap.Logger, r io.Reader, boundary string) (Mul
|
||||||
|
|
||||||
// ignore multipart/form-data values
|
// ignore multipart/form-data values
|
||||||
if filename == "" {
|
if filename == "" {
|
||||||
l.Debug(logs.IgnorePartEmptyFilename, zap.String("form", name))
|
l.Debug(logs.IgnorePartEmptyFilename, zap.String("form", name), logs.TagField(logs.TagDatapath))
|
||||||
|
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -110,7 +110,8 @@ func (h *Handler) receiveFile(ctx context.Context, req request, objAddress oid.A
|
||||||
if err = req.setTimestamp(val); err != nil {
|
if err = req.setTimestamp(val); err != nil {
|
||||||
req.log.Error(logs.CouldntParseCreationDate,
|
req.log.Error(logs.CouldntParseCreationDate,
|
||||||
zap.String("val", val),
|
zap.String("val", val),
|
||||||
zap.Error(err))
|
zap.Error(err),
|
||||||
|
logs.TagField(logs.TagDatapath))
|
||||||
}
|
}
|
||||||
case object.AttributeContentType:
|
case object.AttributeContentType:
|
||||||
contentType = val
|
contentType = val
|
||||||
|
@ -144,7 +145,7 @@ func (h *Handler) receiveFile(ctx context.Context, req request, objAddress oid.A
|
||||||
return payload, nil
|
return payload, nil
|
||||||
}, filename)
|
}, filename)
|
||||||
if err != nil && err != io.EOF {
|
if err != nil && err != io.EOF {
|
||||||
req.log.Error(logs.CouldNotDetectContentTypeFromPayload, zap.Error(err))
|
req.log.Error(logs.CouldNotDetectContentTypeFromPayload, zap.Error(err), logs.TagField(logs.TagDatapath))
|
||||||
ResponseError(req.RequestCtx, "could not detect Content-Type from payload: "+err.Error(), fasthttp.StatusBadRequest)
|
ResponseError(req.RequestCtx, "could not detect Content-Type from payload: "+err.Error(), fasthttp.StatusBadRequest)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
|
@ -62,20 +62,20 @@ func (h *Handler) Upload(c *fasthttp.RequestCtx) {
|
||||||
|
|
||||||
bktInfo, err := h.getBucketInfo(ctx, scid, log)
|
bktInfo, err := h.getBucketInfo(ctx, scid, log)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
logAndSendBucketError(c, log, err)
|
logAndSendBucketError(c, log.With(logs.TagField(logs.TagExternalStorage)), err)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
boundary := string(c.Request.Header.MultipartFormBoundary())
|
boundary := string(c.Request.Header.MultipartFormBoundary())
|
||||||
if file, err = fetchMultipartFile(log, bodyStream, boundary); err != nil {
|
if file, err = fetchMultipartFile(log, bodyStream, boundary); err != nil {
|
||||||
log.Error(logs.CouldNotReceiveMultipartForm, zap.Error(err))
|
log.Error(logs.CouldNotReceiveMultipartForm, zap.Error(err), logs.TagField(logs.TagDatapath))
|
||||||
ResponseError(c, "could not receive multipart/form: "+err.Error(), fasthttp.StatusBadRequest)
|
ResponseError(c, "could not receive multipart/form: "+err.Error(), fasthttp.StatusBadRequest)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
filtered, err := filterHeaders(log, &c.Request.Header)
|
filtered, err := filterHeaders(log, &c.Request.Header)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Error(logs.FailedToFilterHeaders, zap.Error(err))
|
log.Error(logs.FailedToFilterHeaders, zap.Error(err), logs.TagField(logs.TagDatapath))
|
||||||
ResponseError(c, err.Error(), fasthttp.StatusBadRequest)
|
ResponseError(c, err.Error(), fasthttp.StatusBadRequest)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
@ -106,7 +106,7 @@ func (h *Handler) uploadSingleObject(req request, bkt *data.BucketInfo, file Mul
|
||||||
|
|
||||||
attributes, err := h.extractAttributes(c, log, filtered)
|
attributes, err := h.extractAttributes(c, log, filtered)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Error(logs.FailedToGetAttributes, zap.Error(err))
|
log.Error(logs.FailedToGetAttributes, zap.Error(err), logs.TagField(logs.TagDatapath))
|
||||||
ResponseError(c, "could not extract attributes: "+err.Error(), fasthttp.StatusBadRequest)
|
ResponseError(c, "could not extract attributes: "+err.Error(), fasthttp.StatusBadRequest)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
@ -119,13 +119,14 @@ func (h *Handler) uploadSingleObject(req request, bkt *data.BucketInfo, file Mul
|
||||||
log.Debug(logs.ObjectUploaded,
|
log.Debug(logs.ObjectUploaded,
|
||||||
zap.String("oid", idObj.EncodeToString()),
|
zap.String("oid", idObj.EncodeToString()),
|
||||||
zap.String("FileName", file.FileName()),
|
zap.String("FileName", file.FileName()),
|
||||||
|
logs.TagField(logs.TagExternalStorage),
|
||||||
)
|
)
|
||||||
|
|
||||||
addr := newAddress(bkt.CID, idObj)
|
addr := newAddress(bkt.CID, idObj)
|
||||||
c.Response.Header.SetContentType(jsonHeader)
|
c.Response.Header.SetContentType(jsonHeader)
|
||||||
// Try to return the response, otherwise, if something went wrong, throw an error.
|
// Try to return the response, otherwise, if something went wrong, throw an error.
|
||||||
if err = newPutResponse(addr).encode(c); err != nil {
|
if err = newPutResponse(addr).encode(c); err != nil {
|
||||||
log.Error(logs.CouldNotEncodeResponse, zap.Error(err))
|
log.Error(logs.CouldNotEncodeResponse, zap.Error(err), logs.TagField(logs.TagDatapath))
|
||||||
ResponseError(c, "could not encode response", fasthttp.StatusBadRequest)
|
ResponseError(c, "could not encode response", fasthttp.StatusBadRequest)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
@ -162,13 +163,14 @@ func (h *Handler) extractAttributes(c *fasthttp.RequestCtx, log *zap.Logger, fil
|
||||||
now := time.Now()
|
now := time.Now()
|
||||||
if rawHeader := c.Request.Header.Peek(fasthttp.HeaderDate); rawHeader != nil {
|
if rawHeader := c.Request.Header.Peek(fasthttp.HeaderDate); rawHeader != nil {
|
||||||
if parsed, err := time.Parse(http.TimeFormat, string(rawHeader)); err != nil {
|
if parsed, err := time.Parse(http.TimeFormat, string(rawHeader)); err != nil {
|
||||||
log.Warn(logs.CouldNotParseClientTime, zap.String("Date header", string(rawHeader)), zap.Error(err))
|
log.Warn(logs.CouldNotParseClientTime, zap.String("Date header", string(rawHeader)), zap.Error(err),
|
||||||
|
logs.TagField(logs.TagDatapath))
|
||||||
} else {
|
} else {
|
||||||
now = parsed
|
now = parsed
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if err := utils.PrepareExpirationHeader(c, h.frostfs, filtered, now); err != nil {
|
if err := utils.PrepareExpirationHeader(c, h.frostfs, filtered, now); err != nil {
|
||||||
log.Error(logs.CouldNotPrepareExpirationHeader, zap.Error(err))
|
log.Error(logs.CouldNotPrepareExpirationHeader, zap.Error(err), logs.TagField(logs.TagDatapath))
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
attributes := make([]object.Attribute, 0, len(filtered))
|
attributes := make([]object.Attribute, 0, len(filtered))
|
||||||
|
@ -205,7 +207,7 @@ func (h *Handler) explodeArchive(req request, bkt *data.BucketInfo, file io.Read
|
||||||
|
|
||||||
commonAttributes, err := h.extractAttributes(c, log, filtered)
|
commonAttributes, err := h.extractAttributes(c, log, filtered)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Error(logs.FailedToGetAttributes, zap.Error(err))
|
log.Error(logs.FailedToGetAttributes, zap.Error(err), logs.TagField(logs.TagDatapath))
|
||||||
ResponseError(c, "could not extract attributes: "+err.Error(), fasthttp.StatusBadRequest)
|
ResponseError(c, "could not extract attributes: "+err.Error(), fasthttp.StatusBadRequest)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
@ -213,16 +215,16 @@ func (h *Handler) explodeArchive(req request, bkt *data.BucketInfo, file io.Read
|
||||||
|
|
||||||
reader := file
|
reader := file
|
||||||
if bytes.EqualFold(c.Request.Header.Peek(fasthttp.HeaderContentEncoding), []byte("gzip")) {
|
if bytes.EqualFold(c.Request.Header.Peek(fasthttp.HeaderContentEncoding), []byte("gzip")) {
|
||||||
log.Debug(logs.GzipReaderSelected)
|
log.Debug(logs.GzipReaderSelected, logs.TagField(logs.TagDatapath))
|
||||||
gzipReader, err := gzip.NewReader(file)
|
gzipReader, err := gzip.NewReader(file)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Error(logs.FailedToCreateGzipReader, zap.Error(err))
|
log.Error(logs.FailedToCreateGzipReader, zap.Error(err), logs.TagField(logs.TagDatapath))
|
||||||
ResponseError(c, "could read gzip file: "+err.Error(), fasthttp.StatusBadRequest)
|
ResponseError(c, "could read gzip file: "+err.Error(), fasthttp.StatusBadRequest)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
defer func() {
|
defer func() {
|
||||||
if err := gzipReader.Close(); err != nil {
|
if err := gzipReader.Close(); err != nil {
|
||||||
log.Warn(logs.FailedToCloseReader, zap.Error(err))
|
log.Warn(logs.FailedToCloseReader, zap.Error(err), logs.TagField(logs.TagDatapath))
|
||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
reader = gzipReader
|
reader = gzipReader
|
||||||
|
@ -234,7 +236,7 @@ func (h *Handler) explodeArchive(req request, bkt *data.BucketInfo, file io.Read
|
||||||
if errors.Is(err, io.EOF) {
|
if errors.Is(err, io.EOF) {
|
||||||
break
|
break
|
||||||
} else if err != nil {
|
} else if err != nil {
|
||||||
log.Error(logs.FailedToReadFileFromTar, zap.Error(err))
|
log.Error(logs.FailedToReadFileFromTar, zap.Error(err), logs.TagField(logs.TagDatapath))
|
||||||
ResponseError(c, "could not get next entry: "+err.Error(), fasthttp.StatusBadRequest)
|
ResponseError(c, "could not get next entry: "+err.Error(), fasthttp.StatusBadRequest)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
@ -258,6 +260,7 @@ func (h *Handler) explodeArchive(req request, bkt *data.BucketInfo, file io.Read
|
||||||
log.Debug(logs.ObjectUploaded,
|
log.Debug(logs.ObjectUploaded,
|
||||||
zap.String("oid", idObj.EncodeToString()),
|
zap.String("oid", idObj.EncodeToString()),
|
||||||
zap.String("FileName", fileName),
|
zap.String("FileName", fileName),
|
||||||
|
logs.TagField(logs.TagExternalStorage),
|
||||||
)
|
)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -265,6 +268,7 @@ func (h *Handler) explodeArchive(req request, bkt *data.BucketInfo, file io.Read
|
||||||
func (h *Handler) handlePutFrostFSErr(r *fasthttp.RequestCtx, err error, log *zap.Logger) {
|
func (h *Handler) handlePutFrostFSErr(r *fasthttp.RequestCtx, err error, log *zap.Logger) {
|
||||||
statusCode, msg, additionalFields := formErrorResponse("could not store file in frostfs", err)
|
statusCode, msg, additionalFields := formErrorResponse("could not store file in frostfs", err)
|
||||||
logFields := append([]zap.Field{zap.Error(err)}, additionalFields...)
|
logFields := append([]zap.Field{zap.Error(err)}, additionalFields...)
|
||||||
|
logFields = append(logFields, logs.TagField(logs.TagExternalStorage))
|
||||||
|
|
||||||
log.Error(logs.CouldNotStoreFileInFrostfs, logFields...)
|
log.Error(logs.CouldNotStoreFileInFrostfs, logFields...)
|
||||||
ResponseError(r, msg, statusCode)
|
ResponseError(r, msg, statusCode)
|
||||||
|
|
|
@ -35,6 +35,7 @@ func (r *request) handleFrostFSErr(err error, start time.Time) {
|
||||||
logFields := []zap.Field{
|
logFields := []zap.Field{
|
||||||
zap.Stringer("elapsed", time.Since(start)),
|
zap.Stringer("elapsed", time.Since(start)),
|
||||||
zap.Error(err),
|
zap.Error(err),
|
||||||
|
logs.TagField(logs.TagExternalStorage),
|
||||||
}
|
}
|
||||||
statusCode, msg, additionalFields := formErrorResponse("could not receive object", err)
|
statusCode, msg, additionalFields := formErrorResponse("could not receive object", err)
|
||||||
logFields = append(logFields, additionalFields...)
|
logFields = append(logFields, additionalFields...)
|
||||||
|
|
|
@ -1,63 +1,44 @@
|
||||||
package logs
|
package logs
|
||||||
|
|
||||||
|
import "go.uber.org/zap"
|
||||||
|
|
||||||
|
const (
|
||||||
|
TagFieldName = "tag"
|
||||||
|
|
||||||
|
TagApp = "app"
|
||||||
|
TagConfig = "config"
|
||||||
|
TagDatapath = "datapath"
|
||||||
|
TagExternalStorage = "external_storage"
|
||||||
|
TagExternalStorageTree = "external_storage_tree"
|
||||||
|
)
|
||||||
|
|
||||||
|
func TagField(tag string) zap.Field {
|
||||||
|
return zap.String(TagFieldName, tag)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Log messages with the "app" tag.
|
||||||
const (
|
const (
|
||||||
CouldntParseCreationDate = "couldn't parse creation date"
|
|
||||||
CouldNotDetectContentTypeFromPayload = "could not detect Content-Type from payload"
|
|
||||||
CouldNotReceiveObject = "could not receive object"
|
|
||||||
ObjectWasDeleted = "object was deleted"
|
|
||||||
CouldNotSearchForObjects = "could not search for objects"
|
|
||||||
ObjectNotFound = "object not found"
|
|
||||||
ReadObjectListFailed = "read object list failed"
|
|
||||||
FailedToAddObjectToArchive = "failed to add object to archive"
|
|
||||||
FailedToGetObject = "failed to get object"
|
|
||||||
IteratingOverSelectedObjectsFailed = "iterating over selected objects failed"
|
|
||||||
ObjectsNotFound = "objects not found"
|
|
||||||
CloseZipWriter = "close zip writer"
|
|
||||||
ServiceIsRunning = "service is running"
|
ServiceIsRunning = "service is running"
|
||||||
ServiceCouldntStartOnConfiguredPort = "service couldn't start on configured port"
|
ServiceCouldntStartOnConfiguredPort = "service couldn't start on configured port"
|
||||||
ServiceHasntStartedSinceItsDisabled = "service hasn't started since it's disabled"
|
ServiceHasntStartedSinceItsDisabled = "service hasn't started since it's disabled"
|
||||||
ShuttingDownService = "shutting down service"
|
ShuttingDownService = "shutting down service"
|
||||||
CantShutDownService = "can't shut down service"
|
CantShutDownService = "can't shut down service"
|
||||||
CantGracefullyShutDownService = "can't gracefully shut down service, force stop"
|
CantGracefullyShutDownService = "can't gracefully shut down service, force stop"
|
||||||
IgnorePartEmptyFormName = "ignore part, empty form name"
|
|
||||||
IgnorePartEmptyFilename = "ignore part, empty filename"
|
|
||||||
CouldNotReceiveMultipartForm = "could not receive multipart/form"
|
|
||||||
CouldNotParseClientTime = "could not parse client time"
|
|
||||||
CouldNotPrepareExpirationHeader = "could not prepare expiration header"
|
|
||||||
CouldNotEncodeResponse = "could not encode response"
|
|
||||||
CouldNotStoreFileInFrostfs = "could not store file in frostfs"
|
|
||||||
AddAttributeToResultObject = "add attribute to result object"
|
|
||||||
FailedToCreateResolver = "failed to create resolver"
|
FailedToCreateResolver = "failed to create resolver"
|
||||||
FailedToCreateWorkerPool = "failed to create worker pool"
|
FailedToCreateWorkerPool = "failed to create worker pool"
|
||||||
FailedToReadIndexPageTemplate = "failed to read index page template"
|
|
||||||
SetCustomIndexPageTemplate = "set custom index page template"
|
|
||||||
ContainerResolverWillBeDisabledBecauseOfResolversResolverOrderIsEmpty = "container resolver will be disabled because of resolvers 'resolver_order' is empty"
|
|
||||||
MetricsAreDisabled = "metrics are disabled"
|
|
||||||
NoWalletPathSpecifiedCreatingEphemeralKeyAutomaticallyForThisRun = "no wallet path specified, creating ephemeral key automatically for this run"
|
|
||||||
StartingApplication = "starting application"
|
StartingApplication = "starting application"
|
||||||
StartingServer = "starting server"
|
StartingServer = "starting server"
|
||||||
ListenAndServe = "listen and serve"
|
ListenAndServe = "listen and serve"
|
||||||
ShuttingDownWebServer = "shutting down web server"
|
ShuttingDownWebServer = "shutting down web server"
|
||||||
FailedToShutdownTracing = "failed to shutdown tracing"
|
FailedToShutdownTracing = "failed to shutdown tracing"
|
||||||
SIGHUPConfigReloadStarted = "SIGHUP config reload started"
|
|
||||||
FailedToReloadConfigBecauseItsMissed = "failed to reload config because it's missed"
|
|
||||||
FailedToReloadConfig = "failed to reload config"
|
|
||||||
LogLevelWontBeUpdated = "log level won't be updated"
|
|
||||||
FailedToUpdateResolvers = "failed to update resolvers"
|
|
||||||
FailedToReloadServerParameters = "failed to reload server parameters"
|
|
||||||
SIGHUPConfigReloadCompleted = "SIGHUP config reload completed"
|
|
||||||
AddedPathUploadCid = "added path /upload/{cid}"
|
AddedPathUploadCid = "added path /upload/{cid}"
|
||||||
AddedPathGetCidOid = "added path /get/{cid}/{oid}"
|
AddedPathGetCidOid = "added path /get/{cid}/{oid}"
|
||||||
AddedPathGetByAttributeCidAttrKeyAttrVal = "added path /get_by_attribute/{cid}/{attr_key}/{attr_val:*}"
|
AddedPathGetByAttributeCidAttrKeyAttrVal = "added path /get_by_attribute/{cid}/{attr_key}/{attr_val:*}"
|
||||||
AddedPathZipCidPrefix = "added path /zip/{cid}/{prefix}"
|
AddedPathZipCidPrefix = "added path /zip/{cid}/{prefix}"
|
||||||
Request = "request"
|
|
||||||
CouldNotFetchAndStoreBearerToken = "could not fetch and store bearer token"
|
|
||||||
FailedToAddServer = "failed to add server"
|
FailedToAddServer = "failed to add server"
|
||||||
AddServer = "add server"
|
AddServer = "add server"
|
||||||
NoHealthyServers = "no healthy servers"
|
NoHealthyServers = "no healthy servers"
|
||||||
FailedToInitializeTracing = "failed to initialize tracing"
|
FailedToInitializeTracing = "failed to initialize tracing"
|
||||||
TracingConfigUpdated = "tracing config updated"
|
|
||||||
ResolverNNSWontBeUsedSinceRPCEndpointIsntProvided = "resolver nns won't be used since rpc_endpoint isn't provided"
|
|
||||||
RuntimeSoftMemoryDefinedWithGOMEMLIMIT = "soft runtime memory defined with GOMEMLIMIT environment variable, config value skipped"
|
RuntimeSoftMemoryDefinedWithGOMEMLIMIT = "soft runtime memory defined with GOMEMLIMIT environment variable, config value skipped"
|
||||||
RuntimeSoftMemoryLimitUpdated = "soft runtime memory limit value updated"
|
RuntimeSoftMemoryLimitUpdated = "soft runtime memory limit value updated"
|
||||||
CouldNotLoadFrostFSPrivateKey = "could not load FrostFS private key"
|
CouldNotLoadFrostFSPrivateKey = "could not load FrostFS private key"
|
||||||
|
@ -66,33 +47,86 @@ const (
|
||||||
FailedToDialConnectionPool = "failed to dial connection pool"
|
FailedToDialConnectionPool = "failed to dial connection pool"
|
||||||
FailedToCreateTreePool = "failed to create tree pool"
|
FailedToCreateTreePool = "failed to create tree pool"
|
||||||
FailedToDialTreePool = "failed to dial tree pool"
|
FailedToDialTreePool = "failed to dial tree pool"
|
||||||
AddedStoragePeer = "added storage peer"
|
|
||||||
CouldntGetBucket = "could not get bucket"
|
|
||||||
CouldntPutBucketIntoCache = "couldn't put bucket info into cache"
|
|
||||||
FailedToSumbitTaskToPool = "failed to submit task to pool"
|
|
||||||
FailedToHeadObject = "failed to head object"
|
|
||||||
FailedToIterateOverResponse = "failed to iterate over search response"
|
|
||||||
InvalidCacheEntryType = "invalid cache entry type"
|
|
||||||
InvalidLifetimeUsingDefaultValue = "invalid lifetime, using default value (in seconds)"
|
|
||||||
InvalidCacheSizeUsingDefaultValue = "invalid cache size, using default value"
|
|
||||||
FailedToUnescapeQuery = "failed to unescape query"
|
|
||||||
ServerReconnecting = "reconnecting server..."
|
ServerReconnecting = "reconnecting server..."
|
||||||
ServerReconnectedSuccessfully = "server reconnected successfully"
|
ServerReconnectedSuccessfully = "server reconnected successfully"
|
||||||
ServerReconnectFailed = "failed to reconnect server"
|
ServerReconnectFailed = "failed to reconnect server"
|
||||||
WarnDuplicateAddress = "duplicate address"
|
FailedToSumbitTaskToPool = "failed to submit task to pool"
|
||||||
MultinetDialSuccess = "multinet dial successful"
|
MultinetDialSuccess = "multinet dial successful"
|
||||||
MultinetDialFail = "multinet dial failed"
|
MultinetDialFail = "multinet dial failed"
|
||||||
|
)
|
||||||
|
|
||||||
|
// Log messages with the "config" tag.
|
||||||
|
const (
|
||||||
|
ContainerResolverWillBeDisabledBecauseOfResolversResolverOrderIsEmpty = "container resolver will be disabled because of resolvers 'resolver_order' is empty"
|
||||||
|
MetricsAreDisabled = "metrics are disabled"
|
||||||
|
NoWalletPathSpecifiedCreatingEphemeralKeyAutomaticallyForThisRun = "no wallet path specified, creating ephemeral key automatically for this run"
|
||||||
|
SIGHUPConfigReloadStarted = "SIGHUP config reload started"
|
||||||
|
FailedToReloadConfigBecauseItsMissed = "failed to reload config because it's missed"
|
||||||
|
FailedToReloadConfig = "failed to reload config"
|
||||||
|
FailedToUpdateResolvers = "failed to update resolvers"
|
||||||
|
FailedToReloadServerParameters = "failed to reload server parameters"
|
||||||
|
SIGHUPConfigReloadCompleted = "SIGHUP config reload completed"
|
||||||
|
TracingConfigUpdated = "tracing config updated"
|
||||||
|
ResolverNNSWontBeUsedSinceRPCEndpointIsntProvided = "resolver nns won't be used since rpc_endpoint isn't provided"
|
||||||
|
AddedStoragePeer = "added storage peer"
|
||||||
|
InvalidLifetimeUsingDefaultValue = "invalid lifetime, using default value (in seconds)"
|
||||||
|
InvalidCacheSizeUsingDefaultValue = "invalid cache size, using default value"
|
||||||
|
WarnDuplicateAddress = "duplicate address"
|
||||||
FailedToLoadMultinetConfig = "failed to load multinet config"
|
FailedToLoadMultinetConfig = "failed to load multinet config"
|
||||||
MultinetConfigWontBeUpdated = "multinet config won't be updated"
|
MultinetConfigWontBeUpdated = "multinet config won't be updated"
|
||||||
ObjectNotFoundByFilePathTrySearchByFileName = "object not found by filePath attribute, try search by fileName"
|
LogLevelWontBeUpdated = "log level won't be updated"
|
||||||
|
TagsLogConfigWontBeUpdated = "tags log config won't be updated"
|
||||||
|
FailedToReadIndexPageTemplate = "failed to read index page template"
|
||||||
|
SetCustomIndexPageTemplate = "set custom index page template"
|
||||||
|
)
|
||||||
|
|
||||||
|
// Log messages with the "datapath" tag.
|
||||||
|
const (
|
||||||
|
CouldntParseCreationDate = "couldn't parse creation date"
|
||||||
|
CouldNotDetectContentTypeFromPayload = "could not detect Content-Type from payload"
|
||||||
|
FailedToAddObjectToArchive = "failed to add object to archive"
|
||||||
|
CloseZipWriter = "close zip writer"
|
||||||
|
IgnorePartEmptyFormName = "ignore part, empty form name"
|
||||||
|
IgnorePartEmptyFilename = "ignore part, empty filename"
|
||||||
|
CouldNotParseClientTime = "could not parse client time"
|
||||||
|
CouldNotPrepareExpirationHeader = "could not prepare expiration header"
|
||||||
|
CouldNotEncodeResponse = "could not encode response"
|
||||||
|
AddAttributeToResultObject = "add attribute to result object"
|
||||||
|
Request = "request"
|
||||||
|
CouldNotFetchAndStoreBearerToken = "could not fetch and store bearer token"
|
||||||
|
CouldntPutBucketIntoCache = "couldn't put bucket info into cache"
|
||||||
|
FailedToIterateOverResponse = "failed to iterate over search response"
|
||||||
|
InvalidCacheEntryType = "invalid cache entry type"
|
||||||
|
FailedToUnescapeQuery = "failed to unescape query"
|
||||||
CouldntCacheNetmap = "couldn't cache netmap"
|
CouldntCacheNetmap = "couldn't cache netmap"
|
||||||
|
FailedToCloseReader = "failed to close reader"
|
||||||
FailedToFilterHeaders = "failed to filter headers"
|
FailedToFilterHeaders = "failed to filter headers"
|
||||||
FailedToReadFileFromTar = "failed to read file from tar"
|
FailedToReadFileFromTar = "failed to read file from tar"
|
||||||
FailedToGetAttributes = "failed to get attributes"
|
FailedToGetAttributes = "failed to get attributes"
|
||||||
ObjectUploaded = "object uploaded"
|
|
||||||
CloseGzipWriter = "close gzip writer"
|
CloseGzipWriter = "close gzip writer"
|
||||||
CloseTarWriter = "close tar writer"
|
CloseTarWriter = "close tar writer"
|
||||||
FailedToCloseReader = "failed to close reader"
|
|
||||||
FailedToCreateGzipReader = "failed to create gzip reader"
|
FailedToCreateGzipReader = "failed to create gzip reader"
|
||||||
GzipReaderSelected = "gzip reader selected"
|
GzipReaderSelected = "gzip reader selected"
|
||||||
|
CouldNotReceiveMultipartForm = "could not receive multipart/form"
|
||||||
|
ObjectsNotFound = "objects not found"
|
||||||
|
IteratingOverSelectedObjectsFailed = "iterating over selected objects failed"
|
||||||
|
)
|
||||||
|
|
||||||
|
// Log messages with the "external_storage" tag.
|
||||||
|
const (
|
||||||
|
CouldNotReceiveObject = "could not receive object"
|
||||||
|
CouldNotSearchForObjects = "could not search for objects"
|
||||||
|
ObjectNotFound = "object not found"
|
||||||
|
ReadObjectListFailed = "read object list failed"
|
||||||
|
CouldNotStoreFileInFrostfs = "could not store file in frostfs"
|
||||||
|
FailedToHeadObject = "failed to head object"
|
||||||
|
ObjectNotFoundByFilePathTrySearchByFileName = "object not found by filePath attribute, try search by fileName"
|
||||||
|
FailedToGetObject = "failed to get object"
|
||||||
|
ObjectUploaded = "object uploaded"
|
||||||
|
)
|
||||||
|
|
||||||
|
// Log messages with the "external_storage_tree" tag.
|
||||||
|
const (
|
||||||
|
ObjectWasDeleted = "object was deleted"
|
||||||
|
CouldntGetBucket = "could not get bucket"
|
||||||
)
|
)
|
||||||
|
|
|
@ -12,14 +12,17 @@ type LogEventHandler struct {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (l LogEventHandler) DialPerformed(sourceIP net.Addr, _, address string, err error) {
|
func (l LogEventHandler) DialPerformed(sourceIP net.Addr, _, address string, err error) {
|
||||||
|
log := l.logger.With(logs.TagField(logs.TagApp))
|
||||||
sourceIPString := "undefined"
|
sourceIPString := "undefined"
|
||||||
if sourceIP != nil {
|
if sourceIP != nil {
|
||||||
sourceIPString = sourceIP.Network() + "://" + sourceIP.String()
|
sourceIPString = sourceIP.Network() + "://" + sourceIP.String()
|
||||||
}
|
}
|
||||||
if err == nil {
|
if err == nil {
|
||||||
l.logger.Debug(logs.MultinetDialSuccess, zap.String("source", sourceIPString), zap.String("destination", address))
|
log.Debug(logs.MultinetDialSuccess, zap.String("source", sourceIPString),
|
||||||
|
zap.String("destination", address))
|
||||||
} else {
|
} else {
|
||||||
l.logger.Debug(logs.MultinetDialFail, zap.String("source", sourceIPString), zap.String("destination", address), zap.Error(err))
|
log.Debug(logs.MultinetDialFail, zap.String("source", sourceIPString),
|
||||||
|
zap.String("destination", address))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -40,7 +40,7 @@ func (s *Source) NetMapSnapshot(ctx context.Context) (netmap.NetMap, error) {
|
||||||
}
|
}
|
||||||
|
|
||||||
if err = s.netmapCache.Put(netmapSnapshot); err != nil {
|
if err = s.netmapCache.Put(netmapSnapshot); err != nil {
|
||||||
s.log.Warn(logs.CouldntCacheNetmap, zap.Error(err))
|
s.log.Warn(logs.CouldntCacheNetmap, zap.Error(err), logs.TagField(logs.TagDatapath))
|
||||||
}
|
}
|
||||||
|
|
||||||
return netmapSnapshot, nil
|
return netmapSnapshot, nil
|
||||||
|
|
|
@ -25,24 +25,24 @@ type Config struct {
|
||||||
// Start runs http service with the exposed endpoint on the configured port.
|
// Start runs http service with the exposed endpoint on the configured port.
|
||||||
func (ms *Service) Start() {
|
func (ms *Service) Start() {
|
||||||
if ms.enabled {
|
if ms.enabled {
|
||||||
ms.log.Info(logs.ServiceIsRunning, zap.String("endpoint", ms.Addr))
|
ms.log.Info(logs.ServiceIsRunning, zap.String("endpoint", ms.Addr), logs.TagField(logs.TagApp))
|
||||||
err := ms.ListenAndServe()
|
err := ms.ListenAndServe()
|
||||||
if err != nil && err != http.ErrServerClosed {
|
if err != nil && err != http.ErrServerClosed {
|
||||||
ms.log.Warn(logs.ServiceCouldntStartOnConfiguredPort)
|
ms.log.Warn(logs.ServiceCouldntStartOnConfiguredPort, logs.TagField(logs.TagApp))
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
ms.log.Info(logs.ServiceHasntStartedSinceItsDisabled)
|
ms.log.Info(logs.ServiceHasntStartedSinceItsDisabled, logs.TagField(logs.TagApp))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// ShutDown stops the service.
|
// ShutDown stops the service.
|
||||||
func (ms *Service) ShutDown(ctx context.Context) {
|
func (ms *Service) ShutDown(ctx context.Context) {
|
||||||
ms.log.Info(logs.ShuttingDownService, zap.String("endpoint", ms.Addr))
|
ms.log.Info(logs.ShuttingDownService, zap.String("endpoint", ms.Addr), logs.TagField(logs.TagApp))
|
||||||
err := ms.Shutdown(ctx)
|
err := ms.Shutdown(ctx)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
ms.log.Error(logs.CantGracefullyShutDownService, zap.Error(err))
|
ms.log.Error(logs.CantGracefullyShutDownService, zap.Error(err), logs.TagField(logs.TagApp))
|
||||||
if err = ms.Close(); err != nil {
|
if err = ms.Close(); err != nil {
|
||||||
ms.log.Panic(logs.CantShutDownService, zap.Error(err))
|
ms.log.Panic(logs.CantShutDownService, zap.Error(err), logs.TagField(logs.TagApp))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Add table
Reference in a new issue