forked from TrueCloudLab/rclone
b2: fix chunked streaming uploads
Streaming uploads are used by rclone rcat and rclone mount --vfs-cache-mode off. After the multipart chunker refactor the multipart chunked streaming upload was accidentally mixing the first and the second parts up which was causing corrupted uploads. This was caused by a simple off by one error in the refactoring where we went from 1 based part number counting to 0 based part number counting. Fixing this revealed that the metadata wasn't being re-read for the copied object either. This fixes both of those issues and adds an integration tests so it won't happen again. Fixes #7367
This commit is contained in:
parent
b9727cc6ab
commit
5fa68e9ca5
3 changed files with 62 additions and 5 deletions
|
@ -1923,7 +1923,11 @@ func (o *Object) Update(ctx context.Context, in io.Reader, src fs.ObjectInfo, op
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
// NB Stream returns the buffer and token
|
// NB Stream returns the buffer and token
|
||||||
return up.Stream(ctx, rw)
|
err = up.Stream(ctx, rw)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
return o.decodeMetaDataFileInfo(up.info)
|
||||||
} else if err == io.EOF {
|
} else if err == io.EOF {
|
||||||
fs.Debugf(o, "File has %d bytes, which makes only one chunk. Using direct upload.", n)
|
fs.Debugf(o, "File has %d bytes, which makes only one chunk. Using direct upload.", n)
|
||||||
defer o.fs.putRW(rw)
|
defer o.fs.putRW(rw)
|
||||||
|
|
|
@ -1,10 +1,14 @@
|
||||||
package b2
|
package b2
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"bytes"
|
||||||
"context"
|
"context"
|
||||||
|
"fmt"
|
||||||
"testing"
|
"testing"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
"github.com/rclone/rclone/fs"
|
||||||
|
"github.com/rclone/rclone/fs/object"
|
||||||
"github.com/rclone/rclone/fstest"
|
"github.com/rclone/rclone/fstest"
|
||||||
"github.com/rclone/rclone/fstest/fstests"
|
"github.com/rclone/rclone/fstest/fstests"
|
||||||
"github.com/rclone/rclone/lib/random"
|
"github.com/rclone/rclone/lib/random"
|
||||||
|
@ -191,7 +195,7 @@ func (f *Fs) InternalTestChunkedCopy(t *testing.T) {
|
||||||
|
|
||||||
// Set copy cutoff to mininum value so we make chunks
|
// Set copy cutoff to mininum value so we make chunks
|
||||||
origCutoff := f.opt.CopyCutoff
|
origCutoff := f.opt.CopyCutoff
|
||||||
f.opt.CopyCutoff = 5 * 1024 * 1024
|
f.opt.CopyCutoff = minChunkSize
|
||||||
defer func() {
|
defer func() {
|
||||||
f.opt.CopyCutoff = origCutoff
|
f.opt.CopyCutoff = origCutoff
|
||||||
}()
|
}()
|
||||||
|
@ -216,9 +220,57 @@ func (f *Fs) InternalTestChunkedCopy(t *testing.T) {
|
||||||
assert.Equal(t, contents, gotContents)
|
assert.Equal(t, contents, gotContents)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// The integration tests do a reasonable job of testing the normal
|
||||||
|
// streaming upload but don't test the chunked streaming upload.
|
||||||
|
func (f *Fs) InternalTestChunkedStreamingUpload(t *testing.T, size int) {
|
||||||
|
ctx := context.Background()
|
||||||
|
contents := random.String(size)
|
||||||
|
item := fstest.NewItem(fmt.Sprintf("chunked-streaming-upload-%d", size), contents, fstest.Time("2001-05-06T04:05:06.499Z"))
|
||||||
|
|
||||||
|
// Set chunk size to mininum value so we make chunks
|
||||||
|
origOpt := f.opt
|
||||||
|
f.opt.ChunkSize = minChunkSize
|
||||||
|
f.opt.UploadCutoff = 0
|
||||||
|
defer func() {
|
||||||
|
f.opt = origOpt
|
||||||
|
}()
|
||||||
|
|
||||||
|
// Do the streaming upload
|
||||||
|
src := object.NewStaticObjectInfo(item.Path, item.ModTime, -1, true, item.Hashes, f)
|
||||||
|
in := bytes.NewBufferString(contents)
|
||||||
|
dst, err := f.PutStream(ctx, in, src)
|
||||||
|
require.NoError(t, err)
|
||||||
|
defer func() {
|
||||||
|
assert.NoError(t, dst.Remove(ctx))
|
||||||
|
}()
|
||||||
|
|
||||||
|
// Check size
|
||||||
|
assert.Equal(t, int64(size), dst.Size())
|
||||||
|
|
||||||
|
// Check modtime
|
||||||
|
srcModTime := src.ModTime(ctx)
|
||||||
|
dstModTime := dst.ModTime(ctx)
|
||||||
|
assert.Equal(t, srcModTime, dstModTime)
|
||||||
|
|
||||||
|
// Make sure contents are correct
|
||||||
|
gotContents := fstests.ReadObject(ctx, t, dst, -1)
|
||||||
|
assert.Equal(t, contents, gotContents, "Contents incorrect")
|
||||||
|
}
|
||||||
|
|
||||||
// -run TestIntegration/FsMkdir/FsPutFiles/Internal
|
// -run TestIntegration/FsMkdir/FsPutFiles/Internal
|
||||||
func (f *Fs) InternalTest(t *testing.T) {
|
func (f *Fs) InternalTest(t *testing.T) {
|
||||||
t.Run("ChunkedCopy", f.InternalTestChunkedCopy)
|
t.Run("ChunkedCopy", f.InternalTestChunkedCopy)
|
||||||
|
for _, size := range []fs.SizeSuffix{
|
||||||
|
minChunkSize - 1,
|
||||||
|
minChunkSize,
|
||||||
|
minChunkSize + 1,
|
||||||
|
(3 * minChunkSize) / 2,
|
||||||
|
(5 * minChunkSize) / 2,
|
||||||
|
} {
|
||||||
|
t.Run(fmt.Sprintf("ChunkedStreamingUpload/%d", size), func(t *testing.T) {
|
||||||
|
f.InternalTestChunkedStreamingUpload(t, int(size))
|
||||||
|
})
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
var _ fstests.InternalTester = (*Fs)(nil)
|
var _ fstests.InternalTester = (*Fs)(nil)
|
||||||
|
|
|
@ -393,10 +393,11 @@ func (up *largeUpload) Stream(ctx context.Context, initialUploadBlock *pool.RW)
|
||||||
hasMoreParts = true
|
hasMoreParts = true
|
||||||
)
|
)
|
||||||
up.size = initialUploadBlock.Size()
|
up.size = initialUploadBlock.Size()
|
||||||
|
up.parts = 0
|
||||||
for part := 0; hasMoreParts; part++ {
|
for part := 0; hasMoreParts; part++ {
|
||||||
// Get a block of memory from the pool and token which limits concurrency.
|
// Get a block of memory from the pool and token which limits concurrency.
|
||||||
var rw *pool.RW
|
var rw *pool.RW
|
||||||
if part == 1 {
|
if part == 0 {
|
||||||
rw = initialUploadBlock
|
rw = initialUploadBlock
|
||||||
} else {
|
} else {
|
||||||
rw = up.f.getRW(false)
|
rw = up.f.getRW(false)
|
||||||
|
@ -411,7 +412,7 @@ func (up *largeUpload) Stream(ctx context.Context, initialUploadBlock *pool.RW)
|
||||||
|
|
||||||
// Read the chunk
|
// Read the chunk
|
||||||
var n int64
|
var n int64
|
||||||
if part == 1 {
|
if part == 0 {
|
||||||
n = rw.Size()
|
n = rw.Size()
|
||||||
} else {
|
} else {
|
||||||
n, err = io.CopyN(rw, up.in, up.chunkSize)
|
n, err = io.CopyN(rw, up.in, up.chunkSize)
|
||||||
|
@ -426,7 +427,7 @@ func (up *largeUpload) Stream(ctx context.Context, initialUploadBlock *pool.RW)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Keep stats up to date
|
// Keep stats up to date
|
||||||
up.parts = part
|
up.parts += 1
|
||||||
up.size += n
|
up.size += n
|
||||||
if part > maxParts {
|
if part > maxParts {
|
||||||
up.f.putRW(rw)
|
up.f.putRW(rw)
|
||||||
|
|
Loading…
Reference in a new issue