azureblob,b2,s3: fix chunksize calculations producing too many parts
Before this fix, the chunksize calculator was using the previous size of the object, not the new size of the object to calculate the chunk sizes. This meant that uploading a replacement object which needed a new chunk size would fail, using too many parts. This fix fixes the calculator to take the size explicitly.
This commit is contained in:
parent
cb8842941b
commit
0501773db1
5 changed files with 111 additions and 37 deletions
|
@ -1678,14 +1678,14 @@ func (o *Object) Update(ctx context.Context, in io.Reader, src fs.ObjectInfo, op
|
|||
}
|
||||
}
|
||||
|
||||
uploadParts := int64(maxUploadParts)
|
||||
uploadParts := maxUploadParts
|
||||
if uploadParts < 1 {
|
||||
uploadParts = 1
|
||||
} else if uploadParts > maxUploadParts {
|
||||
uploadParts = maxUploadParts
|
||||
}
|
||||
// calculate size of parts/blocks
|
||||
partSize := chunksize.Calculator(o, int(uploadParts), o.fs.opt.ChunkSize)
|
||||
partSize := chunksize.Calculator(o, src.Size(), uploadParts, o.fs.opt.ChunkSize)
|
||||
|
||||
putBlobOptions := azblob.UploadStreamToBlockBlobOptions{
|
||||
BufferSize: int(partSize),
|
||||
|
|
|
@ -97,7 +97,7 @@ func (f *Fs) newLargeUpload(ctx context.Context, o *Object, in io.Reader, src fs
|
|||
if size == -1 {
|
||||
fs.Debugf(o, "Streaming upload with --b2-chunk-size %s allows uploads of up to %s and will fail only when that limit is reached.", f.opt.ChunkSize, maxParts*f.opt.ChunkSize)
|
||||
} else {
|
||||
chunkSize = chunksize.Calculator(src, maxParts, defaultChunkSize)
|
||||
chunkSize = chunksize.Calculator(o, size, maxParts, defaultChunkSize)
|
||||
parts = size / int64(chunkSize)
|
||||
if size%int64(chunkSize) != 0 {
|
||||
parts++
|
||||
|
|
|
@ -2116,7 +2116,7 @@ type Options struct {
|
|||
UploadCutoff fs.SizeSuffix `config:"upload_cutoff"`
|
||||
CopyCutoff fs.SizeSuffix `config:"copy_cutoff"`
|
||||
ChunkSize fs.SizeSuffix `config:"chunk_size"`
|
||||
MaxUploadParts int64 `config:"max_upload_parts"`
|
||||
MaxUploadParts int `config:"max_upload_parts"`
|
||||
DisableChecksum bool `config:"disable_checksum"`
|
||||
SharedCredentialsFile string `config:"shared_credentials_file"`
|
||||
Profile string `config:"profile"`
|
||||
|
@ -4718,10 +4718,10 @@ func (o *Object) uploadMultipart(ctx context.Context, req *s3.PutObjectInput, si
|
|||
if size == -1 {
|
||||
warnStreamUpload.Do(func() {
|
||||
fs.Logf(f, "Streaming uploads using chunk size %v will have maximum file size of %v",
|
||||
f.opt.ChunkSize, fs.SizeSuffix(int64(partSize)*uploadParts))
|
||||
f.opt.ChunkSize, fs.SizeSuffix(int64(partSize)*int64(uploadParts)))
|
||||
})
|
||||
} else {
|
||||
partSize = chunksize.Calculator(o, int(uploadParts), f.opt.ChunkSize)
|
||||
partSize = chunksize.Calculator(o, size, uploadParts, f.opt.ChunkSize)
|
||||
}
|
||||
|
||||
memPool := f.getMemoryPool(int64(partSize))
|
||||
|
|
|
@ -5,18 +5,26 @@ import (
|
|||
"github.com/rclone/rclone/fs"
|
||||
)
|
||||
|
||||
/*
|
||||
Calculator calculates the minimum chunk size needed to fit within the maximum number of parts, rounded up to the nearest fs.Mebi
|
||||
// Calculator calculates the minimum chunk size needed to fit within
|
||||
// the maximum number of parts, rounded up to the nearest fs.Mebi.
|
||||
//
|
||||
// For most backends, (chunk_size) * (concurrent_upload_routines)
|
||||
// memory will be required so we want to use the smallest possible
|
||||
// chunk size that's going to allow the upload to proceed. Rounding up
|
||||
// to the nearest fs.Mebi on the assumption that some backends may
|
||||
// only allow integer type parameters when specifying the chunk size.
|
||||
//
|
||||
// Returns the default chunk size if it is sufficiently large enough
|
||||
// to support the given file size otherwise returns the smallest chunk
|
||||
// size necessary to allow the upload to proceed.
|
||||
func Calculator(o interface{}, size int64, maxParts int, defaultChunkSize fs.SizeSuffix) fs.SizeSuffix {
|
||||
// If streaming then use default chunk size
|
||||
if size < 0 {
|
||||
fs.Debugf(o, "Streaming upload with chunk_size %s allows uploads of up to %s and will fail only when that limit is reached.", defaultChunkSize, fs.SizeSuffix(maxParts)*defaultChunkSize)
|
||||
|
||||
For most backends, (chunk_size) * (concurrent_upload_routines) memory will be required so we want to use the smallest
|
||||
possible chunk size that's going to allow the upload to proceed. Rounding up to the nearest fs.Mebi on the assumption
|
||||
that some backends may only allow integer type parameters when specifying the chunk size.
|
||||
|
||||
Returns the default chunk size if it is sufficiently large enough to support the given file size otherwise returns the
|
||||
smallest chunk size necessary to allow the upload to proceed.
|
||||
*/
|
||||
func Calculator(objInfo fs.ObjectInfo, maxParts int, defaultChunkSize fs.SizeSuffix) fs.SizeSuffix {
|
||||
fileSize := fs.SizeSuffix(objInfo.Size())
|
||||
return defaultChunkSize
|
||||
}
|
||||
fileSize := fs.SizeSuffix(size)
|
||||
requiredChunks := fileSize / defaultChunkSize
|
||||
if requiredChunks < fs.SizeSuffix(maxParts) || (requiredChunks == fs.SizeSuffix(maxParts) && fileSize%defaultChunkSize == 0) {
|
||||
return defaultChunkSize
|
||||
|
@ -31,6 +39,6 @@ func Calculator(objInfo fs.ObjectInfo, maxParts int, defaultChunkSize fs.SizeSuf
|
|||
minChunk += fs.Mebi
|
||||
}
|
||||
|
||||
fs.Debugf(objInfo, "size: %v, parts: %v, default: %v, new: %v; default chunk size insufficient, returned new chunk size", fileSize, maxParts, defaultChunkSize, minChunk)
|
||||
fs.Debugf(o, "size: %v, parts: %v, default: %v, new: %v; default chunk size insufficient, returned new chunk size", fileSize, maxParts, defaultChunkSize, minChunk)
|
||||
return minChunk
|
||||
}
|
||||
|
|
|
@ -2,34 +2,100 @@ package chunksize
|
|||
|
||||
import (
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/rclone/rclone/fs"
|
||||
"github.com/rclone/rclone/fs/object"
|
||||
)
|
||||
|
||||
func TestComputeChunkSize(t *testing.T) {
|
||||
tests := map[string]struct {
|
||||
fileSize fs.SizeSuffix
|
||||
for _, test := range []struct {
|
||||
name string
|
||||
size fs.SizeSuffix
|
||||
maxParts int
|
||||
defaultChunkSize fs.SizeSuffix
|
||||
expected fs.SizeSuffix
|
||||
want fs.SizeSuffix
|
||||
}{
|
||||
"default size returned when file size is small enough": {fileSize: 1000, maxParts: 10000, defaultChunkSize: toSizeSuffixMiB(10), expected: toSizeSuffixMiB(10)},
|
||||
"default size returned when file size is just 1 byte small enough": {fileSize: toSizeSuffixMiB(100000) - 1, maxParts: 10000, defaultChunkSize: toSizeSuffixMiB(10), expected: toSizeSuffixMiB(10)},
|
||||
"no rounding up when everything divides evenly": {fileSize: toSizeSuffixMiB(1000000), maxParts: 10000, defaultChunkSize: toSizeSuffixMiB(100), expected: toSizeSuffixMiB(100)},
|
||||
"rounding up to nearest MiB when not quite enough parts": {fileSize: toSizeSuffixMiB(1000000), maxParts: 9999, defaultChunkSize: toSizeSuffixMiB(100), expected: toSizeSuffixMiB(101)},
|
||||
"rounding up to nearest MiB when one extra byte": {fileSize: toSizeSuffixMiB(1000000) + 1, maxParts: 10000, defaultChunkSize: toSizeSuffixMiB(100), expected: toSizeSuffixMiB(101)},
|
||||
"expected MiB value when rounding sets to absolute minimum": {fileSize: toSizeSuffixMiB(1) - 1, maxParts: 1, defaultChunkSize: toSizeSuffixMiB(1), expected: toSizeSuffixMiB(1)},
|
||||
"expected MiB value when rounding to absolute min with extra": {fileSize: toSizeSuffixMiB(1) + 1, maxParts: 1, defaultChunkSize: toSizeSuffixMiB(1), expected: toSizeSuffixMiB(2)},
|
||||
}
|
||||
{
|
||||
name: "streaming file",
|
||||
size: -1,
|
||||
maxParts: 10000,
|
||||
defaultChunkSize: toSizeSuffixMiB(10),
|
||||
want: toSizeSuffixMiB(10),
|
||||
}, {
|
||||
name: "default size returned when file size is small enough",
|
||||
size: 1000,
|
||||
maxParts: 10000,
|
||||
defaultChunkSize: toSizeSuffixMiB(10),
|
||||
want: toSizeSuffixMiB(10),
|
||||
}, {
|
||||
name: "default size returned when file size is just 1 byte small enough",
|
||||
size: toSizeSuffixMiB(100000) - 1,
|
||||
maxParts: 10000,
|
||||
defaultChunkSize: toSizeSuffixMiB(10),
|
||||
want: toSizeSuffixMiB(10),
|
||||
}, {
|
||||
name: "no rounding up when everything divides evenly",
|
||||
size: toSizeSuffixMiB(1000000),
|
||||
maxParts: 10000,
|
||||
defaultChunkSize: toSizeSuffixMiB(100),
|
||||
want: toSizeSuffixMiB(100),
|
||||
}, {
|
||||
name: "rounding up to nearest MiB when not quite enough parts",
|
||||
size: toSizeSuffixMiB(1000000),
|
||||
maxParts: 9999,
|
||||
defaultChunkSize: toSizeSuffixMiB(100),
|
||||
want: toSizeSuffixMiB(101),
|
||||
}, {
|
||||
name: "rounding up to nearest MiB when one extra byte",
|
||||
size: toSizeSuffixMiB(1000000) + 1,
|
||||
maxParts: 10000,
|
||||
defaultChunkSize: toSizeSuffixMiB(100),
|
||||
want: toSizeSuffixMiB(101),
|
||||
}, {
|
||||
name: "expected MiB value when rounding sets to absolute minimum",
|
||||
size: toSizeSuffixMiB(1) - 1,
|
||||
maxParts: 1,
|
||||
defaultChunkSize: toSizeSuffixMiB(1),
|
||||
want: toSizeSuffixMiB(1),
|
||||
}, {
|
||||
name: "expected MiB value when rounding to absolute min with extra",
|
||||
size: toSizeSuffixMiB(1) + 1,
|
||||
maxParts: 1,
|
||||
defaultChunkSize: toSizeSuffixMiB(1),
|
||||
want: toSizeSuffixMiB(2),
|
||||
}, {
|
||||
name: "issue from forum #1",
|
||||
size: 120864818840,
|
||||
maxParts: 10000,
|
||||
defaultChunkSize: 5 * 1024 * 1024,
|
||||
want: toSizeSuffixMiB(12),
|
||||
},
|
||||
} {
|
||||
t.Run(test.name, func(t *testing.T) {
|
||||
got := Calculator(test.name, int64(test.size), test.maxParts, test.defaultChunkSize)
|
||||
if got != test.want {
|
||||
t.Fatalf("expected: %v, got: %v", test.want, got)
|
||||
}
|
||||
if test.size < 0 {
|
||||
return
|
||||
}
|
||||
parts := func(result fs.SizeSuffix) int {
|
||||
n := test.size / result
|
||||
r := test.size % result
|
||||
if r != 0 {
|
||||
n++
|
||||
}
|
||||
return int(n)
|
||||
}
|
||||
// Check this gives the parts in range
|
||||
if parts(got) > test.maxParts {
|
||||
t.Fatalf("too many parts %d", parts(got))
|
||||
}
|
||||
// Check that setting chunk size smaller gave too many parts
|
||||
if got > test.defaultChunkSize {
|
||||
if parts(got-toSizeSuffixMiB(1)) <= test.maxParts {
|
||||
t.Fatalf("chunk size %v too big as %v only gives %d parts", got, got-toSizeSuffixMiB(1), parts(got-toSizeSuffixMiB(1)))
|
||||
}
|
||||
|
||||
for name, tc := range tests {
|
||||
t.Run(name, func(t *testing.T) {
|
||||
src := object.NewStaticObjectInfo("mock", time.Now(), int64(tc.fileSize), true, nil, nil)
|
||||
result := Calculator(src, tc.maxParts, tc.defaultChunkSize)
|
||||
if result != tc.expected {
|
||||
t.Fatalf("expected: %v, got: %v", tc.expected, result)
|
||||
}
|
||||
})
|
||||
}
|
||||
|
|
Loading…
Reference in a new issue