lib/pool: only flush buffers if they are unused between flush intervals

This commit is contained in:
Nick Craig-Wood 2019-02-03 19:07:50 +00:00
parent 08c4854e00
commit da90069462
2 changed files with 260 additions and 70 deletions

View file

@ -5,21 +5,31 @@ package pool
import ( import (
"fmt" "fmt"
"log" "log"
"sync/atomic" "sync"
"time" "time"
"github.com/ncw/rclone/lib/mmap" "github.com/ncw/rclone/lib/mmap"
) )
// Pool of internal buffers // Pool of internal buffers
//
// We hold buffers in cache. Every time we Get or Put we update
// minFill which is the minimum len(cache) seen.
//
// Every flushTime we remove minFill buffers from the cache as they
// were not used in the previous flushTime interval.
type Pool struct { type Pool struct {
cache chan []byte mu sync.Mutex
bufferSize int cache [][]byte
timer *time.Timer minFill int // the minimum fill of the cache
inUse int32 bufferSize int
flushTime time.Duration poolSize int
alloc func(int) ([]byte, error) timer *time.Timer
free func([]byte) error inUse int
flushTime time.Duration
flushPending bool
alloc func(int) ([]byte, error)
free func([]byte) error
} }
// New makes a buffer pool // New makes a buffer pool
@ -30,7 +40,8 @@ type Pool struct {
// useMmap should be set to use mmap allocations // useMmap should be set to use mmap allocations
func New(flushTime time.Duration, bufferSize, poolSize int, useMmap bool) *Pool { func New(flushTime time.Duration, bufferSize, poolSize int, useMmap bool) *Pool {
bp := &Pool{ bp := &Pool{
cache: make(chan []byte, poolSize), cache: make([][]byte, 0, poolSize),
poolSize: poolSize,
flushTime: flushTime, flushTime: flushTime,
bufferSize: bufferSize, bufferSize: bufferSize,
} }
@ -45,47 +56,113 @@ func New(flushTime time.Duration, bufferSize, poolSize int, useMmap bool) *Pool
return nil return nil
} }
} }
bp.timer = time.AfterFunc(flushTime, bp.Flush) bp.timer = time.AfterFunc(flushTime, bp.flushAged)
return bp return bp
} }
// get removes the buffer at the end of bp.cache and returns it.
//
// The cache must not be empty when this is called.
//
// Call with mu held
func (bp *Pool) get() []byte {
	tail := len(bp.cache) - 1
	popped := bp.cache[tail]
	// nil the vacated slot so the backing array no longer references
	// the buffer we are handing out
	bp.cache[tail] = nil
	bp.cache = bp.cache[:tail]
	return popped
}
// put stores buf as the newest entry at the end of bp.cache.
//
// Call with mu held
func (bp *Pool) put(buf []byte) {
	grown := append(bp.cache, buf)
	bp.cache = grown
}
// flush removes n entries from the end of the buffer pool, freeing
// each one, and then resets minFill to the new cache length.
//
// Every removed buffer is passed to bp.freeBuffer so that it goes
// through bp.free. Just dropping the reference is not enough for
// buffers which did not come from the Go heap (presumably the mmap
// allocations - confirm against New) as those would be leaked.
//
// n must not be greater than len(bp.cache).
//
// Call with mu held
func (bp *Pool) flush(n int) {
	for i := 0; i < n; i++ {
		bp.freeBuffer(bp.get())
	}
	bp.minFill = len(bp.cache)
}
// Flush the entire buffer pool // Flush the entire buffer pool
func (bp *Pool) Flush() { func (bp *Pool) Flush() {
for { bp.mu.Lock()
select { bp.flush(len(bp.cache))
case b := <-bp.cache: bp.mu.Unlock()
bp.freeBuffer(b) }
default:
return // Remove bp.minFill buffers
} func (bp *Pool) flushAged() {
bp.mu.Lock()
bp.flushPending = false
bp.flush(bp.minFill)
// If there are still items in the cache, schedule another flush
if len(bp.cache) != 0 {
bp.kickFlusher()
} }
bp.mu.Unlock()
} }
// InUse returns the approximate number of buffers in use which // InUse returns the number of buffers in use which haven't been
// haven't been returned to the pool. // returned to the pool
func (bp *Pool) InUse() int { func (bp *Pool) InUse() int {
return int(atomic.LoadInt32(&bp.inUse)) bp.mu.Lock()
defer bp.mu.Unlock()
return bp.inUse
} }
// starts or resets the buffer flusher timer // InPool returns the number of buffers in the pool
func (bp *Pool) InPool() int {
	// read the cache length under the lock, then release it before
	// returning the snapshot
	bp.mu.Lock()
	cached := len(bp.cache)
	bp.mu.Unlock()
	return cached
}
// starts or resets the buffer flusher timer - call with mu held
func (bp *Pool) kickFlusher() { func (bp *Pool) kickFlusher() {
if bp.flushPending {
return
}
bp.flushPending = true
bp.timer.Reset(bp.flushTime) bp.timer.Reset(bp.flushTime)
} }
// updateMinFill lowers minFill to the current cache length if the
// cache has shrunk below it. It never raises minFill.
//
// Call with mu held
func (bp *Pool) updateMinFill() {
	if fill := len(bp.cache); fill < bp.minFill {
		bp.minFill = fill
	}
}
// Get a buffer from the pool or allocate one // Get a buffer from the pool or allocate one
func (bp *Pool) Get() []byte { func (bp *Pool) Get() []byte {
select { bp.mu.Lock()
case b := <-bp.cache: var buf []byte
return b waitTime := time.Millisecond
default: for {
if len(bp.cache) > 0 {
buf = bp.get()
break
} else {
var err error
buf, err = bp.alloc(bp.bufferSize)
if err == nil {
break
}
log.Printf("Failed to get memory for buffer, waiting for %v: %v", waitTime, err)
bp.mu.Unlock()
time.Sleep(waitTime)
bp.mu.Lock()
waitTime *= 2
}
} }
mem, err := bp.alloc(bp.bufferSize) bp.inUse++
if err != nil { bp.updateMinFill()
log.Printf("Failed to get memory for buffer, waiting for a freed one: %v", err) bp.mu.Unlock()
return <-bp.cache return buf
}
atomic.AddInt32(&bp.inUse, 1)
return mem
} }
// freeBuffer returns mem to the os if required // freeBuffer returns mem to the os if required
@ -93,8 +170,6 @@ func (bp *Pool) freeBuffer(mem []byte) {
err := bp.free(mem) err := bp.free(mem)
if err != nil { if err != nil {
log.Printf("Failed to free memory: %v", err) log.Printf("Failed to free memory: %v", err)
} else {
atomic.AddInt32(&bp.inUse, -1)
} }
} }
@ -102,17 +177,19 @@ func (bp *Pool) freeBuffer(mem []byte) {
// //
// Note that if you try to return a buffer of the wrong size to Put it // Note that if you try to return a buffer of the wrong size to Put it
// will panic. // will panic.
func (bp *Pool) Put(mem []byte) { func (bp *Pool) Put(buf []byte) {
mem = mem[0:cap(mem)] bp.mu.Lock()
if len(mem) != bp.bufferSize { defer bp.mu.Unlock()
panic(fmt.Sprintf("Returning buffer sized %d but expecting %d", len(mem), bp.bufferSize)) buf = buf[0:cap(buf)]
if len(buf) != bp.bufferSize {
panic(fmt.Sprintf("Returning buffer sized %d but expecting %d", len(buf), bp.bufferSize))
} }
select { if len(bp.cache) < bp.poolSize {
case bp.cache <- mem: bp.put(buf)
bp.kickFlusher() } else {
return bp.freeBuffer(buf)
default:
} }
bp.freeBuffer(mem) bp.inUse--
mem = nil bp.updateMinFill()
bp.kickFlusher()
} }

View file

@ -1,53 +1,95 @@
package pool package pool
import ( import (
"errors"
"fmt"
"math/rand"
"testing" "testing"
"time" "time"
"github.com/stretchr/testify/assert" "github.com/stretchr/testify/assert"
) )
func testGetPut(t *testing.T, useMmap bool) { // makes the allocations be unreliable
// makeUnreliable replaces bp.alloc and bp.free with versions which
// fail whenever rand.Intn(3) returns a non zero value, so the error
// handling paths in the pool get exercised.
func makeUnreliable(bp *Pool) {
	bp.alloc = func(size int) ([]byte, error) {
		if rand.Intn(3) == 0 {
			return make([]byte, size), nil
		}
		return nil, errors.New("failed to allocate memory")
	}
	bp.free = func(b []byte) error {
		if rand.Intn(3) == 0 {
			return nil
		}
		return errors.New("failed to free memory")
	}
}
func testGetPut(t *testing.T, useMmap bool, unreliable bool) {
bp := New(60*time.Second, 4096, 2, useMmap) bp := New(60*time.Second, 4096, 2, useMmap)
if unreliable {
makeUnreliable(bp)
}
assert.Equal(t, 0, bp.InUse()) assert.Equal(t, 0, bp.InUse())
b1 := bp.Get() b1 := bp.Get()
assert.Equal(t, 1, bp.InUse()) assert.Equal(t, 1, bp.InUse())
assert.Equal(t, 0, bp.InPool())
b2 := bp.Get() b2 := bp.Get()
assert.Equal(t, 2, bp.InUse()) assert.Equal(t, 2, bp.InUse())
assert.Equal(t, 0, bp.InPool())
b3 := bp.Get() b3 := bp.Get()
assert.Equal(t, 3, bp.InUse()) assert.Equal(t, 3, bp.InUse())
assert.Equal(t, 0, bp.InPool())
bp.Put(b1) bp.Put(b1)
assert.Equal(t, 3, bp.InUse()) assert.Equal(t, 2, bp.InUse())
assert.Equal(t, 1, bp.InPool())
bp.Put(b2) bp.Put(b2)
assert.Equal(t, 3, bp.InUse()) assert.Equal(t, 1, bp.InUse())
assert.Equal(t, 2, bp.InPool())
bp.Put(b3) bp.Put(b3)
assert.Equal(t, 2, bp.InUse()) assert.Equal(t, 0, bp.InUse())
assert.Equal(t, 2, bp.InPool())
addr := func(b []byte) string {
return fmt.Sprintf("%p", &b[0])
}
b1a := bp.Get() b1a := bp.Get()
assert.Equal(t, b1, b1a) assert.Equal(t, addr(b2), addr(b1a))
assert.Equal(t, 2, bp.InUse()) assert.Equal(t, 1, bp.InUse())
assert.Equal(t, 1, bp.InPool())
b2a := bp.Get() b2a := bp.Get()
assert.Equal(t, b1, b2a) assert.Equal(t, addr(b1), addr(b2a))
assert.Equal(t, 2, bp.InUse()) assert.Equal(t, 2, bp.InUse())
assert.Equal(t, 0, bp.InPool())
bp.Put(b1a) bp.Put(b1a)
bp.Put(b2a) bp.Put(b2a)
assert.Equal(t, 2, bp.InUse()) assert.Equal(t, 0, bp.InUse())
assert.Equal(t, 2, bp.InPool())
assert.Panics(t, func() {
bp.Put(make([]byte, 1))
})
bp.Flush() bp.Flush()
assert.Equal(t, 0, bp.InUse()) assert.Equal(t, 0, bp.InUse())
assert.Equal(t, 0, bp.InPool())
} }
func testFlusher(t *testing.T, useMmap bool) { func testFlusher(t *testing.T, useMmap bool, unreliable bool) {
bp := New(50*time.Millisecond, 4096, 2, useMmap) bp := New(50*time.Millisecond, 4096, 2, useMmap)
if unreliable {
makeUnreliable(bp)
}
b1 := bp.Get() b1 := bp.Get()
b2 := bp.Get() b2 := bp.Get()
@ -55,38 +97,109 @@ func testFlusher(t *testing.T, useMmap bool) {
bp.Put(b1) bp.Put(b1)
bp.Put(b2) bp.Put(b2)
bp.Put(b3) bp.Put(b3)
assert.Equal(t, 2, bp.InUse()) assert.Equal(t, 0, bp.InUse())
assert.Equal(t, 2, bp.InPool())
bp.mu.Lock()
assert.Equal(t, 0, bp.minFill)
assert.Equal(t, true, bp.flushPending)
bp.mu.Unlock()
checkFlushHasHappened := func() { checkFlushHasHappened := func(desired int) {
var n int var n int
for i := 0; i < 10; i++ { for i := 0; i < 10; i++ {
time.Sleep(100 * time.Millisecond) time.Sleep(100 * time.Millisecond)
n = bp.InUse() n = bp.InPool()
if n == 0 { if n <= desired {
break break
} }
} }
assert.Equal(t, 0, n) assert.Equal(t, desired, n)
} }
checkFlushHasHappened() checkFlushHasHappened(0)
assert.Equal(t, 0, bp.InUse())
assert.Equal(t, 0, bp.InPool())
bp.mu.Lock()
assert.Equal(t, 0, bp.minFill)
assert.Equal(t, false, bp.flushPending)
bp.mu.Unlock()
// Now do manual aging to check it is working properly
bp = New(100*time.Second, 4096, 2, useMmap)
// Check the new one doesn't get flushed
b1 = bp.Get() b1 = bp.Get()
b2 = bp.Get()
bp.Put(b1) bp.Put(b1)
assert.Equal(t, 1, bp.InUse()) bp.Put(b2)
checkFlushHasHappened() bp.mu.Lock()
assert.Equal(t, 0, bp.minFill)
assert.Equal(t, true, bp.flushPending)
bp.mu.Unlock()
bp.flushAged()
assert.Equal(t, 0, bp.InUse())
assert.Equal(t, 2, bp.InPool())
bp.mu.Lock()
assert.Equal(t, 2, bp.minFill)
assert.Equal(t, true, bp.flushPending)
bp.mu.Unlock()
bp.Put(bp.Get())
assert.Equal(t, 0, bp.InUse())
assert.Equal(t, 2, bp.InPool())
bp.mu.Lock()
assert.Equal(t, 1, bp.minFill)
assert.Equal(t, true, bp.flushPending)
bp.mu.Unlock()
bp.flushAged()
assert.Equal(t, 0, bp.InUse())
assert.Equal(t, 1, bp.InPool())
bp.mu.Lock()
assert.Equal(t, 1, bp.minFill)
assert.Equal(t, true, bp.flushPending)
bp.mu.Unlock()
bp.flushAged()
assert.Equal(t, 0, bp.InUse())
assert.Equal(t, 0, bp.InPool())
bp.mu.Lock()
assert.Equal(t, 0, bp.minFill)
assert.Equal(t, false, bp.flushPending)
bp.mu.Unlock()
} }
func TestPool(t *testing.T) { func TestPool(t *testing.T) {
for _, useMmap := range []bool{false, true} { for _, test := range []struct {
name := "make" name string
if useMmap { useMmap bool
name = "mmap" unreliable bool
} }{
t.Run(name, func(t *testing.T) { {
t.Run("GetPut", func(t *testing.T) { testGetPut(t, useMmap) }) name: "make",
t.Run("Flusher", func(t *testing.T) { testFlusher(t, useMmap) }) useMmap: false,
unreliable: false,
},
{
name: "mmap",
useMmap: true,
unreliable: false,
},
{
name: "canFail",
useMmap: false,
unreliable: true,
},
} {
t.Run(test.name, func(t *testing.T) {
t.Run("GetPut", func(t *testing.T) { testGetPut(t, test.useMmap, test.unreliable) })
t.Run("Flusher", func(t *testing.T) { testFlusher(t, test.useMmap, test.unreliable) })
}) })
} }
} }