azureblob: make all operations work from the root #3421
This commit is contained in:
parent
d8e9b1a67c
commit
8a0775ce3c
1 changed files with 200 additions and 231 deletions
|
@ -13,11 +13,9 @@ import (
|
||||||
"encoding/hex"
|
"encoding/hex"
|
||||||
"fmt"
|
"fmt"
|
||||||
"io"
|
"io"
|
||||||
"log"
|
|
||||||
"net/http"
|
"net/http"
|
||||||
"net/url"
|
"net/url"
|
||||||
"path"
|
"path"
|
||||||
"regexp"
|
|
||||||
"strconv"
|
"strconv"
|
||||||
"strings"
|
"strings"
|
||||||
"sync"
|
"sync"
|
||||||
|
@ -34,6 +32,7 @@ import (
|
||||||
"github.com/rclone/rclone/fs/fshttp"
|
"github.com/rclone/rclone/fs/fshttp"
|
||||||
"github.com/rclone/rclone/fs/hash"
|
"github.com/rclone/rclone/fs/hash"
|
||||||
"github.com/rclone/rclone/fs/walk"
|
"github.com/rclone/rclone/fs/walk"
|
||||||
|
"github.com/rclone/rclone/lib/bucket"
|
||||||
"github.com/rclone/rclone/lib/pacer"
|
"github.com/rclone/rclone/lib/pacer"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -150,11 +149,12 @@ type Fs struct {
|
||||||
features *fs.Features // optional features
|
features *fs.Features // optional features
|
||||||
client *http.Client // http client we are using
|
client *http.Client // http client we are using
|
||||||
svcURL *azblob.ServiceURL // reference to serviceURL
|
svcURL *azblob.ServiceURL // reference to serviceURL
|
||||||
cntURL *azblob.ContainerURL // reference to containerURL
|
cntURLcacheMu sync.Mutex // mutex to protect cntURLcache
|
||||||
container string // the container we are working on
|
cntURLcache map[string]*azblob.ContainerURL // reference to containerURL per container
|
||||||
containerOKMu sync.Mutex // mutex to protect container OK
|
rootContainer string // container part of root (if any)
|
||||||
containerOK bool // true if we have created the container
|
rootDirectory string // directory part of root (if any)
|
||||||
containerDeleted bool // true if we have deleted the container
|
isLimited bool // if limited to one container
|
||||||
|
cache *bucket.Cache // cache for container creation status
|
||||||
pacer *fs.Pacer // To pace and retry the API calls
|
pacer *fs.Pacer // To pace and retry the API calls
|
||||||
uploadToken *pacer.TokenDispenser // control concurrency
|
uploadToken *pacer.TokenDispenser // control concurrency
|
||||||
}
|
}
|
||||||
|
@ -180,18 +180,18 @@ func (f *Fs) Name() string {
|
||||||
|
|
||||||
// Root of the remote (as passed into NewFs)
|
// Root of the remote (as passed into NewFs)
|
||||||
func (f *Fs) Root() string {
|
func (f *Fs) Root() string {
|
||||||
if f.root == "" {
|
return f.root
|
||||||
return f.container
|
|
||||||
}
|
|
||||||
return f.container + "/" + f.root
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// String converts this Fs to a string
|
// String converts this Fs to a string
|
||||||
func (f *Fs) String() string {
|
func (f *Fs) String() string {
|
||||||
if f.root == "" {
|
if f.rootContainer == "" {
|
||||||
return fmt.Sprintf("Azure container %s", f.container)
|
return fmt.Sprintf("Azure root")
|
||||||
}
|
}
|
||||||
return fmt.Sprintf("Azure container %s path %s", f.container, f.root)
|
if f.rootDirectory == "" {
|
||||||
|
return fmt.Sprintf("Azure container %s", f.rootContainer)
|
||||||
|
}
|
||||||
|
return fmt.Sprintf("Azure container %s path %s", f.rootContainer, f.rootDirectory)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Features returns the optional features of this Fs
|
// Features returns the optional features of this Fs
|
||||||
|
@ -199,21 +199,23 @@ func (f *Fs) Features() *fs.Features {
|
||||||
return f.features
|
return f.features
|
||||||
}
|
}
|
||||||
|
|
||||||
// Pattern to match a azure path
|
// parsePath parses a remote 'url'
|
||||||
var matcher = regexp.MustCompile(`^/*([^/]*)(.*)$`)
|
func parsePath(path string) (root string) {
|
||||||
|
root = strings.Trim(path, "/")
|
||||||
// parseParse parses a azure 'url'
|
|
||||||
func parsePath(path string) (container, directory string, err error) {
|
|
||||||
parts := matcher.FindStringSubmatch(path)
|
|
||||||
if parts == nil {
|
|
||||||
err = errors.Errorf("couldn't find container in azure path %q", path)
|
|
||||||
} else {
|
|
||||||
container, directory = parts[1], parts[2]
|
|
||||||
directory = strings.Trim(directory, "/")
|
|
||||||
}
|
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// split returns container and containerPath from the rootRelativePath
|
||||||
|
// relative to f.root
|
||||||
|
func (f *Fs) split(rootRelativePath string) (containerName, containerPath string) {
|
||||||
|
return bucket.Split(path.Join(f.root, rootRelativePath))
|
||||||
|
}
|
||||||
|
|
||||||
|
// split returns container and containerPath from the object
|
||||||
|
func (o *Object) split() (container, containerPath string) {
|
||||||
|
return o.fs.split(o.remote)
|
||||||
|
}
|
||||||
|
|
||||||
// validateAccessTier checks if azureblob supports user supplied tier
|
// validateAccessTier checks if azureblob supports user supplied tier
|
||||||
func validateAccessTier(tier string) bool {
|
func validateAccessTier(tier string) bool {
|
||||||
switch tier {
|
switch tier {
|
||||||
|
@ -318,6 +320,12 @@ func (f *Fs) newPipeline(c azblob.Credential, o azblob.PipelineOptions) pipeline
|
||||||
return pipeline.NewPipeline(factories, pipeline.Options{HTTPSender: httpClientFactory(f.client), Log: o.Log})
|
return pipeline.NewPipeline(factories, pipeline.Options{HTTPSender: httpClientFactory(f.client), Log: o.Log})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// setRoot changes the root of the Fs
|
||||||
|
func (f *Fs) setRoot(root string) {
|
||||||
|
f.root = parsePath(root)
|
||||||
|
f.rootContainer, f.rootDirectory = bucket.Split(f.root)
|
||||||
|
}
|
||||||
|
|
||||||
// NewFs constructs an Fs from the path, container:path
|
// NewFs constructs an Fs from the path, container:path
|
||||||
func NewFs(name, root string, m configmap.Mapper) (fs.Fs, error) {
|
func NewFs(name, root string, m configmap.Mapper) (fs.Fs, error) {
|
||||||
ctx := context.Background()
|
ctx := context.Background()
|
||||||
|
@ -339,10 +347,6 @@ func NewFs(name, root string, m configmap.Mapper) (fs.Fs, error) {
|
||||||
if opt.ListChunkSize > maxListChunkSize {
|
if opt.ListChunkSize > maxListChunkSize {
|
||||||
return nil, errors.Errorf("azure: blob list size can't be greater than %v - was %v", maxListChunkSize, opt.ListChunkSize)
|
return nil, errors.Errorf("azure: blob list size can't be greater than %v - was %v", maxListChunkSize, opt.ListChunkSize)
|
||||||
}
|
}
|
||||||
container, directory, err := parsePath(root)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
if opt.Endpoint == "" {
|
if opt.Endpoint == "" {
|
||||||
opt.Endpoint = storageDefaultBaseURL
|
opt.Endpoint = storageDefaultBaseURL
|
||||||
}
|
}
|
||||||
|
@ -357,16 +361,18 @@ func NewFs(name, root string, m configmap.Mapper) (fs.Fs, error) {
|
||||||
f := &Fs{
|
f := &Fs{
|
||||||
name: name,
|
name: name,
|
||||||
opt: *opt,
|
opt: *opt,
|
||||||
container: container,
|
|
||||||
root: directory,
|
|
||||||
pacer: fs.NewPacer(pacer.NewS3(pacer.MinSleep(minSleep), pacer.MaxSleep(maxSleep), pacer.DecayConstant(decayConstant))),
|
pacer: fs.NewPacer(pacer.NewS3(pacer.MinSleep(minSleep), pacer.MaxSleep(maxSleep), pacer.DecayConstant(decayConstant))),
|
||||||
uploadToken: pacer.NewTokenDispenser(fs.Config.Transfers),
|
uploadToken: pacer.NewTokenDispenser(fs.Config.Transfers),
|
||||||
client: fshttp.NewClient(fs.Config),
|
client: fshttp.NewClient(fs.Config),
|
||||||
|
cache: bucket.NewCache(),
|
||||||
|
cntURLcache: make(map[string]*azblob.ContainerURL, 1),
|
||||||
}
|
}
|
||||||
|
f.setRoot(root)
|
||||||
f.features = (&fs.Features{
|
f.features = (&fs.Features{
|
||||||
ReadMimeType: true,
|
ReadMimeType: true,
|
||||||
WriteMimeType: true,
|
WriteMimeType: true,
|
||||||
BucketBased: true,
|
BucketBased: true,
|
||||||
|
BucketBasedRootOK: true,
|
||||||
SetTier: true,
|
SetTier: true,
|
||||||
GetTier: true,
|
GetTier: true,
|
||||||
}).Fill(f)
|
}).Fill(f)
|
||||||
|
@ -374,7 +380,6 @@ func NewFs(name, root string, m configmap.Mapper) (fs.Fs, error) {
|
||||||
var (
|
var (
|
||||||
u *url.URL
|
u *url.URL
|
||||||
serviceURL azblob.ServiceURL
|
serviceURL azblob.ServiceURL
|
||||||
containerURL azblob.ContainerURL
|
|
||||||
)
|
)
|
||||||
switch {
|
switch {
|
||||||
case opt.UseEmulator:
|
case opt.UseEmulator:
|
||||||
|
@ -388,7 +393,6 @@ func NewFs(name, root string, m configmap.Mapper) (fs.Fs, error) {
|
||||||
}
|
}
|
||||||
pipeline := f.newPipeline(credential, azblob.PipelineOptions{Retry: azblob.RetryOptions{TryTimeout: maxTryTimeout}})
|
pipeline := f.newPipeline(credential, azblob.PipelineOptions{Retry: azblob.RetryOptions{TryTimeout: maxTryTimeout}})
|
||||||
serviceURL = azblob.NewServiceURL(*u, pipeline)
|
serviceURL = azblob.NewServiceURL(*u, pipeline)
|
||||||
containerURL = serviceURL.NewContainerURL(container)
|
|
||||||
case opt.Account != "" && opt.Key != "":
|
case opt.Account != "" && opt.Key != "":
|
||||||
credential, err := azblob.NewSharedKeyCredential(opt.Account, opt.Key)
|
credential, err := azblob.NewSharedKeyCredential(opt.Account, opt.Key)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -401,7 +405,6 @@ func NewFs(name, root string, m configmap.Mapper) (fs.Fs, error) {
|
||||||
}
|
}
|
||||||
pipeline := f.newPipeline(credential, azblob.PipelineOptions{Retry: azblob.RetryOptions{TryTimeout: maxTryTimeout}})
|
pipeline := f.newPipeline(credential, azblob.PipelineOptions{Retry: azblob.RetryOptions{TryTimeout: maxTryTimeout}})
|
||||||
serviceURL = azblob.NewServiceURL(*u, pipeline)
|
serviceURL = azblob.NewServiceURL(*u, pipeline)
|
||||||
containerURL = serviceURL.NewContainerURL(container)
|
|
||||||
case opt.SASURL != "":
|
case opt.SASURL != "":
|
||||||
u, err = url.Parse(opt.SASURL)
|
u, err = url.Parse(opt.SASURL)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -412,38 +415,30 @@ func NewFs(name, root string, m configmap.Mapper) (fs.Fs, error) {
|
||||||
// Check if we have container level SAS or account level sas
|
// Check if we have container level SAS or account level sas
|
||||||
parts := azblob.NewBlobURLParts(*u)
|
parts := azblob.NewBlobURLParts(*u)
|
||||||
if parts.ContainerName != "" {
|
if parts.ContainerName != "" {
|
||||||
if container != "" && parts.ContainerName != container {
|
if f.rootContainer != "" && parts.ContainerName != f.rootContainer {
|
||||||
return nil, errors.New("Container name in SAS URL and container provided in command do not match")
|
return nil, errors.New("Container name in SAS URL and container provided in command do not match")
|
||||||
}
|
}
|
||||||
|
containerURL := azblob.NewContainerURL(*u, pipeline)
|
||||||
f.container = parts.ContainerName
|
f.cntURLcache[parts.ContainerName] = &containerURL
|
||||||
containerURL = azblob.NewContainerURL(*u, pipeline)
|
f.isLimited = true
|
||||||
} else {
|
} else {
|
||||||
serviceURL = azblob.NewServiceURL(*u, pipeline)
|
serviceURL = azblob.NewServiceURL(*u, pipeline)
|
||||||
containerURL = serviceURL.NewContainerURL(container)
|
|
||||||
}
|
}
|
||||||
default:
|
default:
|
||||||
return nil, errors.New("Need account+key or connectionString or sasURL")
|
return nil, errors.New("Need account+key or connectionString or sasURL")
|
||||||
}
|
}
|
||||||
f.svcURL = &serviceURL
|
f.svcURL = &serviceURL
|
||||||
f.cntURL = &containerURL
|
|
||||||
|
|
||||||
if f.root != "" {
|
if f.rootContainer != "" && f.rootDirectory != "" {
|
||||||
f.root += "/"
|
|
||||||
// Check to see if the (container,directory) is actually an existing file
|
// Check to see if the (container,directory) is actually an existing file
|
||||||
oldRoot := f.root
|
oldRoot := f.root
|
||||||
remote := path.Base(directory)
|
newRoot, leaf := path.Split(oldRoot)
|
||||||
f.root = path.Dir(directory)
|
f.setRoot(newRoot)
|
||||||
if f.root == "." {
|
_, err := f.NewObject(ctx, leaf)
|
||||||
f.root = ""
|
|
||||||
} else {
|
|
||||||
f.root += "/"
|
|
||||||
}
|
|
||||||
_, err := f.NewObject(ctx, remote)
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
if err == fs.ErrorObjectNotFound || err == fs.ErrorNotAFile {
|
if err == fs.ErrorObjectNotFound || err == fs.ErrorNotAFile {
|
||||||
// File doesn't exist or is a directory so return old f
|
// File doesn't exist or is a directory so return old f
|
||||||
f.root = oldRoot
|
f.setRoot(oldRoot)
|
||||||
return f, nil
|
return f, nil
|
||||||
}
|
}
|
||||||
return nil, err
|
return nil, err
|
||||||
|
@ -454,6 +449,20 @@ func NewFs(name, root string, m configmap.Mapper) (fs.Fs, error) {
|
||||||
return f, nil
|
return f, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// return the container URL for the container passed in
|
||||||
|
func (f *Fs) cntURL(container string) (containerURL *azblob.ContainerURL) {
|
||||||
|
f.cntURLcacheMu.Lock()
|
||||||
|
defer f.cntURLcacheMu.Unlock()
|
||||||
|
var ok bool
|
||||||
|
if containerURL, ok = f.cntURLcache[container]; !ok {
|
||||||
|
cntURL := f.svcURL.NewContainerURL(container)
|
||||||
|
containerURL = &cntURL
|
||||||
|
f.cntURLcache[container] = containerURL
|
||||||
|
}
|
||||||
|
return containerURL
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
// Return an Object from a path
|
// Return an Object from a path
|
||||||
//
|
//
|
||||||
// If it can't be found it returns the error fs.ErrorObjectNotFound.
|
// If it can't be found it returns the error fs.ErrorObjectNotFound.
|
||||||
|
@ -483,8 +492,8 @@ func (f *Fs) NewObject(ctx context.Context, remote string) (fs.Object, error) {
|
||||||
}
|
}
|
||||||
|
|
||||||
// getBlobReference creates an empty blob reference with no metadata
|
// getBlobReference creates an empty blob reference with no metadata
|
||||||
func (f *Fs) getBlobReference(remote string) azblob.BlobURL {
|
func (f *Fs) getBlobReference(container, containerPath string) azblob.BlobURL {
|
||||||
return f.cntURL.NewBlobURL(f.root + remote)
|
return f.cntURL(container).NewBlobURL(containerPath)
|
||||||
}
|
}
|
||||||
|
|
||||||
// updateMetadataWithModTime adds the modTime passed in to o.meta.
|
// updateMetadataWithModTime adds the modTime passed in to o.meta.
|
||||||
|
@ -520,16 +529,18 @@ type listFn func(remote string, object *azblob.BlobItem, isDirectory bool) error
|
||||||
// the container and root supplied
|
// the container and root supplied
|
||||||
//
|
//
|
||||||
// dir is the starting directory, "" for root
|
// dir is the starting directory, "" for root
|
||||||
func (f *Fs) list(ctx context.Context, dir string, recurse bool, maxResults uint, fn listFn) error {
|
//
|
||||||
f.containerOKMu.Lock()
|
// The remote has prefix removed from it and if addContainer is set then
|
||||||
deleted := f.containerDeleted
|
// it adds the container to the start.
|
||||||
f.containerOKMu.Unlock()
|
func (f *Fs) list(ctx context.Context, container, directory, prefix string, addContainer bool, recurse bool, maxResults uint, fn listFn) error {
|
||||||
if deleted {
|
if f.cache.IsDeleted(container) {
|
||||||
return fs.ErrorDirNotFound
|
return fs.ErrorDirNotFound
|
||||||
}
|
}
|
||||||
root := f.root
|
if prefix != "" {
|
||||||
if dir != "" {
|
prefix += "/"
|
||||||
root += dir + "/"
|
}
|
||||||
|
if directory != "" {
|
||||||
|
directory += "/"
|
||||||
}
|
}
|
||||||
delimiter := ""
|
delimiter := ""
|
||||||
if !recurse {
|
if !recurse {
|
||||||
|
@ -544,15 +555,14 @@ func (f *Fs) list(ctx context.Context, dir string, recurse bool, maxResults uint
|
||||||
UncommittedBlobs: false,
|
UncommittedBlobs: false,
|
||||||
Deleted: false,
|
Deleted: false,
|
||||||
},
|
},
|
||||||
Prefix: root,
|
Prefix: directory,
|
||||||
MaxResults: int32(maxResults),
|
MaxResults: int32(maxResults),
|
||||||
}
|
}
|
||||||
directoryMarkers := map[string]struct{}{}
|
|
||||||
for marker := (azblob.Marker{}); marker.NotDone(); {
|
for marker := (azblob.Marker{}); marker.NotDone(); {
|
||||||
var response *azblob.ListBlobsHierarchySegmentResponse
|
var response *azblob.ListBlobsHierarchySegmentResponse
|
||||||
err := f.pacer.Call(func() (bool, error) {
|
err := f.pacer.Call(func() (bool, error) {
|
||||||
var err error
|
var err error
|
||||||
response, err = f.cntURL.ListBlobsHierarchySegment(ctx, marker, delimiter, options)
|
response, err = f.cntURL(container).ListBlobsHierarchySegment(ctx, marker, delimiter, options)
|
||||||
return f.shouldRetry(err)
|
return f.shouldRetry(err)
|
||||||
})
|
})
|
||||||
|
|
||||||
|
@ -572,26 +582,17 @@ func (f *Fs) list(ctx context.Context, dir string, recurse bool, maxResults uint
|
||||||
// if prefix != "" && !strings.HasPrefix(file.Name, prefix) {
|
// if prefix != "" && !strings.HasPrefix(file.Name, prefix) {
|
||||||
// return nil
|
// return nil
|
||||||
// }
|
// }
|
||||||
if !strings.HasPrefix(file.Name, f.root) {
|
if !strings.HasPrefix(file.Name, prefix) {
|
||||||
fs.Debugf(f, "Odd name received %q", file.Name)
|
fs.Debugf(f, "Odd name received %q", file.Name)
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
remote := file.Name[len(f.root):]
|
remote := file.Name[len(prefix):]
|
||||||
if isDirectoryMarker(*file.Properties.ContentLength, file.Metadata, remote) {
|
if isDirectoryMarker(*file.Properties.ContentLength, file.Metadata, remote) {
|
||||||
if strings.HasSuffix(remote, "/") {
|
|
||||||
remote = remote[:len(remote)-1]
|
|
||||||
}
|
|
||||||
err = fn(remote, file, true)
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
// Keep track of directory markers. If recursing then
|
|
||||||
// there will be no Prefixes so no need to keep track
|
|
||||||
if !recurse {
|
|
||||||
directoryMarkers[remote] = struct{}{}
|
|
||||||
}
|
|
||||||
continue // skip directory marker
|
continue // skip directory marker
|
||||||
}
|
}
|
||||||
|
if addContainer {
|
||||||
|
remote = path.Join(container, remote)
|
||||||
|
}
|
||||||
// Send object
|
// Send object
|
||||||
err = fn(remote, file, false)
|
err = fn(remote, file, false)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -601,14 +602,13 @@ func (f *Fs) list(ctx context.Context, dir string, recurse bool, maxResults uint
|
||||||
// Send the subdirectories
|
// Send the subdirectories
|
||||||
for _, remote := range response.Segment.BlobPrefixes {
|
for _, remote := range response.Segment.BlobPrefixes {
|
||||||
remote := strings.TrimRight(remote.Name, "/")
|
remote := strings.TrimRight(remote.Name, "/")
|
||||||
if !strings.HasPrefix(remote, f.root) {
|
if !strings.HasPrefix(remote, prefix) {
|
||||||
fs.Debugf(f, "Odd directory name received %q", remote)
|
fs.Debugf(f, "Odd directory name received %q", remote)
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
remote = remote[len(f.root):]
|
remote = remote[len(prefix):]
|
||||||
// Don't send if already sent as a directory marker
|
if addContainer {
|
||||||
if _, found := directoryMarkers[remote]; found {
|
remote = path.Join(container, remote)
|
||||||
continue
|
|
||||||
}
|
}
|
||||||
// Send object
|
// Send object
|
||||||
err = fn(remote, nil, true)
|
err = fn(remote, nil, true)
|
||||||
|
@ -633,19 +633,9 @@ func (f *Fs) itemToDirEntry(remote string, object *azblob.BlobItem, isDirectory
|
||||||
return o, nil
|
return o, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// mark the container as being OK
|
|
||||||
func (f *Fs) markContainerOK() {
|
|
||||||
if f.container != "" {
|
|
||||||
f.containerOKMu.Lock()
|
|
||||||
f.containerOK = true
|
|
||||||
f.containerDeleted = false
|
|
||||||
f.containerOKMu.Unlock()
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// listDir lists a single directory
|
// listDir lists a single directory
|
||||||
func (f *Fs) listDir(ctx context.Context, dir string) (entries fs.DirEntries, err error) {
|
func (f *Fs) listDir(ctx context.Context, container, directory, prefix string, addContainer bool) (entries fs.DirEntries, err error) {
|
||||||
err = f.list(ctx, dir, false, f.opt.ListChunkSize, func(remote string, object *azblob.BlobItem, isDirectory bool) error {
|
err = f.list(ctx, container, directory, prefix, addContainer, false, f.opt.ListChunkSize, func(remote string, object *azblob.BlobItem, isDirectory bool) error {
|
||||||
entry, err := f.itemToDirEntry(remote, object, isDirectory)
|
entry, err := f.itemToDirEntry(remote, object, isDirectory)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
|
@ -659,7 +649,7 @@ func (f *Fs) listDir(ctx context.Context, dir string) (entries fs.DirEntries, er
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
// container must be present if listing succeeded
|
// container must be present if listing succeeded
|
||||||
f.markContainerOK()
|
f.cache.MarkOK(container)
|
||||||
return entries, nil
|
return entries, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -668,8 +658,18 @@ func (f *Fs) listContainers(dir string) (entries fs.DirEntries, err error) {
|
||||||
if dir != "" {
|
if dir != "" {
|
||||||
return nil, fs.ErrorListBucketRequired
|
return nil, fs.ErrorListBucketRequired
|
||||||
}
|
}
|
||||||
|
if f.isLimited {
|
||||||
|
f.cntURLcacheMu.Lock()
|
||||||
|
for container := range f.cntURLcache {
|
||||||
|
d := fs.NewDir(container, time.Time{})
|
||||||
|
entries = append(entries, d)
|
||||||
|
}
|
||||||
|
f.cntURLcacheMu.Unlock()
|
||||||
|
return entries, nil
|
||||||
|
}
|
||||||
err = f.listContainersToFn(func(container *azblob.ContainerItem) error {
|
err = f.listContainersToFn(func(container *azblob.ContainerItem) error {
|
||||||
d := fs.NewDir(container.Name, container.Properties.LastModified)
|
d := fs.NewDir(container.Name, container.Properties.LastModified)
|
||||||
|
f.cache.MarkOK(container.Name)
|
||||||
entries = append(entries, d)
|
entries = append(entries, d)
|
||||||
return nil
|
return nil
|
||||||
})
|
})
|
||||||
|
@ -689,10 +689,11 @@ func (f *Fs) listContainers(dir string) (entries fs.DirEntries, err error) {
|
||||||
// This should return ErrDirNotFound if the directory isn't
|
// This should return ErrDirNotFound if the directory isn't
|
||||||
// found.
|
// found.
|
||||||
func (f *Fs) List(ctx context.Context, dir string) (entries fs.DirEntries, err error) {
|
func (f *Fs) List(ctx context.Context, dir string) (entries fs.DirEntries, err error) {
|
||||||
if f.container == "" {
|
container, directory := f.split(dir)
|
||||||
return f.listContainers(dir)
|
if container == "" {
|
||||||
|
return f.listContainers(directory)
|
||||||
}
|
}
|
||||||
return f.listDir(ctx, dir)
|
return f.listDir(ctx, container, directory, f.rootDirectory, f.rootContainer == "")
|
||||||
}
|
}
|
||||||
|
|
||||||
// ListR lists the objects and directories of the Fs starting
|
// ListR lists the objects and directories of the Fs starting
|
||||||
|
@ -712,22 +713,41 @@ func (f *Fs) List(ctx context.Context, dir string) (entries fs.DirEntries, err e
|
||||||
// Don't implement this unless you have a more efficient way
|
// Don't implement this unless you have a more efficient way
|
||||||
// of listing recursively that doing a directory traversal.
|
// of listing recursively that doing a directory traversal.
|
||||||
func (f *Fs) ListR(ctx context.Context, dir string, callback fs.ListRCallback) (err error) {
|
func (f *Fs) ListR(ctx context.Context, dir string, callback fs.ListRCallback) (err error) {
|
||||||
if f.container == "" {
|
container, directory := f.split(dir)
|
||||||
return fs.ErrorListBucketRequired
|
|
||||||
}
|
|
||||||
list := walk.NewListRHelper(callback)
|
list := walk.NewListRHelper(callback)
|
||||||
err = f.list(ctx, dir, true, f.opt.ListChunkSize, func(remote string, object *azblob.BlobItem, isDirectory bool) error {
|
listR := func(container, directory, prefix string, addContainer bool) error {
|
||||||
|
return f.list(ctx, container, directory, prefix, addContainer, true, f.opt.ListChunkSize, func(remote string, object *azblob.BlobItem, isDirectory bool) error {
|
||||||
entry, err := f.itemToDirEntry(remote, object, isDirectory)
|
entry, err := f.itemToDirEntry(remote, object, isDirectory)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
return list.Add(entry)
|
return list.Add(entry)
|
||||||
})
|
})
|
||||||
|
}
|
||||||
|
if container == "" {
|
||||||
|
entries, err := f.listContainers("")
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
for _, entry := range entries {
|
||||||
|
err = list.Add(entry)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
container := entry.Remote()
|
||||||
|
err = listR(container, "", f.rootDirectory, true)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
err = listR(container, directory, f.rootDirectory, f.rootContainer == "")
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
}
|
||||||
// container must be present if listing succeeded
|
// container must be present if listing succeeded
|
||||||
f.markContainerOK()
|
f.cache.MarkOK(container)
|
||||||
return list.Flush()
|
return list.Flush()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -777,61 +797,17 @@ func (f *Fs) Put(ctx context.Context, in io.Reader, src fs.ObjectInfo, options .
|
||||||
return fs, fs.Update(ctx, in, src, options...)
|
return fs, fs.Update(ctx, in, src, options...)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Check if the container exists
|
|
||||||
//
|
|
||||||
// NB this can return incorrect results if called immediately after container deletion
|
|
||||||
func (f *Fs) dirExists() (bool, error) {
|
|
||||||
options := azblob.ListBlobsSegmentOptions{
|
|
||||||
Details: azblob.BlobListingDetails{
|
|
||||||
Copy: false,
|
|
||||||
Metadata: false,
|
|
||||||
Snapshots: false,
|
|
||||||
UncommittedBlobs: false,
|
|
||||||
Deleted: false,
|
|
||||||
},
|
|
||||||
MaxResults: 1,
|
|
||||||
}
|
|
||||||
err := f.pacer.Call(func() (bool, error) {
|
|
||||||
ctx := context.Background()
|
|
||||||
_, err := f.cntURL.ListBlobsHierarchySegment(ctx, azblob.Marker{}, "", options)
|
|
||||||
return f.shouldRetry(err)
|
|
||||||
})
|
|
||||||
if err == nil {
|
|
||||||
return true, nil
|
|
||||||
}
|
|
||||||
// Check http error code along with service code, current SDK doesn't populate service code correctly sometimes
|
|
||||||
if storageErr, ok := err.(azblob.StorageError); ok && (storageErr.ServiceCode() == azblob.ServiceCodeContainerNotFound || storageErr.Response().StatusCode == http.StatusNotFound) {
|
|
||||||
return false, nil
|
|
||||||
}
|
|
||||||
return false, err
|
|
||||||
}
|
|
||||||
|
|
||||||
// Mkdir creates the container if it doesn't exist
|
// Mkdir creates the container if it doesn't exist
|
||||||
func (f *Fs) Mkdir(ctx context.Context, dir string) error {
|
func (f *Fs) Mkdir(ctx context.Context, dir string) error {
|
||||||
f.containerOKMu.Lock()
|
container, _ := f.split(dir)
|
||||||
defer f.containerOKMu.Unlock()
|
return f.cache.Create(container, func() error {
|
||||||
if f.containerOK {
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
if !f.containerDeleted {
|
|
||||||
exists, err := f.dirExists()
|
|
||||||
if err == nil {
|
|
||||||
f.containerOK = exists
|
|
||||||
}
|
|
||||||
if err != nil || exists {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// now try to create the container
|
// now try to create the container
|
||||||
err := f.pacer.Call(func() (bool, error) {
|
return f.pacer.Call(func() (bool, error) {
|
||||||
ctx := context.Background()
|
_, err := f.cntURL(container).Create(ctx, azblob.Metadata{}, azblob.PublicAccessNone)
|
||||||
_, err := f.cntURL.Create(ctx, azblob.Metadata{}, azblob.PublicAccessNone)
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
if storageErr, ok := err.(azblob.StorageError); ok {
|
if storageErr, ok := err.(azblob.StorageError); ok {
|
||||||
switch storageErr.ServiceCode() {
|
switch storageErr.ServiceCode() {
|
||||||
case azblob.ServiceCodeContainerAlreadyExists:
|
case azblob.ServiceCodeContainerAlreadyExists:
|
||||||
f.containerOK = true
|
|
||||||
return false, nil
|
return false, nil
|
||||||
case azblob.ServiceCodeContainerBeingDeleted:
|
case azblob.ServiceCodeContainerBeingDeleted:
|
||||||
// From https://docs.microsoft.com/en-us/rest/api/storageservices/delete-container
|
// From https://docs.microsoft.com/en-us/rest/api/storageservices/delete-container
|
||||||
|
@ -839,24 +815,20 @@ func (f *Fs) Mkdir(ctx context.Context, dir string) error {
|
||||||
// for at least 30 seconds; the container may not be available for more than 30
|
// for at least 30 seconds; the container may not be available for more than 30
|
||||||
// seconds if the service is still processing the request.
|
// seconds if the service is still processing the request.
|
||||||
time.Sleep(6 * time.Second) // default 10 retries will be 60 seconds
|
time.Sleep(6 * time.Second) // default 10 retries will be 60 seconds
|
||||||
f.containerDeleted = true
|
f.cache.MarkDeleted(container)
|
||||||
return true, err
|
return true, err
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return f.shouldRetry(err)
|
return f.shouldRetry(err)
|
||||||
})
|
})
|
||||||
if err == nil {
|
}, nil)
|
||||||
f.containerOK = true
|
|
||||||
f.containerDeleted = false
|
|
||||||
}
|
|
||||||
return errors.Wrap(err, "failed to make container")
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// isEmpty checks to see if a given directory is empty and returns an error if not
|
// isEmpty checks to see if a given (container, directory) is empty and returns an error if not
|
||||||
func (f *Fs) isEmpty(ctx context.Context, dir string) (err error) {
|
func (f *Fs) isEmpty(ctx context.Context, container, directory string) (err error) {
|
||||||
empty := true
|
empty := true
|
||||||
err = f.list(ctx, dir, true, 1, func(remote string, object *azblob.BlobItem, isDirectory bool) error {
|
err = f.list(ctx, container, directory, f.rootDirectory, f.rootContainer == "", true, 1, func(remote string, object *azblob.BlobItem, isDirectory bool) error {
|
||||||
empty = false
|
empty = false
|
||||||
return nil
|
return nil
|
||||||
})
|
})
|
||||||
|
@ -871,15 +843,13 @@ func (f *Fs) isEmpty(ctx context.Context, dir string) (err error) {
|
||||||
|
|
||||||
// deleteContainer deletes the container. It can delete a full
|
// deleteContainer deletes the container. It can delete a full
|
||||||
// container so use isEmpty if you don't want that.
|
// container so use isEmpty if you don't want that.
|
||||||
func (f *Fs) deleteContainer() error {
|
func (f *Fs) deleteContainer(ctx context.Context, container string) error {
|
||||||
f.containerOKMu.Lock()
|
return f.cache.Remove(container, func() error {
|
||||||
defer f.containerOKMu.Unlock()
|
|
||||||
options := azblob.ContainerAccessConditions{}
|
options := azblob.ContainerAccessConditions{}
|
||||||
ctx := context.Background()
|
return f.pacer.Call(func() (bool, error) {
|
||||||
err := f.pacer.Call(func() (bool, error) {
|
_, err := f.cntURL(container).GetProperties(ctx, azblob.LeaseAccessConditions{})
|
||||||
_, err := f.cntURL.GetProperties(ctx, azblob.LeaseAccessConditions{})
|
|
||||||
if err == nil {
|
if err == nil {
|
||||||
_, err = f.cntURL.Delete(ctx, options)
|
_, err = f.cntURL(container).Delete(ctx, options)
|
||||||
}
|
}
|
||||||
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -893,25 +863,22 @@ func (f *Fs) deleteContainer() error {
|
||||||
|
|
||||||
return f.shouldRetry(err)
|
return f.shouldRetry(err)
|
||||||
})
|
})
|
||||||
if err == nil {
|
})
|
||||||
f.containerOK = false
|
|
||||||
f.containerDeleted = true
|
|
||||||
}
|
|
||||||
return errors.Wrap(err, "failed to delete container")
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Rmdir deletes the container if the fs is at the root
|
// Rmdir deletes the container if the fs is at the root
|
||||||
//
|
//
|
||||||
// Returns an error if it isn't empty
|
// Returns an error if it isn't empty
|
||||||
func (f *Fs) Rmdir(ctx context.Context, dir string) error {
|
func (f *Fs) Rmdir(ctx context.Context, dir string) error {
|
||||||
err := f.isEmpty(ctx, dir)
|
container, directory := f.split(dir)
|
||||||
|
if container == "" || directory != "" {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
err := f.isEmpty(ctx, container, directory)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
if f.root != "" || dir != "" {
|
return f.deleteContainer(ctx, container)
|
||||||
return nil
|
|
||||||
}
|
|
||||||
return f.deleteContainer()
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Precision of the remote
|
// Precision of the remote
|
||||||
|
@ -927,11 +894,12 @@ func (f *Fs) Hashes() hash.Set {
|
||||||
// Purge deletes all the files and directories including the old versions.
|
// Purge deletes all the files and directories including the old versions.
|
||||||
func (f *Fs) Purge(ctx context.Context) error {
|
func (f *Fs) Purge(ctx context.Context) error {
|
||||||
dir := "" // forward compat!
|
dir := "" // forward compat!
|
||||||
if f.root != "" || dir != "" {
|
container, directory := f.split(dir)
|
||||||
// Delegate to caller if not root container
|
if container == "" || directory != "" {
|
||||||
|
// Delegate to caller if not root of a container
|
||||||
return fs.ErrorCantPurge
|
return fs.ErrorCantPurge
|
||||||
}
|
}
|
||||||
return f.deleteContainer()
|
return f.deleteContainer(ctx, container)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Copy src to this remote using server side copy operations.
|
// Copy src to this remote using server side copy operations.
|
||||||
|
@ -944,6 +912,7 @@ func (f *Fs) Purge(ctx context.Context) error {
|
||||||
//
|
//
|
||||||
// If it isn't possible then return fs.ErrorCantCopy
|
// If it isn't possible then return fs.ErrorCantCopy
|
||||||
func (f *Fs) Copy(ctx context.Context, src fs.Object, remote string) (fs.Object, error) {
|
func (f *Fs) Copy(ctx context.Context, src fs.Object, remote string) (fs.Object, error) {
|
||||||
|
dstContainer, dstPath := f.split(remote)
|
||||||
err := f.Mkdir(ctx, "")
|
err := f.Mkdir(ctx, "")
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
|
@ -953,7 +922,7 @@ func (f *Fs) Copy(ctx context.Context, src fs.Object, remote string) (fs.Object,
|
||||||
fs.Debugf(src, "Can't copy - not same remote type")
|
fs.Debugf(src, "Can't copy - not same remote type")
|
||||||
return nil, fs.ErrorCantCopy
|
return nil, fs.ErrorCantCopy
|
||||||
}
|
}
|
||||||
dstBlobURL := f.getBlobReference(remote)
|
dstBlobURL := f.getBlobReference(dstContainer, dstPath)
|
||||||
srcBlobURL := srcObj.getBlobReference()
|
srcBlobURL := srcObj.getBlobReference()
|
||||||
|
|
||||||
source, err := url.Parse(srcBlobURL.String())
|
source, err := url.Parse(srcBlobURL.String())
|
||||||
|
@ -1086,7 +1055,8 @@ func (o *Object) decodeMetaDataFromBlob(info *azblob.BlobItem) (err error) {
|
||||||
|
|
||||||
// getBlobReference creates an empty blob reference with no metadata
|
// getBlobReference creates an empty blob reference with no metadata
|
||||||
func (o *Object) getBlobReference() azblob.BlobURL {
|
func (o *Object) getBlobReference() azblob.BlobURL {
|
||||||
return o.fs.getBlobReference(o.remote)
|
container, directory := o.split()
|
||||||
|
return o.fs.getBlobReference(container, directory)
|
||||||
}
|
}
|
||||||
|
|
||||||
// clearMetaData clears enough metadata so readMetaData will re-read it
|
// clearMetaData clears enough metadata so readMetaData will re-read it
|
||||||
|
@ -1206,7 +1176,6 @@ func (o *Object) Open(ctx context.Context, options ...fs.OpenOption) (in io.Read
|
||||||
ac := azblob.BlobAccessConditions{}
|
ac := azblob.BlobAccessConditions{}
|
||||||
var dowloadResponse *azblob.DownloadResponse
|
var dowloadResponse *azblob.DownloadResponse
|
||||||
err = o.fs.pacer.Call(func() (bool, error) {
|
err = o.fs.pacer.Call(func() (bool, error) {
|
||||||
log.Printf("offset=%d, count=%v", offset, count)
|
|
||||||
dowloadResponse, err = blob.Download(ctx, offset, count, ac, false)
|
dowloadResponse, err = blob.Download(ctx, offset, count, ac, false)
|
||||||
return o.fs.shouldRetry(err)
|
return o.fs.shouldRetry(err)
|
||||||
})
|
})
|
||||||
|
|
Loading…
Reference in a new issue