[#195] Add tags support
Signed-off-by: Aleksey Kravchenko <al.kravchenko@yadro.com>
This commit is contained in:
parent
76bd6ea40f
commit
6a4d3206bd
22 changed files with 572 additions and 295 deletions
|
@ -44,6 +44,7 @@ import (
|
|||
"github.com/valyala/fasthttp"
|
||||
"go.opentelemetry.io/otel/trace"
|
||||
"go.uber.org/zap"
|
||||
"go.uber.org/zap/zapcore"
|
||||
"golang.org/x/exp/slices"
|
||||
)
|
||||
|
||||
|
@ -51,7 +52,6 @@ type (
|
|||
app struct {
|
||||
ctx context.Context
|
||||
log *zap.Logger
|
||||
logLevel zap.AtomicLevel
|
||||
pool *pool.Pool
|
||||
treePool *treepool.Pool
|
||||
key *keys.PrivateKey
|
||||
|
@ -94,6 +94,7 @@ type (
|
|||
reconnectInterval time.Duration
|
||||
dialerSource *internalnet.DialerSource
|
||||
workerPoolSize int
|
||||
logLevelConfig *logLevelConfig
|
||||
|
||||
mu sync.RWMutex
|
||||
defaultTimestamp bool
|
||||
|
@ -113,6 +114,15 @@ type (
|
|||
enableFilepathFallback bool
|
||||
}
|
||||
|
||||
tagsConfig struct {
|
||||
tagLogs sync.Map
|
||||
}
|
||||
|
||||
logLevelConfig struct {
|
||||
logLevel zap.AtomicLevel
|
||||
tagsConfig *tagsConfig
|
||||
}
|
||||
|
||||
CORS struct {
|
||||
AllowOrigin string
|
||||
AllowMethods []string
|
||||
|
@ -123,14 +133,91 @@ type (
|
|||
}
|
||||
)
|
||||
|
||||
func newLogLevel(v *viper.Viper) zap.AtomicLevel {
|
||||
ll, err := getLogLevel(v)
|
||||
if err != nil {
|
||||
panic(err.Error())
|
||||
}
|
||||
atomicLogLevel := zap.NewAtomicLevel()
|
||||
atomicLogLevel.SetLevel(ll)
|
||||
return atomicLogLevel
|
||||
}
|
||||
|
||||
func newTagsConfig(v *viper.Viper, ll zapcore.Level) *tagsConfig {
|
||||
var t tagsConfig
|
||||
if err := t.update(v, ll); err != nil {
|
||||
// panic here is analogue of the similar panic during common log level initialization.
|
||||
panic(err.Error())
|
||||
}
|
||||
return &t
|
||||
}
|
||||
|
||||
func newLogLevelConfig(lvl zap.AtomicLevel, tagsConfig *tagsConfig) *logLevelConfig {
|
||||
return &logLevelConfig{
|
||||
logLevel: lvl,
|
||||
tagsConfig: tagsConfig,
|
||||
}
|
||||
}
|
||||
|
||||
func (l *logLevelConfig) update(cfg *viper.Viper, log *zap.Logger) {
|
||||
if lvl, err := getLogLevel(cfg); err != nil {
|
||||
log.Warn(logs.LogLevelWontBeUpdated, zap.Error(err), logs.TagField(logs.TagApp))
|
||||
} else {
|
||||
l.logLevel.SetLevel(lvl)
|
||||
}
|
||||
|
||||
if err := l.tagsConfig.update(cfg, l.logLevel.Level()); err != nil {
|
||||
log.Warn(logs.TagsLogConfigWontBeUpdated, zap.Error(err), logs.TagField(logs.TagApp))
|
||||
}
|
||||
}
|
||||
|
||||
func (t *tagsConfig) LevelEnabled(tag string, tgtLevel zapcore.Level) bool {
|
||||
lvl, ok := t.tagLogs.Load(tag)
|
||||
if !ok {
|
||||
return false
|
||||
}
|
||||
|
||||
return lvl.(zapcore.Level).Enabled(tgtLevel)
|
||||
}
|
||||
|
||||
func (t *tagsConfig) update(cfg *viper.Viper, ll zapcore.Level) error {
|
||||
tags, err := fetchLogTagsConfig(cfg, ll)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
t.tagLogs.Range(func(key, value any) bool {
|
||||
k := key.(string)
|
||||
v := value.(zapcore.Level)
|
||||
|
||||
if lvl, ok := tags[k]; ok {
|
||||
if lvl != v {
|
||||
t.tagLogs.Store(key, lvl)
|
||||
}
|
||||
} else {
|
||||
t.tagLogs.Delete(key)
|
||||
delete(tags, k)
|
||||
}
|
||||
return true
|
||||
})
|
||||
|
||||
for k, v := range tags {
|
||||
t.tagLogs.Store(k, v)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func newApp(ctx context.Context, cfg *appCfg) App {
|
||||
logSettings := &loggerSettings{}
|
||||
log := pickLogger(cfg.config(), logSettings)
|
||||
logLevel := newLogLevel(cfg.config())
|
||||
tagConfig := newTagsConfig(cfg.config(), logLevel.Level())
|
||||
logConfig := newLogLevelConfig(logLevel, tagConfig)
|
||||
log := pickLogger(cfg.config(), logConfig.logLevel, logSettings, tagConfig)
|
||||
|
||||
a := &app{
|
||||
ctx: ctx,
|
||||
log: log.logger,
|
||||
logLevel: log.lvl,
|
||||
cfg: cfg,
|
||||
loggerSettings: logSettings,
|
||||
webServer: new(fasthttp.Server),
|
||||
|
@ -138,7 +225,7 @@ func newApp(ctx context.Context, cfg *appCfg) App {
|
|||
bucketCache: cache.NewBucketCache(getBucketCacheOptions(cfg.config(), log.logger), cfg.config().GetBool(cfgFeaturesTreePoolNetmapSupport)),
|
||||
}
|
||||
|
||||
a.initAppSettings()
|
||||
a.initAppSettings(logConfig)
|
||||
|
||||
// -- setup FastHTTP server --
|
||||
a.webServer.Name = "frost-http-gw"
|
||||
|
@ -172,11 +259,12 @@ func (a *app) config() *viper.Viper {
|
|||
return a.cfg.config()
|
||||
}
|
||||
|
||||
func (a *app) initAppSettings() {
|
||||
func (a *app) initAppSettings(lc *logLevelConfig) {
|
||||
a.settings = &appSettings{
|
||||
reconnectInterval: fetchReconnectInterval(a.config()),
|
||||
dialerSource: getDialerSource(a.log, a.config()),
|
||||
workerPoolSize: a.config().GetInt(cfgWorkerPoolSize),
|
||||
logLevelConfig: lc,
|
||||
}
|
||||
a.settings.update(a.config(), a.log)
|
||||
}
|
||||
|
@ -324,7 +412,7 @@ func (a *app) initResolver() {
|
|||
var err error
|
||||
a.resolver, err = resolver.NewContainerResolver(a.getResolverConfig())
|
||||
if err != nil {
|
||||
a.log.Fatal(logs.FailedToCreateResolver, zap.Error(err))
|
||||
a.log.Fatal(logs.FailedToCreateResolver, zap.Error(err), logs.TagField(logs.TagApp))
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -338,11 +426,12 @@ func (a *app) getResolverConfig() ([]string, *resolver.Config) {
|
|||
order := a.config().GetStringSlice(cfgResolveOrder)
|
||||
if resolveCfg.RPCAddress == "" {
|
||||
order = remove(order, resolver.NNSResolver)
|
||||
a.log.Warn(logs.ResolverNNSWontBeUsedSinceRPCEndpointIsntProvided)
|
||||
a.log.Warn(logs.ResolverNNSWontBeUsedSinceRPCEndpointIsntProvided, logs.TagField(logs.TagApp))
|
||||
}
|
||||
|
||||
if len(order) == 0 {
|
||||
a.log.Info(logs.ContainerResolverWillBeDisabledBecauseOfResolversResolverOrderIsEmpty)
|
||||
a.log.Info(logs.ContainerResolverWillBeDisabledBecauseOfResolversResolverOrderIsEmpty,
|
||||
logs.TagField(logs.TagApp))
|
||||
}
|
||||
|
||||
return order, resolveCfg
|
||||
|
@ -357,7 +446,7 @@ func (a *app) initMetrics() {
|
|||
|
||||
func newGateMetrics(logger *zap.Logger, provider *metrics.GateMetrics, enabled bool) *gateMetrics {
|
||||
if !enabled {
|
||||
logger.Warn(logs.MetricsAreDisabled)
|
||||
logger.Warn(logs.MetricsAreDisabled, logs.TagField(logs.TagApp))
|
||||
}
|
||||
return &gateMetrics{
|
||||
logger: logger,
|
||||
|
@ -375,7 +464,7 @@ func (m *gateMetrics) isEnabled() bool {
|
|||
|
||||
func (m *gateMetrics) SetEnabled(enabled bool) {
|
||||
if !enabled {
|
||||
m.logger.Warn(logs.MetricsAreDisabled)
|
||||
m.logger.Warn(logs.MetricsAreDisabled, logs.TagField(logs.TagApp))
|
||||
}
|
||||
|
||||
m.mu.Lock()
|
||||
|
@ -438,7 +527,7 @@ func getFrostFSKey(cfg *viper.Viper, log *zap.Logger) (*keys.PrivateKey, error)
|
|||
walletPath := cfg.GetString(cfgWalletPath)
|
||||
|
||||
if len(walletPath) == 0 {
|
||||
log.Info(logs.NoWalletPathSpecifiedCreatingEphemeralKeyAutomaticallyForThisRun)
|
||||
log.Info(logs.NoWalletPathSpecifiedCreatingEphemeralKeyAutomaticallyForThisRun, logs.TagField(logs.TagApp))
|
||||
key, err := keys.NewPrivateKey()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
|
@ -495,7 +584,10 @@ func getKeyFromWallet(w *wallet.Wallet, addrStr string, password *string) (*keys
|
|||
}
|
||||
|
||||
func (a *app) Wait() {
|
||||
a.log.Info(logs.StartingApplication, zap.String("app_name", "frostfs-http-gw"), zap.String("version", Version))
|
||||
a.log.Info(logs.StartingApplication,
|
||||
zap.String("app_name", "frostfs-http-gw"),
|
||||
zap.String("version", Version),
|
||||
logs.TagField(logs.TagApp))
|
||||
|
||||
a.metrics.SetVersion(Version)
|
||||
a.setHealthStatus()
|
||||
|
@ -526,10 +618,10 @@ func (a *app) Serve() {
|
|||
|
||||
for i := range servs {
|
||||
go func(i int) {
|
||||
a.log.Info(logs.StartingServer, zap.String("address", servs[i].Address()))
|
||||
a.log.Info(logs.StartingServer, zap.String("address", servs[i].Address()), logs.TagField(logs.TagApp))
|
||||
if err := a.webServer.Serve(servs[i].Listener()); err != nil && err != http.ErrServerClosed {
|
||||
a.metrics.MarkUnhealthy(servs[i].Address())
|
||||
a.log.Fatal(logs.ListenAndServe, zap.Error(err))
|
||||
a.log.Fatal(logs.ListenAndServe, zap.Error(err), logs.TagField(logs.TagApp))
|
||||
}
|
||||
}(i)
|
||||
}
|
||||
|
@ -551,7 +643,7 @@ LOOP:
|
|||
}
|
||||
}
|
||||
|
||||
a.log.Info(logs.ShuttingDownWebServer, zap.Error(a.webServer.Shutdown()))
|
||||
a.log.Info(logs.ShuttingDownWebServer, zap.Error(a.webServer.Shutdown()), logs.TagField(logs.TagApp))
|
||||
|
||||
a.metrics.Shutdown()
|
||||
a.stopServices()
|
||||
|
@ -561,7 +653,7 @@ LOOP:
|
|||
func (a *app) initWorkerPool() *ants.Pool {
|
||||
workerPool, err := ants.NewPool(a.settings.workerPoolSize)
|
||||
if err != nil {
|
||||
a.log.Fatal(logs.FailedToCreateWorkerPool, zap.Error(err))
|
||||
a.log.Fatal(logs.FailedToCreateWorkerPool, zap.Error(err), logs.TagField(logs.TagApp))
|
||||
}
|
||||
return workerPool
|
||||
}
|
||||
|
@ -572,37 +664,33 @@ func (a *app) shutdownTracing() {
|
|||
defer cancel()
|
||||
|
||||
if err := tracing.Shutdown(shdnCtx); err != nil {
|
||||
a.log.Warn(logs.FailedToShutdownTracing, zap.Error(err))
|
||||
a.log.Warn(logs.FailedToShutdownTracing, zap.Error(err), logs.TagField(logs.TagApp))
|
||||
}
|
||||
}
|
||||
|
||||
func (a *app) configReload(ctx context.Context) {
|
||||
a.log.Info(logs.SIGHUPConfigReloadStarted)
|
||||
a.log.Info(logs.SIGHUPConfigReloadStarted, logs.TagField(logs.TagApp))
|
||||
if !a.config().IsSet(cmdConfig) && !a.config().IsSet(cmdConfigDir) {
|
||||
a.log.Warn(logs.FailedToReloadConfigBecauseItsMissed)
|
||||
a.log.Warn(logs.FailedToReloadConfigBecauseItsMissed, logs.TagField(logs.TagApp))
|
||||
return
|
||||
}
|
||||
if err := a.cfg.reload(); err != nil {
|
||||
a.log.Warn(logs.FailedToReloadConfig, zap.Error(err))
|
||||
a.log.Warn(logs.FailedToReloadConfig, zap.Error(err), logs.TagField(logs.TagApp))
|
||||
return
|
||||
}
|
||||
|
||||
if lvl, err := getLogLevel(a.config()); err != nil {
|
||||
a.log.Warn(logs.LogLevelWontBeUpdated, zap.Error(err))
|
||||
} else {
|
||||
a.logLevel.SetLevel(lvl)
|
||||
}
|
||||
a.settings.logLevelConfig.update(a.cfg.settings, a.log)
|
||||
|
||||
if err := a.settings.dialerSource.Update(fetchMultinetConfig(a.config(), a.log)); err != nil {
|
||||
a.log.Warn(logs.MultinetConfigWontBeUpdated, zap.Error(err))
|
||||
a.log.Warn(logs.MultinetConfigWontBeUpdated, zap.Error(err), logs.TagField(logs.TagApp))
|
||||
}
|
||||
|
||||
if err := a.resolver.UpdateResolvers(a.getResolverConfig()); err != nil {
|
||||
a.log.Warn(logs.FailedToUpdateResolvers, zap.Error(err))
|
||||
a.log.Warn(logs.FailedToUpdateResolvers, zap.Error(err), logs.TagField(logs.TagApp))
|
||||
}
|
||||
|
||||
if err := a.updateServers(); err != nil {
|
||||
a.log.Warn(logs.FailedToReloadServerParameters, zap.Error(err))
|
||||
a.log.Warn(logs.FailedToReloadServerParameters, zap.Error(err), logs.TagField(logs.TagApp))
|
||||
}
|
||||
|
||||
a.setRuntimeParameters()
|
||||
|
@ -616,7 +704,7 @@ func (a *app) configReload(ctx context.Context) {
|
|||
a.initTracing(ctx)
|
||||
a.setHealthStatus()
|
||||
|
||||
a.log.Info(logs.SIGHUPConfigReloadCompleted)
|
||||
a.log.Info(logs.SIGHUPConfigReloadCompleted, logs.TagField(logs.TagApp))
|
||||
}
|
||||
|
||||
func (a *app) startServices() {
|
||||
|
@ -654,20 +742,20 @@ func (a *app) configureRouter(h *handler.Handler) {
|
|||
|
||||
r.POST("/upload/{cid}", a.addMiddlewares(h.Upload))
|
||||
r.OPTIONS("/upload/{cid}", a.addPreflight())
|
||||
a.log.Info(logs.AddedPathUploadCid)
|
||||
a.log.Info(logs.AddedPathUploadCid, logs.TagField(logs.TagApp))
|
||||
r.GET("/get/{cid}/{oid:*}", a.addMiddlewares(h.DownloadByAddressOrBucketName))
|
||||
r.HEAD("/get/{cid}/{oid:*}", a.addMiddlewares(h.HeadByAddressOrBucketName))
|
||||
r.OPTIONS("/get/{cid}/{oid:*}", a.addPreflight())
|
||||
a.log.Info(logs.AddedPathGetCidOid)
|
||||
a.log.Info(logs.AddedPathGetCidOid, logs.TagField(logs.TagApp))
|
||||
r.GET("/get_by_attribute/{cid}/{attr_key}/{attr_val:*}", a.addMiddlewares(h.DownloadByAttribute))
|
||||
r.HEAD("/get_by_attribute/{cid}/{attr_key}/{attr_val:*}", a.addMiddlewares(h.HeadByAttribute))
|
||||
r.OPTIONS("/get_by_attribute/{cid}/{attr_key}/{attr_val:*}", a.addPreflight())
|
||||
a.log.Info(logs.AddedPathGetByAttributeCidAttrKeyAttrVal)
|
||||
a.log.Info(logs.AddedPathGetByAttributeCidAttrKeyAttrVal, logs.TagField(logs.TagApp))
|
||||
r.GET("/zip/{cid}/{prefix:*}", a.addMiddlewares(h.DownloadZip))
|
||||
r.OPTIONS("/zip/{cid}/{prefix:*}", a.addPreflight())
|
||||
r.GET("/tar/{cid}/{prefix:*}", a.addMiddlewares(h.DownloadTar))
|
||||
r.OPTIONS("/tar/{cid}/{prefix:*}", a.addPreflight())
|
||||
a.log.Info(logs.AddedPathZipCidPrefix)
|
||||
a.log.Info(logs.AddedPathZipCidPrefix, logs.TagField(logs.TagApp))
|
||||
|
||||
a.webServer.Handler = r.Handler
|
||||
}
|
||||
|
@ -756,14 +844,11 @@ func (a *app) logger(h fasthttp.RequestHandler) fasthttp.RequestHandler {
|
|||
reqCtx = utils.SetReqLog(reqCtx, log)
|
||||
utils.SetContextToRequest(reqCtx, req)
|
||||
|
||||
fields := []zap.Field{
|
||||
zap.String("remote", req.RemoteAddr().String()),
|
||||
log.Info(logs.Request, zap.String("remote", req.RemoteAddr().String()),
|
||||
zap.ByteString("method", req.Method()),
|
||||
zap.ByteString("path", req.Path()),
|
||||
zap.ByteString("query", req.QueryArgs().QueryString()),
|
||||
}
|
||||
|
||||
log.Info(logs.Request, fields...)
|
||||
logs.TagField(logs.TagDatapath))
|
||||
h(req)
|
||||
}
|
||||
}
|
||||
|
@ -807,7 +892,7 @@ func (a *app) tokenizer(h fasthttp.RequestHandler) fasthttp.RequestHandler {
|
|||
if err != nil {
|
||||
log := utils.GetReqLogOrDefault(reqCtx, a.log)
|
||||
|
||||
log.Error(logs.CouldNotFetchAndStoreBearerToken, zap.Error(err))
|
||||
log.Error(logs.CouldNotFetchAndStoreBearerToken, zap.Error(err), logs.TagField(logs.TagDatapath))
|
||||
handler.ResponseError(req, "could not fetch and store bearer token: "+err.Error(), fasthttp.StatusBadRequest)
|
||||
return
|
||||
}
|
||||
|
@ -866,17 +951,17 @@ func (a *app) initServers(ctx context.Context) {
|
|||
if err != nil {
|
||||
a.unbindServers = append(a.unbindServers, serverInfo)
|
||||
a.metrics.MarkUnhealthy(serverInfo.Address)
|
||||
a.log.Warn(logs.FailedToAddServer, append(fields, zap.Error(err))...)
|
||||
a.log.Warn(logs.FailedToAddServer, append(fields, zap.Error(err), logs.TagField(logs.TagApp))...)
|
||||
continue
|
||||
}
|
||||
a.metrics.MarkHealthy(serverInfo.Address)
|
||||
|
||||
a.servers = append(a.servers, srv)
|
||||
a.log.Info(logs.AddServer, fields...)
|
||||
a.log.Info(logs.AddServer, append(fields, logs.TagField(logs.TagApp))...)
|
||||
}
|
||||
|
||||
if len(a.servers) == 0 {
|
||||
a.log.Fatal(logs.NoHealthyServers)
|
||||
a.log.Fatal(logs.NoHealthyServers, logs.TagField(logs.TagApp))
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -950,13 +1035,14 @@ func (a *app) initTracing(ctx context.Context) {
|
|||
if trustedCa := a.config().GetString(cfgTracingTrustedCa); trustedCa != "" {
|
||||
caBytes, err := os.ReadFile(trustedCa)
|
||||
if err != nil {
|
||||
a.log.Warn(logs.FailedToInitializeTracing, zap.Error(err))
|
||||
a.log.Warn(logs.FailedToInitializeTracing, zap.Error(err), logs.TagField(logs.TagApp))
|
||||
return
|
||||
}
|
||||
certPool := x509.NewCertPool()
|
||||
ok := certPool.AppendCertsFromPEM(caBytes)
|
||||
if !ok {
|
||||
a.log.Warn(logs.FailedToInitializeTracing, zap.String("error", "can't fill cert pool by ca cert"))
|
||||
a.log.Warn(logs.FailedToInitializeTracing, zap.String("error", "can't fill cert pool by ca cert"),
|
||||
logs.TagField(logs.TagApp))
|
||||
return
|
||||
}
|
||||
cfg.ServerCaCertPool = certPool
|
||||
|
@ -964,24 +1050,24 @@ func (a *app) initTracing(ctx context.Context) {
|
|||
|
||||
attributes, err := fetchTracingAttributes(a.config())
|
||||
if err != nil {
|
||||
a.log.Warn(logs.FailedToInitializeTracing, zap.Error(err))
|
||||
a.log.Warn(logs.FailedToInitializeTracing, zap.Error(err), logs.TagField(logs.TagApp))
|
||||
return
|
||||
}
|
||||
cfg.Attributes = attributes
|
||||
|
||||
updated, err := tracing.Setup(ctx, cfg)
|
||||
if err != nil {
|
||||
a.log.Warn(logs.FailedToInitializeTracing, zap.Error(err))
|
||||
a.log.Warn(logs.FailedToInitializeTracing, zap.Error(err), logs.TagField(logs.TagApp))
|
||||
}
|
||||
if updated {
|
||||
a.log.Info(logs.TracingConfigUpdated)
|
||||
a.log.Info(logs.TracingConfigUpdated, logs.TagField(logs.TagApp))
|
||||
}
|
||||
}
|
||||
|
||||
func (a *app) setRuntimeParameters() {
|
||||
if len(os.Getenv("GOMEMLIMIT")) != 0 {
|
||||
// default limit < yaml limit < app env limit < GOMEMLIMIT
|
||||
a.log.Warn(logs.RuntimeSoftMemoryDefinedWithGOMEMLIMIT)
|
||||
a.log.Warn(logs.RuntimeSoftMemoryDefinedWithGOMEMLIMIT, logs.TagField(logs.TagApp))
|
||||
return
|
||||
}
|
||||
|
||||
|
@ -990,7 +1076,8 @@ func (a *app) setRuntimeParameters() {
|
|||
if softMemoryLimit != previous {
|
||||
a.log.Info(logs.RuntimeSoftMemoryLimitUpdated,
|
||||
zap.Int64("new_value", softMemoryLimit),
|
||||
zap.Int64("old_value", previous))
|
||||
zap.Int64("old_value", previous),
|
||||
logs.TagField(logs.TagApp))
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -1016,34 +1103,32 @@ func (a *app) tryReconnect(ctx context.Context, sr *fasthttp.Server) bool {
|
|||
a.mu.Lock()
|
||||
defer a.mu.Unlock()
|
||||
|
||||
a.log.Info(logs.ServerReconnecting)
|
||||
a.log.Info(logs.ServerReconnecting, logs.TagField(logs.TagApp))
|
||||
var failedServers []ServerInfo
|
||||
|
||||
for _, serverInfo := range a.unbindServers {
|
||||
fields := []zap.Field{
|
||||
zap.String("address", serverInfo.Address), zap.Bool("tls enabled", serverInfo.TLS.Enabled),
|
||||
zap.String("tls cert", serverInfo.TLS.CertFile), zap.String("tls key", serverInfo.TLS.KeyFile),
|
||||
}
|
||||
|
||||
srv, err := newServer(ctx, serverInfo)
|
||||
if err != nil {
|
||||
a.log.Warn(logs.ServerReconnectFailed, zap.Error(err))
|
||||
a.log.Warn(logs.ServerReconnectFailed, zap.Error(err), logs.TagField(logs.TagApp))
|
||||
failedServers = append(failedServers, serverInfo)
|
||||
a.metrics.MarkUnhealthy(serverInfo.Address)
|
||||
continue
|
||||
}
|
||||
|
||||
go func() {
|
||||
a.log.Info(logs.StartingServer, zap.String("address", srv.Address()))
|
||||
a.log.Info(logs.StartingServer, zap.String("address", srv.Address()), logs.TagField(logs.TagApp))
|
||||
a.metrics.MarkHealthy(serverInfo.Address)
|
||||
if err = sr.Serve(srv.Listener()); err != nil && !errors.Is(err, http.ErrServerClosed) {
|
||||
a.log.Warn(logs.ListenAndServe, zap.Error(err))
|
||||
a.log.Warn(logs.ListenAndServe, zap.Error(err), logs.TagField(logs.TagApp))
|
||||
a.metrics.MarkUnhealthy(serverInfo.Address)
|
||||
}
|
||||
}()
|
||||
|
||||
a.servers = append(a.servers, srv)
|
||||
a.log.Info(logs.ServerReconnectedSuccessfully, fields...)
|
||||
a.log.Info(logs.ServerReconnectedSuccessfully,
|
||||
zap.String("address", serverInfo.Address), zap.Bool("tls enabled", serverInfo.TLS.Enabled),
|
||||
zap.String("tls cert", serverInfo.TLS.CertFile), zap.String("tls key", serverInfo.TLS.KeyFile),
|
||||
logs.TagField(logs.TagApp))
|
||||
}
|
||||
|
||||
a.unbindServers = failedServers
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue