From 4c76fac5942bb94f906d5f61b6b41f51c83a8def Mon Sep 17 00:00:00 2001 From: Nick Craig-Wood Date: Tue, 15 Aug 2023 20:38:02 +0100 Subject: [PATCH] s3: factor generic multipart upload into lib/multipart #7056 This makes the memory controls of the s3 backend inoperative and replaced with the global ones. --s3-memory-pool-flush-time --s3-memory-pool-use-mmap By using the buffered reader this fixes excessive memory use when uploading large files as it will share memory pages between all readers. Fixes #7141 --- backend/s3/s3.go | 353 ++++++++++++++----------------------- lib/multipart/multipart.go | 144 +++++++++++++++ 2 files changed, 274 insertions(+), 223 deletions(-) create mode 100644 lib/multipart/multipart.go diff --git a/backend/s3/s3.go b/backend/s3/s3.go index c5722b85e..c5ca85db7 100644 --- a/backend/s3/s3.go +++ b/backend/s3/s3.go @@ -53,13 +53,12 @@ import ( "github.com/rclone/rclone/lib/atexit" "github.com/rclone/rclone/lib/bucket" "github.com/rclone/rclone/lib/encoder" + "github.com/rclone/rclone/lib/multipart" "github.com/rclone/rclone/lib/pacer" - "github.com/rclone/rclone/lib/pool" "github.com/rclone/rclone/lib/readers" "github.com/rclone/rclone/lib/rest" "github.com/rclone/rclone/lib/version" "golang.org/x/net/http/httpguts" - "golang.org/x/sync/errgroup" ) // Register with Fs @@ -2279,17 +2278,16 @@ very small even with this flag. encoder.EncodeDot, }, { Name: "memory_pool_flush_time", - Default: memoryPoolFlushTime, + Default: fs.Duration(time.Minute), Advanced: true, - Help: `How often internal memory buffer pools will be flushed. - -Uploads which requires additional buffers (f.e multipart) will use memory pool for allocations. -This option controls how often unused buffers will be removed from the pool.`, + Hide: fs.OptionHideBoth, + Help: `How often internal memory buffer pools will be flushed. 
(no longer used)`, }, { Name: "memory_pool_use_mmap", - Default: memoryPoolUseMmap, + Default: false, Advanced: true, - Help: `Whether to use mmap buffers in internal memory pool.`, + Hide: fs.OptionHideBoth, + Help: `Whether to use mmap buffers in internal memory pool. (no longer used)`, }, { Name: "disable_http2", Default: false, @@ -2440,10 +2438,7 @@ const ( minChunkSize = fs.SizeSuffix(1024 * 1024 * 5) defaultUploadCutoff = fs.SizeSuffix(200 * 1024 * 1024) maxUploadCutoff = fs.SizeSuffix(5 * 1024 * 1024 * 1024) - minSleep = 10 * time.Millisecond // In case of error, start at 10ms sleep. - - memoryPoolFlushTime = fs.Duration(time.Minute) // flush the cached buffers after this long - memoryPoolUseMmap = false + minSleep = 10 * time.Millisecond // In case of error, start at 10ms sleep. maxExpireDuration = fs.Duration(7 * 24 * time.Hour) // max expiry is 1 week ) @@ -2543,8 +2538,6 @@ type Options struct { NoHead bool `config:"no_head"` NoHeadObject bool `config:"no_head_object"` Enc encoder.MultiEncoder `config:"encoding"` - MemoryPoolFlushTime fs.Duration `config:"memory_pool_flush_time"` - MemoryPoolUseMmap bool `config:"memory_pool_use_mmap"` DisableHTTP2 bool `config:"disable_http2"` DownloadURL string `config:"download_url"` DirectoryMarkers bool `config:"directory_markers"` @@ -2574,7 +2567,6 @@ type Fs struct { pacer *fs.Pacer // To pace the API calls srv *http.Client // a plain http client srvRest *rest.Client // the rest connection to the server - pool *pool.Pool // memory pool etagIsNotMD5 bool // if set ETags are not MD5s versioningMu sync.Mutex versioning fs.Tristate // if set bucket is using versions @@ -3176,12 +3168,6 @@ func NewFs(ctx context.Context, name, root string, m configmap.Mapper) (fs.Fs, e cache: bucket.NewCache(), srv: srv, srvRest: rest.NewClient(fshttp.NewClient(ctx)), - pool: pool.New( - time.Duration(opt.MemoryPoolFlushTime), - int(opt.ChunkSize), - opt.UploadConcurrency*ci.Transfers, - opt.MemoryPoolUseMmap, - ), } if 
opt.ServerSideEncryption == "aws:kms" || opt.SSECustomerAlgorithm != "" { // From: https://docs.aws.amazon.com/AmazonS3/latest/API/RESTCommonResponseHeaders.html @@ -4376,19 +4362,6 @@ func (f *Fs) Hashes() hash.Set { return hash.Set(hash.MD5) } -func (f *Fs) getMemoryPool(size int64) *pool.Pool { - if size == int64(f.opt.ChunkSize) { - return f.pool - } - - return pool.New( - time.Duration(f.opt.MemoryPoolFlushTime), - int(size), - f.opt.UploadConcurrency*f.ci.Transfers, - f.opt.MemoryPoolUseMmap, - ) -} - // PublicLink generates a public link to the remote path (usually readable by anyone) func (f *Fs) PublicLink(ctx context.Context, remote string, expire fs.Duration, unlink bool) (link string, err error) { if strings.HasSuffix(remote, "/") { @@ -5316,28 +5289,43 @@ func (o *Object) Open(ctx context.Context, options ...fs.OpenOption) (in io.Read var warnStreamUpload sync.Once +// state of ChunkWriter +type s3ChunkWriter struct { + chunkSize int64 + size int64 + f *Fs + bucket *string + key *string + uploadId *string + multiPartUploadInput *s3.CreateMultipartUploadInput + completedPartsMu sync.Mutex + completedParts []*s3.CompletedPart + eTag string + versionID string + md5sMu sync.Mutex + md5s []byte + ui uploadInfo + o *Object +} + // OpenChunkWriter returns the chunk size and a ChunkWriter // // Pass in the remote and the src object // You can also use options to hint at the desired chunk size func (f *Fs) OpenChunkWriter(ctx context.Context, remote string, src fs.ObjectInfo, options ...fs.OpenOption) (chunkSizeResult int64, writer fs.ChunkWriter, err error) { - // This duplicates part of the logic in Update, however it is - // required until we migrate the MultiPartUpload to - // OpenChunkWriter/multi-thread op completely. 
- // Temporary Object under construction o := &Object{ fs: f, remote: remote, } - req, _, err := o.buildS3Req(ctx, src, options) + ui, err := o.prepareUpload(ctx, src, options) if err != nil { - return -1, nil, fmt.Errorf("failed to build s3 request: %w", err) + return -1, nil, fmt.Errorf("failed to prepare upload: %w", err) } //structs.SetFrom(&mReq, req) var mReq s3.CreateMultipartUploadInput - setFrom_s3CreateMultipartUploadInput_s3PutObjectInput(&mReq, req) + setFrom_s3CreateMultipartUploadInput_s3PutObjectInput(&mReq, ui.req) uploadParts := f.opt.MaxUploadParts if uploadParts < 1 { @@ -5372,7 +5360,6 @@ func (f *Fs) OpenChunkWriter(ctx context.Context, remote string, src fs.ObjectIn } chunkWriter := &s3ChunkWriter{ - ctx: ctx, chunkSize: int64(chunkSize), size: size, f: f, @@ -5381,28 +5368,13 @@ func (f *Fs) OpenChunkWriter(ctx context.Context, remote string, src fs.ObjectIn uploadId: mOut.UploadId, multiPartUploadInput: &mReq, completedParts: make([]*s3.CompletedPart, 0), + ui: ui, + o: o, } - fs.Debugf(f, "open chunk writer: started multipart upload: %v", *mOut.UploadId) + fs.Debugf(o, "open chunk writer: started multipart upload: %v", *mOut.UploadId) return int64(chunkSize), chunkWriter, err } -type s3ChunkWriter struct { - ctx context.Context - chunkSize int64 - size int64 - f *Fs - bucket *string - key *string - uploadId *string - multiPartUploadInput *s3.CreateMultipartUploadInput - completedPartsMu sync.Mutex - completedParts []*s3.CompletedPart - eTag string - versionID string - md5sMu sync.Mutex - md5s []byte -} - // add a part number and etag to the completed parts func (w *s3ChunkWriter) addCompletedPart(partNum *int64, eTag *string) { w.completedPartsMu.Lock() @@ -5437,19 +5409,17 @@ func (w *s3ChunkWriter) WriteChunk(ctx context.Context, chunkNumber int, reader // possible in AWS SDK v2 with trailers? 
m := md5.New() currentChunkSize, err := io.Copy(m, reader) - if err != nil && err != io.EOF { + if err != nil { return -1, err } + // If no data read, don't write the chunk + if currentChunkSize == 0 { + return 0, nil + } md5sumBinary := m.Sum([]byte{}) w.addMd5(&md5sumBinary, int64(chunkNumber)) md5sum := base64.StdEncoding.EncodeToString(md5sumBinary[:]) - // reset the reader after we calculated the md5 - _, err = reader.Seek(0, io.SeekStart) - if err != nil { - return -1, err - } - // S3 requires 1 <= PartNumber <= 10000 s3PartNumber := aws.Int64(int64(chunkNumber + 1)) uploadPartReq := &s3.UploadPartInput{ @@ -5467,10 +5437,15 @@ func (w *s3ChunkWriter) WriteChunk(ctx context.Context, chunkNumber int, reader } var uout *s3.UploadPartOutput err = w.f.pacer.Call(func() (bool, error) { - uout, err = w.f.c.UploadPartWithContext(w.ctx, uploadPartReq) + // rewind the reader on retry and after reading md5 + _, err = reader.Seek(0, io.SeekStart) + if err != nil { + return false, err + } + uout, err = w.f.c.UploadPartWithContext(ctx, uploadPartReq) if err != nil { if chunkNumber <= 8 { - return w.f.shouldRetry(w.ctx, err) + return w.f.shouldRetry(ctx, err) } // retry all chunks once have done the first few return true, err @@ -5483,7 +5458,7 @@ func (w *s3ChunkWriter) WriteChunk(ctx context.Context, chunkNumber int, reader w.addCompletedPart(s3PartNumber, uout.ETag) - fs.Debugf(w.f, "multipart upload wrote chunk %d with %v bytes and etag %v", chunkNumber+1, currentChunkSize, *uout.ETag) + fs.Debugf(w.o, "multipart upload wrote chunk %d with %v bytes and etag %v", chunkNumber+1, currentChunkSize, *uout.ETag) return currentChunkSize, err } @@ -5496,12 +5471,12 @@ func (w *s3ChunkWriter) Abort(ctx context.Context) error { UploadId: w.uploadId, RequestPayer: w.multiPartUploadInput.RequestPayer, }) - return w.f.shouldRetry(w.ctx, err) + return w.f.shouldRetry(ctx, err) }) if err != nil { return fmt.Errorf("failed to abort multipart upload %q: %w", *w.uploadId, err) } - 
fs.Debugf(w.f, "multipart upload %q aborted", *w.uploadId) + fs.Debugf(w.o, "multipart upload %q aborted", *w.uploadId) return err } @@ -5513,7 +5488,7 @@ func (w *s3ChunkWriter) Close(ctx context.Context) (err error) { }) var resp *s3.CompleteMultipartUploadOutput err = w.f.pacer.Call(func() (bool, error) { - resp, err = w.f.c.CompleteMultipartUploadWithContext(w.ctx, &s3.CompleteMultipartUploadInput{ + resp, err = w.f.c.CompleteMultipartUploadWithContext(ctx, &s3.CompleteMultipartUploadInput{ Bucket: w.bucket, Key: w.key, MultipartUpload: &s3.CompletedMultipartUpload{ @@ -5522,7 +5497,7 @@ func (w *s3ChunkWriter) Close(ctx context.Context) (err error) { RequestPayer: w.multiPartUploadInput.RequestPayer, UploadId: w.uploadId, }) - return w.f.shouldRetry(w.ctx, err) + return w.f.shouldRetry(ctx, err) }) if err != nil { return fmt.Errorf("failed to complete multipart upload %q: %w", *w.uploadId, err) @@ -5535,94 +5510,19 @@ func (w *s3ChunkWriter) Close(ctx context.Context) (err error) { w.versionID = *resp.VersionId } } - fs.Debugf(w.f, "multipart upload %q closed", *w.uploadId) + fs.Debugf(w.o, "multipart upload %q finished", *w.uploadId) return err } -func (o *Object) uploadMultipart(ctx context.Context, src fs.ObjectInfo, in io.Reader) (wantETag, gotETag string, versionID *string, err error) { - f := o.fs - - // make concurrency machinery - concurrency := f.opt.UploadConcurrency - if concurrency < 1 { - concurrency = 1 - } - tokens := pacer.NewTokenDispenser(concurrency) - - chunkSize, chunkWriter, err := f.OpenChunkWriter(ctx, src.Remote(), src) +func (o *Object) uploadMultipart(ctx context.Context, src fs.ObjectInfo, in io.Reader, options ...fs.OpenOption) (wantETag, gotETag string, versionID *string, ui uploadInfo, err error) { + chunkWriter, err := multipart.UploadMultipart(ctx, src, in, multipart.UploadMultipartOptions{ + Open: o.fs, + Concurrency: o.fs.opt.UploadConcurrency, + LeavePartsOnError: o.fs.opt.LeavePartsOnError, + OpenOptions: options, + }) if 
err != nil { - return wantETag, gotETag, nil, fmt.Errorf("multipart upload failed to initialise: %w", err) - } - memPool := f.getMemoryPool(chunkSize) - uploadCtx, cancel := context.WithCancel(ctx) - defer atexit.OnError(&err, func() { - cancel() - if o.fs.opt.LeavePartsOnError { - return - } - fs.Debugf(o, "Cancelling multipart upload") - errCancel := chunkWriter.Abort(ctx) - if errCancel != nil { - fs.Debugf(o, "Failed to cancel multipart upload: %v", errCancel) - } - })() - - var ( - g, gCtx = errgroup.WithContext(uploadCtx) - finished = false - off int64 - ) - - for partNum := int64(0); !finished; partNum++ { - // Get a block of memory from the pool and token which limits concurrency. - tokens.Get() - buf := memPool.Get() - - free := func() { - // return the memory and token - memPool.Put(buf) - tokens.Put() - } - - // Fail fast, in case an errgroup managed function returns an error - // gCtx is cancelled. There is no point in uploading all the other parts. - if gCtx.Err() != nil { - free() - break - } - - // Read the chunk - var n int - n, err = readers.ReadFill(in, buf) // this can never return 0, nil - if err == io.EOF { - if n == 0 && partNum != 0 { // end if no data and if not first chunk - free() - break - } - finished = true - } else if err != nil { - free() - return wantETag, gotETag, nil, fmt.Errorf("multipart upload failed to read source: %w", err) - } - buf = buf[:n] - - partNum := partNum - fs.Debugf(o, "multipart upload starting chunk %d size %v offset %v/%v", partNum, fs.SizeSuffix(n), fs.SizeSuffix(off), fs.SizeSuffix(src.Size())) - off += int64(n) - g.Go(func() (err error) { - defer free() - _, err = chunkWriter.WriteChunk(gCtx, int(partNum), bytes.NewReader(buf)) - return err - }) - } - err = g.Wait() - if err != nil { - return wantETag, gotETag, nil, err - } - - err = chunkWriter.Close(ctx) - if err != nil { - return wantETag, gotETag, nil, fmt.Errorf("multipart upload failed to finalise: %w", err) + return wantETag, gotETag, versionID, ui, 
err } var s3cw *s3ChunkWriter = chunkWriter.(*s3ChunkWriter) @@ -5632,7 +5532,7 @@ func (o *Object) uploadMultipart(ctx context.Context, src fs.ObjectInfo, in io.R hashOfHashes := md5.Sum(s3cw.md5s) wantETag = fmt.Sprintf("%s-%d", hex.EncodeToString(hashOfHashes[:]), len(s3cw.completedParts)) - return wantETag, gotETag, versionID, nil + return wantETag, gotETag, versionID, s3cw.ui, nil } // unWrapAwsError unwraps AWS errors, looking for a non AWS error @@ -5762,18 +5662,25 @@ func (o *Object) uploadSinglepartPresignedRequest(ctx context.Context, req *s3.P return etag, lastModified, versionID, nil } -func (o *Object) buildS3Req(ctx context.Context, src fs.ObjectInfo, options []fs.OpenOption) (req *s3.PutObjectInput, md5sumHex string, err error) { +// Info needed for an upload +type uploadInfo struct { + req *s3.PutObjectInput + md5sumHex string +} + +// Prepare object for being uploaded +func (o *Object) prepareUpload(ctx context.Context, src fs.ObjectInfo, options []fs.OpenOption) (ui uploadInfo, err error) { bucket, bucketPath := o.split() // Create parent dir/bucket if not saving directory marker if !strings.HasSuffix(o.remote, "/") { err := o.fs.mkdirParent(ctx, o.remote) if err != nil { - return nil, "", err + return ui, err } } modTime := src.ModTime(ctx) - req = &s3.PutObjectInput{ + ui.req = &s3.PutObjectInput{ Bucket: &bucket, ACL: stringPointerOrNil(o.fs.opt.ACL), Key: &bucketPath, @@ -5782,30 +5689,30 @@ func (o *Object) buildS3Req(ctx context.Context, src fs.ObjectInfo, options []fs // Fetch metadata if --metadata is in use meta, err := fs.GetMetadataOptions(ctx, src, options) if err != nil { - return nil, "", fmt.Errorf("failed to read metadata from source object: %w", err) + return ui, fmt.Errorf("failed to read metadata from source object: %w", err) } - req.Metadata = make(map[string]*string, len(meta)+2) + ui.req.Metadata = make(map[string]*string, len(meta)+2) // merge metadata into request and user metadata for k, v := range meta { pv := 
aws.String(v) k = strings.ToLower(k) if o.fs.opt.NoSystemMetadata { - req.Metadata[k] = pv + ui.req.Metadata[k] = pv continue } switch k { case "cache-control": - req.CacheControl = pv + ui.req.CacheControl = pv case "content-disposition": - req.ContentDisposition = pv + ui.req.ContentDisposition = pv case "content-encoding": - req.ContentEncoding = pv + ui.req.ContentEncoding = pv case "content-language": - req.ContentLanguage = pv + ui.req.ContentLanguage = pv case "content-type": - req.ContentType = pv + ui.req.ContentType = pv case "x-amz-tagging": - req.Tagging = pv + ui.req.Tagging = pv case "tier": // ignore case "mtime": @@ -5818,14 +5725,14 @@ func (o *Object) buildS3Req(ctx context.Context, src fs.ObjectInfo, options []fs } case "btime": // write as metadata since we can't set it - req.Metadata[k] = pv + ui.req.Metadata[k] = pv default: - req.Metadata[k] = pv + ui.req.Metadata[k] = pv } } // Set the mtime in the meta data - req.Metadata[metaMtime] = aws.String(swift.TimeToFloatString(modTime)) + ui.req.Metadata[metaMtime] = aws.String(swift.TimeToFloatString(modTime)) // read the md5sum if available // - for non multipart @@ -5837,9 +5744,9 @@ func (o *Object) buildS3Req(ctx context.Context, src fs.ObjectInfo, options []fs size := src.Size() multipart := size < 0 || size >= int64(o.fs.opt.UploadCutoff) if !multipart || !o.fs.opt.DisableChecksum { - md5sumHex, err = src.Hash(ctx, hash.MD5) - if err == nil && matchMd5.MatchString(md5sumHex) { - hashBytes, err := hex.DecodeString(md5sumHex) + ui.md5sumHex, err = src.Hash(ctx, hash.MD5) + if err == nil && matchMd5.MatchString(ui.md5sumHex) { + hashBytes, err := hex.DecodeString(ui.md5sumHex) if err == nil { md5sumBase64 = base64.StdEncoding.EncodeToString(hashBytes) if (multipart || o.fs.etagIsNotMD5) && !o.fs.opt.DisableChecksum { @@ -5847,42 +5754,42 @@ func (o *Object) buildS3Req(ctx context.Context, src fs.ObjectInfo, options []fs // - a multipart upload // - the Etag is not an MD5, eg when using 
SSE/SSE-C // provided checksums aren't disabled - req.Metadata[metaMD5Hash] = &md5sumBase64 + ui.req.Metadata[metaMD5Hash] = &md5sumBase64 } } } } // Set the content type if it isn't set already - if req.ContentType == nil { - req.ContentType = aws.String(fs.MimeType(ctx, src)) + if ui.req.ContentType == nil { + ui.req.ContentType = aws.String(fs.MimeType(ctx, src)) } if size >= 0 { - req.ContentLength = &size + ui.req.ContentLength = &size } if md5sumBase64 != "" { - req.ContentMD5 = &md5sumBase64 + ui.req.ContentMD5 = &md5sumBase64 } if o.fs.opt.RequesterPays { - req.RequestPayer = aws.String(s3.RequestPayerRequester) + ui.req.RequestPayer = aws.String(s3.RequestPayerRequester) } if o.fs.opt.ServerSideEncryption != "" { - req.ServerSideEncryption = &o.fs.opt.ServerSideEncryption + ui.req.ServerSideEncryption = &o.fs.opt.ServerSideEncryption } if o.fs.opt.SSECustomerAlgorithm != "" { - req.SSECustomerAlgorithm = &o.fs.opt.SSECustomerAlgorithm + ui.req.SSECustomerAlgorithm = &o.fs.opt.SSECustomerAlgorithm } if o.fs.opt.SSECustomerKey != "" { - req.SSECustomerKey = &o.fs.opt.SSECustomerKey + ui.req.SSECustomerKey = &o.fs.opt.SSECustomerKey } if o.fs.opt.SSECustomerKeyMD5 != "" { - req.SSECustomerKeyMD5 = &o.fs.opt.SSECustomerKeyMD5 + ui.req.SSECustomerKeyMD5 = &o.fs.opt.SSECustomerKeyMD5 } if o.fs.opt.SSEKMSKeyID != "" { - req.SSEKMSKeyId = &o.fs.opt.SSEKMSKeyID + ui.req.SSEKMSKeyId = &o.fs.opt.SSEKMSKeyID } if o.fs.opt.StorageClass != "" { - req.StorageClass = &o.fs.opt.StorageClass + ui.req.StorageClass = &o.fs.opt.StorageClass } // Apply upload options for _, option := range options { @@ -5892,22 +5799,22 @@ func (o *Object) buildS3Req(ctx context.Context, src fs.ObjectInfo, options []fs case "": // ignore case "cache-control": - req.CacheControl = aws.String(value) + ui.req.CacheControl = aws.String(value) case "content-disposition": - req.ContentDisposition = aws.String(value) + ui.req.ContentDisposition = aws.String(value) case "content-encoding": - 
req.ContentEncoding = aws.String(value) + ui.req.ContentEncoding = aws.String(value) case "content-language": - req.ContentLanguage = aws.String(value) + ui.req.ContentLanguage = aws.String(value) case "content-type": - req.ContentType = aws.String(value) + ui.req.ContentType = aws.String(value) case "x-amz-tagging": - req.Tagging = aws.String(value) + ui.req.Tagging = aws.String(value) default: const amzMetaPrefix = "x-amz-meta-" if strings.HasPrefix(lowerKey, amzMetaPrefix) { metaKey := lowerKey[len(amzMetaPrefix):] - req.Metadata[metaKey] = aws.String(value) + ui.req.Metadata[metaKey] = aws.String(value) } else { fs.Errorf(o, "Don't know how to set key %q on upload", key) } @@ -5915,20 +5822,20 @@ func (o *Object) buildS3Req(ctx context.Context, src fs.ObjectInfo, options []fs } // Check metadata keys and values are valid - for key, value := range req.Metadata { + for key, value := range ui.req.Metadata { if !httpguts.ValidHeaderFieldName(key) { fs.Errorf(o, "Dropping invalid metadata key %q", key) - delete(req.Metadata, key) + delete(ui.req.Metadata, key) } else if value == nil { fs.Errorf(o, "Dropping nil metadata value for key %q", key) - delete(req.Metadata, key) + delete(ui.req.Metadata, key) } else if !httpguts.ValidHeaderFieldValue(*value) { fs.Errorf(o, "Dropping invalid metadata value %q for key %q", *value, key) - delete(req.Metadata, key) + delete(ui.req.Metadata, key) } } - return req, md5sumHex, nil + return ui, nil } // Update the Object from in with modTime and size @@ -5944,20 +5851,19 @@ func (o *Object) Update(ctx context.Context, in io.Reader, src fs.ObjectInfo, op var lastModified time.Time // Time we got from the upload var versionID *string // versionID we got from the upload var err error - var md5sumHex string - var req *s3.PutObjectInput + var ui uploadInfo if multipart { - wantETag, gotETag, versionID, err = o.uploadMultipart(ctx, src, in) + wantETag, gotETag, versionID, ui, err = o.uploadMultipart(ctx, src, in) } else { - req, 
md5sumHex, err = o.buildS3Req(ctx, src, options) + ui, err = o.prepareUpload(ctx, src, options) if err != nil { - return fmt.Errorf("failed to build s3 request: %w", err) + return fmt.Errorf("failed to prepare upload: %w", err) } if o.fs.opt.UsePresignedRequest { - gotETag, lastModified, versionID, err = o.uploadSinglepartPresignedRequest(ctx, req, size, in) + gotETag, lastModified, versionID, err = o.uploadSinglepartPresignedRequest(ctx, ui.req, size, in) } else { - gotETag, lastModified, versionID, err = o.uploadSinglepartPutObject(ctx, req, size, in) + gotETag, lastModified, versionID, err = o.uploadSinglepartPutObject(ctx, ui.req, size, in) } } if err != nil { @@ -5977,8 +5883,8 @@ func (o *Object) Update(ctx context.Context, in io.Reader, src fs.ObjectInfo, op if o.fs.opt.NoHead && size >= 0 { head = new(s3.HeadObjectOutput) //structs.SetFrom(head, &req) - setFrom_s3HeadObjectOutput_s3PutObjectInput(head, req) - head.ETag = &md5sumHex // doesn't matter quotes are missing + setFrom_s3HeadObjectOutput_s3PutObjectInput(head, ui.req) + head.ETag = &ui.md5sumHex // doesn't matter quotes are missing head.ContentLength = &size // We get etag back from single and multipart upload so fill it in here if gotETag != "" { @@ -6116,16 +6022,17 @@ func (o *Object) Metadata(ctx context.Context) (metadata fs.Metadata, err error) // Check the interfaces are satisfied var ( - _ fs.Fs = &Fs{} - _ fs.Purger = &Fs{} - _ fs.Copier = &Fs{} - _ fs.PutStreamer = &Fs{} - _ fs.ListRer = &Fs{} - _ fs.Commander = &Fs{} - _ fs.CleanUpper = &Fs{} - _ fs.Object = &Object{} - _ fs.MimeTyper = &Object{} - _ fs.GetTierer = &Object{} - _ fs.SetTierer = &Object{} - _ fs.Metadataer = &Object{} + _ fs.Fs = &Fs{} + _ fs.Purger = &Fs{} + _ fs.Copier = &Fs{} + _ fs.PutStreamer = &Fs{} + _ fs.ListRer = &Fs{} + _ fs.Commander = &Fs{} + _ fs.CleanUpper = &Fs{} + _ fs.OpenChunkWriter = &Fs{} + _ fs.Object = &Object{} + _ fs.MimeTyper = &Object{} + _ fs.GetTierer = &Object{} + _ fs.SetTierer = &Object{} + _ 
fs.Metadataer = &Object{} ) diff --git a/lib/multipart/multipart.go b/lib/multipart/multipart.go new file mode 100644 index 000000000..a54c357eb --- /dev/null +++ b/lib/multipart/multipart.go @@ -0,0 +1,144 @@ +package multipart + +import ( + "context" + "fmt" + "io" + "sync" + "time" + + "github.com/rclone/rclone/fs" + "github.com/rclone/rclone/lib/atexit" + "github.com/rclone/rclone/lib/pacer" + "github.com/rclone/rclone/lib/pool" + "golang.org/x/sync/errgroup" +) + +const ( + bufferSize = 1024 * 1024 // default size of the pages used in the reader + bufferCacheSize = 64 // max number of buffers to keep in cache + bufferCacheFlushTime = 5 * time.Second // flush the cached buffers after this long +) + +// bufferPool is a global pool of buffers +var ( + bufferPool *pool.Pool + bufferPoolOnce sync.Once +) + +// get a buffer pool +func getPool() *pool.Pool { + bufferPoolOnce.Do(func() { + ci := fs.GetConfig(context.Background()) + // Initialise the buffer pool when used + bufferPool = pool.New(bufferCacheFlushTime, bufferSize, bufferCacheSize, ci.UseMmap) + }) + return bufferPool +} + +// Get a pool.RW using the multipart pool +func NewRW() *pool.RW { + return pool.NewRW(getPool()) +} + +// UploadMultipartOptions options for the generic multipart upload +type UploadMultipartOptions struct { + Open fs.OpenChunkWriter // thing to call OpenChunkWriter on + OpenOptions []fs.OpenOption // options for OpenChunkWriter + Concurrency int // number of simultaneous uploads to do + LeavePartsOnError bool // if set don't delete parts uploaded so far on error +} + +// Do a generic multipart upload from src using opt.Open as the OpenChunkWriter. +// +// in is read sequentially and chunks from it are uploaded in parallel. +// +// It returns the chunkWriter used in case the caller needs to extract any private info from it.
+func UploadMultipart(ctx context.Context, src fs.ObjectInfo, in io.Reader, opt UploadMultipartOptions) (chunkWriterOut fs.ChunkWriter, err error) { + chunkSize, chunkWriter, err := opt.Open.OpenChunkWriter(ctx, src.Remote(), src, opt.OpenOptions...) + if err != nil { + return nil, fmt.Errorf("multipart upload failed to initialise: %w", err) + } + + // make concurrency machinery + concurrency := opt.Concurrency + if concurrency < 1 { + concurrency = 1 + } + tokens := pacer.NewTokenDispenser(concurrency) + + uploadCtx, cancel := context.WithCancel(ctx) + defer atexit.OnError(&err, func() { + cancel() + if opt.LeavePartsOnError { + return + } + fs.Debugf(src, "Cancelling multipart upload") + errCancel := chunkWriter.Abort(ctx) + if errCancel != nil { + fs.Debugf(src, "Failed to cancel multipart upload: %v", errCancel) + } + })() + + var ( + g, gCtx = errgroup.WithContext(uploadCtx) + finished = false + off int64 + size = src.Size() + ) + + for partNum := int64(0); !finished; partNum++ { + // Get a block of memory from the pool and token which limits concurrency. + tokens.Get() + rw := NewRW() + + free := func() { + // return the memory and token + _ = rw.Close() // Can't return an error + tokens.Put() + } + + // Fail fast, in case an errgroup managed function returns an error + // gCtx is cancelled. There is no point in uploading all the other parts. 
+ if gCtx.Err() != nil { + free() + break + } + + // Read the chunk + var n int64 + n, err = io.CopyN(rw, in, chunkSize) + if err == io.EOF { + if n == 0 && partNum != 0 { // end if no data and if not first chunk + free() + break + } + finished = true + } else if err != nil { + free() + return nil, fmt.Errorf("multipart upload: failed to read source: %w", err) + } + + partNum := partNum + partOff := off + off += n + g.Go(func() (err error) { + defer free() + fs.Debugf(src, "multipart upload: starting chunk %d size %v offset %v/%v", partNum, fs.SizeSuffix(n), fs.SizeSuffix(partOff), fs.SizeSuffix(size)) + _, err = chunkWriter.WriteChunk(gCtx, int(partNum), rw) + return err + }) + } + + err = g.Wait() + if err != nil { + return nil, err + } + + err = chunkWriter.Close(ctx) + if err != nil { + return nil, fmt.Errorf("multipart upload: failed to finalise: %w", err) + } + + return chunkWriter, nil +}