From 1eec59e091c42c839fe9ee4c1e7436142d97fc91 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Fabian=20M=C3=B6ller?= Date: Sat, 25 Aug 2018 21:28:57 +0200 Subject: [PATCH] fs: update ChangeNotifier interface This introduces a channel to the ChangeNotify function, which can be used to update the poll-interval and cleanly exit the polling function. --- backend/amazonclouddrive/amazonclouddrive.go | 34 ++++++--- backend/cache/cache.go | 11 ++- backend/crypt/crypt.go | 4 +- backend/drive/drive.go | 74 +++++++++++++------- fs/fs.go | 10 ++- fstest/fstests/fstests.go | 8 ++- vfs/vfs.go | 15 ++-- 7 files changed, 103 insertions(+), 53 deletions(-) diff --git a/backend/amazonclouddrive/amazonclouddrive.go b/backend/amazonclouddrive/amazonclouddrive.go index 22201febf..d5932fae4 100644 --- a/backend/amazonclouddrive/amazonclouddrive.go +++ b/backend/amazonclouddrive/amazonclouddrive.go @@ -1240,24 +1240,38 @@ func (o *Object) MimeType() string { // Automatically restarts itself in case of unexpected behaviour of the remote. // // Close the returned channel to stop being notified. -func (f *Fs) ChangeNotify(notifyFunc func(string, fs.EntryType), pollInterval time.Duration) chan bool { +func (f *Fs) ChangeNotify(notifyFunc func(string, fs.EntryType), pollIntervalChan <-chan time.Duration) { checkpoint := f.opt.Checkpoint - quit := make(chan bool) go func() { + var ticker *time.Ticker + var tickerC <-chan time.Time for { - checkpoint = f.changeNotifyRunner(notifyFunc, checkpoint) - if err := config.SetValueAndSave(f.name, "checkpoint", checkpoint); err != nil { - fs.Debugf(f, "Unable to save checkpoint: %v", err) - } select { - case <-quit: - return - case <-time.After(pollInterval): + case pollInterval, ok := <-pollIntervalChan: + if !ok { + if ticker != nil { + ticker.Stop() + } + return + } + if pollInterval == 0 { + if ticker != nil { + ticker.Stop() + ticker, tickerC = nil, nil + } + } else { + ticker = time.NewTicker(pollInterval) + tickerC = ticker.C + } + case <-tickerC: + checkpoint = f.changeNotifyRunner(notifyFunc, checkpoint) + if err := config.SetValueAndSave(f.name, "checkpoint", checkpoint); err != nil { + fs.Debugf(f, "Unable to save checkpoint: %v", err) + } } } }() - return quit } func (f *Fs) changeNotifyRunner(notifyFunc func(string, fs.EntryType), checkpoint string) string { diff --git a/backend/cache/cache.go b/backend/cache/cache.go index afdd6cd89..6e3f092f0 100644 --- a/backend/cache/cache.go +++ b/backend/cache/cache.go @@ -415,7 +415,9 @@ func NewFs(name, rootPath string, m configmap.Mapper) (fs.Fs, error) { }() if doChangeNotify := wrappedFs.Features().ChangeNotify; doChangeNotify != nil { - doChangeNotify(f.receiveChangeNotify, time.Duration(f.opt.ChunkCleanInterval)) + pollInterval := make(chan time.Duration, 1) + pollInterval <- time.Duration(f.opt.ChunkCleanInterval) + doChangeNotify(f.receiveChangeNotify, pollInterval) } f.features = (&fs.Features{ @@ -780,12 +782,15 @@ func (f *Fs) notifyChangeUpstream(remote string, entryType fs.EntryType) { // ChangeNotify can subsribe multiple callers // this is coupled with the wrapped fs ChangeNotify (if it supports it) // and also notifies other caches (i.e VFS) to clear out whenever something changes -func (f *Fs) ChangeNotify(notifyFunc func(string, fs.EntryType), pollInterval time.Duration) chan bool { +func (f *Fs) ChangeNotify(notifyFunc func(string, fs.EntryType), pollInterval <-chan time.Duration) { f.parentsForgetMu.Lock() defer f.parentsForgetMu.Unlock() fs.Debugf(f, "subscribing to ChangeNotify") f.parentsForgetFn = append(f.parentsForgetFn, notifyFunc) - return make(chan bool) + go func() { + for range pollInterval { + } + }() } // Name of the remote (as passed into NewFs) diff --git a/backend/crypt/crypt.go b/backend/crypt/crypt.go index 1feded4ee..f6de2214d 100644 --- a/backend/crypt/crypt.go +++ b/backend/crypt/crypt.go @@ -165,7 +165,7 @@ func NewFs(name, rpath string, m configmap.Mapper) (fs.Fs, error) { doChangeNotify := wrappedFs.Features().ChangeNotify if doChangeNotify != nil { - f.features.ChangeNotify = func(notifyFunc func(string, fs.EntryType), pollInterval time.Duration) chan bool { + f.features.ChangeNotify = func(notifyFunc func(string, fs.EntryType), pollInterval <-chan time.Duration) { wrappedNotifyFunc := func(path string, entryType fs.EntryType) { decrypted, err := f.DecryptFileName(path) if err != nil { @@ -174,7 +174,7 @@ func NewFs(name, rpath string, m configmap.Mapper) (fs.Fs, error) { } notifyFunc(decrypted, entryType) } - return doChangeNotify(wrappedNotifyFunc, pollInterval) + doChangeNotify(wrappedNotifyFunc, pollInterval) } } diff --git a/backend/drive/drive.go b/backend/drive/drive.go index 61fb4fd0a..1205d9519 100644 --- a/backend/drive/drive.go +++ b/backend/drive/drive.go @@ -1660,25 +1660,50 @@ func (f *Fs) DirMove(src fs.Fs, srcRemote, dstRemote string) error { // Automatically restarts itself in case of unexpected behaviour of the remote. // // Close the returned channel to stop being notified. -func (f *Fs) ChangeNotify(notifyFunc func(string, fs.EntryType), pollInterval time.Duration) chan bool { - quit := make(chan bool) +func (f *Fs) ChangeNotify(notifyFunc func(string, fs.EntryType), pollIntervalChan <-chan time.Duration) { go func() { - select { - case <-quit: - return - default: - for { - f.changeNotifyRunner(notifyFunc, pollInterval) - fs.Debugf(f, "Notify listener service ran into issues, restarting shortly.") - time.Sleep(pollInterval) + // get the StartPageToken early so all changes from now on get processed + startPageToken, err := f.changeNotifyStartPageToken() + if err != nil { + fs.Infof(f, "Failed to get StartPageToken: %s", err) + } + var ticker *time.Ticker + var tickerC <-chan time.Time + for { + select { + case pollInterval, ok := <-pollIntervalChan: + if !ok { + if ticker != nil { + ticker.Stop() + } + return + } + if ticker != nil { + ticker.Stop() + ticker, tickerC = nil, nil + } + if pollInterval != 0 { + ticker = time.NewTicker(pollInterval) + tickerC = ticker.C + } + case <-tickerC: + if startPageToken == "" { + startPageToken, err = f.changeNotifyStartPageToken() + if err != nil { + fs.Infof(f, "Failed to get StartPageToken: %s", err) + continue + } + } + fs.Debugf(f, "Checking for changes on remote") + startPageToken, err = f.changeNotifyRunner(notifyFunc, startPageToken) + if err != nil { + fs.Infof(f, "Change notify listener failure: %s", err) + } } } }() - return quit } - -func (f *Fs) changeNotifyRunner(notifyFunc func(string, fs.EntryType), pollInterval time.Duration) { - var err error +func (f *Fs) changeNotifyStartPageToken() (pageToken string, err error) { var startPageToken *drive.StartPageToken err = f.pacer.Call(func() (bool, error) { startPageToken, err = f.svc.Changes.GetStartPageToken(). @@ -1687,13 +1712,14 @@ func (f *Fs) changeNotifyRunner(notifyFunc func(string, fs.EntryType), pollInter return shouldRetry(err) }) if err != nil { - fs.Debugf(f, "Failed to get StartPageToken: %v", err) return } - pageToken := startPageToken.StartPageToken + return startPageToken.StartPageToken, nil +} +func (f *Fs) changeNotifyRunner(notifyFunc func(string, fs.EntryType), startPageToken string) (newStartPageToken string, err error) { + pageToken := startPageToken for { - fs.Debugf(f, "Checking for changes on remote") var changeList *drive.ChangeList err = f.pacer.Call(func() (bool, error) { @@ -1711,7 +1737,6 @@ func (f *Fs) changeNotifyRunner(notifyFunc func(string, fs.EntryType), pollInter return shouldRetry(err) }) if err != nil { - fs.Debugf(f, "Failed to get Changes: %v", err) return } @@ -1763,15 +1788,12 @@ func (f *Fs) changeNotifyRunner(notifyFunc func(string, fs.EntryType), pollInter notifyFunc(entry.path, entry.entryType) } - if changeList.NewStartPageToken != "" { - pageToken = changeList.NewStartPageToken - fs.Debugf(f, "All changes were processed. Waiting for more.") - time.Sleep(pollInterval) - } else if changeList.NextPageToken != "" { + switch { + case changeList.NewStartPageToken != "": + return changeList.NewStartPageToken, nil + case changeList.NextPageToken != "": pageToken = changeList.NextPageToken - fs.Debugf(f, "There are more changes pending, checking now.") - } else { - fs.Debugf(f, "Did not get any page token, something went wrong! %+v", changeList) + default: return } } diff --git a/fs/fs.go b/fs/fs.go index fc16591b2..66e2c02b0 100644 --- a/fs/fs.go +++ b/fs/fs.go @@ -424,7 +424,7 @@ type Features struct { // ChangeNotify calls the passed function with a path // that has had changes. If the implementation // uses polling, it should adhere to the given interval. - ChangeNotify func(func(string, EntryType), time.Duration) chan bool + ChangeNotify func(func(string, EntryType), <-chan time.Duration) // UnWrap returns the Fs that this Fs is wrapping UnWrap func() Fs @@ -724,7 +724,13 @@ type ChangeNotifier interface { // ChangeNotify calls the passed function with a path // that has had changes. If the implementation // uses polling, it should adhere to the given interval. - ChangeNotify(func(string, EntryType), time.Duration) chan bool + // At least one value will be written to the channel, + // specifying the initial value and updated values might + // follow. A 0 Duration should pause the polling. + // The ChangeNotify implemantion must empty the channel + // regulary. When the channel gets closed, the implemantion + // should stop polling and release resources. + ChangeNotify(func(string, EntryType), <-chan time.Duration) } // UnWrapper is an optional interfaces for Fs diff --git a/fstest/fstests/fstests.go b/fstest/fstests/fstests.go index 073933234..4fced08f0 100644 --- a/fstest/fstests/fstests.go +++ b/fstest/fstests/fstests.go @@ -770,9 +770,10 @@ func Run(t *testing.T, opt *Opt) { err := operations.Mkdir(remote, "dir") require.NoError(t, err) + pollInterval := make(chan time.Duration) dirChanges := []string{} objChanges := []string{} - quitChannel := doChangeNotify(func(x string, e fs.EntryType) { + doChangeNotify(func(x string, e fs.EntryType) { fs.Debugf(nil, "doChangeNotify(%q, %+v)", x, e) if strings.HasPrefix(x, file1.Path[:5]) || strings.HasPrefix(x, file2.Path[:5]) { fs.Debugf(nil, "Ignoring notify for file1 or file2: %q, %v", x, e) @@ -783,8 +784,9 @@ func Run(t *testing.T, opt *Opt) { } else if e == fs.EntryObject { objChanges = append(objChanges, x) } - }, time.Second) - defer func() { close(quitChannel) }() + }, pollInterval) + defer func() { close(pollInterval) }() + pollInterval <- time.Second var dirs []string for _, idx := range []int{1, 3, 2} { diff --git a/vfs/vfs.go b/vfs/vfs.go index e436fb3ff..05bcf4392 100644 --- a/vfs/vfs.go +++ b/vfs/vfs.go @@ -176,6 +176,7 @@ type VFS struct { usageMu sync.Mutex usageTime time.Time usage *fs.Usage + pollChan chan time.Duration } // Options is options for creating the vfs @@ -223,13 +224,13 @@ func New(f fs.Fs, opt *Options) *VFS { // Create root directory vfs.root = newDir(vfs, f, nil, fsDir) - // Start polling if required - if vfs.Opt.PollInterval > 0 { - if do := vfs.f.Features().ChangeNotify; do != nil { - do(vfs.notifyFunc, vfs.Opt.PollInterval) - } else { - fs.Infof(f, "poll-interval is not supported by this remote") - } + // Start polling function + if do := vfs.f.Features().ChangeNotify; do != nil { + vfs.pollChan = make(chan time.Duration) + do(vfs.notifyFunc, vfs.pollChan) + vfs.pollChan <- vfs.Opt.PollInterval + } else { + fs.Infof(f, "poll-interval is not supported by this remote") } vfs.SetCacheMode(vfs.Opt.CacheMode)