From cdeeff988ee2f528d5e8fa6cadd1c35d6ec12cbe Mon Sep 17 00:00:00 2001 From: Yoni Jah Date: Thu, 6 Apr 2017 00:19:26 +0800 Subject: [PATCH] Added RepetableReader to fs. used in OneDrive with io.LimitedReader to display accurate speed --- fs/readers.go | 65 ++++++++++++++++++++++++++++ fs/readers_test.go | 100 +++++++++++++++++++++++++++++++++++++++++++ onedrive/onedrive.go | 23 ++++------ 3 files changed, 173 insertions(+), 15 deletions(-) create mode 100644 fs/readers.go create mode 100644 fs/readers_test.go diff --git a/fs/readers.go b/fs/readers.go new file mode 100644 index 000000000..ef5b0c471 --- /dev/null +++ b/fs/readers.go @@ -0,0 +1,65 @@ +package fs + +import ( + "io" + + "github.com/pkg/errors" +) + +// A RepeatableReader implements the io.ReadSeeker it allow to seek cached data +// back and forth within the reader but will only read data from the internal Reader as necessary +// and will play nicely with the Account and io.LimitedReader to reflect current speed +type RepeatableReader struct { + in io.Reader // Input reader + i int64 // current reading index + b []byte // internal cache buffer +} + +var _ io.ReadSeeker = (*RepeatableReader)(nil) + +// Seek implements the io.Seeker interface. +// If seek position is passed the cache buffer length the function will return +// the maximum offset that can be used and "fs.RepeatableReader.Seek: offset is unavailable" Error +func (r *RepeatableReader) Seek(offset int64, whence int) (int64, error) { + var abs int64 + cacheLen := int64(len(r.b)) + switch whence { + case 0: //io.SeekStart + abs = offset + case 1: //io.SeekCurrent + abs = r.i + offset + case 2: //io.SeekEnd + abs = cacheLen + offset + default: + return 0, errors.New("fs.RepeatableReader.Seek: invalid whence") + } + if abs < 0 { + return 0, errors.New("fs.RepeatableReader.Seek: negative position") + } + if abs > cacheLen { + return offset - (abs - cacheLen), errors.New("fs.RepeatableReader.Seek: offset is unavailable") + } + r.i = abs + return abs, nil +} + +// Read data from original Reader into bytes +// Data is either served from the underlying Reader or from cache if was already read +func (r *RepeatableReader) Read(b []byte) (n int, err error) { + cacheLen := int64(len(r.b)) + if r.i == cacheLen { + n, err = r.in.Read(b) + if n > 0 { + r.b = append(r.b, b[:n]...) + } + } else { + n = copy(b, r.b[r.i:]) + } + r.i += int64(n) + return n, err +} + +// NewRepeatableReader create new repeatable reader from Reader r +func NewRepeatableReader(r io.Reader) *RepeatableReader { + return &RepeatableReader{in: r} +} diff --git a/fs/readers_test.go b/fs/readers_test.go new file mode 100644 index 000000000..713177f8c --- /dev/null +++ b/fs/readers_test.go @@ -0,0 +1,100 @@ +package fs + +import ( + "bytes" + "io" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestRepeatableReader(t *testing.T) { + var dst []byte + var n int + var pos int64 + var err error + + b := []byte("Testbuffer") + buf := bytes.NewBuffer(b) + r := NewRepeatableReader(buf) + + dst = make([]byte, 100) + n, err = r.Read(dst) + assert.Nil(t, err) + assert.Equal(t, 10, n) + require.Equal(t, b, dst[0:10]) + + // Test read EOF + n, err = r.Read(dst) + assert.Equal(t, io.EOF, err) + assert.Equal(t, 0, n) + + // Test Seek Back to start + dst = make([]byte, 10) + pos, err = r.Seek(0, 0) + assert.Nil(t, err) + require.Equal(t, 0, int(pos)) + + n, err = r.Read(dst) + assert.Nil(t, err) + assert.Equal(t, 10, n) + require.Equal(t, b, dst) + + // Test partial read + buf = bytes.NewBuffer(b) + r = NewRepeatableReader(buf) + dst = make([]byte, 5) + n, err = r.Read(dst) + assert.Nil(t, err) + assert.Equal(t, 5, n) + require.Equal(t, b[0:5], dst) + n, err = r.Read(dst) + assert.Nil(t, err) + assert.Equal(t, 5, n) + require.Equal(t, b[5:], dst) + + // Test Seek + buf = bytes.NewBuffer(b) + r = NewRepeatableReader(buf) + // Should not allow seek past cache index + pos, err = r.Seek(5, 1) + assert.NotNil(t, err) + assert.Equal(t, "fs.RepeatableReader.Seek: offset is unavailable", err.Error()) + assert.Equal(t, 0, int(pos)) + + // Should not allow seek to negative position start + pos, err = r.Seek(-1, 1) + assert.NotNil(t, err) + assert.Equal(t, "fs.RepeatableReader.Seek: negative position", err.Error()) + assert.Equal(t, 0, int(pos)) + + // Should not allow seek with invalid whence + pos, err = r.Seek(0, 3) + assert.NotNil(t, err) + assert.Equal(t, "fs.RepeatableReader.Seek: invalid whence", err.Error()) + assert.Equal(t, 0, int(pos)) + + // Should seek from index with io.SeekCurrent(1) whence + dst = make([]byte, 5) + _, _ = r.Read(dst) + pos, err = r.Seek(-3, 1) + assert.Nil(t, err) + require.Equal(t, 2, int(pos)) + pos, err = r.Seek(1, 1) + assert.Nil(t, err) + require.Equal(t, 3, int(pos)) + + // Should seek from cache end with io.SeekEnd(2) whence + pos, err = r.Seek(-3, 2) + assert.Nil(t, err) + require.Equal(t, 2, int(pos)) + + // Should read from seek postion and past it + dst = make([]byte, 5) + n, err = io.ReadFull(r, dst) + assert.Nil(t, err) + assert.Equal(t, 5, n) + assert.Equal(t, b[2:7], dst) + +} diff --git a/onedrive/onedrive.go b/onedrive/onedrive.go index 5fd710d12..3ac2cbdb9 100644 --- a/onedrive/onedrive.go +++ b/onedrive/onedrive.go @@ -3,7 +3,6 @@ package onedrive import ( - "bytes" "encoding/json" "fmt" "io" @@ -923,21 +922,20 @@ func (o *Object) createUploadSession() (response *api.CreateUploadResponse, err } // uploadFragment uploads a part -func (o *Object) uploadFragment(url string, start int64, totalSize int64, buf []byte) (err error) { - bufSize := int64(len(buf)) - bufReader := bytes.NewReader(buf) +func (o *Object) uploadFragment(url string, start int64, totalSize int64, chunk io.ReadSeeker, chunkSize int64) (err error) { opts := rest.Opts{ Method: "PUT", Path: url, Absolute: true, - ContentLength: &bufSize, - ContentRange: fmt.Sprintf("bytes %d-%d/%d", start, start+bufSize-1, totalSize), - Body: bufReader, + ContentLength: &chunkSize, + ContentRange: fmt.Sprintf("bytes %d-%d/%d", start, start+chunkSize-1, totalSize), + Body: chunk, } + fs.Debugf(o, "OPTS: %s", opts.ContentRange) var response api.UploadFragmentResponse var resp *http.Response err = o.fs.pacer.Call(func() (bool, error) { - _, _ = bufReader.Seek(0, 0) + _, _ = chunk.Seek(0, 0) resp, err = o.fs.srv.CallJSON(&opts, nil, &response) return shouldRetry(resp, err) }) @@ -988,19 +986,14 @@ func (o *Object) uploadMultipart(in io.Reader, size int64) (err error) { // Upload the chunks remaining := size position := int64(0) - buf := make([]byte, int64(chunkSize)) for remaining > 0 { n := int64(chunkSize) if remaining < n { n = remaining - buf = buf[:n] - } - _, err = io.ReadFull(in, buf) - if err != nil { - return err } + seg := fs.NewRepeatableReader(io.LimitReader(in, n)) fs.Debugf(o, "Uploading segment %d/%d size %d", position, size, n) - err = o.uploadFragment(uploadURL, position, size, buf) + err = o.uploadFragment(uploadURL, position, size, seg, n) if err != nil { return err }