forked from TrueCloudLab/rclone
parent
f8c2689e77
commit
f3874707ee
1 changed files with 73 additions and 28 deletions
|
@ -21,6 +21,7 @@ import (
|
|||
"net/url"
|
||||
"os"
|
||||
"path"
|
||||
"sort"
|
||||
"strconv"
|
||||
"strings"
|
||||
"sync"
|
||||
|
@ -1339,17 +1340,46 @@ func (f *Fs) List(dir string) (entries fs.DirEntries, err error) {
|
|||
return entries, nil
|
||||
}
|
||||
|
||||
// listREntry is a task to be executed by a litRRunner
|
||||
type listREntry struct {
|
||||
id, path string
|
||||
}
|
||||
|
||||
// listRSlices is a helper struct to sort two slices at once
|
||||
type listRSlices struct {
|
||||
dirs []string
|
||||
paths []string
|
||||
}
|
||||
|
||||
func (s listRSlices) Sort() {
|
||||
sort.Sort(s)
|
||||
}
|
||||
|
||||
func (s listRSlices) Len() int {
|
||||
return len(s.dirs)
|
||||
}
|
||||
|
||||
func (s listRSlices) Swap(i, j int) {
|
||||
s.dirs[i], s.dirs[j] = s.dirs[j], s.dirs[i]
|
||||
s.paths[i], s.paths[j] = s.paths[j], s.paths[i]
|
||||
}
|
||||
|
||||
func (s listRSlices) Less(i, j int) bool {
|
||||
return s.dirs[i] < s.dirs[j]
|
||||
}
|
||||
|
||||
// listRRunner will read dirIDs from the in channel, perform the file listing an call cb with each DirEntry.
|
||||
//
|
||||
// In each cycle, will wait up to 10ms to read up to grouping entries from the in channel.
|
||||
// In each cycle it will read up to grouping entries from the in channel without blocking.
|
||||
// If an error occurs it will be send to the out channel and then return. Once the in channel is closed,
|
||||
// nil is send to the out channel and the function returns.
|
||||
func (f *Fs) listRRunner(wg *sync.WaitGroup, in <-chan string, out chan<- error, cb func(fs.DirEntry) error, grouping int) {
|
||||
func (f *Fs) listRRunner(wg *sync.WaitGroup, in <-chan listREntry, out chan<- error, cb func(fs.DirEntry) error, grouping int) {
|
||||
var dirs []string
|
||||
var paths []string
|
||||
|
||||
for dir := range in {
|
||||
dirs = append(dirs[:0], dir)
|
||||
wait := time.After(10 * time.Millisecond)
|
||||
dirs = append(dirs[:0], dir.id)
|
||||
paths = append(paths[:0], dir.path)
|
||||
waitloop:
|
||||
for i := 1; i < grouping; i++ {
|
||||
select {
|
||||
|
@ -1357,31 +1387,32 @@ func (f *Fs) listRRunner(wg *sync.WaitGroup, in <-chan string, out chan<- error,
|
|||
if !ok {
|
||||
break waitloop
|
||||
}
|
||||
dirs = append(dirs, d)
|
||||
case <-wait:
|
||||
break waitloop
|
||||
dirs = append(dirs, d.id)
|
||||
paths = append(paths, d.path)
|
||||
default:
|
||||
}
|
||||
}
|
||||
listRSlices{dirs, paths}.Sort()
|
||||
var iErr error
|
||||
_, err := f.list(dirs, "", false, false, false, func(item *drive.File) bool {
|
||||
parentPath := ""
|
||||
if len(item.Parents) > 0 {
|
||||
p, ok := f.dirCache.GetInv(item.Parents[0])
|
||||
if ok {
|
||||
parentPath = p
|
||||
for _, parent := range item.Parents {
|
||||
// only handle parents that are in the requested dirs list
|
||||
i := sort.SearchStrings(dirs, parent)
|
||||
if i == len(dirs) || dirs[i] != parent {
|
||||
continue
|
||||
}
|
||||
remote := path.Join(paths[i], item.Name)
|
||||
entry, err := f.itemToDirEntry(remote, item)
|
||||
if err != nil {
|
||||
iErr = err
|
||||
return true
|
||||
}
|
||||
}
|
||||
remote := path.Join(parentPath, item.Name)
|
||||
entry, err := f.itemToDirEntry(remote, item)
|
||||
if err != nil {
|
||||
iErr = err
|
||||
return true
|
||||
}
|
||||
|
||||
err = cb(entry)
|
||||
if err != nil {
|
||||
iErr = err
|
||||
return true
|
||||
err = cb(entry)
|
||||
if err != nil {
|
||||
iErr = err
|
||||
return true
|
||||
}
|
||||
}
|
||||
return false
|
||||
})
|
||||
|
@ -1432,30 +1463,44 @@ func (f *Fs) ListR(dir string, callback fs.ListRCallback) (err error) {
|
|||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if directoryID == "root" {
|
||||
var info *drive.File
|
||||
err = f.pacer.CallNoRetry(func() (bool, error) {
|
||||
info, err = f.svc.Files.Get("root").
|
||||
Fields("id").
|
||||
SupportsTeamDrives(f.isTeamDrive).
|
||||
Do()
|
||||
return shouldRetry(err)
|
||||
})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
directoryID = info.Id
|
||||
}
|
||||
|
||||
mu := sync.Mutex{} // protects in and overflow
|
||||
wg := sync.WaitGroup{}
|
||||
in := make(chan string, inputBuffer)
|
||||
in := make(chan listREntry, inputBuffer)
|
||||
out := make(chan error, fs.Config.Checkers)
|
||||
list := walk.NewListRHelper(callback)
|
||||
overfflow := []string{}
|
||||
overfflow := []listREntry{}
|
||||
|
||||
cb := func(entry fs.DirEntry) error {
|
||||
mu.Lock()
|
||||
defer mu.Unlock()
|
||||
if d, isDir := entry.(*fs.Dir); isDir && in != nil {
|
||||
select {
|
||||
case in <- d.ID():
|
||||
case in <- listREntry{d.ID(), d.Remote()}:
|
||||
wg.Add(1)
|
||||
default:
|
||||
overfflow = append(overfflow, d.ID())
|
||||
overfflow = append(overfflow, listREntry{d.ID(), d.Remote()})
|
||||
}
|
||||
}
|
||||
return list.Add(entry)
|
||||
}
|
||||
|
||||
wg.Add(1)
|
||||
in <- directoryID
|
||||
in <- listREntry{directoryID, dir}
|
||||
|
||||
for i := 0; i < fs.Config.Checkers; i++ {
|
||||
go f.listRRunner(&wg, in, out, cb, grouping)
|
||||
|
|
Loading…
Reference in a new issue