frostfs-http-gw/cmd/http-gw/app.go
Alex Vanin d9cbd302b1 [#121] Add canonicalizer
Some headers might be passed in non-canonical way
by proxy servers, such as 'Authorization' header.
Server does not normalize headers, so we can get
custom object attributes. Therefore, app has to normalize
all non object attribute headers by itself.

Signed-off-by: Alex Vanin <a.vanin@yadro.com>
2024-06-26 11:21:21 +03:00

853 lines
22 KiB
Go

package main
import (
"bytes"
"context"
"errors"
"fmt"
"net/http"
"os"
"os/signal"
"runtime/debug"
"strconv"
"strings"
"sync"
"syscall"
"time"
v2container "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/container"
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/internal/cache"
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/internal/frostfs"
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/internal/frostfs/services"
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/internal/handler"
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/internal/handler/middleware"
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/internal/logs"
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/metrics"
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/resolver"
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/response"
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/tokens"
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/tree"
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/utils"
"git.frostfs.info/TrueCloudLab/frostfs-observability/tracing"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/pool"
treepool "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/pool/tree"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/user"
"github.com/fasthttp/router"
"github.com/nspcc-dev/neo-go/cli/flags"
"github.com/nspcc-dev/neo-go/cli/input"
"github.com/nspcc-dev/neo-go/pkg/crypto/keys"
"github.com/nspcc-dev/neo-go/pkg/util"
"github.com/nspcc-dev/neo-go/pkg/wallet"
"github.com/spf13/viper"
"github.com/valyala/fasthttp"
"go.uber.org/zap"
"golang.org/x/exp/slices"
)
type (
app struct {
ctx context.Context
log *zap.Logger
logLevel zap.AtomicLevel
pool *pool.Pool
treePool *treepool.Pool
key *keys.PrivateKey
owner *user.ID
cfg *viper.Viper
webServer *fasthttp.Server
webDone chan struct{}
resolver *resolver.ContainerResolver
metrics *gateMetrics
services []*metrics.Service
settings *appSettings
servers []Server
unbindServers []ServerInfo
mu sync.RWMutex
}
// App is an interface for the main gateway function.
App interface {
Wait()
Serve()
}
// Option is an application option.
Option func(a *app)
gateMetrics struct {
logger *zap.Logger
provider *metrics.GateMetrics
mu sync.RWMutex
enabled bool
}
// appSettings stores reloading parameters, so it has to provide getters and setters which use RWMutex.
appSettings struct {
reconnectInterval time.Duration
mu sync.RWMutex
defaultTimestamp bool
zipCompression bool
clientCut bool
bufferMaxSizeForPut uint64
namespaceHeader string
defaultNamespaces []string
}
)
// WithLogger returns Option to set a specific logger.
func WithLogger(l *zap.Logger, lvl zap.AtomicLevel) Option {
return func(a *app) {
if l == nil {
return
}
a.log = l
a.logLevel = lvl
}
}
// WithConfig returns Option to use specific Viper configuration.
func WithConfig(c *viper.Viper) Option {
return func(a *app) {
if c == nil {
return
}
a.cfg = c
}
}
func newApp(ctx context.Context, opt ...Option) App {
a := &app{
ctx: ctx,
log: zap.L(),
cfg: viper.GetViper(),
webServer: new(fasthttp.Server),
webDone: make(chan struct{}),
}
for i := range opt {
opt[i](a)
}
// -- setup FastHTTP server --
a.webServer.Name = "frost-http-gw"
a.webServer.ReadBufferSize = a.cfg.GetInt(cfgWebReadBufferSize)
a.webServer.WriteBufferSize = a.cfg.GetInt(cfgWebWriteBufferSize)
a.webServer.ReadTimeout = a.cfg.GetDuration(cfgWebReadTimeout)
a.webServer.WriteTimeout = a.cfg.GetDuration(cfgWebWriteTimeout)
a.webServer.DisableHeaderNamesNormalizing = true
a.webServer.NoDefaultServerHeader = true
a.webServer.NoDefaultContentType = true
a.webServer.MaxRequestBodySize = a.cfg.GetInt(cfgWebMaxRequestBodySize)
a.webServer.DisablePreParseMultipartForm = true
a.webServer.StreamRequestBody = a.cfg.GetBool(cfgWebStreamRequestBody)
// -- -- -- -- -- -- -- -- -- -- -- -- -- --
a.pool, a.treePool, a.key = getPools(ctx, a.log, a.cfg)
var owner user.ID
user.IDFromKey(&owner, a.key.PrivateKey.PublicKey)
a.owner = &owner
a.setRuntimeParameters()
a.initAppSettings()
a.initResolver()
a.initMetrics()
a.initTracing(ctx)
return a
}
func (s *appSettings) DefaultTimestamp() bool {
s.mu.RLock()
defer s.mu.RUnlock()
return s.defaultTimestamp
}
func (s *appSettings) setDefaultTimestamp(val bool) {
s.mu.Lock()
s.defaultTimestamp = val
s.mu.Unlock()
}
func (s *appSettings) ZipCompression() bool {
s.mu.RLock()
defer s.mu.RUnlock()
return s.zipCompression
}
func (s *appSettings) setZipCompression(val bool) {
s.mu.Lock()
s.zipCompression = val
s.mu.Unlock()
}
func (s *appSettings) ClientCut() bool {
s.mu.RLock()
defer s.mu.RUnlock()
return s.clientCut
}
func (s *appSettings) setClientCut(val bool) {
s.mu.Lock()
s.clientCut = val
s.mu.Unlock()
}
func (s *appSettings) BufferMaxSizeForPut() uint64 {
s.mu.RLock()
defer s.mu.RUnlock()
return s.bufferMaxSizeForPut
}
func (s *appSettings) setBufferMaxSizeForPut(val uint64) {
s.mu.Lock()
s.bufferMaxSizeForPut = val
s.mu.Unlock()
}
func (a *app) initAppSettings() {
a.settings = &appSettings{
reconnectInterval: fetchReconnectInterval(a.cfg),
}
a.updateSettings()
}
func (a *app) initResolver() {
var err error
a.resolver, err = resolver.NewContainerResolver(a.getResolverConfig())
if err != nil {
a.log.Fatal(logs.FailedToCreateResolver, zap.Error(err))
}
}
func (a *app) getResolverConfig() ([]string, *resolver.Config) {
resolveCfg := &resolver.Config{
FrostFS: resolver.NewFrostFSResolver(a.pool),
RPCAddress: a.cfg.GetString(cfgRPCEndpoint),
Settings: a.settings,
}
order := a.cfg.GetStringSlice(cfgResolveOrder)
if resolveCfg.RPCAddress == "" {
order = remove(order, resolver.NNSResolver)
a.log.Warn(logs.ResolverNNSWontBeUsedSinceRPCEndpointIsntProvided)
}
if len(order) == 0 {
a.log.Info(logs.ContainerResolverWillBeDisabledBecauseOfResolversResolverOrderIsEmpty)
}
return order, resolveCfg
}
func (a *app) initMetrics() {
gateMetricsProvider := metrics.NewGateMetrics(a.pool)
a.metrics = newGateMetrics(a.log, gateMetricsProvider, a.cfg.GetBool(cfgPrometheusEnabled))
a.metrics.SetHealth(metrics.HealthStatusStarting)
}
func newGateMetrics(logger *zap.Logger, provider *metrics.GateMetrics, enabled bool) *gateMetrics {
if !enabled {
logger.Warn(logs.MetricsAreDisabled)
}
return &gateMetrics{
logger: logger,
provider: provider,
enabled: enabled,
}
}
func (m *gateMetrics) isEnabled() bool {
m.mu.RLock()
defer m.mu.RUnlock()
return m.enabled
}
func (m *gateMetrics) SetEnabled(enabled bool) {
if !enabled {
m.logger.Warn(logs.MetricsAreDisabled)
}
m.mu.Lock()
m.enabled = enabled
m.mu.Unlock()
}
func (m *gateMetrics) SetHealth(status metrics.HealthStatus) {
if !m.isEnabled() {
return
}
m.provider.SetHealth(status)
}
func (m *gateMetrics) SetVersion(ver string) {
if !m.isEnabled() {
return
}
m.provider.SetVersion(ver)
}
func (m *gateMetrics) Shutdown() {
m.mu.Lock()
if m.enabled {
m.provider.SetHealth(metrics.HealthStatusShuttingDown)
m.enabled = false
}
m.provider.Unregister()
m.mu.Unlock()
}
func (m *gateMetrics) MarkHealthy(endpoint string) {
if !m.isEnabled() {
return
}
m.provider.MarkHealthy(endpoint)
}
func (m *gateMetrics) MarkUnhealthy(endpoint string) {
if !m.isEnabled() {
return
}
m.provider.MarkUnhealthy(endpoint)
}
func remove(list []string, element string) []string {
for i, item := range list {
if item == element {
return append(list[:i], list[i+1:]...)
}
}
return list
}
func getFrostFSKey(cfg *viper.Viper, log *zap.Logger) (*keys.PrivateKey, error) {
walletPath := cfg.GetString(cfgWalletPath)
if len(walletPath) == 0 {
log.Info(logs.NoWalletPathSpecifiedCreatingEphemeralKeyAutomaticallyForThisRun)
key, err := keys.NewPrivateKey()
if err != nil {
return nil, err
}
return key, nil
}
w, err := wallet.NewWalletFromFile(walletPath)
if err != nil {
return nil, err
}
var password *string
if cfg.IsSet(cfgWalletPassphrase) {
pwd := cfg.GetString(cfgWalletPassphrase)
password = &pwd
}
address := cfg.GetString(cfgWalletAddress)
return getKeyFromWallet(w, address, password)
}
func getKeyFromWallet(w *wallet.Wallet, addrStr string, password *string) (*keys.PrivateKey, error) {
var addr util.Uint160
var err error
if addrStr == "" {
addr = w.GetChangeAddress()
} else {
addr, err = flags.ParseAddress(addrStr)
if err != nil {
return nil, fmt.Errorf("invalid address")
}
}
acc := w.GetAccount(addr)
if acc == nil {
return nil, fmt.Errorf("couldn't find wallet account for %s", addrStr)
}
if password == nil {
pwd, err := input.ReadPassword("Enter password > ")
if err != nil {
return nil, fmt.Errorf("couldn't read password")
}
password = &pwd
}
if err := acc.Decrypt(*password, w.Scrypt); err != nil {
return nil, fmt.Errorf("couldn't decrypt account: %w", err)
}
return acc.PrivateKey(), nil
}
func (a *app) Wait() {
a.log.Info(logs.StartingApplication, zap.String("app_name", "frostfs-http-gw"), zap.String("version", Version))
a.metrics.SetVersion(Version)
a.setHealthStatus()
<-a.webDone // wait for web-server to be stopped
}
func (a *app) setHealthStatus() {
a.metrics.SetHealth(metrics.HealthStatusReady)
}
func (a *app) Serve() {
handler := handler.New(a.AppParams(), a.settings, tree.NewTree(services.NewPoolWrapper(a.treePool)))
// Configure router.
a.configureRouter(handler)
a.startServices()
a.initServers(a.ctx)
servs := a.getServers()
for i := range servs {
go func(i int) {
a.log.Info(logs.StartingServer, zap.String("address", servs[i].Address()))
if err := a.webServer.Serve(servs[i].Listener()); err != nil && err != http.ErrServerClosed {
a.metrics.MarkUnhealthy(servs[i].Address())
a.log.Fatal(logs.ListenAndServe, zap.Error(err))
}
}(i)
}
if len(a.unbindServers) != 0 {
a.scheduleReconnect(a.ctx, a.webServer)
}
sigs := make(chan os.Signal, 1)
signal.Notify(sigs, syscall.SIGHUP)
LOOP:
for {
select {
case <-a.ctx.Done():
break LOOP
case <-sigs:
a.configReload(a.ctx)
}
}
a.log.Info(logs.ShuttingDownWebServer, zap.Error(a.webServer.Shutdown()))
a.metrics.Shutdown()
a.stopServices()
a.shutdownTracing()
close(a.webDone)
}
func (a *app) shutdownTracing() {
const tracingShutdownTimeout = 5 * time.Second
shdnCtx, cancel := context.WithTimeout(context.Background(), tracingShutdownTimeout)
defer cancel()
if err := tracing.Shutdown(shdnCtx); err != nil {
a.log.Warn(logs.FailedToShutdownTracing, zap.Error(err))
}
}
func (a *app) configReload(ctx context.Context) {
a.log.Info(logs.SIGHUPConfigReloadStarted)
if !a.cfg.IsSet(cmdConfig) && !a.cfg.IsSet(cmdConfigDir) {
a.log.Warn(logs.FailedToReloadConfigBecauseItsMissed)
return
}
if err := readInConfig(a.cfg); err != nil {
a.log.Warn(logs.FailedToReloadConfig, zap.Error(err))
return
}
if lvl, err := getLogLevel(a.cfg); err != nil {
a.log.Warn(logs.LogLevelWontBeUpdated, zap.Error(err))
} else {
a.logLevel.SetLevel(lvl)
}
if err := a.resolver.UpdateResolvers(a.getResolverConfig()); err != nil {
a.log.Warn(logs.FailedToUpdateResolvers, zap.Error(err))
}
if err := a.updateServers(); err != nil {
a.log.Warn(logs.FailedToReloadServerParameters, zap.Error(err))
}
a.setRuntimeParameters()
a.stopServices()
a.startServices()
a.updateSettings()
a.metrics.SetEnabled(a.cfg.GetBool(cfgPrometheusEnabled))
a.initTracing(ctx)
a.setHealthStatus()
a.log.Info(logs.SIGHUPConfigReloadCompleted)
}
func (a *app) updateSettings() {
a.settings.setDefaultTimestamp(a.cfg.GetBool(cfgUploaderHeaderEnableDefaultTimestamp))
a.settings.setZipCompression(a.cfg.GetBool(cfgZipCompression))
a.settings.setClientCut(a.cfg.GetBool(cfgClientCut))
a.settings.setBufferMaxSizeForPut(a.cfg.GetUint64(cfgBufferMaxSizeForPut))
a.settings.setNamespaceHeader(a.cfg.GetString(cfgResolveNamespaceHeader))
a.settings.setDefaultNamespaces(a.cfg.GetStringSlice(cfgResolveDefaultNamespaces))
}
func (a *app) startServices() {
pprofConfig := metrics.Config{Enabled: a.cfg.GetBool(cfgPprofEnabled), Address: a.cfg.GetString(cfgPprofAddress)}
pprofService := metrics.NewPprofService(a.log, pprofConfig)
a.services = append(a.services, pprofService)
go pprofService.Start()
prometheusConfig := metrics.Config{Enabled: a.cfg.GetBool(cfgPrometheusEnabled), Address: a.cfg.GetString(cfgPrometheusAddress)}
prometheusService := metrics.NewPrometheusService(a.log, prometheusConfig)
a.services = append(a.services, prometheusService)
go prometheusService.Start()
}
func (a *app) stopServices() {
ctx, cancel := context.WithTimeout(context.Background(), defaultShutdownTimeout)
defer cancel()
for _, svc := range a.services {
svc.ShutDown(ctx)
}
}
func (a *app) configureRouter(handler *handler.Handler) {
r := router.New()
r.RedirectTrailingSlash = true
r.NotFound = func(r *fasthttp.RequestCtx) {
response.Error(r, "Not found", fasthttp.StatusNotFound)
}
r.MethodNotAllowed = func(r *fasthttp.RequestCtx) {
response.Error(r, "Method Not Allowed", fasthttp.StatusMethodNotAllowed)
}
r.POST("/upload/{cid}", a.logger(a.canonicalizer(a.tokenizer(a.tracer(a.reqNamespace(handler.Upload))))))
a.log.Info(logs.AddedPathUploadCid)
r.GET("/get/{cid}/{oid:*}", a.logger(a.canonicalizer(a.tokenizer(a.tracer(a.reqNamespace(handler.DownloadByAddressOrBucketName))))))
r.HEAD("/get/{cid}/{oid:*}", a.logger(a.canonicalizer(a.tokenizer(a.tracer(a.reqNamespace(handler.HeadByAddressOrBucketName))))))
a.log.Info(logs.AddedPathGetCidOid)
r.GET("/get_by_attribute/{cid}/{attr_key}/{attr_val:*}", a.logger(a.canonicalizer(a.tokenizer(a.tracer(a.reqNamespace(handler.DownloadByAttribute))))))
r.HEAD("/get_by_attribute/{cid}/{attr_key}/{attr_val:*}", a.logger(a.canonicalizer(a.tokenizer(a.tracer(a.reqNamespace(handler.HeadByAttribute))))))
a.log.Info(logs.AddedPathGetByAttributeCidAttrKeyAttrVal)
r.GET("/zip/{cid}/{prefix:*}", a.logger(a.canonicalizer(a.tokenizer(a.tracer(a.reqNamespace(handler.DownloadZipped))))))
a.log.Info(logs.AddedPathZipCidPrefix)
a.webServer.Handler = r.Handler
}
func (a *app) logger(h fasthttp.RequestHandler) fasthttp.RequestHandler {
return func(req *fasthttp.RequestCtx) {
a.log.Info(logs.Request, zap.String("remote", req.RemoteAddr().String()),
zap.ByteString("method", req.Method()),
zap.ByteString("path", req.Path()),
zap.ByteString("query", req.QueryArgs().QueryString()),
zap.Uint64("id", req.ID()))
h(req)
}
}
func (a *app) canonicalizer(h fasthttp.RequestHandler) fasthttp.RequestHandler {
return func(req *fasthttp.RequestCtx) {
// regardless of DisableHeaderNamesNormalizing setting, some headers
// MUST be normalized in order to process execution. They are normalized
// here.
toAddKeys := make([][]byte, 0, 10)
toAddValues := make([][]byte, 0, 10)
prefix := []byte(utils.UserAttributeHeaderPrefix)
req.Request.Header.VisitAll(func(k, v []byte) {
if bytes.HasPrefix(k, prefix) {
return
}
toAddKeys = append(toAddKeys, k)
toAddValues = append(toAddValues, v)
})
// this is safe to do after all headers were read into header structure
req.Request.Header.EnableNormalizing()
for i := range toAddKeys {
req.Request.Header.SetBytesKV(toAddKeys[i], toAddValues[i])
}
// return normalization setting back
req.Request.Header.DisableNormalizing()
h(req)
}
}
func (a *app) tokenizer(h fasthttp.RequestHandler) fasthttp.RequestHandler {
return func(req *fasthttp.RequestCtx) {
appCtx, err := tokens.StoreBearerTokenAppCtx(a.ctx, req)
if err != nil {
a.log.Error(logs.CouldNotFetchAndStoreBearerToken, zap.Uint64("id", req.ID()), zap.Error(err))
response.Error(req, "could not fetch and store bearer token: "+err.Error(), fasthttp.StatusBadRequest)
return
}
utils.SetContextToRequest(appCtx, req)
h(req)
}
}
func (a *app) tracer(h fasthttp.RequestHandler) fasthttp.RequestHandler {
return func(req *fasthttp.RequestCtx) {
appCtx := utils.GetContextFromRequest(req)
appCtx, span := utils.StartHTTPServerSpan(appCtx, req, "REQUEST")
defer func() {
utils.SetHTTPTraceInfo(appCtx, span, req)
span.End()
}()
appCtx = treepool.SetRequestID(appCtx, strconv.FormatUint(req.ID(), 10))
utils.SetContextToRequest(appCtx, req)
h(req)
}
}
func (a *app) reqNamespace(h fasthttp.RequestHandler) fasthttp.RequestHandler {
return func(req *fasthttp.RequestCtx) {
appCtx := utils.GetContextFromRequest(req)
nsBytes := req.Request.Header.Peek(a.settings.NamespaceHeader())
appCtx = middleware.SetNamespace(appCtx, string(nsBytes))
utils.SetContextToRequest(appCtx, req)
h(req)
}
}
func (a *app) AppParams() *handler.AppParams {
return &handler.AppParams{
Logger: a.log,
FrostFS: frostfs.NewFrostFS(a.pool),
Owner: a.owner,
Resolver: a.resolver,
Cache: cache.NewBucketCache(getCacheOptions(a.cfg, a.log)),
}
}
func (a *app) initServers(ctx context.Context) {
serversInfo := fetchServers(a.cfg, a.log)
a.servers = make([]Server, 0, len(serversInfo))
for _, serverInfo := range serversInfo {
fields := []zap.Field{
zap.String("address", serverInfo.Address), zap.Bool("tls enabled", serverInfo.TLS.Enabled),
zap.String("tls cert", serverInfo.TLS.CertFile), zap.String("tls key", serverInfo.TLS.KeyFile),
}
srv, err := newServer(ctx, serverInfo)
if err != nil {
a.unbindServers = append(a.unbindServers, serverInfo)
a.metrics.MarkUnhealthy(serverInfo.Address)
a.log.Warn(logs.FailedToAddServer, append(fields, zap.Error(err))...)
continue
}
a.metrics.MarkHealthy(serverInfo.Address)
a.servers = append(a.servers, srv)
a.log.Info(logs.AddServer, fields...)
}
if len(a.servers) == 0 {
a.log.Fatal(logs.NoHealthyServers)
}
}
func (a *app) updateServers() error {
serversInfo := fetchServers(a.cfg, a.log)
a.mu.Lock()
defer a.mu.Unlock()
var found bool
for _, serverInfo := range serversInfo {
ser := a.getServer(serverInfo.Address)
if ser != nil {
if serverInfo.TLS.Enabled {
if err := ser.UpdateCert(serverInfo.TLS.CertFile, serverInfo.TLS.KeyFile); err != nil {
return fmt.Errorf("failed to update tls certs: %w", err)
}
found = true
}
} else if unbind := a.updateUnbindServerInfo(serverInfo); unbind {
found = true
}
}
if !found {
return fmt.Errorf("invalid servers configuration: no known server found")
}
return nil
}
func (a *app) getServers() []Server {
a.mu.RLock()
defer a.mu.RUnlock()
return a.servers
}
func (a *app) getServer(address string) Server {
for i := range a.servers {
if a.servers[i].Address() == address {
return a.servers[i]
}
}
return nil
}
func (a *app) updateUnbindServerInfo(info ServerInfo) bool {
for i := range a.unbindServers {
if a.unbindServers[i].Address == info.Address {
a.unbindServers[i] = info
return true
}
}
return false
}
func (a *app) initTracing(ctx context.Context) {
instanceID := ""
if len(a.servers) > 0 {
instanceID = a.servers[0].Address()
}
cfg := tracing.Config{
Enabled: a.cfg.GetBool(cfgTracingEnabled),
Exporter: tracing.Exporter(a.cfg.GetString(cfgTracingExporter)),
Endpoint: a.cfg.GetString(cfgTracingEndpoint),
Service: "frostfs-http-gw",
InstanceID: instanceID,
Version: Version,
}
updated, err := tracing.Setup(ctx, cfg)
if err != nil {
a.log.Warn(logs.FailedToInitializeTracing, zap.Error(err))
}
if updated {
a.log.Info(logs.TracingConfigUpdated)
}
}
func (a *app) setRuntimeParameters() {
if len(os.Getenv("GOMEMLIMIT")) != 0 {
// default limit < yaml limit < app env limit < GOMEMLIMIT
a.log.Warn(logs.RuntimeSoftMemoryDefinedWithGOMEMLIMIT)
return
}
softMemoryLimit := fetchSoftMemoryLimit(a.cfg)
previous := debug.SetMemoryLimit(softMemoryLimit)
if softMemoryLimit != previous {
a.log.Info(logs.RuntimeSoftMemoryLimitUpdated,
zap.Int64("new_value", softMemoryLimit),
zap.Int64("old_value", previous))
}
}
func (s *appSettings) NamespaceHeader() string {
s.mu.RLock()
defer s.mu.RUnlock()
return s.namespaceHeader
}
func (s *appSettings) setNamespaceHeader(nsHeader string) {
s.mu.Lock()
s.namespaceHeader = nsHeader
s.mu.Unlock()
}
func (s *appSettings) FormContainerZone(ns string) (zone string, isDefault bool) {
s.mu.RLock()
namespaces := s.defaultNamespaces
s.mu.RUnlock()
if slices.Contains(namespaces, ns) {
return v2container.SysAttributeZoneDefault, true
}
return ns + ".ns", false
}
func (s *appSettings) setDefaultNamespaces(namespaces []string) {
for i := range namespaces { // to be set namespaces in env variable as `HTTP_GW_RESOLVE_BUCKET_DEFAULT_NAMESPACES="" "root"`
namespaces[i] = strings.Trim(namespaces[i], "\"")
}
s.mu.Lock()
s.defaultNamespaces = namespaces
s.mu.Unlock()
}
func (a *app) scheduleReconnect(ctx context.Context, srv *fasthttp.Server) {
go func() {
t := time.NewTicker(a.settings.reconnectInterval)
defer t.Stop()
for {
select {
case <-t.C:
if a.tryReconnect(ctx, srv) {
return
}
t.Reset(a.settings.reconnectInterval)
case <-ctx.Done():
return
}
}
}()
}
func (a *app) tryReconnect(ctx context.Context, sr *fasthttp.Server) bool {
a.mu.Lock()
defer a.mu.Unlock()
a.log.Info(logs.ServerReconnecting)
var failedServers []ServerInfo
for _, serverInfo := range a.unbindServers {
fields := []zap.Field{
zap.String("address", serverInfo.Address), zap.Bool("tls enabled", serverInfo.TLS.Enabled),
zap.String("tls cert", serverInfo.TLS.CertFile), zap.String("tls key", serverInfo.TLS.KeyFile),
}
srv, err := newServer(ctx, serverInfo)
if err != nil {
a.log.Warn(logs.ServerReconnectFailed, zap.Error(err))
failedServers = append(failedServers, serverInfo)
a.metrics.MarkUnhealthy(serverInfo.Address)
continue
}
go func() {
a.log.Info(logs.StartingServer, zap.String("address", srv.Address()))
a.metrics.MarkHealthy(serverInfo.Address)
if err = sr.Serve(srv.Listener()); err != nil && !errors.Is(err, http.ErrServerClosed) {
a.log.Warn(logs.ListenAndServe, zap.Error(err))
a.metrics.MarkUnhealthy(serverInfo.Address)
}
}()
a.servers = append(a.servers, srv)
a.log.Info(logs.ServerReconnectedSuccessfully, fields...)
}
a.unbindServers = failedServers
return len(a.unbindServers) == 0
}