frostfs-s3-lifecycler/cmd/s3-lifecycler/settings.go
Denis Kirillov 8b6d93c94d
All checks were successful
/ DCO (pull_request) Successful in 48s
/ Builds (pull_request) Successful in 1m19s
/ Vulncheck (pull_request) Successful in 1m20s
/ Lint (pull_request) Successful in 1m40s
/ Tests (pull_request) Successful in 1m20s
[#20] Add metric to see number of dropped logs
Add new metric frostfs_s3_lifecycler_statistic_dropped_logs
Also, configuration sampling interval is added

Signed-off-by: Denis Kirillov <d.kirillov@yadro.com>
2024-09-24 17:47:03 +03:00

509 lines
14 KiB
Go

package main
import (
"fmt"
"os"
"path"
"runtime"
"strings"
"time"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/client"
"git.frostfs.info/TrueCloudLab/frostfs-s3-lifecycler/internal/credential/walletsource"
"git.frostfs.info/TrueCloudLab/frostfs-s3-lifecycler/internal/lifecycle"
"git.frostfs.info/TrueCloudLab/frostfs-s3-lifecycler/internal/logs"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/pool"
neogoflags "github.com/nspcc-dev/neo-go/cli/flags"
"github.com/nspcc-dev/neo-go/cli/input"
"github.com/nspcc-dev/neo-go/pkg/crypto/keys"
"github.com/nspcc-dev/neo-go/pkg/util"
"github.com/nspcc-dev/neo-go/pkg/wallet"
"github.com/spf13/pflag"
"github.com/spf13/viper"
"go.uber.org/zap"
)
// Configuration keys and command line flag names understood by the service.
// Keys ending in "Tmpl" are fmt templates: the %d verb is replaced with the
// zero-based index of the list entry being read.
const (
	// Wallet.
	cfgWalletPath       = "wallet.path"
	cfgWalletAddress    = "wallet.address"
	cfgWalletPassphrase = "wallet.passphrase"

	// Metrics.
	cfgPrometheusEnabled = "prometheus.enabled"
	cfgPrometheusAddress = "prometheus.address"
	cfgPprofEnabled      = "pprof.enabled"
	cfgPprofAddress      = "pprof.address"

	// Logger.
	cfgLoggerLevel              = "logger.level"
	cfgLoggerDestination        = "logger.destination"
	cfgLoggerSamplingEnabled    = "logger.sampling.enabled"
	cfgLoggerSamplingInitial    = "logger.sampling.initial"
	cfgLoggerSamplingThereafter = "logger.sampling.thereafter"
	cfgLoggerSamplingInterval   = "logger.sampling.interval"
	cfgLoggerTags               = "logger.tags"

	// Morph: per-endpoint templates.
	cfgMorphRPCEndpointPrefixTmpl        = "morph.rpc_endpoint.%d."
	cfgMorphRPCEndpointAddressTmpl       = cfgMorphRPCEndpointPrefixTmpl + "address"
	cfgMorphRPCEndpointPriorityTmpl      = cfgMorphRPCEndpointPrefixTmpl + "priority"
	cfgMorphRPCEndpointTrustedCAListTmpl = cfgMorphRPCEndpointPrefixTmpl + "trusted_ca_list"
	cfgMorphRPCEndpointCertificateTmpl   = cfgMorphRPCEndpointPrefixTmpl + "certificate"
	cfgMorphRPCEndpointKeyTmpl           = cfgMorphRPCEndpointPrefixTmpl + "key"

	// Morph: client behaviour and contract names.
	cfgMorphReconnectClientsInterval = "morph.reconnect_clients_interval"
	cfgMorphDialTimeout              = "morph.dial_timeout"
	cfgMorphContractNetmap           = "morph.contract.netmap"
	cfgMorphContractFrostfsID        = "morph.contract.frostfsid"
	cfgMorphContractContainer        = "morph.contract.container"

	// Credential source.
	cfgCredentialUse = "credential.use"

	// Credential source: per-wallet templates.
	cfgCredentialSourceWalletsPrefixTmpl     = "credential.source.wallets.%d."
	cfgCredentialSourceWalletsPathTmpl       = cfgCredentialSourceWalletsPrefixTmpl + "path"
	cfgCredentialSourceWalletsAddressTmpl    = cfgCredentialSourceWalletsPrefixTmpl + "address"
	cfgCredentialSourceWalletsPassphraseTmpl = cfgCredentialSourceWalletsPrefixTmpl + "passphrase"

	// FrostFS.
	cfgFrostFSConnectTimeout      = "frostfs.connect_timeout"
	cfgFrostFSStreamTimeout       = "frostfs.stream_timeout"
	cfgFrostFSHealthcheckTimeout  = "frostfs.healthcheck_timeout"
	cfgFrostFSRebalanceInterval   = "frostfs.rebalance_interval"
	cfgFrostFSPoolErrorThreshold  = "frostfs.pool_error_threshold"
	cfgFrostFSTreePoolMaxAttempts = "frostfs.tree_pool_max_attempts"

	// FrostFS: per-peer templates.
	cfgFrostFSPeersPrefixTmpl   = "frostfs.peers.%d."
	cfgFrostFSPeersAddressTmpl  = cfgFrostFSPeersPrefixTmpl + "address"
	cfgFrostFSPeersPriorityTmpl = cfgFrostFSPeersPrefixTmpl + "priority"
	cfgFrostFSPeersWeightTmpl   = cfgFrostFSPeersPrefixTmpl + "weight"

	// Lifecycle.
	cfgLifecycleJobFetcherBuffer = "lifecycle.job_fetcher_buffer"
	cfgLifecycleExecutorPoolSize = "lifecycle.executor_pool_size"
	cfgLifecycleServices         = "lifecycle.services"

	// Command line args.
	cmdHelp      = "help"
	cmdVersion   = "version"
	cmdConfig    = "config"
	cmdConfigDir = "config-dir"
)
// Component identity and fallback values used when a setting is missing or
// not positive.
const (
	// General.
	componentName          = "frostfs-s3-lifecycler"
	defaultShutdownTimeout = 15 * time.Second

	// Morph.
	defaultMorphRPCEndpointPriority      = 1
	defaultMorphReconnectClientsInterval = 30 * time.Second
	defaultMorphDialTimeout              = 5 * time.Second

	// FrostFS.
	defaultFrostFSConnectTimeout     = 10 * time.Second
	defaultFrostFSStreamTimeout      = 10 * time.Second
	defaultFrostFSHealthcheckTimeout = 15 * time.Second
	defaultFrostFSRebalanceInterval  = 60 * time.Second

	defaultFrostFSPoolErrorThreshold uint32 = 100

	// Lifecycle.
	defaultLifecycleJobFetcherBuffer = 1000
	defaultLifecycleExecutorPoolSize = 100
)
// settings builds the viper instance used by the application.
//
// It enables environment variable lookup (prefix Prefix, "." replaced by "_"),
// registers the command line flags, installs default values, parses os.Args
// and finally merges the config files selected by the flags. With the help or
// version flag it prints the requested information and exits with status 0.
// Any flag binding, parsing or config reading error causes a panic.
func settings() *viper.Viper {
	cfg := viper.New()
	cfg.AutomaticEnv()
	cfg.SetEnvPrefix(Prefix)
	cfg.AllowEmptyEnv(true)
	cfg.SetConfigType("yaml")
	cfg.SetEnvKeyReplacer(strings.NewReplacer(".", "_"))

	// Command line flags.
	flagSet := pflag.NewFlagSet("commandline", pflag.ExitOnError)
	flagSet.SetOutput(os.Stdout)
	flagSet.SortFlags = false

	showHelp := flagSet.BoolP(cmdHelp, "h", false, "show help")
	showVersion := flagSet.BoolP(cmdVersion, "v", false, "show version")
	flagSet.StringArrayP(cmdConfig, "c", nil, "config paths")
	flagSet.String(cmdConfigDir, "", "config dir path")

	// Default values, applied before flags and config files are read.
	defaults := []struct {
		key   string
		value any
	}{
		// logger:
		{cfgLoggerLevel, "info"},
		{cfgLoggerDestination, "stdout"},
		{cfgLoggerSamplingThereafter, 100},
		{cfgLoggerSamplingInitial, 100},
		{cfgLoggerSamplingInterval, time.Second},
		// services:
		{cfgPrometheusEnabled, false},
		{cfgPprofEnabled, false},
		// morph:
		{cfgMorphReconnectClientsInterval, defaultMorphReconnectClientsInterval},
		{cfgMorphDialTimeout, defaultMorphDialTimeout},
		{cfgMorphContractNetmap, "netmap.frostfs"},
		{cfgMorphContractFrostfsID, "frostfsid.frostfs"},
		{cfgMorphContractContainer, "container.frostfs"},
		// frostfs:
		{cfgFrostFSConnectTimeout, defaultFrostFSConnectTimeout},
		{cfgFrostFSRebalanceInterval, defaultFrostFSRebalanceInterval},
		{cfgFrostFSHealthcheckTimeout, defaultFrostFSHealthcheckTimeout},
		{cfgFrostFSStreamTimeout, defaultFrostFSStreamTimeout},
		// lifecycle:
		{cfgLifecycleJobFetcherBuffer, defaultLifecycleJobFetcherBuffer},
		{cfgLifecycleExecutorPoolSize, defaultLifecycleExecutorPoolSize},
	}
	for _, d := range defaults {
		cfg.SetDefault(d.key, d.value)
	}

	// Make flag values reachable through the viper instance.
	if err := cfg.BindPFlags(flagSet); err != nil {
		panic(err)
	}

	if err := flagSet.Parse(os.Args); err != nil {
		panic(err)
	}

	if showHelp != nil && *showHelp {
		printVersion()
		flagSet.PrintDefaults()
		os.Exit(0)
	} else if showVersion != nil && *showVersion {
		printVersion()
		os.Exit(0)
	}

	if err := readInConfig(cfg); err != nil {
		panic(err)
	}

	return cfg
}
// readInConfig merges configuration from the sources selected on the command
// line: first the explicit config files, then the config directory. A source
// is consulted only when its flag is set; the first error stops processing.
func readInConfig(v *viper.Viper) error {
	sources := []struct {
		flag string
		load func(*viper.Viper) error
	}{
		{cmdConfig, readConfig},
		{cmdConfigDir, readConfigDir},
	}

	for _, src := range sources {
		if !v.IsSet(src.flag) {
			continue
		}
		if err := src.load(v); err != nil {
			return err
		}
	}

	return nil
}
// readConfigDir merges every regular entry with a ".yaml" or ".yml" extension
// found directly in the directory named by the config-dir flag. Subdirectories
// and other files are ignored. The first read or merge error is returned.
func readConfigDir(v *viper.Viper) error {
	dir := v.GetString(cmdConfigDir)

	files, err := os.ReadDir(dir)
	if err != nil {
		return err
	}

	for _, file := range files {
		if file.IsDir() {
			continue
		}

		switch path.Ext(file.Name()) {
		case ".yaml", ".yml":
			if err = mergeConfig(v, path.Join(dir, file.Name())); err != nil {
				return err
			}
		}
	}

	return nil
}
// readConfig merges, in the order given, every file listed under the config
// flag into v and returns the first error encountered.
func readConfig(v *viper.Viper) error {
	files := v.GetStringSlice(cmdConfig)
	for i := range files {
		if err := mergeConfig(v, files[i]); err != nil {
			return err
		}
	}

	return nil
}
// mergeConfig opens fileName and merges its contents into v.
//
// The file is always closed before returning. A merge failure is reported
// together with the file name, because viper only sees an io.Reader and cannot
// name the file itself. A failure to close the file is returned as an error
// (unless an earlier error is already being reported) instead of panicking.
func mergeConfig(v *viper.Viper, fileName string) (err error) {
	cfgFile, err := os.Open(fileName)
	if err != nil {
		// *os.PathError already carries the operation and the path.
		return err
	}
	defer func() {
		// Keep the first error: a merge failure is more useful than a
		// subsequent close failure on the same file.
		if closeErr := cfgFile.Close(); closeErr != nil && err == nil {
			err = closeErr
		}
	}()

	if err = v.MergeConfig(cfgFile); err != nil {
		return fmt.Errorf("merge config %q: %w", fileName, err)
	}

	return nil
}
// printVersion writes the component name, the build version and the Go
// runtime version to standard output, one per line.
func printVersion() {
	goVersion := runtime.Version()
	fmt.Printf("%s\nVersion: %s\nGoVersion: %s\n", componentName, Version, goVersion)
}
// fetchKey loads the service private key from the wallet described in v.
//
// The wallet file is taken from wallet.path (required). The account is
// selected by wallet.address, or by the wallet's change address when no
// address is configured. The passphrase comes from wallet.passphrase when that
// key is set (an empty value counts as set); otherwise it is requested
// interactively.
//
// It returns an error when the path is empty, the wallet cannot be parsed, the
// address is invalid, the account is missing, the password cannot be read or
// the account cannot be decrypted. Underlying errors are wrapped with %w.
func fetchKey(v *viper.Viper) (*keys.PrivateKey, error) {
	var password *string
	if v.IsSet(cfgWalletPassphrase) {
		pwd := v.GetString(cfgWalletPassphrase)
		password = &pwd
	}

	walletPath := v.GetString(cfgWalletPath)
	if len(walletPath) == 0 {
		return nil, fmt.Errorf("wallet path must not be empty")
	}

	w, err := wallet.NewWalletFromFile(walletPath)
	if err != nil {
		return nil, fmt.Errorf("parse wallet: %w", err)
	}

	walletAddress := v.GetString(cfgWalletAddress)

	var addr util.Uint160
	if len(walletAddress) == 0 {
		addr = w.GetChangeAddress()
	} else {
		addr, err = neogoflags.ParseAddress(walletAddress)
		if err != nil {
			// Keep the cause so the operator can see why parsing failed.
			return nil, fmt.Errorf("invalid address %q: %w", walletAddress, err)
		}
	}

	acc := w.GetAccount(addr)
	if acc == nil {
		if len(walletAddress) == 0 {
			// No address was configured, so report the script hash that was
			// actually looked up rather than an empty string.
			return nil, fmt.Errorf("couldn't find wallet account for default address (script hash %s)", addr.StringLE())
		}
		return nil, fmt.Errorf("couldn't find wallet account for %s", walletAddress)
	}

	if password == nil {
		pwd, err := input.ReadPassword(fmt.Sprintf("Enter password for %s > ", walletPath))
		if err != nil {
			return nil, fmt.Errorf("couldn't read password: %w", err)
		}
		password = &pwd
	}

	if err = acc.Decrypt(*password, w.Scrypt); err != nil {
		return nil, fmt.Errorf("couldn't decrypt account: %w", err)
	}

	return acc.PrivateKey(), nil
}
// fetchMorphEndpoints reads the indexed morph.rpc_endpoint.N.* entries,
// starting at index 0 and stopping at the first index without an address.
//
// A non-positive priority is replaced by defaultMorphRPCEndpointPriority. The
// mTLS settings are attached only when the trusted CA list is non-empty. If no
// endpoint is configured the problem is logged at Fatal level.
func fetchMorphEndpoints(v *viper.Viper, l *zap.Logger) []client.Endpoint {
	var endpoints []client.Endpoint

	for idx := 0; ; idx++ {
		address := v.GetString(fmt.Sprintf(cfgMorphRPCEndpointAddressTmpl, idx))
		if address == "" {
			break
		}

		endpoint := client.Endpoint{
			Address:  address,
			Priority: defaultMorphRPCEndpointPriority,
		}

		if configured := v.GetInt(fmt.Sprintf(cfgMorphRPCEndpointPriorityTmpl, idx)); configured > 0 {
			endpoint.Priority = configured
		}

		if trusted := v.GetStringSlice(fmt.Sprintf(cfgMorphRPCEndpointTrustedCAListTmpl, idx)); len(trusted) != 0 {
			endpoint.MTLSConfig = &client.MTLSConfig{
				TrustedCAList: trusted,
				KeyFile:       v.GetString(fmt.Sprintf(cfgMorphRPCEndpointKeyTmpl, idx)),
				CertFile:      v.GetString(fmt.Sprintf(cfgMorphRPCEndpointCertificateTmpl, idx)),
			}
		}

		endpoints = append(endpoints, endpoint)
	}

	if len(endpoints) == 0 {
		l.Fatal(logs.NoMorphRPCEndpoints)
	}

	return endpoints
}
// fetchWalletsCredentials reads the indexed credential.source.wallets.N.*
// entries, starting at index 0 and stopping at the first index without a path.
// If no wallet is configured the problem is logged at Fatal level.
func fetchWalletsCredentials(v *viper.Viper, l *zap.Logger) []walletsource.Wallet {
	var wallets []walletsource.Wallet

	for idx := 0; ; idx++ {
		location := v.GetString(fmt.Sprintf(cfgCredentialSourceWalletsPathTmpl, idx))
		if location == "" {
			break
		}

		entry := walletsource.Wallet{
			Path:       location,
			Address:    v.GetString(fmt.Sprintf(cfgCredentialSourceWalletsAddressTmpl, idx)),
			Passphrase: v.GetString(fmt.Sprintf(cfgCredentialSourceWalletsPassphraseTmpl, idx)),
		}
		wallets = append(wallets, entry)
	}

	if len(wallets) == 0 {
		l.Fatal(logs.NoCredentialSourceWallets)
	}

	return wallets
}
// fetchPeers reads the indexed frostfs.peers.N.* entries, starting at index 0
// and stopping at the first index without an address.
//
// A non-positive priority or weight is replaced by 1. Every accepted peer is
// logged at Info level. The result is nil when no peer is configured.
func fetchPeers(v *viper.Viper, l *zap.Logger) []pool.NodeParam {
	var peers []pool.NodeParam

	for idx := 0; ; idx++ {
		peerAddr := v.GetString(fmt.Sprintf(cfgFrostFSPeersAddressTmpl, idx))
		if peerAddr == "" {
			break
		}

		// Unspecified or wrong priority falls back to 1.
		peerPriority := 1
		if configured := v.GetInt(fmt.Sprintf(cfgFrostFSPeersPriorityTmpl, idx)); configured > 0 {
			peerPriority = configured
		}

		// Unspecified or wrong weight falls back to 1.
		peerWeight := v.GetFloat64(fmt.Sprintf(cfgFrostFSPeersWeightTmpl, idx))
		if peerWeight <= 0 {
			peerWeight = 1
		}

		peers = append(peers, pool.NewNodeParam(peerPriority, peerAddr, peerWeight))

		l.Info(
			logs.AddedStoragePeer,
			zap.String("address", peerAddr),
			zap.Int("priority", peerPriority),
			zap.Float64("weight", peerWeight),
		)
	}

	return peers
}
// fetchConnectTimeout returns the configured frostfs.connect_timeout, or
// defaultFrostFSConnectTimeout when the value is not positive.
func fetchConnectTimeout(cfg *viper.Viper) time.Duration {
	if configured := cfg.GetDuration(cfgFrostFSConnectTimeout); configured > 0 {
		return configured
	}

	return defaultFrostFSConnectTimeout
}
// fetchStreamTimeout returns the configured frostfs.stream_timeout, or
// defaultFrostFSStreamTimeout when the value is not positive.
func fetchStreamTimeout(cfg *viper.Viper) time.Duration {
	if configured := cfg.GetDuration(cfgFrostFSStreamTimeout); configured > 0 {
		return configured
	}

	return defaultFrostFSStreamTimeout
}
// fetchHealthCheckTimeout returns the configured frostfs.healthcheck_timeout,
// or defaultFrostFSHealthcheckTimeout when the value is not positive.
func fetchHealthCheckTimeout(cfg *viper.Viper) time.Duration {
	if configured := cfg.GetDuration(cfgFrostFSHealthcheckTimeout); configured > 0 {
		return configured
	}

	return defaultFrostFSHealthcheckTimeout
}
// fetchRebalanceInterval returns the configured frostfs.rebalance_interval, or
// defaultFrostFSRebalanceInterval when the value is not positive.
func fetchRebalanceInterval(cfg *viper.Viper) time.Duration {
	if configured := cfg.GetDuration(cfgFrostFSRebalanceInterval); configured > 0 {
		return configured
	}

	return defaultFrostFSRebalanceInterval
}
// fetchErrorThreshold returns the configured frostfs.pool_error_threshold, or
// defaultFrostFSPoolErrorThreshold when the value is zero.
func fetchErrorThreshold(cfg *viper.Viper) uint32 {
	// The value is unsigned, so "not positive" can only mean zero.
	if configured := cfg.GetUint32(cfgFrostFSPoolErrorThreshold); configured != 0 {
		return configured
	}

	return defaultFrostFSPoolErrorThreshold
}
// fetchJobFetcherBuffer returns the configured lifecycle.job_fetcher_buffer,
// or defaultLifecycleJobFetcherBuffer when the value is not positive.
func fetchJobFetcherBuffer(cfg *viper.Viper) int {
	if configured := cfg.GetInt(cfgLifecycleJobFetcherBuffer); configured > 0 {
		return configured
	}

	return defaultLifecycleJobFetcherBuffer
}
// fetchExecutorPoolSize returns the configured lifecycle.executor_pool_size,
// or defaultLifecycleExecutorPoolSize when the value is not positive.
func fetchExecutorPoolSize(cfg *viper.Viper) int {
	if configured := cfg.GetInt(cfgLifecycleExecutorPoolSize); configured > 0 {
		return configured
	}

	return defaultLifecycleExecutorPoolSize
}
// fetchMorphReconnectClientsInterval returns the configured
// morph.reconnect_clients_interval, or defaultMorphReconnectClientsInterval
// when the value is not positive.
func fetchMorphReconnectClientsInterval(cfg *viper.Viper) time.Duration {
	if configured := cfg.GetDuration(cfgMorphReconnectClientsInterval); configured > 0 {
		return configured
	}

	return defaultMorphReconnectClientsInterval
}
// fetchMorphDialTimeout returns the configured morph.dial_timeout, or
// defaultMorphDialTimeout when the value is not positive.
func fetchMorphDialTimeout(cfg *viper.Viper) time.Duration {
	if configured := cfg.GetDuration(cfgMorphDialTimeout); configured > 0 {
		return configured
	}

	return defaultMorphDialTimeout
}
// fetchLifecycleServices parses the public keys listed under
// lifecycle.services.
//
// Entries whose string form was already seen are skipped, so the result holds
// each distinct string once, in order of first appearance. Parsing stops at
// the first invalid key, which is reported together with the offending value.
func fetchLifecycleServices(v *viper.Viper) (keys.PublicKeys, error) {
	rawKeys := v.GetStringSlice(cfgLifecycleServices)

	seen := make(map[string]struct{}, len(rawKeys))
	parsed := make(keys.PublicKeys, 0, len(rawKeys))

	for i := range rawKeys {
		raw := rawKeys[i]
		if _, duplicate := seen[raw]; duplicate {
			continue
		}

		pub, err := keys.NewPublicKeyFromString(raw)
		if err != nil {
			return nil, fmt.Errorf("key '%s': %w", raw, err)
		}

		seen[raw] = struct{}{}
		parsed = append(parsed, pub)
	}

	return parsed, nil
}
// fetchCredentialSource creates the credential source selected by
// credential.use.
//
// Only "wallets" is recognised; it builds a wallet source from the configured
// wallets. A construction failure or an unrecognised value is logged at Fatal
// level.
func fetchCredentialSource(v *viper.Viper, l *zap.Logger) lifecycle.CredentialSource {
	var source lifecycle.CredentialSource

	if kind := v.GetString(cfgCredentialUse); kind == "wallets" {
		var err error
		if source, err = walletsource.New(fetchWalletsCredentials(v, l)); err != nil {
			l.Fatal(logs.CouldntCreateWalletSource, zap.Error(err))
		}
	} else {
		l.Fatal(logs.UnknownCredentialSource, zap.String(cfgCredentialUse, kind))
	}

	return source
}