From 2ccc778fca9a0e6dff405e853815a495a6e1eb3d Mon Sep 17 00:00:00 2001 From: Evgeniy Kulikov Date: Tue, 7 Jul 2020 14:25:13 +0300 Subject: [PATCH] Move config to constants --- app-logger.go | 16 +++---- app-metrics.go | 2 +- app-profiler.go | 2 +- app-settings.go | 103 +++++++++++++++++++++++++++++++++++-------- app.go | 31 +++++++++---- neofs/pool/pool.go | 106 ++++++++++++++++++--------------------------- 6 files changed, 160 insertions(+), 100 deletions(-) diff --git a/app-logger.go b/app-logger.go index 3bb969e52..b3ad7d402 100644 --- a/app-logger.go +++ b/app-logger.go @@ -69,21 +69,21 @@ func newLogger(v *viper.Viper) *zap.Logger { Thereafter: defaultSamplingThereafter, } - if val := v.GetInt("logger.sampling.initial"); val > 0 { + if val := v.GetInt(cfgLoggerSamplingInitial); val > 0 { c.Sampling.Initial = val } - if val := v.GetInt("logger.sampling.thereafter"); val > 0 { + if val := v.GetInt(cfgLoggerSamplingThereafter); val > 0 { c.Sampling.Thereafter = val } } // logger level - c.Level = safeLevel(v.GetString("logger.level")) - traceLvl := safeLevel(v.GetString("logger.trace_level")) + c.Level = safeLevel(v.GetString(cfgLoggerLevel)) + traceLvl := safeLevel(v.GetString(cfgLoggerTraceLevel)) // logger format - switch f := v.GetString("logger.format"); strings.ToLower(f) { + switch f := v.GetString(cfgLoggerFormat); strings.ToLower(f) { case formatConsole: c.Encoding = formatConsole default: @@ -100,12 +100,12 @@ func newLogger(v *viper.Viper) *zap.Logger { panic(err) } - if v.GetBool("logger.no_disclaimer") { + if v.GetBool(cfgLoggerNoDisclaimer) { return l } - name := v.GetString("app.name") - version := v.GetString("app.version") + name := v.GetString(cfgApplicationName) + version := v.GetString(cfgApplicationVersion) return l.With( zap.String("app_name", name), diff --git a/app-metrics.go b/app-metrics.go index d3bc35c79..fa3164605 100644 --- a/app-metrics.go +++ b/app-metrics.go @@ -7,7 +7,7 @@ import ( ) func attachMetrics(v *viper.Viper, r *mux.Router) { - if !v.GetBool("metrics") { + if !v.GetBool(cfgEnableMetrics) { return } diff --git a/app-profiler.go b/app-profiler.go index 2dc3c93a5..1da304711 100644 --- a/app-profiler.go +++ b/app-profiler.go @@ -8,7 +8,7 @@ import ( ) func attachProfiler(v *viper.Viper, r *mux.Router) { - if !v.GetBool("pprof") { + if !v.GetBool(cfgEnableProfiler) { return } diff --git a/app-settings.go b/app-settings.go index ec5b17898..c882b94c3 100644 --- a/app-settings.go +++ b/app-settings.go @@ -11,6 +11,8 @@ import ( "strings" "time" + "github.com/minio/minio/neofs/pool" + "github.com/minio/minio/misc" "github.com/nspcc-dev/neofs-api-go/refs" @@ -38,6 +40,41 @@ const ( defaultKeepaliveTimeout = 10 * time.Second ) +const ( // settings + // Logger: + cfgLoggerLevel = "logger.level" + cfgLoggerFormat = "logger.format" + cfgLoggerTraceLevel = "logger.trace_level" + cfgLoggerNoDisclaimer = "logger.no_disclaimer" + cfgLoggerSamplingInitial = "logger.sampling.initial" + cfgLoggerSamplingThereafter = "logger.sampling.thereafter" + + // KeepAlive + cfgKeepaliveTime = "keepalive.time" + cfgKeepaliveTimeout = "keepalive.timeout" + cfgKeepalivePermitWithoutStream = "keepalive.permit_without_stream" + + // Timeouts + cfgConnectionTTL = "con_ttl" + cfgConnectTimeout = "connect_timeout" + cfgRequestTimeout = "request_timeout" + cfgRebalanceTimer = "rebalance_timer" + + // gRPC + cfgGRPCVerbose = "verbose" + cfgGRPCPrivateKey = "key" + + // Metrics / Profiler / Web + cfgEnableMetrics = "metrics" + cfgEnableProfiler = "pprof" + cfgListenAddress = "listen_address" + + // Application + cfgApplicationName = "app.name" + cfgApplicationVersion = "app.version" + cfgApplicationBuildTime = "app.build_time" +) + func (empty) Read([]byte) (int, error) { return 0, io.EOF } func fetchKey(l *zap.Logger, v *viper.Viper) *ecdsa.PrivateKey { @@ -66,6 +103,30 @@ func fetchKey(l *zap.Logger, v *viper.Viper) *ecdsa.PrivateKey { return key } } + +func fetchPeers(l *zap.Logger, v *viper.Viper) []pool.Peer { + peers := make([]pool.Peer, 0) + + for i := 0; ; i++ { + + key := "peers." + strconv.Itoa(i) + "." + address := v.GetString(key + "address") + weight := v.GetFloat64(key + "weight") + + if address == "" { + l.Warn("skip, empty address") + break + } + + peers = append(peers, pool.Peer{ + Address: address, + Weight: weight, + }) + } + + return peers +} + func newSettings() *viper.Viper { v := viper.New() @@ -78,44 +139,50 @@ func newSettings() *viper.Viper { flags := pflag.NewFlagSet("commandline", pflag.ExitOnError) flags.SortFlags = false - flags.Bool("pprof", false, "enable pprof") - flags.Bool("metrics", false, "enable prometheus") + flags.Bool(cfgEnableProfiler, false, "enable pprof") + flags.Bool(cfgEnableMetrics, false, "enable prometheus") help := flags.BoolP("help", "h", false, "show help") version := flags.BoolP("version", "v", false, "show version") - flags.String("key", generated, `"`+generated+`" to generate key, path to private key file, hex string or wif`) + flags.String(cfgGRPCPrivateKey, generated, `"`+generated+`" to generate key, path to private key file, hex string or wif`) - flags.Bool("verbose", false, "debug gRPC connections") - flags.Duration("request_timeout", defaultRequestTimeout, "gRPC request timeout") - flags.Duration("connect_timeout", defaultConnectTimeout, "gRPC connect timeout") - flags.Duration("rebalance_timer", defaultRebalanceTimer, "gRPC connection rebalance timer") + flags.Bool(cfgGRPCVerbose, false, "debug gRPC connections") + flags.Duration(cfgRequestTimeout, defaultRequestTimeout, "gRPC request timeout") + flags.Duration(cfgConnectTimeout, defaultConnectTimeout, "gRPC connect timeout") + flags.Duration(cfgRebalanceTimer, defaultRebalanceTimer, "gRPC connection rebalance timer") - ttl := flags.DurationP("conn_ttl", "t", defaultTTL, "gRPC connection time to live") + ttl := flags.DurationP(cfgConnectionTTL, "t", defaultTTL, "gRPC connection time to live") - flags.String("listen_address", "0.0.0.0:8080", "S3 Gateway listen address") + flags.String(cfgListenAddress, "0.0.0.0:8080", "S3 Gateway listen address") peers := flags.StringArrayP("peers", "p", nil, "NeoFS nodes") // set prefers: +<<<<<<< Updated upstream v.Set("app.name", misc.ApplicationName) v.Set("app.version", misc.Version) v.Set("app.build_time", misc.Build) +======= + v.Set(cfgApplicationName, "neofs-gw") + v.Set(cfgApplicationVersion, misc.Version) + v.Set(cfgApplicationBuildTime, misc.Build) +>>>>>>> Stashed changes // set defaults: // logger: - v.SetDefault("logger.level", "debug") - v.SetDefault("logger.format", "console") - v.SetDefault("logger.trace_level", "fatal") - v.SetDefault("logger.no_disclaimer", true) - v.SetDefault("logger.sampling.initial", 1000) - v.SetDefault("logger.sampling.thereafter", 1000) + v.SetDefault(cfgLoggerLevel, "debug") + v.SetDefault(cfgLoggerFormat, "console") + v.SetDefault(cfgLoggerTraceLevel, "fatal") + v.SetDefault(cfgLoggerNoDisclaimer, true) + v.SetDefault(cfgLoggerSamplingInitial, 1000) + v.SetDefault(cfgLoggerSamplingThereafter, 1000) // keepalive: // If set below 10s, a minimum value of 10s will be used instead. - v.SetDefault("keepalive.time", defaultKeepaliveTime) - v.SetDefault("keepalive.timeout", defaultKeepaliveTimeout) - v.SetDefault("keepalive.permit_without_stream", true) + v.SetDefault(cfgKeepaliveTime, defaultKeepaliveTime) + v.SetDefault(cfgKeepaliveTimeout, defaultKeepaliveTimeout) + v.SetDefault(cfgKeepalivePermitWithoutStream, true) if err := v.BindPFlags(flags); err != nil { panic(err) diff --git a/app.go b/app.go index 616f33978..7ef0090bd 100644 --- a/app.go +++ b/app.go @@ -4,16 +4,15 @@ import ( "context" "time" - crypto "github.com/nspcc-dev/neofs-crypto" - + minio "github.com/minio/minio/cmd" "github.com/minio/minio/neofs/layer" + "github.com/minio/minio/neofs/pool" "github.com/minio/minio/pkg/auth" "github.com/nspcc-dev/neofs-api-go/refs" - - minio "github.com/minio/minio/cmd" - "github.com/minio/minio/neofs/pool" + crypto "github.com/nspcc-dev/neofs-crypto" "github.com/spf13/viper" "go.uber.org/zap" + "google.golang.org/grpc/keepalive" ) type ( @@ -45,15 +44,31 @@ func newApp(l *zap.Logger, v *viper.Viper) *App { reqTimeout = defaultRequestTimeout ) - if v := v.GetDuration("connect_timeout"); v > 0 { + if v := v.GetDuration(cfgConnectTimeout); v > 0 { conTimeout = v } - if v := v.GetDuration("request_timeout"); v > 0 { + if v := v.GetDuration(cfgRequestTimeout); v > 0 { reqTimeout = v } - if cli, err = pool.New(l, v, key); err != nil { + poolConfig := &pool.Config{ + ConnectionTTL: v.GetDuration(cfgConnectionTTL), + ConnectTimeout: v.GetDuration(cfgConnectTimeout), + RequestTimeout: v.GetDuration(cfgRequestTimeout), + + Peers: fetchPeers(l, v), + + Logger: l, + PrivateKey: key, + + GRPCLogger: gRPCLogger(l), + GRPCVerbose: v.GetBool(cfgGRPCVerbose), + + ClientParameters: keepalive.ClientParameters{}, + } + + if cli, err = pool.New(poolConfig); err != nil { l.Fatal("could not prepare pool connections", zap.Error(err)) } diff --git a/neofs/pool/pool.go b/neofs/pool/pool.go index c129986b2..7bd919b69 100644 --- a/neofs/pool/pool.go +++ b/neofs/pool/pool.go @@ -7,14 +7,14 @@ import ( "encoding/binary" "math/rand" "sort" - "strconv" "sync" "time" + "google.golang.org/grpc/grpclog" + "github.com/nspcc-dev/neofs-api-go/service" "github.com/nspcc-dev/neofs-api-go/state" "github.com/pkg/errors" - "github.com/spf13/viper" "go.uber.org/atomic" "go.uber.org/zap" "google.golang.org/grpc" @@ -43,6 +43,27 @@ type ( ReBalance(ctx context.Context) } + Peer struct { + Address string + Weight float64 + } + + Config struct { + keepalive.ClientParameters + + ConnectionTTL time.Duration + ConnectTimeout time.Duration + RequestTimeout time.Duration + + Peers []Peer + + GRPCVerbose bool + GRPCLogger grpclog.LoggerV2 + + Logger *zap.Logger + PrivateKey *ecdsa.PrivateKey + } + pool struct { log *zap.Logger @@ -66,50 +87,35 @@ type ( } ) -const ( - minimumTTLInMinutes = 5 - - defaultTTL = minimumTTLInMinutes * time.Minute - - defaultRequestTimeout = 15 * time.Second - defaultConnectTimeout = 30 * time.Second - - defaultKeepaliveTime = 10 * time.Second - defaultKeepaliveTimeout = 10 * time.Second -) - var ( errBootstrapping = errors.New("bootstrapping") errEmptyConnection = errors.New("empty connection") errNoHealthyConnections = errors.New("no active connections") - - _ = New ) -func New(l *zap.Logger, v *viper.Viper, key *ecdsa.PrivateKey) (Pool, error) { +func New(cfg *Config) (Pool, error) { p := &pool{ - log: l, + log: cfg.Logger, Mutex: new(sync.Mutex), keys: make([]uint32, 0), nodes: make([]*node, 0), conns: make(map[uint32][]*node), - ttl: defaultTTL, - currentIdx: atomic.NewInt32(-1), - // fill with defaults: - reqTimeout: defaultRequestTimeout, - conTimeout: defaultConnectTimeout, - opts: keepalive.ClientParameters{ - Time: defaultKeepaliveTime, - Timeout: defaultKeepaliveTimeout, - PermitWithoutStream: true, - }, + ttl: cfg.ConnectionTTL, + + conTimeout: cfg.ConnectTimeout, + reqTimeout: cfg.RequestTimeout, + opts: cfg.ClientParameters, unhealthy: atomic.NewError(errBootstrapping), } + if cfg.GRPCVerbose { + grpclog.SetLoggerV2(cfg.GRPCLogger) + } + buf := make([]byte, 8) if _, err := crand.Read(buf); err != nil { return nil, err @@ -117,56 +123,28 @@ func New(l *zap.Logger, v *viper.Viper, key *ecdsa.PrivateKey) (Pool, error) { seed := binary.BigEndian.Uint64(buf) rand.Seed(int64(seed)) - l.Info("used random seed", zap.Uint64("seed", seed)) + cfg.Logger.Info("used random seed", zap.Uint64("seed", seed)) p.reqHealth = new(state.HealthRequest) p.reqHealth.SetTTL(service.NonForwardingTTL) - if err := service.SignRequestData(key, p.reqHealth); err != nil { + if err := service.SignRequestData(cfg.PrivateKey, p.reqHealth); err != nil { return nil, errors.Wrap(err, "could not sign `HealthRequest`") } - if val := v.GetDuration("conn_ttl"); val > 0 { - p.ttl = val - } - - if val := v.GetDuration("request_timeout"); val > 0 { - p.reqTimeout = val - } - - if val := v.GetDuration("connect_timeout"); val > 0 { - p.conTimeout = val - } - - if val := v.GetDuration("keepalive.time"); val > 0 { - p.opts.Time = val - } - - if val := v.GetDuration("keepalive.timeout"); val > 0 { - p.opts.Timeout = val - } - - if v.IsSet("keepalive.permit_without_stream") { - p.opts.PermitWithoutStream = v.GetBool("keepalive.permit_without_stream") - } - - for i := 0; ; i++ { - key := "peers." + strconv.Itoa(i) + "." - address := v.GetString(key + "address") - weight := v.GetFloat64(key + "weight") - - if address == "" { - l.Warn("skip, empty address") + for i := range cfg.Peers { + if cfg.Peers[i].Address == "" { + cfg.Logger.Warn("skip, empty address") break } p.nodes = append(p.nodes, &node{ index: int32(i), - address: address, - weight: uint32(weight * 100), + address: cfg.Peers[i].Address, + weight: uint32(cfg.Peers[i].Weight * 100), }) - l.Info("add new peer", + cfg.Logger.Info("add new peer", zap.String("address", p.nodes[i].address), zap.Uint32("weight", p.nodes[i].weight)) }