Merge pull request #4036 from milosgajdos/s3-context

Propagate storage driver context to S3 API calls
This commit is contained in:
Milos Gajdos 2023-09-04 16:57:11 +01:00 committed by GitHub
commit b6d0d3802e
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23

View file

@ -608,7 +608,7 @@ func (d *driver) GetContent(ctx context.Context, path string) ([]byte, error) {
// PutContent stores the []byte content at a location designated by "path".
func (d *driver) PutContent(ctx context.Context, path string, contents []byte) error {
_, err := d.S3.PutObject(&s3.PutObjectInput{
_, err := d.S3.PutObjectWithContext(ctx, &s3.PutObjectInput{
Bucket: aws.String(d.Bucket),
Key: aws.String(d.s3Path(path)),
ContentType: d.getContentType(),
@ -624,7 +624,7 @@ func (d *driver) PutContent(ctx context.Context, path string, contents []byte) e
// Reader retrieves an io.ReadCloser for the content stored at "path" with a
// given byte offset.
func (d *driver) Reader(ctx context.Context, path string, offset int64) (io.ReadCloser, error) {
resp, err := d.S3.GetObject(&s3.GetObjectInput{
resp, err := d.S3.GetObjectWithContext(ctx, &s3.GetObjectInput{
Bucket: aws.String(d.Bucket),
Key: aws.String(d.s3Path(path)),
Range: aws.String("bytes=" + strconv.FormatInt(offset, 10) + "-"),
@ -645,7 +645,7 @@ func (d *driver) Writer(ctx context.Context, path string, appendParam bool) (sto
key := d.s3Path(path)
if !appendParam {
// TODO (brianbland): cancel other uploads at this path
resp, err := d.S3.CreateMultipartUpload(&s3.CreateMultipartUploadInput{
resp, err := d.S3.CreateMultipartUploadWithContext(ctx, &s3.CreateMultipartUploadInput{
Bucket: aws.String(d.Bucket),
Key: aws.String(key),
ContentType: d.getContentType(),
@ -665,7 +665,7 @@ func (d *driver) Writer(ctx context.Context, path string, appendParam bool) (sto
Prefix: aws.String(key),
}
for {
resp, err := d.S3.ListMultipartUploads(listMultipartUploadsInput)
resp, err := d.S3.ListMultipartUploadsWithContext(ctx, listMultipartUploadsInput)
if err != nil {
return nil, parseError(path, err)
}
@ -683,7 +683,7 @@ func (d *driver) Writer(ctx context.Context, path string, appendParam bool) (sto
continue
}
partsList, err := d.S3.ListParts(&s3.ListPartsInput{
partsList, err := d.S3.ListPartsWithContext(ctx, &s3.ListPartsInput{
Bucket: aws.String(d.Bucket),
Key: aws.String(key),
UploadId: multi.UploadId,
@ -693,7 +693,7 @@ func (d *driver) Writer(ctx context.Context, path string, appendParam bool) (sto
}
allParts = append(allParts, partsList.Parts...)
for *partsList.IsTruncated {
partsList, err = d.S3.ListParts(&s3.ListPartsInput{
partsList, err = d.S3.ListPartsWithContext(ctx, &s3.ListPartsInput{
Bucket: aws.String(d.Bucket),
Key: aws.String(key),
UploadId: multi.UploadId,
@ -722,7 +722,7 @@ func (d *driver) Writer(ctx context.Context, path string, appendParam bool) (sto
// Stat retrieves the FileInfo for the given path, including the current size
// in bytes and the creation time.
func (d *driver) Stat(ctx context.Context, path string) (storagedriver.FileInfo, error) {
resp, err := d.S3.ListObjectsV2(&s3.ListObjectsV2Input{
resp, err := d.S3.ListObjectsV2WithContext(ctx, &s3.ListObjectsV2Input{
Bucket: aws.String(d.Bucket),
Prefix: aws.String(d.s3Path(path)),
MaxKeys: aws.Int64(1),
@ -767,7 +767,7 @@ func (d *driver) List(ctx context.Context, opath string) ([]string, error) {
prefix = "/"
}
resp, err := d.S3.ListObjectsV2(&s3.ListObjectsV2Input{
resp, err := d.S3.ListObjectsV2WithContext(ctx, &s3.ListObjectsV2Input{
Bucket: aws.String(d.Bucket),
Prefix: aws.String(d.s3Path(path)),
Delimiter: aws.String("/"),
@ -791,7 +791,7 @@ func (d *driver) List(ctx context.Context, opath string) ([]string, error) {
}
if *resp.IsTruncated {
resp, err = d.S3.ListObjectsV2(&s3.ListObjectsV2Input{
resp, err = d.S3.ListObjectsV2WithContext(ctx, &s3.ListObjectsV2Input{
Bucket: aws.String(d.Bucket),
Prefix: aws.String(d.s3Path(path)),
Delimiter: aws.String("/"),
@ -841,7 +841,7 @@ func (d *driver) copy(ctx context.Context, sourcePath string, destPath string) e
}
if fileInfo.Size() <= d.MultipartCopyThresholdSize {
_, err := d.S3.CopyObject(&s3.CopyObjectInput{
_, err := d.S3.CopyObjectWithContext(ctx, &s3.CopyObjectInput{
Bucket: aws.String(d.Bucket),
Key: aws.String(d.s3Path(destPath)),
ContentType: d.getContentType(),
@ -857,7 +857,7 @@ func (d *driver) copy(ctx context.Context, sourcePath string, destPath string) e
return nil
}
createResp, err := d.S3.CreateMultipartUpload(&s3.CreateMultipartUploadInput{
createResp, err := d.S3.CreateMultipartUploadWithContext(ctx, &s3.CreateMultipartUploadInput{
Bucket: aws.String(d.Bucket),
Key: aws.String(d.s3Path(destPath)),
ContentType: d.getContentType(),
@ -884,7 +884,7 @@ func (d *driver) copy(ctx context.Context, sourcePath string, destPath string) e
if lastByte >= fileInfo.Size() {
lastByte = fileInfo.Size() - 1
}
uploadResp, err := d.S3.UploadPartCopy(&s3.UploadPartCopyInput{
uploadResp, err := d.S3.UploadPartCopyWithContext(ctx, &s3.UploadPartCopyInput{
Bucket: aws.String(d.Bucket),
CopySource: aws.String(d.Bucket + "/" + d.s3Path(sourcePath)),
Key: aws.String(d.s3Path(destPath)),
@ -910,7 +910,7 @@ func (d *driver) copy(ctx context.Context, sourcePath string, destPath string) e
}
}
_, err = d.S3.CompleteMultipartUpload(&s3.CompleteMultipartUploadInput{
_, err = d.S3.CompleteMultipartUploadWithContext(ctx, &s3.CompleteMultipartUploadInput{
Bucket: aws.String(d.Bucket),
Key: aws.String(d.s3Path(destPath)),
UploadId: createResp.UploadId,
@ -931,7 +931,7 @@ func (d *driver) Delete(ctx context.Context, path string) error {
for {
// list all the objects
resp, err := d.S3.ListObjectsV2(listObjectsInput)
resp, err := d.S3.ListObjectsV2WithContext(ctx, listObjectsInput)
// resp.Contents can only be empty on the first call
// if there were no more results to return after the first call, resp.IsTruncated would have been false
@ -956,7 +956,7 @@ func (d *driver) Delete(ctx context.Context, path string) error {
// by default the response returns up to 1,000 key names. The response _might_ contain fewer keys but it will never contain more.
// 10000 keys is coincidentally (?) also the max number of keys that can be deleted in a single Delete operation, so we'll just smack
// Delete here straight away and reset the object slice when successful.
resp, err := d.S3.DeleteObjects(&s3.DeleteObjectsInput{
resp, err := d.S3.DeleteObjectsWithContext(ctx, &s3.DeleteObjectsInput{
Bucket: aws.String(d.Bucket),
Delete: &s3.Delete{
Objects: s3Objects,