diff --git a/backend/s3/s3.go b/backend/s3/s3.go
index d68b340ba..351285d31 100644
--- a/backend/s3/s3.go
+++ b/backend/s3/s3.go
@@ -5316,10 +5316,15 @@ func (o *Object) Open(ctx context.Context, options ...fs.OpenOption) (in io.Read
 
 var warnStreamUpload sync.Once
 
+// OpenChunkWriter returns the chunk size and a ChunkWriter
+//
+// Pass in the remote and the src object
+// You can also use options to hint at the desired chunk size
 func (f *Fs) OpenChunkWriter(ctx context.Context, remote string, src fs.ObjectInfo, options ...fs.OpenOption) (chunkSizeResult int64, writer fs.ChunkWriter, err error) {
-	// This duplicates part of the logic in Update,
-	//however per my understanding it is required until we migrate the MultiPartUpload to OpenChunkWriter/multi-thread op completely
+	// This duplicates part of the logic in Update, however it is
+	// required until we migrate the MultiPartUpload to
+	// OpenChunkWriter/multi-thread op completely.
 	// Temporary Object under construction
 	o := &Object{
 		fs:     f,
@@ -5327,7 +5332,7 @@ func (f *Fs) OpenChunkWriter(ctx context.Context, remote string, src fs.ObjectIn
 	}
 	req, _, err := o.buildS3Req(ctx, src, options)
 	if err != nil {
-		return -1, nil, fmt.Errorf("failed to build s3 request: %v", err)
+		return -1, nil, fmt.Errorf("failed to build s3 request: %w", err)
 	}
 
 	//structs.SetFrom(&mReq, req)
@@ -5357,15 +5362,19 @@ func (f *Fs) OpenChunkWriter(ctx context.Context, remote string, src fs.ObjectIn
 		chunkSize = chunksize.Calculator(src, size, uploadParts, chunkSize)
 	}
 
-	mOut, err := f.c.CreateMultipartUploadWithContext(ctx, &mReq)
+	var mOut *s3.CreateMultipartUploadOutput
+	err = f.pacer.Call(func() (bool, error) {
+		mOut, err = f.c.CreateMultipartUploadWithContext(ctx, &mReq)
+		return f.shouldRetry(ctx, err)
+	})
 	if err != nil {
-		return -1, nil, fmt.Errorf("CreateMultipartUpload failed: %w", err)
+		return -1, nil, fmt.Errorf("create multipart upload failed: %w", err)
 	}
 
 	chunkWriter := &s3ChunkWriter{
 		ctx:       ctx,
 		chunkSize: int64(chunkSize),
-		size:      src.Size(),
+		size:      size,
 		f:         f,
 		bucket:    mOut.Bucket,
 		key:       mOut.Key,
@@ -5394,23 +5403,35 @@ type s3ChunkWriter struct {
 	md5s []byte
 }
 
+// add a part number and etag to the completed parts
+func (w *s3ChunkWriter) addCompletedPart(partNum *int64, eTag *string) {
+	w.completedPartsMu.Lock()
+	defer w.completedPartsMu.Unlock()
+	w.completedParts = append(w.completedParts, &s3.CompletedPart{
+		PartNumber: partNum,
+		ETag:       eTag,
+	})
+}
+
+// addMd5 adds a binary md5 to the md5 calculated so far
+func (w *s3ChunkWriter) addMd5(md5binary *[]byte, chunkNumber int64) {
+	w.md5sMu.Lock()
+	defer w.md5sMu.Unlock()
+	start := chunkNumber * md5.Size
+	end := start + md5.Size
+	if extend := end - int64(len(w.md5s)); extend > 0 {
+		w.md5s = append(w.md5s, make([]byte, extend)...)
+	}
+	copy(w.md5s[start:end], (*md5binary)[:])
+}
+
+// WriteChunk will write chunk number with reader bytes, where chunk number >= 0
 func (w *s3ChunkWriter) WriteChunk(chunkNumber int, reader io.ReadSeeker) (int64, error) {
 	if chunkNumber < 0 {
 		err := fmt.Errorf("invalid chunk number provided: %v", chunkNumber)
 		return -1, err
 	}
 
-	addMd5 := func(md5binary *[]byte, chunkNumber int64) {
-		w.md5sMu.Lock()
-		defer w.md5sMu.Unlock()
-		start := chunkNumber * md5.Size
-		end := start + md5.Size
-		if extend := end - int64(len(w.md5s)); extend > 0 {
-			w.md5s = append(w.md5s, make([]byte, extend)...)
-		}
-		copy(w.md5s[start:end], (*md5binary)[:])
-	}
-
 	// create checksum of buffer for integrity checking
 	// currently there is no way to calculate the md5 without reading the chunk a 2nd time (1st read is in uploadMultipart)
 	// possible in AWS SDK v2 with trailers?
@@ -5420,7 +5441,7 @@ func (w *s3ChunkWriter) WriteChunk(chunkNumber int, reader io.ReadSeeker) (int64
 		return -1, err
 	}
 	md5sumBinary := m.Sum([]byte{})
-	addMd5(&md5sumBinary, int64(chunkNumber))
+	w.addMd5(&md5sumBinary, int64(chunkNumber))
 	md5sum := base64.StdEncoding.EncodeToString(md5sumBinary[:])
 
 	// reset the reader after we calculated the md5
@@ -5444,56 +5465,67 @@ func (w *s3ChunkWriter) WriteChunk(chunkNumber int, reader io.ReadSeeker) (int64
 		SSECustomerKey:    w.multiPartUploadInput.SSECustomerKey,
 		SSECustomerKeyMD5: w.multiPartUploadInput.SSECustomerKeyMD5,
 	}
-	uout, err := w.f.c.UploadPartWithContext(w.ctx, uploadPartReq)
+	var uout *s3.UploadPartOutput
+	err = w.f.pacer.Call(func() (bool, error) {
+		uout, err = w.f.c.UploadPartWithContext(w.ctx, uploadPartReq)
+		if err != nil {
+			if chunkNumber <= 8 {
+				return w.f.shouldRetry(w.ctx, err)
+			}
+			// retry all chunks once we have done the first few
+			return true, err
+		}
+		return false, nil
+	})
 	if err != nil {
-		fs.Errorf(w.f, "Failed to upload part: %v", err)
-		return -1, err
+		return -1, fmt.Errorf("failed to upload chunk %d with %v bytes: %w", chunkNumber+1, currentChunkSize, err)
 	}
 
-	addCompletedPart := func(partNum *int64, eTag *string) {
-		w.completedPartsMu.Lock()
-		defer w.completedPartsMu.Unlock()
-		w.completedParts = append(w.completedParts, &s3.CompletedPart{
-			PartNumber: partNum,
-			ETag:       uout.ETag,
-		})
-	}
-	addCompletedPart(s3PartNumber, uout.ETag)
+	w.addCompletedPart(s3PartNumber, uout.ETag)
 
 	fs.Debugf(w.f, "multipart upload wrote chunk %d with %v bytes and etag %v", chunkNumber+1, currentChunkSize, *uout.ETag)
 	return currentChunkSize, err
 }
 
+// Abort the multipart upload
 func (w *s3ChunkWriter) Abort() error {
-	_, err := w.f.c.AbortMultipartUploadWithContext(context.Background(), &s3.AbortMultipartUploadInput{
-		Bucket:       w.bucket,
-		Key:          w.key,
-		UploadId:     w.uploadId,
-		RequestPayer: w.multiPartUploadInput.RequestPayer,
+	err := w.f.pacer.Call(func() (bool, error) {
+		_, err := w.f.c.AbortMultipartUploadWithContext(context.Background(), &s3.AbortMultipartUploadInput{
+			Bucket:       w.bucket,
+			Key:          w.key,
+			UploadId:     w.uploadId,
+			RequestPayer: w.multiPartUploadInput.RequestPayer,
+		})
+		return w.f.shouldRetry(w.ctx, err)
 	})
 	if err != nil {
-		fs.Errorf(w.f, "Failed to abort multipart upload: %v", err)
+		return fmt.Errorf("failed to abort multipart upload %q: %w", *w.uploadId, err)
 	}
-	fs.Debugf(w.f, "multipart upload '%v' aborted", *w.uploadId)
+	fs.Debugf(w.f, "multipart upload %q aborted", *w.uploadId)
 	return err
 }
 
-func (w *s3ChunkWriter) Close() error {
+// Close and finalise the multipart upload
+func (w *s3ChunkWriter) Close() (err error) {
 	// sort the completed parts by part number
 	sort.Slice(w.completedParts, func(i, j int) bool {
 		return *w.completedParts[i].PartNumber < *w.completedParts[j].PartNumber
 	})
-	resp, err := w.f.c.CompleteMultipartUploadWithContext(w.ctx, &s3.CompleteMultipartUploadInput{
-		Bucket: w.bucket,
-		Key:    w.key,
-		MultipartUpload: &s3.CompletedMultipartUpload{
-			Parts: w.completedParts,
-		},
-		RequestPayer: w.multiPartUploadInput.RequestPayer,
-		UploadId:     w.uploadId,
+	var resp *s3.CompleteMultipartUploadOutput
+	err = w.f.pacer.Call(func() (bool, error) {
+		resp, err = w.f.c.CompleteMultipartUploadWithContext(w.ctx, &s3.CompleteMultipartUploadInput{
+			Bucket: w.bucket,
+			Key:    w.key,
+			MultipartUpload: &s3.CompletedMultipartUpload{
+				Parts: w.completedParts,
+			},
+			RequestPayer: w.multiPartUploadInput.RequestPayer,
+			UploadId:     w.uploadId,
+		})
+		return w.f.shouldRetry(w.ctx, err)
	})
 	if err != nil {
-		fs.Errorf(w.f, "Failed to complete multipart upload: %v", err)
+		return fmt.Errorf("failed to complete multipart upload %q: %w", *w.uploadId, err)
 	}
 	if resp != nil {
 		if resp.ETag != nil {
@@ -5503,7 +5535,7 @@ func (w *s3ChunkWriter) Close() error {
 			w.versionID = *resp.VersionId
 		}
 	}
-	fs.Debugf(w.f, "multipart upload '%v' closed", *w.uploadId)
+	fs.Debugf(w.f, "multipart upload %q closed", *w.uploadId)
 	return err
 }
 
@@ -5516,15 +5548,8 @@ func (o *Object) uploadMultipart(ctx context.Context, src fs.ObjectInfo, in io.R
 		concurrency = 1
 	}
 	tokens := pacer.NewTokenDispenser(concurrency)
-	openChunkWriter := f.Features().OpenChunkWriter
-	var chunkWriter fs.ChunkWriter
-	var chunkSize int64
-	err = f.pacer.Call(func() (bool, error) {
-		var err error
-		chunkSize, chunkWriter, err = openChunkWriter(ctx, src.Remote(), src)
-		return f.shouldRetry(ctx, err)
-	})
+	chunkSize, chunkWriter, err := f.OpenChunkWriter(ctx, src.Remote(), src)
 	if err != nil {
 		return wantETag, gotETag, nil, fmt.Errorf("multipart upload failed to initialise: %w", err)
 	}
 
@@ -5536,10 +5561,7 @@ func (o *Object) uploadMultipart(ctx context.Context, src fs.ObjectInfo, in io.R
 			return
 		}
 		fs.Debugf(o, "Cancelling multipart upload")
-		errCancel := f.pacer.Call(func() (bool, error) {
-			err := chunkWriter.Abort()
-			return f.shouldRetry(ctx, err)
-		})
+		errCancel := chunkWriter.Abort()
 		if errCancel != nil {
 			fs.Debugf(o, "Failed to cancel multipart upload: %v", errCancel)
 		}
@@ -5589,21 +5611,8 @@ func (o *Object) uploadMultipart(ctx context.Context, src fs.ObjectInfo, in io.R
 		off += int64(n)
 		g.Go(func() (err error) {
 			defer free()
-			err = f.pacer.Call(func() (bool, error) {
-				_, err := chunkWriter.WriteChunk(int(partNum), bytes.NewReader(buf))
-				if err != nil {
-					if partNum <= int64(concurrency) {
-						return f.shouldRetry(gCtx, err)
-					}
-					// retry all chunks once have done the first batch
-					return true, err
-				}
-				return false, nil
-			})
-			if err != nil {
-				return fmt.Errorf("multipart upload failed to upload part: %w", err)
-			}
-			return nil
+			_, err = chunkWriter.WriteChunk(int(partNum), bytes.NewReader(buf))
+			return err
 		})
 	}
 	err = g.Wait()
@@ -5611,10 +5620,7 @@ func (o *Object) uploadMultipart(ctx context.Context, src fs.ObjectInfo, in io.R
 		return wantETag, gotETag, nil, err
 	}
 
-	err = f.pacer.Call(func() (bool, error) {
-		err := chunkWriter.Close()
-		return f.shouldRetry(uploadCtx, err)
-	})
+	err = chunkWriter.Close()
 	if err != nil {
 		return wantETag, gotETag, nil, fmt.Errorf("multipart upload failed to finalise: %w", err)
 	}
@@ -5945,7 +5951,7 @@ func (o *Object) Update(ctx context.Context, in io.Reader, src fs.ObjectInfo, op
 	} else {
 		req, md5sumHex, err = o.buildS3Req(ctx, src, options)
 		if err != nil {
-			return fmt.Errorf("failed to build s3 request: %v", err)
+			return fmt.Errorf("failed to build s3 request: %w", err)
 		}
 		if o.fs.opt.UsePresignedRequest {
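
Not part of the patch, but for review context: the ChunkWriter returned by OpenChunkWriter above is driven by uploadMultipart roughly as in the sketch below. uploadOneChunkAtATime is a hypothetical, simplified sequential driver assumed to live in the s3 backend package next to the code above (the real uploadMultipart additionally uses an errgroup and a token dispenser for concurrency); it only uses calls that appear in this diff: OpenChunkWriter, WriteChunk, Abort and Close.

// Hypothetical sketch (not in the patch): a minimal sequential driver for the
// ChunkWriter returned by (*Fs).OpenChunkWriter. Per-chunk retries happen
// inside WriteChunk via the pacer, so the caller only decides between
// Close on success and Abort on failure.
// Uses "bytes", "context", "fmt", "io" and the rclone fs package, all of
// which s3.go already imports.
func uploadOneChunkAtATime(ctx context.Context, f *Fs, src fs.ObjectInfo, in io.Reader) error {
	chunkSize, chunkWriter, err := f.OpenChunkWriter(ctx, src.Remote(), src)
	if err != nil {
		return fmt.Errorf("multipart upload failed to initialise: %w", err)
	}
	buf := make([]byte, chunkSize)
	for chunkNumber := 0; ; chunkNumber++ {
		// read one chunk of data from the source
		n, readErr := io.ReadFull(in, buf)
		if readErr == io.EOF {
			break // no more data to upload
		}
		if readErr != nil && readErr != io.ErrUnexpectedEOF {
			_ = chunkWriter.Abort() // drop the pending multipart upload
			return readErr
		}
		// upload this part; WriteChunk retries transient errors itself
		if _, err := chunkWriter.WriteChunk(chunkNumber, bytes.NewReader(buf[:n])); err != nil {
			_ = chunkWriter.Abort()
			return err
		}
		if readErr == io.ErrUnexpectedEOF {
			break // short read: that was the last chunk
		}
	}
	// Close completes the multipart upload on the S3 side
	return chunkWriter.Close()
}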