From 2db0e23584317fd580a871b888a06e7e29857ad9 Mon Sep 17 00:00:00 2001 From: Nick Craig-Wood Date: Fri, 1 Sep 2023 17:25:15 +0100 Subject: [PATCH] backends: change OpenChunkWriter interface to allow backend concurrency override Before this change the concurrency used for an upload was rather inconsistent. - if size below `--backend-upload-cutoff` (default 200M) do single part upload. - if size below `--multi-thread-cutoff` (default 256M) or using streaming uploads (eg `rclone rcat) do multipart upload using `--backend-upload-concurrency` to set the concurrency used by the uploader. - otherwise do multipart upload using `--multi-thread-streams` to set the concurrency. This change makes the default for the concurrency used be the `--backend-upload-concurrency`. If `--multi-thread-streams` is set and larger than the `--backend-upload-concurrency` then that will be used instead. This means that if the user sets `--backend-upload-concurrency` then it will be obeyed for all multipart/multi-thread transfers and the user can override them all with `--multi-thread-streams`. See: #7056 --- backend/azureblob/azureblob.go | 15 +++--- backend/b2/b2.go | 17 ++++--- backend/oracleobjectstorage/multipart.go | 19 +++++--- backend/s3/s3.go | 20 ++++---- docs/content/docs.md | 38 +++++++++------ fs/features.go | 13 +++-- fs/operations/multithread.go | 62 +++++++++++++++--------- lib/multipart/multipart.go | 22 ++++----- 8 files changed, 124 insertions(+), 82 deletions(-) diff --git a/backend/azureblob/azureblob.go b/backend/azureblob/azureblob.go index 4e36c09cb..48915d6e9 100644 --- a/backend/azureblob/azureblob.go +++ b/backend/azureblob/azureblob.go @@ -1986,7 +1986,7 @@ type azChunkWriter struct { // // Pass in the remote and the src object // You can also use options to hint at the desired chunk size -func (f *Fs) OpenChunkWriter(ctx context.Context, remote string, src fs.ObjectInfo, options ...fs.OpenOption) (chunkSizeResult int64, writer fs.ChunkWriter, err error) { +func (f *Fs) OpenChunkWriter(ctx context.Context, remote string, src fs.ObjectInfo, options ...fs.OpenOption) (info fs.ChunkWriterInfo, writer fs.ChunkWriter, err error) { // Temporary Object under construction o := &Object{ fs: f, @@ -1994,7 +1994,7 @@ func (f *Fs) OpenChunkWriter(ctx context.Context, remote string, src fs.ObjectIn } ui, err := o.prepareUpload(ctx, src, options) if err != nil { - return -1, nil, fmt.Errorf("failed to prepare upload: %w", err) + return info, nil, fmt.Errorf("failed to prepare upload: %w", err) } // Calculate correct partSize @@ -2020,7 +2020,7 @@ func (f *Fs) OpenChunkWriter(ctx context.Context, remote string, src fs.ObjectIn } else { partSize = chunksize.Calculator(remote, size, blockblob.MaxBlocks, f.opt.ChunkSize) if partSize > fs.SizeSuffix(blockblob.MaxStageBlockBytes) { - return -1, nil, fmt.Errorf("can't upload as it is too big %v - takes more than %d chunks of %v", fs.SizeSuffix(size), fs.SizeSuffix(blockblob.MaxBlocks), fs.SizeSuffix(blockblob.MaxStageBlockBytes)) + return info, nil, fmt.Errorf("can't upload as it is too big %v - takes more than %d chunks of %v", fs.SizeSuffix(size), fs.SizeSuffix(blockblob.MaxBlocks), fs.SizeSuffix(blockblob.MaxStageBlockBytes)) } totalParts = int(fs.SizeSuffix(size) / partSize) if fs.SizeSuffix(size)%partSize != 0 { @@ -2037,8 +2037,13 @@ func (f *Fs) OpenChunkWriter(ctx context.Context, remote string, src fs.ObjectIn ui: ui, o: o, } + info = fs.ChunkWriterInfo{ + ChunkSize: int64(partSize), + Concurrency: o.fs.opt.UploadConcurrency, + //LeavePartsOnError: o.fs.opt.LeavePartsOnError, + } fs.Debugf(o, "open chunk writer: started multipart upload") - return int64(partSize), chunkWriter, nil + return info, chunkWriter, nil } // WriteChunk will write chunk number with reader bytes, where chunk number >= 0 @@ -2165,9 +2170,7 @@ var warnStreamUpload sync.Once func (o *Object) uploadMultipart(ctx context.Context, in io.Reader, src fs.ObjectInfo, options ...fs.OpenOption) (ui uploadInfo, err error) { chunkWriter, err := multipart.UploadMultipart(ctx, src, in, multipart.UploadMultipartOptions{ Open: o.fs, - Concurrency: o.fs.opt.UploadConcurrency, OpenOptions: options, - //LeavePartsOnError: o.fs.opt.LeavePartsOnError, }) if err != nil { return ui, err diff --git a/backend/b2/b2.go b/backend/b2/b2.go index 726c8ffa3..d11233517 100644 --- a/backend/b2/b2.go +++ b/backend/b2/b2.go @@ -1897,9 +1897,7 @@ func (o *Object) Update(ctx context.Context, in io.Reader, src fs.ObjectInfo, op } else if size > int64(o.fs.opt.UploadCutoff) { _, err := multipart.UploadMultipart(ctx, src, in, multipart.UploadMultipartOptions{ Open: o.fs, - Concurrency: o.fs.opt.UploadConcurrency, OpenOptions: options, - //LeavePartsOnError: o.fs.opt.LeavePartsOnError, }) return err } @@ -2013,13 +2011,13 @@ func (o *Object) Update(ctx context.Context, in io.Reader, src fs.ObjectInfo, op // // Pass in the remote and the src object // You can also use options to hint at the desired chunk size -func (f *Fs) OpenChunkWriter(ctx context.Context, remote string, src fs.ObjectInfo, options ...fs.OpenOption) (chunkSizeResult int64, writer fs.ChunkWriter, err error) { +func (f *Fs) OpenChunkWriter(ctx context.Context, remote string, src fs.ObjectInfo, options ...fs.OpenOption) (info fs.ChunkWriterInfo, writer fs.ChunkWriter, err error) { // FIXME what if file is smaller than 1 chunk? if f.opt.Versions { - return -1, nil, errNotWithVersions + return info, nil, errNotWithVersions } if f.opt.VersionAt.IsSet() { - return -1, nil, errNotWithVersionAt + return info, nil, errNotWithVersionAt } //size := src.Size() @@ -2032,11 +2030,16 @@ func (f *Fs) OpenChunkWriter(ctx context.Context, remote string, src fs.ObjectIn bucket, _ := o.split() err = f.makeBucket(ctx, bucket) if err != nil { - return -1, nil, err + return info, nil, err } + info = fs.ChunkWriterInfo{ + ChunkSize: int64(f.opt.ChunkSize), + Concurrency: o.fs.opt.UploadConcurrency, + //LeavePartsOnError: o.fs.opt.LeavePartsOnError, + } up, err := f.newLargeUpload(ctx, o, nil, src, f.opt.ChunkSize, false, nil) - return int64(f.opt.ChunkSize), up, err + return info, up, err } // Remove an object diff --git a/backend/oracleobjectstorage/multipart.go b/backend/oracleobjectstorage/multipart.go index 892f51acc..8555e203a 100644 --- a/backend/oracleobjectstorage/multipart.go +++ b/backend/oracleobjectstorage/multipart.go @@ -53,10 +53,8 @@ type objectChunkWriter struct { func (o *Object) uploadMultipart(ctx context.Context, src fs.ObjectInfo, in io.Reader, options ...fs.OpenOption) error { _, err := multipart.UploadMultipart(ctx, src, in, multipart.UploadMultipartOptions{ - Open: o.fs, - Concurrency: o.fs.opt.UploadConcurrency, - LeavePartsOnError: o.fs.opt.LeavePartsOnError, - OpenOptions: options, + Open: o.fs, + OpenOptions: options, }) return err } @@ -69,7 +67,7 @@ func (f *Fs) OpenChunkWriter( ctx context.Context, remote string, src fs.ObjectInfo, - options ...fs.OpenOption) (chunkSizeResult int64, writer fs.ChunkWriter, err error) { + options ...fs.OpenOption) (info fs.ChunkWriterInfo, writer fs.ChunkWriter, err error) { // Temporary Object under construction o := &Object{ fs: f, @@ -77,7 +75,7 @@ func (f *Fs) OpenChunkWriter( } ui, err := o.prepareUpload(ctx, src, options) if err != nil { - return -1, nil, fmt.Errorf("failed to prepare upload: %w", err) + return info, nil, fmt.Errorf("failed to prepare upload: %w", err) } uploadParts := f.opt.MaxUploadParts @@ -105,7 +103,7 @@ func (f *Fs) OpenChunkWriter( uploadID, existingParts, err := o.createMultipartUpload(ctx, ui.req) if err != nil { - return -1, nil, fmt.Errorf("create multipart upload request failed: %w", err) + return info, nil, fmt.Errorf("create multipart upload request failed: %w", err) } bucketName, bucketPath := o.split() chunkWriter := &objectChunkWriter{ @@ -119,8 +117,13 @@ func (f *Fs) OpenChunkWriter( ui: ui, o: o, } + info = fs.ChunkWriterInfo{ + ChunkSize: int64(chunkSize), + Concurrency: o.fs.opt.UploadConcurrency, + LeavePartsOnError: o.fs.opt.LeavePartsOnError, + } fs.Debugf(o, "open chunk writer: started multipart upload: %v", uploadID) - return int64(chunkSize), chunkWriter, err + return info, chunkWriter, err } // WriteChunk will write chunk number with reader bytes, where chunk number >= 0 diff --git a/backend/s3/s3.go b/backend/s3/s3.go index d30657e25..2ec6ce98e 100644 --- a/backend/s3/s3.go +++ b/backend/s3/s3.go @@ -3115,7 +3115,6 @@ func NewFs(ctx context.Context, name, root string, m configmap.Mapper) (fs.Fs, e if err != nil { return nil, err } - fs.Debugf(nil, "name = %q, root = %q, opt = %#v", name, root, opt) err = checkUploadChunkSize(opt.ChunkSize) if err != nil { return nil, fmt.Errorf("s3: chunk size: %w", err) @@ -5317,7 +5316,7 @@ type s3ChunkWriter struct { // // Pass in the remote and the src object // You can also use options to hint at the desired chunk size -func (f *Fs) OpenChunkWriter(ctx context.Context, remote string, src fs.ObjectInfo, options ...fs.OpenOption) (chunkSizeResult int64, writer fs.ChunkWriter, err error) { +func (f *Fs) OpenChunkWriter(ctx context.Context, remote string, src fs.ObjectInfo, options ...fs.OpenOption) (info fs.ChunkWriterInfo, writer fs.ChunkWriter, err error) { // Temporary Object under construction o := &Object{ fs: f, @@ -5325,7 +5324,7 @@ func (f *Fs) OpenChunkWriter(ctx context.Context, remote string, src fs.ObjectIn } ui, err := o.prepareUpload(ctx, src, options) if err != nil { - return -1, nil, fmt.Errorf("failed to prepare upload: %w", err) + return info, nil, fmt.Errorf("failed to prepare upload: %w", err) } //structs.SetFrom(&mReq, req) @@ -5361,7 +5360,7 @@ func (f *Fs) OpenChunkWriter(ctx context.Context, remote string, src fs.ObjectIn return f.shouldRetry(ctx, err) }) if err != nil { - return -1, nil, fmt.Errorf("create multipart upload failed: %w", err) + return info, nil, fmt.Errorf("create multipart upload failed: %w", err) } chunkWriter := &s3ChunkWriter{ @@ -5376,8 +5375,13 @@ func (f *Fs) OpenChunkWriter(ctx context.Context, remote string, src fs.ObjectIn ui: ui, o: o, } + info = fs.ChunkWriterInfo{ + ChunkSize: int64(chunkSize), + Concurrency: o.fs.opt.UploadConcurrency, + LeavePartsOnError: o.fs.opt.LeavePartsOnError, + } fs.Debugf(o, "open chunk writer: started multipart upload: %v", *mOut.UploadId) - return int64(chunkSize), chunkWriter, err + return info, chunkWriter, err } // add a part number and etag to the completed parts @@ -5527,10 +5531,8 @@ func (w *s3ChunkWriter) Close(ctx context.Context) (err error) { func (o *Object) uploadMultipart(ctx context.Context, src fs.ObjectInfo, in io.Reader, options ...fs.OpenOption) (wantETag, gotETag string, versionID *string, ui uploadInfo, err error) { chunkWriter, err := multipart.UploadMultipart(ctx, src, in, multipart.UploadMultipartOptions{ - Open: o.fs, - Concurrency: o.fs.opt.UploadConcurrency, - LeavePartsOnError: o.fs.opt.LeavePartsOnError, - OpenOptions: options, + Open: o.fs, + OpenOptions: options, }) if err != nil { return wantETag, gotETag, versionID, ui, err diff --git a/docs/content/docs.md b/docs/content/docs.md index 942c6eac9..367c3da86 100644 --- a/docs/content/docs.md +++ b/docs/content/docs.md @@ -1521,12 +1521,12 @@ This command line flag allows you to override that computed default. ### --multi-thread-write-buffer-size=SIZE ### -When downloading with multiple threads, rclone will buffer SIZE bytes in -memory before writing to disk for each thread. +When transferring with multiple threads, rclone will buffer SIZE bytes +in memory before writing to disk for each thread. This can improve performance if the underlying filesystem does not deal well with a lot of small writes in different positions of the file, so -if you see downloads being limited by disk write speed, you might want +if you see transfers being limited by disk write speed, you might want to experiment with different values. Specially for magnetic drives and remote file systems a higher value can be useful. @@ -1540,22 +1540,23 @@ multiples of 16k performed much better than other values. ### --multi-thread-chunk-size=SizeSuffix ### -Normally the chunk size for multi thread copies is set by the backend. -However some backends such as `local` and `smb` (which implement -`OpenWriterAt` but not `OpenChunkWriter`) don't have a natural chunk -size. +Normally the chunk size for multi thread transfers is set by the backend. +However some backends such as `local` and `smb` (which implement `OpenWriterAt` +but not `OpenChunkWriter`) don't have a natural chunk size. In this case the value of this option is used (default 64Mi). ### --multi-thread-cutoff=SIZE ### -When transferring files to capable backends above this size, rclone -will use multiple threads to download the file (default 256M). +When transferring files above SIZE to capable backends, rclone will +use multiple threads to transfer the file (default 256M). Capable backends are marked in the [overview](/overview/#optional-features) as `MultithreadUpload`. (They -need to implement either `OpenWriterAt` or `OpenChunkedWriter`). These -include include, `local`, `s3`, `azureblob`, `b2` and `smb`. +need to implement either the `OpenWriterAt` or `OpenChunkedWriter` +internal interfaces). These include include, `local`, `s3`, +`azureblob`, `b2`, `oracleobjectstorage` and `smb` at the time of +writing. On the local disk, rclone preallocates the file (using `fallocate(FALLOC_FL_KEEP_SIZE)` on unix or `NTSetInformationFile` on @@ -1574,8 +1575,8 @@ This will work with the `sync`/`copy`/`move` commands and friends mount` and `rclone serve` if `--vfs-cache-mode` is set to `writes` or above. -**NB** that this **only** works supported backends as the destination -but will work with any backend as the source. +**NB** that this **only** works with supported backends as the +destination but will work with any backend as the source. **NB** that multi-thread copies are disabled for local to local copies as they are faster without unless `--multi-thread-streams` is set @@ -1584,14 +1585,19 @@ explicitly. **NB** on Windows using multi-thread transfers to the local disk will cause the resulting files to be [sparse](https://en.wikipedia.org/wiki/Sparse_file). Use `--local-no-sparse` to disable sparse files (which may cause long -delays at the start of downloads) or disable multi-thread downloads +delays at the start of transfers) or disable multi-thread transfers with `--multi-thread-streams 0` ### --multi-thread-streams=N ### -When using multi thread downloads (see above `--multi-thread-cutoff`) +When using multi thread transfers (see above `--multi-thread-cutoff`) this sets the number of streams to use. Set to `0` to disable multi -thread downloads (Default 4). +thread transfers (Default 4). + +If the backend has a `--backend-upload-concurrency` setting (eg +`--s3-upload-concurrency`) then this setting will be used as the +number of transfers instead if it is larger than the value of +`--multi-thread-streams` or `--multi-thread-streams` isn't set. ### --no-check-dest ### diff --git a/fs/features.go b/fs/features.go index 46e7ef424..82b8226c7 100644 --- a/fs/features.go +++ b/fs/features.go @@ -155,7 +155,7 @@ type Features struct { // Pass in the remote and the src object // You can also use options to hint at the desired chunk size // - OpenChunkWriter func(ctx context.Context, remote string, src ObjectInfo, options ...OpenOption) (chunkSize int64, writer ChunkWriter, err error) + OpenChunkWriter func(ctx context.Context, remote string, src ObjectInfo, options ...OpenOption) (info ChunkWriterInfo, writer ChunkWriter, err error) // UserInfo returns info about the connected user UserInfo func(ctx context.Context) (map[string]string, error) @@ -639,17 +639,24 @@ type OpenWriterAter interface { // OpenWriterAtFn describes the OpenWriterAt function pointer type OpenWriterAtFn func(ctx context.Context, remote string, size int64) (WriterAtCloser, error) +// ChunkWriterInfo describes how a backend would like ChunkWriter called +type ChunkWriterInfo struct { + ChunkSize int64 // preferred chunk size + Concurrency int // how many chunks to write at once + LeavePartsOnError bool // if set don't delete parts uploaded so far on error +} + // OpenChunkWriter is an option interface for Fs to implement chunked writing type OpenChunkWriter interface { // OpenChunkWriter returns the chunk size and a ChunkWriter // // Pass in the remote and the src object // You can also use options to hint at the desired chunk size - OpenChunkWriter(ctx context.Context, remote string, src ObjectInfo, options ...OpenOption) (chunkSize int64, writer ChunkWriter, err error) + OpenChunkWriter(ctx context.Context, remote string, src ObjectInfo, options ...OpenOption) (info ChunkWriterInfo, writer ChunkWriter, err error) } // OpenChunkWriterFn describes the OpenChunkWriter function pointer -type OpenChunkWriterFn func(ctx context.Context, remote string, src ObjectInfo, options ...OpenOption) (chunkSize int64, writer ChunkWriter, err error) +type OpenChunkWriterFn func(ctx context.Context, remote string, src ObjectInfo, options ...OpenOption) (info ChunkWriterInfo, writer ChunkWriter, err error) // ChunkWriter is returned by OpenChunkWriter to implement chunked writing type ChunkWriter interface { diff --git a/fs/operations/multithread.go b/fs/operations/multithread.go index 946fd6a29..e4b75c62c 100644 --- a/fs/operations/multithread.go +++ b/fs/operations/multithread.go @@ -58,7 +58,6 @@ type multiThreadCopyState struct { size int64 src fs.Object acc *accounting.Account - streams int numChunks int noSeek bool // set if sure the receiving fs won't seek the input } @@ -128,7 +127,7 @@ func calculateNumChunks(size int64, chunkSize int64) int { // Copy src to (f, remote) using streams download threads. It tries to use the OpenChunkWriter feature // and if that's not available it creates an adapter using OpenWriterAt -func multiThreadCopy(ctx context.Context, f fs.Fs, remote string, src fs.Object, streams int, tr *accounting.Transfer) (newDst fs.Object, err error) { +func multiThreadCopy(ctx context.Context, f fs.Fs, remote string, src fs.Object, concurrency int, tr *accounting.Transfer) (newDst fs.Object, err error) { openChunkWriter := f.Features().OpenChunkWriter ci := fs.GetConfig(ctx) noseek := false @@ -149,47 +148,61 @@ func multiThreadCopy(ctx context.Context, f fs.Fs, remote string, src fs.Object, return nil, fmt.Errorf("multi-thread copy: can't copy zero sized file") } - g, gCtx := errgroup.WithContext(ctx) - g.SetLimit(streams) - - chunkSize, chunkWriter, err := openChunkWriter(ctx, remote, src) + info, chunkWriter, err := openChunkWriter(ctx, remote, src) if err != nil { return nil, fmt.Errorf("multi-thread copy: failed to open chunk writer: %w", err) } + uploadCtx, cancel := context.WithCancel(ctx) + defer cancel() defer atexit.OnError(&err, func() { - fs.Debugf(src, "multi-thread copy: aborting transfer on exit") + cancel() + if info.LeavePartsOnError { + return + } + fs.Debugf(src, "multi-thread copy: cancelling transfer on exit") abortErr := chunkWriter.Abort(ctx) if abortErr != nil { fs.Debugf(src, "multi-thread copy: abort failed: %v", abortErr) } })() - if chunkSize > src.Size() { - fs.Debugf(src, "multi-thread copy: chunk size %v was bigger than source file size %v", fs.SizeSuffix(chunkSize), fs.SizeSuffix(src.Size())) - chunkSize = src.Size() + if info.ChunkSize > src.Size() { + fs.Debugf(src, "multi-thread copy: chunk size %v was bigger than source file size %v", fs.SizeSuffix(info.ChunkSize), fs.SizeSuffix(src.Size())) + info.ChunkSize = src.Size() } - numChunks := calculateNumChunks(src.Size(), chunkSize) - if streams > numChunks { - fs.Debugf(src, "multi-thread copy: number of streams %d was bigger than number of chunks %d", streams, numChunks) - streams = numChunks + numChunks := calculateNumChunks(src.Size(), info.ChunkSize) + if concurrency > numChunks { + fs.Debugf(src, "multi-thread copy: number of streams %d was bigger than number of chunks %d", concurrency, numChunks) + concurrency = numChunks } + // Use the backend concurrency if it is higher than --multi-thread-streams or if --multi-thread-streams wasn't set explicitly + if !ci.MultiThreadSet || info.Concurrency > concurrency { + fs.Debugf(src, "multi-thread copy: using backend concurrency of %d instead of --multi-thread-streams %d", info.Concurrency, concurrency) + concurrency = info.Concurrency + } + if concurrency < 1 { + concurrency = 1 + } + + g, gCtx := errgroup.WithContext(uploadCtx) + g.SetLimit(concurrency) + mc := &multiThreadCopyState{ ctx: gCtx, size: src.Size(), src: src, - partSize: chunkSize, - streams: streams, + partSize: info.ChunkSize, numChunks: numChunks, noSeek: noseek, } // Make accounting - mc.acc = tr.Account(ctx, nil) + mc.acc = tr.Account(gCtx, nil) - fs.Debugf(src, "Starting multi-thread copy with %d chunks of size %v with %v parallel streams", mc.numChunks, fs.SizeSuffix(mc.partSize), mc.streams) + fs.Debugf(src, "Starting multi-thread copy with %d chunks of size %v with %v parallel streams", mc.numChunks, fs.SizeSuffix(mc.partSize), concurrency) for chunk := 0; chunk < mc.numChunks; chunk++ { // Fail fast, in case an errgroup managed function returns an error if gCtx.Err() != nil { @@ -307,10 +320,12 @@ func (w writerAtChunkWriter) Abort(ctx context.Context) error { // openChunkWriterFromOpenWriterAt adapts an OpenWriterAtFn into an OpenChunkWriterFn using chunkSize and writeBufferSize func openChunkWriterFromOpenWriterAt(openWriterAt fs.OpenWriterAtFn, chunkSize int64, writeBufferSize int64, f fs.Fs) fs.OpenChunkWriterFn { - return func(ctx context.Context, remote string, src fs.ObjectInfo, options ...fs.OpenOption) (chunkSizeResult int64, writer fs.ChunkWriter, err error) { + return func(ctx context.Context, remote string, src fs.ObjectInfo, options ...fs.OpenOption) (info fs.ChunkWriterInfo, writer fs.ChunkWriter, err error) { + ci := fs.GetConfig(ctx) + writerAt, err := openWriterAt(ctx, remote, src.Size()) if err != nil { - return -1, nil, err + return info, nil, err } if writeBufferSize > 0 { @@ -326,7 +341,10 @@ func openChunkWriterFromOpenWriterAt(openWriterAt fs.OpenWriterAtFn, chunkSize i writeBufferSize: writeBufferSize, f: f, } - - return chunkSize, chunkWriter, nil + info = fs.ChunkWriterInfo{ + ChunkSize: chunkSize, + Concurrency: ci.MultiThreadStreams, + } + return info, chunkWriter, nil } } diff --git a/lib/multipart/multipart.go b/lib/multipart/multipart.go index e584041ef..f96db4533 100644 --- a/lib/multipart/multipart.go +++ b/lib/multipart/multipart.go @@ -45,10 +45,8 @@ func NewRW() *pool.RW { // UploadMultipartOptions options for the generic multipart upload type UploadMultipartOptions struct { - Open fs.OpenChunkWriter // thing to call OpenChunkWriter on - OpenOptions []fs.OpenOption // options for OpenChunkWriter - Concurrency int // number of simultaneous uploads to do - LeavePartsOnError bool // if set don't delete parts uploaded so far on error + Open fs.OpenChunkWriter // thing to call OpenChunkWriter on + OpenOptions []fs.OpenOption // options for OpenChunkWriter } // UploadMultipart does a generic multipart upload from src using f as OpenChunkWriter. @@ -57,22 +55,23 @@ type UploadMultipartOptions struct { // // It returns the chunkWriter used in case the caller needs to extract any private info from it. func UploadMultipart(ctx context.Context, src fs.ObjectInfo, in io.Reader, opt UploadMultipartOptions) (chunkWriterOut fs.ChunkWriter, err error) { - chunkSize, chunkWriter, err := opt.Open.OpenChunkWriter(ctx, src.Remote(), src, opt.OpenOptions...) + info, chunkWriter, err := opt.Open.OpenChunkWriter(ctx, src.Remote(), src, opt.OpenOptions...) if err != nil { return nil, fmt.Errorf("multipart upload failed to initialise: %w", err) } // make concurrency machinery - concurrency := opt.Concurrency + concurrency := info.Concurrency if concurrency < 1 { concurrency = 1 } tokens := pacer.NewTokenDispenser(concurrency) uploadCtx, cancel := context.WithCancel(ctx) + defer cancel() defer atexit.OnError(&err, func() { cancel() - if opt.LeavePartsOnError { + if info.LeavePartsOnError { return } fs.Debugf(src, "Cancelling multipart upload") @@ -83,10 +82,11 @@ func UploadMultipart(ctx context.Context, src fs.ObjectInfo, in io.Reader, opt U })() var ( - g, gCtx = errgroup.WithContext(uploadCtx) - finished = false - off int64 - size = src.Size() + g, gCtx = errgroup.WithContext(uploadCtx) + finished = false + off int64 + size = src.Size() + chunkSize = info.ChunkSize ) // Do the accounting manually