Merge pull request #4066 from milosgajdos/optimise-s3-push

Optimise push in S3 driver
Milos Gajdos 2023-09-29 13:47:20 +01:00 committed by GitHub
commit 735c161b53

@@ -25,6 +25,7 @@ import (
 	"sort"
 	"strconv"
 	"strings"
+	"sync"
 	"time"

 	"github.com/aws/aws-sdk-go/aws"
@@ -48,6 +49,7 @@ const driverName = "s3aws"
 const minChunkSize = 5 << 20

 // maxChunkSize defines the maximum multipart upload chunk size allowed by S3.
+// S3 API requires max upload chunk to be 5GB.
 const maxChunkSize = 5 << 30

 const defaultChunkSize = 2 * minChunkSize
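For orientation, the bit shifts above translate to byte sizes as follows; a quick standalone check, not part of the change:

package main

import "fmt"

func main() {
	const minChunkSize = 5 << 20              // 5 MiB = 5,242,880 bytes
	const maxChunkSize = 5 << 30              // 5 GiB = 5,368,709,120 bytes
	const defaultChunkSize = 2 * minChunkSize // 10 MiB
	fmt.Println(minChunkSize, maxChunkSize, defaultChunkSize)
}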
@@ -166,6 +168,7 @@ type driver struct {
 	RootDirectory string
 	StorageClass  string
 	ObjectACL     string
+	pool          *sync.Pool
 }

 type baseEmbed struct {
@@ -580,6 +583,13 @@ func New(params DriverParameters) (*Driver, error) {
 		RootDirectory: params.RootDirectory,
 		StorageClass:  params.StorageClass,
 		ObjectACL:     params.ObjectACL,
+		pool: &sync.Pool{
+			New: func() interface{} {
+				return &buffer{
+					data: make([]byte, 0, params.ChunkSize),
+				}
+			},
+		},
 	}

 	return &Driver{
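The pool wiring follows the stock sync.Pool idiom: Get returns a previously released buffer when one is available and falls back to New otherwise, and Get's result must be type-asserted because it is an interface{}. A minimal standalone sketch of the same idiom; the chunk size here is illustrative:

package main

import (
	"fmt"
	"sync"
)

type buffer struct {
	data []byte
}

func main() {
	const chunkSize = 5 << 20 // illustrative; the driver uses params.ChunkSize
	pool := &sync.Pool{
		New: func() interface{} {
			return &buffer{data: make([]byte, 0, chunkSize)}
		},
	}

	b := pool.Get().(*buffer) // Get returns interface{}, hence the assertion
	fmt.Println(cap(b.data))  // 5242880
	b.data = b.data[:0]       // reset length before handing it back
	pool.Put(b)
}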
@@ -1251,6 +1261,49 @@ func (d *driver) getStorageClass() *string {
 	return aws.String(d.StorageClass)
 }

+// buffer is a static size bytes buffer.
+type buffer struct {
+	data []byte
+}
+
+// NewBuffer returns a new bytes buffer from driver's memory pool.
+// The size of the buffer is static and set to params.ChunkSize.
+func (d *driver) NewBuffer() *buffer {
+	return d.pool.Get().(*buffer)
+}
+
+// ReadFrom reads as much data as it can fit in from r without growing its size.
+// It returns the number of bytes successfully read from r or error.
+func (b *buffer) ReadFrom(r io.Reader) (offset int64, err error) {
+	for len(b.data) < cap(b.data) && err == nil {
+		var n int
+		n, err = r.Read(b.data[len(b.data):cap(b.data)])
+		offset += int64(n)
+		b.data = b.data[:len(b.data)+n]
+	}
+	// NOTE(milosgajdos): io.ReaderFrom "swallows" io.EOF
+	// See: https://pkg.go.dev/io#ReaderFrom
+	if err == io.EOF {
+		err = nil
+	}
+	return offset, err
+}
+
+// Cap returns the capacity of the buffer's underlying byte slice.
+func (b *buffer) Cap() int {
+	return cap(b.data)
+}
+
+// Len returns the length of the data in the buffer.
+func (b *buffer) Len() int {
+	return len(b.data)
+}
+
+// Clear the buffer data.
+func (b *buffer) Clear() {
+	b.data = b.data[:0]
+}
+
 // writer attempts to upload parts to S3 in a buffered fashion where the last
 // part is at least as large as the chunksize, so the multipart upload could be
 // cleanly resumed in the future. This is violated if Close is called after less
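The important behaviors of the new buffer are that ReadFrom stops once the buffer is at capacity rather than growing it, and that it swallows io.EOF, as the io.ReaderFrom contract requires. A standalone sketch reusing the same method body to show both; the capacity of 4 is chosen for illustration:

package main

import (
	"fmt"
	"io"
	"strings"
)

// buffer mirrors the pooled type above.
type buffer struct {
	data []byte
}

func (b *buffer) ReadFrom(r io.Reader) (offset int64, err error) {
	for len(b.data) < cap(b.data) && err == nil {
		var n int
		n, err = r.Read(b.data[len(b.data):cap(b.data)])
		offset += int64(n)
		b.data = b.data[:len(b.data)+n]
	}
	if err == io.EOF {
		err = nil // io.ReaderFrom implementations must not return EOF
	}
	return offset, err
}

func main() {
	b := &buffer{data: make([]byte, 0, 4)}

	n, err := b.ReadFrom(strings.NewReader("hello"))
	fmt.Println(n, err, string(b.data)) // 4 <nil> hell (stops at capacity)

	n, err = b.ReadFrom(strings.NewReader("!"))
	fmt.Println(n, err) // 0 <nil> (already full, nothing is read)
}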
@@ -1261,8 +1314,8 @@ type writer struct {
 	uploadID    string
 	parts       []*s3.Part
 	size        int64
-	readyPart   []byte
-	pendingPart []byte
+	ready       *buffer
+	pending     *buffer
 	closed      bool
 	committed   bool
 	cancelled   bool
@@ -1279,6 +1332,8 @@ func (d *driver) newWriter(key, uploadID string, parts []*s3.Part) storagedriver.FileWriter {
 		uploadID: uploadID,
 		parts:    parts,
 		size:     size,
+		ready:    d.NewBuffer(),
+		pending:  d.NewBuffer(),
 	}
 }
@@ -1300,12 +1355,12 @@ func (w *writer) Write(p []byte) (int, error) {
 	// If the last written part is smaller than minChunkSize, we need to make a
 	// new multipart upload :sadface:
 	if len(w.parts) > 0 && int(*w.parts[len(w.parts)-1].Size) < minChunkSize {
-		var completedUploadedParts completedParts
-		for _, part := range w.parts {
-			completedUploadedParts = append(completedUploadedParts, &s3.CompletedPart{
+		completedUploadedParts := make(completedParts, len(w.parts))
+		for i, part := range w.parts {
+			completedUploadedParts[i] = &s3.CompletedPart{
 				ETag:       part.ETag,
 				PartNumber: part.PartNumber,
-			})
+			}
 		}

 		sort.Sort(completedUploadedParts)
@@ -1319,11 +1374,13 @@
 			},
 		})
 		if err != nil {
-			w.driver.S3.AbortMultipartUpload(&s3.AbortMultipartUploadInput{
+			if _, aErr := w.driver.S3.AbortMultipartUpload(&s3.AbortMultipartUploadInput{
 				Bucket:   aws.String(w.driver.Bucket),
 				Key:      aws.String(w.key),
 				UploadId: aws.String(w.uploadID),
-			})
+			}); aErr != nil {
+				return 0, errors.Join(err, aErr)
+			}
 			return 0, err
 		}
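Previously the AbortMultipartUpload error was silently dropped. errors.Join (standard library since Go 1.20) returns both failures as a single error while keeping each one matchable with errors.Is. A minimal illustration with hypothetical error values:

package main

import (
	"errors"
	"fmt"
)

func main() {
	errUpload := errors.New("upload failed") // hypothetical
	errAbort := errors.New("abort failed")   // hypothetical

	err := errors.Join(errUpload, errAbort)
	fmt.Println(err)                      // both messages, newline-separated
	fmt.Println(errors.Is(err, errAbort)) // true: neither error is lost
}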
@@ -1351,11 +1408,18 @@ func (w *writer) Write(p []byte) (int, error) {
 			return 0, err
 		}
 		defer resp.Body.Close()
+
+		// reset uploaded parts
 		w.parts = nil
-		w.readyPart, err = io.ReadAll(resp.Body)
+		w.ready.Clear()
+
+		n, err := w.ready.ReadFrom(resp.Body)
 		if err != nil {
 			return 0, err
 		}
+		if resp.ContentLength != nil && n < *resp.ContentLength {
+			return 0, io.ErrShortBuffer
+		}
 	} else {
 		// Otherwise we can use the old file as the new first part
 		copyPartResp, err := w.driver.S3.UploadPartCopy(&s3.UploadPartCopyInput{
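The new ContentLength guard matters because ready is a fixed-capacity buffer: if the existing object is larger than the configured chunk size, ReadFrom fills the buffer and returns without error, and only comparing the byte count against the response's ContentLength exposes the truncation. A contrived sketch of detecting that short read; sizes are illustrative:

package main

import (
	"fmt"
	"io"
	"strings"
)

func main() {
	body := strings.NewReader("0123456789") // stands in for a GetObject body
	contentLength := int64(body.Len())      // S3 reports this via *ContentLength

	fixed := make([]byte, 4) // a buffer smaller than the object
	n, _ := io.ReadFull(body, fixed)

	if int64(n) < contentLength {
		fmt.Println(io.ErrShortBuffer) // the object did not fit the buffer
	}
}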
@@ -1380,51 +1444,60 @@ func (w *writer) Write(p []byte) (int, error) {
 	var n int
+	defer func() { w.size += int64(n) }()

-	for len(p) > 0 {
-		// If no parts are ready to write, fill up the first part
-		if neededBytes := int(w.driver.ChunkSize) - len(w.readyPart); neededBytes > 0 {
-			if len(p) >= neededBytes {
-				w.readyPart = append(w.readyPart, p[:neededBytes]...)
-				n += neededBytes
-				p = p[neededBytes:]
-			} else {
-				w.readyPart = append(w.readyPart, p...)
-				n += len(p)
-				p = nil
-			}
-		}
+	reader := bytes.NewReader(p)

-		if neededBytes := int(w.driver.ChunkSize) - len(w.pendingPart); neededBytes > 0 {
-			if len(p) >= neededBytes {
-				w.pendingPart = append(w.pendingPart, p[:neededBytes]...)
-				n += neededBytes
-				p = p[neededBytes:]
-				err := w.flushPart()
-				if err != nil {
-					w.size += int64(n)
-					return n, err
-				}
-			} else {
-				w.pendingPart = append(w.pendingPart, p...)
-				n += len(p)
-				p = nil
-			}
-		}
-	}
-	w.size += int64(n)
+	for reader.Len() > 0 {
+		// NOTE(milosgajdos): we do some seemingly unsafe conversions
+		// from int64 to int in this for loop. These are fine as the
+		// offset returned from buffer.ReadFrom can only ever be
+		// maxChunkSize large which fits in to int. The reason why
+		// we return int64 is to play nice with Go interfaces where
+		// the buffer implements io.ReaderFrom interface.
+
+		// fill up the ready parts buffer
+		offset, err := w.ready.ReadFrom(reader)
+		n += int(offset)
+		if err != nil {
+			return n, err
+		}
+
+		// try filling up the pending parts buffer
+		offset, err = w.pending.ReadFrom(reader)
+		n += int(offset)
+		if err != nil {
+			return n, err
+		}
+
+		// we filled up pending buffer, flush
+		if w.pending.Len() == w.pending.Cap() {
+			if err := w.flush(); err != nil {
+				return n, err
+			}
+		}
+	}
+
 	return n, nil
 }

 func (w *writer) Size() int64 {
 	return w.size
 }

 func (w *writer) Close() error {
 	if w.closed {
 		return fmt.Errorf("already closed")
 	}
 	w.closed = true
-	return w.flushPart()
+
+	defer func() {
+		w.ready.Clear()
+		w.driver.pool.Put(w.ready)
+		w.pending.Clear()
+		w.driver.pool.Put(w.pending)
+	}()
+
+	return w.flush()
 }

 func (w *writer) Cancel(ctx context.Context) error {
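Net effect: Write now streams p through two pooled fixed-size buffers. ready fills first, then pending, and a full pending triggers flush, which uploads ready's contents and swaps the two buffers; whatever remains is flushed on Close or Commit, which also clear the buffers and return them to the pool. A condensed standalone sketch of that double-buffering flow, with the S3 upload replaced by a slice of recorded part sizes; the chunk size of 4 is illustrative:

package main

import (
	"bytes"
	"fmt"
	"io"
)

type buffer struct {
	data []byte
}

func (b *buffer) ReadFrom(r io.Reader) (offset int64, err error) {
	for len(b.data) < cap(b.data) && err == nil {
		var n int
		n, err = r.Read(b.data[len(b.data):cap(b.data)])
		offset += int64(n)
		b.data = b.data[:len(b.data)+n]
	}
	if err == io.EOF {
		err = nil
	}
	return offset, err
}

func main() {
	const chunkSize = 4 // illustrative; the driver uses driver.ChunkSize
	ready := &buffer{data: make([]byte, 0, chunkSize)}
	pending := &buffer{data: make([]byte, 0, chunkSize)}
	var parts []int // stand-in for uploaded part sizes

	flush := func() {
		parts = append(parts, len(ready.data)) // "upload" the ready part
		ready.data = ready.data[:0]
		ready, pending = pending, ready // swap, as the real flush does
	}

	reader := bytes.NewReader([]byte("0123456789")) // 10 bytes, chunk of 4
	for reader.Len() > 0 {
		ready.ReadFrom(reader)
		pending.ReadFrom(reader)
		if len(pending.data) == cap(pending.data) {
			flush()
		}
	}

	// Leftover data stays buffered; the real writer flushes it on Close/Commit.
	fmt.Println(parts, len(ready.data), len(pending.data)) // [4] 4 2
}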
@@ -1450,25 +1523,28 @@ func (w *writer) Commit() error {
 	} else if w.cancelled {
 		return fmt.Errorf("already cancelled")
 	}
-	err := w.flushPart()
+
+	err := w.flush()
 	if err != nil {
 		return err
 	}
 	w.committed = true

-	completedUploadedParts := make(completedParts, 0, len(w.parts))
-	for _, part := range w.parts {
-		completedUploadedParts = append(completedUploadedParts, &s3.CompletedPart{
+	completedUploadedParts := make(completedParts, len(w.parts))
+	for i, part := range w.parts {
+		completedUploadedParts[i] = &s3.CompletedPart{
 			ETag:       part.ETag,
 			PartNumber: part.PartNumber,
-		})
+		}
 	}

-	// This is an edge case when we are trying to upload an empty chunk of data using
-	// a MultiPart upload. As a result we are trying to complete the MultipartUpload
-	// with an empty slice of `completedUploadedParts` which will always lead to 400
-	// being returned from S3 See: https://docs.aws.amazon.com/sdk-for-go/api/service/s3/#CompletedMultipartUpload
-	// Solution: we upload an empty i.e. 0 byte part as a single part and then append it
+	// This is an edge case when we are trying to upload an empty file as part of
+	// the MultiPart upload. We get a PUT with Content-Length: 0 and sad things happen.
+	// The result is we are trying to Complete MultipartUpload with an empty list of
+	// completedUploadedParts which will always lead to 400 being returned from S3
+	// See: https://docs.aws.amazon.com/sdk-for-go/api/service/s3/#CompletedMultipartUpload
+	// Solution: we upload the empty i.e. 0 byte part as a single part and then append it
 	// to the completedUploadedParts slice used to complete the Multipart upload.
 	if len(w.parts) == 0 {
 		resp, err := w.driver.S3.UploadPart(&s3.UploadPartInput{
@@ -1499,47 +1575,56 @@ func (w *writer) Commit() error {
 		},
 	})
 	if err != nil {
-		w.driver.S3.AbortMultipartUpload(&s3.AbortMultipartUploadInput{
+		if _, aErr := w.driver.S3.AbortMultipartUpload(&s3.AbortMultipartUploadInput{
 			Bucket:   aws.String(w.driver.Bucket),
 			Key:      aws.String(w.key),
 			UploadId: aws.String(w.uploadID),
-		})
+		}); aErr != nil {
+			return errors.Join(err, aErr)
+		}
 		return err
 	}
 	return nil
 }

-// flushPart flushes buffers to write a part to S3.
-// Only called by Write (with both buffers full) and Close/Commit (always)
-func (w *writer) flushPart() error {
-	if len(w.readyPart) == 0 && len(w.pendingPart) == 0 {
+// flush flushes all buffers to write a part to S3.
+// flush is only called by Write (with both buffers full) and Close/Commit (always)
+func (w *writer) flush() error {
+	if w.ready.Len() == 0 && w.pending.Len() == 0 {
+		// nothing to write
 		return nil
 	}
-	if w.driver.MultipartCombineSmallPart && len(w.pendingPart) < int(w.driver.ChunkSize) {
-		// closing with a small pending part
-		// combine ready and pending to avoid writing a small part
-		w.readyPart = append(w.readyPart, w.pendingPart...)
-		w.pendingPart = nil
+
+	buf := bytes.NewBuffer(w.ready.data)
+	if w.driver.MultipartCombineSmallPart && (w.pending.Len() > 0 && w.pending.Len() < int(w.driver.ChunkSize)) {
+		if _, err := buf.Write(w.pending.data); err != nil {
+			return err
+		}
+		w.pending.Clear()
 	}

+	partSize := buf.Len()
 	partNumber := aws.Int64(int64(len(w.parts) + 1))
 	resp, err := w.driver.S3.UploadPart(&s3.UploadPartInput{
 		Bucket:     aws.String(w.driver.Bucket),
 		Key:        aws.String(w.key),
 		PartNumber: partNumber,
 		UploadId:   aws.String(w.uploadID),
-		Body:       bytes.NewReader(w.readyPart),
+		Body:       bytes.NewReader(buf.Bytes()),
 	})
 	if err != nil {
 		return err
 	}

 	w.parts = append(w.parts, &s3.Part{
 		ETag:       resp.ETag,
 		PartNumber: partNumber,
-		Size:       aws.Int64(int64(len(w.readyPart))),
+		Size:       aws.Int64(int64(partSize)),
 	})
-	w.readyPart = w.pendingPart
-	w.pendingPart = nil
+
+	// reset the flushed buffer and swap buffers
+	w.ready.Clear()
+	w.ready, w.pending = w.pending, w.ready

 	return nil
 }
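Two details in flush are worth calling out. First, when MultipartCombineSmallPart is set and pending holds less than a full chunk, the small tail is appended onto ready via a bytes.Buffer before the upload, so S3 never sees an undersized trailing part; per the standard library docs, bytes.Buffer.Write always returns a nil error (it panics on allocation failure), so the error branch is purely defensive. Second, after the upload the two buffers are swapped, so any data left in pending becomes the next ready part. A small sketch of the combine step; the values are illustrative:

package main

import (
	"bytes"
	"fmt"
)

func main() {
	ready := []byte("full-chunk-data!") // a chunk-sized ready part
	pending := []byte("tail")           // a small trailing pending part

	// Combine the small pending part into the ready part, as flush does
	// when MultipartCombineSmallPart is enabled. bytes.NewBuffer takes
	// ownership of ready's bytes and Write appends after them.
	buf := bytes.NewBuffer(ready)
	if _, err := buf.Write(pending); err != nil {
		panic(err) // unreachable; mirrors the defensive check in flush
	}

	fmt.Println(buf.Len(), buf.String()) // 20 full-chunk-data!tail
}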