Compare commits

..

8 commits

Author SHA1 Message Date
11965deb41 [#100] server auto re-binding
Signed-off-by: Pavel Pogodaev <p.pogodaev@yadro.com>
2024-04-04 14:19:33 +03:00
a95dc6c8c7 [#110] Update CHANGELOG
Signed-off-by: Alex Vanin <a.vanin@yadro.com>
2024-03-27 19:26:37 +03:00
f39b3aa93a [#110] Add "h2" as next proto to allow HTTP/2 requests in http.Serve
Signed-off-by: Alex Vanin <a.vanin@yadro.com>
2024-03-27 19:25:45 +03:00
6695ebe5a0 [#110] Test HTTP/2 requests
Signed-off-by: Alex Vanin <a.vanin@yadro.com>
2024-03-27 19:25:34 +03:00
c6383fc135 [#107] Update CHANGELOG.md
Signed-off-by: Denis Kirillov <d.kirillov@yadro.com>
2024-02-29 12:52:52 +03:00
5ded105c09 [#107] Check query unescape errors
Signed-off-by: Denis Kirillov <d.kirillov@yadro.com>
2024-02-29 12:50:56 +03:00
88e32ddd7f [#107] Add return on error in tokenizer middleware
Signed-off-by: Denis Kirillov <d.kirillov@yadro.com>
2024-02-29 12:30:33 +03:00
007d278caa [#107] Close server listener on error
Signed-off-by: Denis Kirillov <d.kirillov@yadro.com>
2024-02-29 12:14:37 +03:00
10 changed files with 265 additions and 24 deletions

View file

@ -8,13 +8,15 @@ This document outlines major changes between releases.
### Added ### Added
- Tree pool traversal limit (#92) - Tree pool traversal limit (#92)
- Add new `reconnect_interval` config param (#100)
### Update from 0.28.0 ### Update from 0.28.0
See new `frostfs.tree_pool_max_attempts` config parameter. See new `frostfs.tree_pool_max_attempts` config parameter.
### Fixed ### Fixed
- Fix possibility of panic during SIGHUP (#99) - Fix possibility of panic during SIGHUP (#99)
- Handle query unescape and invalid bearer token errors (#108) - Handle query unescape and invalid bearer token errors (#107)
- Fix HTTP/2 requests (#110)
### Added ### Added
- Support client side object cut (#70) - Support client side object cut (#70)

View file

@ -2,6 +2,7 @@ package main
import ( import (
"context" "context"
"errors"
"fmt" "fmt"
"net/http" "net/http"
"os" "os"
@ -57,7 +58,10 @@ type (
metrics *gateMetrics metrics *gateMetrics
services []*metrics.Service services []*metrics.Service
settings *appSettings settings *appSettings
servers []Server
servers []Server
unbindServers []ServerInfo
mu sync.RWMutex
} }
// App is an interface for the main gateway function. // App is an interface for the main gateway function.
@ -78,6 +82,8 @@ type (
// appSettings stores reloading parameters, so it has to provide getters and setters which use RWMutex. // appSettings stores reloading parameters, so it has to provide getters and setters which use RWMutex.
appSettings struct { appSettings struct {
reconnectInterval time.Duration
mu sync.RWMutex mu sync.RWMutex
defaultTimestamp bool defaultTimestamp bool
zipCompression bool zipCompression bool
@ -199,8 +205,9 @@ func (s *appSettings) setBufferMaxSizeForPut(val uint64) {
} }
func (a *app) initAppSettings() { func (a *app) initAppSettings() {
a.settings = &appSettings{} a.settings = &appSettings{
reconnectInterval: fetchReconnectInterval(a.cfg),
}
a.updateSettings() a.updateSettings()
} }
@ -399,16 +406,22 @@ func (a *app) Serve() {
a.startServices() a.startServices()
a.initServers(a.ctx) a.initServers(a.ctx)
for i := range a.servers { servs := a.getServers()
for i := range servs {
go func(i int) { go func(i int) {
a.log.Info(logs.StartingServer, zap.String("address", a.servers[i].Address())) a.log.Info(logs.StartingServer, zap.String("address", servs[i].Address()))
if err := a.webServer.Serve(a.servers[i].Listener()); err != nil && err != http.ErrServerClosed { if err := a.webServer.Serve(servs[i].Listener()); err != nil && err != http.ErrServerClosed {
a.metrics.MarkUnhealthy(a.servers[i].Address()) a.metrics.MarkUnhealthy(servs[i].Address())
a.log.Fatal(logs.ListenAndServe, zap.Error(err)) a.log.Fatal(logs.ListenAndServe, zap.Error(err))
} }
}(i) }(i)
} }
if len(a.unbindServers) != 0 {
a.scheduleReconnect(a.ctx, a.webServer)
}
sigs := make(chan os.Signal, 1) sigs := make(chan os.Signal, 1)
signal.Notify(sigs, syscall.SIGHUP) signal.Notify(sigs, syscall.SIGHUP)
@ -598,7 +611,7 @@ func (a *app) AppParams() *utils.AppParams {
} }
func (a *app) initServers(ctx context.Context) { func (a *app) initServers(ctx context.Context) {
serversInfo := fetchServers(a.cfg) serversInfo := fetchServers(a.cfg, a.log)
a.servers = make([]Server, 0, len(serversInfo)) a.servers = make([]Server, 0, len(serversInfo))
for _, serverInfo := range serversInfo { for _, serverInfo := range serversInfo {
@ -608,6 +621,7 @@ func (a *app) initServers(ctx context.Context) {
} }
srv, err := newServer(ctx, serverInfo) srv, err := newServer(ctx, serverInfo)
if err != nil { if err != nil {
a.unbindServers = append(a.unbindServers, serverInfo)
a.metrics.MarkUnhealthy(serverInfo.Address) a.metrics.MarkUnhealthy(serverInfo.Address)
a.log.Warn(logs.FailedToAddServer, append(fields, zap.Error(err))...) a.log.Warn(logs.FailedToAddServer, append(fields, zap.Error(err))...)
continue continue
@ -624,21 +638,24 @@ func (a *app) initServers(ctx context.Context) {
} }
func (a *app) updateServers() error { func (a *app) updateServers() error {
serversInfo := fetchServers(a.cfg) serversInfo := fetchServers(a.cfg, a.log)
a.mu.Lock()
defer a.mu.Unlock()
var found bool var found bool
for _, serverInfo := range serversInfo { for _, serverInfo := range serversInfo {
index := a.serverIndex(serverInfo.Address) ser := a.getServer(serverInfo.Address)
if index == -1 { if ser != nil {
continue if serverInfo.TLS.Enabled {
} if err := ser.UpdateCert(serverInfo.TLS.CertFile, serverInfo.TLS.KeyFile); err != nil {
return fmt.Errorf("failed to update tls certs: %w", err)
if serverInfo.TLS.Enabled { }
if err := a.servers[index].UpdateCert(serverInfo.TLS.CertFile, serverInfo.TLS.KeyFile); err != nil { found = true
return fmt.Errorf("failed to update tls certs: %w", err)
} }
} else if unbind := a.updateUnbindServerInfo(serverInfo); unbind {
found = true
} }
found = true
} }
if !found { if !found {
@ -648,13 +665,29 @@ func (a *app) updateServers() error {
return nil return nil
} }
func (a *app) serverIndex(address string) int { func (a *app) getServers() []Server {
a.mu.RLock()
defer a.mu.RUnlock()
return a.servers
}
func (a *app) getServer(address string) Server {
for i := range a.servers { for i := range a.servers {
if a.servers[i].Address() == address { if a.servers[i].Address() == address {
return i return a.servers[i]
} }
} }
return -1 return nil
}
func (a *app) updateUnbindServerInfo(info ServerInfo) bool {
for i := range a.unbindServers {
if a.unbindServers[i].Address == info.Address {
a.unbindServers[i] = info
return true
}
}
return false
} }
func (a *app) initTracing(ctx context.Context) { func (a *app) initTracing(ctx context.Context) {
@ -727,3 +760,60 @@ func (s *appSettings) setDefaultNamespaces(namespaces []string) {
s.defaultNamespaces = namespaces s.defaultNamespaces = namespaces
s.mu.Unlock() s.mu.Unlock()
} }
func (a *app) scheduleReconnect(ctx context.Context, srv *fasthttp.Server) {
go func() {
t := time.NewTicker(a.settings.reconnectInterval)
defer t.Stop()
for {
select {
case <-t.C:
if a.tryReconnect(ctx, srv) {
return
}
t.Reset(a.settings.reconnectInterval)
case <-ctx.Done():
return
}
}
}()
}
func (a *app) tryReconnect(ctx context.Context, sr *fasthttp.Server) bool {
a.mu.Lock()
defer a.mu.Unlock()
a.log.Info(logs.ServerReconnecting)
var failedServers []ServerInfo
for _, serverInfo := range a.unbindServers {
fields := []zap.Field{
zap.String("address", serverInfo.Address), zap.Bool("tls enabled", serverInfo.TLS.Enabled),
zap.String("tls cert", serverInfo.TLS.CertFile), zap.String("tls key", serverInfo.TLS.KeyFile),
}
srv, err := newServer(ctx, serverInfo)
if err != nil {
a.log.Warn(logs.ServerReconnectFailed, zap.Error(err))
failedServers = append(failedServers, serverInfo)
a.metrics.MarkUnhealthy(serverInfo.Address)
continue
}
go func() {
a.log.Info(logs.StartingServer, zap.String("address", srv.Address()))
a.metrics.MarkHealthy(serverInfo.Address)
if err = sr.Serve(srv.Listener()); err != nil && !errors.Is(err, http.ErrServerClosed) {
a.log.Warn(logs.ListenAndServe, zap.Error(err))
a.metrics.MarkUnhealthy(serverInfo.Address)
}
}()
a.servers = append(a.servers, srv)
a.log.Info(logs.ServerReconnectedSuccessfully, fields...)
}
a.unbindServers = failedServers
return len(a.unbindServers) == 0
}

View file

@ -74,6 +74,7 @@ func newServer(ctx context.Context, serverInfo ServerInfo) (*server, error) {
ln = tls.NewListener(ln, &tls.Config{ ln = tls.NewListener(ln, &tls.Config{
GetCertificate: tlsProvider.GetCertificate, GetCertificate: tlsProvider.GetCertificate,
NextProtos: []string{"h2"}, // required to enable HTTP/2 requests in `http.Serve`
}) })
} }

119
cmd/http-gw/server_test.go Normal file
View file

@ -0,0 +1,119 @@
package main
import (
"context"
"crypto/rand"
"crypto/rsa"
"crypto/tls"
"crypto/x509"
"crypto/x509/pkix"
"encoding/pem"
"fmt"
"math/big"
"net"
"net/http"
"os"
"path"
"testing"
"time"
"github.com/stretchr/testify/require"
"golang.org/x/net/http2"
)
const (
expHeaderKey = "Foo"
expHeaderValue = "Bar"
)
func TestHTTP2TLS(t *testing.T) {
ctx := context.Background()
certPath, keyPath := prepareTestCerts(t)
srv := &http.Server{
Handler: http.HandlerFunc(testHandler),
}
tlsListener, err := newServer(ctx, ServerInfo{
Address: ":0",
TLS: ServerTLSInfo{
Enabled: true,
CertFile: certPath,
KeyFile: keyPath,
},
})
require.NoError(t, err)
port := tlsListener.Listener().Addr().(*net.TCPAddr).Port
addr := fmt.Sprintf("https://localhost:%d", port)
go func() {
_ = srv.Serve(tlsListener.Listener())
}()
// Server is running, now send HTTP/2 request
tlsClientConfig := &tls.Config{
InsecureSkipVerify: true,
}
cliHTTP1 := http.Client{Transport: &http.Transport{TLSClientConfig: tlsClientConfig}}
cliHTTP2 := http.Client{Transport: &http2.Transport{TLSClientConfig: tlsClientConfig}}
req, err := http.NewRequest("GET", addr, nil)
require.NoError(t, err)
req.Header[expHeaderKey] = []string{expHeaderValue}
resp, err := cliHTTP1.Do(req)
require.NoError(t, err)
require.Equal(t, http.StatusOK, resp.StatusCode)
resp, err = cliHTTP2.Do(req)
require.NoError(t, err)
require.Equal(t, http.StatusOK, resp.StatusCode)
}
func testHandler(resp http.ResponseWriter, req *http.Request) {
hdr, ok := req.Header[expHeaderKey]
if !ok || len(hdr) != 1 || hdr[0] != expHeaderValue {
resp.WriteHeader(http.StatusBadRequest)
} else {
resp.WriteHeader(http.StatusOK)
}
}
func prepareTestCerts(t *testing.T) (certPath, keyPath string) {
privateKey, err := rsa.GenerateKey(rand.Reader, 2048)
require.NoError(t, err)
template := x509.Certificate{
SerialNumber: big.NewInt(1),
Subject: pkix.Name{CommonName: "localhost"},
NotBefore: time.Now(),
NotAfter: time.Now().Add(time.Hour * 24 * 365),
KeyUsage: x509.KeyUsageDigitalSignature | x509.KeyUsageCertSign,
BasicConstraintsValid: true,
}
derBytes, err := x509.CreateCertificate(rand.Reader, &template, &template, &privateKey.PublicKey, privateKey)
require.NoError(t, err)
dir := t.TempDir()
certPath = path.Join(dir, "cert.pem")
keyPath = path.Join(dir, "key.pem")
certFile, err := os.Create(certPath)
require.NoError(t, err)
defer certFile.Close()
keyFile, err := os.Create(keyPath)
require.NoError(t, err)
defer keyFile.Close()
err = pem.Encode(certFile, &pem.Block{Type: "CERTIFICATE", Bytes: derBytes})
require.NoError(t, err)
err = pem.Encode(keyFile, &pem.Block{Type: "RSA PRIVATE KEY", Bytes: x509.MarshalPKCS1PrivateKey(privateKey)})
require.NoError(t, err)
return certPath, keyPath
}

View file

@ -51,11 +51,15 @@ const (
defaultNamespaceHeader = "X-Frostfs-Namespace" defaultNamespaceHeader = "X-Frostfs-Namespace"
defaultReconnectInterval = time.Minute
cfgServer = "server" cfgServer = "server"
cfgTLSEnabled = "tls.enabled" cfgTLSEnabled = "tls.enabled"
cfgTLSCertFile = "tls.cert_file" cfgTLSCertFile = "tls.cert_file"
cfgTLSKeyFile = "tls.key_file" cfgTLSKeyFile = "tls.key_file"
cfgReconnectInterval = "reconnect_interval"
// Web. // Web.
cfgWebReadBufferSize = "web.read_buffer_size" cfgWebReadBufferSize = "web.read_buffer_size"
cfgWebWriteBufferSize = "web.write_buffer_size" cfgWebWriteBufferSize = "web.write_buffer_size"
@ -454,8 +458,18 @@ func getLogLevel(v *viper.Viper) (zapcore.Level, error) {
return lvl, nil return lvl, nil
} }
func fetchServers(v *viper.Viper) []ServerInfo { func fetchReconnectInterval(cfg *viper.Viper) time.Duration {
reconnect := cfg.GetDuration(cfgReconnectInterval)
if reconnect <= 0 {
reconnect = defaultReconnectInterval
}
return reconnect
}
func fetchServers(v *viper.Viper, log *zap.Logger) []ServerInfo {
var servers []ServerInfo var servers []ServerInfo
seen := make(map[string]struct{})
for i := 0; ; i++ { for i := 0; ; i++ {
key := cfgServer + "." + strconv.Itoa(i) + "." key := cfgServer + "." + strconv.Itoa(i) + "."
@ -470,6 +484,11 @@ func fetchServers(v *viper.Viper) []ServerInfo {
break break
} }
if _, ok := seen[serverInfo.Address]; ok {
log.Warn(logs.WarnDuplicateAddress, zap.String("address", serverInfo.Address))
continue
}
seen[serverInfo.Address] = struct{}{}
servers = append(servers, serverInfo) servers = append(servers, serverInfo)
} }

View file

@ -26,6 +26,9 @@ HTTP_GW_SERVER_1_TLS_ENABLED=true
HTTP_GW_SERVER_1_TLS_CERT_FILE=/path/to/tls/cert HTTP_GW_SERVER_1_TLS_CERT_FILE=/path/to/tls/cert
HTTP_GW_SERVER_1_TLS_KEY_FILE=/path/to/tls/key HTTP_GW_SERVER_1_TLS_KEY_FILE=/path/to/tls/key
# How often to reconnect to the servers
HTTP_GW_RECONNECT_INTERVAL: 1m
# Nodes configuration. # Nodes configuration.
# This configuration make the gateway use the first node (grpc://s01.frostfs.devenv:8080) # This configuration make the gateway use the first node (grpc://s01.frostfs.devenv:8080)
# while it's healthy. Otherwise, the gateway use the second node (grpc://s01.frostfs.devenv:8080) # while it's healthy. Otherwise, the gateway use the second node (grpc://s01.frostfs.devenv:8080)

View file

@ -55,6 +55,7 @@ peers:
priority: 2 priority: 2
weight: 9 weight: 9
reconnect_interval: 1m
web: web:
# Per-connection buffer size for requests' reading. # Per-connection buffer size for requests' reading.

View file

@ -72,6 +72,7 @@ stream_timeout: 10s
request_timeout: 5s request_timeout: 5s
rebalance_timer: 30s rebalance_timer: 30s
pool_error_threshold: 100 pool_error_threshold: 100
reconnect_interval: 1m
``` ```
| Parameter | Type | SIGHUP reload | Default value | Description | | Parameter | Type | SIGHUP reload | Default value | Description |
@ -83,6 +84,7 @@ pool_error_threshold: 100
| `request_timeout` | `duration` | | `15s` | Timeout to check node health during rebalance. | | `request_timeout` | `duration` | | `15s` | Timeout to check node health during rebalance. |
| `rebalance_timer` | `duration` | | `60s` | Interval to check node health. | | `rebalance_timer` | `duration` | | `60s` | Interval to check node health. |
| `pool_error_threshold` | `uint32` | | `100` | The number of errors on connection after which node is considered as unhealthy. | | `pool_error_threshold` | `uint32` | | `100` | The number of errors on connection after which node is considered as unhealthy. |
| `reconnect_interval` | `duration` | no | `1m` | Listeners reconnection interval. |
# `wallet` section # `wallet` section

2
go.mod
View file

@ -22,6 +22,7 @@ require (
go.opentelemetry.io/otel/trace v1.16.0 go.opentelemetry.io/otel/trace v1.16.0
go.uber.org/zap v1.24.0 go.uber.org/zap v1.24.0
golang.org/x/exp v0.0.0-20230515195305-f3d0a9c9a5cc golang.org/x/exp v0.0.0-20230515195305-f3d0a9c9a5cc
golang.org/x/net v0.10.0
google.golang.org/grpc v1.55.0 google.golang.org/grpc v1.55.0
) )
@ -103,7 +104,6 @@ require (
go.uber.org/atomic v1.10.0 // indirect go.uber.org/atomic v1.10.0 // indirect
go.uber.org/multierr v1.11.0 // indirect go.uber.org/multierr v1.11.0 // indirect
golang.org/x/crypto v0.9.0 // indirect golang.org/x/crypto v0.9.0 // indirect
golang.org/x/net v0.10.0 // indirect
golang.org/x/sync v0.2.0 // indirect golang.org/x/sync v0.2.0 // indirect
golang.org/x/sys v0.8.0 // indirect golang.org/x/sys v0.8.0 // indirect
golang.org/x/term v0.8.0 // indirect golang.org/x/term v0.8.0 // indirect

View file

@ -73,4 +73,8 @@ const (
InvalidLifetimeUsingDefaultValue = "invalid lifetime, using default value (in seconds)" // Error in ../../cmd/http-gw/settings.go InvalidLifetimeUsingDefaultValue = "invalid lifetime, using default value (in seconds)" // Error in ../../cmd/http-gw/settings.go
InvalidCacheSizeUsingDefaultValue = "invalid cache size, using default value" // Error in ../../cmd/http-gw/settings.go InvalidCacheSizeUsingDefaultValue = "invalid cache size, using default value" // Error in ../../cmd/http-gw/settings.go
FailedToUnescapeQuery = "failed to unescape query" FailedToUnescapeQuery = "failed to unescape query"
ServerReconnecting = "reconnecting server..."
ServerReconnectedSuccessfully = "server reconnected successfully"
ServerReconnectFailed = "failed to reconnect server"
WarnDuplicateAddress = "duplicate address"
) )