forked from TrueCloudLab/frostfs-s3-gw
[#541] Use default value if config param is unset after SIGHUP
Signed-off-by: Marina Biryukova <m.biryukova@yadro.com>
This commit is contained in:
parent
619385836d
commit
250538a9b4
4 changed files with 234 additions and 127 deletions
116
cmd/s3-gw/app.go
116
cmd/s3-gw/app.go
|
@ -65,7 +65,7 @@ type (
|
||||||
App struct {
|
App struct {
|
||||||
ctr s3middleware.Center
|
ctr s3middleware.Center
|
||||||
log *zap.Logger
|
log *zap.Logger
|
||||||
cfg *viper.Viper
|
cfg *appCfg
|
||||||
pool *pool.Pool
|
pool *pool.Pool
|
||||||
treePool *treepool.Pool
|
treePool *treepool.Pool
|
||||||
key *keys.PrivateKey
|
key *keys.PrivateKey
|
||||||
|
@ -161,15 +161,15 @@ func (s *loggerSettings) setMetrics(appMetrics *metrics.AppMetrics) {
|
||||||
s.appMetrics = appMetrics
|
s.appMetrics = appMetrics
|
||||||
}
|
}
|
||||||
|
|
||||||
func newApp(ctx context.Context, v *viper.Viper) *App {
|
func newApp(ctx context.Context, cfg *appCfg) *App {
|
||||||
logSettings := &loggerSettings{}
|
logSettings := &loggerSettings{}
|
||||||
log := pickLogger(v, logSettings)
|
log := pickLogger(cfg.config(), logSettings)
|
||||||
settings := newAppSettings(log, v)
|
settings := newAppSettings(log, cfg.config())
|
||||||
appCache := layer.NewCache(getCacheOptions(v, log.logger))
|
appCache := layer.NewCache(getCacheOptions(cfg.config(), log.logger))
|
||||||
|
|
||||||
app := &App{
|
app := &App{
|
||||||
log: log.logger,
|
log: log.logger,
|
||||||
cfg: v,
|
cfg: cfg,
|
||||||
cache: appCache,
|
cache: appCache,
|
||||||
|
|
||||||
webDone: make(chan struct{}, 1),
|
webDone: make(chan struct{}, 1),
|
||||||
|
@ -184,6 +184,10 @@ func newApp(ctx context.Context, v *viper.Viper) *App {
|
||||||
return app
|
return app
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (a *App) config() *viper.Viper {
|
||||||
|
return a.cfg.config()
|
||||||
|
}
|
||||||
|
|
||||||
func (a *App) init(ctx context.Context) {
|
func (a *App) init(ctx context.Context) {
|
||||||
a.initPools(ctx)
|
a.initPools(ctx)
|
||||||
a.initResolver()
|
a.initResolver()
|
||||||
|
@ -198,7 +202,7 @@ func (a *App) init(ctx context.Context) {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (a *App) initAuthCenter(ctx context.Context) {
|
func (a *App) initAuthCenter(ctx context.Context) {
|
||||||
if a.cfg.IsSet(cfgContainersAccessBox) {
|
if a.config().IsSet(cfgContainersAccessBox) {
|
||||||
cnrID, err := a.resolveContainerID(ctx, cfgContainersAccessBox)
|
cnrID, err := a.resolveContainerID(ctx, cfgContainersAccessBox)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
a.log.Fatal(logs.CouldNotFetchAccessBoxContainerInfo, zap.Error(err))
|
a.log.Fatal(logs.CouldNotFetchAccessBoxContainerInfo, zap.Error(err))
|
||||||
|
@ -209,11 +213,11 @@ func (a *App) initAuthCenter(ctx context.Context) {
|
||||||
cfg := tokens.Config{
|
cfg := tokens.Config{
|
||||||
FrostFS: frostfs.NewAuthmateFrostFS(frostfs.NewFrostFS(a.pool, a.key), a.log),
|
FrostFS: frostfs.NewAuthmateFrostFS(frostfs.NewFrostFS(a.pool, a.key), a.log),
|
||||||
Key: a.key,
|
Key: a.key,
|
||||||
CacheConfig: getAccessBoxCacheConfig(a.cfg, a.log),
|
CacheConfig: getAccessBoxCacheConfig(a.config(), a.log),
|
||||||
RemovingCheckAfterDurations: fetchRemovingCheckInterval(a.cfg, a.log),
|
RemovingCheckAfterDurations: fetchRemovingCheckInterval(a.config(), a.log),
|
||||||
}
|
}
|
||||||
|
|
||||||
a.ctr = auth.New(tokens.New(cfg), a.cfg.GetStringSlice(cfgAllowedAccessKeyIDPrefixes), a.settings)
|
a.ctr = auth.New(tokens.New(cfg), a.config().GetStringSlice(cfgAllowedAccessKeyIDPrefixes), a.settings)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (a *App) initLayer(ctx context.Context) {
|
func (a *App) initLayer(ctx context.Context) {
|
||||||
|
@ -227,7 +231,7 @@ func (a *App) initLayer(ctx context.Context) {
|
||||||
user.IDFromKey(&gateOwner, a.key.PrivateKey.PublicKey)
|
user.IDFromKey(&gateOwner, a.key.PrivateKey.PublicKey)
|
||||||
|
|
||||||
var corsCnrInfo *data.BucketInfo
|
var corsCnrInfo *data.BucketInfo
|
||||||
if a.cfg.IsSet(cfgContainersCORS) {
|
if a.config().IsSet(cfgContainersCORS) {
|
||||||
corsCnrInfo, err = a.fetchContainerInfo(ctx, cfgContainersCORS)
|
corsCnrInfo, err = a.fetchContainerInfo(ctx, cfgContainersCORS)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
a.log.Fatal(logs.CouldNotFetchCORSContainerInfo, zap.Error(err))
|
a.log.Fatal(logs.CouldNotFetchCORSContainerInfo, zap.Error(err))
|
||||||
|
@ -235,7 +239,7 @@ func (a *App) initLayer(ctx context.Context) {
|
||||||
}
|
}
|
||||||
|
|
||||||
var lifecycleCnrInfo *data.BucketInfo
|
var lifecycleCnrInfo *data.BucketInfo
|
||||||
if a.cfg.IsSet(cfgContainersLifecycle) {
|
if a.config().IsSet(cfgContainersLifecycle) {
|
||||||
lifecycleCnrInfo, err = a.fetchContainerInfo(ctx, cfgContainersLifecycle)
|
lifecycleCnrInfo, err = a.fetchContainerInfo(ctx, cfgContainersLifecycle)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
a.log.Fatal(logs.CouldNotFetchLifecycleContainerInfo, zap.Error(err))
|
a.log.Fatal(logs.CouldNotFetchLifecycleContainerInfo, zap.Error(err))
|
||||||
|
@ -599,7 +603,7 @@ func (a *App) initMetrics() {
|
||||||
Logger: a.log,
|
Logger: a.log,
|
||||||
PoolStatistics: frostfs.NewPoolStatistic(a.pool),
|
PoolStatistics: frostfs.NewPoolStatistic(a.pool),
|
||||||
TreeStatistic: a.treePool,
|
TreeStatistic: a.treePool,
|
||||||
Enabled: a.cfg.GetBool(cfgPrometheusEnabled),
|
Enabled: a.config().GetBool(cfgPrometheusEnabled),
|
||||||
}
|
}
|
||||||
|
|
||||||
a.metrics = metrics.NewAppMetrics(cfg)
|
a.metrics = metrics.NewAppMetrics(cfg)
|
||||||
|
@ -609,9 +613,9 @@ func (a *App) initMetrics() {
|
||||||
|
|
||||||
func (a *App) initFrostfsID(ctx context.Context) {
|
func (a *App) initFrostfsID(ctx context.Context) {
|
||||||
cli, err := ffidcontract.New(ctx, ffidcontract.Config{
|
cli, err := ffidcontract.New(ctx, ffidcontract.Config{
|
||||||
RPCAddress: a.cfg.GetString(cfgRPCEndpoint),
|
RPCAddress: a.config().GetString(cfgRPCEndpoint),
|
||||||
Contract: a.cfg.GetString(cfgFrostfsIDContract),
|
Contract: a.config().GetString(cfgFrostfsIDContract),
|
||||||
ProxyContract: a.cfg.GetString(cfgProxyContract),
|
ProxyContract: a.config().GetString(cfgProxyContract),
|
||||||
Key: a.key,
|
Key: a.key,
|
||||||
Waiter: commonclient.WaiterOptions{
|
Waiter: commonclient.WaiterOptions{
|
||||||
IgnoreAlreadyExistsError: false,
|
IgnoreAlreadyExistsError: false,
|
||||||
|
@ -623,7 +627,7 @@ func (a *App) initFrostfsID(ctx context.Context) {
|
||||||
}
|
}
|
||||||
|
|
||||||
a.frostfsid, err = frostfsid.NewFrostFSID(frostfsid.Config{
|
a.frostfsid, err = frostfsid.NewFrostFSID(frostfsid.Config{
|
||||||
Cache: cache.NewFrostfsIDCache(getFrostfsIDCacheConfig(a.cfg, a.log)),
|
Cache: cache.NewFrostfsIDCache(getFrostfsIDCacheConfig(a.config(), a.log)),
|
||||||
FrostFSID: cli,
|
FrostFSID: cli,
|
||||||
Logger: a.log,
|
Logger: a.log,
|
||||||
})
|
})
|
||||||
|
@ -634,9 +638,9 @@ func (a *App) initFrostfsID(ctx context.Context) {
|
||||||
|
|
||||||
func (a *App) initPolicyStorage(ctx context.Context) {
|
func (a *App) initPolicyStorage(ctx context.Context) {
|
||||||
policyContract, err := contract.New(ctx, contract.Config{
|
policyContract, err := contract.New(ctx, contract.Config{
|
||||||
RPCAddress: a.cfg.GetString(cfgRPCEndpoint),
|
RPCAddress: a.config().GetString(cfgRPCEndpoint),
|
||||||
Contract: a.cfg.GetString(cfgPolicyContract),
|
Contract: a.config().GetString(cfgPolicyContract),
|
||||||
ProxyContract: a.cfg.GetString(cfgProxyContract),
|
ProxyContract: a.config().GetString(cfgProxyContract),
|
||||||
Key: a.key,
|
Key: a.key,
|
||||||
Waiter: commonclient.WaiterOptions{
|
Waiter: commonclient.WaiterOptions{
|
||||||
IgnoreAlreadyExistsError: false,
|
IgnoreAlreadyExistsError: false,
|
||||||
|
@ -649,7 +653,7 @@ func (a *App) initPolicyStorage(ctx context.Context) {
|
||||||
|
|
||||||
a.policyStorage = policy.NewStorage(policy.StorageConfig{
|
a.policyStorage = policy.NewStorage(policy.StorageConfig{
|
||||||
Contract: policyContract,
|
Contract: policyContract,
|
||||||
Cache: cache.NewMorphPolicyCache(getMorphPolicyCacheConfig(a.cfg, a.log)),
|
Cache: cache.NewMorphPolicyCache(getMorphPolicyCacheConfig(a.config(), a.log)),
|
||||||
Log: a.log,
|
Log: a.log,
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
@ -665,13 +669,13 @@ func (a *App) initResolver() {
|
||||||
func (a *App) getResolverConfig() *resolver.Config {
|
func (a *App) getResolverConfig() *resolver.Config {
|
||||||
return &resolver.Config{
|
return &resolver.Config{
|
||||||
FrostFS: frostfs.NewResolverFrostFS(a.pool),
|
FrostFS: frostfs.NewResolverFrostFS(a.pool),
|
||||||
RPCAddress: a.cfg.GetString(cfgRPCEndpoint),
|
RPCAddress: a.config().GetString(cfgRPCEndpoint),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (a *App) getResolverOrder() []string {
|
func (a *App) getResolverOrder() []string {
|
||||||
order := a.cfg.GetStringSlice(cfgResolveOrder)
|
order := a.config().GetStringSlice(cfgResolveOrder)
|
||||||
if a.cfg.GetString(cfgRPCEndpoint) == "" {
|
if a.config().GetString(cfgRPCEndpoint) == "" {
|
||||||
order = remove(order, resolver.NNSResolver)
|
order = remove(order, resolver.NNSResolver)
|
||||||
a.log.Warn(logs.ResolverNNSWontBeUsedSinceRPCEndpointIsntProvided)
|
a.log.Warn(logs.ResolverNNSWontBeUsedSinceRPCEndpointIsntProvided)
|
||||||
}
|
}
|
||||||
|
@ -689,15 +693,15 @@ func (a *App) initTracing(ctx context.Context) {
|
||||||
instanceID = a.servers[0].Address()
|
instanceID = a.servers[0].Address()
|
||||||
}
|
}
|
||||||
cfg := tracing.Config{
|
cfg := tracing.Config{
|
||||||
Enabled: a.cfg.GetBool(cfgTracingEnabled),
|
Enabled: a.config().GetBool(cfgTracingEnabled),
|
||||||
Exporter: tracing.Exporter(a.cfg.GetString(cfgTracingExporter)),
|
Exporter: tracing.Exporter(a.config().GetString(cfgTracingExporter)),
|
||||||
Endpoint: a.cfg.GetString(cfgTracingEndpoint),
|
Endpoint: a.config().GetString(cfgTracingEndpoint),
|
||||||
Service: "frostfs-s3-gw",
|
Service: "frostfs-s3-gw",
|
||||||
InstanceID: instanceID,
|
InstanceID: instanceID,
|
||||||
Version: version.Version,
|
Version: version.Version,
|
||||||
}
|
}
|
||||||
|
|
||||||
if trustedCa := a.cfg.GetString(cfgTracingTrustedCa); trustedCa != "" {
|
if trustedCa := a.config().GetString(cfgTracingTrustedCa); trustedCa != "" {
|
||||||
caBytes, err := os.ReadFile(trustedCa)
|
caBytes, err := os.ReadFile(trustedCa)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
a.log.Warn(logs.FailedToInitializeTracing, zap.Error(err))
|
a.log.Warn(logs.FailedToInitializeTracing, zap.Error(err))
|
||||||
|
@ -712,7 +716,7 @@ func (a *App) initTracing(ctx context.Context) {
|
||||||
cfg.ServerCaCertPool = certPool
|
cfg.ServerCaCertPool = certPool
|
||||||
}
|
}
|
||||||
|
|
||||||
attributes, err := fetchTracingAttributes(a.cfg)
|
attributes, err := fetchTracingAttributes(a.config())
|
||||||
if err != nil {
|
if err != nil {
|
||||||
a.log.Warn(logs.FailedToInitializeTracing, zap.Error(err))
|
a.log.Warn(logs.FailedToInitializeTracing, zap.Error(err))
|
||||||
return
|
return
|
||||||
|
@ -760,8 +764,8 @@ func (a *App) initPools(ctx context.Context) {
|
||||||
var prm pool.InitParameters
|
var prm pool.InitParameters
|
||||||
var prmTree treepool.InitParameters
|
var prmTree treepool.InitParameters
|
||||||
|
|
||||||
password := wallet.GetPassword(a.cfg, cfgWalletPassphrase)
|
password := wallet.GetPassword(a.config(), cfgWalletPassphrase)
|
||||||
key, err := wallet.GetKeyFromPath(a.cfg.GetString(cfgWalletPath), a.cfg.GetString(cfgWalletAddress), password)
|
key, err := wallet.GetKeyFromPath(a.config().GetString(cfgWalletPath), a.config().GetString(cfgWalletAddress), password)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
a.log.Fatal(logs.CouldNotLoadFrostFSPrivateKey, zap.Error(err))
|
a.log.Fatal(logs.CouldNotLoadFrostFSPrivateKey, zap.Error(err))
|
||||||
}
|
}
|
||||||
|
@ -770,36 +774,36 @@ func (a *App) initPools(ctx context.Context) {
|
||||||
prmTree.SetKey(key)
|
prmTree.SetKey(key)
|
||||||
a.log.Info(logs.UsingCredentials, zap.String("FrostFS", hex.EncodeToString(key.PublicKey().Bytes())))
|
a.log.Info(logs.UsingCredentials, zap.String("FrostFS", hex.EncodeToString(key.PublicKey().Bytes())))
|
||||||
|
|
||||||
for _, peer := range fetchPeers(a.log, a.cfg) {
|
for _, peer := range fetchPeers(a.log, a.config()) {
|
||||||
prm.AddNode(peer)
|
prm.AddNode(peer)
|
||||||
prmTree.AddNode(peer)
|
prmTree.AddNode(peer)
|
||||||
}
|
}
|
||||||
|
|
||||||
connTimeout := fetchConnectTimeout(a.cfg)
|
connTimeout := fetchConnectTimeout(a.config())
|
||||||
prm.SetNodeDialTimeout(connTimeout)
|
prm.SetNodeDialTimeout(connTimeout)
|
||||||
prmTree.SetNodeDialTimeout(connTimeout)
|
prmTree.SetNodeDialTimeout(connTimeout)
|
||||||
|
|
||||||
streamTimeout := fetchStreamTimeout(a.cfg)
|
streamTimeout := fetchStreamTimeout(a.config())
|
||||||
prm.SetNodeStreamTimeout(streamTimeout)
|
prm.SetNodeStreamTimeout(streamTimeout)
|
||||||
prmTree.SetNodeStreamTimeout(streamTimeout)
|
prmTree.SetNodeStreamTimeout(streamTimeout)
|
||||||
|
|
||||||
healthCheckTimeout := fetchHealthCheckTimeout(a.cfg)
|
healthCheckTimeout := fetchHealthCheckTimeout(a.config())
|
||||||
prm.SetHealthcheckTimeout(healthCheckTimeout)
|
prm.SetHealthcheckTimeout(healthCheckTimeout)
|
||||||
prmTree.SetHealthcheckTimeout(healthCheckTimeout)
|
prmTree.SetHealthcheckTimeout(healthCheckTimeout)
|
||||||
|
|
||||||
rebalanceInterval := fetchRebalanceInterval(a.cfg)
|
rebalanceInterval := fetchRebalanceInterval(a.config())
|
||||||
prm.SetClientRebalanceInterval(rebalanceInterval)
|
prm.SetClientRebalanceInterval(rebalanceInterval)
|
||||||
prmTree.SetClientRebalanceInterval(rebalanceInterval)
|
prmTree.SetClientRebalanceInterval(rebalanceInterval)
|
||||||
|
|
||||||
errorThreshold := fetchErrorThreshold(a.cfg)
|
errorThreshold := fetchErrorThreshold(a.config())
|
||||||
prm.SetErrorThreshold(errorThreshold)
|
prm.SetErrorThreshold(errorThreshold)
|
||||||
|
|
||||||
prm.SetGracefulCloseOnSwitchTimeout(fetchSetGracefulCloseOnSwitchTimeout(a.cfg))
|
prm.SetGracefulCloseOnSwitchTimeout(fetchSetGracefulCloseOnSwitchTimeout(a.config()))
|
||||||
|
|
||||||
prm.SetLogger(a.log)
|
prm.SetLogger(a.log)
|
||||||
prmTree.SetLogger(a.log)
|
prmTree.SetLogger(a.log)
|
||||||
|
|
||||||
prmTree.SetMaxRequestAttempts(a.cfg.GetInt(cfgTreePoolMaxAttempts))
|
prmTree.SetMaxRequestAttempts(a.config().GetInt(cfgTreePoolMaxAttempts))
|
||||||
|
|
||||||
interceptors := []grpc.DialOption{
|
interceptors := []grpc.DialOption{
|
||||||
grpc.WithUnaryInterceptor(grpctracing.NewUnaryClientInteceptor()),
|
grpc.WithUnaryInterceptor(grpctracing.NewUnaryClientInteceptor()),
|
||||||
|
@ -818,7 +822,7 @@ func (a *App) initPools(ctx context.Context) {
|
||||||
a.log.Fatal(logs.FailedToDialConnectionPool, zap.Error(err))
|
a.log.Fatal(logs.FailedToDialConnectionPool, zap.Error(err))
|
||||||
}
|
}
|
||||||
|
|
||||||
if a.cfg.GetBool(cfgTreePoolNetmapSupport) {
|
if a.config().GetBool(cfgTreePoolNetmapSupport) {
|
||||||
prmTree.SetNetMapInfoSource(frostfs.NewSource(frostfs.NewFrostFS(p, key), a.cache))
|
prmTree.SetNetMapInfoSource(frostfs.NewSource(frostfs.NewFrostFS(p, key), a.cache))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -895,10 +899,10 @@ func (a *App) Serve(ctx context.Context) {
|
||||||
srv := new(http.Server)
|
srv := new(http.Server)
|
||||||
srv.Handler = chiRouter
|
srv.Handler = chiRouter
|
||||||
srv.ErrorLog = zap.NewStdLog(a.log)
|
srv.ErrorLog = zap.NewStdLog(a.log)
|
||||||
srv.ReadTimeout = a.cfg.GetDuration(cfgWebReadTimeout)
|
srv.ReadTimeout = a.config().GetDuration(cfgWebReadTimeout)
|
||||||
srv.ReadHeaderTimeout = a.cfg.GetDuration(cfgWebReadHeaderTimeout)
|
srv.ReadHeaderTimeout = a.config().GetDuration(cfgWebReadHeaderTimeout)
|
||||||
srv.WriteTimeout = a.cfg.GetDuration(cfgWebWriteTimeout)
|
srv.WriteTimeout = a.config().GetDuration(cfgWebWriteTimeout)
|
||||||
srv.IdleTimeout = a.cfg.GetDuration(cfgWebIdleTimeout)
|
srv.IdleTimeout = a.config().GetDuration(cfgWebIdleTimeout)
|
||||||
|
|
||||||
a.startServices()
|
a.startServices()
|
||||||
|
|
||||||
|
@ -951,11 +955,11 @@ func shutdownContext() (context.Context, context.CancelFunc) {
|
||||||
func (a *App) configReload(ctx context.Context) {
|
func (a *App) configReload(ctx context.Context) {
|
||||||
a.log.Info(logs.SIGHUPConfigReloadStarted)
|
a.log.Info(logs.SIGHUPConfigReloadStarted)
|
||||||
|
|
||||||
if !a.cfg.IsSet(cmdConfig) && !a.cfg.IsSet(cmdConfigDir) {
|
if !a.config().IsSet(cmdConfig) && !a.config().IsSet(cmdConfigDir) {
|
||||||
a.log.Warn(logs.FailedToReloadConfigBecauseItsMissed)
|
a.log.Warn(logs.FailedToReloadConfigBecauseItsMissed)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
if err := readInConfig(a.cfg); err != nil {
|
if err := a.cfg.reload(); err != nil {
|
||||||
a.log.Warn(logs.FailedToReloadConfig, zap.Error(err))
|
a.log.Warn(logs.FailedToReloadConfig, zap.Error(err))
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
@ -975,7 +979,7 @@ func (a *App) configReload(ctx context.Context) {
|
||||||
|
|
||||||
a.updateSettings()
|
a.updateSettings()
|
||||||
|
|
||||||
a.metrics.SetEnabled(a.cfg.GetBool(cfgPrometheusEnabled))
|
a.metrics.SetEnabled(a.config().GetBool(cfgPrometheusEnabled))
|
||||||
a.initTracing(ctx)
|
a.initTracing(ctx)
|
||||||
a.setHealthStatus()
|
a.setHealthStatus()
|
||||||
|
|
||||||
|
@ -983,33 +987,33 @@ func (a *App) configReload(ctx context.Context) {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (a *App) updateSettings() {
|
func (a *App) updateSettings() {
|
||||||
if lvl, err := getLogLevel(a.cfg); err != nil {
|
if lvl, err := getLogLevel(a.config()); err != nil {
|
||||||
a.log.Warn(logs.LogLevelWontBeUpdated, zap.Error(err))
|
a.log.Warn(logs.LogLevelWontBeUpdated, zap.Error(err))
|
||||||
} else {
|
} else {
|
||||||
a.settings.logLevel.SetLevel(lvl)
|
a.settings.logLevel.SetLevel(lvl)
|
||||||
}
|
}
|
||||||
|
|
||||||
if err := a.settings.dialerSource.Update(fetchMultinetConfig(a.cfg, a.log)); err != nil {
|
if err := a.settings.dialerSource.Update(fetchMultinetConfig(a.config(), a.log)); err != nil {
|
||||||
a.log.Warn(logs.MultinetConfigWontBeUpdated, zap.Error(err))
|
a.log.Warn(logs.MultinetConfigWontBeUpdated, zap.Error(err))
|
||||||
}
|
}
|
||||||
|
|
||||||
a.settings.update(a.cfg, a.log)
|
a.settings.update(a.config(), a.log)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (a *App) startServices() {
|
func (a *App) startServices() {
|
||||||
a.services = a.services[:0]
|
a.services = a.services[:0]
|
||||||
|
|
||||||
pprofService := NewPprofService(a.cfg, a.log)
|
pprofService := NewPprofService(a.config(), a.log)
|
||||||
a.services = append(a.services, pprofService)
|
a.services = append(a.services, pprofService)
|
||||||
go pprofService.Start()
|
go pprofService.Start()
|
||||||
|
|
||||||
prometheusService := NewPrometheusService(a.cfg, a.log, a.metrics.Handler())
|
prometheusService := NewPrometheusService(a.config(), a.log, a.metrics.Handler())
|
||||||
a.services = append(a.services, prometheusService)
|
a.services = append(a.services, prometheusService)
|
||||||
go prometheusService.Start()
|
go prometheusService.Start()
|
||||||
}
|
}
|
||||||
|
|
||||||
func (a *App) initServers(ctx context.Context) {
|
func (a *App) initServers(ctx context.Context) {
|
||||||
serversInfo := fetchServers(a.cfg, a.log)
|
serversInfo := fetchServers(a.config(), a.log)
|
||||||
|
|
||||||
a.servers = make([]Server, 0, len(serversInfo))
|
a.servers = make([]Server, 0, len(serversInfo))
|
||||||
for _, serverInfo := range serversInfo {
|
for _, serverInfo := range serversInfo {
|
||||||
|
@ -1036,7 +1040,7 @@ func (a *App) initServers(ctx context.Context) {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (a *App) updateServers() error {
|
func (a *App) updateServers() error {
|
||||||
serversInfo := fetchServers(a.cfg, a.log)
|
serversInfo := fetchServers(a.config(), a.log)
|
||||||
|
|
||||||
a.mu.Lock()
|
a.mu.Lock()
|
||||||
defer a.mu.Unlock()
|
defer a.mu.Unlock()
|
||||||
|
@ -1171,7 +1175,7 @@ func (a *App) setRuntimeParameters() {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
softMemoryLimit := fetchSoftMemoryLimit(a.cfg)
|
softMemoryLimit := fetchSoftMemoryLimit(a.config())
|
||||||
previous := debug.SetMemoryLimit(softMemoryLimit)
|
previous := debug.SetMemoryLimit(softMemoryLimit)
|
||||||
if softMemoryLimit != previous {
|
if softMemoryLimit != previous {
|
||||||
a.log.Info(logs.RuntimeSoftMemoryLimitUpdated,
|
a.log.Info(logs.RuntimeSoftMemoryLimitUpdated,
|
||||||
|
@ -1247,7 +1251,7 @@ func (a *App) fetchContainerInfo(ctx context.Context, cfgKey string) (info *data
|
||||||
}
|
}
|
||||||
|
|
||||||
func (a *App) resolveContainerID(ctx context.Context, cfgKey string) (cid.ID, error) {
|
func (a *App) resolveContainerID(ctx context.Context, cfgKey string) (cid.ID, error) {
|
||||||
containerString := a.cfg.GetString(cfgKey)
|
containerString := a.config().GetString(cfgKey)
|
||||||
|
|
||||||
var id cid.ID
|
var id cid.ID
|
||||||
if err := id.DecodeString(containerString); err != nil {
|
if err := id.DecodeString(containerString); err != nil {
|
||||||
|
|
|
@ -10,6 +10,7 @@ import (
|
||||||
"sort"
|
"sort"
|
||||||
"strconv"
|
"strconv"
|
||||||
"strings"
|
"strings"
|
||||||
|
"sync"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api"
|
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api"
|
||||||
|
@ -301,6 +302,49 @@ var ignore = map[string]struct{}{
|
||||||
cmdVersion: {},
|
cmdVersion: {},
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type appCfg struct {
|
||||||
|
flags *pflag.FlagSet
|
||||||
|
|
||||||
|
mu sync.RWMutex
|
||||||
|
settings *viper.Viper
|
||||||
|
}
|
||||||
|
|
||||||
|
func (a *appCfg) reload() error {
|
||||||
|
old := a.config()
|
||||||
|
|
||||||
|
v, err := newViper(a.flags)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
if old.IsSet(cmdConfig) {
|
||||||
|
v.Set(cmdConfig, old.Get(cmdConfig))
|
||||||
|
}
|
||||||
|
if old.IsSet(cmdConfigDir) {
|
||||||
|
v.Set(cmdConfigDir, old.Get(cmdConfigDir))
|
||||||
|
}
|
||||||
|
|
||||||
|
if err = readInConfig(v); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
a.setConfig(v)
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (a *appCfg) config() *viper.Viper {
|
||||||
|
a.mu.RLock()
|
||||||
|
defer a.mu.RUnlock()
|
||||||
|
|
||||||
|
return a.settings
|
||||||
|
}
|
||||||
|
|
||||||
|
func (a *appCfg) setConfig(v *viper.Viper) {
|
||||||
|
a.mu.Lock()
|
||||||
|
a.settings = v
|
||||||
|
a.mu.Unlock()
|
||||||
|
}
|
||||||
|
|
||||||
func fetchConnectTimeout(cfg *viper.Viper) time.Duration {
|
func fetchConnectTimeout(cfg *viper.Viper) time.Duration {
|
||||||
connTimeout := cfg.GetDuration(cfgConnectTimeout)
|
connTimeout := cfg.GetDuration(cfgConnectTimeout)
|
||||||
if connTimeout <= 0 {
|
if connTimeout <= 0 {
|
||||||
|
@ -878,7 +922,7 @@ func fetchTombstoneWorkerPoolSize(v *viper.Viper) int {
|
||||||
return tombstoneWorkerPoolSize
|
return tombstoneWorkerPoolSize
|
||||||
}
|
}
|
||||||
|
|
||||||
func newSettings() *viper.Viper {
|
func newViper(flags *pflag.FlagSet) (*viper.Viper, error) {
|
||||||
v := viper.New()
|
v := viper.New()
|
||||||
|
|
||||||
v.AutomaticEnv()
|
v.AutomaticEnv()
|
||||||
|
@ -887,6 +931,16 @@ func newSettings() *viper.Viper {
|
||||||
v.SetEnvKeyReplacer(strings.NewReplacer(".", "_"))
|
v.SetEnvKeyReplacer(strings.NewReplacer(".", "_"))
|
||||||
v.AllowEmptyEnv(true)
|
v.AllowEmptyEnv(true)
|
||||||
|
|
||||||
|
setDefaults(v, flags)
|
||||||
|
|
||||||
|
if v.IsSet(cfgServer+".0."+cfgTLSKeyFile) && v.IsSet(cfgServer+".0."+cfgTLSCertFile) {
|
||||||
|
v.Set(cfgServer+".0."+cfgTLSEnabled, true)
|
||||||
|
}
|
||||||
|
|
||||||
|
return v, bindFlags(v, flags)
|
||||||
|
}
|
||||||
|
|
||||||
|
func newSettings() *appCfg {
|
||||||
// flags setup:
|
// flags setup:
|
||||||
flags := pflag.NewFlagSet("commandline", pflag.ExitOnError)
|
flags := pflag.NewFlagSet("commandline", pflag.ExitOnError)
|
||||||
flags.SetOutput(os.Stdout)
|
flags.SetOutput(os.Stdout)
|
||||||
|
@ -914,15 +968,71 @@ func newSettings() *viper.Viper {
|
||||||
flags.String(cfgTLSCertFile, "", "TLS certificate file to use")
|
flags.String(cfgTLSCertFile, "", "TLS certificate file to use")
|
||||||
flags.String(cfgTLSKeyFile, "", "TLS key file to use")
|
flags.String(cfgTLSKeyFile, "", "TLS key file to use")
|
||||||
|
|
||||||
peers := flags.StringArrayP(cfgPeers, "p", nil, "set FrostFS nodes")
|
flags.StringArrayP(cfgPeers, "p", nil, "set FrostFS nodes")
|
||||||
|
|
||||||
flags.StringP(cfgRPCEndpoint, "r", "", "set RPC endpoint")
|
flags.StringP(cfgRPCEndpoint, "r", "", "set RPC endpoint")
|
||||||
resolveMethods := flags.StringSlice(cfgResolveOrder, []string{resolver.DNSResolver}, "set bucket name resolve order")
|
flags.StringSlice(cfgResolveOrder, []string{resolver.DNSResolver}, "set bucket name resolve order")
|
||||||
|
|
||||||
domains := flags.StringSliceP(cfgListenDomains, "d", nil, "set domains to be listened")
|
flags.StringSliceP(cfgListenDomains, "d", nil, "set domains to be listened")
|
||||||
|
|
||||||
// set defaults:
|
if err := flags.Parse(os.Args); err != nil {
|
||||||
|
panic(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
v, err := newViper(flags)
|
||||||
|
if err != nil {
|
||||||
|
panic(fmt.Errorf("bind flags: %w", err))
|
||||||
|
}
|
||||||
|
|
||||||
|
switch {
|
||||||
|
case help != nil && *help:
|
||||||
|
fmt.Printf("FrostFS S3 gateway %s\n", version.Version)
|
||||||
|
flags.PrintDefaults()
|
||||||
|
|
||||||
|
fmt.Println()
|
||||||
|
fmt.Println("Default environments:")
|
||||||
|
fmt.Println()
|
||||||
|
keys := v.AllKeys()
|
||||||
|
sort.Strings(keys)
|
||||||
|
|
||||||
|
for i := range keys {
|
||||||
|
if _, ok := ignore[keys[i]]; ok {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
defaultValue := v.GetString(keys[i])
|
||||||
|
if len(defaultValue) == 0 {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
k := strings.Replace(keys[i], ".", "_", -1)
|
||||||
|
fmt.Printf("%s_%s = %s\n", envPrefix, strings.ToUpper(k), defaultValue)
|
||||||
|
}
|
||||||
|
|
||||||
|
fmt.Println()
|
||||||
|
fmt.Println("Peers preset:")
|
||||||
|
fmt.Println()
|
||||||
|
|
||||||
|
fmt.Printf("%s_%s_[N]_ADDRESS = string\n", envPrefix, strings.ToUpper(cfgPeers))
|
||||||
|
fmt.Printf("%s_%s_[N]_WEIGHT = 0..1 (float)\n", envPrefix, strings.ToUpper(cfgPeers))
|
||||||
|
|
||||||
|
os.Exit(0)
|
||||||
|
case versionFlag != nil && *versionFlag:
|
||||||
|
fmt.Printf("FrostFS S3 Gateway\nVersion: %s\nGoVersion: %s\n", version.Version, runtime.Version())
|
||||||
|
os.Exit(0)
|
||||||
|
}
|
||||||
|
|
||||||
|
if err = readInConfig(v); err != nil {
|
||||||
|
panic(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
return &appCfg{
|
||||||
|
flags: flags,
|
||||||
|
settings: v,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func setDefaults(v *viper.Viper, flags *pflag.FlagSet) {
|
||||||
v.SetDefault(cfgAccessBoxCacheRemovingCheckInterval, defaultAccessBoxCacheRemovingCheckInterval)
|
v.SetDefault(cfgAccessBoxCacheRemovingCheckInterval, defaultAccessBoxCacheRemovingCheckInterval)
|
||||||
|
|
||||||
// logger:
|
// logger:
|
||||||
|
@ -986,78 +1096,21 @@ func newSettings() *viper.Viper {
|
||||||
// multinet
|
// multinet
|
||||||
v.SetDefault(cfgMultinetFallbackDelay, defaultMultinetFallbackDelay)
|
v.SetDefault(cfgMultinetFallbackDelay, defaultMultinetFallbackDelay)
|
||||||
|
|
||||||
// Bind flags
|
if resolveMethods, err := flags.GetStringSlice(cfgResolveOrder); err == nil {
|
||||||
if err := bindFlags(v, flags); err != nil {
|
v.SetDefault(cfgResolveOrder, resolveMethods)
|
||||||
panic(fmt.Errorf("bind flags: %w", err))
|
|
||||||
}
|
}
|
||||||
|
|
||||||
if err := flags.Parse(os.Args); err != nil {
|
if peers, err := flags.GetStringArray(cfgPeers); err == nil {
|
||||||
panic(err)
|
for i := range peers {
|
||||||
}
|
v.SetDefault(cfgPeers+"."+strconv.Itoa(i)+".address", peers[i])
|
||||||
|
|
||||||
if v.IsSet(cfgServer+".0."+cfgTLSKeyFile) && v.IsSet(cfgServer+".0."+cfgTLSCertFile) {
|
|
||||||
v.Set(cfgServer+".0."+cfgTLSEnabled, true)
|
|
||||||
}
|
|
||||||
|
|
||||||
if resolveMethods != nil {
|
|
||||||
v.SetDefault(cfgResolveOrder, *resolveMethods)
|
|
||||||
}
|
|
||||||
|
|
||||||
if peers != nil && len(*peers) > 0 {
|
|
||||||
for i := range *peers {
|
|
||||||
v.SetDefault(cfgPeers+"."+strconv.Itoa(i)+".address", (*peers)[i])
|
|
||||||
v.SetDefault(cfgPeers+"."+strconv.Itoa(i)+".weight", 1)
|
v.SetDefault(cfgPeers+"."+strconv.Itoa(i)+".weight", 1)
|
||||||
v.SetDefault(cfgPeers+"."+strconv.Itoa(i)+".priority", 1)
|
v.SetDefault(cfgPeers+"."+strconv.Itoa(i)+".priority", 1)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if domains != nil && len(*domains) > 0 {
|
if domains, err := flags.GetStringSlice(cfgListenDomains); err == nil && len(domains) > 0 {
|
||||||
v.SetDefault(cfgListenDomains, *domains)
|
v.SetDefault(cfgListenDomains, domains)
|
||||||
}
|
}
|
||||||
|
|
||||||
switch {
|
|
||||||
case help != nil && *help:
|
|
||||||
fmt.Printf("FrostFS S3 gateway %s\n", version.Version)
|
|
||||||
flags.PrintDefaults()
|
|
||||||
|
|
||||||
fmt.Println()
|
|
||||||
fmt.Println("Default environments:")
|
|
||||||
fmt.Println()
|
|
||||||
keys := v.AllKeys()
|
|
||||||
sort.Strings(keys)
|
|
||||||
|
|
||||||
for i := range keys {
|
|
||||||
if _, ok := ignore[keys[i]]; ok {
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
|
|
||||||
defaultValue := v.GetString(keys[i])
|
|
||||||
if len(defaultValue) == 0 {
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
|
|
||||||
k := strings.Replace(keys[i], ".", "_", -1)
|
|
||||||
fmt.Printf("%s_%s = %s\n", envPrefix, strings.ToUpper(k), defaultValue)
|
|
||||||
}
|
|
||||||
|
|
||||||
fmt.Println()
|
|
||||||
fmt.Println("Peers preset:")
|
|
||||||
fmt.Println()
|
|
||||||
|
|
||||||
fmt.Printf("%s_%s_[N]_ADDRESS = string\n", envPrefix, strings.ToUpper(cfgPeers))
|
|
||||||
fmt.Printf("%s_%s_[N]_WEIGHT = 0..1 (float)\n", envPrefix, strings.ToUpper(cfgPeers))
|
|
||||||
|
|
||||||
os.Exit(0)
|
|
||||||
case versionFlag != nil && *versionFlag:
|
|
||||||
fmt.Printf("FrostFS S3 Gateway\nVersion: %s\nGoVersion: %s\n", version.Version, runtime.Version())
|
|
||||||
os.Exit(0)
|
|
||||||
}
|
|
||||||
|
|
||||||
if err := readInConfig(v); err != nil {
|
|
||||||
panic(err)
|
|
||||||
}
|
|
||||||
|
|
||||||
return v
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func bindFlags(v *viper.Viper, flags *pflag.FlagSet) error {
|
func bindFlags(v *viper.Viper, flags *pflag.FlagSet) error {
|
||||||
|
|
50
cmd/s3-gw/app_settings_test.go
Normal file
50
cmd/s3-gw/app_settings_test.go
Normal file
|
@ -0,0 +1,50 @@
|
||||||
|
package main
|
||||||
|
|
||||||
|
import (
|
||||||
|
"os"
|
||||||
|
"testing"
|
||||||
|
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/resolver"
|
||||||
|
"github.com/stretchr/testify/require"
|
||||||
|
)
|
||||||
|
|
||||||
|
func TestConfigReload(t *testing.T) {
|
||||||
|
f, err := os.CreateTemp("", "conf")
|
||||||
|
require.NoError(t, err)
|
||||||
|
defer func() {
|
||||||
|
require.NoError(t, os.Remove(f.Name()))
|
||||||
|
}()
|
||||||
|
|
||||||
|
confData := `
|
||||||
|
pprof:
|
||||||
|
enabled: true
|
||||||
|
|
||||||
|
frostfsid:
|
||||||
|
contract: name.nns
|
||||||
|
|
||||||
|
resolve_order:
|
||||||
|
- nns
|
||||||
|
`
|
||||||
|
|
||||||
|
_, err = f.WriteString(confData)
|
||||||
|
require.NoError(t, err)
|
||||||
|
require.NoError(t, f.Close())
|
||||||
|
|
||||||
|
cfg := newSettings()
|
||||||
|
|
||||||
|
require.NoError(t, cfg.flags.Parse([]string{"--config", f.Name(), "--max_clients_count", "10"}))
|
||||||
|
require.NoError(t, cfg.reload())
|
||||||
|
|
||||||
|
require.True(t, cfg.config().GetBool(cfgPProfEnabled))
|
||||||
|
require.Equal(t, "name.nns", cfg.config().GetString(cfgFrostfsIDContract))
|
||||||
|
require.Equal(t, []string{resolver.NNSResolver}, cfg.config().GetStringSlice(cfgResolveOrder))
|
||||||
|
require.Equal(t, 10, cfg.config().GetInt(cfgMaxClientsCount))
|
||||||
|
|
||||||
|
require.NoError(t, os.Truncate(f.Name(), 0))
|
||||||
|
require.NoError(t, cfg.reload())
|
||||||
|
|
||||||
|
require.False(t, cfg.config().GetBool(cfgPProfEnabled))
|
||||||
|
require.Equal(t, "frostfsid.frostfs", cfg.config().GetString(cfgFrostfsIDContract))
|
||||||
|
require.Equal(t, []string{resolver.DNSResolver}, cfg.config().GetStringSlice(cfgResolveOrder))
|
||||||
|
require.Equal(t, 10, cfg.config().GetInt(cfgMaxClientsCount))
|
||||||
|
}
|
|
@ -8,9 +8,9 @@ import (
|
||||||
|
|
||||||
func main() {
|
func main() {
|
||||||
g, _ := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM)
|
g, _ := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM)
|
||||||
v := newSettings()
|
cfg := newSettings()
|
||||||
|
|
||||||
a := newApp(g, v)
|
a := newApp(g, cfg)
|
||||||
|
|
||||||
go a.Serve(g)
|
go a.Serve(g)
|
||||||
|
|
||||||
|
|
Loading…
Reference in a new issue