From 11965deb4188e78bdbe92b5d4b17e41620f2a86d Mon Sep 17 00:00:00 2001
From: Pavel Pogodaev
Date: Tue, 26 Mar 2024 14:34:20 +0300
Subject: [PATCH] [#100] server auto re-binding
Signed-off-by: Pavel Pogodaev
---
CHANGELOG.md | 1 +
cmd/http-gw/app.go | 132 +++++++++++++++++++++++++++++++------
cmd/http-gw/settings.go | 21 +++++-
config/config.env | 3 +
config/config.yaml | 1 +
docs/gate-configuration.md | 2 +
internal/logs/logs.go | 4 ++
7 files changed, 142 insertions(+), 22 deletions(-)
diff --git a/CHANGELOG.md b/CHANGELOG.md
index 6da67f5..105ac41 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -8,6 +8,7 @@ This document outlines major changes between releases.
### Added
- Tree pool traversal limit (#92)
+- Add new `reconnect_interval` config param (#100)
### Update from 0.28.0
See new `frostfs.tree_pool_max_attempts` config parameter.
diff --git a/cmd/http-gw/app.go b/cmd/http-gw/app.go
index 4532520..2a20d86 100644
--- a/cmd/http-gw/app.go
+++ b/cmd/http-gw/app.go
@@ -2,6 +2,7 @@ package main
import (
"context"
+ "errors"
"fmt"
"net/http"
"os"
@@ -57,7 +58,10 @@ type (
metrics *gateMetrics
services []*metrics.Service
settings *appSettings
- servers []Server
+
+ servers []Server
+ unbindServers []ServerInfo
+ mu sync.RWMutex
}
// App is an interface for the main gateway function.
@@ -78,6 +82,8 @@ type (
// appSettings stores reloading parameters, so it has to provide getters and setters which use RWMutex.
appSettings struct {
+ reconnectInterval time.Duration
+
mu sync.RWMutex
defaultTimestamp bool
zipCompression bool
@@ -199,8 +205,9 @@ func (s *appSettings) setBufferMaxSizeForPut(val uint64) {
}
func (a *app) initAppSettings() {
- a.settings = &appSettings{}
-
+ a.settings = &appSettings{
+ reconnectInterval: fetchReconnectInterval(a.cfg),
+ }
a.updateSettings()
}
@@ -399,16 +406,22 @@ func (a *app) Serve() {
a.startServices()
a.initServers(a.ctx)
- for i := range a.servers {
+ servs := a.getServers()
+
+ for i := range servs {
go func(i int) {
- a.log.Info(logs.StartingServer, zap.String("address", a.servers[i].Address()))
- if err := a.webServer.Serve(a.servers[i].Listener()); err != nil && err != http.ErrServerClosed {
- a.metrics.MarkUnhealthy(a.servers[i].Address())
+ a.log.Info(logs.StartingServer, zap.String("address", servs[i].Address()))
+ if err := a.webServer.Serve(servs[i].Listener()); err != nil && err != http.ErrServerClosed {
+ a.metrics.MarkUnhealthy(servs[i].Address())
a.log.Fatal(logs.ListenAndServe, zap.Error(err))
}
}(i)
}
+ if len(a.unbindServers) != 0 {
+ a.scheduleReconnect(a.ctx, a.webServer)
+ }
+
sigs := make(chan os.Signal, 1)
signal.Notify(sigs, syscall.SIGHUP)
@@ -598,7 +611,7 @@ func (a *app) AppParams() *utils.AppParams {
}
func (a *app) initServers(ctx context.Context) {
- serversInfo := fetchServers(a.cfg)
+ serversInfo := fetchServers(a.cfg, a.log)
a.servers = make([]Server, 0, len(serversInfo))
for _, serverInfo := range serversInfo {
@@ -608,6 +621,7 @@ func (a *app) initServers(ctx context.Context) {
}
srv, err := newServer(ctx, serverInfo)
if err != nil {
+ a.unbindServers = append(a.unbindServers, serverInfo)
a.metrics.MarkUnhealthy(serverInfo.Address)
a.log.Warn(logs.FailedToAddServer, append(fields, zap.Error(err))...)
continue
@@ -624,21 +638,24 @@ func (a *app) initServers(ctx context.Context) {
}
func (a *app) updateServers() error {
- serversInfo := fetchServers(a.cfg)
+ serversInfo := fetchServers(a.cfg, a.log)
+
+ a.mu.Lock()
+ defer a.mu.Unlock()
var found bool
for _, serverInfo := range serversInfo {
- index := a.serverIndex(serverInfo.Address)
- if index == -1 {
- continue
- }
-
- if serverInfo.TLS.Enabled {
- if err := a.servers[index].UpdateCert(serverInfo.TLS.CertFile, serverInfo.TLS.KeyFile); err != nil {
- return fmt.Errorf("failed to update tls certs: %w", err)
+ ser := a.getServer(serverInfo.Address)
+ if ser != nil {
+ if serverInfo.TLS.Enabled {
+ if err := ser.UpdateCert(serverInfo.TLS.CertFile, serverInfo.TLS.KeyFile); err != nil {
+ return fmt.Errorf("failed to update tls certs: %w", err)
+ }
+ found = true
}
+ } else if unbind := a.updateUnbindServerInfo(serverInfo); unbind {
+ found = true
}
- found = true
}
if !found {
@@ -648,13 +665,29 @@ func (a *app) updateServers() error {
return nil
}
-func (a *app) serverIndex(address string) int {
+func (a *app) getServers() []Server {
+ a.mu.RLock()
+ defer a.mu.RUnlock()
+ return a.servers
+}
+
+func (a *app) getServer(address string) Server {
for i := range a.servers {
if a.servers[i].Address() == address {
- return i
+ return a.servers[i]
}
}
- return -1
+ return nil
+}
+
+func (a *app) updateUnbindServerInfo(info ServerInfo) bool {
+ for i := range a.unbindServers {
+ if a.unbindServers[i].Address == info.Address {
+ a.unbindServers[i] = info
+ return true
+ }
+ }
+ return false
}
func (a *app) initTracing(ctx context.Context) {
@@ -727,3 +760,60 @@ func (s *appSettings) setDefaultNamespaces(namespaces []string) {
s.defaultNamespaces = namespaces
s.mu.Unlock()
}
+
+func (a *app) scheduleReconnect(ctx context.Context, srv *fasthttp.Server) {
+ go func() {
+ t := time.NewTicker(a.settings.reconnectInterval)
+ defer t.Stop()
+ for {
+ select {
+ case <-t.C:
+ if a.tryReconnect(ctx, srv) {
+ return
+ }
+ t.Reset(a.settings.reconnectInterval)
+ case <-ctx.Done():
+ return
+ }
+ }
+ }()
+}
+
+func (a *app) tryReconnect(ctx context.Context, sr *fasthttp.Server) bool {
+ a.mu.Lock()
+ defer a.mu.Unlock()
+
+ a.log.Info(logs.ServerReconnecting)
+ var failedServers []ServerInfo
+
+ for _, serverInfo := range a.unbindServers {
+ fields := []zap.Field{
+ zap.String("address", serverInfo.Address), zap.Bool("tls enabled", serverInfo.TLS.Enabled),
+ zap.String("tls cert", serverInfo.TLS.CertFile), zap.String("tls key", serverInfo.TLS.KeyFile),
+ }
+
+ srv, err := newServer(ctx, serverInfo)
+ if err != nil {
+ a.log.Warn(logs.ServerReconnectFailed, zap.Error(err))
+ failedServers = append(failedServers, serverInfo)
+ a.metrics.MarkUnhealthy(serverInfo.Address)
+ continue
+ }
+
+ go func() {
+ a.log.Info(logs.StartingServer, zap.String("address", srv.Address()))
+ a.metrics.MarkHealthy(srv.Address())
+ if err := sr.Serve(srv.Listener()); err != nil && !errors.Is(err, http.ErrServerClosed) {
+ a.log.Warn(logs.ListenAndServe, zap.Error(err))
+ a.metrics.MarkUnhealthy(srv.Address())
+ }
+ }()
+
+ a.servers = append(a.servers, srv)
+ a.log.Info(logs.ServerReconnectedSuccessfully, fields...)
+ }
+
+ a.unbindServers = failedServers
+
+ return len(a.unbindServers) == 0
+}
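
For context, a minimal self-contained sketch of the retry pattern that `scheduleReconnect`/`tryReconnect` implement above (`rebindLoop` and `onListen` are illustrative names, not gateway code): a ticker retries binding the failed addresses until all of them listen or the context is cancelled.

```go
package main

import (
	"context"
	"log"
	"net"
	"time"
)

// rebindLoop retries net.Listen for every pending address on each tick and
// hands successful listeners to onListen; it returns once nothing is pending
// or ctx is cancelled.
func rebindLoop(ctx context.Context, interval time.Duration, addrs []string, onListen func(net.Listener)) {
	t := time.NewTicker(interval)
	defer t.Stop()

	pending := addrs
	for len(pending) > 0 {
		select {
		case <-ctx.Done():
			return
		case <-t.C:
			var failed []string
			for _, addr := range pending {
				ln, err := net.Listen("tcp", addr)
				if err != nil {
					log.Printf("bind %s failed: %v", addr, err)
					failed = append(failed, addr)
					continue
				}
				onListen(ln) // hand the listener to the HTTP server
			}
			pending = failed
		}
	}
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
	defer cancel()
	rebindLoop(ctx, time.Second, []string{"127.0.0.1:18080"}, func(ln net.Listener) {
		log.Printf("listening on %s", ln.Addr())
		ln.Close()
	})
}
```
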
diff --git a/cmd/http-gw/settings.go b/cmd/http-gw/settings.go
index a38ebc5..0d97dcb 100644
--- a/cmd/http-gw/settings.go
+++ b/cmd/http-gw/settings.go
@@ -51,11 +51,15 @@ const (
defaultNamespaceHeader = "X-Frostfs-Namespace"
+ defaultReconnectInterval = time.Minute
+
cfgServer = "server"
cfgTLSEnabled = "tls.enabled"
cfgTLSCertFile = "tls.cert_file"
cfgTLSKeyFile = "tls.key_file"
+ cfgReconnectInterval = "reconnect_interval"
+
// Web.
cfgWebReadBufferSize = "web.read_buffer_size"
cfgWebWriteBufferSize = "web.write_buffer_size"
@@ -454,8 +458,18 @@ func getLogLevel(v *viper.Viper) (zapcore.Level, error) {
return lvl, nil
}
-func fetchServers(v *viper.Viper) []ServerInfo {
+func fetchReconnectInterval(cfg *viper.Viper) time.Duration {
+ reconnect := cfg.GetDuration(cfgReconnectInterval)
+ if reconnect <= 0 {
+ reconnect = defaultReconnectInterval
+ }
+
+ return reconnect
+}
+
+func fetchServers(v *viper.Viper, log *zap.Logger) []ServerInfo {
var servers []ServerInfo
+ seen := make(map[string]struct{})
for i := 0; ; i++ {
key := cfgServer + "." + strconv.Itoa(i) + "."
@@ -470,6 +484,11 @@ func fetchServers(v *viper.Viper) []ServerInfo {
break
}
+ if _, ok := seen[serverInfo.Address]; ok {
+ log.Warn(logs.WarnDuplicateAddress, zap.String("address", serverInfo.Address))
+ continue
+ }
+ seen[serverInfo.Address] = struct{}{}
servers = append(servers, serverInfo)
}
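
For illustration, a standalone sketch (not gateway code) of how a duration parameter such as `reconnect_interval` is read: viper parses `1m`-style values into `time.Duration`, and a missing or non-positive value falls back to the one-minute default used by the patch. The env-prefix wiring mirrors `HTTP_GW_RECONNECT_INTERVAL` from config.env and is an assumption about the gateway's setup.

```go
package main

import (
	"fmt"
	"time"

	"github.com/spf13/viper"
)

const defaultReconnectInterval = time.Minute

func fetchReconnectInterval(cfg *viper.Viper) time.Duration {
	reconnect := cfg.GetDuration("reconnect_interval")
	if reconnect <= 0 {
		reconnect = defaultReconnectInterval
	}
	return reconnect
}

func main() {
	v := viper.New()
	v.SetEnvPrefix("HTTP_GW") // HTTP_GW_RECONNECT_INTERVAL=30s maps to reconnect_interval
	v.AutomaticEnv()

	fmt.Println(fetchReconnectInterval(v)) // 30s if exported as above, otherwise 1m0s
}
```
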
diff --git a/config/config.env b/config/config.env
index 12f1ba4..05b83b3 100644
--- a/config/config.env
+++ b/config/config.env
@@ -26,6 +26,9 @@ HTTP_GW_SERVER_1_TLS_ENABLED=true
HTTP_GW_SERVER_1_TLS_CERT_FILE=/path/to/tls/cert
HTTP_GW_SERVER_1_TLS_KEY_FILE=/path/to/tls/key
+# How often to retry binding listeners that failed to start
+HTTP_GW_RECONNECT_INTERVAL=1m
+
# Nodes configuration.
# This configuration makes the gateway use the first node (grpc://s01.frostfs.devenv:8080)
# while it's healthy. Otherwise, the gateway uses the second node (grpc://s01.frostfs.devenv:8080)
diff --git a/config/config.yaml b/config/config.yaml
index 7ea2748..7f8077b 100644
--- a/config/config.yaml
+++ b/config/config.yaml
@@ -55,6 +55,7 @@ peers:
priority: 2
weight: 9
+reconnect_interval: 1m
web:
# Per-connection buffer size for requests' reading.
diff --git a/docs/gate-configuration.md b/docs/gate-configuration.md
index bf792b7..8e3daad 100644
--- a/docs/gate-configuration.md
+++ b/docs/gate-configuration.md
@@ -72,6 +72,7 @@ stream_timeout: 10s
request_timeout: 5s
rebalance_timer: 30s
pool_error_threshold: 100
+reconnect_interval: 1m
```
| Parameter | Type | SIGHUP reload | Default value | Description |
@@ -83,6 +84,7 @@ pool_error_threshold: 100
| `request_timeout` | `duration` | | `15s` | Timeout to check node health during rebalance. |
| `rebalance_timer` | `duration` | | `60s` | Interval to check node health. |
| `pool_error_threshold` | `uint32` | | `100` | The number of errors on connection after which node is considered as unhealthy. |
+| `reconnect_interval` | `duration` | no | `1m` | Interval between attempts to bind listeners that failed to start. |
# `wallet` section
diff --git a/internal/logs/logs.go b/internal/logs/logs.go
index 84954c3..0ab5dbf 100644
--- a/internal/logs/logs.go
+++ b/internal/logs/logs.go
@@ -73,4 +73,8 @@ const (
InvalidLifetimeUsingDefaultValue = "invalid lifetime, using default value (in seconds)" // Error in ../../cmd/http-gw/settings.go
InvalidCacheSizeUsingDefaultValue = "invalid cache size, using default value" // Error in ../../cmd/http-gw/settings.go
FailedToUnescapeQuery = "failed to unescape query"
+ ServerReconnecting = "reconnecting server..."
+ ServerReconnectedSuccessfully = "server reconnected successfully"
+ ServerReconnectFailed = "failed to reconnect server"
+ WarnDuplicateAddress = "duplicate address"
)
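
To see the failure mode the reconnect loop addresses, a hypothetical snippet (not part of the patch): the second bind fails while the address is held and succeeds once the first listener is released, which is exactly the situation the periodic retry recovers from.

```go
package main

import (
	"log"
	"net"
)

func main() {
	first, err := net.Listen("tcp", "127.0.0.1:18081")
	if err != nil {
		log.Fatal(err)
	}

	// Binding the same address again fails while it is still occupied.
	if _, err := net.Listen("tcp", "127.0.0.1:18081"); err != nil {
		log.Printf("expected failure: %v", err)
	}

	first.Close()

	// After the first listener is closed, a retry succeeds.
	second, err := net.Listen("tcp", "127.0.0.1:18081")
	if err != nil {
		log.Fatal(err)
	}
	log.Printf("rebound on %s", second.Addr())
	second.Close()
}
```
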