From 351fc609b1b59d9da3a6a024047d9115e8a05acc Mon Sep 17 00:00:00 2001
From: Nick Craig-Wood
Date: Sun, 22 Jan 2023 12:46:23 +0000
Subject: [PATCH] b2: fix uploading files bigger than 1TiB

Before this change, when uploading files bigger than 1TiB, the chunk
calculator would work out that the chunk size needed to be bigger than
the default 100 MiB to fit within the 10,000 parts limit.

However, the uploader was still using the memory pool sized for the old
chunk size, which caused errors like

    panic: runtime error: slice bounds out of range [:122683392] with capacity 100663296

The fix for this is to make a temporary pool with the larger chunk size
and use it during the upload of the large file.

See: https://forum.rclone.org/t/rclone-cannot-complete-upload-to-b2-restarts-upload-frequently/35617/
---
 backend/b2/upload.go | 43 +++++++++++++++++++++++++++++++++++++------
 1 file changed, 37 insertions(+), 6 deletions(-)

diff --git a/backend/b2/upload.go b/backend/b2/upload.go
index 47ba53473..cdf8dbef0 100644
--- a/backend/b2/upload.go
+++ b/backend/b2/upload.go
@@ -14,6 +14,7 @@ import (
 	"io"
 	"strings"
 	"sync"
+	"time"

 	"github.com/rclone/rclone/backend/b2/api"
 	"github.com/rclone/rclone/fs"
@@ -21,6 +22,7 @@ import (
 	"github.com/rclone/rclone/fs/chunksize"
 	"github.com/rclone/rclone/fs/hash"
 	"github.com/rclone/rclone/lib/atexit"
+	"github.com/rclone/rclone/lib/pool"
 	"github.com/rclone/rclone/lib/rest"
 	"golang.org/x/sync/errgroup"
 )
@@ -428,18 +430,47 @@ func (up *largeUpload) Upload(ctx context.Context) (err error) {
 	defer atexit.OnError(&err, func() { _ = up.cancel(ctx) })()
 	fs.Debugf(up.o, "Starting %s of large file in %d chunks (id %q)", up.what, up.parts, up.id)
 	var (
-		g, gCtx   = errgroup.WithContext(ctx)
-		remaining = up.size
+		g, gCtx    = errgroup.WithContext(ctx)
+		remaining  = up.size
+		uploadPool *pool.Pool
+		ci         = fs.GetConfig(ctx)
 	)
+	// If using large chunk size then make a temporary pool
+	if up.chunkSize <= int64(up.f.opt.ChunkSize) {
+		uploadPool = up.f.pool
+	} else {
+		uploadPool = pool.New(
+			time.Duration(up.f.opt.MemoryPoolFlushTime),
+			int(up.chunkSize),
+			ci.Transfers,
+			up.f.opt.MemoryPoolUseMmap,
+		)
+		defer uploadPool.Flush()
+	}
+	// Get an upload token and a buffer
+	getBuf := func() (buf []byte) {
+		up.f.getBuf(true)
+		if !up.doCopy {
+			buf = uploadPool.Get()
+		}
+		return buf
+	}
+	// Put an upload token and a buffer
+	putBuf := func(buf []byte) {
+		if !up.doCopy {
+			uploadPool.Put(buf)
+		}
+		up.f.putBuf(nil, true)
+	}
 	g.Go(func() error {
 		for part := int64(1); part <= up.parts; part++ {
 			// Get a block of memory from the pool and token which limits concurrency.
-			buf := up.f.getBuf(up.doCopy)
+			buf := getBuf()

 			// Fail fast, in case an errgroup managed function returns an error
 			// gCtx is cancelled. There is no point in uploading all the other parts.
 			if gCtx.Err() != nil {
-				up.f.putBuf(buf, up.doCopy)
+				putBuf(buf)
 				return nil
 			}
@@ -453,14 +484,14 @@ func (up *largeUpload) Upload(ctx context.Context) (err error) {
 				buf = buf[:reqSize]
 				_, err = io.ReadFull(up.in, buf)
 				if err != nil {
-					up.f.putBuf(buf, up.doCopy)
+					putBuf(buf)
 					return err
 				}
 			}
 			part := part // for the closure
 			g.Go(func() (err error) {
-				defer up.f.putBuf(buf, up.doCopy)
+				defer putBuf(buf)
 				if !up.doCopy {
 					err = up.transferChunk(gCtx, part, buf)
 				} else {
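
For readers wondering why 1TiB is the threshold: this is a small
self-contained Go sketch of the chunk size arithmetic, not part of the
patch and not rclone's fs/chunksize API (the helper neededChunkSize is
hypothetical). The only inputs taken from the commit message are the
10,000 parts limit and the 100 MiB default chunk size. Since
100 MiB x 10,000 is roughly 0.95 TiB, any file bigger than that forces
chunks larger than the shared pool's fixed-size buffers, which is the
size mismatch behind the panic above.

package main

import "fmt"

const (
	maxParts         = 10_000            // B2's limit on parts in one large file
	defaultChunkSize = 100 * 1024 * 1024 // rclone's default 100 MiB B2 chunk size
)

// neededChunkSize (hypothetical helper) returns the smallest chunk size
// that fits fileSize into maxParts, never dropping below the default.
func neededChunkSize(fileSize int64) int64 {
	chunk := (fileSize + maxParts - 1) / maxParts // ceiling division
	if chunk < defaultChunkSize {
		return defaultChunkSize
	}
	return chunk
}

func main() {
	const oneTiB = int64(1) << 40
	// 1 TiB needs ~104.86 MiB chunks -- bigger than the pool's 100 MiB
	// buffers, so the old code sliced past the buffer's capacity.
	fmt.Println(neededChunkSize(oneTiB)) // 109951163
	// Half a TiB fits within 10,000 parts at the default chunk size.
	fmt.Println(neededChunkSize(oneTiB / 2)) // 104857600
}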