s3: preserve metadata when doing multipart copy

Before this change the s3 multipart server side copy was not
preserving the metadata of the object. This was most noticeable
because the modtime was not preserved.

This change fetches the metadata from the object before starting the
copy and overwrites it if required.

It also means that any other metadata is preserved.

See: https://forum.rclone.org/t/copying-files-within-a-b2-bucket/16680/70
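
Background note: rclone keeps the modification time in the object's user
metadata (an "mtime" key holding floating point seconds since the epoch,
sent as X-Amz-Meta-Mtime), which is why dropping the metadata on a
multipart copy also dropped the modtime. A minimal sketch of decoding
such a value, assuming that representation (the key name and parsing here
are illustrative, not the exact s3 backend code):

    package main

    import (
        "fmt"
        "strconv"
        "time"
    )

    // parseMtime decodes an "mtime" metadata value such as
    // "1596102752.125" into a time.Time.
    func parseMtime(meta map[string]*string) (time.Time, error) {
        v, ok := meta["mtime"]
        if !ok || v == nil {
            return time.Time{}, fmt.Errorf("no mtime in metadata")
        }
        f, err := strconv.ParseFloat(*v, 64)
        if err != nil {
            return time.Time{}, err
        }
        sec := int64(f)
        nsec := int64((f - float64(sec)) * 1e9)
        return time.Unix(sec, nsec), nil
    }

    func main() {
        v := "1596102752.125"
        t, err := parseMtime(map[string]*string{"mtime": &v})
        fmt.Println(t, err)
    }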
Nick Craig-Wood 2020-07-30 10:52:32 +01:00
parent 70c8566cb8
commit b7dd3ce608

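The diff below leans on structs.SetFrom from rclone's lib/structs package
to copy matching fields between the AWS SDK request and response structs.
For readers unfamiliar with it, here is a toy version of that style of
reflection-based field copying (a sketch only, not the actual lib/structs
implementation):

    package main

    import (
        "fmt"
        "reflect"
    )

    // setFrom copies every exported field of b into the field of the
    // same name and type in a. a must be a pointer to a struct.
    func setFrom(a, b interface{}) {
        av := reflect.ValueOf(a).Elem()
        bv := reflect.ValueOf(b)
        if bv.Kind() == reflect.Ptr {
            bv = bv.Elem()
        }
        for i := 0; i < bv.NumField(); i++ {
            field := bv.Type().Field(i)
            dst := av.FieldByName(field.Name)
            if dst.IsValid() && dst.CanSet() && dst.Type() == field.Type {
                dst.Set(bv.Field(i))
            }
        }
    }

    // Cut-down stand-ins for s3.HeadObjectOutput and
    // s3.CreateMultipartUploadInput.
    type headOutput struct {
        ContentType *string
        Metadata    map[string]*string
    }

    type createInput struct {
        Bucket      *string
        Key         *string
        ContentType *string
        Metadata    map[string]*string
    }

    func main() {
        ct := "text/plain"
        mtime := "1596102752.125"
        head := &headOutput{
            ContentType: &ct,
            Metadata:    map[string]*string{"mtime": &mtime},
        }
        req := &createInput{}
        setFrom(req, head) // fields shared by both structs are carried across
        fmt.Println(*req.ContentType, *req.Metadata["mtime"])
    }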

@@ -1967,7 +1967,7 @@ func pathEscape(s string) string {
 //
 // It adds the boiler plate to the req passed in and calls the s3
 // method
-func (f *Fs) copy(ctx context.Context, req *s3.CopyObjectInput, dstBucket, dstPath, srcBucket, srcPath string, srcSize int64) error {
+func (f *Fs) copy(ctx context.Context, req *s3.CopyObjectInput, dstBucket, dstPath, srcBucket, srcPath string, src *Object) error {
 	req.Bucket = &dstBucket
 	req.ACL = &f.opt.ACL
 	req.Key = &dstPath
@@ -1983,8 +1983,8 @@ func (f *Fs) copy(ctx context.Context, req *s3.CopyObjectInput, dstBucket, dstPa
 		req.StorageClass = &f.opt.StorageClass
 	}
 
-	if srcSize >= int64(f.opt.CopyCutoff) {
-		return f.copyMultipart(ctx, req, dstBucket, dstPath, srcBucket, srcPath, srcSize)
+	if src.bytes >= int64(f.opt.CopyCutoff) {
+		return f.copyMultipart(ctx, req, dstBucket, dstPath, srcBucket, srcPath, src)
 	}
 	return f.pacer.Call(func() (bool, error) {
 		_, err := f.c.CopyObjectWithContext(ctx, req)
@@ -2005,14 +2005,33 @@ func calculateRange(partSize, partIndex, numParts, totalSize int64) string {
 	return fmt.Sprintf("bytes=%v-%v", start, ends)
 }
 
-func (f *Fs) copyMultipart(ctx context.Context, req *s3.CopyObjectInput, dstBucket, dstPath, srcBucket, srcPath string, srcSize int64) (err error) {
+func (f *Fs) copyMultipart(ctx context.Context, copyReq *s3.CopyObjectInput, dstBucket, dstPath, srcBucket, srcPath string, src *Object) (err error) {
+	info, err := src.headObject(ctx)
+	if err != nil {
+		return err
+	}
+
+	req := &s3.CreateMultipartUploadInput{}
+	// Fill in the request from the head info
+	structs.SetFrom(req, info)
+
+	// If copy metadata was set then set the Metadata to that read
+	// from the head request
+	if aws.StringValue(copyReq.MetadataDirective) == s3.MetadataDirectiveCopy {
+		copyReq.Metadata = info.Metadata
+	}
+
+	// Overwrite any from the copyReq
+	structs.SetFrom(req, copyReq)
+	req.Bucket = &dstBucket
+	req.Key = &dstPath
+
 	var cout *s3.CreateMultipartUploadOutput
 	if err := f.pacer.Call(func() (bool, error) {
 		var err error
-		cout, err = f.c.CreateMultipartUploadWithContext(ctx, &s3.CreateMultipartUploadInput{
-			Bucket: &dstBucket,
-			Key:    &dstPath,
-		})
+		cout, err = f.c.CreateMultipartUploadWithContext(ctx, req)
 		return f.shouldRetry(err)
 	}); err != nil {
 		return err
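
A note on the MetadataDirective handling above: CopyObject accepts a
MetadataDirective of COPY or REPLACE and applies it server side, but
CreateMultipartUpload has no equivalent, so a COPY directive has to be
emulated by feeding the source object's HEAD metadata into the create
request. The decision reduces to something like this hypothetical helper
(not part of the diff; it uses the same aws and s3 packages the file
already imports):

    // metadataFor picks the metadata for the multipart create request.
    func metadataFor(copyReq *s3.CopyObjectInput, head *s3.HeadObjectOutput) map[string]*string {
        if aws.StringValue(copyReq.MetadataDirective) == s3.MetadataDirectiveCopy {
            // COPY: reuse whatever the source object already carries.
            return head.Metadata
        }
        // REPLACE (or unset): use the metadata given on the copy request.
        return copyReq.Metadata
    }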
@@ -2021,7 +2040,7 @@ func (f *Fs) copyMultipart(ctx context.Context, req *s3.CopyObjectInput, dstBuck
 	defer atexit.OnError(&err, func() {
 		// Try to abort the upload, but ignore the error.
-		fs.Debugf(nil, "Cancelling multipart copy")
+		fs.Debugf(src, "Cancelling multipart copy")
 		_ = f.pacer.Call(func() (bool, error) {
 			_, err := f.c.AbortMultipartUploadWithContext(context.Background(), &s3.AbortMultipartUploadInput{
 				Bucket: &dstBucket,
@@ -2033,33 +2052,23 @@ func (f *Fs) copyMultipart(ctx context.Context, req *s3.CopyObjectInput, dstBuck
 		})
 	})()
 
+	srcSize := src.bytes
 	partSize := int64(f.opt.CopyCutoff)
 	numParts := (srcSize-1)/partSize + 1
 
+	fs.Debugf(src, "Starting multipart copy with %d parts", numParts)
+
 	var parts []*s3.CompletedPart
 	for partNum := int64(1); partNum <= numParts; partNum++ {
 		if err := f.pacer.Call(func() (bool, error) {
 			partNum := partNum
-			uploadPartReq := &s3.UploadPartCopyInput{
-				Bucket:          &dstBucket,
-				Key:             &dstPath,
-				PartNumber:      &partNum,
-				UploadId:        uid,
-				CopySourceRange: aws.String(calculateRange(partSize, partNum-1, numParts, srcSize)),
-				// Args copy from req
-				CopySource:                     req.CopySource,
-				CopySourceIfMatch:              req.CopySourceIfMatch,
-				CopySourceIfModifiedSince:      req.CopySourceIfModifiedSince,
-				CopySourceIfNoneMatch:          req.CopySourceIfNoneMatch,
-				CopySourceIfUnmodifiedSince:    req.CopySourceIfUnmodifiedSince,
-				CopySourceSSECustomerAlgorithm: req.CopySourceSSECustomerAlgorithm,
-				CopySourceSSECustomerKey:       req.CopySourceSSECustomerKey,
-				CopySourceSSECustomerKeyMD5:    req.CopySourceSSECustomerKeyMD5,
-				RequestPayer:                   req.RequestPayer,
-				SSECustomerAlgorithm:           req.SSECustomerAlgorithm,
-				SSECustomerKey:                 req.SSECustomerKey,
-				SSECustomerKeyMD5:              req.SSECustomerKeyMD5,
-			}
+			uploadPartReq := &s3.UploadPartCopyInput{}
+			structs.SetFrom(uploadPartReq, copyReq)
+			uploadPartReq.Bucket = &dstBucket
+			uploadPartReq.Key = &dstPath
+			uploadPartReq.PartNumber = &partNum
+			uploadPartReq.UploadId = uid
+			uploadPartReq.CopySourceRange = aws.String(calculateRange(partSize, partNum-1, numParts, srcSize))
 			uout, err := f.c.UploadPartCopyWithContext(ctx, uploadPartReq)
 			if err != nil {
 				return f.shouldRetry(err)
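
The CopySourceRange strings come from calculateRange, whose signature is
visible in an earlier hunk header but whose body is not part of this diff.
A self-contained sketch consistent with how it is called here, with a
worked example (assumption: this mirrors the real function in s3.go):

    package main

    import (
        "fmt"
        "strconv"
    )

    // calculateRange builds an HTTP byte range "bytes=start-end" for
    // part partIndex (0 based); the final part is usually shorter.
    func calculateRange(partSize, partIndex, numParts, totalSize int64) string {
        start := partIndex * partSize
        var ends string
        if partIndex == numParts-1 {
            if totalSize >= 1 {
                ends = strconv.FormatInt(totalSize-1, 10)
            }
        } else {
            ends = strconv.FormatInt(start+partSize-1, 10)
        }
        return fmt.Sprintf("bytes=%v-%v", start, ends)
    }

    func main() {
        partSize := int64(4 << 20)             // 4 MiB CopyCutoff
        totalSize := int64(10 << 20)           // 10 MiB object
        numParts := (totalSize-1)/partSize + 1 // = 3, as in copyMultipart
        for i := int64(0); i < numParts; i++ {
            fmt.Println(calculateRange(partSize, i, numParts, totalSize))
        }
        // bytes=0-4194303
        // bytes=4194304-8388607
        // bytes=8388608-10485759
    }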
@@ -2112,7 +2121,7 @@ func (f *Fs) Copy(ctx context.Context, src fs.Object, remote string) (fs.Object,
 	req := s3.CopyObjectInput{
 		MetadataDirective: aws.String(s3.MetadataDirectiveCopy),
 	}
-	err = f.copy(ctx, &req, dstBucket, dstPath, srcBucket, srcPath, srcObj.Size())
+	err = f.copy(ctx, &req, dstBucket, dstPath, srcBucket, srcPath, srcObj)
 	if err != nil {
 		return nil, err
 	}
@@ -2509,19 +2518,12 @@ func (o *Object) Size() int64 {
 	return o.bytes
 }
 
-// readMetaData gets the metadata if it hasn't already been fetched
-//
-// it also sets the info
-func (o *Object) readMetaData(ctx context.Context) (err error) {
-	if o.meta != nil {
-		return nil
-	}
+func (o *Object) headObject(ctx context.Context) (resp *s3.HeadObjectOutput, err error) {
 	bucket, bucketPath := o.split()
 	req := s3.HeadObjectInput{
 		Bucket: &bucket,
 		Key:    &bucketPath,
 	}
-	var resp *s3.HeadObjectOutput
 	err = o.fs.pacer.Call(func() (bool, error) {
 		var err error
 		resp, err = o.fs.c.HeadObjectWithContext(ctx, &req)
@@ -2530,12 +2532,26 @@ func (o *Object) readMetaData(ctx context.Context) (err error) {
 	if err != nil {
 		if awsErr, ok := err.(awserr.RequestFailure); ok {
 			if awsErr.StatusCode() == http.StatusNotFound {
-				return fs.ErrorObjectNotFound
+				return nil, fs.ErrorObjectNotFound
 			}
 		}
-		return err
+		return nil, err
 	}
 	o.fs.cache.MarkOK(bucket)
+	return resp, nil
+}
+
+// readMetaData gets the metadata if it hasn't already been fetched
+//
+// it also sets the info
+func (o *Object) readMetaData(ctx context.Context) (err error) {
+	if o.meta != nil {
+		return nil
+	}
+	resp, err := o.headObject(ctx)
+	if err != nil {
+		return err
+	}
 	var size int64
 	// Ignore missing Content-Length assuming it is 0
 	// Some versions of ceph do this due their apache proxies
@@ -2606,7 +2622,7 @@ func (o *Object) SetModTime(ctx context.Context, modTime time.Time) error {
 		Metadata:          o.meta,
 		MetadataDirective: aws.String(s3.MetadataDirectiveReplace), // replace metadata with that passed in
 	}
-	return o.fs.copy(ctx, &req, bucket, bucketPath, bucket, bucketPath, o.bytes)
+	return o.fs.copy(ctx, &req, bucket, bucketPath, bucket, bucketPath, o)
 }
 
 // Storable raturns a boolean indicating if this object is storable
@@ -3046,7 +3062,7 @@ func (o *Object) SetTier(tier string) (err error) {
 		MetadataDirective: aws.String(s3.MetadataDirectiveCopy),
 		StorageClass:      aws.String(tier),
 	}
-	err = o.fs.copy(ctx, &req, bucket, bucketPath, bucket, bucketPath, o.bytes)
+	err = o.fs.copy(ctx, &req, bucket, bucketPath, bucket, bucketPath, o)
 	if err != nil {
 		return err
 	}