rclone/vfs/cache.go
Nick Craig-Wood 84191ac6dc vfs: fix incorrect modtime for mv into mount with --vfs-cache-modes write
When a file has its modtime set while it is open we delay setting the
modtime until the file is closed.

The file is then uploaded in Flush. In Release we check the cached
file has been uploaded by comparing modtimes and or hashes and upload
it again if it has changed.

Before this change we forgot to change the time on the cached file
when we updated the time file on the object, so this mean that Release
reset the time to the wrong time and uploaded the file again on
remotes which don't support hashes (eg crypt).

The fix was to set the modtime of the cached file at the same time we
set the modtime of the remote object. This means that the files check
as identical in Release so it doesn't try to upload the file.

This means that we avoid a double upload and the modtime is correct.

See: https://forum.rclone.org/t/modification-time-with-vfs-cache/13906/8
2020-01-19 12:52:48 +00:00

597 lines
14 KiB
Go

// This deals with caching of files locally
package vfs
import (
"context"
"fmt"
"os"
"path/filepath"
"runtime"
"sort"
"strings"
"sync"
"time"
"github.com/djherbis/times"
"github.com/pkg/errors"
"github.com/rclone/rclone/fs"
fscache "github.com/rclone/rclone/fs/cache"
"github.com/rclone/rclone/fs/config"
)
// CacheMode controls the functionality of the cache
type CacheMode byte
// CacheMode options
const (
CacheModeOff CacheMode = iota // cache nothing - return errors for writes which can't be satisfied
CacheModeMinimal // cache only the minimum, eg read/write opens
CacheModeWrites // cache all files opened with write intent
CacheModeFull // cache all files opened in any mode
)
var cacheModeToString = []string{
CacheModeOff: "off",
CacheModeMinimal: "minimal",
CacheModeWrites: "writes",
CacheModeFull: "full",
}
// String turns a CacheMode into a string
func (l CacheMode) String() string {
if l >= CacheMode(len(cacheModeToString)) {
return fmt.Sprintf("CacheMode(%d)", l)
}
return cacheModeToString[l]
}
// Set a CacheMode
func (l *CacheMode) Set(s string) error {
for n, name := range cacheModeToString {
if s != "" && name == s {
*l = CacheMode(n)
return nil
}
}
return errors.Errorf("Unknown cache mode level %q", s)
}
// Type of the value
func (l *CacheMode) Type() string {
return "CacheMode"
}
// cache opened files
type cache struct {
f fs.Fs // fs for the cache directory
opt *Options // vfs Options
root string // root of the cache directory
itemMu sync.Mutex // protects the following variables
item map[string]*cacheItem // files/directories in the cache
used int64 // total size of files in the cache
}
// cacheItem is stored in the item map
type cacheItem struct {
opens int // number of times file is open
atime time.Time // last time file was accessed
isFile bool // if this is a file or a directory
size int64 // size of the cached item
}
// newCacheItem returns an item for the cache
func newCacheItem(isFile bool) *cacheItem {
return &cacheItem{atime: time.Now(), isFile: isFile}
}
// newCache creates a new cache heirachy for f
//
// This starts background goroutines which can be cancelled with the
// context passed in.
func newCache(ctx context.Context, f fs.Fs, opt *Options) (*cache, error) {
fRoot := filepath.FromSlash(f.Root())
if runtime.GOOS == "windows" {
if strings.HasPrefix(fRoot, `\\?`) {
fRoot = fRoot[3:]
}
fRoot = strings.Replace(fRoot, ":", "", -1)
}
root := filepath.Join(config.CacheDir, "vfs", f.Name(), fRoot)
fs.Debugf(nil, "vfs cache root is %q", root)
f, err := fscache.Get(root)
if err != nil {
return nil, errors.Wrap(err, "failed to create cache remote")
}
c := &cache{
f: f,
opt: opt,
root: root,
item: make(map[string]*cacheItem),
}
go c.cleaner(ctx)
return c, nil
}
// findParent returns the parent directory of name, or "" for the root
func findParent(name string) string {
parent := filepath.Dir(name)
if parent == "." || parent == "/" {
parent = ""
}
return parent
}
// clean returns the cleaned version of name for use in the index map
func clean(name string) string {
name = strings.Trim(name, "/")
name = filepath.Clean(name)
if name == "." || name == "/" {
name = ""
}
return name
}
// toOSPath turns a remote relative name into an OS path in the cache
func (c *cache) toOSPath(name string) string {
return filepath.Join(c.root, filepath.FromSlash(name))
}
// mkdir makes the directory for name in the cache and returns an os
// path for the file
func (c *cache) mkdir(name string) (string, error) {
parent := findParent(name)
leaf := filepath.Base(name)
parentPath := c.toOSPath(parent)
err := os.MkdirAll(parentPath, 0700)
if err != nil {
return "", errors.Wrap(err, "make cache directory failed")
}
c.cacheDir(parent)
return filepath.Join(parentPath, leaf), nil
}
// _get gets name from the cache or creates a new one
//
// It returns the item and found as to whether this item was found in
// the cache (or just created).
//
// name should be a remote path not an osPath
//
// must be called with itemMu held
func (c *cache) _get(isFile bool, name string) (item *cacheItem, found bool) {
item = c.item[name]
found = item != nil
if !found {
item = newCacheItem(isFile)
c.item[name] = item
}
return item, found
}
// opens returns the number of opens that are on the file
//
// name should be a remote path not an osPath
func (c *cache) opens(name string) int {
name = clean(name)
c.itemMu.Lock()
defer c.itemMu.Unlock()
item := c.item[name]
if item == nil {
return 0
}
return item.opens
}
// get gets name from the cache or creates a new one
//
// name should be a remote path not an osPath
func (c *cache) get(name string) *cacheItem {
name = clean(name)
c.itemMu.Lock()
item, _ := c._get(true, name)
c.itemMu.Unlock()
return item
}
// updateStat sets the atime of the name to that passed in if it is
// newer than the existing or there isn't an existing time.
//
// it also sets the size
//
// name should be a remote path not an osPath
func (c *cache) updateStat(name string, when time.Time, size int64) {
name = clean(name)
c.itemMu.Lock()
item, found := c._get(true, name)
if !found || when.Sub(item.atime) > 0 {
fs.Debugf(name, "updateTime: setting atime to %v", when)
item.atime = when
}
item.size = size
c.itemMu.Unlock()
}
// _open marks name as open, must be called with the lock held
//
// name should be a remote path not an osPath
func (c *cache) _open(isFile bool, name string) {
for {
item, _ := c._get(isFile, name)
item.opens++
item.atime = time.Now()
if name == "" {
break
}
isFile = false
name = findParent(name)
}
}
// open marks name as open
//
// name should be a remote path not an osPath
func (c *cache) open(name string) {
name = clean(name)
c.itemMu.Lock()
c._open(true, name)
c.itemMu.Unlock()
}
// cacheDir marks a directory and its parents as being in the cache
//
// name should be a remote path not an osPath
func (c *cache) cacheDir(name string) {
name = clean(name)
c.itemMu.Lock()
defer c.itemMu.Unlock()
for {
item := c.item[name]
if item != nil {
break
}
c.item[name] = newCacheItem(false)
if name == "" {
break
}
name = findParent(name)
}
}
// exists checks to see if the file exists in the cache or not
func (c *cache) exists(name string) bool {
osPath := c.toOSPath(name)
fi, err := os.Stat(osPath)
if err != nil {
return false
}
// checks for non-regular files (e.g. directories, symlinks, devices, etc.)
if !fi.Mode().IsRegular() {
return false
}
return true
}
// renames the file in cache
func (c *cache) rename(name string, newName string) (err error) {
osOldPath := c.toOSPath(name)
osNewPath := c.toOSPath(newName)
sfi, err := os.Stat(osOldPath)
if err != nil {
return errors.Wrapf(err, "Failed to stat source: %s", osOldPath)
}
if !sfi.Mode().IsRegular() {
// cannot copy non-regular files (e.g., directories, symlinks, devices, etc.)
return errors.Errorf("Non-regular source file: %s (%q)", sfi.Name(), sfi.Mode().String())
}
dfi, err := os.Stat(osNewPath)
if err != nil {
if !os.IsNotExist(err) {
return errors.Wrapf(err, "Failed to stat destination: %s", osNewPath)
}
parent := findParent(osNewPath)
err = os.MkdirAll(parent, 0700)
if err != nil {
return errors.Wrapf(err, "Failed to create parent dir: %s", parent)
}
} else {
if !(dfi.Mode().IsRegular()) {
return errors.Errorf("Non-regular destination file: %s (%q)", dfi.Name(), dfi.Mode().String())
}
if os.SameFile(sfi, dfi) {
return nil
}
}
if err = os.Rename(osOldPath, osNewPath); err != nil {
return errors.Wrapf(err, "Failed to rename in cache: %s to %s", osOldPath, osNewPath)
}
// Rename the cache item
c.itemMu.Lock()
if oldItem, ok := c.item[name]; ok {
c.item[newName] = oldItem
delete(c.item, name)
}
c.itemMu.Unlock()
fs.Infof(name, "Renamed in cache")
return nil
}
// _close marks name as closed - must be called with the lock held
func (c *cache) _close(isFile bool, name string) {
for {
item, _ := c._get(isFile, name)
item.opens--
item.atime = time.Now()
if item.opens < 0 {
fs.Errorf(name, "cache: double close")
}
osPath := c.toOSPath(name)
fi, err := os.Stat(osPath)
// Update the size on close
if err == nil && !fi.IsDir() {
item.size = fi.Size()
}
if name == "" {
break
}
isFile = false
name = findParent(name)
}
}
// close marks name as closed
//
// name should be a remote path not an osPath
func (c *cache) close(name string) {
name = clean(name)
c.itemMu.Lock()
c._close(true, name)
c.itemMu.Unlock()
}
// remove should be called if name is deleted
func (c *cache) remove(name string) {
osPath := c.toOSPath(name)
err := os.Remove(osPath)
if err != nil && !os.IsNotExist(err) {
fs.Errorf(name, "Failed to remove from cache: %v", err)
} else {
fs.Infof(name, "Removed from cache")
}
}
// removeDir should be called if dir is deleted and returns true if
// the directory is gone.
func (c *cache) removeDir(dir string) bool {
osPath := c.toOSPath(dir)
err := os.Remove(osPath)
if err == nil || os.IsNotExist(err) {
if err == nil {
fs.Debugf(dir, "Removed empty directory")
}
return true
}
if !os.IsExist(err) {
fs.Errorf(dir, "Failed to remove cached dir: %v", err)
}
return false
}
// setModTime should be called to set the modification time of the cache file
func (c *cache) setModTime(name string, modTime time.Time) {
osPath := c.toOSPath(name)
err := os.Chtimes(osPath, modTime, modTime)
if err != nil {
fs.Errorf(name, "Failed to set modification time of cached file: %v", err)
}
}
// cleanUp empties the cache of everything
func (c *cache) cleanUp() error {
return os.RemoveAll(c.root)
}
// walk walks the cache calling the function
func (c *cache) walk(fn func(osPath string, fi os.FileInfo, name string) error) error {
return filepath.Walk(c.root, func(osPath string, fi os.FileInfo, err error) error {
if err != nil {
return err
}
// Find path relative to the cache root
name, err := filepath.Rel(c.root, osPath)
if err != nil {
return errors.Wrap(err, "filepath.Rel failed in walk")
}
if name == "." {
name = ""
}
// And convert into slashes
name = filepath.ToSlash(name)
return fn(osPath, fi, name)
})
}
// updateStats walks the cache updating any atimes and sizes it finds
//
// it also updates used
func (c *cache) updateStats() error {
var newUsed int64
err := c.walk(func(osPath string, fi os.FileInfo, name string) error {
if !fi.IsDir() {
// Update the atime with that of the file
atime := times.Get(fi).AccessTime()
c.updateStat(name, atime, fi.Size())
newUsed += fi.Size()
} else {
c.cacheDir(name)
}
return nil
})
c.itemMu.Lock()
c.used = newUsed
c.itemMu.Unlock()
return err
}
// purgeOld gets rid of any files that are over age
func (c *cache) purgeOld(maxAge time.Duration) {
c._purgeOld(maxAge, c.remove)
}
func (c *cache) _purgeOld(maxAge time.Duration, remove func(name string)) {
c.itemMu.Lock()
defer c.itemMu.Unlock()
cutoff := time.Now().Add(-maxAge)
for name, item := range c.item {
if item.isFile && item.opens == 0 {
// If not locked and access time too long ago - delete the file
dt := item.atime.Sub(cutoff)
// fs.Debugf(name, "atime=%v cutoff=%v, dt=%v", item.atime, cutoff, dt)
if dt < 0 {
remove(name)
// Remove the entry
delete(c.item, name)
}
}
}
}
// Purge any empty directories
func (c *cache) purgeEmptyDirs() {
c._purgeEmptyDirs(c.removeDir)
}
func (c *cache) _purgeEmptyDirs(removeDir func(name string) bool) {
c.itemMu.Lock()
defer c.itemMu.Unlock()
var dirs []string
for name, item := range c.item {
if !item.isFile && item.opens == 0 {
dirs = append(dirs, name)
}
}
// remove empty directories in reverse alphabetical order
sort.Strings(dirs)
for i := len(dirs) - 1; i >= 0; i-- {
dir := dirs[i]
// Remove the entry
if removeDir(dir) {
delete(c.item, dir)
}
}
}
// This is a cacheItem with a name for sorting
type cacheNamedItem struct {
name string
item *cacheItem
}
type cacheNamedItems []cacheNamedItem
func (v cacheNamedItems) Len() int { return len(v) }
func (v cacheNamedItems) Swap(i, j int) { v[i], v[j] = v[j], v[i] }
func (v cacheNamedItems) Less(i, j int) bool { return v[i].item.atime.Before(v[j].item.atime) }
// Remove any files that are over quota starting from the
// oldest first
func (c *cache) purgeOverQuota(quota int64) {
c._purgeOverQuota(quota, c.remove)
}
func (c *cache) _purgeOverQuota(quota int64, remove func(name string)) {
c.itemMu.Lock()
defer c.itemMu.Unlock()
if quota <= 0 || c.used < quota {
return
}
var items cacheNamedItems
// Make a slice of unused files
for name, item := range c.item {
if item.isFile && item.opens == 0 {
items = append(items, cacheNamedItem{
name: name,
item: item,
})
}
}
sort.Sort(items)
// Remove items until the quota is OK
for _, item := range items {
if c.used < quota {
break
}
remove(item.name)
// Remove the entry
delete(c.item, item.name)
c.used -= item.item.size
}
}
// clean empties the cache of stuff if it can
func (c *cache) clean() {
// Cache may be empty so end
_, err := os.Stat(c.root)
if os.IsNotExist(err) {
return
}
c.itemMu.Lock()
oldItems, oldUsed := len(c.item), fs.SizeSuffix(c.used)
c.itemMu.Unlock()
// first walk the FS to update the atimes and sizes
err = c.updateStats()
if err != nil {
fs.Errorf(nil, "Error traversing cache %q: %v", c.root, err)
}
// Remove any files that are over age
c.purgeOld(c.opt.CacheMaxAge)
// Now remove any files that are over quota starting from the
// oldest first
c.purgeOverQuota(int64(c.opt.CacheMaxSize))
// Remove any empty directories
c.purgeEmptyDirs()
// Stats
c.itemMu.Lock()
newItems, newUsed := len(c.item), fs.SizeSuffix(c.used)
c.itemMu.Unlock()
fs.Infof(nil, "Cleaned the cache: objects %d (was %d), total size %v (was %v)", newItems, oldItems, newUsed, oldUsed)
}
// cleaner calls clean at regular intervals
//
// doesn't return until context is cancelled
func (c *cache) cleaner(ctx context.Context) {
if c.opt.CachePollInterval <= 0 {
fs.Debugf(nil, "Cache cleaning thread disabled because poll interval <= 0")
return
}
// Start cleaning the cache immediately
c.clean()
// Then every interval specified
timer := time.NewTicker(c.opt.CachePollInterval)
defer timer.Stop()
for {
select {
case <-timer.C:
c.clean()
case <-ctx.Done():
fs.Debugf(nil, "cache cleaner exiting")
return
}
}
}