diff --git a/fs/operations/multithread.go b/fs/operations/multithread.go index a780bfd15..a2f73d7a4 100644 --- a/fs/operations/multithread.go +++ b/fs/operations/multithread.go @@ -91,16 +91,19 @@ func (mc *multiThreadCopyState) copyStream(ctx context.Context, stream int, writ var rs io.ReadSeeker if mc.noSeek { // Read directly if we are sure we aren't going to seek - rs = readers.NoSeeker{Reader: rc} + // and account with accounting + rs = readers.NoSeeker{Reader: mc.acc.WrapStream(rc)} } else { // Read the chunk into buffered reader - wr := multipart.NewRW() - defer fs.CheckClose(wr, &err) - _, err = io.CopyN(wr, rc, size) + rw := multipart.NewRW() + defer fs.CheckClose(rw, &err) + _, err = io.CopyN(rw, rc, size) if err != nil { return fmt.Errorf("multi-thread copy: failed to read chunk: %w", err) } - rs = wr + // Account as we go + rw.SetAccounting(mc.acc.AccountRead) + rs = rw } // Write the chunk @@ -109,13 +112,6 @@ func (mc *multiThreadCopyState) copyStream(ctx context.Context, stream int, writ return fmt.Errorf("multi-thread copy: failed to write chunk: %w", err) } - // FIXME: Wrap ReadSeeker for Accounting - // However, to ensure reporting is correctly seeks have to be handled properly - errAccRead := mc.acc.AccountRead(int(bytesWritten)) - if errAccRead != nil { - return errAccRead - } - fs.Debugf(mc.src, "multi-thread copy: stream %d/%d (%d-%d) size %v finished", stream+1, mc.numChunks, start, end, fs.SizeSuffix(bytesWritten)) return nil }