From a36c01372d50332fc6f973634c8485ffc1d47116 Mon Sep 17 00:00:00 2001 From: Alexander Neumann Date: Sun, 22 Jan 2017 17:53:00 +0100 Subject: [PATCH] Use streaming functions for saving data in repo --- src/restic/id.go | 10 +++ src/restic/repository/packer_manager.go | 82 +++++++++++--------- src/restic/repository/packer_manager_test.go | 44 +++++------ src/restic/repository/repository.go | 6 +- 4 files changed, 76 insertions(+), 66 deletions(-) diff --git a/src/restic/id.go b/src/restic/id.go index 08cb6f64b..c64508a4e 100644 --- a/src/restic/id.go +++ b/src/restic/id.go @@ -114,3 +114,13 @@ func (id *ID) UnmarshalJSON(b []byte) error { return nil } + +// IDFromHash returns the ID for the hash. +func IDFromHash(hash []byte) (id ID) { + if len(hash) != idSize { + panic("invalid hash type, not enough/too many bytes") + } + + copy(id[:], hash) + return id +} diff --git a/src/restic/repository/packer_manager.go b/src/restic/repository/packer_manager.go index cad988f0c..95fe10c0a 100644 --- a/src/restic/repository/packer_manager.go +++ b/src/restic/repository/packer_manager.go @@ -1,7 +1,7 @@ package repository import ( - "bytes" + "crypto/sha256" "io" "io/ioutil" "os" @@ -9,6 +9,7 @@ import ( "sync" "restic/errors" + "restic/hashing" "restic/crypto" "restic/debug" @@ -21,12 +22,19 @@ type Saver interface { Save(restic.Handle, io.Reader) error } +// Packer holds a pack.Packer together with a hash writer. +type Packer struct { + *pack.Packer + hw *hashing.Writer + tmpfile *os.File +} + // packerManager keeps a list of open packs and creates new on demand. type packerManager struct { - be Saver - key *crypto.Key - pm sync.Mutex - packs []*pack.Packer + be Saver + key *crypto.Key + pm sync.Mutex + packers []*Packer pool sync.Pool } @@ -51,18 +59,18 @@ func newPackerManager(be Saver, key *crypto.Key) *packerManager { // findPacker returns a packer for a new blob of size bytes. Either a new one is // created or one is returned that already has some blobs. -func (r *packerManager) findPacker(size uint) (packer *pack.Packer, err error) { +func (r *packerManager) findPacker(size uint) (packer *Packer, err error) { r.pm.Lock() defer r.pm.Unlock() // search for a suitable packer - if len(r.packs) > 0 { + if len(r.packers) > 0 { debug.Log("searching packer for %d bytes\n", size) - for i, p := range r.packs { - if p.Size()+size < maxPackSize { + for i, p := range r.packers { + if p.Packer.Size()+size < maxPackSize { debug.Log("found packer %v", p) // remove from list - r.packs = append(r.packs[:i], r.packs[i+1:]...) + r.packers = append(r.packers[:i], r.packers[i+1:]...) return p, nil } } @@ -75,50 +83,43 @@ func (r *packerManager) findPacker(size uint) (packer *pack.Packer, err error) { return nil, errors.Wrap(err, "ioutil.TempFile") } - return pack.NewPacker(r.key, tmpfile), nil + hw := hashing.NewWriter(tmpfile, sha256.New()) + p := pack.NewPacker(r.key, hw) + packer = &Packer{ + Packer: p, + hw: hw, + tmpfile: tmpfile, + } + + return packer, nil } // insertPacker appends p to s.packs. -func (r *packerManager) insertPacker(p *pack.Packer) { +func (r *packerManager) insertPacker(p *Packer) { r.pm.Lock() defer r.pm.Unlock() - r.packs = append(r.packs, p) - debug.Log("%d packers\n", len(r.packs)) + r.packers = append(r.packers, p) + debug.Log("%d packers\n", len(r.packers)) } // savePacker stores p in the backend. -func (r *Repository) savePacker(p *pack.Packer) error { - debug.Log("save packer with %d blobs\n", p.Count()) - n, err := p.Finalize() +func (r *Repository) savePacker(p *Packer) error { + debug.Log("save packer with %d blobs\n", p.Packer.Count()) + _, err := p.Packer.Finalize() if err != nil { return err } - tmpfile := p.Writer().(*os.File) - f, err := fs.Open(tmpfile.Name()) + f, err := fs.Open(p.tmpfile.Name()) if err != nil { return errors.Wrap(err, "Open") } - data := make([]byte, n) - m, err := io.ReadFull(f, data) - if err != nil { - return errors.Wrap(err, "ReadFul") - } - - if uint(m) != n { - return errors.Errorf("read wrong number of bytes from %v: want %v, got %v", tmpfile.Name(), n, m) - } - - if err = f.Close(); err != nil { - return errors.Wrap(err, "Close") - } - - id := restic.Hash(data) + id := restic.IDFromHash(p.hw.Sum(nil)) h := restic.Handle{Type: restic.DataFile, Name: id.String()} - err = r.be.Save(h, bytes.NewReader(data)) + err = r.be.Save(h, f) if err != nil { debug.Log("Save(%v) error: %v", h, err) return err @@ -126,13 +127,18 @@ func (r *Repository) savePacker(p *pack.Packer) error { debug.Log("saved as %v", h) - err = fs.Remove(tmpfile.Name()) + err = f.Close() + if err != nil { + return errors.Wrap(err, "close tempfile") + } + + err = fs.Remove(p.tmpfile.Name()) if err != nil { return errors.Wrap(err, "Remove") } // update blobs in the index - for _, b := range p.Blobs() { + for _, b := range p.Packer.Blobs() { debug.Log(" updating blob %v to pack %v", b.ID.Str(), id.Str()) r.idx.Store(restic.PackedBlob{ Blob: restic.Blob{ @@ -153,5 +159,5 @@ func (r *packerManager) countPacker() int { r.pm.Lock() defer r.pm.Unlock() - return len(r.packs) + return len(r.packers) } diff --git a/src/restic/repository/packer_manager_test.go b/src/restic/repository/packer_manager_test.go index e1880d2f6..59b502662 100644 --- a/src/restic/repository/packer_manager_test.go +++ b/src/restic/repository/packer_manager_test.go @@ -1,7 +1,6 @@ package repository import ( - "bytes" "io" "math/rand" "os" @@ -48,26 +47,22 @@ func randomID(rd io.Reader) restic.ID { const maxBlobSize = 1 << 20 -func saveFile(t testing.TB, be Saver, filename string, n int) { +func saveFile(t testing.TB, be Saver, filename string, id restic.ID) { f, err := os.Open(filename) if err != nil { t.Fatal(err) } - data := make([]byte, n) - m, err := io.ReadFull(f, data) + defer func() { + if err := f.Close(); err != nil { + t.Fatal(err) + } + }() - if m != n { - t.Fatalf("read wrong number of bytes from %v: want %v, got %v", filename, m, n) - } + h := restic.Handle{Type: restic.DataFile, Name: id.String()} + t.Logf("save file %v", h) - if err = f.Close(); err != nil { - t.Fatal(err) - } - - h := restic.Handle{Type: restic.DataFile, Name: restic.Hash(data).String()} - - err = be.Save(h, bytes.NewReader(data)) + err = be.Save(h, f) if err != nil { t.Fatal(err) } @@ -107,13 +102,13 @@ func fillPacks(t testing.TB, rnd *randReader, be Saver, pm *packerManager, buf [ continue } - bytesWritten, err := packer.Finalize() + _, err = packer.Finalize() if err != nil { t.Fatal(err) } - tmpfile := packer.Writer().(*os.File) - saveFile(t, be, tmpfile.Name(), int(bytesWritten)) + packID := restic.IDFromHash(packer.hw.Sum(nil)) + saveFile(t, be, packer.tmpfile.Name(), packID) } return bytes @@ -121,15 +116,15 @@ func fillPacks(t testing.TB, rnd *randReader, be Saver, pm *packerManager, buf [ func flushRemainingPacks(t testing.TB, rnd *randReader, be Saver, pm *packerManager) (bytes int) { if pm.countPacker() > 0 { - for _, packer := range pm.packs { + for _, packer := range pm.packers { n, err := packer.Finalize() if err != nil { t.Fatal(err) } bytes += int(n) - tmpfile := packer.Writer().(*os.File) - saveFile(t, be, tmpfile.Name(), bytes) + packID := restic.IDFromHash(packer.hw.Sum(nil)) + saveFile(t, be, packer.tmpfile.Name(), packID) } } @@ -156,16 +151,15 @@ func BenchmarkPackerManager(t *testing.B) { be := &mock.Backend{ SaveFn: func(restic.Handle, io.Reader) error { return nil }, } - pm := newPackerManager(be, crypto.NewRandomKey()) blobBuf := make([]byte, maxBlobSize) t.ResetTimer() - bytes := 0 for i := 0; i < t.N; i++ { + bytes := 0 + pm := newPackerManager(be, crypto.NewRandomKey()) bytes += fillPacks(t, rnd, be, pm, blobBuf) + bytes += flushRemainingPacks(t, rnd, be, pm) + t.Logf("saved %d bytes", bytes) } - - bytes += flushRemainingPacks(t, rnd, be, pm) - t.Logf("saved %d bytes", bytes) } diff --git a/src/restic/repository/repository.go b/src/restic/repository/repository.go index ac6aee87e..56229233f 100644 --- a/src/restic/repository/repository.go +++ b/src/restic/repository/repository.go @@ -235,15 +235,15 @@ func (r *Repository) Flush() error { r.pm.Lock() defer r.pm.Unlock() - debug.Log("manually flushing %d packs", len(r.packs)) + debug.Log("manually flushing %d packs", len(r.packerManager.packers)) - for _, p := range r.packs { + for _, p := range r.packerManager.packers { err := r.savePacker(p) if err != nil { return err } } - r.packs = r.packs[:0] + r.packerManager.packers = r.packerManager.packers[:0] return nil }