diff --git a/cache/cache.go b/cache/cache.go index d5d0e8e38..b62bb22db 100644 --- a/cache/cache.go +++ b/cache/cache.go @@ -172,7 +172,7 @@ type Storage interface { RemoveDir(fp string) error // remove a directory and all the objects and chunks in it - ExpireDir(fp string) error + ExpireDir(cd *Directory) error // will return an object (file) or error if it doesn't find it GetObject(cachedObject *Object) (err error) @@ -187,10 +187,6 @@ type Storage interface { // Stats returns stats about the cache storage Stats() (map[string]map[string]interface{}, error) - // if the storage can cleanup on a cron basis - // otherwise it can do a noop operation - CleanEntriesByAge(entryAge time.Duration) - // Purge will flush the entire cache Purge() @@ -219,7 +215,6 @@ type Fs struct { cacheWrites bool lastChunkCleanup time.Time - lastRootCleanup time.Time cleanupMu sync.Mutex rateLimiter *rate.Limiter plexConnector *plexConnector @@ -231,7 +226,6 @@ func NewFs(name, rpath string) (fs.Fs, error) { if strings.HasPrefix(remote, name+":") { return nil, errors.New("can't point cache remote at itself - check the value of the remote setting") } - // Look for a file first remotePath := path.Join(remote, rpath) wrappedFs, wrapErr := fs.NewFs(remotePath) @@ -291,7 +285,6 @@ func NewFs(name, rpath string) (fs.Fs, error) { chunkMemory: !*cacheChunkNoMemory, cacheWrites: *cacheStoreWrites, lastChunkCleanup: time.Now().Truncate(time.Hour * 24 * 30), - lastRootCleanup: time.Now().Truncate(time.Hour * 24 * 30), } if f.chunkTotalSize < (f.chunkSize * int64(f.totalWorkers)) { return nil, errors.Errorf("don't set cache-total-chunk-size(%v) less than cache-chunk-size(%v) * cache-workers(%v)", @@ -380,7 +373,6 @@ func NewFs(name, rpath string) (fs.Fs, error) { Move: f.Move, DirMove: f.DirMove, DirChangeNotify: nil, - DirCacheFlush: f.DirCacheFlush, PutUnchecked: f.PutUnchecked, PutStream: f.PutStream, CleanUp: f.CleanUp, @@ -388,6 +380,7 @@ func NewFs(name, rpath string) (fs.Fs, error) { WrapFs: f.WrapFs, SetWrapper: f.SetWrapper, }).Fill(f).Mask(wrappedFs).WrapsFs(f, wrappedFs) + f.features.DirCacheFlush = f.DirCacheFlush return f, wrapErr } @@ -421,7 +414,12 @@ func (f *Fs) ChunkSize() int64 { func (f *Fs) NewObject(remote string) (fs.Object, error) { co := NewObject(f, remote) err := f.cache.GetObject(co) - if err == nil { + if err != nil { + fs.Debugf(remote, "find: error: %v", err) + } else if time.Now().After(co.CacheTs.Add(f.fileAge)) { + fs.Debugf(remote, "find: cold object ts: %v", co.CacheTs) + } else { + fs.Debugf(remote, "find: warm object ts: %v", co.CacheTs) return co, nil } obj, err := f.Fs.NewObject(remote) @@ -438,13 +436,17 @@ func (f *Fs) List(dir string) (entries fs.DirEntries, err error) { // clean cache go f.CleanUpCache(false) - cd := NewDirectory(f, dir) + cd := ShallowDirectory(f, dir) entries, err = f.cache.GetDirEntries(cd) if err != nil { - fs.Debugf(dir, "no dir entries in cache: %v", err) + fs.Debugf(dir, "list: error: %v", err) + } else if time.Now().After(cd.CacheTs.Add(f.fileAge)) { + fs.Debugf(dir, "list: cold listing: %v", cd.CacheTs) } else if len(entries) == 0 { // TODO: read empty dirs from source? + fs.Debugf(dir, "list: empty listing") } else { + fs.Debugf(dir, "list: warm %v from cache for: %v, ts: %v", len(entries), cd.abs(), cd.CacheTs) return entries, nil } @@ -452,6 +454,7 @@ func (f *Fs) List(dir string) (entries fs.DirEntries, err error) { if err != nil { return nil, err } + fs.Debugf(dir, "list: read %v from source", len(entries)) var cachedEntries fs.DirEntries for _, entry := range entries { @@ -470,6 +473,13 @@ func (f *Fs) List(dir string) (entries fs.DirEntries, err error) { } if err != nil { fs.Errorf(dir, "err caching listing: %v", err) + } else { + t := time.Now() + cd.CacheTs = &t + err := f.cache.AddDir(cd) + if err != nil { + fs.Errorf(cd, "list: save error: %v", err) + } } return cachedEntries, nil @@ -548,8 +558,17 @@ func (f *Fs) Mkdir(dir string) error { } fs.Infof(f, "create dir '%s'", dir) - // make an empty dir - _ = f.cache.AddDir(NewDirectory(f, dir)) + // expire parent of new dir + cd := NewDirectory(f, cleanPath(dir)) + err = f.cache.AddDir(cd) + if err != nil { + fs.Errorf(dir, "mkdir: add error: %v", err) + } + parentCd := NewDirectory(f, cleanPath(path.Dir(dir))) + err = f.cache.ExpireDir(parentCd) + if err != nil { + fs.Errorf(dir, "mkdir: expire error: %v", err) + } // clean cache go f.CleanUpCache(false) @@ -562,8 +581,20 @@ func (f *Fs) Rmdir(dir string) error { if err != nil { return err } + fs.Infof(f, "rm dir '%s'", dir) - _ = f.cache.RemoveDir(NewDirectory(f, dir).abs()) + // remove dir data + d := NewDirectory(f, dir) + err = f.cache.RemoveDir(d.abs()) + if err != nil { + fs.Errorf(dir, "rmdir: remove error: %v", err) + } + // expire parent + parentCd := NewDirectory(f, cleanPath(path.Dir(dir))) + err = f.cache.ExpireDir(parentCd) + if err != nil { + fs.Errorf(dir, "rmdir: expire error: %v", err) + } // clean cache go f.CleanUpCache(false) @@ -593,12 +624,26 @@ func (f *Fs) DirMove(src fs.Fs, srcRemote, dstRemote string) error { return err } + // delete src dir from cache along with all chunks srcDir := NewDirectory(srcFs, srcRemote) - // clear any likely dir cached - _ = f.cache.ExpireDir(srcDir.parentRemote()) - _ = f.cache.ExpireDir(NewDirectory(srcFs, dstRemote).parentRemote()) - // delete src dir - _ = f.cache.RemoveDir(srcDir.abs()) + err = f.cache.RemoveDir(srcDir.abs()) + if err != nil { + fs.Errorf(srcRemote, "dirmove: remove error: %v", err) + } + // expire src parent + srcParent := NewDirectory(f, cleanPath(path.Dir(srcRemote))) + err = f.cache.ExpireDir(srcParent) + if err != nil { + fs.Errorf(srcRemote, "dirmove: expire error: %v", err) + } + + // expire parent dir at the destination path + dstParent := NewDirectory(f, cleanPath(path.Dir(dstRemote))) + err = f.cache.ExpireDir(dstParent) + if err != nil { + fs.Errorf(dstRemote, "dirmove: expire error: %v", err) + } + // TODO: precache dst dir and save the chunks // clean cache go f.CleanUpCache(false) @@ -682,12 +727,16 @@ func (f *Fs) put(in io.Reader, src fs.ObjectInfo, options []fs.OpenOption, put p } else { obj, err = put(in, src, options...) } - if err != nil { fs.Errorf(src, "error saving in cache: %v", err) return nil, err } cachedObj := ObjectFromOriginal(f, obj).persist() + // expire parent + err = f.cache.ExpireDir(cachedObj.parentDir()) + if err != nil { + fs.Errorf(cachedObj, "put: expire error: %v", err) + } // clean cache go f.CleanUpCache(false) @@ -696,7 +745,7 @@ func (f *Fs) put(in io.Reader, src fs.ObjectInfo, options []fs.OpenOption, put p // Put in to the remote path with the modTime given of the given size func (f *Fs) Put(in io.Reader, src fs.ObjectInfo, options ...fs.OpenOption) (fs.Object, error) { - fs.Debugf(f, "put data at '%s'", src.Remote()) + fs.Infof(f, "put data at '%s'", src.Remote()) return f.put(in, src, options, f.Fs.Put) } @@ -737,7 +786,6 @@ func (f *Fs) Copy(src fs.Object, remote string) (fs.Object, error) { fs.Errorf(srcObj, "can't copy - not wrapping same remote types") return nil, fs.ErrorCantCopy } - fs.Infof(f, "copy obj '%s' -> '%s'", srcObj.abs(), remote) // store in cache @@ -752,12 +800,23 @@ func (f *Fs) Copy(src fs.Object, remote string) (fs.Object, error) { } // persist new - cachedObj := ObjectFromOriginal(f, obj).persist() - _ = f.cache.ExpireDir(cachedObj.parentRemote()) + co := ObjectFromOriginal(f, obj).persist() + // expire the destination path + err = f.cache.ExpireDir(co.parentDir()) + if err != nil { + fs.Errorf(co, "copy: expire error: %v", err) + } + + // expire src parent + srcParent := NewDirectory(f, cleanPath(path.Dir(src.Remote()))) + err = f.cache.ExpireDir(srcParent) + if err != nil { + fs.Errorf(src, "copy: expire error: %v", err) + } // clean cache go f.CleanUpCache(false) - return cachedObj, nil + return co, nil } // Move src to this remote using server side move operations. @@ -773,12 +832,10 @@ func (f *Fs) Move(src fs.Object, remote string) (fs.Object, error) { fs.Errorf(srcObj, "can't move - not same remote type") return nil, fs.ErrorCantMove } - if srcObj.CacheFs.Fs.Name() != f.Fs.Name() { fs.Errorf(srcObj, "can't move - not wrapping same remote types") return nil, fs.ErrorCantMove } - fs.Infof(f, "moving obj '%s' -> %s", srcObj.abs(), remote) // save in cache @@ -793,13 +850,23 @@ func (f *Fs) Move(src fs.Object, remote string) (fs.Object, error) { } // remove old - _ = f.cache.ExpireDir(srcObj.parentRemote()) - _ = f.cache.RemoveObject(srcObj.abs()) + err = f.cache.RemoveObject(srcObj.abs()) + if err != nil { + fs.Errorf(srcObj, "move: remove error: %v", err) + } + // expire old parent + err = f.cache.ExpireDir(srcObj.parentDir()) + if err != nil { + fs.Errorf(srcObj, "move: expire error: %v", err) + } // persist new - cachedObj := ObjectFromOriginal(f, obj) - cachedObj.persist() - _ = f.cache.ExpireDir(cachedObj.parentRemote()) + cachedObj := ObjectFromOriginal(f, obj).persist() + // expire new parent + err = f.cache.ExpireDir(cachedObj.parentDir()) + if err != nil { + fs.Errorf(cachedObj, "move: expire error: %v", err) + } // clean cache go f.CleanUpCache(false) @@ -873,11 +940,6 @@ func (f *Fs) CleanUpCache(ignoreLastTs bool) { f.cache.CleanChunksBySize(f.chunkTotalSize) f.lastChunkCleanup = time.Now() } - - if ignoreLastTs || time.Now().After(f.lastRootCleanup.Add(f.fileAge/4)) { - f.cache.CleanEntriesByAge(f.fileAge) - f.lastRootCleanup = time.Now() - } } // UnWrap returns the Fs that this Fs is wrapping diff --git a/cache/cache_internal_test.go b/cache/cache_internal_test.go index 19a33e65a..55d946deb 100644 --- a/cache/cache_internal_test.go +++ b/cache/cache_internal_test.go @@ -10,12 +10,20 @@ import ( "math/rand" "path" "path/filepath" + "runtime" "strconv" - "sync" "testing" "time" + //"os" + "os/exec" + //"strings" + "github.com/ncw/rclone/cache" + //"github.com/ncw/rclone/cmd/mount" + //_ "github.com/ncw/rclone/cmd/cmount" + //"github.com/ncw/rclone/cmd/mountlib" + _ "github.com/ncw/rclone/drive" "github.com/ncw/rclone/fs" "github.com/ncw/rclone/fstest" "github.com/ncw/rclone/local" @@ -24,70 +32,21 @@ import ( ) var ( - WrapRemote = flag.String("wrap-remote", "", "Remote to wrap") - RemoteName = flag.String("remote-name", "TestCacheInternal", "Root remote") - rootFs fs.Fs - boltDb *cache.Persistent infoAge = time.Second * 10 chunkClean = time.Second okDiff = time.Second * 9 // really big diff here but the build machines seem to be slow. need a different way for this workers = 2 ) -// prepare the test server and return a function to tidy it up afterwards -func TestInternalInit(t *testing.T) { - var err error - - // delete the default path - dbPath := filepath.Join(fs.CacheDir, "cache-backend", *RemoteName+".db") - boltDb, err = cache.GetPersistent(dbPath, &cache.Features{PurgeDb: true}) - require.NoError(t, err) - fstest.Initialise() - - if len(*WrapRemote) == 0 { - *WrapRemote = "localInternal:/var/tmp/rclone-cache" - fs.ConfigFileSet("localInternal", "type", "local") - fs.ConfigFileSet("localInternal", "nounc", "true") - } - - remoteExists := false - for _, s := range fs.ConfigFileSections() { - if s == *RemoteName { - remoteExists = true - } - } - - if !remoteExists { - fs.ConfigFileSet(*RemoteName, "type", "cache") - fs.ConfigFileSet(*RemoteName, "remote", *WrapRemote) - fs.ConfigFileSet(*RemoteName, "chunk_size", "1024") - fs.ConfigFileSet(*RemoteName, "chunk_total_size", "2048") - fs.ConfigFileSet(*RemoteName, "info_age", infoAge.String()) - } - - _ = flag.Set("cache-chunk-no-memory", "true") - _ = flag.Set("cache-workers", strconv.Itoa(workers)) - _ = flag.Set("cache-chunk-clean-interval", chunkClean.String()) - - // Instantiate root - rootFs, err = fs.NewFs(*RemoteName + ":") - require.NoError(t, err) - _ = rootFs.Features().Purge() - require.NoError(t, err) - err = rootFs.Mkdir("") - require.NoError(t, err) - - // flush cache - _, err = getCacheFs(rootFs) - require.NoError(t, err) -} - func TestInternalListRootAndInnerRemotes(t *testing.T) { + rootFs, boltDb := newLocalCacheFs(t, "tilrair-local", "tilrair-cache", nil) + defer cleanupFs(t, rootFs, boltDb) + // Instantiate inner fs innerFolder := "inner" err := rootFs.Mkdir(innerFolder) require.NoError(t, err) - innerFs, err := fs.NewFs(*RemoteName + ":" + innerFolder) + innerFs, err := fs.NewFs("tilrair-cache:" + innerFolder) require.NoError(t, err) obj := writeObjectString(t, innerFs, "one", "content") @@ -105,14 +64,12 @@ func TestInternalListRootAndInnerRemotes(t *testing.T) { err = obj.Remove() require.NoError(t, err) - - err = innerFs.Features().Purge() - require.NoError(t, err) - innerFs = nil } func TestInternalObjWrapFsFound(t *testing.T) { - reset(t) + rootFs, boltDb := newLocalCacheFs(t, "tiowff-local", "tiowff-cache", nil) + defer cleanupFs(t, rootFs, boltDb) + cfs, err := getCacheFs(rootFs) require.NoError(t, err) wrappedFs := cfs.UnWrap() @@ -144,14 +101,18 @@ func TestInternalObjWrapFsFound(t *testing.T) { } func TestInternalObjNotFound(t *testing.T) { - reset(t) + rootFs, boltDb := newLocalCacheFs(t, "tionf-local", "tionf-cache", nil) + defer cleanupFs(t, rootFs, boltDb) + obj, err := rootFs.NewObject("404") require.Error(t, err) require.Nil(t, obj) } func TestInternalCachedWrittenContentMatches(t *testing.T) { - reset(t) + rootFs, boltDb := newLocalCacheFs(t, "ticwcm-local", "ticwcm-cache", nil) + defer cleanupFs(t, rootFs, boltDb) + cfs, err := getCacheFs(rootFs) require.NoError(t, err) chunkSize := cfs.ChunkSize() @@ -176,7 +137,8 @@ func TestInternalCachedWrittenContentMatches(t *testing.T) { } func TestInternalCachedUpdatedContentMatches(t *testing.T) { - reset(t) + rootFs, boltDb := newLocalCacheFs(t, "ticucm-local", "ticucm-cache", nil) + defer cleanupFs(t, rootFs, boltDb) // create some rand test data testData1 := []byte(fstest.RandomString(100)) @@ -196,12 +158,13 @@ func TestInternalCachedUpdatedContentMatches(t *testing.T) { } func TestInternalWrappedWrittenContentMatches(t *testing.T) { + rootFs, boltDb := newLocalCacheFs(t, "tiwwcm-local", "tiwwcm-cache", nil) + defer cleanupFs(t, rootFs, boltDb) + cfs, err := getCacheFs(rootFs) require.NoError(t, err) chunkSize := cfs.ChunkSize() - reset(t) - // create some rand test data testData := make([]byte, (chunkSize*4 + chunkSize/2)) testSize, err := rand.Read(testData) @@ -230,13 +193,13 @@ func TestInternalWrappedWrittenContentMatches(t *testing.T) { func TestInternalLargeWrittenContentMatches(t *testing.T) { t.Skip("FIXME disabled because it is unreliable") + rootFs, boltDb := newLocalCacheFs(t, "tilwcm-local", "tilwcm-cache", nil) + defer cleanupFs(t, rootFs, boltDb) cfs, err := getCacheFs(rootFs) require.NoError(t, err) chunkSize := cfs.ChunkSize() - reset(t) - // create some rand test data testData := make([]byte, (chunkSize*10 + chunkSize/2)) testSize, err := rand.Read(testData) @@ -260,8 +223,53 @@ func TestInternalLargeWrittenContentMatches(t *testing.T) { } } +func TestInternalLargeWrittenContentMatches2(t *testing.T) { + t.Skip("FIXME disabled because it is unreliable") + cryptFs, boltDb := newLocalCacheCryptFs(t, "tilwcm2-local", "tilwcm2-cache", "tilwcm2-crypt", true, nil) + defer cleanupFs(t, cryptFs, boltDb) + + cfs, err := getCacheFs(cryptFs) + require.NoError(t, err) + chunkSize := cfs.ChunkSize() + fileSize := 87197196 + readOffset := 87195648 + + // create some rand test data + testData := make([]byte, fileSize) + testSize, err := rand.Read(testData) + require.Equal(t, len(testData), testSize) + require.NoError(t, err) + + // write the object + o := writeObjectBytes(t, cryptFs, "data.bin", testData) + require.Equal(t, o.Size(), int64(testSize)) + + o2, err := cryptFs.NewObject("data.bin") + require.NoError(t, err) + require.Equal(t, o2.Size(), o.Size()) + + // check data from in-file + reader, err := o2.Open(&fs.SeekOption{Offset: int64(readOffset)}) + require.NoError(t, err) + rs, ok := reader.(io.Seeker) + require.True(t, ok) + checkOffset, err := rs.Seek(int64(readOffset), 0) + require.NoError(t, err) + require.Equal(t, checkOffset, int64(readOffset)) + checkSample, err := ioutil.ReadAll(reader) + require.NoError(t, err) + _ = reader.Close() + + require.Equal(t, len(checkSample), fileSize-readOffset) + for i := 0; i < fileSize-readOffset; i++ { + require.Equal(t, testData[readOffset+i], checkSample[i], "byte: %d (%d), chunk: %d", int64(i)%chunkSize, i, int64(i)/chunkSize) + } +} + func TestInternalWrappedFsChangeNotSeen(t *testing.T) { - reset(t) + rootFs, boltDb := newLocalCacheFs(t, "tiwfcns-local", "tiwfcns-cache", nil) + defer cleanupFs(t, rootFs, boltDb) + cfs, err := getCacheFs(rootFs) require.NoError(t, err) chunkSize := cfs.ChunkSize() @@ -279,33 +287,53 @@ func TestInternalWrappedFsChangeNotSeen(t *testing.T) { co2, err := rootFs.NewObject(o.Remote()) require.NoError(t, err) - require.NotEqual(t, o.ModTime(), co.ModTime()) - require.NotEqual(t, o.ModTime(), co2.ModTime()) - require.Equal(t, co.ModTime(), co2.ModTime()) + require.NotEqual(t, o.ModTime().String(), co.ModTime().String()) + require.NotEqual(t, o.ModTime().String(), co2.ModTime().String()) + require.Equal(t, co.ModTime().String(), co2.ModTime().String()) } func TestInternalChangeSeenAfterDirCacheFlush(t *testing.T) { + rootFs, boltDb := newLocalCacheFs(t, "ticsadcf-local", "ticsadcf-cache", nil) + defer cleanupFs(t, rootFs, boltDb) + cfs, err := getCacheFs(rootFs) require.NoError(t, err) + chunkSize := cfs.ChunkSize() + + // create some rand test data + co := writeObjectRandomBytes(t, rootFs, (chunkSize*4 + chunkSize/2)) + + // update in the wrapped fs + o, err := cfs.UnWrap().NewObject(co.Remote()) + require.NoError(t, err) + err = o.SetModTime(co.ModTime().Add(-1 * time.Hour)) + require.NoError(t, err) + + // get a new instance from the cache + co2, err := rootFs.NewObject(o.Remote()) + require.NoError(t, err) + + require.NotEqual(t, o.ModTime().String(), co.ModTime().String()) + require.NotEqual(t, o.ModTime().String(), co2.ModTime().String()) + require.Equal(t, co.ModTime().String(), co2.ModTime().String()) cfs.DirCacheFlush() // flush the cache l, err := cfs.UnWrap().List("") require.NoError(t, err) require.Len(t, l, 1) - o := l[0] + o2 := l[0] // get a new instance from the cache - co, err := rootFs.NewObject(o.Remote()) + co, err = rootFs.NewObject(o.Remote()) require.NoError(t, err) - require.Equal(t, o.ModTime(), co.ModTime()) + require.Equal(t, o2.ModTime().String(), co.ModTime().String()) } func TestInternalCacheWrites(t *testing.T) { - reset(t) - _ = flag.Set("cache-writes", "true") - rootFs, err := fs.NewFs(*RemoteName + ":") - require.NoError(t, err) + rootFs, boltDb := newLocalCacheFs(t, "ticw-local", "ticw-cache", map[string]string{"cache-writes": "true"}) + defer cleanupFs(t, rootFs, boltDb) + cfs, err := getCacheFs(rootFs) require.NoError(t, err) chunkSize := cfs.ChunkSize() @@ -316,25 +344,21 @@ func TestInternalCacheWrites(t *testing.T) { ts, err := boltDb.GetChunkTs(path.Join(rootFs.Root(), co.Remote()), 0) require.NoError(t, err) require.WithinDuration(t, expectedTs, ts, okDiff) - - // reset fs - _ = flag.Set("cache-writes", "false") - rootFs, err = fs.NewFs(*RemoteName + ":") - require.NoError(t, err) } func TestInternalMaxChunkSizeRespected(t *testing.T) { - reset(t) - _ = flag.Set("cache-workers", "1") - rootFs, err := fs.NewFs(*RemoteName + ":") - require.NoError(t, err) + rootFs, boltDb := newLocalCacheFs(t, "timcsr-local", "timcsr-cache", map[string]string{"cache-workers": "1"}) + defer cleanupFs(t, rootFs, boltDb) + cfs, err := getCacheFs(rootFs) require.NoError(t, err) chunkSize := cfs.ChunkSize() totalChunks := 20 // create some rand test data - o := writeObjectRandomBytes(t, cfs, (int64(totalChunks-1)*chunkSize + chunkSize/2)) + obj := writeObjectRandomBytes(t, cfs, (int64(totalChunks-1)*chunkSize + chunkSize/2)) + o, err := rootFs.NewObject(obj.Remote()) + require.NoError(t, err) co, ok := o.(*cache.Object) require.True(t, ok) @@ -353,15 +377,12 @@ func TestInternalMaxChunkSizeRespected(t *testing.T) { // the last 2 **must** be in the cache require.True(t, boltDb.HasChunk(co, chunkSize*4)) require.True(t, boltDb.HasChunk(co, chunkSize*5)) - - // reset fs - _ = flag.Set("cache-workers", strconv.Itoa(workers)) - rootFs, err = fs.NewFs(*RemoteName + ":") - require.NoError(t, err) } func TestInternalExpiredEntriesRemoved(t *testing.T) { - reset(t) + rootFs, boltDb := newLocalCacheFs(t, "tieer-local", "tieer-cache", map[string]string{"info_age": "5s"}) + defer cleanupFs(t, rootFs, boltDb) + cfs, err := getCacheFs(rootFs) require.NoError(t, err) @@ -371,26 +392,84 @@ func TestInternalExpiredEntriesRemoved(t *testing.T) { require.NoError(t, err) _ = writeObjectString(t, cfs, "test/second", "second content") - objOne, err := cfs.NewObject("one") + l, err := cfs.List("test") require.NoError(t, err) - require.Equal(t, int64(len([]byte("one content"))), objOne.Size()) + require.Len(t, l, 1) - waitTime := infoAge + time.Second*2 + err = cfs.UnWrap().Mkdir("test/test2") + require.NoError(t, err) + + l, err = cfs.List("test") + require.NoError(t, err) + require.Len(t, l, 1) + + waitTime := time.Second * 5 t.Logf("Waiting %v seconds for cache to expire\n", waitTime) - time.Sleep(infoAge) + time.Sleep(waitTime) - _, err = cfs.List("test") + l, err = cfs.List("test") require.NoError(t, err) - time.Sleep(time.Second * 2) - require.False(t, boltDb.HasEntry("one")) + require.Len(t, l, 2) } -func TestInternalFinalise(t *testing.T) { - var err error - - err = rootFs.Features().Purge() - require.NoError(t, err) -} +// FIXME, enable this when mount is sorted out +//func TestInternalFilesMissingInMount1904(t *testing.T) { +// t.Skip("Not yet") +// if runtime.GOOS == "windows" { +// t.Skip("Not yet") +// } +// id := "tifm1904" +// rootFs, _ := newLocalCacheCryptFs(t, "test-local", "test-cache", "test-crypt", false, +// map[string]string{"chunk_size": "5M", "info_age": "1m", "chunk_total_size": "500M", "cache-writes": "true"}) +// mntPoint := path.Join("/tmp", "tifm1904-mnt") +// testPoint := path.Join(mntPoint, id) +// checkOutput := "1 10 100 11 12 13 14 15 16 17 18 19 2 20 21 22 23 24 25 26 27 28 29 3 30 31 32 33 34 35 36 37 38 39 4 40 41 42 43 44 45 46 47 48 49 5 50 51 52 53 54 55 56 57 58 59 6 60 61 62 63 64 65 66 67 68 69 7 70 71 72 73 74 75 76 77 78 79 8 80 81 82 83 84 85 86 87 88 89 9 90 91 92 93 94 95 96 97 98 99 " +// +// _ = os.MkdirAll(mntPoint, os.ModePerm) +// +// list, err := rootFs.List("") +// require.NoError(t, err) +// found := false +// list.ForDir(func(d fs.Directory) { +// if strings.Contains(d.Remote(), id) { +// found = true +// } +// }) +// +// if !found { +// t.Skip("Test folder '%v' doesn't exist", id) +// } +// +// mountFs(t, rootFs, mntPoint) +// defer unmountFs(t, mntPoint) +// +// for i := 1; i <= 2; i++ { +// out, err := exec.Command("ls", testPoint).Output() +// require.NoError(t, err) +// require.Equal(t, checkOutput, strings.Replace(string(out), "\n", " ", -1)) +// t.Logf("root path has all files") +// _ = writeObjectString(t, rootFs, path.Join(id, strconv.Itoa(i), strconv.Itoa(i), "one_file"), "one content") +// +// for j := 1; j <= 100; j++ { +// out, err := exec.Command("ls", path.Join(testPoint, strconv.Itoa(j))).Output() +// require.NoError(t, err) +// require.Equal(t, checkOutput, strings.Replace(string(out), "\n", " ", -1), "'%v' doesn't match", j) +// } +// obj, err := rootFs.NewObject(path.Join(id, strconv.Itoa(i), strconv.Itoa(i), "one_file")) +// require.NoError(t, err) +// err = obj.Remove() +// require.NoError(t, err) +// t.Logf("folders contain all the files") +// +// out, err = exec.Command("date").Output() +// require.NoError(t, err) +// t.Logf("check #%v date: '%v'", i, strings.Replace(string(out), "\n", " ", -1)) +// +// if i < 2 { +// time.Sleep(time.Second * 60) +// } +// } +//} func writeObjectRandomBytes(t *testing.T, f fs.Fs, size int64) fs.Object { remote := strconv.Itoa(rand.Int()) + ".bin" @@ -454,32 +533,205 @@ func readDataFromObj(t *testing.T, co fs.Object, offset, end int64, useSeek bool return checkSample } -func doStuff(t *testing.T, times int, maxDuration time.Duration, stuff func()) { - var wg sync.WaitGroup - - for i := 0; i < times; i++ { - wg.Add(1) - go func() { - defer wg.Done() - time.Sleep(maxDuration / 2) - stuff() - time.Sleep(maxDuration / 2) - }() - } - - wg.Wait() +func cleanupFs(t *testing.T, f fs.Fs, b *cache.Persistent) { + err := f.Features().Purge() + require.NoError(t, err) + b.Close() } -func reset(t *testing.T) { - var err error - - err = rootFs.Features().Purge() +func newLocalCacheCryptFs(t *testing.T, localRemote, cacheRemote, cryptRemote string, purge bool, cfg map[string]string) (fs.Fs, *cache.Persistent) { + fstest.Initialise() + dbPath := filepath.Join(fs.CacheDir, "cache-backend", cacheRemote+".db") + boltDb, err := cache.GetPersistent(dbPath, &cache.Features{PurgeDb: true}) require.NoError(t, err) + localExists := false + cacheExists := false + cryptExists := false + for _, s := range fs.ConfigFileSections() { + if s == localRemote { + localExists = true + } + if s == cacheRemote { + cacheExists = true + } + if s == cryptRemote { + cryptExists = true + } + } + + localRemoteWrap := "" + if !localExists { + localRemoteWrap = localRemote + ":/var/tmp/" + localRemote + fs.ConfigFileSet(localRemote, "type", "local") + fs.ConfigFileSet(localRemote, "nounc", "true") + } + + if !cacheExists { + fs.ConfigFileSet(cacheRemote, "type", "cache") + fs.ConfigFileSet(cacheRemote, "remote", localRemoteWrap) + } + if c, ok := cfg["chunk_size"]; ok { + fs.ConfigFileSet(cacheRemote, "chunk_size", c) + } else { + fs.ConfigFileSet(cacheRemote, "chunk_size", "1m") + } + if c, ok := cfg["chunk_total_size"]; ok { + fs.ConfigFileSet(cacheRemote, "chunk_total_size", c) + } else { + fs.ConfigFileSet(cacheRemote, "chunk_total_size", "2m") + } + if c, ok := cfg["info_age"]; ok { + fs.ConfigFileSet(cacheRemote, "info_age", c) + } else { + fs.ConfigFileSet(cacheRemote, "info_age", infoAge.String()) + } + + if !cryptExists { + t.Skipf("Skipping due to missing crypt remote: %v", cryptRemote) + } + + if c, ok := cfg["cache-chunk-no-memory"]; ok { + _ = flag.Set("cache-chunk-no-memory", c) + } else { + _ = flag.Set("cache-chunk-no-memory", "true") + } + if c, ok := cfg["cache-workers"]; ok { + _ = flag.Set("cache-workers", c) + } else { + _ = flag.Set("cache-workers", strconv.Itoa(workers)) + } + if c, ok := cfg["cache-chunk-clean-interval"]; ok { + _ = flag.Set("cache-chunk-clean-interval", c) + } else { + _ = flag.Set("cache-chunk-clean-interval", chunkClean.String()) + } + if c, ok := cfg["cache-writes"]; ok { + _ = flag.Set("cache-writes", c) + } else { + _ = flag.Set("cache-writes", strconv.FormatBool(cache.DefCacheWrites)) + } + // Instantiate root - rootFs, err = fs.NewFs(*RemoteName + ":") + f, err := fs.NewFs(cryptRemote + ":") require.NoError(t, err) - err = rootFs.Mkdir("") + if purge { + _ = f.Features().Purge() + require.NoError(t, err) + } + err = f.Mkdir("") + require.NoError(t, err) + + return f, boltDb +} + +func newLocalCacheFs(t *testing.T, localRemote, cacheRemote string, cfg map[string]string) (fs.Fs, *cache.Persistent) { + fstest.Initialise() + dbPath := filepath.Join(fs.CacheDir, "cache-backend", cacheRemote+".db") + boltDb, err := cache.GetPersistent(dbPath, &cache.Features{PurgeDb: true}) + require.NoError(t, err) + + localExists := false + cacheExists := false + for _, s := range fs.ConfigFileSections() { + if s == localRemote { + localExists = true + } + if s == cacheRemote { + cacheExists = true + } + } + + localRemoteWrap := "" + if !localExists { + localRemoteWrap = localRemote + ":/var/tmp/" + localRemote + fs.ConfigFileSet(localRemote, "type", "local") + fs.ConfigFileSet(localRemote, "nounc", "true") + } + + if !cacheExists { + fs.ConfigFileSet(cacheRemote, "type", "cache") + fs.ConfigFileSet(cacheRemote, "remote", localRemoteWrap) + } + if c, ok := cfg["chunk_size"]; ok { + fs.ConfigFileSet(cacheRemote, "chunk_size", c) + } else { + fs.ConfigFileSet(cacheRemote, "chunk_size", "1m") + } + if c, ok := cfg["chunk_total_size"]; ok { + fs.ConfigFileSet(cacheRemote, "chunk_total_size", c) + } else { + fs.ConfigFileSet(cacheRemote, "chunk_total_size", "2m") + } + if c, ok := cfg["info_age"]; ok { + fs.ConfigFileSet(cacheRemote, "info_age", c) + } else { + fs.ConfigFileSet(cacheRemote, "info_age", infoAge.String()) + } + + if c, ok := cfg["cache-chunk-no-memory"]; ok { + _ = flag.Set("cache-chunk-no-memory", c) + } else { + _ = flag.Set("cache-chunk-no-memory", "true") + } + if c, ok := cfg["cache-workers"]; ok { + _ = flag.Set("cache-workers", c) + } else { + _ = flag.Set("cache-workers", strconv.Itoa(workers)) + } + if c, ok := cfg["cache-chunk-clean-interval"]; ok { + _ = flag.Set("cache-chunk-clean-interval", c) + } else { + _ = flag.Set("cache-chunk-clean-interval", chunkClean.String()) + } + if c, ok := cfg["cache-writes"]; ok { + _ = flag.Set("cache-writes", c) + } else { + _ = flag.Set("cache-writes", strconv.FormatBool(cache.DefCacheWrites)) + } + + // Instantiate root + f, err := fs.NewFs(cacheRemote + ":") + require.NoError(t, err) + _ = f.Features().Purge() + require.NoError(t, err) + err = f.Mkdir("") + require.NoError(t, err) + + return f, boltDb +} + +//func mountFs(t *testing.T, f fs.Fs, mntPoint string) { +// if runtime.GOOS == "windows" { +// t.Skip("Skipping test cause on windows") +// return +// } +// +// _ = flag.Set("debug-fuse", "false") +// +// go func() { +// mountlib.DebugFUSE = false +// mountlib.AllowOther = true +// mount.Mount(f, mntPoint) +// }() +// +// time.Sleep(time.Second * 3) +//} + +func unmountFs(t *testing.T, mntPoint string) { + var out []byte + var err error + + if runtime.GOOS == "windows" { + t.Skip("Skipping test cause on windows") + return + } else if runtime.GOOS == "linux" { + out, err = exec.Command("fusermount", "-u", mntPoint).Output() + } else if runtime.GOOS == "darwin" { + out, err = exec.Command("diskutil", "unmount", mntPoint).Output() + } + + t.Logf("Unmount output: %v", string(out)) require.NoError(t, err) } @@ -499,20 +751,6 @@ func getCacheFs(f fs.Fs) (*cache.Fs, error) { return nil, fmt.Errorf("didn't found a cache fs") } -func getSourceFs(f fs.Fs) (fs.Fs, error) { - if f.Features().UnWrap != nil { - sfs := f.Features().UnWrap() - _, ok := sfs.(*cache.Fs) - if !ok { - return sfs, nil - } - - return getSourceFs(sfs) - } - - return nil, fmt.Errorf("didn't found a source fs") -} - var ( _ fs.Fs = (*cache.Fs)(nil) _ fs.Fs = (*local.Fs)(nil) diff --git a/cache/directory.go b/cache/directory.go index 84ae21e50..d137e5869 100644 --- a/cache/directory.go +++ b/cache/directory.go @@ -22,12 +22,22 @@ type Directory struct { CacheModTime int64 `json:"modTime"` // modification or creation time - IsZero for unknown CacheSize int64 `json:"size"` // size of directory and contents or -1 if unknown - CacheItems int64 `json:"items"` // number of objects or -1 for unknown - CacheType string `json:"cacheType"` // object type + CacheItems int64 `json:"items"` // number of objects or -1 for unknown + CacheType string `json:"cacheType"` // object type + CacheTs *time.Time `json:",omitempty"` } // NewDirectory builds an empty dir which will be used to unmarshal data in it func NewDirectory(f *Fs, remote string) *Directory { + cd := ShallowDirectory(f, remote) + t := time.Now() + cd.CacheTs = &t + + return cd +} + +// ShallowDirectory builds an empty dir which will be used to unmarshal data in it +func ShallowDirectory(f *Fs, remote string) *Directory { var cd *Directory fullRemote := cleanPath(path.Join(f.Root(), remote)) @@ -54,6 +64,7 @@ func DirectoryFromOriginal(f *Fs, d fs.Directory) *Directory { dir := cleanPath(path.Dir(fullRemote)) name := cleanPath(path.Base(fullRemote)) + t := time.Now() cd = &Directory{ Directory: d, CacheFs: f, @@ -63,6 +74,7 @@ func DirectoryFromOriginal(f *Fs, d fs.Directory) *Directory { CacheSize: d.Size(), CacheItems: d.Items(), CacheType: "Directory", + CacheTs: &t, } return cd diff --git a/cache/object.go b/cache/object.go index 028f72864..ceed9f583 100644 --- a/cache/object.go +++ b/cache/object.go @@ -27,6 +27,7 @@ type Object struct { CacheSize int64 `json:"size"` // size of directory and contents or -1 if unknown CacheStorable bool `json:"storable"` // says whether this object can be stored CacheType string `json:"cacheType"` + CacheTs time.Time `json:"cacheTs"` cacheHashes map[fs.HashType]string // all supported hashes cached refreshMutex sync.Mutex @@ -45,6 +46,7 @@ func NewObject(f *Fs, remote string) *Object { //0745 379 768 CacheSize: 0, CacheStorable: false, CacheType: "Object", + CacheTs: time.Now(), } return co } @@ -99,6 +101,7 @@ func ObjectFromOriginal(f *Fs, o fs.Object) *Object { Name: cleanPath(name), Dir: cleanPath(dir), CacheType: "Object", + CacheTs: time.Now(), } co.updateData(o) return co @@ -109,6 +112,7 @@ func (o *Object) updateData(source fs.Object) { o.CacheModTime = source.ModTime().UnixNano() o.CacheSize = source.Size() o.CacheStorable = source.Storable() + o.CacheTs = time.Now() o.cacheHashes = make(map[fs.HashType]string) } @@ -147,6 +151,11 @@ func (o *Object) parentRemote() string { return cleanPath(path.Dir(absPath)) } +// parentDir returns the absolute path parent remote +func (o *Object) parentDir() *Directory { + return NewDirectory(o.CacheFs, cleanPath(path.Dir(o.Remote()))) +} + // ModTime returns the cached ModTime func (o *Object) ModTime() time.Time { return time.Unix(0, o.CacheModTime) diff --git a/cache/storage_persistent.go b/cache/storage_persistent.go index 87678c438..74c0d8018 100644 --- a/cache/storage_persistent.go +++ b/cache/storage_persistent.go @@ -130,7 +130,6 @@ func (b *Persistent) Connect() error { }) b.db = db - return nil } @@ -159,36 +158,6 @@ func (b *Persistent) getBucket(dir string, createIfMissing bool, tx *bolt.Tx) *b return bucket } -// updateRootTs is a convenience method to update an object timestamp to mark that it was used recently -func (b *Persistent) updateRootTs(tx *bolt.Tx, path string, t time.Duration) { - tsBucket := tx.Bucket([]byte(RootTsBucket)) - ts := time.Now().Add(t) - found := false - - // delete previous timestamps for the same object - c := tsBucket.Cursor() - for k, v := c.First(); k != nil; k, v = c.Next() { - if bytes.Equal(v, []byte(path)) { - if tsInCache := time.Unix(0, btoi(k)); tsInCache.After(ts) && !found { - found = true - continue - } - err := c.Delete() - if err != nil { - fs.Debugf(path, "failed to clean object: %v", err) - } - } - } - if found { - return - } - - err := tsBucket.Put(itob(ts.UnixNano()), []byte(path)) - if err != nil { - fs.Debugf(path, "failed to timestamp chunk: %v", err) - } -} - // AddDir will update a CachedDirectory metadata and all its entries func (b *Persistent) AddDir(cachedDir *Directory) error { return b.db.Update(func(tx *bolt.Tx) error { @@ -205,8 +174,6 @@ func (b *Persistent) AddDir(cachedDir *Directory) error { if err != nil { return err } - - b.updateRootTs(tx, cachedDir.abs(), cachedDir.CacheFs.fileAge) return nil }) } @@ -225,7 +192,7 @@ func (b *Persistent) GetDirEntries(cachedDir *Directory) (fs.DirEntries, error) if val != nil { err := json.Unmarshal(val, cachedDir) if err != nil { - fs.Debugf(cachedDir.abs(), "error during unmarshalling obj: %v", err) + return errors.Errorf("error during unmarshalling obj: %v", err) } } else { return errors.Errorf("missing cached dir: %v", cachedDir) @@ -276,7 +243,32 @@ func (b *Persistent) GetDirEntries(cachedDir *Directory) (fs.DirEntries, error) // RemoveDir will delete a CachedDirectory, all its objects and all the chunks stored for it func (b *Persistent) RemoveDir(fp string) error { - err := b.ExpireDir(fp) + var err error + parentDir, dirName := path.Split(fp) + if fp == "" { + err = b.db.Update(func(tx *bolt.Tx) error { + err := tx.DeleteBucket([]byte(RootBucket)) + if err != nil { + fs.Debugf(fp, "couldn't delete from cache: %v", err) + return err + } + _, _ = tx.CreateBucketIfNotExists([]byte(RootBucket)) + return nil + }) + } else { + err = b.db.Update(func(tx *bolt.Tx) error { + bucket := b.getBucket(cleanPath(parentDir), false, tx) + if bucket == nil { + return errors.Errorf("couldn't open bucket (%v)", fp) + } + // delete the cached dir + err := bucket.DeleteBucket([]byte(cleanPath(dirName))) + if err != nil { + fs.Debugf(fp, "couldn't delete from cache: %v", err) + } + return nil + }) + } // delete chunks on disk // safe to ignore as the files might not have been open @@ -289,35 +281,33 @@ func (b *Persistent) RemoveDir(fp string) error { // ExpireDir will flush a CachedDirectory and all its objects from the objects // chunks will remain as they are -func (b *Persistent) ExpireDir(fp string) error { - parentDir, dirName := path.Split(fp) - if fp == "" { - return b.db.Update(func(tx *bolt.Tx) error { - err := tx.DeleteBucket([]byte(RootBucket)) - if err != nil { - fs.Debugf(fp, "couldn't delete from cache: %v", err) - return err - } - err = tx.DeleteBucket([]byte(RootTsBucket)) - if err != nil { - fs.Debugf(fp, "couldn't delete from cache: %v", err) - return err - } - _, _ = tx.CreateBucketIfNotExists([]byte(RootBucket)) - _, _ = tx.CreateBucketIfNotExists([]byte(RootTsBucket)) - return nil - }) - } +func (b *Persistent) ExpireDir(cd *Directory) error { + t := time.Now().Add(cd.CacheFs.fileAge * -1) + cd.CacheTs = &t + // expire all parents return b.db.Update(func(tx *bolt.Tx) error { - bucket := b.getBucket(cleanPath(parentDir), false, tx) - if bucket == nil { - return errors.Errorf("couldn't open bucket (%v)", fp) - } - // delete the cached dir - err := bucket.DeleteBucket([]byte(cleanPath(dirName))) - if err != nil { - fs.Debugf(fp, "couldn't delete from cache: %v", err) + // expire all the parents + currentDir := cd.abs() + for { // until we get to the root + bucket := b.getBucket(currentDir, false, tx) + if bucket != nil { + val := bucket.Get([]byte(".")) + if val != nil { + cd2 := &Directory{CacheFs: cd.CacheFs} + err := json.Unmarshal(val, cd2) + if err == nil { + fs.Debugf(cd, "cache: expired %v", currentDir) + cd2.CacheTs = &t + enc2, _ := json.Marshal(cd2) + _ = bucket.Put([]byte("."), enc2) + } + } + } + if currentDir == "" { + break + } + currentDir = cleanPath(path.Dir(currentDir)) } return nil }) @@ -354,7 +344,6 @@ func (b *Persistent) AddObject(cachedObject *Object) error { if err != nil { return errors.Errorf("couldn't cache object (%v) info: %v", cachedObject, err) } - b.updateRootTs(tx, cachedObject.abs(), cachedObject.CacheFs.fileAge) return nil }) } @@ -543,54 +532,6 @@ func (b *Persistent) CleanChunksBySize(maxSize int64) { } } -// CleanEntriesByAge will cleanup on a cron basis -func (b *Persistent) CleanEntriesByAge(entryAge time.Duration) { - b.cleanupMux.Lock() - defer b.cleanupMux.Unlock() - var cntEntries int - - err := b.db.Update(func(tx *bolt.Tx) error { - min := itob(0) - max := itob(time.Now().UnixNano()) - - rootTsBucket := tx.Bucket([]byte(RootTsBucket)) - if rootTsBucket == nil { - return errors.Errorf("Couldn't open (%v) bucket", rootTsBucket) - } - // iterate through ts - c := rootTsBucket.Cursor() - for k, v := c.Seek(min); k != nil && bytes.Compare(k, max) <= 0; k, v = c.Next() { - if v == nil { - continue - } - // get the path - absPath := string(v) - absDir, absName := path.Split(absPath) - - // delete this ts entry - err := c.Delete() - if err != nil { - fs.Errorf(absPath, "failed deleting object during cleanup: %v", err) - continue - } - - // search for the entry in the root bucket, skip it if it's not found - parentBucket := b.getBucket(cleanPath(absDir), false, tx) - if parentBucket == nil { - continue - } - _ = parentBucket.Delete([]byte(cleanPath(absName))) - cntEntries = cntEntries + 1 - } - fs.Infof("cache", "deleted (%v) entries", cntEntries) - return nil - }) - - if err != nil { - fs.Errorf("cache", "cleanup failed: %v", err) - } -} - // Stats returns a go map with the stats key values func (b *Persistent) Stats() (map[string]map[string]interface{}, error) { r := make(map[string]map[string]interface{}) @@ -670,6 +611,9 @@ func (b *Persistent) Stats() (map[string]map[string]interface{}, error) { // Purge will flush the entire cache func (b *Persistent) Purge() { + b.cleanupMux.Lock() + defer b.cleanupMux.Unlock() + _ = b.db.Update(func(tx *bolt.Tx) error { _ = tx.DeleteBucket([]byte(RootBucket)) _ = tx.DeleteBucket([]byte(RootTsBucket)) @@ -716,25 +660,6 @@ func (b *Persistent) GetChunkTs(path string, offset int64) (time.Time, error) { return t, err } -// GetRootTs retrieves the current timestamp of an object or dir -func (b *Persistent) GetRootTs(path string) (time.Time, error) { - var t time.Time - - err := b.db.View(func(tx *bolt.Tx) error { - tsBucket := tx.Bucket([]byte(RootTsBucket)) - c := tsBucket.Cursor() - for k, v := c.First(); k != nil; k, v = c.Next() { - if bytes.Equal(v, []byte(path)) { - t = time.Unix(0, btoi(k)) - return nil - } - } - return errors.Errorf("not found %v", path) - }) - - return t, err -} - func (b *Persistent) iterateBuckets(buk *bolt.Bucket, bucketFn func(name string), kvFn func(key string, val []byte)) error { err := b.db.View(func(tx *bolt.Tx) error { var c *bolt.Cursor @@ -766,6 +691,9 @@ func (b *Persistent) iterateBuckets(buk *bolt.Bucket, bucketFn func(name string) // Close should be called when the program ends gracefully func (b *Persistent) Close() { + b.cleanupMux.Lock() + defer b.cleanupMux.Unlock() + err := b.db.Close() if err != nil { fs.Errorf(b, "closing handle: %v", err)