Merge branch 'remove_chunker_reset_and_pool'

This commit is contained in:
Alexander Neumann 2015-05-05 00:56:56 +02:00
commit ae89ac183d
8 changed files with 52 additions and 233 deletions

View file

@ -1,6 +1,7 @@
package restic package restic
import ( import (
"crypto/sha256"
"encoding/json" "encoding/json"
"fmt" "fmt"
"io" "io"
@ -183,10 +184,8 @@ func (arch *Archiver) SaveFile(p *Progress, node *Node) error {
return err return err
} }
chnker := GetChunker("archiver.SaveFile") chnker := chunker.New(file, arch.s.Config.ChunkerPolynomial, sha256.New())
chnker.Reset(file, arch.s.Config.ChunkerPolynomial)
resultChannels := [](<-chan saveResult){} resultChannels := [](<-chan saveResult){}
defer FreeChunker("archiver.SaveFile", chnker)
for { for {
chunk, err := chnker.Next() chunk, err := chnker.Next()

View file

@ -2,6 +2,7 @@ package restic_test
import ( import (
"bytes" "bytes"
"crypto/sha256"
"flag" "flag"
"io" "io"
"testing" "testing"
@ -17,17 +18,14 @@ import (
var benchArchiveDirectory = flag.String("test.benchdir", ".", "benchmark archiving a real directory (default: .)") var benchArchiveDirectory = flag.String("test.benchdir", ".", "benchmark archiving a real directory (default: .)")
var testPol = chunker.Pol(0x3DA3358B4DC173) var testPol = chunker.Pol(0x3DA3358B4DC173)
const bufSize = chunker.MiB
type Rdr interface { type Rdr interface {
io.ReadSeeker io.ReadSeeker
io.ReaderAt io.ReaderAt
} }
func benchmarkChunkEncrypt(b testing.TB, buf, buf2 []byte, rd Rdr, key *crypto.Key) { func benchmarkChunkEncrypt(b testing.TB, buf, buf2 []byte, rd Rdr, key *crypto.Key) {
ch := restic.GetChunker("BenchmarkChunkEncrypt")
rd.Seek(0, 0) rd.Seek(0, 0)
ch.Reset(rd, testPol) ch := chunker.New(rd, testPol, sha256.New())
for { for {
chunk, err := ch.Next() chunk, err := ch.Next()
@ -47,8 +45,6 @@ func benchmarkChunkEncrypt(b testing.TB, buf, buf2 []byte, rd Rdr, key *crypto.K
_, err = crypto.Encrypt(key, buf2, buf) _, err = crypto.Encrypt(key, buf2, buf)
OK(b, err) OK(b, err)
} }
restic.FreeChunker("BenchmarkChunkEncrypt", ch)
} }
func BenchmarkChunkEncrypt(b *testing.B) { func BenchmarkChunkEncrypt(b *testing.B) {
@ -58,8 +54,8 @@ func BenchmarkChunkEncrypt(b *testing.B) {
s := SetupBackend(b) s := SetupBackend(b)
defer TeardownBackend(b, s) defer TeardownBackend(b, s)
buf := restic.GetChunkBuf("BenchmarkChunkEncrypt") buf := make([]byte, chunker.MaxSize)
buf2 := restic.GetChunkBuf("BenchmarkChunkEncrypt") buf2 := make([]byte, chunker.MaxSize)
b.ResetTimer() b.ResetTimer()
b.SetBytes(int64(len(data))) b.SetBytes(int64(len(data)))
@ -67,15 +63,10 @@ func BenchmarkChunkEncrypt(b *testing.B) {
for i := 0; i < b.N; i++ { for i := 0; i < b.N; i++ {
benchmarkChunkEncrypt(b, buf, buf2, rd, s.Key()) benchmarkChunkEncrypt(b, buf, buf2, rd, s.Key())
} }
restic.FreeChunkBuf("BenchmarkChunkEncrypt", buf)
restic.FreeChunkBuf("BenchmarkChunkEncrypt", buf2)
} }
func benchmarkChunkEncryptP(b *testing.PB, buf []byte, rd Rdr, key *crypto.Key) { func benchmarkChunkEncryptP(b *testing.PB, buf []byte, rd Rdr, key *crypto.Key) {
ch := restic.GetChunker("BenchmarkChunkEncryptP") ch := chunker.New(rd, testPol, sha256.New())
rd.Seek(0, 0)
ch.Reset(rd, testPol)
for { for {
chunk, err := ch.Next() chunk, err := ch.Next()
@ -88,8 +79,6 @@ func benchmarkChunkEncryptP(b *testing.PB, buf []byte, rd Rdr, key *crypto.Key)
io.ReadFull(chunk.Reader(rd), buf) io.ReadFull(chunk.Reader(rd), buf)
crypto.Encrypt(key, buf, buf) crypto.Encrypt(key, buf, buf)
} }
restic.FreeChunker("BenchmarkChunkEncryptP", ch)
} }
func BenchmarkChunkEncryptParallel(b *testing.B) { func BenchmarkChunkEncryptParallel(b *testing.B) {
@ -98,7 +87,7 @@ func BenchmarkChunkEncryptParallel(b *testing.B) {
data := Random(23, 10<<20) // 10MiB data := Random(23, 10<<20) // 10MiB
buf := restic.GetChunkBuf("BenchmarkChunkEncryptParallel") buf := make([]byte, chunker.MaxSize)
b.ResetTimer() b.ResetTimer()
b.SetBytes(int64(len(data))) b.SetBytes(int64(len(data)))
@ -109,8 +98,6 @@ func BenchmarkChunkEncryptParallel(b *testing.B) {
benchmarkChunkEncryptP(pb, buf, rd, s.Key()) benchmarkChunkEncryptP(pb, buf, rd, s.Key())
} }
}) })
restic.FreeChunkBuf("BenchmarkChunkEncryptParallel", buf)
} }
func archiveDirectory(b testing.TB) { func archiveDirectory(b testing.TB) {

View file

@ -23,8 +23,14 @@ const (
MaxSize = 8 * MiB MaxSize = 8 * MiB
splitmask = (1 << averageBits) - 1 splitmask = (1 << averageBits) - 1
chunkerBufSize = 512 * KiB
) )
var bufPool = sync.Pool{
New: func() interface{} { return make([]byte, chunkerBufSize) },
}
type tables struct { type tables struct {
out [256]Pol out [256]Pol
mod [256]Pol mod [256]Pol
@ -79,38 +85,35 @@ type Chunker struct {
h hash.Hash h hash.Hash
} }
// New returns a new Chunker based on polynomial p that reads from data from rd // New returns a new Chunker based on polynomial p that reads from rd
// with bufsize and pass all data to hash along the way. // with bufsize and pass all data to hash along the way.
func New(rd io.Reader, p Pol, bufsize int, hash hash.Hash) *Chunker { func New(rd io.Reader, pol Pol, h hash.Hash) *Chunker {
c := &Chunker{ c := &Chunker{
buf: make([]byte, bufsize), buf: bufPool.Get().([]byte),
h: hash, h: h,
pol: pol,
rd: rd,
} }
c.Reset(rd, p)
c.reset()
return c return c
} }
// Reset restarts a chunker so that it can be reused with a different func (c *Chunker) reset() {
// polynomial and reader. c.polShift = uint(c.pol.Deg() - 8)
func (c *Chunker) Reset(rd io.Reader, p Pol) {
c.pol = p
c.polShift = uint(p.Deg() - 8)
c.fillTables() c.fillTables()
c.rd = rd
for i := 0; i < windowSize; i++ { for i := 0; i < windowSize; i++ {
c.window[i] = 0 c.window[i] = 0
} }
c.closed = false c.closed = false
c.digest = 0 c.digest = 0
c.wpos = 0 c.wpos = 0
c.pos = 0
c.start = 0
c.count = 0 c.count = 0
c.slide(1)
if p != 0 { c.start = c.pos
c.slide(1)
}
if c.h != nil { if c.h != nil {
c.h.Reset() c.h.Reset()
@ -200,6 +203,9 @@ func (c *Chunker) Next() (*Chunk, error) {
if err == io.EOF && !c.closed { if err == io.EOF && !c.closed {
c.closed = true c.closed = true
// return the buffer to the pool
bufPool.Put(c.buf)
// return current chunk, if any bytes have been processed // return current chunk, if any bytes have been processed
if c.count > 0 { if c.count > 0 {
return &Chunk{ return &Chunk{
@ -276,16 +282,7 @@ func (c *Chunker) Next() (*Chunk, error) {
Digest: c.hashDigest(), Digest: c.hashDigest(),
} }
if c.h != nil { c.reset()
c.h.Reset()
}
// reset chunker, but keep position
pos := c.pos
c.Reset(c.rd, c.pol)
c.pos = pos
c.start = pos
c.pre = MinSize - windowSize
return chunk, nil return chunk, nil
} }

View file

@ -19,7 +19,6 @@ import (
) )
var benchmarkFile = flag.String("bench.file", "", "read from this file for benchmark") var benchmarkFile = flag.String("bench.file", "", "read from this file for benchmark")
var testBufSize = flag.Int("test.bufsize", 256*1024, "use this buffer size for benchmark")
func parseDigest(s string) []byte { func parseDigest(s string) []byte {
d, err := hex.DecodeString(s) d, err := hex.DecodeString(s)
@ -151,7 +150,7 @@ func getRandom(seed, count int) []byte {
func TestChunker(t *testing.T) { func TestChunker(t *testing.T) {
// setup data source // setup data source
buf := getRandom(23, 32*1024*1024) buf := getRandom(23, 32*1024*1024)
ch := chunker.New(bytes.NewReader(buf), testPol, *testBufSize, sha256.New()) ch := chunker.New(bytes.NewReader(buf), testPol, sha256.New())
chunks := testWithData(t, ch, chunks1) chunks := testWithData(t, ch, chunks1)
// test reader // test reader
@ -178,7 +177,7 @@ func TestChunker(t *testing.T) {
// setup nullbyte data source // setup nullbyte data source
buf = bytes.Repeat([]byte{0}, len(chunks2)*chunker.MinSize) buf = bytes.Repeat([]byte{0}, len(chunks2)*chunker.MinSize)
ch = chunker.New(bytes.NewReader(buf), testPol, *testBufSize, sha256.New()) ch = chunker.New(bytes.NewReader(buf), testPol, sha256.New())
testWithData(t, ch, chunks2) testWithData(t, ch, chunks2)
} }
@ -194,7 +193,7 @@ func TestChunkerWithRandomPolynomial(t *testing.T) {
t.Logf("generating random polynomial took %v", time.Since(start)) t.Logf("generating random polynomial took %v", time.Since(start))
start = time.Now() start = time.Now()
ch := chunker.New(bytes.NewReader(buf), p, *testBufSize, sha256.New()) ch := chunker.New(bytes.NewReader(buf), p, sha256.New())
t.Logf("creating chunker took %v", time.Since(start)) t.Logf("creating chunker took %v", time.Since(start))
// make sure that first chunk is different // make sure that first chunk is different
@ -211,7 +210,8 @@ func TestChunkerWithRandomPolynomial(t *testing.T) {
func TestChunkerWithoutHash(t *testing.T) { func TestChunkerWithoutHash(t *testing.T) {
// setup data source // setup data source
buf := getRandom(23, 32*1024*1024) buf := getRandom(23, 32*1024*1024)
ch := chunker.New(bytes.NewReader(buf), testPol, *testBufSize, nil)
ch := chunker.New(bytes.NewReader(buf), testPol, nil)
chunks := testWithData(t, ch, chunks1) chunks := testWithData(t, ch, chunks1)
// test reader // test reader
@ -241,30 +241,17 @@ func TestChunkerWithoutHash(t *testing.T) {
// setup nullbyte data source // setup nullbyte data source
buf = bytes.Repeat([]byte{0}, len(chunks2)*chunker.MinSize) buf = bytes.Repeat([]byte{0}, len(chunks2)*chunker.MinSize)
ch = chunker.New(bytes.NewReader(buf), testPol, *testBufSize, sha256.New()) ch = chunker.New(bytes.NewReader(buf), testPol, sha256.New())
testWithData(t, ch, chunks2) testWithData(t, ch, chunks2)
} }
func TestChunkerReuse(t *testing.T) {
// test multiple uses of the same chunker
ch := chunker.New(nil, testPol, *testBufSize, sha256.New())
buf := getRandom(23, 32*1024*1024)
for i := 0; i < 4; i++ {
ch.Reset(bytes.NewReader(buf), testPol)
testWithData(t, ch, chunks1)
}
}
func benchmarkChunker(b *testing.B, hash hash.Hash) { func benchmarkChunker(b *testing.B, hash hash.Hash) {
var ( var (
rd io.ReadSeeker rd io.ReadSeeker
size int size int
) )
b.Logf("using bufsize %v", *testBufSize)
if *benchmarkFile != "" { if *benchmarkFile != "" {
b.Logf("using file %q for benchmark", *benchmarkFile) b.Logf("using file %q for benchmark", *benchmarkFile)
f, err := os.Open(*benchmarkFile) f, err := os.Open(*benchmarkFile)
@ -284,8 +271,6 @@ func benchmarkChunker(b *testing.B, hash hash.Hash) {
rd = bytes.NewReader(getRandom(23, size)) rd = bytes.NewReader(getRandom(23, size))
} }
ch := chunker.New(rd, testPol, *testBufSize, hash)
b.ResetTimer() b.ResetTimer()
b.SetBytes(int64(size)) b.SetBytes(int64(size))
@ -294,7 +279,7 @@ func benchmarkChunker(b *testing.B, hash hash.Hash) {
chunks = 0 chunks = 0
rd.Seek(0, 0) rd.Seek(0, 0)
ch.Reset(rd, testPol) ch := chunker.New(rd, testPol, hash)
for { for {
_, err := ch.Next() _, err := ch.Next()
@ -333,6 +318,6 @@ func BenchmarkNewChunker(b *testing.B) {
b.ResetTimer() b.ResetTimer()
for i := 0; i < b.N; i++ { for i := 0; i < b.N; i++ {
chunker.New(bytes.NewBuffer(nil), p, *testBufSize, nil) chunker.New(bytes.NewBuffer(nil), p, nil)
} }
} }

View file

@ -10,7 +10,6 @@ import (
"golang.org/x/crypto/ssh/terminal" "golang.org/x/crypto/ssh/terminal"
"github.com/jessevdk/go-flags" "github.com/jessevdk/go-flags"
"github.com/restic/restic"
"github.com/restic/restic/backend" "github.com/restic/restic/backend"
"github.com/restic/restic/backend/local" "github.com/restic/restic/backend/local"
"github.com/restic/restic/backend/sftp" "github.com/restic/restic/backend/sftp"
@ -182,7 +181,4 @@ func main() {
if err != nil { if err != nil {
os.Exit(1) os.Exit(1)
} }
// this prints some statistics for memory management using the debug package
restic.PoolAlloc()
} }

View file

@ -276,11 +276,6 @@ func Decrypt(ks *Key, plaintext []byte, ciphertextWithMac []byte) ([]byte, error
panic("trying to decrypt invalid data: ciphertext too small") panic("trying to decrypt invalid data: ciphertext too small")
} }
if cap(plaintext) < len(ciphertextWithMac) {
// extend plaintext
plaintext = append(plaintext, make([]byte, len(ciphertextWithMac)-cap(plaintext))...)
}
// extract mac // extract mac
l := len(ciphertextWithMac) - macSize l := len(ciphertextWithMac) - macSize
ciphertextWithIV, mac := ciphertextWithMac[:l], ciphertextWithMac[l:] ciphertextWithIV, mac := ciphertextWithMac[:l], ciphertextWithMac[l:]
@ -293,6 +288,11 @@ func Decrypt(ks *Key, plaintext []byte, ciphertextWithMac []byte) ([]byte, error
// extract iv // extract iv
iv, ciphertext := ciphertextWithIV[:ivSize], ciphertextWithIV[ivSize:] iv, ciphertext := ciphertextWithIV[:ivSize], ciphertextWithIV[ivSize:]
if cap(plaintext) < len(ciphertext) {
// extend plaintext
plaintext = append(plaintext, make([]byte, len(ciphertext)-cap(plaintext))...)
}
// decrypt data // decrypt data
c, err := aes.NewCipher(ks.Encrypt[:]) c, err := aes.NewCipher(ks.Encrypt[:])
if err != nil { if err != nil {

View file

@ -8,7 +8,6 @@ import (
"os" "os"
"testing" "testing"
"github.com/restic/restic"
"github.com/restic/restic/chunker" "github.com/restic/restic/chunker"
"github.com/restic/restic/crypto" "github.com/restic/restic/crypto"
. "github.com/restic/restic/test" . "github.com/restic/restic/test"
@ -29,7 +28,9 @@ func TestEncryptDecrypt(t *testing.T) {
_, err := io.ReadFull(RandomReader(42, size), data) _, err := io.ReadFull(RandomReader(42, size), data)
OK(t, err) OK(t, err)
ciphertext, err := crypto.Encrypt(k, restic.GetChunkBuf("TestEncryptDecrypt"), data) buf := make([]byte, size+crypto.Extension)
ciphertext, err := crypto.Encrypt(k, buf, data)
OK(t, err) OK(t, err)
Assert(t, len(ciphertext) == len(data)+crypto.Extension, Assert(t, len(ciphertext) == len(data)+crypto.Extension,
"ciphertext length does not match: want %d, got %d", "ciphertext length does not match: want %d, got %d",
@ -41,8 +42,6 @@ func TestEncryptDecrypt(t *testing.T) {
"plaintext length does not match: want %d, got %d", "plaintext length does not match: want %d, got %d",
len(data), len(plaintext)) len(data), len(plaintext))
restic.FreeChunkBuf("TestEncryptDecrypt", ciphertext)
Equals(t, plaintext, data) Equals(t, plaintext, data)
} }
} }
@ -107,10 +106,10 @@ func TestCornerCases(t *testing.T) {
"wrong length returned for ciphertext, expected 0, got %d", "wrong length returned for ciphertext, expected 0, got %d",
len(c)) len(c))
// this should decrypt to an empty slice // this should decrypt to nil
p, err := crypto.Decrypt(k, nil, c) p, err := crypto.Decrypt(k, nil, c)
OK(t, err) OK(t, err)
Equals(t, []byte{}, p) Equals(t, []byte(nil), p)
// test encryption for same slice, this should return an error // test encryption for same slice, this should return an error
_, err = crypto.Encrypt(k, c, c) _, err = crypto.Encrypt(k, c, c)
@ -226,8 +225,6 @@ func BenchmarkEncryptDecryptReader(b *testing.B) {
_, err = io.Copy(ioutil.Discard, r) _, err = io.Copy(ioutil.Discard, r)
OK(b, err) OK(b, err)
} }
restic.PoolAlloc()
} }
func BenchmarkDecrypt(b *testing.B) { func BenchmarkDecrypt(b *testing.B) {
@ -236,10 +233,8 @@ func BenchmarkDecrypt(b *testing.B) {
k := crypto.NewRandomKey() k := crypto.NewRandomKey()
ciphertext := restic.GetChunkBuf("BenchmarkDecrypt") plaintext := make([]byte, size)
defer restic.FreeChunkBuf("BenchmarkDecrypt", ciphertext) ciphertext := make([]byte, size+crypto.Extension)
plaintext := restic.GetChunkBuf("BenchmarkDecrypt")
defer restic.FreeChunkBuf("BenchmarkDecrypt", plaintext)
ciphertext, err := crypto.Encrypt(k, ciphertext, data) ciphertext, err := crypto.Encrypt(k, ciphertext, data)
OK(b, err) OK(b, err)

140
pools.go
View file

@ -1,140 +0,0 @@
package restic
import (
"crypto/sha256"
"sync"
"github.com/restic/restic/chunker"
"github.com/restic/restic/crypto"
"github.com/restic/restic/debug"
)
type poolStats struct {
m sync.Mutex
mget map[string]int
mput map[string]int
mmax map[string]int
new int
get int
put int
max int
}
const (
maxCiphertextSize = crypto.Extension + chunker.MaxSize
chunkerBufSize = 512 * chunker.KiB
)
func (s *poolStats) Get(k string) {
s.m.Lock()
defer s.m.Unlock()
s.get++
cur := s.get - s.put
if cur > s.max {
s.max = cur
}
if k != "" {
if _, ok := s.mget[k]; !ok {
s.mget[k] = 0
s.mput[k] = 0
s.mmax[k] = 0
}
s.mget[k]++
cur = s.mget[k] - s.mput[k]
if cur > s.mmax[k] {
s.mmax[k] = cur
}
}
}
func (s *poolStats) Put(k string) {
s.m.Lock()
defer s.m.Unlock()
s.put++
if k != "" {
s.mput[k]++
}
}
func newPoolStats() *poolStats {
return &poolStats{
mget: make(map[string]int),
mput: make(map[string]int),
mmax: make(map[string]int),
}
}
var (
chunkPool = sync.Pool{New: newChunkBuf}
chunkerPool = sync.Pool{New: newChunker}
chunkStats = newPoolStats()
nodeStats = newPoolStats()
chunkerStats = newPoolStats()
)
func newChunkBuf() interface{} {
chunkStats.m.Lock()
defer chunkStats.m.Unlock()
chunkStats.new++
// create buffer for iv, data and mac
return make([]byte, maxCiphertextSize)
}
func newChunker() interface{} {
chunkStats.m.Lock()
defer chunkStats.m.Unlock()
chunkStats.new++
// create a new chunker with a nil reader and null polynomial
return chunker.New(nil, 0, chunkerBufSize, sha256.New())
}
func GetChunkBuf(s string) []byte {
chunkStats.Get(s)
return chunkPool.Get().([]byte)
}
func FreeChunkBuf(s string, buf []byte) {
chunkStats.Put(s)
chunkPool.Put(buf)
}
func GetChunker(s string) *chunker.Chunker {
chunkerStats.Get(s)
return chunkerPool.Get().(*chunker.Chunker)
}
func FreeChunker(s string, ch *chunker.Chunker) {
chunkerStats.Put(s)
chunkerPool.Put(ch)
}
func PoolAlloc() {
debug.Log("pools.PoolAlloc", "pool stats for chunk: new %d, get %d, put %d, diff %d, max %d\n",
chunkStats.new, chunkStats.get, chunkStats.put, chunkStats.get-chunkStats.put, chunkStats.max)
for k, v := range chunkStats.mget {
debug.Log("pools.PoolAlloc", "pool stats for chunk[%s]: get %d, put %d, diff %d, max %d\n",
k, v, chunkStats.mput[k], v-chunkStats.mput[k], chunkStats.mmax[k])
}
debug.Log("pools.PoolAlloc", "pool stats for node: new %d, get %d, put %d, diff %d, max %d\n",
nodeStats.new, nodeStats.get, nodeStats.put, nodeStats.get-nodeStats.put, nodeStats.max)
for k, v := range nodeStats.mget {
debug.Log("pools.PoolAlloc", "pool stats for node[%s]: get %d, put %d, diff %d, max %d\n", k, v, nodeStats.mput[k], v-nodeStats.mput[k], nodeStats.mmax[k])
}
debug.Log("pools.PoolAlloc", "pool stats for chunker: new %d, get %d, put %d, diff %d, max %d\n",
chunkerStats.new, chunkerStats.get, chunkerStats.put, chunkerStats.get-chunkerStats.put, chunkerStats.max)
for k, v := range chunkerStats.mget {
debug.Log("pools.PoolAlloc", "pool stats for chunker[%s]: get %d, put %d, diff %d, max %d\n", k, v, chunkerStats.mput[k], v-chunkerStats.mput[k], chunkerStats.mmax[k])
}
}