[#100] server auto re-binding #109
7 changed files with 142 additions and 22 deletions
|
@ -8,6 +8,7 @@ This document outlines major changes between releases.
|
|||
|
||||
### Added
|
||||
- Tree pool traversal limit (#92)
|
||||
- Add new `reconnect_interval` config param (#100)
|
||||
|
||||
### Update from 0.28.0
|
||||
See new `frostfs.tree_pool_max_attempts` config parameter.
|
||||
|
|
|
@ -2,6 +2,7 @@ package main
|
|||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"net/http"
|
||||
"os"
|
||||
|
@ -57,7 +58,10 @@ type (
|
|||
metrics *gateMetrics
|
||||
services []*metrics.Service
|
||||
settings *appSettings
|
||||
servers []Server
|
||||
|
||||
servers []Server
|
||||
unbindServers []ServerInfo
|
||||
mu sync.RWMutex
|
||||
}
|
||||
|
||||
// App is an interface for the main gateway function.
|
||||
|
@ -78,6 +82,8 @@ type (
|
|||
|
||||
// appSettings stores reloading parameters, so it has to provide getters and setters which use RWMutex.
|
||||
appSettings struct {
|
||||
reconnectInterval time.Duration
|
||||
|
||||
mu sync.RWMutex
|
||||
defaultTimestamp bool
|
||||
zipCompression bool
|
||||
|
@ -199,8 +205,9 @@ func (s *appSettings) setBufferMaxSizeForPut(val uint64) {
|
|||
}
|
||||
|
||||
func (a *app) initAppSettings() {
|
||||
a.settings = &appSettings{}
|
||||
|
||||
a.settings = &appSettings{
|
||||
reconnectInterval: fetchReconnectInterval(a.cfg),
|
||||
}
|
||||
a.updateSettings()
|
||||
}
|
||||
|
||||
|
@ -399,16 +406,22 @@ func (a *app) Serve() {
|
|||
a.startServices()
|
||||
a.initServers(a.ctx)
|
||||
|
||||
for i := range a.servers {
|
||||
servs := a.getServers()
|
||||
|
||||
for i := range servs {
|
||||
go func(i int) {
|
||||
a.log.Info(logs.StartingServer, zap.String("address", a.servers[i].Address()))
|
||||
if err := a.webServer.Serve(a.servers[i].Listener()); err != nil && err != http.ErrServerClosed {
|
||||
a.metrics.MarkUnhealthy(a.servers[i].Address())
|
||||
a.log.Info(logs.StartingServer, zap.String("address", servs[i].Address()))
|
||||
if err := a.webServer.Serve(servs[i].Listener()); err != nil && err != http.ErrServerClosed {
|
||||
a.metrics.MarkUnhealthy(servs[i].Address())
|
||||
a.log.Fatal(logs.ListenAndServe, zap.Error(err))
|
||||
}
|
||||
}(i)
|
||||
}
|
||||
|
||||
if len(a.unbindServers) != 0 {
|
||||
a.scheduleReconnect(a.ctx, a.webServer)
|
||||
}
|
||||
|
||||
sigs := make(chan os.Signal, 1)
|
||||
signal.Notify(sigs, syscall.SIGHUP)
|
||||
|
||||
|
@ -598,7 +611,7 @@ func (a *app) AppParams() *utils.AppParams {
|
|||
}
|
||||
|
||||
func (a *app) initServers(ctx context.Context) {
|
||||
serversInfo := fetchServers(a.cfg)
|
||||
serversInfo := fetchServers(a.cfg, a.log)
|
||||
|
||||
a.servers = make([]Server, 0, len(serversInfo))
|
||||
for _, serverInfo := range serversInfo {
|
||||
|
@ -608,6 +621,7 @@ func (a *app) initServers(ctx context.Context) {
|
|||
}
|
||||
srv, err := newServer(ctx, serverInfo)
|
||||
if err != nil {
|
||||
a.unbindServers = append(a.unbindServers, serverInfo)
|
||||
a.metrics.MarkUnhealthy(serverInfo.Address)
|
||||
a.log.Warn(logs.FailedToAddServer, append(fields, zap.Error(err))...)
|
||||
continue
|
||||
|
@ -624,21 +638,24 @@ func (a *app) initServers(ctx context.Context) {
|
|||
}
|
||||
|
||||
func (a *app) updateServers() error {
|
||||
serversInfo := fetchServers(a.cfg)
|
||||
serversInfo := fetchServers(a.cfg, a.log)
|
||||
|
||||
a.mu.Lock()
|
||||
defer a.mu.Unlock()
|
||||
|
||||
var found bool
|
||||
for _, serverInfo := range serversInfo {
|
||||
index := a.serverIndex(serverInfo.Address)
|
||||
if index == -1 {
|
||||
continue
|
||||
}
|
||||
|
||||
if serverInfo.TLS.Enabled {
|
||||
if err := a.servers[index].UpdateCert(serverInfo.TLS.CertFile, serverInfo.TLS.KeyFile); err != nil {
|
||||
return fmt.Errorf("failed to update tls certs: %w", err)
|
||||
ser := a.getServer(serverInfo.Address)
|
||||
if ser != nil {
|
||||
if serverInfo.TLS.Enabled {
|
||||
if err := ser.UpdateCert(serverInfo.TLS.CertFile, serverInfo.TLS.KeyFile); err != nil {
|
||||
return fmt.Errorf("failed to update tls certs: %w", err)
|
||||
}
|
||||
found = true
|
||||
}
|
||||
} else if unbind := a.updateUnbindServerInfo(serverInfo); unbind {
|
||||
found = true
|
||||
}
|
||||
found = true
|
||||
}
|
||||
|
||||
if !found {
|
||||
|
@ -648,13 +665,29 @@ func (a *app) updateServers() error {
|
|||
return nil
|
||||
}
|
||||
|
||||
func (a *app) serverIndex(address string) int {
|
||||
func (a *app) getServers() []Server {
|
||||
a.mu.RLock()
|
||||
defer a.mu.RUnlock()
|
||||
return a.servers
|
||||
}
|
||||
|
||||
func (a *app) getServer(address string) Server {
|
||||
for i := range a.servers {
|
||||
if a.servers[i].Address() == address {
|
||||
return i
|
||||
return a.servers[i]
|
||||
}
|
||||
}
|
||||
return -1
|
||||
return nil
|
||||
}
|
||||
|
||||
func (a *app) updateUnbindServerInfo(info ServerInfo) bool {
|
||||
for i := range a.unbindServers {
|
||||
if a.unbindServers[i].Address == info.Address {
|
||||
a.unbindServers[i] = info
|
||||
return true
|
||||
}
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
func (a *app) initTracing(ctx context.Context) {
|
||||
|
@ -727,3 +760,60 @@ func (s *appSettings) setDefaultNamespaces(namespaces []string) {
|
|||
s.defaultNamespaces = namespaces
|
||||
s.mu.Unlock()
|
||||
}
|
||||
|
||||
func (a *app) scheduleReconnect(ctx context.Context, srv *fasthttp.Server) {
|
||||
go func() {
|
||||
t := time.NewTicker(a.settings.reconnectInterval)
|
||||
defer t.Stop()
|
||||
for {
|
||||
select {
|
||||
case <-t.C:
|
||||
if a.tryReconnect(ctx, srv) {
|
||||
return
|
||||
}
|
||||
t.Reset(a.settings.reconnectInterval)
|
||||
case <-ctx.Done():
|
||||
return
|
||||
}
|
||||
}
|
||||
}()
|
||||
}
|
||||
|
||||
func (a *app) tryReconnect(ctx context.Context, sr *fasthttp.Server) bool {
|
||||
a.mu.Lock()
|
||||
defer a.mu.Unlock()
|
||||
|
||||
a.log.Info(logs.ServerReconnecting)
|
||||
var failedServers []ServerInfo
|
||||
|
||||
for _, serverInfo := range a.unbindServers {
|
||||
fields := []zap.Field{
|
||||
zap.String("address", serverInfo.Address), zap.Bool("tls enabled", serverInfo.TLS.Enabled),
|
||||
zap.String("tls cert", serverInfo.TLS.CertFile), zap.String("tls key", serverInfo.TLS.KeyFile),
|
||||
}
|
||||
|
||||
srv, err := newServer(ctx, serverInfo)
|
||||
if err != nil {
|
||||
a.log.Warn(logs.ServerReconnectFailed, zap.Error(err))
|
||||
failedServers = append(failedServers, serverInfo)
|
||||
a.metrics.MarkUnhealthy(serverInfo.Address)
|
||||
continue
|
||||
}
|
||||
|
||||
go func() {
|
||||
a.log.Info(logs.StartingServer, zap.String("address", srv.Address()))
|
||||
a.metrics.MarkHealthy(serverInfo.Address)
|
||||
if err = sr.Serve(srv.Listener()); err != nil && !errors.Is(err, http.ErrServerClosed) {
|
||||
a.log.Warn(logs.ListenAndServe, zap.Error(err))
|
||||
a.metrics.MarkUnhealthy(serverInfo.Address)
|
||||
}
|
||||
}()
|
||||
|
||||
a.servers = append(a.servers, srv)
|
||||
a.log.Info(logs.ServerReconnectedSuccessfully, fields...)
|
||||
}
|
||||
|
||||
a.unbindServers = failedServers
|
||||
|
||||
return len(a.unbindServers) == 0
|
||||
}
|
||||
|
|
|
@ -51,11 +51,15 @@ const (
|
|||
|
||||
defaultNamespaceHeader = "X-Frostfs-Namespace"
|
||||
|
||||
defaultReconnectInterval = time.Minute
|
||||
|
||||
cfgServer = "server"
|
||||
cfgTLSEnabled = "tls.enabled"
|
||||
cfgTLSCertFile = "tls.cert_file"
|
||||
cfgTLSKeyFile = "tls.key_file"
|
||||
|
||||
cfgReconnectInterval = "reconnect_interval"
|
||||
|
||||
// Web.
|
||||
cfgWebReadBufferSize = "web.read_buffer_size"
|
||||
cfgWebWriteBufferSize = "web.write_buffer_size"
|
||||
|
@ -454,8 +458,18 @@ func getLogLevel(v *viper.Viper) (zapcore.Level, error) {
|
|||
return lvl, nil
|
||||
}
|
||||
|
||||
func fetchServers(v *viper.Viper) []ServerInfo {
|
||||
func fetchReconnectInterval(cfg *viper.Viper) time.Duration {
|
||||
reconnect := cfg.GetDuration(cfgReconnectInterval)
|
||||
if reconnect <= 0 {
|
||||
reconnect = defaultReconnectInterval
|
||||
}
|
||||
|
||||
return reconnect
|
||||
}
|
||||
|
||||
func fetchServers(v *viper.Viper, log *zap.Logger) []ServerInfo {
|
||||
var servers []ServerInfo
|
||||
seen := make(map[string]struct{})
|
||||
|
||||
for i := 0; ; i++ {
|
||||
key := cfgServer + "." + strconv.Itoa(i) + "."
|
||||
|
@ -470,6 +484,11 @@ func fetchServers(v *viper.Viper) []ServerInfo {
|
|||
break
|
||||
}
|
||||
|
||||
if _, ok := seen[serverInfo.Address]; ok {
|
||||
log.Warn(logs.WarnDuplicateAddress, zap.String("address", serverInfo.Address))
|
||||
continue
|
||||
}
|
||||
seen[serverInfo.Address] = struct{}{}
|
||||
servers = append(servers, serverInfo)
|
||||
}
|
||||
|
||||
|
|
|
@ -26,6 +26,9 @@ HTTP_GW_SERVER_1_TLS_ENABLED=true
|
|||
HTTP_GW_SERVER_1_TLS_CERT_FILE=/path/to/tls/cert
|
||||
HTTP_GW_SERVER_1_TLS_KEY_FILE=/path/to/tls/key
|
||||
|
||||
# How often to reconnect to the servers
|
||||
HTTP_GW_RECONNECT_INTERVAL: 1m
|
||||
|
||||
# Nodes configuration.
|
||||
# This configuration make the gateway use the first node (grpc://s01.frostfs.devenv:8080)
|
||||
# while it's healthy. Otherwise, the gateway use the second node (grpc://s01.frostfs.devenv:8080)
|
||||
|
|
|
@ -55,6 +55,7 @@ peers:
|
|||
priority: 2
|
||||
weight: 9
|
||||
|
||||
reconnect_interval: 1m
|
||||
|
||||
web:
|
||||
# Per-connection buffer size for requests' reading.
|
||||
|
|
|
@ -72,6 +72,7 @@ stream_timeout: 10s
|
|||
request_timeout: 5s
|
||||
rebalance_timer: 30s
|
||||
pool_error_threshold: 100
|
||||
reconnect_interval: 1m
|
||||
```
|
||||
|
||||
| Parameter | Type | SIGHUP reload | Default value | Description |
|
||||
|
@ -83,6 +84,7 @@ pool_error_threshold: 100
|
|||
| `request_timeout` | `duration` | | `15s` | Timeout to check node health during rebalance. |
|
||||
| `rebalance_timer` | `duration` | | `60s` | Interval to check node health. |
|
||||
| `pool_error_threshold` | `uint32` | | `100` | The number of errors on connection after which node is considered as unhealthy. |
|
||||
| `reconnect_interval` | `duration` | no | `1m` | Listeners reconnection interval. |
|
||||
|
||||
# `wallet` section
|
||||
|
||||
|
|
|
@ -73,4 +73,8 @@ const (
|
|||
InvalidLifetimeUsingDefaultValue = "invalid lifetime, using default value (in seconds)" // Error in ../../cmd/http-gw/settings.go
|
||||
InvalidCacheSizeUsingDefaultValue = "invalid cache size, using default value" // Error in ../../cmd/http-gw/settings.go
|
||||
FailedToUnescapeQuery = "failed to unescape query"
|
||||
ServerReconnecting = "reconnecting server..."
|
||||
ServerReconnectedSuccessfully = "server reconnected successfully"
|
||||
ServerReconnectFailed = "failed to reconnect server"
|
||||
WarnDuplicateAddress = "duplicate address"
|
||||
)
|
||||
|
|
Loading…
Reference in a new issue