2018-04-06 19:33:51 +00:00
// +build !plan9
2017-11-12 17:54:25 +00:00
package cache
import (
2018-04-06 18:13:27 +00:00
"context"
2017-11-12 17:54:25 +00:00
"fmt"
"io"
2018-08-30 09:09:16 +00:00
"math"
2018-04-06 18:13:27 +00:00
"os"
"os/signal"
2017-11-12 17:54:25 +00:00
"path"
2017-11-20 14:38:28 +00:00
"path/filepath"
2018-08-30 09:09:16 +00:00
"strconv"
2017-11-12 17:54:25 +00:00
"strings"
"sync"
2017-11-22 16:32:36 +00:00
"syscall"
2018-04-06 18:13:27 +00:00
"time"
2017-11-22 16:32:36 +00:00
2018-01-11 16:05:41 +00:00
"github.com/ncw/rclone/backend/crypt"
2017-11-12 17:54:25 +00:00
"github.com/ncw/rclone/fs"
2018-01-12 16:30:54 +00:00
"github.com/ncw/rclone/fs/config"
2018-05-14 17:06:57 +00:00
"github.com/ncw/rclone/fs/config/configmap"
"github.com/ncw/rclone/fs/config/configstruct"
2018-01-18 20:19:55 +00:00
"github.com/ncw/rclone/fs/config/obscure"
2018-01-12 16:30:54 +00:00
"github.com/ncw/rclone/fs/hash"
2018-03-14 19:49:11 +00:00
"github.com/ncw/rclone/fs/rc"
2018-01-12 16:30:54 +00:00
"github.com/ncw/rclone/fs/walk"
2018-01-30 20:35:53 +00:00
"github.com/ncw/rclone/lib/atexit"
2017-11-12 17:54:25 +00:00
"github.com/pkg/errors"
"golang.org/x/time/rate"
)
const (
// DefCacheChunkSize is the default value for chunk size
2018-05-14 17:06:57 +00:00
DefCacheChunkSize = fs . SizeSuffix ( 5 * 1024 * 1024 )
2017-12-09 21:54:26 +00:00
// DefCacheTotalChunkSize is the default value for the maximum size of stored chunks
2018-05-14 17:06:57 +00:00
DefCacheTotalChunkSize = fs . SizeSuffix ( 10 * 1024 * 1024 * 1024 )
2017-12-09 21:54:26 +00:00
// DefCacheChunkCleanInterval is the interval at which chunks are cleaned
2018-05-14 17:06:57 +00:00
DefCacheChunkCleanInterval = fs . Duration ( time . Minute )
2017-11-12 17:54:25 +00:00
// DefCacheInfoAge is the default value for object info age
2018-05-14 17:06:57 +00:00
DefCacheInfoAge = fs . Duration ( 6 * time . Hour )
2017-11-12 17:54:25 +00:00
// DefCacheReadRetries is the default value for read retries
2017-12-09 21:54:26 +00:00
DefCacheReadRetries = 10
2017-11-12 17:54:25 +00:00
// DefCacheTotalWorkers is how many workers run in parallel to download chunks
DefCacheTotalWorkers = 4
// DefCacheChunkNoMemory will enable or disable in-memory storage for chunks
DefCacheChunkNoMemory = false
// DefCacheRps limits the number of requests per second to the source FS
DefCacheRps = - 1
// DefCacheWrites will cache file data on writes through the cache
DefCacheWrites = false
2018-01-29 22:05:04 +00:00
// DefCacheTmpWaitTime says how long should files be stored in local cache before being uploaded
2018-05-14 17:06:57 +00:00
DefCacheTmpWaitTime = fs . Duration ( 15 * time . Second )
2018-03-08 12:16:18 +00:00
// DefCacheDbWaitTime defines how long the cache backend should wait for the DB to be available
2018-05-14 17:06:57 +00:00
DefCacheDbWaitTime = fs . Duration ( 1 * time . Second )
2017-11-12 17:54:25 +00:00
)
// Register with Fs
func init ( ) {
fs . Register ( & fs . RegInfo {
Name : "cache" ,
Description : "Cache a remote" ,
NewFs : NewFs ,
Options : [ ] fs . Option { {
2018-05-14 17:06:57 +00:00
Name : "remote" ,
Help : "Remote to cache.\nNormally should contain a ':' and a path, eg \"myremote:path/to/dir\",\n\"myremote:bucket\" or maybe \"myremote:\" (not recommended)." ,
Required : true ,
2017-12-09 21:54:26 +00:00
} , {
2018-05-14 17:06:57 +00:00
Name : "plex_url" ,
Help : "The URL of the Plex server" ,
2017-12-09 21:54:26 +00:00
} , {
2018-05-14 17:06:57 +00:00
Name : "plex_username" ,
Help : "The username of the Plex user" ,
2017-12-09 21:54:26 +00:00
} , {
Name : "plex_password" ,
2018-05-14 17:06:57 +00:00
Help : "The password of the Plex user" ,
2017-12-09 21:54:26 +00:00
IsPassword : true ,
2017-11-12 17:54:25 +00:00
} , {
2018-05-14 17:06:57 +00:00
Name : "plex_token" ,
Help : "The plex token for authentication - auto set normally" ,
Hide : fs . OptionHideBoth ,
Advanced : true ,
} , {
Name : "chunk_size" ,
Help : "The size of a chunk. Lower value good for slow connections but can affect seamless reading." ,
Default : DefCacheChunkSize ,
Examples : [ ] fs . OptionExample { {
Value : "1m" ,
Help : "1MB" ,
} , {
Value : "5M" ,
Help : "5 MB" ,
} , {
Value : "10M" ,
Help : "10 MB" ,
} } ,
} , {
Name : "info_age" ,
Help : "How much time should object info (file size, file hashes etc) be stored in cache.\nUse a very high value if you don't plan on changing the source FS from outside the cache.\nAccepted units are: \"s\", \"m\", \"h\"." ,
Default : DefCacheInfoAge ,
Examples : [ ] fs . OptionExample { {
Value : "1h" ,
Help : "1 hour" ,
} , {
Value : "24h" ,
Help : "24 hours" ,
} , {
Value : "48h" ,
Help : "48 hours" ,
} } ,
} , {
Name : "chunk_total_size" ,
Help : "The maximum size of stored chunks. When the storage grows beyond this size, the oldest chunks will be deleted." ,
Default : DefCacheTotalChunkSize ,
Examples : [ ] fs . OptionExample { {
Value : "500M" ,
Help : "500 MB" ,
} , {
Value : "1G" ,
Help : "1 GB" ,
} , {
Value : "10G" ,
Help : "10 GB" ,
} } ,
} , {
Name : "db_path" ,
Default : filepath . Join ( config . CacheDir , "cache-backend" ) ,
Help : "Directory to cache DB" ,
Advanced : true ,
} , {
Name : "chunk_path" ,
Default : filepath . Join ( config . CacheDir , "cache-backend" ) ,
Help : "Directory to cache chunk files" ,
Advanced : true ,
} , {
Name : "db_purge" ,
Default : false ,
Help : "Purge the cache DB before" ,
Hide : fs . OptionHideConfigurator ,
Advanced : true ,
} , {
Name : "chunk_clean_interval" ,
Default : DefCacheChunkCleanInterval ,
Help : "Interval at which chunk cleanup runs" ,
Advanced : true ,
2017-11-12 17:54:25 +00:00
} , {
2018-05-14 17:06:57 +00:00
Name : "read_retries" ,
Default : DefCacheReadRetries ,
Help : "How many times to retry a read from a cache storage" ,
Advanced : true ,
2017-11-12 17:54:25 +00:00
} , {
2018-05-14 17:06:57 +00:00
Name : "workers" ,
Default : DefCacheTotalWorkers ,
Help : "How many workers should run in parallel to download chunks" ,
Advanced : true ,
} , {
Name : "chunk_no_memory" ,
Default : DefCacheChunkNoMemory ,
Help : "Disable the in-memory cache for storing chunks during streaming" ,
Advanced : true ,
} , {
Name : "rps" ,
Default : int ( DefCacheRps ) ,
Help : "Limits the number of requests per second to the source FS. -1 disables the rate limiter" ,
Advanced : true ,
} , {
Name : "writes" ,
Default : DefCacheWrites ,
Help : "Will cache file data on writes through the FS" ,
Advanced : true ,
} , {
Name : "tmp_upload_path" ,
Default : "" ,
Help : "Directory to keep temporary files until they are uploaded to the cloud storage" ,
Advanced : true ,
} , {
Name : "tmp_wait_time" ,
Default : DefCacheTmpWaitTime ,
Help : "How long should files be stored in local cache before being uploaded" ,
Advanced : true ,
} , {
Name : "db_wait_time" ,
Default : DefCacheDbWaitTime ,
Help : "How long to wait for the DB to be available - 0 is unlimited" ,
Advanced : true ,
2017-11-12 17:54:25 +00:00
} } ,
} )
}
2018-05-14 17:06:57 +00:00
// Options defines the configuration for this backend
//
// NewFs fills it from the config mapper with configstruct.Set; each field
// is bound to the option registered in init under the name given in its
// config tag.
type Options struct {
// Remote is the remote to cache
Remote string ` config:"remote" `
// PlexURL is the URL of the Plex server; NewFs only sets up a Plex connector when it is non empty
PlexURL string ` config:"plex_url" `
// PlexUsername is the username of the Plex user
PlexUsername string ` config:"plex_username" `
// PlexPassword is the password of the Plex user; NewFs tries obscure.Reveal on it and falls back to the raw value
PlexPassword string ` config:"plex_password" `
// PlexToken is the Plex authentication token; when set NewFs uses it instead of username and password
PlexToken string ` config:"plex_token" `
// ChunkSize is the size of a chunk
ChunkSize fs . SizeSuffix ` config:"chunk_size" `
// InfoAge is how long object info is kept in the cache before it is considered cold
InfoAge fs . Duration ` config:"info_age" `
// ChunkTotalSize is the maximum size of stored chunks; NewFs rejects values below ChunkSize * TotalWorkers
ChunkTotalSize fs . SizeSuffix ` config:"chunk_total_size" `
// DbPath is the directory holding the cache DB
DbPath string ` config:"db_path" `
// ChunkPath is the directory holding the cached chunk files
ChunkPath string ` config:"chunk_path" `
// DbPurge is passed to GetPersistent as the PurgeDb feature
DbPurge bool ` config:"db_purge" `
// ChunkCleanInterval is the interval at which the chunk cleanup runs
ChunkCleanInterval fs . Duration ` config:"chunk_clean_interval" `
// ReadRetries is how many times a read from the cache storage is retried
ReadRetries int ` config:"read_retries" `
// TotalWorkers is how many workers run in parallel to download chunks
TotalWorkers int ` config:"workers" `
// ChunkNoMemory disables the in-memory cache for chunks during streaming
ChunkNoMemory bool ` config:"chunk_no_memory" `
// Rps limits the requests per second to the source FS, -1 disables the limiter
Rps int ` config:"rps" `
// StoreWrites enables caching of file data on writes through the FS
StoreWrites bool ` config:"writes" `
// TempWritePath is the directory for temporary files awaiting upload; empty disables the temp fs
TempWritePath string ` config:"tmp_upload_path" `
// TempWaitTime is how long files stay in the local cache before being uploaded
TempWaitTime fs . Duration ` config:"tmp_wait_time" `
// DbWaitTime is how long to wait for the DB to be available - 0 is unlimited
DbWaitTime fs . Duration ` config:"db_wait_time" `
}
2017-11-12 17:54:25 +00:00
// Fs represents a wrapped fs.Fs
type Fs struct {
fs . Fs
2017-12-06 15:14:34 +00:00
wrapper fs . Fs
2017-11-12 17:54:25 +00:00
name string
root string
2018-05-14 17:06:57 +00:00
opt Options // parsed options
2017-11-12 17:54:25 +00:00
features * fs . Features // optional features
2018-01-29 22:05:04 +00:00
cache * Persistent
2018-05-14 17:06:57 +00:00
tempFs fs . Fs
2017-12-09 21:54:26 +00:00
lastChunkCleanup time . Time
cleanupMu sync . Mutex
rateLimiter * rate . Limiter
plexConnector * plexConnector
2018-01-29 22:05:04 +00:00
backgroundRunner * backgroundWriter
cleanupChan chan bool
2018-03-08 20:03:34 +00:00
parentsForgetFn [ ] func ( string , fs . EntryType )
notifiedRemotes map [ string ] bool
notifiedMu sync . Mutex
parentsForgetMu sync . Mutex
2017-11-12 17:54:25 +00:00
}
2018-01-30 13:35:40 +00:00
// parseRootPath strips every leading and trailing "/" from path and
// returns the result. The error result is currently always nil.
func parseRootPath(path string) (string, error) {
	cleaned := strings.TrimLeft(path, "/")
	cleaned = strings.TrimRight(cleaned, "/")
	return cleaned, nil
}
2018-01-29 22:05:04 +00:00
// NewFs constructs a Fs from the path, container:path
2018-05-14 17:06:57 +00:00
func NewFs ( name , rootPath string , m configmap . Mapper ) ( fs . Fs , error ) {
// Parse config into Options struct
opt := new ( Options )
err := configstruct . Set ( m , opt )
if err != nil {
return nil , err
}
if opt . ChunkTotalSize < opt . ChunkSize * fs . SizeSuffix ( opt . TotalWorkers ) {
return nil , errors . Errorf ( "don't set cache-total-chunk-size(%v) less than cache-chunk-size(%v) * cache-workers(%v)" ,
opt . ChunkTotalSize , opt . ChunkSize , opt . TotalWorkers )
}
if strings . HasPrefix ( opt . Remote , name + ":" ) {
2017-11-12 17:54:25 +00:00
return nil , errors . New ( "can't point cache remote at itself - check the value of the remote setting" )
}
2018-01-30 13:35:40 +00:00
rpath , err := parseRootPath ( rootPath )
if err != nil {
return nil , errors . Wrapf ( err , "failed to clean root path %q" , rootPath )
}
2018-05-14 17:06:57 +00:00
remotePath := path . Join ( opt . Remote , rpath )
2017-11-12 17:54:25 +00:00
wrappedFs , wrapErr := fs . NewFs ( remotePath )
2018-01-29 22:05:04 +00:00
if wrapErr != nil && wrapErr != fs . ErrorIsFile {
2017-11-12 17:54:25 +00:00
return nil , errors . Wrapf ( wrapErr , "failed to make remote %q to wrap" , remotePath )
}
2018-01-29 22:05:04 +00:00
var fsErr error
2017-11-12 17:54:25 +00:00
fs . Debugf ( name , "wrapped %v:%v at root %v" , wrappedFs . Name ( ) , wrappedFs . Root ( ) , rpath )
2018-01-29 22:05:04 +00:00
if wrapErr == fs . ErrorIsFile {
fsErr = fs . ErrorIsFile
rpath = cleanPath ( path . Dir ( rpath ) )
}
2017-11-12 17:54:25 +00:00
// configure cache backend
2018-05-14 17:06:57 +00:00
if opt . DbPurge {
2017-11-12 17:54:25 +00:00
fs . Debugf ( name , "Purging the DB" )
}
f := & Fs {
2018-05-14 17:06:57 +00:00
Fs : wrappedFs ,
name : name ,
root : rpath ,
opt : * opt ,
lastChunkCleanup : time . Now ( ) . Truncate ( time . Hour * 24 * 30 ) ,
cleanupChan : make ( chan bool , 1 ) ,
notifiedRemotes : make ( map [ string ] bool ) ,
2017-11-12 17:54:25 +00:00
}
2018-05-14 17:06:57 +00:00
f . rateLimiter = rate . NewLimiter ( rate . Limit ( float64 ( opt . Rps ) ) , opt . TotalWorkers )
2017-11-12 17:54:25 +00:00
2017-12-09 21:54:26 +00:00
f . plexConnector = & plexConnector { }
2018-05-14 17:06:57 +00:00
if opt . PlexURL != "" {
if opt . PlexToken != "" {
f . plexConnector , err = newPlexConnectorWithToken ( f , opt . PlexURL , opt . PlexToken )
2017-12-09 21:54:26 +00:00
if err != nil {
2018-05-14 17:06:57 +00:00
return nil , errors . Wrapf ( err , "failed to connect to the Plex API %v" , opt . PlexURL )
2017-12-09 21:54:26 +00:00
}
} else {
2018-05-14 17:06:57 +00:00
if opt . PlexPassword != "" && opt . PlexUsername != "" {
decPass , err := obscure . Reveal ( opt . PlexPassword )
2017-12-09 21:54:26 +00:00
if err != nil {
2018-05-14 17:06:57 +00:00
decPass = opt . PlexPassword
2017-12-09 21:54:26 +00:00
}
2018-05-14 17:06:57 +00:00
f . plexConnector , err = newPlexConnector ( f , opt . PlexURL , opt . PlexUsername , decPass , func ( token string ) {
m . Set ( "plex_token" , token )
} )
2017-12-09 21:54:26 +00:00
if err != nil {
2018-05-14 17:06:57 +00:00
return nil , errors . Wrapf ( err , "failed to connect to the Plex API %v" , opt . PlexURL )
2017-12-09 21:54:26 +00:00
}
}
}
}
2018-05-14 17:06:57 +00:00
dbPath := f . opt . DbPath
chunkPath := f . opt . ChunkPath
2017-12-20 20:43:30 +00:00
// if the dbPath is non default but the chunk path is default, we overwrite the last to follow the same one as dbPath
2018-01-12 16:30:54 +00:00
if dbPath != filepath . Join ( config . CacheDir , "cache-backend" ) &&
chunkPath == filepath . Join ( config . CacheDir , "cache-backend" ) {
2017-12-20 20:43:30 +00:00
chunkPath = dbPath
}
2017-11-20 14:38:28 +00:00
if filepath . Ext ( dbPath ) != "" {
dbPath = filepath . Dir ( dbPath )
2017-11-12 17:54:25 +00:00
}
2017-12-20 20:43:30 +00:00
if filepath . Ext ( chunkPath ) != "" {
chunkPath = filepath . Dir ( chunkPath )
}
2017-11-12 17:54:25 +00:00
err = os . MkdirAll ( dbPath , os . ModePerm )
if err != nil {
return nil , errors . Wrapf ( err , "failed to create cache directory %v" , dbPath )
}
2017-12-20 20:43:30 +00:00
err = os . MkdirAll ( chunkPath , os . ModePerm )
if err != nil {
return nil , errors . Wrapf ( err , "failed to create cache directory %v" , chunkPath )
}
2017-11-12 17:54:25 +00:00
2017-11-20 14:38:28 +00:00
dbPath = filepath . Join ( dbPath , name + ".db" )
2017-12-20 20:43:30 +00:00
chunkPath = filepath . Join ( chunkPath , name )
fs . Infof ( name , "Cache DB path: %v" , dbPath )
fs . Infof ( name , "Cache chunk path: %v" , chunkPath )
f . cache , err = GetPersistent ( dbPath , chunkPath , & Features {
2018-05-14 17:06:57 +00:00
PurgeDb : opt . DbPurge ,
DbWaitTime : time . Duration ( opt . DbWaitTime ) ,
2017-12-09 21:54:26 +00:00
} )
2017-11-12 17:54:25 +00:00
if err != nil {
2017-11-30 10:27:59 +00:00
return nil , errors . Wrapf ( err , "failed to start cache db" )
2017-11-12 17:54:25 +00:00
}
2017-11-22 16:32:36 +00:00
// Trap SIGINT and SIGTERM to close the DB handle gracefully
c := make ( chan os . Signal , 1 )
2018-01-30 20:35:53 +00:00
signal . Notify ( c , syscall . SIGHUP )
atexit . Register ( func ( ) {
2018-05-14 17:06:57 +00:00
if opt . PlexURL != "" {
2018-03-22 19:20:34 +00:00
f . plexConnector . closeWebsocket ( )
}
2018-01-30 20:35:53 +00:00
f . StopBackgroundRunners ( )
} )
2017-11-22 16:32:36 +00:00
go func ( ) {
2017-12-19 13:48:48 +00:00
for {
s := <- c
2018-01-30 20:35:53 +00:00
if s == syscall . SIGHUP {
2017-12-19 13:48:48 +00:00
fs . Infof ( f , "Clearing cache from signal" )
f . DirCacheFlush ( )
}
2017-11-22 16:32:36 +00:00
}
} ( )
2017-11-12 17:54:25 +00:00
2018-05-14 17:06:57 +00:00
fs . Infof ( name , "Chunk Memory: %v" , ! f . opt . ChunkNoMemory )
fs . Infof ( name , "Chunk Size: %v" , f . opt . ChunkSize )
fs . Infof ( name , "Chunk Total Size: %v" , f . opt . ChunkTotalSize )
fs . Infof ( name , "Chunk Clean Interval: %v" , f . opt . ChunkCleanInterval )
fs . Infof ( name , "Workers: %v" , f . opt . TotalWorkers )
fs . Infof ( name , "File Age: %v" , f . opt . InfoAge )
if ! f . opt . StoreWrites {
2018-01-29 22:05:04 +00:00
fs . Infof ( name , "Cache Writes: enabled" )
}
2017-11-12 17:54:25 +00:00
2018-05-14 17:06:57 +00:00
if f . opt . TempWritePath != "" {
err = os . MkdirAll ( f . opt . TempWritePath , os . ModePerm )
2018-01-29 22:05:04 +00:00
if err != nil {
2018-05-14 17:06:57 +00:00
return nil , errors . Wrapf ( err , "failed to create cache directory %v" , f . opt . TempWritePath )
2018-01-29 22:05:04 +00:00
}
2018-05-14 17:06:57 +00:00
f . opt . TempWritePath = filepath . ToSlash ( f . opt . TempWritePath )
f . tempFs , err = fs . NewFs ( f . opt . TempWritePath )
2018-01-29 22:05:04 +00:00
if err != nil {
return nil , errors . Wrapf ( err , "failed to create temp fs: %v" , err )
}
2018-05-14 17:06:57 +00:00
fs . Infof ( name , "Upload Temp Rest Time: %v" , f . opt . TempWaitTime )
fs . Infof ( name , "Upload Temp FS: %v" , f . opt . TempWritePath )
2018-01-29 22:05:04 +00:00
f . backgroundRunner , _ = initBackgroundUploader ( f )
go f . backgroundRunner . run ( )
}
go func ( ) {
for {
2018-05-14 17:06:57 +00:00
time . Sleep ( time . Duration ( f . opt . ChunkCleanInterval ) )
2018-01-29 22:05:04 +00:00
select {
case <- f . cleanupChan :
fs . Infof ( f , "stopping cleanup" )
return
default :
fs . Debugf ( f , "starting cleanup" )
f . CleanUpCache ( false )
}
}
} ( )
2017-11-12 17:54:25 +00:00
2018-03-08 20:03:34 +00:00
if doChangeNotify := wrappedFs . Features ( ) . ChangeNotify ; doChangeNotify != nil {
2018-05-14 17:06:57 +00:00
doChangeNotify ( f . receiveChangeNotify , time . Duration ( f . opt . ChunkCleanInterval ) )
2018-02-10 20:01:05 +00:00
}
2017-11-12 17:54:25 +00:00
f . features = ( & fs . Features {
CanHaveEmptyDirectories : true ,
DuplicateFiles : false , // storage doesn't permit this
2017-12-06 15:14:34 +00:00
} ) . Fill ( f ) . Mask ( wrappedFs ) . WrapsFs ( f , wrappedFs )
2018-01-29 22:05:04 +00:00
// override only those features that use a temp fs and it doesn't support them
2018-03-08 20:03:34 +00:00
//f.features.ChangeNotify = f.ChangeNotify
2018-05-14 17:06:57 +00:00
if f . opt . TempWritePath != "" {
2018-01-29 22:05:04 +00:00
if f . tempFs . Features ( ) . Copy == nil {
f . features . Copy = nil
}
if f . tempFs . Features ( ) . Move == nil {
f . features . Move = nil
}
if f . tempFs . Features ( ) . Move == nil {
f . features . Move = nil
}
if f . tempFs . Features ( ) . DirMove == nil {
f . features . DirMove = nil
}
if f . tempFs . Features ( ) . MergeDirs == nil {
f . features . MergeDirs = nil
}
}
// even if the wrapped fs doesn't support it, we still want it
2017-12-18 12:55:37 +00:00
f . features . DirCacheFlush = f . DirCacheFlush
2017-11-12 17:54:25 +00:00
2018-03-14 19:49:11 +00:00
rc . Add ( rc . Call {
Path : "cache/expire" ,
Fn : f . httpExpireRemote ,
Title : "Purge a remote from cache" ,
Help : `
Purge a remote from the cache backend . Supports either a directory or a file .
Params :
- remote = path to remote ( required )
- withData = true / false to delete cached data ( chunks ) as well ( optional )
2018-04-23 19:44:44 +00:00
Eg
rclone rc cache / expire remote = path / to / sub / folder /
rclone rc cache / expire remote = / withData = true
2018-03-14 19:49:11 +00:00
` ,
} )
2018-03-21 23:11:20 +00:00
rc . Add ( rc . Call {
Path : "cache/stats" ,
Fn : f . httpStats ,
Title : "Get cache stats" ,
Help : `
Show statistics for the cache remote .
2018-08-30 09:09:16 +00:00
` ,
} )
rc . Add ( rc . Call {
Path : "cache/fetch" ,
Fn : f . rcFetch ,
Title : "Fetch file chunks" ,
Help : `
Ensure the specified file chunks are cached on disk .
The chunks = parameter specifies the file chunks to check .
It takes a comma separated list of array slice indices .
The slice indices are similar to Python slices : start [ : end ]
start is the 0 based chunk number from the beginning of the file
to fetch inclusive . end is 0 based chunk number from the beginning
of the file to fetch exclisive .
Both values can be negative , in which case they count from the back
of the file . The value "-5:" represents the last 5 chunks of a file .
Some valid examples are :
":5,-5:" - > the first and last five chunks
"0,-2" - > the first and the second last chunk
"0:10" - > the first ten chunks
Any parameter with a key that starts with "file" can be used to
specify files to fetch , eg
rclone rc cache / fetch chunks = 0 file = hello file2 = home / goodbye
File names will automatically be encrypted when the a crypt remote
is used on top of the cache .
2018-03-21 23:11:20 +00:00
` ,
} )
2018-01-29 22:05:04 +00:00
return f , fsErr
2017-11-12 17:54:25 +00:00
}
2018-03-21 23:11:20 +00:00
// httpStats implements the "cache/stats" remote control call.
//
// The input parameters are not used. On success the result carries "status"
// set to "ok" and the value returned by f.Stats under "stats". If f.Stats
// fails, its error is returned wrapped so the cause is not lost.
func (f *Fs) httpStats(in rc.Params) (out rc.Params, err error) {
	out = make(rc.Params)
	m, err := f.Stats()
	if err != nil {
		return out, errors.Wrap(err, "error while getting cache stats")
	}

	out["status"] = "ok"
	out["stats"] = m
	return out, nil
}
2018-08-30 09:09:16 +00:00
// unwrapRemote cleans remote and, when this cache is wrapped by crypt,
// makes sure the returned name is in the encrypted format: a name which
// fails to decrypt is taken to be a plain name and is encrypted, anything
// else is returned as it is. An empty remote is returned unchanged.
func (f *Fs) unwrapRemote(remote string) string {
	remote = cleanPath(remote)
	if remote == "" {
		return remote
	}
	cryptFs, wrapped := f.isWrappedByCrypt()
	if !wrapped {
		return remote
	}
	// a failed decryption means we got the decrypted format
	if _, decryptErr := cryptFs.DecryptFileName(remote); decryptErr != nil {
		return cryptFs.EncryptFileName(remote)
	}
	// already encrypted, use as is
	return remote
}
2018-03-14 19:49:11 +00:00
func ( f * Fs ) httpExpireRemote ( in rc . Params ) ( out rc . Params , err error ) {
out = make ( rc . Params )
remoteInt , ok := in [ "remote" ]
if ! ok {
return out , errors . Errorf ( "remote is needed" )
}
remote := remoteInt . ( string )
withData := false
_ , ok = in [ "withData" ]
if ok {
withData = true
}
2018-08-30 09:09:16 +00:00
remote = f . unwrapRemote ( remote )
if ! f . cache . HasEntry ( path . Join ( f . Root ( ) , remote ) ) {
return out , errors . Errorf ( "%s doesn't exist in cache" , remote )
2018-03-14 19:49:11 +00:00
}
co := NewObject ( f , remote )
err = f . cache . GetObject ( co )
if err != nil { // it could be a dir
cd := NewDirectory ( f , remote )
err := f . cache . ExpireDir ( cd )
if err != nil {
return out , errors . WithMessage ( err , "error expiring directory" )
}
2018-03-15 09:05:45 +00:00
// notify vfs too
f . notifyChangeUpstream ( cd . Remote ( ) , fs . EntryDirectory )
2018-03-14 19:49:11 +00:00
out [ "status" ] = "ok"
out [ "message" ] = fmt . Sprintf ( "cached directory cleared: %v" , remote )
return out , nil
}
// expire the entry
2018-04-03 19:46:00 +00:00
err = f . cache . ExpireObject ( co , withData )
2018-03-14 19:49:11 +00:00
if err != nil {
return out , errors . WithMessage ( err , "error expiring file" )
}
2018-03-15 09:05:45 +00:00
// notify vfs too
f . notifyChangeUpstream ( co . Remote ( ) , fs . EntryObject )
2018-03-14 19:49:11 +00:00
out [ "status" ] = "ok"
out [ "message" ] = fmt . Sprintf ( "cached file cleared: %v" , remote )
return out , nil
}
2018-08-30 09:09:16 +00:00
func ( f * Fs ) rcFetch ( in rc . Params ) ( rc . Params , error ) {
type chunkRange struct {
start , end int64
}
parseChunks := func ( ranges string ) ( crs [ ] chunkRange , err error ) {
for _ , part := range strings . Split ( ranges , "," ) {
var start , end int64 = 0 , math . MaxInt64
switch ints := strings . Split ( part , ":" ) ; len ( ints ) {
case 1 :
start , err = strconv . ParseInt ( ints [ 0 ] , 10 , 64 )
if err != nil {
return nil , errors . Errorf ( "invalid range: %q" , part )
}
end = start + 1
case 2 :
if ints [ 0 ] != "" {
start , err = strconv . ParseInt ( ints [ 0 ] , 10 , 64 )
if err != nil {
return nil , errors . Errorf ( "invalid range: %q" , part )
}
}
if ints [ 1 ] != "" {
end , err = strconv . ParseInt ( ints [ 1 ] , 10 , 64 )
if err != nil {
return nil , errors . Errorf ( "invalid range: %q" , part )
}
}
default :
return nil , errors . Errorf ( "invalid range: %q" , part )
}
crs = append ( crs , chunkRange { start : start , end : end } )
}
return
}
walkChunkRange := func ( cr chunkRange , size int64 , cb func ( chunk int64 ) ) {
if size <= 0 {
return
}
chunks := ( size - 1 ) / f . ChunkSize ( ) + 1
start , end := cr . start , cr . end
if start < 0 {
start += chunks
}
if end <= 0 {
end += chunks
}
if end <= start {
return
}
switch {
case start < 0 :
start = 0
case start >= chunks :
return
}
switch {
case end <= start :
end = start + 1
case end >= chunks :
end = chunks
}
for i := start ; i < end ; i ++ {
cb ( i )
}
}
walkChunkRanges := func ( crs [ ] chunkRange , size int64 , cb func ( chunk int64 ) ) {
for _ , cr := range crs {
walkChunkRange ( cr , size , cb )
}
}
v , ok := in [ "chunks" ]
if ! ok {
return nil , errors . New ( "missing chunks parameter" )
}
s , ok := v . ( string )
if ! ok {
return nil , errors . New ( "invalid chunks parameter" )
}
delete ( in , "chunks" )
crs , err := parseChunks ( s )
if err != nil {
return nil , errors . Wrap ( err , "invalid chunks parameter" )
}
var files [ ] [ 2 ] string
for k , v := range in {
if ! strings . HasPrefix ( k , "file" ) {
return nil , errors . Errorf ( "invalid parameter %s=%s" , k , v )
}
switch v := v . ( type ) {
case string :
files = append ( files , [ 2 ] string { v , f . unwrapRemote ( v ) } )
default :
return nil , errors . Errorf ( "invalid parameter %s=%s" , k , v )
}
}
type fileStatus struct {
2018-09-03 15:07:07 +00:00
Error string
2018-08-30 09:09:16 +00:00
FetchedChunks int
}
fetchedChunks := make ( map [ string ] fileStatus , len ( files ) )
for _ , pair := range files {
file , remote := pair [ 0 ] , pair [ 1 ]
var status fileStatus
o , err := f . NewObject ( remote )
if err != nil {
2018-09-03 15:07:07 +00:00
fetchedChunks [ file ] = fileStatus { Error : err . Error ( ) }
2018-08-30 09:09:16 +00:00
continue
}
co := o . ( * Object )
err = co . refreshFromSource ( true )
if err != nil {
2018-09-03 15:07:07 +00:00
fetchedChunks [ file ] = fileStatus { Error : err . Error ( ) }
2018-08-30 09:09:16 +00:00
continue
}
handle := NewObjectHandle ( co , f )
handle . UseMemory = false
handle . scaleWorkers ( 1 )
walkChunkRanges ( crs , co . Size ( ) , func ( chunk int64 ) {
_ , err := handle . getChunk ( chunk * f . ChunkSize ( ) )
if err != nil {
2018-09-03 15:07:07 +00:00
if status . Error == "" {
status . Error = err . Error ( )
2018-08-30 09:09:16 +00:00
}
} else {
status . FetchedChunks ++
}
} )
fetchedChunks [ file ] = status
}
return rc . Params { "status" : fetchedChunks } , nil
}
2018-03-08 20:03:34 +00:00
// receiveChangeNotify is a wrapper to notifications sent from the wrapped FS about changed files
func ( f * Fs ) receiveChangeNotify ( forgetPath string , entryType fs . EntryType ) {
2018-03-23 20:41:01 +00:00
if crypt , yes := f . isWrappedByCrypt ( ) ; yes {
decryptedPath , err := crypt . DecryptFileName ( forgetPath )
if err == nil {
fs . Infof ( decryptedPath , "received cache expiry notification" )
} else {
fs . Infof ( forgetPath , "received cache expiry notification" )
}
} else {
fs . Infof ( forgetPath , "received cache expiry notification" )
}
2018-02-10 20:01:05 +00:00
// notify upstreams too (vfs)
2018-03-08 20:03:34 +00:00
f . notifyChangeUpstream ( forgetPath , entryType )
2018-02-10 20:01:05 +00:00
var cd * Directory
2018-03-08 20:03:34 +00:00
if entryType == fs . EntryObject {
co := NewObject ( f , forgetPath )
err := f . cache . GetObject ( co )
2018-06-13 20:57:26 +00:00
if err != nil {
fs . Debugf ( f , "got change notification for non cached entry %v" , co )
}
err = f . cache . ExpireObject ( co , true )
if err != nil {
fs . Debugf ( forgetPath , "notify: error expiring '%v': %v" , co , err )
2018-03-08 20:03:34 +00:00
}
2018-02-10 20:01:05 +00:00
cd = NewDirectory ( f , cleanPath ( path . Dir ( co . Remote ( ) ) ) )
} else {
cd = NewDirectory ( f , forgetPath )
2018-03-31 09:44:09 +00:00
}
// we expire the dir
err := f . cache . ExpireDir ( cd )
if err != nil {
fs . Debugf ( forgetPath , "notify: error expiring '%v': %v" , cd , err )
} else {
fs . Debugf ( forgetPath , "notify: expired '%v'" , cd )
2018-02-10 20:01:05 +00:00
}
2018-03-08 20:03:34 +00:00
f . notifiedMu . Lock ( )
defer f . notifiedMu . Unlock ( )
f . notifiedRemotes [ forgetPath ] = true
f . notifiedRemotes [ cd . Remote ( ) ] = true
2018-02-10 20:01:05 +00:00
}
2018-03-08 20:03:34 +00:00
// notifyChangeUpstreamIfNeeded will check if the wrapped remote doesn't notify on changes
2018-02-11 20:30:58 +00:00
// or if we use a temp fs
2018-03-08 20:03:34 +00:00
func ( f * Fs ) notifyChangeUpstreamIfNeeded ( remote string , entryType fs . EntryType ) {
2018-05-14 17:06:57 +00:00
if f . Fs . Features ( ) . ChangeNotify == nil || f . opt . TempWritePath != "" {
2018-03-08 20:03:34 +00:00
f . notifyChangeUpstream ( remote , entryType )
2018-02-11 20:30:58 +00:00
}
}
2018-03-08 20:03:34 +00:00
// notifyChangeUpstream will loop through all the upstreams and notify
2018-02-10 20:01:05 +00:00
// of the provided remote (should be only a dir)
2018-03-08 20:03:34 +00:00
func ( f * Fs ) notifyChangeUpstream ( remote string , entryType fs . EntryType ) {
f . parentsForgetMu . Lock ( )
defer f . parentsForgetMu . Unlock ( )
2018-02-10 20:01:05 +00:00
if len ( f . parentsForgetFn ) > 0 {
for _ , fn := range f . parentsForgetFn {
2018-03-08 20:03:34 +00:00
fn ( remote , entryType )
2018-02-10 20:01:05 +00:00
}
}
}
2018-03-08 20:03:34 +00:00
// ChangeNotify can subsribe multiple callers
// this is coupled with the wrapped fs ChangeNotify (if it supports it)
2018-02-10 20:01:05 +00:00
// and also notifies other caches (i.e VFS) to clear out whenever something changes
2018-03-08 20:03:34 +00:00
func ( f * Fs ) ChangeNotify ( notifyFunc func ( string , fs . EntryType ) , pollInterval time . Duration ) chan bool {
f . parentsForgetMu . Lock ( )
defer f . parentsForgetMu . Unlock ( )
fs . Debugf ( f , "subscribing to ChangeNotify" )
2018-02-10 20:01:05 +00:00
f . parentsForgetFn = append ( f . parentsForgetFn , notifyFunc )
return make ( chan bool )
}
2017-11-12 17:54:25 +00:00
// Name returns the name of this remote (as passed into NewFs)
func (f *Fs) Name() string {
	name := f.name
	return name
}
// Root returns the root path stored on this remote by NewFs
func (f *Fs) Root() string {
	root := f.root
	return root
}
// Features gives access to the optional features of this Fs
func (f *Fs) Features() *fs.Features {
	features := f.features
	return features
}
// String returns a description of the FS
func ( f * Fs ) String ( ) string {
2018-01-29 22:05:04 +00:00
return fmt . Sprintf ( "Cache remote %s:%s" , f . name , f . root )
2017-11-12 17:54:25 +00:00
}
// ChunkSize returns the configured chunk size
func ( f * Fs ) ChunkSize ( ) int64 {
2018-05-14 17:06:57 +00:00
return int64 ( f . opt . ChunkSize )
2017-11-12 17:54:25 +00:00
}
2018-01-29 22:05:04 +00:00
// InfoAge returns the configured file age
func ( f * Fs ) InfoAge ( ) time . Duration {
2018-05-14 17:06:57 +00:00
return time . Duration ( f . opt . InfoAge )
2018-01-29 22:05:04 +00:00
}
// TempUploadWaitTime returns the configured temp file upload wait time
func ( f * Fs ) TempUploadWaitTime ( ) time . Duration {
2018-05-14 17:06:57 +00:00
return time . Duration ( f . opt . TempWaitTime )
2018-01-29 22:05:04 +00:00
}
2017-11-12 17:54:25 +00:00
// NewObject finds the Object at remote.
func ( f * Fs ) NewObject ( remote string ) ( fs . Object , error ) {
2018-01-29 22:05:04 +00:00
var err error
fs . Debugf ( f , "new object '%s'" , remote )
2017-11-12 17:54:25 +00:00
co := NewObject ( f , remote )
2018-01-29 22:05:04 +00:00
// search for entry in cache and validate it
err = f . cache . GetObject ( co )
2017-12-18 12:55:37 +00:00
if err != nil {
fs . Debugf ( remote , "find: error: %v" , err )
2018-05-14 17:06:57 +00:00
} else if time . Now ( ) . After ( co . CacheTs . Add ( time . Duration ( f . opt . InfoAge ) ) ) {
2018-01-29 22:05:04 +00:00
fs . Debugf ( co , "find: cold object: %+v" , co )
2017-12-18 12:55:37 +00:00
} else {
2018-05-14 17:06:57 +00:00
fs . Debugf ( co , "find: warm object: %v, expiring on: %v" , co , co . CacheTs . Add ( time . Duration ( f . opt . InfoAge ) ) )
2017-11-12 17:54:25 +00:00
return co , nil
}
2018-01-29 22:05:04 +00:00
// search for entry in source or temp fs
var obj fs . Object
2018-05-14 17:06:57 +00:00
if f . opt . TempWritePath != "" {
2018-01-29 22:05:04 +00:00
obj , err = f . tempFs . NewObject ( remote )
// not found in temp fs
if err != nil {
fs . Debugf ( remote , "find: not found in local cache fs" )
obj , err = f . Fs . NewObject ( remote )
} else {
fs . Debugf ( obj , "find: found in local cache fs" )
}
} else {
obj , err = f . Fs . NewObject ( remote )
}
// not found in either fs
2017-11-12 17:54:25 +00:00
if err != nil {
2018-01-29 22:05:04 +00:00
fs . Debugf ( obj , "find failed: not found in either local or remote fs" )
2017-11-12 17:54:25 +00:00
return nil , err
}
2018-01-29 22:05:04 +00:00
// cache the new entry
co = ObjectFromOriginal ( f , obj ) . persist ( )
fs . Debugf ( co , "find: cached object" )
2017-11-12 17:54:25 +00:00
return co , nil
}
// List the objects and directories in dir into entries
func ( f * Fs ) List ( dir string ) ( entries fs . DirEntries , err error ) {
2018-01-29 22:05:04 +00:00
fs . Debugf ( f , "list '%s'" , dir )
2017-12-18 12:55:37 +00:00
cd := ShallowDirectory ( f , dir )
2018-01-29 22:05:04 +00:00
// search for cached dir entries and validate them
2017-11-12 17:54:25 +00:00
entries , err = f . cache . GetDirEntries ( cd )
if err != nil {
2017-12-18 12:55:37 +00:00
fs . Debugf ( dir , "list: error: %v" , err )
2018-05-14 17:06:57 +00:00
} else if time . Now ( ) . After ( cd . CacheTs . Add ( time . Duration ( f . opt . InfoAge ) ) ) {
2017-12-18 12:55:37 +00:00
fs . Debugf ( dir , "list: cold listing: %v" , cd . CacheTs )
2017-11-12 17:54:25 +00:00
} else if len ( entries ) == 0 {
// TODO: read empty dirs from source?
2017-12-18 12:55:37 +00:00
fs . Debugf ( dir , "list: empty listing" )
2017-11-12 17:54:25 +00:00
} else {
2018-05-14 17:06:57 +00:00
fs . Debugf ( dir , "list: warm %v from cache for: %v, expiring on: %v" , len ( entries ) , cd . abs ( ) , cd . CacheTs . Add ( time . Duration ( f . opt . InfoAge ) ) )
2018-01-29 22:05:04 +00:00
fs . Debugf ( dir , "list: cached entries: %v" , entries )
2017-11-12 17:54:25 +00:00
return entries , nil
}
2018-01-29 22:05:04 +00:00
// FIXME need to clean existing cached listing
2017-11-12 17:54:25 +00:00
2018-01-29 22:05:04 +00:00
// we first search any temporary files stored locally
var cachedEntries fs . DirEntries
2018-05-14 17:06:57 +00:00
if f . opt . TempWritePath != "" {
2018-01-29 22:05:04 +00:00
queuedEntries , err := f . cache . searchPendingUploadFromDir ( cd . abs ( ) )
if err != nil {
fs . Errorf ( dir , "list: error getting pending uploads: %v" , err )
} else {
fs . Debugf ( dir , "list: read %v from temp fs" , len ( queuedEntries ) )
fs . Debugf ( dir , "list: temp fs entries: %v" , queuedEntries )
for _ , queuedRemote := range queuedEntries {
queuedEntry , err := f . tempFs . NewObject ( f . cleanRootFromPath ( queuedRemote ) )
if err != nil {
fs . Debugf ( dir , "list: temp file not found in local fs: %v" , err )
continue
}
co := ObjectFromOriginal ( f , queuedEntry ) . persist ( )
fs . Debugf ( co , "list: cached temp object" )
cachedEntries = append ( cachedEntries , co )
}
}
}
// search from the source
2017-11-12 17:54:25 +00:00
entries , err = f . Fs . List ( dir )
if err != nil {
return nil , err
}
2017-12-18 12:55:37 +00:00
fs . Debugf ( dir , "list: read %v from source" , len ( entries ) )
2018-01-29 22:05:04 +00:00
fs . Debugf ( dir , "list: source entries: %v" , entries )
2017-11-12 17:54:25 +00:00
2018-01-29 22:05:04 +00:00
// and then iterate over the ones from source (temp Objects will override source ones)
2018-06-03 19:48:13 +00:00
var batchDirectories [ ] * Directory
2017-11-12 17:54:25 +00:00
for _ , entry := range entries {
switch o := entry . ( type ) {
case fs . Object :
2018-01-29 22:05:04 +00:00
// skip over temporary objects (might be uploading)
found := false
for _ , t := range cachedEntries {
if t . Remote ( ) == o . Remote ( ) {
found = true
break
}
}
if found {
continue
}
co := ObjectFromOriginal ( f , o ) . persist ( )
2017-11-12 17:54:25 +00:00
cachedEntries = append ( cachedEntries , co )
2018-01-29 22:05:04 +00:00
fs . Debugf ( dir , "list: cached object: %v" , co )
2017-11-12 17:54:25 +00:00
case fs . Directory :
2018-01-29 22:05:04 +00:00
cdd := DirectoryFromOriginal ( f , o )
2018-03-13 21:43:34 +00:00
// check if the dir isn't expired and add it in cache if it isn't
2018-05-14 17:06:57 +00:00
if cdd2 , err := f . cache . GetDir ( cdd . abs ( ) ) ; err != nil || time . Now ( ) . Before ( cdd2 . CacheTs . Add ( time . Duration ( f . opt . InfoAge ) ) ) {
2018-06-03 19:48:13 +00:00
batchDirectories = append ( batchDirectories , cdd )
2018-03-13 21:43:34 +00:00
}
2018-01-29 22:05:04 +00:00
cachedEntries = append ( cachedEntries , cdd )
2017-11-12 17:54:25 +00:00
default :
2018-01-29 22:05:04 +00:00
fs . Debugf ( entry , "list: Unknown object type %T" , entry )
2017-11-12 17:54:25 +00:00
}
}
2018-06-03 19:48:13 +00:00
err = f . cache . AddBatchDir ( batchDirectories )
if err != nil {
fs . Errorf ( dir , "list: error caching directories from listing %v" , dir )
} else {
fs . Debugf ( dir , "list: cached directories: %v" , len ( batchDirectories ) )
}
2018-01-29 22:05:04 +00:00
// cache dir meta
t := time . Now ( )
cd . CacheTs = & t
err = f . cache . AddDir ( cd )
2017-11-12 17:54:25 +00:00
if err != nil {
2018-01-29 22:05:04 +00:00
fs . Errorf ( cd , "list: save error: '%v'" , err )
2017-12-18 12:55:37 +00:00
} else {
2018-01-29 22:05:04 +00:00
fs . Debugf ( dir , "list: cached dir: '%v', cache ts: %v" , cd . abs ( ) , cd . CacheTs )
2017-11-12 17:54:25 +00:00
}
return cachedEntries , nil
}
2018-01-12 16:30:54 +00:00
func ( f * Fs ) recurse ( dir string , list * walk . ListRHelper ) error {
2017-11-12 17:54:25 +00:00
entries , err := f . List ( dir )
if err != nil {
return err
}
for i := 0 ; i < len ( entries ) ; i ++ {
innerDir , ok := entries [ i ] . ( fs . Directory )
if ok {
err := f . recurse ( innerDir . Remote ( ) , list )
if err != nil {
return err
}
}
err := list . Add ( entries [ i ] )
if err != nil {
return err
}
}
return nil
}
// ListR lists the objects and directories of the Fs starting
// from dir recursively into out.
func ( f * Fs ) ListR ( dir string , callback fs . ListRCallback ) ( err error ) {
fs . Debugf ( f , "list recursively from '%s'" , dir )
// we check if the source FS supports ListR
// if it does, we'll use that to get all the entries, cache them and return
do := f . Fs . Features ( ) . ListR
if do != nil {
return do ( dir , func ( entries fs . DirEntries ) error {
// we got called back with a set of entries so let's cache them and call the original callback
for _ , entry := range entries {
switch o := entry . ( type ) {
case fs . Object :
_ = f . cache . AddObject ( ObjectFromOriginal ( f , o ) )
case fs . Directory :
_ = f . cache . AddDir ( DirectoryFromOriginal ( f , o ) )
default :
return errors . Errorf ( "Unknown object type %T" , entry )
}
}
// call the original callback
return callback ( entries )
} )
}
// if we're here, we're gonna do a standard recursive traversal and cache everything
2018-01-12 16:30:54 +00:00
list := walk . NewListRHelper ( callback )
2017-11-12 17:54:25 +00:00
err = f . recurse ( dir , list )
if err != nil {
return err
}
return list . Flush ( )
}
// Mkdir makes the directory (container, bucket)
func ( f * Fs ) Mkdir ( dir string ) error {
2018-01-29 22:05:04 +00:00
fs . Debugf ( f , "mkdir '%s'" , dir )
2017-11-12 17:54:25 +00:00
err := f . Fs . Mkdir ( dir )
if err != nil {
return err
}
2018-01-29 22:05:04 +00:00
fs . Debugf ( dir , "mkdir: created dir in source fs" )
2017-11-12 17:54:25 +00:00
2017-12-18 12:55:37 +00:00
cd := NewDirectory ( f , cleanPath ( dir ) )
err = f . cache . AddDir ( cd )
if err != nil {
fs . Errorf ( dir , "mkdir: add error: %v" , err )
2018-01-29 22:05:04 +00:00
} else {
fs . Debugf ( cd , "mkdir: added to cache" )
2017-12-18 12:55:37 +00:00
}
2018-01-29 22:05:04 +00:00
// expire parent of new dir
2017-12-18 12:55:37 +00:00
parentCd := NewDirectory ( f , cleanPath ( path . Dir ( dir ) ) )
err = f . cache . ExpireDir ( parentCd )
if err != nil {
2018-01-29 22:05:04 +00:00
fs . Errorf ( parentCd , "mkdir: cache expire error: %v" , err )
} else {
fs . Infof ( parentCd , "mkdir: cache expired" )
2017-12-18 12:55:37 +00:00
}
2018-03-08 20:03:34 +00:00
// advertise to ChangeNotify if wrapped doesn't do that
f . notifyChangeUpstreamIfNeeded ( parentCd . Remote ( ) , fs . EntryDirectory )
2017-11-12 17:54:25 +00:00
return nil
}
// Rmdir removes the directory (container, bucket) if empty
func ( f * Fs ) Rmdir ( dir string ) error {
2018-01-29 22:05:04 +00:00
fs . Debugf ( f , "rmdir '%s'" , dir )
2018-05-14 17:06:57 +00:00
if f . opt . TempWritePath != "" {
2018-01-29 22:05:04 +00:00
// pause background uploads
f . backgroundRunner . pause ( )
defer f . backgroundRunner . play ( )
// we check if the source exists on the remote and make the same move on it too if it does
// otherwise, we skip this step
_ , err := f . UnWrap ( ) . List ( dir )
if err == nil {
err := f . Fs . Rmdir ( dir )
if err != nil {
return err
}
fs . Debugf ( dir , "rmdir: removed dir in source fs" )
}
var queuedEntries [ ] * Object
err = walk . Walk ( f . tempFs , dir , true , - 1 , func ( path string , entries fs . DirEntries , err error ) error {
for _ , o := range entries {
if oo , ok := o . ( fs . Object ) ; ok {
co := ObjectFromOriginal ( f , oo )
queuedEntries = append ( queuedEntries , co )
}
}
return nil
} )
if err != nil {
fs . Errorf ( dir , "rmdir: error getting pending uploads: %v" , err )
} else {
fs . Debugf ( dir , "rmdir: read %v from temp fs" , len ( queuedEntries ) )
fs . Debugf ( dir , "rmdir: temp fs entries: %v" , queuedEntries )
if len ( queuedEntries ) > 0 {
2018-03-08 20:03:34 +00:00
fs . Errorf ( dir , "rmdir: temporary dir not empty: %v" , queuedEntries )
2018-01-29 22:05:04 +00:00
return fs . ErrorDirectoryNotEmpty
}
}
} else {
err := f . Fs . Rmdir ( dir )
if err != nil {
return err
}
fs . Debugf ( dir , "rmdir: removed dir in source fs" )
2017-11-12 17:54:25 +00:00
}
2017-12-18 12:55:37 +00:00
// remove dir data
d := NewDirectory ( f , dir )
2018-01-29 22:05:04 +00:00
err := f . cache . RemoveDir ( d . abs ( ) )
2017-12-18 12:55:37 +00:00
if err != nil {
fs . Errorf ( dir , "rmdir: remove error: %v" , err )
2018-01-29 22:05:04 +00:00
} else {
fs . Debugf ( d , "rmdir: removed from cache" )
2017-12-18 12:55:37 +00:00
}
// expire parent
parentCd := NewDirectory ( f , cleanPath ( path . Dir ( dir ) ) )
err = f . cache . ExpireDir ( parentCd )
if err != nil {
2018-01-29 22:05:04 +00:00
fs . Errorf ( dir , "rmdir: cache expire error: %v" , err )
} else {
fs . Infof ( parentCd , "rmdir: cache expired" )
2017-12-18 12:55:37 +00:00
}
2018-03-08 20:03:34 +00:00
// advertise to ChangeNotify if wrapped doesn't do that
f . notifyChangeUpstreamIfNeeded ( parentCd . Remote ( ) , fs . EntryDirectory )
2017-11-12 17:54:25 +00:00
return nil
}
// DirMove moves src, srcRemote to this remote at dstRemote
// using server side move operations.
func ( f * Fs ) DirMove ( src fs . Fs , srcRemote , dstRemote string ) error {
2018-01-29 22:05:04 +00:00
fs . Debugf ( f , "move dir '%s'/'%s' -> '%s'/'%s'" , src . Root ( ) , srcRemote , f . Root ( ) , dstRemote )
2017-11-12 17:54:25 +00:00
do := f . Fs . Features ( ) . DirMove
if do == nil {
return fs . ErrorCantDirMove
}
srcFs , ok := src . ( * Fs )
if ! ok {
fs . Errorf ( srcFs , "can't move directory - not same remote type" )
return fs . ErrorCantDirMove
}
if srcFs . Fs . Name ( ) != f . Fs . Name ( ) {
fs . Errorf ( srcFs , "can't move directory - not wrapping same remotes" )
return fs . ErrorCantDirMove
}
2018-05-14 17:06:57 +00:00
if f . opt . TempWritePath != "" {
2018-01-29 22:05:04 +00:00
// pause background uploads
f . backgroundRunner . pause ( )
defer f . backgroundRunner . play ( )
2018-02-14 21:47:45 +00:00
_ , errInWrap := srcFs . UnWrap ( ) . List ( srcRemote )
_ , errInTemp := f . tempFs . List ( srcRemote )
// not found in either fs
if errInWrap != nil && errInTemp != nil {
return fs . ErrorDirNotFound
}
2018-01-29 22:05:04 +00:00
// we check if the source exists on the remote and make the same move on it too if it does
// otherwise, we skip this step
2018-02-14 21:47:45 +00:00
if errInWrap == nil {
2018-01-29 22:05:04 +00:00
err := do ( srcFs . UnWrap ( ) , srcRemote , dstRemote )
if err != nil {
return err
}
fs . Debugf ( srcRemote , "movedir: dir moved in the source fs" )
}
2018-02-14 21:47:45 +00:00
// we need to check if the directory exists in the temp fs
// and skip the move if it doesn't
if errInTemp != nil {
goto cleanup
}
2018-01-29 22:05:04 +00:00
var queuedEntries [ ] * Object
2018-02-14 21:47:45 +00:00
err := walk . Walk ( f . tempFs , srcRemote , true , - 1 , func ( path string , entries fs . DirEntries , err error ) error {
2018-01-29 22:05:04 +00:00
for _ , o := range entries {
if oo , ok := o . ( fs . Object ) ; ok {
co := ObjectFromOriginal ( f , oo )
queuedEntries = append ( queuedEntries , co )
if co . tempFileStartedUpload ( ) {
fs . Errorf ( co , "can't move - upload has already started. need to finish that" )
return fs . ErrorCantDirMove
}
}
}
return nil
} )
if err != nil {
return err
}
fs . Debugf ( srcRemote , "dirmove: read %v from temp fs" , len ( queuedEntries ) )
fs . Debugf ( srcRemote , "dirmove: temp fs entries: %v" , queuedEntries )
do := f . tempFs . Features ( ) . DirMove
if do == nil {
fs . Errorf ( srcRemote , "dirmove: can't move dir in temp fs" )
return fs . ErrorCantDirMove
}
err = do ( f . tempFs , srcRemote , dstRemote )
if err != nil {
return err
}
err = f . cache . ReconcileTempUploads ( f )
if err != nil {
return err
}
} else {
err := do ( srcFs . UnWrap ( ) , srcRemote , dstRemote )
if err != nil {
return err
}
fs . Debugf ( srcRemote , "movedir: dir moved in the source fs" )
2017-11-12 17:54:25 +00:00
}
2018-02-14 21:47:45 +00:00
cleanup :
2017-11-12 17:54:25 +00:00
2017-12-18 12:55:37 +00:00
// delete src dir from cache along with all chunks
2017-11-12 17:54:25 +00:00
srcDir := NewDirectory ( srcFs , srcRemote )
2018-01-29 22:05:04 +00:00
err := f . cache . RemoveDir ( srcDir . abs ( ) )
2017-12-18 12:55:37 +00:00
if err != nil {
2018-01-29 22:05:04 +00:00
fs . Errorf ( srcDir , "dirmove: remove error: %v" , err )
} else {
fs . Debugf ( srcDir , "dirmove: removed cached dir" )
2017-12-18 12:55:37 +00:00
}
// expire src parent
srcParent := NewDirectory ( f , cleanPath ( path . Dir ( srcRemote ) ) )
err = f . cache . ExpireDir ( srcParent )
if err != nil {
2018-01-29 22:05:04 +00:00
fs . Errorf ( srcParent , "dirmove: cache expire error: %v" , err )
} else {
fs . Debugf ( srcParent , "dirmove: cache expired" )
2017-12-18 12:55:37 +00:00
}
2018-03-08 20:03:34 +00:00
// advertise to ChangeNotify if wrapped doesn't do that
f . notifyChangeUpstreamIfNeeded ( srcParent . Remote ( ) , fs . EntryDirectory )
2017-12-18 12:55:37 +00:00
// expire parent dir at the destination path
dstParent := NewDirectory ( f , cleanPath ( path . Dir ( dstRemote ) ) )
err = f . cache . ExpireDir ( dstParent )
if err != nil {
2018-01-29 22:05:04 +00:00
fs . Errorf ( dstParent , "dirmove: cache expire error: %v" , err )
} else {
fs . Debugf ( dstParent , "dirmove: cache expired" )
2017-12-18 12:55:37 +00:00
}
2018-03-08 20:03:34 +00:00
// advertise to ChangeNotify if wrapped doesn't do that
f . notifyChangeUpstreamIfNeeded ( dstParent . Remote ( ) , fs . EntryDirectory )
2017-12-18 12:55:37 +00:00
// TODO: precache dst dir and save the chunks
2017-11-12 17:54:25 +00:00
return nil
}
// cacheReader will split the stream of a reader to be cached at the same time it is read by the original source
func ( f * Fs ) cacheReader ( u io . Reader , src fs . ObjectInfo , originalRead func ( inn io . Reader ) ) {
// create the pipe and tee reader
pr , pw := io . Pipe ( )
tr := io . TeeReader ( u , pw )
// create channel to synchronize
done := make ( chan bool )
defer close ( done )
go func ( ) {
// notify the cache reader that we're complete after the source FS finishes
defer func ( ) {
_ = pw . Close ( )
} ( )
// process original reading
originalRead ( tr )
// signal complete
done <- true
} ( )
go func ( ) {
var offset int64
for {
2018-05-14 17:06:57 +00:00
chunk := make ( [ ] byte , f . opt . ChunkSize )
2017-11-12 17:54:25 +00:00
readSize , err := io . ReadFull ( pr , chunk )
// we ignore 3 failures which are ok:
// 1. EOF - original reading finished and we got a full buffer too
// 2. ErrUnexpectedEOF - original reading finished and partial buffer
// 3. ErrClosedPipe - source remote reader was closed (usually means it reached the end) and we need to stop too
// if we have a different error: we're going to error out the original reading too and stop this
if err != nil && err != io . EOF && err != io . ErrUnexpectedEOF && err != io . ErrClosedPipe {
fs . Errorf ( src , "error saving new data in cache. offset: %v, err: %v" , offset , err )
_ = pr . CloseWithError ( err )
break
}
// if we have some bytes we cache them
if readSize > 0 {
chunk = chunk [ : readSize ]
2017-12-09 21:54:26 +00:00
err2 := f . cache . AddChunk ( cleanPath ( path . Join ( f . root , src . Remote ( ) ) ) , chunk , offset )
2017-11-12 17:54:25 +00:00
if err2 != nil {
fs . Errorf ( src , "error saving new data in cache '%v'" , err2 )
_ = pr . CloseWithError ( err2 )
break
}
offset += int64 ( readSize )
}
// stuff should be closed but let's be sure
if err == io . EOF || err == io . ErrUnexpectedEOF || err == io . ErrClosedPipe {
_ = pr . Close ( )
break
}
}
// signal complete
done <- true
} ( )
// wait until both are done
for c := 0 ; c < 2 ; c ++ {
<- done
}
}
2017-11-30 19:16:45 +00:00
// putFn is the signature of the upload function that put delegates to when
// writing to the wrapped fs; the wrapped fs Put, PutUnchecked and PutStream
// implementations are passed in through it.
type putFn func ( in io . Reader , src fs . ObjectInfo , options ... fs . OpenOption ) ( fs . Object , error )
2017-11-12 17:54:25 +00:00
2017-11-30 19:16:45 +00:00
// put in to the remote path
func ( f * Fs ) put ( in io . Reader , src fs . ObjectInfo , options [ ] fs . OpenOption , put putFn ) ( fs . Object , error ) {
2017-11-12 17:54:25 +00:00
var err error
var obj fs . Object
2018-01-29 22:05:04 +00:00
// queue for upload and store in temp fs if configured
2018-05-14 17:06:57 +00:00
if f . opt . TempWritePath != "" {
2018-03-08 20:03:34 +00:00
// we need to clear the caches before a put through temp fs
parentCd := NewDirectory ( f , cleanPath ( path . Dir ( src . Remote ( ) ) ) )
_ = f . cache . ExpireDir ( parentCd )
f . notifyChangeUpstreamIfNeeded ( parentCd . Remote ( ) , fs . EntryDirectory )
2018-01-29 22:05:04 +00:00
obj , err = f . tempFs . Put ( in , src , options ... )
if err != nil {
fs . Errorf ( obj , "put: failed to upload in temp fs: %v" , err )
return nil , err
}
fs . Infof ( obj , "put: uploaded in temp fs" )
err = f . cache . addPendingUpload ( path . Join ( f . Root ( ) , src . Remote ( ) ) , false )
if err != nil {
fs . Errorf ( obj , "put: failed to queue for upload: %v" , err )
return nil , err
}
fs . Infof ( obj , "put: queued for upload" )
// if cache writes is enabled write it first through cache
2018-05-14 17:06:57 +00:00
} else if f . opt . StoreWrites {
2017-11-12 17:54:25 +00:00
f . cacheReader ( in , src , func ( inn io . Reader ) {
2017-11-30 19:16:45 +00:00
obj , err = put ( inn , src , options ... )
2017-11-12 17:54:25 +00:00
} )
2018-01-29 22:05:04 +00:00
if err == nil {
fs . Debugf ( obj , "put: uploaded to remote fs and saved in cache" )
}
// last option: save it directly in remote fs
2017-11-12 17:54:25 +00:00
} else {
2017-11-30 19:16:45 +00:00
obj , err = put ( in , src , options ... )
2018-01-29 22:05:04 +00:00
if err == nil {
fs . Debugf ( obj , "put: uploaded to remote fs" )
}
2017-11-12 17:54:25 +00:00
}
2018-01-29 22:05:04 +00:00
// validate and stop if errors are found
2017-11-12 17:54:25 +00:00
if err != nil {
2018-01-29 22:05:04 +00:00
fs . Errorf ( src , "put: error uploading: %v" , err )
2017-11-12 17:54:25 +00:00
return nil , err
}
2018-01-29 22:05:04 +00:00
// cache the new file
2018-06-08 20:33:05 +00:00
cachedObj := ObjectFromOriginal ( f , obj )
// deleting cached chunks and info to be replaced with new ones
_ = f . cache . RemoveObject ( cachedObj . abs ( ) )
cachedObj . persist ( )
2018-01-29 22:05:04 +00:00
fs . Debugf ( cachedObj , "put: added to cache" )
2018-06-08 20:33:05 +00:00
2017-12-18 12:55:37 +00:00
// expire parent
2018-01-29 22:05:04 +00:00
parentCd := NewDirectory ( f , cleanPath ( path . Dir ( cachedObj . Remote ( ) ) ) )
err = f . cache . ExpireDir ( parentCd )
2017-12-18 12:55:37 +00:00
if err != nil {
2018-01-29 22:05:04 +00:00
fs . Errorf ( cachedObj , "put: cache expire error: %v" , err )
} else {
fs . Infof ( parentCd , "put: cache expired" )
2017-12-18 12:55:37 +00:00
}
2018-03-08 20:03:34 +00:00
// advertise to ChangeNotify
f . notifyChangeUpstreamIfNeeded ( parentCd . Remote ( ) , fs . EntryDirectory )
2017-11-12 17:54:25 +00:00
return cachedObj , nil
}
2017-11-30 19:16:45 +00:00
// Put in to the remote path with the modTime given of the given size
func ( f * Fs ) Put ( in io . Reader , src fs . ObjectInfo , options ... fs . OpenOption ) ( fs . Object , error ) {
2018-01-29 22:05:04 +00:00
fs . Debugf ( f , "put data at '%s'" , src . Remote ( ) )
2017-11-30 19:16:45 +00:00
return f . put ( in , src , options , f . Fs . Put )
}
2017-11-12 17:54:25 +00:00
// PutUnchecked uploads the object
func ( f * Fs ) PutUnchecked ( in io . Reader , src fs . ObjectInfo , options ... fs . OpenOption ) ( fs . Object , error ) {
do := f . Fs . Features ( ) . PutUnchecked
if do == nil {
return nil , errors . New ( "can't PutUnchecked" )
}
2018-01-29 22:05:04 +00:00
fs . Debugf ( f , "put data unchecked in '%s'" , src . Remote ( ) )
2017-11-30 19:16:45 +00:00
return f . put ( in , src , options , do )
}
2017-11-12 17:54:25 +00:00
2017-11-30 19:16:45 +00:00
// PutStream uploads the object
func ( f * Fs ) PutStream ( in io . Reader , src fs . ObjectInfo , options ... fs . OpenOption ) ( fs . Object , error ) {
do := f . Fs . Features ( ) . PutStream
if do == nil {
return nil , errors . New ( "can't PutStream" )
2017-11-12 17:54:25 +00:00
}
2018-01-29 22:05:04 +00:00
fs . Debugf ( f , "put data streaming in '%s'" , src . Remote ( ) )
2017-11-30 19:16:45 +00:00
return f . put ( in , src , options , do )
2017-11-12 17:54:25 +00:00
}
// Copy src to this remote using server side copy operations.
func ( f * Fs ) Copy ( src fs . Object , remote string ) ( fs . Object , error ) {
2018-01-29 22:05:04 +00:00
fs . Debugf ( f , "copy obj '%s' -> '%s'" , src , remote )
2017-11-12 17:54:25 +00:00
do := f . Fs . Features ( ) . Copy
if do == nil {
2017-11-21 22:38:25 +00:00
fs . Errorf ( src , "source remote (%v) doesn't support Copy" , src . Fs ( ) )
2017-11-12 17:54:25 +00:00
return nil , fs . ErrorCantCopy
}
2018-01-29 22:05:04 +00:00
// the source must be a cached object or we abort
2017-11-12 17:54:25 +00:00
srcObj , ok := src . ( * Object )
if ! ok {
fs . Errorf ( srcObj , "can't copy - not same remote type" )
return nil , fs . ErrorCantCopy
}
2018-01-29 22:05:04 +00:00
// both the source cache fs and this cache fs need to wrap the same remote
2017-11-12 17:54:25 +00:00
if srcObj . CacheFs . Fs . Name ( ) != f . Fs . Name ( ) {
2018-01-29 22:05:04 +00:00
fs . Errorf ( srcObj , "can't copy - not wrapping same remotes" )
2017-11-12 17:54:25 +00:00
return nil , fs . ErrorCantCopy
}
2018-01-29 22:05:04 +00:00
// refresh from source or abort
if err := srcObj . refreshFromSource ( false ) ; err != nil {
fs . Errorf ( f , "can't copy %v - %v" , src , err )
2017-11-12 17:54:25 +00:00
return nil , fs . ErrorCantCopy
}
2018-01-29 22:05:04 +00:00
if srcObj . isTempFile ( ) {
// we check if the feature is stil active
2018-05-14 17:06:57 +00:00
if f . opt . TempWritePath == "" {
2018-01-29 22:05:04 +00:00
fs . Errorf ( srcObj , "can't copy - this is a local cached file but this feature is turned off this run" )
return nil , fs . ErrorCantCopy
}
do = srcObj . ParentFs . Features ( ) . Copy
if do == nil {
fs . Errorf ( src , "parent remote (%v) doesn't support Copy" , srcObj . ParentFs )
return nil , fs . ErrorCantCopy
}
}
2017-11-12 17:54:25 +00:00
obj , err := do ( srcObj . Object , remote )
if err != nil {
fs . Errorf ( srcObj , "error moving in cache: %v" , err )
return nil , err
}
2018-01-29 22:05:04 +00:00
fs . Debugf ( obj , "copy: file copied" )
2017-11-12 17:54:25 +00:00
// persist new
2017-12-18 12:55:37 +00:00
co := ObjectFromOriginal ( f , obj ) . persist ( )
2018-01-29 22:05:04 +00:00
fs . Debugf ( co , "copy: added to cache" )
2017-12-18 12:55:37 +00:00
// expire the destination path
2018-01-29 22:05:04 +00:00
parentCd := NewDirectory ( f , cleanPath ( path . Dir ( co . Remote ( ) ) ) )
err = f . cache . ExpireDir ( parentCd )
2017-12-18 12:55:37 +00:00
if err != nil {
2018-01-29 22:05:04 +00:00
fs . Errorf ( parentCd , "copy: cache expire error: %v" , err )
} else {
fs . Infof ( parentCd , "copy: cache expired" )
2017-12-18 12:55:37 +00:00
}
2018-03-08 20:03:34 +00:00
// advertise to ChangeNotify if wrapped doesn't do that
f . notifyChangeUpstreamIfNeeded ( parentCd . Remote ( ) , fs . EntryDirectory )
2017-12-18 12:55:37 +00:00
// expire src parent
srcParent := NewDirectory ( f , cleanPath ( path . Dir ( src . Remote ( ) ) ) )
err = f . cache . ExpireDir ( srcParent )
if err != nil {
2018-01-29 22:05:04 +00:00
fs . Errorf ( srcParent , "copy: cache expire error: %v" , err )
} else {
fs . Infof ( srcParent , "copy: cache expired" )
2017-12-18 12:55:37 +00:00
}
2018-03-08 20:03:34 +00:00
// advertise to ChangeNotify if wrapped doesn't do that
f . notifyChangeUpstreamIfNeeded ( srcParent . Remote ( ) , fs . EntryDirectory )
2017-11-12 17:54:25 +00:00
2017-12-18 12:55:37 +00:00
return co , nil
2017-11-12 17:54:25 +00:00
}
// Move src to this remote using server side move operations.
func ( f * Fs ) Move ( src fs . Object , remote string ) ( fs . Object , error ) {
2018-01-29 22:05:04 +00:00
fs . Debugf ( f , "moving obj '%s' -> %s" , src , remote )
// if source fs doesn't support move abort
2017-11-12 17:54:25 +00:00
do := f . Fs . Features ( ) . Move
if do == nil {
2017-11-21 22:38:25 +00:00
fs . Errorf ( src , "source remote (%v) doesn't support Move" , src . Fs ( ) )
2017-11-12 17:54:25 +00:00
return nil , fs . ErrorCantMove
}
2018-01-29 22:05:04 +00:00
// the source must be a cached object or we abort
2017-11-12 17:54:25 +00:00
srcObj , ok := src . ( * Object )
if ! ok {
fs . Errorf ( srcObj , "can't move - not same remote type" )
return nil , fs . ErrorCantMove
}
2018-01-29 22:05:04 +00:00
// both the source cache fs and this cache fs need to wrap the same remote
2017-11-12 17:54:25 +00:00
if srcObj . CacheFs . Fs . Name ( ) != f . Fs . Name ( ) {
fs . Errorf ( srcObj , "can't move - not wrapping same remote types" )
return nil , fs . ErrorCantMove
}
2018-01-29 22:05:04 +00:00
// refresh from source or abort
if err := srcObj . refreshFromSource ( false ) ; err != nil {
2017-11-12 17:54:25 +00:00
fs . Errorf ( f , "can't move %v - %v" , src , err )
return nil , fs . ErrorCantMove
}
2018-01-29 22:05:04 +00:00
// if this is a temp object then we perform the changes locally
if srcObj . isTempFile ( ) {
// we check if the feature is stil active
2018-05-14 17:06:57 +00:00
if f . opt . TempWritePath == "" {
2018-01-29 22:05:04 +00:00
fs . Errorf ( srcObj , "can't move - this is a local cached file but this feature is turned off this run" )
return nil , fs . ErrorCantMove
}
// pause background uploads
f . backgroundRunner . pause ( )
defer f . backgroundRunner . play ( )
// started uploads can't be moved until they complete
if srcObj . tempFileStartedUpload ( ) {
fs . Errorf ( srcObj , "can't move - upload has already started. need to finish that" )
return nil , fs . ErrorCantMove
}
do = f . tempFs . Features ( ) . Move
// we must also update the pending queue
err := f . cache . updatePendingUpload ( srcObj . abs ( ) , func ( item * tempUploadInfo ) error {
item . DestPath = path . Join ( f . Root ( ) , remote )
item . AddedOn = time . Now ( )
return nil
} )
if err != nil {
fs . Errorf ( srcObj , "failed to rename queued file for upload: %v" , err )
return nil , fs . ErrorCantMove
}
fs . Debugf ( srcObj , "move: queued file moved to %v" , remote )
}
2017-11-12 17:54:25 +00:00
obj , err := do ( srcObj . Object , remote )
if err != nil {
2018-01-29 22:05:04 +00:00
fs . Errorf ( srcObj , "error moving: %v" , err )
2017-11-12 17:54:25 +00:00
return nil , err
}
2018-01-29 22:05:04 +00:00
fs . Debugf ( obj , "move: file moved" )
2017-11-12 17:54:25 +00:00
// remove old
2017-12-18 12:55:37 +00:00
err = f . cache . RemoveObject ( srcObj . abs ( ) )
if err != nil {
fs . Errorf ( srcObj , "move: remove error: %v" , err )
2018-01-29 22:05:04 +00:00
} else {
fs . Debugf ( srcObj , "move: removed from cache" )
2017-12-18 12:55:37 +00:00
}
// expire old parent
2018-01-29 22:05:04 +00:00
parentCd := NewDirectory ( f , cleanPath ( path . Dir ( srcObj . Remote ( ) ) ) )
err = f . cache . ExpireDir ( parentCd )
2017-12-18 12:55:37 +00:00
if err != nil {
2018-01-29 22:05:04 +00:00
fs . Errorf ( parentCd , "move: parent cache expire error: %v" , err )
} else {
fs . Infof ( parentCd , "move: cache expired" )
2017-12-18 12:55:37 +00:00
}
2018-03-08 20:03:34 +00:00
// advertise to ChangeNotify if wrapped doesn't do that
f . notifyChangeUpstreamIfNeeded ( parentCd . Remote ( ) , fs . EntryDirectory )
2017-11-12 17:54:25 +00:00
// persist new
2017-12-18 12:55:37 +00:00
cachedObj := ObjectFromOriginal ( f , obj ) . persist ( )
2018-01-29 22:05:04 +00:00
fs . Debugf ( cachedObj , "move: added to cache" )
2017-12-18 12:55:37 +00:00
// expire new parent
2018-01-29 22:05:04 +00:00
parentCd = NewDirectory ( f , cleanPath ( path . Dir ( cachedObj . Remote ( ) ) ) )
err = f . cache . ExpireDir ( parentCd )
2017-12-18 12:55:37 +00:00
if err != nil {
2018-01-29 22:05:04 +00:00
fs . Errorf ( parentCd , "move: expire error: %v" , err )
} else {
fs . Infof ( parentCd , "move: cache expired" )
2017-12-18 12:55:37 +00:00
}
2018-03-08 20:03:34 +00:00
// advertise to ChangeNotify if wrapped doesn't do that
f . notifyChangeUpstreamIfNeeded ( parentCd . Remote ( ) , fs . EntryDirectory )
2017-11-12 17:54:25 +00:00
return cachedObj , nil
}
// Hashes returns the supported hash sets.
2018-01-12 16:30:54 +00:00
func ( f * Fs ) Hashes ( ) hash . Set {
2017-11-12 17:54:25 +00:00
return f . Fs . Hashes ( )
}
// Purge all files in the root and the root directory
//
// The cache is purged first; the wrapped fs is purged afterwards if it
// offers a Purge feature, and only that step can return an error.
func (f *Fs) Purge() error {
	fs.Infof(f, "purging cache")
	f.cache.Purge()

	if purge := f.Fs.Features().Purge; purge != nil {
		return purge()
	}
	return nil
}
// CleanUp the trash in the Fs
//
// The cache gets a regular (interval respecting) cleanup first, then the
// wrapped fs CleanUp feature is invoked if there is one.
func (f *Fs) CleanUp() error {
	f.CleanUpCache(false)

	if cleanUp := f.Fs.Features().CleanUp; cleanUp != nil {
		return cleanUp()
	}
	return nil
}
2018-02-09 20:48:32 +00:00
// About gets quota information from the Fs
2018-04-16 21:19:25 +00:00
func ( f * Fs ) About ( ) ( * fs . Usage , error ) {
2018-02-09 20:48:32 +00:00
do := f . Fs . Features ( ) . About
if do == nil {
2018-04-16 21:19:25 +00:00
return nil , errors . New ( "About not supported" )
2018-02-09 20:48:32 +00:00
}
return do ( )
}
2017-11-12 17:54:25 +00:00
// Stats returns stats about the cache storage
//
// The result is whatever the cache storage reports, passed through unchanged.
func (f *Fs) Stats() (map[string]map[string]interface{}, error) {
	stats, err := f.cache.Stats()
	return stats, err
}
2018-01-29 22:05:04 +00:00
// openRateLimited will execute a closure under a rate limiter watch
func ( f * Fs ) openRateLimited ( fn func ( ) ( io . ReadCloser , error ) ) ( io . ReadCloser , error ) {
2017-11-12 17:54:25 +00:00
var err error
ctx , cancel := context . WithTimeout ( context . Background ( ) , time . Second * 10 )
defer cancel ( )
start := time . Now ( )
if err = f . rateLimiter . Wait ( ctx ) ; err != nil {
return nil , err
}
elapsed := time . Since ( start )
if elapsed > time . Second * 2 {
fs . Debugf ( f , "rate limited: %s" , elapsed )
}
return fn ( )
}
// CleanUpCache will cleanup only the cache data that is expired
func ( f * Fs ) CleanUpCache ( ignoreLastTs bool ) {
f . cleanupMu . Lock ( )
defer f . cleanupMu . Unlock ( )
2018-05-14 17:06:57 +00:00
if ignoreLastTs || time . Now ( ) . After ( f . lastChunkCleanup . Add ( time . Duration ( f . opt . ChunkCleanInterval ) ) ) {
f . cache . CleanChunksBySize ( int64 ( f . opt . ChunkTotalSize ) )
2017-11-12 17:54:25 +00:00
f . lastChunkCleanup = time . Now ( )
}
}
2018-01-29 22:05:04 +00:00
// StopBackgroundRunners will signall all the runners to stop their work
// can be triggered from a terminate signal or from testing between runs
func ( f * Fs ) StopBackgroundRunners ( ) {
f . cleanupChan <- false
2018-05-14 17:06:57 +00:00
if f . opt . TempWritePath != "" && f . backgroundRunner != nil && f . backgroundRunner . isRunning ( ) {
2018-01-29 22:05:04 +00:00
f . backgroundRunner . close ( )
}
f . cache . Close ( )
fs . Debugf ( f , "Services stopped" )
}
2017-11-12 17:54:25 +00:00
// UnWrap returns the underlying Fs wrapped by this cache Fs.
func (f *Fs) UnWrap() fs.Fs {
	wrapped := f.Fs
	return wrapped
}
2017-12-06 15:14:34 +00:00
// WrapFs returns the Fs that wraps this one, as recorded by SetWrapper.
func (f *Fs) WrapFs() fs.Fs {
	outer := f.wrapper
	return outer
}
// SetWrapper records wrapper as the Fs that wraps this one; it is the value
// later returned by WrapFs.
func (f *Fs) SetWrapper(wrapper fs.Fs) {
	f.wrapper = wrapper
}
2018-01-29 22:05:04 +00:00
// isWrappedByCrypt checks if this is wrapped by a crypt remote
2017-12-09 21:54:26 +00:00
func ( f * Fs ) isWrappedByCrypt ( ) ( * crypt . Fs , bool ) {
if f . wrapper == nil {
return nil , false
}
c , ok := f . wrapper . ( * crypt . Fs )
return c , ok
}
2018-01-29 22:05:04 +00:00
// cleanRootFromPath trims the root of the current fs from a path.
//
// When the root is empty, p is returned unchanged. Otherwise the first
// len(root) bytes of p are dropped, followed by the one-byte separator if
// anything remains, so a p equal to the root yields "".
//
// A p shorter than the root cannot contain it and is returned unchanged
// instead of triggering a slice-bounds panic.
func (f *Fs) cleanRootFromPath(p string) string {
	root := f.Root()
	if root == "" {
		return p
	}
	if len(p) < len(root) {
		// nothing to trim: slicing here would be out of range
		return p
	}

	p = p[len(root):] // trim out root
	if len(p) > 0 {   // remove first separator
		p = p[1:]
	}
	return p
}
// isRootInPath reports whether p lies below the root of the current fs,
// i.e. whether it starts with the root followed by "/".
// An empty root contains every path.
func (f *Fs) isRootInPath(p string) bool {
	root := f.Root()
	return root == "" || strings.HasPrefix(p, root+"/")
}
2017-11-12 17:54:25 +00:00
// DirCacheFlush flushes the dir cache by removing the root directory ("")
// from the cache store.
func (f *Fs) DirCacheFlush() {
	// This method has no error return, so the outcome of the removal is
	// deliberately discarded.
	_ = f.cache.RemoveDir("")
}
2018-01-29 22:05:04 +00:00
// GetBackgroundUploadChannel returns a channel that can be listened to for remote activities that happen
// in the background
func ( f * Fs ) GetBackgroundUploadChannel ( ) chan BackgroundUploadState {
2018-05-14 17:06:57 +00:00
if f . opt . TempWritePath != "" {
2018-01-29 22:05:04 +00:00
return f . backgroundRunner . notifyCh
}
return nil
}
2018-03-08 20:03:34 +00:00
// isNotifiedRemote reports whether remote is currently flagged in
// notifiedRemotes. A flagged remote is removed from the map as it is
// reported, so an immediately repeated call returns false. Entries stored as
// false are left in place. Access to the map is guarded by notifiedMu.
func (f *Fs) isNotifiedRemote(remote string) bool {
	f.notifiedMu.Lock()
	defer f.notifiedMu.Unlock()

	// a missing key reads as false, just like an entry stored as false
	if !f.notifiedRemotes[remote] {
		return false
	}
	delete(f.notifiedRemotes, remote)
	return true
}
2017-11-12 17:54:25 +00:00
// cleanPath normalizes p with path.Clean and maps the results "." and "/"
// to the empty string.
func cleanPath(p string) string {
	switch cleaned := path.Clean(p); cleaned {
	case ".", "/":
		return ""
	default:
		return cleaned
	}
}
// Check the interfaces are satisfied
var (
2018-03-08 20:03:34 +00:00
_ fs . Fs = ( * Fs ) ( nil )
_ fs . Purger = ( * Fs ) ( nil )
_ fs . Copier = ( * Fs ) ( nil )
_ fs . Mover = ( * Fs ) ( nil )
_ fs . DirMover = ( * Fs ) ( nil )
_ fs . PutUncheckeder = ( * Fs ) ( nil )
_ fs . PutStreamer = ( * Fs ) ( nil )
_ fs . CleanUpper = ( * Fs ) ( nil )
_ fs . UnWrapper = ( * Fs ) ( nil )
_ fs . Wrapper = ( * Fs ) ( nil )
_ fs . ListRer = ( * Fs ) ( nil )
_ fs . ChangeNotifier = ( * Fs ) ( nil )
2018-02-09 20:48:32 +00:00
_ fs . Abouter = ( * Fs ) ( nil )
2017-11-12 17:54:25 +00:00
)