vfs: fix download threads timing out

Before this fix, download threads would fill up the buffer and then
timeout even though data was still being read from them. If the client
was streaming slower than network speed this caused the downloader to
stop and be restarted continuously. This caused more potential for
skips in the download and unecessary network transactions.

This patch fixes that behaviour - as long as a downloader is being
read from more often than once every 5 seconds, it won't timeout.

This was done by:

- kicking the downloader whenever ensureDownloader is called
- making the downloader loop if it has already downloaded past the maxOffset
- making setRange() always kick the downloader
This commit is contained in:
Nick Craig-Wood 2020-08-05 15:29:45 +01:00
parent 109b695621
commit 4d7f91309b

View file

@ -273,6 +273,8 @@ func (dls *Downloaders) _closeWaiters(err error) {
// //
// call with lock held // call with lock held
func (dls *Downloaders) _ensureDownloader(r ranges.Range) (err error) { func (dls *Downloaders) _ensureDownloader(r ranges.Range) (err error) {
// defer log.Trace(dls.src, "r=%v", r)("err=%v", &err)
// The window includes potentially unread data in the buffer // The window includes potentially unread data in the buffer
window := int64(fs.Config.BufferSize) window := int64(fs.Config.BufferSize)
@ -290,22 +292,30 @@ func (dls *Downloaders) _ensureDownloader(r ranges.Range) (err error) {
// If the range is entirely present then we only need to start a // If the range is entirely present then we only need to start a
// dowloader if the window isn't full. // dowloader if the window isn't full.
startNew := true
if r.IsEmpty() { if r.IsEmpty() {
// Make a new range which includes the window // Make a new range which includes the window
rWindow := r rWindow := r
if rWindow.Size < window { rWindow.Size += window
rWindow.Size = window
}
// Clip rWindow to stuff which needs downloading // Clip rWindow to stuff which needs downloading
rWindow = dls.item.FindMissing(rWindow) rWindowClipped := dls.item.FindMissing(rWindow)
// If rWindow is empty then just return without starting a
// downloader as there is no data within the window which needs // If rWindowClipped is empty then don't start a new downloader
// downloading. // if there isn't an existing one as there is no data within the
if rWindow.IsEmpty() { // window which needs downloading. We do want to kick an
return nil // existing one though to stop it timing out.
if rWindowClipped.IsEmpty() {
// Don't start any more downloaders
startNew = false
// Start downloading at the start of the unread window
// This likely has been downloaded already but it will
// kick the downloader
r.Pos = rWindow.End()
} else {
// Start downloading at the start of the unread window
r.Pos = rWindowClipped.Pos
} }
// Start downloading at the start of the unread window
r.Pos = rWindow.Pos
// But don't write anything for the moment // But don't write anything for the moment
r.Size = 0 r.Size = 0
} }
@ -329,6 +339,9 @@ func (dls *Downloaders) _ensureDownloader(r ranges.Range) (err error) {
return nil return nil
} }
} }
if !startNew {
return nil
}
// Downloader not found so start a new one // Downloader not found so start a new one
dl, err = dls._newDownloader(r) dl, err = dls._newDownloader(r)
if err != nil { if err != nil {
@ -431,13 +444,15 @@ func (dl *downloader) Write(p []byte) (n int, err error) {
// - we are quitting // - we are quitting
// - we get kicked // - we get kicked
// - timeout happens // - timeout happens
if dl.offset >= dl.maxOffset { loop:
for dl.offset >= dl.maxOffset {
var timeout = time.NewTimer(maxDownloaderIdleTime) var timeout = time.NewTimer(maxDownloaderIdleTime)
dl.mu.Unlock() dl.mu.Unlock()
select { select {
case <-dl.quit: case <-dl.quit:
dl.mu.Lock() dl.mu.Lock()
timeout.Stop() timeout.Stop()
break loop
case <-dl.kick: case <-dl.kick:
dl.mu.Lock() dl.mu.Lock()
timeout.Stop() timeout.Stop()
@ -448,6 +463,7 @@ func (dl *downloader) Write(p []byte) (n int, err error) {
fs.Debugf(dl.dls.src, "vfs cache: stopping download thread as it timed out") fs.Debugf(dl.dls.src, "vfs cache: stopping download thread as it timed out")
dl._stop() dl._stop()
} }
break loop
} }
} }
@ -589,17 +605,18 @@ func (dl *downloader) download() (n int64, err error) {
// setRange makes sure the downloader is downloading the range passed in // setRange makes sure the downloader is downloading the range passed in
func (dl *downloader) setRange(r ranges.Range) { func (dl *downloader) setRange(r ranges.Range) {
// defer log.Trace(dl.dls.src, "r=%v", r)("")
dl.mu.Lock() dl.mu.Lock()
maxOffset := r.End() maxOffset := r.End()
if maxOffset > dl.maxOffset { if maxOffset > dl.maxOffset {
dl.maxOffset = maxOffset dl.maxOffset = maxOffset
// fs.Debugf(dl.dls.src, "kicking downloader with maxOffset %d", maxOffset)
select {
case dl.kick <- struct{}{}:
default:
}
} }
dl.mu.Unlock() dl.mu.Unlock()
// fs.Debugf(dl.dls.src, "kicking downloader with maxOffset %d", maxOffset)
select {
case dl.kick <- struct{}{}:
default:
}
} }
// get the current range this downloader is working on // get the current range this downloader is working on