s3: Use the ETag on multipart transfers to verify the transfer was OK
Before this rclone ignored the ETag on multipart uploads which missed an opportunity for a whole file integrity check. This adds that check which means that we now check even harder that multipart uploads have arrived properly. See #5993
This commit is contained in:
parent
06ecc6511b
commit
8f164e4df5
1 changed files with 38 additions and 14 deletions
|
@ -3244,9 +3244,6 @@ func (o *Object) readMetaData(ctx context.Context) (err error) {
|
|||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if resp.LastModified == nil {
|
||||
fs.Logf(o, "Failed to read last modified from HEAD: %v", err)
|
||||
}
|
||||
o.setMetaData(resp.ETag, resp.ContentLength, resp.LastModified, resp.Metadata, resp.ContentType, resp.StorageClass)
|
||||
return nil
|
||||
}
|
||||
|
@ -3276,6 +3273,7 @@ func (o *Object) setMetaData(etag *string, contentLength *int64, lastModified *t
|
|||
o.storageClass = aws.StringValue(storageClass)
|
||||
if lastModified == nil {
|
||||
o.lastModified = time.Now()
|
||||
fs.Logf(o, "Failed to read last modified")
|
||||
} else {
|
||||
o.lastModified = *lastModified
|
||||
}
|
||||
|
@ -3447,9 +3445,7 @@ func (o *Object) Open(ctx context.Context, options ...fs.OpenOption) (in io.Read
|
|||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if resp.LastModified == nil {
|
||||
fs.Logf(o, "Failed to read last modified: %v", err)
|
||||
}
|
||||
|
||||
// read size from ContentLength or ContentRange
|
||||
size := resp.ContentLength
|
||||
if resp.ContentRange != nil {
|
||||
|
@ -3472,7 +3468,7 @@ func (o *Object) Open(ctx context.Context, options ...fs.OpenOption) (in io.Read
|
|||
|
||||
var warnStreamUpload sync.Once
|
||||
|
||||
func (o *Object) uploadMultipart(ctx context.Context, req *s3.PutObjectInput, size int64, in io.Reader) (err error) {
|
||||
func (o *Object) uploadMultipart(ctx context.Context, req *s3.PutObjectInput, size int64, in io.Reader) (etag string, err error) {
|
||||
f := o.fs
|
||||
|
||||
// make concurrency machinery
|
||||
|
@ -3519,7 +3515,7 @@ func (o *Object) uploadMultipart(ctx context.Context, req *s3.PutObjectInput, si
|
|||
return f.shouldRetry(ctx, err)
|
||||
})
|
||||
if err != nil {
|
||||
return fmt.Errorf("multipart upload failed to initialise: %w", err)
|
||||
return etag, fmt.Errorf("multipart upload failed to initialise: %w", err)
|
||||
}
|
||||
uid := cout.UploadId
|
||||
|
||||
|
@ -3548,8 +3544,21 @@ func (o *Object) uploadMultipart(ctx context.Context, req *s3.PutObjectInput, si
|
|||
partsMu sync.Mutex // to protect parts
|
||||
parts []*s3.CompletedPart
|
||||
off int64
|
||||
md5sMu sync.Mutex
|
||||
md5s []byte
|
||||
)
|
||||
|
||||
addMd5 := func(md5binary *[md5.Size]byte, partNum int64) {
|
||||
md5sMu.Lock()
|
||||
defer md5sMu.Unlock()
|
||||
start := partNum * md5.Size
|
||||
end := start + md5.Size
|
||||
if extend := end - int64(len(md5s)); extend > 0 {
|
||||
md5s = append(md5s, make([]byte, extend)...)
|
||||
}
|
||||
copy(md5s[start:end], (*md5binary)[:])
|
||||
}
|
||||
|
||||
for partNum := int64(1); !finished; partNum++ {
|
||||
// Get a block of memory from the pool and token which limits concurrency.
|
||||
tokens.Get()
|
||||
|
@ -3579,7 +3588,7 @@ func (o *Object) uploadMultipart(ctx context.Context, req *s3.PutObjectInput, si
|
|||
finished = true
|
||||
} else if err != nil {
|
||||
free()
|
||||
return fmt.Errorf("multipart upload failed to read source: %w", err)
|
||||
return etag, fmt.Errorf("multipart upload failed to read source: %w", err)
|
||||
}
|
||||
buf = buf[:n]
|
||||
|
||||
|
@ -3592,6 +3601,7 @@ func (o *Object) uploadMultipart(ctx context.Context, req *s3.PutObjectInput, si
|
|||
|
||||
// create checksum of buffer for integrity checking
|
||||
md5sumBinary := md5.Sum(buf)
|
||||
addMd5(&md5sumBinary, partNum-1)
|
||||
md5sum := base64.StdEncoding.EncodeToString(md5sumBinary[:])
|
||||
|
||||
err = f.pacer.Call(func() (bool, error) {
|
||||
|
@ -3633,7 +3643,7 @@ func (o *Object) uploadMultipart(ctx context.Context, req *s3.PutObjectInput, si
|
|||
}
|
||||
err = g.Wait()
|
||||
if err != nil {
|
||||
return err
|
||||
return etag, err
|
||||
}
|
||||
|
||||
// sort the completed parts by part number
|
||||
|
@ -3654,9 +3664,11 @@ func (o *Object) uploadMultipart(ctx context.Context, req *s3.PutObjectInput, si
|
|||
return f.shouldRetry(ctx, err)
|
||||
})
|
||||
if err != nil {
|
||||
return fmt.Errorf("multipart upload failed to finalise: %w", err)
|
||||
return etag, fmt.Errorf("multipart upload failed to finalise: %w", err)
|
||||
}
|
||||
return nil
|
||||
hashOfHashes := md5.Sum(md5s)
|
||||
etag = fmt.Sprintf("%s-%d", hex.EncodeToString(hashOfHashes[:]), len(parts))
|
||||
return etag, nil
|
||||
}
|
||||
|
||||
// Update the Object from in with modTime and size
|
||||
|
@ -3765,8 +3777,9 @@ func (o *Object) Update(ctx context.Context, in io.Reader, src fs.ObjectInfo, op
|
|||
}
|
||||
|
||||
var resp *http.Response // response from PUT
|
||||
var wantETag string // Multipart upload Etag to check
|
||||
if multipart {
|
||||
err = o.uploadMultipart(ctx, &req, size, in)
|
||||
wantETag, err = o.uploadMultipart(ctx, &req, size, in)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
@ -3846,7 +3859,18 @@ func (o *Object) Update(ctx context.Context, in io.Reader, src fs.ObjectInfo, op
|
|||
|
||||
// Read the metadata from the newly created object
|
||||
o.meta = nil // wipe old metadata
|
||||
err = o.readMetaData(ctx)
|
||||
head, err := o.headObject(ctx)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
o.setMetaData(head.ETag, head.ContentLength, head.LastModified, head.Metadata, head.ContentType, head.StorageClass)
|
||||
if !o.fs.etagIsNotMD5 && wantETag != "" && head.ETag != nil && *head.ETag != "" {
|
||||
gotETag := strings.Trim(strings.ToLower(*head.ETag), `"`)
|
||||
if wantETag != gotETag {
|
||||
return fmt.Errorf("multipart upload corrupted: Etag differ: expecting %s but got %s", wantETag, gotETag)
|
||||
}
|
||||
fs.Debugf(o, "Multipart upload Etag: %s OK", wantETag)
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in a new issue