forked from TrueCloudLab/frostfs-http-gw
Compare commits
8 commits
support/v0
...
master
Author | SHA1 | Date | |
---|---|---|---|
11965deb41 | |||
a95dc6c8c7 | |||
f39b3aa93a | |||
6695ebe5a0 | |||
c6383fc135 | |||
5ded105c09 | |||
88e32ddd7f | |||
007d278caa |
10 changed files with 265 additions and 24 deletions
|
@ -8,13 +8,15 @@ This document outlines major changes between releases.
|
|||
|
||||
### Added
|
||||
- Tree pool traversal limit (#92)
|
||||
- Add new `reconnect_interval` config param (#100)
|
||||
|
||||
### Update from 0.28.0
|
||||
See new `frostfs.tree_pool_max_attempts` config parameter.
|
||||
|
||||
### Fixed
|
||||
- Fix possibility of panic during SIGHUP (#99)
|
||||
- Handle query unescape and invalid bearer token errors (#108)
|
||||
- Handle query unescape and invalid bearer token errors (#107)
|
||||
- Fix HTTP/2 requests (#110)
|
||||
|
||||
### Added
|
||||
- Support client side object cut (#70)
|
||||
|
|
|
@ -2,6 +2,7 @@ package main
|
|||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"net/http"
|
||||
"os"
|
||||
|
@ -57,7 +58,10 @@ type (
|
|||
metrics *gateMetrics
|
||||
services []*metrics.Service
|
||||
settings *appSettings
|
||||
servers []Server
|
||||
|
||||
servers []Server
|
||||
unbindServers []ServerInfo
|
||||
mu sync.RWMutex
|
||||
}
|
||||
|
||||
// App is an interface for the main gateway function.
|
||||
|
@ -78,6 +82,8 @@ type (
|
|||
|
||||
// appSettings stores reloading parameters, so it has to provide getters and setters which use RWMutex.
|
||||
appSettings struct {
|
||||
reconnectInterval time.Duration
|
||||
|
||||
mu sync.RWMutex
|
||||
defaultTimestamp bool
|
||||
zipCompression bool
|
||||
|
@ -199,8 +205,9 @@ func (s *appSettings) setBufferMaxSizeForPut(val uint64) {
|
|||
}
|
||||
|
||||
func (a *app) initAppSettings() {
|
||||
a.settings = &appSettings{}
|
||||
|
||||
a.settings = &appSettings{
|
||||
reconnectInterval: fetchReconnectInterval(a.cfg),
|
||||
}
|
||||
a.updateSettings()
|
||||
}
|
||||
|
||||
|
@ -399,16 +406,22 @@ func (a *app) Serve() {
|
|||
a.startServices()
|
||||
a.initServers(a.ctx)
|
||||
|
||||
for i := range a.servers {
|
||||
servs := a.getServers()
|
||||
|
||||
for i := range servs {
|
||||
go func(i int) {
|
||||
a.log.Info(logs.StartingServer, zap.String("address", a.servers[i].Address()))
|
||||
if err := a.webServer.Serve(a.servers[i].Listener()); err != nil && err != http.ErrServerClosed {
|
||||
a.metrics.MarkUnhealthy(a.servers[i].Address())
|
||||
a.log.Info(logs.StartingServer, zap.String("address", servs[i].Address()))
|
||||
if err := a.webServer.Serve(servs[i].Listener()); err != nil && err != http.ErrServerClosed {
|
||||
a.metrics.MarkUnhealthy(servs[i].Address())
|
||||
a.log.Fatal(logs.ListenAndServe, zap.Error(err))
|
||||
}
|
||||
}(i)
|
||||
}
|
||||
|
||||
if len(a.unbindServers) != 0 {
|
||||
a.scheduleReconnect(a.ctx, a.webServer)
|
||||
}
|
||||
|
||||
sigs := make(chan os.Signal, 1)
|
||||
signal.Notify(sigs, syscall.SIGHUP)
|
||||
|
||||
|
@ -598,7 +611,7 @@ func (a *app) AppParams() *utils.AppParams {
|
|||
}
|
||||
|
||||
func (a *app) initServers(ctx context.Context) {
|
||||
serversInfo := fetchServers(a.cfg)
|
||||
serversInfo := fetchServers(a.cfg, a.log)
|
||||
|
||||
a.servers = make([]Server, 0, len(serversInfo))
|
||||
for _, serverInfo := range serversInfo {
|
||||
|
@ -608,6 +621,7 @@ func (a *app) initServers(ctx context.Context) {
|
|||
}
|
||||
srv, err := newServer(ctx, serverInfo)
|
||||
if err != nil {
|
||||
a.unbindServers = append(a.unbindServers, serverInfo)
|
||||
a.metrics.MarkUnhealthy(serverInfo.Address)
|
||||
a.log.Warn(logs.FailedToAddServer, append(fields, zap.Error(err))...)
|
||||
continue
|
||||
|
@ -624,21 +638,24 @@ func (a *app) initServers(ctx context.Context) {
|
|||
}
|
||||
|
||||
func (a *app) updateServers() error {
|
||||
serversInfo := fetchServers(a.cfg)
|
||||
serversInfo := fetchServers(a.cfg, a.log)
|
||||
|
||||
a.mu.Lock()
|
||||
defer a.mu.Unlock()
|
||||
|
||||
var found bool
|
||||
for _, serverInfo := range serversInfo {
|
||||
index := a.serverIndex(serverInfo.Address)
|
||||
if index == -1 {
|
||||
continue
|
||||
}
|
||||
|
||||
if serverInfo.TLS.Enabled {
|
||||
if err := a.servers[index].UpdateCert(serverInfo.TLS.CertFile, serverInfo.TLS.KeyFile); err != nil {
|
||||
return fmt.Errorf("failed to update tls certs: %w", err)
|
||||
ser := a.getServer(serverInfo.Address)
|
||||
if ser != nil {
|
||||
if serverInfo.TLS.Enabled {
|
||||
if err := ser.UpdateCert(serverInfo.TLS.CertFile, serverInfo.TLS.KeyFile); err != nil {
|
||||
return fmt.Errorf("failed to update tls certs: %w", err)
|
||||
}
|
||||
found = true
|
||||
}
|
||||
} else if unbind := a.updateUnbindServerInfo(serverInfo); unbind {
|
||||
found = true
|
||||
}
|
||||
found = true
|
||||
}
|
||||
|
||||
if !found {
|
||||
|
@ -648,13 +665,29 @@ func (a *app) updateServers() error {
|
|||
return nil
|
||||
}
|
||||
|
||||
func (a *app) serverIndex(address string) int {
|
||||
func (a *app) getServers() []Server {
|
||||
a.mu.RLock()
|
||||
defer a.mu.RUnlock()
|
||||
return a.servers
|
||||
}
|
||||
|
||||
func (a *app) getServer(address string) Server {
|
||||
for i := range a.servers {
|
||||
if a.servers[i].Address() == address {
|
||||
return i
|
||||
return a.servers[i]
|
||||
}
|
||||
}
|
||||
return -1
|
||||
return nil
|
||||
}
|
||||
|
||||
func (a *app) updateUnbindServerInfo(info ServerInfo) bool {
|
||||
for i := range a.unbindServers {
|
||||
if a.unbindServers[i].Address == info.Address {
|
||||
a.unbindServers[i] = info
|
||||
return true
|
||||
}
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
func (a *app) initTracing(ctx context.Context) {
|
||||
|
@ -727,3 +760,60 @@ func (s *appSettings) setDefaultNamespaces(namespaces []string) {
|
|||
s.defaultNamespaces = namespaces
|
||||
s.mu.Unlock()
|
||||
}
|
||||
|
||||
func (a *app) scheduleReconnect(ctx context.Context, srv *fasthttp.Server) {
|
||||
go func() {
|
||||
t := time.NewTicker(a.settings.reconnectInterval)
|
||||
defer t.Stop()
|
||||
for {
|
||||
select {
|
||||
case <-t.C:
|
||||
if a.tryReconnect(ctx, srv) {
|
||||
return
|
||||
}
|
||||
t.Reset(a.settings.reconnectInterval)
|
||||
case <-ctx.Done():
|
||||
return
|
||||
}
|
||||
}
|
||||
}()
|
||||
}
|
||||
|
||||
func (a *app) tryReconnect(ctx context.Context, sr *fasthttp.Server) bool {
|
||||
a.mu.Lock()
|
||||
defer a.mu.Unlock()
|
||||
|
||||
a.log.Info(logs.ServerReconnecting)
|
||||
var failedServers []ServerInfo
|
||||
|
||||
for _, serverInfo := range a.unbindServers {
|
||||
fields := []zap.Field{
|
||||
zap.String("address", serverInfo.Address), zap.Bool("tls enabled", serverInfo.TLS.Enabled),
|
||||
zap.String("tls cert", serverInfo.TLS.CertFile), zap.String("tls key", serverInfo.TLS.KeyFile),
|
||||
}
|
||||
|
||||
srv, err := newServer(ctx, serverInfo)
|
||||
if err != nil {
|
||||
a.log.Warn(logs.ServerReconnectFailed, zap.Error(err))
|
||||
failedServers = append(failedServers, serverInfo)
|
||||
a.metrics.MarkUnhealthy(serverInfo.Address)
|
||||
continue
|
||||
}
|
||||
|
||||
go func() {
|
||||
a.log.Info(logs.StartingServer, zap.String("address", srv.Address()))
|
||||
a.metrics.MarkHealthy(serverInfo.Address)
|
||||
if err = sr.Serve(srv.Listener()); err != nil && !errors.Is(err, http.ErrServerClosed) {
|
||||
a.log.Warn(logs.ListenAndServe, zap.Error(err))
|
||||
a.metrics.MarkUnhealthy(serverInfo.Address)
|
||||
}
|
||||
}()
|
||||
|
||||
a.servers = append(a.servers, srv)
|
||||
a.log.Info(logs.ServerReconnectedSuccessfully, fields...)
|
||||
}
|
||||
|
||||
a.unbindServers = failedServers
|
||||
|
||||
return len(a.unbindServers) == 0
|
||||
}
|
||||
|
|
|
@ -74,6 +74,7 @@ func newServer(ctx context.Context, serverInfo ServerInfo) (*server, error) {
|
|||
|
||||
ln = tls.NewListener(ln, &tls.Config{
|
||||
GetCertificate: tlsProvider.GetCertificate,
|
||||
NextProtos: []string{"h2"}, // required to enable HTTP/2 requests in `http.Serve`
|
||||
})
|
||||
}
|
||||
|
||||
|
|
119
cmd/http-gw/server_test.go
Normal file
119
cmd/http-gw/server_test.go
Normal file
|
@ -0,0 +1,119 @@
|
|||
package main
|
||||
|
||||
import (
|
||||
"context"
|
||||
"crypto/rand"
|
||||
"crypto/rsa"
|
||||
"crypto/tls"
|
||||
"crypto/x509"
|
||||
"crypto/x509/pkix"
|
||||
"encoding/pem"
|
||||
"fmt"
|
||||
"math/big"
|
||||
"net"
|
||||
"net/http"
|
||||
"os"
|
||||
"path"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/stretchr/testify/require"
|
||||
"golang.org/x/net/http2"
|
||||
)
|
||||
|
||||
const (
|
||||
expHeaderKey = "Foo"
|
||||
expHeaderValue = "Bar"
|
||||
)
|
||||
|
||||
func TestHTTP2TLS(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
certPath, keyPath := prepareTestCerts(t)
|
||||
|
||||
srv := &http.Server{
|
||||
Handler: http.HandlerFunc(testHandler),
|
||||
}
|
||||
|
||||
tlsListener, err := newServer(ctx, ServerInfo{
|
||||
Address: ":0",
|
||||
TLS: ServerTLSInfo{
|
||||
Enabled: true,
|
||||
CertFile: certPath,
|
||||
KeyFile: keyPath,
|
||||
},
|
||||
})
|
||||
require.NoError(t, err)
|
||||
port := tlsListener.Listener().Addr().(*net.TCPAddr).Port
|
||||
addr := fmt.Sprintf("https://localhost:%d", port)
|
||||
|
||||
go func() {
|
||||
_ = srv.Serve(tlsListener.Listener())
|
||||
}()
|
||||
|
||||
// Server is running, now send HTTP/2 request
|
||||
|
||||
tlsClientConfig := &tls.Config{
|
||||
InsecureSkipVerify: true,
|
||||
}
|
||||
|
||||
cliHTTP1 := http.Client{Transport: &http.Transport{TLSClientConfig: tlsClientConfig}}
|
||||
cliHTTP2 := http.Client{Transport: &http2.Transport{TLSClientConfig: tlsClientConfig}}
|
||||
|
||||
req, err := http.NewRequest("GET", addr, nil)
|
||||
require.NoError(t, err)
|
||||
req.Header[expHeaderKey] = []string{expHeaderValue}
|
||||
|
||||
resp, err := cliHTTP1.Do(req)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, http.StatusOK, resp.StatusCode)
|
||||
|
||||
resp, err = cliHTTP2.Do(req)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, http.StatusOK, resp.StatusCode)
|
||||
}
|
||||
|
||||
func testHandler(resp http.ResponseWriter, req *http.Request) {
|
||||
hdr, ok := req.Header[expHeaderKey]
|
||||
if !ok || len(hdr) != 1 || hdr[0] != expHeaderValue {
|
||||
resp.WriteHeader(http.StatusBadRequest)
|
||||
} else {
|
||||
resp.WriteHeader(http.StatusOK)
|
||||
}
|
||||
}
|
||||
|
||||
func prepareTestCerts(t *testing.T) (certPath, keyPath string) {
|
||||
privateKey, err := rsa.GenerateKey(rand.Reader, 2048)
|
||||
require.NoError(t, err)
|
||||
|
||||
template := x509.Certificate{
|
||||
SerialNumber: big.NewInt(1),
|
||||
Subject: pkix.Name{CommonName: "localhost"},
|
||||
NotBefore: time.Now(),
|
||||
NotAfter: time.Now().Add(time.Hour * 24 * 365),
|
||||
KeyUsage: x509.KeyUsageDigitalSignature | x509.KeyUsageCertSign,
|
||||
BasicConstraintsValid: true,
|
||||
}
|
||||
|
||||
derBytes, err := x509.CreateCertificate(rand.Reader, &template, &template, &privateKey.PublicKey, privateKey)
|
||||
require.NoError(t, err)
|
||||
|
||||
dir := t.TempDir()
|
||||
certPath = path.Join(dir, "cert.pem")
|
||||
keyPath = path.Join(dir, "key.pem")
|
||||
|
||||
certFile, err := os.Create(certPath)
|
||||
require.NoError(t, err)
|
||||
defer certFile.Close()
|
||||
|
||||
keyFile, err := os.Create(keyPath)
|
||||
require.NoError(t, err)
|
||||
defer keyFile.Close()
|
||||
|
||||
err = pem.Encode(certFile, &pem.Block{Type: "CERTIFICATE", Bytes: derBytes})
|
||||
require.NoError(t, err)
|
||||
|
||||
err = pem.Encode(keyFile, &pem.Block{Type: "RSA PRIVATE KEY", Bytes: x509.MarshalPKCS1PrivateKey(privateKey)})
|
||||
require.NoError(t, err)
|
||||
|
||||
return certPath, keyPath
|
||||
}
|
|
@ -51,11 +51,15 @@ const (
|
|||
|
||||
defaultNamespaceHeader = "X-Frostfs-Namespace"
|
||||
|
||||
defaultReconnectInterval = time.Minute
|
||||
|
||||
cfgServer = "server"
|
||||
cfgTLSEnabled = "tls.enabled"
|
||||
cfgTLSCertFile = "tls.cert_file"
|
||||
cfgTLSKeyFile = "tls.key_file"
|
||||
|
||||
cfgReconnectInterval = "reconnect_interval"
|
||||
|
||||
// Web.
|
||||
cfgWebReadBufferSize = "web.read_buffer_size"
|
||||
cfgWebWriteBufferSize = "web.write_buffer_size"
|
||||
|
@ -454,8 +458,18 @@ func getLogLevel(v *viper.Viper) (zapcore.Level, error) {
|
|||
return lvl, nil
|
||||
}
|
||||
|
||||
func fetchServers(v *viper.Viper) []ServerInfo {
|
||||
func fetchReconnectInterval(cfg *viper.Viper) time.Duration {
|
||||
reconnect := cfg.GetDuration(cfgReconnectInterval)
|
||||
if reconnect <= 0 {
|
||||
reconnect = defaultReconnectInterval
|
||||
}
|
||||
|
||||
return reconnect
|
||||
}
|
||||
|
||||
func fetchServers(v *viper.Viper, log *zap.Logger) []ServerInfo {
|
||||
var servers []ServerInfo
|
||||
seen := make(map[string]struct{})
|
||||
|
||||
for i := 0; ; i++ {
|
||||
key := cfgServer + "." + strconv.Itoa(i) + "."
|
||||
|
@ -470,6 +484,11 @@ func fetchServers(v *viper.Viper) []ServerInfo {
|
|||
break
|
||||
}
|
||||
|
||||
if _, ok := seen[serverInfo.Address]; ok {
|
||||
log.Warn(logs.WarnDuplicateAddress, zap.String("address", serverInfo.Address))
|
||||
continue
|
||||
}
|
||||
seen[serverInfo.Address] = struct{}{}
|
||||
servers = append(servers, serverInfo)
|
||||
}
|
||||
|
||||
|
|
|
@ -26,6 +26,9 @@ HTTP_GW_SERVER_1_TLS_ENABLED=true
|
|||
HTTP_GW_SERVER_1_TLS_CERT_FILE=/path/to/tls/cert
|
||||
HTTP_GW_SERVER_1_TLS_KEY_FILE=/path/to/tls/key
|
||||
|
||||
# How often to reconnect to the servers
|
||||
HTTP_GW_RECONNECT_INTERVAL: 1m
|
||||
|
||||
# Nodes configuration.
|
||||
# This configuration make the gateway use the first node (grpc://s01.frostfs.devenv:8080)
|
||||
# while it's healthy. Otherwise, the gateway use the second node (grpc://s01.frostfs.devenv:8080)
|
||||
|
|
|
@ -55,6 +55,7 @@ peers:
|
|||
priority: 2
|
||||
weight: 9
|
||||
|
||||
reconnect_interval: 1m
|
||||
|
||||
web:
|
||||
# Per-connection buffer size for requests' reading.
|
||||
|
|
|
@ -72,6 +72,7 @@ stream_timeout: 10s
|
|||
request_timeout: 5s
|
||||
rebalance_timer: 30s
|
||||
pool_error_threshold: 100
|
||||
reconnect_interval: 1m
|
||||
```
|
||||
|
||||
| Parameter | Type | SIGHUP reload | Default value | Description |
|
||||
|
@ -83,6 +84,7 @@ pool_error_threshold: 100
|
|||
| `request_timeout` | `duration` | | `15s` | Timeout to check node health during rebalance. |
|
||||
| `rebalance_timer` | `duration` | | `60s` | Interval to check node health. |
|
||||
| `pool_error_threshold` | `uint32` | | `100` | The number of errors on connection after which node is considered as unhealthy. |
|
||||
| `reconnect_interval` | `duration` | no | `1m` | Listeners reconnection interval. |
|
||||
|
||||
# `wallet` section
|
||||
|
||||
|
|
2
go.mod
2
go.mod
|
@ -22,6 +22,7 @@ require (
|
|||
go.opentelemetry.io/otel/trace v1.16.0
|
||||
go.uber.org/zap v1.24.0
|
||||
golang.org/x/exp v0.0.0-20230515195305-f3d0a9c9a5cc
|
||||
golang.org/x/net v0.10.0
|
||||
google.golang.org/grpc v1.55.0
|
||||
)
|
||||
|
||||
|
@ -103,7 +104,6 @@ require (
|
|||
go.uber.org/atomic v1.10.0 // indirect
|
||||
go.uber.org/multierr v1.11.0 // indirect
|
||||
golang.org/x/crypto v0.9.0 // indirect
|
||||
golang.org/x/net v0.10.0 // indirect
|
||||
golang.org/x/sync v0.2.0 // indirect
|
||||
golang.org/x/sys v0.8.0 // indirect
|
||||
golang.org/x/term v0.8.0 // indirect
|
||||
|
|
|
@ -73,4 +73,8 @@ const (
|
|||
InvalidLifetimeUsingDefaultValue = "invalid lifetime, using default value (in seconds)" // Error in ../../cmd/http-gw/settings.go
|
||||
InvalidCacheSizeUsingDefaultValue = "invalid cache size, using default value" // Error in ../../cmd/http-gw/settings.go
|
||||
FailedToUnescapeQuery = "failed to unescape query"
|
||||
ServerReconnecting = "reconnecting server..."
|
||||
ServerReconnectedSuccessfully = "server reconnected successfully"
|
||||
ServerReconnectFailed = "failed to reconnect server"
|
||||
WarnDuplicateAddress = "duplicate address"
|
||||
)
|
||||
|
|
Loading…
Reference in a new issue