dropbox: buffer the chunks when uploading large files so they can be retried

We use fs.RepeatableReader to buffer the chunks which plays nice with
the accounting.  The default chunk size is 128M which may be too
large.

Fixes #1806
This commit is contained in:
Nick Craig-Wood 2017-11-08 09:18:16 +00:00
parent 159fce0106
commit 6f61da5c75

View file

@ -1,7 +1,6 @@
// Package dropbox provides an interface to Dropbox object storage // Package dropbox provides an interface to Dropbox object storage
package dropbox package dropbox
// FIXME buffer chunks for retries in upload
// FIXME dropbox for business would be quite easy to add // FIXME dropbox for business would be quite easy to add
/* /*
@ -805,8 +804,6 @@ func (o *Object) Open(options ...fs.OpenOption) (in io.ReadCloser, err error) {
// Will work optimally if size is >= uploadChunkSize. If the size is either // Will work optimally if size is >= uploadChunkSize. If the size is either
// unknown (i.e. -1) or smaller than uploadChunkSize, the method incurs an // unknown (i.e. -1) or smaller than uploadChunkSize, the method incurs an
// avoidable request to the Dropbox API that does not carry payload. // avoidable request to the Dropbox API that does not carry payload.
//
// FIXME buffer chunks to improve upload retries
func (o *Object) uploadChunked(in0 io.Reader, commitInfo *files.CommitInfo, size int64) (entry *files.FileMetadata, err error) { func (o *Object) uploadChunked(in0 io.Reader, commitInfo *files.CommitInfo, size int64) (entry *files.FileMetadata, err error) {
chunkSize := int64(uploadChunkSize) chunkSize := int64(uploadChunkSize)
chunks := 0 chunks := 0
@ -828,8 +825,13 @@ func (o *Object) uploadChunked(in0 io.Reader, commitInfo *files.CommitInfo, size
// write the first chunk // write the first chunk
fmtChunk(1, false) fmtChunk(1, false)
var res *files.UploadSessionStartResult var res *files.UploadSessionStartResult
err = o.fs.pacer.CallNoRetry(func() (bool, error) { chunk := fs.NewRepeatableReader(&io.LimitedReader{R: in, N: chunkSize})
res, err = o.fs.srv.UploadSessionStart(&files.UploadSessionStartArg{}, &io.LimitedReader{R: in, N: chunkSize}) err = o.fs.pacer.Call(func() (bool, error) {
// seek to the start in case this is a retry
if _, err = chunk.Seek(0, 0); err != nil {
return false, nil
}
res, err = o.fs.srv.UploadSessionStart(&files.UploadSessionStartArg{}, chunk)
return shouldRetry(err) return shouldRetry(err)
}) })
if err != nil { if err != nil {
@ -859,8 +861,13 @@ func (o *Object) uploadChunked(in0 io.Reader, commitInfo *files.CommitInfo, size
} }
cursor.Offset = in.BytesRead() cursor.Offset = in.BytesRead()
fmtChunk(currentChunk, false) fmtChunk(currentChunk, false)
err = o.fs.pacer.CallNoRetry(func() (bool, error) { chunk = fs.NewRepeatableReader(&io.LimitedReader{R: in, N: chunkSize})
err = o.fs.srv.UploadSessionAppendV2(&appendArg, &io.LimitedReader{R: in, N: chunkSize}) err = o.fs.pacer.Call(func() (bool, error) {
// seek to the start in case this is a retry
if _, err = chunk.Seek(0, 0); err != nil {
return false, nil
}
err = o.fs.srv.UploadSessionAppendV2(&appendArg, chunk)
return shouldRetry(err) return shouldRetry(err)
}) })
if err != nil { if err != nil {
@ -876,8 +883,13 @@ func (o *Object) uploadChunked(in0 io.Reader, commitInfo *files.CommitInfo, size
Commit: commitInfo, Commit: commitInfo,
} }
fmtChunk(currentChunk, true) fmtChunk(currentChunk, true)
err = o.fs.pacer.CallNoRetry(func() (bool, error) { chunk = fs.NewRepeatableReader(in)
entry, err = o.fs.srv.UploadSessionFinish(args, in) err = o.fs.pacer.Call(func() (bool, error) {
// seek to the start in case this is a retry
if _, err = chunk.Seek(0, 0); err != nil {
return false, nil
}
entry, err = o.fs.srv.UploadSessionFinish(args, chunk)
return shouldRetry(err) return shouldRetry(err)
}) })
if err != nil { if err != nil {
@ -921,7 +933,7 @@ func (o *Object) Update(in io.Reader, src fs.ObjectInfo, options ...fs.OpenOptio
// Remove an object // Remove an object
func (o *Object) Remove() (err error) { func (o *Object) Remove() (err error) {
err = o.fs.pacer.CallNoRetry(func() (bool, error) { err = o.fs.pacer.Call(func() (bool, error) {
_, err = o.fs.srv.DeleteV2(&files.DeleteArg{Path: o.remotePath()}) _, err = o.fs.srv.DeleteV2(&files.DeleteArg{Path: o.remotePath()})
return shouldRetry(err) return shouldRetry(err)
}) })