drive: fix missing items when listing using --fast-list / ListR
This is caused by a bug in Google drive where, in some circumstances querying for "(A in parents) or (B in parents)" returns nothing whereas querying for "A in parents" and "B in parents" separately works fine. This has been reported here: https://issuetracker.google.com/issues/149522397 This workaround detects this condition by seeing if a listing for more than one directory at once returns nothing. If it does then it retries each one individually. This can potentially have a false positive if the user has multiple empty directories which are queried at once. The consequence of this will be that ListR is disabled for a while until the directories are found to be actually empty in which case it will be re-enabled. Fixes #3114 and Fixes #4289
This commit is contained in:
parent
e7bd392a69
commit
cbf3d43561
1 changed files with 74 additions and 16 deletions
|
@ -24,6 +24,7 @@ import (
|
|||
"strconv"
|
||||
"strings"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"text/template"
|
||||
"time"
|
||||
|
||||
|
@ -68,6 +69,8 @@ const (
|
|||
minChunkSize = 256 * fs.KibiByte
|
||||
defaultChunkSize = 8 * fs.MebiByte
|
||||
partialFields = "id,name,size,md5Checksum,trashed,modifiedTime,createdTime,mimeType,parents,webViewLink,shortcutDetails"
|
||||
listRGrouping = 50 // number of IDs to search at once when using ListR
|
||||
listRInputBuffer = 1000 // size of input buffer when using ListR
|
||||
)
|
||||
|
||||
// Globals
|
||||
|
@ -558,6 +561,9 @@ type Fs struct {
|
|||
isTeamDrive bool // true if this is a team drive
|
||||
fileFields googleapi.Field // fields to fetch file info with
|
||||
m configmap.Mapper
|
||||
grouping int32 // number of IDs to search at once in ListR - read with atomic
|
||||
listRmu *sync.Mutex // protects listRempties
|
||||
listRempties map[string]struct{} // IDs of supposedly empty directories which triggered grouping disable
|
||||
}
|
||||
|
||||
type baseObject struct {
|
||||
|
@ -1084,6 +1090,9 @@ func NewFs(name, path string, m configmap.Mapper) (fs.Fs, error) {
|
|||
opt: *opt,
|
||||
pacer: newPacer(opt),
|
||||
m: m,
|
||||
grouping: listRGrouping,
|
||||
listRmu: new(sync.Mutex),
|
||||
listRempties: make(map[string]struct{}),
|
||||
}
|
||||
f.isTeamDrive = opt.TeamDriveID != ""
|
||||
f.fileFields = f.getFileFields()
|
||||
|
@ -1634,15 +1643,17 @@ func (s listRSlices) Less(i, j int) bool {
|
|||
// In each cycle it will read up to grouping entries from the in channel without blocking.
|
||||
// If an error occurs it will be send to the out channel and then return. Once the in channel is closed,
|
||||
// nil is send to the out channel and the function returns.
|
||||
func (f *Fs) listRRunner(ctx context.Context, wg *sync.WaitGroup, in <-chan listREntry, out chan<- error, cb func(fs.DirEntry) error, grouping int) {
|
||||
func (f *Fs) listRRunner(ctx context.Context, wg *sync.WaitGroup, in chan listREntry, out chan<- error, cb func(fs.DirEntry) error) {
|
||||
var dirs []string
|
||||
var paths []string
|
||||
var grouping int32
|
||||
|
||||
for dir := range in {
|
||||
dirs = append(dirs[:0], dir.id)
|
||||
paths = append(paths[:0], dir.path)
|
||||
grouping = atomic.LoadInt32(&f.grouping)
|
||||
waitloop:
|
||||
for i := 1; i < grouping; i++ {
|
||||
for i := int32(1); i < grouping; i++ {
|
||||
select {
|
||||
case d, ok := <-in:
|
||||
if !ok {
|
||||
|
@ -1655,6 +1666,7 @@ func (f *Fs) listRRunner(ctx context.Context, wg *sync.WaitGroup, in <-chan list
|
|||
}
|
||||
listRSlices{dirs, paths}.Sort()
|
||||
var iErr error
|
||||
foundItems := false
|
||||
_, err := f.list(ctx, dirs, "", false, false, false, func(item *drive.File) bool {
|
||||
// shared with me items have no parents when at the root
|
||||
if f.opt.SharedWithMe && len(item.Parents) == 0 && len(paths) == 1 && paths[0] == "" {
|
||||
|
@ -1662,6 +1674,7 @@ func (f *Fs) listRRunner(ctx context.Context, wg *sync.WaitGroup, in <-chan list
|
|||
}
|
||||
for _, parent := range item.Parents {
|
||||
var i int
|
||||
foundItems = true
|
||||
earlyExit := false
|
||||
// If only one item in paths then no need to search for the ID
|
||||
// assuming google drive is doing its job properly.
|
||||
|
@ -1702,6 +1715,53 @@ func (f *Fs) listRRunner(ctx context.Context, wg *sync.WaitGroup, in <-chan list
|
|||
}
|
||||
return false
|
||||
})
|
||||
// Found no items in more than one directory. Retry these as
|
||||
// individual directories This is to work around a bug in google
|
||||
// drive where (A in parents) or (B in parents) returns nothing
|
||||
// sometimes. See #3114, #4289 and
|
||||
// https://issuetracker.google.com/issues/149522397
|
||||
if len(dirs) > 1 && !foundItems {
|
||||
if atomic.SwapInt32(&f.grouping, 1) != 1 {
|
||||
fs.Logf(f, "Disabling ListR to work around bug in drive as multi listing (%d) returned no entries", len(dirs))
|
||||
}
|
||||
var recycled = make([]listREntry, len(dirs))
|
||||
f.listRmu.Lock()
|
||||
for i := range dirs {
|
||||
recycled[i] = listREntry{id: dirs[i], path: paths[i]}
|
||||
// Make a note of these dirs - if they all turn
|
||||
// out to be empty then we can re-enable grouping
|
||||
f.listRempties[dirs[i]] = struct{}{}
|
||||
}
|
||||
f.listRmu.Unlock()
|
||||
// recycle these in the background so we don't deadlock
|
||||
// the listR runners if they all get here
|
||||
wg.Add(len(recycled))
|
||||
go func() {
|
||||
for _, entry := range recycled {
|
||||
in <- entry
|
||||
}
|
||||
fs.Debugf(f, "Recycled %d entries", len(recycled))
|
||||
}()
|
||||
}
|
||||
// If using a grouping of 1 and dir was empty then check to see if it
|
||||
// is part of the group that caused grouping to be disabled.
|
||||
if grouping == 1 && len(dirs) == 1 && !foundItems {
|
||||
f.listRmu.Lock()
|
||||
if _, found := f.listRempties[dirs[0]]; found {
|
||||
// Remove the ID
|
||||
delete(f.listRempties, dirs[0])
|
||||
// If no empties left => all the directories that
|
||||
// triggered the grouping being set to 1 were actually
|
||||
// empty so must have made a mistake
|
||||
if len(f.listRempties) == 0 {
|
||||
if atomic.SwapInt32(&f.grouping, listRGrouping) != listRGrouping {
|
||||
fs.Logf(f, "Re-enabling ListR as previous detection was in error")
|
||||
}
|
||||
}
|
||||
}
|
||||
f.listRmu.Unlock()
|
||||
}
|
||||
|
||||
for range dirs {
|
||||
wg.Done()
|
||||
}
|
||||
|
@ -1736,11 +1796,6 @@ func (f *Fs) listRRunner(ctx context.Context, wg *sync.WaitGroup, in <-chan list
|
|||
// Don't implement this unless you have a more efficient way
|
||||
// of listing recursively that doing a directory traversal.
|
||||
func (f *Fs) ListR(ctx context.Context, dir string, callback fs.ListRCallback) (err error) {
|
||||
const (
|
||||
grouping = 50
|
||||
inputBuffer = 1000
|
||||
)
|
||||
|
||||
err = f.dirCache.FindRoot(ctx, false)
|
||||
if err != nil {
|
||||
return err
|
||||
|
@ -1753,7 +1808,7 @@ func (f *Fs) ListR(ctx context.Context, dir string, callback fs.ListRCallback) (
|
|||
|
||||
mu := sync.Mutex{} // protects in and overflow
|
||||
wg := sync.WaitGroup{}
|
||||
in := make(chan listREntry, inputBuffer)
|
||||
in := make(chan listREntry, listRInputBuffer)
|
||||
out := make(chan error, fs.Config.Checkers)
|
||||
list := walk.NewListRHelper(callback)
|
||||
overflow := []listREntry{}
|
||||
|
@ -1766,6 +1821,9 @@ func (f *Fs) ListR(ctx context.Context, dir string, callback fs.ListRCallback) (
|
|||
job := listREntry{actualID(d.ID()), d.Remote()}
|
||||
select {
|
||||
case in <- job:
|
||||
// Adding the wg after we've entered the item is
|
||||
// safe here because we know when the callback
|
||||
// is called we are holding a waitgroup.
|
||||
wg.Add(1)
|
||||
default:
|
||||
overflow = append(overflow, job)
|
||||
|
@ -1779,7 +1837,7 @@ func (f *Fs) ListR(ctx context.Context, dir string, callback fs.ListRCallback) (
|
|||
in <- listREntry{directoryID, dir}
|
||||
|
||||
for i := 0; i < fs.Config.Checkers; i++ {
|
||||
go f.listRRunner(ctx, &wg, in, out, cb, grouping)
|
||||
go f.listRRunner(ctx, &wg, in, out, cb)
|
||||
}
|
||||
go func() {
|
||||
// wait until the all directories are processed
|
||||
|
@ -1789,8 +1847,8 @@ func (f *Fs) ListR(ctx context.Context, dir string, callback fs.ListRCallback) (
|
|||
mu.Lock()
|
||||
l := len(overflow)
|
||||
// only fill half of the channel to prevent entries being put into overflow again
|
||||
if l > inputBuffer/2 {
|
||||
l = inputBuffer / 2
|
||||
if l > listRInputBuffer/2 {
|
||||
l = listRInputBuffer / 2
|
||||
}
|
||||
wg.Add(l)
|
||||
for _, d := range overflow[:l] {
|
||||
|
|
Loading…
Reference in a new issue