b2: implement OpenChunkWriter and multi-thread uploads #7056

This implements the OpenChunkWriter interface for b2 which
enables multi-thread uploads.

This makes the memory controls of the s3 backend inoperative; they are
replaced with the global ones.

    --b2-memory-pool-flush-time
    --b2-memory-pool-use-mmap

By using the buffered reader this fixes excessive memory use when
uploading large files as it will share memory pages between all
readers.
This commit is contained in:
Nick Craig-Wood 2023-08-19 15:32:45 +01:00
parent 0427177857
commit ab803d1278
2 changed files with 209 additions and 212 deletions

View file

@ -32,6 +32,7 @@ import (
"github.com/rclone/rclone/fs/walk"
"github.com/rclone/rclone/lib/bucket"
"github.com/rclone/rclone/lib/encoder"
"github.com/rclone/rclone/lib/multipart"
"github.com/rclone/rclone/lib/pacer"
"github.com/rclone/rclone/lib/pool"
"github.com/rclone/rclone/lib/rest"
@ -57,9 +58,7 @@ const (
minChunkSize = 5 * fs.Mebi
defaultChunkSize = 96 * fs.Mebi
defaultUploadCutoff = 200 * fs.Mebi
largeFileCopyCutoff = 4 * fs.Gibi // 5E9 is the max
memoryPoolFlushTime = fs.Duration(time.Minute) // flush the cached buffers after this long
memoryPoolUseMmap = false
largeFileCopyCutoff = 4 * fs.Gibi // 5E9 is the max
)
// Globals
@ -149,6 +148,18 @@ might a maximum of "--transfers" chunks in progress at once.
5,000,000 Bytes is the minimum size.`,
Default: defaultChunkSize,
Advanced: true,
}, {
Name: "upload_concurrency",
Help: `Concurrency for multipart uploads.
This is the number of chunks of the same file that are uploaded
concurrently.
Note that chunks are stored in memory and there may be up to
"--transfers" * "--b2-upload-concurrency" chunks stored at once
in memory.`,
Default: 16,
Advanced: true,
}, {
Name: "disable_checksum",
Help: `Disable checksums for large (> upload cutoff) files.
@ -188,16 +199,16 @@ The minimum value is 1 second. The maximum value is one week.`,
Advanced: true,
}, {
Name: "memory_pool_flush_time",
Default: memoryPoolFlushTime,
Default: fs.Duration(time.Minute),
Advanced: true,
Help: `How often internal memory buffer pools will be flushed.
Uploads which requires additional buffers (f.e multipart) will use memory pool for allocations.
This option controls how often unused buffers will be removed from the pool.`,
Hide: fs.OptionHideBoth,
Help: `How often internal memory buffer pools will be flushed. (no longer used)`,
}, {
Name: "memory_pool_use_mmap",
Default: memoryPoolUseMmap,
Default: false,
Advanced: true,
Help: `Whether to use mmap buffers in internal memory pool.`,
Hide: fs.OptionHideBoth,
Help: `Whether to use mmap buffers in internal memory pool. (no longer used)`,
}, {
Name: config.ConfigEncoding,
Help: config.ConfigEncodingHelp,
@ -224,11 +235,10 @@ type Options struct {
UploadCutoff fs.SizeSuffix `config:"upload_cutoff"`
CopyCutoff fs.SizeSuffix `config:"copy_cutoff"`
ChunkSize fs.SizeSuffix `config:"chunk_size"`
UploadConcurrency int `config:"upload_concurrency"`
DisableCheckSum bool `config:"disable_checksum"`
DownloadURL string `config:"download_url"`
DownloadAuthorizationDuration fs.Duration `config:"download_auth_duration"`
MemoryPoolFlushTime fs.Duration `config:"memory_pool_flush_time"`
MemoryPoolUseMmap bool `config:"memory_pool_use_mmap"`
Enc encoder.MultiEncoder `config:"encoding"`
}
@ -253,7 +263,6 @@ type Fs struct {
authMu sync.Mutex // lock for authorizing the account
pacer *fs.Pacer // To pace and retry the API calls
uploadToken *pacer.TokenDispenser // control concurrency
pool *pool.Pool // memory pool
}
// Object describes a b2 object
@ -458,12 +467,6 @@ func NewFs(ctx context.Context, name, root string, m configmap.Mapper) (fs.Fs, e
uploads: make(map[string][]*api.GetUploadURLResponse),
pacer: fs.NewPacer(ctx, pacer.NewDefault(pacer.MinSleep(minSleep), pacer.MaxSleep(maxSleep), pacer.DecayConstant(decayConstant))),
uploadToken: pacer.NewTokenDispenser(ci.Transfers),
pool: pool.New(
time.Duration(opt.MemoryPoolFlushTime),
int(opt.ChunkSize),
ci.Transfers,
opt.MemoryPoolUseMmap,
),
}
f.setRoot(root)
f.features = (&fs.Features{
@ -597,23 +600,24 @@ func (f *Fs) clearUploadURL(bucketID string) {
f.uploadMu.Unlock()
}
// getBuf gets a buffer of f.opt.ChunkSize and an upload token
// getRW gets a RW buffer and an upload token
//
// If noBuf is set then it just gets an upload token
func (f *Fs) getBuf(noBuf bool) (buf []byte) {
func (f *Fs) getRW(noBuf bool) (rw *pool.RW) {
f.uploadToken.Get()
if !noBuf {
buf = f.pool.Get()
rw = multipart.NewRW()
}
return buf
return rw
}
// putBuf returns a buffer to the memory pool and an upload token
// putRW returns a RW buffer to the memory pool and returns an upload
// token
//
// If noBuf is set then it just returns the upload token
func (f *Fs) putBuf(buf []byte, noBuf bool) {
if !noBuf {
f.pool.Put(buf)
// If buf is nil then it just returns the upload token
func (f *Fs) putRW(rw *pool.RW) {
if rw != nil {
_ = rw.Close()
}
f.uploadToken.Put()
}
@ -1293,7 +1297,7 @@ func (f *Fs) copy(ctx context.Context, dstObj *Object, srcObj *Object, newInfo *
if err != nil {
return err
}
return up.Upload(ctx)
return up.Copy(ctx)
}
dstBucket, dstPath := dstObj.split()
@ -1861,11 +1865,11 @@ func (o *Object) Update(ctx context.Context, in io.Reader, src fs.ObjectInfo, op
if err != nil {
return err
}
if size == -1 {
if size < 0 {
// Check if the file is large enough for a chunked upload (needs to be at least two chunks)
buf := o.fs.getBuf(false)
rw := o.fs.getRW(false)
n, err := io.ReadFull(in, buf)
n, err := io.CopyN(rw, in, int64(o.fs.opt.ChunkSize))
if err == nil {
bufReader := bufio.NewReader(in)
in = bufReader
@ -1876,26 +1880,28 @@ func (o *Object) Update(ctx context.Context, in io.Reader, src fs.ObjectInfo, op
fs.Debugf(o, "File is big enough for chunked streaming")
up, err := o.fs.newLargeUpload(ctx, o, in, src, o.fs.opt.ChunkSize, false, nil)
if err != nil {
o.fs.putBuf(buf, false)
o.fs.putRW(rw)
return err
}
// NB Stream returns the buffer and token
return up.Stream(ctx, buf)
} else if err == io.EOF || err == io.ErrUnexpectedEOF {
return up.Stream(ctx, rw)
} else if err == io.EOF {
fs.Debugf(o, "File has %d bytes, which makes only one chunk. Using direct upload.", n)
defer o.fs.putBuf(buf, false)
size = int64(n)
in = bytes.NewReader(buf[:n])
defer o.fs.putRW(rw)
size = n
in = rw
} else {
o.fs.putBuf(buf, false)
o.fs.putRW(rw)
return err
}
} else if size > int64(o.fs.opt.UploadCutoff) {
up, err := o.fs.newLargeUpload(ctx, o, in, src, o.fs.opt.ChunkSize, false, nil)
if err != nil {
return err
}
return up.Upload(ctx)
_, err := multipart.UploadMultipart(ctx, src, in, multipart.UploadMultipartOptions{
Open: o.fs,
Concurrency: o.fs.opt.UploadConcurrency,
OpenOptions: options,
//LeavePartsOnError: o.fs.opt.LeavePartsOnError,
})
return err
}
modTime := src.ModTime(ctx)
@ -2003,6 +2009,36 @@ func (o *Object) Update(ctx context.Context, in io.Reader, src fs.ObjectInfo, op
return o.decodeMetaDataFileInfo(&response)
}
// OpenChunkWriter returns the chunk size and a ChunkWriter
//
// Pass in the remote and the src object
// You can also use options to hint at the desired chunk size
func (f *Fs) OpenChunkWriter(ctx context.Context, remote string, src fs.ObjectInfo, options ...fs.OpenOption) (chunkSizeResult int64, writer fs.ChunkWriter, err error) {
// FIXME what if file is smaller than 1 chunk?
if f.opt.Versions {
return -1, nil, errNotWithVersions
}
if f.opt.VersionAt.IsSet() {
return -1, nil, errNotWithVersionAt
}
//size := src.Size()
// Temporary Object under construction
o := &Object{
fs: f,
remote: src.Remote(),
}
bucket, _ := o.split()
err = f.makeBucket(ctx, bucket)
if err != nil {
return -1, nil, err
}
up, err := f.newLargeUpload(ctx, o, nil, src, f.opt.ChunkSize, false, nil)
return int64(f.opt.ChunkSize), up, err
}
// Remove an object
func (o *Object) Remove(ctx context.Context) error {
bucket, bucketPath := o.split()
@ -2030,14 +2066,15 @@ func (o *Object) ID() string {
// Check the interfaces are satisfied
var (
_ fs.Fs = &Fs{}
_ fs.Purger = &Fs{}
_ fs.Copier = &Fs{}
_ fs.PutStreamer = &Fs{}
_ fs.CleanUpper = &Fs{}
_ fs.ListRer = &Fs{}
_ fs.PublicLinker = &Fs{}
_ fs.Object = &Object{}
_ fs.MimeTyper = &Object{}
_ fs.IDer = &Object{}
_ fs.Fs = &Fs{}
_ fs.Purger = &Fs{}
_ fs.Copier = &Fs{}
_ fs.PutStreamer = &Fs{}
_ fs.CleanUpper = &Fs{}
_ fs.ListRer = &Fs{}
_ fs.PublicLinker = &Fs{}
_ fs.OpenChunkWriter = &Fs{}
_ fs.Object = &Object{}
_ fs.MimeTyper = &Object{}
_ fs.IDer = &Object{}
)

View file

@ -5,7 +5,6 @@
package b2
import (
"bytes"
"context"
"crypto/sha1"
"encoding/hex"
@ -14,7 +13,6 @@ import (
"io"
"strings"
"sync"
"time"
"github.com/rclone/rclone/backend/b2/api"
"github.com/rclone/rclone/fs"
@ -80,7 +78,8 @@ type largeUpload struct {
wrap accounting.WrapFn // account parts being transferred
id string // ID of the file being uploaded
size int64 // total size
parts int64 // calculated number of parts, if known
parts int // calculated number of parts, if known
sha1smu sync.Mutex // mutex to protect sha1s
sha1s []string // slice of SHA1s for each part
uploadMu sync.Mutex // lock for upload variable
uploads []*api.GetUploadPartURLResponse // result of get upload URL calls
@ -93,18 +92,16 @@ type largeUpload struct {
// If newInfo is set then metadata from that will be used instead of reading it from src
func (f *Fs) newLargeUpload(ctx context.Context, o *Object, in io.Reader, src fs.ObjectInfo, defaultChunkSize fs.SizeSuffix, doCopy bool, newInfo *api.File) (up *largeUpload, err error) {
size := src.Size()
parts := int64(0)
sha1SliceSize := int64(maxParts)
parts := 0
chunkSize := defaultChunkSize
if size == -1 {
fs.Debugf(o, "Streaming upload with --b2-chunk-size %s allows uploads of up to %s and will fail only when that limit is reached.", f.opt.ChunkSize, maxParts*f.opt.ChunkSize)
} else {
chunkSize = chunksize.Calculator(o, size, maxParts, defaultChunkSize)
parts = size / int64(chunkSize)
parts = int(size / int64(chunkSize))
if size%int64(chunkSize) != 0 {
parts++
}
sha1SliceSize = parts
}
opts := rest.Opts{
@ -152,7 +149,7 @@ func (f *Fs) newLargeUpload(ctx context.Context, o *Object, in io.Reader, src fs
id: response.ID,
size: size,
parts: parts,
sha1s: make([]string, sha1SliceSize),
sha1s: make([]string, 0, 16),
chunkSize: int64(chunkSize),
}
// unwrap the accounting from the input, we use wrap to put it
@ -203,10 +200,32 @@ func (up *largeUpload) returnUploadURL(upload *api.GetUploadPartURLResponse) {
up.uploadMu.Unlock()
}
// Transfer a chunk
func (up *largeUpload) transferChunk(ctx context.Context, part int64, body []byte) error {
err := up.f.pacer.Call(func() (bool, error) {
fs.Debugf(up.o, "Sending chunk %d length %d", part, len(body))
// Add an sha1 to the being built up sha1s
func (up *largeUpload) addSha1(chunkNumber int, sha1 string) {
up.sha1smu.Lock()
defer up.sha1smu.Unlock()
if len(up.sha1s) < chunkNumber+1 {
up.sha1s = append(up.sha1s, make([]string, chunkNumber+1-len(up.sha1s))...)
}
up.sha1s[chunkNumber] = sha1
}
// WriteChunk will write chunk number with reader bytes, where chunk number >= 0
func (up *largeUpload) WriteChunk(ctx context.Context, chunkNumber int, reader io.ReadSeeker) (size int64, err error) {
err = up.f.pacer.Call(func() (bool, error) {
// Discover the size by seeking to the end
size, err = reader.Seek(0, io.SeekEnd)
if err != nil {
return false, err
}
// rewind the reader on retry and after reading size
_, err = reader.Seek(0, io.SeekStart)
if err != nil {
return false, err
}
fs.Debugf(up.o, "Sending chunk %d length %d", chunkNumber, size)
// Get upload URL
upload, err := up.getUploadURL(ctx)
@ -214,8 +233,8 @@ func (up *largeUpload) transferChunk(ctx context.Context, part int64, body []byt
return false, err
}
in := newHashAppendingReader(bytes.NewReader(body), sha1.New())
size := int64(len(body)) + int64(in.AdditionalLength())
in := newHashAppendingReader(reader, sha1.New())
size += int64(in.AdditionalLength())
// Authorization
//
@ -245,7 +264,7 @@ func (up *largeUpload) transferChunk(ctx context.Context, part int64, body []byt
Body: up.wrap(in),
ExtraHeaders: map[string]string{
"Authorization": upload.AuthorizationToken,
"X-Bz-Part-Number": fmt.Sprintf("%d", part),
"X-Bz-Part-Number": fmt.Sprintf("%d", chunkNumber+1),
sha1Header: "hex_digits_at_end",
},
ContentLength: &size,
@ -256,7 +275,7 @@ func (up *largeUpload) transferChunk(ctx context.Context, part int64, body []byt
resp, err := up.f.srv.CallJSON(ctx, &opts, nil, &response)
retry, err := up.f.shouldRetry(ctx, resp, err)
if err != nil {
fs.Debugf(up.o, "Error sending chunk %d (retry=%v): %v: %#v", part, retry, err, err)
fs.Debugf(up.o, "Error sending chunk %d (retry=%v): %v: %#v", chunkNumber, retry, err, err)
}
// On retryable error clear PartUploadURL
if retry {
@ -264,30 +283,30 @@ func (up *largeUpload) transferChunk(ctx context.Context, part int64, body []byt
upload = nil
}
up.returnUploadURL(upload)
up.sha1s[part-1] = in.HexSum()
up.addSha1(chunkNumber, in.HexSum())
return retry, err
})
if err != nil {
fs.Debugf(up.o, "Error sending chunk %d: %v", part, err)
fs.Debugf(up.o, "Error sending chunk %d: %v", chunkNumber, err)
} else {
fs.Debugf(up.o, "Done sending chunk %d", part)
fs.Debugf(up.o, "Done sending chunk %d", chunkNumber)
}
return err
return size, err
}
// Copy a chunk
func (up *largeUpload) copyChunk(ctx context.Context, part int64, partSize int64) error {
func (up *largeUpload) copyChunk(ctx context.Context, part int, partSize int64) error {
err := up.f.pacer.Call(func() (bool, error) {
fs.Debugf(up.o, "Copying chunk %d length %d", part, partSize)
opts := rest.Opts{
Method: "POST",
Path: "/b2_copy_part",
}
offset := (part - 1) * up.chunkSize // where we are in the source file
offset := int64(part) * up.chunkSize // where we are in the source file
var request = api.CopyPartRequest{
SourceID: up.src.id,
LargeFileID: up.id,
PartNumber: part,
PartNumber: int64(part + 1),
Range: fmt.Sprintf("bytes=%d-%d", offset, offset+partSize-1),
}
var response api.UploadPartResponse
@ -296,7 +315,7 @@ func (up *largeUpload) copyChunk(ctx context.Context, part int64, partSize int64
if err != nil {
fs.Debugf(up.o, "Error copying chunk %d (retry=%v): %v: %#v", part, retry, err, err)
}
up.sha1s[part-1] = response.SHA1
up.addSha1(part, response.SHA1)
return retry, err
})
if err != nil {
@ -307,8 +326,8 @@ func (up *largeUpload) copyChunk(ctx context.Context, part int64, partSize int64
return err
}
// finish closes off the large upload
func (up *largeUpload) finish(ctx context.Context) error {
// Close closes off the large upload
func (up *largeUpload) Close(ctx context.Context) error {
fs.Debugf(up.o, "Finishing large file %s with %d parts", up.what, up.parts)
opts := rest.Opts{
Method: "POST",
@ -329,8 +348,8 @@ func (up *largeUpload) finish(ctx context.Context) error {
return up.o.decodeMetaDataFileInfo(&response)
}
// cancel aborts the large upload
func (up *largeUpload) cancel(ctx context.Context) error {
// Abort aborts the large upload
func (up *largeUpload) Abort(ctx context.Context) error {
fs.Debugf(up.o, "Cancelling large file %s", up.what)
opts := rest.Opts{
Method: "POST",
@ -355,157 +374,98 @@ func (up *largeUpload) cancel(ctx context.Context) error {
// reaches EOF.
//
// Note that initialUploadBlock must be returned to f.putBuf()
func (up *largeUpload) Stream(ctx context.Context, initialUploadBlock []byte) (err error) {
defer atexit.OnError(&err, func() { _ = up.cancel(ctx) })()
func (up *largeUpload) Stream(ctx context.Context, initialUploadBlock *pool.RW) (err error) {
defer atexit.OnError(&err, func() { _ = up.Abort(ctx) })()
fs.Debugf(up.o, "Starting streaming of large file (id %q)", up.id)
var (
g, gCtx = errgroup.WithContext(ctx)
hasMoreParts = true
)
up.size = int64(len(initialUploadBlock))
g.Go(func() error {
for part := int64(1); hasMoreParts; part++ {
// Get a block of memory from the pool and token which limits concurrency.
var buf []byte
if part == 1 {
buf = initialUploadBlock
} else {
buf = up.f.getBuf(false)
}
// Fail fast, in case an errgroup managed function returns an error
// gCtx is cancelled. There is no point in uploading all the other parts.
if gCtx.Err() != nil {
up.f.putBuf(buf, false)
return nil
}
// Read the chunk
var n int
if part == 1 {
n = len(buf)
} else {
n, err = io.ReadFull(up.in, buf)
if err == io.ErrUnexpectedEOF {
fs.Debugf(up.o, "Read less than a full chunk, making this the last one.")
buf = buf[:n]
hasMoreParts = false
} else if err == io.EOF {
fs.Debugf(up.o, "Could not read any more bytes, previous chunk was the last.")
up.f.putBuf(buf, false)
return nil
} else if err != nil {
// other kinds of errors indicate failure
up.f.putBuf(buf, false)
return err
}
}
// Keep stats up to date
up.parts = part
up.size += int64(n)
if part > maxParts {
up.f.putBuf(buf, false)
return fmt.Errorf("%q too big (%d bytes so far) makes too many parts %d > %d - increase --b2-chunk-size", up.o, up.size, up.parts, maxParts)
}
part := part // for the closure
g.Go(func() (err error) {
defer up.f.putBuf(buf, false)
return up.transferChunk(gCtx, part, buf)
})
up.size = initialUploadBlock.Size()
for part := 0; hasMoreParts; part++ {
// Get a block of memory from the pool and token which limits concurrency.
var rw *pool.RW
if part == 1 {
rw = initialUploadBlock
} else {
rw = up.f.getRW(false)
}
return nil
})
// Fail fast, in case an errgroup managed function returns an error
// gCtx is cancelled. There is no point in uploading all the other parts.
if gCtx.Err() != nil {
up.f.putRW(rw)
break
}
// Read the chunk
var n int64
if part == 1 {
n = rw.Size()
} else {
n, err = io.CopyN(rw, up.in, up.chunkSize)
if err == io.EOF {
fs.Debugf(up.o, "Read less than a full chunk, making this the last one.")
hasMoreParts = false
} else if err != nil {
// other kinds of errors indicate failure
up.f.putRW(rw)
return err
}
}
// Keep stats up to date
up.parts = part
up.size += n
if part > maxParts {
up.f.putRW(rw)
return fmt.Errorf("%q too big (%d bytes so far) makes too many parts %d > %d - increase --b2-chunk-size", up.o, up.size, up.parts, maxParts)
}
part := part // for the closure
g.Go(func() (err error) {
defer up.f.putRW(rw)
_, err = up.WriteChunk(gCtx, part, rw)
return err
})
}
err = g.Wait()
if err != nil {
return err
}
up.sha1s = up.sha1s[:up.parts]
return up.finish(ctx)
return up.Close(ctx)
}
// Upload uploads the chunks from the input
func (up *largeUpload) Upload(ctx context.Context) (err error) {
defer atexit.OnError(&err, func() { _ = up.cancel(ctx) })()
// Copy the chunks from the source to the destination
func (up *largeUpload) Copy(ctx context.Context) (err error) {
defer atexit.OnError(&err, func() { _ = up.Abort(ctx) })()
fs.Debugf(up.o, "Starting %s of large file in %d chunks (id %q)", up.what, up.parts, up.id)
var (
g, gCtx = errgroup.WithContext(ctx)
remaining = up.size
uploadPool *pool.Pool
ci = fs.GetConfig(ctx)
g, gCtx = errgroup.WithContext(ctx)
remaining = up.size
)
// If using large chunk size then make a temporary pool
if up.chunkSize <= int64(up.f.opt.ChunkSize) {
uploadPool = up.f.pool
} else {
uploadPool = pool.New(
time.Duration(up.f.opt.MemoryPoolFlushTime),
int(up.chunkSize),
ci.Transfers,
up.f.opt.MemoryPoolUseMmap,
)
defer uploadPool.Flush()
}
// Get an upload token and a buffer
getBuf := func() (buf []byte) {
up.f.getBuf(true)
if !up.doCopy {
buf = uploadPool.Get()
g.SetLimit(up.f.opt.UploadConcurrency)
for part := 0; part <= up.parts; part++ {
// Fail fast, in case an errgroup managed function returns an error
// gCtx is cancelled. There is no point in copying all the other parts.
if gCtx.Err() != nil {
break
}
return buf
}
// Put an upload token and a buffer
putBuf := func(buf []byte) {
if !up.doCopy {
uploadPool.Put(buf)
reqSize := remaining
if reqSize >= up.chunkSize {
reqSize = up.chunkSize
}
up.f.putBuf(nil, true)
part := part // for the closure
g.Go(func() (err error) {
return up.copyChunk(gCtx, part, reqSize)
})
remaining -= reqSize
}
g.Go(func() error {
for part := int64(1); part <= up.parts; part++ {
// Get a block of memory from the pool and token which limits concurrency.
buf := getBuf()
// Fail fast, in case an errgroup managed function returns an error
// gCtx is cancelled. There is no point in uploading all the other parts.
if gCtx.Err() != nil {
putBuf(buf)
return nil
}
reqSize := remaining
if reqSize >= up.chunkSize {
reqSize = up.chunkSize
}
if !up.doCopy {
// Read the chunk
buf = buf[:reqSize]
_, err = io.ReadFull(up.in, buf)
if err != nil {
putBuf(buf)
return err
}
}
part := part // for the closure
g.Go(func() (err error) {
defer putBuf(buf)
if !up.doCopy {
err = up.transferChunk(gCtx, part, buf)
} else {
err = up.copyChunk(gCtx, part, reqSize)
}
return err
})
remaining -= reqSize
}
return nil
})
err = g.Wait()
if err != nil {
return err
}
return up.finish(ctx)
return up.Close(ctx)
}