feature/fix_sighup #199

Merged
alexvanin merged 4 commits from mbiryukova/frostfs-http-gw:feature/fix_sighup into master 2025-01-29 13:19:14 +00:00
7 changed files with 280 additions and 139 deletions

View file

@ -7,6 +7,11 @@ This document outlines major changes between releases.
### Added ### Added
- Add handling quota limit reached error (#187) - Add handling quota limit reached error (#187)
## [0.32.1] - 2025-01-27
### Fixed
- SIGHUP panic (#198)
## [0.32.0] - Khumbu - 2024-12-20 ## [0.32.0] - Khumbu - 2024-12-20
### Fixed ### Fixed
@ -187,4 +192,5 @@ To see CHANGELOG for older versions, refer to https://github.com/nspcc-dev/neofs
[0.30.3]: https://git.frostfs.info/TrueCloudLab/frostfs-http-gw/compare/v0.30.2...v0.30.3 [0.30.3]: https://git.frostfs.info/TrueCloudLab/frostfs-http-gw/compare/v0.30.2...v0.30.3
[0.31.0]: https://git.frostfs.info/TrueCloudLab/frostfs-http-gw/compare/v0.30.3...v0.31.0 [0.31.0]: https://git.frostfs.info/TrueCloudLab/frostfs-http-gw/compare/v0.30.3...v0.31.0
[0.32.0]: https://git.frostfs.info/TrueCloudLab/frostfs-http-gw/compare/v0.31.0...v0.32.0 [0.32.0]: https://git.frostfs.info/TrueCloudLab/frostfs-http-gw/compare/v0.31.0...v0.32.0
[Unreleased]: https://git.frostfs.info/TrueCloudLab/frostfs-http-gw/compare/v0.32.0...master [0.32.1]: https://git.frostfs.info/TrueCloudLab/frostfs-http-gw/compare/v0.32.0...v0.32.1
[Unreleased]: https://git.frostfs.info/TrueCloudLab/frostfs-http-gw/compare/v0.32.1...master

View file

@ -1 +1 @@
v0.32.0 v0.32.1

View file

@ -56,7 +56,7 @@ type (
treePool *treepool.Pool treePool *treepool.Pool
key *keys.PrivateKey key *keys.PrivateKey
owner *user.ID owner *user.ID
cfg *viper.Viper cfg *appCfg
webServer *fasthttp.Server webServer *fasthttp.Server
webDone chan struct{} webDone chan struct{}
resolver *resolver.ContainerResolver resolver *resolver.ContainerResolver
@ -123,34 +123,35 @@ type (
} }
) )
func newApp(ctx context.Context, v *viper.Viper) App { func newApp(ctx context.Context, cfg *appCfg) App {
logSettings := &loggerSettings{} logSettings := &loggerSettings{}
log := pickLogger(v, logSettings) log := pickLogger(cfg.config(), logSettings)
a := &app{ a := &app{
ctx: ctx, ctx: ctx,
log: log.logger, log: log.logger,
cfg: v, logLevel: log.lvl,
cfg: cfg,
loggerSettings: logSettings, loggerSettings: logSettings,
webServer: new(fasthttp.Server), webServer: new(fasthttp.Server),
webDone: make(chan struct{}), webDone: make(chan struct{}),
bucketCache: cache.NewBucketCache(getBucketCacheOptions(v, log.logger), v.GetBool(cfgFeaturesTreePoolNetmapSupport)), bucketCache: cache.NewBucketCache(getBucketCacheOptions(cfg.config(), log.logger), cfg.config().GetBool(cfgFeaturesTreePoolNetmapSupport)),
} }
a.initAppSettings() a.initAppSettings()
// -- setup FastHTTP server -- // -- setup FastHTTP server --
a.webServer.Name = "frost-http-gw" a.webServer.Name = "frost-http-gw"
a.webServer.ReadBufferSize = a.cfg.GetInt(cfgWebReadBufferSize) a.webServer.ReadBufferSize = a.config().GetInt(cfgWebReadBufferSize)
a.webServer.WriteBufferSize = a.cfg.GetInt(cfgWebWriteBufferSize) a.webServer.WriteBufferSize = a.config().GetInt(cfgWebWriteBufferSize)
a.webServer.ReadTimeout = a.cfg.GetDuration(cfgWebReadTimeout) a.webServer.ReadTimeout = a.config().GetDuration(cfgWebReadTimeout)
a.webServer.WriteTimeout = a.cfg.GetDuration(cfgWebWriteTimeout) a.webServer.WriteTimeout = a.config().GetDuration(cfgWebWriteTimeout)
a.webServer.DisableHeaderNamesNormalizing = true a.webServer.DisableHeaderNamesNormalizing = true
a.webServer.NoDefaultServerHeader = true a.webServer.NoDefaultServerHeader = true
a.webServer.NoDefaultContentType = true a.webServer.NoDefaultContentType = true
a.webServer.MaxRequestBodySize = a.cfg.GetInt(cfgWebMaxRequestBodySize) a.webServer.MaxRequestBodySize = a.config().GetInt(cfgWebMaxRequestBodySize)
a.webServer.DisablePreParseMultipartForm = true a.webServer.DisablePreParseMultipartForm = true
a.webServer.StreamRequestBody = a.cfg.GetBool(cfgWebStreamRequestBody) a.webServer.StreamRequestBody = a.config().GetBool(cfgWebStreamRequestBody)
// -- -- -- -- -- -- -- -- -- -- -- -- -- -- // -- -- -- -- -- -- -- -- -- -- -- -- -- --
a.initPools(ctx) a.initPools(ctx)
@ -167,13 +168,17 @@ func newApp(ctx context.Context, v *viper.Viper) App {
return a return a
} }
func (a *app) config() *viper.Viper {
return a.cfg.config()
}
func (a *app) initAppSettings() { func (a *app) initAppSettings() {
a.settings = &appSettings{ a.settings = &appSettings{
reconnectInterval: fetchReconnectInterval(a.cfg), reconnectInterval: fetchReconnectInterval(a.config()),
dialerSource: getDialerSource(a.log, a.cfg), dialerSource: getDialerSource(a.log, a.config()),
workerPoolSize: a.cfg.GetInt(cfgWorkerPoolSize), workerPoolSize: a.config().GetInt(cfgWorkerPoolSize),
} }
a.settings.update(a.cfg, a.log) a.settings.update(a.config(), a.log)
} }
func (s *appSettings) update(v *viper.Viper, l *zap.Logger) { func (s *appSettings) update(v *viper.Viper, l *zap.Logger) {
@ -326,11 +331,11 @@ func (a *app) initResolver() {
func (a *app) getResolverConfig() ([]string, *resolver.Config) { func (a *app) getResolverConfig() ([]string, *resolver.Config) {
resolveCfg := &resolver.Config{ resolveCfg := &resolver.Config{
FrostFS: frostfs.NewResolverFrostFS(a.pool), FrostFS: frostfs.NewResolverFrostFS(a.pool),
RPCAddress: a.cfg.GetString(cfgRPCEndpoint), RPCAddress: a.config().GetString(cfgRPCEndpoint),
Settings: a.settings, Settings: a.settings,
} }
order := a.cfg.GetStringSlice(cfgResolveOrder) order := a.config().GetStringSlice(cfgResolveOrder)
if resolveCfg.RPCAddress == "" { if resolveCfg.RPCAddress == "" {
order = remove(order, resolver.NNSResolver) order = remove(order, resolver.NNSResolver)
a.log.Warn(logs.ResolverNNSWontBeUsedSinceRPCEndpointIsntProvided) a.log.Warn(logs.ResolverNNSWontBeUsedSinceRPCEndpointIsntProvided)
@ -345,7 +350,7 @@ func (a *app) getResolverConfig() ([]string, *resolver.Config) {
func (a *app) initMetrics() { func (a *app) initMetrics() {
gateMetricsProvider := metrics.NewGateMetrics(a.pool) gateMetricsProvider := metrics.NewGateMetrics(a.pool)
a.metrics = newGateMetrics(a.log, gateMetricsProvider, a.cfg.GetBool(cfgPrometheusEnabled)) a.metrics = newGateMetrics(a.log, gateMetricsProvider, a.config().GetBool(cfgPrometheusEnabled))
a.metrics.SetHealth(metrics.HealthStatusStarting) a.metrics.SetHealth(metrics.HealthStatusStarting)
a.loggerSettings.setMetrics(a.metrics.provider) a.loggerSettings.setMetrics(a.metrics.provider)
} }
@ -573,22 +578,22 @@ func (a *app) shutdownTracing() {
func (a *app) configReload(ctx context.Context) { func (a *app) configReload(ctx context.Context) {
a.log.Info(logs.SIGHUPConfigReloadStarted) a.log.Info(logs.SIGHUPConfigReloadStarted)
if !a.cfg.IsSet(cmdConfig) && !a.cfg.IsSet(cmdConfigDir) { if !a.config().IsSet(cmdConfig) && !a.config().IsSet(cmdConfigDir) {
a.log.Warn(logs.FailedToReloadConfigBecauseItsMissed) a.log.Warn(logs.FailedToReloadConfigBecauseItsMissed)
return return
} }
if err := readInConfig(a.cfg); err != nil { if err := a.cfg.reload(); err != nil {
a.log.Warn(logs.FailedToReloadConfig, zap.Error(err)) a.log.Warn(logs.FailedToReloadConfig, zap.Error(err))
return return
} }
if lvl, err := getLogLevel(a.cfg); err != nil { if lvl, err := getLogLevel(a.config()); err != nil {
a.log.Warn(logs.LogLevelWontBeUpdated, zap.Error(err)) a.log.Warn(logs.LogLevelWontBeUpdated, zap.Error(err))
} else { } else {
a.logLevel.SetLevel(lvl) a.logLevel.SetLevel(lvl)
} }
if err := a.settings.dialerSource.Update(fetchMultinetConfig(a.cfg, a.log)); err != nil { if err := a.settings.dialerSource.Update(fetchMultinetConfig(a.config(), a.log)); err != nil {
a.log.Warn(logs.MultinetConfigWontBeUpdated, zap.Error(err)) a.log.Warn(logs.MultinetConfigWontBeUpdated, zap.Error(err))
} }
@ -605,9 +610,9 @@ func (a *app) configReload(ctx context.Context) {
a.stopServices() a.stopServices()
a.startServices() a.startServices()
a.settings.update(a.cfg, a.log) a.settings.update(a.config(), a.log)
a.metrics.SetEnabled(a.cfg.GetBool(cfgPrometheusEnabled)) a.metrics.SetEnabled(a.config().GetBool(cfgPrometheusEnabled))
a.initTracing(ctx) a.initTracing(ctx)
a.setHealthStatus() a.setHealthStatus()
@ -615,12 +620,14 @@ func (a *app) configReload(ctx context.Context) {
} }
func (a *app) startServices() { func (a *app) startServices() {
pprofConfig := metrics.Config{Enabled: a.cfg.GetBool(cfgPprofEnabled), Address: a.cfg.GetString(cfgPprofAddress)} a.services = a.services[:0]
pprofConfig := metrics.Config{Enabled: a.config().GetBool(cfgPprofEnabled), Address: a.config().GetString(cfgPprofAddress)}
pprofService := metrics.NewPprofService(a.log, pprofConfig) pprofService := metrics.NewPprofService(a.log, pprofConfig)
a.services = append(a.services, pprofService) a.services = append(a.services, pprofService)
go pprofService.Start() go pprofService.Start()
prometheusConfig := metrics.Config{Enabled: a.cfg.GetBool(cfgPrometheusEnabled), Address: a.cfg.GetString(cfgPrometheusAddress)} prometheusConfig := metrics.Config{Enabled: a.config().GetBool(cfgPrometheusEnabled), Address: a.config().GetString(cfgPrometheusAddress)}
prometheusService := metrics.NewPrometheusService(a.log, prometheusConfig) prometheusService := metrics.NewPrometheusService(a.log, prometheusConfig)
a.services = append(a.services, prometheusService) a.services = append(a.services, prometheusService)
go prometheusService.Start() go prometheusService.Start()
@ -847,7 +854,7 @@ func (a *app) AppParams() *handler.AppParams {
} }
func (a *app) initServers(ctx context.Context) { func (a *app) initServers(ctx context.Context) {
serversInfo := fetchServers(a.cfg, a.log) serversInfo := fetchServers(a.config(), a.log)
a.servers = make([]Server, 0, len(serversInfo)) a.servers = make([]Server, 0, len(serversInfo))
for _, serverInfo := range serversInfo { for _, serverInfo := range serversInfo {
@ -874,7 +881,7 @@ func (a *app) initServers(ctx context.Context) {
} }
func (a *app) updateServers() error { func (a *app) updateServers() error {
serversInfo := fetchServers(a.cfg, a.log) serversInfo := fetchServers(a.config(), a.log)
a.mu.Lock() a.mu.Lock()
defer a.mu.Unlock() defer a.mu.Unlock()
@ -932,15 +939,15 @@ func (a *app) initTracing(ctx context.Context) {
instanceID = a.servers[0].Address() instanceID = a.servers[0].Address()
} }
cfg := tracing.Config{ cfg := tracing.Config{
Enabled: a.cfg.GetBool(cfgTracingEnabled), Enabled: a.config().GetBool(cfgTracingEnabled),
Exporter: tracing.Exporter(a.cfg.GetString(cfgTracingExporter)), Exporter: tracing.Exporter(a.config().GetString(cfgTracingExporter)),
Endpoint: a.cfg.GetString(cfgTracingEndpoint), Endpoint: a.config().GetString(cfgTracingEndpoint),
Service: "frostfs-http-gw", Service: "frostfs-http-gw",
InstanceID: instanceID, InstanceID: instanceID,
Version: Version, Version: Version,
} }
if trustedCa := a.cfg.GetString(cfgTracingTrustedCa); trustedCa != "" { if trustedCa := a.config().GetString(cfgTracingTrustedCa); trustedCa != "" {
caBytes, err := os.ReadFile(trustedCa) caBytes, err := os.ReadFile(trustedCa)
if err != nil { if err != nil {
a.log.Warn(logs.FailedToInitializeTracing, zap.Error(err)) a.log.Warn(logs.FailedToInitializeTracing, zap.Error(err))
@ -955,7 +962,7 @@ func (a *app) initTracing(ctx context.Context) {
cfg.ServerCaCertPool = certPool cfg.ServerCaCertPool = certPool
} }
attributes, err := fetchTracingAttributes(a.cfg) attributes, err := fetchTracingAttributes(a.config())
if err != nil { if err != nil {
a.log.Warn(logs.FailedToInitializeTracing, zap.Error(err)) a.log.Warn(logs.FailedToInitializeTracing, zap.Error(err))
return return
@ -978,7 +985,7 @@ func (a *app) setRuntimeParameters() {
return return
} }
softMemoryLimit := fetchSoftMemoryLimit(a.cfg) softMemoryLimit := fetchSoftMemoryLimit(a.config())
previous := debug.SetMemoryLimit(softMemoryLimit) previous := debug.SetMemoryLimit(softMemoryLimit)
if softMemoryLimit != previous { if softMemoryLimit != previous {
a.log.Info(logs.RuntimeSoftMemoryLimitUpdated, a.log.Info(logs.RuntimeSoftMemoryLimitUpdated,

View file

@ -32,7 +32,6 @@ import (
docker "github.com/docker/docker/api/types/container" docker "github.com/docker/docker/api/types/container"
"github.com/nspcc-dev/neo-go/pkg/crypto/keys" "github.com/nspcc-dev/neo-go/pkg/crypto/keys"
"github.com/nspcc-dev/neo-go/pkg/wallet" "github.com/nspcc-dev/neo-go/pkg/wallet"
"github.com/spf13/viper"
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
"github.com/testcontainers/testcontainers-go" "github.com/testcontainers/testcontainers-go"
"github.com/testcontainers/testcontainers-go/wait" "github.com/testcontainers/testcontainers-go/wait"
@ -108,8 +107,8 @@ func runServer(pathToWallet string) (App, context.CancelFunc) {
cancelCtx, cancel := context.WithCancel(context.Background()) cancelCtx, cancel := context.WithCancel(context.Background())
v := getDefaultConfig() v := getDefaultConfig()
v.Set(cfgWalletPath, pathToWallet) v.config().Set(cfgWalletPath, pathToWallet)
v.Set(cfgWalletPassphrase, "") v.config().Set(cfgWalletPassphrase, "")
application := newApp(cancelCtx, v) application := newApp(cancelCtx, v)
go application.Serve() go application.Serve()
@ -452,14 +451,14 @@ func createDockerContainer(ctx context.Context, t *testing.T, image string) test
return aioC return aioC
} }
func getDefaultConfig() *viper.Viper { func getDefaultConfig() *appCfg {
v := settings() v := settings()
v.SetDefault(cfgPeers+".0.address", "localhost:8080") v.config().SetDefault(cfgPeers+".0.address", "localhost:8080")
v.SetDefault(cfgPeers+".0.weight", 1) v.config().SetDefault(cfgPeers+".0.weight", 1)
v.SetDefault(cfgPeers+".0.priority", 1) v.config().SetDefault(cfgPeers+".0.priority", 1)
v.SetDefault(cfgRPCEndpoint, "http://localhost:30333") v.config().SetDefault(cfgRPCEndpoint, "http://localhost:30333")
v.SetDefault("server.0.address", testListenAddress) v.config().SetDefault("server.0.address", testListenAddress)
return v return v
} }

View file

@ -8,9 +8,9 @@ import (
func main() { func main() {
globalContext, _ := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM) globalContext, _ := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM)
v := settings() cfg := settings()
application := newApp(globalContext, v) application := newApp(globalContext, cfg)
go application.Serve() go application.Serve()
application.Wait() application.Wait()
} }

View file

@ -12,6 +12,7 @@ import (
"sort" "sort"
"strconv" "strconv"
"strings" "strings"
"sync"
"time" "time"
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/internal/cache" "git.frostfs.info/TrueCloudLab/frostfs-http-gw/internal/cache"
@ -197,14 +198,72 @@ type Logger struct {
lvl zap.AtomicLevel lvl zap.AtomicLevel
} }
func settings() *viper.Viper { type appCfg struct {
flags *pflag.FlagSet
mu sync.RWMutex
settings *viper.Viper
}
func (a *appCfg) reload() error {
old := a.config()
v, err := newViper(a.flags)
if err != nil {
return err
}
if old.IsSet(cmdConfig) {
v.Set(cmdConfig, old.Get(cmdConfig))
}
if old.IsSet(cmdConfigDir) {
v.Set(cmdConfigDir, old.Get(cmdConfigDir))
}
if err = readInConfig(v); err != nil {
return err
}
a.setConfig(v)
return nil
}
func (a *appCfg) config() *viper.Viper {
a.mu.RLock()
defer a.mu.RUnlock()
return a.settings
}
func (a *appCfg) setConfig(v *viper.Viper) {
a.mu.Lock()
a.settings = v
a.mu.Unlock()
}
func newViper(flags *pflag.FlagSet) (*viper.Viper, error) {
v := viper.New() v := viper.New()
v.AutomaticEnv() v.AutomaticEnv()
v.SetEnvPrefix(Prefix) v.SetEnvPrefix(Prefix)
v.AllowEmptyEnv(true) v.AllowEmptyEnv(true)
v.SetConfigType("yaml") v.SetConfigType("yaml")
v.SetEnvKeyReplacer(strings.NewReplacer(".", "_")) v.SetEnvKeyReplacer(strings.NewReplacer(".", "_"))
if err := bindFlags(v, flags); err != nil {
return nil, err
}
setDefaults(v, flags)
if v.IsSet(cfgServer+".0."+cfgTLSKeyFile) && v.IsSet(cfgServer+".0."+cfgTLSCertFile) {
v.Set(cfgServer+".0."+cfgTLSEnabled, true)
}
return v, nil
}
func settings() *appCfg {
// flags setup: // flags setup:
flags := pflag.NewFlagSet("commandline", pflag.ExitOnError) flags := pflag.NewFlagSet("commandline", pflag.ExitOnError)
flags.SetOutput(os.Stdout) flags.SetOutput(os.Stdout)
@ -228,89 +287,17 @@ func settings() *viper.Viper {
flags.String(cmdListenAddress, "0.0.0.0:8080", "addresses to listen") flags.String(cmdListenAddress, "0.0.0.0:8080", "addresses to listen")
flags.String(cfgTLSCertFile, "", "TLS certificate path") flags.String(cfgTLSCertFile, "", "TLS certificate path")
flags.String(cfgTLSKeyFile, "", "TLS key path") flags.String(cfgTLSKeyFile, "", "TLS key path")
peers := flags.StringArrayP(cfgPeers, "p", nil, "FrostFS nodes") flags.StringArrayP(cfgPeers, "p", nil, "FrostFS nodes")
resolveMethods := flags.StringSlice(cfgResolveOrder, []string{resolver.NNSResolver, resolver.DNSResolver}, "set container name resolve order") flags.StringSlice(cfgResolveOrder, []string{resolver.NNSResolver, resolver.DNSResolver}, "set container name resolve order")
// set defaults:
// logger:
v.SetDefault(cfgLoggerLevel, "debug")
v.SetDefault(cfgLoggerDestination, "stdout")
v.SetDefault(cfgLoggerSamplingEnabled, false)
v.SetDefault(cfgLoggerSamplingThereafter, 100)
v.SetDefault(cfgLoggerSamplingInitial, 100)
v.SetDefault(cfgLoggerSamplingInterval, defaultLoggerSamplerInterval)
// pool:
v.SetDefault(cfgPoolErrorThreshold, defaultPoolErrorThreshold)
// frostfs:
v.SetDefault(cfgBufferMaxSizeForPut, defaultBufferMaxSizeForPut)
// web-server:
v.SetDefault(cfgWebReadBufferSize, 4096)
v.SetDefault(cfgWebWriteBufferSize, 4096)
v.SetDefault(cfgWebReadTimeout, time.Minute*10)
v.SetDefault(cfgWebWriteTimeout, time.Minute*5)
v.SetDefault(cfgWebStreamRequestBody, true)
v.SetDefault(cfgWebMaxRequestBodySize, fasthttp.DefaultMaxRequestBodySize)
v.SetDefault(cfgWorkerPoolSize, 1000)
// upload header
v.SetDefault(cfgUploaderHeaderEnableDefaultTimestamp, false)
// metrics
v.SetDefault(cfgPprofAddress, "localhost:8083")
v.SetDefault(cfgPrometheusAddress, "localhost:8084")
// resolve bucket
v.SetDefault(cfgResolveNamespaceHeader, defaultNamespaceHeader)
v.SetDefault(cfgResolveDefaultNamespaces, []string{"", "root"})
// multinet
v.SetDefault(cfgMultinetFallbackDelay, defaultMultinetFallbackDelay)
// Binding flags
if err := v.BindPFlag(cfgPprofEnabled, flags.Lookup(cmdPprof)); err != nil {
panic(err)
}
if err := v.BindPFlag(cfgPrometheusEnabled, flags.Lookup(cmdMetrics)); err != nil {
panic(err)
}
if err := v.BindPFlag(cfgWalletPath, flags.Lookup(cmdWallet)); err != nil {
panic(err)
}
if err := v.BindPFlag(cfgWalletAddress, flags.Lookup(cmdAddress)); err != nil {
panic(err)
}
if err := v.BindPFlags(flags); err != nil {
panic(err)
}
if err := v.BindPFlag(cfgServer+".0.address", flags.Lookup(cmdListenAddress)); err != nil {
panic(err)
}
if err := v.BindPFlag(cfgServer+".0."+cfgTLSKeyFile, flags.Lookup(cfgTLSKeyFile)); err != nil {
panic(err)
}
if err := v.BindPFlag(cfgServer+".0."+cfgTLSCertFile, flags.Lookup(cfgTLSCertFile)); err != nil {
panic(err)
}
if err := flags.Parse(os.Args); err != nil { if err := flags.Parse(os.Args); err != nil {
panic(err) panic(err)
} }
if v.IsSet(cfgServer+".0."+cfgTLSKeyFile) && v.IsSet(cfgServer+".0."+cfgTLSCertFile) { v, err := newViper(flags)
v.Set(cfgServer+".0."+cfgTLSEnabled, true) if err != nil {
} panic(fmt.Errorf("bind flags: %w", err))
if resolveMethods != nil {
v.SetDefault(cfgResolveOrder, *resolveMethods)
} }
switch { switch {
@ -355,15 +342,97 @@ func settings() *viper.Viper {
panic(err) panic(err)
} }
if peers != nil && len(*peers) > 0 { return &appCfg{
for i := range *peers { flags: flags,
v.SetDefault(cfgPeers+"."+strconv.Itoa(i)+".address", (*peers)[i]) settings: v,
}
}
func setDefaults(v *viper.Viper, flags *pflag.FlagSet) {
// set defaults:
// logger:
v.SetDefault(cfgLoggerLevel, "debug")
v.SetDefault(cfgLoggerDestination, "stdout")
v.SetDefault(cfgLoggerSamplingEnabled, false)
v.SetDefault(cfgLoggerSamplingThereafter, 100)
v.SetDefault(cfgLoggerSamplingInitial, 100)
v.SetDefault(cfgLoggerSamplingInterval, defaultLoggerSamplerInterval)
// pool:
v.SetDefault(cfgPoolErrorThreshold, defaultPoolErrorThreshold)
// frostfs:
v.SetDefault(cfgBufferMaxSizeForPut, defaultBufferMaxSizeForPut)
// web-server:
v.SetDefault(cfgWebReadBufferSize, 4096)
v.SetDefault(cfgWebWriteBufferSize, 4096)
v.SetDefault(cfgWebReadTimeout, time.Minute*10)
v.SetDefault(cfgWebWriteTimeout, time.Minute*5)
v.SetDefault(cfgWebStreamRequestBody, true)
v.SetDefault(cfgWebMaxRequestBodySize, fasthttp.DefaultMaxRequestBodySize)
v.SetDefault(cfgWorkerPoolSize, 1000)
// upload header
v.SetDefault(cfgUploaderHeaderEnableDefaultTimestamp, false)
// metrics
v.SetDefault(cfgPprofAddress, "localhost:8083")
v.SetDefault(cfgPrometheusAddress, "localhost:8084")
// resolve bucket
v.SetDefault(cfgResolveNamespaceHeader, defaultNamespaceHeader)
v.SetDefault(cfgResolveDefaultNamespaces, []string{"", "root"})
// multinet
v.SetDefault(cfgMultinetFallbackDelay, defaultMultinetFallbackDelay)
if resolveMethods, err := flags.GetStringSlice(cfgResolveOrder); err == nil {
v.SetDefault(cfgResolveOrder, resolveMethods)
}
if peers, err := flags.GetStringArray(cfgPeers); err == nil {
for i := range peers {
v.SetDefault(cfgPeers+"."+strconv.Itoa(i)+".address", peers[i])
v.SetDefault(cfgPeers+"."+strconv.Itoa(i)+".weight", 1) v.SetDefault(cfgPeers+"."+strconv.Itoa(i)+".weight", 1)
v.SetDefault(cfgPeers+"."+strconv.Itoa(i)+".priority", 1) v.SetDefault(cfgPeers+"."+strconv.Itoa(i)+".priority", 1)
} }
} }
}
return v func bindFlags(v *viper.Viper, flags *pflag.FlagSet) error {
// Binding flags
if err := v.BindPFlag(cfgPprofEnabled, flags.Lookup(cmdPprof)); err != nil {
return err
}
if err := v.BindPFlag(cfgPrometheusEnabled, flags.Lookup(cmdMetrics)); err != nil {
return err
}
if err := v.BindPFlag(cfgWalletPath, flags.Lookup(cmdWallet)); err != nil {
return err
}
if err := v.BindPFlag(cfgWalletAddress, flags.Lookup(cmdAddress)); err != nil {
return err
}
if err := v.BindPFlags(flags); err != nil {
return err
}
if err := v.BindPFlag(cfgServer+".0.address", flags.Lookup(cmdListenAddress)); err != nil {
return err
}
if err := v.BindPFlag(cfgServer+".0."+cfgTLSKeyFile, flags.Lookup(cfgTLSKeyFile)); err != nil {
return err
}
if err := v.BindPFlag(cfgServer+".0."+cfgTLSCertFile, flags.Lookup(cfgTLSCertFile)); err != nil {
return err
}
return nil
} }
func readInConfig(v *viper.Viper) error { func readInConfig(v *viper.Viper) error {
@ -616,7 +685,7 @@ func fetchServers(v *viper.Viper, log *zap.Logger) []ServerInfo {
} }
func (a *app) initPools(ctx context.Context) { func (a *app) initPools(ctx context.Context) {
key, err := getFrostFSKey(a.cfg, a.log) key, err := getFrostFSKey(a.config(), a.log)
if err != nil { if err != nil {
a.log.Fatal(logs.CouldNotLoadFrostFSPrivateKey, zap.Error(err)) a.log.Fatal(logs.CouldNotLoadFrostFSPrivateKey, zap.Error(err))
} }
@ -628,40 +697,40 @@ func (a *app) initPools(ctx context.Context) {
prmTree.SetKey(key) prmTree.SetKey(key)
a.log.Info(logs.UsingCredentials, zap.String("FrostFS", hex.EncodeToString(key.PublicKey().Bytes()))) a.log.Info(logs.UsingCredentials, zap.String("FrostFS", hex.EncodeToString(key.PublicKey().Bytes())))
for _, peer := range fetchPeers(a.log, a.cfg) { for _, peer := range fetchPeers(a.log, a.config()) {
prm.AddNode(peer) prm.AddNode(peer)
prmTree.AddNode(peer) prmTree.AddNode(peer)
} }
connTimeout := a.cfg.GetDuration(cfgConTimeout) connTimeout := a.config().GetDuration(cfgConTimeout)
if connTimeout <= 0 { if connTimeout <= 0 {
connTimeout = defaultConnectTimeout connTimeout = defaultConnectTimeout
} }
prm.SetNodeDialTimeout(connTimeout) prm.SetNodeDialTimeout(connTimeout)
prmTree.SetNodeDialTimeout(connTimeout) prmTree.SetNodeDialTimeout(connTimeout)
streamTimeout := a.cfg.GetDuration(cfgStreamTimeout) streamTimeout := a.config().GetDuration(cfgStreamTimeout)
if streamTimeout <= 0 { if streamTimeout <= 0 {
streamTimeout = defaultStreamTimeout streamTimeout = defaultStreamTimeout
} }
prm.SetNodeStreamTimeout(streamTimeout) prm.SetNodeStreamTimeout(streamTimeout)
prmTree.SetNodeStreamTimeout(streamTimeout) prmTree.SetNodeStreamTimeout(streamTimeout)
healthCheckTimeout := a.cfg.GetDuration(cfgReqTimeout) healthCheckTimeout := a.config().GetDuration(cfgReqTimeout)
if healthCheckTimeout <= 0 { if healthCheckTimeout <= 0 {
healthCheckTimeout = defaultRequestTimeout healthCheckTimeout = defaultRequestTimeout
} }
prm.SetHealthcheckTimeout(healthCheckTimeout) prm.SetHealthcheckTimeout(healthCheckTimeout)
prmTree.SetHealthcheckTimeout(healthCheckTimeout) prmTree.SetHealthcheckTimeout(healthCheckTimeout)
rebalanceInterval := a.cfg.GetDuration(cfgRebalance) rebalanceInterval := a.config().GetDuration(cfgRebalance)
if rebalanceInterval <= 0 { if rebalanceInterval <= 0 {
rebalanceInterval = defaultRebalanceTimer rebalanceInterval = defaultRebalanceTimer
} }
prm.SetClientRebalanceInterval(rebalanceInterval) prm.SetClientRebalanceInterval(rebalanceInterval)
prmTree.SetClientRebalanceInterval(rebalanceInterval) prmTree.SetClientRebalanceInterval(rebalanceInterval)
errorThreshold := a.cfg.GetUint32(cfgPoolErrorThreshold) errorThreshold := a.config().GetUint32(cfgPoolErrorThreshold)
if errorThreshold <= 0 { if errorThreshold <= 0 {
errorThreshold = defaultPoolErrorThreshold errorThreshold = defaultPoolErrorThreshold
} }
@ -669,7 +738,7 @@ func (a *app) initPools(ctx context.Context) {
prm.SetLogger(a.log) prm.SetLogger(a.log)
prmTree.SetLogger(a.log) prmTree.SetLogger(a.log)
prmTree.SetMaxRequestAttempts(a.cfg.GetInt(cfgTreePoolMaxAttempts)) prmTree.SetMaxRequestAttempts(a.config().GetInt(cfgTreePoolMaxAttempts))
interceptors := []grpc.DialOption{ interceptors := []grpc.DialOption{
grpc.WithUnaryInterceptor(grpctracing.NewUnaryClientInteceptor()), grpc.WithUnaryInterceptor(grpctracing.NewUnaryClientInteceptor()),
@ -688,8 +757,8 @@ func (a *app) initPools(ctx context.Context) {
a.log.Fatal(logs.FailedToDialConnectionPool, zap.Error(err)) a.log.Fatal(logs.FailedToDialConnectionPool, zap.Error(err))
} }
if a.cfg.GetBool(cfgFeaturesTreePoolNetmapSupport) { if a.config().GetBool(cfgFeaturesTreePoolNetmapSupport) {
prmTree.SetNetMapInfoSource(frostfs.NewSource(frostfs.NewFrostFS(p), cache.NewNetmapCache(getNetmapCacheOptions(a.cfg, a.log)), a.bucketCache, a.log)) prmTree.SetNetMapInfoSource(frostfs.NewSource(frostfs.NewFrostFS(p), cache.NewNetmapCache(getNetmapCacheOptions(a.config(), a.log)), a.bucketCache, a.log))
} }
treePool, err := treepool.NewPool(prmTree) treePool, err := treepool.NewPool(prmTree)

View file

@ -0,0 +1,60 @@
package main
import (
"os"
"testing"
"time"
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/resolver"
"github.com/stretchr/testify/require"
)
func TestConfigReload(t *testing.T) {
f, err := os.CreateTemp("", "conf")
require.NoError(t, err)
defer func() {
require.NoError(t, os.Remove(f.Name()))
}()
confData := `
pprof:
enabled: true
resolve_bucket:
default_namespaces: [""]
resolve_order:
- nns
`
_, err = f.WriteString(confData)
require.NoError(t, err)
require.NoError(t, f.Close())
cfg := settings()
require.NoError(t, cfg.flags.Parse([]string{"--config", f.Name(), "--connect_timeout", "15s"}))
require.NoError(t, cfg.reload())
require.True(t, cfg.config().GetBool(cfgPprofEnabled))
require.Equal(t, []string{""}, cfg.config().GetStringSlice(cfgResolveDefaultNamespaces))
require.Equal(t, []string{resolver.NNSResolver}, cfg.config().GetStringSlice(cfgResolveOrder))
require.Equal(t, 15*time.Second, cfg.config().GetDuration(cfgConTimeout))
require.NoError(t, os.Truncate(f.Name(), 0))
require.NoError(t, cfg.reload())
require.False(t, cfg.config().GetBool(cfgPprofEnabled))
require.Equal(t, []string{"", "root"}, cfg.config().GetStringSlice(cfgResolveDefaultNamespaces))
require.Equal(t, []string{resolver.NNSResolver, resolver.DNSResolver}, cfg.config().GetStringSlice(cfgResolveOrder))
require.Equal(t, 15*time.Second, cfg.config().GetDuration(cfgConTimeout))
}
func TestSetTLSEnabled(t *testing.T) {
cfg := settings()
require.NoError(t, cfg.flags.Parse([]string{"--" + cfgTLSCertFile, "tls.crt", "--" + cfgTLSKeyFile, "tls.key"}))
require.NoError(t, cfg.reload())
require.True(t, cfg.config().GetBool(cfgServer+".0."+cfgTLSEnabled))
}