Merge pull request #4792 from restic/request-watchdog

backend: enforce that backend HTTP requests make progress
This commit is contained in:
Michael Eischer 2024-05-09 23:55:30 +02:00 committed by GitHub
commit e4f9bce384
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
5 changed files with 335 additions and 2 deletions

View file

@ -4,5 +4,14 @@ Restic now downloads pack files in large chunks instead of using a streaming
download. This prevents failures due to interrupted streams. The `restore` download. This prevents failures due to interrupted streams. The `restore`
command now also retries downloading individual blobs that cannot be retrieved. command now also retries downloading individual blobs that cannot be retrieved.
HTTP requests that are stuck for more than two minutes while uploading or
downloading are now forcibly interrupted. This ensures that stuck requests are
retried after a short timeout. These new request timeouts can temporarily be
disabled by setting the environment variable
`RESTIC_FEATURES=http-timeouts=false`. Note that this feature flag will be
removed in the next minor restic version.
https://github.com/restic/restic/issues/4627 https://github.com/restic/restic/issues/4627
https://github.com/restic/restic/issues/4193
https://github.com/restic/restic/pull/4605 https://github.com/restic/restic/pull/4605
https://github.com/restic/restic/pull/4792

View file

@ -13,6 +13,8 @@ import (
"github.com/peterbourgon/unixtransport" "github.com/peterbourgon/unixtransport"
"github.com/restic/restic/internal/debug" "github.com/restic/restic/internal/debug"
"github.com/restic/restic/internal/errors" "github.com/restic/restic/internal/errors"
"github.com/restic/restic/internal/feature"
"golang.org/x/net/http2"
) )
// TransportOptions collects various options which can be set for an HTTP based // TransportOptions collects various options which can be set for an HTTP based
@ -74,7 +76,6 @@ func Transport(opts TransportOptions) (http.RoundTripper, error) {
KeepAlive: 30 * time.Second, KeepAlive: 30 * time.Second,
DualStack: true, DualStack: true,
}).DialContext, }).DialContext,
ForceAttemptHTTP2: true,
MaxIdleConns: 100, MaxIdleConns: 100,
MaxIdleConnsPerHost: 100, MaxIdleConnsPerHost: 100,
IdleConnTimeout: 90 * time.Second, IdleConnTimeout: 90 * time.Second,
@ -83,6 +84,17 @@ func Transport(opts TransportOptions) (http.RoundTripper, error) {
TLSClientConfig: &tls.Config{}, TLSClientConfig: &tls.Config{},
} }
// ensure that http2 connections are closed if they are broken
h2, err := http2.ConfigureTransports(tr)
if err != nil {
panic(err)
}
if feature.Flag.Enabled(feature.HTTPTimeouts) {
h2.WriteByteTimeout = 120 * time.Second
h2.ReadIdleTimeout = 60 * time.Second
h2.PingTimeout = 60 * time.Second
}
unixtransport.Register(tr) unixtransport.Register(tr)
if opts.InsecureTLS { if opts.InsecureTLS {
@ -119,6 +131,11 @@ func Transport(opts TransportOptions) (http.RoundTripper, error) {
tr.TLSClientConfig.RootCAs = pool tr.TLSClientConfig.RootCAs = pool
} }
rt := http.RoundTripper(tr)
if feature.Flag.Enabled(feature.HTTPTimeouts) {
rt = newWatchdogRoundtripper(rt, 120*time.Second, 128*1024)
}
// wrap in the debug round tripper (if active) // wrap in the debug round tripper (if active)
return debug.RoundTripper(tr), nil return debug.RoundTripper(rt), nil
} }

View file

@ -0,0 +1,104 @@
package backend
import (
"context"
"io"
"net/http"
"time"
)
// watchdogRoundtripper cancels an http request if an upload or download did not make progress
// within timeout. The time between fully sending the request and receiving an response is also
// limited by this timeout. This ensures that stuck requests are cancelled after some time.
//
// The roundtriper makes the assumption that the upload and download happen continuously. In particular,
// the caller must not make long pauses between individual read requests from the response body.
type watchdogRoundtripper struct {
rt http.RoundTripper
timeout time.Duration
chunkSize int
}
var _ http.RoundTripper = &watchdogRoundtripper{}
func newWatchdogRoundtripper(rt http.RoundTripper, timeout time.Duration, chunkSize int) *watchdogRoundtripper {
return &watchdogRoundtripper{
rt: rt,
timeout: timeout,
chunkSize: chunkSize,
}
}
func (w *watchdogRoundtripper) RoundTrip(req *http.Request) (*http.Response, error) {
timer := time.NewTimer(w.timeout)
ctx, cancel := context.WithCancel(req.Context())
// cancel context if timer expires
go func() {
defer timer.Stop()
select {
case <-timer.C:
cancel()
case <-ctx.Done():
}
}()
kick := func() {
timer.Reset(w.timeout)
}
req = req.Clone(ctx)
if req.Body != nil {
// kick watchdog timer as long as uploading makes progress
req.Body = newWatchdogReadCloser(req.Body, w.chunkSize, kick, nil)
}
resp, err := w.rt.RoundTrip(req)
if err != nil {
return nil, err
}
// kick watchdog timer as long as downloading makes progress
// cancel context to stop goroutine once response body is closed
resp.Body = newWatchdogReadCloser(resp.Body, w.chunkSize, kick, cancel)
return resp, nil
}
func newWatchdogReadCloser(rc io.ReadCloser, chunkSize int, kick func(), close func()) *watchdogReadCloser {
return &watchdogReadCloser{
rc: rc,
chunkSize: chunkSize,
kick: kick,
close: close,
}
}
type watchdogReadCloser struct {
rc io.ReadCloser
chunkSize int
kick func()
close func()
}
var _ io.ReadCloser = &watchdogReadCloser{}
func (w *watchdogReadCloser) Read(p []byte) (n int, err error) {
w.kick()
// Read is not required to fill the whole passed in byte slice
// Thus, keep things simple and just stay within our chunkSize.
if len(p) > w.chunkSize {
p = p[:w.chunkSize]
}
n, err = w.rc.Read(p)
w.kick()
return n, err
}
func (w *watchdogReadCloser) Close() error {
if w.close != nil {
w.close()
}
return w.rc.Close()
}

View file

@ -0,0 +1,201 @@
package backend
import (
"bytes"
"context"
"fmt"
"io"
"net/http"
"net/http/httptest"
"testing"
"time"
rtest "github.com/restic/restic/internal/test"
)
func TestRead(t *testing.T) {
data := []byte("abcdef")
var ctr int
kick := func() {
ctr++
}
var closed bool
onClose := func() {
closed = true
}
wd := newWatchdogReadCloser(io.NopCloser(bytes.NewReader(data)), 1, kick, onClose)
out, err := io.ReadAll(wd)
rtest.OK(t, err)
rtest.Equals(t, data, out, "data mismatch")
// the EOF read also triggers the kick function
rtest.Equals(t, len(data)*2+2, ctr, "unexpected number of kick calls")
rtest.Equals(t, false, closed, "close function called too early")
rtest.OK(t, wd.Close())
rtest.Equals(t, true, closed, "close function not called")
}
func TestRoundtrip(t *testing.T) {
t.Parallel()
// at the higher delay values, it takes longer to transmit the request/response body
// than the roundTripper timeout
for _, delay := range []int{0, 1, 10, 20} {
t.Run(fmt.Sprintf("%v", delay), func(t *testing.T) {
msg := []byte("ping-pong-data")
srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
data, err := io.ReadAll(r.Body)
if err != nil {
w.WriteHeader(500)
return
}
w.WriteHeader(200)
// slowly send the reply
for len(data) >= 2 {
_, _ = w.Write(data[:2])
w.(http.Flusher).Flush()
data = data[2:]
time.Sleep(time.Duration(delay) * time.Millisecond)
}
_, _ = w.Write(data)
}))
defer srv.Close()
rt := newWatchdogRoundtripper(http.DefaultTransport, 50*time.Millisecond, 2)
req, err := http.NewRequestWithContext(context.TODO(), "GET", srv.URL, io.NopCloser(newSlowReader(bytes.NewReader(msg), time.Duration(delay)*time.Millisecond)))
rtest.OK(t, err)
resp, err := rt.RoundTrip(req)
rtest.OK(t, err)
rtest.Equals(t, 200, resp.StatusCode, "unexpected status code")
response, err := io.ReadAll(resp.Body)
rtest.OK(t, err)
rtest.Equals(t, msg, response, "unexpected response")
rtest.OK(t, resp.Body.Close())
})
}
}
func TestCanceledRoundtrip(t *testing.T) {
rt := newWatchdogRoundtripper(http.DefaultTransport, time.Second, 2)
ctx, cancel := context.WithCancel(context.Background())
cancel()
req, err := http.NewRequestWithContext(ctx, "GET", "http://some.random.url.dfdgsfg", nil)
rtest.OK(t, err)
resp, err := rt.RoundTrip(req)
rtest.Equals(t, context.Canceled, err)
// make linter happy
if resp != nil {
rtest.OK(t, resp.Body.Close())
}
}
type slowReader struct {
data io.Reader
delay time.Duration
}
func newSlowReader(data io.Reader, delay time.Duration) *slowReader {
return &slowReader{
data: data,
delay: delay,
}
}
func (s *slowReader) Read(p []byte) (n int, err error) {
time.Sleep(s.delay)
return s.data.Read(p)
}
func TestUploadTimeout(t *testing.T) {
t.Parallel()
msg := []byte("ping")
srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
_, err := io.ReadAll(r.Body)
if err != nil {
w.WriteHeader(500)
return
}
t.Error("upload should have been canceled")
}))
defer srv.Close()
rt := newWatchdogRoundtripper(http.DefaultTransport, 10*time.Millisecond, 1024)
req, err := http.NewRequestWithContext(context.TODO(), "GET", srv.URL, io.NopCloser(newSlowReader(bytes.NewReader(msg), 100*time.Millisecond)))
rtest.OK(t, err)
resp, err := rt.RoundTrip(req)
rtest.Equals(t, context.Canceled, err)
// make linter happy
if resp != nil {
rtest.OK(t, resp.Body.Close())
}
}
func TestProcessingTimeout(t *testing.T) {
t.Parallel()
msg := []byte("ping")
srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
_, err := io.ReadAll(r.Body)
if err != nil {
w.WriteHeader(500)
return
}
time.Sleep(100 * time.Millisecond)
w.WriteHeader(200)
}))
defer srv.Close()
rt := newWatchdogRoundtripper(http.DefaultTransport, 10*time.Millisecond, 1024)
req, err := http.NewRequestWithContext(context.TODO(), "GET", srv.URL, io.NopCloser(bytes.NewReader(msg)))
rtest.OK(t, err)
resp, err := rt.RoundTrip(req)
rtest.Equals(t, context.Canceled, err)
// make linter happy
if resp != nil {
rtest.OK(t, resp.Body.Close())
}
}
func TestDownloadTimeout(t *testing.T) {
t.Parallel()
msg := []byte("ping")
srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
data, err := io.ReadAll(r.Body)
if err != nil {
w.WriteHeader(500)
return
}
w.WriteHeader(200)
_, _ = w.Write(data[:2])
w.(http.Flusher).Flush()
data = data[2:]
time.Sleep(100 * time.Millisecond)
_, _ = w.Write(data)
}))
defer srv.Close()
rt := newWatchdogRoundtripper(http.DefaultTransport, 10*time.Millisecond, 1024)
req, err := http.NewRequestWithContext(context.TODO(), "GET", srv.URL, io.NopCloser(bytes.NewReader(msg)))
rtest.OK(t, err)
resp, err := rt.RoundTrip(req)
rtest.OK(t, err)
rtest.Equals(t, 200, resp.StatusCode, "unexpected status code")
_, err = io.ReadAll(resp.Body)
rtest.Equals(t, context.Canceled, err, "response download not canceled")
rtest.OK(t, resp.Body.Close())
}

View file

@ -8,6 +8,7 @@ const (
DeprecateLegacyIndex FlagName = "deprecate-legacy-index" DeprecateLegacyIndex FlagName = "deprecate-legacy-index"
DeprecateS3LegacyLayout FlagName = "deprecate-s3-legacy-layout" DeprecateS3LegacyLayout FlagName = "deprecate-s3-legacy-layout"
DeviceIDForHardlinks FlagName = "device-id-for-hardlinks" DeviceIDForHardlinks FlagName = "device-id-for-hardlinks"
HTTPTimeouts FlagName = "http-timeouts"
) )
func init() { func init() {
@ -15,5 +16,6 @@ func init() {
DeprecateLegacyIndex: {Type: Beta, Description: "disable support for index format used by restic 0.1.0. Use `restic repair index` to update the index if necessary."}, DeprecateLegacyIndex: {Type: Beta, Description: "disable support for index format used by restic 0.1.0. Use `restic repair index` to update the index if necessary."},
DeprecateS3LegacyLayout: {Type: Beta, Description: "disable support for S3 legacy layout used up to restic 0.7.0. Use `RESTIC_FEATURES=deprecate-s3-legacy-layout=false restic migrate s3_layout` to migrate your S3 repository if necessary."}, DeprecateS3LegacyLayout: {Type: Beta, Description: "disable support for S3 legacy layout used up to restic 0.7.0. Use `RESTIC_FEATURES=deprecate-s3-legacy-layout=false restic migrate s3_layout` to migrate your S3 repository if necessary."},
DeviceIDForHardlinks: {Type: Alpha, Description: "store deviceID only for hardlinks to reduce metadata changes for example when using btrfs subvolumes. Will be removed in a future restic version after repository format 3 is available"}, DeviceIDForHardlinks: {Type: Alpha, Description: "store deviceID only for hardlinks to reduce metadata changes for example when using btrfs subvolumes. Will be removed in a future restic version after repository format 3 is available"},
HTTPTimeouts: {Type: Beta, Description: "enforce timeouts for stuck HTTP requests."},
}) })
} }