march: return errors when listing dirs

Partially fixes #3172
This commit is contained in:
Aleksandar Jankovic 2019-06-20 13:50:25 +02:00 committed by Nick Craig-Wood
parent 1be1fc073e
commit cc0800a72e
5 changed files with 60 additions and 12 deletions

View file

@ -8,6 +8,8 @@ import (
"strings" "strings"
"sync" "sync"
"github.com/pkg/errors"
"github.com/ncw/rclone/fs" "github.com/ncw/rclone/fs"
"github.com/ncw/rclone/fs/filter" "github.com/ncw/rclone/fs/filter"
"github.com/ncw/rclone/fs/list" "github.com/ncw/rclone/fs/list"
@ -110,7 +112,7 @@ type listDirJob struct {
} }
// Run starts the matching process off // Run starts the matching process off
func (m *March) Run() { func (m *March) Run() error {
m.init() m.init()
srcDepth := fs.Config.MaxDepth srcDepth := fs.Config.MaxDepth
@ -122,6 +124,10 @@ func (m *March) Run() {
dstDepth = fs.MaxLevel dstDepth = fs.MaxLevel
} }
var mu sync.Mutex // Protects vars below
var jobError error
var errCount int
// Start some directory listing go routines // Start some directory listing go routines
var wg sync.WaitGroup // sync closing of go routines var wg sync.WaitGroup // sync closing of go routines
var traversing sync.WaitGroup // running directory traversals var traversing sync.WaitGroup // running directory traversals
@ -138,7 +144,16 @@ func (m *March) Run() {
if !ok { if !ok {
return return
} }
jobs := m.processJob(job) jobs, err := m.processJob(job)
if err != nil {
mu.Lock()
// Keep reference only to the first encountered error
if jobError == nil {
jobError = err
}
errCount++
mu.Unlock()
}
if len(jobs) > 0 { if len(jobs) > 0 {
traversing.Add(len(jobs)) traversing.Add(len(jobs))
go func() { go func() {
@ -178,6 +193,11 @@ func (m *March) Run() {
traversing.Wait() traversing.Wait()
close(in) close(in)
wg.Wait() wg.Wait()
if errCount > 1 {
return errors.Wrapf(jobError, "march failed with %d error(s): first error", errCount)
}
return jobError
} }
// Check to see if the context has been cancelled // Check to see if the context has been cancelled
@ -339,8 +359,9 @@ func matchListings(srcListEntries, dstListEntries fs.DirEntries, transforms []ma
// more jobs // more jobs
// //
// returns errors using processError // returns errors using processError
func (m *March) processJob(job listDirJob) (jobs []listDirJob) { func (m *March) processJob(job listDirJob) ([]listDirJob, error) {
var ( var (
jobs []listDirJob
srcList, dstList fs.DirEntries srcList, dstList fs.DirEntries
srcListErr, dstListErr error srcListErr, dstListErr error
wg sync.WaitGroup wg sync.WaitGroup
@ -367,14 +388,14 @@ func (m *March) processJob(job listDirJob) (jobs []listDirJob) {
if srcListErr != nil { if srcListErr != nil {
fs.Errorf(job.srcRemote, "error reading source directory: %v", srcListErr) fs.Errorf(job.srcRemote, "error reading source directory: %v", srcListErr)
fs.CountError(srcListErr) fs.CountError(srcListErr)
return nil return nil, srcListErr
} }
if dstListErr == fs.ErrorDirNotFound { if dstListErr == fs.ErrorDirNotFound {
// Copy the stuff anyway // Copy the stuff anyway
} else if dstListErr != nil { } else if dstListErr != nil {
fs.Errorf(job.dstRemote, "error reading destination directory: %v", dstListErr) fs.Errorf(job.dstRemote, "error reading destination directory: %v", dstListErr)
fs.CountError(dstListErr) fs.CountError(dstListErr)
return nil return nil, dstListErr
} }
// If NoTraverse is set, then try to find a matching object // If NoTraverse is set, then try to find a matching object
@ -395,7 +416,7 @@ func (m *March) processJob(job listDirJob) (jobs []listDirJob) {
srcOnly, dstOnly, matches := matchListings(srcList, dstList, m.transforms) srcOnly, dstOnly, matches := matchListings(srcList, dstList, m.transforms)
for _, src := range srcOnly { for _, src := range srcOnly {
if m.aborting() { if m.aborting() {
return nil return nil, m.Ctx.Err()
} }
recurse := m.Callback.SrcOnly(src) recurse := m.Callback.SrcOnly(src)
if recurse && job.srcDepth > 0 { if recurse && job.srcDepth > 0 {
@ -409,7 +430,7 @@ func (m *March) processJob(job listDirJob) (jobs []listDirJob) {
} }
for _, dst := range dstOnly { for _, dst := range dstOnly {
if m.aborting() { if m.aborting() {
return nil return nil, m.Ctx.Err()
} }
recurse := m.Callback.DstOnly(dst) recurse := m.Callback.DstOnly(dst)
if recurse && job.dstDepth > 0 { if recurse && job.dstDepth > 0 {
@ -422,7 +443,7 @@ func (m *March) processJob(job listDirJob) (jobs []listDirJob) {
} }
for _, match := range matches { for _, match := range matches {
if m.aborting() { if m.aborting() {
return nil return nil, m.Ctx.Err()
} }
recurse := m.Callback.Match(m.Ctx, match.dst, match.src) recurse := m.Callback.Match(m.Ctx, match.dst, match.src)
if recurse && job.srcDepth > 0 && job.dstDepth > 0 { if recurse && job.srcDepth > 0 && job.dstDepth > 0 {
@ -434,5 +455,5 @@ func (m *March) processJob(job listDirJob) (jobs []listDirJob) {
}) })
} }
} }
return jobs return jobs, nil
} }

View file

@ -788,7 +788,7 @@ func CheckFn(ctx context.Context, fdst, fsrc fs.Fs, check checkFn, oneway bool)
Callback: c, Callback: c,
} }
fs.Infof(fdst, "Waiting for checks to finish") fs.Infof(fdst, "Waiting for checks to finish")
m.Run() err := m.Run()
if c.dstFilesMissing > 0 { if c.dstFilesMissing > 0 {
fs.Logf(fdst, "%d files missing", c.dstFilesMissing) fs.Logf(fdst, "%d files missing", c.dstFilesMissing)
@ -807,7 +807,7 @@ func CheckFn(ctx context.Context, fdst, fsrc fs.Fs, check checkFn, oneway bool)
if c.differences > 0 { if c.differences > 0 {
return errors.Errorf("%d differences found", c.differences) return errors.Errorf("%d differences found", c.differences)
} }
return nil return err
} }
// Check the files in fsrc and fdst according to Size and hash // Check the files in fsrc and fdst according to Size and hash

View file

@ -378,6 +378,19 @@ func TestCheck(t *testing.T) {
testCheck(t, operations.Check) testCheck(t, operations.Check)
} }
func TestCheckFsError(t *testing.T) {
dstFs, err := fs.NewFs("non-existent")
if err != nil {
t.Fatal(err)
}
srcFs, err := fs.NewFs("non-existent")
if err != nil {
t.Fatal(err)
}
err = operations.Check(context.Background(), dstFs, srcFs, false)
require.Error(t, err)
}
func TestCheckDownload(t *testing.T) { func TestCheckDownload(t *testing.T) {
testCheck(t, operations.CheckDownload) testCheck(t, operations.CheckDownload)
} }

View file

@ -649,7 +649,7 @@ func (s *syncCopyMove) run() error {
Callback: s, Callback: s,
DstIncludeAll: filter.Active.Opt.DeleteExcluded, DstIncludeAll: filter.Active.Opt.DeleteExcluded,
} }
m.Run() s.processError(m.Run())
s.stopTrackRenames() s.stopTrackRenames()
if s.trackRenames { if s.trackRenames {

View file

@ -64,6 +64,20 @@ func TestCopy(t *testing.T) {
fstest.CheckItems(t, r.Fremote, file1) fstest.CheckItems(t, r.Fremote, file1)
} }
func TestCopyMissingDirectory(t *testing.T) {
r := fstest.NewRun(t)
defer r.Finalise()
r.Mkdir(context.Background(), r.Fremote)
nonExistingFs, err := fs.NewFs("/non-existing")
if err != nil {
t.Fatal(err)
}
err = CopyDir(context.Background(), r.Fremote, nonExistingFs, false)
require.Error(t, err)
}
// Now with --no-traverse // Now with --no-traverse
func TestCopyNoTraverse(t *testing.T) { func TestCopyNoTraverse(t *testing.T) {
r := fstest.NewRun(t) r := fstest.NewRun(t)