operations: rework rcat so that it doesn't call the --metadata-mapper twice

The --metadata-mapper was being called twice for files that rclone
needed to stream to disk,

This happened only for:
- files bigger than --upload-streaming-cutoff
- on backends which didn't support PutStream

This also meant that these were being logged as two transfers which
was a little strange.

This fixes the problem by not using operations.Copy to upload the file
once it has been streamed to disk, instead using the Put method on the
backend.

This should have no effect on reliability of the transfers as we retry
Put if possible.

This also tidies up the Rcat function to make the different ways of
uploading the data clearer and make it easy to see that it gets
verified on all those paths.

See #7848
This commit is contained in:
Nick Craig-Wood 2024-05-16 12:18:00 +01:00
parent faa58315c5
commit a5700a4a53
2 changed files with 93 additions and 72 deletions

View file

@ -36,6 +36,7 @@ import (
"github.com/rclone/rclone/fs/walk" "github.com/rclone/rclone/fs/walk"
"github.com/rclone/rclone/lib/atexit" "github.com/rclone/rclone/lib/atexit"
"github.com/rclone/rclone/lib/errcount" "github.com/rclone/rclone/lib/errcount"
"github.com/rclone/rclone/lib/pacer"
"github.com/rclone/rclone/lib/random" "github.com/rclone/rclone/lib/random"
"github.com/rclone/rclone/lib/readers" "github.com/rclone/rclone/lib/readers"
"golang.org/x/sync/errgroup" "golang.org/x/sync/errgroup"
@ -719,13 +720,18 @@ func Retry(ctx context.Context, o interface{}, maxTries int, fn func() error) (e
if err == nil { if err == nil {
break break
} }
// Retry if err returned a retry error // End if ctx is in error
if fserrors.ContextError(ctx, &err) { if fserrors.ContextError(ctx, &err) {
break break
} }
// Retry if err returned a retry error
if fserrors.IsRetryError(err) || fserrors.ShouldRetry(err) { if fserrors.IsRetryError(err) || fserrors.ShouldRetry(err) {
fs.Debugf(o, "Received error: %v - low level retry %d/%d", err, tries, maxTries) fs.Debugf(o, "Received error: %v - low level retry %d/%d", err, tries, maxTries)
continue continue
} else if t, ok := pacer.IsRetryAfter(err); ok {
fs.Debugf(o, "Sleeping for %v (as indicated by the server) to obey Retry-After error: %v", t, err)
time.Sleep(t)
continue
} }
break break
} }
@ -1269,22 +1275,32 @@ func Cat(ctx context.Context, f fs.Fs, w io.Writer, offset, count int64, sep []b
} }
// Rcat reads data from the Reader until EOF and uploads it to a file on remote // Rcat reads data from the Reader until EOF and uploads it to a file on remote
//
// in is closed at the end of the transfer
func Rcat(ctx context.Context, fdst fs.Fs, dstFileName string, in io.ReadCloser, modTime time.Time, meta fs.Metadata) (dst fs.Object, err error) { func Rcat(ctx context.Context, fdst fs.Fs, dstFileName string, in io.ReadCloser, modTime time.Time, meta fs.Metadata) (dst fs.Object, err error) {
return rcatSrc(ctx, fdst, dstFileName, in, modTime, meta, nil) return rcatSrc(ctx, fdst, dstFileName, in, modTime, meta, nil)
} }
// rcatSrc reads data from the Reader until EOF and uploads it to a file on remote // rcatSrc reads data from the Reader until EOF and uploads it to a file on remote
// //
// in is closed at the end of the transfer
//
// Pass in fsrc if known or nil if not // Pass in fsrc if known or nil if not
func rcatSrc(ctx context.Context, fdst fs.Fs, dstFileName string, in io.ReadCloser, modTime time.Time, meta fs.Metadata, fsrc fs.Fs) (dst fs.Object, err error) { func rcatSrc(ctx context.Context, fdst fs.Fs, dstFileName string, in io.ReadCloser, modTime time.Time, meta fs.Metadata, fsrc fs.Fs) (dst fs.Object, err error) {
if SkipDestructive(ctx, dstFileName, "upload from pipe") {
// prevents "broken pipe" errors
_, err = io.Copy(io.Discard, in)
return nil, err
}
ci := fs.GetConfig(ctx) ci := fs.GetConfig(ctx)
tr := accounting.Stats(ctx).NewTransferRemoteSize(dstFileName, -1, nil, fdst) tr := accounting.Stats(ctx).NewTransferRemoteSize(dstFileName, -1, nil, fdst)
defer func() { defer func() {
tr.Done(ctx, err) tr.Done(ctx, err)
}() }()
in = tr.Account(ctx, in).WithBuffer() var streamIn io.Reader = tr.Account(ctx, in).WithBuffer()
readCounter := readers.NewCountingReader(in) readCounter := readers.NewCountingReader(streamIn)
var trackingIn io.Reader var trackingIn io.Reader
var hasher *hash.MultiHasher var hasher *hash.MultiHasher
var options []fs.OpenOption var options []fs.OpenOption
@ -1307,86 +1323,90 @@ func rcatSrc(ctx context.Context, fdst fs.Fs, dstFileName string, in io.ReadClos
options = append(options, fs.MetadataOption(ci.MetadataSet)) options = append(options, fs.MetadataOption(ci.MetadataSet))
} }
compare := func(dst fs.Object) error { // get the sums from the hasher if in use, or nil
var sums map[hash.Type]string getSums := func() (sums map[hash.Type]string) {
opt := defaultEqualOpt(ctx)
if hasher != nil { if hasher != nil {
// force --checksum on if we have hashes
opt.checkSum = true
sums = hasher.Sums() sums = hasher.Sums()
} }
src := object.NewStaticObjectInfo(dstFileName, modTime, int64(readCounter.BytesRead()), false, sums, fdst).WithMetadata(meta) return sums
if !equal(ctx, src, dst, opt) {
err = fmt.Errorf("corrupted on transfer")
err = fs.CountError(err)
fs.Errorf(dst, "%v", err)
return err
}
return nil
} }
// check if file small enough for direct upload // Read the start of the input and check if it is small enough for direct upload
buf := make([]byte, ci.StreamingUploadCutoff) buf := make([]byte, ci.StreamingUploadCutoff)
fileIsSmall := false
if n, err := io.ReadFull(trackingIn, buf); err == io.EOF || err == io.ErrUnexpectedEOF { if n, err := io.ReadFull(trackingIn, buf); err == io.EOF || err == io.ErrUnexpectedEOF {
fs.Debugf(fdst, "File to upload is small (%d bytes), uploading instead of streaming", n) fileIsSmall = true
src := object.NewMemoryObject(dstFileName, modTime, buf[:n]).WithMetadata(meta).SetFs(fsrc) buf = buf[:n]
return Copy(ctx, fdst, nil, dstFileName, src)
} }
// Make a new ReadCloser with the bits we've already read // Read the data we have already read in buf and any further unread
in = &readCloser{ streamIn = io.MultiReader(bytes.NewReader(buf), trackingIn)
Reader: io.MultiReader(bytes.NewReader(buf), trackingIn),
Closer: in,
}
fStreamTo := fdst doPutStream := fdst.Features().PutStream
canStream := fdst.Features().PutStream != nil
if !canStream { // Upload the input
fs.Debugf(fdst, "Target remote doesn't support streaming uploads, creating temporary local FS to spool file") if fileIsSmall || doPutStream == nil {
tmpLocalFs, err := fs.TemporaryLocalFs(ctx) var rs io.ReadSeeker
if err != nil { if fileIsSmall {
return nil, fmt.Errorf("failed to create temporary local FS to spool file: %w", err) fs.Debugf(fdst, "File to upload is small (%d bytes), uploading instead of streaming", len(buf))
} rs = bytes.NewReader(buf)
defer func() { } else {
err := Purge(ctx, tmpLocalFs, "") fs.Debugf(fdst, "Target remote doesn't support streaming uploads, creating temporary local FS to spool file")
spool, err := os.CreateTemp("", "rclone-spool")
if err != nil { if err != nil {
fs.Infof(tmpLocalFs, "Failed to cleanup temporary FS: %v", err) return nil, fmt.Errorf("failed to create temporary spool file: %v", err)
} }
}() fileName := spool.Name()
fStreamTo = tmpLocalFs defer func() {
} err := spool.Close()
if err != nil {
if SkipDestructive(ctx, dstFileName, "upload from pipe") { fs.Errorf(fileName, "Failed to close temporary spool file: %v", err)
// prevents "broken pipe" errors }
_, err = io.Copy(io.Discard, in) err = os.Remove(fileName)
return nil, err if err != nil {
} fs.Errorf(fileName, "Failed to delete temporary spool file: %v", err)
}
objInfo := object.NewStaticObjectInfo(dstFileName, modTime, -1, false, nil, fsrc).WithMetadata(meta) }()
if dst, err = fStreamTo.Features().PutStream(ctx, in, objInfo, options...); err != nil { _, err = io.Copy(spool, streamIn)
return dst, err if err != nil {
} return nil, fmt.Errorf("failed to copy to temporary spool file: %v", err)
if err = compare(dst); err != nil {
return dst, err
}
if !canStream {
// copy dst (which is the local object we have just streamed to) to the remote
newCtx := ctx
if ci.Metadata && len(meta) != 0 {
// If we have metadata and we are setting it then use
// the --metadataset mechanism to supply it to Copy
var newCi *fs.ConfigInfo
newCtx, newCi = fs.AddConfig(ctx)
if len(newCi.MetadataSet) == 0 {
newCi.MetadataSet = meta
} else {
var newMeta fs.Metadata
newMeta.Merge(meta)
newMeta.Merge(newCi.MetadataSet) // --metadata-set takes priority
newCi.MetadataSet = newMeta
} }
rs = spool
} }
return Copy(newCtx, fdst, nil, dstFileName, dst) // Upload with Put with retries - since we have downloaded the file we know the size, and the hashes
sums := getSums()
size := int64(readCounter.BytesRead())
objInfo := object.NewStaticObjectInfo(dstFileName, modTime, size, false, sums, fsrc).WithMetadata(meta)
err = Retry(ctx, objInfo, ci.LowLevelRetries, func() error {
_, err = rs.Seek(0, io.SeekStart)
if err != nil {
return fmt.Errorf("failed to rewind temporary spool file: %v", err)
}
dst, err = fdst.Put(ctx, rs, objInfo, options...)
return err
})
} else {
// Upload with PutStream with no retries
objInfo := object.NewStaticObjectInfo(dstFileName, modTime, -1, false, nil, fsrc).WithMetadata(meta)
dst, err = doPutStream(ctx, streamIn, objInfo, options...)
}
if err != nil {
return dst, err
}
// Check transfer
sums := getSums()
opt := defaultEqualOpt(ctx)
if sums != nil {
// force --checksum on if we have hashes
opt.checkSum = true
}
src := object.NewStaticObjectInfo(dstFileName, modTime, int64(readCounter.BytesRead()), false, sums, fdst).WithMetadata(meta)
if !equal(ctx, src, dst, opt) {
err = fmt.Errorf("corrupted on transfer")
err = fs.CountError(err)
fs.Errorf(dst, "%v", err)
return dst, err
} }
return dst, nil return dst, nil
} }

View file

@ -42,6 +42,7 @@ import (
"github.com/rclone/rclone/fs/operations" "github.com/rclone/rclone/fs/operations"
"github.com/rclone/rclone/fstest" "github.com/rclone/rclone/fstest"
"github.com/rclone/rclone/fstest/fstests" "github.com/rclone/rclone/fstest/fstests"
"github.com/rclone/rclone/lib/pacer"
"github.com/stretchr/testify/assert" "github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
"golang.org/x/text/cases" "golang.org/x/text/cases"
@ -504,12 +505,12 @@ func TestRetry(t *testing.T) {
return err return err
} }
i, err = 3, io.EOF i, err = 3, fmt.Errorf("Wrapped EOF is retriable: %w", io.EOF)
assert.Equal(t, nil, operations.Retry(ctx, nil, 5, fn)) assert.Equal(t, nil, operations.Retry(ctx, nil, 5, fn))
assert.Equal(t, 0, i) assert.Equal(t, 0, i)
i, err = 10, io.EOF i, err = 10, pacer.RetryAfterError(errors.New("BANG"), 10*time.Millisecond)
assert.Equal(t, io.EOF, operations.Retry(ctx, nil, 5, fn)) assert.Equal(t, err, operations.Retry(ctx, nil, 5, fn))
assert.Equal(t, 5, i) assert.Equal(t, 5, i)
i, err = 10, fs.ErrorObjectNotFound i, err = 10, fs.ErrorObjectNotFound