Roman Loginov
70846fdaec
We can always add interceptors to the grpc connection to the storage, since the actual use will be controlled by the configuration from the frostfs-observability library. Signed-off-by: Roman Loginov <r.loginov@yadro.com>
696 lines
19 KiB
Go
696 lines
19 KiB
Go
package main
|
|
|
|
import (
|
|
"context"
|
|
"encoding/hex"
|
|
"fmt"
|
|
"math"
|
|
"os"
|
|
"path"
|
|
"runtime"
|
|
"sort"
|
|
"strconv"
|
|
"strings"
|
|
"time"
|
|
|
|
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/internal/cache"
|
|
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/internal/logs"
|
|
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/resolver"
|
|
grpctracing "git.frostfs.info/TrueCloudLab/frostfs-observability/tracing/grpc"
|
|
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/pool"
|
|
treepool "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/pool/tree"
|
|
"git.frostfs.info/TrueCloudLab/zapjournald"
|
|
"github.com/nspcc-dev/neo-go/pkg/crypto/keys"
|
|
"github.com/spf13/pflag"
|
|
"github.com/spf13/viper"
|
|
"github.com/ssgreg/journald"
|
|
"github.com/valyala/fasthttp"
|
|
"go.uber.org/zap"
|
|
"go.uber.org/zap/zapcore"
|
|
"google.golang.org/grpc"
|
|
)
|
|
|
|
// Values accepted for the logger.destination setting (see pickLogger).
const (
	// destinationStdout selects the logger built by newStdoutLogger.
	destinationStdout = "stdout"
	// destinationJournald selects the logger built by newJournaldLogger.
	destinationJournald = "journald"
)
|
|
|
|
// Default values used when a setting is missing or invalid.
const (
	// Pool timings.
	defaultRebalanceTimer = time.Minute
	defaultRequestTimeout = 15 * time.Second
	defaultConnectTimeout = 10 * time.Second
	defaultStreamTimeout  = 10 * time.Second

	// Interval of the logger sampler.
	defaultLoggerSamplerInterval = time.Second

	defaultShutdownTimeout = 15 * time.Second

	defaultPoolErrorThreshold uint32 = 100

	// Largest int64 value.
	defaultSoftMemoryLimit = math.MaxInt64

	// 1 MiB.
	defaultBufferMaxSizeForPut = 1 << 20

	defaultNamespaceHeader = "X-Frostfs-Namespace"

	defaultReconnectInterval = time.Minute
)

// Configuration keys.
const (
	// Servers (listeners) and their TLS options.
	cfgServer      = "server"
	cfgTLSEnabled  = "tls.enabled"
	cfgTLSCertFile = "tls.cert_file"
	cfgTLSKeyFile  = "tls.key_file"

	cfgReconnectInterval = "reconnect_interval"

	// Index page.
	cfgIndexPageEnabled      = "index_page.enabled"
	cfgIndexPageTemplatePath = "index_page.template_path"

	// Web server.
	cfgWebReadBufferSize     = "web.read_buffer_size"
	cfgWebWriteBufferSize    = "web.write_buffer_size"
	cfgWebReadTimeout        = "web.read_timeout"
	cfgWebWriteTimeout       = "web.write_timeout"
	cfgWebStreamRequestBody  = "web.stream_request_body"
	cfgWebMaxRequestBodySize = "web.max_request_body_size"

	// Metrics and profiler.
	cfgPrometheusEnabled = "prometheus.enabled"
	cfgPrometheusAddress = "prometheus.address"
	cfgPprofEnabled      = "pprof.enabled"
	cfgPprofAddress      = "pprof.address"

	// Tracing.
	cfgTracingEnabled   = "tracing.enabled"
	cfgTracingExporter  = "tracing.exporter"
	cfgTracingEndpoint  = "tracing.endpoint"
	cfgTracingTrustedCa = "tracing.trusted_ca"

	// Connection pool.
	cfgConTimeout         = "connect_timeout"
	cfgStreamTimeout      = "stream_timeout"
	cfgReqTimeout         = "request_timeout"
	cfgRebalance          = "rebalance_timer"
	cfgPoolErrorThreshold = "pool_error_threshold"

	// Logger.
	cfgLoggerLevel       = "logger.level"
	cfgLoggerDestination = "logger.destination"

	cfgLoggerSamplingEnabled    = "logger.sampling.enabled"
	cfgLoggerSamplingInitial    = "logger.sampling.initial"
	cfgLoggerSamplingThereafter = "logger.sampling.thereafter"
	cfgLoggerSamplingInterval   = "logger.sampling.interval"

	// Wallet.
	cfgWalletPassphrase = "wallet.passphrase"
	cfgWalletPath       = "wallet.path"
	cfgWalletAddress    = "wallet.address"

	// Upload header.
	cfgUploaderHeaderEnableDefaultTimestamp = "upload_header.use_default_timestamp"

	// Storage peers.
	cfgPeers = "peers"

	// NeoGo.
	cfgRPCEndpoint = "rpc_endpoint"

	// Container name resolving.
	cfgResolveOrder = "resolve_order"

	// Zip compression.
	cfgZipCompression = "zip.compression"

	// Runtime.
	cfgSoftMemoryLimit = "runtime.soft_memory_limit"

	// Requests to FrostFS.
	// Enables client side object preparing for PUT operations.
	cfgClientCut = "frostfs.client_cut"
	// Max buffer size for reading payload in PUT operations.
	cfgBufferMaxSizeForPut = "frostfs.buffer_max_size_for_put"
	// Max number of attempts to make a successful tree request.
	cfgTreePoolMaxAttempts = "frostfs.tree_pool_max_attempts"

	// Caching.
	cfgBucketsCacheLifetime = "cache.buckets.lifetime"
	cfgBucketsCacheSize     = "cache.buckets.size"

	// Bucket resolving options.
	cfgResolveNamespaceHeader   = "resolve_bucket.namespace_header"
	cfgResolveDefaultNamespaces = "resolve_bucket.default_namespaces"
)

// Command line argument names.
const (
	cmdHelp          = "help"
	cmdVersion       = "version"
	cmdPprof         = "pprof"
	cmdMetrics       = "metrics"
	cmdWallet        = "wallet"
	cmdAddress       = "address"
	cmdConfig        = "config"
	cmdConfigDir     = "config-dir"
	cmdListenAddress = "listen_address"
)
|
|
|
|
var ignore = map[string]struct{}{
|
|
cfgPeers: {},
|
|
cmdHelp: {},
|
|
cmdVersion: {},
|
|
}
|
|
|
|
func settings() *viper.Viper {
|
|
v := viper.New()
|
|
v.AutomaticEnv()
|
|
v.SetEnvPrefix(Prefix)
|
|
v.AllowEmptyEnv(true)
|
|
v.SetConfigType("yaml")
|
|
v.SetEnvKeyReplacer(strings.NewReplacer(".", "_"))
|
|
|
|
// flags setup:
|
|
flags := pflag.NewFlagSet("commandline", pflag.ExitOnError)
|
|
flags.SetOutput(os.Stdout)
|
|
flags.SortFlags = false
|
|
|
|
flags.Bool(cmdPprof, false, "enable pprof")
|
|
flags.Bool(cmdMetrics, false, "enable prometheus")
|
|
|
|
help := flags.BoolP(cmdHelp, "h", false, "show help")
|
|
version := flags.BoolP(cmdVersion, "v", false, "show version")
|
|
|
|
flags.StringP(cmdWallet, "w", "", `path to the wallet`)
|
|
flags.String(cmdAddress, "", `address of wallet account`)
|
|
flags.StringArray(cmdConfig, nil, "config paths")
|
|
flags.String(cmdConfigDir, "", "config dir path")
|
|
flags.Duration(cfgConTimeout, defaultConnectTimeout, "gRPC connect timeout")
|
|
flags.Duration(cfgStreamTimeout, defaultStreamTimeout, "gRPC individual message timeout")
|
|
flags.Duration(cfgReqTimeout, defaultRequestTimeout, "gRPC request timeout")
|
|
flags.Duration(cfgRebalance, defaultRebalanceTimer, "gRPC connection rebalance timer")
|
|
|
|
flags.String(cmdListenAddress, "0.0.0.0:8080", "addresses to listen")
|
|
flags.String(cfgTLSCertFile, "", "TLS certificate path")
|
|
flags.String(cfgTLSKeyFile, "", "TLS key path")
|
|
peers := flags.StringArrayP(cfgPeers, "p", nil, "FrostFS nodes")
|
|
|
|
resolveMethods := flags.StringSlice(cfgResolveOrder, []string{resolver.NNSResolver, resolver.DNSResolver}, "set container name resolve order")
|
|
|
|
// set defaults:
|
|
|
|
// logger:
|
|
v.SetDefault(cfgLoggerLevel, "debug")
|
|
v.SetDefault(cfgLoggerDestination, "stdout")
|
|
v.SetDefault(cfgLoggerSamplingEnabled, false)
|
|
v.SetDefault(cfgLoggerSamplingThereafter, 100)
|
|
v.SetDefault(cfgLoggerSamplingInitial, 100)
|
|
v.SetDefault(cfgLoggerSamplingInterval, defaultLoggerSamplerInterval)
|
|
|
|
// pool:
|
|
v.SetDefault(cfgPoolErrorThreshold, defaultPoolErrorThreshold)
|
|
|
|
v.SetDefault(cfgIndexPageEnabled, false)
|
|
v.SetDefault(cfgIndexPageTemplatePath, "")
|
|
|
|
// frostfs:
|
|
v.SetDefault(cfgBufferMaxSizeForPut, defaultBufferMaxSizeForPut)
|
|
|
|
// web-server:
|
|
v.SetDefault(cfgWebReadBufferSize, 4096)
|
|
v.SetDefault(cfgWebWriteBufferSize, 4096)
|
|
v.SetDefault(cfgWebReadTimeout, time.Minute*10)
|
|
v.SetDefault(cfgWebWriteTimeout, time.Minute*5)
|
|
v.SetDefault(cfgWebStreamRequestBody, true)
|
|
v.SetDefault(cfgWebMaxRequestBodySize, fasthttp.DefaultMaxRequestBodySize)
|
|
|
|
// upload header
|
|
v.SetDefault(cfgUploaderHeaderEnableDefaultTimestamp, false)
|
|
|
|
// zip:
|
|
v.SetDefault(cfgZipCompression, false)
|
|
|
|
// metrics
|
|
v.SetDefault(cfgPprofAddress, "localhost:8083")
|
|
v.SetDefault(cfgPrometheusAddress, "localhost:8084")
|
|
|
|
// resolve bucket
|
|
v.SetDefault(cfgResolveNamespaceHeader, defaultNamespaceHeader)
|
|
v.SetDefault(cfgResolveDefaultNamespaces, []string{"", "root"})
|
|
|
|
// Binding flags
|
|
if err := v.BindPFlag(cfgPprofEnabled, flags.Lookup(cmdPprof)); err != nil {
|
|
panic(err)
|
|
}
|
|
if err := v.BindPFlag(cfgPrometheusEnabled, flags.Lookup(cmdMetrics)); err != nil {
|
|
panic(err)
|
|
}
|
|
|
|
if err := v.BindPFlag(cfgWalletPath, flags.Lookup(cmdWallet)); err != nil {
|
|
panic(err)
|
|
}
|
|
|
|
if err := v.BindPFlag(cfgWalletAddress, flags.Lookup(cmdAddress)); err != nil {
|
|
panic(err)
|
|
}
|
|
|
|
if err := v.BindPFlags(flags); err != nil {
|
|
panic(err)
|
|
}
|
|
|
|
if err := v.BindPFlag(cfgServer+".0.address", flags.Lookup(cmdListenAddress)); err != nil {
|
|
panic(err)
|
|
}
|
|
if err := v.BindPFlag(cfgServer+".0."+cfgTLSKeyFile, flags.Lookup(cfgTLSKeyFile)); err != nil {
|
|
panic(err)
|
|
}
|
|
if err := v.BindPFlag(cfgServer+".0."+cfgTLSCertFile, flags.Lookup(cfgTLSCertFile)); err != nil {
|
|
panic(err)
|
|
}
|
|
|
|
if err := flags.Parse(os.Args); err != nil {
|
|
panic(err)
|
|
}
|
|
|
|
if v.IsSet(cfgServer+".0."+cfgTLSKeyFile) && v.IsSet(cfgServer+".0."+cfgTLSCertFile) {
|
|
v.Set(cfgServer+".0."+cfgTLSEnabled, true)
|
|
}
|
|
|
|
if resolveMethods != nil {
|
|
v.SetDefault(cfgResolveOrder, *resolveMethods)
|
|
}
|
|
|
|
switch {
|
|
case help != nil && *help:
|
|
fmt.Printf("FrostFS HTTP Gateway %s\n", Version)
|
|
flags.PrintDefaults()
|
|
|
|
fmt.Println()
|
|
fmt.Println("Default environments:")
|
|
fmt.Println()
|
|
keys := v.AllKeys()
|
|
sort.Strings(keys)
|
|
|
|
for i := range keys {
|
|
if _, ok := ignore[keys[i]]; ok {
|
|
continue
|
|
}
|
|
|
|
defaultValue := v.GetString(keys[i])
|
|
if len(defaultValue) == 0 {
|
|
continue
|
|
}
|
|
|
|
k := strings.Replace(keys[i], ".", "_", -1)
|
|
fmt.Printf("%s_%s = %s\n", Prefix, strings.ToUpper(k), defaultValue)
|
|
}
|
|
|
|
fmt.Println()
|
|
fmt.Println("Peers preset:")
|
|
fmt.Println()
|
|
|
|
fmt.Printf("%s_%s_[N]_ADDRESS = string\n", Prefix, strings.ToUpper(cfgPeers))
|
|
fmt.Printf("%s_%s_[N]_WEIGHT = float\n", Prefix, strings.ToUpper(cfgPeers))
|
|
|
|
os.Exit(0)
|
|
case version != nil && *version:
|
|
fmt.Printf("FrostFS HTTP Gateway\nVersion: %s\nGoVersion: %s\n", Version, runtime.Version())
|
|
os.Exit(0)
|
|
}
|
|
|
|
if err := readInConfig(v); err != nil {
|
|
panic(err)
|
|
}
|
|
|
|
if peers != nil && len(*peers) > 0 {
|
|
for i := range *peers {
|
|
v.SetDefault(cfgPeers+"."+strconv.Itoa(i)+".address", (*peers)[i])
|
|
v.SetDefault(cfgPeers+"."+strconv.Itoa(i)+".weight", 1)
|
|
v.SetDefault(cfgPeers+"."+strconv.Itoa(i)+".priority", 1)
|
|
}
|
|
}
|
|
|
|
return v
|
|
}
|
|
|
|
func readInConfig(v *viper.Viper) error {
|
|
if v.IsSet(cmdConfig) {
|
|
if err := readConfig(v); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
|
|
if v.IsSet(cmdConfigDir) {
|
|
if err := readConfigDir(v); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func readConfigDir(v *viper.Viper) error {
|
|
cfgSubConfigDir := v.GetString(cmdConfigDir)
|
|
entries, err := os.ReadDir(cfgSubConfigDir)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
for _, entry := range entries {
|
|
if entry.IsDir() {
|
|
continue
|
|
}
|
|
ext := path.Ext(entry.Name())
|
|
if ext != ".yaml" && ext != ".yml" {
|
|
continue
|
|
}
|
|
|
|
if err = mergeConfig(v, path.Join(cfgSubConfigDir, entry.Name())); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func readConfig(v *viper.Viper) error {
|
|
for _, fileName := range v.GetStringSlice(cmdConfig) {
|
|
if err := mergeConfig(v, fileName); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func mergeConfig(v *viper.Viper, fileName string) error {
|
|
cfgFile, err := os.Open(fileName)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
defer func() {
|
|
if errClose := cfgFile.Close(); errClose != nil {
|
|
panic(errClose)
|
|
}
|
|
}()
|
|
|
|
return v.MergeConfig(cfgFile)
|
|
}
|
|
|
|
func pickLogger(v *viper.Viper) (*zap.Logger, zap.AtomicLevel) {
|
|
lvl, err := getLogLevel(v)
|
|
if err != nil {
|
|
panic(err)
|
|
}
|
|
|
|
dest := v.GetString(cfgLoggerDestination)
|
|
|
|
switch dest {
|
|
case destinationStdout:
|
|
return newStdoutLogger(v, lvl)
|
|
case destinationJournald:
|
|
return newJournaldLogger(v, lvl)
|
|
default:
|
|
panic(fmt.Sprintf("wrong destination for logger: %s", dest))
|
|
}
|
|
}
|
|
|
|
// newStdoutLogger constructs a zap.Logger instance for current application.
|
|
// Panics on failure.
|
|
//
|
|
// Logger is built from zap's production logging configuration with:
|
|
// - parameterized level (debug by default)
|
|
// - console encoding
|
|
// - ISO8601 time encoding
|
|
//
|
|
// Logger records a stack trace for all messages at or above fatal level.
|
|
//
|
|
// See also zapcore.Level, zap.NewProductionConfig, zap.AddStacktrace.
|
|
func newStdoutLogger(v *viper.Viper, lvl zapcore.Level) (*zap.Logger, zap.AtomicLevel) {
|
|
stdout := zapcore.AddSync(os.Stderr)
|
|
level := zap.NewAtomicLevelAt(lvl)
|
|
|
|
consoleOutCore := zapcore.NewCore(newLogEncoder(), stdout, level)
|
|
consoleOutCore = samplingEnabling(v, consoleOutCore)
|
|
|
|
l := zap.New(consoleOutCore, zap.AddStacktrace(zap.NewAtomicLevelAt(zap.FatalLevel)))
|
|
return l, level
|
|
}
|
|
|
|
func newJournaldLogger(v *viper.Viper, lvl zapcore.Level) (*zap.Logger, zap.AtomicLevel) {
|
|
level := zap.NewAtomicLevelAt(lvl)
|
|
|
|
encoder := zapjournald.NewPartialEncoder(newLogEncoder(), zapjournald.SyslogFields)
|
|
|
|
core := zapjournald.NewCore(level, encoder, &journald.Journal{}, zapjournald.SyslogFields)
|
|
coreWithContext := core.With([]zapcore.Field{
|
|
zapjournald.SyslogFacility(zapjournald.LogDaemon),
|
|
zapjournald.SyslogIdentifier(),
|
|
zapjournald.SyslogPid(),
|
|
})
|
|
|
|
coreWithContext = samplingEnabling(v, coreWithContext)
|
|
|
|
l := zap.New(coreWithContext, zap.AddStacktrace(zap.NewAtomicLevelAt(zap.FatalLevel)))
|
|
|
|
return l, level
|
|
}
|
|
|
|
func newLogEncoder() zapcore.Encoder {
|
|
c := zap.NewProductionEncoderConfig()
|
|
c.EncodeTime = zapcore.ISO8601TimeEncoder
|
|
|
|
return zapcore.NewConsoleEncoder(c)
|
|
}
|
|
|
|
func samplingEnabling(v *viper.Viper, core zapcore.Core) zapcore.Core {
|
|
// Zap samples by logging the first cgfLoggerSamplingInitial entries with a given level
|
|
// and message within the specified time interval.
|
|
// In the above config, only the first cgfLoggerSamplingInitial log entries with the same level and message
|
|
// are recorded in cfgLoggerSamplingInterval interval. Every other log entry will be dropped within the interval since
|
|
// cfgLoggerSamplingThereafter is specified here.
|
|
if v.GetBool(cfgLoggerSamplingEnabled) {
|
|
core = zapcore.NewSamplerWithOptions(
|
|
core,
|
|
v.GetDuration(cfgLoggerSamplingInterval),
|
|
v.GetInt(cfgLoggerSamplingInitial),
|
|
v.GetInt(cfgLoggerSamplingThereafter),
|
|
)
|
|
}
|
|
|
|
return core
|
|
}
|
|
|
|
func getLogLevel(v *viper.Viper) (zapcore.Level, error) {
|
|
var lvl zapcore.Level
|
|
lvlStr := v.GetString(cfgLoggerLevel)
|
|
err := lvl.UnmarshalText([]byte(lvlStr))
|
|
if err != nil {
|
|
return lvl, fmt.Errorf("incorrect logger level configuration %s (%v), "+
|
|
"value should be one of %v", lvlStr, err, [...]zapcore.Level{
|
|
zapcore.DebugLevel,
|
|
zapcore.InfoLevel,
|
|
zapcore.WarnLevel,
|
|
zapcore.ErrorLevel,
|
|
zapcore.DPanicLevel,
|
|
zapcore.PanicLevel,
|
|
zapcore.FatalLevel,
|
|
})
|
|
}
|
|
return lvl, nil
|
|
}
|
|
|
|
func fetchReconnectInterval(cfg *viper.Viper) time.Duration {
|
|
reconnect := cfg.GetDuration(cfgReconnectInterval)
|
|
if reconnect <= 0 {
|
|
reconnect = defaultReconnectInterval
|
|
}
|
|
|
|
return reconnect
|
|
}
|
|
|
|
func fetchServers(v *viper.Viper, log *zap.Logger) []ServerInfo {
|
|
var servers []ServerInfo
|
|
seen := make(map[string]struct{})
|
|
|
|
for i := 0; ; i++ {
|
|
key := cfgServer + "." + strconv.Itoa(i) + "."
|
|
|
|
var serverInfo ServerInfo
|
|
serverInfo.Address = v.GetString(key + "address")
|
|
serverInfo.TLS.Enabled = v.GetBool(key + cfgTLSEnabled)
|
|
serverInfo.TLS.KeyFile = v.GetString(key + cfgTLSKeyFile)
|
|
serverInfo.TLS.CertFile = v.GetString(key + cfgTLSCertFile)
|
|
|
|
if serverInfo.Address == "" {
|
|
break
|
|
}
|
|
|
|
if _, ok := seen[serverInfo.Address]; ok {
|
|
log.Warn(logs.WarnDuplicateAddress, zap.String("address", serverInfo.Address))
|
|
continue
|
|
}
|
|
seen[serverInfo.Address] = struct{}{}
|
|
servers = append(servers, serverInfo)
|
|
}
|
|
|
|
return servers
|
|
}
|
|
|
|
func getPools(ctx context.Context, logger *zap.Logger, cfg *viper.Viper) (*pool.Pool, *treepool.Pool, *keys.PrivateKey) {
|
|
key, err := getFrostFSKey(cfg, logger)
|
|
if err != nil {
|
|
logger.Fatal(logs.CouldNotLoadFrostFSPrivateKey, zap.Error(err))
|
|
}
|
|
|
|
var prm pool.InitParameters
|
|
var prmTree treepool.InitParameters
|
|
|
|
prm.SetKey(&key.PrivateKey)
|
|
prmTree.SetKey(key)
|
|
logger.Info(logs.UsingCredentials, zap.String("FrostFS", hex.EncodeToString(key.PublicKey().Bytes())))
|
|
|
|
for _, peer := range fetchPeers(logger, cfg) {
|
|
prm.AddNode(peer)
|
|
prmTree.AddNode(peer)
|
|
}
|
|
|
|
connTimeout := cfg.GetDuration(cfgConTimeout)
|
|
if connTimeout <= 0 {
|
|
connTimeout = defaultConnectTimeout
|
|
}
|
|
prm.SetNodeDialTimeout(connTimeout)
|
|
prmTree.SetNodeDialTimeout(connTimeout)
|
|
|
|
streamTimeout := cfg.GetDuration(cfgStreamTimeout)
|
|
if streamTimeout <= 0 {
|
|
streamTimeout = defaultStreamTimeout
|
|
}
|
|
prm.SetNodeStreamTimeout(streamTimeout)
|
|
prmTree.SetNodeStreamTimeout(streamTimeout)
|
|
|
|
healthCheckTimeout := cfg.GetDuration(cfgReqTimeout)
|
|
if healthCheckTimeout <= 0 {
|
|
healthCheckTimeout = defaultRequestTimeout
|
|
}
|
|
prm.SetHealthcheckTimeout(healthCheckTimeout)
|
|
prmTree.SetHealthcheckTimeout(healthCheckTimeout)
|
|
|
|
rebalanceInterval := cfg.GetDuration(cfgRebalance)
|
|
if rebalanceInterval <= 0 {
|
|
rebalanceInterval = defaultRebalanceTimer
|
|
}
|
|
prm.SetClientRebalanceInterval(rebalanceInterval)
|
|
prmTree.SetClientRebalanceInterval(rebalanceInterval)
|
|
|
|
errorThreshold := cfg.GetUint32(cfgPoolErrorThreshold)
|
|
if errorThreshold <= 0 {
|
|
errorThreshold = defaultPoolErrorThreshold
|
|
}
|
|
prm.SetErrorThreshold(errorThreshold)
|
|
prm.SetLogger(logger)
|
|
prmTree.SetLogger(logger)
|
|
|
|
prmTree.SetMaxRequestAttempts(cfg.GetInt(cfgTreePoolMaxAttempts))
|
|
|
|
interceptors := []grpc.DialOption{
|
|
grpc.WithUnaryInterceptor(grpctracing.NewUnaryClientInteceptor()),
|
|
grpc.WithStreamInterceptor(grpctracing.NewStreamClientInterceptor()),
|
|
}
|
|
prm.SetGRPCDialOptions(interceptors...)
|
|
prmTree.SetGRPCDialOptions(interceptors...)
|
|
|
|
p, err := pool.NewPool(prm)
|
|
if err != nil {
|
|
logger.Fatal(logs.FailedToCreateConnectionPool, zap.Error(err))
|
|
}
|
|
|
|
if err = p.Dial(ctx); err != nil {
|
|
logger.Fatal(logs.FailedToDialConnectionPool, zap.Error(err))
|
|
}
|
|
|
|
treePool, err := treepool.NewPool(prmTree)
|
|
if err != nil {
|
|
logger.Fatal(logs.FailedToCreateTreePool, zap.Error(err))
|
|
}
|
|
if err = treePool.Dial(ctx); err != nil {
|
|
logger.Fatal(logs.FailedToDialTreePool, zap.Error(err))
|
|
}
|
|
|
|
return p, treePool, key
|
|
}
|
|
|
|
func fetchPeers(l *zap.Logger, v *viper.Viper) []pool.NodeParam {
|
|
var nodes []pool.NodeParam
|
|
for i := 0; ; i++ {
|
|
key := cfgPeers + "." + strconv.Itoa(i) + "."
|
|
address := v.GetString(key + "address")
|
|
weight := v.GetFloat64(key + "weight")
|
|
priority := v.GetInt(key + "priority")
|
|
|
|
if address == "" {
|
|
break
|
|
}
|
|
if weight <= 0 { // unspecified or wrong
|
|
weight = 1
|
|
}
|
|
if priority <= 0 { // unspecified or wrong
|
|
priority = 1
|
|
}
|
|
|
|
nodes = append(nodes, pool.NewNodeParam(priority, address, weight))
|
|
|
|
l.Info(logs.AddedStoragePeer,
|
|
zap.Int("priority", priority),
|
|
zap.String("address", address),
|
|
zap.Float64("weight", weight))
|
|
}
|
|
|
|
return nodes
|
|
}
|
|
|
|
func fetchSoftMemoryLimit(cfg *viper.Viper) int64 {
|
|
softMemoryLimit := cfg.GetSizeInBytes(cfgSoftMemoryLimit)
|
|
if softMemoryLimit <= 0 {
|
|
softMemoryLimit = defaultSoftMemoryLimit
|
|
}
|
|
|
|
return int64(softMemoryLimit)
|
|
}
|
|
|
|
func getCacheOptions(v *viper.Viper, l *zap.Logger) *cache.Config {
|
|
cacheCfg := cache.DefaultBucketConfig(l)
|
|
|
|
cacheCfg.Lifetime = fetchCacheLifetime(v, l, cfgBucketsCacheLifetime, cacheCfg.Lifetime)
|
|
cacheCfg.Size = fetchCacheSize(v, l, cfgBucketsCacheSize, cacheCfg.Size)
|
|
|
|
return cacheCfg
|
|
}
|
|
|
|
func fetchCacheLifetime(v *viper.Viper, l *zap.Logger, cfgEntry string, defaultValue time.Duration) time.Duration {
|
|
if v.IsSet(cfgEntry) {
|
|
lifetime := v.GetDuration(cfgEntry)
|
|
if lifetime <= 0 {
|
|
l.Error(logs.InvalidLifetimeUsingDefaultValue,
|
|
zap.String("parameter", cfgEntry),
|
|
zap.Duration("value in config", lifetime),
|
|
zap.Duration("default", defaultValue))
|
|
} else {
|
|
return lifetime
|
|
}
|
|
}
|
|
|
|
return defaultValue
|
|
}
|
|
|
|
func fetchCacheSize(v *viper.Viper, l *zap.Logger, cfgEntry string, defaultValue int) int {
|
|
if v.IsSet(cfgEntry) {
|
|
size := v.GetInt(cfgEntry)
|
|
if size <= 0 {
|
|
l.Error(logs.InvalidCacheSizeUsingDefaultValue,
|
|
zap.String("parameter", cfgEntry),
|
|
zap.Int("value in config", size),
|
|
zap.Int("default", defaultValue))
|
|
} else {
|
|
return size
|
|
}
|
|
}
|
|
|
|
return defaultValue
|
|
}
|