[#117] Refactor fetch/parse config parameters functions #189
2 changed files with 197 additions and 126 deletions
157
cmd/s3-gw/app.go
157
cmd/s3-gw/app.go
|
@ -8,7 +8,6 @@ import (
|
|||
"net/http"
|
||||
"os"
|
||||
"os/signal"
|
||||
"strconv"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"syscall"
|
||||
|
@ -189,15 +188,6 @@ func (s *appSettings) setBypassContentEncodingInChunks(bypass bool) {
|
|||
s.bypassContentEncodingInChunks.Store(bypass)
|
||||
}
|
||||
|
||||
func getDefaultPolicyValue(v *viper.Viper) string {
|
||||
defaultPolicyStr := handler.DefaultPolicy
|
||||
if v.IsSet(cfgPolicyDefault) {
|
||||
defaultPolicyStr = v.GetString(cfgPolicyDefault)
|
||||
}
|
||||
|
||||
return defaultPolicyStr
|
||||
}
|
||||
|
||||
func (a *App) initAPI(ctx context.Context) {
|
||||
a.initLayer(ctx)
|
||||
a.initHandler()
|
||||
|
@ -270,15 +260,9 @@ func (a *App) shutdownTracing() {
|
|||
func newMaxClients(cfg *viper.Viper) maxClientsConfig {
|
||||
config := maxClientsConfig{}
|
||||
|
||||
config.count = cfg.GetInt(cfgMaxClientsCount)
|
||||
if config.count <= 0 {
|
||||
config.count = defaultMaxClientsCount
|
||||
}
|
||||
config.count = fetchMaxClientsCount(cfg)
|
||||
|
||||
config.deadline = cfg.GetDuration(cfgMaxClientsDeadline)
|
||||
if config.deadline <= 0 {
|
||||
config.deadline = defaultMaxClientsDeadline
|
||||
}
|
||||
config.deadline = fetchMaxClientsDeadline(cfg)
|
||||
|
||||
return config
|
||||
}
|
||||
|
@ -302,38 +286,23 @@ func getPools(ctx context.Context, logger *zap.Logger, cfg *viper.Viper) (*pool.
|
|||
prmTree.AddNode(peer)
|
||||
}
|
||||
|
||||
connTimeout := cfg.GetDuration(cfgConnectTimeout)
|
||||
if connTimeout <= 0 {
|
||||
connTimeout = defaultConnectTimeout
|
||||
}
|
||||
connTimeout := fetchConnectTimeout(cfg)
|
||||
prm.SetNodeDialTimeout(connTimeout)
|
||||
prmTree.SetNodeDialTimeout(connTimeout)
|
||||
|
||||
streamTimeout := cfg.GetDuration(cfgStreamTimeout)
|
||||
if streamTimeout <= 0 {
|
||||
streamTimeout = defaultStreamTimeout
|
||||
}
|
||||
streamTimeout := fetchStreamTimeout(cfg)
|
||||
prm.SetNodeStreamTimeout(streamTimeout)
|
||||
prmTree.SetNodeStreamTimeout(streamTimeout)
|
||||
|
||||
healthCheckTimeout := cfg.GetDuration(cfgHealthcheckTimeout)
|
||||
if healthCheckTimeout <= 0 {
|
||||
healthCheckTimeout = defaultHealthcheckTimeout
|
||||
}
|
||||
healthCheckTimeout := fetchHealthCheckTimeout(cfg)
|
||||
prm.SetHealthcheckTimeout(healthCheckTimeout)
|
||||
prmTree.SetHealthcheckTimeout(healthCheckTimeout)
|
||||
|
||||
rebalanceInterval := cfg.GetDuration(cfgRebalanceInterval)
|
||||
if rebalanceInterval <= 0 {
|
||||
rebalanceInterval = defaultRebalanceInterval
|
||||
}
|
||||
rebalanceInterval := fetchRebalanceInterval(cfg)
|
||||
prm.SetClientRebalanceInterval(rebalanceInterval)
|
||||
prmTree.SetClientRebalanceInterval(rebalanceInterval)
|
||||
|
||||
errorThreshold := cfg.GetUint32(cfgPoolErrorThreshold)
|
||||
if errorThreshold <= 0 {
|
||||
errorThreshold = defaultPoolErrorThreshold
|
||||
}
|
||||
errorThreshold := fetchErrorThreshold(cfg)
|
||||
prm.SetErrorThreshold(errorThreshold)
|
||||
prm.SetLogger(logger)
|
||||
prmTree.SetLogger(logger)
|
||||
|
@ -380,7 +349,7 @@ func newPlacementPolicy(l *zap.Logger, v *viper.Viper) (*placementPolicy, error)
|
|||
policies.updateCopiesNumbers(l, v)
|
||||
policies.updateDefaultCopiesNumbers(l, v)
|
||||
|
||||
return policies, policies.updatePolicy(getDefaultPolicyValue(v), v.GetString(cfgPolicyRegionMapFile))
|
||||
return policies, policies.updatePolicy(v)
|
||||
}
|
||||
|
||||
func (p *placementPolicy) DefaultPlacementPolicy() netmap.PlacementPolicy {
|
||||
|
@ -412,7 +381,7 @@ func (p *placementPolicy) DefaultCopiesNumbers() []uint32 {
|
|||
}
|
||||
|
||||
func (p *placementPolicy) update(l *zap.Logger, v *viper.Viper) {
|
||||
if err := p.updatePolicy(getDefaultPolicyValue(v), v.GetString(cfgPolicyRegionMapFile)); err != nil {
|
||||
if err := p.updatePolicy(v); err != nil {
|
||||
l.Warn("policies won't be updated", zap.Error(err))
|
||||
}
|
||||
|
||||
|
@ -420,31 +389,15 @@ func (p *placementPolicy) update(l *zap.Logger, v *viper.Viper) {
|
|||
p.updateDefaultCopiesNumbers(l, v)
|
||||
}
|
||||
|
||||
func (p *placementPolicy) updatePolicy(defaultPolicy string, regionPolicyFilepath string) error {
|
||||
var defaultPlacementPolicy netmap.PlacementPolicy
|
||||
if err := defaultPlacementPolicy.DecodeString(defaultPolicy); err != nil {
|
||||
return fmt.Errorf("parse default policy '%s': %w", defaultPolicy, err)
|
||||
}
|
||||
|
||||
regionPolicyMap, err := readRegionMap(regionPolicyFilepath)
|
||||
func (p *placementPolicy) updatePolicy(v *viper.Viper) error {
|
||||
defaultPlacementPolicy, err := fetchDefaultPolicy(v)
|
||||
if err != nil {
|
||||
return fmt.Errorf("read region map file: %w", err)
|
||||
return err
|
||||
}
|
||||
|
||||
regionMap := make(map[string]netmap.PlacementPolicy, len(regionPolicyMap))
|
||||
for region, policy := range regionPolicyMap {
|
||||
var pp netmap.PlacementPolicy
|
||||
if err = pp.DecodeString(policy); err == nil {
|
||||
regionMap[region] = pp
|
||||
continue
|
||||
}
|
||||
|
||||
if err = pp.UnmarshalJSON([]byte(policy)); err == nil {
|
||||
regionMap[region] = pp
|
||||
continue
|
||||
}
|
||||
|
||||
return fmt.Errorf("parse region '%s' to policy mapping: %w", region, err)
|
||||
regionMap, err := fetchRegionMappingPolicies(v)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
p.mu.Lock()
|
||||
|
@ -699,14 +652,7 @@ func (a *App) stopServices() {
|
|||
func getNotificationsOptions(v *viper.Viper, l *zap.Logger) *notifications.Options {
|
||||
cfg := notifications.Options{}
|
||||
cfg.URL = v.GetString(cfgNATSEndpoint)
|
||||
cfg.Timeout = v.GetDuration(cfgNATSTimeout)
|
||||
if cfg.Timeout <= 0 {
|
||||
l.Error("invalid lifetime, using default value (in seconds)",
|
||||
zap.String("parameter", cfgNATSTimeout),
|
||||
zap.Duration("value in config", cfg.Timeout),
|
||||
zap.Duration("default", notifications.DefaultTimeout))
|
||||
cfg.Timeout = notifications.DefaultTimeout
|
||||
}
|
||||
cfg.Timeout = fetchNATSTimeout(v, l)
|
||||
cfg.TLSCertFilepath = v.GetString(cfgNATSTLSCertFile)
|
||||
cfg.TLSAuthPrivateKeyFilePath = v.GetString(cfgNATSAuthPrivateKeyFile)
|
||||
cfg.RootCAFiles = v.GetStringSlice(cfgNATSRootCAFiles)
|
||||
|
@ -717,62 +663,32 @@ func getNotificationsOptions(v *viper.Viper, l *zap.Logger) *notifications.Optio
|
|||
func getCacheOptions(v *viper.Viper, l *zap.Logger) *layer.CachesConfig {
|
||||
cacheCfg := layer.DefaultCachesConfigs(l)
|
||||
|
||||
cacheCfg.Objects.Lifetime = getLifetime(v, l, cfgObjectsCacheLifetime, cacheCfg.Objects.Lifetime)
|
||||
cacheCfg.Objects.Size = getSize(v, l, cfgObjectsCacheSize, cacheCfg.Objects.Size)
|
||||
cacheCfg.Objects.Lifetime = fetchCacheLifetime(v, l, cfgObjectsCacheLifetime, cacheCfg.Objects.Lifetime)
|
||||
cacheCfg.Objects.Size = fetchCacheSize(v, l, cfgObjectsCacheSize, cacheCfg.Objects.Size)
|
||||
|
||||
cacheCfg.ObjectsList.Lifetime = getLifetime(v, l, cfgListObjectsCacheLifetime, cacheCfg.ObjectsList.Lifetime)
|
||||
cacheCfg.ObjectsList.Size = getSize(v, l, cfgListObjectsCacheSize, cacheCfg.ObjectsList.Size)
|
||||
cacheCfg.ObjectsList.Lifetime = fetchCacheLifetime(v, l, cfgListObjectsCacheLifetime, cacheCfg.ObjectsList.Lifetime)
|
||||
cacheCfg.ObjectsList.Size = fetchCacheSize(v, l, cfgListObjectsCacheSize, cacheCfg.ObjectsList.Size)
|
||||
|
||||
cacheCfg.Buckets.Lifetime = getLifetime(v, l, cfgBucketsCacheLifetime, cacheCfg.Buckets.Lifetime)
|
||||
cacheCfg.Buckets.Size = getSize(v, l, cfgBucketsCacheSize, cacheCfg.Buckets.Size)
|
||||
cacheCfg.Buckets.Lifetime = fetchCacheLifetime(v, l, cfgBucketsCacheLifetime, cacheCfg.Buckets.Lifetime)
|
||||
cacheCfg.Buckets.Size = fetchCacheSize(v, l, cfgBucketsCacheSize, cacheCfg.Buckets.Size)
|
||||
|
||||
cacheCfg.Names.Lifetime = getLifetime(v, l, cfgNamesCacheLifetime, cacheCfg.Names.Lifetime)
|
||||
cacheCfg.Names.Size = getSize(v, l, cfgNamesCacheSize, cacheCfg.Names.Size)
|
||||
cacheCfg.Names.Lifetime = fetchCacheLifetime(v, l, cfgNamesCacheLifetime, cacheCfg.Names.Lifetime)
|
||||
cacheCfg.Names.Size = fetchCacheSize(v, l, cfgNamesCacheSize, cacheCfg.Names.Size)
|
||||
|
||||
cacheCfg.System.Lifetime = getLifetime(v, l, cfgSystemCacheLifetime, cacheCfg.System.Lifetime)
|
||||
cacheCfg.System.Size = getSize(v, l, cfgSystemCacheSize, cacheCfg.System.Size)
|
||||
cacheCfg.System.Lifetime = fetchCacheLifetime(v, l, cfgSystemCacheLifetime, cacheCfg.System.Lifetime)
|
||||
cacheCfg.System.Size = fetchCacheSize(v, l, cfgSystemCacheSize, cacheCfg.System.Size)
|
||||
|
||||
cacheCfg.AccessControl.Lifetime = getLifetime(v, l, cfgAccessControlCacheLifetime, cacheCfg.AccessControl.Lifetime)
|
||||
cacheCfg.AccessControl.Size = getSize(v, l, cfgAccessControlCacheSize, cacheCfg.AccessControl.Size)
|
||||
cacheCfg.AccessControl.Lifetime = fetchCacheLifetime(v, l, cfgAccessControlCacheLifetime, cacheCfg.AccessControl.Lifetime)
|
||||
cacheCfg.AccessControl.Size = fetchCacheSize(v, l, cfgAccessControlCacheSize, cacheCfg.AccessControl.Size)
|
||||
|
||||
return cacheCfg
|
||||
}
|
||||
|
||||
func getLifetime(v *viper.Viper, l *zap.Logger, cfgEntry string, defaultValue time.Duration) time.Duration {
|
||||
if v.IsSet(cfgEntry) {
|
||||
lifetime := v.GetDuration(cfgEntry)
|
||||
if lifetime <= 0 {
|
||||
l.Error("invalid lifetime, using default value (in seconds)",
|
||||
zap.String("parameter", cfgEntry),
|
||||
zap.Duration("value in config", lifetime),
|
||||
zap.Duration("default", defaultValue))
|
||||
} else {
|
||||
return lifetime
|
||||
}
|
||||
}
|
||||
return defaultValue
|
||||
}
|
||||
|
||||
func getSize(v *viper.Viper, l *zap.Logger, cfgEntry string, defaultValue int) int {
|
||||
if v.IsSet(cfgEntry) {
|
||||
size := v.GetInt(cfgEntry)
|
||||
if size <= 0 {
|
||||
l.Error("invalid cache size, using default value",
|
||||
zap.String("parameter", cfgEntry),
|
||||
zap.Int("value in config", size),
|
||||
zap.Int("default", defaultValue))
|
||||
} else {
|
||||
return size
|
||||
}
|
||||
}
|
||||
return defaultValue
|
||||
}
|
||||
|
||||
func getAccessBoxCacheConfig(v *viper.Viper, l *zap.Logger) *cache.Config {
|
||||
cacheCfg := cache.DefaultAccessBoxConfig(l)
|
||||
|
||||
cacheCfg.Lifetime = getLifetime(v, l, cfgAccessBoxCacheLifetime, cacheCfg.Lifetime)
|
||||
cacheCfg.Size = getSize(v, l, cfgAccessBoxCacheSize, cacheCfg.Size)
|
||||
cacheCfg.Lifetime = fetchCacheLifetime(v, l, cfgAccessBoxCacheLifetime, cacheCfg.Lifetime)
|
||||
cacheCfg.Size = fetchCacheSize(v, l, cfgAccessBoxCacheSize, cacheCfg.Size)
|
||||
|
||||
return cacheCfg
|
||||
}
|
||||
|
@ -780,22 +696,11 @@ func getAccessBoxCacheConfig(v *viper.Viper, l *zap.Logger) *cache.Config {
|
|||
func (a *App) initHandler() {
|
||||
cfg := &handler.Config{
|
||||
Policy: a.settings.policies,
|
||||
DefaultMaxAge: handler.DefaultMaxAge,
|
||||
DefaultMaxAge: fetchDefaultMaxAge(a.cfg, a.log),
|
||||
NotificatorEnabled: a.cfg.GetBool(cfgEnableNATS),
|
||||
XMLDecoder: a.settings.xmlDecoder,
|
||||
}
|
||||
|
||||
if a.cfg.IsSet(cfgDefaultMaxAge) {
|
||||
defaultMaxAge := a.cfg.GetInt(cfgDefaultMaxAge)
|
||||
|
||||
if defaultMaxAge <= 0 && defaultMaxAge != -1 {
|
||||
a.log.Fatal("invalid defaultMaxAge",
|
||||
zap.String("parameter", cfgDefaultMaxAge),
|
||||
zap.String("value in config", strconv.Itoa(defaultMaxAge)))
|
||||
}
|
||||
cfg.DefaultMaxAge = defaultMaxAge
|
||||
}
|
||||
|
||||
cfg.ResolveZoneList = a.cfg.GetStringSlice(cfgResolveBucketAllow)
|
||||
cfg.IsResolveListAllow = len(cfg.ResolveZoneList) > 0
|
||||
if !cfg.IsResolveListAllow {
|
||||
|
|
|
@ -10,8 +10,11 @@ import (
|
|||
"strings"
|
||||
"time"
|
||||
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/handler"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/notifications"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/resolver"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/internal/version"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/pool"
|
||||
"github.com/spf13/pflag"
|
||||
"github.com/spf13/viper"
|
||||
|
@ -156,6 +159,169 @@ var ignore = map[string]struct{}{
|
|||
cmdVersion: {},
|
||||
}
|
||||
|
||||
func fetchConnectTimeout(cfg *viper.Viper) time.Duration {
|
||||
connTimeout := cfg.GetDuration(cfgConnectTimeout)
|
||||
if connTimeout <= 0 {
|
||||
connTimeout = defaultConnectTimeout
|
||||
}
|
||||
|
||||
return connTimeout
|
||||
}
|
||||
|
||||
func fetchStreamTimeout(cfg *viper.Viper) time.Duration {
|
||||
streamTimeout := cfg.GetDuration(cfgStreamTimeout)
|
||||
if streamTimeout <= 0 {
|
||||
streamTimeout = defaultStreamTimeout
|
||||
}
|
||||
|
||||
return streamTimeout
|
||||
}
|
||||
|
||||
func fetchHealthCheckTimeout(cfg *viper.Viper) time.Duration {
|
||||
healthCheckTimeout := cfg.GetDuration(cfgHealthcheckTimeout)
|
||||
if healthCheckTimeout <= 0 {
|
||||
healthCheckTimeout = defaultHealthcheckTimeout
|
||||
}
|
||||
|
||||
return healthCheckTimeout
|
||||
}
|
||||
|
||||
func fetchRebalanceInterval(cfg *viper.Viper) time.Duration {
|
||||
rebalanceInterval := cfg.GetDuration(cfgRebalanceInterval)
|
||||
if rebalanceInterval <= 0 {
|
||||
rebalanceInterval = defaultRebalanceInterval
|
||||
}
|
||||
|
||||
return rebalanceInterval
|
||||
}
|
||||
|
||||
func fetchErrorThreshold(cfg *viper.Viper) uint32 {
|
||||
errorThreshold := cfg.GetUint32(cfgPoolErrorThreshold)
|
||||
if errorThreshold <= 0 {
|
||||
errorThreshold = defaultPoolErrorThreshold
|
||||
}
|
||||
|
||||
return errorThreshold
|
||||
}
|
||||
|
||||
func fetchMaxClientsCount(cfg *viper.Viper) int {
|
||||
maxClientsCount := cfg.GetInt(cfgMaxClientsCount)
|
||||
if maxClientsCount <= 0 {
|
||||
maxClientsCount = defaultMaxClientsCount
|
||||
}
|
||||
|
||||
return maxClientsCount
|
||||
}
|
||||
|
||||
func fetchMaxClientsDeadline(cfg *viper.Viper) time.Duration {
|
||||
maxClientsDeadline := cfg.GetDuration(cfgMaxClientsDeadline)
|
||||
if maxClientsDeadline <= 0 {
|
||||
maxClientsDeadline = defaultMaxClientsDeadline
|
||||
}
|
||||
|
||||
return maxClientsDeadline
|
||||
}
|
||||
|
||||
func fetchDefaultPolicy(cfg *viper.Viper) (netmap.PlacementPolicy, error) {
|
||||
defaultPolicyStr := handler.DefaultPolicy
|
||||
if cfg.IsSet(cfgPolicyDefault) {
|
||||
defaultPolicyStr = cfg.GetString(cfgPolicyDefault)
|
||||
}
|
||||
|
||||
var defaultPlacementPolicy netmap.PlacementPolicy
|
||||
if err := defaultPlacementPolicy.DecodeString(defaultPolicyStr); err != nil {
|
||||
return netmap.PlacementPolicy{}, fmt.Errorf("parse default policy '%s': %w", defaultPolicyStr, err)
|
||||
}
|
||||
|
||||
return defaultPlacementPolicy, nil
|
||||
}
|
||||
|
||||
func fetchNATSTimeout(cfg *viper.Viper, l *zap.Logger) time.Duration {
|
||||
timeout := cfg.GetDuration(cfgNATSTimeout)
|
||||
if timeout <= 0 {
|
||||
l.Error("invalid lifetime, using default value (in seconds)",
|
||||
zap.String("parameter", cfgNATSTimeout),
|
||||
zap.Duration("value in config", timeout),
|
||||
zap.Duration("default", notifications.DefaultTimeout))
|
||||
timeout = notifications.DefaultTimeout
|
||||
}
|
||||
|
||||
return timeout
|
||||
}
|
||||
|
||||
func fetchCacheLifetime(v *viper.Viper, l *zap.Logger, cfgEntry string, defaultValue time.Duration) time.Duration {
|
||||
if v.IsSet(cfgEntry) {
|
||||
lifetime := v.GetDuration(cfgEntry)
|
||||
if lifetime <= 0 {
|
||||
l.Error("invalid lifetime, using default value (in seconds)",
|
||||
zap.String("parameter", cfgEntry),
|
||||
zap.Duration("value in config", lifetime),
|
||||
zap.Duration("default", defaultValue))
|
||||
} else {
|
||||
return lifetime
|
||||
}
|
||||
}
|
||||
|
||||
return defaultValue
|
||||
}
|
||||
|
||||
func fetchCacheSize(v *viper.Viper, l *zap.Logger, cfgEntry string, defaultValue int) int {
|
||||
if v.IsSet(cfgEntry) {
|
||||
size := v.GetInt(cfgEntry)
|
||||
if size <= 0 {
|
||||
l.Error("invalid cache size, using default value",
|
||||
zap.String("parameter", cfgEntry),
|
||||
zap.Int("value in config", size),
|
||||
zap.Int("default", defaultValue))
|
||||
} else {
|
||||
return size
|
||||
}
|
||||
}
|
||||
|
||||
return defaultValue
|
||||
}
|
||||
|
||||
func fetchDefaultMaxAge(cfg *viper.Viper, l *zap.Logger) int {
|
||||
defaultMaxAge := handler.DefaultMaxAge
|
||||
|
||||
if cfg.IsSet(cfgDefaultMaxAge) {
|
||||
defaultMaxAge = cfg.GetInt(cfgDefaultMaxAge)
|
||||
|
||||
if defaultMaxAge <= 0 && defaultMaxAge != -1 {
|
||||
l.Fatal("invalid defaultMaxAge",
|
||||
zap.String("parameter", cfgDefaultMaxAge),
|
||||
zap.String("value in config", strconv.Itoa(defaultMaxAge)))
|
||||
}
|
||||
}
|
||||
|
||||
return defaultMaxAge
|
||||
}
|
||||
|
||||
func fetchRegionMappingPolicies(cfg *viper.Viper) (map[string]netmap.PlacementPolicy, error) {
|
||||
regionPolicyMap, err := readRegionMap(cfg.GetString(cfgPolicyRegionMapFile))
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("read region map file: %w", err)
|
||||
}
|
||||
|
||||
regionMap := make(map[string]netmap.PlacementPolicy, len(regionPolicyMap))
|
||||
for region, policy := range regionPolicyMap {
|
||||
var pp netmap.PlacementPolicy
|
||||
if err = pp.DecodeString(policy); err == nil {
|
||||
regionMap[region] = pp
|
||||
continue
|
||||
}
|
||||
|
||||
if err = pp.UnmarshalJSON([]byte(policy)); err == nil {
|
||||
regionMap[region] = pp
|
||||
continue
|
||||
}
|
||||
|
||||
return nil, fmt.Errorf("parse region '%s' to policy mapping: %w", region, err)
|
||||
}
|
||||
|
||||
return regionMap, nil
|
||||
}
|
||||
|
||||
func fetchDefaultCopiesNumbers(v *viper.Viper) ([]uint32, error) {
|
||||
unparsed := v.GetStringSlice(cfgSetCopiesNumber)
|
||||
var result []uint32
|
||||
|
|
Loading…
Reference in a new issue