diff --git a/CHANGELOG.md b/CHANGELOG.md index 7cf6b70..5447d81 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -26,6 +26,7 @@ This document outlines major changes between releases. - Support `policy` contract (#259) - Support `proxy` contract (#287) - Authmate: support custom attributes (#292) +- Add new `reconnect_interval` config param (#291) ### Changed - Generalise config param `use_default_xmlns_for_complete_multipart` to `use_default_xmlns` so that use default xmlns for all requests (#221) diff --git a/cmd/s3-gw/app.go b/cmd/s3-gw/app.go index 222a7ac..7d4e8b6 100644 --- a/cmd/s3-gw/app.go +++ b/cmd/s3-gw/app.go @@ -71,7 +71,9 @@ type ( policyStorage *policy.Storage - servers []Server + servers []Server + unbindServers []ServerInfo + mu sync.RWMutex controlAPI *grpc.Server @@ -88,6 +90,7 @@ type ( logLevel zap.AtomicLevel maxClient maxClientsConfig defaultMaxAge int + reconnectInterval time.Duration notificatorEnabled bool resolveZoneList []string isResolveListAllow bool // True if ResolveZoneList contains allowed zones @@ -205,6 +208,7 @@ func newAppSettings(log *Logger, v *viper.Viper, key *keys.PrivateKey) *appSetti logLevel: log.lvl, maxClient: newMaxClients(v), defaultMaxAge: fetchDefaultMaxAge(v, log.logger), + reconnectInterval: fetchReconnectInterval(v), notificatorEnabled: v.GetBool(cfgEnableNATS), frostfsidValidation: v.GetBool(cfgFrostfsIDValidationEnabled), } @@ -699,17 +703,23 @@ func (a *App) Serve(ctx context.Context) { a.startServices() - for i := range a.servers { - go func(i int) { - a.log.Info(logs.StartingServer, zap.String("address", a.servers[i].Address())) + servs := a.getServers() - if err := srv.Serve(a.servers[i].Listener()); err != nil && err != http.ErrServerClosed { - a.metrics.MarkUnhealthy(a.servers[i].Address()) + for i := range servs { + go func(i int) { + a.log.Info(logs.StartingServer, zap.String("address", servs[i].Address())) + + if err := srv.Serve(servs[i].Listener()); err != nil && err != http.ErrServerClosed { + a.metrics.MarkUnhealthy(servs[i].Address()) a.log.Fatal(logs.ListenAndServe, zap.Error(err)) } }(i) } + if len(a.unbindServers) != 0 { + a.scheduleReconnect(ctx, srv) + } + go func() { address := a.cfg.GetString(cfgControlGRPCEndpoint) a.log.Info(logs.StartingControlAPI, zap.String("address", address)) @@ -826,7 +836,7 @@ func (a *App) startServices() { } func (a *App) initServers(ctx context.Context) { - serversInfo := fetchServers(a.cfg) + serversInfo := fetchServers(a.cfg, a.log) a.servers = make([]Server, 0, len(serversInfo)) for _, serverInfo := range serversInfo { @@ -836,6 +846,7 @@ func (a *App) initServers(ctx context.Context) { } srv, err := newServer(ctx, serverInfo) if err != nil { + a.unbindServers = append(a.unbindServers, serverInfo) a.metrics.MarkUnhealthy(serverInfo.Address) a.log.Warn(logs.FailedToAddServer, append(fields, zap.Error(err))...) continue @@ -852,21 +863,24 @@ func (a *App) initServers(ctx context.Context) { } func (a *App) updateServers() error { - serversInfo := fetchServers(a.cfg) + serversInfo := fetchServers(a.cfg, a.log) + + a.mu.Lock() + defer a.mu.Unlock() var found bool for _, serverInfo := range serversInfo { - index := a.serverIndex(serverInfo.Address) - if index == -1 { - continue - } - - if serverInfo.TLS.Enabled { - if err := a.servers[index].UpdateCert(serverInfo.TLS.CertFile, serverInfo.TLS.KeyFile); err != nil { - return fmt.Errorf("failed to update tls certs: %w", err) + ser := a.getServer(serverInfo.Address) + if ser != nil { + if serverInfo.TLS.Enabled { + if err := ser.UpdateCert(serverInfo.TLS.CertFile, serverInfo.TLS.KeyFile); err != nil { + return fmt.Errorf("failed to update tls certs: %w", err) + } + found = true } + } else if unbind := a.updateUnbindServerInfo(serverInfo); unbind { + found = true } - found = true } if !found { @@ -876,15 +890,6 @@ func (a *App) updateServers() error { return nil } -func (a *App) serverIndex(address string) int { - for i := range a.servers { - if a.servers[i].Address() == address { - return i - } - } - return -1 -} - func (a *App) stopServices() { ctx, cancel := shutdownContext() defer cancel() @@ -959,6 +964,31 @@ func (a *App) initHandler() { } } +func (a *App) getServer(address string) Server { + for i := range a.servers { + if a.servers[i].Address() == address { + return a.servers[i] + } + } + return nil +} + +func (a *App) updateUnbindServerInfo(info ServerInfo) bool { + for i := range a.unbindServers { + if a.unbindServers[i].Address == info.Address { + a.unbindServers[i] = info + return true + } + } + return false +} + +func (a *App) getServers() []Server { + a.mu.RLock() + defer a.mu.RUnlock() + return a.servers +} + func (a *App) setRuntimeParameters() { if len(os.Getenv("GOMEMLIMIT")) != 0 { // default limit < yaml limit < app env limit < GOMEMLIMIT @@ -974,3 +1004,60 @@ func (a *App) setRuntimeParameters() { zap.Int64("old_value", previous)) } } + +func (a *App) scheduleReconnect(ctx context.Context, srv *http.Server) { + go func() { + t := time.NewTicker(a.settings.reconnectInterval) + defer t.Stop() + for { + select { + case <-t.C: + if a.tryReconnect(ctx, srv) { + return + } + t.Reset(a.settings.reconnectInterval) + case <-ctx.Done(): + return + } + } + }() +} + +func (a *App) tryReconnect(ctx context.Context, sr *http.Server) bool { + a.mu.Lock() + defer a.mu.Unlock() + + a.log.Info(logs.ServerReconnecting) + var failedServers []ServerInfo + + for _, serverInfo := range a.unbindServers { + fields := []zap.Field{ + zap.String("address", serverInfo.Address), zap.Bool("tls enabled", serverInfo.TLS.Enabled), + zap.String("tls cert", serverInfo.TLS.CertFile), zap.String("tls key", serverInfo.TLS.KeyFile), + } + + srv, err := newServer(ctx, serverInfo) + if err != nil { + a.log.Warn(logs.ServerReconnectFailed, zap.Error(err)) + failedServers = append(failedServers, serverInfo) + a.metrics.MarkUnhealthy(serverInfo.Address) + continue + } + + go func() { + a.log.Info(logs.StartingServer, zap.String("address", srv.Address())) + a.metrics.MarkHealthy(serverInfo.Address) + if err = sr.Serve(srv.Listener()); err != nil && !errors.Is(err, http.ErrServerClosed) { + a.log.Warn(logs.ListenAndServe, zap.Error(err)) + a.metrics.MarkUnhealthy(serverInfo.Address) + } + }() + + a.servers = append(a.servers, srv) + a.log.Info(logs.ServerReconnectedSuccessfully, fields...) + } + + a.unbindServers = failedServers + + return len(a.unbindServers) == 0 +} diff --git a/cmd/s3-gw/app_settings.go b/cmd/s3-gw/app_settings.go index cc2dd5c..2025765 100644 --- a/cmd/s3-gw/app_settings.go +++ b/cmd/s3-gw/app_settings.go @@ -59,6 +59,8 @@ const ( defaultConstraintName = "default" defaultNamespace = "" + + defaultReconnectInterval = time.Minute ) var ( @@ -222,6 +224,9 @@ const ( // Settings. // Proxy. cfgProxyContract = "proxy.contract" + // Server. + cfgReconnectInterval = "reconnect_interval" + // envPrefix is an environment variables prefix used for configuration. envPrefix = "S3_GW" ) @@ -244,6 +249,15 @@ func fetchConnectTimeout(cfg *viper.Viper) time.Duration { return connTimeout } +func fetchReconnectInterval(cfg *viper.Viper) time.Duration { + reconnect := cfg.GetDuration(cfgReconnectInterval) + if reconnect <= 0 { + reconnect = defaultReconnectInterval + } + + return reconnect +} + func fetchStreamTimeout(cfg *viper.Viper) time.Duration { streamTimeout := cfg.GetDuration(cfgStreamTimeout) if streamTimeout <= 0 { @@ -611,8 +625,9 @@ func fetchPeers(l *zap.Logger, v *viper.Viper) []pool.NodeParam { return nodes } -func fetchServers(v *viper.Viper) []ServerInfo { +func fetchServers(v *viper.Viper, log *zap.Logger) []ServerInfo { var servers []ServerInfo + seen := make(map[string]struct{}) for i := 0; ; i++ { key := cfgServer + "." + strconv.Itoa(i) + "." @@ -627,6 +642,11 @@ func fetchServers(v *viper.Viper) []ServerInfo { break } + if _, ok := seen[serverInfo.Address]; ok { + log.Warn(logs.WarnDuplicateAddress, zap.String("address", serverInfo.Address)) + continue + } + seen[serverInfo.Address] = struct{}{} servers = append(servers, serverInfo) } diff --git a/config/config.env b/config/config.env index 3fff17a..7110985 100644 --- a/config/config.env +++ b/config/config.env @@ -33,6 +33,9 @@ S3_GW_SERVER_1_TLS_ENABLED=true S3_GW_SERVER_1_TLS_CERT_FILE=/path/to/tls/cert S3_GW_SERVER_1_TLS_KEY_FILE=/path/to/tls/key +# How often to reconnect to the servers +S3_GW_RECONNECT_INTERVAL: 1m + # Control API # List of hex-encoded public keys that have rights to use the Control Service S3_GW_CONTROL_AUTHORIZED_KEYS=035839e45d472a3b7769a2a1bd7d54c4ccd4943c3b40f547870e83a8fcbfb3ce11 028f42cfcb74499d7b15b35d9bff260a1c8d27de4f446a627406a382d8961486d6 diff --git a/config/config.yaml b/config/config.yaml index fe16150..16c8513 100644 --- a/config/config.yaml +++ b/config/config.yaml @@ -25,6 +25,8 @@ peers: priority: 2 weight: 0.9 +reconnect_interval: 1m + server: - address: 0.0.0.0:8080 tls: diff --git a/docs/configuration.md b/docs/configuration.md index c3327ff..280d9dc 100644 --- a/docs/configuration.md +++ b/docs/configuration.md @@ -218,6 +218,8 @@ max_clients_deadline: 30s allowed_access_key_id_prefixes: - Ck9BHsgKcnwfCTUSFm6pxhoNS4cBqgN2NQ8zVgPjqZDX - 3stjWenX15YwYzczMr88gy3CQr4NYFBQ8P7keGzH5QFn + +reconnect_interval: 1m ``` | Parameter | Type | SIGHUP reload | Default value | Description | @@ -233,6 +235,7 @@ allowed_access_key_id_prefixes: | `max_clients_count` | `int` | no | `100` | Limits for processing of clients' requests. | | `max_clients_deadline` | `duration` | no | `30s` | Deadline after which the gate sends error `RequestTimeout` to a client. | | `allowed_access_key_id_prefixes` | `[]string` | no | | List of allowed `AccessKeyID` prefixes which S3 GW serve. If the parameter is omitted, all `AccessKeyID` will be accepted. | +| `reconnect_interval` | `duration` | no | `1m` | Listeners reconnection interval. | ### `wallet` section diff --git a/internal/logs/logs.go b/internal/logs/logs.go index 3bed892..14a4c66 100644 --- a/internal/logs/logs.go +++ b/internal/logs/logs.go @@ -135,6 +135,9 @@ const ( ControlAPIGetPolicy = "get policy request" ControlAPIListPolicies = "list policies request" PolicyValidationFailed = "policy validation failed" + ServerReconnecting = "reconnecting server..." + ServerReconnectedSuccessfully = "server reconnected successfully" + ServerReconnectFailed = "failed to reconnect server" ParseTreeNode = "parse tree node" FailedToGetRealObjectSize = "failed to get real object size" CouldntDeleteObjectFromStorageContinueDeleting = "couldn't delete object from storage, continue deleting from tree" @@ -149,4 +152,5 @@ const ( InvalidBucketObjectLockEnabledHeader = "invalid X-Amz-Bucket-Object-Lock-Enabled header" InvalidTreeKV = "invalid tree service meta KV" FailedToWriteResponse = "failed to write response" + WarnDuplicateAddress = "duplicate address" )