mount: factor filesystem code into mountlib and mounttest

This commit is contained in:
Nick Craig-Wood 2017-05-02 22:35:07 +01:00
parent 0c92a64bb3
commit 268fe0004c
19 changed files with 1430 additions and 980 deletions

View file

@ -5,12 +5,11 @@ package mount
import (
"os"
"path"
"strings"
"sync"
"time"
"bazil.org/fuse"
fusefs "bazil.org/fuse/fs"
"github.com/ncw/rclone/cmd/mountlib"
"github.com/ncw/rclone/fs"
"github.com/pkg/errors"
"golang.org/x/net/context"
@ -28,187 +27,13 @@ type DirEntry struct {
// Dir represents a directory entry
type Dir struct {
f fs.Fs
path string
modTime time.Time
mu sync.RWMutex // protects the following
read time.Time // time directory entry last read
items map[string]*DirEntry
}
func newDir(f fs.Fs, fsDir *fs.Dir) *Dir {
return &Dir{
f: f,
path: fsDir.Name,
modTime: fsDir.When,
}
}
// ForgetAll ensures the directory and all its children are purged
// from the cache.
func (d *Dir) ForgetAll() {
d.ForgetPath("")
}
// ForgetPath clears the cache for itself and all subdirectories if
// they match the given path. The path is specified relative from the
// directory it is called from.
// It is not possible to traverse the directory tree upwards, i.e.
// you cannot clear the cache for the Dir's ancestors or siblings.
func (d *Dir) ForgetPath(relativePath string) {
absPath := path.Join(d.path, relativePath)
if absPath == "." {
absPath = ""
}
d.walk(absPath, func(dir *Dir) {
fs.Debugf(dir.path, "forgetting directory cache")
dir.read = time.Time{}
dir.items = nil
})
}
// walk runs a function on all directories whose path matches
// the given absolute one. It will be called on a directory's
// children first. It will not apply the function to parent
// nodes, regardless of the given path.
func (d *Dir) walk(absPath string, fun func(*Dir)) {
if d.items != nil {
for _, entry := range d.items {
if dir, ok := entry.node.(*Dir); ok {
dir.walk(absPath, fun)
}
}
}
if d.path == absPath || absPath == "" || strings.HasPrefix(d.path, absPath+"/") {
d.mu.Lock()
defer d.mu.Unlock()
fun(d)
}
}
// rename should be called after the directory is renamed
//
// Reset the directory to new state, discarding all the objects and
// reading everything again
func (d *Dir) rename(newParent *Dir, fsDir *fs.Dir) {
d.ForgetAll()
d.path = fsDir.Name
d.modTime = fsDir.When
d.read = time.Time{}
}
// addObject adds a new object or directory to the directory
//
// note that we add new objects rather than updating old ones
func (d *Dir) addObject(o fs.BasicInfo, node fusefs.Node) *DirEntry {
item := &DirEntry{
o: o,
node: node,
}
d.mu.Lock()
d.items[path.Base(o.Remote())] = item
d.mu.Unlock()
return item
}
// delObject removes an object from the directory
func (d *Dir) delObject(leaf string) {
d.mu.Lock()
delete(d.items, leaf)
d.mu.Unlock()
}
// read the directory
func (d *Dir) readDir() error {
d.mu.Lock()
defer d.mu.Unlock()
when := time.Now()
if d.read.IsZero() {
fs.Debugf(d.path, "Reading directory")
} else {
age := when.Sub(d.read)
if age < dirCacheTime {
return nil
}
fs.Debugf(d.path, "Re-reading directory (%v old)", age)
}
entries, err := fs.ListDirSorted(d.f, false, d.path)
if err == fs.ErrorDirNotFound {
// We treat directory not found as empty because we
// create directories on the fly
} else if err != nil {
return err
}
// NB when we re-read a directory after its cache has expired
// we drop the old files which should lead to correct
// behaviour but may not be very efficient.
// Keep a note of the previous contents of the directory
oldItems := d.items
// Cache the items by name
d.items = make(map[string]*DirEntry, len(entries))
for _, entry := range entries {
switch item := entry.(type) {
case fs.Object:
obj := item
name := path.Base(obj.Remote())
d.items[name] = &DirEntry{
o: obj,
node: nil,
}
case *fs.Dir:
dir := item
name := path.Base(dir.Remote())
// Use old dir value if it exists
if oldItem, ok := oldItems[name]; ok {
if _, ok := oldItem.o.(*fs.Dir); ok {
d.items[name] = oldItem
continue
}
}
d.items[name] = &DirEntry{
o: dir,
node: nil,
}
default:
err = errors.Errorf("unknown type %T", item)
fs.Errorf(d.path, "readDir error: %v", err)
return err
}
}
d.read = when
return nil
}
// lookup a single item in the directory
//
// returns fuse.ENOENT if not found.
func (d *Dir) lookup(leaf string) (*DirEntry, error) {
err := d.readDir()
if err != nil {
return nil, err
}
d.mu.RLock()
item, ok := d.items[leaf]
d.mu.RUnlock()
if !ok {
return nil, fuse.ENOENT
}
return item, nil
}
// Check to see if a directory is empty
func (d *Dir) isEmpty() (bool, error) {
err := d.readDir()
if err != nil {
return false, err
}
d.mu.RLock()
defer d.mu.RUnlock()
return len(d.items) == 0, nil
*mountlib.Dir
// f fs.Fs
// path string
// modTime time.Time
// mu sync.RWMutex // protects the following
// read time.Time // time directory entry last read
// items map[string]*DirEntry
}
// Check interface satsified
@ -219,12 +44,13 @@ func (d *Dir) Attr(ctx context.Context, a *fuse.Attr) error {
a.Gid = gid
a.Uid = uid
a.Mode = os.ModeDir | dirPerms
a.Atime = d.modTime
a.Mtime = d.modTime
a.Ctime = d.modTime
a.Crtime = d.modTime
modTime := d.ModTime()
a.Atime = modTime
a.Mtime = modTime
a.Ctime = modTime
a.Crtime = modTime
// FIXME include Valid so get some caching?
fs.Debugf(d.path, "Dir.Attr %+v", a)
// FIXME fs.Debugf(d.path, "Dir.Attr %+v", a)
return nil
}
@ -232,46 +58,18 @@ func (d *Dir) Attr(ctx context.Context, a *fuse.Attr) error {
var _ fusefs.NodeSetattrer = (*Dir)(nil)
// Setattr handles attribute changes from FUSE. Currently supports ModTime only.
func (d *Dir) Setattr(ctx context.Context, req *fuse.SetattrRequest, resp *fuse.SetattrResponse) error {
func (d *Dir) Setattr(ctx context.Context, req *fuse.SetattrRequest, resp *fuse.SetattrResponse) (err error) {
if noModTime {
return nil
}
d.mu.Lock()
defer d.mu.Unlock()
if req.Valid.MtimeNow() {
d.modTime = time.Now()
err = d.SetModTime(time.Now())
} else if req.Valid.Mtime() {
d.modTime = req.Mtime
err = d.SetModTime(req.Mtime)
}
return nil
}
// lookupNode calls lookup then makes sure the node is not nil in the DirEntry
func (d *Dir) lookupNode(leaf string) (item *DirEntry, err error) {
item, err = d.lookup(leaf)
if err != nil {
return nil, err
}
if item.node != nil {
return item, nil
}
var node fusefs.Node
switch x := item.o.(type) {
case fs.Object:
node, err = newFile(d, x), nil
case *fs.Dir:
node, err = newDir(d.f, x), nil
default:
err = errors.Errorf("unknown type %T", item)
}
if err != nil {
return nil, err
}
item = d.addObject(item.o, node)
return item, nil
return translateError(err)
}
// Check interface satisfied
@ -284,17 +82,17 @@ var _ fusefs.NodeRequestLookuper = (*Dir)(nil)
//
// Lookup need not to handle the names "." and "..".
func (d *Dir) Lookup(ctx context.Context, req *fuse.LookupRequest, resp *fuse.LookupResponse) (node fusefs.Node, err error) {
path := path.Join(d.path, req.Name)
fs.Debugf(path, "Dir.Lookup")
item, err := d.lookupNode(req.Name)
mnode, err := d.Dir.Lookup(req.Name)
if err != nil {
if err != fuse.ENOENT {
fs.Errorf(path, "Dir.Lookup error: %v", err)
}
return nil, err
return nil, translateError(err)
}
fs.Debugf(path, "Dir.Lookup OK")
return item.node, nil
switch x := mnode.(type) {
case *mountlib.File:
return &File{x}, nil
case *mountlib.Dir:
return &Dir{x}, nil
}
panic("bad type")
}
// Check interface satisfied
@ -302,17 +100,13 @@ var _ fusefs.HandleReadDirAller = (*Dir)(nil)
// ReadDirAll reads the contents of the directory
func (d *Dir) ReadDirAll(ctx context.Context) (dirents []fuse.Dirent, err error) {
fs.Debugf(d.path, "Dir.ReadDirAll")
err = d.readDir()
items, err := d.Dir.ReadDirAll()
if err != nil {
fs.Debugf(d.path, "Dir.ReadDirAll error: %v", err)
return nil, err
return nil, translateError(err)
}
d.mu.RLock()
defer d.mu.RUnlock()
for _, item := range d.items {
for _, item := range items {
var dirent fuse.Dirent
switch x := item.o.(type) {
switch x := item.Obj.(type) {
case fs.Object:
dirent = fuse.Dirent{
// Inode FIXME ???
@ -326,13 +120,10 @@ func (d *Dir) ReadDirAll(ctx context.Context) (dirents []fuse.Dirent, err error)
Name: path.Base(x.Remote()),
}
default:
err = errors.Errorf("unknown type %T", item)
fs.Errorf(d.path, "Dir.ReadDirAll error: %v", err)
return nil, err
return nil, errors.Errorf("unknown type %T", item)
}
dirents = append(dirents, dirent)
}
fs.Debugf(d.path, "Dir.ReadDirAll OK with %d entries", len(dirents))
return dirents, nil
}
@ -340,39 +131,22 @@ var _ fusefs.NodeCreater = (*Dir)(nil)
// Create makes a new file
func (d *Dir) Create(ctx context.Context, req *fuse.CreateRequest, resp *fuse.CreateResponse) (fusefs.Node, fusefs.Handle, error) {
path := path.Join(d.path, req.Name)
fs.Debugf(path, "Dir.Create")
src := newCreateInfo(d.f, path)
// This gets added to the directory when the file is written
file := newFile(d, nil)
fh, err := newWriteFileHandle(d, file, src)
file, fh, err := d.Dir.Create(req.Name)
if err != nil {
fs.Errorf(path, "Dir.Create error: %v", err)
return nil, nil, err
return nil, nil, translateError(err)
}
fs.Debugf(path, "Dir.Create OK")
return file, fh, nil
return &File{file}, &WriteFileHandle{fh}, err
}
var _ fusefs.NodeMkdirer = (*Dir)(nil)
// Mkdir creates a new directory
func (d *Dir) Mkdir(ctx context.Context, req *fuse.MkdirRequest) (fusefs.Node, error) {
path := path.Join(d.path, req.Name)
fs.Debugf(path, "Dir.Mkdir")
err := d.f.Mkdir(path)
dir, err := d.Dir.Mkdir(req.Name)
if err != nil {
fs.Errorf(path, "Dir.Mkdir failed to create directory: %v", err)
return nil, err
return nil, translateError(err)
}
fsDir := &fs.Dir{
Name: path,
When: time.Now(),
}
dir := newDir(d.f, fsDir)
d.addObject(fsDir, dir)
fs.Debugf(path, "Dir.Mkdir OK")
return dir, nil
return &Dir{dir}, nil
}
var _ fusefs.NodeRemover = (*Dir)(nil)
@ -381,46 +155,10 @@ var _ fusefs.NodeRemover = (*Dir)(nil)
// the receiver, which must be a directory. The entry to be removed
// may correspond to a file (unlink) or to a directory (rmdir).
func (d *Dir) Remove(ctx context.Context, req *fuse.RemoveRequest) error {
path := path.Join(d.path, req.Name)
fs.Debugf(path, "Dir.Remove")
item, err := d.lookupNode(req.Name)
err := d.Dir.Remove(req.Name)
if err != nil {
fs.Errorf(path, "Dir.Remove error: %v", err)
return err
return translateError(err)
}
switch x := item.o.(type) {
case fs.Object:
err = x.Remove()
if err != nil {
fs.Errorf(path, "Dir.Remove file error: %v", err)
return err
}
case *fs.Dir:
// Check directory is empty first
dir := item.node.(*Dir)
empty, err := dir.isEmpty()
if err != nil {
fs.Errorf(path, "Dir.Remove dir error: %v", err)
return err
}
if !empty {
// return fuse.ENOTEMPTY - doesn't exist though so use EEXIST
fs.Errorf(path, "Dir.Remove not empty")
return fuse.EEXIST
}
// remove directory
err = d.f.Rmdir(path)
if err != nil {
fs.Errorf(path, "Dir.Remove failed to remove directory: %v", err)
return err
}
default:
fs.Errorf(path, "Dir.Remove unknown type %T", item)
return errors.Errorf("unknown type %T", item)
}
// Remove the item from the directory listing
d.delObject(req.Name)
fs.Debugf(path, "Dir.Remove OK")
return nil
}
@ -429,82 +167,16 @@ var _ fusefs.NodeRenamer = (*Dir)(nil)
// Rename the file
func (d *Dir) Rename(ctx context.Context, req *fuse.RenameRequest, newDir fusefs.Node) error {
oldPath := path.Join(d.path, req.OldName)
destDir, ok := newDir.(*Dir)
if !ok {
err := errors.Errorf("Unknown Dir type %T", newDir)
fs.Errorf(oldPath, "Dir.Rename error: %v", err)
return err
return errors.Errorf("Unknown Dir type %T", newDir)
}
newPath := path.Join(destDir.path, req.NewName)
fs.Debugf(oldPath, "Dir.Rename to %q", newPath)
oldItem, err := d.lookupNode(req.OldName)
err := d.Dir.Rename(req.OldName, req.NewName, destDir.Dir)
if err != nil {
fs.Errorf(oldPath, "Dir.Rename error: %v", err)
return err
}
var newObj fs.BasicInfo
oldNode := oldItem.node
switch x := oldItem.o.(type) {
case fs.Object:
oldObject := x
// FIXME: could Copy then Delete if Move not available
// - though care needed if case insensitive...
doMove := d.f.Features().Move
if doMove == nil {
err := errors.Errorf("Fs %q can't rename files (no Move)", d.f)
fs.Errorf(oldPath, "Dir.Rename error: %v", err)
return err
}
newObject, err := doMove(oldObject, newPath)
if err != nil {
fs.Errorf(oldPath, "Dir.Rename error: %v", err)
return err
}
newObj = newObject
// Update the node with the new details
if oldNode != nil {
if oldFile, ok := oldNode.(*File); ok {
fs.Debugf(oldItem.o, "Updating file with %v %p", newObject, oldFile)
oldFile.rename(destDir, newObject)
}
}
case *fs.Dir:
doDirMove := d.f.Features().DirMove
if doDirMove == nil {
err := errors.Errorf("Fs %q can't rename directories (no DirMove)", d.f)
fs.Errorf(oldPath, "Dir.Rename error: %v", err)
return err
}
srcRemote := x.Name
dstRemote := newPath
err = doDirMove(d.f, srcRemote, dstRemote)
if err != nil {
fs.Errorf(oldPath, "Dir.Rename error: %v", err)
return err
}
newDir := new(fs.Dir)
*newDir = *x
newDir.Name = newPath
newObj = newDir
// Update the node with the new details
if oldNode != nil {
if oldDir, ok := oldNode.(*Dir); ok {
fs.Debugf(oldItem.o, "Updating dir with %v %p", newDir, oldDir)
oldDir.rename(destDir, newDir)
}
}
default:
err = errors.Errorf("unknown type %T", oldItem)
fs.Errorf(d.path, "Dir.ReadDirAll error: %v", err)
return err
return translateError(err)
}
// Show moved - delete from old dir and add to new
d.delObject(req.OldName)
destDir.addObject(newObj, oldNode)
fs.Debugf(newPath, "Dir.Rename renamed from %q", oldPath)
return nil
}
@ -512,8 +184,10 @@ func (d *Dir) Rename(ctx context.Context, req *fuse.RenameRequest, newDir fusefs
var _ fusefs.NodeFsyncer = (*Dir)(nil)
// Fsync the directory
//
// Note that we don't do anything except return OK
func (d *Dir) Fsync(ctx context.Context, req *fuse.FsyncRequest) error {
err := d.Dir.Fsync()
if err != nil {
return translateError(err)
}
return nil
}

View file

@ -3,48 +3,24 @@
package mount
import (
"sync"
"sync/atomic"
"time"
"bazil.org/fuse"
fusefs "bazil.org/fuse/fs"
"github.com/ncw/rclone/fs"
"github.com/ncw/rclone/cmd/mountlib"
"github.com/pkg/errors"
"golang.org/x/net/context"
)
// File represents a file
type File struct {
size int64 // size of file - read and written with atomic int64 - must be 64 bit aligned
d *Dir // parent directory - read only
mu sync.RWMutex // protects the following
o fs.Object // NB o may be nil if file is being written
writers int // number of writers for this file
pendingModTime time.Time // will be applied once o becomes available, i.e. after file was written
}
// newFile creates a new File
func newFile(d *Dir, o fs.Object) *File {
return &File{
d: d,
o: o,
}
}
// rename should be called to update f.o and f.d after a rename
func (f *File) rename(d *Dir, o fs.Object) {
f.mu.Lock()
f.o = o
f.d = d
f.mu.Unlock()
}
// addWriters increments or decrements the writers
func (f *File) addWriters(n int) {
f.mu.Lock()
f.writers += n
f.mu.Unlock()
*mountlib.File
// size int64 // size of file - read and written with atomic int64 - must be 64 bit aligned
// d *Dir // parent directory - read only
// mu sync.RWMutex // protects the following
// o fs.Object // NB o may be nil if file is being written
// writers int // number of writers for this file
// pendingModTime time.Time // will be applied once o becomes available, i.e. after file was written
}
// Check interface satisfied
@ -52,32 +28,19 @@ var _ fusefs.Node = (*File)(nil)
// Attr fills out the attributes for the file
func (f *File) Attr(ctx context.Context, a *fuse.Attr) error {
f.mu.Lock()
defer f.mu.Unlock()
modTime, Size, Blocks, err := f.File.Attr(noModTime)
if err != nil {
return translateError(err)
}
a.Gid = gid
a.Uid = uid
a.Mode = filePerms
// if o is nil it isn't valid yet, so return the size so far
if f.o == nil {
a.Size = uint64(atomic.LoadInt64(&f.size))
if !noModTime && !f.pendingModTime.IsZero() {
a.Atime = f.pendingModTime
a.Mtime = f.pendingModTime
a.Ctime = f.pendingModTime
a.Crtime = f.pendingModTime
}
} else {
a.Size = uint64(f.o.Size())
if !noModTime {
modTime := f.o.ModTime()
a.Atime = modTime
a.Mtime = modTime
a.Ctime = modTime
a.Crtime = modTime
}
}
a.Blocks = (a.Size + 511) / 512
fs.Debugf(f.o, "File.Attr %+v", a)
a.Size = Size
a.Atime = modTime
a.Mtime = modTime
a.Ctime = modTime
a.Crtime = modTime
a.Blocks = Blocks
return nil
}
@ -89,83 +52,13 @@ func (f *File) Setattr(ctx context.Context, req *fuse.SetattrRequest, resp *fuse
if noModTime {
return nil
}
f.mu.Lock()
defer f.mu.Unlock()
var err error
if req.Valid.MtimeNow() {
f.pendingModTime = time.Now()
err = f.File.SetModTime(time.Now())
} else if req.Valid.Mtime() {
f.pendingModTime = req.Mtime
err = f.File.SetModTime(req.Mtime)
}
if f.o != nil {
return f.applyPendingModTime()
}
// queue up for later, hoping f.o becomes available
return nil
}
// call with the mutex held
func (f *File) applyPendingModTime() error {
defer func() { f.pendingModTime = time.Time{} }()
if f.pendingModTime.IsZero() {
return nil
}
if f.o == nil {
return errors.New("Cannot apply ModTime, file object is not available")
}
err := f.o.SetModTime(f.pendingModTime)
switch err {
case nil:
fs.Debugf(f.o, "File.applyPendingModTime OK")
case fs.ErrorCantSetModTime:
// do nothing, in order to not break "touch somefile" if it exists already
default:
fs.Errorf(f.o, "File.applyPendingModTime error: %v", err)
return err
}
return nil
}
// Update the size while writing
func (f *File) written(n int64) {
atomic.AddInt64(&f.size, n)
}
// Update the object when written
func (f *File) setObject(o fs.Object) {
f.mu.Lock()
defer f.mu.Unlock()
f.o = o
_ = f.applyPendingModTime()
f.d.addObject(o, f)
}
// Wait for f.o to become non nil for a short time returning it or an
// error
//
// Call without the mutex held
func (f *File) waitForValidObject() (o fs.Object, err error) {
for i := 0; i < 50; i++ {
f.mu.Lock()
o = f.o
writers := f.writers
f.mu.Unlock()
if o != nil {
return o, nil
}
if writers == 0 {
return nil, errors.New("can't open file - writer failed")
}
time.Sleep(100 * time.Millisecond)
}
return nil, fuse.ENOENT
return translateError(err)
}
// Check interface satisfied
@ -173,25 +66,19 @@ var _ fusefs.NodeOpener = (*File)(nil)
// Open the file for read or write
func (f *File) Open(ctx context.Context, req *fuse.OpenRequest, resp *fuse.OpenResponse) (fh fusefs.Handle, err error) {
// if o is nil it isn't valid yet
o, err := f.waitForValidObject()
if err != nil {
return nil, err
}
fs.Debugf(o, "File.Open %v", req.Flags)
switch {
case req.Flags.IsReadOnly():
if noSeek {
resp.Flags |= fuse.OpenNonSeekable
}
fh, err = newReadFileHandle(o)
err = errors.Wrap(err, "open for read")
var rfh *mountlib.ReadFileHandle
rfh, err = f.File.OpenRead()
fh = &ReadFileHandle{rfh}
case req.Flags.IsWriteOnly() || (req.Flags.IsReadWrite() && (req.Flags&fuse.OpenTruncate) != 0):
resp.Flags |= fuse.OpenNonSeekable
src := newCreateInfo(f.d.f, o.Remote())
fh, err = newWriteFileHandle(f.d, f, src)
err = errors.Wrap(err, "open for write")
var wfh *mountlib.WriteFileHandle
wfh, err = f.File.OpenWrite()
fh = &WriteFileHandle{wfh}
case req.Flags.IsReadWrite():
err = errors.New("can't open for read and write simultaneously")
default:
@ -211,8 +98,7 @@ func (f *File) Open(ctx context.Context, req *fuse.OpenRequest, resp *fuse.OpenR
*/
if err != nil {
fs.Errorf(o, "File.Open failed: %v", err)
return nil, err
return nil, translateError(err)
}
return fh, nil
}

View file

@ -5,114 +5,42 @@
package mount
import (
"os"
"os/signal"
"syscall"
"time"
"bazil.org/fuse"
fusefs "bazil.org/fuse/fs"
"github.com/ncw/rclone/cmd/mountlib"
"github.com/ncw/rclone/fs"
"github.com/pkg/errors"
"golang.org/x/net/context"
)
// FS represents the top level filing system
type FS struct {
f fs.Fs
rootDir *Dir
*mountlib.FS
f fs.Fs
}
// Check interface satistfied
var _ fusefs.FS = (*FS)(nil)
// NewFS makes a new FS
func NewFS(f fs.Fs) *FS {
fsys := &FS{
FS: mountlib.NewFS(f),
f: f,
}
if noSeek {
fsys.FS.NoSeek()
}
return fsys
}
// Root returns the root node
func (f *FS) Root() (fusefs.Node, error) {
fs.Debugf(f.f, "Root()")
if f.rootDir == nil {
fsDir := &fs.Dir{
Name: "",
When: time.Now(),
}
f.rootDir = newDir(f.f, fsDir)
}
return f.rootDir, nil
}
// mountOptions configures the options from the command line flags
func mountOptions(device string) (options []fuse.MountOption) {
options = []fuse.MountOption{
fuse.MaxReadahead(uint32(maxReadAhead)),
fuse.Subtype("rclone"),
fuse.FSName(device), fuse.VolumeName(device),
fuse.NoAppleDouble(),
fuse.NoAppleXattr(),
// Options from benchmarking in the fuse module
//fuse.MaxReadahead(64 * 1024 * 1024),
//fuse.AsyncRead(), - FIXME this causes
// ReadFileHandle.Read error: read /home/files/ISOs/xubuntu-15.10-desktop-amd64.iso: bad file descriptor
// which is probably related to errors people are having
//fuse.WritebackCache(),
}
if allowNonEmpty {
options = append(options, fuse.AllowNonEmptyMount())
}
if allowOther {
options = append(options, fuse.AllowOther())
}
if allowRoot {
options = append(options, fuse.AllowRoot())
}
if defaultPermissions {
options = append(options, fuse.DefaultPermissions())
}
if readOnly {
options = append(options, fuse.ReadOnly())
}
if writebackCache {
options = append(options, fuse.WritebackCache())
}
return options
}
// mount the file system
//
// The mount point will be ready when this returns.
//
// returns an error, and an error channel for the serve process to
// report an error when fusermount is called.
func mount(f fs.Fs, mountpoint string) (*FS, <-chan error, error) {
fs.Debugf(f, "Mounting on %q", mountpoint)
filesys := &FS{
f: f,
}
c, err := fuse.Mount(mountpoint, mountOptions(f.Name()+":"+f.Root())...)
root, err := f.FS.Root()
if err != nil {
return filesys, nil, err
return nil, translateError(err)
}
server := fusefs.New(c, nil)
// Serve the mount point in the background returning error to errChan
errChan := make(chan error, 1)
go func() {
err := server.Serve(filesys)
closeErr := c.Close()
if err == nil {
err = closeErr
}
errChan <- err
}()
// check if the mount process has an error to report
<-c.Ready
if err := c.MountError; err != nil {
return filesys, nil, err
}
filesys.startSignalHandler()
return filesys, errChan, nil
return &Dir{root}, nil
}
// Check interface satsified
@ -134,15 +62,21 @@ func (f *FS) Statfs(ctx context.Context, req *fuse.StatfsRequest, resp *fuse.Sta
return nil
}
func (f *FS) startSignalHandler() {
sigChan := make(chan os.Signal, 1)
signal.Notify(sigChan, syscall.SIGHUP)
go func() {
for {
<-sigChan
if f.rootDir != nil {
f.rootDir.ForgetAll()
}
// Translate errors from mountlib
func translateError(err error) error {
if err == nil {
return nil
}
cause := errors.Cause(err)
if mErr, ok := cause.(mountlib.Error); ok {
switch mErr {
case mountlib.ENOENT:
return fuse.ENOENT
case mountlib.ENOTEMPTY:
return fuse.EEXIST // return fuse.ENOTEMPTY - doesn't exist though so use EEXIST
case mountlib.EEXIST:
return fuse.EEXIST
}
}()
}
return err
}

View file

@ -12,6 +12,7 @@ import (
"time"
"bazil.org/fuse"
fusefs "bazil.org/fuse/fs"
"github.com/ncw/rclone/cmd"
"github.com/ncw/rclone/fs"
"github.com/pkg/errors"
@ -154,6 +155,83 @@ like this:
},
}
// mountOptions configures the options from the command line flags
func mountOptions(device string) (options []fuse.MountOption) {
options = []fuse.MountOption{
fuse.MaxReadahead(uint32(maxReadAhead)),
fuse.Subtype("rclone"),
fuse.FSName(device), fuse.VolumeName(device),
fuse.NoAppleDouble(),
fuse.NoAppleXattr(),
// Options from benchmarking in the fuse module
//fuse.MaxReadahead(64 * 1024 * 1024),
//fuse.AsyncRead(), - FIXME this causes
// ReadFileHandle.Read error: read /home/files/ISOs/xubuntu-15.10-desktop-amd64.iso: bad file descriptor
// which is probably related to errors people are having
//fuse.WritebackCache(),
}
if allowNonEmpty {
options = append(options, fuse.AllowNonEmptyMount())
}
if allowOther {
options = append(options, fuse.AllowOther())
}
if allowRoot {
options = append(options, fuse.AllowRoot())
}
if defaultPermissions {
options = append(options, fuse.DefaultPermissions())
}
if readOnly {
options = append(options, fuse.ReadOnly())
}
if writebackCache {
options = append(options, fuse.WritebackCache())
}
return options
}
// mount the file system
//
// The mount point will be ready when this returns.
//
// returns an error, and an error channel for the serve process to
// report an error when fusermount is called.
func mount(f fs.Fs, mountpoint string) (<-chan error, func() error, error) {
fs.Debugf(f, "Mounting on %q", mountpoint)
c, err := fuse.Mount(mountpoint, mountOptions(f.Name()+":"+f.Root())...)
if err != nil {
return nil, nil, err
}
filesys := NewFS(f)
server := fusefs.New(c, nil)
// Serve the mount point in the background returning error to errChan
errChan := make(chan error, 1)
go func() {
err := server.Serve(filesys)
closeErr := c.Close()
if err == nil {
err = closeErr
}
errChan <- err
}()
// check if the mount process has an error to report
<-c.Ready
if err := c.MountError; err != nil {
return nil, nil, err
}
unmount := func() error {
return fuse.Unmount(mountpoint)
}
return errChan, unmount, nil
}
// Mount mounts the remote at mountpoint.
//
// If noModTime is set then it
@ -175,7 +253,7 @@ func Mount(f fs.Fs, mountpoint string) error {
}
// Mount it
_, errChan, err := mount(f, mountpoint)
errChan, unmount, err := mount(f, mountpoint)
if err != nil {
return errors.Wrap(err, "failed to mount FUSE fs")
}
@ -189,7 +267,7 @@ func Mount(f fs.Fs, mountpoint string) error {
break
// Program abort: umount
case <-sigChan:
err = fuse.Unmount(mountpoint)
err = unmount()
}
if err != nil {

28
cmd/mount/mount_test.go Normal file
View file

@ -0,0 +1,28 @@
package mount
import (
"testing"
"github.com/ncw/rclone/cmd/mountlib/mounttest"
)
func TestMain(m *testing.M) { mounttest.TestMain(m, mount, dirPerms, filePerms) }
func TestDirLs(t *testing.T) { mounttest.TestDirLs(t) }
func TestDirCreateAndRemoveDir(t *testing.T) { mounttest.TestDirCreateAndRemoveDir(t) }
func TestDirCreateAndRemoveFile(t *testing.T) { mounttest.TestDirCreateAndRemoveFile(t) }
func TestDirRenameFile(t *testing.T) { mounttest.TestDirRenameFile(t) }
func TestDirRenameEmptyDir(t *testing.T) { mounttest.TestDirRenameEmptyDir(t) }
func TestDirRenameFullDir(t *testing.T) { mounttest.TestDirRenameFullDir(t) }
func TestDirModTime(t *testing.T) { mounttest.TestDirModTime(t) }
func TestFileModTime(t *testing.T) { mounttest.TestFileModTime(t) }
func TestFileModTimeWithOpenWriters(t *testing.T) { mounttest.TestFileModTimeWithOpenWriters(t) }
func TestMount(t *testing.T) { mounttest.TestMount(t) }
func TestRoot(t *testing.T) { mounttest.TestRoot(t) }
func TestReadByByte(t *testing.T) { mounttest.TestReadByByte(t) }
func TestReadFileDoubleClose(t *testing.T) { mounttest.TestReadFileDoubleClose(t) }
func TestReadSeek(t *testing.T) { mounttest.TestReadSeek(t) }
func TestWriteFileNoWrite(t *testing.T) { mounttest.TestWriteFileNoWrite(t) }
func TestWriteFileWrite(t *testing.T) { mounttest.TestWriteFileWrite(t) }
func TestWriteFileOverwrite(t *testing.T) { mounttest.TestWriteFileOverwrite(t) }
func TestWriteFileDoubleClose(t *testing.T) { mounttest.TestWriteFileDoubleClose(t) }
func TestWriteFileFsync(t *testing.T) { mounttest.TestWriteFileFsync(t) }

View file

@ -3,48 +3,21 @@
package mount
import (
"io"
"sync"
"bazil.org/fuse"
fusefs "bazil.org/fuse/fs"
"github.com/ncw/rclone/fs"
"github.com/pkg/errors"
"github.com/ncw/rclone/cmd/mountlib"
"golang.org/x/net/context"
)
// ReadFileHandle is an open for read file handle on a File
type ReadFileHandle struct {
mu sync.Mutex
closed bool // set if handle has been closed
r *fs.Account
o fs.Object
readCalled bool // set if read has been called
offset int64
hash *fs.MultiHasher
}
func newReadFileHandle(o fs.Object) (*ReadFileHandle, error) {
r, err := o.Open()
if err != nil {
return nil, err
}
var hash *fs.MultiHasher
if !noChecksum {
hash, err = fs.NewMultiHasherTypes(o.Fs().Hashes())
if err != nil {
fs.Errorf(o.Fs(), "newReadFileHandle hash error: %v", err)
}
}
fh := &ReadFileHandle{
o: o,
r: fs.NewAccount(r, o).WithBuffer(), // account the transfer
hash: hash,
}
fs.Stats.Transferring(fh.o.Remote())
return fh, nil
*mountlib.ReadFileHandle
// mu sync.Mutex
// closed bool // set if handle has been closed
// r *fs.Account
// o fs.Object
// readCalled bool // set if read has been called
// offset int64
}
// Check interface satisfied
@ -53,173 +26,24 @@ var _ fusefs.Handle = (*ReadFileHandle)(nil)
// Check interface satisfied
var _ fusefs.HandleReader = (*ReadFileHandle)(nil)
// seek to a new offset
//
// if reopen is true, then we won't attempt to use an io.Seeker interface
//
// Must be called with fh.mu held
func (fh *ReadFileHandle) seek(offset int64, reopen bool) (err error) {
fh.r.StopBuffering() // stop the background reading first
fh.hash = nil
oldReader := fh.r.GetReader()
r := oldReader
// Can we seek it directly?
if do, ok := oldReader.(io.Seeker); !reopen && ok {
fs.Debugf(fh.o, "ReadFileHandle.seek from %d to %d (io.Seeker)", fh.offset, offset)
_, err = do.Seek(offset, 0)
if err != nil {
fs.Debugf(fh.o, "ReadFileHandle.Read io.Seeker failed: %v", err)
return err
}
} else {
fs.Debugf(fh.o, "ReadFileHandle.seek from %d to %d", fh.offset, offset)
// close old one
err = oldReader.Close()
if err != nil {
fs.Debugf(fh.o, "ReadFileHandle.Read seek close old failed: %v", err)
}
// re-open with a seek
r, err = fh.o.Open(&fs.SeekOption{Offset: offset})
if err != nil {
fs.Debugf(fh.o, "ReadFileHandle.Read seek failed: %v", err)
return err
}
}
fh.r.UpdateReader(r)
fh.offset = offset
return nil
}
// Read from the file handle
func (fh *ReadFileHandle) Read(ctx context.Context, req *fuse.ReadRequest, resp *fuse.ReadResponse) (err error) {
fh.mu.Lock()
defer fh.mu.Unlock()
fs.Debugf(fh.o, "ReadFileHandle.Read size %d offset %d", req.Size, req.Offset)
if fh.closed {
fs.Errorf(fh.o, "ReadFileHandle.Read error: %v", errClosedFileHandle)
return errClosedFileHandle
}
doSeek := req.Offset != fh.offset
var n int
var newOffset int64
retries := 0
buf := make([]byte, req.Size)
doReopen := false
for {
if doSeek {
// Are we attempting to seek beyond the end of the
// file - if so just return EOF leaving the underlying
// file in an unchanged state.
if req.Offset >= fh.o.Size() {
fs.Debugf(fh.o, "ReadFileHandle.Read attempt to read beyond end of file: %d > %d", req.Offset, fh.o.Size())
resp.Data = nil
return nil
}
// Otherwise do the seek
err = fh.seek(req.Offset, doReopen)
} else {
err = nil
}
if err == nil {
if req.Size > 0 {
fh.readCalled = true
}
// One exception to the above is if we fail to fully populate a
// page cache page; a read into page cache is always page aligned.
// Make sure we never serve a partial read, to avoid that.
n, err = io.ReadFull(fh.r, buf)
newOffset = fh.offset + int64(n)
// if err == nil && rand.Intn(10) == 0 {
// err = errors.New("random error")
// }
if err == nil {
break
} else if (err == io.ErrUnexpectedEOF || err == io.EOF) && newOffset == fh.o.Size() {
// Have read to end of file - reset error
err = nil
break
}
}
if retries >= fs.Config.LowLevelRetries {
break
}
retries++
fs.Errorf(fh.o, "ReadFileHandle.Read error: low level retry %d/%d: %v", retries, fs.Config.LowLevelRetries, err)
doSeek = true
doReopen = true
}
data, err := fh.ReadFileHandle.Read(int64(req.Size), req.Offset)
if err != nil {
fs.Errorf(fh.o, "ReadFileHandle.Read error: %v", err)
} else {
resp.Data = buf[:n]
fh.offset = newOffset
fs.Debugf(fh.o, "ReadFileHandle.Read OK")
if fh.hash != nil {
_, err = fh.hash.Write(resp.Data)
if err != nil {
fs.Errorf(fh.o, "ReadFileHandle.Read HashError: %v", err)
return err
}
}
return translateError(err)
}
return err
}
// close the file handle returning errClosedFileHandle if it has been
// closed already.
//
// Must be called with fh.mu held
func (fh *ReadFileHandle) close() error {
if fh.closed {
return errClosedFileHandle
}
fh.closed = true
fs.Stats.DoneTransferring(fh.o.Remote(), true)
if err := fh.checkHash(); err != nil {
return err
}
return fh.r.Close()
resp.Data = data
return nil
}
// Check interface satisfied
var _ fusefs.HandleFlusher = (*ReadFileHandle)(nil)
func (fh *ReadFileHandle) checkHash() error {
if fh.hash == nil || !fh.readCalled || fh.offset < fh.o.Size() {
return nil
}
for hashType, dstSum := range fh.hash.Sums() {
srcSum, err := fh.o.Hash(hashType)
if err != nil {
return err
}
if !fs.HashEquals(dstSum, srcSum) {
return errors.Errorf("corrupted on transfer: %v hash differ %q vs %q", hashType, dstSum, srcSum)
}
}
return nil
}
// Flush is called each time the file or directory is closed.
// Because there can be multiple file descriptors referring to a
// single opened file, Flush can be called multiple times.
func (fh *ReadFileHandle) Flush(ctx context.Context, req *fuse.FlushRequest) error {
fh.mu.Lock()
defer fh.mu.Unlock()
fs.Debugf(fh.o, "ReadFileHandle.Flush")
if err := fh.checkHash(); err != nil {
fs.Errorf(fh.o, "ReadFileHandle.Flush error: %v", err)
return err
}
fs.Debugf(fh.o, "ReadFileHandle.Flush OK")
return nil
return translateError(fh.ReadFileHandle.Flush())
}
var _ fusefs.HandleReleaser = (*ReadFileHandle)(nil)
@ -229,18 +53,5 @@ var _ fusefs.HandleReleaser = (*ReadFileHandle)(nil)
// It isn't called directly from userspace so the error is ignored by
// the kernel
func (fh *ReadFileHandle) Release(ctx context.Context, req *fuse.ReleaseRequest) error {
fh.mu.Lock()
defer fh.mu.Unlock()
if fh.closed {
fs.Debugf(fh.o, "ReadFileHandle.Release nothing to do")
return nil
}
fs.Debugf(fh.o, "ReadFileHandle.Release closing")
err := fh.close()
if err != nil {
fs.Errorf(fh.o, "ReadFileHandle.Release error: %v", err)
} else {
fs.Debugf(fh.o, "ReadFileHandle.Release OK")
}
return err
return translateError(fh.ReadFileHandle.Release())
}

View file

@ -3,13 +3,11 @@
package mount
import (
"io"
"sync"
"errors"
"bazil.org/fuse"
fusefs "bazil.org/fuse/fs"
"github.com/ncw/rclone/fs"
"github.com/pkg/errors"
"github.com/ncw/rclone/cmd/mountlib"
"golang.org/x/net/context"
)
@ -17,120 +15,25 @@ var errClosedFileHandle = errors.New("Attempt to use closed file handle")
// WriteFileHandle is an open for write handle on a File
type WriteFileHandle struct {
mu sync.Mutex
closed bool // set if handle has been closed
remote string
pipeReader *io.PipeReader
pipeWriter *io.PipeWriter
o fs.Object
result chan error
file *File
writeCalled bool // set the first time Write() is called
hash *fs.MultiHasher
*mountlib.WriteFileHandle
}
// Check interface satisfied
var _ fusefs.Handle = (*WriteFileHandle)(nil)
func newWriteFileHandle(d *Dir, f *File, src fs.ObjectInfo) (*WriteFileHandle, error) {
var hash *fs.MultiHasher
if !noChecksum {
var err error
hash, err = fs.NewMultiHasherTypes(src.Fs().Hashes())
if err != nil {
fs.Errorf(src.Fs(), "newWriteFileHandle hash error: %v", err)
}
}
fh := &WriteFileHandle{
remote: src.Remote(),
result: make(chan error, 1),
file: f,
hash: hash,
}
fh.pipeReader, fh.pipeWriter = io.Pipe()
r := fs.NewAccountSizeName(fh.pipeReader, 0, src.Remote()).WithBuffer() // account the transfer
go func() {
o, err := d.f.Put(r, src)
fh.o = o
fh.result <- err
}()
fh.file.addWriters(1)
fs.Stats.Transferring(fh.remote)
return fh, nil
}
// Check interface satisfied
var _ fusefs.HandleWriter = (*WriteFileHandle)(nil)
// Write data to the file handle
func (fh *WriteFileHandle) Write(ctx context.Context, req *fuse.WriteRequest, resp *fuse.WriteResponse) error {
fs.Debugf(fh.remote, "WriteFileHandle.Write len=%d", len(req.Data))
fh.mu.Lock()
defer fh.mu.Unlock()
if fh.closed {
fs.Errorf(fh.remote, "WriteFileHandle.Write error: %v", errClosedFileHandle)
return errClosedFileHandle
}
fh.writeCalled = true
// FIXME should probably check the file isn't being seeked?
n, err := fh.pipeWriter.Write(req.Data)
resp.Size = n
fh.file.written(int64(n))
n, err := fh.WriteFileHandle.Write(req.Data, req.Offset)
if err != nil {
fs.Errorf(fh.remote, "WriteFileHandle.Write error: %v", err)
return err
}
fs.Debugf(fh.remote, "WriteFileHandle.Write OK (%d bytes written)", n)
if fh.hash != nil {
_, err = fh.hash.Write(req.Data)
if err != nil {
fs.Errorf(fh.remote, "WriteFileHandle.Write HashError: %v", err)
return err
}
return translateError(err)
}
resp.Size = int(n)
return nil
}
// close the file handle returning errClosedFileHandle if it has been
// closed already.
//
// Must be called with fh.mu held
func (fh *WriteFileHandle) close() error {
if fh.closed {
return errClosedFileHandle
}
fh.closed = true
fs.Stats.DoneTransferring(fh.remote, true)
fh.file.addWriters(-1)
writeCloseErr := fh.pipeWriter.Close()
err := <-fh.result
readCloseErr := fh.pipeReader.Close()
if err == nil {
fh.file.setObject(fh.o)
err = writeCloseErr
}
if err == nil {
err = readCloseErr
}
if err == nil && fh.hash != nil {
for hashType, srcSum := range fh.hash.Sums() {
dstSum, err := fh.o.Hash(hashType)
if err != nil {
return err
}
if !fs.HashEquals(srcSum, dstSum) {
return errors.Errorf("corrupted on transfer: %v hash differ %q vs %q", hashType, srcSum, dstSum)
}
}
}
return err
}
// Check interface satisfied
var _ fusefs.HandleFlusher = (*WriteFileHandle)(nil)
// Flush is called on each close() of a file descriptor. So if a
// filesystem wants to return write errors in close() and the file has
// cached dirty data, this is a good place to write back data and
@ -147,23 +50,7 @@ var _ fusefs.HandleFlusher = (*WriteFileHandle)(nil)
// Filesystems shouldn't assume that flush will always be called after
// some writes, or that if will be called at all.
func (fh *WriteFileHandle) Flush(ctx context.Context, req *fuse.FlushRequest) error {
fh.mu.Lock()
defer fh.mu.Unlock()
fs.Debugf(fh.remote, "WriteFileHandle.Flush")
// If Write hasn't been called then ignore the Flush - Release
// will pick it up
if !fh.writeCalled {
fs.Debugf(fh.remote, "WriteFileHandle.Flush ignoring flush on unwritten handle")
return nil
}
err := fh.close()
if err != nil {
fs.Errorf(fh.remote, "WriteFileHandle.Flush error: %v", err)
} else {
fs.Debugf(fh.remote, "WriteFileHandle.Flush OK")
}
return err
return translateError(fh.WriteFileHandle.Flush())
}
var _ fusefs.HandleReleaser = (*WriteFileHandle)(nil)
@ -173,18 +60,5 @@ var _ fusefs.HandleReleaser = (*WriteFileHandle)(nil)
// It isn't called directly from userspace so the error is ignored by
// the kernel
func (fh *WriteFileHandle) Release(ctx context.Context, req *fuse.ReleaseRequest) error {
fh.mu.Lock()
defer fh.mu.Unlock()
if fh.closed {
fs.Debugf(fh.remote, "WriteFileHandle.Release nothing to do")
return nil
}
fs.Debugf(fh.remote, "WriteFileHandle.Release closing")
err := fh.close()
if err != nil {
fs.Errorf(fh.remote, "WriteFileHandle.Release error: %v", err)
} else {
fs.Debugf(fh.remote, "WriteFileHandle.Release OK")
}
return err
return translateError(fh.WriteFileHandle.Release())
}

View file

@ -1,6 +1,4 @@
// +build linux darwin freebsd
package mount
package mountlib
import (
"time"

421
cmd/mountlib/dir.go Normal file
View file

@ -0,0 +1,421 @@
package mountlib
import (
"path"
"sync"
"time"
"github.com/ncw/rclone/fs"
"github.com/pkg/errors"
)
var dirCacheTime = 60 * time.Second // FIXME needs to be settable
// DirEntry describes the contents of a directory entry
//
// It can be a file or a directory
//
// node may be nil, but o may not
type DirEntry struct {
Obj fs.BasicInfo
Node Node
}
// Dir represents a directory entry
type Dir struct {
fsys *FS
inode uint64 // inode number
f fs.Fs
path string
modTime time.Time
mu sync.RWMutex // protects the following
read time.Time // time directory entry last read
items map[string]*DirEntry
}
func newDir(fsys *FS, f fs.Fs, fsDir *fs.Dir) *Dir {
return &Dir{
fsys: fsys,
f: f,
path: fsDir.Name,
modTime: fsDir.When,
inode: NewInode(),
}
}
// IsFile returns false for Dir - satisfies Node interface
func (d *Dir) IsFile() bool {
return false
}
// Inode returns the inode number - satisfies Node interface
func (d *Dir) Inode() uint64 {
return d.inode
}
// Node returns the Node assocuated with this - satisfies Noder interface
func (d *Dir) Node() Node {
return d
}
// rename should be called after the directory is renamed
//
// Reset the directory to new state, discarding all the objects and
// reading everything again
func (d *Dir) rename(newParent *Dir, fsDir *fs.Dir) {
d.path = fsDir.Name
d.modTime = fsDir.When
d.items = nil
d.read = time.Time{}
}
// addObject adds a new object or directory to the directory
//
// note that we add new objects rather than updating old ones
func (d *Dir) addObject(o fs.BasicInfo, node Node) *DirEntry {
item := &DirEntry{
Obj: o,
Node: node,
}
d.mu.Lock()
d.items[path.Base(o.Remote())] = item
d.mu.Unlock()
return item
}
// delObject removes an object from the directory
func (d *Dir) delObject(leaf string) {
d.mu.Lock()
delete(d.items, leaf)
d.mu.Unlock()
}
// read the directory
func (d *Dir) readDir() error {
d.mu.Lock()
defer d.mu.Unlock()
when := time.Now()
if d.read.IsZero() {
fs.Debugf(d.path, "Reading directory")
} else {
age := when.Sub(d.read)
if age < dirCacheTime {
return nil
}
fs.Debugf(d.path, "Re-reading directory (%v old)", age)
}
entries, err := fs.ListDirSorted(d.f, false, d.path)
if err == fs.ErrorDirNotFound {
// We treat directory not found as empty because we
// create directories on the fly
} else if err != nil {
return err
}
// NB when we re-read a directory after its cache has expired
// we drop the old files which should lead to correct
// behaviour but may not be very efficient.
// Keep a note of the previous contents of the directory
oldItems := d.items
// Cache the items by name
d.items = make(map[string]*DirEntry, len(entries))
for _, entry := range entries {
switch item := entry.(type) {
case fs.Object:
obj := item
name := path.Base(obj.Remote())
d.items[name] = &DirEntry{
Obj: obj,
Node: nil,
}
case *fs.Dir:
dir := item
name := path.Base(dir.Remote())
// Use old dir value if it exists
if oldItem, ok := oldItems[name]; ok {
if _, ok := oldItem.Obj.(*fs.Dir); ok {
d.items[name] = oldItem
continue
}
}
d.items[name] = &DirEntry{
Obj: dir,
Node: nil,
}
default:
err = errors.Errorf("unknown type %T", item)
fs.Errorf(d.path, "readDir error: %v", err)
return err
}
}
d.read = when
return nil
}
// lookup a single item in the directory
//
// returns ENOENT if not found.
func (d *Dir) lookup(leaf string) (*DirEntry, error) {
err := d.readDir()
if err != nil {
return nil, err
}
d.mu.RLock()
item, ok := d.items[leaf]
d.mu.RUnlock()
if !ok {
return nil, ENOENT
}
return item, nil
}
// Check to see if a directory is empty
func (d *Dir) isEmpty() (bool, error) {
err := d.readDir()
if err != nil {
return false, err
}
d.mu.RLock()
defer d.mu.RUnlock()
return len(d.items) == 0, nil
}
// ModTime returns the modification time of the directory
func (d *Dir) ModTime() time.Time {
fs.Debugf(d.path, "Dir.ModTime %v", d.modTime)
return d.modTime
}
// SetModTime sets the modTime for this dir
func (d *Dir) SetModTime(modTime time.Time) error {
d.mu.Lock()
defer d.mu.Unlock()
d.modTime = modTime
return nil
}
// lookupNode calls lookup then makes sure the node is not nil in the DirEntry
func (d *Dir) lookupNode(leaf string) (item *DirEntry, err error) {
item, err = d.lookup(leaf)
if err != nil {
return nil, err
}
if item.Node != nil {
return item, nil
}
var node Node
switch x := item.Obj.(type) {
case fs.Object:
node, err = newFile(d, x), nil
case *fs.Dir:
node, err = newDir(d.fsys, d.f, x), nil
default:
err = errors.Errorf("unknown type %T", item)
}
if err != nil {
return nil, err
}
item = d.addObject(item.Obj, node)
return item, nil
}
// Lookup looks up a specific entry in the receiver.
//
// Lookup should return a Node corresponding to the entry. If the
// name does not exist in the directory, Lookup should return ENOENT.
//
// Lookup need not to handle the names "." and "..".
func (d *Dir) Lookup(name string) (node Node, err error) {
path := path.Join(d.path, name)
fs.Debugf(path, "Dir.Lookup")
item, err := d.lookupNode(name)
if err != nil {
if err != ENOENT {
fs.Errorf(path, "Dir.Lookup error: %v", err)
}
return nil, err
}
fs.Debugf(path, "Dir.Lookup OK")
return item.Node, nil
}
// ReadDirAll reads the contents of the directory
func (d *Dir) ReadDirAll() (items []*DirEntry, err error) {
fs.Debugf(d.path, "Dir.ReadDirAll")
err = d.readDir()
if err != nil {
fs.Debugf(d.path, "Dir.ReadDirAll error: %v", err)
return nil, err
}
d.mu.RLock()
defer d.mu.RUnlock()
for _, item := range d.items {
items = append(items, item)
}
fs.Debugf(d.path, "Dir.ReadDirAll OK with %d entries", len(items))
return items, nil
}
// Create makes a new file
func (d *Dir) Create(name string) (*File, *WriteFileHandle, error) {
path := path.Join(d.path, name)
fs.Debugf(path, "Dir.Create")
src := newCreateInfo(d.f, path)
// This gets added to the directory when the file is written
file := newFile(d, nil)
fh, err := newWriteFileHandle(d, file, src)
if err != nil {
fs.Errorf(path, "Dir.Create error: %v", err)
return nil, nil, err
}
fs.Debugf(path, "Dir.Create OK")
return file, fh, nil
}
// Mkdir creates a new directory
func (d *Dir) Mkdir(name string) (*Dir, error) {
path := path.Join(d.path, name)
fs.Debugf(path, "Dir.Mkdir")
err := d.f.Mkdir(path)
if err != nil {
fs.Errorf(path, "Dir.Mkdir failed to create directory: %v", err)
return nil, err
}
fsDir := &fs.Dir{
Name: path,
When: time.Now(),
}
dir := newDir(d.fsys, d.f, fsDir)
d.addObject(fsDir, dir)
fs.Debugf(path, "Dir.Mkdir OK")
return dir, nil
}
// Remove removes the entry with the given name from
// the receiver, which must be a directory. The entry to be removed
// may correspond to a file (unlink) or to a directory (rmdir).
func (d *Dir) Remove(name string) error {
path := path.Join(d.path, name)
fs.Debugf(path, "Dir.Remove")
item, err := d.lookupNode(name)
if err != nil {
fs.Errorf(path, "Dir.Remove error: %v", err)
return err
}
switch x := item.Obj.(type) {
case fs.Object:
err = x.Remove()
if err != nil {
fs.Errorf(path, "Dir.Remove file error: %v", err)
return err
}
case *fs.Dir:
// Check directory is empty first
dir := item.Node.(*Dir)
empty, err := dir.isEmpty()
if err != nil {
fs.Errorf(path, "Dir.Remove dir error: %v", err)
return err
}
if !empty {
fs.Errorf(path, "Dir.Remove not empty")
return ENOTEMPTY
}
// remove directory
err = d.f.Rmdir(path)
if err != nil {
fs.Errorf(path, "Dir.Remove failed to remove directory: %v", err)
return err
}
default:
fs.Errorf(path, "Dir.Remove unknown type %T", item)
return errors.Errorf("unknown type %T", item)
}
// Remove the item from the directory listing
d.delObject(name)
fs.Debugf(path, "Dir.Remove OK")
return nil
}
// Rename the file
func (d *Dir) Rename(oldName, newName string, destDir *Dir) error {
oldPath := path.Join(d.path, oldName)
newPath := path.Join(destDir.path, newName)
fs.Debugf(oldPath, "Dir.Rename to %q", newPath)
oldItem, err := d.lookupNode(oldName)
if err != nil {
fs.Errorf(oldPath, "Dir.Rename error: %v", err)
return err
}
var newObj fs.BasicInfo
oldNode := oldItem.Node
switch x := oldItem.Obj.(type) {
case fs.Object:
oldObject := x
// FIXME: could Copy then Delete if Move not available
// - though care needed if case insensitive...
doMove := d.f.Features().Move
if doMove == nil {
err := errors.Errorf("Fs %q can't rename files (no Move)", d.f)
fs.Errorf(oldPath, "Dir.Rename error: %v", err)
return err
}
newObject, err := doMove(oldObject, newPath)
if err != nil {
fs.Errorf(oldPath, "Dir.Rename error: %v", err)
return err
}
newObj = newObject
// Update the node with the new details
if oldNode != nil {
if oldFile, ok := oldNode.(*File); ok {
fs.Debugf(oldItem.Obj, "Updating file with %v %p", newObject, oldFile)
oldFile.rename(destDir, newObject)
}
}
case *fs.Dir:
doDirMove := d.f.Features().DirMove
if doDirMove == nil {
err := errors.Errorf("Fs %q can't rename directories (no DirMove)", d.f)
fs.Errorf(oldPath, "Dir.Rename error: %v", err)
return err
}
srcRemote := x.Name
dstRemote := newPath
err = doDirMove(d.f, srcRemote, dstRemote)
if err != nil {
fs.Errorf(oldPath, "Dir.Rename error: %v", err)
return err
}
newDir := new(fs.Dir)
*newDir = *x
newDir.Name = newPath
newObj = newDir
// Update the node with the new details
if oldNode != nil {
if oldDir, ok := oldNode.(*Dir); ok {
fs.Debugf(oldItem.Obj, "Updating dir with %v %p", newDir, oldDir)
oldDir.rename(destDir, newDir)
}
}
default:
err = errors.Errorf("unknown type %T", oldItem)
fs.Errorf(d.path, "Dir.ReadDirAll error: %v", err)
return err
}
// Show moved - delete from old dir and add to new
d.delObject(oldName)
destDir.addObject(newObj, oldNode)
fs.Debugf(newPath, "Dir.Rename renamed from %q", oldPath)
return nil
}
// Fsync the directory
//
// Note that we don't do anything except return OK
func (d *Dir) Fsync() error {
return nil
}

35
cmd/mountlib/errors.go Normal file
View file

@ -0,0 +1,35 @@
// Cross platform errors
package mountlib
import "fmt"
// Error describes low level errors in a cross platform way
type Error byte
// Low level errors
const (
OK Error = iota
ENOENT
ENOTEMPTY
EEXIST
ESPIPE
EBADF
)
var errorNames = []string{
OK: "Success",
ENOENT: "No such file or directory",
ENOTEMPTY: "Directory not empty",
EEXIST: "File exists",
ESPIPE: "Illegal seek",
EBADF: "Bad file descriptor",
}
// Error renders the error as a string
func (e Error) Error() string {
if int(e) >= len(errorNames) {
return fmt.Sprintf("Low level error %d", e)
}
return errorNames[e]
}

203
cmd/mountlib/file.go Normal file
View file

@ -0,0 +1,203 @@
package mountlib
import (
"sync"
"sync/atomic"
"time"
"github.com/ncw/rclone/fs"
"github.com/pkg/errors"
)
// File represents a file
type File struct {
inode uint64 // inode number
size int64 // size of file - read and written with atomic int64 - must be 64 bit aligned
d *Dir // parent directory - read only
mu sync.RWMutex // protects the following
o fs.Object // NB o may be nil if file is being written
writers int // number of writers for this file
pendingModTime time.Time // will be applied once o becomes available, i.e. after file was written
}
// newFile creates a new File
func newFile(d *Dir, o fs.Object) *File {
return &File{
d: d,
o: o,
inode: NewInode(),
}
}
// IsFile returns true for File - satisfies Node interface
func (f *File) IsFile() bool {
return true
}
// Inode returns the inode number - satisfies Node interface
func (f *File) Inode() uint64 {
return f.inode
}
// Node returns the Node assocuated with this - satisfies Noder interface
func (f *File) Node() Node {
return f
}
// rename should be called to update f.o and f.d after a rename
func (f *File) rename(d *Dir, o fs.Object) {
f.mu.Lock()
f.o = o
f.d = d
f.mu.Unlock()
}
// addWriters increments or decrements the writers
func (f *File) addWriters(n int) {
f.mu.Lock()
f.writers += n
f.mu.Unlock()
}
// Attr fills out the attributes for the file
func (f *File) Attr(noModTime bool) (modTime time.Time, Size, Blocks uint64, err error) {
f.mu.Lock()
defer f.mu.Unlock()
// if o is nil it isn't valid yet or there are writers, so return the size so far
if f.o == nil || f.writers != 0 {
Size = uint64(atomic.LoadInt64(&f.size))
if !noModTime && !f.pendingModTime.IsZero() {
modTime = f.pendingModTime
}
} else {
Size = uint64(f.o.Size())
if !noModTime {
modTime = f.o.ModTime()
}
}
Blocks = (Size + 511) / 512
fs.Debugf(f.o, "File.Attr modTime=%v, Size=%d, Blocks=%v", modTime, Size, Blocks)
return
}
// SetModTime sets the modtime for the file
func (f *File) SetModTime(modTime time.Time) error {
f.mu.Lock()
defer f.mu.Unlock()
f.pendingModTime = modTime
if f.o != nil {
return f.applyPendingModTime()
}
// queue up for later, hoping f.o becomes available
return nil
}
// call with the mutex held
func (f *File) applyPendingModTime() error {
defer func() { f.pendingModTime = time.Time{} }()
if f.pendingModTime.IsZero() {
return nil
}
if f.o == nil {
return errors.New("Cannot apply ModTime, file object is not available")
}
err := f.o.SetModTime(f.pendingModTime)
switch err {
case nil:
fs.Debugf(f.o, "File.applyPendingModTime OK")
case fs.ErrorCantSetModTime:
// do nothing, in order to not break "touch somefile" if it exists already
default:
fs.Errorf(f.o, "File.applyPendingModTime error: %v", err)
return err
}
return nil
}
// Update the size while writing
func (f *File) setSize(n int64) {
atomic.StoreInt64(&f.size, n)
}
// Update the object when written
func (f *File) setObject(o fs.Object) {
f.mu.Lock()
defer f.mu.Unlock()
f.o = o
_ = f.applyPendingModTime()
f.d.addObject(o, f)
}
// Wait for f.o to become non nil for a short time returning it or an
// error
//
// Call without the mutex held
func (f *File) waitForValidObject() (o fs.Object, err error) {
for i := 0; i < 50; i++ {
f.mu.Lock()
o = f.o
writers := f.writers
f.mu.Unlock()
if o != nil {
return o, nil
}
if writers == 0 {
return nil, errors.New("can't open file - writer failed")
}
time.Sleep(100 * time.Millisecond)
}
return nil, ENOENT
}
// OpenRead open the file for read
func (f *File) OpenRead() (fh *ReadFileHandle, err error) {
// if o is nil it isn't valid yet
o, err := f.waitForValidObject()
if err != nil {
return nil, err
}
fs.Debugf(o, "File.OpenRead")
fh, err = newReadFileHandle(f, o, f.d.fsys.noSeek)
err = errors.Wrap(err, "open for read")
if err != nil {
fs.Errorf(o, "File.OpenRead failed: %v", err)
return nil, err
}
return fh, nil
}
// OpenWrite open the file for write
func (f *File) OpenWrite() (fh *WriteFileHandle, err error) {
// if o is nil it isn't valid yet
o, err := f.waitForValidObject()
if err != nil {
return nil, err
}
fs.Debugf(o, "File.OpenWrite")
src := newCreateInfo(f.d.f, o.Remote())
fh, err = newWriteFileHandle(f.d, f, src)
err = errors.Wrap(err, "open for write")
if err != nil {
fs.Errorf(o, "File.OpenWrite failed: %v", err)
return nil, err
}
return fh, nil
}
// Fsync the file
//
// Note that we don't do anything except return OK
func (f *File) Fsync() error {
return nil
}

116
cmd/mountlib/fs.go Normal file
View file

@ -0,0 +1,116 @@
package mountlib
import (
"strings"
"sync/atomic"
"time"
"github.com/ncw/rclone/fs"
)
// Node represents either a *Dir or a *File
type Node interface {
IsFile() bool
Inode() uint64
}
var (
_ Node = (*File)(nil)
_ Node = (*Dir)(nil)
)
// Noder represents something which can return a node
type Noder interface {
Node() Node
}
var (
_ Noder = (*File)(nil)
_ Noder = (*Dir)(nil)
_ Noder = (*ReadFileHandle)(nil)
_ Noder = (*WriteFileHandle)(nil)
)
// FS represents the top level filing system
type FS struct {
f fs.Fs
root *Dir
noSeek bool // don't allow seeking if set
}
// NewFS creates a new filing system and root directory
func NewFS(f fs.Fs) *FS {
fsDir := &fs.Dir{
Name: "",
When: time.Now(),
}
fsys := &FS{
f: f,
}
fsys.root = newDir(fsys, f, fsDir)
return fsys
}
// NoSeek disables seeking of files
func (fsys *FS) NoSeek() *FS {
fsys.noSeek = true
return fsys
}
// Root returns the root node
func (fsys *FS) Root() (*Dir, error) {
fs.Debugf(fsys.f, "Root()")
return fsys.root, nil
}
var inodeCount uint64
// NewInode creates a new unique inode number
func NewInode() (inode uint64) {
return atomic.AddUint64(&inodeCount, 1)
}
// Lookup finds the Node by path starting from the root
func (f *FS) Lookup(path string) (node Node, err error) {
node = f.root
for path != "" {
i := strings.IndexRune(path, '/')
var name string
if i < 0 {
name, path = path, ""
} else {
name, path = path[:i], path[i+1:]
}
if name == "" {
continue
}
dir, ok := node.(*Dir)
if !ok {
// We need to look in a directory, but found a file
return nil, ENOENT
}
node, err = dir.Lookup(name)
if err != nil {
return nil, err
}
}
return
}
// Statfs is called to obtain file system metadata.
// It should write that data to resp.
func (f *FS) Statfs() error {
/* FIXME
const blockSize = 4096
const fsBlocks = (1 << 50) / blockSize
resp.Blocks = fsBlocks // Total data blocks in file system.
resp.Bfree = fsBlocks // Free blocks in file system.
resp.Bavail = fsBlocks // Free blocks in file system if you're not root.
resp.Files = 1E9 // Total files in file system.
resp.Ffree = 1E9 // Free files in file system.
resp.Bsize = blockSize // Block size
resp.Namelen = 255 // Maximum file name length?
resp.Frsize = blockSize // Fragment size, smallest addressable data size in the file system.
*/
return nil
}

View file

@ -1,6 +1,6 @@
// +build linux darwin freebsd
package mount
package mounttest
import (
"io/ioutil"

View file

@ -1,6 +1,6 @@
// +build linux darwin freebsd
package mount
package mounttest
import (
"os"

View file

@ -2,7 +2,7 @@
// Test suite for rclonefs
package mount
package mounttest
import (
"flag"
@ -33,10 +33,22 @@ var (
LowLevelRetries = flag.Int("low-level-retries", 10, "Number of low level retries")
)
type (
UnmountFn func() error
MountFn func(f fs.Fs, mountpoint string) (<-chan error, func() error, error)
)
var (
mountFn MountFn
)
// TestMain drives the tests
func TestMain(m *testing.M) {
func TestMain(m *testing.M, fn MountFn, dirPerms, filePerms os.FileMode) {
mountFn = fn
flag.Parse()
run = newRun()
run.dirPerms = dirPerms
run.filePerms = filePerms
rc := m.Run()
run.Finalise()
os.Exit(rc)
@ -44,13 +56,14 @@ func TestMain(m *testing.M) {
// Run holds the remotes for a test run
type Run struct {
mountPath string
fremote fs.Fs
fremoteName string
cleanRemote func()
umountResult <-chan error
mountFS *FS
skip bool
mountPath string
fremote fs.Fs
fremoteName string
cleanRemote func()
umountResult <-chan error
umountFn UnmountFn
skip bool
dirPerms, filePerms os.FileMode
}
// run holds the master Run data
@ -103,7 +116,7 @@ func newRun() *Run {
func (r *Run) mount() {
log.Printf("mount %q %q", r.fremote, r.mountPath)
var err error
r.mountFS, r.umountResult, err = mount(r.fremote, r.mountPath)
r.umountResult, r.umountFn, err = mountFn(r.fremote, r.mountPath)
if err != nil {
log.Printf("mount failed: %v", err)
r.skip = true
@ -116,10 +129,17 @@ func (r *Run) umount() {
log.Printf("FUSE not found so skipping umount")
return
}
log.Printf("Calling fusermount -u %q", r.mountPath)
err := exec.Command("fusermount", "-u", r.mountPath).Run()
/*
log.Printf("Calling fusermount -u %q", r.mountPath)
err := exec.Command("fusermount", "-u", r.mountPath).Run()
if err != nil {
log.Printf("fusermount failed: %v", err)
}
*/
log.Printf("Unmounting %q", r.mountPath)
err := r.umountFn()
if err != nil {
log.Printf("fusermount failed: %v", err)
log.Fatalf("signal to umount failed: %v", err)
}
log.Printf("Waiting for umount")
err = <-r.umountResult
@ -182,10 +202,10 @@ func (r *Run) readLocal(t *testing.T, dir dirMap, filepath string) {
if fi.IsDir() {
dir[name+"/"] = struct{}{}
r.readLocal(t, dir, name)
assert.Equal(t, fi.Mode().Perm(), os.FileMode(dirPerms))
assert.Equal(t, r.dirPerms, fi.Mode().Perm())
} else {
dir[fmt.Sprintf("%s %d", name, fi.Size())] = struct{}{}
assert.Equal(t, fi.Mode().Perm(), os.FileMode(filePerms))
assert.Equal(t, r.filePerms, fi.Mode().Perm())
}
}
}
@ -267,5 +287,5 @@ func TestRoot(t *testing.T) {
fi, err := os.Lstat(run.mountPath)
require.NoError(t, err)
assert.True(t, fi.IsDir())
assert.Equal(t, fi.Mode().Perm(), os.FileMode(dirPerms))
assert.Equal(t, fi.Mode().Perm(), run.dirPerms)
}

View file

@ -1,6 +1,6 @@
// +build linux darwin freebsd
package mount
package mounttest
import (
"io"

View file

@ -1,11 +1,12 @@
// +build linux darwin freebsd
package mount
package mounttest
import (
"os"
"syscall"
"testing"
"time"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
@ -21,6 +22,9 @@ func TestWriteFileNoWrite(t *testing.T) {
err = fd.Close()
assert.NoError(t, err)
// FIXME - wait for the Release on the file
time.Sleep(10 * time.Millisecond)
run.checkDir(t, "testnowrite 0")
run.rm(t, "testnowrite")

210
cmd/mountlib/read.go Normal file
View file

@ -0,0 +1,210 @@
package mountlib
import (
"io"
"sync"
"github.com/ncw/rclone/fs"
)
// ReadFileHandle is an open for read file handle on a File
type ReadFileHandle struct {
mu sync.Mutex
closed bool // set if handle has been closed
r *fs.Account
o fs.Object
readCalled bool // set if read has been called
offset int64
noSeek bool
file *File
}
func newReadFileHandle(f *File, o fs.Object, noSeek bool) (*ReadFileHandle, error) {
r, err := o.Open()
if err != nil {
return nil, err
}
fh := &ReadFileHandle{
o: o,
r: fs.NewAccount(r, o).WithBuffer(), // account the transfer
noSeek: noSeek,
file: f,
}
fs.Stats.Transferring(fh.o.Remote())
return fh, nil
}
// Node returns the Node assocuated with this - satisfies Noder interface
func (fh *ReadFileHandle) Node() Node {
return fh.file
}
// seek to a new offset
//
// if reopen is true, then we won't attempt to use an io.Seeker interface
//
// Must be called with fh.mu held
func (fh *ReadFileHandle) seek(offset int64, reopen bool) (err error) {
if fh.noSeek {
return ESPIPE
}
fh.r.StopBuffering() // stop the background reading first
oldReader := fh.r.GetReader()
r := oldReader
// Can we seek it directly?
if do, ok := oldReader.(io.Seeker); !reopen && ok {
fs.Debugf(fh.o, "ReadFileHandle.seek from %d to %d (io.Seeker)", fh.offset, offset)
_, err = do.Seek(offset, 0)
if err != nil {
fs.Debugf(fh.o, "ReadFileHandle.Read io.Seeker failed: %v", err)
return err
}
} else {
fs.Debugf(fh.o, "ReadFileHandle.seek from %d to %d", fh.offset, offset)
// close old one
err = oldReader.Close()
if err != nil {
fs.Debugf(fh.o, "ReadFileHandle.Read seek close old failed: %v", err)
}
// re-open with a seek
r, err = fh.o.Open(&fs.SeekOption{Offset: offset})
if err != nil {
fs.Debugf(fh.o, "ReadFileHandle.Read seek failed: %v", err)
return err
}
}
fh.r.UpdateReader(r)
fh.offset = offset
return nil
}
// Read from the file handle
func (fh *ReadFileHandle) Read(reqSize, reqOffset int64) (respData []byte, err error) {
fh.mu.Lock()
defer fh.mu.Unlock()
fs.Debugf(fh.o, "ReadFileHandle.Read size %d offset %d", reqSize, reqOffset)
if fh.closed {
fs.Errorf(fh.o, "ReadFileHandle.Read error: %v", EBADF)
return nil, EBADF
}
doSeek := reqOffset != fh.offset
var n int
var newOffset int64
retries := 0
buf := make([]byte, reqSize)
doReopen := false
for {
if doSeek {
// Are we attempting to seek beyond the end of the
// file - if so just return EOF leaving the underlying
// file in an unchanged state.
if reqOffset >= fh.o.Size() {
fs.Debugf(fh.o, "ReadFileHandle.Read attempt to read beyond end of file: %d > %d", reqOffset, fh.o.Size())
respData = nil
return nil, nil
}
// Otherwise do the seek
err = fh.seek(reqOffset, doReopen)
} else {
err = nil
}
if err == nil {
if reqSize > 0 {
fh.readCalled = true
}
// One exception to the above is if we fail to fully populate a
// page cache page; a read into page cache is always page aligned.
// Make sure we never serve a partial read, to avoid that.
n, err = io.ReadFull(fh.r, buf)
newOffset = fh.offset + int64(n)
// if err == nil && rand.Intn(10) == 0 {
// err = errors.New("random error")
// }
if err == nil {
break
} else if (err == io.ErrUnexpectedEOF || err == io.EOF) && newOffset == fh.o.Size() {
// Have read to end of file - reset error
err = nil
break
}
}
if retries >= fs.Config.LowLevelRetries {
break
}
retries++
fs.Errorf(fh.o, "ReadFileHandle.Read error: low level retry %d/%d: %v", retries, fs.Config.LowLevelRetries, err)
doSeek = true
doReopen = true
}
if err != nil {
fs.Errorf(fh.o, "ReadFileHandle.Read error: %v", err)
} else {
respData = buf[:n]
fh.offset = newOffset
fs.Debugf(fh.o, "ReadFileHandle.Read OK")
}
return respData, err
}
// close the file handle returning EBADF if it has been
// closed already.
//
// Must be called with fh.mu held
func (fh *ReadFileHandle) close() error {
if fh.closed {
return EBADF
}
fh.closed = true
fs.Stats.DoneTransferring(fh.o.Remote(), true)
return fh.r.Close()
}
// Flush is called each time the file or directory is closed.
// Because there can be multiple file descriptors referring to a
// single opened file, Flush can be called multiple times.
func (fh *ReadFileHandle) Flush() error {
fh.mu.Lock()
defer fh.mu.Unlock()
fs.Debugf(fh.o, "ReadFileHandle.Flush")
// Ignore the Flush as there is nothing we can sensibly do and
// it seems quite common for Flush to be called from
// different threads each of which have read some data.
if false {
// If Read hasn't been called then ignore the Flush - Release
// will pick it up
if !fh.readCalled {
fs.Debugf(fh.o, "ReadFileHandle.Flush ignoring flush on unread handle")
return nil
}
err := fh.close()
if err != nil {
fs.Errorf(fh.o, "ReadFileHandle.Flush error: %v", err)
return err
}
}
fs.Debugf(fh.o, "ReadFileHandle.Flush OK")
return nil
}
// Release is called when we are finished with the file handle
//
// It isn't called directly from userspace so the error is ignored by
// the kernel
func (fh *ReadFileHandle) Release() error {
fh.mu.Lock()
defer fh.mu.Unlock()
if fh.closed {
fs.Debugf(fh.o, "ReadFileHandle.Release nothing to do")
return nil
}
fs.Debugf(fh.o, "ReadFileHandle.Release closing")
err := fh.close()
if err != nil {
fs.Errorf(fh.o, "ReadFileHandle.Release error: %v", err)
} else {
fs.Debugf(fh.o, "ReadFileHandle.Release OK")
}
return err
}

158
cmd/mountlib/write.go Normal file
View file

@ -0,0 +1,158 @@
package mountlib
import (
"io"
"sync"
"github.com/ncw/rclone/fs"
)
// WriteFileHandle is an open for write handle on a File
type WriteFileHandle struct {
mu sync.Mutex
closed bool // set if handle has been closed
remote string
pipeReader *io.PipeReader
pipeWriter *io.PipeWriter
o fs.Object
result chan error
file *File
writeCalled bool // set the first time Write() is called
offset int64
}
func newWriteFileHandle(d *Dir, f *File, src fs.ObjectInfo) (*WriteFileHandle, error) {
fh := &WriteFileHandle{
remote: src.Remote(),
result: make(chan error, 1),
file: f,
}
fh.pipeReader, fh.pipeWriter = io.Pipe()
r := fs.NewAccountSizeName(fh.pipeReader, 0, src.Remote()).WithBuffer() // account the transfer
go func() {
o, err := d.f.Put(r, src)
fh.o = o
fh.result <- err
}()
fh.file.addWriters(1)
fh.file.setSize(0)
fs.Stats.Transferring(fh.remote)
return fh, nil
}
// Node returns the Node assocuated with this - satisfies Noder interface
func (fh *WriteFileHandle) Node() Node {
return fh.file
}
// Write data to the file handle
func (fh *WriteFileHandle) Write(data []byte, offset int64) (written int64, err error) {
fs.Debugf(fh.remote, "WriteFileHandle.Write len=%d", len(data))
fh.mu.Lock()
defer fh.mu.Unlock()
if fh.offset != offset {
fs.Errorf(fh.remote, "WriteFileHandle.Write can't seek in file")
return 0, ESPIPE
}
if fh.closed {
fs.Errorf(fh.remote, "WriteFileHandle.Write error: %v", EBADF)
return 0, EBADF
}
fh.writeCalled = true
// FIXME should probably check the file isn't being seeked?
n, err := fh.pipeWriter.Write(data)
written = int64(n)
fh.offset += written
fh.file.setSize(fh.offset)
if err != nil {
fs.Errorf(fh.remote, "WriteFileHandle.Write error: %v", err)
return 0, err
}
fs.Debugf(fh.remote, "WriteFileHandle.Write OK (%d bytes written)", n)
return written, nil
}
// Returns the offset of the file pointer
func (fh *WriteFileHandle) Offset() (offset int64) {
return fh.offset
}
// close the file handle returning EBADF if it has been
// closed already.
//
// Must be called with fh.mu held
func (fh *WriteFileHandle) close() error {
if fh.closed {
return EBADF
}
fh.closed = true
fs.Stats.DoneTransferring(fh.remote, true)
fh.file.addWriters(-1)
writeCloseErr := fh.pipeWriter.Close()
err := <-fh.result
readCloseErr := fh.pipeReader.Close()
if err == nil {
fh.file.setObject(fh.o)
err = writeCloseErr
}
if err == nil {
err = readCloseErr
}
return err
}
// Flush is called on each close() of a file descriptor. So if a
// filesystem wants to return write errors in close() and the file has
// cached dirty data, this is a good place to write back data and
// return any errors. Since many applications ignore close() errors
// this is not always useful.
//
// NOTE: The flush() method may be called more than once for each
// open(). This happens if more than one file descriptor refers to an
// opened file due to dup(), dup2() or fork() calls. It is not
// possible to determine if a flush is final, so each flush should be
// treated equally. Multiple write-flush sequences are relatively
// rare, so this shouldn't be a problem.
//
// Filesystems shouldn't assume that flush will always be called after
// some writes, or that if will be called at all.
func (fh *WriteFileHandle) Flush() error {
fh.mu.Lock()
defer fh.mu.Unlock()
fs.Debugf(fh.remote, "WriteFileHandle.Flush")
// If Write hasn't been called then ignore the Flush - Release
// will pick it up
if !fh.writeCalled {
fs.Debugf(fh.remote, "WriteFileHandle.Flush ignoring flush on unwritten handle")
return nil
}
err := fh.close()
if err != nil {
fs.Errorf(fh.remote, "WriteFileHandle.Flush error: %v", err)
} else {
fs.Debugf(fh.remote, "WriteFileHandle.Flush OK")
}
return err
}
// Release is called when we are finished with the file handle
//
// It isn't called directly from userspace so the error is ignored by
// the kernel
func (fh *WriteFileHandle) Release() error {
fh.mu.Lock()
defer fh.mu.Unlock()
if fh.closed {
fs.Debugf(fh.remote, "WriteFileHandle.Release nothing to do")
return nil
}
fs.Debugf(fh.remote, "WriteFileHandle.Release closing")
err := fh.close()
if err != nil {
fs.Errorf(fh.remote, "WriteFileHandle.Release error: %v", err)
} else {
fs.Debugf(fh.remote, "WriteFileHandle.Release OK")
}
return err
}