diff --git a/vfs/file.go b/vfs/file.go index c7fe43d1c..8ac9aaac7 100644 --- a/vfs/file.go +++ b/vfs/file.go @@ -14,14 +14,21 @@ import ( // File represents a file type File struct { - inode uint64 // inode number - size int64 // size of file - read and written with atomic int64 - must be 64 bit aligned - d *Dir // parent directory - read only - mu sync.RWMutex // protects the following - o fs.Object // NB o may be nil if file is being written - leaf string // leaf name of the object - writers []Handle // writers for this file - pendingModTime time.Time // will be applied once o becomes available, i.e. after file was written + inode uint64 // inode number + size int64 // size of file - read and written with atomic int64 - must be 64 bit aligned + d *Dir // parent directory - read only + + mu sync.Mutex // protects the following + o fs.Object // NB o may be nil if file is being written + leaf string // leaf name of the object + writers []Handle // writers for this file + readWriters int // how many RWFileHandle are open for writing + readWriterClosing bool // is a RWFileHandle currently cosing? + modified bool // has the cache file be modified by a RWFileHandle? + pendingModTime time.Time // will be applied once o becomes available, i.e. after file was written + + muClose sync.Mutex // synchonize RWFileHandle.close() + muOpen sync.Mutex // synchonize RWFileHandle.openPending() } // newFile creates a new File @@ -95,12 +102,16 @@ func (f *File) rename(d *Dir, o fs.Object) { func (f *File) addWriter(h Handle) { f.mu.Lock() f.writers = append(f.writers, h) + if _, ok := h.(*RWFileHandle); ok { + f.readWriters++ + } f.mu.Unlock() } // delWriter removes a write handle from the file -func (f *File) delWriter(h Handle) { +func (f *File) delWriter(h Handle, modifiedCacheFile bool) (lastWriterAndModified bool) { f.mu.Lock() + defer f.mu.Unlock() var found = -1 for i := range f.writers { if f.writers[i] == h { @@ -113,6 +124,24 @@ func (f *File) delWriter(h Handle) { } else { fs.Debugf(f.o, "File.delWriter couldn't find handle") } + if _, ok := h.(*RWFileHandle); ok { + f.readWriters-- + } + f.readWriterClosing = true + if modifiedCacheFile { + f.modified = true + } + lastWriterAndModified = len(f.writers) == 0 && f.modified + if lastWriterAndModified { + f.modified = false + } + return +} + +// finishWriterClose resets the readWriterClosing flag +func (f *File) finishWriterClose() { + f.mu.Lock() + f.readWriterClosing = false f.mu.Unlock() } @@ -132,7 +161,7 @@ func (f *File) ModTime() (modTime time.Time) { if !f.d.vfs.Opt.NoModTime { // if o is nil it isn't valid yet or there are writers, so return the size so far - if f.o == nil || len(f.writers) != 0 { + if f.o == nil || len(f.writers) != 0 || f.readWriterClosing { if !f.pendingModTime.IsZero() { return f.pendingModTime } @@ -158,7 +187,7 @@ func (f *File) Size() int64 { defer f.mu.Unlock() // if o is nil it isn't valid yet or there are writers, so return the size so far - if f.o == nil || len(f.writers) != 0 { + if f.o == nil || len(f.writers) != 0 || f.readWriterClosing { return atomic.LoadInt64(&f.size) } return nonNegative(f.o.Size()) @@ -174,7 +203,8 @@ func (f *File) SetModTime(modTime time.Time) error { f.pendingModTime = modTime - if f.o != nil { + // Only update the ModTime when there are no writers, setObject will do it + if f.o != nil && len(f.writers) == 0 && !f.readWriterClosing { return f.applyPendingModTime() } @@ -238,11 +268,12 @@ func (f *File) waitForValidObject() (o fs.Object, err error) { f.mu.Lock() o = f.o nwriters := len(f.writers) + wclosing := f.readWriterClosing f.mu.Unlock() if o != nil { return o, nil } - if nwriters == 0 { + if nwriters == 0 && !wclosing { return nil, errors.New("can't open file - writer failed") } time.Sleep(100 * time.Millisecond) @@ -400,7 +431,10 @@ func (f *File) Open(flags int) (fd Handle, err error) { // Open the correct sort of handle CacheMode := f.d.vfs.Opt.CacheMode - if read && write { + CacheItem := f.d.vfs.cache.get(f.Path()) + if CacheMode >= CacheModeMinimal && CacheItem.opens > 0 { + fd, err = f.openRW(flags) + } else if read && write { if CacheMode >= CacheModeMinimal { fd, err = f.openRW(flags) } else { @@ -437,6 +471,8 @@ func (f *File) Truncate(size int64) (err error) { copy(writers, f.writers) f.mu.Unlock() + // FIXME: handle closing writer + // If have writers then call truncate for each writer if len(writers) != 0 { fs.Debugf(f.o, "Truncating %d file handles", len(writers)) diff --git a/vfs/read_write.go b/vfs/read_write.go index ef2300243..cf8631395 100644 --- a/vfs/read_write.go +++ b/vfs/read_write.go @@ -1,6 +1,7 @@ package vfs import ( + "fmt" "io" "os" "runtime" @@ -69,6 +70,14 @@ func newRWFileHandle(d *Dir, f *File, remote string, flags int) (fh *RWFileHandl fh.file.addWriter(fh) } + // truncate or create files immediately to prepare the cache + if fh.flags&os.O_TRUNC != 0 || fh.flags&(os.O_CREATE) != 0 && !f.exists() { + if err := fh.openPending(false); err != nil { + fh.file.delWriter(fh, false) + return nil, err + } + } + return fh, nil } @@ -80,29 +89,42 @@ func (fh *RWFileHandle) openPending(truncate bool) (err error) { return nil } - cacheFileOpenFlags := fh.flags + fh.file.muOpen.Lock() + defer fh.file.muOpen.Unlock() + var fd *os.File + cacheFileOpenFlags := fh.flags // if not truncating the file, need to read it first if fh.flags&os.O_TRUNC == 0 && !truncate { - // Fetch the file if it hasn't changed - // FIXME retries - err = operations.CopyFile(fh.d.vfs.cache.f, fh.d.vfs.f, fh.remote, fh.remote) - if err != nil { - // if the object wasn't found AND O_CREATE is set then... - cause := errors.Cause(err) - notFound := cause == fs.ErrorObjectNotFound || cause == fs.ErrorDirNotFound - if notFound { - // Remove cached item if there is one - rmErr := os.Remove(fh.osPath) - if rmErr != nil && !os.IsNotExist(rmErr) { - return errors.Wrap(rmErr, "open RW handle failed to delete stale cache file") + // try to open a exising cache file + fd, err = os.OpenFile(fh.osPath, cacheFileOpenFlags&^os.O_CREATE, 0600) + if os.IsNotExist(err) { + // Fetch the file if it hasn't changed + // FIXME retries + err = operations.CopyFile(fh.d.vfs.cache.f, fh.d.vfs.f, fh.remote, fh.remote) + if err != nil { + // if the object wasn't found AND O_CREATE is set then... + cause := errors.Cause(err) + notFound := cause == fs.ErrorObjectNotFound || cause == fs.ErrorDirNotFound + if notFound { + // Remove cached item if there is one + rmErr := os.Remove(fh.osPath) + if rmErr != nil && !os.IsNotExist(rmErr) { + return errors.Wrap(rmErr, "open RW handle failed to delete stale cache file") + } + } + if notFound && fh.flags&os.O_CREATE != 0 { + // ...ignore error as we are about to create the file + fh.file.setSize(0) + fh.writeCalled = true + } else { + return errors.Wrap(err, "open RW handle failed to cache file") } } - if notFound && fh.flags&os.O_CREATE != 0 { - // ...ignore error as we are about to create the file - } else { - return errors.Wrap(err, "open RW handle failed to cache file") - } + } else if err != nil { + return errors.Wrap(err, "cache open file failed") + } else { + fs.Debugf(fh.logPrefix(), "Opened existing cached copy with flags=%s", decodeOpenFlags(fh.flags)) } } else { // Set the size to 0 since we are truncating and flag we need to write it back @@ -127,10 +149,12 @@ func (fh *RWFileHandle) openPending(truncate bool) (err error) { } } - fs.Debugf(fh.remote, "Opening cached copy with flags=%s", decodeOpenFlags(fh.flags)) - fd, err := os.OpenFile(fh.osPath, cacheFileOpenFlags, 0600) - if err != nil { - return errors.Wrap(err, "cache open file failed") + if fd == nil { + fs.Debugf(fh.logPrefix(), "Opening cached copy with flags=%s", decodeOpenFlags(fh.flags)) + fd, err = os.OpenFile(fh.osPath, cacheFileOpenFlags, 0600) + if err != nil { + return errors.Wrap(err, "cache open file failed") + } } fh.File = fd fh.opened = true @@ -166,85 +190,74 @@ func (fh *RWFileHandle) Node() Node { // Note that we leave the file around in the cache on error conditions // to give the user a chance to recover it. func (fh *RWFileHandle) close() (err error) { - defer log.Trace(fh.remote, "")("err=%v", &err) + defer log.Trace(fh.logPrefix(), "")("err=%v", &err) + fh.file.muClose.Lock() + defer fh.file.muClose.Unlock() + if fh.closed { return ECLOSED } fh.closed = true defer fh.d.vfs.cache.close(fh.remote) rdwrMode := fh.flags & accessModeMask - if rdwrMode != os.O_RDONLY { - // leave writer open until file is transferred - defer fh.file.delWriter(fh) + writer := rdwrMode != os.O_RDONLY + + // If read only then return + if !fh.opened && rdwrMode == os.O_RDONLY { + return nil } - if !fh.opened { - // If read only then return - if rdwrMode == os.O_RDONLY { - return nil - } - // If we aren't creating or truncating the file then - // we haven't modified it so don't need to transfer it - if fh.flags&(os.O_CREATE|os.O_TRUNC) == 0 { - return nil - } - // Otherwise open the file - // FIXME this could be more efficient + + copy := false + if writer { + copy = fh.file.delWriter(fh, fh.modified()) + defer fh.file.finishWriterClose() + } + + // If we aren't creating or truncating the file then + // we haven't modified it so don't need to transfer it + if fh.flags&(os.O_CREATE|os.O_TRUNC) != 0 { if err := fh.openPending(false); err != nil { return err } } - if rdwrMode != os.O_RDONLY { + + if writer && fh.opened { fi, err := fh.File.Stat() if err != nil { - fs.Errorf(fh.remote, "Failed to stat cache file: %v", err) + fs.Errorf(fh.logPrefix(), "Failed to stat cache file: %v", err) } else { fh.file.setSize(fi.Size()) } } // Close the underlying file - err = fh.File.Close() - if err != nil { - return err + if fh.opened { + err = fh.File.Close() + if err != nil { + err = errors.Wrap(err, "failed to close cache file") + return err + } } - // FIXME measure whether we actually did any writes or not - - // no writes means no transfer? - if rdwrMode == os.O_RDONLY && fh.flags&os.O_TRUNC == 0 { - fs.Debugf(fh.remote, "read only and not truncating so not transferring") - return nil - } - - // If write hasn't been called and we aren't creating or - // truncating the file then we haven't modified it so don't - // need to transfer it - if !fh.writeCalled && fh.flags&(os.O_CREATE|os.O_TRUNC) == 0 { - fs.Debugf(fh.remote, "not modified so not transferring") - return nil - } - - // Transfer the temp file to the remote - // FIXME retries - if fh.d.vfs.Opt.CacheMode < CacheModeFull { - err = operations.MoveFile(fh.d.vfs.f, fh.d.vfs.cache.f, fh.remote, fh.remote) - } else { + if copy { + // Transfer the temp file to the remote + // FIXME retries err = operations.CopyFile(fh.d.vfs.f, fh.d.vfs.cache.f, fh.remote, fh.remote) - } - if err != nil { - err = errors.Wrap(err, "failed to transfer file from cache to remote") - fs.Errorf(fh.remote, "%v", err) - return err - } + if err != nil { + err = errors.Wrap(err, "failed to transfer file from cache to remote") + fs.Errorf(fh.logPrefix(), "%v", err) + return err + } - // FIXME get MoveFile to return this object - o, err := fh.d.vfs.f.NewObject(fh.remote) - if err != nil { - err = errors.Wrap(err, "failed to find object after transfer to remote") - fs.Errorf(fh.remote, "%v", err) - return err + o, err := fh.d.vfs.f.NewObject(fh.remote) + if err != nil { + err = errors.Wrap(err, "failed to find object after transfer to remote") + fs.Errorf(fh.logPrefix(), "%v", err) + return err + } + fh.file.setObject(o) + fs.Debugf(o, "transferred to remote") } - fh.file.setObject(o) - fs.Debugf(o, "transferred to remote") return nil } @@ -256,6 +269,24 @@ func (fh *RWFileHandle) Close() error { return fh.close() } +func (fh *RWFileHandle) modified() bool { + rdwrMode := fh.flags & accessModeMask + // no writes means no transfer? + if rdwrMode == os.O_RDONLY && fh.flags&os.O_TRUNC == 0 { + fs.Debugf(fh.logPrefix(), "read only and not truncating so not transferring") + return false + } + + // If write hasn't been called and we aren't creating or + // truncating the file then we haven't modified it so don't + // need to transfer it + if !fh.writeCalled && fh.flags&(os.O_CREATE|os.O_TRUNC) == 0 { + fs.Debugf(fh.logPrefix(), "not modified so not transferring") + return false + } + return true +} + // Flush is called each time the file or directory is closed. // Because there can be multiple file descriptors referring to a // single opened file, Flush can be called multiple times. @@ -266,26 +297,26 @@ func (fh *RWFileHandle) Flush() error { return nil } if fh.closed { - fs.Debugf(fh.remote, "RWFileHandle.Flush nothing to do") + fs.Debugf(fh.logPrefix(), "RWFileHandle.Flush nothing to do") return nil } - // fs.Debugf(fh.remote, "RWFileHandle.Flush") + // fs.Debugf(fh.logPrefix(), "RWFileHandle.Flush") if !fh.opened { - fs.Debugf(fh.remote, "RWFileHandle.Flush ignoring flush on unopened handle") + fs.Debugf(fh.logPrefix(), "RWFileHandle.Flush ignoring flush on unopened handle") return nil } // If Write hasn't been called then ignore the Flush - Release // will pick it up if !fh.writeCalled { - fs.Debugf(fh.remote, "RWFileHandle.Flush ignoring flush on unwritten handle") + fs.Debugf(fh.logPrefix(), "RWFileHandle.Flush ignoring flush on unwritten handle") return nil } err := fh.close() if err != nil { - fs.Errorf(fh.remote, "RWFileHandle.Flush error: %v", err) + fs.Errorf(fh.logPrefix(), "RWFileHandle.Flush error: %v", err) } else { - // fs.Debugf(fh.remote, "RWFileHandle.Flush OK") + // fs.Debugf(fh.logPrefix(), "RWFileHandle.Flush OK") } return err } @@ -298,15 +329,15 @@ func (fh *RWFileHandle) Release() error { fh.mu.Lock() defer fh.mu.Unlock() if fh.closed { - fs.Debugf(fh.remote, "RWFileHandle.Release nothing to do") + fs.Debugf(fh.logPrefix(), "RWFileHandle.Release nothing to do") return nil } - fs.Debugf(fh.remote, "RWFileHandle.Release closing") + fs.Debugf(fh.logPrefix(), "RWFileHandle.Release closing") err := fh.close() if err != nil { - fs.Errorf(fh.remote, "RWFileHandle.Release error: %v", err) + fs.Errorf(fh.logPrefix(), "RWFileHandle.Release error: %v", err) } else { - // fs.Debugf(fh.remote, "RWFileHandle.Release OK") + // fs.Debugf(fh.logPrefix(), "RWFileHandle.Release OK") } return err } @@ -429,7 +460,6 @@ func (fh *RWFileHandle) WriteString(s string) (n int, err error) { return err }) return n, err - } // Truncate file to given size @@ -464,3 +494,7 @@ func (fh *RWFileHandle) Sync() error { } return fh.File.Sync() } + +func (fh *RWFileHandle) logPrefix() string { + return fmt.Sprintf("%s(%p)", fh.remote, fh) +} diff --git a/vfs/read_write_test.go b/vfs/read_write_test.go index 90713e631..06af52f59 100644 --- a/vfs/read_write_test.go +++ b/vfs/read_write_test.go @@ -12,6 +12,12 @@ import ( "github.com/stretchr/testify/require" ) +func cleanup(t *testing.T, r *fstest.Run, vfs *VFS) { + assert.NoError(t, vfs.CleanUp()) + vfs.Shutdown() + r.Finalise() +} + // Open a file for write func rwHandleCreateReadOnly(t *testing.T, r *fstest.Run) (*VFS, *RWFileHandle) { vfs := New(r.Fremote, nil) @@ -53,8 +59,8 @@ func rwReadString(t *testing.T, fh *RWFileHandle, n int) string { func TestRWFileHandleMethodsRead(t *testing.T) { r := fstest.NewRun(t) - defer r.Finalise() - _, fh := rwHandleCreateReadOnly(t, r) + vfs, fh := rwHandleCreateReadOnly(t, r) + defer cleanup(t, r, vfs) // String assert.Equal(t, "dir/file1 (rw)", fh.String()) @@ -101,8 +107,8 @@ func TestRWFileHandleMethodsRead(t *testing.T) { func TestRWFileHandleSeek(t *testing.T) { r := fstest.NewRun(t) - defer r.Finalise() - _, fh := rwHandleCreateReadOnly(t, r) + vfs, fh := rwHandleCreateReadOnly(t, r) + defer cleanup(t, r, vfs) assert.Equal(t, "0", rwReadString(t, fh, 1)) @@ -140,8 +146,8 @@ func TestRWFileHandleSeek(t *testing.T) { func TestRWFileHandleReadAt(t *testing.T) { r := fstest.NewRun(t) - defer r.Finalise() - _, fh := rwHandleCreateReadOnly(t, r) + vfs, fh := rwHandleCreateReadOnly(t, r) + defer cleanup(t, r, vfs) // read from start buf := make([]byte, 1) @@ -191,8 +197,8 @@ func TestRWFileHandleReadAt(t *testing.T) { func TestRWFileHandleFlushRead(t *testing.T) { r := fstest.NewRun(t) - defer r.Finalise() - _, fh := rwHandleCreateReadOnly(t, r) + vfs, fh := rwHandleCreateReadOnly(t, r) + defer cleanup(t, r, vfs) // Check Flush does nothing if read not called err := fh.Flush() @@ -221,8 +227,8 @@ func TestRWFileHandleFlushRead(t *testing.T) { func TestRWFileHandleReleaseRead(t *testing.T) { r := fstest.NewRun(t) - defer r.Finalise() - _, fh := rwHandleCreateReadOnly(t, r) + vfs, fh := rwHandleCreateReadOnly(t, r) + defer cleanup(t, r, vfs) // Read data buf := make([]byte, 256) @@ -245,8 +251,8 @@ func TestRWFileHandleReleaseRead(t *testing.T) { func TestRWFileHandleMethodsWrite(t *testing.T) { r := fstest.NewRun(t) - defer r.Finalise() vfs, fh := rwHandleCreateWriteOnly(t, r) + defer cleanup(t, r, vfs) // String assert.Equal(t, "file1 (rw)", fh.String()) @@ -320,8 +326,8 @@ func TestRWFileHandleMethodsWrite(t *testing.T) { func TestRWFileHandleWriteAt(t *testing.T) { r := fstest.NewRun(t) - defer r.Finalise() vfs, fh := rwHandleCreateWriteOnly(t, r) + defer cleanup(t, r, vfs) offset := func() int64 { n, err := fh.Seek(0, 1) @@ -331,7 +337,8 @@ func TestRWFileHandleWriteAt(t *testing.T) { // Preconditions assert.Equal(t, int64(0), offset()) - assert.False(t, fh.writeCalled) + assert.True(t, fh.opened) + assert.True(t, fh.writeCalled) // Write the data n, err := fh.WriteAt([]byte("hello**"), 0) @@ -366,8 +373,8 @@ func TestRWFileHandleWriteAt(t *testing.T) { func TestRWFileHandleWriteNoWrite(t *testing.T) { r := fstest.NewRun(t) - defer r.Finalise() vfs, fh := rwHandleCreateWriteOnly(t, r) + defer cleanup(t, r, vfs) // Close the file without writing to it err := fh.Close() @@ -395,13 +402,11 @@ func TestRWFileHandleWriteNoWrite(t *testing.T) { func TestRWFileHandleFlushWrite(t *testing.T) { r := fstest.NewRun(t) - defer r.Finalise() - _, fh := rwHandleCreateWriteOnly(t, r) + vfs, fh := rwHandleCreateWriteOnly(t, r) + defer cleanup(t, r, vfs) - // Check Flush does nothing if write not called - err := fh.Flush() - assert.NoError(t, err) - assert.False(t, fh.closed) + // Check that the file has been create and is open + assert.True(t, fh.opened) // Write some data n, err := fh.Write([]byte("hello")) @@ -421,8 +426,8 @@ func TestRWFileHandleFlushWrite(t *testing.T) { func TestRWFileHandleReleaseWrite(t *testing.T) { r := fstest.NewRun(t) - defer r.Finalise() - _, fh := rwHandleCreateWriteOnly(t, r) + vfs, fh := rwHandleCreateWriteOnly(t, r) + defer cleanup(t, r, vfs) // Write some data n, err := fh.Write([]byte("hello")) @@ -515,9 +520,9 @@ func testRWFileHandleOpenTest(t *testing.T, vfs *VFS, test *openTest) { func TestRWFileHandleOpenTests(t *testing.T) { r := fstest.NewRun(t) - defer r.Finalise() - vfs := New(r.Fremote, nil) + defer cleanup(t, r, vfs) + vfs.Opt.CacheMode = CacheModeFull for _, test := range openTests { testRWFileHandleOpenTest(t, vfs, &test) diff --git a/vfs/write.go b/vfs/write.go index 99777a88f..b96037a92 100644 --- a/vfs/write.go +++ b/vfs/write.go @@ -181,7 +181,7 @@ func (fh *WriteFileHandle) close() (err error) { } fh.closed = true // leave writer open until file is transferred - defer fh.file.delWriter(fh) + defer fh.file.delWriter(fh, false) // If file not opened and not safe to truncate then then leave file intact if !fh.opened && !fh.safeToTruncate() { return nil