poll for Google Drive changes when mounted

This commit is contained in:
Stefan Breunig 2017-05-25 23:05:49 +02:00 committed by Nick Craig-Wood
parent 5455d34f8c
commit a2e3af0523
24 changed files with 199 additions and 9 deletions

View file

@ -44,6 +44,7 @@ func TestFsMove(t *testing.T) { fstests.TestFsMove(t) }
func TestFsDirMove(t *testing.T) { fstests.TestFsDirMove(t) }
func TestFsRmdirFull(t *testing.T) { fstests.TestFsRmdirFull(t) }
func TestFsPrecision(t *testing.T) { fstests.TestFsPrecision(t) }
func TestFsDirChangeNotify(t *testing.T) { fstests.TestFsDirChangeNotify(t) }
func TestObjectString(t *testing.T) { fstests.TestObjectString(t) }
func TestObjectFs(t *testing.T) { fstests.TestObjectFs(t) }
func TestObjectRemote(t *testing.T) { fstests.TestObjectRemote(t) }

View file

@ -44,6 +44,7 @@ func TestFsMove(t *testing.T) { fstests.TestFsMove(t) }
func TestFsDirMove(t *testing.T) { fstests.TestFsDirMove(t) }
func TestFsRmdirFull(t *testing.T) { fstests.TestFsRmdirFull(t) }
func TestFsPrecision(t *testing.T) { fstests.TestFsPrecision(t) }
func TestFsDirChangeNotify(t *testing.T) { fstests.TestFsDirChangeNotify(t) }
func TestObjectString(t *testing.T) { fstests.TestObjectString(t) }
func TestObjectFs(t *testing.T) { fstests.TestObjectFs(t) }
func TestObjectRemote(t *testing.T) { fstests.TestObjectRemote(t) }

View file

@ -48,6 +48,9 @@ func NewFS(f fs.Fs) *FS {
if readOnly {
fsys.FS.ReadOnly()
}
if pollInterval > 0 {
fsys.FS.PollChanges(pollInterval)
}
return fsys
}

View file

@ -32,6 +32,7 @@ var (
debugFUSE = false
noSeek = false
dirCacheTime = 5 * 60 * time.Second
pollInterval = time.Minute
// mount options
readOnly = false
allowNonEmpty = false
@ -58,6 +59,7 @@ func init() {
commandDefintion.Flags().BoolVarP(&debugFUSE, "debug-fuse", "", debugFUSE, "Debug the FUSE internals - needs -v.")
commandDefintion.Flags().BoolVarP(&noSeek, "no-seek", "", noSeek, "Don't allow seeking in files.")
commandDefintion.Flags().DurationVarP(&dirCacheTime, "dir-cache-time", "", dirCacheTime, "Time to cache directory entries for.")
commandDefintion.Flags().DurationVarP(&pollInterval, "poll-interval", "", pollInterval, "Time to wait between polling for changes. Must be smaller than dir-cache-time. Only on supported remotes. Set to 0 to disable.")
// mount options
commandDefintion.Flags().BoolVarP(&readOnly, "read-only", "", readOnly, "Mount read-only.")
commandDefintion.Flags().BoolVarP(&allowNonEmpty, "allow-non-empty", "", allowNonEmpty, "Allow mounting over a non-empty directory.")

View file

@ -39,6 +39,9 @@ func NewFS(f fs.Fs) *FS {
if readOnly {
fsys.FS.ReadOnly()
}
if pollInterval > 0 {
fsys.FS.PollChanges(pollInterval)
}
return fsys
}

View file

@ -28,6 +28,7 @@ var (
debugFUSE = false
noSeek = false
dirCacheTime = 5 * 60 * time.Second
pollInterval = time.Minute
// mount options
readOnly = false
allowNonEmpty = false
@ -54,6 +55,7 @@ func init() {
commandDefintion.Flags().BoolVarP(&debugFUSE, "debug-fuse", "", debugFUSE, "Debug the FUSE internals - needs -v.")
commandDefintion.Flags().BoolVarP(&noSeek, "no-seek", "", noSeek, "Don't allow seeking in files.")
commandDefintion.Flags().DurationVarP(&dirCacheTime, "dir-cache-time", "", dirCacheTime, "Time to cache directory entries for.")
commandDefintion.Flags().DurationVarP(&pollInterval, "poll-interval", "", pollInterval, "Time to wait between polling for changes. Must be smaller than dir-cache-time. Only on supported remotes. Set to 0 to disable.")
// mount options
commandDefintion.Flags().BoolVarP(&readOnly, "read-only", "", readOnly, "Mount read-only.")
commandDefintion.Flags().BoolVarP(&allowNonEmpty, "allow-non-empty", "", allowNonEmpty, "Allow mounting over a non-empty directory.")

View file

@ -51,7 +51,20 @@ func NewFS(f fs.Fs) *FS {
fsys := &FS{
f: f,
}
fsys.root = newDir(fsys, f, fsDir)
return fsys
}
// PollChanges will poll the remote every pollInterval for changes if the remote
// supports it. If a non-polling option is used, the given time interval can be
// ignored
func (fsys *FS) PollChanges(pollInterval time.Duration) *FS {
doDirChangeNotify := fsys.f.Features().DirChangeNotify
if doDirChangeNotify != nil {
doDirChangeNotify(fsys.root.ForgetPath, pollInterval)
}
return fsys
}

View file

@ -45,6 +45,7 @@ func TestFsMove2(t *testing.T) { fstests.TestFsMove(t) }
func TestFsDirMove2(t *testing.T) { fstests.TestFsDirMove(t) }
func TestFsRmdirFull2(t *testing.T) { fstests.TestFsRmdirFull(t) }
func TestFsPrecision2(t *testing.T) { fstests.TestFsPrecision(t) }
func TestFsDirChangeNotify2(t *testing.T) { fstests.TestFsDirChangeNotify(t) }
func TestObjectString2(t *testing.T) { fstests.TestObjectString(t) }
func TestObjectFs2(t *testing.T) { fstests.TestObjectFs(t) }
func TestObjectRemote2(t *testing.T) { fstests.TestObjectRemote(t) }

View file

@ -45,6 +45,7 @@ func TestFsMove3(t *testing.T) { fstests.TestFsMove(t) }
func TestFsDirMove3(t *testing.T) { fstests.TestFsDirMove(t) }
func TestFsRmdirFull3(t *testing.T) { fstests.TestFsRmdirFull(t) }
func TestFsPrecision3(t *testing.T) { fstests.TestFsPrecision(t) }
func TestFsDirChangeNotify3(t *testing.T) { fstests.TestFsDirChangeNotify(t) }
func TestObjectString3(t *testing.T) { fstests.TestObjectString(t) }
func TestObjectFs3(t *testing.T) { fstests.TestObjectFs(t) }
func TestObjectRemote3(t *testing.T) { fstests.TestObjectRemote(t) }

View file

@ -45,6 +45,7 @@ func TestFsMove(t *testing.T) { fstests.TestFsMove(t) }
func TestFsDirMove(t *testing.T) { fstests.TestFsDirMove(t) }
func TestFsRmdirFull(t *testing.T) { fstests.TestFsRmdirFull(t) }
func TestFsPrecision(t *testing.T) { fstests.TestFsPrecision(t) }
func TestFsDirChangeNotify(t *testing.T) { fstests.TestFsDirChangeNotify(t) }
func TestObjectString(t *testing.T) { fstests.TestObjectString(t) }
func TestObjectFs(t *testing.T) { fstests.TestObjectFs(t) }
func TestObjectRemote(t *testing.T) { fstests.TestObjectRemote(t) }

View file

@ -13,6 +13,7 @@ import (
"log"
"net/http"
"path"
"sort"
"strings"
"time"
@ -889,6 +890,106 @@ func (f *Fs) DirMove(src fs.Fs, srcRemote, dstRemote string) error {
return nil
}
// DirChangeNotify polls for changes from the remote and hands the path to the
// given function. Only changes that can be resolved to a path through the
// DirCache will handled.
//
// Automatically restarts itself in case of unexpected behaviour of the remote.
//
// Close the returned channel to stop being notified.
func (f *Fs) DirChangeNotify(notifyFunc func(string), pollInterval time.Duration) chan bool {
quit := make(chan bool)
go func() {
select {
case <-quit:
return
default:
for {
f.dirchangeNotifyRunner(notifyFunc, pollInterval)
fs.Debugf(f, "Notify listener service ran into issues, restarting shortly.")
time.Sleep(pollInterval)
}
}
}()
return quit
}
func (f *Fs) dirchangeNotifyRunner(notifyFunc func(string), pollInterval time.Duration) {
var err error
var changeList *drive.ChangeList
var pageToken string
var largestChangeID int64
var startPageToken *drive.StartPageToken
err = f.pacer.Call(func() (bool, error) {
startPageToken, err = f.svc.Changes.GetStartPageToken().Do()
return shouldRetry(err)
})
if err != nil {
fs.Debugf(f, "Failed to get StartPageToken: %v", err)
return
}
pageToken = startPageToken.StartPageToken
for {
fs.Debugf(f, "Checking for changes on remote")
err = f.pacer.Call(func() (bool, error) {
changesCall := f.svc.Changes.List().PageToken(pageToken).Fields(googleapi.Field("nextPageToken,largestChangeId,newStartPageToken,items(fileId,file/parents(id))"))
if largestChangeID != 0 {
changesCall = changesCall.StartChangeId(largestChangeID)
}
if *driveListChunk > 0 {
changesCall = changesCall.MaxResults(*driveListChunk)
}
changeList, err = changesCall.Do()
return shouldRetry(err)
})
if err != nil {
fs.Debugf(f, "Failed to get Changes: %v", err)
return
}
pathsToClear := make([]string, 0)
for _, change := range changeList.Items {
if path, ok := f.dirCache.GetInv(change.FileId); ok {
pathsToClear = append(pathsToClear, path)
}
if change.File != nil {
for _, parent := range change.File.Parents {
if path, ok := f.dirCache.GetInv(parent.Id); ok {
pathsToClear = append(pathsToClear, path)
}
}
}
}
lastNotifiedPath := ""
sort.Strings(pathsToClear)
for _, path := range pathsToClear {
if lastNotifiedPath != "" && (path == lastNotifiedPath || strings.HasPrefix(path+"/", lastNotifiedPath)) {
continue
}
lastNotifiedPath = path
notifyFunc(path)
}
if changeList.LargestChangeId != 0 {
largestChangeID = changeList.LargestChangeId
}
if changeList.NewStartPageToken != "" {
pageToken = changeList.NewStartPageToken
fs.Debugf(f, "All changes were processed. Waiting for more.")
time.Sleep(pollInterval)
} else if changeList.NextPageToken != "" {
pageToken = changeList.NextPageToken
fs.Debugf(f, "There are more changes pending, checking now.")
} else {
fs.Debugf(f, "Did not get any page token, something went wrong! %+v", changeList)
return
}
}
}
// DirCacheFlush resets the directory cache - used in testing as an
// optional interface
func (f *Fs) DirCacheFlush() {
@ -1175,13 +1276,14 @@ func (o *Object) MimeType() string {
// Check the interfaces are satisfied
var (
_ fs.Fs = (*Fs)(nil)
_ fs.Purger = (*Fs)(nil)
_ fs.Copier = (*Fs)(nil)
_ fs.Mover = (*Fs)(nil)
_ fs.DirMover = (*Fs)(nil)
_ fs.DirCacheFlusher = (*Fs)(nil)
_ fs.PutUncheckeder = (*Fs)(nil)
_ fs.Object = (*Object)(nil)
_ fs.MimeTyper = &Object{}
_ fs.Fs = (*Fs)(nil)
_ fs.Purger = (*Fs)(nil)
_ fs.Copier = (*Fs)(nil)
_ fs.Mover = (*Fs)(nil)
_ fs.DirMover = (*Fs)(nil)
_ fs.DirCacheFlusher = (*Fs)(nil)
_ fs.DirChangeNotifier = (*Fs)(nil)
_ fs.PutUncheckeder = (*Fs)(nil)
_ fs.Object = (*Object)(nil)
_ fs.MimeTyper = &Object{}
)

View file

@ -44,6 +44,7 @@ func TestFsMove(t *testing.T) { fstests.TestFsMove(t) }
func TestFsDirMove(t *testing.T) { fstests.TestFsDirMove(t) }
func TestFsRmdirFull(t *testing.T) { fstests.TestFsRmdirFull(t) }
func TestFsPrecision(t *testing.T) { fstests.TestFsPrecision(t) }
func TestFsDirChangeNotify(t *testing.T) { fstests.TestFsDirChangeNotify(t) }
func TestObjectString(t *testing.T) { fstests.TestObjectString(t) }
func TestObjectFs(t *testing.T) { fstests.TestObjectFs(t) }
func TestObjectRemote(t *testing.T) { fstests.TestObjectRemote(t) }

View file

@ -44,6 +44,7 @@ func TestFsMove(t *testing.T) { fstests.TestFsMove(t) }
func TestFsDirMove(t *testing.T) { fstests.TestFsDirMove(t) }
func TestFsRmdirFull(t *testing.T) { fstests.TestFsRmdirFull(t) }
func TestFsPrecision(t *testing.T) { fstests.TestFsPrecision(t) }
func TestFsDirChangeNotify(t *testing.T) { fstests.TestFsDirChangeNotify(t) }
func TestObjectString(t *testing.T) { fstests.TestObjectString(t) }
func TestObjectFs(t *testing.T) { fstests.TestObjectFs(t) }
func TestObjectRemote(t *testing.T) { fstests.TestObjectRemote(t) }

View file

@ -267,6 +267,11 @@ type Features struct {
// If destination exists then return fs.ErrorDirExists
DirMove func(src Fs, srcRemote, dstRemote string) error
// DirChangeNotify calls the passed function with a path
// of a directory that has had changes. If the implementation
// uses polling, it should adhere to the given interval.
DirChangeNotify func(func(string), time.Duration) chan bool
// UnWrap returns the Fs that this Fs is wrapping
UnWrap func() Fs
@ -307,6 +312,9 @@ func (ft *Features) Fill(f Fs) *Features {
if do, ok := f.(DirMover); ok {
ft.DirMove = do.DirMove
}
if do, ok := f.(DirChangeNotifier); ok {
ft.DirChangeNotify = do.DirChangeNotify
}
if do, ok := f.(UnWrapper); ok {
ft.UnWrap = do.UnWrap
}
@ -346,6 +354,9 @@ func (ft *Features) Mask(f Fs) *Features {
if mask.DirMove == nil {
ft.DirMove = nil
}
if mask.DirChangeNotify == nil {
ft.DirChangeNotify = nil
}
// if mask.UnWrap == nil {
// ft.UnWrap = nil
// }
@ -424,6 +435,14 @@ type DirMover interface {
DirMove(src Fs, srcRemote, dstRemote string) error
}
// DirChangeNotifier is an optional interface for Fs
type DirChangeNotifier interface {
// DirChangeNotify calls the passed function with a path
// of a directory that has had changes. If the implementation
// uses polling, it should adhere to the given interval.
DirChangeNotify(func(string), time.Duration) chan bool
}
// UnWrapper is an optional interfaces for Fs
type UnWrapper interface {
// UnWrap returns the Fs that this Fs is wrapping

View file

@ -577,6 +577,36 @@ func TestFsPrecision(t *testing.T) {
// FIXME check expected precision
}
// TestFsDirChangeNotify tests that changes to directories are properly
// propagated
//
// go test -v -remote TestDrive: -run '^Test(Setup|Init|FsDirChangeNotify)$' -verbose
func TestFsDirChangeNotify(t *testing.T) {
skipIfNotOk(t)
// Check have DirChangeNotify
doDirChangeNotify := remote.Features().DirChangeNotify
if doDirChangeNotify == nil {
t.Skip("FS has no DirChangeNotify interface")
}
err := fs.Mkdir(remote, "dir")
require.NoError(t, err)
changes := []string{}
quitChannel := doDirChangeNotify(func(x string) {
changes = append(changes, x)
}, time.Second)
defer func() { close(quitChannel) }()
err = fs.Mkdir(remote, "dir/subdir")
require.NoError(t, err)
time.Sleep(2 * time.Second)
assert.Equal(t, []string{"dir"}, changes)
}
// TestObjectString tests the Object String method
func TestObjectString(t *testing.T) {
skipIfNotOk(t)

View file

@ -44,6 +44,7 @@ func TestFsMove(t *testing.T) { fstests.TestFsMove(t) }
func TestFsDirMove(t *testing.T) { fstests.TestFsDirMove(t) }
func TestFsRmdirFull(t *testing.T) { fstests.TestFsRmdirFull(t) }
func TestFsPrecision(t *testing.T) { fstests.TestFsPrecision(t) }
func TestFsDirChangeNotify(t *testing.T) { fstests.TestFsDirChangeNotify(t) }
func TestObjectString(t *testing.T) { fstests.TestObjectString(t) }
func TestObjectFs(t *testing.T) { fstests.TestObjectFs(t) }
func TestObjectRemote(t *testing.T) { fstests.TestObjectRemote(t) }

View file

@ -44,6 +44,7 @@ func TestFsMove(t *testing.T) { fstests.TestFsMove(t) }
func TestFsDirMove(t *testing.T) { fstests.TestFsDirMove(t) }
func TestFsRmdirFull(t *testing.T) { fstests.TestFsRmdirFull(t) }
func TestFsPrecision(t *testing.T) { fstests.TestFsPrecision(t) }
func TestFsDirChangeNotify(t *testing.T) { fstests.TestFsDirChangeNotify(t) }
func TestObjectString(t *testing.T) { fstests.TestObjectString(t) }
func TestObjectFs(t *testing.T) { fstests.TestObjectFs(t) }
func TestObjectRemote(t *testing.T) { fstests.TestObjectRemote(t) }

View file

@ -44,6 +44,7 @@ func TestFsMove(t *testing.T) { fstests.TestFsMove(t) }
func TestFsDirMove(t *testing.T) { fstests.TestFsDirMove(t) }
func TestFsRmdirFull(t *testing.T) { fstests.TestFsRmdirFull(t) }
func TestFsPrecision(t *testing.T) { fstests.TestFsPrecision(t) }
func TestFsDirChangeNotify(t *testing.T) { fstests.TestFsDirChangeNotify(t) }
func TestObjectString(t *testing.T) { fstests.TestObjectString(t) }
func TestObjectFs(t *testing.T) { fstests.TestObjectFs(t) }
func TestObjectRemote(t *testing.T) { fstests.TestObjectRemote(t) }

View file

@ -44,6 +44,7 @@ func TestFsMove(t *testing.T) { fstests.TestFsMove(t) }
func TestFsDirMove(t *testing.T) { fstests.TestFsDirMove(t) }
func TestFsRmdirFull(t *testing.T) { fstests.TestFsRmdirFull(t) }
func TestFsPrecision(t *testing.T) { fstests.TestFsPrecision(t) }
func TestFsDirChangeNotify(t *testing.T) { fstests.TestFsDirChangeNotify(t) }
func TestObjectString(t *testing.T) { fstests.TestObjectString(t) }
func TestObjectFs(t *testing.T) { fstests.TestObjectFs(t) }
func TestObjectRemote(t *testing.T) { fstests.TestObjectRemote(t) }

View file

@ -44,6 +44,7 @@ func TestFsMove(t *testing.T) { fstests.TestFsMove(t) }
func TestFsDirMove(t *testing.T) { fstests.TestFsDirMove(t) }
func TestFsRmdirFull(t *testing.T) { fstests.TestFsRmdirFull(t) }
func TestFsPrecision(t *testing.T) { fstests.TestFsPrecision(t) }
func TestFsDirChangeNotify(t *testing.T) { fstests.TestFsDirChangeNotify(t) }
func TestObjectString(t *testing.T) { fstests.TestObjectString(t) }
func TestObjectFs(t *testing.T) { fstests.TestObjectFs(t) }
func TestObjectRemote(t *testing.T) { fstests.TestObjectRemote(t) }

View file

@ -44,6 +44,7 @@ func TestFsMove(t *testing.T) { fstests.TestFsMove(t) }
func TestFsDirMove(t *testing.T) { fstests.TestFsDirMove(t) }
func TestFsRmdirFull(t *testing.T) { fstests.TestFsRmdirFull(t) }
func TestFsPrecision(t *testing.T) { fstests.TestFsPrecision(t) }
func TestFsDirChangeNotify(t *testing.T) { fstests.TestFsDirChangeNotify(t) }
func TestObjectString(t *testing.T) { fstests.TestObjectString(t) }
func TestObjectFs(t *testing.T) { fstests.TestObjectFs(t) }
func TestObjectRemote(t *testing.T) { fstests.TestObjectRemote(t) }

View file

@ -44,6 +44,7 @@ func TestFsMove(t *testing.T) { fstests.TestFsMove(t) }
func TestFsDirMove(t *testing.T) { fstests.TestFsDirMove(t) }
func TestFsRmdirFull(t *testing.T) { fstests.TestFsRmdirFull(t) }
func TestFsPrecision(t *testing.T) { fstests.TestFsPrecision(t) }
func TestFsDirChangeNotify(t *testing.T) { fstests.TestFsDirChangeNotify(t) }
func TestObjectString(t *testing.T) { fstests.TestObjectString(t) }
func TestObjectFs(t *testing.T) { fstests.TestObjectFs(t) }
func TestObjectRemote(t *testing.T) { fstests.TestObjectRemote(t) }

View file

@ -44,6 +44,7 @@ func TestFsMove(t *testing.T) { fstests.TestFsMove(t) }
func TestFsDirMove(t *testing.T) { fstests.TestFsDirMove(t) }
func TestFsRmdirFull(t *testing.T) { fstests.TestFsRmdirFull(t) }
func TestFsPrecision(t *testing.T) { fstests.TestFsPrecision(t) }
func TestFsDirChangeNotify(t *testing.T) { fstests.TestFsDirChangeNotify(t) }
func TestObjectString(t *testing.T) { fstests.TestObjectString(t) }
func TestObjectFs(t *testing.T) { fstests.TestObjectFs(t) }
func TestObjectRemote(t *testing.T) { fstests.TestObjectRemote(t) }

View file

@ -44,6 +44,7 @@ func TestFsMove(t *testing.T) { fstests.TestFsMove(t) }
func TestFsDirMove(t *testing.T) { fstests.TestFsDirMove(t) }
func TestFsRmdirFull(t *testing.T) { fstests.TestFsRmdirFull(t) }
func TestFsPrecision(t *testing.T) { fstests.TestFsPrecision(t) }
func TestFsDirChangeNotify(t *testing.T) { fstests.TestFsDirChangeNotify(t) }
func TestObjectString(t *testing.T) { fstests.TestObjectString(t) }
func TestObjectFs(t *testing.T) { fstests.TestObjectFs(t) }
func TestObjectRemote(t *testing.T) { fstests.TestObjectRemote(t) }