forked from TrueCloudLab/rclone
operations: fix missing metadata for multipart transfers to local disk
Before this change multipart downloads to the local disk with --metadata failed to have their metadata set properly. This was because the OpenWriterAt interface doesn't receive metadata when creating the object. This patch fixes the problem by using the recently introduced Object.SetMetadata method to set the metadata on the object after the download has completed (when using --metadata). If the backend we are copying to is using OpenWriterAt but the Object doesn't support SetMetadata then it will write an ERROR level log but complete successfully. This should not happen at the moment as only the local backend supports metadata and OpenWriterAt but it may in the future. It also adds a test to check metadata is preserved when doing multipart transfers. Fixes #7424
This commit is contained in:
parent
629e895da8
commit
6a0a54ab97
2 changed files with 70 additions and 21 deletions
|
@ -131,6 +131,7 @@ func multiThreadCopy(ctx context.Context, f fs.Fs, remote string, src fs.Object,
|
||||||
openChunkWriter := f.Features().OpenChunkWriter
|
openChunkWriter := f.Features().OpenChunkWriter
|
||||||
ci := fs.GetConfig(ctx)
|
ci := fs.GetConfig(ctx)
|
||||||
noBuffering := false
|
noBuffering := false
|
||||||
|
usingOpenWriterAt := false
|
||||||
if openChunkWriter == nil {
|
if openChunkWriter == nil {
|
||||||
openWriterAt := f.Features().OpenWriterAt
|
openWriterAt := f.Features().OpenWriterAt
|
||||||
if openWriterAt == nil {
|
if openWriterAt == nil {
|
||||||
|
@ -140,6 +141,7 @@ func multiThreadCopy(ctx context.Context, f fs.Fs, remote string, src fs.Object,
|
||||||
// If we are using OpenWriterAt we don't seek the chunks so don't need to buffer
|
// If we are using OpenWriterAt we don't seek the chunks so don't need to buffer
|
||||||
fs.Debugf(src, "multi-thread copy: disabling buffering because destination uses OpenWriterAt")
|
fs.Debugf(src, "multi-thread copy: disabling buffering because destination uses OpenWriterAt")
|
||||||
noBuffering = true
|
noBuffering = true
|
||||||
|
usingOpenWriterAt = true
|
||||||
} else if src.Fs().Features().IsLocal {
|
} else if src.Fs().Features().IsLocal {
|
||||||
// If the source fs is local we don't need to buffer
|
// If the source fs is local we don't need to buffer
|
||||||
fs.Debugf(src, "multi-thread copy: disabling buffering because source is local disk")
|
fs.Debugf(src, "multi-thread copy: disabling buffering because source is local disk")
|
||||||
|
@ -241,7 +243,26 @@ func multiThreadCopy(ctx context.Context, f fs.Fs, remote string, src fs.Object,
|
||||||
return nil, fmt.Errorf("multi-thread copy: failed to find object after copy: %w", err)
|
return nil, fmt.Errorf("multi-thread copy: failed to find object after copy: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
if f.Features().PartialUploads {
|
// OpenWriterAt doesn't set metadata so we need to set it on completion
|
||||||
|
if usingOpenWriterAt {
|
||||||
|
setModTime := true
|
||||||
|
if ci.Metadata {
|
||||||
|
do, ok := obj.(fs.SetMetadataer)
|
||||||
|
if ok {
|
||||||
|
meta, err := fs.GetMetadataOptions(ctx, f, src, options)
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("multi-thread copy: failed to read metadata from source object: %w", err)
|
||||||
|
}
|
||||||
|
err = do.SetMetadata(ctx, meta)
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("multi-thread copy: failed to set metadata: %w", err)
|
||||||
|
}
|
||||||
|
setModTime = false
|
||||||
|
} else {
|
||||||
|
fs.Errorf(obj, "multi-thread copy: can't set metadata as SetMetadata isn't implemented in: %v", f)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if setModTime {
|
||||||
err = obj.SetModTime(ctx, src.ModTime(ctx))
|
err = obj.SetModTime(ctx, src.ModTime(ctx))
|
||||||
switch err {
|
switch err {
|
||||||
case nil, fs.ErrorCantSetModTime, fs.ErrorCantSetModTimeWithoutDelete:
|
case nil, fs.ErrorCantSetModTime, fs.ErrorCantSetModTimeWithoutDelete:
|
||||||
|
@ -249,6 +270,7 @@ func multiThreadCopy(ctx context.Context, f fs.Fs, remote string, src fs.Object,
|
||||||
return nil, fmt.Errorf("multi-thread copy: failed to set modification time: %w", err)
|
return nil, fmt.Errorf("multi-thread copy: failed to set modification time: %w", err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
fs.Debugf(src, "Finished multi-thread copy with %d parts of size %v", mc.numChunks, fs.SizeSuffix(mc.partSize))
|
fs.Debugf(src, "Finished multi-thread copy with %d parts of size %v", mc.numChunks, fs.SizeSuffix(mc.partSize))
|
||||||
return obj, nil
|
return obj, nil
|
||||||
|
|
|
@ -146,6 +146,9 @@ func TestMultithreadCopy(t *testing.T) {
|
||||||
r := fstest.NewRun(t)
|
r := fstest.NewRun(t)
|
||||||
ctx := context.Background()
|
ctx := context.Background()
|
||||||
chunkSize := skipIfNotMultithread(ctx, t, r)
|
chunkSize := skipIfNotMultithread(ctx, t, r)
|
||||||
|
// Check every other transfer for metadata
|
||||||
|
checkMetadata := false
|
||||||
|
ctx, ci := fs.AddConfig(ctx)
|
||||||
|
|
||||||
for _, upload := range []bool{false, true} {
|
for _, upload := range []bool{false, true} {
|
||||||
for _, test := range []struct {
|
for _, test := range []struct {
|
||||||
|
@ -156,6 +159,8 @@ func TestMultithreadCopy(t *testing.T) {
|
||||||
{size: chunkSize * 2, streams: 2},
|
{size: chunkSize * 2, streams: 2},
|
||||||
{size: chunkSize*2 + 1, streams: 2},
|
{size: chunkSize*2 + 1, streams: 2},
|
||||||
} {
|
} {
|
||||||
|
checkMetadata = !checkMetadata
|
||||||
|
ci.Metadata = checkMetadata
|
||||||
fileName := fmt.Sprintf("test-multithread-copy-%v-%d-%d", upload, test.size, test.streams)
|
fileName := fmt.Sprintf("test-multithread-copy-%v-%d-%d", upload, test.size, test.streams)
|
||||||
t.Run(fmt.Sprintf("upload=%v,size=%v,streams=%v", upload, test.size, test.streams), func(t *testing.T) {
|
t.Run(fmt.Sprintf("upload=%v,size=%v,streams=%v", upload, test.size, test.streams), func(t *testing.T) {
|
||||||
if *fstest.SizeLimit > 0 && int64(test.size) > *fstest.SizeLimit {
|
if *fstest.SizeLimit > 0 && int64(test.size) > *fstest.SizeLimit {
|
||||||
|
@ -167,21 +172,41 @@ func TestMultithreadCopy(t *testing.T) {
|
||||||
file1 fstest.Item
|
file1 fstest.Item
|
||||||
src, dst fs.Object
|
src, dst fs.Object
|
||||||
err error
|
err error
|
||||||
|
testMetadata = fs.Metadata{
|
||||||
|
// System metadata supported by all backends
|
||||||
|
"mtime": t1.Format(time.RFC3339Nano),
|
||||||
|
// User metadata
|
||||||
|
"potato": "jersey",
|
||||||
|
}
|
||||||
)
|
)
|
||||||
|
|
||||||
|
var fSrc, fDst fs.Fs
|
||||||
if upload {
|
if upload {
|
||||||
file1 = r.WriteFile(fileName, contents, t1)
|
file1 = r.WriteFile(fileName, contents, t1)
|
||||||
r.CheckRemoteItems(t)
|
r.CheckRemoteItems(t)
|
||||||
r.CheckLocalItems(t, file1)
|
r.CheckLocalItems(t, file1)
|
||||||
src, err = r.Flocal.NewObject(ctx, fileName)
|
fDst, fSrc = r.Fremote, r.Flocal
|
||||||
} else {
|
} else {
|
||||||
file1 = r.WriteObject(ctx, fileName, contents, t1)
|
file1 = r.WriteObject(ctx, fileName, contents, t1)
|
||||||
r.CheckRemoteItems(t, file1)
|
r.CheckRemoteItems(t, file1)
|
||||||
r.CheckLocalItems(t)
|
r.CheckLocalItems(t)
|
||||||
src, err = r.Fremote.NewObject(ctx, fileName)
|
fDst, fSrc = r.Flocal, r.Fremote
|
||||||
}
|
}
|
||||||
|
src, err = fSrc.NewObject(ctx, fileName)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
do, canSetMetadata := src.(fs.SetMetadataer)
|
||||||
|
if checkMetadata && canSetMetadata {
|
||||||
|
// Set metadata on the source if required
|
||||||
|
err := do.SetMetadata(ctx, testMetadata)
|
||||||
|
if err == fs.ErrorNotImplemented {
|
||||||
|
canSetMetadata = false
|
||||||
|
} else {
|
||||||
|
require.NoError(t, err)
|
||||||
|
fstest.CheckEntryMetadata(ctx, t, r.Flocal, src, testMetadata)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
accounting.GlobalStats().ResetCounters()
|
accounting.GlobalStats().ResetCounters()
|
||||||
tr := accounting.GlobalStats().NewTransfer(src, nil)
|
tr := accounting.GlobalStats().NewTransfer(src, nil)
|
||||||
|
|
||||||
|
@ -189,19 +214,21 @@ func TestMultithreadCopy(t *testing.T) {
|
||||||
tr.Done(ctx, err)
|
tr.Done(ctx, err)
|
||||||
}()
|
}()
|
||||||
|
|
||||||
if upload {
|
dst, err = multiThreadCopy(ctx, fDst, fileName, src, test.streams, tr)
|
||||||
dst, err = multiThreadCopy(ctx, r.Fremote, fileName, src, test.streams, tr)
|
|
||||||
} else {
|
|
||||||
dst, err = multiThreadCopy(ctx, r.Flocal, fileName, src, test.streams, tr)
|
|
||||||
}
|
|
||||||
|
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
assert.Equal(t, src.Size(), dst.Size())
|
assert.Equal(t, src.Size(), dst.Size())
|
||||||
assert.Equal(t, fileName, dst.Remote())
|
assert.Equal(t, fileName, dst.Remote())
|
||||||
fstest.CheckListingWithPrecision(t, r.Fremote, []fstest.Item{file1}, nil, fs.GetModifyWindow(ctx, r.Flocal, r.Fremote))
|
fstest.CheckListingWithPrecision(t, fSrc, []fstest.Item{file1}, nil, fs.GetModifyWindow(ctx, fDst, fSrc))
|
||||||
fstest.CheckListingWithPrecision(t, r.Flocal, []fstest.Item{file1}, nil, fs.GetModifyWindow(ctx, r.Flocal, r.Fremote))
|
fstest.CheckListingWithPrecision(t, fDst, []fstest.Item{file1}, nil, fs.GetModifyWindow(ctx, fDst, fSrc))
|
||||||
|
|
||||||
|
if checkMetadata && canSetMetadata && fDst.Features().ReadMetadata {
|
||||||
|
fstest.CheckEntryMetadata(ctx, t, fDst, dst, testMetadata)
|
||||||
|
}
|
||||||
|
|
||||||
require.NoError(t, dst.Remove(ctx))
|
require.NoError(t, dst.Remove(ctx))
|
||||||
require.NoError(t, src.Remove(ctx))
|
require.NoError(t, src.Remove(ctx))
|
||||||
|
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in a new issue