fs/cache: factor Fs caching from fs/rc into its own package
This commit is contained in:
parent
f0e439de0d
commit
206e1caa99
7 changed files with 254 additions and 142 deletions
92
fs/cache/cache.go
vendored
Normal file
92
fs/cache/cache.go
vendored
Normal file
|
@ -0,0 +1,92 @@
|
||||||
|
// Package cache implements the Fs cache
|
||||||
|
package cache
|
||||||
|
|
||||||
|
import (
|
||||||
|
"sync"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/ncw/rclone/fs"
|
||||||
|
)
|
||||||
|
|
||||||
|
var (
|
||||||
|
fsCacheMu sync.Mutex
|
||||||
|
fsCache = map[string]*cacheEntry{}
|
||||||
|
fsNewFs = fs.NewFs // for tests
|
||||||
|
expireRunning = false
|
||||||
|
cacheExpireDuration = 300 * time.Second // expire the cache entry when it is older than this
|
||||||
|
cacheExpireInterval = 60 * time.Second // interval to run the cache expire
|
||||||
|
)
|
||||||
|
|
||||||
|
type cacheEntry struct {
|
||||||
|
f fs.Fs // cached f
|
||||||
|
err error // nil or fs.ErrorIsFile
|
||||||
|
fsString string // remote string
|
||||||
|
lastUsed time.Time // time used for expiry
|
||||||
|
}
|
||||||
|
|
||||||
|
// Get gets a fs.Fs named fsString either from the cache or creates it afresh
|
||||||
|
func Get(fsString string) (f fs.Fs, err error) {
|
||||||
|
fsCacheMu.Lock()
|
||||||
|
defer fsCacheMu.Unlock()
|
||||||
|
entry, ok := fsCache[fsString]
|
||||||
|
if !ok {
|
||||||
|
f, err = fsNewFs(fsString)
|
||||||
|
if err != nil && err != fs.ErrorIsFile {
|
||||||
|
return f, err
|
||||||
|
}
|
||||||
|
entry = &cacheEntry{
|
||||||
|
f: f,
|
||||||
|
fsString: fsString,
|
||||||
|
err: err,
|
||||||
|
}
|
||||||
|
fsCache[fsString] = entry
|
||||||
|
}
|
||||||
|
entry.lastUsed = time.Now()
|
||||||
|
if !expireRunning {
|
||||||
|
time.AfterFunc(cacheExpireInterval, cacheExpire)
|
||||||
|
expireRunning = true
|
||||||
|
}
|
||||||
|
return entry.f, entry.err
|
||||||
|
}
|
||||||
|
|
||||||
|
// Put puts an fs.Fs named fsString into the cache
|
||||||
|
func Put(fsString string, f fs.Fs) {
|
||||||
|
fsCacheMu.Lock()
|
||||||
|
defer fsCacheMu.Unlock()
|
||||||
|
fsCache[fsString] = &cacheEntry{
|
||||||
|
f: f,
|
||||||
|
fsString: fsString,
|
||||||
|
lastUsed: time.Now(),
|
||||||
|
}
|
||||||
|
if !expireRunning {
|
||||||
|
time.AfterFunc(cacheExpireInterval, cacheExpire)
|
||||||
|
expireRunning = true
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// cacheExpire expires any entries that haven't been used recently
|
||||||
|
func cacheExpire() {
|
||||||
|
fsCacheMu.Lock()
|
||||||
|
defer fsCacheMu.Unlock()
|
||||||
|
now := time.Now()
|
||||||
|
for fsString, entry := range fsCache {
|
||||||
|
if now.Sub(entry.lastUsed) > cacheExpireDuration {
|
||||||
|
delete(fsCache, fsString)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if len(fsCache) != 0 {
|
||||||
|
time.AfterFunc(cacheExpireInterval, cacheExpire)
|
||||||
|
expireRunning = true
|
||||||
|
} else {
|
||||||
|
expireRunning = false
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Clear removes everything from the cahce
|
||||||
|
func Clear() {
|
||||||
|
fsCacheMu.Lock()
|
||||||
|
for k := range fsCache {
|
||||||
|
delete(fsCache, k)
|
||||||
|
}
|
||||||
|
fsCacheMu.Unlock()
|
||||||
|
}
|
147
fs/cache/cache_test.go
vendored
Normal file
147
fs/cache/cache_test.go
vendored
Normal file
|
@ -0,0 +1,147 @@
|
||||||
|
package cache
|
||||||
|
|
||||||
|
import (
|
||||||
|
"errors"
|
||||||
|
"fmt"
|
||||||
|
"testing"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/ncw/rclone/fs"
|
||||||
|
"github.com/ncw/rclone/fstest/mockfs"
|
||||||
|
"github.com/stretchr/testify/assert"
|
||||||
|
"github.com/stretchr/testify/require"
|
||||||
|
)
|
||||||
|
|
||||||
|
var (
|
||||||
|
called = 0
|
||||||
|
errSentinel = errors.New("an error")
|
||||||
|
)
|
||||||
|
|
||||||
|
func mockNewFs(t *testing.T) func() {
|
||||||
|
called = 0
|
||||||
|
oldFsNewFs := fsNewFs
|
||||||
|
fsNewFs = func(path string) (fs.Fs, error) {
|
||||||
|
assert.Equal(t, 0, called)
|
||||||
|
called++
|
||||||
|
switch path {
|
||||||
|
case "/":
|
||||||
|
return mockfs.NewFs("mock", "mock"), nil
|
||||||
|
case "/file.txt":
|
||||||
|
return mockfs.NewFs("mock", "mock"), fs.ErrorIsFile
|
||||||
|
case "/error":
|
||||||
|
return nil, errSentinel
|
||||||
|
}
|
||||||
|
panic(fmt.Sprintf("Unknown path %q", path))
|
||||||
|
}
|
||||||
|
return func() {
|
||||||
|
fsNewFs = oldFsNewFs
|
||||||
|
fsCacheMu.Lock()
|
||||||
|
fsCache = map[string]*cacheEntry{}
|
||||||
|
expireRunning = false
|
||||||
|
fsCacheMu.Unlock()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestGet(t *testing.T) {
|
||||||
|
defer mockNewFs(t)()
|
||||||
|
|
||||||
|
assert.Equal(t, 0, len(fsCache))
|
||||||
|
|
||||||
|
f, err := Get("/")
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
assert.Equal(t, 1, len(fsCache))
|
||||||
|
|
||||||
|
f2, err := Get("/")
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
assert.Equal(t, f, f2)
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestGetFile(t *testing.T) {
|
||||||
|
defer mockNewFs(t)()
|
||||||
|
|
||||||
|
assert.Equal(t, 0, len(fsCache))
|
||||||
|
|
||||||
|
f, err := Get("/file.txt")
|
||||||
|
require.Equal(t, fs.ErrorIsFile, err)
|
||||||
|
|
||||||
|
assert.Equal(t, 1, len(fsCache))
|
||||||
|
|
||||||
|
f2, err := Get("/file.txt")
|
||||||
|
require.Equal(t, fs.ErrorIsFile, err)
|
||||||
|
|
||||||
|
assert.Equal(t, f, f2)
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestGetError(t *testing.T) {
|
||||||
|
defer mockNewFs(t)()
|
||||||
|
|
||||||
|
assert.Equal(t, 0, len(fsCache))
|
||||||
|
|
||||||
|
f, err := Get("/error")
|
||||||
|
require.Equal(t, errSentinel, err)
|
||||||
|
require.Equal(t, nil, f)
|
||||||
|
|
||||||
|
assert.Equal(t, 0, len(fsCache))
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestPut(t *testing.T) {
|
||||||
|
defer mockNewFs(t)()
|
||||||
|
|
||||||
|
f := mockfs.NewFs("mock", "mock")
|
||||||
|
|
||||||
|
assert.Equal(t, 0, len(fsCache))
|
||||||
|
|
||||||
|
Put("/alien", f)
|
||||||
|
|
||||||
|
assert.Equal(t, 1, len(fsCache))
|
||||||
|
|
||||||
|
fNew, err := Get("/alien")
|
||||||
|
require.NoError(t, err)
|
||||||
|
require.Equal(t, f, fNew)
|
||||||
|
|
||||||
|
assert.Equal(t, 1, len(fsCache))
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestCacheExpire(t *testing.T) {
|
||||||
|
defer mockNewFs(t)()
|
||||||
|
|
||||||
|
cacheExpireInterval = time.Millisecond
|
||||||
|
assert.Equal(t, false, expireRunning)
|
||||||
|
|
||||||
|
_, err := Get("/")
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
fsCacheMu.Lock()
|
||||||
|
entry := fsCache["/"]
|
||||||
|
|
||||||
|
assert.Equal(t, 1, len(fsCache))
|
||||||
|
fsCacheMu.Unlock()
|
||||||
|
cacheExpire()
|
||||||
|
fsCacheMu.Lock()
|
||||||
|
assert.Equal(t, 1, len(fsCache))
|
||||||
|
entry.lastUsed = time.Now().Add(-cacheExpireDuration - 60*time.Second)
|
||||||
|
assert.Equal(t, true, expireRunning)
|
||||||
|
fsCacheMu.Unlock()
|
||||||
|
time.Sleep(10 * time.Millisecond)
|
||||||
|
fsCacheMu.Lock()
|
||||||
|
assert.Equal(t, false, expireRunning)
|
||||||
|
assert.Equal(t, 0, len(fsCache))
|
||||||
|
fsCacheMu.Unlock()
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestClear(t *testing.T) {
|
||||||
|
defer mockNewFs(t)()
|
||||||
|
|
||||||
|
assert.Equal(t, 0, len(fsCache))
|
||||||
|
|
||||||
|
_, err := Get("/")
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
assert.Equal(t, 1, len(fsCache))
|
||||||
|
|
||||||
|
Clear()
|
||||||
|
|
||||||
|
assert.Equal(t, 0, len(fsCache))
|
||||||
|
}
|
|
@ -7,6 +7,7 @@ import (
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/ncw/rclone/fs"
|
"github.com/ncw/rclone/fs"
|
||||||
|
"github.com/ncw/rclone/fs/cache"
|
||||||
"github.com/ncw/rclone/fs/operations"
|
"github.com/ncw/rclone/fs/operations"
|
||||||
"github.com/ncw/rclone/fs/rc"
|
"github.com/ncw/rclone/fs/rc"
|
||||||
"github.com/ncw/rclone/fstest"
|
"github.com/ncw/rclone/fstest"
|
||||||
|
@ -21,8 +22,8 @@ func rcNewRun(t *testing.T, method string) (*fstest.Run, *rc.Call) {
|
||||||
r := fstest.NewRun(t)
|
r := fstest.NewRun(t)
|
||||||
call := rc.Calls.Get(method)
|
call := rc.Calls.Get(method)
|
||||||
assert.NotNil(t, call)
|
assert.NotNil(t, call)
|
||||||
rc.PutCachedFs(r.LocalName, r.Flocal)
|
cache.Put(r.LocalName, r.Flocal)
|
||||||
rc.PutCachedFs(r.FremoteName, r.Fremote)
|
cache.Put(r.FremoteName, r.Fremote)
|
||||||
return r, call
|
return r, call
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -1,86 +1,12 @@
|
||||||
// This implements the Fs cache
|
// Utilities for accessing the Fs cache
|
||||||
|
|
||||||
package rc
|
package rc
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"sync"
|
|
||||||
"time"
|
|
||||||
|
|
||||||
"github.com/ncw/rclone/fs"
|
"github.com/ncw/rclone/fs"
|
||||||
|
"github.com/ncw/rclone/fs/cache"
|
||||||
)
|
)
|
||||||
|
|
||||||
var (
|
|
||||||
fsCacheMu sync.Mutex
|
|
||||||
fsCache = map[string]*cacheEntry{}
|
|
||||||
fsNewFs = fs.NewFs // for tests
|
|
||||||
expireRunning = false
|
|
||||||
cacheExpireDuration = 300 * time.Second // expire the cache entry when it is older than this
|
|
||||||
cacheExpireInterval = 60 * time.Second // interval to run the cache expire
|
|
||||||
)
|
|
||||||
|
|
||||||
type cacheEntry struct {
|
|
||||||
f fs.Fs
|
|
||||||
fsString string
|
|
||||||
lastUsed time.Time
|
|
||||||
}
|
|
||||||
|
|
||||||
// GetCachedFs gets a fs.Fs named fsString either from the cache or creates it afresh
|
|
||||||
func GetCachedFs(fsString string) (f fs.Fs, err error) {
|
|
||||||
fsCacheMu.Lock()
|
|
||||||
defer fsCacheMu.Unlock()
|
|
||||||
entry, ok := fsCache[fsString]
|
|
||||||
if !ok {
|
|
||||||
f, err = fsNewFs(fsString)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
entry = &cacheEntry{
|
|
||||||
f: f,
|
|
||||||
fsString: fsString,
|
|
||||||
}
|
|
||||||
fsCache[fsString] = entry
|
|
||||||
}
|
|
||||||
entry.lastUsed = time.Now()
|
|
||||||
if !expireRunning {
|
|
||||||
time.AfterFunc(cacheExpireInterval, cacheExpire)
|
|
||||||
expireRunning = true
|
|
||||||
}
|
|
||||||
return entry.f, err
|
|
||||||
}
|
|
||||||
|
|
||||||
// PutCachedFs puts an fs.Fs named fsString into the cache
|
|
||||||
func PutCachedFs(fsString string, f fs.Fs) {
|
|
||||||
fsCacheMu.Lock()
|
|
||||||
defer fsCacheMu.Unlock()
|
|
||||||
fsCache[fsString] = &cacheEntry{
|
|
||||||
f: f,
|
|
||||||
fsString: fsString,
|
|
||||||
lastUsed: time.Now(),
|
|
||||||
}
|
|
||||||
if !expireRunning {
|
|
||||||
time.AfterFunc(cacheExpireInterval, cacheExpire)
|
|
||||||
expireRunning = true
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// cacheExpire expires any entries that haven't been used recently
|
|
||||||
func cacheExpire() {
|
|
||||||
fsCacheMu.Lock()
|
|
||||||
defer fsCacheMu.Unlock()
|
|
||||||
now := time.Now()
|
|
||||||
for fsString, entry := range fsCache {
|
|
||||||
if now.Sub(entry.lastUsed) > cacheExpireDuration {
|
|
||||||
delete(fsCache, fsString)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
if len(fsCache) != 0 {
|
|
||||||
time.AfterFunc(cacheExpireInterval, cacheExpire)
|
|
||||||
expireRunning = true
|
|
||||||
} else {
|
|
||||||
expireRunning = false
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// GetFsNamed gets a fs.Fs named fsName either from the cache or creates it afresh
|
// GetFsNamed gets a fs.Fs named fsName either from the cache or creates it afresh
|
||||||
func GetFsNamed(in Params, fsName string) (f fs.Fs, err error) {
|
func GetFsNamed(in Params, fsName string) (f fs.Fs, err error) {
|
||||||
fsString, err := in.GetString(fsName)
|
fsString, err := in.GetString(fsName)
|
||||||
|
@ -88,7 +14,7 @@ func GetFsNamed(in Params, fsName string) (f fs.Fs, err error) {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
return GetCachedFs(fsString)
|
return cache.Get(fsString)
|
||||||
}
|
}
|
||||||
|
|
||||||
// GetFs gets a fs.Fs named "fs" either from the cache or creates it afresh
|
// GetFs gets a fs.Fs named "fs" either from the cache or creates it afresh
|
||||||
|
|
|
@ -2,77 +2,21 @@ package rc
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"testing"
|
"testing"
|
||||||
"time"
|
|
||||||
|
|
||||||
"github.com/ncw/rclone/fs"
|
"github.com/ncw/rclone/fs/cache"
|
||||||
"github.com/ncw/rclone/fstest/mockfs"
|
"github.com/ncw/rclone/fstest/mockfs"
|
||||||
"github.com/stretchr/testify/assert"
|
"github.com/stretchr/testify/assert"
|
||||||
"github.com/stretchr/testify/require"
|
"github.com/stretchr/testify/require"
|
||||||
)
|
)
|
||||||
|
|
||||||
var called = 0
|
|
||||||
|
|
||||||
func mockNewFs(t *testing.T) func() {
|
func mockNewFs(t *testing.T) func() {
|
||||||
called = 0
|
f := mockfs.NewFs("mock", "mock")
|
||||||
oldFsNewFs := fsNewFs
|
cache.Put("/", f)
|
||||||
fsNewFs = func(path string) (fs.Fs, error) {
|
|
||||||
assert.Equal(t, 0, called)
|
|
||||||
called++
|
|
||||||
assert.Equal(t, "/", path)
|
|
||||||
return mockfs.NewFs("mock", "mock"), nil
|
|
||||||
}
|
|
||||||
return func() {
|
return func() {
|
||||||
fsNewFs = oldFsNewFs
|
cache.Clear()
|
||||||
fsCacheMu.Lock()
|
|
||||||
fsCache = map[string]*cacheEntry{}
|
|
||||||
expireRunning = false
|
|
||||||
fsCacheMu.Unlock()
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestGetCachedFs(t *testing.T) {
|
|
||||||
defer mockNewFs(t)()
|
|
||||||
|
|
||||||
assert.Equal(t, 0, len(fsCache))
|
|
||||||
|
|
||||||
f, err := GetCachedFs("/")
|
|
||||||
require.NoError(t, err)
|
|
||||||
|
|
||||||
assert.Equal(t, 1, len(fsCache))
|
|
||||||
|
|
||||||
f2, err := GetCachedFs("/")
|
|
||||||
require.NoError(t, err)
|
|
||||||
|
|
||||||
assert.Equal(t, f, f2)
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestCacheExpire(t *testing.T) {
|
|
||||||
defer mockNewFs(t)()
|
|
||||||
|
|
||||||
cacheExpireInterval = time.Millisecond
|
|
||||||
assert.Equal(t, false, expireRunning)
|
|
||||||
|
|
||||||
_, err := GetCachedFs("/")
|
|
||||||
require.NoError(t, err)
|
|
||||||
|
|
||||||
fsCacheMu.Lock()
|
|
||||||
entry := fsCache["/"]
|
|
||||||
|
|
||||||
assert.Equal(t, 1, len(fsCache))
|
|
||||||
fsCacheMu.Unlock()
|
|
||||||
cacheExpire()
|
|
||||||
fsCacheMu.Lock()
|
|
||||||
assert.Equal(t, 1, len(fsCache))
|
|
||||||
entry.lastUsed = time.Now().Add(-cacheExpireDuration - 60*time.Second)
|
|
||||||
assert.Equal(t, true, expireRunning)
|
|
||||||
fsCacheMu.Unlock()
|
|
||||||
time.Sleep(10 * time.Millisecond)
|
|
||||||
fsCacheMu.Lock()
|
|
||||||
assert.Equal(t, false, expireRunning)
|
|
||||||
assert.Equal(t, 0, len(fsCache))
|
|
||||||
fsCacheMu.Unlock()
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestGetFsNamed(t *testing.T) {
|
func TestGetFsNamed(t *testing.T) {
|
||||||
defer mockNewFs(t)()
|
defer mockNewFs(t)()
|
||||||
|
|
||||||
|
|
|
@ -13,6 +13,7 @@ import (
|
||||||
"github.com/ncw/rclone/cmd/serve/httplib"
|
"github.com/ncw/rclone/cmd/serve/httplib"
|
||||||
"github.com/ncw/rclone/cmd/serve/httplib/serve"
|
"github.com/ncw/rclone/cmd/serve/httplib/serve"
|
||||||
"github.com/ncw/rclone/fs"
|
"github.com/ncw/rclone/fs"
|
||||||
|
"github.com/ncw/rclone/fs/cache"
|
||||||
"github.com/ncw/rclone/fs/config"
|
"github.com/ncw/rclone/fs/config"
|
||||||
"github.com/ncw/rclone/fs/list"
|
"github.com/ncw/rclone/fs/list"
|
||||||
"github.com/ncw/rclone/fs/rc"
|
"github.com/ncw/rclone/fs/rc"
|
||||||
|
@ -222,7 +223,7 @@ func (s *Server) serveRoot(w http.ResponseWriter, r *http.Request) {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *Server) serveRemote(w http.ResponseWriter, r *http.Request, path string, fsName string) {
|
func (s *Server) serveRemote(w http.ResponseWriter, r *http.Request, path string, fsName string) {
|
||||||
f, err := rc.GetCachedFs(fsName)
|
f, err := cache.Get(fsName)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
writeError(path, nil, w, errors.Wrap(err, "failed to make Fs"), http.StatusInternalServerError)
|
writeError(path, nil, w, errors.Wrap(err, "failed to make Fs"), http.StatusInternalServerError)
|
||||||
return
|
return
|
||||||
|
|
|
@ -3,6 +3,7 @@ package sync
|
||||||
import (
|
import (
|
||||||
"testing"
|
"testing"
|
||||||
|
|
||||||
|
"github.com/ncw/rclone/fs/cache"
|
||||||
"github.com/ncw/rclone/fs/rc"
|
"github.com/ncw/rclone/fs/rc"
|
||||||
"github.com/ncw/rclone/fstest"
|
"github.com/ncw/rclone/fstest"
|
||||||
"github.com/stretchr/testify/assert"
|
"github.com/stretchr/testify/assert"
|
||||||
|
@ -16,8 +17,8 @@ func rcNewRun(t *testing.T, method string) (*fstest.Run, *rc.Call) {
|
||||||
r := fstest.NewRun(t)
|
r := fstest.NewRun(t)
|
||||||
call := rc.Calls.Get(method)
|
call := rc.Calls.Get(method)
|
||||||
assert.NotNil(t, call)
|
assert.NotNil(t, call)
|
||||||
rc.PutCachedFs(r.LocalName, r.Flocal)
|
cache.Put(r.LocalName, r.Flocal)
|
||||||
rc.PutCachedFs(r.FremoteName, r.Fremote)
|
cache.Put(r.FremoteName, r.Fremote)
|
||||||
return r, call
|
return r, call
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Add table
Reference in a new issue