diff --git a/cmd/mount/read.go b/cmd/mount/read.go index 3cd88522b..2680d9fd4 100644 --- a/cmd/mount/read.go +++ b/cmd/mount/read.go @@ -46,30 +46,33 @@ var _ fusefs.HandleReader = (*ReadFileHandle)(nil) // if reopen is true, then we won't attempt to use an io.Seeker interface // // Must be called with fh.mu held -func (fh *ReadFileHandle) seek(offset int64, reopen bool) error { - // Can we seek it directly? +func (fh *ReadFileHandle) seek(offset int64, reopen bool) (err error) { + fh.r.StopBuffering() // stop the background reading first oldReader := fh.r.GetReader() + r := oldReader + // Can we seek it directly? if do, ok := oldReader.(io.Seeker); !reopen && ok { fs.Debugf(fh.o, "ReadFileHandle.seek from %d to %d (io.Seeker)", fh.offset, offset) - _, err := do.Seek(offset, 0) + _, err = do.Seek(offset, 0) if err != nil { fs.Debugf(fh.o, "ReadFileHandle.Read io.Seeker failed: %v", err) return err } } else { fs.Debugf(fh.o, "ReadFileHandle.seek from %d to %d", fh.offset, offset) - // if not re-open with a seek - r, err := fh.o.Open(&fs.SeekOption{Offset: offset}) - if err != nil { - fs.Debugf(fh.o, "ReadFileHandle.Read seek failed: %v", err) - return err - } + // close old one err = oldReader.Close() if err != nil { fs.Debugf(fh.o, "ReadFileHandle.Read seek close old failed: %v", err) } - fh.r.UpdateReader(r) + // re-open with a seek + r, err = fh.o.Open(&fs.SeekOption{Offset: offset}) + if err != nil { + fs.Debugf(fh.o, "ReadFileHandle.Read seek failed: %v", err) + return err + } } + fh.r.UpdateReader(r) fh.offset = offset return nil } diff --git a/fs/accounting.go b/fs/accounting.go index ef85f4a8e..e9d6163e9 100644 --- a/fs/accounting.go +++ b/fs/accounting.go @@ -308,6 +308,7 @@ type Account struct { // shouldn't. mu sync.Mutex in io.ReadCloser + origIn io.ReadCloser size int64 name string statmu sync.Mutex // Separate mutex for stat values. @@ -318,6 +319,7 @@ type Account struct { avg ewma.MovingAverage // Moving average of last few measurements closed bool // set if the file is closed exit chan struct{} // channel that will be closed when transfer is finished + withBuf bool // is using a buffered in wholeFileDisabled bool // disables the whole file when doing parts } @@ -327,6 +329,7 @@ type Account struct { func NewAccountSizeName(in io.ReadCloser, size int64, name string) *Account { acc := &Account{ in: in, + origIn: in, size: size, name: name, exit: make(chan struct{}), @@ -368,7 +371,10 @@ func bufferReadCloser(in io.ReadCloser, size int64, name string) io.ReadCloser { // // If the file is above a certain size it adds an Async reader func NewAccountSizeNameWithBuffer(in io.ReadCloser, size int64, name string) *Account { - return NewAccountSizeName(bufferReadCloser(in, size, name), size, name) + acc := NewAccountSizeName(in, size, name) + acc.in = bufferReadCloser(in, size, name) + acc.withBuf = true + return acc } // NewAccountWithBuffer makes a Account reader for an object @@ -382,16 +388,26 @@ func NewAccountWithBuffer(in io.ReadCloser, obj Object) *Account { func (acc *Account) GetReader() io.ReadCloser { acc.mu.Lock() defer acc.mu.Unlock() - return acc.in + return acc.origIn +} + +// StopBuffering stops the async buffer doing any more buffering +func (acc *Account) StopBuffering() { + if asyncIn, ok := acc.in.(*asyncReader); ok { + asyncIn.Abandon() + } } // UpdateReader updates the underlying io.ReadCloser func (acc *Account) UpdateReader(in io.ReadCloser) { acc.mu.Lock() - if asyncIn, ok := acc.in.(*asyncReader); ok { - asyncIn.Abandon() + acc.StopBuffering() + acc.origIn = in + if acc.withBuf { + acc.in = bufferReadCloser(in, acc.size, acc.name) + } else { + acc.in = in } - acc.in = bufferReadCloser(in, acc.size, acc.name) acc.mu.Unlock() }