From 364fca5ceae3c7f613bcd53a29364e32d8f7f215 Mon Sep 17 00:00:00 2001 From: Nick Craig-Wood Date: Tue, 2 Oct 2018 22:04:50 +0100 Subject: [PATCH] union: implement optional interfaces (Move, DirMove, Copy etc) - fixes #2619 Implement optional interfaces - Purge - PutStream - Copy - Move - DirMove - DirCacheFlush - ChangeNotify - About Make Hashes() return the intersection of all the hashes supported by the remotes --- backend/union/union.go | 155 +++++++++++++++++++++++++++++++++++++++-- 1 file changed, 148 insertions(+), 7 deletions(-) diff --git a/backend/union/union.go b/backend/union/union.go index 5d614ce45..07002446c 100644 --- a/backend/union/union.go +++ b/backend/union/union.go @@ -42,6 +42,8 @@ type Fs struct { opt Options // options for this Fs root string // the path we are working on remotes []fs.Fs // slice of remotes + wr fs.Fs // writable remote + hashSet hash.Set // intersection of hash types } // Name of the remote (as passed into NewFs) @@ -66,18 +68,119 @@ func (f *Fs) Features() *fs.Features { // Rmdir removes the root directory of the Fs object func (f *Fs) Rmdir(dir string) error { - return f.remotes[len(f.remotes)-1].Rmdir(dir) + return f.wr.Rmdir(dir) } // Hashes returns hash.HashNone to indicate remote hashing is unavailable func (f *Fs) Hashes() hash.Set { - // This could probably be set if all remotes share the same hashing algorithm - return hash.Set(hash.None) + return f.hashSet } // Mkdir makes the root directory of the Fs object func (f *Fs) Mkdir(dir string) error { - return f.remotes[len(f.remotes)-1].Mkdir(dir) + return f.wr.Mkdir(dir) +} + +// Purge all files in the root and the root directory +// +// Implement this if you have a way of deleting all the files +// quicker than just running Remove() on the result of List() +// +// Return an error if it doesn't exist +func (f *Fs) Purge() error { + return f.wr.Features().Purge() +} + +// Copy src to this remote using server side copy operations. +// +// This is stored with the remote path given +// +// It returns the destination Object and a possible error +// +// Will only be called if src.Fs().Name() == f.Name() +// +// If it isn't possible then return fs.ErrorCantCopy +func (f *Fs) Copy(src fs.Object, remote string) (fs.Object, error) { + if src.Fs() != f.wr { + fs.Debugf(src, "Can't copy - not same remote type") + return nil, fs.ErrorCantCopy + } + return f.wr.Features().Copy(src, remote) +} + +// Move src to this remote using server side move operations. +// +// This is stored with the remote path given +// +// It returns the destination Object and a possible error +// +// Will only be called if src.Fs().Name() == f.Name() +// +// If it isn't possible then return fs.ErrorCantMove +func (f *Fs) Move(src fs.Object, remote string) (fs.Object, error) { + if src.Fs() != f.wr { + fs.Debugf(src, "Can't move - not same remote type") + return nil, fs.ErrorCantMove + } + return f.wr.Features().Move(src, remote) +} + +// DirMove moves src, srcRemote to this remote at dstRemote +// using server side move operations. +// +// Will only be called if src.Fs().Name() == f.Name() +// +// If it isn't possible then return fs.ErrorCantDirMove +// +// If destination exists then return fs.ErrorDirExists +func (f *Fs) DirMove(src fs.Fs, srcRemote, dstRemote string) error { + srcFs, ok := src.(*Fs) + if !ok { + fs.Debugf(srcFs, "Can't move directory - not same remote type") + return fs.ErrorCantDirMove + } + return f.wr.Features().DirMove(srcFs.wr, srcRemote, dstRemote) +} + +// ChangeNotify calls the passed function with a path +// that has had changes. If the implementation +// uses polling, it should adhere to the given interval. +// At least one value will be written to the channel, +// specifying the initial value and updated values might +// follow. A 0 Duration should pause the polling. +// The ChangeNotify implemantion must empty the channel +// regulary. When the channel gets closed, the implemantion +// should stop polling and release resources. +func (f *Fs) ChangeNotify(fn func(string, fs.EntryType), ch <-chan time.Duration) { + for _, remote := range f.remotes { + if ChangeNotify := remote.Features().ChangeNotify; ChangeNotify != nil { + ChangeNotify(fn, ch) + } + } +} + +// DirCacheFlush resets the directory cache - used in testing +// as an optional interface +func (f *Fs) DirCacheFlush() { + for _, remote := range f.remotes { + if DirCacheFlush := remote.Features().DirCacheFlush; DirCacheFlush != nil { + DirCacheFlush() + } + } +} + +// PutStream uploads to the remote path with the modTime given of indeterminate size +// +// May create the object even if it returns an error - if so +// will return the object and the error, otherwise will return +// nil and the error +func (f *Fs) PutStream(in io.Reader, src fs.ObjectInfo, options ...fs.OpenOption) (fs.Object, error) { + return f.wr.Features().PutStream(in, src, options...) +} + +// About gets quota information from the Fs +func (f *Fs) About() (*fs.Usage, error) { + return f.wr.Features().About() } // Put in to the remote path with the modTime given of the given size @@ -86,7 +189,7 @@ func (f *Fs) Mkdir(dir string) error { // will return the object and the error, otherwise will return // nil and the error func (f *Fs) Put(in io.Reader, src fs.ObjectInfo, options ...fs.OpenOption) (fs.Object, error) { - return f.remotes[len(f.remotes)-1].Put(in, src, options...) + return f.wr.Put(in, src, options...) } // List the objects and directories in dir into entries. The @@ -204,6 +307,7 @@ func NewFs(name, root string, m configmap.Mapper) (fs.Fs, error) { root: root, opt: *opt, remotes: remotes, + wr: remotes[len(remotes)-1], } var features = (&fs.Features{ CaseInsensitive: true, @@ -212,16 +316,53 @@ func NewFs(name, root string, m configmap.Mapper) (fs.Fs, error) { WriteMimeType: true, CanHaveEmptyDirectories: true, BucketBased: true, + SetTier: true, + GetTier: true, }).Fill(f) + features = features.Mask(f.wr) // mask the features just on the writable fs + + // FIXME maybe should be masking the bools here? + + // Clear ChangeNotify and DirCacheFlush if all are nil + clearChangeNotify := true + clearDirCacheFlush := true for _, remote := range f.remotes { - features = features.Mask(remote) + remoteFeatures := remote.Features() + if remoteFeatures.ChangeNotify != nil { + clearChangeNotify = false + } + if remoteFeatures.DirCacheFlush != nil { + clearDirCacheFlush = false + } } + if clearChangeNotify { + features.ChangeNotify = nil + } + if clearDirCacheFlush { + features.DirCacheFlush = nil + } + f.features = features + // Get common intersection of hashes + hashSet := f.remotes[0].Hashes() + for _, remote := range f.remotes[1:] { + hashSet = hashSet.Overlap(remote.Hashes()) + } + f.hashSet = hashSet + return f, nil } // Check the interfaces are satisfied var ( - _ fs.Fs = &Fs{} + _ fs.Fs = (*Fs)(nil) + _ fs.Purger = (*Fs)(nil) + _ fs.PutStreamer = (*Fs)(nil) + _ fs.Copier = (*Fs)(nil) + _ fs.Mover = (*Fs)(nil) + _ fs.DirMover = (*Fs)(nil) + _ fs.DirCacheFlusher = (*Fs)(nil) + _ fs.ChangeNotifier = (*Fs)(nil) + _ fs.Abouter = (*Fs)(nil) )