[#100] server auto re-binding
Some checks failed
/ DCO (pull_request) Successful in 3m26s
/ Builds (1.20) (pull_request) Successful in 4m25s
/ Builds (1.21) (pull_request) Successful in 3m30s
/ Vulncheck (pull_request) Failing after 3m51s
/ Lint (pull_request) Successful in 6m8s
/ Tests (1.20) (pull_request) Successful in 4m26s
/ Tests (1.21) (pull_request) Successful in 4m17s
Some checks failed
/ DCO (pull_request) Successful in 3m26s
/ Builds (1.20) (pull_request) Successful in 4m25s
/ Builds (1.21) (pull_request) Successful in 3m30s
/ Vulncheck (pull_request) Failing after 3m51s
/ Lint (pull_request) Successful in 6m8s
/ Tests (1.20) (pull_request) Successful in 4m26s
/ Tests (1.21) (pull_request) Successful in 4m17s
Signed-off-by: Pavel Pogodaev <p.pogodaev@yadro.com>
This commit is contained in:
parent
a95dc6c8c7
commit
11965deb41
7 changed files with 142 additions and 22 deletions
|
@ -8,6 +8,7 @@ This document outlines major changes between releases.
|
||||||
|
|
||||||
### Added
|
### Added
|
||||||
- Tree pool traversal limit (#92)
|
- Tree pool traversal limit (#92)
|
||||||
|
- Add new `reconnect_interval` config param (#100)
|
||||||
|
|
||||||
### Update from 0.28.0
|
### Update from 0.28.0
|
||||||
See new `frostfs.tree_pool_max_attempts` config parameter.
|
See new `frostfs.tree_pool_max_attempts` config parameter.
|
||||||
|
|
|
@ -2,6 +2,7 @@ package main
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
|
"errors"
|
||||||
"fmt"
|
"fmt"
|
||||||
"net/http"
|
"net/http"
|
||||||
"os"
|
"os"
|
||||||
|
@ -57,7 +58,10 @@ type (
|
||||||
metrics *gateMetrics
|
metrics *gateMetrics
|
||||||
services []*metrics.Service
|
services []*metrics.Service
|
||||||
settings *appSettings
|
settings *appSettings
|
||||||
servers []Server
|
|
||||||
|
servers []Server
|
||||||
|
unbindServers []ServerInfo
|
||||||
|
mu sync.RWMutex
|
||||||
}
|
}
|
||||||
|
|
||||||
// App is an interface for the main gateway function.
|
// App is an interface for the main gateway function.
|
||||||
|
@ -78,6 +82,8 @@ type (
|
||||||
|
|
||||||
// appSettings stores reloading parameters, so it has to provide getters and setters which use RWMutex.
|
// appSettings stores reloading parameters, so it has to provide getters and setters which use RWMutex.
|
||||||
appSettings struct {
|
appSettings struct {
|
||||||
|
reconnectInterval time.Duration
|
||||||
|
|
||||||
mu sync.RWMutex
|
mu sync.RWMutex
|
||||||
defaultTimestamp bool
|
defaultTimestamp bool
|
||||||
zipCompression bool
|
zipCompression bool
|
||||||
|
@ -199,8 +205,9 @@ func (s *appSettings) setBufferMaxSizeForPut(val uint64) {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (a *app) initAppSettings() {
|
func (a *app) initAppSettings() {
|
||||||
a.settings = &appSettings{}
|
a.settings = &appSettings{
|
||||||
|
reconnectInterval: fetchReconnectInterval(a.cfg),
|
||||||
|
}
|
||||||
a.updateSettings()
|
a.updateSettings()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -399,16 +406,22 @@ func (a *app) Serve() {
|
||||||
a.startServices()
|
a.startServices()
|
||||||
a.initServers(a.ctx)
|
a.initServers(a.ctx)
|
||||||
|
|
||||||
for i := range a.servers {
|
servs := a.getServers()
|
||||||
|
|
||||||
|
for i := range servs {
|
||||||
go func(i int) {
|
go func(i int) {
|
||||||
a.log.Info(logs.StartingServer, zap.String("address", a.servers[i].Address()))
|
a.log.Info(logs.StartingServer, zap.String("address", servs[i].Address()))
|
||||||
if err := a.webServer.Serve(a.servers[i].Listener()); err != nil && err != http.ErrServerClosed {
|
if err := a.webServer.Serve(servs[i].Listener()); err != nil && err != http.ErrServerClosed {
|
||||||
a.metrics.MarkUnhealthy(a.servers[i].Address())
|
a.metrics.MarkUnhealthy(servs[i].Address())
|
||||||
a.log.Fatal(logs.ListenAndServe, zap.Error(err))
|
a.log.Fatal(logs.ListenAndServe, zap.Error(err))
|
||||||
}
|
}
|
||||||
}(i)
|
}(i)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if len(a.unbindServers) != 0 {
|
||||||
|
a.scheduleReconnect(a.ctx, a.webServer)
|
||||||
|
}
|
||||||
|
|
||||||
sigs := make(chan os.Signal, 1)
|
sigs := make(chan os.Signal, 1)
|
||||||
signal.Notify(sigs, syscall.SIGHUP)
|
signal.Notify(sigs, syscall.SIGHUP)
|
||||||
|
|
||||||
|
@ -598,7 +611,7 @@ func (a *app) AppParams() *utils.AppParams {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (a *app) initServers(ctx context.Context) {
|
func (a *app) initServers(ctx context.Context) {
|
||||||
serversInfo := fetchServers(a.cfg)
|
serversInfo := fetchServers(a.cfg, a.log)
|
||||||
|
|
||||||
a.servers = make([]Server, 0, len(serversInfo))
|
a.servers = make([]Server, 0, len(serversInfo))
|
||||||
for _, serverInfo := range serversInfo {
|
for _, serverInfo := range serversInfo {
|
||||||
|
@ -608,6 +621,7 @@ func (a *app) initServers(ctx context.Context) {
|
||||||
}
|
}
|
||||||
srv, err := newServer(ctx, serverInfo)
|
srv, err := newServer(ctx, serverInfo)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
a.unbindServers = append(a.unbindServers, serverInfo)
|
||||||
a.metrics.MarkUnhealthy(serverInfo.Address)
|
a.metrics.MarkUnhealthy(serverInfo.Address)
|
||||||
a.log.Warn(logs.FailedToAddServer, append(fields, zap.Error(err))...)
|
a.log.Warn(logs.FailedToAddServer, append(fields, zap.Error(err))...)
|
||||||
continue
|
continue
|
||||||
|
@ -624,21 +638,24 @@ func (a *app) initServers(ctx context.Context) {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (a *app) updateServers() error {
|
func (a *app) updateServers() error {
|
||||||
serversInfo := fetchServers(a.cfg)
|
serversInfo := fetchServers(a.cfg, a.log)
|
||||||
|
|
||||||
|
a.mu.Lock()
|
||||||
|
defer a.mu.Unlock()
|
||||||
|
|
||||||
var found bool
|
var found bool
|
||||||
for _, serverInfo := range serversInfo {
|
for _, serverInfo := range serversInfo {
|
||||||
index := a.serverIndex(serverInfo.Address)
|
ser := a.getServer(serverInfo.Address)
|
||||||
if index == -1 {
|
if ser != nil {
|
||||||
continue
|
if serverInfo.TLS.Enabled {
|
||||||
}
|
if err := ser.UpdateCert(serverInfo.TLS.CertFile, serverInfo.TLS.KeyFile); err != nil {
|
||||||
|
return fmt.Errorf("failed to update tls certs: %w", err)
|
||||||
if serverInfo.TLS.Enabled {
|
}
|
||||||
if err := a.servers[index].UpdateCert(serverInfo.TLS.CertFile, serverInfo.TLS.KeyFile); err != nil {
|
found = true
|
||||||
return fmt.Errorf("failed to update tls certs: %w", err)
|
|
||||||
}
|
}
|
||||||
|
} else if unbind := a.updateUnbindServerInfo(serverInfo); unbind {
|
||||||
|
found = true
|
||||||
}
|
}
|
||||||
found = true
|
|
||||||
}
|
}
|
||||||
|
|
||||||
if !found {
|
if !found {
|
||||||
|
@ -648,13 +665,29 @@ func (a *app) updateServers() error {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (a *app) serverIndex(address string) int {
|
func (a *app) getServers() []Server {
|
||||||
|
a.mu.RLock()
|
||||||
|
defer a.mu.RUnlock()
|
||||||
|
return a.servers
|
||||||
|
}
|
||||||
|
|
||||||
|
func (a *app) getServer(address string) Server {
|
||||||
for i := range a.servers {
|
for i := range a.servers {
|
||||||
if a.servers[i].Address() == address {
|
if a.servers[i].Address() == address {
|
||||||
return i
|
return a.servers[i]
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return -1
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (a *app) updateUnbindServerInfo(info ServerInfo) bool {
|
||||||
|
for i := range a.unbindServers {
|
||||||
|
if a.unbindServers[i].Address == info.Address {
|
||||||
|
a.unbindServers[i] = info
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return false
|
||||||
}
|
}
|
||||||
|
|
||||||
func (a *app) initTracing(ctx context.Context) {
|
func (a *app) initTracing(ctx context.Context) {
|
||||||
|
@ -727,3 +760,60 @@ func (s *appSettings) setDefaultNamespaces(namespaces []string) {
|
||||||
s.defaultNamespaces = namespaces
|
s.defaultNamespaces = namespaces
|
||||||
s.mu.Unlock()
|
s.mu.Unlock()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (a *app) scheduleReconnect(ctx context.Context, srv *fasthttp.Server) {
|
||||||
|
go func() {
|
||||||
|
t := time.NewTicker(a.settings.reconnectInterval)
|
||||||
|
defer t.Stop()
|
||||||
|
for {
|
||||||
|
select {
|
||||||
|
case <-t.C:
|
||||||
|
if a.tryReconnect(ctx, srv) {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
t.Reset(a.settings.reconnectInterval)
|
||||||
|
case <-ctx.Done():
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
}
|
||||||
|
|
||||||
|
func (a *app) tryReconnect(ctx context.Context, sr *fasthttp.Server) bool {
|
||||||
|
a.mu.Lock()
|
||||||
|
defer a.mu.Unlock()
|
||||||
|
|
||||||
|
a.log.Info(logs.ServerReconnecting)
|
||||||
|
var failedServers []ServerInfo
|
||||||
|
|
||||||
|
for _, serverInfo := range a.unbindServers {
|
||||||
|
fields := []zap.Field{
|
||||||
|
zap.String("address", serverInfo.Address), zap.Bool("tls enabled", serverInfo.TLS.Enabled),
|
||||||
|
zap.String("tls cert", serverInfo.TLS.CertFile), zap.String("tls key", serverInfo.TLS.KeyFile),
|
||||||
|
}
|
||||||
|
|
||||||
|
srv, err := newServer(ctx, serverInfo)
|
||||||
|
if err != nil {
|
||||||
|
a.log.Warn(logs.ServerReconnectFailed, zap.Error(err))
|
||||||
|
failedServers = append(failedServers, serverInfo)
|
||||||
|
a.metrics.MarkUnhealthy(serverInfo.Address)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
go func() {
|
||||||
|
a.log.Info(logs.StartingServer, zap.String("address", srv.Address()))
|
||||||
|
a.metrics.MarkHealthy(serverInfo.Address)
|
||||||
|
if err = sr.Serve(srv.Listener()); err != nil && !errors.Is(err, http.ErrServerClosed) {
|
||||||
|
a.log.Warn(logs.ListenAndServe, zap.Error(err))
|
||||||
|
a.metrics.MarkUnhealthy(serverInfo.Address)
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
|
||||||
|
a.servers = append(a.servers, srv)
|
||||||
|
a.log.Info(logs.ServerReconnectedSuccessfully, fields...)
|
||||||
|
}
|
||||||
|
|
||||||
|
a.unbindServers = failedServers
|
||||||
|
|
||||||
|
return len(a.unbindServers) == 0
|
||||||
|
}
|
||||||
|
|
|
@ -51,11 +51,15 @@ const (
|
||||||
|
|
||||||
defaultNamespaceHeader = "X-Frostfs-Namespace"
|
defaultNamespaceHeader = "X-Frostfs-Namespace"
|
||||||
|
|
||||||
|
defaultReconnectInterval = time.Minute
|
||||||
|
|
||||||
cfgServer = "server"
|
cfgServer = "server"
|
||||||
cfgTLSEnabled = "tls.enabled"
|
cfgTLSEnabled = "tls.enabled"
|
||||||
cfgTLSCertFile = "tls.cert_file"
|
cfgTLSCertFile = "tls.cert_file"
|
||||||
cfgTLSKeyFile = "tls.key_file"
|
cfgTLSKeyFile = "tls.key_file"
|
||||||
|
|
||||||
|
cfgReconnectInterval = "reconnect_interval"
|
||||||
|
|
||||||
// Web.
|
// Web.
|
||||||
cfgWebReadBufferSize = "web.read_buffer_size"
|
cfgWebReadBufferSize = "web.read_buffer_size"
|
||||||
cfgWebWriteBufferSize = "web.write_buffer_size"
|
cfgWebWriteBufferSize = "web.write_buffer_size"
|
||||||
|
@ -454,8 +458,18 @@ func getLogLevel(v *viper.Viper) (zapcore.Level, error) {
|
||||||
return lvl, nil
|
return lvl, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func fetchServers(v *viper.Viper) []ServerInfo {
|
func fetchReconnectInterval(cfg *viper.Viper) time.Duration {
|
||||||
|
reconnect := cfg.GetDuration(cfgReconnectInterval)
|
||||||
|
if reconnect <= 0 {
|
||||||
|
reconnect = defaultReconnectInterval
|
||||||
|
}
|
||||||
|
|
||||||
|
return reconnect
|
||||||
|
}
|
||||||
|
|
||||||
|
func fetchServers(v *viper.Viper, log *zap.Logger) []ServerInfo {
|
||||||
var servers []ServerInfo
|
var servers []ServerInfo
|
||||||
|
seen := make(map[string]struct{})
|
||||||
|
|
||||||
for i := 0; ; i++ {
|
for i := 0; ; i++ {
|
||||||
key := cfgServer + "." + strconv.Itoa(i) + "."
|
key := cfgServer + "." + strconv.Itoa(i) + "."
|
||||||
|
@ -470,6 +484,11 @@ func fetchServers(v *viper.Viper) []ServerInfo {
|
||||||
break
|
break
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if _, ok := seen[serverInfo.Address]; ok {
|
||||||
|
log.Warn(logs.WarnDuplicateAddress, zap.String("address", serverInfo.Address))
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
seen[serverInfo.Address] = struct{}{}
|
||||||
servers = append(servers, serverInfo)
|
servers = append(servers, serverInfo)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -26,6 +26,9 @@ HTTP_GW_SERVER_1_TLS_ENABLED=true
|
||||||
HTTP_GW_SERVER_1_TLS_CERT_FILE=/path/to/tls/cert
|
HTTP_GW_SERVER_1_TLS_CERT_FILE=/path/to/tls/cert
|
||||||
HTTP_GW_SERVER_1_TLS_KEY_FILE=/path/to/tls/key
|
HTTP_GW_SERVER_1_TLS_KEY_FILE=/path/to/tls/key
|
||||||
|
|
||||||
|
# How often to reconnect to the servers
|
||||||
|
HTTP_GW_RECONNECT_INTERVAL: 1m
|
||||||
|
|
||||||
# Nodes configuration.
|
# Nodes configuration.
|
||||||
# This configuration make the gateway use the first node (grpc://s01.frostfs.devenv:8080)
|
# This configuration make the gateway use the first node (grpc://s01.frostfs.devenv:8080)
|
||||||
# while it's healthy. Otherwise, the gateway use the second node (grpc://s01.frostfs.devenv:8080)
|
# while it's healthy. Otherwise, the gateway use the second node (grpc://s01.frostfs.devenv:8080)
|
||||||
|
|
|
@ -55,6 +55,7 @@ peers:
|
||||||
priority: 2
|
priority: 2
|
||||||
weight: 9
|
weight: 9
|
||||||
|
|
||||||
|
reconnect_interval: 1m
|
||||||
|
|
||||||
web:
|
web:
|
||||||
# Per-connection buffer size for requests' reading.
|
# Per-connection buffer size for requests' reading.
|
||||||
|
|
|
@ -72,6 +72,7 @@ stream_timeout: 10s
|
||||||
request_timeout: 5s
|
request_timeout: 5s
|
||||||
rebalance_timer: 30s
|
rebalance_timer: 30s
|
||||||
pool_error_threshold: 100
|
pool_error_threshold: 100
|
||||||
|
reconnect_interval: 1m
|
||||||
```
|
```
|
||||||
|
|
||||||
| Parameter | Type | SIGHUP reload | Default value | Description |
|
| Parameter | Type | SIGHUP reload | Default value | Description |
|
||||||
|
@ -83,6 +84,7 @@ pool_error_threshold: 100
|
||||||
| `request_timeout` | `duration` | | `15s` | Timeout to check node health during rebalance. |
|
| `request_timeout` | `duration` | | `15s` | Timeout to check node health during rebalance. |
|
||||||
| `rebalance_timer` | `duration` | | `60s` | Interval to check node health. |
|
| `rebalance_timer` | `duration` | | `60s` | Interval to check node health. |
|
||||||
| `pool_error_threshold` | `uint32` | | `100` | The number of errors on connection after which node is considered as unhealthy. |
|
| `pool_error_threshold` | `uint32` | | `100` | The number of errors on connection after which node is considered as unhealthy. |
|
||||||
|
| `reconnect_interval` | `duration` | no | `1m` | Listeners reconnection interval. |
|
||||||
|
|
||||||
# `wallet` section
|
# `wallet` section
|
||||||
|
|
||||||
|
|
|
@ -73,4 +73,8 @@ const (
|
||||||
InvalidLifetimeUsingDefaultValue = "invalid lifetime, using default value (in seconds)" // Error in ../../cmd/http-gw/settings.go
|
InvalidLifetimeUsingDefaultValue = "invalid lifetime, using default value (in seconds)" // Error in ../../cmd/http-gw/settings.go
|
||||||
InvalidCacheSizeUsingDefaultValue = "invalid cache size, using default value" // Error in ../../cmd/http-gw/settings.go
|
InvalidCacheSizeUsingDefaultValue = "invalid cache size, using default value" // Error in ../../cmd/http-gw/settings.go
|
||||||
FailedToUnescapeQuery = "failed to unescape query"
|
FailedToUnescapeQuery = "failed to unescape query"
|
||||||
|
ServerReconnecting = "reconnecting server..."
|
||||||
|
ServerReconnectedSuccessfully = "server reconnected successfully"
|
||||||
|
ServerReconnectFailed = "failed to reconnect server"
|
||||||
|
WarnDuplicateAddress = "duplicate address"
|
||||||
)
|
)
|
||||||
|
|
Loading…
Reference in a new issue