rclone/vfs/read_write.go

465 lines
11 KiB
Go
Raw Normal View History

package vfs
import (
"io"
"os"
"runtime"
"sync"
"github.com/ncw/rclone/fs"
"github.com/pkg/errors"
)
// RWFileHandle is a handle that can be open for read and write.
//
// It will be open to a temporary file which, when closed, will be
// transferred to the remote.
type RWFileHandle struct {
*os.File
mu sync.Mutex
closed bool // set if handle has been closed
o fs.Object // may be nil
remote string
file *File
d *Dir
opened bool
flags int // open flags
osPath string // path to the file in the cache
writeCalled bool // if any Write() methods have been called
}
// Check interfaces
var (
_ io.Reader = (*RWFileHandle)(nil)
_ io.ReaderAt = (*RWFileHandle)(nil)
_ io.Writer = (*RWFileHandle)(nil)
_ io.WriterAt = (*RWFileHandle)(nil)
_ io.Seeker = (*RWFileHandle)(nil)
_ io.Closer = (*RWFileHandle)(nil)
)
func newRWFileHandle(d *Dir, f *File, remote string, flags int) (fh *RWFileHandle, err error) {
// if O_CREATE and O_EXCL are set and if path already exists, then return EEXIST
if flags&(os.O_CREATE|os.O_EXCL) == os.O_CREATE|os.O_EXCL && f.exists() {
return nil, EEXIST
}
fh = &RWFileHandle{
o: f.o,
file: f,
d: d,
remote: remote,
flags: flags,
}
// mark the file as open in the cache - must be done before the mkdir
fh.d.vfs.cache.open(fh.remote)
// Make a place for the file
fh.osPath, err = d.vfs.cache.mkdir(remote)
if err != nil {
fh.d.vfs.cache.close(fh.remote)
return nil, errors.Wrap(err, "open RW handle failed to make cache directory")
}
rdwrMode := fh.flags & accessModeMask
if rdwrMode != os.O_RDONLY {
fh.file.addWriter(fh)
}
return fh, nil
}
// openPending opens the file if there is a pending open
//
// call with the lock held
func (fh *RWFileHandle) openPending(truncate bool) (err error) {
if fh.opened {
return nil
}
cacheFileOpenFlags := fh.flags
// if not truncating the file, need to read it first
if fh.flags&os.O_TRUNC == 0 && !truncate {
// Fetch the file if it hasn't changed
// FIXME retries
err = fs.CopyFile(fh.d.vfs.cache.f, fh.d.vfs.f, fh.remote, fh.remote)
if err != nil {
// if the object wasn't found AND O_CREATE is set then...
cause := errors.Cause(err)
notFound := cause == fs.ErrorObjectNotFound || cause == fs.ErrorDirNotFound
if notFound {
// Remove cached item if there is one
rmErr := os.Remove(fh.osPath)
if rmErr != nil && !os.IsNotExist(rmErr) {
return errors.Wrap(rmErr, "open RW handle failed to delete stale cache file")
}
}
if notFound && fh.flags&os.O_CREATE != 0 {
// ...ignore error as we are about to create the file
} else {
return errors.Wrap(err, "open RW handle failed to cache file")
}
}
} else {
// Set the size to 0 since we are truncating and flag we need to write it back
fh.file.setSize(0)
fh.writeCalled = true
if fh.flags&os.O_CREATE != 0 && fh.file.exists() {
// create and empty file if it exists on the source
cacheFileOpenFlags |= os.O_CREATE
}
// Windows doesn't seem to deal well with O_TRUNC and
// certain access modes so so truncate the file if it
// exists in these cases.
if runtime.GOOS == "windows" && (fh.flags&accessModeMask == os.O_RDONLY || fh.flags|os.O_APPEND != 0) {
cacheFileOpenFlags &^= os.O_TRUNC
_, err = os.Stat(fh.osPath)
if err == nil {
err = os.Truncate(fh.osPath, 0)
if err != nil {
return errors.Wrap(err, "cache open failed to truncate")
}
}
}
}
fs.Debugf(fh.remote, "Opening cached copy with flags=%s", decodeOpenFlags(fh.flags))
fd, err := os.OpenFile(fh.osPath, cacheFileOpenFlags, 0600)
if err != nil {
return errors.Wrap(err, "cache open file failed")
}
fh.File = fd
fh.opened = true
2017-11-18 11:53:22 +00:00
fh.d.addObject(fh.file) // make sure the directory has this object in it now
return nil
}
// String converts it to printable
func (fh *RWFileHandle) String() string {
if fh == nil {
return "<nil *RWFileHandle>"
}
fh.mu.Lock()
defer fh.mu.Unlock()
if fh.file == nil {
return "<nil *RWFileHandle.file>"
}
return fh.file.String() + " (rw)"
}
// Node returns the Node assocuated with this - satisfies Noder interface
func (fh *RWFileHandle) Node() Node {
fh.mu.Lock()
defer fh.mu.Unlock()
return fh.file
}
// close the file handle returning EBADF if it has been
// closed already.
//
// Must be called with fh.mu held
//
// Note that we leave the file around in the cache on error conditions
// to give the user a chance to recover it.
func (fh *RWFileHandle) close() (err error) {
defer fs.Trace(fh.remote, "")("err=%v", &err)
if fh.closed {
return ECLOSED
}
fh.closed = true
defer fh.d.vfs.cache.close(fh.remote)
rdwrMode := fh.flags & accessModeMask
if rdwrMode != os.O_RDONLY {
// leave writer open until file is transferred
defer fh.file.delWriter(fh)
}
if !fh.opened {
// If read only then return
if rdwrMode == os.O_RDONLY {
return nil
}
// If we aren't creating or truncating the file then
// we haven't modified it so don't need to transfer it
if fh.flags&(os.O_CREATE|os.O_TRUNC) == 0 {
return nil
}
// Otherwise open the file
// FIXME this could be more efficient
if err := fh.openPending(false); err != nil {
return err
}
}
if rdwrMode != os.O_RDONLY {
fi, err := fh.File.Stat()
if err != nil {
fs.Errorf(fh.remote, "Failed to stat cache file: %v", err)
} else {
fh.file.setSize(fi.Size())
}
}
// Close the underlying file
err = fh.File.Close()
if err != nil {
return err
}
// FIXME measure whether we actually did any writes or not -
// no writes means no transfer?
if rdwrMode == os.O_RDONLY && fh.flags&os.O_TRUNC == 0 {
fs.Debugf(fh.remote, "read only and not truncating so not transferring")
return nil
}
// If write hasn't been called and we aren't creating or
// truncating the file then we haven't modified it so don't
// need to transfer it
if !fh.writeCalled && fh.flags&(os.O_CREATE|os.O_TRUNC) == 0 {
fs.Debugf(fh.remote, "not modified so not transferring")
return nil
}
// Transfer the temp file to the remote
// FIXME retries
if fh.d.vfs.Opt.CacheMode < CacheModeFull {
err = fs.MoveFile(fh.d.vfs.f, fh.d.vfs.cache.f, fh.remote, fh.remote)
} else {
err = fs.CopyFile(fh.d.vfs.f, fh.d.vfs.cache.f, fh.remote, fh.remote)
}
if err != nil {
err = errors.Wrap(err, "failed to transfer file from cache to remote")
fs.Errorf(fh.remote, "%v", err)
return err
}
// FIXME get MoveFile to return this object
o, err := fh.d.vfs.f.NewObject(fh.remote)
if err != nil {
err = errors.Wrap(err, "failed to find object after transfer to remote")
fs.Errorf(fh.remote, "%v", err)
return err
}
fh.file.setObject(o)
fs.Debugf(o, "transferred to remote")
return nil
}
// Close closes the file
func (fh *RWFileHandle) Close() error {
fh.mu.Lock()
defer fh.mu.Unlock()
return fh.close()
}
// Flush is called each time the file or directory is closed.
// Because there can be multiple file descriptors referring to a
// single opened file, Flush can be called multiple times.
func (fh *RWFileHandle) Flush() error {
fh.mu.Lock()
defer fh.mu.Unlock()
if !fh.opened {
return nil
}
if fh.closed {
fs.Debugf(fh.remote, "RWFileHandle.Flush nothing to do")
return nil
}
// fs.Debugf(fh.remote, "RWFileHandle.Flush")
if !fh.opened {
fs.Debugf(fh.remote, "RWFileHandle.Flush ignoring flush on unopened handle")
return nil
}
// If Write hasn't been called then ignore the Flush - Release
// will pick it up
if !fh.writeCalled {
fs.Debugf(fh.remote, "RWFileHandle.Flush ignoring flush on unwritten handle")
return nil
}
err := fh.close()
if err != nil {
fs.Errorf(fh.remote, "RWFileHandle.Flush error: %v", err)
} else {
// fs.Debugf(fh.remote, "RWFileHandle.Flush OK")
}
return err
}
// Release is called when we are finished with the file handle
//
// It isn't called directly from userspace so the error is ignored by
// the kernel
func (fh *RWFileHandle) Release() error {
fh.mu.Lock()
defer fh.mu.Unlock()
if fh.closed {
fs.Debugf(fh.remote, "RWFileHandle.Release nothing to do")
return nil
}
fs.Debugf(fh.remote, "RWFileHandle.Release closing")
err := fh.close()
if err != nil {
fs.Errorf(fh.remote, "RWFileHandle.Release error: %v", err)
} else {
// fs.Debugf(fh.remote, "RWFileHandle.Release OK")
}
return err
}
// Size returns the size of the underlying file
func (fh *RWFileHandle) Size() int64 {
fh.mu.Lock()
defer fh.mu.Unlock()
if !fh.opened {
return fh.file.Size()
}
fi, err := fh.File.Stat()
if err != nil {
return 0
}
return fi.Size()
}
// Stat returns info about the file
func (fh *RWFileHandle) Stat() (os.FileInfo, error) {
fh.mu.Lock()
defer fh.mu.Unlock()
return fh.file, nil
}
// readFn is a general purpose read function - pass in a closure to do
// the actual read
func (fh *RWFileHandle) readFn(read func() (int, error)) (n int, err error) {
fh.mu.Lock()
defer fh.mu.Unlock()
if fh.closed {
return 0, ECLOSED
}
if fh.flags&accessModeMask == os.O_WRONLY {
return 0, EBADF
}
if err = fh.openPending(false); err != nil {
return n, err
}
return read()
}
// Read bytes from the file
func (fh *RWFileHandle) Read(b []byte) (n int, err error) {
return fh.readFn(func() (int, error) {
return fh.File.Read(b)
})
}
// ReadAt bytes from the file at off
func (fh *RWFileHandle) ReadAt(b []byte, off int64) (n int, err error) {
return fh.readFn(func() (int, error) {
return fh.File.ReadAt(b, off)
})
}
// Seek to new file position
func (fh *RWFileHandle) Seek(offset int64, whence int) (ret int64, err error) {
fh.mu.Lock()
defer fh.mu.Unlock()
if fh.closed {
return 0, ECLOSED
}
if err = fh.openPending(false); err != nil {
return ret, err
}
return fh.File.Seek(offset, whence)
}
// writeFn general purpose write call
//
// Pass a closure to do the actual write
func (fh *RWFileHandle) writeFn(write func() error) (err error) {
fh.mu.Lock()
defer fh.mu.Unlock()
if fh.closed {
return ECLOSED
}
if fh.flags&accessModeMask == os.O_RDONLY {
return EBADF
}
if err = fh.openPending(false); err != nil {
return err
}
fh.writeCalled = true
err = write()
if err != nil {
return err
}
fi, err := fh.File.Stat()
if err != nil {
return errors.Wrap(err, "failed to stat cache file")
}
fh.file.setSize(fi.Size())
return nil
}
// Write bytes to the file
func (fh *RWFileHandle) Write(b []byte) (n int, err error) {
err = fh.writeFn(func() error {
n, err = fh.File.Write(b)
return err
})
return n, err
}
// WriteAt bytes to the file at off
func (fh *RWFileHandle) WriteAt(b []byte, off int64) (n int, err error) {
err = fh.writeFn(func() error {
n, err = fh.File.WriteAt(b, off)
return err
})
return n, err
}
// WriteString a string to the file
func (fh *RWFileHandle) WriteString(s string) (n int, err error) {
err = fh.writeFn(func() error {
n, err = fh.File.WriteString(s)
return err
})
return n, err
}
// Truncate file to given size
func (fh *RWFileHandle) Truncate(size int64) (err error) {
fh.mu.Lock()
defer fh.mu.Unlock()
if fh.closed {
return ECLOSED
}
if err = fh.openPending(size == 0); err != nil {
return err
}
fh.writeCalled = true
fh.file.setSize(size)
return fh.File.Truncate(size)
}
// Sync commits the current contents of the file to stable storage. Typically,
// this means flushing the file system's in-memory copy of recently written
// data to disk.
func (fh *RWFileHandle) Sync() error {
fh.mu.Lock()
defer fh.mu.Unlock()
if fh.closed {
return ECLOSED
}
if !fh.opened {
return nil
}
if fh.flags&accessModeMask == os.O_RDONLY {
return nil
}
return fh.File.Sync()
}