[#291] server auto re-binding

Signed-off-by: Pavel Pogodaev <p.pogodaev@yadro.com>
This commit is contained in:
Pavel Pogodaev 2024-02-11 21:00:56 +03:00
parent 94bd1dfe28
commit bfcde09f07
7 changed files with 147 additions and 27 deletions

View file

@ -26,6 +26,7 @@ This document outlines major changes between releases.
- Support `policy` contract (#259)
- Support `proxy` contract (#287)
- Authmate: support custom attributes (#292)
- Add new `reconnect_interval` config param (#291)
### Changed
- Generalise config param `use_default_xmlns_for_complete_multipart` to `use_default_xmlns` so that use default xmlns for all requests (#221)

View file

@ -71,7 +71,9 @@ type (
policyStorage *policy.Storage
servers []Server
servers []Server
unbindServers []ServerInfo
mu sync.RWMutex
controlAPI *grpc.Server
@ -88,6 +90,7 @@ type (
logLevel zap.AtomicLevel
maxClient maxClientsConfig
defaultMaxAge int
reconnectInterval time.Duration
notificatorEnabled bool
resolveZoneList []string
isResolveListAllow bool // True if ResolveZoneList contains allowed zones
@ -205,6 +208,7 @@ func newAppSettings(log *Logger, v *viper.Viper, key *keys.PrivateKey) *appSetti
logLevel: log.lvl,
maxClient: newMaxClients(v),
defaultMaxAge: fetchDefaultMaxAge(v, log.logger),
reconnectInterval: fetchReconnectInterval(v),
notificatorEnabled: v.GetBool(cfgEnableNATS),
frostfsidValidation: v.GetBool(cfgFrostfsIDValidationEnabled),
}
@ -699,17 +703,23 @@ func (a *App) Serve(ctx context.Context) {
a.startServices()
for i := range a.servers {
go func(i int) {
a.log.Info(logs.StartingServer, zap.String("address", a.servers[i].Address()))
servs := a.getServers()
if err := srv.Serve(a.servers[i].Listener()); err != nil && err != http.ErrServerClosed {
a.metrics.MarkUnhealthy(a.servers[i].Address())
for i := range servs {
go func(i int) {
a.log.Info(logs.StartingServer, zap.String("address", servs[i].Address()))
if err := srv.Serve(servs[i].Listener()); err != nil && err != http.ErrServerClosed {
a.metrics.MarkUnhealthy(servs[i].Address())
a.log.Fatal(logs.ListenAndServe, zap.Error(err))
}
}(i)
}
if len(a.unbindServers) != 0 {
a.scheduleReconnect(ctx, srv)
}
go func() {
address := a.cfg.GetString(cfgControlGRPCEndpoint)
a.log.Info(logs.StartingControlAPI, zap.String("address", address))
@ -826,7 +836,7 @@ func (a *App) startServices() {
}
func (a *App) initServers(ctx context.Context) {
serversInfo := fetchServers(a.cfg)
serversInfo := fetchServers(a.cfg, a.log)
a.servers = make([]Server, 0, len(serversInfo))
for _, serverInfo := range serversInfo {
@ -836,6 +846,7 @@ func (a *App) initServers(ctx context.Context) {
}
srv, err := newServer(ctx, serverInfo)
if err != nil {
a.unbindServers = append(a.unbindServers, serverInfo)
a.metrics.MarkUnhealthy(serverInfo.Address)
a.log.Warn(logs.FailedToAddServer, append(fields, zap.Error(err))...)
continue
@ -852,21 +863,24 @@ func (a *App) initServers(ctx context.Context) {
}
func (a *App) updateServers() error {
serversInfo := fetchServers(a.cfg)
serversInfo := fetchServers(a.cfg, a.log)
a.mu.Lock()
defer a.mu.Unlock()
var found bool
for _, serverInfo := range serversInfo {
index := a.serverIndex(serverInfo.Address)
if index == -1 {
continue
}
if serverInfo.TLS.Enabled {
if err := a.servers[index].UpdateCert(serverInfo.TLS.CertFile, serverInfo.TLS.KeyFile); err != nil {
return fmt.Errorf("failed to update tls certs: %w", err)
ser := a.getServer(serverInfo.Address)
if ser != nil {
if serverInfo.TLS.Enabled {
if err := ser.UpdateCert(serverInfo.TLS.CertFile, serverInfo.TLS.KeyFile); err != nil {
return fmt.Errorf("failed to update tls certs: %w", err)
}
found = true
}
} else if unbind := a.updateUnbindServerInfo(serverInfo); unbind {
found = true
}
found = true
}
if !found {
@ -876,15 +890,6 @@ func (a *App) updateServers() error {
return nil
}
func (a *App) serverIndex(address string) int {
for i := range a.servers {
if a.servers[i].Address() == address {
return i
}
}
return -1
}
func (a *App) stopServices() {
ctx, cancel := shutdownContext()
defer cancel()
@ -959,6 +964,31 @@ func (a *App) initHandler() {
}
}
func (a *App) getServer(address string) Server {
for i := range a.servers {
if a.servers[i].Address() == address {
return a.servers[i]
}
}
return nil
}
func (a *App) updateUnbindServerInfo(info ServerInfo) bool {
for i := range a.unbindServers {
if a.unbindServers[i].Address == info.Address {
a.unbindServers[i] = info
return true
}
}
return false
}
func (a *App) getServers() []Server {
a.mu.RLock()
defer a.mu.RUnlock()
return a.servers
}
func (a *App) setRuntimeParameters() {
if len(os.Getenv("GOMEMLIMIT")) != 0 {
// default limit < yaml limit < app env limit < GOMEMLIMIT
@ -974,3 +1004,60 @@ func (a *App) setRuntimeParameters() {
zap.Int64("old_value", previous))
}
}
func (a *App) scheduleReconnect(ctx context.Context, srv *http.Server) {
go func() {
t := time.NewTicker(a.settings.reconnectInterval)
defer t.Stop()
for {
select {
case <-t.C:
if a.tryReconnect(ctx, srv) {
return
}
t.Reset(a.settings.reconnectInterval)
case <-ctx.Done():
return
}
}
}()
}
func (a *App) tryReconnect(ctx context.Context, sr *http.Server) bool {
a.mu.Lock()
defer a.mu.Unlock()
a.log.Info(logs.ServerReconnecting)
var failedServers []ServerInfo
for _, serverInfo := range a.unbindServers {
fields := []zap.Field{
zap.String("address", serverInfo.Address), zap.Bool("tls enabled", serverInfo.TLS.Enabled),
zap.String("tls cert", serverInfo.TLS.CertFile), zap.String("tls key", serverInfo.TLS.KeyFile),
}
srv, err := newServer(ctx, serverInfo)
if err != nil {
a.log.Warn(logs.ServerReconnectFailed, zap.Error(err))
failedServers = append(failedServers, serverInfo)
a.metrics.MarkUnhealthy(serverInfo.Address)
continue
}
go func() {
a.log.Info(logs.StartingServer, zap.String("address", srv.Address()))
a.metrics.MarkHealthy(serverInfo.Address)
if err = sr.Serve(srv.Listener()); err != nil && !errors.Is(err, http.ErrServerClosed) {
a.log.Warn(logs.ListenAndServe, zap.Error(err))
a.metrics.MarkUnhealthy(serverInfo.Address)
}
}()
a.servers = append(a.servers, srv)
a.log.Info(logs.ServerReconnectedSuccessfully, fields...)
}
a.unbindServers = failedServers
return len(a.unbindServers) == 0
}

View file

@ -59,6 +59,8 @@ const (
defaultConstraintName = "default"
defaultNamespace = ""
defaultReconnectInterval = time.Minute
)
var (
@ -222,6 +224,9 @@ const ( // Settings.
// Proxy.
cfgProxyContract = "proxy.contract"
// Server.
cfgReconnectInterval = "reconnect_interval"
// envPrefix is an environment variables prefix used for configuration.
envPrefix = "S3_GW"
)
@ -244,6 +249,15 @@ func fetchConnectTimeout(cfg *viper.Viper) time.Duration {
return connTimeout
}
func fetchReconnectInterval(cfg *viper.Viper) time.Duration {
reconnect := cfg.GetDuration(cfgReconnectInterval)
if reconnect <= 0 {
reconnect = defaultReconnectInterval
}
return reconnect
}
func fetchStreamTimeout(cfg *viper.Viper) time.Duration {
streamTimeout := cfg.GetDuration(cfgStreamTimeout)
if streamTimeout <= 0 {
@ -611,8 +625,9 @@ func fetchPeers(l *zap.Logger, v *viper.Viper) []pool.NodeParam {
return nodes
}
func fetchServers(v *viper.Viper) []ServerInfo {
func fetchServers(v *viper.Viper, log *zap.Logger) []ServerInfo {
var servers []ServerInfo
seen := make(map[string]struct{})
for i := 0; ; i++ {
key := cfgServer + "." + strconv.Itoa(i) + "."
@ -627,6 +642,11 @@ func fetchServers(v *viper.Viper) []ServerInfo {
break
}
if _, ok := seen[serverInfo.Address]; ok {
log.Warn(logs.WarnDuplicateAddress, zap.String("address", serverInfo.Address))
continue
}
seen[serverInfo.Address] = struct{}{}
servers = append(servers, serverInfo)
}

View file

@ -33,6 +33,9 @@ S3_GW_SERVER_1_TLS_ENABLED=true
S3_GW_SERVER_1_TLS_CERT_FILE=/path/to/tls/cert
S3_GW_SERVER_1_TLS_KEY_FILE=/path/to/tls/key
# How often to reconnect to the servers
S3_GW_RECONNECT_INTERVAL: 1m
# Control API
# List of hex-encoded public keys that have rights to use the Control Service
S3_GW_CONTROL_AUTHORIZED_KEYS=035839e45d472a3b7769a2a1bd7d54c4ccd4943c3b40f547870e83a8fcbfb3ce11 028f42cfcb74499d7b15b35d9bff260a1c8d27de4f446a627406a382d8961486d6

View file

@ -25,6 +25,8 @@ peers:
priority: 2
weight: 0.9
reconnect_interval: 1m
server:
- address: 0.0.0.0:8080
tls:

View file

@ -218,6 +218,8 @@ max_clients_deadline: 30s
allowed_access_key_id_prefixes:
- Ck9BHsgKcnwfCTUSFm6pxhoNS4cBqgN2NQ8zVgPjqZDX
- 3stjWenX15YwYzczMr88gy3CQr4NYFBQ8P7keGzH5QFn
reconnect_interval: 1m
```
| Parameter | Type | SIGHUP reload | Default value | Description |
@ -233,6 +235,7 @@ allowed_access_key_id_prefixes:
| `max_clients_count` | `int` | no | `100` | Limits for processing of clients' requests. |
| `max_clients_deadline` | `duration` | no | `30s` | Deadline after which the gate sends error `RequestTimeout` to a client. |
| `allowed_access_key_id_prefixes` | `[]string` | no | | List of allowed `AccessKeyID` prefixes which S3 GW serve. If the parameter is omitted, all `AccessKeyID` will be accepted. |
| `reconnect_interval` | `duration` | no | `1m` | Listeners reconnection interval. |
### `wallet` section

View file

@ -135,6 +135,9 @@ const (
ControlAPIGetPolicy = "get policy request"
ControlAPIListPolicies = "list policies request"
PolicyValidationFailed = "policy validation failed"
ServerReconnecting = "reconnecting server..."
ServerReconnectedSuccessfully = "server reconnected successfully"
ServerReconnectFailed = "failed to reconnect server"
ParseTreeNode = "parse tree node"
FailedToGetRealObjectSize = "failed to get real object size"
CouldntDeleteObjectFromStorageContinueDeleting = "couldn't delete object from storage, continue deleting from tree"
@ -149,4 +152,5 @@ const (
InvalidBucketObjectLockEnabledHeader = "invalid X-Amz-Bucket-Object-Lock-Enabled header"
InvalidTreeKV = "invalid tree service meta KV"
FailedToWriteResponse = "failed to write response"
WarnDuplicateAddress = "duplicate address"
)