Implement streaming chunker using io.Reader

This commit is contained in:
Alexander Neumann 2015-02-08 22:54:45 +01:00
parent a5c33d80d8
commit bda33e612c
6 changed files with 300 additions and 147 deletions

View file

@ -1,6 +1,7 @@
package restic
import (
"crypto/sha256"
"encoding/json"
"errors"
"fmt"
@ -15,8 +16,9 @@ import (
)
const (
maxConcurrentFiles = 8
maxConcurrentBlobs = 8
maxConcurrentFiles = 16
maxConcurrentBlobs = 16
chunkerBufSize = 512 * chunker.KiB
)
type Archiver struct {
@ -61,10 +63,7 @@ func NewArchiver(s Server, p *Progress) (*Archiver, error) {
return arch, nil
}
func (arch *Archiver) Save(t backend.Type, data []byte) (Blob, error) {
// compute plaintext hash
id := backend.Hash(data)
func (arch *Archiver) Save(t backend.Type, id backend.ID, length uint, rd io.Reader) (Blob, error) {
debug.Log("Archiver.Save", "Save(%v, %v)\n", t, id.Str())
// test if this blob is already known
@ -76,7 +75,7 @@ func (arch *Archiver) Save(t backend.Type, data []byte) (Blob, error) {
}
// else encrypt and save data
blob, err = arch.s.Save(t, data, id)
blob, err = arch.s.SaveFrom(t, id, length, rd)
// store blob in storage map
smapblob := arch.m.Insert(blob)
@ -161,94 +160,67 @@ func (arch *Archiver) SaveFile(node *Node) (Blobs, error) {
var blobs Blobs
// if the file is small enough, store it directly
if node.Size < chunker.MinSize {
// acquire token
token := <-arch.blobToken
defer func() {
arch.blobToken <- token
}()
// store all chunks
chnker := chunker.New(file, chunkerBufSize, sha256.New)
chans := [](<-chan Blob){}
//defer chnker.Free()
buf := GetChunkBuf("blob single file")
defer FreeChunkBuf("blob single file", buf)
n, err := io.ReadFull(file, buf)
if err != nil && err != io.ErrUnexpectedEOF && err != io.EOF {
return nil, arrar.Annotate(err, "SaveFile() read small file")
chunks := 0
for {
buf := GetChunkBuf("blob chunker")
chunk, err := chnker.Next()
if err == io.EOF {
FreeChunkBuf("blob chunker", buf)
break
}
if err == io.EOF {
// use empty blob list for empty files
blobs = Blobs{}
} else {
blob, err := arch.Save(backend.Data, buf[:n])
if err != nil {
FreeChunkBuf("blob chunker", buf)
return nil, arrar.Annotate(err, "SaveFile() chunker.Next()")
}
chunks++
// acquire token, start goroutine to save chunk
token := <-arch.blobToken
resCh := make(chan Blob, 1)
go func(ch chan<- Blob) {
blob, err := arch.Save(backend.Data, chunk.Digest, chunk.Length, chunk.Reader(file))
// TODO handle error
if err != nil {
return nil, arrar.Annotate(err, "SaveFile() save chunk")
panic(err)
}
FreeChunkBuf("blob chunker", buf)
arch.p.Report(Stat{Bytes: blob.Size})
arch.blobToken <- token
ch <- blob
}(resCh)
blobs = Blobs{blob}
}
} else {
// else store all chunks
chnker := chunker.New(file)
chans := [](<-chan Blob){}
defer chnker.Free()
chans = append(chans, resCh)
}
chunks := 0
blobs = []Blob{}
for _, ch := range chans {
blobs = append(blobs, <-ch)
}
for {
buf := GetChunkBuf("blob chunker")
chunk, err := chnker.Next(buf)
if err == io.EOF {
FreeChunkBuf("blob chunker", buf)
break
}
if err != nil {
FreeChunkBuf("blob chunker", buf)
return nil, arrar.Annotate(err, "SaveFile() chunker.Next()")
}
chunks++
// acquire token, start goroutine to save chunk
token := <-arch.blobToken
resCh := make(chan Blob, 1)
go func(ch chan<- Blob) {
blob, err := arch.Save(backend.Data, chunk.Data)
// TODO handle error
if err != nil {
panic(err)
}
FreeChunkBuf("blob chunker", buf)
arch.p.Report(Stat{Bytes: blob.Size})
arch.blobToken <- token
ch <- blob
}(resCh)
chans = append(chans, resCh)
}
blobs = []Blob{}
for _, ch := range chans {
blobs = append(blobs, <-ch)
}
if len(blobs) != chunks {
return nil, fmt.Errorf("chunker returned %v chunks, but only %v blobs saved", chunks, len(blobs))
}
if len(blobs) != chunks {
return nil, fmt.Errorf("chunker returned %v chunks, but only %v blobs saved", chunks, len(blobs))
}
var bytes uint64
node.Content = make([]backend.ID, len(blobs))
debug.Log("Archiver.Save", "checking size for file %s", node.path)
for i, blob := range blobs {
node.Content[i] = blob.ID
bytes += blob.Size
debug.Log("Archiver.Save", " adding blob %s", blob)
}
if bytes != node.Size {

View file

@ -2,6 +2,7 @@ package restic_test
import (
"bytes"
"crypto/sha256"
"io"
"math/rand"
"testing"
@ -25,22 +26,26 @@ func get_random(seed, count int) []byte {
return buf
}
const bufSize = chunker.MiB
func BenchmarkChunkEncrypt(b *testing.B) {
data := get_random(23, 10<<20) // 10MiB
rd := bytes.NewReader(data)
be := setupBackend(b)
defer teardownBackend(b, be)
key := setupKey(b, be, "geheim")
chunkBuf := make([]byte, chunker.MaxSize)
chunkBuf := make([]byte, restic.CiphertextExtension+chunker.MaxSize)
b.ResetTimer()
b.SetBytes(int64(len(data)))
for i := 0; i < b.N; i++ {
ch := chunker.New(bytes.NewReader(data))
rd.Seek(0, 0)
ch := chunker.New(rd, bufSize, sha256.New)
for {
chunk_data, err := ch.Next(chunkBuf)
chunk, err := ch.Next()
if err == io.EOF {
break
@ -48,8 +53,13 @@ func BenchmarkChunkEncrypt(b *testing.B) {
ok(b, err)
buf := make([]byte, restic.CiphertextExtension+chunker.MaxSize)
_, err = key.Encrypt(buf, chunk_data.Data)
// reduce length of chunkBuf
chunkBuf = chunkBuf[:chunk.Length]
n, err := io.ReadFull(chunk.Reader(rd), chunkBuf)
ok(b, err)
assert(b, uint(n) == chunk.Length, "invalid length: got %d, expected %d", n, chunk.Length)
_, err = key.Encrypt(chunkBuf, chunkBuf)
ok(b, err)
}
}

View file

@ -35,7 +35,7 @@ func (b Blob) Valid() bool {
}
func (b Blob) String() string {
return fmt.Sprintf("Blob<%s -> %s>",
b.ID.Str(),
b.Storage.Str())
return fmt.Sprintf("Blob<%s (%d) -> %s (%d)>",
b.ID.Str(), b.Size,
b.Storage.Str(), b.StorageSize)
}

View file

@ -1,6 +1,7 @@
package chunker
import (
"hash"
"io"
"sync"
)
@ -36,10 +37,14 @@ var (
// A chunk is one content-dependent chunk of bytes whose end was cut when the
// Rabin Fingerprint had the value stored in Cut.
type Chunk struct {
Start int
Length int
Start uint
Length uint
Cut uint64
Data []byte
Digest []byte
}
func (c Chunk) Reader(r io.ReaderAt) io.Reader {
return io.NewSectionReader(r, int64(c.Start), int64(c.Length))
}
// A chunker internally holds everything needed to split content.
@ -47,30 +52,32 @@ type Chunker struct {
rd io.Reader
closed bool
window []byte
window [WindowSize]byte
wpos int
buf []byte
bpos int
bmax int
bpos uint
bmax uint
start int
count int
pos int
start uint
count uint
pos uint
pre int // wait for this many bytes before start calculating an new chunk
pre uint // wait for this many bytes before start calculating an new chunk
digest uint64
h hash.Hash
hfn func() hash.Hash
}
// New returns a new Chunker that reads from data from rd.
func New(rd io.Reader, bufsize int) *Chunker {
func New(rd io.Reader, bufsize int, hashfn func() hash.Hash) *Chunker {
once.Do(fill_tables)
c := &Chunker{
window: make([]byte, WindowSize),
buf: make([]byte, bufsize),
rd: rd,
buf: make([]byte, bufsize),
hfn: hashfn,
rd: rd,
}
c.reset()
@ -87,6 +94,9 @@ func (c *Chunker) reset() {
c.pos = 0
c.count = 0
c.slide(1)
c.resetHash()
// do not start a new chunk unless at least MinSize bytes have been read
c.pre = MinSize - WindowSize
}
@ -135,7 +145,7 @@ func fill_tables() {
func (c *Chunker) Next() (*Chunk, error) {
for {
if c.bpos >= c.bmax {
n, err := io.ReadFull(c.rd, c.buf)
n, err := io.ReadFull(c.rd, c.buf[:])
if err == io.ErrUnexpectedEOF {
err = nil
@ -155,6 +165,7 @@ func (c *Chunker) Next() (*Chunk, error) {
Start: c.start,
Length: c.count,
Cut: c.digest,
Digest: c.hashDigest(),
}, nil
}
}
@ -164,21 +175,25 @@ func (c *Chunker) Next() (*Chunk, error) {
}
c.bpos = 0
c.bmax = n
c.bmax = uint(n)
}
// check if bytes have to be dismissed before starting a new chunk
if c.pre > 0 {
n := c.bmax - c.bpos
if c.pre > n {
c.pre -= n
if c.pre > uint(n) {
c.pre -= uint(n)
c.updateHash(c.buf[c.bpos:c.bmax])
c.count += n
c.pos += n
c.count += uint(n)
c.pos += uint(n)
c.bpos = c.bmax
continue
}
c.updateHash(c.buf[c.bpos : c.bpos+c.pre])
c.bpos += c.pre
c.count += c.pre
c.pos += c.pre
@ -198,18 +213,23 @@ func (c *Chunker) Next() (*Chunk, error) {
c.digest |= uint64(b)
c.digest ^= mod_table[index]
// end inline
if (c.count+i+1 >= MinSize && (c.digest&splitmask) == 0) || c.count+i+1 >= MaxSize {
c.count += i + 1
c.pos += i + 1
c.bpos += i + 1
if (c.count+uint(i)+1 >= MinSize && (c.digest&splitmask) == 0) || c.count+uint(i)+1 >= MaxSize {
c.updateHash(c.buf[c.bpos : c.bpos+uint(i)+1])
c.count += uint(i) + 1
c.pos += uint(i) + 1
c.bpos += uint(i) + 1
chunk := &Chunk{
Start: c.start,
Length: c.count,
Cut: c.digest,
Digest: c.hashDigest(),
}
c.resetHash()
// keep position
pos := c.pos
c.reset()
@ -222,12 +242,39 @@ func (c *Chunker) Next() (*Chunk, error) {
}
steps := c.bmax - c.bpos
if steps > 0 {
c.updateHash(c.buf[c.bpos : c.bpos+steps])
}
c.count += steps
c.pos += steps
c.bpos = c.bmax
}
}
func (c *Chunker) resetHash() {
if c.hfn != nil {
c.h = c.hfn()
}
}
func (c *Chunker) updateHash(data []byte) {
if c.h != nil {
// the hashes from crypto/sha* do not return an error
_, err := c.h.Write(data)
if err != nil {
panic(err)
}
}
}
func (c *Chunker) hashDigest() []byte {
if c.h == nil {
return nil
}
return c.h.Sum(nil)
}
func (c *Chunker) append(b byte) {
index := c.digest >> uint(pol_shift)
c.digest <<= 8

View file

@ -2,7 +2,11 @@ package chunker_test
import (
"bytes"
"crypto/md5"
"crypto/sha256"
"encoding/hex"
"flag"
"hash"
"io"
"math/rand"
"os"
@ -14,9 +18,19 @@ import (
var benchmarkFile = flag.String("bench.file", "", "read from this file for benchmark")
var testBufSize = flag.Int("test.bufsize", 256*1024, "use this buffer size for benchmark")
func parseDigest(s string) []byte {
d, err := hex.DecodeString(s)
if err != nil {
panic(err)
}
return d
}
type chunk struct {
Length int
Length uint
CutFP uint64
Digest []byte
}
// created for 32MB of random data out of math/rand's Uint32() seeded by
@ -26,41 +40,44 @@ type chunk struct {
// window size 64, avg chunksize 1<<20, min chunksize 1<<19, max chunksize 1<<23
// polynom 0x3DA3358B4DC173
var chunks1 = []chunk{
chunk{2163460, 0x000b98d4cdf00000},
chunk{643703, 0x000d4e8364d00000},
chunk{1528956, 0x0015a25c2ef00000},
chunk{1955808, 0x00102a8242e00000},
chunk{2222372, 0x00045da878000000},
chunk{2538687, 0x00198a8179900000},
chunk{609606, 0x001d4e8d17100000},
chunk{1205738, 0x000a7204dd600000},
chunk{959742, 0x00183e71e1400000},
chunk{4036109, 0x001fec043c700000},
chunk{1525894, 0x000b1574b1500000},
chunk{1352720, 0x00018965f2e00000},
chunk{811884, 0x00155628aa100000},
chunk{1282314, 0x001909a0a1400000},
chunk{1318021, 0x001cceb980000000},
chunk{948640, 0x0011f7a470a00000},
chunk{645464, 0x00030ce2d9400000},
chunk{533758, 0x0004435c53c00000},
chunk{1128303, 0x0000c48517800000},
chunk{800374, 0x000968473f900000},
chunk{2453512, 0x001e197c92600000},
chunk{2651975, 0x000ae6c868000000},
chunk{237392, 0x0000000000000001},
chunk{2163460, 0x000b98d4cdf00000, parseDigest("4b94cb2cf293855ea43bf766731c74969b91aa6bf3c078719aabdd19860d590d")},
chunk{643703, 0x000d4e8364d00000, parseDigest("5727a63c0964f365ab8ed2ccf604912f2ea7be29759a2b53ede4d6841e397407")},
chunk{1528956, 0x0015a25c2ef00000, parseDigest("a73759636a1e7a2758767791c69e81b69fb49236c6929e5d1b654e06e37674ba")},
chunk{1955808, 0x00102a8242e00000, parseDigest("c955fb059409b25f07e5ae09defbbc2aadf117c97a3724e06ad4abd2787e6824")},
chunk{2222372, 0x00045da878000000, parseDigest("6ba5e9f7e1b310722be3627716cf469be941f7f3e39a4c3bcefea492ec31ee56")},
chunk{2538687, 0x00198a8179900000, parseDigest("8687937412f654b5cfe4a82b08f28393a0c040f77c6f95e26742c2fc4254bfde")},
chunk{609606, 0x001d4e8d17100000, parseDigest("5da820742ff5feb3369112938d3095785487456f65a8efc4b96dac4be7ebb259")},
chunk{1205738, 0x000a7204dd600000, parseDigest("cc70d8fad5472beb031b1aca356bcab86c7368f40faa24fe5f8922c6c268c299")},
chunk{959742, 0x00183e71e1400000, parseDigest("4065bdd778f95676c92b38ac265d361f81bff17d76e5d9452cf985a2ea5a4e39")},
chunk{4036109, 0x001fec043c700000, parseDigest("b9cf166e75200eb4993fc9b6e22300a6790c75e6b0fc8f3f29b68a752d42f275")},
chunk{1525894, 0x000b1574b1500000, parseDigest("2f238180e4ca1f7520a05f3d6059233926341090f9236ce677690c1823eccab3")},
chunk{1352720, 0x00018965f2e00000, parseDigest("afd12f13286a3901430de816e62b85cc62468c059295ce5888b76b3af9028d84")},
chunk{811884, 0x00155628aa100000, parseDigest("42d0cdb1ee7c48e552705d18e061abb70ae7957027db8ae8db37ec756472a70a")},
chunk{1282314, 0x001909a0a1400000, parseDigest("819721c2457426eb4f4c7565050c44c32076a56fa9b4515a1c7796441730eb58")},
chunk{1318021, 0x001cceb980000000, parseDigest("842eb53543db55bacac5e25cb91e43cc2e310fe5f9acc1aee86bdf5e91389374")},
chunk{948640, 0x0011f7a470a00000, parseDigest("b8e36bf7019bb96ac3fb7867659d2167d9d3b3148c09fe0de45850b8fe577185")},
chunk{645464, 0x00030ce2d9400000, parseDigest("5584bd27982191c3329f01ed846bfd266e96548dfa87018f745c33cfc240211d")},
chunk{533758, 0x0004435c53c00000, parseDigest("4da778a25b72a9a0d53529eccfe2e5865a789116cb1800f470d8df685a8ab05d")},
chunk{1128303, 0x0000c48517800000, parseDigest("08c6b0b38095b348d80300f0be4c5184d2744a17147c2cba5cc4315abf4c048f")},
chunk{800374, 0x000968473f900000, parseDigest("820284d2c8fd243429674c996d8eb8d3450cbc32421f43113e980f516282c7bf")},
chunk{2453512, 0x001e197c92600000, parseDigest("5fa870ed107c67704258e5e50abe67509fb73562caf77caa843b5f243425d853")},
chunk{2651975, 0x000ae6c868000000, parseDigest("181347d2bbec32bef77ad5e9001e6af80f6abcf3576549384d334ee00c1988d8")},
chunk{237392, 0x0000000000000001, parseDigest("fcd567f5d866357a8e299fd5b2359bb2c8157c30395229c4e9b0a353944a7978")},
}
// test if nullbytes are correctly split, even if length is a multiple of MinSize.
var chunks2 = []chunk{
chunk{chunker.MinSize, 0},
chunk{chunker.MinSize, 0},
chunk{chunker.MinSize, 0},
chunk{chunker.MinSize, 0},
chunk{chunker.MinSize, 0, parseDigest("07854d2fef297a06ba81685e660c332de36d5d18d546927d30daad6d7fda1541")},
chunk{chunker.MinSize, 0, parseDigest("07854d2fef297a06ba81685e660c332de36d5d18d546927d30daad6d7fda1541")},
chunk{chunker.MinSize, 0, parseDigest("07854d2fef297a06ba81685e660c332de36d5d18d546927d30daad6d7fda1541")},
chunk{chunker.MinSize, 0, parseDigest("07854d2fef297a06ba81685e660c332de36d5d18d546927d30daad6d7fda1541")},
}
func test_with_data(t *testing.T, chnker *chunker.Chunker, chunks []chunk) {
for i, chunk := range chunks {
func test_with_data(t *testing.T, chnker *chunker.Chunker, testChunks []chunk) []*chunker.Chunk {
chunks := []*chunker.Chunk{}
pos := uint(0)
for i, chunk := range testChunks {
c, err := chnker.Next()
if err != nil {
@ -72,15 +89,28 @@ func test_with_data(t *testing.T, chnker *chunker.Chunker, chunks []chunk) {
}
if c != nil {
if c.Start != pos {
t.Fatalf("Start for chunk %d does not match: expected %d, got %d",
i, c.Start, pos)
}
if c.Length != chunk.Length {
t.Fatalf("Length for chunk %d does not match: expected %d, got %d",
i, chunk.Length, c.Length)
i, c.Length, chunk.Length)
}
if c.Cut != chunk.CutFP {
t.Fatalf("Cut fingerprint for chunk %d/%d does not match: expected %016x, got %016x",
i, len(chunks)-1, chunk.CutFP, c.Cut)
i, len(chunks)-1, c.Cut, chunk.CutFP)
}
if !bytes.Equal(c.Digest, chunk.Digest) {
t.Fatalf("Digest fingerprint for chunk %d/%d does not match: expected %q, got %q",
i, len(chunks)-1, hex.EncodeToString(c.Digest), hex.EncodeToString(chunk.Digest))
}
pos += c.Length
chunks = append(chunks, c)
}
}
@ -93,6 +123,8 @@ func test_with_data(t *testing.T, chnker *chunker.Chunker, chunks []chunk) {
if err != io.EOF {
t.Fatal("wrong error returned after last chunk")
}
return chunks
}
func get_random(seed, count int) []byte {
@ -113,12 +145,34 @@ func get_random(seed, count int) []byte {
func TestChunker(t *testing.T) {
// setup data source
buf := get_random(23, 32*1024*1024)
ch := chunker.New(bytes.NewReader(buf), *testBufSize)
test_with_data(t, ch, chunks1)
ch := chunker.New(bytes.NewReader(buf), *testBufSize, sha256.New)
chunks := test_with_data(t, ch, chunks1)
// test reader
for i, c := range chunks {
rd := c.Reader(bytes.NewReader(buf))
h := sha256.New()
n, err := io.Copy(h, rd)
if err != nil {
t.Fatalf("io.Copy(): %v", err)
}
if uint(n) != chunks1[i].Length {
t.Fatalf("reader returned wrong number of bytes: expected %d, got %d",
chunks1[i].Length, n)
}
d := h.Sum(nil)
if !bytes.Equal(d, chunks1[i].Digest) {
t.Fatalf("wrong hash returned: expected %02x, got %02x",
chunks1[i].Digest, d)
}
}
// setup nullbyte data source
buf = bytes.Repeat([]byte{0}, len(chunks2)*chunker.MinSize)
ch = chunker.New(bytes.NewReader(buf), *testBufSize)
ch = chunker.New(bytes.NewReader(buf), *testBufSize, sha256.New)
test_with_data(t, ch, chunks2)
}
@ -127,12 +181,12 @@ func TestChunkerReuse(t *testing.T) {
// test multiple uses of the same chunker
for i := 0; i < 4; i++ {
buf := get_random(23, 32*1024*1024)
ch := chunker.New(bytes.NewReader(buf), *testBufSize)
ch := chunker.New(bytes.NewReader(buf), *testBufSize, sha256.New)
test_with_data(t, ch, chunks1)
}
}
func BenchmarkChunker(b *testing.B) {
func benchmarkChunker(b *testing.B, hash func() hash.Hash) {
var (
rd io.ReadSeeker
size int
@ -167,7 +221,7 @@ func BenchmarkChunker(b *testing.B) {
chunks = 0
rd.Seek(0, 0)
ch := chunker.New(rd, *testBufSize)
ch := chunker.New(rd, *testBufSize, hash)
for {
_, err := ch.Next()
@ -186,3 +240,15 @@ func BenchmarkChunker(b *testing.B) {
b.Logf("%d chunks, average chunk size: %d bytes", chunks, size/chunks)
}
func BenchmarkChunkerWithSHA256(b *testing.B) {
benchmarkChunker(b, sha256.New)
}
func BenchmarkChunkerWithMD5(b *testing.B) {
benchmarkChunker(b, md5.New)
}
func BenchmarkChunker(b *testing.B) {
benchmarkChunker(b, nil)
}

View file

@ -4,6 +4,7 @@ import (
"encoding/json"
"errors"
"fmt"
"io"
"github.com/restic/restic/backend"
"github.com/restic/restic/debug"
@ -179,6 +180,63 @@ func (s Server) Save(t backend.Type, data []byte, id backend.ID) (Blob, error) {
return blob, nil
}
// SaveFrom encrypts data read from rd and stores it to the backend as type t.
func (s Server) SaveFrom(t backend.Type, id backend.ID, length uint, rd io.Reader) (Blob, error) {
if id == nil {
return Blob{}, errors.New("id is nil")
}
// create a new blob
blob := Blob{
ID: id,
Size: uint64(length),
}
var ciphertext []byte
// allocate slice for plaintext
plaintext := GetChunkBuf("ch.Save()")
defer FreeChunkBuf("ch.Save()", plaintext)
// if the data is small enough, use a slice from the pool for the ciphertext
if length <= maxCiphertextSize-ivSize-hmacSize {
ciphertext = GetChunkBuf("ch.Save()")
defer FreeChunkBuf("ch.Save()", ciphertext)
} else {
l := length + ivSize + hmacSize
debug.Log("Server.Save", "create large slice of %d bytes for ciphertext", l)
// use a new slice
ciphertext = make([]byte, l)
}
plaintext = plaintext[:length]
_, err := io.ReadFull(rd, plaintext)
if err != nil {
return Blob{}, err
}
// encrypt blob
n, err := s.Encrypt(ciphertext, plaintext)
if err != nil {
return Blob{}, err
}
ciphertext = ciphertext[:n]
// save blob
sid, err := s.Create(t, ciphertext)
if err != nil {
return Blob{}, err
}
blob.Storage = sid
blob.StorageSize = uint64(len(ciphertext))
return blob, nil
}
// SaveJSON serialises item as JSON and uses Save() to store it to the backend as type t.
func (s Server) SaveJSON(t backend.Type, item interface{}) (Blob, error) {
// convert to json