vfs: downloader: limit the reader to 10 errors before giving up
This commit is contained in:
parent
d273a9d82d
commit
042e5fe097
1 changed files with 48 additions and 10 deletions
|
@ -23,6 +23,8 @@ const (
|
|||
maxSkipBytes = 1024 * 1024
|
||||
// time between background kicks of waiters to pick up errors
|
||||
backgroundKickerInterval = 5 * time.Second
|
||||
// maximum number of errors before declaring dead
|
||||
maxErrorCount = 10
|
||||
)
|
||||
|
||||
// downloaders is a number of downloader~s and a queue of waiters
|
||||
|
@ -39,9 +41,11 @@ type downloaders struct {
|
|||
wg sync.WaitGroup
|
||||
|
||||
// Read write
|
||||
mu sync.Mutex
|
||||
dls []*downloader
|
||||
waiters []waiter
|
||||
mu sync.Mutex
|
||||
dls []*downloader
|
||||
waiters []waiter
|
||||
errorCount int // number of consecutive errors
|
||||
lastErr error // last error received
|
||||
}
|
||||
|
||||
// waiter is a range we are waiting for and a channel to signal when
|
||||
|
@ -104,6 +108,36 @@ func newDownloaders(item *Item, fcache fs.Fs, remote string, src fs.Object) (dls
|
|||
return dls
|
||||
}
|
||||
|
||||
// Accumulate errors for this downloader
|
||||
//
|
||||
// It should be called with
|
||||
//
|
||||
// n bytes downloaded
|
||||
// err is error from download
|
||||
//
|
||||
// call with lock held
|
||||
func (dls *downloaders) _countErrors(n int64, err error) {
|
||||
if err == nil && n != 0 {
|
||||
if dls.errorCount != 0 {
|
||||
fs.Infof(dls.src, "Resetting error count to 0")
|
||||
dls.errorCount = 0
|
||||
dls.lastErr = nil
|
||||
}
|
||||
return
|
||||
}
|
||||
if err != nil {
|
||||
dls.errorCount++
|
||||
dls.lastErr = err
|
||||
fs.Infof(dls.src, "Error count now %d: %v", dls.errorCount, err)
|
||||
}
|
||||
}
|
||||
|
||||
func (dls *downloaders) countErrors(n int64, err error) {
|
||||
dls.mu.Lock()
|
||||
dls._countErrors(n, err)
|
||||
dls.mu.Unlock()
|
||||
}
|
||||
|
||||
// Make a new downloader, starting it to download r
|
||||
//
|
||||
// call with lock held
|
||||
|
@ -130,8 +164,9 @@ func (dls *downloaders) _newDownloader(r ranges.Range) (dl *downloader, err erro
|
|||
dl.wg.Add(1)
|
||||
go func() {
|
||||
defer dl.wg.Done()
|
||||
err := dl.download()
|
||||
n, err := dl.download()
|
||||
_ = dl.close(err)
|
||||
dl.dls.countErrors(n, err)
|
||||
if err != nil {
|
||||
fs.Errorf(dl.dls.src, "Failed to download: %v", err)
|
||||
}
|
||||
|
@ -271,6 +306,7 @@ func (dls *downloaders) _ensureDownloader(r ranges.Range) (err error) {
|
|||
// Downloader not found so start a new one
|
||||
dl, err = dls._newDownloader(r)
|
||||
if err != nil {
|
||||
dls._countErrors(0, err)
|
||||
return errors.Wrap(err, "failed to start downloader")
|
||||
}
|
||||
return err
|
||||
|
@ -328,8 +364,10 @@ func (dls *downloaders) kickWaiters() (err error) {
|
|||
}
|
||||
}
|
||||
|
||||
if true {
|
||||
|
||||
if dls.errorCount > maxErrorCount {
|
||||
fs.Errorf(dls.src, "Too many errors %d/%d: last error: %v", dls.errorCount, maxErrorCount, dls.lastErr)
|
||||
dls._closeWaiters(dls.lastErr)
|
||||
return dls.lastErr
|
||||
}
|
||||
|
||||
return nil
|
||||
|
@ -504,13 +542,13 @@ func (dl *downloader) stopAndClose(inErr error) (err error) {
|
|||
}
|
||||
|
||||
// Start downloading to the local file starting at offset until maxOffset.
|
||||
func (dl *downloader) download() (err error) {
|
||||
func (dl *downloader) download() (n int64, err error) {
|
||||
defer log.Trace(dl.dls.src, "")("err=%v", &err)
|
||||
_, err = dl.in.WriteTo(dl)
|
||||
n, err = dl.in.WriteTo(dl)
|
||||
if err != nil && errors.Cause(err) != asyncreader.ErrorStreamAbandoned {
|
||||
return errors.Wrap(err, "vfs reader: failed to write to cache file")
|
||||
return n, errors.Wrap(err, "vfs reader: failed to write to cache file")
|
||||
}
|
||||
return nil
|
||||
return n, nil
|
||||
}
|
||||
|
||||
// setRange makes sure the downloader is downloading the range passed in
|
||||
|
|
Loading…
Reference in a new issue