forked from TrueCloudLab/rclone
core: Implement Walk directory listing and use in place of Lister
This is in preparation for removing the Lister code and replacing the fundamental operation in the Fs with listing a single directory.
This commit is contained in:
parent
1e88f0702a
commit
7e20e16cff
10 changed files with 719 additions and 168 deletions
|
@ -20,6 +20,7 @@ import (
|
|||
_ "github.com/ncw/rclone/cmd/gendocs"
|
||||
_ "github.com/ncw/rclone/cmd/listremotes"
|
||||
_ "github.com/ncw/rclone/cmd/ls"
|
||||
_ "github.com/ncw/rclone/cmd/ls2"
|
||||
_ "github.com/ncw/rclone/cmd/lsd"
|
||||
_ "github.com/ncw/rclone/cmd/lsl"
|
||||
_ "github.com/ncw/rclone/cmd/md5sum"
|
||||
|
|
46
cmd/ls2/ls2.go
Normal file
46
cmd/ls2/ls2.go
Normal file
|
@ -0,0 +1,46 @@
|
|||
package ls2
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
|
||||
"github.com/ncw/rclone/cmd"
|
||||
"github.com/ncw/rclone/fs"
|
||||
"github.com/spf13/cobra"
|
||||
)
|
||||
|
||||
var (
|
||||
recurse bool
|
||||
)
|
||||
|
||||
func init() {
|
||||
cmd.Root.AddCommand(commandDefintion)
|
||||
commandDefintion.Flags().BoolVarP(&recurse, "recursive", "R", false, "Recurse into the listing.")
|
||||
}
|
||||
|
||||
var commandDefintion = &cobra.Command{
|
||||
Use: "ls2 remote:path",
|
||||
Short: `List directories and objects in the path.`,
|
||||
Hidden: true,
|
||||
Run: func(command *cobra.Command, args []string) {
|
||||
cmd.CheckArgs(1, 1, command, args)
|
||||
fsrc := cmd.NewFsSrc(args)
|
||||
cmd.Run(false, false, command, func() error {
|
||||
return fs.Walk(fsrc, "", false, fs.ConfigMaxDepth(recurse), func(path string, entries fs.DirEntries, err error) error {
|
||||
if err != nil {
|
||||
fs.Stats.Error()
|
||||
fs.Errorf(path, "error listing: %v", err)
|
||||
return nil
|
||||
}
|
||||
for _, entry := range entries {
|
||||
_, isDir := entry.(*fs.Dir)
|
||||
if isDir {
|
||||
fmt.Println(entry.Remote() + "/")
|
||||
} else {
|
||||
fmt.Println(entry.Remote())
|
||||
}
|
||||
}
|
||||
return nil
|
||||
})
|
||||
})
|
||||
},
|
||||
}
|
|
@ -240,7 +240,7 @@ func (r *Run) readLocal(t *testing.T, dir dirMap, filepath string) {
|
|||
|
||||
// reads the remote tree into dir
|
||||
func (r *Run) readRemote(t *testing.T, dir dirMap, filepath string) {
|
||||
objs, dirs, err := fs.NewLister().SetLevel(1).Start(r.fremote, filepath).GetAll()
|
||||
objs, dirs, err := fs.WalkGetAll(r.fremote, filepath, true, 1)
|
||||
if err == fs.ErrorDirNotFound {
|
||||
return
|
||||
}
|
||||
|
|
254
fs/operations.go
254
fs/operations.go
|
@ -516,32 +516,12 @@ func DeleteFiles(toBeDeleted ObjectsChan) error {
|
|||
// Each object is passed ito the function provided. If that returns
|
||||
// an error then the listing will be aborted and that error returned.
|
||||
func readFilesFn(fs Fs, includeAll bool, dir string, add func(Object) error) (err error) {
|
||||
list := NewLister()
|
||||
if !includeAll {
|
||||
list.SetFilter(Config.Filter)
|
||||
list.SetLevel(Config.MaxDepth)
|
||||
}
|
||||
list.Start(fs, dir)
|
||||
for {
|
||||
o, err := list.GetObject()
|
||||
return Walk(fs, "", includeAll, Config.MaxDepth, func(dirPath string, entries DirEntries, err error) error {
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
// Check if we are finished
|
||||
if o == nil {
|
||||
break
|
||||
}
|
||||
// Make sure we don't delete excluded files if not required
|
||||
if includeAll || Config.Filter.IncludeObject(o) {
|
||||
err = add(o)
|
||||
if err != nil {
|
||||
list.SetError(err)
|
||||
}
|
||||
} else {
|
||||
Debugf(o, "Excluded from sync (and deletion)")
|
||||
}
|
||||
}
|
||||
return list.Error()
|
||||
return entries.ForObjectError(add)
|
||||
})
|
||||
}
|
||||
|
||||
// DirEntries is a slice of Object or *Dir
|
||||
|
@ -562,6 +542,54 @@ func (ds DirEntries) Less(i, j int) bool {
|
|||
return ds[i].Remote() < ds[j].Remote()
|
||||
}
|
||||
|
||||
// ForObject runs the function supplied on every object in the entries
|
||||
func (ds DirEntries) ForObject(fn func(o Object)) {
|
||||
for _, entry := range ds {
|
||||
o, ok := entry.(Object)
|
||||
if ok {
|
||||
fn(o)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// ForObjectError runs the function supplied on every object in the entries
|
||||
func (ds DirEntries) ForObjectError(fn func(o Object) error) error {
|
||||
for _, entry := range ds {
|
||||
o, ok := entry.(Object)
|
||||
if ok {
|
||||
err := fn(o)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// ForDir runs the function supplied on every object in the entries
|
||||
func (ds DirEntries) ForDir(fn func(dir *Dir)) {
|
||||
for _, entry := range ds {
|
||||
dir, ok := entry.(*Dir)
|
||||
if ok {
|
||||
fn(dir)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// ForDirError runs the function supplied on every object in the entries
|
||||
func (ds DirEntries) ForDirError(fn func(dir *Dir) error) error {
|
||||
for _, entry := range ds {
|
||||
dir, ok := entry.(*Dir)
|
||||
if ok {
|
||||
err := fn(dir)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// ListDirSorted reads Object and *Dir into entries for the given Fs.
|
||||
//
|
||||
// dir is the start directory, "" for root
|
||||
|
@ -907,32 +935,14 @@ func CheckDownload(fdst, fsrc Fs) error {
|
|||
//
|
||||
// Lists in parallel which may get them out of order
|
||||
func ListFn(f Fs, fn func(Object)) error {
|
||||
list := NewLister().SetFilter(Config.Filter).SetLevel(Config.MaxDepth).Start(f, "")
|
||||
var wg sync.WaitGroup
|
||||
wg.Add(Config.Checkers)
|
||||
for i := 0; i < Config.Checkers; i++ {
|
||||
go func() {
|
||||
defer wg.Done()
|
||||
for {
|
||||
o, err := list.GetObject()
|
||||
if err != nil {
|
||||
// The error will be persisted within the Lister object and
|
||||
// we'll get an opportunity to return it as we leave this
|
||||
// function.
|
||||
return
|
||||
}
|
||||
// check if we are finished
|
||||
if o == nil {
|
||||
return
|
||||
}
|
||||
if Config.Filter.IncludeObject(o) {
|
||||
fn(o)
|
||||
}
|
||||
}
|
||||
}()
|
||||
}
|
||||
wg.Wait()
|
||||
return list.Error()
|
||||
return Walk(f, "", false, Config.MaxDepth, func(dirPath string, entries DirEntries, err error) error {
|
||||
if err != nil {
|
||||
// FIXME count errors and carry on for listing
|
||||
return err
|
||||
}
|
||||
entries.ForObject(fn)
|
||||
return nil
|
||||
})
|
||||
}
|
||||
|
||||
// mutex for synchronized output
|
||||
|
@ -1026,24 +1036,29 @@ func Count(f Fs) (objects int64, size int64, err error) {
|
|||
return
|
||||
}
|
||||
|
||||
// ConfigMaxDepth returns the depth to use for a recursive or non recursive listing.
|
||||
func ConfigMaxDepth(recursive bool) int {
|
||||
depth := Config.MaxDepth
|
||||
if !recursive && depth < 0 {
|
||||
depth = 1
|
||||
}
|
||||
return depth
|
||||
}
|
||||
|
||||
// ListDir lists the directories/buckets/containers in the Fs to the supplied writer
|
||||
func ListDir(f Fs, w io.Writer) error {
|
||||
level := 1
|
||||
if Config.MaxDepth > 0 {
|
||||
level = Config.MaxDepth
|
||||
}
|
||||
list := NewLister().SetFilter(Config.Filter).SetLevel(level).Start(f, "")
|
||||
for {
|
||||
dir, err := list.GetDir()
|
||||
return Walk(f, "", false, ConfigMaxDepth(false), func(dirPath string, entries DirEntries, err error) error {
|
||||
if err != nil {
|
||||
log.Fatal(err)
|
||||
// FIXME count errors and carry on for listing
|
||||
return err
|
||||
}
|
||||
if dir == nil {
|
||||
break
|
||||
}
|
||||
syncFprintf(w, "%12d %13s %9d %s\n", dir.Bytes, dir.When.Format("2006-01-02 15:04:05"), dir.Count, dir.Name)
|
||||
}
|
||||
return nil
|
||||
entries.ForDir(func(dir *Dir) {
|
||||
if dir != nil {
|
||||
syncFprintf(w, "%12d %13s %9d %s\n", dir.Bytes, dir.When.Format("2006-01-02 15:04:05"), dir.Count, dir.Name)
|
||||
}
|
||||
})
|
||||
return nil
|
||||
})
|
||||
}
|
||||
|
||||
// Mkdir makes a destination directory or container
|
||||
|
@ -1101,8 +1116,7 @@ func Purge(f Fs) error {
|
|||
}
|
||||
if doFallbackPurge {
|
||||
// DeleteFiles and Rmdir observe --dry-run
|
||||
list := NewLister().Start(f, "")
|
||||
err = DeleteFiles(listToChan(list))
|
||||
err = DeleteFiles(listToChan(f))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
@ -1290,18 +1304,18 @@ var _ pflag.Value = (*DeduplicateMode)(nil)
|
|||
func Deduplicate(f Fs, mode DeduplicateMode) error {
|
||||
Infof(f, "Looking for duplicates using %v mode.", mode)
|
||||
files := map[string][]Object{}
|
||||
list := NewLister().Start(f, "")
|
||||
for {
|
||||
o, err := list.GetObject()
|
||||
err := Walk(f, "", true, Config.MaxDepth, func(dirPath string, entries DirEntries, err error) error {
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
// Check if we are finished
|
||||
if o == nil {
|
||||
break
|
||||
}
|
||||
remote := o.Remote()
|
||||
files[remote] = append(files[remote], o)
|
||||
entries.ForObject(func(o Object) {
|
||||
remote := o.Remote()
|
||||
files[remote] = append(files[remote], o)
|
||||
})
|
||||
return nil
|
||||
})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
for remote, objs := range files {
|
||||
if len(objs) > 1 {
|
||||
|
@ -1334,33 +1348,30 @@ func Deduplicate(f Fs, mode DeduplicateMode) error {
|
|||
return nil
|
||||
}
|
||||
|
||||
// listToChan will transfer all incoming objects to a new channel.
|
||||
// listToChan will transfer all objects in the listing to the output
|
||||
//
|
||||
// If an error occurs, the error will be logged, and it will close the
|
||||
// channel.
|
||||
//
|
||||
// If the error was ErrorDirNotFound then it will be ignored
|
||||
func listToChan(list *Lister) ObjectsChan {
|
||||
func listToChan(f Fs) ObjectsChan {
|
||||
o := make(ObjectsChan, Config.Checkers)
|
||||
go func() {
|
||||
defer close(o)
|
||||
for {
|
||||
obj, dir, err := list.Get()
|
||||
_ = Walk(f, "", true, Config.MaxDepth, func(dirPath string, entries DirEntries, err error) error {
|
||||
if err != nil {
|
||||
if err != ErrorDirNotFound {
|
||||
Stats.Error()
|
||||
Errorf(nil, "Failed to list: %v", err)
|
||||
if err == ErrorDirNotFound {
|
||||
return nil
|
||||
}
|
||||
return
|
||||
Stats.Error()
|
||||
Errorf(nil, "Failed to list: %v", err)
|
||||
return nil
|
||||
}
|
||||
if dir == nil && obj == nil {
|
||||
return
|
||||
}
|
||||
if obj == nil {
|
||||
continue
|
||||
}
|
||||
o <- obj
|
||||
}
|
||||
entries.ForObject(func(obj Object) {
|
||||
o <- obj
|
||||
})
|
||||
return nil
|
||||
})
|
||||
}()
|
||||
return o
|
||||
}
|
||||
|
@ -1451,41 +1462,44 @@ func Cat(f Fs, w io.Writer, offset, count int64) error {
|
|||
// Rmdirs removes any empty directories (or directories only
|
||||
// containing empty directories) under f, including f.
|
||||
func Rmdirs(f Fs, dir string) error {
|
||||
list := NewLister().Start(f, dir)
|
||||
dirEmpty := make(map[string]bool)
|
||||
dirEmpty[""] = true
|
||||
for {
|
||||
o, dir, err := list.Get()
|
||||
err := Walk(f, dir, true, Config.MaxDepth, func(dirPath string, entries DirEntries, err error) error {
|
||||
if err != nil {
|
||||
Stats.Error()
|
||||
Errorf(f, "Failed to list: %v", err)
|
||||
return err
|
||||
} else if dir != nil {
|
||||
// add a new directory as empty
|
||||
dir := dir.Name
|
||||
_, found := dirEmpty[dir]
|
||||
if !found {
|
||||
dirEmpty[dir] = true
|
||||
}
|
||||
} else if o != nil {
|
||||
// mark the parents of the file as being non-empty
|
||||
dir := o.Remote()
|
||||
for dir != "" {
|
||||
dir = path.Dir(dir)
|
||||
if dir == "." || dir == "/" {
|
||||
dir = ""
|
||||
}
|
||||
empty, found := dirEmpty[dir]
|
||||
// End if we reach a directory which is non-empty
|
||||
if found && !empty {
|
||||
break
|
||||
}
|
||||
dirEmpty[dir] = false
|
||||
}
|
||||
} else {
|
||||
// finished as dir == nil && o == nil
|
||||
break
|
||||
Errorf(f, "Failed to list %q: %v", dirPath, err)
|
||||
return nil
|
||||
}
|
||||
for _, entry := range entries {
|
||||
switch x := entry.(type) {
|
||||
case *Dir:
|
||||
// add a new directory as empty
|
||||
dir := x.Name
|
||||
_, found := dirEmpty[dir]
|
||||
if !found {
|
||||
dirEmpty[dir] = true
|
||||
}
|
||||
case Object:
|
||||
// mark the parents of the file as being non-empty
|
||||
dir := x.Remote()
|
||||
for dir != "" {
|
||||
dir = path.Dir(dir)
|
||||
if dir == "." || dir == "/" {
|
||||
dir = ""
|
||||
}
|
||||
empty, found := dirEmpty[dir]
|
||||
// End if we reach a directory which is non-empty
|
||||
if found && !empty {
|
||||
break
|
||||
}
|
||||
dirEmpty[dir] = false
|
||||
}
|
||||
}
|
||||
}
|
||||
return nil
|
||||
})
|
||||
if err != nil {
|
||||
return errors.Wrap(err, "failed to rmdirs")
|
||||
}
|
||||
// Now delete the empty directories, starting from the longest path
|
||||
var toDelete []string
|
||||
|
|
|
@ -156,25 +156,26 @@ func NewRun(t *testing.T) *Run {
|
|||
*r = *oneRun
|
||||
r.cleanRemote = func() {
|
||||
var toDelete dirsToRemove
|
||||
list := fs.NewLister().Start(r.fremote, "")
|
||||
for {
|
||||
o, dir, err := list.Get()
|
||||
require.NoError(t, fs.Walk(r.fremote, "", true, -1, func(dirPath string, entries fs.DirEntries, err error) error {
|
||||
if err != nil {
|
||||
if err == fs.ErrorDirNotFound {
|
||||
break
|
||||
return nil
|
||||
}
|
||||
t.Fatalf("Error listing: %v", err)
|
||||
} else if o != nil {
|
||||
err = o.Remove()
|
||||
if err != nil {
|
||||
t.Errorf("Error removing file %q: %v", o.Remote(), err)
|
||||
}
|
||||
} else if dir != nil {
|
||||
toDelete = append(toDelete, dir.Remote())
|
||||
} else {
|
||||
break
|
||||
}
|
||||
}
|
||||
for _, entry := range entries {
|
||||
switch x := entry.(type) {
|
||||
case fs.Object:
|
||||
err = x.Remove()
|
||||
if err != nil {
|
||||
t.Errorf("Error removing file %q: %v", x.Remote(), err)
|
||||
}
|
||||
case *fs.Dir:
|
||||
toDelete = append(toDelete, x.Remote())
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}))
|
||||
sort.Sort(toDelete)
|
||||
for _, dir := range toDelete {
|
||||
err := r.fremote.Rmdir(dir)
|
||||
|
@ -666,25 +667,24 @@ func TestDeduplicateRename(t *testing.T) {
|
|||
err := fs.Deduplicate(r.fremote, fs.DeduplicateRename)
|
||||
require.NoError(t, err)
|
||||
|
||||
list := fs.NewLister().Start(r.fremote, "")
|
||||
for {
|
||||
o, err := list.GetObject()
|
||||
require.NoError(t, err)
|
||||
// Check if we are finished
|
||||
if o == nil {
|
||||
break
|
||||
require.NoError(t, fs.Walk(r.fremote, "", true, -1, func(dirPath string, entries fs.DirEntries, err error) error {
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
remote := o.Remote()
|
||||
if remote != "one-1.txt" &&
|
||||
remote != "one-2.txt" &&
|
||||
remote != "one-3.txt" {
|
||||
t.Errorf("Bad file name after rename %q", remote)
|
||||
}
|
||||
size := o.Size()
|
||||
if size != file1.Size && size != file2.Size && size != file3.Size {
|
||||
t.Errorf("Size not one of the object sizes %d", size)
|
||||
}
|
||||
}
|
||||
entries.ForObject(func(o fs.Object) {
|
||||
remote := o.Remote()
|
||||
if remote != "one-1.txt" &&
|
||||
remote != "one-2.txt" &&
|
||||
remote != "one-3.txt" {
|
||||
t.Errorf("Bad file name after rename %q", remote)
|
||||
}
|
||||
size := o.Size()
|
||||
if size != file1.Size && size != file2.Size && size != file3.Size {
|
||||
t.Errorf("Size not one of the object sizes %d", size)
|
||||
}
|
||||
})
|
||||
return nil
|
||||
}))
|
||||
}
|
||||
|
||||
func TestCat(t *testing.T) {
|
||||
|
|
|
@ -69,6 +69,7 @@ func newTest(remote string, subdir bool) *test {
|
|||
}
|
||||
if *verbose {
|
||||
t.cmdLine = append(t.cmdLine, "-test.v")
|
||||
fs.Config.LogLevel = fs.LogLevelDebug
|
||||
}
|
||||
if *runOnly != "" {
|
||||
t.cmdLine = append(t.cmdLine, "-test.run", *runOnly)
|
||||
|
@ -138,24 +139,21 @@ func (t *test) cleanFs() error {
|
|||
if err != nil {
|
||||
return err
|
||||
}
|
||||
dirs, err := fs.NewLister().SetLevel(1).Start(f, "").GetDirs()
|
||||
entries, err := fs.ListDirSorted(f, true, "")
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
for _, dir := range dirs {
|
||||
return entries.ForDirError(func(dir *fs.Dir) error {
|
||||
if fstest.MatchTestRemote.MatchString(dir.Name) {
|
||||
log.Printf("Purging %s%s", t.remote, dir.Name)
|
||||
dir, err := fs.NewFs(t.remote + dir.Name)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
err = fs.Purge(dir)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
return fs.Purge(dir)
|
||||
}
|
||||
}
|
||||
return nil
|
||||
return nil
|
||||
})
|
||||
}
|
||||
|
||||
// clean runs a single clean on a fs for left over directories
|
||||
|
|
159
fs/walk.go
Normal file
159
fs/walk.go
Normal file
|
@ -0,0 +1,159 @@
|
|||
// Walking directories
|
||||
|
||||
package fs
|
||||
|
||||
import (
|
||||
"sync"
|
||||
|
||||
"github.com/pkg/errors"
|
||||
)
|
||||
|
||||
// ErrorSkipDir is used as a return value from Walk to indicate that the
|
||||
// directory named in the call is to be skipped. It is not returned as
|
||||
// an error by any function.
|
||||
var ErrorSkipDir = errors.New("skip this directory")
|
||||
|
||||
// WalkFunc is the type of the function called for directory
|
||||
// visited by Walk. The path argument contains remote path to the directory.
|
||||
//
|
||||
// If there was a problem walking to directory named by path, the
|
||||
// incoming error will describe the problem and the function can
|
||||
// decide how to handle that error (and Walk will not descend into
|
||||
// that directory). If an error is returned, processing stops. The
|
||||
// sole exception is when the function returns the special value
|
||||
// ErrorSkipDir. If the function returns ErrorSkipDir, Walk skips the
|
||||
// directory's contents entirely.
|
||||
type WalkFunc func(path string, entries DirEntries, err error) error
|
||||
|
||||
// Walk lists the directory.
|
||||
//
|
||||
// If includeAll is not set it will use the filters defined.
|
||||
//
|
||||
// If maxLevel is < 0 then it will recurse indefinitely, else it will
|
||||
// only do maxLevel levels.
|
||||
//
|
||||
// It calls fn for each tranche of DirEntries read.
|
||||
//
|
||||
// Note that fn will not be called concurrently whereas the directory
|
||||
// listing will proceed concurrently.
|
||||
//
|
||||
// Parent directories are always listed before their children
|
||||
//
|
||||
// NB (f, path) to be replaced by fs.Dir at some point
|
||||
func Walk(f Fs, path string, includeAll bool, maxLevel int, fn WalkFunc) error {
|
||||
return walk(f, path, includeAll, maxLevel, fn, ListDirSorted)
|
||||
}
|
||||
|
||||
type listDirFunc func(fs Fs, includeAll bool, dir string) (entries DirEntries, err error)
|
||||
|
||||
func walk(f Fs, path string, includeAll bool, maxLevel int, fn WalkFunc, listDir listDirFunc) error {
|
||||
var (
|
||||
wg sync.WaitGroup // sync closing of go routines
|
||||
traversing sync.WaitGroup // running directory traversals
|
||||
doClose sync.Once // close the channel once
|
||||
mu sync.Mutex // stop fn being called concurrently
|
||||
)
|
||||
// listJob describe a directory listing that needs to be done
|
||||
type listJob struct {
|
||||
remote string
|
||||
depth int
|
||||
}
|
||||
|
||||
in := make(chan listJob, Config.Checkers)
|
||||
errs := make(chan error, 1)
|
||||
quit := make(chan struct{})
|
||||
closeQuit := func() {
|
||||
doClose.Do(func() {
|
||||
close(quit)
|
||||
go func() {
|
||||
for _ = range in {
|
||||
traversing.Done()
|
||||
}
|
||||
}()
|
||||
})
|
||||
}
|
||||
for i := 0; i < Config.Checkers; i++ {
|
||||
wg.Add(1)
|
||||
go func() {
|
||||
defer wg.Done()
|
||||
for {
|
||||
select {
|
||||
case job, ok := <-in:
|
||||
if !ok {
|
||||
return
|
||||
}
|
||||
entries, err := listDir(f, includeAll, job.remote)
|
||||
mu.Lock()
|
||||
err = fn(job.remote, entries, err)
|
||||
mu.Unlock()
|
||||
if err != nil && err != ErrorSkipDir {
|
||||
traversing.Done()
|
||||
Stats.Error()
|
||||
Errorf(job.remote, "error listing: %v", err)
|
||||
closeQuit()
|
||||
// Send error to error channel if space
|
||||
select {
|
||||
case errs <- err:
|
||||
default:
|
||||
}
|
||||
continue
|
||||
}
|
||||
var jobs []listJob
|
||||
if job.depth != 0 && err == nil {
|
||||
entries.ForDir(func(dir *Dir) {
|
||||
// Recurse for the directory
|
||||
jobs = append(jobs, listJob{
|
||||
remote: dir.Remote(),
|
||||
depth: job.depth - 1,
|
||||
})
|
||||
})
|
||||
}
|
||||
if len(jobs) > 0 {
|
||||
traversing.Add(len(jobs))
|
||||
go func() {
|
||||
// Now we have traversed this directory, send these
|
||||
// jobs off for traversal in the background
|
||||
for _, newJob := range jobs {
|
||||
in <- newJob
|
||||
}
|
||||
}()
|
||||
}
|
||||
traversing.Done()
|
||||
case <-quit:
|
||||
return
|
||||
}
|
||||
}
|
||||
}()
|
||||
}
|
||||
// Start the process
|
||||
traversing.Add(1)
|
||||
in <- listJob{
|
||||
remote: path,
|
||||
depth: maxLevel - 1,
|
||||
}
|
||||
traversing.Wait()
|
||||
close(in)
|
||||
wg.Wait()
|
||||
close(errs)
|
||||
// return the first error returned or nil
|
||||
return <-errs
|
||||
}
|
||||
|
||||
// WalkGetAll runs Walk getting all the results
|
||||
func WalkGetAll(f Fs, path string, includeAll bool, maxLevel int) (objs []Object, dirs []*Dir, err error) {
|
||||
err = Walk(f, path, includeAll, maxLevel, func(dirPath string, entries DirEntries, err error) error {
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
for _, entry := range entries {
|
||||
switch x := entry.(type) {
|
||||
case Object:
|
||||
objs = append(objs, x)
|
||||
case *Dir:
|
||||
dirs = append(dirs, x)
|
||||
}
|
||||
}
|
||||
return nil
|
||||
})
|
||||
return
|
||||
}
|
333
fs/walk_test.go
Normal file
333
fs/walk_test.go
Normal file
|
@ -0,0 +1,333 @@
|
|||
package fs
|
||||
|
||||
import (
|
||||
"sync"
|
||||
"testing"
|
||||
|
||||
"github.com/pkg/errors"
|
||||
"github.com/stretchr/testify/assert"
|
||||
)
|
||||
|
||||
type (
|
||||
listResult struct {
|
||||
entries DirEntries
|
||||
err error
|
||||
}
|
||||
|
||||
listResults map[string]listResult
|
||||
|
||||
errorMap map[string]error
|
||||
|
||||
listDirs struct {
|
||||
mu sync.Mutex
|
||||
t *testing.T
|
||||
fs Fs
|
||||
includeAll bool
|
||||
results listResults
|
||||
walkResults listResults
|
||||
walkErrors errorMap
|
||||
finalError error
|
||||
checkMaps bool
|
||||
maxLevel int
|
||||
}
|
||||
)
|
||||
|
||||
func newListDirs(t *testing.T, f Fs, includeAll bool, results listResults, walkErrors errorMap, finalError error) *listDirs {
|
||||
return &listDirs{
|
||||
t: t,
|
||||
fs: f,
|
||||
includeAll: includeAll,
|
||||
results: results,
|
||||
walkErrors: walkErrors,
|
||||
walkResults: listResults{},
|
||||
finalError: finalError,
|
||||
checkMaps: true,
|
||||
maxLevel: -1,
|
||||
}
|
||||
}
|
||||
|
||||
// NoCheckMaps marks the maps as to be ignored at the end
|
||||
func (ls *listDirs) NoCheckMaps() *listDirs {
|
||||
ls.checkMaps = false
|
||||
return ls
|
||||
}
|
||||
|
||||
// SetLevel(1) turns off recursion
|
||||
func (ls *listDirs) SetLevel(maxLevel int) *listDirs {
|
||||
ls.maxLevel = maxLevel
|
||||
return ls
|
||||
}
|
||||
|
||||
// ListDir returns the expected listing for the directory
|
||||
func (ls *listDirs) ListDir(f Fs, includeAll bool, dir string) (entries DirEntries, err error) {
|
||||
ls.mu.Lock()
|
||||
defer ls.mu.Unlock()
|
||||
assert.Equal(ls.t, ls.fs, f)
|
||||
assert.Equal(ls.t, ls.includeAll, includeAll)
|
||||
|
||||
// Fetch results for this path
|
||||
result, ok := ls.results[dir]
|
||||
if !ok {
|
||||
ls.t.Errorf("Unexpected list of %q", dir)
|
||||
return nil, errors.New("unexpected list")
|
||||
}
|
||||
delete(ls.results, dir)
|
||||
|
||||
// Put expected results for call of WalkFn
|
||||
ls.walkResults[dir] = result
|
||||
|
||||
return result.entries, result.err
|
||||
}
|
||||
|
||||
// IsFinished checks everything expected was used up
|
||||
func (ls *listDirs) IsFinished() {
|
||||
if ls.checkMaps {
|
||||
assert.Equal(ls.t, errorMap{}, ls.walkErrors)
|
||||
assert.Equal(ls.t, listResults{}, ls.results)
|
||||
assert.Equal(ls.t, listResults{}, ls.walkResults)
|
||||
}
|
||||
}
|
||||
|
||||
// WalkFn is called by the walk to test the expectations
|
||||
func (ls *listDirs) WalkFn(dir string, entries DirEntries, err error) error {
|
||||
ls.mu.Lock()
|
||||
defer ls.mu.Unlock()
|
||||
|
||||
// Fetch expected entries and err
|
||||
result, ok := ls.walkResults[dir]
|
||||
if !ok {
|
||||
ls.t.Errorf("Unexpected walk of %q (result not found)", dir)
|
||||
return errors.New("result not found")
|
||||
}
|
||||
delete(ls.walkResults, dir)
|
||||
|
||||
// Check arguments are as expected
|
||||
assert.Equal(ls.t, result.entries, entries)
|
||||
assert.Equal(ls.t, result.err, err)
|
||||
|
||||
// Fetch return value
|
||||
returnErr, ok := ls.walkErrors[dir]
|
||||
if !ok {
|
||||
ls.t.Errorf("Unexpected walk of %q (error not found)", dir)
|
||||
return errors.New("error not found")
|
||||
}
|
||||
delete(ls.walkErrors, dir)
|
||||
|
||||
return returnErr
|
||||
}
|
||||
|
||||
// Walk does the walk and tests the expectations
|
||||
func (ls *listDirs) Walk() {
|
||||
err := walk(nil, "", ls.includeAll, ls.maxLevel, ls.WalkFn, ls.ListDir)
|
||||
assert.Equal(ls.t, ls.finalError, err)
|
||||
ls.IsFinished()
|
||||
}
|
||||
|
||||
func newDir(name string) *Dir {
|
||||
return &Dir{Name: name}
|
||||
}
|
||||
|
||||
func TestWalkEmpty(t *testing.T) {
|
||||
newListDirs(t, nil, false,
|
||||
listResults{
|
||||
"": {entries: DirEntries{}, err: nil},
|
||||
},
|
||||
errorMap{
|
||||
"": nil,
|
||||
},
|
||||
nil,
|
||||
).Walk()
|
||||
}
|
||||
|
||||
func TestWalkEmptySkip(t *testing.T) {
|
||||
newListDirs(t, nil, true,
|
||||
listResults{
|
||||
"": {entries: DirEntries{}, err: nil},
|
||||
},
|
||||
errorMap{
|
||||
"": ErrorSkipDir,
|
||||
},
|
||||
nil,
|
||||
).Walk()
|
||||
}
|
||||
|
||||
func TestWalkNotFound(t *testing.T) {
|
||||
newListDirs(t, nil, true,
|
||||
listResults{
|
||||
"": {err: ErrorDirNotFound},
|
||||
},
|
||||
errorMap{
|
||||
"": ErrorDirNotFound,
|
||||
},
|
||||
ErrorDirNotFound,
|
||||
).Walk()
|
||||
}
|
||||
|
||||
func TestWalkNotFoundMaskError(t *testing.T) {
|
||||
newListDirs(t, nil, true,
|
||||
listResults{
|
||||
"": {err: ErrorDirNotFound},
|
||||
},
|
||||
errorMap{
|
||||
"": nil,
|
||||
},
|
||||
nil,
|
||||
).Walk()
|
||||
}
|
||||
|
||||
func TestWalkNotFoundSkipkError(t *testing.T) {
|
||||
newListDirs(t, nil, true,
|
||||
listResults{
|
||||
"": {err: ErrorDirNotFound},
|
||||
},
|
||||
errorMap{
|
||||
"": ErrorSkipDir,
|
||||
},
|
||||
nil,
|
||||
).Walk()
|
||||
}
|
||||
|
||||
func testWalkLevels(t *testing.T, maxLevel int) {
|
||||
da := newDir("a")
|
||||
db := newDir("a/b")
|
||||
dc := newDir("a/b/c")
|
||||
dd := newDir("a/b/c/d")
|
||||
newListDirs(t, nil, false,
|
||||
listResults{
|
||||
"": {entries: DirEntries{da}, err: nil},
|
||||
"a": {entries: DirEntries{db}, err: nil},
|
||||
"a/b": {entries: DirEntries{dc}, err: nil},
|
||||
"a/b/c": {entries: DirEntries{dd}, err: nil},
|
||||
"a/b/c/d": {entries: DirEntries{}, err: nil},
|
||||
},
|
||||
errorMap{
|
||||
"": nil,
|
||||
"a": nil,
|
||||
"a/b": nil,
|
||||
"a/b/c": nil,
|
||||
"a/b/c/d": nil,
|
||||
},
|
||||
nil,
|
||||
).SetLevel(maxLevel).Walk()
|
||||
}
|
||||
|
||||
func TestWalkLevels(t *testing.T) {
|
||||
testWalkLevels(t, -1)
|
||||
}
|
||||
|
||||
func TestWalkLevelsNoRecursive10(t *testing.T) {
|
||||
testWalkLevels(t, 10)
|
||||
}
|
||||
|
||||
func TestWalkLevelsNoRecursive(t *testing.T) {
|
||||
da := newDir("a")
|
||||
newListDirs(t, nil, false,
|
||||
listResults{
|
||||
"": {entries: DirEntries{da}, err: nil},
|
||||
},
|
||||
errorMap{
|
||||
"": nil,
|
||||
},
|
||||
nil,
|
||||
).SetLevel(1).Walk()
|
||||
}
|
||||
|
||||
func TestWalkLevels2(t *testing.T) {
|
||||
da := newDir("a")
|
||||
db := newDir("a/b")
|
||||
newListDirs(t, nil, false,
|
||||
listResults{
|
||||
"": {entries: DirEntries{da}, err: nil},
|
||||
"a": {entries: DirEntries{db}, err: nil},
|
||||
},
|
||||
errorMap{
|
||||
"": nil,
|
||||
"a": nil,
|
||||
},
|
||||
nil,
|
||||
).SetLevel(2).Walk()
|
||||
}
|
||||
|
||||
func TestWalkSkip(t *testing.T) {
|
||||
da := newDir("a")
|
||||
db := newDir("a/b")
|
||||
dc := newDir("a/b/c")
|
||||
newListDirs(t, nil, false,
|
||||
listResults{
|
||||
"": {entries: DirEntries{da}, err: nil},
|
||||
"a": {entries: DirEntries{db}, err: nil},
|
||||
"a/b": {entries: DirEntries{dc}, err: nil},
|
||||
},
|
||||
errorMap{
|
||||
"": nil,
|
||||
"a": nil,
|
||||
"a/b": ErrorSkipDir,
|
||||
},
|
||||
nil,
|
||||
).Walk()
|
||||
}
|
||||
|
||||
func TestWalkErrors(t *testing.T) {
|
||||
lr := listResults{}
|
||||
em := errorMap{}
|
||||
de := make(DirEntries, 10)
|
||||
for i := range de {
|
||||
path := string('0' + i)
|
||||
de[i] = newDir(path)
|
||||
lr[path] = listResult{entries: nil, err: ErrorDirNotFound}
|
||||
em[path] = ErrorDirNotFound
|
||||
}
|
||||
lr[""] = listResult{entries: de, err: nil}
|
||||
em[""] = nil
|
||||
newListDirs(t, nil, true,
|
||||
lr,
|
||||
em,
|
||||
ErrorDirNotFound,
|
||||
).NoCheckMaps().Walk()
|
||||
}
|
||||
|
||||
var errorBoom = errors.New("boom")
|
||||
|
||||
func makeTree(level int, terminalErrors bool) (listResults, errorMap) {
|
||||
lr := listResults{}
|
||||
em := errorMap{}
|
||||
var fill func(path string, level int)
|
||||
fill = func(path string, level int) {
|
||||
de := DirEntries{}
|
||||
if level > 0 {
|
||||
for _, a := range "0123456789" {
|
||||
subPath := string(a)
|
||||
if path != "" {
|
||||
subPath = path + "/" + subPath
|
||||
}
|
||||
de = append(de, newDir(subPath))
|
||||
fill(subPath, level-1)
|
||||
}
|
||||
}
|
||||
lr[path] = listResult{entries: de, err: nil}
|
||||
em[path] = nil
|
||||
if level == 0 && terminalErrors {
|
||||
em[path] = errorBoom
|
||||
}
|
||||
}
|
||||
fill("", level)
|
||||
return lr, em
|
||||
}
|
||||
|
||||
func TestWalkMulti(t *testing.T) {
|
||||
lr, em := makeTree(3, false)
|
||||
newListDirs(t, nil, true,
|
||||
lr,
|
||||
em,
|
||||
nil,
|
||||
).Walk()
|
||||
}
|
||||
|
||||
func TestWalkMultiErrors(t *testing.T) {
|
||||
lr, em := makeTree(3, true)
|
||||
newListDirs(t, nil, true,
|
||||
lr,
|
||||
em,
|
||||
errorBoom,
|
||||
).NoCheckMaps().Walk()
|
||||
}
|
|
@ -192,7 +192,7 @@ func CheckListingWithPrecision(t *testing.T, f fs.Fs, items []Item, expectedDirs
|
|||
gotListing := "<unset>"
|
||||
listingOK := false
|
||||
for i := 1; i <= retries; i++ {
|
||||
objs, dirs, err = fs.NewLister().Start(f, "").GetAll()
|
||||
objs, dirs, err = fs.WalkGetAll(f, "", true, -1)
|
||||
if err != nil && err != fs.ErrorDirNotFound {
|
||||
t.Fatalf("Error listing: %v", err)
|
||||
}
|
||||
|
|
|
@ -181,7 +181,7 @@ func objsToNames(objs []fs.Object) []string {
|
|||
// TestFsListDirEmpty tests listing the directories from an empty directory
|
||||
func TestFsListDirEmpty(t *testing.T) {
|
||||
skipIfNotOk(t)
|
||||
objs, dirs, err := fs.NewLister().SetLevel(1).Start(remote, "").GetAll()
|
||||
objs, dirs, err := fs.WalkGetAll(remote, "", true, 1)
|
||||
require.NoError(t, err)
|
||||
assert.Equal(t, []string{}, objsToNames(objs))
|
||||
assert.Equal(t, []string{}, dirsToNames(dirs))
|
||||
|
@ -303,9 +303,9 @@ func TestFsListDirFile2(t *testing.T) {
|
|||
list := func(dir string, expectedDirNames, expectedObjNames []string) {
|
||||
var objNames, dirNames []string
|
||||
for i := 1; i <= *fstest.ListRetries; i++ {
|
||||
objs, dirs, err := fs.NewLister().SetLevel(1).Start(remote, dir).GetAll()
|
||||
objs, dirs, err := fs.WalkGetAll(remote, dir, false, 1)
|
||||
if err == fs.ErrorDirNotFound {
|
||||
objs, dirs, err = fs.NewLister().SetLevel(1).Start(remote, winPath(dir)).GetAll()
|
||||
objs, dirs, err = fs.WalkGetAll(remote, winPath(dir), false, 1)
|
||||
}
|
||||
require.NoError(t, err)
|
||||
objNames = objsToNames(objs)
|
||||
|
@ -345,7 +345,7 @@ func TestFsListDirRoot(t *testing.T) {
|
|||
skipIfNotOk(t)
|
||||
rootRemote, err := fs.NewFs(RemoteName)
|
||||
require.NoError(t, err)
|
||||
dirs, err := fs.NewLister().SetLevel(1).Start(rootRemote, "").GetDirs()
|
||||
_, dirs, err := fs.WalkGetAll(rootRemote, "", true, 1)
|
||||
require.NoError(t, err)
|
||||
assert.Contains(t, dirsToNames(dirs), subRemoteLeaf, "Remote leaf not found")
|
||||
}
|
||||
|
@ -360,7 +360,7 @@ func TestFsListSubdir(t *testing.T) {
|
|||
for i := 0; i < 2; i++ {
|
||||
dir, _ := path.Split(fileName)
|
||||
dir = dir[:len(dir)-1]
|
||||
objs, dirs, err = fs.NewLister().Start(remote, dir).GetAll()
|
||||
objs, dirs, err = fs.WalkGetAll(remote, dir, true, -1)
|
||||
if err != fs.ErrorDirNotFound {
|
||||
break
|
||||
}
|
||||
|
@ -375,7 +375,7 @@ func TestFsListSubdir(t *testing.T) {
|
|||
// TestFsListLevel2 tests List works for 2 levels
|
||||
func TestFsListLevel2(t *testing.T) {
|
||||
skipIfNotOk(t)
|
||||
objs, dirs, err := fs.NewLister().SetLevel(2).Start(remote, "").GetAll()
|
||||
objs, dirs, err := fs.WalkGetAll(remote, "", true, 2)
|
||||
if err == fs.ErrorLevelNotSupported {
|
||||
return
|
||||
}
|
||||
|
|
Loading…
Reference in a new issue