frostfs-http-gw/cmd/http-gw/app.go
Nikita Zinkevich 6523816d9b
All checks were successful
/ Vulncheck (pull_request) Successful in 1m48s
/ DCO (pull_request) Successful in 2m10s
/ Builds (pull_request) Successful in 2m1s
/ Lint (pull_request) Successful in 3m27s
/ Tests (pull_request) Successful in 2m3s
[#469] Update frostfs-sdk-go with new tree service client
Add tree service's GetBucketSettings to use them to check for protocol to use (S3 or native). Also add mock implementations for this and GetLatestVersion methods.

Signed-off-by: Nikita Zinkevich <n.zinkevich@yadro.com>
2024-12-09 14:45:24 +03:00

1033 lines
27 KiB
Go

package main
import (
"bytes"
"context"
"crypto/x509"
"errors"
"fmt"
"net/http"
"os"
"os/signal"
"runtime/debug"
"strconv"
"strings"
"sync"
"syscall"
"time"
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/internal/cache"
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/internal/handler"
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/internal/handler/middleware"
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/internal/logs"
internalnet "git.frostfs.info/TrueCloudLab/frostfs-http-gw/internal/net"
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/internal/service/frostfs"
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/internal/templates"
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/metrics"
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/resolver"
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/response"
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/tokens"
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/tree"
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/utils"
"git.frostfs.info/TrueCloudLab/frostfs-observability/tracing"
v2container "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/api/container"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/pool"
treepool "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/pool/tree"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/user"
"github.com/fasthttp/router"
"github.com/nspcc-dev/neo-go/cli/flags"
"github.com/nspcc-dev/neo-go/cli/input"
"github.com/nspcc-dev/neo-go/pkg/crypto/keys"
"github.com/nspcc-dev/neo-go/pkg/util"
"github.com/nspcc-dev/neo-go/pkg/wallet"
"github.com/panjf2000/ants/v2"
"github.com/spf13/viper"
"github.com/valyala/fasthttp"
"go.opentelemetry.io/otel/trace"
"go.uber.org/zap"
"golang.org/x/exp/slices"
)
type (
app struct {
ctx context.Context
log *zap.Logger
logLevel zap.AtomicLevel
pool *pool.Pool
treePool *treepool.Pool
key *keys.PrivateKey
owner *user.ID
cfg *viper.Viper
webServer *fasthttp.Server
webDone chan struct{}
resolver *resolver.ContainerResolver
metrics *gateMetrics
services []*metrics.Service
settings *appSettings
loggerSettings *loggerSettings
servers []Server
unbindServers []ServerInfo
mu sync.RWMutex
}
loggerSettings struct {
mu sync.RWMutex
appMetrics *metrics.GateMetrics
}
// App is an interface for the main gateway function.
App interface {
Wait()
Serve()
}
gateMetrics struct {
logger *zap.Logger
provider *metrics.GateMetrics
mu sync.RWMutex
enabled bool
}
// appSettings stores reloading parameters, so it has to provide getters and setters which use RWMutex.
appSettings struct {
reconnectInterval time.Duration
dialerSource *internalnet.DialerSource
workerPoolSize int
mu sync.RWMutex
defaultTimestamp bool
zipCompression bool
clientCut bool
returnIndexPage bool
indexPageTemplate string
bufferMaxSizeForPut uint64
namespaceHeader string
defaultNamespaces []string
corsAllowOrigin string
corsAllowMethods []string
corsAllowHeaders []string
corsExposeHeaders []string
corsAllowCredentials bool
corsMaxAge int
}
CORS struct {
AllowOrigin string
AllowMethods []string
AllowHeaders []string
ExposeHeaders []string
AllowCredentials bool
MaxAge int
}
)
func newApp(ctx context.Context, v *viper.Viper) App {
logSettings := &loggerSettings{}
log := pickLogger(v, logSettings)
a := &app{
ctx: ctx,
log: log.logger,
cfg: v,
loggerSettings: logSettings,
webServer: new(fasthttp.Server),
webDone: make(chan struct{}),
}
a.initAppSettings()
// -- setup FastHTTP server --
a.webServer.Name = "frost-http-gw"
a.webServer.ReadBufferSize = a.cfg.GetInt(cfgWebReadBufferSize)
a.webServer.WriteBufferSize = a.cfg.GetInt(cfgWebWriteBufferSize)
a.webServer.ReadTimeout = a.cfg.GetDuration(cfgWebReadTimeout)
a.webServer.WriteTimeout = a.cfg.GetDuration(cfgWebWriteTimeout)
a.webServer.DisableHeaderNamesNormalizing = true
a.webServer.NoDefaultServerHeader = true
a.webServer.NoDefaultContentType = true
a.webServer.MaxRequestBodySize = a.cfg.GetInt(cfgWebMaxRequestBodySize)
a.webServer.DisablePreParseMultipartForm = true
a.webServer.StreamRequestBody = a.cfg.GetBool(cfgWebStreamRequestBody)
// -- -- -- -- -- -- -- -- -- -- -- -- -- --
a.pool, a.treePool, a.key = getPools(ctx, a.log, a.cfg, a.settings.dialerSource)
var owner user.ID
user.IDFromKey(&owner, a.key.PrivateKey.PublicKey)
a.owner = &owner
a.setRuntimeParameters()
a.initResolver()
a.initMetrics()
a.initTracing(ctx)
return a
}
func (a *app) initAppSettings() {
a.settings = &appSettings{
reconnectInterval: fetchReconnectInterval(a.cfg),
dialerSource: getDialerSource(a.log, a.cfg),
workerPoolSize: a.cfg.GetInt(cfgWorkerPoolSize),
}
a.settings.update(a.cfg, a.log)
}
func (s *appSettings) update(v *viper.Viper, l *zap.Logger) {
defaultTimestamp := v.GetBool(cfgUploaderHeaderEnableDefaultTimestamp)
zipCompression := v.GetBool(cfgZipCompression)
returnIndexPage := v.GetBool(cfgIndexPageEnabled)
clientCut := v.GetBool(cfgClientCut)
bufferMaxSizeForPut := v.GetUint64(cfgBufferMaxSizeForPut)
namespaceHeader := v.GetString(cfgResolveNamespaceHeader)
defaultNamespaces := fetchDefaultNamespaces(v)
indexPage, indexEnabled := fetchIndexPageTemplate(v, l)
corsAllowOrigin := v.GetString(cfgCORSAllowOrigin)
corsAllowMethods := v.GetStringSlice(cfgCORSAllowMethods)
corsAllowHeaders := v.GetStringSlice(cfgCORSAllowHeaders)
corsExposeHeaders := v.GetStringSlice(cfgCORSExposeHeaders)
corsAllowCredentials := v.GetBool(cfgCORSAllowCredentials)
corsMaxAge := fetchCORSMaxAge(v)
s.mu.Lock()
defer s.mu.Unlock()
s.defaultTimestamp = defaultTimestamp
s.zipCompression = zipCompression
s.returnIndexPage = returnIndexPage
s.clientCut = clientCut
s.bufferMaxSizeForPut = bufferMaxSizeForPut
s.namespaceHeader = namespaceHeader
s.defaultNamespaces = defaultNamespaces
s.returnIndexPage = indexEnabled
s.indexPageTemplate = indexPage
s.corsAllowOrigin = corsAllowOrigin
s.corsAllowMethods = corsAllowMethods
s.corsAllowHeaders = corsAllowHeaders
s.corsExposeHeaders = corsExposeHeaders
s.corsAllowCredentials = corsAllowCredentials
s.corsMaxAge = corsMaxAge
}
func (s *loggerSettings) DroppedLogsInc() {
s.mu.RLock()
defer s.mu.RUnlock()
if s.appMetrics != nil {
s.appMetrics.DroppedLogsInc()
}
}
func (s *loggerSettings) setMetrics(appMetrics *metrics.GateMetrics) {
s.mu.Lock()
defer s.mu.Unlock()
s.appMetrics = appMetrics
}
func (s *appSettings) DefaultTimestamp() bool {
s.mu.RLock()
defer s.mu.RUnlock()
return s.defaultTimestamp
}
func (s *appSettings) ZipCompression() bool {
s.mu.RLock()
defer s.mu.RUnlock()
return s.zipCompression
}
func (s *appSettings) IndexPageEnabled() bool {
s.mu.RLock()
defer s.mu.RUnlock()
return s.returnIndexPage
}
func (s *appSettings) IndexPageTemplate() string {
s.mu.RLock()
defer s.mu.RUnlock()
if s.indexPageTemplate == "" {
return templates.DefaultIndexTemplate
}
return s.indexPageTemplate
}
func (s *appSettings) CORS() CORS {
s.mu.RLock()
defer s.mu.RUnlock()
allowMethods := make([]string, len(s.corsAllowMethods))
copy(allowMethods, s.corsAllowMethods)
allowHeaders := make([]string, len(s.corsAllowHeaders))
copy(allowHeaders, s.corsAllowHeaders)
exposeHeaders := make([]string, len(s.corsExposeHeaders))
copy(exposeHeaders, s.corsExposeHeaders)
return CORS{
AllowOrigin: s.corsAllowOrigin,
AllowMethods: allowMethods,
AllowHeaders: allowHeaders,
ExposeHeaders: exposeHeaders,
AllowCredentials: s.corsAllowCredentials,
MaxAge: s.corsMaxAge,
}
}
func (s *appSettings) ClientCut() bool {
s.mu.RLock()
defer s.mu.RUnlock()
return s.clientCut
}
func (s *appSettings) BufferMaxSizeForPut() uint64 {
s.mu.RLock()
defer s.mu.RUnlock()
return s.bufferMaxSizeForPut
}
func (s *appSettings) NamespaceHeader() string {
s.mu.RLock()
defer s.mu.RUnlock()
return s.namespaceHeader
}
func (s *appSettings) FormContainerZone(ns string) (zone string, isDefault bool) {
s.mu.RLock()
namespaces := s.defaultNamespaces
s.mu.RUnlock()
if slices.Contains(namespaces, ns) {
return v2container.SysAttributeZoneDefault, true
}
return ns + ".ns", false
}
func (a *app) initResolver() {
var err error
a.resolver, err = resolver.NewContainerResolver(a.getResolverConfig())
if err != nil {
a.log.Fatal(logs.FailedToCreateResolver, zap.Error(err))
}
}
func (a *app) getResolverConfig() ([]string, *resolver.Config) {
resolveCfg := &resolver.Config{
FrostFS: frostfs.NewResolverFrostFS(a.pool),
RPCAddress: a.cfg.GetString(cfgRPCEndpoint),
Settings: a.settings,
}
order := a.cfg.GetStringSlice(cfgResolveOrder)
if resolveCfg.RPCAddress == "" {
order = remove(order, resolver.NNSResolver)
a.log.Warn(logs.ResolverNNSWontBeUsedSinceRPCEndpointIsntProvided)
}
if len(order) == 0 {
a.log.Info(logs.ContainerResolverWillBeDisabledBecauseOfResolversResolverOrderIsEmpty)
}
return order, resolveCfg
}
func (a *app) initMetrics() {
gateMetricsProvider := metrics.NewGateMetrics(a.pool)
a.metrics = newGateMetrics(a.log, gateMetricsProvider, a.cfg.GetBool(cfgPrometheusEnabled))
a.metrics.SetHealth(metrics.HealthStatusStarting)
a.loggerSettings.setMetrics(a.metrics.provider)
}
func newGateMetrics(logger *zap.Logger, provider *metrics.GateMetrics, enabled bool) *gateMetrics {
if !enabled {
logger.Warn(logs.MetricsAreDisabled)
}
return &gateMetrics{
logger: logger,
provider: provider,
enabled: enabled,
}
}
func (m *gateMetrics) isEnabled() bool {
m.mu.RLock()
defer m.mu.RUnlock()
return m.enabled
}
func (m *gateMetrics) SetEnabled(enabled bool) {
if !enabled {
m.logger.Warn(logs.MetricsAreDisabled)
}
m.mu.Lock()
m.enabled = enabled
m.mu.Unlock()
}
func (m *gateMetrics) SetHealth(status metrics.HealthStatus) {
if !m.isEnabled() {
return
}
m.provider.SetHealth(status)
}
func (m *gateMetrics) SetVersion(ver string) {
if !m.isEnabled() {
return
}
m.provider.SetVersion(ver)
}
func (m *gateMetrics) Shutdown() {
m.mu.Lock()
if m.enabled {
m.provider.SetHealth(metrics.HealthStatusShuttingDown)
m.enabled = false
}
m.provider.Unregister()
m.mu.Unlock()
}
func (m *gateMetrics) MarkHealthy(endpoint string) {
if !m.isEnabled() {
return
}
m.provider.MarkHealthy(endpoint)
}
func (m *gateMetrics) MarkUnhealthy(endpoint string) {
if !m.isEnabled() {
return
}
m.provider.MarkUnhealthy(endpoint)
}
func remove(list []string, element string) []string {
for i, item := range list {
if item == element {
return append(list[:i], list[i+1:]...)
}
}
return list
}
func getFrostFSKey(cfg *viper.Viper, log *zap.Logger) (*keys.PrivateKey, error) {
walletPath := cfg.GetString(cfgWalletPath)
if len(walletPath) == 0 {
log.Info(logs.NoWalletPathSpecifiedCreatingEphemeralKeyAutomaticallyForThisRun)
key, err := keys.NewPrivateKey()
if err != nil {
return nil, err
}
return key, nil
}
w, err := wallet.NewWalletFromFile(walletPath)
if err != nil {
return nil, err
}
var password *string
if cfg.IsSet(cfgWalletPassphrase) {
pwd := cfg.GetString(cfgWalletPassphrase)
password = &pwd
}
address := cfg.GetString(cfgWalletAddress)
return getKeyFromWallet(w, address, password)
}
func getKeyFromWallet(w *wallet.Wallet, addrStr string, password *string) (*keys.PrivateKey, error) {
var addr util.Uint160
var err error
if addrStr == "" {
addr = w.GetChangeAddress()
} else {
addr, err = flags.ParseAddress(addrStr)
if err != nil {
return nil, fmt.Errorf("invalid address")
}
}
acc := w.GetAccount(addr)
if acc == nil {
return nil, fmt.Errorf("couldn't find wallet account for %s", addrStr)
}
if password == nil {
pwd, err := input.ReadPassword("Enter password > ")
if err != nil {
return nil, fmt.Errorf("couldn't read password")
}
password = &pwd
}
if err := acc.Decrypt(*password, w.Scrypt); err != nil {
return nil, fmt.Errorf("couldn't decrypt account: %w", err)
}
return acc.PrivateKey(), nil
}
func (a *app) Wait() {
a.log.Info(logs.StartingApplication, zap.String("app_name", "frostfs-http-gw"), zap.String("version", Version))
a.metrics.SetVersion(Version)
a.setHealthStatus()
<-a.webDone // wait for web-server to be stopped
}
func (a *app) setHealthStatus() {
a.metrics.SetHealth(metrics.HealthStatusReady)
}
func (a *app) Serve() {
workerPool := a.initWorkerPool()
defer func() {
workerPool.Release()
close(a.webDone)
}()
handler := handler.New(a.AppParams(), a.settings, tree.NewTree(frostfs.NewPoolWrapper(a.treePool)), workerPool)
// Configure router.
a.configureRouter(handler)
a.startServices()
a.initServers(a.ctx)
servs := a.getServers()
for i := range servs {
go func(i int) {
a.log.Info(logs.StartingServer, zap.String("address", servs[i].Address()))
if err := a.webServer.Serve(servs[i].Listener()); err != nil && err != http.ErrServerClosed {
a.metrics.MarkUnhealthy(servs[i].Address())
a.log.Fatal(logs.ListenAndServe, zap.Error(err))
}
}(i)
}
if len(a.unbindServers) != 0 {
a.scheduleReconnect(a.ctx, a.webServer)
}
sigs := make(chan os.Signal, 1)
signal.Notify(sigs, syscall.SIGHUP)
LOOP:
for {
select {
case <-a.ctx.Done():
break LOOP
case <-sigs:
a.configReload(a.ctx)
}
}
a.log.Info(logs.ShuttingDownWebServer, zap.Error(a.webServer.Shutdown()))
a.metrics.Shutdown()
a.stopServices()
a.shutdownTracing()
}
func (a *app) initWorkerPool() *ants.Pool {
workerPool, err := ants.NewPool(a.settings.workerPoolSize)
if err != nil {
a.log.Fatal(logs.FailedToCreateWorkerPool, zap.Error(err))
}
return workerPool
}
func (a *app) shutdownTracing() {
const tracingShutdownTimeout = 5 * time.Second
shdnCtx, cancel := context.WithTimeout(context.Background(), tracingShutdownTimeout)
defer cancel()
if err := tracing.Shutdown(shdnCtx); err != nil {
a.log.Warn(logs.FailedToShutdownTracing, zap.Error(err))
}
}
func (a *app) configReload(ctx context.Context) {
a.log.Info(logs.SIGHUPConfigReloadStarted)
if !a.cfg.IsSet(cmdConfig) && !a.cfg.IsSet(cmdConfigDir) {
a.log.Warn(logs.FailedToReloadConfigBecauseItsMissed)
return
}
if err := readInConfig(a.cfg); err != nil {
a.log.Warn(logs.FailedToReloadConfig, zap.Error(err))
return
}
if lvl, err := getLogLevel(a.cfg); err != nil {
a.log.Warn(logs.LogLevelWontBeUpdated, zap.Error(err))
} else {
a.logLevel.SetLevel(lvl)
}
if err := a.settings.dialerSource.Update(fetchMultinetConfig(a.cfg, a.log)); err != nil {
a.log.Warn(logs.MultinetConfigWontBeUpdated, zap.Error(err))
}
if err := a.resolver.UpdateResolvers(a.getResolverConfig()); err != nil {
a.log.Warn(logs.FailedToUpdateResolvers, zap.Error(err))
}
if err := a.updateServers(); err != nil {
a.log.Warn(logs.FailedToReloadServerParameters, zap.Error(err))
}
a.setRuntimeParameters()
a.stopServices()
a.startServices()
a.settings.update(a.cfg, a.log)
a.metrics.SetEnabled(a.cfg.GetBool(cfgPrometheusEnabled))
a.initTracing(ctx)
a.setHealthStatus()
a.log.Info(logs.SIGHUPConfigReloadCompleted)
}
func (a *app) startServices() {
pprofConfig := metrics.Config{Enabled: a.cfg.GetBool(cfgPprofEnabled), Address: a.cfg.GetString(cfgPprofAddress)}
pprofService := metrics.NewPprofService(a.log, pprofConfig)
a.services = append(a.services, pprofService)
go pprofService.Start()
prometheusConfig := metrics.Config{Enabled: a.cfg.GetBool(cfgPrometheusEnabled), Address: a.cfg.GetString(cfgPrometheusAddress)}
prometheusService := metrics.NewPrometheusService(a.log, prometheusConfig)
a.services = append(a.services, prometheusService)
go prometheusService.Start()
}
func (a *app) stopServices() {
ctx, cancel := context.WithTimeout(context.Background(), defaultShutdownTimeout)
defer cancel()
for _, svc := range a.services {
svc.ShutDown(ctx)
}
}
func (a *app) configureRouter(handler *handler.Handler) {
r := router.New()
r.RedirectTrailingSlash = true
r.NotFound = func(r *fasthttp.RequestCtx) {
response.Error(r, "Not found", fasthttp.StatusNotFound)
}
r.MethodNotAllowed = func(r *fasthttp.RequestCtx) {
response.Error(r, "Method Not Allowed", fasthttp.StatusMethodNotAllowed)
}
r.POST("/upload/{cid}", a.addMiddlewares(handler.Upload))
r.OPTIONS("/upload/{cid}", a.addPreflight())
a.log.Info(logs.AddedPathUploadCid)
r.GET("/get/{cid}/{oid:*}", a.addMiddlewares(handler.DownloadByAddressOrBucketName))
r.HEAD("/get/{cid}/{oid:*}", a.addMiddlewares(handler.HeadByAddressOrBucketName))
r.OPTIONS("/get/{cid}/{oid:*}", a.addPreflight())
a.log.Info(logs.AddedPathGetCidOid)
r.GET("/get_by_attribute/{cid}/{attr_key}/{attr_val:*}", a.addMiddlewares(handler.DownloadByAttribute))
r.HEAD("/get_by_attribute/{cid}/{attr_key}/{attr_val:*}", a.addMiddlewares(handler.HeadByAttribute))
r.OPTIONS("/get_by_attribute/{cid}/{attr_key}/{attr_val:*}", a.addPreflight())
a.log.Info(logs.AddedPathGetByAttributeCidAttrKeyAttrVal)
r.GET("/zip/{cid}/{prefix:*}", a.addMiddlewares(handler.DownloadZipped))
r.OPTIONS("/zip/{cid}/{prefix:*}", a.addPreflight())
a.log.Info(logs.AddedPathZipCidPrefix)
a.webServer.Handler = r.Handler
}
func (a *app) addMiddlewares(h fasthttp.RequestHandler) fasthttp.RequestHandler {
list := []func(fasthttp.RequestHandler) fasthttp.RequestHandler{
a.tracer,
a.logger,
a.canonicalizer,
a.tokenizer,
a.reqNamespace,
a.cors,
}
for i := len(list) - 1; i >= 0; i-- {
h = list[i](h)
}
return h
}
func (a *app) addPreflight() fasthttp.RequestHandler {
list := []func(fasthttp.RequestHandler) fasthttp.RequestHandler{
a.tracer,
a.logger,
a.reqNamespace,
}
h := a.preflightHandler
for i := len(list) - 1; i >= 0; i-- {
h = list[i](h)
}
return h
}
func (a *app) preflightHandler(c *fasthttp.RequestCtx) {
cors := a.settings.CORS()
setCORSHeaders(c, cors)
}
func (a *app) cors(h fasthttp.RequestHandler) fasthttp.RequestHandler {
return func(c *fasthttp.RequestCtx) {
h(c)
code := c.Response.StatusCode()
if code >= fasthttp.StatusOK && code < fasthttp.StatusMultipleChoices {
cors := a.settings.CORS()
setCORSHeaders(c, cors)
}
}
}
func setCORSHeaders(c *fasthttp.RequestCtx, cors CORS) {
c.Response.Header.Set(fasthttp.HeaderAccessControlMaxAge, strconv.Itoa(cors.MaxAge))
if len(cors.AllowOrigin) != 0 {
c.Response.Header.Set(fasthttp.HeaderAccessControlAllowOrigin, cors.AllowOrigin)
}
if len(cors.AllowMethods) != 0 {
c.Response.Header.Set(fasthttp.HeaderAccessControlAllowMethods, strings.Join(cors.AllowMethods, ","))
}
if len(cors.AllowHeaders) != 0 {
c.Response.Header.Set(fasthttp.HeaderAccessControlAllowHeaders, strings.Join(cors.AllowHeaders, ","))
}
if len(cors.ExposeHeaders) != 0 {
c.Response.Header.Set(fasthttp.HeaderAccessControlExposeHeaders, strings.Join(cors.ExposeHeaders, ","))
}
if cors.AllowCredentials {
c.Response.Header.Set(fasthttp.HeaderAccessControlAllowCredentials, "true")
}
}
func (a *app) logger(h fasthttp.RequestHandler) fasthttp.RequestHandler {
return func(req *fasthttp.RequestCtx) {
requiredFields := []zap.Field{zap.Uint64("id", req.ID())}
reqCtx := utils.GetContextFromRequest(req)
if traceID := trace.SpanFromContext(reqCtx).SpanContext().TraceID(); traceID.IsValid() {
requiredFields = append(requiredFields, zap.String("trace_id", traceID.String()))
}
log := a.log.With(requiredFields...)
reqCtx = utils.SetReqLog(reqCtx, log)
utils.SetContextToRequest(reqCtx, req)
fields := []zap.Field{
zap.String("remote", req.RemoteAddr().String()),
zap.ByteString("method", req.Method()),
zap.ByteString("path", req.Path()),
zap.ByteString("query", req.QueryArgs().QueryString()),
}
log.Info(logs.Request, fields...)
h(req)
}
}
func (a *app) canonicalizer(h fasthttp.RequestHandler) fasthttp.RequestHandler {
return func(req *fasthttp.RequestCtx) {
// regardless of DisableHeaderNamesNormalizing setting, some headers
// MUST be normalized in order to process execution. They are normalized
// here.
toAddKeys := make([][]byte, 0, 10)
toAddValues := make([][]byte, 0, 10)
prefix := []byte(utils.UserAttributeHeaderPrefix)
req.Request.Header.VisitAll(func(k, v []byte) {
if bytes.HasPrefix(k, prefix) {
return
}
toAddKeys = append(toAddKeys, k)
toAddValues = append(toAddValues, v)
})
// this is safe to do after all headers were read into header structure
req.Request.Header.EnableNormalizing()
for i := range toAddKeys {
req.Request.Header.SetBytesKV(toAddKeys[i], toAddValues[i])
}
// return normalization setting back
req.Request.Header.DisableNormalizing()
h(req)
}
}
func (a *app) tokenizer(h fasthttp.RequestHandler) fasthttp.RequestHandler {
return func(req *fasthttp.RequestCtx) {
reqCtx := utils.GetContextFromRequest(req)
appCtx, err := tokens.StoreBearerTokenAppCtx(reqCtx, req)
if err != nil {
log := utils.GetReqLogOrDefault(reqCtx, a.log)
log.Error(logs.CouldNotFetchAndStoreBearerToken, zap.Error(err))
response.Error(req, "could not fetch and store bearer token: "+err.Error(), fasthttp.StatusBadRequest)
return
}
utils.SetContextToRequest(appCtx, req)
h(req)
}
}
func (a *app) tracer(h fasthttp.RequestHandler) fasthttp.RequestHandler {
return func(req *fasthttp.RequestCtx) {
appCtx, span := utils.StartHTTPServerSpan(a.ctx, req, "REQUEST")
defer func() {
utils.SetHTTPTraceInfo(appCtx, span, req)
span.End()
}()
appCtx = treepool.SetRequestID(appCtx, strconv.FormatUint(req.ID(), 10))
utils.SetContextToRequest(appCtx, req)
h(req)
}
}
func (a *app) reqNamespace(h fasthttp.RequestHandler) fasthttp.RequestHandler {
return func(req *fasthttp.RequestCtx) {
appCtx := utils.GetContextFromRequest(req)
nsBytes := req.Request.Header.Peek(a.settings.NamespaceHeader())
appCtx = middleware.SetNamespace(appCtx, string(nsBytes))
utils.SetContextToRequest(appCtx, req)
h(req)
}
}
func (a *app) AppParams() *handler.AppParams {
return &handler.AppParams{
Logger: a.log,
FrostFS: frostfs.NewFrostFS(a.pool),
Owner: a.owner,
Resolver: a.resolver,
Cache: cache.NewBucketCache(getCacheOptions(a.cfg, a.log)),
}
}
func (a *app) initServers(ctx context.Context) {
serversInfo := fetchServers(a.cfg, a.log)
a.servers = make([]Server, 0, len(serversInfo))
for _, serverInfo := range serversInfo {
fields := []zap.Field{
zap.String("address", serverInfo.Address), zap.Bool("tls enabled", serverInfo.TLS.Enabled),
zap.String("tls cert", serverInfo.TLS.CertFile), zap.String("tls key", serverInfo.TLS.KeyFile),
}
srv, err := newServer(ctx, serverInfo)
if err != nil {
a.unbindServers = append(a.unbindServers, serverInfo)
a.metrics.MarkUnhealthy(serverInfo.Address)
a.log.Warn(logs.FailedToAddServer, append(fields, zap.Error(err))...)
continue
}
a.metrics.MarkHealthy(serverInfo.Address)
a.servers = append(a.servers, srv)
a.log.Info(logs.AddServer, fields...)
}
if len(a.servers) == 0 {
a.log.Fatal(logs.NoHealthyServers)
}
}
func (a *app) updateServers() error {
serversInfo := fetchServers(a.cfg, a.log)
a.mu.Lock()
defer a.mu.Unlock()
var found bool
for _, serverInfo := range serversInfo {
ser := a.getServer(serverInfo.Address)
if ser != nil {
if serverInfo.TLS.Enabled {
if err := ser.UpdateCert(serverInfo.TLS.CertFile, serverInfo.TLS.KeyFile); err != nil {
return fmt.Errorf("failed to update tls certs: %w", err)
}
found = true
}
} else if unbind := a.updateUnbindServerInfo(serverInfo); unbind {
found = true
}
}
if !found {
return fmt.Errorf("invalid servers configuration: no known server found")
}
return nil
}
func (a *app) getServers() []Server {
a.mu.RLock()
defer a.mu.RUnlock()
return a.servers
}
func (a *app) getServer(address string) Server {
for i := range a.servers {
if a.servers[i].Address() == address {
return a.servers[i]
}
}
return nil
}
func (a *app) updateUnbindServerInfo(info ServerInfo) bool {
for i := range a.unbindServers {
if a.unbindServers[i].Address == info.Address {
a.unbindServers[i] = info
return true
}
}
return false
}
func (a *app) initTracing(ctx context.Context) {
instanceID := ""
if len(a.servers) > 0 {
instanceID = a.servers[0].Address()
}
cfg := tracing.Config{
Enabled: a.cfg.GetBool(cfgTracingEnabled),
Exporter: tracing.Exporter(a.cfg.GetString(cfgTracingExporter)),
Endpoint: a.cfg.GetString(cfgTracingEndpoint),
Service: "frostfs-http-gw",
InstanceID: instanceID,
Version: Version,
}
if trustedCa := a.cfg.GetString(cfgTracingTrustedCa); trustedCa != "" {
caBytes, err := os.ReadFile(trustedCa)
if err != nil {
a.log.Warn(logs.FailedToInitializeTracing, zap.Error(err))
return
}
certPool := x509.NewCertPool()
ok := certPool.AppendCertsFromPEM(caBytes)
if !ok {
a.log.Warn(logs.FailedToInitializeTracing, zap.String("error", "can't fill cert pool by ca cert"))
return
}
cfg.ServerCaCertPool = certPool
}
attributes, err := fetchTracingAttributes(a.cfg)
if err != nil {
a.log.Warn(logs.FailedToInitializeTracing, zap.Error(err))
return
}
cfg.Attributes = attributes
updated, err := tracing.Setup(ctx, cfg)
if err != nil {
a.log.Warn(logs.FailedToInitializeTracing, zap.Error(err))
}
if updated {
a.log.Info(logs.TracingConfigUpdated)
}
}
func (a *app) setRuntimeParameters() {
if len(os.Getenv("GOMEMLIMIT")) != 0 {
// default limit < yaml limit < app env limit < GOMEMLIMIT
a.log.Warn(logs.RuntimeSoftMemoryDefinedWithGOMEMLIMIT)
return
}
softMemoryLimit := fetchSoftMemoryLimit(a.cfg)
previous := debug.SetMemoryLimit(softMemoryLimit)
if softMemoryLimit != previous {
a.log.Info(logs.RuntimeSoftMemoryLimitUpdated,
zap.Int64("new_value", softMemoryLimit),
zap.Int64("old_value", previous))
}
}
func (a *app) scheduleReconnect(ctx context.Context, srv *fasthttp.Server) {
go func() {
t := time.NewTicker(a.settings.reconnectInterval)
defer t.Stop()
for {
select {
case <-t.C:
if a.tryReconnect(ctx, srv) {
return
}
t.Reset(a.settings.reconnectInterval)
case <-ctx.Done():
return
}
}
}()
}
func (a *app) tryReconnect(ctx context.Context, sr *fasthttp.Server) bool {
a.mu.Lock()
defer a.mu.Unlock()
a.log.Info(logs.ServerReconnecting)
var failedServers []ServerInfo
for _, serverInfo := range a.unbindServers {
fields := []zap.Field{
zap.String("address", serverInfo.Address), zap.Bool("tls enabled", serverInfo.TLS.Enabled),
zap.String("tls cert", serverInfo.TLS.CertFile), zap.String("tls key", serverInfo.TLS.KeyFile),
}
srv, err := newServer(ctx, serverInfo)
if err != nil {
a.log.Warn(logs.ServerReconnectFailed, zap.Error(err))
failedServers = append(failedServers, serverInfo)
a.metrics.MarkUnhealthy(serverInfo.Address)
continue
}
go func() {
a.log.Info(logs.StartingServer, zap.String("address", srv.Address()))
a.metrics.MarkHealthy(serverInfo.Address)
if err = sr.Serve(srv.Listener()); err != nil && !errors.Is(err, http.ErrServerClosed) {
a.log.Warn(logs.ListenAndServe, zap.Error(err))
a.metrics.MarkUnhealthy(serverInfo.Address)
}
}()
a.servers = append(a.servers, srv)
a.log.Info(logs.ServerReconnectedSuccessfully, fields...)
}
a.unbindServers = failedServers
return len(a.unbindServers) == 0
}