archiver: Improve buffer pool

This commit is contained in:
Alexander Neumann 2018-04-29 15:34:41 +02:00
parent 39ac12f6ea
commit 78bd591c7c
4 changed files with 28 additions and 29 deletions

View file

@ -172,7 +172,7 @@ func (arch *Archiver) saveTree(ctx context.Context, t *restic.Tree) (restic.ID,
// adds a newline after each object) // adds a newline after each object)
buf = append(buf, '\n') buf = append(buf, '\n')
b := Buffer{Data: buf} b := &Buffer{Data: buf}
res := arch.blobSaver.Save(ctx, restic.TreeBlob, b) res := arch.blobSaver.Save(ctx, restic.TreeBlob, b)
if res.Err() != nil { if res.Err() != nil {
return restic.ID{}, s, res.Err() return restic.ID{}, s, res.Err()

View file

@ -45,7 +45,7 @@ func NewBlobSaver(ctx context.Context, repo Saver, workers uint) *BlobSaver {
// Save stores a blob in the repo. It checks the index and the known blobs // Save stores a blob in the repo. It checks the index and the known blobs
// before saving anything. The second return parameter is true if the blob was // before saving anything. The second return parameter is true if the blob was
// previously unknown. // previously unknown.
func (s *BlobSaver) Save(ctx context.Context, t restic.BlobType, buf Buffer) FutureBlob { func (s *BlobSaver) Save(ctx context.Context, t restic.BlobType, buf *Buffer) FutureBlob {
ch := make(chan saveBlobResponse, 1) ch := make(chan saveBlobResponse, 1)
s.ch <- saveBlobJob{BlobType: t, buf: buf, ch: ch} s.ch <- saveBlobJob{BlobType: t, buf: buf, ch: ch}
@ -91,7 +91,7 @@ func (s *FutureBlob) Length() int {
type saveBlobJob struct { type saveBlobJob struct {
restic.BlobType restic.BlobType
buf Buffer buf *Buffer
ch chan<- saveBlobResponse ch chan<- saveBlobResponse
} }

View file

@ -9,19 +9,19 @@ import (
// be called so the underlying slice is put back into the pool. // be called so the underlying slice is put back into the pool.
type Buffer struct { type Buffer struct {
Data []byte Data []byte
Put func([]byte) Put func(*Buffer)
} }
// Release puts the buffer back into the pool it came from. // Release puts the buffer back into the pool it came from.
func (b Buffer) Release() { func (b *Buffer) Release() {
if b.Put != nil { if b.Put != nil {
b.Put(b.Data) b.Put(b)
} }
} }
// BufferPool implements a limited set of reusable buffers. // BufferPool implements a limited set of reusable buffers.
type BufferPool struct { type BufferPool struct {
ch chan []byte ch chan *Buffer
chM sync.Mutex chM sync.Mutex
defaultSize int defaultSize int
clearOnce sync.Once clearOnce sync.Once
@ -33,7 +33,7 @@ type BufferPool struct {
// back. // back.
func NewBufferPool(ctx context.Context, max int, defaultSize int) *BufferPool { func NewBufferPool(ctx context.Context, max int, defaultSize int) *BufferPool {
b := &BufferPool{ b := &BufferPool{
ch: make(chan []byte, max), ch: make(chan *Buffer, max),
defaultSize: defaultSize, defaultSize: defaultSize,
} }
go func() { go func() {
@ -44,22 +44,29 @@ func NewBufferPool(ctx context.Context, max int, defaultSize int) *BufferPool {
} }
// Get returns a new buffer, either from the pool or newly allocated. // Get returns a new buffer, either from the pool or newly allocated.
func (pool *BufferPool) Get() Buffer { func (pool *BufferPool) Get() *Buffer {
b := Buffer{Put: pool.put}
pool.chM.Lock() pool.chM.Lock()
defer pool.chM.Unlock() defer pool.chM.Unlock()
select { select {
case buf := <-pool.ch: case buf := <-pool.ch:
b.Data = buf return buf
default: default:
b.Data = make([]byte, pool.defaultSize) }
b := &Buffer{
Put: pool.Put,
Data: make([]byte, pool.defaultSize),
} }
return b return b
} }
func (pool *BufferPool) put(b []byte) { // Put returns a buffer to the pool for reuse.
func (pool *BufferPool) Put(b *Buffer) {
if cap(b.Data) > pool.defaultSize {
return
}
pool.chM.Lock() pool.chM.Lock()
defer pool.chM.Unlock() defer pool.chM.Unlock()
select { select {
@ -68,14 +75,6 @@ func (pool *BufferPool) put(b []byte) {
} }
} }
// Put returns a buffer to the pool for reuse.
func (pool *BufferPool) Put(b Buffer) {
if cap(b.Data) > pool.defaultSize {
return
}
pool.put(b.Data)
}
// clear empties the buffer so that all items can be garbage collected. // clear empties the buffer so that all items can be garbage collected.
func (pool *BufferPool) clear() { func (pool *BufferPool) clear() {
pool.clearOnce.Do(func() { pool.clearOnce.Do(func() {

View file

@ -61,20 +61,19 @@ type FileSaver struct {
NodeFromFileInfo func(filename string, fi os.FileInfo) (*restic.Node, error) NodeFromFileInfo func(filename string, fi os.FileInfo) (*restic.Node, error)
} }
// NewFileSaver returns a new file saver. A worker pool with workers is // NewFileSaver returns a new file saver. A worker pool with fileWorkers is
// started, it is stopped when ctx is cancelled. // started, it is stopped when ctx is cancelled.
func NewFileSaver(ctx context.Context, fs fs.FS, blobSaver *BlobSaver, pol chunker.Pol, fileWorkers, blobWorkers uint) *FileSaver { func NewFileSaver(ctx context.Context, fs fs.FS, blobSaver *BlobSaver, pol chunker.Pol, fileWorkers, blobWorkers uint) *FileSaver {
ch := make(chan saveFileJob, fileWorkers) ch := make(chan saveFileJob)
poolSize := fileWorkers debug.Log("new file saver with %v file workers and %v blob workers", fileWorkers, blobWorkers)
if blobWorkers > fileWorkers {
poolSize = blobWorkers poolSize := fileWorkers + blobWorkers
}
s := &FileSaver{ s := &FileSaver{
fs: fs, fs: fs,
blobSaver: blobSaver, blobSaver: blobSaver,
saveFilePool: NewBufferPool(ctx, int(poolSize)*3/2, chunker.MaxSize/2), saveFilePool: NewBufferPool(ctx, int(poolSize), chunker.MaxSize),
pol: pol, pol: pol,
ch: ch, ch: ch,
@ -156,6 +155,7 @@ func (s *FileSaver) saveFile(ctx context.Context, chnker *chunker.Chunker, snPat
buf.Release() buf.Release()
break break
} }
buf.Data = chunk.Data buf.Data = chunk.Data
size += uint64(chunk.Length) size += uint64(chunk.Length)