package main import ( "context" "encoding/hex" "fmt" "io" "math" "os" "path" "runtime" "sort" "strconv" "strings" "time" "git.frostfs.info/TrueCloudLab/frostfs-http-gw/internal/cache" "git.frostfs.info/TrueCloudLab/frostfs-http-gw/internal/logs" internalnet "git.frostfs.info/TrueCloudLab/frostfs-http-gw/internal/net" "git.frostfs.info/TrueCloudLab/frostfs-http-gw/resolver" grpctracing "git.frostfs.info/TrueCloudLab/frostfs-observability/tracing/grpc" "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/pool" treepool "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/pool/tree" "git.frostfs.info/TrueCloudLab/zapjournald" "github.com/nspcc-dev/neo-go/pkg/crypto/keys" "github.com/spf13/pflag" "github.com/spf13/viper" "github.com/ssgreg/journald" "github.com/valyala/fasthttp" "go.uber.org/zap" "go.uber.org/zap/zapcore" "google.golang.org/grpc" ) const ( destinationStdout = "stdout" destinationJournald = "journald" ) const ( defaultRebalanceTimer = 60 * time.Second defaultRequestTimeout = 15 * time.Second defaultConnectTimeout = 10 * time.Second defaultStreamTimeout = 10 * time.Second defaultLoggerSamplerInterval = 1 * time.Second defaultShutdownTimeout = 15 * time.Second defaultPoolErrorThreshold uint32 = 100 defaultSoftMemoryLimit = math.MaxInt64 defaultBufferMaxSizeForPut = 1024 * 1024 // 1mb defaultNamespaceHeader = "X-Frostfs-Namespace" defaultReconnectInterval = time.Minute defaultCORSMaxAge = 600 // seconds defaultMultinetFallbackDelay = 300 * time.Millisecond cfgServer = "server" cfgTLSEnabled = "tls.enabled" cfgTLSCertFile = "tls.cert_file" cfgTLSKeyFile = "tls.key_file" cfgReconnectInterval = "reconnect_interval" cfgIndexPageEnabled = "index_page.enabled" cfgIndexPageTemplatePath = "index_page.template_path" cfgWorkerPoolSize = "worker_pool_size" // Web. cfgWebReadBufferSize = "web.read_buffer_size" cfgWebWriteBufferSize = "web.write_buffer_size" cfgWebReadTimeout = "web.read_timeout" cfgWebWriteTimeout = "web.write_timeout" cfgWebStreamRequestBody = "web.stream_request_body" cfgWebMaxRequestBodySize = "web.max_request_body_size" // Metrics / Profiler. cfgPrometheusEnabled = "prometheus.enabled" cfgPrometheusAddress = "prometheus.address" cfgPprofEnabled = "pprof.enabled" cfgPprofAddress = "pprof.address" // Tracing ... cfgTracingEnabled = "tracing.enabled" cfgTracingExporter = "tracing.exporter" cfgTracingEndpoint = "tracing.endpoint" cfgTracingTrustedCa = "tracing.trusted_ca" cfgTracingAttributes = "tracing.attributes" // Pool config. cfgConTimeout = "connect_timeout" cfgStreamTimeout = "stream_timeout" cfgReqTimeout = "request_timeout" cfgRebalance = "rebalance_timer" cfgPoolErrorThreshold = "pool_error_threshold" // Logger. cfgLoggerLevel = "logger.level" cfgLoggerDestination = "logger.destination" cfgLoggerSamplingEnabled = "logger.sampling.enabled" cfgLoggerSamplingInitial = "logger.sampling.initial" cfgLoggerSamplingThereafter = "logger.sampling.thereafter" cfgLoggerSamplingInterval = "logger.sampling.interval" // Wallet. cfgWalletPassphrase = "wallet.passphrase" cfgWalletPath = "wallet.path" cfgWalletAddress = "wallet.address" // Uploader Header. cfgUploaderHeaderEnableDefaultTimestamp = "upload_header.use_default_timestamp" // Peers. cfgPeers = "peers" // NeoGo. cfgRPCEndpoint = "rpc_endpoint" // Resolving. cfgResolveOrder = "resolve_order" // Zip compression. cfgZipCompression = "zip.compression" // Runtime. cfgSoftMemoryLimit = "runtime.soft_memory_limit" // Enabling client side object preparing for PUT operations. cfgClientCut = "frostfs.client_cut" // Sets max buffer size for read payload in put operations. cfgBufferMaxSizeForPut = "frostfs.buffer_max_size_for_put" // Configuration of parameters of requests to FrostFS. // Sets max attempt to make successful tree request. cfgTreePoolMaxAttempts = "frostfs.tree_pool_max_attempts" // Caching. cfgBucketsCacheLifetime = "cache.buckets.lifetime" cfgBucketsCacheSize = "cache.buckets.size" // Bucket resolving options. cfgResolveNamespaceHeader = "resolve_bucket.namespace_header" cfgResolveDefaultNamespaces = "resolve_bucket.default_namespaces" // CORS. cfgCORSAllowOrigin = "cors.allow_origin" cfgCORSAllowMethods = "cors.allow_methods" cfgCORSAllowHeaders = "cors.allow_headers" cfgCORSExposeHeaders = "cors.expose_headers" cfgCORSAllowCredentials = "cors.allow_credentials" cfgCORSMaxAge = "cors.max_age" // Multinet. cfgMultinetEnabled = "multinet.enabled" cfgMultinetBalancer = "multinet.balancer" cfgMultinetRestrict = "multinet.restrict" cfgMultinetFallbackDelay = "multinet.fallback_delay" cfgMultinetSubnets = "multinet.subnets" // Kludge. cfgKludgeAdditionalSearch = "kludge.additional_search" // Command line args. cmdHelp = "help" cmdVersion = "version" cmdPprof = "pprof" cmdMetrics = "metrics" cmdWallet = "wallet" cmdAddress = "address" cmdConfig = "config" cmdConfigDir = "config-dir" cmdListenAddress = "listen_address" ) var ignore = map[string]struct{}{ cfgPeers: {}, cmdHelp: {}, cmdVersion: {}, } type Logger struct { logger *zap.Logger lvl zap.AtomicLevel } func settings() *viper.Viper { v := viper.New() v.AutomaticEnv() v.SetEnvPrefix(Prefix) v.AllowEmptyEnv(true) v.SetConfigType("yaml") v.SetEnvKeyReplacer(strings.NewReplacer(".", "_")) // flags setup: flags := pflag.NewFlagSet("commandline", pflag.ExitOnError) flags.SetOutput(os.Stdout) flags.SortFlags = false flags.Bool(cmdPprof, false, "enable pprof") flags.Bool(cmdMetrics, false, "enable prometheus") help := flags.BoolP(cmdHelp, "h", false, "show help") version := flags.BoolP(cmdVersion, "v", false, "show version") flags.StringP(cmdWallet, "w", "", `path to the wallet`) flags.String(cmdAddress, "", `address of wallet account`) flags.StringArray(cmdConfig, nil, "config paths") flags.String(cmdConfigDir, "", "config dir path") flags.Duration(cfgConTimeout, defaultConnectTimeout, "gRPC connect timeout") flags.Duration(cfgStreamTimeout, defaultStreamTimeout, "gRPC individual message timeout") flags.Duration(cfgReqTimeout, defaultRequestTimeout, "gRPC request timeout") flags.Duration(cfgRebalance, defaultRebalanceTimer, "gRPC connection rebalance timer") flags.String(cmdListenAddress, "0.0.0.0:8080", "addresses to listen") flags.String(cfgTLSCertFile, "", "TLS certificate path") flags.String(cfgTLSKeyFile, "", "TLS key path") peers := flags.StringArrayP(cfgPeers, "p", nil, "FrostFS nodes") resolveMethods := flags.StringSlice(cfgResolveOrder, []string{resolver.NNSResolver, resolver.DNSResolver}, "set container name resolve order") // set defaults: // logger: v.SetDefault(cfgLoggerLevel, "debug") v.SetDefault(cfgLoggerDestination, "stdout") v.SetDefault(cfgLoggerSamplingEnabled, false) v.SetDefault(cfgLoggerSamplingThereafter, 100) v.SetDefault(cfgLoggerSamplingInitial, 100) v.SetDefault(cfgLoggerSamplingInterval, defaultLoggerSamplerInterval) // pool: v.SetDefault(cfgPoolErrorThreshold, defaultPoolErrorThreshold) // frostfs: v.SetDefault(cfgBufferMaxSizeForPut, defaultBufferMaxSizeForPut) // web-server: v.SetDefault(cfgWebReadBufferSize, 4096) v.SetDefault(cfgWebWriteBufferSize, 4096) v.SetDefault(cfgWebReadTimeout, time.Minute*10) v.SetDefault(cfgWebWriteTimeout, time.Minute*5) v.SetDefault(cfgWebStreamRequestBody, true) v.SetDefault(cfgWebMaxRequestBodySize, fasthttp.DefaultMaxRequestBodySize) v.SetDefault(cfgWorkerPoolSize, 1000) // upload header v.SetDefault(cfgUploaderHeaderEnableDefaultTimestamp, false) // zip: v.SetDefault(cfgZipCompression, false) // metrics v.SetDefault(cfgPprofAddress, "localhost:8083") v.SetDefault(cfgPrometheusAddress, "localhost:8084") // resolve bucket v.SetDefault(cfgResolveNamespaceHeader, defaultNamespaceHeader) v.SetDefault(cfgResolveDefaultNamespaces, []string{"", "root"}) // multinet v.SetDefault(cfgMultinetFallbackDelay, defaultMultinetFallbackDelay) // Binding flags if err := v.BindPFlag(cfgPprofEnabled, flags.Lookup(cmdPprof)); err != nil { panic(err) } if err := v.BindPFlag(cfgPrometheusEnabled, flags.Lookup(cmdMetrics)); err != nil { panic(err) } if err := v.BindPFlag(cfgWalletPath, flags.Lookup(cmdWallet)); err != nil { panic(err) } if err := v.BindPFlag(cfgWalletAddress, flags.Lookup(cmdAddress)); err != nil { panic(err) } if err := v.BindPFlags(flags); err != nil { panic(err) } if err := v.BindPFlag(cfgServer+".0.address", flags.Lookup(cmdListenAddress)); err != nil { panic(err) } if err := v.BindPFlag(cfgServer+".0."+cfgTLSKeyFile, flags.Lookup(cfgTLSKeyFile)); err != nil { panic(err) } if err := v.BindPFlag(cfgServer+".0."+cfgTLSCertFile, flags.Lookup(cfgTLSCertFile)); err != nil { panic(err) } if err := flags.Parse(os.Args); err != nil { panic(err) } if v.IsSet(cfgServer+".0."+cfgTLSKeyFile) && v.IsSet(cfgServer+".0."+cfgTLSCertFile) { v.Set(cfgServer+".0."+cfgTLSEnabled, true) } if resolveMethods != nil { v.SetDefault(cfgResolveOrder, *resolveMethods) } switch { case help != nil && *help: fmt.Printf("FrostFS HTTP Gateway %s\n", Version) flags.PrintDefaults() fmt.Println() fmt.Println("Default environments:") fmt.Println() keys := v.AllKeys() sort.Strings(keys) for i := range keys { if _, ok := ignore[keys[i]]; ok { continue } defaultValue := v.GetString(keys[i]) if len(defaultValue) == 0 { continue } k := strings.Replace(keys[i], ".", "_", -1) fmt.Printf("%s_%s = %s\n", Prefix, strings.ToUpper(k), defaultValue) } fmt.Println() fmt.Println("Peers preset:") fmt.Println() fmt.Printf("%s_%s_[N]_ADDRESS = string\n", Prefix, strings.ToUpper(cfgPeers)) fmt.Printf("%s_%s_[N]_WEIGHT = float\n", Prefix, strings.ToUpper(cfgPeers)) os.Exit(0) case version != nil && *version: fmt.Printf("FrostFS HTTP Gateway\nVersion: %s\nGoVersion: %s\n", Version, runtime.Version()) os.Exit(0) } if err := readInConfig(v); err != nil { panic(err) } if peers != nil && len(*peers) > 0 { for i := range *peers { v.SetDefault(cfgPeers+"."+strconv.Itoa(i)+".address", (*peers)[i]) v.SetDefault(cfgPeers+"."+strconv.Itoa(i)+".weight", 1) v.SetDefault(cfgPeers+"."+strconv.Itoa(i)+".priority", 1) } } return v } func readInConfig(v *viper.Viper) error { if v.IsSet(cmdConfig) { if err := readConfig(v); err != nil { return err } } if v.IsSet(cmdConfigDir) { if err := readConfigDir(v); err != nil { return err } } return nil } func readConfigDir(v *viper.Viper) error { cfgSubConfigDir := v.GetString(cmdConfigDir) entries, err := os.ReadDir(cfgSubConfigDir) if err != nil { return err } for _, entry := range entries { if entry.IsDir() { continue } ext := path.Ext(entry.Name()) if ext != ".yaml" && ext != ".yml" { continue } if err = mergeConfig(v, path.Join(cfgSubConfigDir, entry.Name())); err != nil { return err } } return nil } func readConfig(v *viper.Viper) error { for _, fileName := range v.GetStringSlice(cmdConfig) { if err := mergeConfig(v, fileName); err != nil { return err } } return nil } func mergeConfig(v *viper.Viper, fileName string) error { cfgFile, err := os.Open(fileName) if err != nil { return err } defer func() { if errClose := cfgFile.Close(); errClose != nil { panic(errClose) } }() return v.MergeConfig(cfgFile) } type LoggerAppSettings interface { DroppedLogsInc() } func pickLogger(v *viper.Viper, settings LoggerAppSettings) *Logger { lvl, err := getLogLevel(v) if err != nil { panic(err) } dest := v.GetString(cfgLoggerDestination) switch dest { case destinationStdout: return newStdoutLogger(v, lvl, settings) case destinationJournald: return newJournaldLogger(v, lvl, settings) default: panic(fmt.Sprintf("wrong destination for logger: %s", dest)) } } // newStdoutLogger constructs a zap.Logger instance for current application. // Panics on failure. // // Logger is built from zap's production logging configuration with: // - parameterized level (debug by default) // - console encoding // - ISO8601 time encoding // // Logger records a stack trace for all messages at or above fatal level. // // See also zapcore.Level, zap.NewProductionConfig, zap.AddStacktrace. func newStdoutLogger(v *viper.Viper, lvl zapcore.Level, settings LoggerAppSettings) *Logger { stdout := zapcore.AddSync(os.Stderr) level := zap.NewAtomicLevelAt(lvl) consoleOutCore := zapcore.NewCore(newLogEncoder(), stdout, level) consoleOutCore = applyZapCoreMiddlewares(consoleOutCore, v, settings) return &Logger{ logger: zap.New(consoleOutCore, zap.AddStacktrace(zap.NewAtomicLevelAt(zap.FatalLevel))), lvl: level, } } func newJournaldLogger(v *viper.Viper, lvl zapcore.Level, settings LoggerAppSettings) *Logger { level := zap.NewAtomicLevelAt(lvl) encoder := zapjournald.NewPartialEncoder(newLogEncoder(), zapjournald.SyslogFields) core := zapjournald.NewCore(level, encoder, &journald.Journal{}, zapjournald.SyslogFields) coreWithContext := core.With([]zapcore.Field{ zapjournald.SyslogFacility(zapjournald.LogDaemon), zapjournald.SyslogIdentifier(), zapjournald.SyslogPid(), }) coreWithContext = applyZapCoreMiddlewares(coreWithContext, v, settings) return &Logger{ logger: zap.New(coreWithContext, zap.AddStacktrace(zap.NewAtomicLevelAt(zap.FatalLevel))), lvl: level, } } func newLogEncoder() zapcore.Encoder { c := zap.NewProductionEncoderConfig() c.EncodeTime = zapcore.ISO8601TimeEncoder return zapcore.NewConsoleEncoder(c) } func applyZapCoreMiddlewares(core zapcore.Core, v *viper.Viper, settings LoggerAppSettings) zapcore.Core { if v.GetBool(cfgLoggerSamplingEnabled) { core = zapcore.NewSamplerWithOptions(core, v.GetDuration(cfgLoggerSamplingInterval), v.GetInt(cfgLoggerSamplingInitial), v.GetInt(cfgLoggerSamplingThereafter), zapcore.SamplerHook(func(_ zapcore.Entry, dec zapcore.SamplingDecision) { if dec&zapcore.LogDropped > 0 { settings.DroppedLogsInc() } })) } return core } func getLogLevel(v *viper.Viper) (zapcore.Level, error) { var lvl zapcore.Level lvlStr := v.GetString(cfgLoggerLevel) err := lvl.UnmarshalText([]byte(lvlStr)) if err != nil { return lvl, fmt.Errorf("incorrect logger level configuration %s (%v), "+ "value should be one of %v", lvlStr, err, [...]zapcore.Level{ zapcore.DebugLevel, zapcore.InfoLevel, zapcore.WarnLevel, zapcore.ErrorLevel, zapcore.DPanicLevel, zapcore.PanicLevel, zapcore.FatalLevel, }) } return lvl, nil } func fetchReconnectInterval(cfg *viper.Viper) time.Duration { reconnect := cfg.GetDuration(cfgReconnectInterval) if reconnect <= 0 { reconnect = defaultReconnectInterval } return reconnect } func fetchIndexPageTemplate(v *viper.Viper, l *zap.Logger) (string, bool) { if !v.GetBool(cfgIndexPageEnabled) { return "", false } reader, err := os.Open(v.GetString(cfgIndexPageTemplatePath)) if err != nil { l.Warn(logs.FailedToReadIndexPageTemplate, zap.Error(err)) return "", true } tmpl, err := io.ReadAll(reader) if err != nil { l.Warn(logs.FailedToReadIndexPageTemplate, zap.Error(err)) return "", true } l.Info(logs.SetCustomIndexPageTemplate) return string(tmpl), true } func fetchDefaultNamespaces(v *viper.Viper) []string { namespaces := v.GetStringSlice(cfgResolveDefaultNamespaces) for i := range namespaces { // to be set namespaces in env variable as `HTTP_GW_RESOLVE_BUCKET_DEFAULT_NAMESPACES="" "root"` namespaces[i] = strings.Trim(namespaces[i], "\"") } return namespaces } func fetchCORSMaxAge(v *viper.Viper) int { maxAge := v.GetInt(cfgCORSMaxAge) if maxAge <= 0 { maxAge = defaultCORSMaxAge } return maxAge } func fetchServers(v *viper.Viper, log *zap.Logger) []ServerInfo { var servers []ServerInfo seen := make(map[string]struct{}) for i := 0; ; i++ { key := cfgServer + "." + strconv.Itoa(i) + "." var serverInfo ServerInfo serverInfo.Address = v.GetString(key + "address") serverInfo.TLS.Enabled = v.GetBool(key + cfgTLSEnabled) serverInfo.TLS.KeyFile = v.GetString(key + cfgTLSKeyFile) serverInfo.TLS.CertFile = v.GetString(key + cfgTLSCertFile) if serverInfo.Address == "" { break } if _, ok := seen[serverInfo.Address]; ok { log.Warn(logs.WarnDuplicateAddress, zap.String("address", serverInfo.Address)) continue } seen[serverInfo.Address] = struct{}{} servers = append(servers, serverInfo) } return servers } func getPools(ctx context.Context, logger *zap.Logger, cfg *viper.Viper, dialSource *internalnet.DialerSource) (*pool.Pool, *treepool.Pool, *keys.PrivateKey) { key, err := getFrostFSKey(cfg, logger) if err != nil { logger.Fatal(logs.CouldNotLoadFrostFSPrivateKey, zap.Error(err)) } var prm pool.InitParameters var prmTree treepool.InitParameters prm.SetKey(&key.PrivateKey) prmTree.SetKey(key) logger.Info(logs.UsingCredentials, zap.String("FrostFS", hex.EncodeToString(key.PublicKey().Bytes()))) for _, peer := range fetchPeers(logger, cfg) { prm.AddNode(peer) prmTree.AddNode(peer) } connTimeout := cfg.GetDuration(cfgConTimeout) if connTimeout <= 0 { connTimeout = defaultConnectTimeout } prm.SetNodeDialTimeout(connTimeout) prmTree.SetNodeDialTimeout(connTimeout) streamTimeout := cfg.GetDuration(cfgStreamTimeout) if streamTimeout <= 0 { streamTimeout = defaultStreamTimeout } prm.SetNodeStreamTimeout(streamTimeout) prmTree.SetNodeStreamTimeout(streamTimeout) healthCheckTimeout := cfg.GetDuration(cfgReqTimeout) if healthCheckTimeout <= 0 { healthCheckTimeout = defaultRequestTimeout } prm.SetHealthcheckTimeout(healthCheckTimeout) prmTree.SetHealthcheckTimeout(healthCheckTimeout) rebalanceInterval := cfg.GetDuration(cfgRebalance) if rebalanceInterval <= 0 { rebalanceInterval = defaultRebalanceTimer } prm.SetClientRebalanceInterval(rebalanceInterval) prmTree.SetClientRebalanceInterval(rebalanceInterval) errorThreshold := cfg.GetUint32(cfgPoolErrorThreshold) if errorThreshold <= 0 { errorThreshold = defaultPoolErrorThreshold } prm.SetErrorThreshold(errorThreshold) prm.SetLogger(logger) prmTree.SetLogger(logger) prmTree.SetMaxRequestAttempts(cfg.GetInt(cfgTreePoolMaxAttempts)) interceptors := []grpc.DialOption{ grpc.WithUnaryInterceptor(grpctracing.NewUnaryClientInteceptor()), grpc.WithStreamInterceptor(grpctracing.NewStreamClientInterceptor()), grpc.WithContextDialer(dialSource.GrpcContextDialer()), } prm.SetGRPCDialOptions(interceptors...) prmTree.SetGRPCDialOptions(interceptors...) p, err := pool.NewPool(prm) if err != nil { logger.Fatal(logs.FailedToCreateConnectionPool, zap.Error(err)) } if err = p.Dial(ctx); err != nil { logger.Fatal(logs.FailedToDialConnectionPool, zap.Error(err)) } treePool, err := treepool.NewPool(prmTree) if err != nil { logger.Fatal(logs.FailedToCreateTreePool, zap.Error(err)) } if err = treePool.Dial(ctx); err != nil { logger.Fatal(logs.FailedToDialTreePool, zap.Error(err)) } return p, treePool, key } func fetchPeers(l *zap.Logger, v *viper.Viper) []pool.NodeParam { var nodes []pool.NodeParam for i := 0; ; i++ { key := cfgPeers + "." + strconv.Itoa(i) + "." address := v.GetString(key + "address") weight := v.GetFloat64(key + "weight") priority := v.GetInt(key + "priority") if address == "" { break } if weight <= 0 { // unspecified or wrong weight = 1 } if priority <= 0 { // unspecified or wrong priority = 1 } nodes = append(nodes, pool.NewNodeParam(priority, address, weight)) l.Info(logs.AddedStoragePeer, zap.Int("priority", priority), zap.String("address", address), zap.Float64("weight", weight)) } return nodes } func fetchSoftMemoryLimit(cfg *viper.Viper) int64 { softMemoryLimit := cfg.GetSizeInBytes(cfgSoftMemoryLimit) if softMemoryLimit <= 0 { softMemoryLimit = defaultSoftMemoryLimit } return int64(softMemoryLimit) } func getCacheOptions(v *viper.Viper, l *zap.Logger) *cache.Config { cacheCfg := cache.DefaultBucketConfig(l) cacheCfg.Lifetime = fetchCacheLifetime(v, l, cfgBucketsCacheLifetime, cacheCfg.Lifetime) cacheCfg.Size = fetchCacheSize(v, l, cfgBucketsCacheSize, cacheCfg.Size) return cacheCfg } func fetchCacheLifetime(v *viper.Viper, l *zap.Logger, cfgEntry string, defaultValue time.Duration) time.Duration { if v.IsSet(cfgEntry) { lifetime := v.GetDuration(cfgEntry) if lifetime <= 0 { l.Error(logs.InvalidLifetimeUsingDefaultValue, zap.String("parameter", cfgEntry), zap.Duration("value in config", lifetime), zap.Duration("default", defaultValue)) } else { return lifetime } } return defaultValue } func fetchCacheSize(v *viper.Viper, l *zap.Logger, cfgEntry string, defaultValue int) int { if v.IsSet(cfgEntry) { size := v.GetInt(cfgEntry) if size <= 0 { l.Error(logs.InvalidCacheSizeUsingDefaultValue, zap.String("parameter", cfgEntry), zap.Int("value in config", size), zap.Int("default", defaultValue)) } else { return size } } return defaultValue } func getDialerSource(logger *zap.Logger, cfg *viper.Viper) *internalnet.DialerSource { source, err := internalnet.NewDialerSource(fetchMultinetConfig(cfg, logger)) if err != nil { logger.Fatal(logs.FailedToLoadMultinetConfig, zap.Error(err)) } return source } func fetchMultinetConfig(v *viper.Viper, l *zap.Logger) (cfg internalnet.Config) { cfg.Enabled = v.GetBool(cfgMultinetEnabled) cfg.Balancer = v.GetString(cfgMultinetBalancer) cfg.Restrict = v.GetBool(cfgMultinetRestrict) cfg.FallbackDelay = v.GetDuration(cfgMultinetFallbackDelay) cfg.Subnets = make([]internalnet.Subnet, 0, 5) cfg.EventHandler = internalnet.NewLogEventHandler(l) for i := 0; ; i++ { key := cfgMultinetSubnets + "." + strconv.Itoa(i) + "." subnet := internalnet.Subnet{} subnet.Prefix = v.GetString(key + "mask") if subnet.Prefix == "" { break } subnet.SourceIPs = v.GetStringSlice(key + "source_ips") cfg.Subnets = append(cfg.Subnets, subnet) } return } func fetchTracingAttributes(v *viper.Viper) (map[string]string, error) { attributes := make(map[string]string) for i := 0; ; i++ { key := cfgTracingAttributes + "." + strconv.Itoa(i) + "." attrKey := v.GetString(key + "key") attrValue := v.GetString(key + "value") if attrKey == "" { break } if _, ok := attributes[attrKey]; ok { return nil, fmt.Errorf("tracing attribute key %s defined more than once", attrKey) } if attrValue == "" { return nil, fmt.Errorf("empty tracing attribute value for key %s", attrKey) } attributes[attrKey] = attrValue } return attributes, nil }