diff --git a/backend/putio/error.go b/backend/putio/error.go new file mode 100644 index 000000000..3fdfc2f15 --- /dev/null +++ b/backend/putio/error.go @@ -0,0 +1,43 @@ +package putio + +import ( + "fmt" + "net/http" + + "github.com/putdotio/go-putio/putio" + "github.com/rclone/rclone/fs/fserrors" +) + +func checkStatusCode(resp *http.Response, expected int) error { + if resp.StatusCode != expected { + return &statusCodeError{response: resp} + } + return nil +} + +type statusCodeError struct { + response *http.Response +} + +func (e *statusCodeError) Error() string { + return fmt.Sprintf("unexpected status code (%d) response while doing %s to %s", e.response.StatusCode, e.response.Request.Method, e.response.Request.URL.String()) +} + +func (e *statusCodeError) Temporary() bool { + return e.response.StatusCode == 429 || e.response.StatusCode >= 500 +} + +// shouldRetry returns a boolean as to whether this err deserves to be +// retried. It returns the err as a convenience +func shouldRetry(err error) (bool, error) { + if err == nil { + return false, nil + } + if perr, ok := err.(*putio.ErrorResponse); ok { + err = &statusCodeError{response: perr.Response} + } + if fserrors.ShouldRetry(err) { + return true, err + } + return false, err +} diff --git a/backend/putio/fs.go b/backend/putio/fs.go index 8de5eff5a..27e73d7ca 100644 --- a/backend/putio/fs.go +++ b/backend/putio/fs.go @@ -17,7 +17,6 @@ import ( "github.com/putdotio/go-putio/putio" "github.com/rclone/rclone/fs" "github.com/rclone/rclone/fs/config/configmap" - "github.com/rclone/rclone/fs/fserrors" "github.com/rclone/rclone/fs/hash" "github.com/rclone/rclone/lib/dircache" "github.com/rclone/rclone/lib/oauthutil" @@ -58,23 +57,6 @@ func (f *Fs) Features() *fs.Features { return f.features } -// shouldRetry returns a boolean as to whether this err deserves to be -// retried. It returns the err as a convenience -func shouldRetry(err error) (bool, error) { - if err == nil { - return false, nil - } - if fserrors.ShouldRetry(err) { - return true, err - } - if perr, ok := err.(*putio.ErrorResponse); ok { - if perr.Response.StatusCode == 429 || perr.Response.StatusCode >= 500 { - return true, err - } - } - return false, err -} - // NewFs constructs an Fs from the path, container:path func NewFs(name, root string, m configmap.Mapper) (f fs.Fs, err error) { // defer log.Trace(name, "root=%v", root)("f=%+v, err=%v", &f, &err) @@ -318,66 +300,125 @@ func (f *Fs) createUpload(ctx context.Context, name string, size int64, parentID } func (f *Fs) sendUpload(ctx context.Context, location string, size int64, in io.Reader) (fileID int64, err error) { - // defer log.Trace(f, "location=%v, size=%v", location, size)("fileID=%v, err=%v", fileID, &err) + // defer log.Trace(f, "location=%v, size=%v", location, size)("fileID=%v, err=%v", &fileID, &err) if size == 0 { err = f.pacer.Call(func() (bool, error) { fs.Debugf(f, "Sending zero length chunk") - fileID, err = f.transferChunk(ctx, location, 0, bytes.NewReader([]byte{}), 0) + _, fileID, err = f.transferChunk(ctx, location, 0, bytes.NewReader([]byte{}), 0) return shouldRetry(err) }) return } - var start int64 + var clientOffset int64 + var offsetMismatch bool buf := make([]byte, defaultChunkSize) - for start < size { - reqSize := size - start - if reqSize >= int64(defaultChunkSize) { - reqSize = int64(defaultChunkSize) + for clientOffset < size { + chunkSize := size - clientOffset + if chunkSize >= int64(defaultChunkSize) { + chunkSize = int64(defaultChunkSize) } - chunk := readers.NewRepeatableLimitReaderBuffer(in, buf, reqSize) + chunk := readers.NewRepeatableLimitReaderBuffer(in, buf, chunkSize) + chunkStart := clientOffset + reqSize := chunkSize + transferOffset := clientOffset + fs.Debugf(f, "chunkStart: %d, reqSize: %d", chunkStart, reqSize) // Transfer the chunk err = f.pacer.Call(func() (bool, error) { - fs.Debugf(f, "Sending chunk. start: %d length: %d", start, reqSize) - // TODO get file offset and seek to the position - fileID, err = f.transferChunk(ctx, location, start, chunk, reqSize) + if offsetMismatch { + // Get file offset and seek to the position + offset, err := f.getServerOffset(ctx, location) + if err != nil { + return shouldRetry(err) + } + sentBytes := offset - chunkStart + fs.Debugf(f, "sentBytes: %d", sentBytes) + _, err = chunk.Seek(sentBytes, io.SeekStart) + if err != nil { + return shouldRetry(err) + } + transferOffset = offset + reqSize = chunkSize - sentBytes + offsetMismatch = false + } + fs.Debugf(f, "Sending chunk. transferOffset: %d length: %d", transferOffset, reqSize) + var serverOffset int64 + serverOffset, fileID, err = f.transferChunk(ctx, location, transferOffset, chunk, reqSize) + if cerr, ok := err.(*statusCodeError); ok && cerr.response.StatusCode == 409 { + offsetMismatch = true + return true, err + } + if serverOffset != (transferOffset + reqSize) { + offsetMismatch = true + return true, errors.New("connection broken") + } return shouldRetry(err) }) if err != nil { return } - start += reqSize + clientOffset += chunkSize } return } -func (f *Fs) transferChunk(ctx context.Context, location string, start int64, chunk io.ReadSeeker, chunkSize int64) (fileID int64, err error) { - // defer log.Trace(f, "location=%v, start=%v, chunkSize=%v", location, start, chunkSize)("fileID=%v, err=%v", fileID, &err) - _, _ = chunk.Seek(0, io.SeekStart) +func (f *Fs) getServerOffset(ctx context.Context, location string) (offset int64, err error) { + // defer log.Trace(f, "location=%v", location)("offset=%v, err=%v", &offset, &err) + req, err := f.makeUploadHeadRequest(ctx, location) + if err != nil { + return 0, err + } + resp, err := f.oAuthClient.Do(req) + if err != nil { + return 0, err + } + err = checkStatusCode(resp, 200) + if err != nil { + return 0, err + } + return strconv.ParseInt(resp.Header.Get("upload-offset"), 10, 64) +} + +func (f *Fs) transferChunk(ctx context.Context, location string, start int64, chunk io.ReadSeeker, chunkSize int64) (serverOffset, fileID int64, err error) { + // defer log.Trace(f, "location=%v, start=%v, chunkSize=%v", location, start, chunkSize)("fileID=%v, err=%v", &fileID, &err) req, err := f.makeUploadPatchRequest(ctx, location, chunk, start, chunkSize) if err != nil { - return 0, err + return } - req = req.WithContext(ctx) - res, err := f.oAuthClient.Do(req) + resp, err := f.oAuthClient.Do(req) if err != nil { - return 0, err + return } defer func() { - _ = res.Body.Close() + _ = resp.Body.Close() }() - if res.StatusCode != 204 { - return 0, fmt.Errorf("unexpected status code while transferring chunk: %d", res.StatusCode) + err = checkStatusCode(resp, 204) + if err != nil { + return } - sfid := res.Header.Get("putio-file-id") + serverOffset, err = strconv.ParseInt(resp.Header.Get("upload-offset"), 10, 64) + if err != nil { + return + } + sfid := resp.Header.Get("putio-file-id") if sfid != "" { fileID, err = strconv.ParseInt(sfid, 10, 64) if err != nil { - return 0, err + return } } - return fileID, nil + return +} + +func (f *Fs) makeUploadHeadRequest(ctx context.Context, location string) (*http.Request, error) { + req, err := http.NewRequest("HEAD", location, nil) + if err != nil { + return nil, err + } + req = req.WithContext(ctx) // go1.13 can use NewRequestWithContext + req.Header.Set("tus-resumable", "1.0.0") + return req, nil } func (f *Fs) makeUploadPatchRequest(ctx context.Context, location string, in io.Reader, offset, length int64) (*http.Request, error) {