repository: Limit to a single pending pack file

Use only a single not completed pack file to keep the number of open and
active pack files low. The main change here is to defer hashing the pack
file to the upload step. This prevents the pack assembly step to become
a bottleneck as the only task is now to write data to the temporary pack
file.

The tests are cleaned up to no longer reimplement packer manager
functions.
This commit is contained in:
Michael Eischer 2021-08-22 15:10:00 +02:00
parent fa25d6118e
commit 753e56ee29
3 changed files with 95 additions and 171 deletions

View file

@ -1,9 +1,10 @@
package repository
import (
"bufio"
"context"
"hash"
"io"
"io/ioutil"
"os"
"runtime"
"sync"
@ -23,31 +24,28 @@ import (
// Packer holds a pack.Packer together with a hash writer.
type Packer struct {
*pack.Packer
hw *hashing.Writer
beHw *hashing.Writer
tmpfile *os.File
bufWr *bufio.Writer
}
// packerManager keeps a list of open packs and creates new on demand.
type packerManager struct {
tpe restic.BlobType
key *crypto.Key
hasherFn func() hash.Hash
queueFn func(ctx context.Context, t restic.BlobType, p *Packer) error
pm sync.Mutex
packers []*Packer
packer *Packer
}
const minPackSize = 4 * 1024 * 1024
// newPackerManager returns an new packer manager which writes temporary files
// to a temporary directory
func newPackerManager(key *crypto.Key, hasherFn func() hash.Hash, tpe restic.BlobType, queueFn func(ctx context.Context, t restic.BlobType, p *Packer) error) *packerManager {
func newPackerManager(key *crypto.Key, tpe restic.BlobType, queueFn func(ctx context.Context, t restic.BlobType, p *Packer) error) *packerManager {
return &packerManager{
tpe: tpe,
key: key,
hasherFn: hasherFn,
queueFn: queueFn,
}
}
@ -56,24 +54,34 @@ func (r *packerManager) Flush(ctx context.Context) error {
r.pm.Lock()
defer r.pm.Unlock()
debug.Log("manually flushing %d packs", len(r.packers))
for _, packer := range r.packers {
err := r.queueFn(ctx, r.tpe, packer)
if r.packer != nil {
debug.Log("manually flushing pending pack")
err := r.queueFn(ctx, r.tpe, r.packer)
if err != nil {
return err
}
r.packer = nil
}
r.packers = r.packers[:0]
return nil
}
func (r *packerManager) SaveBlob(ctx context.Context, t restic.BlobType, id restic.ID, ciphertext []byte, uncompressedLength int) (int, error) {
packer, err := r.findPacker()
r.pm.Lock()
defer r.pm.Unlock()
var err error
packer := r.packer
if r.packer == nil {
packer, err = r.newPacker()
if err != nil {
return 0, err
}
}
// remember packer
r.packer = packer
// save ciphertext
// Add only appends bytes in memory to avoid being a scaling bottleneck
size, err := packer.Add(t, id, ciphertext, uncompressedLength)
if err != nil {
return 0, err
@ -82,10 +90,12 @@ func (r *packerManager) SaveBlob(ctx context.Context, t restic.BlobType, id rest
// if the pack is not full enough, put back to the list
if packer.Size() < minPackSize {
debug.Log("pack is not full enough (%d bytes)", packer.Size())
r.insertPacker(packer)
return size, nil
}
// forget full packer
r.packer = nil
// call while holding lock to prevent findPacker from creating new packers if the uploaders are busy
// else write the pack to the backend
err = r.queueFn(ctx, t, packer)
if err != nil {
@ -97,56 +107,24 @@ func (r *packerManager) SaveBlob(ctx context.Context, t restic.BlobType, id rest
// findPacker returns a packer for a new blob of size bytes. Either a new one is
// created or one is returned that already has some blobs.
func (r *packerManager) findPacker() (packer *Packer, err error) {
r.pm.Lock()
defer r.pm.Unlock()
// search for a suitable packer
if len(r.packers) > 0 {
p := r.packers[0]
last := len(r.packers) - 1
r.packers[0] = r.packers[last]
r.packers[last] = nil // Allow GC of stale reference.
r.packers = r.packers[:last]
return p, nil
}
// no suitable packer found, return new
func (r *packerManager) newPacker() (packer *Packer, err error) {
debug.Log("create new pack")
tmpfile, err := fs.TempFile("", "restic-temp-pack-")
if err != nil {
return nil, errors.Wrap(err, "fs.TempFile")
}
w := io.Writer(tmpfile)
beHasher := r.hasherFn()
var beHw *hashing.Writer
if beHasher != nil {
beHw = hashing.NewWriter(w, beHasher)
w = beHw
}
hw := hashing.NewWriter(w, sha256.New())
p := pack.NewPacker(r.key, hw)
bufWr := bufio.NewWriter(tmpfile)
p := pack.NewPacker(r.key, bufWr)
packer = &Packer{
Packer: p,
beHw: beHw,
hw: hw,
tmpfile: tmpfile,
bufWr: bufWr,
}
return packer, nil
}
// insertPacker appends p to s.packs.
func (r *packerManager) insertPacker(p *Packer) {
r.pm.Lock()
defer r.pm.Unlock()
r.packers = append(r.packers, p)
debug.Log("%d packers\n", len(r.packers))
}
// savePacker stores p in the backend.
func (r *Repository) savePacker(ctx context.Context, t restic.BlobType, p *Packer) error {
debug.Log("save packer for %v with %d blobs (%d bytes)\n", t, p.Packer.Count(), p.Packer.Size())
@ -154,20 +132,43 @@ func (r *Repository) savePacker(ctx context.Context, t restic.BlobType, p *Packe
if err != nil {
return err
}
id := restic.IDFromHash(p.hw.Sum(nil))
h := restic.Handle{Type: restic.PackFile, Name: id.String(),
ContainedBlobType: t}
var beHash []byte
if p.beHw != nil {
beHash = p.beHw.Sum(nil)
}
rd, err := restic.NewFileReader(p.tmpfile, beHash)
err = p.bufWr.Flush()
if err != nil {
return err
}
err = r.be.Save(ctx, h, rd)
// calculate sha256 hash in a second pass
var rd io.Reader
rd, err = restic.NewFileReader(p.tmpfile, nil)
if err != nil {
return err
}
beHasher := r.be.Hasher()
var beHr *hashing.Reader
if beHasher != nil {
beHr = hashing.NewReader(rd, beHasher)
rd = beHr
}
hr := hashing.NewReader(rd, sha256.New())
_, err = io.Copy(ioutil.Discard, hr)
if err != nil {
return err
}
id := restic.IDFromHash(hr.Sum(nil))
h := restic.Handle{Type: restic.PackFile, Name: id.String(),
ContainedBlobType: t}
var beHash []byte
if beHr != nil {
beHash = beHr.Sum(nil)
}
rrd, err := restic.NewFileReader(p.tmpfile, beHash)
if err != nil {
return err
}
err = r.be.Save(ctx, h, rrd)
if err != nil {
debug.Log("Save(%v) error: %v", h, err)
return err
@ -198,11 +199,3 @@ func (r *Repository) savePacker(ctx context.Context, t restic.BlobType, p *Packe
}
return r.idx.SaveFullIndex(ctx, r)
}
// countPacker returns the number of open (unfinished) packers.
func (r *packerManager) countPacker() int {
r.pm.Lock()
defer r.pm.Unlock()
return len(r.packers)
}

View file

@ -2,26 +2,16 @@ package repository
import (
"context"
"hash"
"io"
"math/rand"
"os"
"sync"
"testing"
"github.com/restic/restic/internal/backend/mem"
"github.com/restic/restic/internal/crypto"
"github.com/restic/restic/internal/fs"
"github.com/restic/restic/internal/mock"
"github.com/restic/restic/internal/restic"
"github.com/restic/restic/internal/test"
)
// Saver implements saving data in a backend.
type Saver interface {
Save(context.Context, restic.Handle, restic.RewindReader) error
Hasher() hash.Hash
}
func randomID(rd io.Reader) restic.ID {
id := restic.ID{}
_, err := io.ReadFull(rd, id[:])
@ -40,91 +30,27 @@ func min(a, b int) int {
return b
}
func saveFile(t testing.TB, be Saver, length int, f *os.File, id restic.ID, hash []byte) {
h := restic.Handle{Type: restic.PackFile, Name: id.String()}
t.Logf("save file %v", h)
rd, err := restic.NewFileReader(f, hash)
if err != nil {
t.Fatal(err)
}
err = be.Save(context.TODO(), h, rd)
if err != nil {
t.Fatal(err)
}
if err := f.Close(); err != nil {
t.Fatal(err)
}
if err := fs.RemoveIfExists(f.Name()); err != nil {
t.Fatal(err)
}
}
func fillPacks(t testing.TB, rnd *rand.Rand, be Saver, pm *packerManager, buf []byte) (bytes int) {
func fillPacks(t testing.TB, rnd *rand.Rand, pm *packerManager, buf []byte) (bytes int) {
for i := 0; i < 100; i++ {
l := rnd.Intn(maxBlobSize)
packer, err := pm.findPacker()
if err != nil {
t.Fatal(err)
}
id := randomID(rnd)
buf = buf[:l]
// Only change a few bytes so we know we're not benchmarking the RNG.
rnd.Read(buf[:min(l, 4)])
n, err := packer.Add(restic.DataBlob, id, buf, 0)
n, err := pm.SaveBlob(context.TODO(), restic.DataBlob, id, buf, 0)
if err != nil {
t.Fatal(err)
}
if n != l+37 {
if n != l+37 && n != l+37+36 {
t.Errorf("Add() returned invalid number of bytes: want %v, got %v", l, n)
}
bytes += l
if packer.Size() < minPackSize {
pm.insertPacker(packer)
continue
bytes += n
}
err = packer.Finalize()
err := pm.Flush(context.TODO())
if err != nil {
t.Fatal(err)
}
packID := restic.IDFromHash(packer.hw.Sum(nil))
var beHash []byte
if packer.beHw != nil {
beHash = packer.beHw.Sum(nil)
}
saveFile(t, be, int(packer.Size()), packer.tmpfile, packID, beHash)
}
return bytes
}
func flushRemainingPacks(t testing.TB, be Saver, pm *packerManager) (bytes int) {
if pm.countPacker() > 0 {
for _, packer := range pm.packers {
err := packer.Finalize()
if err != nil {
t.Fatal(err)
}
bytes += packer.HeaderOverhead()
packID := restic.IDFromHash(packer.hw.Sum(nil))
var beHash []byte
if packer.beHw != nil {
beHash = packer.beHw.Sum(nil)
}
saveFile(t, be, int(packer.Size()), packer.tmpfile, packID, beHash)
}
}
return bytes
}
@ -143,13 +69,21 @@ func TestPackerManager(t *testing.T) {
func testPackerManager(t testing.TB) int64 {
rnd := rand.New(rand.NewSource(randomSeed))
be := mem.New()
pm := newPackerManager(crypto.NewRandomKey(), be.Hasher, restic.DataBlob, nil)
savedBytes := int(0)
pm := newPackerManager(crypto.NewRandomKey(), restic.DataBlob, func(ctx context.Context, tp restic.BlobType, p *Packer) error {
err := p.Finalize()
if err != nil {
return err
}
savedBytes += int(p.Size())
return nil
})
blobBuf := make([]byte, maxBlobSize)
bytes := fillPacks(t, rnd, be, pm, blobBuf)
bytes += flushRemainingPacks(t, be, pm)
bytes := fillPacks(t, rnd, pm, blobBuf)
// bytes does not include the last packs header
test.Equals(t, savedBytes, bytes+36)
t.Logf("saved %d bytes", bytes)
return int64(bytes)
@ -162,10 +96,6 @@ func BenchmarkPackerManager(t *testing.B) {
})
rnd := rand.New(rand.NewSource(randomSeed))
be := &mock.Backend{
SaveFn: func(context.Context, restic.Handle, restic.RewindReader) error { return nil },
}
blobBuf := make([]byte, maxBlobSize)
t.ReportAllocs()
@ -174,8 +104,9 @@ func BenchmarkPackerManager(t *testing.B) {
for i := 0; i < t.N; i++ {
rnd.Seed(randomSeed)
pm := newPackerManager(crypto.NewRandomKey(), be.Hasher, restic.DataBlob, nil)
fillPacks(t, rnd, be, pm, blobBuf)
flushRemainingPacks(t, be, pm)
pm := newPackerManager(crypto.NewRandomKey(), restic.DataBlob, func(ctx context.Context, t restic.BlobType, p *Packer) error {
return nil
})
fillPacks(t, rnd, pm, blobBuf)
}
}

View file

@ -520,8 +520,8 @@ func (r *Repository) StartPackUploader(ctx context.Context, wg *errgroup.Group)
innerWg, ctx := errgroup.WithContext(ctx)
r.packerWg = innerWg
r.uploader = newPackerUploader(ctx, innerWg, r, r.be.Connections())
r.treePM = newPackerManager(r.key, r.be.Hasher, restic.TreeBlob, r.uploader.QueuePacker)
r.dataPM = newPackerManager(r.key, r.be.Hasher, restic.DataBlob, r.uploader.QueuePacker)
r.treePM = newPackerManager(r.key, restic.TreeBlob, r.uploader.QueuePacker)
r.dataPM = newPackerManager(r.key, restic.DataBlob, r.uploader.QueuePacker)
wg.Go(func() error {
return innerWg.Wait()