Propagate storage driver context to S3 API calls

Only some of the S3 storage driver calls were propagating context to the
S3 API calls. This commit updates the S3 storage drivers so the context
is propagated to all the S3 API calls.

Signed-off-by: Milos Gajdos <milosthegajdos@gmail.com>
This commit is contained in:
Milos Gajdos 2023-09-03 21:54:54 +01:00
parent a2e65220ae
commit dcdd8bb740
No known key found for this signature in database
GPG key ID: 01300E5E6D417439

View file

@ -608,7 +608,7 @@ func (d *driver) GetContent(ctx context.Context, path string) ([]byte, error) {
// PutContent stores the []byte content at a location designated by "path". // PutContent stores the []byte content at a location designated by "path".
func (d *driver) PutContent(ctx context.Context, path string, contents []byte) error { func (d *driver) PutContent(ctx context.Context, path string, contents []byte) error {
_, err := d.S3.PutObject(&s3.PutObjectInput{ _, err := d.S3.PutObjectWithContext(ctx, &s3.PutObjectInput{
Bucket: aws.String(d.Bucket), Bucket: aws.String(d.Bucket),
Key: aws.String(d.s3Path(path)), Key: aws.String(d.s3Path(path)),
ContentType: d.getContentType(), ContentType: d.getContentType(),
@ -624,7 +624,7 @@ func (d *driver) PutContent(ctx context.Context, path string, contents []byte) e
// Reader retrieves an io.ReadCloser for the content stored at "path" with a // Reader retrieves an io.ReadCloser for the content stored at "path" with a
// given byte offset. // given byte offset.
func (d *driver) Reader(ctx context.Context, path string, offset int64) (io.ReadCloser, error) { func (d *driver) Reader(ctx context.Context, path string, offset int64) (io.ReadCloser, error) {
resp, err := d.S3.GetObject(&s3.GetObjectInput{ resp, err := d.S3.GetObjectWithContext(ctx, &s3.GetObjectInput{
Bucket: aws.String(d.Bucket), Bucket: aws.String(d.Bucket),
Key: aws.String(d.s3Path(path)), Key: aws.String(d.s3Path(path)),
Range: aws.String("bytes=" + strconv.FormatInt(offset, 10) + "-"), Range: aws.String("bytes=" + strconv.FormatInt(offset, 10) + "-"),
@ -645,7 +645,7 @@ func (d *driver) Writer(ctx context.Context, path string, appendParam bool) (sto
key := d.s3Path(path) key := d.s3Path(path)
if !appendParam { if !appendParam {
// TODO (brianbland): cancel other uploads at this path // TODO (brianbland): cancel other uploads at this path
resp, err := d.S3.CreateMultipartUpload(&s3.CreateMultipartUploadInput{ resp, err := d.S3.CreateMultipartUploadWithContext(ctx, &s3.CreateMultipartUploadInput{
Bucket: aws.String(d.Bucket), Bucket: aws.String(d.Bucket),
Key: aws.String(key), Key: aws.String(key),
ContentType: d.getContentType(), ContentType: d.getContentType(),
@ -665,7 +665,7 @@ func (d *driver) Writer(ctx context.Context, path string, appendParam bool) (sto
Prefix: aws.String(key), Prefix: aws.String(key),
} }
for { for {
resp, err := d.S3.ListMultipartUploads(listMultipartUploadsInput) resp, err := d.S3.ListMultipartUploadsWithContext(ctx, listMultipartUploadsInput)
if err != nil { if err != nil {
return nil, parseError(path, err) return nil, parseError(path, err)
} }
@ -683,7 +683,7 @@ func (d *driver) Writer(ctx context.Context, path string, appendParam bool) (sto
continue continue
} }
partsList, err := d.S3.ListParts(&s3.ListPartsInput{ partsList, err := d.S3.ListPartsWithContext(ctx, &s3.ListPartsInput{
Bucket: aws.String(d.Bucket), Bucket: aws.String(d.Bucket),
Key: aws.String(key), Key: aws.String(key),
UploadId: multi.UploadId, UploadId: multi.UploadId,
@ -693,7 +693,7 @@ func (d *driver) Writer(ctx context.Context, path string, appendParam bool) (sto
} }
allParts = append(allParts, partsList.Parts...) allParts = append(allParts, partsList.Parts...)
for *partsList.IsTruncated { for *partsList.IsTruncated {
partsList, err = d.S3.ListParts(&s3.ListPartsInput{ partsList, err = d.S3.ListPartsWithContext(ctx, &s3.ListPartsInput{
Bucket: aws.String(d.Bucket), Bucket: aws.String(d.Bucket),
Key: aws.String(key), Key: aws.String(key),
UploadId: multi.UploadId, UploadId: multi.UploadId,
@ -722,7 +722,7 @@ func (d *driver) Writer(ctx context.Context, path string, appendParam bool) (sto
// Stat retrieves the FileInfo for the given path, including the current size // Stat retrieves the FileInfo for the given path, including the current size
// in bytes and the creation time. // in bytes and the creation time.
func (d *driver) Stat(ctx context.Context, path string) (storagedriver.FileInfo, error) { func (d *driver) Stat(ctx context.Context, path string) (storagedriver.FileInfo, error) {
resp, err := d.S3.ListObjectsV2(&s3.ListObjectsV2Input{ resp, err := d.S3.ListObjectsV2WithContext(ctx, &s3.ListObjectsV2Input{
Bucket: aws.String(d.Bucket), Bucket: aws.String(d.Bucket),
Prefix: aws.String(d.s3Path(path)), Prefix: aws.String(d.s3Path(path)),
MaxKeys: aws.Int64(1), MaxKeys: aws.Int64(1),
@ -767,7 +767,7 @@ func (d *driver) List(ctx context.Context, opath string) ([]string, error) {
prefix = "/" prefix = "/"
} }
resp, err := d.S3.ListObjectsV2(&s3.ListObjectsV2Input{ resp, err := d.S3.ListObjectsV2WithContext(ctx, &s3.ListObjectsV2Input{
Bucket: aws.String(d.Bucket), Bucket: aws.String(d.Bucket),
Prefix: aws.String(d.s3Path(path)), Prefix: aws.String(d.s3Path(path)),
Delimiter: aws.String("/"), Delimiter: aws.String("/"),
@ -791,7 +791,7 @@ func (d *driver) List(ctx context.Context, opath string) ([]string, error) {
} }
if *resp.IsTruncated { if *resp.IsTruncated {
resp, err = d.S3.ListObjectsV2(&s3.ListObjectsV2Input{ resp, err = d.S3.ListObjectsV2WithContext(ctx, &s3.ListObjectsV2Input{
Bucket: aws.String(d.Bucket), Bucket: aws.String(d.Bucket),
Prefix: aws.String(d.s3Path(path)), Prefix: aws.String(d.s3Path(path)),
Delimiter: aws.String("/"), Delimiter: aws.String("/"),
@ -841,7 +841,7 @@ func (d *driver) copy(ctx context.Context, sourcePath string, destPath string) e
} }
if fileInfo.Size() <= d.MultipartCopyThresholdSize { if fileInfo.Size() <= d.MultipartCopyThresholdSize {
_, err := d.S3.CopyObject(&s3.CopyObjectInput{ _, err := d.S3.CopyObjectWithContext(ctx, &s3.CopyObjectInput{
Bucket: aws.String(d.Bucket), Bucket: aws.String(d.Bucket),
Key: aws.String(d.s3Path(destPath)), Key: aws.String(d.s3Path(destPath)),
ContentType: d.getContentType(), ContentType: d.getContentType(),
@ -857,7 +857,7 @@ func (d *driver) copy(ctx context.Context, sourcePath string, destPath string) e
return nil return nil
} }
createResp, err := d.S3.CreateMultipartUpload(&s3.CreateMultipartUploadInput{ createResp, err := d.S3.CreateMultipartUploadWithContext(ctx, &s3.CreateMultipartUploadInput{
Bucket: aws.String(d.Bucket), Bucket: aws.String(d.Bucket),
Key: aws.String(d.s3Path(destPath)), Key: aws.String(d.s3Path(destPath)),
ContentType: d.getContentType(), ContentType: d.getContentType(),
@ -884,7 +884,7 @@ func (d *driver) copy(ctx context.Context, sourcePath string, destPath string) e
if lastByte >= fileInfo.Size() { if lastByte >= fileInfo.Size() {
lastByte = fileInfo.Size() - 1 lastByte = fileInfo.Size() - 1
} }
uploadResp, err := d.S3.UploadPartCopy(&s3.UploadPartCopyInput{ uploadResp, err := d.S3.UploadPartCopyWithContext(ctx, &s3.UploadPartCopyInput{
Bucket: aws.String(d.Bucket), Bucket: aws.String(d.Bucket),
CopySource: aws.String(d.Bucket + "/" + d.s3Path(sourcePath)), CopySource: aws.String(d.Bucket + "/" + d.s3Path(sourcePath)),
Key: aws.String(d.s3Path(destPath)), Key: aws.String(d.s3Path(destPath)),
@ -910,7 +910,7 @@ func (d *driver) copy(ctx context.Context, sourcePath string, destPath string) e
} }
} }
_, err = d.S3.CompleteMultipartUpload(&s3.CompleteMultipartUploadInput{ _, err = d.S3.CompleteMultipartUploadWithContext(ctx, &s3.CompleteMultipartUploadInput{
Bucket: aws.String(d.Bucket), Bucket: aws.String(d.Bucket),
Key: aws.String(d.s3Path(destPath)), Key: aws.String(d.s3Path(destPath)),
UploadId: createResp.UploadId, UploadId: createResp.UploadId,
@ -931,7 +931,7 @@ func (d *driver) Delete(ctx context.Context, path string) error {
for { for {
// list all the objects // list all the objects
resp, err := d.S3.ListObjectsV2(listObjectsInput) resp, err := d.S3.ListObjectsV2WithContext(ctx, listObjectsInput)
// resp.Contents can only be empty on the first call // resp.Contents can only be empty on the first call
// if there were no more results to return after the first call, resp.IsTruncated would have been false // if there were no more results to return after the first call, resp.IsTruncated would have been false
@ -956,7 +956,7 @@ func (d *driver) Delete(ctx context.Context, path string) error {
// by default the response returns up to 1,000 key names. The response _might_ contain fewer keys but it will never contain more. // by default the response returns up to 1,000 key names. The response _might_ contain fewer keys but it will never contain more.
// 10000 keys is coincidentally (?) also the max number of keys that can be deleted in a single Delete operation, so we'll just smack // 10000 keys is coincidentally (?) also the max number of keys that can be deleted in a single Delete operation, so we'll just smack
// Delete here straight away and reset the object slice when successful. // Delete here straight away and reset the object slice when successful.
resp, err := d.S3.DeleteObjects(&s3.DeleteObjectsInput{ resp, err := d.S3.DeleteObjectsWithContext(ctx, &s3.DeleteObjectsInput{
Bucket: aws.String(d.Bucket), Bucket: aws.String(d.Bucket),
Delete: &s3.Delete{ Delete: &s3.Delete{
Objects: s3Objects, Objects: s3Objects,