forked from TrueCloudLab/rclone
qingstor: make all operations work from the root #3421
This commit is contained in:
parent
8a0775ce3c
commit
b619430bcf
1 changed files with 214 additions and 238 deletions
|
@ -14,7 +14,6 @@ import (
|
||||||
"regexp"
|
"regexp"
|
||||||
"strconv"
|
"strconv"
|
||||||
"strings"
|
"strings"
|
||||||
"sync"
|
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/pkg/errors"
|
"github.com/pkg/errors"
|
||||||
|
@ -24,6 +23,7 @@ import (
|
||||||
"github.com/rclone/rclone/fs/fshttp"
|
"github.com/rclone/rclone/fs/fshttp"
|
||||||
"github.com/rclone/rclone/fs/hash"
|
"github.com/rclone/rclone/fs/hash"
|
||||||
"github.com/rclone/rclone/fs/walk"
|
"github.com/rclone/rclone/fs/walk"
|
||||||
|
"github.com/rclone/rclone/lib/bucket"
|
||||||
qsConfig "github.com/yunify/qingstor-sdk-go/v3/config"
|
qsConfig "github.com/yunify/qingstor-sdk-go/v3/config"
|
||||||
qsErr "github.com/yunify/qingstor-sdk-go/v3/request/errors"
|
qsErr "github.com/yunify/qingstor-sdk-go/v3/request/errors"
|
||||||
qs "github.com/yunify/qingstor-sdk-go/v3/service"
|
qs "github.com/yunify/qingstor-sdk-go/v3/service"
|
||||||
|
@ -146,16 +146,15 @@ type Options struct {
|
||||||
|
|
||||||
// Fs represents a remote qingstor server
|
// Fs represents a remote qingstor server
|
||||||
type Fs struct {
|
type Fs struct {
|
||||||
name string // The name of the remote
|
name string // The name of the remote
|
||||||
root string // The root is a subdir, is a special object
|
root string // The root is a subdir, is a special object
|
||||||
opt Options // parsed options
|
opt Options // parsed options
|
||||||
features *fs.Features // optional features
|
features *fs.Features // optional features
|
||||||
svc *qs.Service // The connection to the qingstor server
|
svc *qs.Service // The connection to the qingstor server
|
||||||
zone string // The zone we are working on
|
zone string // The zone we are working on
|
||||||
bucket string // The bucket we are working on
|
rootBucket string // bucket part of root (if any)
|
||||||
bucketOKMu sync.Mutex // mutex to protect bucketOK and bucketDeleted
|
rootDirectory string // directory part of root (if any)
|
||||||
bucketOK bool // true if we have created the bucket
|
cache *bucket.Cache // cache for bucket creation status
|
||||||
bucketDeleted bool // true if we have deleted the bucket
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Object describes a qingstor object
|
// Object describes a qingstor object
|
||||||
|
@ -176,22 +175,23 @@ type Object struct {
|
||||||
|
|
||||||
// ------------------------------------------------------------
|
// ------------------------------------------------------------
|
||||||
|
|
||||||
// Pattern to match a qingstor path
|
// parsePath parses a remote 'url'
|
||||||
var matcher = regexp.MustCompile(`^/*([^/]*)(.*)$`)
|
func parsePath(path string) (root string) {
|
||||||
|
root = strings.Trim(path, "/")
|
||||||
// parseParse parses a qingstor 'url'
|
|
||||||
func qsParsePath(path string) (bucket, key string, err error) {
|
|
||||||
// Pattern to match a qingstor path
|
|
||||||
parts := matcher.FindStringSubmatch(path)
|
|
||||||
if parts == nil {
|
|
||||||
err = errors.Errorf("Couldn't parse bucket out of qingstor path %q", path)
|
|
||||||
} else {
|
|
||||||
bucket, key = parts[1], parts[2]
|
|
||||||
key = strings.Trim(key, "/")
|
|
||||||
}
|
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// split returns bucket and bucketPath from the rootRelativePath
|
||||||
|
// relative to f.root
|
||||||
|
func (f *Fs) split(rootRelativePath string) (bucketName, bucketPath string) {
|
||||||
|
return bucket.Split(path.Join(f.root, rootRelativePath))
|
||||||
|
}
|
||||||
|
|
||||||
|
// split returns bucket and bucketPath from the object
|
||||||
|
func (o *Object) split() (bucket, bucketPath string) {
|
||||||
|
return o.fs.split(o.remote)
|
||||||
|
}
|
||||||
|
|
||||||
// Split an URL into three parts: protocol host and port
|
// Split an URL into three parts: protocol host and port
|
||||||
func qsParseEndpoint(endpoint string) (protocol, host, port string, err error) {
|
func qsParseEndpoint(endpoint string) (protocol, host, port string, err error) {
|
||||||
/*
|
/*
|
||||||
|
@ -301,6 +301,12 @@ func (f *Fs) setUploadCutoff(cs fs.SizeSuffix) (old fs.SizeSuffix, err error) {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// setRoot changes the root of the Fs
|
||||||
|
func (f *Fs) setRoot(root string) {
|
||||||
|
f.root = parsePath(root)
|
||||||
|
f.rootBucket, f.rootDirectory = bucket.Split(f.root)
|
||||||
|
}
|
||||||
|
|
||||||
// NewFs constructs an Fs from the path, bucket:path
|
// NewFs constructs an Fs from the path, bucket:path
|
||||||
func NewFs(name, root string, m configmap.Mapper) (fs.Fs, error) {
|
func NewFs(name, root string, m configmap.Mapper) (fs.Fs, error) {
|
||||||
// Parse config into Options struct
|
// Parse config into Options struct
|
||||||
|
@ -317,10 +323,6 @@ func NewFs(name, root string, m configmap.Mapper) (fs.Fs, error) {
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, errors.Wrap(err, "qingstor: upload cutoff")
|
return nil, errors.Wrap(err, "qingstor: upload cutoff")
|
||||||
}
|
}
|
||||||
bucket, key, err := qsParsePath(root)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
svc, err := qsServiceConnection(opt)
|
svc, err := qsServiceConnection(opt)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
|
@ -331,36 +333,33 @@ func NewFs(name, root string, m configmap.Mapper) (fs.Fs, error) {
|
||||||
}
|
}
|
||||||
|
|
||||||
f := &Fs{
|
f := &Fs{
|
||||||
name: name,
|
name: name,
|
||||||
root: key,
|
opt: *opt,
|
||||||
opt: *opt,
|
svc: svc,
|
||||||
svc: svc,
|
zone: opt.Zone,
|
||||||
zone: opt.Zone,
|
cache: bucket.NewCache(),
|
||||||
bucket: bucket,
|
|
||||||
}
|
}
|
||||||
|
f.setRoot(root)
|
||||||
f.features = (&fs.Features{
|
f.features = (&fs.Features{
|
||||||
ReadMimeType: true,
|
ReadMimeType: true,
|
||||||
WriteMimeType: true,
|
WriteMimeType: true,
|
||||||
BucketBased: true,
|
BucketBased: true,
|
||||||
|
BucketBasedRootOK: true,
|
||||||
}).Fill(f)
|
}).Fill(f)
|
||||||
|
|
||||||
if f.root != "" {
|
if f.rootBucket != "" && f.rootDirectory != "" {
|
||||||
if !strings.HasSuffix(f.root, "/") {
|
// Check to see if the object exists
|
||||||
f.root += "/"
|
bucketInit, err := svc.Bucket(f.rootBucket, opt.Zone)
|
||||||
}
|
|
||||||
//Check to see if the object exists
|
|
||||||
bucketInit, err := svc.Bucket(bucket, opt.Zone)
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
_, err = bucketInit.HeadObject(key, &qs.HeadObjectInput{})
|
_, err = bucketInit.HeadObject(f.rootDirectory, &qs.HeadObjectInput{})
|
||||||
if err == nil {
|
if err == nil {
|
||||||
f.root = path.Dir(key)
|
newRoot := path.Dir(f.root)
|
||||||
if f.root == "." {
|
if newRoot == "." {
|
||||||
f.root = ""
|
newRoot = ""
|
||||||
} else {
|
|
||||||
f.root += "/"
|
|
||||||
}
|
}
|
||||||
|
f.setRoot(newRoot)
|
||||||
// return an error with an fs which points to the parent
|
// return an error with an fs which points to the parent
|
||||||
return f, fs.ErrorIsFile
|
return f, fs.ErrorIsFile
|
||||||
}
|
}
|
||||||
|
@ -375,18 +374,18 @@ func (f *Fs) Name() string {
|
||||||
|
|
||||||
// Root of the remote (as passed into NewFs)
|
// Root of the remote (as passed into NewFs)
|
||||||
func (f *Fs) Root() string {
|
func (f *Fs) Root() string {
|
||||||
if f.root == "" {
|
return f.root
|
||||||
return f.bucket
|
|
||||||
}
|
|
||||||
return f.bucket + "/" + f.root
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// String converts this Fs to a string
|
// String converts this Fs to a string
|
||||||
func (f *Fs) String() string {
|
func (f *Fs) String() string {
|
||||||
if f.root == "" {
|
if f.rootBucket == "" {
|
||||||
return fmt.Sprintf("QingStor bucket %s", f.bucket)
|
return fmt.Sprintf("QingStor root")
|
||||||
}
|
}
|
||||||
return fmt.Sprintf("QingStor bucket %s root %s", f.bucket, f.root)
|
if f.rootDirectory == "" {
|
||||||
|
return fmt.Sprintf("QingStor bucket %s", f.rootBucket)
|
||||||
|
}
|
||||||
|
return fmt.Sprintf("QingStor bucket %s path %s", f.rootBucket, f.rootDirectory)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Precision of the remote
|
// Precision of the remote
|
||||||
|
@ -426,6 +425,7 @@ func (f *Fs) Put(ctx context.Context, in io.Reader, src fs.ObjectInfo, options .
|
||||||
//
|
//
|
||||||
// If it isn't possible then return fs.ErrorCantCopy
|
// If it isn't possible then return fs.ErrorCantCopy
|
||||||
func (f *Fs) Copy(ctx context.Context, src fs.Object, remote string) (fs.Object, error) {
|
func (f *Fs) Copy(ctx context.Context, src fs.Object, remote string) (fs.Object, error) {
|
||||||
|
dstBucket, dstPath := f.split(remote)
|
||||||
err := f.Mkdir(ctx, "")
|
err := f.Mkdir(ctx, "")
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
|
@ -435,22 +435,21 @@ func (f *Fs) Copy(ctx context.Context, src fs.Object, remote string) (fs.Object,
|
||||||
fs.Debugf(src, "Can't copy - not same remote type")
|
fs.Debugf(src, "Can't copy - not same remote type")
|
||||||
return nil, fs.ErrorCantCopy
|
return nil, fs.ErrorCantCopy
|
||||||
}
|
}
|
||||||
srcFs := srcObj.fs
|
srcBucket, srcPath := srcObj.split()
|
||||||
key := f.root + remote
|
source := path.Join("/", srcBucket, srcPath)
|
||||||
source := path.Join("/"+srcFs.bucket, srcFs.root+srcObj.remote)
|
|
||||||
|
|
||||||
fs.Debugf(f, "Copied, source key is: %s, and dst key is: %s", source, key)
|
// fs.Debugf(f, "Copied, source key is: %s, and dst key is: %s", source, key)
|
||||||
req := qs.PutObjectInput{
|
req := qs.PutObjectInput{
|
||||||
XQSCopySource: &source,
|
XQSCopySource: &source,
|
||||||
}
|
}
|
||||||
bucketInit, err := f.svc.Bucket(f.bucket, f.zone)
|
bucketInit, err := f.svc.Bucket(dstBucket, f.zone)
|
||||||
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
_, err = bucketInit.PutObject(key, &req)
|
_, err = bucketInit.PutObject(dstPath, &req)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
fs.Debugf(f, "Copy Failed, API Error: %v", err)
|
// fs.Debugf(f, "Copy Failed, API Error: %v", err)
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
return f.NewObject(ctx, remote)
|
return f.NewObject(ctx, remote)
|
||||||
|
@ -511,29 +510,27 @@ type listFn func(remote string, object *qs.KeyType, isDirectory bool) error
|
||||||
// dir is the starting directory, "" for root
|
// dir is the starting directory, "" for root
|
||||||
//
|
//
|
||||||
// Set recurse to read sub directories
|
// Set recurse to read sub directories
|
||||||
func (f *Fs) list(ctx context.Context, dir string, recurse bool, fn listFn) error {
|
func (f *Fs) list(ctx context.Context, bucket, directory, prefix string, addBucket bool, recurse bool, fn listFn) error {
|
||||||
prefix := f.root
|
if prefix != "" {
|
||||||
if dir != "" {
|
prefix += "/"
|
||||||
prefix += dir + "/"
|
}
|
||||||
|
if directory != "" {
|
||||||
|
directory += "/"
|
||||||
}
|
}
|
||||||
|
|
||||||
delimiter := ""
|
delimiter := ""
|
||||||
if !recurse {
|
if !recurse {
|
||||||
delimiter = "/"
|
delimiter = "/"
|
||||||
}
|
}
|
||||||
|
|
||||||
maxLimit := int(listLimitSize)
|
maxLimit := int(listLimitSize)
|
||||||
var marker *string
|
var marker *string
|
||||||
|
|
||||||
for {
|
for {
|
||||||
bucketInit, err := f.svc.Bucket(f.bucket, f.zone)
|
bucketInit, err := f.svc.Bucket(bucket, f.zone)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
// FIXME need to implement ALL loop
|
|
||||||
req := qs.ListObjectsInput{
|
req := qs.ListObjectsInput{
|
||||||
Delimiter: &delimiter,
|
Delimiter: &delimiter,
|
||||||
Prefix: &prefix,
|
Prefix: &directory,
|
||||||
Limit: &maxLimit,
|
Limit: &maxLimit,
|
||||||
Marker: marker,
|
Marker: marker,
|
||||||
}
|
}
|
||||||
|
@ -546,7 +543,6 @@ func (f *Fs) list(ctx context.Context, dir string, recurse bool, fn listFn) erro
|
||||||
}
|
}
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
rootLength := len(f.root)
|
|
||||||
if !recurse {
|
if !recurse {
|
||||||
for _, commonPrefix := range resp.CommonPrefixes {
|
for _, commonPrefix := range resp.CommonPrefixes {
|
||||||
if commonPrefix == nil {
|
if commonPrefix == nil {
|
||||||
|
@ -554,15 +550,17 @@ func (f *Fs) list(ctx context.Context, dir string, recurse bool, fn listFn) erro
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
remote := *commonPrefix
|
remote := *commonPrefix
|
||||||
if !strings.HasPrefix(remote, f.root) {
|
if !strings.HasPrefix(remote, prefix) {
|
||||||
fs.Logf(f, "Odd name received %q", remote)
|
fs.Logf(f, "Odd name received %q", remote)
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
remote = remote[rootLength:]
|
remote = remote[len(prefix):]
|
||||||
|
if addBucket {
|
||||||
|
remote = path.Join(bucket, remote)
|
||||||
|
}
|
||||||
if strings.HasSuffix(remote, "/") {
|
if strings.HasSuffix(remote, "/") {
|
||||||
remote = remote[:len(remote)-1]
|
remote = remote[:len(remote)-1]
|
||||||
}
|
}
|
||||||
|
|
||||||
err = fn(remote, &qs.KeyType{Key: &remote}, true)
|
err = fn(remote, &qs.KeyType{Key: &remote}, true)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
|
@ -572,11 +570,14 @@ func (f *Fs) list(ctx context.Context, dir string, recurse bool, fn listFn) erro
|
||||||
|
|
||||||
for _, object := range resp.Keys {
|
for _, object := range resp.Keys {
|
||||||
key := qs.StringValue(object.Key)
|
key := qs.StringValue(object.Key)
|
||||||
if !strings.HasPrefix(key, f.root) {
|
if !strings.HasPrefix(key, prefix) {
|
||||||
fs.Logf(f, "Odd name received %q", key)
|
fs.Logf(f, "Odd name received %q", key)
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
remote := key[rootLength:]
|
remote := key[len(prefix):]
|
||||||
|
if addBucket {
|
||||||
|
remote = path.Join(bucket, remote)
|
||||||
|
}
|
||||||
err = fn(remote, object, false)
|
err = fn(remote, object, false)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
|
@ -613,20 +614,10 @@ func (f *Fs) itemToDirEntry(remote string, object *qs.KeyType, isDirectory bool)
|
||||||
return o, nil
|
return o, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// mark the bucket as being OK
|
|
||||||
func (f *Fs) markBucketOK() {
|
|
||||||
if f.bucket != "" {
|
|
||||||
f.bucketOKMu.Lock()
|
|
||||||
f.bucketOK = true
|
|
||||||
f.bucketDeleted = false
|
|
||||||
f.bucketOKMu.Unlock()
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// listDir lists files and directories to out
|
// listDir lists files and directories to out
|
||||||
func (f *Fs) listDir(ctx context.Context, dir string) (entries fs.DirEntries, err error) {
|
func (f *Fs) listDir(ctx context.Context, bucket, directory, prefix string, addBucket bool) (entries fs.DirEntries, err error) {
|
||||||
// List the objects and directories
|
// List the objects and directories
|
||||||
err = f.list(ctx, dir, false, func(remote string, object *qs.KeyType, isDirectory bool) error {
|
err = f.list(ctx, bucket, directory, prefix, addBucket, false, func(remote string, object *qs.KeyType, isDirectory bool) error {
|
||||||
entry, err := f.itemToDirEntry(remote, object, isDirectory)
|
entry, err := f.itemToDirEntry(remote, object, isDirectory)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
|
@ -640,7 +631,7 @@ func (f *Fs) listDir(ctx context.Context, dir string) (entries fs.DirEntries, er
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
// bucket must be present if listing succeeded
|
// bucket must be present if listing succeeded
|
||||||
f.markBucketOK()
|
f.cache.MarkOK(bucket)
|
||||||
return entries, nil
|
return entries, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -675,10 +666,11 @@ func (f *Fs) listBuckets(dir string) (entries fs.DirEntries, err error) {
|
||||||
// This should return ErrDirNotFound if the directory isn't
|
// This should return ErrDirNotFound if the directory isn't
|
||||||
// found.
|
// found.
|
||||||
func (f *Fs) List(ctx context.Context, dir string) (entries fs.DirEntries, err error) {
|
func (f *Fs) List(ctx context.Context, dir string) (entries fs.DirEntries, err error) {
|
||||||
if f.bucket == "" {
|
bucket, directory := f.split(dir)
|
||||||
|
if bucket == "" {
|
||||||
return f.listBuckets(dir)
|
return f.listBuckets(dir)
|
||||||
}
|
}
|
||||||
return f.listDir(ctx, dir)
|
return f.listDir(ctx, bucket, directory, f.rootDirectory, f.rootBucket == "")
|
||||||
}
|
}
|
||||||
|
|
||||||
// ListR lists the objects and directories of the Fs starting
|
// ListR lists the objects and directories of the Fs starting
|
||||||
|
@ -698,106 +690,98 @@ func (f *Fs) List(ctx context.Context, dir string) (entries fs.DirEntries, err e
|
||||||
// Don't implement this unless you have a more efficient way
|
// Don't implement this unless you have a more efficient way
|
||||||
// of listing recursively that doing a directory traversal.
|
// of listing recursively that doing a directory traversal.
|
||||||
func (f *Fs) ListR(ctx context.Context, dir string, callback fs.ListRCallback) (err error) {
|
func (f *Fs) ListR(ctx context.Context, dir string, callback fs.ListRCallback) (err error) {
|
||||||
if f.bucket == "" {
|
bucket, directory := f.split(dir)
|
||||||
return fs.ErrorListBucketRequired
|
|
||||||
}
|
|
||||||
list := walk.NewListRHelper(callback)
|
list := walk.NewListRHelper(callback)
|
||||||
err = f.list(ctx, dir, true, func(remote string, object *qs.KeyType, isDirectory bool) error {
|
listR := func(bucket, directory, prefix string, addBucket bool) error {
|
||||||
entry, err := f.itemToDirEntry(remote, object, isDirectory)
|
return f.list(ctx, bucket, directory, prefix, addBucket, true, func(remote string, object *qs.KeyType, isDirectory bool) error {
|
||||||
|
entry, err := f.itemToDirEntry(remote, object, isDirectory)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
return list.Add(entry)
|
||||||
|
})
|
||||||
|
}
|
||||||
|
if bucket == "" {
|
||||||
|
entries, err := f.listBuckets("")
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
return list.Add(entry)
|
for _, entry := range entries {
|
||||||
})
|
err = list.Add(entry)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
// bucket must be present if listing succeeded
|
bucket := entry.Remote()
|
||||||
f.markBucketOK()
|
err = listR(bucket, "", f.rootDirectory, true)
|
||||||
return list.Flush()
|
if err != nil {
|
||||||
}
|
return err
|
||||||
|
}
|
||||||
// Check if the bucket exists
|
}
|
||||||
func (f *Fs) dirExists() (bool, error) {
|
} else {
|
||||||
bucketInit, err := f.svc.Bucket(f.bucket, f.zone)
|
err = listR(bucket, directory, f.rootDirectory, f.rootBucket == "")
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return false, err
|
return err
|
||||||
}
|
|
||||||
|
|
||||||
_, err = bucketInit.Head()
|
|
||||||
if err == nil {
|
|
||||||
return true, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
if e, ok := err.(*qsErr.QingStorError); ok {
|
|
||||||
if e.StatusCode == http.StatusNotFound {
|
|
||||||
err = nil
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return false, err
|
// bucket must be present if listing succeeded
|
||||||
|
f.cache.MarkOK(bucket)
|
||||||
|
return list.Flush()
|
||||||
}
|
}
|
||||||
|
|
||||||
// Mkdir creates the bucket if it doesn't exist
|
// Mkdir creates the bucket if it doesn't exist
|
||||||
func (f *Fs) Mkdir(ctx context.Context, dir string) error {
|
func (f *Fs) Mkdir(ctx context.Context, dir string) error {
|
||||||
f.bucketOKMu.Lock()
|
bucket, _ := f.split(dir)
|
||||||
defer f.bucketOKMu.Unlock()
|
return f.cache.Create(bucket, func() error {
|
||||||
if f.bucketOK {
|
bucketInit, err := f.svc.Bucket(bucket, f.zone)
|
||||||
return nil
|
if err != nil {
|
||||||
}
|
|
||||||
|
|
||||||
bucketInit, err := f.svc.Bucket(f.bucket, f.zone)
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
/* When delete a bucket, qingstor need about 60 second to sync status;
|
|
||||||
So, need wait for it sync end if we try to operation a just deleted bucket
|
|
||||||
*/
|
|
||||||
retries := 0
|
|
||||||
for retries <= 120 {
|
|
||||||
statistics, err := bucketInit.GetStatistics()
|
|
||||||
if statistics == nil || err != nil {
|
|
||||||
break
|
|
||||||
}
|
|
||||||
switch *statistics.Status {
|
|
||||||
case "deleted":
|
|
||||||
fs.Debugf(f, "Wait for qingstor sync bucket status, retries: %d", retries)
|
|
||||||
time.Sleep(time.Second * 1)
|
|
||||||
retries++
|
|
||||||
continue
|
|
||||||
default:
|
|
||||||
break
|
|
||||||
}
|
|
||||||
break
|
|
||||||
}
|
|
||||||
|
|
||||||
if !f.bucketDeleted {
|
|
||||||
exists, err := f.dirExists()
|
|
||||||
if err == nil {
|
|
||||||
f.bucketOK = exists
|
|
||||||
}
|
|
||||||
if err != nil || exists {
|
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
}
|
/* When delete a bucket, qingstor need about 60 second to sync status;
|
||||||
|
So, need wait for it sync end if we try to operation a just deleted bucket
|
||||||
_, err = bucketInit.Put()
|
*/
|
||||||
if e, ok := err.(*qsErr.QingStorError); ok {
|
wasDeleted := false
|
||||||
if e.StatusCode == http.StatusConflict {
|
retries := 0
|
||||||
err = nil
|
for retries <= 120 {
|
||||||
|
statistics, err := bucketInit.GetStatistics()
|
||||||
|
if statistics == nil || err != nil {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
switch *statistics.Status {
|
||||||
|
case "deleted":
|
||||||
|
fs.Debugf(f, "Wait for qingstor bucket to be deleted, retries: %d", retries)
|
||||||
|
time.Sleep(time.Second * 1)
|
||||||
|
retries++
|
||||||
|
wasDeleted = true
|
||||||
|
continue
|
||||||
|
default:
|
||||||
|
break
|
||||||
|
}
|
||||||
|
break
|
||||||
}
|
}
|
||||||
}
|
|
||||||
|
|
||||||
if err == nil {
|
retries = 0
|
||||||
f.bucketOK = true
|
for retries <= 120 {
|
||||||
f.bucketDeleted = false
|
_, err = bucketInit.Put()
|
||||||
}
|
if e, ok := err.(*qsErr.QingStorError); ok {
|
||||||
|
if e.StatusCode == http.StatusConflict {
|
||||||
return err
|
if wasDeleted {
|
||||||
|
fs.Debugf(f, "Wait for qingstor bucket to be creatable, retries: %d", retries)
|
||||||
|
time.Sleep(time.Second * 1)
|
||||||
|
retries++
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
err = nil
|
||||||
|
}
|
||||||
|
}
|
||||||
|
break
|
||||||
|
}
|
||||||
|
return err
|
||||||
|
}, nil)
|
||||||
}
|
}
|
||||||
|
|
||||||
// dirIsEmpty check if the bucket empty
|
// bucketIsEmpty check if the bucket empty
|
||||||
func (f *Fs) dirIsEmpty() (bool, error) {
|
func (f *Fs) bucketIsEmpty(bucket string) (bool, error) {
|
||||||
bucketInit, err := f.svc.Bucket(f.bucket, f.zone)
|
bucketInit, err := f.svc.Bucket(bucket, f.zone)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return true, err
|
return true, err
|
||||||
}
|
}
|
||||||
|
@ -815,71 +799,64 @@ func (f *Fs) dirIsEmpty() (bool, error) {
|
||||||
|
|
||||||
// Rmdir delete a bucket
|
// Rmdir delete a bucket
|
||||||
func (f *Fs) Rmdir(ctx context.Context, dir string) error {
|
func (f *Fs) Rmdir(ctx context.Context, dir string) error {
|
||||||
f.bucketOKMu.Lock()
|
bucket, directory := f.split(dir)
|
||||||
defer f.bucketOKMu.Unlock()
|
if bucket == "" || directory != "" {
|
||||||
if f.root != "" || dir != "" {
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
isEmpty, err := f.bucketIsEmpty(bucket)
|
||||||
isEmpty, err := f.dirIsEmpty()
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
if !isEmpty {
|
if !isEmpty {
|
||||||
fs.Debugf(f, "The bucket %s you tried to delete not empty.", f.bucket)
|
// fs.Debugf(f, "The bucket %s you tried to delete not empty.", bucket)
|
||||||
return errors.New("BucketNotEmpty: The bucket you tried to delete is not empty")
|
return errors.New("BucketNotEmpty: The bucket you tried to delete is not empty")
|
||||||
}
|
}
|
||||||
|
return f.cache.Remove(bucket, func() error {
|
||||||
fs.Debugf(f, "Tried to delete the bucket %s", f.bucket)
|
// fs.Debugf(f, "Deleting the bucket %s", bucket)
|
||||||
bucketInit, err := f.svc.Bucket(f.bucket, f.zone)
|
bucketInit, err := f.svc.Bucket(bucket, f.zone)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
|
||||||
retries := 0
|
|
||||||
for retries <= 10 {
|
|
||||||
_, delErr := bucketInit.Delete()
|
|
||||||
if delErr != nil {
|
|
||||||
if e, ok := delErr.(*qsErr.QingStorError); ok {
|
|
||||||
switch e.Code {
|
|
||||||
// The status of "lease" takes a few seconds to "ready" when creating a new bucket
|
|
||||||
// wait for lease status ready
|
|
||||||
case "lease_not_ready":
|
|
||||||
fs.Debugf(f, "QingStor bucket lease not ready, retries: %d", retries)
|
|
||||||
retries++
|
|
||||||
time.Sleep(time.Second * 1)
|
|
||||||
continue
|
|
||||||
default:
|
|
||||||
err = e
|
|
||||||
break
|
|
||||||
}
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
err = delErr
|
|
||||||
}
|
}
|
||||||
break
|
retries := 0
|
||||||
}
|
for retries <= 10 {
|
||||||
|
_, delErr := bucketInit.Delete()
|
||||||
if err == nil {
|
if delErr != nil {
|
||||||
f.bucketOK = false
|
if e, ok := delErr.(*qsErr.QingStorError); ok {
|
||||||
f.bucketDeleted = true
|
switch e.Code {
|
||||||
}
|
// The status of "lease" takes a few seconds to "ready" when creating a new bucket
|
||||||
return err
|
// wait for lease status ready
|
||||||
|
case "lease_not_ready":
|
||||||
|
fs.Debugf(f, "QingStor bucket lease not ready, retries: %d", retries)
|
||||||
|
retries++
|
||||||
|
time.Sleep(time.Second * 1)
|
||||||
|
continue
|
||||||
|
default:
|
||||||
|
err = e
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
err = delErr
|
||||||
|
}
|
||||||
|
break
|
||||||
|
}
|
||||||
|
return err
|
||||||
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
// readMetaData gets the metadata if it hasn't already been fetched
|
// readMetaData gets the metadata if it hasn't already been fetched
|
||||||
//
|
//
|
||||||
// it also sets the info
|
// it also sets the info
|
||||||
func (o *Object) readMetaData() (err error) {
|
func (o *Object) readMetaData() (err error) {
|
||||||
bucketInit, err := o.fs.svc.Bucket(o.fs.bucket, o.fs.zone)
|
bucket, bucketPath := o.split()
|
||||||
|
bucketInit, err := o.fs.svc.Bucket(bucket, o.fs.zone)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
// fs.Debugf(o, "Read metadata of key: %s", key)
|
||||||
key := o.fs.root + o.remote
|
resp, err := bucketInit.HeadObject(bucketPath, &qs.HeadObjectInput{})
|
||||||
fs.Debugf(o, "Read metadata of key: %s", key)
|
|
||||||
resp, err := bucketInit.HeadObject(key, &qs.HeadObjectInput{})
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
fs.Debugf(o, "Read metadata failed, API Error: %v", err)
|
// fs.Debugf(o, "Read metadata failed, API Error: %v", err)
|
||||||
if e, ok := err.(*qsErr.QingStorError); ok {
|
if e, ok := err.(*qsErr.QingStorError); ok {
|
||||||
if e.StatusCode == http.StatusNotFound {
|
if e.StatusCode == http.StatusNotFound {
|
||||||
return fs.ErrorObjectNotFound
|
return fs.ErrorObjectNotFound
|
||||||
|
@ -941,10 +918,10 @@ func (o *Object) SetModTime(ctx context.Context, modTime time.Time) error {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
// Copy the object to itself to update the metadata
|
// Copy the object to itself to update the metadata
|
||||||
key := o.fs.root + o.remote
|
bucket, bucketPath := o.split()
|
||||||
sourceKey := path.Join("/", o.fs.bucket, key)
|
sourceKey := path.Join("/", bucket, bucketPath)
|
||||||
|
|
||||||
bucketInit, err := o.fs.svc.Bucket(o.fs.bucket, o.fs.zone)
|
bucketInit, err := o.fs.svc.Bucket(bucket, o.fs.zone)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
@ -953,19 +930,19 @@ func (o *Object) SetModTime(ctx context.Context, modTime time.Time) error {
|
||||||
XQSCopySource: &sourceKey,
|
XQSCopySource: &sourceKey,
|
||||||
ContentType: &mimeType,
|
ContentType: &mimeType,
|
||||||
}
|
}
|
||||||
_, err = bucketInit.PutObject(key, &req)
|
_, err = bucketInit.PutObject(bucketPath, &req)
|
||||||
|
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
// Open opens the file for read. Call Close() on the returned io.ReadCloser
|
// Open opens the file for read. Call Close() on the returned io.ReadCloser
|
||||||
func (o *Object) Open(ctx context.Context, options ...fs.OpenOption) (io.ReadCloser, error) {
|
func (o *Object) Open(ctx context.Context, options ...fs.OpenOption) (io.ReadCloser, error) {
|
||||||
bucketInit, err := o.fs.svc.Bucket(o.fs.bucket, o.fs.zone)
|
bucket, bucketPath := o.split()
|
||||||
|
bucketInit, err := o.fs.svc.Bucket(bucket, o.fs.zone)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
key := o.fs.root + o.remote
|
|
||||||
req := qs.GetObjectInput{}
|
req := qs.GetObjectInput{}
|
||||||
fs.FixRangeOption(options, o.size)
|
fs.FixRangeOption(options, o.size)
|
||||||
for _, option := range options {
|
for _, option := range options {
|
||||||
|
@ -979,7 +956,7 @@ func (o *Object) Open(ctx context.Context, options ...fs.OpenOption) (io.ReadClo
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
resp, err := bucketInit.GetObject(key, &req)
|
resp, err := bucketInit.GetObject(bucketPath, &req)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
@ -989,21 +966,21 @@ func (o *Object) Open(ctx context.Context, options ...fs.OpenOption) (io.ReadClo
|
||||||
// Update in to the object
|
// Update in to the object
|
||||||
func (o *Object) Update(ctx context.Context, in io.Reader, src fs.ObjectInfo, options ...fs.OpenOption) error {
|
func (o *Object) Update(ctx context.Context, in io.Reader, src fs.ObjectInfo, options ...fs.OpenOption) error {
|
||||||
// The maximum size of upload object is multipartUploadSize * MaxMultipleParts
|
// The maximum size of upload object is multipartUploadSize * MaxMultipleParts
|
||||||
|
bucket, bucketPath := o.split()
|
||||||
err := o.fs.Mkdir(ctx, "")
|
err := o.fs.Mkdir(ctx, "")
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
key := o.fs.root + o.remote
|
|
||||||
// Guess the content type
|
// Guess the content type
|
||||||
mimeType := fs.MimeType(ctx, src)
|
mimeType := fs.MimeType(ctx, src)
|
||||||
|
|
||||||
req := uploadInput{
|
req := uploadInput{
|
||||||
body: in,
|
body: in,
|
||||||
qsSvc: o.fs.svc,
|
qsSvc: o.fs.svc,
|
||||||
bucket: o.fs.bucket,
|
bucket: bucket,
|
||||||
zone: o.fs.zone,
|
zone: o.fs.zone,
|
||||||
key: key,
|
key: bucketPath,
|
||||||
mimeType: mimeType,
|
mimeType: mimeType,
|
||||||
partSize: int64(o.fs.opt.ChunkSize),
|
partSize: int64(o.fs.opt.ChunkSize),
|
||||||
concurrency: o.fs.opt.UploadConcurrency,
|
concurrency: o.fs.opt.UploadConcurrency,
|
||||||
|
@ -1027,13 +1004,12 @@ func (o *Object) Update(ctx context.Context, in io.Reader, src fs.ObjectInfo, op
|
||||||
|
|
||||||
// Remove this object
|
// Remove this object
|
||||||
func (o *Object) Remove(ctx context.Context) error {
|
func (o *Object) Remove(ctx context.Context) error {
|
||||||
bucketInit, err := o.fs.svc.Bucket(o.fs.bucket, o.fs.zone)
|
bucket, bucketPath := o.split()
|
||||||
|
bucketInit, err := o.fs.svc.Bucket(bucket, o.fs.zone)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
_, err = bucketInit.DeleteObject(bucketPath)
|
||||||
key := o.fs.root + o.remote
|
|
||||||
_, err = bucketInit.DeleteObject(key)
|
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Add table
Reference in a new issue