vfs: add read write files and caching #711

This adds new flags to mount, cmount, serve *

    --cache-max-age duration         Max age of objects in the cache. (default 1h0m0s)
    --cache-mode string              Cache mode off|minimal|writes|full (default "off")
    --cache-poll-interval duration   Interval to poll the cache for stale objects. (default 1m0s)
This commit is contained in:
Nick Craig-Wood 2017-11-06 21:38:52 +00:00
parent bb0ce0cb5f
commit 7f20e1d7f3
10 changed files with 1438 additions and 55 deletions

276
vfs/cache.go Normal file
View file

@ -0,0 +1,276 @@
// This deals with caching of files locally
package vfs
import (
"fmt"
"os"
"path"
"path/filepath"
"runtime"
"strings"
"sync"
"time"
"github.com/djherbis/times"
"github.com/ncw/rclone/fs"
"github.com/pkg/errors"
"golang.org/x/net/context"
)
// CacheMode controls the functionality of the cache
type CacheMode byte
// CacheMode options
const (
CacheModeOff CacheMode = iota // cache nothing - return errors for writes which can't be satisfied
CacheModeMinimal // cache only the minimum, eg read/write opens
CacheModeWrites // cache all files opened with write intent
CacheModeFull // cache all files opened in any mode
)
var cacheModeToString = []string{
CacheModeOff: "off",
CacheModeMinimal: "minimal",
CacheModeWrites: "writes",
CacheModeFull: "full",
}
// String turns a CacheMode into a string
func (l CacheMode) String() string {
if l >= CacheMode(len(cacheModeToString)) {
return fmt.Sprintf("CacheMode(%d)", l)
}
return cacheModeToString[l]
}
// Set a CacheMode
func (l *CacheMode) Set(s string) error {
for n, name := range cacheModeToString {
if s != "" && name == s {
*l = CacheMode(n)
return nil
}
}
return errors.Errorf("Unknown cache mode level %q", s)
}
// Type of the value
func (l *CacheMode) Type() string {
return "string"
}
// cache opened files
type cache struct {
f fs.Fs // fs for the cache directory
opt *Options // vfs Options
root string // root of the cache directory
itemMu sync.Mutex // protects the next two maps
item map[string]*cacheItem // files in the cache
}
// cacheItem is stored in the item map
type cacheItem struct {
opens int // number of times file is open
atime time.Time // last time file was accessed
}
// newCacheItem returns an item for the cache
func newCacheItem() *cacheItem {
return &cacheItem{atime: time.Now()}
}
// newCache creates a new cache heirachy for f
//
// This starts background goroutines which can be cancelled with the
// context passed in.
func newCache(ctx context.Context, f fs.Fs, opt *Options) (*cache, error) {
fRoot := filepath.FromSlash(f.Root())
if runtime.GOOS == "windows" {
if strings.HasPrefix(fRoot, `\\?`) {
fRoot = fRoot[3:]
}
fRoot = strings.Replace(fRoot, ":", "", -1)
}
root := filepath.Join(fs.CacheDir, "vfs", f.Name(), fRoot)
fs.Debugf(nil, "vfs cache root is %q", root)
f, err := fs.NewFs(root)
if err != nil {
return nil, errors.Wrap(err, "failed to create cache remote")
}
c := &cache{
f: f,
opt: opt,
root: root,
item: make(map[string]*cacheItem),
}
go c.cleaner(ctx)
return c, nil
}
// mkdir makes the directory for name in the cache and returns an os
// path for the file
func (c *cache) mkdir(name string) (string, error) {
parent := path.Dir(name)
if parent == "." {
parent = ""
}
leaf := path.Base(name)
parentPath := filepath.Join(c.root, filepath.FromSlash(parent))
err := os.MkdirAll(parentPath, 0700)
if err != nil {
return "", errors.Wrap(err, "make cache directory failed")
}
return filepath.Join(parentPath, leaf), nil
}
// _get gets name from the cache or creates a new one
//
// must be called with itemMu held
func (c *cache) _get(name string) *cacheItem {
item := c.item[name]
if item == nil {
item = newCacheItem()
c.item[name] = item
}
return item
}
// get gets name from the cache or creates a new one
func (c *cache) get(name string) *cacheItem {
c.itemMu.Lock()
item := c._get(name)
c.itemMu.Unlock()
return item
}
// updateTime sets the atime of the name to that passed in if it is
// newer than the existing or there isn't an existing time.
func (c *cache) updateTime(name string, when time.Time) {
c.itemMu.Lock()
item := c._get(name)
if when.Sub(item.atime) > 0 {
fs.Debugf(name, "updateTime: setting atime to %v", when)
item.atime = when
}
c.itemMu.Unlock()
}
// open marks name as open
func (c *cache) open(name string) {
c.itemMu.Lock()
item := c._get(name)
item.opens++
item.atime = time.Now()
c.itemMu.Unlock()
}
// close marks name as closed
func (c *cache) close(name string) {
c.itemMu.Lock()
item := c._get(name)
item.opens--
item.atime = time.Now()
if item.opens < 0 {
fs.Errorf(name, "cache: double close")
}
c.itemMu.Unlock()
}
// cleanUp empties the cache of everything
func (c *cache) cleanUp() error {
return os.RemoveAll(c.root)
}
// updateAtimes walks the cache updating any atimes it finds
func (c *cache) updateAtimes() error {
return filepath.Walk(c.root, func(osPath string, fi os.FileInfo, err error) error {
if err != nil {
return err
}
if !fi.IsDir() {
// Find path relative to the cache root
name, err := filepath.Rel(c.root, osPath)
if err != nil {
return errors.Wrap(err, "filepath.Rel failed in updatAtimes")
}
// And convert into slashes
name = filepath.ToSlash(name)
// Update the atime with that of the file
atime := times.Get(fi).AccessTime()
c.updateTime(name, atime)
}
return nil
})
}
// purgeOld gets rid of any files that are over age
func (c *cache) purgeOld(maxAge time.Duration) {
c.itemMu.Lock()
defer c.itemMu.Unlock()
cutoff := time.Now().Add(-maxAge)
for name, item := range c.item {
// If not locked and access time too long ago - delete the file
dt := item.atime.Sub(cutoff)
// fs.Debugf(name, "atime=%v cutoff=%v, dt=%v", item.atime, cutoff, dt)
if item.opens == 0 && dt < 0 {
osPath := filepath.Join(c.root, filepath.FromSlash(name))
err := os.Remove(osPath)
if err != nil {
fs.Errorf(name, "Failed to remove from cache: %v", err)
} else {
fs.Debugf(name, "Removed from cache")
}
// Remove the entry
delete(c.item, name)
}
}
}
// clean empties the cache of stuff if it can
func (c *cache) clean() {
// Cache may be empty so end
_, err := os.Stat(c.root)
if os.IsNotExist(err) {
return
}
fs.Debugf(nil, "Cleaning the cache")
// first walk the FS to update the atimes
err = c.updateAtimes()
if err != nil {
fs.Errorf(nil, "Error traversing cache %q: %v", c.root, err)
}
// Now remove any files that are over age
c.purgeOld(c.opt.CacheMaxAge)
// Now tidy up any empty directories
err = fs.Rmdirs(c.f, "")
if err != nil {
fs.Errorf(c.f, "Failed to remove empty directories from cache: %v", err)
}
}
// cleaner calls clean at regular intervals
//
// doesn't return until context is cancelled
func (c *cache) cleaner(ctx context.Context) {
timer := time.NewTicker(c.opt.CachePollInterval)
defer timer.Stop()
for {
select {
case <-timer.C:
c.clean()
case <-ctx.Done():
fs.Debugf(nil, "cache cleaner exiting")
return
}
}
}

144
vfs/cache_test.go Normal file
View file

@ -0,0 +1,144 @@
package vfs
import (
"io/ioutil"
"log"
"os"
"path/filepath"
"testing"
"time"
"github.com/djherbis/times"
"github.com/ncw/rclone/fstest"
"github.com/spf13/pflag"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"golang.org/x/net/context" // switch to "context" when we stop supporting go1.6
)
// Check CacheMode it satisfies the pflag interface
var _ pflag.Value = (*CacheMode)(nil)
func TestCacheModeString(t *testing.T) {
assert.Equal(t, "off", CacheModeOff.String())
assert.Equal(t, "full", CacheModeFull.String())
assert.Equal(t, "CacheMode(17)", CacheMode(17).String())
}
func TestCacheModeSet(t *testing.T) {
var m CacheMode
err := m.Set("full")
assert.NoError(t, err)
assert.Equal(t, CacheModeFull, m)
err = m.Set("potato")
assert.Error(t, err, "Unknown cache mode level")
err = m.Set("")
assert.Error(t, err, "Unknown cache mode level")
}
func TestCacheModeType(t *testing.T) {
var m CacheMode
assert.Equal(t, "string", m.Type())
}
func TestCacheNew(t *testing.T) {
r := fstest.NewRun(t)
defer r.Finalise()
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
c, err := newCache(ctx, r.Fremote, &DefaultOpt)
require.NoError(t, err)
assert.Contains(t, c.root, "vfs")
assert.Contains(t, c.f.Root(), filepath.Base(r.Fremote.Root()))
// mkdir
p, err := c.mkdir("potato")
require.NoError(t, err)
assert.Equal(t, "potato", filepath.Base(p))
fi, err := os.Stat(filepath.Dir(p))
require.NoError(t, err)
assert.True(t, fi.IsDir())
// get
item := c.get("potato")
item2 := c.get("potato")
assert.Equal(t, item, item2)
assert.WithinDuration(t, time.Now(), item.atime, time.Second)
// updateTime
//.. before
t1 := time.Now().Add(-60 * time.Minute)
c.updateTime("potato", t1)
item = c.get("potato")
assert.NotEqual(t, t1, item.atime)
assert.Equal(t, 0, item.opens)
//..after
t2 := time.Now().Add(60 * time.Minute)
c.updateTime("potato", t2)
item = c.get("potato")
assert.Equal(t, t2, item.atime)
assert.Equal(t, 0, item.opens)
// open
c.open("potato")
item = c.get("potato")
assert.WithinDuration(t, time.Now(), item.atime, time.Second)
assert.Equal(t, 1, item.opens)
// write the file
err = ioutil.WriteFile(p, []byte("hello"), 0600)
require.NoError(t, err)
// read its atime
fi, err = os.Stat(p)
assert.NoError(t, err)
atime := times.Get(fi).AccessTime()
// updateAtimes
log.Printf("updateAtimes")
item = c.get("potato")
item.atime = time.Now().Add(-24 * time.Hour)
err = c.updateAtimes()
require.NoError(t, err)
item = c.get("potato")
assert.Equal(t, atime, item.atime)
// try purging with file open
c.purgeOld(10 * time.Second)
_, err = os.Stat(p)
assert.NoError(t, err)
// close
c.updateTime("potato", t2)
c.close("potato")
item = c.get("potato")
assert.WithinDuration(t, time.Now(), item.atime, time.Second)
assert.Equal(t, 0, item.opens)
// try purging with file closed
c.purgeOld(10 * time.Second)
// ...nothing should happend
_, err = os.Stat(p)
assert.NoError(t, err)
//.. purge again with -ve age
c.purgeOld(-10 * time.Second)
_, err = os.Stat(p)
assert.True(t, os.IsNotExist(err))
// clean - have tested the internals already
c.clean()
// cleanup
err = c.cleanUp()
require.NoError(t, err)
_, err = os.Stat(c.root)
assert.True(t, os.IsNotExist(err))
}

View file

@ -306,9 +306,12 @@ func (d *Dir) ReadDirAll() (items Nodes, err error) {
return items, nil
}
// accessModeMask masks off the read modes from the flags
const accessModeMask = (os.O_RDONLY | os.O_WRONLY | os.O_RDWR)
// Open the directory according to the flags provided
func (d *Dir) Open(flags int) (fd Handle, err error) {
rdwrMode := flags & (os.O_RDONLY | os.O_WRONLY | os.O_RDWR)
rdwrMode := flags & accessModeMask
if rdwrMode != os.O_RDONLY {
fs.Errorf(d, "Can only open directories read only")
return nil, EPERM
@ -498,3 +501,8 @@ func (d *Dir) Fsync() error {
func (d *Dir) VFS() *VFS {
return d.vfs
}
// Truncate changes the size of the named file.
func (d *Dir) Truncate(size int64) error {
return ENOSYS
}

View file

@ -282,7 +282,7 @@ func TestDirCreate(t *testing.T) {
require.NoError(t, err)
assert.Equal(t, int64(0), file.Size())
fd, err := file.Open(os.O_WRONLY)
fd, err := file.Open(os.O_WRONLY | os.O_CREATE)
require.NoError(t, err)
// FIXME Note that this fails with the current implementation

View file

@ -189,7 +189,7 @@ func (f *File) setObject(o fs.Object) {
}
// Wait for f.o to become non nil for a short time returning it or an
// error
// error. Use when opening a read handle.
//
// Call without the mutex held
func (f *File) waitForValidObject() (o fs.Object, err error) {
@ -219,9 +219,8 @@ func (f *File) OpenRead() (fh *ReadFileHandle, err error) {
// fs.Debugf(o, "File.OpenRead")
fh, err = newReadFileHandle(f, o)
err = errors.Wrap(err, "open for read")
if err != nil {
err = errors.Wrap(err, "open for read")
fs.Errorf(f, "File.OpenRead failed: %v", err)
return nil, err
}
@ -236,15 +235,32 @@ func (f *File) OpenWrite() (fh *WriteFileHandle, err error) {
// fs.Debugf(o, "File.OpenWrite")
fh, err = newWriteFileHandle(f.d, f, f.path())
err = errors.Wrap(err, "open for write")
if err != nil {
err = errors.Wrap(err, "open for write")
fs.Errorf(f, "File.OpenWrite failed: %v", err)
return nil, err
}
return fh, nil
}
// OpenRW open the file for read and write using a temporay file
//
// It uses the open flags passed in.
func (f *File) OpenRW(flags int) (fh *RWFileHandle, err error) {
if flags&accessModeMask != os.O_RDONLY && f.d.vfs.Opt.ReadOnly {
return nil, EROFS
}
// fs.Debugf(o, "File.OpenRW")
fh, err = newRWFileHandle(f.d, f, f.path(), flags)
if err != nil {
err = errors.Wrap(err, "open for read write")
fs.Errorf(f, "File.OpenRW failed: %v", err)
return nil, err
}
return fh, nil
}
// Fsync the file
//
// Note that we don't do anything except return OK
@ -290,25 +306,85 @@ func (f *File) VFS() *VFS {
}
// Open a file according to the flags provided
//
// O_RDONLY open the file read-only.
// O_WRONLY open the file write-only.
// O_RDWR open the file read-write.
//
// O_APPEND append data to the file when writing.
// O_CREATE create a new file if none exists.
// O_EXCL used with O_CREATE, file must not exist
// O_SYNC open for synchronous I/O.
// O_TRUNC if possible, truncate file when opene
//
// We ignore O_SYNC and O_EXCL
func (f *File) Open(flags int) (fd Handle, err error) {
rdwrMode := flags & (os.O_RDONLY | os.O_WRONLY | os.O_RDWR)
var read bool
var (
write bool // if set need write support
read bool // if set need read support
rdwrMode = flags & (os.O_RDONLY | os.O_WRONLY | os.O_RDWR)
)
// Figure out the read/write intents
switch {
case rdwrMode == os.O_RDONLY:
read = true
case rdwrMode == os.O_WRONLY || (rdwrMode == os.O_RDWR && (flags&os.O_TRUNC) != 0):
read = false
case rdwrMode == os.O_WRONLY:
write = true
case rdwrMode == os.O_RDWR:
fs.Errorf(f, "Can't open for Read and Write")
return nil, EPERM
read = true
write = true
default:
fs.Errorf(f, "Can't figure out how to open with flags: 0x%X", flags)
return nil, EPERM
}
if read {
fd, err = f.OpenRead()
// If append is set then set read to force OpenRW
if flags&os.O_APPEND != 0 {
read = true
}
// If truncate is set then set write to force OpenRW
if flags&os.O_TRUNC != 0 {
write = true
}
// FIXME discover if file is in cache or not?
// Open the correct sort of handle
CacheMode := f.d.vfs.Opt.CacheMode
if read && write {
if CacheMode >= CacheModeMinimal {
fd, err = f.OpenRW(flags)
} else if flags&os.O_TRUNC != 0 {
fd, err = f.OpenWrite()
} else {
fs.Errorf(f, "Can't open for read and write without cache")
return nil, EPERM
}
} else if write {
if CacheMode >= CacheModeWrites {
fd, err = f.OpenRW(flags)
} else {
fd, err = f.OpenWrite()
}
} else if read {
if CacheMode >= CacheModeFull {
fd, err = f.OpenRW(flags)
} else {
fd, err = f.OpenRead()
}
} else {
fd, err = f.OpenWrite()
fs.Errorf(f, "Can't figure out how to open with flags: 0x%X", flags)
return nil, EPERM
}
return fd, err
}
// Truncate changes the size of the named file.
func (f *File) Truncate(size int64) error {
if f.d.vfs.Opt.CacheMode >= CacheModeWrites {
}
// FIXME
return ENOSYS
}

View file

@ -160,7 +160,7 @@ func TestFileOpen(t *testing.T) {
_, file, _ := fileCreate(t, r)
fd, err := file.Open(os.O_RDONLY)
assert.NoError(t, err)
require.NoError(t, err)
_, ok := fd.(*ReadFileHandle)
assert.True(t, ok)
require.NoError(t, fd.Close())
@ -171,12 +171,6 @@ func TestFileOpen(t *testing.T) {
assert.True(t, ok)
require.NoError(t, fd.Close())
fd, err = file.Open(os.O_RDWR | os.O_TRUNC)
assert.NoError(t, err)
_, ok = fd.(*WriteFileHandle)
assert.True(t, ok)
require.NoError(t, fd.Close())
fd, err = file.Open(os.O_RDWR)
assert.Equal(t, EPERM, err)

406
vfs/read_write.go Normal file
View file

@ -0,0 +1,406 @@
package vfs
import (
"io"
"os"
"sync"
"github.com/ncw/rclone/fs"
"github.com/pkg/errors"
)
// RWFileHandle is a handle that can be open for read and write.
//
// It will be open to a temporary file which, when closed, will be
// transferred to the remote.
type RWFileHandle struct {
*os.File
mu sync.Mutex
closed bool // set if handle has been closed
o fs.Object // may be nil
remote string
file *File
d *Dir
opened bool
flags int // open flags
osPath string // path to the file in the cache
writeCalled bool // if any Write() methods have been called
}
// Check interfaces
var (
_ io.Reader = (*RWFileHandle)(nil)
_ io.ReaderAt = (*RWFileHandle)(nil)
_ io.Writer = (*RWFileHandle)(nil)
_ io.WriterAt = (*RWFileHandle)(nil)
_ io.Seeker = (*RWFileHandle)(nil)
_ io.Closer = (*RWFileHandle)(nil)
)
func newRWFileHandle(d *Dir, f *File, remote string, flags int) (fh *RWFileHandle, err error) {
// Make a place for the file
osPath, err := d.vfs.cache.mkdir(remote)
if err != nil {
return nil, errors.Wrap(err, "open RW handle failed to make cache directory")
}
fh = &RWFileHandle{
o: f.o,
file: f,
d: d,
remote: remote,
flags: flags,
osPath: osPath,
}
return fh, nil
}
// openPending opens the file if there is a pending open
//
// call with the lock held
func (fh *RWFileHandle) openPending(truncate bool) (err error) {
if fh.opened {
return nil
}
rdwrMode := fh.flags & accessModeMask
// if not truncating the file, need to read it first
if fh.flags&os.O_TRUNC == 0 && !truncate {
// Fetch the file if it hasn't changed
// FIXME retries
err = fs.CopyFile(fh.d.vfs.cache.f, fh.d.vfs.f, fh.remote, fh.remote)
if err != nil {
// if the object wasn't found AND O_CREATE is set then...
cause := errors.Cause(err)
notFound := cause == fs.ErrorObjectNotFound || cause == fs.ErrorDirNotFound
if notFound {
// Remove cached item if there is one
err = os.Remove(fh.osPath)
if err != nil && !os.IsNotExist(err) {
return errors.Wrap(err, "open RW handle failed to delete stale cache file")
}
}
if notFound && fh.flags&os.O_CREATE != 0 {
// ...ignore error as we are about to create the file
} else {
return errors.Wrap(err, "open RW handle failed to cache file")
}
}
} else {
// Set the size to 0 since we are truncating
fh.file.setSize(0)
}
if rdwrMode != os.O_RDONLY {
fh.file.addWriters(1)
}
fs.Debugf(fh.remote, "Opening cached copy with flags=0x%02X", fh.flags)
fd, err := os.OpenFile(fh.osPath, fh.flags|os.O_CREATE, 0600)
if err != nil {
return errors.Wrap(err, "cache open file failed")
}
fh.File = fd
fh.opened = true
fh.d.vfs.cache.open(fh.osPath)
return nil
}
// String converts it to printable
func (fh *RWFileHandle) String() string {
if fh == nil {
return "<nil *RWFileHandle>"
}
fh.mu.Lock()
defer fh.mu.Unlock()
if fh.file == nil {
return "<nil *RWFileHandle.file>"
}
return fh.file.String() + " (rw)"
}
// Node returns the Node assocuated with this - satisfies Noder interface
func (fh *RWFileHandle) Node() Node {
fh.mu.Lock()
defer fh.mu.Unlock()
return fh.file
}
// close the file handle returning EBADF if it has been
// closed already.
//
// Must be called with fh.mu held
//
// Note that we leave the file around in the cache on error conditions
// to give the user a chance to recover it.
func (fh *RWFileHandle) close() (err error) {
defer fs.Trace(fh.remote, "")("err=%v", &err)
if fh.closed {
return ECLOSED
}
fh.closed = true
rdwrMode := fh.flags & accessModeMask
if !fh.opened {
// If read only then return
if rdwrMode == os.O_RDONLY {
return nil
}
// If we aren't creating or truncating the file then
// we haven't modified it so don't need to transfer it
if fh.flags&(os.O_CREATE|os.O_TRUNC) == 0 {
return nil
}
// Otherwise open the file
// FIXME this could be more efficient
if err := fh.openPending(false); err != nil {
return err
}
}
if rdwrMode != os.O_RDONLY {
fh.file.addWriters(-1)
fi, err := fh.File.Stat()
if err != nil {
fs.Errorf(fh.remote, "Failed to stat cache file: %v", err)
} else {
fh.file.setSize(fi.Size())
}
}
fh.d.vfs.cache.close(fh.osPath)
// Close the underlying file
err = fh.File.Close()
if err != nil {
return err
}
// FIXME measure whether we actually did any writes or not -
// no writes means no transfer?
if rdwrMode == os.O_RDONLY {
fs.Debugf(fh.remote, "read only so not transferring")
return nil
}
// If write hasn't been called and we aren't creating or
// truncating the file then we haven't modified it so don't
// need to transfer it
if !fh.writeCalled && fh.flags&(os.O_CREATE|os.O_TRUNC) == 0 {
fs.Debugf(fh.remote, "not modified so not transferring")
return nil
}
// Transfer the temp file to the remote
// FIXME retries
if fh.d.vfs.Opt.CacheMode < CacheModeFull {
err = fs.MoveFile(fh.d.vfs.f, fh.d.vfs.cache.f, fh.remote, fh.remote)
} else {
err = fs.CopyFile(fh.d.vfs.f, fh.d.vfs.cache.f, fh.remote, fh.remote)
}
if err != nil {
err = errors.Wrap(err, "failed to transfer file from cache to remote")
fs.Errorf(fh.remote, "%v", err)
return err
}
// FIXME get MoveFile to return this object
o, err := fh.d.vfs.f.NewObject(fh.remote)
if err != nil {
err = errors.Wrap(err, "failed to find object after transfer to remote")
fs.Errorf(fh.remote, "%v", err)
return err
}
fh.file.setObject(o)
fs.Debugf(o, "transferred to remote")
return nil
}
// Close closes the file
func (fh *RWFileHandle) Close() error {
fh.mu.Lock()
defer fh.mu.Unlock()
return fh.close()
}
// Flush is called each time the file or directory is closed.
// Because there can be multiple file descriptors referring to a
// single opened file, Flush can be called multiple times.
func (fh *RWFileHandle) Flush() error {
fh.mu.Lock()
defer fh.mu.Unlock()
if !fh.opened {
return nil
}
if fh.closed {
fs.Debugf(fh.remote, "RWFileHandle.Flush nothing to do")
return nil
}
// fs.Debugf(fh.remote, "RWFileHandle.Flush")
if !fh.opened {
fs.Debugf(fh.remote, "RWFileHandle.Flush ignoring flush on unopened handle")
return nil
}
// If Write hasn't been called then ignore the Flush - Release
// will pick it up
if !fh.writeCalled {
fs.Debugf(fh.remote, "RWFileHandle.Flush ignoring flush on unwritten handle")
return nil
}
err := fh.close()
if err != nil {
fs.Errorf(fh.remote, "RWFileHandle.Flush error: %v", err)
} else {
// fs.Debugf(fh.remote, "RWFileHandle.Flush OK")
}
return err
}
// Release is called when we are finished with the file handle
//
// It isn't called directly from userspace so the error is ignored by
// the kernel
func (fh *RWFileHandle) Release() error {
fh.mu.Lock()
defer fh.mu.Unlock()
if fh.closed {
fs.Debugf(fh.remote, "RWFileHandle.Release nothing to do")
return nil
}
fs.Debugf(fh.remote, "RWFileHandle.Release closing")
err := fh.close()
if err != nil {
fs.Errorf(fh.remote, "RWFileHandle.Release error: %v", err)
} else {
// fs.Debugf(fh.remote, "RWFileHandle.Release OK")
}
return err
}
// Size returns the size of the underlying file
func (fh *RWFileHandle) Size() int64 {
fh.mu.Lock()
defer fh.mu.Unlock()
if !fh.opened {
return fh.file.Size()
}
fi, err := fh.File.Stat()
if err != nil {
return 0
}
return fi.Size()
}
// Stat returns info about the file
func (fh *RWFileHandle) Stat() (os.FileInfo, error) {
fh.mu.Lock()
defer fh.mu.Unlock()
return fh.file, nil
}
// Read bytes from the file
func (fh *RWFileHandle) Read(b []byte) (n int, err error) {
fh.mu.Lock()
defer fh.mu.Unlock()
if fh.closed {
return 0, ECLOSED
}
if err = fh.openPending(false); err != nil {
return n, err
}
return fh.File.Read(b)
}
// ReadAt bytes from the file at off
func (fh *RWFileHandle) ReadAt(b []byte, off int64) (n int, err error) {
fh.mu.Lock()
defer fh.mu.Unlock()
if fh.closed {
return 0, ECLOSED
}
if err = fh.openPending(false); err != nil {
return n, err
}
return fh.File.ReadAt(b, off)
}
// Seek to new file position
func (fh *RWFileHandle) Seek(offset int64, whence int) (ret int64, err error) {
fh.mu.Lock()
defer fh.mu.Unlock()
if fh.closed {
return 0, ECLOSED
}
if err = fh.openPending(false); err != nil {
return ret, err
}
return fh.File.Seek(offset, whence)
}
// writeFn general purpose write call
//
// Pass a closure to do the actual write
func (fh *RWFileHandle) writeFn(write func() error) (err error) {
fh.mu.Lock()
defer fh.mu.Unlock()
if fh.closed {
return ECLOSED
}
if err = fh.openPending(false); err != nil {
return err
}
fh.writeCalled = true
err = write()
if err != nil {
return err
}
fi, err := fh.File.Stat()
if err != nil {
return errors.Wrap(err, "failed to stat cache file")
}
fh.file.setSize(fi.Size())
return nil
}
// Write bytes to the file
func (fh *RWFileHandle) Write(b []byte) (n int, err error) {
err = fh.writeFn(func() error {
n, err = fh.File.Write(b)
return err
})
return n, err
}
// WriteAt bytes to the file at off
func (fh *RWFileHandle) WriteAt(b []byte, off int64) (n int, err error) {
err = fh.writeFn(func() error {
n, err = fh.File.WriteAt(b, off)
return err
})
return n, err
}
// WriteString a string to the file
func (fh *RWFileHandle) WriteString(s string) (n int, err error) {
err = fh.writeFn(func() error {
n, err = fh.File.WriteString(s)
return err
})
return n, err
}
// Truncate file to given size
func (fh *RWFileHandle) Truncate(size int64) (err error) {
fh.mu.Lock()
defer fh.mu.Unlock()
if fh.closed {
return ECLOSED
}
if err = fh.openPending(size == 0); err != nil {
return err
}
fh.writeCalled = true
fh.file.setSize(size)
return fh.File.Truncate(size)
}

432
vfs/read_write_test.go Normal file
View file

@ -0,0 +1,432 @@
package vfs
import (
"io"
"os"
"testing"
"github.com/ncw/rclone/fs"
"github.com/ncw/rclone/fstest"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
// Open a file for write
func rwHandleCreateReadOnly(t *testing.T, r *fstest.Run) (*VFS, *RWFileHandle) {
vfs := New(r.Fremote, nil)
vfs.Opt.CacheMode = CacheModeFull
file1 := r.WriteObject("dir/file1", "0123456789abcdef", t1)
fstest.CheckItems(t, r.Fremote, file1)
h, err := vfs.OpenFile("dir/file1", os.O_RDONLY, 0777)
require.NoError(t, err)
fh, ok := h.(*RWFileHandle)
require.True(t, ok)
return vfs, fh
}
// Open a file for write
func rwHandleCreateWriteOnly(t *testing.T, r *fstest.Run) (*VFS, *RWFileHandle) {
vfs := New(r.Fremote, nil)
vfs.Opt.CacheMode = CacheModeFull
h, err := vfs.OpenFile("file1", os.O_WRONLY|os.O_CREATE, 0777)
require.NoError(t, err)
fh, ok := h.(*RWFileHandle)
require.True(t, ok)
return vfs, fh
}
// read data from the string
func rwReadString(t *testing.T, fh *RWFileHandle, n int) string {
buf := make([]byte, n)
n, err := fh.Read(buf)
if err != io.EOF {
assert.NoError(t, err)
}
return string(buf[:n])
}
func TestRWFileHandleMethodsRead(t *testing.T) {
r := fstest.NewRun(t)
defer r.Finalise()
_, fh := rwHandleCreateReadOnly(t, r)
// String
assert.Equal(t, "dir/file1 (rw)", fh.String())
assert.Equal(t, "<nil *RWFileHandle>", (*RWFileHandle)(nil).String())
assert.Equal(t, "<nil *RWFileHandle.file>", new(RWFileHandle).String())
// Node
node := fh.Node()
assert.Equal(t, "file1", node.Name())
// Size
assert.Equal(t, int64(16), fh.Size())
// Read 1
assert.Equal(t, "0", rwReadString(t, fh, 1))
// Read remainder
assert.Equal(t, "123456789abcdef", rwReadString(t, fh, 256))
// Read EOF
buf := make([]byte, 16)
_, err := fh.Read(buf)
assert.Equal(t, io.EOF, err)
// Stat
var fi os.FileInfo
fi, err = fh.Stat()
assert.NoError(t, err)
assert.Equal(t, int64(16), fi.Size())
assert.Equal(t, "file1", fi.Name())
// Close
assert.False(t, fh.closed)
assert.Equal(t, nil, fh.Close())
assert.True(t, fh.closed)
// Close again
assert.Equal(t, ECLOSED, fh.Close())
}
func TestRWFileHandleSeek(t *testing.T) {
r := fstest.NewRun(t)
defer r.Finalise()
_, fh := rwHandleCreateReadOnly(t, r)
assert.Equal(t, "0", rwReadString(t, fh, 1))
// 0 means relative to the origin of the file,
n, err := fh.Seek(5, 0)
assert.NoError(t, err)
assert.Equal(t, int64(5), n)
assert.Equal(t, "5", rwReadString(t, fh, 1))
// 1 means relative to the current offset
n, err = fh.Seek(-3, 1)
assert.NoError(t, err)
assert.Equal(t, int64(3), n)
assert.Equal(t, "3", rwReadString(t, fh, 1))
// 2 means relative to the end.
n, err = fh.Seek(-3, 2)
assert.NoError(t, err)
assert.Equal(t, int64(13), n)
assert.Equal(t, "d", rwReadString(t, fh, 1))
// Seek off the end
n, err = fh.Seek(100, 0)
assert.NoError(t, err)
// Get the error on read
buf := make([]byte, 16)
l, err := fh.Read(buf)
assert.Equal(t, io.EOF, err)
assert.Equal(t, 0, l)
// Close
assert.Equal(t, nil, fh.Close())
}
func TestRWFileHandleReadAt(t *testing.T) {
r := fstest.NewRun(t)
defer r.Finalise()
_, fh := rwHandleCreateReadOnly(t, r)
// read from start
buf := make([]byte, 1)
n, err := fh.ReadAt(buf, 0)
require.NoError(t, err)
assert.Equal(t, 1, n)
assert.Equal(t, "0", string(buf[:n]))
// seek forwards
n, err = fh.ReadAt(buf, 5)
require.NoError(t, err)
assert.Equal(t, 1, n)
assert.Equal(t, "5", string(buf[:n]))
// seek backwards
n, err = fh.ReadAt(buf, 1)
require.NoError(t, err)
assert.Equal(t, 1, n)
assert.Equal(t, "1", string(buf[:n]))
// read exactly to the end
buf = make([]byte, 6)
n, err = fh.ReadAt(buf, 10)
require.NoError(t, err)
assert.Equal(t, 6, n)
assert.Equal(t, "abcdef", string(buf[:n]))
// read off the end
buf = make([]byte, 256)
n, err = fh.ReadAt(buf, 10)
assert.Equal(t, io.EOF, err)
assert.Equal(t, 6, n)
assert.Equal(t, "abcdef", string(buf[:n]))
// read starting off the end
n, err = fh.ReadAt(buf, 100)
assert.Equal(t, io.EOF, err)
assert.Equal(t, 0, n)
// Properly close the file
assert.NoError(t, fh.Close())
// check reading on closed file
n, err = fh.ReadAt(buf, 100)
assert.Equal(t, ECLOSED, err)
}
func TestRWFileHandleFlushRead(t *testing.T) {
r := fstest.NewRun(t)
defer r.Finalise()
_, fh := rwHandleCreateReadOnly(t, r)
// Check Flush does nothing if read not called
err := fh.Flush()
assert.NoError(t, err)
assert.False(t, fh.closed)
// Read data
buf := make([]byte, 256)
n, err := fh.Read(buf)
assert.True(t, err == io.EOF || err == nil)
assert.Equal(t, 16, n)
// Check Flush does nothing if read called
err = fh.Flush()
assert.NoError(t, err)
assert.False(t, fh.closed)
// Check flush does nothing if called again
err = fh.Flush()
assert.NoError(t, err)
assert.False(t, fh.closed)
// Properly close the file
assert.NoError(t, fh.Close())
}
func TestRWFileHandleReleaseRead(t *testing.T) {
r := fstest.NewRun(t)
defer r.Finalise()
_, fh := rwHandleCreateReadOnly(t, r)
// Read data
buf := make([]byte, 256)
n, err := fh.Read(buf)
assert.True(t, err == io.EOF || err == nil)
assert.Equal(t, 16, n)
// Check Release closes file
err = fh.Release()
assert.NoError(t, err)
assert.True(t, fh.closed)
// Check Release does nothing if called again
err = fh.Release()
assert.NoError(t, err)
assert.True(t, fh.closed)
}
/// ------------------------------------------------------------
func TestRWFileHandleMethodsWrite(t *testing.T) {
r := fstest.NewRun(t)
defer r.Finalise()
vfs, fh := rwHandleCreateWriteOnly(t, r)
// String
assert.Equal(t, "file1 (rw)", fh.String())
assert.Equal(t, "<nil *RWFileHandle>", (*RWFileHandle)(nil).String())
assert.Equal(t, "<nil *RWFileHandle.file>", new(RWFileHandle).String())
// Node
node := fh.Node()
assert.Equal(t, "file1", node.Name())
offset := func() int64 {
n, err := fh.Seek(0, 1)
require.NoError(t, err)
return n
}
// Offset #1
assert.Equal(t, int64(0), offset())
assert.Equal(t, int64(0), node.Size())
// Size #1
assert.Equal(t, int64(0), fh.Size())
// Write
n, err := fh.Write([]byte("hello"))
assert.NoError(t, err)
assert.Equal(t, 5, n)
// Offset #2
assert.Equal(t, int64(5), offset())
assert.Equal(t, int64(5), node.Size())
// Size #2
assert.Equal(t, int64(5), fh.Size())
// WriteString
n, err = fh.WriteString(" world!")
assert.NoError(t, err)
assert.Equal(t, 7, n)
// Stat
var fi os.FileInfo
fi, err = fh.Stat()
assert.NoError(t, err)
assert.Equal(t, int64(12), fi.Size())
assert.Equal(t, "file1", fi.Name())
// Truncate
err = fh.Truncate(11)
assert.NoError(t, err)
// Close
assert.NoError(t, fh.Close())
// Check double close
err = fh.Close()
assert.Equal(t, ECLOSED, err)
// check vfs
root, err := vfs.Root()
checkListing(t, root, []string{"file1,11,false"})
// check the underlying r.Fremote but not the modtime
file1 := fstest.NewItem("file1", "hello world", t1)
fstest.CheckListingWithPrecision(t, r.Fremote, []fstest.Item{file1}, []string{}, fs.ModTimeNotSupported)
}
func TestRWFileHandleWriteAt(t *testing.T) {
r := fstest.NewRun(t)
defer r.Finalise()
vfs, fh := rwHandleCreateWriteOnly(t, r)
offset := func() int64 {
n, err := fh.Seek(0, 1)
require.NoError(t, err)
return n
}
// Preconditions
assert.Equal(t, int64(0), offset())
assert.False(t, fh.writeCalled)
// Write the data
n, err := fh.WriteAt([]byte("hello**"), 0)
assert.NoError(t, err)
assert.Equal(t, 7, n)
// After write
assert.Equal(t, int64(0), offset())
assert.True(t, fh.writeCalled)
// Write more data
n, err = fh.WriteAt([]byte(" world"), 5)
assert.NoError(t, err)
assert.Equal(t, 6, n)
// Close
assert.NoError(t, fh.Close())
// Check can't write on closed handle
n, err = fh.WriteAt([]byte("hello"), 0)
assert.Equal(t, ECLOSED, err)
assert.Equal(t, 0, n)
// check vfs
root, err := vfs.Root()
checkListing(t, root, []string{"file1,11,false"})
// check the underlying r.Fremote but not the modtime
file1 := fstest.NewItem("file1", "hello world", t1)
fstest.CheckListingWithPrecision(t, r.Fremote, []fstest.Item{file1}, []string{}, fs.ModTimeNotSupported)
}
func TestRWFileHandleWriteNoWrite(t *testing.T) {
r := fstest.NewRun(t)
defer r.Finalise()
vfs, fh := rwHandleCreateWriteOnly(t, r)
// Close the file without writing to it
err := fh.Close()
assert.NoError(t, err)
// Create a different file (not in the cache)
h, err := vfs.OpenFile("file2", os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0777)
require.NoError(t, err)
// Close it with Flush and Release
err = h.Flush()
assert.NoError(t, err)
err = h.Release()
assert.NoError(t, err)
// check vfs
root, err := vfs.Root()
checkListing(t, root, []string{"file1,0,false", "file2,0,false"})
// check the underlying r.Fremote but not the modtime
file1 := fstest.NewItem("file1", "", t1)
file2 := fstest.NewItem("file2", "", t1)
fstest.CheckListingWithPrecision(t, r.Fremote, []fstest.Item{file1, file2}, []string{}, fs.ModTimeNotSupported)
}
func TestRWFileHandleFlushWrite(t *testing.T) {
r := fstest.NewRun(t)
defer r.Finalise()
_, fh := rwHandleCreateWriteOnly(t, r)
// Check Flush does nothing if write not called
err := fh.Flush()
assert.NoError(t, err)
assert.False(t, fh.closed)
// Write some data
n, err := fh.Write([]byte("hello"))
assert.NoError(t, err)
assert.Equal(t, 5, n)
// Check Flush closes file if write called
err = fh.Flush()
assert.NoError(t, err)
assert.True(t, fh.closed)
// Check flush does nothing if called again
err = fh.Flush()
assert.NoError(t, err)
assert.True(t, fh.closed)
}
func TestRWFileHandleReleaseWrite(t *testing.T) {
r := fstest.NewRun(t)
defer r.Finalise()
_, fh := rwHandleCreateWriteOnly(t, r)
// Write some data
n, err := fh.Write([]byte("hello"))
assert.NoError(t, err)
assert.Equal(t, 5, n)
// Check Release closes file
err = fh.Release()
assert.NoError(t, err)
assert.True(t, fh.closed)
// Check Release does nothing if called again
err = fh.Release()
assert.NoError(t, err)
assert.True(t, fh.closed)
}

View file

@ -27,21 +27,25 @@ import (
"time"
"github.com/ncw/rclone/fs"
"golang.org/x/net/context" // switch to "context" when we stop supporting go1.6
)
// DefaultOpt is the default values uses for Opt
var DefaultOpt = Options{
NoModTime: false,
NoChecksum: false,
NoSeek: false,
DirCacheTime: 5 * 60 * time.Second,
PollInterval: time.Minute,
ReadOnly: false,
Umask: 0,
UID: ^uint32(0), // these values instruct WinFSP-FUSE to use the current user
GID: ^uint32(0), // overriden for non windows in mount_unix.go
DirPerms: os.FileMode(0777) | os.ModeDir,
FilePerms: os.FileMode(0666),
NoModTime: false,
NoChecksum: false,
NoSeek: false,
DirCacheTime: 5 * 60 * time.Second,
PollInterval: time.Minute,
ReadOnly: false,
Umask: 0,
UID: ^uint32(0), // these values instruct WinFSP-FUSE to use the current user
GID: ^uint32(0), // overriden for non windows in mount_unix.go
DirPerms: os.FileMode(0777) | os.ModeDir,
FilePerms: os.FileMode(0666),
CacheMode: CacheModeOff,
CacheMaxAge: 3600 * time.Second,
CachePollInterval: 60 * time.Second,
}
// Node represents either a directory (*Dir) or a file (*File)
@ -56,6 +60,7 @@ type Node interface {
DirEntry() fs.DirEntry
VFS() *VFS
Open(flags int) (Handle, error)
Truncate(size int64) error
}
// Check interfaces
@ -84,12 +89,12 @@ var (
_ Noder = (*Dir)(nil)
_ Noder = (*ReadFileHandle)(nil)
_ Noder = (*WriteFileHandle)(nil)
_ Noder = (*RWFileHandle)(nil)
_ Noder = (*DirHandle)(nil)
)
// Handle is the interface statisified by open files or directories.
// It is the methods on *os.File. Not all of them are supported.
type Handle interface {
// OsFiler is the methods on *os.File
type OsFiler interface {
Chdir() error
Chmod(mode os.FileMode) error
Chown(uid, gid int) error
@ -107,10 +112,18 @@ type Handle interface {
Write(b []byte) (n int, err error)
WriteAt(b []byte, off int64) (n int, err error)
WriteString(s string) (n int, err error)
}
// Handle is the interface statisified by open files or directories.
// It is the methods on *os.File, plus a few more useful for FUSE
// filingsystems. Not all of them are supported.
type Handle interface {
OsFiler
// Additional methods useful for FUSE filesystems
Flush() error
Release() error
Node() Node
// Size() int64
}
// baseHandle implements all the missing methods
@ -137,34 +150,42 @@ func (h baseHandle) Flush() (err error) { retu
func (h baseHandle) Release() (err error) { return ENOSYS }
func (h baseHandle) Node() Node { return nil }
//func (h baseHandle) Size() int64 { return 0 }
// Check interfaces
var (
_ Handle = (*baseHandle)(nil)
_ Handle = (*ReadFileHandle)(nil)
_ Handle = (*WriteFileHandle)(nil)
_ Handle = (*DirHandle)(nil)
_ OsFiler = (*os.File)(nil)
_ Handle = (*baseHandle)(nil)
_ Handle = (*ReadFileHandle)(nil)
_ Handle = (*WriteFileHandle)(nil)
_ Handle = (*DirHandle)(nil)
)
// VFS represents the top level filing system
type VFS struct {
f fs.Fs
root *Dir
Opt Options
f fs.Fs
root *Dir
Opt Options
cache *cache
cancel context.CancelFunc
}
// Options is options for creating the vfs
type Options struct {
NoSeek bool // don't allow seeking if set
NoChecksum bool // don't check checksums if set
ReadOnly bool // if set VFS is read only
NoModTime bool // don't read mod times for files
DirCacheTime time.Duration // how long to consider directory listing cache valid
PollInterval time.Duration
Umask int
UID uint32
GID uint32
DirPerms os.FileMode
FilePerms os.FileMode
NoSeek bool // don't allow seeking if set
NoChecksum bool // don't check checksums if set
ReadOnly bool // if set VFS is read only
NoModTime bool // don't read mod times for files
DirCacheTime time.Duration // how long to consider directory listing cache valid
PollInterval time.Duration
Umask int
UID uint32
GID uint32
DirPerms os.FileMode
FilePerms os.FileMode
CacheMode CacheMode
CacheMaxAge time.Duration
CachePollInterval time.Duration
}
// New creates a new VFS and root directory. If opt is nil, then
@ -200,9 +221,32 @@ func New(f fs.Fs, opt *Options) *VFS {
fs.Logf(f, "poll-interval is not supported by this remote")
}
}
// Create the cache
ctx, cancel := context.WithCancel(context.Background())
vfs.cancel = cancel
cache, err := newCache(ctx, f, &vfs.Opt) // FIXME pass on context or get from Opt?
if err != nil {
// FIXME
panic(fmt.Sprintf("failed to create local cache: %v", err))
}
vfs.cache = cache
return vfs
}
// Shutdown stops any background go-routines
func (vfs *VFS) Shutdown() {
if vfs.cancel != nil {
vfs.cancel()
vfs.cancel = nil
}
}
// CleanUp deletes the contents of the cache
func (vfs *VFS) CleanUp() error {
return vfs.cache.cleanUp()
}
// Root returns the root node
func (vfs *VFS) Root() (*Dir, error) {
// fs.Debugf(vfs.f, "Root()")

View file

@ -19,5 +19,8 @@ func AddFlags(flags *pflag.FlagSet) {
flags.DurationVarP(&Opt.DirCacheTime, "dir-cache-time", "", Opt.DirCacheTime, "Time to cache directory entries for.")
flags.DurationVarP(&Opt.PollInterval, "poll-interval", "", Opt.PollInterval, "Time to wait between polling for changes. Must be smaller than dir-cache-time. Only on supported remotes. Set to 0 to disable.")
flags.BoolVarP(&Opt.ReadOnly, "read-only", "", Opt.ReadOnly, "Mount read-only.")
flags.VarP(&Opt.CacheMode, "cache-mode", "", "Cache mode off|minimal|writes|full")
flags.DurationVarP(&Opt.CachePollInterval, "cache-poll-interval", "", Opt.CachePollInterval, "Interval to poll the cache for stale objects.")
flags.DurationVarP(&Opt.CacheMaxAge, "cache-max-age", "", Opt.CacheMaxAge, "Max age of objects in the cache.")
platformFlags(flags)
}