diff --git a/fs/operations/operations.go b/fs/operations/operations.go index 59877bcc8..37dc4246d 100644 --- a/fs/operations/operations.go +++ b/fs/operations/operations.go @@ -36,6 +36,7 @@ import ( "github.com/rclone/rclone/fs/walk" "github.com/rclone/rclone/lib/atexit" "github.com/rclone/rclone/lib/errcount" + "github.com/rclone/rclone/lib/pacer" "github.com/rclone/rclone/lib/random" "github.com/rclone/rclone/lib/readers" "golang.org/x/sync/errgroup" @@ -719,13 +720,18 @@ func Retry(ctx context.Context, o interface{}, maxTries int, fn func() error) (e if err == nil { break } - // Retry if err returned a retry error + // End if ctx is in error if fserrors.ContextError(ctx, &err) { break } + // Retry if err returned a retry error if fserrors.IsRetryError(err) || fserrors.ShouldRetry(err) { fs.Debugf(o, "Received error: %v - low level retry %d/%d", err, tries, maxTries) continue + } else if t, ok := pacer.IsRetryAfter(err); ok { + fs.Debugf(o, "Sleeping for %v (as indicated by the server) to obey Retry-After error: %v", t, err) + time.Sleep(t) + continue } break } @@ -1269,22 +1275,32 @@ func Cat(ctx context.Context, f fs.Fs, w io.Writer, offset, count int64, sep []b } // Rcat reads data from the Reader until EOF and uploads it to a file on remote +// +// in is closed at the end of the transfer func Rcat(ctx context.Context, fdst fs.Fs, dstFileName string, in io.ReadCloser, modTime time.Time, meta fs.Metadata) (dst fs.Object, err error) { return rcatSrc(ctx, fdst, dstFileName, in, modTime, meta, nil) } // rcatSrc reads data from the Reader until EOF and uploads it to a file on remote // +// in is closed at the end of the transfer +// // Pass in fsrc if known or nil if not func rcatSrc(ctx context.Context, fdst fs.Fs, dstFileName string, in io.ReadCloser, modTime time.Time, meta fs.Metadata, fsrc fs.Fs) (dst fs.Object, err error) { + if SkipDestructive(ctx, dstFileName, "upload from pipe") { + // prevents "broken pipe" errors + _, err = io.Copy(io.Discard, in) + return nil, err + } + ci := fs.GetConfig(ctx) tr := accounting.Stats(ctx).NewTransferRemoteSize(dstFileName, -1, nil, fdst) defer func() { tr.Done(ctx, err) }() - in = tr.Account(ctx, in).WithBuffer() + var streamIn io.Reader = tr.Account(ctx, in).WithBuffer() - readCounter := readers.NewCountingReader(in) + readCounter := readers.NewCountingReader(streamIn) var trackingIn io.Reader var hasher *hash.MultiHasher var options []fs.OpenOption @@ -1307,86 +1323,90 @@ func rcatSrc(ctx context.Context, fdst fs.Fs, dstFileName string, in io.ReadClos options = append(options, fs.MetadataOption(ci.MetadataSet)) } - compare := func(dst fs.Object) error { - var sums map[hash.Type]string - opt := defaultEqualOpt(ctx) + // get the sums from the hasher if in use, or nil + getSums := func() (sums map[hash.Type]string) { if hasher != nil { - // force --checksum on if we have hashes - opt.checkSum = true sums = hasher.Sums() } - src := object.NewStaticObjectInfo(dstFileName, modTime, int64(readCounter.BytesRead()), false, sums, fdst).WithMetadata(meta) - if !equal(ctx, src, dst, opt) { - err = fmt.Errorf("corrupted on transfer") - err = fs.CountError(err) - fs.Errorf(dst, "%v", err) - return err - } - return nil + return sums } - // check if file small enough for direct upload + // Read the start of the input and check if it is small enough for direct upload buf := make([]byte, ci.StreamingUploadCutoff) + fileIsSmall := false if n, err := io.ReadFull(trackingIn, buf); err == io.EOF || err == io.ErrUnexpectedEOF { - fs.Debugf(fdst, "File to upload is small (%d bytes), uploading instead of streaming", n) - src := object.NewMemoryObject(dstFileName, modTime, buf[:n]).WithMetadata(meta).SetFs(fsrc) - return Copy(ctx, fdst, nil, dstFileName, src) + fileIsSmall = true + buf = buf[:n] } - // Make a new ReadCloser with the bits we've already read - in = &readCloser{ - Reader: io.MultiReader(bytes.NewReader(buf), trackingIn), - Closer: in, - } + // Read the data we have already read in buf and any further unread + streamIn = io.MultiReader(bytes.NewReader(buf), trackingIn) - fStreamTo := fdst - canStream := fdst.Features().PutStream != nil - if !canStream { - fs.Debugf(fdst, "Target remote doesn't support streaming uploads, creating temporary local FS to spool file") - tmpLocalFs, err := fs.TemporaryLocalFs(ctx) - if err != nil { - return nil, fmt.Errorf("failed to create temporary local FS to spool file: %w", err) - } - defer func() { - err := Purge(ctx, tmpLocalFs, "") + doPutStream := fdst.Features().PutStream + + // Upload the input + if fileIsSmall || doPutStream == nil { + var rs io.ReadSeeker + if fileIsSmall { + fs.Debugf(fdst, "File to upload is small (%d bytes), uploading instead of streaming", len(buf)) + rs = bytes.NewReader(buf) + } else { + fs.Debugf(fdst, "Target remote doesn't support streaming uploads, creating temporary local FS to spool file") + spool, err := os.CreateTemp("", "rclone-spool") if err != nil { - fs.Infof(tmpLocalFs, "Failed to cleanup temporary FS: %v", err) + return nil, fmt.Errorf("failed to create temporary spool file: %v", err) } - }() - fStreamTo = tmpLocalFs - } - - if SkipDestructive(ctx, dstFileName, "upload from pipe") { - // prevents "broken pipe" errors - _, err = io.Copy(io.Discard, in) - return nil, err - } - - objInfo := object.NewStaticObjectInfo(dstFileName, modTime, -1, false, nil, fsrc).WithMetadata(meta) - if dst, err = fStreamTo.Features().PutStream(ctx, in, objInfo, options...); err != nil { - return dst, err - } - if err = compare(dst); err != nil { - return dst, err - } - if !canStream { - // copy dst (which is the local object we have just streamed to) to the remote - newCtx := ctx - if ci.Metadata && len(meta) != 0 { - // If we have metadata and we are setting it then use - // the --metadataset mechanism to supply it to Copy - var newCi *fs.ConfigInfo - newCtx, newCi = fs.AddConfig(ctx) - if len(newCi.MetadataSet) == 0 { - newCi.MetadataSet = meta - } else { - var newMeta fs.Metadata - newMeta.Merge(meta) - newMeta.Merge(newCi.MetadataSet) // --metadata-set takes priority - newCi.MetadataSet = newMeta + fileName := spool.Name() + defer func() { + err := spool.Close() + if err != nil { + fs.Errorf(fileName, "Failed to close temporary spool file: %v", err) + } + err = os.Remove(fileName) + if err != nil { + fs.Errorf(fileName, "Failed to delete temporary spool file: %v", err) + } + }() + _, err = io.Copy(spool, streamIn) + if err != nil { + return nil, fmt.Errorf("failed to copy to temporary spool file: %v", err) } + rs = spool } - return Copy(newCtx, fdst, nil, dstFileName, dst) + // Upload with Put with retries - since we have downloaded the file we know the size, and the hashes + sums := getSums() + size := int64(readCounter.BytesRead()) + objInfo := object.NewStaticObjectInfo(dstFileName, modTime, size, false, sums, fsrc).WithMetadata(meta) + err = Retry(ctx, objInfo, ci.LowLevelRetries, func() error { + _, err = rs.Seek(0, io.SeekStart) + if err != nil { + return fmt.Errorf("failed to rewind temporary spool file: %v", err) + } + dst, err = fdst.Put(ctx, rs, objInfo, options...) + return err + }) + } else { + // Upload with PutStream with no retries + objInfo := object.NewStaticObjectInfo(dstFileName, modTime, -1, false, nil, fsrc).WithMetadata(meta) + dst, err = doPutStream(ctx, streamIn, objInfo, options...) + } + if err != nil { + return dst, err + } + + // Check transfer + sums := getSums() + opt := defaultEqualOpt(ctx) + if sums != nil { + // force --checksum on if we have hashes + opt.checkSum = true + } + src := object.NewStaticObjectInfo(dstFileName, modTime, int64(readCounter.BytesRead()), false, sums, fdst).WithMetadata(meta) + if !equal(ctx, src, dst, opt) { + err = fmt.Errorf("corrupted on transfer") + err = fs.CountError(err) + fs.Errorf(dst, "%v", err) + return dst, err } return dst, nil } diff --git a/fs/operations/operations_test.go b/fs/operations/operations_test.go index d14678f13..3e66bb33a 100644 --- a/fs/operations/operations_test.go +++ b/fs/operations/operations_test.go @@ -42,6 +42,7 @@ import ( "github.com/rclone/rclone/fs/operations" "github.com/rclone/rclone/fstest" "github.com/rclone/rclone/fstest/fstests" + "github.com/rclone/rclone/lib/pacer" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "golang.org/x/text/cases" @@ -504,12 +505,12 @@ func TestRetry(t *testing.T) { return err } - i, err = 3, io.EOF + i, err = 3, fmt.Errorf("Wrapped EOF is retriable: %w", io.EOF) assert.Equal(t, nil, operations.Retry(ctx, nil, 5, fn)) assert.Equal(t, 0, i) - i, err = 10, io.EOF - assert.Equal(t, io.EOF, operations.Retry(ctx, nil, 5, fn)) + i, err = 10, pacer.RetryAfterError(errors.New("BANG"), 10*time.Millisecond) + assert.Equal(t, err, operations.Retry(ctx, nil, 5, fn)) assert.Equal(t, 5, i) i, err = 10, fs.ErrorObjectNotFound