forked from TrueCloudLab/rclone
cache: add cache/fetch rc function
This commit is contained in:
parent 3a0b3b0f6e
commit cdbe3691b7

1 changed file with 189 additions and 14 deletions
203  backend/cache/cache.go (vendored)
@@ -6,10 +6,12 @@ import (
     "context"
     "fmt"
     "io"
+    "math"
     "os"
     "os/signal"
     "path"
     "path/filepath"
+    "strconv"
     "strings"
     "sync"
     "syscall"
@@ -455,6 +457,39 @@ Eg
         Title: "Get cache stats",
         Help: `
 Show statistics for the cache remote.
+`,
+    })
+
+    rc.Add(rc.Call{
+        Path:  "cache/fetch",
+        Fn:    f.rcFetch,
+        Title: "Fetch file chunks",
+        Help: `
+Ensure the specified file chunks are cached on disk.
+
+The chunks= parameter specifies the file chunks to check.
+It takes a comma separated list of array slice indices.
+The slice indices are similar to Python slices: start[:end]
+
+start is the 0 based chunk number from the beginning of the file
+to fetch inclusive. end is the 0 based chunk number from the beginning
+of the file to fetch exclusive.
+Both values can be negative, in which case they count from the back
+of the file. The value "-5:" represents the last 5 chunks of a file.
+
+Some valid examples are:
+":5,-5:" -> the first and last five chunks
+"0,-2" -> the first and the second last chunk
+"0:10" -> the first ten chunks
+
+Any parameter with a key that starts with "file" can be used to
+specify files to fetch, eg
+
+    rclone rc cache/fetch chunks=0 file=hello file2=home/goodbye
+
+File names will automatically be encrypted when a crypt remote
+is used on top of the cache.
+
 `,
     })
 
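The help text above defines the chunks= syntax in terms of Python-style slices. As a standalone illustration of how such specs resolve to chunk numbers (this helper and its hard-coded chunk count are hypothetical, not code from this commit; the real parsing lives in rcFetch further down), a minimal Go sketch might look like:

    // chunkspec.go - illustrative only, not part of this commit.
    // Expands a chunks= specification against a file with a known number
    // of chunks, following the start[:end] semantics described above.
    package main

    import (
        "fmt"
        "strconv"
        "strings"
    )

    // expand resolves one "start[:end]" part for a file of nChunks chunks.
    func expand(part string, nChunks int64) []int64 {
        var start, end int64 = 0, nChunks
        ints := strings.Split(part, ":")
        if len(ints) == 1 {
            start, _ = strconv.ParseInt(ints[0], 10, 64)
            end = start + 1
        } else {
            if ints[0] != "" {
                start, _ = strconv.ParseInt(ints[0], 10, 64)
            }
            if ints[1] != "" {
                end, _ = strconv.ParseInt(ints[1], 10, 64)
            }
        }
        // negative values count from the back of the file
        if start < 0 {
            start += nChunks
        }
        if end <= 0 {
            end += nChunks
        }
        if end > nChunks {
            end = nChunks
        }
        var out []int64
        for i := start; i >= 0 && i < end; i++ {
            out = append(out, i)
        }
        return out
    }

    func main() {
        const nChunks = 8 // e.g. a 40 MiB file with a 5 MiB chunk size
        for _, spec := range []string{":5", "-5:", "0,-2", "0:10"} {
            var chunks []int64
            for _, part := range strings.Split(spec, ",") {
                chunks = append(chunks, expand(part, nChunks)...)
            }
            fmt.Printf("%-6q -> %v\n", spec, chunks)
        }
    }

For an 8-chunk file this prints ":5" -> [0 1 2 3 4], "-5:" -> [3 4 5 6 7], "0,-2" -> [0 6] and "0:10" -> [0 1 2 3 4 5 6 7], matching the examples given in the help text.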
@@ -472,6 +507,22 @@ func (f *Fs) httpStats(in rc.Params) (out rc.Params, err error) {
     return out, nil
 }
 
+func (f *Fs) unwrapRemote(remote string) string {
+    remote = cleanPath(remote)
+    if remote != "" {
+        // if it's wrapped by crypt we need to check what format we got
+        if cryptFs, yes := f.isWrappedByCrypt(); yes {
+            _, err := cryptFs.DecryptFileName(remote)
+            // if it failed to decrypt then it is a decrypted format and we need to encrypt it
+            if err != nil {
+                return cryptFs.EncryptFileName(remote)
+            }
+            // else it's an encrypted format and we can use it as it is
+        }
+    }
+    return remote
+}
+
 func (f *Fs) httpExpireRemote(in rc.Params) (out rc.Params, err error) {
     out = make(rc.Params)
     remoteInt, ok := in["remote"]
@@ -485,20 +536,9 @@ func (f *Fs) httpExpireRemote(in rc.Params) (out rc.Params, err error) {
         withData = true
     }
 
-    if cleanPath(remote) != "" {
-        // if it's wrapped by crypt we need to check what format we got
-        if cryptFs, yes := f.isWrappedByCrypt(); yes {
-            _, err := cryptFs.DecryptFileName(remote)
-            // if it failed to decrypt then it is a decrypted format and we need to encrypt it
-            if err != nil {
-                remote = cryptFs.EncryptFileName(remote)
-            }
-            // else it's an encrypted format and we can use it as it is
-        }
-
-        if !f.cache.HasEntry(path.Join(f.Root(), remote)) {
-            return out, errors.Errorf("%s doesn't exist in cache", remote)
-        }
+    remote = f.unwrapRemote(remote)
+    if !f.cache.HasEntry(path.Join(f.Root(), remote)) {
+        return out, errors.Errorf("%s doesn't exist in cache", remote)
     }
 
     co := NewObject(f, remote)
@@ -528,6 +568,141 @@ func (f *Fs) httpExpireRemote(in rc.Params) (out rc.Params, err error) {
     return out, nil
 }
 
+func (f *Fs) rcFetch(in rc.Params) (rc.Params, error) {
+    type chunkRange struct {
+        start, end int64
+    }
+    parseChunks := func(ranges string) (crs []chunkRange, err error) {
+        for _, part := range strings.Split(ranges, ",") {
+            var start, end int64 = 0, math.MaxInt64
+            switch ints := strings.Split(part, ":"); len(ints) {
+            case 1:
+                start, err = strconv.ParseInt(ints[0], 10, 64)
+                if err != nil {
+                    return nil, errors.Errorf("invalid range: %q", part)
+                }
+                end = start + 1
+            case 2:
+                if ints[0] != "" {
+                    start, err = strconv.ParseInt(ints[0], 10, 64)
+                    if err != nil {
+                        return nil, errors.Errorf("invalid range: %q", part)
+                    }
+                }
+                if ints[1] != "" {
+                    end, err = strconv.ParseInt(ints[1], 10, 64)
+                    if err != nil {
+                        return nil, errors.Errorf("invalid range: %q", part)
+                    }
+                }
+            default:
+                return nil, errors.Errorf("invalid range: %q", part)
+            }
+            crs = append(crs, chunkRange{start: start, end: end})
+        }
+        return
+    }
+    walkChunkRange := func(cr chunkRange, size int64, cb func(chunk int64)) {
+        if size <= 0 {
+            return
+        }
+        chunks := (size-1)/f.ChunkSize() + 1
+
+        start, end := cr.start, cr.end
+        if start < 0 {
+            start += chunks
+        }
+        if end <= 0 {
+            end += chunks
+        }
+        if end <= start {
+            return
+        }
+        switch {
+        case start < 0:
+            start = 0
+        case start >= chunks:
+            return
+        }
+        switch {
+        case end <= start:
+            end = start + 1
+        case end >= chunks:
+            end = chunks
+        }
+        for i := start; i < end; i++ {
+            cb(i)
+        }
+    }
+    walkChunkRanges := func(crs []chunkRange, size int64, cb func(chunk int64)) {
+        for _, cr := range crs {
+            walkChunkRange(cr, size, cb)
+        }
+    }
+
+    v, ok := in["chunks"]
+    if !ok {
+        return nil, errors.New("missing chunks parameter")
+    }
+    s, ok := v.(string)
+    if !ok {
+        return nil, errors.New("invalid chunks parameter")
+    }
+    delete(in, "chunks")
+    crs, err := parseChunks(s)
+    if err != nil {
+        return nil, errors.Wrap(err, "invalid chunks parameter")
+    }
+    var files [][2]string
+    for k, v := range in {
+        if !strings.HasPrefix(k, "file") {
+            return nil, errors.Errorf("invalid parameter %s=%s", k, v)
+        }
+        switch v := v.(type) {
+        case string:
+            files = append(files, [2]string{v, f.unwrapRemote(v)})
+        default:
+            return nil, errors.Errorf("invalid parameter %s=%s", k, v)
+        }
+    }
+    type fileStatus struct {
+        Error         error
+        FetchedChunks int
+    }
+    fetchedChunks := make(map[string]fileStatus, len(files))
+    for _, pair := range files {
+        file, remote := pair[0], pair[1]
+        var status fileStatus
+        o, err := f.NewObject(remote)
+        if err != nil {
+            status.Error = err
+            continue
+        }
+        co := o.(*Object)
+        err = co.refreshFromSource(true)
+        if err != nil {
+            status.Error = err
+            continue
+        }
+        handle := NewObjectHandle(co, f)
+        handle.UseMemory = false
+        handle.scaleWorkers(1)
+        walkChunkRanges(crs, co.Size(), func(chunk int64) {
+            _, err := handle.getChunk(chunk * f.ChunkSize())
+            if err != nil {
+                if status.Error == nil {
+                    status.Error = err
+                }
+            } else {
+                status.FetchedChunks++
+            }
+        })
+        fetchedChunks[file] = status
+    }
+
+    return rc.Params{"status": fetchedChunks}, nil
+}
+
 // receiveChangeNotify is a wrapper to notifications sent from the wrapped FS about changed files
 func (f *Fs) receiveChangeNotify(forgetPath string, entryType fs.EntryType) {
     if crypt, yes := f.isWrappedByCrypt(); yes {
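The new endpoint is exposed through rclone's remote control API, so after this change it can be driven with `rclone rc cache/fetch chunks=... file=...` against a cache remote, as shown in the help text. A rough client-side sketch in Go, assuming an rc server started with `rclone rcd` (or `--rc`) on the default localhost:5572 and a cache remote configured; the address and example path are assumptions, not part of this commit:

    // fetchchunks.go - illustrative client sketch, not part of this commit.
    // Posts a cache/fetch request to a running rclone rc server.
    package main

    import (
        "bytes"
        "encoding/json"
        "fmt"
        "io"
        "log"
        "net/http"
    )

    func main() {
        // Parameters mirror the rc call: chunks= plus any number of file* keys.
        params := map[string]string{
            "chunks": ":5,-5:",         // first and last five chunks
            "file":   "movies/big.mkv", // hypothetical path on the cache remote
        }
        body, err := json.Marshal(params)
        if err != nil {
            log.Fatal(err)
        }
        resp, err := http.Post("http://localhost:5572/cache/fetch",
            "application/json", bytes.NewReader(body))
        if err != nil {
            log.Fatal(err)
        }
        defer resp.Body.Close()
        out, err := io.ReadAll(resp.Body)
        if err != nil {
            log.Fatal(err)
        }
        fmt.Println(resp.Status)
        fmt.Println(string(out))
    }

The reply mirrors rcFetch's return value: a "status" object keyed by the requested file names, each entry carrying the number of fetched chunks and any error.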