91b54aafcc
Before this change it wasn't possible to see where transfers were going from and to in core/stats and core/transferred. When use in rclone mount in particular this made interpreting the stats very hard.
306 lines
9.2 KiB
Go
306 lines
9.2 KiB
Go
package operations
|
|
|
|
import (
|
|
"context"
|
|
"errors"
|
|
"fmt"
|
|
"io"
|
|
"sync"
|
|
"testing"
|
|
"time"
|
|
|
|
"github.com/rclone/rclone/fs/accounting"
|
|
"github.com/rclone/rclone/fs/hash"
|
|
"github.com/rclone/rclone/fs/object"
|
|
"github.com/rclone/rclone/fstest/mockfs"
|
|
"github.com/rclone/rclone/fstest/mockobject"
|
|
"github.com/rclone/rclone/lib/random"
|
|
|
|
"github.com/rclone/rclone/fs"
|
|
"github.com/rclone/rclone/fstest"
|
|
"github.com/stretchr/testify/assert"
|
|
"github.com/stretchr/testify/require"
|
|
)
|
|
|
|
func TestDoMultiThreadCopy(t *testing.T) {
|
|
ctx := context.Background()
|
|
ci := fs.GetConfig(ctx)
|
|
f, err := mockfs.NewFs(ctx, "potato", "", nil)
|
|
require.NoError(t, err)
|
|
src := mockobject.New("file.txt").WithContent([]byte(random.String(100)), mockobject.SeekModeNone)
|
|
srcFs, err := mockfs.NewFs(ctx, "sausage", "", nil)
|
|
require.NoError(t, err)
|
|
src.SetFs(srcFs)
|
|
|
|
oldStreams := ci.MultiThreadStreams
|
|
oldCutoff := ci.MultiThreadCutoff
|
|
oldIsSet := ci.MultiThreadSet
|
|
defer func() {
|
|
ci.MultiThreadStreams = oldStreams
|
|
ci.MultiThreadCutoff = oldCutoff
|
|
ci.MultiThreadSet = oldIsSet
|
|
}()
|
|
|
|
ci.MultiThreadStreams, ci.MultiThreadCutoff = 4, 50
|
|
ci.MultiThreadSet = false
|
|
|
|
nullWriterAt := func(ctx context.Context, remote string, size int64) (fs.WriterAtCloser, error) {
|
|
panic("don't call me")
|
|
}
|
|
f.Features().OpenWriterAt = nullWriterAt
|
|
|
|
assert.True(t, doMultiThreadCopy(ctx, f, src))
|
|
|
|
ci.MultiThreadStreams = 0
|
|
assert.False(t, doMultiThreadCopy(ctx, f, src))
|
|
ci.MultiThreadStreams = 1
|
|
assert.False(t, doMultiThreadCopy(ctx, f, src))
|
|
ci.MultiThreadStreams = 2
|
|
assert.True(t, doMultiThreadCopy(ctx, f, src))
|
|
|
|
ci.MultiThreadCutoff = 200
|
|
assert.False(t, doMultiThreadCopy(ctx, f, src))
|
|
ci.MultiThreadCutoff = 101
|
|
assert.False(t, doMultiThreadCopy(ctx, f, src))
|
|
ci.MultiThreadCutoff = 100
|
|
assert.True(t, doMultiThreadCopy(ctx, f, src))
|
|
|
|
f.Features().OpenWriterAt = nil
|
|
assert.False(t, doMultiThreadCopy(ctx, f, src))
|
|
f.Features().OpenWriterAt = nullWriterAt
|
|
assert.True(t, doMultiThreadCopy(ctx, f, src))
|
|
|
|
f.Features().IsLocal = true
|
|
srcFs.Features().IsLocal = true
|
|
assert.False(t, doMultiThreadCopy(ctx, f, src))
|
|
ci.MultiThreadSet = true
|
|
assert.True(t, doMultiThreadCopy(ctx, f, src))
|
|
ci.MultiThreadSet = false
|
|
assert.False(t, doMultiThreadCopy(ctx, f, src))
|
|
srcFs.Features().IsLocal = false
|
|
assert.True(t, doMultiThreadCopy(ctx, f, src))
|
|
srcFs.Features().IsLocal = true
|
|
assert.False(t, doMultiThreadCopy(ctx, f, src))
|
|
f.Features().IsLocal = false
|
|
assert.True(t, doMultiThreadCopy(ctx, f, src))
|
|
srcFs.Features().IsLocal = false
|
|
assert.True(t, doMultiThreadCopy(ctx, f, src))
|
|
|
|
srcFs.Features().NoMultiThreading = true
|
|
assert.False(t, doMultiThreadCopy(ctx, f, src))
|
|
srcFs.Features().NoMultiThreading = false
|
|
assert.True(t, doMultiThreadCopy(ctx, f, src))
|
|
}
|
|
|
|
func TestMultithreadCalculateNumChunks(t *testing.T) {
|
|
for _, test := range []struct {
|
|
size int64
|
|
chunkSize int64
|
|
wantNumChunks int
|
|
}{
|
|
{size: 1, chunkSize: multithreadChunkSize, wantNumChunks: 1},
|
|
{size: 1 << 20, chunkSize: 1, wantNumChunks: 1 << 20},
|
|
{size: 1 << 20, chunkSize: 2, wantNumChunks: 1 << 19},
|
|
{size: (1 << 20) + 1, chunkSize: 2, wantNumChunks: (1 << 19) + 1},
|
|
{size: (1 << 20) - 1, chunkSize: 2, wantNumChunks: 1 << 19},
|
|
} {
|
|
t.Run(fmt.Sprintf("%+v", test), func(t *testing.T) {
|
|
mc := &multiThreadCopyState{
|
|
size: test.size,
|
|
}
|
|
mc.numChunks = calculateNumChunks(test.size, test.chunkSize)
|
|
assert.Equal(t, test.wantNumChunks, mc.numChunks)
|
|
})
|
|
}
|
|
}
|
|
|
|
// Skip if not multithread, returning the chunkSize otherwise
|
|
func skipIfNotMultithread(ctx context.Context, t *testing.T, r *fstest.Run) int {
|
|
features := r.Fremote.Features()
|
|
if features.OpenChunkWriter == nil && features.OpenWriterAt == nil {
|
|
t.Skip("multithread writing not supported")
|
|
}
|
|
|
|
// Only support one hash otherwise we end up spending a huge amount of CPU on hashing!
|
|
oldHashes := hash.SupportOnly([]hash.Type{r.Fremote.Hashes().GetOne()})
|
|
t.Cleanup(func() {
|
|
_ = hash.SupportOnly(oldHashes)
|
|
})
|
|
|
|
ci := fs.GetConfig(ctx)
|
|
chunkSize := int(ci.MultiThreadChunkSize)
|
|
if features.OpenChunkWriter != nil {
|
|
//OpenChunkWriter func(ctx context.Context, remote string, src ObjectInfo, options ...OpenOption) (info ChunkWriterInfo, writer ChunkWriter, err error)
|
|
const fileName = "chunksize-probe"
|
|
src := object.NewStaticObjectInfo(fileName, time.Now(), int64(100*fs.Mebi), true, nil, nil)
|
|
info, writer, err := features.OpenChunkWriter(ctx, fileName, src)
|
|
require.NoError(t, err)
|
|
chunkSize = int(info.ChunkSize)
|
|
err = writer.Abort(ctx)
|
|
require.NoError(t, err)
|
|
}
|
|
return chunkSize
|
|
}
|
|
|
|
func TestMultithreadCopy(t *testing.T) {
|
|
r := fstest.NewRun(t)
|
|
ctx := context.Background()
|
|
chunkSize := skipIfNotMultithread(ctx, t, r)
|
|
|
|
for _, upload := range []bool{false, true} {
|
|
for _, test := range []struct {
|
|
size int
|
|
streams int
|
|
}{
|
|
{size: chunkSize*2 - 1, streams: 2},
|
|
{size: chunkSize * 2, streams: 2},
|
|
{size: chunkSize*2 + 1, streams: 2},
|
|
} {
|
|
fileName := fmt.Sprintf("test-multithread-copy-%v-%d-%d", upload, test.size, test.streams)
|
|
t.Run(fmt.Sprintf("upload=%v,size=%v,streams=%v", upload, test.size, test.streams), func(t *testing.T) {
|
|
if *fstest.SizeLimit > 0 && int64(test.size) > *fstest.SizeLimit {
|
|
t.Skipf("exceeded file size limit %d > %d", test.size, *fstest.SizeLimit)
|
|
}
|
|
var (
|
|
contents = random.String(test.size)
|
|
t1 = fstest.Time("2001-02-03T04:05:06.499999999Z")
|
|
file1 fstest.Item
|
|
src, dst fs.Object
|
|
err error
|
|
)
|
|
|
|
if upload {
|
|
file1 = r.WriteFile(fileName, contents, t1)
|
|
r.CheckRemoteItems(t)
|
|
r.CheckLocalItems(t, file1)
|
|
src, err = r.Flocal.NewObject(ctx, fileName)
|
|
} else {
|
|
file1 = r.WriteObject(ctx, fileName, contents, t1)
|
|
r.CheckRemoteItems(t, file1)
|
|
r.CheckLocalItems(t)
|
|
src, err = r.Fremote.NewObject(ctx, fileName)
|
|
}
|
|
require.NoError(t, err)
|
|
|
|
accounting.GlobalStats().ResetCounters()
|
|
tr := accounting.GlobalStats().NewTransfer(src, nil)
|
|
|
|
defer func() {
|
|
tr.Done(ctx, err)
|
|
}()
|
|
|
|
if upload {
|
|
dst, err = multiThreadCopy(ctx, r.Fremote, fileName, src, test.streams, tr)
|
|
} else {
|
|
dst, err = multiThreadCopy(ctx, r.Flocal, fileName, src, test.streams, tr)
|
|
}
|
|
|
|
require.NoError(t, err)
|
|
assert.Equal(t, src.Size(), dst.Size())
|
|
assert.Equal(t, fileName, dst.Remote())
|
|
fstest.CheckListingWithPrecision(t, r.Fremote, []fstest.Item{file1}, nil, fs.GetModifyWindow(ctx, r.Flocal, r.Fremote))
|
|
fstest.CheckListingWithPrecision(t, r.Flocal, []fstest.Item{file1}, nil, fs.GetModifyWindow(ctx, r.Flocal, r.Fremote))
|
|
require.NoError(t, dst.Remove(ctx))
|
|
require.NoError(t, src.Remove(ctx))
|
|
})
|
|
}
|
|
}
|
|
}
|
|
|
|
type errorObject struct {
|
|
fs.Object
|
|
size int64
|
|
wg *sync.WaitGroup
|
|
}
|
|
|
|
// Open opens the file for read. Call Close() on the returned io.ReadCloser
|
|
//
|
|
// Remember this is called multiple times whenever the backend seeks (eg having read checksum)
|
|
func (o errorObject) Open(ctx context.Context, options ...fs.OpenOption) (io.ReadCloser, error) {
|
|
fs.Debugf(nil, "Open with options = %v", options)
|
|
rc, err := o.Object.Open(ctx, options...)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
// Return an error reader for the second segment
|
|
for _, option := range options {
|
|
if ropt, ok := option.(*fs.RangeOption); ok {
|
|
end := ropt.End + 1
|
|
if end >= o.size {
|
|
// Give the other chunks a chance to start
|
|
time.Sleep(time.Second)
|
|
// Wait for chunks to upload first
|
|
o.wg.Wait()
|
|
fs.Debugf(nil, "Returning error reader")
|
|
return errorReadCloser{rc}, nil
|
|
}
|
|
}
|
|
}
|
|
o.wg.Add(1)
|
|
return wgReadCloser{rc, o.wg}, nil
|
|
}
|
|
|
|
type errorReadCloser struct {
|
|
io.ReadCloser
|
|
}
|
|
|
|
func (rc errorReadCloser) Read(p []byte) (n int, err error) {
|
|
fs.Debugf(nil, "BOOM: simulated read failure")
|
|
return 0, errors.New("BOOM: simulated read failure")
|
|
}
|
|
|
|
type wgReadCloser struct {
|
|
io.ReadCloser
|
|
wg *sync.WaitGroup
|
|
}
|
|
|
|
func (rc wgReadCloser) Close() (err error) {
|
|
rc.wg.Done()
|
|
return rc.ReadCloser.Close()
|
|
}
|
|
|
|
// Make sure aborting the multi-thread copy doesn't overwrite an existing file.
|
|
func TestMultithreadCopyAbort(t *testing.T) {
|
|
r := fstest.NewRun(t)
|
|
ctx := context.Background()
|
|
chunkSize := skipIfNotMultithread(ctx, t, r)
|
|
size := 2*chunkSize + 1
|
|
|
|
if *fstest.SizeLimit > 0 && int64(size) > *fstest.SizeLimit {
|
|
t.Skipf("exceeded file size limit %d > %d", size, *fstest.SizeLimit)
|
|
}
|
|
|
|
// first write a canary file which we are trying not to overwrite
|
|
const fileName = "test-multithread-abort"
|
|
contents := random.String(100)
|
|
t1 := fstest.Time("2001-02-03T04:05:06.499999999Z")
|
|
canary := r.WriteObject(ctx, fileName, contents, t1)
|
|
r.CheckRemoteItems(t, canary)
|
|
|
|
// Now write a local file to upload
|
|
file1 := r.WriteFile(fileName, random.String(size), t1)
|
|
r.CheckLocalItems(t, file1)
|
|
|
|
src, err := r.Flocal.NewObject(ctx, fileName)
|
|
require.NoError(t, err)
|
|
accounting.GlobalStats().ResetCounters()
|
|
tr := accounting.GlobalStats().NewTransfer(src, nil)
|
|
|
|
defer func() {
|
|
tr.Done(ctx, err)
|
|
}()
|
|
wg := new(sync.WaitGroup)
|
|
dst, err := multiThreadCopy(ctx, r.Fremote, fileName, errorObject{src, int64(size), wg}, 1, tr)
|
|
assert.Error(t, err)
|
|
assert.Nil(t, dst)
|
|
|
|
if r.Fremote.Features().PartialUploads {
|
|
r.CheckRemoteItems(t)
|
|
|
|
} else {
|
|
r.CheckRemoteItems(t, canary)
|
|
o, err := r.Fremote.NewObject(ctx, fileName)
|
|
require.NoError(t, err)
|
|
require.NoError(t, o.Remove(ctx))
|
|
}
|
|
}
|