From 1569176e481ba13d1ae4bc164fb7a79feff38e1c Mon Sep 17 00:00:00 2001 From: Alexander Neumann Date: Fri, 6 Nov 2015 19:41:57 +0100 Subject: [PATCH] pipe: propagate errors properly --- archiver.go | 35 ++++++++-------- node.go | 3 +- pipe/pipe.go | 60 ++++++++++++--------------- pipe/pipe_test.go | 102 ++++++++++++++++++++++++++++++++++++++++++---- 4 files changed, 140 insertions(+), 60 deletions(-) diff --git a/archiver.go b/archiver.go index a26dc3e7f..61d2ea8f2 100644 --- a/archiver.go +++ b/archiver.go @@ -304,6 +304,7 @@ func (arch *Archiver) fileWorker(wg *sync.WaitGroup, p *Progress, done <-chan st } func (arch *Archiver) dirWorker(wg *sync.WaitGroup, p *Progress, done <-chan struct{}, dirCh <-chan pipe.Dir) { + debug.Log("Archiver.dirWorker", "start") defer func() { debug.Log("Archiver.dirWorker", "done") wg.Done() @@ -315,12 +316,13 @@ func (arch *Archiver) dirWorker(wg *sync.WaitGroup, p *Progress, done <-chan str // channel is closed return } - debug.Log("Archiver.dirWorker", "save dir %v\n", dir.Path()) + debug.Log("Archiver.dirWorker", "save dir %v (%d entries), error %v\n", dir.Path(), len(dir.Entries), dir.Error()) tree := NewTree() // wait for all content for _, ch := range dir.Entries { + debug.Log("Archiver.dirWorker", "receiving result from %v", ch) res := <-ch // if we get a nil pointer here, an error has happened while @@ -342,21 +344,20 @@ func (arch *Archiver) dirWorker(wg *sync.WaitGroup, p *Progress, done <-chan str } } - var ( - node *Node - err error - ) - if dir.Path() == "" { - // if this is the top-level dir, only create a stub node - node = &Node{} - } else { - // else create node from path and fi - node, err = NodeFromFileInfo(dir.Path(), dir.Info()) + node := &Node{} + + if dir.Path() != "" && dir.Info() != nil { + n, err := NodeFromFileInfo(dir.Path(), dir.Info()) if err != nil { - node.Error = err.Error() - dir.Result() <- node + n.Error = err.Error() + dir.Result() <- n continue } + node = n + } + + if err := dir.Error(); err != nil { + node.Error = err.Error() } id, err := arch.SaveTreeJSON(tree) @@ -370,6 +371,8 @@ func (arch *Archiver) dirWorker(wg *sync.WaitGroup, p *Progress, done <-chan str node.Subtree = &id + debug.Log("Archiver.dirWorker", "sending result to %v", dir.Result()) + dir.Result() <- node if dir.Path() != "" { p.Report(Stat{Dirs: 1}) @@ -615,11 +618,7 @@ func (arch *Archiver) Snapshot(p *Progress, paths []string, parentID *backend.ID pipeCh := make(chan pipe.Job) resCh := make(chan pipe.Result, 1) go func() { - err := pipe.Walk(paths, arch.SelectFilter, done, pipeCh, resCh) - if err != nil { - debug.Log("Archiver.Snapshot", "pipe.Walk returned error %v", err) - return - } + pipe.Walk(paths, arch.SelectFilter, done, pipeCh, resCh) debug.Log("Archiver.Snapshot", "pipe.Walk done") }() jobs.New = pipeCh diff --git a/node.go b/node.go index eb1ca5f02..4690f1333 100644 --- a/node.go +++ b/node.go @@ -10,12 +10,13 @@ import ( "syscall" "time" + "runtime" + "github.com/juju/errors" "github.com/restic/restic/backend" "github.com/restic/restic/debug" "github.com/restic/restic/pack" "github.com/restic/restic/repository" - "runtime" ) // Node is a file, directory or other item in a backup. diff --git a/pipe/pipe.go b/pipe/pipe.go index 83b3353f6..d911f6087 100644 --- a/pipe/pipe.go +++ b/pipe/pipe.go @@ -82,32 +82,45 @@ var errCancelled = errors.New("walk cancelled") // dirs). If false is returned, files are ignored and dirs are not even walked. type SelectFunc func(item string, fi os.FileInfo) bool -func walk(basedir, dir string, selectFunc SelectFunc, done chan struct{}, jobs chan<- Job, res chan<- Result) error { +func walk(basedir, dir string, selectFunc SelectFunc, done <-chan struct{}, jobs chan<- Job, res chan<- Result) { + debug.Log("pipe.walk", "start on %v", dir) + + relpath, err := filepath.Rel(basedir, dir) + if err != nil { + panic(err) + } + info, err := os.Lstat(dir) if err != nil { debug.Log("pipe.walk", "error for %v: %v", dir, err) - return err + select { + case jobs <- Dir{basedir: basedir, path: relpath, info: info, error: err, result: res}: + case <-done: + } + return } if !selectFunc(dir, info) { debug.Log("pipe.walk", "file %v excluded by filter", dir) - return nil + return } - relpath, _ := filepath.Rel(basedir, dir) - if !info.IsDir() { select { case jobs <- Entry{info: info, basedir: basedir, path: relpath, result: res}: case <-done: - return errCancelled } - return nil + return } names, err := readDirNames(dir) if err != nil { - return err + debug.Log("pipe.walk", "Readdirnames(%v) returned error: %v", dir, err) + select { + case <-done: + case jobs <- Dir{basedir: basedir, path: relpath, info: info, error: err, result: res}: + } + return } // Insert breakpoint to allow testing behaviour with vanishing files @@ -132,7 +145,7 @@ func walk(basedir, dir string, selectFunc SelectFunc, done chan struct{}, jobs c select { case jobs <- Entry{info: fi, error: statErr, basedir: basedir, path: filepath.Join(relpath, name), result: ch}: case <-done: - return errCancelled + return } continue } @@ -141,32 +154,19 @@ func walk(basedir, dir string, selectFunc SelectFunc, done chan struct{}, jobs c // between walk and open debug.RunHook("pipe.walk2", filepath.Join(relpath, name)) - if isDir(fi) { - err = walk(basedir, subpath, selectFunc, done, jobs, ch) - if err != nil { - return err - } - - } else { - select { - case jobs <- Entry{info: fi, basedir: basedir, path: filepath.Join(relpath, name), result: ch}: - case <-done: - return errCancelled - } - } + walk(basedir, subpath, selectFunc, done, jobs, ch) } select { case jobs <- Dir{basedir: basedir, path: relpath, info: info, Entries: entries, result: res}: case <-done: - return errCancelled } - return nil } // Walk sends a Job for each file and directory it finds below the paths. When // the channel done is closed, processing stops. -func Walk(paths []string, selectFunc SelectFunc, done chan struct{}, jobs chan<- Job, res chan<- Result) error { +func Walk(paths []string, selectFunc SelectFunc, done chan struct{}, jobs chan<- Job, res chan<- Result) { + debug.Log("pipe.Walk", "start on %v", paths) defer func() { debug.Log("pipe.Walk", "output channel closed") close(jobs) @@ -176,11 +176,7 @@ func Walk(paths []string, selectFunc SelectFunc, done chan struct{}, jobs chan<- for _, path := range paths { debug.Log("pipe.Walk", "start walker for %v", path) ch := make(chan Result, 1) - err := walk(filepath.Dir(path), path, selectFunc, done, jobs, ch) - if err != nil { - debug.Log("pipe.Walk", "error for %v: %v", path, err) - continue - } + walk(filepath.Dir(path), path, selectFunc, done, jobs, ch) entries = append(entries, ch) debug.Log("pipe.Walk", "walker for %v done", path) } @@ -188,13 +184,11 @@ func Walk(paths []string, selectFunc SelectFunc, done chan struct{}, jobs chan<- debug.Log("pipe.Walk", "sending root node") select { case <-done: - return errCancelled + return case jobs <- Dir{Entries: entries, result: res}: } debug.Log("pipe.Walk", "walker done") - - return nil } // Split feeds all elements read from inChan to dirChan and entChan. diff --git a/pipe/pipe_test.go b/pipe/pipe_test.go index 001015938..95bbf7db8 100644 --- a/pipe/pipe_test.go +++ b/pipe/pipe_test.go @@ -1,6 +1,8 @@ package pipe_test import ( + "fmt" + "io/ioutil" "os" "path/filepath" "sync" @@ -122,8 +124,7 @@ func TestPipelineWalkerWithSplit(t *testing.T) { }() resCh := make(chan pipe.Result, 1) - err = pipe.Walk([]string{TestWalkerPath}, acceptAll, done, jobs, resCh) - OK(t, err) + pipe.Walk([]string{TestWalkerPath}, acceptAll, done, jobs, resCh) // wait for all workers to terminate wg.Wait() @@ -202,8 +203,7 @@ func TestPipelineWalker(t *testing.T) { } resCh := make(chan pipe.Result, 1) - err = pipe.Walk([]string{TestWalkerPath}, acceptAll, done, jobs, resCh) - OK(t, err) + pipe.Walk([]string{TestWalkerPath}, acceptAll, done, jobs, resCh) // wait for all workers to terminate wg.Wait() @@ -217,6 +217,94 @@ func TestPipelineWalker(t *testing.T) { Assert(t, before == after, "stats do not match, expected %v, got %v", before, after) } +func createFile(filename, data string) error { + f, err := os.Create(filename) + if err != nil { + return err + } + + defer f.Close() + + _, err = f.Write([]byte(data)) + if err != nil { + return err + } + + return nil +} + +func TestPipeWalkerError(t *testing.T) { + dir, err := ioutil.TempDir("", "restic-test-") + OK(t, err) + + base := filepath.Base(dir) + + var testjobs = []struct { + path []string + err bool + }{ + {[]string{base, "a", "file_a"}, false}, + {[]string{base, "a"}, false}, + {[]string{base, "b"}, true}, + {[]string{base, "c", "file_c"}, false}, + {[]string{base, "c"}, false}, + {[]string{base}, false}, + {[]string{}, false}, + } + + OK(t, os.Mkdir(filepath.Join(dir, "a"), 0755)) + OK(t, os.Mkdir(filepath.Join(dir, "b"), 0755)) + OK(t, os.Mkdir(filepath.Join(dir, "c"), 0755)) + + OK(t, createFile(filepath.Join(dir, "a", "file_a"), "file a")) + OK(t, createFile(filepath.Join(dir, "b", "file_b"), "file b")) + OK(t, createFile(filepath.Join(dir, "c", "file_c"), "file c")) + + OK(t, os.Chmod(filepath.Join(dir, "b"), 0)) + + done := make(chan struct{}) + ch := make(chan pipe.Job) + resCh := make(chan pipe.Result, 1) + + go pipe.Walk([]string{dir}, acceptAll, done, ch, resCh) + + i := 0 + for job := range ch { + if i == len(testjobs) { + t.Errorf("too many jobs received") + break + } + + fmt.Printf("job %+v: %+v\n", job.Path(), job) + + p := filepath.Join(testjobs[i].path...) + if p != job.Path() { + t.Errorf("job %d has wrong path: expected %q, got %q", i, p, job.Path()) + } + + if testjobs[i].err { + if job.Error() == nil { + t.Errorf("job %d expected error but got nil", i) + } + } else { + if job.Error() != nil { + t.Errorf("job %d expected no error but got %v", i, job.Error()) + } + } + + i++ + } + + if i != len(testjobs) { + t.Errorf("expected %d jobs, got %d", len(testjobs), i) + } + + close(done) + + OK(t, os.Chmod(filepath.Join(dir, "b"), 0755)) + OK(t, os.RemoveAll(dir)) +} + func BenchmarkPipelineWalker(b *testing.B) { if TestWalkerPath == "" { b.Skipf("walkerpath not set, skipping BenchPipelineWalker") @@ -302,8 +390,7 @@ func BenchmarkPipelineWalker(b *testing.B) { }() resCh := make(chan pipe.Result, 1) - err := pipe.Walk([]string{TestWalkerPath}, acceptAll, done, jobs, resCh) - OK(b, err) + pipe.Walk([]string{TestWalkerPath}, acceptAll, done, jobs, resCh) // wait for all workers to terminate wg.Wait() @@ -379,8 +466,7 @@ func TestPipelineWalkerMultiple(t *testing.T) { } resCh := make(chan pipe.Result, 1) - err = pipe.Walk(paths, acceptAll, done, jobs, resCh) - OK(t, err) + pipe.Walk(paths, acceptAll, done, jobs, resCh) // wait for all workers to terminate wg.Wait()