fs: add ChangeNotify and backend support for it (#2094)

* fs: rename DirChangeNotify to ChangeNotify

* cache: switch to ChangeNotify

* ChangeNotify: keep order of notifications
This commit is contained in:
Remus Bunduc 2018-03-08 22:03:34 +02:00 committed by GitHub
parent b3f55d6bda
commit 70f07fd3ac
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
35 changed files with 447 additions and 230 deletions

205
backend/cache/cache.go vendored
View file

@ -174,7 +174,10 @@ type Fs struct {
plexConnector *plexConnector
backgroundRunner *backgroundWriter
cleanupChan chan bool
parentsForgetFn []func(string)
parentsForgetFn []func(string, fs.EntryType)
notifiedRemotes map[string]bool
notifiedMu sync.Mutex
parentsForgetMu sync.Mutex
}
// parseRootPath returns a cleaned root path and a nil error or "" and an error when the path is invalid
@ -263,6 +266,7 @@ func NewFs(name, rootPath string) (fs.Fs, error) {
tempWritePath: *cacheTempWritePath,
tempWriteWait: waitTime,
cleanupChan: make(chan bool, 1),
notifiedRemotes: make(map[string]bool),
}
if f.chunkTotalSize < (f.chunkSize * int64(f.totalWorkers)) {
return nil, errors.Errorf("don't set cache-total-chunk-size(%v) less than cache-chunk-size(%v) * cache-workers(%v)",
@ -381,8 +385,8 @@ func NewFs(name, rootPath string) (fs.Fs, error) {
}
}()
if doDirChangeNotify := wrappedFs.Features().DirChangeNotify; doDirChangeNotify != nil {
doDirChangeNotify(f.receiveDirChangeNotify, f.chunkCleanInterval)
if doChangeNotify := wrappedFs.Features().ChangeNotify; doChangeNotify != nil {
doChangeNotify(f.receiveChangeNotify, f.chunkCleanInterval)
}
f.features = (&fs.Features{
@ -390,7 +394,7 @@ func NewFs(name, rootPath string) (fs.Fs, error) {
DuplicateFiles: false, // storage doesn't permit this
}).Fill(f).Mask(wrappedFs).WrapsFs(f, wrappedFs)
// override only those features that use a temp fs and it doesn't support them
f.features.DirChangeNotify = f.DirChangeNotify
//f.features.ChangeNotify = f.ChangeNotify
if f.tempWritePath != "" {
if f.tempFs.Features().Copy == nil {
f.features.Copy = nil
@ -414,85 +418,73 @@ func NewFs(name, rootPath string) (fs.Fs, error) {
return f, fsErr
}
func (f *Fs) receiveDirChangeNotify(forgetPath string) {
// receiveChangeNotify is a wrapper to notifications sent from the wrapped FS about changed files
func (f *Fs) receiveChangeNotify(forgetPath string, entryType fs.EntryType) {
fs.Debugf(f, "notify: expiring cache for '%v'", forgetPath)
// notify upstreams too (vfs)
f.notifyDirChange(forgetPath)
f.notifyChangeUpstream(forgetPath, entryType)
var cd *Directory
co := NewObject(f, forgetPath)
err := f.cache.GetObject(co)
if err == nil {
if entryType == fs.EntryObject {
co := NewObject(f, forgetPath)
err := f.cache.GetObject(co)
if err != nil {
fs.Debugf(f, "ignoring change notification for non cached entry %v", co)
return
}
// expire the entry
co.CacheTs = time.Now().Add(f.fileAge * -1)
err = f.cache.AddObject(co)
if err != nil {
fs.Errorf(forgetPath, "notify: error expiring '%v': %v", co, err)
} else {
fs.Debugf(forgetPath, "notify: expired %v", co)
}
cd = NewDirectory(f, cleanPath(path.Dir(co.Remote())))
} else {
cd = NewDirectory(f, forgetPath)
}
// we list all the cached objects and expire all of them
entries, err := f.cache.GetDirEntries(cd)
if err != nil {
fs.Debugf(forgetPath, "notify: ignoring notification on non cached dir")
return
}
for i := 0; i < len(entries); i++ {
if co, ok := entries[i].(*Object); ok {
co.CacheTs = time.Now().Add(f.fileAge * -1)
err = f.cache.AddObject(co)
if err != nil {
fs.Errorf(forgetPath, "notify: error expiring '%v': %v", co, err)
} else {
fs.Debugf(forgetPath, "notify: expired %v", co)
}
// we expire the dir
err := f.cache.ExpireDir(cd)
if err != nil {
fs.Errorf(forgetPath, "notify: error expiring '%v': %v", cd, err)
} else {
fs.Debugf(forgetPath, "notify: expired '%v'", cd)
}
}
// finally, we expire the dir as well
err = f.cache.ExpireDir(cd)
if err != nil {
fs.Errorf(forgetPath, "notify: error expiring '%v': %v", cd, err)
} else {
fs.Debugf(forgetPath, "notify: expired '%v'", cd)
}
f.notifiedMu.Lock()
defer f.notifiedMu.Unlock()
f.notifiedRemotes[forgetPath] = true
f.notifiedRemotes[cd.Remote()] = true
}
// notifyDirChange takes a remote (can be dir or entry) and
// tries to determine which is it and notify upstreams of the dir change
func (f *Fs) notifyDirChange(remote string) {
var cd *Directory
co := NewObject(f, remote)
err := f.cache.GetObject(co)
if err == nil {
pd := cleanPath(path.Dir(remote))
cd = NewDirectory(f, pd)
} else {
cd = NewDirectory(f, remote)
}
f.notifyDirChangeUpstream(cd.Remote())
}
// notifyDirChangeUpstreamIfNeeded will check if the wrapped remote doesn't notify on dir changes
// notifyChangeUpstreamIfNeeded will check if the wrapped remote doesn't notify on changes
// or if we use a temp fs
func (f *Fs) notifyDirChangeUpstreamIfNeeded(remote string) {
if f.Fs.Features().DirChangeNotify == nil || f.tempWritePath != "" {
f.notifyDirChangeUpstream(remote)
func (f *Fs) notifyChangeUpstreamIfNeeded(remote string, entryType fs.EntryType) {
if f.Fs.Features().ChangeNotify == nil || f.tempWritePath != "" {
f.notifyChangeUpstream(remote, entryType)
}
}
// notifyDirChangeUpstream will loop through all the upstreams and notify
// notifyChangeUpstream will loop through all the upstreams and notify
// of the provided remote (should be only a dir)
func (f *Fs) notifyDirChangeUpstream(remote string) {
func (f *Fs) notifyChangeUpstream(remote string, entryType fs.EntryType) {
f.parentsForgetMu.Lock()
defer f.parentsForgetMu.Unlock()
if len(f.parentsForgetFn) > 0 {
for _, fn := range f.parentsForgetFn {
fn(remote)
fn(remote, entryType)
}
}
}
// DirChangeNotify can subsribe multiple callers
// this is coupled with the wrapped fs DirChangeNotify (if it supports it)
// ChangeNotify can subsribe multiple callers
// this is coupled with the wrapped fs ChangeNotify (if it supports it)
// and also notifies other caches (i.e VFS) to clear out whenever something changes
func (f *Fs) DirChangeNotify(notifyFunc func(string), pollInterval time.Duration) chan bool {
fs.Debugf(f, "subscribing to DirChangeNotify")
func (f *Fs) ChangeNotify(notifyFunc func(string, fs.EntryType), pollInterval time.Duration) chan bool {
f.parentsForgetMu.Lock()
defer f.parentsForgetMu.Unlock()
fs.Debugf(f, "subscribing to ChangeNotify")
f.parentsForgetFn = append(f.parentsForgetFn, notifyFunc)
return make(chan bool)
}
@ -649,12 +641,13 @@ func (f *Fs) List(dir string) (entries fs.DirEntries, err error) {
fs.Debugf(dir, "list: cached object: %v", co)
case fs.Directory:
cdd := DirectoryFromOriginal(f, o)
err := f.cache.AddDir(cdd)
if err != nil {
fs.Errorf(dir, "list: error caching dir from listing %v", o)
} else {
fs.Debugf(dir, "list: cached dir: %v", cdd)
}
// FIXME this overrides a possible expired dir
//err := f.cache.AddDir(cdd)
//if err != nil {
// fs.Errorf(dir, "list: error caching dir from listing %v", o)
//} else {
// fs.Debugf(dir, "list: cached dir: %v", cdd)
//}
cachedEntries = append(cachedEntries, cdd)
default:
fs.Debugf(entry, "list: Unknown object type %T", entry)
@ -759,8 +752,8 @@ func (f *Fs) Mkdir(dir string) error {
} else {
fs.Infof(parentCd, "mkdir: cache expired")
}
// advertise to DirChangeNotify if wrapped doesn't do that
f.notifyDirChangeUpstreamIfNeeded(parentCd.Remote())
// advertise to ChangeNotify if wrapped doesn't do that
f.notifyChangeUpstreamIfNeeded(parentCd.Remote(), fs.EntryDirectory)
return nil
}
@ -801,7 +794,7 @@ func (f *Fs) Rmdir(dir string) error {
fs.Debugf(dir, "rmdir: read %v from temp fs", len(queuedEntries))
fs.Debugf(dir, "rmdir: temp fs entries: %v", queuedEntries)
if len(queuedEntries) > 0 {
fs.Errorf(dir, "rmdir: temporary dir not empty")
fs.Errorf(dir, "rmdir: temporary dir not empty: %v", queuedEntries)
return fs.ErrorDirectoryNotEmpty
}
}
@ -829,8 +822,8 @@ func (f *Fs) Rmdir(dir string) error {
} else {
fs.Infof(parentCd, "rmdir: cache expired")
}
// advertise to DirChangeNotify if wrapped doesn't do that
f.notifyDirChangeUpstreamIfNeeded(parentCd.Remote())
// advertise to ChangeNotify if wrapped doesn't do that
f.notifyChangeUpstreamIfNeeded(parentCd.Remote(), fs.EntryDirectory)
return nil
}
@ -939,8 +932,8 @@ cleanup:
} else {
fs.Debugf(srcParent, "dirmove: cache expired")
}
// advertise to DirChangeNotify if wrapped doesn't do that
f.notifyDirChangeUpstreamIfNeeded(srcParent.Remote())
// advertise to ChangeNotify if wrapped doesn't do that
f.notifyChangeUpstreamIfNeeded(srcParent.Remote(), fs.EntryDirectory)
// expire parent dir at the destination path
dstParent := NewDirectory(f, cleanPath(path.Dir(dstRemote)))
@ -950,8 +943,8 @@ cleanup:
} else {
fs.Debugf(dstParent, "dirmove: cache expired")
}
// advertise to DirChangeNotify if wrapped doesn't do that
f.notifyDirChangeUpstreamIfNeeded(dstParent.Remote())
// advertise to ChangeNotify if wrapped doesn't do that
f.notifyChangeUpstreamIfNeeded(dstParent.Remote(), fs.EntryDirectory)
// TODO: precache dst dir and save the chunks
return nil
@ -1030,6 +1023,11 @@ func (f *Fs) put(in io.Reader, src fs.ObjectInfo, options []fs.OpenOption, put p
// queue for upload and store in temp fs if configured
if f.tempWritePath != "" {
// we need to clear the caches before a put through temp fs
parentCd := NewDirectory(f, cleanPath(path.Dir(src.Remote())))
_ = f.cache.ExpireDir(parentCd)
f.notifyChangeUpstreamIfNeeded(parentCd.Remote(), fs.EntryDirectory)
obj, err = f.tempFs.Put(in, src, options...)
if err != nil {
fs.Errorf(obj, "put: failed to upload in temp fs: %v", err)
@ -1074,8 +1072,8 @@ func (f *Fs) put(in io.Reader, src fs.ObjectInfo, options []fs.OpenOption, put p
} else {
fs.Infof(parentCd, "put: cache expired")
}
// advertise to DirChangeNotify
f.notifyDirChangeUpstreamIfNeeded(parentCd.Remote())
// advertise to ChangeNotify
f.notifyChangeUpstreamIfNeeded(parentCd.Remote(), fs.EntryDirectory)
return cachedObj, nil
}
@ -1164,8 +1162,8 @@ func (f *Fs) Copy(src fs.Object, remote string) (fs.Object, error) {
} else {
fs.Infof(parentCd, "copy: cache expired")
}
// advertise to DirChangeNotify if wrapped doesn't do that
f.notifyDirChangeUpstreamIfNeeded(parentCd.Remote())
// advertise to ChangeNotify if wrapped doesn't do that
f.notifyChangeUpstreamIfNeeded(parentCd.Remote(), fs.EntryDirectory)
// expire src parent
srcParent := NewDirectory(f, cleanPath(path.Dir(src.Remote())))
err = f.cache.ExpireDir(srcParent)
@ -1174,8 +1172,8 @@ func (f *Fs) Copy(src fs.Object, remote string) (fs.Object, error) {
} else {
fs.Infof(srcParent, "copy: cache expired")
}
// advertise to DirChangeNotify if wrapped doesn't do that
f.notifyDirChangeUpstreamIfNeeded(srcParent.Remote())
// advertise to ChangeNotify if wrapped doesn't do that
f.notifyChangeUpstreamIfNeeded(srcParent.Remote(), fs.EntryDirectory)
return co, nil
}
@ -1260,8 +1258,8 @@ func (f *Fs) Move(src fs.Object, remote string) (fs.Object, error) {
} else {
fs.Infof(parentCd, "move: cache expired")
}
// advertise to DirChangeNotify if wrapped doesn't do that
f.notifyDirChangeUpstreamIfNeeded(parentCd.Remote())
// advertise to ChangeNotify if wrapped doesn't do that
f.notifyChangeUpstreamIfNeeded(parentCd.Remote(), fs.EntryDirectory)
// persist new
cachedObj := ObjectFromOriginal(f, obj).persist()
fs.Debugf(cachedObj, "move: added to cache")
@ -1273,8 +1271,8 @@ func (f *Fs) Move(src fs.Object, remote string) (fs.Object, error) {
} else {
fs.Infof(parentCd, "move: cache expired")
}
// advertise to DirChangeNotify if wrapped doesn't do that
f.notifyDirChangeUpstreamIfNeeded(parentCd.Remote())
// advertise to ChangeNotify if wrapped doesn't do that
f.notifyChangeUpstreamIfNeeded(parentCd.Remote(), fs.EntryDirectory)
return cachedObj, nil
}
@ -1416,6 +1414,19 @@ func (f *Fs) GetBackgroundUploadChannel() chan BackgroundUploadState {
return nil
}
func (f *Fs) isNotifiedRemote(remote string) bool {
f.notifiedMu.Lock()
defer f.notifiedMu.Unlock()
n, ok := f.notifiedRemotes[remote]
if !ok || !n {
return false
}
delete(f.notifiedRemotes, remote)
return n
}
func cleanPath(p string) string {
p = path.Clean(p)
if p == "." || p == "/" {
@ -1427,16 +1438,16 @@ func cleanPath(p string) string {
// Check the interfaces are satisfied
var (
_ fs.Fs = (*Fs)(nil)
_ fs.Purger = (*Fs)(nil)
_ fs.Copier = (*Fs)(nil)
_ fs.Mover = (*Fs)(nil)
_ fs.DirMover = (*Fs)(nil)
_ fs.PutUncheckeder = (*Fs)(nil)
_ fs.PutStreamer = (*Fs)(nil)
_ fs.CleanUpper = (*Fs)(nil)
_ fs.UnWrapper = (*Fs)(nil)
_ fs.Wrapper = (*Fs)(nil)
_ fs.ListRer = (*Fs)(nil)
_ fs.DirChangeNotifier = (*Fs)(nil)
_ fs.Fs = (*Fs)(nil)
_ fs.Purger = (*Fs)(nil)
_ fs.Copier = (*Fs)(nil)
_ fs.Mover = (*Fs)(nil)
_ fs.DirMover = (*Fs)(nil)
_ fs.PutUncheckeder = (*Fs)(nil)
_ fs.PutStreamer = (*Fs)(nil)
_ fs.CleanUpper = (*Fs)(nil)
_ fs.UnWrapper = (*Fs)(nil)
_ fs.Wrapper = (*Fs)(nil)
_ fs.ListRer = (*Fs)(nil)
_ fs.ChangeNotifier = (*Fs)(nil)
)