rclone/backend/combine/combine.go
nielash 252562d00a combine: fix CopyDirMetadata error on upstream root
Before this change, operations.CopyDirMetadata would fail with: `internal error:
expecting directory string from combine root '' to have SetMetadata method:
optional feature not implemented` if the dst was the root directory of a combine
upstream. This is because combine was returning a *fs.Dir, which does not
satisfy the fs.SetMetadataer interface.

While it is true that combine cannot set metadata on the root of an upstream
(see also #7652), this should not be considered an error that causes sync to do
high-level retries, abort without doing deletes, etc.

This change addresses the issue by creating a new type of DirWrapper that is
allowed to fail silently, for exceptional cases such as this where certain
special directories have more limited abilities than what the Fs usually
supports.

It is possible that other similar wrapping backends (Union?) may need this same
fix.
2024-03-07 11:09:07 +00:00

1153 lines
30 KiB
Go

// Package combine implements a backend to combine multiple remotes in a directory tree
package combine
/*
Have API to add/remove branches in the combine
*/
import (
"context"
"errors"
"fmt"
"io"
"path"
"strings"
"sync"
"time"
"github.com/rclone/rclone/fs"
"github.com/rclone/rclone/fs/cache"
"github.com/rclone/rclone/fs/config/configmap"
"github.com/rclone/rclone/fs/config/configstruct"
"github.com/rclone/rclone/fs/hash"
"github.com/rclone/rclone/fs/operations"
"github.com/rclone/rclone/fs/walk"
"golang.org/x/sync/errgroup"
)
// init registers the combine backend, together with its single
// "upstreams" option, with the fs package.
func init() {
	fs.Register(&fs.RegInfo{
		Name:        "combine",
		Description: "Combine several remotes into one",
		NewFs:       NewFs,
		MetadataInfo: &fs.MetadataInfo{
			Help: `Any metadata supported by the underlying remote is read and written.`,
		},
		Options: []fs.Option{{
			Name: "upstreams",
			Help: `Upstreams for combining
These should be in the form
dir=remote:path dir2=remote2:path
Where before the = is specified the root directory and after is the remote to
put there.
Embedded spaces can be added using quotes
"dir=remote:path with space" "dir2=remote2:path with space"
`,
			Required: true,
			Default:  fs.SpaceSepList(nil),
		}},
	})
}
// Options defines the configuration for this backend
type Options struct {
// Upstreams holds the "dir=remote:path" definitions read from the
// "upstreams" config key, one list element per upstream.
Upstreams fs.SpaceSepList `config:"upstreams"`
}
// Fs represents a combine of upstreams
//
// Each upstream is mounted on a single directory name and the upstreams
// map is keyed by that directory name.
type Fs struct {
name string // name of this remote
features *fs.Features // optional features
opt Options // options for this Fs
root string // the path we are working on
hashSet hash.Set // common hashes
when time.Time // directory times - set when the Fs is created and used for the mountpoint directories listed at the root
upstreams map[string]*upstream // map of upstreams
}
// adjustment stores the info to add a prefix to a path or chop characters off
//
// The "Slash" variants are the same strings with a trailing "/" appended
// so that prefix checks only match on whole path components.
type adjustment struct {
root string // the combine root
rootSlash string // root + "/"
mountpoint string // directory the upstream is mounted on
mountpointSlash string // mountpoint + "/"
}
// newAdjustment builds the path adjustment that translates between paths
// seen by an upstream and paths seen through the combine.
//
// root is the combine root and mountpoint is the directory the upstream
// is mounted on. The slash-terminated forms are precomputed here.
func newAdjustment(root, mountpoint string) (a adjustment) {
	a.root = root
	a.rootSlash = root + "/"
	a.mountpoint = mountpoint
	a.mountpointSlash = mountpoint + "/"
	return a
}
// errNotUnderRoot is returned by the path adjustment methods when the
// path being translated does not lie under the expected prefix.
var errNotUnderRoot = errors.New("file not under root")
// do translates the upstream path s into a path relative to the combine
// root.
//
// The mountpoint is prepended to s first. With an empty combine root that
// is the answer; otherwise the root prefix is removed. errNotUnderRoot is
// returned if the resulting path is not the root or beneath it.
func (a *adjustment) do(s string) (string, error) {
	full := join(a.mountpoint, s)
	switch {
	case a.root == "":
		return full, nil
	case full == a.root:
		return "", nil
	case strings.HasPrefix(full, a.rootSlash):
		return full[len(a.rootSlash):], nil
	default:
		return "", errNotUnderRoot
	}
}
// undo translates the combine path s into a path relative to the upstream.
//
// The combine root is prepended to s and the mountpoint prefix is then
// removed. errNotUnderRoot is returned if the resulting path is neither
// the mountpoint itself nor beneath it.
func (a *adjustment) undo(s string) (string, error) {
	full := join(a.root, s)
	if full == a.mountpoint {
		return "", nil
	}
	if strings.HasPrefix(full, a.mountpointSlash) {
		return full[len(a.mountpointSlash):], nil
	}
	return "", errNotUnderRoot
}
// upstream represents an upstream Fs
type upstream struct {
f fs.Fs // the wrapped upstream Fs
parent *Fs // the combine Fs this upstream belongs to
dir string // directory the upstream is mounted
pathAdjustment adjustment // how to fiddle with the path
}
// newUpstream creates the upstream for remote, mounted on directory dir.
//
// The upstream Fs is fetched through the fs cache and pinned there until
// the returned upstream is finalized. A remote which points at a file is
// rejected since only directories can be combined.
func (f *Fs) newUpstream(ctx context.Context, dir, remote string) (*upstream, error) {
	upstreamFs, err := cache.Get(ctx, remote)
	switch {
	case err == fs.ErrorIsFile:
		return nil, fmt.Errorf("can't combine files yet, only directories %q: %w", remote, err)
	case err != nil:
		return nil, fmt.Errorf("failed to create upstream %q: %w", remote, err)
	}
	newU := &upstream{
		f:              upstreamFs,
		parent:         f,
		dir:            dir,
		pathAdjustment: newAdjustment(f.root, dir),
	}
	cache.PinUntilFinalized(newU.f, newU)
	return newU, nil
}
// NewFs constructs an Fs from the path.
//
// The returned Fs is the actual Fs, referenced by remote in the config
//
// Every entry of the upstreams option must have the form "dir=remote" with
// a non-empty dir containing no "/" and a non-empty remote; directory
// names must be unique. The upstreams are created in parallel.
//
// If root (given without a trailing "/") turns out to be an existing
// file, the Fs is re-rooted at the parent directory and returned together
// with fs.ErrorIsFile.
func NewFs(ctx context.Context, name, root string, m configmap.Mapper) (outFs fs.Fs, err error) {
// defer log.Trace(nil, "name=%q, root=%q, m=%v", name, root, m)("f=%+v, err=%v", &outFs, &err)
// Parse config into Options struct
opt := new(Options)
err = configstruct.Set(m, opt)
if err != nil {
return nil, err
}
// A combine needs at least one upstream to be useful
if len(opt.Upstreams) == 0 {
return nil, errors.New("combine can't point to an empty upstream - check the value of the upstreams setting")
}
// Refuse an upstream definition that starts with this remote's own name
for _, u := range opt.Upstreams {
if strings.HasPrefix(u, name+":") {
return nil, errors.New("can't point combine remote at itself - check the value of the upstreams setting")
}
}
// Strip trailing slashes, remembering that the caller asked for a directory
isDir := false
for strings.HasSuffix(root, "/") {
root = root[:len(root)-1]
isDir = true
}
f := &Fs{
name: name,
root: root,
opt: *opt,
upstreams: make(map[string]*upstream, len(opt.Upstreams)),
when: time.Now(),
}
// Create the upstreams in parallel; mu guards f.upstreams
g, gCtx := errgroup.WithContext(ctx)
var mu sync.Mutex
for _, upstream := range opt.Upstreams {
upstream := upstream
g.Go(func() (err error) {
// Split "dir=remote" on the first "="
equal := strings.IndexRune(upstream, '=')
if equal < 0 {
return fmt.Errorf("no \"=\" in upstream definition %q", upstream)
}
dir, remote := upstream[:equal], upstream[equal+1:]
if dir == "" {
return fmt.Errorf("empty dir in upstream definition %q", upstream)
}
if remote == "" {
return fmt.Errorf("empty remote in upstream definition %q", upstream)
}
if strings.ContainsRune(dir, '/') {
return fmt.Errorf("dirs can't contain / (yet): %q", dir)
}
u, err := f.newUpstream(gCtx, dir, remote)
if err != nil {
return err
}
mu.Lock()
if _, found := f.upstreams[dir]; found {
err = fmt.Errorf("duplicate directory name %q", dir)
} else {
f.upstreams[dir] = u
}
mu.Unlock()
return err
})
}
err = g.Wait()
if err != nil {
return nil, err
}
// check features
var features = (&fs.Features{
CaseInsensitive: true,
DuplicateFiles: false,
ReadMimeType: true,
WriteMimeType: true,
CanHaveEmptyDirectories: true,
BucketBased: true,
SetTier: true,
GetTier: true,
ReadMetadata: true,
WriteMetadata: true,
UserMetadata: true,
ReadDirMetadata: true,
WriteDirMetadata: true,
WriteDirSetModTime: true,
UserDirMetadata: true,
DirModTimeUpdatesOnWrite: true,
PartialUploads: true,
}).Fill(ctx, f)
canMove := true
for _, u := range f.upstreams {
features = features.Mask(ctx, u.f) // Mask all upstream fs
if !operations.CanServerSideMove(u.f) {
canMove = false
}
}
// We can move if all remotes support Move or Copy
if canMove {
features.Move = f.Move
}
// Enable ListR when upstreams either support ListR or is local
// But not when all upstreams are local
if features.ListR == nil {
for _, u := range f.upstreams {
if u.f.Features().ListR != nil {
features.ListR = f.ListR
} else if !u.f.Features().IsLocal {
// A non-local upstream without ListR disables it for the combine
features.ListR = nil
break
}
}
}
// Enable Purge when any upstreams support it
if features.Purge == nil {
for _, u := range f.upstreams {
if u.f.Features().Purge != nil {
features.Purge = f.Purge
break
}
}
}
// Enable Shutdown when any upstreams support it
if features.Shutdown == nil {
for _, u := range f.upstreams {
if u.f.Features().Shutdown != nil {
features.Shutdown = f.Shutdown
break
}
}
}
// Enable DirCacheFlush when any upstreams support it
if features.DirCacheFlush == nil {
for _, u := range f.upstreams {
if u.f.Features().DirCacheFlush != nil {
features.DirCacheFlush = f.DirCacheFlush
break
}
}
}
// Enable CleanUp when any upstreams support it
if features.CleanUp == nil {
for _, u := range f.upstreams {
if u.f.Features().CleanUp != nil {
features.CleanUp = f.CleanUp
break
}
}
}
// Enable ChangeNotify when any upstreams support it
if features.ChangeNotify == nil {
for _, u := range f.upstreams {
if u.f.Features().ChangeNotify != nil {
features.ChangeNotify = f.ChangeNotify
break
}
}
}
// show that we wrap other backends
features.Overlay = true
f.features = features
// Get common intersection of hashes
var hashSet hash.Set
var first = true
for _, u := range f.upstreams {
if first {
hashSet = u.f.Hashes()
first = false
} else {
hashSet = hashSet.Overlap(u.f.Hashes())
}
}
f.hashSet = hashSet
// Check to see if the root is actually a file
if f.root != "" && !isDir {
_, err := f.NewObject(ctx, "")
if err != nil {
if err == fs.ErrorObjectNotFound || err == fs.ErrorNotAFile || err == fs.ErrorIsDir {
// File doesn't exist or is a directory so return old f
return f, nil
}
return nil, err
}
// The root is an existing file so point the Fs at its parent directory
f.root = path.Dir(f.root)
if f.root == "." {
f.root = ""
}
// Adjust path adjustment to remove leaf
for _, u := range f.upstreams {
u.pathAdjustment = newAdjustment(f.root, u.dir)
}
return f, fs.ErrorIsFile
}
return f, nil
}
// multithread calls fn once for every upstream, each call in its own
// goroutine, and waits for all of them to finish.
//
// The calls share a context derived from ctx by the errgroup. The value
// returned is whatever the errgroup's Wait reports.
func (f *Fs) multithread(ctx context.Context, fn func(context.Context, *upstream) error) error {
	group, groupCtx := errgroup.WithContext(ctx)
	for _, up := range f.upstreams {
		up := up // per-iteration copy for the goroutine below
		group.Go(func() error {
			return fn(groupCtx, up)
		})
	}
	return group.Wait()
}
// join joins the path elements like path.Join, except that a result of
// "." becomes the empty string and a leading "/" is removed.
func join(elem ...string) string {
	switch joined := path.Join(elem...); {
	case joined == ".":
		return ""
	case joined != "" && joined[0] == '/':
		return joined[1:]
	default:
		return joined
	}
}
// findUpstream locates the upstream which holds the combine path remote.
//
// It returns that upstream along with remote translated into a path
// relative to the upstream. If no upstream accepts the path the error
// wraps fs.ErrorDirNotFound.
func (f *Fs) findUpstream(remote string) (u *upstream, uRemote string, err error) {
	// defer log.Trace(remote, "")("f=%v, uRemote=%q, err=%v", &u, &uRemote, &err)
	for _, candidate := range f.upstreams {
		adjusted, undoErr := candidate.pathAdjustment.undo(remote)
		if undoErr != nil {
			continue
		}
		return candidate, adjusted, nil
	}
	return nil, "", fmt.Errorf("combine for remote %q: %w", remote, fs.ErrorDirNotFound)
}
// Name returns the name of the remote (as passed into NewFs)
func (f *Fs) Name() string {
return f.name
}
// Root returns the root of the remote.
//
// This is the root passed into NewFs with trailing slashes removed, or
// its parent directory if NewFs found the root to be a file.
func (f *Fs) Root() string {
return f.root
}
// String returns a description of this Fs which includes its root
func (f *Fs) String() string {
	return "combine root '" + f.root + "'"
}
// Features returns the optional features of this Fs
//
// These are worked out once in NewFs from the features of the upstreams.
func (f *Fs) Features() *fs.Features {
return f.features
}
// Rmdir removes the directory dir
//
// Removing the root of a combine whose root is "" does nothing since that
// root always exists. Any other directory is removed by the upstream
// which holds it.
func (f *Fs) Rmdir(ctx context.Context, dir string) error {
	if dir == "" && f.root == "" {
		// The root always exists
		return nil
	}
	target, targetDir, err := f.findUpstream(dir)
	if err != nil {
		return err
	}
	return target.f.Rmdir(ctx, targetDir)
}
// Hashes returns the set of hash types supported by every upstream, as
// computed in NewFs by overlapping the upstreams' hash sets.
func (f *Fs) Hashes() hash.Set {
return f.hashSet
}
// Mkdir makes the directory dir
//
// Making the root of a combine whose root is "" does nothing since that
// root always exists. Any other directory is made on the upstream which
// holds it.
func (f *Fs) Mkdir(ctx context.Context, dir string) error {
	if dir == "" && f.root == "" {
		// The root always exists
		return nil
	}
	target, targetDir, err := f.findUpstream(dir)
	if err != nil {
		return err
	}
	return target.f.Mkdir(ctx, targetDir)
}
// MkdirMetadata makes the directory dir with the metadata passed in
//
// The work is done by the upstream which holds dir and the directory it
// returns is wrapped so that its path is relative to the combine. It
// returns fs.ErrorNotImplemented if that upstream has no MkdirMetadata.
func (f *Fs) MkdirMetadata(ctx context.Context, dir string, metadata fs.Metadata) (fs.Directory, error) {
	u, uRemote, err := f.findUpstream(dir)
	if err != nil {
		return nil, err
	}
	mkdirMetadata := u.f.Features().MkdirMetadata
	if mkdirMetadata == nil {
		return nil, fs.ErrorNotImplemented
	}
	created, err := mkdirMetadata(ctx, uRemote, metadata)
	if err != nil {
		return nil, err
	}
	// Reuse the entry wrapping code by wrapping a one element listing
	wrapped, err := u.wrapEntries(ctx, fs.DirEntries{created})
	if err != nil {
		return nil, err
	}
	if wrappedDir, ok := wrapped[0].(fs.Directory); ok {
		return wrappedDir, nil
	}
	return nil, fmt.Errorf("internal error: expecting %T to be fs.Directory", wrapped[0])
}
// purge removes dir and its contents from the upstream, using the
// upstream's own Purge if it has one and operations.Purge otherwise.
func (u *upstream) purge(ctx context.Context, dir string) (err error) {
	if fastPurge := u.f.Features().Purge; fastPurge != nil {
		return fastPurge(ctx, dir)
	}
	return operations.Purge(ctx, u.f, dir)
}
// Purge all files in the directory
//
// Purging the root of a combine whose root is "" purges the root of every
// upstream in parallel. Otherwise only the upstream holding dir is
// involved.
//
// Return an error if it doesn't exist
func (f *Fs) Purge(ctx context.Context, dir string) error {
	if f.root != "" || dir != "" {
		u, uRemote, err := f.findUpstream(dir)
		if err != nil {
			return err
		}
		return u.purge(ctx, uRemote)
	}
	purgeRoot := func(ctx context.Context, u *upstream) error {
		return u.purge(ctx, "")
	}
	return f.multithread(ctx, purgeRoot)
}
// Copy src to this remote using server-side copy operations.
//
// This is stored with the remote path given.
//
// It returns the destination Object and a possible error.
//
// Will only be called if src.Fs().Name() == f.Name()
//
// If it isn't possible then return fs.ErrorCantCopy
//
// The copy is carried out by the upstream which holds the destination and
// is given the object wrapped by src.
func (f *Fs) Copy(ctx context.Context, src fs.Object, remote string) (fs.Object, error) {
	combineSrc, isCombine := src.(*Object)
	if !isCombine {
		fs.Debugf(src, "Can't copy - not same remote type")
		return nil, fs.ErrorCantCopy
	}
	dstU, dstRemote, err := f.findUpstream(remote)
	if err != nil {
		return nil, err
	}
	copyFn := dstU.f.Features().Copy
	if copyFn == nil {
		return nil, fs.ErrorCantCopy
	}
	copied, err := copyFn(ctx, combineSrc.Object, dstRemote)
	if err != nil {
		return nil, err
	}
	return dstU.newObject(copied), nil
}
// Move src to this remote using server-side move operations.
//
// This is stored with the remote path given.
//
// It returns the destination Object and a possible error.
//
// Will only be called if src.Fs().Name() == f.Name()
//
// If it isn't possible then return fs.ErrorCantMove
//
// If the destination upstream has no Move then its Copy is used instead
// and the source object is removed after the copy succeeds.
func (f *Fs) Move(ctx context.Context, src fs.Object, remote string) (fs.Object, error) {
	combineSrc, isCombine := src.(*Object)
	if !isCombine {
		fs.Debugf(src, "Can't move - not same remote type")
		return nil, fs.ErrorCantMove
	}
	dstU, dstRemote, err := f.findUpstream(remote)
	if err != nil {
		return nil, err
	}
	transfer, viaCopy := dstU.f.Features().Move, false
	if transfer == nil {
		// Fall back to copy + delete
		transfer, viaCopy = dstU.f.Features().Copy, true
		if transfer == nil {
			return nil, fs.ErrorCantMove
		}
	}
	moved, err := transfer(ctx, combineSrc.Object, dstRemote)
	if err != nil {
		return nil, err
	}
	if viaCopy {
		// The data was copied so the source must now be removed
		if err = combineSrc.Remove(ctx); err != nil {
			return nil, err
		}
	}
	return dstU.newObject(moved), nil
}
// DirMove moves src, srcRemote to this remote at dstRemote
// using server-side move operations.
//
// Will only be called if src.Fs().Name() == f.Name()
//
// If it isn't possible then return fs.ErrorCantDirMove
//
// If destination exists then return fs.ErrorDirExists
//
// Both paths are translated into upstream paths and the move is then
// handed to the DirMove of the upstream holding the destination.
func (f *Fs) DirMove(ctx context.Context, src fs.Fs, srcRemote, dstRemote string) (err error) {
	// defer log.Trace(f, "src=%v, srcRemote=%q, dstRemote=%q", src, srcRemote, dstRemote)("err=%v", &err)
	srcFs, ok := src.(*Fs)
	if !ok {
		fs.Debugf(src, "Can't move directory - not same remote type")
		return fs.ErrorCantDirMove
	}
	dstU, dstURemote, err := f.findUpstream(dstRemote)
	if err != nil {
		return err
	}
	srcU, srcURemote, err := srcFs.findUpstream(srcRemote)
	if err != nil {
		return err
	}
	do := dstU.f.Features().DirMove
	if do == nil {
		return fs.ErrorCantDirMove
	}
	// The translated paths are diagnostic detail, so trace them at debug
	// level rather than logging them on every directory move.
	fs.Debugf(dstU.f, "srcU.f=%v, srcURemote=%q, dstURemote=%q", srcU.f, srcURemote, dstURemote)
	return do(ctx, srcU.f, srcURemote, dstURemote)
}
// ChangeNotify calls the passed function with a path
// that has had changes. If the implementation
// uses polling, it should adhere to the given interval.
// At least one value will be written to the channel,
// specifying the initial value and updated values might
// follow. A 0 Duration should pause the polling.
// The ChangeNotify implementation must empty the channel
// regularly. When the channel gets closed, the implementation
// should stop polling and release resources.
//
// Every upstream with a ChangeNotify gets its own channel. The paths it
// reports are translated into combine paths before being passed on, and
// paths which cannot be translated are logged and dropped. A goroutine
// copies each value read from ch to all the upstream channels and closes
// them once ch is closed.
func (f *Fs) ChangeNotify(ctx context.Context, notifyFunc func(string, fs.EntryType), ch <-chan time.Duration) {
	var fanOut []chan time.Duration
	for _, u := range f.upstreams {
		u := u // per-iteration copy for the closure below
		notify := u.f.Features().ChangeNotify
		if notify == nil {
			continue
		}
		uCh := make(chan time.Duration)
		fanOut = append(fanOut, uCh)
		translate := func(uPath string, entryType fs.EntryType) {
			combinePath, err := u.pathAdjustment.do(uPath)
			if err != nil {
				fs.Logf(f, "ChangeNotify: unable to process %q: %s", uPath, err)
				return
			}
			fs.Debugf(f, "ChangeNotify: path %q entryType %d", combinePath, entryType)
			notifyFunc(combinePath, entryType)
		}
		notify(ctx, translate, uCh)
	}
	go func() {
		for interval := range ch {
			for _, uCh := range fanOut {
				uCh <- interval
			}
		}
		for _, uCh := range fanOut {
			close(uCh)
		}
	}()
}
// DirCacheFlush resets the directory cache - used in testing
// as an optional interface
//
// It calls DirCacheFlush on every upstream which has one.
func (f *Fs) DirCacheFlush() {
	flush := func(_ context.Context, u *upstream) error {
		if flushFn := u.f.Features().DirCacheFlush; flushFn != nil {
			flushFn()
		}
		return nil
	}
	// flush always returns nil so the result is deliberately discarded
	_ = f.multithread(context.Background(), flush)
}
// put uploads in to the upstream which holds src.Remote() and returns the
// new object wrapped for the combine.
//
// src is presented to the upstream with its remote overridden by the
// upstream relative path. The upstream's PutStream is used when stream is
// true and its Put otherwise.
func (f *Fs) put(ctx context.Context, in io.Reader, src fs.ObjectInfo, stream bool, options ...fs.OpenOption) (fs.Object, error) {
	u, uRemote, err := f.findUpstream(src.Remote())
	if err != nil {
		return nil, err
	}
	uSrc := fs.NewOverrideRemote(src, uRemote)
	var uploaded fs.Object
	if !stream {
		uploaded, err = u.f.Put(ctx, in, uSrc, options...)
	} else {
		uploaded, err = u.f.Features().PutStream(ctx, in, uSrc, options...)
	}
	if err != nil {
		return nil, err
	}
	return u.newObject(uploaded), nil
}
// Put in to the remote path with the modTime given of the given size
//
// May create the object even if it returns an error - if so
// will return the object and the error, otherwise will return
// nil and the error
//
// An object which already exists is updated in place; a new upload is
// only started when the lookup reports fs.ErrorObjectNotFound.
func (f *Fs) Put(ctx context.Context, in io.Reader, src fs.ObjectInfo, options ...fs.OpenOption) (fs.Object, error) {
	existing, err := f.NewObject(ctx, src.Remote())
	if err == nil {
		return existing, existing.Update(ctx, in, src, options...)
	}
	if err == fs.ErrorObjectNotFound {
		return f.put(ctx, in, src, false, options...)
	}
	return nil, err
}
// PutStream uploads to the remote path with the modTime given of indeterminate size
//
// May create the object even if it returns an error - if so
// will return the object and the error, otherwise will return
// nil and the error
//
// An object which already exists is updated in place; a new streaming
// upload is only started when the lookup reports fs.ErrorObjectNotFound.
func (f *Fs) PutStream(ctx context.Context, in io.Reader, src fs.ObjectInfo, options ...fs.OpenOption) (fs.Object, error) {
	existing, err := f.NewObject(ctx, src.Remote())
	if err == nil {
		return existing, existing.Update(ctx, in, src, options...)
	}
	if err == fs.ErrorObjectNotFound {
		return f.put(ctx, in, src, true, options...)
	}
	return nil, err
}
// About gets quota information from the Fs
//
// The figures are the sums over all upstreams which implement About.
// Upstreams whose About reports fs.ErrorDirNotFound are skipped. A figure
// that any contributing upstream leaves unset is returned unset.
func (f *Fs) About(ctx context.Context) (*fs.Usage, error) {
	total := &fs.Usage{
		Total:   new(int64),
		Used:    new(int64),
		Trashed: new(int64),
		Other:   new(int64),
		Free:    new(int64),
		Objects: new(int64),
	}
	// add folds one upstream figure into the running sum. Once either side
	// is unknown (nil) the sum is set to nil and stays that way.
	add := func(sum **int64, part *int64) {
		if part == nil || *sum == nil {
			*sum = nil
			return
		}
		**sum += *part
	}
	for _, u := range f.upstreams {
		about := u.f.Features().About
		if about == nil {
			continue
		}
		uUsage, err := about(ctx)
		if errors.Is(err, fs.ErrorDirNotFound) {
			continue
		}
		if err != nil {
			return nil, err
		}
		add(&total.Total, uUsage.Total)
		add(&total.Used, uUsage.Used)
		add(&total.Trashed, uUsage.Trashed)
		add(&total.Other, uUsage.Other)
		add(&total.Free, uUsage.Free)
		add(&total.Objects, uUsage.Objects)
	}
	return total, nil
}
// wrapEntries converts entries listed from this upstream into entries for
// the combine, modifying the slice in place.
//
// Objects are wrapped with newObject and directories are wrapped with
// their path translated into a combine path. An entry of any other type,
// or a directory whose path can't be translated, is an error.
func (u *upstream) wrapEntries(ctx context.Context, entries fs.DirEntries) (fs.DirEntries, error) {
	for idx := range entries {
		switch item := entries[idx].(type) {
		case fs.Object:
			entries[idx] = u.newObject(item)
		case fs.Directory:
			adjusted, err := u.pathAdjustment.do(item.Remote())
			if err != nil {
				return nil, err
			}
			entries[idx] = fs.NewDirWrapper(adjusted, item)
		default:
			return nil, fmt.Errorf("unknown entry type %T", entries[idx])
		}
	}
	return entries, nil
}
// List the objects and directories in dir into entries. The
// entries can be returned in any order but should be for a
// complete directory.
//
// dir should be "" to list the root, and should not have
// trailing slashes.
//
// This should return ErrDirNotFound if the directory isn't
// found.
//
// Listing the root of a combine whose root is "" returns one directory
// per upstream without consulting the upstreams. These directories are
// wrapped with fs.NewLimitedDirWrapper and carry the time the Fs was
// created.
func (f *Fs) List(ctx context.Context, dir string) (entries fs.DirEntries, err error) {
	// defer log.Trace(f, "dir=%q", dir)("entries = %v, err=%v", &entries, &err)
	if dir == "" && f.root == "" {
		entries = make(fs.DirEntries, 0, len(f.upstreams))
		for mountpoint := range f.upstreams {
			placeholder := fs.NewDir(mountpoint, f.when)
			entries = append(entries, fs.NewLimitedDirWrapper(mountpoint, placeholder))
		}
		return entries, nil
	}
	u, uRemote, err := f.findUpstream(dir)
	if err != nil {
		return nil, err
	}
	if entries, err = u.f.List(ctx, uRemote); err != nil {
		return nil, err
	}
	return u.wrapEntries(ctx, entries)
}
// ListR lists the objects and directories of the Fs starting
// from dir recursively into out.
//
// dir should be "" to start from the root, and should not
// have trailing slashes.
//
// This should return ErrDirNotFound if the directory isn't
// found.
//
// It should call callback for each tranche of entries read.
// These need not be returned in any particular order. If
// callback returns an error then the listing will stop
// immediately.
//
// Don't implement this unless you have a more efficient way
// of listing recursively that doing a directory traversal.
//
// At the root of a combine whose root is "" the mountpoint directories
// are delivered first and every upstream is then listed in parallel with
// calls to callback serialised by a mutex. Below the root an upstream
// without ListR is walked with walk.ListR, and fs.ErrorDirNotFound from
// the listing is not treated as an error.
func (f *Fs) ListR(ctx context.Context, dir string, callback fs.ListRCallback) (err error) {
	// defer log.Trace(f, "dir=%q, callback=%v", dir, callback)("err=%v", &err)
	if dir == "" && f.root == "" {
		mountpoints, err := f.List(ctx, "")
		if err != nil {
			return err
		}
		if err = callback(mountpoints); err != nil {
			return err
		}
		// The upstreams are listed concurrently so serialise the callback
		var callbackMu sync.Mutex
		serialised := func(entries fs.DirEntries) error {
			callbackMu.Lock()
			defer callbackMu.Unlock()
			return callback(entries)
		}
		return f.multithread(ctx, func(ctx context.Context, u *upstream) error {
			return f.ListR(ctx, u.dir, serialised)
		})
	}
	u, uRemote, err := f.findUpstream(dir)
	if err != nil {
		return err
	}
	wrapped := func(entries fs.DirEntries) error {
		combineEntries, err := u.wrapEntries(ctx, entries)
		if err != nil {
			return err
		}
		return callback(combineEntries)
	}
	if listR := u.f.Features().ListR; listR != nil {
		err = listR(ctx, uRemote, wrapped)
	} else {
		err = walk.ListR(ctx, u.f, uRemote, true, -1, walk.ListAll, wrapped)
	}
	if err == fs.ErrorDirNotFound {
		return nil
	}
	return err
}
// NewObject finds the object at remote and returns it wrapped for the
// combine.
//
// It returns fs.ErrorIsDir when remote maps to the root of an upstream or
// the upstream path ends in "/".
func (f *Fs) NewObject(ctx context.Context, remote string) (fs.Object, error) {
	u, uRemote, err := f.findUpstream(remote)
	switch {
	case err != nil:
		return nil, err
	case uRemote == "", strings.HasSuffix(uRemote, "/"):
		return nil, fs.ErrorIsDir
	}
	found, err := u.f.NewObject(ctx, uRemote)
	if err != nil {
		return nil, err
	}
	return u.newObject(found), nil
}
// Precision returns the largest Precision value reported by any upstream,
// or zero if there are no upstreams.
func (f *Fs) Precision() time.Duration {
	var largest time.Duration
	for _, u := range f.upstreams {
		if p := u.f.Precision(); p > largest {
			largest = p
		}
	}
	return largest
}
// Shutdown the backend, closing any background tasks and any
// cached connections.
//
// Every upstream which has a Shutdown is shut down, in parallel.
func (f *Fs) Shutdown(ctx context.Context) error {
	shutdown := func(ctx context.Context, u *upstream) error {
		shutdownFn := u.f.Features().Shutdown
		if shutdownFn == nil {
			return nil
		}
		return shutdownFn(ctx)
	}
	return f.multithread(ctx, shutdown)
}
// PublicLink generates a public link to the remote path (usually readable by anyone)
//
// The request is passed to the upstream which holds remote. It returns
// fs.ErrorNotImplemented if that upstream has no PublicLink.
func (f *Fs) PublicLink(ctx context.Context, remote string, expire fs.Duration, unlink bool) (string, error) {
	u, uRemote, err := f.findUpstream(remote)
	if err != nil {
		return "", err
	}
	if publicLink := u.f.Features().PublicLink; publicLink != nil {
		return publicLink(ctx, uRemote, expire, unlink)
	}
	return "", fs.ErrorNotImplemented
}
// PutUnchecked in to the remote path with the modTime given of the given size
//
// May create duplicates or return errors if src already
// exists.
//
// The upload is done by the upstream which holds src.Remote(). On success
// the new object is wrapped, as the other upload paths do, so that its
// Remote and Fs refer to the combine rather than to the upstream. On
// failure nil and the error are returned.
//
// It returns fs.ErrorNotImplemented if the upstream has no PutUnchecked.
func (f *Fs) PutUnchecked(ctx context.Context, in io.Reader, src fs.ObjectInfo, options ...fs.OpenOption) (fs.Object, error) {
	srcPath := src.Remote()
	u, uRemote, err := f.findUpstream(srcPath)
	if err != nil {
		return nil, err
	}
	do := u.f.Features().PutUnchecked
	if do == nil {
		return nil, fs.ErrorNotImplemented
	}
	uSrc := fs.NewOverrideRemote(src, uRemote)
	o, err := do(ctx, in, uSrc, options...)
	if err != nil {
		// Return a literal nil: wrapping a nil object would give the
		// caller a non-nil fs.Object.
		return nil, err
	}
	return u.newObject(o), nil
}
// MergeDirs merges the contents of all the directories passed
// in into the first one and rmdirs the other directories.
//
// All the directories must live on the same upstream, which does the
// merge. It returns fs.ErrorNotImplemented if that upstream has no
// MergeDirs. An empty dirs is a no-op.
func (f *Fs) MergeDirs(ctx context.Context, dirs []fs.Directory) error {
	if len(dirs) == 0 {
		return nil
	}
	var common *upstream
	var uDirs []fs.Directory
	for _, dir := range dirs {
		found, uDir, err := f.findUpstream(dir.Remote())
		if err != nil {
			return err
		}
		switch {
		case common == nil:
			common = found
		case common != found:
			return errors.New("can't merge directories from different upstreams")
		}
		// Present the directory to the upstream under its upstream path
		uDirs = append(uDirs, fs.NewOverrideDirectory(dir, uDir))
	}
	if mergeDirs := common.f.Features().MergeDirs; mergeDirs != nil {
		return mergeDirs(ctx, uDirs)
	}
	return fs.ErrorNotImplemented
}
// DirSetModTime sets the directory modtime for dir
//
// The modtime of the root of an upstream can't be set, so that case is
// skipped with a debug message and reported as success. It returns
// fs.ErrorNotImplemented if the upstream has no DirSetModTime.
func (f *Fs) DirSetModTime(ctx context.Context, dir string, modTime time.Time) error {
	u, uDir, err := f.findUpstream(dir)
	switch {
	case err != nil:
		return err
	case uDir == "":
		fs.Debugf(dir, "Can't set modtime on upstream root. skipping.")
		return nil
	}
	setModTime := u.f.Features().DirSetModTime
	if setModTime == nil {
		return fs.ErrorNotImplemented
	}
	return setModTime(ctx, uDir, modTime)
}
// CleanUp the trash in the Fs
//
// Implement this if you have a way of emptying the trash or
// otherwise cleaning up old versions of files.
//
// Every upstream which has a CleanUp is cleaned up, in parallel.
func (f *Fs) CleanUp(ctx context.Context) error {
	cleanUp := func(ctx context.Context, u *upstream) error {
		cleanUpFn := u.f.Features().CleanUp
		if cleanUpFn == nil {
			return nil
		}
		return cleanUpFn(ctx)
	}
	return f.multithread(ctx, cleanUp)
}
// OpenWriterAt opens with a handle for random access writes
//
// Pass in the remote desired and the size if known.
//
// It truncates any existing object
//
// The handle comes from the upstream which holds remote. It returns
// fs.ErrorNotImplemented if that upstream has no OpenWriterAt.
func (f *Fs) OpenWriterAt(ctx context.Context, remote string, size int64) (fs.WriterAtCloser, error) {
	u, uRemote, err := f.findUpstream(remote)
	if err != nil {
		return nil, err
	}
	if openWriterAt := u.f.Features().OpenWriterAt; openWriterAt != nil {
		return openWriterAt(ctx, uRemote, size)
	}
	return nil, fs.ErrorNotImplemented
}
// Object describes a wrapped Object
//
// This is a wrapped Object which knows its path prefix
type Object struct {
fs.Object // the upstream object being wrapped
u *upstream // the upstream the object came from
}
// newObject wraps the upstream object o as a combine Object belonging to
// this upstream.
func (u *upstream) newObject(o fs.Object) *Object {
	wrapped := new(Object)
	wrapped.Object = o
	wrapped.u = u
	return wrapped
}
// Fs returns read only access to the Fs that this object is part of
//
// This is the combine Fs, not the upstream Fs of the wrapped object.
func (o *Object) Fs() fs.Info {
return o.u.parent
}
// String returns the remote path, exactly as Remote reports it
func (o *Object) String() string {
return o.Remote()
}
// Remote returns the remote path
//
// The path is the wrapped object's String() translated into a combine
// path. If it can't be translated the problem is logged and the text of
// the error is returned in place of a path.
func (o *Object) Remote() string {
	combinePath, err := o.u.pathAdjustment.do(o.Object.String())
	if err == nil {
		return combinePath
	}
	fs.Errorf(o.Object, "Bad object: %v", err)
	return err.Error()
}
// MimeType returns the content type of the Object if known
//
// It is "" when the wrapped object is not an fs.MimeTyper.
func (o *Object) MimeType(ctx context.Context) (mimeType string) {
	typer, ok := o.Object.(fs.MimeTyper)
	if !ok {
		return mimeType
	}
	return typer.MimeType(ctx)
}
// UnWrap returns the Object that this Object is wrapping or
// nil if it isn't wrapping anything
//
// The value returned is the embedded upstream object.
func (o *Object) UnWrap() fs.Object {
return o.Object
}
// GetTier returns storage tier or class of the Object
//
// It is "" when the wrapped object is not an fs.GetTierer.
func (o *Object) GetTier() string {
	if tierer, ok := o.Object.(fs.GetTierer); ok {
		return tierer.GetTier()
	}
	return ""
}
// ID returns the ID of the Object if known, or "" if not
//
// It is "" when the wrapped object is not an fs.IDer.
func (o *Object) ID() string {
	if ider, ok := o.Object.(fs.IDer); ok {
		return ider.ID()
	}
	return ""
}
// Metadata returns metadata for an object
//
// It should return nil if there is no Metadata
//
// Nil metadata and a nil error are returned when the wrapped object is
// not an fs.Metadataer.
func (o *Object) Metadata(ctx context.Context) (fs.Metadata, error) {
	if metadataer, ok := o.Object.(fs.Metadataer); ok {
		return metadataer.Metadata(ctx)
	}
	return nil, nil
}
// SetTier performs changing storage tier of the Object if
// multiple storage classes supported
//
// An error is returned when the wrapped object is not an fs.SetTierer.
func (o *Object) SetTier(tier string) error {
	if tierer, ok := o.Object.(fs.SetTierer); ok {
		return tierer.SetTier(tier)
	}
	return errors.New("underlying remote does not support SetTier")
}
// Check the interfaces are satisfied
//
// Each line assigns a nil *Fs or *Object to a blank variable of the
// interface type so that a missing method is a compile time error.
var (
_ fs.Fs = (*Fs)(nil)
_ fs.Purger = (*Fs)(nil)
_ fs.PutStreamer = (*Fs)(nil)
_ fs.Copier = (*Fs)(nil)
_ fs.Mover = (*Fs)(nil)
_ fs.DirMover = (*Fs)(nil)
_ fs.DirCacheFlusher = (*Fs)(nil)
_ fs.ChangeNotifier = (*Fs)(nil)
_ fs.Abouter = (*Fs)(nil)
_ fs.ListRer = (*Fs)(nil)
_ fs.Shutdowner = (*Fs)(nil)
_ fs.PublicLinker = (*Fs)(nil)
_ fs.PutUncheckeder = (*Fs)(nil)
_ fs.MergeDirser = (*Fs)(nil)
_ fs.DirSetModTimer = (*Fs)(nil)
_ fs.MkdirMetadataer = (*Fs)(nil)
_ fs.CleanUpper = (*Fs)(nil)
_ fs.OpenWriterAter = (*Fs)(nil)
_ fs.FullObject = (*Object)(nil)
)