frostfs-http-gw/cmd/http-gw/settings.go
Alex Vanin 1e8fa19bb9 [#195] Make all initial logging tags as default tags
Signed-off-by: Alex Vanin <a.vanin@yadro.com>
2025-02-07 11:47:38 +00:00

853 lines
22 KiB
Go

package main
import (
"context"
"encoding/hex"
"fmt"
"io"
"math"
"os"
"path"
"runtime"
"sort"
"strconv"
"strings"
"sync"
"time"
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/internal/cache"
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/internal/logs"
internalnet "git.frostfs.info/TrueCloudLab/frostfs-http-gw/internal/net"
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/internal/service/frostfs"
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/resolver"
grpctracing "git.frostfs.info/TrueCloudLab/frostfs-observability/tracing/grpc"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/pool"
treepool "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/pool/tree"
"github.com/spf13/pflag"
"github.com/spf13/viper"
"github.com/valyala/fasthttp"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
"google.golang.org/grpc"
)
const (
destinationStdout = "stdout"
destinationJournald = "journald"
)
const (
defaultRebalanceTimer = 60 * time.Second
defaultRequestTimeout = 15 * time.Second
defaultConnectTimeout = 10 * time.Second
defaultStreamTimeout = 10 * time.Second
defaultLoggerSamplerInterval = 1 * time.Second
defaultShutdownTimeout = 15 * time.Second
defaultPoolErrorThreshold uint32 = 100
defaultSoftMemoryLimit = math.MaxInt64
defaultBufferMaxSizeForPut = 1024 * 1024 // 1mb
defaultNamespaceHeader = "X-Frostfs-Namespace"
defaultReconnectInterval = time.Minute
defaultCORSMaxAge = 600 // seconds
defaultMultinetFallbackDelay = 300 * time.Millisecond
cfgServer = "server"
cfgTLSEnabled = "tls.enabled"
cfgTLSCertFile = "tls.cert_file"
cfgTLSKeyFile = "tls.key_file"
cfgReconnectInterval = "reconnect_interval"
cfgIndexPageEnabled = "index_page.enabled"
cfgIndexPageTemplatePath = "index_page.template_path"
cfgWorkerPoolSize = "worker_pool_size"
// Web.
cfgWebReadBufferSize = "web.read_buffer_size"
cfgWebWriteBufferSize = "web.write_buffer_size"
cfgWebReadTimeout = "web.read_timeout"
cfgWebWriteTimeout = "web.write_timeout"
cfgWebStreamRequestBody = "web.stream_request_body"
cfgWebMaxRequestBodySize = "web.max_request_body_size"
// Metrics / Profiler.
cfgPrometheusEnabled = "prometheus.enabled"
cfgPrometheusAddress = "prometheus.address"
cfgPprofEnabled = "pprof.enabled"
cfgPprofAddress = "pprof.address"
// Tracing ...
cfgTracingEnabled = "tracing.enabled"
cfgTracingExporter = "tracing.exporter"
cfgTracingEndpoint = "tracing.endpoint"
cfgTracingTrustedCa = "tracing.trusted_ca"
cfgTracingAttributes = "tracing.attributes"
// Pool config.
cfgConTimeout = "connect_timeout"
cfgStreamTimeout = "stream_timeout"
cfgReqTimeout = "request_timeout"
cfgRebalance = "rebalance_timer"
cfgPoolErrorThreshold = "pool_error_threshold"
// Logger.
cfgLoggerLevel = "logger.level"
cfgLoggerDestination = "logger.destination"
cfgLoggerSamplingEnabled = "logger.sampling.enabled"
cfgLoggerSamplingInitial = "logger.sampling.initial"
cfgLoggerSamplingThereafter = "logger.sampling.thereafter"
cfgLoggerSamplingInterval = "logger.sampling.interval"
cfgLoggerTags = "logger.tags"
cfgLoggerTagsPrefixTmpl = cfgLoggerTags + ".%d."
cfgLoggerTagsNameTmpl = cfgLoggerTagsPrefixTmpl + "name"
cfgLoggerTagsLevelTmpl = cfgLoggerTagsPrefixTmpl + "level"
// Wallet.
cfgWalletPassphrase = "wallet.passphrase"
cfgWalletPath = "wallet.path"
cfgWalletAddress = "wallet.address"
// Uploader Header.
cfgUploaderHeaderEnableDefaultTimestamp = "upload_header.use_default_timestamp"
// Peers.
cfgPeers = "peers"
// NeoGo.
cfgRPCEndpoint = "rpc_endpoint"
// Resolving.
cfgResolveOrder = "resolve_order"
// Zip compression.
//
// Deprecated: Use cfgArchiveCompression instead.
cfgZipCompression = "zip.compression"
// Archive compression.
cfgArchiveCompression = "archive.compression"
// Runtime.
cfgSoftMemoryLimit = "runtime.soft_memory_limit"
// Enabling client side object preparing for PUT operations.
cfgClientCut = "frostfs.client_cut"
// Sets max buffer size for read payload in put operations.
cfgBufferMaxSizeForPut = "frostfs.buffer_max_size_for_put"
// Configuration of parameters of requests to FrostFS.
// Sets max attempt to make successful tree request.
cfgTreePoolMaxAttempts = "frostfs.tree_pool_max_attempts"
// Caching.
cfgBucketsCacheLifetime = "cache.buckets.lifetime"
cfgBucketsCacheSize = "cache.buckets.size"
cfgNetmapCacheLifetime = "cache.netmap.lifetime"
// Bucket resolving options.
cfgResolveNamespaceHeader = "resolve_bucket.namespace_header"
cfgResolveDefaultNamespaces = "resolve_bucket.default_namespaces"
// CORS.
cfgCORSAllowOrigin = "cors.allow_origin"
cfgCORSAllowMethods = "cors.allow_methods"
cfgCORSAllowHeaders = "cors.allow_headers"
cfgCORSExposeHeaders = "cors.expose_headers"
cfgCORSAllowCredentials = "cors.allow_credentials"
cfgCORSMaxAge = "cors.max_age"
// Multinet.
cfgMultinetEnabled = "multinet.enabled"
cfgMultinetBalancer = "multinet.balancer"
cfgMultinetRestrict = "multinet.restrict"
cfgMultinetFallbackDelay = "multinet.fallback_delay"
cfgMultinetSubnets = "multinet.subnets"
// Feature.
cfgFeaturesEnableFilepathFallback = "features.enable_filepath_fallback"
cfgFeaturesTreePoolNetmapSupport = "features.tree_pool_netmap_support"
// Command line args.
cmdHelp = "help"
cmdVersion = "version"
cmdPprof = "pprof"
cmdMetrics = "metrics"
cmdWallet = "wallet"
cmdAddress = "address"
cmdConfig = "config"
cmdConfigDir = "config-dir"
cmdListenAddress = "listen_address"
)
var ignore = map[string]struct{}{
cfgPeers: {},
cmdHelp: {},
cmdVersion: {},
}
var defaultTags = []string{logs.TagApp, logs.TagDatapath, logs.TagExternalStorage, logs.TagExternalStorageTree}
type Logger struct {
logger *zap.Logger
lvl zap.AtomicLevel
}
type appCfg struct {
flags *pflag.FlagSet
mu sync.RWMutex
settings *viper.Viper
}
func (a *appCfg) reload() error {
old := a.config()
v, err := newViper(a.flags)
if err != nil {
return err
}
if old.IsSet(cmdConfig) {
v.Set(cmdConfig, old.Get(cmdConfig))
}
if old.IsSet(cmdConfigDir) {
v.Set(cmdConfigDir, old.Get(cmdConfigDir))
}
if err = readInConfig(v); err != nil {
return err
}
a.setConfig(v)
return nil
}
func (a *appCfg) config() *viper.Viper {
a.mu.RLock()
defer a.mu.RUnlock()
return a.settings
}
func (a *appCfg) setConfig(v *viper.Viper) {
a.mu.Lock()
a.settings = v
a.mu.Unlock()
}
func newViper(flags *pflag.FlagSet) (*viper.Viper, error) {
v := viper.New()
v.AutomaticEnv()
v.SetEnvPrefix(Prefix)
v.AllowEmptyEnv(true)
v.SetConfigType("yaml")
v.SetEnvKeyReplacer(strings.NewReplacer(".", "_"))
if err := bindFlags(v, flags); err != nil {
return nil, err
}
setDefaults(v, flags)
if v.IsSet(cfgServer+".0."+cfgTLSKeyFile) && v.IsSet(cfgServer+".0."+cfgTLSCertFile) {
v.Set(cfgServer+".0."+cfgTLSEnabled, true)
}
return v, nil
}
func settings() *appCfg {
// flags setup:
flags := pflag.NewFlagSet("commandline", pflag.ExitOnError)
flags.SetOutput(os.Stdout)
flags.SortFlags = false
flags.Bool(cmdPprof, false, "enable pprof")
flags.Bool(cmdMetrics, false, "enable prometheus")
help := flags.BoolP(cmdHelp, "h", false, "show help")
version := flags.BoolP(cmdVersion, "v", false, "show version")
flags.StringP(cmdWallet, "w", "", `path to the wallet`)
flags.String(cmdAddress, "", `address of wallet account`)
flags.StringArray(cmdConfig, nil, "config paths")
flags.String(cmdConfigDir, "", "config dir path")
flags.Duration(cfgConTimeout, defaultConnectTimeout, "gRPC connect timeout")
flags.Duration(cfgStreamTimeout, defaultStreamTimeout, "gRPC individual message timeout")
flags.Duration(cfgReqTimeout, defaultRequestTimeout, "gRPC request timeout")
flags.Duration(cfgRebalance, defaultRebalanceTimer, "gRPC connection rebalance timer")
flags.String(cmdListenAddress, "0.0.0.0:8080", "addresses to listen")
flags.String(cfgTLSCertFile, "", "TLS certificate path")
flags.String(cfgTLSKeyFile, "", "TLS key path")
flags.StringArrayP(cfgPeers, "p", nil, "FrostFS nodes")
flags.StringSlice(cfgResolveOrder, []string{resolver.NNSResolver, resolver.DNSResolver}, "set container name resolve order")
if err := flags.Parse(os.Args); err != nil {
panic(err)
}
v, err := newViper(flags)
if err != nil {
panic(fmt.Errorf("bind flags: %w", err))
}
switch {
case help != nil && *help:
fmt.Printf("FrostFS HTTP Gateway %s\n", Version)
flags.PrintDefaults()
fmt.Println()
fmt.Println("Default environments:")
fmt.Println()
keys := v.AllKeys()
sort.Strings(keys)
for i := range keys {
if _, ok := ignore[keys[i]]; ok {
continue
}
defaultValue := v.GetString(keys[i])
if len(defaultValue) == 0 {
continue
}
k := strings.Replace(keys[i], ".", "_", -1)
fmt.Printf("%s_%s = %s\n", Prefix, strings.ToUpper(k), defaultValue)
}
fmt.Println()
fmt.Println("Peers preset:")
fmt.Println()
fmt.Printf("%s_%s_[N]_ADDRESS = string\n", Prefix, strings.ToUpper(cfgPeers))
fmt.Printf("%s_%s_[N]_WEIGHT = float\n", Prefix, strings.ToUpper(cfgPeers))
os.Exit(0)
case version != nil && *version:
fmt.Printf("FrostFS HTTP Gateway\nVersion: %s\nGoVersion: %s\n", Version, runtime.Version())
os.Exit(0)
}
if err := readInConfig(v); err != nil {
panic(err)
}
return &appCfg{
flags: flags,
settings: v,
}
}
func setDefaults(v *viper.Viper, flags *pflag.FlagSet) {
// set defaults:
// logger:
v.SetDefault(cfgLoggerLevel, "debug")
v.SetDefault(cfgLoggerDestination, "stdout")
v.SetDefault(cfgLoggerSamplingEnabled, false)
v.SetDefault(cfgLoggerSamplingThereafter, 100)
v.SetDefault(cfgLoggerSamplingInitial, 100)
v.SetDefault(cfgLoggerSamplingInterval, defaultLoggerSamplerInterval)
// pool:
v.SetDefault(cfgPoolErrorThreshold, defaultPoolErrorThreshold)
// frostfs:
v.SetDefault(cfgBufferMaxSizeForPut, defaultBufferMaxSizeForPut)
// web-server:
v.SetDefault(cfgWebReadBufferSize, 4096)
v.SetDefault(cfgWebWriteBufferSize, 4096)
v.SetDefault(cfgWebReadTimeout, time.Minute*10)
v.SetDefault(cfgWebWriteTimeout, time.Minute*5)
v.SetDefault(cfgWebStreamRequestBody, true)
v.SetDefault(cfgWebMaxRequestBodySize, fasthttp.DefaultMaxRequestBodySize)
v.SetDefault(cfgWorkerPoolSize, 1000)
// upload header
v.SetDefault(cfgUploaderHeaderEnableDefaultTimestamp, false)
// metrics
v.SetDefault(cfgPprofAddress, "localhost:8083")
v.SetDefault(cfgPrometheusAddress, "localhost:8084")
// resolve bucket
v.SetDefault(cfgResolveNamespaceHeader, defaultNamespaceHeader)
v.SetDefault(cfgResolveDefaultNamespaces, []string{"", "root"})
// multinet
v.SetDefault(cfgMultinetFallbackDelay, defaultMultinetFallbackDelay)
if resolveMethods, err := flags.GetStringSlice(cfgResolveOrder); err == nil {
v.SetDefault(cfgResolveOrder, resolveMethods)
}
if peers, err := flags.GetStringArray(cfgPeers); err == nil {
for i := range peers {
v.SetDefault(cfgPeers+"."+strconv.Itoa(i)+".address", peers[i])
v.SetDefault(cfgPeers+"."+strconv.Itoa(i)+".weight", 1)
v.SetDefault(cfgPeers+"."+strconv.Itoa(i)+".priority", 1)
}
}
}
func bindFlags(v *viper.Viper, flags *pflag.FlagSet) error {
// Binding flags
if err := v.BindPFlag(cfgPprofEnabled, flags.Lookup(cmdPprof)); err != nil {
return err
}
if err := v.BindPFlag(cfgPrometheusEnabled, flags.Lookup(cmdMetrics)); err != nil {
return err
}
if err := v.BindPFlag(cfgWalletPath, flags.Lookup(cmdWallet)); err != nil {
return err
}
if err := v.BindPFlag(cfgWalletAddress, flags.Lookup(cmdAddress)); err != nil {
return err
}
if err := v.BindPFlags(flags); err != nil {
return err
}
if err := v.BindPFlag(cfgServer+".0.address", flags.Lookup(cmdListenAddress)); err != nil {
return err
}
if err := v.BindPFlag(cfgServer+".0."+cfgTLSKeyFile, flags.Lookup(cfgTLSKeyFile)); err != nil {
return err
}
if err := v.BindPFlag(cfgServer+".0."+cfgTLSCertFile, flags.Lookup(cfgTLSCertFile)); err != nil {
return err
}
return nil
}
func readInConfig(v *viper.Viper) error {
if v.IsSet(cmdConfig) {
if err := readConfig(v); err != nil {
return err
}
}
if v.IsSet(cmdConfigDir) {
if err := readConfigDir(v); err != nil {
return err
}
}
return nil
}
func readConfigDir(v *viper.Viper) error {
cfgSubConfigDir := v.GetString(cmdConfigDir)
entries, err := os.ReadDir(cfgSubConfigDir)
if err != nil {
return err
}
for _, entry := range entries {
if entry.IsDir() {
continue
}
ext := path.Ext(entry.Name())
if ext != ".yaml" && ext != ".yml" {
continue
}
if err = mergeConfig(v, path.Join(cfgSubConfigDir, entry.Name())); err != nil {
return err
}
}
return nil
}
func readConfig(v *viper.Viper) error {
for _, fileName := range v.GetStringSlice(cmdConfig) {
if err := mergeConfig(v, fileName); err != nil {
return err
}
}
return nil
}
func mergeConfig(v *viper.Viper, fileName string) error {
cfgFile, err := os.Open(fileName)
if err != nil {
return err
}
defer func() {
if errClose := cfgFile.Close(); errClose != nil {
panic(errClose)
}
}()
return v.MergeConfig(cfgFile)
}
func fetchLogTagsConfig(v *viper.Viper, defaultLvl zapcore.Level) (map[string]zapcore.Level, error) {
res := make(map[string]zapcore.Level)
for i := 0; ; i++ {
name := v.GetString(fmt.Sprintf(cfgLoggerTagsNameTmpl, i))
if name == "" {
break
}
lvl := defaultLvl
level := v.GetString(fmt.Sprintf(cfgLoggerTagsLevelTmpl, i))
if level != "" {
if err := lvl.Set(level); err != nil {
return nil, fmt.Errorf("failed to parse log tags config, unknown level: '%s'", level)
}
}
res[name] = lvl
}
if len(res) == 0 && !v.IsSet(cfgLoggerTags) {
for _, tag := range defaultTags {
res[tag] = defaultLvl
}
}
return res, nil
}
func fetchReconnectInterval(cfg *viper.Viper) time.Duration {
reconnect := cfg.GetDuration(cfgReconnectInterval)
if reconnect <= 0 {
reconnect = defaultReconnectInterval
}
return reconnect
}
func fetchIndexPageTemplate(v *viper.Viper, l *zap.Logger) (string, bool) {
if !v.GetBool(cfgIndexPageEnabled) {
return "", false
}
reader, err := os.Open(v.GetString(cfgIndexPageTemplatePath))
if err != nil {
l.Warn(logs.FailedToReadIndexPageTemplate, zap.Error(err), logs.TagField(logs.TagApp))
return "", true
}
tmpl, err := io.ReadAll(reader)
if err != nil {
l.Warn(logs.FailedToReadIndexPageTemplate, zap.Error(err), logs.TagField(logs.TagApp))
return "", true
}
l.Info(logs.SetCustomIndexPageTemplate, logs.TagField(logs.TagApp))
return string(tmpl), true
}
func fetchDefaultNamespaces(v *viper.Viper) []string {
namespaces := v.GetStringSlice(cfgResolveDefaultNamespaces)
for i := range namespaces { // to be set namespaces in env variable as `HTTP_GW_RESOLVE_BUCKET_DEFAULT_NAMESPACES="" "root"`
namespaces[i] = strings.Trim(namespaces[i], "\"")
}
return namespaces
}
func fetchCORSMaxAge(v *viper.Viper) int {
maxAge := v.GetInt(cfgCORSMaxAge)
if maxAge <= 0 {
maxAge = defaultCORSMaxAge
}
return maxAge
}
func fetchServers(v *viper.Viper, log *zap.Logger) []ServerInfo {
var servers []ServerInfo
seen := make(map[string]struct{})
for i := 0; ; i++ {
key := cfgServer + "." + strconv.Itoa(i) + "."
var serverInfo ServerInfo
serverInfo.Address = v.GetString(key + "address")
serverInfo.TLS.Enabled = v.GetBool(key + cfgTLSEnabled)
serverInfo.TLS.KeyFile = v.GetString(key + cfgTLSKeyFile)
serverInfo.TLS.CertFile = v.GetString(key + cfgTLSCertFile)
if serverInfo.Address == "" {
break
}
if _, ok := seen[serverInfo.Address]; ok {
log.Warn(logs.WarnDuplicateAddress, zap.String("address", serverInfo.Address), logs.TagField(logs.TagApp))
continue
}
seen[serverInfo.Address] = struct{}{}
servers = append(servers, serverInfo)
}
return servers
}
func (a *app) initPools(ctx context.Context) {
key, err := getFrostFSKey(a.config(), a.log)
if err != nil {
a.log.Fatal(logs.CouldNotLoadFrostFSPrivateKey, zap.Error(err), logs.TagField(logs.TagApp))
}
var prm pool.InitParameters
var prmTree treepool.InitParameters
prm.SetKey(&key.PrivateKey)
prmTree.SetKey(key)
a.log.Info(logs.UsingCredentials, zap.String("FrostFS", hex.EncodeToString(key.PublicKey().Bytes())),
logs.TagField(logs.TagApp))
for _, peer := range fetchPeers(a.log, a.config()) {
prm.AddNode(peer)
prmTree.AddNode(peer)
}
connTimeout := a.config().GetDuration(cfgConTimeout)
if connTimeout <= 0 {
connTimeout = defaultConnectTimeout
}
prm.SetNodeDialTimeout(connTimeout)
prmTree.SetNodeDialTimeout(connTimeout)
streamTimeout := a.config().GetDuration(cfgStreamTimeout)
if streamTimeout <= 0 {
streamTimeout = defaultStreamTimeout
}
prm.SetNodeStreamTimeout(streamTimeout)
prmTree.SetNodeStreamTimeout(streamTimeout)
healthCheckTimeout := a.config().GetDuration(cfgReqTimeout)
if healthCheckTimeout <= 0 {
healthCheckTimeout = defaultRequestTimeout
}
prm.SetHealthcheckTimeout(healthCheckTimeout)
prmTree.SetHealthcheckTimeout(healthCheckTimeout)
rebalanceInterval := a.config().GetDuration(cfgRebalance)
if rebalanceInterval <= 0 {
rebalanceInterval = defaultRebalanceTimer
}
prm.SetClientRebalanceInterval(rebalanceInterval)
prmTree.SetClientRebalanceInterval(rebalanceInterval)
errorThreshold := a.config().GetUint32(cfgPoolErrorThreshold)
if errorThreshold <= 0 {
errorThreshold = defaultPoolErrorThreshold
}
prm.SetErrorThreshold(errorThreshold)
prm.SetLogger(a.log)
prmTree.SetLogger(a.log)
prmTree.SetMaxRequestAttempts(a.config().GetInt(cfgTreePoolMaxAttempts))
interceptors := []grpc.DialOption{
grpc.WithUnaryInterceptor(grpctracing.NewUnaryClientInteceptor()),
grpc.WithStreamInterceptor(grpctracing.NewStreamClientInterceptor()),
grpc.WithContextDialer(a.settings.dialerSource.GrpcContextDialer()),
}
prm.SetGRPCDialOptions(interceptors...)
prmTree.SetGRPCDialOptions(interceptors...)
p, err := pool.NewPool(prm)
if err != nil {
a.log.Fatal(logs.FailedToCreateConnectionPool, zap.Error(err), logs.TagField(logs.TagApp))
}
if err = p.Dial(ctx); err != nil {
a.log.Fatal(logs.FailedToDialConnectionPool, zap.Error(err), logs.TagField(logs.TagApp))
}
if a.config().GetBool(cfgFeaturesTreePoolNetmapSupport) {
prmTree.SetNetMapInfoSource(frostfs.NewSource(frostfs.NewFrostFS(p), cache.NewNetmapCache(getNetmapCacheOptions(a.config(), a.log)), a.bucketCache, a.log))
}
treePool, err := treepool.NewPool(prmTree)
if err != nil {
a.log.Fatal(logs.FailedToCreateTreePool, zap.Error(err), logs.TagField(logs.TagApp))
}
if err = treePool.Dial(ctx); err != nil {
a.log.Fatal(logs.FailedToDialTreePool, zap.Error(err), logs.TagField(logs.TagApp))
}
a.pool = p
a.treePool = treePool
a.key = key
}
func fetchPeers(l *zap.Logger, v *viper.Viper) []pool.NodeParam {
var nodes []pool.NodeParam
for i := 0; ; i++ {
key := cfgPeers + "." + strconv.Itoa(i) + "."
address := v.GetString(key + "address")
weight := v.GetFloat64(key + "weight")
priority := v.GetInt(key + "priority")
if address == "" {
break
}
if weight <= 0 { // unspecified or wrong
weight = 1
}
if priority <= 0 { // unspecified or wrong
priority = 1
}
nodes = append(nodes, pool.NewNodeParam(priority, address, weight))
l.Info(logs.AddedStoragePeer,
zap.Int("priority", priority),
zap.String("address", address),
zap.Float64("weight", weight),
logs.TagField(logs.TagApp))
}
return nodes
}
func fetchSoftMemoryLimit(cfg *viper.Viper) int64 {
softMemoryLimit := cfg.GetSizeInBytes(cfgSoftMemoryLimit)
if softMemoryLimit <= 0 {
softMemoryLimit = defaultSoftMemoryLimit
}
return int64(softMemoryLimit)
}
func getBucketCacheOptions(v *viper.Viper, l *zap.Logger) *cache.Config {
cacheCfg := cache.DefaultBucketConfig(l)
cacheCfg.Lifetime = fetchCacheLifetime(v, l, cfgBucketsCacheLifetime, cacheCfg.Lifetime)
cacheCfg.Size = fetchCacheSize(v, l, cfgBucketsCacheSize, cacheCfg.Size)
return cacheCfg
}
func getNetmapCacheOptions(v *viper.Viper, l *zap.Logger) *cache.NetmapCacheConfig {
cacheCfg := cache.DefaultNetmapConfig(l)
cacheCfg.Lifetime = fetchCacheLifetime(v, l, cfgNetmapCacheLifetime, cacheCfg.Lifetime)
return cacheCfg
}
func fetchCacheLifetime(v *viper.Viper, l *zap.Logger, cfgEntry string, defaultValue time.Duration) time.Duration {
if v.IsSet(cfgEntry) {
lifetime := v.GetDuration(cfgEntry)
if lifetime <= 0 {
l.Error(logs.InvalidLifetimeUsingDefaultValue,
zap.String("parameter", cfgEntry),
zap.Duration("value in config", lifetime),
zap.Duration("default", defaultValue),
logs.TagField(logs.TagApp))
} else {
return lifetime
}
}
return defaultValue
}
func fetchCacheSize(v *viper.Viper, l *zap.Logger, cfgEntry string, defaultValue int) int {
if v.IsSet(cfgEntry) {
size := v.GetInt(cfgEntry)
if size <= 0 {
l.Error(logs.InvalidCacheSizeUsingDefaultValue,
zap.String("parameter", cfgEntry),
zap.Int("value in config", size),
zap.Int("default", defaultValue),
logs.TagField(logs.TagApp))
} else {
return size
}
}
return defaultValue
}
func getDialerSource(logger *zap.Logger, cfg *viper.Viper) *internalnet.DialerSource {
source, err := internalnet.NewDialerSource(fetchMultinetConfig(cfg, logger))
if err != nil {
logger.Fatal(logs.FailedToLoadMultinetConfig, zap.Error(err), logs.TagField(logs.TagApp))
}
return source
}
func fetchMultinetConfig(v *viper.Viper, l *zap.Logger) (cfg internalnet.Config) {
cfg.Enabled = v.GetBool(cfgMultinetEnabled)
cfg.Balancer = v.GetString(cfgMultinetBalancer)
cfg.Restrict = v.GetBool(cfgMultinetRestrict)
cfg.FallbackDelay = v.GetDuration(cfgMultinetFallbackDelay)
cfg.Subnets = make([]internalnet.Subnet, 0, 5)
cfg.EventHandler = internalnet.NewLogEventHandler(l)
for i := 0; ; i++ {
key := cfgMultinetSubnets + "." + strconv.Itoa(i) + "."
subnet := internalnet.Subnet{}
subnet.Prefix = v.GetString(key + "mask")
if subnet.Prefix == "" {
break
}
subnet.SourceIPs = v.GetStringSlice(key + "source_ips")
cfg.Subnets = append(cfg.Subnets, subnet)
}
return
}
func fetchTracingAttributes(v *viper.Viper) (map[string]string, error) {
attributes := make(map[string]string)
for i := 0; ; i++ {
key := cfgTracingAttributes + "." + strconv.Itoa(i) + "."
attrKey := v.GetString(key + "key")
attrValue := v.GetString(key + "value")
if attrKey == "" {
break
}
if _, ok := attributes[attrKey]; ok {
return nil, fmt.Errorf("tracing attribute key %s defined more than once", attrKey)
}
if attrValue == "" {
return nil, fmt.Errorf("empty tracing attribute value for key %s", attrKey)
}
attributes[attrKey] = attrValue
}
return attributes, nil
}
func fetchArchiveCompression(v *viper.Viper) bool {
if v.IsSet(cfgZipCompression) {
return v.GetBool(cfgZipCompression)
}
return v.GetBool(cfgArchiveCompression)
}