s3: don't leak memory or tokens in edge cases for multipart upload

This commit is contained in:
Nick Craig-Wood 2020-05-14 07:48:18 +01:00
parent 52b7337d28
commit 8a58e0235d

View file

@ -2253,9 +2253,16 @@ func (o *Object) uploadMultipart(ctx context.Context, req *s3.PutObjectInput, si
tokens.Get() tokens.Get()
buf := memPool.Get() buf := memPool.Get()
free := func() {
// return the memory and token
memPool.Put(buf)
tokens.Put()
}
// Fail fast, in case an errgroup managed function returns an error // Fail fast, in case an errgroup managed function returns an error
// gCtx is cancelled. There is no point in uploading all the other parts. // gCtx is cancelled. There is no point in uploading all the other parts.
if gCtx.Err() != nil { if gCtx.Err() != nil {
free()
break break
} }
@ -2264,10 +2271,12 @@ func (o *Object) uploadMultipart(ctx context.Context, req *s3.PutObjectInput, si
n, err = readers.ReadFill(in, buf) // this can never return 0, nil n, err = readers.ReadFill(in, buf) // this can never return 0, nil
if err == io.EOF { if err == io.EOF {
if n == 0 && partNum != 1 { // end if no data and if not first chunk if n == 0 && partNum != 1 { // end if no data and if not first chunk
free()
break break
} }
finished = true finished = true
} else if err != nil { } else if err != nil {
free()
return errors.Wrap(err, "multipart upload failed to read source") return errors.Wrap(err, "multipart upload failed to read source")
} }
buf = buf[:n] buf = buf[:n]
@ -2276,6 +2285,7 @@ func (o *Object) uploadMultipart(ctx context.Context, req *s3.PutObjectInput, si
fs.Debugf(o, "multipart upload starting chunk %d size %v offset %v/%v", partNum, fs.SizeSuffix(n), fs.SizeSuffix(off), fs.SizeSuffix(size)) fs.Debugf(o, "multipart upload starting chunk %d size %v offset %v/%v", partNum, fs.SizeSuffix(n), fs.SizeSuffix(off), fs.SizeSuffix(size))
off += int64(n) off += int64(n)
g.Go(func() (err error) { g.Go(func() (err error) {
defer free()
partLength := int64(len(buf)) partLength := int64(len(buf))
// create checksum of buffer for integrity checking // create checksum of buffer for integrity checking
@ -2313,11 +2323,6 @@ func (o *Object) uploadMultipart(ctx context.Context, req *s3.PutObjectInput, si
return false, nil return false, nil
}) })
// return the memory and token
memPool.Put(buf)
tokens.Put()
if err != nil { if err != nil {
return errors.Wrap(err, "multipart upload failed to upload part") return errors.Wrap(err, "multipart upload failed to upload part")
} }