lib/pool: a buffer recycling library which can be optionally be used with mmap

This commit is contained in:
Nick Craig-Wood 2019-01-30 15:11:01 +00:00
parent f0696dfe30
commit bed2971bf0
2 changed files with 210 additions and 0 deletions

118
lib/pool/pool.go Normal file
View file

@ -0,0 +1,118 @@
// Package pool implements a memory pool similar in concept to
// sync.Pool but with more determinism.
package pool
import (
"fmt"
"log"
"sync/atomic"
"time"
"github.com/ncw/rclone/lib/mmap"
)
// Pool of internal buffers
type Pool struct {
cache chan []byte
bufferSize int
timer *time.Timer
inUse int32
flushTime time.Duration
alloc func(int) ([]byte, error)
free func([]byte) error
}
// New makes a buffer pool
//
// flushTime is the interval the buffer pools is flushed
// bufferSize is the size of the allocations
// poolSize is the maximum number of free buffers in the pool
// useMmap should be set to use mmap allocations
func New(flushTime time.Duration, bufferSize, poolSize int, useMmap bool) *Pool {
bp := &Pool{
cache: make(chan []byte, poolSize),
flushTime: flushTime,
bufferSize: bufferSize,
}
if useMmap {
bp.alloc = mmap.Alloc
bp.free = mmap.Free
} else {
bp.alloc = func(size int) ([]byte, error) {
return make([]byte, size), nil
}
bp.free = func([]byte) error {
return nil
}
}
bp.timer = time.AfterFunc(flushTime, bp.Flush)
return bp
}
// Flush the entire buffer pool
func (bp *Pool) Flush() {
for {
select {
case b := <-bp.cache:
bp.freeBuffer(b)
default:
return
}
}
}
// InUse returns the approximate number of buffers in use which
// haven't been returned to the pool.
func (bp *Pool) InUse() int {
return int(atomic.LoadInt32(&bp.inUse))
}
// starts or resets the buffer flusher timer
func (bp *Pool) kickFlusher() {
bp.timer.Reset(bp.flushTime)
}
// Get a buffer from the pool or allocate one
func (bp *Pool) Get() []byte {
select {
case b := <-bp.cache:
return b
default:
}
mem, err := bp.alloc(bp.bufferSize)
if err != nil {
log.Printf("Failed to get memory for buffer, waiting for a freed one: %v", err)
return <-bp.cache
}
atomic.AddInt32(&bp.inUse, 1)
return mem
}
// freeBuffer returns mem to the os if required
func (bp *Pool) freeBuffer(mem []byte) {
err := bp.free(mem)
if err != nil {
log.Printf("Failed to free memory: %v", err)
} else {
atomic.AddInt32(&bp.inUse, -1)
}
}
// Put returns the buffer to the buffer cache or frees it
//
// Note that if you try to return a buffer of the wrong size to Put it
// will panic.
func (bp *Pool) Put(mem []byte) {
mem = mem[0:cap(mem)]
if len(mem) != bp.bufferSize {
panic(fmt.Sprintf("Returning buffer sized %d but expecting %d", len(mem), bp.bufferSize))
}
select {
case bp.cache <- mem:
bp.kickFlusher()
return
default:
}
bp.freeBuffer(mem)
mem = nil
}

92
lib/pool/pool_test.go Normal file
View file

@ -0,0 +1,92 @@
package pool
import (
"testing"
"time"
"github.com/stretchr/testify/assert"
)
func testGetPut(t *testing.T, useMmap bool) {
bp := New(60*time.Second, 4096, 2, useMmap)
assert.Equal(t, 0, bp.InUse())
b1 := bp.Get()
assert.Equal(t, 1, bp.InUse())
b2 := bp.Get()
assert.Equal(t, 2, bp.InUse())
b3 := bp.Get()
assert.Equal(t, 3, bp.InUse())
bp.Put(b1)
assert.Equal(t, 3, bp.InUse())
bp.Put(b2)
assert.Equal(t, 3, bp.InUse())
bp.Put(b3)
assert.Equal(t, 2, bp.InUse())
b1a := bp.Get()
assert.Equal(t, b1, b1a)
assert.Equal(t, 2, bp.InUse())
b2a := bp.Get()
assert.Equal(t, b1, b2a)
assert.Equal(t, 2, bp.InUse())
bp.Put(b1a)
bp.Put(b2a)
assert.Equal(t, 2, bp.InUse())
bp.Flush()
assert.Equal(t, 0, bp.InUse())
}
func testFlusher(t *testing.T, useMmap bool) {
bp := New(50*time.Millisecond, 4096, 2, useMmap)
b1 := bp.Get()
b2 := bp.Get()
b3 := bp.Get()
bp.Put(b1)
bp.Put(b2)
bp.Put(b3)
assert.Equal(t, 2, bp.InUse())
checkFlushHasHappened := func() {
var n int
for i := 0; i < 10; i++ {
time.Sleep(100 * time.Millisecond)
n = bp.InUse()
if n == 0 {
break
}
}
assert.Equal(t, 0, n)
}
checkFlushHasHappened()
b1 = bp.Get()
bp.Put(b1)
assert.Equal(t, 1, bp.InUse())
checkFlushHasHappened()
}
func TestPool(t *testing.T) {
for _, useMmap := range []bool{false, true} {
name := "make"
if useMmap {
name = "mmap"
}
t.Run(name, func(t *testing.T) {
t.Run("GetPut", func(t *testing.T) { testGetPut(t, useMmap) })
t.Run("Flusher", func(t *testing.T) { testFlusher(t, useMmap) })
})
}
}