Roman Loginov
dc100f03a6
Fallback path to search is needed because some software may keep FileName attribute and ignore FilePath attribute during file upload. Therefore, if this feature is enabled under certain conditions (for more information, see gate-configuration.md) a search will be performed for the FileName attribute. Signed-off-by: Roman Loginov <r.loginov@yadro.com>
830 lines
22 KiB
Go
830 lines
22 KiB
Go
package main
|
|
|
|
import (
|
|
"context"
|
|
"encoding/hex"
|
|
"fmt"
|
|
"io"
|
|
"math"
|
|
"os"
|
|
"path"
|
|
"runtime"
|
|
"sort"
|
|
"strconv"
|
|
"strings"
|
|
"time"
|
|
|
|
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/internal/cache"
|
|
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/internal/logs"
|
|
internalnet "git.frostfs.info/TrueCloudLab/frostfs-http-gw/internal/net"
|
|
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/resolver"
|
|
grpctracing "git.frostfs.info/TrueCloudLab/frostfs-observability/tracing/grpc"
|
|
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/pool"
|
|
treepool "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/pool/tree"
|
|
"git.frostfs.info/TrueCloudLab/zapjournald"
|
|
"github.com/nspcc-dev/neo-go/pkg/crypto/keys"
|
|
"github.com/spf13/pflag"
|
|
"github.com/spf13/viper"
|
|
"github.com/ssgreg/journald"
|
|
"github.com/valyala/fasthttp"
|
|
"go.uber.org/zap"
|
|
"go.uber.org/zap/zapcore"
|
|
"google.golang.org/grpc"
|
|
)
|
|
|
|
const (
|
|
destinationStdout = "stdout"
|
|
destinationJournald = "journald"
|
|
)
|
|
|
|
const (
|
|
defaultRebalanceTimer = 60 * time.Second
|
|
defaultRequestTimeout = 15 * time.Second
|
|
defaultConnectTimeout = 10 * time.Second
|
|
defaultStreamTimeout = 10 * time.Second
|
|
|
|
defaultLoggerSamplerInterval = 1 * time.Second
|
|
|
|
defaultShutdownTimeout = 15 * time.Second
|
|
|
|
defaultPoolErrorThreshold uint32 = 100
|
|
|
|
defaultSoftMemoryLimit = math.MaxInt64
|
|
|
|
defaultBufferMaxSizeForPut = 1024 * 1024 // 1mb
|
|
|
|
defaultNamespaceHeader = "X-Frostfs-Namespace"
|
|
|
|
defaultReconnectInterval = time.Minute
|
|
|
|
defaultCORSMaxAge = 600 // seconds
|
|
|
|
defaultMultinetFallbackDelay = 300 * time.Millisecond
|
|
|
|
cfgServer = "server"
|
|
cfgTLSEnabled = "tls.enabled"
|
|
cfgTLSCertFile = "tls.cert_file"
|
|
cfgTLSKeyFile = "tls.key_file"
|
|
|
|
cfgReconnectInterval = "reconnect_interval"
|
|
|
|
cfgIndexPageEnabled = "index_page.enabled"
|
|
cfgIndexPageTemplatePath = "index_page.template_path"
|
|
|
|
cfgWorkerPoolSize = "worker_pool_size"
|
|
|
|
// Web.
|
|
cfgWebReadBufferSize = "web.read_buffer_size"
|
|
cfgWebWriteBufferSize = "web.write_buffer_size"
|
|
cfgWebReadTimeout = "web.read_timeout"
|
|
cfgWebWriteTimeout = "web.write_timeout"
|
|
cfgWebStreamRequestBody = "web.stream_request_body"
|
|
cfgWebMaxRequestBodySize = "web.max_request_body_size"
|
|
|
|
// Metrics / Profiler.
|
|
cfgPrometheusEnabled = "prometheus.enabled"
|
|
cfgPrometheusAddress = "prometheus.address"
|
|
cfgPprofEnabled = "pprof.enabled"
|
|
cfgPprofAddress = "pprof.address"
|
|
|
|
// Tracing ...
|
|
cfgTracingEnabled = "tracing.enabled"
|
|
cfgTracingExporter = "tracing.exporter"
|
|
cfgTracingEndpoint = "tracing.endpoint"
|
|
cfgTracingTrustedCa = "tracing.trusted_ca"
|
|
cfgTracingAttributes = "tracing.attributes"
|
|
|
|
// Pool config.
|
|
cfgConTimeout = "connect_timeout"
|
|
cfgStreamTimeout = "stream_timeout"
|
|
cfgReqTimeout = "request_timeout"
|
|
cfgRebalance = "rebalance_timer"
|
|
cfgPoolErrorThreshold = "pool_error_threshold"
|
|
|
|
// Logger.
|
|
cfgLoggerLevel = "logger.level"
|
|
cfgLoggerDestination = "logger.destination"
|
|
|
|
cfgLoggerSamplingEnabled = "logger.sampling.enabled"
|
|
cfgLoggerSamplingInitial = "logger.sampling.initial"
|
|
cfgLoggerSamplingThereafter = "logger.sampling.thereafter"
|
|
cfgLoggerSamplingInterval = "logger.sampling.interval"
|
|
|
|
// Wallet.
|
|
cfgWalletPassphrase = "wallet.passphrase"
|
|
cfgWalletPath = "wallet.path"
|
|
cfgWalletAddress = "wallet.address"
|
|
|
|
// Uploader Header.
|
|
cfgUploaderHeaderEnableDefaultTimestamp = "upload_header.use_default_timestamp"
|
|
|
|
// Peers.
|
|
cfgPeers = "peers"
|
|
|
|
// NeoGo.
|
|
cfgRPCEndpoint = "rpc_endpoint"
|
|
|
|
// Resolving.
|
|
cfgResolveOrder = "resolve_order"
|
|
|
|
// Zip compression.
|
|
cfgZipCompression = "zip.compression"
|
|
|
|
// Runtime.
|
|
cfgSoftMemoryLimit = "runtime.soft_memory_limit"
|
|
|
|
// Enabling client side object preparing for PUT operations.
|
|
cfgClientCut = "frostfs.client_cut"
|
|
// Sets max buffer size for read payload in put operations.
|
|
cfgBufferMaxSizeForPut = "frostfs.buffer_max_size_for_put"
|
|
// Configuration of parameters of requests to FrostFS.
|
|
// Sets max attempt to make successful tree request.
|
|
cfgTreePoolMaxAttempts = "frostfs.tree_pool_max_attempts"
|
|
|
|
// Caching.
|
|
cfgBucketsCacheLifetime = "cache.buckets.lifetime"
|
|
cfgBucketsCacheSize = "cache.buckets.size"
|
|
|
|
// Bucket resolving options.
|
|
cfgResolveNamespaceHeader = "resolve_bucket.namespace_header"
|
|
cfgResolveDefaultNamespaces = "resolve_bucket.default_namespaces"
|
|
|
|
// CORS.
|
|
cfgCORSAllowOrigin = "cors.allow_origin"
|
|
cfgCORSAllowMethods = "cors.allow_methods"
|
|
cfgCORSAllowHeaders = "cors.allow_headers"
|
|
cfgCORSExposeHeaders = "cors.expose_headers"
|
|
cfgCORSAllowCredentials = "cors.allow_credentials"
|
|
cfgCORSMaxAge = "cors.max_age"
|
|
|
|
// Multinet.
|
|
cfgMultinetEnabled = "multinet.enabled"
|
|
cfgMultinetBalancer = "multinet.balancer"
|
|
cfgMultinetRestrict = "multinet.restrict"
|
|
cfgMultinetFallbackDelay = "multinet.fallback_delay"
|
|
cfgMultinetSubnets = "multinet.subnets"
|
|
|
|
// Feature.
|
|
cfgFeaturesEnableFilepathFallback = "features.enable_filepath_fallback"
|
|
|
|
// Command line args.
|
|
cmdHelp = "help"
|
|
cmdVersion = "version"
|
|
cmdPprof = "pprof"
|
|
cmdMetrics = "metrics"
|
|
cmdWallet = "wallet"
|
|
cmdAddress = "address"
|
|
cmdConfig = "config"
|
|
cmdConfigDir = "config-dir"
|
|
cmdListenAddress = "listen_address"
|
|
)
|
|
|
|
var ignore = map[string]struct{}{
|
|
cfgPeers: {},
|
|
cmdHelp: {},
|
|
cmdVersion: {},
|
|
}
|
|
|
|
type Logger struct {
|
|
logger *zap.Logger
|
|
lvl zap.AtomicLevel
|
|
}
|
|
|
|
func settings() *viper.Viper {
|
|
v := viper.New()
|
|
v.AutomaticEnv()
|
|
v.SetEnvPrefix(Prefix)
|
|
v.AllowEmptyEnv(true)
|
|
v.SetConfigType("yaml")
|
|
v.SetEnvKeyReplacer(strings.NewReplacer(".", "_"))
|
|
|
|
// flags setup:
|
|
flags := pflag.NewFlagSet("commandline", pflag.ExitOnError)
|
|
flags.SetOutput(os.Stdout)
|
|
flags.SortFlags = false
|
|
|
|
flags.Bool(cmdPprof, false, "enable pprof")
|
|
flags.Bool(cmdMetrics, false, "enable prometheus")
|
|
|
|
help := flags.BoolP(cmdHelp, "h", false, "show help")
|
|
version := flags.BoolP(cmdVersion, "v", false, "show version")
|
|
|
|
flags.StringP(cmdWallet, "w", "", `path to the wallet`)
|
|
flags.String(cmdAddress, "", `address of wallet account`)
|
|
flags.StringArray(cmdConfig, nil, "config paths")
|
|
flags.String(cmdConfigDir, "", "config dir path")
|
|
flags.Duration(cfgConTimeout, defaultConnectTimeout, "gRPC connect timeout")
|
|
flags.Duration(cfgStreamTimeout, defaultStreamTimeout, "gRPC individual message timeout")
|
|
flags.Duration(cfgReqTimeout, defaultRequestTimeout, "gRPC request timeout")
|
|
flags.Duration(cfgRebalance, defaultRebalanceTimer, "gRPC connection rebalance timer")
|
|
|
|
flags.String(cmdListenAddress, "0.0.0.0:8080", "addresses to listen")
|
|
flags.String(cfgTLSCertFile, "", "TLS certificate path")
|
|
flags.String(cfgTLSKeyFile, "", "TLS key path")
|
|
peers := flags.StringArrayP(cfgPeers, "p", nil, "FrostFS nodes")
|
|
|
|
resolveMethods := flags.StringSlice(cfgResolveOrder, []string{resolver.NNSResolver, resolver.DNSResolver}, "set container name resolve order")
|
|
|
|
// set defaults:
|
|
|
|
// logger:
|
|
v.SetDefault(cfgLoggerLevel, "debug")
|
|
v.SetDefault(cfgLoggerDestination, "stdout")
|
|
v.SetDefault(cfgLoggerSamplingEnabled, false)
|
|
v.SetDefault(cfgLoggerSamplingThereafter, 100)
|
|
v.SetDefault(cfgLoggerSamplingInitial, 100)
|
|
v.SetDefault(cfgLoggerSamplingInterval, defaultLoggerSamplerInterval)
|
|
|
|
// pool:
|
|
v.SetDefault(cfgPoolErrorThreshold, defaultPoolErrorThreshold)
|
|
|
|
// frostfs:
|
|
v.SetDefault(cfgBufferMaxSizeForPut, defaultBufferMaxSizeForPut)
|
|
|
|
// web-server:
|
|
v.SetDefault(cfgWebReadBufferSize, 4096)
|
|
v.SetDefault(cfgWebWriteBufferSize, 4096)
|
|
v.SetDefault(cfgWebReadTimeout, time.Minute*10)
|
|
v.SetDefault(cfgWebWriteTimeout, time.Minute*5)
|
|
v.SetDefault(cfgWebStreamRequestBody, true)
|
|
v.SetDefault(cfgWebMaxRequestBodySize, fasthttp.DefaultMaxRequestBodySize)
|
|
|
|
v.SetDefault(cfgWorkerPoolSize, 1000)
|
|
// upload header
|
|
v.SetDefault(cfgUploaderHeaderEnableDefaultTimestamp, false)
|
|
|
|
// zip:
|
|
v.SetDefault(cfgZipCompression, false)
|
|
|
|
// metrics
|
|
v.SetDefault(cfgPprofAddress, "localhost:8083")
|
|
v.SetDefault(cfgPrometheusAddress, "localhost:8084")
|
|
|
|
// resolve bucket
|
|
v.SetDefault(cfgResolveNamespaceHeader, defaultNamespaceHeader)
|
|
v.SetDefault(cfgResolveDefaultNamespaces, []string{"", "root"})
|
|
|
|
// multinet
|
|
v.SetDefault(cfgMultinetFallbackDelay, defaultMultinetFallbackDelay)
|
|
|
|
// Binding flags
|
|
if err := v.BindPFlag(cfgPprofEnabled, flags.Lookup(cmdPprof)); err != nil {
|
|
panic(err)
|
|
}
|
|
if err := v.BindPFlag(cfgPrometheusEnabled, flags.Lookup(cmdMetrics)); err != nil {
|
|
panic(err)
|
|
}
|
|
|
|
if err := v.BindPFlag(cfgWalletPath, flags.Lookup(cmdWallet)); err != nil {
|
|
panic(err)
|
|
}
|
|
|
|
if err := v.BindPFlag(cfgWalletAddress, flags.Lookup(cmdAddress)); err != nil {
|
|
panic(err)
|
|
}
|
|
|
|
if err := v.BindPFlags(flags); err != nil {
|
|
panic(err)
|
|
}
|
|
|
|
if err := v.BindPFlag(cfgServer+".0.address", flags.Lookup(cmdListenAddress)); err != nil {
|
|
panic(err)
|
|
}
|
|
if err := v.BindPFlag(cfgServer+".0."+cfgTLSKeyFile, flags.Lookup(cfgTLSKeyFile)); err != nil {
|
|
panic(err)
|
|
}
|
|
if err := v.BindPFlag(cfgServer+".0."+cfgTLSCertFile, flags.Lookup(cfgTLSCertFile)); err != nil {
|
|
panic(err)
|
|
}
|
|
|
|
if err := flags.Parse(os.Args); err != nil {
|
|
panic(err)
|
|
}
|
|
|
|
if v.IsSet(cfgServer+".0."+cfgTLSKeyFile) && v.IsSet(cfgServer+".0."+cfgTLSCertFile) {
|
|
v.Set(cfgServer+".0."+cfgTLSEnabled, true)
|
|
}
|
|
|
|
if resolveMethods != nil {
|
|
v.SetDefault(cfgResolveOrder, *resolveMethods)
|
|
}
|
|
|
|
switch {
|
|
case help != nil && *help:
|
|
fmt.Printf("FrostFS HTTP Gateway %s\n", Version)
|
|
flags.PrintDefaults()
|
|
|
|
fmt.Println()
|
|
fmt.Println("Default environments:")
|
|
fmt.Println()
|
|
keys := v.AllKeys()
|
|
sort.Strings(keys)
|
|
|
|
for i := range keys {
|
|
if _, ok := ignore[keys[i]]; ok {
|
|
continue
|
|
}
|
|
|
|
defaultValue := v.GetString(keys[i])
|
|
if len(defaultValue) == 0 {
|
|
continue
|
|
}
|
|
|
|
k := strings.Replace(keys[i], ".", "_", -1)
|
|
fmt.Printf("%s_%s = %s\n", Prefix, strings.ToUpper(k), defaultValue)
|
|
}
|
|
|
|
fmt.Println()
|
|
fmt.Println("Peers preset:")
|
|
fmt.Println()
|
|
|
|
fmt.Printf("%s_%s_[N]_ADDRESS = string\n", Prefix, strings.ToUpper(cfgPeers))
|
|
fmt.Printf("%s_%s_[N]_WEIGHT = float\n", Prefix, strings.ToUpper(cfgPeers))
|
|
|
|
os.Exit(0)
|
|
case version != nil && *version:
|
|
fmt.Printf("FrostFS HTTP Gateway\nVersion: %s\nGoVersion: %s\n", Version, runtime.Version())
|
|
os.Exit(0)
|
|
}
|
|
|
|
if err := readInConfig(v); err != nil {
|
|
panic(err)
|
|
}
|
|
|
|
if peers != nil && len(*peers) > 0 {
|
|
for i := range *peers {
|
|
v.SetDefault(cfgPeers+"."+strconv.Itoa(i)+".address", (*peers)[i])
|
|
v.SetDefault(cfgPeers+"."+strconv.Itoa(i)+".weight", 1)
|
|
v.SetDefault(cfgPeers+"."+strconv.Itoa(i)+".priority", 1)
|
|
}
|
|
}
|
|
|
|
return v
|
|
}
|
|
|
|
func readInConfig(v *viper.Viper) error {
|
|
if v.IsSet(cmdConfig) {
|
|
if err := readConfig(v); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
|
|
if v.IsSet(cmdConfigDir) {
|
|
if err := readConfigDir(v); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func readConfigDir(v *viper.Viper) error {
|
|
cfgSubConfigDir := v.GetString(cmdConfigDir)
|
|
entries, err := os.ReadDir(cfgSubConfigDir)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
for _, entry := range entries {
|
|
if entry.IsDir() {
|
|
continue
|
|
}
|
|
ext := path.Ext(entry.Name())
|
|
if ext != ".yaml" && ext != ".yml" {
|
|
continue
|
|
}
|
|
|
|
if err = mergeConfig(v, path.Join(cfgSubConfigDir, entry.Name())); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func readConfig(v *viper.Viper) error {
|
|
for _, fileName := range v.GetStringSlice(cmdConfig) {
|
|
if err := mergeConfig(v, fileName); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func mergeConfig(v *viper.Viper, fileName string) error {
|
|
cfgFile, err := os.Open(fileName)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
defer func() {
|
|
if errClose := cfgFile.Close(); errClose != nil {
|
|
panic(errClose)
|
|
}
|
|
}()
|
|
|
|
return v.MergeConfig(cfgFile)
|
|
}
|
|
|
|
type LoggerAppSettings interface {
|
|
DroppedLogsInc()
|
|
}
|
|
|
|
func pickLogger(v *viper.Viper, settings LoggerAppSettings) *Logger {
|
|
lvl, err := getLogLevel(v)
|
|
if err != nil {
|
|
panic(err)
|
|
}
|
|
|
|
dest := v.GetString(cfgLoggerDestination)
|
|
|
|
switch dest {
|
|
case destinationStdout:
|
|
return newStdoutLogger(v, lvl, settings)
|
|
case destinationJournald:
|
|
return newJournaldLogger(v, lvl, settings)
|
|
default:
|
|
panic(fmt.Sprintf("wrong destination for logger: %s", dest))
|
|
}
|
|
}
|
|
|
|
// newStdoutLogger constructs a zap.Logger instance for current application.
|
|
// Panics on failure.
|
|
//
|
|
// Logger is built from zap's production logging configuration with:
|
|
// - parameterized level (debug by default)
|
|
// - console encoding
|
|
// - ISO8601 time encoding
|
|
//
|
|
// Logger records a stack trace for all messages at or above fatal level.
|
|
//
|
|
// See also zapcore.Level, zap.NewProductionConfig, zap.AddStacktrace.
|
|
func newStdoutLogger(v *viper.Viper, lvl zapcore.Level, settings LoggerAppSettings) *Logger {
|
|
stdout := zapcore.AddSync(os.Stderr)
|
|
level := zap.NewAtomicLevelAt(lvl)
|
|
|
|
consoleOutCore := zapcore.NewCore(newLogEncoder(), stdout, level)
|
|
consoleOutCore = applyZapCoreMiddlewares(consoleOutCore, v, settings)
|
|
|
|
return &Logger{
|
|
logger: zap.New(consoleOutCore, zap.AddStacktrace(zap.NewAtomicLevelAt(zap.FatalLevel))),
|
|
lvl: level,
|
|
}
|
|
}
|
|
|
|
func newJournaldLogger(v *viper.Viper, lvl zapcore.Level, settings LoggerAppSettings) *Logger {
|
|
level := zap.NewAtomicLevelAt(lvl)
|
|
|
|
encoder := zapjournald.NewPartialEncoder(newLogEncoder(), zapjournald.SyslogFields)
|
|
|
|
core := zapjournald.NewCore(level, encoder, &journald.Journal{}, zapjournald.SyslogFields)
|
|
coreWithContext := core.With([]zapcore.Field{
|
|
zapjournald.SyslogFacility(zapjournald.LogDaemon),
|
|
zapjournald.SyslogIdentifier(),
|
|
zapjournald.SyslogPid(),
|
|
})
|
|
|
|
coreWithContext = applyZapCoreMiddlewares(coreWithContext, v, settings)
|
|
|
|
return &Logger{
|
|
logger: zap.New(coreWithContext, zap.AddStacktrace(zap.NewAtomicLevelAt(zap.FatalLevel))),
|
|
lvl: level,
|
|
}
|
|
}
|
|
|
|
func newLogEncoder() zapcore.Encoder {
|
|
c := zap.NewProductionEncoderConfig()
|
|
c.EncodeTime = zapcore.ISO8601TimeEncoder
|
|
|
|
return zapcore.NewConsoleEncoder(c)
|
|
}
|
|
|
|
func applyZapCoreMiddlewares(core zapcore.Core, v *viper.Viper, settings LoggerAppSettings) zapcore.Core {
|
|
if v.GetBool(cfgLoggerSamplingEnabled) {
|
|
core = zapcore.NewSamplerWithOptions(core,
|
|
v.GetDuration(cfgLoggerSamplingInterval),
|
|
v.GetInt(cfgLoggerSamplingInitial),
|
|
v.GetInt(cfgLoggerSamplingThereafter),
|
|
zapcore.SamplerHook(func(_ zapcore.Entry, dec zapcore.SamplingDecision) {
|
|
if dec&zapcore.LogDropped > 0 {
|
|
settings.DroppedLogsInc()
|
|
}
|
|
}))
|
|
}
|
|
|
|
return core
|
|
}
|
|
|
|
func getLogLevel(v *viper.Viper) (zapcore.Level, error) {
|
|
var lvl zapcore.Level
|
|
lvlStr := v.GetString(cfgLoggerLevel)
|
|
err := lvl.UnmarshalText([]byte(lvlStr))
|
|
if err != nil {
|
|
return lvl, fmt.Errorf("incorrect logger level configuration %s (%v), "+
|
|
"value should be one of %v", lvlStr, err, [...]zapcore.Level{
|
|
zapcore.DebugLevel,
|
|
zapcore.InfoLevel,
|
|
zapcore.WarnLevel,
|
|
zapcore.ErrorLevel,
|
|
zapcore.DPanicLevel,
|
|
zapcore.PanicLevel,
|
|
zapcore.FatalLevel,
|
|
})
|
|
}
|
|
return lvl, nil
|
|
}
|
|
|
|
func fetchReconnectInterval(cfg *viper.Viper) time.Duration {
|
|
reconnect := cfg.GetDuration(cfgReconnectInterval)
|
|
if reconnect <= 0 {
|
|
reconnect = defaultReconnectInterval
|
|
}
|
|
|
|
return reconnect
|
|
}
|
|
|
|
func fetchIndexPageTemplate(v *viper.Viper, l *zap.Logger) (string, bool) {
|
|
if !v.GetBool(cfgIndexPageEnabled) {
|
|
return "", false
|
|
}
|
|
|
|
reader, err := os.Open(v.GetString(cfgIndexPageTemplatePath))
|
|
if err != nil {
|
|
l.Warn(logs.FailedToReadIndexPageTemplate, zap.Error(err))
|
|
return "", true
|
|
}
|
|
|
|
tmpl, err := io.ReadAll(reader)
|
|
if err != nil {
|
|
l.Warn(logs.FailedToReadIndexPageTemplate, zap.Error(err))
|
|
return "", true
|
|
}
|
|
|
|
l.Info(logs.SetCustomIndexPageTemplate)
|
|
return string(tmpl), true
|
|
}
|
|
|
|
func fetchDefaultNamespaces(v *viper.Viper) []string {
|
|
namespaces := v.GetStringSlice(cfgResolveDefaultNamespaces)
|
|
|
|
for i := range namespaces { // to be set namespaces in env variable as `HTTP_GW_RESOLVE_BUCKET_DEFAULT_NAMESPACES="" "root"`
|
|
namespaces[i] = strings.Trim(namespaces[i], "\"")
|
|
}
|
|
|
|
return namespaces
|
|
}
|
|
|
|
func fetchCORSMaxAge(v *viper.Viper) int {
|
|
maxAge := v.GetInt(cfgCORSMaxAge)
|
|
if maxAge <= 0 {
|
|
maxAge = defaultCORSMaxAge
|
|
}
|
|
|
|
return maxAge
|
|
}
|
|
|
|
func fetchServers(v *viper.Viper, log *zap.Logger) []ServerInfo {
|
|
var servers []ServerInfo
|
|
seen := make(map[string]struct{})
|
|
|
|
for i := 0; ; i++ {
|
|
key := cfgServer + "." + strconv.Itoa(i) + "."
|
|
|
|
var serverInfo ServerInfo
|
|
serverInfo.Address = v.GetString(key + "address")
|
|
serverInfo.TLS.Enabled = v.GetBool(key + cfgTLSEnabled)
|
|
serverInfo.TLS.KeyFile = v.GetString(key + cfgTLSKeyFile)
|
|
serverInfo.TLS.CertFile = v.GetString(key + cfgTLSCertFile)
|
|
|
|
if serverInfo.Address == "" {
|
|
break
|
|
}
|
|
|
|
if _, ok := seen[serverInfo.Address]; ok {
|
|
log.Warn(logs.WarnDuplicateAddress, zap.String("address", serverInfo.Address))
|
|
continue
|
|
}
|
|
seen[serverInfo.Address] = struct{}{}
|
|
servers = append(servers, serverInfo)
|
|
}
|
|
|
|
return servers
|
|
}
|
|
|
|
func getPools(ctx context.Context, logger *zap.Logger, cfg *viper.Viper, dialSource *internalnet.DialerSource) (*pool.Pool, *treepool.Pool, *keys.PrivateKey) {
|
|
key, err := getFrostFSKey(cfg, logger)
|
|
if err != nil {
|
|
logger.Fatal(logs.CouldNotLoadFrostFSPrivateKey, zap.Error(err))
|
|
}
|
|
|
|
var prm pool.InitParameters
|
|
var prmTree treepool.InitParameters
|
|
|
|
prm.SetKey(&key.PrivateKey)
|
|
prmTree.SetKey(key)
|
|
logger.Info(logs.UsingCredentials, zap.String("FrostFS", hex.EncodeToString(key.PublicKey().Bytes())))
|
|
|
|
for _, peer := range fetchPeers(logger, cfg) {
|
|
prm.AddNode(peer)
|
|
prmTree.AddNode(peer)
|
|
}
|
|
|
|
connTimeout := cfg.GetDuration(cfgConTimeout)
|
|
if connTimeout <= 0 {
|
|
connTimeout = defaultConnectTimeout
|
|
}
|
|
prm.SetNodeDialTimeout(connTimeout)
|
|
prmTree.SetNodeDialTimeout(connTimeout)
|
|
|
|
streamTimeout := cfg.GetDuration(cfgStreamTimeout)
|
|
if streamTimeout <= 0 {
|
|
streamTimeout = defaultStreamTimeout
|
|
}
|
|
prm.SetNodeStreamTimeout(streamTimeout)
|
|
prmTree.SetNodeStreamTimeout(streamTimeout)
|
|
|
|
healthCheckTimeout := cfg.GetDuration(cfgReqTimeout)
|
|
if healthCheckTimeout <= 0 {
|
|
healthCheckTimeout = defaultRequestTimeout
|
|
}
|
|
prm.SetHealthcheckTimeout(healthCheckTimeout)
|
|
prmTree.SetHealthcheckTimeout(healthCheckTimeout)
|
|
|
|
rebalanceInterval := cfg.GetDuration(cfgRebalance)
|
|
if rebalanceInterval <= 0 {
|
|
rebalanceInterval = defaultRebalanceTimer
|
|
}
|
|
prm.SetClientRebalanceInterval(rebalanceInterval)
|
|
prmTree.SetClientRebalanceInterval(rebalanceInterval)
|
|
|
|
errorThreshold := cfg.GetUint32(cfgPoolErrorThreshold)
|
|
if errorThreshold <= 0 {
|
|
errorThreshold = defaultPoolErrorThreshold
|
|
}
|
|
prm.SetErrorThreshold(errorThreshold)
|
|
prm.SetLogger(logger)
|
|
prmTree.SetLogger(logger)
|
|
|
|
prmTree.SetMaxRequestAttempts(cfg.GetInt(cfgTreePoolMaxAttempts))
|
|
|
|
interceptors := []grpc.DialOption{
|
|
grpc.WithUnaryInterceptor(grpctracing.NewUnaryClientInteceptor()),
|
|
grpc.WithStreamInterceptor(grpctracing.NewStreamClientInterceptor()),
|
|
grpc.WithContextDialer(dialSource.GrpcContextDialer()),
|
|
}
|
|
prm.SetGRPCDialOptions(interceptors...)
|
|
prmTree.SetGRPCDialOptions(interceptors...)
|
|
|
|
p, err := pool.NewPool(prm)
|
|
if err != nil {
|
|
logger.Fatal(logs.FailedToCreateConnectionPool, zap.Error(err))
|
|
}
|
|
|
|
if err = p.Dial(ctx); err != nil {
|
|
logger.Fatal(logs.FailedToDialConnectionPool, zap.Error(err))
|
|
}
|
|
|
|
treePool, err := treepool.NewPool(prmTree)
|
|
if err != nil {
|
|
logger.Fatal(logs.FailedToCreateTreePool, zap.Error(err))
|
|
}
|
|
if err = treePool.Dial(ctx); err != nil {
|
|
logger.Fatal(logs.FailedToDialTreePool, zap.Error(err))
|
|
}
|
|
|
|
return p, treePool, key
|
|
}
|
|
|
|
func fetchPeers(l *zap.Logger, v *viper.Viper) []pool.NodeParam {
|
|
var nodes []pool.NodeParam
|
|
for i := 0; ; i++ {
|
|
key := cfgPeers + "." + strconv.Itoa(i) + "."
|
|
address := v.GetString(key + "address")
|
|
weight := v.GetFloat64(key + "weight")
|
|
priority := v.GetInt(key + "priority")
|
|
|
|
if address == "" {
|
|
break
|
|
}
|
|
if weight <= 0 { // unspecified or wrong
|
|
weight = 1
|
|
}
|
|
if priority <= 0 { // unspecified or wrong
|
|
priority = 1
|
|
}
|
|
|
|
nodes = append(nodes, pool.NewNodeParam(priority, address, weight))
|
|
|
|
l.Info(logs.AddedStoragePeer,
|
|
zap.Int("priority", priority),
|
|
zap.String("address", address),
|
|
zap.Float64("weight", weight))
|
|
}
|
|
|
|
return nodes
|
|
}
|
|
|
|
func fetchSoftMemoryLimit(cfg *viper.Viper) int64 {
|
|
softMemoryLimit := cfg.GetSizeInBytes(cfgSoftMemoryLimit)
|
|
if softMemoryLimit <= 0 {
|
|
softMemoryLimit = defaultSoftMemoryLimit
|
|
}
|
|
|
|
return int64(softMemoryLimit)
|
|
}
|
|
|
|
func getCacheOptions(v *viper.Viper, l *zap.Logger) *cache.Config {
|
|
cacheCfg := cache.DefaultBucketConfig(l)
|
|
|
|
cacheCfg.Lifetime = fetchCacheLifetime(v, l, cfgBucketsCacheLifetime, cacheCfg.Lifetime)
|
|
cacheCfg.Size = fetchCacheSize(v, l, cfgBucketsCacheSize, cacheCfg.Size)
|
|
|
|
return cacheCfg
|
|
}
|
|
|
|
func fetchCacheLifetime(v *viper.Viper, l *zap.Logger, cfgEntry string, defaultValue time.Duration) time.Duration {
|
|
if v.IsSet(cfgEntry) {
|
|
lifetime := v.GetDuration(cfgEntry)
|
|
if lifetime <= 0 {
|
|
l.Error(logs.InvalidLifetimeUsingDefaultValue,
|
|
zap.String("parameter", cfgEntry),
|
|
zap.Duration("value in config", lifetime),
|
|
zap.Duration("default", defaultValue))
|
|
} else {
|
|
return lifetime
|
|
}
|
|
}
|
|
|
|
return defaultValue
|
|
}
|
|
|
|
func fetchCacheSize(v *viper.Viper, l *zap.Logger, cfgEntry string, defaultValue int) int {
|
|
if v.IsSet(cfgEntry) {
|
|
size := v.GetInt(cfgEntry)
|
|
if size <= 0 {
|
|
l.Error(logs.InvalidCacheSizeUsingDefaultValue,
|
|
zap.String("parameter", cfgEntry),
|
|
zap.Int("value in config", size),
|
|
zap.Int("default", defaultValue))
|
|
} else {
|
|
return size
|
|
}
|
|
}
|
|
|
|
return defaultValue
|
|
}
|
|
|
|
func getDialerSource(logger *zap.Logger, cfg *viper.Viper) *internalnet.DialerSource {
|
|
source, err := internalnet.NewDialerSource(fetchMultinetConfig(cfg, logger))
|
|
if err != nil {
|
|
logger.Fatal(logs.FailedToLoadMultinetConfig, zap.Error(err))
|
|
}
|
|
return source
|
|
}
|
|
|
|
func fetchMultinetConfig(v *viper.Viper, l *zap.Logger) (cfg internalnet.Config) {
|
|
cfg.Enabled = v.GetBool(cfgMultinetEnabled)
|
|
cfg.Balancer = v.GetString(cfgMultinetBalancer)
|
|
cfg.Restrict = v.GetBool(cfgMultinetRestrict)
|
|
cfg.FallbackDelay = v.GetDuration(cfgMultinetFallbackDelay)
|
|
cfg.Subnets = make([]internalnet.Subnet, 0, 5)
|
|
cfg.EventHandler = internalnet.NewLogEventHandler(l)
|
|
|
|
for i := 0; ; i++ {
|
|
key := cfgMultinetSubnets + "." + strconv.Itoa(i) + "."
|
|
subnet := internalnet.Subnet{}
|
|
|
|
subnet.Prefix = v.GetString(key + "mask")
|
|
if subnet.Prefix == "" {
|
|
break
|
|
}
|
|
subnet.SourceIPs = v.GetStringSlice(key + "source_ips")
|
|
cfg.Subnets = append(cfg.Subnets, subnet)
|
|
}
|
|
|
|
return
|
|
}
|
|
|
|
func fetchTracingAttributes(v *viper.Viper) (map[string]string, error) {
|
|
attributes := make(map[string]string)
|
|
for i := 0; ; i++ {
|
|
key := cfgTracingAttributes + "." + strconv.Itoa(i) + "."
|
|
attrKey := v.GetString(key + "key")
|
|
attrValue := v.GetString(key + "value")
|
|
if attrKey == "" {
|
|
break
|
|
}
|
|
|
|
if _, ok := attributes[attrKey]; ok {
|
|
return nil, fmt.Errorf("tracing attribute key %s defined more than once", attrKey)
|
|
}
|
|
|
|
if attrValue == "" {
|
|
return nil, fmt.Errorf("empty tracing attribute value for key %s", attrKey)
|
|
}
|
|
|
|
attributes[attrKey] = attrValue
|
|
}
|
|
|
|
return attributes, nil
|
|
}
|