operations: resume downloads if the reader fails in copy - fixes #2108

This puts a shim on the reader opened by Copy so that if an error is
returned, the reader is re-opened at the correct seek point.

This should make downloading very large files more reliable.
This commit is contained in:
Nick Craig-Wood 2019-02-10 18:20:58 +00:00
parent c3eecbe933
commit 076d3da825
3 changed files with 256 additions and 1 deletions

View file

@ -283,7 +283,7 @@ func Copy(f fs.Fs, dst fs.Object, remote string, src fs.Object) (newDst fs.Objec
// If can't server side copy, do it manually
if err == fs.ErrorCantCopy {
var in0 io.ReadCloser
in0, err = src.Open(hashOption)
in0, err = newReOpen(src, hashOption, fs.Config.LowLevelRetries)
if err != nil {
err = errors.Wrap(err, "failed to open source object")
} else {

111
fs/operations/reopen.go Normal file
View file

@ -0,0 +1,111 @@
package operations
import (
"io"
"sync"
"github.com/ncw/rclone/fs"
"github.com/pkg/errors"
)
// reOpen is a wrapper for an object reader which reopens the stream on error
type reOpen struct {
mu sync.Mutex // mutex to protect the below
src fs.Object // object to open
hashOption *fs.HashesOption // option to pass to initial open
rc io.ReadCloser // underlying stream
read int64 // number of bytes read from this stream
maxTries int // maximum number of retries
tries int // number of retries we've had so far in this stream
err error // if this is set then Read/Close calls will return it
opened bool // if set then rc is valid and needs closing
}
var (
errorFileClosed = errors.New("file already closed")
errorTooManyTries = errors.New("failed to reopen: too many retries")
)
// newReOpen makes a handle which will reopen itself and seek to where it was on errors
func newReOpen(src fs.Object, hashOption *fs.HashesOption, maxTries int) (rc io.ReadCloser, err error) {
h := &reOpen{
src: src,
hashOption: hashOption,
maxTries: maxTries,
}
h.mu.Lock()
defer h.mu.Unlock()
err = h.open()
if err != nil {
return nil, err
}
return h, nil
}
// open the underlying handle - call with lock held
//
// we don't retry here as the Open() call will itself have low level retries
func (h *reOpen) open() error {
var opts = make([]fs.OpenOption, 1)
if h.tries > 0 {
}
if h.read == 0 {
// put hashOption on if reading from the start, ditch otherwise
opts[0] = h.hashOption
} else {
// seek to the read point
opts[0] = &fs.SeekOption{Offset: h.read}
}
h.tries++
if h.tries > h.maxTries {
h.err = errorTooManyTries
} else {
h.rc, h.err = h.src.Open(opts...)
}
if h.err != nil {
if h.tries > 1 {
fs.Debugf(h.src, "Reopen failed after %d bytes read: %v", h.read, h.err)
}
return h.err
}
h.opened = true
return nil
}
// Read bytes retrying as necessary
func (h *reOpen) Read(p []byte) (n int, err error) {
h.mu.Lock()
defer h.mu.Unlock()
if h.err != nil {
// return a previous error if there is one
return n, h.err
}
n, err = h.rc.Read(p)
if err != nil {
h.err = err
}
h.read += int64(n)
if err != nil && err != io.EOF {
// close underlying stream
h.opened = false
_ = h.rc.Close()
// reopen stream, clearing error if successful
fs.Debugf(h.src, "Reopening on read failure after %d bytes: retry %d/%d: %v", h.read, h.tries, h.maxTries, err)
if h.open() == nil {
err = nil
}
}
return n, err
}
// Close the stream
func (h *reOpen) Close() error {
h.mu.Lock()
defer h.mu.Unlock()
if !h.opened {
return errorFileClosed
}
h.opened = false
h.err = errorFileClosed
return h.rc.Close()
}

View file

@ -0,0 +1,144 @@
package operations
import (
"io"
"io/ioutil"
"testing"
"github.com/ncw/rclone/fs"
"github.com/ncw/rclone/fs/hash"
"github.com/ncw/rclone/fstest/mockobject"
"github.com/pkg/errors"
"github.com/stretchr/testify/assert"
)
// check interface
var _ io.ReadCloser = (*reOpen)(nil)
var errorTestError = errors.New("test error")
// this is a wrapper for an mockobject with a custom Open function
//
// breaks indicate the number of bytes to read before returning an
// error
type reOpenTestObject struct {
fs.Object
breaks []int64
}
// Open opens the file for read. Call Close() on the returned io.ReadCloser
//
// This will break after reading the number of bytes in breaks
func (o *reOpenTestObject) Open(options ...fs.OpenOption) (io.ReadCloser, error) {
rc, err := o.Object.Open(options...)
if err != nil {
return nil, err
}
if len(o.breaks) > 0 {
// Pop a breakpoint off
N := o.breaks[0]
o.breaks = o.breaks[1:]
// If 0 then return an error immediately
if N == 0 {
return nil, errorTestError
}
// Read N bytes then an error
r := io.MultiReader(&io.LimitedReader{R: rc, N: N}, errorReader{errorTestError})
// Wrap with Close in a new readCloser
rc = readCloser{Reader: r, Closer: rc}
}
return rc, nil
}
// Return an error only
type errorReader struct {
err error
}
// Read returning an error
func (er errorReader) Read(p []byte) (n int, err error) {
return 0, er.err
}
// Contents for the mock object
var reOpenTestcontents = []byte("0123456789")
// Start the test with the given breaks
func testReOpen(breaks []int64, maxRetries int) (io.ReadCloser, error) {
srcOrig := mockobject.New("potato").WithContent(reOpenTestcontents, mockobject.SeekModeRegular)
src := &reOpenTestObject{
Object: srcOrig,
breaks: breaks,
}
hashOption := &fs.HashesOption{Hashes: hash.NewHashSet(hash.MD5)}
return newReOpen(src, hashOption, maxRetries)
}
func TestReOpenBasics(t *testing.T) {
// open
h, err := testReOpen(nil, 10)
assert.NoError(t, err)
// Check contents read correctly
got, err := ioutil.ReadAll(h)
assert.NoError(t, err)
assert.Equal(t, reOpenTestcontents, got)
// Check read after end
var buf = make([]byte, 1)
n, err := h.Read(buf)
assert.Equal(t, 0, n)
assert.Equal(t, io.EOF, err)
// Check close
assert.NoError(t, h.Close())
// Check double close
assert.Equal(t, errorFileClosed, h.Close())
// Check read after close
n, err = h.Read(buf)
assert.Equal(t, 0, n)
assert.Equal(t, errorFileClosed, err)
}
func TestReOpenErrorAtStart(t *testing.T) {
// open with immediate breaking
h, err := testReOpen([]int64{0}, 10)
assert.Equal(t, errorTestError, err)
assert.Nil(t, h)
}
func TestReOpenError(t *testing.T) {
// open with a few break points but less than the max
h, err := testReOpen([]int64{2, 1, 3}, 10)
assert.NoError(t, err)
// check contents
got, err := ioutil.ReadAll(h)
assert.NoError(t, err)
assert.Equal(t, reOpenTestcontents, got)
// check close
assert.NoError(t, h.Close())
}
func TestReOpenFail(t *testing.T) {
// open with a few break points but >= the max
h, err := testReOpen([]int64{2, 1, 3}, 3)
assert.NoError(t, err)
// check contents
got, err := ioutil.ReadAll(h)
assert.Equal(t, errorTestError, err)
assert.Equal(t, reOpenTestcontents[:6], got)
// check old error is returned
var buf = make([]byte, 1)
n, err := h.Read(buf)
assert.Equal(t, 0, n)
assert.Equal(t, errorTooManyTries, err)
// Check close
assert.Equal(t, errorFileClosed, h.Close())
}