diff --git a/internal/backend/http_transport.go b/internal/backend/http_transport.go index 354611e07..09eb3cf16 100644 --- a/internal/backend/http_transport.go +++ b/internal/backend/http_transport.go @@ -13,6 +13,7 @@ import ( "github.com/peterbourgon/unixtransport" "github.com/restic/restic/internal/debug" "github.com/restic/restic/internal/errors" + "github.com/restic/restic/internal/feature" "golang.org/x/net/http2" ) @@ -130,6 +131,11 @@ func Transport(opts TransportOptions) (http.RoundTripper, error) { tr.TLSClientConfig.RootCAs = pool } + rt := http.RoundTripper(tr) + if feature.Flag.Enabled(feature.HTTPTimeouts) { + rt = newWatchdogRoundtripper(rt, 120*time.Second, 128*1024) + } + // wrap in the debug round tripper (if active) - return debug.RoundTripper(tr), nil + return debug.RoundTripper(rt), nil } diff --git a/internal/backend/watchdog_roundtriper.go b/internal/backend/watchdog_roundtriper.go new file mode 100644 index 000000000..fb7863002 --- /dev/null +++ b/internal/backend/watchdog_roundtriper.go @@ -0,0 +1,104 @@ +package backend + +import ( + "context" + "io" + "net/http" + "time" +) + +// watchdogRoundtripper cancels an http request if an upload or download did not make progress +// within timeout. The time between fully sending the request and receiving a response is also +// limited by this timeout. This ensures that stuck requests are cancelled after some time. +// +// The roundtripper makes the assumption that the upload and download happen continuously. In particular, +// the caller must not make long pauses between individual read requests from the response body.
// watchdogRoundtripper wraps an http.RoundTripper and cancels a request when
// neither the upload of the request body nor the download of the response body
// made progress within timeout. The wait between handing the request to the
// wrapped round tripper and receiving its response is limited by the same
// timeout.
//
// The watchdog is only kicked by Read calls on the request and response
// bodies, so callers must keep reading the response body without long pauses.
type watchdogRoundtripper struct {
	// rt is the wrapped round tripper that actually performs the request.
	rt http.RoundTripper
	// timeout is the maximum time that may pass without a kick.
	timeout time.Duration
	// chunkSize bounds the size of a single body Read; values <= 0 disable
	// the bound.
	chunkSize int
}

var _ http.RoundTripper = (*watchdogRoundtripper)(nil)

// newWatchdogRoundtripper returns a round tripper that forwards requests to rt
// and cancels them once timeout passes without progress. Body reads are
// limited to chunkSize bytes each so that the watchdog is kicked regularly.
func newWatchdogRoundtripper(rt http.RoundTripper, timeout time.Duration, chunkSize int) *watchdogRoundtripper {
	return &watchdogRoundtripper{
		rt:        rt,
		timeout:   timeout,
		chunkSize: chunkSize,
	}
}

// RoundTrip executes req via the wrapped round tripper using a context that is
// cancelled when the watchdog timer expires. On success the returned response
// body keeps kicking the watchdog on every Read and releases the watchdog when
// it is closed. On failure the watchdog is released before returning.
func (w *watchdogRoundtripper) RoundTrip(req *http.Request) (*http.Response, error) {
	timer := time.NewTimer(w.timeout)
	ctx, cancel := context.WithCancel(req.Context())

	// Watchdog goroutine: cancel the request if the timer expires. It exits
	// as soon as ctx is done, whichever side cancelled it.
	go func() {
		defer timer.Stop()
		select {
		case <-timer.C:
			cancel()
		case <-ctx.Done():
		}
	}()

	kick := func() {
		timer.Reset(w.timeout)
	}

	req = req.Clone(ctx)
	// http.NoBody marks an explicitly empty body; leave it unwrapped so the
	// wrapped round tripper can still recognize the sentinel.
	if req.Body != nil && req.Body != http.NoBody {
		// kick watchdog timer as long as uploading makes progress
		req.Body = newWatchdogReadCloser(req.Body, w.chunkSize, kick, nil)
	}

	resp, err := w.rt.RoundTrip(req)
	if err != nil {
		// There is no response body whose Close would cancel ctx, so do it
		// here. Otherwise the watchdog goroutine would linger until the
		// timer fires or the parent context ends.
		cancel()
		return nil, err
	}

	// kick watchdog timer as long as downloading makes progress
	// cancel context to stop goroutine once response body is closed
	resp.Body = newWatchdogReadCloser(resp.Body, w.chunkSize, kick, cancel)
	return resp, nil
}

// newWatchdogReadCloser wraps rc so that kick is called around every Read and
// onClose (if non-nil) is called when the wrapper is closed. A chunkSize <= 0
// means reads are passed through without a size bound.
func newWatchdogReadCloser(rc io.ReadCloser, chunkSize int, kick func(), onClose func()) *watchdogReadCloser {
	return &watchdogReadCloser{
		rc:        rc,
		chunkSize: chunkSize,
		kick:      kick,
		close:     onClose,
	}
}

// watchdogReadCloser is an io.ReadCloser that reports read activity through
// kick and notifies close when it is closed.
type watchdogReadCloser struct {
	rc        io.ReadCloser
	chunkSize int
	kick      func()
	close     func()
}

var _ io.ReadCloser = (*watchdogReadCloser)(nil)

// Read reads at most chunkSize bytes from the wrapped reader and kicks the
// watchdog before and after the underlying Read.
func (w *watchdogReadCloser) Read(p []byte) (n int, err error) {
	w.kick()

	// Read is not required to fill the whole passed in byte slice.
	// Thus, keep things simple and just stay within our chunkSize.
	// A non-positive chunkSize must not shrink p to zero length (which
	// would yield endless (0, nil) reads) or trigger a slice-bounds panic.
	if w.chunkSize > 0 && len(p) > w.chunkSize {
		p = p[:w.chunkSize]
	}
	n, err = w.rc.Read(p)
	w.kick()

	return n, err
}

// Close invokes the close callback, if any, and then closes the wrapped
// reader, returning its error.
func (w *watchdogReadCloser) Close() error {
	if w.close != nil {
		w.close()
	}
	return w.rc.Close()
}

/*
Remainder of the patch, reproduced unchanged (internal/feature/registry.go):

diff --git a/internal/feature/registry.go b/internal/feature/registry.go
index 2d2e45edf..b0e4d2ed7 100644
--- a/internal/feature/registry.go
+++ b/internal/feature/registry.go
@@ -8,6 +8,7 @@ const (
 DeprecateLegacyIndex FlagName = "deprecate-legacy-index"
 DeprecateS3LegacyLayout FlagName = "deprecate-s3-legacy-layout"
 DeviceIDForHardlinks FlagName = "device-id-for-hardlinks"
+ HTTPTimeouts FlagName = "http-timeouts"
 )

 func init() {
@@ -15,5 +16,6 @@ func init() {
 DeprecateLegacyIndex: {Type: Beta, Description: "disable support for index format used by restic 0.1.0. Use `restic repair index` to update the index if necessary."},
 DeprecateS3LegacyLayout: {Type: Beta, Description: "disable support for S3 legacy layout used up to restic 0.7.0. Use `RESTIC_FEATURES=deprecate-s3-legacy-layout=false restic migrate s3_layout` to migrate your S3 repository if necessary."},
 DeviceIDForHardlinks: {Type: Alpha, Description: "store deviceID only for hardlinks to reduce metadata changes for example when using btrfs subvolumes. Will be removed in a future restic version after repository format 3 is available"},
+ HTTPTimeouts: {Type: Beta, Description: "enforce timeouts for stuck HTTP requests."},
 })
 }
*/