From f36a9119147e10fef21dc487e6c571b9c4956afc Mon Sep 17 00:00:00 2001 From: Evgeniy Kulikov Date: Mon, 6 Jul 2020 12:18:16 +0300 Subject: [PATCH] Prepare base application --- app-graceful.go | 24 ++++++++ app-healthy.go | 34 +++++++++++ app-logger.go | 143 +++++++++++++++++++++++++++++++++++++++++++++ app-metrics.go | 15 +++++ app-profiler.go | 20 +++++++ app-settings.go | 152 ++++++++++++++++++++++++++++++++++++++++++++++++ app.go | 130 +++++++++++++++++++++++++++++++++++++++++ go.mod | 3 +- go.sum | 4 +- main.go | 47 ++++----------- 10 files changed, 534 insertions(+), 38 deletions(-) create mode 100644 app-graceful.go create mode 100644 app-healthy.go create mode 100644 app-logger.go create mode 100644 app-metrics.go create mode 100644 app-profiler.go create mode 100644 app-settings.go create mode 100644 app.go diff --git a/app-graceful.go b/app-graceful.go new file mode 100644 index 00000000..8469d628 --- /dev/null +++ b/app-graceful.go @@ -0,0 +1,24 @@ +package main + +import ( + "context" + "os" + "os/signal" + "syscall" + + "go.uber.org/zap" +) + +// newGracefulContext returns graceful context +func newGracefulContext(l *zap.Logger) context.Context { + ctx, cancel := context.WithCancel(context.Background()) + go func() { + ch := make(chan os.Signal, 1) + signal.Notify(ch, syscall.SIGINT, syscall.SIGTERM, syscall.SIGHUP) + sig := <-ch + l.Info("received signal", + zap.String("signal", sig.String())) + cancel() + }() + return ctx +} diff --git a/app-healthy.go b/app-healthy.go new file mode 100644 index 00000000..edba9c63 --- /dev/null +++ b/app-healthy.go @@ -0,0 +1,34 @@ +package main + +import ( + "fmt" + "net/http" + + "github.com/gorilla/mux" + "go.uber.org/atomic" +) + +const ( + healthyState = "NeoFS S3 Gateway is " + // defaultContentType = "text/plain; charset=utf-8" +) + +func attachHealthy(r *mux.Router, e *atomic.Error) { + r.HandleFunc("/-/ready", func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(http.StatusOK) + _, _ = fmt.Fprintln(w, healthyState+"ready") + }) + + r.HandleFunc("/-/healthy", func(w http.ResponseWriter, r *http.Request) { + code := http.StatusOK + msg := "healthy" + + if err := e.Load(); err != nil { + msg = "unhealthy: " + err.Error() + code = http.StatusBadRequest + } + + w.WriteHeader(code) + _, _ = fmt.Fprintln(w, healthyState+msg) + }) +} diff --git a/app-logger.go b/app-logger.go new file mode 100644 index 00000000..3bb969e5 --- /dev/null +++ b/app-logger.go @@ -0,0 +1,143 @@ +package main + +import ( + "strings" + + "google.golang.org/grpc/grpclog" + + "github.com/spf13/viper" + "go.uber.org/zap" + "go.uber.org/zap/zapcore" +) + +type ( + zapLogger struct { + zapcore.Core + log *zap.SugaredLogger + } + + logger interface { + grpclog.LoggerV2 + Println(v ...interface{}) + } +) + +const ( + formatJSON = "json" + formatConsole = "console" + + defaultSamplingInitial = 100 + defaultSamplingThereafter = 100 +) + +func gRPCLogger(l *zap.Logger) logger { + log := l.WithOptions( + // skip gRPCLog + zapLogger in caller + zap.AddCallerSkip(2)) + return &zapLogger{ + Core: log.Core(), + log: log.Sugar(), + } +} + +func safeLevel(lvl string) zap.AtomicLevel { + switch strings.ToLower(lvl) { + case "debug": + return zap.NewAtomicLevelAt(zap.DebugLevel) + case "warn": + return zap.NewAtomicLevelAt(zap.WarnLevel) + case "error": + return zap.NewAtomicLevelAt(zap.ErrorLevel) + case "fatal": + return zap.NewAtomicLevelAt(zap.FatalLevel) + case "panic": + return zap.NewAtomicLevelAt(zap.PanicLevel) + default: + return zap.NewAtomicLevelAt(zap.InfoLevel) + } +} + +func newLogger(v *viper.Viper) *zap.Logger { + c := zap.NewProductionConfig() + + c.OutputPaths = []string{"stdout"} + c.ErrorOutputPaths = []string{"stdout"} + + if v.IsSet("logger.sampling") { + c.Sampling = &zap.SamplingConfig{ + Initial: defaultSamplingInitial, + Thereafter: defaultSamplingThereafter, + } + + if val := v.GetInt("logger.sampling.initial"); val > 0 { + c.Sampling.Initial = val + } + + if val := v.GetInt("logger.sampling.thereafter"); val > 0 { + c.Sampling.Thereafter = val + } + } + + // logger level + c.Level = safeLevel(v.GetString("logger.level")) + traceLvl := safeLevel(v.GetString("logger.trace_level")) + + // logger format + switch f := v.GetString("logger.format"); strings.ToLower(f) { + case formatConsole: + c.Encoding = formatConsole + default: + c.Encoding = formatJSON + } + + // logger time + c.EncoderConfig.EncodeTime = zapcore.ISO8601TimeEncoder + + l, err := c.Build( + // enable trace only for current log-level + zap.AddStacktrace(traceLvl)) + if err != nil { + panic(err) + } + + if v.GetBool("logger.no_disclaimer") { + return l + } + + name := v.GetString("app.name") + version := v.GetString("app.version") + + return l.With( + zap.String("app_name", name), + zap.String("app_version", version)) +} + +func (z *zapLogger) Info(args ...interface{}) { z.log.Info(args...) } + +func (z *zapLogger) Infoln(args ...interface{}) { z.log.Info(args...) } + +func (z *zapLogger) Infof(format string, args ...interface{}) { z.log.Infof(format, args...) } + +func (z *zapLogger) Println(args ...interface{}) { z.log.Info(args...) } + +func (z *zapLogger) Printf(format string, args ...interface{}) { z.log.Infof(format, args...) } + +func (z *zapLogger) Warning(args ...interface{}) { z.log.Warn(args...) } + +func (z *zapLogger) Warningln(args ...interface{}) { z.log.Warn(args...) } + +func (z *zapLogger) Warningf(format string, args ...interface{}) { z.log.Warnf(format, args...) } + +func (z *zapLogger) Error(args ...interface{}) { z.log.Error(args...) } + +func (z *zapLogger) Errorln(args ...interface{}) { z.log.Error(args...) } + +func (z *zapLogger) Errorf(format string, args ...interface{}) { z.log.Errorf(format, args...) } + +func (z *zapLogger) Fatal(args ...interface{}) { z.log.Fatal(args...) } + +func (z *zapLogger) Fatalln(args ...interface{}) { z.log.Fatal(args...) } + +func (z *zapLogger) Fatalf(format string, args ...interface{}) { z.Fatalf(format, args...) } + +func (z *zapLogger) V(int) bool { return z.Enabled(zapcore.DebugLevel) } diff --git a/app-metrics.go b/app-metrics.go new file mode 100644 index 00000000..d3bc35c7 --- /dev/null +++ b/app-metrics.go @@ -0,0 +1,15 @@ +package main + +import ( + "github.com/gorilla/mux" + "github.com/prometheus/client_golang/prometheus/promhttp" + "github.com/spf13/viper" +) + +func attachMetrics(v *viper.Viper, r *mux.Router) { + if !v.GetBool("metrics") { + return + } + + r.Handle("/metrics", promhttp.Handler()) +} diff --git a/app-profiler.go b/app-profiler.go new file mode 100644 index 00000000..2dc3c93a --- /dev/null +++ b/app-profiler.go @@ -0,0 +1,20 @@ +package main + +import ( + "net/http/pprof" + + "github.com/gorilla/mux" + "github.com/spf13/viper" +) + +func attachProfiler(v *viper.Viper, r *mux.Router) { + if !v.GetBool("pprof") { + return + } + + r.HandleFunc("/debug/pprof/", pprof.Index) + r.HandleFunc("/debug/pprof/cmdline", pprof.Cmdline) + r.HandleFunc("/debug/pprof/profile", pprof.Profile) + r.HandleFunc("/debug/pprof/symbol", pprof.Symbol) + r.HandleFunc("/debug/pprof/trace", pprof.Trace) +} diff --git a/app-settings.go b/app-settings.go new file mode 100644 index 00000000..9bff9ddd --- /dev/null +++ b/app-settings.go @@ -0,0 +1,152 @@ +package main + +import ( + "crypto/ecdsa" + "crypto/elliptic" + "crypto/rand" + "fmt" + "io" + "os" + "strconv" + "strings" + "time" + + "github.com/minio/minio/misc" + + "github.com/nspcc-dev/neofs-api-go/refs" + crypto "github.com/nspcc-dev/neofs-crypto" + "github.com/spf13/pflag" + "github.com/spf13/viper" + "go.uber.org/zap" +) + +type empty int + +const ( + devNull = empty(0) + generated = "generated" + + minimumTTLInMinutes = 5 + + defaultTTL = minimumTTLInMinutes * time.Minute + + defaultRebalanceTimer = 15 * time.Second + defaultRequestTimeout = 15 * time.Second + defaultConnectTimeout = 30 * time.Second + + defaultKeepaliveTime = 10 * time.Second + defaultKeepaliveTimeout = 10 * time.Second +) + +func (empty) Read([]byte) (int, error) { return 0, io.EOF } + +func fetchKey(l *zap.Logger, v *viper.Viper) *ecdsa.PrivateKey { + switch val := v.GetString("key"); val { + case generated: + key, err := ecdsa.GenerateKey(elliptic.P256(), rand.Reader) + if err != nil { + l.Fatal("could not generate private key", zap.Error(err)) + } + + id, err := refs.NewOwnerID(&key.PublicKey) + l.Info("generate new key", + zap.Stringer("key", id), + zap.Error(err)) + + return key + + default: + key, err := crypto.LoadPrivateKey(val) + if err != nil { + l.Fatal("could not load private key", + zap.String("key", v.GetString("key")), + zap.Error(err)) + } + + return key + } +} +func newSettings() *viper.Viper { + v := viper.New() + + v.AutomaticEnv() + v.SetEnvPrefix("S3") + v.SetConfigType("yaml") + v.SetEnvKeyReplacer(strings.NewReplacer(".", "_")) + + // flags setup: + flags := pflag.NewFlagSet("commandline", pflag.ExitOnError) + flags.SortFlags = false + + flags.Bool("pprof", false, "enable pprof") + flags.Bool("metrics", false, "enable prometheus") + + help := flags.BoolP("help", "h", false, "show help") + version := flags.BoolP("version", "v", false, "show version") + + flags.String("key", generated, `"`+generated+`" to generate key, path to private key file, hex string or wif`) + + flags.Bool("verbose", false, "debug gRPC connections") + flags.Duration("request_timeout", defaultRequestTimeout, "gRPC request timeout") + flags.Duration("connect_timeout", defaultConnectTimeout, "gRPC connect timeout") + flags.Duration("rebalance_timer", defaultRebalanceTimer, "gRPC connection rebalance timer") + + ttl := flags.DurationP("conn_ttl", "t", defaultTTL, "gRPC connection time to live") + + flags.String("listen_address", "0.0.0.0:8080", "S3 Gateway listen address") + peers := flags.StringArrayP("peers", "p", nil, "NeoFS nodes") + + // set prefers: + v.Set("app.name", "neofs-gw") + v.Set("app.version", misc.Version) + v.Set("app.build_time", misc.Build) + + // set defaults: + + // logger: + v.SetDefault("logger.level", "debug") + v.SetDefault("logger.format", "console") + v.SetDefault("logger.trace_level", "fatal") + v.SetDefault("logger.no_disclaimer", true) + v.SetDefault("logger.sampling.initial", 1000) + v.SetDefault("logger.sampling.thereafter", 1000) + + // keepalive: + // If set below 10s, a minimum value of 10s will be used instead. + v.SetDefault("keepalive.time", defaultKeepaliveTime) + v.SetDefault("keepalive.timeout", defaultKeepaliveTimeout) + v.SetDefault("keepalive.permit_without_stream", true) + + if err := v.BindPFlags(flags); err != nil { + panic(err) + } + + if err := v.ReadConfig(devNull); err != nil { + panic(err) + } + + if err := flags.Parse(os.Args); err != nil { + panic(err) + } + + switch { + case help != nil && *help: + fmt.Printf("NeoFS S3 Gateway %s (%s)\n", misc.Version, misc.Build) + flags.PrintDefaults() + os.Exit(0) + case version != nil && *version: + fmt.Printf("NeoFS S3 Gateway %s (%s)\n", misc.Version, misc.Build) + os.Exit(0) + case ttl != nil && ttl.Minutes() < minimumTTLInMinutes: + fmt.Printf("connection ttl should not be less than %s", defaultTTL) + } + + if peers != nil && len(*peers) > 0 { + for i := range *peers { + v.SetDefault("peers."+strconv.Itoa(i)+".address", (*peers)[i]) + v.SetDefault("peers."+strconv.Itoa(i)+".weight", 1) + } + } + + return v +} diff --git a/app.go b/app.go new file mode 100644 index 00000000..58d59c89 --- /dev/null +++ b/app.go @@ -0,0 +1,130 @@ +package main + +import ( + "context" + "time" + + crypto "github.com/nspcc-dev/neofs-crypto" + + "github.com/minio/minio/neofs/layer" + "github.com/minio/minio/pkg/auth" + "github.com/nspcc-dev/neofs-api-go/refs" + + minio "github.com/minio/minio/cmd" + "github.com/minio/minio/neofs/pool" + "github.com/spf13/viper" + "go.uber.org/zap" +) + +type ( + App struct { + cli pool.Pool + log *zap.Logger + cfg *viper.Viper + obj minio.ObjectLayer + + conTimeout time.Duration + reqTimeout time.Duration + + webDone chan struct{} + wrkDone chan struct{} + } +) + +func newApp(l *zap.Logger, v *viper.Viper) *App { + var ( + err error + wif string + cli pool.Pool + uid refs.OwnerID + obj minio.ObjectLayer + + key = fetchKey(l, v) + + conTimeout = defaultConnectTimeout + reqTimeout = defaultRequestTimeout + ) + + if v := v.GetDuration("connect_timeout"); v > 0 { + conTimeout = v + } + + if v := v.GetDuration("request_timeout"); v > 0 { + reqTimeout = v + } + + if cli, err = pool.New(l, v, key); err != nil { + l.Fatal("could not prepare pool connections", + zap.Error(err)) + } + + { // should establish connection with NeoFS Storage Nodes + ctx, cancel := context.WithTimeout(context.Background(), conTimeout) + defer cancel() + + cli.ReBalance(ctx) + + if _, err = cli.GetConnection(ctx); err != nil { + l.Fatal("could not establish connection", + zap.Error(err)) + } + } + + { // should prepare object layer + if uid, err = refs.NewOwnerID(&key.PublicKey); err != nil { + l.Fatal("could not fetch OwnerID", + zap.Error(err)) + } + + if wif, err = crypto.WIFEncode(key); err != nil { + l.Fatal("could not encode key to WIF", + zap.Error(err)) + } + + if obj, err = layer.NewLayer(cli, auth.Credentials{AccessKey: uid.String(), SecretKey: wif}); err != nil { + l.Fatal("could not prepare ObjectLayer", + zap.Error(err)) + } + + _ = obj + } + + return &App{ + cli: cli, + log: l, + cfg: v, + + webDone: make(chan struct{}, 1), + wrkDone: make(chan struct{}, 1), + + conTimeout: conTimeout, + reqTimeout: reqTimeout, + } +} + +func (a *App) Wait(ctx context.Context) { + a.log.Info("application started") + + select { + case <-a.wrkDone: // wait for worker is stopped + <-a.webDone + case <-a.webDone: // wait for web-server is stopped + <-a.wrkDone + } +} + +func (a *App) Server(ctx context.Context) { + defer func() { + <-ctx.Done() + a.log.Info("stopping server") + close(a.webDone) + }() +} + +func (a *App) Worker(ctx context.Context) { + defer func() { + <-ctx.Done() + a.log.Info("stopping worker") + close(a.wrkDone) + }() +} diff --git a/go.mod b/go.mod index 164aebc6..fc2c6e00 100644 --- a/go.mod +++ b/go.mod @@ -44,7 +44,7 @@ require ( github.com/hashicorp/vault/api v1.0.4 github.com/inconshreveable/go-update v0.0.0-20160112193335-8152e7eb6ccf github.com/json-iterator/go v1.1.10 - github.com/klauspost/compress v1.10.3 + github.com/klauspost/compress v1.10.4 github.com/klauspost/cpuid v1.3.0 github.com/klauspost/pgzip v1.2.1 github.com/klauspost/readahead v1.3.1 @@ -88,6 +88,7 @@ require ( github.com/secure-io/sio-go v0.3.0 github.com/shirou/gopsutil v2.20.3-0.20200314133625-53cec6b37e6a+incompatible github.com/smartystreets/assertions v0.0.0-20190401211740-f487f9de1cd3 // indirect + github.com/spf13/pflag v1.0.3 github.com/spf13/viper v1.7.0 github.com/streadway/amqp v0.0.0-20190404075320-75d898a42a94 github.com/stretchr/testify v1.5.1 diff --git a/go.sum b/go.sum index d5e44574..c3a0832e 100644 --- a/go.sum +++ b/go.sum @@ -300,8 +300,8 @@ github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+o github.com/klauspost/compress v1.8.2/go.mod h1:RyIbtBH6LamlWaDj8nUwkbUhJ87Yi3uG0guNDohfE1A= github.com/klauspost/compress v1.9.4/go.mod h1:RyIbtBH6LamlWaDj8nUwkbUhJ87Yi3uG0guNDohfE1A= github.com/klauspost/compress v1.10.1/go.mod h1:aoV0uJVorq1K+umq18yTdKaF57EivdYsUV+/s2qKfXs= -github.com/klauspost/compress v1.10.3 h1:OP96hzwJVBIHYU52pVTI6CczrxPvrGfgqF9N5eTO0Q8= -github.com/klauspost/compress v1.10.3/go.mod h1:aoV0uJVorq1K+umq18yTdKaF57EivdYsUV+/s2qKfXs= +github.com/klauspost/compress v1.10.4 h1:jFzIFaf586tquEB5EhzQG0HwGNSlgAJpG53G6Ss11wc= +github.com/klauspost/compress v1.10.4/go.mod h1:aoV0uJVorq1K+umq18yTdKaF57EivdYsUV+/s2qKfXs= github.com/klauspost/cpuid v1.2.2 h1:1xAgYebNnsb9LKCdLOvFWtAxGU/33mjJtyOVbmUa0Us= github.com/klauspost/cpuid v1.2.2/go.mod h1:Pj4uuM528wm8OyEC2QMXAi2YiTZ96dNQPGgoMS4s3ek= github.com/klauspost/cpuid v1.2.3/go.mod h1:Pj4uuM528wm8OyEC2QMXAi2YiTZ96dNQPGgoMS4s3ek= diff --git a/main.go b/main.go index e66a243c..46f3d944 100644 --- a/main.go +++ b/main.go @@ -1,38 +1,15 @@ -// +build go1.13 - -/* - * MinIO Cloud Storage, (C) 2016, 2017, 2018 MinIO, Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -/* - * Below main package has canonical imports for 'go get' and 'go build' - * to work with all other clones of github.com/minio/minio repository. For - * more information refer https://golang.org/doc/go1.4#canonicalimports - */ - -package main // import "github.com/minio/minio" - -import ( - "fmt" - - "github.com/minio/minio/misc" - // Import gateway - // _ "github.com/minio/minio/cmd/gateway" -) +package main func main() { - fmt.Println(misc.Build) - fmt.Println(misc.Version) + var ( + v = newSettings() + l = newLogger(v) + a = newApp(l, v) + g = newGracefulContext(l) + ) + + go a.Server(g) + go a.Worker(g) + + a.Wait(g) }