diff --git a/fs/accounting.go b/fs/accounting.go
index 40926f72f..f9cd600ea 100644
--- a/fs/accounting.go
+++ b/fs/accounting.go
@@ -351,13 +351,13 @@ func NewAccountSizeNameWithBuffer(in io.ReadCloser, size int64, name string) *Ac
 	const bufSize = 1024 * 1024
 	var buffers int
 	if size >= int64(Config.BufferSize) {
-		buffers = int(int64(Config.BufferSize) / bufSize)
+		buffers = int(int64(Config.BufferSize) / asyncBufferSize)
 	} else {
-		buffers = int(size / bufSize)
+		buffers = int(size / asyncBufferSize)
 	}
 	// On big files add a buffer
 	if buffers > 0 {
-		newIn, err := newAsyncReader(in, buffers, bufSize)
+		newIn, err := newAsyncReader(in, buffers)
 		if err != nil {
 			Errorf(name, "Failed to make buffer: %v", err)
 		} else {
diff --git a/fs/buffer.go b/fs/buffer.go
index 8a7a51431..754e79d7b 100644
--- a/fs/buffer.go
+++ b/fs/buffer.go
@@ -2,10 +2,20 @@ package fs
 
 import (
 	"io"
+	"sync"
 
 	"github.com/pkg/errors"
 )
 
+// asyncBufferSize is the size in bytes of each read-ahead buffer.
+const asyncBufferSize = 128 * 1024
+
+// asyncBufferPool recycles buffers between asyncReaders so that a
+// new reader does not have to allocate its read-ahead buffers afresh.
+var asyncBufferPool = sync.Pool{
+	New: func() interface{} { return newBuffer() },
+}
+
 // asyncReader will do async read-ahead from the input reader
 // and make the data available as an io.Reader.
 // This should be fully transparent, except that once an error
@@ -13,25 +23,21 @@ import (
 type asyncReader struct {
 	in      io.ReadCloser // Input reader
 	ready   chan *buffer  // Buffers ready to be handed to the reader
-	reuse   chan *buffer  // Buffers to reuse for input reading
+	token   chan struct{} // Tokens which allow a buffer to be taken
 	exit    chan struct{} // Closes when finished
 	buffers int           // Number of buffers
 	err     error         // If an error has occurred it is here
 	cur     *buffer       // Current buffer being served
 	exited  chan struct{} // Channel is closed been the async reader shuts down
-	closed  bool          // Has the parent reader been closed?
 }
 
 // newAsyncReader returns a reader that will asynchronously read from
-// the supplied Reader into a number of buffers each with a given size.
+// the supplied Reader into a number of buffers each of size asyncBufferSize.
 // It will start reading from the input at once, maybe even before this
 // function has returned.
 // The input can be read from the returned reader.
 // When done use Close to release the buffers and close the supplied input.
-func newAsyncReader(rd io.ReadCloser, buffers, size int) (io.ReadCloser, error) {
-	if size <= 0 {
-		return nil, errors.New("buffer size too small")
-	}
+func newAsyncReader(rd io.ReadCloser, buffers int) (io.ReadCloser, error) {
 	if buffers <= 0 {
 		return nil, errors.New("number of buffers too small")
 	}
@@ -39,35 +45,38 @@ func newAsyncReader(rd io.ReadCloser, buffers, size int) (io.ReadCloser, error)
 		return nil, errors.New("nil reader supplied")
 	}
 	a := &asyncReader{}
-	a.init(rd, buffers, size)
+	a.init(rd, buffers)
 	return a, nil
 }
 
-func (a *asyncReader) init(rd io.ReadCloser, buffers, size int) {
+func (a *asyncReader) init(rd io.ReadCloser, buffers int) {
 	a.in = rd
 	a.ready = make(chan *buffer, buffers)
-	a.reuse = make(chan *buffer, buffers)
+	a.token = make(chan struct{}, buffers)
 	a.exit = make(chan struct{}, 0)
 	a.exited = make(chan struct{}, 0)
 	a.buffers = buffers
 	a.cur = nil
 
-	// Create buffers
+	// Create tokens
 	for i := 0; i < buffers; i++ {
-		a.reuse <- newBuffer(size)
+		a.token <- struct{}{}
 	}
 
 	// Start async reader
 	go func() {
 		// Ensure that when we exit this is signalled.
 		defer close(a.exited)
+		// Deferred last so it runs first: ready is already closed by
+		// the time exited is, which lets Close range over ready.
+		defer close(a.ready)
 		for {
 			select {
-			case b := <-a.reuse:
+			case <-a.token:
+				b := a.getBuffer()
 				err := b.read(a.in)
 				a.ready <- b
 				if err != nil {
-					close(a.ready)
 					return
 				}
 			case <-a.exit:
@@ -77,11 +86,24 @@ func (a *asyncReader) init(rd io.ReadCloser, buffers, size int) {
 	}()
 }
 
+// putBuffer clears b and returns it to the pool
+func (a *asyncReader) putBuffer(b *buffer) {
+	b.clear()
+	asyncBufferPool.Put(b)
+}
+
+// getBuffer gets a buffer from the pool
+func (a *asyncReader) getBuffer() *buffer {
+	return asyncBufferPool.Get().(*buffer)
+}
+
 // Read will return the next available data.
 func (a *asyncReader) fill() (err error) {
 	if a.cur.isEmpty() {
 		if a.cur != nil {
-			a.reuse <- a.cur
+			// Recycle the buffer and let the reader goroutine take another
+			a.putBuffer(a.cur)
+			a.token <- struct{}{}
 			a.cur = nil
 		}
 		b, ok := <-a.ready
@@ -139,17 +161,25 @@ func (a *asyncReader) WriteTo(w io.Writer) (n int64, err error) {
 // Close will ensure that the underlying async reader is shut down.
 // It will also close the input supplied on newAsyncReader.
 func (a *asyncReader) Close() (err error) {
+	// Return if already closed
 	select {
-	case <-a.exited:
+	case <-a.exit:
+		return nil
 	default:
-		close(a.exit)
-		<-a.exited
 	}
-	if !a.closed {
-		a.closed = true
-		return a.in.Close()
+	// Close and wait for go routine
+	close(a.exit)
+	<-a.exited
+	// Return any outstanding buffers to the Pool, dropping our
+	// reference to cur so it can't be used after being recycled
+	if a.cur != nil {
+		a.putBuffer(a.cur)
+		a.cur = nil
 	}
-	return nil
+	for b := range a.ready {
+		a.putBuffer(b)
+	}
+	return a.in.Close()
 }
 
 // Internal buffer
@@ -159,11 +189,21 @@ type buffer struct {
 	buf    []byte
 	err    error
 	offset int
-	size   int
 }
 
-func newBuffer(size int) *buffer {
-	return &buffer{buf: make([]byte, size), err: nil, size: size}
+// newBuffer allocates an empty buffer of asyncBufferSize bytes
+func newBuffer() *buffer {
+	return &buffer{
+		buf: make([]byte, asyncBufferSize),
+		err: nil,
+	}
+}
+
+// clear returns the buffer to its full size and clears the members
+func (b *buffer) clear() {
+	b.buf = b.buf[:cap(b.buf)]
+	b.err = nil
+	b.offset = 0
 }
 
 // isEmpty returns true is offset is at end of
@@ -183,7 +223,7 @@ func (b *buffer) isEmpty() bool {
 // Any error encountered during the read is returned.
 func (b *buffer) read(rd io.Reader) error {
 	var n int
-	n, b.err = ReadFill(rd, b.buf[0:b.size])
+	n, b.err = ReadFill(rd, b.buf)
 	b.buf = b.buf[0:n]
 	b.offset = 0
 	return b.err
diff --git a/fs/buffer_test.go b/fs/buffer_test.go
index 466e7d6be..278f57678 100644
--- a/fs/buffer_test.go
+++ b/fs/buffer_test.go
@@ -15,7 +15,7 @@ import (
 
 func TestAsyncReader(t *testing.T) {
 	buf := ioutil.NopCloser(bytes.NewBufferString("Testbuffer"))
-	ar, err := newAsyncReader(buf, 4, 10000)
+	ar, err := newAsyncReader(buf, 4)
 	require.NoError(t, err)
 
 	var dst = make([]byte, 100)
@@ -40,7 +40,7 @@ func TestAsyncReader(t *testing.T) {
 
 	// Test Close without reading everything
 	buf = ioutil.NopCloser(bytes.NewBuffer(make([]byte, 50000)))
-	ar, err = newAsyncReader(buf, 4, 100)
+	ar, err = newAsyncReader(buf, 4)
 	require.NoError(t, err)
 	err = ar.Close()
 	require.NoError(t, err)
@@ -49,7 +49,7 @@ func TestAsyncReader(t *testing.T) {
 
 func TestAsyncWriteTo(t *testing.T) {
 	buf := ioutil.NopCloser(bytes.NewBufferString("Testbuffer"))
-	ar, err := newAsyncReader(buf, 4, 10000)
+	ar, err := newAsyncReader(buf, 4)
 	require.NoError(t, err)
 
 	var dst = &bytes.Buffer{}
@@ -68,20 +68,14 @@ func TestAsyncWriteTo(t *testing.T) {
 
 func TestAsyncReaderErrors(t *testing.T) {
 	// test nil reader
-	_, err := newAsyncReader(nil, 4, 10000)
+	_, err := newAsyncReader(nil, 4)
 	require.Error(t, err)
 
 	// invalid buffer number
 	buf := ioutil.NopCloser(bytes.NewBufferString("Testbuffer"))
-	_, err = newAsyncReader(buf, 0, 10000)
+	_, err = newAsyncReader(buf, 0)
 	require.Error(t, err)
-	_, err = newAsyncReader(buf, -1, 10000)
-	require.Error(t, err)
-
-	// invalid buffer size
-	_, err = newAsyncReader(buf, 4, 0)
-	require.Error(t, err)
-	_, err = newAsyncReader(buf, 4, -1)
+	_, err = newAsyncReader(buf, -1)
 	require.Error(t, err)
 }
 
@@ -161,7 +155,7 @@ func TestAsyncReaderSizes(t *testing.T) {
 						bufsize := bufsizes[k]
 						read := readmaker.fn(strings.NewReader(text))
 						buf := bufio.NewReaderSize(read, bufsize)
-						ar, _ := newAsyncReader(ioutil.NopCloser(buf), l, 100)
+						ar, _ := newAsyncReader(ioutil.NopCloser(buf), l)
 						s := bufreader.fn(ar)
 						// "timeout" expects the Reader to recover, asyncReader does not.
 						if s != text && readmaker.name != "timeout" {
@@ -200,7 +194,7 @@ func TestAsyncReaderWriteTo(t *testing.T) {
 						bufsize := bufsizes[k]
 						read := readmaker.fn(strings.NewReader(text))
 						buf := bufio.NewReaderSize(read, bufsize)
-						ar, _ := newAsyncReader(ioutil.NopCloser(buf), l, 100)
+						ar, _ := newAsyncReader(ioutil.NopCloser(buf), l)
 						dst := &bytes.Buffer{}
 						wt := ar.(io.WriterTo)
 						_, err := wt.WriteTo(dst)