operations: multipart: don't buffer transfers to local disk #7056

This commit is contained in:
Nick Craig-Wood 2023-08-15 21:15:04 +01:00
parent 4c76fac594
commit f5753369e4

View file

@ -9,6 +9,7 @@ import (
"github.com/rclone/rclone/fs" "github.com/rclone/rclone/fs"
"github.com/rclone/rclone/fs/accounting" "github.com/rclone/rclone/fs/accounting"
"github.com/rclone/rclone/lib/multipart"
"github.com/rclone/rclone/lib/readers" "github.com/rclone/rclone/lib/readers"
"golang.org/x/sync/errgroup" "golang.org/x/sync/errgroup"
) )
@ -58,6 +59,7 @@ type multiThreadCopyState struct {
acc *accounting.Account acc *accounting.Account
streams int streams int
numChunks int numChunks int
noSeek bool // set if sure the receiving fs won't seek the input
} }
// Copy a single stream into place // Copy a single stream into place
@ -75,8 +77,9 @@ func (mc *multiThreadCopyState) copyStream(ctx context.Context, stream int, writ
if end > mc.size { if end > mc.size {
end = mc.size end = mc.size
} }
size := end - start
fs.Debugf(mc.src, "multi-thread copy: stream %d/%d (%d-%d) size %v starting", stream+1, mc.numChunks, start, end, fs.SizeSuffix(end-start)) fs.Debugf(mc.src, "multi-thread copy: stream %d/%d (%d-%d) size %v starting", stream+1, mc.numChunks, start, end, fs.SizeSuffix(size))
rc, err := Open(ctx, mc.src, &fs.RangeOption{Start: start, End: end - 1}) rc, err := Open(ctx, mc.src, &fs.RangeOption{Start: start, End: end - 1})
if err != nil { if err != nil {
@ -84,12 +87,27 @@ func (mc *multiThreadCopyState) copyStream(ctx context.Context, stream int, writ
} }
defer fs.CheckClose(rc, &err) defer fs.CheckClose(rc, &err)
// FIXME NewRepeatableReader is allocating - need to be more careful with the memory allocations var rs io.ReadSeeker
// Also allocating for copy to local which doesn't need it if mc.noSeek {
bytesWritten, err := writer.WriteChunk(ctx, stream, readers.NewRepeatableReader(rc)) // Read directly if we are sure we aren't going to seek
rs = readers.NoSeeker{Reader: rc}
} else {
// Read the chunk into buffered reader
wr := multipart.NewRW()
defer fs.CheckClose(wr, &err)
_, err = io.CopyN(wr, rc, size)
if err != nil { if err != nil {
return err return fmt.Errorf("multi-thread copy: failed to read chunk: %w", err)
} }
rs = wr
}
// Write the chunk
bytesWritten, err := writer.WriteChunk(ctx, stream, rs)
if err != nil {
return fmt.Errorf("multi-thread copy: failed to write chunk: %w", err)
}
// FIXME: Wrap ReadSeeker for Accounting // FIXME: Wrap ReadSeeker for Accounting
// However, to ensure reporting is correctly seeks have to be handled properly // However, to ensure reporting is correctly seeks have to be handled properly
errAccRead := mc.acc.AccountRead(int(bytesWritten)) errAccRead := mc.acc.AccountRead(int(bytesWritten))
@ -157,6 +175,7 @@ func multiThreadCopy(ctx context.Context, f fs.Fs, remote string, src fs.Object,
partSize: chunkSize, partSize: chunkSize,
streams: streams, streams: streams,
numChunks: numChunks, numChunks: numChunks,
noSeek: f.Features().PartialUploads,
} }
// Make accounting // Make accounting