From 70f07fd3ac2bc6e98dbf19b43da28d168e261aab Mon Sep 17 00:00:00 2001 From: Remus Bunduc Date: Thu, 8 Mar 2018 22:03:34 +0200 Subject: [PATCH] fs: add ChangeNotify and backend support for it (#2094) * fs: rename DirChangeNotify to ChangeNotify * cache: switch to ChangeNotify * ChangeNotify: keep order of notifications --- backend/amazonclouddrive/amazonclouddrive.go | 68 ++++-- .../amazonclouddrive/amazonclouddrive_test.go | 2 +- backend/azureblob/azureblob_test.go | 2 +- backend/b2/b2_test.go | 2 +- backend/box/box_test.go | 2 +- backend/cache/cache.go | 205 +++++++++--------- backend/cache/cache_internal_test.go | 134 +++++++++++- backend/cache/cache_test.go | 2 +- backend/cache/handle.go | 2 +- backend/cache/object.go | 22 +- backend/crypt/crypt.go | 14 +- backend/crypt/crypt2_test.go | 2 +- backend/crypt/crypt3_test.go | 2 +- backend/crypt/crypt_test.go | 2 +- backend/drive/drive.go | 90 ++++---- backend/drive/drive_test.go | 2 +- backend/dropbox/dropbox_test.go | 2 +- backend/ftp/ftp_test.go | 2 +- .../googlecloudstorage_test.go | 2 +- backend/hubic/hubic_test.go | 2 +- backend/local/local_test.go | 2 +- backend/onedrive/onedrive_test.go | 2 +- backend/pcloud/pcloud_test.go | 2 +- backend/qingstor/qingstor_test.go | 2 +- backend/s3/s3_test.go | 2 +- backend/sftp/sftp_test.go | 2 +- backend/swift/swift_test.go | 2 +- backend/webdav/webdav_test.go | 2 +- backend/yandex/yandex_test.go | 2 +- cmd/mountlib/mounttest/dir.go | 5 +- fs/fs.go | 31 ++- fstest/fstests/fstests.go | 45 ++-- vfs/dir.go | 10 +- vfs/dir_test.go | 5 +- vfs/vfs.go | 2 +- 35 files changed, 447 insertions(+), 230 deletions(-) diff --git a/backend/amazonclouddrive/amazonclouddrive.go b/backend/amazonclouddrive/amazonclouddrive.go index e7d6486dc..3cd6a16de 100644 --- a/backend/amazonclouddrive/amazonclouddrive.go +++ b/backend/amazonclouddrive/amazonclouddrive.go @@ -19,7 +19,6 @@ import ( "net/http" "path" "regexp" - "sort" "strings" "time" @@ -1207,20 +1206,19 @@ func (o *Object) MimeType() string { return "" } -// DirChangeNotify polls for changes from the remote and hands the path to the -// given function. Only changes that can be resolved to a path through the -// DirCache will handled. +// ChangeNotify calls the passed function with a path that has had changes. +// If the implementation uses polling, it should adhere to the given interval. // // Automatically restarts itself in case of unexpected behaviour of the remote. // // Close the returned channel to stop being notified. -func (f *Fs) DirChangeNotify(notifyFunc func(string), pollInterval time.Duration) chan bool { +func (f *Fs) ChangeNotify(notifyFunc func(string, fs.EntryType), pollInterval time.Duration) chan bool { checkpoint := config.FileGet(f.name, "checkpoint") quit := make(chan bool) go func() { for { - checkpoint = f.dirchangeNotifyRunner(notifyFunc, checkpoint) + checkpoint = f.changeNotifyRunner(notifyFunc, checkpoint) if err := config.SetValueAndSave(f.name, "checkpoint", checkpoint); err != nil { fs.Debugf(f, "Unable to save checkpoint: %v", err) } @@ -1234,7 +1232,7 @@ func (f *Fs) DirChangeNotify(notifyFunc func(string), pollInterval time.Duration return quit } -func (f *Fs) dirchangeNotifyRunner(notifyFunc func(string), checkpoint string) string { +func (f *Fs) changeNotifyRunner(notifyFunc func(string, fs.EntryType), checkpoint string) string { var err error var resp *http.Response var reachedEnd bool @@ -1251,7 +1249,11 @@ func (f *Fs) dirchangeNotifyRunner(notifyFunc func(string), checkpoint string) s return err } - pathsToClear := make([]string, 0) + type entryType struct { + path string + entryType fs.EntryType + } + var pathsToClear []entryType csCount++ nodeCount += len(changeSet.Nodes) if changeSet.End { @@ -1262,20 +1264,40 @@ func (f *Fs) dirchangeNotifyRunner(notifyFunc func(string), checkpoint string) s } for _, node := range changeSet.Nodes { if path, ok := f.dirCache.GetInv(*node.Id); ok { - pathsToClear = append(pathsToClear, path) + if node.IsFile() { + pathsToClear = append(pathsToClear, entryType{path: path, entryType: fs.EntryObject}) + } else { + pathsToClear = append(pathsToClear, entryType{path: path, entryType: fs.EntryDirectory}) + } + continue + } + + if node.IsFile() { + // translate the parent dir of this object + if len(node.Parents) > 0 { + if path, ok := f.dirCache.GetInv(node.Parents[0]); ok { + // and append the drive file name to compute the full file name + if len(path) > 0 { + path = path + "/" + *node.Name + } else { + path = *node.Name + } + // this will now clear the actual file too + pathsToClear = append(pathsToClear, entryType{path: path, entryType: fs.EntryObject}) + } + } else { // a true root object that is changed + pathsToClear = append(pathsToClear, entryType{path: *node.Name, entryType: fs.EntryObject}) + } } } - notified := false - lastNotifiedPath := "" - sort.Strings(pathsToClear) - for _, path := range pathsToClear { - if notified && strings.HasPrefix(path+"/", lastNotifiedPath+"/") { + visitedPaths := make(map[string]bool) + for _, entry := range pathsToClear { + if _, ok := visitedPaths[entry.path]; ok { continue } - lastNotifiedPath = path - notified = true - notifyFunc(path) + visitedPaths[entry.path] = true + notifyFunc(entry.path, entry.entryType) } return nil @@ -1303,10 +1325,10 @@ var ( _ fs.Fs = (*Fs)(nil) _ fs.Purger = (*Fs)(nil) // _ fs.Copier = (*Fs)(nil) - _ fs.Mover = (*Fs)(nil) - _ fs.DirMover = (*Fs)(nil) - _ fs.DirCacheFlusher = (*Fs)(nil) - _ fs.DirChangeNotifier = (*Fs)(nil) - _ fs.Object = (*Object)(nil) - _ fs.MimeTyper = &Object{} + _ fs.Mover = (*Fs)(nil) + _ fs.DirMover = (*Fs)(nil) + _ fs.DirCacheFlusher = (*Fs)(nil) + _ fs.ChangeNotifier = (*Fs)(nil) + _ fs.Object = (*Object)(nil) + _ fs.MimeTyper = &Object{} ) diff --git a/backend/amazonclouddrive/amazonclouddrive_test.go b/backend/amazonclouddrive/amazonclouddrive_test.go index 73dffbb3e..cc15c4717 100644 --- a/backend/amazonclouddrive/amazonclouddrive_test.go +++ b/backend/amazonclouddrive/amazonclouddrive_test.go @@ -51,7 +51,7 @@ func TestFsMove(t *testing.T) { fstests.TestFsMove(t) } func TestFsDirMove(t *testing.T) { fstests.TestFsDirMove(t) } func TestFsRmdirFull(t *testing.T) { fstests.TestFsRmdirFull(t) } func TestFsPrecision(t *testing.T) { fstests.TestFsPrecision(t) } -func TestFsDirChangeNotify(t *testing.T) { fstests.TestFsDirChangeNotify(t) } +func TestFsChangeNotify(t *testing.T) { fstests.TestFsChangeNotify(t) } func TestObjectString(t *testing.T) { fstests.TestObjectString(t) } func TestObjectFs(t *testing.T) { fstests.TestObjectFs(t) } func TestObjectRemote(t *testing.T) { fstests.TestObjectRemote(t) } diff --git a/backend/azureblob/azureblob_test.go b/backend/azureblob/azureblob_test.go index 8fe04ff7a..4ad3b2fe3 100644 --- a/backend/azureblob/azureblob_test.go +++ b/backend/azureblob/azureblob_test.go @@ -54,7 +54,7 @@ func TestFsMove(t *testing.T) { fstests.TestFsMove(t) } func TestFsDirMove(t *testing.T) { fstests.TestFsDirMove(t) } func TestFsRmdirFull(t *testing.T) { fstests.TestFsRmdirFull(t) } func TestFsPrecision(t *testing.T) { fstests.TestFsPrecision(t) } -func TestFsDirChangeNotify(t *testing.T) { fstests.TestFsDirChangeNotify(t) } +func TestFsChangeNotify(t *testing.T) { fstests.TestFsChangeNotify(t) } func TestObjectString(t *testing.T) { fstests.TestObjectString(t) } func TestObjectFs(t *testing.T) { fstests.TestObjectFs(t) } func TestObjectRemote(t *testing.T) { fstests.TestObjectRemote(t) } diff --git a/backend/b2/b2_test.go b/backend/b2/b2_test.go index 8be738e9a..6a81f3a16 100644 --- a/backend/b2/b2_test.go +++ b/backend/b2/b2_test.go @@ -51,7 +51,7 @@ func TestFsMove(t *testing.T) { fstests.TestFsMove(t) } func TestFsDirMove(t *testing.T) { fstests.TestFsDirMove(t) } func TestFsRmdirFull(t *testing.T) { fstests.TestFsRmdirFull(t) } func TestFsPrecision(t *testing.T) { fstests.TestFsPrecision(t) } -func TestFsDirChangeNotify(t *testing.T) { fstests.TestFsDirChangeNotify(t) } +func TestFsChangeNotify(t *testing.T) { fstests.TestFsChangeNotify(t) } func TestObjectString(t *testing.T) { fstests.TestObjectString(t) } func TestObjectFs(t *testing.T) { fstests.TestObjectFs(t) } func TestObjectRemote(t *testing.T) { fstests.TestObjectRemote(t) } diff --git a/backend/box/box_test.go b/backend/box/box_test.go index c83f83563..cfe47f3b4 100644 --- a/backend/box/box_test.go +++ b/backend/box/box_test.go @@ -51,7 +51,7 @@ func TestFsMove(t *testing.T) { fstests.TestFsMove(t) } func TestFsDirMove(t *testing.T) { fstests.TestFsDirMove(t) } func TestFsRmdirFull(t *testing.T) { fstests.TestFsRmdirFull(t) } func TestFsPrecision(t *testing.T) { fstests.TestFsPrecision(t) } -func TestFsDirChangeNotify(t *testing.T) { fstests.TestFsDirChangeNotify(t) } +func TestFsChangeNotify(t *testing.T) { fstests.TestFsChangeNotify(t) } func TestObjectString(t *testing.T) { fstests.TestObjectString(t) } func TestObjectFs(t *testing.T) { fstests.TestObjectFs(t) } func TestObjectRemote(t *testing.T) { fstests.TestObjectRemote(t) } diff --git a/backend/cache/cache.go b/backend/cache/cache.go index 7fd132b4d..d25e83892 100644 --- a/backend/cache/cache.go +++ b/backend/cache/cache.go @@ -174,7 +174,10 @@ type Fs struct { plexConnector *plexConnector backgroundRunner *backgroundWriter cleanupChan chan bool - parentsForgetFn []func(string) + parentsForgetFn []func(string, fs.EntryType) + notifiedRemotes map[string]bool + notifiedMu sync.Mutex + parentsForgetMu sync.Mutex } // parseRootPath returns a cleaned root path and a nil error or "" and an error when the path is invalid @@ -263,6 +266,7 @@ func NewFs(name, rootPath string) (fs.Fs, error) { tempWritePath: *cacheTempWritePath, tempWriteWait: waitTime, cleanupChan: make(chan bool, 1), + notifiedRemotes: make(map[string]bool), } if f.chunkTotalSize < (f.chunkSize * int64(f.totalWorkers)) { return nil, errors.Errorf("don't set cache-total-chunk-size(%v) less than cache-chunk-size(%v) * cache-workers(%v)", @@ -381,8 +385,8 @@ func NewFs(name, rootPath string) (fs.Fs, error) { } }() - if doDirChangeNotify := wrappedFs.Features().DirChangeNotify; doDirChangeNotify != nil { - doDirChangeNotify(f.receiveDirChangeNotify, f.chunkCleanInterval) + if doChangeNotify := wrappedFs.Features().ChangeNotify; doChangeNotify != nil { + doChangeNotify(f.receiveChangeNotify, f.chunkCleanInterval) } f.features = (&fs.Features{ @@ -390,7 +394,7 @@ func NewFs(name, rootPath string) (fs.Fs, error) { DuplicateFiles: false, // storage doesn't permit this }).Fill(f).Mask(wrappedFs).WrapsFs(f, wrappedFs) // override only those features that use a temp fs and it doesn't support them - f.features.DirChangeNotify = f.DirChangeNotify + //f.features.ChangeNotify = f.ChangeNotify if f.tempWritePath != "" { if f.tempFs.Features().Copy == nil { f.features.Copy = nil @@ -414,85 +418,73 @@ func NewFs(name, rootPath string) (fs.Fs, error) { return f, fsErr } -func (f *Fs) receiveDirChangeNotify(forgetPath string) { +// receiveChangeNotify is a wrapper to notifications sent from the wrapped FS about changed files +func (f *Fs) receiveChangeNotify(forgetPath string, entryType fs.EntryType) { fs.Debugf(f, "notify: expiring cache for '%v'", forgetPath) // notify upstreams too (vfs) - f.notifyDirChange(forgetPath) + f.notifyChangeUpstream(forgetPath, entryType) var cd *Directory - co := NewObject(f, forgetPath) - err := f.cache.GetObject(co) - if err == nil { + if entryType == fs.EntryObject { + co := NewObject(f, forgetPath) + err := f.cache.GetObject(co) + if err != nil { + fs.Debugf(f, "ignoring change notification for non cached entry %v", co) + return + } + // expire the entry + co.CacheTs = time.Now().Add(f.fileAge * -1) + err = f.cache.AddObject(co) + if err != nil { + fs.Errorf(forgetPath, "notify: error expiring '%v': %v", co, err) + } else { + fs.Debugf(forgetPath, "notify: expired %v", co) + } cd = NewDirectory(f, cleanPath(path.Dir(co.Remote()))) } else { cd = NewDirectory(f, forgetPath) - } - - // we list all the cached objects and expire all of them - entries, err := f.cache.GetDirEntries(cd) - if err != nil { - fs.Debugf(forgetPath, "notify: ignoring notification on non cached dir") - return - } - for i := 0; i < len(entries); i++ { - if co, ok := entries[i].(*Object); ok { - co.CacheTs = time.Now().Add(f.fileAge * -1) - err = f.cache.AddObject(co) - if err != nil { - fs.Errorf(forgetPath, "notify: error expiring '%v': %v", co, err) - } else { - fs.Debugf(forgetPath, "notify: expired %v", co) - } + // we expire the dir + err := f.cache.ExpireDir(cd) + if err != nil { + fs.Errorf(forgetPath, "notify: error expiring '%v': %v", cd, err) + } else { + fs.Debugf(forgetPath, "notify: expired '%v'", cd) } } - // finally, we expire the dir as well - err = f.cache.ExpireDir(cd) - if err != nil { - fs.Errorf(forgetPath, "notify: error expiring '%v': %v", cd, err) - } else { - fs.Debugf(forgetPath, "notify: expired '%v'", cd) - } + + f.notifiedMu.Lock() + defer f.notifiedMu.Unlock() + f.notifiedRemotes[forgetPath] = true + f.notifiedRemotes[cd.Remote()] = true } -// notifyDirChange takes a remote (can be dir or entry) and -// tries to determine which is it and notify upstreams of the dir change -func (f *Fs) notifyDirChange(remote string) { - var cd *Directory - co := NewObject(f, remote) - err := f.cache.GetObject(co) - if err == nil { - pd := cleanPath(path.Dir(remote)) - cd = NewDirectory(f, pd) - } else { - cd = NewDirectory(f, remote) - } - - f.notifyDirChangeUpstream(cd.Remote()) -} - -// notifyDirChangeUpstreamIfNeeded will check if the wrapped remote doesn't notify on dir changes +// notifyChangeUpstreamIfNeeded will check if the wrapped remote doesn't notify on changes // or if we use a temp fs -func (f *Fs) notifyDirChangeUpstreamIfNeeded(remote string) { - if f.Fs.Features().DirChangeNotify == nil || f.tempWritePath != "" { - f.notifyDirChangeUpstream(remote) +func (f *Fs) notifyChangeUpstreamIfNeeded(remote string, entryType fs.EntryType) { + if f.Fs.Features().ChangeNotify == nil || f.tempWritePath != "" { + f.notifyChangeUpstream(remote, entryType) } } -// notifyDirChangeUpstream will loop through all the upstreams and notify +// notifyChangeUpstream will loop through all the upstreams and notify // of the provided remote (should be only a dir) -func (f *Fs) notifyDirChangeUpstream(remote string) { +func (f *Fs) notifyChangeUpstream(remote string, entryType fs.EntryType) { + f.parentsForgetMu.Lock() + defer f.parentsForgetMu.Unlock() if len(f.parentsForgetFn) > 0 { for _, fn := range f.parentsForgetFn { - fn(remote) + fn(remote, entryType) } } } -// DirChangeNotify can subsribe multiple callers -// this is coupled with the wrapped fs DirChangeNotify (if it supports it) +// ChangeNotify can subsribe multiple callers +// this is coupled with the wrapped fs ChangeNotify (if it supports it) // and also notifies other caches (i.e VFS) to clear out whenever something changes -func (f *Fs) DirChangeNotify(notifyFunc func(string), pollInterval time.Duration) chan bool { - fs.Debugf(f, "subscribing to DirChangeNotify") +func (f *Fs) ChangeNotify(notifyFunc func(string, fs.EntryType), pollInterval time.Duration) chan bool { + f.parentsForgetMu.Lock() + defer f.parentsForgetMu.Unlock() + fs.Debugf(f, "subscribing to ChangeNotify") f.parentsForgetFn = append(f.parentsForgetFn, notifyFunc) return make(chan bool) } @@ -649,12 +641,13 @@ func (f *Fs) List(dir string) (entries fs.DirEntries, err error) { fs.Debugf(dir, "list: cached object: %v", co) case fs.Directory: cdd := DirectoryFromOriginal(f, o) - err := f.cache.AddDir(cdd) - if err != nil { - fs.Errorf(dir, "list: error caching dir from listing %v", o) - } else { - fs.Debugf(dir, "list: cached dir: %v", cdd) - } + // FIXME this overrides a possible expired dir + //err := f.cache.AddDir(cdd) + //if err != nil { + // fs.Errorf(dir, "list: error caching dir from listing %v", o) + //} else { + // fs.Debugf(dir, "list: cached dir: %v", cdd) + //} cachedEntries = append(cachedEntries, cdd) default: fs.Debugf(entry, "list: Unknown object type %T", entry) @@ -759,8 +752,8 @@ func (f *Fs) Mkdir(dir string) error { } else { fs.Infof(parentCd, "mkdir: cache expired") } - // advertise to DirChangeNotify if wrapped doesn't do that - f.notifyDirChangeUpstreamIfNeeded(parentCd.Remote()) + // advertise to ChangeNotify if wrapped doesn't do that + f.notifyChangeUpstreamIfNeeded(parentCd.Remote(), fs.EntryDirectory) return nil } @@ -801,7 +794,7 @@ func (f *Fs) Rmdir(dir string) error { fs.Debugf(dir, "rmdir: read %v from temp fs", len(queuedEntries)) fs.Debugf(dir, "rmdir: temp fs entries: %v", queuedEntries) if len(queuedEntries) > 0 { - fs.Errorf(dir, "rmdir: temporary dir not empty") + fs.Errorf(dir, "rmdir: temporary dir not empty: %v", queuedEntries) return fs.ErrorDirectoryNotEmpty } } @@ -829,8 +822,8 @@ func (f *Fs) Rmdir(dir string) error { } else { fs.Infof(parentCd, "rmdir: cache expired") } - // advertise to DirChangeNotify if wrapped doesn't do that - f.notifyDirChangeUpstreamIfNeeded(parentCd.Remote()) + // advertise to ChangeNotify if wrapped doesn't do that + f.notifyChangeUpstreamIfNeeded(parentCd.Remote(), fs.EntryDirectory) return nil } @@ -939,8 +932,8 @@ cleanup: } else { fs.Debugf(srcParent, "dirmove: cache expired") } - // advertise to DirChangeNotify if wrapped doesn't do that - f.notifyDirChangeUpstreamIfNeeded(srcParent.Remote()) + // advertise to ChangeNotify if wrapped doesn't do that + f.notifyChangeUpstreamIfNeeded(srcParent.Remote(), fs.EntryDirectory) // expire parent dir at the destination path dstParent := NewDirectory(f, cleanPath(path.Dir(dstRemote))) @@ -950,8 +943,8 @@ cleanup: } else { fs.Debugf(dstParent, "dirmove: cache expired") } - // advertise to DirChangeNotify if wrapped doesn't do that - f.notifyDirChangeUpstreamIfNeeded(dstParent.Remote()) + // advertise to ChangeNotify if wrapped doesn't do that + f.notifyChangeUpstreamIfNeeded(dstParent.Remote(), fs.EntryDirectory) // TODO: precache dst dir and save the chunks return nil @@ -1030,6 +1023,11 @@ func (f *Fs) put(in io.Reader, src fs.ObjectInfo, options []fs.OpenOption, put p // queue for upload and store in temp fs if configured if f.tempWritePath != "" { + // we need to clear the caches before a put through temp fs + parentCd := NewDirectory(f, cleanPath(path.Dir(src.Remote()))) + _ = f.cache.ExpireDir(parentCd) + f.notifyChangeUpstreamIfNeeded(parentCd.Remote(), fs.EntryDirectory) + obj, err = f.tempFs.Put(in, src, options...) if err != nil { fs.Errorf(obj, "put: failed to upload in temp fs: %v", err) @@ -1074,8 +1072,8 @@ func (f *Fs) put(in io.Reader, src fs.ObjectInfo, options []fs.OpenOption, put p } else { fs.Infof(parentCd, "put: cache expired") } - // advertise to DirChangeNotify - f.notifyDirChangeUpstreamIfNeeded(parentCd.Remote()) + // advertise to ChangeNotify + f.notifyChangeUpstreamIfNeeded(parentCd.Remote(), fs.EntryDirectory) return cachedObj, nil } @@ -1164,8 +1162,8 @@ func (f *Fs) Copy(src fs.Object, remote string) (fs.Object, error) { } else { fs.Infof(parentCd, "copy: cache expired") } - // advertise to DirChangeNotify if wrapped doesn't do that - f.notifyDirChangeUpstreamIfNeeded(parentCd.Remote()) + // advertise to ChangeNotify if wrapped doesn't do that + f.notifyChangeUpstreamIfNeeded(parentCd.Remote(), fs.EntryDirectory) // expire src parent srcParent := NewDirectory(f, cleanPath(path.Dir(src.Remote()))) err = f.cache.ExpireDir(srcParent) @@ -1174,8 +1172,8 @@ func (f *Fs) Copy(src fs.Object, remote string) (fs.Object, error) { } else { fs.Infof(srcParent, "copy: cache expired") } - // advertise to DirChangeNotify if wrapped doesn't do that - f.notifyDirChangeUpstreamIfNeeded(srcParent.Remote()) + // advertise to ChangeNotify if wrapped doesn't do that + f.notifyChangeUpstreamIfNeeded(srcParent.Remote(), fs.EntryDirectory) return co, nil } @@ -1260,8 +1258,8 @@ func (f *Fs) Move(src fs.Object, remote string) (fs.Object, error) { } else { fs.Infof(parentCd, "move: cache expired") } - // advertise to DirChangeNotify if wrapped doesn't do that - f.notifyDirChangeUpstreamIfNeeded(parentCd.Remote()) + // advertise to ChangeNotify if wrapped doesn't do that + f.notifyChangeUpstreamIfNeeded(parentCd.Remote(), fs.EntryDirectory) // persist new cachedObj := ObjectFromOriginal(f, obj).persist() fs.Debugf(cachedObj, "move: added to cache") @@ -1273,8 +1271,8 @@ func (f *Fs) Move(src fs.Object, remote string) (fs.Object, error) { } else { fs.Infof(parentCd, "move: cache expired") } - // advertise to DirChangeNotify if wrapped doesn't do that - f.notifyDirChangeUpstreamIfNeeded(parentCd.Remote()) + // advertise to ChangeNotify if wrapped doesn't do that + f.notifyChangeUpstreamIfNeeded(parentCd.Remote(), fs.EntryDirectory) return cachedObj, nil } @@ -1416,6 +1414,19 @@ func (f *Fs) GetBackgroundUploadChannel() chan BackgroundUploadState { return nil } +func (f *Fs) isNotifiedRemote(remote string) bool { + f.notifiedMu.Lock() + defer f.notifiedMu.Unlock() + + n, ok := f.notifiedRemotes[remote] + if !ok || !n { + return false + } + + delete(f.notifiedRemotes, remote) + return n +} + func cleanPath(p string) string { p = path.Clean(p) if p == "." || p == "/" { @@ -1427,16 +1438,16 @@ func cleanPath(p string) string { // Check the interfaces are satisfied var ( - _ fs.Fs = (*Fs)(nil) - _ fs.Purger = (*Fs)(nil) - _ fs.Copier = (*Fs)(nil) - _ fs.Mover = (*Fs)(nil) - _ fs.DirMover = (*Fs)(nil) - _ fs.PutUncheckeder = (*Fs)(nil) - _ fs.PutStreamer = (*Fs)(nil) - _ fs.CleanUpper = (*Fs)(nil) - _ fs.UnWrapper = (*Fs)(nil) - _ fs.Wrapper = (*Fs)(nil) - _ fs.ListRer = (*Fs)(nil) - _ fs.DirChangeNotifier = (*Fs)(nil) + _ fs.Fs = (*Fs)(nil) + _ fs.Purger = (*Fs)(nil) + _ fs.Copier = (*Fs)(nil) + _ fs.Mover = (*Fs)(nil) + _ fs.DirMover = (*Fs)(nil) + _ fs.PutUncheckeder = (*Fs)(nil) + _ fs.PutStreamer = (*Fs)(nil) + _ fs.CleanUpper = (*Fs)(nil) + _ fs.UnWrapper = (*Fs)(nil) + _ fs.Wrapper = (*Fs)(nil) + _ fs.ListRer = (*Fs)(nil) + _ fs.ChangeNotifier = (*Fs)(nil) ) diff --git a/backend/cache/cache_internal_test.go b/backend/cache/cache_internal_test.go index 06e63c031..c48ef301a 100644 --- a/backend/cache/cache_internal_test.go +++ b/backend/cache/cache_internal_test.go @@ -44,6 +44,7 @@ const ( cryptPassword2 = "NlgTBEIe-qibA7v-FoMfuX6Cw8KlLai_aMvV" // mv4mZW572HM cryptedTextBase64 = "UkNMT05FAAC320i2xIee0BiNyknSPBn+Qcw3q9FhIFp3tvq6qlqvbsno3PnxmEFeJG3jDBnR/wku2gHWeQ==" // one content cryptedText2Base64 = "UkNMT05FAAATcQkVsgjBh8KafCKcr0wdTa1fMmV0U8hsCLGFoqcvxKVmvv7wx3Hf5EXxFcki2FFV4sdpmSrb9Q==" // updated content + cryptedText3Base64 = "UkNMT05FAAB/f7YtYKbPfmk9+OX/ffN3qG3OEdWT+z74kxCX9V/YZwJ4X2DN3HOnUC3gKQ4Gcoud5UtNvQ==" // test content ) var ( @@ -444,32 +445,134 @@ func TestInternalWrappedFsChangeNotSeen(t *testing.T) { runInstance.writeRemoteBytes(t, rootFs, "data.bin", testData) // update in the wrapped fs + originalSize, err := runInstance.size(t, rootFs, "data.bin") + require.NoError(t, err) + log.Printf("original size: %v", originalSize) + o, err := cfs.UnWrap().NewObject(runInstance.encryptRemoteIfNeeded(t, "data.bin")) require.NoError(t, err) - wrappedTime := time.Now().Add(time.Hour * -1) - err = o.SetModTime(wrappedTime) + expectedSize := int64(len([]byte("test content"))) + var data2 []byte + if runInstance.rootIsCrypt { + data2, err = base64.StdEncoding.DecodeString(cryptedText3Base64) + require.NoError(t, err) + expectedSize = expectedSize + 1 // FIXME newline gets in, likely test data issue + } else { + data2 = []byte("test content") + } + objInfo := object.NewStaticObjectInfo(runInstance.encryptRemoteIfNeeded(t, "data.bin"), time.Now(), int64(len(data2)), true, nil, cfs.UnWrap()) + err = o.Update(bytes.NewReader(data2), objInfo) require.NoError(t, err) + require.Equal(t, int64(len(data2)), o.Size()) + log.Printf("updated size: %v", len(data2)) // get a new instance from the cache if runInstance.wrappedIsExternal { err = runInstance.retryBlock(func() error { - coModTime, err := runInstance.modTime(t, rootFs, "data.bin") + coSize, err := runInstance.size(t, rootFs, "data.bin") if err != nil { return err } - if coModTime.Unix() != o.ModTime().Unix() { - return errors.Errorf("%v <> %v", coModTime, o.ModTime()) + if coSize != expectedSize { + return errors.Errorf("%v <> %v", coSize, expectedSize) } return nil }, 12, time.Second*10) require.NoError(t, err) } else { - coModTime, err := runInstance.modTime(t, rootFs, "data.bin") + coSize, err := runInstance.size(t, rootFs, "data.bin") require.NoError(t, err) - require.NotEqual(t, coModTime.Unix(), o.ModTime().Unix()) + require.NotEqual(t, coSize, expectedSize) } } +func TestInternalMoveWithNotify(t *testing.T) { + id := fmt.Sprintf("timwn%v", time.Now().Unix()) + rootFs, boltDb := runInstance.newCacheFs(t, remoteName, id, false, true, nil, nil) + defer runInstance.cleanupFs(t, rootFs, boltDb) + if !runInstance.wrappedIsExternal { + t.Skipf("Not external") + } + + cfs, err := runInstance.getCacheFs(rootFs) + require.NoError(t, err) + + srcName := runInstance.encryptRemoteIfNeeded(t, "test") + "/" + runInstance.encryptRemoteIfNeeded(t, "one") + "/" + runInstance.encryptRemoteIfNeeded(t, "data.bin") + dstName := runInstance.encryptRemoteIfNeeded(t, "test") + "/" + runInstance.encryptRemoteIfNeeded(t, "second") + "/" + runInstance.encryptRemoteIfNeeded(t, "data.bin") + // create some rand test data + var testData []byte + if runInstance.rootIsCrypt { + testData, err = base64.StdEncoding.DecodeString(cryptedTextBase64) + require.NoError(t, err) + } else { + testData = []byte("test content") + } + _ = cfs.UnWrap().Mkdir(runInstance.encryptRemoteIfNeeded(t, "test")) + _ = cfs.UnWrap().Mkdir(runInstance.encryptRemoteIfNeeded(t, "test/one")) + _ = cfs.UnWrap().Mkdir(runInstance.encryptRemoteIfNeeded(t, "test/second")) + srcObj := runInstance.writeObjectBytes(t, cfs.UnWrap(), srcName, testData) + + // list in mount + _, err = runInstance.list(t, rootFs, "test") + require.NoError(t, err) + _, err = runInstance.list(t, rootFs, "test/one") + require.NoError(t, err) + + // move file + _, err = cfs.UnWrap().Features().Move(srcObj, dstName) + require.NoError(t, err) + + err = runInstance.retryBlock(func() error { + li, err := runInstance.list(t, rootFs, "test") + if err != nil { + log.Printf("err: %v", err) + return err + } + if len(li) != 2 { + log.Printf("not expected listing /test: %v", li) + return errors.Errorf("not expected listing /test: %v", li) + } + + li, err = runInstance.list(t, rootFs, "test/one") + if err != nil { + log.Printf("err: %v", err) + return err + } + if len(li) != 0 { + log.Printf("not expected listing /test/one: %v", li) + return errors.Errorf("not expected listing /test/one: %v", li) + } + + li, err = runInstance.list(t, rootFs, "test/second") + if err != nil { + log.Printf("err: %v", err) + return err + } + if len(li) != 1 { + log.Printf("not expected listing /test/second: %v", li) + return errors.Errorf("not expected listing /test/second: %v", li) + } + if fi, ok := li[0].(os.FileInfo); ok { + if fi.Name() != "data.bin" { + log.Printf("not expected name: %v", fi.Name()) + return errors.Errorf("not expected name: %v", fi.Name()) + } + } else if di, ok := li[0].(fs.DirEntry); ok { + if di.Remote() != "test/second/data.bin" { + log.Printf("not expected remote: %v", di.Remote()) + return errors.Errorf("not expected remote: %v", di.Remote()) + } + } else { + log.Printf("unexpected listing: %v", li) + return errors.Errorf("unexpected listing: %v", li) + } + + log.Printf("complete listing: %v", li) + return nil + }, 12, time.Second*10) + require.NoError(t, err) +} + func TestInternalChangeSeenAfterDirCacheFlush(t *testing.T) { id := fmt.Sprintf("ticsadcf%v", time.Now().Unix()) rootFs, boltDb := runInstance.newCacheFs(t, remoteName, id, false, true, nil, nil) @@ -1661,6 +1764,23 @@ func (r *run) modTime(t *testing.T, rootFs fs.Fs, src string) (time.Time, error) return obj1.ModTime(), nil } +func (r *run) size(t *testing.T, rootFs fs.Fs, src string) (int64, error) { + var err error + + if r.useMount { + fi, err := os.Stat(path.Join(runInstance.mntDir, src)) + if err != nil { + return int64(0), err + } + return fi.Size(), nil + } + obj1, err := rootFs.NewObject(src) + if err != nil { + return int64(0), err + } + return obj1.Size(), nil +} + func (r *run) updateData(t *testing.T, rootFs fs.Fs, src, data, append string) error { var err error diff --git a/backend/cache/cache_test.go b/backend/cache/cache_test.go index 26845cded..30b5e6df2 100644 --- a/backend/cache/cache_test.go +++ b/backend/cache/cache_test.go @@ -55,7 +55,7 @@ func TestFsMove(t *testing.T) { fstests.TestFsMove(t) } func TestFsDirMove(t *testing.T) { fstests.TestFsDirMove(t) } func TestFsRmdirFull(t *testing.T) { fstests.TestFsRmdirFull(t) } func TestFsPrecision(t *testing.T) { fstests.TestFsPrecision(t) } -func TestFsDirChangeNotify(t *testing.T) { fstests.TestFsDirChangeNotify(t) } +func TestFsChangeNotify(t *testing.T) { fstests.TestFsChangeNotify(t) } func TestObjectString(t *testing.T) { fstests.TestObjectString(t) } func TestObjectFs(t *testing.T) { fstests.TestObjectFs(t) } func TestObjectRemote(t *testing.T) { fstests.TestObjectRemote(t) } diff --git a/backend/cache/handle.go b/backend/cache/handle.go index ef25ab313..05e629532 100644 --- a/backend/cache/handle.go +++ b/backend/cache/handle.go @@ -658,7 +658,7 @@ func (b *backgroundWriter) run() { if err != nil { fs.Errorf(parentCd, "background upload: cache expire error: %v", err) } - b.fs.notifyDirChange(remote) + b.fs.notifyChangeUpstream(remote, fs.EntryObject) fs.Infof(remote, "finished background upload") b.notify(remote, BackgroundUploadCompleted, nil) } diff --git a/backend/cache/object.go b/backend/cache/object.go index 6047bbd6c..dd5ee03a6 100644 --- a/backend/cache/object.go +++ b/backend/cache/object.go @@ -132,19 +132,36 @@ func (o *Object) abs() string { // ModTime returns the cached ModTime func (o *Object) ModTime() time.Time { + _ = o.refresh() return time.Unix(0, o.CacheModTime) } // Size returns the cached Size func (o *Object) Size() int64 { + _ = o.refresh() return o.CacheSize } // Storable returns the cached Storable func (o *Object) Storable() bool { + _ = o.refresh() return o.CacheStorable } +// refresh will check if the object info is expired and request the info from source if it is +// all these conditions must be true to ignore a refresh +// 1. cache ts didn't expire yet +// 2. is not pending a notification from the wrapped fs +func (o *Object) refresh() error { + isNotified := o.CacheFs.isNotifiedRemote(o.Remote()) + isExpired := time.Now().After(o.CacheTs.Add(o.CacheFs.fileAge)) + if !isExpired && !isNotified { + return nil + } + + return o.refreshFromSource(true) +} + // refreshFromSource requests the original FS for the object in case it comes from a cached entry func (o *Object) refreshFromSource(force bool) error { o.refreshMutex.Lock() @@ -274,8 +291,8 @@ func (o *Object) Remove() error { _ = o.CacheFs.cache.removePendingUpload(o.abs()) parentCd := NewDirectory(o.CacheFs, cleanPath(path.Dir(o.Remote()))) _ = o.CacheFs.cache.ExpireDir(parentCd) - // advertise to DirChangeNotify if wrapped doesn't do that - o.CacheFs.notifyDirChangeUpstreamIfNeeded(parentCd.Remote()) + // advertise to ChangeNotify if wrapped doesn't do that + o.CacheFs.notifyChangeUpstreamIfNeeded(parentCd.Remote(), fs.EntryDirectory) return nil } @@ -283,6 +300,7 @@ func (o *Object) Remove() error { // Hash requests a hash of the object and stores in the cache // since it might or might not be called, this is lazy loaded func (o *Object) Hash(ht hash.Type) (string, error) { + _ = o.refresh() if o.CacheHashes == nil { o.CacheHashes = make(map[hash.Type]string) } diff --git a/backend/crypt/crypt.go b/backend/crypt/crypt.go index 8176a82bf..6d575db24 100644 --- a/backend/crypt/crypt.go +++ b/backend/crypt/crypt.go @@ -143,18 +143,18 @@ func NewFs(name, rpath string) (fs.Fs, error) { CanHaveEmptyDirectories: true, }).Fill(f).Mask(wrappedFs).WrapsFs(f, wrappedFs) - doDirChangeNotify := wrappedFs.Features().DirChangeNotify - if doDirChangeNotify != nil { - f.features.DirChangeNotify = func(notifyFunc func(string), pollInterval time.Duration) chan bool { - wrappedNotifyFunc := func(path string) { + doChangeNotify := wrappedFs.Features().ChangeNotify + if doChangeNotify != nil { + f.features.ChangeNotify = func(notifyFunc func(string, fs.EntryType), pollInterval time.Duration) chan bool { + wrappedNotifyFunc := func(path string, entryType fs.EntryType) { decrypted, err := f.DecryptFileName(path) if err != nil { - fs.Logf(f, "DirChangeNotify was unable to decrypt %q: %s", path, err) + fs.Logf(f, "ChangeNotify was unable to decrypt %q: %s", path, err) return } - notifyFunc(decrypted) + notifyFunc(decrypted, entryType) } - return doDirChangeNotify(wrappedNotifyFunc, pollInterval) + return doChangeNotify(wrappedNotifyFunc, pollInterval) } } diff --git a/backend/crypt/crypt2_test.go b/backend/crypt/crypt2_test.go index f3c29057b..7924c2993 100644 --- a/backend/crypt/crypt2_test.go +++ b/backend/crypt/crypt2_test.go @@ -52,7 +52,7 @@ func TestFsMove2(t *testing.T) { fstests.TestFsMove(t) } func TestFsDirMove2(t *testing.T) { fstests.TestFsDirMove(t) } func TestFsRmdirFull2(t *testing.T) { fstests.TestFsRmdirFull(t) } func TestFsPrecision2(t *testing.T) { fstests.TestFsPrecision(t) } -func TestFsDirChangeNotify2(t *testing.T) { fstests.TestFsDirChangeNotify(t) } +func TestFsChangeNotify2(t *testing.T) { fstests.TestFsChangeNotify(t) } func TestObjectString2(t *testing.T) { fstests.TestObjectString(t) } func TestObjectFs2(t *testing.T) { fstests.TestObjectFs(t) } func TestObjectRemote2(t *testing.T) { fstests.TestObjectRemote(t) } diff --git a/backend/crypt/crypt3_test.go b/backend/crypt/crypt3_test.go index 13d4f90f9..06c9dc9bc 100644 --- a/backend/crypt/crypt3_test.go +++ b/backend/crypt/crypt3_test.go @@ -52,7 +52,7 @@ func TestFsMove3(t *testing.T) { fstests.TestFsMove(t) } func TestFsDirMove3(t *testing.T) { fstests.TestFsDirMove(t) } func TestFsRmdirFull3(t *testing.T) { fstests.TestFsRmdirFull(t) } func TestFsPrecision3(t *testing.T) { fstests.TestFsPrecision(t) } -func TestFsDirChangeNotify3(t *testing.T) { fstests.TestFsDirChangeNotify(t) } +func TestFsChangeNotify3(t *testing.T) { fstests.TestFsChangeNotify(t) } func TestObjectString3(t *testing.T) { fstests.TestObjectString(t) } func TestObjectFs3(t *testing.T) { fstests.TestObjectFs(t) } func TestObjectRemote3(t *testing.T) { fstests.TestObjectRemote(t) } diff --git a/backend/crypt/crypt_test.go b/backend/crypt/crypt_test.go index 133754b9b..749cd2f8f 100644 --- a/backend/crypt/crypt_test.go +++ b/backend/crypt/crypt_test.go @@ -52,7 +52,7 @@ func TestFsMove(t *testing.T) { fstests.TestFsMove(t) } func TestFsDirMove(t *testing.T) { fstests.TestFsDirMove(t) } func TestFsRmdirFull(t *testing.T) { fstests.TestFsRmdirFull(t) } func TestFsPrecision(t *testing.T) { fstests.TestFsPrecision(t) } -func TestFsDirChangeNotify(t *testing.T) { fstests.TestFsDirChangeNotify(t) } +func TestFsChangeNotify(t *testing.T) { fstests.TestFsChangeNotify(t) } func TestObjectString(t *testing.T) { fstests.TestObjectString(t) } func TestObjectFs(t *testing.T) { fstests.TestObjectFs(t) } func TestObjectRemote(t *testing.T) { fstests.TestObjectRemote(t) } diff --git a/backend/drive/drive.go b/backend/drive/drive.go index b393ffccb..1e33eb3b3 100644 --- a/backend/drive/drive.go +++ b/backend/drive/drive.go @@ -16,7 +16,6 @@ import ( "net/url" "os" "path" - "sort" "strconv" "strings" "sync" @@ -1182,14 +1181,13 @@ func (f *Fs) DirMove(src fs.Fs, srcRemote, dstRemote string) error { return nil } -// DirChangeNotify polls for changes from the remote and hands the path to the -// given function. Only changes that can be resolved to a path through the -// DirCache will handled. +// ChangeNotify calls the passed function with a path that has had changes. +// If the implementation uses polling, it should adhere to the given interval. // // Automatically restarts itself in case of unexpected behaviour of the remote. // // Close the returned channel to stop being notified. -func (f *Fs) DirChangeNotify(notifyFunc func(string), pollInterval time.Duration) chan bool { +func (f *Fs) ChangeNotify(notifyFunc func(string, fs.EntryType), pollInterval time.Duration) chan bool { quit := make(chan bool) go func() { select { @@ -1197,7 +1195,7 @@ func (f *Fs) DirChangeNotify(notifyFunc func(string), pollInterval time.Duration return default: for { - f.dirchangeNotifyRunner(notifyFunc, pollInterval) + f.changeNotifyRunner(notifyFunc, pollInterval) fs.Debugf(f, "Notify listener service ran into issues, restarting shortly.") time.Sleep(pollInterval) } @@ -1206,11 +1204,8 @@ func (f *Fs) DirChangeNotify(notifyFunc func(string), pollInterval time.Duration return quit } -func (f *Fs) dirchangeNotifyRunner(notifyFunc func(string), pollInterval time.Duration) { +func (f *Fs) changeNotifyRunner(notifyFunc func(string, fs.EntryType), pollInterval time.Duration) { var err error - var changeList *drive.ChangeList - var pageToken string - var startPageToken *drive.StartPageToken err = f.pacer.Call(func() (bool, error) { startPageToken, err = f.svc.Changes.GetStartPageToken().SupportsTeamDrives(f.isTeamDrive).Do() @@ -1220,12 +1215,14 @@ func (f *Fs) dirchangeNotifyRunner(notifyFunc func(string), pollInterval time.Du fs.Debugf(f, "Failed to get StartPageToken: %v", err) return } - pageToken = startPageToken.StartPageToken + pageToken := startPageToken.StartPageToken for { fs.Debugf(f, "Checking for changes on remote") + var changeList *drive.ChangeList + err = f.pacer.Call(func() (bool, error) { - changesCall := f.svc.Changes.List(pageToken).Fields("nextPageToken,newStartPageToken,changes(fileId,file/parents)") + changesCall := f.svc.Changes.List(pageToken).Fields("nextPageToken,newStartPageToken,changes(fileId,file(name,parents,mimeType))") if *driveListChunk > 0 { changesCall = changesCall.PageSize(*driveListChunk) } @@ -1237,28 +1234,47 @@ func (f *Fs) dirchangeNotifyRunner(notifyFunc func(string), pollInterval time.Du return } - pathsToClear := make([]string, 0) + type entryType struct { + path string + entryType fs.EntryType + } + var pathsToClear []entryType for _, change := range changeList.Changes { if path, ok := f.dirCache.GetInv(change.FileId); ok { - pathsToClear = append(pathsToClear, path) + if change.File != nil && change.File.MimeType != driveFolderType { + pathsToClear = append(pathsToClear, entryType{path: path, entryType: fs.EntryObject}) + } else { + pathsToClear = append(pathsToClear, entryType{path: path, entryType: fs.EntryDirectory}) + } + continue } - if change.File != nil { - for _, parent := range change.File.Parents { - if path, ok := f.dirCache.GetInv(parent); ok { - pathsToClear = append(pathsToClear, path) + if change.File != nil && change.File.MimeType != driveFolderType { + // translate the parent dir of this object + if len(change.File.Parents) > 0 { + if path, ok := f.dirCache.GetInv(change.File.Parents[0]); ok { + // and append the drive file name to compute the full file name + if len(path) > 0 { + path = path + "/" + change.File.Name + } else { + path = change.File.Name + } + // this will now clear the actual file too + pathsToClear = append(pathsToClear, entryType{path: path, entryType: fs.EntryObject}) } + } else { // a true root object that is changed + pathsToClear = append(pathsToClear, entryType{path: change.File.Name, entryType: fs.EntryObject}) } } } - lastNotifiedPath := "" - sort.Strings(pathsToClear) - for _, path := range pathsToClear { - if lastNotifiedPath != "" && (path == lastNotifiedPath || strings.HasPrefix(path+"/", lastNotifiedPath)) { + + visitedPaths := make(map[string]bool) + for _, entry := range pathsToClear { + if _, ok := visitedPaths[entry.path]; ok { continue } - lastNotifiedPath = path - notifyFunc(path) + visitedPaths[entry.path] = true + notifyFunc(entry.path, entry.entryType) } if changeList.NewStartPageToken != "" { @@ -1567,17 +1583,17 @@ func (o *Object) MimeType() string { // Check the interfaces are satisfied var ( - _ fs.Fs = (*Fs)(nil) - _ fs.Purger = (*Fs)(nil) - _ fs.CleanUpper = (*Fs)(nil) - _ fs.PutStreamer = (*Fs)(nil) - _ fs.Copier = (*Fs)(nil) - _ fs.Mover = (*Fs)(nil) - _ fs.DirMover = (*Fs)(nil) - _ fs.DirCacheFlusher = (*Fs)(nil) - _ fs.DirChangeNotifier = (*Fs)(nil) - _ fs.PutUncheckeder = (*Fs)(nil) - _ fs.MergeDirser = (*Fs)(nil) - _ fs.Object = (*Object)(nil) - _ fs.MimeTyper = &Object{} + _ fs.Fs = (*Fs)(nil) + _ fs.Purger = (*Fs)(nil) + _ fs.CleanUpper = (*Fs)(nil) + _ fs.PutStreamer = (*Fs)(nil) + _ fs.Copier = (*Fs)(nil) + _ fs.Mover = (*Fs)(nil) + _ fs.DirMover = (*Fs)(nil) + _ fs.DirCacheFlusher = (*Fs)(nil) + _ fs.ChangeNotifier = (*Fs)(nil) + _ fs.PutUncheckeder = (*Fs)(nil) + _ fs.MergeDirser = (*Fs)(nil) + _ fs.Object = (*Object)(nil) + _ fs.MimeTyper = &Object{} ) diff --git a/backend/drive/drive_test.go b/backend/drive/drive_test.go index 27c44900e..a2b1232df 100644 --- a/backend/drive/drive_test.go +++ b/backend/drive/drive_test.go @@ -51,7 +51,7 @@ func TestFsMove(t *testing.T) { fstests.TestFsMove(t) } func TestFsDirMove(t *testing.T) { fstests.TestFsDirMove(t) } func TestFsRmdirFull(t *testing.T) { fstests.TestFsRmdirFull(t) } func TestFsPrecision(t *testing.T) { fstests.TestFsPrecision(t) } -func TestFsDirChangeNotify(t *testing.T) { fstests.TestFsDirChangeNotify(t) } +func TestFsChangeNotify(t *testing.T) { fstests.TestFsChangeNotify(t) } func TestObjectString(t *testing.T) { fstests.TestObjectString(t) } func TestObjectFs(t *testing.T) { fstests.TestObjectFs(t) } func TestObjectRemote(t *testing.T) { fstests.TestObjectRemote(t) } diff --git a/backend/dropbox/dropbox_test.go b/backend/dropbox/dropbox_test.go index 36f5136cf..917b9f828 100644 --- a/backend/dropbox/dropbox_test.go +++ b/backend/dropbox/dropbox_test.go @@ -54,7 +54,7 @@ func TestFsMove(t *testing.T) { fstests.TestFsMove(t) } func TestFsDirMove(t *testing.T) { fstests.TestFsDirMove(t) } func TestFsRmdirFull(t *testing.T) { fstests.TestFsRmdirFull(t) } func TestFsPrecision(t *testing.T) { fstests.TestFsPrecision(t) } -func TestFsDirChangeNotify(t *testing.T) { fstests.TestFsDirChangeNotify(t) } +func TestFsChangeNotify(t *testing.T) { fstests.TestFsChangeNotify(t) } func TestObjectString(t *testing.T) { fstests.TestObjectString(t) } func TestObjectFs(t *testing.T) { fstests.TestObjectFs(t) } func TestObjectRemote(t *testing.T) { fstests.TestObjectRemote(t) } diff --git a/backend/ftp/ftp_test.go b/backend/ftp/ftp_test.go index d19661bc0..daafe98b3 100644 --- a/backend/ftp/ftp_test.go +++ b/backend/ftp/ftp_test.go @@ -51,7 +51,7 @@ func TestFsMove(t *testing.T) { fstests.TestFsMove(t) } func TestFsDirMove(t *testing.T) { fstests.TestFsDirMove(t) } func TestFsRmdirFull(t *testing.T) { fstests.TestFsRmdirFull(t) } func TestFsPrecision(t *testing.T) { fstests.TestFsPrecision(t) } -func TestFsDirChangeNotify(t *testing.T) { fstests.TestFsDirChangeNotify(t) } +func TestFsChangeNotify(t *testing.T) { fstests.TestFsChangeNotify(t) } func TestObjectString(t *testing.T) { fstests.TestObjectString(t) } func TestObjectFs(t *testing.T) { fstests.TestObjectFs(t) } func TestObjectRemote(t *testing.T) { fstests.TestObjectRemote(t) } diff --git a/backend/googlecloudstorage/googlecloudstorage_test.go b/backend/googlecloudstorage/googlecloudstorage_test.go index 9187759b0..3dc46a839 100644 --- a/backend/googlecloudstorage/googlecloudstorage_test.go +++ b/backend/googlecloudstorage/googlecloudstorage_test.go @@ -51,7 +51,7 @@ func TestFsMove(t *testing.T) { fstests.TestFsMove(t) } func TestFsDirMove(t *testing.T) { fstests.TestFsDirMove(t) } func TestFsRmdirFull(t *testing.T) { fstests.TestFsRmdirFull(t) } func TestFsPrecision(t *testing.T) { fstests.TestFsPrecision(t) } -func TestFsDirChangeNotify(t *testing.T) { fstests.TestFsDirChangeNotify(t) } +func TestFsChangeNotify(t *testing.T) { fstests.TestFsChangeNotify(t) } func TestObjectString(t *testing.T) { fstests.TestObjectString(t) } func TestObjectFs(t *testing.T) { fstests.TestObjectFs(t) } func TestObjectRemote(t *testing.T) { fstests.TestObjectRemote(t) } diff --git a/backend/hubic/hubic_test.go b/backend/hubic/hubic_test.go index 892b34aab..1dc7ca061 100644 --- a/backend/hubic/hubic_test.go +++ b/backend/hubic/hubic_test.go @@ -51,7 +51,7 @@ func TestFsMove(t *testing.T) { fstests.TestFsMove(t) } func TestFsDirMove(t *testing.T) { fstests.TestFsDirMove(t) } func TestFsRmdirFull(t *testing.T) { fstests.TestFsRmdirFull(t) } func TestFsPrecision(t *testing.T) { fstests.TestFsPrecision(t) } -func TestFsDirChangeNotify(t *testing.T) { fstests.TestFsDirChangeNotify(t) } +func TestFsChangeNotify(t *testing.T) { fstests.TestFsChangeNotify(t) } func TestObjectString(t *testing.T) { fstests.TestObjectString(t) } func TestObjectFs(t *testing.T) { fstests.TestObjectFs(t) } func TestObjectRemote(t *testing.T) { fstests.TestObjectRemote(t) } diff --git a/backend/local/local_test.go b/backend/local/local_test.go index 0f81dbcfb..febe5aff0 100644 --- a/backend/local/local_test.go +++ b/backend/local/local_test.go @@ -51,7 +51,7 @@ func TestFsMove(t *testing.T) { fstests.TestFsMove(t) } func TestFsDirMove(t *testing.T) { fstests.TestFsDirMove(t) } func TestFsRmdirFull(t *testing.T) { fstests.TestFsRmdirFull(t) } func TestFsPrecision(t *testing.T) { fstests.TestFsPrecision(t) } -func TestFsDirChangeNotify(t *testing.T) { fstests.TestFsDirChangeNotify(t) } +func TestFsChangeNotify(t *testing.T) { fstests.TestFsChangeNotify(t) } func TestObjectString(t *testing.T) { fstests.TestObjectString(t) } func TestObjectFs(t *testing.T) { fstests.TestObjectFs(t) } func TestObjectRemote(t *testing.T) { fstests.TestObjectRemote(t) } diff --git a/backend/onedrive/onedrive_test.go b/backend/onedrive/onedrive_test.go index e030160d0..5539083c9 100644 --- a/backend/onedrive/onedrive_test.go +++ b/backend/onedrive/onedrive_test.go @@ -51,7 +51,7 @@ func TestFsMove(t *testing.T) { fstests.TestFsMove(t) } func TestFsDirMove(t *testing.T) { fstests.TestFsDirMove(t) } func TestFsRmdirFull(t *testing.T) { fstests.TestFsRmdirFull(t) } func TestFsPrecision(t *testing.T) { fstests.TestFsPrecision(t) } -func TestFsDirChangeNotify(t *testing.T) { fstests.TestFsDirChangeNotify(t) } +func TestFsChangeNotify(t *testing.T) { fstests.TestFsChangeNotify(t) } func TestObjectString(t *testing.T) { fstests.TestObjectString(t) } func TestObjectFs(t *testing.T) { fstests.TestObjectFs(t) } func TestObjectRemote(t *testing.T) { fstests.TestObjectRemote(t) } diff --git a/backend/pcloud/pcloud_test.go b/backend/pcloud/pcloud_test.go index 00efb1c89..d4414d17f 100644 --- a/backend/pcloud/pcloud_test.go +++ b/backend/pcloud/pcloud_test.go @@ -51,7 +51,7 @@ func TestFsMove(t *testing.T) { fstests.TestFsMove(t) } func TestFsDirMove(t *testing.T) { fstests.TestFsDirMove(t) } func TestFsRmdirFull(t *testing.T) { fstests.TestFsRmdirFull(t) } func TestFsPrecision(t *testing.T) { fstests.TestFsPrecision(t) } -func TestFsDirChangeNotify(t *testing.T) { fstests.TestFsDirChangeNotify(t) } +func TestFsChangeNotify(t *testing.T) { fstests.TestFsChangeNotify(t) } func TestObjectString(t *testing.T) { fstests.TestObjectString(t) } func TestObjectFs(t *testing.T) { fstests.TestObjectFs(t) } func TestObjectRemote(t *testing.T) { fstests.TestObjectRemote(t) } diff --git a/backend/qingstor/qingstor_test.go b/backend/qingstor/qingstor_test.go index 85d7a00cb..d530d0063 100644 --- a/backend/qingstor/qingstor_test.go +++ b/backend/qingstor/qingstor_test.go @@ -54,7 +54,7 @@ func TestFsMove(t *testing.T) { fstests.TestFsMove(t) } func TestFsDirMove(t *testing.T) { fstests.TestFsDirMove(t) } func TestFsRmdirFull(t *testing.T) { fstests.TestFsRmdirFull(t) } func TestFsPrecision(t *testing.T) { fstests.TestFsPrecision(t) } -func TestFsDirChangeNotify(t *testing.T) { fstests.TestFsDirChangeNotify(t) } +func TestFsChangeNotify(t *testing.T) { fstests.TestFsChangeNotify(t) } func TestObjectString(t *testing.T) { fstests.TestObjectString(t) } func TestObjectFs(t *testing.T) { fstests.TestObjectFs(t) } func TestObjectRemote(t *testing.T) { fstests.TestObjectRemote(t) } diff --git a/backend/s3/s3_test.go b/backend/s3/s3_test.go index f0a040ecd..c3da338ac 100644 --- a/backend/s3/s3_test.go +++ b/backend/s3/s3_test.go @@ -51,7 +51,7 @@ func TestFsMove(t *testing.T) { fstests.TestFsMove(t) } func TestFsDirMove(t *testing.T) { fstests.TestFsDirMove(t) } func TestFsRmdirFull(t *testing.T) { fstests.TestFsRmdirFull(t) } func TestFsPrecision(t *testing.T) { fstests.TestFsPrecision(t) } -func TestFsDirChangeNotify(t *testing.T) { fstests.TestFsDirChangeNotify(t) } +func TestFsChangeNotify(t *testing.T) { fstests.TestFsChangeNotify(t) } func TestObjectString(t *testing.T) { fstests.TestObjectString(t) } func TestObjectFs(t *testing.T) { fstests.TestObjectFs(t) } func TestObjectRemote(t *testing.T) { fstests.TestObjectRemote(t) } diff --git a/backend/sftp/sftp_test.go b/backend/sftp/sftp_test.go index d1b0c928b..926267892 100644 --- a/backend/sftp/sftp_test.go +++ b/backend/sftp/sftp_test.go @@ -51,7 +51,7 @@ func TestFsMove(t *testing.T) { fstests.TestFsMove(t) } func TestFsDirMove(t *testing.T) { fstests.TestFsDirMove(t) } func TestFsRmdirFull(t *testing.T) { fstests.TestFsRmdirFull(t) } func TestFsPrecision(t *testing.T) { fstests.TestFsPrecision(t) } -func TestFsDirChangeNotify(t *testing.T) { fstests.TestFsDirChangeNotify(t) } +func TestFsChangeNotify(t *testing.T) { fstests.TestFsChangeNotify(t) } func TestObjectString(t *testing.T) { fstests.TestObjectString(t) } func TestObjectFs(t *testing.T) { fstests.TestObjectFs(t) } func TestObjectRemote(t *testing.T) { fstests.TestObjectRemote(t) } diff --git a/backend/swift/swift_test.go b/backend/swift/swift_test.go index d95c4dddd..9cb980426 100644 --- a/backend/swift/swift_test.go +++ b/backend/swift/swift_test.go @@ -51,7 +51,7 @@ func TestFsMove(t *testing.T) { fstests.TestFsMove(t) } func TestFsDirMove(t *testing.T) { fstests.TestFsDirMove(t) } func TestFsRmdirFull(t *testing.T) { fstests.TestFsRmdirFull(t) } func TestFsPrecision(t *testing.T) { fstests.TestFsPrecision(t) } -func TestFsDirChangeNotify(t *testing.T) { fstests.TestFsDirChangeNotify(t) } +func TestFsChangeNotify(t *testing.T) { fstests.TestFsChangeNotify(t) } func TestObjectString(t *testing.T) { fstests.TestObjectString(t) } func TestObjectFs(t *testing.T) { fstests.TestObjectFs(t) } func TestObjectRemote(t *testing.T) { fstests.TestObjectRemote(t) } diff --git a/backend/webdav/webdav_test.go b/backend/webdav/webdav_test.go index 6e5690478..15bd22081 100644 --- a/backend/webdav/webdav_test.go +++ b/backend/webdav/webdav_test.go @@ -51,7 +51,7 @@ func TestFsMove(t *testing.T) { fstests.TestFsMove(t) } func TestFsDirMove(t *testing.T) { fstests.TestFsDirMove(t) } func TestFsRmdirFull(t *testing.T) { fstests.TestFsRmdirFull(t) } func TestFsPrecision(t *testing.T) { fstests.TestFsPrecision(t) } -func TestFsDirChangeNotify(t *testing.T) { fstests.TestFsDirChangeNotify(t) } +func TestFsChangeNotify(t *testing.T) { fstests.TestFsChangeNotify(t) } func TestObjectString(t *testing.T) { fstests.TestObjectString(t) } func TestObjectFs(t *testing.T) { fstests.TestObjectFs(t) } func TestObjectRemote(t *testing.T) { fstests.TestObjectRemote(t) } diff --git a/backend/yandex/yandex_test.go b/backend/yandex/yandex_test.go index f786f2fed..1c888a791 100644 --- a/backend/yandex/yandex_test.go +++ b/backend/yandex/yandex_test.go @@ -51,7 +51,7 @@ func TestFsMove(t *testing.T) { fstests.TestFsMove(t) } func TestFsDirMove(t *testing.T) { fstests.TestFsDirMove(t) } func TestFsRmdirFull(t *testing.T) { fstests.TestFsRmdirFull(t) } func TestFsPrecision(t *testing.T) { fstests.TestFsPrecision(t) } -func TestFsDirChangeNotify(t *testing.T) { fstests.TestFsDirChangeNotify(t) } +func TestFsChangeNotify(t *testing.T) { fstests.TestFsChangeNotify(t) } func TestObjectString(t *testing.T) { fstests.TestObjectString(t) } func TestObjectFs(t *testing.T) { fstests.TestObjectFs(t) } func TestObjectRemote(t *testing.T) { fstests.TestObjectRemote(t) } diff --git a/cmd/mountlib/mounttest/dir.go b/cmd/mountlib/mounttest/dir.go index 81531d796..accdc341f 100644 --- a/cmd/mountlib/mounttest/dir.go +++ b/cmd/mountlib/mounttest/dir.go @@ -5,6 +5,7 @@ import ( "testing" "time" + "github.com/ncw/rclone/fs" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" ) @@ -178,11 +179,11 @@ func TestDirCacheFlush(t *testing.T) { require.NoError(t, err) // expect newly created "subdir" on remote to not show up - root.ForgetPath("otherdir") + root.ForgetPath("otherdir", fs.EntryDirectory) run.readLocal(t, localDm, "") assert.Equal(t, dm, localDm, "expected vs fuse mount") - root.ForgetPath("dir") + root.ForgetPath("dir", fs.EntryDirectory) dm = newDirMap("otherdir/|otherdir/file 1|dir/|dir/file 1|dir/subdir/") run.readLocal(t, localDm, "") assert.Equal(t, dm, localDm, "expected vs fuse mount") diff --git a/fs/fs.go b/fs/fs.go index 984b122e1..c46d7b35e 100644 --- a/fs/fs.go +++ b/fs/fs.go @@ -19,6 +19,9 @@ import ( "github.com/pkg/errors" ) +// EntryType can be associated with remote paths to identify their type +type EntryType int + // Constants const ( // ModTimeNotSupported is a very large precision value to show @@ -26,6 +29,10 @@ const ( ModTimeNotSupported = 100 * 365 * 24 * time.Hour // MaxLevel is a sentinel representing an infinite depth for listings MaxLevel = math.MaxInt32 + // EntryDirectory should be used to classify remote paths in directories + EntryDirectory EntryType = iota // 0 + // EntryObject should be used to classify remote paths in objects + EntryObject // 1 ) // Globals @@ -303,10 +310,10 @@ type Features struct { // If destination exists then return fs.ErrorDirExists DirMove func(src Fs, srcRemote, dstRemote string) error - // DirChangeNotify calls the passed function with a path - // of a directory that has had changes. If the implementation + // ChangeNotify calls the passed function with a path + // that has had changes. If the implementation // uses polling, it should adhere to the given interval. - DirChangeNotify func(func(string), time.Duration) chan bool + ChangeNotify func(func(string, EntryType), time.Duration) chan bool // UnWrap returns the Fs that this Fs is wrapping UnWrap func() Fs @@ -423,8 +430,8 @@ func (ft *Features) Fill(f Fs) *Features { if do, ok := f.(DirMover); ok { ft.DirMove = do.DirMove } - if do, ok := f.(DirChangeNotifier); ok { - ft.DirChangeNotify = do.DirChangeNotify + if do, ok := f.(ChangeNotifier); ok { + ft.ChangeNotify = do.ChangeNotify } if do, ok := f.(UnWrapper); ok { ft.UnWrap = do.UnWrap @@ -480,8 +487,8 @@ func (ft *Features) Mask(f Fs) *Features { if mask.DirMove == nil { ft.DirMove = nil } - if mask.DirChangeNotify == nil { - ft.DirChangeNotify = nil + if mask.ChangeNotify == nil { + ft.ChangeNotify = nil } // if mask.UnWrap == nil { // ft.UnWrap = nil @@ -583,12 +590,12 @@ type DirMover interface { DirMove(src Fs, srcRemote, dstRemote string) error } -// DirChangeNotifier is an optional interface for Fs -type DirChangeNotifier interface { - // DirChangeNotify calls the passed function with a path - // of a directory that has had changes. If the implementation +// ChangeNotifier is an optional interface for Fs +type ChangeNotifier interface { + // ChangeNotify calls the passed function with a path + // that has had changes. If the implementation // uses polling, it should adhere to the given interval. - DirChangeNotify(func(string), time.Duration) chan bool + ChangeNotify(func(string, EntryType), time.Duration) chan bool } // UnWrapper is an optional interfaces for Fs diff --git a/fstest/fstests/fstests.go b/fstest/fstests/fstests.go index e7c4dbb68..7676383d5 100644 --- a/fstest/fstests/fstests.go +++ b/fstest/fstests/fstests.go @@ -685,34 +685,51 @@ func TestFsPrecision(t *testing.T) { // FIXME check expected precision } -// TestFsDirChangeNotify tests that changes to directories are properly +// TestFsChangeNotify tests that changes are properly // propagated // -// go test -v -remote TestDrive: -run '^Test(Setup|Init|FsDirChangeNotify)$' -verbose -func TestFsDirChangeNotify(t *testing.T) { +// go test -v -remote TestDrive: -run '^Test(Setup|Init|FsChangeNotify)$' -verbose +func TestFsChangeNotify(t *testing.T) { skipIfNotOk(t) - // Check have DirChangeNotify - doDirChangeNotify := remote.Features().DirChangeNotify - if doDirChangeNotify == nil { - t.Skip("FS has no DirChangeNotify interface") + // Check have ChangeNotify + doChangeNotify := remote.Features().ChangeNotify + if doChangeNotify == nil { + t.Skip("FS has no ChangeNotify interface") } err := operations.Mkdir(remote, "dir") require.NoError(t, err) - changes := []string{} - quitChannel := doDirChangeNotify(func(x string) { - changes = append(changes, x) + dirChanges := []string{} + objChanges := []string{} + quitChannel := doChangeNotify(func(x string, e fs.EntryType) { + if e == fs.EntryDirectory { + dirChanges = append(dirChanges, x) + } else if e == fs.EntryObject { + objChanges = append(objChanges, x) + } }, time.Second) defer func() { close(quitChannel) }() - err = operations.Mkdir(remote, "dir/subdir") - require.NoError(t, err) + for _, idx := range []int{1, 3, 2} { + err = operations.Mkdir(remote, fmt.Sprintf("dir/subdir%d", idx)) + require.NoError(t, err) + } - time.Sleep(2 * time.Second) + contents := fstest.RandomString(100) + buf := bytes.NewBufferString(contents) - assert.Equal(t, []string{"dir"}, changes) + for _, idx := range []int{2, 4, 3} { + obji := object.NewStaticObjectInfo(fmt.Sprintf("dir/file%d", idx), time.Now(), int64(buf.Len()), true, nil, nil) + _, err = remote.Put(buf, obji) + require.NoError(t, err) + } + + time.Sleep(3 * time.Second) + + assert.Equal(t, []string{"dir/subdir1", "dir/subdir3", "dir/subdir2"}, dirChanges) + assert.Equal(t, []string{"dir/file2", "dir/file4", "dir/file3"}, objChanges) } // TestObjectString tests the Object String method diff --git a/vfs/dir.go b/vfs/dir.go index 5002881d6..c0acdbf85 100644 --- a/vfs/dir.go +++ b/vfs/dir.go @@ -95,7 +95,7 @@ func (d *Dir) Node() Node { // ForgetAll ensures the directory and all its children are purged // from the cache. func (d *Dir) ForgetAll() { - d.ForgetPath("") + d.ForgetPath("", fs.EntryDirectory) } // ForgetPath clears the cache for itself and all subdirectories if @@ -103,9 +103,13 @@ func (d *Dir) ForgetAll() { // directory it is called from. // It is not possible to traverse the directory tree upwards, i.e. // you cannot clear the cache for the Dir's ancestors or siblings. -func (d *Dir) ForgetPath(relativePath string) { +func (d *Dir) ForgetPath(relativePath string, entryType fs.EntryType) { + // if we are requested to forget a file, we use its parent absPath := path.Join(d.path, relativePath) - if absPath == "." { + if entryType != fs.EntryDirectory { + absPath = path.Dir(absPath) + } + if absPath == "." || absPath == "/" { absPath = "" } diff --git a/vfs/dir_test.go b/vfs/dir_test.go index c71c981e7..6ce604166 100644 --- a/vfs/dir_test.go +++ b/vfs/dir_test.go @@ -7,6 +7,7 @@ import ( "testing" "time" + "github.com/ncw/rclone/fs" "github.com/ncw/rclone/fstest" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -113,11 +114,11 @@ func TestDirForgetPath(t *testing.T) { assert.Equal(t, 1, len(root.items)) assert.Equal(t, 1, len(dir.items)) - root.ForgetPath("dir") + root.ForgetPath("dir", fs.EntryDirectory) assert.Equal(t, 1, len(root.items)) assert.Equal(t, 0, len(dir.items)) - root.ForgetPath("not/in/cache") + root.ForgetPath("not/in/cache", fs.EntryDirectory) assert.Equal(t, 1, len(root.items)) assert.Equal(t, 0, len(dir.items)) } diff --git a/vfs/vfs.go b/vfs/vfs.go index 26a0b11b9..e0fc4e5fb 100644 --- a/vfs/vfs.go +++ b/vfs/vfs.go @@ -217,7 +217,7 @@ func New(f fs.Fs, opt *Options) *VFS { // Start polling if required if vfs.Opt.PollInterval > 0 { - if do := vfs.f.Features().DirChangeNotify; do != nil { + if do := vfs.f.Features().ChangeNotify; do != nil { do(vfs.root.ForgetPath, vfs.Opt.PollInterval) } else { fs.Infof(f, "poll-interval is not supported by this remote")