forked from TrueCloudLab/restic
Merge pull request #481 from restic/fix-memory-usage
Use tempfiles for not-yet-uploaded pack files
This commit is contained in:
commit
afd0eb7f67
7 changed files with 271 additions and 30 deletions
|
@ -27,6 +27,7 @@ func (cmd CmdInit) Execute(args []string) error {
|
|||
}
|
||||
|
||||
s := repository.New(be)
|
||||
|
||||
err = s.Init(cmd.global.password)
|
||||
if err != nil {
|
||||
cmd.global.Exitf(1, "creating key in backend at %s failed: %v\n", cmd.global.Repo, err)
|
||||
|
|
|
@ -79,6 +79,7 @@ func testArchiverDuplication(t *testing.T) {
|
|||
}
|
||||
|
||||
repo := repository.New(forgetfulBackend())
|
||||
|
||||
err = repo.Init("foo")
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
|
|
|
@ -83,26 +83,29 @@ type Packer struct {
|
|||
|
||||
bytes uint
|
||||
k *crypto.Key
|
||||
buf *bytes.Buffer
|
||||
wr io.Writer
|
||||
|
||||
m sync.Mutex
|
||||
}
|
||||
|
||||
// NewPacker returns a new Packer that can be used to pack blobs
|
||||
// together.
|
||||
func NewPacker(k *crypto.Key, buf []byte) *Packer {
|
||||
return &Packer{k: k, buf: bytes.NewBuffer(buf)}
|
||||
// together. If wr is nil, a bytes.Buffer is used.
|
||||
func NewPacker(k *crypto.Key, wr io.Writer) *Packer {
|
||||
if wr == nil {
|
||||
wr = bytes.NewBuffer(nil)
|
||||
}
|
||||
return &Packer{k: k, wr: wr}
|
||||
}
|
||||
|
||||
// Add saves the data read from rd as a new blob to the packer. Returned is the
|
||||
// number of bytes written to the pack.
|
||||
func (p *Packer) Add(t BlobType, id backend.ID, rd io.Reader) (int64, error) {
|
||||
func (p *Packer) Add(t BlobType, id backend.ID, data []byte) (int, error) {
|
||||
p.m.Lock()
|
||||
defer p.m.Unlock()
|
||||
|
||||
c := Blob{Type: t, ID: id}
|
||||
|
||||
n, err := io.Copy(p.buf, rd)
|
||||
n, err := p.wr.Write(data)
|
||||
c.Length = uint(n)
|
||||
c.Offset = p.bytes
|
||||
p.bytes += uint(n)
|
||||
|
@ -121,8 +124,9 @@ type headerEntry struct {
|
|||
}
|
||||
|
||||
// Finalize writes the header for all added blobs and finalizes the pack.
|
||||
// Returned are all bytes written, including the header.
|
||||
func (p *Packer) Finalize() ([]byte, error) {
|
||||
// Returned is the number of bytes written, including the header. If the
|
||||
// underlying writer implements io.Closer, it is closed.
|
||||
func (p *Packer) Finalize() (uint, error) {
|
||||
p.m.Lock()
|
||||
defer p.m.Unlock()
|
||||
|
||||
|
@ -131,37 +135,41 @@ func (p *Packer) Finalize() ([]byte, error) {
|
|||
hdrBuf := bytes.NewBuffer(nil)
|
||||
bytesHeader, err := p.writeHeader(hdrBuf)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
return 0, err
|
||||
}
|
||||
|
||||
encryptedHeader, err := crypto.Encrypt(p.k, nil, hdrBuf.Bytes())
|
||||
if err != nil {
|
||||
return nil, err
|
||||
return 0, err
|
||||
}
|
||||
|
||||
// append the header
|
||||
n, err := p.buf.Write(encryptedHeader)
|
||||
n, err := p.wr.Write(encryptedHeader)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
return 0, err
|
||||
}
|
||||
|
||||
hdrBytes := bytesHeader + crypto.Extension
|
||||
if uint(n) != hdrBytes {
|
||||
return nil, errors.New("wrong number of bytes written")
|
||||
return 0, errors.New("wrong number of bytes written")
|
||||
}
|
||||
|
||||
bytesWritten += hdrBytes
|
||||
|
||||
// write length
|
||||
err = binary.Write(p.buf, binary.LittleEndian, uint32(uint(len(p.blobs))*entrySize+crypto.Extension))
|
||||
err = binary.Write(p.wr, binary.LittleEndian, uint32(uint(len(p.blobs))*entrySize+crypto.Extension))
|
||||
if err != nil {
|
||||
return nil, err
|
||||
return 0, err
|
||||
}
|
||||
bytesWritten += uint(binary.Size(uint32(0)))
|
||||
|
||||
p.bytes = uint(bytesWritten)
|
||||
|
||||
return p.buf.Bytes(), nil
|
||||
if w, ok := p.wr.(io.Closer); ok {
|
||||
return bytesWritten, w.Close()
|
||||
}
|
||||
|
||||
return bytesWritten, nil
|
||||
}
|
||||
|
||||
// writeHeader constructs and writes the header to wr.
|
||||
|
@ -208,6 +216,11 @@ func (p *Packer) Blobs() []Blob {
|
|||
return p.blobs
|
||||
}
|
||||
|
||||
// Writer returns the underlying writer.
|
||||
func (p *Packer) Writer() io.Writer {
|
||||
return p.wr
|
||||
}
|
||||
|
||||
func (p *Packer) String() string {
|
||||
return fmt.Sprintf("<Packer %d blobs, %d bytes>", len(p.blobs), p.bytes)
|
||||
}
|
||||
|
|
|
@ -38,12 +38,13 @@ func newPack(t testing.TB, k *crypto.Key) ([]Buf, []byte, uint) {
|
|||
// pack blobs
|
||||
p := pack.NewPacker(k, nil)
|
||||
for _, b := range bufs {
|
||||
p.Add(pack.Tree, b.id, bytes.NewReader(b.data))
|
||||
p.Add(pack.Tree, b.id, b.data)
|
||||
}
|
||||
|
||||
packData, err := p.Finalize()
|
||||
_, err := p.Finalize()
|
||||
OK(t, err)
|
||||
|
||||
packData := p.Writer().(*bytes.Buffer).Bytes()
|
||||
return bufs, packData, p.Size()
|
||||
}
|
||||
|
||||
|
|
|
@ -1,6 +1,10 @@
|
|||
package repository
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"io"
|
||||
"io/ioutil"
|
||||
"os"
|
||||
"sync"
|
||||
|
||||
"restic/backend"
|
||||
|
@ -9,21 +13,42 @@ import (
|
|||
"restic/pack"
|
||||
)
|
||||
|
||||
// Saver implements saving data in a backend.
|
||||
type Saver interface {
|
||||
Save(h backend.Handle, jp []byte) error
|
||||
}
|
||||
|
||||
// packerManager keeps a list of open packs and creates new on demand.
|
||||
type packerManager struct {
|
||||
be backend.Backend
|
||||
be Saver
|
||||
key *crypto.Key
|
||||
pm sync.Mutex
|
||||
packs []*pack.Packer
|
||||
|
||||
pool sync.Pool
|
||||
}
|
||||
|
||||
const minPackSize = 4 * 1024 * 1024
|
||||
const maxPackSize = 16 * 1024 * 1024
|
||||
const maxPackers = 200
|
||||
|
||||
// newPackerManager returns a new packer manager which writes temporary files
|
||||
// to a temporary directory
|
||||
func newPackerManager(be Saver, key *crypto.Key) *packerManager {
|
||||
return &packerManager{
|
||||
be: be,
|
||||
key: key,
|
||||
pool: sync.Pool{
|
||||
New: func() interface{} {
|
||||
return make([]byte, (minPackSize+maxPackSize)/2)
|
||||
},
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
// findPacker returns a packer for a new blob of size bytes. Either a new one is
|
||||
// created or one is returned that already has some blobs.
|
||||
func (r *packerManager) findPacker(size uint) (*pack.Packer, error) {
|
||||
func (r *packerManager) findPacker(size uint) (packer *pack.Packer, err error) {
|
||||
r.pm.Lock()
|
||||
defer r.pm.Unlock()
|
||||
|
||||
|
@ -42,7 +67,12 @@ func (r *packerManager) findPacker(size uint) (*pack.Packer, error) {
|
|||
|
||||
// no suitable packer found, return new
|
||||
debug.Log("Repo.findPacker", "create new pack for %d bytes", size)
|
||||
return pack.NewPacker(r.key, nil), nil
|
||||
tmpfile, err := ioutil.TempFile("", "restic-temp-pack-")
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return pack.NewPacker(r.key, tmpfile), nil
|
||||
}
|
||||
|
||||
// insertPacker appends p to s.packs.
|
||||
|
@ -57,11 +87,28 @@ func (r *packerManager) insertPacker(p *pack.Packer) {
|
|||
// savePacker stores p in the backend.
|
||||
func (r *Repository) savePacker(p *pack.Packer) error {
|
||||
debug.Log("Repo.savePacker", "save packer with %d blobs\n", p.Count())
|
||||
data, err := p.Finalize()
|
||||
n, err := p.Finalize()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
tmpfile := p.Writer().(*os.File)
|
||||
f, err := os.Open(tmpfile.Name())
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
data := make([]byte, n)
|
||||
m, err := io.ReadFull(f, data)
|
||||
|
||||
if uint(m) != n {
|
||||
return fmt.Errorf("read wrong number of bytes from %v: want %v, got %v", tmpfile.Name(), n, m)
|
||||
}
|
||||
|
||||
if err = f.Close(); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
id := backend.Hash(data)
|
||||
h := backend.Handle{Type: backend.Data, Name: id.String()}
|
||||
|
||||
|
@ -73,6 +120,11 @@ func (r *Repository) savePacker(p *pack.Packer) error {
|
|||
|
||||
debug.Log("Repo.savePacker", "saved as %v", h)
|
||||
|
||||
err = os.Remove(tmpfile.Name())
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// update blobs in the index
|
||||
for _, b := range p.Blobs() {
|
||||
debug.Log("Repo.savePacker", " updating blob %v to pack %v", b.ID.Str(), id.Str())
|
||||
|
|
174
src/restic/repository/packer_manager_test.go
Normal file
174
src/restic/repository/packer_manager_test.go
Normal file
|
@ -0,0 +1,174 @@
|
|||
package repository
|
||||
|
||||
import (
|
||||
"io"
|
||||
"math/rand"
|
||||
"os"
|
||||
"restic/backend"
|
||||
"restic/backend/mem"
|
||||
"restic/crypto"
|
||||
"restic/pack"
|
||||
"testing"
|
||||
)
|
||||
|
||||
// randReader is an io.Reader that produces bytes from a rand.Source. The
// same source is also exposed wrapped in a *rand.Rand for callers that need
// numbers rather than bytes.
type randReader struct {
	src  rand.Source
	rand *rand.Rand
}

// newRandReader returns a randReader that draws from src, both directly (for
// Read) and through a *rand.Rand built on top of it.
func newRandReader(src rand.Source) *randReader {
	rd := new(randReader)
	rd.src = src
	rd.rand = rand.New(src)
	return rd
}

// Read fills p completely with bytes taken from the source. Every Int63 value
// supplies up to seven bytes, least significant byte first. It always returns
// len(p) and a nil error.
func (r *randReader) Read(p []byte) (n int, err error) {
	total := len(p)
	for off := 0; off < total; off += 7 {
		bits := r.src.Int63()

		// the last chunk may be shorter than seven bytes
		end := off + 7
		if end > total {
			end = total
		}

		for k := off; k < end; k++ {
			p[k] = byte(bits)
			bits >>= 8
		}
	}
	return total, nil
}
|
||||
|
||||
func randomID(rd io.Reader) backend.ID {
|
||||
id := backend.ID{}
|
||||
_, err := io.ReadFull(rd, id[:])
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
return id
|
||||
}
|
||||
|
||||
// maxBlobSize is the length in bytes (1 MiB) of the blob buffer allocated by
// TestPackerManager and BenchmarkPackerManager.
const maxBlobSize = 1 << 20
|
||||
|
||||
func saveFile(t testing.TB, be Saver, filename string, n int) {
|
||||
f, err := os.Open(filename)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
data := make([]byte, n)
|
||||
m, err := io.ReadFull(f, data)
|
||||
|
||||
if m != n {
|
||||
t.Fatalf("read wrong number of bytes from %v: want %v, got %v", filename, m, n)
|
||||
}
|
||||
|
||||
if err = f.Close(); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
h := backend.Handle{Type: backend.Data, Name: backend.Hash(data).String()}
|
||||
|
||||
err = be.Save(h, data)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
err = os.Remove(filename)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
}
|
||||
|
||||
func fillPacks(t testing.TB, rnd *randReader, be Saver, pm *packerManager, buf []byte) (bytes int) {
|
||||
for i := 0; i < 100; i++ {
|
||||
l := rnd.rand.Intn(1 << 20)
|
||||
seed := rnd.rand.Int63()
|
||||
|
||||
packer, err := pm.findPacker(uint(l))
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
rd := newRandReader(rand.NewSource(seed))
|
||||
id := randomID(rd)
|
||||
buf = buf[:l]
|
||||
_, err = io.ReadFull(rd, buf)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
n, err := packer.Add(pack.Data, id, buf)
|
||||
if n != l {
|
||||
t.Errorf("Add() returned invalid number of bytes: want %v, got %v", n, l)
|
||||
}
|
||||
bytes += l
|
||||
|
||||
if packer.Size() < minPackSize && pm.countPacker() < maxPackers {
|
||||
pm.insertPacker(packer)
|
||||
continue
|
||||
}
|
||||
|
||||
bytesWritten, err := packer.Finalize()
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
tmpfile := packer.Writer().(*os.File)
|
||||
saveFile(t, be, tmpfile.Name(), int(bytesWritten))
|
||||
}
|
||||
|
||||
return bytes
|
||||
}
|
||||
|
||||
func flushRemainingPacks(t testing.TB, rnd *randReader, be Saver, pm *packerManager) (bytes int) {
|
||||
if pm.countPacker() > 0 {
|
||||
for _, packer := range pm.packs {
|
||||
n, err := packer.Finalize()
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
bytes += int(n)
|
||||
|
||||
tmpfile := packer.Writer().(*os.File)
|
||||
saveFile(t, be, tmpfile.Name(), bytes)
|
||||
}
|
||||
}
|
||||
|
||||
return bytes
|
||||
}
|
||||
|
||||
type fakeBackend struct{}
|
||||
|
||||
func (f *fakeBackend) Save(h backend.Handle, data []byte) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func TestPackerManager(t *testing.T) {
|
||||
rnd := newRandReader(rand.NewSource(23))
|
||||
|
||||
be := mem.New()
|
||||
pm := newPackerManager(be, crypto.NewRandomKey())
|
||||
|
||||
blobBuf := make([]byte, maxBlobSize)
|
||||
|
||||
bytes := fillPacks(t, rnd, be, pm, blobBuf)
|
||||
bytes += flushRemainingPacks(t, rnd, be, pm)
|
||||
|
||||
t.Logf("saved %d bytes", bytes)
|
||||
}
|
||||
|
||||
func BenchmarkPackerManager(t *testing.B) {
|
||||
rnd := newRandReader(rand.NewSource(23))
|
||||
|
||||
be := &fakeBackend{}
|
||||
pm := newPackerManager(be, crypto.NewRandomKey())
|
||||
blobBuf := make([]byte, maxBlobSize)
|
||||
|
||||
t.ResetTimer()
|
||||
|
||||
bytes := 0
|
||||
for i := 0; i < t.N; i++ {
|
||||
bytes += fillPacks(t, rnd, be, pm, blobBuf)
|
||||
}
|
||||
|
||||
bytes += flushRemainingPacks(t, rnd, be, pm)
|
||||
t.Logf("saved %d bytes", bytes)
|
||||
}
|
|
@ -28,13 +28,13 @@ type Repository struct {
|
|||
|
||||
// New returns a new repository with backend be.
|
||||
func New(be backend.Backend) *Repository {
|
||||
return &Repository{
|
||||
be: be,
|
||||
idx: NewMasterIndex(),
|
||||
packerManager: &packerManager{
|
||||
be: be,
|
||||
},
|
||||
repo := &Repository{
|
||||
be: be,
|
||||
idx: NewMasterIndex(),
|
||||
packerManager: newPackerManager(be, nil),
|
||||
}
|
||||
|
||||
return repo
|
||||
}
|
||||
|
||||
// Find loads the list of all blobs of type t and searches for names which start
|
||||
|
@ -195,7 +195,7 @@ func (r *Repository) SaveAndEncrypt(t pack.BlobType, data []byte, id *backend.ID
|
|||
}
|
||||
|
||||
// save ciphertext
|
||||
_, err = packer.Add(t, *id, bytes.NewReader(ciphertext))
|
||||
_, err = packer.Add(t, *id, ciphertext)
|
||||
if err != nil {
|
||||
return backend.ID{}, err
|
||||
}
|
||||
|
@ -299,7 +299,6 @@ func (r *Repository) Flush() error {
|
|||
}
|
||||
}
|
||||
r.packs = r.packs[:0]
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in a new issue