diff --git a/fs/operations/multithread.go b/fs/operations/multithread.go
index f675588bc..a4a357e13 100644
--- a/fs/operations/multithread.go
+++ b/fs/operations/multithread.go
@@ -11,34 +11,12 @@ import (
 	"github.com/rclone/rclone/fs/accounting"
 	"github.com/rclone/rclone/lib/readers"
 	"golang.org/x/sync/errgroup"
-	"golang.org/x/sync/semaphore"
 )
 
 const (
 	multithreadChunkSize = 64 << 10
 )
 
-// An offsetWriter maps writes at offset base to offset base+off in the underlying writer.
-//
-// Modified from the go source code. Can be replaced with
-// io.OffsetWriter when we no longer need to support go1.19
-type offsetWriter struct {
-	w   io.WriterAt
-	off int64 // the current offset
-}
-
-// newOffsetWriter returns an offsetWriter that writes to w
-// starting at offset off.
-func newOffsetWriter(w io.WriterAt, off int64) *offsetWriter {
-	return &offsetWriter{w, off}
-}
-
-func (o *offsetWriter) Write(p []byte) (n int, err error) {
-	n, err = o.w.WriteAt(p, o.off)
-	o.off += int64(n)
-	return
-}
-
 // Return a boolean as to whether we should use multi thread copy for
 // this transfer
 func doMultiThreadCopy(ctx context.Context, f fs.Fs, src fs.Object) bool {
@@ -102,10 +80,12 @@ func (mc *multiThreadCopyState) copyStream(ctx context.Context, stream int, writ
 
 	rc, err := Open(ctx, mc.src, &fs.RangeOption{Start: start, End: end - 1})
 	if err != nil {
-		return fmt.Errorf("multipart copy: failed to open source: %w", err)
+		return fmt.Errorf("multi-thread copy: failed to open source: %w", err)
 	}
 	defer fs.CheckClose(rc, &err)
 
+	// FIXME NewRepeatableReader is allocating - need to be more careful with the memory allocations
+	// Also allocating for copy to local which doesn't need it
 	bytesWritten, err := writer.WriteChunk(stream, readers.NewRepeatableReader(rc))
 	if err != nil {
 		return err
@@ -128,7 +108,6 @@ func calculateNumChunks(size int64, chunkSize int64) int {
 	if size%chunkSize != 0 {
 		numChunks++
 	}
-
 	return int(numChunks)
 }
 
@@ -140,7 +119,7 @@ func multiThreadCopy(ctx context.Context, f fs.Fs, remote string, src fs.Object,
 	if openChunkWriter == nil {
 		openWriterAt := f.Features().OpenWriterAt
 		if openWriterAt == nil {
-			return nil, errors.New("multi-part copy: neither OpenChunkWriter nor OpenWriterAt supported")
+			return nil, errors.New("multi-thread copy: neither OpenChunkWriter nor OpenWriterAt supported")
 		}
 		openChunkWriter = openChunkWriterFromOpenWriterAt(openWriterAt, int64(ci.MultiThreadChunkSize), int64(ci.MultiThreadWriteBufferSize), f)
 	}
@@ -153,7 +132,12 @@ func multiThreadCopy(ctx context.Context, f fs.Fs, remote string, src fs.Object,
 	}
 
 	g, gCtx := errgroup.WithContext(ctx)
+	g.SetLimit(streams)
+
 	chunkSize, chunkWriter, err := openChunkWriter(ctx, remote, src)
+	if err != nil {
+		return nil, fmt.Errorf("multi-thread copy: failed to open chunk writer: %w", err)
+	}
 
 	if chunkSize > src.Size() {
 		fs.Debugf(src, "multi-thread copy: chunk size %v was bigger than source file size %v", fs.SizeSuffix(chunkSize), fs.SizeSuffix(src.Size()))
@@ -175,25 +159,18 @@ func multiThreadCopy(ctx context.Context, f fs.Fs, remote string, src fs.Object,
 		numChunks: numChunks,
 	}
 
-	if err != nil {
-		return nil, fmt.Errorf("multipart copy: failed to open chunk writer: %w", err)
-	}
-
 	// Make accounting
 	mc.acc = tr.Account(ctx, nil)
 
 	fs.Debugf(src, "Starting multi-thread copy with %d parts of size %v with %v parallel streams", mc.numChunks, fs.SizeSuffix(mc.partSize), mc.streams)
 
-	sem := semaphore.NewWeighted(int64(mc.streams))
 	for chunk := 0; chunk < mc.numChunks; chunk++ {
-		fs.Debugf(src, "Acquiring semaphore...")
-		if err := sem.Acquire(ctx, 1); err != nil {
-			fs.Errorf(src, "Failed to acquire semaphore: %v", err)
+		// Fail fast, in case an errgroup managed function returns an error
+		if gCtx.Err() != nil {
 			break
 		}
-		currChunk := chunk
-		g.Go(func() (err error) {
-			defer sem.Release(1)
-			return mc.copyStream(gCtx, currChunk, chunkWriter)
+		chunk := chunk
+		g.Go(func() error {
+			return mc.copyStream(gCtx, chunk, chunkWriter)
 		})
 	}
 
@@ -224,6 +201,28 @@ func multiThreadCopy(ctx context.Context, f fs.Fs, remote string, src fs.Object,
 	return obj, nil
 }
 
+// An offsetWriter maps writes at offset base to offset base+off in the underlying writer.
+//
+// Modified from the go source code. Can be replaced with
+// io.OffsetWriter when we no longer need to support go1.19
+type offsetWriter struct {
+	w   io.WriterAt
+	off int64 // the current offset
+}
+
+// newOffsetWriter returns an offsetWriter that writes to w
+// starting at offset off.
+func newOffsetWriter(w io.WriterAt, off int64) *offsetWriter {
+	return &offsetWriter{w, off}
+}
+
+func (o *offsetWriter) Write(p []byte) (n int, err error) {
+	n, err = o.w.WriteAt(p, o.off)
+	o.off += int64(n)
+	return
+}
+
+// writerAtChunkWriter converts a WriterAtCloser into a ChunkWriter
 type writerAtChunkWriter struct {
 	ctx     context.Context
 	remote  string
@@ -235,6 +234,7 @@ type writerAtChunkWriter struct {
 	f fs.Fs
 }
 
+// WriteChunk writes chunkNumber from reader
 func (w writerAtChunkWriter) WriteChunk(chunkNumber int, reader io.ReadSeeker) (int64, error) {
 	fs.Debugf(w.remote, "writing chunk %v", chunkNumber)
 
@@ -254,21 +254,23 @@ func (w writerAtChunkWriter) WriteChunk(chunkNumber int, reader io.ReadSeeker) (
 	if n != bytesToWrite {
 		return -1, fmt.Errorf("expected to write %v bytes for chunk %v, but wrote %v bytes", bytesToWrite, chunkNumber, n)
 	}
-	// if we were buffering, flush do disk
+	// if we were buffering, flush to disk
 	switch w := writer.(type) {
 	case *bufio.Writer:
 		er2 := w.Flush()
 		if er2 != nil {
-			return -1, fmt.Errorf("multipart copy: flush failed: %w", err)
+			return -1, fmt.Errorf("multi-thread copy: flush failed: %w", err)
 		}
 	}
 	return n, nil
}
 
+// Close the chunk writing
 func (w writerAtChunkWriter) Close() error {
 	return w.writerAt.Close()
 }
 
+// Abort the chunk writing
 func (w writerAtChunkWriter) Abort() error {
 	obj, err := w.f.NewObject(w.ctx, w.remote)
 	if err != nil {
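
Note (not part of the patch): the core concurrency change above is replacing the golang.org/x/sync/semaphore weighted semaphore with errgroup's SetLimit, plus a gCtx.Err() check so the scheduling loop stops once any chunk copy fails. Below is a minimal standalone sketch of that pattern, assuming a placeholder copyChunk function standing in for mc.copyStream; the real rclone code differs in its types and error handling.

    package main

    import (
    	"context"
    	"fmt"

    	"golang.org/x/sync/errgroup"
    )

    // copyChunk is a hypothetical stand-in for mc.copyStream(gCtx, chunk, chunkWriter).
    func copyChunk(ctx context.Context, chunk int) error {
    	fmt.Println("copied chunk", chunk)
    	return nil
    }

    func main() {
    	const numChunks, streams = 8, 4

    	g, gCtx := errgroup.WithContext(context.Background())
    	g.SetLimit(streams) // at most `streams` chunk copies in flight; g.Go blocks once the limit is hit

    	for chunk := 0; chunk < numChunks; chunk++ {
    		// Fail fast: once any goroutine returns an error, gCtx is cancelled
    		// and we stop scheduling further chunks.
    		if gCtx.Err() != nil {
    			break
    		}
    		chunk := chunk // capture loop variable (needed before Go 1.22 loop-var semantics)
    		g.Go(func() error {
    			return copyChunk(gCtx, chunk)
    		})
    	}

    	if err := g.Wait(); err != nil {
    		fmt.Println("multi-thread copy failed:", err)
    	}
    }

Compared with the removed semaphore version, this needs no Acquire/Release bookkeeping and cannot leak permits on an early return, which is presumably why the patch switches to it.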