fstests: add TestFsPutChunked
This commit is contained in:
parent
84289d1d69
commit
57273d364b
2 changed files with 126 additions and 0 deletions
|
@ -4,6 +4,7 @@ package fs
|
|||
import (
|
||||
"fmt"
|
||||
"math"
|
||||
"sort"
|
||||
"strconv"
|
||||
"strings"
|
||||
|
||||
|
@ -130,3 +131,15 @@ func (x *SizeSuffix) Scan(s fmt.ScanState, ch rune) error {
|
|||
}
|
||||
return x.Set(string(token))
|
||||
}
|
||||
|
||||
// SizeSuffixList is a sclice SizeSuffix values
|
||||
type SizeSuffixList []SizeSuffix
|
||||
|
||||
func (l SizeSuffixList) Len() int { return len(l) }
|
||||
func (l SizeSuffixList) Swap(i, j int) { l[i], l[j] = l[j], l[i] }
|
||||
func (l SizeSuffixList) Less(i, j int) bool { return l[i] < l[j] }
|
||||
|
||||
// Sort sorts the list
|
||||
func (l SizeSuffixList) Sort() {
|
||||
sort.Sort(l)
|
||||
}
|
||||
|
|
|
@ -23,6 +23,7 @@ import (
|
|||
"github.com/ncw/rclone/fs/operations"
|
||||
"github.com/ncw/rclone/fs/walk"
|
||||
"github.com/ncw/rclone/fstest"
|
||||
"github.com/ncw/rclone/lib/readers"
|
||||
"github.com/pkg/errors"
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/require"
|
||||
|
@ -145,6 +146,36 @@ again:
|
|||
return contents
|
||||
}
|
||||
|
||||
// testPutLarge puts file to the remote, checks it and removes it on success.
|
||||
func testPutLarge(t *testing.T, f fs.Fs, file *fstest.Item) {
|
||||
tries := 1
|
||||
const maxTries = 10
|
||||
again:
|
||||
r := readers.NewPatternReader(file.Size)
|
||||
hash := hash.NewMultiHasher()
|
||||
in := io.TeeReader(r, hash)
|
||||
|
||||
obji := object.NewStaticObjectInfo(file.Path, file.ModTime, file.Size, true, nil, nil)
|
||||
obj, err := f.Put(in, obji)
|
||||
if err != nil {
|
||||
// Retry if err returned a retry error
|
||||
if fserrors.IsRetryError(err) && tries < maxTries {
|
||||
t.Logf("Put error: %v - low level retry %d/%d", err, tries, maxTries)
|
||||
time.Sleep(2 * time.Second)
|
||||
|
||||
tries++
|
||||
goto again
|
||||
}
|
||||
require.NoError(t, err, fmt.Sprintf("Put error: %v", err))
|
||||
}
|
||||
file.Hashes = hash.Sums()
|
||||
file.Check(t, obj, f.Precision())
|
||||
// Re-read the object and check again
|
||||
obj = findObject(t, f, file.Path)
|
||||
file.Check(t, obj, f.Precision())
|
||||
require.NoError(t, obj.Remove())
|
||||
}
|
||||
|
||||
// errorReader just returne an error on Read
|
||||
type errorReader struct {
|
||||
err error
|
||||
|
@ -467,6 +498,88 @@ func Run(t *testing.T, opt *Opt) {
|
|||
// Note that the next test will check there are no duplicated file names
|
||||
})
|
||||
|
||||
t.Run("TestFsPutChunked", func(t *testing.T) {
|
||||
skipIfNotOk(t)
|
||||
|
||||
setUploadChunkSizer, _ := remote.(SetUploadChunkSizer)
|
||||
if setUploadChunkSizer == nil {
|
||||
t.Skipf("%T does not implement SetUploadChunkSizer", remote)
|
||||
}
|
||||
|
||||
minChunkSize := opt.ChunkedUpload.MinChunkSize
|
||||
if minChunkSize < 100 {
|
||||
minChunkSize = 100
|
||||
}
|
||||
if opt.ChunkedUpload.CeilChunkSize != nil {
|
||||
minChunkSize = opt.ChunkedUpload.CeilChunkSize(minChunkSize)
|
||||
}
|
||||
|
||||
maxChunkSize := opt.ChunkedUpload.MaxChunkSize
|
||||
if maxChunkSize < minChunkSize {
|
||||
if minChunkSize <= fs.MebiByte {
|
||||
maxChunkSize = 2 * fs.MebiByte
|
||||
} else {
|
||||
maxChunkSize = 2 * minChunkSize
|
||||
}
|
||||
} else if maxChunkSize >= 2*minChunkSize {
|
||||
maxChunkSize = 2 * minChunkSize
|
||||
}
|
||||
if opt.ChunkedUpload.CeilChunkSize != nil {
|
||||
maxChunkSize = opt.ChunkedUpload.CeilChunkSize(maxChunkSize)
|
||||
}
|
||||
|
||||
next := func(f func(fs.SizeSuffix) fs.SizeSuffix) fs.SizeSuffix {
|
||||
s := f(minChunkSize)
|
||||
if s > maxChunkSize {
|
||||
s = minChunkSize
|
||||
}
|
||||
return s
|
||||
}
|
||||
|
||||
chunkSizes := fs.SizeSuffixList{
|
||||
minChunkSize,
|
||||
minChunkSize + (maxChunkSize-minChunkSize)/3,
|
||||
next(NextPowerOfTwo),
|
||||
next(NextMultipleOf(100000)),
|
||||
next(NextMultipleOf(100001)),
|
||||
maxChunkSize,
|
||||
}
|
||||
chunkSizes.Sort()
|
||||
|
||||
old, err := setUploadChunkSizer.SetUploadChunkSize(minChunkSize)
|
||||
require.NoError(t, err)
|
||||
defer func() {
|
||||
_, err := setUploadChunkSizer.SetUploadChunkSize(old)
|
||||
assert.NoError(t, err)
|
||||
}()
|
||||
|
||||
var lastCs fs.SizeSuffix
|
||||
for _, cs := range chunkSizes {
|
||||
if cs <= lastCs {
|
||||
continue
|
||||
}
|
||||
if opt.ChunkedUpload.CeilChunkSize != nil {
|
||||
cs = opt.ChunkedUpload.CeilChunkSize(cs)
|
||||
}
|
||||
lastCs = cs
|
||||
|
||||
t.Run(cs.String(), func(t *testing.T) {
|
||||
_, err := setUploadChunkSizer.SetUploadChunkSize(cs)
|
||||
require.NoError(t, err)
|
||||
|
||||
for _, fileSize := range []fs.SizeSuffix{cs - 1, cs, 2*cs + 1} {
|
||||
t.Run(fmt.Sprintf("%d", fileSize), func(t *testing.T) {
|
||||
testPutLarge(t, remote, &fstest.Item{
|
||||
ModTime: fstest.Time("2001-02-03T04:05:06.499999999Z"),
|
||||
Path: fmt.Sprintf("chunked-%s-%s.bin", cs.String(), fileSize.String()),
|
||||
Size: int64(fileSize),
|
||||
})
|
||||
})
|
||||
}
|
||||
})
|
||||
}
|
||||
})
|
||||
|
||||
// TestFsListDirFile2 tests the files are correctly uploaded by doing
|
||||
// Depth 1 directory listings
|
||||
TestFsListDirFile2 := func(t *testing.T) {
|
||||
|
|
Loading…
Reference in a new issue