forked from TrueCloudLab/rclone
6a0a54ab97
Before this change multipart downloads to the local disk with --metadata failed to have their metadata set properly. This was because the OpenWriterAt interface doesn't receive metadata when creating the object. This patch fixes the problem by using the recently introduced Object.SetMetadata method to set the metadata on the object after the download has completed (when using --metadata). If the backend we are copying to is using OpenWriterAt but the Object doesn't support SetMetadata then it will write an ERROR level log but complete successfully. This should not happen at the moment as only the local backend supports metadata and OpenWriterAt but it may in the future. It also adds a test to check metadata is preserved when doing multipart transfers. Fixes #7424
333 lines
10 KiB
Go
333 lines
10 KiB
Go
package operations
|
|
|
|
import (
|
|
"context"
|
|
"errors"
|
|
"fmt"
|
|
"io"
|
|
"sync"
|
|
"testing"
|
|
"time"
|
|
|
|
"github.com/rclone/rclone/fs/accounting"
|
|
"github.com/rclone/rclone/fs/hash"
|
|
"github.com/rclone/rclone/fs/object"
|
|
"github.com/rclone/rclone/fstest/mockfs"
|
|
"github.com/rclone/rclone/fstest/mockobject"
|
|
"github.com/rclone/rclone/lib/random"
|
|
|
|
"github.com/rclone/rclone/fs"
|
|
"github.com/rclone/rclone/fstest"
|
|
"github.com/stretchr/testify/assert"
|
|
"github.com/stretchr/testify/require"
|
|
)
|
|
|
|
func TestDoMultiThreadCopy(t *testing.T) {
|
|
ctx := context.Background()
|
|
ci := fs.GetConfig(ctx)
|
|
f, err := mockfs.NewFs(ctx, "potato", "", nil)
|
|
require.NoError(t, err)
|
|
src := mockobject.New("file.txt").WithContent([]byte(random.String(100)), mockobject.SeekModeNone)
|
|
srcFs, err := mockfs.NewFs(ctx, "sausage", "", nil)
|
|
require.NoError(t, err)
|
|
src.SetFs(srcFs)
|
|
|
|
oldStreams := ci.MultiThreadStreams
|
|
oldCutoff := ci.MultiThreadCutoff
|
|
oldIsSet := ci.MultiThreadSet
|
|
defer func() {
|
|
ci.MultiThreadStreams = oldStreams
|
|
ci.MultiThreadCutoff = oldCutoff
|
|
ci.MultiThreadSet = oldIsSet
|
|
}()
|
|
|
|
ci.MultiThreadStreams, ci.MultiThreadCutoff = 4, 50
|
|
ci.MultiThreadSet = false
|
|
|
|
nullWriterAt := func(ctx context.Context, remote string, size int64) (fs.WriterAtCloser, error) {
|
|
panic("don't call me")
|
|
}
|
|
f.Features().OpenWriterAt = nullWriterAt
|
|
|
|
assert.True(t, doMultiThreadCopy(ctx, f, src))
|
|
|
|
ci.MultiThreadStreams = 0
|
|
assert.False(t, doMultiThreadCopy(ctx, f, src))
|
|
ci.MultiThreadStreams = 1
|
|
assert.False(t, doMultiThreadCopy(ctx, f, src))
|
|
ci.MultiThreadStreams = 2
|
|
assert.True(t, doMultiThreadCopy(ctx, f, src))
|
|
|
|
ci.MultiThreadCutoff = 200
|
|
assert.False(t, doMultiThreadCopy(ctx, f, src))
|
|
ci.MultiThreadCutoff = 101
|
|
assert.False(t, doMultiThreadCopy(ctx, f, src))
|
|
ci.MultiThreadCutoff = 100
|
|
assert.True(t, doMultiThreadCopy(ctx, f, src))
|
|
|
|
f.Features().OpenWriterAt = nil
|
|
assert.False(t, doMultiThreadCopy(ctx, f, src))
|
|
f.Features().OpenWriterAt = nullWriterAt
|
|
assert.True(t, doMultiThreadCopy(ctx, f, src))
|
|
|
|
f.Features().IsLocal = true
|
|
srcFs.Features().IsLocal = true
|
|
assert.False(t, doMultiThreadCopy(ctx, f, src))
|
|
ci.MultiThreadSet = true
|
|
assert.True(t, doMultiThreadCopy(ctx, f, src))
|
|
ci.MultiThreadSet = false
|
|
assert.False(t, doMultiThreadCopy(ctx, f, src))
|
|
srcFs.Features().IsLocal = false
|
|
assert.True(t, doMultiThreadCopy(ctx, f, src))
|
|
srcFs.Features().IsLocal = true
|
|
assert.False(t, doMultiThreadCopy(ctx, f, src))
|
|
f.Features().IsLocal = false
|
|
assert.True(t, doMultiThreadCopy(ctx, f, src))
|
|
srcFs.Features().IsLocal = false
|
|
assert.True(t, doMultiThreadCopy(ctx, f, src))
|
|
|
|
srcFs.Features().NoMultiThreading = true
|
|
assert.False(t, doMultiThreadCopy(ctx, f, src))
|
|
srcFs.Features().NoMultiThreading = false
|
|
assert.True(t, doMultiThreadCopy(ctx, f, src))
|
|
}
|
|
|
|
func TestMultithreadCalculateNumChunks(t *testing.T) {
|
|
for _, test := range []struct {
|
|
size int64
|
|
chunkSize int64
|
|
wantNumChunks int
|
|
}{
|
|
{size: 1, chunkSize: multithreadChunkSize, wantNumChunks: 1},
|
|
{size: 1 << 20, chunkSize: 1, wantNumChunks: 1 << 20},
|
|
{size: 1 << 20, chunkSize: 2, wantNumChunks: 1 << 19},
|
|
{size: (1 << 20) + 1, chunkSize: 2, wantNumChunks: (1 << 19) + 1},
|
|
{size: (1 << 20) - 1, chunkSize: 2, wantNumChunks: 1 << 19},
|
|
} {
|
|
t.Run(fmt.Sprintf("%+v", test), func(t *testing.T) {
|
|
mc := &multiThreadCopyState{
|
|
size: test.size,
|
|
}
|
|
mc.numChunks = calculateNumChunks(test.size, test.chunkSize)
|
|
assert.Equal(t, test.wantNumChunks, mc.numChunks)
|
|
})
|
|
}
|
|
}
|
|
|
|
// Skip if not multithread, returning the chunkSize otherwise
|
|
func skipIfNotMultithread(ctx context.Context, t *testing.T, r *fstest.Run) int {
|
|
features := r.Fremote.Features()
|
|
if features.OpenChunkWriter == nil && features.OpenWriterAt == nil {
|
|
t.Skip("multithread writing not supported")
|
|
}
|
|
|
|
// Only support one hash otherwise we end up spending a huge amount of CPU on hashing!
|
|
oldHashes := hash.SupportOnly([]hash.Type{r.Fremote.Hashes().GetOne()})
|
|
t.Cleanup(func() {
|
|
_ = hash.SupportOnly(oldHashes)
|
|
})
|
|
|
|
ci := fs.GetConfig(ctx)
|
|
chunkSize := int(ci.MultiThreadChunkSize)
|
|
if features.OpenChunkWriter != nil {
|
|
//OpenChunkWriter func(ctx context.Context, remote string, src ObjectInfo, options ...OpenOption) (info ChunkWriterInfo, writer ChunkWriter, err error)
|
|
const fileName = "chunksize-probe"
|
|
src := object.NewStaticObjectInfo(fileName, time.Now(), int64(100*fs.Mebi), true, nil, nil)
|
|
info, writer, err := features.OpenChunkWriter(ctx, fileName, src)
|
|
require.NoError(t, err)
|
|
chunkSize = int(info.ChunkSize)
|
|
err = writer.Abort(ctx)
|
|
require.NoError(t, err)
|
|
}
|
|
return chunkSize
|
|
}
|
|
|
|
func TestMultithreadCopy(t *testing.T) {
|
|
r := fstest.NewRun(t)
|
|
ctx := context.Background()
|
|
chunkSize := skipIfNotMultithread(ctx, t, r)
|
|
// Check every other transfer for metadata
|
|
checkMetadata := false
|
|
ctx, ci := fs.AddConfig(ctx)
|
|
|
|
for _, upload := range []bool{false, true} {
|
|
for _, test := range []struct {
|
|
size int
|
|
streams int
|
|
}{
|
|
{size: chunkSize*2 - 1, streams: 2},
|
|
{size: chunkSize * 2, streams: 2},
|
|
{size: chunkSize*2 + 1, streams: 2},
|
|
} {
|
|
checkMetadata = !checkMetadata
|
|
ci.Metadata = checkMetadata
|
|
fileName := fmt.Sprintf("test-multithread-copy-%v-%d-%d", upload, test.size, test.streams)
|
|
t.Run(fmt.Sprintf("upload=%v,size=%v,streams=%v", upload, test.size, test.streams), func(t *testing.T) {
|
|
if *fstest.SizeLimit > 0 && int64(test.size) > *fstest.SizeLimit {
|
|
t.Skipf("exceeded file size limit %d > %d", test.size, *fstest.SizeLimit)
|
|
}
|
|
var (
|
|
contents = random.String(test.size)
|
|
t1 = fstest.Time("2001-02-03T04:05:06.499999999Z")
|
|
file1 fstest.Item
|
|
src, dst fs.Object
|
|
err error
|
|
testMetadata = fs.Metadata{
|
|
// System metadata supported by all backends
|
|
"mtime": t1.Format(time.RFC3339Nano),
|
|
// User metadata
|
|
"potato": "jersey",
|
|
}
|
|
)
|
|
|
|
var fSrc, fDst fs.Fs
|
|
if upload {
|
|
file1 = r.WriteFile(fileName, contents, t1)
|
|
r.CheckRemoteItems(t)
|
|
r.CheckLocalItems(t, file1)
|
|
fDst, fSrc = r.Fremote, r.Flocal
|
|
} else {
|
|
file1 = r.WriteObject(ctx, fileName, contents, t1)
|
|
r.CheckRemoteItems(t, file1)
|
|
r.CheckLocalItems(t)
|
|
fDst, fSrc = r.Flocal, r.Fremote
|
|
}
|
|
src, err = fSrc.NewObject(ctx, fileName)
|
|
require.NoError(t, err)
|
|
|
|
do, canSetMetadata := src.(fs.SetMetadataer)
|
|
if checkMetadata && canSetMetadata {
|
|
// Set metadata on the source if required
|
|
err := do.SetMetadata(ctx, testMetadata)
|
|
if err == fs.ErrorNotImplemented {
|
|
canSetMetadata = false
|
|
} else {
|
|
require.NoError(t, err)
|
|
fstest.CheckEntryMetadata(ctx, t, r.Flocal, src, testMetadata)
|
|
}
|
|
}
|
|
|
|
accounting.GlobalStats().ResetCounters()
|
|
tr := accounting.GlobalStats().NewTransfer(src, nil)
|
|
|
|
defer func() {
|
|
tr.Done(ctx, err)
|
|
}()
|
|
|
|
dst, err = multiThreadCopy(ctx, fDst, fileName, src, test.streams, tr)
|
|
require.NoError(t, err)
|
|
|
|
assert.Equal(t, src.Size(), dst.Size())
|
|
assert.Equal(t, fileName, dst.Remote())
|
|
fstest.CheckListingWithPrecision(t, fSrc, []fstest.Item{file1}, nil, fs.GetModifyWindow(ctx, fDst, fSrc))
|
|
fstest.CheckListingWithPrecision(t, fDst, []fstest.Item{file1}, nil, fs.GetModifyWindow(ctx, fDst, fSrc))
|
|
|
|
if checkMetadata && canSetMetadata && fDst.Features().ReadMetadata {
|
|
fstest.CheckEntryMetadata(ctx, t, fDst, dst, testMetadata)
|
|
}
|
|
|
|
require.NoError(t, dst.Remove(ctx))
|
|
require.NoError(t, src.Remove(ctx))
|
|
|
|
})
|
|
}
|
|
}
|
|
}
|
|
|
|
type errorObject struct {
|
|
fs.Object
|
|
size int64
|
|
wg *sync.WaitGroup
|
|
}
|
|
|
|
// Open opens the file for read. Call Close() on the returned io.ReadCloser
|
|
//
|
|
// Remember this is called multiple times whenever the backend seeks (eg having read checksum)
|
|
func (o errorObject) Open(ctx context.Context, options ...fs.OpenOption) (io.ReadCloser, error) {
|
|
fs.Debugf(nil, "Open with options = %v", options)
|
|
rc, err := o.Object.Open(ctx, options...)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
// Return an error reader for the second segment
|
|
for _, option := range options {
|
|
if ropt, ok := option.(*fs.RangeOption); ok {
|
|
end := ropt.End + 1
|
|
if end >= o.size {
|
|
// Give the other chunks a chance to start
|
|
time.Sleep(time.Second)
|
|
// Wait for chunks to upload first
|
|
o.wg.Wait()
|
|
fs.Debugf(nil, "Returning error reader")
|
|
return errorReadCloser{rc}, nil
|
|
}
|
|
}
|
|
}
|
|
o.wg.Add(1)
|
|
return wgReadCloser{rc, o.wg}, nil
|
|
}
|
|
|
|
type errorReadCloser struct {
|
|
io.ReadCloser
|
|
}
|
|
|
|
func (rc errorReadCloser) Read(p []byte) (n int, err error) {
|
|
fs.Debugf(nil, "BOOM: simulated read failure")
|
|
return 0, errors.New("BOOM: simulated read failure")
|
|
}
|
|
|
|
type wgReadCloser struct {
|
|
io.ReadCloser
|
|
wg *sync.WaitGroup
|
|
}
|
|
|
|
func (rc wgReadCloser) Close() (err error) {
|
|
rc.wg.Done()
|
|
return rc.ReadCloser.Close()
|
|
}
|
|
|
|
// Make sure aborting the multi-thread copy doesn't overwrite an existing file.
|
|
func TestMultithreadCopyAbort(t *testing.T) {
|
|
r := fstest.NewRun(t)
|
|
ctx := context.Background()
|
|
chunkSize := skipIfNotMultithread(ctx, t, r)
|
|
size := 2*chunkSize + 1
|
|
|
|
if *fstest.SizeLimit > 0 && int64(size) > *fstest.SizeLimit {
|
|
t.Skipf("exceeded file size limit %d > %d", size, *fstest.SizeLimit)
|
|
}
|
|
|
|
// first write a canary file which we are trying not to overwrite
|
|
const fileName = "test-multithread-abort"
|
|
contents := random.String(100)
|
|
t1 := fstest.Time("2001-02-03T04:05:06.499999999Z")
|
|
canary := r.WriteObject(ctx, fileName, contents, t1)
|
|
r.CheckRemoteItems(t, canary)
|
|
|
|
// Now write a local file to upload
|
|
file1 := r.WriteFile(fileName, random.String(size), t1)
|
|
r.CheckLocalItems(t, file1)
|
|
|
|
src, err := r.Flocal.NewObject(ctx, fileName)
|
|
require.NoError(t, err)
|
|
accounting.GlobalStats().ResetCounters()
|
|
tr := accounting.GlobalStats().NewTransfer(src, nil)
|
|
|
|
defer func() {
|
|
tr.Done(ctx, err)
|
|
}()
|
|
wg := new(sync.WaitGroup)
|
|
dst, err := multiThreadCopy(ctx, r.Fremote, fileName, errorObject{src, int64(size), wg}, 1, tr)
|
|
assert.Error(t, err)
|
|
assert.Nil(t, dst)
|
|
|
|
if r.Fremote.Features().PartialUploads {
|
|
r.CheckRemoteItems(t)
|
|
|
|
} else {
|
|
r.CheckRemoteItems(t, canary)
|
|
o, err := r.Fremote.NewObject(ctx, fileName)
|
|
require.NoError(t, err)
|
|
require.NoError(t, o.Remove(ctx))
|
|
}
|
|
}
|