operations: enable multi threaded downloads - Fixes #2252

This implements the --multi-thread-cutoff and --multi-thread-streams
flags to control multi thread downloading to the local backend.
This commit is contained in:
Nick Craig-Wood 2019-04-24 17:04:40 +01:00
parent 687cbf3ded
commit 7d70e92664
8 changed files with 340 additions and 30 deletions

View file

@ -73,6 +73,7 @@ Please see [the full list of all storage providers and their features](https://r
* Optional encryption ([Crypt](https://rclone.org/crypt/)) * Optional encryption ([Crypt](https://rclone.org/crypt/))
* Optional cache ([Cache](https://rclone.org/cache/)) * Optional cache ([Cache](https://rclone.org/cache/))
* Optional FUSE mount ([rclone mount](https://rclone.org/commands/rclone_mount/)) * Optional FUSE mount ([rclone mount](https://rclone.org/commands/rclone_mount/))
* Multi-threaded downloads to local disk
## Installation & documentation ## Installation & documentation

View file

@ -65,6 +65,7 @@ Features
* ([Cache](/cache/)) backend * ([Cache](/cache/)) backend
* ([Union](/union/)) backend * ([Union](/union/)) backend
* Optional FUSE mount ([rclone mount](/commands/rclone_mount/)) * Optional FUSE mount ([rclone mount](/commands/rclone_mount/))
* Multi-threaded downloads to local disk
Links Links

View file

@ -674,6 +674,49 @@ if you are reading and writing to an OS X filing system this will be
This command line flag allows you to override that computed default. This command line flag allows you to override that computed default.
### --multi-thread-cutoff=SIZE ###
When downloading files to the local backend above this size, rclone
will use multiple threads to download the file. (default 250M)
Rclone preallocates the file (using `fallocate(FALLOC_FL_KEEP_SIZE)`
on unix or `NTSetInformationFile` on Windows both of which takes no
time) then each thread writes directly into the file at the correct
place. This means that rclone won't create fragmented or sparse files
and there won't be any assembly time at the end of the transfer.
The number of threads used to dowload is controlled by
`--multi-thread-streams`.
Use `-vv` if you wish to see info about the threads.
This will work with the `sync`/`copy`/`move` commands and friends
`copyto`/`moveto`. Multi thread downloads will be used with `rclone
mount` and `rclone serve` if `--vfs-cache-mode` is set to `writes` or
above.
**NB** that this **only** works for a local destination but will work
with any source.
### --multi-thread-streams=N ###
When using multi thread downloads (see above `--multi-thread-cutoff`)
this sets the maximum number of streams to use. Set to `0` to disable
multi thread downloads. (Default 4)
Exactly how many streams rclone uses for the download depends on the
size of the file. To calculate the number of download streams Rclone
divides the size of the file by the `--multi-thread-cutoff` and rounds
up, up to the maximum set with `--multi-thread-streams`.
So if `--multi-thread-cutoff 250MB` and `--multi-thread-streams 4` are
in effect (the defaults):
- 0MB.250MB files will be downloaded with 1 stream
- 250MB..500MB files will be downloaded with 2 streams
- 500MB..750MB files will be downloaded with 3 streams
- 750MB+ files will be downloaded with 4 streams
### --no-gzip-encoding ### ### --no-gzip-encoding ###
Don't set `Accept-Encoding: gzip`. This means that rclone won't ask Don't set `Accept-Encoding: gzip`. This means that rclone won't ask

View file

@ -93,6 +93,8 @@ type ConfigInfo struct {
CaCert string // Client Side CA CaCert string // Client Side CA
ClientCert string // Client Side Cert ClientCert string // Client Side Cert
ClientKey string // Client Side Key ClientKey string // Client Side Key
MultiThreadCutoff SizeSuffix
MultiThreadStreams int
} }
// NewConfig creates a new config with everything set to the default // NewConfig creates a new config with everything set to the default
@ -124,6 +126,8 @@ func NewConfig() *ConfigInfo {
c.MaxBacklog = 10000 c.MaxBacklog = 10000
// We do not want to set the default here. We use this variable being empty as part of the fall-through of options. // We do not want to set the default here. We use this variable being empty as part of the fall-through of options.
// c.StatsOneLineDateFormat = "2006/01/02 15:04:05 - " // c.StatsOneLineDateFormat = "2006/01/02 15:04:05 - "
c.MultiThreadCutoff = SizeSuffix(250 * 1024 * 1024)
c.MultiThreadStreams = 4
return c return c
} }

View file

@ -95,6 +95,8 @@ func AddFlags(flagSet *pflag.FlagSet) {
flags.StringVarP(flagSet, &fs.Config.CaCert, "ca-cert", "", fs.Config.CaCert, "CA certificate used to verify servers") flags.StringVarP(flagSet, &fs.Config.CaCert, "ca-cert", "", fs.Config.CaCert, "CA certificate used to verify servers")
flags.StringVarP(flagSet, &fs.Config.ClientCert, "client-cert", "", fs.Config.ClientCert, "Client SSL certificate (PEM) for mutual TLS auth") flags.StringVarP(flagSet, &fs.Config.ClientCert, "client-cert", "", fs.Config.ClientCert, "Client SSL certificate (PEM) for mutual TLS auth")
flags.StringVarP(flagSet, &fs.Config.ClientKey, "client-key", "", fs.Config.ClientKey, "Client SSL private key (PEM) for mutual TLS auth") flags.StringVarP(flagSet, &fs.Config.ClientKey, "client-key", "", fs.Config.ClientKey, "Client SSL private key (PEM) for mutual TLS auth")
flags.FVarP(flagSet, &fs.Config.MultiThreadCutoff, "multi-thread-cutoff", "", "Use multi-thread downloads for files above this size.")
flags.IntVarP(flagSet, &fs.Config.MultiThreadStreams, "multi-thread-streams", "", fs.Config.MultiThreadStreams, "Max number of streams to use for multi-thread downloads.")
} }
// SetFlags converts any flags into config which weren't straight forward // SetFlags converts any flags into config which weren't straight forward

View file

@ -0,0 +1,171 @@
package operations
import (
"context"
"io"
"github.com/ncw/rclone/fs"
"github.com/ncw/rclone/fs/accounting"
"github.com/pkg/errors"
"golang.org/x/sync/errgroup"
)
const (
multithreadChunkSize = 64 << 10
multithreadChunkSizeMask = multithreadChunkSize - 1
multithreadBufferSize = 32 * 1024
)
// state for a multi-thread copy
type multiThreadCopyState struct {
ctx context.Context
partSize int64
size int64
wc fs.WriterAtCloser
src fs.Object
acc *accounting.Account
streams int
}
// Copy a single stream into place
func (mc *multiThreadCopyState) copyStream(stream int) (err error) {
defer func() {
if err != nil {
fs.Debugf(mc.src, "multi-thread copy: stream %d/%d failed: %v", stream+1, mc.streams, err)
}
}()
start := int64(stream) * mc.partSize
if start >= mc.size {
return nil
}
end := start + mc.partSize
if end > mc.size {
end = mc.size
}
fs.Debugf(mc.src, "multi-thread copy: stream %d/%d (%d-%d) size %v starting", stream+1, mc.streams, start, end, fs.SizeSuffix(end-start))
rc, err := newReOpen(mc.src, nil, &fs.RangeOption{Start: start, End: end - 1}, fs.Config.LowLevelRetries)
if err != nil {
return errors.Wrap(err, "multpart copy: failed to open source")
}
defer fs.CheckClose(rc, &err)
// Copy the data
buf := make([]byte, multithreadBufferSize)
offset := start
for {
// Check if context cancelled and exit if so
if mc.ctx.Err() != nil {
return mc.ctx.Err()
}
nr, er := rc.Read(buf)
if nr > 0 {
err = mc.acc.AccountRead(nr)
if err != nil {
return errors.Wrap(err, "multpart copy: accounting failed")
}
nw, ew := mc.wc.WriteAt(buf[0:nr], offset)
if nw > 0 {
offset += int64(nw)
}
if ew != nil {
return errors.Wrap(ew, "multpart copy: write failed")
}
if nr != nw {
return errors.Wrap(io.ErrShortWrite, "multpart copy")
}
}
if er != nil {
if er != io.EOF {
return errors.Wrap(er, "multpart copy: read failed")
}
break
}
}
if offset != end {
return errors.Errorf("multpart copy: wrote %d bytes but expected to write %d", offset-start, end-start)
}
fs.Debugf(mc.src, "multi-thread copy: stream %d/%d (%d-%d) size %v finished", stream+1, mc.streams, start, end, fs.SizeSuffix(end-start))
return nil
}
// Calculate the chunk sizes and updated number of streams
func (mc *multiThreadCopyState) calculateChunks() {
partSize := mc.size / int64(mc.streams)
// Round partition size up so partSize * streams >= size
if (mc.size % int64(mc.streams)) != 0 {
partSize++
}
// round partSize up to nearest multithreadChunkSize boundary
mc.partSize = (partSize + multithreadChunkSizeMask) &^ multithreadChunkSizeMask
// recalculate number of streams
mc.streams = int(mc.size / mc.partSize)
// round streams up so partSize * streams >= size
if (mc.size % mc.partSize) != 0 {
mc.streams++
}
}
// Copy src to (f, remote) using streams download threads and the OpenWriterAt feature
func multiThreadCopy(f fs.Fs, remote string, src fs.Object, streams int) (newDst fs.Object, err error) {
openWriterAt := f.Features().OpenWriterAt
if openWriterAt == nil {
return nil, errors.New("multi-thread copy: OpenWriterAt not supported")
}
if src.Size() < 0 {
return nil, errors.New("multi-thread copy: can't copy unknown sized file")
}
if src.Size() == 0 {
return nil, errors.New("multi-thread copy: can't copy zero sized file")
}
g, ctx := errgroup.WithContext(context.Background())
mc := &multiThreadCopyState{
ctx: ctx,
size: src.Size(),
src: src,
streams: streams,
}
mc.calculateChunks()
// Make accounting
mc.acc = accounting.NewAccount(nil, src)
defer fs.CheckClose(mc.acc, &err)
// create write file handle
mc.wc, err = openWriterAt(remote, mc.size)
if err != nil {
return nil, errors.Wrap(err, "multpart copy: failed to open destination")
}
defer fs.CheckClose(mc.wc, &err)
fs.Debugf(src, "Starting multi-thread copy with %d parts of size %v", mc.streams, fs.SizeSuffix(mc.partSize))
for stream := 0; stream < mc.streams; stream++ {
stream := stream
g.Go(func() (err error) {
return mc.copyStream(stream)
})
}
err = g.Wait()
if err != nil {
return nil, err
}
obj, err := f.NewObject(remote)
if err != nil {
return nil, errors.Wrap(err, "multi-thread copy: failed to find object after copy")
}
err = obj.SetModTime(src.ModTime())
switch err {
case nil, fs.ErrorCantSetModTime, fs.ErrorCantSetModTimeWithoutDelete:
default:
return nil, errors.Wrap(err, "multi-thread copy: failed to set modification time")
}
fs.Debugf(src, "Finished multi-thread copy with %d parts of size %v", mc.streams, fs.SizeSuffix(mc.partSize))
return obj, nil
}

View file

@ -0,0 +1,70 @@
package operations
import (
"fmt"
"testing"
"github.com/ncw/rclone/fs"
"github.com/ncw/rclone/fstest"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
func TestMultithreadCalculateChunks(t *testing.T) {
for _, test := range []struct {
size int64
streams int
wantPartSize int64
wantStreams int
}{
{size: 1, streams: 10, wantPartSize: multithreadChunkSize, wantStreams: 1},
{size: 1 << 20, streams: 1, wantPartSize: 1 << 20, wantStreams: 1},
{size: 1 << 20, streams: 2, wantPartSize: 1 << 19, wantStreams: 2},
{size: (1 << 20) + 1, streams: 2, wantPartSize: (1 << 19) + multithreadChunkSize, wantStreams: 2},
{size: (1 << 20) - 1, streams: 2, wantPartSize: (1 << 19), wantStreams: 2},
} {
t.Run(fmt.Sprintf("%+v", test), func(t *testing.T) {
mc := &multiThreadCopyState{
size: test.size,
streams: test.streams,
}
mc.calculateChunks()
assert.Equal(t, test.wantPartSize, mc.partSize)
assert.Equal(t, test.wantStreams, mc.streams)
})
}
}
func TestMultithreadCopy(t *testing.T) {
r := fstest.NewRun(t)
defer r.Finalise()
for _, test := range []struct {
size int
streams int
}{
{size: multithreadChunkSize*2 - 1, streams: 2},
{size: multithreadChunkSize * 2, streams: 2},
{size: multithreadChunkSize*2 + 1, streams: 2},
} {
t.Run(fmt.Sprintf("%+v", test), func(t *testing.T) {
contents := fstest.RandomString(test.size)
t1 := fstest.Time("2001-02-03T04:05:06.499999999Z")
file1 := r.WriteObject("file1", contents, t1)
fstest.CheckItems(t, r.Fremote, file1)
fstest.CheckItems(t, r.Flocal)
src, err := r.Fremote.NewObject("file1")
require.NoError(t, err)
dst, err := multiThreadCopy(r.Flocal, "file1", src, 2)
require.NoError(t, err)
assert.Equal(t, src.Size(), dst.Size())
assert.Equal(t, "file1", dst.Remote())
fstest.CheckListingWithPrecision(t, r.Fremote, []fstest.Item{file1}, nil, fs.ModTimeNotSupported)
require.NoError(t, dst.Remove())
})
}
}

View file

@ -292,38 +292,56 @@ func Copy(f fs.Fs, dst fs.Object, remote string, src fs.Object) (newDst fs.Objec
} }
// If can't server side copy, do it manually // If can't server side copy, do it manually
if err == fs.ErrorCantCopy { if err == fs.ErrorCantCopy {
var in0 io.ReadCloser if doOpenWriterAt := f.Features().OpenWriterAt; doOpenWriterAt != nil && src.Size() >= int64(fs.Config.MultiThreadCutoff) && fs.Config.MultiThreadStreams > 1 {
in0, err = newReOpen(src, hashOption, nil, fs.Config.LowLevelRetries) // Number of streams proportional to size
if err != nil { streams := src.Size() / int64(fs.Config.MultiThreadCutoff)
err = errors.Wrap(err, "failed to open source object") // With maximum
} else { if streams > int64(fs.Config.MultiThreadStreams) {
if src.Size() == -1 { streams = int64(fs.Config.MultiThreadStreams)
// -1 indicates unknown size. Use Rcat to handle both remotes supporting and not supporting PutStream. }
if doUpdate { if streams < 2 {
actionTaken = "Copied (Rcat, replaced existing)" streams = 2
} else { }
actionTaken = "Copied (Rcat, new)" dst, err = multiThreadCopy(f, remote, src, int(streams))
} if doUpdate {
dst, err = Rcat(f, remote, in0, src.ModTime()) actionTaken = "Multi-thread Copied (replaced existing)"
newDst = dst
} else { } else {
in := accounting.NewAccount(in0, src).WithBuffer() // account and buffer the transfer actionTaken = "Multi-thread Copied (new)"
var wrappedSrc fs.ObjectInfo = src }
// We try to pass the original object if possible } else {
if src.Remote() != remote { var in0 io.ReadCloser
wrappedSrc = &overrideRemoteObject{Object: src, remote: remote} in0, err = newReOpen(src, hashOption, nil, fs.Config.LowLevelRetries)
} if err != nil {
if doUpdate { err = errors.Wrap(err, "failed to open source object")
actionTaken = "Copied (replaced existing)" } else {
err = dst.Update(in, wrappedSrc, hashOption) if src.Size() == -1 {
} else { // -1 indicates unknown size. Use Rcat to handle both remotes supporting and not supporting PutStream.
actionTaken = "Copied (new)" if doUpdate {
dst, err = f.Put(in, wrappedSrc, hashOption) actionTaken = "Copied (Rcat, replaced existing)"
} } else {
closeErr := in.Close() actionTaken = "Copied (Rcat, new)"
if err == nil { }
dst, err = Rcat(f, remote, in0, src.ModTime())
newDst = dst newDst = dst
err = closeErr } else {
in := accounting.NewAccount(in0, src).WithBuffer() // account and buffer the transfer
var wrappedSrc fs.ObjectInfo = src
// We try to pass the original object if possible
if src.Remote() != remote {
wrappedSrc = &overrideRemoteObject{Object: src, remote: remote}
}
if doUpdate {
actionTaken = "Copied (replaced existing)"
err = dst.Update(in, wrappedSrc, hashOption)
} else {
actionTaken = "Copied (new)"
dst, err = f.Put(in, wrappedSrc, hashOption)
}
closeErr := in.Close()
if err == nil {
newDst = dst
err = closeErr
}
} }
} }
} }