[#195] Add tags support

Signed-off-by: Aleksey Kravchenko <al.kravchenko@yadro.com>
This commit is contained in:
Aleksey Kravchenko 2025-01-21 09:44:13 +03:00
parent 4b782cf124
commit 5615ab31b7
22 changed files with 571 additions and 303 deletions

View file

@ -44,6 +44,7 @@ import (
"github.com/valyala/fasthttp"
"go.opentelemetry.io/otel/trace"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
"golang.org/x/exp/slices"
)
@ -51,7 +52,6 @@ type (
app struct {
ctx context.Context
log *zap.Logger
logLevel zap.AtomicLevel
pool *pool.Pool
treePool *treepool.Pool
key *keys.PrivateKey
@ -94,6 +94,7 @@ type (
reconnectInterval time.Duration
dialerSource *internalnet.DialerSource
workerPoolSize int
logLevelConfig *logLevelConfig
mu sync.RWMutex
defaultTimestamp bool
@ -113,6 +114,11 @@ type (
enableFilepathFallback bool
}
logLevelConfig struct {
logLevel zap.AtomicLevel
tagLogs sync.Map
}
CORS struct {
AllowOrigin string
AllowMethods []string
@ -123,9 +129,61 @@ type (
}
)
func newLogLevelConfig(v *viper.Viper) *logLevelConfig {
var l logLevelConfig
l.logLevel = zap.NewAtomicLevel()
if err := l.update(v); err != nil {
panic(err.Error())
}
return &l
}
func (l *logLevelConfig) LevelEnabled(tag string, tgtLevel zapcore.Level) bool {
lvl, ok := l.tagLogs.Load(tag)
if !ok {
return false
}
return lvl.(zapcore.Level).Enabled(tgtLevel)
}
func (l *logLevelConfig) update(cfg *viper.Viper) error {
ll, err := getLogLevel(cfg)
if err != nil {
return err
}
l.logLevel.SetLevel(ll)
tags, err := fetchLogTagsConfig(cfg, ll)
if err != nil {
return err
}
l.tagLogs.Range(func(key, value any) bool {
k := key.(string)
v := value.(zapcore.Level)
if lvl, ok := tags[k]; ok {
if lvl != v {
l.tagLogs.Store(key, lvl)
}
} else {
l.tagLogs.Delete(key)
delete(tags, k)
}
return true
})
for k, v := range tags {
l.tagLogs.Store(k, v)
}
return nil
}
func newApp(ctx context.Context, v *viper.Viper) App {
logSettings := &loggerSettings{}
log := pickLogger(v, logSettings)
logConfig := newLogLevelConfig(v)
log := pickLogger(v, logConfig.logLevel, logSettings, logConfig)
a := &app{
ctx: ctx,
@ -137,7 +195,7 @@ func newApp(ctx context.Context, v *viper.Viper) App {
bucketCache: cache.NewBucketCache(getBucketCacheOptions(v, log.logger), v.GetBool(cfgFeaturesTreePoolNetmapSupport)),
}
a.initAppSettings()
a.initAppSettings(logConfig)
// -- setup FastHTTP server --
a.webServer.Name = "frost-http-gw"
@ -167,11 +225,12 @@ func newApp(ctx context.Context, v *viper.Viper) App {
return a
}
func (a *app) initAppSettings() {
func (a *app) initAppSettings(lc *logLevelConfig) {
a.settings = &appSettings{
reconnectInterval: fetchReconnectInterval(a.cfg),
dialerSource: getDialerSource(a.log, a.cfg),
workerPoolSize: a.cfg.GetInt(cfgWorkerPoolSize),
logLevelConfig: lc,
}
a.settings.update(a.cfg, a.log)
}
@ -319,7 +378,7 @@ func (a *app) initResolver() {
var err error
a.resolver, err = resolver.NewContainerResolver(a.getResolverConfig())
if err != nil {
a.log.Fatal(logs.FailedToCreateResolver, zap.Error(err))
a.log.Fatal(logs.FailedToCreateResolver, zap.Error(err), logs.TagField(logs.TagApp))
}
}
@ -333,11 +392,12 @@ func (a *app) getResolverConfig() ([]string, *resolver.Config) {
order := a.cfg.GetStringSlice(cfgResolveOrder)
if resolveCfg.RPCAddress == "" {
order = remove(order, resolver.NNSResolver)
a.log.Warn(logs.ResolverNNSWontBeUsedSinceRPCEndpointIsntProvided)
a.log.Warn(logs.ResolverNNSWontBeUsedSinceRPCEndpointIsntProvided, logs.TagField(logs.TagConfig))
}
if len(order) == 0 {
a.log.Info(logs.ContainerResolverWillBeDisabledBecauseOfResolversResolverOrderIsEmpty)
a.log.Info(logs.ContainerResolverWillBeDisabledBecauseOfResolversResolverOrderIsEmpty,
logs.TagField(logs.TagConfig))
}
return order, resolveCfg
@ -352,7 +412,7 @@ func (a *app) initMetrics() {
func newGateMetrics(logger *zap.Logger, provider *metrics.GateMetrics, enabled bool) *gateMetrics {
if !enabled {
logger.Warn(logs.MetricsAreDisabled)
logger.Warn(logs.MetricsAreDisabled, logs.TagField(logs.TagConfig))
}
return &gateMetrics{
logger: logger,
@ -370,7 +430,7 @@ func (m *gateMetrics) isEnabled() bool {
func (m *gateMetrics) SetEnabled(enabled bool) {
if !enabled {
m.logger.Warn(logs.MetricsAreDisabled)
m.logger.Warn(logs.MetricsAreDisabled, logs.TagField(logs.TagConfig))
}
m.mu.Lock()
@ -433,7 +493,7 @@ func getFrostFSKey(cfg *viper.Viper, log *zap.Logger) (*keys.PrivateKey, error)
walletPath := cfg.GetString(cfgWalletPath)
if len(walletPath) == 0 {
log.Info(logs.NoWalletPathSpecifiedCreatingEphemeralKeyAutomaticallyForThisRun)
log.Info(logs.NoWalletPathSpecifiedCreatingEphemeralKeyAutomaticallyForThisRun, logs.TagField(logs.TagConfig))
key, err := keys.NewPrivateKey()
if err != nil {
return nil, err
@ -490,7 +550,10 @@ func getKeyFromWallet(w *wallet.Wallet, addrStr string, password *string) (*keys
}
func (a *app) Wait() {
a.log.Info(logs.StartingApplication, zap.String("app_name", "frostfs-http-gw"), zap.String("version", Version))
a.log.Info(logs.StartingApplication,
zap.String("app_name", "frostfs-http-gw"),
zap.String("version", Version),
logs.TagField(logs.TagApp))
a.metrics.SetVersion(Version)
a.setHealthStatus()
@ -521,10 +584,10 @@ func (a *app) Serve() {
for i := range servs {
go func(i int) {
a.log.Info(logs.StartingServer, zap.String("address", servs[i].Address()))
a.log.Info(logs.StartingServer, zap.String("address", servs[i].Address()), logs.TagField(logs.TagApp))
if err := a.webServer.Serve(servs[i].Listener()); err != nil && err != http.ErrServerClosed {
a.metrics.MarkUnhealthy(servs[i].Address())
a.log.Fatal(logs.ListenAndServe, zap.Error(err))
a.log.Fatal(logs.ListenAndServe, zap.Error(err), logs.TagField(logs.TagApp))
}
}(i)
}
@ -546,7 +609,7 @@ LOOP:
}
}
a.log.Info(logs.ShuttingDownWebServer, zap.Error(a.webServer.Shutdown()))
a.log.Info(logs.ShuttingDownWebServer, zap.Error(a.webServer.Shutdown()), logs.TagField(logs.TagApp))
a.metrics.Shutdown()
a.stopServices()
@ -556,7 +619,7 @@ LOOP:
func (a *app) initWorkerPool() *ants.Pool {
workerPool, err := ants.NewPool(a.settings.workerPoolSize)
if err != nil {
a.log.Fatal(logs.FailedToCreateWorkerPool, zap.Error(err))
a.log.Fatal(logs.FailedToCreateWorkerPool, zap.Error(err), logs.TagField(logs.TagApp))
}
return workerPool
}
@ -567,37 +630,37 @@ func (a *app) shutdownTracing() {
defer cancel()
if err := tracing.Shutdown(shdnCtx); err != nil {
a.log.Warn(logs.FailedToShutdownTracing, zap.Error(err))
a.log.Warn(logs.FailedToShutdownTracing, zap.Error(err), logs.TagField(logs.TagApp))
}
}
func (a *app) configReload(ctx context.Context) {
a.log.Info(logs.SIGHUPConfigReloadStarted)
log := a.log.With(logs.TagField(logs.TagConfig))
log.Info(logs.SIGHUPConfigReloadStarted)
if !a.cfg.IsSet(cmdConfig) && !a.cfg.IsSet(cmdConfigDir) {
a.log.Warn(logs.FailedToReloadConfigBecauseItsMissed)
log.Warn(logs.FailedToReloadConfigBecauseItsMissed)
return
}
if err := readInConfig(a.cfg); err != nil {
a.log.Warn(logs.FailedToReloadConfig, zap.Error(err))
log.Warn(logs.FailedToReloadConfig, zap.Error(err))
return
}
if lvl, err := getLogLevel(a.cfg); err != nil {
a.log.Warn(logs.LogLevelWontBeUpdated, zap.Error(err))
} else {
a.logLevel.SetLevel(lvl)
if err := a.settings.logLevelConfig.update(a.cfg); err != nil {
log.Warn(logs.LogLevelAndTagsConfigWontBeUpdated, zap.Error(err))
}
if err := a.settings.dialerSource.Update(fetchMultinetConfig(a.cfg, a.log)); err != nil {
a.log.Warn(logs.MultinetConfigWontBeUpdated, zap.Error(err))
log.Warn(logs.MultinetConfigWontBeUpdated, zap.Error(err))
}
if err := a.resolver.UpdateResolvers(a.getResolverConfig()); err != nil {
a.log.Warn(logs.FailedToUpdateResolvers, zap.Error(err))
log.Warn(logs.FailedToUpdateResolvers, zap.Error(err))
}
if err := a.updateServers(); err != nil {
a.log.Warn(logs.FailedToReloadServerParameters, zap.Error(err))
log.Warn(logs.FailedToReloadServerParameters, zap.Error(err))
}
a.setRuntimeParameters()
@ -611,7 +674,7 @@ func (a *app) configReload(ctx context.Context) {
a.initTracing(ctx)
a.setHealthStatus()
a.log.Info(logs.SIGHUPConfigReloadCompleted)
log.Info(logs.SIGHUPConfigReloadCompleted)
}
func (a *app) startServices() {
@ -647,18 +710,18 @@ func (a *app) configureRouter(h *handler.Handler) {
r.POST("/upload/{cid}", a.addMiddlewares(h.Upload))
r.OPTIONS("/upload/{cid}", a.addPreflight())
a.log.Info(logs.AddedPathUploadCid)
a.log.Info(logs.AddedPathUploadCid, logs.TagField(logs.TagApp))
r.GET("/get/{cid}/{oid:*}", a.addMiddlewares(h.DownloadByAddressOrBucketName))
r.HEAD("/get/{cid}/{oid:*}", a.addMiddlewares(h.HeadByAddressOrBucketName))
r.OPTIONS("/get/{cid}/{oid:*}", a.addPreflight())
a.log.Info(logs.AddedPathGetCidOid)
a.log.Info(logs.AddedPathGetCidOid, logs.TagField(logs.TagApp))
r.GET("/get_by_attribute/{cid}/{attr_key}/{attr_val:*}", a.addMiddlewares(h.DownloadByAttribute))
r.HEAD("/get_by_attribute/{cid}/{attr_key}/{attr_val:*}", a.addMiddlewares(h.HeadByAttribute))
r.OPTIONS("/get_by_attribute/{cid}/{attr_key}/{attr_val:*}", a.addPreflight())
a.log.Info(logs.AddedPathGetByAttributeCidAttrKeyAttrVal)
a.log.Info(logs.AddedPathGetByAttributeCidAttrKeyAttrVal, logs.TagField(logs.TagApp))
r.GET("/zip/{cid}/{prefix:*}", a.addMiddlewares(h.DownloadZipped))
r.OPTIONS("/zip/{cid}/{prefix:*}", a.addPreflight())
a.log.Info(logs.AddedPathZipCidPrefix)
a.log.Info(logs.AddedPathZipCidPrefix, logs.TagField(logs.TagApp))
a.webServer.Handler = r.Handler
}
@ -752,6 +815,7 @@ func (a *app) logger(h fasthttp.RequestHandler) fasthttp.RequestHandler {
zap.ByteString("method", req.Method()),
zap.ByteString("path", req.Path()),
zap.ByteString("query", req.QueryArgs().QueryString()),
logs.TagField(logs.TagDatapath),
}
log.Info(logs.Request, fields...)
@ -798,7 +862,7 @@ func (a *app) tokenizer(h fasthttp.RequestHandler) fasthttp.RequestHandler {
if err != nil {
log := utils.GetReqLogOrDefault(reqCtx, a.log)
log.Error(logs.CouldNotFetchAndStoreBearerToken, zap.Error(err))
log.Error(logs.CouldNotFetchAndStoreBearerToken, zap.Error(err), logs.TagField(logs.TagDatapath))
handler.ResponseError(req, "could not fetch and store bearer token: "+err.Error(), fasthttp.StatusBadRequest)
return
}
@ -852,6 +916,7 @@ func (a *app) initServers(ctx context.Context) {
fields := []zap.Field{
zap.String("address", serverInfo.Address), zap.Bool("tls enabled", serverInfo.TLS.Enabled),
zap.String("tls cert", serverInfo.TLS.CertFile), zap.String("tls key", serverInfo.TLS.KeyFile),
logs.TagField(logs.TagApp),
}
srv, err := newServer(ctx, serverInfo)
if err != nil {
@ -867,7 +932,7 @@ func (a *app) initServers(ctx context.Context) {
}
if len(a.servers) == 0 {
a.log.Fatal(logs.NoHealthyServers)
a.log.Fatal(logs.NoHealthyServers, logs.TagField(logs.TagApp))
}
}
@ -941,13 +1006,14 @@ func (a *app) initTracing(ctx context.Context) {
if trustedCa := a.cfg.GetString(cfgTracingTrustedCa); trustedCa != "" {
caBytes, err := os.ReadFile(trustedCa)
if err != nil {
a.log.Warn(logs.FailedToInitializeTracing, zap.Error(err))
a.log.Warn(logs.FailedToInitializeTracing, zap.Error(err), logs.TagField(logs.TagApp))
return
}
certPool := x509.NewCertPool()
ok := certPool.AppendCertsFromPEM(caBytes)
if !ok {
a.log.Warn(logs.FailedToInitializeTracing, zap.String("error", "can't fill cert pool by ca cert"))
a.log.Warn(logs.FailedToInitializeTracing, zap.String("error", "can't fill cert pool by ca cert"),
logs.TagField(logs.TagApp))
return
}
cfg.ServerCaCertPool = certPool
@ -955,24 +1021,24 @@ func (a *app) initTracing(ctx context.Context) {
attributes, err := fetchTracingAttributes(a.cfg)
if err != nil {
a.log.Warn(logs.FailedToInitializeTracing, zap.Error(err))
a.log.Warn(logs.FailedToInitializeTracing, zap.Error(err), logs.TagField(logs.TagConfig))
return
}
cfg.Attributes = attributes
updated, err := tracing.Setup(ctx, cfg)
if err != nil {
a.log.Warn(logs.FailedToInitializeTracing, zap.Error(err))
a.log.Warn(logs.FailedToInitializeTracing, zap.Error(err), logs.TagField(logs.TagApp))
}
if updated {
a.log.Info(logs.TracingConfigUpdated)
a.log.Info(logs.TracingConfigUpdated, logs.TagField(logs.TagConfig))
}
}
func (a *app) setRuntimeParameters() {
if len(os.Getenv("GOMEMLIMIT")) != 0 {
// default limit < yaml limit < app env limit < GOMEMLIMIT
a.log.Warn(logs.RuntimeSoftMemoryDefinedWithGOMEMLIMIT)
a.log.Warn(logs.RuntimeSoftMemoryDefinedWithGOMEMLIMIT, logs.TagField(logs.TagApp))
return
}
@ -981,7 +1047,8 @@ func (a *app) setRuntimeParameters() {
if softMemoryLimit != previous {
a.log.Info(logs.RuntimeSoftMemoryLimitUpdated,
zap.Int64("new_value", softMemoryLimit),
zap.Int64("old_value", previous))
zap.Int64("old_value", previous),
logs.TagField(logs.TagConfig))
}
}
@ -1007,28 +1074,29 @@ func (a *app) tryReconnect(ctx context.Context, sr *fasthttp.Server) bool {
a.mu.Lock()
defer a.mu.Unlock()
a.log.Info(logs.ServerReconnecting)
a.log.Info(logs.ServerReconnecting, logs.TagField(logs.TagApp))
var failedServers []ServerInfo
for _, serverInfo := range a.unbindServers {
fields := []zap.Field{
zap.String("address", serverInfo.Address), zap.Bool("tls enabled", serverInfo.TLS.Enabled),
zap.String("tls cert", serverInfo.TLS.CertFile), zap.String("tls key", serverInfo.TLS.KeyFile),
logs.TagField(logs.TagApp),
}
srv, err := newServer(ctx, serverInfo)
if err != nil {
a.log.Warn(logs.ServerReconnectFailed, zap.Error(err))
a.log.Warn(logs.ServerReconnectFailed, zap.Error(err), logs.TagField(logs.TagApp))
failedServers = append(failedServers, serverInfo)
a.metrics.MarkUnhealthy(serverInfo.Address)
continue
}
go func() {
a.log.Info(logs.StartingServer, zap.String("address", srv.Address()))
a.log.Info(logs.StartingServer, zap.String("address", srv.Address()), logs.TagField(logs.TagApp))
a.metrics.MarkHealthy(serverInfo.Address)
if err = sr.Serve(srv.Listener()); err != nil && !errors.Is(err, http.ErrServerClosed) {
a.log.Warn(logs.ListenAndServe, zap.Error(err))
a.log.Warn(logs.ListenAndServe, zap.Error(err), logs.TagField(logs.TagApp))
a.metrics.MarkUnhealthy(serverInfo.Address)
}
}()