vfs: implement partial reads in --vfs-cache-mode full

This allows reads to only read part of the file and it keeps on disk a
cache of what parts of each file have been loaded.

File data itself is kept in sparse files.
This commit is contained in:
Nick Craig-Wood 2020-02-29 18:08:22 +00:00
parent d84527a730
commit 917cb4acb3
10 changed files with 2359 additions and 1610 deletions

View file

@ -38,21 +38,17 @@ type File struct {
inode uint64 // inode number - read only
size int64 // size of file - read and written with atomic int64 - must be 64 bit aligned
mu sync.RWMutex // protects the following
d *Dir // parent directory
dPath string // path of parent directory. NB dir rename means all Files are flushed
o fs.Object // NB o may be nil if file is being written
leaf string // leaf name of the object
rwOpenCount int // number of open files on this handle
writers []Handle // writers for this file
nwriters int32 // len(writers) which is read/updated with atomic
readWriters int // how many RWFileHandle are open for writing
readWriterClosing bool // is an RWFileHandle currently cosing?
modified bool // has the cache file be modified by an RWFileHandle?
pendingModTime time.Time // will be applied once o becomes available, i.e. after file was written
pendingRenameFun func(ctx context.Context) error // will be run/renamed after all writers close
appendMode bool // file was opened with O_APPEND
sys interface{} // user defined info to be attached here
mu sync.RWMutex // protects the following
d *Dir // parent directory
dPath string // path of parent directory. NB dir rename means all Files are flushed
o fs.Object // NB o may be nil if file is being written
leaf string // leaf name of the object
writers []Handle // writers for this file
nwriters int32 // len(writers) which is read/updated with atomic
pendingModTime time.Time // will be applied once o becomes available, i.e. after file was written
pendingRenameFun func(ctx context.Context) error // will be run/renamed after all writers close
appendMode bool // file was opened with O_APPEND
sys interface{} // user defined info to be attached here
muRW sync.Mutex // synchronize RWFileHandle.openPending(), RWFileHandle.close() and File.Remove
}
@ -124,11 +120,6 @@ func (f *File) Path() string {
return path.Join(dPath, leaf)
}
// osPath returns the full path of the file in the cache in OS format
func (f *File) osPath() string {
return f.d.vfs.cache.ToOSPath(f.Path())
}
// Sys returns underlying data source (can be nil) - satisfies Node interface
func (f *File) Sys() interface{} {
f.mu.RLock()
@ -184,10 +175,11 @@ func (f *File) rename(ctx context.Context, destDir *Dir, newName string) error {
return err
}
oldPath := f.Path()
// File.mu is unlocked here to call Dir.Path()
newPath := path.Join(destDir.Path(), newName)
renameCall := func(ctx context.Context) error {
renameCall := func(ctx context.Context) (err error) {
// chain rename calls if any
if oldPendingRenameFun != nil {
err := oldPendingRenameFun(ctx)
@ -199,44 +191,46 @@ func (f *File) rename(ctx context.Context, destDir *Dir, newName string) error {
f.mu.RLock()
o := f.o
f.mu.RUnlock()
if o == nil {
return errors.New("Cannot rename: file object is not available")
}
if o.Remote() == newPath {
return nil // no need to rename
}
var newObject fs.Object
// if o is nil then are writing the file so no need to rename the object
if o != nil {
if o.Remote() == newPath {
return nil // no need to rename
}
// do the move of the remote object
dstOverwritten, _ := d.Fs().NewObject(ctx, newPath)
newObject, err := operations.Move(ctx, d.Fs(), dstOverwritten, newPath, o)
if err != nil {
fs.Errorf(f.Path(), "File.Rename error: %v", err)
return err
}
// do the move of the remote object
dstOverwritten, _ := d.Fs().NewObject(ctx, newPath)
newObject, err = operations.Move(ctx, d.Fs(), dstOverwritten, newPath, o)
if err != nil {
fs.Errorf(f.Path(), "File.Rename error: %v", err)
return err
}
// newObject can be nil here for example if --dry-run
if newObject == nil {
err = errors.New("rename failed: nil object returned")
fs.Errorf(f.Path(), "File.Rename %v", err)
return err
// newObject can be nil here for example if --dry-run
if newObject == nil {
err = errors.New("rename failed: nil object returned")
fs.Errorf(f.Path(), "File.Rename %v", err)
return err
}
}
// Rename in the cache
if f.d.vfs.cache != nil {
if err := f.d.vfs.cache.Rename(oldPath, newPath, newObject); err != nil {
fs.Infof(f.Path(), "File.Rename failed in Cache: %v", err)
}
}
// Update the node with the new details
fs.Debugf(f.Path(), "Updating file with %v %p", newObject, f)
// f.rename(destDir, newObject)
f.mu.Lock()
f.o = newObject
if newObject != nil {
f.o = newObject
}
f.pendingRenameFun = nil
f.mu.Unlock()
return nil
}
// Rename in the cache if it exists
if f.d.vfs.cache != nil && f.d.vfs.cache.Exists(f.Path()) {
if err := f.d.vfs.cache.Rename(f.Path(), newPath); err != nil {
fs.Infof(f.Path(), "File.Rename failed in Cache: %v", err)
}
}
// rename the file object
dPath := destDir.Path()
f.mu.Lock()
@ -246,7 +240,12 @@ func (f *File) rename(ctx context.Context, destDir *Dir, newName string) error {
writing := f._writingInProgress()
f.mu.Unlock()
if writing {
// Delay the rename if not using RW caching. For the minimal case we
// need to look in the cache to see if caching is in use.
CacheMode := d.vfs.Opt.CacheMode
if writing &&
(CacheMode < vfscommon.CacheModeMinimal ||
(CacheMode == vfscommon.CacheModeMinimal && !f.d.vfs.cache.Exists(f.Path()))) {
fs.Debugf(f.Path(), "File is currently open, delaying rename %p", f)
f.mu.Lock()
f.pendingRenameFun = renameCall
@ -262,14 +261,11 @@ func (f *File) addWriter(h Handle) {
f.mu.Lock()
f.writers = append(f.writers, h)
atomic.AddInt32(&f.nwriters, 1)
if _, ok := h.(*RWFileHandle); ok {
f.readWriters++
}
f.mu.Unlock()
}
// delWriter removes a write handle from the file
func (f *File) delWriter(h Handle, modifiedCacheFile bool) (lastWriterAndModified bool) {
func (f *File) delWriter(h Handle) {
f.mu.Lock()
defer f.applyPendingRename()
defer f.mu.Unlock()
@ -286,51 +282,6 @@ func (f *File) delWriter(h Handle, modifiedCacheFile bool) (lastWriterAndModifie
} else {
fs.Debugf(f._path(), "File.delWriter couldn't find handle")
}
if _, ok := h.(*RWFileHandle); ok {
f.readWriters--
}
f.readWriterClosing = true
if modifiedCacheFile {
f.modified = true
}
lastWriterAndModified = len(f.writers) == 0 && f.modified
if lastWriterAndModified {
f.modified = false
}
return
}
// addRWOpen should be called by ReadWriteHandle when they have
// actually opened the file for read or write.
func (f *File) addRWOpen() {
f.mu.Lock()
f.rwOpenCount++
f.mu.Unlock()
}
// delRWOpen should be called by ReadWriteHandle when they have closed
// an actually opene file for read or write.
func (f *File) delRWOpen() {
f.mu.Lock()
f.rwOpenCount--
f.mu.Unlock()
}
// rwOpens returns how many active open ReadWriteHandles there are.
// Note that file handles which are in pending open state aren't
// counted.
func (f *File) rwOpens() int {
f.mu.RLock()
defer f.mu.RUnlock()
return f.rwOpenCount
}
// finishWriterClose resets the readWriterClosing flag
func (f *File) finishWriterClose() {
f.mu.Lock()
f.readWriterClosing = false
f.mu.Unlock()
f.applyPendingRename()
}
// activeWriters returns the number of writers on the file
@ -413,11 +364,6 @@ func (f *File) _applyPendingModTime() error {
return errors.New("Cannot apply ModTime, file object is not available")
}
// set the time of the file in the cache
if f.d.vfs.cache != nil {
f.d.vfs.cache.SetModTime(f._path(), f.pendingModTime)
}
// set the time of the object
err := f.o.SetModTime(context.TODO(), f.pendingModTime)
switch err {
@ -430,13 +376,18 @@ func (f *File) _applyPendingModTime() error {
return err
}
// set the time of the file in the cache
if f.d.vfs.cache != nil {
f.d.vfs.cache.SetModTime(f._path(), f.pendingModTime)
}
return nil
}
// _writingInProgress returns true of there are any open writers
// Call with read lock held
func (f *File) _writingInProgress() bool {
return f.o == nil || len(f.writers) != 0 || f.readWriterClosing
return f.o == nil || len(f.writers) != 0
}
// Update the size while writing
@ -486,12 +437,11 @@ func (f *File) waitForValidObject() (o fs.Object, err error) {
f.mu.RLock()
o = f.o
nwriters := len(f.writers)
wclosing := f.readWriterClosing
f.mu.RUnlock()
if o != nil {
return o, nil
}
if nwriters == 0 && !wclosing {
if nwriters == 0 {
return nil, errors.New("can't open file - writer failed")
}
time.Sleep(100 * time.Millisecond)
@ -690,7 +640,7 @@ func (f *File) Open(flags int) (fd Handle, err error) {
d := f.d
f.mu.RUnlock()
CacheMode := d.vfs.Opt.CacheMode
if CacheMode >= vfscommon.CacheModeMinimal && (d.vfs.cache.Opens(f.Path()) > 0 || d.vfs.cache.Exists(f.Path())) {
if CacheMode >= vfscommon.CacheModeMinimal && (d.vfs.cache.InUse(f.Path()) || d.vfs.cache.Exists(f.Path())) {
fd, err = f.openRW(flags)
} else if read && write {
if CacheMode >= vfscommon.CacheModeMinimal {

View file

@ -1,18 +1,15 @@
package vfs
import (
"context"
"fmt"
"io"
"io/ioutil"
"os"
"runtime"
"sync"
"github.com/pkg/errors"
"github.com/rclone/rclone/fs"
"github.com/rclone/rclone/fs/log"
"github.com/rclone/rclone/lib/file"
"github.com/rclone/rclone/vfs/vfscache"
)
// RWFileHandle is a handle that can be open for read and write.
@ -20,21 +17,29 @@ import (
// It will be open to a temporary file which, when closed, will be
// transferred to the remote.
type RWFileHandle struct {
// read only variables
file *File
d *Dir
flags int // open flags
item *vfscache.Item // cached file item
// read write variables protected by mutex
mu sync.Mutex
fd *os.File
offset int64 // file pointer offset
file *File
d *Dir
flags int // open flags
closed bool // set if handle has been closed
closed bool // set if handle has been closed
opened bool
writeCalled bool // if any Write() methods have been called
changed bool // file contents was changed in any other way
}
func newRWFileHandle(d *Dir, f *File, flags int) (fh *RWFileHandle, err error) {
defer log.Trace(f.Path(), "")("err=%v", &err)
// get an item to represent this from the cache
item := d.vfs.cache.Item(f.Path())
exists := f.exists() // || item.Exists()
// if O_CREATE and O_EXCL are set and if path already exists, then return EEXIST
if flags&(os.O_CREATE|os.O_EXCL) == os.O_CREATE|os.O_EXCL && f.exists() {
if flags&(os.O_CREATE|os.O_EXCL) == os.O_CREATE|os.O_EXCL && exists {
return nil, EEXIST
}
@ -42,130 +47,62 @@ func newRWFileHandle(d *Dir, f *File, flags int) (fh *RWFileHandle, err error) {
file: f,
d: d,
flags: flags,
item: item,
}
// mark the file as open in the cache - must be done before the mkdir
fh.d.VFS().cache.Open(fh.file.Path())
// Make a place for the file
_, err = d.VFS().cache.Mkdir(fh.file.Path())
if err != nil {
fh.d.VFS().cache.Close(fh.file.Path())
return nil, errors.Wrap(err, "open RW handle failed to make cache directory")
}
rdwrMode := fh.flags & accessModeMask
if rdwrMode != os.O_RDONLY {
fh.file.addWriter(fh)
}
// truncate or create files immediately to prepare the cache
if fh.flags&os.O_TRUNC != 0 || fh.flags&(os.O_CREATE) != 0 && !f.exists() {
if err := fh.openPending(false); err != nil {
fh.file.delWriter(fh, false)
return nil, err
// truncate immediately if O_TRUNC is set or O_CREATE is set and file doesn't exist
if !fh.readOnly() && (fh.flags&os.O_TRUNC != 0 || (fh.flags&os.O_CREATE != 0 && !(exists || item.Exists()))) {
err = fh.Truncate(0)
if err != nil {
return nil, errors.Wrap(err, "cache open with O_TRUNC: failed to truncate")
}
// we definitely need to write back the item even if we don't write to it
item.Dirty()
}
if !fh.readOnly() {
fh.file.addWriter(fh)
}
return fh, nil
}
// readOnly returns whether flags say fh is read only
func (fh *RWFileHandle) readOnly() bool {
return (fh.flags & accessModeMask) == os.O_RDONLY
}
// writeOnly returns whether flags say fh is write only
func (fh *RWFileHandle) writeOnly() bool {
return (fh.flags & accessModeMask) == os.O_WRONLY
}
// openPending opens the file if there is a pending open
//
// call with the lock held
func (fh *RWFileHandle) openPending(truncate bool) (err error) {
func (fh *RWFileHandle) openPending() (err error) {
if fh.opened {
return nil
}
defer log.Trace(fh.logPrefix(), "")("err=%v", &err)
fh.file.muRW.Lock()
defer fh.file.muRW.Unlock()
o := fh.file.getObject()
err = fh.item.Open(o)
if err != nil {
return errors.Wrap(err, "open RW handle failed to open cache file")
}
var fd *os.File
cacheFileOpenFlags := fh.flags
// if not truncating the file, need to read it first
if fh.flags&os.O_TRUNC == 0 && !truncate {
// If the remote object exists AND its cached file exists locally AND there are no
// other RW handles with it open, then attempt to update it.
if o != nil && fh.file.rwOpens() == 0 {
err = fh.d.VFS().cache.Check(context.TODO(), o, fh.file.Path())
if err != nil {
return errors.Wrap(err, "open RW handle failed to check cache file")
}
}
// try to open an existing cache file
fd, err = file.OpenFile(fh.file.osPath(), cacheFileOpenFlags&^os.O_CREATE, 0600)
if os.IsNotExist(err) {
// cache file does not exist, so need to fetch it if we have an object to fetch
// it from
if o != nil {
err = fh.d.VFS().cache.Fetch(context.TODO(), o, fh.file.Path())
if err != nil {
cause := errors.Cause(err)
if cause != fs.ErrorObjectNotFound && cause != fs.ErrorDirNotFound {
// return any non NotFound errors
return errors.Wrap(err, "open RW handle failed to cache file")
}
// continue here with err=fs.Error{Object,Dir}NotFound
}
}
// if err == nil, then we have cached the file successfully, otherwise err is
// indicating some kind of non existent file/directory either
// os.IsNotExist(err) or fs.Error{Object,Dir}NotFound
if err != nil {
if fh.flags&os.O_CREATE != 0 {
// if the object wasn't found AND O_CREATE is set then
// ignore error as we are about to create the file
fh.file.setSize(0)
fh.changed = true
} else {
return errors.Wrap(err, "open RW handle failed to cache file")
}
}
} else if err != nil {
return errors.Wrap(err, "cache open file failed")
} else {
fs.Debugf(fh.logPrefix(), "Opened existing cached copy with flags=%s", decodeOpenFlags(fh.flags))
}
size := fh._size() // update size in file and read size
if fh.flags&os.O_APPEND != 0 {
fh.offset = size
fs.Debugf(fh.logPrefix(), "open at offset %d", fh.offset)
} else {
// Set the size to 0 since we are truncating and flag we need to write it back
fh.file.setSize(0)
fh.changed = true
if fh.flags&os.O_CREATE == 0 && fh.file.exists() {
// create an empty file if it exists on the source
err = ioutil.WriteFile(fh.file.osPath(), []byte{}, 0600)
if err != nil {
return errors.Wrap(err, "cache open failed to create zero length file")
}
}
// Windows doesn't seem to deal well with O_TRUNC and
// certain access modes so truncate the file if it
// exists in these cases.
if runtime.GOOS == "windows" && fh.flags&os.O_APPEND != 0 {
cacheFileOpenFlags &^= os.O_TRUNC
_, err = os.Stat(fh.file.osPath())
if err == nil {
err = os.Truncate(fh.file.osPath(), 0)
if err != nil {
return errors.Wrap(err, "cache open failed to truncate")
}
}
}
fh.offset = 0
}
if fd == nil {
fs.Debugf(fh.logPrefix(), "Opening cached copy with flags=%s", decodeOpenFlags(fh.flags))
fd, err = file.OpenFile(fh.file.osPath(), cacheFileOpenFlags, 0600)
if err != nil {
return errors.Wrap(err, "cache open file failed")
}
}
fh.fd = fd
fh.opened = true
fh.file.addRWOpen()
fh.d.addObject(fh.file) // make sure the directory has this object in it now
return nil
}
@ -188,79 +125,16 @@ func (fh *RWFileHandle) Node() Node {
return fh.file
}
// Returns whether the file needs to be written back.
//
// If write hasn't been called and the file hasn't been changed in any other
// way we haven't modified it so we don't need to transfer it
// updateSize updates the size of the file if necessary
//
// Must be called with fh.mu held
func (fh *RWFileHandle) modified() bool {
if !fh.writeCalled && !fh.changed {
fs.Debugf(fh.logPrefix(), "not modified so not transferring")
return false
func (fh *RWFileHandle) updateSize() {
// If read only or not opened then ignore
if fh.readOnly() || !fh.opened {
return
}
return true
}
// flushWrites flushes any pending writes to cloud storage
//
// Must be called with fh.muRW held
func (fh *RWFileHandle) flushWrites(closeFile bool) error {
if fh.closed && !closeFile {
return nil
}
rdwrMode := fh.flags & accessModeMask
writer := rdwrMode != os.O_RDONLY
// If read only then return
if !fh.opened && rdwrMode == os.O_RDONLY {
return nil
}
isCopied := false
if writer {
isCopied = fh.file.delWriter(fh, fh.modified())
defer fh.file.finishWriterClose()
}
// If we aren't creating or truncating the file then
// we haven't modified it so don't need to transfer it
if fh.flags&(os.O_CREATE|os.O_TRUNC) != 0 {
if err := fh.openPending(false); err != nil {
return err
}
}
if writer && fh.opened {
fi, err := fh.fd.Stat()
if err != nil {
fs.Errorf(fh.logPrefix(), "Failed to stat cache file: %v", err)
} else {
fh.file.setSize(fi.Size())
}
}
// Close the underlying file
if fh.opened && closeFile {
err := fh.fd.Close()
if err != nil {
err = errors.Wrap(err, "failed to close cache file")
return err
}
}
if isCopied {
o, err := fh.d.VFS().cache.Store(context.TODO(), fh.file.getObject(), fh.file.Path())
if err != nil {
fs.Errorf(fh.logPrefix(), "%v", err)
return err
}
fh.file.setObject(o)
fs.Debugf(o, "transferred to remote")
}
return nil
size := fh._size()
fh.file.setSize(size)
}
// close the file handle returning EBADF if it has been
@ -278,15 +152,19 @@ func (fh *RWFileHandle) close() (err error) {
if fh.closed {
return ECLOSED
}
fh.closed = true
defer func() {
if fh.opened {
fh.file.delRWOpen()
}
fh.d.VFS().cache.Close(fh.file.Path())
}()
return fh.flushWrites(true)
fh.closed = true
fh.updateSize()
if fh.opened {
err = fh.item.Close(fh.file.setObject)
fh.opened = false
}
if !fh.readOnly() {
fh.file.delWriter(fh)
}
return err
}
// Close closes the file
@ -301,37 +179,10 @@ func (fh *RWFileHandle) Close() error {
// single opened file, Flush can be called multiple times.
func (fh *RWFileHandle) Flush() error {
fh.mu.Lock()
defer fh.mu.Unlock()
if !fh.opened {
return nil
}
if fh.closed {
fs.Debugf(fh.logPrefix(), "RWFileHandle.Flush nothing to do")
return nil
}
// fs.Debugf(fh.logPrefix(), "RWFileHandle.Flush")
if !fh.opened {
fs.Debugf(fh.logPrefix(), "RWFileHandle.Flush ignoring flush on unopened handle")
return nil
}
// If Write hasn't been called then ignore the Flush - Release
// will pick it up
if !fh.writeCalled {
fs.Debugf(fh.logPrefix(), "RWFileHandle.Flush ignoring flush on unwritten handle")
return nil
}
fh.file.muRW.Lock()
defer fh.file.muRW.Unlock()
err := fh.flushWrites(false)
if err != nil {
fs.Errorf(fh.logPrefix(), "RWFileHandle.Flush error: %v", err)
} else {
// fs.Debugf(fh.logPrefix(), "RWFileHandle.Flush OK")
}
return err
fs.Debugf(fh.logPrefix(), "RWFileHandle.Flush")
fh.updateSize()
fh.mu.Unlock()
return nil
}
// Release is called when we are finished with the file handle
@ -341,35 +192,35 @@ func (fh *RWFileHandle) Flush() error {
func (fh *RWFileHandle) Release() error {
fh.mu.Lock()
defer fh.mu.Unlock()
fs.Debugf(fh.logPrefix(), "RWFileHandle.Release")
if fh.closed {
fs.Debugf(fh.logPrefix(), "RWFileHandle.Release nothing to do")
// Don't return an error if called twice
return nil
}
fs.Debugf(fh.logPrefix(), "RWFileHandle.Release closing")
err := fh.close()
if err != nil {
fs.Errorf(fh.logPrefix(), "RWFileHandle.Release error: %v", err)
} else {
// fs.Debugf(fh.logPrefix(), "RWFileHandle.Release OK")
}
return err
}
// _size returns the size of the underlying file
// _size returns the size of the underlying file and also sets it in
// the owning file
//
// call with the lock held
//
// FIXME what if a file was partially read in - this may return the wrong thing?
// FIXME need to make sure we extend the file to the maximum when creating it
func (fh *RWFileHandle) _size() int64 {
if !fh.opened {
return fh.file.Size()
}
fi, err := fh.fd.Stat()
size, err := fh.item.GetSize()
if err != nil {
return 0
o := fh.file.getObject()
if o != nil {
size = o.Size()
} else {
fs.Errorf(fh.logPrefix(), "Couldn't read size of file")
size = 0
}
}
return fi.Size()
fh.file.setSize(size)
return size
}
// Size returns the size of the underlying file
@ -390,16 +241,20 @@ func (fh *RWFileHandle) Stat() (os.FileInfo, error) {
//
// call with lock held
func (fh *RWFileHandle) _readAt(b []byte, off int64) (n int, err error) {
defer log.Trace(fh.logPrefix(), "size=%d, off=%d", len(b), off)("n=%d, err=%v", &n, &err)
if fh.closed {
return n, ECLOSED
}
if fh.flags&accessModeMask == os.O_WRONLY {
if fh.writeOnly() {
return n, EBADF
}
if err = fh.openPending(false); err != nil {
if off >= fh._size() {
return n, io.EOF
}
if err = fh.openPending(); err != nil {
return n, err
}
return fh.fd.ReadAt(b, off)
return fh.item.ReadAt(b, off)
}
// ReadAt bytes from the file at off
@ -428,7 +283,7 @@ func (fh *RWFileHandle) Seek(offset int64, whence int) (ret int64, err error) {
if !fh.opened && offset == 0 && whence != 2 {
return 0, nil
}
if err = fh.openPending(false); err != nil {
if err = fh.openPending(); err != nil {
return ret, err
}
switch whence {
@ -444,32 +299,30 @@ func (fh *RWFileHandle) Seek(offset int64, whence int) (ret int64, err error) {
// WriteAt bytes to the file at off
func (fh *RWFileHandle) _writeAt(b []byte, off int64) (n int, err error) {
defer log.Trace(fh.logPrefix(), "size=%d, off=%d", len(b), off)("n=%d, err=%v", &n, &err)
if fh.closed {
return n, ECLOSED
}
if fh.flags&accessModeMask == os.O_RDONLY {
if fh.readOnly() {
return n, EBADF
}
if err = fh.openPending(false); err != nil {
if err = fh.openPending(); err != nil {
return n, err
}
if fh.flags&os.O_APPEND != 0 {
// From open(2): Before each write(2), the file offset is
// positioned at the end of the file, as if with lseek(2).
size := fh._size()
fh.offset = size
off = fh.offset
}
fh.writeCalled = true
if fh.flags&os.O_APPEND != 0 {
// if append is set, call Write as WriteAt returns an error if append is set
n, err = fh.fd.Write(b)
} else {
n, err = fh.fd.WriteAt(b, off)
}
n, err = fh.item.WriteAt(b, off)
if err != nil {
return n, err
}
fi, err := fh.fd.Stat()
if err != nil {
return n, errors.Wrap(err, "failed to stat cache file")
}
fh.file.setSize(fi.Size())
_ = fh._size()
return n, err
}
@ -477,7 +330,11 @@ func (fh *RWFileHandle) _writeAt(b []byte, off int64) (n int, err error) {
func (fh *RWFileHandle) WriteAt(b []byte, off int64) (n int, err error) {
fh.mu.Lock()
defer fh.mu.Unlock()
return fh._writeAt(b, off)
n, err = fh._writeAt(b, off)
if fh.flags&os.O_APPEND != 0 {
fh.offset += int64(n)
}
return n, err
}
// Write bytes to the file
@ -494,6 +351,17 @@ func (fh *RWFileHandle) WriteString(s string) (n int, err error) {
return fh.Write([]byte(s))
}
// Truncate file to given size
//
// Call with mutex held
func (fh *RWFileHandle) _truncate(size int64) (err error) {
if size == fh._size() {
return nil
}
fh.file.setSize(size)
return fh.item.Truncate(size)
}
// Truncate file to given size
func (fh *RWFileHandle) Truncate(size int64) (err error) {
fh.mu.Lock()
@ -501,12 +369,10 @@ func (fh *RWFileHandle) Truncate(size int64) (err error) {
if fh.closed {
return ECLOSED
}
if err = fh.openPending(size == 0); err != nil {
if err = fh.openPending(); err != nil {
return err
}
fh.changed = true
fh.file.setSize(size)
return fh.fd.Truncate(size)
return fh._truncate(size)
}
// Sync commits the current contents of the file to stable storage. Typically,
@ -521,10 +387,10 @@ func (fh *RWFileHandle) Sync() error {
if !fh.opened {
return nil
}
if fh.flags&accessModeMask == os.O_RDONLY {
if fh.readOnly() {
return nil
}
return fh.fd.Sync()
return fh.item.Sync()
}
func (fh *RWFileHandle) logPrefix() string {
@ -549,7 +415,7 @@ func (fh *RWFileHandle) Chown(uid, gid int) error {
// Fd returns the integer Unix file descriptor referencing the open file.
func (fh *RWFileHandle) Fd() uintptr {
return fh.fd.Fd()
return 0xdeadbeef // FIXME
}
// Name returns the name of the file from the underlying Object.

View file

@ -91,15 +91,9 @@ func TestRWFileHandleMethodsRead(t *testing.T) {
// Size
assert.Equal(t, int64(16), fh.Size())
// No opens yet
assert.Equal(t, 0, fh.file.rwOpens())
// Read 1
assert.Equal(t, "0", rwReadString(t, fh, 1))
// Open after the read
assert.Equal(t, 1, fh.file.rwOpens())
// Read remainder
assert.Equal(t, "123456789abcdef", rwReadString(t, fh, 256))
@ -124,9 +118,6 @@ func TestRWFileHandleMethodsRead(t *testing.T) {
assert.Equal(t, nil, fh.Close())
assert.True(t, fh.closed)
// No opens again
assert.Equal(t, 0, fh.file.rwOpens())
// Close again
assert.Equal(t, ECLOSED, fh.Close())
}
@ -292,10 +283,6 @@ func TestRWFileHandleMethodsWrite(t *testing.T) {
vfs, fh := rwHandleCreateWriteOnly(t, r)
defer cleanup(t, r, vfs)
// 1 opens since we opened with O_CREATE and the file didn't
// exist in the cache
assert.Equal(t, 1, fh.file.rwOpens())
// String
assert.Equal(t, "file1 (rw)", fh.String())
assert.Equal(t, "<nil *RWFileHandle>", (*RWFileHandle)(nil).String())
@ -323,9 +310,6 @@ func TestRWFileHandleMethodsWrite(t *testing.T) {
assert.NoError(t, err)
assert.Equal(t, 5, n)
// Open after the write
assert.Equal(t, 1, fh.file.rwOpens())
// Offset #2
assert.Equal(t, int64(5), offset())
assert.Equal(t, int64(5), node.Size())
@ -356,9 +340,6 @@ func TestRWFileHandleMethodsWrite(t *testing.T) {
// Close
assert.NoError(t, fh.Close())
// No opens again
assert.Equal(t, 0, fh.file.rwOpens())
// Check double close
err = fh.Close()
assert.Equal(t, ECLOSED, err)
@ -388,7 +369,6 @@ func TestRWFileHandleWriteAt(t *testing.T) {
assert.Equal(t, int64(0), offset())
assert.True(t, fh.opened)
assert.False(t, fh.writeCalled)
assert.True(t, fh.changed)
// Write the data
n, err := fh.WriteAt([]byte("hello**"), 0)
@ -468,6 +448,7 @@ func TestRWFileHandleFlushWrite(t *testing.T) {
n, err := fh.Write([]byte("hello"))
assert.NoError(t, err)
assert.Equal(t, 5, n)
assert.True(t, fh.opened)
// Check Flush does not close file if write called
err = fh.Flush()
@ -601,7 +582,7 @@ func testRWFileHandleOpenTest(t *testing.T, vfs *VFS, test *openTest) {
// first try with file not existing
_, err := vfs.Stat(fileName)
require.True(t, os.IsNotExist(err), test.what)
require.True(t, os.IsNotExist(err))
f, openNonExistentErr := vfs.OpenFile(fileName, test.flags, 0666)
@ -617,16 +598,16 @@ func testRWFileHandleOpenTest(t *testing.T, vfs *VFS, test *openTest) {
// close
err = f.Close()
require.NoError(t, err, test.what)
require.NoError(t, err)
}
// write the file
f, err = vfs.OpenFile(fileName, os.O_WRONLY|os.O_CREATE, 0777)
require.NoError(t, err, test.what)
require.NoError(t, err)
_, err = f.Write([]byte("hello"))
require.NoError(t, err, test.what)
require.NoError(t, err)
err = f.Close()
require.NoError(t, err, test.what)
require.NoError(t, err)
// then open file and try with file existing
@ -643,32 +624,32 @@ func testRWFileHandleOpenTest(t *testing.T, vfs *VFS, test *openTest) {
// close
err = f.Close()
require.NoError(t, err, test.what)
require.NoError(t, err)
}
// read the file
f, err = vfs.OpenFile(fileName, os.O_RDONLY, 0)
require.NoError(t, err, test.what)
require.NoError(t, err)
buf, err := ioutil.ReadAll(f)
require.NoError(t, err, test.what)
require.NoError(t, err)
err = f.Close()
require.NoError(t, err, test.what)
require.NoError(t, err)
contents := string(buf)
// remove file
node, err := vfs.Stat(fileName)
require.NoError(t, err, test.what)
require.NoError(t, err)
err = node.Remove()
require.NoError(t, err, test.what)
require.NoError(t, err)
// check
assert.Equal(t, test.openNonExistentErr, openNonExistentErr, "openNonExistentErr: %s: want=%v, got=%v", test.what, test.openNonExistentErr, openNonExistentErr)
assert.Equal(t, test.readNonExistentErr, readNonExistentErr, "readNonExistentErr: %s: want=%v, got=%v", test.what, test.readNonExistentErr, readNonExistentErr)
assert.Equal(t, test.writeNonExistentErr, writeNonExistentErr, "writeNonExistentErr: %s: want=%v, got=%v", test.what, test.writeNonExistentErr, writeNonExistentErr)
assert.Equal(t, test.openExistingErr, openExistingErr, "openExistingErr: %s: want=%v, got=%v", test.what, test.openExistingErr, openExistingErr)
assert.Equal(t, test.readExistingErr, readExistingErr, "readExistingErr: %s: want=%v, got=%v", test.what, test.readExistingErr, readExistingErr)
assert.Equal(t, test.writeExistingErr, writeExistingErr, "writeExistingErr: %s: want=%v, got=%v", test.what, test.writeExistingErr, writeExistingErr)
assert.Equal(t, test.contents, contents, test.what)
assert.Equal(t, test.openNonExistentErr, openNonExistentErr, "openNonExistentErr: want=%v, got=%v", test.openNonExistentErr, openNonExistentErr)
assert.Equal(t, test.readNonExistentErr, readNonExistentErr, "readNonExistentErr: want=%v, got=%v", test.readNonExistentErr, readNonExistentErr)
assert.Equal(t, test.writeNonExistentErr, writeNonExistentErr, "writeNonExistentErr: want=%v, got=%v", test.writeNonExistentErr, writeNonExistentErr)
assert.Equal(t, test.openExistingErr, openExistingErr, "openExistingErr: want=%v, got=%v", test.openExistingErr, openExistingErr)
assert.Equal(t, test.readExistingErr, readExistingErr, "readExistingErr: want=%v, got=%v", test.readExistingErr, readExistingErr)
assert.Equal(t, test.writeExistingErr, writeExistingErr, "writeExistingErr: want=%v, got=%v", test.writeExistingErr, writeExistingErr)
assert.Equal(t, test.contents, contents)
}
func TestRWFileHandleOpenTests(t *testing.T) {
@ -679,7 +660,9 @@ func TestRWFileHandleOpenTests(t *testing.T) {
defer cleanup(t, r, vfs)
for _, test := range openTests {
testRWFileHandleOpenTest(t, vfs, &test)
t.Run(test.what, func(t *testing.T) {
testRWFileHandleOpenTest(t, vfs, &test)
})
}
}

513
vfs/vfscache/cache.go Normal file
View file

@ -0,0 +1,513 @@
// Package vfscache deals with caching of files locally for the VFS layer
package vfscache
import (
"context"
"fmt"
"os"
"path"
"path/filepath"
"runtime"
"sort"
"strings"
"sync"
"time"
"github.com/pkg/errors"
"github.com/rclone/rclone/fs"
fscache "github.com/rclone/rclone/fs/cache"
"github.com/rclone/rclone/fs/config"
"github.com/rclone/rclone/fs/hash"
"github.com/rclone/rclone/fs/log"
"github.com/rclone/rclone/fs/operations"
"github.com/rclone/rclone/vfs/vfscommon"
)
// NB as Cache and Item are tightly linked it is necessary to have a
// total lock ordering between them. So Cache.mu must always be
// taken before Item.mu to avoid deadlocks.
//
// Cache may call into Item but care is needed if Item calls Cache
// FIXME size in cache needs to be size on disk if we have sparse files...
// Cache opened files
type Cache struct {
// read only - no locking needed to read these
fremote fs.Fs // fs for the remote we are caching
fcache fs.Fs // fs for the cache directory
fcacheMeta fs.Fs // fs for the cache metadata directory
opt *vfscommon.Options // vfs Options
root string // root of the cache directory
metaRoot string // root of the cache metadata directory
hashType hash.Type // hash to use locally and remotely
hashOption *fs.HashesOption // corresponding OpenOption
mu sync.Mutex // protects the following variables
item map[string]*Item // files/directories in the cache
used int64 // total size of files in the cache
}
// New creates a new cache heirachy for fremote
//
// This starts background goroutines which can be cancelled with the
// context passed in.
func New(ctx context.Context, fremote fs.Fs, opt *vfscommon.Options) (*Cache, error) {
fRoot := filepath.FromSlash(fremote.Root())
if runtime.GOOS == "windows" {
if strings.HasPrefix(fRoot, `\\?`) {
fRoot = fRoot[3:]
}
fRoot = strings.Replace(fRoot, ":", "", -1)
}
root := filepath.Join(config.CacheDir, "vfs", fremote.Name(), fRoot)
fs.Debugf(nil, "vfs cache root is %q", root)
metaRoot := filepath.Join(config.CacheDir, "vfsMeta", fremote.Name(), fRoot)
fs.Debugf(nil, "vfs metadata cache root is %q", root)
fcache, err := fscache.Get(root)
if err != nil {
return nil, errors.Wrap(err, "failed to create cache remote")
}
fcacheMeta, err := fscache.Get(root)
if err != nil {
return nil, errors.Wrap(err, "failed to create cache meta remote")
}
hashType, hashOption := operations.CommonHash(fcache, fremote)
c := &Cache{
fremote: fremote,
fcache: fcache,
fcacheMeta: fcacheMeta,
opt: opt,
root: root,
metaRoot: metaRoot,
item: make(map[string]*Item),
hashType: hashType,
hashOption: hashOption,
}
// Make sure cache directories exist
_, err = c.mkdir("")
if err != nil {
return nil, errors.Wrap(err, "failed to make cache directory")
}
// load in the cache and metadata off disk
err = c.reload()
if err != nil {
return nil, errors.Wrap(err, "failed to load cache")
}
// Remove any empty directories
c.purgeEmptyDirs()
go c.cleaner(ctx)
return c, nil
}
// clean returns the cleaned version of name for use in the index map
//
// name should be a remote path not an osPath
func clean(name string) string {
name = strings.Trim(name, "/")
name = path.Clean(name)
if name == "." || name == "/" {
name = ""
}
return name
}
// toOSPath turns a remote relative name into an OS path in the cache
func (c *Cache) toOSPath(name string) string {
return filepath.Join(c.root, filepath.FromSlash(name))
}
// toOSPathMeta turns a remote relative name into an OS path in the
// cache for the metadata
func (c *Cache) toOSPathMeta(name string) string {
return filepath.Join(c.metaRoot, filepath.FromSlash(name))
}
// mkdir makes the directory for name in the cache and returns an os
// path for the file
func (c *Cache) mkdir(name string) (string, error) {
parent := vfscommon.FindParent(name)
leaf := filepath.Base(name)
parentPath := c.toOSPath(parent)
err := os.MkdirAll(parentPath, 0700)
if err != nil {
return "", errors.Wrap(err, "make cache directory failed")
}
parentPathMeta := c.toOSPathMeta(parent)
err = os.MkdirAll(parentPathMeta, 0700)
if err != nil {
return "", errors.Wrap(err, "make cache meta directory failed")
}
return filepath.Join(parentPath, leaf), nil
}
// _get gets name from the cache or creates a new one
//
// It returns the item and found as to whether this item was found in
// the cache (or just created).
//
// name should be a remote path not an osPath
//
// must be called with mu held
func (c *Cache) _get(name string) (item *Item, found bool) {
item = c.item[name]
found = item != nil
if !found {
item = newItem(c, name)
c.item[name] = item
}
return item, found
}
// put puts item under name in the cache
//
// It returns an old item if there was one or nil if not.
//
// name should be a remote path not an osPath
func (c *Cache) put(name string, item *Item) (oldItem *Item) {
name = clean(name)
c.mu.Lock()
oldItem = c.item[name]
if oldItem != item {
c.item[name] = item
} else {
oldItem = nil
}
c.mu.Unlock()
return oldItem
}
// InUse returns whether the name is in use in the cache
//
// name should be a remote path not an osPath
func (c *Cache) InUse(name string) bool {
name = clean(name)
c.mu.Lock()
item := c.item[name]
c.mu.Unlock()
if item == nil {
return false
}
return item.inUse()
}
// get gets a file name from the cache or creates a new one
//
// It returns the item and found as to whether this item was found in
// the cache (or just created).
//
// name should be a remote path not an osPath
func (c *Cache) get(name string) (item *Item, found bool) {
name = clean(name)
c.mu.Lock()
item, found = c._get(name)
c.mu.Unlock()
return item, found
}
// Item gets a cache item for name
//
// To use it item.Open will need to be called
//
// name should be a remote path not an osPath
func (c *Cache) Item(name string) (item *Item) {
item, _ = c.get(name)
return item
}
// Exists checks to see if the file exists in the cache or not
//
// FIXME check the metadata exists here too?
func (c *Cache) Exists(name string) bool {
osPath := c.toOSPath(name)
fi, err := os.Stat(osPath)
if err != nil {
return false
}
// checks for non-regular files (e.g. directories, symlinks, devices, etc.)
if !fi.Mode().IsRegular() {
return false
}
return true
}
// rename with os.Rename and more checking
func rename(osOldPath, osNewPath string) error {
sfi, err := os.Stat(osOldPath)
if err != nil {
// Just do nothing if the source does not exist
if os.IsNotExist(err) {
return nil
}
return errors.Wrapf(err, "Failed to stat source: %s", osOldPath)
}
if !sfi.Mode().IsRegular() {
// cannot copy non-regular files (e.g., directories, symlinks, devices, etc.)
return errors.Errorf("Non-regular source file: %s (%q)", sfi.Name(), sfi.Mode().String())
}
dfi, err := os.Stat(osNewPath)
if err != nil {
if !os.IsNotExist(err) {
return errors.Wrapf(err, "Failed to stat destination: %s", osNewPath)
}
parent := vfscommon.OsFindParent(osNewPath)
err = os.MkdirAll(parent, 0700)
if err != nil {
return errors.Wrapf(err, "Failed to create parent dir: %s", parent)
}
} else {
if !(dfi.Mode().IsRegular()) {
return errors.Errorf("Non-regular destination file: %s (%q)", dfi.Name(), dfi.Mode().String())
}
if os.SameFile(sfi, dfi) {
return nil
}
}
if err = os.Rename(osOldPath, osNewPath); err != nil {
return errors.Wrapf(err, "Failed to rename in cache: %s to %s", osOldPath, osNewPath)
}
return nil
}
// Rename the item in cache
func (c *Cache) Rename(name string, newName string, newObj fs.Object) (err error) {
item, _ := c.get(name)
err = item.rename(name, newName, newObj)
if err != nil {
return err
}
// Move the item in the cache
c.mu.Lock()
if item, ok := c.item[name]; ok {
c.item[newName] = item
delete(c.item, name)
}
c.mu.Unlock()
fs.Infof(name, "Renamed in cache to %q", newName)
return nil
}
// Remove should be called if name is deleted
func (c *Cache) Remove(name string) {
item, _ := c.get(name)
item.remove("file deleted")
}
// SetModTime should be called to set the modification time of the cache file
func (c *Cache) SetModTime(name string, modTime time.Time) {
item, _ := c.get(name)
item.setModTime(modTime)
}
// CleanUp empties the cache of everything
func (c *Cache) CleanUp() error {
err1 := os.RemoveAll(c.root)
err2 := os.RemoveAll(c.metaRoot)
if err1 != nil {
return err1
}
return err2
}
// walk walks the cache calling the function
func (c *Cache) walk(dir string, fn func(osPath string, fi os.FileInfo, name string) error) error {
return filepath.Walk(dir, func(osPath string, fi os.FileInfo, err error) error {
if err != nil {
return err
}
// Find path relative to the cache root
name, err := filepath.Rel(dir, osPath)
if err != nil {
return errors.Wrap(err, "filepath.Rel failed in walk")
}
if name == "." {
name = ""
}
// And convert into slashes
name = filepath.ToSlash(name)
return fn(osPath, fi, name)
})
}
// reload walks the cache loading metadata files
func (c *Cache) reload() error {
err := c.walk(c.root, func(osPath string, fi os.FileInfo, name string) error {
if !fi.IsDir() {
_, _ = c.get(name)
}
return nil
})
if err != nil {
return errors.Wrap(err, "failed to walk cache")
}
err = c.walk(c.root, func(osPathMeta string, fi os.FileInfo, name string) error {
if !fi.IsDir() {
_, _ = c.get(name)
}
return nil
})
if err != nil {
return errors.Wrap(err, "failed to walk meta cache")
}
return err
}
// purgeOld gets rid of any files that are over age
func (c *Cache) purgeOld(maxAge time.Duration) {
c._purgeOld(maxAge, func(item *Item) {
item.remove("too old")
})
}
func (c *Cache) _purgeOld(maxAge time.Duration, remove func(item *Item)) {
c.mu.Lock()
defer c.mu.Unlock()
cutoff := time.Now().Add(-maxAge)
for name, item := range c.item {
if !item.inUse() {
// If not locked and access time too long ago - delete the file
dt := item.getATime().Sub(cutoff)
// fs.Debugf(name, "atime=%v cutoff=%v, dt=%v", item.info.ATime, cutoff, dt)
if dt < 0 {
remove(item)
// Remove the entry
delete(c.item, name)
}
}
}
}
// Purge any empty directories
func (c *Cache) purgeEmptyDirs() {
ctx := context.Background()
err := operations.Rmdirs(ctx, c.fcache, "", true)
if err != nil {
fs.Errorf(c.fcache, "Failed to remove empty directories from cache: %v", err)
}
err = operations.Rmdirs(ctx, c.fcacheMeta, "", true)
if err != nil {
fs.Errorf(c.fcache, "Failed to remove empty directories from metadata cache: %v", err)
}
}
// Remove any files that are over quota starting from the
// oldest first
func (c *Cache) purgeOverQuota(quota int64) {
c._purgeOverQuota(quota, func(item *Item) {
item.remove("over quota")
})
}
// updateUsed updates c.used so it is accurate
func (c *Cache) updateUsed() {
c.mu.Lock()
defer c.mu.Unlock()
newUsed := int64(0)
for _, item := range c.item {
newUsed += item.getDiskSize()
}
c.used = newUsed
}
func (c *Cache) _purgeOverQuota(quota int64, remove func(item *Item)) {
c.updateUsed()
c.mu.Lock()
defer c.mu.Unlock()
if quota <= 0 || c.used < quota {
return
}
var items Items
// Make a slice of unused files
for _, item := range c.item {
if !item.inUse() {
items = append(items, item)
}
}
sort.Sort(items)
// Remove items until the quota is OK
for _, item := range items {
if c.used < quota {
break
}
c.used -= item.getDiskSize()
remove(item)
// Remove the entry
delete(c.item, item.name)
}
}
// clean empties the cache of stuff if it can
func (c *Cache) clean() {
// Cache may be empty so end
_, err := os.Stat(c.root)
if os.IsNotExist(err) {
return
}
c.mu.Lock()
oldItems, oldUsed := len(c.item), fs.SizeSuffix(c.used)
c.mu.Unlock()
// Remove any files that are over age
c.purgeOld(c.opt.CacheMaxAge)
// Now remove any files that are over quota starting from the
// oldest first
c.purgeOverQuota(int64(c.opt.CacheMaxSize))
// Stats
c.mu.Lock()
newItems, newUsed := len(c.item), fs.SizeSuffix(c.used)
c.mu.Unlock()
fs.Infof(nil, "Cleaned the cache: objects %d (was %d), total size %v (was %v)", newItems, oldItems, newUsed, oldUsed)
}
// cleaner calls clean at regular intervals
//
// doesn't return until context is cancelled
func (c *Cache) cleaner(ctx context.Context) {
if c.opt.CachePollInterval <= 0 {
fs.Debugf(nil, "Cache cleaning thread disabled because poll interval <= 0")
return
}
// Start cleaning the cache immediately
c.clean()
// Then every interval specified
timer := time.NewTicker(c.opt.CachePollInterval)
defer timer.Stop()
for {
select {
case <-timer.C:
c.clean()
case <-ctx.Done():
fs.Debugf(nil, "cache cleaner exiting")
return
}
}
}
// Check the local file is up to date in the cache
func (c *Cache) Check(ctx context.Context, o fs.Object, remote string) (err error) {
defer log.Trace(o, "remote=%q", remote)("err=%v", &err)
item, _ := c.get(remote)
return item.checkObject(o)
}

472
vfs/vfscache/cache_test.go Normal file
View file

@ -0,0 +1,472 @@
package vfscache
import (
"context"
"fmt"
"os"
"path/filepath"
"sort"
"testing"
"time"
_ "github.com/rclone/rclone/backend/local" // import the local backend
"github.com/rclone/rclone/fstest"
"github.com/rclone/rclone/vfs/vfscommon"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
// TestMain drives the tests
func TestMain(m *testing.M) {
fstest.TestMain(m)
}
// convert c.item to a string
func itemAsString(c *Cache) []string {
c.mu.Lock()
defer c.mu.Unlock()
var out []string
for name, item := range c.item {
out = append(out, fmt.Sprintf("name=%q opens=%d size=%d", filepath.ToSlash(name), item.opens, item.info.Size))
}
sort.Strings(out)
return out
}
func TestCacheNew(t *testing.T) {
r := fstest.NewRun(t)
defer r.Finalise()
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
// FIXME need to be writing to the actual file here
// need a item.WriteAt/item.ReadAt method I think
// Disable the cache cleaner as it interferes with these tests
opt := vfscommon.DefaultOpt
opt.CachePollInterval = 0
c, err := New(ctx, r.Fremote, &opt)
require.NoError(t, err)
assert.Contains(t, c.root, "vfs")
assert.Contains(t, c.fcache.Root(), filepath.Base(r.Fremote.Root()))
assert.Equal(t, []string(nil), itemAsString(c))
// mkdir
p, err := c.mkdir("potato")
require.NoError(t, err)
assert.Equal(t, "potato", filepath.Base(p))
assert.Equal(t, []string(nil), itemAsString(c))
fi, err := os.Stat(filepath.Dir(p))
require.NoError(t, err)
assert.True(t, fi.IsDir())
// get
item, _ := c.get("potato")
item2, _ := c.get("potato")
assert.Equal(t, item, item2)
assert.WithinDuration(t, time.Now(), item.info.ATime, time.Second)
// open
assert.Equal(t, []string{
`name="potato" opens=0 size=0`,
}, itemAsString(c))
potato := c.Item("/potato")
require.NoError(t, potato.Open(nil))
assert.Equal(t, []string{
`name="potato" opens=1 size=0`,
}, itemAsString(c))
assert.WithinDuration(t, time.Now(), potato.info.ATime, time.Second)
assert.Equal(t, 1, potato.opens)
// write the file
require.NoError(t, potato.Truncate(5))
atime := time.Now()
potato.info.ATime = atime
// err = ioutil.WriteFile(p, []byte("hello"), 0600)
// require.NoError(t, err)
// read its atime
// updateAtimes
//potato.ATime = time.Now().Add(-24 * time.Hour)
assert.Equal(t, []string{
`name="potato" opens=1 size=5`,
}, itemAsString(c))
assert.True(t, atime.Equal(potato.info.ATime), fmt.Sprintf("%v != %v", atime, potato.info.ATime))
// try purging with file open
c.purgeOld(10 * time.Second)
// _, err = os.Stat(p)
// assert.NoError(t, err)
// close
assert.Equal(t, []string{
`name="potato" opens=1 size=5`,
}, itemAsString(c))
require.NoError(t, potato.Truncate(6))
assert.Equal(t, []string{
`name="potato" opens=1 size=6`,
}, itemAsString(c))
require.NoError(t, potato.Close(nil))
assert.Equal(t, []string{
`name="potato" opens=0 size=6`,
}, itemAsString(c))
item, _ = c.get("potato")
assert.WithinDuration(t, time.Now(), item.info.ATime, time.Second)
assert.Equal(t, 0, item.opens)
// try purging with file closed
c.purgeOld(10 * time.Second)
// ...nothing should happend
// _, err = os.Stat(p)
// assert.NoError(t, err)
//.. purge again with -ve age
c.purgeOld(-10 * time.Second)
_, err = os.Stat(p)
assert.True(t, os.IsNotExist(err))
// clean - have tested the internals already
c.clean()
// cleanup
err = c.CleanUp()
require.NoError(t, err)
_, err = os.Stat(c.root)
assert.True(t, os.IsNotExist(err))
}
func TestCacheOpens(t *testing.T) {
r := fstest.NewRun(t)
defer r.Finalise()
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
c, err := New(ctx, r.Fremote, &vfscommon.DefaultOpt)
require.NoError(t, err)
defer func() { require.NoError(t, c.CleanUp()) }()
assert.Equal(t, []string(nil), itemAsString(c))
potato := c.Item("potato")
require.NoError(t, potato.Open(nil))
assert.Equal(t, []string{
`name="potato" opens=1 size=0`,
}, itemAsString(c))
require.NoError(t, potato.Open(nil))
assert.Equal(t, []string{
`name="potato" opens=2 size=0`,
}, itemAsString(c))
require.NoError(t, potato.Close(nil))
assert.Equal(t, []string{
`name="potato" opens=1 size=0`,
}, itemAsString(c))
require.NoError(t, potato.Close(nil))
assert.Equal(t, []string{
`name="potato" opens=0 size=0`,
}, itemAsString(c))
require.NoError(t, potato.Open(nil))
a1 := c.Item("a//b/c/d/one")
a2 := c.Item("a/b/c/d/e/two")
a3 := c.Item("a/b/c/d/e/f/three")
require.NoError(t, a1.Open(nil))
require.NoError(t, a2.Open(nil))
require.NoError(t, a3.Open(nil))
assert.Equal(t, []string{
`name="a/b/c/d/e/f/three" opens=1 size=0`,
`name="a/b/c/d/e/two" opens=1 size=0`,
`name="a/b/c/d/one" opens=1 size=0`,
`name="potato" opens=1 size=0`,
}, itemAsString(c))
require.NoError(t, potato.Close(nil))
require.NoError(t, a1.Close(nil))
require.NoError(t, a2.Close(nil))
require.NoError(t, a3.Close(nil))
assert.Equal(t, []string{
`name="a/b/c/d/e/f/three" opens=0 size=0`,
`name="a/b/c/d/e/two" opens=0 size=0`,
`name="a/b/c/d/one" opens=0 size=0`,
`name="potato" opens=0 size=0`,
}, itemAsString(c))
}
// test the open, mkdir, purge, close, purge sequence
func TestCacheOpenMkdir(t *testing.T) {
r := fstest.NewRun(t)
defer r.Finalise()
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
// Disable the cache cleaner as it interferes with these tests
opt := vfscommon.DefaultOpt
opt.CachePollInterval = 0
c, err := New(ctx, r.Fremote, &opt)
require.NoError(t, err)
defer func() { require.NoError(t, c.CleanUp()) }()
// open
potato := c.Item("sub/potato")
require.NoError(t, potato.Open(nil))
assert.Equal(t, []string{
`name="sub/potato" opens=1 size=0`,
}, itemAsString(c))
// mkdir
p, err := c.mkdir("sub/potato")
require.NoError(t, err)
assert.Equal(t, "potato", filepath.Base(p))
assert.Equal(t, []string{
`name="sub/potato" opens=1 size=0`,
}, itemAsString(c))
// test directory exists
fi, err := os.Stat(filepath.Dir(p))
require.NoError(t, err)
assert.True(t, fi.IsDir())
// clean the cache
c.purgeOld(-10 * time.Second)
// test directory still exists
fi, err = os.Stat(filepath.Dir(p))
require.NoError(t, err)
assert.True(t, fi.IsDir())
// close
require.NoError(t, potato.Close(nil))
assert.Equal(t, []string{
`name="sub/potato" opens=0 size=0`,
}, itemAsString(c))
// clean the cache
c.purgeOld(-10 * time.Second)
c.purgeEmptyDirs()
assert.Equal(t, []string(nil), itemAsString(c))
// FIXME test directory does not exist
// _, err = os.Stat(filepath.Dir(p))
// require.True(t, os.IsNotExist(err))
}
func TestCachePurgeOld(t *testing.T) {
r := fstest.NewRun(t)
defer r.Finalise()
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
c, err := New(ctx, r.Fremote, &vfscommon.DefaultOpt)
require.NoError(t, err)
defer func() { require.NoError(t, c.CleanUp()) }()
// Test funcs
var removed []string
//removedDir := true
removeFile := func(item *Item) {
removed = append(removed, item.name)
item._remove("TestCachePurgeOld")
}
// removeDir := func(name string) bool {
// if removedDir {
// removed = append(removed, filepath.ToSlash(name)+"/")
// }
// return removedDir
// }
removed = nil
c._purgeOld(-10*time.Second, removeFile)
// FIXME c._purgeEmptyDirs(removeDir)
assert.Equal(t, []string(nil), removed)
potato2 := c.Item("sub/dir2/potato2")
require.NoError(t, potato2.Open(nil))
potato := c.Item("sub/dir/potato")
require.NoError(t, potato.Open(nil))
require.NoError(t, potato2.Close(nil))
require.NoError(t, potato.Open(nil))
assert.Equal(t, []string{
`name="sub/dir/potato" opens=2 size=0`,
`name="sub/dir2/potato2" opens=0 size=0`,
}, itemAsString(c))
removed = nil
// removedDir = true
c._purgeOld(-10*time.Second, removeFile)
// FIXME c._purgeEmptyDirs(removeDir)
assert.Equal(t, []string{
"sub/dir2/potato2",
}, removed)
require.NoError(t, potato.Close(nil))
removed = nil
// removedDir = true
c._purgeOld(-10*time.Second, removeFile)
// FIXME c._purgeEmptyDirs(removeDir)
assert.Equal(t, []string(nil), removed)
require.NoError(t, potato.Close(nil))
assert.Equal(t, []string{
`name="sub/dir/potato" opens=0 size=0`,
}, itemAsString(c))
removed = nil
// removedDir = false
c._purgeOld(10*time.Second, removeFile)
// FIXME c._purgeEmptyDirs(removeDir)
assert.Equal(t, []string(nil), removed)
assert.Equal(t, []string{
`name="sub/dir/potato" opens=0 size=0`,
}, itemAsString(c))
removed = nil
// removedDir = true
c._purgeOld(-10*time.Second, removeFile)
// FIXME c._purgeEmptyDirs(removeDir)
assert.Equal(t, []string{
"sub/dir/potato",
}, removed)
assert.Equal(t, []string(nil), itemAsString(c))
}
func TestCachePurgeOverQuota(t *testing.T) {
r := fstest.NewRun(t)
defer r.Finalise()
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
// Disable the cache cleaner as it interferes with these tests
opt := vfscommon.DefaultOpt
opt.CachePollInterval = 0
c, err := New(ctx, r.Fremote, &opt)
require.NoError(t, err)
// Test funcs
var removed []string
remove := func(item *Item) {
removed = append(removed, item.name)
item._remove("TestCachePurgeOverQuota")
}
removed = nil
c._purgeOverQuota(-1, remove)
assert.Equal(t, []string(nil), removed)
removed = nil
c._purgeOverQuota(0, remove)
assert.Equal(t, []string(nil), removed)
removed = nil
c._purgeOverQuota(1, remove)
assert.Equal(t, []string(nil), removed)
// Make some test files
potato := c.Item("sub/dir/potato")
require.NoError(t, potato.Open(nil))
require.NoError(t, potato.Truncate(5))
potato2 := c.Item("sub/dir2/potato2")
require.NoError(t, potato2.Open(nil))
require.NoError(t, potato2.Truncate(6))
assert.Equal(t, []string{
`name="sub/dir/potato" opens=1 size=5`,
`name="sub/dir2/potato2" opens=1 size=6`,
}, itemAsString(c))
// Check nothing removed
removed = nil
c._purgeOverQuota(1, remove)
assert.Equal(t, []string(nil), removed)
// Close the files
require.NoError(t, potato.Close(nil))
require.NoError(t, potato2.Close(nil))
assert.Equal(t, []string{
`name="sub/dir/potato" opens=0 size=5`,
`name="sub/dir2/potato2" opens=0 size=6`,
}, itemAsString(c))
// Update the stats to read the total size
c.updateUsed()
// make potato2 definitely after potato
t1 := time.Now().Add(10 * time.Second)
require.NoError(t, potato2.Truncate(6))
potato2.info.ATime = t1
// Check only potato removed to get below quota
removed = nil
c._purgeOverQuota(10, remove)
assert.Equal(t, []string{
"sub/dir/potato",
}, removed)
assert.Equal(t, int64(6), c.used)
assert.Equal(t, []string{
`name="sub/dir2/potato2" opens=0 size=6`,
}, itemAsString(c))
// Put potato back
potato = c.Item("sub/dir/potato")
require.NoError(t, potato.Open(nil))
require.NoError(t, potato.Truncate(5))
require.NoError(t, potato.Close(nil))
// Update the stats to read the total size
c.updateUsed()
assert.Equal(t, []string{
`name="sub/dir/potato" opens=0 size=5`,
`name="sub/dir2/potato2" opens=0 size=6`,
}, itemAsString(c))
// make potato definitely after potato2
t2 := t1.Add(20 * time.Second)
require.NoError(t, potato.Truncate(5))
potato.info.ATime = t2
// Check only potato2 removed to get below quota
removed = nil
c._purgeOverQuota(10, remove)
assert.Equal(t, []string{
"sub/dir2/potato2",
}, removed)
assert.Equal(t, int64(5), c.used)
c.purgeEmptyDirs()
assert.Equal(t, []string{
`name="sub/dir/potato" opens=0 size=5`,
}, itemAsString(c))
// Now purge everything
removed = nil
c._purgeOverQuota(1, remove)
assert.Equal(t, []string{
"sub/dir/potato",
}, removed)
assert.Equal(t, int64(0), c.used)
c.purgeEmptyDirs()
assert.Equal(t, []string(nil), itemAsString(c))
// Check nothing left behind
c.clean()
assert.Equal(t, int64(0), c.used)
assert.Equal(t, []string(nil), itemAsString(c))
}

310
vfs/vfscache/downloader.go Normal file
View file

@ -0,0 +1,310 @@
package vfscache
import (
"context"
"io"
"io/ioutil"
"os"
"sync"
"github.com/pkg/errors"
"github.com/rclone/rclone/fs"
"github.com/rclone/rclone/fs/accounting"
"github.com/rclone/rclone/fs/asyncreader"
"github.com/rclone/rclone/fs/log"
"github.com/rclone/rclone/fs/operations"
"github.com/rclone/rclone/lib/file"
"github.com/rclone/rclone/lib/ranges"
"github.com/rclone/rclone/lib/readers"
)
// downloader represents a running download for a file
type downloader struct {
// write only
mu sync.Mutex
ctx context.Context
item *Item
src fs.Object // source object
fcache fs.Fs // destination Fs
osPath string
// per download
out *os.File // file we are writing to
offset int64 // current offset
waiters []waiter
tr *accounting.Transfer
in *accounting.Account // input we are reading from
downloading bool // whether the download thread is running
finished chan struct{} // closed when download finished
}
// waiter is a range we are waiting for and a channel to signal
type waiter struct {
r ranges.Range
errChan chan<- error
}
func newDownloader(item *Item, fcache fs.Fs, remote string, src fs.Object) (dl *downloader, err error) {
defer log.Trace(src, "remote=%q", remote)("dl=%+v, err=%v", &dl, &err)
dl = &downloader{
ctx: context.Background(),
item: item,
src: src,
fcache: fcache,
osPath: item.c.toOSPath(remote),
}
// make sure there is a cache file
_, err = os.Stat(dl.osPath)
if err == nil {
// do nothing
} else if os.IsNotExist(err) {
fs.Debugf(src, "creating empty file")
err = item._truncateToCurrentSize()
if err != nil {
return nil, errors.Wrap(err, "newDownloader: failed to create empty file")
}
} else {
return nil, errors.Wrap(err, "newDownloader: failed to stat cache file")
}
return dl, nil
}
// close any waiters with the error passed in
//
// call with lock held
func (dl *downloader) _closeWaiters(err error) {
for _, waiter := range dl.waiters {
waiter.errChan <- err
}
dl.waiters = nil
}
// Write writes len(p) bytes from p to the underlying data stream. It
// returns the number of bytes written from p (0 <= n <= len(p)) and
// any error encountered that caused the write to stop early. Write
// must return a non-nil error if it returns n < len(p). Write must
// not modify the slice data, even temporarily.
//
// Implementations must not retain p.
func (dl *downloader) Write(p []byte) (n int, err error) {
defer log.Trace(dl.src, "p_len=%d", len(p))("n=%d, err=%v", &n, &err)
var (
// Range we wish to write
r = ranges.Range{Pos: dl.offset, Size: int64(len(p))}
curr ranges.Range
present bool
nn int
)
// Check to see what regions are already present
dl.mu.Lock()
defer dl.mu.Unlock()
dl.item.mu.Lock()
defer dl.item.mu.Unlock()
// Write the range out ignoring already written chunks
// FIXME might stop downloading if we are ignoring chunks?
for err == nil && !r.IsEmpty() {
curr, r, present = dl.item.info.Rs.Find(r)
if curr.Pos != dl.offset {
return n, errors.New("internal error: offset of range is wrong")
}
if present {
// if present want to skip this range
fs.Debugf(dl.src, "skip chunk offset=%d size=%d", dl.offset, curr.Size)
nn = int(curr.Size)
_, err = dl.out.Seek(curr.Size, io.SeekCurrent)
if err != nil {
nn = 0
}
} else {
// if range not present then we want to write it
fs.Debugf(dl.src, "write chunk offset=%d size=%d", dl.offset, curr.Size)
nn, err = dl.out.Write(p[:curr.Size])
dl.item.info.Rs.Insert(ranges.Range{Pos: dl.offset, Size: int64(nn)})
}
dl.offset += int64(nn)
p = p[nn:]
n += nn
}
if n > 0 {
if len(dl.waiters) > 0 {
newWaiters := dl.waiters[:0]
for _, waiter := range dl.waiters {
if dl.item.info.Rs.Present(waiter.r) {
waiter.errChan <- nil
} else {
newWaiters = append(newWaiters, waiter)
}
}
dl.waiters = newWaiters
}
}
if err != nil && err != io.EOF {
dl._closeWaiters(err)
}
return n, err
}
// start the download running from offset
func (dl *downloader) start(offset int64) (err error) {
err = dl.open(offset)
if err != nil {
_ = dl.close(err)
return errors.Wrap(err, "failed to open downloader")
}
go func() {
err := dl.download()
_ = dl.close(err)
if err != nil && errors.Cause(err) != asyncreader.ErrorStreamAbandoned {
fs.Errorf(dl.src, "Failed to download: %v", err)
// FIXME set an error here????
}
}()
return nil
}
// open the file from offset
//
// should be called on a fresh downloader
func (dl *downloader) open(offset int64) (err error) {
defer log.Trace(dl.src, "offset=%d", offset)("err=%v", &err)
dl.finished = make(chan struct{})
defer close(dl.finished)
dl.downloading = true
dl.tr = accounting.Stats(dl.ctx).NewTransfer(dl.src)
size := dl.src.Size()
if size < 0 {
// FIXME should just completely download these
return errors.New("can't open unknown sized file")
}
// FIXME hashType needs to ignore when --no-checksum is set too? Which is a VFS flag.
var rangeOption *fs.RangeOption
if offset > 0 {
rangeOption = &fs.RangeOption{Start: offset, End: size - 1}
}
in0, err := operations.NewReOpen(dl.ctx, dl.src, fs.Config.LowLevelRetries, dl.item.c.hashOption, rangeOption)
if err != nil {
return errors.Wrap(err, "vfs reader: failed to open source file")
}
dl.in = dl.tr.Account(in0).WithBuffer() // account and buffer the transfer
dl.out, err = file.OpenFile(dl.osPath, os.O_CREATE|os.O_WRONLY, 0700)
if err != nil {
return errors.Wrap(err, "vfs reader: failed to open cache file")
}
dl.offset = offset
err = file.SetSparse(dl.out)
if err != nil {
fs.Debugf(dl.src, "vfs reader: failed to set as a sparse file: %v", err)
}
_, err = dl.out.Seek(offset, io.SeekStart)
if err != nil {
return errors.Wrap(err, "vfs reader: failed to seek")
}
// FIXME set mod time
// FIXME check checksums
return nil
}
var errStop = errors.New("vfs downloader: reading stopped")
// stop the downloader if running and close everything
func (dl *downloader) stop() {
defer log.Trace(dl.src, "")("")
dl.mu.Lock()
if !dl.downloading || dl.in == nil {
dl.mu.Unlock()
return
}
// stop the downloader
dl.in.StopBuffering()
oldReader := dl.in.GetReader()
dl.in.UpdateReader(ioutil.NopCloser(readers.ErrorReader{Err: errStop}))
err := oldReader.Close()
if err != nil {
fs.Debugf(dl.src, "vfs downloader: stop close old failed: %v", err)
}
dl.mu.Unlock()
// wait for downloader to finish...
<-dl.finished
}
func (dl *downloader) close(inErr error) (err error) {
defer log.Trace(dl.src, "inErr=%v", err)("err=%v", &err)
dl.stop()
dl.mu.Lock()
if dl.in != nil {
fs.CheckClose(dl.in, &err)
dl.in = nil
}
if dl.tr != nil {
dl.tr.Done(inErr)
dl.tr = nil
}
if dl.out != nil {
fs.CheckClose(dl.out, &err)
dl.out = nil
}
dl._closeWaiters(err)
dl.downloading = false
dl.mu.Unlock()
return nil
}
/*
FIXME
need gating at all the Read/Write sites
need to pass in offset somehow and start the readfile off
need to end when offset is reached
need to be able to quit on demand
Need offset to be passed to NewReOpen
*/
// fetch the (offset, size) block from the remote file
func (dl *downloader) download() (err error) {
defer log.Trace(dl.src, "")("err=%v", &err)
_, err = dl.in.WriteTo(dl)
if err != nil {
return errors.Wrap(err, "vfs reader: failed to write to cache file")
}
return nil
}
// ensure the range is present
func (dl *downloader) ensure(r ranges.Range) (err error) {
defer log.Trace(dl.src, "r=%+v", r)("err=%v", &err)
errChan := make(chan error)
waiter := waiter{
r: r,
errChan: errChan,
}
dl.mu.Lock()
// FIXME racey - might have finished here
dl.waiters = append(dl.waiters, waiter)
dl.mu.Unlock()
return <-errChan
}
// ensure the range is present
func (dl *downloader) running() bool {
dl.mu.Lock()
defer dl.mu.Unlock()
return dl.downloading
}

858
vfs/vfscache/item.go Normal file
View file

@ -0,0 +1,858 @@
package vfscache
import (
"context"
"encoding/json"
"fmt"
"os"
"sync"
"time"
"github.com/pkg/errors"
"github.com/rclone/rclone/fs"
"github.com/rclone/rclone/fs/log"
"github.com/rclone/rclone/fs/operations"
"github.com/rclone/rclone/lib/file"
"github.com/rclone/rclone/lib/ranges"
)
// NB as Cache and Item are tightly linked it is necessary to have a
// total lock ordering between them. So Cache.mu must always be
// taken before Item.mu to avoid deadlocks.
//
// Cache may call into Item but care is needed if Item calls Cache
//
// A lot of the Cache methods do not require locking, these include
//
// - Cache.toOSPath
// - Cache.toOSPathMeta
// - Cache.mkdir
// - Cache.objectFingerprint
// NB Item and downloader are tightly linked so it is necessary to
// have a total lock ordering between them. downloader.mu must always
// be taken before Item.mu. downloader may call into Item but Item may
// **not** call downloader methods with Item.mu held, except for
//
// - downloader.running
// Item is stored in the item map
//
// These are written to the backing store to store status
type Item struct {
// read only
c *Cache // cache this is part of
mu sync.Mutex // protect the variables
name string // name in the VFS
opens int // number of times file is open
downloader *downloader // if the file is being downloaded to cache
o fs.Object // object we are caching - may be nil
fd *os.File // handle we are using to read and write to the file
metaDirty bool // set if the info needs writeback
info Info // info about the file to persist to backing store
}
// Info is persisted to backing store
type Info struct {
ModTime time.Time // last time file was modified
ATime time.Time // last time file was accessed
Size int64 // size of the file
Rs ranges.Ranges // which parts of the file are present
Fingerprint string // fingerprint of remote object
Dirty bool // set if the backing file has been modifed
}
// Items are a slice of *Item ordered by ATime
type Items []*Item
func (v Items) Len() int { return len(v) }
func (v Items) Swap(i, j int) { v[i], v[j] = v[j], v[i] }
func (v Items) Less(i, j int) bool {
if i == j {
return false
}
iItem := v[i]
jItem := v[j]
iItem.mu.Lock()
defer iItem.mu.Unlock()
jItem.mu.Lock()
defer jItem.mu.Unlock()
return iItem.info.ATime.Before(jItem.info.ATime)
}
// clean the item after its cache file has been deleted
func (info *Info) clean() {
*info = Info{}
info.ModTime = time.Now()
info.ATime = info.ModTime
}
// StoreFn is called back with an object after it has been uploaded
type StoreFn func(fs.Object)
// newItem returns an item for the cache
func newItem(c *Cache, name string) (item *Item) {
now := time.Now()
item = &Item{
c: c,
name: name,
info: Info{
ModTime: now,
ATime: now,
},
}
// check the cache file exists
osPath := c.toOSPath(name)
fi, statErr := os.Stat(osPath)
if statErr != nil {
if os.IsNotExist(statErr) {
item._removeMeta("cache file doesn't exist")
} else {
item._remove(fmt.Sprintf("failed to stat cache file: %v", statErr))
}
}
// Try to load the metadata
exists, err := item.load()
if !exists {
item._removeFile("metadata doesn't exist")
} else if err != nil {
item._remove(fmt.Sprintf("failed to load metadata: %v", err))
}
// Get size estimate (which is best we can do until Open() called)
if statErr == nil {
item.info.Size = fi.Size()
}
return item
}
// inUse returns true if the item is open or dirty
func (item *Item) inUse() bool {
item.mu.Lock()
defer item.mu.Unlock()
return item.opens != 0 || item.metaDirty || item.info.Dirty
}
// getATime returns the ATime of the item
func (item *Item) getATime() time.Time {
item.mu.Lock()
defer item.mu.Unlock()
return item.info.ATime
}
// getDiskSize returns the size on disk (approximately) of the item
//
// We return the sizes of the chunks we have fetched, however there is
// likely to be some overhead which we are not taking into account.
func (item *Item) getDiskSize() int64 {
item.mu.Lock()
defer item.mu.Unlock()
return item.info.Rs.Size()
}
// load reads an item from the disk or returns nil if not found
func (item *Item) load() (exists bool, err error) {
item.mu.Lock()
defer item.mu.Unlock()
osPathMeta := item.c.toOSPathMeta(item.name) // No locking in Cache
in, err := os.Open(osPathMeta)
if err != nil {
if os.IsNotExist(err) {
return false, err
}
return true, errors.Wrap(err, "vfs cache item: failed to read metadata")
}
defer fs.CheckClose(in, &err)
decoder := json.NewDecoder(in)
err = decoder.Decode(&item.info)
if err != nil {
return true, errors.Wrap(err, "vfs cache item: corrupt metadata")
}
item.metaDirty = false
return true, nil
}
// save writes an item to the disk
//
// call with the lock held
func (item *Item) _save() (err error) {
osPathMeta := item.c.toOSPathMeta(item.name) // No locking in Cache
out, err := os.Create(osPathMeta)
if err != nil {
return errors.Wrap(err, "vfs cache item: failed to write metadata")
}
defer fs.CheckClose(out, &err)
encoder := json.NewEncoder(out)
encoder.SetIndent("", "\t")
err = encoder.Encode(item.info)
if err != nil {
return errors.Wrap(err, "vfs cache item: failed to encode metadata")
}
item.metaDirty = false
return nil
}
// truncate the item to the given size, creating it if necessary
//
// this does not mark the object as dirty
//
// call with the lock held
func (item *Item) _truncate(size int64) (err error) {
if size < 0 {
// FIXME ignore unknown length files
return nil
}
// Use open handle if available
fd := item.fd
if fd == nil {
osPath := item.c.toOSPath(item.name) // No locking in Cache
fd, err = file.OpenFile(osPath, os.O_CREATE|os.O_WRONLY, 0600)
if err != nil {
return errors.Wrap(err, "vfs item truncate: failed to open cache file")
}
defer fs.CheckClose(fd, &err)
err = file.SetSparse(fd)
if err != nil {
fs.Debugf(item.name, "vfs item truncate: failed to set as a sparse file: %v", err)
}
}
fs.Debugf(item.name, "vfs cache: truncate to size=%d", size)
err = fd.Truncate(size)
if err != nil {
return errors.Wrap(err, "vfs truncate: failed to truncate")
}
item.info.Size = size
return nil
}
// Truncate the item to the current size, creating if necessary
//
// This does not mark the object as dirty
//
// call with the lock held
func (item *Item) _truncateToCurrentSize() (err error) {
size, err := item._getSize()
if err != nil && !os.IsNotExist(errors.Cause(err)) {
return errors.Wrap(err, "truncate to current size")
}
if size < 0 {
// FIXME ignore unknown length files
return nil
}
err = item._truncate(size)
if err != nil {
return err
}
return nil
}
// Truncate the item to the given size, creating it if necessary
//
// If the new size is shorter than the existing size then the object
// will be shortened and marked as dirty.
//
// If the new size is longer than the old size then the object will be
// extended and the extended data will be filled with zeros. The
// object will be marked as dirty in this case also.
func (item *Item) Truncate(size int64) (err error) {
item.mu.Lock()
defer item.mu.Unlock()
// Read old size
oldSize, err := item._getSize()
if err != nil {
if !os.IsNotExist(errors.Cause(err)) {
return errors.Wrap(err, "truncate failed to read size")
}
oldSize = 0
}
err = item._truncate(size)
if err != nil {
return err
}
changed := true
if size > oldSize {
// Truncate extends the file in which case all new bytes are
// read as zeros. In this case we must show we have written to
// the new parts of the file.
item._written(oldSize, size)
} else if size < oldSize {
// Truncate shrinks the file so clip the downloaded ranges
item.info.Rs = item.info.Rs.Intersection(ranges.Range{Pos: 0, Size: size})
} else {
changed = item.o == nil
}
if changed {
item._dirty()
}
return nil
}
// _getSize gets the current size of the item and updates item.info.Size
//
// Call with mutex held
func (item *Item) _getSize() (size int64, err error) {
var fi os.FileInfo
if item.fd != nil {
fi, err = item.fd.Stat()
} else {
osPath := item.c.toOSPath(item.name) // No locking in Cache
fi, err = os.Stat(osPath)
}
if err != nil {
if os.IsNotExist(err) && item.o != nil {
size = item.o.Size()
err = nil
}
} else {
size = fi.Size()
}
if err == nil {
item.info.Size = size
}
return size, err
}
// GetSize gets the current size of the item
func (item *Item) GetSize() (size int64, err error) {
item.mu.Lock()
defer item.mu.Unlock()
return item._getSize()
}
// _exists returns whether the backing file for the item exists or not
//
// call with mutex held
func (item *Item) _exists() bool {
osPath := item.c.toOSPath(item.name) // No locking in Cache
_, err := os.Stat(osPath)
return err == nil
}
// Exists returns whether the backing file for the item exists or not
func (item *Item) Exists() bool {
item.mu.Lock()
defer item.mu.Unlock()
return item._exists()
}
// _dirty marks the item as changed and needing writeback
//
// call with lock held
func (item *Item) _dirty() {
item.info.ModTime = time.Now()
item.info.ATime = item.info.ModTime
item.metaDirty = true
if !item.info.Dirty {
item.info.Dirty = true
err := item._save()
if err != nil {
fs.Errorf(item.name, "vfs cache: failed to save item info: %v", err)
}
}
}
// Dirty marks the item as changed and needing writeback
func (item *Item) Dirty() {
item.mu.Lock()
item._dirty()
item.mu.Unlock()
}
// Open the local file from the object passed in (which may be nil)
// which implies we are about to create the file
func (item *Item) Open(o fs.Object) (err error) {
defer log.Trace(o, "item=%p", item)("err=%v", &err)
item.mu.Lock()
defer item.mu.Unlock()
item.info.ATime = time.Now()
item.opens++
osPath, err := item.c.mkdir(item.name) // No locking in Cache
if err != nil {
return errors.Wrap(err, "vfs cache item: open mkdir failed")
}
err = item._checkObject(o)
if err != nil {
return errors.Wrap(err, "vfs cache item: check object failed")
}
if item.opens != 1 {
return nil
}
if item.fd != nil {
return errors.New("vfs cache item: internal error: didn't Close file")
}
fd, err := file.OpenFile(osPath, os.O_RDWR, 0600)
if err != nil {
return errors.Wrap(err, "vfs cache item: open failed")
}
err = file.SetSparse(fd)
if err != nil {
fs.Debugf(item.name, "vfs cache item: failed to set as a sparse file: %v", err)
}
item.fd = fd
err = item._save()
if err != nil {
return err
}
// Unlock the Item.mu so we can call some methods which take Cache.mu
item.mu.Unlock()
// Ensure this item is in the cache. It is possible a cache
// expiry has run and removed the item if it had no opens so
// we put it back here. If there was an item with opens
// already then return an error. This shouldn't happen because
// there should only be one vfs.File with a pointer to this
// item in at a time.
oldItem := item.c.put(item.name, item) // LOCKING in Cache method
if oldItem != nil {
oldItem.mu.Lock()
if oldItem.opens != 0 {
// Put the item back and return an error
item.c.put(item.name, oldItem) // LOCKING in Cache method
err = errors.Errorf("internal error: item %q already open in the cache", item.name)
}
oldItem.mu.Unlock()
}
// Relock the Item.mu for the return
item.mu.Lock()
return err
}
// Store stores the local cache file to the remote object, returning
// the new remote object. objOld is the old object if known.
//
// call with item lock held
func (item *Item) _store() (err error) {
defer log.Trace(item.name, "item=%p", item)("err=%v", &err)
ctx := context.Background()
// Ensure any segments not transferred are brought in
err = item._ensure(0, item.info.Size)
if err != nil {
return errors.Wrap(err, "vfs cache: failed to download missing parts of cache file")
}
// Transfer the temp file to the remote
cacheObj, err := item.c.fcache.NewObject(ctx, item.name)
if err != nil {
return errors.Wrap(err, "vfs cache: failed to find cache file")
}
o, err := operations.Copy(ctx, item.c.fremote, item.o, item.name, cacheObj)
if err != nil {
return errors.Wrap(err, "vfs cache: failed to transfer file from cache to remote")
}
item.o = o
item._updateFingerprint()
return nil
}
// Close the cache file
func (item *Item) Close(storeFn StoreFn) (err error) {
defer log.Trace(item.o, "Item.Close")("err=%v", &err)
var (
downloader *downloader
o fs.Object
)
// close downloader and set item with mutex unlocked
defer func() {
if downloader != nil {
closeErr := downloader.close(nil)
if closeErr != nil && err == nil {
err = closeErr
}
}
if err == nil && storeFn != nil && o != nil {
// Write the object back to the VFS layer
storeFn(item.o)
}
}()
item.mu.Lock()
defer item.mu.Unlock()
item.info.ATime = time.Now()
item.opens--
if item.opens < 0 {
return os.ErrClosed
} else if item.opens > 0 {
return nil
}
// Update the size on close
_, _ = item._getSize()
err = item._save()
if err != nil {
return errors.Wrap(err, "close failed to save item")
}
// close the downloader
downloader = item.downloader
item.downloader = nil
// close the file handle
if item.fd == nil {
return errors.New("vfs cache item: internal error: didn't Open file")
}
err = item.fd.Close()
item.fd = nil
// if the item hasn't been changed but has been completed then
// set the modtime from the object otherwise set it from the info
if item._exists() {
if !item.info.Dirty && item.o != nil {
item._setModTime(item.o.ModTime(context.Background()))
} else {
item._setModTime(item.info.ModTime)
}
}
// upload the file to backing store if changed
if item.info.Dirty {
fs.Debugf(item.name, "item changed - writeback")
err = item._store()
if err != nil {
fs.Errorf(item.name, "%v", err)
return err
}
fs.Debugf(item.o, "transferred to remote")
item.info.Dirty = false
o = item.o
}
return err
}
// check the fingerprint of an object and update the item or delete
// the cached file accordingly
//
// It ensures the file is the correct size for the object
//
// call with lock held
func (item *Item) _checkObject(o fs.Object) error {
if o == nil {
if item.info.Fingerprint != "" {
// no remote object && local object
// remove local object
item._remove("stale (remote deleted)")
} else {
// no remote object && no local object
// OK
}
} else {
remoteFingerprint := fs.Fingerprint(context.TODO(), o, false)
fs.Debugf(item.name, "vfs cache: checking remote fingerprint %q against cached fingerprint %q", remoteFingerprint, item.info.Fingerprint)
if item.info.Fingerprint != "" {
// remote object && local object
if remoteFingerprint != item.info.Fingerprint {
fs.Debugf(item.name, "vfs cache: removing cached entry as stale (remote fingerprint %q != cached fingerprint %q)", remoteFingerprint, item.info.Fingerprint)
item._remove("stale (remote is different)")
}
} else {
// remote object && no local object
// Set fingerprint
item.info.Fingerprint = remoteFingerprint
item.metaDirty = true
}
item.info.Size = o.Size()
}
item.o = o
err := item._truncateToCurrentSize()
if err != nil {
return errors.Wrap(err, "vfs cache item: open truncate failed")
}
return nil
}
// check the fingerprint of an object and update the item or delete
// the cached file accordingly.
//
// It ensures the file is the correct size for the object
func (item *Item) checkObject(o fs.Object) error {
item.mu.Lock()
defer item.mu.Unlock()
return item._checkObject(o)
}
// remove the cached file
//
// call with lock held
func (item *Item) _removeFile(reason string) {
osPath := item.c.toOSPath(item.name) // No locking in Cache
err := os.Remove(osPath)
if err != nil {
if !os.IsNotExist(err) {
fs.Errorf(item.name, "Failed to remove cache file as %s: %v", reason, err)
}
} else {
fs.Infof(item.name, "Removed cache file as %s", reason)
}
}
// remove the metadata
//
// call with lock held
func (item *Item) _removeMeta(reason string) {
osPathMeta := item.c.toOSPathMeta(item.name) // No locking in Cache
err := os.Remove(osPathMeta)
if err != nil {
if !os.IsNotExist(err) {
fs.Errorf(item.name, "Failed to remove metadata from cache as %s: %v", reason, err)
}
} else {
fs.Infof(item.name, "Removed metadata from cache as %s", reason)
}
}
// remove the cached file and empty the metadata
//
// call with lock held
func (item *Item) _remove(reason string) {
item.info.clean()
item.metaDirty = false
item._removeFile(reason)
item._removeMeta(reason)
}
// remove the cached file and empty the metadata
func (item *Item) remove(reason string) {
item.mu.Lock()
item._remove(reason)
item.mu.Unlock()
}
// create a downloader for the item
//
// call with item mutex held
func (item *Item) _newDownloader() (err error) {
// If no cached object then can't download
if item.o == nil {
return errors.New("vfs cache: internal error: tried to download nil object")
}
// If downloading the object already stop the downloader and restart it
if item.downloader != nil {
item.mu.Unlock()
_ = item.downloader.close(nil)
item.mu.Lock()
item.downloader = nil
}
item.downloader, err = newDownloader(item, item.c.fremote, item.name, item.o)
return err
}
// _present returns true if the whole file has been downloaded
//
// call with the lock held
func (item *Item) _present() bool {
if item.downloader != nil && item.downloader.running() {
return false
}
return item.info.Rs.Present(ranges.Range{Pos: 0, Size: item.info.Size})
}
// ensure the range from offset, size is present in the backing file
//
// call with the item lock held
func (item *Item) _ensure(offset, size int64) (err error) {
defer log.Trace(item.name, "offset=%d, size=%d", offset, size)("err=%v", &err)
if offset+size > item.info.Size {
size = item.info.Size - offset
}
r := ranges.Range{Pos: offset, Size: size}
present := item.info.Rs.Present(r)
downloader := item.downloader
fs.Debugf(nil, "looking for range=%+v in %+v - present %v", r, item.info.Rs, present)
if present {
return nil
}
// FIXME pass in offset here to decide to seek?
err = item._newDownloader()
if err != nil {
return errors.Wrap(err, "Ensure: failed to start downloader")
}
downloader = item.downloader
if downloader == nil {
return errors.New("internal error: downloader is nil")
}
if !downloader.running() {
// FIXME need to make sure we start in the correct place because some of offset,size might exist
// FIXME this could stop an old download
item.mu.Unlock()
err = downloader.start(offset)
item.mu.Lock()
if err != nil {
return errors.Wrap(err, "Ensure: failed to run downloader")
}
}
item.mu.Unlock()
defer item.mu.Lock()
return item.downloader.ensure(r)
}
// _written marks the (offset, size) as present in the backing file
//
// This is called by the downloader downloading file segments and the
// vfs layer writing to the file.
//
// call with lock held
func (item *Item) _written(offset, size int64) {
defer log.Trace(item.name, "offset=%d, size=%d", offset, size)("")
item.info.Rs.Insert(ranges.Range{Pos: offset, Size: offset + size})
item.metaDirty = true
}
// update the fingerprint of the object if any
//
// call with lock held
func (item *Item) _updateFingerprint() {
if item.o == nil {
return
}
oldFingerprint := item.info.Fingerprint
item.info.Fingerprint = fs.Fingerprint(context.TODO(), item.o, false)
if oldFingerprint != item.info.Fingerprint {
fs.Debugf(item.o, "fingerprint now %q", item.info.Fingerprint)
item.metaDirty = true
}
}
// setModTime of the cache file
//
// call with lock held
func (item *Item) _setModTime(modTime time.Time) {
fs.Debugf(item.name, "vfs cache: setting modification time to %v", modTime)
osPath := item.c.toOSPath(item.name) // No locking in Cache
err := os.Chtimes(osPath, modTime, modTime)
if err != nil {
fs.Errorf(item.name, "Failed to set modification time of cached file: %v", err)
}
}
// setModTime of the cache file and in the Item
func (item *Item) setModTime(modTime time.Time) {
defer log.Trace(item.name, "modTime=%v", modTime)("")
item.mu.Lock()
item._updateFingerprint()
item._setModTime(modTime)
item.mu.Unlock()
}
// ReadAt bytes from the file at off
func (item *Item) ReadAt(b []byte, off int64) (n int, err error) {
item.mu.Lock()
if item.fd == nil {
item.mu.Unlock()
return 0, errors.New("vfs cache item ReadAt: internal error: didn't Open file")
}
err = item._ensure(off, int64(len(b)))
if err != nil {
item.mu.Unlock()
return n, err
}
item.info.ATime = time.Now()
item.mu.Unlock()
// Do the reading with Item.mu unlocked
return item.fd.ReadAt(b, off)
}
// WriteAt bytes to the file at off
func (item *Item) WriteAt(b []byte, off int64) (n int, err error) {
item.mu.Lock()
if item.fd == nil {
item.mu.Unlock()
return 0, errors.New("vfs cache item WriteAt: internal error: didn't Open file")
}
item.mu.Unlock()
// Do the writing with Item.mu unlocked
n, err = item.fd.WriteAt(b, off)
item.mu.Lock()
item._written(off, int64(n))
if n > 0 {
item._dirty()
}
item.mu.Unlock()
return n, err
}
// Sync commits the current contents of the file to stable storage. Typically,
// this means flushing the file system's in-memory copy of recently written
// data to disk.
func (item *Item) Sync() (err error) {
item.mu.Lock()
defer item.mu.Unlock()
if item.fd == nil {
return errors.New("vfs cache item sync: internal error: didn't Open file")
}
// sync the file and the metadata to disk
err = item.fd.Sync()
if err != nil {
return errors.Wrap(err, "vfs cache item sync: failed to sync file")
}
err = item._save()
if err != nil {
return errors.Wrap(err, "vfs cache item sync: failed to sync metadata")
}
return nil
}
// rename the item
func (item *Item) rename(name string, newName string, newObj fs.Object) (err error) {
var downloader *downloader
// close downloader with mutex unlocked
defer func() {
if downloader != nil {
_ = downloader.close(nil)
}
}()
item.mu.Lock()
defer item.mu.Unlock()
// stop downloader
downloader = item.downloader
item.downloader = nil
// Set internal state
item.name = newName
item.o = newObj
// Rename cache file if it exists
err = rename(item.c.toOSPath(name), item.c.toOSPath(newName)) // No locking in Cache
if err != nil {
return err
}
// Rename meta file if it exists
err = rename(item.c.toOSPathMeta(name), item.c.toOSPathMeta(newName)) // No locking in Cache
if err != nil {
return err
}
return nil
}

View file

@ -1,600 +0,0 @@
// Package vfscache deals with caching of files locally for the VFS layer
package vfscache
import (
"context"
"os"
"path"
"path/filepath"
"runtime"
"sort"
"strings"
"sync"
"time"
"github.com/djherbis/times"
"github.com/pkg/errors"
"github.com/rclone/rclone/fs"
fscache "github.com/rclone/rclone/fs/cache"
"github.com/rclone/rclone/fs/config"
"github.com/rclone/rclone/fs/operations"
"github.com/rclone/rclone/vfs/vfscommon"
)
// Cache opened files
type Cache struct {
fremote fs.Fs // fs for the remote we are caching
fcache fs.Fs // fs for the cache directory
opt *vfscommon.Options // vfs Options
root string // root of the cache directory
itemMu sync.Mutex // protects the following variables
item map[string]*cacheItem // files/directories in the cache
used int64 // total size of files in the cache
}
// cacheItem is stored in the item map
type cacheItem struct {
opens int // number of times file is open
atime time.Time // last time file was accessed
isFile bool // if this is a file or a directory
size int64 // size of the cached item
}
// newCacheItem returns an item for the cache
func newCacheItem(isFile bool) *cacheItem {
return &cacheItem{atime: time.Now(), isFile: isFile}
}
// New creates a new cache heirachy for fremote
//
// This starts background goroutines which can be cancelled with the
// context passed in.
func New(ctx context.Context, fremote fs.Fs, opt *vfscommon.Options) (*Cache, error) {
fRoot := filepath.FromSlash(fremote.Root())
if runtime.GOOS == "windows" {
if strings.HasPrefix(fRoot, `\\?`) {
fRoot = fRoot[3:]
}
fRoot = strings.Replace(fRoot, ":", "", -1)
}
root := filepath.Join(config.CacheDir, "vfs", fremote.Name(), fRoot)
fs.Debugf(nil, "vfs cache root is %q", root)
fcache, err := fscache.Get(root)
if err != nil {
return nil, errors.Wrap(err, "failed to create cache remote")
}
c := &Cache{
fremote: fremote,
fcache: fcache,
opt: opt,
root: root,
item: make(map[string]*cacheItem),
}
go c.cleaner(ctx)
return c, nil
}
// clean returns the cleaned version of name for use in the index map
//
// name should be a remote path not an osPath
func clean(name string) string {
name = strings.Trim(name, "/")
name = path.Clean(name)
if name == "." || name == "/" {
name = ""
}
return name
}
// ToOSPath turns a remote relative name into an OS path in the cache
func (c *Cache) ToOSPath(name string) string {
return filepath.Join(c.root, filepath.FromSlash(name))
}
// Mkdir makes the directory for name in the cache and returns an os
// path for the file
//
// name should be a remote path not an osPath
func (c *Cache) Mkdir(name string) (string, error) {
parent := vfscommon.FindParent(name)
leaf := path.Base(name)
parentPath := c.ToOSPath(parent)
err := os.MkdirAll(parentPath, 0700)
if err != nil {
return "", errors.Wrap(err, "make cache directory failed")
}
c.cacheDir(parent)
return filepath.Join(parentPath, leaf), nil
}
// _get gets name from the cache or creates a new one
//
// It returns the item and found as to whether this item was found in
// the cache (or just created).
//
// name should be a remote path not an osPath
//
// must be called with itemMu held
func (c *Cache) _get(isFile bool, name string) (item *cacheItem, found bool) {
item = c.item[name]
found = item != nil
if !found {
item = newCacheItem(isFile)
c.item[name] = item
}
return item, found
}
// Opens returns the number of opens that are on the file
//
// name should be a remote path not an osPath
func (c *Cache) Opens(name string) int {
name = clean(name)
c.itemMu.Lock()
defer c.itemMu.Unlock()
item := c.item[name]
if item == nil {
return 0
}
return item.opens
}
// get gets name from the cache or creates a new one
//
// name should be a remote path not an osPath
func (c *Cache) get(name string) *cacheItem {
name = clean(name)
c.itemMu.Lock()
item, _ := c._get(true, name)
c.itemMu.Unlock()
return item
}
// updateStat sets the atime of the name to that passed in if it is
// newer than the existing or there isn't an existing time.
//
// it also sets the size
//
// name should be a remote path not an osPath
func (c *Cache) updateStat(name string, when time.Time, size int64) {
name = clean(name)
c.itemMu.Lock()
item, found := c._get(true, name)
if !found || when.Sub(item.atime) > 0 {
fs.Debugf(name, "updateTime: setting atime to %v", when)
item.atime = when
}
item.size = size
c.itemMu.Unlock()
}
// _open marks name as open, must be called with the lock held
//
// name should be a remote path not an osPath
func (c *Cache) _open(isFile bool, name string) {
for {
item, _ := c._get(isFile, name)
item.opens++
item.atime = time.Now()
if name == "" {
break
}
isFile = false
name = vfscommon.FindParent(name)
}
}
// Open marks name as open
//
// name should be a remote path not an osPath
func (c *Cache) Open(name string) {
name = clean(name)
c.itemMu.Lock()
c._open(true, name)
c.itemMu.Unlock()
}
// cacheDir marks a directory and its parents as being in the cache
//
// name should be a remote path not an osPath
func (c *Cache) cacheDir(name string) {
name = clean(name)
c.itemMu.Lock()
defer c.itemMu.Unlock()
for {
item := c.item[name]
if item != nil {
break
}
c.item[name] = newCacheItem(false)
if name == "" {
break
}
name = vfscommon.FindParent(name)
}
}
// Exists checks to see if the file exists in the cache or not
func (c *Cache) Exists(name string) bool {
osPath := c.ToOSPath(name)
fi, err := os.Stat(osPath)
if err != nil {
return false
}
// checks for non-regular files (e.g. directories, symlinks, devices, etc.)
if !fi.Mode().IsRegular() {
return false
}
return true
}
// Rename the file in cache
func (c *Cache) Rename(name string, newName string) (err error) {
osOldPath := c.ToOSPath(name)
osNewPath := c.ToOSPath(newName)
sfi, err := os.Stat(osOldPath)
if err != nil {
return errors.Wrapf(err, "Failed to stat source: %s", osOldPath)
}
if !sfi.Mode().IsRegular() {
// cannot copy non-regular files (e.g., directories, symlinks, devices, etc.)
return errors.Errorf("Non-regular source file: %s (%q)", sfi.Name(), sfi.Mode().String())
}
dfi, err := os.Stat(osNewPath)
if err != nil {
if !os.IsNotExist(err) {
return errors.Wrapf(err, "Failed to stat destination: %s", osNewPath)
}
parent := vfscommon.OsFindParent(osNewPath)
err = os.MkdirAll(parent, 0700)
if err != nil {
return errors.Wrapf(err, "Failed to create parent dir: %s", parent)
}
} else {
if !(dfi.Mode().IsRegular()) {
return errors.Errorf("Non-regular destination file: %s (%q)", dfi.Name(), dfi.Mode().String())
}
if os.SameFile(sfi, dfi) {
return nil
}
}
if err = os.Rename(osOldPath, osNewPath); err != nil {
return errors.Wrapf(err, "Failed to rename in cache: %s to %s", osOldPath, osNewPath)
}
// Rename the cache item
c.itemMu.Lock()
if oldItem, ok := c.item[name]; ok {
c.item[newName] = oldItem
delete(c.item, name)
}
c.itemMu.Unlock()
fs.Infof(name, "Renamed in cache")
return nil
}
// _close marks name as closed - must be called with the lock held
func (c *Cache) _close(isFile bool, name string) {
for {
item, _ := c._get(isFile, name)
item.opens--
item.atime = time.Now()
if item.opens < 0 {
fs.Errorf(name, "cache: double close")
}
osPath := c.ToOSPath(name)
fi, err := os.Stat(osPath)
// Update the size on close
if err == nil && !fi.IsDir() {
item.size = fi.Size()
}
if name == "" {
break
}
isFile = false
name = vfscommon.FindParent(name)
}
}
// Close marks name as closed
//
// name should be a remote path not an osPath
func (c *Cache) Close(name string) {
name = clean(name)
c.itemMu.Lock()
c._close(true, name)
c.itemMu.Unlock()
}
// Remove should be called if name is deleted
func (c *Cache) Remove(name string) {
osPath := c.ToOSPath(name)
err := os.Remove(osPath)
if err != nil && !os.IsNotExist(err) {
fs.Errorf(name, "Failed to remove from cache: %v", err)
} else {
fs.Infof(name, "Removed from cache")
}
}
// removeDir should be called if dir is deleted and returns true if
// the directory is gone.
func (c *Cache) removeDir(dir string) bool {
osPath := c.ToOSPath(dir)
err := os.Remove(osPath)
if err == nil || os.IsNotExist(err) {
if err == nil {
fs.Debugf(dir, "Removed empty directory")
}
return true
}
if !os.IsExist(err) {
fs.Errorf(dir, "Failed to remove cached dir: %v", err)
}
return false
}
// SetModTime should be called to set the modification time of the cache file
func (c *Cache) SetModTime(name string, modTime time.Time) {
osPath := c.ToOSPath(name)
err := os.Chtimes(osPath, modTime, modTime)
if err != nil {
fs.Errorf(name, "Failed to set modification time of cached file: %v", err)
}
}
// CleanUp empties the cache of everything
func (c *Cache) CleanUp() error {
return os.RemoveAll(c.root)
}
// walk walks the cache calling the function
func (c *Cache) walk(fn func(osPath string, fi os.FileInfo, name string) error) error {
return filepath.Walk(c.root, func(osPath string, fi os.FileInfo, err error) error {
if err != nil {
return err
}
// Find path relative to the cache root
name, err := filepath.Rel(c.root, osPath)
if err != nil {
return errors.Wrap(err, "filepath.Rel failed in walk")
}
if name == "." {
name = ""
}
// And convert into slashes
name = filepath.ToSlash(name)
return fn(osPath, fi, name)
})
}
// updateStats walks the cache updating any atimes and sizes it finds
//
// it also updates used
func (c *Cache) updateStats() error {
var newUsed int64
err := c.walk(func(osPath string, fi os.FileInfo, name string) error {
if !fi.IsDir() {
// Update the atime with that of the file
atime := times.Get(fi).AccessTime()
c.updateStat(name, atime, fi.Size())
newUsed += fi.Size()
} else {
c.cacheDir(name)
}
return nil
})
c.itemMu.Lock()
c.used = newUsed
c.itemMu.Unlock()
return err
}
// purgeOld gets rid of any files that are over age
func (c *Cache) purgeOld(maxAge time.Duration) {
c._purgeOld(maxAge, c.Remove)
}
func (c *Cache) _purgeOld(maxAge time.Duration, remove func(name string)) {
c.itemMu.Lock()
defer c.itemMu.Unlock()
cutoff := time.Now().Add(-maxAge)
for name, item := range c.item {
if item.isFile && item.opens == 0 {
// If not locked and access time too long ago - delete the file
dt := item.atime.Sub(cutoff)
// fs.Debugf(name, "atime=%v cutoff=%v, dt=%v", item.atime, cutoff, dt)
if dt < 0 {
remove(name)
// Remove the entry
delete(c.item, name)
}
}
}
}
// Purge any empty directories
func (c *Cache) purgeEmptyDirs() {
c._purgeEmptyDirs(c.removeDir)
}
func (c *Cache) _purgeEmptyDirs(removeDir func(name string) bool) {
c.itemMu.Lock()
defer c.itemMu.Unlock()
var dirs []string
for name, item := range c.item {
if !item.isFile && item.opens == 0 {
dirs = append(dirs, name)
}
}
// remove empty directories in reverse alphabetical order
sort.Strings(dirs)
for i := len(dirs) - 1; i >= 0; i-- {
dir := dirs[i]
// Remove the entry
if removeDir(dir) {
delete(c.item, dir)
}
}
}
// This is a cacheItem with a name for sorting
type cacheNamedItem struct {
name string
item *cacheItem
}
type cacheNamedItems []cacheNamedItem
func (v cacheNamedItems) Len() int { return len(v) }
func (v cacheNamedItems) Swap(i, j int) { v[i], v[j] = v[j], v[i] }
func (v cacheNamedItems) Less(i, j int) bool { return v[i].item.atime.Before(v[j].item.atime) }
// Remove any files that are over quota starting from the
// oldest first
func (c *Cache) purgeOverQuota(quota int64) {
c._purgeOverQuota(quota, c.Remove)
}
func (c *Cache) _purgeOverQuota(quota int64, remove func(name string)) {
c.itemMu.Lock()
defer c.itemMu.Unlock()
if quota <= 0 || c.used < quota {
return
}
var items cacheNamedItems
// Make a slice of unused files
for name, item := range c.item {
if item.isFile && item.opens == 0 {
items = append(items, cacheNamedItem{
name: name,
item: item,
})
}
}
sort.Sort(items)
// Remove items until the quota is OK
for _, item := range items {
if c.used < quota {
break
}
remove(item.name)
// Remove the entry
delete(c.item, item.name)
c.used -= item.item.size
}
}
// clean empties the cache of stuff if it can
func (c *Cache) clean() {
// Cache may be empty so end
_, err := os.Stat(c.root)
if os.IsNotExist(err) {
return
}
c.itemMu.Lock()
oldItems, oldUsed := len(c.item), fs.SizeSuffix(c.used)
c.itemMu.Unlock()
// first walk the FS to update the atimes and sizes
err = c.updateStats()
if err != nil {
fs.Errorf(nil, "Error traversing cache %q: %v", c.root, err)
}
// Remove any files that are over age
c.purgeOld(c.opt.CacheMaxAge)
// Now remove any files that are over quota starting from the
// oldest first
c.purgeOverQuota(int64(c.opt.CacheMaxSize))
// Remove any empty directories
c.purgeEmptyDirs()
// Stats
c.itemMu.Lock()
newItems, newUsed := len(c.item), fs.SizeSuffix(c.used)
c.itemMu.Unlock()
fs.Infof(nil, "Cleaned the cache: objects %d (was %d), total size %v (was %v)", newItems, oldItems, newUsed, oldUsed)
}
// cleaner calls clean at regular intervals
//
// doesn't return until context is cancelled
func (c *Cache) cleaner(ctx context.Context) {
if c.opt.CachePollInterval <= 0 {
fs.Debugf(nil, "Cache cleaning thread disabled because poll interval <= 0")
return
}
// Start cleaning the cache immediately
c.clean()
// Then every interval specified
timer := time.NewTicker(c.opt.CachePollInterval)
defer timer.Stop()
for {
select {
case <-timer.C:
c.clean()
case <-ctx.Done():
fs.Debugf(nil, "cache cleaner exiting")
return
}
}
}
// copy an object to or from the remote while accounting for it
func copyObj(f fs.Fs, dst fs.Object, remote string, src fs.Object) (newDst fs.Object, err error) {
if operations.NeedTransfer(context.TODO(), dst, src) {
newDst, err = operations.Copy(context.TODO(), f, dst, remote, src)
} else {
newDst = dst
}
return newDst, err
}
// Check the local file is up to date in the cache
func (c *Cache) Check(ctx context.Context, o fs.Object, remote string) error {
cacheObj, err := c.fcache.NewObject(ctx, remote)
if err == nil && cacheObj != nil {
_, err = copyObj(c.fcache, cacheObj, remote, o)
if err != nil {
return errors.Wrap(err, "failed to update cached file")
}
}
return nil
}
// Fetch fetches the object to the cache file
func (c *Cache) Fetch(ctx context.Context, o fs.Object, remote string) error {
_, err := copyObj(c.fcache, nil, remote, o)
return err
}
// Store stores the local cache file to the remote object, returning
// the new remote object. objOld is the old object if known.
func (c *Cache) Store(ctx context.Context, objOld fs.Object, remote string) (fs.Object, error) {
// Transfer the temp file to the remote
cacheObj, err := c.fcache.NewObject(ctx, remote)
if err != nil {
return nil, errors.Wrap(err, "failed to find cache file")
}
if objOld != nil {
remote = objOld.Remote() // use the path of the actual object if available
}
o, err := copyObj(c.fremote, objOld, remote, cacheObj)
if err != nil {
return nil, errors.Wrap(err, "failed to transfer file from cache to remote")
}
return o, nil
}

View file

@ -1,602 +0,0 @@
package vfscache
import (
"context"
"fmt"
"io/ioutil"
"os"
"path/filepath"
"sort"
"testing"
"time"
"github.com/djherbis/times"
_ "github.com/rclone/rclone/backend/local" // import the local backend
"github.com/rclone/rclone/fstest"
"github.com/rclone/rclone/vfs/vfscommon"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
// TestMain drives the tests
func TestMain(m *testing.M) {
fstest.TestMain(m)
}
// convert c.item to a string
func itemAsString(c *Cache) []string {
c.itemMu.Lock()
defer c.itemMu.Unlock()
var out []string
for name, item := range c.item {
out = append(out, fmt.Sprintf("name=%q isFile=%v opens=%d size=%d", filepath.ToSlash(name), item.isFile, item.opens, item.size))
}
sort.Strings(out)
return out
}
func TestCacheNew(t *testing.T) {
r := fstest.NewRun(t)
defer r.Finalise()
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
// Disable the cache cleaner as it interferes with these tests
opt := vfscommon.DefaultOpt
opt.CachePollInterval = 0
c, err := New(ctx, r.Fremote, &opt)
require.NoError(t, err)
assert.Contains(t, c.root, "vfs")
assert.Contains(t, c.fcache.Root(), filepath.Base(r.Fremote.Root()))
assert.Equal(t, []string(nil), itemAsString(c))
// mkdir
p, err := c.Mkdir("potato")
require.NoError(t, err)
assert.Equal(t, "potato", filepath.Base(p))
assert.Equal(t, []string{
`name="" isFile=false opens=0 size=0`,
}, itemAsString(c))
fi, err := os.Stat(filepath.Dir(p))
require.NoError(t, err)
assert.True(t, fi.IsDir())
// get
item := c.get("potato")
item2 := c.get("potato")
assert.Equal(t, item, item2)
assert.WithinDuration(t, time.Now(), item.atime, time.Second)
// updateTime
//.. before
t1 := time.Now().Add(-60 * time.Minute)
c.updateStat("potato", t1, 0)
item = c.get("potato")
assert.NotEqual(t, t1, item.atime)
assert.Equal(t, 0, item.opens)
//..after
t2 := time.Now().Add(60 * time.Minute)
c.updateStat("potato", t2, 0)
item = c.get("potato")
assert.Equal(t, t2, item.atime)
assert.Equal(t, 0, item.opens)
// open
assert.Equal(t, []string{
`name="" isFile=false opens=0 size=0`,
`name="potato" isFile=true opens=0 size=0`,
}, itemAsString(c))
c.Open("/potato")
assert.Equal(t, []string{
`name="" isFile=false opens=1 size=0`,
`name="potato" isFile=true opens=1 size=0`,
}, itemAsString(c))
item = c.get("potato")
assert.WithinDuration(t, time.Now(), item.atime, time.Second)
assert.Equal(t, 1, item.opens)
// write the file
err = ioutil.WriteFile(p, []byte("hello"), 0600)
require.NoError(t, err)
// read its atime
fi, err = os.Stat(p)
assert.NoError(t, err)
atime := times.Get(fi).AccessTime()
// updateAtimes
item = c.get("potato")
item.atime = time.Now().Add(-24 * time.Hour)
err = c.updateStats()
require.NoError(t, err)
assert.Equal(t, []string{
`name="" isFile=false opens=1 size=0`,
`name="potato" isFile=true opens=1 size=5`,
}, itemAsString(c))
item = c.get("potato")
assert.Equal(t, atime, item.atime)
// updateAtimes - not in the cache
oldItem := item
c.itemMu.Lock()
delete(c.item, "potato") // remove from cache
c.itemMu.Unlock()
err = c.updateStats()
require.NoError(t, err)
assert.Equal(t, []string{
`name="" isFile=false opens=1 size=0`,
`name="potato" isFile=true opens=0 size=5`,
}, itemAsString(c))
item = c.get("potato")
assert.Equal(t, atime, item.atime)
c.itemMu.Lock()
c.item["potato"] = oldItem // restore to cache
c.itemMu.Unlock()
// try purging with file open
c.purgeOld(10 * time.Second)
_, err = os.Stat(p)
assert.NoError(t, err)
// close
assert.Equal(t, []string{
`name="" isFile=false opens=1 size=0`,
`name="potato" isFile=true opens=1 size=5`,
}, itemAsString(c))
c.updateStat("potato", t2, 6)
assert.Equal(t, []string{
`name="" isFile=false opens=1 size=0`,
`name="potato" isFile=true opens=1 size=6`,
}, itemAsString(c))
c.Close("potato/")
assert.Equal(t, []string{
`name="" isFile=false opens=0 size=0`,
`name="potato" isFile=true opens=0 size=5`,
}, itemAsString(c))
item = c.get("potato")
assert.WithinDuration(t, time.Now(), item.atime, time.Second)
assert.Equal(t, 0, item.opens)
// try purging with file closed
c.purgeOld(10 * time.Second)
// ...nothing should happen
_, err = os.Stat(p)
assert.NoError(t, err)
//.. purge again with -ve age
c.purgeOld(-10 * time.Second)
_, err = os.Stat(p)
assert.True(t, os.IsNotExist(err))
// clean - have tested the internals already
c.clean()
// cleanup
err = c.CleanUp()
require.NoError(t, err)
_, err = os.Stat(c.root)
assert.True(t, os.IsNotExist(err))
}
func TestCacheOpens(t *testing.T) {
r := fstest.NewRun(t)
defer r.Finalise()
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
c, err := New(ctx, r.Fremote, &vfscommon.DefaultOpt)
require.NoError(t, err)
assert.Equal(t, []string(nil), itemAsString(c))
c.Open("potato")
assert.Equal(t, []string{
`name="" isFile=false opens=1 size=0`,
`name="potato" isFile=true opens=1 size=0`,
}, itemAsString(c))
c.Open("potato")
assert.Equal(t, []string{
`name="" isFile=false opens=2 size=0`,
`name="potato" isFile=true opens=2 size=0`,
}, itemAsString(c))
c.Close("potato")
assert.Equal(t, []string{
`name="" isFile=false opens=1 size=0`,
`name="potato" isFile=true opens=1 size=0`,
}, itemAsString(c))
c.Close("potato")
assert.Equal(t, []string{
`name="" isFile=false opens=0 size=0`,
`name="potato" isFile=true opens=0 size=0`,
}, itemAsString(c))
c.Open("potato")
c.Open("a//b/c/d/one")
c.Open("a/b/c/d/e/two")
c.Open("a/b/c/d/e/f/three")
assert.Equal(t, []string{
`name="" isFile=false opens=4 size=0`,
`name="a" isFile=false opens=3 size=0`,
`name="a/b" isFile=false opens=3 size=0`,
`name="a/b/c" isFile=false opens=3 size=0`,
`name="a/b/c/d" isFile=false opens=3 size=0`,
`name="a/b/c/d/e" isFile=false opens=2 size=0`,
`name="a/b/c/d/e/f" isFile=false opens=1 size=0`,
`name="a/b/c/d/e/f/three" isFile=true opens=1 size=0`,
`name="a/b/c/d/e/two" isFile=true opens=1 size=0`,
`name="a/b/c/d/one" isFile=true opens=1 size=0`,
`name="potato" isFile=true opens=1 size=0`,
}, itemAsString(c))
c.Close("potato")
c.Close("a/b/c/d/one")
c.Close("a/b/c/d/e/two")
c.Close("a/b/c//d/e/f/three")
assert.Equal(t, []string{
`name="" isFile=false opens=0 size=0`,
`name="a" isFile=false opens=0 size=0`,
`name="a/b" isFile=false opens=0 size=0`,
`name="a/b/c" isFile=false opens=0 size=0`,
`name="a/b/c/d" isFile=false opens=0 size=0`,
`name="a/b/c/d/e" isFile=false opens=0 size=0`,
`name="a/b/c/d/e/f" isFile=false opens=0 size=0`,
`name="a/b/c/d/e/f/three" isFile=true opens=0 size=0`,
`name="a/b/c/d/e/two" isFile=true opens=0 size=0`,
`name="a/b/c/d/one" isFile=true opens=0 size=0`,
`name="potato" isFile=true opens=0 size=0`,
}, itemAsString(c))
}
// test the open, mkdir, purge, close, purge sequence
func TestCacheOpenMkdir(t *testing.T) {
r := fstest.NewRun(t)
defer r.Finalise()
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
// Disable the cache cleaner as it interferes with these tests
opt := vfscommon.DefaultOpt
opt.CachePollInterval = 0
c, err := New(ctx, r.Fremote, &opt)
require.NoError(t, err)
// open
c.Open("sub/potato")
assert.Equal(t, []string{
`name="" isFile=false opens=1 size=0`,
`name="sub" isFile=false opens=1 size=0`,
`name="sub/potato" isFile=true opens=1 size=0`,
}, itemAsString(c))
// mkdir
p, err := c.Mkdir("sub/potato")
require.NoError(t, err)
assert.Equal(t, "potato", filepath.Base(p))
assert.Equal(t, []string{
`name="" isFile=false opens=1 size=0`,
`name="sub" isFile=false opens=1 size=0`,
`name="sub/potato" isFile=true opens=1 size=0`,
}, itemAsString(c))
// test directory exists
fi, err := os.Stat(filepath.Dir(p))
require.NoError(t, err)
assert.True(t, fi.IsDir())
// clean the cache
c.purgeOld(-10 * time.Second)
// test directory still exists
fi, err = os.Stat(filepath.Dir(p))
require.NoError(t, err)
assert.True(t, fi.IsDir())
// close
c.Close("sub/potato")
assert.Equal(t, []string{
`name="" isFile=false opens=0 size=0`,
`name="sub" isFile=false opens=0 size=0`,
`name="sub/potato" isFile=true opens=0 size=0`,
}, itemAsString(c))
// clean the cache
c.purgeOld(-10 * time.Second)
c.purgeEmptyDirs()
assert.Equal(t, []string(nil), itemAsString(c))
// test directory does not exist
_, err = os.Stat(filepath.Dir(p))
require.True(t, os.IsNotExist(err))
}
func TestCacheCacheDir(t *testing.T) {
r := fstest.NewRun(t)
defer r.Finalise()
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
c, err := New(ctx, r.Fremote, &vfscommon.DefaultOpt)
require.NoError(t, err)
assert.Equal(t, []string(nil), itemAsString(c))
c.cacheDir("dir")
assert.Equal(t, []string{
`name="" isFile=false opens=0 size=0`,
`name="dir" isFile=false opens=0 size=0`,
}, itemAsString(c))
c.cacheDir("dir/sub")
assert.Equal(t, []string{
`name="" isFile=false opens=0 size=0`,
`name="dir" isFile=false opens=0 size=0`,
`name="dir/sub" isFile=false opens=0 size=0`,
}, itemAsString(c))
c.cacheDir("dir/sub2/subsub2")
assert.Equal(t, []string{
`name="" isFile=false opens=0 size=0`,
`name="dir" isFile=false opens=0 size=0`,
`name="dir/sub" isFile=false opens=0 size=0`,
`name="dir/sub2" isFile=false opens=0 size=0`,
`name="dir/sub2/subsub2" isFile=false opens=0 size=0`,
}, itemAsString(c))
}
func TestCachePurgeOld(t *testing.T) {
r := fstest.NewRun(t)
defer r.Finalise()
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
c, err := New(ctx, r.Fremote, &vfscommon.DefaultOpt)
require.NoError(t, err)
// Test funcs
var removed []string
removedDir := true
removeFile := func(name string) {
removed = append(removed, filepath.ToSlash(name))
}
removeDir := func(name string) bool {
if removedDir {
removed = append(removed, filepath.ToSlash(name)+"/")
}
return removedDir
}
removed = nil
c._purgeOld(-10*time.Second, removeFile)
c._purgeEmptyDirs(removeDir)
assert.Equal(t, []string(nil), removed)
c.Open("sub/dir2/potato2")
c.Open("sub/dir/potato")
c.Close("sub/dir2/potato2")
c.Open("sub/dir/potato")
assert.Equal(t, []string{
`name="" isFile=false opens=2 size=0`,
`name="sub" isFile=false opens=2 size=0`,
`name="sub/dir" isFile=false opens=2 size=0`,
`name="sub/dir/potato" isFile=true opens=2 size=0`,
`name="sub/dir2" isFile=false opens=0 size=0`,
`name="sub/dir2/potato2" isFile=true opens=0 size=0`,
}, itemAsString(c))
removed = nil
removedDir = true
c._purgeOld(-10*time.Second, removeFile)
c._purgeEmptyDirs(removeDir)
assert.Equal(t, []string{
"sub/dir2/potato2",
"sub/dir2/",
}, removed)
c.Close("sub/dir/potato")
removed = nil
removedDir = true
c._purgeOld(-10*time.Second, removeFile)
c._purgeEmptyDirs(removeDir)
assert.Equal(t, []string(nil), removed)
c.Close("sub/dir/potato")
assert.Equal(t, []string{
`name="" isFile=false opens=0 size=0`,
`name="sub" isFile=false opens=0 size=0`,
`name="sub/dir" isFile=false opens=0 size=0`,
`name="sub/dir/potato" isFile=true opens=0 size=0`,
}, itemAsString(c))
removed = nil
removedDir = false
c._purgeOld(10*time.Second, removeFile)
c._purgeEmptyDirs(removeDir)
assert.Equal(t, []string(nil), removed)
assert.Equal(t, []string{
`name="" isFile=false opens=0 size=0`,
`name="sub" isFile=false opens=0 size=0`,
`name="sub/dir" isFile=false opens=0 size=0`,
`name="sub/dir/potato" isFile=true opens=0 size=0`,
}, itemAsString(c))
removed = nil
removedDir = true
c._purgeOld(-10*time.Second, removeFile)
c._purgeEmptyDirs(removeDir)
assert.Equal(t, []string{
"sub/dir/potato",
"sub/dir/",
"sub/",
"/",
}, removed)
assert.Equal(t, []string(nil), itemAsString(c))
}
func TestCachePurgeOverQuota(t *testing.T) {
r := fstest.NewRun(t)
defer r.Finalise()
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
// Disable the cache cleaner as it interferes with these tests
opt := vfscommon.DefaultOpt
opt.CachePollInterval = 0
c, err := New(ctx, r.Fremote, &opt)
require.NoError(t, err)
// Test funcs
var removed []string
remove := func(name string) {
removed = append(removed, filepath.ToSlash(name))
c.Remove(name)
}
removed = nil
c._purgeOverQuota(-1, remove)
assert.Equal(t, []string(nil), removed)
removed = nil
c._purgeOverQuota(0, remove)
assert.Equal(t, []string(nil), removed)
removed = nil
c._purgeOverQuota(1, remove)
assert.Equal(t, []string(nil), removed)
// Make some test files
c.Open("sub/dir/potato")
p, err := c.Mkdir("sub/dir/potato")
require.NoError(t, err)
err = ioutil.WriteFile(p, []byte("hello"), 0600)
require.NoError(t, err)
p, err = c.Mkdir("sub/dir2/potato2")
c.Open("sub/dir2/potato2")
require.NoError(t, err)
err = ioutil.WriteFile(p, []byte("hello2"), 0600)
require.NoError(t, err)
assert.Equal(t, []string{
`name="" isFile=false opens=2 size=0`,
`name="sub" isFile=false opens=2 size=0`,
`name="sub/dir" isFile=false opens=1 size=0`,
`name="sub/dir/potato" isFile=true opens=1 size=0`,
`name="sub/dir2" isFile=false opens=1 size=0`,
`name="sub/dir2/potato2" isFile=true opens=1 size=0`,
}, itemAsString(c))
// Check nothing removed
removed = nil
c._purgeOverQuota(1, remove)
assert.Equal(t, []string(nil), removed)
// Close the files
c.Close("sub/dir/potato")
c.Close("sub/dir2/potato2")
assert.Equal(t, []string{
`name="" isFile=false opens=0 size=0`,
`name="sub" isFile=false opens=0 size=0`,
`name="sub/dir" isFile=false opens=0 size=0`,
`name="sub/dir/potato" isFile=true opens=0 size=5`,
`name="sub/dir2" isFile=false opens=0 size=0`,
`name="sub/dir2/potato2" isFile=true opens=0 size=6`,
}, itemAsString(c))
// Update the stats to read the total size
err = c.updateStats()
require.NoError(t, err)
assert.Equal(t, int64(11), c.used)
// make potato2 definitely after potato
t1 := time.Now().Add(10 * time.Second)
c.updateStat("sub/dir2/potato2", t1, 6)
// Check only potato removed to get below quota
removed = nil
c._purgeOverQuota(10, remove)
assert.Equal(t, []string{
"sub/dir/potato",
}, removed)
assert.Equal(t, int64(6), c.used)
assert.Equal(t, []string{
`name="" isFile=false opens=0 size=0`,
`name="sub" isFile=false opens=0 size=0`,
`name="sub/dir" isFile=false opens=0 size=0`,
`name="sub/dir2" isFile=false opens=0 size=0`,
`name="sub/dir2/potato2" isFile=true opens=0 size=6`,
}, itemAsString(c))
// Put potato back
c.Open("sub/dir/potato")
p, err = c.Mkdir("sub/dir/potato")
require.NoError(t, err)
err = ioutil.WriteFile(p, []byte("hello"), 0600)
require.NoError(t, err)
c.Close("sub/dir/potato")
// Update the stats to read the total size
err = c.updateStats()
require.NoError(t, err)
assert.Equal(t, int64(11), c.used)
assert.Equal(t, []string{
`name="" isFile=false opens=0 size=0`,
`name="sub" isFile=false opens=0 size=0`,
`name="sub/dir" isFile=false opens=0 size=0`,
`name="sub/dir/potato" isFile=true opens=0 size=5`,
`name="sub/dir2" isFile=false opens=0 size=0`,
`name="sub/dir2/potato2" isFile=true opens=0 size=6`,
}, itemAsString(c))
// make potato definitely after potato2
t2 := t1.Add(20 * time.Second)
c.updateStat("sub/dir/potato", t2, 5)
// Check only potato2 removed to get below quota
removed = nil
c._purgeOverQuota(10, remove)
assert.Equal(t, []string{
"sub/dir2/potato2",
}, removed)
assert.Equal(t, int64(5), c.used)
c.purgeEmptyDirs()
assert.Equal(t, []string{
`name="" isFile=false opens=0 size=0`,
`name="sub" isFile=false opens=0 size=0`,
`name="sub/dir" isFile=false opens=0 size=0`,
`name="sub/dir/potato" isFile=true opens=0 size=5`,
}, itemAsString(c))
// Now purge everything
removed = nil
c._purgeOverQuota(1, remove)
assert.Equal(t, []string{
"sub/dir/potato",
}, removed)
assert.Equal(t, int64(0), c.used)
c.purgeEmptyDirs()
assert.Equal(t, []string(nil), itemAsString(c))
// Check nothing left behind
c.clean()
assert.Equal(t, int64(0), c.used)
assert.Equal(t, []string(nil), itemAsString(c))
}

View file

@ -189,8 +189,7 @@ func (fh *WriteFileHandle) close() (err error) {
fh.closed = true
// leave writer open until file is transferred
defer func() {
fh.file.delWriter(fh, false)
fh.file.finishWriterClose()
fh.file.delWriter(fh)
}()
// If file not opened and not safe to truncate then leave file intact
if !fh.opened && !fh.safeToTruncate() {