diff --git a/backend/b2/b2.go b/backend/b2/b2.go index 73b9e700b..b985b07f2 100644 --- a/backend/b2/b2.go +++ b/backend/b2/b2.go @@ -1923,7 +1923,11 @@ func (o *Object) Update(ctx context.Context, in io.Reader, src fs.ObjectInfo, op return err } // NB Stream returns the buffer and token - return up.Stream(ctx, rw) + err = up.Stream(ctx, rw) + if err != nil { + return err + } + return o.decodeMetaDataFileInfo(up.info) } else if err == io.EOF { fs.Debugf(o, "File has %d bytes, which makes only one chunk. Using direct upload.", n) defer o.fs.putRW(rw) diff --git a/backend/b2/b2_internal_test.go b/backend/b2/b2_internal_test.go index 34a01eec6..f5f0f33ff 100644 --- a/backend/b2/b2_internal_test.go +++ b/backend/b2/b2_internal_test.go @@ -1,10 +1,14 @@ package b2 import ( + "bytes" "context" + "fmt" "testing" "time" + "github.com/rclone/rclone/fs" + "github.com/rclone/rclone/fs/object" "github.com/rclone/rclone/fstest" "github.com/rclone/rclone/fstest/fstests" "github.com/rclone/rclone/lib/random" @@ -191,7 +195,7 @@ func (f *Fs) InternalTestChunkedCopy(t *testing.T) { // Set copy cutoff to mininum value so we make chunks origCutoff := f.opt.CopyCutoff - f.opt.CopyCutoff = 5 * 1024 * 1024 + f.opt.CopyCutoff = minChunkSize defer func() { f.opt.CopyCutoff = origCutoff }() @@ -216,9 +220,57 @@ func (f *Fs) InternalTestChunkedCopy(t *testing.T) { assert.Equal(t, contents, gotContents) } +// The integration tests do a reasonable job of testing the normal +// streaming upload but don't test the chunked streaming upload. +func (f *Fs) InternalTestChunkedStreamingUpload(t *testing.T, size int) { + ctx := context.Background() + contents := random.String(size) + item := fstest.NewItem(fmt.Sprintf("chunked-streaming-upload-%d", size), contents, fstest.Time("2001-05-06T04:05:06.499Z")) + + // Set chunk size to mininum value so we make chunks + origOpt := f.opt + f.opt.ChunkSize = minChunkSize + f.opt.UploadCutoff = 0 + defer func() { + f.opt = origOpt + }() + + // Do the streaming upload + src := object.NewStaticObjectInfo(item.Path, item.ModTime, -1, true, item.Hashes, f) + in := bytes.NewBufferString(contents) + dst, err := f.PutStream(ctx, in, src) + require.NoError(t, err) + defer func() { + assert.NoError(t, dst.Remove(ctx)) + }() + + // Check size + assert.Equal(t, int64(size), dst.Size()) + + // Check modtime + srcModTime := src.ModTime(ctx) + dstModTime := dst.ModTime(ctx) + assert.Equal(t, srcModTime, dstModTime) + + // Make sure contents are correct + gotContents := fstests.ReadObject(ctx, t, dst, -1) + assert.Equal(t, contents, gotContents, "Contents incorrect") +} + // -run TestIntegration/FsMkdir/FsPutFiles/Internal func (f *Fs) InternalTest(t *testing.T) { t.Run("ChunkedCopy", f.InternalTestChunkedCopy) + for _, size := range []fs.SizeSuffix{ + minChunkSize - 1, + minChunkSize, + minChunkSize + 1, + (3 * minChunkSize) / 2, + (5 * minChunkSize) / 2, + } { + t.Run(fmt.Sprintf("ChunkedStreamingUpload/%d", size), func(t *testing.T) { + f.InternalTestChunkedStreamingUpload(t, int(size)) + }) + } } var _ fstests.InternalTester = (*Fs)(nil) diff --git a/backend/b2/upload.go b/backend/b2/upload.go index a8897dcaf..ef0102347 100644 --- a/backend/b2/upload.go +++ b/backend/b2/upload.go @@ -393,10 +393,11 @@ func (up *largeUpload) Stream(ctx context.Context, initialUploadBlock *pool.RW) hasMoreParts = true ) up.size = initialUploadBlock.Size() + up.parts = 0 for part := 0; hasMoreParts; part++ { // Get a block of memory from the pool and token which limits concurrency. var rw *pool.RW - if part == 1 { + if part == 0 { rw = initialUploadBlock } else { rw = up.f.getRW(false) @@ -411,7 +412,7 @@ func (up *largeUpload) Stream(ctx context.Context, initialUploadBlock *pool.RW) // Read the chunk var n int64 - if part == 1 { + if part == 0 { n = rw.Size() } else { n, err = io.CopyN(rw, up.in, up.chunkSize) @@ -426,7 +427,7 @@ func (up *largeUpload) Stream(ctx context.Context, initialUploadBlock *pool.RW) } // Keep stats up to date - up.parts = part + up.parts += 1 up.size += n if part > maxParts { up.f.putRW(rw)