forked from TrueCloudLab/rclone
vfs: make ReadAt for non cached files work better with non-sequential reads
This makes ReadAt for non cached files wait a short time (up to 5mS)
if it gets an out of order read (which would normally cause a seek and
which take a long time) to see if the gap will be filled with an in
order read.
This makes mount2 based on go-fuse work more efficiently and enables
async reading in normal mount.
A similar change was done for WriteAt in af030f74f5
This commit is contained in:
parent
be5392f448
commit
5beeac7959
1 changed files with 40 additions and 1 deletions
39
vfs/read.go
39
vfs/read.go
|
@ -5,6 +5,8 @@ import (
|
||||||
"io"
|
"io"
|
||||||
"os"
|
"os"
|
||||||
"sync"
|
"sync"
|
||||||
|
"sync/atomic"
|
||||||
|
"time"
|
||||||
|
|
||||||
"github.com/pkg/errors"
|
"github.com/pkg/errors"
|
||||||
"github.com/rclone/rclone/fs"
|
"github.com/rclone/rclone/fs"
|
||||||
|
@ -18,6 +20,7 @@ type ReadFileHandle struct {
|
||||||
baseHandle
|
baseHandle
|
||||||
done func(err error)
|
done func(err error)
|
||||||
mu sync.Mutex
|
mu sync.Mutex
|
||||||
|
cond *sync.Cond // cond lock for out of sequence reads
|
||||||
closed bool // set if handle has been closed
|
closed bool // set if handle has been closed
|
||||||
r *accounting.Account
|
r *accounting.Account
|
||||||
readCalled bool // set if read has been called
|
readCalled bool // set if read has been called
|
||||||
|
@ -60,6 +63,7 @@ func newReadFileHandle(f *File) (*ReadFileHandle, error) {
|
||||||
size: nonNegative(o.Size()),
|
size: nonNegative(o.Size()),
|
||||||
sizeUnknown: o.Size() < 0,
|
sizeUnknown: o.Size() < 0,
|
||||||
}
|
}
|
||||||
|
fh.cond = sync.NewCond(&fh.mu)
|
||||||
return fh, nil
|
return fh, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -221,6 +225,40 @@ func (fh *ReadFileHandle) readAt(p []byte, off int64) (n int, err error) {
|
||||||
fs.Errorf(fh.remote, "ReadFileHandle.Read error: %v", EBADF)
|
fs.Errorf(fh.remote, "ReadFileHandle.Read error: %v", EBADF)
|
||||||
return 0, ECLOSED
|
return 0, ECLOSED
|
||||||
}
|
}
|
||||||
|
maxBuf := 1024 * 1024
|
||||||
|
if len(p) < maxBuf {
|
||||||
|
maxBuf = len(p)
|
||||||
|
}
|
||||||
|
if gap := off - fh.offset; gap > 0 && gap < int64(8*maxBuf) {
|
||||||
|
// Set a background timer so we don't wait for long
|
||||||
|
// Waits here potentially affect all seeks so need to keep them short
|
||||||
|
// This time here was made by finding the smallest when mounting a local backend
|
||||||
|
// that didn't cause seeks.
|
||||||
|
const maxWait = 5 * time.Millisecond
|
||||||
|
timeout := time.NewTimer(maxWait)
|
||||||
|
done := make(chan struct{})
|
||||||
|
abort := int32(0)
|
||||||
|
go func() {
|
||||||
|
select {
|
||||||
|
case <-timeout.C:
|
||||||
|
// set abort flag an give all the waiting goroutines a kick on timeout
|
||||||
|
atomic.StoreInt32(&abort, 1)
|
||||||
|
fs.Debugf(fh.remote, "aborting in-sequence read wait, off=%d", off)
|
||||||
|
fh.cond.Broadcast()
|
||||||
|
case <-done:
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
for fh.offset != off && atomic.LoadInt32(&abort) == 0 {
|
||||||
|
fs.Debugf(fh.remote, "waiting for in-sequence read to %d for %v", off, maxWait)
|
||||||
|
fh.cond.Wait()
|
||||||
|
}
|
||||||
|
// tidy up end timer
|
||||||
|
close(done)
|
||||||
|
timeout.Stop()
|
||||||
|
if fh.offset != off {
|
||||||
|
fs.Debugf(fh.remote, "failed to wait for in-sequence read to %d", off)
|
||||||
|
}
|
||||||
|
}
|
||||||
doSeek := off != fh.offset
|
doSeek := off != fh.offset
|
||||||
if doSeek && fh.noSeek {
|
if doSeek && fh.noSeek {
|
||||||
return 0, ESPIPE
|
return 0, ESPIPE
|
||||||
|
@ -292,6 +330,7 @@ func (fh *ReadFileHandle) readAt(p []byte, off int64) (n int, err error) {
|
||||||
err = io.EOF
|
err = io.EOF
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
fh.cond.Broadcast() // wake everyone up waiting for an in-sequence read
|
||||||
return n, err
|
return n, err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in a new issue