forked from TrueCloudLab/rclone
Added RepetableReader to fs. used in OneDrive with io.LimitedReader to display accurate speed
This commit is contained in:
parent
930ff266f2
commit
cdeeff988e
3 changed files with 173 additions and 15 deletions
65
fs/readers.go
Normal file
65
fs/readers.go
Normal file
|
@ -0,0 +1,65 @@
|
||||||
|
package fs
|
||||||
|
|
||||||
|
import (
|
||||||
|
"io"
|
||||||
|
|
||||||
|
"github.com/pkg/errors"
|
||||||
|
)
|
||||||
|
|
||||||
|
// A RepeatableReader implements the io.ReadSeeker it allow to seek cached data
|
||||||
|
// back and forth within the reader but will only read data from the internal Reader as necessary
|
||||||
|
// and will play nicely with the Account and io.LimitedReader to reflect current speed
|
||||||
|
type RepeatableReader struct {
|
||||||
|
in io.Reader // Input reader
|
||||||
|
i int64 // current reading index
|
||||||
|
b []byte // internal cache buffer
|
||||||
|
}
|
||||||
|
|
||||||
|
var _ io.ReadSeeker = (*RepeatableReader)(nil)
|
||||||
|
|
||||||
|
// Seek implements the io.Seeker interface.
|
||||||
|
// If seek position is passed the cache buffer length the function will return
|
||||||
|
// the maximum offset that can be used and "fs.RepeatableReader.Seek: offset is unavailable" Error
|
||||||
|
func (r *RepeatableReader) Seek(offset int64, whence int) (int64, error) {
|
||||||
|
var abs int64
|
||||||
|
cacheLen := int64(len(r.b))
|
||||||
|
switch whence {
|
||||||
|
case 0: //io.SeekStart
|
||||||
|
abs = offset
|
||||||
|
case 1: //io.SeekCurrent
|
||||||
|
abs = r.i + offset
|
||||||
|
case 2: //io.SeekEnd
|
||||||
|
abs = cacheLen + offset
|
||||||
|
default:
|
||||||
|
return 0, errors.New("fs.RepeatableReader.Seek: invalid whence")
|
||||||
|
}
|
||||||
|
if abs < 0 {
|
||||||
|
return 0, errors.New("fs.RepeatableReader.Seek: negative position")
|
||||||
|
}
|
||||||
|
if abs > cacheLen {
|
||||||
|
return offset - (abs - cacheLen), errors.New("fs.RepeatableReader.Seek: offset is unavailable")
|
||||||
|
}
|
||||||
|
r.i = abs
|
||||||
|
return abs, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// Read data from original Reader into bytes
|
||||||
|
// Data is either served from the underlying Reader or from cache if was already read
|
||||||
|
func (r *RepeatableReader) Read(b []byte) (n int, err error) {
|
||||||
|
cacheLen := int64(len(r.b))
|
||||||
|
if r.i == cacheLen {
|
||||||
|
n, err = r.in.Read(b)
|
||||||
|
if n > 0 {
|
||||||
|
r.b = append(r.b, b[:n]...)
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
n = copy(b, r.b[r.i:])
|
||||||
|
}
|
||||||
|
r.i += int64(n)
|
||||||
|
return n, err
|
||||||
|
}
|
||||||
|
|
||||||
|
// NewRepeatableReader create new repeatable reader from Reader r
|
||||||
|
func NewRepeatableReader(r io.Reader) *RepeatableReader {
|
||||||
|
return &RepeatableReader{in: r}
|
||||||
|
}
|
100
fs/readers_test.go
Normal file
100
fs/readers_test.go
Normal file
|
@ -0,0 +1,100 @@
|
||||||
|
package fs
|
||||||
|
|
||||||
|
import (
|
||||||
|
"bytes"
|
||||||
|
"io"
|
||||||
|
"testing"
|
||||||
|
|
||||||
|
"github.com/stretchr/testify/assert"
|
||||||
|
"github.com/stretchr/testify/require"
|
||||||
|
)
|
||||||
|
|
||||||
|
func TestRepeatableReader(t *testing.T) {
|
||||||
|
var dst []byte
|
||||||
|
var n int
|
||||||
|
var pos int64
|
||||||
|
var err error
|
||||||
|
|
||||||
|
b := []byte("Testbuffer")
|
||||||
|
buf := bytes.NewBuffer(b)
|
||||||
|
r := NewRepeatableReader(buf)
|
||||||
|
|
||||||
|
dst = make([]byte, 100)
|
||||||
|
n, err = r.Read(dst)
|
||||||
|
assert.Nil(t, err)
|
||||||
|
assert.Equal(t, 10, n)
|
||||||
|
require.Equal(t, b, dst[0:10])
|
||||||
|
|
||||||
|
// Test read EOF
|
||||||
|
n, err = r.Read(dst)
|
||||||
|
assert.Equal(t, io.EOF, err)
|
||||||
|
assert.Equal(t, 0, n)
|
||||||
|
|
||||||
|
// Test Seek Back to start
|
||||||
|
dst = make([]byte, 10)
|
||||||
|
pos, err = r.Seek(0, 0)
|
||||||
|
assert.Nil(t, err)
|
||||||
|
require.Equal(t, 0, int(pos))
|
||||||
|
|
||||||
|
n, err = r.Read(dst)
|
||||||
|
assert.Nil(t, err)
|
||||||
|
assert.Equal(t, 10, n)
|
||||||
|
require.Equal(t, b, dst)
|
||||||
|
|
||||||
|
// Test partial read
|
||||||
|
buf = bytes.NewBuffer(b)
|
||||||
|
r = NewRepeatableReader(buf)
|
||||||
|
dst = make([]byte, 5)
|
||||||
|
n, err = r.Read(dst)
|
||||||
|
assert.Nil(t, err)
|
||||||
|
assert.Equal(t, 5, n)
|
||||||
|
require.Equal(t, b[0:5], dst)
|
||||||
|
n, err = r.Read(dst)
|
||||||
|
assert.Nil(t, err)
|
||||||
|
assert.Equal(t, 5, n)
|
||||||
|
require.Equal(t, b[5:], dst)
|
||||||
|
|
||||||
|
// Test Seek
|
||||||
|
buf = bytes.NewBuffer(b)
|
||||||
|
r = NewRepeatableReader(buf)
|
||||||
|
// Should not allow seek past cache index
|
||||||
|
pos, err = r.Seek(5, 1)
|
||||||
|
assert.NotNil(t, err)
|
||||||
|
assert.Equal(t, "fs.RepeatableReader.Seek: offset is unavailable", err.Error())
|
||||||
|
assert.Equal(t, 0, int(pos))
|
||||||
|
|
||||||
|
// Should not allow seek to negative position start
|
||||||
|
pos, err = r.Seek(-1, 1)
|
||||||
|
assert.NotNil(t, err)
|
||||||
|
assert.Equal(t, "fs.RepeatableReader.Seek: negative position", err.Error())
|
||||||
|
assert.Equal(t, 0, int(pos))
|
||||||
|
|
||||||
|
// Should not allow seek with invalid whence
|
||||||
|
pos, err = r.Seek(0, 3)
|
||||||
|
assert.NotNil(t, err)
|
||||||
|
assert.Equal(t, "fs.RepeatableReader.Seek: invalid whence", err.Error())
|
||||||
|
assert.Equal(t, 0, int(pos))
|
||||||
|
|
||||||
|
// Should seek from index with io.SeekCurrent(1) whence
|
||||||
|
dst = make([]byte, 5)
|
||||||
|
_, _ = r.Read(dst)
|
||||||
|
pos, err = r.Seek(-3, 1)
|
||||||
|
assert.Nil(t, err)
|
||||||
|
require.Equal(t, 2, int(pos))
|
||||||
|
pos, err = r.Seek(1, 1)
|
||||||
|
assert.Nil(t, err)
|
||||||
|
require.Equal(t, 3, int(pos))
|
||||||
|
|
||||||
|
// Should seek from cache end with io.SeekEnd(2) whence
|
||||||
|
pos, err = r.Seek(-3, 2)
|
||||||
|
assert.Nil(t, err)
|
||||||
|
require.Equal(t, 2, int(pos))
|
||||||
|
|
||||||
|
// Should read from seek postion and past it
|
||||||
|
dst = make([]byte, 5)
|
||||||
|
n, err = io.ReadFull(r, dst)
|
||||||
|
assert.Nil(t, err)
|
||||||
|
assert.Equal(t, 5, n)
|
||||||
|
assert.Equal(t, b[2:7], dst)
|
||||||
|
|
||||||
|
}
|
|
@ -3,7 +3,6 @@
|
||||||
package onedrive
|
package onedrive
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"bytes"
|
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
"fmt"
|
"fmt"
|
||||||
"io"
|
"io"
|
||||||
|
@ -923,21 +922,20 @@ func (o *Object) createUploadSession() (response *api.CreateUploadResponse, err
|
||||||
}
|
}
|
||||||
|
|
||||||
// uploadFragment uploads a part
|
// uploadFragment uploads a part
|
||||||
func (o *Object) uploadFragment(url string, start int64, totalSize int64, buf []byte) (err error) {
|
func (o *Object) uploadFragment(url string, start int64, totalSize int64, chunk io.ReadSeeker, chunkSize int64) (err error) {
|
||||||
bufSize := int64(len(buf))
|
|
||||||
bufReader := bytes.NewReader(buf)
|
|
||||||
opts := rest.Opts{
|
opts := rest.Opts{
|
||||||
Method: "PUT",
|
Method: "PUT",
|
||||||
Path: url,
|
Path: url,
|
||||||
Absolute: true,
|
Absolute: true,
|
||||||
ContentLength: &bufSize,
|
ContentLength: &chunkSize,
|
||||||
ContentRange: fmt.Sprintf("bytes %d-%d/%d", start, start+bufSize-1, totalSize),
|
ContentRange: fmt.Sprintf("bytes %d-%d/%d", start, start+chunkSize-1, totalSize),
|
||||||
Body: bufReader,
|
Body: chunk,
|
||||||
}
|
}
|
||||||
|
fs.Debugf(o, "OPTS: %s", opts.ContentRange)
|
||||||
var response api.UploadFragmentResponse
|
var response api.UploadFragmentResponse
|
||||||
var resp *http.Response
|
var resp *http.Response
|
||||||
err = o.fs.pacer.Call(func() (bool, error) {
|
err = o.fs.pacer.Call(func() (bool, error) {
|
||||||
_, _ = bufReader.Seek(0, 0)
|
_, _ = chunk.Seek(0, 0)
|
||||||
resp, err = o.fs.srv.CallJSON(&opts, nil, &response)
|
resp, err = o.fs.srv.CallJSON(&opts, nil, &response)
|
||||||
return shouldRetry(resp, err)
|
return shouldRetry(resp, err)
|
||||||
})
|
})
|
||||||
|
@ -988,19 +986,14 @@ func (o *Object) uploadMultipart(in io.Reader, size int64) (err error) {
|
||||||
// Upload the chunks
|
// Upload the chunks
|
||||||
remaining := size
|
remaining := size
|
||||||
position := int64(0)
|
position := int64(0)
|
||||||
buf := make([]byte, int64(chunkSize))
|
|
||||||
for remaining > 0 {
|
for remaining > 0 {
|
||||||
n := int64(chunkSize)
|
n := int64(chunkSize)
|
||||||
if remaining < n {
|
if remaining < n {
|
||||||
n = remaining
|
n = remaining
|
||||||
buf = buf[:n]
|
|
||||||
}
|
|
||||||
_, err = io.ReadFull(in, buf)
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
}
|
||||||
|
seg := fs.NewRepeatableReader(io.LimitReader(in, n))
|
||||||
fs.Debugf(o, "Uploading segment %d/%d size %d", position, size, n)
|
fs.Debugf(o, "Uploading segment %d/%d size %d", position, size, n)
|
||||||
err = o.uploadFragment(uploadURL, position, size, buf)
|
err = o.uploadFragment(uploadURL, position, size, seg, n)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in a new issue