cache: add support for polling

This commit is contained in:
remusb 2018-02-10 22:01:05 +02:00 committed by GitHub
parent 8a25ca786c
commit b33e3f779c
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
4 changed files with 351 additions and 94 deletions

151
backend/cache/cache.go vendored
View file

@ -174,6 +174,7 @@ type Fs struct {
plexConnector *plexConnector plexConnector *plexConnector
backgroundRunner *backgroundWriter backgroundRunner *backgroundWriter
cleanupChan chan bool cleanupChan chan bool
parentsForgetFn []func(string)
} }
// parseRootPath returns a cleaned root path and a nil error or "" and an error when the path is invalid // parseRootPath returns a cleaned root path and a nil error or "" and an error when the path is invalid
@ -380,24 +381,16 @@ func NewFs(name, rootPath string) (fs.Fs, error) {
} }
}() }()
// TODO: Explore something here but now it's not something we want if doDirChangeNotify := wrappedFs.Features().DirChangeNotify; doDirChangeNotify != nil {
// when writing from cache, source FS will send a notification and clear it out immediately doDirChangeNotify(f.receiveDirChangeNotify, f.chunkCleanInterval)
//setup dir notification }
//doDirChangeNotify := wrappedFs.Features().DirChangeNotify
//if doDirChangeNotify != nil {
// doDirChangeNotify(func(dir string) {
// d := NewAbsDirectory(f, dir)
// d.Flush()
// fs.Infof(dir, "updated from notification")
// }, time.Second * 10)
//}
f.features = (&fs.Features{ f.features = (&fs.Features{
CanHaveEmptyDirectories: true, CanHaveEmptyDirectories: true,
DuplicateFiles: false, // storage doesn't permit this DuplicateFiles: false, // storage doesn't permit this
DirChangeNotify: nil,
}).Fill(f).Mask(wrappedFs).WrapsFs(f, wrappedFs) }).Fill(f).Mask(wrappedFs).WrapsFs(f, wrappedFs)
// override only those features that use a temp fs and it doesn't support them // override only those features that use a temp fs and it doesn't support them
f.features.DirChangeNotify = f.DirChangeNotify
if f.tempWritePath != "" { if f.tempWritePath != "" {
if f.tempFs.Features().Copy == nil { if f.tempFs.Features().Copy == nil {
f.features.Copy = nil f.features.Copy = nil
@ -421,6 +414,81 @@ func NewFs(name, rootPath string) (fs.Fs, error) {
return f, fsErr return f, fsErr
} }
func (f *Fs) receiveDirChangeNotify(forgetPath string) {
fs.Debugf(f, "notify: expiring cache for '%v'", forgetPath)
// notify upstreams too (vfs)
f.notifyDirChange(forgetPath)
var cd *Directory
co := NewObject(f, forgetPath)
err := f.cache.GetObject(co)
if err == nil {
cd = NewDirectory(f, cleanPath(path.Dir(co.Remote())))
} else {
cd = NewDirectory(f, forgetPath)
}
// we list all the cached objects and expire all of them
entries, err := f.cache.GetDirEntries(cd)
if err != nil {
fs.Debugf(forgetPath, "notify: ignoring notification on non cached dir")
return
}
for i := 0; i < len(entries); i++ {
if co, ok := entries[i].(*Object); ok {
co.CacheTs = time.Now().Add(f.fileAge * -1)
err = f.cache.AddObject(co)
if err != nil {
fs.Errorf(forgetPath, "notify: error expiring '%v': %v", co, err)
} else {
fs.Debugf(forgetPath, "notify: expired %v", co)
}
}
}
// finally, we expire the dir as well
err = f.cache.ExpireDir(cd)
if err != nil {
fs.Errorf(forgetPath, "notify: error expiring '%v': %v", cd, err)
} else {
fs.Debugf(forgetPath, "notify: expired '%v'", cd)
}
}
// notifyDirChange takes a remote (can be dir or entry) and
// tries to determine which is it and notify upstreams of the dir change
func (f *Fs) notifyDirChange(remote string) {
var cd *Directory
co := NewObject(f, remote)
err := f.cache.GetObject(co)
if err == nil {
pd := cleanPath(path.Dir(remote))
cd = NewDirectory(f, pd)
} else {
cd = NewDirectory(f, remote)
}
f.notifyDirChangeUpstream(cd.Remote())
}
// notifyDirChangeUpstream will loop through all the upstreams and notify
// of the provided remote (should be only a dir)
func (f *Fs) notifyDirChangeUpstream(remote string) {
if len(f.parentsForgetFn) > 0 {
for _, fn := range f.parentsForgetFn {
fn(remote)
}
}
}
// DirChangeNotify can subsribe multiple callers
// this is coupled with the wrapped fs DirChangeNotify (if it supports it)
// and also notifies other caches (i.e VFS) to clear out whenever something changes
func (f *Fs) DirChangeNotify(notifyFunc func(string), pollInterval time.Duration) chan bool {
fs.Debugf(f, "subscribing to DirChangeNotify")
f.parentsForgetFn = append(f.parentsForgetFn, notifyFunc)
return make(chan bool)
}
// Name of the remote (as passed into NewFs) // Name of the remote (as passed into NewFs)
func (f *Fs) Name() string { func (f *Fs) Name() string {
return f.name return f.name
@ -683,6 +751,10 @@ func (f *Fs) Mkdir(dir string) error {
} else { } else {
fs.Infof(parentCd, "mkdir: cache expired") fs.Infof(parentCd, "mkdir: cache expired")
} }
// advertise to DirChangeNotify if wrapped doesn't do that
if f.Fs.Features().DirChangeNotify == nil {
f.notifyDirChangeUpstream(parentCd.Remote())
}
return nil return nil
} }
@ -751,6 +823,10 @@ func (f *Fs) Rmdir(dir string) error {
} else { } else {
fs.Infof(parentCd, "rmdir: cache expired") fs.Infof(parentCd, "rmdir: cache expired")
} }
// advertise to DirChangeNotify if wrapped doesn't do that
if f.Fs.Features().DirChangeNotify == nil {
f.notifyDirChangeUpstream(parentCd.Remote())
}
return nil return nil
} }
@ -847,6 +923,10 @@ func (f *Fs) DirMove(src fs.Fs, srcRemote, dstRemote string) error {
} else { } else {
fs.Debugf(srcParent, "dirmove: cache expired") fs.Debugf(srcParent, "dirmove: cache expired")
} }
// advertise to DirChangeNotify if wrapped doesn't do that
if f.Fs.Features().DirChangeNotify == nil {
f.notifyDirChangeUpstream(srcParent.Remote())
}
// expire parent dir at the destination path // expire parent dir at the destination path
dstParent := NewDirectory(f, cleanPath(path.Dir(dstRemote))) dstParent := NewDirectory(f, cleanPath(path.Dir(dstRemote)))
@ -856,6 +936,10 @@ func (f *Fs) DirMove(src fs.Fs, srcRemote, dstRemote string) error {
} else { } else {
fs.Debugf(dstParent, "dirmove: cache expired") fs.Debugf(dstParent, "dirmove: cache expired")
} }
// advertise to DirChangeNotify if wrapped doesn't do that
if f.Fs.Features().DirChangeNotify == nil {
f.notifyDirChangeUpstream(dstParent.Remote())
}
// TODO: precache dst dir and save the chunks // TODO: precache dst dir and save the chunks
return nil return nil
@ -978,6 +1062,10 @@ func (f *Fs) put(in io.Reader, src fs.ObjectInfo, options []fs.OpenOption, put p
} else { } else {
fs.Infof(parentCd, "put: cache expired") fs.Infof(parentCd, "put: cache expired")
} }
// advertise to DirChangeNotify if wrapped doesn't do that
if f.Fs.Features().DirChangeNotify == nil {
f.notifyDirChangeUpstream(parentCd.Remote())
}
return cachedObj, nil return cachedObj, nil
} }
@ -1066,6 +1154,10 @@ func (f *Fs) Copy(src fs.Object, remote string) (fs.Object, error) {
} else { } else {
fs.Infof(parentCd, "copy: cache expired") fs.Infof(parentCd, "copy: cache expired")
} }
// advertise to DirChangeNotify if wrapped doesn't do that
if f.Fs.Features().DirChangeNotify == nil {
f.notifyDirChangeUpstream(parentCd.Remote())
}
// expire src parent // expire src parent
srcParent := NewDirectory(f, cleanPath(path.Dir(src.Remote()))) srcParent := NewDirectory(f, cleanPath(path.Dir(src.Remote())))
err = f.cache.ExpireDir(srcParent) err = f.cache.ExpireDir(srcParent)
@ -1074,6 +1166,10 @@ func (f *Fs) Copy(src fs.Object, remote string) (fs.Object, error) {
} else { } else {
fs.Infof(srcParent, "copy: cache expired") fs.Infof(srcParent, "copy: cache expired")
} }
// advertise to DirChangeNotify if wrapped doesn't do that
if f.Fs.Features().DirChangeNotify == nil {
f.notifyDirChangeUpstream(srcParent.Remote())
}
return co, nil return co, nil
} }
@ -1158,6 +1254,10 @@ func (f *Fs) Move(src fs.Object, remote string) (fs.Object, error) {
} else { } else {
fs.Infof(parentCd, "move: cache expired") fs.Infof(parentCd, "move: cache expired")
} }
// advertise to DirChangeNotify if wrapped doesn't do that
if f.Fs.Features().DirChangeNotify == nil {
f.notifyDirChangeUpstream(parentCd.Remote())
}
// persist new // persist new
cachedObj := ObjectFromOriginal(f, obj).persist() cachedObj := ObjectFromOriginal(f, obj).persist()
fs.Debugf(cachedObj, "move: added to cache") fs.Debugf(cachedObj, "move: added to cache")
@ -1169,6 +1269,10 @@ func (f *Fs) Move(src fs.Object, remote string) (fs.Object, error) {
} else { } else {
fs.Infof(parentCd, "move: cache expired") fs.Infof(parentCd, "move: cache expired")
} }
// advertise to DirChangeNotify if wrapped doesn't do that
if f.Fs.Features().DirChangeNotify == nil {
f.notifyDirChangeUpstream(parentCd.Remote())
}
return cachedObj, nil return cachedObj, nil
} }
@ -1321,15 +1425,16 @@ func cleanPath(p string) string {
// Check the interfaces are satisfied // Check the interfaces are satisfied
var ( var (
_ fs.Fs = (*Fs)(nil) _ fs.Fs = (*Fs)(nil)
_ fs.Purger = (*Fs)(nil) _ fs.Purger = (*Fs)(nil)
_ fs.Copier = (*Fs)(nil) _ fs.Copier = (*Fs)(nil)
_ fs.Mover = (*Fs)(nil) _ fs.Mover = (*Fs)(nil)
_ fs.DirMover = (*Fs)(nil) _ fs.DirMover = (*Fs)(nil)
_ fs.PutUncheckeder = (*Fs)(nil) _ fs.PutUncheckeder = (*Fs)(nil)
_ fs.PutStreamer = (*Fs)(nil) _ fs.PutStreamer = (*Fs)(nil)
_ fs.CleanUpper = (*Fs)(nil) _ fs.CleanUpper = (*Fs)(nil)
_ fs.UnWrapper = (*Fs)(nil) _ fs.UnWrapper = (*Fs)(nil)
_ fs.Wrapper = (*Fs)(nil) _ fs.Wrapper = (*Fs)(nil)
_ fs.ListRer = (*Fs)(nil) _ fs.ListRer = (*Fs)(nil)
_ fs.DirChangeNotifier = (*Fs)(nil)
) )

View file

@ -54,34 +54,35 @@ var (
runInstance *run runInstance *run
errNotSupported = errors.New("not supported") errNotSupported = errors.New("not supported")
decryptedToEncryptedRemotes = map[string]string{ decryptedToEncryptedRemotes = map[string]string{
"one": "lm4u7jjt3c85bf56vjqgeenuno", "one": "lm4u7jjt3c85bf56vjqgeenuno",
"second": "qvt1ochrkcfbptp5mu9ugb2l14", "second": "qvt1ochrkcfbptp5mu9ugb2l14",
"test": "jn4tegjtpqro30t3o11thb4b5s", "test": "jn4tegjtpqro30t3o11thb4b5s",
"test2": "qakvqnh8ttei89e0gc76crpql4", "test2": "qakvqnh8ttei89e0gc76crpql4",
"data.bin": "0q2847tfko6mhj3dag3r809qbc", "data.bin": "0q2847tfko6mhj3dag3r809qbc",
"ticw/data.bin": "5mv97b0ule6pht33srae5pice8/0q2847tfko6mhj3dag3r809qbc", "ticw/data.bin": "5mv97b0ule6pht33srae5pice8/0q2847tfko6mhj3dag3r809qbc",
"tiutfo/test/one": "legd371aa8ol36tjfklt347qnc/jn4tegjtpqro30t3o11thb4b5s/lm4u7jjt3c85bf56vjqgeenuno", "tiuufo/test/one": "vi6u1olqhirqv14cd8qlej1mgo/jn4tegjtpqro30t3o11thb4b5s/lm4u7jjt3c85bf56vjqgeenuno",
"tiuufo/test/one": "vi6u1olqhirqv14cd8qlej1mgo/jn4tegjtpqro30t3o11thb4b5s/lm4u7jjt3c85bf56vjqgeenuno", "tiuufo/test/second": "vi6u1olqhirqv14cd8qlej1mgo/jn4tegjtpqro30t3o11thb4b5s/qvt1ochrkcfbptp5mu9ugb2l14",
"tiutfo/second/one": "legd371aa8ol36tjfklt347qnc/qvt1ochrkcfbptp5mu9ugb2l14/lm4u7jjt3c85bf56vjqgeenuno", "tiutfo/test/one": "legd371aa8ol36tjfklt347qnc/jn4tegjtpqro30t3o11thb4b5s/lm4u7jjt3c85bf56vjqgeenuno",
"second/one": "qvt1ochrkcfbptp5mu9ugb2l14/lm4u7jjt3c85bf56vjqgeenuno", "tiutfo/second/one": "legd371aa8ol36tjfklt347qnc/qvt1ochrkcfbptp5mu9ugb2l14/lm4u7jjt3c85bf56vjqgeenuno",
"test/one": "jn4tegjtpqro30t3o11thb4b5s/lm4u7jjt3c85bf56vjqgeenuno", "second/one": "qvt1ochrkcfbptp5mu9ugb2l14/lm4u7jjt3c85bf56vjqgeenuno",
"test/second": "jn4tegjtpqro30t3o11thb4b5s/qvt1ochrkcfbptp5mu9ugb2l14", "test/one": "jn4tegjtpqro30t3o11thb4b5s/lm4u7jjt3c85bf56vjqgeenuno",
"test/third": "jn4tegjtpqro30t3o11thb4b5s/2nd7fjiop5h3ihfj1vl953aa5g", "test/second": "jn4tegjtpqro30t3o11thb4b5s/qvt1ochrkcfbptp5mu9ugb2l14",
"test/0.bin": "jn4tegjtpqro30t3o11thb4b5s/e6frddt058b6kvbpmlstlndmtk", "test/third": "jn4tegjtpqro30t3o11thb4b5s/2nd7fjiop5h3ihfj1vl953aa5g",
"test/1.bin": "jn4tegjtpqro30t3o11thb4b5s/kck472nt1k7qbmob0mt1p1crgc", "test/0.bin": "jn4tegjtpqro30t3o11thb4b5s/e6frddt058b6kvbpmlstlndmtk",
"test/2.bin": "jn4tegjtpqro30t3o11thb4b5s/744oe9ven2rmak4u27if51qk24", "test/1.bin": "jn4tegjtpqro30t3o11thb4b5s/kck472nt1k7qbmob0mt1p1crgc",
"test/3.bin": "jn4tegjtpqro30t3o11thb4b5s/2bjd8kef0u5lmsu6qhqll34bcs", "test/2.bin": "jn4tegjtpqro30t3o11thb4b5s/744oe9ven2rmak4u27if51qk24",
"test/4.bin": "jn4tegjtpqro30t3o11thb4b5s/cvjs73iv0a82v0c7r67avllh7s", "test/3.bin": "jn4tegjtpqro30t3o11thb4b5s/2bjd8kef0u5lmsu6qhqll34bcs",
"test/5.bin": "jn4tegjtpqro30t3o11thb4b5s/0plkdo790b6bnmt33qsdqmhv9c", "test/4.bin": "jn4tegjtpqro30t3o11thb4b5s/cvjs73iv0a82v0c7r67avllh7s",
"test/6.bin": "jn4tegjtpqro30t3o11thb4b5s/s5r633srnjtbh83893jovjt5d0", "test/5.bin": "jn4tegjtpqro30t3o11thb4b5s/0plkdo790b6bnmt33qsdqmhv9c",
"test/7.bin": "jn4tegjtpqro30t3o11thb4b5s/6rq45tr9bjsammku622flmqsu4", "test/6.bin": "jn4tegjtpqro30t3o11thb4b5s/s5r633srnjtbh83893jovjt5d0",
"test/8.bin": "jn4tegjtpqro30t3o11thb4b5s/37bc6tcl3e31qb8cadvjb749vk", "test/7.bin": "jn4tegjtpqro30t3o11thb4b5s/6rq45tr9bjsammku622flmqsu4",
"test/9.bin": "jn4tegjtpqro30t3o11thb4b5s/t4pr35hnls32789o8fk0chk1ec", "test/8.bin": "jn4tegjtpqro30t3o11thb4b5s/37bc6tcl3e31qb8cadvjb749vk",
"test/9.bin": "jn4tegjtpqro30t3o11thb4b5s/t4pr35hnls32789o8fk0chk1ec",
} }
) )
func init() { func init() {
goflag.StringVar(&remoteName, "remote-internal", "TestCache", "Remote to test with, defaults to local filesystem") goflag.StringVar(&remoteName, "remote-internal", "TestInternalCache", "Remote to test with, defaults to local filesystem")
goflag.StringVar(&mountDir, "mount-dir-internal", "", "") goflag.StringVar(&mountDir, "mount-dir-internal", "", "")
goflag.StringVar(&uploadDir, "upload-dir-internal", "", "") goflag.StringVar(&uploadDir, "upload-dir-internal", "", "")
goflag.BoolVar(&useMount, "cache-use-mount", false, "Test only with mount") goflag.BoolVar(&useMount, "cache-use-mount", false, "Test only with mount")
@ -109,8 +110,10 @@ func TestInternalListRootAndInnerRemotes(t *testing.T) {
defer runInstance.cleanupFs(t, rootFs2, boltDb2) defer runInstance.cleanupFs(t, rootFs2, boltDb2)
runInstance.writeObjectString(t, rootFs2, "one", "content") runInstance.writeObjectString(t, rootFs2, "one", "content")
listRoot := runInstance.list(t, rootFs, "") listRoot, err := runInstance.list(t, rootFs, "")
listRootInner := runInstance.list(t, rootFs, innerFolder) require.NoError(t, err)
listRootInner, err := runInstance.list(t, rootFs, innerFolder)
require.NoError(t, err)
listInner, err := rootFs2.List("") listInner, err := rootFs2.List("")
require.NoError(t, err) require.NoError(t, err)
@ -119,6 +122,104 @@ func TestInternalListRootAndInnerRemotes(t *testing.T) {
require.Len(t, listInner, 1) require.Len(t, listInner, 1)
} }
func TestInternalVfsCache(t *testing.T) {
vfsflags.Opt.DirCacheTime = time.Second * 30
testSize := int64(524288000)
vfsflags.Opt.CacheMode = vfs.CacheModeWrites
id := "tiuufo"
rootFs, boltDb := runInstance.newCacheFs(t, remoteName, id, true, true, nil, map[string]string{"cache-writes": "true", "cache-info-age": "1h"})
defer runInstance.cleanupFs(t, rootFs, boltDb)
err := rootFs.Mkdir("test")
require.NoError(t, err)
runInstance.writeObjectString(t, rootFs, "test/second", "content")
_, err = rootFs.List("test")
require.NoError(t, err)
testReader := runInstance.randomReader(t, testSize)
writeCh := make(chan interface{})
//write2Ch := make(chan interface{})
readCh := make(chan interface{})
cacheCh := make(chan interface{})
// write the main file
go func() {
defer func() {
writeCh <- true
}()
log.Printf("========== started writing file 'test/one'")
runInstance.writeRemoteReader(t, rootFs, "test/one", testReader)
log.Printf("========== done writing file 'test/one'")
}()
// routine to check which cache has what, autostarts
go func() {
for {
select {
case <-cacheCh:
log.Printf("========== finished checking caches")
return
default:
}
li2 := [2]string{path.Join("test", "one"), path.Join("test", "second")}
for _, r := range li2 {
var err error
ci, err := ioutil.ReadDir(path.Join(runInstance.chunkPath, runInstance.encryptRemoteIfNeeded(t, path.Join(id, r))))
if err != nil || len(ci) == 0 {
log.Printf("========== '%v' not in cache", r)
} else {
log.Printf("========== '%v' IN CACHE", r)
}
_, err = os.Stat(path.Join(runInstance.vfsCachePath, id, r))
if err != nil {
log.Printf("========== '%v' not in vfs", r)
} else {
log.Printf("========== '%v' IN VFS", r)
}
}
time.Sleep(time.Second * 10)
}
}()
// routine to list, autostarts
go func() {
for {
select {
case <-readCh:
log.Printf("========== finished checking listings and readings")
return
default:
}
li, err := runInstance.list(t, rootFs, "test")
if err != nil {
log.Printf("========== error listing 'test' folder: %v", err)
} else {
log.Printf("========== list 'test' folder count: %v", len(li))
}
time.Sleep(time.Second * 10)
}
}()
// wait for main file to be written
<-writeCh
log.Printf("========== waiting for VFS to expire")
time.Sleep(time.Second * 120)
// try a final read
li2 := [2]string{"test/one", "test/second"}
for _, r := range li2 {
_, err := runInstance.readDataFromRemote(t, rootFs, r, int64(0), int64(2), false)
if err != nil {
log.Printf("========== error reading '%v': %v", r, err)
} else {
log.Printf("========== read '%v'", r)
}
}
// close the cache and list checkers
cacheCh <- true
readCh <- true
}
func TestInternalObjWrapFsFound(t *testing.T) { func TestInternalObjWrapFsFound(t *testing.T) {
id := fmt.Sprintf("tiowff%v", time.Now().Unix()) id := fmt.Sprintf("tiowff%v", time.Now().Unix())
rootFs, boltDb := runInstance.newCacheFs(t, remoteName, id, true, true, nil, nil) rootFs, boltDb := runInstance.newCacheFs(t, remoteName, id, true, true, nil, nil)
@ -137,15 +238,18 @@ func TestInternalObjWrapFsFound(t *testing.T) {
} }
runInstance.writeObjectBytes(t, wrappedFs, runInstance.encryptRemoteIfNeeded(t, "test"), testData) runInstance.writeObjectBytes(t, wrappedFs, runInstance.encryptRemoteIfNeeded(t, "test"), testData)
listRoot := runInstance.list(t, rootFs, "") listRoot, err := runInstance.list(t, rootFs, "")
require.NoError(t, err)
require.Len(t, listRoot, 1) require.Len(t, listRoot, 1)
cachedData := runInstance.readDataFromRemote(t, rootFs, "test", 0, int64(len([]byte("test content"))), false) cachedData, err := runInstance.readDataFromRemote(t, rootFs, "test", 0, int64(len([]byte("test content"))), false)
require.NoError(t, err)
require.Equal(t, "test content", string(cachedData)) require.Equal(t, "test content", string(cachedData))
err = runInstance.rm(t, rootFs, "test") err = runInstance.rm(t, rootFs, "test")
require.NoError(t, err) require.NoError(t, err)
listRoot = runInstance.list(t, rootFs, "") listRoot, err = runInstance.list(t, rootFs, "")
require.NoError(t, err)
require.Len(t, listRoot, 0) require.Len(t, listRoot, 0)
} }
@ -179,7 +283,8 @@ func TestInternalRemoteWrittenFileFoundInMount(t *testing.T) {
} }
runInstance.writeObjectBytes(t, cfs.UnWrap(), runInstance.encryptRemoteIfNeeded(t, "test"), testData) runInstance.writeObjectBytes(t, cfs.UnWrap(), runInstance.encryptRemoteIfNeeded(t, "test"), testData)
data := runInstance.readDataFromRemote(t, rootFs, "test", 0, int64(len([]byte("test content"))), false) data, err := runInstance.readDataFromRemote(t, rootFs, "test", 0, int64(len([]byte("test content"))), false)
require.NoError(t, err)
require.Equal(t, "test content", string(data)) require.Equal(t, "test content", string(data))
} }
@ -202,7 +307,8 @@ func TestInternalCachedWrittenContentMatches(t *testing.T) {
sampleStart := chunkSize / 2 sampleStart := chunkSize / 2
sampleEnd := chunkSize sampleEnd := chunkSize
testSample := testData[sampleStart:sampleEnd] testSample := testData[sampleStart:sampleEnd]
checkSample := runInstance.readDataFromRemote(t, rootFs, "data.bin", sampleStart, sampleEnd, false) checkSample, err := runInstance.readDataFromRemote(t, rootFs, "data.bin", sampleStart, sampleEnd, false)
require.NoError(t, err)
require.Equal(t, int64(len(checkSample)), sampleEnd-sampleStart) require.Equal(t, int64(len(checkSample)), sampleEnd-sampleStart)
require.Equal(t, checkSample, testSample) require.Equal(t, checkSample, testSample)
} }
@ -231,7 +337,8 @@ func TestInternalCachedUpdatedContentMatches(t *testing.T) {
require.Equal(t, o.Size(), int64(len(testData2))) require.Equal(t, o.Size(), int64(len(testData2)))
// check data from in-file // check data from in-file
checkSample := runInstance.readDataFromRemote(t, rootFs, "data.bin", 0, int64(len(testData2)), false) checkSample, err := runInstance.readDataFromRemote(t, rootFs, "data.bin", 0, int64(len(testData2)), false)
require.NoError(t, err)
require.Equal(t, checkSample, testData2) require.Equal(t, checkSample, testData2)
} }
@ -257,14 +364,16 @@ func TestInternalWrappedWrittenContentMatches(t *testing.T) {
require.Equal(t, o.Size(), int64(testSize)) require.Equal(t, o.Size(), int64(testSize))
time.Sleep(time.Second * 3) time.Sleep(time.Second * 3)
data2 := runInstance.readDataFromRemote(t, rootFs, "data.bin", 0, int64(testSize), false) data2, err := runInstance.readDataFromRemote(t, rootFs, "data.bin", 0, int64(testSize), false)
require.NoError(t, err)
require.Equal(t, int64(len(data2)), o.Size()) require.Equal(t, int64(len(data2)), o.Size())
// check sample of data from in-file // check sample of data from in-file
sampleStart := chunkSize / 2 sampleStart := chunkSize / 2
sampleEnd := chunkSize sampleEnd := chunkSize
testSample := testData[sampleStart:sampleEnd] testSample := testData[sampleStart:sampleEnd]
checkSample := runInstance.readDataFromRemote(t, rootFs, "data.bin", sampleStart, sampleEnd, false) checkSample, err := runInstance.readDataFromRemote(t, rootFs, "data.bin", sampleStart, sampleEnd, false)
require.NoError(t, err)
require.Equal(t, len(checkSample), len(testSample)) require.Equal(t, len(checkSample), len(testSample))
for i := 0; i < len(checkSample); i++ { for i := 0; i < len(checkSample); i++ {
@ -293,7 +402,8 @@ func TestInternalLargeWrittenContentMatches(t *testing.T) {
runInstance.writeObjectBytes(t, cfs.UnWrap(), "data.bin", testData) runInstance.writeObjectBytes(t, cfs.UnWrap(), "data.bin", testData)
time.Sleep(time.Second * 3) time.Sleep(time.Second * 3)
readData := runInstance.readDataFromRemote(t, rootFs, "data.bin", 0, testSize, false) readData, err := runInstance.readDataFromRemote(t, rootFs, "data.bin", 0, testSize, false)
require.NoError(t, err)
for i := 0; i < len(readData); i++ { for i := 0; i < len(readData); i++ {
require.Equalf(t, testData[i], readData[i], "at byte %v", i) require.Equalf(t, testData[i], readData[i], "at byte %v", i)
} }
@ -320,9 +430,23 @@ func TestInternalWrappedFsChangeNotSeen(t *testing.T) {
require.NoError(t, err) require.NoError(t, err)
// get a new instance from the cache // get a new instance from the cache
co, err := rootFs.NewObject("data.bin") if runInstance.wrappedIsExternal {
require.NoError(t, err) err = runInstance.retryBlock(func() error {
require.NotEqual(t, co.ModTime().String(), o.ModTime().String()) coModTime, err := runInstance.modTime(t, rootFs, "data.bin")
if err != nil {
return err
}
if coModTime.Unix() != o.ModTime().Unix() {
return errors.Errorf("%v <> %v", coModTime, o.ModTime())
}
return nil
}, 12, time.Second*10)
require.NoError(t, err)
} else {
coModTime, err := runInstance.modTime(t, rootFs, "data.bin")
require.NoError(t, err)
require.NotEqual(t, coModTime.Unix(), o.ModTime().Unix())
}
} }
func TestInternalChangeSeenAfterDirCacheFlush(t *testing.T) { func TestInternalChangeSeenAfterDirCacheFlush(t *testing.T) {
@ -425,17 +549,22 @@ func TestInternalExpiredEntriesRemoved(t *testing.T) {
runInstance.mkdir(t, rootFs, "test") runInstance.mkdir(t, rootFs, "test")
runInstance.writeRemoteString(t, rootFs, "test/second", "second content") runInstance.writeRemoteString(t, rootFs, "test/second", "second content")
l := runInstance.list(t, rootFs, "test") l, err := runInstance.list(t, rootFs, "test")
require.NoError(t, err)
require.Len(t, l, 1) require.Len(t, l, 1)
err = cfs.UnWrap().Mkdir(runInstance.encryptRemoteIfNeeded(t, "test/third")) err = cfs.UnWrap().Mkdir(runInstance.encryptRemoteIfNeeded(t, "test/third"))
require.NoError(t, err) require.NoError(t, err)
l = runInstance.list(t, rootFs, "test") l, err = runInstance.list(t, rootFs, "test")
require.NoError(t, err)
require.Len(t, l, 1) require.Len(t, l, 1)
err = runInstance.retryBlock(func() error { err = runInstance.retryBlock(func() error {
l = runInstance.list(t, rootFs, "test") l, err = runInstance.list(t, rootFs, "test")
if err != nil {
return err
}
if len(l) != 2 { if len(l) != 2 {
return errors.New("list is not 2") return errors.New("list is not 2")
} }
@ -470,7 +599,8 @@ func testInternalUploadQueueOneFile(t *testing.T, id string, rootFs fs.Fs, boltD
} else { } else {
require.Equal(t, testSize, ti.Size()) require.Equal(t, testSize, ti.Size())
} }
de1 := runInstance.list(t, rootFs, "") de1, err := runInstance.list(t, rootFs, "")
require.NoError(t, err)
require.Len(t, de1, 1) require.Len(t, de1, 1)
runInstance.completeBackgroundUpload(t, "one", bu) runInstance.completeBackgroundUpload(t, "one", bu)
@ -479,7 +609,8 @@ func testInternalUploadQueueOneFile(t *testing.T, id string, rootFs fs.Fs, boltD
require.True(t, os.IsNotExist(err)) require.True(t, os.IsNotExist(err))
// check if it can be read // check if it can be read
data2 := runInstance.readDataFromRemote(t, rootFs, "one", 0, int64(1024), false) data2, err := runInstance.readDataFromRemote(t, rootFs, "one", 0, int64(1024), false)
require.NoError(t, err)
require.Len(t, data2, 1024) require.Len(t, data2, 1024)
} }
@ -536,7 +667,8 @@ func TestInternalUploadQueueMoreFiles(t *testing.T) {
} }
// check if cache lists all files, likely temp upload didn't finish yet // check if cache lists all files, likely temp upload didn't finish yet
de1 := runInstance.list(t, rootFs, "test") de1, err := runInstance.list(t, rootFs, "test")
require.NoError(t, err)
require.Len(t, de1, totalFiles) require.Len(t, de1, totalFiles)
// wait for background uploader to do its thing // wait for background uploader to do its thing
@ -548,7 +680,8 @@ func TestInternalUploadQueueMoreFiles(t *testing.T) {
require.Len(t, tf, 0) require.Len(t, tf, 0)
// check if cache lists all files // check if cache lists all files
de1 = runInstance.list(t, rootFs, "test") de1, err = runInstance.list(t, rootFs, "test")
require.NoError(t, err)
require.Len(t, de1, totalFiles) require.Len(t, de1, totalFiles)
} }
@ -566,10 +699,11 @@ func TestInternalUploadTempFileOperations(t *testing.T) {
runInstance.writeRemoteString(t, rootFs, "test/one", "one content") runInstance.writeRemoteString(t, rootFs, "test/one", "one content")
// check if it can be read // check if it can be read
data1 := runInstance.readDataFromRemote(t, rootFs, "test/one", 0, int64(len([]byte("one content"))), false) data1, err := runInstance.readDataFromRemote(t, rootFs, "test/one", 0, int64(len([]byte("one content"))), false)
require.NoError(t, err)
require.Equal(t, []byte("one content"), data1) require.Equal(t, []byte("one content"), data1)
// validate that it exists in temp fs // validate that it exists in temp fs
_, err := os.Stat(path.Join(runInstance.tmpUploadDir, id, runInstance.encryptRemoteIfNeeded(t, "test/one"))) _, err = os.Stat(path.Join(runInstance.tmpUploadDir, id, runInstance.encryptRemoteIfNeeded(t, "test/one")))
require.NoError(t, err) require.NoError(t, err)
// test DirMove - allowed // test DirMove - allowed
@ -616,7 +750,8 @@ func TestInternalUploadTempFileOperations(t *testing.T) {
require.Error(t, err) require.Error(t, err)
_, err = rootFs.NewObject("test/second") _, err = rootFs.NewObject("test/second")
require.NoError(t, err) require.NoError(t, err)
data2 := runInstance.readDataFromRemote(t, rootFs, "test/second", 0, int64(len([]byte("one content"))), false) data2, err := runInstance.readDataFromRemote(t, rootFs, "test/second", 0, int64(len([]byte("one content"))), false)
require.NoError(t, err)
require.Equal(t, []byte("one content"), data2) require.Equal(t, []byte("one content"), data2)
// validate that it exists in temp fs // validate that it exists in temp fs
_, err = os.Stat(path.Join(runInstance.tmpUploadDir, id, runInstance.encryptRemoteIfNeeded(t, "test/one"))) _, err = os.Stat(path.Join(runInstance.tmpUploadDir, id, runInstance.encryptRemoteIfNeeded(t, "test/one")))
@ -634,7 +769,8 @@ func TestInternalUploadTempFileOperations(t *testing.T) {
require.NoError(t, err) require.NoError(t, err)
_, err = rootFs.NewObject("test/third") _, err = rootFs.NewObject("test/third")
require.NoError(t, err) require.NoError(t, err)
data2 := runInstance.readDataFromRemote(t, rootFs, "test/third", 0, int64(len([]byte("one content"))), false) data2, err := runInstance.readDataFromRemote(t, rootFs, "test/third", 0, int64(len([]byte("one content"))), false)
require.NoError(t, err)
require.Equal(t, []byte("one content"), data2) require.Equal(t, []byte("one content"), data2)
// validate that it exists in temp fs // validate that it exists in temp fs
_, err = os.Stat(path.Join(runInstance.tmpUploadDir, id, runInstance.encryptRemoteIfNeeded(t, "test/one"))) _, err = os.Stat(path.Join(runInstance.tmpUploadDir, id, runInstance.encryptRemoteIfNeeded(t, "test/one")))
@ -692,10 +828,11 @@ func TestInternalUploadUploadingFileOperations(t *testing.T) {
runInstance.writeRemoteString(t, rootFs, "test/one", "one content") runInstance.writeRemoteString(t, rootFs, "test/one", "one content")
// check if it can be read // check if it can be read
data1 := runInstance.readDataFromRemote(t, rootFs, "test/one", 0, int64(len([]byte("one content"))), false) data1, err := runInstance.readDataFromRemote(t, rootFs, "test/one", 0, int64(len([]byte("one content"))), false)
require.NoError(t, err)
require.Equal(t, []byte("one content"), data1) require.Equal(t, []byte("one content"), data1)
// validate that it exists in temp fs // validate that it exists in temp fs
_, err := os.Stat(path.Join(runInstance.tmpUploadDir, id, runInstance.encryptRemoteIfNeeded(t, "test/one"))) _, err = os.Stat(path.Join(runInstance.tmpUploadDir, id, runInstance.encryptRemoteIfNeeded(t, "test/one")))
require.NoError(t, err) require.NoError(t, err)
err = boltDb.SetPendingUploadToStarted(runInstance.encryptRemoteIfNeeded(t, path.Join(rootFs.Root(), "test/one"))) err = boltDb.SetPendingUploadToStarted(runInstance.encryptRemoteIfNeeded(t, path.Join(rootFs.Root(), "test/one")))
@ -747,7 +884,8 @@ func TestInternalUploadUploadingFileOperations(t *testing.T) {
require.NoError(t, err) require.NoError(t, err)
_, err = rootFs.NewObject("test/third") _, err = rootFs.NewObject("test/third")
require.NoError(t, err) require.NoError(t, err)
data2 := runInstance.readDataFromRemote(t, rootFs, "test/third", 0, int64(len([]byte("one content"))), false) data2, err := runInstance.readDataFromRemote(t, rootFs, "test/third", 0, int64(len([]byte("one content"))), false)
require.NoError(t, err)
require.Equal(t, []byte("one content"), data2) require.Equal(t, []byte("one content"), data2)
// validate that it exists in temp fs // validate that it exists in temp fs
_, err = os.Stat(path.Join(runInstance.tmpUploadDir, id, runInstance.encryptRemoteIfNeeded(t, "test/one"))) _, err = os.Stat(path.Join(runInstance.tmpUploadDir, id, runInstance.encryptRemoteIfNeeded(t, "test/one")))
@ -867,6 +1005,9 @@ type run struct {
unmountRes chan error unmountRes chan error
vfs *vfs.VFS vfs *vfs.VFS
tempFiles []*os.File tempFiles []*os.File
dbPath string
chunkPath string
vfsCachePath string
} }
func newRun() *run { func newRun() *run {
@ -1012,9 +1153,10 @@ func (r *run) newCacheFs(t *testing.T, remote, id string, needRemote, purge bool
} }
} }
runInstance.rootIsCrypt = rootIsCrypt runInstance.rootIsCrypt = rootIsCrypt
dbPath := filepath.Join(config.CacheDir, "cache-backend", cacheRemote+".db") runInstance.dbPath = filepath.Join(config.CacheDir, "cache-backend", cacheRemote+".db")
chunkPath := filepath.Join(config.CacheDir, "cache-backend", cacheRemote) runInstance.chunkPath = filepath.Join(config.CacheDir, "cache-backend", cacheRemote)
boltDb, err := cache.GetPersistent(dbPath, chunkPath, &cache.Features{PurgeDb: true}) runInstance.vfsCachePath = filepath.Join(config.CacheDir, "vfs", remote)
boltDb, err := cache.GetPersistent(runInstance.dbPath, runInstance.chunkPath, &cache.Features{PurgeDb: true})
require.NoError(t, err) require.NoError(t, err)
for k, v := range r.runDefaultCfgMap { for k, v := range r.runDefaultCfgMap {
@ -1046,7 +1188,7 @@ func (r *run) newCacheFs(t *testing.T, remote, id string, needRemote, purge bool
_, isCrypt := cfs.Features().UnWrap().(*crypt.Fs) _, isCrypt := cfs.Features().UnWrap().(*crypt.Fs)
_, isLocal := cfs.Features().UnWrap().(*local.Fs) _, isLocal := cfs.Features().UnWrap().(*local.Fs)
if isCache || isCrypt || isLocal { if isCache || isCrypt || isLocal {
r.wrappedIsExternal = true r.wrappedIsExternal = false
} else { } else {
r.wrappedIsExternal = true r.wrappedIsExternal = true
} }
@ -1230,7 +1372,7 @@ func (r *run) updateObjectRemote(t *testing.T, f fs.Fs, remote string, data1 []b
return obj return obj
} }
func (r *run) readDataFromRemote(t *testing.T, f fs.Fs, remote string, offset, end int64, noLengthCheck bool) []byte { func (r *run) readDataFromRemote(t *testing.T, f fs.Fs, remote string, offset, end int64, noLengthCheck bool) ([]byte, error) {
size := end - offset size := end - offset
checkSample := make([]byte, size) checkSample := make([]byte, size)
@ -1239,27 +1381,32 @@ func (r *run) readDataFromRemote(t *testing.T, f fs.Fs, remote string, offset, e
defer func() { defer func() {
_ = f.Close() _ = f.Close()
}() }()
require.NoError(t, err) if err != nil {
return checkSample, err
}
_, _ = f.Seek(offset, 0) _, _ = f.Seek(offset, 0)
totalRead, err := io.ReadFull(f, checkSample) totalRead, err := io.ReadFull(f, checkSample)
checkSample = checkSample[:totalRead] checkSample = checkSample[:totalRead]
if err == io.EOF || err == io.ErrUnexpectedEOF { if err == io.EOF || err == io.ErrUnexpectedEOF {
err = nil err = nil
} }
require.NoError(t, err) if err != nil {
if !noLengthCheck { return checkSample, err
require.Equal(t, size, int64(totalRead)) }
if !noLengthCheck && size != int64(totalRead) {
return checkSample, errors.Errorf("read size doesn't match expected: %v <> %v", totalRead, size)
} }
require.NoError(t, err)
} else { } else {
co, err := f.NewObject(remote) co, err := f.NewObject(remote)
require.NoError(t, err) if err != nil {
return checkSample, err
}
checkSample = r.readDataFromObj(t, co, offset, end, noLengthCheck) checkSample = r.readDataFromObj(t, co, offset, end, noLengthCheck)
} }
if !noLengthCheck { if !noLengthCheck && size != int64(len(checkSample)) {
require.Equal(t, size, int64(len(checkSample)), "wrong data read size from file") return checkSample, errors.Errorf("read size doesn't match expected: %v <> %v", len(checkSample), size)
} }
return checkSample return checkSample, nil
} }
func (r *run) readDataFromObj(t *testing.T, o fs.Object, offset, end int64, noLengthCheck bool) []byte { func (r *run) readDataFromObj(t *testing.T, o fs.Object, offset, end int64, noLengthCheck bool) []byte {
@ -1305,7 +1452,7 @@ func (r *run) rm(t *testing.T, f fs.Fs, remote string) error {
return err return err
} }
func (r *run) list(t *testing.T, f fs.Fs, remote string) []interface{} { func (r *run) list(t *testing.T, f fs.Fs, remote string) ([]interface{}, error) {
var err error var err error
var l []interface{} var l []interface{}
if r.useMount { if r.useMount {
@ -1321,8 +1468,7 @@ func (r *run) list(t *testing.T, f fs.Fs, remote string) []interface{} {
l = append(l, ll) l = append(l, ll)
} }
} }
require.NoError(t, err) return l, err
return l
} }
func (r *run) listPath(t *testing.T, f fs.Fs, remote string) []string { func (r *run) listPath(t *testing.T, f fs.Fs, remote string) []string {

View file

@ -249,7 +249,7 @@ func (r *Handle) getChunk(chunkStart int64) ([]byte, error) {
if !found { if !found {
// we're gonna give the workers a chance to pickup the chunk // we're gonna give the workers a chance to pickup the chunk
// and retry a couple of times // and retry a couple of times
for i := 0; i < r.cacheFs().readRetries*2; i++ { for i := 0; i < r.cacheFs().readRetries*8; i++ {
data, err = r.storage().GetChunk(r.cachedObject, chunkStart) data, err = r.storage().GetChunk(r.cachedObject, chunkStart)
if err == nil { if err == nil {
found = true found = true
@ -656,6 +656,7 @@ func (b *backgroundWriter) run() {
if err != nil { if err != nil {
fs.Errorf(parentCd, "background upload: cache expire error: %v", err) fs.Errorf(parentCd, "background upload: cache expire error: %v", err)
} }
b.fs.notifyDirChange(remote)
fs.Infof(remote, "finished background upload") fs.Infof(remote, "finished background upload")
b.notify(remote, BackgroundUploadCompleted, nil) b.notify(remote, BackgroundUploadCompleted, nil)
} }

View file

@ -272,7 +272,12 @@ func (o *Object) Remove() error {
fs.Debugf(o, "removing object") fs.Debugf(o, "removing object")
_ = o.CacheFs.cache.RemoveObject(o.abs()) _ = o.CacheFs.cache.RemoveObject(o.abs())
_ = o.CacheFs.cache.removePendingUpload(o.abs()) _ = o.CacheFs.cache.removePendingUpload(o.abs())
_ = o.CacheFs.cache.ExpireDir(NewDirectory(o.CacheFs, cleanPath(path.Dir(o.Remote())))) parentCd := NewDirectory(o.CacheFs, cleanPath(path.Dir(o.Remote())))
_ = o.CacheFs.cache.ExpireDir(parentCd)
// advertise to DirChangeNotify if wrapped doesn't do that
if o.CacheFs.Fs.Features().DirChangeNotify == nil {
o.CacheFs.notifyDirChangeUpstream(parentCd.Remote())
}
return nil return nil
} }