feature/fix_sighup #199
5 changed files with 269 additions and 137 deletions
|
@ -56,7 +56,7 @@ type (
|
||||||
treePool *treepool.Pool
|
treePool *treepool.Pool
|
||||||
key *keys.PrivateKey
|
key *keys.PrivateKey
|
||||||
owner *user.ID
|
owner *user.ID
|
||||||
cfg *viper.Viper
|
cfg *appCfg
|
||||||
webServer *fasthttp.Server
|
webServer *fasthttp.Server
|
||||||
webDone chan struct{}
|
webDone chan struct{}
|
||||||
resolver *resolver.ContainerResolver
|
resolver *resolver.ContainerResolver
|
||||||
|
@ -123,35 +123,35 @@ type (
|
||||||
}
|
}
|
||||||
)
|
)
|
||||||
|
|
||||||
func newApp(ctx context.Context, v *viper.Viper) App {
|
func newApp(ctx context.Context, cfg *appCfg) App {
|
||||||
logSettings := &loggerSettings{}
|
logSettings := &loggerSettings{}
|
||||||
log := pickLogger(v, logSettings)
|
log := pickLogger(cfg.config(), logSettings)
|
||||||
|
|
||||||
a := &app{
|
a := &app{
|
||||||
ctx: ctx,
|
ctx: ctx,
|
||||||
log: log.logger,
|
log: log.logger,
|
||||||
logLevel: log.lvl,
|
logLevel: log.lvl,
|
||||||
cfg: v,
|
cfg: cfg,
|
||||||
loggerSettings: logSettings,
|
loggerSettings: logSettings,
|
||||||
webServer: new(fasthttp.Server),
|
webServer: new(fasthttp.Server),
|
||||||
webDone: make(chan struct{}),
|
webDone: make(chan struct{}),
|
||||||
bucketCache: cache.NewBucketCache(getBucketCacheOptions(v, log.logger), v.GetBool(cfgFeaturesTreePoolNetmapSupport)),
|
bucketCache: cache.NewBucketCache(getBucketCacheOptions(cfg.config(), log.logger), cfg.config().GetBool(cfgFeaturesTreePoolNetmapSupport)),
|
||||||
}
|
}
|
||||||
|
|
||||||
a.initAppSettings()
|
a.initAppSettings()
|
||||||
|
|
||||||
// -- setup FastHTTP server --
|
// -- setup FastHTTP server --
|
||||||
a.webServer.Name = "frost-http-gw"
|
a.webServer.Name = "frost-http-gw"
|
||||||
a.webServer.ReadBufferSize = a.cfg.GetInt(cfgWebReadBufferSize)
|
a.webServer.ReadBufferSize = a.config().GetInt(cfgWebReadBufferSize)
|
||||||
a.webServer.WriteBufferSize = a.cfg.GetInt(cfgWebWriteBufferSize)
|
a.webServer.WriteBufferSize = a.config().GetInt(cfgWebWriteBufferSize)
|
||||||
a.webServer.ReadTimeout = a.cfg.GetDuration(cfgWebReadTimeout)
|
a.webServer.ReadTimeout = a.config().GetDuration(cfgWebReadTimeout)
|
||||||
a.webServer.WriteTimeout = a.cfg.GetDuration(cfgWebWriteTimeout)
|
a.webServer.WriteTimeout = a.config().GetDuration(cfgWebWriteTimeout)
|
||||||
a.webServer.DisableHeaderNamesNormalizing = true
|
a.webServer.DisableHeaderNamesNormalizing = true
|
||||||
a.webServer.NoDefaultServerHeader = true
|
a.webServer.NoDefaultServerHeader = true
|
||||||
a.webServer.NoDefaultContentType = true
|
a.webServer.NoDefaultContentType = true
|
||||||
a.webServer.MaxRequestBodySize = a.cfg.GetInt(cfgWebMaxRequestBodySize)
|
a.webServer.MaxRequestBodySize = a.config().GetInt(cfgWebMaxRequestBodySize)
|
||||||
a.webServer.DisablePreParseMultipartForm = true
|
a.webServer.DisablePreParseMultipartForm = true
|
||||||
a.webServer.StreamRequestBody = a.cfg.GetBool(cfgWebStreamRequestBody)
|
a.webServer.StreamRequestBody = a.config().GetBool(cfgWebStreamRequestBody)
|
||||||
// -- -- -- -- -- -- -- -- -- -- -- -- -- --
|
// -- -- -- -- -- -- -- -- -- -- -- -- -- --
|
||||||
a.initPools(ctx)
|
a.initPools(ctx)
|
||||||
|
|
||||||
|
@ -168,13 +168,17 @@ func newApp(ctx context.Context, v *viper.Viper) App {
|
||||||
return a
|
return a
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (a *app) config() *viper.Viper {
|
||||||
|
return a.cfg.config()
|
||||||
|
}
|
||||||
|
|
||||||
func (a *app) initAppSettings() {
|
func (a *app) initAppSettings() {
|
||||||
a.settings = &appSettings{
|
a.settings = &appSettings{
|
||||||
reconnectInterval: fetchReconnectInterval(a.cfg),
|
reconnectInterval: fetchReconnectInterval(a.config()),
|
||||||
dialerSource: getDialerSource(a.log, a.cfg),
|
dialerSource: getDialerSource(a.log, a.config()),
|
||||||
workerPoolSize: a.cfg.GetInt(cfgWorkerPoolSize),
|
workerPoolSize: a.config().GetInt(cfgWorkerPoolSize),
|
||||||
}
|
}
|
||||||
a.settings.update(a.cfg, a.log)
|
a.settings.update(a.config(), a.log)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *appSettings) update(v *viper.Viper, l *zap.Logger) {
|
func (s *appSettings) update(v *viper.Viper, l *zap.Logger) {
|
||||||
|
@ -327,11 +331,11 @@ func (a *app) initResolver() {
|
||||||
func (a *app) getResolverConfig() ([]string, *resolver.Config) {
|
func (a *app) getResolverConfig() ([]string, *resolver.Config) {
|
||||||
resolveCfg := &resolver.Config{
|
resolveCfg := &resolver.Config{
|
||||||
FrostFS: frostfs.NewResolverFrostFS(a.pool),
|
FrostFS: frostfs.NewResolverFrostFS(a.pool),
|
||||||
RPCAddress: a.cfg.GetString(cfgRPCEndpoint),
|
RPCAddress: a.config().GetString(cfgRPCEndpoint),
|
||||||
Settings: a.settings,
|
Settings: a.settings,
|
||||||
}
|
}
|
||||||
|
|
||||||
order := a.cfg.GetStringSlice(cfgResolveOrder)
|
order := a.config().GetStringSlice(cfgResolveOrder)
|
||||||
if resolveCfg.RPCAddress == "" {
|
if resolveCfg.RPCAddress == "" {
|
||||||
order = remove(order, resolver.NNSResolver)
|
order = remove(order, resolver.NNSResolver)
|
||||||
a.log.Warn(logs.ResolverNNSWontBeUsedSinceRPCEndpointIsntProvided)
|
a.log.Warn(logs.ResolverNNSWontBeUsedSinceRPCEndpointIsntProvided)
|
||||||
|
@ -346,7 +350,7 @@ func (a *app) getResolverConfig() ([]string, *resolver.Config) {
|
||||||
|
|
||||||
func (a *app) initMetrics() {
|
func (a *app) initMetrics() {
|
||||||
gateMetricsProvider := metrics.NewGateMetrics(a.pool)
|
gateMetricsProvider := metrics.NewGateMetrics(a.pool)
|
||||||
a.metrics = newGateMetrics(a.log, gateMetricsProvider, a.cfg.GetBool(cfgPrometheusEnabled))
|
a.metrics = newGateMetrics(a.log, gateMetricsProvider, a.config().GetBool(cfgPrometheusEnabled))
|
||||||
a.metrics.SetHealth(metrics.HealthStatusStarting)
|
a.metrics.SetHealth(metrics.HealthStatusStarting)
|
||||||
a.loggerSettings.setMetrics(a.metrics.provider)
|
a.loggerSettings.setMetrics(a.metrics.provider)
|
||||||
}
|
}
|
||||||
|
@ -574,22 +578,22 @@ func (a *app) shutdownTracing() {
|
||||||
|
|
||||||
func (a *app) configReload(ctx context.Context) {
|
func (a *app) configReload(ctx context.Context) {
|
||||||
a.log.Info(logs.SIGHUPConfigReloadStarted)
|
a.log.Info(logs.SIGHUPConfigReloadStarted)
|
||||||
if !a.cfg.IsSet(cmdConfig) && !a.cfg.IsSet(cmdConfigDir) {
|
if !a.config().IsSet(cmdConfig) && !a.config().IsSet(cmdConfigDir) {
|
||||||
a.log.Warn(logs.FailedToReloadConfigBecauseItsMissed)
|
a.log.Warn(logs.FailedToReloadConfigBecauseItsMissed)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
if err := readInConfig(a.cfg); err != nil {
|
if err := a.cfg.reload(); err != nil {
|
||||||
a.log.Warn(logs.FailedToReloadConfig, zap.Error(err))
|
a.log.Warn(logs.FailedToReloadConfig, zap.Error(err))
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
if lvl, err := getLogLevel(a.cfg); err != nil {
|
if lvl, err := getLogLevel(a.config()); err != nil {
|
||||||
a.log.Warn(logs.LogLevelWontBeUpdated, zap.Error(err))
|
a.log.Warn(logs.LogLevelWontBeUpdated, zap.Error(err))
|
||||||
} else {
|
} else {
|
||||||
a.logLevel.SetLevel(lvl)
|
a.logLevel.SetLevel(lvl)
|
||||||
}
|
}
|
||||||
|
|
||||||
if err := a.settings.dialerSource.Update(fetchMultinetConfig(a.cfg, a.log)); err != nil {
|
if err := a.settings.dialerSource.Update(fetchMultinetConfig(a.config(), a.log)); err != nil {
|
||||||
a.log.Warn(logs.MultinetConfigWontBeUpdated, zap.Error(err))
|
a.log.Warn(logs.MultinetConfigWontBeUpdated, zap.Error(err))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -606,9 +610,9 @@ func (a *app) configReload(ctx context.Context) {
|
||||||
a.stopServices()
|
a.stopServices()
|
||||||
a.startServices()
|
a.startServices()
|
||||||
|
|
||||||
a.settings.update(a.cfg, a.log)
|
a.settings.update(a.config(), a.log)
|
||||||
|
|
||||||
a.metrics.SetEnabled(a.cfg.GetBool(cfgPrometheusEnabled))
|
a.metrics.SetEnabled(a.config().GetBool(cfgPrometheusEnabled))
|
||||||
a.initTracing(ctx)
|
a.initTracing(ctx)
|
||||||
a.setHealthStatus()
|
a.setHealthStatus()
|
||||||
|
|
||||||
|
@ -618,12 +622,12 @@ func (a *app) configReload(ctx context.Context) {
|
||||||
func (a *app) startServices() {
|
func (a *app) startServices() {
|
||||||
a.services = a.services[:0]
|
a.services = a.services[:0]
|
||||||
|
|
||||||
pprofConfig := metrics.Config{Enabled: a.cfg.GetBool(cfgPprofEnabled), Address: a.cfg.GetString(cfgPprofAddress)}
|
pprofConfig := metrics.Config{Enabled: a.config().GetBool(cfgPprofEnabled), Address: a.config().GetString(cfgPprofAddress)}
|
||||||
pprofService := metrics.NewPprofService(a.log, pprofConfig)
|
pprofService := metrics.NewPprofService(a.log, pprofConfig)
|
||||||
a.services = append(a.services, pprofService)
|
a.services = append(a.services, pprofService)
|
||||||
go pprofService.Start()
|
go pprofService.Start()
|
||||||
|
|
||||||
prometheusConfig := metrics.Config{Enabled: a.cfg.GetBool(cfgPrometheusEnabled), Address: a.cfg.GetString(cfgPrometheusAddress)}
|
prometheusConfig := metrics.Config{Enabled: a.config().GetBool(cfgPrometheusEnabled), Address: a.config().GetString(cfgPrometheusAddress)}
|
||||||
prometheusService := metrics.NewPrometheusService(a.log, prometheusConfig)
|
prometheusService := metrics.NewPrometheusService(a.log, prometheusConfig)
|
||||||
a.services = append(a.services, prometheusService)
|
a.services = append(a.services, prometheusService)
|
||||||
go prometheusService.Start()
|
go prometheusService.Start()
|
||||||
|
@ -850,7 +854,7 @@ func (a *app) AppParams() *handler.AppParams {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (a *app) initServers(ctx context.Context) {
|
func (a *app) initServers(ctx context.Context) {
|
||||||
serversInfo := fetchServers(a.cfg, a.log)
|
serversInfo := fetchServers(a.config(), a.log)
|
||||||
|
|
||||||
a.servers = make([]Server, 0, len(serversInfo))
|
a.servers = make([]Server, 0, len(serversInfo))
|
||||||
for _, serverInfo := range serversInfo {
|
for _, serverInfo := range serversInfo {
|
||||||
|
@ -877,7 +881,7 @@ func (a *app) initServers(ctx context.Context) {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (a *app) updateServers() error {
|
func (a *app) updateServers() error {
|
||||||
serversInfo := fetchServers(a.cfg, a.log)
|
serversInfo := fetchServers(a.config(), a.log)
|
||||||
|
|
||||||
a.mu.Lock()
|
a.mu.Lock()
|
||||||
defer a.mu.Unlock()
|
defer a.mu.Unlock()
|
||||||
|
@ -935,15 +939,15 @@ func (a *app) initTracing(ctx context.Context) {
|
||||||
instanceID = a.servers[0].Address()
|
instanceID = a.servers[0].Address()
|
||||||
}
|
}
|
||||||
cfg := tracing.Config{
|
cfg := tracing.Config{
|
||||||
Enabled: a.cfg.GetBool(cfgTracingEnabled),
|
Enabled: a.config().GetBool(cfgTracingEnabled),
|
||||||
Exporter: tracing.Exporter(a.cfg.GetString(cfgTracingExporter)),
|
Exporter: tracing.Exporter(a.config().GetString(cfgTracingExporter)),
|
||||||
Endpoint: a.cfg.GetString(cfgTracingEndpoint),
|
Endpoint: a.config().GetString(cfgTracingEndpoint),
|
||||||
Service: "frostfs-http-gw",
|
Service: "frostfs-http-gw",
|
||||||
InstanceID: instanceID,
|
InstanceID: instanceID,
|
||||||
Version: Version,
|
Version: Version,
|
||||||
}
|
}
|
||||||
|
|
||||||
if trustedCa := a.cfg.GetString(cfgTracingTrustedCa); trustedCa != "" {
|
if trustedCa := a.config().GetString(cfgTracingTrustedCa); trustedCa != "" {
|
||||||
caBytes, err := os.ReadFile(trustedCa)
|
caBytes, err := os.ReadFile(trustedCa)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
a.log.Warn(logs.FailedToInitializeTracing, zap.Error(err))
|
a.log.Warn(logs.FailedToInitializeTracing, zap.Error(err))
|
||||||
|
@ -958,7 +962,7 @@ func (a *app) initTracing(ctx context.Context) {
|
||||||
cfg.ServerCaCertPool = certPool
|
cfg.ServerCaCertPool = certPool
|
||||||
}
|
}
|
||||||
|
|
||||||
attributes, err := fetchTracingAttributes(a.cfg)
|
attributes, err := fetchTracingAttributes(a.config())
|
||||||
if err != nil {
|
if err != nil {
|
||||||
a.log.Warn(logs.FailedToInitializeTracing, zap.Error(err))
|
a.log.Warn(logs.FailedToInitializeTracing, zap.Error(err))
|
||||||
return
|
return
|
||||||
|
@ -981,7 +985,7 @@ func (a *app) setRuntimeParameters() {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
softMemoryLimit := fetchSoftMemoryLimit(a.cfg)
|
softMemoryLimit := fetchSoftMemoryLimit(a.config())
|
||||||
previous := debug.SetMemoryLimit(softMemoryLimit)
|
previous := debug.SetMemoryLimit(softMemoryLimit)
|
||||||
if softMemoryLimit != previous {
|
if softMemoryLimit != previous {
|
||||||
a.log.Info(logs.RuntimeSoftMemoryLimitUpdated,
|
a.log.Info(logs.RuntimeSoftMemoryLimitUpdated,
|
||||||
|
|
|
@ -32,7 +32,6 @@ import (
|
||||||
docker "github.com/docker/docker/api/types/container"
|
docker "github.com/docker/docker/api/types/container"
|
||||||
"github.com/nspcc-dev/neo-go/pkg/crypto/keys"
|
"github.com/nspcc-dev/neo-go/pkg/crypto/keys"
|
||||||
"github.com/nspcc-dev/neo-go/pkg/wallet"
|
"github.com/nspcc-dev/neo-go/pkg/wallet"
|
||||||
"github.com/spf13/viper"
|
|
||||||
"github.com/stretchr/testify/require"
|
"github.com/stretchr/testify/require"
|
||||||
"github.com/testcontainers/testcontainers-go"
|
"github.com/testcontainers/testcontainers-go"
|
||||||
"github.com/testcontainers/testcontainers-go/wait"
|
"github.com/testcontainers/testcontainers-go/wait"
|
||||||
|
@ -108,8 +107,8 @@ func runServer(pathToWallet string) (App, context.CancelFunc) {
|
||||||
cancelCtx, cancel := context.WithCancel(context.Background())
|
cancelCtx, cancel := context.WithCancel(context.Background())
|
||||||
|
|
||||||
v := getDefaultConfig()
|
v := getDefaultConfig()
|
||||||
v.Set(cfgWalletPath, pathToWallet)
|
v.config().Set(cfgWalletPath, pathToWallet)
|
||||||
v.Set(cfgWalletPassphrase, "")
|
v.config().Set(cfgWalletPassphrase, "")
|
||||||
|
|
||||||
application := newApp(cancelCtx, v)
|
application := newApp(cancelCtx, v)
|
||||||
go application.Serve()
|
go application.Serve()
|
||||||
|
@ -452,14 +451,14 @@ func createDockerContainer(ctx context.Context, t *testing.T, image string) test
|
||||||
return aioC
|
return aioC
|
||||||
}
|
}
|
||||||
|
|
||||||
func getDefaultConfig() *viper.Viper {
|
func getDefaultConfig() *appCfg {
|
||||||
v := settings()
|
v := settings()
|
||||||
v.SetDefault(cfgPeers+".0.address", "localhost:8080")
|
v.config().SetDefault(cfgPeers+".0.address", "localhost:8080")
|
||||||
v.SetDefault(cfgPeers+".0.weight", 1)
|
v.config().SetDefault(cfgPeers+".0.weight", 1)
|
||||||
v.SetDefault(cfgPeers+".0.priority", 1)
|
v.config().SetDefault(cfgPeers+".0.priority", 1)
|
||||||
|
|
||||||
v.SetDefault(cfgRPCEndpoint, "http://localhost:30333")
|
v.config().SetDefault(cfgRPCEndpoint, "http://localhost:30333")
|
||||||
v.SetDefault("server.0.address", testListenAddress)
|
v.config().SetDefault("server.0.address", testListenAddress)
|
||||||
|
|
||||||
return v
|
return v
|
||||||
}
|
}
|
||||||
|
|
|
@ -8,9 +8,9 @@ import (
|
||||||
|
|
||||||
func main() {
|
func main() {
|
||||||
globalContext, _ := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM)
|
globalContext, _ := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM)
|
||||||
v := settings()
|
cfg := settings()
|
||||||
|
|
||||||
application := newApp(globalContext, v)
|
application := newApp(globalContext, cfg)
|
||||||
go application.Serve()
|
go application.Serve()
|
||||||
application.Wait()
|
application.Wait()
|
||||||
}
|
}
|
||||||
|
|
|
@ -12,6 +12,7 @@ import (
|
||||||
"sort"
|
"sort"
|
||||||
"strconv"
|
"strconv"
|
||||||
"strings"
|
"strings"
|
||||||
|
"sync"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/internal/cache"
|
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/internal/cache"
|
||||||
|
@ -197,14 +198,72 @@ type Logger struct {
|
||||||
lvl zap.AtomicLevel
|
lvl zap.AtomicLevel
|
||||||
}
|
}
|
||||||
|
|
||||||
func settings() *viper.Viper {
|
type appCfg struct {
|
||||||
|
flags *pflag.FlagSet
|
||||||
|
|
||||||
|
mu sync.RWMutex
|
||||||
|
settings *viper.Viper
|
||||||
|
}
|
||||||
|
|
||||||
|
func (a *appCfg) reload() error {
|
||||||
|
old := a.config()
|
||||||
|
|
||||||
|
v, err := newViper(a.flags)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
if old.IsSet(cmdConfig) {
|
||||||
|
v.Set(cmdConfig, old.Get(cmdConfig))
|
||||||
|
}
|
||||||
|
if old.IsSet(cmdConfigDir) {
|
||||||
|
v.Set(cmdConfigDir, old.Get(cmdConfigDir))
|
||||||
|
}
|
||||||
|
|
||||||
|
if err = readInConfig(v); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
a.setConfig(v)
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (a *appCfg) config() *viper.Viper {
|
||||||
|
a.mu.RLock()
|
||||||
|
defer a.mu.RUnlock()
|
||||||
|
|
||||||
|
return a.settings
|
||||||
|
}
|
||||||
|
|
||||||
|
func (a *appCfg) setConfig(v *viper.Viper) {
|
||||||
|
a.mu.Lock()
|
||||||
|
a.settings = v
|
||||||
|
a.mu.Unlock()
|
||||||
|
}
|
||||||
|
|
||||||
|
func newViper(flags *pflag.FlagSet) (*viper.Viper, error) {
|
||||||
v := viper.New()
|
v := viper.New()
|
||||||
|
|
||||||
v.AutomaticEnv()
|
v.AutomaticEnv()
|
||||||
v.SetEnvPrefix(Prefix)
|
v.SetEnvPrefix(Prefix)
|
||||||
v.AllowEmptyEnv(true)
|
v.AllowEmptyEnv(true)
|
||||||
v.SetConfigType("yaml")
|
v.SetConfigType("yaml")
|
||||||
v.SetEnvKeyReplacer(strings.NewReplacer(".", "_"))
|
v.SetEnvKeyReplacer(strings.NewReplacer(".", "_"))
|
||||||
|
|
||||||
|
if err := bindFlags(v, flags); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
setDefaults(v, flags)
|
||||||
|
|
||||||
|
if v.IsSet(cfgServer+".0."+cfgTLSKeyFile) && v.IsSet(cfgServer+".0."+cfgTLSCertFile) {
|
||||||
|
v.Set(cfgServer+".0."+cfgTLSEnabled, true)
|
||||||
|
}
|
||||||
|
|
||||||
|
return v, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func settings() *appCfg {
|
||||||
// flags setup:
|
// flags setup:
|
||||||
flags := pflag.NewFlagSet("commandline", pflag.ExitOnError)
|
flags := pflag.NewFlagSet("commandline", pflag.ExitOnError)
|
||||||
flags.SetOutput(os.Stdout)
|
flags.SetOutput(os.Stdout)
|
||||||
|
@ -228,89 +287,17 @@ func settings() *viper.Viper {
|
||||||
flags.String(cmdListenAddress, "0.0.0.0:8080", "addresses to listen")
|
flags.String(cmdListenAddress, "0.0.0.0:8080", "addresses to listen")
|
||||||
flags.String(cfgTLSCertFile, "", "TLS certificate path")
|
flags.String(cfgTLSCertFile, "", "TLS certificate path")
|
||||||
flags.String(cfgTLSKeyFile, "", "TLS key path")
|
flags.String(cfgTLSKeyFile, "", "TLS key path")
|
||||||
peers := flags.StringArrayP(cfgPeers, "p", nil, "FrostFS nodes")
|
flags.StringArrayP(cfgPeers, "p", nil, "FrostFS nodes")
|
||||||
|
|
||||||
resolveMethods := flags.StringSlice(cfgResolveOrder, []string{resolver.NNSResolver, resolver.DNSResolver}, "set container name resolve order")
|
flags.StringSlice(cfgResolveOrder, []string{resolver.NNSResolver, resolver.DNSResolver}, "set container name resolve order")
|
||||||
|
|
||||||
// set defaults:
|
|
||||||
|
|
||||||
// logger:
|
|
||||||
v.SetDefault(cfgLoggerLevel, "debug")
|
|
||||||
v.SetDefault(cfgLoggerDestination, "stdout")
|
|
||||||
v.SetDefault(cfgLoggerSamplingEnabled, false)
|
|
||||||
v.SetDefault(cfgLoggerSamplingThereafter, 100)
|
|
||||||
v.SetDefault(cfgLoggerSamplingInitial, 100)
|
|
||||||
v.SetDefault(cfgLoggerSamplingInterval, defaultLoggerSamplerInterval)
|
|
||||||
|
|
||||||
// pool:
|
|
||||||
v.SetDefault(cfgPoolErrorThreshold, defaultPoolErrorThreshold)
|
|
||||||
|
|
||||||
// frostfs:
|
|
||||||
v.SetDefault(cfgBufferMaxSizeForPut, defaultBufferMaxSizeForPut)
|
|
||||||
|
|
||||||
// web-server:
|
|
||||||
v.SetDefault(cfgWebReadBufferSize, 4096)
|
|
||||||
v.SetDefault(cfgWebWriteBufferSize, 4096)
|
|
||||||
v.SetDefault(cfgWebReadTimeout, time.Minute*10)
|
|
||||||
v.SetDefault(cfgWebWriteTimeout, time.Minute*5)
|
|
||||||
v.SetDefault(cfgWebStreamRequestBody, true)
|
|
||||||
v.SetDefault(cfgWebMaxRequestBodySize, fasthttp.DefaultMaxRequestBodySize)
|
|
||||||
|
|
||||||
v.SetDefault(cfgWorkerPoolSize, 1000)
|
|
||||||
// upload header
|
|
||||||
v.SetDefault(cfgUploaderHeaderEnableDefaultTimestamp, false)
|
|
||||||
|
|
||||||
// metrics
|
|
||||||
v.SetDefault(cfgPprofAddress, "localhost:8083")
|
|
||||||
v.SetDefault(cfgPrometheusAddress, "localhost:8084")
|
|
||||||
|
|
||||||
// resolve bucket
|
|
||||||
v.SetDefault(cfgResolveNamespaceHeader, defaultNamespaceHeader)
|
|
||||||
v.SetDefault(cfgResolveDefaultNamespaces, []string{"", "root"})
|
|
||||||
|
|
||||||
// multinet
|
|
||||||
v.SetDefault(cfgMultinetFallbackDelay, defaultMultinetFallbackDelay)
|
|
||||||
|
|
||||||
// Binding flags
|
|
||||||
if err := v.BindPFlag(cfgPprofEnabled, flags.Lookup(cmdPprof)); err != nil {
|
|
||||||
panic(err)
|
|
||||||
}
|
|
||||||
if err := v.BindPFlag(cfgPrometheusEnabled, flags.Lookup(cmdMetrics)); err != nil {
|
|
||||||
panic(err)
|
|
||||||
}
|
|
||||||
|
|
||||||
if err := v.BindPFlag(cfgWalletPath, flags.Lookup(cmdWallet)); err != nil {
|
|
||||||
panic(err)
|
|
||||||
}
|
|
||||||
|
|
||||||
if err := v.BindPFlag(cfgWalletAddress, flags.Lookup(cmdAddress)); err != nil {
|
|
||||||
panic(err)
|
|
||||||
}
|
|
||||||
|
|
||||||
if err := v.BindPFlags(flags); err != nil {
|
|
||||||
panic(err)
|
|
||||||
}
|
|
||||||
|
|
||||||
if err := v.BindPFlag(cfgServer+".0.address", flags.Lookup(cmdListenAddress)); err != nil {
|
|
||||||
panic(err)
|
|
||||||
}
|
|
||||||
if err := v.BindPFlag(cfgServer+".0."+cfgTLSKeyFile, flags.Lookup(cfgTLSKeyFile)); err != nil {
|
|
||||||
panic(err)
|
|
||||||
}
|
|
||||||
if err := v.BindPFlag(cfgServer+".0."+cfgTLSCertFile, flags.Lookup(cfgTLSCertFile)); err != nil {
|
|
||||||
panic(err)
|
|
||||||
}
|
|
||||||
|
|
||||||
if err := flags.Parse(os.Args); err != nil {
|
if err := flags.Parse(os.Args); err != nil {
|
||||||
panic(err)
|
panic(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
if v.IsSet(cfgServer+".0."+cfgTLSKeyFile) && v.IsSet(cfgServer+".0."+cfgTLSCertFile) {
|
v, err := newViper(flags)
|
||||||
v.Set(cfgServer+".0."+cfgTLSEnabled, true)
|
if err != nil {
|
||||||
}
|
panic(fmt.Errorf("bind flags: %w", err))
|
||||||
|
|
||||||
if resolveMethods != nil {
|
|
||||||
v.SetDefault(cfgResolveOrder, *resolveMethods)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
switch {
|
switch {
|
||||||
|
@ -355,15 +342,97 @@ func settings() *viper.Viper {
|
||||||
panic(err)
|
panic(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
if peers != nil && len(*peers) > 0 {
|
return &appCfg{
|
||||||
for i := range *peers {
|
flags: flags,
|
||||||
v.SetDefault(cfgPeers+"."+strconv.Itoa(i)+".address", (*peers)[i])
|
settings: v,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func setDefaults(v *viper.Viper, flags *pflag.FlagSet) {
|
||||||
|
// set defaults:
|
||||||
|
|
||||||
|
// logger:
|
||||||
|
v.SetDefault(cfgLoggerLevel, "debug")
|
||||||
|
v.SetDefault(cfgLoggerDestination, "stdout")
|
||||||
|
v.SetDefault(cfgLoggerSamplingEnabled, false)
|
||||||
|
v.SetDefault(cfgLoggerSamplingThereafter, 100)
|
||||||
|
v.SetDefault(cfgLoggerSamplingInitial, 100)
|
||||||
|
v.SetDefault(cfgLoggerSamplingInterval, defaultLoggerSamplerInterval)
|
||||||
|
|
||||||
|
// pool:
|
||||||
|
v.SetDefault(cfgPoolErrorThreshold, defaultPoolErrorThreshold)
|
||||||
|
|
||||||
|
// frostfs:
|
||||||
|
v.SetDefault(cfgBufferMaxSizeForPut, defaultBufferMaxSizeForPut)
|
||||||
|
|
||||||
|
// web-server:
|
||||||
|
v.SetDefault(cfgWebReadBufferSize, 4096)
|
||||||
|
v.SetDefault(cfgWebWriteBufferSize, 4096)
|
||||||
|
v.SetDefault(cfgWebReadTimeout, time.Minute*10)
|
||||||
|
v.SetDefault(cfgWebWriteTimeout, time.Minute*5)
|
||||||
|
v.SetDefault(cfgWebStreamRequestBody, true)
|
||||||
|
v.SetDefault(cfgWebMaxRequestBodySize, fasthttp.DefaultMaxRequestBodySize)
|
||||||
|
|
||||||
|
v.SetDefault(cfgWorkerPoolSize, 1000)
|
||||||
|
// upload header
|
||||||
|
v.SetDefault(cfgUploaderHeaderEnableDefaultTimestamp, false)
|
||||||
|
|
||||||
|
// metrics
|
||||||
|
v.SetDefault(cfgPprofAddress, "localhost:8083")
|
||||||
|
v.SetDefault(cfgPrometheusAddress, "localhost:8084")
|
||||||
|
|
||||||
|
// resolve bucket
|
||||||
|
v.SetDefault(cfgResolveNamespaceHeader, defaultNamespaceHeader)
|
||||||
|
v.SetDefault(cfgResolveDefaultNamespaces, []string{"", "root"})
|
||||||
|
|
||||||
|
// multinet
|
||||||
|
v.SetDefault(cfgMultinetFallbackDelay, defaultMultinetFallbackDelay)
|
||||||
|
|
||||||
|
if resolveMethods, err := flags.GetStringSlice(cfgResolveOrder); err == nil {
|
||||||
|
v.SetDefault(cfgResolveOrder, resolveMethods)
|
||||||
|
}
|
||||||
|
|
||||||
|
if peers, err := flags.GetStringArray(cfgPeers); err == nil {
|
||||||
|
for i := range peers {
|
||||||
|
v.SetDefault(cfgPeers+"."+strconv.Itoa(i)+".address", peers[i])
|
||||||
v.SetDefault(cfgPeers+"."+strconv.Itoa(i)+".weight", 1)
|
v.SetDefault(cfgPeers+"."+strconv.Itoa(i)+".weight", 1)
|
||||||
v.SetDefault(cfgPeers+"."+strconv.Itoa(i)+".priority", 1)
|
v.SetDefault(cfgPeers+"."+strconv.Itoa(i)+".priority", 1)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
return v
|
func bindFlags(v *viper.Viper, flags *pflag.FlagSet) error {
|
||||||
|
// Binding flags
|
||||||
|
if err := v.BindPFlag(cfgPprofEnabled, flags.Lookup(cmdPprof)); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
if err := v.BindPFlag(cfgPrometheusEnabled, flags.Lookup(cmdMetrics)); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := v.BindPFlag(cfgWalletPath, flags.Lookup(cmdWallet)); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := v.BindPFlag(cfgWalletAddress, flags.Lookup(cmdAddress)); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := v.BindPFlags(flags); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := v.BindPFlag(cfgServer+".0.address", flags.Lookup(cmdListenAddress)); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
if err := v.BindPFlag(cfgServer+".0."+cfgTLSKeyFile, flags.Lookup(cfgTLSKeyFile)); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
if err := v.BindPFlag(cfgServer+".0."+cfgTLSCertFile, flags.Lookup(cfgTLSCertFile)); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func readInConfig(v *viper.Viper) error {
|
func readInConfig(v *viper.Viper) error {
|
||||||
|
@ -616,7 +685,7 @@ func fetchServers(v *viper.Viper, log *zap.Logger) []ServerInfo {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (a *app) initPools(ctx context.Context) {
|
func (a *app) initPools(ctx context.Context) {
|
||||||
key, err := getFrostFSKey(a.cfg, a.log)
|
key, err := getFrostFSKey(a.config(), a.log)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
a.log.Fatal(logs.CouldNotLoadFrostFSPrivateKey, zap.Error(err))
|
a.log.Fatal(logs.CouldNotLoadFrostFSPrivateKey, zap.Error(err))
|
||||||
}
|
}
|
||||||
|
@ -628,40 +697,40 @@ func (a *app) initPools(ctx context.Context) {
|
||||||
prmTree.SetKey(key)
|
prmTree.SetKey(key)
|
||||||
a.log.Info(logs.UsingCredentials, zap.String("FrostFS", hex.EncodeToString(key.PublicKey().Bytes())))
|
a.log.Info(logs.UsingCredentials, zap.String("FrostFS", hex.EncodeToString(key.PublicKey().Bytes())))
|
||||||
|
|
||||||
for _, peer := range fetchPeers(a.log, a.cfg) {
|
for _, peer := range fetchPeers(a.log, a.config()) {
|
||||||
prm.AddNode(peer)
|
prm.AddNode(peer)
|
||||||
prmTree.AddNode(peer)
|
prmTree.AddNode(peer)
|
||||||
}
|
}
|
||||||
|
|
||||||
connTimeout := a.cfg.GetDuration(cfgConTimeout)
|
connTimeout := a.config().GetDuration(cfgConTimeout)
|
||||||
if connTimeout <= 0 {
|
if connTimeout <= 0 {
|
||||||
connTimeout = defaultConnectTimeout
|
connTimeout = defaultConnectTimeout
|
||||||
}
|
}
|
||||||
prm.SetNodeDialTimeout(connTimeout)
|
prm.SetNodeDialTimeout(connTimeout)
|
||||||
prmTree.SetNodeDialTimeout(connTimeout)
|
prmTree.SetNodeDialTimeout(connTimeout)
|
||||||
|
|
||||||
streamTimeout := a.cfg.GetDuration(cfgStreamTimeout)
|
streamTimeout := a.config().GetDuration(cfgStreamTimeout)
|
||||||
if streamTimeout <= 0 {
|
if streamTimeout <= 0 {
|
||||||
streamTimeout = defaultStreamTimeout
|
streamTimeout = defaultStreamTimeout
|
||||||
}
|
}
|
||||||
prm.SetNodeStreamTimeout(streamTimeout)
|
prm.SetNodeStreamTimeout(streamTimeout)
|
||||||
prmTree.SetNodeStreamTimeout(streamTimeout)
|
prmTree.SetNodeStreamTimeout(streamTimeout)
|
||||||
|
|
||||||
healthCheckTimeout := a.cfg.GetDuration(cfgReqTimeout)
|
healthCheckTimeout := a.config().GetDuration(cfgReqTimeout)
|
||||||
if healthCheckTimeout <= 0 {
|
if healthCheckTimeout <= 0 {
|
||||||
healthCheckTimeout = defaultRequestTimeout
|
healthCheckTimeout = defaultRequestTimeout
|
||||||
}
|
}
|
||||||
prm.SetHealthcheckTimeout(healthCheckTimeout)
|
prm.SetHealthcheckTimeout(healthCheckTimeout)
|
||||||
prmTree.SetHealthcheckTimeout(healthCheckTimeout)
|
prmTree.SetHealthcheckTimeout(healthCheckTimeout)
|
||||||
|
|
||||||
rebalanceInterval := a.cfg.GetDuration(cfgRebalance)
|
rebalanceInterval := a.config().GetDuration(cfgRebalance)
|
||||||
if rebalanceInterval <= 0 {
|
if rebalanceInterval <= 0 {
|
||||||
rebalanceInterval = defaultRebalanceTimer
|
rebalanceInterval = defaultRebalanceTimer
|
||||||
}
|
}
|
||||||
prm.SetClientRebalanceInterval(rebalanceInterval)
|
prm.SetClientRebalanceInterval(rebalanceInterval)
|
||||||
prmTree.SetClientRebalanceInterval(rebalanceInterval)
|
prmTree.SetClientRebalanceInterval(rebalanceInterval)
|
||||||
|
|
||||||
errorThreshold := a.cfg.GetUint32(cfgPoolErrorThreshold)
|
errorThreshold := a.config().GetUint32(cfgPoolErrorThreshold)
|
||||||
if errorThreshold <= 0 {
|
if errorThreshold <= 0 {
|
||||||
errorThreshold = defaultPoolErrorThreshold
|
errorThreshold = defaultPoolErrorThreshold
|
||||||
}
|
}
|
||||||
|
@ -669,7 +738,7 @@ func (a *app) initPools(ctx context.Context) {
|
||||||
prm.SetLogger(a.log)
|
prm.SetLogger(a.log)
|
||||||
prmTree.SetLogger(a.log)
|
prmTree.SetLogger(a.log)
|
||||||
|
|
||||||
prmTree.SetMaxRequestAttempts(a.cfg.GetInt(cfgTreePoolMaxAttempts))
|
prmTree.SetMaxRequestAttempts(a.config().GetInt(cfgTreePoolMaxAttempts))
|
||||||
|
|
||||||
interceptors := []grpc.DialOption{
|
interceptors := []grpc.DialOption{
|
||||||
grpc.WithUnaryInterceptor(grpctracing.NewUnaryClientInteceptor()),
|
grpc.WithUnaryInterceptor(grpctracing.NewUnaryClientInteceptor()),
|
||||||
|
@ -688,8 +757,8 @@ func (a *app) initPools(ctx context.Context) {
|
||||||
a.log.Fatal(logs.FailedToDialConnectionPool, zap.Error(err))
|
a.log.Fatal(logs.FailedToDialConnectionPool, zap.Error(err))
|
||||||
}
|
}
|
||||||
|
|
||||||
if a.cfg.GetBool(cfgFeaturesTreePoolNetmapSupport) {
|
if a.config().GetBool(cfgFeaturesTreePoolNetmapSupport) {
|
||||||
prmTree.SetNetMapInfoSource(frostfs.NewSource(frostfs.NewFrostFS(p), cache.NewNetmapCache(getNetmapCacheOptions(a.cfg, a.log)), a.bucketCache, a.log))
|
prmTree.SetNetMapInfoSource(frostfs.NewSource(frostfs.NewFrostFS(p), cache.NewNetmapCache(getNetmapCacheOptions(a.config(), a.log)), a.bucketCache, a.log))
|
||||||
}
|
}
|
||||||
|
|
||||||
treePool, err := treepool.NewPool(prmTree)
|
treePool, err := treepool.NewPool(prmTree)
|
||||||
|
|
60
cmd/http-gw/settings_test.go
Normal file
60
cmd/http-gw/settings_test.go
Normal file
|
@ -0,0 +1,60 @@
|
||||||
|
package main
|
||||||
|
|
||||||
|
import (
|
||||||
|
"os"
|
||||||
|
"testing"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/resolver"
|
||||||
|
"github.com/stretchr/testify/require"
|
||||||
|
)
|
||||||
|
|
||||||
|
func TestConfigReload(t *testing.T) {
|
||||||
|
f, err := os.CreateTemp("", "conf")
|
||||||
|
require.NoError(t, err)
|
||||||
|
defer func() {
|
||||||
|
require.NoError(t, os.Remove(f.Name()))
|
||||||
|
}()
|
||||||
|
|
||||||
|
confData := `
|
||||||
|
pprof:
|
||||||
|
enabled: true
|
||||||
|
|
||||||
|
resolve_bucket:
|
||||||
|
default_namespaces: [""]
|
||||||
|
|
||||||
|
resolve_order:
|
||||||
|
- nns
|
||||||
|
`
|
||||||
|
|
||||||
|
_, err = f.WriteString(confData)
|
||||||
|
require.NoError(t, err)
|
||||||
|
require.NoError(t, f.Close())
|
||||||
|
|
||||||
|
cfg := settings()
|
||||||
|
|
||||||
|
require.NoError(t, cfg.flags.Parse([]string{"--config", f.Name(), "--connect_timeout", "15s"}))
|
||||||
|
require.NoError(t, cfg.reload())
|
||||||
|
|
||||||
|
require.True(t, cfg.config().GetBool(cfgPprofEnabled))
|
||||||
|
require.Equal(t, []string{""}, cfg.config().GetStringSlice(cfgResolveDefaultNamespaces))
|
||||||
|
require.Equal(t, []string{resolver.NNSResolver}, cfg.config().GetStringSlice(cfgResolveOrder))
|
||||||
|
require.Equal(t, 15*time.Second, cfg.config().GetDuration(cfgConTimeout))
|
||||||
|
|
||||||
|
require.NoError(t, os.Truncate(f.Name(), 0))
|
||||||
|
require.NoError(t, cfg.reload())
|
||||||
|
|
||||||
|
require.False(t, cfg.config().GetBool(cfgPprofEnabled))
|
||||||
|
require.Equal(t, []string{"", "root"}, cfg.config().GetStringSlice(cfgResolveDefaultNamespaces))
|
||||||
|
require.Equal(t, []string{resolver.NNSResolver, resolver.DNSResolver}, cfg.config().GetStringSlice(cfgResolveOrder))
|
||||||
|
require.Equal(t, 15*time.Second, cfg.config().GetDuration(cfgConTimeout))
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestSetTLSEnabled(t *testing.T) {
|
||||||
|
cfg := settings()
|
||||||
|
|
||||||
|
require.NoError(t, cfg.flags.Parse([]string{"--" + cfgTLSCertFile, "tls.crt", "--" + cfgTLSKeyFile, "tls.key"}))
|
||||||
|
require.NoError(t, cfg.reload())
|
||||||
|
|
||||||
|
require.True(t, cfg.config().GetBool(cfgServer+".0."+cfgTLSEnabled))
|
||||||
|
}
|
Loading…
Add table
Reference in a new issue