From b888b14b3984f5bdbec7305a807fcdc8afa9b980 Mon Sep 17 00:00:00 2001 From: Milos Gajdos Date: Fri, 22 Sep 2023 14:53:47 +0100 Subject: [PATCH] Optimise push in S3 driver This commit cleans up and attempts to optimise the performance of image push in S3 driver. There are 2 main changes: * we refactor the S3 driver Writer where instead of using separate bytes slices for ready and pending parts which get constantly appended data into them causing unnecessary allocations we use optimised bytes buffers; we make sure these are used efficiently when written to. * we introduce a memory pool that is used for allocating the byte buffers introduced above These changes should alleviate high memory pressure on the push path to S3. Co-authored-by: Cory Snider Signed-off-by: Milos Gajdos --- registry/storage/driver/s3-aws/s3.go | 241 ++++++++++++++++++--------- 1 file changed, 163 insertions(+), 78 deletions(-) diff --git a/registry/storage/driver/s3-aws/s3.go b/registry/storage/driver/s3-aws/s3.go index 3df5e5b3..6e833753 100644 --- a/registry/storage/driver/s3-aws/s3.go +++ b/registry/storage/driver/s3-aws/s3.go @@ -25,6 +25,7 @@ import ( "sort" "strconv" "strings" + "sync" "time" "github.com/aws/aws-sdk-go/aws" @@ -48,6 +49,7 @@ const driverName = "s3aws" const minChunkSize = 5 << 20 // maxChunkSize defines the maximum multipart upload chunk size allowed by S3. +// S3 API requires max upload chunk to be 5GB. const maxChunkSize = 5 << 30 const defaultChunkSize = 2 * minChunkSize @@ -166,6 +168,7 @@ type driver struct { RootDirectory string StorageClass string ObjectACL string + pool *sync.Pool } type baseEmbed struct { @@ -580,6 +583,13 @@ func New(params DriverParameters) (*Driver, error) { RootDirectory: params.RootDirectory, StorageClass: params.StorageClass, ObjectACL: params.ObjectACL, + pool: &sync.Pool{ + New: func() interface{} { + return &buffer{ + data: make([]byte, 0, params.ChunkSize), + } + }, + }, } return &Driver{ @@ -1251,21 +1261,70 @@ func (d *driver) getStorageClass() *string { return aws.String(d.StorageClass) } +type completedParts []*s3.CompletedPart + +func (a completedParts) Len() int { return len(a) } +func (a completedParts) Swap(i, j int) { a[i], a[j] = a[j], a[i] } +func (a completedParts) Less(i, j int) bool { return *a[i].PartNumber < *a[j].PartNumber } + +// buffer is a static size bytes buffer. +type buffer struct { + data []byte +} + +// NewBuffer returns a new bytes buffer from driver's memory pool. +// The size of the buffer is static and set to params.ChunkSize. +func (d *driver) NewBuffer() *buffer { + return d.pool.Get().(*buffer) +} + +// ReadFrom reads as much data as it can fit in from r without growing its size. +// It returns the number of bytes successfully read from r or error. +func (b *buffer) ReadFrom(r io.Reader) (offset int64, err error) { + for len(b.data) < cap(b.data) && err == nil { + var n int + n, err = r.Read(b.data[len(b.data):cap(b.data)]) + offset += int64(n) + b.data = b.data[:len(b.data)+n] + } + // NOTE(milosgajdos): io.ReaderFrom "swallows" io.EOF + // See: https://pkg.go.dev/io#ReaderFrom + if err == io.EOF { + err = nil + } + return offset, err +} + +// Cap returns the capacity of the buffer's underlying byte slice. +func (b *buffer) Cap() int { + return cap(b.data) +} + +// Len returns the length of the data in the buffer +func (b *buffer) Len() int { + return len(b.data) +} + +// Clear the buffer data. +func (b *buffer) Clear() { + b.data = b.data[:0] +} + // writer attempts to upload parts to S3 in a buffered fashion where the last // part is at least as large as the chunksize, so the multipart upload could be // cleanly resumed in the future. This is violated if Close is called after less // than a full chunk is written. type writer struct { - driver *driver - key string - uploadID string - parts []*s3.Part - size int64 - readyPart []byte - pendingPart []byte - closed bool - committed bool - cancelled bool + driver *driver + key string + uploadID string + parts []*s3.Part + size int64 + ready *buffer + pending *buffer + closed bool + committed bool + cancelled bool } func (d *driver) newWriter(key, uploadID string, parts []*s3.Part) storagedriver.FileWriter { @@ -1279,15 +1338,11 @@ func (d *driver) newWriter(key, uploadID string, parts []*s3.Part) storagedriver uploadID: uploadID, parts: parts, size: size, + ready: d.NewBuffer(), + pending: d.NewBuffer(), } } -type completedParts []*s3.CompletedPart - -func (a completedParts) Len() int { return len(a) } -func (a completedParts) Swap(i, j int) { a[i], a[j] = a[j], a[i] } -func (a completedParts) Less(i, j int) bool { return *a[i].PartNumber < *a[j].PartNumber } - func (w *writer) Write(p []byte) (int, error) { if w.closed { return 0, fmt.Errorf("already closed") @@ -1300,12 +1355,12 @@ func (w *writer) Write(p []byte) (int, error) { // If the last written part is smaller than minChunkSize, we need to make a // new multipart upload :sadface: if len(w.parts) > 0 && int(*w.parts[len(w.parts)-1].Size) < minChunkSize { - var completedUploadedParts completedParts - for _, part := range w.parts { - completedUploadedParts = append(completedUploadedParts, &s3.CompletedPart{ + completedUploadedParts := make(completedParts, len(w.parts)) + for i, part := range w.parts { + completedUploadedParts[i] = &s3.CompletedPart{ ETag: part.ETag, PartNumber: part.PartNumber, - }) + } } sort.Sort(completedUploadedParts) @@ -1319,11 +1374,13 @@ func (w *writer) Write(p []byte) (int, error) { }, }) if err != nil { - w.driver.S3.AbortMultipartUpload(&s3.AbortMultipartUploadInput{ + if _, aErr := w.driver.S3.AbortMultipartUpload(&s3.AbortMultipartUploadInput{ Bucket: aws.String(w.driver.Bucket), Key: aws.String(w.key), UploadId: aws.String(w.uploadID), - }) + }); aErr != nil { + return 0, errors.Join(err, aErr) + } return 0, err } @@ -1351,11 +1408,18 @@ func (w *writer) Write(p []byte) (int, error) { return 0, err } defer resp.Body.Close() + + // reset uploaded parts w.parts = nil - w.readyPart, err = io.ReadAll(resp.Body) + w.ready.Clear() + + n, err := w.ready.ReadFrom(resp.Body) if err != nil { return 0, err } + if resp.ContentLength != nil && n < *resp.ContentLength { + return 0, io.ErrShortBuffer + } } else { // Otherwise we can use the old file as the new first part copyPartResp, err := w.driver.S3.UploadPartCopy(&s3.UploadPartCopyInput{ @@ -1380,51 +1444,60 @@ func (w *writer) Write(p []byte) (int, error) { var n int - for len(p) > 0 { - // If no parts are ready to write, fill up the first part - if neededBytes := int(w.driver.ChunkSize) - len(w.readyPart); neededBytes > 0 { - if len(p) >= neededBytes { - w.readyPart = append(w.readyPart, p[:neededBytes]...) - n += neededBytes - p = p[neededBytes:] - } else { - w.readyPart = append(w.readyPart, p...) - n += len(p) - p = nil - } + defer func() { w.size += int64(n) }() + + reader := bytes.NewReader(p) + + for reader.Len() > 0 { + // NOTE(milosgajdos): we do some seemingly unsafe conversions + // from int64 to int in this for loop. These are fine as the + // offset returned from buffer.ReadFrom can only ever be + // maxChunkSize large which fits in to int. The reason why + // we return int64 is to play nice with Go interfaces where + // the buffer implements io.ReaderFrom interface. + + // fill up the ready parts buffer + offset, err := w.ready.ReadFrom(reader) + n += int(offset) + if err != nil { + return n, err } - if neededBytes := int(w.driver.ChunkSize) - len(w.pendingPart); neededBytes > 0 { - if len(p) >= neededBytes { - w.pendingPart = append(w.pendingPart, p[:neededBytes]...) - n += neededBytes - p = p[neededBytes:] - err := w.flushPart() - if err != nil { - w.size += int64(n) - return n, err - } - } else { - w.pendingPart = append(w.pendingPart, p...) - n += len(p) - p = nil + // try filling up the pending parts buffer + offset, err = w.pending.ReadFrom(reader) + n += int(offset) + if err != nil { + return n, err + } + + // we filled up pending buffer, flush + if w.pending.Len() == w.pending.Cap() { + if err := w.flush(); err != nil { + return n, err } } } - w.size += int64(n) + return n, nil } func (w *writer) Size() int64 { return w.size } - func (w *writer) Close() error { if w.closed { return fmt.Errorf("already closed") } w.closed = true - return w.flushPart() + + defer func() { + w.ready.Clear() + w.driver.pool.Put(w.ready) + w.pending.Clear() + w.driver.pool.Put(w.pending) + }() + + return w.flush() } func (w *writer) Cancel(ctx context.Context) error { @@ -1450,25 +1523,28 @@ func (w *writer) Commit() error { } else if w.cancelled { return fmt.Errorf("already cancelled") } - err := w.flushPart() + + err := w.flush() if err != nil { return err } + w.committed = true - completedUploadedParts := make(completedParts, 0, len(w.parts)) - for _, part := range w.parts { - completedUploadedParts = append(completedUploadedParts, &s3.CompletedPart{ + completedUploadedParts := make(completedParts, len(w.parts)) + for i, part := range w.parts { + completedUploadedParts[i] = &s3.CompletedPart{ ETag: part.ETag, PartNumber: part.PartNumber, - }) + } } - // This is an edge case when we are trying to upload an empty chunk of data using - // a MultiPart upload. As a result we are trying to complete the MultipartUpload - // with an empty slice of `completedUploadedParts` which will always lead to 400 - // being returned from S3 See: https://docs.aws.amazon.com/sdk-for-go/api/service/s3/#CompletedMultipartUpload - // Solution: we upload an empty i.e. 0 byte part as a single part and then append it + // This is an edge case when we are trying to upload an empty file as part of + // the MultiPart upload. We get a PUT with Content-Length: 0 and sad things happen. + // The result is we are trying to Complete MultipartUpload with an empty list of + // completedUploadedParts which will always lead to 400 being returned from S3 + // See: https://docs.aws.amazon.com/sdk-for-go/api/service/s3/#CompletedMultipartUpload + // Solution: we upload the empty i.e. 0 byte part as a single part and then append it // to the completedUploadedParts slice used to complete the Multipart upload. if len(w.parts) == 0 { resp, err := w.driver.S3.UploadPart(&s3.UploadPartInput{ @@ -1499,47 +1575,56 @@ func (w *writer) Commit() error { }, }) if err != nil { - w.driver.S3.AbortMultipartUpload(&s3.AbortMultipartUploadInput{ + if _, aErr := w.driver.S3.AbortMultipartUpload(&s3.AbortMultipartUploadInput{ Bucket: aws.String(w.driver.Bucket), Key: aws.String(w.key), UploadId: aws.String(w.uploadID), - }) + }); aErr != nil { + return errors.Join(err, aErr) + } return err } return nil } -// flushPart flushes buffers to write a part to S3. -// Only called by Write (with both buffers full) and Close/Commit (always) -func (w *writer) flushPart() error { - if len(w.readyPart) == 0 && len(w.pendingPart) == 0 { - // nothing to write +// flush flushes all buffers to write a part to S3. +// flush is only called by Write (with both buffers full) and Close/Commit (always) +func (w *writer) flush() error { + if w.ready.Len() == 0 && w.pending.Len() == 0 { return nil } - if w.driver.MultipartCombineSmallPart && len(w.pendingPart) < int(w.driver.ChunkSize) { - // closing with a small pending part - // combine ready and pending to avoid writing a small part - w.readyPart = append(w.readyPart, w.pendingPart...) - w.pendingPart = nil + + buf := bytes.NewBuffer(w.ready.data) + if w.driver.MultipartCombineSmallPart && (w.pending.Len() > 0 && w.pending.Len() < int(w.driver.ChunkSize)) { + if _, err := buf.Write(w.pending.data); err != nil { + return err + } + w.pending.Clear() } + partSize := buf.Len() partNumber := aws.Int64(int64(len(w.parts) + 1)) + resp, err := w.driver.S3.UploadPart(&s3.UploadPartInput{ Bucket: aws.String(w.driver.Bucket), Key: aws.String(w.key), PartNumber: partNumber, UploadId: aws.String(w.uploadID), - Body: bytes.NewReader(w.readyPart), + Body: bytes.NewReader(buf.Bytes()), }) if err != nil { return err } + w.parts = append(w.parts, &s3.Part{ ETag: resp.ETag, PartNumber: partNumber, - Size: aws.Int64(int64(len(w.readyPart))), + Size: aws.Int64(int64(partSize)), }) - w.readyPart = w.pendingPart - w.pendingPart = nil + + // reset the flushed buffer and swap buffers + w.ready.Clear() + w.ready, w.pending = w.pending, w.ready + return nil }