frostfs-s3-gw/cmd/gate/app.go

210 lines
4 KiB
Go
Raw Normal View History

2020-07-06 09:18:16 +00:00
package main
import (
"context"
2020-07-12 23:00:47 +00:00
"net"
"net/http"
2020-07-06 09:18:16 +00:00
"time"
2020-07-12 23:00:47 +00:00
"github.com/gorilla/mux"
2020-07-09 09:23:46 +00:00
minio "github.com/minio/minio/legacy"
2020-07-06 09:18:16 +00:00
"github.com/minio/minio/neofs/layer"
2020-07-07 11:25:13 +00:00
"github.com/minio/minio/neofs/pool"
2020-07-06 09:18:16 +00:00
"github.com/minio/minio/pkg/auth"
"github.com/nspcc-dev/neofs-api-go/refs"
2020-07-07 11:25:13 +00:00
crypto "github.com/nspcc-dev/neofs-crypto"
2020-07-06 09:18:16 +00:00
"github.com/spf13/viper"
"go.uber.org/zap"
2020-07-07 11:25:13 +00:00
"google.golang.org/grpc/keepalive"
2020-07-06 09:18:16 +00:00
)
type (
	// App glues together all long-lived state of the S3 gateway:
	// the NeoFS connection pool, the HTTP router that serves the S3
	// API, configuration, timeouts, and the shutdown-signal channels
	// shared between Server and Worker.
	App struct {
		cli pool.Pool         // NeoFS storage-node connection pool
		log *zap.Logger       // application-wide structured logger
		web *mux.Router       // HTTP router (minio S3 handlers + app routes)
		cfg *viper.Viper      // runtime configuration
		obj minio.ObjectLayer // object-storage backend built over the pool

		conTimeout time.Duration // timeout for establishing connections
		reqTimeout time.Duration // timeout for individual requests
		reBalance  time.Duration // interval between pool re-balance passes

		// Closed by Server / Worker respectively when each has fully
		// stopped; Wait blocks on both.
		webDone chan struct{}
		wrkDone chan struct{}
	}
)
func newApp(l *zap.Logger, v *viper.Viper) *App {
var (
err error
wif string
cli pool.Pool
uid refs.OwnerID
obj minio.ObjectLayer
key = fetchKey(l, v)
2020-07-12 23:00:47 +00:00
reBalance = defaultRebalanceTimer
2020-07-06 09:18:16 +00:00
conTimeout = defaultConnectTimeout
reqTimeout = defaultRequestTimeout
)
2020-07-07 11:25:13 +00:00
if v := v.GetDuration(cfgConnectTimeout); v > 0 {
2020-07-06 09:18:16 +00:00
conTimeout = v
}
2020-07-07 11:25:13 +00:00
if v := v.GetDuration(cfgRequestTimeout); v > 0 {
2020-07-06 09:18:16 +00:00
reqTimeout = v
}
2020-07-07 11:25:13 +00:00
poolConfig := &pool.Config{
ConnectionTTL: v.GetDuration(cfgConnectionTTL),
ConnectTimeout: v.GetDuration(cfgConnectTimeout),
RequestTimeout: v.GetDuration(cfgRequestTimeout),
Peers: fetchPeers(l, v),
Logger: l,
PrivateKey: key,
GRPCLogger: gRPCLogger(l),
GRPCVerbose: v.GetBool(cfgGRPCVerbose),
ClientParameters: keepalive.ClientParameters{},
}
2020-07-12 23:00:47 +00:00
if v := v.GetDuration(cfgRebalanceTimer); v > 0 {
reBalance = v
}
2020-07-07 11:25:13 +00:00
if cli, err = pool.New(poolConfig); err != nil {
2020-07-06 09:18:16 +00:00
l.Fatal("could not prepare pool connections",
zap.Error(err))
}
{ // should establish connection with NeoFS Storage Nodes
ctx, cancel := context.WithTimeout(context.Background(), conTimeout)
defer cancel()
cli.ReBalance(ctx)
if _, err = cli.GetConnection(ctx); err != nil {
l.Fatal("could not establish connection",
zap.Error(err))
}
}
{ // should prepare object layer
if uid, err = refs.NewOwnerID(&key.PublicKey); err != nil {
l.Fatal("could not fetch OwnerID",
zap.Error(err))
}
if wif, err = crypto.WIFEncode(key); err != nil {
l.Fatal("could not encode key to WIF",
zap.Error(err))
}
if obj, err = layer.NewLayer(cli, auth.Credentials{AccessKey: uid.String(), SecretKey: wif}); err != nil {
l.Fatal("could not prepare ObjectLayer",
zap.Error(err))
}
_ = obj
}
return &App{
cli: cli,
log: l,
cfg: v,
2020-07-12 23:00:47 +00:00
web: minio.NewRouter(obj),
2020-07-06 09:18:16 +00:00
webDone: make(chan struct{}, 1),
wrkDone: make(chan struct{}, 1),
2020-07-12 23:00:47 +00:00
reBalance: reBalance,
2020-07-06 09:18:16 +00:00
conTimeout: conTimeout,
reqTimeout: reqTimeout,
}
}
2020-07-12 23:00:47 +00:00
func (a *App) Wait() {
2020-07-06 09:18:16 +00:00
a.log.Info("application started")
2020-07-12 23:00:47 +00:00
2020-07-06 09:18:16 +00:00
select {
case <-a.wrkDone: // wait for worker is stopped
<-a.webDone
case <-a.webDone: // wait for web-server is stopped
<-a.wrkDone
}
2020-07-12 23:00:47 +00:00
a.log.Info("application finished")
2020-07-06 09:18:16 +00:00
}
func (a *App) Server(ctx context.Context) {
2020-07-12 23:00:47 +00:00
var (
err error
lis net.Listener
lic net.ListenConfig
srv = http.Server{Handler: a.web}
addr = a.cfg.GetString(cfgListenAddress)
)
if lis, err = lic.Listen(ctx, "tcp", addr); err != nil {
a.log.Fatal("could not prepare listener",
zap.Error(err))
}
// Attach app-specific routes:
attachHealthy(a.web, a.cli)
attachMetrics(a.cfg, a.log, a.web)
attachProfiler(a.cfg, a.log, a.web)
go func() {
a.log.Info("starting server",
zap.String("bind", addr))
if err = srv.Serve(lis); err != nil && err != http.ErrServerClosed {
a.log.Warn("listen and serve",
zap.Error(err))
}
2020-07-06 09:18:16 +00:00
}()
2020-07-12 23:00:47 +00:00
<-ctx.Done()
ctx, cancel := context.WithTimeout(context.Background(), defaultShutdownTimeout)
defer cancel()
a.log.Info("stopping server",
zap.Error(srv.Shutdown(ctx)))
close(a.webDone)
2020-07-06 09:18:16 +00:00
}
func (a *App) Worker(ctx context.Context) {
2020-07-12 23:00:47 +00:00
tick := time.NewTimer(a.reBalance)
loop:
for {
select {
case <-ctx.Done():
break loop
case <-tick.C:
ctx, cancel := context.WithTimeout(ctx, a.conTimeout)
a.cli.ReBalance(ctx)
cancel()
tick.Reset(a.reBalance)
}
}
tick.Stop()
a.cli.Close()
a.log.Info("stopping worker")
close(a.wrkDone)
2020-07-06 09:18:16 +00:00
}