forked from TrueCloudLab/rclone
fs/asyncreader: optionally user mmap for memory allocation with --use-mmap
#2200
This replaces the `sync.Pool` allocator with lib/pool. This implements a pool of buffers of up to 64MB which can be re-used but is flushed every 5 seconds. If `--use-mmap` is set then rclone will use mmap for memory allocations which is much better at returning memory to the OS.
This commit is contained in:
parent
bed2971bf0
commit
eb91356e28
4 changed files with 40 additions and 24 deletions
|
@ -5,21 +5,22 @@ package asyncreader
|
|||
import (
|
||||
"io"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/ncw/rclone/fs"
|
||||
"github.com/ncw/rclone/lib/pool"
|
||||
"github.com/ncw/rclone/lib/readers"
|
||||
"github.com/pkg/errors"
|
||||
)
|
||||
|
||||
const (
|
||||
// BufferSize is the default size of the async buffer
|
||||
BufferSize = 1024 * 1024
|
||||
softStartInitial = 4 * 1024
|
||||
BufferSize = 1024 * 1024
|
||||
softStartInitial = 4 * 1024
|
||||
bufferCacheSize = 64 // max number of buffers to keep in cache
|
||||
bufferCacheFlushTime = 5 * time.Second // flush the cached buffers after this long
|
||||
)
|
||||
|
||||
var asyncBufferPool = sync.Pool{
|
||||
New: func() interface{} { return newBuffer() },
|
||||
}
|
||||
|
||||
var errorStreamAbandoned = errors.New("stream abandoned")
|
||||
|
||||
// AsyncReader will do async read-ahead from the input reader
|
||||
|
@ -98,16 +99,25 @@ func (a *AsyncReader) init(rd io.ReadCloser, buffers int) {
|
|||
}()
|
||||
}
|
||||
|
||||
// bufferPool is a global pool of buffers
|
||||
var bufferPool *pool.Pool
|
||||
var bufferPoolOnce sync.Once
|
||||
|
||||
// return the buffer to the pool (clearing it)
|
||||
func (a *AsyncReader) putBuffer(b *buffer) {
|
||||
b.clear()
|
||||
asyncBufferPool.Put(b)
|
||||
bufferPool.Put(b.buf)
|
||||
b.buf = nil
|
||||
}
|
||||
|
||||
// get a buffer from the pool
|
||||
func (a *AsyncReader) getBuffer() *buffer {
|
||||
b := asyncBufferPool.Get().(*buffer)
|
||||
return b
|
||||
bufferPoolOnce.Do(func() {
|
||||
// Initialise the buffer pool when used
|
||||
bufferPool = pool.New(bufferCacheFlushTime, BufferSize, bufferCacheSize, fs.Config.UseMmap)
|
||||
})
|
||||
return &buffer{
|
||||
buf: bufferPool.Get(),
|
||||
}
|
||||
}
|
||||
|
||||
// Read will return the next available data.
|
||||
|
@ -295,20 +305,6 @@ type buffer struct {
|
|||
offset int
|
||||
}
|
||||
|
||||
func newBuffer() *buffer {
|
||||
return &buffer{
|
||||
buf: make([]byte, BufferSize),
|
||||
err: nil,
|
||||
}
|
||||
}
|
||||
|
||||
// clear returns the buffer to its full size and clears the members
|
||||
func (b *buffer) clear() {
|
||||
b.buf = b.buf[:cap(b.buf)]
|
||||
b.err = nil
|
||||
b.offset = 0
|
||||
}
|
||||
|
||||
// isEmpty returns true is offset is at end of
|
||||
// buffer, or
|
||||
func (b *buffer) isEmpty() bool {
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue