s3: make all operations work from the root #3421
This commit is contained in:
parent
eaeef4811f
commit
eaaf2ded94
1 changed files with 202 additions and 187 deletions
389
backend/s3/s3.go
389
backend/s3/s3.go
|
@ -23,7 +23,6 @@ import (
|
||||||
"path"
|
"path"
|
||||||
"regexp"
|
"regexp"
|
||||||
"strings"
|
"strings"
|
||||||
"sync"
|
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/aws/aws-sdk-go/aws"
|
"github.com/aws/aws-sdk-go/aws"
|
||||||
|
@ -46,6 +45,7 @@ import (
|
||||||
"github.com/rclone/rclone/fs/fshttp"
|
"github.com/rclone/rclone/fs/fshttp"
|
||||||
"github.com/rclone/rclone/fs/hash"
|
"github.com/rclone/rclone/fs/hash"
|
||||||
"github.com/rclone/rclone/fs/walk"
|
"github.com/rclone/rclone/fs/walk"
|
||||||
|
"github.com/rclone/rclone/lib/bucket"
|
||||||
"github.com/rclone/rclone/lib/pacer"
|
"github.com/rclone/rclone/lib/pacer"
|
||||||
"github.com/rclone/rclone/lib/rest"
|
"github.com/rclone/rclone/lib/rest"
|
||||||
)
|
)
|
||||||
|
@ -798,10 +798,9 @@ type Fs struct {
|
||||||
features *fs.Features // optional features
|
features *fs.Features // optional features
|
||||||
c *s3.S3 // the connection to the s3 server
|
c *s3.S3 // the connection to the s3 server
|
||||||
ses *session.Session // the s3 session
|
ses *session.Session // the s3 session
|
||||||
bucket string // the bucket we are working on
|
rootBucket string // bucket part of root (if any)
|
||||||
bucketOKMu sync.Mutex // mutex to protect bucket OK
|
rootDirectory string // directory part of root (if any)
|
||||||
bucketOK bool // true if we have created the bucket
|
cache *bucket.Cache // cache for bucket creation status
|
||||||
bucketDeleted bool // true if we have deleted the bucket
|
|
||||||
pacer *fs.Pacer // To pace the API calls
|
pacer *fs.Pacer // To pace the API calls
|
||||||
srv *http.Client // a plain http client
|
srv *http.Client // a plain http client
|
||||||
}
|
}
|
||||||
|
@ -830,18 +829,18 @@ func (f *Fs) Name() string {
|
||||||
|
|
||||||
// Root of the remote (as passed into NewFs)
|
// Root of the remote (as passed into NewFs)
|
||||||
func (f *Fs) Root() string {
|
func (f *Fs) Root() string {
|
||||||
if f.root == "" {
|
return f.root
|
||||||
return f.bucket
|
|
||||||
}
|
|
||||||
return f.bucket + "/" + f.root
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// String converts this Fs to a string
|
// String converts this Fs to a string
|
||||||
func (f *Fs) String() string {
|
func (f *Fs) String() string {
|
||||||
if f.root == "" {
|
if f.rootBucket == "" {
|
||||||
return fmt.Sprintf("S3 bucket %s", f.bucket)
|
return fmt.Sprintf("S3 root")
|
||||||
}
|
}
|
||||||
return fmt.Sprintf("S3 bucket %s path %s", f.bucket, f.root)
|
if f.rootDirectory == "" {
|
||||||
|
return fmt.Sprintf("S3 bucket %s", f.rootBucket)
|
||||||
|
}
|
||||||
|
return fmt.Sprintf("S3 bucket %s path %s", f.rootBucket, f.rootDirectory)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Features returns the optional features of this Fs
|
// Features returns the optional features of this Fs
|
||||||
|
@ -868,14 +867,16 @@ func (f *Fs) shouldRetry(err error) (bool, error) {
|
||||||
}
|
}
|
||||||
// Failing that, if it's a RequestFailure it's probably got an http status code we can check
|
// Failing that, if it's a RequestFailure it's probably got an http status code we can check
|
||||||
if reqErr, ok := err.(awserr.RequestFailure); ok {
|
if reqErr, ok := err.(awserr.RequestFailure); ok {
|
||||||
// 301 if wrong region for bucket
|
// 301 if wrong region for bucket - can only update if running from a bucket
|
||||||
if reqErr.StatusCode() == http.StatusMovedPermanently {
|
if f.rootBucket != "" {
|
||||||
urfbErr := f.updateRegionForBucket()
|
if reqErr.StatusCode() == http.StatusMovedPermanently {
|
||||||
if urfbErr != nil {
|
urfbErr := f.updateRegionForBucket(f.rootBucket)
|
||||||
fs.Errorf(f, "Failed to update region for bucket: %v", urfbErr)
|
if urfbErr != nil {
|
||||||
return false, err
|
fs.Errorf(f, "Failed to update region for bucket: %v", urfbErr)
|
||||||
|
return false, err
|
||||||
|
}
|
||||||
|
return true, err
|
||||||
}
|
}
|
||||||
return true, err
|
|
||||||
}
|
}
|
||||||
for _, e := range retryErrorCodes {
|
for _, e := range retryErrorCodes {
|
||||||
if reqErr.StatusCode() == e {
|
if reqErr.StatusCode() == e {
|
||||||
|
@ -888,21 +889,23 @@ func (f *Fs) shouldRetry(err error) (bool, error) {
|
||||||
return fserrors.ShouldRetry(err), err
|
return fserrors.ShouldRetry(err), err
|
||||||
}
|
}
|
||||||
|
|
||||||
// Pattern to match a s3 path
|
// parsePath parses a remote 'url'
|
||||||
var matcher = regexp.MustCompile(`^/*([^/]*)(.*)$`)
|
func parsePath(path string) (root string) {
|
||||||
|
root = strings.Trim(path, "/")
|
||||||
// parseParse parses a s3 'url'
|
|
||||||
func s3ParsePath(path string) (bucket, directory string, err error) {
|
|
||||||
parts := matcher.FindStringSubmatch(path)
|
|
||||||
if parts == nil {
|
|
||||||
err = errors.Errorf("couldn't parse bucket out of s3 path %q", path)
|
|
||||||
} else {
|
|
||||||
bucket, directory = parts[1], parts[2]
|
|
||||||
directory = strings.Trim(directory, "/")
|
|
||||||
}
|
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// split returns bucket and bucketPath from the rootRelativePath
|
||||||
|
// relative to f.root
|
||||||
|
func (f *Fs) split(rootRelativePath string) (bucketName, bucketPath string) {
|
||||||
|
return bucket.Split(path.Join(f.root, rootRelativePath))
|
||||||
|
}
|
||||||
|
|
||||||
|
// split returns bucket and bucketPath from the object
|
||||||
|
func (o *Object) split() (bucket, bucketPath string) {
|
||||||
|
return o.fs.split(o.remote)
|
||||||
|
}
|
||||||
|
|
||||||
// s3Connection makes a connection to s3
|
// s3Connection makes a connection to s3
|
||||||
func s3Connection(opt *Options) (*s3.S3, *session.Session, error) {
|
func s3Connection(opt *Options) (*s3.S3, *session.Session, error) {
|
||||||
// Make the auth
|
// Make the auth
|
||||||
|
@ -1039,6 +1042,12 @@ func (f *Fs) setUploadCutoff(cs fs.SizeSuffix) (old fs.SizeSuffix, err error) {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// setRoot changes the root of the Fs
|
||||||
|
func (f *Fs) setRoot(root string) {
|
||||||
|
f.root = parsePath(root)
|
||||||
|
f.rootBucket, f.rootDirectory = bucket.Split(f.root)
|
||||||
|
}
|
||||||
|
|
||||||
// NewFs constructs an Fs from the path, bucket:path
|
// NewFs constructs an Fs from the path, bucket:path
|
||||||
func NewFs(name, root string, m configmap.Mapper) (fs.Fs, error) {
|
func NewFs(name, root string, m configmap.Mapper) (fs.Fs, error) {
|
||||||
// Parse config into Options struct
|
// Parse config into Options struct
|
||||||
|
@ -1055,10 +1064,6 @@ func NewFs(name, root string, m configmap.Mapper) (fs.Fs, error) {
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, errors.Wrap(err, "s3: upload cutoff")
|
return nil, errors.Wrap(err, "s3: upload cutoff")
|
||||||
}
|
}
|
||||||
bucket, directory, err := s3ParsePath(root)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
if opt.ACL == "" {
|
if opt.ACL == "" {
|
||||||
opt.ACL = "private"
|
opt.ACL = "private"
|
||||||
}
|
}
|
||||||
|
@ -1070,38 +1075,37 @@ func NewFs(name, root string, m configmap.Mapper) (fs.Fs, error) {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
f := &Fs{
|
f := &Fs{
|
||||||
name: name,
|
name: name,
|
||||||
root: directory,
|
opt: *opt,
|
||||||
opt: *opt,
|
c: c,
|
||||||
c: c,
|
ses: ses,
|
||||||
bucket: bucket,
|
pacer: fs.NewPacer(pacer.NewS3(pacer.MinSleep(minSleep))),
|
||||||
ses: ses,
|
cache: bucket.NewCache(),
|
||||||
pacer: fs.NewPacer(pacer.NewS3(pacer.MinSleep(minSleep))),
|
srv: fshttp.NewClient(fs.Config),
|
||||||
srv: fshttp.NewClient(fs.Config),
|
|
||||||
}
|
}
|
||||||
|
f.setRoot(root)
|
||||||
f.features = (&fs.Features{
|
f.features = (&fs.Features{
|
||||||
ReadMimeType: true,
|
ReadMimeType: true,
|
||||||
WriteMimeType: true,
|
WriteMimeType: true,
|
||||||
BucketBased: true,
|
BucketBased: true,
|
||||||
|
BucketBasedRootOK: true,
|
||||||
}).Fill(f)
|
}).Fill(f)
|
||||||
if f.root != "" {
|
if f.rootBucket != "" && f.rootDirectory != "" {
|
||||||
f.root += "/"
|
|
||||||
// Check to see if the object exists
|
// Check to see if the object exists
|
||||||
req := s3.HeadObjectInput{
|
req := s3.HeadObjectInput{
|
||||||
Bucket: &f.bucket,
|
Bucket: &f.rootBucket,
|
||||||
Key: &directory,
|
Key: &f.rootDirectory,
|
||||||
}
|
}
|
||||||
err = f.pacer.Call(func() (bool, error) {
|
err = f.pacer.Call(func() (bool, error) {
|
||||||
_, err = f.c.HeadObject(&req)
|
_, err = f.c.HeadObject(&req)
|
||||||
return f.shouldRetry(err)
|
return f.shouldRetry(err)
|
||||||
})
|
})
|
||||||
if err == nil {
|
if err == nil {
|
||||||
f.root = path.Dir(directory)
|
newRoot := path.Dir(f.root)
|
||||||
if f.root == "." {
|
if newRoot == "." {
|
||||||
f.root = ""
|
newRoot = ""
|
||||||
} else {
|
|
||||||
f.root += "/"
|
|
||||||
}
|
}
|
||||||
|
f.setRoot(newRoot)
|
||||||
// return an error with an fs which points to the parent
|
// return an error with an fs which points to the parent
|
||||||
return f, fs.ErrorIsFile
|
return f, fs.ErrorIsFile
|
||||||
}
|
}
|
||||||
|
@ -1144,9 +1148,9 @@ func (f *Fs) NewObject(ctx context.Context, remote string) (fs.Object, error) {
|
||||||
}
|
}
|
||||||
|
|
||||||
// Gets the bucket location
|
// Gets the bucket location
|
||||||
func (f *Fs) getBucketLocation() (string, error) {
|
func (f *Fs) getBucketLocation(bucket string) (string, error) {
|
||||||
req := s3.GetBucketLocationInput{
|
req := s3.GetBucketLocationInput{
|
||||||
Bucket: &f.bucket,
|
Bucket: &bucket,
|
||||||
}
|
}
|
||||||
var resp *s3.GetBucketLocationOutput
|
var resp *s3.GetBucketLocationOutput
|
||||||
var err error
|
var err error
|
||||||
|
@ -1162,8 +1166,8 @@ func (f *Fs) getBucketLocation() (string, error) {
|
||||||
|
|
||||||
// Updates the region for the bucket by reading the region from the
|
// Updates the region for the bucket by reading the region from the
|
||||||
// bucket then updating the session.
|
// bucket then updating the session.
|
||||||
func (f *Fs) updateRegionForBucket() error {
|
func (f *Fs) updateRegionForBucket(bucket string) error {
|
||||||
region, err := f.getBucketLocation()
|
region, err := f.getBucketLocation(bucket)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return errors.Wrap(err, "reading bucket location failed")
|
return errors.Wrap(err, "reading bucket location failed")
|
||||||
}
|
}
|
||||||
|
@ -1191,15 +1195,18 @@ func (f *Fs) updateRegionForBucket() error {
|
||||||
// listFn is called from list to handle an object.
|
// listFn is called from list to handle an object.
|
||||||
type listFn func(remote string, object *s3.Object, isDirectory bool) error
|
type listFn func(remote string, object *s3.Object, isDirectory bool) error
|
||||||
|
|
||||||
// list the objects into the function supplied
|
// list lists the objects into the function supplied from
|
||||||
//
|
// the bucket and directory supplied. The remote has prefix
|
||||||
// dir is the starting directory, "" for root
|
// removed from it and if addBucket is set then it adds the
|
||||||
|
// bucket to the start.
|
||||||
//
|
//
|
||||||
// Set recurse to read sub directories
|
// Set recurse to read sub directories
|
||||||
func (f *Fs) list(ctx context.Context, dir string, recurse bool, fn listFn) error {
|
func (f *Fs) list(ctx context.Context, bucket, directory, prefix string, addBucket bool, recurse bool, fn listFn) error {
|
||||||
root := f.root
|
if prefix != "" {
|
||||||
if dir != "" {
|
prefix += "/"
|
||||||
root += dir + "/"
|
}
|
||||||
|
if directory != "" {
|
||||||
|
directory += "/"
|
||||||
}
|
}
|
||||||
maxKeys := int64(listChunkSize)
|
maxKeys := int64(listChunkSize)
|
||||||
delimiter := ""
|
delimiter := ""
|
||||||
|
@ -1210,9 +1217,9 @@ func (f *Fs) list(ctx context.Context, dir string, recurse bool, fn listFn) erro
|
||||||
for {
|
for {
|
||||||
// FIXME need to implement ALL loop
|
// FIXME need to implement ALL loop
|
||||||
req := s3.ListObjectsInput{
|
req := s3.ListObjectsInput{
|
||||||
Bucket: &f.bucket,
|
Bucket: &bucket,
|
||||||
Delimiter: &delimiter,
|
Delimiter: &delimiter,
|
||||||
Prefix: &root,
|
Prefix: &directory,
|
||||||
MaxKeys: &maxKeys,
|
MaxKeys: &maxKeys,
|
||||||
Marker: marker,
|
Marker: marker,
|
||||||
}
|
}
|
||||||
|
@ -1228,9 +1235,19 @@ func (f *Fs) list(ctx context.Context, dir string, recurse bool, fn listFn) erro
|
||||||
err = fs.ErrorDirNotFound
|
err = fs.ErrorDirNotFound
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
if f.rootBucket == "" {
|
||||||
|
// if listing from the root ignore wrong region requests returning
|
||||||
|
// empty directory
|
||||||
|
if reqErr, ok := err.(awserr.RequestFailure); ok {
|
||||||
|
// 301 if wrong region for bucket
|
||||||
|
if reqErr.StatusCode() == http.StatusMovedPermanently {
|
||||||
|
fs.Errorf(f, "Can't change region for bucket %q with no bucket specified", bucket)
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
rootLength := len(f.root)
|
|
||||||
if !recurse {
|
if !recurse {
|
||||||
for _, commonPrefix := range resp.CommonPrefixes {
|
for _, commonPrefix := range resp.CommonPrefixes {
|
||||||
if commonPrefix.Prefix == nil {
|
if commonPrefix.Prefix == nil {
|
||||||
|
@ -1238,11 +1255,14 @@ func (f *Fs) list(ctx context.Context, dir string, recurse bool, fn listFn) erro
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
remote := *commonPrefix.Prefix
|
remote := *commonPrefix.Prefix
|
||||||
if !strings.HasPrefix(remote, f.root) {
|
if !strings.HasPrefix(remote, prefix) {
|
||||||
fs.Logf(f, "Odd name received %q", remote)
|
fs.Logf(f, "Odd name received %q", remote)
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
remote = remote[rootLength:]
|
remote = remote[len(prefix):]
|
||||||
|
if addBucket {
|
||||||
|
remote = path.Join(bucket, remote)
|
||||||
|
}
|
||||||
if strings.HasSuffix(remote, "/") {
|
if strings.HasSuffix(remote, "/") {
|
||||||
remote = remote[:len(remote)-1]
|
remote = remote[:len(remote)-1]
|
||||||
}
|
}
|
||||||
|
@ -1253,22 +1273,18 @@ func (f *Fs) list(ctx context.Context, dir string, recurse bool, fn listFn) erro
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
for _, object := range resp.Contents {
|
for _, object := range resp.Contents {
|
||||||
key := aws.StringValue(object.Key)
|
remote := aws.StringValue(object.Key)
|
||||||
if !strings.HasPrefix(key, f.root) {
|
if !strings.HasPrefix(remote, prefix) {
|
||||||
fs.Logf(f, "Odd name received %q", key)
|
fs.Logf(f, "Odd name received %q", remote)
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
remote := key[rootLength:]
|
remote = remote[len(prefix):]
|
||||||
|
isDirectory := strings.HasSuffix(remote, "/")
|
||||||
|
if addBucket {
|
||||||
|
remote = path.Join(bucket, remote)
|
||||||
|
}
|
||||||
// is this a directory marker?
|
// is this a directory marker?
|
||||||
if (strings.HasSuffix(remote, "/") || remote == "") && *object.Size == 0 {
|
if isDirectory && object.Size != nil && *object.Size == 0 {
|
||||||
if recurse && remote != "" {
|
|
||||||
// add a directory in if --fast-list since will have no prefixes
|
|
||||||
remote = remote[:len(remote)-1]
|
|
||||||
err = fn(remote, &s3.Object{Key: &remote}, true)
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
}
|
|
||||||
continue // skip directory marker
|
continue // skip directory marker
|
||||||
}
|
}
|
||||||
err = fn(remote, object, false)
|
err = fn(remote, object, false)
|
||||||
|
@ -1309,20 +1325,10 @@ func (f *Fs) itemToDirEntry(ctx context.Context, remote string, object *s3.Objec
|
||||||
return o, nil
|
return o, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// mark the bucket as being OK
|
|
||||||
func (f *Fs) markBucketOK() {
|
|
||||||
if f.bucket != "" {
|
|
||||||
f.bucketOKMu.Lock()
|
|
||||||
f.bucketOK = true
|
|
||||||
f.bucketDeleted = false
|
|
||||||
f.bucketOKMu.Unlock()
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// listDir lists files and directories to out
|
// listDir lists files and directories to out
|
||||||
func (f *Fs) listDir(ctx context.Context, dir string) (entries fs.DirEntries, err error) {
|
func (f *Fs) listDir(ctx context.Context, bucket, directory, prefix string, addBucket bool) (entries fs.DirEntries, err error) {
|
||||||
// List the objects and directories
|
// List the objects and directories
|
||||||
err = f.list(ctx, dir, false, func(remote string, object *s3.Object, isDirectory bool) error {
|
err = f.list(ctx, bucket, directory, prefix, addBucket, false, func(remote string, object *s3.Object, isDirectory bool) error {
|
||||||
entry, err := f.itemToDirEntry(ctx, remote, object, isDirectory)
|
entry, err := f.itemToDirEntry(ctx, remote, object, isDirectory)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
|
@ -1336,7 +1342,7 @@ func (f *Fs) listDir(ctx context.Context, dir string) (entries fs.DirEntries, er
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
// bucket must be present if listing succeeded
|
// bucket must be present if listing succeeded
|
||||||
f.markBucketOK()
|
f.cache.MarkOK(bucket)
|
||||||
return entries, nil
|
return entries, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1355,7 +1361,9 @@ func (f *Fs) listBuckets(ctx context.Context, dir string) (entries fs.DirEntries
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
for _, bucket := range resp.Buckets {
|
for _, bucket := range resp.Buckets {
|
||||||
d := fs.NewDir(aws.StringValue(bucket.Name), aws.TimeValue(bucket.CreationDate))
|
bucketName := aws.StringValue(bucket.Name)
|
||||||
|
f.cache.MarkOK(bucketName)
|
||||||
|
d := fs.NewDir(bucketName, aws.TimeValue(bucket.CreationDate))
|
||||||
entries = append(entries, d)
|
entries = append(entries, d)
|
||||||
}
|
}
|
||||||
return entries, nil
|
return entries, nil
|
||||||
|
@ -1371,10 +1379,11 @@ func (f *Fs) listBuckets(ctx context.Context, dir string) (entries fs.DirEntries
|
||||||
// This should return ErrDirNotFound if the directory isn't
|
// This should return ErrDirNotFound if the directory isn't
|
||||||
// found.
|
// found.
|
||||||
func (f *Fs) List(ctx context.Context, dir string) (entries fs.DirEntries, err error) {
|
func (f *Fs) List(ctx context.Context, dir string) (entries fs.DirEntries, err error) {
|
||||||
if f.bucket == "" {
|
bucket, directory := f.split(dir)
|
||||||
|
if bucket == "" {
|
||||||
return f.listBuckets(ctx, dir)
|
return f.listBuckets(ctx, dir)
|
||||||
}
|
}
|
||||||
return f.listDir(ctx, dir)
|
return f.listDir(ctx, bucket, directory, f.rootDirectory, f.rootBucket == "")
|
||||||
}
|
}
|
||||||
|
|
||||||
// ListR lists the objects and directories of the Fs starting
|
// ListR lists the objects and directories of the Fs starting
|
||||||
|
@ -1392,24 +1401,43 @@ func (f *Fs) List(ctx context.Context, dir string) (entries fs.DirEntries, err e
|
||||||
// immediately.
|
// immediately.
|
||||||
//
|
//
|
||||||
// Don't implement this unless you have a more efficient way
|
// Don't implement this unless you have a more efficient way
|
||||||
// of listing recursively that doing a directory traversal.
|
// of listing recursively than doing a directory traversal.
|
||||||
func (f *Fs) ListR(ctx context.Context, dir string, callback fs.ListRCallback) (err error) {
|
func (f *Fs) ListR(ctx context.Context, dir string, callback fs.ListRCallback) (err error) {
|
||||||
if f.bucket == "" {
|
bucket, directory := f.split(dir)
|
||||||
return fs.ErrorListBucketRequired
|
|
||||||
}
|
|
||||||
list := walk.NewListRHelper(callback)
|
list := walk.NewListRHelper(callback)
|
||||||
err = f.list(ctx, dir, true, func(remote string, object *s3.Object, isDirectory bool) error {
|
listR := func(bucket, directory, prefix string, addBucket bool) error {
|
||||||
entry, err := f.itemToDirEntry(ctx, remote, object, isDirectory)
|
return f.list(ctx, bucket, directory, prefix, addBucket, true, func(remote string, object *s3.Object, isDirectory bool) error {
|
||||||
|
entry, err := f.itemToDirEntry(ctx, remote, object, isDirectory)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
return list.Add(entry)
|
||||||
|
})
|
||||||
|
}
|
||||||
|
if bucket == "" {
|
||||||
|
entries, err := f.listBuckets(ctx, "")
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
for _, entry := range entries {
|
||||||
|
err = list.Add(entry)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
bucket := entry.Remote()
|
||||||
|
err = listR(bucket, "", f.rootDirectory, true)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
err = listR(bucket, directory, f.rootDirectory, f.rootBucket == "")
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
return list.Add(entry)
|
|
||||||
})
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
}
|
||||||
// bucket must be present if listing succeeded
|
// bucket must be present if listing succeeded
|
||||||
f.markBucketOK()
|
f.cache.MarkOK(bucket)
|
||||||
return list.Flush()
|
return list.Flush()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1431,9 +1459,9 @@ func (f *Fs) PutStream(ctx context.Context, in io.Reader, src fs.ObjectInfo, opt
|
||||||
// Check if the bucket exists
|
// Check if the bucket exists
|
||||||
//
|
//
|
||||||
// NB this can return incorrect results if called immediately after bucket deletion
|
// NB this can return incorrect results if called immediately after bucket deletion
|
||||||
func (f *Fs) dirExists(ctx context.Context) (bool, error) {
|
func (f *Fs) bucketExists(ctx context.Context, bucket string) (bool, error) {
|
||||||
req := s3.HeadBucketInput{
|
req := s3.HeadBucketInput{
|
||||||
Bucket: &f.bucket,
|
Bucket: &bucket,
|
||||||
}
|
}
|
||||||
err := f.pacer.Call(func() (bool, error) {
|
err := f.pacer.Call(func() (bool, error) {
|
||||||
_, err := f.c.HeadBucketWithContext(ctx, &req)
|
_, err := f.c.HeadBucketWithContext(ctx, &req)
|
||||||
|
@ -1452,68 +1480,56 @@ func (f *Fs) dirExists(ctx context.Context) (bool, error) {
|
||||||
|
|
||||||
// Mkdir creates the bucket if it doesn't exist
|
// Mkdir creates the bucket if it doesn't exist
|
||||||
func (f *Fs) Mkdir(ctx context.Context, dir string) error {
|
func (f *Fs) Mkdir(ctx context.Context, dir string) error {
|
||||||
f.bucketOKMu.Lock()
|
bucket, _ := f.split(dir)
|
||||||
defer f.bucketOKMu.Unlock()
|
return f.cache.Create(bucket, func() error {
|
||||||
if f.bucketOK {
|
req := s3.CreateBucketInput{
|
||||||
return nil
|
Bucket: &bucket,
|
||||||
}
|
ACL: &f.opt.BucketACL,
|
||||||
if !f.bucketDeleted {
|
}
|
||||||
exists, err := f.dirExists(ctx)
|
if f.opt.LocationConstraint != "" {
|
||||||
|
req.CreateBucketConfiguration = &s3.CreateBucketConfiguration{
|
||||||
|
LocationConstraint: &f.opt.LocationConstraint,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
err := f.pacer.Call(func() (bool, error) {
|
||||||
|
_, err := f.c.CreateBucketWithContext(ctx, &req)
|
||||||
|
return f.shouldRetry(err)
|
||||||
|
})
|
||||||
if err == nil {
|
if err == nil {
|
||||||
f.bucketOK = exists
|
fs.Infof(f, "Bucket %q created with ACL %q", bucket, f.opt.BucketACL)
|
||||||
}
|
}
|
||||||
if err != nil || exists {
|
if err, ok := err.(awserr.Error); ok {
|
||||||
return err
|
if err.Code() == "BucketAlreadyOwnedByYou" {
|
||||||
|
err = nil
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
return nil
|
||||||
req := s3.CreateBucketInput{
|
}, func() (bool, error) {
|
||||||
Bucket: &f.bucket,
|
return f.bucketExists(ctx, bucket)
|
||||||
ACL: &f.opt.BucketACL,
|
|
||||||
}
|
|
||||||
if f.opt.LocationConstraint != "" {
|
|
||||||
req.CreateBucketConfiguration = &s3.CreateBucketConfiguration{
|
|
||||||
LocationConstraint: &f.opt.LocationConstraint,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
err := f.pacer.Call(func() (bool, error) {
|
|
||||||
_, err := f.c.CreateBucketWithContext(ctx, &req)
|
|
||||||
return f.shouldRetry(err)
|
|
||||||
})
|
})
|
||||||
if err, ok := err.(awserr.Error); ok {
|
|
||||||
if err.Code() == "BucketAlreadyOwnedByYou" {
|
|
||||||
err = nil
|
|
||||||
}
|
|
||||||
}
|
|
||||||
if err == nil {
|
|
||||||
f.bucketOK = true
|
|
||||||
f.bucketDeleted = false
|
|
||||||
fs.Infof(f, "Bucket created with ACL %q", *req.ACL)
|
|
||||||
}
|
|
||||||
return err
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Rmdir deletes the bucket if the fs is at the root
|
// Rmdir deletes the bucket if the fs is at the root
|
||||||
//
|
//
|
||||||
// Returns an error if it isn't empty
|
// Returns an error if it isn't empty
|
||||||
func (f *Fs) Rmdir(ctx context.Context, dir string) error {
|
func (f *Fs) Rmdir(ctx context.Context, dir string) error {
|
||||||
f.bucketOKMu.Lock()
|
bucket, directory := f.split(dir)
|
||||||
defer f.bucketOKMu.Unlock()
|
if bucket == "" || directory != "" {
|
||||||
if f.root != "" || dir != "" {
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
req := s3.DeleteBucketInput{
|
return f.cache.Remove(bucket, func() error {
|
||||||
Bucket: &f.bucket,
|
req := s3.DeleteBucketInput{
|
||||||
}
|
Bucket: &bucket,
|
||||||
err := f.pacer.Call(func() (bool, error) {
|
}
|
||||||
_, err := f.c.DeleteBucketWithContext(ctx, &req)
|
err := f.pacer.Call(func() (bool, error) {
|
||||||
return f.shouldRetry(err)
|
_, err := f.c.DeleteBucketWithContext(ctx, &req)
|
||||||
|
return f.shouldRetry(err)
|
||||||
|
})
|
||||||
|
if err == nil {
|
||||||
|
fs.Infof(f, "Bucket %q deleted", bucket)
|
||||||
|
}
|
||||||
|
return err
|
||||||
})
|
})
|
||||||
if err == nil {
|
|
||||||
f.bucketOK = false
|
|
||||||
f.bucketDeleted = true
|
|
||||||
fs.Infof(f, "Bucket deleted")
|
|
||||||
}
|
|
||||||
return err
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Precision of the remote
|
// Precision of the remote
|
||||||
|
@ -1537,6 +1553,7 @@ func pathEscape(s string) string {
|
||||||
//
|
//
|
||||||
// If it isn't possible then return fs.ErrorCantCopy
|
// If it isn't possible then return fs.ErrorCantCopy
|
||||||
func (f *Fs) Copy(ctx context.Context, src fs.Object, remote string) (fs.Object, error) {
|
func (f *Fs) Copy(ctx context.Context, src fs.Object, remote string) (fs.Object, error) {
|
||||||
|
dstBucket, dstPath := f.split(remote)
|
||||||
err := f.Mkdir(ctx, "")
|
err := f.Mkdir(ctx, "")
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
|
@ -1546,13 +1563,12 @@ func (f *Fs) Copy(ctx context.Context, src fs.Object, remote string) (fs.Object,
|
||||||
fs.Debugf(src, "Can't copy - not same remote type")
|
fs.Debugf(src, "Can't copy - not same remote type")
|
||||||
return nil, fs.ErrorCantCopy
|
return nil, fs.ErrorCantCopy
|
||||||
}
|
}
|
||||||
srcFs := srcObj.fs
|
srcBucket, srcPath := srcObj.split()
|
||||||
key := f.root + remote
|
source := pathEscape(path.Join(srcBucket, srcPath))
|
||||||
source := pathEscape(srcFs.bucket + "/" + srcFs.root + srcObj.remote)
|
|
||||||
req := s3.CopyObjectInput{
|
req := s3.CopyObjectInput{
|
||||||
Bucket: &f.bucket,
|
Bucket: &dstBucket,
|
||||||
ACL: &f.opt.ACL,
|
ACL: &f.opt.ACL,
|
||||||
Key: &key,
|
Key: &dstPath,
|
||||||
CopySource: &source,
|
CopySource: &source,
|
||||||
MetadataDirective: aws.String(s3.MetadataDirectiveCopy),
|
MetadataDirective: aws.String(s3.MetadataDirectiveCopy),
|
||||||
}
|
}
|
||||||
|
@ -1640,10 +1656,10 @@ func (o *Object) readMetaData(ctx context.Context) (err error) {
|
||||||
if o.meta != nil {
|
if o.meta != nil {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
key := o.fs.root + o.remote
|
bucket, bucketPath := o.split()
|
||||||
req := s3.HeadObjectInput{
|
req := s3.HeadObjectInput{
|
||||||
Bucket: &o.fs.bucket,
|
Bucket: &bucket,
|
||||||
Key: &key,
|
Key: &bucketPath,
|
||||||
}
|
}
|
||||||
var resp *s3.HeadObjectOutput
|
var resp *s3.HeadObjectOutput
|
||||||
err = o.fs.pacer.Call(func() (bool, error) {
|
err = o.fs.pacer.Call(func() (bool, error) {
|
||||||
|
@ -1722,13 +1738,13 @@ func (o *Object) SetModTime(ctx context.Context, modTime time.Time) error {
|
||||||
mimeType := fs.MimeType(ctx, o)
|
mimeType := fs.MimeType(ctx, o)
|
||||||
|
|
||||||
// Copy the object to itself to update the metadata
|
// Copy the object to itself to update the metadata
|
||||||
key := o.fs.root + o.remote
|
bucket, bucketPath := o.split()
|
||||||
sourceKey := o.fs.bucket + "/" + key
|
sourceKey := path.Join(bucket, bucketPath)
|
||||||
directive := s3.MetadataDirectiveReplace // replace metadata with that passed in
|
directive := s3.MetadataDirectiveReplace // replace metadata with that passed in
|
||||||
req := s3.CopyObjectInput{
|
req := s3.CopyObjectInput{
|
||||||
Bucket: &o.fs.bucket,
|
Bucket: &bucket,
|
||||||
ACL: &o.fs.opt.ACL,
|
ACL: &o.fs.opt.ACL,
|
||||||
Key: &key,
|
Key: &bucketPath,
|
||||||
ContentType: &mimeType,
|
ContentType: &mimeType,
|
||||||
CopySource: aws.String(pathEscape(sourceKey)),
|
CopySource: aws.String(pathEscape(sourceKey)),
|
||||||
Metadata: o.meta,
|
Metadata: o.meta,
|
||||||
|
@ -1760,10 +1776,10 @@ func (o *Object) Storable() bool {
|
||||||
|
|
||||||
// Open an object for read
|
// Open an object for read
|
||||||
func (o *Object) Open(ctx context.Context, options ...fs.OpenOption) (in io.ReadCloser, err error) {
|
func (o *Object) Open(ctx context.Context, options ...fs.OpenOption) (in io.ReadCloser, err error) {
|
||||||
key := o.fs.root + o.remote
|
bucket, bucketPath := o.split()
|
||||||
req := s3.GetObjectInput{
|
req := s3.GetObjectInput{
|
||||||
Bucket: &o.fs.bucket,
|
Bucket: &bucket,
|
||||||
Key: &key,
|
Key: &bucketPath,
|
||||||
}
|
}
|
||||||
fs.FixRangeOption(options, o.bytes)
|
fs.FixRangeOption(options, o.bytes)
|
||||||
for _, option := range options {
|
for _, option := range options {
|
||||||
|
@ -1785,7 +1801,7 @@ func (o *Object) Open(ctx context.Context, options ...fs.OpenOption) (in io.Read
|
||||||
})
|
})
|
||||||
if err, ok := err.(awserr.RequestFailure); ok {
|
if err, ok := err.(awserr.RequestFailure); ok {
|
||||||
if err.Code() == "InvalidObjectState" {
|
if err.Code() == "InvalidObjectState" {
|
||||||
return nil, errors.Errorf("Object in GLACIER, restore first: %v", key)
|
return nil, errors.Errorf("Object in GLACIER, restore first: bucket=%q, key=%q", bucket, bucketPath)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -1796,6 +1812,7 @@ func (o *Object) Open(ctx context.Context, options ...fs.OpenOption) (in io.Read
|
||||||
|
|
||||||
// Update the Object from in with modTime and size
|
// Update the Object from in with modTime and size
|
||||||
func (o *Object) Update(ctx context.Context, in io.Reader, src fs.ObjectInfo, options ...fs.OpenOption) error {
|
func (o *Object) Update(ctx context.Context, in io.Reader, src fs.ObjectInfo, options ...fs.OpenOption) error {
|
||||||
|
bucket, bucketPath := o.split()
|
||||||
err := o.fs.Mkdir(ctx, "")
|
err := o.fs.Mkdir(ctx, "")
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
|
@ -1849,13 +1866,11 @@ func (o *Object) Update(ctx context.Context, in io.Reader, src fs.ObjectInfo, op
|
||||||
|
|
||||||
// Guess the content type
|
// Guess the content type
|
||||||
mimeType := fs.MimeType(ctx, src)
|
mimeType := fs.MimeType(ctx, src)
|
||||||
|
|
||||||
key := o.fs.root + o.remote
|
|
||||||
if multipart {
|
if multipart {
|
||||||
req := s3manager.UploadInput{
|
req := s3manager.UploadInput{
|
||||||
Bucket: &o.fs.bucket,
|
Bucket: &bucket,
|
||||||
ACL: &o.fs.opt.ACL,
|
ACL: &o.fs.opt.ACL,
|
||||||
Key: &key,
|
Key: &bucketPath,
|
||||||
Body: in,
|
Body: in,
|
||||||
ContentType: &mimeType,
|
ContentType: &mimeType,
|
||||||
Metadata: metadata,
|
Metadata: metadata,
|
||||||
|
@ -1879,9 +1894,9 @@ func (o *Object) Update(ctx context.Context, in io.Reader, src fs.ObjectInfo, op
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
req := s3.PutObjectInput{
|
req := s3.PutObjectInput{
|
||||||
Bucket: &o.fs.bucket,
|
Bucket: &bucket,
|
||||||
ACL: &o.fs.opt.ACL,
|
ACL: &o.fs.opt.ACL,
|
||||||
Key: &key,
|
Key: &bucketPath,
|
||||||
ContentType: &mimeType,
|
ContentType: &mimeType,
|
||||||
Metadata: metadata,
|
Metadata: metadata,
|
||||||
}
|
}
|
||||||
|
@ -1954,10 +1969,10 @@ func (o *Object) Update(ctx context.Context, in io.Reader, src fs.ObjectInfo, op
|
||||||
|
|
||||||
// Remove an object
|
// Remove an object
|
||||||
func (o *Object) Remove(ctx context.Context) error {
|
func (o *Object) Remove(ctx context.Context) error {
|
||||||
key := o.fs.root + o.remote
|
bucket, bucketPath := o.split()
|
||||||
req := s3.DeleteObjectInput{
|
req := s3.DeleteObjectInput{
|
||||||
Bucket: &o.fs.bucket,
|
Bucket: &bucket,
|
||||||
Key: &key,
|
Key: &bucketPath,
|
||||||
}
|
}
|
||||||
err := o.fs.pacer.Call(func() (bool, error) {
|
err := o.fs.pacer.Call(func() (bool, error) {
|
||||||
_, err := o.fs.c.DeleteObjectWithContext(ctx, &req)
|
_, err := o.fs.c.DeleteObjectWithContext(ctx, &req)
|
||||||
|
|
Loading…
Reference in a new issue