Compare commits

..

4 commits

Author SHA1 Message Date
3ce244784c [#108] Update CHANGELOG.md
Signed-off-by: Denis Kirillov <d.kirillov@yadro.com>
2024-02-29 12:56:47 +03:00
20f3ba5cb4 [#108] Check query unescape errors
Signed-off-by: Denis Kirillov <d.kirillov@yadro.com>
2024-02-29 12:56:42 +03:00
59c358a80a [#108] Add return on error in tokenizer middleware
Signed-off-by: Denis Kirillov <d.kirillov@yadro.com>
2024-02-29 12:56:35 +03:00
602d3add66 [#108] Close server listener on error
Signed-off-by: Denis Kirillov <d.kirillov@yadro.com>
2024-02-29 12:56:31 +03:00
10 changed files with 25 additions and 266 deletions

View file

@ -8,15 +8,13 @@ This document outlines major changes between releases.
### Added
- Tree pool traversal limit (#92)
- Add new `reconnect_interval` config param (#100)
### Update from 0.28.0
See new `frostfs.tree_pool_max_attempts` config parameter.
### Fixed
- Fix possibility of panic during SIGHUP (#99)
- Handle query unescape and invalid bearer token errors (#107)
- Fix HTTP/2 requests (#110)
- Handle query unescape and invalid bearer token errors (#108)
### Added
- Support client side object cut (#70)

View file

@ -2,7 +2,6 @@ package main
import (
"context"
"errors"
"fmt"
"net/http"
"os"
@ -58,10 +57,7 @@ type (
metrics *gateMetrics
services []*metrics.Service
settings *appSettings
servers []Server
unbindServers []ServerInfo
mu sync.RWMutex
servers []Server
}
// App is an interface for the main gateway function.
@ -82,8 +78,6 @@ type (
// appSettings stores reloading parameters, so it has to provide getters and setters which use RWMutex.
appSettings struct {
reconnectInterval time.Duration
mu sync.RWMutex
defaultTimestamp bool
zipCompression bool
@ -205,9 +199,8 @@ func (s *appSettings) setBufferMaxSizeForPut(val uint64) {
}
func (a *app) initAppSettings() {
a.settings = &appSettings{
reconnectInterval: fetchReconnectInterval(a.cfg),
}
a.settings = &appSettings{}
a.updateSettings()
}
@ -406,22 +399,16 @@ func (a *app) Serve() {
a.startServices()
a.initServers(a.ctx)
servs := a.getServers()
for i := range servs {
for i := range a.servers {
go func(i int) {
a.log.Info(logs.StartingServer, zap.String("address", servs[i].Address()))
if err := a.webServer.Serve(servs[i].Listener()); err != nil && err != http.ErrServerClosed {
a.metrics.MarkUnhealthy(servs[i].Address())
a.log.Info(logs.StartingServer, zap.String("address", a.servers[i].Address()))
if err := a.webServer.Serve(a.servers[i].Listener()); err != nil && err != http.ErrServerClosed {
a.metrics.MarkUnhealthy(a.servers[i].Address())
a.log.Fatal(logs.ListenAndServe, zap.Error(err))
}
}(i)
}
if len(a.unbindServers) != 0 {
a.scheduleReconnect(a.ctx, a.webServer)
}
sigs := make(chan os.Signal, 1)
signal.Notify(sigs, syscall.SIGHUP)
@ -611,7 +598,7 @@ func (a *app) AppParams() *utils.AppParams {
}
func (a *app) initServers(ctx context.Context) {
serversInfo := fetchServers(a.cfg, a.log)
serversInfo := fetchServers(a.cfg)
a.servers = make([]Server, 0, len(serversInfo))
for _, serverInfo := range serversInfo {
@ -621,7 +608,6 @@ func (a *app) initServers(ctx context.Context) {
}
srv, err := newServer(ctx, serverInfo)
if err != nil {
a.unbindServers = append(a.unbindServers, serverInfo)
a.metrics.MarkUnhealthy(serverInfo.Address)
a.log.Warn(logs.FailedToAddServer, append(fields, zap.Error(err))...)
continue
@ -638,24 +624,21 @@ func (a *app) initServers(ctx context.Context) {
}
func (a *app) updateServers() error {
serversInfo := fetchServers(a.cfg, a.log)
a.mu.Lock()
defer a.mu.Unlock()
serversInfo := fetchServers(a.cfg)
var found bool
for _, serverInfo := range serversInfo {
ser := a.getServer(serverInfo.Address)
if ser != nil {
if serverInfo.TLS.Enabled {
if err := ser.UpdateCert(serverInfo.TLS.CertFile, serverInfo.TLS.KeyFile); err != nil {
return fmt.Errorf("failed to update tls certs: %w", err)
}
found = true
}
} else if unbind := a.updateUnbindServerInfo(serverInfo); unbind {
found = true
index := a.serverIndex(serverInfo.Address)
if index == -1 {
continue
}
if serverInfo.TLS.Enabled {
if err := a.servers[index].UpdateCert(serverInfo.TLS.CertFile, serverInfo.TLS.KeyFile); err != nil {
return fmt.Errorf("failed to update tls certs: %w", err)
}
}
found = true
}
if !found {
@ -665,29 +648,13 @@ func (a *app) updateServers() error {
return nil
}
func (a *app) getServers() []Server {
a.mu.RLock()
defer a.mu.RUnlock()
return a.servers
}
func (a *app) getServer(address string) Server {
func (a *app) serverIndex(address string) int {
for i := range a.servers {
if a.servers[i].Address() == address {
return a.servers[i]
return i
}
}
return nil
}
func (a *app) updateUnbindServerInfo(info ServerInfo) bool {
for i := range a.unbindServers {
if a.unbindServers[i].Address == info.Address {
a.unbindServers[i] = info
return true
}
}
return false
return -1
}
func (a *app) initTracing(ctx context.Context) {
@ -760,60 +727,3 @@ func (s *appSettings) setDefaultNamespaces(namespaces []string) {
s.defaultNamespaces = namespaces
s.mu.Unlock()
}
func (a *app) scheduleReconnect(ctx context.Context, srv *fasthttp.Server) {
go func() {
t := time.NewTicker(a.settings.reconnectInterval)
defer t.Stop()
for {
select {
case <-t.C:
if a.tryReconnect(ctx, srv) {
return
}
t.Reset(a.settings.reconnectInterval)
case <-ctx.Done():
return
}
}
}()
}
func (a *app) tryReconnect(ctx context.Context, sr *fasthttp.Server) bool {
a.mu.Lock()
defer a.mu.Unlock()
a.log.Info(logs.ServerReconnecting)
var failedServers []ServerInfo
for _, serverInfo := range a.unbindServers {
fields := []zap.Field{
zap.String("address", serverInfo.Address), zap.Bool("tls enabled", serverInfo.TLS.Enabled),
zap.String("tls cert", serverInfo.TLS.CertFile), zap.String("tls key", serverInfo.TLS.KeyFile),
}
srv, err := newServer(ctx, serverInfo)
if err != nil {
a.log.Warn(logs.ServerReconnectFailed, zap.Error(err))
failedServers = append(failedServers, serverInfo)
a.metrics.MarkUnhealthy(serverInfo.Address)
continue
}
go func() {
a.log.Info(logs.StartingServer, zap.String("address", srv.Address()))
a.metrics.MarkHealthy(serverInfo.Address)
if err = sr.Serve(srv.Listener()); err != nil && !errors.Is(err, http.ErrServerClosed) {
a.log.Warn(logs.ListenAndServe, zap.Error(err))
a.metrics.MarkUnhealthy(serverInfo.Address)
}
}()
a.servers = append(a.servers, srv)
a.log.Info(logs.ServerReconnectedSuccessfully, fields...)
}
a.unbindServers = failedServers
return len(a.unbindServers) == 0
}

View file

@ -74,7 +74,6 @@ func newServer(ctx context.Context, serverInfo ServerInfo) (*server, error) {
ln = tls.NewListener(ln, &tls.Config{
GetCertificate: tlsProvider.GetCertificate,
NextProtos: []string{"h2"}, // required to enable HTTP/2 requests in `http.Serve`
})
}

View file

@ -1,119 +0,0 @@
package main
import (
"context"
"crypto/rand"
"crypto/rsa"
"crypto/tls"
"crypto/x509"
"crypto/x509/pkix"
"encoding/pem"
"fmt"
"math/big"
"net"
"net/http"
"os"
"path"
"testing"
"time"
"github.com/stretchr/testify/require"
"golang.org/x/net/http2"
)
const (
expHeaderKey = "Foo"
expHeaderValue = "Bar"
)
func TestHTTP2TLS(t *testing.T) {
ctx := context.Background()
certPath, keyPath := prepareTestCerts(t)
srv := &http.Server{
Handler: http.HandlerFunc(testHandler),
}
tlsListener, err := newServer(ctx, ServerInfo{
Address: ":0",
TLS: ServerTLSInfo{
Enabled: true,
CertFile: certPath,
KeyFile: keyPath,
},
})
require.NoError(t, err)
port := tlsListener.Listener().Addr().(*net.TCPAddr).Port
addr := fmt.Sprintf("https://localhost:%d", port)
go func() {
_ = srv.Serve(tlsListener.Listener())
}()
// Server is running, now send HTTP/2 request
tlsClientConfig := &tls.Config{
InsecureSkipVerify: true,
}
cliHTTP1 := http.Client{Transport: &http.Transport{TLSClientConfig: tlsClientConfig}}
cliHTTP2 := http.Client{Transport: &http2.Transport{TLSClientConfig: tlsClientConfig}}
req, err := http.NewRequest("GET", addr, nil)
require.NoError(t, err)
req.Header[expHeaderKey] = []string{expHeaderValue}
resp, err := cliHTTP1.Do(req)
require.NoError(t, err)
require.Equal(t, http.StatusOK, resp.StatusCode)
resp, err = cliHTTP2.Do(req)
require.NoError(t, err)
require.Equal(t, http.StatusOK, resp.StatusCode)
}
func testHandler(resp http.ResponseWriter, req *http.Request) {
hdr, ok := req.Header[expHeaderKey]
if !ok || len(hdr) != 1 || hdr[0] != expHeaderValue {
resp.WriteHeader(http.StatusBadRequest)
} else {
resp.WriteHeader(http.StatusOK)
}
}
func prepareTestCerts(t *testing.T) (certPath, keyPath string) {
privateKey, err := rsa.GenerateKey(rand.Reader, 2048)
require.NoError(t, err)
template := x509.Certificate{
SerialNumber: big.NewInt(1),
Subject: pkix.Name{CommonName: "localhost"},
NotBefore: time.Now(),
NotAfter: time.Now().Add(time.Hour * 24 * 365),
KeyUsage: x509.KeyUsageDigitalSignature | x509.KeyUsageCertSign,
BasicConstraintsValid: true,
}
derBytes, err := x509.CreateCertificate(rand.Reader, &template, &template, &privateKey.PublicKey, privateKey)
require.NoError(t, err)
dir := t.TempDir()
certPath = path.Join(dir, "cert.pem")
keyPath = path.Join(dir, "key.pem")
certFile, err := os.Create(certPath)
require.NoError(t, err)
defer certFile.Close()
keyFile, err := os.Create(keyPath)
require.NoError(t, err)
defer keyFile.Close()
err = pem.Encode(certFile, &pem.Block{Type: "CERTIFICATE", Bytes: derBytes})
require.NoError(t, err)
err = pem.Encode(keyFile, &pem.Block{Type: "RSA PRIVATE KEY", Bytes: x509.MarshalPKCS1PrivateKey(privateKey)})
require.NoError(t, err)
return certPath, keyPath
}

View file

@ -51,15 +51,11 @@ const (
defaultNamespaceHeader = "X-Frostfs-Namespace"
defaultReconnectInterval = time.Minute
cfgServer = "server"
cfgTLSEnabled = "tls.enabled"
cfgTLSCertFile = "tls.cert_file"
cfgTLSKeyFile = "tls.key_file"
cfgReconnectInterval = "reconnect_interval"
// Web.
cfgWebReadBufferSize = "web.read_buffer_size"
cfgWebWriteBufferSize = "web.write_buffer_size"
@ -458,18 +454,8 @@ func getLogLevel(v *viper.Viper) (zapcore.Level, error) {
return lvl, nil
}
func fetchReconnectInterval(cfg *viper.Viper) time.Duration {
reconnect := cfg.GetDuration(cfgReconnectInterval)
if reconnect <= 0 {
reconnect = defaultReconnectInterval
}
return reconnect
}
func fetchServers(v *viper.Viper, log *zap.Logger) []ServerInfo {
func fetchServers(v *viper.Viper) []ServerInfo {
var servers []ServerInfo
seen := make(map[string]struct{})
for i := 0; ; i++ {
key := cfgServer + "." + strconv.Itoa(i) + "."
@ -484,11 +470,6 @@ func fetchServers(v *viper.Viper, log *zap.Logger) []ServerInfo {
break
}
if _, ok := seen[serverInfo.Address]; ok {
log.Warn(logs.WarnDuplicateAddress, zap.String("address", serverInfo.Address))
continue
}
seen[serverInfo.Address] = struct{}{}
servers = append(servers, serverInfo)
}

View file

@ -26,9 +26,6 @@ HTTP_GW_SERVER_1_TLS_ENABLED=true
HTTP_GW_SERVER_1_TLS_CERT_FILE=/path/to/tls/cert
HTTP_GW_SERVER_1_TLS_KEY_FILE=/path/to/tls/key
# How often to reconnect to the servers
HTTP_GW_RECONNECT_INTERVAL: 1m
# Nodes configuration.
# This configuration make the gateway use the first node (grpc://s01.frostfs.devenv:8080)
# while it's healthy. Otherwise, the gateway use the second node (grpc://s01.frostfs.devenv:8080)

View file

@ -55,7 +55,6 @@ peers:
priority: 2
weight: 9
reconnect_interval: 1m
web:
# Per-connection buffer size for requests' reading.

View file

@ -72,7 +72,6 @@ stream_timeout: 10s
request_timeout: 5s
rebalance_timer: 30s
pool_error_threshold: 100
reconnect_interval: 1m
```
| Parameter | Type | SIGHUP reload | Default value | Description |
@ -84,7 +83,6 @@ reconnect_interval: 1m
| `request_timeout` | `duration` | | `15s` | Timeout to check node health during rebalance. |
| `rebalance_timer` | `duration` | | `60s` | Interval to check node health. |
| `pool_error_threshold` | `uint32` | | `100` | The number of errors on connection after which node is considered as unhealthy. |
| `reconnect_interval` | `duration` | no | `1m` | Listeners reconnection interval. |
# `wallet` section

2
go.mod
View file

@ -22,7 +22,6 @@ require (
go.opentelemetry.io/otel/trace v1.16.0
go.uber.org/zap v1.24.0
golang.org/x/exp v0.0.0-20230515195305-f3d0a9c9a5cc
golang.org/x/net v0.10.0
google.golang.org/grpc v1.55.0
)
@ -104,6 +103,7 @@ require (
go.uber.org/atomic v1.10.0 // indirect
go.uber.org/multierr v1.11.0 // indirect
golang.org/x/crypto v0.9.0 // indirect
golang.org/x/net v0.10.0 // indirect
golang.org/x/sync v0.2.0 // indirect
golang.org/x/sys v0.8.0 // indirect
golang.org/x/term v0.8.0 // indirect

View file

@ -73,8 +73,4 @@ const (
InvalidLifetimeUsingDefaultValue = "invalid lifetime, using default value (in seconds)" // Error in ../../cmd/http-gw/settings.go
InvalidCacheSizeUsingDefaultValue = "invalid cache size, using default value" // Error in ../../cmd/http-gw/settings.go
FailedToUnescapeQuery = "failed to unescape query"
ServerReconnecting = "reconnecting server..."
ServerReconnectedSuccessfully = "server reconnected successfully"
ServerReconnectFailed = "failed to reconnect server"
WarnDuplicateAddress = "duplicate address"
)