Denis Kirillov
136b5521fe
This allows in-flight requests finish during rebalance Signed-off-by: Denis Kirillov <d.kirillov@yadro.com>
1111 lines
30 KiB
Go
1111 lines
30 KiB
Go
package main
|
|
|
|
import (
|
|
"encoding/json"
|
|
"fmt"
|
|
"math"
|
|
"os"
|
|
"path"
|
|
"runtime"
|
|
"sort"
|
|
"strconv"
|
|
"strings"
|
|
"time"
|
|
|
|
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api"
|
|
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/handler"
|
|
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/resolver"
|
|
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/internal/logs"
|
|
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/internal/version"
|
|
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap"
|
|
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/pool"
|
|
"git.frostfs.info/TrueCloudLab/zapjournald"
|
|
"github.com/spf13/pflag"
|
|
"github.com/spf13/viper"
|
|
"github.com/ssgreg/journald"
|
|
"go.uber.org/zap"
|
|
"go.uber.org/zap/zapcore"
|
|
)
|
|
|
|
const (
|
|
destinationStdout = "stdout"
|
|
destinationJournald = "journald"
|
|
|
|
wildcardPlaceholder = "<wildcard>"
|
|
)
|
|
|
|
const (
|
|
defaultRebalanceInterval = 60 * time.Second
|
|
defaultHealthcheckTimeout = 15 * time.Second
|
|
defaultConnectTimeout = 10 * time.Second
|
|
defaultStreamTimeout = 10 * time.Second
|
|
defaultShutdownTimeout = 15 * time.Second
|
|
|
|
defaultGracefulCloseOnSwitchTimeout = 10 * time.Second
|
|
|
|
defaultPoolErrorThreshold uint32 = 100
|
|
defaultPlacementPolicy = "REP 3"
|
|
|
|
defaultMaxClientsCount = 100
|
|
defaultMaxClientsDeadline = time.Second * 30
|
|
|
|
defaultSoftMemoryLimit = math.MaxInt64
|
|
|
|
defaultReadHeaderTimeout = 30 * time.Second
|
|
defaultIdleTimeout = 30 * time.Second
|
|
|
|
defaultAccessBoxCacheRemovingCheckInterval = 5 * time.Minute
|
|
|
|
defaultNamespaceHeader = "X-Frostfs-Namespace"
|
|
defaultVHSHeader = "X-Frostfs-S3-VHS"
|
|
defaultServernameHeader = "X-Frostfs-Servername"
|
|
|
|
defaultConstraintName = "default"
|
|
|
|
defaultNamespace = ""
|
|
|
|
defaultReconnectInterval = time.Minute
|
|
|
|
defaultRetryMaxAttempts = 4
|
|
defaultRetryMaxBackoff = 30 * time.Second
|
|
defaultRetryStrategy = handler.RetryStrategyExponential
|
|
)
|
|
|
|
var (
|
|
defaultCopiesNumbers = []uint32{0}
|
|
defaultDefaultNamespaces = []string{"", "root"}
|
|
)
|
|
|
|
const ( // Settings.
|
|
// Logger.
|
|
cfgLoggerLevel = "logger.level"
|
|
cfgLoggerDestination = "logger.destination"
|
|
|
|
// Wallet.
|
|
cfgWalletPath = "wallet.path"
|
|
cfgWalletAddress = "wallet.address"
|
|
cfgWalletPassphrase = "wallet.passphrase"
|
|
cmdWallet = "wallet"
|
|
cmdAddress = "address"
|
|
|
|
// Server.
|
|
cfgServer = "server"
|
|
cfgTLSEnabled = "tls.enabled"
|
|
cfgTLSKeyFile = "tls.key_file"
|
|
cfgTLSCertFile = "tls.cert_file"
|
|
|
|
// Pool config.
|
|
cfgConnectTimeout = "connect_timeout"
|
|
cfgStreamTimeout = "stream_timeout"
|
|
cfgHealthcheckTimeout = "healthcheck_timeout"
|
|
cfgRebalanceInterval = "rebalance_interval"
|
|
cfgPoolErrorThreshold = "pool_error_threshold"
|
|
|
|
// Caching.
|
|
cfgObjectsCacheLifetime = "cache.objects.lifetime"
|
|
cfgObjectsCacheSize = "cache.objects.size"
|
|
cfgListObjectsCacheLifetime = "cache.list.lifetime"
|
|
cfgListObjectsCacheSize = "cache.list.size"
|
|
cfgSessionListCacheLifetime = "cache.list_session.lifetime"
|
|
cfgSessionListCacheSize = "cache.list_session.size"
|
|
cfgBucketsCacheLifetime = "cache.buckets.lifetime"
|
|
cfgBucketsCacheSize = "cache.buckets.size"
|
|
cfgNamesCacheLifetime = "cache.names.lifetime"
|
|
cfgNamesCacheSize = "cache.names.size"
|
|
cfgSystemCacheLifetime = "cache.system.lifetime"
|
|
cfgSystemCacheSize = "cache.system.size"
|
|
cfgAccessBoxCacheLifetime = "cache.accessbox.lifetime"
|
|
cfgAccessBoxCacheSize = "cache.accessbox.size"
|
|
cfgAccessControlCacheLifetime = "cache.accesscontrol.lifetime"
|
|
cfgAccessControlCacheSize = "cache.accesscontrol.size"
|
|
cfgMorphPolicyCacheLifetime = "cache.morph_policy.lifetime"
|
|
cfgMorphPolicyCacheSize = "cache.morph_policy.size"
|
|
cfgFrostfsIDCacheLifetime = "cache.frostfsid.lifetime"
|
|
cfgFrostfsIDCacheSize = "cache.frostfsid.size"
|
|
|
|
cfgAccessBoxCacheRemovingCheckInterval = "cache.accessbox.removing_check_interval"
|
|
|
|
// Policy.
|
|
cfgPolicyDefault = "placement_policy.default"
|
|
cfgPolicyRegionMapFile = "placement_policy.region_mapping"
|
|
cfgCopiesNumbers = "placement_policy.copies_numbers"
|
|
|
|
// CORS.
|
|
cfgDefaultMaxAge = "cors.default_max_age"
|
|
|
|
// MaxClients.
|
|
cfgMaxClientsCount = "max_clients_count"
|
|
cfgMaxClientsDeadline = "max_clients_deadline"
|
|
|
|
// Metrics / Profiler / Web.
|
|
cfgPrometheusEnabled = "prometheus.enabled"
|
|
cfgPrometheusAddress = "prometheus.address"
|
|
cfgPProfEnabled = "pprof.enabled"
|
|
cfgPProfAddress = "pprof.address"
|
|
|
|
// Tracing.
|
|
cfgTracingEnabled = "tracing.enabled"
|
|
cfgTracingExporter = "tracing.exporter"
|
|
cfgTracingEndpoint = "tracing.endpoint"
|
|
|
|
cfgListenDomains = "listen_domains"
|
|
|
|
// VHS.
|
|
cfgVHSEnabled = "vhs.enabled"
|
|
cfgVHSHeader = "vhs.vhs_header"
|
|
cfgServernameHeader = "vhs.servername_header"
|
|
cfgVHSNamespaces = "vhs.namespaces"
|
|
|
|
// Peers.
|
|
cfgPeers = "peers"
|
|
|
|
// NeoGo.
|
|
cfgRPCEndpoint = "rpc_endpoint"
|
|
|
|
// Resolving.
|
|
cfgResolveOrder = "resolve_order"
|
|
|
|
// Application.
|
|
cfgApplicationBuildTime = "app.build_time"
|
|
|
|
// Kludge.
|
|
cfgKludgeUseDefaultXMLNS = "kludge.use_default_xmlns"
|
|
cfgKludgeBypassContentEncodingCheckInChunks = "kludge.bypass_content_encoding_check_in_chunks"
|
|
cfgKludgeDefaultNamespaces = "kludge.default_namespaces"
|
|
// Web.
|
|
cfgWebReadTimeout = "web.read_timeout"
|
|
cfgWebReadHeaderTimeout = "web.read_header_timeout"
|
|
cfgWebWriteTimeout = "web.write_timeout"
|
|
cfgWebIdleTimeout = "web.idle_timeout"
|
|
|
|
// Retry.
|
|
cfgRetryMaxAttempts = "retry.max_attempts"
|
|
cfgRetryMaxBackoff = "retry.max_backoff"
|
|
cfgRetryStrategy = "retry.strategy"
|
|
|
|
// Namespaces.
|
|
cfgNamespacesConfig = "namespaces.config"
|
|
|
|
cfgSourceIPHeader = "source_ip_header"
|
|
|
|
// Containers.
|
|
cfgContainersCORS = "containers.cors"
|
|
cfgContainersLifecycle = "containers.lifecycle"
|
|
|
|
// Command line args.
|
|
cmdHelp = "help"
|
|
cmdVersion = "version"
|
|
cmdConfig = "config"
|
|
cmdConfigDir = "config-dir"
|
|
cmdPProf = "pprof"
|
|
cmdMetrics = "metrics"
|
|
|
|
cmdListenAddress = "listen_address"
|
|
|
|
// Configuration of parameters of requests to FrostFS.
|
|
// Number of the object copies to consider PUT to FrostFS successful.
|
|
cfgSetCopiesNumber = "frostfs.set_copies_number"
|
|
// Enabling client side object preparing for PUT operations.
|
|
cfgClientCut = "frostfs.client_cut"
|
|
// Sets max buffer size for read payload in put operations.
|
|
cfgBufferMaxSizeForPut = "frostfs.buffer_max_size_for_put"
|
|
// Sets max attempt to make successful tree request.
|
|
cfgTreePoolMaxAttempts = "frostfs.tree_pool_max_attempts"
|
|
|
|
// Specifies the timeout after which unhealthy client be closed during rebalancing
|
|
// if it will become healthy back.
|
|
cfgGracefulCloseOnSwitchTimeout = "frostfs.graceful_close_on_switch_timeout"
|
|
|
|
// List of allowed AccessKeyID prefixes.
|
|
cfgAllowedAccessKeyIDPrefixes = "allowed_access_key_id_prefixes"
|
|
|
|
// Bucket resolving options.
|
|
cfgResolveNamespaceHeader = "resolve_bucket.namespace_header"
|
|
cfgResolveBucketAllow = "resolve_bucket.allow"
|
|
cfgResolveBucketDeny = "resolve_bucket.deny"
|
|
|
|
// Runtime.
|
|
cfgSoftMemoryLimit = "runtime.soft_memory_limit"
|
|
|
|
// Enable return MD5 checksum in ETag.
|
|
cfgMD5Enabled = "features.md5.enabled"
|
|
cfgPolicyDenyByDefault = "features.policy.deny_by_default"
|
|
|
|
// FrostfsID.
|
|
cfgFrostfsIDContract = "frostfsid.contract"
|
|
cfgFrostfsIDValidationEnabled = "frostfsid.validation.enabled"
|
|
|
|
// Policy.
|
|
cfgPolicyContract = "policy.contract"
|
|
|
|
// Proxy.
|
|
cfgProxyContract = "proxy.contract"
|
|
|
|
// Server.
|
|
cfgReconnectInterval = "reconnect_interval"
|
|
|
|
// envPrefix is an environment variables prefix used for configuration.
|
|
envPrefix = "S3_GW"
|
|
)
|
|
|
|
var ignore = map[string]struct{}{
|
|
cfgApplicationBuildTime: {},
|
|
|
|
cfgPeers: {},
|
|
|
|
cmdHelp: {},
|
|
cmdVersion: {},
|
|
}
|
|
|
|
func fetchConnectTimeout(cfg *viper.Viper) time.Duration {
|
|
connTimeout := cfg.GetDuration(cfgConnectTimeout)
|
|
if connTimeout <= 0 {
|
|
connTimeout = defaultConnectTimeout
|
|
}
|
|
|
|
return connTimeout
|
|
}
|
|
|
|
func fetchReconnectInterval(cfg *viper.Viper) time.Duration {
|
|
reconnect := cfg.GetDuration(cfgReconnectInterval)
|
|
if reconnect <= 0 {
|
|
reconnect = defaultReconnectInterval
|
|
}
|
|
|
|
return reconnect
|
|
}
|
|
|
|
func fetchStreamTimeout(cfg *viper.Viper) time.Duration {
|
|
streamTimeout := cfg.GetDuration(cfgStreamTimeout)
|
|
if streamTimeout <= 0 {
|
|
streamTimeout = defaultStreamTimeout
|
|
}
|
|
|
|
return streamTimeout
|
|
}
|
|
|
|
func fetchHealthCheckTimeout(cfg *viper.Viper) time.Duration {
|
|
healthCheckTimeout := cfg.GetDuration(cfgHealthcheckTimeout)
|
|
if healthCheckTimeout <= 0 {
|
|
healthCheckTimeout = defaultHealthcheckTimeout
|
|
}
|
|
|
|
return healthCheckTimeout
|
|
}
|
|
|
|
func fetchRebalanceInterval(cfg *viper.Viper) time.Duration {
|
|
rebalanceInterval := cfg.GetDuration(cfgRebalanceInterval)
|
|
if rebalanceInterval <= 0 {
|
|
rebalanceInterval = defaultRebalanceInterval
|
|
}
|
|
|
|
return rebalanceInterval
|
|
}
|
|
|
|
func fetchSetGracefulCloseOnSwitchTimeout(cfg *viper.Viper) time.Duration {
|
|
val := cfg.GetDuration(cfgGracefulCloseOnSwitchTimeout)
|
|
if val <= 0 {
|
|
val = defaultGracefulCloseOnSwitchTimeout
|
|
}
|
|
|
|
return val
|
|
}
|
|
|
|
func fetchErrorThreshold(cfg *viper.Viper) uint32 {
|
|
errorThreshold := cfg.GetUint32(cfgPoolErrorThreshold)
|
|
if errorThreshold <= 0 {
|
|
errorThreshold = defaultPoolErrorThreshold
|
|
}
|
|
|
|
return errorThreshold
|
|
}
|
|
|
|
func fetchMaxClientsCount(cfg *viper.Viper) int {
|
|
maxClientsCount := cfg.GetInt(cfgMaxClientsCount)
|
|
if maxClientsCount <= 0 {
|
|
maxClientsCount = defaultMaxClientsCount
|
|
}
|
|
|
|
return maxClientsCount
|
|
}
|
|
|
|
func fetchMaxClientsDeadline(cfg *viper.Viper) time.Duration {
|
|
maxClientsDeadline := cfg.GetDuration(cfgMaxClientsDeadline)
|
|
if maxClientsDeadline <= 0 {
|
|
maxClientsDeadline = defaultMaxClientsDeadline
|
|
}
|
|
|
|
return maxClientsDeadline
|
|
}
|
|
|
|
func fetchSoftMemoryLimit(cfg *viper.Viper) int64 {
|
|
softMemoryLimit := cfg.GetSizeInBytes(cfgSoftMemoryLimit)
|
|
if softMemoryLimit <= 0 {
|
|
softMemoryLimit = defaultSoftMemoryLimit
|
|
}
|
|
|
|
return int64(softMemoryLimit)
|
|
}
|
|
|
|
func fetchRetryMaxAttempts(cfg *viper.Viper) int {
|
|
val := cfg.GetInt(cfgRetryMaxAttempts)
|
|
if val <= 0 {
|
|
val = defaultRetryMaxAttempts
|
|
}
|
|
|
|
return val
|
|
}
|
|
|
|
func fetchRetryMaxBackoff(cfg *viper.Viper) time.Duration {
|
|
val := cfg.GetDuration(cfgRetryMaxBackoff)
|
|
if val <= 0 {
|
|
val = defaultRetryMaxBackoff
|
|
}
|
|
|
|
return val
|
|
}
|
|
|
|
func fetchRetryStrategy(cfg *viper.Viper) handler.RetryStrategy {
|
|
val := handler.RetryStrategy(cfg.GetString(cfgRetryStrategy))
|
|
if val != handler.RetryStrategyExponential && val != handler.RetryStrategyConstant {
|
|
val = defaultRetryStrategy
|
|
}
|
|
|
|
return val
|
|
}
|
|
|
|
func fetchDefaultPolicy(l *zap.Logger, cfg *viper.Viper) netmap.PlacementPolicy {
|
|
var policy netmap.PlacementPolicy
|
|
|
|
if cfg.IsSet(cfgPolicyDefault) {
|
|
policyStr := cfg.GetString(cfgPolicyDefault)
|
|
if err := policy.DecodeString(policyStr); err != nil {
|
|
l.Warn(logs.FailedToParseDefaultLocationConstraint,
|
|
zap.String("policy", policyStr), zap.String("default", defaultPlacementPolicy), zap.Error(err))
|
|
} else {
|
|
return policy
|
|
}
|
|
}
|
|
|
|
if err := policy.DecodeString(defaultPlacementPolicy); err != nil {
|
|
l.Fatal(logs.FailedToParseDefaultDefaultLocationConstraint, zap.String("policy", defaultPlacementPolicy))
|
|
}
|
|
|
|
return policy
|
|
}
|
|
|
|
func fetchCacheLifetime(v *viper.Viper, l *zap.Logger, cfgEntry string, defaultValue time.Duration) time.Duration {
|
|
if v.IsSet(cfgEntry) {
|
|
lifetime := v.GetDuration(cfgEntry)
|
|
if lifetime <= 0 {
|
|
l.Error(logs.InvalidLifetimeUsingDefaultValue,
|
|
zap.String("parameter", cfgEntry),
|
|
zap.Duration("value in config", lifetime),
|
|
zap.Duration("default", defaultValue))
|
|
} else {
|
|
return lifetime
|
|
}
|
|
}
|
|
|
|
return defaultValue
|
|
}
|
|
|
|
func fetchCacheSize(v *viper.Viper, l *zap.Logger, cfgEntry string, defaultValue int) int {
|
|
if v.IsSet(cfgEntry) {
|
|
size := v.GetInt(cfgEntry)
|
|
if size <= 0 {
|
|
l.Error(logs.InvalidCacheSizeUsingDefaultValue,
|
|
zap.String("parameter", cfgEntry),
|
|
zap.Int("value in config", size),
|
|
zap.Int("default", defaultValue))
|
|
} else {
|
|
return size
|
|
}
|
|
}
|
|
|
|
return defaultValue
|
|
}
|
|
|
|
func fetchRemovingCheckInterval(v *viper.Viper, l *zap.Logger) time.Duration {
|
|
if !v.IsSet(cfgAccessBoxCacheRemovingCheckInterval) {
|
|
return defaultAccessBoxCacheRemovingCheckInterval
|
|
}
|
|
|
|
duration := v.GetDuration(cfgAccessBoxCacheRemovingCheckInterval)
|
|
if duration >= 0 {
|
|
return duration
|
|
}
|
|
|
|
l.Error(logs.InvalidAccessBoxCacheRemovingCheckInterval,
|
|
zap.String("parameter", cfgAccessBoxCacheRemovingCheckInterval),
|
|
zap.Duration("value in config", duration),
|
|
zap.Duration("default", defaultAccessBoxCacheRemovingCheckInterval))
|
|
|
|
return defaultAccessBoxCacheRemovingCheckInterval
|
|
}
|
|
|
|
func fetchDefaultMaxAge(cfg *viper.Viper, l *zap.Logger) int {
|
|
defaultMaxAge := handler.DefaultMaxAge
|
|
|
|
if cfg.IsSet(cfgDefaultMaxAge) {
|
|
defaultMaxAge = cfg.GetInt(cfgDefaultMaxAge)
|
|
|
|
if defaultMaxAge <= 0 && defaultMaxAge != -1 {
|
|
l.Fatal(logs.InvalidDefaultMaxAge,
|
|
zap.String("parameter", cfgDefaultMaxAge),
|
|
zap.String("value in config", strconv.Itoa(defaultMaxAge)))
|
|
}
|
|
}
|
|
|
|
return defaultMaxAge
|
|
}
|
|
|
|
func fetchRegionMappingPolicies(l *zap.Logger, cfg *viper.Viper) map[string]netmap.PlacementPolicy {
|
|
filepath := cfg.GetString(cfgPolicyRegionMapFile)
|
|
regionPolicyMap, err := readRegionMap(filepath)
|
|
if err != nil {
|
|
l.Warn(logs.FailedToReadRegionMapFilePolicies, zap.String("file", filepath), zap.Error(err))
|
|
return make(map[string]netmap.PlacementPolicy)
|
|
}
|
|
|
|
regionMap := make(map[string]netmap.PlacementPolicy, len(regionPolicyMap))
|
|
for region, policy := range regionPolicyMap {
|
|
if region == api.DefaultLocationConstraint {
|
|
l.Warn(logs.DefaultLocationConstraintCantBeOverriden, zap.String("policy", policy))
|
|
continue
|
|
}
|
|
|
|
var pp netmap.PlacementPolicy
|
|
if err = pp.DecodeString(policy); err == nil {
|
|
regionMap[region] = pp
|
|
continue
|
|
}
|
|
|
|
if err = pp.UnmarshalJSON([]byte(policy)); err == nil {
|
|
regionMap[region] = pp
|
|
continue
|
|
}
|
|
|
|
l.Warn(logs.FailedToParseLocationConstraint, zap.String("region", region), zap.String("policy", policy))
|
|
}
|
|
|
|
return regionMap
|
|
}
|
|
|
|
func readRegionMap(filePath string) (map[string]string, error) {
|
|
regionMap := make(map[string]string)
|
|
|
|
if filePath == "" {
|
|
return regionMap, nil
|
|
}
|
|
|
|
data, err := os.ReadFile(filePath)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("coudln't read file '%s'", filePath)
|
|
}
|
|
|
|
if err = json.Unmarshal(data, ®ionMap); err != nil {
|
|
return nil, fmt.Errorf("unmarshal policies: %w", err)
|
|
}
|
|
|
|
return regionMap, nil
|
|
}
|
|
|
|
func fetchDefaultCopiesNumbers(l *zap.Logger, v *viper.Viper) []uint32 {
|
|
unparsed := v.GetStringSlice(cfgSetCopiesNumber)
|
|
result := make([]uint32, len(unparsed))
|
|
for i := range unparsed {
|
|
parsedValue, err := strconv.ParseUint(unparsed[i], 10, 32)
|
|
if err != nil {
|
|
l.Warn(logs.FailedToParseDefaultCopiesNumbers,
|
|
zap.Strings("copies numbers", unparsed), zap.Uint32s("default", defaultCopiesNumbers), zap.Error(err))
|
|
return defaultCopiesNumbers
|
|
}
|
|
result[i] = uint32(parsedValue)
|
|
}
|
|
|
|
return result
|
|
}
|
|
|
|
func fetchCopiesNumbers(l *zap.Logger, v *viper.Viper) map[string][]uint32 {
|
|
copiesNums := make(map[string][]uint32)
|
|
for i := 0; ; i++ {
|
|
key := cfgCopiesNumbers + "." + strconv.Itoa(i) + "."
|
|
constraint := v.GetString(key + "location_constraint")
|
|
vector := v.GetStringSlice(key + "vector")
|
|
|
|
if constraint == "" || len(vector) == 0 {
|
|
break
|
|
}
|
|
|
|
vector32 := make([]uint32, len(vector))
|
|
for j := range vector {
|
|
parsedValue, err := strconv.ParseUint(vector[j], 10, 32)
|
|
if err != nil {
|
|
l.Warn(logs.FailedToParseCopiesNumbers, zap.String("location", constraint),
|
|
zap.Strings("copies numbers", vector), zap.Error(err))
|
|
continue
|
|
}
|
|
vector32[j] = uint32(parsedValue)
|
|
}
|
|
|
|
copiesNums[constraint] = vector32
|
|
l.Info(logs.ConstraintAdded, zap.String("location", constraint), zap.Strings("copies numbers", vector))
|
|
}
|
|
return copiesNums
|
|
}
|
|
|
|
func fetchDefaultNamespaces(l *zap.Logger, v *viper.Viper) []string {
|
|
defaultNamespaces := v.GetStringSlice(cfgKludgeDefaultNamespaces)
|
|
if len(defaultNamespaces) == 0 {
|
|
defaultNamespaces = defaultDefaultNamespaces
|
|
l.Warn(logs.DefaultNamespacesCannotBeEmpty, zap.Strings("namespaces", defaultNamespaces))
|
|
}
|
|
|
|
for i := range defaultNamespaces { // to be set namespaces in env variable as `S3_GW_KLUDGE_DEFAULT_NAMESPACES="" 'root'`
|
|
defaultNamespaces[i] = strings.Trim(defaultNamespaces[i], "\"'")
|
|
}
|
|
|
|
return defaultNamespaces
|
|
}
|
|
|
|
func fetchNamespacesConfig(l *zap.Logger, v *viper.Viper) (NamespacesConfig, []string) {
|
|
defaultNSRegionMap := fetchRegionMappingPolicies(l, v)
|
|
defaultNSRegionMap[defaultConstraintName] = fetchDefaultPolicy(l, v)
|
|
|
|
defaultNSCopiesNumbers := fetchCopiesNumbers(l, v)
|
|
defaultNSCopiesNumbers[defaultConstraintName] = fetchDefaultCopiesNumbers(l, v)
|
|
|
|
defaultNSValue := Namespace{
|
|
LocationConstraints: defaultNSRegionMap,
|
|
CopiesNumbers: defaultNSCopiesNumbers,
|
|
}
|
|
|
|
nsConfig, err := readNamespacesConfig(v.GetString(cfgNamespacesConfig))
|
|
if err != nil {
|
|
l.Warn(logs.FailedToParseNamespacesConfig, zap.Error(err))
|
|
}
|
|
|
|
defaultNamespacesNames := fetchDefaultNamespaces(l, v)
|
|
|
|
var overrideDefaults []Namespace
|
|
for _, name := range defaultNamespacesNames {
|
|
if ns, ok := nsConfig.Namespaces[name]; ok {
|
|
overrideDefaults = append(overrideDefaults, ns)
|
|
delete(nsConfig.Namespaces, name)
|
|
}
|
|
}
|
|
|
|
if len(overrideDefaults) > 0 {
|
|
l.Warn(logs.DefaultNamespaceConfigValuesBeOverwritten)
|
|
defaultNSValue.LocationConstraints = overrideDefaults[0].LocationConstraints
|
|
defaultNSValue.CopiesNumbers = overrideDefaults[0].CopiesNumbers
|
|
if len(overrideDefaults) > 1 {
|
|
l.Warn(logs.MultipleDefaultOverridesFound, zap.String("name", overrideDefaults[0].Name))
|
|
}
|
|
}
|
|
|
|
nsConfig.Namespaces[defaultNamespace] = Namespace{
|
|
Name: defaultNamespace,
|
|
LocationConstraints: defaultNSValue.LocationConstraints,
|
|
CopiesNumbers: defaultNSValue.CopiesNumbers,
|
|
}
|
|
|
|
return nsConfig, defaultNamespacesNames
|
|
}
|
|
|
|
func readNamespacesConfig(filepath string) (NamespacesConfig, error) {
|
|
nsConfig := NamespacesConfig{
|
|
Namespaces: make(Namespaces),
|
|
}
|
|
|
|
if filepath == "" {
|
|
return nsConfig, nil
|
|
}
|
|
|
|
data, err := os.ReadFile(filepath)
|
|
if err != nil {
|
|
return nsConfig, fmt.Errorf("failed to read namespace config '%s': %w", filepath, err)
|
|
}
|
|
|
|
if err = json.Unmarshal(data, &nsConfig); err != nil {
|
|
return nsConfig, fmt.Errorf("failed to parse namespace config: %w", err)
|
|
}
|
|
|
|
return nsConfig, nil
|
|
}
|
|
|
|
func fetchPeers(l *zap.Logger, v *viper.Viper) []pool.NodeParam {
|
|
var nodes []pool.NodeParam
|
|
for i := 0; ; i++ {
|
|
key := cfgPeers + "." + strconv.Itoa(i) + "."
|
|
address := v.GetString(key + "address")
|
|
weight := v.GetFloat64(key + "weight")
|
|
priority := v.GetInt(key + "priority")
|
|
|
|
if address == "" {
|
|
l.Warn(logs.SkipEmptyAddress)
|
|
break
|
|
}
|
|
if weight <= 0 { // unspecified or wrong
|
|
weight = 1
|
|
}
|
|
if priority <= 0 { // unspecified or wrong
|
|
priority = 1
|
|
}
|
|
|
|
nodes = append(nodes, pool.NewNodeParam(priority, address, weight))
|
|
|
|
l.Info(logs.AddedStoragePeer,
|
|
zap.Int("priority", priority),
|
|
zap.String("address", address),
|
|
zap.Float64("weight", weight))
|
|
}
|
|
|
|
return nodes
|
|
}
|
|
|
|
func fetchServers(v *viper.Viper, log *zap.Logger) []ServerInfo {
|
|
var servers []ServerInfo
|
|
seen := make(map[string]struct{})
|
|
|
|
for i := 0; ; i++ {
|
|
key := cfgServer + "." + strconv.Itoa(i) + "."
|
|
|
|
var serverInfo ServerInfo
|
|
serverInfo.Address = v.GetString(key + "address")
|
|
serverInfo.TLS.Enabled = v.GetBool(key + cfgTLSEnabled)
|
|
serverInfo.TLS.KeyFile = v.GetString(key + cfgTLSKeyFile)
|
|
serverInfo.TLS.CertFile = v.GetString(key + cfgTLSCertFile)
|
|
|
|
if serverInfo.Address == "" {
|
|
break
|
|
}
|
|
|
|
if _, ok := seen[serverInfo.Address]; ok {
|
|
log.Warn(logs.WarnDuplicateAddress, zap.String("address", serverInfo.Address))
|
|
continue
|
|
}
|
|
seen[serverInfo.Address] = struct{}{}
|
|
servers = append(servers, serverInfo)
|
|
}
|
|
|
|
return servers
|
|
}
|
|
|
|
func fetchDomains(v *viper.Viper, log *zap.Logger) []string {
|
|
domains := validateDomains(v.GetStringSlice(cfgListenDomains), log)
|
|
|
|
countParts := func(domain string) int {
|
|
return strings.Count(domain, ".")
|
|
}
|
|
|
|
sort.Slice(domains, func(i, j int) bool {
|
|
return countParts(domains[i]) > countParts(domains[j])
|
|
})
|
|
|
|
return domains
|
|
}
|
|
|
|
func fetchVHSNamespaces(v *viper.Viper, log *zap.Logger) map[string]bool {
|
|
vhsNamespacesEnabled := make(map[string]bool)
|
|
nsMap := v.GetStringMap(cfgVHSNamespaces)
|
|
for ns, val := range nsMap {
|
|
if _, ok := vhsNamespacesEnabled[ns]; ok {
|
|
log.Warn(logs.WarnDuplicateNamespaceVHS, zap.String("namespace", ns))
|
|
continue
|
|
}
|
|
|
|
enabledFlag, ok := val.(bool)
|
|
if !ok {
|
|
log.Warn(logs.WarnValueVHSEnabledFlagWrongType, zap.String("namespace", ns))
|
|
continue
|
|
}
|
|
|
|
vhsNamespacesEnabled[ns] = enabledFlag
|
|
}
|
|
|
|
return vhsNamespacesEnabled
|
|
}
|
|
|
|
func newSettings() *viper.Viper {
|
|
v := viper.New()
|
|
|
|
v.AutomaticEnv()
|
|
v.SetEnvPrefix(envPrefix)
|
|
v.SetConfigType("yaml")
|
|
v.SetEnvKeyReplacer(strings.NewReplacer(".", "_"))
|
|
v.AllowEmptyEnv(true)
|
|
|
|
// flags setup:
|
|
flags := pflag.NewFlagSet("commandline", pflag.ExitOnError)
|
|
flags.SetOutput(os.Stdout)
|
|
flags.SortFlags = false
|
|
|
|
flags.Bool(cmdPProf, false, "enable pprof")
|
|
flags.Bool(cmdMetrics, false, "enable prometheus metrics")
|
|
|
|
help := flags.BoolP(cmdHelp, "h", false, "show help")
|
|
versionFlag := flags.BoolP(cmdVersion, "v", false, "show version")
|
|
|
|
flags.StringP(cmdWallet, "w", "", `path to the wallet`)
|
|
flags.String(cmdAddress, "", `address of wallet account`)
|
|
flags.StringArray(cmdConfig, nil, "config paths")
|
|
flags.String(cmdConfigDir, "", "config dir path")
|
|
|
|
flags.Duration(cfgHealthcheckTimeout, defaultHealthcheckTimeout, "set timeout to check node health during rebalance")
|
|
flags.Duration(cfgConnectTimeout, defaultConnectTimeout, "set timeout to connect to FrostFS nodes")
|
|
flags.Duration(cfgRebalanceInterval, defaultRebalanceInterval, "set rebalance interval")
|
|
|
|
flags.Int(cfgMaxClientsCount, defaultMaxClientsCount, "set max-clients count")
|
|
flags.Duration(cfgMaxClientsDeadline, defaultMaxClientsDeadline, "set max-clients deadline")
|
|
|
|
flags.String(cmdListenAddress, "0.0.0.0:8080", "set the main address to listen")
|
|
flags.String(cfgTLSCertFile, "", "TLS certificate file to use")
|
|
flags.String(cfgTLSKeyFile, "", "TLS key file to use")
|
|
|
|
peers := flags.StringArrayP(cfgPeers, "p", nil, "set FrostFS nodes")
|
|
|
|
flags.StringP(cfgRPCEndpoint, "r", "", "set RPC endpoint")
|
|
resolveMethods := flags.StringSlice(cfgResolveOrder, []string{resolver.DNSResolver}, "set bucket name resolve order")
|
|
|
|
domains := flags.StringSliceP(cfgListenDomains, "d", nil, "set domains to be listened")
|
|
|
|
// set defaults:
|
|
|
|
v.SetDefault(cfgAccessBoxCacheRemovingCheckInterval, defaultAccessBoxCacheRemovingCheckInterval)
|
|
|
|
// logger:
|
|
v.SetDefault(cfgLoggerLevel, "debug")
|
|
v.SetDefault(cfgLoggerDestination, "stdout")
|
|
|
|
// pool:
|
|
v.SetDefault(cfgPoolErrorThreshold, defaultPoolErrorThreshold)
|
|
v.SetDefault(cfgStreamTimeout, defaultStreamTimeout)
|
|
|
|
v.SetDefault(cfgPProfAddress, "localhost:8085")
|
|
v.SetDefault(cfgPrometheusAddress, "localhost:8086")
|
|
|
|
// frostfs
|
|
v.SetDefault(cfgBufferMaxSizeForPut, 1024*1024) // 1mb
|
|
|
|
// kludge
|
|
v.SetDefault(cfgKludgeUseDefaultXMLNS, false)
|
|
v.SetDefault(cfgKludgeBypassContentEncodingCheckInChunks, false)
|
|
v.SetDefault(cfgKludgeDefaultNamespaces, defaultDefaultNamespaces)
|
|
|
|
// web
|
|
v.SetDefault(cfgWebReadHeaderTimeout, defaultReadHeaderTimeout)
|
|
v.SetDefault(cfgWebIdleTimeout, defaultIdleTimeout)
|
|
|
|
// frostfsid
|
|
v.SetDefault(cfgFrostfsIDContract, "frostfsid.frostfs")
|
|
|
|
// policy
|
|
v.SetDefault(cfgPolicyContract, "policy.frostfs")
|
|
|
|
// proxy
|
|
v.SetDefault(cfgProxyContract, "proxy.frostfs")
|
|
|
|
// resolve
|
|
v.SetDefault(cfgResolveNamespaceHeader, defaultNamespaceHeader)
|
|
|
|
// retry
|
|
v.SetDefault(cfgRetryMaxAttempts, defaultRetryMaxAttempts)
|
|
v.SetDefault(cfgRetryMaxBackoff, defaultRetryMaxBackoff)
|
|
|
|
// vhs
|
|
v.SetDefault(cfgVHSHeader, defaultVHSHeader)
|
|
v.SetDefault(cfgServernameHeader, defaultServernameHeader)
|
|
|
|
// Bind flags
|
|
if err := bindFlags(v, flags); err != nil {
|
|
panic(fmt.Errorf("bind flags: %w", err))
|
|
}
|
|
|
|
if err := flags.Parse(os.Args); err != nil {
|
|
panic(err)
|
|
}
|
|
|
|
if v.IsSet(cfgServer+".0."+cfgTLSKeyFile) && v.IsSet(cfgServer+".0."+cfgTLSCertFile) {
|
|
v.Set(cfgServer+".0."+cfgTLSEnabled, true)
|
|
}
|
|
|
|
if resolveMethods != nil {
|
|
v.SetDefault(cfgResolveOrder, *resolveMethods)
|
|
}
|
|
|
|
if peers != nil && len(*peers) > 0 {
|
|
for i := range *peers {
|
|
v.SetDefault(cfgPeers+"."+strconv.Itoa(i)+".address", (*peers)[i])
|
|
v.SetDefault(cfgPeers+"."+strconv.Itoa(i)+".weight", 1)
|
|
v.SetDefault(cfgPeers+"."+strconv.Itoa(i)+".priority", 1)
|
|
}
|
|
}
|
|
|
|
if domains != nil && len(*domains) > 0 {
|
|
v.SetDefault(cfgListenDomains, *domains)
|
|
}
|
|
|
|
switch {
|
|
case help != nil && *help:
|
|
fmt.Printf("FrostFS S3 gateway %s\n", version.Version)
|
|
flags.PrintDefaults()
|
|
|
|
fmt.Println()
|
|
fmt.Println("Default environments:")
|
|
fmt.Println()
|
|
keys := v.AllKeys()
|
|
sort.Strings(keys)
|
|
|
|
for i := range keys {
|
|
if _, ok := ignore[keys[i]]; ok {
|
|
continue
|
|
}
|
|
|
|
defaultValue := v.GetString(keys[i])
|
|
if len(defaultValue) == 0 {
|
|
continue
|
|
}
|
|
|
|
k := strings.Replace(keys[i], ".", "_", -1)
|
|
fmt.Printf("%s_%s = %s\n", envPrefix, strings.ToUpper(k), defaultValue)
|
|
}
|
|
|
|
fmt.Println()
|
|
fmt.Println("Peers preset:")
|
|
fmt.Println()
|
|
|
|
fmt.Printf("%s_%s_[N]_ADDRESS = string\n", envPrefix, strings.ToUpper(cfgPeers))
|
|
fmt.Printf("%s_%s_[N]_WEIGHT = 0..1 (float)\n", envPrefix, strings.ToUpper(cfgPeers))
|
|
|
|
os.Exit(0)
|
|
case versionFlag != nil && *versionFlag:
|
|
fmt.Printf("FrostFS S3 Gateway\nVersion: %s\nGoVersion: %s\n", version.Version, runtime.Version())
|
|
os.Exit(0)
|
|
}
|
|
|
|
if err := readInConfig(v); err != nil {
|
|
panic(err)
|
|
}
|
|
|
|
return v
|
|
}
|
|
|
|
func bindFlags(v *viper.Viper, flags *pflag.FlagSet) error {
|
|
if err := v.BindPFlag(cfgPProfEnabled, flags.Lookup(cmdPProf)); err != nil {
|
|
return err
|
|
}
|
|
if err := v.BindPFlag(cfgPrometheusEnabled, flags.Lookup(cmdMetrics)); err != nil {
|
|
return err
|
|
}
|
|
if err := v.BindPFlag(cmdConfig, flags.Lookup(cmdConfig)); err != nil {
|
|
return err
|
|
}
|
|
if err := v.BindPFlag(cmdConfigDir, flags.Lookup(cmdConfigDir)); err != nil {
|
|
return err
|
|
}
|
|
if err := v.BindPFlag(cfgWalletPath, flags.Lookup(cmdWallet)); err != nil {
|
|
return err
|
|
}
|
|
if err := v.BindPFlag(cfgWalletAddress, flags.Lookup(cmdAddress)); err != nil {
|
|
return err
|
|
}
|
|
if err := v.BindPFlag(cfgHealthcheckTimeout, flags.Lookup(cfgHealthcheckTimeout)); err != nil {
|
|
return err
|
|
}
|
|
if err := v.BindPFlag(cfgConnectTimeout, flags.Lookup(cfgConnectTimeout)); err != nil {
|
|
return err
|
|
}
|
|
if err := v.BindPFlag(cfgRebalanceInterval, flags.Lookup(cfgRebalanceInterval)); err != nil {
|
|
return err
|
|
}
|
|
if err := v.BindPFlag(cfgMaxClientsCount, flags.Lookup(cfgMaxClientsCount)); err != nil {
|
|
return err
|
|
}
|
|
if err := v.BindPFlag(cfgMaxClientsDeadline, flags.Lookup(cfgMaxClientsDeadline)); err != nil {
|
|
return err
|
|
}
|
|
if err := v.BindPFlag(cfgRPCEndpoint, flags.Lookup(cfgRPCEndpoint)); err != nil {
|
|
return err
|
|
}
|
|
|
|
if err := v.BindPFlag(cfgServer+".0.address", flags.Lookup(cmdListenAddress)); err != nil {
|
|
return err
|
|
}
|
|
if err := v.BindPFlag(cfgServer+".0."+cfgTLSKeyFile, flags.Lookup(cfgTLSKeyFile)); err != nil {
|
|
return err
|
|
}
|
|
return v.BindPFlag(cfgServer+".0."+cfgTLSCertFile, flags.Lookup(cfgTLSCertFile))
|
|
}
|
|
|
|
func readInConfig(v *viper.Viper) error {
|
|
if v.IsSet(cmdConfig) {
|
|
if err := readConfig(v); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
|
|
if v.IsSet(cmdConfigDir) {
|
|
if err := readConfigDir(v); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func readConfigDir(v *viper.Viper) error {
|
|
cfgSubConfigDir := v.GetString(cmdConfigDir)
|
|
entries, err := os.ReadDir(cfgSubConfigDir)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
for _, entry := range entries {
|
|
if entry.IsDir() {
|
|
continue
|
|
}
|
|
ext := path.Ext(entry.Name())
|
|
if ext != ".yaml" && ext != ".yml" {
|
|
continue
|
|
}
|
|
|
|
if err = mergeConfig(v, path.Join(cfgSubConfigDir, entry.Name())); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func readConfig(v *viper.Viper) error {
|
|
for _, fileName := range v.GetStringSlice(cmdConfig) {
|
|
if err := mergeConfig(v, fileName); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func mergeConfig(v *viper.Viper, fileName string) error {
|
|
cfgFile, err := os.Open(fileName)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
defer func() {
|
|
if errClose := cfgFile.Close(); errClose != nil {
|
|
panic(errClose)
|
|
}
|
|
}()
|
|
|
|
return v.MergeConfig(cfgFile)
|
|
}
|
|
|
|
func pickLogger(v *viper.Viper) *Logger {
|
|
lvl, err := getLogLevel(v)
|
|
if err != nil {
|
|
panic(err)
|
|
}
|
|
|
|
dest := v.GetString(cfgLoggerDestination)
|
|
|
|
switch dest {
|
|
case destinationStdout:
|
|
return newStdoutLogger(lvl)
|
|
case destinationJournald:
|
|
return newJournaldLogger(lvl)
|
|
default:
|
|
panic(fmt.Sprintf("wrong destination for logger: %s", dest))
|
|
}
|
|
}
|
|
|
|
// newStdoutLogger constructs a Logger instance for the current application.
|
|
// Panics on failure.
|
|
//
|
|
// Logger contains a logger is built from zap's production logging configuration with:
|
|
// - parameterized level (debug by default)
|
|
// - console encoding
|
|
// - ISO8601 time encoding
|
|
//
|
|
// and atomic log level to dynamically change it.
|
|
//
|
|
// Logger records a stack trace for all messages at or above fatal level.
|
|
//
|
|
// See also zapcore.Level, zap.NewProductionConfig, zap.AddStacktrace.
|
|
func newStdoutLogger(lvl zapcore.Level) *Logger {
|
|
c := zap.NewProductionConfig()
|
|
c.Level = zap.NewAtomicLevelAt(lvl)
|
|
c.Encoding = "console"
|
|
c.EncoderConfig.EncodeTime = zapcore.ISO8601TimeEncoder
|
|
|
|
l, err := c.Build(
|
|
zap.AddStacktrace(zap.NewAtomicLevelAt(zap.FatalLevel)),
|
|
)
|
|
if err != nil {
|
|
panic(fmt.Sprintf("build zap logger instance: %v", err))
|
|
}
|
|
|
|
return &Logger{
|
|
logger: l,
|
|
lvl: c.Level,
|
|
}
|
|
}
|
|
|
|
func newJournaldLogger(lvl zapcore.Level) *Logger {
|
|
c := zap.NewProductionConfig()
|
|
c.EncoderConfig.EncodeTime = zapcore.ISO8601TimeEncoder
|
|
c.Level = zap.NewAtomicLevelAt(lvl)
|
|
|
|
encoder := zapjournald.NewPartialEncoder(zapcore.NewConsoleEncoder(c.EncoderConfig), zapjournald.SyslogFields)
|
|
|
|
core := zapjournald.NewCore(c.Level, encoder, &journald.Journal{}, zapjournald.SyslogFields)
|
|
coreWithContext := core.With([]zapcore.Field{
|
|
zapjournald.SyslogFacility(zapjournald.LogDaemon),
|
|
zapjournald.SyslogIdentifier(),
|
|
zapjournald.SyslogPid(),
|
|
})
|
|
|
|
l := zap.New(coreWithContext, zap.AddStacktrace(zap.NewAtomicLevelAt(zap.FatalLevel)))
|
|
|
|
return &Logger{
|
|
logger: l,
|
|
lvl: c.Level,
|
|
}
|
|
}
|
|
|
|
func getLogLevel(v *viper.Viper) (zapcore.Level, error) {
|
|
var lvl zapcore.Level
|
|
lvlStr := v.GetString(cfgLoggerLevel)
|
|
err := lvl.UnmarshalText([]byte(lvlStr))
|
|
if err != nil {
|
|
return lvl, fmt.Errorf("incorrect logger level configuration %s (%v), "+
|
|
"value should be one of %v", lvlStr, err, [...]zapcore.Level{
|
|
zapcore.DebugLevel,
|
|
zapcore.InfoLevel,
|
|
zapcore.WarnLevel,
|
|
zapcore.ErrorLevel,
|
|
zapcore.DPanicLevel,
|
|
zapcore.PanicLevel,
|
|
zapcore.FatalLevel,
|
|
})
|
|
}
|
|
return lvl, nil
|
|
}
|
|
|
|
func validateDomains(domains []string, log *zap.Logger) []string {
|
|
validDomains := make([]string, 0, len(domains))
|
|
LOOP:
|
|
for _, domain := range domains {
|
|
domainParts := strings.Split(domain, ".")
|
|
for _, part := range domainParts {
|
|
if strings.ContainsAny(part, "<>") && part != wildcardPlaceholder {
|
|
log.Warn(logs.WarnDomainContainsInvalidPlaceholder, zap.String("domain", domain))
|
|
continue LOOP
|
|
}
|
|
}
|
|
validDomains = append(validDomains, domain)
|
|
}
|
|
return validDomains
|
|
}
|