cache: add support for rc
This commit is contained in:
parent
f4a1c1163c
commit
677971643c
3 changed files with 123 additions and 0 deletions
58
backend/cache/cache.go
vendored
58
backend/cache/cache.go
vendored
|
@ -22,6 +22,7 @@ import (
|
||||||
"github.com/ncw/rclone/fs/config/flags"
|
"github.com/ncw/rclone/fs/config/flags"
|
||||||
"github.com/ncw/rclone/fs/config/obscure"
|
"github.com/ncw/rclone/fs/config/obscure"
|
||||||
"github.com/ncw/rclone/fs/hash"
|
"github.com/ncw/rclone/fs/hash"
|
||||||
|
"github.com/ncw/rclone/fs/rc"
|
||||||
"github.com/ncw/rclone/fs/walk"
|
"github.com/ncw/rclone/fs/walk"
|
||||||
"github.com/ncw/rclone/lib/atexit"
|
"github.com/ncw/rclone/lib/atexit"
|
||||||
"github.com/pkg/errors"
|
"github.com/pkg/errors"
|
||||||
|
@ -418,9 +419,66 @@ func NewFs(name, rootPath string) (fs.Fs, error) {
|
||||||
// even if the wrapped fs doesn't support it, we still want it
|
// even if the wrapped fs doesn't support it, we still want it
|
||||||
f.features.DirCacheFlush = f.DirCacheFlush
|
f.features.DirCacheFlush = f.DirCacheFlush
|
||||||
|
|
||||||
|
rc.Add(rc.Call{
|
||||||
|
Path: "cache/expire",
|
||||||
|
Fn: f.httpExpireRemote,
|
||||||
|
Title: "Purge a remote from cache",
|
||||||
|
Help: `
|
||||||
|
Purge a remote from the cache backend. Supports either a directory or a file.
|
||||||
|
Params:
|
||||||
|
- remote = path to remote (required)
|
||||||
|
- withData = true/false to delete cached data (chunks) as well (optional)
|
||||||
|
`,
|
||||||
|
})
|
||||||
|
|
||||||
return f, fsErr
|
return f, fsErr
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (f *Fs) httpExpireRemote(in rc.Params) (out rc.Params, err error) {
|
||||||
|
out = make(rc.Params)
|
||||||
|
remoteInt, ok := in["remote"]
|
||||||
|
if !ok {
|
||||||
|
return out, errors.Errorf("remote is needed")
|
||||||
|
}
|
||||||
|
remote := remoteInt.(string)
|
||||||
|
withData := false
|
||||||
|
_, ok = in["withData"]
|
||||||
|
if ok {
|
||||||
|
withData = true
|
||||||
|
}
|
||||||
|
|
||||||
|
if !f.cache.HasEntry(path.Join(f.Root(), remote)) {
|
||||||
|
return out, errors.Errorf("%s doesn't exist in cache", remote)
|
||||||
|
}
|
||||||
|
|
||||||
|
co := NewObject(f, remote)
|
||||||
|
err = f.cache.GetObject(co)
|
||||||
|
if err != nil { // it could be a dir
|
||||||
|
cd := NewDirectory(f, remote)
|
||||||
|
err := f.cache.ExpireDir(cd)
|
||||||
|
if err != nil {
|
||||||
|
return out, errors.WithMessage(err, "error expiring directory")
|
||||||
|
}
|
||||||
|
out["status"] = "ok"
|
||||||
|
out["message"] = fmt.Sprintf("cached directory cleared: %v", remote)
|
||||||
|
return out, nil
|
||||||
|
}
|
||||||
|
// expire the entry
|
||||||
|
co.CacheTs = time.Now().Add(f.fileAge * -1)
|
||||||
|
err = f.cache.AddObject(co)
|
||||||
|
if err != nil {
|
||||||
|
return out, errors.WithMessage(err, "error expiring file")
|
||||||
|
}
|
||||||
|
if withData {
|
||||||
|
// safe to ignore as the file might not have been open
|
||||||
|
_ = os.RemoveAll(path.Join(f.cache.dataPath, co.abs()))
|
||||||
|
}
|
||||||
|
|
||||||
|
out["status"] = "ok"
|
||||||
|
out["message"] = fmt.Sprintf("cached file cleared: %v", remote)
|
||||||
|
return out, nil
|
||||||
|
}
|
||||||
|
|
||||||
// receiveChangeNotify is a wrapper to notifications sent from the wrapped FS about changed files
|
// receiveChangeNotify is a wrapper to notifications sent from the wrapped FS about changed files
|
||||||
func (f *Fs) receiveChangeNotify(forgetPath string, entryType fs.EntryType) {
|
func (f *Fs) receiveChangeNotify(forgetPath string, entryType fs.EntryType) {
|
||||||
fs.Debugf(f, "notify: expiring cache for '%v'", forgetPath)
|
fs.Debugf(f, "notify: expiring cache for '%v'", forgetPath)
|
||||||
|
|
55
backend/cache/cache_internal_test.go
vendored
55
backend/cache/cache_internal_test.go
vendored
|
@ -24,6 +24,9 @@ import (
|
||||||
"fmt"
|
"fmt"
|
||||||
"runtime/debug"
|
"runtime/debug"
|
||||||
|
|
||||||
|
"encoding/json"
|
||||||
|
"net/http"
|
||||||
|
|
||||||
"github.com/ncw/rclone/backend/cache"
|
"github.com/ncw/rclone/backend/cache"
|
||||||
"github.com/ncw/rclone/backend/crypt"
|
"github.com/ncw/rclone/backend/crypt"
|
||||||
_ "github.com/ncw/rclone/backend/drive"
|
_ "github.com/ncw/rclone/backend/drive"
|
||||||
|
@ -31,6 +34,8 @@ import (
|
||||||
"github.com/ncw/rclone/fs"
|
"github.com/ncw/rclone/fs"
|
||||||
"github.com/ncw/rclone/fs/config"
|
"github.com/ncw/rclone/fs/config"
|
||||||
"github.com/ncw/rclone/fs/object"
|
"github.com/ncw/rclone/fs/object"
|
||||||
|
"github.com/ncw/rclone/fs/rc"
|
||||||
|
"github.com/ncw/rclone/fs/rc/rcflags"
|
||||||
"github.com/ncw/rclone/fstest"
|
"github.com/ncw/rclone/fstest"
|
||||||
"github.com/ncw/rclone/vfs"
|
"github.com/ncw/rclone/vfs"
|
||||||
"github.com/ncw/rclone/vfs/vfsflags"
|
"github.com/ncw/rclone/vfs/vfsflags"
|
||||||
|
@ -607,6 +612,56 @@ func TestInternalChangeSeenAfterDirCacheFlush(t *testing.T) {
|
||||||
require.Equal(t, wrappedTime.Unix(), co.ModTime().Unix())
|
require.Equal(t, wrappedTime.Unix(), co.ModTime().Unix())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestInternalChangeSeenAfterRc(t *testing.T) {
|
||||||
|
rcflags.Opt.Enabled = true
|
||||||
|
rc.Start(&rcflags.Opt)
|
||||||
|
|
||||||
|
id := fmt.Sprintf("ticsarc%v", time.Now().Unix())
|
||||||
|
rootFs, boltDb := runInstance.newCacheFs(t, remoteName, id, false, true, nil, map[string]string{"rc": "true"})
|
||||||
|
defer runInstance.cleanupFs(t, rootFs, boltDb)
|
||||||
|
|
||||||
|
if !runInstance.useMount {
|
||||||
|
t.Skipf("needs mount")
|
||||||
|
}
|
||||||
|
|
||||||
|
cfs, err := runInstance.getCacheFs(rootFs)
|
||||||
|
require.NoError(t, err)
|
||||||
|
chunkSize := cfs.ChunkSize()
|
||||||
|
|
||||||
|
// create some rand test data
|
||||||
|
testData := runInstance.randomBytes(t, (chunkSize*4 + chunkSize/2))
|
||||||
|
runInstance.writeRemoteBytes(t, rootFs, "data.bin", testData)
|
||||||
|
|
||||||
|
// update in the wrapped fs
|
||||||
|
o, err := cfs.UnWrap().NewObject(runInstance.encryptRemoteIfNeeded(t, "data.bin"))
|
||||||
|
require.NoError(t, err)
|
||||||
|
wrappedTime := time.Now().Add(-1 * time.Hour)
|
||||||
|
err = o.SetModTime(wrappedTime)
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
// get a new instance from the cache
|
||||||
|
co, err := rootFs.NewObject("data.bin")
|
||||||
|
require.NoError(t, err)
|
||||||
|
require.NotEqual(t, o.ModTime().String(), co.ModTime().String())
|
||||||
|
|
||||||
|
m := make(map[string]string)
|
||||||
|
res, err := http.Post(fmt.Sprintf("http://localhost:5572/cache/expire?remote=%s", runInstance.encryptRemoteIfNeeded(t, "data.bin")), "application/json; charset=utf-8", strings.NewReader(""))
|
||||||
|
require.NoError(t, err)
|
||||||
|
defer func() {
|
||||||
|
_ = res.Body.Close()
|
||||||
|
}()
|
||||||
|
_ = json.NewDecoder(res.Body).Decode(&m)
|
||||||
|
require.Contains(t, m, "status")
|
||||||
|
require.Contains(t, m, "message")
|
||||||
|
require.Equal(t, "ok", m["status"])
|
||||||
|
require.Contains(t, m["message"], "cached file cleared")
|
||||||
|
|
||||||
|
// get a new instance from the cache
|
||||||
|
co, err = rootFs.NewObject("data.bin")
|
||||||
|
require.NoError(t, err)
|
||||||
|
require.Equal(t, wrappedTime.Unix(), co.ModTime().Unix())
|
||||||
|
}
|
||||||
|
|
||||||
func TestInternalCacheWrites(t *testing.T) {
|
func TestInternalCacheWrites(t *testing.T) {
|
||||||
id := "ticw"
|
id := "ticw"
|
||||||
rootFs, boltDb := runInstance.newCacheFs(t, remoteName, id, false, true, nil, map[string]string{"cache-writes": "true"})
|
rootFs, boltDb := runInstance.newCacheFs(t, remoteName, id, false, true, nil, map[string]string{"cache-writes": "true"})
|
||||||
|
|
|
@ -242,6 +242,16 @@ which makes it think we're downloading the full file instead of small chunks.
|
||||||
Organizing the remotes in this order yelds better results:
|
Organizing the remotes in this order yelds better results:
|
||||||
<span style="color:green">**cloud remote** -> **cache** -> **crypt**</span>
|
<span style="color:green">**cloud remote** -> **cache** -> **crypt**</span>
|
||||||
|
|
||||||
|
### Cache and Remote Control (--rc) ###
|
||||||
|
Cache supports the new `--rc` mode in rclone and can be remote controlled through the following end points:
|
||||||
|
By default, the listener is disabled if you do not add the flag.
|
||||||
|
|
||||||
|
### rc cache/expire
|
||||||
|
Purge a remote from the cache backend. Supports either a directory or a file.
|
||||||
|
Params:
|
||||||
|
- **remote** = path to remote **(required)**
|
||||||
|
- **withData** = true/false to delete cached data (chunks) as well _(optional, false by default)_
|
||||||
|
|
||||||
### Specific options ###
|
### Specific options ###
|
||||||
|
|
||||||
Here are the command line options specific to this cloud storage
|
Here are the command line options specific to this cloud storage
|
||||||
|
|
Loading…
Reference in a new issue