forked from TrueCloudLab/rclone
drive: implement ListR
This commit is contained in:
parent
3c2ffa7f57
commit
dc5a734522
1 changed files with 259 additions and 49 deletions
|
@ -8,6 +8,7 @@ package drive
|
|||
// * files with / in name
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"fmt"
|
||||
"io"
|
||||
"io/ioutil"
|
||||
|
@ -29,6 +30,7 @@ import (
|
|||
"github.com/ncw/rclone/fs/fserrors"
|
||||
"github.com/ncw/rclone/fs/fshttp"
|
||||
"github.com/ncw/rclone/fs/hash"
|
||||
"github.com/ncw/rclone/fs/walk"
|
||||
"github.com/ncw/rclone/lib/dircache"
|
||||
"github.com/ncw/rclone/lib/oauthutil"
|
||||
"github.com/ncw/rclone/lib/pacer"
|
||||
|
@ -88,7 +90,7 @@ var (
|
|||
"text/tab-separated-values": "tsv",
|
||||
}
|
||||
extensionToMimeType map[string]string
|
||||
partialFields = "id,name,size,md5Checksum,trashed,modifiedTime,createdTime,mimeType"
|
||||
partialFields = "id,name,size,md5Checksum,trashed,modifiedTime,createdTime,mimeType,parents"
|
||||
exportFormatsOnce sync.Once // make sure we fetch the export formats only once
|
||||
_exportFormats map[string][]string // allowed export mime-type conversions
|
||||
)
|
||||
|
@ -373,7 +375,7 @@ func containsString(slice []string, s string) bool {
|
|||
// If the user fn ever returns true then it early exits with found = true
|
||||
//
|
||||
// Search params: https://developers.google.com/drive/search-parameters
|
||||
func (f *Fs) list(dirID string, title string, directoriesOnly bool, filesOnly bool, includeAll bool, fn listFn) (found bool, err error) {
|
||||
func (f *Fs) list(dirIDs []string, title string, directoriesOnly bool, filesOnly bool, includeAll bool, fn listFn) (found bool, err error) {
|
||||
var query []string
|
||||
if !includeAll {
|
||||
q := "trashed=" + strconv.FormatBool(f.opt.TrashedOnly)
|
||||
|
@ -385,11 +387,23 @@ func (f *Fs) list(dirID string, title string, directoriesOnly bool, filesOnly bo
|
|||
// Search with sharedWithMe will always return things listed in "Shared With Me" (without any parents)
|
||||
// We must not filter with parent when we try list "ROOT" with drive-shared-with-me
|
||||
// If we need to list file inside those shared folders, we must search it without sharedWithMe
|
||||
if f.opt.SharedWithMe && dirID == f.rootFolderID {
|
||||
query = append(query, "sharedWithMe=true")
|
||||
parentsQuery := bytes.NewBufferString("(")
|
||||
for _, dirID := range dirIDs {
|
||||
if dirID == "" {
|
||||
continue
|
||||
}
|
||||
if parentsQuery.Len() > 1 {
|
||||
_, _ = parentsQuery.WriteString(" or ")
|
||||
}
|
||||
if f.opt.SharedWithMe && dirID == f.rootFolderID {
|
||||
_, _ = parentsQuery.WriteString("sharedWithMe=true")
|
||||
} else {
|
||||
_, _ = fmt.Fprintf(parentsQuery, "'%s' in parents", dirID)
|
||||
}
|
||||
}
|
||||
if dirID != "" && !(f.opt.SharedWithMe && dirID == f.rootFolderID) {
|
||||
query = append(query, fmt.Sprintf("'%s' in parents", dirID))
|
||||
if parentsQuery.Len() > 1 {
|
||||
_ = parentsQuery.WriteByte(')')
|
||||
query = append(query, parentsQuery.String())
|
||||
}
|
||||
stem := ""
|
||||
if title != "" {
|
||||
|
@ -736,7 +750,7 @@ func (f *Fs) NewObject(remote string) (fs.Object, error) {
|
|||
// FindLeaf finds a directory of name leaf in the folder with ID pathID
|
||||
func (f *Fs) FindLeaf(pathID, leaf string) (pathIDOut string, found bool, err error) {
|
||||
// Find the leaf in pathID
|
||||
found, err = f.list(pathID, leaf, true, false, false, func(item *drive.File) bool {
|
||||
found, err = f.list([]string{pathID}, leaf, true, false, false, func(item *drive.File) bool {
|
||||
if item.Name == leaf {
|
||||
pathIDOut = item.Id
|
||||
return true
|
||||
|
@ -848,45 +862,13 @@ func (f *Fs) List(dir string) (entries fs.DirEntries, err error) {
|
|||
}
|
||||
|
||||
var iErr error
|
||||
_, err = f.list(directoryID, "", false, false, false, func(item *drive.File) bool {
|
||||
remote := path.Join(dir, item.Name)
|
||||
switch {
|
||||
case item.MimeType == driveFolderType:
|
||||
// cache the directory ID for later lookups
|
||||
f.dirCache.Put(remote, item.Id)
|
||||
when, _ := time.Parse(timeFormatIn, item.ModifiedTime)
|
||||
d := fs.NewDir(remote, when).SetID(item.Id)
|
||||
entries = append(entries, d)
|
||||
case f.opt.AuthOwnerOnly && !isAuthOwned(item):
|
||||
// ignore object
|
||||
case item.Md5Checksum != "" || item.Size > 0:
|
||||
// If item has MD5 sum or a length it is a file stored on drive
|
||||
o, err := f.newObjectWithInfo(remote, item)
|
||||
if err != nil {
|
||||
iErr = err
|
||||
return true
|
||||
}
|
||||
entries = append(entries, o)
|
||||
case f.opt.SkipGdocs:
|
||||
fs.Debugf(remote, "Skipping google document type %q", item.MimeType)
|
||||
default:
|
||||
// If item MimeType is in the ExportFormats then it is a google doc
|
||||
extension, _, exportMimeType, isDocument := f.findExportFormat(item)
|
||||
if !isDocument {
|
||||
fs.Debugf(remote, "Ignoring unknown document type %q", item.MimeType)
|
||||
break
|
||||
}
|
||||
if extension == "" {
|
||||
fs.Debugf(remote, "No export formats found for %q", item.MimeType)
|
||||
break
|
||||
}
|
||||
obj, err := f.newObjectWithInfo(remote+"."+extension, item)
|
||||
if err != nil {
|
||||
iErr = err
|
||||
return true
|
||||
}
|
||||
obj.(*Object).setGdocsMetaData(item, extension, exportMimeType)
|
||||
entries = append(entries, obj)
|
||||
_, err = f.list([]string{directoryID}, "", false, false, false, func(item *drive.File) bool {
|
||||
entry, err := f.itemToDirEntry(path.Join(dir, item.Name), item)
|
||||
if err != nil {
|
||||
return true
|
||||
}
|
||||
if entry != nil {
|
||||
entries = append(entries, entry)
|
||||
}
|
||||
return false
|
||||
})
|
||||
|
@ -899,6 +881,233 @@ func (f *Fs) List(dir string) (entries fs.DirEntries, err error) {
|
|||
return entries, nil
|
||||
}
|
||||
|
||||
// listRRunner will read dirIDs from the in channel, perform the file listing an call cb with each DirEntry.
|
||||
//
|
||||
// In each cycle, will wait up to 10ms to read up to grouping entries from the in channel.
|
||||
// If an error occurs it will be send to the out channel and then return. Once the in channel is closed,
|
||||
// nil is send to the out channel and the function returns.
|
||||
func (f *Fs) listRRunner(wg *sync.WaitGroup, in <-chan string, out chan<- error, cb func(fs.DirEntry) error, grouping int) {
|
||||
var dirs []string
|
||||
|
||||
for dir := range in {
|
||||
dirs = append(dirs[:0], dir)
|
||||
wait := time.After(10 * time.Millisecond)
|
||||
waitloop:
|
||||
for i := 1; i < grouping; i++ {
|
||||
select {
|
||||
case d, ok := <-in:
|
||||
if !ok {
|
||||
break waitloop
|
||||
}
|
||||
dirs = append(dirs, d)
|
||||
case <-wait:
|
||||
break waitloop
|
||||
}
|
||||
}
|
||||
var iErr error
|
||||
_, err := f.list(dirs, "", false, false, false, func(item *drive.File) bool {
|
||||
parentPath := ""
|
||||
if len(item.Parents) > 0 {
|
||||
p, ok := f.dirCache.GetInv(item.Parents[0])
|
||||
if ok {
|
||||
parentPath = p
|
||||
}
|
||||
}
|
||||
remote := path.Join(parentPath, item.Name)
|
||||
entry, err := f.itemToDirEntry(remote, item)
|
||||
if err != nil {
|
||||
iErr = err
|
||||
return true
|
||||
}
|
||||
|
||||
err = cb(entry)
|
||||
if err != nil {
|
||||
iErr = err
|
||||
return true
|
||||
}
|
||||
return false
|
||||
})
|
||||
for range dirs {
|
||||
wg.Done()
|
||||
}
|
||||
|
||||
if iErr != nil {
|
||||
out <- iErr
|
||||
return
|
||||
}
|
||||
|
||||
if err != nil {
|
||||
out <- err
|
||||
return
|
||||
}
|
||||
}
|
||||
out <- nil
|
||||
}
|
||||
|
||||
// ListR lists the objects and directories of the Fs starting
|
||||
// from dir recursively into out.
|
||||
//
|
||||
// dir should be "" to start from the root, and should not
|
||||
// have trailing slashes.
|
||||
//
|
||||
// This should return ErrDirNotFound if the directory isn't
|
||||
// found.
|
||||
//
|
||||
// It should call callback for each tranche of entries read.
|
||||
// These need not be returned in any particular order. If
|
||||
// callback returns an error then the listing will stop
|
||||
// immediately.
|
||||
//
|
||||
// Don't implement this unless you have a more efficient way
|
||||
// of listing recursively that doing a directory traversal.
|
||||
func (f *Fs) ListR(dir string, callback fs.ListRCallback) (err error) {
|
||||
const (
|
||||
grouping = 50
|
||||
inputBuffer = 1000
|
||||
)
|
||||
|
||||
err = f.dirCache.FindRoot(false)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
directoryID, err := f.dirCache.FindDir(dir, false)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
mu := sync.Mutex{} // protects in and overflow
|
||||
wg := sync.WaitGroup{}
|
||||
in := make(chan string, inputBuffer)
|
||||
out := make(chan error, fs.Config.Checkers)
|
||||
list := walk.NewListRHelper(callback)
|
||||
overfflow := []string{}
|
||||
|
||||
cb := func(entry fs.DirEntry) error {
|
||||
mu.Lock()
|
||||
defer mu.Unlock()
|
||||
if d, isDir := entry.(*fs.Dir); isDir && in != nil {
|
||||
select {
|
||||
case in <- d.ID():
|
||||
wg.Add(1)
|
||||
default:
|
||||
overfflow = append(overfflow, d.ID())
|
||||
}
|
||||
}
|
||||
return list.Add(entry)
|
||||
}
|
||||
|
||||
wg.Add(1)
|
||||
in <- directoryID
|
||||
|
||||
for i := 0; i < fs.Config.Checkers; i++ {
|
||||
go f.listRRunner(&wg, in, out, cb, grouping)
|
||||
}
|
||||
go func() {
|
||||
// wait until the all directories are processed
|
||||
wg.Wait()
|
||||
// if the input channel overflowed add the collected entries to the channel now
|
||||
for len(overfflow) > 0 {
|
||||
mu.Lock()
|
||||
l := len(overfflow)
|
||||
// only fill half of the channel to prevent entries beeing put into overfflow again
|
||||
if l > inputBuffer/2 {
|
||||
l = inputBuffer / 2
|
||||
}
|
||||
wg.Add(l)
|
||||
for _, d := range overfflow[:l] {
|
||||
in <- d
|
||||
}
|
||||
overfflow = overfflow[l:]
|
||||
mu.Unlock()
|
||||
|
||||
// wait again for the completion of all directories
|
||||
wg.Wait()
|
||||
}
|
||||
mu.Lock()
|
||||
if in != nil {
|
||||
// notify all workers to exit
|
||||
close(in)
|
||||
in = nil
|
||||
}
|
||||
mu.Unlock()
|
||||
}()
|
||||
// wait until the all workers to finish
|
||||
for i := 0; i < fs.Config.Checkers; i++ {
|
||||
e := <-out
|
||||
mu.Lock()
|
||||
// if one worker returns an error early, close the input so all other workers exit
|
||||
if e != nil && in != nil {
|
||||
err = e
|
||||
close(in)
|
||||
in = nil
|
||||
}
|
||||
mu.Unlock()
|
||||
}
|
||||
|
||||
close(out)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return list.Flush()
|
||||
}
|
||||
|
||||
func (f *Fs) itemToDirEntry(remote string, item *drive.File) (fs.DirEntry, error) {
|
||||
switch {
|
||||
case item.MimeType == driveFolderType:
|
||||
// cache the directory ID for later lookups
|
||||
f.dirCache.Put(remote, item.Id)
|
||||
when, _ := time.Parse(timeFormatIn, item.ModifiedTime)
|
||||
d := fs.NewDir(remote, when).SetID(item.Id)
|
||||
return d, nil
|
||||
case f.opt.AuthOwnerOnly && !isAuthOwned(item):
|
||||
// ignore object
|
||||
case item.Md5Checksum != "" || item.Size > 0:
|
||||
// If item has MD5 sum or a length it is a file stored on drive
|
||||
o, err := f.newObjectWithInfo(remote, item)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return o, nil
|
||||
case f.opt.SkipGdocs:
|
||||
fs.Debugf(remote, "Skipping google document type %q", item.MimeType)
|
||||
default:
|
||||
// If item MimeType is in the ExportFormats then it is a google doc
|
||||
extension, _, exportMimeType, isDocument := f.findExportFormat(item)
|
||||
if !isDocument {
|
||||
fs.Debugf(remote, "Ignoring unknown document type %q", item.MimeType)
|
||||
break
|
||||
}
|
||||
if extension == "" {
|
||||
fs.Debugf(remote, "No export formats found for %q", item.MimeType)
|
||||
break
|
||||
}
|
||||
o, err := f.newObjectWithInfo(remote+"."+extension, item)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
obj := o.(*Object)
|
||||
obj.url = fmt.Sprintf("%sfiles/%s/export?mimeType=%s", f.svc.BasePath, item.Id, url.QueryEscape(exportMimeType))
|
||||
if f.opt.AlternateExport {
|
||||
switch item.MimeType {
|
||||
case "application/vnd.google-apps.drawing":
|
||||
obj.url = fmt.Sprintf("https://docs.google.com/drawings/d/%s/export/%s", item.Id, extension)
|
||||
case "application/vnd.google-apps.document":
|
||||
obj.url = fmt.Sprintf("https://docs.google.com/document/d/%s/export?format=%s", item.Id, extension)
|
||||
case "application/vnd.google-apps.spreadsheet":
|
||||
obj.url = fmt.Sprintf("https://docs.google.com/spreadsheets/d/%s/export?format=%s", item.Id, extension)
|
||||
case "application/vnd.google-apps.presentation":
|
||||
obj.url = fmt.Sprintf("https://docs.google.com/presentation/d/%s/export/%s", item.Id, extension)
|
||||
}
|
||||
}
|
||||
obj.isDocument = true
|
||||
obj.mimeType = exportMimeType
|
||||
obj.bytes = -1
|
||||
return o, nil
|
||||
}
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
// Creates a drive.File info from the parameters passed in and a half
|
||||
// finished Object which must have setMetaData called on it
|
||||
//
|
||||
|
@ -996,7 +1205,7 @@ func (f *Fs) MergeDirs(dirs []fs.Directory) error {
|
|||
for _, srcDir := range dirs[1:] {
|
||||
// list the the objects
|
||||
infos := []*drive.File{}
|
||||
_, err := f.list(srcDir.ID(), "", false, false, true, func(info *drive.File) bool {
|
||||
_, err := f.list([]string{srcDir.ID()}, "", false, false, true, func(info *drive.File) bool {
|
||||
infos = append(infos, info)
|
||||
return false
|
||||
})
|
||||
|
@ -1064,7 +1273,7 @@ func (f *Fs) Rmdir(dir string) error {
|
|||
return err
|
||||
}
|
||||
var trashedFiles = false
|
||||
found, err := f.list(directoryID, "", false, false, true, func(item *drive.File) bool {
|
||||
found, err := f.list([]string{directoryID}, "", false, false, true, func(item *drive.File) bool {
|
||||
if !item.Trashed {
|
||||
fs.Debugf(dir, "Rmdir: contains file: %q", item.Name)
|
||||
return true
|
||||
|
@ -1597,7 +1806,7 @@ func (o *Object) readMetaData() (err error) {
|
|||
return err
|
||||
}
|
||||
|
||||
found, err := o.fs.list(directoryID, leaf, false, true, false, func(item *drive.File) bool {
|
||||
found, err := o.fs.list([]string{directoryID}, leaf, false, true, false, func(item *drive.File) bool {
|
||||
if item.Name == leaf {
|
||||
o.setMetaData(item)
|
||||
return true
|
||||
|
@ -1870,6 +2079,7 @@ var (
|
|||
_ fs.ChangeNotifier = (*Fs)(nil)
|
||||
_ fs.PutUncheckeder = (*Fs)(nil)
|
||||
_ fs.PublicLinker = (*Fs)(nil)
|
||||
_ fs.ListRer = (*Fs)(nil)
|
||||
_ fs.MergeDirser = (*Fs)(nil)
|
||||
_ fs.Abouter = (*Fs)(nil)
|
||||
_ fs.Object = (*Object)(nil)
|
||||
|
|
Loading…
Reference in a new issue