swift: make all operations work from the root #3421
This commit is contained in:
parent
d266a171c2
commit
eaeef4811f
1 changed files with 186 additions and 165 deletions
|
@ -8,10 +8,8 @@ import (
|
||||||
"fmt"
|
"fmt"
|
||||||
"io"
|
"io"
|
||||||
"path"
|
"path"
|
||||||
"regexp"
|
|
||||||
"strconv"
|
"strconv"
|
||||||
"strings"
|
"strings"
|
||||||
"sync"
|
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/ncw/swift"
|
"github.com/ncw/swift"
|
||||||
|
@ -24,6 +22,7 @@ import (
|
||||||
"github.com/rclone/rclone/fs/hash"
|
"github.com/rclone/rclone/fs/hash"
|
||||||
"github.com/rclone/rclone/fs/operations"
|
"github.com/rclone/rclone/fs/operations"
|
||||||
"github.com/rclone/rclone/fs/walk"
|
"github.com/rclone/rclone/fs/walk"
|
||||||
|
"github.com/rclone/rclone/lib/bucket"
|
||||||
"github.com/rclone/rclone/lib/pacer"
|
"github.com/rclone/rclone/lib/pacer"
|
||||||
"github.com/rclone/rclone/lib/readers"
|
"github.com/rclone/rclone/lib/readers"
|
||||||
)
|
)
|
||||||
|
@ -208,17 +207,16 @@ type Options struct {
|
||||||
|
|
||||||
// Fs represents a remote swift server
|
// Fs represents a remote swift server
|
||||||
type Fs struct {
|
type Fs struct {
|
||||||
name string // name of this remote
|
name string // name of this remote
|
||||||
root string // the path we are working on if any
|
root string // the path we are working on if any
|
||||||
features *fs.Features // optional features
|
features *fs.Features // optional features
|
||||||
opt Options // options for this backend
|
opt Options // options for this backend
|
||||||
c *swift.Connection // the connection to the swift server
|
c *swift.Connection // the connection to the swift server
|
||||||
container string // the container we are working on
|
rootContainer string // container part of root (if any)
|
||||||
containerOKMu sync.Mutex // mutex to protect container OK
|
rootDirectory string // directory part of root (if any)
|
||||||
containerOK bool // true if we have created the container
|
cache *bucket.Cache // cache of container status
|
||||||
segmentsContainer string // container to store the segments (if any) in
|
noCheckContainer bool // don't check the container before creating it
|
||||||
noCheckContainer bool // don't check the container before creating it
|
pacer *fs.Pacer // To pace the API calls
|
||||||
pacer *fs.Pacer // To pace the API calls
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Object describes a swift object
|
// Object describes a swift object
|
||||||
|
@ -243,18 +241,18 @@ func (f *Fs) Name() string {
|
||||||
|
|
||||||
// Root of the remote (as passed into NewFs)
|
// Root of the remote (as passed into NewFs)
|
||||||
func (f *Fs) Root() string {
|
func (f *Fs) Root() string {
|
||||||
if f.root == "" {
|
return f.root
|
||||||
return f.container
|
|
||||||
}
|
|
||||||
return f.container + "/" + f.root
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// String converts this Fs to a string
|
// String converts this Fs to a string
|
||||||
func (f *Fs) String() string {
|
func (f *Fs) String() string {
|
||||||
if f.root == "" {
|
if f.rootContainer == "" {
|
||||||
return fmt.Sprintf("Swift container %s", f.container)
|
return fmt.Sprintf("Swift root")
|
||||||
}
|
}
|
||||||
return fmt.Sprintf("Swift container %s path %s", f.container, f.root)
|
if f.rootDirectory == "" {
|
||||||
|
return fmt.Sprintf("Swift container %s", f.rootContainer)
|
||||||
|
}
|
||||||
|
return fmt.Sprintf("Swift container %s path %s", f.rootContainer, f.rootDirectory)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Features returns the optional features of this Fs
|
// Features returns the optional features of this Fs
|
||||||
|
@ -313,21 +311,23 @@ func shouldRetryHeaders(headers swift.Headers, err error) (bool, error) {
|
||||||
return shouldRetry(err)
|
return shouldRetry(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Pattern to match a swift path
|
// parsePath parses a remote 'url'
|
||||||
var matcher = regexp.MustCompile(`^/*([^/]*)(.*)$`)
|
func parsePath(path string) (root string) {
|
||||||
|
root = strings.Trim(path, "/")
|
||||||
// parseParse parses a swift 'url'
|
|
||||||
func parsePath(path string) (container, directory string, err error) {
|
|
||||||
parts := matcher.FindStringSubmatch(path)
|
|
||||||
if parts == nil {
|
|
||||||
err = errors.Errorf("couldn't find container in swift path %q", path)
|
|
||||||
} else {
|
|
||||||
container, directory = parts[1], parts[2]
|
|
||||||
directory = strings.Trim(directory, "/")
|
|
||||||
}
|
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// split returns container and containerPath from the rootRelativePath
|
||||||
|
// relative to f.root
|
||||||
|
func (f *Fs) split(rootRelativePath string) (container, containerPath string) {
|
||||||
|
return bucket.Split(path.Join(f.root, rootRelativePath))
|
||||||
|
}
|
||||||
|
|
||||||
|
// split returns container and containerPath from the object
|
||||||
|
func (o *Object) split() (container, containerPath string) {
|
||||||
|
return o.fs.split(o.remote)
|
||||||
|
}
|
||||||
|
|
||||||
// swiftConnection makes a connection to swift
|
// swiftConnection makes a connection to swift
|
||||||
func swiftConnection(opt *Options, name string) (*swift.Connection, error) {
|
func swiftConnection(opt *Options, name string) (*swift.Connection, error) {
|
||||||
c := &swift.Connection{
|
c := &swift.Connection{
|
||||||
|
@ -410,47 +410,48 @@ func (f *Fs) setUploadChunkSize(cs fs.SizeSuffix) (old fs.SizeSuffix, err error)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// setRoot changes the root of the Fs
|
||||||
|
func (f *Fs) setRoot(root string) {
|
||||||
|
f.root = parsePath(root)
|
||||||
|
f.rootContainer, f.rootDirectory = bucket.Split(f.root)
|
||||||
|
}
|
||||||
|
|
||||||
// NewFsWithConnection constructs an Fs from the path, container:path
|
// NewFsWithConnection constructs an Fs from the path, container:path
|
||||||
// and authenticated connection.
|
// and authenticated connection.
|
||||||
//
|
//
|
||||||
// if noCheckContainer is set then the Fs won't check the container
|
// if noCheckContainer is set then the Fs won't check the container
|
||||||
// exists before creating it.
|
// exists before creating it.
|
||||||
func NewFsWithConnection(opt *Options, name, root string, c *swift.Connection, noCheckContainer bool) (fs.Fs, error) {
|
func NewFsWithConnection(opt *Options, name, root string, c *swift.Connection, noCheckContainer bool) (fs.Fs, error) {
|
||||||
container, directory, err := parsePath(root)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
f := &Fs{
|
f := &Fs{
|
||||||
name: name,
|
name: name,
|
||||||
opt: *opt,
|
opt: *opt,
|
||||||
c: c,
|
c: c,
|
||||||
container: container,
|
noCheckContainer: noCheckContainer,
|
||||||
segmentsContainer: container + "_segments",
|
pacer: fs.NewPacer(pacer.NewS3(pacer.MinSleep(minSleep))),
|
||||||
root: directory,
|
cache: bucket.NewCache(),
|
||||||
noCheckContainer: noCheckContainer,
|
|
||||||
pacer: fs.NewPacer(pacer.NewS3(pacer.MinSleep(minSleep))),
|
|
||||||
}
|
}
|
||||||
|
f.setRoot(root)
|
||||||
f.features = (&fs.Features{
|
f.features = (&fs.Features{
|
||||||
ReadMimeType: true,
|
ReadMimeType: true,
|
||||||
WriteMimeType: true,
|
WriteMimeType: true,
|
||||||
BucketBased: true,
|
BucketBased: true,
|
||||||
|
BucketBasedRootOK: true,
|
||||||
}).Fill(f)
|
}).Fill(f)
|
||||||
if f.root != "" {
|
if f.rootContainer != "" && f.rootDirectory != "" {
|
||||||
f.root += "/"
|
|
||||||
// Check to see if the object exists - ignoring directory markers
|
// Check to see if the object exists - ignoring directory markers
|
||||||
var info swift.Object
|
var info swift.Object
|
||||||
|
var err error
|
||||||
err = f.pacer.Call(func() (bool, error) {
|
err = f.pacer.Call(func() (bool, error) {
|
||||||
var rxHeaders swift.Headers
|
var rxHeaders swift.Headers
|
||||||
info, rxHeaders, err = f.c.Object(container, directory)
|
info, rxHeaders, err = f.c.Object(f.rootContainer, f.rootDirectory)
|
||||||
return shouldRetryHeaders(rxHeaders, err)
|
return shouldRetryHeaders(rxHeaders, err)
|
||||||
})
|
})
|
||||||
if err == nil && info.ContentType != directoryMarkerContentType {
|
if err == nil && info.ContentType != directoryMarkerContentType {
|
||||||
f.root = path.Dir(directory)
|
newRoot := path.Dir(f.root)
|
||||||
if f.root == "." {
|
if newRoot == "." {
|
||||||
f.root = ""
|
newRoot = ""
|
||||||
} else {
|
|
||||||
f.root += "/"
|
|
||||||
}
|
}
|
||||||
|
f.setRoot(newRoot)
|
||||||
// return an error with an fs which points to the parent
|
// return an error with an fs which points to the parent
|
||||||
return f, fs.ErrorIsFile
|
return f, fs.ErrorIsFile
|
||||||
}
|
}
|
||||||
|
@ -518,23 +519,26 @@ func (f *Fs) NewObject(ctx context.Context, remote string) (fs.Object, error) {
|
||||||
type listFn func(remote string, object *swift.Object, isDirectory bool) error
|
type listFn func(remote string, object *swift.Object, isDirectory bool) error
|
||||||
|
|
||||||
// listContainerRoot lists the objects into the function supplied from
|
// listContainerRoot lists the objects into the function supplied from
|
||||||
// the container and root supplied
|
// the container and directory supplied. The remote has prefix
|
||||||
|
// removed from it and if addContainer is set then it adds the
|
||||||
|
// container to the start.
|
||||||
//
|
//
|
||||||
// Set recurse to read sub directories
|
// Set recurse to read sub directories
|
||||||
func (f *Fs) listContainerRoot(container, root string, dir string, recurse bool, fn listFn) error {
|
func (f *Fs) listContainerRoot(container, directory, prefix string, addContainer bool, recurse bool, fn listFn) error {
|
||||||
prefix := root
|
if prefix != "" {
|
||||||
if dir != "" {
|
prefix += "/"
|
||||||
prefix += dir + "/"
|
}
|
||||||
|
if directory != "" {
|
||||||
|
directory += "/"
|
||||||
}
|
}
|
||||||
// Options for ObjectsWalk
|
// Options for ObjectsWalk
|
||||||
opts := swift.ObjectsOpts{
|
opts := swift.ObjectsOpts{
|
||||||
Prefix: prefix,
|
Prefix: directory,
|
||||||
Limit: listChunks,
|
Limit: listChunks,
|
||||||
}
|
}
|
||||||
if !recurse {
|
if !recurse {
|
||||||
opts.Delimiter = '/'
|
opts.Delimiter = '/'
|
||||||
}
|
}
|
||||||
rootLength := len(root)
|
|
||||||
return f.c.ObjectsWalk(container, &opts, func(opts *swift.ObjectsOpts) (interface{}, error) {
|
return f.c.ObjectsWalk(container, &opts, func(opts *swift.ObjectsOpts) (interface{}, error) {
|
||||||
var objects []swift.Object
|
var objects []swift.Object
|
||||||
var err error
|
var err error
|
||||||
|
@ -559,7 +563,10 @@ func (f *Fs) listContainerRoot(container, root string, dir string, recurse bool,
|
||||||
// duplicate directories. Ignore them here.
|
// duplicate directories. Ignore them here.
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
remote := object.Name[rootLength:]
|
remote := object.Name[len(prefix):]
|
||||||
|
if addContainer {
|
||||||
|
remote = path.Join(container, remote)
|
||||||
|
}
|
||||||
err = fn(remote, object, isDirectory)
|
err = fn(remote, object, isDirectory)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
break
|
break
|
||||||
|
@ -573,8 +580,8 @@ func (f *Fs) listContainerRoot(container, root string, dir string, recurse bool,
|
||||||
type addEntryFn func(fs.DirEntry) error
|
type addEntryFn func(fs.DirEntry) error
|
||||||
|
|
||||||
// list the objects into the function supplied
|
// list the objects into the function supplied
|
||||||
func (f *Fs) list(dir string, recurse bool, fn addEntryFn) error {
|
func (f *Fs) list(container, directory, prefix string, addContainer bool, recurse bool, fn addEntryFn) error {
|
||||||
err := f.listContainerRoot(f.container, f.root, dir, recurse, func(remote string, object *swift.Object, isDirectory bool) (err error) {
|
err := f.listContainerRoot(container, directory, prefix, addContainer, recurse, func(remote string, object *swift.Object, isDirectory bool) (err error) {
|
||||||
if isDirectory {
|
if isDirectory {
|
||||||
remote = strings.TrimRight(remote, "/")
|
remote = strings.TrimRight(remote, "/")
|
||||||
d := fs.NewDir(remote, time.Time{}).SetSize(object.Bytes)
|
d := fs.NewDir(remote, time.Time{}).SetSize(object.Bytes)
|
||||||
|
@ -598,22 +605,13 @@ func (f *Fs) list(dir string, recurse bool, fn addEntryFn) error {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
// mark the container as being OK
|
|
||||||
func (f *Fs) markContainerOK() {
|
|
||||||
if f.container != "" {
|
|
||||||
f.containerOKMu.Lock()
|
|
||||||
f.containerOK = true
|
|
||||||
f.containerOKMu.Unlock()
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// listDir lists a single directory
|
// listDir lists a single directory
|
||||||
func (f *Fs) listDir(dir string) (entries fs.DirEntries, err error) {
|
func (f *Fs) listDir(container, directory, prefix string, addContainer bool) (entries fs.DirEntries, err error) {
|
||||||
if f.container == "" {
|
if container == "" {
|
||||||
return nil, fs.ErrorListBucketRequired
|
return nil, fs.ErrorListBucketRequired
|
||||||
}
|
}
|
||||||
// List the objects
|
// List the objects
|
||||||
err = f.list(dir, false, func(entry fs.DirEntry) error {
|
err = f.list(container, directory, prefix, addContainer, false, func(entry fs.DirEntry) error {
|
||||||
entries = append(entries, entry)
|
entries = append(entries, entry)
|
||||||
return nil
|
return nil
|
||||||
})
|
})
|
||||||
|
@ -621,7 +619,7 @@ func (f *Fs) listDir(dir string) (entries fs.DirEntries, err error) {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
// container must be present if listing succeeded
|
// container must be present if listing succeeded
|
||||||
f.markContainerOK()
|
f.cache.MarkOK(container)
|
||||||
return entries, nil
|
return entries, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -639,6 +637,7 @@ func (f *Fs) listContainers(dir string) (entries fs.DirEntries, err error) {
|
||||||
return nil, errors.Wrap(err, "container listing failed")
|
return nil, errors.Wrap(err, "container listing failed")
|
||||||
}
|
}
|
||||||
for _, container := range containers {
|
for _, container := range containers {
|
||||||
|
f.cache.MarkOK(container.Name)
|
||||||
d := fs.NewDir(container.Name, time.Time{}).SetSize(container.Bytes).SetItems(container.Count)
|
d := fs.NewDir(container.Name, time.Time{}).SetSize(container.Bytes).SetItems(container.Count)
|
||||||
entries = append(entries, d)
|
entries = append(entries, d)
|
||||||
}
|
}
|
||||||
|
@ -655,10 +654,11 @@ func (f *Fs) listContainers(dir string) (entries fs.DirEntries, err error) {
|
||||||
// This should return ErrDirNotFound if the directory isn't
|
// This should return ErrDirNotFound if the directory isn't
|
||||||
// found.
|
// found.
|
||||||
func (f *Fs) List(ctx context.Context, dir string) (entries fs.DirEntries, err error) {
|
func (f *Fs) List(ctx context.Context, dir string) (entries fs.DirEntries, err error) {
|
||||||
if f.container == "" {
|
container, directory := f.split(dir)
|
||||||
return f.listContainers(dir)
|
if container == "" {
|
||||||
|
return f.listContainers(directory)
|
||||||
}
|
}
|
||||||
return f.listDir(dir)
|
return f.listDir(container, directory, f.rootDirectory, f.rootContainer == "")
|
||||||
}
|
}
|
||||||
|
|
||||||
// ListR lists the objects and directories of the Fs starting
|
// ListR lists the objects and directories of the Fs starting
|
||||||
|
@ -676,20 +676,39 @@ func (f *Fs) List(ctx context.Context, dir string) (entries fs.DirEntries, err e
|
||||||
// immediately.
|
// immediately.
|
||||||
//
|
//
|
||||||
// Don't implement this unless you have a more efficient way
|
// Don't implement this unless you have a more efficient way
|
||||||
// of listing recursively that doing a directory traversal.
|
// of listing recursively than doing a directory traversal.
|
||||||
func (f *Fs) ListR(ctx context.Context, dir string, callback fs.ListRCallback) (err error) {
|
func (f *Fs) ListR(ctx context.Context, dir string, callback fs.ListRCallback) (err error) {
|
||||||
if f.container == "" {
|
container, directory := f.split(dir)
|
||||||
return errors.New("container needed for recursive list")
|
|
||||||
}
|
|
||||||
list := walk.NewListRHelper(callback)
|
list := walk.NewListRHelper(callback)
|
||||||
err = f.list(dir, true, func(entry fs.DirEntry) error {
|
listR := func(container, directory, prefix string, addContainer bool) error {
|
||||||
return list.Add(entry)
|
return f.list(container, directory, prefix, addContainer, true, func(entry fs.DirEntry) error {
|
||||||
})
|
return list.Add(entry)
|
||||||
if err != nil {
|
})
|
||||||
return err
|
}
|
||||||
|
if container == "" {
|
||||||
|
entries, err := f.listContainers("")
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
for _, entry := range entries {
|
||||||
|
err = list.Add(entry)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
container := entry.Remote()
|
||||||
|
err = listR(container, "", f.rootDirectory, true)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
err = listR(container, directory, f.rootDirectory, f.rootContainer == "")
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
}
|
}
|
||||||
// container must be present if listing succeeded
|
// container must be present if listing succeeded
|
||||||
f.markContainerOK()
|
f.cache.MarkOK(container)
|
||||||
return list.Flush()
|
return list.Flush()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -738,57 +757,52 @@ func (f *Fs) PutStream(ctx context.Context, in io.Reader, src fs.ObjectInfo, opt
|
||||||
|
|
||||||
// Mkdir creates the container if it doesn't exist
|
// Mkdir creates the container if it doesn't exist
|
||||||
func (f *Fs) Mkdir(ctx context.Context, dir string) error {
|
func (f *Fs) Mkdir(ctx context.Context, dir string) error {
|
||||||
f.containerOKMu.Lock()
|
container, _ := f.split(dir)
|
||||||
defer f.containerOKMu.Unlock()
|
return f.cache.Create(container, func() error {
|
||||||
if f.containerOK {
|
// Check to see if container exists first
|
||||||
return nil
|
var err error = swift.ContainerNotFound
|
||||||
}
|
if !f.noCheckContainer {
|
||||||
// if we are at the root, then it is OK
|
err = f.pacer.Call(func() (bool, error) {
|
||||||
if f.container == "" {
|
var rxHeaders swift.Headers
|
||||||
return nil
|
_, rxHeaders, err = f.c.Container(container)
|
||||||
}
|
return shouldRetryHeaders(rxHeaders, err)
|
||||||
// Check to see if container exists first
|
})
|
||||||
var err error = swift.ContainerNotFound
|
|
||||||
if !f.noCheckContainer {
|
|
||||||
err = f.pacer.Call(func() (bool, error) {
|
|
||||||
var rxHeaders swift.Headers
|
|
||||||
_, rxHeaders, err = f.c.Container(f.container)
|
|
||||||
return shouldRetryHeaders(rxHeaders, err)
|
|
||||||
})
|
|
||||||
}
|
|
||||||
if err == swift.ContainerNotFound {
|
|
||||||
headers := swift.Headers{}
|
|
||||||
if f.opt.StoragePolicy != "" {
|
|
||||||
headers["X-Storage-Policy"] = f.opt.StoragePolicy
|
|
||||||
}
|
}
|
||||||
err = f.pacer.Call(func() (bool, error) {
|
if err == swift.ContainerNotFound {
|
||||||
err = f.c.ContainerCreate(f.container, headers)
|
headers := swift.Headers{}
|
||||||
return shouldRetry(err)
|
if f.opt.StoragePolicy != "" {
|
||||||
})
|
headers["X-Storage-Policy"] = f.opt.StoragePolicy
|
||||||
}
|
}
|
||||||
if err == nil {
|
err = f.pacer.Call(func() (bool, error) {
|
||||||
f.containerOK = true
|
err = f.c.ContainerCreate(container, headers)
|
||||||
}
|
return shouldRetry(err)
|
||||||
return err
|
})
|
||||||
|
if err == nil {
|
||||||
|
fs.Infof(f, "Container %q created", container)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return err
|
||||||
|
}, nil)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Rmdir deletes the container if the fs is at the root
|
// Rmdir deletes the container if the fs is at the root
|
||||||
//
|
//
|
||||||
// Returns an error if it isn't empty
|
// Returns an error if it isn't empty
|
||||||
func (f *Fs) Rmdir(ctx context.Context, dir string) error {
|
func (f *Fs) Rmdir(ctx context.Context, dir string) error {
|
||||||
f.containerOKMu.Lock()
|
container, directory := f.split(dir)
|
||||||
defer f.containerOKMu.Unlock()
|
if container == "" || directory != "" {
|
||||||
if f.root != "" || dir != "" {
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
var err error
|
err := f.cache.Remove(container, func() error {
|
||||||
err = f.pacer.Call(func() (bool, error) {
|
err := f.pacer.Call(func() (bool, error) {
|
||||||
err = f.c.ContainerDelete(f.container)
|
err := f.c.ContainerDelete(container)
|
||||||
return shouldRetry(err)
|
return shouldRetry(err)
|
||||||
|
})
|
||||||
|
if err == nil {
|
||||||
|
fs.Infof(f, "Container %q removed", container)
|
||||||
|
}
|
||||||
|
return err
|
||||||
})
|
})
|
||||||
if err == nil {
|
|
||||||
f.containerOK = false
|
|
||||||
}
|
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -807,7 +821,7 @@ func (f *Fs) Purge(ctx context.Context) error {
|
||||||
go func() {
|
go func() {
|
||||||
delErr <- operations.DeleteFiles(ctx, toBeDeleted)
|
delErr <- operations.DeleteFiles(ctx, toBeDeleted)
|
||||||
}()
|
}()
|
||||||
err := f.list("", true, func(entry fs.DirEntry) error {
|
err := f.list(f.rootContainer, f.rootDirectory, f.rootDirectory, f.rootContainer == "", true, func(entry fs.DirEntry) error {
|
||||||
if o, ok := entry.(*Object); ok {
|
if o, ok := entry.(*Object); ok {
|
||||||
toBeDeleted <- o
|
toBeDeleted <- o
|
||||||
}
|
}
|
||||||
|
@ -834,6 +848,7 @@ func (f *Fs) Purge(ctx context.Context) error {
|
||||||
//
|
//
|
||||||
// If it isn't possible then return fs.ErrorCantCopy
|
// If it isn't possible then return fs.ErrorCantCopy
|
||||||
func (f *Fs) Copy(ctx context.Context, src fs.Object, remote string) (fs.Object, error) {
|
func (f *Fs) Copy(ctx context.Context, src fs.Object, remote string) (fs.Object, error) {
|
||||||
|
dstContainer, dstPath := f.split(remote)
|
||||||
err := f.Mkdir(ctx, "")
|
err := f.Mkdir(ctx, "")
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
|
@ -843,10 +858,10 @@ func (f *Fs) Copy(ctx context.Context, src fs.Object, remote string) (fs.Object,
|
||||||
fs.Debugf(src, "Can't copy - not same remote type")
|
fs.Debugf(src, "Can't copy - not same remote type")
|
||||||
return nil, fs.ErrorCantCopy
|
return nil, fs.ErrorCantCopy
|
||||||
}
|
}
|
||||||
srcFs := srcObj.fs
|
srcContainer, srcPath := srcObj.split()
|
||||||
err = f.pacer.Call(func() (bool, error) {
|
err = f.pacer.Call(func() (bool, error) {
|
||||||
var rxHeaders swift.Headers
|
var rxHeaders swift.Headers
|
||||||
rxHeaders, err = f.c.ObjectCopy(srcFs.container, srcFs.root+srcObj.remote, f.container, f.root+remote, nil)
|
rxHeaders, err = f.c.ObjectCopy(srcContainer, srcPath, dstContainer, dstPath, nil)
|
||||||
return shouldRetryHeaders(rxHeaders, err)
|
return shouldRetryHeaders(rxHeaders, err)
|
||||||
})
|
})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -955,8 +970,9 @@ func (o *Object) readMetaData() (err error) {
|
||||||
}
|
}
|
||||||
var info swift.Object
|
var info swift.Object
|
||||||
var h swift.Headers
|
var h swift.Headers
|
||||||
|
container, containerPath := o.split()
|
||||||
err = o.fs.pacer.Call(func() (bool, error) {
|
err = o.fs.pacer.Call(func() (bool, error) {
|
||||||
info, h, err = o.fs.c.Object(o.fs.container, o.fs.root+o.remote)
|
info, h, err = o.fs.c.Object(container, containerPath)
|
||||||
return shouldRetryHeaders(h, err)
|
return shouldRetryHeaders(h, err)
|
||||||
})
|
})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -1013,8 +1029,9 @@ func (o *Object) SetModTime(ctx context.Context, modTime time.Time) error {
|
||||||
newHeaders[k] = v
|
newHeaders[k] = v
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
container, containerPath := o.split()
|
||||||
return o.fs.pacer.Call(func() (bool, error) {
|
return o.fs.pacer.Call(func() (bool, error) {
|
||||||
err = o.fs.c.ObjectUpdate(o.fs.container, o.fs.root+o.remote, newHeaders)
|
err = o.fs.c.ObjectUpdate(container, containerPath, newHeaders)
|
||||||
return shouldRetry(err)
|
return shouldRetry(err)
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
@ -1032,9 +1049,10 @@ func (o *Object) Open(ctx context.Context, options ...fs.OpenOption) (in io.Read
|
||||||
fs.FixRangeOption(options, o.size)
|
fs.FixRangeOption(options, o.size)
|
||||||
headers := fs.OpenOptionHeaders(options)
|
headers := fs.OpenOptionHeaders(options)
|
||||||
_, isRanging := headers["Range"]
|
_, isRanging := headers["Range"]
|
||||||
|
container, containerPath := o.split()
|
||||||
err = o.fs.pacer.Call(func() (bool, error) {
|
err = o.fs.pacer.Call(func() (bool, error) {
|
||||||
var rxHeaders swift.Headers
|
var rxHeaders swift.Headers
|
||||||
in, rxHeaders, err = o.fs.c.ObjectOpen(o.fs.container, o.fs.root+o.remote, !isRanging, headers)
|
in, rxHeaders, err = o.fs.c.ObjectOpen(container, containerPath, !isRanging, headers)
|
||||||
return shouldRetryHeaders(rxHeaders, err)
|
return shouldRetryHeaders(rxHeaders, err)
|
||||||
})
|
})
|
||||||
return
|
return
|
||||||
|
@ -1052,20 +1070,20 @@ func min(x, y int64) int64 {
|
||||||
//
|
//
|
||||||
// if except is passed in then segments with that prefix won't be deleted
|
// if except is passed in then segments with that prefix won't be deleted
|
||||||
func (o *Object) removeSegments(except string) error {
|
func (o *Object) removeSegments(except string) error {
|
||||||
segmentsRoot := o.fs.root + o.remote + "/"
|
container, containerPath := o.split()
|
||||||
err := o.fs.listContainerRoot(o.fs.segmentsContainer, segmentsRoot, "", true, func(remote string, object *swift.Object, isDirectory bool) error {
|
segmentsContainer := container + "_segments"
|
||||||
|
err := o.fs.listContainerRoot(segmentsContainer, containerPath, "", false, true, func(remote string, object *swift.Object, isDirectory bool) error {
|
||||||
if isDirectory {
|
if isDirectory {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
if except != "" && strings.HasPrefix(remote, except) {
|
if except != "" && strings.HasPrefix(remote, except) {
|
||||||
// fs.Debugf(o, "Ignoring current segment file %q in container %q", segmentsRoot+remote, o.fs.segmentsContainer)
|
// fs.Debugf(o, "Ignoring current segment file %q in container %q", segmentsRoot+remote, segmentsContainer)
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
segmentPath := segmentsRoot + remote
|
fs.Debugf(o, "Removing segment file %q in container %q", remote, segmentsContainer)
|
||||||
fs.Debugf(o, "Removing segment file %q in container %q", segmentPath, o.fs.segmentsContainer)
|
|
||||||
var err error
|
var err error
|
||||||
return o.fs.pacer.Call(func() (bool, error) {
|
return o.fs.pacer.Call(func() (bool, error) {
|
||||||
err = o.fs.c.ObjectDelete(o.fs.segmentsContainer, segmentPath)
|
err = o.fs.c.ObjectDelete(segmentsContainer, remote)
|
||||||
return shouldRetry(err)
|
return shouldRetry(err)
|
||||||
})
|
})
|
||||||
})
|
})
|
||||||
|
@ -1074,11 +1092,11 @@ func (o *Object) removeSegments(except string) error {
|
||||||
}
|
}
|
||||||
// remove the segments container if empty, ignore errors
|
// remove the segments container if empty, ignore errors
|
||||||
err = o.fs.pacer.Call(func() (bool, error) {
|
err = o.fs.pacer.Call(func() (bool, error) {
|
||||||
err = o.fs.c.ContainerDelete(o.fs.segmentsContainer)
|
err = o.fs.c.ContainerDelete(segmentsContainer)
|
||||||
return shouldRetry(err)
|
return shouldRetry(err)
|
||||||
})
|
})
|
||||||
if err == nil {
|
if err == nil {
|
||||||
fs.Debugf(o, "Removed empty container %q", o.fs.segmentsContainer)
|
fs.Debugf(o, "Removed empty container %q", segmentsContainer)
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
@ -1103,11 +1121,13 @@ func urlEncode(str string) string {
|
||||||
// updateChunks updates the existing object using chunks to a separate
|
// updateChunks updates the existing object using chunks to a separate
|
||||||
// container. It returns a string which prefixes current segments.
|
// container. It returns a string which prefixes current segments.
|
||||||
func (o *Object) updateChunks(in0 io.Reader, headers swift.Headers, size int64, contentType string) (string, error) {
|
func (o *Object) updateChunks(in0 io.Reader, headers swift.Headers, size int64, contentType string) (string, error) {
|
||||||
|
container, containerPath := o.split()
|
||||||
|
segmentsContainer := container + "_segments"
|
||||||
// Create the segmentsContainer if it doesn't exist
|
// Create the segmentsContainer if it doesn't exist
|
||||||
var err error
|
var err error
|
||||||
err = o.fs.pacer.Call(func() (bool, error) {
|
err = o.fs.pacer.Call(func() (bool, error) {
|
||||||
var rxHeaders swift.Headers
|
var rxHeaders swift.Headers
|
||||||
_, rxHeaders, err = o.fs.c.Container(o.fs.segmentsContainer)
|
_, rxHeaders, err = o.fs.c.Container(segmentsContainer)
|
||||||
return shouldRetryHeaders(rxHeaders, err)
|
return shouldRetryHeaders(rxHeaders, err)
|
||||||
})
|
})
|
||||||
if err == swift.ContainerNotFound {
|
if err == swift.ContainerNotFound {
|
||||||
|
@ -1116,7 +1136,7 @@ func (o *Object) updateChunks(in0 io.Reader, headers swift.Headers, size int64,
|
||||||
headers["X-Storage-Policy"] = o.fs.opt.StoragePolicy
|
headers["X-Storage-Policy"] = o.fs.opt.StoragePolicy
|
||||||
}
|
}
|
||||||
err = o.fs.pacer.Call(func() (bool, error) {
|
err = o.fs.pacer.Call(func() (bool, error) {
|
||||||
err = o.fs.c.ContainerCreate(o.fs.segmentsContainer, headers)
|
err = o.fs.c.ContainerCreate(segmentsContainer, headers)
|
||||||
return shouldRetry(err)
|
return shouldRetry(err)
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
@ -1127,7 +1147,7 @@ func (o *Object) updateChunks(in0 io.Reader, headers swift.Headers, size int64,
|
||||||
left := size
|
left := size
|
||||||
i := 0
|
i := 0
|
||||||
uniquePrefix := fmt.Sprintf("%s/%d", swift.TimeToFloatString(time.Now()), size)
|
uniquePrefix := fmt.Sprintf("%s/%d", swift.TimeToFloatString(time.Now()), size)
|
||||||
segmentsPath := fmt.Sprintf("%s%s/%s", o.fs.root, o.remote, uniquePrefix)
|
segmentsPath := path.Join(containerPath, uniquePrefix)
|
||||||
in := bufio.NewReader(in0)
|
in := bufio.NewReader(in0)
|
||||||
segmentInfos := make([]string, 0, ((size / int64(o.fs.opt.ChunkSize)) + 1))
|
segmentInfos := make([]string, 0, ((size / int64(o.fs.opt.ChunkSize)) + 1))
|
||||||
for {
|
for {
|
||||||
|
@ -1136,7 +1156,7 @@ func (o *Object) updateChunks(in0 io.Reader, headers swift.Headers, size int64,
|
||||||
if left > 0 {
|
if left > 0 {
|
||||||
return "", err // read less than expected
|
return "", err // read less than expected
|
||||||
}
|
}
|
||||||
fs.Debugf(o, "Uploading segments into %q seems done (%v)", o.fs.segmentsContainer, err)
|
fs.Debugf(o, "Uploading segments into %q seems done (%v)", segmentsContainer, err)
|
||||||
break
|
break
|
||||||
}
|
}
|
||||||
n := int64(o.fs.opt.ChunkSize)
|
n := int64(o.fs.opt.ChunkSize)
|
||||||
|
@ -1147,46 +1167,45 @@ func (o *Object) updateChunks(in0 io.Reader, headers swift.Headers, size int64,
|
||||||
}
|
}
|
||||||
segmentReader := io.LimitReader(in, n)
|
segmentReader := io.LimitReader(in, n)
|
||||||
segmentPath := fmt.Sprintf("%s/%08d", segmentsPath, i)
|
segmentPath := fmt.Sprintf("%s/%08d", segmentsPath, i)
|
||||||
fs.Debugf(o, "Uploading segment file %q into %q", segmentPath, o.fs.segmentsContainer)
|
fs.Debugf(o, "Uploading segment file %q into %q", segmentPath, segmentsContainer)
|
||||||
err = o.fs.pacer.CallNoRetry(func() (bool, error) {
|
err = o.fs.pacer.CallNoRetry(func() (bool, error) {
|
||||||
var rxHeaders swift.Headers
|
var rxHeaders swift.Headers
|
||||||
rxHeaders, err = o.fs.c.ObjectPut(o.fs.segmentsContainer, segmentPath, segmentReader, true, "", "", headers)
|
rxHeaders, err = o.fs.c.ObjectPut(segmentsContainer, segmentPath, segmentReader, true, "", "", headers)
|
||||||
if err == nil {
|
if err == nil {
|
||||||
segmentInfos = append(segmentInfos, segmentPath)
|
segmentInfos = append(segmentInfos, segmentPath)
|
||||||
}
|
}
|
||||||
return shouldRetryHeaders(rxHeaders, err)
|
return shouldRetryHeaders(rxHeaders, err)
|
||||||
})
|
})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
deleteChunks(o, segmentInfos)
|
deleteChunks(o, segmentsContainer, segmentInfos)
|
||||||
segmentInfos = nil
|
segmentInfos = nil
|
||||||
return "", err
|
return "", err
|
||||||
}
|
}
|
||||||
i++
|
i++
|
||||||
}
|
}
|
||||||
// Upload the manifest
|
// Upload the manifest
|
||||||
headers["X-Object-Manifest"] = urlEncode(fmt.Sprintf("%s/%s", o.fs.segmentsContainer, segmentsPath))
|
headers["X-Object-Manifest"] = urlEncode(fmt.Sprintf("%s/%s", segmentsContainer, segmentsPath))
|
||||||
headers["Content-Length"] = "0" // set Content-Length as we know it
|
headers["Content-Length"] = "0" // set Content-Length as we know it
|
||||||
emptyReader := bytes.NewReader(nil)
|
emptyReader := bytes.NewReader(nil)
|
||||||
manifestName := o.fs.root + o.remote
|
|
||||||
err = o.fs.pacer.Call(func() (bool, error) {
|
err = o.fs.pacer.Call(func() (bool, error) {
|
||||||
var rxHeaders swift.Headers
|
var rxHeaders swift.Headers
|
||||||
rxHeaders, err = o.fs.c.ObjectPut(o.fs.container, manifestName, emptyReader, true, "", contentType, headers)
|
rxHeaders, err = o.fs.c.ObjectPut(container, containerPath, emptyReader, true, "", contentType, headers)
|
||||||
return shouldRetryHeaders(rxHeaders, err)
|
return shouldRetryHeaders(rxHeaders, err)
|
||||||
})
|
})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
deleteChunks(o, segmentInfos)
|
deleteChunks(o, segmentsContainer, segmentInfos)
|
||||||
segmentInfos = nil
|
segmentInfos = nil
|
||||||
}
|
}
|
||||||
return uniquePrefix + "/", err
|
return uniquePrefix + "/", err
|
||||||
}
|
}
|
||||||
|
|
||||||
func deleteChunks(o *Object, segmentInfos []string) {
|
func deleteChunks(o *Object, segmentsContainer string, segmentInfos []string) {
|
||||||
if segmentInfos != nil && len(segmentInfos) > 0 {
|
if segmentInfos != nil && len(segmentInfos) > 0 {
|
||||||
for _, v := range segmentInfos {
|
for _, v := range segmentInfos {
|
||||||
fs.Debugf(o, "Delete segment file %q on %q", v, o.fs.segmentsContainer)
|
fs.Debugf(o, "Delete segment file %q on %q", v, segmentsContainer)
|
||||||
e := o.fs.c.ObjectDelete(o.fs.segmentsContainer, v)
|
e := o.fs.c.ObjectDelete(segmentsContainer, v)
|
||||||
if e != nil {
|
if e != nil {
|
||||||
fs.Errorf(o, "Error occured in delete segment file %q on %q , error: %q", v, o.fs.segmentsContainer, e)
|
fs.Errorf(o, "Error occured in delete segment file %q on %q , error: %q", v, segmentsContainer, e)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -1196,8 +1215,9 @@ func deleteChunks(o *Object, segmentInfos []string) {
|
||||||
//
|
//
|
||||||
// The new object may have been created if an error is returned
|
// The new object may have been created if an error is returned
|
||||||
func (o *Object) Update(ctx context.Context, in io.Reader, src fs.ObjectInfo, options ...fs.OpenOption) error {
|
func (o *Object) Update(ctx context.Context, in io.Reader, src fs.ObjectInfo, options ...fs.OpenOption) error {
|
||||||
if o.fs.container == "" {
|
container, containerPath := o.split()
|
||||||
return fserrors.FatalError(errors.New("container name needed in remote"))
|
if container == "" {
|
||||||
|
return fserrors.FatalError(errors.New("can't upload files to the root"))
|
||||||
}
|
}
|
||||||
err := o.fs.Mkdir(ctx, "")
|
err := o.fs.Mkdir(ctx, "")
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -1235,7 +1255,7 @@ func (o *Object) Update(ctx context.Context, in io.Reader, src fs.ObjectInfo, op
|
||||||
}
|
}
|
||||||
var rxHeaders swift.Headers
|
var rxHeaders swift.Headers
|
||||||
err = o.fs.pacer.CallNoRetry(func() (bool, error) {
|
err = o.fs.pacer.CallNoRetry(func() (bool, error) {
|
||||||
rxHeaders, err = o.fs.c.ObjectPut(o.fs.container, o.fs.root+o.remote, in, true, "", contentType, headers)
|
rxHeaders, err = o.fs.c.ObjectPut(container, containerPath, in, true, "", contentType, headers)
|
||||||
return shouldRetryHeaders(rxHeaders, err)
|
return shouldRetryHeaders(rxHeaders, err)
|
||||||
})
|
})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -1268,13 +1288,14 @@ func (o *Object) Update(ctx context.Context, in io.Reader, src fs.ObjectInfo, op
|
||||||
|
|
||||||
// Remove an object
|
// Remove an object
|
||||||
func (o *Object) Remove(ctx context.Context) error {
|
func (o *Object) Remove(ctx context.Context) error {
|
||||||
|
container, containerPath := o.split()
|
||||||
isDynamicLargeObject, err := o.isDynamicLargeObject()
|
isDynamicLargeObject, err := o.isDynamicLargeObject()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
// Remove file/manifest first
|
// Remove file/manifest first
|
||||||
err = o.fs.pacer.Call(func() (bool, error) {
|
err = o.fs.pacer.Call(func() (bool, error) {
|
||||||
err = o.fs.c.ObjectDelete(o.fs.container, o.fs.root+o.remote)
|
err = o.fs.c.ObjectDelete(container, containerPath)
|
||||||
return shouldRetry(err)
|
return shouldRetry(err)
|
||||||
})
|
})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
|
Loading…
Reference in a new issue