Move config to constants

This commit is contained in:
Evgeniy Kulikov 2020-07-07 14:25:13 +03:00
parent ac500c8d09
commit 2ccc778fca
6 changed files with 160 additions and 100 deletions

View file

@ -69,21 +69,21 @@ func newLogger(v *viper.Viper) *zap.Logger {
Thereafter: defaultSamplingThereafter, Thereafter: defaultSamplingThereafter,
} }
if val := v.GetInt("logger.sampling.initial"); val > 0 { if val := v.GetInt(cfgLoggerSamplingInitial); val > 0 {
c.Sampling.Initial = val c.Sampling.Initial = val
} }
if val := v.GetInt("logger.sampling.thereafter"); val > 0 { if val := v.GetInt(cfgLoggerSamplingThereafter); val > 0 {
c.Sampling.Thereafter = val c.Sampling.Thereafter = val
} }
} }
// logger level // logger level
c.Level = safeLevel(v.GetString("logger.level")) c.Level = safeLevel(v.GetString(cfgLoggerLevel))
traceLvl := safeLevel(v.GetString("logger.trace_level")) traceLvl := safeLevel(v.GetString(cfgLoggerTraceLevel))
// logger format // logger format
switch f := v.GetString("logger.format"); strings.ToLower(f) { switch f := v.GetString(cfgLoggerFormat); strings.ToLower(f) {
case formatConsole: case formatConsole:
c.Encoding = formatConsole c.Encoding = formatConsole
default: default:
@ -100,12 +100,12 @@ func newLogger(v *viper.Viper) *zap.Logger {
panic(err) panic(err)
} }
if v.GetBool("logger.no_disclaimer") { if v.GetBool(cfgLoggerNoDisclaimer) {
return l return l
} }
name := v.GetString("app.name") name := v.GetString(cfgApplicationName)
version := v.GetString("app.version") version := v.GetString(cfgApplicationVersion)
return l.With( return l.With(
zap.String("app_name", name), zap.String("app_name", name),

View file

@ -7,7 +7,7 @@ import (
) )
func attachMetrics(v *viper.Viper, r *mux.Router) { func attachMetrics(v *viper.Viper, r *mux.Router) {
if !v.GetBool("metrics") { if !v.GetBool(cfgEnableMetrics) {
return return
} }

View file

@ -8,7 +8,7 @@ import (
) )
func attachProfiler(v *viper.Viper, r *mux.Router) { func attachProfiler(v *viper.Viper, r *mux.Router) {
if !v.GetBool("pprof") { if !v.GetBool(cfgEnableProfiler) {
return return
} }

View file

@ -11,6 +11,8 @@ import (
"strings" "strings"
"time" "time"
"github.com/minio/minio/neofs/pool"
"github.com/minio/minio/misc" "github.com/minio/minio/misc"
"github.com/nspcc-dev/neofs-api-go/refs" "github.com/nspcc-dev/neofs-api-go/refs"
@ -38,6 +40,41 @@ const (
defaultKeepaliveTimeout = 10 * time.Second defaultKeepaliveTimeout = 10 * time.Second
) )
const ( // settings
// Logger:
cfgLoggerLevel = "logger.level"
cfgLoggerFormat = "logger.format"
cfgLoggerTraceLevel = "logger.trace_level"
cfgLoggerNoDisclaimer = "logger.no_disclaimer"
cfgLoggerSamplingInitial = "logger.sampling.initial"
cfgLoggerSamplingThereafter = "logger.sampling.thereafter"
// KeepAlive
cfgKeepaliveTime = "keepalive.time"
cfgKeepaliveTimeout = "keepalive.timeout"
cfgKeepalivePermitWithoutStream = "keepalive.permit_without_stream"
// Timeouts
cfgConnectionTTL = "con_ttl"
cfgConnectTimeout = "connect_timeout"
cfgRequestTimeout = "request_timeout"
cfgRebalanceTimer = "rebalance_timer"
// gRPC
cfgGRPCVerbose = "verbose"
cfgGRPCPrivateKey = "key"
// Metrics / Profiler / Web
cfgEnableMetrics = "metrics"
cfgEnableProfiler = "pprof"
cfgListenAddress = "listen_address"
// Application
cfgApplicationName = "app.name"
cfgApplicationVersion = "app.version"
cfgApplicationBuildTime = "app.build_time"
)
func (empty) Read([]byte) (int, error) { return 0, io.EOF } func (empty) Read([]byte) (int, error) { return 0, io.EOF }
func fetchKey(l *zap.Logger, v *viper.Viper) *ecdsa.PrivateKey { func fetchKey(l *zap.Logger, v *viper.Viper) *ecdsa.PrivateKey {
@ -66,6 +103,30 @@ func fetchKey(l *zap.Logger, v *viper.Viper) *ecdsa.PrivateKey {
return key return key
} }
} }
func fetchPeers(l *zap.Logger, v *viper.Viper) []pool.Peer {
peers := make([]pool.Peer, 0)
for i := 0; ; i++ {
key := "peers." + strconv.Itoa(i) + "."
address := v.GetString(key + "address")
weight := v.GetFloat64(key + "weight")
if address == "" {
l.Warn("skip, empty address")
break
}
peers = append(peers, pool.Peer{
Address: address,
Weight: weight,
})
}
return peers
}
func newSettings() *viper.Viper { func newSettings() *viper.Viper {
v := viper.New() v := viper.New()
@ -78,44 +139,50 @@ func newSettings() *viper.Viper {
flags := pflag.NewFlagSet("commandline", pflag.ExitOnError) flags := pflag.NewFlagSet("commandline", pflag.ExitOnError)
flags.SortFlags = false flags.SortFlags = false
flags.Bool("pprof", false, "enable pprof") flags.Bool(cfgEnableProfiler, false, "enable pprof")
flags.Bool("metrics", false, "enable prometheus") flags.Bool(cfgEnableMetrics, false, "enable prometheus")
help := flags.BoolP("help", "h", false, "show help") help := flags.BoolP("help", "h", false, "show help")
version := flags.BoolP("version", "v", false, "show version") version := flags.BoolP("version", "v", false, "show version")
flags.String("key", generated, `"`+generated+`" to generate key, path to private key file, hex string or wif`) flags.String(cfgGRPCPrivateKey, generated, `"`+generated+`" to generate key, path to private key file, hex string or wif`)
flags.Bool("verbose", false, "debug gRPC connections") flags.Bool(cfgGRPCVerbose, false, "debug gRPC connections")
flags.Duration("request_timeout", defaultRequestTimeout, "gRPC request timeout") flags.Duration(cfgRequestTimeout, defaultRequestTimeout, "gRPC request timeout")
flags.Duration("connect_timeout", defaultConnectTimeout, "gRPC connect timeout") flags.Duration(cfgConnectTimeout, defaultConnectTimeout, "gRPC connect timeout")
flags.Duration("rebalance_timer", defaultRebalanceTimer, "gRPC connection rebalance timer") flags.Duration(cfgRebalanceTimer, defaultRebalanceTimer, "gRPC connection rebalance timer")
ttl := flags.DurationP("conn_ttl", "t", defaultTTL, "gRPC connection time to live") ttl := flags.DurationP(cfgConnectionTTL, "t", defaultTTL, "gRPC connection time to live")
flags.String("listen_address", "0.0.0.0:8080", "S3 Gateway listen address") flags.String(cfgListenAddress, "0.0.0.0:8080", "S3 Gateway listen address")
peers := flags.StringArrayP("peers", "p", nil, "NeoFS nodes") peers := flags.StringArrayP("peers", "p", nil, "NeoFS nodes")
// set prefers: // set prefers:
<<<<<<< Updated upstream
v.Set("app.name", misc.ApplicationName) v.Set("app.name", misc.ApplicationName)
v.Set("app.version", misc.Version) v.Set("app.version", misc.Version)
v.Set("app.build_time", misc.Build) v.Set("app.build_time", misc.Build)
=======
v.Set(cfgApplicationName, "neofs-gw")
v.Set(cfgApplicationVersion, misc.Version)
v.Set(cfgApplicationBuildTime, misc.Build)
>>>>>>> Stashed changes
// set defaults: // set defaults:
// logger: // logger:
v.SetDefault("logger.level", "debug") v.SetDefault(cfgLoggerLevel, "debug")
v.SetDefault("logger.format", "console") v.SetDefault(cfgLoggerFormat, "console")
v.SetDefault("logger.trace_level", "fatal") v.SetDefault(cfgLoggerTraceLevel, "fatal")
v.SetDefault("logger.no_disclaimer", true) v.SetDefault(cfgLoggerNoDisclaimer, true)
v.SetDefault("logger.sampling.initial", 1000) v.SetDefault(cfgLoggerSamplingInitial, 1000)
v.SetDefault("logger.sampling.thereafter", 1000) v.SetDefault(cfgLoggerSamplingThereafter, 1000)
// keepalive: // keepalive:
// If set below 10s, a minimum value of 10s will be used instead. // If set below 10s, a minimum value of 10s will be used instead.
v.SetDefault("keepalive.time", defaultKeepaliveTime) v.SetDefault(cfgKeepaliveTime, defaultKeepaliveTime)
v.SetDefault("keepalive.timeout", defaultKeepaliveTimeout) v.SetDefault(cfgKeepaliveTimeout, defaultKeepaliveTimeout)
v.SetDefault("keepalive.permit_without_stream", true) v.SetDefault(cfgKeepalivePermitWithoutStream, true)
if err := v.BindPFlags(flags); err != nil { if err := v.BindPFlags(flags); err != nil {
panic(err) panic(err)

31
app.go
View file

@ -4,16 +4,15 @@ import (
"context" "context"
"time" "time"
crypto "github.com/nspcc-dev/neofs-crypto" minio "github.com/minio/minio/cmd"
"github.com/minio/minio/neofs/layer" "github.com/minio/minio/neofs/layer"
"github.com/minio/minio/neofs/pool"
"github.com/minio/minio/pkg/auth" "github.com/minio/minio/pkg/auth"
"github.com/nspcc-dev/neofs-api-go/refs" "github.com/nspcc-dev/neofs-api-go/refs"
crypto "github.com/nspcc-dev/neofs-crypto"
minio "github.com/minio/minio/cmd"
"github.com/minio/minio/neofs/pool"
"github.com/spf13/viper" "github.com/spf13/viper"
"go.uber.org/zap" "go.uber.org/zap"
"google.golang.org/grpc/keepalive"
) )
type ( type (
@ -45,15 +44,31 @@ func newApp(l *zap.Logger, v *viper.Viper) *App {
reqTimeout = defaultRequestTimeout reqTimeout = defaultRequestTimeout
) )
if v := v.GetDuration("connect_timeout"); v > 0 { if v := v.GetDuration(cfgConnectTimeout); v > 0 {
conTimeout = v conTimeout = v
} }
if v := v.GetDuration("request_timeout"); v > 0 { if v := v.GetDuration(cfgRequestTimeout); v > 0 {
reqTimeout = v reqTimeout = v
} }
if cli, err = pool.New(l, v, key); err != nil { poolConfig := &pool.Config{
ConnectionTTL: v.GetDuration(cfgConnectionTTL),
ConnectTimeout: v.GetDuration(cfgConnectTimeout),
RequestTimeout: v.GetDuration(cfgRequestTimeout),
Peers: fetchPeers(l, v),
Logger: l,
PrivateKey: key,
GRPCLogger: gRPCLogger(l),
GRPCVerbose: v.GetBool(cfgGRPCVerbose),
ClientParameters: keepalive.ClientParameters{},
}
if cli, err = pool.New(poolConfig); err != nil {
l.Fatal("could not prepare pool connections", l.Fatal("could not prepare pool connections",
zap.Error(err)) zap.Error(err))
} }

View file

@ -7,14 +7,14 @@ import (
"encoding/binary" "encoding/binary"
"math/rand" "math/rand"
"sort" "sort"
"strconv"
"sync" "sync"
"time" "time"
"google.golang.org/grpc/grpclog"
"github.com/nspcc-dev/neofs-api-go/service" "github.com/nspcc-dev/neofs-api-go/service"
"github.com/nspcc-dev/neofs-api-go/state" "github.com/nspcc-dev/neofs-api-go/state"
"github.com/pkg/errors" "github.com/pkg/errors"
"github.com/spf13/viper"
"go.uber.org/atomic" "go.uber.org/atomic"
"go.uber.org/zap" "go.uber.org/zap"
"google.golang.org/grpc" "google.golang.org/grpc"
@ -43,6 +43,27 @@ type (
ReBalance(ctx context.Context) ReBalance(ctx context.Context)
} }
Peer struct {
Address string
Weight float64
}
Config struct {
keepalive.ClientParameters
ConnectionTTL time.Duration
ConnectTimeout time.Duration
RequestTimeout time.Duration
Peers []Peer
GRPCVerbose bool
GRPCLogger grpclog.LoggerV2
Logger *zap.Logger
PrivateKey *ecdsa.PrivateKey
}
pool struct { pool struct {
log *zap.Logger log *zap.Logger
@ -66,50 +87,35 @@ type (
} }
) )
const (
minimumTTLInMinutes = 5
defaultTTL = minimumTTLInMinutes * time.Minute
defaultRequestTimeout = 15 * time.Second
defaultConnectTimeout = 30 * time.Second
defaultKeepaliveTime = 10 * time.Second
defaultKeepaliveTimeout = 10 * time.Second
)
var ( var (
errBootstrapping = errors.New("bootstrapping") errBootstrapping = errors.New("bootstrapping")
errEmptyConnection = errors.New("empty connection") errEmptyConnection = errors.New("empty connection")
errNoHealthyConnections = errors.New("no active connections") errNoHealthyConnections = errors.New("no active connections")
_ = New
) )
func New(l *zap.Logger, v *viper.Viper, key *ecdsa.PrivateKey) (Pool, error) { func New(cfg *Config) (Pool, error) {
p := &pool{ p := &pool{
log: l, log: cfg.Logger,
Mutex: new(sync.Mutex), Mutex: new(sync.Mutex),
keys: make([]uint32, 0), keys: make([]uint32, 0),
nodes: make([]*node, 0), nodes: make([]*node, 0),
conns: make(map[uint32][]*node), conns: make(map[uint32][]*node),
ttl: defaultTTL,
currentIdx: atomic.NewInt32(-1), currentIdx: atomic.NewInt32(-1),
// fill with defaults: ttl: cfg.ConnectionTTL,
reqTimeout: defaultRequestTimeout,
conTimeout: defaultConnectTimeout, conTimeout: cfg.ConnectTimeout,
opts: keepalive.ClientParameters{ reqTimeout: cfg.RequestTimeout,
Time: defaultKeepaliveTime, opts: cfg.ClientParameters,
Timeout: defaultKeepaliveTimeout,
PermitWithoutStream: true,
},
unhealthy: atomic.NewError(errBootstrapping), unhealthy: atomic.NewError(errBootstrapping),
} }
if cfg.GRPCVerbose {
grpclog.SetLoggerV2(cfg.GRPCLogger)
}
buf := make([]byte, 8) buf := make([]byte, 8)
if _, err := crand.Read(buf); err != nil { if _, err := crand.Read(buf); err != nil {
return nil, err return nil, err
@ -117,56 +123,28 @@ func New(l *zap.Logger, v *viper.Viper, key *ecdsa.PrivateKey) (Pool, error) {
seed := binary.BigEndian.Uint64(buf) seed := binary.BigEndian.Uint64(buf)
rand.Seed(int64(seed)) rand.Seed(int64(seed))
l.Info("used random seed", zap.Uint64("seed", seed)) cfg.Logger.Info("used random seed", zap.Uint64("seed", seed))
p.reqHealth = new(state.HealthRequest) p.reqHealth = new(state.HealthRequest)
p.reqHealth.SetTTL(service.NonForwardingTTL) p.reqHealth.SetTTL(service.NonForwardingTTL)
if err := service.SignRequestData(key, p.reqHealth); err != nil { if err := service.SignRequestData(cfg.PrivateKey, p.reqHealth); err != nil {
return nil, errors.Wrap(err, "could not sign `HealthRequest`") return nil, errors.Wrap(err, "could not sign `HealthRequest`")
} }
if val := v.GetDuration("conn_ttl"); val > 0 { for i := range cfg.Peers {
p.ttl = val if cfg.Peers[i].Address == "" {
} cfg.Logger.Warn("skip, empty address")
if val := v.GetDuration("request_timeout"); val > 0 {
p.reqTimeout = val
}
if val := v.GetDuration("connect_timeout"); val > 0 {
p.conTimeout = val
}
if val := v.GetDuration("keepalive.time"); val > 0 {
p.opts.Time = val
}
if val := v.GetDuration("keepalive.timeout"); val > 0 {
p.opts.Timeout = val
}
if v.IsSet("keepalive.permit_without_stream") {
p.opts.PermitWithoutStream = v.GetBool("keepalive.permit_without_stream")
}
for i := 0; ; i++ {
key := "peers." + strconv.Itoa(i) + "."
address := v.GetString(key + "address")
weight := v.GetFloat64(key + "weight")
if address == "" {
l.Warn("skip, empty address")
break break
} }
p.nodes = append(p.nodes, &node{ p.nodes = append(p.nodes, &node{
index: int32(i), index: int32(i),
address: address, address: cfg.Peers[i].Address,
weight: uint32(weight * 100), weight: uint32(cfg.Peers[i].Weight * 100),
}) })
l.Info("add new peer", cfg.Logger.Info("add new peer",
zap.String("address", p.nodes[i].address), zap.String("address", p.nodes[i].address),
zap.Uint32("weight", p.nodes[i].weight)) zap.Uint32("weight", p.nodes[i].weight))
} }