b2: fix uploading files bigger than 1TiB

Before this change when uploading files bigger than 1TiB, the chunk
calculator would work out that the chunk size needed to be bigger than
the default 100 MiB to fit within the 10,000 parts limit.

However the uploader was still using the memory pool for the old chunk
size and this caused errors like

    panic: runtime error: slice bounds out of range [:122683392] with capacity 100663296

The fix for this is to make a temporary pool with the larger chunk
size and use it during the upload of the large file.

See: https://forum.rclone.org/t/rclone-cannot-complete-upload-to-b2-restarts-upload-frequently/35617/
This commit is contained in:
Nick Craig-Wood 2023-01-22 12:46:23 +00:00
parent a6f6a9dcdf
commit 351fc609b1

View file

@ -14,6 +14,7 @@ import (
"io" "io"
"strings" "strings"
"sync" "sync"
"time"
"github.com/rclone/rclone/backend/b2/api" "github.com/rclone/rclone/backend/b2/api"
"github.com/rclone/rclone/fs" "github.com/rclone/rclone/fs"
@ -21,6 +22,7 @@ import (
"github.com/rclone/rclone/fs/chunksize" "github.com/rclone/rclone/fs/chunksize"
"github.com/rclone/rclone/fs/hash" "github.com/rclone/rclone/fs/hash"
"github.com/rclone/rclone/lib/atexit" "github.com/rclone/rclone/lib/atexit"
"github.com/rclone/rclone/lib/pool"
"github.com/rclone/rclone/lib/rest" "github.com/rclone/rclone/lib/rest"
"golang.org/x/sync/errgroup" "golang.org/x/sync/errgroup"
) )
@ -428,18 +430,47 @@ func (up *largeUpload) Upload(ctx context.Context) (err error) {
defer atexit.OnError(&err, func() { _ = up.cancel(ctx) })() defer atexit.OnError(&err, func() { _ = up.cancel(ctx) })()
fs.Debugf(up.o, "Starting %s of large file in %d chunks (id %q)", up.what, up.parts, up.id) fs.Debugf(up.o, "Starting %s of large file in %d chunks (id %q)", up.what, up.parts, up.id)
var ( var (
g, gCtx = errgroup.WithContext(ctx) g, gCtx = errgroup.WithContext(ctx)
remaining = up.size remaining = up.size
uploadPool *pool.Pool
ci = fs.GetConfig(ctx)
) )
// If using large chunk size then make a temporary pool
if up.chunkSize <= int64(up.f.opt.ChunkSize) {
uploadPool = up.f.pool
} else {
uploadPool = pool.New(
time.Duration(up.f.opt.MemoryPoolFlushTime),
int(up.chunkSize),
ci.Transfers,
up.f.opt.MemoryPoolUseMmap,
)
defer uploadPool.Flush()
}
// Get an upload token and a buffer
getBuf := func() (buf []byte) {
up.f.getBuf(true)
if !up.doCopy {
buf = uploadPool.Get()
}
return buf
}
// Put an upload token and a buffer
putBuf := func(buf []byte) {
if !up.doCopy {
uploadPool.Put(buf)
}
up.f.putBuf(nil, true)
}
g.Go(func() error { g.Go(func() error {
for part := int64(1); part <= up.parts; part++ { for part := int64(1); part <= up.parts; part++ {
// Get a block of memory from the pool and token which limits concurrency. // Get a block of memory from the pool and token which limits concurrency.
buf := up.f.getBuf(up.doCopy) buf := getBuf()
// Fail fast, in case an errgroup managed function returns an error // Fail fast, in case an errgroup managed function returns an error
// gCtx is cancelled. There is no point in uploading all the other parts. // gCtx is cancelled. There is no point in uploading all the other parts.
if gCtx.Err() != nil { if gCtx.Err() != nil {
up.f.putBuf(buf, up.doCopy) putBuf(buf)
return nil return nil
} }
@ -453,14 +484,14 @@ func (up *largeUpload) Upload(ctx context.Context) (err error) {
buf = buf[:reqSize] buf = buf[:reqSize]
_, err = io.ReadFull(up.in, buf) _, err = io.ReadFull(up.in, buf)
if err != nil { if err != nil {
up.f.putBuf(buf, up.doCopy) putBuf(buf)
return err return err
} }
} }
part := part // for the closure part := part // for the closure
g.Go(func() (err error) { g.Go(func() (err error) {
defer up.f.putBuf(buf, up.doCopy) defer putBuf(buf)
if !up.doCopy { if !up.doCopy {
err = up.transferChunk(gCtx, part, buf) err = up.transferChunk(gCtx, part, buf)
} else { } else {