package main import ( "bytes" "context" "crypto/x509" "errors" "fmt" "net/http" "os" "os/signal" "runtime/debug" "strconv" "strings" "sync" "syscall" "time" v2container "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/container" "git.frostfs.info/TrueCloudLab/frostfs-http-gw/internal/cache" "git.frostfs.info/TrueCloudLab/frostfs-http-gw/internal/handler" "git.frostfs.info/TrueCloudLab/frostfs-http-gw/internal/handler/middleware" "git.frostfs.info/TrueCloudLab/frostfs-http-gw/internal/logs" internalnet "git.frostfs.info/TrueCloudLab/frostfs-http-gw/internal/net" "git.frostfs.info/TrueCloudLab/frostfs-http-gw/internal/service/frostfs" "git.frostfs.info/TrueCloudLab/frostfs-http-gw/internal/templates" "git.frostfs.info/TrueCloudLab/frostfs-http-gw/metrics" "git.frostfs.info/TrueCloudLab/frostfs-http-gw/resolver" "git.frostfs.info/TrueCloudLab/frostfs-http-gw/response" "git.frostfs.info/TrueCloudLab/frostfs-http-gw/tokens" "git.frostfs.info/TrueCloudLab/frostfs-http-gw/tree" "git.frostfs.info/TrueCloudLab/frostfs-http-gw/utils" "git.frostfs.info/TrueCloudLab/frostfs-observability/tracing" "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/pool" treepool "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/pool/tree" "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/user" "github.com/fasthttp/router" "github.com/nspcc-dev/neo-go/cli/flags" "github.com/nspcc-dev/neo-go/cli/input" "github.com/nspcc-dev/neo-go/pkg/crypto/keys" "github.com/nspcc-dev/neo-go/pkg/util" "github.com/nspcc-dev/neo-go/pkg/wallet" "github.com/panjf2000/ants/v2" "github.com/spf13/viper" "github.com/valyala/fasthttp" "go.opentelemetry.io/otel/trace" "go.uber.org/zap" "golang.org/x/exp/slices" ) type ( app struct { ctx context.Context log *zap.Logger logLevel zap.AtomicLevel pool *pool.Pool treePool *treepool.Pool key *keys.PrivateKey owner *user.ID cfg *viper.Viper webServer *fasthttp.Server webDone chan struct{} resolver *resolver.ContainerResolver metrics *gateMetrics services []*metrics.Service settings *appSettings loggerSettings *loggerSettings servers []Server unbindServers []ServerInfo mu sync.RWMutex } loggerSettings struct { mu sync.RWMutex appMetrics *metrics.GateMetrics } // App is an interface for the main gateway function. App interface { Wait() Serve() } gateMetrics struct { logger *zap.Logger provider *metrics.GateMetrics mu sync.RWMutex enabled bool } // appSettings stores reloading parameters, so it has to provide getters and setters which use RWMutex. appSettings struct { reconnectInterval time.Duration dialerSource *internalnet.DialerSource workerPoolSize int mu sync.RWMutex defaultTimestamp bool archiveCompression bool clientCut bool returnIndexPage bool indexPageTemplate string bufferMaxSizeForPut uint64 namespaceHeader string defaultNamespaces []string corsAllowOrigin string corsAllowMethods []string corsAllowHeaders []string corsExposeHeaders []string corsAllowCredentials bool corsMaxAge int } CORS struct { AllowOrigin string AllowMethods []string AllowHeaders []string ExposeHeaders []string AllowCredentials bool MaxAge int } ) func newApp(ctx context.Context, v *viper.Viper) App { logSettings := &loggerSettings{} log := pickLogger(v, logSettings) a := &app{ ctx: ctx, log: log.logger, cfg: v, loggerSettings: logSettings, webServer: new(fasthttp.Server), webDone: make(chan struct{}), } a.initAppSettings() // -- setup FastHTTP server -- a.webServer.Name = "frost-http-gw" a.webServer.ReadBufferSize = a.cfg.GetInt(cfgWebReadBufferSize) a.webServer.WriteBufferSize = a.cfg.GetInt(cfgWebWriteBufferSize) a.webServer.ReadTimeout = a.cfg.GetDuration(cfgWebReadTimeout) a.webServer.WriteTimeout = a.cfg.GetDuration(cfgWebWriteTimeout) a.webServer.DisableHeaderNamesNormalizing = true a.webServer.NoDefaultServerHeader = true a.webServer.NoDefaultContentType = true a.webServer.MaxRequestBodySize = a.cfg.GetInt(cfgWebMaxRequestBodySize) a.webServer.DisablePreParseMultipartForm = true a.webServer.StreamRequestBody = a.cfg.GetBool(cfgWebStreamRequestBody) // -- -- -- -- -- -- -- -- -- -- -- -- -- -- a.pool, a.treePool, a.key = getPools(ctx, a.log, a.cfg, a.settings.dialerSource) var owner user.ID user.IDFromKey(&owner, a.key.PrivateKey.PublicKey) a.owner = &owner a.setRuntimeParameters() a.initResolver() a.initMetrics() a.initTracing(ctx) return a } func (a *app) initAppSettings() { a.settings = &appSettings{ reconnectInterval: fetchReconnectInterval(a.cfg), dialerSource: getDialerSource(a.log, a.cfg), workerPoolSize: a.cfg.GetInt(cfgWorkerPoolSize), } a.settings.update(a.cfg, a.log) } func (s *appSettings) update(v *viper.Viper, l *zap.Logger) { defaultTimestamp := v.GetBool(cfgUploaderHeaderEnableDefaultTimestamp) archiveCompression := v.GetBool(cfgArchiveCompression) returnIndexPage := v.GetBool(cfgIndexPageEnabled) clientCut := v.GetBool(cfgClientCut) bufferMaxSizeForPut := v.GetUint64(cfgBufferMaxSizeForPut) namespaceHeader := v.GetString(cfgResolveNamespaceHeader) defaultNamespaces := fetchDefaultNamespaces(v) indexPage, indexEnabled := fetchIndexPageTemplate(v, l) corsAllowOrigin := v.GetString(cfgCORSAllowOrigin) corsAllowMethods := v.GetStringSlice(cfgCORSAllowMethods) corsAllowHeaders := v.GetStringSlice(cfgCORSAllowHeaders) corsExposeHeaders := v.GetStringSlice(cfgCORSExposeHeaders) corsAllowCredentials := v.GetBool(cfgCORSAllowCredentials) corsMaxAge := fetchCORSMaxAge(v) s.mu.Lock() defer s.mu.Unlock() s.defaultTimestamp = defaultTimestamp s.archiveCompression = archiveCompression s.returnIndexPage = returnIndexPage s.clientCut = clientCut s.bufferMaxSizeForPut = bufferMaxSizeForPut s.namespaceHeader = namespaceHeader s.defaultNamespaces = defaultNamespaces s.returnIndexPage = indexEnabled s.indexPageTemplate = indexPage s.corsAllowOrigin = corsAllowOrigin s.corsAllowMethods = corsAllowMethods s.corsAllowHeaders = corsAllowHeaders s.corsExposeHeaders = corsExposeHeaders s.corsAllowCredentials = corsAllowCredentials s.corsMaxAge = corsMaxAge } func (s *loggerSettings) DroppedLogsInc() { s.mu.RLock() defer s.mu.RUnlock() if s.appMetrics != nil { s.appMetrics.DroppedLogsInc() } } func (s *loggerSettings) setMetrics(appMetrics *metrics.GateMetrics) { s.mu.Lock() defer s.mu.Unlock() s.appMetrics = appMetrics } func (s *appSettings) DefaultTimestamp() bool { s.mu.RLock() defer s.mu.RUnlock() return s.defaultTimestamp } func (s *appSettings) ArchiveCompression() bool { s.mu.RLock() defer s.mu.RUnlock() return s.archiveCompression } func (s *appSettings) IndexPageEnabled() bool { s.mu.RLock() defer s.mu.RUnlock() return s.returnIndexPage } func (s *appSettings) IndexPageTemplate() string { s.mu.RLock() defer s.mu.RUnlock() if s.indexPageTemplate == "" { return templates.DefaultIndexTemplate } return s.indexPageTemplate } func (s *appSettings) CORS() CORS { s.mu.RLock() defer s.mu.RUnlock() allowMethods := make([]string, len(s.corsAllowMethods)) copy(allowMethods, s.corsAllowMethods) allowHeaders := make([]string, len(s.corsAllowHeaders)) copy(allowHeaders, s.corsAllowHeaders) exposeHeaders := make([]string, len(s.corsExposeHeaders)) copy(exposeHeaders, s.corsExposeHeaders) return CORS{ AllowOrigin: s.corsAllowOrigin, AllowMethods: allowMethods, AllowHeaders: allowHeaders, ExposeHeaders: exposeHeaders, AllowCredentials: s.corsAllowCredentials, MaxAge: s.corsMaxAge, } } func (s *appSettings) ClientCut() bool { s.mu.RLock() defer s.mu.RUnlock() return s.clientCut } func (s *appSettings) BufferMaxSizeForPut() uint64 { s.mu.RLock() defer s.mu.RUnlock() return s.bufferMaxSizeForPut } func (s *appSettings) NamespaceHeader() string { s.mu.RLock() defer s.mu.RUnlock() return s.namespaceHeader } func (s *appSettings) FormContainerZone(ns string) (zone string, isDefault bool) { s.mu.RLock() namespaces := s.defaultNamespaces s.mu.RUnlock() if slices.Contains(namespaces, ns) { return v2container.SysAttributeZoneDefault, true } return ns + ".ns", false } func (a *app) initResolver() { var err error a.resolver, err = resolver.NewContainerResolver(a.getResolverConfig()) if err != nil { a.log.Fatal(logs.FailedToCreateResolver, zap.Error(err)) } } func (a *app) getResolverConfig() ([]string, *resolver.Config) { resolveCfg := &resolver.Config{ FrostFS: frostfs.NewResolverFrostFS(a.pool), RPCAddress: a.cfg.GetString(cfgRPCEndpoint), Settings: a.settings, } order := a.cfg.GetStringSlice(cfgResolveOrder) if resolveCfg.RPCAddress == "" { order = remove(order, resolver.NNSResolver) a.log.Warn(logs.ResolverNNSWontBeUsedSinceRPCEndpointIsntProvided) } if len(order) == 0 { a.log.Info(logs.ContainerResolverWillBeDisabledBecauseOfResolversResolverOrderIsEmpty) } return order, resolveCfg } func (a *app) initMetrics() { gateMetricsProvider := metrics.NewGateMetrics(a.pool) a.metrics = newGateMetrics(a.log, gateMetricsProvider, a.cfg.GetBool(cfgPrometheusEnabled)) a.metrics.SetHealth(metrics.HealthStatusStarting) a.loggerSettings.setMetrics(a.metrics.provider) } func newGateMetrics(logger *zap.Logger, provider *metrics.GateMetrics, enabled bool) *gateMetrics { if !enabled { logger.Warn(logs.MetricsAreDisabled) } return &gateMetrics{ logger: logger, provider: provider, enabled: enabled, } } func (m *gateMetrics) isEnabled() bool { m.mu.RLock() defer m.mu.RUnlock() return m.enabled } func (m *gateMetrics) SetEnabled(enabled bool) { if !enabled { m.logger.Warn(logs.MetricsAreDisabled) } m.mu.Lock() m.enabled = enabled m.mu.Unlock() } func (m *gateMetrics) SetHealth(status metrics.HealthStatus) { if !m.isEnabled() { return } m.provider.SetHealth(status) } func (m *gateMetrics) SetVersion(ver string) { if !m.isEnabled() { return } m.provider.SetVersion(ver) } func (m *gateMetrics) Shutdown() { m.mu.Lock() if m.enabled { m.provider.SetHealth(metrics.HealthStatusShuttingDown) m.enabled = false } m.provider.Unregister() m.mu.Unlock() } func (m *gateMetrics) MarkHealthy(endpoint string) { if !m.isEnabled() { return } m.provider.MarkHealthy(endpoint) } func (m *gateMetrics) MarkUnhealthy(endpoint string) { if !m.isEnabled() { return } m.provider.MarkUnhealthy(endpoint) } func remove(list []string, element string) []string { for i, item := range list { if item == element { return append(list[:i], list[i+1:]...) } } return list } func getFrostFSKey(cfg *viper.Viper, log *zap.Logger) (*keys.PrivateKey, error) { walletPath := cfg.GetString(cfgWalletPath) if len(walletPath) == 0 { log.Info(logs.NoWalletPathSpecifiedCreatingEphemeralKeyAutomaticallyForThisRun) key, err := keys.NewPrivateKey() if err != nil { return nil, err } return key, nil } w, err := wallet.NewWalletFromFile(walletPath) if err != nil { return nil, err } var password *string if cfg.IsSet(cfgWalletPassphrase) { pwd := cfg.GetString(cfgWalletPassphrase) password = &pwd } address := cfg.GetString(cfgWalletAddress) return getKeyFromWallet(w, address, password) } func getKeyFromWallet(w *wallet.Wallet, addrStr string, password *string) (*keys.PrivateKey, error) { var addr util.Uint160 var err error if addrStr == "" { addr = w.GetChangeAddress() } else { addr, err = flags.ParseAddress(addrStr) if err != nil { return nil, fmt.Errorf("invalid address") } } acc := w.GetAccount(addr) if acc == nil { return nil, fmt.Errorf("couldn't find wallet account for %s", addrStr) } if password == nil { pwd, err := input.ReadPassword("Enter password > ") if err != nil { return nil, fmt.Errorf("couldn't read password") } password = &pwd } if err := acc.Decrypt(*password, w.Scrypt); err != nil { return nil, fmt.Errorf("couldn't decrypt account: %w", err) } return acc.PrivateKey(), nil } func (a *app) Wait() { a.log.Info(logs.StartingApplication, zap.String("app_name", "frostfs-http-gw"), zap.String("version", Version)) a.metrics.SetVersion(Version) a.setHealthStatus() <-a.webDone // wait for web-server to be stopped } func (a *app) setHealthStatus() { a.metrics.SetHealth(metrics.HealthStatusReady) } func (a *app) Serve() { workerPool := a.initWorkerPool() defer func() { workerPool.Release() close(a.webDone) }() handler := handler.New(a.AppParams(), a.settings, tree.NewTree(frostfs.NewPoolWrapper(a.treePool)), workerPool) // Configure router. a.configureRouter(handler) a.startServices() a.initServers(a.ctx) servs := a.getServers() for i := range servs { go func(i int) { a.log.Info(logs.StartingServer, zap.String("address", servs[i].Address())) if err := a.webServer.Serve(servs[i].Listener()); err != nil && err != http.ErrServerClosed { a.metrics.MarkUnhealthy(servs[i].Address()) a.log.Fatal(logs.ListenAndServe, zap.Error(err)) } }(i) } if len(a.unbindServers) != 0 { a.scheduleReconnect(a.ctx, a.webServer) } sigs := make(chan os.Signal, 1) signal.Notify(sigs, syscall.SIGHUP) LOOP: for { select { case <-a.ctx.Done(): break LOOP case <-sigs: a.configReload(a.ctx) } } a.log.Info(logs.ShuttingDownWebServer, zap.Error(a.webServer.Shutdown())) a.metrics.Shutdown() a.stopServices() a.shutdownTracing() } func (a *app) initWorkerPool() *ants.Pool { workerPool, err := ants.NewPool(a.settings.workerPoolSize) if err != nil { a.log.Fatal(logs.FailedToCreateWorkerPool, zap.Error(err)) } return workerPool } func (a *app) shutdownTracing() { const tracingShutdownTimeout = 5 * time.Second shdnCtx, cancel := context.WithTimeout(context.Background(), tracingShutdownTimeout) defer cancel() if err := tracing.Shutdown(shdnCtx); err != nil { a.log.Warn(logs.FailedToShutdownTracing, zap.Error(err)) } } func (a *app) configReload(ctx context.Context) { a.log.Info(logs.SIGHUPConfigReloadStarted) if !a.cfg.IsSet(cmdConfig) && !a.cfg.IsSet(cmdConfigDir) { a.log.Warn(logs.FailedToReloadConfigBecauseItsMissed) return } if err := readInConfig(a.cfg); err != nil { a.log.Warn(logs.FailedToReloadConfig, zap.Error(err)) return } if lvl, err := getLogLevel(a.cfg); err != nil { a.log.Warn(logs.LogLevelWontBeUpdated, zap.Error(err)) } else { a.logLevel.SetLevel(lvl) } if err := a.settings.dialerSource.Update(fetchMultinetConfig(a.cfg, a.log)); err != nil { a.log.Warn(logs.MultinetConfigWontBeUpdated, zap.Error(err)) } if err := a.resolver.UpdateResolvers(a.getResolverConfig()); err != nil { a.log.Warn(logs.FailedToUpdateResolvers, zap.Error(err)) } if err := a.updateServers(); err != nil { a.log.Warn(logs.FailedToReloadServerParameters, zap.Error(err)) } a.setRuntimeParameters() a.stopServices() a.startServices() a.settings.update(a.cfg, a.log) a.metrics.SetEnabled(a.cfg.GetBool(cfgPrometheusEnabled)) a.initTracing(ctx) a.setHealthStatus() a.log.Info(logs.SIGHUPConfigReloadCompleted) } func (a *app) startServices() { pprofConfig := metrics.Config{Enabled: a.cfg.GetBool(cfgPprofEnabled), Address: a.cfg.GetString(cfgPprofAddress)} pprofService := metrics.NewPprofService(a.log, pprofConfig) a.services = append(a.services, pprofService) go pprofService.Start() prometheusConfig := metrics.Config{Enabled: a.cfg.GetBool(cfgPrometheusEnabled), Address: a.cfg.GetString(cfgPrometheusAddress)} prometheusService := metrics.NewPrometheusService(a.log, prometheusConfig) a.services = append(a.services, prometheusService) go prometheusService.Start() } func (a *app) stopServices() { ctx, cancel := context.WithTimeout(context.Background(), defaultShutdownTimeout) defer cancel() for _, svc := range a.services { svc.ShutDown(ctx) } } func (a *app) configureRouter(handler *handler.Handler) { r := router.New() r.RedirectTrailingSlash = true r.NotFound = func(r *fasthttp.RequestCtx) { response.Error(r, "Not found", fasthttp.StatusNotFound) } r.MethodNotAllowed = func(r *fasthttp.RequestCtx) { response.Error(r, "Method Not Allowed", fasthttp.StatusMethodNotAllowed) } r.POST("/upload/{cid}", a.addMiddlewares(handler.Upload)) r.OPTIONS("/upload/{cid}", a.addPreflight()) a.log.Info(logs.AddedPathUploadCid) r.GET("/get/{cid}/{oid:*}", a.addMiddlewares(handler.DownloadByAddressOrBucketName)) r.HEAD("/get/{cid}/{oid:*}", a.addMiddlewares(handler.HeadByAddressOrBucketName)) r.OPTIONS("/get/{cid}/{oid:*}", a.addPreflight()) a.log.Info(logs.AddedPathGetCidOid) r.GET("/get_by_attribute/{cid}/{attr_key}/{attr_val:*}", a.addMiddlewares(handler.DownloadByAttribute)) r.HEAD("/get_by_attribute/{cid}/{attr_key}/{attr_val:*}", a.addMiddlewares(handler.HeadByAttribute)) r.OPTIONS("/get_by_attribute/{cid}/{attr_key}/{attr_val:*}", a.addPreflight()) a.log.Info(logs.AddedPathGetByAttributeCidAttrKeyAttrVal) r.GET("/zip/{cid}/{prefix:*}", a.addMiddlewares(handler.DownloadZipped)) r.GET("/tar/{cid}/{prefix:*}", a.addMiddlewares(handler.DownloadTar)) r.OPTIONS("/zip/{cid}/{prefix:*}", a.addPreflight()) a.log.Info(logs.AddedPathZipCidPrefix) a.webServer.Handler = r.Handler } func (a *app) addMiddlewares(h fasthttp.RequestHandler) fasthttp.RequestHandler { list := []func(fasthttp.RequestHandler) fasthttp.RequestHandler{ a.tracer, a.logger, a.canonicalizer, a.tokenizer, a.reqNamespace, a.cors, } for i := len(list) - 1; i >= 0; i-- { h = list[i](h) } return h } func (a *app) addPreflight() fasthttp.RequestHandler { list := []func(fasthttp.RequestHandler) fasthttp.RequestHandler{ a.tracer, a.logger, a.reqNamespace, } h := a.preflightHandler for i := len(list) - 1; i >= 0; i-- { h = list[i](h) } return h } func (a *app) preflightHandler(c *fasthttp.RequestCtx) { cors := a.settings.CORS() setCORSHeaders(c, cors) } func (a *app) cors(h fasthttp.RequestHandler) fasthttp.RequestHandler { return func(c *fasthttp.RequestCtx) { h(c) code := c.Response.StatusCode() if code >= fasthttp.StatusOK && code < fasthttp.StatusMultipleChoices { cors := a.settings.CORS() setCORSHeaders(c, cors) } } } func setCORSHeaders(c *fasthttp.RequestCtx, cors CORS) { c.Response.Header.Set(fasthttp.HeaderAccessControlMaxAge, strconv.Itoa(cors.MaxAge)) if len(cors.AllowOrigin) != 0 { c.Response.Header.Set(fasthttp.HeaderAccessControlAllowOrigin, cors.AllowOrigin) } if len(cors.AllowMethods) != 0 { c.Response.Header.Set(fasthttp.HeaderAccessControlAllowMethods, strings.Join(cors.AllowMethods, ",")) } if len(cors.AllowHeaders) != 0 { c.Response.Header.Set(fasthttp.HeaderAccessControlAllowHeaders, strings.Join(cors.AllowHeaders, ",")) } if len(cors.ExposeHeaders) != 0 { c.Response.Header.Set(fasthttp.HeaderAccessControlExposeHeaders, strings.Join(cors.ExposeHeaders, ",")) } if cors.AllowCredentials { c.Response.Header.Set(fasthttp.HeaderAccessControlAllowCredentials, "true") } } func (a *app) logger(h fasthttp.RequestHandler) fasthttp.RequestHandler { return func(req *fasthttp.RequestCtx) { requiredFields := []zap.Field{zap.Uint64("id", req.ID())} reqCtx := utils.GetContextFromRequest(req) if traceID := trace.SpanFromContext(reqCtx).SpanContext().TraceID(); traceID.IsValid() { requiredFields = append(requiredFields, zap.String("trace_id", traceID.String())) } log := a.log.With(requiredFields...) reqCtx = utils.SetReqLog(reqCtx, log) utils.SetContextToRequest(reqCtx, req) fields := []zap.Field{ zap.String("remote", req.RemoteAddr().String()), zap.ByteString("method", req.Method()), zap.ByteString("path", req.Path()), zap.ByteString("query", req.QueryArgs().QueryString()), } log.Info(logs.Request, fields...) h(req) } } func (a *app) canonicalizer(h fasthttp.RequestHandler) fasthttp.RequestHandler { return func(req *fasthttp.RequestCtx) { // regardless of DisableHeaderNamesNormalizing setting, some headers // MUST be normalized in order to process execution. They are normalized // here. toAddKeys := make([][]byte, 0, 10) toAddValues := make([][]byte, 0, 10) prefix := []byte(utils.UserAttributeHeaderPrefix) req.Request.Header.VisitAll(func(k, v []byte) { if bytes.HasPrefix(k, prefix) { return } toAddKeys = append(toAddKeys, k) toAddValues = append(toAddValues, v) }) // this is safe to do after all headers were read into header structure req.Request.Header.EnableNormalizing() for i := range toAddKeys { req.Request.Header.SetBytesKV(toAddKeys[i], toAddValues[i]) } // return normalization setting back req.Request.Header.DisableNormalizing() h(req) } } func (a *app) tokenizer(h fasthttp.RequestHandler) fasthttp.RequestHandler { return func(req *fasthttp.RequestCtx) { reqCtx := utils.GetContextFromRequest(req) appCtx, err := tokens.StoreBearerTokenAppCtx(reqCtx, req) if err != nil { log := utils.GetReqLogOrDefault(reqCtx, a.log) log.Error(logs.CouldNotFetchAndStoreBearerToken, zap.Error(err)) response.Error(req, "could not fetch and store bearer token: "+err.Error(), fasthttp.StatusBadRequest) return } utils.SetContextToRequest(appCtx, req) h(req) } } func (a *app) tracer(h fasthttp.RequestHandler) fasthttp.RequestHandler { return func(req *fasthttp.RequestCtx) { appCtx, span := utils.StartHTTPServerSpan(a.ctx, req, "REQUEST") defer func() { utils.SetHTTPTraceInfo(appCtx, span, req) span.End() }() appCtx = treepool.SetRequestID(appCtx, strconv.FormatUint(req.ID(), 10)) utils.SetContextToRequest(appCtx, req) h(req) } } func (a *app) reqNamespace(h fasthttp.RequestHandler) fasthttp.RequestHandler { return func(req *fasthttp.RequestCtx) { appCtx := utils.GetContextFromRequest(req) nsBytes := req.Request.Header.Peek(a.settings.NamespaceHeader()) appCtx = middleware.SetNamespace(appCtx, string(nsBytes)) utils.SetContextToRequest(appCtx, req) h(req) } } func (a *app) AppParams() *handler.AppParams { return &handler.AppParams{ Logger: a.log, FrostFS: frostfs.NewFrostFS(a.pool), Owner: a.owner, Resolver: a.resolver, Cache: cache.NewBucketCache(getCacheOptions(a.cfg, a.log)), } } func (a *app) initServers(ctx context.Context) { serversInfo := fetchServers(a.cfg, a.log) a.servers = make([]Server, 0, len(serversInfo)) for _, serverInfo := range serversInfo { fields := []zap.Field{ zap.String("address", serverInfo.Address), zap.Bool("tls enabled", serverInfo.TLS.Enabled), zap.String("tls cert", serverInfo.TLS.CertFile), zap.String("tls key", serverInfo.TLS.KeyFile), } srv, err := newServer(ctx, serverInfo) if err != nil { a.unbindServers = append(a.unbindServers, serverInfo) a.metrics.MarkUnhealthy(serverInfo.Address) a.log.Warn(logs.FailedToAddServer, append(fields, zap.Error(err))...) continue } a.metrics.MarkHealthy(serverInfo.Address) a.servers = append(a.servers, srv) a.log.Info(logs.AddServer, fields...) } if len(a.servers) == 0 { a.log.Fatal(logs.NoHealthyServers) } } func (a *app) updateServers() error { serversInfo := fetchServers(a.cfg, a.log) a.mu.Lock() defer a.mu.Unlock() var found bool for _, serverInfo := range serversInfo { ser := a.getServer(serverInfo.Address) if ser != nil { if serverInfo.TLS.Enabled { if err := ser.UpdateCert(serverInfo.TLS.CertFile, serverInfo.TLS.KeyFile); err != nil { return fmt.Errorf("failed to update tls certs: %w", err) } found = true } } else if unbind := a.updateUnbindServerInfo(serverInfo); unbind { found = true } } if !found { return fmt.Errorf("invalid servers configuration: no known server found") } return nil } func (a *app) getServers() []Server { a.mu.RLock() defer a.mu.RUnlock() return a.servers } func (a *app) getServer(address string) Server { for i := range a.servers { if a.servers[i].Address() == address { return a.servers[i] } } return nil } func (a *app) updateUnbindServerInfo(info ServerInfo) bool { for i := range a.unbindServers { if a.unbindServers[i].Address == info.Address { a.unbindServers[i] = info return true } } return false } func (a *app) initTracing(ctx context.Context) { instanceID := "" if len(a.servers) > 0 { instanceID = a.servers[0].Address() } cfg := tracing.Config{ Enabled: a.cfg.GetBool(cfgTracingEnabled), Exporter: tracing.Exporter(a.cfg.GetString(cfgTracingExporter)), Endpoint: a.cfg.GetString(cfgTracingEndpoint), Service: "frostfs-http-gw", InstanceID: instanceID, Version: Version, } if trustedCa := a.cfg.GetString(cfgTracingTrustedCa); trustedCa != "" { caBytes, err := os.ReadFile(trustedCa) if err != nil { a.log.Warn(logs.FailedToInitializeTracing, zap.Error(err)) return } certPool := x509.NewCertPool() ok := certPool.AppendCertsFromPEM(caBytes) if !ok { a.log.Warn(logs.FailedToInitializeTracing, zap.String("error", "can't fill cert pool by ca cert")) return } cfg.ServerCaCertPool = certPool } attributes, err := fetchTracingAttributes(a.cfg) if err != nil { a.log.Warn(logs.FailedToInitializeTracing, zap.Error(err)) return } cfg.Attributes = attributes updated, err := tracing.Setup(ctx, cfg) if err != nil { a.log.Warn(logs.FailedToInitializeTracing, zap.Error(err)) } if updated { a.log.Info(logs.TracingConfigUpdated) } } func (a *app) setRuntimeParameters() { if len(os.Getenv("GOMEMLIMIT")) != 0 { // default limit < yaml limit < app env limit < GOMEMLIMIT a.log.Warn(logs.RuntimeSoftMemoryDefinedWithGOMEMLIMIT) return } softMemoryLimit := fetchSoftMemoryLimit(a.cfg) previous := debug.SetMemoryLimit(softMemoryLimit) if softMemoryLimit != previous { a.log.Info(logs.RuntimeSoftMemoryLimitUpdated, zap.Int64("new_value", softMemoryLimit), zap.Int64("old_value", previous)) } } func (a *app) scheduleReconnect(ctx context.Context, srv *fasthttp.Server) { go func() { t := time.NewTicker(a.settings.reconnectInterval) defer t.Stop() for { select { case <-t.C: if a.tryReconnect(ctx, srv) { return } t.Reset(a.settings.reconnectInterval) case <-ctx.Done(): return } } }() } func (a *app) tryReconnect(ctx context.Context, sr *fasthttp.Server) bool { a.mu.Lock() defer a.mu.Unlock() a.log.Info(logs.ServerReconnecting) var failedServers []ServerInfo for _, serverInfo := range a.unbindServers { fields := []zap.Field{ zap.String("address", serverInfo.Address), zap.Bool("tls enabled", serverInfo.TLS.Enabled), zap.String("tls cert", serverInfo.TLS.CertFile), zap.String("tls key", serverInfo.TLS.KeyFile), } srv, err := newServer(ctx, serverInfo) if err != nil { a.log.Warn(logs.ServerReconnectFailed, zap.Error(err)) failedServers = append(failedServers, serverInfo) a.metrics.MarkUnhealthy(serverInfo.Address) continue } go func() { a.log.Info(logs.StartingServer, zap.String("address", srv.Address())) a.metrics.MarkHealthy(serverInfo.Address) if err = sr.Serve(srv.Listener()); err != nil && !errors.Is(err, http.ErrServerClosed) { a.log.Warn(logs.ListenAndServe, zap.Error(err)) a.metrics.MarkUnhealthy(serverInfo.Address) } }() a.servers = append(a.servers, srv) a.log.Info(logs.ServerReconnectedSuccessfully, fields...) } a.unbindServers = failedServers return len(a.unbindServers) == 0 }