Prevent double closes on async buffer

This commit is contained in:
Nick Craig-Wood 2017-02-17 08:37:53 +00:00
parent 928be0f1fd
commit ac62ef430d

View file

@ -30,6 +30,7 @@ type asyncReader struct {
cur *buffer // Current buffer being served cur *buffer // Current buffer being served
exited chan struct{} // Channel is closed been the async reader shuts down exited chan struct{} // Channel is closed been the async reader shuts down
size int // size of buffer to use size int // size of buffer to use
closed bool // whether we have closed the underlying stream
} }
// newAsyncReader returns a reader that will asynchronously read from // newAsyncReader returns a reader that will asynchronously read from
@ -162,13 +163,12 @@ func (a *asyncReader) WriteTo(w io.Writer) (n int64, err error) {
} }
} }
// close will ensure that the underlying async reader is shut down. // Abandon will ensure that the underlying async reader is shut down.
// If closeIn is set it will also close the input supplied on // It will NOT close the input supplied on newAsyncReader.
// newAsyncReader. func (a *asyncReader) Abandon() {
func (a *asyncReader) close(closeIn bool) (err error) {
// Return if already closed
select { select {
case <-a.exit: case <-a.exit:
// Do nothing if reader routine already exited
return return
default: default:
} }
@ -178,26 +178,22 @@ func (a *asyncReader) close(closeIn bool) (err error) {
// Return any outstanding buffers to the Pool // Return any outstanding buffers to the Pool
if a.cur != nil { if a.cur != nil {
a.putBuffer(a.cur) a.putBuffer(a.cur)
a.cur = nil
} }
for b := range a.ready { for b := range a.ready {
a.putBuffer(b) a.putBuffer(b)
} }
if closeIn {
return a.in.Close()
}
return nil
} }
// Close will ensure that the underlying async reader is shut down. // Close will ensure that the underlying async reader is shut down.
// It will also close the input supplied on newAsyncReader. // It will also close the input supplied on newAsyncReader.
func (a *asyncReader) Close() (err error) { func (a *asyncReader) Close() (err error) {
return a.close(true) a.Abandon()
} if a.closed {
return nil
// Abandon will ensure that the underlying async reader is shut down. }
// It will NOT close the input supplied on newAsyncReader. a.closed = true
func (a *asyncReader) Abandon() { return a.in.Close()
_ = a.close(false)
} }
// Internal buffer // Internal buffer