combine: backend to combine multiple remotes in one directory tree
Fixes #5600
This commit is contained in:
parent
e58d75e4d7
commit
4b358ff43b
11 changed files with 1304 additions and 0 deletions
13
README.md
13
README.md
|
@ -83,6 +83,19 @@ Rclone *("rsync for cloud storage")* is a command-line program to sync files and
|
||||||
|
|
||||||
Please see [the full list of all storage providers and their features](https://rclone.org/overview/)
|
Please see [the full list of all storage providers and their features](https://rclone.org/overview/)
|
||||||
|
|
||||||
|
### Virtual storage providers
|
||||||
|
|
||||||
|
These backends adapt or modify other storage providers
|
||||||
|
|
||||||
|
* Alias: rename existing remotes [:page_facing_up:](https://rclone.org/alias/)
|
||||||
|
* Cache: cache remotes (DEPRECATED) [:page_facing_up:](https://rclone.org/cache/)
|
||||||
|
* Chunker: split large files [:page_facing_up:](https://rclone.org/chunker/)
|
||||||
|
* Combine: combine multiple remotes into a directory tree [:page_facing_up:](https://rclone.org/combine/)
|
||||||
|
* Compress: compress files [:page_facing_up:](https://rclone.org/compress/)
|
||||||
|
* Crypt: encrypt files [:page_facing_up:](https://rclone.org/crypt/)
|
||||||
|
* Hasher: hash files [:page_facing_up:](https://rclone.org/hasher/)
|
||||||
|
* Union: join multiple remotes to work together [:page_facing_up:](https://rclone.org/union/)
|
||||||
|
|
||||||
## Features
|
## Features
|
||||||
|
|
||||||
* MD5/SHA-1 hashes checked at all times for file integrity
|
* MD5/SHA-1 hashes checked at all times for file integrity
|
||||||
|
|
|
@ -9,6 +9,7 @@ import (
|
||||||
_ "github.com/rclone/rclone/backend/box"
|
_ "github.com/rclone/rclone/backend/box"
|
||||||
_ "github.com/rclone/rclone/backend/cache"
|
_ "github.com/rclone/rclone/backend/cache"
|
||||||
_ "github.com/rclone/rclone/backend/chunker"
|
_ "github.com/rclone/rclone/backend/chunker"
|
||||||
|
_ "github.com/rclone/rclone/backend/combine"
|
||||||
_ "github.com/rclone/rclone/backend/compress"
|
_ "github.com/rclone/rclone/backend/compress"
|
||||||
_ "github.com/rclone/rclone/backend/crypt"
|
_ "github.com/rclone/rclone/backend/crypt"
|
||||||
_ "github.com/rclone/rclone/backend/drive"
|
_ "github.com/rclone/rclone/backend/drive"
|
||||||
|
|
941
backend/combine/combine.go
Normal file
941
backend/combine/combine.go
Normal file
|
@ -0,0 +1,941 @@
|
||||||
|
// Package combine implents a backend to combine multipe remotes in a directory tree
|
||||||
|
package combine
|
||||||
|
|
||||||
|
/*
|
||||||
|
Have API to add/remove branches in the combine
|
||||||
|
*/
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"errors"
|
||||||
|
"fmt"
|
||||||
|
"io"
|
||||||
|
"path"
|
||||||
|
"strings"
|
||||||
|
"sync"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/rclone/rclone/fs"
|
||||||
|
"github.com/rclone/rclone/fs/cache"
|
||||||
|
"github.com/rclone/rclone/fs/config/configmap"
|
||||||
|
"github.com/rclone/rclone/fs/config/configstruct"
|
||||||
|
"github.com/rclone/rclone/fs/hash"
|
||||||
|
"github.com/rclone/rclone/fs/operations"
|
||||||
|
"github.com/rclone/rclone/fs/walk"
|
||||||
|
"golang.org/x/sync/errgroup"
|
||||||
|
)
|
||||||
|
|
||||||
|
// Register with Fs
|
||||||
|
func init() {
|
||||||
|
fsi := &fs.RegInfo{
|
||||||
|
Name: "combine",
|
||||||
|
Description: "Combine several remotes into one",
|
||||||
|
NewFs: NewFs,
|
||||||
|
Options: []fs.Option{{
|
||||||
|
Name: "upstreams",
|
||||||
|
Help: `Upstreams for combining
|
||||||
|
|
||||||
|
These should be in the form
|
||||||
|
|
||||||
|
dir=remote:path dir2=remote2:path
|
||||||
|
|
||||||
|
Where before the = is specified the root directory and after is the remote to
|
||||||
|
put there.
|
||||||
|
|
||||||
|
Embedded spaces can be added using quotes
|
||||||
|
|
||||||
|
"dir=remote:path with space" "dir2=remote2:path with space"
|
||||||
|
|
||||||
|
`,
|
||||||
|
Required: true,
|
||||||
|
Default: fs.SpaceSepList(nil),
|
||||||
|
}},
|
||||||
|
}
|
||||||
|
fs.Register(fsi)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Options defines the configuration for this backend
|
||||||
|
type Options struct {
|
||||||
|
Upstreams fs.SpaceSepList `config:"upstreams"`
|
||||||
|
}
|
||||||
|
|
||||||
|
// Fs represents a combine of upstreams
|
||||||
|
type Fs struct {
|
||||||
|
name string // name of this remote
|
||||||
|
features *fs.Features // optional features
|
||||||
|
opt Options // options for this Fs
|
||||||
|
root string // the path we are working on
|
||||||
|
hashSet hash.Set // common hashes
|
||||||
|
when time.Time // directory times
|
||||||
|
upstreams map[string]*upstream // map of upstreams
|
||||||
|
}
|
||||||
|
|
||||||
|
// adjustment stores the info to add a prefix to a path or chop characters off
|
||||||
|
type adjustment struct {
|
||||||
|
root string
|
||||||
|
rootSlash string
|
||||||
|
mountpoint string
|
||||||
|
mountpointSlash string
|
||||||
|
}
|
||||||
|
|
||||||
|
// newAdjustment makes a new path adjustment adjusting between mountpoint and root
|
||||||
|
//
|
||||||
|
// mountpoint is the point the upstream is mounted and root is the combine root
|
||||||
|
func newAdjustment(root, mountpoint string) (a adjustment) {
|
||||||
|
return adjustment{
|
||||||
|
root: root,
|
||||||
|
rootSlash: root + "/",
|
||||||
|
mountpoint: mountpoint,
|
||||||
|
mountpointSlash: mountpoint + "/",
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
var errNotUnderRoot = errors.New("file not under root")
|
||||||
|
|
||||||
|
// do makes the adjustment on s, mapping an upstream path into a combine path
|
||||||
|
func (a *adjustment) do(s string) (string, error) {
|
||||||
|
absPath := join(a.mountpoint, s)
|
||||||
|
if a.root == "" {
|
||||||
|
return absPath, nil
|
||||||
|
}
|
||||||
|
if absPath == a.root {
|
||||||
|
return "", nil
|
||||||
|
}
|
||||||
|
if !strings.HasPrefix(absPath, a.rootSlash) {
|
||||||
|
return "", errNotUnderRoot
|
||||||
|
}
|
||||||
|
return absPath[len(a.rootSlash):], nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// undo makes the adjustment on s, mapping a combine path into an upstream path
|
||||||
|
func (a *adjustment) undo(s string) (string, error) {
|
||||||
|
absPath := join(a.root, s)
|
||||||
|
if absPath == a.mountpoint {
|
||||||
|
return "", nil
|
||||||
|
}
|
||||||
|
if !strings.HasPrefix(absPath, a.mountpointSlash) {
|
||||||
|
return "", errNotUnderRoot
|
||||||
|
}
|
||||||
|
return absPath[len(a.mountpointSlash):], nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// upstream represents an upstream Fs
|
||||||
|
type upstream struct {
|
||||||
|
f fs.Fs
|
||||||
|
parent *Fs
|
||||||
|
dir string // directory the upstream is mounted
|
||||||
|
pathAdjustment adjustment // how to fiddle with the path
|
||||||
|
}
|
||||||
|
|
||||||
|
// Create an upstream from the directory it is mounted on and the remote
|
||||||
|
func (f *Fs) newUpstream(ctx context.Context, dir, remote string) (*upstream, error) {
|
||||||
|
uFs, err := cache.Get(ctx, remote)
|
||||||
|
if err == fs.ErrorIsFile {
|
||||||
|
return nil, fmt.Errorf("can't combine files yet, only directories %q: %w", remote, err)
|
||||||
|
}
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("failed to create upstream %q: %w", remote, err)
|
||||||
|
}
|
||||||
|
u := &upstream{
|
||||||
|
f: uFs,
|
||||||
|
parent: f,
|
||||||
|
dir: dir,
|
||||||
|
pathAdjustment: newAdjustment(f.root, dir),
|
||||||
|
}
|
||||||
|
return u, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// NewFs constructs an Fs from the path.
|
||||||
|
//
|
||||||
|
// The returned Fs is the actual Fs, referenced by remote in the config
|
||||||
|
func NewFs(ctx context.Context, name, root string, m configmap.Mapper) (outFs fs.Fs, err error) {
|
||||||
|
// defer log.Trace(nil, "name=%q, root=%q, m=%v", name, root, m)("f=%+v, err=%v", &outFs, &err)
|
||||||
|
// Parse config into Options struct
|
||||||
|
opt := new(Options)
|
||||||
|
err = configstruct.Set(m, opt)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
// Backward compatible to old config
|
||||||
|
if len(opt.Upstreams) == 0 {
|
||||||
|
return nil, errors.New("combine can't point to an empty upstream - check the value of the upstreams setting")
|
||||||
|
}
|
||||||
|
for _, u := range opt.Upstreams {
|
||||||
|
if strings.HasPrefix(u, name+":") {
|
||||||
|
return nil, errors.New("can't point combine remote at itself - check the value of the upstreams setting")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
isDir := false
|
||||||
|
for strings.HasSuffix(root, "/") {
|
||||||
|
root = root[:len(root)-1]
|
||||||
|
isDir = true
|
||||||
|
}
|
||||||
|
|
||||||
|
f := &Fs{
|
||||||
|
name: name,
|
||||||
|
root: root,
|
||||||
|
opt: *opt,
|
||||||
|
upstreams: make(map[string]*upstream, len(opt.Upstreams)),
|
||||||
|
when: time.Now(),
|
||||||
|
}
|
||||||
|
|
||||||
|
g, gCtx := errgroup.WithContext(ctx)
|
||||||
|
var mu sync.Mutex
|
||||||
|
for _, upstream := range opt.Upstreams {
|
||||||
|
upstream := upstream
|
||||||
|
g.Go(func() (err error) {
|
||||||
|
equal := strings.IndexRune(upstream, '=')
|
||||||
|
if equal < 0 {
|
||||||
|
return fmt.Errorf("no \"=\" in upstream definition %q", upstream)
|
||||||
|
}
|
||||||
|
dir, remote := upstream[:equal], upstream[equal+1:]
|
||||||
|
if dir == "" {
|
||||||
|
return fmt.Errorf("empty dir in upstream definition %q", upstream)
|
||||||
|
}
|
||||||
|
if remote == "" {
|
||||||
|
return fmt.Errorf("empty remote in upstream definition %q", upstream)
|
||||||
|
}
|
||||||
|
if strings.IndexRune(dir, '/') >= 0 {
|
||||||
|
return fmt.Errorf("dirs can't contain / (yet): %q", dir)
|
||||||
|
}
|
||||||
|
u, err := f.newUpstream(gCtx, dir, remote)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
mu.Lock()
|
||||||
|
f.upstreams[dir] = u
|
||||||
|
mu.Unlock()
|
||||||
|
return nil
|
||||||
|
})
|
||||||
|
}
|
||||||
|
err = g.Wait()
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
// check features
|
||||||
|
var features = (&fs.Features{
|
||||||
|
CaseInsensitive: true,
|
||||||
|
DuplicateFiles: false,
|
||||||
|
ReadMimeType: true,
|
||||||
|
WriteMimeType: true,
|
||||||
|
CanHaveEmptyDirectories: true,
|
||||||
|
BucketBased: true,
|
||||||
|
SetTier: true,
|
||||||
|
GetTier: true,
|
||||||
|
}).Fill(ctx, f)
|
||||||
|
canMove := true
|
||||||
|
for _, u := range f.upstreams {
|
||||||
|
features = features.Mask(ctx, u.f) // Mask all upstream fs
|
||||||
|
if !operations.CanServerSideMove(u.f) {
|
||||||
|
canMove = false
|
||||||
|
}
|
||||||
|
}
|
||||||
|
// We can move if all remotes support Move or Copy
|
||||||
|
if canMove {
|
||||||
|
features.Move = f.Move
|
||||||
|
}
|
||||||
|
|
||||||
|
// Enable ListR when upstreams either support ListR or is local
|
||||||
|
// But not when all upstreams are local
|
||||||
|
if features.ListR == nil {
|
||||||
|
for _, u := range f.upstreams {
|
||||||
|
if u.f.Features().ListR != nil {
|
||||||
|
features.ListR = f.ListR
|
||||||
|
} else if !u.f.Features().IsLocal {
|
||||||
|
features.ListR = nil
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Enable Purge when any upstreams support it
|
||||||
|
if features.Purge == nil {
|
||||||
|
for _, u := range f.upstreams {
|
||||||
|
if u.f.Features().Purge != nil {
|
||||||
|
features.Purge = f.Purge
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Enable Shutdown when any upstreams support it
|
||||||
|
if features.Shutdown == nil {
|
||||||
|
for _, u := range f.upstreams {
|
||||||
|
if u.f.Features().Shutdown != nil {
|
||||||
|
features.Shutdown = f.Shutdown
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Enable DirCacheFlush when any upstreams support it
|
||||||
|
if features.DirCacheFlush == nil {
|
||||||
|
for _, u := range f.upstreams {
|
||||||
|
if u.f.Features().DirCacheFlush != nil {
|
||||||
|
features.DirCacheFlush = f.DirCacheFlush
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Enable ChangeNotify when any upstreams support it
|
||||||
|
if features.ChangeNotify == nil {
|
||||||
|
for _, u := range f.upstreams {
|
||||||
|
if u.f.Features().ChangeNotify != nil {
|
||||||
|
features.ChangeNotify = f.ChangeNotify
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
f.features = features
|
||||||
|
|
||||||
|
// Get common intersection of hashes
|
||||||
|
var hashSet hash.Set
|
||||||
|
var first = true
|
||||||
|
for _, u := range f.upstreams {
|
||||||
|
if first {
|
||||||
|
hashSet = u.f.Hashes()
|
||||||
|
first = false
|
||||||
|
} else {
|
||||||
|
hashSet = hashSet.Overlap(u.f.Hashes())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
f.hashSet = hashSet
|
||||||
|
|
||||||
|
// Check to see if the root is actually a file
|
||||||
|
if f.root != "" && !isDir {
|
||||||
|
_, err := f.NewObject(ctx, "")
|
||||||
|
if err != nil {
|
||||||
|
if err == fs.ErrorObjectNotFound || err == fs.ErrorNotAFile || err == fs.ErrorIsDir {
|
||||||
|
// File doesn't exist or is a directory so return old f
|
||||||
|
return f, nil
|
||||||
|
}
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
// Check to see if the root path is actually an existing file
|
||||||
|
f.root = path.Dir(f.root)
|
||||||
|
if f.root == "." {
|
||||||
|
f.root = ""
|
||||||
|
}
|
||||||
|
// Adjust path adjustment to remove leaf
|
||||||
|
for _, u := range f.upstreams {
|
||||||
|
u.pathAdjustment = newAdjustment(f.root, u.dir)
|
||||||
|
}
|
||||||
|
return f, fs.ErrorIsFile
|
||||||
|
}
|
||||||
|
return f, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// Run a function over all the upstreams in parallel
|
||||||
|
func (f *Fs) multithread(ctx context.Context, fn func(context.Context, *upstream) error) error {
|
||||||
|
g, gCtx := errgroup.WithContext(ctx)
|
||||||
|
for _, u := range f.upstreams {
|
||||||
|
u := u
|
||||||
|
g.Go(func() (err error) {
|
||||||
|
return fn(gCtx, u)
|
||||||
|
})
|
||||||
|
}
|
||||||
|
return g.Wait()
|
||||||
|
}
|
||||||
|
|
||||||
|
// join the elements together but unline path.Join return empty string
|
||||||
|
func join(elem ...string) string {
|
||||||
|
result := path.Join(elem...)
|
||||||
|
if result == "." {
|
||||||
|
return ""
|
||||||
|
}
|
||||||
|
if len(result) > 0 && result[0] == '/' {
|
||||||
|
result = result[1:]
|
||||||
|
}
|
||||||
|
return result
|
||||||
|
}
|
||||||
|
|
||||||
|
// find the upstream for the remote passed in, returning the upstream and the adjusted path
|
||||||
|
func (f *Fs) findUpstream(remote string) (u *upstream, uRemote string, err error) {
|
||||||
|
// defer log.Trace(remote, "")("f=%v, uRemote=%q, err=%v", &u, &uRemote, &err)
|
||||||
|
for _, u := range f.upstreams {
|
||||||
|
uRemote, err = u.pathAdjustment.undo(remote)
|
||||||
|
if err == nil {
|
||||||
|
return u, uRemote, nil
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return nil, "", fmt.Errorf("combine for remote %q: %w", remote, fs.ErrorDirNotFound)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Name of the remote (as passed into NewFs)
|
||||||
|
func (f *Fs) Name() string {
|
||||||
|
return f.name
|
||||||
|
}
|
||||||
|
|
||||||
|
// Root of the remote (as passed into NewFs)
|
||||||
|
func (f *Fs) Root() string {
|
||||||
|
return f.root
|
||||||
|
}
|
||||||
|
|
||||||
|
// String converts this Fs to a string
|
||||||
|
func (f *Fs) String() string {
|
||||||
|
return fmt.Sprintf("combine root '%s'", f.root)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Features returns the optional features of this Fs
|
||||||
|
func (f *Fs) Features() *fs.Features {
|
||||||
|
return f.features
|
||||||
|
}
|
||||||
|
|
||||||
|
// Rmdir removes the root directory of the Fs object
|
||||||
|
func (f *Fs) Rmdir(ctx context.Context, dir string) error {
|
||||||
|
// The root always exists
|
||||||
|
if f.root == "" && dir == "" {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
u, uRemote, err := f.findUpstream(dir)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
return u.f.Rmdir(ctx, uRemote)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Hashes returns hash.HashNone to indicate remote hashing is unavailable
|
||||||
|
func (f *Fs) Hashes() hash.Set {
|
||||||
|
return f.hashSet
|
||||||
|
}
|
||||||
|
|
||||||
|
// Mkdir makes the root directory of the Fs object
|
||||||
|
func (f *Fs) Mkdir(ctx context.Context, dir string) error {
|
||||||
|
// The root always exists
|
||||||
|
if f.root == "" && dir == "" {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
u, uRemote, err := f.findUpstream(dir)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
return u.f.Mkdir(ctx, uRemote)
|
||||||
|
}
|
||||||
|
|
||||||
|
// purge the upstream or fallback to a slow way
|
||||||
|
func (u *upstream) purge(ctx context.Context, dir string) (err error) {
|
||||||
|
if do := u.f.Features().Purge; do != nil {
|
||||||
|
err = do(ctx, dir)
|
||||||
|
} else {
|
||||||
|
err = operations.Purge(ctx, u.f, dir)
|
||||||
|
}
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
// Purge all files in the directory
|
||||||
|
//
|
||||||
|
// Implement this if you have a way of deleting all the files
|
||||||
|
// quicker than just running Remove() on the result of List()
|
||||||
|
//
|
||||||
|
// Return an error if it doesn't exist
|
||||||
|
func (f *Fs) Purge(ctx context.Context, dir string) error {
|
||||||
|
if f.root == "" && dir == "" {
|
||||||
|
return f.multithread(ctx, func(ctx context.Context, u *upstream) error {
|
||||||
|
return u.purge(ctx, "")
|
||||||
|
})
|
||||||
|
}
|
||||||
|
u, uRemote, err := f.findUpstream(dir)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
return u.purge(ctx, uRemote)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Copy src to this remote using server-side copy operations.
|
||||||
|
//
|
||||||
|
// This is stored with the remote path given
|
||||||
|
//
|
||||||
|
// It returns the destination Object and a possible error
|
||||||
|
//
|
||||||
|
// Will only be called if src.Fs().Name() == f.Name()
|
||||||
|
//
|
||||||
|
// If it isn't possible then return fs.ErrorCantCopy
|
||||||
|
func (f *Fs) Copy(ctx context.Context, src fs.Object, remote string) (fs.Object, error) {
|
||||||
|
srcObj, ok := src.(*Object)
|
||||||
|
if !ok {
|
||||||
|
fs.Debugf(src, "Can't copy - not same remote type")
|
||||||
|
return nil, fs.ErrorCantCopy
|
||||||
|
}
|
||||||
|
|
||||||
|
dstU, dstRemote, err := f.findUpstream(remote)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
do := dstU.f.Features().Copy
|
||||||
|
if do == nil {
|
||||||
|
return nil, fs.ErrorCantCopy
|
||||||
|
}
|
||||||
|
|
||||||
|
o, err := do(ctx, srcObj.Object, dstRemote)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
return dstU.newObject(o), nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// Move src to this remote using server-side move operations.
|
||||||
|
//
|
||||||
|
// This is stored with the remote path given
|
||||||
|
//
|
||||||
|
// It returns the destination Object and a possible error
|
||||||
|
//
|
||||||
|
// Will only be called if src.Fs().Name() == f.Name()
|
||||||
|
//
|
||||||
|
// If it isn't possible then return fs.ErrorCantMove
|
||||||
|
func (f *Fs) Move(ctx context.Context, src fs.Object, remote string) (fs.Object, error) {
|
||||||
|
srcObj, ok := src.(*Object)
|
||||||
|
if !ok {
|
||||||
|
fs.Debugf(src, "Can't move - not same remote type")
|
||||||
|
return nil, fs.ErrorCantMove
|
||||||
|
}
|
||||||
|
|
||||||
|
dstU, dstRemote, err := f.findUpstream(remote)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
do := dstU.f.Features().Move
|
||||||
|
useCopy := false
|
||||||
|
if do == nil {
|
||||||
|
do = dstU.f.Features().Copy
|
||||||
|
if do == nil {
|
||||||
|
return nil, fs.ErrorCantMove
|
||||||
|
}
|
||||||
|
useCopy = true
|
||||||
|
}
|
||||||
|
|
||||||
|
o, err := do(ctx, srcObj.Object, dstRemote)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
// If did Copy then remove the source object
|
||||||
|
if useCopy {
|
||||||
|
err = srcObj.Remove(ctx)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return dstU.newObject(o), nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// DirMove moves src, srcRemote to this remote at dstRemote
|
||||||
|
// using server-side move operations.
|
||||||
|
//
|
||||||
|
// Will only be called if src.Fs().Name() == f.Name()
|
||||||
|
//
|
||||||
|
// If it isn't possible then return fs.ErrorCantDirMove
|
||||||
|
//
|
||||||
|
// If destination exists then return fs.ErrorDirExists
|
||||||
|
func (f *Fs) DirMove(ctx context.Context, src fs.Fs, srcRemote, dstRemote string) (err error) {
|
||||||
|
// defer log.Trace(f, "src=%v, srcRemote=%q, dstRemote=%q", src, srcRemote, dstRemote)("err=%v", &err)
|
||||||
|
srcFs, ok := src.(*Fs)
|
||||||
|
if !ok {
|
||||||
|
fs.Debugf(src, "Can't move directory - not same remote type")
|
||||||
|
return fs.ErrorCantDirMove
|
||||||
|
}
|
||||||
|
|
||||||
|
dstU, dstURemote, err := f.findUpstream(dstRemote)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
srcU, srcURemote, err := srcFs.findUpstream(srcRemote)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
do := dstU.f.Features().DirMove
|
||||||
|
if do == nil {
|
||||||
|
return fs.ErrorCantDirMove
|
||||||
|
}
|
||||||
|
|
||||||
|
fs.Logf(dstU.f, "srcU.f=%v, srcURemote=%q, dstURemote=%q", srcU.f, srcURemote, dstURemote)
|
||||||
|
return do(ctx, srcU.f, srcURemote, dstURemote)
|
||||||
|
}
|
||||||
|
|
||||||
|
// ChangeNotify calls the passed function with a path
|
||||||
|
// that has had changes. If the implementation
|
||||||
|
// uses polling, it should adhere to the given interval.
|
||||||
|
// At least one value will be written to the channel,
|
||||||
|
// specifying the initial value and updated values might
|
||||||
|
// follow. A 0 Duration should pause the polling.
|
||||||
|
// The ChangeNotify implementation must empty the channel
|
||||||
|
// regularly. When the channel gets closed, the implementation
|
||||||
|
// should stop polling and release resources.
|
||||||
|
func (f *Fs) ChangeNotify(ctx context.Context, notifyFunc func(string, fs.EntryType), ch <-chan time.Duration) {
|
||||||
|
var uChans []chan time.Duration
|
||||||
|
|
||||||
|
for _, u := range f.upstreams {
|
||||||
|
u := u
|
||||||
|
if do := u.f.Features().ChangeNotify; do != nil {
|
||||||
|
ch := make(chan time.Duration)
|
||||||
|
uChans = append(uChans, ch)
|
||||||
|
wrappedNotifyFunc := func(path string, entryType fs.EntryType) {
|
||||||
|
newPath, err := u.pathAdjustment.do(path)
|
||||||
|
if err != nil {
|
||||||
|
fs.Logf(f, "ChangeNotify: unable to process %q: %s", path, err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
fs.Debugf(f, "ChangeNotify: path %q entryType %d", newPath, entryType)
|
||||||
|
notifyFunc(newPath, entryType)
|
||||||
|
}
|
||||||
|
do(ctx, wrappedNotifyFunc, ch)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
go func() {
|
||||||
|
for i := range ch {
|
||||||
|
for _, c := range uChans {
|
||||||
|
c <- i
|
||||||
|
}
|
||||||
|
}
|
||||||
|
for _, c := range uChans {
|
||||||
|
close(c)
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
}
|
||||||
|
|
||||||
|
// DirCacheFlush resets the directory cache - used in testing
|
||||||
|
// as an optional interface
|
||||||
|
func (f *Fs) DirCacheFlush() {
|
||||||
|
ctx := context.Background()
|
||||||
|
_ = f.multithread(ctx, func(ctx context.Context, u *upstream) error {
|
||||||
|
if do := u.f.Features().DirCacheFlush; do != nil {
|
||||||
|
do()
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
func (f *Fs) put(ctx context.Context, in io.Reader, src fs.ObjectInfo, stream bool, options ...fs.OpenOption) (fs.Object, error) {
|
||||||
|
srcPath := src.Remote()
|
||||||
|
u, uRemote, err := f.findUpstream(srcPath)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
uSrc := operations.NewOverrideRemote(src, uRemote)
|
||||||
|
var o fs.Object
|
||||||
|
if stream {
|
||||||
|
o, err = u.f.Features().PutStream(ctx, in, uSrc, options...)
|
||||||
|
} else {
|
||||||
|
o, err = u.f.Put(ctx, in, uSrc, options...)
|
||||||
|
}
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
return u.newObject(o), nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// Put in to the remote path with the modTime given of the given size
|
||||||
|
//
|
||||||
|
// May create the object even if it returns an error - if so
|
||||||
|
// will return the object and the error, otherwise will return
|
||||||
|
// nil and the error
|
||||||
|
func (f *Fs) Put(ctx context.Context, in io.Reader, src fs.ObjectInfo, options ...fs.OpenOption) (fs.Object, error) {
|
||||||
|
o, err := f.NewObject(ctx, src.Remote())
|
||||||
|
switch err {
|
||||||
|
case nil:
|
||||||
|
return o, o.Update(ctx, in, src, options...)
|
||||||
|
case fs.ErrorObjectNotFound:
|
||||||
|
return f.put(ctx, in, src, false, options...)
|
||||||
|
default:
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// PutStream uploads to the remote path with the modTime given of indeterminate size
|
||||||
|
//
|
||||||
|
// May create the object even if it returns an error - if so
|
||||||
|
// will return the object and the error, otherwise will return
|
||||||
|
// nil and the error
|
||||||
|
func (f *Fs) PutStream(ctx context.Context, in io.Reader, src fs.ObjectInfo, options ...fs.OpenOption) (fs.Object, error) {
|
||||||
|
o, err := f.NewObject(ctx, src.Remote())
|
||||||
|
switch err {
|
||||||
|
case nil:
|
||||||
|
return o, o.Update(ctx, in, src, options...)
|
||||||
|
case fs.ErrorObjectNotFound:
|
||||||
|
return f.put(ctx, in, src, true, options...)
|
||||||
|
default:
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// About gets quota information from the Fs
|
||||||
|
func (f *Fs) About(ctx context.Context) (*fs.Usage, error) {
|
||||||
|
usage := &fs.Usage{
|
||||||
|
Total: new(int64),
|
||||||
|
Used: new(int64),
|
||||||
|
Trashed: new(int64),
|
||||||
|
Other: new(int64),
|
||||||
|
Free: new(int64),
|
||||||
|
Objects: new(int64),
|
||||||
|
}
|
||||||
|
for _, u := range f.upstreams {
|
||||||
|
doAbout := u.f.Features().About
|
||||||
|
if doAbout == nil {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
usg, err := doAbout(ctx)
|
||||||
|
if errors.Is(err, fs.ErrorDirNotFound) {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
if usg.Total != nil && usage.Total != nil {
|
||||||
|
*usage.Total += *usg.Total
|
||||||
|
} else {
|
||||||
|
usage.Total = nil
|
||||||
|
}
|
||||||
|
if usg.Used != nil && usage.Used != nil {
|
||||||
|
*usage.Used += *usg.Used
|
||||||
|
} else {
|
||||||
|
usage.Used = nil
|
||||||
|
}
|
||||||
|
if usg.Trashed != nil && usage.Trashed != nil {
|
||||||
|
*usage.Trashed += *usg.Trashed
|
||||||
|
} else {
|
||||||
|
usage.Trashed = nil
|
||||||
|
}
|
||||||
|
if usg.Other != nil && usage.Other != nil {
|
||||||
|
*usage.Other += *usg.Other
|
||||||
|
} else {
|
||||||
|
usage.Other = nil
|
||||||
|
}
|
||||||
|
if usg.Free != nil && usage.Free != nil {
|
||||||
|
*usage.Free += *usg.Free
|
||||||
|
} else {
|
||||||
|
usage.Free = nil
|
||||||
|
}
|
||||||
|
if usg.Objects != nil && usage.Objects != nil {
|
||||||
|
*usage.Objects += *usg.Objects
|
||||||
|
} else {
|
||||||
|
usage.Objects = nil
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return usage, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// Wraps entries for this upstream
|
||||||
|
func (u *upstream) wrapEntries(ctx context.Context, entries fs.DirEntries) (fs.DirEntries, error) {
|
||||||
|
for i, entry := range entries {
|
||||||
|
switch x := entry.(type) {
|
||||||
|
case fs.Object:
|
||||||
|
entries[i] = u.newObject(x)
|
||||||
|
case fs.Directory:
|
||||||
|
newDir := fs.NewDirCopy(ctx, x)
|
||||||
|
newPath, err := u.pathAdjustment.do(newDir.Remote())
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
newDir.SetRemote(newPath)
|
||||||
|
entries[i] = newDir
|
||||||
|
default:
|
||||||
|
return nil, fmt.Errorf("unknown entry type %T", entry)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return entries, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// List the objects and directories in dir into entries. The
|
||||||
|
// entries can be returned in any order but should be for a
|
||||||
|
// complete directory.
|
||||||
|
//
|
||||||
|
// dir should be "" to list the root, and should not have
|
||||||
|
// trailing slashes.
|
||||||
|
//
|
||||||
|
// This should return ErrDirNotFound if the directory isn't
|
||||||
|
// found.
|
||||||
|
func (f *Fs) List(ctx context.Context, dir string) (entries fs.DirEntries, err error) {
|
||||||
|
// defer log.Trace(f, "dir=%q", dir)("entries = %v, err=%v", &entries, &err)
|
||||||
|
if f.root == "" && dir == "" {
|
||||||
|
entries = make(fs.DirEntries, 0, len(f.upstreams))
|
||||||
|
for combineDir := range f.upstreams {
|
||||||
|
d := fs.NewDir(combineDir, f.when)
|
||||||
|
entries = append(entries, d)
|
||||||
|
}
|
||||||
|
return entries, nil
|
||||||
|
}
|
||||||
|
u, uRemote, err := f.findUpstream(dir)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
entries, err = u.f.List(ctx, uRemote)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
return u.wrapEntries(ctx, entries)
|
||||||
|
}
|
||||||
|
|
||||||
|
// ListR lists the objects and directories of the Fs starting
|
||||||
|
// from dir recursively into out.
|
||||||
|
//
|
||||||
|
// dir should be "" to start from the root, and should not
|
||||||
|
// have trailing slashes.
|
||||||
|
//
|
||||||
|
// This should return ErrDirNotFound if the directory isn't
|
||||||
|
// found.
|
||||||
|
//
|
||||||
|
// It should call callback for each tranche of entries read.
|
||||||
|
// These need not be returned in any particular order. If
|
||||||
|
// callback returns an error then the listing will stop
|
||||||
|
// immediately.
|
||||||
|
//
|
||||||
|
// Don't implement this unless you have a more efficient way
|
||||||
|
// of listing recursively that doing a directory traversal.
|
||||||
|
func (f *Fs) ListR(ctx context.Context, dir string, callback fs.ListRCallback) (err error) {
|
||||||
|
// defer log.Trace(f, "dir=%q, callback=%v", dir, callback)("err=%v", &err)
|
||||||
|
if f.root == "" && dir == "" {
|
||||||
|
rootEntries, err := f.List(ctx, "")
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
err = callback(rootEntries)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
var mu sync.Mutex
|
||||||
|
syncCallback := func(entries fs.DirEntries) error {
|
||||||
|
mu.Lock()
|
||||||
|
defer mu.Unlock()
|
||||||
|
return callback(entries)
|
||||||
|
}
|
||||||
|
err = f.multithread(ctx, func(ctx context.Context, u *upstream) error {
|
||||||
|
return f.ListR(ctx, u.dir, syncCallback)
|
||||||
|
})
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
u, uRemote, err := f.findUpstream(dir)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
wrapCallback := func(entries fs.DirEntries) error {
|
||||||
|
entries, err := u.wrapEntries(ctx, entries)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
return callback(entries)
|
||||||
|
}
|
||||||
|
if do := u.f.Features().ListR; do != nil {
|
||||||
|
err = do(ctx, uRemote, wrapCallback)
|
||||||
|
} else {
|
||||||
|
err = walk.ListR(ctx, u.f, uRemote, true, -1, walk.ListAll, wrapCallback)
|
||||||
|
}
|
||||||
|
if err == fs.ErrorDirNotFound {
|
||||||
|
err = nil
|
||||||
|
}
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
// NewObject creates a new remote combine file object
|
||||||
|
func (f *Fs) NewObject(ctx context.Context, remote string) (fs.Object, error) {
|
||||||
|
u, uRemote, err := f.findUpstream(remote)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
if uRemote == "" || strings.HasSuffix(uRemote, "/") {
|
||||||
|
return nil, fs.ErrorIsDir
|
||||||
|
}
|
||||||
|
o, err := u.f.NewObject(ctx, uRemote)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
return u.newObject(o), nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// Precision is the greatest Precision of all upstreams
|
||||||
|
func (f *Fs) Precision() time.Duration {
|
||||||
|
var greatestPrecision time.Duration
|
||||||
|
for _, u := range f.upstreams {
|
||||||
|
uPrecision := u.f.Precision()
|
||||||
|
if uPrecision > greatestPrecision {
|
||||||
|
greatestPrecision = uPrecision
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return greatestPrecision
|
||||||
|
}
|
||||||
|
|
||||||
|
// Shutdown the backend, closing any background tasks and any
|
||||||
|
// cached connections.
|
||||||
|
func (f *Fs) Shutdown(ctx context.Context) error {
|
||||||
|
return f.multithread(ctx, func(ctx context.Context, u *upstream) error {
|
||||||
|
if do := u.f.Features().Shutdown; do != nil {
|
||||||
|
return do(ctx)
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
// Object describes a wrapped Object
|
||||||
|
//
|
||||||
|
// This is a wrapped Object which knows its path prefix
|
||||||
|
type Object struct {
|
||||||
|
fs.Object
|
||||||
|
u *upstream
|
||||||
|
}
|
||||||
|
|
||||||
|
func (u *upstream) newObject(o fs.Object) *Object {
|
||||||
|
return &Object{
|
||||||
|
Object: o,
|
||||||
|
u: u,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Fs returns read only access to the Fs that this object is part of
|
||||||
|
func (o *Object) Fs() fs.Info {
|
||||||
|
return o.u.parent
|
||||||
|
}
|
||||||
|
|
||||||
|
// String returns the remote path
|
||||||
|
func (o *Object) String() string {
|
||||||
|
return o.Remote()
|
||||||
|
}
|
||||||
|
|
||||||
|
// Remote returns the remote path
|
||||||
|
func (o *Object) Remote() string {
|
||||||
|
newPath, err := o.u.pathAdjustment.do(o.Object.String())
|
||||||
|
if err != nil {
|
||||||
|
fs.Errorf(o, "Bad object: %v", err)
|
||||||
|
return err.Error()
|
||||||
|
}
|
||||||
|
return newPath
|
||||||
|
}
|
||||||
|
|
||||||
|
// MimeType returns the content type of the Object if known
|
||||||
|
func (o *Object) MimeType(ctx context.Context) (mimeType string) {
|
||||||
|
if do, ok := o.Object.(fs.MimeTyper); ok {
|
||||||
|
mimeType = do.MimeType(ctx)
|
||||||
|
}
|
||||||
|
return mimeType
|
||||||
|
}
|
||||||
|
|
||||||
|
// UnWrap returns the Object that this Object is wrapping or
|
||||||
|
// nil if it isn't wrapping anything
|
||||||
|
func (o *Object) UnWrap() fs.Object {
|
||||||
|
return o.Object
|
||||||
|
}
|
||||||
|
|
||||||
|
// Check the interfaces are satisfied
|
||||||
|
var (
|
||||||
|
_ fs.Fs = (*Fs)(nil)
|
||||||
|
_ fs.Purger = (*Fs)(nil)
|
||||||
|
_ fs.PutStreamer = (*Fs)(nil)
|
||||||
|
_ fs.Copier = (*Fs)(nil)
|
||||||
|
_ fs.Mover = (*Fs)(nil)
|
||||||
|
_ fs.DirMover = (*Fs)(nil)
|
||||||
|
_ fs.DirCacheFlusher = (*Fs)(nil)
|
||||||
|
_ fs.ChangeNotifier = (*Fs)(nil)
|
||||||
|
_ fs.Abouter = (*Fs)(nil)
|
||||||
|
_ fs.ListRer = (*Fs)(nil)
|
||||||
|
_ fs.Shutdowner = (*Fs)(nil)
|
||||||
|
)
|
94
backend/combine/combine_internal_test.go
Normal file
94
backend/combine/combine_internal_test.go
Normal file
|
@ -0,0 +1,94 @@
|
||||||
|
package combine
|
||||||
|
|
||||||
|
import (
|
||||||
|
"fmt"
|
||||||
|
"testing"
|
||||||
|
|
||||||
|
"github.com/stretchr/testify/assert"
|
||||||
|
)
|
||||||
|
|
||||||
|
func TestAdjustmentDo(t *testing.T) {
|
||||||
|
for _, test := range []struct {
|
||||||
|
root string
|
||||||
|
mountpoint string
|
||||||
|
in string
|
||||||
|
want string
|
||||||
|
wantErr error
|
||||||
|
}{
|
||||||
|
{
|
||||||
|
root: "",
|
||||||
|
mountpoint: "mountpoint",
|
||||||
|
in: "path/to/file.txt",
|
||||||
|
want: "mountpoint/path/to/file.txt",
|
||||||
|
},
|
||||||
|
{
|
||||||
|
root: "mountpoint",
|
||||||
|
mountpoint: "mountpoint",
|
||||||
|
in: "path/to/file.txt",
|
||||||
|
want: "path/to/file.txt",
|
||||||
|
},
|
||||||
|
{
|
||||||
|
root: "mountpoint/path",
|
||||||
|
mountpoint: "mountpoint",
|
||||||
|
in: "path/to/file.txt",
|
||||||
|
want: "to/file.txt",
|
||||||
|
},
|
||||||
|
{
|
||||||
|
root: "mountpoint/path",
|
||||||
|
mountpoint: "mountpoint",
|
||||||
|
in: "wrongpath/to/file.txt",
|
||||||
|
want: "",
|
||||||
|
wantErr: errNotUnderRoot,
|
||||||
|
},
|
||||||
|
} {
|
||||||
|
what := fmt.Sprintf("%+v", test)
|
||||||
|
a := newAdjustment(test.root, test.mountpoint)
|
||||||
|
got, gotErr := a.do(test.in)
|
||||||
|
assert.Equal(t, test.wantErr, gotErr)
|
||||||
|
assert.Equal(t, test.want, got, what)
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestAdjustmentUndo(t *testing.T) {
|
||||||
|
for _, test := range []struct {
|
||||||
|
root string
|
||||||
|
mountpoint string
|
||||||
|
in string
|
||||||
|
want string
|
||||||
|
wantErr error
|
||||||
|
}{
|
||||||
|
{
|
||||||
|
root: "",
|
||||||
|
mountpoint: "mountpoint",
|
||||||
|
in: "mountpoint/path/to/file.txt",
|
||||||
|
want: "path/to/file.txt",
|
||||||
|
},
|
||||||
|
{
|
||||||
|
root: "mountpoint",
|
||||||
|
mountpoint: "mountpoint",
|
||||||
|
in: "path/to/file.txt",
|
||||||
|
want: "path/to/file.txt",
|
||||||
|
},
|
||||||
|
{
|
||||||
|
root: "mountpoint/path",
|
||||||
|
mountpoint: "mountpoint",
|
||||||
|
in: "to/file.txt",
|
||||||
|
want: "path/to/file.txt",
|
||||||
|
},
|
||||||
|
{
|
||||||
|
root: "wrongmountpoint/path",
|
||||||
|
mountpoint: "mountpoint",
|
||||||
|
in: "to/file.txt",
|
||||||
|
want: "",
|
||||||
|
wantErr: errNotUnderRoot,
|
||||||
|
},
|
||||||
|
} {
|
||||||
|
what := fmt.Sprintf("%+v", test)
|
||||||
|
a := newAdjustment(test.root, test.mountpoint)
|
||||||
|
got, gotErr := a.undo(test.in)
|
||||||
|
assert.Equal(t, test.wantErr, gotErr)
|
||||||
|
assert.Equal(t, test.want, got, what)
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
79
backend/combine/combine_test.go
Normal file
79
backend/combine/combine_test.go
Normal file
|
@ -0,0 +1,79 @@
|
||||||
|
// Test Combine filesystem interface
|
||||||
|
package combine_test
|
||||||
|
|
||||||
|
import (
|
||||||
|
"testing"
|
||||||
|
|
||||||
|
_ "github.com/rclone/rclone/backend/local"
|
||||||
|
_ "github.com/rclone/rclone/backend/memory"
|
||||||
|
"github.com/rclone/rclone/fstest"
|
||||||
|
"github.com/rclone/rclone/fstest/fstests"
|
||||||
|
)
|
||||||
|
|
||||||
|
// TestIntegration runs integration tests against the remote
|
||||||
|
func TestIntegration(t *testing.T) {
|
||||||
|
if *fstest.RemoteName == "" {
|
||||||
|
t.Skip("Skipping as -remote not set")
|
||||||
|
}
|
||||||
|
fstests.Run(t, &fstests.Opt{
|
||||||
|
RemoteName: *fstest.RemoteName,
|
||||||
|
UnimplementableFsMethods: []string{"OpenWriterAt", "DuplicateFiles"},
|
||||||
|
UnimplementableObjectMethods: []string{"MimeType"},
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestLocal(t *testing.T) {
|
||||||
|
if *fstest.RemoteName != "" {
|
||||||
|
t.Skip("Skipping as -remote set")
|
||||||
|
}
|
||||||
|
dirs := MakeTestDirs(t, 3)
|
||||||
|
upstreams := "dir1=" + dirs[0] + " dir2=" + dirs[1] + " dir3=" + dirs[2]
|
||||||
|
name := "TestCombineLocal"
|
||||||
|
fstests.Run(t, &fstests.Opt{
|
||||||
|
RemoteName: name + ":dir1",
|
||||||
|
ExtraConfig: []fstests.ExtraConfigItem{
|
||||||
|
{Name: name, Key: "type", Value: "combine"},
|
||||||
|
{Name: name, Key: "upstreams", Value: upstreams},
|
||||||
|
},
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestMemory(t *testing.T) {
|
||||||
|
if *fstest.RemoteName != "" {
|
||||||
|
t.Skip("Skipping as -remote set")
|
||||||
|
}
|
||||||
|
upstreams := "dir1=:memory:dir1 dir2=:memory:dir2 dir3=:memory:dir3"
|
||||||
|
name := "TestCombineMemory"
|
||||||
|
fstests.Run(t, &fstests.Opt{
|
||||||
|
RemoteName: name + ":dir1",
|
||||||
|
ExtraConfig: []fstests.ExtraConfigItem{
|
||||||
|
{Name: name, Key: "type", Value: "combine"},
|
||||||
|
{Name: name, Key: "upstreams", Value: upstreams},
|
||||||
|
},
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestMixed(t *testing.T) {
|
||||||
|
if *fstest.RemoteName != "" {
|
||||||
|
t.Skip("Skipping as -remote set")
|
||||||
|
}
|
||||||
|
dirs := MakeTestDirs(t, 2)
|
||||||
|
upstreams := "dir1=" + dirs[0] + " dir2=" + dirs[1] + " dir3=:memory:dir3"
|
||||||
|
name := "TestCombineMixed"
|
||||||
|
fstests.Run(t, &fstests.Opt{
|
||||||
|
RemoteName: name + ":dir1",
|
||||||
|
ExtraConfig: []fstests.ExtraConfigItem{
|
||||||
|
{Name: name, Key: "type", Value: "combine"},
|
||||||
|
{Name: name, Key: "upstreams", Value: upstreams},
|
||||||
|
},
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
// MakeTestDirs makes directories in /tmp for testing
|
||||||
|
func MakeTestDirs(t *testing.T, n int) (dirs []string) {
|
||||||
|
for i := 1; i <= n; i++ {
|
||||||
|
dir := t.TempDir()
|
||||||
|
dirs = append(dirs, dir)
|
||||||
|
}
|
||||||
|
return dirs
|
||||||
|
}
|
|
@ -38,6 +38,7 @@ docs = [
|
||||||
"sharefile.md",
|
"sharefile.md",
|
||||||
"crypt.md",
|
"crypt.md",
|
||||||
"compress.md",
|
"compress.md",
|
||||||
|
"combine.md",
|
||||||
"dropbox.md",
|
"dropbox.md",
|
||||||
"filefabric.md",
|
"filefabric.md",
|
||||||
"ftp.md",
|
"ftp.md",
|
||||||
|
|
|
@ -170,6 +170,20 @@ WebDAV or S3, that work out of the box.)
|
||||||
{{< provider name="The local filesystem" home="/local/" config="/local/" end="true">}}
|
{{< provider name="The local filesystem" home="/local/" config="/local/" end="true">}}
|
||||||
{{< /provider_list >}}
|
{{< /provider_list >}}
|
||||||
|
|
||||||
|
## Virtual providers
|
||||||
|
|
||||||
|
These backends adapt or modify other storage providers:
|
||||||
|
|
||||||
|
{{< provider name="Alias: rename existing remotes" home="/alias/" config="/alias/" >}}
|
||||||
|
{{< provider name="Cache: cache remotes (DEPRECATED)" home="/cache/" config="/cache/" >}}
|
||||||
|
{{< provider name="Chunker: split large files" home="/chunker/" config="/chunker/" >}}
|
||||||
|
{{< provider name="Combine: combine multiple remotes into a directory tree" home="/combine/" config="/combine/" >}}
|
||||||
|
{{< provider name="Compress: compress files" home="/compress/" config="/compress/" >}}
|
||||||
|
{{< provider name="Crypt: encrypt files" home="/crypt/" config="/crypt/" >}}
|
||||||
|
{{< provider name="Hasher: hash files" home="/hasher/" config="/hasher/" >}}
|
||||||
|
{{< provider name="Union: join multiple remotes to work together" home="/union/" config="/union/" >}}
|
||||||
|
|
||||||
|
|
||||||
## Links
|
## Links
|
||||||
|
|
||||||
* {{< icon "fa fa-home" >}} [Home page](https://rclone.org/)
|
* {{< icon "fa fa-home" >}} [Home page](https://rclone.org/)
|
||||||
|
|
156
docs/content/combine.md
Normal file
156
docs/content/combine.md
Normal file
|
@ -0,0 +1,156 @@
|
||||||
|
---
|
||||||
|
title: "Combine"
|
||||||
|
description: "Combine several remotes into one"
|
||||||
|
---
|
||||||
|
|
||||||
|
# {{< icon "fa fa-folder-plus" >}} Combine
|
||||||
|
|
||||||
|
The `combine` backend joins remotes together into a single directory
|
||||||
|
tree.
|
||||||
|
|
||||||
|
For example you might have a remote for images on one provider:
|
||||||
|
|
||||||
|
```
|
||||||
|
$ rclone tree s3:imagesbucket
|
||||||
|
/
|
||||||
|
├── image1.jpg
|
||||||
|
└── image2.jpg
|
||||||
|
```
|
||||||
|
|
||||||
|
And a remote for files on another:
|
||||||
|
|
||||||
|
```
|
||||||
|
$ rclone tree drive:important/files
|
||||||
|
/
|
||||||
|
├── file1.txt
|
||||||
|
└── file2.txt
|
||||||
|
```
|
||||||
|
|
||||||
|
The `combine` backend can join these together into a synthetic
|
||||||
|
directory structure like this:
|
||||||
|
|
||||||
|
```
|
||||||
|
$ rclone tree combined:
|
||||||
|
/
|
||||||
|
├── files
|
||||||
|
│ ├── file1.txt
|
||||||
|
│ └── file2.txt
|
||||||
|
└── images
|
||||||
|
├── image1.jpg
|
||||||
|
└── image2.jpg
|
||||||
|
```
|
||||||
|
|
||||||
|
You'd do this by specifying an `upstreams` parameter in the config
|
||||||
|
like this
|
||||||
|
|
||||||
|
upstreams = images=s3:imagesbucket files=drive:important/files
|
||||||
|
|
||||||
|
During the initial setup with `rclone config` you will specify the
|
||||||
|
upstreams remotes as a space separated list. The upstream remotes can
|
||||||
|
either be a local paths or other remotes.
|
||||||
|
|
||||||
|
## Configuration
|
||||||
|
|
||||||
|
Here is an example of how to make a combine called `remote` for the
|
||||||
|
example above. First run:
|
||||||
|
|
||||||
|
rclone config
|
||||||
|
|
||||||
|
This will guide you through an interactive setup process:
|
||||||
|
|
||||||
|
```
|
||||||
|
No remotes found, make a new one?
|
||||||
|
n) New remote
|
||||||
|
s) Set configuration password
|
||||||
|
q) Quit config
|
||||||
|
n/s/q> n
|
||||||
|
name> remote
|
||||||
|
Option Storage.
|
||||||
|
Type of storage to configure.
|
||||||
|
Choose a number from below, or type in your own value.
|
||||||
|
...
|
||||||
|
XX / Combine several remotes into one
|
||||||
|
\ (combine)
|
||||||
|
...
|
||||||
|
Storage> combine
|
||||||
|
Option upstreams.
|
||||||
|
Upstreams for combining
|
||||||
|
These should be in the form
|
||||||
|
dir=remote:path dir2=remote2:path
|
||||||
|
Where before the = is specified the root directory and after is the remote to
|
||||||
|
put there.
|
||||||
|
Embedded spaces can be added using quotes
|
||||||
|
"dir=remote:path with space" "dir2=remote2:path with space"
|
||||||
|
Enter a fs.SpaceSepList value.
|
||||||
|
upstreams> images=s3:imagesbucket files=drive:important/files
|
||||||
|
--------------------
|
||||||
|
[remote]
|
||||||
|
type = combine
|
||||||
|
upstreams = images=s3:imagesbucket files=drive:important/files
|
||||||
|
--------------------
|
||||||
|
y) Yes this is OK (default)
|
||||||
|
e) Edit this remote
|
||||||
|
d) Delete this remote
|
||||||
|
y/e/d> y
|
||||||
|
```
|
||||||
|
|
||||||
|
### Configuring for Google Drive Shared Drives
|
||||||
|
|
||||||
|
Rclone has a convenience feature for making a combine backend for all
|
||||||
|
the shared drives you have access to.
|
||||||
|
|
||||||
|
Assuming your main (non shared drive) Google drive remote is called
|
||||||
|
`drive:` you would run
|
||||||
|
|
||||||
|
rclone backend -o config drives drive:
|
||||||
|
|
||||||
|
This would produce something like this:
|
||||||
|
|
||||||
|
[My Drive]
|
||||||
|
type = alias
|
||||||
|
remote = drive,team_drive=0ABCDEF-01234567890,root_folder_id=:
|
||||||
|
|
||||||
|
[Test Drive]
|
||||||
|
type = alias
|
||||||
|
remote = drive,team_drive=0ABCDEFabcdefghijkl,root_folder_id=:
|
||||||
|
|
||||||
|
[AllDrives]
|
||||||
|
type = combine
|
||||||
|
remote = "My Drive=My Drive:" "Test Drive=Test Drive:"
|
||||||
|
|
||||||
|
If you then add that config to your config file (find it with `rclone
|
||||||
|
config file`) then you can access all the shared drives in one place
|
||||||
|
with the `AllDrives:` remote.
|
||||||
|
|
||||||
|
See [the Google Drive docs](/drive/#drives) for full info.
|
||||||
|
|
||||||
|
{{< rem autogenerated options start" - DO NOT EDIT - instead edit fs.RegInfo in backend/combine/combine.go then run make backenddocs" >}}
|
||||||
|
### Standard options
|
||||||
|
|
||||||
|
Here are the standard options specific to combine (Combine several remotes into one).
|
||||||
|
|
||||||
|
#### --combine-upstreams
|
||||||
|
|
||||||
|
Upstreams for combining
|
||||||
|
|
||||||
|
These should be in the form
|
||||||
|
|
||||||
|
dir=remote:path dir2=remote2:path
|
||||||
|
|
||||||
|
Where before the = is specified the root directory and after is the remote to
|
||||||
|
put there.
|
||||||
|
|
||||||
|
Embedded spaces can be added using quotes
|
||||||
|
|
||||||
|
"dir=remote:path with space" "dir2=remote2:path with space"
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
Properties:
|
||||||
|
|
||||||
|
- Config: upstreams
|
||||||
|
- Env Var: RCLONE_COMBINE_UPSTREAMS
|
||||||
|
- Type: SpaceSepList
|
||||||
|
- Default:
|
||||||
|
|
||||||
|
{{< rem autogenerated options stop >}}
|
|
@ -37,6 +37,7 @@ See the following for detailed instructions for
|
||||||
* [Chunker](/chunker/) - transparently splits large files for other remotes
|
* [Chunker](/chunker/) - transparently splits large files for other remotes
|
||||||
* [Citrix ShareFile](/sharefile/)
|
* [Citrix ShareFile](/sharefile/)
|
||||||
* [Compress](/compress/)
|
* [Compress](/compress/)
|
||||||
|
* [Combine](/combine/)
|
||||||
* [Crypt](/crypt/) - to encrypt other remotes
|
* [Crypt](/crypt/) - to encrypt other remotes
|
||||||
* [DigitalOcean Spaces](/s3/#digitalocean-spaces)
|
* [DigitalOcean Spaces](/s3/#digitalocean-spaces)
|
||||||
* [Digi Storage](/koofr/#digi-storage)
|
* [Digi Storage](/koofr/#digi-storage)
|
||||||
|
|
|
@ -60,6 +60,7 @@
|
||||||
<a class="dropdown-item" href="/box/"><i class="fa fa-archive"></i> Box</a>
|
<a class="dropdown-item" href="/box/"><i class="fa fa-archive"></i> Box</a>
|
||||||
<a class="dropdown-item" href="/chunker/"><i class="fa fa-cut"></i> Chunker (splits large files)</a>
|
<a class="dropdown-item" href="/chunker/"><i class="fa fa-cut"></i> Chunker (splits large files)</a>
|
||||||
<a class="dropdown-item" href="/compress/"><i class="fas fa-compress"></i> Compress (transparent gzip compression)</a>
|
<a class="dropdown-item" href="/compress/"><i class="fas fa-compress"></i> Compress (transparent gzip compression)</a>
|
||||||
|
<a class="dropdown-item" href="/combine/"><i class="fa fa-folder-plus"></i> Combine (remotes into a directory tree)</a>
|
||||||
<a class="dropdown-item" href="/sharefile/"><i class="fas fa-share-square"></i> Citrix ShareFile</a>
|
<a class="dropdown-item" href="/sharefile/"><i class="fas fa-share-square"></i> Citrix ShareFile</a>
|
||||||
<a class="dropdown-item" href="/crypt/"><i class="fa fa-lock"></i> Crypt (encrypts the others)</a>
|
<a class="dropdown-item" href="/crypt/"><i class="fa fa-lock"></i> Crypt (encrypts the others)</a>
|
||||||
<a class="dropdown-item" href="/koofr/#digi-storage"><i class="fa fa-cloud"></i> Digi Storage</a>
|
<a class="dropdown-item" href="/koofr/#digi-storage"><i class="fa fa-cloud"></i> Digi Storage</a>
|
||||||
|
|
|
@ -91,6 +91,9 @@ backends:
|
||||||
fastlist: true
|
fastlist: true
|
||||||
maxfile: 1k
|
maxfile: 1k
|
||||||
## end chunker
|
## end chunker
|
||||||
|
- backend: "combine"
|
||||||
|
remote: "TestCombine:dir1"
|
||||||
|
fastlist: false
|
||||||
## begin compress
|
## begin compress
|
||||||
- backend: "compress"
|
- backend: "compress"
|
||||||
remote: "TestCompress:"
|
remote: "TestCompress:"
|
||||||
|
|
Loading…
Reference in a new issue