azureblob: empty directory markers #3453
This commit is contained in:
parent
4023eaebe0
commit
f080ec437c
3 changed files with 192 additions and 69 deletions
|
@ -58,6 +58,8 @@ const (
|
|||
decayConstant = 1 // bigger for slower decay, exponential
|
||||
maxListChunkSize = 5000 // number of items to read at once
|
||||
modTimeKey = "mtime"
|
||||
dirMetaKey = "hdi_isfolder"
|
||||
dirMetaValue = "true"
|
||||
timeFormatIn = time.RFC3339
|
||||
timeFormatOut = "2006-01-02T15:04:05.000000000Z07:00"
|
||||
storageDefaultBaseURL = "blob.core.windows.net"
|
||||
|
@ -363,6 +365,18 @@ This option controls how often unused buffers will be removed from the pool.`,
|
|||
},
|
||||
},
|
||||
Advanced: true,
|
||||
}, {
|
||||
Name: "directory_markers",
|
||||
Default: false,
|
||||
Advanced: true,
|
||||
Help: `Upload an empty object with a trailing slash when a new directory is created
|
||||
|
||||
Empty folders are unsupported for bucket based remotes, this option
|
||||
creates an empty object ending with "/", to persist the folder.
|
||||
|
||||
This object also has the metadata "` + dirMetaKey + ` = ` + dirMetaValue + `" to conform to
|
||||
the Microsoft standard.
|
||||
`,
|
||||
}, {
|
||||
Name: "no_check_container",
|
||||
Help: `If set, don't attempt to check the container exists or create it.
|
||||
|
@ -412,6 +426,7 @@ type Options struct {
|
|||
MemoryPoolUseMmap bool `config:"memory_pool_use_mmap"`
|
||||
Enc encoder.MultiEncoder `config:"encoding"`
|
||||
PublicAccess string `config:"public_access"`
|
||||
DirectoryMarkers bool `config:"directory_markers"`
|
||||
NoCheckContainer bool `config:"no_check_container"`
|
||||
NoHeadObject bool `config:"no_head_object"`
|
||||
}
|
||||
|
@ -486,7 +501,7 @@ func parsePath(path string) (root string) {
|
|||
// split returns container and containerPath from the rootRelativePath
|
||||
// relative to f.root
|
||||
func (f *Fs) split(rootRelativePath string) (containerName, containerPath string) {
|
||||
containerName, containerPath = bucket.Split(path.Join(f.root, rootRelativePath))
|
||||
containerName, containerPath = bucket.Split(bucket.Join(f.root, rootRelativePath))
|
||||
return f.opt.Enc.FromStandardName(containerName), f.opt.Enc.FromStandardPath(containerPath)
|
||||
}
|
||||
|
||||
|
@ -664,6 +679,10 @@ func NewFs(ctx context.Context, name, root string, m configmap.Mapper) (fs.Fs, e
|
|||
SetTier: true,
|
||||
GetTier: true,
|
||||
}).Fill(ctx, f)
|
||||
if opt.DirectoryMarkers {
|
||||
f.features.CanHaveEmptyDirectories = true
|
||||
fs.Debugf(f, "Using directory markers")
|
||||
}
|
||||
|
||||
// Client options specifying our own transport
|
||||
policyClientOptions := policy.ClientOptions{
|
||||
|
@ -906,7 +925,7 @@ func (f *Fs) cntSVC(containerName string) (containerClient *container.Client) {
|
|||
// Return an Object from a path
|
||||
//
|
||||
// If it can't be found it returns the error fs.ErrorObjectNotFound.
|
||||
func (f *Fs) newObjectWithInfo(remote string, info *container.BlobItem) (fs.Object, error) {
|
||||
func (f *Fs) newObjectWithInfo(ctx context.Context, remote string, info *container.BlobItem) (fs.Object, error) {
|
||||
o := &Object{
|
||||
fs: f,
|
||||
remote: remote,
|
||||
|
@ -917,7 +936,7 @@ func (f *Fs) newObjectWithInfo(remote string, info *container.BlobItem) (fs.Obje
|
|||
return nil, err
|
||||
}
|
||||
} else if !o.fs.opt.NoHeadObject {
|
||||
err := o.readMetaData() // reads info and headers, returning an error
|
||||
err := o.readMetaData(ctx) // reads info and headers, returning an error
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
@ -928,7 +947,7 @@ func (f *Fs) newObjectWithInfo(remote string, info *container.BlobItem) (fs.Obje
|
|||
// NewObject finds the Object at remote. If it can't be found
|
||||
// it returns the error fs.ErrorObjectNotFound.
|
||||
func (f *Fs) NewObject(ctx context.Context, remote string) (fs.Object, error) {
|
||||
return f.newObjectWithInfo(remote, nil)
|
||||
return f.newObjectWithInfo(ctx, remote, nil)
|
||||
}
|
||||
|
||||
// getBlobSVC creates a blob client
|
||||
|
@ -964,31 +983,7 @@ func isDirectoryMarker(size int64, metadata map[string]*string, remote string) b
|
|||
// defacto standard for marking blobs as directories.
|
||||
// Note also that the metadata hasn't been normalised to lower case yet
|
||||
for k, v := range metadata {
|
||||
if v != nil && strings.EqualFold(k, "hdi_isfolder") && *v == "true" {
|
||||
return true
|
||||
}
|
||||
}
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
// Returns whether file is a directory marker or not using metadata
|
||||
// with pointers to strings as the SDK seems to use both forms rather
|
||||
// annoyingly.
|
||||
//
|
||||
// NB This is a duplicate of isDirectoryMarker
|
||||
func isDirectoryMarkerP(size int64, metadata map[string]*string, remote string) bool {
|
||||
// Directory markers are 0 length
|
||||
if size == 0 {
|
||||
endsWithSlash := strings.HasSuffix(remote, "/")
|
||||
if endsWithSlash || remote == "" {
|
||||
return true
|
||||
}
|
||||
// Note that metadata with hdi_isfolder = true seems to be a
|
||||
// defacto standard for marking blobs as directories.
|
||||
// Note also that the metadata hasn't been normalised to lower case yet
|
||||
for k, pv := range metadata {
|
||||
if strings.EqualFold(k, "hdi_isfolder") && pv != nil && *pv == "true" {
|
||||
if v != nil && strings.EqualFold(k, dirMetaKey) && *v == dirMetaValue {
|
||||
return true
|
||||
}
|
||||
}
|
||||
|
@ -1033,6 +1028,7 @@ func (f *Fs) list(ctx context.Context, containerName, directory, prefix string,
|
|||
Prefix: &directory,
|
||||
MaxResults: &maxResults,
|
||||
})
|
||||
foundItems := 0
|
||||
for pager.More() {
|
||||
var response container.ListBlobsHierarchyResponse
|
||||
err := f.pacer.Call(func() (bool, error) {
|
||||
|
@ -1051,6 +1047,7 @@ func (f *Fs) list(ctx context.Context, containerName, directory, prefix string,
|
|||
}
|
||||
// Advance marker to next
|
||||
// marker = response.NextMarker
|
||||
foundItems += len(response.Segment.BlobItems)
|
||||
for i := range response.Segment.BlobItems {
|
||||
file := response.Segment.BlobItems[i]
|
||||
// Finish if file name no longer has prefix
|
||||
|
@ -1067,19 +1064,26 @@ func (f *Fs) list(ctx context.Context, containerName, directory, prefix string,
|
|||
continue
|
||||
}
|
||||
remote = remote[len(prefix):]
|
||||
if isDirectoryMarkerP(*file.Properties.ContentLength, file.Metadata, remote) {
|
||||
continue // skip directory marker
|
||||
}
|
||||
isDirectory := isDirectoryMarker(*file.Properties.ContentLength, file.Metadata, remote)
|
||||
if addContainer {
|
||||
remote = path.Join(containerName, remote)
|
||||
}
|
||||
if isDirectory {
|
||||
// Don't insert the root directory
|
||||
if remote == directory {
|
||||
continue
|
||||
}
|
||||
// process directory markers as directories
|
||||
remote = strings.TrimRight(remote, "/")
|
||||
}
|
||||
// Send object
|
||||
err = fn(remote, file, false)
|
||||
err = fn(remote, file, isDirectory)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
// Send the subdirectories
|
||||
foundItems += len(response.Segment.BlobPrefixes)
|
||||
for _, remote := range response.Segment.BlobPrefixes {
|
||||
if remote.Name == nil {
|
||||
fs.Debugf(f, "Nil prefix received")
|
||||
|
@ -1102,16 +1106,26 @@ func (f *Fs) list(ctx context.Context, containerName, directory, prefix string,
|
|||
}
|
||||
}
|
||||
}
|
||||
if f.opt.DirectoryMarkers && foundItems == 0 && directory != "" {
|
||||
// Determine whether the directory exists or not by whether it has a marker
|
||||
_, err := f.readMetaData(ctx, containerName, directory)
|
||||
if err != nil {
|
||||
if err == fs.ErrorObjectNotFound {
|
||||
return fs.ErrorDirNotFound
|
||||
}
|
||||
return err
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// Convert a list item into a DirEntry
|
||||
func (f *Fs) itemToDirEntry(remote string, object *container.BlobItem, isDirectory bool) (fs.DirEntry, error) {
|
||||
func (f *Fs) itemToDirEntry(ctx context.Context, remote string, object *container.BlobItem, isDirectory bool) (fs.DirEntry, error) {
|
||||
if isDirectory {
|
||||
d := fs.NewDir(remote, time.Time{})
|
||||
return d, nil
|
||||
}
|
||||
o, err := f.newObjectWithInfo(remote, object)
|
||||
o, err := f.newObjectWithInfo(ctx, remote, object)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
@ -1139,7 +1153,7 @@ func (f *Fs) listDir(ctx context.Context, containerName, directory, prefix strin
|
|||
return nil, fs.ErrorDirNotFound
|
||||
}
|
||||
err = f.list(ctx, containerName, directory, prefix, addContainer, false, int32(f.opt.ListChunkSize), func(remote string, object *container.BlobItem, isDirectory bool) error {
|
||||
entry, err := f.itemToDirEntry(remote, object, isDirectory)
|
||||
entry, err := f.itemToDirEntry(ctx, remote, object, isDirectory)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
@ -1220,7 +1234,7 @@ func (f *Fs) ListR(ctx context.Context, dir string, callback fs.ListRCallback) (
|
|||
list := walk.NewListRHelper(callback)
|
||||
listR := func(containerName, directory, prefix string, addContainer bool) error {
|
||||
return f.list(ctx, containerName, directory, prefix, addContainer, true, int32(f.opt.ListChunkSize), func(remote string, object *container.BlobItem, isDirectory bool) error {
|
||||
entry, err := f.itemToDirEntry(remote, object, isDirectory)
|
||||
entry, err := f.itemToDirEntry(ctx, remote, object, isDirectory)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
@ -1314,10 +1328,71 @@ func (f *Fs) PutStream(ctx context.Context, in io.Reader, src fs.ObjectInfo, opt
|
|||
return f.Put(ctx, in, src, options...)
|
||||
}
|
||||
|
||||
// Create directory marker file and parents
|
||||
func (f *Fs) createDirectoryMarker(ctx context.Context, container, dir string) error {
|
||||
if !f.opt.DirectoryMarkers || container == "" {
|
||||
return nil
|
||||
}
|
||||
|
||||
// Object to be uploaded
|
||||
o := &Object{
|
||||
fs: f,
|
||||
modTime: time.Now(),
|
||||
meta: map[string]string{
|
||||
dirMetaKey: dirMetaValue,
|
||||
},
|
||||
}
|
||||
|
||||
for {
|
||||
_, containerPath := f.split(dir)
|
||||
// Don't create the directory marker if it is the bucket or at the very root
|
||||
if containerPath == "" {
|
||||
break
|
||||
}
|
||||
o.remote = dir + "/"
|
||||
|
||||
// Check to see if object already exists
|
||||
_, err := f.readMetaData(ctx, container, containerPath+"/")
|
||||
if err == nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
// Upload it if not
|
||||
fs.Debugf(o, "Creating directory marker")
|
||||
content := io.Reader(strings.NewReader(""))
|
||||
err = o.Update(ctx, content, o)
|
||||
if err != nil {
|
||||
return fmt.Errorf("creating directory marker failed: %w", err)
|
||||
}
|
||||
|
||||
// Now check parent directory exists
|
||||
dir = path.Dir(dir)
|
||||
if dir == "/" || dir == "." {
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// Mkdir creates the container if it doesn't exist
|
||||
func (f *Fs) Mkdir(ctx context.Context, dir string) error {
|
||||
container, _ := f.split(dir)
|
||||
return f.makeContainer(ctx, container)
|
||||
e := f.makeContainer(ctx, container)
|
||||
if e != nil {
|
||||
return e
|
||||
}
|
||||
return f.createDirectoryMarker(ctx, container, dir)
|
||||
}
|
||||
|
||||
// mkdirParent creates the parent bucket/directory if it doesn't exist
|
||||
func (f *Fs) mkdirParent(ctx context.Context, remote string) error {
|
||||
remote = strings.TrimRight(remote, "/")
|
||||
dir := path.Dir(remote)
|
||||
if dir == "/" || dir == "." {
|
||||
dir = ""
|
||||
}
|
||||
return f.Mkdir(ctx, dir)
|
||||
}
|
||||
|
||||
// makeContainer creates the container if it doesn't exist
|
||||
|
@ -1417,6 +1492,18 @@ func (f *Fs) deleteContainer(ctx context.Context, containerName string) error {
|
|||
// Returns an error if it isn't empty
|
||||
func (f *Fs) Rmdir(ctx context.Context, dir string) error {
|
||||
container, directory := f.split(dir)
|
||||
// Remove directory marker file
|
||||
if f.opt.DirectoryMarkers && container != "" && dir != "" {
|
||||
o := &Object{
|
||||
fs: f,
|
||||
remote: dir + "/",
|
||||
}
|
||||
fs.Debugf(o, "Removing directory marker")
|
||||
err := o.Remove(ctx)
|
||||
if err != nil {
|
||||
return fmt.Errorf("removing directory marker failed: %w", err)
|
||||
}
|
||||
}
|
||||
if container == "" || directory != "" {
|
||||
return nil
|
||||
}
|
||||
|
@ -1458,7 +1545,7 @@ func (f *Fs) Purge(ctx context.Context, dir string) error {
|
|||
// If it isn't possible then return fs.ErrorCantCopy
|
||||
func (f *Fs) Copy(ctx context.Context, src fs.Object, remote string) (fs.Object, error) {
|
||||
dstContainer, dstPath := f.split(remote)
|
||||
err := f.makeContainer(ctx, dstContainer)
|
||||
err := f.mkdirParent(ctx, remote)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
@ -1695,7 +1782,7 @@ func (o *Object) decodeMetaDataFromBlob(info *container.BlobItem) (err error) {
|
|||
} else {
|
||||
size = *info.Properties.ContentLength
|
||||
}
|
||||
if isDirectoryMarkerP(size, metadata, o.remote) {
|
||||
if isDirectoryMarker(size, metadata, o.remote) {
|
||||
return fs.ErrorNotAFile
|
||||
}
|
||||
// NOTE - Client library always returns MD5 as base64 decoded string, Object needs to maintain
|
||||
|
@ -1733,6 +1820,29 @@ func (o *Object) clearMetaData() {
|
|||
o.modTime = time.Time{}
|
||||
}
|
||||
|
||||
// readMetaData gets the metadata if it hasn't already been fetched
|
||||
func (f *Fs) readMetaData(ctx context.Context, container, containerPath string) (blobProperties blob.GetPropertiesResponse, err error) {
|
||||
if !f.containerOK(container) {
|
||||
return blobProperties, fs.ErrorObjectNotFound
|
||||
}
|
||||
blb := f.getBlobSVC(container, containerPath)
|
||||
|
||||
// Read metadata (this includes metadata)
|
||||
options := blob.GetPropertiesOptions{}
|
||||
err = f.pacer.Call(func() (bool, error) {
|
||||
blobProperties, err = blb.GetProperties(ctx, &options)
|
||||
return f.shouldRetry(ctx, err)
|
||||
})
|
||||
if err != nil {
|
||||
// On directories - GetProperties does not work and current SDK does not populate service code correctly hence check regular http response as well
|
||||
if storageErr, ok := err.(*azcore.ResponseError); ok && (storageErr.ErrorCode == string(bloberror.BlobNotFound) || storageErr.StatusCode == http.StatusNotFound) {
|
||||
return blobProperties, fs.ErrorObjectNotFound
|
||||
}
|
||||
return blobProperties, err
|
||||
}
|
||||
return blobProperties, nil
|
||||
}
|
||||
|
||||
// readMetaData gets the metadata if it hasn't already been fetched
|
||||
//
|
||||
// Sets
|
||||
|
@ -1741,33 +1851,15 @@ func (o *Object) clearMetaData() {
|
|||
// o.modTime
|
||||
// o.size
|
||||
// o.md5
|
||||
func (o *Object) readMetaData() (err error) {
|
||||
container, _ := o.split()
|
||||
if !o.fs.containerOK(container) {
|
||||
return fs.ErrorObjectNotFound
|
||||
}
|
||||
func (o *Object) readMetaData(ctx context.Context) (err error) {
|
||||
if !o.modTime.IsZero() {
|
||||
return nil
|
||||
}
|
||||
blb := o.getBlobSVC()
|
||||
// fs.Debugf(o, "Blob URL = %q", blb.URL())
|
||||
|
||||
// Read metadata (this includes metadata)
|
||||
options := blob.GetPropertiesOptions{}
|
||||
ctx := context.Background()
|
||||
var blobProperties blob.GetPropertiesResponse
|
||||
err = o.fs.pacer.Call(func() (bool, error) {
|
||||
blobProperties, err = blb.GetProperties(ctx, &options)
|
||||
return o.fs.shouldRetry(ctx, err)
|
||||
})
|
||||
container, containerPath := o.split()
|
||||
blobProperties, err := o.fs.readMetaData(ctx, container, containerPath)
|
||||
if err != nil {
|
||||
// On directories - GetProperties does not work and current SDK does not populate service code correctly hence check regular http response as well
|
||||
if storageErr, ok := err.(*azcore.ResponseError); ok && (storageErr.ErrorCode == string(bloberror.BlobNotFound) || storageErr.StatusCode == http.StatusNotFound) {
|
||||
return fs.ErrorObjectNotFound
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
return o.decodeMetaDataFromPropertiesResponse(&blobProperties)
|
||||
}
|
||||
|
||||
|
@ -1777,7 +1869,7 @@ func (o *Object) readMetaData() (err error) {
|
|||
// LastModified returned in the http headers
|
||||
func (o *Object) ModTime(ctx context.Context) (result time.Time) {
|
||||
// The error is logged in readMetaData
|
||||
_ = o.readMetaData()
|
||||
_ = o.readMetaData(ctx)
|
||||
return o.modTime
|
||||
}
|
||||
|
||||
|
@ -2123,12 +2215,17 @@ func (o *Object) Update(ctx context.Context, in io.Reader, src fs.ObjectInfo, op
|
|||
if container == "" || containerPath == "" {
|
||||
return fmt.Errorf("can't upload to root - need a container")
|
||||
}
|
||||
err = o.fs.makeContainer(ctx, container)
|
||||
if err != nil {
|
||||
return err
|
||||
// Create parent dir/bucket if not saving directory marker
|
||||
_, isDirMarker := o.meta[dirMetaKey]
|
||||
if !isDirMarker {
|
||||
err = o.fs.mkdirParent(ctx, o.remote)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
// Update Mod time
|
||||
fs.Debugf(nil, "o.meta = %+v", o.meta)
|
||||
o.updateMetadataWithModTime(src.ModTime(ctx))
|
||||
if err != nil {
|
||||
return err
|
||||
|
@ -2176,6 +2273,7 @@ func (o *Object) Update(ctx context.Context, in io.Reader, src fs.ObjectInfo, op
|
|||
size := src.Size()
|
||||
multipartUpload := size < 0 || size > o.fs.poolSize
|
||||
|
||||
fs.Debugf(nil, "o.meta = %+v", o.meta)
|
||||
if multipartUpload {
|
||||
err = o.uploadMultipart(ctx, in, size, blb, &httpHeaders)
|
||||
} else {
|
||||
|
@ -2186,10 +2284,12 @@ func (o *Object) Update(ctx context.Context, in io.Reader, src fs.ObjectInfo, op
|
|||
}
|
||||
|
||||
// Refresh metadata on object
|
||||
o.clearMetaData()
|
||||
err = o.readMetaData()
|
||||
if err != nil {
|
||||
return err
|
||||
if !isDirMarker {
|
||||
o.clearMetaData()
|
||||
err = o.readMetaData(ctx)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
// If tier is not changed or not specified, do not attempt to invoke `SetBlobTier` operation
|
||||
|
|
|
@ -9,6 +9,7 @@ import (
|
|||
"testing"
|
||||
|
||||
"github.com/rclone/rclone/fs"
|
||||
"github.com/rclone/rclone/fstest"
|
||||
"github.com/rclone/rclone/fstest/fstests"
|
||||
"github.com/stretchr/testify/assert"
|
||||
)
|
||||
|
@ -25,6 +26,25 @@ func TestIntegration(t *testing.T) {
|
|||
})
|
||||
}
|
||||
|
||||
// TestIntegration2 runs integration tests against the remote
|
||||
func TestIntegration2(t *testing.T) {
|
||||
if *fstest.RemoteName != "" {
|
||||
t.Skip("Skipping as -remote set")
|
||||
}
|
||||
name := "TestAzureBlob:"
|
||||
fstests.Run(t, &fstests.Opt{
|
||||
RemoteName: name,
|
||||
NilObject: (*Object)(nil),
|
||||
TiersToTest: []string{"Hot", "Cool"},
|
||||
ChunkedUpload: fstests.ChunkedUploadConfig{
|
||||
MinChunkSize: defaultChunkSize,
|
||||
},
|
||||
ExtraConfig: []fstests.ExtraConfigItem{
|
||||
{Name: name, Key: "directory_markers", Value: "true"},
|
||||
},
|
||||
})
|
||||
}
|
||||
|
||||
// SetUploadChunkSize is an exported wrapper around the unexported
// setUploadChunkSize; it passes cs through and returns that method's
// results unchanged.
//
// NOTE(review): presumably exported so the fstests chunked upload tests can
// reach it - confirm against fstests.
func (f *Fs) SetUploadChunkSize(cs fs.SizeSuffix) (fs.SizeSuffix, error) {
|
||||
return f.setUploadChunkSize(cs)
|
||||
}
|
||||
|
|
|
@ -305,6 +305,9 @@ backends:
|
|||
- backend: "azureblob"
|
||||
remote: "TestAzureBlob:"
|
||||
fastlist: true
|
||||
- backend: "azureblob"
|
||||
remote: "TestAzureBlob,directory_markers:"
|
||||
fastlist: true
|
||||
- backend: "pcloud"
|
||||
remote: "TestPcloud:"
|
||||
fastlist: true
|
||||
|
|
Loading…
Reference in a new issue