Optimise push in S3 driver

This commit cleans up and attempts to optimise the performance of image push in S3 driver.
There are 2 main changes:
* we refactor the S3 driver Writer where instead of using separate bytes
  slices for ready and pending parts which get constantly appended data
  into them causing unnecessary allocations we use optimised bytes
  buffers; we make sure these are used efficiently when written to.
* we introduce a memory pool that is used for allocating the byte
  buffers introduced above

These changes should alleviate high memory pressure on the push path to S3.

Co-authored-by: Cory Snider <corhere@gmail.com>
Signed-off-by: Milos Gajdos <milosthegajdos@gmail.com>
This commit is contained in:
Milos Gajdos 2023-09-22 14:53:47 +01:00
parent 8d12329a8f
commit b888b14b39
No known key found for this signature in database
GPG key ID: 01300E5E6D417439

View file

@ -25,6 +25,7 @@ import (
"sort" "sort"
"strconv" "strconv"
"strings" "strings"
"sync"
"time" "time"
"github.com/aws/aws-sdk-go/aws" "github.com/aws/aws-sdk-go/aws"
@ -48,6 +49,7 @@ const driverName = "s3aws"
const minChunkSize = 5 << 20 const minChunkSize = 5 << 20
// maxChunkSize defines the maximum multipart upload chunk size allowed by S3. // maxChunkSize defines the maximum multipart upload chunk size allowed by S3.
// S3 API requires max upload chunk to be 5GB.
const maxChunkSize = 5 << 30 const maxChunkSize = 5 << 30
const defaultChunkSize = 2 * minChunkSize const defaultChunkSize = 2 * minChunkSize
@ -166,6 +168,7 @@ type driver struct {
RootDirectory string RootDirectory string
StorageClass string StorageClass string
ObjectACL string ObjectACL string
pool *sync.Pool
} }
type baseEmbed struct { type baseEmbed struct {
@ -580,6 +583,13 @@ func New(params DriverParameters) (*Driver, error) {
RootDirectory: params.RootDirectory, RootDirectory: params.RootDirectory,
StorageClass: params.StorageClass, StorageClass: params.StorageClass,
ObjectACL: params.ObjectACL, ObjectACL: params.ObjectACL,
pool: &sync.Pool{
New: func() interface{} {
return &buffer{
data: make([]byte, 0, params.ChunkSize),
}
},
},
} }
return &Driver{ return &Driver{
@ -1251,21 +1261,70 @@ func (d *driver) getStorageClass() *string {
return aws.String(d.StorageClass) return aws.String(d.StorageClass)
} }
type completedParts []*s3.CompletedPart
func (a completedParts) Len() int { return len(a) }
func (a completedParts) Swap(i, j int) { a[i], a[j] = a[j], a[i] }
func (a completedParts) Less(i, j int) bool { return *a[i].PartNumber < *a[j].PartNumber }
// buffer is a static size bytes buffer.
type buffer struct {
data []byte
}
// NewBuffer returns a new bytes buffer from driver's memory pool.
// The size of the buffer is static and set to params.ChunkSize.
func (d *driver) NewBuffer() *buffer {
return d.pool.Get().(*buffer)
}
// ReadFrom reads as much data as it can fit in from r without growing its size.
// It returns the number of bytes successfully read from r or error.
func (b *buffer) ReadFrom(r io.Reader) (offset int64, err error) {
for len(b.data) < cap(b.data) && err == nil {
var n int
n, err = r.Read(b.data[len(b.data):cap(b.data)])
offset += int64(n)
b.data = b.data[:len(b.data)+n]
}
// NOTE(milosgajdos): io.ReaderFrom "swallows" io.EOF
// See: https://pkg.go.dev/io#ReaderFrom
if err == io.EOF {
err = nil
}
return offset, err
}
// Cap returns the capacity of the buffer's underlying byte slice.
func (b *buffer) Cap() int {
return cap(b.data)
}
// Len returns the length of the data in the buffer
func (b *buffer) Len() int {
return len(b.data)
}
// Clear the buffer data.
func (b *buffer) Clear() {
b.data = b.data[:0]
}
// writer attempts to upload parts to S3 in a buffered fashion where the last // writer attempts to upload parts to S3 in a buffered fashion where the last
// part is at least as large as the chunksize, so the multipart upload could be // part is at least as large as the chunksize, so the multipart upload could be
// cleanly resumed in the future. This is violated if Close is called after less // cleanly resumed in the future. This is violated if Close is called after less
// than a full chunk is written. // than a full chunk is written.
type writer struct { type writer struct {
driver *driver driver *driver
key string key string
uploadID string uploadID string
parts []*s3.Part parts []*s3.Part
size int64 size int64
readyPart []byte ready *buffer
pendingPart []byte pending *buffer
closed bool closed bool
committed bool committed bool
cancelled bool cancelled bool
} }
func (d *driver) newWriter(key, uploadID string, parts []*s3.Part) storagedriver.FileWriter { func (d *driver) newWriter(key, uploadID string, parts []*s3.Part) storagedriver.FileWriter {
@ -1279,15 +1338,11 @@ func (d *driver) newWriter(key, uploadID string, parts []*s3.Part) storagedriver
uploadID: uploadID, uploadID: uploadID,
parts: parts, parts: parts,
size: size, size: size,
ready: d.NewBuffer(),
pending: d.NewBuffer(),
} }
} }
type completedParts []*s3.CompletedPart
func (a completedParts) Len() int { return len(a) }
func (a completedParts) Swap(i, j int) { a[i], a[j] = a[j], a[i] }
func (a completedParts) Less(i, j int) bool { return *a[i].PartNumber < *a[j].PartNumber }
func (w *writer) Write(p []byte) (int, error) { func (w *writer) Write(p []byte) (int, error) {
if w.closed { if w.closed {
return 0, fmt.Errorf("already closed") return 0, fmt.Errorf("already closed")
@ -1300,12 +1355,12 @@ func (w *writer) Write(p []byte) (int, error) {
// If the last written part is smaller than minChunkSize, we need to make a // If the last written part is smaller than minChunkSize, we need to make a
// new multipart upload :sadface: // new multipart upload :sadface:
if len(w.parts) > 0 && int(*w.parts[len(w.parts)-1].Size) < minChunkSize { if len(w.parts) > 0 && int(*w.parts[len(w.parts)-1].Size) < minChunkSize {
var completedUploadedParts completedParts completedUploadedParts := make(completedParts, len(w.parts))
for _, part := range w.parts { for i, part := range w.parts {
completedUploadedParts = append(completedUploadedParts, &s3.CompletedPart{ completedUploadedParts[i] = &s3.CompletedPart{
ETag: part.ETag, ETag: part.ETag,
PartNumber: part.PartNumber, PartNumber: part.PartNumber,
}) }
} }
sort.Sort(completedUploadedParts) sort.Sort(completedUploadedParts)
@ -1319,11 +1374,13 @@ func (w *writer) Write(p []byte) (int, error) {
}, },
}) })
if err != nil { if err != nil {
w.driver.S3.AbortMultipartUpload(&s3.AbortMultipartUploadInput{ if _, aErr := w.driver.S3.AbortMultipartUpload(&s3.AbortMultipartUploadInput{
Bucket: aws.String(w.driver.Bucket), Bucket: aws.String(w.driver.Bucket),
Key: aws.String(w.key), Key: aws.String(w.key),
UploadId: aws.String(w.uploadID), UploadId: aws.String(w.uploadID),
}) }); aErr != nil {
return 0, errors.Join(err, aErr)
}
return 0, err return 0, err
} }
@ -1351,11 +1408,18 @@ func (w *writer) Write(p []byte) (int, error) {
return 0, err return 0, err
} }
defer resp.Body.Close() defer resp.Body.Close()
// reset uploaded parts
w.parts = nil w.parts = nil
w.readyPart, err = io.ReadAll(resp.Body) w.ready.Clear()
n, err := w.ready.ReadFrom(resp.Body)
if err != nil { if err != nil {
return 0, err return 0, err
} }
if resp.ContentLength != nil && n < *resp.ContentLength {
return 0, io.ErrShortBuffer
}
} else { } else {
// Otherwise we can use the old file as the new first part // Otherwise we can use the old file as the new first part
copyPartResp, err := w.driver.S3.UploadPartCopy(&s3.UploadPartCopyInput{ copyPartResp, err := w.driver.S3.UploadPartCopy(&s3.UploadPartCopyInput{
@ -1380,51 +1444,60 @@ func (w *writer) Write(p []byte) (int, error) {
var n int var n int
for len(p) > 0 { defer func() { w.size += int64(n) }()
// If no parts are ready to write, fill up the first part
if neededBytes := int(w.driver.ChunkSize) - len(w.readyPart); neededBytes > 0 { reader := bytes.NewReader(p)
if len(p) >= neededBytes {
w.readyPart = append(w.readyPart, p[:neededBytes]...) for reader.Len() > 0 {
n += neededBytes // NOTE(milosgajdos): we do some seemingly unsafe conversions
p = p[neededBytes:] // from int64 to int in this for loop. These are fine as the
} else { // offset returned from buffer.ReadFrom can only ever be
w.readyPart = append(w.readyPart, p...) // maxChunkSize large which fits in to int. The reason why
n += len(p) // we return int64 is to play nice with Go interfaces where
p = nil // the buffer implements io.ReaderFrom interface.
}
// fill up the ready parts buffer
offset, err := w.ready.ReadFrom(reader)
n += int(offset)
if err != nil {
return n, err
} }
if neededBytes := int(w.driver.ChunkSize) - len(w.pendingPart); neededBytes > 0 { // try filling up the pending parts buffer
if len(p) >= neededBytes { offset, err = w.pending.ReadFrom(reader)
w.pendingPart = append(w.pendingPart, p[:neededBytes]...) n += int(offset)
n += neededBytes if err != nil {
p = p[neededBytes:] return n, err
err := w.flushPart() }
if err != nil {
w.size += int64(n) // we filled up pending buffer, flush
return n, err if w.pending.Len() == w.pending.Cap() {
} if err := w.flush(); err != nil {
} else { return n, err
w.pendingPart = append(w.pendingPart, p...)
n += len(p)
p = nil
} }
} }
} }
w.size += int64(n)
return n, nil return n, nil
} }
func (w *writer) Size() int64 { func (w *writer) Size() int64 {
return w.size return w.size
} }
func (w *writer) Close() error { func (w *writer) Close() error {
if w.closed { if w.closed {
return fmt.Errorf("already closed") return fmt.Errorf("already closed")
} }
w.closed = true w.closed = true
return w.flushPart()
defer func() {
w.ready.Clear()
w.driver.pool.Put(w.ready)
w.pending.Clear()
w.driver.pool.Put(w.pending)
}()
return w.flush()
} }
func (w *writer) Cancel(ctx context.Context) error { func (w *writer) Cancel(ctx context.Context) error {
@ -1450,25 +1523,28 @@ func (w *writer) Commit() error {
} else if w.cancelled { } else if w.cancelled {
return fmt.Errorf("already cancelled") return fmt.Errorf("already cancelled")
} }
err := w.flushPart()
err := w.flush()
if err != nil { if err != nil {
return err return err
} }
w.committed = true w.committed = true
completedUploadedParts := make(completedParts, 0, len(w.parts)) completedUploadedParts := make(completedParts, len(w.parts))
for _, part := range w.parts { for i, part := range w.parts {
completedUploadedParts = append(completedUploadedParts, &s3.CompletedPart{ completedUploadedParts[i] = &s3.CompletedPart{
ETag: part.ETag, ETag: part.ETag,
PartNumber: part.PartNumber, PartNumber: part.PartNumber,
}) }
} }
// This is an edge case when we are trying to upload an empty chunk of data using // This is an edge case when we are trying to upload an empty file as part of
// a MultiPart upload. As a result we are trying to complete the MultipartUpload // the MultiPart upload. We get a PUT with Content-Length: 0 and sad things happen.
// with an empty slice of `completedUploadedParts` which will always lead to 400 // The result is we are trying to Complete MultipartUpload with an empty list of
// being returned from S3 See: https://docs.aws.amazon.com/sdk-for-go/api/service/s3/#CompletedMultipartUpload // completedUploadedParts which will always lead to 400 being returned from S3
// Solution: we upload an empty i.e. 0 byte part as a single part and then append it // See: https://docs.aws.amazon.com/sdk-for-go/api/service/s3/#CompletedMultipartUpload
// Solution: we upload the empty i.e. 0 byte part as a single part and then append it
// to the completedUploadedParts slice used to complete the Multipart upload. // to the completedUploadedParts slice used to complete the Multipart upload.
if len(w.parts) == 0 { if len(w.parts) == 0 {
resp, err := w.driver.S3.UploadPart(&s3.UploadPartInput{ resp, err := w.driver.S3.UploadPart(&s3.UploadPartInput{
@ -1499,47 +1575,56 @@ func (w *writer) Commit() error {
}, },
}) })
if err != nil { if err != nil {
w.driver.S3.AbortMultipartUpload(&s3.AbortMultipartUploadInput{ if _, aErr := w.driver.S3.AbortMultipartUpload(&s3.AbortMultipartUploadInput{
Bucket: aws.String(w.driver.Bucket), Bucket: aws.String(w.driver.Bucket),
Key: aws.String(w.key), Key: aws.String(w.key),
UploadId: aws.String(w.uploadID), UploadId: aws.String(w.uploadID),
}) }); aErr != nil {
return errors.Join(err, aErr)
}
return err return err
} }
return nil return nil
} }
// flushPart flushes buffers to write a part to S3. // flush flushes all buffers to write a part to S3.
// Only called by Write (with both buffers full) and Close/Commit (always) // flush is only called by Write (with both buffers full) and Close/Commit (always)
func (w *writer) flushPart() error { func (w *writer) flush() error {
if len(w.readyPart) == 0 && len(w.pendingPart) == 0 { if w.ready.Len() == 0 && w.pending.Len() == 0 {
// nothing to write
return nil return nil
} }
if w.driver.MultipartCombineSmallPart && len(w.pendingPart) < int(w.driver.ChunkSize) {
// closing with a small pending part buf := bytes.NewBuffer(w.ready.data)
// combine ready and pending to avoid writing a small part if w.driver.MultipartCombineSmallPart && (w.pending.Len() > 0 && w.pending.Len() < int(w.driver.ChunkSize)) {
w.readyPart = append(w.readyPart, w.pendingPart...) if _, err := buf.Write(w.pending.data); err != nil {
w.pendingPart = nil return err
}
w.pending.Clear()
} }
partSize := buf.Len()
partNumber := aws.Int64(int64(len(w.parts) + 1)) partNumber := aws.Int64(int64(len(w.parts) + 1))
resp, err := w.driver.S3.UploadPart(&s3.UploadPartInput{ resp, err := w.driver.S3.UploadPart(&s3.UploadPartInput{
Bucket: aws.String(w.driver.Bucket), Bucket: aws.String(w.driver.Bucket),
Key: aws.String(w.key), Key: aws.String(w.key),
PartNumber: partNumber, PartNumber: partNumber,
UploadId: aws.String(w.uploadID), UploadId: aws.String(w.uploadID),
Body: bytes.NewReader(w.readyPart), Body: bytes.NewReader(buf.Bytes()),
}) })
if err != nil { if err != nil {
return err return err
} }
w.parts = append(w.parts, &s3.Part{ w.parts = append(w.parts, &s3.Part{
ETag: resp.ETag, ETag: resp.ETag,
PartNumber: partNumber, PartNumber: partNumber,
Size: aws.Int64(int64(len(w.readyPart))), Size: aws.Int64(int64(partSize)),
}) })
w.readyPart = w.pendingPart
w.pendingPart = nil // reset the flushed buffer and swap buffers
w.ready.Clear()
w.ready, w.pending = w.pending, w.ready
return nil return nil
} }