From bfcde09f07f0be2302b4bcf2321993d848a794e3 Mon Sep 17 00:00:00 2001
From: Pavel Pogodaev
Date: Sun, 11 Feb 2024 21:00:56 +0300
Subject: [PATCH] [#291] server auto re-binding
Signed-off-by: Pavel Pogodaev
---
CHANGELOG.md | 1 +
cmd/s3-gw/app.go | 139 +++++++++++++++++++++++++++++++-------
cmd/s3-gw/app_settings.go | 22 +++++-
config/config.env | 3 +
config/config.yaml | 2 +
docs/configuration.md | 3 +
internal/logs/logs.go | 4 ++
7 files changed, 147 insertions(+), 27 deletions(-)
diff --git a/CHANGELOG.md b/CHANGELOG.md
index 7cf6b70..5447d81 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -26,6 +26,7 @@ This document outlines major changes between releases.
- Support `policy` contract (#259)
- Support `proxy` contract (#287)
- Authmate: support custom attributes (#292)
+- Add new `reconnect_interval` config param (#291)
### Changed
- Generalise config param `use_default_xmlns_for_complete_multipart` to `use_default_xmlns` so that use default xmlns for all requests (#221)
diff --git a/cmd/s3-gw/app.go b/cmd/s3-gw/app.go
index 222a7ac..7d4e8b6 100644
--- a/cmd/s3-gw/app.go
+++ b/cmd/s3-gw/app.go
@@ -71,7 +71,9 @@ type (
policyStorage *policy.Storage
- servers []Server
+ servers []Server
+ unbindServers []ServerInfo
+ mu sync.RWMutex
controlAPI *grpc.Server
@@ -88,6 +90,7 @@ type (
logLevel zap.AtomicLevel
maxClient maxClientsConfig
defaultMaxAge int
+ reconnectInterval time.Duration
notificatorEnabled bool
resolveZoneList []string
isResolveListAllow bool // True if ResolveZoneList contains allowed zones
@@ -205,6 +208,7 @@ func newAppSettings(log *Logger, v *viper.Viper, key *keys.PrivateKey) *appSetti
logLevel: log.lvl,
maxClient: newMaxClients(v),
defaultMaxAge: fetchDefaultMaxAge(v, log.logger),
+ reconnectInterval: fetchReconnectInterval(v),
notificatorEnabled: v.GetBool(cfgEnableNATS),
frostfsidValidation: v.GetBool(cfgFrostfsIDValidationEnabled),
}
@@ -699,17 +703,23 @@ func (a *App) Serve(ctx context.Context) {
a.startServices()
- for i := range a.servers {
- go func(i int) {
- a.log.Info(logs.StartingServer, zap.String("address", a.servers[i].Address()))
+ servs := a.getServers()
- if err := srv.Serve(a.servers[i].Listener()); err != nil && err != http.ErrServerClosed {
- a.metrics.MarkUnhealthy(a.servers[i].Address())
+ for i := range servs {
+ go func(i int) {
+ a.log.Info(logs.StartingServer, zap.String("address", servs[i].Address()))
+
+ if err := srv.Serve(servs[i].Listener()); err != nil && err != http.ErrServerClosed {
+ a.metrics.MarkUnhealthy(servs[i].Address())
a.log.Fatal(logs.ListenAndServe, zap.Error(err))
}
}(i)
}
+ if len(a.unbindServers) != 0 {
+ a.scheduleReconnect(ctx, srv)
+ }
+
go func() {
address := a.cfg.GetString(cfgControlGRPCEndpoint)
a.log.Info(logs.StartingControlAPI, zap.String("address", address))
@@ -826,7 +836,7 @@ func (a *App) startServices() {
}
func (a *App) initServers(ctx context.Context) {
- serversInfo := fetchServers(a.cfg)
+ serversInfo := fetchServers(a.cfg, a.log)
a.servers = make([]Server, 0, len(serversInfo))
for _, serverInfo := range serversInfo {
@@ -836,6 +846,7 @@ func (a *App) initServers(ctx context.Context) {
}
srv, err := newServer(ctx, serverInfo)
if err != nil {
+ a.unbindServers = append(a.unbindServers, serverInfo)
a.metrics.MarkUnhealthy(serverInfo.Address)
a.log.Warn(logs.FailedToAddServer, append(fields, zap.Error(err))...)
continue
@@ -852,21 +863,24 @@ func (a *App) initServers(ctx context.Context) {
}
func (a *App) updateServers() error {
- serversInfo := fetchServers(a.cfg)
+ serversInfo := fetchServers(a.cfg, a.log)
+
+ a.mu.Lock()
+ defer a.mu.Unlock()
var found bool
for _, serverInfo := range serversInfo {
- index := a.serverIndex(serverInfo.Address)
- if index == -1 {
- continue
- }
-
- if serverInfo.TLS.Enabled {
- if err := a.servers[index].UpdateCert(serverInfo.TLS.CertFile, serverInfo.TLS.KeyFile); err != nil {
- return fmt.Errorf("failed to update tls certs: %w", err)
+ ser := a.getServer(serverInfo.Address)
+ if ser != nil {
+ if serverInfo.TLS.Enabled {
+ if err := ser.UpdateCert(serverInfo.TLS.CertFile, serverInfo.TLS.KeyFile); err != nil {
+ return fmt.Errorf("failed to update tls certs: %w", err)
+ }
+ found = true
}
+ } else if unbind := a.updateUnbindServerInfo(serverInfo); unbind {
+ found = true
}
- found = true
}
if !found {
@@ -876,15 +890,6 @@ func (a *App) updateServers() error {
return nil
}
-func (a *App) serverIndex(address string) int {
- for i := range a.servers {
- if a.servers[i].Address() == address {
- return i
- }
- }
- return -1
-}
-
func (a *App) stopServices() {
ctx, cancel := shutdownContext()
defer cancel()
@@ -959,6 +964,31 @@ func (a *App) initHandler() {
}
}
+func (a *App) getServer(address string) Server {
+ for i := range a.servers {
+ if a.servers[i].Address() == address {
+ return a.servers[i]
+ }
+ }
+ return nil
+}
+
+func (a *App) updateUnbindServerInfo(info ServerInfo) bool {
+ for i := range a.unbindServers {
+ if a.unbindServers[i].Address == info.Address {
+ a.unbindServers[i] = info
+ return true
+ }
+ }
+ return false
+}
+
+func (a *App) getServers() []Server {
+ a.mu.RLock()
+ defer a.mu.RUnlock()
+ return a.servers
+}
+
func (a *App) setRuntimeParameters() {
if len(os.Getenv("GOMEMLIMIT")) != 0 {
// default limit < yaml limit < app env limit < GOMEMLIMIT
@@ -974,3 +1004,60 @@ func (a *App) setRuntimeParameters() {
zap.Int64("old_value", previous))
}
}
+
+func (a *App) scheduleReconnect(ctx context.Context, srv *http.Server) {
+ go func() {
+ t := time.NewTicker(a.settings.reconnectInterval)
+ defer t.Stop()
+ for {
+ select {
+ case <-t.C:
+ if a.tryReconnect(ctx, srv) {
+ return
+ }
+ t.Reset(a.settings.reconnectInterval)
+ case <-ctx.Done():
+ return
+ }
+ }
+ }()
+}
+
+func (a *App) tryReconnect(ctx context.Context, sr *http.Server) bool {
+ a.mu.Lock()
+ defer a.mu.Unlock()
+
+ a.log.Info(logs.ServerReconnecting)
+ var failedServers []ServerInfo
+
+ for _, serverInfo := range a.unbindServers {
+ fields := []zap.Field{
+ zap.String("address", serverInfo.Address), zap.Bool("tls enabled", serverInfo.TLS.Enabled),
+ zap.String("tls cert", serverInfo.TLS.CertFile), zap.String("tls key", serverInfo.TLS.KeyFile),
+ }
+
+ srv, err := newServer(ctx, serverInfo)
+ if err != nil {
+ a.log.Warn(logs.ServerReconnectFailed, zap.Error(err))
+ failedServers = append(failedServers, serverInfo)
+ a.metrics.MarkUnhealthy(serverInfo.Address)
+ continue
+ }
+
+ go func() {
+ a.log.Info(logs.StartingServer, zap.String("address", srv.Address()))
+ a.metrics.MarkHealthy(serverInfo.Address)
+ if err = sr.Serve(srv.Listener()); err != nil && !errors.Is(err, http.ErrServerClosed) {
+ a.log.Warn(logs.ListenAndServe, zap.Error(err))
+ a.metrics.MarkUnhealthy(serverInfo.Address)
+ }
+ }()
+
+ a.servers = append(a.servers, srv)
+ a.log.Info(logs.ServerReconnectedSuccessfully, fields...)
+ }
+
+ a.unbindServers = failedServers
+
+ return len(a.unbindServers) == 0
+}
diff --git a/cmd/s3-gw/app_settings.go b/cmd/s3-gw/app_settings.go
index cc2dd5c..2025765 100644
--- a/cmd/s3-gw/app_settings.go
+++ b/cmd/s3-gw/app_settings.go
@@ -59,6 +59,8 @@ const (
defaultConstraintName = "default"
defaultNamespace = ""
+
+ defaultReconnectInterval = time.Minute
)
var (
@@ -222,6 +224,9 @@ const ( // Settings.
// Proxy.
cfgProxyContract = "proxy.contract"
+ // Server.
+ cfgReconnectInterval = "reconnect_interval"
+
// envPrefix is an environment variables prefix used for configuration.
envPrefix = "S3_GW"
)
@@ -244,6 +249,15 @@ func fetchConnectTimeout(cfg *viper.Viper) time.Duration {
return connTimeout
}
+func fetchReconnectInterval(cfg *viper.Viper) time.Duration {
+ reconnect := cfg.GetDuration(cfgReconnectInterval)
+ if reconnect <= 0 {
+ reconnect = defaultReconnectInterval
+ }
+
+ return reconnect
+}
+
func fetchStreamTimeout(cfg *viper.Viper) time.Duration {
streamTimeout := cfg.GetDuration(cfgStreamTimeout)
if streamTimeout <= 0 {
@@ -611,8 +625,9 @@ func fetchPeers(l *zap.Logger, v *viper.Viper) []pool.NodeParam {
return nodes
}
-func fetchServers(v *viper.Viper) []ServerInfo {
+func fetchServers(v *viper.Viper, log *zap.Logger) []ServerInfo {
var servers []ServerInfo
+ seen := make(map[string]struct{})
for i := 0; ; i++ {
key := cfgServer + "." + strconv.Itoa(i) + "."
@@ -627,6 +642,11 @@ func fetchServers(v *viper.Viper) []ServerInfo {
break
}
+ if _, ok := seen[serverInfo.Address]; ok {
+ log.Warn(logs.WarnDuplicateAddress, zap.String("address", serverInfo.Address))
+ continue
+ }
+ seen[serverInfo.Address] = struct{}{}
servers = append(servers, serverInfo)
}
diff --git a/config/config.env b/config/config.env
index 3fff17a..7110985 100644
--- a/config/config.env
+++ b/config/config.env
@@ -33,6 +33,9 @@ S3_GW_SERVER_1_TLS_ENABLED=true
S3_GW_SERVER_1_TLS_CERT_FILE=/path/to/tls/cert
S3_GW_SERVER_1_TLS_KEY_FILE=/path/to/tls/key
+# How often to reconnect to the servers
+S3_GW_RECONNECT_INTERVAL: 1m
+
# Control API
# List of hex-encoded public keys that have rights to use the Control Service
S3_GW_CONTROL_AUTHORIZED_KEYS=035839e45d472a3b7769a2a1bd7d54c4ccd4943c3b40f547870e83a8fcbfb3ce11 028f42cfcb74499d7b15b35d9bff260a1c8d27de4f446a627406a382d8961486d6
diff --git a/config/config.yaml b/config/config.yaml
index fe16150..16c8513 100644
--- a/config/config.yaml
+++ b/config/config.yaml
@@ -25,6 +25,8 @@ peers:
priority: 2
weight: 0.9
+reconnect_interval: 1m
+
server:
- address: 0.0.0.0:8080
tls:
diff --git a/docs/configuration.md b/docs/configuration.md
index c3327ff..280d9dc 100644
--- a/docs/configuration.md
+++ b/docs/configuration.md
@@ -218,6 +218,8 @@ max_clients_deadline: 30s
allowed_access_key_id_prefixes:
- Ck9BHsgKcnwfCTUSFm6pxhoNS4cBqgN2NQ8zVgPjqZDX
- 3stjWenX15YwYzczMr88gy3CQr4NYFBQ8P7keGzH5QFn
+
+reconnect_interval: 1m
```
| Parameter | Type | SIGHUP reload | Default value | Description |
@@ -233,6 +235,7 @@ allowed_access_key_id_prefixes:
| `max_clients_count` | `int` | no | `100` | Limits for processing of clients' requests. |
| `max_clients_deadline` | `duration` | no | `30s` | Deadline after which the gate sends error `RequestTimeout` to a client. |
| `allowed_access_key_id_prefixes` | `[]string` | no | | List of allowed `AccessKeyID` prefixes which S3 GW serve. If the parameter is omitted, all `AccessKeyID` will be accepted. |
+| `reconnect_interval` | `duration` | no | `1m` | Listeners reconnection interval. |
### `wallet` section
diff --git a/internal/logs/logs.go b/internal/logs/logs.go
index 3bed892..14a4c66 100644
--- a/internal/logs/logs.go
+++ b/internal/logs/logs.go
@@ -135,6 +135,9 @@ const (
ControlAPIGetPolicy = "get policy request"
ControlAPIListPolicies = "list policies request"
PolicyValidationFailed = "policy validation failed"
+ ServerReconnecting = "reconnecting server..."
+ ServerReconnectedSuccessfully = "server reconnected successfully"
+ ServerReconnectFailed = "failed to reconnect server"
ParseTreeNode = "parse tree node"
FailedToGetRealObjectSize = "failed to get real object size"
CouldntDeleteObjectFromStorageContinueDeleting = "couldn't delete object from storage, continue deleting from tree"
@@ -149,4 +152,5 @@ const (
InvalidBucketObjectLockEnabledHeader = "invalid X-Amz-Bucket-Object-Lock-Enabled header"
InvalidTreeKV = "invalid tree service meta KV"
FailedToWriteResponse = "failed to write response"
+ WarnDuplicateAddress = "duplicate address"
)