From 0361acbde40815534ab11b4027f8083151ff8640 Mon Sep 17 00:00:00 2001 From: Christopher Merry Date: Sun, 12 Mar 2023 01:59:21 -0700 Subject: [PATCH] googlecloudstorage: added gcs requester pays --- .../googlecloudstorage/googlecloudstorage.go | 67 ++++++++++++++++--- 1 file changed, 58 insertions(+), 9 deletions(-) diff --git a/backend/googlecloudstorage/googlecloudstorage.go b/backend/googlecloudstorage/googlecloudstorage.go index 4017c206b..753cf04b2 100644 --- a/backend/googlecloudstorage/googlecloudstorage.go +++ b/backend/googlecloudstorage/googlecloudstorage.go @@ -93,6 +93,9 @@ func init() { Options: append(oauthutil.SharedOptions, []fs.Option{{ Name: "project_number", Help: "Project number.\n\nOptional - needed only for list/create/delete buckets - see your developer console.", + }, { + Name: "user_project", + Help: "User project.\n\nOptional - needed only for requester pays.", }, { Name: "service_account_file", Help: "Service Account Credentials JSON file path.\n\nLeave blank normally.\nNeeded only if you want use SA instead of interactive login." + env.ShellExpandHelp, @@ -349,6 +352,7 @@ can't check the size and hash but the file contents will be decompressed. // Options defines the configuration for this backend type Options struct { ProjectNumber string `config:"project_number"` + UserProject string `config:"user_project"` ServiceAccountFile string `config:"service_account_file"` ServiceAccountCredentials string `config:"service_account_credentials"` Anonymous bool `config:"anonymous"` @@ -559,7 +563,11 @@ func NewFs(ctx context.Context, name, root string, m configmap.Mapper) (fs.Fs, e // Check to see if the object exists encodedDirectory := f.opt.Enc.FromStandardPath(f.rootDirectory) err = f.pacer.Call(func() (bool, error) { - _, err = f.svc.Objects.Get(f.rootBucket, encodedDirectory).Context(ctx).Do() + get := f.svc.Objects.Get(f.rootBucket, encodedDirectory).Context(ctx) + if f.opt.UserProject != "" { + get = get.UserProject(f.opt.UserProject) + } + _, err = get.Do() return shouldRetry(ctx, err) }) if err == nil { @@ -619,6 +627,9 @@ func (f *Fs) list(ctx context.Context, bucket, directory, prefix string, addBuck directory += "/" } list := f.svc.Objects.List(bucket).Prefix(directory).MaxResults(listChunks) + if f.opt.UserProject != "" { + list = list.UserProject(f.opt.UserProject) + } if !recurse { list = list.Delimiter("/") } @@ -725,6 +736,9 @@ func (f *Fs) listBuckets(ctx context.Context) (entries fs.DirEntries, err error) return nil, errors.New("can't list buckets without project number") } listBuckets := f.svc.Buckets.List(f.opt.ProjectNumber).MaxResults(listChunks) + if f.opt.UserProject != "" { + listBuckets = listBuckets.UserProject(f.opt.UserProject) + } for { var buckets *storage.Buckets err = f.pacer.Call(func() (bool, error) { @@ -854,7 +868,11 @@ func (f *Fs) makeBucket(ctx context.Context, bucket string) (err error) { // List something from the bucket to see if it exists. Doing it like this enables the use of a // service account that only has the "Storage Object Admin" role. See #2193 for details. err = f.pacer.Call(func() (bool, error) { - _, err = f.svc.Objects.List(bucket).MaxResults(1).Context(ctx).Do() + list := f.svc.Objects.List(bucket).MaxResults(1).Context(ctx) + if f.opt.UserProject != "" { + list = list.UserProject(f.opt.UserProject) + } + _, err = list.Do() return shouldRetry(ctx, err) }) if err == nil { @@ -889,7 +907,11 @@ func (f *Fs) makeBucket(ctx context.Context, bucket string) (err error) { if !f.opt.BucketPolicyOnly { insertBucket.PredefinedAcl(f.opt.BucketACL) } - _, err = insertBucket.Context(ctx).Do() + insertBucket = insertBucket.Context(ctx) + if f.opt.UserProject != "" { + insertBucket = insertBucket.UserProject(f.opt.UserProject) + } + _, err = insertBucket.Do() return shouldRetry(ctx, err) }) }, nil) @@ -914,7 +936,11 @@ func (f *Fs) Rmdir(ctx context.Context, dir string) (err error) { } return f.cache.Remove(bucket, func() error { return f.pacer.Call(func() (bool, error) { - err = f.svc.Buckets.Delete(bucket).Context(ctx).Do() + deleteBucket := f.svc.Buckets.Delete(bucket).Context(ctx) + if f.opt.UserProject != "" { + deleteBucket = deleteBucket.UserProject(f.opt.UserProject) + } + err = deleteBucket.Do() return shouldRetry(ctx, err) }) }) @@ -960,7 +986,11 @@ func (f *Fs) Copy(ctx context.Context, src fs.Object, remote string) (fs.Object, var rewriteResponse *storage.RewriteResponse for { err = f.pacer.Call(func() (bool, error) { - rewriteResponse, err = rewriteRequest.Context(ctx).Do() + rewriteRequest = rewriteRequest.Context(ctx) + if f.opt.UserProject != "" { + rewriteRequest.UserProject(f.opt.UserProject) + } + rewriteResponse, err = rewriteRequest.Do() return shouldRetry(ctx, err) }) if err != nil { @@ -1071,7 +1101,11 @@ func (o *Object) setMetaData(info *storage.Object) { func (o *Object) readObjectInfo(ctx context.Context) (object *storage.Object, err error) { bucket, bucketPath := o.split() err = o.fs.pacer.Call(func() (bool, error) { - object, err = o.fs.svc.Objects.Get(bucket, bucketPath).Context(ctx).Do() + get := o.fs.svc.Objects.Get(bucket, bucketPath).Context(ctx) + if o.fs.opt.UserProject != "" { + get = get.UserProject(o.fs.opt.UserProject) + } + object, err = get.Do() return shouldRetry(ctx, err) }) if err != nil { @@ -1143,7 +1177,11 @@ func (o *Object) SetModTime(ctx context.Context, modTime time.Time) (err error) if !o.fs.opt.BucketPolicyOnly { copyObject.DestinationPredefinedAcl(o.fs.opt.ObjectACL) } - newObject, err = copyObject.Context(ctx).Do() + copyObject = copyObject.Context(ctx) + if o.fs.opt.UserProject != "" { + copyObject = copyObject.UserProject(o.fs.opt.UserProject) + } + newObject, err = copyObject.Do() return shouldRetry(ctx, err) }) if err != nil { @@ -1160,6 +1198,9 @@ func (o *Object) Storable() bool { // Open an object for read func (o *Object) Open(ctx context.Context, options ...fs.OpenOption) (in io.ReadCloser, err error) { + if o.fs.opt.UserProject != "" { + o.url = o.url + "&userProject=" + o.fs.opt.UserProject + } req, err := http.NewRequestWithContext(ctx, "GET", o.url, nil) if err != nil { return nil, err @@ -1252,7 +1293,11 @@ func (o *Object) Update(ctx context.Context, in io.Reader, src fs.ObjectInfo, op if !o.fs.opt.BucketPolicyOnly { insertObject.PredefinedAcl(o.fs.opt.ObjectACL) } - newObject, err = insertObject.Context(ctx).Do() + insertObject = insertObject.Context(ctx) + if o.fs.opt.UserProject != "" { + insertObject = insertObject.UserProject(o.fs.opt.UserProject) + } + newObject, err = insertObject.Do() return shouldRetry(ctx, err) }) if err != nil { @@ -1267,7 +1312,11 @@ func (o *Object) Update(ctx context.Context, in io.Reader, src fs.ObjectInfo, op func (o *Object) Remove(ctx context.Context) (err error) { bucket, bucketPath := o.split() err = o.fs.pacer.Call(func() (bool, error) { - err = o.fs.svc.Objects.Delete(bucket, bucketPath).Context(ctx).Do() + deleteBucket := o.fs.svc.Objects.Delete(bucket, bucketPath).Context(ctx) + if o.fs.opt.UserProject != "" { + deleteBucket = deleteBucket.UserProject(o.fs.opt.UserProject) + } + err = deleteBucket.Do() return shouldRetry(ctx, err) }) return err