operations: re-work reopen framework so it can take a RangeOption #2252
This is in preparation for multipart downloads.
This commit is contained in:
parent
7c4fe3eb75
commit
0655738da6
3 changed files with 130 additions and 100 deletions
|
@ -289,7 +289,7 @@ func Copy(f fs.Fs, dst fs.Object, remote string, src fs.Object) (newDst fs.Objec
|
||||||
// If can't server side copy, do it manually
|
// If can't server side copy, do it manually
|
||||||
if err == fs.ErrorCantCopy {
|
if err == fs.ErrorCantCopy {
|
||||||
var in0 io.ReadCloser
|
var in0 io.ReadCloser
|
||||||
in0, err = newReOpen(src, hashOption, fs.Config.LowLevelRetries)
|
in0, err = newReOpen(src, hashOption, nil, fs.Config.LowLevelRetries)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
err = errors.Wrap(err, "failed to open source object")
|
err = errors.Wrap(err, "failed to open source object")
|
||||||
} else {
|
} else {
|
||||||
|
|
|
@ -10,15 +10,16 @@ import (
|
||||||
|
|
||||||
// reOpen is a wrapper for an object reader which reopens the stream on error
|
// reOpen is a wrapper for an object reader which reopens the stream on error
|
||||||
type reOpen struct {
|
type reOpen struct {
|
||||||
mu sync.Mutex // mutex to protect the below
|
mu sync.Mutex // mutex to protect the below
|
||||||
src fs.Object // object to open
|
src fs.Object // object to open
|
||||||
hashOption *fs.HashesOption // option to pass to initial open
|
hashOption *fs.HashesOption // option to pass to initial open
|
||||||
rc io.ReadCloser // underlying stream
|
rangeOption *fs.RangeOption // option to pass to initial open
|
||||||
read int64 // number of bytes read from this stream
|
rc io.ReadCloser // underlying stream
|
||||||
maxTries int // maximum number of retries
|
read int64 // number of bytes read from this stream
|
||||||
tries int // number of retries we've had so far in this stream
|
maxTries int // maximum number of retries
|
||||||
err error // if this is set then Read/Close calls will return it
|
tries int // number of retries we've had so far in this stream
|
||||||
opened bool // if set then rc is valid and needs closing
|
err error // if this is set then Read/Close calls will return it
|
||||||
|
opened bool // if set then rc is valid and needs closing
|
||||||
}
|
}
|
||||||
|
|
||||||
var (
|
var (
|
||||||
|
@ -27,11 +28,17 @@ var (
|
||||||
)
|
)
|
||||||
|
|
||||||
// newReOpen makes a handle which will reopen itself and seek to where it was on errors
|
// newReOpen makes a handle which will reopen itself and seek to where it was on errors
|
||||||
func newReOpen(src fs.Object, hashOption *fs.HashesOption, maxTries int) (rc io.ReadCloser, err error) {
|
//
|
||||||
|
// If hashOption is set this will be applied when reading from the start
|
||||||
|
//
|
||||||
|
// If rangeOption is set then this will applied when reading from the
|
||||||
|
// start, and updated on retries.
|
||||||
|
func newReOpen(src fs.Object, hashOption *fs.HashesOption, rangeOption *fs.RangeOption, maxTries int) (rc io.ReadCloser, err error) {
|
||||||
h := &reOpen{
|
h := &reOpen{
|
||||||
src: src,
|
src: src,
|
||||||
hashOption: hashOption,
|
hashOption: hashOption,
|
||||||
maxTries: maxTries,
|
rangeOption: rangeOption,
|
||||||
|
maxTries: maxTries,
|
||||||
}
|
}
|
||||||
h.mu.Lock()
|
h.mu.Lock()
|
||||||
defer h.mu.Unlock()
|
defer h.mu.Unlock()
|
||||||
|
@ -46,15 +53,24 @@ func newReOpen(src fs.Object, hashOption *fs.HashesOption, maxTries int) (rc io.
|
||||||
//
|
//
|
||||||
// we don't retry here as the Open() call will itself have low level retries
|
// we don't retry here as the Open() call will itself have low level retries
|
||||||
func (h *reOpen) open() error {
|
func (h *reOpen) open() error {
|
||||||
var opts = make([]fs.OpenOption, 1)
|
var optsArray [2]fs.OpenOption
|
||||||
if h.tries > 0 {
|
var opts = optsArray[:0]
|
||||||
}
|
|
||||||
if h.read == 0 {
|
if h.read == 0 {
|
||||||
// put hashOption on if reading from the start, ditch otherwise
|
if h.rangeOption != nil {
|
||||||
opts[0] = h.hashOption
|
opts = append(opts, h.rangeOption)
|
||||||
|
}
|
||||||
|
if h.hashOption != nil {
|
||||||
|
// put hashOption on if reading from the start, ditch otherwise
|
||||||
|
opts = append(opts, h.hashOption)
|
||||||
|
}
|
||||||
} else {
|
} else {
|
||||||
// seek to the read point
|
if h.rangeOption != nil {
|
||||||
opts[0] = &fs.SeekOption{Offset: h.read}
|
// range to the read point
|
||||||
|
opts = append(opts, &fs.RangeOption{Start: h.rangeOption.Start + h.read, End: h.rangeOption.End})
|
||||||
|
} else {
|
||||||
|
// seek to the read point
|
||||||
|
opts = append(opts, &fs.SeekOption{Offset: h.read})
|
||||||
|
}
|
||||||
}
|
}
|
||||||
h.tries++
|
h.tries++
|
||||||
if h.tries > h.maxTries {
|
if h.tries > h.maxTries {
|
||||||
|
|
|
@ -60,85 +60,99 @@ func (er errorReader) Read(p []byte) (n int, err error) {
|
||||||
return 0, er.err
|
return 0, er.err
|
||||||
}
|
}
|
||||||
|
|
||||||
// Contents for the mock object
|
func TestReOpen(t *testing.T) {
|
||||||
var reOpenTestcontents = []byte("0123456789")
|
for testIndex, testName := range []string{"Seek", "Range"} {
|
||||||
|
t.Run(testName, func(t *testing.T) {
|
||||||
|
// Contents for the mock object
|
||||||
|
var (
|
||||||
|
reOpenTestcontents = []byte("0123456789")
|
||||||
|
expectedRead = reOpenTestcontents
|
||||||
|
rangeOption *fs.RangeOption
|
||||||
|
)
|
||||||
|
if testIndex > 0 {
|
||||||
|
rangeOption = &fs.RangeOption{Start: 1, End: 7}
|
||||||
|
expectedRead = reOpenTestcontents[1:8]
|
||||||
|
}
|
||||||
|
|
||||||
// Start the test with the given breaks
|
// Start the test with the given breaks
|
||||||
func testReOpen(breaks []int64, maxRetries int) (io.ReadCloser, error) {
|
testReOpen := func(breaks []int64, maxRetries int) (io.ReadCloser, error) {
|
||||||
srcOrig := mockobject.New("potato").WithContent(reOpenTestcontents, mockobject.SeekModeRegular)
|
srcOrig := mockobject.New("potato").WithContent(reOpenTestcontents, mockobject.SeekModeNone)
|
||||||
src := &reOpenTestObject{
|
src := &reOpenTestObject{
|
||||||
Object: srcOrig,
|
Object: srcOrig,
|
||||||
breaks: breaks,
|
breaks: breaks,
|
||||||
|
}
|
||||||
|
hashOption := &fs.HashesOption{Hashes: hash.NewHashSet(hash.MD5)}
|
||||||
|
return newReOpen(src, hashOption, rangeOption, maxRetries)
|
||||||
|
}
|
||||||
|
|
||||||
|
t.Run("Basics", func(t *testing.T) {
|
||||||
|
// open
|
||||||
|
h, err := testReOpen(nil, 10)
|
||||||
|
assert.NoError(t, err)
|
||||||
|
|
||||||
|
// Check contents read correctly
|
||||||
|
got, err := ioutil.ReadAll(h)
|
||||||
|
assert.NoError(t, err)
|
||||||
|
assert.Equal(t, expectedRead, got)
|
||||||
|
|
||||||
|
// Check read after end
|
||||||
|
var buf = make([]byte, 1)
|
||||||
|
n, err := h.Read(buf)
|
||||||
|
assert.Equal(t, 0, n)
|
||||||
|
assert.Equal(t, io.EOF, err)
|
||||||
|
|
||||||
|
// Check close
|
||||||
|
assert.NoError(t, h.Close())
|
||||||
|
|
||||||
|
// Check double close
|
||||||
|
assert.Equal(t, errorFileClosed, h.Close())
|
||||||
|
|
||||||
|
// Check read after close
|
||||||
|
n, err = h.Read(buf)
|
||||||
|
assert.Equal(t, 0, n)
|
||||||
|
assert.Equal(t, errorFileClosed, err)
|
||||||
|
})
|
||||||
|
|
||||||
|
t.Run("ErrorAtStart", func(t *testing.T) {
|
||||||
|
// open with immediate breaking
|
||||||
|
h, err := testReOpen([]int64{0}, 10)
|
||||||
|
assert.Equal(t, errorTestError, err)
|
||||||
|
assert.Nil(t, h)
|
||||||
|
})
|
||||||
|
|
||||||
|
t.Run("WithErrors", func(t *testing.T) {
|
||||||
|
// open with a few break points but less than the max
|
||||||
|
h, err := testReOpen([]int64{2, 1, 3}, 10)
|
||||||
|
assert.NoError(t, err)
|
||||||
|
|
||||||
|
// check contents
|
||||||
|
got, err := ioutil.ReadAll(h)
|
||||||
|
assert.NoError(t, err)
|
||||||
|
assert.Equal(t, expectedRead, got)
|
||||||
|
|
||||||
|
// check close
|
||||||
|
assert.NoError(t, h.Close())
|
||||||
|
})
|
||||||
|
|
||||||
|
t.Run("TooManyErrors", func(t *testing.T) {
|
||||||
|
// open with a few break points but >= the max
|
||||||
|
h, err := testReOpen([]int64{2, 1, 3}, 3)
|
||||||
|
assert.NoError(t, err)
|
||||||
|
|
||||||
|
// check contents
|
||||||
|
got, err := ioutil.ReadAll(h)
|
||||||
|
assert.Equal(t, errorTestError, err)
|
||||||
|
assert.Equal(t, expectedRead[:6], got)
|
||||||
|
|
||||||
|
// check old error is returned
|
||||||
|
var buf = make([]byte, 1)
|
||||||
|
n, err := h.Read(buf)
|
||||||
|
assert.Equal(t, 0, n)
|
||||||
|
assert.Equal(t, errorTooManyTries, err)
|
||||||
|
|
||||||
|
// Check close
|
||||||
|
assert.Equal(t, errorFileClosed, h.Close())
|
||||||
|
})
|
||||||
|
})
|
||||||
}
|
}
|
||||||
hashOption := &fs.HashesOption{Hashes: hash.NewHashSet(hash.MD5)}
|
|
||||||
return newReOpen(src, hashOption, maxRetries)
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestReOpenBasics(t *testing.T) {
|
|
||||||
// open
|
|
||||||
h, err := testReOpen(nil, 10)
|
|
||||||
assert.NoError(t, err)
|
|
||||||
|
|
||||||
// Check contents read correctly
|
|
||||||
got, err := ioutil.ReadAll(h)
|
|
||||||
assert.NoError(t, err)
|
|
||||||
assert.Equal(t, reOpenTestcontents, got)
|
|
||||||
|
|
||||||
// Check read after end
|
|
||||||
var buf = make([]byte, 1)
|
|
||||||
n, err := h.Read(buf)
|
|
||||||
assert.Equal(t, 0, n)
|
|
||||||
assert.Equal(t, io.EOF, err)
|
|
||||||
|
|
||||||
// Check close
|
|
||||||
assert.NoError(t, h.Close())
|
|
||||||
|
|
||||||
// Check double close
|
|
||||||
assert.Equal(t, errorFileClosed, h.Close())
|
|
||||||
|
|
||||||
// Check read after close
|
|
||||||
n, err = h.Read(buf)
|
|
||||||
assert.Equal(t, 0, n)
|
|
||||||
assert.Equal(t, errorFileClosed, err)
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestReOpenErrorAtStart(t *testing.T) {
|
|
||||||
// open with immediate breaking
|
|
||||||
h, err := testReOpen([]int64{0}, 10)
|
|
||||||
assert.Equal(t, errorTestError, err)
|
|
||||||
assert.Nil(t, h)
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestReOpenError(t *testing.T) {
|
|
||||||
// open with a few break points but less than the max
|
|
||||||
h, err := testReOpen([]int64{2, 1, 3}, 10)
|
|
||||||
assert.NoError(t, err)
|
|
||||||
|
|
||||||
// check contents
|
|
||||||
got, err := ioutil.ReadAll(h)
|
|
||||||
assert.NoError(t, err)
|
|
||||||
assert.Equal(t, reOpenTestcontents, got)
|
|
||||||
|
|
||||||
// check close
|
|
||||||
assert.NoError(t, h.Close())
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestReOpenFail(t *testing.T) {
|
|
||||||
// open with a few break points but >= the max
|
|
||||||
h, err := testReOpen([]int64{2, 1, 3}, 3)
|
|
||||||
assert.NoError(t, err)
|
|
||||||
|
|
||||||
// check contents
|
|
||||||
got, err := ioutil.ReadAll(h)
|
|
||||||
assert.Equal(t, errorTestError, err)
|
|
||||||
assert.Equal(t, reOpenTestcontents[:6], got)
|
|
||||||
|
|
||||||
// check old error is returned
|
|
||||||
var buf = make([]byte, 1)
|
|
||||||
n, err := h.Read(buf)
|
|
||||||
assert.Equal(t, 0, n)
|
|
||||||
assert.Equal(t, errorTooManyTries, err)
|
|
||||||
|
|
||||||
// Check close
|
|
||||||
assert.Equal(t, errorFileClosed, h.Close())
|
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in a new issue