Refactor the List and ListDir interface

Gives more accurate error propagation, control of depth of recursion
and short circuit recursion where possible.

Most of the the heavy lifting is done in the "fs" package, making file
system implementations a bit simpler.

This commit contains some code originally by Klaus Post.

Fixes #316
This commit is contained in:
Nick Craig-Wood 2016-04-21 20:06:21 +01:00
parent 3bdad260b0
commit 753b0717be
21 changed files with 1512 additions and 996 deletions

View file

@ -456,10 +456,23 @@ func DeleteFiles(toBeDeleted ObjectsChan) {
// Read a map of Object.Remote to Object for the given Fs.
// If includeAll is specified all files will be added,
// otherwise only files passing the filter will be added.
func readFilesMap(fs Fs, includeAll bool) map[string]Object {
files := make(map[string]Object)
func readFilesMap(fs Fs, includeAll bool) (files map[string]Object, err error) {
files = make(map[string]Object)
normalised := make(map[string]struct{})
for o := range fs.List() {
list := NewLister()
if !includeAll {
list.SetFilter(Config.Filter)
}
list.Start(fs)
for {
o, err := list.GetObject()
if err != nil {
return files, err
}
// Check if we are finished
if o == nil {
break
}
remote := o.Remote()
normalisedRemote := strings.ToLower(norm.NFC.String(remote))
if _, ok := files[remote]; !ok {
@ -477,7 +490,39 @@ func readFilesMap(fs Fs, includeAll bool) map[string]Object {
}
normalised[normalisedRemote] = struct{}{}
}
return files
return files, nil
}
// readFilesMaps runs readFilesMap on fdst and fsrc at the same time
func readFilesMaps(fdst Fs, fdstIncludeAll bool, fsrc Fs, fsrcIncludeAll bool) (dstFiles, srcFiles map[string]Object, err error) {
var wg sync.WaitGroup
var srcErr, dstErr error
list := func(fs Fs, includeAll bool, pMap *map[string]Object, pErr *error) {
defer wg.Done()
Log(fs, "Building file list")
dstFiles, listErr := readFilesMap(fs, includeAll)
if listErr != nil {
ErrorLog(fs, "Error building file list: %v", listErr)
*pErr = listErr
} else {
Debug(fs, "Done building file list")
*pMap = dstFiles
}
}
wg.Add(2)
go list(fdst, fdstIncludeAll, &dstFiles, &srcErr)
go list(fsrc, fsrcIncludeAll, &srcFiles, &dstErr)
wg.Wait()
if srcErr != nil {
err = srcErr
}
if dstErr != nil {
err = dstErr
}
return dstFiles, srcFiles, err
}
// Same returns true if fdst and fsrc point to the same underlying Fs
@ -501,31 +546,11 @@ func syncCopyMove(fdst, fsrc Fs, Delete bool, DoMove bool) error {
return err
}
Log(fdst, "Building file list")
// Read the files of both source and destination
var listWg sync.WaitGroup
listWg.Add(2)
var dstFiles map[string]Object
var srcFiles map[string]Object
var srcObjects = make(ObjectsChan, Config.Transfers)
// Read dst files including excluded files if DeleteExcluded is set
go func() {
dstFiles = readFilesMap(fdst, Config.Filter.DeleteExcluded)
listWg.Done()
}()
// Read src file not including excluded files
go func() {
srcFiles = readFilesMap(fsrc, false)
listWg.Done()
for _, v := range srcFiles {
srcObjects <- v
}
close(srcObjects)
}()
// Read the files of both source and destination in parallel
dstFiles, srcFiles, err := readFilesMaps(fdst, Config.Filter.DeleteExcluded, fsrc, false)
if err != nil {
return err
}
startDeletion := make(chan struct{}, 0)
@ -564,9 +589,6 @@ func syncCopyMove(fdst, fsrc Fs, Delete bool, DoMove bool) error {
DeleteFiles(toDelete)
}()
// Wait for all files to be read
listWg.Wait()
// Start deleting, unless we must delete after transfer
if Delete && !Config.DeleteAfter {
close(startDeletion)
@ -598,18 +620,15 @@ func syncCopyMove(fdst, fsrc Fs, Delete bool, DoMove bool) error {
}
}
go func() {
for src := range srcObjects {
remote := src.Remote()
if dst, dstFound := dstFiles[remote]; dstFound {
toBeChecked <- ObjectPair{src, dst}
} else {
// No need to check since doesn't exist
toBeUploaded <- ObjectPair{src, nil}
}
for remote, src := range srcFiles {
if dst, dstFound := dstFiles[remote]; dstFound {
toBeChecked <- ObjectPair{src, dst}
} else {
// No need to check since doesn't exist
toBeUploaded <- ObjectPair{src, nil}
}
close(toBeChecked)
}()
}
close(toBeChecked)
Log(fdst, "Waiting for checks to finish")
checkerWg.Wait()
@ -713,30 +732,11 @@ func checkIdentical(dst, src Object) bool {
// Check the files in fsrc and fdst according to Size and hash
func Check(fdst, fsrc Fs) error {
dstFiles, srcFiles, err := readFilesMaps(fdst, false, fsrc, false)
if err != nil {
return err
}
differences := int32(0)
var (
wg sync.WaitGroup
dstFiles, srcFiles map[string]Object
)
wg.Add(2)
go func() {
defer wg.Done()
// Read the destination files
Log(fdst, "Building file list")
dstFiles = readFilesMap(fdst, false)
Debug(fdst, "Done building file list")
}()
go func() {
defer wg.Done()
// Read the source files
Log(fsrc, "Building file list")
srcFiles = readFilesMap(fsrc, false)
Debug(fdst, "Done building file list")
}()
wg.Wait()
// FIXME could do this as it goes along and make it use less
// memory.
@ -800,13 +800,21 @@ func Check(fdst, fsrc Fs) error {
//
// Lists in parallel which may get them out of order
func ListFn(f Fs, fn func(Object)) error {
in := f.List()
list := NewLister().SetFilter(Config.Filter).Start(f)
var wg sync.WaitGroup
wg.Add(Config.Checkers)
for i := 0; i < Config.Checkers; i++ {
go func() {
defer wg.Done()
for o := range in {
for {
o, err := list.GetObject()
if err != nil {
log.Fatal(err)
}
// check if we are finished
if o == nil {
return
}
if Config.Filter.IncludeObject(o) {
fn(o)
}
@ -901,7 +909,15 @@ func Count(f Fs) (objects int64, size int64, err error) {
// ListDir lists the directories/buckets/containers in the Fs to the supplied writer
func ListDir(f Fs, w io.Writer) error {
for dir := range f.ListDir() {
list := NewLister().SetLevel(1).Start(f)
for {
dir, err := list.GetDir()
if err != nil {
log.Fatal(err)
}
if dir == nil {
break
}
syncFprintf(w, "%12d %13s %9d %s\n", dir.Bytes, dir.When.Format("2006-01-02 15:04:05"), dir.Count, dir.Name)
}
return nil
@ -960,7 +976,8 @@ func Purge(f Fs) error {
}
if doFallbackPurge {
// DeleteFiles and Rmdir observe --dry-run
DeleteFiles(f.List())
list := NewLister().Start(f)
DeleteFiles(listToChan(list))
err = Rmdir(f)
}
if err != nil {
@ -1115,7 +1132,16 @@ func (mode DeduplicateMode) String() string {
func Deduplicate(f Fs, mode DeduplicateMode) error {
Log(f, "Looking for duplicates using %v mode.", mode)
files := map[string][]Object{}
for o := range f.List() {
list := NewLister().Start(f)
for {
o, err := list.GetObject()
if err != nil {
return err
}
// Check if we are finished
if o == nil {
break
}
remote := o.Remote()
files[remote] = append(files[remote], o)
}
@ -1149,3 +1175,34 @@ func Deduplicate(f Fs, mode DeduplicateMode) error {
}
return nil
}
// listToChan will transfer all incoming objects to a new channel.
//
// If an error occurs, the error will be logged, and it will close the
// channel.
//
// If the error was ErrorDirNotFound then it will be ignored
func listToChan(list *Lister) ObjectsChan {
o := make(ObjectsChan, Config.Checkers)
go func() {
defer close(o)
for {
obj, dir, err := list.Get()
if err != nil {
if err != ErrorDirNotFound {
Stats.Error()
ErrorLog(nil, "Failed to list: %v", err)
}
return
}
if dir == nil && obj == nil {
return
}
if o == nil {
continue
}
o <- obj
}
}()
return o
}