diff --git a/fs/cache/cache.go b/fs/cache/cache.go new file mode 100644 index 000000000..ce331ca66 --- /dev/null +++ b/fs/cache/cache.go @@ -0,0 +1,92 @@ +// Package cache implements the Fs cache +package cache + +import ( + "sync" + "time" + + "github.com/ncw/rclone/fs" +) + +var ( + fsCacheMu sync.Mutex + fsCache = map[string]*cacheEntry{} + fsNewFs = fs.NewFs // for tests + expireRunning = false + cacheExpireDuration = 300 * time.Second // expire the cache entry when it is older than this + cacheExpireInterval = 60 * time.Second // interval to run the cache expire +) + +type cacheEntry struct { + f fs.Fs // cached f + err error // nil or fs.ErrorIsFile + fsString string // remote string + lastUsed time.Time // time used for expiry +} + +// Get gets a fs.Fs named fsString either from the cache or creates it afresh +func Get(fsString string) (f fs.Fs, err error) { + fsCacheMu.Lock() + defer fsCacheMu.Unlock() + entry, ok := fsCache[fsString] + if !ok { + f, err = fsNewFs(fsString) + if err != nil && err != fs.ErrorIsFile { + return f, err + } + entry = &cacheEntry{ + f: f, + fsString: fsString, + err: err, + } + fsCache[fsString] = entry + } + entry.lastUsed = time.Now() + if !expireRunning { + time.AfterFunc(cacheExpireInterval, cacheExpire) + expireRunning = true + } + return entry.f, entry.err +} + +// Put puts an fs.Fs named fsString into the cache +func Put(fsString string, f fs.Fs) { + fsCacheMu.Lock() + defer fsCacheMu.Unlock() + fsCache[fsString] = &cacheEntry{ + f: f, + fsString: fsString, + lastUsed: time.Now(), + } + if !expireRunning { + time.AfterFunc(cacheExpireInterval, cacheExpire) + expireRunning = true + } +} + +// cacheExpire expires any entries that haven't been used recently +func cacheExpire() { + fsCacheMu.Lock() + defer fsCacheMu.Unlock() + now := time.Now() + for fsString, entry := range fsCache { + if now.Sub(entry.lastUsed) > cacheExpireDuration { + delete(fsCache, fsString) + } + } + if len(fsCache) != 0 { + time.AfterFunc(cacheExpireInterval, cacheExpire) + expireRunning = true + } else { + expireRunning = false + } +} + +// Clear removes everything from the cahce +func Clear() { + fsCacheMu.Lock() + for k := range fsCache { + delete(fsCache, k) + } + fsCacheMu.Unlock() +} diff --git a/fs/cache/cache_test.go b/fs/cache/cache_test.go new file mode 100644 index 000000000..9bf16b182 --- /dev/null +++ b/fs/cache/cache_test.go @@ -0,0 +1,147 @@ +package cache + +import ( + "errors" + "fmt" + "testing" + "time" + + "github.com/ncw/rclone/fs" + "github.com/ncw/rclone/fstest/mockfs" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +var ( + called = 0 + errSentinel = errors.New("an error") +) + +func mockNewFs(t *testing.T) func() { + called = 0 + oldFsNewFs := fsNewFs + fsNewFs = func(path string) (fs.Fs, error) { + assert.Equal(t, 0, called) + called++ + switch path { + case "/": + return mockfs.NewFs("mock", "mock"), nil + case "/file.txt": + return mockfs.NewFs("mock", "mock"), fs.ErrorIsFile + case "/error": + return nil, errSentinel + } + panic(fmt.Sprintf("Unknown path %q", path)) + } + return func() { + fsNewFs = oldFsNewFs + fsCacheMu.Lock() + fsCache = map[string]*cacheEntry{} + expireRunning = false + fsCacheMu.Unlock() + } +} + +func TestGet(t *testing.T) { + defer mockNewFs(t)() + + assert.Equal(t, 0, len(fsCache)) + + f, err := Get("/") + require.NoError(t, err) + + assert.Equal(t, 1, len(fsCache)) + + f2, err := Get("/") + require.NoError(t, err) + + assert.Equal(t, f, f2) +} + +func TestGetFile(t *testing.T) { + defer mockNewFs(t)() + + assert.Equal(t, 0, len(fsCache)) + + f, err := Get("/file.txt") + require.Equal(t, fs.ErrorIsFile, err) + + assert.Equal(t, 1, len(fsCache)) + + f2, err := Get("/file.txt") + require.Equal(t, fs.ErrorIsFile, err) + + assert.Equal(t, f, f2) +} + +func TestGetError(t *testing.T) { + defer mockNewFs(t)() + + assert.Equal(t, 0, len(fsCache)) + + f, err := Get("/error") + require.Equal(t, errSentinel, err) + require.Equal(t, nil, f) + + assert.Equal(t, 0, len(fsCache)) +} + +func TestPut(t *testing.T) { + defer mockNewFs(t)() + + f := mockfs.NewFs("mock", "mock") + + assert.Equal(t, 0, len(fsCache)) + + Put("/alien", f) + + assert.Equal(t, 1, len(fsCache)) + + fNew, err := Get("/alien") + require.NoError(t, err) + require.Equal(t, f, fNew) + + assert.Equal(t, 1, len(fsCache)) +} + +func TestCacheExpire(t *testing.T) { + defer mockNewFs(t)() + + cacheExpireInterval = time.Millisecond + assert.Equal(t, false, expireRunning) + + _, err := Get("/") + require.NoError(t, err) + + fsCacheMu.Lock() + entry := fsCache["/"] + + assert.Equal(t, 1, len(fsCache)) + fsCacheMu.Unlock() + cacheExpire() + fsCacheMu.Lock() + assert.Equal(t, 1, len(fsCache)) + entry.lastUsed = time.Now().Add(-cacheExpireDuration - 60*time.Second) + assert.Equal(t, true, expireRunning) + fsCacheMu.Unlock() + time.Sleep(10 * time.Millisecond) + fsCacheMu.Lock() + assert.Equal(t, false, expireRunning) + assert.Equal(t, 0, len(fsCache)) + fsCacheMu.Unlock() +} + +func TestClear(t *testing.T) { + defer mockNewFs(t)() + + assert.Equal(t, 0, len(fsCache)) + + _, err := Get("/") + require.NoError(t, err) + + assert.Equal(t, 1, len(fsCache)) + + Clear() + + assert.Equal(t, 0, len(fsCache)) +} diff --git a/fs/operations/rc_test.go b/fs/operations/rc_test.go index bb7075a13..426abc62a 100644 --- a/fs/operations/rc_test.go +++ b/fs/operations/rc_test.go @@ -7,6 +7,7 @@ import ( "time" "github.com/ncw/rclone/fs" + "github.com/ncw/rclone/fs/cache" "github.com/ncw/rclone/fs/operations" "github.com/ncw/rclone/fs/rc" "github.com/ncw/rclone/fstest" @@ -21,8 +22,8 @@ func rcNewRun(t *testing.T, method string) (*fstest.Run, *rc.Call) { r := fstest.NewRun(t) call := rc.Calls.Get(method) assert.NotNil(t, call) - rc.PutCachedFs(r.LocalName, r.Flocal) - rc.PutCachedFs(r.FremoteName, r.Fremote) + cache.Put(r.LocalName, r.Flocal) + cache.Put(r.FremoteName, r.Fremote) return r, call } diff --git a/fs/rc/cache.go b/fs/rc/cache.go index dd9896ce3..5fb52c4b3 100644 --- a/fs/rc/cache.go +++ b/fs/rc/cache.go @@ -1,86 +1,12 @@ -// This implements the Fs cache +// Utilities for accessing the Fs cache package rc import ( - "sync" - "time" - "github.com/ncw/rclone/fs" + "github.com/ncw/rclone/fs/cache" ) -var ( - fsCacheMu sync.Mutex - fsCache = map[string]*cacheEntry{} - fsNewFs = fs.NewFs // for tests - expireRunning = false - cacheExpireDuration = 300 * time.Second // expire the cache entry when it is older than this - cacheExpireInterval = 60 * time.Second // interval to run the cache expire -) - -type cacheEntry struct { - f fs.Fs - fsString string - lastUsed time.Time -} - -// GetCachedFs gets a fs.Fs named fsString either from the cache or creates it afresh -func GetCachedFs(fsString string) (f fs.Fs, err error) { - fsCacheMu.Lock() - defer fsCacheMu.Unlock() - entry, ok := fsCache[fsString] - if !ok { - f, err = fsNewFs(fsString) - if err != nil { - return nil, err - } - entry = &cacheEntry{ - f: f, - fsString: fsString, - } - fsCache[fsString] = entry - } - entry.lastUsed = time.Now() - if !expireRunning { - time.AfterFunc(cacheExpireInterval, cacheExpire) - expireRunning = true - } - return entry.f, err -} - -// PutCachedFs puts an fs.Fs named fsString into the cache -func PutCachedFs(fsString string, f fs.Fs) { - fsCacheMu.Lock() - defer fsCacheMu.Unlock() - fsCache[fsString] = &cacheEntry{ - f: f, - fsString: fsString, - lastUsed: time.Now(), - } - if !expireRunning { - time.AfterFunc(cacheExpireInterval, cacheExpire) - expireRunning = true - } -} - -// cacheExpire expires any entries that haven't been used recently -func cacheExpire() { - fsCacheMu.Lock() - defer fsCacheMu.Unlock() - now := time.Now() - for fsString, entry := range fsCache { - if now.Sub(entry.lastUsed) > cacheExpireDuration { - delete(fsCache, fsString) - } - } - if len(fsCache) != 0 { - time.AfterFunc(cacheExpireInterval, cacheExpire) - expireRunning = true - } else { - expireRunning = false - } -} - // GetFsNamed gets a fs.Fs named fsName either from the cache or creates it afresh func GetFsNamed(in Params, fsName string) (f fs.Fs, err error) { fsString, err := in.GetString(fsName) @@ -88,7 +14,7 @@ func GetFsNamed(in Params, fsName string) (f fs.Fs, err error) { return nil, err } - return GetCachedFs(fsString) + return cache.Get(fsString) } // GetFs gets a fs.Fs named "fs" either from the cache or creates it afresh diff --git a/fs/rc/cache_test.go b/fs/rc/cache_test.go index 6d53d6e2f..d133df462 100644 --- a/fs/rc/cache_test.go +++ b/fs/rc/cache_test.go @@ -2,77 +2,21 @@ package rc import ( "testing" - "time" - "github.com/ncw/rclone/fs" + "github.com/ncw/rclone/fs/cache" "github.com/ncw/rclone/fstest/mockfs" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" ) -var called = 0 - func mockNewFs(t *testing.T) func() { - called = 0 - oldFsNewFs := fsNewFs - fsNewFs = func(path string) (fs.Fs, error) { - assert.Equal(t, 0, called) - called++ - assert.Equal(t, "/", path) - return mockfs.NewFs("mock", "mock"), nil - } + f := mockfs.NewFs("mock", "mock") + cache.Put("/", f) return func() { - fsNewFs = oldFsNewFs - fsCacheMu.Lock() - fsCache = map[string]*cacheEntry{} - expireRunning = false - fsCacheMu.Unlock() + cache.Clear() } } -func TestGetCachedFs(t *testing.T) { - defer mockNewFs(t)() - - assert.Equal(t, 0, len(fsCache)) - - f, err := GetCachedFs("/") - require.NoError(t, err) - - assert.Equal(t, 1, len(fsCache)) - - f2, err := GetCachedFs("/") - require.NoError(t, err) - - assert.Equal(t, f, f2) -} - -func TestCacheExpire(t *testing.T) { - defer mockNewFs(t)() - - cacheExpireInterval = time.Millisecond - assert.Equal(t, false, expireRunning) - - _, err := GetCachedFs("/") - require.NoError(t, err) - - fsCacheMu.Lock() - entry := fsCache["/"] - - assert.Equal(t, 1, len(fsCache)) - fsCacheMu.Unlock() - cacheExpire() - fsCacheMu.Lock() - assert.Equal(t, 1, len(fsCache)) - entry.lastUsed = time.Now().Add(-cacheExpireDuration - 60*time.Second) - assert.Equal(t, true, expireRunning) - fsCacheMu.Unlock() - time.Sleep(10 * time.Millisecond) - fsCacheMu.Lock() - assert.Equal(t, false, expireRunning) - assert.Equal(t, 0, len(fsCache)) - fsCacheMu.Unlock() -} - func TestGetFsNamed(t *testing.T) { defer mockNewFs(t)() diff --git a/fs/rc/rcserver/rcserver.go b/fs/rc/rcserver/rcserver.go index 6ccddfe3c..0c302a80d 100644 --- a/fs/rc/rcserver/rcserver.go +++ b/fs/rc/rcserver/rcserver.go @@ -13,6 +13,7 @@ import ( "github.com/ncw/rclone/cmd/serve/httplib" "github.com/ncw/rclone/cmd/serve/httplib/serve" "github.com/ncw/rclone/fs" + "github.com/ncw/rclone/fs/cache" "github.com/ncw/rclone/fs/config" "github.com/ncw/rclone/fs/list" "github.com/ncw/rclone/fs/rc" @@ -222,7 +223,7 @@ func (s *Server) serveRoot(w http.ResponseWriter, r *http.Request) { } func (s *Server) serveRemote(w http.ResponseWriter, r *http.Request, path string, fsName string) { - f, err := rc.GetCachedFs(fsName) + f, err := cache.Get(fsName) if err != nil { writeError(path, nil, w, errors.Wrap(err, "failed to make Fs"), http.StatusInternalServerError) return diff --git a/fs/sync/rc_test.go b/fs/sync/rc_test.go index 346796e05..852cff5f2 100644 --- a/fs/sync/rc_test.go +++ b/fs/sync/rc_test.go @@ -3,6 +3,7 @@ package sync import ( "testing" + "github.com/ncw/rclone/fs/cache" "github.com/ncw/rclone/fs/rc" "github.com/ncw/rclone/fstest" "github.com/stretchr/testify/assert" @@ -16,8 +17,8 @@ func rcNewRun(t *testing.T, method string) (*fstest.Run, *rc.Call) { r := fstest.NewRun(t) call := rc.Calls.Get(method) assert.NotNil(t, call) - rc.PutCachedFs(r.LocalName, r.Flocal) - rc.PutCachedFs(r.FremoteName, r.Fremote) + cache.Put(r.LocalName, r.Flocal) + cache.Put(r.FremoteName, r.Fremote) return r, call }