putio: add ability to resume uploads

This commit is contained in:
Cenk Alti 2019-10-12 15:29:02 +03:00 committed by Nick Craig-Wood
parent c526bdb579
commit 929f275ae5
2 changed files with 127 additions and 43 deletions

43
backend/putio/error.go Normal file
View file

@ -0,0 +1,43 @@
package putio
import (
"fmt"
"net/http"
"github.com/putdotio/go-putio/putio"
"github.com/rclone/rclone/fs/fserrors"
)
func checkStatusCode(resp *http.Response, expected int) error {
if resp.StatusCode != expected {
return &statusCodeError{response: resp}
}
return nil
}
type statusCodeError struct {
response *http.Response
}
func (e *statusCodeError) Error() string {
return fmt.Sprintf("unexpected status code (%d) response while doing %s to %s", e.response.StatusCode, e.response.Request.Method, e.response.Request.URL.String())
}
func (e *statusCodeError) Temporary() bool {
return e.response.StatusCode == 429 || e.response.StatusCode >= 500
}
// shouldRetry returns a boolean as to whether this err deserves to be
// retried. It returns the err as a convenience
func shouldRetry(err error) (bool, error) {
if err == nil {
return false, nil
}
if perr, ok := err.(*putio.ErrorResponse); ok {
err = &statusCodeError{response: perr.Response}
}
if fserrors.ShouldRetry(err) {
return true, err
}
return false, err
}

View file

@ -17,7 +17,6 @@ import (
"github.com/putdotio/go-putio/putio"
"github.com/rclone/rclone/fs"
"github.com/rclone/rclone/fs/config/configmap"
"github.com/rclone/rclone/fs/fserrors"
"github.com/rclone/rclone/fs/hash"
"github.com/rclone/rclone/lib/dircache"
"github.com/rclone/rclone/lib/oauthutil"
@ -58,23 +57,6 @@ func (f *Fs) Features() *fs.Features {
return f.features
}
// shouldRetry returns a boolean as to whether this err deserves to be
// retried. It returns the err as a convenience
func shouldRetry(err error) (bool, error) {
if err == nil {
return false, nil
}
if fserrors.ShouldRetry(err) {
return true, err
}
if perr, ok := err.(*putio.ErrorResponse); ok {
if perr.Response.StatusCode == 429 || perr.Response.StatusCode >= 500 {
return true, err
}
}
return false, err
}
// NewFs constructs an Fs from the path, container:path
func NewFs(name, root string, m configmap.Mapper) (f fs.Fs, err error) {
// defer log.Trace(name, "root=%v", root)("f=%+v, err=%v", &f, &err)
@ -318,66 +300,125 @@ func (f *Fs) createUpload(ctx context.Context, name string, size int64, parentID
}
func (f *Fs) sendUpload(ctx context.Context, location string, size int64, in io.Reader) (fileID int64, err error) {
// defer log.Trace(f, "location=%v, size=%v", location, size)("fileID=%v, err=%v", fileID, &err)
// defer log.Trace(f, "location=%v, size=%v", location, size)("fileID=%v, err=%v", &fileID, &err)
if size == 0 {
err = f.pacer.Call(func() (bool, error) {
fs.Debugf(f, "Sending zero length chunk")
fileID, err = f.transferChunk(ctx, location, 0, bytes.NewReader([]byte{}), 0)
_, fileID, err = f.transferChunk(ctx, location, 0, bytes.NewReader([]byte{}), 0)
return shouldRetry(err)
})
return
}
var start int64
var clientOffset int64
var offsetMismatch bool
buf := make([]byte, defaultChunkSize)
for start < size {
reqSize := size - start
if reqSize >= int64(defaultChunkSize) {
reqSize = int64(defaultChunkSize)
for clientOffset < size {
chunkSize := size - clientOffset
if chunkSize >= int64(defaultChunkSize) {
chunkSize = int64(defaultChunkSize)
}
chunk := readers.NewRepeatableLimitReaderBuffer(in, buf, reqSize)
chunk := readers.NewRepeatableLimitReaderBuffer(in, buf, chunkSize)
chunkStart := clientOffset
reqSize := chunkSize
transferOffset := clientOffset
fs.Debugf(f, "chunkStart: %d, reqSize: %d", chunkStart, reqSize)
// Transfer the chunk
err = f.pacer.Call(func() (bool, error) {
fs.Debugf(f, "Sending chunk. start: %d length: %d", start, reqSize)
// TODO get file offset and seek to the position
fileID, err = f.transferChunk(ctx, location, start, chunk, reqSize)
if offsetMismatch {
// Get file offset and seek to the position
offset, err := f.getServerOffset(ctx, location)
if err != nil {
return shouldRetry(err)
}
sentBytes := offset - chunkStart
fs.Debugf(f, "sentBytes: %d", sentBytes)
_, err = chunk.Seek(sentBytes, io.SeekStart)
if err != nil {
return shouldRetry(err)
}
transferOffset = offset
reqSize = chunkSize - sentBytes
offsetMismatch = false
}
fs.Debugf(f, "Sending chunk. transferOffset: %d length: %d", transferOffset, reqSize)
var serverOffset int64
serverOffset, fileID, err = f.transferChunk(ctx, location, transferOffset, chunk, reqSize)
if cerr, ok := err.(*statusCodeError); ok && cerr.response.StatusCode == 409 {
offsetMismatch = true
return true, err
}
if serverOffset != (transferOffset + reqSize) {
offsetMismatch = true
return true, errors.New("connection broken")
}
return shouldRetry(err)
})
if err != nil {
return
}
start += reqSize
clientOffset += chunkSize
}
return
}
func (f *Fs) transferChunk(ctx context.Context, location string, start int64, chunk io.ReadSeeker, chunkSize int64) (fileID int64, err error) {
// defer log.Trace(f, "location=%v, start=%v, chunkSize=%v", location, start, chunkSize)("fileID=%v, err=%v", fileID, &err)
_, _ = chunk.Seek(0, io.SeekStart)
func (f *Fs) getServerOffset(ctx context.Context, location string) (offset int64, err error) {
// defer log.Trace(f, "location=%v", location)("offset=%v, err=%v", &offset, &err)
req, err := f.makeUploadHeadRequest(ctx, location)
if err != nil {
return 0, err
}
resp, err := f.oAuthClient.Do(req)
if err != nil {
return 0, err
}
err = checkStatusCode(resp, 200)
if err != nil {
return 0, err
}
return strconv.ParseInt(resp.Header.Get("upload-offset"), 10, 64)
}
func (f *Fs) transferChunk(ctx context.Context, location string, start int64, chunk io.ReadSeeker, chunkSize int64) (serverOffset, fileID int64, err error) {
// defer log.Trace(f, "location=%v, start=%v, chunkSize=%v", location, start, chunkSize)("fileID=%v, err=%v", &fileID, &err)
req, err := f.makeUploadPatchRequest(ctx, location, chunk, start, chunkSize)
if err != nil {
return 0, err
return
}
req = req.WithContext(ctx)
res, err := f.oAuthClient.Do(req)
resp, err := f.oAuthClient.Do(req)
if err != nil {
return 0, err
return
}
defer func() {
_ = res.Body.Close()
_ = resp.Body.Close()
}()
if res.StatusCode != 204 {
return 0, fmt.Errorf("unexpected status code while transferring chunk: %d", res.StatusCode)
err = checkStatusCode(resp, 204)
if err != nil {
return
}
sfid := res.Header.Get("putio-file-id")
serverOffset, err = strconv.ParseInt(resp.Header.Get("upload-offset"), 10, 64)
if err != nil {
return
}
sfid := resp.Header.Get("putio-file-id")
if sfid != "" {
fileID, err = strconv.ParseInt(sfid, 10, 64)
if err != nil {
return 0, err
return
}
}
return fileID, nil
return
}
func (f *Fs) makeUploadHeadRequest(ctx context.Context, location string) (*http.Request, error) {
req, err := http.NewRequest("HEAD", location, nil)
if err != nil {
return nil, err
}
req = req.WithContext(ctx) // go1.13 can use NewRequestWithContext
req.Header.Set("tus-resumable", "1.0.0")
return req, nil
}
func (f *Fs) makeUploadPatchRequest(ctx context.Context, location string, in io.Reader, offset, length int64) (*http.Request, error) {