package main import ( "fmt" "os" "path" "runtime" "strings" "time" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/client" "git.frostfs.info/TrueCloudLab/frostfs-s3-lifecycler/internal/credential/walletsource" "git.frostfs.info/TrueCloudLab/frostfs-s3-lifecycler/internal/lifecycle" "git.frostfs.info/TrueCloudLab/frostfs-s3-lifecycler/internal/logs" "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/pool" neogoflags "github.com/nspcc-dev/neo-go/cli/flags" "github.com/nspcc-dev/neo-go/cli/input" "github.com/nspcc-dev/neo-go/pkg/crypto/keys" "github.com/nspcc-dev/neo-go/pkg/util" "github.com/nspcc-dev/neo-go/pkg/wallet" "github.com/spf13/pflag" "github.com/spf13/viper" "go.uber.org/zap" ) const ( // Wallet. cfgWalletPath = "wallet.path" cfgWalletAddress = "wallet.address" cfgWalletPassphrase = "wallet.passphrase" // Metrics. cfgPrometheusEnabled = "prometheus.enabled" cfgPrometheusAddress = "prometheus.address" cfgPprofEnabled = "pprof.enabled" cfgPprofAddress = "pprof.address" // Logger. cfgLoggerLevel = "logger.level" cfgLoggerDestination = "logger.destination" cfgLoggerSamplingEnabled = "logger.sampling.enabled" cfgLoggerSamplingInitial = "logger.sampling.initial" cfgLoggerSamplingThereafter = "logger.sampling.thereafter" cfgLoggerSamplingInterval = "logger.sampling.interval" cfgLoggerTags = "logger.tags" // Morph. cfgMorphRPCEndpointPrefixTmpl = "morph.rpc_endpoint.%d." cfgMorphRPCEndpointAddressTmpl = cfgMorphRPCEndpointPrefixTmpl + "address" cfgMorphRPCEndpointPriorityTmpl = cfgMorphRPCEndpointPrefixTmpl + "priority" cfgMorphRPCEndpointTrustedCAListTmpl = cfgMorphRPCEndpointPrefixTmpl + "trusted_ca_list" cfgMorphRPCEndpointCertificateTmpl = cfgMorphRPCEndpointPrefixTmpl + "certificate" cfgMorphRPCEndpointKeyTmpl = cfgMorphRPCEndpointPrefixTmpl + "key" cfgMorphReconnectClientsInterval = "morph.reconnect_clients_interval" cfgMorphDialTimeout = "morph.dial_timeout" cfgMorphContractNetmap = "morph.contract.netmap" cfgMorphContractFrostfsID = "morph.contract.frostfsid" cfgMorphContractContainer = "morph.contract.container" // Credential source. cfgCredentialUse = "credential.use" cfgCredentialSourceWalletsPrefixTmpl = "credential.source.wallets.%d." cfgCredentialSourceWalletsPathTmpl = cfgCredentialSourceWalletsPrefixTmpl + "path" cfgCredentialSourceWalletsAddressTmpl = cfgCredentialSourceWalletsPrefixTmpl + "address" cfgCredentialSourceWalletsPassphraseTmpl = cfgCredentialSourceWalletsPrefixTmpl + "passphrase" // FrostFS. cfgFrostFSConnectTimeout = "frostfs.connect_timeout" cfgFrostFSStreamTimeout = "frostfs.stream_timeout" cfgFrostFSHealthcheckTimeout = "frostfs.healthcheck_timeout" cfgFrostFSRebalanceInterval = "frostfs.rebalance_interval" cfgFrostFSPoolErrorThreshold = "frostfs.pool_error_threshold" cfgFrostFSTreePoolMaxAttempts = "frostfs.tree_pool_max_attempts" cfgFrostFSPeersPrefixTmpl = "frostfs.peers.%d." cfgFrostFSPeersAddressTmpl = cfgFrostFSPeersPrefixTmpl + "address" cfgFrostFSPeersPriorityTmpl = cfgFrostFSPeersPrefixTmpl + "priority" cfgFrostFSPeersWeightTmpl = cfgFrostFSPeersPrefixTmpl + "weight" // Lifecycle. cfgLifecycleJobFetcherBuffer = "lifecycle.job_fetcher_buffer" cfgLifecycleExecutorPoolSize = "lifecycle.executor_pool_size" cfgLifecycleServices = "lifecycle.services" // Command line args. cmdHelp = "help" cmdVersion = "version" cmdConfig = "config" cmdConfigDir = "config-dir" ) const ( defaultShutdownTimeout = 15 * time.Second componentName = "frostfs-s3-lifecycler" defaultMorphRPCEndpointPriority = 1 defaultMorphReconnectClientsInterval = 30 * time.Second defaultMorphDialTimeout = 5 * time.Second defaultFrostFSRebalanceInterval = 60 * time.Second defaultFrostFSHealthcheckTimeout = 15 * time.Second defaultFrostFSConnectTimeout = 10 * time.Second defaultFrostFSStreamTimeout = 10 * time.Second defaultFrostFSPoolErrorThreshold uint32 = 100 defaultLifecycleJobFetcherBuffer = 1000 defaultLifecycleExecutorPoolSize = 100 ) func settings() *viper.Viper { v := viper.New() v.AutomaticEnv() v.SetEnvPrefix(Prefix) v.AllowEmptyEnv(true) v.SetConfigType("yaml") v.SetEnvKeyReplacer(strings.NewReplacer(".", "_")) // flags setup: flags := pflag.NewFlagSet("commandline", pflag.ExitOnError) flags.SetOutput(os.Stdout) flags.SortFlags = false help := flags.BoolP(cmdHelp, "h", false, "show help") version := flags.BoolP(cmdVersion, "v", false, "show version") flags.StringArrayP(cmdConfig, "c", nil, "config paths") flags.String(cmdConfigDir, "", "config dir path") // set defaults: // logger: v.SetDefault(cfgLoggerLevel, "info") v.SetDefault(cfgLoggerDestination, "stdout") v.SetDefault(cfgLoggerSamplingThereafter, 100) v.SetDefault(cfgLoggerSamplingInitial, 100) v.SetDefault(cfgLoggerSamplingInterval, time.Second) // services: v.SetDefault(cfgPrometheusEnabled, false) v.SetDefault(cfgPprofEnabled, false) // morph: v.SetDefault(cfgMorphReconnectClientsInterval, defaultMorphReconnectClientsInterval) v.SetDefault(cfgMorphDialTimeout, defaultMorphDialTimeout) v.SetDefault(cfgMorphContractNetmap, "netmap.frostfs") v.SetDefault(cfgMorphContractFrostfsID, "frostfsid.frostfs") v.SetDefault(cfgMorphContractContainer, "container.frostfs") // frostfs: v.SetDefault(cfgFrostFSConnectTimeout, defaultFrostFSConnectTimeout) v.SetDefault(cfgFrostFSRebalanceInterval, defaultFrostFSRebalanceInterval) v.SetDefault(cfgFrostFSHealthcheckTimeout, defaultFrostFSHealthcheckTimeout) v.SetDefault(cfgFrostFSStreamTimeout, defaultFrostFSStreamTimeout) // lifecycle: v.SetDefault(cfgLifecycleJobFetcherBuffer, defaultLifecycleJobFetcherBuffer) v.SetDefault(cfgLifecycleExecutorPoolSize, defaultLifecycleExecutorPoolSize) // Bind flags with configuration values. if err := v.BindPFlags(flags); err != nil { panic(err) } if err := flags.Parse(os.Args); err != nil { panic(err) } switch { case help != nil && *help: printVersion() flags.PrintDefaults() os.Exit(0) case version != nil && *version: printVersion() os.Exit(0) } if err := readInConfig(v); err != nil { panic(err) } return v } func readInConfig(v *viper.Viper) error { if v.IsSet(cmdConfig) { if err := readConfig(v); err != nil { return err } } if v.IsSet(cmdConfigDir) { if err := readConfigDir(v); err != nil { return err } } return nil } func readConfigDir(v *viper.Viper) error { cfgSubConfigDir := v.GetString(cmdConfigDir) entries, err := os.ReadDir(cfgSubConfigDir) if err != nil { return err } for _, entry := range entries { if entry.IsDir() { continue } ext := path.Ext(entry.Name()) if ext != ".yaml" && ext != ".yml" { continue } if err = mergeConfig(v, path.Join(cfgSubConfigDir, entry.Name())); err != nil { return err } } return nil } func readConfig(v *viper.Viper) error { for _, fileName := range v.GetStringSlice(cmdConfig) { if err := mergeConfig(v, fileName); err != nil { return err } } return nil } func mergeConfig(v *viper.Viper, fileName string) error { cfgFile, err := os.Open(fileName) if err != nil { return err } defer func() { if err2 := cfgFile.Close(); err2 != nil { panic(err2) } }() err = v.MergeConfig(cfgFile) return err } func printVersion() { fmt.Printf("%s\nVersion: %s\nGoVersion: %s\n", componentName, Version, runtime.Version()) } func fetchKey(v *viper.Viper) (*keys.PrivateKey, error) { var password *string if v.IsSet(cfgWalletPassphrase) { pwd := v.GetString(cfgWalletPassphrase) password = &pwd } walletPath := v.GetString(cfgWalletPath) if len(walletPath) == 0 { return nil, fmt.Errorf("wallet path must not be empty") } w, err := wallet.NewWalletFromFile(walletPath) if err != nil { return nil, fmt.Errorf("parse wallet: %w", err) } walletAddress := v.GetString(cfgWalletAddress) var addr util.Uint160 if len(walletAddress) == 0 { addr = w.GetChangeAddress() } else { addr, err = neogoflags.ParseAddress(walletAddress) if err != nil { return nil, fmt.Errorf("invalid address") } } acc := w.GetAccount(addr) if acc == nil { return nil, fmt.Errorf("couldn't find wallet account for %s", walletAddress) } if password == nil { pwd, err := input.ReadPassword(fmt.Sprintf("Enter password for %s > ", walletPath)) if err != nil { return nil, fmt.Errorf("couldn't read password") } password = &pwd } if err = acc.Decrypt(*password, w.Scrypt); err != nil { return nil, fmt.Errorf("couldn't decrypt account: %w", err) } return acc.PrivateKey(), nil } func fetchMorphEndpoints(v *viper.Viper, l *zap.Logger) []client.Endpoint { var res []client.Endpoint for i := 0; ; i++ { addr := v.GetString(fmt.Sprintf(cfgMorphRPCEndpointAddressTmpl, i)) if addr == "" { break } priority := v.GetInt(fmt.Sprintf(cfgMorphRPCEndpointPriorityTmpl, i)) if priority <= 0 { priority = defaultMorphRPCEndpointPriority } var mtlsConfig *client.MTLSConfig rootCAs := v.GetStringSlice(fmt.Sprintf(cfgMorphRPCEndpointTrustedCAListTmpl, i)) if len(rootCAs) != 0 { mtlsConfig = &client.MTLSConfig{ TrustedCAList: rootCAs, KeyFile: v.GetString(fmt.Sprintf(cfgMorphRPCEndpointKeyTmpl, i)), CertFile: v.GetString(fmt.Sprintf(cfgMorphRPCEndpointCertificateTmpl, i)), } } res = append(res, client.Endpoint{ Address: addr, Priority: priority, MTLSConfig: mtlsConfig, }) } if len(res) == 0 { l.Fatal(logs.NoMorphRPCEndpoints) } return res } func fetchWalletsCredentials(v *viper.Viper, l *zap.Logger) []walletsource.Wallet { var res []walletsource.Wallet for i := 0; ; i++ { walletPath := v.GetString(fmt.Sprintf(cfgCredentialSourceWalletsPathTmpl, i)) if walletPath == "" { break } res = append(res, walletsource.Wallet{ Path: walletPath, Address: v.GetString(fmt.Sprintf(cfgCredentialSourceWalletsAddressTmpl, i)), Passphrase: v.GetString(fmt.Sprintf(cfgCredentialSourceWalletsPassphraseTmpl, i)), }) } if len(res) == 0 { l.Fatal(logs.NoCredentialSourceWallets) } return res } func fetchPeers(v *viper.Viper, l *zap.Logger) []pool.NodeParam { var nodes []pool.NodeParam for i := 0; ; i++ { address := v.GetString(fmt.Sprintf(cfgFrostFSPeersAddressTmpl, i)) if address == "" { break } priority := v.GetInt(fmt.Sprintf(cfgFrostFSPeersPriorityTmpl, i)) if priority <= 0 { // unspecified or wrong priority = 1 } weight := v.GetFloat64(fmt.Sprintf(cfgFrostFSPeersWeightTmpl, i)) if weight <= 0 { // unspecified or wrong weight = 1 } nodes = append(nodes, pool.NewNodeParam(priority, address, weight)) l.Info(logs.AddedStoragePeer, zap.String("address", address), zap.Int("priority", priority), zap.Float64("weight", weight)) } return nodes } func fetchConnectTimeout(cfg *viper.Viper) time.Duration { connTimeout := cfg.GetDuration(cfgFrostFSConnectTimeout) if connTimeout <= 0 { connTimeout = defaultFrostFSConnectTimeout } return connTimeout } func fetchStreamTimeout(cfg *viper.Viper) time.Duration { streamTimeout := cfg.GetDuration(cfgFrostFSStreamTimeout) if streamTimeout <= 0 { streamTimeout = defaultFrostFSStreamTimeout } return streamTimeout } func fetchHealthCheckTimeout(cfg *viper.Viper) time.Duration { healthCheckTimeout := cfg.GetDuration(cfgFrostFSHealthcheckTimeout) if healthCheckTimeout <= 0 { healthCheckTimeout = defaultFrostFSHealthcheckTimeout } return healthCheckTimeout } func fetchRebalanceInterval(cfg *viper.Viper) time.Duration { rebalanceInterval := cfg.GetDuration(cfgFrostFSRebalanceInterval) if rebalanceInterval <= 0 { rebalanceInterval = defaultFrostFSRebalanceInterval } return rebalanceInterval } func fetchErrorThreshold(cfg *viper.Viper) uint32 { errorThreshold := cfg.GetUint32(cfgFrostFSPoolErrorThreshold) if errorThreshold <= 0 { errorThreshold = defaultFrostFSPoolErrorThreshold } return errorThreshold } func fetchJobFetcherBuffer(cfg *viper.Viper) int { bufferSize := cfg.GetInt(cfgLifecycleJobFetcherBuffer) if bufferSize <= 0 { bufferSize = defaultLifecycleJobFetcherBuffer } return bufferSize } func fetchExecutorPoolSize(cfg *viper.Viper) int { val := cfg.GetInt(cfgLifecycleExecutorPoolSize) if val <= 0 { val = defaultLifecycleExecutorPoolSize } return val } func fetchMorphReconnectClientsInterval(cfg *viper.Viper) time.Duration { val := cfg.GetDuration(cfgMorphReconnectClientsInterval) if val <= 0 { val = defaultMorphReconnectClientsInterval } return val } func fetchMorphDialTimeout(cfg *viper.Viper) time.Duration { val := cfg.GetDuration(cfgMorphDialTimeout) if val <= 0 { val = defaultMorphDialTimeout } return val } func fetchLifecycleServices(v *viper.Viper) (keys.PublicKeys, error) { configKeys := v.GetStringSlice(cfgLifecycleServices) result := make(keys.PublicKeys, 0, len(configKeys)) uniqKeys := make(map[string]struct{}, len(configKeys)) for _, configKey := range configKeys { if _, ok := uniqKeys[configKey]; ok { continue } k, err := keys.NewPublicKeyFromString(configKey) if err != nil { return nil, fmt.Errorf("key '%s': %w", configKey, err) } result = append(result, k) uniqKeys[configKey] = struct{}{} } return result, nil } func fetchCredentialSource(v *viper.Viper, l *zap.Logger) lifecycle.CredentialSource { credUse := v.GetString(cfgCredentialUse) var ( err error credSource lifecycle.CredentialSource ) switch credUse { case "wallets": if credSource, err = walletsource.New(fetchWalletsCredentials(v, l)); err != nil { l.Fatal(logs.CouldntCreateWalletSource, zap.Error(err)) } default: l.Fatal(logs.UnknownCredentialSource, zap.String(cfgCredentialUse, credUse)) } return credSource }