gcs: empty directory markers #3453

- Report correct feature flag
- Fix test failures due to that
- don't output the root directory marker
- Don't create the directory marker if it is the bucket or root
- Create directories when uploading files
This commit is contained in:
Nick Craig-Wood 2023-04-26 17:53:48 +01:00
parent e0c445d36e
commit 066e00b470
2 changed files with 110 additions and 32 deletions

View file

@ -305,10 +305,10 @@ Docs: https://cloud.google.com/storage/docs/bucket-policy-only
Name: "directory_markers", Name: "directory_markers",
Default: false, Default: false,
Advanced: true, Advanced: true,
Help: `Upload an empty object with a trailing slash in name when new directory is created Help: `Upload an empty object with a trailing slash when a new directory is created
Empty folders are unsupported for bucket based remotes, this option creates an empty Empty folders are unsupported for bucket based remotes, this option creates an empty
object named "/", to persist the folder. object ending with "/", to persist the folder.
`, `,
}, { }, {
Name: "no_check_bucket", Name: "no_check_bucket",
@ -557,6 +557,9 @@ func NewFs(ctx context.Context, name, root string, m configmap.Mapper) (fs.Fs, e
BucketBased: true, BucketBased: true,
BucketBasedRootOK: true, BucketBasedRootOK: true,
}).Fill(ctx, f) }).Fill(ctx, f)
if opt.DirectoryMarkers {
f.features.CanHaveEmptyDirectories = true
}
// Create a new authorized Drive client. // Create a new authorized Drive client.
f.client = oAuthClient f.client = oAuthClient
@ -643,6 +646,7 @@ func (f *Fs) list(ctx context.Context, bucket, directory, prefix string, addBuck
if !recurse { if !recurse {
list = list.Delimiter("/") list = list.Delimiter("/")
} }
foundItems := 0
for { for {
var objects *storage.Objects var objects *storage.Objects
err = f.pacer.Call(func() (bool, error) { err = f.pacer.Call(func() (bool, error) {
@ -658,6 +662,7 @@ func (f *Fs) list(ctx context.Context, bucket, directory, prefix string, addBuck
return err return err
} }
if !recurse { if !recurse {
foundItems += len(objects.Prefixes)
var object storage.Object var object storage.Object
for _, remote := range objects.Prefixes { for _, remote := range objects.Prefixes {
if !strings.HasSuffix(remote, "/") { if !strings.HasSuffix(remote, "/") {
@ -678,6 +683,7 @@ func (f *Fs) list(ctx context.Context, bucket, directory, prefix string, addBuck
} }
} }
} }
foundItems += len(objects.Items)
for _, object := range objects.Items { for _, object := range objects.Items {
remote := f.opt.Enc.ToStandardPath(object.Name) remote := f.opt.Enc.ToStandardPath(object.Name)
if !strings.HasPrefix(remote, prefix) { if !strings.HasPrefix(remote, prefix) {
@ -691,9 +697,15 @@ func (f *Fs) list(ctx context.Context, bucket, directory, prefix string, addBuck
} }
// is this a directory marker? // is this a directory marker?
if isDirectory { if isDirectory {
continue // skip directory marker // Don't insert the root directory
if remote == directory {
continue
} }
err = fn(remote, object, false) // process directory markers as directories
remote = strings.TrimRight(remote, "/")
}
err = fn(remote, object, isDirectory)
if err != nil { if err != nil {
return err return err
} }
@ -703,6 +715,17 @@ func (f *Fs) list(ctx context.Context, bucket, directory, prefix string, addBuck
} }
list.PageToken(objects.NextPageToken) list.PageToken(objects.NextPageToken)
} }
if f.opt.DirectoryMarkers && foundItems == 0 && directory != "" {
// Determine whether the directory exists or not by whether it has a marker
_, err := f.readObjectInfo(ctx, bucket, directory)
if err != nil {
if err == fs.ErrorObjectNotFound {
return fs.ErrorDirNotFound
}
return err
}
}
return nil return nil
} }
@ -866,27 +889,68 @@ func (f *Fs) PutStream(ctx context.Context, in io.Reader, src fs.ObjectInfo, opt
return f.Put(ctx, in, src, options...) return f.Put(ctx, in, src, options...)
} }
// Create directory marker file and parents
func (f *Fs) createDirectoryMarker(ctx context.Context, bucket, dir string) error {
if !f.opt.DirectoryMarkers || bucket == "" {
return nil
}
// Object to be uploaded
o := &Object{
fs: f,
}
for {
_, bucketPath := f.split(dir)
// Don't create the directory marker if it is the bucket or at the very root
if bucketPath == "" {
break
}
o.remote = dir + "/"
// Check to see if object already exists
_, err := o.readObjectInfo(ctx)
if err == nil {
return nil
}
// Upload it if not
fs.Debugf(o, "Creating directory marker")
content := io.Reader(strings.NewReader(""))
_, err = f.Put(ctx, content, o)
if err != nil {
return fmt.Errorf("creating directory marker failed: %w", err)
}
// Now check parent directory exists
dir = path.Dir(dir)
if dir == "/" || dir == "." {
break
}
}
return nil
}
// Mkdir creates the bucket if it doesn't exist // Mkdir creates the bucket if it doesn't exist
func (f *Fs) Mkdir(ctx context.Context, dir string) (err error) { func (f *Fs) Mkdir(ctx context.Context, dir string) (err error) {
bucket, _ := f.split(dir) bucket, _ := f.split(dir)
e := f.makeBucket(ctx, bucket) e := f.checkBucket(ctx, bucket)
if e != nil { if e != nil {
return e return e
} }
// Create directory marker file return f.createDirectoryMarker(ctx, bucket, dir)
if f.opt.DirectoryMarkers && bucket != "" && dir != "" {
markerFilePath := fmt.Sprintf("%s/", dir) }
markerFileContent := io.Reader(strings.NewReader(""))
markerFileObject := &Object{ // mkdirParent creates the parent bucket/directory if it doesn't exist
fs: f, func (f *Fs) mkdirParent(ctx context.Context, remote string) error {
remote: markerFilePath, remote = strings.TrimRight(remote, "/")
dir := path.Dir(remote)
if dir == "/" || dir == "." {
dir = ""
} }
_, e := f.Put(ctx, markerFileContent, markerFileObject) return f.Mkdir(ctx, dir)
if e != nil {
return e
}
}
return nil
} }
// makeBucket creates the bucket if it doesn't exist // makeBucket creates the bucket if it doesn't exist
@ -960,12 +1024,15 @@ func (f *Fs) Rmdir(ctx context.Context, dir string) (err error) {
bucket, directory := f.split(dir) bucket, directory := f.split(dir)
// Remove directory marker file // Remove directory marker file
if f.opt.DirectoryMarkers && bucket != "" && dir != "" { if f.opt.DirectoryMarkers && bucket != "" && dir != "" {
markerFilePath := fmt.Sprintf("%s/", dir) o := &Object{
markerFileObject := &Object{
fs: f, fs: f,
remote: markerFilePath, remote: dir + "/",
}
fs.Debugf(o, "Removing directory marker")
err := o.Remove(ctx)
if err != nil {
return fmt.Errorf("removing directory marker failed: %w", err)
} }
_ = markerFileObject.Remove(ctx)
} }
if bucket == "" || directory != "" { if bucket == "" || directory != "" {
return nil return nil
@ -998,7 +1065,7 @@ func (f *Fs) Precision() time.Duration {
// If it isn't possible then return fs.ErrorCantCopy // If it isn't possible then return fs.ErrorCantCopy
func (f *Fs) Copy(ctx context.Context, src fs.Object, remote string) (fs.Object, error) { func (f *Fs) Copy(ctx context.Context, src fs.Object, remote string) (fs.Object, error) {
dstBucket, dstPath := f.split(remote) dstBucket, dstPath := f.split(remote)
err := f.checkBucket(ctx, dstBucket) err := f.mkdirParent(ctx, remote)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -1136,10 +1203,15 @@ func (o *Object) setMetaData(info *storage.Object) {
// readObjectInfo reads the definition for an object // readObjectInfo reads the definition for an object
func (o *Object) readObjectInfo(ctx context.Context) (object *storage.Object, err error) { func (o *Object) readObjectInfo(ctx context.Context) (object *storage.Object, err error) {
bucket, bucketPath := o.split() bucket, bucketPath := o.split()
err = o.fs.pacer.Call(func() (bool, error) { return o.fs.readObjectInfo(ctx, bucket, bucketPath)
get := o.fs.svc.Objects.Get(bucket, bucketPath).Context(ctx) }
if o.fs.opt.UserProject != "" {
get = get.UserProject(o.fs.opt.UserProject) // readObjectInfo reads the definition for an object
func (f *Fs) readObjectInfo(ctx context.Context, bucket, bucketPath string) (object *storage.Object, err error) {
err = f.pacer.Call(func() (bool, error) {
get := f.svc.Objects.Get(bucket, bucketPath).Context(ctx)
if f.opt.UserProject != "" {
get = get.UserProject(f.opt.UserProject)
} }
object, err = get.Do() object, err = get.Do()
return shouldRetry(ctx, err) return shouldRetry(ctx, err)
@ -1280,12 +1352,15 @@ func (o *Object) Open(ctx context.Context, options ...fs.OpenOption) (in io.Read
// Update the object with the contents of the io.Reader, modTime and size // Update the object with the contents of the io.Reader, modTime and size
// //
// The new object may have been created if an error is returned // The new object may have been created if an error is returned
func (o *Object) Update(ctx context.Context, in io.Reader, src fs.ObjectInfo, options ...fs.OpenOption) error { func (o *Object) Update(ctx context.Context, in io.Reader, src fs.ObjectInfo, options ...fs.OpenOption) (err error) {
bucket, bucketPath := o.split() bucket, bucketPath := o.split()
err := o.fs.checkBucket(ctx, bucket) // Create parent dir/bucket if not saving directory marker
if !strings.HasSuffix(o.remote, "/") {
err = o.fs.mkdirParent(ctx, o.remote)
if err != nil { if err != nil {
return err return err
} }
}
modTime := src.ModTime(ctx) modTime := src.ModTime(ctx)
object := storage.Object{ object := storage.Object{

View file

@ -129,6 +129,9 @@ backends:
- backend: "googlecloudstorage" - backend: "googlecloudstorage"
remote: "TestGoogleCloudStorage:" remote: "TestGoogleCloudStorage:"
fastlist: true fastlist: true
- backend: "googlecloudstorage"
remote: "TestGoogleCloudStorage,directory_markers:"
fastlist: true
- backend: "googlephotos" - backend: "googlephotos"
remote: "TestGooglePhotos:" remote: "TestGooglePhotos:"
tests: tests: