diff --git a/registry/storage/driver/s3-aws/s3.go b/registry/storage/driver/s3-aws/s3.go
index 3df5e5b3..bd61bccb 100644
--- a/registry/storage/driver/s3-aws/s3.go
+++ b/registry/storage/driver/s3-aws/s3.go
@@ -25,6 +25,7 @@ import (
 	"sort"
 	"strconv"
 	"strings"
+	"sync"
 	"time"
 
 	"github.com/aws/aws-sdk-go/aws"
@@ -48,6 +49,7 @@ const driverName = "s3aws"
 const minChunkSize = 5 << 20
 
 // maxChunkSize defines the maximum multipart upload chunk size allowed by S3.
+// The S3 API limits the maximum upload chunk size to 5GB.
 const maxChunkSize = 5 << 30
 
 const defaultChunkSize = 2 * minChunkSize
@@ -166,6 +168,7 @@ type driver struct {
 	RootDirectory               string
 	StorageClass                string
 	ObjectACL                   string
+	pool                        *sync.Pool
 }
 
 type baseEmbed struct {
@@ -580,6 +583,13 @@ func New(params DriverParameters) (*Driver, error) {
 		RootDirectory:               params.RootDirectory,
 		StorageClass:                params.StorageClass,
 		ObjectACL:                   params.ObjectACL,
+		pool: &sync.Pool{
+			New: func() interface{} {
+				return &buffer{
+					data: make([]byte, 0, params.ChunkSize),
+				}
+			},
+		},
 	}
 
 	return &Driver{
@@ -1251,21 +1261,64 @@ func (d *driver) getStorageClass() *string {
 	return aws.String(d.StorageClass)
 }
 
+// buffer is a fixed-size byte buffer.
+type buffer struct {
+	data []byte
+}
+
+// NewBuffer returns a buffer from the driver's memory pool.
+// The capacity of the buffer is fixed and set to params.ChunkSize.
+func (d *driver) NewBuffer() *buffer {
+	return d.pool.Get().(*buffer)
+}
+
+// ReadFrom reads from r until the buffer is full, without growing its capacity.
+// It returns the number of bytes read from r and any error encountered.
+func (b *buffer) ReadFrom(r io.Reader) (offset int64, err error) {
+	for len(b.data) < cap(b.data) && err == nil {
+		var n int
+		n, err = r.Read(b.data[len(b.data):cap(b.data)])
+		offset += int64(n)
+		b.data = b.data[:len(b.data)+n]
+	}
+	// NOTE(milosgajdos): io.ReaderFrom "swallows" io.EOF
+	// See: https://pkg.go.dev/io#ReaderFrom
+	if err == io.EOF {
+		err = nil
+	}
+	return offset, err
+}
+
+// Cap returns the capacity of the buffer's underlying byte slice.
+func (b *buffer) Cap() int {
+	return cap(b.data)
+}
+
+// Len returns the length of the data in the buffer.
+func (b *buffer) Len() int {
+	return len(b.data)
+}
+
+// Clear empties the buffer, retaining its capacity.
+func (b *buffer) Clear() {
+	b.data = b.data[:0]
+}
+
 // writer attempts to upload parts to S3 in a buffered fashion where the last
 // part is at least as large as the chunksize, so the multipart upload could be
 // cleanly resumed in the future. This is violated if Close is called after less
 // than a full chunk is written.
 type writer struct {
-	driver      *driver
-	key         string
-	uploadID    string
-	parts       []*s3.Part
-	size        int64
-	readyPart   []byte
-	pendingPart []byte
-	closed      bool
-	committed   bool
-	cancelled   bool
+	driver    *driver
+	key       string
+	uploadID  string
+	parts     []*s3.Part
+	size      int64
+	ready     *buffer
+	pending   *buffer
+	closed    bool
+	committed bool
+	cancelled bool
 }
 
 func (d *driver) newWriter(key, uploadID string, parts []*s3.Part) storagedriver.FileWriter {
@@ -1279,6 +1332,8 @@ func (d *driver) newWriter(key, uploadID string, parts []*s3.Part) storagedriver
 		uploadID: uploadID,
 		parts:    parts,
 		size:     size,
+		ready:    d.NewBuffer(),
+		pending:  d.NewBuffer(),
 	}
 }
 
@@ -1300,12 +1355,12 @@ func (w *writer) Write(p []byte) (int, error) {
 	// If the last written part is smaller than minChunkSize, we need to make a
 	// new multipart upload :sadface:
 	if len(w.parts) > 0 && int(*w.parts[len(w.parts)-1].Size) < minChunkSize {
-		var completedUploadedParts completedParts
-		for _, part := range w.parts {
-			completedUploadedParts = append(completedUploadedParts, &s3.CompletedPart{
+		completedUploadedParts := make(completedParts, len(w.parts))
+		for i, part := range w.parts {
+			completedUploadedParts[i] = &s3.CompletedPart{
 				ETag:       part.ETag,
 				PartNumber: part.PartNumber,
-			})
+			}
 		}
 
 		sort.Sort(completedUploadedParts)
@@ -1319,11 +1374,13 @@ func (w *writer) Write(p []byte) (int, error) {
 			},
 		})
 		if err != nil {
-			w.driver.S3.AbortMultipartUpload(&s3.AbortMultipartUploadInput{
+			if _, aErr := w.driver.S3.AbortMultipartUpload(&s3.AbortMultipartUploadInput{
 				Bucket:   aws.String(w.driver.Bucket),
 				Key:      aws.String(w.key),
 				UploadId: aws.String(w.uploadID),
-			})
+			}); aErr != nil {
+				return 0, errors.Join(err, aErr)
+			}
 			return 0, err
 		}
 
@@ -1351,11 +1408,18 @@ func (w *writer) Write(p []byte) (int, error) {
 			return 0, err
 		}
 		defer resp.Body.Close()
+
+		// reset uploaded parts
 		w.parts = nil
-		w.readyPart, err = io.ReadAll(resp.Body)
+		w.ready.Clear()
+
+		n, err := w.ready.ReadFrom(resp.Body)
 		if err != nil {
 			return 0, err
 		}
+		if resp.ContentLength != nil && n < *resp.ContentLength {
+			return 0, io.ErrShortBuffer
+		}
 	} else {
 		// Otherwise we can use the old file as the new first part
 		copyPartResp, err := w.driver.S3.UploadPartCopy(&s3.UploadPartCopyInput{
@@ -1380,51 +1444,60 @@ func (w *writer) Write(p []byte) (int, error) {
 
 	var n int
 
-	for len(p) > 0 {
-		// If no parts are ready to write, fill up the first part
-		if neededBytes := int(w.driver.ChunkSize) - len(w.readyPart); neededBytes > 0 {
-			if len(p) >= neededBytes {
-				w.readyPart = append(w.readyPart, p[:neededBytes]...)
-				n += neededBytes
-				p = p[neededBytes:]
-			} else {
-				w.readyPart = append(w.readyPart, p...)
-				n += len(p)
-				p = nil
-			}
+	defer func() { w.size += int64(n) }()
+
+	reader := bytes.NewReader(p)
+
+	for reader.Len() > 0 {
+		// NOTE(milosgajdos): we do some seemingly unsafe conversions
+		// from int64 to int in this for loop. These are fine because the
+		// offset returned by buffer.ReadFrom can never exceed
+		// maxChunkSize, which fits into an int. The reason we return
+		// int64 is to play nice with Go interfaces: the buffer
+		// implements the io.ReaderFrom interface.
+
+		// fill up the ready parts buffer
+		offset, err := w.ready.ReadFrom(reader)
+		n += int(offset)
+		if err != nil {
+			return n, err
 		}
 
-		if neededBytes := int(w.driver.ChunkSize) - len(w.pendingPart); neededBytes > 0 {
-			if len(p) >= neededBytes {
-				w.pendingPart = append(w.pendingPart, p[:neededBytes]...)
-				n += neededBytes
-				p = p[neededBytes:]
-				err := w.flushPart()
-				if err != nil {
-					w.size += int64(n)
-					return n, err
-				}
-			} else {
-				w.pendingPart = append(w.pendingPart, p...)
-				n += len(p)
-				p = nil
+		// try filling up the pending parts buffer
+		offset, err = w.pending.ReadFrom(reader)
+		n += int(offset)
+		if err != nil {
+			return n, err
+		}
+
+		// we filled up the pending buffer: flush
+		if w.pending.Len() == w.pending.Cap() {
+			if err := w.flush(); err != nil {
+				return n, err
 			}
 		}
 	}
 
-	w.size += int64(n)
 	return n, nil
 }
 
 func (w *writer) Size() int64 {
 	return w.size
 }
 
-
 func (w *writer) Close() error {
 	if w.closed {
 		return fmt.Errorf("already closed")
 	}
 	w.closed = true
-	return w.flushPart()
+
+	defer func() {
+		w.ready.Clear()
+		w.driver.pool.Put(w.ready)
+		w.pending.Clear()
+		w.driver.pool.Put(w.pending)
+	}()
+
+	return w.flush()
 }
 
 func (w *writer) Cancel(ctx context.Context) error {
@@ -1450,25 +1523,28 @@ func (w *writer) Commit() error {
 	} else if w.cancelled {
 		return fmt.Errorf("already cancelled")
 	}
-	err := w.flushPart()
+
+	err := w.flush()
 	if err != nil {
 		return err
 	}
+
 	w.committed = true
 
-	completedUploadedParts := make(completedParts, 0, len(w.parts))
-	for _, part := range w.parts {
-		completedUploadedParts = append(completedUploadedParts, &s3.CompletedPart{
+	completedUploadedParts := make(completedParts, len(w.parts))
+	for i, part := range w.parts {
+		completedUploadedParts[i] = &s3.CompletedPart{
 			ETag:       part.ETag,
 			PartNumber: part.PartNumber,
-		})
+		}
 	}
 
-	// This is an edge case when we are trying to upload an empty chunk of data using
-	// a MultiPart upload. As a result we are trying to complete the MultipartUpload
-	// with an empty slice of `completedUploadedParts` which will always lead to 400
-	// being returned from S3 See: https://docs.aws.amazon.com/sdk-for-go/api/service/s3/#CompletedMultipartUpload
-	// Solution: we upload an empty i.e. 0 byte part as a single part and then append it
+	// This is an edge case when we are trying to upload an empty file as part of
+	// the MultiPart upload. We get a PUT with Content-Length: 0 and sad things happen.
+	// The result is we are trying to Complete MultipartUpload with an empty list of
+	// completedUploadedParts which will always lead to 400 being returned from S3
+	// See: https://docs.aws.amazon.com/sdk-for-go/api/service/s3/#CompletedMultipartUpload
+	// Solution: we upload the empty i.e. 0 byte part as a single part and then append it
	// to the completedUploadedParts slice used to complete the Multipart upload.
 	if len(w.parts) == 0 {
 		resp, err := w.driver.S3.UploadPart(&s3.UploadPartInput{
@@ -1499,47 +1575,56 @@ func (w *writer) Commit() error {
 		},
 	})
 	if err != nil {
-		w.driver.S3.AbortMultipartUpload(&s3.AbortMultipartUploadInput{
+		if _, aErr := w.driver.S3.AbortMultipartUpload(&s3.AbortMultipartUploadInput{
 			Bucket:   aws.String(w.driver.Bucket),
 			Key:      aws.String(w.key),
 			UploadId: aws.String(w.uploadID),
-		})
+		}); aErr != nil {
+			return errors.Join(err, aErr)
+		}
 		return err
 	}
 	return nil
 }
 
-// flushPart flushes buffers to write a part to S3.
-// Only called by Write (with both buffers full) and Close/Commit (always)
-func (w *writer) flushPart() error {
-	if len(w.readyPart) == 0 && len(w.pendingPart) == 0 {
-		// nothing to write
+// flush flushes all buffers to write a part to S3.
+// flush is only called by Write (with both buffers full) and Close/Commit (always).
+func (w *writer) flush() error {
+	if w.ready.Len() == 0 && w.pending.Len() == 0 {
 		return nil
 	}
-	if w.driver.MultipartCombineSmallPart && len(w.pendingPart) < int(w.driver.ChunkSize) {
-		// closing with a small pending part
-		// combine ready and pending to avoid writing a small part
-		w.readyPart = append(w.readyPart, w.pendingPart...)
-		w.pendingPart = nil
+
+	buf := bytes.NewBuffer(w.ready.data)
+	if w.driver.MultipartCombineSmallPart && (w.pending.Len() > 0 && w.pending.Len() < int(w.driver.ChunkSize)) {
+		if _, err := buf.Write(w.pending.data); err != nil {
+			return err
+		}
+		w.pending.Clear()
 	}
 
+	partSize := buf.Len()
 	partNumber := aws.Int64(int64(len(w.parts) + 1))
+
 	resp, err := w.driver.S3.UploadPart(&s3.UploadPartInput{
 		Bucket:     aws.String(w.driver.Bucket),
 		Key:        aws.String(w.key),
 		PartNumber: partNumber,
 		UploadId:   aws.String(w.uploadID),
-		Body:       bytes.NewReader(w.readyPart),
+		Body:       bytes.NewReader(buf.Bytes()),
 	})
 	if err != nil {
 		return err
 	}
+
 	w.parts = append(w.parts, &s3.Part{
 		ETag:       resp.ETag,
 		PartNumber: partNumber,
-		Size:       aws.Int64(int64(len(w.readyPart))),
+		Size:       aws.Int64(int64(partSize)),
 	})
-	w.readyPart = w.pendingPart
-	w.pendingPart = nil
+
+	// reset the flushed buffer and swap buffers
+	w.ready.Clear()
+	w.ready, w.pending = w.pending, w.ready
+
 	return nil
 }
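
To make the pooled-buffer semantics above easier to verify in isolation, here is a minimal standalone sketch of the fixed-capacity buffer plus sync.Pool pattern this patch introduces. The buffer type and the ReadFrom loop mirror the diff; the chunkSize constant and the demo input are made up for illustration and are far smaller than the driver's real ChunkSize (5MB or more).

package main

import (
	"fmt"
	"io"
	"strings"
	"sync"
)

// buffer mirrors the patch: a byte slice whose capacity never grows.
type buffer struct {
	data []byte
}

// ReadFrom fills the remaining capacity from r. Per the io.ReaderFrom
// contract, io.EOF is swallowed rather than returned.
func (b *buffer) ReadFrom(r io.Reader) (offset int64, err error) {
	for len(b.data) < cap(b.data) && err == nil {
		var n int
		n, err = r.Read(b.data[len(b.data):cap(b.data)])
		offset += int64(n)
		b.data = b.data[:len(b.data)+n]
	}
	if err == io.EOF {
		err = nil
	}
	return offset, err
}

func main() {
	const chunkSize = 8 // illustrative stand-in for params.ChunkSize

	pool := &sync.Pool{
		New: func() interface{} {
			return &buffer{data: make([]byte, 0, chunkSize)}
		},
	}

	buf := pool.Get().(*buffer)
	n, err := buf.ReadFrom(strings.NewReader("hello, world"))
	fmt.Println(n, err, string(buf.data)) // 8 <nil> hello, w: reads stop at capacity

	buf.data = buf.data[:0] // Clear before returning to the pool
	pool.Put(buf)
}

Note how ReadFrom stops at capacity instead of growing the slice, and swallows io.EOF; that is why the patched Write checks resp.ContentLength separately and returns io.ErrShortBuffer when the object does not fit.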
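
Similarly, a small sketch of the ready/pending double-buffering that the new Write and flush implement: Write fills ready first, spills into pending, and flushes whenever pending is full; flush "uploads" ready, clears it, and swaps the buffers so the pending data becomes the next part. The parts slice of ints below is a hypothetical stand-in for the []*s3.Part bookkeeping and the S3.UploadPart call, and the tiny chunk size is illustrative only.

package main

import (
	"bytes"
	"fmt"
	"io"
)

const chunkSize = 4 // illustrative; the driver uses params.ChunkSize (>= 5MB)

// buffer is the same fixed-capacity buffer as in the previous sketch.
type buffer struct{ data []byte }

func (b *buffer) ReadFrom(r io.Reader) (offset int64, err error) {
	for len(b.data) < cap(b.data) && err == nil {
		var n int
		n, err = r.Read(b.data[len(b.data):cap(b.data)])
		offset += int64(n)
		b.data = b.data[:len(b.data)+n]
	}
	if err == io.EOF {
		err = nil
	}
	return offset, err
}

type writer struct {
	ready, pending *buffer
	parts          []int // uploaded part sizes; stands in for []*s3.Part
}

// flush "uploads" the ready buffer as one part, then swaps the buffers so
// the pending data becomes the next ready part, as in the patch.
func (w *writer) flush() {
	w.parts = append(w.parts, len(w.ready.data))
	w.ready.data = w.ready.data[:0]
	w.ready, w.pending = w.pending, w.ready
}

func (w *writer) Write(p []byte) (int, error) {
	reader := bytes.NewReader(p)
	var n int
	for reader.Len() > 0 {
		off, _ := w.ready.ReadFrom(reader) // fill ready first
		n += int(off)
		off, _ = w.pending.ReadFrom(reader) // then spill into pending
		n += int(off)
		if len(w.pending.data) == cap(w.pending.data) {
			w.flush() // pending full: upload ready, swap buffers
		}
	}
	return n, nil
}

func main() {
	w := &writer{
		ready:   &buffer{data: make([]byte, 0, chunkSize)},
		pending: &buffer{data: make([]byte, 0, chunkSize)},
	}
	w.Write(bytes.Repeat([]byte("x"), 10))
	// One full 4-byte part was "uploaded"; 4 bytes sit in ready and 2 in
	// pending, left for Close/Commit to flush.
	fmt.Println(w.parts) // [4]
}

The swap is what lets the writer keep the invariant the writer doc comment describes: every flushed part except possibly the last is a full chunk, so the multipart upload can be cleanly resumed.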