From 4d7f91309b73fb46d58cba439d9ffe7fe236dd24 Mon Sep 17 00:00:00 2001 From: Nick Craig-Wood Date: Wed, 5 Aug 2020 15:29:45 +0100 Subject: [PATCH] vfs: fix download threads timing out Before this fix, download threads would fill up the buffer and then timeout even though data was still being read from them. If the client was streaming slower than network speed this caused the downloader to stop and be restarted continuously. This caused more potential for skips in the download and unecessary network transactions. This patch fixes that behaviour - as long as a downloader is being read from more often than once every 5 seconds, it won't timeout. This was done by: - kicking the downloader whenever ensureDownloader is called - making the downloader loop if it has already downloaded past the maxOffset - making setRange() always kick the downloader --- vfs/vfscache/downloaders/downloaders.go | 51 ++++++++++++++++--------- 1 file changed, 34 insertions(+), 17 deletions(-) diff --git a/vfs/vfscache/downloaders/downloaders.go b/vfs/vfscache/downloaders/downloaders.go index 08f230bab..f2c22ee2c 100644 --- a/vfs/vfscache/downloaders/downloaders.go +++ b/vfs/vfscache/downloaders/downloaders.go @@ -273,6 +273,8 @@ func (dls *Downloaders) _closeWaiters(err error) { // // call with lock held func (dls *Downloaders) _ensureDownloader(r ranges.Range) (err error) { + // defer log.Trace(dls.src, "r=%v", r)("err=%v", &err) + // The window includes potentially unread data in the buffer window := int64(fs.Config.BufferSize) @@ -290,22 +292,30 @@ func (dls *Downloaders) _ensureDownloader(r ranges.Range) (err error) { // If the range is entirely present then we only need to start a // dowloader if the window isn't full. + startNew := true if r.IsEmpty() { // Make a new range which includes the window rWindow := r - if rWindow.Size < window { - rWindow.Size = window - } + rWindow.Size += window + // Clip rWindow to stuff which needs downloading - rWindow = dls.item.FindMissing(rWindow) - // If rWindow is empty then just return without starting a - // downloader as there is no data within the window which needs - // downloading. - if rWindow.IsEmpty() { - return nil + rWindowClipped := dls.item.FindMissing(rWindow) + + // If rWindowClipped is empty then don't start a new downloader + // if there isn't an existing one as there is no data within the + // window which needs downloading. We do want to kick an + // existing one though to stop it timing out. + if rWindowClipped.IsEmpty() { + // Don't start any more downloaders + startNew = false + // Start downloading at the start of the unread window + // This likely has been downloaded already but it will + // kick the downloader + r.Pos = rWindow.End() + } else { + // Start downloading at the start of the unread window + r.Pos = rWindowClipped.Pos } - // Start downloading at the start of the unread window - r.Pos = rWindow.Pos // But don't write anything for the moment r.Size = 0 } @@ -329,6 +339,9 @@ func (dls *Downloaders) _ensureDownloader(r ranges.Range) (err error) { return nil } } + if !startNew { + return nil + } // Downloader not found so start a new one dl, err = dls._newDownloader(r) if err != nil { @@ -431,13 +444,15 @@ func (dl *downloader) Write(p []byte) (n int, err error) { // - we are quitting // - we get kicked // - timeout happens - if dl.offset >= dl.maxOffset { +loop: + for dl.offset >= dl.maxOffset { var timeout = time.NewTimer(maxDownloaderIdleTime) dl.mu.Unlock() select { case <-dl.quit: dl.mu.Lock() timeout.Stop() + break loop case <-dl.kick: dl.mu.Lock() timeout.Stop() @@ -448,6 +463,7 @@ func (dl *downloader) Write(p []byte) (n int, err error) { fs.Debugf(dl.dls.src, "vfs cache: stopping download thread as it timed out") dl._stop() } + break loop } } @@ -589,17 +605,18 @@ func (dl *downloader) download() (n int64, err error) { // setRange makes sure the downloader is downloading the range passed in func (dl *downloader) setRange(r ranges.Range) { + // defer log.Trace(dl.dls.src, "r=%v", r)("") dl.mu.Lock() maxOffset := r.End() if maxOffset > dl.maxOffset { dl.maxOffset = maxOffset - // fs.Debugf(dl.dls.src, "kicking downloader with maxOffset %d", maxOffset) - select { - case dl.kick <- struct{}{}: - default: - } } dl.mu.Unlock() + // fs.Debugf(dl.dls.src, "kicking downloader with maxOffset %d", maxOffset) + select { + case dl.kick <- struct{}{}: + default: + } } // get the current range this downloader is working on