s3: refactor MultipartUpload to use OpenChunkWriter and ChunkWriter #7056

This commit is contained in:
Vitor Gomes 2023-07-18 20:37:31 +02:00 committed by Nick Craig-Wood
parent f36ca0cd25
commit 6dd736fbdc

View file

@ -5316,7 +5316,198 @@ func (o *Object) Open(ctx context.Context, options ...fs.OpenOption) (in io.Read
var warnStreamUpload sync.Once var warnStreamUpload sync.Once
func (o *Object) uploadMultipart(ctx context.Context, req *s3.PutObjectInput, size int64, in io.Reader) (wantETag, gotETag string, versionID *string, err error) { func (f *Fs) OpenChunkWriter(ctx context.Context, remote string, src fs.ObjectInfo, options ...fs.OpenOption) (chunkSizeResult int64, writer fs.ChunkWriter, err error) {
// This duplicates part of the logic in Update,
//however per my understanding it is required until we migrate the MultiPartUpload to OpenChunkWriter/multi-thread op completely
// Temporary Object under construction
o := &Object{
fs: f,
remote: remote,
}
req, _, err := o.buildS3Req(ctx, src, options)
if err != nil {
return -1, nil, fmt.Errorf("failed to build s3 request: %v", err)
}
//structs.SetFrom(&mReq, req)
var mReq s3.CreateMultipartUploadInput
setFrom_s3CreateMultipartUploadInput_s3PutObjectInput(&mReq, req)
uploadParts := f.opt.MaxUploadParts
if uploadParts < 1 {
uploadParts = 1
} else if uploadParts > maxUploadParts {
uploadParts = maxUploadParts
}
size := src.Size()
// calculate size of parts
chunkSize := f.opt.ChunkSize
// size can be -1 here meaning we don't know the size of the incoming file. We use ChunkSize
// buffers here (default 5 MiB). With a maximum number of parts (10,000) this will be a file of
// 48 GiB which seems like a not too unreasonable limit.
if size == -1 {
warnStreamUpload.Do(func() {
fs.Logf(f, "Streaming uploads using chunk size %v will have maximum file size of %v",
f.opt.ChunkSize, fs.SizeSuffix(int64(chunkSize)*int64(uploadParts)))
})
} else {
chunkSize = chunksize.Calculator(src, size, uploadParts, chunkSize)
}
mOut, err := f.c.CreateMultipartUploadWithContext(ctx, &mReq)
if err != nil {
return -1, nil, fmt.Errorf("CreateMultipartUpload failed: %w", err)
}
chunkWriter := &s3ChunkWriter{
ctx: ctx,
chunkSize: int64(chunkSize),
size: src.Size(),
f: f,
bucket: mOut.Bucket,
key: mOut.Key,
uploadId: mOut.UploadId,
multiPartUploadInput: &mReq,
completedParts: make([]*s3.CompletedPart, 0),
}
fs.Debugf(f, "open chunk writer: started multipart upload: %v", *mOut.UploadId)
return int64(chunkSize), chunkWriter, err
}
type s3ChunkWriter struct {
ctx context.Context
chunkSize int64
size int64
f *Fs
bucket *string
key *string
uploadId *string
multiPartUploadInput *s3.CreateMultipartUploadInput
completedPartsMu sync.Mutex
completedParts []*s3.CompletedPart
eTag string
versionID string
md5sMu sync.Mutex
md5s []byte
}
func (w *s3ChunkWriter) WriteChunk(chunkNumber int, reader io.ReadSeeker) (int64, error) {
if chunkNumber < 0 {
err := fmt.Errorf("invalid chunk number provided: %v", chunkNumber)
return -1, err
}
addMd5 := func(md5binary *[]byte, chunkNumber int64) {
w.md5sMu.Lock()
defer w.md5sMu.Unlock()
start := chunkNumber * md5.Size
end := start + md5.Size
if extend := end - int64(len(w.md5s)); extend > 0 {
w.md5s = append(w.md5s, make([]byte, extend)...)
}
copy(w.md5s[start:end], (*md5binary)[:])
}
// create checksum of buffer for integrity checking
// currently there is no way to calculate the md5 without reading the chunk a 2nd time (1st read is in uploadMultipart)
// possible in AWS SDK v2 with trailers?
m := md5.New()
currentChunkSize, err := io.Copy(m, reader)
if err != nil && err != io.EOF {
return -1, err
}
md5sumBinary := m.Sum([]byte{})
addMd5(&md5sumBinary, int64(chunkNumber))
md5sum := base64.StdEncoding.EncodeToString(md5sumBinary[:])
// reset the reader after we calculated the md5
_, err = reader.Seek(0, io.SeekStart)
if err != nil {
return -1, err
}
// S3 requires 1 <= PartNumber <= 10000
s3PartNumber := aws.Int64(int64(chunkNumber + 1))
uploadPartReq := &s3.UploadPartInput{
Body: reader,
Bucket: w.bucket,
Key: w.key,
PartNumber: s3PartNumber,
UploadId: w.uploadId,
ContentMD5: &md5sum,
ContentLength: aws.Int64(currentChunkSize),
RequestPayer: w.multiPartUploadInput.RequestPayer,
SSECustomerAlgorithm: w.multiPartUploadInput.SSECustomerAlgorithm,
SSECustomerKey: w.multiPartUploadInput.SSECustomerKey,
SSECustomerKeyMD5: w.multiPartUploadInput.SSECustomerKeyMD5,
}
uout, err := w.f.c.UploadPartWithContext(w.ctx, uploadPartReq)
if err != nil {
fs.Errorf(w.f, "Failed to upload part: %v", err)
return -1, err
}
addCompletedPart := func(partNum *int64, eTag *string) {
w.completedPartsMu.Lock()
defer w.completedPartsMu.Unlock()
w.completedParts = append(w.completedParts, &s3.CompletedPart{
PartNumber: partNum,
ETag: uout.ETag,
})
}
addCompletedPart(s3PartNumber, uout.ETag)
fs.Debugf(w.f, "multipart upload wrote chunk %d with %v bytes and etag %v", chunkNumber+1, currentChunkSize, *uout.ETag)
return currentChunkSize, err
}
func (w *s3ChunkWriter) Abort() error {
_, err := w.f.c.AbortMultipartUploadWithContext(context.Background(), &s3.AbortMultipartUploadInput{
Bucket: w.bucket,
Key: w.key,
UploadId: w.uploadId,
RequestPayer: w.multiPartUploadInput.RequestPayer,
})
if err != nil {
fs.Errorf(w.f, "Failed to abort multipart upload: %v", err)
}
fs.Debugf(w.f, "multipart upload '%v' aborted", *w.uploadId)
return err
}
func (w *s3ChunkWriter) Close() error {
// sort the completed parts by part number
sort.Slice(w.completedParts, func(i, j int) bool {
return *w.completedParts[i].PartNumber < *w.completedParts[j].PartNumber
})
resp, err := w.f.c.CompleteMultipartUploadWithContext(w.ctx, &s3.CompleteMultipartUploadInput{
Bucket: w.bucket,
Key: w.key,
MultipartUpload: &s3.CompletedMultipartUpload{
Parts: w.completedParts,
},
RequestPayer: w.multiPartUploadInput.RequestPayer,
UploadId: w.uploadId,
})
if err != nil {
fs.Errorf(w.f, "Failed to complete multipart upload: %v", err)
}
if resp != nil {
if resp.ETag != nil {
w.eTag = *resp.ETag
}
if resp.VersionId != nil {
w.versionID = *resp.VersionId
}
}
fs.Debugf(w.f, "multipart upload '%v' closed", *w.uploadId)
return err
}
func (o *Object) uploadMultipart(ctx context.Context, src fs.ObjectInfo, in io.Reader) (wantETag, gotETag string, versionID *string, err error) {
f := o.fs f := o.fs
// make concurrency machinery // make concurrency machinery
@ -5325,45 +5516,19 @@ func (o *Object) uploadMultipart(ctx context.Context, req *s3.PutObjectInput, si
concurrency = 1 concurrency = 1
} }
tokens := pacer.NewTokenDispenser(concurrency) tokens := pacer.NewTokenDispenser(concurrency)
openChunkWriter := f.Features().OpenChunkWriter
uploadParts := f.opt.MaxUploadParts var chunkWriter fs.ChunkWriter
if uploadParts < 1 { var chunkSize int64
uploadParts = 1
} else if uploadParts > maxUploadParts {
uploadParts = maxUploadParts
}
// calculate size of parts
partSize := f.opt.ChunkSize
// size can be -1 here meaning we don't know the size of the incoming file. We use ChunkSize
// buffers here (default 5 MiB). With a maximum number of parts (10,000) this will be a file of
// 48 GiB which seems like a not too unreasonable limit.
if size == -1 {
warnStreamUpload.Do(func() {
fs.Logf(f, "Streaming uploads using chunk size %v will have maximum file size of %v",
f.opt.ChunkSize, fs.SizeSuffix(int64(partSize)*int64(uploadParts)))
})
} else {
partSize = chunksize.Calculator(o, size, uploadParts, f.opt.ChunkSize)
}
memPool := f.getMemoryPool(int64(partSize))
var mReq s3.CreateMultipartUploadInput
//structs.SetFrom(&mReq, req)
setFrom_s3CreateMultipartUploadInput_s3PutObjectInput(&mReq, req)
var cout *s3.CreateMultipartUploadOutput
err = f.pacer.Call(func() (bool, error) { err = f.pacer.Call(func() (bool, error) {
var err error var err error
cout, err = f.c.CreateMultipartUploadWithContext(ctx, &mReq) chunkSize, chunkWriter, err = openChunkWriter(ctx, src.Remote(), src)
return f.shouldRetry(ctx, err) return f.shouldRetry(ctx, err)
}) })
if err != nil { if err != nil {
return wantETag, gotETag, nil, fmt.Errorf("multipart upload failed to initialise: %w", err) return wantETag, gotETag, nil, fmt.Errorf("multipart upload failed to initialise: %w", err)
} }
uid := cout.UploadId memPool := f.getMemoryPool(chunkSize)
uploadCtx, cancel := context.WithCancel(ctx) uploadCtx, cancel := context.WithCancel(ctx)
defer atexit.OnError(&err, func() { defer atexit.OnError(&err, func() {
cancel() cancel()
@ -5372,12 +5537,7 @@ func (o *Object) uploadMultipart(ctx context.Context, req *s3.PutObjectInput, si
} }
fs.Debugf(o, "Cancelling multipart upload") fs.Debugf(o, "Cancelling multipart upload")
errCancel := f.pacer.Call(func() (bool, error) { errCancel := f.pacer.Call(func() (bool, error) {
_, err := f.c.AbortMultipartUploadWithContext(context.Background(), &s3.AbortMultipartUploadInput{ err := chunkWriter.Abort()
Bucket: req.Bucket,
Key: req.Key,
UploadId: uid,
RequestPayer: req.RequestPayer,
})
return f.shouldRetry(ctx, err) return f.shouldRetry(ctx, err)
}) })
if errCancel != nil { if errCancel != nil {
@ -5388,25 +5548,10 @@ func (o *Object) uploadMultipart(ctx context.Context, req *s3.PutObjectInput, si
var ( var (
g, gCtx = errgroup.WithContext(uploadCtx) g, gCtx = errgroup.WithContext(uploadCtx)
finished = false finished = false
partsMu sync.Mutex // to protect parts
parts []*s3.CompletedPart
off int64 off int64
md5sMu sync.Mutex
md5s []byte
) )
addMd5 := func(md5binary *[md5.Size]byte, partNum int64) { for partNum := int64(0); !finished; partNum++ {
md5sMu.Lock()
defer md5sMu.Unlock()
start := partNum * md5.Size
end := start + md5.Size
if extend := end - int64(len(md5s)); extend > 0 {
md5s = append(md5s, make([]byte, extend)...)
}
copy(md5s[start:end], (*md5binary)[:])
}
for partNum := int64(1); !finished; partNum++ {
// Get a block of memory from the pool and token which limits concurrency. // Get a block of memory from the pool and token which limits concurrency.
tokens.Get() tokens.Get()
buf := memPool.Get() buf := memPool.Get()
@ -5428,7 +5573,7 @@ func (o *Object) uploadMultipart(ctx context.Context, req *s3.PutObjectInput, si
var n int var n int
n, err = readers.ReadFill(in, buf) // this can never return 0, nil n, err = readers.ReadFill(in, buf) // this can never return 0, nil
if err == io.EOF { if err == io.EOF {
if n == 0 && partNum != 1 { // end if no data and if not first chunk if n == 0 && partNum != 0 { // end if no data and if not first chunk
free() free()
break break
} }
@ -5440,32 +5585,12 @@ func (o *Object) uploadMultipart(ctx context.Context, req *s3.PutObjectInput, si
buf = buf[:n] buf = buf[:n]
partNum := partNum partNum := partNum
fs.Debugf(o, "multipart upload starting chunk %d size %v offset %v/%v", partNum, fs.SizeSuffix(n), fs.SizeSuffix(off), fs.SizeSuffix(size)) fs.Debugf(o, "multipart upload starting chunk %d size %v offset %v/%v", partNum, fs.SizeSuffix(n), fs.SizeSuffix(off), fs.SizeSuffix(src.Size()))
off += int64(n) off += int64(n)
g.Go(func() (err error) { g.Go(func() (err error) {
defer free() defer free()
partLength := int64(len(buf))
// create checksum of buffer for integrity checking
md5sumBinary := md5.Sum(buf)
addMd5(&md5sumBinary, partNum-1)
md5sum := base64.StdEncoding.EncodeToString(md5sumBinary[:])
err = f.pacer.Call(func() (bool, error) { err = f.pacer.Call(func() (bool, error) {
uploadPartReq := &s3.UploadPartInput{ _, err := chunkWriter.WriteChunk(int(partNum), bytes.NewReader(buf))
Body: bytes.NewReader(buf),
Bucket: req.Bucket,
Key: req.Key,
PartNumber: &partNum,
UploadId: uid,
ContentMD5: &md5sum,
ContentLength: &partLength,
RequestPayer: req.RequestPayer,
SSECustomerAlgorithm: req.SSECustomerAlgorithm,
SSECustomerKey: req.SSECustomerKey,
SSECustomerKeyMD5: req.SSECustomerKeyMD5,
}
uout, err := f.c.UploadPartWithContext(gCtx, uploadPartReq)
if err != nil { if err != nil {
if partNum <= int64(concurrency) { if partNum <= int64(concurrency) {
return f.shouldRetry(gCtx, err) return f.shouldRetry(gCtx, err)
@ -5473,13 +5598,6 @@ func (o *Object) uploadMultipart(ctx context.Context, req *s3.PutObjectInput, si
// retry all chunks once have done the first batch // retry all chunks once have done the first batch
return true, err return true, err
} }
partsMu.Lock()
parts = append(parts, &s3.CompletedPart{
PartNumber: &partNum,
ETag: uout.ETag,
})
partsMu.Unlock()
return false, nil return false, nil
}) })
if err != nil { if err != nil {
@ -5493,35 +5611,21 @@ func (o *Object) uploadMultipart(ctx context.Context, req *s3.PutObjectInput, si
return wantETag, gotETag, nil, err return wantETag, gotETag, nil, err
} }
// sort the completed parts by part number
sort.Slice(parts, func(i, j int) bool {
return *parts[i].PartNumber < *parts[j].PartNumber
})
var resp *s3.CompleteMultipartUploadOutput
err = f.pacer.Call(func() (bool, error) { err = f.pacer.Call(func() (bool, error) {
resp, err = f.c.CompleteMultipartUploadWithContext(uploadCtx, &s3.CompleteMultipartUploadInput{ err := chunkWriter.Close()
Bucket: req.Bucket,
Key: req.Key,
MultipartUpload: &s3.CompletedMultipartUpload{
Parts: parts,
},
RequestPayer: req.RequestPayer,
UploadId: uid,
})
return f.shouldRetry(uploadCtx, err) return f.shouldRetry(uploadCtx, err)
}) })
if err != nil { if err != nil {
return wantETag, gotETag, nil, fmt.Errorf("multipart upload failed to finalise: %w", err) return wantETag, gotETag, nil, fmt.Errorf("multipart upload failed to finalise: %w", err)
} }
hashOfHashes := md5.Sum(md5s)
wantETag = fmt.Sprintf("%s-%d", hex.EncodeToString(hashOfHashes[:]), len(parts)) var s3cw *s3ChunkWriter = chunkWriter.(*s3ChunkWriter)
if resp != nil { gotETag = s3cw.eTag
if resp.ETag != nil { versionID = aws.String(s3cw.versionID)
gotETag = *resp.ETag
} hashOfHashes := md5.Sum(s3cw.md5s)
versionID = resp.VersionId wantETag = fmt.Sprintf("%s-%d", hex.EncodeToString(hashOfHashes[:]), len(s3cw.completedParts))
}
return wantETag, gotETag, versionID, nil return wantETag, gotETag, versionID, nil
} }
@ -5652,25 +5756,18 @@ func (o *Object) uploadSinglepartPresignedRequest(ctx context.Context, req *s3.P
return etag, lastModified, versionID, nil return etag, lastModified, versionID, nil
} }
// Update the Object from in with modTime and size func (o *Object) buildS3Req(ctx context.Context, src fs.ObjectInfo, options []fs.OpenOption) (req *s3.PutObjectInput, md5sumHex string, err error) {
func (o *Object) Update(ctx context.Context, in io.Reader, src fs.ObjectInfo, options ...fs.OpenOption) error {
if o.fs.opt.VersionAt.IsSet() {
return errNotWithVersionAt
}
bucket, bucketPath := o.split() bucket, bucketPath := o.split()
// Create parent dir/bucket if not saving directory marker // Create parent dir/bucket if not saving directory marker
if !strings.HasSuffix(o.remote, "/") { if !strings.HasSuffix(o.remote, "/") {
err := o.fs.mkdirParent(ctx, o.remote) err := o.fs.mkdirParent(ctx, o.remote)
if err != nil { if err != nil {
return err return nil, "", err
} }
} }
modTime := src.ModTime(ctx) modTime := src.ModTime(ctx)
size := src.Size()
multipart := size < 0 || size >= int64(o.fs.opt.UploadCutoff) req = &s3.PutObjectInput{
req := s3.PutObjectInput{
Bucket: &bucket, Bucket: &bucket,
ACL: stringPointerOrNil(o.fs.opt.ACL), ACL: stringPointerOrNil(o.fs.opt.ACL),
Key: &bucketPath, Key: &bucketPath,
@ -5679,7 +5776,7 @@ func (o *Object) Update(ctx context.Context, in io.Reader, src fs.ObjectInfo, op
// Fetch metadata if --metadata is in use // Fetch metadata if --metadata is in use
meta, err := fs.GetMetadataOptions(ctx, src, options) meta, err := fs.GetMetadataOptions(ctx, src, options)
if err != nil { if err != nil {
return fmt.Errorf("failed to read metadata from source object: %w", err) return nil, "", fmt.Errorf("failed to read metadata from source object: %w", err)
} }
req.Metadata = make(map[string]*string, len(meta)+2) req.Metadata = make(map[string]*string, len(meta)+2)
// merge metadata into request and user metadata // merge metadata into request and user metadata
@ -5731,7 +5828,8 @@ func (o *Object) Update(ctx context.Context, in io.Reader, src fs.ObjectInfo, op
// - for multipart provided checksums aren't disabled // - for multipart provided checksums aren't disabled
// - so we can add the md5sum in the metadata as metaMD5Hash // - so we can add the md5sum in the metadata as metaMD5Hash
var md5sumBase64 string var md5sumBase64 string
var md5sumHex string size := src.Size()
multipart := size < 0 || size >= int64(o.fs.opt.UploadCutoff)
if !multipart || !o.fs.opt.DisableChecksum { if !multipart || !o.fs.opt.DisableChecksum {
md5sumHex, err = src.Hash(ctx, hash.MD5) md5sumHex, err = src.Hash(ctx, hash.MD5)
if err == nil && matchMd5.MatchString(md5sumHex) { if err == nil && matchMd5.MatchString(md5sumHex) {
@ -5749,7 +5847,7 @@ func (o *Object) Update(ctx context.Context, in io.Reader, src fs.ObjectInfo, op
} }
} }
// Set the content type it it isn't set already // Set the content type if it isn't set already
if req.ContentType == nil { if req.ContentType == nil {
req.ContentType = aws.String(fs.MimeType(ctx, src)) req.ContentType = aws.String(fs.MimeType(ctx, src))
} }
@ -5824,17 +5922,36 @@ func (o *Object) Update(ctx context.Context, in io.Reader, src fs.ObjectInfo, op
} }
} }
return req, md5sumHex, nil
}
// Update the Object from in with modTime and size
func (o *Object) Update(ctx context.Context, in io.Reader, src fs.ObjectInfo, options ...fs.OpenOption) error {
if o.fs.opt.VersionAt.IsSet() {
return errNotWithVersionAt
}
size := src.Size()
multipart := size < 0 || size >= int64(o.fs.opt.UploadCutoff)
var wantETag string // Multipart upload Etag to check var wantETag string // Multipart upload Etag to check
var gotETag string // Etag we got from the upload var gotETag string // Etag we got from the upload
var lastModified time.Time // Time we got from the upload var lastModified time.Time // Time we got from the upload
var versionID *string // versionID we got from the upload var versionID *string // versionID we got from the upload
var err error
var md5sumHex string
var req *s3.PutObjectInput
if multipart { if multipart {
wantETag, gotETag, versionID, err = o.uploadMultipart(ctx, &req, size, in) wantETag, gotETag, versionID, err = o.uploadMultipart(ctx, src, in)
} else { } else {
req, md5sumHex, err = o.buildS3Req(ctx, src, options)
if err != nil {
return fmt.Errorf("failed to build s3 request: %v", err)
}
if o.fs.opt.UsePresignedRequest { if o.fs.opt.UsePresignedRequest {
gotETag, lastModified, versionID, err = o.uploadSinglepartPresignedRequest(ctx, &req, size, in) gotETag, lastModified, versionID, err = o.uploadSinglepartPresignedRequest(ctx, req, size, in)
} else { } else {
gotETag, lastModified, versionID, err = o.uploadSinglepartPutObject(ctx, &req, size, in) gotETag, lastModified, versionID, err = o.uploadSinglepartPutObject(ctx, req, size, in)
} }
} }
if err != nil { if err != nil {
@ -5854,7 +5971,7 @@ func (o *Object) Update(ctx context.Context, in io.Reader, src fs.ObjectInfo, op
if o.fs.opt.NoHead && size >= 0 { if o.fs.opt.NoHead && size >= 0 {
head = new(s3.HeadObjectOutput) head = new(s3.HeadObjectOutput)
//structs.SetFrom(head, &req) //structs.SetFrom(head, &req)
setFrom_s3HeadObjectOutput_s3PutObjectInput(head, &req) setFrom_s3HeadObjectOutput_s3PutObjectInput(head, req)
head.ETag = &md5sumHex // doesn't matter quotes are missing head.ETag = &md5sumHex // doesn't matter quotes are missing
head.ContentLength = &size head.ContentLength = &size
// We get etag back from single and multipart upload so fill it in here // We get etag back from single and multipart upload so fill it in here