diff --git a/amazonclouddrive/amazonclouddrive.go b/amazonclouddrive/amazonclouddrive.go index 80b2643bf..f331e17b5 100644 --- a/amazonclouddrive/amazonclouddrive.go +++ b/amazonclouddrive/amazonclouddrive.go @@ -19,7 +19,6 @@ import ( "net/http" "regexp" "strings" - "sync" "time" "github.com/ncw/go-acd" @@ -318,9 +317,7 @@ OUTER: return shouldRetry(resp, err) }) if err != nil { - fs.Stats.Error() - fs.ErrorLog(f, "Couldn't list files: %v", err) - break + return false, err } if nodes == nil { break @@ -341,177 +338,45 @@ OUTER: return } -// Path should be directory path either "" or "path/" -// -// List the directory using a recursive list from the root -// -// This fetches the minimum amount of stuff but does more API calls -// which makes it slow -func (f *Fs) listDirRecursive(dirID string, path string, out fs.ObjectsChan) error { - var subError error - // Make the API request - var wg sync.WaitGroup - _, err := f.listAll(dirID, "", false, false, func(node *acd.Node) bool { - // Recurse on directories +// ListDir reads the directory specified by the job into out, returning any more jobs +func (f *Fs) ListDir(out fs.ListOpts, job dircache.ListDirJob) (jobs []dircache.ListDirJob, err error) { + fs.Debug(f, "Reading %q", job.Path) + _, err = f.listAll(job.DirID, "", false, false, func(node *acd.Node) bool { + remote := job.Path + *node.Name switch *node.Kind { case folderKind: - wg.Add(1) - folder := path + *node.Name + "/" - fs.Debug(f, "Reading %s", folder) - go func() { - defer wg.Done() - err := f.listDirRecursive(*node.Id, folder, out) - if err != nil { - subError = err - fs.ErrorLog(f, "Error reading %s:%s", folder, err) + if out.IncludeDirectory(remote) { + dir := &fs.Dir{ + Name: remote, + Bytes: -1, + Count: -1, } - }() - return false + dir.When, _ = time.Parse(timeFormat, *node.ModifiedDate) // FIXME + if out.AddDir(dir) { + return true + } + if job.Depth > 0 { + jobs = append(jobs, dircache.ListDirJob{DirID: *node.Id, Path: remote + "/", Depth: job.Depth 
- 1}) + } + } case fileKind: - if fs := f.newFsObjectWithInfo(path+*node.Name, node); fs != nil { - out <- fs + if o := f.newFsObjectWithInfo(remote, node); o != nil { + if out.Add(o) { + return true + } } default: // ignore ASSET etc } return false }) - wg.Wait() - fs.Debug(f, "Finished reading %s", path) - if err != nil { - return err - } - if subError != nil { - return subError - } - return nil + fs.Debug(f, "Finished reading %q", job.Path) + return jobs, err } -// Path should be directory path either "" or "path/" -// -// List the directory using a recursive list from the root -// -// This fetches the minimum amount of stuff but does more API calls -// which makes it slow -func (f *Fs) listDirNonRecursive(dirID string, path string, out fs.ObjectsChan) error { - // Start some directory listing go routines - var wg sync.WaitGroup // sync closing of go routines - var traversing sync.WaitGroup // running directory traversals - type dirListJob struct { - dirID string - path string - } - in := make(chan dirListJob, fs.Config.Checkers) - errs := make(chan error, fs.Config.Checkers) - for i := 0; i < fs.Config.Checkers; i++ { - wg.Add(1) - go func() { - defer wg.Done() - for job := range in { - var jobs []dirListJob - fs.Debug(f, "Reading %q", job.path) - // Make the API request - _, err := f.listAll(job.dirID, "", false, false, func(node *acd.Node) bool { - // Recurse on directories - switch *node.Kind { - case folderKind: - jobs = append(jobs, dirListJob{dirID: *node.Id, path: job.path + *node.Name + "/"}) - case fileKind: - if fs := f.newFsObjectWithInfo(job.path+*node.Name, node); fs != nil { - out <- fs - } - default: - // ignore ASSET etc - } - return false - }) - fs.Debug(f, "Finished reading %q", job.path) - if err != nil { - fs.ErrorLog(f, "Error reading %s: %s", path, err) - errs <- err - } - // FIXME stop traversal on error? 
- traversing.Add(len(jobs)) - go func() { - // Now we have traversed this directory, send these jobs off for traversal in - // the background - for _, job := range jobs { - in <- job - } - }() - traversing.Done() - } - }() - } - - // Collect the errors - wg.Add(1) - var errResult error - go func() { - defer wg.Done() - for err := range errs { - errResult = err - } - }() - - // Start the process - traversing.Add(1) - in <- dirListJob{dirID: dirID, path: path} - traversing.Wait() - close(in) - close(errs) - wg.Wait() - - return errResult -} - -// List walks the path returning a channel of FsObjects -func (f *Fs) List() fs.ObjectsChan { - out := make(fs.ObjectsChan, fs.Config.Checkers) - go func() { - defer close(out) - err := f.dirCache.FindRoot(false) - if err != nil { - fs.Stats.Error() - fs.ErrorLog(f, "Couldn't find root: %s", err) - } else { - err = f.listDirNonRecursive(f.dirCache.RootID(), "", out) - if err != nil { - fs.Stats.Error() - fs.ErrorLog(f, "List failed: %s", err) - } - } - }() - return out -} - -// ListDir lists the directories -func (f *Fs) ListDir() fs.DirChan { - out := make(fs.DirChan, fs.Config.Checkers) - go func() { - defer close(out) - err := f.dirCache.FindRoot(false) - if err != nil { - fs.Stats.Error() - fs.ErrorLog(f, "Couldn't find root: %s", err) - } else { - _, err := f.listAll(f.dirCache.RootID(), "", true, false, func(item *acd.Node) bool { - dir := &fs.Dir{ - Name: *item.Name, - Bytes: -1, - Count: -1, - } - dir.When, _ = time.Parse(timeFormat, *item.ModifiedDate) - out <- dir - return false - }) - if err != nil { - fs.Stats.Error() - fs.ErrorLog(f, "ListDir failed: %s", err) - } - } - }() - return out +// List walks the path returning files and directories into out +func (f *Fs) List(out fs.ListOpts) { + f.dirCache.List(f, out) } // Put the object into the container diff --git a/b2/b2.go b/b2/b2.go index 13fb296e9..e5c56f1ba 100644 --- a/b2/b2.go +++ b/b2/b2.go @@ -330,7 +330,7 @@ func (f *Fs) NewFsObject(remote string) fs.Object 
{ } // listFn is called from list to handle an object -type listFn func(string, *api.File) error +type listFn func(remote string, object *api.File, isDirectory bool) error // errEndList is a sentinel used to end the list iteration now. // listFn should return it to end the iteration with no errors. @@ -339,6 +339,8 @@ var errEndList = errors.New("end list") // list lists the objects into the function supplied from // the bucket and root supplied // +// level is the depth to search to +// // If prefix is set then startFileName is used as a prefix which all // files must have // @@ -346,7 +348,7 @@ var errEndList = errors.New("end list") // than 1000) // // If hidden is set then it will list the hidden (deleted) files too. -func (f *Fs) list(prefix string, limit int, hidden bool, fn listFn) error { +func (f *Fs) list(level int, prefix string, limit int, hidden bool, fn listFn) error { bucketID, err := f.getBucketID() if err != nil { return err @@ -371,6 +373,7 @@ func (f *Fs) list(prefix string, limit int, hidden bool, fn listFn) error { if hidden { opts.Path = "/b2_list_file_versions" } + lastDir := "" for { err := f.pacer.Call(func() (bool, error) { resp, err := f.srv.CallJSON(&opts, &request, &response) @@ -385,12 +388,34 @@ func (f *Fs) list(prefix string, limit int, hidden bool, fn listFn) error { if !strings.HasPrefix(file.Name, prefix) { return nil } - err = fn(file.Name[len(f.root):], file) - if err != nil { - if err == errEndList { - return nil + remote := file.Name[len(f.root):] + slashes := strings.Count(remote, "/") + + // Check if this file makes a new directory + if slash := strings.IndexRune(remote, '/'); slash >= 0 { + if dir := remote[:slash]; dir != lastDir { + if slashes-1 < fs.MaxLevel { + err = fn(dir, nil, true) + if err != nil { + if err == errEndList { + return nil + } + return err + } + } + lastDir = dir + } + } + + // Send the file + if slashes < fs.MaxLevel { + err = fn(remote, file, false) + if err != nil { + if err == errEndList { + 
return nil + } + return err } - return err } } // end if no NextFileName @@ -405,38 +430,68 @@ func (f *Fs) list(prefix string, limit int, hidden bool, fn listFn) error { return nil } -// List walks the path returning a channel of FsObjects -func (f *Fs) List() fs.ObjectsChan { - out := make(fs.ObjectsChan, fs.Config.Checkers) - if f.bucket == "" { - // Return no objects at top level list - close(out) - fs.Stats.Error() - fs.ErrorLog(f, "Can't list objects at root - choose a bucket using lsd") - } else { - // List the objects - go func() { - defer close(out) - err := f.list("", 0, false, func(remote string, object *api.File) error { - if o := f.newFsObjectWithInfo(remote, object); o != nil { - out <- o - } - return nil - }) - if err != nil { - fs.Stats.Error() - fs.ErrorLog(f, "Couldn't list bucket %q: %s", f.bucket, err) +// listFiles walks the path returning files and directories to out +func (f *Fs) listFiles(out fs.ListOpts) { + defer out.Finished() + // List the objects + err := f.list(out.Level(), "", 0, false, func(remote string, object *api.File, isDirectory bool) error { + if isDirectory { + dir := &fs.Dir{ + Name: remote, + Bytes: -1, + Count: -1, } - }() + if out.AddDir(dir) { + return fs.ErrorListAborted + } + } else { + if o := f.newFsObjectWithInfo(remote, object); o != nil { + if out.Add(o) { + return fs.ErrorListAborted + } + } + } + return nil + }) + if err != nil { + out.SetError(err) } - return out } -// listBucketFn is called from listBuckets to handle a bucket -type listBucketFn func(*api.Bucket) +// listBuckets returns all the buckets to out +func (f *Fs) listBuckets(out fs.ListOpts) { + defer out.Finished() + err := f.listBucketsToFn(func(bucket *api.Bucket) error { + dir := &fs.Dir{ + Name: bucket.Name, + Bytes: -1, + Count: -1, + } + if out.AddDir(dir) { + return fs.ErrorListAborted + } + return nil + }) + if err != nil { + out.SetError(err) + } +} -// listBuckets lists the buckets to the function supplied -func (f *Fs) listBuckets(fn 
listBucketFn) error { +// List walks the path returning files and directories to out +func (f *Fs) List(out fs.ListOpts) { + if f.bucket == "" { + f.listBuckets(out) + } else { + f.listFiles(out) + } + return +} + +// listBucketFn is called from listBucketsToFn to handle a bucket +type listBucketFn func(*api.Bucket) error + +// listBucketsToFn lists the buckets to the function supplied +func (f *Fs) listBucketsToFn(fn listBucketFn) error { var account = api.Account{ID: f.info.AccountID} var response api.ListBucketsResponse opts := rest.Opts{ @@ -451,7 +506,10 @@ func (f *Fs) listBuckets(fn listBucketFn) error { return err } for i := range response.Buckets { - fn(&response.Buckets[i]) + err = fn(&response.Buckets[i]) + if err != nil { + return err + } } return nil } @@ -463,13 +521,15 @@ func (f *Fs) getBucketID() (bucketID string, err error) { if f._bucketID != "" { return f._bucketID, nil } - err = f.listBuckets(func(bucket *api.Bucket) { + err = f.listBucketsToFn(func(bucket *api.Bucket) error { if bucket.Name == f.bucket { bucketID = bucket.ID } + return nil + }) if bucketID == "" { - err = fmt.Errorf("Couldn't find bucket %q", f.bucket) + err = fs.ErrorDirNotFound //fmt.Errorf("Couldn't find bucket %q", f.bucket) } f._bucketID = bucketID return bucketID, err @@ -489,56 +549,6 @@ func (f *Fs) clearBucketID() { f.bucketIDMutex.Unlock() } -// ListDir lists the buckets -func (f *Fs) ListDir() fs.DirChan { - out := make(fs.DirChan, fs.Config.Checkers) - if f.bucket == "" { - // List the buckets - go func() { - defer close(out) - err := f.listBuckets(func(bucket *api.Bucket) { - out <- &fs.Dir{ - Name: bucket.Name, - Bytes: -1, - Count: -1, - } - }) - if err != nil { - fs.Stats.Error() - fs.ErrorLog(f, "Error listing buckets: %v", err) - } - }() - } else { - // List the directories in the path in the bucket - go func() { - defer close(out) - lastDir := "" - err := f.list("", 0, false, func(remote string, object *api.File) error { - slash := strings.IndexRune(remote, 
'/') - if slash < 0 { - return nil - } - dir := remote[:slash] - if dir == lastDir { - return nil - } - out <- &fs.Dir{ - Name: dir, - Bytes: -1, - Count: -1, - } - lastDir = dir - return nil - }) - if err != nil { - fs.Stats.Error() - fs.ErrorLog(f, "Couldn't list bucket %q: %s", f.bucket, err) - } - }() - } - return out -} - // Put the object into the bucket // // Copy the reader in to the new object which is returned @@ -651,8 +661,6 @@ func (f *Fs) Purge() error { } checkErrMutex.Lock() defer checkErrMutex.Unlock() - fs.Stats.Error() - fs.ErrorLog(f, "Purge error: %v", err) if errReturn == nil { errReturn = err } @@ -670,9 +678,11 @@ func (f *Fs) Purge() error { } }() } - checkErr(f.list("", 0, true, func(remote string, object *api.File) error { - fs.Debug(remote, "Deleting (id %q)", object.ID) - toBeDeleted <- object + checkErr(f.list(fs.MaxLevel, "", 0, true, func(remote string, object *api.File, isDirectory bool) error { + if !isDirectory { + fs.Debug(remote, "Deleting (id %q)", object.ID) + toBeDeleted <- object + } return nil })) close(toBeDeleted) @@ -755,7 +765,10 @@ func (o *Object) readMetaData() (err error) { return nil } var info *api.File - err = o.fs.list(o.remote, 1, false, func(remote string, object *api.File) error { + err = o.fs.list(fs.MaxLevel, o.remote, 1, false, func(remote string, object *api.File, isDirectory bool) error { + if isDirectory { + return nil + } if remote == o.remote { info = object } diff --git a/dircache/list.go b/dircache/list.go new file mode 100644 index 000000000..4233abea0 --- /dev/null +++ b/dircache/list.go @@ -0,0 +1,74 @@ +// Listing utility functions for fses which use dircache + +package dircache + +import ( + "sync" + + "github.com/ncw/rclone/fs" +) + +// ListDirJob describe a directory listing that needs to be done +type ListDirJob struct { + DirID string + Path string + Depth int +} + +// ListDirer describes the interface necessary to use ListDir +type ListDirer interface { + // ListDir reads the directory 
specified by the job into out, returning any more jobs + ListDir(out fs.ListOpts, job ListDirJob) (jobs []ListDirJob, err error) +} + +// listDir lists the directory using a recursive list from the root +// +// It does this in parallel, calling f.ListDir to do the actual reading +func listDir(f ListDirer, out fs.ListOpts, dirID string, path string) { + // Start some directory listing go routines + var wg sync.WaitGroup // sync closing of go routines + var traversing sync.WaitGroup // running directory traversals + buffer := out.Buffer() + in := make(chan ListDirJob, buffer) + for i := 0; i < buffer; i++ { + wg.Add(1) + go func() { + defer wg.Done() + for job := range in { + jobs, err := f.ListDir(out, job) + if err != nil { + out.SetError(err) + fs.Debug(f, "Error reading %s: %s", path, err) + } else { + traversing.Add(len(jobs)) + go func() { + // Now we have traversed this directory, send these + // jobs off for traversal in the background + for _, job := range jobs { + in <- job + } + }() + } + traversing.Done() + } + }() + } + + // Start the process + traversing.Add(1) + in <- ListDirJob{DirID: dirID, Path: path, Depth: out.Level() - 1} + traversing.Wait() + close(in) + wg.Wait() +} + +// List walks the path returning files and directories into out +func (dc *DirCache) List(f ListDirer, out fs.ListOpts) { + defer out.Finished() + err := dc.FindRoot(false) + if err != nil { + out.SetError(fs.ErrorDirNotFound) + } else { + listDir(f, out, dc.RootID(), "") + } +} diff --git a/drive/drive.go b/drive/drive.go index b318442e3..f99637803 100644 --- a/drive/drive.go +++ b/drive/drive.go @@ -13,7 +13,6 @@ import ( "log" "net/http" "strings" - "sync" "time" "golang.org/x/oauth2" @@ -452,117 +451,64 @@ func (f *Fs) findExportFormat(filepath string, item *drive.File) (extension, lin return "", "" } -// Path should be directory path either "" or "path/" -// -// List the directory using a recursive list from the root -// -// This fetches the minimum amount of stuff but does 
more API calls -// which makes it slow -func (f *Fs) listDirRecursive(dirID string, path string, out fs.ObjectsChan) error { - var subError error - // Make the API request - var wg sync.WaitGroup - _, err := f.listAll(dirID, "", false, false, func(item *drive.File) bool { - filepath := path + item.Title +// ListDir reads the directory specified by the job into out, returning any more jobs +func (f *Fs) ListDir(out fs.ListOpts, job dircache.ListDirJob) (jobs []dircache.ListDirJob, err error) { + fs.Debug(f, "Reading %q", job.Path) + _, err = f.listAll(job.DirID, "", false, false, func(item *drive.File) bool { + remote := job.Path + item.Title switch { case *driveAuthOwnerOnly && !isAuthOwned(item): // ignore object or directory case item.MimeType == driveFolderType: - // Recurse on directories - wg.Add(1) - folder := filepath + "/" - fs.Debug(f, "Reading %s", folder) - go func() { - defer wg.Done() - err := f.listDirRecursive(item.Id, folder, out) - if err != nil { - subError = err - fs.ErrorLog(f, "Error reading %s:%s", folder, err) - } - - }() - case item.Md5Checksum != "": - // If item has MD5 sum it is a file stored on drive - if o := f.newFsObjectWithInfo(filepath, item); o != nil { - out <- o - } - case len(item.ExportLinks) != 0: - // If item has export links then it is a google doc - extension, link := f.findExportFormat(filepath, item) - if extension == "" { - fs.Debug(filepath, "No export formats found") - } else { - if o := f.newFsObjectWithInfo(filepath+"."+extension, item); o != nil { - obj := o.(*Object) - obj.isDocument = true - obj.url = link - obj.bytes = -1 - out <- o - } - } - default: - fs.Debug(filepath, "Ignoring unknown object") - } - return false - }) - wg.Wait() - fs.Debug(f, "Finished reading %s", path) - if err != nil { - return err - } - if subError != nil { - return subError - } - return nil -} - -// List walks the path returning a channel of FsObjects -func (f *Fs) List() fs.ObjectsChan { - out := make(fs.ObjectsChan, 
fs.Config.Checkers) - go func() { - defer close(out) - err := f.dirCache.FindRoot(false) - if err != nil { - fs.Stats.Error() - fs.ErrorLog(f, "Couldn't find root: %s", err) - } else { - err = f.listDirRecursive(f.dirCache.RootID(), "", out) - if err != nil { - fs.Stats.Error() - fs.ErrorLog(f, "List failed: %s", err) - } - } - }() - return out -} - -// ListDir walks the path returning a channel of directories -func (f *Fs) ListDir() fs.DirChan { - out := make(fs.DirChan, fs.Config.Checkers) - go func() { - defer close(out) - err := f.dirCache.FindRoot(false) - if err != nil { - fs.Stats.Error() - fs.ErrorLog(f, "Couldn't find root: %s", err) - } else { - _, err := f.listAll(f.dirCache.RootID(), "", true, false, func(item *drive.File) bool { + if out.IncludeDirectory(remote) { dir := &fs.Dir{ Name: item.Title, Bytes: -1, Count: -1, } dir.When, _ = time.Parse(timeFormatIn, item.ModifiedDate) - out <- dir - return false - }) - if err != nil { - fs.Stats.Error() - fs.ErrorLog(f, "ListDir failed: %s", err) + if out.AddDir(dir) { + return true + } + if job.Depth > 0 { + jobs = append(jobs, dircache.ListDirJob{DirID: item.Id, Path: remote + "/", Depth: job.Depth - 1}) + } } + case item.Md5Checksum != "": + // If item has MD5 sum it is a file stored on drive + if o := f.newFsObjectWithInfo(remote, item); o != nil { + if out.Add(o) { + return true + } + } + case len(item.ExportLinks) != 0: + // If item has export links then it is a google doc + extension, link := f.findExportFormat(remote, item) + if extension == "" { + fs.Debug(remote, "No export formats found") + } else { + if o := f.newFsObjectWithInfo(remote+"."+extension, item); o != nil { + obj := o.(*Object) + obj.isDocument = true + obj.url = link + obj.bytes = -1 + if out.Add(o) { + return true + } + } + } + default: + fs.Debug(remote, "Ignoring unknown object") } - }() - return out + return false + }) + fs.Debug(f, "Finished reading %q", job.Path) + return jobs, err +} + +// List walks the path returning files 
and directories to out +func (f *Fs) List(out fs.ListOpts) { + f.dirCache.List(f, out) } // Creates a drive.File info from the parameters passed in and a half diff --git a/dropbox/dropbox.go b/dropbox/dropbox.go index 0c48e7de0..42afb7352 100644 --- a/dropbox/dropbox.go +++ b/dropbox/dropbox.go @@ -226,21 +226,16 @@ func (f *Fs) NewFsObject(remote string) fs.Object { } // Strips the root off path and returns it -func (f *Fs) stripRoot(path string) *string { +func (f *Fs) stripRoot(path string) (string, error) { lowercase := strings.ToLower(path) - if !strings.HasPrefix(lowercase, f.slashRootSlash) { - fs.Stats.Error() - fs.ErrorLog(f, "Path '%s' is not under root '%s'", path, f.slashRootSlash) - return nil + return "", fmt.Errorf("Path %q is not under root %q", path, f.slashRootSlash) } - - stripped := path[len(f.slashRootSlash):] - return &stripped + return path[len(f.slashRootSlash):], nil } // Walk the root returning a channel of FsObjects -func (f *Fs) list(out fs.ObjectsChan) { +func (f *Fs) list(out fs.ListOpts) { // Track path component case, it could be different for entries coming from DropBox API // See https://www.dropboxforum.com/hc/communities/public/questions/201665409-Wrong-character-case-of-folder-name-when-calling-listFolder-using-Sync-API?locale=en-us // and https://github.com/ncw/rclone/issues/53 @@ -283,16 +278,36 @@ func (f *Fs) list(out fs.ObjectsChan) { if entry.IsDir { nameTree.PutCaseCorrectDirectoryName(parentPath, lastComponent) + name, err := f.stripRoot(entry.Path + "/") + if err != nil { + out.SetError(err) + return + } + name = strings.Trim(name, "/") + if name != "" { + dir := &fs.Dir{ + Name: name, + When: time.Time(entry.ClientMtime), + Bytes: entry.Bytes, + Count: -1, + } + if out.AddDir(dir) { + return + } + } } else { parentPathCorrectCase := nameTree.GetPathWithCorrectCase(parentPath) if parentPathCorrectCase != nil { - path := f.stripRoot(*parentPathCorrectCase + "/" + lastComponent) - if path == nil { - // an error occurred 
and logged by stripRoot - continue + path, err := f.stripRoot(*parentPathCorrectCase + "/" + lastComponent) + if err != nil { + out.SetError(err) + return + } + if o := f.newFsObjectWithInfo(path, entry); o != nil { + if out.Add(o) { + return + } } - - out <- f.newFsObjectWithInfo(*path, entry) } else { nameTree.PutFile(parentPath, lastComponent, entry) } @@ -306,26 +321,28 @@ func (f *Fs) list(out fs.ObjectsChan) { } } - walkFunc := func(caseCorrectFilePath string, entry *dropbox.Entry) { - path := f.stripRoot("/" + caseCorrectFilePath) - if path == nil { - // an error occurred and logged by stripRoot - return + walkFunc := func(caseCorrectFilePath string, entry *dropbox.Entry) error { + path, err := f.stripRoot("/" + caseCorrectFilePath) + if err != nil { + return err } - - out <- f.newFsObjectWithInfo(*path, entry) + if o := f.newFsObjectWithInfo(path, entry); o != nil { + if out.Add(o) { + return fs.ErrorListAborted + } + } + return nil + } + err := nameTree.WalkFiles(f.root, walkFunc) + if err != nil { + out.SetError(err) } - nameTree.WalkFiles(f.root, walkFunc) } // List walks the path returning a channel of FsObjects -func (f *Fs) List() fs.ObjectsChan { - out := make(fs.ObjectsChan, fs.Config.Checkers) - go func() { - defer close(out) - f.list(out) - }() - return out +func (f *Fs) List(out fs.ListOpts) { + defer out.Finished() + f.list(out) } // ListDir walks the path returning a channel of FsObjects @@ -341,14 +358,13 @@ func (f *Fs) ListDir() fs.DirChan { for i := range entry.Contents { entry := &entry.Contents[i] if entry.IsDir { - name := f.stripRoot(entry.Path) - if name == nil { - // an error occurred and logged by stripRoot + name, err := f.stripRoot(entry.Path) + if err != nil { continue } out <- &fs.Dir{ - Name: *name, + Name: name, When: time.Time(entry.ClientMtime), Bytes: entry.Bytes, Count: -1, diff --git a/dropbox/nametree.go b/dropbox/nametree.go index 00749fa74..c3e096bf0 100644 --- a/dropbox/nametree.go +++ b/dropbox/nametree.go @@ -143,9 
+143,9 @@ func (tree *nameTreeNode) GetPathWithCorrectCase(path string) *string { return &resultString } -type nameTreeFileWalkFunc func(caseCorrectFilePath string, entry *dropbox.Entry) +type nameTreeFileWalkFunc func(caseCorrectFilePath string, entry *dropbox.Entry) error -func (tree *nameTreeNode) walkFilesRec(currentPath string, walkFunc nameTreeFileWalkFunc) { +func (tree *nameTreeNode) walkFilesRec(currentPath string, walkFunc nameTreeFileWalkFunc) error { var prefix string if currentPath == "" { prefix = "" @@ -154,7 +154,10 @@ func (tree *nameTreeNode) walkFilesRec(currentPath string, walkFunc nameTreeFile } for name, entry := range tree.Files { - walkFunc(prefix+name, entry) + err := walkFunc(prefix+name, entry) + if err != nil { + return err + } } for lowerCaseName, directory := range tree.Directories { @@ -165,15 +168,20 @@ func (tree *nameTreeNode) walkFilesRec(currentPath string, walkFunc nameTreeFile continue } - directory.walkFilesRec(prefix+caseCorrectName, walkFunc) + err := directory.walkFilesRec(prefix+caseCorrectName, walkFunc) + if err != nil { + return err + } } + + return nil } -func (tree *nameTreeNode) WalkFiles(rootPath string, walkFunc nameTreeFileWalkFunc) { +func (tree *nameTreeNode) WalkFiles(rootPath string, walkFunc nameTreeFileWalkFunc) error { node := tree.getTreeNode(rootPath) if node == nil { - return + return nil } - node.walkFilesRec(rootPath, walkFunc) + return node.walkFilesRec(rootPath, walkFunc) } diff --git a/dropbox/nametree_test.go b/dropbox/nametree_test.go index 0f8ccfbc5..a30d99b6b 100644 --- a/dropbox/nametree_test.go +++ b/dropbox/nametree_test.go @@ -77,15 +77,15 @@ func TestPutAndWalk(t *testing.T) { tree.PutCaseCorrectDirectoryName("", "A") numCalled := 0 - walkFunc := func(caseCorrectFilePath string, entry *dropboxapi.Entry) { + walkFunc := func(caseCorrectFilePath string, entry *dropboxapi.Entry) error { assert(t, caseCorrectFilePath == "A/F", "caseCorrectFilePath should be A/F, not "+caseCorrectFilePath) 
assert(t, entry.Path == "xxx", "entry.Path should be xxx") numCalled++ + return nil } - tree.WalkFiles("", walkFunc) - + err := tree.WalkFiles("", walkFunc) + assert(t, err == nil, "No error should be returned") assert(t, numCalled == 1, "walk func should be called only once") - assert(t, fs.Stats.GetErrors() == errors, "No errors should be reported") } @@ -97,15 +97,15 @@ func TestPutAndWalkWithPrefix(t *testing.T) { tree.PutCaseCorrectDirectoryName("", "A") numCalled := 0 - walkFunc := func(caseCorrectFilePath string, entry *dropboxapi.Entry) { + walkFunc := func(caseCorrectFilePath string, entry *dropboxapi.Entry) error { assert(t, caseCorrectFilePath == "A/F", "caseCorrectFilePath should be A/F, not "+caseCorrectFilePath) assert(t, entry.Path == "xxx", "entry.Path should be xxx") numCalled++ + return nil } - tree.WalkFiles("A", walkFunc) - + err := tree.WalkFiles("A", walkFunc) + assert(t, err == nil, "No error should be returned") assert(t, numCalled == 1, "walk func should be called only once") - assert(t, fs.Stats.GetErrors() == errors, "No errors should be reported") } @@ -115,10 +115,11 @@ func TestPutAndWalkIncompleteTree(t *testing.T) { tree := newNameTree() tree.PutFile("a", "F", &dropboxapi.Entry{Path: "xxx"}) - walkFunc := func(caseCorrectFilePath string, entry *dropboxapi.Entry) { + walkFunc := func(caseCorrectFilePath string, entry *dropboxapi.Entry) error { t.Fatal("Should not be called") + return nil } - tree.WalkFiles("", walkFunc) - + err := tree.WalkFiles("", walkFunc) + assert(t, err == nil, "No error should be returned") assert(t, fs.Stats.GetErrors() == errors+1, "One error should be reported") } diff --git a/fs/filter.go b/fs/filter.go index da97832ef..c1ff1c496 100644 --- a/fs/filter.go +++ b/fs/filter.go @@ -6,6 +6,7 @@ import ( "bufio" "fmt" "os" + "path" "regexp" "strconv" "strings" @@ -69,7 +70,8 @@ type Filter struct { ModTimeFrom time.Time ModTimeTo time.Time rules []rule - files filesMap + files filesMap // files if filesFrom + dirs 
filesMap // dirs from filesFrom } // We use time conventions @@ -244,9 +246,21 @@ func (f *Filter) AddRule(rule string) error { func (f *Filter) AddFile(file string) error { if f.files == nil { f.files = make(filesMap) + f.dirs = make(filesMap) } file = strings.Trim(file, "/") f.files[file] = struct{}{} + // Put all the parent directories into f.dirs + for { + file = path.Dir(file) + if file == "." { + break + } + if _, found := f.dirs[file]; found { + break + } + f.dirs[file] = struct{}{} + } return nil } @@ -265,6 +279,28 @@ func (f *Filter) InActive() bool { len(f.rules) == 0) } +// includeRemote returns whether this remote passes the filter rules. +func (f *Filter) includeRemote(remote string) bool { + for _, rule := range f.rules { + if rule.Match(remote) { + return rule.Include + } + } + return true +} + +// IncludeDirectory returns whether this directory should be included +// in the sync or not. +func (f *Filter) IncludeDirectory(remote string) bool { + remote = strings.Trim(remote, "/") + // filesFrom takes precedence + if f.files != nil { + _, include := f.dirs[remote] + return include + } + return f.includeRemote(remote + "/") +} + // Include returns whether this object should be included into the // sync or not func (f *Filter) Include(remote string, size int64, modTime time.Time) bool { @@ -285,12 +321,7 @@ func (f *Filter) Include(remote string, size int64, modTime time.Time) bool { if f.MaxSize != 0 && size > f.MaxSize { return false } - for _, rule := range f.rules { - if rule.Match(remote) { - return rule.Include - } - } - return true + return f.includeRemote(remote) } // IncludeObject returns whether this object should be included into diff --git a/fs/filter_test.go b/fs/filter_test.go index 01c3b03fa..b2f4743c1 100644 --- a/fs/filter_test.go +++ b/fs/filter_test.go @@ -6,6 +6,8 @@ import ( "strings" "testing" "time" + + "github.com/stretchr/testify/assert" ) func TestAgeSuffix(t *testing.T) { @@ -192,6 +194,20 @@ func testInclude(t *testing.T, f 
*Filter, tests []includeTest) { } } +type includeDirTest struct { + in string + want bool +} + +func testDirInclude(t *testing.T, f *Filter, tests []includeDirTest) { + for _, test := range tests { + got := f.IncludeDirectory(test.in) + if test.want != got { + t.Errorf("%q: want %v got %v", test.in, test.want, got) + } + } +} + func TestNewFilterIncludeFiles(t *testing.T) { f, err := NewFilter() if err != nil { @@ -205,6 +221,11 @@ func TestNewFilterIncludeFiles(t *testing.T) { if err != nil { t.Error(err) } + assert.Equal(t, filesMap{ + "file1.jpg": {}, + "file2.jpg": {}, + }, f.files) + assert.Equal(t, filesMap{}, f.dirs) testInclude(t, f, []includeTest{ {"file1.jpg", 0, 0, true}, {"file2.jpg", 1, 0, true}, @@ -216,6 +237,42 @@ func TestNewFilterIncludeFiles(t *testing.T) { } } +func TestNewFilterIncludeFilesDirs(t *testing.T) { + f, err := NewFilter() + if err != nil { + t.Fatal(err) + } + for _, path := range []string{ + "path/to/dir/file1.png", + "/path/to/dir/file2.png", + "/path/to/file3.png", + "/path/to/dir2/file4.png", + } { + err = f.AddFile(path) + if err != nil { + t.Error(err) + } + } + assert.Equal(t, filesMap{ + "path": {}, + "path/to": {}, + "path/to/dir": {}, + "path/to/dir2": {}, + }, f.dirs) + testDirInclude(t, f, []includeDirTest{ + {"path", true}, + {"path/to", true}, + {"path/to/", true}, + {"/path/to", true}, + {"/path/to/", true}, + {"path/to/dir", true}, + {"path/to/dir2", true}, + {"path/too", false}, + {"path/three", false}, + {"four", false}, + }) +} + func TestNewFilterMinSize(t *testing.T) { f, err := NewFilter() if err != nil { @@ -340,6 +397,16 @@ func TestNewFilterMatches(t *testing.T) { {"sausage3/potato", 101, 0, true}, {"unicorn", 99, 0, false}, }) + testDirInclude(t, f, []includeDirTest{ + {"sausage1", false}, + {"sausage2", false}, + {"sausage2/sub", false}, + {"sausage2/sub/dir", false}, + {"sausage3", true}, + {"sausage3/sub", true}, + {"sausage3/sub/dir", true}, + {"sausage4", false}, + }) if f.InActive() { t.Errorf("want 
!InActive") } diff --git a/fs/fs.go b/fs/fs.go index 06aa3f949..1969e3f06 100644 --- a/fs/fs.go +++ b/fs/fs.go @@ -5,9 +5,12 @@ import ( "fmt" "io" "log" + "math" "path/filepath" "regexp" "sort" + "strings" + "sync" "time" ) @@ -18,6 +21,8 @@ const ( // ModTimeNotSupported is a very large precision value to show // mod time isn't supported on this Fs ModTimeNotSupported = 100 * 365 * 24 * time.Hour + // MaxLevel is a sentinel representing an infinite depth for listings + MaxLevel = math.MaxInt32 ) // Globals @@ -32,6 +37,9 @@ var ( ErrorCantDirMove = fmt.Errorf("Can't move directory - incompatible remotes") ErrorDirExists = fmt.Errorf("Can't copy directory - destination already exists") ErrorCantSetModTime = fmt.Errorf("Can't set modified time") + ErrorDirNotFound = fmt.Errorf("Directory not found") + ErrorLevelNotSupported = fmt.Errorf("Level value not supported") + ErrorListAborted = fmt.Errorf("List aborted") ) // RegInfo provides information about a filesystem @@ -90,11 +98,10 @@ func Register(info *RegInfo) { type Fs interface { Info - // List the Fs into a channel - List() ObjectsChan - - // ListDir lists the Fs directories/buckets/containers into a channel - ListDir() DirChan + // List the objects and directories of the Fs + // + // This should report ErrorDirNotFound via ListOpts.SetError if the directory isn't found. + List(ListOpts) // NewFsObject finds the Object at remote. Returns nil if can't be found NewFsObject(remote string) Object @@ -239,6 +246,292 @@ type UnWrapper interface { // ObjectsChan is a channel of Objects type ObjectsChan chan Object +// ListOpts describes the interface used for Fs.List operations +type ListOpts interface { + // Add an object to the output. + // If the function returns true, the operation has been aborted. + // Multiple goroutines can safely add objects concurrently. + Add(obj Object) (abort bool) + + // Add a directory to the output. + // If the function returns true, the operation has been aborted. 
+ // Multiple goroutines can safely add objects concurrently. + AddDir(dir *Dir) (abort bool) + + // IncludeDirectory returns whether this directory should be + // included in the listing (and recursed into or not). + IncludeDirectory(remote string) bool + + // SetError will set an error state, and will cause the listing to + // be aborted. + // Multiple goroutines can set the error state concurrently, + // but only the first will be returned to the caller. + SetError(err error) + + // Level returns the level it should recurse to. Fses may + // ignore this in which case the listing will be less + // efficient. + Level() int + + // Buffer returns the channel depth in use + Buffer() int + + // Finished should be called when listing is finished + Finished() + + // IsFinished returns whether Finished or SetError have been called + IsFinished() bool +} + +// listerResult is returned by the lister methods +type listerResult struct { + Obj Object + Dir *Dir + Err error +} + +// Lister objects are used for controlling listing of Fs objects +type Lister struct { + mu sync.RWMutex + buffer int + abort bool + results chan listerResult + finished sync.Once + level int + filter *Filter +} + +// NewLister creates a Lister object. +// +// The default channel buffer size will be Config.Checkers unless +// overridden with SetBuffer. The default level will be infinite. +func NewLister() *Lister { + o := &Lister{} + return o.SetLevel(-1).SetBuffer(Config.Checkers) +} + +// Start starts a go routine listing the Fs passed in. It returns the +// same Lister that was passed in for convenience. +func (o *Lister) Start(f Fs) *Lister { + o.results = make(chan listerResult, o.buffer) + go func() { + f.List(o) + }() + return o +} + +// SetLevel sets the level to recurse to. It returns same Lister that +// was passed in for convenience. If Level is < 0 then it sets it to +// infinite. Must be called before Start(). 
+func (o *Lister) SetLevel(level int) *Lister { + if level < 0 { + o.level = MaxLevel + } else { + o.level = level + } + return o +} + +// SetFilter sets the Filter that is in use. It defaults to no +// filtering. Must be called before Start(). +func (o *Lister) SetFilter(filter *Filter) *Lister { + o.filter = filter + return o +} + +// Level gets the recursion level for this listing. +// +// Fses may ignore this, but should implement it for improved efficiency if possible. +// +// Level 1 means list just the contents of the directory +// +// Each returned item must have less than level `/`s in. +func (o *Lister) Level() int { + return o.level +} + +// SetBuffer sets the channel buffer size in use. Must be called +// before Start(). +func (o *Lister) SetBuffer(buffer int) *Lister { + if buffer < 1 { + buffer = 1 + } + o.buffer = buffer + return o +} + +// Buffer gets the channel buffer size in use +func (o *Lister) Buffer() int { + return o.buffer +} + +// Add an object to the output. +// If the function returns true, the operation has been aborted. +// Multiple goroutines can safely add objects concurrently. +func (o *Lister) Add(obj Object) (abort bool) { + o.mu.RLock() + defer o.mu.RUnlock() + if o.abort { + return true + } + o.results <- listerResult{Obj: obj} + return false +} + +// AddDir will add a directory to the output. +// If the function returns true, the operation has been aborted. +// Multiple goroutines can safely add objects concurrently.
+func (o *Lister) AddDir(dir *Dir) (abort bool) { + o.mu.RLock() + defer o.mu.RUnlock() + if o.abort { + return true + } + remote := dir.Name + remote = strings.Trim(remote, "/") + dir.Name = remote + // Check the level and ignore if too high + slashes := strings.Count(remote, "/") + if slashes >= o.level { + return false + } + // Check if directory is included + if !o.IncludeDirectory(remote) { + return false + } + o.results <- listerResult{Dir: dir} + return false +} + +// IncludeDirectory returns whether this directory should be +// included in the listing (and recursed into or not). +func (o *Lister) IncludeDirectory(remote string) bool { + if o.filter == nil { + return true + } + return o.filter.IncludeDirectory(remote) +} + +// SetError will set an error state, and will cause the listing to +// be aborted. +// Multiple goroutines can set the error state concurrently, +// but only the first will be returned to the caller. +func (o *Lister) SetError(err error) { + o.mu.RLock() + if err != nil && !o.abort { + o.results <- listerResult{Err: err} + } + o.mu.RUnlock() + o.Finished() +} + +// Finished should be called when listing is finished +func (o *Lister) Finished() { + o.finished.Do(func() { + o.mu.Lock() + o.abort = true + close(o.results) + o.mu.Unlock() + }) +} + +// IsFinished returns whether the directory listing is finished or not +func (o *Lister) IsFinished() bool { + o.mu.RLock() + defer o.mu.RUnlock() + return o.abort +} + +// Get an object from the listing. +// Will return either an object or a directory, never both. +// Will return (nil, nil, nil) when all objects have been returned. +func (o *Lister) Get() (Object, *Dir, error) { + select { + case r := <-o.results: + return r.Obj, r.Dir, r.Err + } +} + +// GetObject will return an object from the listing. +// It will skip over any directories. +// Will return (nil, nil) when all objects have been returned. 
+func (o *Lister) GetObject() (Object, error) { + for { + obj, dir, err := o.Get() + if err != nil { + return nil, err + } + // Check if we are finished + if dir == nil && obj == nil { + return nil, nil + } + // Ignore directories + if dir != nil { + continue + } + return obj, nil + } +} + +// GetObjects will return a slice of object from the listing. +// It will skip over any directories. +func (o *Lister) GetObjects() (objs []Object, err error) { + for { + obj, dir, err := o.Get() + if err != nil { + return nil, err + } + // Check if we are finished + if dir == nil && obj == nil { + break + } + if obj != nil { + objs = append(objs, obj) + } + } + return objs, nil +} + +// GetDir will return a directory from the listing. +// It will skip over any objects. +// Will return (nil, nil) when all objects have been returned. +func (o *Lister) GetDir() (*Dir, error) { + for { + obj, dir, err := o.Get() + if err != nil { + return nil, err + } + // Check if we are finished + if dir == nil && obj == nil { + return nil, nil + } + // Ignore objects + if obj != nil { + continue + } + return dir, nil + } +} + +// GetDirs will return a slice of directories from the listing. +// It will skip over any objects. 
+func (o *Lister) GetDirs() (dirs []*Dir, err error) { + for { + obj, dir, err := o.Get() + if err != nil { + return nil, err + } + // Check if we are finished + if dir == nil && obj == nil { + break + } + if dir != nil { + dirs = append(dirs, dir) + } + } + return dirs, nil +} + // Objects is a slice of Object~s type Objects []Object diff --git a/fs/limited.go b/fs/limited.go index f5c48be8d..2139ba6d1 100644 --- a/fs/limited.go +++ b/fs/limited.go @@ -38,22 +38,13 @@ func (f *Limited) String() string { } // List the Fs into a channel -func (f *Limited) List() ObjectsChan { - out := make(ObjectsChan, Config.Checkers) - go func() { - for _, obj := range f.objects { - out <- obj +func (f *Limited) List(opts ListOpts) { + defer opts.Finished() + for _, obj := range f.objects { + if opts.Add(obj) { + return } - close(out) - }() - return out -} - -// ListDir lists the Fs directories/buckets/containers into a channel -func (f *Limited) ListDir() DirChan { - out := make(DirChan, Config.Checkers) - close(out) - return out + } } // NewFsObject finds the Object at remote. Returns nil if can't be found diff --git a/fs/operations.go b/fs/operations.go index d2eb6e223..c8459eccb 100644 --- a/fs/operations.go +++ b/fs/operations.go @@ -456,10 +456,23 @@ func DeleteFiles(toBeDeleted ObjectsChan) { // Read a map of Object.Remote to Object for the given Fs. // If includeAll is specified all files will be added, // otherwise only files passing the filter will be added. 
-func readFilesMap(fs Fs, includeAll bool) map[string]Object { - files := make(map[string]Object) +func readFilesMap(fs Fs, includeAll bool) (files map[string]Object, err error) { + files = make(map[string]Object) normalised := make(map[string]struct{}) - for o := range fs.List() { + list := NewLister() + if !includeAll { + list.SetFilter(Config.Filter) + } + list.Start(fs) + for { + o, err := list.GetObject() + if err != nil { + return files, err + } + // Check if we are finished + if o == nil { + break + } remote := o.Remote() normalisedRemote := strings.ToLower(norm.NFC.String(remote)) if _, ok := files[remote]; !ok { @@ -477,7 +490,39 @@ func readFilesMap(fs Fs, includeAll bool) map[string]Object { } normalised[normalisedRemote] = struct{}{} } - return files + return files, nil +} + +// readFilesMaps runs readFilesMap on fdst and fsrc at the same time +// +// If both listings fail, the error from fsrc is the one returned. +func readFilesMaps(fdst Fs, fdstIncludeAll bool, fsrc Fs, fsrcIncludeAll bool) (dstFiles, srcFiles map[string]Object, err error) { + var wg sync.WaitGroup + var srcErr, dstErr error + + list := func(fs Fs, includeAll bool, pMap *map[string]Object, pErr *error) { + defer wg.Done() + Log(fs, "Building file list") + // Use a distinct local name so the named result dstFiles is not shadowed + files, listErr := readFilesMap(fs, includeAll) + if listErr != nil { + ErrorLog(fs, "Error building file list: %v", listErr) + *pErr = listErr + } else { + Debug(fs, "Done building file list") + *pMap = files + } + } + + wg.Add(2) + go list(fdst, fdstIncludeAll, &dstFiles, &dstErr) + go list(fsrc, fsrcIncludeAll, &srcFiles, &srcErr) + wg.Wait() + + if dstErr != nil { + err = dstErr + } + if srcErr != nil { + err = srcErr + } + return dstFiles, srcFiles, err } // Same returns true if fdst and fsrc point to the same underlying Fs @@ -501,31 +546,11 @@ func syncCopyMove(fdst, fsrc Fs, Delete bool, DoMove bool) error { return err } - Log(fdst, "Building file list") - - // Read the files of both source and destination - var listWg sync.WaitGroup - listWg.Add(2) - - var dstFiles map[string]Object - var
srcFiles map[string]Object - var srcObjects = make(ObjectsChan, Config.Transfers) - - // Read dst files including excluded files if DeleteExcluded is set - go func() { - dstFiles = readFilesMap(fdst, Config.Filter.DeleteExcluded) - listWg.Done() - }() - - // Read src file not including excluded files - go func() { - srcFiles = readFilesMap(fsrc, false) - listWg.Done() - for _, v := range srcFiles { - srcObjects <- v - } - close(srcObjects) - }() + // Read the files of both source and destination in parallel + dstFiles, srcFiles, err := readFilesMaps(fdst, Config.Filter.DeleteExcluded, fsrc, false) + if err != nil { + return err + } startDeletion := make(chan struct{}, 0) @@ -564,9 +589,6 @@ func syncCopyMove(fdst, fsrc Fs, Delete bool, DoMove bool) error { DeleteFiles(toDelete) }() - // Wait for all files to be read - listWg.Wait() - // Start deleting, unless we must delete after transfer if Delete && !Config.DeleteAfter { close(startDeletion) @@ -598,18 +620,15 @@ func syncCopyMove(fdst, fsrc Fs, Delete bool, DoMove bool) error { } } - go func() { - for src := range srcObjects { - remote := src.Remote() - if dst, dstFound := dstFiles[remote]; dstFound { - toBeChecked <- ObjectPair{src, dst} - } else { - // No need to check since doesn't exist - toBeUploaded <- ObjectPair{src, nil} - } + for remote, src := range srcFiles { + if dst, dstFound := dstFiles[remote]; dstFound { + toBeChecked <- ObjectPair{src, dst} + } else { + // No need to check since doesn't exist + toBeUploaded <- ObjectPair{src, nil} } - close(toBeChecked) - }() + } + close(toBeChecked) Log(fdst, "Waiting for checks to finish") checkerWg.Wait() @@ -713,30 +732,11 @@ func checkIdentical(dst, src Object) bool { // Check the files in fsrc and fdst according to Size and hash func Check(fdst, fsrc Fs) error { + dstFiles, srcFiles, err := readFilesMaps(fdst, false, fsrc, false) + if err != nil { + return err + } differences := int32(0) - var ( - wg sync.WaitGroup - dstFiles, srcFiles map[string]Object - 
) - - wg.Add(2) - go func() { - defer wg.Done() - // Read the destination files - Log(fdst, "Building file list") - dstFiles = readFilesMap(fdst, false) - Debug(fdst, "Done building file list") - }() - - go func() { - defer wg.Done() - // Read the source files - Log(fsrc, "Building file list") - srcFiles = readFilesMap(fsrc, false) - Debug(fdst, "Done building file list") - }() - - wg.Wait() // FIXME could do this as it goes along and make it use less // memory. @@ -800,13 +800,21 @@ func Check(fdst, fsrc Fs) error { // // Lists in parallel which may get them out of order func ListFn(f Fs, fn func(Object)) error { - in := f.List() + list := NewLister().SetFilter(Config.Filter).Start(f) var wg sync.WaitGroup wg.Add(Config.Checkers) for i := 0; i < Config.Checkers; i++ { go func() { defer wg.Done() - for o := range in { + for { + o, err := list.GetObject() + if err != nil { + log.Fatal(err) + } + // check if we are finished + if o == nil { + return + } if Config.Filter.IncludeObject(o) { fn(o) } @@ -901,7 +909,15 @@ func Count(f Fs) (objects int64, size int64, err error) { // ListDir lists the directories/buckets/containers in the Fs to the supplied writer func ListDir(f Fs, w io.Writer) error { - for dir := range f.ListDir() { + list := NewLister().SetLevel(1).Start(f) + for { + dir, err := list.GetDir() + if err != nil { + log.Fatal(err) + } + if dir == nil { + break + } syncFprintf(w, "%12d %13s %9d %s\n", dir.Bytes, dir.When.Format("2006-01-02 15:04:05"), dir.Count, dir.Name) } return nil @@ -960,7 +976,8 @@ func Purge(f Fs) error { } if doFallbackPurge { // DeleteFiles and Rmdir observe --dry-run - DeleteFiles(f.List()) + list := NewLister().Start(f) + DeleteFiles(listToChan(list)) err = Rmdir(f) } if err != nil { @@ -1115,7 +1132,16 @@ func (mode DeduplicateMode) String() string { func Deduplicate(f Fs, mode DeduplicateMode) error { Log(f, "Looking for duplicates using %v mode.", mode) files := map[string][]Object{} - for o := range f.List() { + list := 
NewLister().Start(f) + for { + o, err := list.GetObject() + if err != nil { + return err + } + // Check if we are finished + if o == nil { + break + } remote := o.Remote() files[remote] = append(files[remote], o) } @@ -1149,3 +1175,34 @@ func Deduplicate(f Fs, mode DeduplicateMode) error { } return nil } + +// listToChan will transfer all incoming objects to a new channel. +// +// If an error occurs, the error will be logged, and it will close the +// channel. +// +// If the error was ErrorDirNotFound then it will be ignored +func listToChan(list *Lister) ObjectsChan { + o := make(ObjectsChan, Config.Checkers) + go func() { + defer close(o) + for { + obj, dir, err := list.Get() + if err != nil { + if err != ErrorDirNotFound { + Stats.Error() + ErrorLog(nil, "Failed to list: %v", err) + } + return + } + if dir == nil && obj == nil { + return + } + // Skip directory results - only objects are sent down the channel + if obj == nil { + continue + } + o <- obj + } + }() + return o +} diff --git a/fs/operations_test.go b/fs/operations_test.go index 39bfe9ae3..19c36e94e 100644 --- a/fs/operations_test.go +++ b/fs/operations_test.go @@ -140,11 +140,20 @@ func NewRun(t *testing.T) *Run { r = new(Run) *r = *oneRun r.cleanRemote = func() { - oldErrors := fs.Stats.GetErrors() - fs.DeleteFiles(r.fremote.List()) - errors := fs.Stats.GetErrors() - oldErrors - if errors != 0 { - t.Fatalf("%d errors while cleaning remote %v", errors, r.fremote) + list := fs.NewLister().Start(r.fremote) + for { + o, err := list.GetObject() + if err != nil { + t.Fatalf("Error listing: %v", err) + } + // Check if we are finished + if o == nil { + break + } + err = o.Remove() + if err != nil { + t.Errorf("Error removing file: %v", err) + } } // Check remote is empty fstest.CheckItems(t, r.fremote) @@ -320,7 +329,12 @@ func TestCopyAfterDelete(t *testing.T) { fstest.CheckItems(t, r.flocal) fstest.CheckItems(t, r.fremote, file1) - err := fs.CopyDir(r.fremote, r.flocal) + err := fs.Mkdir(r.flocal) + if err != nil { + t.Fatalf("Mkdir failed: %v", err) + } + + err =
fs.CopyDir(r.fremote, r.flocal) if err != nil { t.Fatalf("Copy failed: %v", err) } @@ -1167,7 +1181,16 @@ func TestDeduplicateRename(t *testing.T) { t.Fatalf("fs.Deduplicate returned error: %v", err) } - for o := range r.fremote.List() { + list := fs.NewLister().Start(r.fremote) + for { + o, err := list.GetObject() + if err != nil { + t.Fatalf("Listing failed: %v", err) + } + // Check if we are finished + if o == nil { + break + } remote := o.Remote() if remote != "one-1.txt" && remote != "one-2.txt" && diff --git a/fstest/fstest.go b/fstest/fstest.go index cd3c3f383..89b674daa 100644 --- a/fstest/fstest.go +++ b/fstest/fstest.go @@ -154,12 +154,13 @@ func CheckListingWithPrecision(t *testing.T, f fs.Fs, items []Item, precision ti is := NewItems(items) oldErrors := fs.Stats.GetErrors() var objs []fs.Object + var err error const retries = 6 sleep := time.Second / 2 for i := 1; i <= retries; i++ { - objs = nil - for obj := range f.List() { - objs = append(objs, obj) + objs, err = fs.NewLister().Start(f).GetObjects() + if err != nil && err != fs.ErrorDirNotFound { + t.Fatalf("Error listing: %v", err) } if len(objs) == len(items) { // Put an extra sleep in if we did any retries just to make sure it really diff --git a/fstest/fstests/fstests.go b/fstest/fstests/fstests.go index 7b97a92cf..fd8a418ea 100644 --- a/fstest/fstests/fstests.go +++ b/fstest/fstests/fstests.go @@ -129,8 +129,12 @@ func TestFsListEmpty(t *testing.T) { // TestFsListDirEmpty tests listing the directories from an empty directory func TestFsListDirEmpty(t *testing.T) { skipIfNotOk(t) - for obj := range remote.ListDir() { - t.Errorf("Found unexpected item %q", obj.Name) + dirs, err := fs.NewLister().SetLevel(1).Start(remote).GetDirs() + if err != nil { + t.Fatal(err) + } + for _, dir := range dirs { + t.Errorf("Found unexpected item %q", dir.Name) } } @@ -193,9 +197,13 @@ func TestFsListDirFile2(t *testing.T) { skipIfNotOk(t) found := false for i := 1; i <= eventualConsistencyRetries; i++ { - for obj 
:= range remote.ListDir() { - if obj.Name != `hello? sausage` && obj.Name != `hello_ sausage` { - t.Errorf("Found unexpected item %q", obj.Name) + dirs, err := fs.NewLister().SetLevel(1).Start(remote).GetDirs() + if err != nil { + t.Fatal(err) + } + for _, dir := range dirs { + if dir.Name != `hello? sausage` && dir.Name != `hello_ sausage` { + t.Errorf("Found unexpected item %q", dir.Name) } else { found = true } @@ -219,8 +227,12 @@ func TestFsListDirRoot(t *testing.T) { t.Fatalf("Failed to make remote %q: %v", RemoteName, err) } found := false - for obj := range rootRemote.ListDir() { - if obj.Name == subRemoteLeaf { + dirs, err := fs.NewLister().SetLevel(1).Start(rootRemote).GetDirs() + if err != nil { + t.Fatal(err) + } + for _, dir := range dirs { + if dir.Name == subRemoteLeaf { found = true } } @@ -243,8 +255,11 @@ func TestFsListRoot(t *testing.T) { f2 := subRemoteLeaf + "/" + file2.Path f2Alt := subRemoteLeaf + "/" + file2.WinPath count := 0 - errors := fs.Stats.GetErrors() - for obj := range rootRemote.List() { + objs, err := fs.NewLister().Start(rootRemote).GetObjects() + if err != nil { + t.Fatal(err) + } + for _, obj := range objs { count++ if obj.Remote() == f1 { found1 = true @@ -253,17 +268,12 @@ func TestFsListRoot(t *testing.T) { found2 = true } } - errors -= fs.Stats.GetErrors() if count == 0 { - if errors == 0 { - t.Error("Expecting error if count==0") - } + // Nothing found is OK return } if found1 && found2 { - if errors != 0 { - t.Error("Not expecting error if found") - } + // Both found is OK return } t.Errorf("Didn't find %q (%v) and %q (%v) or no files (count %d)", f1, found1, f2, found2, count) diff --git a/googlecloudstorage/googlecloudstorage.go b/googlecloudstorage/googlecloudstorage.go index 25ea96434..bc5554863 100644 --- a/googlecloudstorage/googlecloudstorage.go +++ b/googlecloudstorage/googlecloudstorage.go @@ -15,6 +15,7 @@ FIXME Patch/Delete/Get isn't working with files with spaces in - giving 404 erro import ( 
"encoding/base64" "encoding/hex" + "errors" "fmt" "io" "io/ioutil" @@ -294,38 +295,48 @@ func (f *Fs) NewFsObject(remote string) fs.Object { return f.newFsObjectWithInfo(remote, nil) } +// listFn is called from list to handle an object. +type listFn func(remote string, object *storage.Object, isDirectory bool) error + // list the objects into the function supplied // // If directories is set it only sends directories -func (f *Fs) list(directories bool, fn func(string, *storage.Object)) { +func (f *Fs) list(level int, fn listFn) error { list := f.svc.Objects.List(f.bucket).Prefix(f.root).MaxResults(listChunks) - if directories { + switch level { + case 1: list = list.Delimiter("/") + case fs.MaxLevel: + default: + return fs.ErrorLevelNotSupported } rootLength := len(f.root) for { objects, err := list.Do() if err != nil { - fs.Stats.Error() - fs.ErrorLog(f, "Couldn't read bucket %q: %s", f.bucket, err) - return + return err } - if !directories { - for _, object := range objects.Items { - if !strings.HasPrefix(object.Name, f.root) { - fs.Log(f, "Odd name received %q", object.Name) - continue - } - remote := object.Name[rootLength:] - fn(remote, object) - } - } else { + if level == 1 { var object storage.Object for _, prefix := range objects.Prefixes { if !strings.HasSuffix(prefix, "/") { continue } - fn(prefix[:len(prefix)-1], &object) + err = fn(prefix[:len(prefix)-1], &object, true) + if err != nil { + return err + } + } + } + for _, object := range objects.Items { + if !strings.HasPrefix(object.Name, f.root) { + fs.Log(f, "Odd name received %q", object.Name) + continue + } + remote := object.Name[rootLength:] + err = fn(remote, object, false) + if err != nil { + return err } } if objects.NextPageToken == "" { @@ -333,78 +344,85 @@ func (f *Fs) list(directories bool, fn func(string, *storage.Object)) { } list.PageToken(objects.NextPageToken) } + return nil } -// List walks the path returning a channel of FsObjects -func (f *Fs) List() fs.ObjectsChan { - out := 
make(fs.ObjectsChan, fs.Config.Checkers) +// listFiles lists files and directories to out +func (f *Fs) listFiles(out fs.ListOpts) { + defer out.Finished() if f.bucket == "" { - // Return no objects at top level list - close(out) - fs.Stats.Error() - fs.ErrorLog(f, "Can't list objects at root - choose a bucket using lsd") - } else { - // List the objects - go func() { - defer close(out) - f.list(false, func(remote string, object *storage.Object) { - if fs := f.newFsObjectWithInfo(remote, object); fs != nil { - out <- fs - } - }) - }() + out.SetError(fmt.Errorf("Can't list objects at root - choose a bucket using lsd")) + return + } + // List the objects + err := f.list(out.Level(), func(remote string, object *storage.Object, isDirectory bool) error { + if isDirectory { + dir := &fs.Dir{ + Name: remote, + Bytes: int64(object.Size), + Count: 0, + } + if out.AddDir(dir) { + return fs.ErrorListAborted + } + } else { + if o := f.newFsObjectWithInfo(remote, object); o != nil { + if out.Add(o) { + return fs.ErrorListAborted + } + } + } + return nil + }) + if err != nil { + if gErr, ok := err.(*googleapi.Error); ok { + if gErr.Code == http.StatusNotFound { + err = fs.ErrorDirNotFound + } + } + out.SetError(err) } - return out } -// ListDir lists the buckets -func (f *Fs) ListDir() fs.DirChan { - out := make(fs.DirChan, fs.Config.Checkers) - if f.bucket == "" { - // List the buckets - go func() { - defer close(out) - if f.projectNumber == "" { - fs.Stats.Error() - fs.ErrorLog(f, "Can't list buckets without project number") +// listBuckets lists the buckets to out +func (f *Fs) listBuckets(out fs.ListOpts) { + defer out.Finished() + if f.projectNumber == "" { + out.SetError(errors.New("Can't list buckets without project number")) + return + } + listBuckets := f.svc.Buckets.List(f.projectNumber).MaxResults(listChunks) + for { + buckets, err := listBuckets.Do() + if err != nil { + out.SetError(err) + return + } + for _, bucket := range buckets.Items { + dir := &fs.Dir{ + Name: 
bucket.Name, + Bytes: 0, + Count: 0, + } + if out.AddDir(dir) { return } - listBuckets := f.svc.Buckets.List(f.projectNumber).MaxResults(listChunks) - for { - buckets, err := listBuckets.Do() - if err != nil { - fs.Stats.Error() - fs.ErrorLog(f, "Couldn't list buckets: %v", err) - break - } else { - for _, bucket := range buckets.Items { - out <- &fs.Dir{ - Name: bucket.Name, - Bytes: 0, - Count: 0, - } - } - } - if buckets.NextPageToken == "" { - break - } - listBuckets.PageToken(buckets.NextPageToken) - } - }() - } else { - // List the directories in the path in the bucket - go func() { - defer close(out) - f.list(true, func(remote string, object *storage.Object) { - out <- &fs.Dir{ - Name: remote, - Bytes: int64(object.Size), - Count: 0, - } - }) - }() + } + if buckets.NextPageToken == "" { + break + } + listBuckets.PageToken(buckets.NextPageToken) } - return out +} + +// List lists the path to out +func (f *Fs) List(out fs.ListOpts) { + if f.bucket == "" { + f.listBuckets(out) + } else { + f.listFiles(out) + } + return } // Put the object into the bucket diff --git a/local/local.go b/local/local.go index 474c27cec..51e1c2d2f 100644 --- a/local/local.go +++ b/local/local.go @@ -6,6 +6,7 @@ import ( "io" "io/ioutil" "os" + "path" "path/filepath" "regexp" "runtime" @@ -137,42 +138,123 @@ func (f *Fs) NewFsObject(remote string) fs.Object { return f.newFsObjectWithInfo(remote, nil) } -// List the path returning a channel of FsObjects -// -// Ignores everything which isn't Storable, eg links etc -func (f *Fs) List() fs.ObjectsChan { - out := make(fs.ObjectsChan, fs.Config.Checkers) - go func() { - err := filepath.Walk(f.root, func(path string, fi os.FileInfo, err error) error { - if err != nil { - fs.Stats.Error() - fs.ErrorLog(f, "Failed to open directory: %s: %s", path, err) +// listArgs is the arguments that a new list takes +type listArgs struct { + remote string + dirpath string + level int +} + +// list traverses the directory passed in, listing to out. 
+// It returns the included subdirectories that still need listing, or nil if the listing failed or was aborted. +func (f *Fs) list(out fs.ListOpts, remote string, dirpath string, level int) (subdirs []listArgs) { + fd, err := os.Open(dirpath) + if err != nil { + out.SetError(err) + fs.Stats.Error() + fs.ErrorLog(f, "Failed to open directory: %s: %s", dirpath, err) + return nil + } + defer func() { + err := fd.Close() + if err != nil { + out.SetError(err) + fs.Stats.Error() + fs.ErrorLog(f, "Failed to close directory: %s: %s", dirpath, err) + } + }() + + for { + fis, err := fd.Readdir(1024) + if err == io.EOF && len(fis) == 0 { + break + } + if err != nil { + out.SetError(err) + fs.Stats.Error() + fs.ErrorLog(f, "Failed to read directory: %s: %s", dirpath, err) + return nil + } + + for _, fi := range fis { + name := fi.Name() + newRemote := path.Join(remote, name) + newPath := filepath.Join(dirpath, name) + if fi.IsDir() { + if out.IncludeDirectory(newRemote) { + dir := &fs.Dir{ + Name: f.cleanUtf8(newRemote), + When: fi.ModTime(), + Bytes: 0, + Count: 0, + } + if out.AddDir(dir) { + return nil + } + if level > 0 { + subdirs = append(subdirs, listArgs{remote: newRemote, dirpath: newPath, level: level - 1}) + } + } } else { - remote, err := filepath.Rel(f.root, path) - if err != nil { - fs.Stats.Error() - fs.ErrorLog(f, "Failed to get relative path %s: %s", path, err) - return nil - } - if remote == "."
{ - return nil - // remote = "" - } - if fs := f.newFsObjectWithInfo(remote, fi); fs != nil { - if fs.Storable() { - out <- fs + if fso := f.newFsObjectWithInfo(newRemote, fi); fso != nil { + if fso.Storable() && out.Add(fso) { + return nil } } } - return nil - }) - if err != nil { - fs.Stats.Error() - fs.ErrorLog(f, "Failed to open directory: %s: %s", f.root, err) } - close(out) - }() - return out + } + return subdirs +} + +// List the path into out +// +// Ignores everything which isn't Storable, eg links etc +func (f *Fs) List(out fs.ListOpts) { + defer out.Finished() + _, err := os.Stat(f.root) + if err != nil { + out.SetError(fs.ErrorDirNotFound) + fs.Stats.Error() + fs.ErrorLog(f, "Directory not found: %s: %s", f.root, err) + return + } + + in := make(chan listArgs, out.Buffer()) + var wg sync.WaitGroup // sync closing of go routines + var traversing sync.WaitGroup // running directory traversals + + // Start the process + traversing.Add(1) + in <- listArgs{remote: "", dirpath: f.root, level: out.Level() - 1} + for i := 0; i < fs.Config.Checkers; i++ { + wg.Add(1) + go func() { + defer wg.Done() + for job := range in { + if out.IsFinished() { + // Drop the job but still mark it done, otherwise + // traversing.Wait() below would never return + traversing.Done() + continue + } + newJobs := f.list(out, job.remote, job.dirpath, job.level) + // Now we have traversed this directory, send + // these ones off for traversal + if len(newJobs) != 0 { + traversing.Add(len(newJobs)) + go func() { + for _, newJob := range newJobs { + in <- newJob + } + }() + } + traversing.Done() + } + }() + } + + // Wait for traversal to finish + traversing.Wait() + close(in) + wg.Wait() } // CleanUtf8 makes string a valid UTF-8 string @@ -194,48 +276,50 @@ func (f *Fs) cleanUtf8(name string) string { return name } +/* // ListDir walks the path returning a channel of FsObjects -func (f *Fs) ListDir() fs.DirChan { - out := make(fs.DirChan, fs.Config.Checkers) - go func() { - defer close(out) - items, err := ioutil.ReadDir(f.root) - if err != nil { - fs.Stats.Error() - fs.ErrorLog(f, "Couldn't find read
directory: %s", err) - } else { - for _, item := range items { - if item.IsDir() { - dir := &fs.Dir{ - Name: f.cleanUtf8(item.Name()), - When: item.ModTime(), - Bytes: 0, - Count: 0, - } - // Go down the tree to count the files and directories - dirpath := f.filterPath(filepath.Join(f.root, item.Name())) - err := filepath.Walk(dirpath, func(path string, fi os.FileInfo, err error) error { - if err != nil { - fs.Stats.Error() - fs.ErrorLog(f, "Failed to open directory: %s: %s", path, err) - } else { - dir.Count++ - dir.Bytes += fi.Size() - } - return nil - }) - if err != nil { - fs.Stats.Error() - fs.ErrorLog(f, "Failed to open directory: %s: %s", dirpath, err) - } - out <- dir +func (f *Fs) ListDir(out fs.ListDirOpts) { + defer out.Finished() + items, err := ioutil.ReadDir(f.root) + if err != nil { + fs.Stats.Error() + fs.ErrorLog(f, "Couldn't find read directory: %s", err) + out.SetError(err) + return + } + for _, item := range items { + if item.IsDir() { + dir := &fs.Dir{ + Name: f.cleanUtf8(item.Name()), + When: item.ModTime(), + Bytes: 0, + Count: 0, + } + // Go down the tree to count the files and directories + dirpath := f.filterPath(filepath.Join(f.root, item.Name())) + err := filepath.Walk(dirpath, func(path string, fi os.FileInfo, err error) error { + if err != nil { + fs.Stats.Error() + fs.ErrorLog(f, "Failed to open directory: %s: %s", path, err) + out.SetError(err) + } else { + dir.Count++ + dir.Bytes += fi.Size() } + return nil + }) + if err != nil { + out.SetError(err) + fs.Stats.Error() + fs.ErrorLog(f, "Failed to open directory: %s: %s", dirpath, err) + } + if out.Add(dir) { + return } } - // err := f.findRoot(false) - }() - return out + } } +*/ // Put the FsObject to the local filesystem func (f *Fs) Put(in io.Reader, src fs.ObjectInfo) (fs.Object, error) { diff --git a/onedrive/onedrive.go b/onedrive/onedrive.go index a6074e65a..7ec0cc99f 100644 --- a/onedrive/onedrive.go +++ b/onedrive/onedrive.go @@ -10,7 +10,6 @@ import ( "net/http" "regexp" 
"strings" - "sync" "time" "github.com/ncw/rclone/dircache" @@ -369,98 +368,45 @@ OUTER: return } -// Path should be directory path either "" or "path/" -// -// List the directory using a recursive list from the root -// -// This fetches the minimum amount of stuff but does more API calls -// which makes it slow -func (f *Fs) listDirRecursive(dirID string, path string, out fs.ObjectsChan) error { - var subError error - // Make the API request - var wg sync.WaitGroup - _, err := f.listAll(dirID, false, false, func(info *api.Item) bool { - // Recurse on directories +// ListDir reads the directory specified by the job into out, returning any more jobs +func (f *Fs) ListDir(out fs.ListOpts, job dircache.ListDirJob) (jobs []dircache.ListDirJob, err error) { + fs.Debug(f, "Reading %q", job.Path) + _, err = f.listAll(job.DirID, false, false, func(info *api.Item) bool { + remote := job.Path + info.Name if info.Folder != nil { - wg.Add(1) - folder := path + info.Name + "/" - fs.Debug(f, "Reading %s", folder) - go func() { - defer wg.Done() - err := f.listDirRecursive(info.ID, folder, out) - if err != nil { - subError = err - fs.ErrorLog(f, "Error reading %s:%s", folder, err) + if out.IncludeDirectory(remote) { + dir := &fs.Dir{ + Name: remote, + Bytes: -1, + Count: -1, + When: time.Time(info.LastModifiedDateTime), } - }() + if info.Folder != nil { + dir.Count = info.Folder.ChildCount + } + if out.AddDir(dir) { + return true + } + if job.Depth > 0 { + jobs = append(jobs, dircache.ListDirJob{DirID: info.ID, Path: remote + "/", Depth: job.Depth - 1}) + } + } } else { - if fs := f.newObjectWithInfo(path+info.Name, info); fs != nil { - out <- fs + if o := f.newObjectWithInfo(remote, info); o != nil { + if out.Add(o) { + return true + } } } return false }) - wg.Wait() - fs.Debug(f, "Finished reading %s", path) - if err != nil { - return err - } - if subError != nil { - return subError - } - return nil + fs.Debug(f, "Finished reading %q", job.Path) + return jobs, err } -// List 
walks the path returning a channel of Objects -func (f *Fs) List() fs.ObjectsChan { - out := make(fs.ObjectsChan, fs.Config.Checkers) - go func() { - defer close(out) - err := f.dirCache.FindRoot(false) - if err != nil { - fs.Stats.Error() - fs.ErrorLog(f, "Couldn't find root: %s", err) - } else { - err = f.listDirRecursive(f.dirCache.RootID(), "", out) - if err != nil { - fs.Stats.Error() - fs.ErrorLog(f, "List failed: %s", err) - } - } - }() - return out -} - -// ListDir lists the directories -func (f *Fs) ListDir() fs.DirChan { - out := make(fs.DirChan, fs.Config.Checkers) - go func() { - defer close(out) - err := f.dirCache.FindRoot(false) - if err != nil { - fs.Stats.Error() - fs.ErrorLog(f, "Couldn't find root: %s", err) - } else { - _, err := f.listAll(f.dirCache.RootID(), true, false, func(item *api.Item) bool { - dir := &fs.Dir{ - Name: item.Name, - Bytes: -1, - Count: -1, - When: time.Time(item.LastModifiedDateTime), - } - if item.Folder != nil { - dir.Count = item.Folder.ChildCount - } - out <- dir - return false - }) - if err != nil { - fs.Stats.Error() - fs.ErrorLog(f, "ListDir failed: %s", err) - } - } - }() - return out +// List walks the path returning files and directories into out +func (f *Fs) List(out fs.ListOpts) { + f.dirCache.List(f, out) } // Creates from the parameters passed in a half finished Object which diff --git a/s3/s3.go b/s3/s3.go index a48e3e61f..5b456c323 100644 --- a/s3/s3.go +++ b/s3/s3.go @@ -364,14 +364,21 @@ func (f *Fs) NewFsObject(remote string) fs.Object { return f.newFsObjectWithInfo(remote, nil) } +// listFn is called from list to handle an object. 
+type listFn func(remote string, object *s3.Object, isDirectory bool) error + // list the objects into the function supplied // -// If directories is set it only sends directories -func (f *Fs) list(directories bool, fn func(string, *s3.Object)) { +// Level is the level of the recursion +func (f *Fs) list(level int, fn listFn) error { maxKeys := int64(listChunkSize) delimiter := "" - if directories { + switch level { + case 1: delimiter = "/" + case fs.MaxLevel: + default: + return fs.ErrorLevelNotSupported } var marker *string for { @@ -385,114 +392,127 @@ func (f *Fs) list(directories bool, fn func(string, *s3.Object)) { } resp, err := f.c.ListObjects(&req) if err != nil { - fs.Stats.Error() - fs.ErrorLog(f, "Couldn't read bucket %q: %s", f.bucket, err) + return err + } + rootLength := len(f.root) + if level == 1 { + for _, commonPrefix := range resp.CommonPrefixes { + if commonPrefix.Prefix == nil { + fs.Log(f, "Nil common prefix received") + continue + } + remote := *commonPrefix.Prefix + if !strings.HasPrefix(remote, f.root) { + fs.Log(f, "Odd name received %q", remote) + continue + } + remote = remote[rootLength:] + if strings.HasSuffix(remote, "/") { + remote = remote[:len(remote)-1] + } + err = fn(remote, &s3.Object{Key: &remote}, true) + if err != nil { + return err + } + } + } + for _, object := range resp.Contents { + key := aws.StringValue(object.Key) + if !strings.HasPrefix(key, f.root) { + fs.Log(f, "Odd name received %q", key) + continue + } + remote := key[rootLength:] + err = fn(remote, object, false) + if err != nil { + return err + } + } + if !aws.BoolValue(resp.IsTruncated) { break + } + // Use NextMarker if set, otherwise use last Key + if resp.NextMarker == nil || *resp.NextMarker == "" { + marker = resp.Contents[len(resp.Contents)-1].Key } else { - rootLength := len(f.root) - if directories { - for _, commonPrefix := range resp.CommonPrefixes { - if commonPrefix.Prefix == nil { - fs.Log(f, "Nil common prefix received") - continue - } - remote 
:= *commonPrefix.Prefix - if !strings.HasPrefix(remote, f.root) { - fs.Log(f, "Odd name received %q", remote) - continue - } - remote = remote[rootLength:] - if strings.HasSuffix(remote, "/") { - remote = remote[:len(remote)-1] - } - fn(remote, &s3.Object{Key: &remote}) - } - } else { - for _, object := range resp.Contents { - key := aws.StringValue(object.Key) - if !strings.HasPrefix(key, f.root) { - fs.Log(f, "Odd name received %q", key) - continue - } - remote := key[rootLength:] - fn(remote, object) + marker = resp.NextMarker + } + } + return nil +} + +// listFiles lists files and directories to out +func (f *Fs) listFiles(out fs.ListOpts) { + defer out.Finished() + if f.bucket == "" { + // Return no objects at top level list + out.SetError(errors.New("Can't list objects at root - choose a bucket using lsd")) + return + } + // List the objects and directories + err := f.list(out.Level(), func(remote string, object *s3.Object, isDirectory bool) error { + if isDirectory { + size := int64(0) + if object.Size != nil { + size = *object.Size + } + dir := &fs.Dir{ + Name: remote, + Bytes: size, + Count: 0, + } + if out.AddDir(dir) { + return fs.ErrorListAborted + } + } else { + if o := f.newFsObjectWithInfo(remote, object); o != nil { + if out.Add(o) { + return fs.ErrorListAborted } } - if !aws.BoolValue(resp.IsTruncated) { - break - } - // Use NextMarker if set, otherwise use last Key - if resp.NextMarker == nil || *resp.NextMarker == "" { - marker = resp.Contents[len(resp.Contents)-1].Key - } else { - marker = resp.NextMarker + } + return nil + }) + if err != nil { + if awsErr, ok := err.(awserr.RequestFailure); ok { + if awsErr.StatusCode() == http.StatusNotFound { + err = fs.ErrorDirNotFound } } + out.SetError(err) + } +} + +// listBuckets lists the buckets to out +func (f *Fs) listBuckets(out fs.ListOpts) { + defer out.Finished() + req := s3.ListBucketsInput{} + resp, err := f.c.ListBuckets(&req) + if err != nil { + out.SetError(err) + return + } + for _, bucket 
:= range resp.Buckets { + dir := &fs.Dir{ + Name: aws.StringValue(bucket.Name), + When: aws.TimeValue(bucket.CreationDate), + Bytes: -1, + Count: -1, + } + if out.AddDir(dir) { + break + } } } -// List walks the path returning a channel of FsObjects -func (f *Fs) List() fs.ObjectsChan { - out := make(fs.ObjectsChan, fs.Config.Checkers) +// List lists files and directories to out +func (f *Fs) List(out fs.ListOpts) { if f.bucket == "" { - // Return no objects at top level list - close(out) - fs.Stats.Error() - fs.ErrorLog(f, "Can't list objects at root - choose a bucket using lsd") + f.listBuckets(out) } else { - go func() { - defer close(out) - f.list(false, func(remote string, object *s3.Object) { - if fs := f.newFsObjectWithInfo(remote, object); fs != nil { - out <- fs - } - }) - }() + f.listFiles(out) } - return out -} - -// ListDir lists the buckets -func (f *Fs) ListDir() fs.DirChan { - out := make(fs.DirChan, fs.Config.Checkers) - if f.bucket == "" { - // List the buckets - go func() { - defer close(out) - req := s3.ListBucketsInput{} - resp, err := f.c.ListBuckets(&req) - if err != nil { - fs.Stats.Error() - fs.ErrorLog(f, "Couldn't list buckets: %s", err) - } else { - for _, bucket := range resp.Buckets { - out <- &fs.Dir{ - Name: aws.StringValue(bucket.Name), - When: aws.TimeValue(bucket.CreationDate), - Bytes: -1, - Count: -1, - } - } - } - }() - } else { - // List the directories in the path in the bucket - go func() { - defer close(out) - f.list(true, func(remote string, object *s3.Object) { - size := int64(0) - if object.Size != nil { - size = *object.Size - } - out <- &fs.Dir{ - Name: remote, - Bytes: size, - Count: 0, - } - }) - }() - } - return out + return } // Put the FsObject into the bucket diff --git a/swift/swift.go b/swift/swift.go index 97d66db2b..9d450eabd 100644 --- a/swift/swift.go +++ b/swift/swift.go @@ -253,21 +253,25 @@ func (f *Fs) NewFsObject(remote string) fs.Object { return f.newFsObjectWithInfo(remote, nil) } -// listFn is called 
from list and listContainerRoot to handle an object -type listFn func(string, *swift.Object) error +// listFn is called from list and listContainerRoot to handle an object. +type listFn func(remote string, object *swift.Object, isDirectory bool) error // listContainerRoot lists the objects into the function supplied from // the container and root supplied // -// If directories is set it only sends directories -func (f *Fs) listContainerRoot(container, root string, directories bool, fn listFn) error { +// Level is the level of the recursion +func (f *Fs) listContainerRoot(container, root string, level int, fn listFn) error { // Options for ObjectsWalk opts := swift.ObjectsOpts{ Prefix: root, Limit: 256, } - if directories { + switch level { + case 1: opts.Delimiter = '/' + case fs.MaxLevel: + default: + return fs.ErrorLevelNotSupported } rootLength := len(root) return f.c.ObjectsWalk(container, &opts, func(opts *swift.ObjectsOpts) (interface{}, error) { @@ -275,19 +279,19 @@ func (f *Fs) listContainerRoot(container, root string, directories bool, fn list if err == nil { for i := range objects { object := &objects[i] - // FIXME if there are no directories, swift gives back the files for some reason! 
- if directories { - if !strings.HasSuffix(object.Name, "/") { - continue + isDirectory := false + if level == 1 { + if strings.HasSuffix(object.Name, "/") { + isDirectory = true + object.Name = object.Name[:len(object.Name)-1] } - object.Name = object.Name[:len(object.Name)-1] } if !strings.HasPrefix(object.Name, root) { fs.Log(f, "Odd name received %q", object.Name) continue } remote := object.Name[rootLength:] - err = fn(remote, object) + err = fn(remote, object, isDirectory) if err != nil { break } @@ -298,86 +302,79 @@ func (f *Fs) listContainerRoot(container, root string, directories bool, fn list } // list the objects into the function supplied -// -// If directories is set it only sends directories -func (f *Fs) list(directories bool, fn listFn) { - err := f.listContainerRoot(f.container, f.root, directories, fn) - if err != nil { - fs.Stats.Error() - fs.ErrorLog(f, "Couldn't read container %q: %s", f.container, err) - } +func (f *Fs) list(level int, fn listFn) error { + return f.listContainerRoot(f.container, f.root, level, fn) } // listFiles walks the path returning a channel of FsObjects // // if ignoreStorable is set then it outputs the file even if Storable() is false -func (f *Fs) listFiles(ignoreStorable bool) fs.ObjectsChan { - out := make(fs.ObjectsChan, fs.Config.Checkers) +func (f *Fs) listFiles(out fs.ListOpts, ignoreStorable bool) { + defer out.Finished() if f.container == "" { - // Return no objects at top level list - close(out) - fs.Stats.Error() - fs.ErrorLog(f, "Can't list objects at root - choose a container using lsd") - } else { - // List the objects - go func() { - defer close(out) - f.list(false, func(remote string, object *swift.Object) error { - if o := f.newFsObjectWithInfo(remote, object); o != nil { - // Storable does a full metadata read on 0 size objects which might be dynamic large objects - storable := o.Storable() - if storable || ignoreStorable { - out <- o - } - } - return nil - }) - }() + out.SetError(errors.New("Can't 
list objects at root - choose a container using lsd")) + return } - return out -} - -// List walks the path returning a channel of FsObjects -func (f *Fs) List() fs.ObjectsChan { - return f.listFiles(false) -} - -// ListDir lists the containers -func (f *Fs) ListDir() fs.DirChan { - out := make(fs.DirChan, fs.Config.Checkers) - if f.container == "" { - // List the containers - go func() { - defer close(out) - containers, err := f.c.ContainersAll(nil) - if err != nil { - fs.Stats.Error() - fs.ErrorLog(f, "Couldn't list containers: %v", err) - } else { - for _, container := range containers { - out <- &fs.Dir{ - Name: container.Name, - Bytes: container.Bytes, - Count: container.Count, + // List the objects + err := f.list(out.Level(), func(remote string, object *swift.Object, isDirectory bool) error { + if isDirectory { + dir := &fs.Dir{ + Name: remote, + Bytes: object.Bytes, + Count: 0, + } + if out.AddDir(dir) { + return fs.ErrorListAborted + } + } else { + if o := f.newFsObjectWithInfo(remote, object); o != nil { + // Storable does a full metadata read on 0 size objects which might be dynamic large objects + storable := o.Storable() + if storable || ignoreStorable { + if out.Add(o) { + return fs.ErrorListAborted } } } - }() - } else { - // List the directories in the path in the container - go func() { - defer close(out) - f.list(true, func(remote string, object *swift.Object) error { - out <- &fs.Dir{ - Name: remote, - Bytes: object.Bytes, - Count: 0, - } - return nil - }) - }() + } + return nil + }) + if err != nil { + if err == swift.ContainerNotFound { + err = fs.ErrorDirNotFound + } + out.SetError(err) } - return out +} + +// listContainers lists the containers +func (f *Fs) listContainers(out fs.ListOpts) { + defer out.Finished() + containers, err := f.c.ContainersAll(nil) + if err != nil { + out.SetError(err) + return + } + for _, container := range containers { + dir := &fs.Dir{ + Name: container.Name, + Bytes: container.Bytes, + Count: container.Count, + 
} + if out.AddDir(dir) { + break + } + } +} + +// List walks the path returning files and directories to out +func (f *Fs) List(out fs.ListOpts) { + if f.container == "" { + f.listContainers(out) + } else { + f.listFiles(out, false) + } + return } // Put the object into the container @@ -427,7 +424,24 @@ func (f *Fs) Precision() time.Duration { // // Implemented here so we can make sure we delete directory markers func (f *Fs) Purge() error { - fs.DeleteFiles(f.listFiles(true)) + // Delete all the files including the directory markers + toBeDeleted := make(chan fs.Object, fs.Config.Transfers) + var err error + go func() { + err = f.list(fs.MaxLevel, func(remote string, object *swift.Object, isDirectory bool) error { + if !isDirectory { + if o := f.newFsObjectWithInfo(remote, object); o != nil { + toBeDeleted <- o + } + } + return nil + }) + close(toBeDeleted) + }() + fs.DeleteFiles(toBeDeleted) + if err != nil { + return err + } return f.Rmdir() } @@ -611,7 +625,10 @@ func min(x, y int64) int64 { // if except is passed in then segments with that prefix won't be deleted func (o *Object) removeSegments(except string) error { segmentsRoot := o.fs.root + o.remote + "/" - err := o.fs.listContainerRoot(o.fs.segmentsContainer, segmentsRoot, false, func(remote string, object *swift.Object) error { + err := o.fs.listContainerRoot(o.fs.segmentsContainer, segmentsRoot, fs.MaxLevel, func(remote string, object *swift.Object, isDirectory bool) error { + if isDirectory { + return nil + } if except != "" && strings.HasPrefix(remote, except) { // fs.Debug(o, "Ignoring current segment file %q in container %q", segmentsRoot+remote, o.fs.segmentsContainer) return nil diff --git a/yandex/yandex.go b/yandex/yandex.go index c28110899..87e952200 100644 --- a/yandex/yandex.go +++ b/yandex/yandex.go @@ -163,10 +163,44 @@ func (f *Fs) setRoot(root string) { f.diskRoot = diskRoot } +// listFn is called from list and listContainerRoot to handle an object. 
+type listFn func(remote string, item *yandex.ResourceInfoResponse, isDirectory bool) error + +// listDir lists this directory only returning objects and directories +func (f *Fs) listDir(fn listFn) (err error) { + //request object meta info + var opt yandex.ResourceInfoRequestOptions + ResourceInfoResponse, err := f.yd.NewResourceInfoRequest(f.diskRoot, opt).Exec() + if err != nil { + return err + } + if ResourceInfoResponse.ResourceType == "dir" { + //list all subdirs + for _, element := range ResourceInfoResponse.Embedded.Items { + remote := element.Name + switch element.ResourceType { + case "dir": + err = fn(remote, &element, true) + if err != nil { + return err + } + case "file": + err = fn(remote, &element, false) + if err != nil { + return err + } + default: + fs.Debug(f, "Unknown resource type %q", element.ResourceType) + } + } + } + return nil +} + // list the objects into the function supplied // -// If directories is set it only sends directories -func (f *Fs) list(directories bool, fn func(string, yandex.ResourceInfoResponse)) { +// This does a flat listing of all the files in the drive +func (f *Fs) list(fn listFn) error { //request files list. list is divided into pages. 
We send request for each page //items per page is limited by limit //TODO may be add config parameter for the items per page limit @@ -182,9 +216,7 @@ func (f *Fs) list(directories bool, fn func(string, yandex.ResourceInfoResponse) //send request info, err := f.yd.NewFlatFileListRequest(opt).Exec() if err != nil { - fs.Stats.Error() - fs.ErrorLog(f, "Couldn't list: %s", err) - return + return err } itemsCount = uint32(len(info.Items)) @@ -194,7 +226,10 @@ func (f *Fs) list(directories bool, fn func(string, yandex.ResourceInfoResponse) if strings.HasPrefix(item.Path, f.diskRoot) { //trim root folder from filename var name = strings.TrimPrefix(item.Path, f.diskRoot) - fn(name, item) + err = fn(name, &item, false) + if err != nil { + return err + } } } @@ -205,21 +240,55 @@ func (f *Fs) list(directories bool, fn func(string, yandex.ResourceInfoResponse) break } } + return nil } // List walks the path returning a channel of FsObjects -func (f *Fs) List() fs.ObjectsChan { - out := make(fs.ObjectsChan, fs.Config.Checkers) - // List the objects - go func() { - defer close(out) - f.list(false, func(remote string, object yandex.ResourceInfoResponse) { - if fs := f.newFsObjectWithInfo(remote, &object); fs != nil { - out <- fs +func (f *Fs) List(out fs.ListOpts) { + defer out.Finished() + + listItem := func(remote string, object *yandex.ResourceInfoResponse, isDirectory bool) error { + if isDirectory { + t, err := time.Parse(time.RFC3339Nano, object.Modified) + if err != nil { + return err } - }) - }() - return out + dir := &fs.Dir{ + Name: remote, + When: t, + Bytes: int64(object.Size), + Count: -1, + } + if out.AddDir(dir) { + return fs.ErrorListAborted + } + } else { + if o := f.newFsObjectWithInfo(remote, object); o != nil { + if out.Add(o) { + return fs.ErrorListAborted + } + } + } + return nil + } + + var err error + switch out.Level() { + case 1: + err = f.listDir(listItem) + case fs.MaxLevel: + err = f.list(listItem) + default: + 
out.SetError(fs.ErrorLevelNotSupported) + } + + if err != nil { + // FIXME + // if err == swift.ContainerNotFound { + // err = fs.ErrorDirNotFound + // } + out.SetError(err) + } } // NewFsObject returns an Object from a path @@ -242,7 +311,7 @@ func (f *Fs) newFsObjectWithInfo(remote string, info *yandex.ResourceInfoRespons } else { err := o.readMetaData() if err != nil { - fs.ErrorLog(f, "Couldn't get object '%s' metadata: %s", o.remotePath(), err) + fs.Debug(f, "Couldn't get object '%s' metadata: %s", o.remotePath(), err) return nil } } @@ -288,40 +357,6 @@ func (o *Object) readMetaData() (err error) { return nil } -// ListDir walks the path returning a channel of FsObjects -func (f *Fs) ListDir() fs.DirChan { - out := make(fs.DirChan, fs.Config.Checkers) - go func() { - defer close(out) - - //request object meta info - var opt yandex.ResourceInfoRequestOptions - ResourceInfoResponse, err := f.yd.NewResourceInfoRequest(f.diskRoot, opt).Exec() - if err != nil { - return - } - if ResourceInfoResponse.ResourceType == "dir" { - //list all subdirs - for _, element := range ResourceInfoResponse.Embedded.Items { - if element.ResourceType == "dir" { - t, err := time.Parse(time.RFC3339Nano, element.Modified) - if err != nil { - return - } - out <- &fs.Dir{ - Name: element.Name, - When: t, - Bytes: int64(element.Size), - Count: -1, - } - } - } - } - - }() - return out -} - // Put the object // // Copy the reader in to the new object which is returned