operations: fix overwrite of destination when multi-thread transfer fails

Before this change, if a multi-thread upload failed (say because the
source became unavailable), rclone would finalise the file before
aborting the transfer.

This caused the partial file to be written, which would overwrite any
existing file at the destination.

This was fixed by making sure we Abort the transfer before we Close
it.
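
To illustrate the contract, here is a minimal, self-contained Go sketch
of the corrected cleanup ordering. The chunkedWriter interface and
finish helper are hypothetical stand-ins, not rclone's actual code (the
real change is in the diff below): on any upload error the writer is
Aborted and Close is never called, so a partial file is never finalised
over the destination.

    package main

    import (
        "context"
        "errors"
        "fmt"
    )

    // chunkedWriter mirrors the relevant part of rclone's fs.ChunkWriter.
    type chunkedWriter interface {
        Close(ctx context.Context) error // finalise the file
        Abort(ctx context.Context) error // discard the partial upload
    }

    // finish commits the transfer only if every chunk uploaded OK.
    func finish(ctx context.Context, w chunkedWriter, uploadErr error) error {
        if uploadErr != nil {
            // Abort without Close: finalising here is what used to
            // overwrite the destination with a partial file.
            if abortErr := w.Abort(ctx); abortErr != nil {
                return errors.Join(uploadErr, abortErr)
            }
            return uploadErr
        }
        return w.Close(ctx)
    }

    // fakeWriter records which cleanup path ran.
    type fakeWriter struct{ action string }

    func (w *fakeWriter) Close(ctx context.Context) error { w.action = "closed"; return nil }
    func (w *fakeWriter) Abort(ctx context.Context) error { w.action = "aborted"; return nil }

    func main() {
        w := &fakeWriter{}
        _ = finish(context.Background(), w, errors.New("source became unavailable"))
        fmt.Println(w.action) // prints "aborted" - Close was never called
    }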

This updates the docs to encourage calling Abort before Close and
updates writerAtChunkWriter to make sure that works properly.
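
As a sketch of what "Abort without Close" asks of an implementation
(hypothetical fileChunkWriter type, loosely modelled on the
writerAtChunkWriter change in the diff below, not the real code): Close
is made idempotent, and Abort closes the handle itself before deleting
the partial file, so callers never need to Close after Abort.

    package main

    import (
        "context"
        "fmt"
        "os"
    )

    // fileChunkWriter is a hypothetical file-backed chunk writer.
    type fileChunkWriter struct {
        file   *os.File
        closed bool
    }

    // Close finalises the file; a second call is a harmless no-op,
    // which lets Abort reuse it safely.
    func (w *fileChunkWriter) Close(ctx context.Context) error {
        if w.closed {
            return nil
        }
        w.closed = true
        return w.file.Close()
    }

    // Abort releases the handle via Close, then deletes the partial
    // file, so Abort works without any prior call to Close.
    func (w *fileChunkWriter) Abort(ctx context.Context) error {
        if err := w.Close(ctx); err != nil {
            fmt.Fprintf(os.Stderr, "failed to close before aborting: %v\n", err)
        }
        return os.Remove(w.file.Name())
    }

    func main() {
        f, err := os.CreateTemp("", "chunk-*")
        if err != nil {
            panic(err)
        }
        w := &fileChunkWriter{file: f}
        // Simulate a failed transfer: Abort with no prior Close.
        if err := w.Abort(context.Background()); err != nil {
            panic(err)
        }
        fmt.Println("partial file removed:", f.Name())
    }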

This also reworks the tests to detect this failure and to make sure we
both upload to and download from each multi-thread capable backend
(previously we only tested downloads, which was not a full test).
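
The fault injection in the new test works along these lines (a
condensed, hypothetical errorReader, simplified from the errorObject
and errorReadCloser types in the diff below): the source is wrapped so
that reads fail partway through, simulating a source that becomes
unavailable mid-transfer, which must make the copy Abort rather than
finalise.

    package main

    import (
        "errors"
        "fmt"
        "io"
        "strings"
    )

    // errorReader lets n bytes through and then fails every Read,
    // standing in for a source that goes away mid-transfer.
    type errorReader struct {
        r io.Reader
        n int
    }

    func (e *errorReader) Read(p []byte) (int, error) {
        if e.n <= 0 {
            return 0, errors.New("BOOM: simulated read failure")
        }
        if len(p) > e.n {
            p = p[:e.n]
        }
        n, err := e.r.Read(p)
        e.n -= n
        return n, err
    }

    func main() {
        src := &errorReader{r: strings.NewReader("some source data"), n: 4}
        _, err := io.ReadAll(src)
        fmt.Println(err) // BOOM: simulated read failure
    }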

Fixes #7071
Nick Craig-Wood, 2023-11-22 15:05:44 +00:00
commit d5d28a7513 (parent 94ccc95515)
3 changed files with 206 additions and 39 deletions


@@ -664,10 +664,12 @@ type ChunkWriter interface {
 	// WriteChunk will write chunk number with reader bytes, where chunk number >= 0
 	WriteChunk(ctx context.Context, chunkNumber int, reader io.ReadSeeker) (bytesWritten int64, err error)
-	// Close complete chunked writer
+	// Close complete chunked writer finalising the file.
 	Close(ctx context.Context) error
 	// Abort chunk write
+	//
+	// You can and should call Abort without calling Close.
 	Abort(ctx context.Context) error
 }


@@ -165,9 +165,10 @@ func multiThreadCopy(ctx context.Context, f fs.Fs, remote string, src fs.Object,
 	uploadCtx, cancel := context.WithCancel(ctx)
 	defer cancel()
+	uploadedOK := false
 	defer atexit.OnError(&err, func() {
 		cancel()
-		if info.LeavePartsOnError {
+		if info.LeavePartsOnError || uploadedOK {
 			return
 		}
 		fs.Debugf(src, "multi-thread copy: cancelling transfer on exit")
@@ -226,13 +227,14 @@ func multiThreadCopy(ctx context.Context, f fs.Fs, remote string, src fs.Object,
 	}

 	err = g.Wait()
-	closeErr := chunkWriter.Close(ctx)
 	if err != nil {
 		return nil, err
 	}
-	if closeErr != nil {
-		return nil, fmt.Errorf("multi-thread copy: failed to close object after copy: %w", closeErr)
+	err = chunkWriter.Close(ctx)
+	if err != nil {
+		return nil, fmt.Errorf("multi-thread copy: failed to close object after copy: %w", err)
 	}
+	uploadedOK = true // file is definitely uploaded OK so no need to abort

 	obj, err := f.NewObject(ctx, remote)
 	if err != nil {
@@ -282,10 +284,11 @@ type writerAtChunkWriter struct {
 	chunks          int
 	writeBufferSize int64
 	f               fs.Fs
+	closed          bool
 }

 // WriteChunk writes chunkNumber from reader
-func (w writerAtChunkWriter) WriteChunk(ctx context.Context, chunkNumber int, reader io.ReadSeeker) (int64, error) {
+func (w *writerAtChunkWriter) WriteChunk(ctx context.Context, chunkNumber int, reader io.ReadSeeker) (int64, error) {
 	fs.Debugf(w.remote, "writing chunk %v", chunkNumber)

 	bytesToWrite := w.chunkSize
@@ -316,12 +319,20 @@ func (w writerAtChunkWriter) WriteChunk(ctx context.Context, chunkNumber int, re
 }

 // Close the chunk writing
-func (w writerAtChunkWriter) Close(ctx context.Context) error {
+func (w *writerAtChunkWriter) Close(ctx context.Context) error {
+	if w.closed {
+		return nil
+	}
+	w.closed = true
 	return w.writerAt.Close()
 }

 // Abort the chunk writing
-func (w writerAtChunkWriter) Abort(ctx context.Context) error {
+func (w *writerAtChunkWriter) Abort(ctx context.Context) error {
+	err := w.Close(ctx)
+	if err != nil {
+		fs.Errorf(w.remote, "multi-thread copy: failed to close file before aborting: %v", err)
+	}
 	obj, err := w.f.NewObject(ctx, w.remote)
 	if err != nil {
 		return fmt.Errorf("multi-thread copy: failed to find temp file when aborting chunk writer: %w", err)


@@ -2,10 +2,16 @@ package operations

 import (
 	"context"
+	"errors"
 	"fmt"
+	"io"
+	"sync"
 	"testing"
+	"time"

 	"github.com/rclone/rclone/fs/accounting"
+	"github.com/rclone/rclone/fs/hash"
+	"github.com/rclone/rclone/fs/object"
 	"github.com/rclone/rclone/fstest/mockfs"
 	"github.com/rclone/rclone/fstest/mockobject"
 	"github.com/rclone/rclone/lib/random"
@@ -108,30 +114,174 @@ func TestMultithreadCalculateNumChunks(t *testing.T) {
 	}
 }

+// Skip if not multithread, returning the chunkSize otherwise
+func skipIfNotMultithread(ctx context.Context, t *testing.T, r *fstest.Run) int {
+	features := r.Fremote.Features()
+	if features.OpenChunkWriter == nil && features.OpenWriterAt == nil {
+		t.Skip("multithread writing not supported")
+	}
+
+	// Only support one hash otherwise we end up spending a huge amount of CPU on hashing!
+	oldHashes := hash.SupportOnly([]hash.Type{r.Fremote.Hashes().GetOne()})
+	t.Cleanup(func() {
+		_ = hash.SupportOnly(oldHashes)
+	})
+
+	ci := fs.GetConfig(ctx)
+	chunkSize := int(ci.MultiThreadChunkSize)
+	if features.OpenChunkWriter != nil {
+		//OpenChunkWriter func(ctx context.Context, remote string, src ObjectInfo, options ...OpenOption) (info ChunkWriterInfo, writer ChunkWriter, err error)
+		const fileName = "chunksize-probe"
+		src := object.NewStaticObjectInfo(fileName, time.Now(), int64(100*fs.Mebi), true, nil, nil)
+		info, writer, err := features.OpenChunkWriter(ctx, fileName, src)
+		require.NoError(t, err)
+		chunkSize = int(info.ChunkSize)
+		err = writer.Abort(ctx)
+		require.NoError(t, err)
+	}
+	return chunkSize
+}
+
 func TestMultithreadCopy(t *testing.T) {
 	r := fstest.NewRun(t)
 	ctx := context.Background()
+	chunkSize := skipIfNotMultithread(ctx, t, r)

+	for _, upload := range []bool{false, true} {
 		for _, test := range []struct {
 			size    int
 			streams int
 		}{
-			{size: multithreadChunkSize*2 - 1, streams: 2},
-			{size: multithreadChunkSize * 2, streams: 2},
-			{size: multithreadChunkSize*2 + 1, streams: 2},
+			{size: chunkSize*2 - 1, streams: 2},
+			{size: chunkSize * 2, streams: 2},
+			{size: chunkSize*2 + 1, streams: 2},
 		} {
-			t.Run(fmt.Sprintf("%+v", test), func(t *testing.T) {
+			fileName := fmt.Sprintf("test-multithread-copy-%v-%d-%d", upload, test.size, test.streams)
+			t.Run(fmt.Sprintf("upload=%v,size=%v,streams=%v", upload, test.size, test.streams), func(t *testing.T) {
 				if *fstest.SizeLimit > 0 && int64(test.size) > *fstest.SizeLimit {
 					t.Skipf("exceeded file size limit %d > %d", test.size, *fstest.SizeLimit)
 				}
-				var err error
-				contents := random.String(test.size)
-				t1 := fstest.Time("2001-02-03T04:05:06.499999999Z")
-				file1 := r.WriteObject(ctx, "file1", contents, t1)
+				var (
+					contents = random.String(test.size)
+					t1       = fstest.Time("2001-02-03T04:05:06.499999999Z")
+					file1    fstest.Item
+					src, dst fs.Object
+					err      error
+				)
+				if upload {
+					file1 = r.WriteFile(fileName, contents, t1)
+					r.CheckRemoteItems(t)
+					r.CheckLocalItems(t, file1)
+					src, err = r.Flocal.NewObject(ctx, fileName)
+				} else {
+					file1 = r.WriteObject(ctx, fileName, contents, t1)
 					r.CheckRemoteItems(t, file1)
 					r.CheckLocalItems(t)
+					src, err = r.Fremote.NewObject(ctx, fileName)
+				}
+				require.NoError(t, err)

-				src, err := r.Fremote.NewObject(ctx, "file1")
+				accounting.GlobalStats().ResetCounters()
+				tr := accounting.GlobalStats().NewTransfer(src)
+				defer func() {
+					tr.Done(ctx, err)
+				}()
+				if upload {
+					dst, err = multiThreadCopy(ctx, r.Fremote, fileName, src, test.streams, tr)
+				} else {
+					dst, err = multiThreadCopy(ctx, r.Flocal, fileName, src, test.streams, tr)
+				}
+				require.NoError(t, err)
+				assert.Equal(t, src.Size(), dst.Size())
+				assert.Equal(t, fileName, dst.Remote())
+
+				fstest.CheckListingWithPrecision(t, r.Fremote, []fstest.Item{file1}, nil, fs.GetModifyWindow(ctx, r.Flocal, r.Fremote))
+				fstest.CheckListingWithPrecision(t, r.Flocal, []fstest.Item{file1}, nil, fs.GetModifyWindow(ctx, r.Flocal, r.Fremote))
+				require.NoError(t, dst.Remove(ctx))
+				require.NoError(t, src.Remove(ctx))
+			})
+		}
+	}
+}
+
+type errorObject struct {
+	fs.Object
+	size int64
+	wg   *sync.WaitGroup
+}
+
+// Open opens the file for read. Call Close() on the returned io.ReadCloser
+//
+// Remember this is called multiple times whenever the backend seeks (eg having read checksum)
+func (o errorObject) Open(ctx context.Context, options ...fs.OpenOption) (io.ReadCloser, error) {
+	fs.Debugf(nil, "Open with options = %v", options)
+	rc, err := o.Object.Open(ctx, options...)
+	if err != nil {
+		return nil, err
+	}
+	// Return an error reader for the second segment
+	for _, option := range options {
+		if ropt, ok := option.(*fs.RangeOption); ok {
+			end := ropt.End + 1
+			if end >= o.size {
+				// Give the other chunks a chance to start
+				time.Sleep(time.Second)
+				// Wait for chunks to upload first
+				o.wg.Wait()
+				fs.Debugf(nil, "Returning error reader")
+				return errorReadCloser{rc}, nil
+			}
+		}
+	}
+	o.wg.Add(1)
+	return wgReadCloser{rc, o.wg}, nil
+}
+
+type errorReadCloser struct {
+	io.ReadCloser
+}
+
+func (rc errorReadCloser) Read(p []byte) (n int, err error) {
+	fs.Debugf(nil, "BOOM: simulated read failure")
+	return 0, errors.New("BOOM: simulated read failure")
+}
+
+type wgReadCloser struct {
+	io.ReadCloser
+	wg *sync.WaitGroup
+}
+
+func (rc wgReadCloser) Close() (err error) {
+	rc.wg.Done()
+	return rc.ReadCloser.Close()
+}
+
+// Make sure aborting the multi-thread copy doesn't overwrite an existing file.
+func TestMultithreadCopyAbort(t *testing.T) {
+	r := fstest.NewRun(t)
+	ctx := context.Background()
+	chunkSize := skipIfNotMultithread(ctx, t, r)
+	size := 2*chunkSize + 1
+
+	if *fstest.SizeLimit > 0 && int64(size) > *fstest.SizeLimit {
+		t.Skipf("exceeded file size limit %d > %d", size, *fstest.SizeLimit)
+	}
+
+	// first write a canary file which we are trying not to overwrite
+	const fileName = "test-multithread-abort"
+	contents := random.String(100)
+	t1 := fstest.Time("2001-02-03T04:05:06.499999999Z")
+	canary := r.WriteObject(ctx, fileName, contents, t1)
+	r.CheckRemoteItems(t, canary)
+
+	// Now write a local file to upload
+	file1 := r.WriteFile(fileName, random.String(size), t1)
+	r.CheckLocalItems(t, file1)
+
+	src, err := r.Flocal.NewObject(ctx, fileName)
 	require.NoError(t, err)

 	accounting.GlobalStats().ResetCounters()
 	tr := accounting.GlobalStats().NewTransfer(src)
@@ -139,14 +289,18 @@ func TestMultithreadCopy(t *testing.T) {
 	defer func() {
 		tr.Done(ctx, err)
 	}()
-	dst, err := multiThreadCopy(ctx, r.Flocal, "file1", src, 2, tr)
+	wg := new(sync.WaitGroup)
+	dst, err := multiThreadCopy(ctx, r.Fremote, fileName, errorObject{src, int64(size), wg}, 1, tr)
+	assert.Error(t, err)
+	assert.Nil(t, dst)
+	if r.Fremote.Features().PartialUploads {
+		r.CheckRemoteItems(t)
+	} else {
+		r.CheckRemoteItems(t, canary)
+		o, err := r.Fremote.NewObject(ctx, fileName)
 		require.NoError(t, err)
-	assert.Equal(t, src.Size(), dst.Size())
-	assert.Equal(t, "file1", dst.Remote())
-	fstest.CheckListingWithPrecision(t, r.Flocal, []fstest.Item{file1}, nil, fs.GetModifyWindow(ctx, r.Flocal, r.Fremote))
-	require.NoError(t, dst.Remove(ctx))
-		})
+		require.NoError(t, o.Remove(ctx))
 	}
 }