lib/pool: fix memory leak by freeing buffers on flush

This commit is contained in:
Nick Craig-Wood 2019-02-06 17:20:54 +00:00
parent fd370fcad2
commit cfba337ef0
2 changed files with 28 additions and 2 deletions

View file

@ -26,6 +26,7 @@ type Pool struct {
poolSize int poolSize int
timer *time.Timer timer *time.Timer
inUse int inUse int
alloced int
flushTime time.Duration flushTime time.Duration
flushPending bool flushPending bool
alloc func(int) ([]byte, error) alloc func(int) ([]byte, error)
@ -82,7 +83,7 @@ func (bp *Pool) put(buf []byte) {
// Call with mu held // Call with mu held
func (bp *Pool) flush(n int) { func (bp *Pool) flush(n int) {
for i := 0; i < n; i++ { for i := 0; i < n; i++ {
_ = bp.get() bp.freeBuffer(bp.get())
} }
bp.minFill = len(bp.cache) bp.minFill = len(bp.cache)
} }
@ -121,6 +122,13 @@ func (bp *Pool) InPool() int {
return len(bp.cache) return len(bp.cache)
} }
// Alloced returns the number of buffers allocated and not yet freed
func (bp *Pool) Alloced() int {
bp.mu.Lock()
defer bp.mu.Unlock()
return bp.alloced
}
// starts or resets the buffer flusher timer - call with mu held // starts or resets the buffer flusher timer - call with mu held
func (bp *Pool) kickFlusher() { func (bp *Pool) kickFlusher() {
if bp.flushPending { if bp.flushPending {
@ -150,6 +158,7 @@ func (bp *Pool) Get() []byte {
var err error var err error
buf, err = bp.alloc(bp.bufferSize) buf, err = bp.alloc(bp.bufferSize)
if err == nil { if err == nil {
bp.alloced++
break break
} }
log.Printf("Failed to get memory for buffer, waiting for %v: %v", waitTime, err) log.Printf("Failed to get memory for buffer, waiting for %v: %v", waitTime, err)
@ -165,12 +174,13 @@ func (bp *Pool) Get() []byte {
return buf return buf
} }
// freeBuffer returns mem to the os if required // freeBuffer returns mem to the os if required - call with lock held
func (bp *Pool) freeBuffer(mem []byte) { func (bp *Pool) freeBuffer(mem []byte) {
err := bp.free(mem) err := bp.free(mem)
if err != nil { if err != nil {
log.Printf("Failed to free memory: %v", err) log.Printf("Failed to free memory: %v", err)
} }
bp.alloced--
} }
// Put returns the buffer to the buffer cache or frees it // Put returns the buffer to the buffer cache or frees it

View file

@ -37,26 +37,32 @@ func testGetPut(t *testing.T, useMmap bool, unreliable bool) {
b1 := bp.Get() b1 := bp.Get()
assert.Equal(t, 1, bp.InUse()) assert.Equal(t, 1, bp.InUse())
assert.Equal(t, 0, bp.InPool()) assert.Equal(t, 0, bp.InPool())
assert.Equal(t, 1, bp.Alloced())
b2 := bp.Get() b2 := bp.Get()
assert.Equal(t, 2, bp.InUse()) assert.Equal(t, 2, bp.InUse())
assert.Equal(t, 0, bp.InPool()) assert.Equal(t, 0, bp.InPool())
assert.Equal(t, 2, bp.Alloced())
b3 := bp.Get() b3 := bp.Get()
assert.Equal(t, 3, bp.InUse()) assert.Equal(t, 3, bp.InUse())
assert.Equal(t, 0, bp.InPool()) assert.Equal(t, 0, bp.InPool())
assert.Equal(t, 3, bp.Alloced())
bp.Put(b1) bp.Put(b1)
assert.Equal(t, 2, bp.InUse()) assert.Equal(t, 2, bp.InUse())
assert.Equal(t, 1, bp.InPool()) assert.Equal(t, 1, bp.InPool())
assert.Equal(t, 3, bp.Alloced())
bp.Put(b2) bp.Put(b2)
assert.Equal(t, 1, bp.InUse()) assert.Equal(t, 1, bp.InUse())
assert.Equal(t, 2, bp.InPool()) assert.Equal(t, 2, bp.InPool())
assert.Equal(t, 3, bp.Alloced())
bp.Put(b3) bp.Put(b3)
assert.Equal(t, 0, bp.InUse()) assert.Equal(t, 0, bp.InUse())
assert.Equal(t, 2, bp.InPool()) assert.Equal(t, 2, bp.InPool())
assert.Equal(t, 2, bp.Alloced())
addr := func(b []byte) string { addr := func(b []byte) string {
return fmt.Sprintf("%p", &b[0]) return fmt.Sprintf("%p", &b[0])
@ -65,16 +71,19 @@ func testGetPut(t *testing.T, useMmap bool, unreliable bool) {
assert.Equal(t, addr(b2), addr(b1a)) assert.Equal(t, addr(b2), addr(b1a))
assert.Equal(t, 1, bp.InUse()) assert.Equal(t, 1, bp.InUse())
assert.Equal(t, 1, bp.InPool()) assert.Equal(t, 1, bp.InPool())
assert.Equal(t, 2, bp.Alloced())
b2a := bp.Get() b2a := bp.Get()
assert.Equal(t, addr(b1), addr(b2a)) assert.Equal(t, addr(b1), addr(b2a))
assert.Equal(t, 2, bp.InUse()) assert.Equal(t, 2, bp.InUse())
assert.Equal(t, 0, bp.InPool()) assert.Equal(t, 0, bp.InPool())
assert.Equal(t, 2, bp.Alloced())
bp.Put(b1a) bp.Put(b1a)
bp.Put(b2a) bp.Put(b2a)
assert.Equal(t, 0, bp.InUse()) assert.Equal(t, 0, bp.InUse())
assert.Equal(t, 2, bp.InPool()) assert.Equal(t, 2, bp.InPool())
assert.Equal(t, 2, bp.Alloced())
assert.Panics(t, func() { assert.Panics(t, func() {
bp.Put(make([]byte, 1)) bp.Put(make([]byte, 1))
@ -83,6 +92,7 @@ func testGetPut(t *testing.T, useMmap bool, unreliable bool) {
bp.Flush() bp.Flush()
assert.Equal(t, 0, bp.InUse()) assert.Equal(t, 0, bp.InUse())
assert.Equal(t, 0, bp.InPool()) assert.Equal(t, 0, bp.InPool())
assert.Equal(t, 0, bp.Alloced())
} }
func testFlusher(t *testing.T, useMmap bool, unreliable bool) { func testFlusher(t *testing.T, useMmap bool, unreliable bool) {
@ -99,6 +109,7 @@ func testFlusher(t *testing.T, useMmap bool, unreliable bool) {
bp.Put(b3) bp.Put(b3)
assert.Equal(t, 0, bp.InUse()) assert.Equal(t, 0, bp.InUse())
assert.Equal(t, 2, bp.InPool()) assert.Equal(t, 2, bp.InPool())
assert.Equal(t, 2, bp.Alloced())
bp.mu.Lock() bp.mu.Lock()
assert.Equal(t, 0, bp.minFill) assert.Equal(t, 0, bp.minFill)
assert.Equal(t, true, bp.flushPending) assert.Equal(t, true, bp.flushPending)
@ -119,6 +130,7 @@ func testFlusher(t *testing.T, useMmap bool, unreliable bool) {
checkFlushHasHappened(0) checkFlushHasHappened(0)
assert.Equal(t, 0, bp.InUse()) assert.Equal(t, 0, bp.InUse())
assert.Equal(t, 0, bp.InPool()) assert.Equal(t, 0, bp.InPool())
assert.Equal(t, 0, bp.Alloced())
bp.mu.Lock() bp.mu.Lock()
assert.Equal(t, 0, bp.minFill) assert.Equal(t, 0, bp.minFill)
assert.Equal(t, false, bp.flushPending) assert.Equal(t, false, bp.flushPending)
@ -142,6 +154,7 @@ func testFlusher(t *testing.T, useMmap bool, unreliable bool) {
assert.Equal(t, 0, bp.InUse()) assert.Equal(t, 0, bp.InUse())
assert.Equal(t, 2, bp.InPool()) assert.Equal(t, 2, bp.InPool())
assert.Equal(t, 2, bp.Alloced())
bp.mu.Lock() bp.mu.Lock()
assert.Equal(t, 2, bp.minFill) assert.Equal(t, 2, bp.minFill)
assert.Equal(t, true, bp.flushPending) assert.Equal(t, true, bp.flushPending)
@ -151,6 +164,7 @@ func testFlusher(t *testing.T, useMmap bool, unreliable bool) {
assert.Equal(t, 0, bp.InUse()) assert.Equal(t, 0, bp.InUse())
assert.Equal(t, 2, bp.InPool()) assert.Equal(t, 2, bp.InPool())
assert.Equal(t, 2, bp.Alloced())
bp.mu.Lock() bp.mu.Lock()
assert.Equal(t, 1, bp.minFill) assert.Equal(t, 1, bp.minFill)
assert.Equal(t, true, bp.flushPending) assert.Equal(t, true, bp.flushPending)
@ -160,6 +174,7 @@ func testFlusher(t *testing.T, useMmap bool, unreliable bool) {
assert.Equal(t, 0, bp.InUse()) assert.Equal(t, 0, bp.InUse())
assert.Equal(t, 1, bp.InPool()) assert.Equal(t, 1, bp.InPool())
assert.Equal(t, 1, bp.Alloced())
bp.mu.Lock() bp.mu.Lock()
assert.Equal(t, 1, bp.minFill) assert.Equal(t, 1, bp.minFill)
assert.Equal(t, true, bp.flushPending) assert.Equal(t, true, bp.flushPending)
@ -169,6 +184,7 @@ func testFlusher(t *testing.T, useMmap bool, unreliable bool) {
assert.Equal(t, 0, bp.InUse()) assert.Equal(t, 0, bp.InUse())
assert.Equal(t, 0, bp.InPool()) assert.Equal(t, 0, bp.InPool())
assert.Equal(t, 0, bp.Alloced())
bp.mu.Lock() bp.mu.Lock()
assert.Equal(t, 0, bp.minFill) assert.Equal(t, 0, bp.minFill)
assert.Equal(t, false, bp.flushPending) assert.Equal(t, false, bp.flushPending)