2019-04-24 16:04:40 +00:00
|
|
|
package operations
|
|
|
|
|
|
|
|
import (
|
|
|
|
"context"
|
2021-11-04 10:12:57 +00:00
|
|
|
"errors"
|
|
|
|
"fmt"
|
2019-04-24 16:04:40 +00:00
|
|
|
"io"
|
|
|
|
|
2019-07-28 17:47:38 +00:00
|
|
|
"github.com/rclone/rclone/fs"
|
|
|
|
"github.com/rclone/rclone/fs/accounting"
|
2019-04-24 16:04:40 +00:00
|
|
|
"golang.org/x/sync/errgroup"
|
|
|
|
)
|
|
|
|
|
|
|
|
const (
|
|
|
|
multithreadChunkSize = 64 << 10
|
|
|
|
multithreadChunkSizeMask = multithreadChunkSize - 1
|
|
|
|
multithreadBufferSize = 32 * 1024
|
|
|
|
)
|
|
|
|
|
2019-08-12 21:09:40 +00:00
|
|
|
// Return a boolean as to whether we should use multi thread copy for
|
|
|
|
// this transfer
|
2020-11-05 11:33:32 +00:00
|
|
|
func doMultiThreadCopy(ctx context.Context, f fs.Fs, src fs.Object) bool {
|
|
|
|
ci := fs.GetConfig(ctx)
|
|
|
|
|
2019-08-12 21:09:40 +00:00
|
|
|
// Disable multi thread if...
|
|
|
|
|
|
|
|
// ...it isn't configured
|
2020-11-05 11:33:32 +00:00
|
|
|
if ci.MultiThreadStreams <= 1 {
|
2019-08-12 21:09:40 +00:00
|
|
|
return false
|
|
|
|
}
|
2023-05-09 16:40:58 +00:00
|
|
|
// ...if the source doesn't support it
|
|
|
|
if src.Fs().Features().NoMultiThreading {
|
|
|
|
return false
|
|
|
|
}
|
2019-08-12 21:09:40 +00:00
|
|
|
// ...size of object is less than cutoff
|
2020-11-05 11:33:32 +00:00
|
|
|
if src.Size() < int64(ci.MultiThreadCutoff) {
|
2019-08-12 21:09:40 +00:00
|
|
|
return false
|
|
|
|
}
|
2023-05-09 16:40:58 +00:00
|
|
|
// ...destination doesn't support it
|
2019-08-12 21:09:40 +00:00
|
|
|
dstFeatures := f.Features()
|
|
|
|
if dstFeatures.OpenWriterAt == nil {
|
|
|
|
return false
|
|
|
|
}
|
|
|
|
// ...if --multi-thread-streams not in use and source and
|
|
|
|
// destination are both local
|
2020-11-05 11:33:32 +00:00
|
|
|
if !ci.MultiThreadSet && dstFeatures.IsLocal && src.Fs().Features().IsLocal {
|
2019-08-12 21:09:40 +00:00
|
|
|
return false
|
|
|
|
}
|
|
|
|
return true
|
|
|
|
}
|
|
|
|
|
2019-04-24 16:04:40 +00:00
|
|
|
// state for a multi-thread copy
|
|
|
|
type multiThreadCopyState struct {
|
|
|
|
ctx context.Context
|
|
|
|
partSize int64
|
|
|
|
size int64
|
|
|
|
wc fs.WriterAtCloser
|
|
|
|
src fs.Object
|
|
|
|
acc *accounting.Account
|
|
|
|
streams int
|
|
|
|
}
|
|
|
|
|
|
|
|
// Copy a single stream into place
|
2019-06-17 08:34:30 +00:00
|
|
|
func (mc *multiThreadCopyState) copyStream(ctx context.Context, stream int) (err error) {
|
2020-11-05 11:33:32 +00:00
|
|
|
ci := fs.GetConfig(ctx)
|
2019-04-24 16:04:40 +00:00
|
|
|
defer func() {
|
|
|
|
if err != nil {
|
|
|
|
fs.Debugf(mc.src, "multi-thread copy: stream %d/%d failed: %v", stream+1, mc.streams, err)
|
|
|
|
}
|
|
|
|
}()
|
|
|
|
start := int64(stream) * mc.partSize
|
|
|
|
if start >= mc.size {
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
end := start + mc.partSize
|
|
|
|
if end > mc.size {
|
|
|
|
end = mc.size
|
|
|
|
}
|
|
|
|
|
|
|
|
fs.Debugf(mc.src, "multi-thread copy: stream %d/%d (%d-%d) size %v starting", stream+1, mc.streams, start, end, fs.SizeSuffix(end-start))
|
|
|
|
|
2020-11-05 11:33:32 +00:00
|
|
|
rc, err := NewReOpen(ctx, mc.src, ci.LowLevelRetries, &fs.RangeOption{Start: start, End: end - 1})
|
2019-04-24 16:04:40 +00:00
|
|
|
if err != nil {
|
2021-11-04 10:12:57 +00:00
|
|
|
return fmt.Errorf("multipart copy: failed to open source: %w", err)
|
2019-04-24 16:04:40 +00:00
|
|
|
}
|
|
|
|
defer fs.CheckClose(rc, &err)
|
|
|
|
|
|
|
|
// Copy the data
|
|
|
|
buf := make([]byte, multithreadBufferSize)
|
|
|
|
offset := start
|
|
|
|
for {
|
|
|
|
// Check if context cancelled and exit if so
|
|
|
|
if mc.ctx.Err() != nil {
|
|
|
|
return mc.ctx.Err()
|
|
|
|
}
|
|
|
|
nr, er := rc.Read(buf)
|
|
|
|
if nr > 0 {
|
|
|
|
err = mc.acc.AccountRead(nr)
|
|
|
|
if err != nil {
|
2021-11-04 10:12:57 +00:00
|
|
|
return fmt.Errorf("multipart copy: accounting failed: %w", err)
|
2019-04-24 16:04:40 +00:00
|
|
|
}
|
|
|
|
nw, ew := mc.wc.WriteAt(buf[0:nr], offset)
|
|
|
|
if nw > 0 {
|
|
|
|
offset += int64(nw)
|
|
|
|
}
|
|
|
|
if ew != nil {
|
2021-11-04 10:12:57 +00:00
|
|
|
return fmt.Errorf("multipart copy: write failed: %w", ew)
|
2019-04-24 16:04:40 +00:00
|
|
|
}
|
|
|
|
if nr != nw {
|
2021-11-04 10:12:57 +00:00
|
|
|
return fmt.Errorf("multipart copy: %w", io.ErrShortWrite)
|
2019-04-24 16:04:40 +00:00
|
|
|
}
|
|
|
|
}
|
|
|
|
if er != nil {
|
|
|
|
if er != io.EOF {
|
2021-11-04 10:12:57 +00:00
|
|
|
return fmt.Errorf("multipart copy: read failed: %w", er)
|
2019-04-24 16:04:40 +00:00
|
|
|
}
|
|
|
|
break
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
if offset != end {
|
2021-11-04 10:12:57 +00:00
|
|
|
return fmt.Errorf("multipart copy: wrote %d bytes but expected to write %d", offset-start, end-start)
|
2019-04-24 16:04:40 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
fs.Debugf(mc.src, "multi-thread copy: stream %d/%d (%d-%d) size %v finished", stream+1, mc.streams, start, end, fs.SizeSuffix(end-start))
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
|
|
|
// Calculate the chunk sizes and updated number of streams
|
|
|
|
func (mc *multiThreadCopyState) calculateChunks() {
|
|
|
|
partSize := mc.size / int64(mc.streams)
|
|
|
|
// Round partition size up so partSize * streams >= size
|
|
|
|
if (mc.size % int64(mc.streams)) != 0 {
|
|
|
|
partSize++
|
|
|
|
}
|
|
|
|
// round partSize up to nearest multithreadChunkSize boundary
|
|
|
|
mc.partSize = (partSize + multithreadChunkSizeMask) &^ multithreadChunkSizeMask
|
|
|
|
// recalculate number of streams
|
|
|
|
mc.streams = int(mc.size / mc.partSize)
|
|
|
|
// round streams up so partSize * streams >= size
|
|
|
|
if (mc.size % mc.partSize) != 0 {
|
|
|
|
mc.streams++
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
// Copy src to (f, remote) using streams download threads and the OpenWriterAt feature
|
2019-07-16 11:56:20 +00:00
|
|
|
func multiThreadCopy(ctx context.Context, f fs.Fs, remote string, src fs.Object, streams int, tr *accounting.Transfer) (newDst fs.Object, err error) {
|
2019-04-24 16:04:40 +00:00
|
|
|
openWriterAt := f.Features().OpenWriterAt
|
|
|
|
if openWriterAt == nil {
|
|
|
|
return nil, errors.New("multi-thread copy: OpenWriterAt not supported")
|
|
|
|
}
|
|
|
|
if src.Size() < 0 {
|
|
|
|
return nil, errors.New("multi-thread copy: can't copy unknown sized file")
|
|
|
|
}
|
|
|
|
if src.Size() == 0 {
|
|
|
|
return nil, errors.New("multi-thread copy: can't copy zero sized file")
|
|
|
|
}
|
|
|
|
|
2019-07-01 08:33:21 +00:00
|
|
|
g, gCtx := errgroup.WithContext(ctx)
|
2019-04-24 16:04:40 +00:00
|
|
|
mc := &multiThreadCopyState{
|
2019-07-01 08:33:21 +00:00
|
|
|
ctx: gCtx,
|
2019-04-24 16:04:40 +00:00
|
|
|
size: src.Size(),
|
|
|
|
src: src,
|
|
|
|
streams: streams,
|
|
|
|
}
|
|
|
|
mc.calculateChunks()
|
|
|
|
|
|
|
|
// Make accounting
|
2020-06-04 14:09:03 +00:00
|
|
|
mc.acc = tr.Account(ctx, nil)
|
2019-04-24 16:04:40 +00:00
|
|
|
|
|
|
|
// create write file handle
|
2019-07-01 08:33:21 +00:00
|
|
|
mc.wc, err = openWriterAt(gCtx, remote, mc.size)
|
2019-04-24 16:04:40 +00:00
|
|
|
if err != nil {
|
2021-11-04 10:12:57 +00:00
|
|
|
return nil, fmt.Errorf("multipart copy: failed to open destination: %w", err)
|
2019-04-24 16:04:40 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
fs.Debugf(src, "Starting multi-thread copy with %d parts of size %v", mc.streams, fs.SizeSuffix(mc.partSize))
|
|
|
|
for stream := 0; stream < mc.streams; stream++ {
|
|
|
|
stream := stream
|
|
|
|
g.Go(func() (err error) {
|
2019-07-01 08:33:21 +00:00
|
|
|
return mc.copyStream(gCtx, stream)
|
2019-04-24 16:04:40 +00:00
|
|
|
})
|
|
|
|
}
|
|
|
|
err = g.Wait()
|
2020-02-24 10:22:09 +00:00
|
|
|
closeErr := mc.wc.Close()
|
2019-04-24 16:04:40 +00:00
|
|
|
if err != nil {
|
|
|
|
return nil, err
|
|
|
|
}
|
2020-02-24 10:22:09 +00:00
|
|
|
if closeErr != nil {
|
2021-11-04 10:12:57 +00:00
|
|
|
return nil, fmt.Errorf("multi-thread copy: failed to close object after copy: %w", closeErr)
|
2020-02-24 10:22:09 +00:00
|
|
|
}
|
2019-04-24 16:04:40 +00:00
|
|
|
|
2019-06-17 08:34:30 +00:00
|
|
|
obj, err := f.NewObject(ctx, remote)
|
2019-04-24 16:04:40 +00:00
|
|
|
if err != nil {
|
2021-11-04 10:12:57 +00:00
|
|
|
return nil, fmt.Errorf("multi-thread copy: failed to find object after copy: %w", err)
|
2019-04-24 16:04:40 +00:00
|
|
|
}
|
|
|
|
|
2019-06-17 08:34:30 +00:00
|
|
|
err = obj.SetModTime(ctx, src.ModTime(ctx))
|
2019-04-24 16:04:40 +00:00
|
|
|
switch err {
|
|
|
|
case nil, fs.ErrorCantSetModTime, fs.ErrorCantSetModTimeWithoutDelete:
|
|
|
|
default:
|
2021-11-04 10:12:57 +00:00
|
|
|
return nil, fmt.Errorf("multi-thread copy: failed to set modification time: %w", err)
|
2019-04-24 16:04:40 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
fs.Debugf(src, "Finished multi-thread copy with %d parts of size %v", mc.streams, fs.SizeSuffix(mc.partSize))
|
|
|
|
return obj, nil
|
|
|
|
}
|