diff --git a/fs/buffer.go b/fs/buffer.go index 26cd1770c..9ba3fb3cf 100644 --- a/fs/buffer.go +++ b/fs/buffer.go @@ -7,7 +7,10 @@ import ( "github.com/pkg/errors" ) -const asyncBufferSize = 128 * 1024 +const ( + asyncBufferSize = 1024 * 1024 + softStartInitial = 4 * 1024 +) var asyncBufferPool = sync.Pool{ New: func() interface{} { return newBuffer() }, @@ -26,6 +29,7 @@ type asyncReader struct { err error // If an error has occurred it is here cur *buffer // Current buffer being served exited chan struct{} // Channel is closed been the async reader shuts down + size int // size of buffer to use } // newAsyncReader returns a reader that will asynchronously read from @@ -54,6 +58,7 @@ func (a *asyncReader) init(rd io.ReadCloser, buffers int) { a.exited = make(chan struct{}, 0) a.buffers = buffers a.cur = nil + a.size = softStartInitial // Create tokens for i := 0; i < buffers; i++ { @@ -69,6 +74,10 @@ func (a *asyncReader) init(rd io.ReadCloser, buffers int) { select { case <-a.token: b := a.getBuffer() + if a.size < asyncBufferSize { + b.buf = b.buf[:a.size] + a.size <<= 1 + } err := b.read(a.in) a.ready <- b if err != nil { @@ -89,7 +98,8 @@ func (a *asyncReader) putBuffer(b *buffer) { // get a buffer from the pool func (a *asyncReader) getBuffer() *buffer { - return asyncBufferPool.Get().(*buffer) + b := asyncBufferPool.Get().(*buffer) + return b } // Read will return the next available data.