package main import ( "context" "encoding/hex" "encoding/json" "fmt" "net/http" "os" "os/signal" "sync" "sync/atomic" "syscall" "time" "git.frostfs.info/TrueCloudLab/frostfs-observability/tracing" grpctracing "git.frostfs.info/TrueCloudLab/frostfs-observability/tracing/grpc" "git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api" "git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/auth" "git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/cache" "git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/handler" "git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/layer" "git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/notifications" "git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/resolver" "git.frostfs.info/TrueCloudLab/frostfs-s3-gw/internal/frostfs" "git.frostfs.info/TrueCloudLab/frostfs-s3-gw/internal/frostfs/services" "git.frostfs.info/TrueCloudLab/frostfs-s3-gw/internal/version" "git.frostfs.info/TrueCloudLab/frostfs-s3-gw/internal/wallet" "git.frostfs.info/TrueCloudLab/frostfs-s3-gw/internal/xml" "git.frostfs.info/TrueCloudLab/frostfs-s3-gw/metrics" "git.frostfs.info/TrueCloudLab/frostfs-s3-gw/pkg/service/tree" "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap" "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/pool" treepool "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/pool/tree" "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/user" "github.com/go-chi/chi/v5" "github.com/go-chi/chi/v5/middleware" "github.com/nspcc-dev/neo-go/pkg/crypto/keys" "github.com/spf13/viper" "go.uber.org/zap" "google.golang.org/grpc" ) type ( // App is the main application structure. App struct { ctr auth.Center log *zap.Logger cfg *viper.Viper pool *pool.Pool treePool *treepool.Pool key *keys.PrivateKey nc *notifications.Controller obj layer.Client api api.Handler servers []Server metrics *metrics.AppMetrics bucketResolver *resolver.BucketResolver services []*Service settings *appSettings webDone chan struct{} wrkDone chan struct{} } appSettings struct { logLevel zap.AtomicLevel policies *placementPolicy xmlDecoder *xml.DecoderProvider maxClient maxClientsConfig bypassContentEncodingInChunks atomic.Bool clientCut atomic.Bool } maxClientsConfig struct { deadline time.Duration count int } Logger struct { logger *zap.Logger lvl zap.AtomicLevel } placementPolicy struct { mu sync.RWMutex defaultPolicy netmap.PlacementPolicy regionMap map[string]netmap.PlacementPolicy copiesNumbers map[string][]uint32 defaultCopiesNumbers []uint32 } ) func newApp(ctx context.Context, log *Logger, v *viper.Viper) *App { objPool, treePool, key := getPools(ctx, log.logger, v) // prepare auth center ctr := auth.New(frostfs.NewAuthmateFrostFS(objPool, key), key, v.GetStringSlice(cfgAllowedAccessKeyIDPrefixes), getAccessBoxCacheConfig(v, log.logger)) app := &App{ ctr: ctr, log: log.logger, cfg: v, pool: objPool, treePool: treePool, key: key, webDone: make(chan struct{}, 1), wrkDone: make(chan struct{}, 1), settings: newAppSettings(log, v), } app.init(ctx) return app } func (a *App) init(ctx context.Context) { a.initAPI(ctx) a.initMetrics() a.initServers(ctx) a.initTracing(ctx) } func (a *App) initLayer(ctx context.Context) { a.initResolver() // prepare random key for anonymous requests randomKey, err := keys.NewPrivateKey() if err != nil { a.log.Fatal("couldn't generate random key", zap.Error(err)) } var gateOwner user.ID user.IDFromKey(&gateOwner, a.key.PrivateKey.PublicKey) layerCfg := &layer.Config{ Caches: getCacheOptions(a.cfg, a.log), AnonKey: layer.AnonymousKey{ Key: randomKey, }, GateOwner: gateOwner, Resolver: a.bucketResolver, TreeService: tree.NewTree(services.NewPoolWrapper(a.treePool), a.log), Features: a.settings, } // prepare object layer a.obj = layer.NewLayer(a.log, frostfs.NewFrostFS(a.pool, a.key), layerCfg) if a.cfg.GetBool(cfgEnableNATS) { nopts := getNotificationsOptions(a.cfg, a.log) a.nc, err = notifications.NewController(nopts, a.log) if err != nil { a.log.Fatal("failed to enable notifications", zap.Error(err)) } if err = a.obj.Initialize(ctx, a.nc); err != nil { a.log.Fatal("couldn't initialize layer", zap.Error(err)) } } } func newAppSettings(log *Logger, v *viper.Viper) *appSettings { policies, err := newPlacementPolicy(log.logger, v) if err != nil { log.logger.Fatal("failed to create new policy mapping", zap.Error(err)) } settings := &appSettings{ logLevel: log.lvl, policies: policies, xmlDecoder: xml.NewDecoderProvider(v.GetBool(cfgKludgeUseDefaultXMLNSForCompleteMultipartUpload)), maxClient: newMaxClients(v), } settings.setBypassContentEncodingInChunks(v.GetBool(cfgKludgeBypassContentEncodingCheckInChunks)) settings.setClientCut(v.GetBool(cfgClientCut)) return settings } func (s *appSettings) BypassContentEncodingInChunks() bool { return s.bypassContentEncodingInChunks.Load() } func (s *appSettings) setBypassContentEncodingInChunks(bypass bool) { s.bypassContentEncodingInChunks.Store(bypass) } func (s *appSettings) ClientCut() bool { return s.clientCut.Load() } func (s *appSettings) setClientCut(clientCut bool) { s.clientCut.Store(clientCut) } func (a *App) initAPI(ctx context.Context) { a.initLayer(ctx) a.initHandler() } func (a *App) initMetrics() { a.metrics = metrics.NewAppMetrics(a.log, frostfs.NewPoolStatistic(a.pool), a.cfg.GetBool(cfgPrometheusEnabled)) a.metrics.State().SetHealth(metrics.HealthStatusStarting) } func (a *App) initResolver() { var err error a.bucketResolver, err = resolver.NewBucketResolver(a.getResolverConfig()) if err != nil { a.log.Fatal("failed to create resolver", zap.Error(err)) } } func (a *App) getResolverConfig() ([]string, *resolver.Config) { resolveCfg := &resolver.Config{ FrostFS: frostfs.NewResolverFrostFS(a.pool), RPCAddress: a.cfg.GetString(cfgRPCEndpoint), } order := a.cfg.GetStringSlice(cfgResolveOrder) if resolveCfg.RPCAddress == "" { order = remove(order, resolver.NNSResolver) a.log.Warn(fmt.Sprintf("resolver '%s' won't be used since '%s' isn't provided", resolver.NNSResolver, cfgRPCEndpoint)) } if len(order) == 0 { a.log.Info("container resolver will be disabled because of resolvers 'resolver_order' is empty") } return order, resolveCfg } func (a *App) initTracing(ctx context.Context) { instanceID := "" if len(a.servers) > 0 { instanceID = a.servers[0].Address() } cfg := tracing.Config{ Enabled: a.cfg.GetBool(cfgTracingEnabled), Exporter: tracing.Exporter(a.cfg.GetString(cfgTracingExporter)), Endpoint: a.cfg.GetString(cfgTracingEndpoint), Service: "frostfs-s3-gw", InstanceID: instanceID, Version: version.Version, } updated, err := tracing.Setup(ctx, cfg) if err != nil { a.log.Warn("failed to initialize tracing", zap.Error(err)) } if updated { a.log.Info("tracing config updated") } } func (a *App) shutdownTracing() { const tracingShutdownTimeout = 5 * time.Second shdnCtx, cancel := context.WithTimeout(context.Background(), tracingShutdownTimeout) defer cancel() if err := tracing.Shutdown(shdnCtx); err != nil { a.log.Warn("failed to shutdown tracing", zap.Error(err)) } } func newMaxClients(cfg *viper.Viper) maxClientsConfig { config := maxClientsConfig{} config.count = fetchMaxClientsCount(cfg) config.deadline = fetchMaxClientsDeadline(cfg) return config } func getPools(ctx context.Context, logger *zap.Logger, cfg *viper.Viper) (*pool.Pool, *treepool.Pool, *keys.PrivateKey) { var prm pool.InitParameters var prmTree treepool.InitParameters password := wallet.GetPassword(cfg, cfgWalletPassphrase) key, err := wallet.GetKeyFromPath(cfg.GetString(cfgWalletPath), cfg.GetString(cfgWalletAddress), password) if err != nil { logger.Fatal("could not load FrostFS private key", zap.Error(err)) } prm.SetKey(&key.PrivateKey) prmTree.SetKey(key) logger.Info("using credentials", zap.String("FrostFS", hex.EncodeToString(key.PublicKey().Bytes()))) for _, peer := range fetchPeers(logger, cfg) { prm.AddNode(peer) prmTree.AddNode(peer) } connTimeout := fetchConnectTimeout(cfg) prm.SetNodeDialTimeout(connTimeout) prmTree.SetNodeDialTimeout(connTimeout) streamTimeout := fetchStreamTimeout(cfg) prm.SetNodeStreamTimeout(streamTimeout) prmTree.SetNodeStreamTimeout(streamTimeout) healthCheckTimeout := fetchHealthCheckTimeout(cfg) prm.SetHealthcheckTimeout(healthCheckTimeout) prmTree.SetHealthcheckTimeout(healthCheckTimeout) rebalanceInterval := fetchRebalanceInterval(cfg) prm.SetClientRebalanceInterval(rebalanceInterval) prmTree.SetClientRebalanceInterval(rebalanceInterval) errorThreshold := fetchErrorThreshold(cfg) prm.SetErrorThreshold(errorThreshold) prm.SetLogger(logger) prmTree.SetLogger(logger) var apiGRPCDialOpts []grpc.DialOption var treeGRPCDialOpts []grpc.DialOption if cfg.GetBool(cfgTracingEnabled) { interceptors := []grpc.DialOption{ grpc.WithUnaryInterceptor(grpctracing.NewUnaryClientInteceptor()), grpc.WithStreamInterceptor(grpctracing.NewStreamClientInterceptor()), } treeGRPCDialOpts = append(treeGRPCDialOpts, interceptors...) apiGRPCDialOpts = append(apiGRPCDialOpts, interceptors...) } prm.SetGRPCDialOptions(apiGRPCDialOpts...) prmTree.SetGRPCDialOptions(treeGRPCDialOpts...) p, err := pool.NewPool(prm) if err != nil { logger.Fatal("failed to create connection pool", zap.Error(err)) } if err = p.Dial(ctx); err != nil { logger.Fatal("failed to dial connection pool", zap.Error(err)) } treePool, err := treepool.NewPool(prmTree) if err != nil { logger.Fatal("failed to create tree pool", zap.Error(err)) } if err = treePool.Dial(ctx); err != nil { logger.Fatal("failed to dial tree pool", zap.Error(err)) } return p, treePool, key } func newPlacementPolicy(l *zap.Logger, v *viper.Viper) (*placementPolicy, error) { policies := &placementPolicy{ regionMap: make(map[string]netmap.PlacementPolicy), defaultCopiesNumbers: []uint32{handler.DefaultCopiesNumber}, } policies.updateCopiesNumbers(l, v) policies.updateDefaultCopiesNumbers(l, v) return policies, policies.updatePolicy(v) } func (p *placementPolicy) DefaultPlacementPolicy() netmap.PlacementPolicy { p.mu.RLock() defer p.mu.RUnlock() return p.defaultPolicy } func (p *placementPolicy) PlacementPolicy(name string) (netmap.PlacementPolicy, bool) { p.mu.RLock() policy, ok := p.regionMap[name] p.mu.RUnlock() return policy, ok } func (p *placementPolicy) CopiesNumbers(locationConstraint string) ([]uint32, bool) { p.mu.RLock() copiesNumbers, ok := p.copiesNumbers[locationConstraint] p.mu.RUnlock() return copiesNumbers, ok } func (p *placementPolicy) DefaultCopiesNumbers() []uint32 { p.mu.RLock() defer p.mu.RUnlock() return p.defaultCopiesNumbers } func (p *placementPolicy) update(l *zap.Logger, v *viper.Viper) { if err := p.updatePolicy(v); err != nil { l.Warn("policies won't be updated", zap.Error(err)) } p.updateCopiesNumbers(l, v) p.updateDefaultCopiesNumbers(l, v) } func (p *placementPolicy) updatePolicy(v *viper.Viper) error { defaultPlacementPolicy, err := fetchDefaultPolicy(v) if err != nil { return err } regionMap, err := fetchRegionMappingPolicies(v) if err != nil { return err } p.mu.Lock() p.defaultPolicy = defaultPlacementPolicy p.regionMap = regionMap p.mu.Unlock() return nil } func (p *placementPolicy) updateCopiesNumbers(l *zap.Logger, v *viper.Viper) { if newCopiesNumbers, err := fetchCopiesNumbers(l, v); err != nil { l.Warn("copies numbers won't be updated", zap.Error(err)) } else { p.mu.Lock() p.copiesNumbers = newCopiesNumbers p.mu.Unlock() } } func (p *placementPolicy) updateDefaultCopiesNumbers(l *zap.Logger, v *viper.Viper) { configuredValues, err := fetchDefaultCopiesNumbers(v) if err == nil { p.mu.Lock() p.defaultCopiesNumbers = configuredValues p.mu.Unlock() l.Info("default copies numbers", zap.Uint32s("vector", p.defaultCopiesNumbers)) return } l.Error("cannot parse default copies numbers", zap.Error(err)) l.Warn("default copies numbers won't be updated", zap.Uint32s("current value", p.DefaultCopiesNumbers())) } func remove(list []string, element string) []string { for i, item := range list { if item == element { return append(list[:i], list[i+1:]...) } } return list } // Wait waits for an application to finish. // // Pre-logs a message about the launch of the application mentioning its // version (version.Version) and its name (frostfs-s3-gw). At the end, it writes // about the stop to the log. func (a *App) Wait() { a.log.Info("application started", zap.String("name", "frostfs-s3-gw"), zap.String("version", version.Version), ) a.metrics.State().SetVersion(version.Version) a.setHealthStatus() <-a.webDone // wait for web-server to be stopped a.log.Info("application finished") } func (a *App) setHealthStatus() { a.metrics.State().SetHealth(metrics.HealthStatusReady) } // Serve runs HTTP server to handle S3 API requests. func (a *App) Serve(ctx context.Context) { // Attach S3 API: domains := a.cfg.GetStringSlice(cfgListenDomains) a.log.Info("fetch domains, prepare to use API", zap.Strings("domains", domains)) throttleOps := middleware.ThrottleOpts{ Limit: a.settings.maxClient.count, BacklogTimeout: a.settings.maxClient.deadline, } chiRouter := chi.NewRouter() api.AttachChi(chiRouter, domains, throttleOps, a.api, a.ctr, a.log, a.metrics) // Use mux.Router as http.Handler srv := new(http.Server) srv.Handler = chiRouter srv.ErrorLog = zap.NewStdLog(a.log) a.startServices() for i := range a.servers { go func(i int) { a.log.Info("starting server", zap.String("address", a.servers[i].Address())) if err := srv.Serve(a.servers[i].Listener()); err != nil && err != http.ErrServerClosed { a.log.Fatal("listen and serve", zap.Error(err)) } }(i) } sigs := make(chan os.Signal, 1) signal.Notify(sigs, syscall.SIGHUP) LOOP: for { select { case <-ctx.Done(): break LOOP case <-sigs: a.configReload(ctx) } } ctx, cancel := shutdownContext() defer cancel() a.log.Info("stopping server", zap.Error(srv.Shutdown(ctx))) a.metrics.Shutdown() a.stopServices() a.shutdownTracing() close(a.webDone) } func shutdownContext() (context.Context, context.CancelFunc) { return context.WithTimeout(context.Background(), defaultShutdownTimeout) } func (a *App) configReload(ctx context.Context) { a.log.Info("SIGHUP config reload started") if !a.cfg.IsSet(cmdConfig) && !a.cfg.IsSet(cmdConfigDir) { a.log.Warn("failed to reload config because it's missed") return } if err := readInConfig(a.cfg); err != nil { a.log.Warn("failed to reload config", zap.Error(err)) return } if err := a.bucketResolver.UpdateResolvers(a.getResolverConfig()); err != nil { a.log.Warn("failed to reload resolvers", zap.Error(err)) } if err := a.updateServers(); err != nil { a.log.Warn("failed to reload server parameters", zap.Error(err)) } a.stopServices() a.startServices() a.updateSettings() a.metrics.SetEnabled(a.cfg.GetBool(cfgPrometheusEnabled)) a.initTracing(ctx) a.setHealthStatus() a.log.Info("SIGHUP config reload completed") } func (a *App) updateSettings() { if lvl, err := getLogLevel(a.cfg); err != nil { a.log.Warn("log level won't be updated", zap.Error(err)) } else { a.settings.logLevel.SetLevel(lvl) } a.settings.policies.update(a.log, a.cfg) a.settings.xmlDecoder.UseDefaultNamespaceForCompleteMultipart(a.cfg.GetBool(cfgKludgeUseDefaultXMLNSForCompleteMultipartUpload)) a.settings.setBypassContentEncodingInChunks(a.cfg.GetBool(cfgKludgeBypassContentEncodingCheckInChunks)) a.settings.setClientCut(a.cfg.GetBool(cfgClientCut)) } func (a *App) startServices() { a.services = a.services[:0] pprofService := NewPprofService(a.cfg, a.log) a.services = append(a.services, pprofService) go pprofService.Start() prometheusService := NewPrometheusService(a.cfg, a.log, a.metrics.Handler()) a.services = append(a.services, prometheusService) go prometheusService.Start() } func (a *App) initServers(ctx context.Context) { serversInfo := fetchServers(a.cfg) a.servers = make([]Server, 0, len(serversInfo)) for _, serverInfo := range serversInfo { fields := []zap.Field{ zap.String("address", serverInfo.Address), zap.Bool("tls enabled", serverInfo.TLS.Enabled), zap.String("tls cert", serverInfo.TLS.CertFile), zap.String("tls key", serverInfo.TLS.KeyFile), } srv, err := newServer(ctx, serverInfo) if err != nil { a.log.Warn("failed to add server", append(fields, zap.Error(err))...) continue } a.servers = append(a.servers, srv) a.log.Info("add server", fields...) } if len(a.servers) == 0 { a.log.Fatal("no healthy servers") } } func (a *App) updateServers() error { serversInfo := fetchServers(a.cfg) var found bool for _, serverInfo := range serversInfo { index := a.serverIndex(serverInfo.Address) if index == -1 { continue } if serverInfo.TLS.Enabled { if err := a.servers[index].UpdateCert(serverInfo.TLS.CertFile, serverInfo.TLS.KeyFile); err != nil { return fmt.Errorf("failed to update tls certs: %w", err) } } found = true } if !found { return fmt.Errorf("invalid servers configuration: no known server found") } return nil } func (a *App) serverIndex(address string) int { for i := range a.servers { if a.servers[i].Address() == address { return i } } return -1 } func (a *App) stopServices() { ctx, cancel := shutdownContext() defer cancel() for _, svc := range a.services { svc.ShutDown(ctx) } } func getNotificationsOptions(v *viper.Viper, l *zap.Logger) *notifications.Options { cfg := notifications.Options{} cfg.URL = v.GetString(cfgNATSEndpoint) cfg.Timeout = fetchNATSTimeout(v, l) cfg.TLSCertFilepath = v.GetString(cfgNATSTLSCertFile) cfg.TLSAuthPrivateKeyFilePath = v.GetString(cfgNATSAuthPrivateKeyFile) cfg.RootCAFiles = v.GetStringSlice(cfgNATSRootCAFiles) return &cfg } func getCacheOptions(v *viper.Viper, l *zap.Logger) *layer.CachesConfig { cacheCfg := layer.DefaultCachesConfigs(l) cacheCfg.Objects.Lifetime = fetchCacheLifetime(v, l, cfgObjectsCacheLifetime, cacheCfg.Objects.Lifetime) cacheCfg.Objects.Size = fetchCacheSize(v, l, cfgObjectsCacheSize, cacheCfg.Objects.Size) cacheCfg.ObjectsList.Lifetime = fetchCacheLifetime(v, l, cfgListObjectsCacheLifetime, cacheCfg.ObjectsList.Lifetime) cacheCfg.ObjectsList.Size = fetchCacheSize(v, l, cfgListObjectsCacheSize, cacheCfg.ObjectsList.Size) cacheCfg.Buckets.Lifetime = fetchCacheLifetime(v, l, cfgBucketsCacheLifetime, cacheCfg.Buckets.Lifetime) cacheCfg.Buckets.Size = fetchCacheSize(v, l, cfgBucketsCacheSize, cacheCfg.Buckets.Size) cacheCfg.Names.Lifetime = fetchCacheLifetime(v, l, cfgNamesCacheLifetime, cacheCfg.Names.Lifetime) cacheCfg.Names.Size = fetchCacheSize(v, l, cfgNamesCacheSize, cacheCfg.Names.Size) cacheCfg.System.Lifetime = fetchCacheLifetime(v, l, cfgSystemCacheLifetime, cacheCfg.System.Lifetime) cacheCfg.System.Size = fetchCacheSize(v, l, cfgSystemCacheSize, cacheCfg.System.Size) cacheCfg.AccessControl.Lifetime = fetchCacheLifetime(v, l, cfgAccessControlCacheLifetime, cacheCfg.AccessControl.Lifetime) cacheCfg.AccessControl.Size = fetchCacheSize(v, l, cfgAccessControlCacheSize, cacheCfg.AccessControl.Size) return cacheCfg } func getAccessBoxCacheConfig(v *viper.Viper, l *zap.Logger) *cache.Config { cacheCfg := cache.DefaultAccessBoxConfig(l) cacheCfg.Lifetime = fetchCacheLifetime(v, l, cfgAccessBoxCacheLifetime, cacheCfg.Lifetime) cacheCfg.Size = fetchCacheSize(v, l, cfgAccessBoxCacheSize, cacheCfg.Size) return cacheCfg } func (a *App) initHandler() { cfg := &handler.Config{ Policy: a.settings.policies, DefaultMaxAge: fetchDefaultMaxAge(a.cfg, a.log), NotificatorEnabled: a.cfg.GetBool(cfgEnableNATS), XMLDecoder: a.settings.xmlDecoder, } cfg.ResolveZoneList = a.cfg.GetStringSlice(cfgResolveBucketAllow) cfg.IsResolveListAllow = len(cfg.ResolveZoneList) > 0 if !cfg.IsResolveListAllow { cfg.ResolveZoneList = a.cfg.GetStringSlice(cfgResolveBucketDeny) } cfg.CompleteMultipartKeepalive = a.cfg.GetDuration(cfgKludgeCompleteMultipartUploadKeepalive) cfg.Kludge = a.settings var err error a.api, err = handler.New(a.log, a.obj, a.nc, cfg) if err != nil { a.log.Fatal("could not initialize API handler", zap.Error(err)) } } func readRegionMap(filePath string) (map[string]string, error) { regionMap := make(map[string]string) if filePath == "" { return regionMap, nil } data, err := os.ReadFile(filePath) if err != nil { return nil, fmt.Errorf("coudln't read file '%s'", filePath) } if err = json.Unmarshal(data, ®ionMap); err != nil { return nil, fmt.Errorf("unmarshal policies: %w", err) } if _, ok := regionMap[api.DefaultLocationConstraint]; ok { return nil, fmt.Errorf("config overrides %s location constraint", api.DefaultLocationConstraint) } return regionMap, nil }