googlecloudstorage: added gcs requester pays

This commit is contained in:
Christopher Merry 2023-03-12 01:59:21 -07:00 committed by Nick Craig-Wood
parent f5bf0a48f3
commit 0361acbde4

View file

@ -93,6 +93,9 @@ func init() {
Options: append(oauthutil.SharedOptions, []fs.Option{{ Options: append(oauthutil.SharedOptions, []fs.Option{{
Name: "project_number", Name: "project_number",
Help: "Project number.\n\nOptional - needed only for list/create/delete buckets - see your developer console.", Help: "Project number.\n\nOptional - needed only for list/create/delete buckets - see your developer console.",
}, {
Name: "user_project",
Help: "User project.\n\nOptional - needed only for requester pays.",
}, { }, {
Name: "service_account_file", Name: "service_account_file",
Help: "Service Account Credentials JSON file path.\n\nLeave blank normally.\nNeeded only if you want use SA instead of interactive login." + env.ShellExpandHelp, Help: "Service Account Credentials JSON file path.\n\nLeave blank normally.\nNeeded only if you want use SA instead of interactive login." + env.ShellExpandHelp,
@ -349,6 +352,7 @@ can't check the size and hash but the file contents will be decompressed.
// Options defines the configuration for this backend // Options defines the configuration for this backend
type Options struct { type Options struct {
ProjectNumber string `config:"project_number"` ProjectNumber string `config:"project_number"`
UserProject string `config:"user_project"`
ServiceAccountFile string `config:"service_account_file"` ServiceAccountFile string `config:"service_account_file"`
ServiceAccountCredentials string `config:"service_account_credentials"` ServiceAccountCredentials string `config:"service_account_credentials"`
Anonymous bool `config:"anonymous"` Anonymous bool `config:"anonymous"`
@ -559,7 +563,11 @@ func NewFs(ctx context.Context, name, root string, m configmap.Mapper) (fs.Fs, e
// Check to see if the object exists // Check to see if the object exists
encodedDirectory := f.opt.Enc.FromStandardPath(f.rootDirectory) encodedDirectory := f.opt.Enc.FromStandardPath(f.rootDirectory)
err = f.pacer.Call(func() (bool, error) { err = f.pacer.Call(func() (bool, error) {
_, err = f.svc.Objects.Get(f.rootBucket, encodedDirectory).Context(ctx).Do() get := f.svc.Objects.Get(f.rootBucket, encodedDirectory).Context(ctx)
if f.opt.UserProject != "" {
get = get.UserProject(f.opt.UserProject)
}
_, err = get.Do()
return shouldRetry(ctx, err) return shouldRetry(ctx, err)
}) })
if err == nil { if err == nil {
@ -619,6 +627,9 @@ func (f *Fs) list(ctx context.Context, bucket, directory, prefix string, addBuck
directory += "/" directory += "/"
} }
list := f.svc.Objects.List(bucket).Prefix(directory).MaxResults(listChunks) list := f.svc.Objects.List(bucket).Prefix(directory).MaxResults(listChunks)
if f.opt.UserProject != "" {
list = list.UserProject(f.opt.UserProject)
}
if !recurse { if !recurse {
list = list.Delimiter("/") list = list.Delimiter("/")
} }
@ -725,6 +736,9 @@ func (f *Fs) listBuckets(ctx context.Context) (entries fs.DirEntries, err error)
return nil, errors.New("can't list buckets without project number") return nil, errors.New("can't list buckets without project number")
} }
listBuckets := f.svc.Buckets.List(f.opt.ProjectNumber).MaxResults(listChunks) listBuckets := f.svc.Buckets.List(f.opt.ProjectNumber).MaxResults(listChunks)
if f.opt.UserProject != "" {
listBuckets = listBuckets.UserProject(f.opt.UserProject)
}
for { for {
var buckets *storage.Buckets var buckets *storage.Buckets
err = f.pacer.Call(func() (bool, error) { err = f.pacer.Call(func() (bool, error) {
@ -854,7 +868,11 @@ func (f *Fs) makeBucket(ctx context.Context, bucket string) (err error) {
// List something from the bucket to see if it exists. Doing it like this enables the use of a // List something from the bucket to see if it exists. Doing it like this enables the use of a
// service account that only has the "Storage Object Admin" role. See #2193 for details. // service account that only has the "Storage Object Admin" role. See #2193 for details.
err = f.pacer.Call(func() (bool, error) { err = f.pacer.Call(func() (bool, error) {
_, err = f.svc.Objects.List(bucket).MaxResults(1).Context(ctx).Do() list := f.svc.Objects.List(bucket).MaxResults(1).Context(ctx)
if f.opt.UserProject != "" {
list = list.UserProject(f.opt.UserProject)
}
_, err = list.Do()
return shouldRetry(ctx, err) return shouldRetry(ctx, err)
}) })
if err == nil { if err == nil {
@ -889,7 +907,11 @@ func (f *Fs) makeBucket(ctx context.Context, bucket string) (err error) {
if !f.opt.BucketPolicyOnly { if !f.opt.BucketPolicyOnly {
insertBucket.PredefinedAcl(f.opt.BucketACL) insertBucket.PredefinedAcl(f.opt.BucketACL)
} }
_, err = insertBucket.Context(ctx).Do() insertBucket = insertBucket.Context(ctx)
if f.opt.UserProject != "" {
insertBucket = insertBucket.UserProject(f.opt.UserProject)
}
_, err = insertBucket.Do()
return shouldRetry(ctx, err) return shouldRetry(ctx, err)
}) })
}, nil) }, nil)
@ -914,7 +936,11 @@ func (f *Fs) Rmdir(ctx context.Context, dir string) (err error) {
} }
return f.cache.Remove(bucket, func() error { return f.cache.Remove(bucket, func() error {
return f.pacer.Call(func() (bool, error) { return f.pacer.Call(func() (bool, error) {
err = f.svc.Buckets.Delete(bucket).Context(ctx).Do() deleteBucket := f.svc.Buckets.Delete(bucket).Context(ctx)
if f.opt.UserProject != "" {
deleteBucket = deleteBucket.UserProject(f.opt.UserProject)
}
err = deleteBucket.Do()
return shouldRetry(ctx, err) return shouldRetry(ctx, err)
}) })
}) })
@ -960,7 +986,11 @@ func (f *Fs) Copy(ctx context.Context, src fs.Object, remote string) (fs.Object,
var rewriteResponse *storage.RewriteResponse var rewriteResponse *storage.RewriteResponse
for { for {
err = f.pacer.Call(func() (bool, error) { err = f.pacer.Call(func() (bool, error) {
rewriteResponse, err = rewriteRequest.Context(ctx).Do() rewriteRequest = rewriteRequest.Context(ctx)
if f.opt.UserProject != "" {
rewriteRequest.UserProject(f.opt.UserProject)
}
rewriteResponse, err = rewriteRequest.Do()
return shouldRetry(ctx, err) return shouldRetry(ctx, err)
}) })
if err != nil { if err != nil {
@ -1071,7 +1101,11 @@ func (o *Object) setMetaData(info *storage.Object) {
func (o *Object) readObjectInfo(ctx context.Context) (object *storage.Object, err error) { func (o *Object) readObjectInfo(ctx context.Context) (object *storage.Object, err error) {
bucket, bucketPath := o.split() bucket, bucketPath := o.split()
err = o.fs.pacer.Call(func() (bool, error) { err = o.fs.pacer.Call(func() (bool, error) {
object, err = o.fs.svc.Objects.Get(bucket, bucketPath).Context(ctx).Do() get := o.fs.svc.Objects.Get(bucket, bucketPath).Context(ctx)
if o.fs.opt.UserProject != "" {
get = get.UserProject(o.fs.opt.UserProject)
}
object, err = get.Do()
return shouldRetry(ctx, err) return shouldRetry(ctx, err)
}) })
if err != nil { if err != nil {
@ -1143,7 +1177,11 @@ func (o *Object) SetModTime(ctx context.Context, modTime time.Time) (err error)
if !o.fs.opt.BucketPolicyOnly { if !o.fs.opt.BucketPolicyOnly {
copyObject.DestinationPredefinedAcl(o.fs.opt.ObjectACL) copyObject.DestinationPredefinedAcl(o.fs.opt.ObjectACL)
} }
newObject, err = copyObject.Context(ctx).Do() copyObject = copyObject.Context(ctx)
if o.fs.opt.UserProject != "" {
copyObject = copyObject.UserProject(o.fs.opt.UserProject)
}
newObject, err = copyObject.Do()
return shouldRetry(ctx, err) return shouldRetry(ctx, err)
}) })
if err != nil { if err != nil {
@ -1160,6 +1198,9 @@ func (o *Object) Storable() bool {
// Open an object for read // Open an object for read
func (o *Object) Open(ctx context.Context, options ...fs.OpenOption) (in io.ReadCloser, err error) { func (o *Object) Open(ctx context.Context, options ...fs.OpenOption) (in io.ReadCloser, err error) {
if o.fs.opt.UserProject != "" {
o.url = o.url + "&userProject=" + o.fs.opt.UserProject
}
req, err := http.NewRequestWithContext(ctx, "GET", o.url, nil) req, err := http.NewRequestWithContext(ctx, "GET", o.url, nil)
if err != nil { if err != nil {
return nil, err return nil, err
@ -1252,7 +1293,11 @@ func (o *Object) Update(ctx context.Context, in io.Reader, src fs.ObjectInfo, op
if !o.fs.opt.BucketPolicyOnly { if !o.fs.opt.BucketPolicyOnly {
insertObject.PredefinedAcl(o.fs.opt.ObjectACL) insertObject.PredefinedAcl(o.fs.opt.ObjectACL)
} }
newObject, err = insertObject.Context(ctx).Do() insertObject = insertObject.Context(ctx)
if o.fs.opt.UserProject != "" {
insertObject = insertObject.UserProject(o.fs.opt.UserProject)
}
newObject, err = insertObject.Do()
return shouldRetry(ctx, err) return shouldRetry(ctx, err)
}) })
if err != nil { if err != nil {
@ -1267,7 +1312,11 @@ func (o *Object) Update(ctx context.Context, in io.Reader, src fs.ObjectInfo, op
func (o *Object) Remove(ctx context.Context) (err error) { func (o *Object) Remove(ctx context.Context) (err error) {
bucket, bucketPath := o.split() bucket, bucketPath := o.split()
err = o.fs.pacer.Call(func() (bool, error) { err = o.fs.pacer.Call(func() (bool, error) {
err = o.fs.svc.Objects.Delete(bucket, bucketPath).Context(ctx).Do() deleteBucket := o.fs.svc.Objects.Delete(bucket, bucketPath).Context(ctx)
if o.fs.opt.UserProject != "" {
deleteBucket = deleteBucket.UserProject(o.fs.opt.UserProject)
}
err = deleteBucket.Do()
return shouldRetry(ctx, err) return shouldRetry(ctx, err)
}) })
return err return err