azureblob: make all operations work from the root #3421

This commit is contained in:
Nick Craig-Wood 2019-08-16 10:10:56 +01:00
parent d8e9b1a67c
commit 8a0775ce3c

View file

@ -13,11 +13,9 @@ import (
"encoding/hex" "encoding/hex"
"fmt" "fmt"
"io" "io"
"log"
"net/http" "net/http"
"net/url" "net/url"
"path" "path"
"regexp"
"strconv" "strconv"
"strings" "strings"
"sync" "sync"
@ -34,6 +32,7 @@ import (
"github.com/rclone/rclone/fs/fshttp" "github.com/rclone/rclone/fs/fshttp"
"github.com/rclone/rclone/fs/hash" "github.com/rclone/rclone/fs/hash"
"github.com/rclone/rclone/fs/walk" "github.com/rclone/rclone/fs/walk"
"github.com/rclone/rclone/lib/bucket"
"github.com/rclone/rclone/lib/pacer" "github.com/rclone/rclone/lib/pacer"
) )
@ -144,19 +143,20 @@ type Options struct {
// Fs represents a remote azure server // Fs represents a remote azure server
type Fs struct { type Fs struct {
name string // name of this remote name string // name of this remote
root string // the path we are working on if any root string // the path we are working on if any
opt Options // parsed config options opt Options // parsed config options
features *fs.Features // optional features features *fs.Features // optional features
client *http.Client // http client we are using client *http.Client // http client we are using
svcURL *azblob.ServiceURL // reference to serviceURL svcURL *azblob.ServiceURL // reference to serviceURL
cntURL *azblob.ContainerURL // reference to containerURL cntURLcacheMu sync.Mutex // mutex to protect cntURLcache
container string // the container we are working on cntURLcache map[string]*azblob.ContainerURL // reference to containerURL per container
containerOKMu sync.Mutex // mutex to protect container OK rootContainer string // container part of root (if any)
containerOK bool // true if we have created the container rootDirectory string // directory part of root (if any)
containerDeleted bool // true if we have deleted the container isLimited bool // if limited to one container
pacer *fs.Pacer // To pace and retry the API calls cache *bucket.Cache // cache for container creation status
uploadToken *pacer.TokenDispenser // control concurrency pacer *fs.Pacer // To pace and retry the API calls
uploadToken *pacer.TokenDispenser // control concurrency
} }
// Object describes a azure object // Object describes a azure object
@ -180,18 +180,18 @@ func (f *Fs) Name() string {
// Root of the remote (as passed into NewFs) // Root of the remote (as passed into NewFs)
func (f *Fs) Root() string { func (f *Fs) Root() string {
if f.root == "" { return f.root
return f.container
}
return f.container + "/" + f.root
} }
// String converts this Fs to a string // String converts this Fs to a string
func (f *Fs) String() string { func (f *Fs) String() string {
if f.root == "" { if f.rootContainer == "" {
return fmt.Sprintf("Azure container %s", f.container) return fmt.Sprintf("Azure root")
} }
return fmt.Sprintf("Azure container %s path %s", f.container, f.root) if f.rootDirectory == "" {
return fmt.Sprintf("Azure container %s", f.rootContainer)
}
return fmt.Sprintf("Azure container %s path %s", f.rootContainer, f.rootDirectory)
} }
// Features returns the optional features of this Fs // Features returns the optional features of this Fs
@ -199,21 +199,23 @@ func (f *Fs) Features() *fs.Features {
return f.features return f.features
} }
// Pattern to match a azure path // parsePath parses a remote 'url'
var matcher = regexp.MustCompile(`^/*([^/]*)(.*)$`) func parsePath(path string) (root string) {
root = strings.Trim(path, "/")
// parseParse parses a azure 'url'
func parsePath(path string) (container, directory string, err error) {
parts := matcher.FindStringSubmatch(path)
if parts == nil {
err = errors.Errorf("couldn't find container in azure path %q", path)
} else {
container, directory = parts[1], parts[2]
directory = strings.Trim(directory, "/")
}
return return
} }
// split returns container and containerPath from the rootRelativePath
// relative to f.root
func (f *Fs) split(rootRelativePath string) (containerName, containerPath string) {
return bucket.Split(path.Join(f.root, rootRelativePath))
}
// split returns container and containerPath from the object
func (o *Object) split() (container, containerPath string) {
return o.fs.split(o.remote)
}
// validateAccessTier checks if azureblob supports user supplied tier // validateAccessTier checks if azureblob supports user supplied tier
func validateAccessTier(tier string) bool { func validateAccessTier(tier string) bool {
switch tier { switch tier {
@ -318,6 +320,12 @@ func (f *Fs) newPipeline(c azblob.Credential, o azblob.PipelineOptions) pipeline
return pipeline.NewPipeline(factories, pipeline.Options{HTTPSender: httpClientFactory(f.client), Log: o.Log}) return pipeline.NewPipeline(factories, pipeline.Options{HTTPSender: httpClientFactory(f.client), Log: o.Log})
} }
// setRoot changes the root of the Fs
func (f *Fs) setRoot(root string) {
f.root = parsePath(root)
f.rootContainer, f.rootDirectory = bucket.Split(f.root)
}
// NewFs constructs an Fs from the path, container:path // NewFs constructs an Fs from the path, container:path
func NewFs(name, root string, m configmap.Mapper) (fs.Fs, error) { func NewFs(name, root string, m configmap.Mapper) (fs.Fs, error) {
ctx := context.Background() ctx := context.Background()
@ -339,10 +347,6 @@ func NewFs(name, root string, m configmap.Mapper) (fs.Fs, error) {
if opt.ListChunkSize > maxListChunkSize { if opt.ListChunkSize > maxListChunkSize {
return nil, errors.Errorf("azure: blob list size can't be greater than %v - was %v", maxListChunkSize, opt.ListChunkSize) return nil, errors.Errorf("azure: blob list size can't be greater than %v - was %v", maxListChunkSize, opt.ListChunkSize)
} }
container, directory, err := parsePath(root)
if err != nil {
return nil, err
}
if opt.Endpoint == "" { if opt.Endpoint == "" {
opt.Endpoint = storageDefaultBaseURL opt.Endpoint = storageDefaultBaseURL
} }
@ -357,24 +361,25 @@ func NewFs(name, root string, m configmap.Mapper) (fs.Fs, error) {
f := &Fs{ f := &Fs{
name: name, name: name,
opt: *opt, opt: *opt,
container: container,
root: directory,
pacer: fs.NewPacer(pacer.NewS3(pacer.MinSleep(minSleep), pacer.MaxSleep(maxSleep), pacer.DecayConstant(decayConstant))), pacer: fs.NewPacer(pacer.NewS3(pacer.MinSleep(minSleep), pacer.MaxSleep(maxSleep), pacer.DecayConstant(decayConstant))),
uploadToken: pacer.NewTokenDispenser(fs.Config.Transfers), uploadToken: pacer.NewTokenDispenser(fs.Config.Transfers),
client: fshttp.NewClient(fs.Config), client: fshttp.NewClient(fs.Config),
cache: bucket.NewCache(),
cntURLcache: make(map[string]*azblob.ContainerURL, 1),
} }
f.setRoot(root)
f.features = (&fs.Features{ f.features = (&fs.Features{
ReadMimeType: true, ReadMimeType: true,
WriteMimeType: true, WriteMimeType: true,
BucketBased: true, BucketBased: true,
SetTier: true, BucketBasedRootOK: true,
GetTier: true, SetTier: true,
GetTier: true,
}).Fill(f) }).Fill(f)
var ( var (
u *url.URL u *url.URL
serviceURL azblob.ServiceURL serviceURL azblob.ServiceURL
containerURL azblob.ContainerURL
) )
switch { switch {
case opt.UseEmulator: case opt.UseEmulator:
@ -388,7 +393,6 @@ func NewFs(name, root string, m configmap.Mapper) (fs.Fs, error) {
} }
pipeline := f.newPipeline(credential, azblob.PipelineOptions{Retry: azblob.RetryOptions{TryTimeout: maxTryTimeout}}) pipeline := f.newPipeline(credential, azblob.PipelineOptions{Retry: azblob.RetryOptions{TryTimeout: maxTryTimeout}})
serviceURL = azblob.NewServiceURL(*u, pipeline) serviceURL = azblob.NewServiceURL(*u, pipeline)
containerURL = serviceURL.NewContainerURL(container)
case opt.Account != "" && opt.Key != "": case opt.Account != "" && opt.Key != "":
credential, err := azblob.NewSharedKeyCredential(opt.Account, opt.Key) credential, err := azblob.NewSharedKeyCredential(opt.Account, opt.Key)
if err != nil { if err != nil {
@ -401,7 +405,6 @@ func NewFs(name, root string, m configmap.Mapper) (fs.Fs, error) {
} }
pipeline := f.newPipeline(credential, azblob.PipelineOptions{Retry: azblob.RetryOptions{TryTimeout: maxTryTimeout}}) pipeline := f.newPipeline(credential, azblob.PipelineOptions{Retry: azblob.RetryOptions{TryTimeout: maxTryTimeout}})
serviceURL = azblob.NewServiceURL(*u, pipeline) serviceURL = azblob.NewServiceURL(*u, pipeline)
containerURL = serviceURL.NewContainerURL(container)
case opt.SASURL != "": case opt.SASURL != "":
u, err = url.Parse(opt.SASURL) u, err = url.Parse(opt.SASURL)
if err != nil { if err != nil {
@ -412,38 +415,30 @@ func NewFs(name, root string, m configmap.Mapper) (fs.Fs, error) {
// Check if we have container level SAS or account level sas // Check if we have container level SAS or account level sas
parts := azblob.NewBlobURLParts(*u) parts := azblob.NewBlobURLParts(*u)
if parts.ContainerName != "" { if parts.ContainerName != "" {
if container != "" && parts.ContainerName != container { if f.rootContainer != "" && parts.ContainerName != f.rootContainer {
return nil, errors.New("Container name in SAS URL and container provided in command do not match") return nil, errors.New("Container name in SAS URL and container provided in command do not match")
} }
containerURL := azblob.NewContainerURL(*u, pipeline)
f.container = parts.ContainerName f.cntURLcache[parts.ContainerName] = &containerURL
containerURL = azblob.NewContainerURL(*u, pipeline) f.isLimited = true
} else { } else {
serviceURL = azblob.NewServiceURL(*u, pipeline) serviceURL = azblob.NewServiceURL(*u, pipeline)
containerURL = serviceURL.NewContainerURL(container)
} }
default: default:
return nil, errors.New("Need account+key or connectionString or sasURL") return nil, errors.New("Need account+key or connectionString or sasURL")
} }
f.svcURL = &serviceURL f.svcURL = &serviceURL
f.cntURL = &containerURL
if f.root != "" { if f.rootContainer != "" && f.rootDirectory != "" {
f.root += "/"
// Check to see if the (container,directory) is actually an existing file // Check to see if the (container,directory) is actually an existing file
oldRoot := f.root oldRoot := f.root
remote := path.Base(directory) newRoot, leaf := path.Split(oldRoot)
f.root = path.Dir(directory) f.setRoot(newRoot)
if f.root == "." { _, err := f.NewObject(ctx, leaf)
f.root = ""
} else {
f.root += "/"
}
_, err := f.NewObject(ctx, remote)
if err != nil { if err != nil {
if err == fs.ErrorObjectNotFound || err == fs.ErrorNotAFile { if err == fs.ErrorObjectNotFound || err == fs.ErrorNotAFile {
// File doesn't exist or is a directory so return old f // File doesn't exist or is a directory so return old f
f.root = oldRoot f.setRoot(oldRoot)
return f, nil return f, nil
} }
return nil, err return nil, err
@ -454,6 +449,20 @@ func NewFs(name, root string, m configmap.Mapper) (fs.Fs, error) {
return f, nil return f, nil
} }
// return the container URL for the container passed in
func (f *Fs) cntURL(container string) (containerURL *azblob.ContainerURL) {
f.cntURLcacheMu.Lock()
defer f.cntURLcacheMu.Unlock()
var ok bool
if containerURL, ok = f.cntURLcache[container]; !ok {
cntURL := f.svcURL.NewContainerURL(container)
containerURL = &cntURL
f.cntURLcache[container] = containerURL
}
return containerURL
}
// Return an Object from a path // Return an Object from a path
// //
// If it can't be found it returns the error fs.ErrorObjectNotFound. // If it can't be found it returns the error fs.ErrorObjectNotFound.
@ -483,8 +492,8 @@ func (f *Fs) NewObject(ctx context.Context, remote string) (fs.Object, error) {
} }
// getBlobReference creates an empty blob reference with no metadata // getBlobReference creates an empty blob reference with no metadata
func (f *Fs) getBlobReference(remote string) azblob.BlobURL { func (f *Fs) getBlobReference(container, containerPath string) azblob.BlobURL {
return f.cntURL.NewBlobURL(f.root + remote) return f.cntURL(container).NewBlobURL(containerPath)
} }
// updateMetadataWithModTime adds the modTime passed in to o.meta. // updateMetadataWithModTime adds the modTime passed in to o.meta.
@ -520,16 +529,18 @@ type listFn func(remote string, object *azblob.BlobItem, isDirectory bool) error
// the container and root supplied // the container and root supplied
// //
// dir is the starting directory, "" for root // dir is the starting directory, "" for root
func (f *Fs) list(ctx context.Context, dir string, recurse bool, maxResults uint, fn listFn) error { //
f.containerOKMu.Lock() // The remote has prefix removed from it and if addContainer is set then
deleted := f.containerDeleted // it adds the container to the start.
f.containerOKMu.Unlock() func (f *Fs) list(ctx context.Context, container, directory, prefix string, addContainer bool, recurse bool, maxResults uint, fn listFn) error {
if deleted { if f.cache.IsDeleted(container) {
return fs.ErrorDirNotFound return fs.ErrorDirNotFound
} }
root := f.root if prefix != "" {
if dir != "" { prefix += "/"
root += dir + "/" }
if directory != "" {
directory += "/"
} }
delimiter := "" delimiter := ""
if !recurse { if !recurse {
@ -544,15 +555,14 @@ func (f *Fs) list(ctx context.Context, dir string, recurse bool, maxResults uint
UncommittedBlobs: false, UncommittedBlobs: false,
Deleted: false, Deleted: false,
}, },
Prefix: root, Prefix: directory,
MaxResults: int32(maxResults), MaxResults: int32(maxResults),
} }
directoryMarkers := map[string]struct{}{}
for marker := (azblob.Marker{}); marker.NotDone(); { for marker := (azblob.Marker{}); marker.NotDone(); {
var response *azblob.ListBlobsHierarchySegmentResponse var response *azblob.ListBlobsHierarchySegmentResponse
err := f.pacer.Call(func() (bool, error) { err := f.pacer.Call(func() (bool, error) {
var err error var err error
response, err = f.cntURL.ListBlobsHierarchySegment(ctx, marker, delimiter, options) response, err = f.cntURL(container).ListBlobsHierarchySegment(ctx, marker, delimiter, options)
return f.shouldRetry(err) return f.shouldRetry(err)
}) })
@ -572,26 +582,17 @@ func (f *Fs) list(ctx context.Context, dir string, recurse bool, maxResults uint
// if prefix != "" && !strings.HasPrefix(file.Name, prefix) { // if prefix != "" && !strings.HasPrefix(file.Name, prefix) {
// return nil // return nil
// } // }
if !strings.HasPrefix(file.Name, f.root) { if !strings.HasPrefix(file.Name, prefix) {
fs.Debugf(f, "Odd name received %q", file.Name) fs.Debugf(f, "Odd name received %q", file.Name)
continue continue
} }
remote := file.Name[len(f.root):] remote := file.Name[len(prefix):]
if isDirectoryMarker(*file.Properties.ContentLength, file.Metadata, remote) { if isDirectoryMarker(*file.Properties.ContentLength, file.Metadata, remote) {
if strings.HasSuffix(remote, "/") {
remote = remote[:len(remote)-1]
}
err = fn(remote, file, true)
if err != nil {
return err
}
// Keep track of directory markers. If recursing then
// there will be no Prefixes so no need to keep track
if !recurse {
directoryMarkers[remote] = struct{}{}
}
continue // skip directory marker continue // skip directory marker
} }
if addContainer {
remote = path.Join(container, remote)
}
// Send object // Send object
err = fn(remote, file, false) err = fn(remote, file, false)
if err != nil { if err != nil {
@ -601,14 +602,13 @@ func (f *Fs) list(ctx context.Context, dir string, recurse bool, maxResults uint
// Send the subdirectories // Send the subdirectories
for _, remote := range response.Segment.BlobPrefixes { for _, remote := range response.Segment.BlobPrefixes {
remote := strings.TrimRight(remote.Name, "/") remote := strings.TrimRight(remote.Name, "/")
if !strings.HasPrefix(remote, f.root) { if !strings.HasPrefix(remote, prefix) {
fs.Debugf(f, "Odd directory name received %q", remote) fs.Debugf(f, "Odd directory name received %q", remote)
continue continue
} }
remote = remote[len(f.root):] remote = remote[len(prefix):]
// Don't send if already sent as a directory marker if addContainer {
if _, found := directoryMarkers[remote]; found { remote = path.Join(container, remote)
continue
} }
// Send object // Send object
err = fn(remote, nil, true) err = fn(remote, nil, true)
@ -633,19 +633,9 @@ func (f *Fs) itemToDirEntry(remote string, object *azblob.BlobItem, isDirectory
return o, nil return o, nil
} }
// mark the container as being OK
func (f *Fs) markContainerOK() {
if f.container != "" {
f.containerOKMu.Lock()
f.containerOK = true
f.containerDeleted = false
f.containerOKMu.Unlock()
}
}
// listDir lists a single directory // listDir lists a single directory
func (f *Fs) listDir(ctx context.Context, dir string) (entries fs.DirEntries, err error) { func (f *Fs) listDir(ctx context.Context, container, directory, prefix string, addContainer bool) (entries fs.DirEntries, err error) {
err = f.list(ctx, dir, false, f.opt.ListChunkSize, func(remote string, object *azblob.BlobItem, isDirectory bool) error { err = f.list(ctx, container, directory, prefix, addContainer, false, f.opt.ListChunkSize, func(remote string, object *azblob.BlobItem, isDirectory bool) error {
entry, err := f.itemToDirEntry(remote, object, isDirectory) entry, err := f.itemToDirEntry(remote, object, isDirectory)
if err != nil { if err != nil {
return err return err
@ -659,7 +649,7 @@ func (f *Fs) listDir(ctx context.Context, dir string) (entries fs.DirEntries, er
return nil, err return nil, err
} }
// container must be present if listing succeeded // container must be present if listing succeeded
f.markContainerOK() f.cache.MarkOK(container)
return entries, nil return entries, nil
} }
@ -668,8 +658,18 @@ func (f *Fs) listContainers(dir string) (entries fs.DirEntries, err error) {
if dir != "" { if dir != "" {
return nil, fs.ErrorListBucketRequired return nil, fs.ErrorListBucketRequired
} }
if f.isLimited {
f.cntURLcacheMu.Lock()
for container := range f.cntURLcache {
d := fs.NewDir(container, time.Time{})
entries = append(entries, d)
}
f.cntURLcacheMu.Unlock()
return entries, nil
}
err = f.listContainersToFn(func(container *azblob.ContainerItem) error { err = f.listContainersToFn(func(container *azblob.ContainerItem) error {
d := fs.NewDir(container.Name, container.Properties.LastModified) d := fs.NewDir(container.Name, container.Properties.LastModified)
f.cache.MarkOK(container.Name)
entries = append(entries, d) entries = append(entries, d)
return nil return nil
}) })
@ -689,10 +689,11 @@ func (f *Fs) listContainers(dir string) (entries fs.DirEntries, err error) {
// This should return ErrDirNotFound if the directory isn't // This should return ErrDirNotFound if the directory isn't
// found. // found.
func (f *Fs) List(ctx context.Context, dir string) (entries fs.DirEntries, err error) { func (f *Fs) List(ctx context.Context, dir string) (entries fs.DirEntries, err error) {
if f.container == "" { container, directory := f.split(dir)
return f.listContainers(dir) if container == "" {
return f.listContainers(directory)
} }
return f.listDir(ctx, dir) return f.listDir(ctx, container, directory, f.rootDirectory, f.rootContainer == "")
} }
// ListR lists the objects and directories of the Fs starting // ListR lists the objects and directories of the Fs starting
@ -712,22 +713,41 @@ func (f *Fs) List(ctx context.Context, dir string) (entries fs.DirEntries, err e
// Don't implement this unless you have a more efficient way // Don't implement this unless you have a more efficient way
// of listing recursively that doing a directory traversal. // of listing recursively that doing a directory traversal.
func (f *Fs) ListR(ctx context.Context, dir string, callback fs.ListRCallback) (err error) { func (f *Fs) ListR(ctx context.Context, dir string, callback fs.ListRCallback) (err error) {
if f.container == "" { container, directory := f.split(dir)
return fs.ErrorListBucketRequired
}
list := walk.NewListRHelper(callback) list := walk.NewListRHelper(callback)
err = f.list(ctx, dir, true, f.opt.ListChunkSize, func(remote string, object *azblob.BlobItem, isDirectory bool) error { listR := func(container, directory, prefix string, addContainer bool) error {
entry, err := f.itemToDirEntry(remote, object, isDirectory) return f.list(ctx, container, directory, prefix, addContainer, true, f.opt.ListChunkSize, func(remote string, object *azblob.BlobItem, isDirectory bool) error {
entry, err := f.itemToDirEntry(remote, object, isDirectory)
if err != nil {
return err
}
return list.Add(entry)
})
}
if container == "" {
entries, err := f.listContainers("")
if err != nil {
return err
}
for _, entry := range entries {
err = list.Add(entry)
if err != nil {
return err
}
container := entry.Remote()
err = listR(container, "", f.rootDirectory, true)
if err != nil {
return err
}
}
} else {
err = listR(container, directory, f.rootDirectory, f.rootContainer == "")
if err != nil { if err != nil {
return err return err
} }
return list.Add(entry)
})
if err != nil {
return err
} }
// container must be present if listing succeeded // container must be present if listing succeeded
f.markContainerOK() f.cache.MarkOK(container)
return list.Flush() return list.Flush()
} }
@ -777,86 +797,38 @@ func (f *Fs) Put(ctx context.Context, in io.Reader, src fs.ObjectInfo, options .
return fs, fs.Update(ctx, in, src, options...) return fs, fs.Update(ctx, in, src, options...)
} }
// Check if the container exists
//
// NB this can return incorrect results if called immediately after container deletion
func (f *Fs) dirExists() (bool, error) {
options := azblob.ListBlobsSegmentOptions{
Details: azblob.BlobListingDetails{
Copy: false,
Metadata: false,
Snapshots: false,
UncommittedBlobs: false,
Deleted: false,
},
MaxResults: 1,
}
err := f.pacer.Call(func() (bool, error) {
ctx := context.Background()
_, err := f.cntURL.ListBlobsHierarchySegment(ctx, azblob.Marker{}, "", options)
return f.shouldRetry(err)
})
if err == nil {
return true, nil
}
// Check http error code along with service code, current SDK doesn't populate service code correctly sometimes
if storageErr, ok := err.(azblob.StorageError); ok && (storageErr.ServiceCode() == azblob.ServiceCodeContainerNotFound || storageErr.Response().StatusCode == http.StatusNotFound) {
return false, nil
}
return false, err
}
// Mkdir creates the container if it doesn't exist // Mkdir creates the container if it doesn't exist
func (f *Fs) Mkdir(ctx context.Context, dir string) error { func (f *Fs) Mkdir(ctx context.Context, dir string) error {
f.containerOKMu.Lock() container, _ := f.split(dir)
defer f.containerOKMu.Unlock() return f.cache.Create(container, func() error {
if f.containerOK { // now try to create the container
return nil return f.pacer.Call(func() (bool, error) {
} _, err := f.cntURL(container).Create(ctx, azblob.Metadata{}, azblob.PublicAccessNone)
if !f.containerDeleted { if err != nil {
exists, err := f.dirExists() if storageErr, ok := err.(azblob.StorageError); ok {
if err == nil { switch storageErr.ServiceCode() {
f.containerOK = exists case azblob.ServiceCodeContainerAlreadyExists:
} return false, nil
if err != nil || exists { case azblob.ServiceCodeContainerBeingDeleted:
return err // From https://docs.microsoft.com/en-us/rest/api/storageservices/delete-container
} // When a container is deleted, a container with the same name cannot be created
} // for at least 30 seconds; the container may not be available for more than 30
// seconds if the service is still processing the request.
// now try to create the container time.Sleep(6 * time.Second) // default 10 retries will be 60 seconds
err := f.pacer.Call(func() (bool, error) { f.cache.MarkDeleted(container)
ctx := context.Background() return true, err
_, err := f.cntURL.Create(ctx, azblob.Metadata{}, azblob.PublicAccessNone) }
if err != nil {
if storageErr, ok := err.(azblob.StorageError); ok {
switch storageErr.ServiceCode() {
case azblob.ServiceCodeContainerAlreadyExists:
f.containerOK = true
return false, nil
case azblob.ServiceCodeContainerBeingDeleted:
// From https://docs.microsoft.com/en-us/rest/api/storageservices/delete-container
// When a container is deleted, a container with the same name cannot be created
// for at least 30 seconds; the container may not be available for more than 30
// seconds if the service is still processing the request.
time.Sleep(6 * time.Second) // default 10 retries will be 60 seconds
f.containerDeleted = true
return true, err
} }
} }
} return f.shouldRetry(err)
return f.shouldRetry(err) })
}) }, nil)
if err == nil {
f.containerOK = true
f.containerDeleted = false
}
return errors.Wrap(err, "failed to make container")
} }
// isEmpty checks to see if a given directory is empty and returns an error if not // isEmpty checks to see if a given (container, directory) is empty and returns an error if not
func (f *Fs) isEmpty(ctx context.Context, dir string) (err error) { func (f *Fs) isEmpty(ctx context.Context, container, directory string) (err error) {
empty := true empty := true
err = f.list(ctx, dir, true, 1, func(remote string, object *azblob.BlobItem, isDirectory bool) error { err = f.list(ctx, container, directory, f.rootDirectory, f.rootContainer == "", true, 1, func(remote string, object *azblob.BlobItem, isDirectory bool) error {
empty = false empty = false
return nil return nil
}) })
@ -871,47 +843,42 @@ func (f *Fs) isEmpty(ctx context.Context, dir string) (err error) {
// deleteContainer deletes the container. It can delete a full // deleteContainer deletes the container. It can delete a full
// container so use isEmpty if you don't want that. // container so use isEmpty if you don't want that.
func (f *Fs) deleteContainer() error { func (f *Fs) deleteContainer(ctx context.Context, container string) error {
f.containerOKMu.Lock() return f.cache.Remove(container, func() error {
defer f.containerOKMu.Unlock() options := azblob.ContainerAccessConditions{}
options := azblob.ContainerAccessConditions{} return f.pacer.Call(func() (bool, error) {
ctx := context.Background() _, err := f.cntURL(container).GetProperties(ctx, azblob.LeaseAccessConditions{})
err := f.pacer.Call(func() (bool, error) { if err == nil {
_, err := f.cntURL.GetProperties(ctx, azblob.LeaseAccessConditions{}) _, err = f.cntURL(container).Delete(ctx, options)
if err == nil { }
_, err = f.cntURL.Delete(ctx, options)
}
if err != nil { if err != nil {
// Check http error code along with service code, current SDK doesn't populate service code correctly sometimes // Check http error code along with service code, current SDK doesn't populate service code correctly sometimes
if storageErr, ok := err.(azblob.StorageError); ok && (storageErr.ServiceCode() == azblob.ServiceCodeContainerNotFound || storageErr.Response().StatusCode == http.StatusNotFound) { if storageErr, ok := err.(azblob.StorageError); ok && (storageErr.ServiceCode() == azblob.ServiceCodeContainerNotFound || storageErr.Response().StatusCode == http.StatusNotFound) {
return false, fs.ErrorDirNotFound return false, fs.ErrorDirNotFound
}
return f.shouldRetry(err)
} }
return f.shouldRetry(err) return f.shouldRetry(err)
} })
return f.shouldRetry(err)
}) })
if err == nil {
f.containerOK = false
f.containerDeleted = true
}
return errors.Wrap(err, "failed to delete container")
} }
// Rmdir deletes the container if the fs is at the root // Rmdir deletes the container if the fs is at the root
// //
// Returns an error if it isn't empty // Returns an error if it isn't empty
func (f *Fs) Rmdir(ctx context.Context, dir string) error { func (f *Fs) Rmdir(ctx context.Context, dir string) error {
err := f.isEmpty(ctx, dir) container, directory := f.split(dir)
if container == "" || directory != "" {
return nil
}
err := f.isEmpty(ctx, container, directory)
if err != nil { if err != nil {
return err return err
} }
if f.root != "" || dir != "" { return f.deleteContainer(ctx, container)
return nil
}
return f.deleteContainer()
} }
// Precision of the remote // Precision of the remote
@ -927,11 +894,12 @@ func (f *Fs) Hashes() hash.Set {
// Purge deletes all the files and directories including the old versions. // Purge deletes all the files and directories including the old versions.
func (f *Fs) Purge(ctx context.Context) error { func (f *Fs) Purge(ctx context.Context) error {
dir := "" // forward compat! dir := "" // forward compat!
if f.root != "" || dir != "" { container, directory := f.split(dir)
// Delegate to caller if not root container if container == "" || directory != "" {
// Delegate to caller if not root of a container
return fs.ErrorCantPurge return fs.ErrorCantPurge
} }
return f.deleteContainer() return f.deleteContainer(ctx, container)
} }
// Copy src to this remote using server side copy operations. // Copy src to this remote using server side copy operations.
@ -944,6 +912,7 @@ func (f *Fs) Purge(ctx context.Context) error {
// //
// If it isn't possible then return fs.ErrorCantCopy // If it isn't possible then return fs.ErrorCantCopy
func (f *Fs) Copy(ctx context.Context, src fs.Object, remote string) (fs.Object, error) { func (f *Fs) Copy(ctx context.Context, src fs.Object, remote string) (fs.Object, error) {
dstContainer, dstPath := f.split(remote)
err := f.Mkdir(ctx, "") err := f.Mkdir(ctx, "")
if err != nil { if err != nil {
return nil, err return nil, err
@ -953,7 +922,7 @@ func (f *Fs) Copy(ctx context.Context, src fs.Object, remote string) (fs.Object,
fs.Debugf(src, "Can't copy - not same remote type") fs.Debugf(src, "Can't copy - not same remote type")
return nil, fs.ErrorCantCopy return nil, fs.ErrorCantCopy
} }
dstBlobURL := f.getBlobReference(remote) dstBlobURL := f.getBlobReference(dstContainer, dstPath)
srcBlobURL := srcObj.getBlobReference() srcBlobURL := srcObj.getBlobReference()
source, err := url.Parse(srcBlobURL.String()) source, err := url.Parse(srcBlobURL.String())
@ -1086,7 +1055,8 @@ func (o *Object) decodeMetaDataFromBlob(info *azblob.BlobItem) (err error) {
// getBlobReference creates an empty blob reference with no metadata // getBlobReference creates an empty blob reference with no metadata
func (o *Object) getBlobReference() azblob.BlobURL { func (o *Object) getBlobReference() azblob.BlobURL {
return o.fs.getBlobReference(o.remote) container, directory := o.split()
return o.fs.getBlobReference(container, directory)
} }
// clearMetaData clears enough metadata so readMetaData will re-read it // clearMetaData clears enough metadata so readMetaData will re-read it
@ -1206,7 +1176,6 @@ func (o *Object) Open(ctx context.Context, options ...fs.OpenOption) (in io.Read
ac := azblob.BlobAccessConditions{} ac := azblob.BlobAccessConditions{}
var dowloadResponse *azblob.DownloadResponse var dowloadResponse *azblob.DownloadResponse
err = o.fs.pacer.Call(func() (bool, error) { err = o.fs.pacer.Call(func() (bool, error) {
log.Printf("offset=%d, count=%v", offset, count)
dowloadResponse, err = blob.Download(ctx, offset, count, ac, false) dowloadResponse, err = blob.Download(ctx, offset, count, ac, false)
return o.fs.shouldRetry(err) return o.fs.shouldRetry(err)
}) })