From d8e9b1a67c667d593c90caea7e5046c77462ed25 Mon Sep 17 00:00:00 2001 From: Nick Craig-Wood Date: Thu, 15 Aug 2019 16:26:16 +0100 Subject: [PATCH] gcs: make all operations work from the root #3421 --- .../googlecloudstorage/googlecloudstorage.go | 341 +++++++++--------- 1 file changed, 177 insertions(+), 164 deletions(-) diff --git a/backend/googlecloudstorage/googlecloudstorage.go b/backend/googlecloudstorage/googlecloudstorage.go index e78ef7312..ddc162867 100644 --- a/backend/googlecloudstorage/googlecloudstorage.go +++ b/backend/googlecloudstorage/googlecloudstorage.go @@ -23,9 +23,7 @@ import ( "net/http" "os" "path" - "regexp" "strings" - "sync" "time" "github.com/pkg/errors" @@ -38,6 +36,7 @@ import ( "github.com/rclone/rclone/fs/fshttp" "github.com/rclone/rclone/fs/hash" "github.com/rclone/rclone/fs/walk" + "github.com/rclone/rclone/lib/bucket" "github.com/rclone/rclone/lib/oauthutil" "github.com/rclone/rclone/lib/pacer" "golang.org/x/oauth2" @@ -264,16 +263,16 @@ type Options struct { // Fs represents a remote storage server type Fs struct { - name string // name of this remote - root string // the path we are working on if any - opt Options // parsed options - features *fs.Features // optional features - svc *storage.Service // the connection to the storage server - client *http.Client // authorized client - bucket string // the bucket we are working on - bucketOKMu sync.Mutex // mutex to protect bucket OK - bucketOK bool // true if we have created the bucket - pacer *fs.Pacer // To pace the API calls + name string // name of this remote + root string // the path we are working on if any + opt Options // parsed options + features *fs.Features // optional features + svc *storage.Service // the connection to the storage server + client *http.Client // authorized client + rootBucket string // bucket part of root (if any) + rootDirectory string // directory part of root (if any) + cache *bucket.Cache // cache of bucket status + pacer *fs.Pacer // To pace the API calls } // Object describes a storage object @@ -298,18 +297,18 @@ func (f *Fs) Name() string { // Root of the remote (as passed into NewFs) func (f *Fs) Root() string { - if f.root == "" { - return f.bucket - } - return f.bucket + "/" + f.root + return f.root } // String converts this Fs to a string func (f *Fs) String() string { - if f.root == "" { - return fmt.Sprintf("Storage bucket %s", f.bucket) + if f.rootBucket == "" { + return fmt.Sprintf("GCS root") } - return fmt.Sprintf("Storage bucket %s path %s", f.bucket, f.root) + if f.rootDirectory == "" { + return fmt.Sprintf("GCS bucket %s", f.rootBucket) + } + return fmt.Sprintf("GCS bucket %s path %s", f.rootBucket, f.rootDirectory) } // Features returns the optional features of this Fs @@ -341,21 +340,23 @@ func shouldRetry(err error) (again bool, errOut error) { return again, err } -// Pattern to match a storage path -var matcher = regexp.MustCompile(`^([^/]*)(.*)$`) - -// parseParse parses a storage 'url' -func parsePath(path string) (bucket, directory string, err error) { - parts := matcher.FindStringSubmatch(path) - if parts == nil { - err = errors.Errorf("couldn't find bucket in storage path %q", path) - } else { - bucket, directory = parts[1], parts[2] - directory = strings.Trim(directory, "/") - } +// parsePath parses a remote 'url' +func parsePath(path string) (root string) { + root = strings.Trim(path, "/") return } +// split returns bucket and bucketPath from the rootRelativePath +// relative to f.root +func (f *Fs) split(rootRelativePath string) (bucketName, bucketPath string) { + return bucket.Split(path.Join(f.root, rootRelativePath)) +} + +// split returns bucket and bucketPath from the object +func (o *Object) split() (bucket, bucketPath string) { + return o.fs.split(o.remote) +} + func getServiceAccountClient(credentialsData []byte) (*http.Client, error) { conf, err := google.JWTConfigFromJSON(credentialsData, storageConfig.Scopes...) if err != nil { @@ -365,6 +366,12 @@ func getServiceAccountClient(credentialsData []byte) (*http.Client, error) { return oauth2.NewClient(ctxWithSpecialClient, conf.TokenSource(ctxWithSpecialClient)), nil } +// setRoot changes the root of the Fs +func (f *Fs) setRoot(root string) { + f.root = parsePath(root) + f.rootBucket, f.rootDirectory = bucket.Split(f.root) +} + // NewFs constructs an Fs from the path, bucket:path func NewFs(name, root string, m configmap.Mapper) (fs.Fs, error) { var oAuthClient *http.Client @@ -406,22 +413,19 @@ func NewFs(name, root string, m configmap.Mapper) (fs.Fs, error) { } } - bucket, directory, err := parsePath(root) - if err != nil { - return nil, err - } - f := &Fs{ - name: name, - bucket: bucket, - root: directory, - opt: *opt, - pacer: fs.NewPacer(pacer.NewGoogleDrive(pacer.MinSleep(minSleep))), + name: name, + root: root, + opt: *opt, + pacer: fs.NewPacer(pacer.NewGoogleDrive(pacer.MinSleep(minSleep))), + cache: bucket.NewCache(), } + f.setRoot(root) f.features = (&fs.Features{ - ReadMimeType: true, - WriteMimeType: true, - BucketBased: true, + ReadMimeType: true, + WriteMimeType: true, + BucketBased: true, + BucketBasedRootOK: true, }).Fill(f) // Create a new authorized Drive client. @@ -431,20 +435,18 @@ func NewFs(name, root string, m configmap.Mapper) (fs.Fs, error) { return nil, errors.Wrap(err, "couldn't create Google Cloud Storage client") } - if f.root != "" { - f.root += "/" + if f.rootBucket != "" && f.rootDirectory != "" { // Check to see if the object exists err = f.pacer.Call(func() (bool, error) { - _, err = f.svc.Objects.Get(bucket, directory).Do() + _, err = f.svc.Objects.Get(f.rootBucket, f.rootDirectory).Do() return shouldRetry(err) }) if err == nil { - f.root = path.Dir(directory) - if f.root == "." { - f.root = "" - } else { - f.root += "/" + newRoot := path.Dir(f.root) + if newRoot == "." { + newRoot = "" } + f.setRoot(newRoot) // return an error with an fs which points to the parent return f, fs.ErrorIsFile } @@ -485,13 +487,17 @@ type listFn func(remote string, object *storage.Object, isDirectory bool) error // dir is the starting directory, "" for root // // Set recurse to read sub directories -func (f *Fs) list(ctx context.Context, dir string, recurse bool, fn listFn) (err error) { - root := f.root - rootLength := len(root) - if dir != "" { - root += dir + "/" +// +// The remote has prefix removed from it and if addBucket is set +// then it adds the bucket to the start. +func (f *Fs) list(ctx context.Context, bucket, directory, prefix string, addBucket bool, recurse bool, fn listFn) (err error) { + if prefix != "" { + prefix += "/" } - list := f.svc.Objects.List(f.bucket).Prefix(root).MaxResults(listChunks) + if directory != "" { + directory += "/" + } + list := f.svc.Objects.List(bucket).Prefix(directory).MaxResults(listChunks) if !recurse { list = list.Delimiter("/") } @@ -511,31 +517,36 @@ func (f *Fs) list(ctx context.Context, dir string, recurse bool, fn listFn) (err } if !recurse { var object storage.Object - for _, prefix := range objects.Prefixes { - if !strings.HasSuffix(prefix, "/") { + for _, remote := range objects.Prefixes { + if !strings.HasSuffix(remote, "/") { continue } - err = fn(prefix[rootLength:len(prefix)-1], &object, true) + if !strings.HasPrefix(remote, prefix) { + fs.Logf(f, "Odd name received %q", remote) + continue + } + remote = remote[len(prefix) : len(remote)-1] + if addBucket { + remote = path.Join(bucket, remote) + } + err = fn(remote, &object, true) if err != nil { return err } } } for _, object := range objects.Items { - if !strings.HasPrefix(object.Name, root) { + if !strings.HasPrefix(object.Name, prefix) { fs.Logf(f, "Odd name received %q", object.Name) continue } - remote := object.Name[rootLength:] + remote := object.Name[len(prefix):] + isDirectory := strings.HasSuffix(remote, "/") + if addBucket { + remote = path.Join(bucket, remote) + } // is this a directory marker? - if (strings.HasSuffix(remote, "/") || remote == "") && object.Size == 0 { - if recurse && remote != "" { - // add a directory in if --fast-list since will have no prefixes - err = fn(remote[:len(remote)-1], object, true) - if err != nil { - return err - } - } + if isDirectory && object.Size == 0 { continue // skip directory marker } err = fn(remote, object, false) @@ -564,19 +575,10 @@ func (f *Fs) itemToDirEntry(remote string, object *storage.Object, isDirectory b return o, nil } -// mark the bucket as being OK -func (f *Fs) markBucketOK() { - if f.bucket != "" { - f.bucketOKMu.Lock() - f.bucketOK = true - f.bucketOKMu.Unlock() - } -} - // listDir lists a single directory -func (f *Fs) listDir(ctx context.Context, dir string) (entries fs.DirEntries, err error) { +func (f *Fs) listDir(ctx context.Context, bucket, directory, prefix string, addBucket bool) (entries fs.DirEntries, err error) { // List the objects - err = f.list(ctx, dir, false, func(remote string, object *storage.Object, isDirectory bool) error { + err = f.list(ctx, bucket, directory, prefix, addBucket, false, func(remote string, object *storage.Object, isDirectory bool) error { entry, err := f.itemToDirEntry(remote, object, isDirectory) if err != nil { return err @@ -590,7 +592,7 @@ func (f *Fs) listDir(ctx context.Context, dir string) (entries fs.DirEntries, er return nil, err } // bucket must be present if listing succeeded - f.markBucketOK() + f.cache.MarkOK(bucket) return entries, err } @@ -634,10 +636,11 @@ func (f *Fs) listBuckets(dir string) (entries fs.DirEntries, err error) { // This should return ErrDirNotFound if the directory isn't // found. func (f *Fs) List(ctx context.Context, dir string) (entries fs.DirEntries, err error) { - if f.bucket == "" { + bucket, directory := f.split(dir) + if bucket == "" { return f.listBuckets(dir) } - return f.listDir(ctx, dir) + return f.listDir(ctx, bucket, directory, f.rootDirectory, f.rootBucket == "") } // ListR lists the objects and directories of the Fs starting @@ -657,22 +660,41 @@ func (f *Fs) List(ctx context.Context, dir string) (entries fs.DirEntries, err e // Don't implement this unless you have a more efficient way // of listing recursively that doing a directory traversal. func (f *Fs) ListR(ctx context.Context, dir string, callback fs.ListRCallback) (err error) { - if f.bucket == "" { - return fs.ErrorListBucketRequired - } + bucket, directory := f.split(dir) list := walk.NewListRHelper(callback) - err = f.list(ctx, dir, true, func(remote string, object *storage.Object, isDirectory bool) error { - entry, err := f.itemToDirEntry(remote, object, isDirectory) + listR := func(bucket, directory, prefix string, addBucket bool) error { + return f.list(ctx, bucket, directory, prefix, addBucket, true, func(remote string, object *storage.Object, isDirectory bool) error { + entry, err := f.itemToDirEntry(remote, object, isDirectory) + if err != nil { + return err + } + return list.Add(entry) + }) + } + if bucket == "" { + entries, err := f.listBuckets("") + if err != nil { + return err + } + for _, entry := range entries { + err = list.Add(entry) + if err != nil { + return err + } + bucket := entry.Remote() + err = listR(bucket, "", f.rootDirectory, true) + if err != nil { + return err + } + } + } else { + err = listR(bucket, directory, f.rootDirectory, f.rootBucket == "") if err != nil { return err } - return list.Add(entry) - }) - if err != nil { - return err } // bucket must be present if listing succeeded - f.markBucketOK() + f.cache.MarkOK(bucket) return list.Flush() } @@ -697,58 +719,50 @@ func (f *Fs) PutStream(ctx context.Context, in io.Reader, src fs.ObjectInfo, opt // Mkdir creates the bucket if it doesn't exist func (f *Fs) Mkdir(ctx context.Context, dir string) (err error) { - f.bucketOKMu.Lock() - defer f.bucketOKMu.Unlock() - if f.bucketOK { - return nil - } - // List something from the bucket to see if it exists. Doing it like this enables the use of a - // service account that only has the "Storage Object Admin" role. See #2193 for details. - - err = f.pacer.Call(func() (bool, error) { - _, err = f.svc.Objects.List(f.bucket).MaxResults(1).Do() - return shouldRetry(err) - }) - if err == nil { - // Bucket already exists - f.bucketOK = true - return nil - } else if gErr, ok := err.(*googleapi.Error); ok { - if gErr.Code != http.StatusNotFound { + bucket, _ := f.split(dir) + return f.cache.Create(bucket, func() error { + // List something from the bucket to see if it exists. Doing it like this enables the use of a + // service account that only has the "Storage Object Admin" role. See #2193 for details. + err = f.pacer.Call(func() (bool, error) { + _, err = f.svc.Objects.List(bucket).MaxResults(1).Do() + return shouldRetry(err) + }) + if err == nil { + // Bucket already exists + return nil + } else if gErr, ok := err.(*googleapi.Error); ok { + if gErr.Code != http.StatusNotFound { + return errors.Wrap(err, "failed to get bucket") + } + } else { return errors.Wrap(err, "failed to get bucket") } - } else { - return errors.Wrap(err, "failed to get bucket") - } - if f.opt.ProjectNumber == "" { - return errors.New("can't make bucket without project number") - } + if f.opt.ProjectNumber == "" { + return errors.New("can't make bucket without project number") + } - bucket := storage.Bucket{ - Name: f.bucket, - Location: f.opt.Location, - StorageClass: f.opt.StorageClass, - } - if f.opt.BucketPolicyOnly { - bucket.IamConfiguration = &storage.BucketIamConfiguration{ - BucketPolicyOnly: &storage.BucketIamConfigurationBucketPolicyOnly{ - Enabled: true, - }, + bucket := storage.Bucket{ + Name: bucket, + Location: f.opt.Location, + StorageClass: f.opt.StorageClass, } - } - err = f.pacer.Call(func() (bool, error) { - insertBucket := f.svc.Buckets.Insert(f.opt.ProjectNumber, &bucket) - if !f.opt.BucketPolicyOnly { - insertBucket.PredefinedAcl(f.opt.BucketACL) + if f.opt.BucketPolicyOnly { + bucket.IamConfiguration = &storage.BucketIamConfiguration{ + BucketPolicyOnly: &storage.BucketIamConfigurationBucketPolicyOnly{ + Enabled: true, + }, + } } - _, err = insertBucket.Do() - return shouldRetry(err) - }) - if err == nil { - f.bucketOK = true - } - return err + return f.pacer.Call(func() (bool, error) { + insertBucket := f.svc.Buckets.Insert(f.opt.ProjectNumber, &bucket) + if !f.opt.BucketPolicyOnly { + insertBucket.PredefinedAcl(f.opt.BucketACL) + } + _, err = insertBucket.Do() + return shouldRetry(err) + }) + }, nil) } // Rmdir deletes the bucket if the fs is at the root @@ -756,19 +770,16 @@ func (f *Fs) Mkdir(ctx context.Context, dir string) (err error) { // Returns an error if it isn't empty: Error 409: The bucket you tried // to delete was not empty. func (f *Fs) Rmdir(ctx context.Context, dir string) (err error) { - f.bucketOKMu.Lock() - defer f.bucketOKMu.Unlock() - if f.root != "" || dir != "" { + bucket, directory := f.split(dir) + if bucket == "" || directory != "" { return nil } - err = f.pacer.Call(func() (bool, error) { - err = f.svc.Buckets.Delete(f.bucket).Do() - return shouldRetry(err) + return f.cache.Remove(bucket, func() error { + return f.pacer.Call(func() (bool, error) { + err = f.svc.Buckets.Delete(bucket).Do() + return shouldRetry(err) + }) }) - if err == nil { - f.bucketOK = false - } - return err } // Precision returns the precision @@ -786,6 +797,7 @@ func (f *Fs) Precision() time.Duration { // // If it isn't possible then return fs.ErrorCantCopy func (f *Fs) Copy(ctx context.Context, src fs.Object, remote string) (fs.Object, error) { + dstBucket, dstPath := f.split(remote) err := f.Mkdir(ctx, "") if err != nil { return nil, err @@ -795,6 +807,7 @@ func (f *Fs) Copy(ctx context.Context, src fs.Object, remote string) (fs.Object, fs.Debugf(src, "Can't copy - not same remote type") return nil, fs.ErrorCantCopy } + srcBucket, srcPath := srcObj.split() // Temporary Object under construction dstObj := &Object{ @@ -802,13 +815,9 @@ func (f *Fs) Copy(ctx context.Context, src fs.Object, remote string) (fs.Object, remote: remote, } - srcBucket := srcObj.fs.bucket - srcObject := srcObj.fs.root + srcObj.remote - dstBucket := f.bucket - dstObject := f.root + remote var newObject *storage.Object err = f.pacer.Call(func() (bool, error) { - newObject, err = f.svc.Objects.Copy(srcBucket, srcObject, dstBucket, dstObject, nil).Do() + newObject, err = f.svc.Objects.Copy(srcBucket, srcPath, dstBucket, dstPath, nil).Do() return shouldRetry(err) }) if err != nil { @@ -898,9 +907,10 @@ func (o *Object) readMetaData() (err error) { if !o.modTime.IsZero() { return nil } + bucket, bucketPath := o.split() var object *storage.Object err = o.fs.pacer.Call(func() (bool, error) { - object, err = o.fs.svc.Objects.Get(o.fs.bucket, o.fs.root+o.remote).Do() + object, err = o.fs.svc.Objects.Get(bucket, bucketPath).Do() return shouldRetry(err) }) if err != nil { @@ -938,14 +948,15 @@ func metadataFromModTime(modTime time.Time) map[string]string { // SetModTime sets the modification time of the local fs object func (o *Object) SetModTime(ctx context.Context, modTime time.Time) (err error) { // This only adds metadata so will perserve other metadata + bucket, bucketPath := o.split() object := storage.Object{ - Bucket: o.fs.bucket, - Name: o.fs.root + o.remote, + Bucket: bucket, + Name: bucketPath, Metadata: metadataFromModTime(modTime), } var newObject *storage.Object err = o.fs.pacer.Call(func() (bool, error) { - newObject, err = o.fs.svc.Objects.Patch(o.fs.bucket, o.fs.root+o.remote, &object).Do() + newObject, err = o.fs.svc.Objects.Patch(bucket, bucketPath, &object).Do() return shouldRetry(err) }) if err != nil { @@ -994,6 +1005,7 @@ func (o *Object) Open(ctx context.Context, options ...fs.OpenOption) (in io.Read // // The new object may have been created if an error is returned func (o *Object) Update(ctx context.Context, in io.Reader, src fs.ObjectInfo, options ...fs.OpenOption) error { + bucket, bucketPath := o.split() err := o.fs.Mkdir(ctx, "") if err != nil { return err @@ -1001,14 +1013,14 @@ func (o *Object) Update(ctx context.Context, in io.Reader, src fs.ObjectInfo, op modTime := src.ModTime(ctx) object := storage.Object{ - Bucket: o.fs.bucket, - Name: o.fs.root + o.remote, + Bucket: bucket, + Name: bucketPath, ContentType: fs.MimeType(ctx, src), Metadata: metadataFromModTime(modTime), } var newObject *storage.Object err = o.fs.pacer.CallNoRetry(func() (bool, error) { - insertObject := o.fs.svc.Objects.Insert(o.fs.bucket, &object).Media(in, googleapi.ContentType("")).Name(object.Name) + insertObject := o.fs.svc.Objects.Insert(bucket, &object).Media(in, googleapi.ContentType("")).Name(object.Name) if !o.fs.opt.BucketPolicyOnly { insertObject.PredefinedAcl(o.fs.opt.ObjectACL) } @@ -1025,8 +1037,9 @@ func (o *Object) Update(ctx context.Context, in io.Reader, src fs.ObjectInfo, op // Remove an object func (o *Object) Remove(ctx context.Context) (err error) { + bucket, bucketPath := o.split() err = o.fs.pacer.Call(func() (bool, error) { - err = o.fs.svc.Objects.Delete(o.fs.bucket, o.fs.root+o.remote).Do() + err = o.fs.svc.Objects.Delete(bucket, bucketPath).Do() return shouldRetry(err) }) return err