vfs: don't cache the object in read and read/write handles

This should help with inconsistent reads when the source object changes.
This commit is contained in:
Nick Craig-Wood 2018-03-01 15:50:23 +00:00
parent ebfeec9fb4
commit 7fb53a031c
3 changed files with 49 additions and 38 deletions

View file

@ -252,6 +252,13 @@ func (f *File) setObject(o fs.Object) {
f.d.addObject(f) f.d.addObject(f)
} }
// Get the current fs.Object - may be nil
func (f *File) getObject() fs.Object {
f.mu.Lock()
defer f.mu.Unlock()
return f.o
}
// exists returns whether the file exists already // exists returns whether the file exists already
func (f *File) exists() bool { func (f *File) exists() bool {
f.mu.Lock() f.mu.Lock()
@ -284,13 +291,13 @@ func (f *File) waitForValidObject() (o fs.Object, err error) {
// openRead open the file for read // openRead open the file for read
func (f *File) openRead() (fh *ReadFileHandle, err error) { func (f *File) openRead() (fh *ReadFileHandle, err error) {
// if o is nil it isn't valid yet // if o is nil it isn't valid yet
o, err := f.waitForValidObject() _, err = f.waitForValidObject()
if err != nil { if err != nil {
return nil, err return nil, err
} }
// fs.Debugf(o, "File.openRead") // fs.Debugf(o, "File.openRead")
fh, err = newReadFileHandle(f, o) fh, err = newReadFileHandle(f)
if err != nil { if err != nil {
fs.Errorf(f, "File.openRead failed: %v", err) fs.Errorf(f, "File.openRead failed: %v", err)
return nil, err return nil, err

View file

@ -17,7 +17,6 @@ type ReadFileHandle struct {
mu sync.Mutex mu sync.Mutex
closed bool // set if handle has been closed closed bool // set if handle has been closed
r *accounting.Account r *accounting.Account
o fs.Object
readCalled bool // set if read has been called readCalled bool // set if read has been called
size int64 // size of the object size int64 // size of the object
offset int64 // offset of read of o offset int64 // offset of read of o
@ -26,6 +25,7 @@ type ReadFileHandle struct {
file *File file *File
hash *hash.MultiHasher hash *hash.MultiHasher
opened bool opened bool
remote string
} }
// Check interfaces // Check interfaces
@ -36,9 +36,10 @@ var (
_ io.Closer = (*ReadFileHandle)(nil) _ io.Closer = (*ReadFileHandle)(nil)
) )
func newReadFileHandle(f *File, o fs.Object) (*ReadFileHandle, error) { func newReadFileHandle(f *File) (*ReadFileHandle, error) {
var mhash *hash.MultiHasher var mhash *hash.MultiHasher
var err error var err error
o := f.getObject()
if !f.d.vfs.Opt.NoChecksum { if !f.d.vfs.Opt.NoChecksum {
mhash, err = hash.NewMultiHasherTypes(o.Fs().Hashes()) mhash, err = hash.NewMultiHasherTypes(o.Fs().Hashes())
if err != nil { if err != nil {
@ -47,7 +48,7 @@ func newReadFileHandle(f *File, o fs.Object) (*ReadFileHandle, error) {
} }
fh := &ReadFileHandle{ fh := &ReadFileHandle{
o: o, remote: o.Remote(),
noSeek: f.d.vfs.Opt.NoSeek, noSeek: f.d.vfs.Opt.NoSeek,
file: f, file: f,
hash: mhash, hash: mhash,
@ -62,13 +63,14 @@ func (fh *ReadFileHandle) openPending() (err error) {
if fh.opened { if fh.opened {
return nil return nil
} }
r, err := fh.o.Open() o := fh.file.getObject()
r, err := o.Open()
if err != nil { if err != nil {
return err return err
} }
fh.r = accounting.NewAccount(r, fh.o).WithBuffer() // account the transfer fh.r = accounting.NewAccount(r, o).WithBuffer() // account the transfer
fh.opened = true fh.opened = true
accounting.Stats.Transferring(fh.o.Remote()) accounting.Stats.Transferring(o.Remote())
return nil return nil
} }
@ -107,23 +109,24 @@ func (fh *ReadFileHandle) seek(offset int64, reopen bool) (err error) {
r := oldReader r := oldReader
// Can we seek it directly? // Can we seek it directly?
if do, ok := oldReader.(io.Seeker); !reopen && ok { if do, ok := oldReader.(io.Seeker); !reopen && ok {
fs.Debugf(fh.o, "ReadFileHandle.seek from %d to %d (io.Seeker)", fh.offset, offset) fs.Debugf(fh.remote, "ReadFileHandle.seek from %d to %d (io.Seeker)", fh.offset, offset)
_, err = do.Seek(offset, 0) _, err = do.Seek(offset, 0)
if err != nil { if err != nil {
fs.Debugf(fh.o, "ReadFileHandle.Read io.Seeker failed: %v", err) fs.Debugf(fh.remote, "ReadFileHandle.Read io.Seeker failed: %v", err)
return err return err
} }
} else { } else {
fs.Debugf(fh.o, "ReadFileHandle.seek from %d to %d", fh.offset, offset) fs.Debugf(fh.remote, "ReadFileHandle.seek from %d to %d", fh.offset, offset)
// close old one // close old one
err = oldReader.Close() err = oldReader.Close()
if err != nil { if err != nil {
fs.Debugf(fh.o, "ReadFileHandle.Read seek close old failed: %v", err) fs.Debugf(fh.remote, "ReadFileHandle.Read seek close old failed: %v", err)
} }
// re-open with a seek // re-open with a seek
r, err = fh.o.Open(&fs.SeekOption{Offset: offset}) o := fh.file.getObject()
r, err = o.Open(&fs.SeekOption{Offset: offset})
if err != nil { if err != nil {
fs.Debugf(fh.o, "ReadFileHandle.Read seek failed: %v", err) fs.Debugf(fh.remote, "ReadFileHandle.Read seek failed: %v", err)
return err return err
} }
} }
@ -187,9 +190,9 @@ func (fh *ReadFileHandle) readAt(p []byte, off int64) (n int, err error) {
if err != nil { if err != nil {
return 0, err return 0, err
} }
// fs.Debugf(fh.o, "ReadFileHandle.Read size %d offset %d", reqSize, off) // fs.Debugf(fh.remote, "ReadFileHandle.Read size %d offset %d", reqSize, off)
if fh.closed { if fh.closed {
fs.Errorf(fh.o, "ReadFileHandle.Read error: %v", EBADF) fs.Errorf(fh.remote, "ReadFileHandle.Read error: %v", EBADF)
return 0, ECLOSED return 0, ECLOSED
} }
doSeek := off != fh.offset doSeek := off != fh.offset
@ -206,7 +209,7 @@ func (fh *ReadFileHandle) readAt(p []byte, off int64) (n int, err error) {
// file - if so just return EOF leaving the underlying // file - if so just return EOF leaving the underlying
// file in an unchanged state. // file in an unchanged state.
if off >= fh.size { if off >= fh.size {
fs.Debugf(fh.o, "ReadFileHandle.Read attempt to read beyond end of file: %d > %d", off, fh.size) fs.Debugf(fh.remote, "ReadFileHandle.Read attempt to read beyond end of file: %d > %d", off, fh.size)
return 0, io.EOF return 0, io.EOF
} }
// Otherwise do the seek // Otherwise do the seek
@ -235,20 +238,20 @@ func (fh *ReadFileHandle) readAt(p []byte, off int64) (n int, err error) {
break break
} }
retries++ retries++
fs.Errorf(fh.o, "ReadFileHandle.Read error: low level retry %d/%d: %v", retries, fs.Config.LowLevelRetries, err) fs.Errorf(fh.remote, "ReadFileHandle.Read error: low level retry %d/%d: %v", retries, fs.Config.LowLevelRetries, err)
doSeek = true doSeek = true
doReopen = true doReopen = true
} }
if err != nil { if err != nil {
fs.Errorf(fh.o, "ReadFileHandle.Read error: %v", err) fs.Errorf(fh.remote, "ReadFileHandle.Read error: %v", err)
} else { } else {
fh.offset = newOffset fh.offset = newOffset
// fs.Debugf(fh.o, "ReadFileHandle.Read OK") // fs.Debugf(fh.remote, "ReadFileHandle.Read OK")
if fh.hash != nil { if fh.hash != nil {
_, err = fh.hash.Write(p[:n]) _, err = fh.hash.Write(p[:n])
if err != nil { if err != nil {
fs.Errorf(fh.o, "ReadFileHandle.Read HashError: %v", err) fs.Errorf(fh.remote, "ReadFileHandle.Read HashError: %v", err)
return 0, err return 0, err
} }
} }
@ -266,8 +269,9 @@ func (fh *ReadFileHandle) checkHash() error {
return nil return nil
} }
o := fh.file.getObject()
for hashType, dstSum := range fh.hash.Sums() { for hashType, dstSum := range fh.hash.Sums() {
srcSum, err := fh.o.Hash(hashType) srcSum, err := o.Hash(hashType)
if err != nil { if err != nil {
return err return err
} }
@ -324,7 +328,7 @@ func (fh *ReadFileHandle) close() error {
fh.closed = true fh.closed = true
if fh.opened { if fh.opened {
accounting.Stats.DoneTransferring(fh.o.Remote(), true) accounting.Stats.DoneTransferring(fh.remote, true)
// Close first so that we have hashes // Close first so that we have hashes
err := fh.r.Close() err := fh.r.Close()
if err != nil { if err != nil {
@ -355,14 +359,14 @@ func (fh *ReadFileHandle) Flush() error {
if !fh.opened { if !fh.opened {
return nil return nil
} }
// fs.Debugf(fh.o, "ReadFileHandle.Flush") // fs.Debugf(fh.remote, "ReadFileHandle.Flush")
if err := fh.checkHash(); err != nil { if err := fh.checkHash(); err != nil {
fs.Errorf(fh.o, "ReadFileHandle.Flush error: %v", err) fs.Errorf(fh.remote, "ReadFileHandle.Flush error: %v", err)
return err return err
} }
// fs.Debugf(fh.o, "ReadFileHandle.Flush OK") // fs.Debugf(fh.remote, "ReadFileHandle.Flush OK")
return nil return nil
} }
@ -377,15 +381,15 @@ func (fh *ReadFileHandle) Release() error {
return nil return nil
} }
if fh.closed { if fh.closed {
fs.Debugf(fh.o, "ReadFileHandle.Release nothing to do") fs.Debugf(fh.remote, "ReadFileHandle.Release nothing to do")
return nil return nil
} }
fs.Debugf(fh.o, "ReadFileHandle.Release closing") fs.Debugf(fh.remote, "ReadFileHandle.Release closing")
err := fh.close() err := fh.close()
if err != nil { if err != nil {
fs.Errorf(fh.o, "ReadFileHandle.Release error: %v", err) fs.Errorf(fh.remote, "ReadFileHandle.Release error: %v", err)
} else { } else {
// fs.Debugf(fh.o, "ReadFileHandle.Release OK") // fs.Debugf(fh.remote, "ReadFileHandle.Release OK")
} }
return err return err
} }

View file

@ -23,7 +23,6 @@ type RWFileHandle struct {
*os.File *os.File
mu sync.Mutex mu sync.Mutex
closed bool // set if handle has been closed closed bool // set if handle has been closed
o fs.Object // may be nil
remote string remote string
file *File file *File
d *Dir d *Dir
@ -51,7 +50,6 @@ func newRWFileHandle(d *Dir, f *File, remote string, flags int) (fh *RWFileHandl
} }
fh = &RWFileHandle{ fh = &RWFileHandle{
o: f.o,
file: f, file: f,
d: d, d: d,
remote: remote, remote: remote,
@ -107,16 +105,18 @@ func (fh *RWFileHandle) openPending(truncate bool) (err error) {
fh.file.muOpen.Lock() fh.file.muOpen.Lock()
defer fh.file.muOpen.Unlock() defer fh.file.muOpen.Unlock()
o := fh.file.getObject()
var fd *os.File var fd *os.File
cacheFileOpenFlags := fh.flags cacheFileOpenFlags := fh.flags
// if not truncating the file, need to read it first // if not truncating the file, need to read it first
if fh.flags&os.O_TRUNC == 0 && !truncate { if fh.flags&os.O_TRUNC == 0 && !truncate {
// If the remote object exists AND its cached file exists locally AND there are no // If the remote object exists AND its cached file exists locally AND there are no
// other handles with it open writers, then attempt to update it. // other handles with it open writers, then attempt to update it.
if fh.o != nil && fh.d.vfs.cache.opens(fh.remote) <= 1 { if o != nil && fh.d.vfs.cache.opens(fh.remote) <= 1 {
cacheObj, err := fh.d.vfs.cache.f.NewObject(fh.remote) cacheObj, err := fh.d.vfs.cache.f.NewObject(fh.remote)
if err == nil && cacheObj != nil { if err == nil && cacheObj != nil {
cacheObj, err = copyObj(fh.d.vfs.cache.f, cacheObj, fh.remote, fh.o) cacheObj, err = copyObj(fh.d.vfs.cache.f, cacheObj, fh.remote, o)
if err != nil { if err != nil {
return errors.Wrap(err, "open RW handle failed to update cached file") return errors.Wrap(err, "open RW handle failed to update cached file")
} }
@ -128,8 +128,8 @@ func (fh *RWFileHandle) openPending(truncate bool) (err error) {
if os.IsNotExist(err) { if os.IsNotExist(err) {
// cache file does not exist, so need to fetch it if we have an object to fetch // cache file does not exist, so need to fetch it if we have an object to fetch
// it from // it from
if fh.o != nil { if o != nil {
_, err = copyObj(fh.d.vfs.cache.f, nil, fh.remote, fh.o) _, err = copyObj(fh.d.vfs.cache.f, nil, fh.remote, o)
if err != nil { if err != nil {
cause := errors.Cause(err) cause := errors.Cause(err)
if cause != fs.ErrorObjectNotFound && cause != fs.ErrorDirNotFound { if cause != fs.ErrorObjectNotFound && cause != fs.ErrorDirNotFound {
@ -296,7 +296,7 @@ func (fh *RWFileHandle) close() (err error) {
return err return err
} }
o, err := copyObj(fh.d.vfs.f, fh.o, fh.remote, cacheObj) o, err := copyObj(fh.d.vfs.f, fh.file.getObject(), fh.remote, cacheObj)
if err != nil { if err != nil {
err = errors.Wrap(err, "failed to transfer file from cache to remote") err = errors.Wrap(err, "failed to transfer file from cache to remote")
fs.Errorf(fh.logPrefix(), "%v", err) fs.Errorf(fh.logPrefix(), "%v", err)