vfs: make write without cache more efficient

This updates the out of sequence write code to be more efficient using
a conditional lock with a timeout.
This commit is contained in:
Nick Craig-Wood 2019-08-07 16:41:45 +01:00
parent c014b2e66b
commit d377842395

View file

@ -5,6 +5,7 @@ import (
"io" "io"
"os" "os"
"sync" "sync"
"sync/atomic"
"time" "time"
"github.com/rclone/rclone/fs" "github.com/rclone/rclone/fs"
@ -15,6 +16,7 @@ import (
type WriteFileHandle struct { type WriteFileHandle struct {
baseHandle baseHandle
mu sync.Mutex mu sync.Mutex
cond *sync.Cond // cond lock for out of sequence writes
closed bool // set if handle has been closed closed bool // set if handle has been closed
remote string remote string
pipeWriter *io.PipeWriter pipeWriter *io.PipeWriter
@ -42,6 +44,7 @@ func newWriteFileHandle(d *Dir, f *File, remote string, flags int) (*WriteFileHa
result: make(chan error, 1), result: make(chan error, 1),
file: f, file: f,
} }
fh.cond = sync.NewCond(&fh.mu)
fh.file.addWriter(fh) fh.file.addWriter(fh)
return fh, nil return fh, nil
} }
@ -127,14 +130,28 @@ func (fh *WriteFileHandle) writeAt(p []byte, off int64) (n int, err error) {
fs.Errorf(fh.remote, "WriteFileHandle.Write: error: %v", EBADF) fs.Errorf(fh.remote, "WriteFileHandle.Write: error: %v", EBADF)
return 0, ECLOSED return 0, ECLOSED
} }
// Wait a short time for sequential writes to appear if fh.offset != off {
const maxTries = 1000 // Set a background timer so we don't wait forever
const sleepTime = 1 * time.Millisecond timeout := time.NewTimer(10 * time.Second)
for try := 1; fh.offset != off && try <= maxTries; try++ { done := make(chan struct{})
//fs.Debugf(fh.remote, "waiting for in sequence write %d/%d", try, maxTries) abort := int32(0)
fh.mu.Unlock() go func() {
time.Sleep(sleepTime) select {
fh.mu.Lock() case <-timeout.C:
// set abort flag an give all the waiting goroutines a kick on timeout
atomic.StoreInt32(&abort, 1)
fh.cond.Broadcast()
case <-done:
}
}()
// Wait for an in-sequence write or abort
for fh.offset != off && atomic.LoadInt32(&abort) == 0 {
// fs.Debugf(fh.remote, "waiting for in-sequence write to %d", off)
fh.cond.Wait()
}
// tidy up end timer
close(done)
timeout.Stop()
} }
if fh.offset != off { if fh.offset != off {
fs.Errorf(fh.remote, "WriteFileHandle.Write: can't seek in file without --vfs-cache-mode >= writes") fs.Errorf(fh.remote, "WriteFileHandle.Write: can't seek in file without --vfs-cache-mode >= writes")
@ -152,6 +169,7 @@ func (fh *WriteFileHandle) writeAt(p []byte, off int64) (n int, err error) {
return 0, err return 0, err
} }
// fs.Debugf(fh.remote, "WriteFileHandle.Write OK (%d bytes written)", n) // fs.Debugf(fh.remote, "WriteFileHandle.Write OK (%d bytes written)", n)
fh.cond.Broadcast() // wake everyone up waiting for an in-sequence read
return n, nil return n, nil
} }