package main import ( "context" "encoding/hex" "fmt" "math" "os" "path" "runtime" "sort" "strconv" "strings" "time" "git.frostfs.info/TrueCloudLab/frostfs-http-gw/internal/cache" "git.frostfs.info/TrueCloudLab/frostfs-http-gw/internal/logs" "git.frostfs.info/TrueCloudLab/frostfs-http-gw/resolver" grpctracing "git.frostfs.info/TrueCloudLab/frostfs-observability/tracing/grpc" "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/pool" treepool "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/pool/tree" "git.frostfs.info/TrueCloudLab/zapjournald" "github.com/nspcc-dev/neo-go/pkg/crypto/keys" "github.com/spf13/pflag" "github.com/spf13/viper" "github.com/ssgreg/journald" "github.com/valyala/fasthttp" "go.uber.org/zap" "go.uber.org/zap/zapcore" "google.golang.org/grpc" ) const ( destinationStdout = "stdout" destinationJournald = "journald" ) const ( defaultRebalanceTimer = 60 * time.Second defaultRequestTimeout = 15 * time.Second defaultConnectTimeout = 10 * time.Second defaultStreamTimeout = 10 * time.Second defaultLoggerSamplerInterval = 1 * time.Second defaultShutdownTimeout = 15 * time.Second defaultPoolErrorThreshold uint32 = 100 defaultSoftMemoryLimit = math.MaxInt64 defaultBufferMaxSizeForPut = 1024 * 1024 // 1mb defaultNamespaceHeader = "X-Frostfs-Namespace" defaultReconnectInterval = time.Minute cfgServer = "server" cfgTLSEnabled = "tls.enabled" cfgTLSCertFile = "tls.cert_file" cfgTLSKeyFile = "tls.key_file" cfgReconnectInterval = "reconnect_interval" // Web. cfgWebReadBufferSize = "web.read_buffer_size" cfgWebWriteBufferSize = "web.write_buffer_size" cfgWebReadTimeout = "web.read_timeout" cfgWebWriteTimeout = "web.write_timeout" cfgWebStreamRequestBody = "web.stream_request_body" cfgWebMaxRequestBodySize = "web.max_request_body_size" // Metrics / Profiler. cfgPrometheusEnabled = "prometheus.enabled" cfgPrometheusAddress = "prometheus.address" cfgPprofEnabled = "pprof.enabled" cfgPprofAddress = "pprof.address" // Tracing ... cfgTracingEnabled = "tracing.enabled" cfgTracingExporter = "tracing.exporter" cfgTracingEndpoint = "tracing.endpoint" cfgTracingTrustedCa = "tracing.trusted_ca" // Pool config. cfgConTimeout = "connect_timeout" cfgStreamTimeout = "stream_timeout" cfgReqTimeout = "request_timeout" cfgRebalance = "rebalance_timer" cfgPoolErrorThreshold = "pool_error_threshold" // Logger. cfgLoggerLevel = "logger.level" cfgLoggerDestination = "logger.destination" cfgLoggerSamplingEnabled = "logger.sampling.enabled" cfgLoggerSamplingInitial = "logger.sampling.initial" cfgLoggerSamplingThereafter = "logger.sampling.thereafter" cfgLoggerSamplingInterval = "logger.sampling.interval" // Wallet. cfgWalletPassphrase = "wallet.passphrase" cfgWalletPath = "wallet.path" cfgWalletAddress = "wallet.address" // Uploader Header. cfgUploaderHeaderEnableDefaultTimestamp = "upload_header.use_default_timestamp" // Peers. cfgPeers = "peers" // NeoGo. cfgRPCEndpoint = "rpc_endpoint" // Resolving. cfgResolveOrder = "resolve_order" // Zip compression. cfgZipCompression = "zip.compression" // Runtime. cfgSoftMemoryLimit = "runtime.soft_memory_limit" // Enabling client side object preparing for PUT operations. cfgClientCut = "frostfs.client_cut" // Sets max buffer size for read payload in put operations. cfgBufferMaxSizeForPut = "frostfs.buffer_max_size_for_put" // Configuration of parameters of requests to FrostFS. // Sets max attempt to make successful tree request. cfgTreePoolMaxAttempts = "frostfs.tree_pool_max_attempts" // Caching. cfgBucketsCacheLifetime = "cache.buckets.lifetime" cfgBucketsCacheSize = "cache.buckets.size" // Bucket resolving options. cfgResolveNamespaceHeader = "resolve_bucket.namespace_header" cfgResolveDefaultNamespaces = "resolve_bucket.default_namespaces" // Command line args. cmdHelp = "help" cmdVersion = "version" cmdPprof = "pprof" cmdMetrics = "metrics" cmdWallet = "wallet" cmdAddress = "address" cmdConfig = "config" cmdConfigDir = "config-dir" cmdListenAddress = "listen_address" ) var ignore = map[string]struct{}{ cfgPeers: {}, cmdHelp: {}, cmdVersion: {}, } func settings() *viper.Viper { v := viper.New() v.AutomaticEnv() v.SetEnvPrefix(Prefix) v.AllowEmptyEnv(true) v.SetConfigType("yaml") v.SetEnvKeyReplacer(strings.NewReplacer(".", "_")) // flags setup: flags := pflag.NewFlagSet("commandline", pflag.ExitOnError) flags.SetOutput(os.Stdout) flags.SortFlags = false flags.Bool(cmdPprof, false, "enable pprof") flags.Bool(cmdMetrics, false, "enable prometheus") help := flags.BoolP(cmdHelp, "h", false, "show help") version := flags.BoolP(cmdVersion, "v", false, "show version") flags.StringP(cmdWallet, "w", "", `path to the wallet`) flags.String(cmdAddress, "", `address of wallet account`) flags.StringArray(cmdConfig, nil, "config paths") flags.String(cmdConfigDir, "", "config dir path") flags.Duration(cfgConTimeout, defaultConnectTimeout, "gRPC connect timeout") flags.Duration(cfgStreamTimeout, defaultStreamTimeout, "gRPC individual message timeout") flags.Duration(cfgReqTimeout, defaultRequestTimeout, "gRPC request timeout") flags.Duration(cfgRebalance, defaultRebalanceTimer, "gRPC connection rebalance timer") flags.String(cmdListenAddress, "0.0.0.0:8080", "addresses to listen") flags.String(cfgTLSCertFile, "", "TLS certificate path") flags.String(cfgTLSKeyFile, "", "TLS key path") peers := flags.StringArrayP(cfgPeers, "p", nil, "FrostFS nodes") resolveMethods := flags.StringSlice(cfgResolveOrder, []string{resolver.NNSResolver, resolver.DNSResolver}, "set container name resolve order") // set defaults: // logger: v.SetDefault(cfgLoggerLevel, "debug") v.SetDefault(cfgLoggerDestination, "stdout") v.SetDefault(cfgLoggerSamplingEnabled, false) v.SetDefault(cfgLoggerSamplingThereafter, 100) v.SetDefault(cfgLoggerSamplingInitial, 100) v.SetDefault(cfgLoggerSamplingInterval, defaultLoggerSamplerInterval) // pool: v.SetDefault(cfgPoolErrorThreshold, defaultPoolErrorThreshold) // frostfs: v.SetDefault(cfgBufferMaxSizeForPut, defaultBufferMaxSizeForPut) // web-server: v.SetDefault(cfgWebReadBufferSize, 4096) v.SetDefault(cfgWebWriteBufferSize, 4096) v.SetDefault(cfgWebReadTimeout, time.Minute*10) v.SetDefault(cfgWebWriteTimeout, time.Minute*5) v.SetDefault(cfgWebStreamRequestBody, true) v.SetDefault(cfgWebMaxRequestBodySize, fasthttp.DefaultMaxRequestBodySize) // upload header v.SetDefault(cfgUploaderHeaderEnableDefaultTimestamp, false) // zip: v.SetDefault(cfgZipCompression, false) // metrics v.SetDefault(cfgPprofAddress, "localhost:8083") v.SetDefault(cfgPrometheusAddress, "localhost:8084") // resolve bucket v.SetDefault(cfgResolveNamespaceHeader, defaultNamespaceHeader) v.SetDefault(cfgResolveDefaultNamespaces, []string{"", "root"}) // Binding flags if err := v.BindPFlag(cfgPprofEnabled, flags.Lookup(cmdPprof)); err != nil { panic(err) } if err := v.BindPFlag(cfgPrometheusEnabled, flags.Lookup(cmdMetrics)); err != nil { panic(err) } if err := v.BindPFlag(cfgWalletPath, flags.Lookup(cmdWallet)); err != nil { panic(err) } if err := v.BindPFlag(cfgWalletAddress, flags.Lookup(cmdAddress)); err != nil { panic(err) } if err := v.BindPFlags(flags); err != nil { panic(err) } if err := v.BindPFlag(cfgServer+".0.address", flags.Lookup(cmdListenAddress)); err != nil { panic(err) } if err := v.BindPFlag(cfgServer+".0."+cfgTLSKeyFile, flags.Lookup(cfgTLSKeyFile)); err != nil { panic(err) } if err := v.BindPFlag(cfgServer+".0."+cfgTLSCertFile, flags.Lookup(cfgTLSCertFile)); err != nil { panic(err) } if err := flags.Parse(os.Args); err != nil { panic(err) } if v.IsSet(cfgServer+".0."+cfgTLSKeyFile) && v.IsSet(cfgServer+".0."+cfgTLSCertFile) { v.Set(cfgServer+".0."+cfgTLSEnabled, true) } if resolveMethods != nil { v.SetDefault(cfgResolveOrder, *resolveMethods) } switch { case help != nil && *help: fmt.Printf("FrostFS HTTP Gateway %s\n", Version) flags.PrintDefaults() fmt.Println() fmt.Println("Default environments:") fmt.Println() keys := v.AllKeys() sort.Strings(keys) for i := range keys { if _, ok := ignore[keys[i]]; ok { continue } defaultValue := v.GetString(keys[i]) if len(defaultValue) == 0 { continue } k := strings.Replace(keys[i], ".", "_", -1) fmt.Printf("%s_%s = %s\n", Prefix, strings.ToUpper(k), defaultValue) } fmt.Println() fmt.Println("Peers preset:") fmt.Println() fmt.Printf("%s_%s_[N]_ADDRESS = string\n", Prefix, strings.ToUpper(cfgPeers)) fmt.Printf("%s_%s_[N]_WEIGHT = float\n", Prefix, strings.ToUpper(cfgPeers)) os.Exit(0) case version != nil && *version: fmt.Printf("FrostFS HTTP Gateway\nVersion: %s\nGoVersion: %s\n", Version, runtime.Version()) os.Exit(0) } if err := readInConfig(v); err != nil { panic(err) } if peers != nil && len(*peers) > 0 { for i := range *peers { v.SetDefault(cfgPeers+"."+strconv.Itoa(i)+".address", (*peers)[i]) v.SetDefault(cfgPeers+"."+strconv.Itoa(i)+".weight", 1) v.SetDefault(cfgPeers+"."+strconv.Itoa(i)+".priority", 1) } } return v } func readInConfig(v *viper.Viper) error { if v.IsSet(cmdConfig) { if err := readConfig(v); err != nil { return err } } if v.IsSet(cmdConfigDir) { if err := readConfigDir(v); err != nil { return err } } return nil } func readConfigDir(v *viper.Viper) error { cfgSubConfigDir := v.GetString(cmdConfigDir) entries, err := os.ReadDir(cfgSubConfigDir) if err != nil { return err } for _, entry := range entries { if entry.IsDir() { continue } ext := path.Ext(entry.Name()) if ext != ".yaml" && ext != ".yml" { continue } if err = mergeConfig(v, path.Join(cfgSubConfigDir, entry.Name())); err != nil { return err } } return nil } func readConfig(v *viper.Viper) error { for _, fileName := range v.GetStringSlice(cmdConfig) { if err := mergeConfig(v, fileName); err != nil { return err } } return nil } func mergeConfig(v *viper.Viper, fileName string) error { cfgFile, err := os.Open(fileName) if err != nil { return err } defer func() { if errClose := cfgFile.Close(); errClose != nil { panic(errClose) } }() return v.MergeConfig(cfgFile) } func pickLogger(v *viper.Viper) (*zap.Logger, zap.AtomicLevel) { lvl, err := getLogLevel(v) if err != nil { panic(err) } dest := v.GetString(cfgLoggerDestination) switch dest { case destinationStdout: return newStdoutLogger(v, lvl) case destinationJournald: return newJournaldLogger(v, lvl) default: panic(fmt.Sprintf("wrong destination for logger: %s", dest)) } } // newStdoutLogger constructs a zap.Logger instance for current application. // Panics on failure. // // Logger is built from zap's production logging configuration with: // - parameterized level (debug by default) // - console encoding // - ISO8601 time encoding // // Logger records a stack trace for all messages at or above fatal level. // // See also zapcore.Level, zap.NewProductionConfig, zap.AddStacktrace. func newStdoutLogger(v *viper.Viper, lvl zapcore.Level) (*zap.Logger, zap.AtomicLevel) { stdout := zapcore.AddSync(os.Stderr) level := zap.NewAtomicLevelAt(lvl) consoleOutCore := zapcore.NewCore(newLogEncoder(), stdout, level) consoleOutCore = samplingEnabling(v, consoleOutCore) l := zap.New(consoleOutCore, zap.AddStacktrace(zap.NewAtomicLevelAt(zap.FatalLevel))) return l, level } func newJournaldLogger(v *viper.Viper, lvl zapcore.Level) (*zap.Logger, zap.AtomicLevel) { level := zap.NewAtomicLevelAt(lvl) encoder := zapjournald.NewPartialEncoder(newLogEncoder(), zapjournald.SyslogFields) core := zapjournald.NewCore(level, encoder, &journald.Journal{}, zapjournald.SyslogFields) coreWithContext := core.With([]zapcore.Field{ zapjournald.SyslogFacility(zapjournald.LogDaemon), zapjournald.SyslogIdentifier(), zapjournald.SyslogPid(), }) coreWithContext = samplingEnabling(v, coreWithContext) l := zap.New(coreWithContext, zap.AddStacktrace(zap.NewAtomicLevelAt(zap.FatalLevel))) return l, level } func newLogEncoder() zapcore.Encoder { c := zap.NewProductionEncoderConfig() c.EncodeTime = zapcore.ISO8601TimeEncoder return zapcore.NewConsoleEncoder(c) } func samplingEnabling(v *viper.Viper, core zapcore.Core) zapcore.Core { // Zap samples by logging the first cgfLoggerSamplingInitial entries with a given level // and message within the specified time interval. // In the above config, only the first cgfLoggerSamplingInitial log entries with the same level and message // are recorded in cfgLoggerSamplingInterval interval. Every other log entry will be dropped within the interval since // cfgLoggerSamplingThereafter is specified here. if v.GetBool(cfgLoggerSamplingEnabled) { core = zapcore.NewSamplerWithOptions( core, v.GetDuration(cfgLoggerSamplingInterval), v.GetInt(cfgLoggerSamplingInitial), v.GetInt(cfgLoggerSamplingThereafter), ) } return core } func getLogLevel(v *viper.Viper) (zapcore.Level, error) { var lvl zapcore.Level lvlStr := v.GetString(cfgLoggerLevel) err := lvl.UnmarshalText([]byte(lvlStr)) if err != nil { return lvl, fmt.Errorf("incorrect logger level configuration %s (%v), "+ "value should be one of %v", lvlStr, err, [...]zapcore.Level{ zapcore.DebugLevel, zapcore.InfoLevel, zapcore.WarnLevel, zapcore.ErrorLevel, zapcore.DPanicLevel, zapcore.PanicLevel, zapcore.FatalLevel, }) } return lvl, nil } func fetchReconnectInterval(cfg *viper.Viper) time.Duration { reconnect := cfg.GetDuration(cfgReconnectInterval) if reconnect <= 0 { reconnect = defaultReconnectInterval } return reconnect } func fetchServers(v *viper.Viper, log *zap.Logger) []ServerInfo { var servers []ServerInfo seen := make(map[string]struct{}) for i := 0; ; i++ { key := cfgServer + "." + strconv.Itoa(i) + "." var serverInfo ServerInfo serverInfo.Address = v.GetString(key + "address") serverInfo.TLS.Enabled = v.GetBool(key + cfgTLSEnabled) serverInfo.TLS.KeyFile = v.GetString(key + cfgTLSKeyFile) serverInfo.TLS.CertFile = v.GetString(key + cfgTLSCertFile) if serverInfo.Address == "" { break } if _, ok := seen[serverInfo.Address]; ok { log.Warn(logs.WarnDuplicateAddress, zap.String("address", serverInfo.Address)) continue } seen[serverInfo.Address] = struct{}{} servers = append(servers, serverInfo) } return servers } func getPools(ctx context.Context, logger *zap.Logger, cfg *viper.Viper) (*pool.Pool, *treepool.Pool, *keys.PrivateKey) { key, err := getFrostFSKey(cfg, logger) if err != nil { logger.Fatal(logs.CouldNotLoadFrostFSPrivateKey, zap.Error(err)) } var prm pool.InitParameters var prmTree treepool.InitParameters prm.SetKey(&key.PrivateKey) prmTree.SetKey(key) logger.Info(logs.UsingCredentials, zap.String("FrostFS", hex.EncodeToString(key.PublicKey().Bytes()))) for _, peer := range fetchPeers(logger, cfg) { prm.AddNode(peer) prmTree.AddNode(peer) } connTimeout := cfg.GetDuration(cfgConTimeout) if connTimeout <= 0 { connTimeout = defaultConnectTimeout } prm.SetNodeDialTimeout(connTimeout) prmTree.SetNodeDialTimeout(connTimeout) streamTimeout := cfg.GetDuration(cfgStreamTimeout) if streamTimeout <= 0 { streamTimeout = defaultStreamTimeout } prm.SetNodeStreamTimeout(streamTimeout) prmTree.SetNodeStreamTimeout(streamTimeout) healthCheckTimeout := cfg.GetDuration(cfgReqTimeout) if healthCheckTimeout <= 0 { healthCheckTimeout = defaultRequestTimeout } prm.SetHealthcheckTimeout(healthCheckTimeout) prmTree.SetHealthcheckTimeout(healthCheckTimeout) rebalanceInterval := cfg.GetDuration(cfgRebalance) if rebalanceInterval <= 0 { rebalanceInterval = defaultRebalanceTimer } prm.SetClientRebalanceInterval(rebalanceInterval) prmTree.SetClientRebalanceInterval(rebalanceInterval) errorThreshold := cfg.GetUint32(cfgPoolErrorThreshold) if errorThreshold <= 0 { errorThreshold = defaultPoolErrorThreshold } prm.SetErrorThreshold(errorThreshold) prm.SetLogger(logger) prmTree.SetLogger(logger) prmTree.SetMaxRequestAttempts(cfg.GetInt(cfgTreePoolMaxAttempts)) var apiGRPCDialOpts []grpc.DialOption var treeGRPCDialOpts []grpc.DialOption if cfg.GetBool(cfgTracingEnabled) { interceptors := []grpc.DialOption{ grpc.WithUnaryInterceptor(grpctracing.NewUnaryClientInteceptor()), grpc.WithStreamInterceptor(grpctracing.NewStreamClientInterceptor()), } treeGRPCDialOpts = append(treeGRPCDialOpts, interceptors...) apiGRPCDialOpts = append(apiGRPCDialOpts, interceptors...) } prm.SetGRPCDialOptions(apiGRPCDialOpts...) prmTree.SetGRPCDialOptions(treeGRPCDialOpts...) p, err := pool.NewPool(prm) if err != nil { logger.Fatal(logs.FailedToCreateConnectionPool, zap.Error(err)) } if err = p.Dial(ctx); err != nil { logger.Fatal(logs.FailedToDialConnectionPool, zap.Error(err)) } treePool, err := treepool.NewPool(prmTree) if err != nil { logger.Fatal(logs.FailedToCreateTreePool, zap.Error(err)) } if err = treePool.Dial(ctx); err != nil { logger.Fatal(logs.FailedToDialTreePool, zap.Error(err)) } return p, treePool, key } func fetchPeers(l *zap.Logger, v *viper.Viper) []pool.NodeParam { var nodes []pool.NodeParam for i := 0; ; i++ { key := cfgPeers + "." + strconv.Itoa(i) + "." address := v.GetString(key + "address") weight := v.GetFloat64(key + "weight") priority := v.GetInt(key + "priority") if address == "" { break } if weight <= 0 { // unspecified or wrong weight = 1 } if priority <= 0 { // unspecified or wrong priority = 1 } nodes = append(nodes, pool.NewNodeParam(priority, address, weight)) l.Info(logs.AddedStoragePeer, zap.Int("priority", priority), zap.String("address", address), zap.Float64("weight", weight)) } return nodes } func fetchSoftMemoryLimit(cfg *viper.Viper) int64 { softMemoryLimit := cfg.GetSizeInBytes(cfgSoftMemoryLimit) if softMemoryLimit <= 0 { softMemoryLimit = defaultSoftMemoryLimit } return int64(softMemoryLimit) } func getCacheOptions(v *viper.Viper, l *zap.Logger) *cache.Config { cacheCfg := cache.DefaultBucketConfig(l) cacheCfg.Lifetime = fetchCacheLifetime(v, l, cfgBucketsCacheLifetime, cacheCfg.Lifetime) cacheCfg.Size = fetchCacheSize(v, l, cfgBucketsCacheSize, cacheCfg.Size) return cacheCfg } func fetchCacheLifetime(v *viper.Viper, l *zap.Logger, cfgEntry string, defaultValue time.Duration) time.Duration { if v.IsSet(cfgEntry) { lifetime := v.GetDuration(cfgEntry) if lifetime <= 0 { l.Error(logs.InvalidLifetimeUsingDefaultValue, zap.String("parameter", cfgEntry), zap.Duration("value in config", lifetime), zap.Duration("default", defaultValue)) } else { return lifetime } } return defaultValue } func fetchCacheSize(v *viper.Viper, l *zap.Logger, cfgEntry string, defaultValue int) int { if v.IsSet(cfgEntry) { size := v.GetInt(cfgEntry) if size <= 0 { l.Error(logs.InvalidCacheSizeUsingDefaultValue, zap.String("parameter", cfgEntry), zap.Int("value in config", size), zap.Int("default", defaultValue)) } else { return size } } return defaultValue }