forked from TrueCloudLab/rclone
onedrive: Implement --poll-interval for onedrive
Implement ChangeNotifier for onedrive. Use drive delta queries to listen for modifications.
This commit is contained in:
parent
da404dc0f2
commit
c138367df6
1 changed files with 140 additions and 0 deletions
|
@ -301,6 +301,10 @@ type siteResource struct {
|
|||
type siteResponse struct {
|
||||
Sites []siteResource `json:"value"`
|
||||
}
|
||||
type deltaResponse struct {
|
||||
DeltaLink string `json:"@odata.deltaLink"`
|
||||
Value []api.Item `json:"value"`
|
||||
}
|
||||
|
||||
// Get the region and graphURL from the config
|
||||
func getRegionURL(m configmap.Mapper) (region, graphURL string) {
|
||||
|
@ -2302,6 +2306,142 @@ func (f *Fs) canonicalDriveID(driveID string) (canonicalDriveID string) {
|
|||
return canonicalDriveID
|
||||
}
|
||||
|
||||
// ChangeNotify calls the passed function with a path that has had changes.
|
||||
// If the implementation uses polling, it should adhere to the given interval.
|
||||
//
|
||||
// Automatically restarts itself in case of unexpected behavior of the remote.
|
||||
//
|
||||
// Close the returned channel to stop being notified.
|
||||
//
|
||||
// The Onedrive implementation gives the whole hierarchy up to the top when
|
||||
// an object is changed. For instance, if a/b/c is changed, this function
|
||||
// will call notifyFunc with a, a/b and a/b/c.
|
||||
func (f *Fs) ChangeNotify(ctx context.Context, notifyFunc func(string, fs.EntryType), pollIntervalChan <-chan time.Duration) {
|
||||
go func() {
|
||||
// get the StartPageToken early so all changes from now on get processed
|
||||
nextDeltaToken, err := f.changeNotifyStartPageToken(ctx)
|
||||
if err != nil {
|
||||
fs.Errorf(f, "Could not get first deltaLink: %s", err)
|
||||
return
|
||||
}
|
||||
|
||||
fs.Debugf(f, "Next delta token is: %s", nextDeltaToken)
|
||||
|
||||
var ticker *time.Ticker
|
||||
var tickerC <-chan time.Time
|
||||
for {
|
||||
select {
|
||||
case pollInterval, ok := <-pollIntervalChan:
|
||||
if !ok {
|
||||
if ticker != nil {
|
||||
ticker.Stop()
|
||||
}
|
||||
return
|
||||
}
|
||||
if ticker != nil {
|
||||
ticker.Stop()
|
||||
ticker, tickerC = nil, nil
|
||||
}
|
||||
if pollInterval != 0 {
|
||||
ticker = time.NewTicker(pollInterval)
|
||||
tickerC = ticker.C
|
||||
}
|
||||
case <-tickerC:
|
||||
fs.Debugf(f, "Checking for changes on remote")
|
||||
nextDeltaToken, err = f.changeNotifyRunner(ctx, notifyFunc, nextDeltaToken)
|
||||
if err != nil {
|
||||
fs.Infof(f, "Change notify listener failure: %s", err)
|
||||
}
|
||||
}
|
||||
}
|
||||
}()
|
||||
}
|
||||
|
||||
func (f *Fs) changeNotifyStartPageToken(ctx context.Context) (nextDeltaToken string, err error) {
|
||||
delta, err := f.changeNotifyNextChange(ctx, "latest")
|
||||
parsedURL, err := url.Parse(delta.DeltaLink)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
nextDeltaToken = parsedURL.Query().Get("token")
|
||||
return
|
||||
}
|
||||
|
||||
func (f *Fs) changeNotifyNextChange(ctx context.Context, token string) (delta deltaResponse, err error) {
|
||||
opts := f.buildDriveDeltaOpts(token)
|
||||
|
||||
_, err = f.srv.CallJSON(ctx, &opts, nil, &delta)
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
func (f *Fs) buildDriveDeltaOpts(token string) rest.Opts {
|
||||
rootURL := graphAPIEndpoint[f.opt.Region] + "/v1.0/drives"
|
||||
|
||||
return rest.Opts{
|
||||
Method: "GET",
|
||||
RootURL: rootURL,
|
||||
Path: "/" + f.driveID + "/root/delta",
|
||||
Parameters: map[string][]string{"token": {token}},
|
||||
}
|
||||
}
|
||||
|
||||
func (f *Fs) changeNotifyRunner(ctx context.Context, notifyFunc func(string, fs.EntryType), deltaToken string) (nextDeltaToken string, err error) {
|
||||
delta, err := f.changeNotifyNextChange(ctx, deltaToken)
|
||||
parsedURL, err := url.Parse(delta.DeltaLink)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
nextDeltaToken = parsedURL.Query().Get("token")
|
||||
|
||||
for _, item := range delta.Value {
|
||||
isDriveRootFolder := item.GetParentReference().ID == ""
|
||||
if isDriveRootFolder {
|
||||
continue
|
||||
}
|
||||
|
||||
fullPath, err := getItemFullPath(&item)
|
||||
if err != nil {
|
||||
fs.Errorf(f, "Could not get item full path: %s", err)
|
||||
continue
|
||||
}
|
||||
|
||||
if fullPath == f.root {
|
||||
continue
|
||||
}
|
||||
|
||||
relName, insideRoot := getRelativePathInsideBase(f.root, fullPath)
|
||||
if !insideRoot {
|
||||
continue
|
||||
}
|
||||
|
||||
if item.GetFile() != nil {
|
||||
notifyFunc(relName, fs.EntryObject)
|
||||
} else if item.GetFolder() != nil {
|
||||
notifyFunc(relName, fs.EntryDirectory)
|
||||
}
|
||||
}
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
func getItemFullPath(item *api.Item) (fullPath string, err error) {
|
||||
err = nil
|
||||
fullPath = item.GetName()
|
||||
if parent := item.GetParentReference(); parent != nil && parent.Path != "" {
|
||||
pathParts := strings.SplitN(parent.Path, ":", 2)
|
||||
if len(pathParts) != 2 {
|
||||
err = fmt.Errorf("invalid parent path: %s", parent.Path)
|
||||
return
|
||||
}
|
||||
|
||||
if pathParts[1] != "" {
|
||||
fullPath = strings.TrimPrefix(pathParts[1], "/") + "/" + fullPath
|
||||
}
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
// getRelativePathInsideBase checks if `target` is inside `base`. If so, it
|
||||
// returns a relative path for `target` based on `base` and a boolean `true`.
|
||||
// Otherwise returns "", false.
|
||||
|
|
Loading…
Reference in a new issue