s3: factor generic multipart upload into lib/multipart #7056

This makes the memory controls of the s3 backend inoperative; they
are replaced with the global ones.

    --s3-memory-pool-flush-time
    --s3-memory-pool-use-mmap

By using the buffered reader this fixes excessive memory use when
uploading large files as it will share memory pages between all
readers.

Fixes #7141
This commit is contained in:
Nick Craig-Wood 2023-08-15 20:38:02 +01:00
parent 0d0bcdac31
commit 4c76fac594
2 changed files with 274 additions and 223 deletions

View file

@ -53,13 +53,12 @@ import (
"github.com/rclone/rclone/lib/atexit"
"github.com/rclone/rclone/lib/bucket"
"github.com/rclone/rclone/lib/encoder"
"github.com/rclone/rclone/lib/multipart"
"github.com/rclone/rclone/lib/pacer"
"github.com/rclone/rclone/lib/pool"
"github.com/rclone/rclone/lib/readers"
"github.com/rclone/rclone/lib/rest"
"github.com/rclone/rclone/lib/version"
"golang.org/x/net/http/httpguts"
"golang.org/x/sync/errgroup"
)
// Register with Fs
@ -2279,17 +2278,16 @@ very small even with this flag.
encoder.EncodeDot,
}, {
Name: "memory_pool_flush_time",
Default: memoryPoolFlushTime,
Default: fs.Duration(time.Minute),
Advanced: true,
Help: `How often internal memory buffer pools will be flushed.
Uploads which requires additional buffers (f.e multipart) will use memory pool for allocations.
This option controls how often unused buffers will be removed from the pool.`,
Hide: fs.OptionHideBoth,
Help: `How often internal memory buffer pools will be flushed. (no longer used)`,
}, {
Name: "memory_pool_use_mmap",
Default: memoryPoolUseMmap,
Default: false,
Advanced: true,
Help: `Whether to use mmap buffers in internal memory pool.`,
Hide: fs.OptionHideBoth,
Help: `Whether to use mmap buffers in internal memory pool. (no longer used)`,
}, {
Name: "disable_http2",
Default: false,
@ -2440,10 +2438,7 @@ const (
minChunkSize = fs.SizeSuffix(1024 * 1024 * 5)
defaultUploadCutoff = fs.SizeSuffix(200 * 1024 * 1024)
maxUploadCutoff = fs.SizeSuffix(5 * 1024 * 1024 * 1024)
minSleep = 10 * time.Millisecond // In case of error, start at 10ms sleep.
memoryPoolFlushTime = fs.Duration(time.Minute) // flush the cached buffers after this long
memoryPoolUseMmap = false
minSleep = 10 * time.Millisecond // In case of error, start at 10ms sleep.
maxExpireDuration = fs.Duration(7 * 24 * time.Hour) // max expiry is 1 week
)
@ -2543,8 +2538,6 @@ type Options struct {
NoHead bool `config:"no_head"`
NoHeadObject bool `config:"no_head_object"`
Enc encoder.MultiEncoder `config:"encoding"`
MemoryPoolFlushTime fs.Duration `config:"memory_pool_flush_time"`
MemoryPoolUseMmap bool `config:"memory_pool_use_mmap"`
DisableHTTP2 bool `config:"disable_http2"`
DownloadURL string `config:"download_url"`
DirectoryMarkers bool `config:"directory_markers"`
@ -2574,7 +2567,6 @@ type Fs struct {
pacer *fs.Pacer // To pace the API calls
srv *http.Client // a plain http client
srvRest *rest.Client // the rest connection to the server
pool *pool.Pool // memory pool
etagIsNotMD5 bool // if set ETags are not MD5s
versioningMu sync.Mutex
versioning fs.Tristate // if set bucket is using versions
@ -3176,12 +3168,6 @@ func NewFs(ctx context.Context, name, root string, m configmap.Mapper) (fs.Fs, e
cache: bucket.NewCache(),
srv: srv,
srvRest: rest.NewClient(fshttp.NewClient(ctx)),
pool: pool.New(
time.Duration(opt.MemoryPoolFlushTime),
int(opt.ChunkSize),
opt.UploadConcurrency*ci.Transfers,
opt.MemoryPoolUseMmap,
),
}
if opt.ServerSideEncryption == "aws:kms" || opt.SSECustomerAlgorithm != "" {
// From: https://docs.aws.amazon.com/AmazonS3/latest/API/RESTCommonResponseHeaders.html
@ -4376,19 +4362,6 @@ func (f *Fs) Hashes() hash.Set {
return hash.Set(hash.MD5)
}
func (f *Fs) getMemoryPool(size int64) *pool.Pool {
if size == int64(f.opt.ChunkSize) {
return f.pool
}
return pool.New(
time.Duration(f.opt.MemoryPoolFlushTime),
int(size),
f.opt.UploadConcurrency*f.ci.Transfers,
f.opt.MemoryPoolUseMmap,
)
}
// PublicLink generates a public link to the remote path (usually readable by anyone)
func (f *Fs) PublicLink(ctx context.Context, remote string, expire fs.Duration, unlink bool) (link string, err error) {
if strings.HasSuffix(remote, "/") {
@ -5316,28 +5289,43 @@ func (o *Object) Open(ctx context.Context, options ...fs.OpenOption) (in io.Read
var warnStreamUpload sync.Once
// s3ChunkWriter holds the state of one in-progress S3 multipart upload.
//
// It is created by OpenChunkWriter and is what that function returns
// as the fs.ChunkWriter.
type s3ChunkWriter struct {
chunkSize int64 // chunk size chosen by OpenChunkWriter
size int64 // size given at creation time - presumably src.Size(), TODO confirm
f *Fs // the Fs this upload belongs to
bucket *string // bucket passed to the S3 multipart calls
key *string // key passed to the S3 multipart calls
uploadId *string // upload ID returned when the multipart upload was created
multiPartUploadInput *s3.CreateMultipartUploadInput // request used to create the upload; its RequestPayer is reused on later calls
completedPartsMu sync.Mutex // locked by addCompletedPart when updating completedParts
completedParts []*s3.CompletedPart // parts uploaded so far; its length is used in the expected ETag
eTag string // NOTE(review): presumably the ETag returned on completion - confirm in Close
versionID string // version ID from the CompleteMultipartUpload response, if any
md5sMu sync.Mutex // presumably guards md5s - confirm in addMd5
md5s []byte // per-chunk MD5 sums; hashed again to make the expected multipart ETag
ui uploadInfo // upload info returned by prepareUpload
o *Object // temporary Object under construction, also used as the log subject
}
// OpenChunkWriter returns the chunk size and a ChunkWriter
//
// Pass in the remote and the src object
// You can also use options to hint at the desired chunk size
func (f *Fs) OpenChunkWriter(ctx context.Context, remote string, src fs.ObjectInfo, options ...fs.OpenOption) (chunkSizeResult int64, writer fs.ChunkWriter, err error) {
// This duplicates part of the logic in Update, however it is
// required until we migrate the MultiPartUpload to
// OpenChunkWriter/multi-thread op completely.
// Temporary Object under construction
o := &Object{
fs: f,
remote: remote,
}
req, _, err := o.buildS3Req(ctx, src, options)
ui, err := o.prepareUpload(ctx, src, options)
if err != nil {
return -1, nil, fmt.Errorf("failed to build s3 request: %w", err)
return -1, nil, fmt.Errorf("failed to prepare upload: %w", err)
}
//structs.SetFrom(&mReq, req)
var mReq s3.CreateMultipartUploadInput
setFrom_s3CreateMultipartUploadInput_s3PutObjectInput(&mReq, req)
setFrom_s3CreateMultipartUploadInput_s3PutObjectInput(&mReq, ui.req)
uploadParts := f.opt.MaxUploadParts
if uploadParts < 1 {
@ -5372,7 +5360,6 @@ func (f *Fs) OpenChunkWriter(ctx context.Context, remote string, src fs.ObjectIn
}
chunkWriter := &s3ChunkWriter{
ctx: ctx,
chunkSize: int64(chunkSize),
size: size,
f: f,
@ -5381,28 +5368,13 @@ func (f *Fs) OpenChunkWriter(ctx context.Context, remote string, src fs.ObjectIn
uploadId: mOut.UploadId,
multiPartUploadInput: &mReq,
completedParts: make([]*s3.CompletedPart, 0),
ui: ui,
o: o,
}
fs.Debugf(f, "open chunk writer: started multipart upload: %v", *mOut.UploadId)
fs.Debugf(o, "open chunk writer: started multipart upload: %v", *mOut.UploadId)
return int64(chunkSize), chunkWriter, err
}
type s3ChunkWriter struct {
ctx context.Context
chunkSize int64
size int64
f *Fs
bucket *string
key *string
uploadId *string
multiPartUploadInput *s3.CreateMultipartUploadInput
completedPartsMu sync.Mutex
completedParts []*s3.CompletedPart
eTag string
versionID string
md5sMu sync.Mutex
md5s []byte
}
// add a part number and etag to the completed parts
func (w *s3ChunkWriter) addCompletedPart(partNum *int64, eTag *string) {
w.completedPartsMu.Lock()
@ -5437,19 +5409,17 @@ func (w *s3ChunkWriter) WriteChunk(ctx context.Context, chunkNumber int, reader
// possible in AWS SDK v2 with trailers?
m := md5.New()
currentChunkSize, err := io.Copy(m, reader)
if err != nil && err != io.EOF {
if err != nil {
return -1, err
}
// If no data read, don't write the chunk
if currentChunkSize == 0 {
return 0, nil
}
md5sumBinary := m.Sum([]byte{})
w.addMd5(&md5sumBinary, int64(chunkNumber))
md5sum := base64.StdEncoding.EncodeToString(md5sumBinary[:])
// reset the reader after we calculated the md5
_, err = reader.Seek(0, io.SeekStart)
if err != nil {
return -1, err
}
// S3 requires 1 <= PartNumber <= 10000
s3PartNumber := aws.Int64(int64(chunkNumber + 1))
uploadPartReq := &s3.UploadPartInput{
@ -5467,10 +5437,15 @@ func (w *s3ChunkWriter) WriteChunk(ctx context.Context, chunkNumber int, reader
}
var uout *s3.UploadPartOutput
err = w.f.pacer.Call(func() (bool, error) {
uout, err = w.f.c.UploadPartWithContext(w.ctx, uploadPartReq)
// rewind the reader on retry and after reading md5
_, err = reader.Seek(0, io.SeekStart)
if err != nil {
return false, err
}
uout, err = w.f.c.UploadPartWithContext(ctx, uploadPartReq)
if err != nil {
if chunkNumber <= 8 {
return w.f.shouldRetry(w.ctx, err)
return w.f.shouldRetry(ctx, err)
}
// retry all chunks once have done the first few
return true, err
@ -5483,7 +5458,7 @@ func (w *s3ChunkWriter) WriteChunk(ctx context.Context, chunkNumber int, reader
w.addCompletedPart(s3PartNumber, uout.ETag)
fs.Debugf(w.f, "multipart upload wrote chunk %d with %v bytes and etag %v", chunkNumber+1, currentChunkSize, *uout.ETag)
fs.Debugf(w.o, "multipart upload wrote chunk %d with %v bytes and etag %v", chunkNumber+1, currentChunkSize, *uout.ETag)
return currentChunkSize, err
}
@ -5496,12 +5471,12 @@ func (w *s3ChunkWriter) Abort(ctx context.Context) error {
UploadId: w.uploadId,
RequestPayer: w.multiPartUploadInput.RequestPayer,
})
return w.f.shouldRetry(w.ctx, err)
return w.f.shouldRetry(ctx, err)
})
if err != nil {
return fmt.Errorf("failed to abort multipart upload %q: %w", *w.uploadId, err)
}
fs.Debugf(w.f, "multipart upload %q aborted", *w.uploadId)
fs.Debugf(w.o, "multipart upload %q aborted", *w.uploadId)
return err
}
@ -5513,7 +5488,7 @@ func (w *s3ChunkWriter) Close(ctx context.Context) (err error) {
})
var resp *s3.CompleteMultipartUploadOutput
err = w.f.pacer.Call(func() (bool, error) {
resp, err = w.f.c.CompleteMultipartUploadWithContext(w.ctx, &s3.CompleteMultipartUploadInput{
resp, err = w.f.c.CompleteMultipartUploadWithContext(ctx, &s3.CompleteMultipartUploadInput{
Bucket: w.bucket,
Key: w.key,
MultipartUpload: &s3.CompletedMultipartUpload{
@ -5522,7 +5497,7 @@ func (w *s3ChunkWriter) Close(ctx context.Context) (err error) {
RequestPayer: w.multiPartUploadInput.RequestPayer,
UploadId: w.uploadId,
})
return w.f.shouldRetry(w.ctx, err)
return w.f.shouldRetry(ctx, err)
})
if err != nil {
return fmt.Errorf("failed to complete multipart upload %q: %w", *w.uploadId, err)
@ -5535,94 +5510,19 @@ func (w *s3ChunkWriter) Close(ctx context.Context) (err error) {
w.versionID = *resp.VersionId
}
}
fs.Debugf(w.f, "multipart upload %q closed", *w.uploadId)
fs.Debugf(w.o, "multipart upload %q finished", *w.uploadId)
return err
}
func (o *Object) uploadMultipart(ctx context.Context, src fs.ObjectInfo, in io.Reader) (wantETag, gotETag string, versionID *string, err error) {
f := o.fs
// make concurrency machinery
concurrency := f.opt.UploadConcurrency
if concurrency < 1 {
concurrency = 1
}
tokens := pacer.NewTokenDispenser(concurrency)
chunkSize, chunkWriter, err := f.OpenChunkWriter(ctx, src.Remote(), src)
func (o *Object) uploadMultipart(ctx context.Context, src fs.ObjectInfo, in io.Reader, options ...fs.OpenOption) (wantETag, gotETag string, versionID *string, ui uploadInfo, err error) {
chunkWriter, err := multipart.UploadMultipart(ctx, src, in, multipart.UploadMultipartOptions{
Open: o.fs,
Concurrency: o.fs.opt.UploadConcurrency,
LeavePartsOnError: o.fs.opt.LeavePartsOnError,
OpenOptions: options,
})
if err != nil {
return wantETag, gotETag, nil, fmt.Errorf("multipart upload failed to initialise: %w", err)
}
memPool := f.getMemoryPool(chunkSize)
uploadCtx, cancel := context.WithCancel(ctx)
defer atexit.OnError(&err, func() {
cancel()
if o.fs.opt.LeavePartsOnError {
return
}
fs.Debugf(o, "Cancelling multipart upload")
errCancel := chunkWriter.Abort(ctx)
if errCancel != nil {
fs.Debugf(o, "Failed to cancel multipart upload: %v", errCancel)
}
})()
var (
g, gCtx = errgroup.WithContext(uploadCtx)
finished = false
off int64
)
for partNum := int64(0); !finished; partNum++ {
// Get a block of memory from the pool and token which limits concurrency.
tokens.Get()
buf := memPool.Get()
free := func() {
// return the memory and token
memPool.Put(buf)
tokens.Put()
}
// Fail fast, in case an errgroup managed function returns an error
// gCtx is cancelled. There is no point in uploading all the other parts.
if gCtx.Err() != nil {
free()
break
}
// Read the chunk
var n int
n, err = readers.ReadFill(in, buf) // this can never return 0, nil
if err == io.EOF {
if n == 0 && partNum != 0 { // end if no data and if not first chunk
free()
break
}
finished = true
} else if err != nil {
free()
return wantETag, gotETag, nil, fmt.Errorf("multipart upload failed to read source: %w", err)
}
buf = buf[:n]
partNum := partNum
fs.Debugf(o, "multipart upload starting chunk %d size %v offset %v/%v", partNum, fs.SizeSuffix(n), fs.SizeSuffix(off), fs.SizeSuffix(src.Size()))
off += int64(n)
g.Go(func() (err error) {
defer free()
_, err = chunkWriter.WriteChunk(gCtx, int(partNum), bytes.NewReader(buf))
return err
})
}
err = g.Wait()
if err != nil {
return wantETag, gotETag, nil, err
}
err = chunkWriter.Close(ctx)
if err != nil {
return wantETag, gotETag, nil, fmt.Errorf("multipart upload failed to finalise: %w", err)
return wantETag, gotETag, versionID, ui, err
}
var s3cw *s3ChunkWriter = chunkWriter.(*s3ChunkWriter)
@ -5632,7 +5532,7 @@ func (o *Object) uploadMultipart(ctx context.Context, src fs.ObjectInfo, in io.R
hashOfHashes := md5.Sum(s3cw.md5s)
wantETag = fmt.Sprintf("%s-%d", hex.EncodeToString(hashOfHashes[:]), len(s3cw.completedParts))
return wantETag, gotETag, versionID, nil
return wantETag, gotETag, versionID, s3cw.ui, nil
}
// unWrapAwsError unwraps AWS errors, looking for a non AWS error
@ -5762,18 +5662,25 @@ func (o *Object) uploadSinglepartPresignedRequest(ctx context.Context, req *s3.P
return etag, lastModified, versionID, nil
}
func (o *Object) buildS3Req(ctx context.Context, src fs.ObjectInfo, options []fs.OpenOption) (req *s3.PutObjectInput, md5sumHex string, err error) {
// uploadInfo is the info needed for an upload.
//
// It is filled in by prepareUpload.
type uploadInfo struct {
req *s3.PutObjectInput // the PutObject request built for the upload
md5sumHex string // result of src.Hash(ctx, hash.MD5) when it was read - may be empty
}
// Prepare object for being uploaded
func (o *Object) prepareUpload(ctx context.Context, src fs.ObjectInfo, options []fs.OpenOption) (ui uploadInfo, err error) {
bucket, bucketPath := o.split()
// Create parent dir/bucket if not saving directory marker
if !strings.HasSuffix(o.remote, "/") {
err := o.fs.mkdirParent(ctx, o.remote)
if err != nil {
return nil, "", err
return ui, err
}
}
modTime := src.ModTime(ctx)
req = &s3.PutObjectInput{
ui.req = &s3.PutObjectInput{
Bucket: &bucket,
ACL: stringPointerOrNil(o.fs.opt.ACL),
Key: &bucketPath,
@ -5782,30 +5689,30 @@ func (o *Object) buildS3Req(ctx context.Context, src fs.ObjectInfo, options []fs
// Fetch metadata if --metadata is in use
meta, err := fs.GetMetadataOptions(ctx, src, options)
if err != nil {
return nil, "", fmt.Errorf("failed to read metadata from source object: %w", err)
return ui, fmt.Errorf("failed to read metadata from source object: %w", err)
}
req.Metadata = make(map[string]*string, len(meta)+2)
ui.req.Metadata = make(map[string]*string, len(meta)+2)
// merge metadata into request and user metadata
for k, v := range meta {
pv := aws.String(v)
k = strings.ToLower(k)
if o.fs.opt.NoSystemMetadata {
req.Metadata[k] = pv
ui.req.Metadata[k] = pv
continue
}
switch k {
case "cache-control":
req.CacheControl = pv
ui.req.CacheControl = pv
case "content-disposition":
req.ContentDisposition = pv
ui.req.ContentDisposition = pv
case "content-encoding":
req.ContentEncoding = pv
ui.req.ContentEncoding = pv
case "content-language":
req.ContentLanguage = pv
ui.req.ContentLanguage = pv
case "content-type":
req.ContentType = pv
ui.req.ContentType = pv
case "x-amz-tagging":
req.Tagging = pv
ui.req.Tagging = pv
case "tier":
// ignore
case "mtime":
@ -5818,14 +5725,14 @@ func (o *Object) buildS3Req(ctx context.Context, src fs.ObjectInfo, options []fs
}
case "btime":
// write as metadata since we can't set it
req.Metadata[k] = pv
ui.req.Metadata[k] = pv
default:
req.Metadata[k] = pv
ui.req.Metadata[k] = pv
}
}
// Set the mtime in the meta data
req.Metadata[metaMtime] = aws.String(swift.TimeToFloatString(modTime))
ui.req.Metadata[metaMtime] = aws.String(swift.TimeToFloatString(modTime))
// read the md5sum if available
// - for non multipart
@ -5837,9 +5744,9 @@ func (o *Object) buildS3Req(ctx context.Context, src fs.ObjectInfo, options []fs
size := src.Size()
multipart := size < 0 || size >= int64(o.fs.opt.UploadCutoff)
if !multipart || !o.fs.opt.DisableChecksum {
md5sumHex, err = src.Hash(ctx, hash.MD5)
if err == nil && matchMd5.MatchString(md5sumHex) {
hashBytes, err := hex.DecodeString(md5sumHex)
ui.md5sumHex, err = src.Hash(ctx, hash.MD5)
if err == nil && matchMd5.MatchString(ui.md5sumHex) {
hashBytes, err := hex.DecodeString(ui.md5sumHex)
if err == nil {
md5sumBase64 = base64.StdEncoding.EncodeToString(hashBytes)
if (multipart || o.fs.etagIsNotMD5) && !o.fs.opt.DisableChecksum {
@ -5847,42 +5754,42 @@ func (o *Object) buildS3Req(ctx context.Context, src fs.ObjectInfo, options []fs
// - a multipart upload
// - the Etag is not an MD5, eg when using SSE/SSE-C
// provided checksums aren't disabled
req.Metadata[metaMD5Hash] = &md5sumBase64
ui.req.Metadata[metaMD5Hash] = &md5sumBase64
}
}
}
}
// Set the content type if it isn't set already
if req.ContentType == nil {
req.ContentType = aws.String(fs.MimeType(ctx, src))
if ui.req.ContentType == nil {
ui.req.ContentType = aws.String(fs.MimeType(ctx, src))
}
if size >= 0 {
req.ContentLength = &size
ui.req.ContentLength = &size
}
if md5sumBase64 != "" {
req.ContentMD5 = &md5sumBase64
ui.req.ContentMD5 = &md5sumBase64
}
if o.fs.opt.RequesterPays {
req.RequestPayer = aws.String(s3.RequestPayerRequester)
ui.req.RequestPayer = aws.String(s3.RequestPayerRequester)
}
if o.fs.opt.ServerSideEncryption != "" {
req.ServerSideEncryption = &o.fs.opt.ServerSideEncryption
ui.req.ServerSideEncryption = &o.fs.opt.ServerSideEncryption
}
if o.fs.opt.SSECustomerAlgorithm != "" {
req.SSECustomerAlgorithm = &o.fs.opt.SSECustomerAlgorithm
ui.req.SSECustomerAlgorithm = &o.fs.opt.SSECustomerAlgorithm
}
if o.fs.opt.SSECustomerKey != "" {
req.SSECustomerKey = &o.fs.opt.SSECustomerKey
ui.req.SSECustomerKey = &o.fs.opt.SSECustomerKey
}
if o.fs.opt.SSECustomerKeyMD5 != "" {
req.SSECustomerKeyMD5 = &o.fs.opt.SSECustomerKeyMD5
ui.req.SSECustomerKeyMD5 = &o.fs.opt.SSECustomerKeyMD5
}
if o.fs.opt.SSEKMSKeyID != "" {
req.SSEKMSKeyId = &o.fs.opt.SSEKMSKeyID
ui.req.SSEKMSKeyId = &o.fs.opt.SSEKMSKeyID
}
if o.fs.opt.StorageClass != "" {
req.StorageClass = &o.fs.opt.StorageClass
ui.req.StorageClass = &o.fs.opt.StorageClass
}
// Apply upload options
for _, option := range options {
@ -5892,22 +5799,22 @@ func (o *Object) buildS3Req(ctx context.Context, src fs.ObjectInfo, options []fs
case "":
// ignore
case "cache-control":
req.CacheControl = aws.String(value)
ui.req.CacheControl = aws.String(value)
case "content-disposition":
req.ContentDisposition = aws.String(value)
ui.req.ContentDisposition = aws.String(value)
case "content-encoding":
req.ContentEncoding = aws.String(value)
ui.req.ContentEncoding = aws.String(value)
case "content-language":
req.ContentLanguage = aws.String(value)
ui.req.ContentLanguage = aws.String(value)
case "content-type":
req.ContentType = aws.String(value)
ui.req.ContentType = aws.String(value)
case "x-amz-tagging":
req.Tagging = aws.String(value)
ui.req.Tagging = aws.String(value)
default:
const amzMetaPrefix = "x-amz-meta-"
if strings.HasPrefix(lowerKey, amzMetaPrefix) {
metaKey := lowerKey[len(amzMetaPrefix):]
req.Metadata[metaKey] = aws.String(value)
ui.req.Metadata[metaKey] = aws.String(value)
} else {
fs.Errorf(o, "Don't know how to set key %q on upload", key)
}
@ -5915,20 +5822,20 @@ func (o *Object) buildS3Req(ctx context.Context, src fs.ObjectInfo, options []fs
}
// Check metadata keys and values are valid
for key, value := range req.Metadata {
for key, value := range ui.req.Metadata {
if !httpguts.ValidHeaderFieldName(key) {
fs.Errorf(o, "Dropping invalid metadata key %q", key)
delete(req.Metadata, key)
delete(ui.req.Metadata, key)
} else if value == nil {
fs.Errorf(o, "Dropping nil metadata value for key %q", key)
delete(req.Metadata, key)
delete(ui.req.Metadata, key)
} else if !httpguts.ValidHeaderFieldValue(*value) {
fs.Errorf(o, "Dropping invalid metadata value %q for key %q", *value, key)
delete(req.Metadata, key)
delete(ui.req.Metadata, key)
}
}
return req, md5sumHex, nil
return ui, nil
}
// Update the Object from in with modTime and size
@ -5944,20 +5851,19 @@ func (o *Object) Update(ctx context.Context, in io.Reader, src fs.ObjectInfo, op
var lastModified time.Time // Time we got from the upload
var versionID *string // versionID we got from the upload
var err error
var md5sumHex string
var req *s3.PutObjectInput
var ui uploadInfo
if multipart {
wantETag, gotETag, versionID, err = o.uploadMultipart(ctx, src, in)
wantETag, gotETag, versionID, ui, err = o.uploadMultipart(ctx, src, in)
} else {
req, md5sumHex, err = o.buildS3Req(ctx, src, options)
ui, err = o.prepareUpload(ctx, src, options)
if err != nil {
return fmt.Errorf("failed to build s3 request: %w", err)
return fmt.Errorf("failed to prepare upload: %w", err)
}
if o.fs.opt.UsePresignedRequest {
gotETag, lastModified, versionID, err = o.uploadSinglepartPresignedRequest(ctx, req, size, in)
gotETag, lastModified, versionID, err = o.uploadSinglepartPresignedRequest(ctx, ui.req, size, in)
} else {
gotETag, lastModified, versionID, err = o.uploadSinglepartPutObject(ctx, req, size, in)
gotETag, lastModified, versionID, err = o.uploadSinglepartPutObject(ctx, ui.req, size, in)
}
}
if err != nil {
@ -5977,8 +5883,8 @@ func (o *Object) Update(ctx context.Context, in io.Reader, src fs.ObjectInfo, op
if o.fs.opt.NoHead && size >= 0 {
head = new(s3.HeadObjectOutput)
//structs.SetFrom(head, &req)
setFrom_s3HeadObjectOutput_s3PutObjectInput(head, req)
head.ETag = &md5sumHex // doesn't matter quotes are missing
setFrom_s3HeadObjectOutput_s3PutObjectInput(head, ui.req)
head.ETag = &ui.md5sumHex // doesn't matter quotes are missing
head.ContentLength = &size
// We get etag back from single and multipart upload so fill it in here
if gotETag != "" {
@ -6116,16 +6022,17 @@ func (o *Object) Metadata(ctx context.Context) (metadata fs.Metadata, err error)
// Check the interfaces are satisfied
var (
_ fs.Fs = &Fs{}
_ fs.Purger = &Fs{}
_ fs.Copier = &Fs{}
_ fs.PutStreamer = &Fs{}
_ fs.ListRer = &Fs{}
_ fs.Commander = &Fs{}
_ fs.CleanUpper = &Fs{}
_ fs.Object = &Object{}
_ fs.MimeTyper = &Object{}
_ fs.GetTierer = &Object{}
_ fs.SetTierer = &Object{}
_ fs.Metadataer = &Object{}
_ fs.Fs = &Fs{}
_ fs.Purger = &Fs{}
_ fs.Copier = &Fs{}
_ fs.PutStreamer = &Fs{}
_ fs.ListRer = &Fs{}
_ fs.Commander = &Fs{}
_ fs.CleanUpper = &Fs{}
_ fs.OpenChunkWriter = &Fs{}
_ fs.Object = &Object{}
_ fs.MimeTyper = &Object{}
_ fs.GetTierer = &Object{}
_ fs.SetTierer = &Object{}
_ fs.Metadataer = &Object{}
)

144
lib/multipart/multipart.go Normal file
View file

@ -0,0 +1,144 @@
package multipart
import (
"context"
"fmt"
"io"
"sync"
"time"
"github.com/rclone/rclone/fs"
"github.com/rclone/rclone/lib/atexit"
"github.com/rclone/rclone/lib/pacer"
"github.com/rclone/rclone/lib/pool"
"golang.org/x/sync/errgroup"
)
// Parameters passed to pool.New when getPool creates the shared buffer pool.
const (
bufferSize = 1024 * 1024 // default size of the pages used in the reader
bufferCacheSize = 64 // max number of buffers to keep in cache
bufferCacheFlushTime = 5 * time.Second // flush the cached buffers after this long
)
// bufferPool is a global pool of buffers.
//
// It is created on first use by getPool, so it should be accessed
// through getPool rather than directly.
var (
bufferPool *pool.Pool // the shared pool, nil until getPool has been called
bufferPoolOnce sync.Once // makes sure bufferPool is only initialised once
)
// getPool returns the shared buffer pool, creating it the first time
// it is called.
func getPool() *pool.Pool {
	// Build the pool lazily so the global config is only read when
	// the pool is actually needed.
	initPool := func() {
		useMmap := fs.GetConfig(context.Background()).UseMmap
		bufferPool = pool.New(bufferCacheFlushTime, bufferSize, bufferCacheSize, useMmap)
	}
	bufferPoolOnce.Do(initPool)
	return bufferPool
}
// NewRW returns a new pool.RW which draws its buffers from the
// multipart buffer pool.
func NewRW() *pool.RW {
	shared := getPool()
	return pool.NewRW(shared)
}
// UploadMultipartOptions holds the options for the generic multipart
// upload done by UploadMultipart.
type UploadMultipartOptions struct {
Open fs.OpenChunkWriter // thing to call OpenChunkWriter on to start the upload
OpenOptions []fs.OpenOption // options passed through to OpenChunkWriter
Concurrency int // number of simultaneous uploads to do; values below 1 are treated as 1
LeavePartsOnError bool // if set don't delete parts uploaded so far on error (Abort is not called)
}
// UploadMultipart does a generic multipart upload from src using
// opt.Open as the OpenChunkWriter.
//
// in is read sequentially and chunks from it are uploaded in parallel,
// with at most opt.Concurrency (minimum 1) chunks in flight at once.
//
// A cleanup is registered with atexit.OnError which cancels the
// in-flight uploads and, unless opt.LeavePartsOnError is set, calls
// Abort on the chunk writer.
//
// It returns the chunkWriter used in case the caller needs to extract
// any private info from it. On error the returned chunkWriter is nil.
func UploadMultipart(ctx context.Context, src fs.ObjectInfo, in io.Reader, opt UploadMultipartOptions) (chunkWriterOut fs.ChunkWriter, err error) {
	chunkSize, chunkWriter, err := opt.Open.OpenChunkWriter(ctx, src.Remote(), src, opt.OpenOptions...)
	if err != nil {
		return nil, fmt.Errorf("multipart upload failed to initialise: %w", err)
	}

	// make concurrency machinery
	concurrency := opt.Concurrency
	if concurrency < 1 {
		concurrency = 1
	}
	tokens := pacer.NewTokenDispenser(concurrency)

	uploadCtx, cancel := context.WithCancel(ctx)
	// Always release uploadCtx when we return, otherwise it stays
	// registered with the parent context after a successful upload.
	// Calling cancel more than once is harmless.
	defer cancel()
	defer atexit.OnError(&err, func() {
		cancel()
		if opt.LeavePartsOnError {
			return
		}
		fs.Debugf(src, "Cancelling multipart upload")
		errCancel := chunkWriter.Abort(ctx)
		if errCancel != nil {
			fs.Debugf(src, "Failed to cancel multipart upload: %v", errCancel)
		}
	})()

	var (
		g, gCtx  = errgroup.WithContext(uploadCtx)
		finished = false
		off      int64
		size     = src.Size()
	)

	for partNum := int64(0); !finished; partNum++ {
		// Get a block of memory from the pool and token which limits concurrency.
		tokens.Get()
		rw := NewRW()

		free := func() {
			// return the memory and token
			_ = rw.Close() // Can't return an error
			tokens.Put()
		}

		// Fail fast, in case an errgroup managed function returns an error
		// gCtx is cancelled. There is no point in uploading all the other parts.
		if gCtx.Err() != nil {
			free()
			break
		}

		// Read the chunk
		var n int64
		n, err = io.CopyN(rw, in, chunkSize)
		if err == io.EOF {
			if n == 0 && partNum != 0 { // end if no data and if not first chunk
				free()
				break
			}
			// A short (or empty first) chunk is still uploaded, then we stop
			finished = true
		} else if err != nil {
			free()
			return nil, fmt.Errorf("multipart upload: failed to read source: %w", err)
		}

		// Take copies for the goroutine as the loop variables change
		partNum := partNum
		partOff := off
		off += n
		g.Go(func() (err error) {
			defer free()
			fs.Debugf(src, "multipart upload: starting chunk %d size %v offset %v/%v", partNum, fs.SizeSuffix(n), fs.SizeSuffix(partOff), fs.SizeSuffix(size))
			_, err = chunkWriter.WriteChunk(gCtx, int(partNum), rw)
			return err
		})
	}

	err = g.Wait()
	if err != nil {
		return nil, err
	}

	err = chunkWriter.Close(ctx)
	if err != nil {
		return nil, fmt.Errorf("multipart upload: failed to finalise: %w", err)
	}

	return chunkWriter, nil
}