From f5753369e45e1a2891da57e43281b931d9354cfe Mon Sep 17 00:00:00 2001 From: Nick Craig-Wood Date: Tue, 15 Aug 2023 21:15:04 +0100 Subject: [PATCH] operations: multipart: don't buffer transfers to local disk #7056 --- fs/operations/multithread.go | 31 +++++++++++++++++++++++++------ 1 file changed, 25 insertions(+), 6 deletions(-) diff --git a/fs/operations/multithread.go b/fs/operations/multithread.go index 6882f23c4..f1e1f18af 100644 --- a/fs/operations/multithread.go +++ b/fs/operations/multithread.go @@ -9,6 +9,7 @@ import ( "github.com/rclone/rclone/fs" "github.com/rclone/rclone/fs/accounting" + "github.com/rclone/rclone/lib/multipart" "github.com/rclone/rclone/lib/readers" "golang.org/x/sync/errgroup" ) @@ -58,6 +59,7 @@ type multiThreadCopyState struct { acc *accounting.Account streams int numChunks int + noSeek bool // set if sure the receiving fs won't seek the input } // Copy a single stream into place @@ -75,8 +77,9 @@ func (mc *multiThreadCopyState) copyStream(ctx context.Context, stream int, writ if end > mc.size { end = mc.size } + size := end - start - fs.Debugf(mc.src, "multi-thread copy: stream %d/%d (%d-%d) size %v starting", stream+1, mc.numChunks, start, end, fs.SizeSuffix(end-start)) + fs.Debugf(mc.src, "multi-thread copy: stream %d/%d (%d-%d) size %v starting", stream+1, mc.numChunks, start, end, fs.SizeSuffix(size)) rc, err := Open(ctx, mc.src, &fs.RangeOption{Start: start, End: end - 1}) if err != nil { @@ -84,12 +87,27 @@ func (mc *multiThreadCopyState) copyStream(ctx context.Context, stream int, writ } defer fs.CheckClose(rc, &err) - // FIXME NewRepeatableReader is allocating - need to be more careful with the memory allocations - // Also allocating for copy to local which doesn't need it - bytesWritten, err := writer.WriteChunk(ctx, stream, readers.NewRepeatableReader(rc)) - if err != nil { - return err + var rs io.ReadSeeker + if mc.noSeek { + // Read directly if we are sure we aren't going to seek + rs = readers.NoSeeker{Reader: rc} + } else { + // Read the chunk into buffered reader + wr := multipart.NewRW() + defer fs.CheckClose(wr, &err) + _, err = io.CopyN(wr, rc, size) + if err != nil { + return fmt.Errorf("multi-thread copy: failed to read chunk: %w", err) + } + rs = wr } + + // Write the chunk + bytesWritten, err := writer.WriteChunk(ctx, stream, rs) + if err != nil { + return fmt.Errorf("multi-thread copy: failed to write chunk: %w", err) + } + // FIXME: Wrap ReadSeeker for Accounting // However, to ensure reporting is correctly seeks have to be handled properly errAccRead := mc.acc.AccountRead(int(bytesWritten)) @@ -157,6 +175,7 @@ func multiThreadCopy(ctx context.Context, f fs.Fs, remote string, src fs.Object, partSize: chunkSize, streams: streams, numChunks: numChunks, + noSeek: f.Features().PartialUploads, } // Make accounting