From 8a0775ce3c8a45890cdb8b17366e4b5134d8afe7 Mon Sep 17 00:00:00 2001 From: Nick Craig-Wood Date: Fri, 16 Aug 2019 10:10:56 +0100 Subject: [PATCH] azureblob: make all operations work from the root #3421 --- backend/azureblob/azureblob.go | 431 +++++++++++++++------------------ 1 file changed, 200 insertions(+), 231 deletions(-) diff --git a/backend/azureblob/azureblob.go b/backend/azureblob/azureblob.go index d6c8f44f6..18dff3cc7 100644 --- a/backend/azureblob/azureblob.go +++ b/backend/azureblob/azureblob.go @@ -13,11 +13,9 @@ import ( "encoding/hex" "fmt" "io" - "log" "net/http" "net/url" "path" - "regexp" "strconv" "strings" "sync" @@ -34,6 +32,7 @@ import ( "github.com/rclone/rclone/fs/fshttp" "github.com/rclone/rclone/fs/hash" "github.com/rclone/rclone/fs/walk" + "github.com/rclone/rclone/lib/bucket" "github.com/rclone/rclone/lib/pacer" ) @@ -144,19 +143,20 @@ type Options struct { // Fs represents a remote azure server type Fs struct { - name string // name of this remote - root string // the path we are working on if any - opt Options // parsed config options - features *fs.Features // optional features - client *http.Client // http client we are using - svcURL *azblob.ServiceURL // reference to serviceURL - cntURL *azblob.ContainerURL // reference to containerURL - container string // the container we are working on - containerOKMu sync.Mutex // mutex to protect container OK - containerOK bool // true if we have created the container - containerDeleted bool // true if we have deleted the container - pacer *fs.Pacer // To pace and retry the API calls - uploadToken *pacer.TokenDispenser // control concurrency + name string // name of this remote + root string // the path we are working on if any + opt Options // parsed config options + features *fs.Features // optional features + client *http.Client // http client we are using + svcURL *azblob.ServiceURL // reference to serviceURL + cntURLcacheMu sync.Mutex // mutex to protect cntURLcache + cntURLcache map[string]*azblob.ContainerURL // reference to containerURL per container + rootContainer string // container part of root (if any) + rootDirectory string // directory part of root (if any) + isLimited bool // if limited to one container + cache *bucket.Cache // cache for container creation status + pacer *fs.Pacer // To pace and retry the API calls + uploadToken *pacer.TokenDispenser // control concurrency } // Object describes a azure object @@ -180,18 +180,18 @@ func (f *Fs) Name() string { // Root of the remote (as passed into NewFs) func (f *Fs) Root() string { - if f.root == "" { - return f.container - } - return f.container + "/" + f.root + return f.root } // String converts this Fs to a string func (f *Fs) String() string { - if f.root == "" { - return fmt.Sprintf("Azure container %s", f.container) + if f.rootContainer == "" { + return fmt.Sprintf("Azure root") } - return fmt.Sprintf("Azure container %s path %s", f.container, f.root) + if f.rootDirectory == "" { + return fmt.Sprintf("Azure container %s", f.rootContainer) + } + return fmt.Sprintf("Azure container %s path %s", f.rootContainer, f.rootDirectory) } // Features returns the optional features of this Fs @@ -199,21 +199,23 @@ func (f *Fs) Features() *fs.Features { return f.features } -// Pattern to match a azure path -var matcher = regexp.MustCompile(`^/*([^/]*)(.*)$`) - -// parseParse parses a azure 'url' -func parsePath(path string) (container, directory string, err error) { - parts := matcher.FindStringSubmatch(path) - if parts == nil { - err = errors.Errorf("couldn't find container in azure path %q", path) - } else { - container, directory = parts[1], parts[2] - directory = strings.Trim(directory, "/") - } +// parsePath parses a remote 'url' +func parsePath(path string) (root string) { + root = strings.Trim(path, "/") return } +// split returns container and containerPath from the rootRelativePath +// relative to f.root +func (f *Fs) split(rootRelativePath string) (containerName, containerPath string) { + return bucket.Split(path.Join(f.root, rootRelativePath)) +} + +// split returns container and containerPath from the object +func (o *Object) split() (container, containerPath string) { + return o.fs.split(o.remote) +} + // validateAccessTier checks if azureblob supports user supplied tier func validateAccessTier(tier string) bool { switch tier { @@ -318,6 +320,12 @@ func (f *Fs) newPipeline(c azblob.Credential, o azblob.PipelineOptions) pipeline return pipeline.NewPipeline(factories, pipeline.Options{HTTPSender: httpClientFactory(f.client), Log: o.Log}) } +// setRoot changes the root of the Fs +func (f *Fs) setRoot(root string) { + f.root = parsePath(root) + f.rootContainer, f.rootDirectory = bucket.Split(f.root) +} + // NewFs constructs an Fs from the path, container:path func NewFs(name, root string, m configmap.Mapper) (fs.Fs, error) { ctx := context.Background() @@ -339,10 +347,6 @@ func NewFs(name, root string, m configmap.Mapper) (fs.Fs, error) { if opt.ListChunkSize > maxListChunkSize { return nil, errors.Errorf("azure: blob list size can't be greater than %v - was %v", maxListChunkSize, opt.ListChunkSize) } - container, directory, err := parsePath(root) - if err != nil { - return nil, err - } if opt.Endpoint == "" { opt.Endpoint = storageDefaultBaseURL } @@ -357,24 +361,25 @@ func NewFs(name, root string, m configmap.Mapper) (fs.Fs, error) { f := &Fs{ name: name, opt: *opt, - container: container, - root: directory, pacer: fs.NewPacer(pacer.NewS3(pacer.MinSleep(minSleep), pacer.MaxSleep(maxSleep), pacer.DecayConstant(decayConstant))), uploadToken: pacer.NewTokenDispenser(fs.Config.Transfers), client: fshttp.NewClient(fs.Config), + cache: bucket.NewCache(), + cntURLcache: make(map[string]*azblob.ContainerURL, 1), } + f.setRoot(root) f.features = (&fs.Features{ - ReadMimeType: true, - WriteMimeType: true, - BucketBased: true, - SetTier: true, - GetTier: true, + ReadMimeType: true, + WriteMimeType: true, + BucketBased: true, + BucketBasedRootOK: true, + SetTier: true, + GetTier: true, }).Fill(f) var ( - u *url.URL - serviceURL azblob.ServiceURL - containerURL azblob.ContainerURL + u *url.URL + serviceURL azblob.ServiceURL ) switch { case opt.UseEmulator: @@ -388,7 +393,6 @@ func NewFs(name, root string, m configmap.Mapper) (fs.Fs, error) { } pipeline := f.newPipeline(credential, azblob.PipelineOptions{Retry: azblob.RetryOptions{TryTimeout: maxTryTimeout}}) serviceURL = azblob.NewServiceURL(*u, pipeline) - containerURL = serviceURL.NewContainerURL(container) case opt.Account != "" && opt.Key != "": credential, err := azblob.NewSharedKeyCredential(opt.Account, opt.Key) if err != nil { @@ -401,7 +405,6 @@ func NewFs(name, root string, m configmap.Mapper) (fs.Fs, error) { } pipeline := f.newPipeline(credential, azblob.PipelineOptions{Retry: azblob.RetryOptions{TryTimeout: maxTryTimeout}}) serviceURL = azblob.NewServiceURL(*u, pipeline) - containerURL = serviceURL.NewContainerURL(container) case opt.SASURL != "": u, err = url.Parse(opt.SASURL) if err != nil { @@ -412,38 +415,30 @@ func NewFs(name, root string, m configmap.Mapper) (fs.Fs, error) { // Check if we have container level SAS or account level sas parts := azblob.NewBlobURLParts(*u) if parts.ContainerName != "" { - if container != "" && parts.ContainerName != container { + if f.rootContainer != "" && parts.ContainerName != f.rootContainer { return nil, errors.New("Container name in SAS URL and container provided in command do not match") } - - f.container = parts.ContainerName - containerURL = azblob.NewContainerURL(*u, pipeline) + containerURL := azblob.NewContainerURL(*u, pipeline) + f.cntURLcache[parts.ContainerName] = &containerURL + f.isLimited = true } else { serviceURL = azblob.NewServiceURL(*u, pipeline) - containerURL = serviceURL.NewContainerURL(container) } default: return nil, errors.New("Need account+key or connectionString or sasURL") } f.svcURL = &serviceURL - f.cntURL = &containerURL - if f.root != "" { - f.root += "/" + if f.rootContainer != "" && f.rootDirectory != "" { // Check to see if the (container,directory) is actually an existing file oldRoot := f.root - remote := path.Base(directory) - f.root = path.Dir(directory) - if f.root == "." { - f.root = "" - } else { - f.root += "/" - } - _, err := f.NewObject(ctx, remote) + newRoot, leaf := path.Split(oldRoot) + f.setRoot(newRoot) + _, err := f.NewObject(ctx, leaf) if err != nil { if err == fs.ErrorObjectNotFound || err == fs.ErrorNotAFile { // File doesn't exist or is a directory so return old f - f.root = oldRoot + f.setRoot(oldRoot) return f, nil } return nil, err @@ -454,6 +449,20 @@ func NewFs(name, root string, m configmap.Mapper) (fs.Fs, error) { return f, nil } +// return the container URL for the container passed in +func (f *Fs) cntURL(container string) (containerURL *azblob.ContainerURL) { + f.cntURLcacheMu.Lock() + defer f.cntURLcacheMu.Unlock() + var ok bool + if containerURL, ok = f.cntURLcache[container]; !ok { + cntURL := f.svcURL.NewContainerURL(container) + containerURL = &cntURL + f.cntURLcache[container] = containerURL + } + return containerURL + +} + // Return an Object from a path // // If it can't be found it returns the error fs.ErrorObjectNotFound. @@ -483,8 +492,8 @@ func (f *Fs) NewObject(ctx context.Context, remote string) (fs.Object, error) { } // getBlobReference creates an empty blob reference with no metadata -func (f *Fs) getBlobReference(remote string) azblob.BlobURL { - return f.cntURL.NewBlobURL(f.root + remote) +func (f *Fs) getBlobReference(container, containerPath string) azblob.BlobURL { + return f.cntURL(container).NewBlobURL(containerPath) } // updateMetadataWithModTime adds the modTime passed in to o.meta. @@ -520,16 +529,18 @@ type listFn func(remote string, object *azblob.BlobItem, isDirectory bool) error // the container and root supplied // // dir is the starting directory, "" for root -func (f *Fs) list(ctx context.Context, dir string, recurse bool, maxResults uint, fn listFn) error { - f.containerOKMu.Lock() - deleted := f.containerDeleted - f.containerOKMu.Unlock() - if deleted { +// +// The remote has prefix removed from it and if addContainer is set then +// it adds the container to the start. +func (f *Fs) list(ctx context.Context, container, directory, prefix string, addContainer bool, recurse bool, maxResults uint, fn listFn) error { + if f.cache.IsDeleted(container) { return fs.ErrorDirNotFound } - root := f.root - if dir != "" { - root += dir + "/" + if prefix != "" { + prefix += "/" + } + if directory != "" { + directory += "/" } delimiter := "" if !recurse { @@ -544,15 +555,14 @@ func (f *Fs) list(ctx context.Context, dir string, recurse bool, maxResults uint UncommittedBlobs: false, Deleted: false, }, - Prefix: root, + Prefix: directory, MaxResults: int32(maxResults), } - directoryMarkers := map[string]struct{}{} for marker := (azblob.Marker{}); marker.NotDone(); { var response *azblob.ListBlobsHierarchySegmentResponse err := f.pacer.Call(func() (bool, error) { var err error - response, err = f.cntURL.ListBlobsHierarchySegment(ctx, marker, delimiter, options) + response, err = f.cntURL(container).ListBlobsHierarchySegment(ctx, marker, delimiter, options) return f.shouldRetry(err) }) @@ -572,26 +582,17 @@ func (f *Fs) list(ctx context.Context, dir string, recurse bool, maxResults uint // if prefix != "" && !strings.HasPrefix(file.Name, prefix) { // return nil // } - if !strings.HasPrefix(file.Name, f.root) { + if !strings.HasPrefix(file.Name, prefix) { fs.Debugf(f, "Odd name received %q", file.Name) continue } - remote := file.Name[len(f.root):] + remote := file.Name[len(prefix):] if isDirectoryMarker(*file.Properties.ContentLength, file.Metadata, remote) { - if strings.HasSuffix(remote, "/") { - remote = remote[:len(remote)-1] - } - err = fn(remote, file, true) - if err != nil { - return err - } - // Keep track of directory markers. If recursing then - // there will be no Prefixes so no need to keep track - if !recurse { - directoryMarkers[remote] = struct{}{} - } continue // skip directory marker } + if addContainer { + remote = path.Join(container, remote) + } // Send object err = fn(remote, file, false) if err != nil { @@ -601,14 +602,13 @@ func (f *Fs) list(ctx context.Context, dir string, recurse bool, maxResults uint // Send the subdirectories for _, remote := range response.Segment.BlobPrefixes { remote := strings.TrimRight(remote.Name, "/") - if !strings.HasPrefix(remote, f.root) { + if !strings.HasPrefix(remote, prefix) { fs.Debugf(f, "Odd directory name received %q", remote) continue } - remote = remote[len(f.root):] - // Don't send if already sent as a directory marker - if _, found := directoryMarkers[remote]; found { - continue + remote = remote[len(prefix):] + if addContainer { + remote = path.Join(container, remote) } // Send object err = fn(remote, nil, true) @@ -633,19 +633,9 @@ func (f *Fs) itemToDirEntry(remote string, object *azblob.BlobItem, isDirectory return o, nil } -// mark the container as being OK -func (f *Fs) markContainerOK() { - if f.container != "" { - f.containerOKMu.Lock() - f.containerOK = true - f.containerDeleted = false - f.containerOKMu.Unlock() - } -} - // listDir lists a single directory -func (f *Fs) listDir(ctx context.Context, dir string) (entries fs.DirEntries, err error) { - err = f.list(ctx, dir, false, f.opt.ListChunkSize, func(remote string, object *azblob.BlobItem, isDirectory bool) error { +func (f *Fs) listDir(ctx context.Context, container, directory, prefix string, addContainer bool) (entries fs.DirEntries, err error) { + err = f.list(ctx, container, directory, prefix, addContainer, false, f.opt.ListChunkSize, func(remote string, object *azblob.BlobItem, isDirectory bool) error { entry, err := f.itemToDirEntry(remote, object, isDirectory) if err != nil { return err @@ -659,7 +649,7 @@ func (f *Fs) listDir(ctx context.Context, dir string) (entries fs.DirEntries, er return nil, err } // container must be present if listing succeeded - f.markContainerOK() + f.cache.MarkOK(container) return entries, nil } @@ -668,8 +658,18 @@ func (f *Fs) listContainers(dir string) (entries fs.DirEntries, err error) { if dir != "" { return nil, fs.ErrorListBucketRequired } + if f.isLimited { + f.cntURLcacheMu.Lock() + for container := range f.cntURLcache { + d := fs.NewDir(container, time.Time{}) + entries = append(entries, d) + } + f.cntURLcacheMu.Unlock() + return entries, nil + } err = f.listContainersToFn(func(container *azblob.ContainerItem) error { d := fs.NewDir(container.Name, container.Properties.LastModified) + f.cache.MarkOK(container.Name) entries = append(entries, d) return nil }) @@ -689,10 +689,11 @@ func (f *Fs) listContainers(dir string) (entries fs.DirEntries, err error) { // This should return ErrDirNotFound if the directory isn't // found. func (f *Fs) List(ctx context.Context, dir string) (entries fs.DirEntries, err error) { - if f.container == "" { - return f.listContainers(dir) + container, directory := f.split(dir) + if container == "" { + return f.listContainers(directory) } - return f.listDir(ctx, dir) + return f.listDir(ctx, container, directory, f.rootDirectory, f.rootContainer == "") } // ListR lists the objects and directories of the Fs starting @@ -712,22 +713,41 @@ func (f *Fs) List(ctx context.Context, dir string) (entries fs.DirEntries, err e // Don't implement this unless you have a more efficient way // of listing recursively that doing a directory traversal. func (f *Fs) ListR(ctx context.Context, dir string, callback fs.ListRCallback) (err error) { - if f.container == "" { - return fs.ErrorListBucketRequired - } + container, directory := f.split(dir) list := walk.NewListRHelper(callback) - err = f.list(ctx, dir, true, f.opt.ListChunkSize, func(remote string, object *azblob.BlobItem, isDirectory bool) error { - entry, err := f.itemToDirEntry(remote, object, isDirectory) + listR := func(container, directory, prefix string, addContainer bool) error { + return f.list(ctx, container, directory, prefix, addContainer, true, f.opt.ListChunkSize, func(remote string, object *azblob.BlobItem, isDirectory bool) error { + entry, err := f.itemToDirEntry(remote, object, isDirectory) + if err != nil { + return err + } + return list.Add(entry) + }) + } + if container == "" { + entries, err := f.listContainers("") + if err != nil { + return err + } + for _, entry := range entries { + err = list.Add(entry) + if err != nil { + return err + } + container := entry.Remote() + err = listR(container, "", f.rootDirectory, true) + if err != nil { + return err + } + } + } else { + err = listR(container, directory, f.rootDirectory, f.rootContainer == "") if err != nil { return err } - return list.Add(entry) - }) - if err != nil { - return err } // container must be present if listing succeeded - f.markContainerOK() + f.cache.MarkOK(container) return list.Flush() } @@ -777,86 +797,38 @@ func (f *Fs) Put(ctx context.Context, in io.Reader, src fs.ObjectInfo, options . return fs, fs.Update(ctx, in, src, options...) } -// Check if the container exists -// -// NB this can return incorrect results if called immediately after container deletion -func (f *Fs) dirExists() (bool, error) { - options := azblob.ListBlobsSegmentOptions{ - Details: azblob.BlobListingDetails{ - Copy: false, - Metadata: false, - Snapshots: false, - UncommittedBlobs: false, - Deleted: false, - }, - MaxResults: 1, - } - err := f.pacer.Call(func() (bool, error) { - ctx := context.Background() - _, err := f.cntURL.ListBlobsHierarchySegment(ctx, azblob.Marker{}, "", options) - return f.shouldRetry(err) - }) - if err == nil { - return true, nil - } - // Check http error code along with service code, current SDK doesn't populate service code correctly sometimes - if storageErr, ok := err.(azblob.StorageError); ok && (storageErr.ServiceCode() == azblob.ServiceCodeContainerNotFound || storageErr.Response().StatusCode == http.StatusNotFound) { - return false, nil - } - return false, err -} - // Mkdir creates the container if it doesn't exist func (f *Fs) Mkdir(ctx context.Context, dir string) error { - f.containerOKMu.Lock() - defer f.containerOKMu.Unlock() - if f.containerOK { - return nil - } - if !f.containerDeleted { - exists, err := f.dirExists() - if err == nil { - f.containerOK = exists - } - if err != nil || exists { - return err - } - } - - // now try to create the container - err := f.pacer.Call(func() (bool, error) { - ctx := context.Background() - _, err := f.cntURL.Create(ctx, azblob.Metadata{}, azblob.PublicAccessNone) - if err != nil { - if storageErr, ok := err.(azblob.StorageError); ok { - switch storageErr.ServiceCode() { - case azblob.ServiceCodeContainerAlreadyExists: - f.containerOK = true - return false, nil - case azblob.ServiceCodeContainerBeingDeleted: - // From https://docs.microsoft.com/en-us/rest/api/storageservices/delete-container - // When a container is deleted, a container with the same name cannot be created - // for at least 30 seconds; the container may not be available for more than 30 - // seconds if the service is still processing the request. - time.Sleep(6 * time.Second) // default 10 retries will be 60 seconds - f.containerDeleted = true - return true, err + container, _ := f.split(dir) + return f.cache.Create(container, func() error { + // now try to create the container + return f.pacer.Call(func() (bool, error) { + _, err := f.cntURL(container).Create(ctx, azblob.Metadata{}, azblob.PublicAccessNone) + if err != nil { + if storageErr, ok := err.(azblob.StorageError); ok { + switch storageErr.ServiceCode() { + case azblob.ServiceCodeContainerAlreadyExists: + return false, nil + case azblob.ServiceCodeContainerBeingDeleted: + // From https://docs.microsoft.com/en-us/rest/api/storageservices/delete-container + // When a container is deleted, a container with the same name cannot be created + // for at least 30 seconds; the container may not be available for more than 30 + // seconds if the service is still processing the request. + time.Sleep(6 * time.Second) // default 10 retries will be 60 seconds + f.cache.MarkDeleted(container) + return true, err + } } } - } - return f.shouldRetry(err) - }) - if err == nil { - f.containerOK = true - f.containerDeleted = false - } - return errors.Wrap(err, "failed to make container") + return f.shouldRetry(err) + }) + }, nil) } -// isEmpty checks to see if a given directory is empty and returns an error if not -func (f *Fs) isEmpty(ctx context.Context, dir string) (err error) { +// isEmpty checks to see if a given (container, directory) is empty and returns an error if not +func (f *Fs) isEmpty(ctx context.Context, container, directory string) (err error) { empty := true - err = f.list(ctx, dir, true, 1, func(remote string, object *azblob.BlobItem, isDirectory bool) error { + err = f.list(ctx, container, directory, f.rootDirectory, f.rootContainer == "", true, 1, func(remote string, object *azblob.BlobItem, isDirectory bool) error { empty = false return nil }) @@ -871,47 +843,42 @@ func (f *Fs) isEmpty(ctx context.Context, dir string) (err error) { // deleteContainer deletes the container. It can delete a full // container so use isEmpty if you don't want that. -func (f *Fs) deleteContainer() error { - f.containerOKMu.Lock() - defer f.containerOKMu.Unlock() - options := azblob.ContainerAccessConditions{} - ctx := context.Background() - err := f.pacer.Call(func() (bool, error) { - _, err := f.cntURL.GetProperties(ctx, azblob.LeaseAccessConditions{}) - if err == nil { - _, err = f.cntURL.Delete(ctx, options) - } +func (f *Fs) deleteContainer(ctx context.Context, container string) error { + return f.cache.Remove(container, func() error { + options := azblob.ContainerAccessConditions{} + return f.pacer.Call(func() (bool, error) { + _, err := f.cntURL(container).GetProperties(ctx, azblob.LeaseAccessConditions{}) + if err == nil { + _, err = f.cntURL(container).Delete(ctx, options) + } - if err != nil { - // Check http error code along with service code, current SDK doesn't populate service code correctly sometimes - if storageErr, ok := err.(azblob.StorageError); ok && (storageErr.ServiceCode() == azblob.ServiceCodeContainerNotFound || storageErr.Response().StatusCode == http.StatusNotFound) { - return false, fs.ErrorDirNotFound + if err != nil { + // Check http error code along with service code, current SDK doesn't populate service code correctly sometimes + if storageErr, ok := err.(azblob.StorageError); ok && (storageErr.ServiceCode() == azblob.ServiceCodeContainerNotFound || storageErr.Response().StatusCode == http.StatusNotFound) { + return false, fs.ErrorDirNotFound + } + + return f.shouldRetry(err) } return f.shouldRetry(err) - } - - return f.shouldRetry(err) + }) }) - if err == nil { - f.containerOK = false - f.containerDeleted = true - } - return errors.Wrap(err, "failed to delete container") } // Rmdir deletes the container if the fs is at the root // // Returns an error if it isn't empty func (f *Fs) Rmdir(ctx context.Context, dir string) error { - err := f.isEmpty(ctx, dir) + container, directory := f.split(dir) + if container == "" || directory != "" { + return nil + } + err := f.isEmpty(ctx, container, directory) if err != nil { return err } - if f.root != "" || dir != "" { - return nil - } - return f.deleteContainer() + return f.deleteContainer(ctx, container) } // Precision of the remote @@ -927,11 +894,12 @@ func (f *Fs) Hashes() hash.Set { // Purge deletes all the files and directories including the old versions. func (f *Fs) Purge(ctx context.Context) error { dir := "" // forward compat! - if f.root != "" || dir != "" { - // Delegate to caller if not root container + container, directory := f.split(dir) + if container == "" || directory != "" { + // Delegate to caller if not root of a container return fs.ErrorCantPurge } - return f.deleteContainer() + return f.deleteContainer(ctx, container) } // Copy src to this remote using server side copy operations. @@ -944,6 +912,7 @@ func (f *Fs) Purge(ctx context.Context) error { // // If it isn't possible then return fs.ErrorCantCopy func (f *Fs) Copy(ctx context.Context, src fs.Object, remote string) (fs.Object, error) { + dstContainer, dstPath := f.split(remote) err := f.Mkdir(ctx, "") if err != nil { return nil, err @@ -953,7 +922,7 @@ func (f *Fs) Copy(ctx context.Context, src fs.Object, remote string) (fs.Object, fs.Debugf(src, "Can't copy - not same remote type") return nil, fs.ErrorCantCopy } - dstBlobURL := f.getBlobReference(remote) + dstBlobURL := f.getBlobReference(dstContainer, dstPath) srcBlobURL := srcObj.getBlobReference() source, err := url.Parse(srcBlobURL.String()) @@ -1086,7 +1055,8 @@ func (o *Object) decodeMetaDataFromBlob(info *azblob.BlobItem) (err error) { // getBlobReference creates an empty blob reference with no metadata func (o *Object) getBlobReference() azblob.BlobURL { - return o.fs.getBlobReference(o.remote) + container, directory := o.split() + return o.fs.getBlobReference(container, directory) } // clearMetaData clears enough metadata so readMetaData will re-read it @@ -1206,7 +1176,6 @@ func (o *Object) Open(ctx context.Context, options ...fs.OpenOption) (in io.Read ac := azblob.BlobAccessConditions{} var dowloadResponse *azblob.DownloadResponse err = o.fs.pacer.Call(func() (bool, error) { - log.Printf("offset=%d, count=%v", offset, count) dowloadResponse, err = blob.Download(ctx, offset, count, ac, false) return o.fs.shouldRetry(err) })