diff --git a/fs/buffer.go b/fs/buffer.go index 9ba3fb3cf..436b42383 100644 --- a/fs/buffer.go +++ b/fs/buffer.go @@ -30,6 +30,7 @@ type asyncReader struct { cur *buffer // Current buffer being served exited chan struct{} // Channel is closed been the async reader shuts down size int // size of buffer to use + closed bool // whether we have closed the underlying stream } // newAsyncReader returns a reader that will asynchronously read from @@ -162,13 +163,12 @@ func (a *asyncReader) WriteTo(w io.Writer) (n int64, err error) { } } -// close will ensure that the underlying async reader is shut down. -// If closeIn is set it will also close the input supplied on -// newAsyncReader. -func (a *asyncReader) close(closeIn bool) (err error) { - // Return if already closed +// Abandon will ensure that the underlying async reader is shut down. +// It will NOT close the input supplied on newAsyncReader. +func (a *asyncReader) Abandon() { select { case <-a.exit: + // Do nothing if reader routine already exited return default: } @@ -178,26 +178,22 @@ func (a *asyncReader) close(closeIn bool) (err error) { // Return any outstanding buffers to the Pool if a.cur != nil { a.putBuffer(a.cur) + a.cur = nil } for b := range a.ready { a.putBuffer(b) } - if closeIn { - return a.in.Close() - } - return nil } // Close will ensure that the underlying async reader is shut down. // It will also close the input supplied on newAsyncReader. func (a *asyncReader) Close() (err error) { - return a.close(true) -} - -// Abandon will ensure that the underlying async reader is shut down. -// It will NOT close the input supplied on newAsyncReader. -func (a *asyncReader) Abandon() { - _ = a.close(false) + a.Abandon() + if a.closed { + return nil + } + a.closed = true + return a.in.Close() } // Internal buffer