forked from TrueCloudLab/rclone
swift: fix upload when using no_chunk to return the correct size
When using the VFS with swift and --swift-no-chunk, PutStream was returning objects with size -1 which was causing corrupted transfer messages. This was fixed by counting the bytes transferred in a streamed file and updating the metadata with that.
This commit is contained in:
parent
fdef567da6
commit
9e81fc343e
2 changed files with 66 additions and 0 deletions
|
@ -25,6 +25,7 @@ import (
|
||||||
"github.com/rclone/rclone/fs/operations"
|
"github.com/rclone/rclone/fs/operations"
|
||||||
"github.com/rclone/rclone/fs/walk"
|
"github.com/rclone/rclone/fs/walk"
|
||||||
"github.com/rclone/rclone/lib/pacer"
|
"github.com/rclone/rclone/lib/pacer"
|
||||||
|
"github.com/rclone/rclone/lib/readers"
|
||||||
)
|
)
|
||||||
|
|
||||||
// Constants
|
// Constants
|
||||||
|
@ -1224,8 +1225,13 @@ func (o *Object) Update(ctx context.Context, in io.Reader, src fs.ObjectInfo, op
|
||||||
}
|
}
|
||||||
o.headers = nil // wipe old metadata
|
o.headers = nil // wipe old metadata
|
||||||
} else {
|
} else {
|
||||||
|
var inCount *readers.CountingReader
|
||||||
if size >= 0 {
|
if size >= 0 {
|
||||||
headers["Content-Length"] = strconv.FormatInt(size, 10) // set Content-Length if we know it
|
headers["Content-Length"] = strconv.FormatInt(size, 10) // set Content-Length if we know it
|
||||||
|
} else {
|
||||||
|
// otherwise count the size for later
|
||||||
|
inCount = readers.NewCountingReader(in)
|
||||||
|
in = inCount
|
||||||
}
|
}
|
||||||
var rxHeaders swift.Headers
|
var rxHeaders swift.Headers
|
||||||
err = o.fs.pacer.CallNoRetry(func() (bool, error) {
|
err = o.fs.pacer.CallNoRetry(func() (bool, error) {
|
||||||
|
@ -1242,6 +1248,10 @@ func (o *Object) Update(ctx context.Context, in io.Reader, src fs.ObjectInfo, op
|
||||||
o.md5 = rxHeaders["ETag"]
|
o.md5 = rxHeaders["ETag"]
|
||||||
o.contentType = contentType
|
o.contentType = contentType
|
||||||
o.headers = headers
|
o.headers = headers
|
||||||
|
if inCount != nil {
|
||||||
|
// update the size if streaming from the reader
|
||||||
|
o.size = int64(inCount.BytesRead())
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// If file was a dynamic large object then remove old/all segments
|
// If file was a dynamic large object then remove old/all segments
|
||||||
|
|
|
@ -2,10 +2,19 @@
|
||||||
package swift
|
package swift
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"bytes"
|
||||||
|
"context"
|
||||||
|
"io"
|
||||||
"testing"
|
"testing"
|
||||||
|
|
||||||
"github.com/rclone/rclone/fs"
|
"github.com/rclone/rclone/fs"
|
||||||
|
"github.com/rclone/rclone/fs/hash"
|
||||||
|
"github.com/rclone/rclone/fs/object"
|
||||||
|
"github.com/rclone/rclone/fstest"
|
||||||
"github.com/rclone/rclone/fstest/fstests"
|
"github.com/rclone/rclone/fstest/fstests"
|
||||||
|
"github.com/rclone/rclone/lib/random"
|
||||||
|
"github.com/stretchr/testify/assert"
|
||||||
|
"github.com/stretchr/testify/require"
|
||||||
)
|
)
|
||||||
|
|
||||||
// TestIntegration runs integration tests against the remote
|
// TestIntegration runs integration tests against the remote
|
||||||
|
@ -21,3 +30,50 @@ func (f *Fs) SetUploadChunkSize(cs fs.SizeSuffix) (fs.SizeSuffix, error) {
|
||||||
}
|
}
|
||||||
|
|
||||||
var _ fstests.SetUploadChunkSizer = (*Fs)(nil)
|
var _ fstests.SetUploadChunkSizer = (*Fs)(nil)
|
||||||
|
|
||||||
|
// Check that PutStream works with NoChunk as it is the major code
|
||||||
|
// deviation
|
||||||
|
func (f *Fs) testNoChunk(t *testing.T) {
|
||||||
|
ctx := context.Background()
|
||||||
|
f.opt.NoChunk = true
|
||||||
|
defer func() {
|
||||||
|
f.opt.NoChunk = false
|
||||||
|
}()
|
||||||
|
|
||||||
|
file := fstest.Item{
|
||||||
|
ModTime: fstest.Time("2001-02-03T04:05:06.499999999Z"),
|
||||||
|
Path: "piped data no chunk.txt",
|
||||||
|
Size: -1, // use unknown size during upload
|
||||||
|
}
|
||||||
|
|
||||||
|
const contentSize = 100
|
||||||
|
|
||||||
|
contents := random.String(contentSize)
|
||||||
|
buf := bytes.NewBufferString(contents)
|
||||||
|
uploadHash := hash.NewMultiHasher()
|
||||||
|
in := io.TeeReader(buf, uploadHash)
|
||||||
|
|
||||||
|
file.Size = -1
|
||||||
|
obji := object.NewStaticObjectInfo(file.Path, file.ModTime, file.Size, true, nil, nil)
|
||||||
|
obj, err := f.Features().PutStream(ctx, in, obji)
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
file.Hashes = uploadHash.Sums()
|
||||||
|
file.Size = int64(contentSize) // use correct size when checking
|
||||||
|
file.Check(t, obj, f.Precision())
|
||||||
|
|
||||||
|
// Re-read the object and check again
|
||||||
|
obj, err = f.NewObject(ctx, file.Path)
|
||||||
|
require.NoError(t, err)
|
||||||
|
file.Check(t, obj, f.Precision())
|
||||||
|
|
||||||
|
// Delete the object
|
||||||
|
assert.NoError(t, obj.Remove(ctx))
|
||||||
|
}
|
||||||
|
|
||||||
|
// Additional tests that aren't in the framework
|
||||||
|
func (f *Fs) InternalTest(t *testing.T) {
|
||||||
|
t.Run("NoChunk", f.testNoChunk)
|
||||||
|
}
|
||||||
|
|
||||||
|
var _ fstests.InternalTester = (*Fs)(nil)
|
||||||
|
|
Loading…
Reference in a new issue