From 076d3da8250e7ce930f1a8e9118f1c1a1091a781 Mon Sep 17 00:00:00 2001
From: Nick Craig-Wood
Date: Sun, 10 Feb 2019 18:20:58 +0000
Subject: [PATCH] operations: resume downloads if the reader fails in copy -
 fixes #2108

This puts a shim on the reader opened by Copy so that if an error is
returned while reading, the reader is re-opened at the correct seek
point.

This should make downloading very large files more reliable.
---
 fs/operations/operations.go  |   2 +-
 fs/operations/reopen.go      | 109 +++++++++++++++++++++++
 fs/operations/reopen_test.go | 144 +++++++++++++++++++++++++++++++++++
 3 files changed, 254 insertions(+), 1 deletion(-)
 create mode 100644 fs/operations/reopen.go
 create mode 100644 fs/operations/reopen_test.go

diff --git a/fs/operations/operations.go b/fs/operations/operations.go
index 1e008068f..417ad58d3 100644
--- a/fs/operations/operations.go
+++ b/fs/operations/operations.go
@@ -283,7 +283,7 @@ func Copy(f fs.Fs, dst fs.Object, remote string, src fs.Object) (newDst fs.Objec
 	// If can't server side copy, do it manually
 	if err == fs.ErrorCantCopy {
 		var in0 io.ReadCloser
-		in0, err = src.Open(hashOption)
+		in0, err = newReOpen(src, hashOption, fs.Config.LowLevelRetries)
 		if err != nil {
 			err = errors.Wrap(err, "failed to open source object")
 		} else {
diff --git a/fs/operations/reopen.go b/fs/operations/reopen.go
new file mode 100644
index 000000000..8ad6be55b
--- /dev/null
+++ b/fs/operations/reopen.go
@@ -0,0 +1,109 @@
+package operations
+
+import (
+	"io"
+	"sync"
+
+	"github.com/ncw/rclone/fs"
+	"github.com/pkg/errors"
+)
+
+// reOpen is a wrapper for an object reader which reopens the stream on error
+type reOpen struct {
+	mu         sync.Mutex       // mutex to protect the below
+	src        fs.Object        // object to open
+	hashOption *fs.HashesOption // option to pass to initial open
+	rc         io.ReadCloser    // underlying stream
+	read       int64            // number of bytes read from this stream
+	maxTries   int              // maximum number of retries
+	tries      int              // number of retries we've had so far in this stream
+	err        error            // if this is set then Read/Close calls will return it
+	opened     bool             // if set then rc is valid and needs closing
+}
+
+var (
+	errorFileClosed   = errors.New("file already closed")
+	errorTooManyTries = errors.New("failed to reopen: too many retries")
+)
+
+// newReOpen makes a handle which will reopen itself and seek to where it was on errors
+func newReOpen(src fs.Object, hashOption *fs.HashesOption, maxTries int) (rc io.ReadCloser, err error) {
+	h := &reOpen{
+		src:        src,
+		hashOption: hashOption,
+		maxTries:   maxTries,
+	}
+	h.mu.Lock()
+	defer h.mu.Unlock()
+	err = h.open()
+	if err != nil {
+		return nil, err
+	}
+	return h, nil
+}
+
+// open the underlying handle - call with lock held
+//
+// we don't retry here as the Open() call will itself have low level retries
+func (h *reOpen) open() error {
+	var opts = make([]fs.OpenOption, 1)
+	if h.read == 0 {
+		// put hashOption on if reading from the start, ditch otherwise
+		opts[0] = h.hashOption
+	} else {
+		// seek to the read point
+		opts[0] = &fs.SeekOption{Offset: h.read}
+	}
+	h.tries++
+	if h.tries > h.maxTries {
+		h.err = errorTooManyTries
+	} else {
+		h.rc, h.err = h.src.Open(opts...)
+	}
+	if h.err != nil {
+		if h.tries > 1 {
+			fs.Debugf(h.src, "Reopen failed after %d bytes read: %v", h.read, h.err)
+		}
+		return h.err
+	}
+	h.opened = true
+	return nil
+}
+
+// Read bytes retrying as necessary
+func (h *reOpen) Read(p []byte) (n int, err error) {
+	h.mu.Lock()
+	defer h.mu.Unlock()
+	if h.err != nil {
+		// return a previous error if there is one
+		return n, h.err
+	}
+	n, err = h.rc.Read(p)
+	if err != nil {
+		h.err = err
+	}
+	h.read += int64(n)
+	if err != nil && err != io.EOF {
+		// close underlying stream
+		h.opened = false
+		_ = h.rc.Close()
+		// reopen stream, clearing error if successful
+		fs.Debugf(h.src, "Reopening on read failure after %d bytes: retry %d/%d: %v", h.read, h.tries, h.maxTries, err)
+		if h.open() == nil {
+			err = nil
+		}
+	}
+	return n, err
+}
+
+// Close the stream
+func (h *reOpen) Close() error {
+	h.mu.Lock()
+	defer h.mu.Unlock()
+	if !h.opened {
+		return errorFileClosed
+	}
+	h.opened = false
+	h.err = errorFileClosed
+	return h.rc.Close()
+}
diff --git a/fs/operations/reopen_test.go b/fs/operations/reopen_test.go
new file mode 100644
index 000000000..c878788bc
--- /dev/null
+++ b/fs/operations/reopen_test.go
@@ -0,0 +1,144 @@
+package operations
+
+import (
+	"io"
+	"io/ioutil"
+	"testing"
+
+	"github.com/ncw/rclone/fs"
+	"github.com/ncw/rclone/fs/hash"
+	"github.com/ncw/rclone/fstest/mockobject"
+	"github.com/pkg/errors"
+	"github.com/stretchr/testify/assert"
+)
+
+// check interface
+var _ io.ReadCloser = (*reOpen)(nil)
+
+var errorTestError = errors.New("test error")
+
+// this is a wrapper for a mockobject with a custom Open function
+//
+// breaks indicate the number of bytes to read before returning an
+// error
+type reOpenTestObject struct {
+	fs.Object
+	breaks []int64
+}
+
+// Open opens the file for read. Call Close() on the returned io.ReadCloser
+//
+// This will break after reading the number of bytes in breaks
+func (o *reOpenTestObject) Open(options ...fs.OpenOption) (io.ReadCloser, error) {
+	rc, err := o.Object.Open(options...)
+	if err != nil {
+		return nil, err
+	}
+	if len(o.breaks) > 0 {
+		// Pop a breakpoint off
+		N := o.breaks[0]
+		o.breaks = o.breaks[1:]
+		// If 0 then return an error immediately
+		if N == 0 {
+			return nil, errorTestError
+		}
+		// Read N bytes then an error
+		r := io.MultiReader(&io.LimitedReader{R: rc, N: N}, errorReader{errorTestError})
+		// Wrap with Close in a new readCloser
+		rc = readCloser{Reader: r, Closer: rc}
+	}
+	return rc, nil
+}
+
+// Return an error only
+type errorReader struct {
+	err error
+}
+
+// Read returning an error
+func (er errorReader) Read(p []byte) (n int, err error) {
+	return 0, er.err
+}
+
+// Contents for the mock object
+var reOpenTestcontents = []byte("0123456789")
+
+// Start the test with the given breaks
+func testReOpen(breaks []int64, maxRetries int) (io.ReadCloser, error) {
+	srcOrig := mockobject.New("potato").WithContent(reOpenTestcontents, mockobject.SeekModeRegular)
+	src := &reOpenTestObject{
+		Object: srcOrig,
+		breaks: breaks,
+	}
+	hashOption := &fs.HashesOption{Hashes: hash.NewHashSet(hash.MD5)}
+	return newReOpen(src, hashOption, maxRetries)
+}
+
+func TestReOpenBasics(t *testing.T) {
+	// open
+	h, err := testReOpen(nil, 10)
+	assert.NoError(t, err)
+
+	// Check contents read correctly
+	got, err := ioutil.ReadAll(h)
+	assert.NoError(t, err)
+	assert.Equal(t, reOpenTestcontents, got)
+
+	// Check read after end
+	var buf = make([]byte, 1)
+	n, err := h.Read(buf)
+	assert.Equal(t, 0, n)
+	assert.Equal(t, io.EOF, err)
+
+	// Check close
+	assert.NoError(t, h.Close())
+
+	// Check double close
+	assert.Equal(t, errorFileClosed, h.Close())
+
+	// Check read after close
+	n, err = h.Read(buf)
+	assert.Equal(t, 0, n)
+	assert.Equal(t, errorFileClosed, err)
+}
+
+func TestReOpenErrorAtStart(t *testing.T) {
+	// open with immediate breaking
+	h, err := testReOpen([]int64{0}, 10)
+	assert.Equal(t, errorTestError, err)
+	assert.Nil(t, h)
+}
+
+func TestReOpenError(t *testing.T) {
+	// open with a few break points but less than the max
+	h, err := testReOpen([]int64{2, 1, 3}, 10)
+	assert.NoError(t, err)
+
+	// check contents
+	got, err := ioutil.ReadAll(h)
+	assert.NoError(t, err)
+	assert.Equal(t, reOpenTestcontents, got)
+
+	// check close
+	assert.NoError(t, h.Close())
+}
+
+func TestReOpenFail(t *testing.T) {
+	// open with a few break points but >= the max
+	h, err := testReOpen([]int64{2, 1, 3}, 3)
+	assert.NoError(t, err)
+
+	// check contents
+	got, err := ioutil.ReadAll(h)
+	assert.Equal(t, errorTestError, err)
+	assert.Equal(t, reOpenTestcontents[:6], got)
+
+	// check old error is returned
+	var buf = make([]byte, 1)
+	n, err := h.Read(buf)
+	assert.Equal(t, 0, n)
+	assert.Equal(t, errorTooManyTries, err)
+
+	// Check close
+	assert.Equal(t, errorFileClosed, h.Close())
+}
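
The behaviour of the shim is easiest to see outside rclone. Below is a
minimal, self-contained sketch of the same reopen-at-offset idea,
stripped of rclone's types: retryReader, the flaky open function, and
errReader are illustrative names, not part of this patch. The real
reOpen additionally serialises access with a mutex, reopens via
fs.SeekOption/fs.HashesOption, and distinguishes errorFileClosed from
errorTooManyTries.

    package main

    import (
        "errors"
        "fmt"
        "io"
        "io/ioutil"
        "strings"
    )

    // errReader always fails, simulating a dropped connection.
    type errReader struct{}

    func (errReader) Read([]byte) (int, error) {
        return 0, errors.New("simulated network failure")
    }

    // retryReader is a simplified version of the reOpen shim: on a read
    // error it calls open() again at the number of bytes already read,
    // so the caller sees one continuous stream.
    type retryReader struct {
        open     func(offset int64) (io.Reader, error)
        r        io.Reader
        read     int64 // bytes handed to the caller so far
        tries    int
        maxTries int
    }

    func (h *retryReader) Read(p []byte) (n int, err error) {
        for {
            n, err = h.r.Read(p)
            h.read += int64(n)
            if err == nil || err == io.EOF {
                return n, err
            }
            // The stream broke - give up if we are out of retries.
            h.tries++
            if h.tries > h.maxTries {
                return n, err
            }
            // Reopen at the current offset, like fs.SeekOption{Offset: h.read}.
            r, openErr := h.open(h.read)
            if openErr != nil {
                return n, err
            }
            h.r = r
            if n > 0 {
                // Deliver what we read; the next Read carries on seamlessly.
                return n, nil
            }
        }
    }

    func main() {
        const data = "0123456789"
        fails := 2 // the first two opens return readers which break after 3 bytes
        open := func(offset int64) (io.Reader, error) {
            r := io.Reader(strings.NewReader(data[offset:]))
            if fails > 0 {
                fails--
                r = io.MultiReader(io.LimitReader(r, 3), errReader{})
            }
            return r, nil
        }
        first, _ := open(0)
        h := &retryReader{open: open, r: first, maxTries: 5}
        got, err := ioutil.ReadAll(h)
        fmt.Printf("got %q, err %v\n", got, err) // got "0123456789", err <nil>
    }

Despite two mid-stream failures, the caller reads the full contents in
one pass, which is exactly what Copy now gets from newReOpen when a
backend's reader dies partway through a large download.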