drive: fix "panic: send on closed channel" when recycling dir entries

In this commit:

cbf3d43561 drive: fix missing items when listing using --fast-list / ListR

We introduced a bug where under specific circumstances it could cause
a "panic: send on closed channel".

This was caused by:

- rclone engaging the workaround from the commit above
- one of the listing routines returning an error
- this caused the `in` channel to be closed to stop the readers
- however the workaround was recycling stuff into the `in` channel at the time
- hence the panic on closed channel

This fix factors out the sending to the `in` channel into `sendJob`
and calls this both from the master go routine and the list
runners. `sendJob` detects the `in` channel being closed properly and
also deals correctly with contention on the `in` channel.

Fixes #4511
This commit is contained in:
Nick Craig-Wood 2020-08-20 11:47:27 +01:00
parent 7d62d1fc97
commit 068cfdaa00

View file

@ -1643,7 +1643,7 @@ func (s listRSlices) Less(i, j int) bool {
// In each cycle it will read up to grouping entries from the in channel without blocking. // In each cycle it will read up to grouping entries from the in channel without blocking.
// If an error occurs it will be send to the out channel and then return. Once the in channel is closed, // If an error occurs it will be send to the out channel and then return. Once the in channel is closed,
// nil is send to the out channel and the function returns. // nil is send to the out channel and the function returns.
func (f *Fs) listRRunner(ctx context.Context, wg *sync.WaitGroup, in chan listREntry, out chan<- error, cb func(fs.DirEntry) error) { func (f *Fs) listRRunner(ctx context.Context, wg *sync.WaitGroup, in chan listREntry, out chan<- error, cb func(fs.DirEntry) error, sendJob func(listREntry)) {
var dirs []string var dirs []string
var paths []string var paths []string
var grouping int32 var grouping int32
@ -1724,24 +1724,17 @@ func (f *Fs) listRRunner(ctx context.Context, wg *sync.WaitGroup, in chan listRE
if atomic.SwapInt32(&f.grouping, 1) != 1 { if atomic.SwapInt32(&f.grouping, 1) != 1 {
fs.Debugf(f, "Disabling ListR to work around bug in drive as multi listing (%d) returned no entries", len(dirs)) fs.Debugf(f, "Disabling ListR to work around bug in drive as multi listing (%d) returned no entries", len(dirs))
} }
var recycled = make([]listREntry, len(dirs))
f.listRmu.Lock() f.listRmu.Lock()
for i := range dirs { for i := range dirs {
recycled[i] = listREntry{id: dirs[i], path: paths[i]} // Requeue the jobs
job := listREntry{id: dirs[i], path: paths[i]}
sendJob(job)
// Make a note of these dirs - if they all turn // Make a note of these dirs - if they all turn
// out to be empty then we can re-enable grouping // out to be empty then we can re-enable grouping
f.listRempties[dirs[i]] = struct{}{} f.listRempties[dirs[i]] = struct{}{}
} }
f.listRmu.Unlock() f.listRmu.Unlock()
// recycle these in the background so we don't deadlock fs.Debugf(f, "Recycled %d entries", len(dirs))
// the listR runners if they all get here
wg.Add(len(recycled))
go func() {
for _, entry := range recycled {
in <- entry
}
fs.Debugf(f, "Recycled %d entries", len(recycled))
}()
} }
// If using a grouping of 1 and dir was empty then check to see if it // If using a grouping of 1 and dir was empty then check to see if it
// is part of the group that caused grouping to be disabled. // is part of the group that caused grouping to be disabled.
@ -1810,21 +1803,33 @@ func (f *Fs) ListR(ctx context.Context, dir string, callback fs.ListRCallback) (
overflow := []listREntry{} overflow := []listREntry{}
listed := 0 listed := 0
cb := func(entry fs.DirEntry) error { // Send a job to the input channel if not closed. If the job
// won't fit then queue it in the overflow slice.
//
// This will not block if the channel is full.
sendJob := func(job listREntry) {
mu.Lock() mu.Lock()
defer mu.Unlock() defer mu.Unlock()
if d, isDir := entry.(*fs.Dir); isDir && in != nil { if in == nil {
job := listREntry{actualID(d.ID()), d.Remote()} return
select {
case in <- job:
// Adding the wg after we've entered the item is
// safe here because we know when the callback
// is called we are holding a waitgroup.
wg.Add(1)
default:
overflow = append(overflow, job)
}
} }
wg.Add(1)
select {
case in <- job:
default:
overflow = append(overflow, job)
wg.Add(-1)
}
}
// Send the entry to the caller, queueing any directories as new jobs
cb := func(entry fs.DirEntry) error {
if d, isDir := entry.(*fs.Dir); isDir {
job := listREntry{actualID(d.ID()), d.Remote()}
sendJob(job)
}
mu.Lock()
defer mu.Unlock()
listed++ listed++
return list.Add(entry) return list.Add(entry)
} }
@ -1833,7 +1838,7 @@ func (f *Fs) ListR(ctx context.Context, dir string, callback fs.ListRCallback) (
in <- listREntry{directoryID, dir} in <- listREntry{directoryID, dir}
for i := 0; i < fs.Config.Checkers; i++ { for i := 0; i < fs.Config.Checkers; i++ {
go f.listRRunner(ctx, &wg, in, out, cb) go f.listRRunner(ctx, &wg, in, out, cb, sendJob)
} }
go func() { go func() {
// wait until the all directories are processed // wait until the all directories are processed