diff --git a/cmd/gate/app-healthy.go b/cmd/gate/app-healthy.go index edba9c6..fd42948 100644 --- a/cmd/gate/app-healthy.go +++ b/cmd/gate/app-healthy.go @@ -5,15 +5,18 @@ import ( "net/http" "github.com/gorilla/mux" - "go.uber.org/atomic" ) +type Healthy interface { + Status() error +} + const ( healthyState = "NeoFS S3 Gateway is " // defaultContentType = "text/plain; charset=utf-8" ) -func attachHealthy(r *mux.Router, e *atomic.Error) { +func attachHealthy(r *mux.Router, h Healthy) { r.HandleFunc("/-/ready", func(w http.ResponseWriter, r *http.Request) { w.WriteHeader(http.StatusOK) _, _ = fmt.Fprintln(w, healthyState+"ready") @@ -23,7 +26,7 @@ func attachHealthy(r *mux.Router, e *atomic.Error) { code := http.StatusOK msg := "healthy" - if err := e.Load(); err != nil { + if err := h.Status(); err != nil { msg = "unhealthy: " + err.Error() code = http.StatusBadRequest } diff --git a/cmd/gate/app-metrics.go b/cmd/gate/app-metrics.go index fa31646..c2e34f8 100644 --- a/cmd/gate/app-metrics.go +++ b/cmd/gate/app-metrics.go @@ -4,12 +4,14 @@ import ( "github.com/gorilla/mux" "github.com/prometheus/client_golang/prometheus/promhttp" "github.com/spf13/viper" + "go.uber.org/zap" ) -func attachMetrics(v *viper.Viper, r *mux.Router) { +func attachMetrics(v *viper.Viper, l *zap.Logger, r *mux.Router) { if !v.GetBool(cfgEnableMetrics) { return } + l.Info("enable metrics") r.Handle("/metrics", promhttp.Handler()) } diff --git a/cmd/gate/app-profiler.go b/cmd/gate/app-profiler.go index 1da3047..be5c2d0 100644 --- a/cmd/gate/app-profiler.go +++ b/cmd/gate/app-profiler.go @@ -5,13 +5,16 @@ import ( "github.com/gorilla/mux" "github.com/spf13/viper" + "go.uber.org/zap" ) -func attachProfiler(v *viper.Viper, r *mux.Router) { +func attachProfiler(v *viper.Viper, l *zap.Logger, r *mux.Router) { if !v.GetBool(cfgEnableProfiler) { return } + l.Info("enable profiler") + r.HandleFunc("/debug/pprof/", pprof.Index) r.HandleFunc("/debug/pprof/cmdline", pprof.Cmdline) r.HandleFunc("/debug/pprof/profile", pprof.Profile) diff --git a/cmd/gate/app-settings.go b/cmd/gate/app-settings.go index 81a3c94..8366707 100644 --- a/cmd/gate/app-settings.go +++ b/cmd/gate/app-settings.go @@ -32,9 +32,10 @@ const ( defaultTTL = minimumTTLInMinutes * time.Minute - defaultRebalanceTimer = 15 * time.Second - defaultRequestTimeout = 15 * time.Second - defaultConnectTimeout = 30 * time.Second + defaultRebalanceTimer = 15 * time.Second + defaultRequestTimeout = 15 * time.Second + defaultConnectTimeout = 30 * time.Second + defaultShutdownTimeout = 15 * time.Second defaultKeepaliveTime = 10 * time.Second defaultKeepaliveTimeout = 10 * time.Second diff --git a/cmd/gate/app.go b/cmd/gate/app.go index ec6ebe4..c75af62 100644 --- a/cmd/gate/app.go +++ b/cmd/gate/app.go @@ -2,8 +2,11 @@ package main import ( "context" + "net" + "net/http" "time" + "github.com/gorilla/mux" minio "github.com/minio/minio/legacy" "github.com/minio/minio/neofs/layer" "github.com/minio/minio/neofs/pool" @@ -19,12 +22,15 @@ type ( App struct { cli pool.Pool log *zap.Logger + web *mux.Router cfg *viper.Viper obj minio.ObjectLayer conTimeout time.Duration reqTimeout time.Duration + reBalance time.Duration + webDone chan struct{} wrkDone chan struct{} } @@ -40,6 +46,8 @@ func newApp(l *zap.Logger, v *viper.Viper) *App { key = fetchKey(l, v) + reBalance = defaultRebalanceTimer + conTimeout = defaultConnectTimeout reqTimeout = defaultRequestTimeout ) @@ -68,6 +76,10 @@ func newApp(l *zap.Logger, v *viper.Viper) *App { ClientParameters: keepalive.ClientParameters{}, } + if v := v.GetDuration(cfgRebalanceTimer); v > 0 { + reBalance = v + } + if cli, err = pool.New(poolConfig); err != nil { l.Fatal("could not prepare pool connections", zap.Error(err)) @@ -108,38 +120,90 @@ func newApp(l *zap.Logger, v *viper.Viper) *App { cli: cli, log: l, cfg: v, + web: minio.NewRouter(obj), webDone: make(chan struct{}, 1), wrkDone: make(chan struct{}, 1), + reBalance: reBalance, + conTimeout: conTimeout, reqTimeout: reqTimeout, } } -func (a *App) Wait(ctx context.Context) { - defer a.log.Info("application finished") +func (a *App) Wait() { a.log.Info("application started") + select { case <-a.wrkDone: // wait for worker is stopped <-a.webDone case <-a.webDone: // wait for web-server is stopped <-a.wrkDone } + + a.log.Info("application finished") } func (a *App) Server(ctx context.Context) { - defer func() { - <-ctx.Done() - a.log.Info("stopping server") - close(a.webDone) + var ( + err error + lis net.Listener + lic net.ListenConfig + srv = http.Server{Handler: a.web} + addr = a.cfg.GetString(cfgListenAddress) + ) + + if lis, err = lic.Listen(ctx, "tcp", addr); err != nil { + a.log.Fatal("could not prepare listener", + zap.Error(err)) + } + + // Attach app-specific routes: + attachHealthy(a.web, a.cli) + attachMetrics(a.cfg, a.log, a.web) + attachProfiler(a.cfg, a.log, a.web) + + go func() { + a.log.Info("starting server", + zap.String("bind", addr)) + + if err = srv.Serve(lis); err != nil && err != http.ErrServerClosed { + a.log.Warn("listen and serve", + zap.Error(err)) + } }() + + <-ctx.Done() + + ctx, cancel := context.WithTimeout(context.Background(), defaultShutdownTimeout) + defer cancel() + + a.log.Info("stopping server", + zap.Error(srv.Shutdown(ctx))) + + close(a.webDone) } func (a *App) Worker(ctx context.Context) { - defer func() { - <-ctx.Done() - a.log.Info("stopping worker") - close(a.wrkDone) - }() + tick := time.NewTimer(a.reBalance) + +loop: + for { + select { + case <-ctx.Done(): + break loop + case <-tick.C: + ctx, cancel := context.WithTimeout(ctx, a.conTimeout) + a.cli.ReBalance(ctx) + cancel() + + tick.Reset(a.reBalance) + } + } + + tick.Stop() + a.cli.Close() + a.log.Info("stopping worker") + close(a.wrkDone) } diff --git a/cmd/gate/main.go b/cmd/gate/main.go index 46f3d94..a2b06b1 100644 --- a/cmd/gate/main.go +++ b/cmd/gate/main.go @@ -11,5 +11,5 @@ func main() { go a.Server(g) go a.Worker(g) - a.Wait(g) + a.Wait() } diff --git a/legacy/neofs-router.go b/legacy/neofs-router.go new file mode 100644 index 0000000..b2c16cf --- /dev/null +++ b/legacy/neofs-router.go @@ -0,0 +1,44 @@ +package legacy + +import ( + "github.com/gorilla/mux" +) + +func NewRouter(obj ObjectLayer) *mux.Router { + // Initialize router. `SkipClean(true)` stops gorilla/mux from + // normalizing URL path minio/minio#3256 + // avoid URL path encoding minio/minio#8950 + router := mux.NewRouter().SkipClean(true).UseEncodedPath() + + // Add healthcheck router + registerHealthCheckRouter(router) + + // Add server metrics router + registerMetricsRouter(router) + + // Add API router. + registerAPIRouter(router, true, true) + + layer := NewGatewayLayerWithLocker(obj) + + // Once endpoints are finalized, initialize the new object api in safe mode. + globalObjLayerMutex.Lock() + globalSafeMode = true + globalObjectAPI = layer + globalObjLayerMutex.Unlock() + + // Calls all New() for all sub-systems. + newAllSubsystems() + + // Verify if object layer supports + // - encryption + // - compression + verifyObjectLayerFeatures("gateway NeoFS", layer) + + // Disable safe mode operation, after all initialization is over. + globalObjLayerMutex.Lock() + globalSafeMode = false + globalObjLayerMutex.Unlock() + + return router +} diff --git a/neofs/pool/pool.go b/neofs/pool/pool.go index 7bd919b..b6d2a0f 100644 --- a/neofs/pool/pool.go +++ b/neofs/pool/pool.go @@ -182,7 +182,7 @@ func (p *pool) ReBalance(ctx context.Context) { keys := make(map[uint32]struct{}) - p.log.Info("re-balancing connections") + p.log.Debug("re-balancing connections") for i := range p.nodes { var (