From c138367df68097e0dadc426ef1b51cb7287bc35c Mon Sep 17 00:00:00 2001 From: Hugo Laloge Date: Fri, 29 Apr 2022 15:46:06 +0200 Subject: [PATCH] onedrive: Implement --poll-interval for onedrive Implement ChangeNotifier for onedrive. Use drive delta queries to listen for modifications. --- backend/onedrive/onedrive.go | 140 +++++++++++++++++++++++++++++++++++ 1 file changed, 140 insertions(+) diff --git a/backend/onedrive/onedrive.go b/backend/onedrive/onedrive.go index 630a51197..80bf340c6 100644 --- a/backend/onedrive/onedrive.go +++ b/backend/onedrive/onedrive.go @@ -301,6 +301,10 @@ type siteResource struct { type siteResponse struct { Sites []siteResource `json:"value"` } +type deltaResponse struct { + DeltaLink string `json:"@odata.deltaLink"` + Value []api.Item `json:"value"` +} // Get the region and graphURL from the config func getRegionURL(m configmap.Mapper) (region, graphURL string) { @@ -2302,6 +2306,142 @@ func (f *Fs) canonicalDriveID(driveID string) (canonicalDriveID string) { return canonicalDriveID } +// ChangeNotify calls the passed function with a path that has had changes. +// If the implementation uses polling, it should adhere to the given interval. +// +// Automatically restarts itself in case of unexpected behavior of the remote. +// +// Close the returned channel to stop being notified. +// +// The Onedrive implementation gives the whole hierarchy up to the top when +// an object is changed. For instance, if a/b/c is changed, this function +// will call notifyFunc with a, a/b and a/b/c. +func (f *Fs) ChangeNotify(ctx context.Context, notifyFunc func(string, fs.EntryType), pollIntervalChan <-chan time.Duration) { + go func() { + // get the StartPageToken early so all changes from now on get processed + nextDeltaToken, err := f.changeNotifyStartPageToken(ctx) + if err != nil { + fs.Errorf(f, "Could not get first deltaLink: %s", err) + return + } + + fs.Debugf(f, "Next delta token is: %s", nextDeltaToken) + + var ticker *time.Ticker + var tickerC <-chan time.Time + for { + select { + case pollInterval, ok := <-pollIntervalChan: + if !ok { + if ticker != nil { + ticker.Stop() + } + return + } + if ticker != nil { + ticker.Stop() + ticker, tickerC = nil, nil + } + if pollInterval != 0 { + ticker = time.NewTicker(pollInterval) + tickerC = ticker.C + } + case <-tickerC: + fs.Debugf(f, "Checking for changes on remote") + nextDeltaToken, err = f.changeNotifyRunner(ctx, notifyFunc, nextDeltaToken) + if err != nil { + fs.Infof(f, "Change notify listener failure: %s", err) + } + } + } + }() +} + +func (f *Fs) changeNotifyStartPageToken(ctx context.Context) (nextDeltaToken string, err error) { + delta, err := f.changeNotifyNextChange(ctx, "latest") + parsedURL, err := url.Parse(delta.DeltaLink) + if err != nil { + return + } + nextDeltaToken = parsedURL.Query().Get("token") + return +} + +func (f *Fs) changeNotifyNextChange(ctx context.Context, token string) (delta deltaResponse, err error) { + opts := f.buildDriveDeltaOpts(token) + + _, err = f.srv.CallJSON(ctx, &opts, nil, &delta) + + return +} + +func (f *Fs) buildDriveDeltaOpts(token string) rest.Opts { + rootURL := graphAPIEndpoint[f.opt.Region] + "/v1.0/drives" + + return rest.Opts{ + Method: "GET", + RootURL: rootURL, + Path: "/" + f.driveID + "/root/delta", + Parameters: map[string][]string{"token": {token}}, + } +} + +func (f *Fs) changeNotifyRunner(ctx context.Context, notifyFunc func(string, fs.EntryType), deltaToken string) (nextDeltaToken string, err error) { + delta, err := f.changeNotifyNextChange(ctx, deltaToken) + parsedURL, err := url.Parse(delta.DeltaLink) + if err != nil { + return + } + nextDeltaToken = parsedURL.Query().Get("token") + + for _, item := range delta.Value { + isDriveRootFolder := item.GetParentReference().ID == "" + if isDriveRootFolder { + continue + } + + fullPath, err := getItemFullPath(&item) + if err != nil { + fs.Errorf(f, "Could not get item full path: %s", err) + continue + } + + if fullPath == f.root { + continue + } + + relName, insideRoot := getRelativePathInsideBase(f.root, fullPath) + if !insideRoot { + continue + } + + if item.GetFile() != nil { + notifyFunc(relName, fs.EntryObject) + } else if item.GetFolder() != nil { + notifyFunc(relName, fs.EntryDirectory) + } + } + + return +} + +func getItemFullPath(item *api.Item) (fullPath string, err error) { + err = nil + fullPath = item.GetName() + if parent := item.GetParentReference(); parent != nil && parent.Path != "" { + pathParts := strings.SplitN(parent.Path, ":", 2) + if len(pathParts) != 2 { + err = fmt.Errorf("invalid parent path: %s", parent.Path) + return + } + + if pathParts[1] != "" { + fullPath = strings.TrimPrefix(pathParts[1], "/") + "/" + fullPath + } + } + return +} + // getRelativePathInsideBase checks if `target` is inside `base`. If so, it // returns a relative path for `target` based on `base` and a boolean `true`. // Otherwise returns "", false.