Merge pull request #3489 from MichaelEischer/async-pack-uploads

Asynchronously upload pack files
This commit is contained in:
MichaelEischer 2022-07-03 11:56:05 +02:00 committed by GitHub
commit cd50feb66f
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
22 changed files with 441 additions and 271 deletions

View file

@ -0,0 +1,13 @@
Enhancement: Improve backup speed with many small files
We have restructured the backup pipeline to continue reading files while all
upload connections are busy. This allows the backup to already prepare the next
data file such that the upload can continue right once the last one has
completed. This can especially improve the backup performance for high latency
backends.
The upload concurrency is now controlled using the `-o <backend-name>.connections=5`
option.
https://github.com/restic/restic/issues/2696
https://github.com/restic/restic/pull/3489

View file

@ -9,6 +9,7 @@ import (
"github.com/restic/restic/internal/errors"
"github.com/restic/restic/internal/restic"
"github.com/spf13/cobra"
"golang.org/x/sync/errgroup"
)
var cmdRecover = &cobra.Command{
@ -131,15 +132,27 @@ func runRecover(gopts GlobalOptions) error {
}
}
treeID, err := repo.SaveTree(gopts.ctx, tree)
wg, ctx := errgroup.WithContext(gopts.ctx)
repo.StartPackUploader(ctx, wg)
var treeID restic.ID
wg.Go(func() error {
var err error
treeID, err = repo.SaveTree(ctx, tree)
if err != nil {
return errors.Fatalf("unable to save new tree to the repo: %v", err)
}
err = repo.Flush(gopts.ctx)
err = repo.Flush(ctx)
if err != nil {
return errors.Fatalf("unable to save blobs to the repo: %v", err)
}
return nil
})
err = wg.Wait()
if err != nil {
return err
}
return createSnapshot(gopts.ctx, "/recover", hostname, []string{"recovered"}, repo, &treeID)

View file

@ -89,10 +89,6 @@ func changeTags(ctx context.Context, repo *repository.Repository, sn *restic.Sna
debug.Log("new snapshot saved as %v", id)
if err = repo.Flush(ctx); err != nil {
return false, err
}
// Remove the old snapshot.
h := restic.Handle{Type: restic.SnapshotFile, Name: sn.ID().String()}
if err = repo.Backend().Remove(ctx, h); err != nil {

View file

@ -0,0 +1,40 @@
..
Normally, there are no heading levels assigned to certain characters as the structure is
determined from the succession of headings. However, this convention is used in Pythons
Style Guide for documenting which you may follow:
# with overline, for parts
* for chapters
= for sections
- for subsections
^ for subsubsections
" for paragraphs
########################
Tuning Backup Parameters
########################
Restic offers a few parameters that allow tuning the backup. The default values should
work well in general although specific use cases can benefit from different non-default
values. As the restic commands evolve over time, the optimal value for each parameter
can also change across restic versions.
Backend Connections
===================
Restic uses a global limit for the number of concurrent connections to a backend.
This limit can be configured using ``-o <backend-name>.connections=5``, for example for
the REST backend the parameter would be ``-o rest.connections=5``. By default restic uses
``5`` connections for each backend, except for the local backend which uses a limit of ``2``.
The defaults should work well in most cases. For high-latency backends it can be beneficial
to increase the number of connections. Please be aware that this increases the resource
consumption of restic and that a too high connection count *will degrade performace*.
Compression
===========
For a repository using a least repository format version 2, you can configure how data
is compressed with the option ``--compression``. It can be set to ```auto``` (the default,
which will compress very fast), ``max`` (which will trade backup speed and CPU usage for
slightly better compression), or ``off`` (which disables compression). Each setting is
only applied for the single run of restic.

View file

@ -9,6 +9,7 @@ Restic Documentation
030_preparing_a_new_repo
040_backup
045_working_with_repos
047_tuning_backup_parameters
050_restore
060_forget
070_encryption

View file

@ -122,13 +122,18 @@ func (o Options) ApplyDefaults() Options {
}
if o.SaveBlobConcurrency == 0 {
o.SaveBlobConcurrency = uint(runtime.NumCPU())
// blob saving is CPU bound due to hash checking and encryption
// the actual upload is handled by the repository itself
o.SaveBlobConcurrency = uint(runtime.GOMAXPROCS(0))
}
if o.SaveTreeConcurrency == 0 {
// use a relatively high concurrency here, having multiple SaveTree
// workers is cheap
o.SaveTreeConcurrency = o.SaveBlobConcurrency * 20
// can either wait for a file, wait for a tree, serialize a tree or wait for saveblob
// the last two are cpu-bound and thus mutually exclusive.
// Also allow waiting for FileReadConcurrency files, this is the maximum of FutureFiles
// which currently can be in progress. The main backup loop blocks when trying to queue
// more files to read.
o.SaveTreeConcurrency = uint(runtime.GOMAXPROCS(0)) + o.FileReadConcurrency
}
return o
@ -801,10 +806,15 @@ func (arch *Archiver) Snapshot(ctx context.Context, targets []string, opts Snaps
return nil, restic.ID{}, err
}
wg, wgCtx := errgroup.WithContext(ctx)
var rootTreeID restic.ID
wgUp, wgUpCtx := errgroup.WithContext(ctx)
arch.Repo.StartPackUploader(wgUpCtx, wgUp)
wgUp.Go(func() error {
wg, wgCtx := errgroup.WithContext(wgUpCtx)
start := time.Now()
var rootTreeID restic.ID
var stats ItemStats
wg.Go(func() error {
arch.runWorkers(wgCtx, wg)
@ -829,12 +839,14 @@ func (arch *Archiver) Snapshot(ctx context.Context, targets []string, opts Snaps
if err != nil {
debug.Log("error while saving tree: %v", err)
return nil, restic.ID{}, err
return err
}
arch.CompleteItem("/", nil, nil, stats, time.Since(start))
err = arch.Repo.Flush(ctx)
return arch.Repo.Flush(ctx)
})
err = wgUp.Wait()
if err != nil {
return nil, restic.ID{}, err
}

View file

@ -42,6 +42,7 @@ func prepareTempdirRepoSrc(t testing.TB, src TestDir) (tempdir string, repo rest
func saveFile(t testing.TB, repo restic.Repository, filename string, filesystem fs.FS) (*restic.Node, ItemStats) {
wg, ctx := errgroup.WithContext(context.TODO())
repo.StartPackUploader(ctx, wg)
arch := New(repo, filesystem, Options{})
arch.runWorkers(ctx, wg)
@ -213,6 +214,7 @@ func TestArchiverSave(t *testing.T) {
defer cleanup()
wg, ctx := errgroup.WithContext(ctx)
repo.StartPackUploader(ctx, wg)
arch := New(repo, fs.Track{FS: fs.Local{}}, Options{})
arch.Error = func(item string, fi os.FileInfo, err error) error {
@ -281,6 +283,7 @@ func TestArchiverSaveReaderFS(t *testing.T) {
defer cleanup()
wg, ctx := errgroup.WithContext(ctx)
repo.StartPackUploader(ctx, wg)
ts := time.Now()
filename := "xx"
@ -830,6 +833,7 @@ func TestArchiverSaveDir(t *testing.T) {
defer cleanup()
wg, ctx := errgroup.WithContext(context.Background())
repo.StartPackUploader(ctx, wg)
arch := New(repo, fs.Track{FS: fs.Local{}}, Options{})
arch.runWorkers(ctx, wg)
@ -914,6 +918,7 @@ func TestArchiverSaveDirIncremental(t *testing.T) {
// archiver did save the same tree several times
for i := 0; i < 5; i++ {
wg, ctx := errgroup.WithContext(context.TODO())
repo.StartPackUploader(ctx, wg)
arch := New(repo, fs.Track{FS: fs.Local{}}, Options{})
arch.runWorkers(ctx, wg)
@ -1097,6 +1102,7 @@ func TestArchiverSaveTree(t *testing.T) {
}
wg, ctx := errgroup.WithContext(context.TODO())
repo.StartPackUploader(ctx, wg)
arch.runWorkers(ctx, wg)
@ -2239,6 +2245,7 @@ func TestRacyFileSwap(t *testing.T) {
defer cancel()
wg, ctx := errgroup.WithContext(ctx)
repo.StartPackUploader(ctx, wg)
arch := New(repo, fs.Track{FS: statfs}, Options{})
arch.Error = func(item string, fi os.FileInfo, err error) error {

View file

@ -20,6 +20,7 @@ import (
"github.com/restic/restic/internal/repository"
"github.com/restic/restic/internal/restic"
"github.com/restic/restic/internal/test"
"golang.org/x/sync/errgroup"
)
var checkerTestData = filepath.Join("testdata", "checker-test-repo.tar.gz")
@ -476,6 +477,8 @@ func TestCheckerBlobTypeConfusion(t *testing.T) {
Nodes: []*restic.Node{damagedNode},
}
wg, wgCtx := errgroup.WithContext(ctx)
repo.StartPackUploader(wgCtx, wg)
id, err := repo.SaveTree(ctx, damagedTree)
test.OK(t, repo.Flush(ctx))
test.OK(t, err)
@ -483,6 +486,8 @@ func TestCheckerBlobTypeConfusion(t *testing.T) {
buf, err := repo.LoadBlob(ctx, restic.TreeBlob, id, nil)
test.OK(t, err)
wg, wgCtx = errgroup.WithContext(ctx)
repo.StartPackUploader(wgCtx, wg)
_, _, _, err = repo.SaveBlob(ctx, restic.DataBlob, buf, id, false)
test.OK(t, err)

View file

@ -70,14 +70,13 @@ type compressedHeaderEntry struct {
}
// Finalize writes the header for all added blobs and finalizes the pack.
// Returned are the number of bytes written, not yet reported by Add.
func (p *Packer) Finalize() (int, error) {
func (p *Packer) Finalize() error {
p.m.Lock()
defer p.m.Unlock()
header, err := p.makeHeader()
if err != nil {
return 0, err
return err
}
encryptedHeader := make([]byte, 0, restic.CiphertextLength(len(header)))
@ -88,22 +87,27 @@ func (p *Packer) Finalize() (int, error) {
// append the header
n, err := p.wr.Write(encryptedHeader)
if err != nil {
return 0, errors.Wrap(err, "Write")
return errors.Wrap(err, "Write")
}
hdrBytes := len(encryptedHeader)
if n != hdrBytes {
return 0, errors.New("wrong number of bytes written")
return errors.New("wrong number of bytes written")
}
// write length
err = binary.Write(p.wr, binary.LittleEndian, uint32(hdrBytes))
if err != nil {
return 0, errors.Wrap(err, "binary.Write")
return errors.Wrap(err, "binary.Write")
}
p.bytes += uint(hdrBytes + binary.Size(uint32(0)))
return restic.CiphertextLength(0) + binary.Size(uint32(0)), nil
return nil
}
// HeaderOverhead returns an estimate of the number of bytes written by a call to Finalize.
func (p *Packer) HeaderOverhead() int {
return restic.CiphertextLength(0) + binary.Size(uint32(0))
}
// makeHeader constructs the header for p.

View file

@ -42,7 +42,7 @@ func newPack(t testing.TB, k *crypto.Key, lengths []int) ([]Buf, []byte, uint) {
rtest.OK(t, err)
}
_, err := p.Finalize()
err := p.Finalize()
rtest.OK(t, err)
return bufs, buf.Bytes(), p.Size()

View file

@ -9,6 +9,7 @@ import (
"github.com/restic/restic/internal/backend/mem"
"github.com/restic/restic/internal/restic"
"golang.org/x/sync/errgroup"
)
// Test saving a blob and loading it again, with varying buffer sizes.
@ -23,6 +24,9 @@ func FuzzSaveLoadBlob(f *testing.F) {
id := restic.Hash(blob)
repo, _ := TestRepositoryWithBackend(t, mem.New(), 2)
var wg errgroup.Group
repo.StartPackUploader(context.TODO(), &wg)
_, _, _, err := repo.SaveBlob(context.TODO(), restic.DataBlob, blob, id, false)
if err != nil {
t.Fatal(err)

View file

@ -1,9 +1,10 @@
package repository
import (
"bufio"
"context"
"hash"
"io"
"io/ioutil"
"os"
"runtime"
"sync"
@ -20,129 +21,171 @@ import (
"github.com/minio/sha256-simd"
)
// Saver implements saving data in a backend.
type Saver interface {
Save(context.Context, restic.Handle, restic.RewindReader) error
Hasher() hash.Hash
}
// Packer holds a pack.Packer together with a hash writer.
type Packer struct {
*pack.Packer
hw *hashing.Writer
beHw *hashing.Writer
tmpfile *os.File
bufWr *bufio.Writer
}
// packerManager keeps a list of open packs and creates new on demand.
type packerManager struct {
be Saver
tpe restic.BlobType
key *crypto.Key
queueFn func(ctx context.Context, t restic.BlobType, p *Packer) error
pm sync.Mutex
packers []*Packer
packer *Packer
}
const minPackSize = 4 * 1024 * 1024
// newPackerManager returns an new packer manager which writes temporary files
// to a temporary directory
func newPackerManager(be Saver, key *crypto.Key) *packerManager {
func newPackerManager(key *crypto.Key, tpe restic.BlobType, queueFn func(ctx context.Context, t restic.BlobType, p *Packer) error) *packerManager {
return &packerManager{
be: be,
tpe: tpe,
key: key,
queueFn: queueFn,
}
}
func (r *packerManager) Flush(ctx context.Context) error {
r.pm.Lock()
defer r.pm.Unlock()
if r.packer != nil {
debug.Log("manually flushing pending pack")
err := r.queueFn(ctx, r.tpe, r.packer)
if err != nil {
return err
}
r.packer = nil
}
return nil
}
func (r *packerManager) SaveBlob(ctx context.Context, t restic.BlobType, id restic.ID, ciphertext []byte, uncompressedLength int) (int, error) {
r.pm.Lock()
defer r.pm.Unlock()
var err error
packer := r.packer
if r.packer == nil {
packer, err = r.newPacker()
if err != nil {
return 0, err
}
}
// remember packer
r.packer = packer
// save ciphertext
// Add only appends bytes in memory to avoid being a scaling bottleneck
size, err := packer.Add(t, id, ciphertext, uncompressedLength)
if err != nil {
return 0, err
}
// if the pack is not full enough, put back to the list
if packer.Size() < minPackSize {
debug.Log("pack is not full enough (%d bytes)", packer.Size())
return size, nil
}
// forget full packer
r.packer = nil
// call while holding lock to prevent findPacker from creating new packers if the uploaders are busy
// else write the pack to the backend
err = r.queueFn(ctx, t, packer)
if err != nil {
return 0, err
}
return size + packer.HeaderOverhead(), nil
}
// findPacker returns a packer for a new blob of size bytes. Either a new one is
// created or one is returned that already has some blobs.
func (r *packerManager) findPacker() (packer *Packer, err error) {
r.pm.Lock()
defer r.pm.Unlock()
// search for a suitable packer
if len(r.packers) > 0 {
p := r.packers[0]
last := len(r.packers) - 1
r.packers[0] = r.packers[last]
r.packers[last] = nil // Allow GC of stale reference.
r.packers = r.packers[:last]
return p, nil
}
// no suitable packer found, return new
func (r *packerManager) newPacker() (packer *Packer, err error) {
debug.Log("create new pack")
tmpfile, err := fs.TempFile("", "restic-temp-pack-")
if err != nil {
return nil, errors.Wrap(err, "fs.TempFile")
}
w := io.Writer(tmpfile)
beHasher := r.be.Hasher()
var beHw *hashing.Writer
if beHasher != nil {
beHw = hashing.NewWriter(w, beHasher)
w = beHw
}
hw := hashing.NewWriter(w, sha256.New())
p := pack.NewPacker(r.key, hw)
bufWr := bufio.NewWriter(tmpfile)
p := pack.NewPacker(r.key, bufWr)
packer = &Packer{
Packer: p,
beHw: beHw,
hw: hw,
tmpfile: tmpfile,
bufWr: bufWr,
}
return packer, nil
}
// insertPacker appends p to s.packs.
func (r *packerManager) insertPacker(p *Packer) {
r.pm.Lock()
defer r.pm.Unlock()
r.packers = append(r.packers, p)
debug.Log("%d packers\n", len(r.packers))
}
// savePacker stores p in the backend.
func (r *Repository) savePacker(ctx context.Context, t restic.BlobType, p *Packer) (int, error) {
func (r *Repository) savePacker(ctx context.Context, t restic.BlobType, p *Packer) error {
debug.Log("save packer for %v with %d blobs (%d bytes)\n", t, p.Packer.Count(), p.Packer.Size())
hdrOverhead, err := p.Packer.Finalize()
err := p.Packer.Finalize()
if err != nil {
return 0, err
return err
}
err = p.bufWr.Flush()
if err != nil {
return err
}
id := restic.IDFromHash(p.hw.Sum(nil))
// calculate sha256 hash in a second pass
var rd io.Reader
rd, err = restic.NewFileReader(p.tmpfile, nil)
if err != nil {
return err
}
beHasher := r.be.Hasher()
var beHr *hashing.Reader
if beHasher != nil {
beHr = hashing.NewReader(rd, beHasher)
rd = beHr
}
hr := hashing.NewReader(rd, sha256.New())
_, err = io.Copy(ioutil.Discard, hr)
if err != nil {
return err
}
id := restic.IDFromHash(hr.Sum(nil))
h := restic.Handle{Type: restic.PackFile, Name: id.String(),
ContainedBlobType: t}
var beHash []byte
if p.beHw != nil {
beHash = p.beHw.Sum(nil)
if beHr != nil {
beHash = beHr.Sum(nil)
}
rd, err := restic.NewFileReader(p.tmpfile, beHash)
rrd, err := restic.NewFileReader(p.tmpfile, beHash)
if err != nil {
return 0, err
return err
}
err = r.be.Save(ctx, h, rd)
err = r.be.Save(ctx, h, rrd)
if err != nil {
debug.Log("Save(%v) error: %v", h, err)
return 0, err
return err
}
debug.Log("saved as %v", h)
err = p.tmpfile.Close()
if err != nil {
return 0, errors.Wrap(err, "close tempfile")
return errors.Wrap(err, "close tempfile")
}
// on windows the tempfile is automatically deleted on close
if runtime.GOOS != "windows" {
err = fs.RemoveIfExists(p.tmpfile.Name())
if err != nil {
return 0, errors.Wrap(err, "Remove")
return errors.Wrap(err, "Remove")
}
}
@ -152,15 +195,7 @@ func (r *Repository) savePacker(ctx context.Context, t restic.BlobType, p *Packe
// Save index if full
if r.noAutoIndexUpdate {
return hdrOverhead, nil
return nil
}
return hdrOverhead, r.idx.SaveFullIndex(ctx, r)
}
// countPacker returns the number of open (unfinished) packers.
func (r *packerManager) countPacker() int {
r.pm.Lock()
defer r.pm.Unlock()
return len(r.packers)
return r.idx.SaveFullIndex(ctx, r)
}

View file

@ -4,15 +4,12 @@ import (
"context"
"io"
"math/rand"
"os"
"sync"
"testing"
"github.com/restic/restic/internal/backend/mem"
"github.com/restic/restic/internal/crypto"
"github.com/restic/restic/internal/fs"
"github.com/restic/restic/internal/mock"
"github.com/restic/restic/internal/restic"
"github.com/restic/restic/internal/test"
)
func randomID(rd io.Reader) restic.ID {
@ -33,91 +30,27 @@ func min(a, b int) int {
return b
}
func saveFile(t testing.TB, be Saver, length int, f *os.File, id restic.ID, hash []byte) {
h := restic.Handle{Type: restic.PackFile, Name: id.String()}
t.Logf("save file %v", h)
rd, err := restic.NewFileReader(f, hash)
if err != nil {
t.Fatal(err)
}
err = be.Save(context.TODO(), h, rd)
if err != nil {
t.Fatal(err)
}
if err := f.Close(); err != nil {
t.Fatal(err)
}
if err := fs.RemoveIfExists(f.Name()); err != nil {
t.Fatal(err)
}
}
func fillPacks(t testing.TB, rnd *rand.Rand, be Saver, pm *packerManager, buf []byte) (bytes int) {
func fillPacks(t testing.TB, rnd *rand.Rand, pm *packerManager, buf []byte) (bytes int) {
for i := 0; i < 100; i++ {
l := rnd.Intn(maxBlobSize)
packer, err := pm.findPacker()
if err != nil {
t.Fatal(err)
}
id := randomID(rnd)
buf = buf[:l]
// Only change a few bytes so we know we're not benchmarking the RNG.
rnd.Read(buf[:min(l, 4)])
n, err := packer.Add(restic.DataBlob, id, buf, 0)
n, err := pm.SaveBlob(context.TODO(), restic.DataBlob, id, buf, 0)
if err != nil {
t.Fatal(err)
}
if n != l+37 {
if n != l+37 && n != l+37+36 {
t.Errorf("Add() returned invalid number of bytes: want %v, got %v", l, n)
}
bytes += l
if packer.Size() < minPackSize {
pm.insertPacker(packer)
continue
}
_, err = packer.Finalize()
if err != nil {
t.Fatal(err)
}
packID := restic.IDFromHash(packer.hw.Sum(nil))
var beHash []byte
if packer.beHw != nil {
beHash = packer.beHw.Sum(nil)
}
saveFile(t, be, int(packer.Size()), packer.tmpfile, packID, beHash)
}
return bytes
}
func flushRemainingPacks(t testing.TB, be Saver, pm *packerManager) (bytes int) {
if pm.countPacker() > 0 {
for _, packer := range pm.packers {
n, err := packer.Finalize()
if err != nil {
t.Fatal(err)
}
bytes += n
packID := restic.IDFromHash(packer.hw.Sum(nil))
var beHash []byte
if packer.beHw != nil {
beHash = packer.beHw.Sum(nil)
}
saveFile(t, be, int(packer.Size()), packer.tmpfile, packID, beHash)
err := pm.Flush(context.TODO())
if err != nil {
t.Fatal(err)
}
}
return bytes
}
@ -136,13 +69,21 @@ func TestPackerManager(t *testing.T) {
func testPackerManager(t testing.TB) int64 {
rnd := rand.New(rand.NewSource(randomSeed))
be := mem.New()
pm := newPackerManager(be, crypto.NewRandomKey())
savedBytes := int(0)
pm := newPackerManager(crypto.NewRandomKey(), restic.DataBlob, func(ctx context.Context, tp restic.BlobType, p *Packer) error {
err := p.Finalize()
if err != nil {
return err
}
savedBytes += int(p.Size())
return nil
})
blobBuf := make([]byte, maxBlobSize)
bytes := fillPacks(t, rnd, be, pm, blobBuf)
bytes += flushRemainingPacks(t, be, pm)
bytes := fillPacks(t, rnd, pm, blobBuf)
// bytes does not include the last packs header
test.Equals(t, savedBytes, bytes+36)
t.Logf("saved %d bytes", bytes)
return int64(bytes)
@ -155,10 +96,6 @@ func BenchmarkPackerManager(t *testing.B) {
})
rnd := rand.New(rand.NewSource(randomSeed))
be := &mock.Backend{
SaveFn: func(context.Context, restic.Handle, restic.RewindReader) error { return nil },
}
blobBuf := make([]byte, maxBlobSize)
t.ReportAllocs()
@ -167,8 +104,9 @@ func BenchmarkPackerManager(t *testing.B) {
for i := 0; i < t.N; i++ {
rnd.Seed(randomSeed)
pm := newPackerManager(be, crypto.NewRandomKey())
fillPacks(t, rnd, be, pm, blobBuf)
flushRemainingPacks(t, be, pm)
pm := newPackerManager(crypto.NewRandomKey(), restic.DataBlob, func(ctx context.Context, t restic.BlobType, p *Packer) error {
return nil
})
fillPacks(t, rnd, pm, blobBuf)
}
}

View file

@ -0,0 +1,63 @@
package repository
import (
"context"
"github.com/restic/restic/internal/restic"
"golang.org/x/sync/errgroup"
)
// SavePacker implements saving a pack in the repository.
type SavePacker interface {
savePacker(ctx context.Context, t restic.BlobType, p *Packer) error
}
type uploadTask struct {
packer *Packer
tpe restic.BlobType
}
type packerUploader struct {
uploadQueue chan uploadTask
}
func newPackerUploader(ctx context.Context, wg *errgroup.Group, repo SavePacker, connections uint) *packerUploader {
pu := &packerUploader{
uploadQueue: make(chan uploadTask),
}
for i := 0; i < int(connections); i++ {
wg.Go(func() error {
for {
select {
case t, ok := <-pu.uploadQueue:
if !ok {
return nil
}
err := repo.savePacker(ctx, t.tpe, t.packer)
if err != nil {
return err
}
case <-ctx.Done():
return ctx.Err()
}
}
})
}
return pu
}
func (pu *packerUploader) QueuePacker(ctx context.Context, t restic.BlobType, p *Packer) (err error) {
select {
case <-ctx.Done():
return ctx.Err()
case pu.uploadQueue <- uploadTask{tpe: t, packer: p}:
}
return nil
}
func (pu *packerUploader) TriggerShutdown() {
close(pu.uploadQueue)
}

View file

@ -28,9 +28,25 @@ func Repack(ctx context.Context, repo restic.Repository, dstRepo restic.Reposito
return nil, errors.Fatal("repack step requires a backend connection limit of at least two")
}
var keepMutex sync.Mutex
wg, wgCtx := errgroup.WithContext(ctx)
dstRepo.StartPackUploader(wgCtx, wg)
wg.Go(func() error {
var err error
obsoletePacks, err = repack(wgCtx, repo, dstRepo, packs, keepBlobs, p)
return err
})
if err := wg.Wait(); err != nil {
return nil, err
}
return obsoletePacks, nil
}
func repack(ctx context.Context, repo restic.Repository, dstRepo restic.Repository, packs restic.IDSet, keepBlobs restic.BlobSet, p *progress.Counter) (obsoletePacks restic.IDSet, err error) {
wg, wgCtx := errgroup.WithContext(ctx)
var keepMutex sync.Mutex
downloadQueue := make(chan restic.PackBlobs)
wg.Go(func() error {
defer close(downloadQueue)

View file

@ -8,6 +8,7 @@ import (
"github.com/restic/restic/internal/repository"
"github.com/restic/restic/internal/restic"
"golang.org/x/sync/errgroup"
)
func randomSize(min, max int) int {
@ -15,6 +16,9 @@ func randomSize(min, max int) int {
}
func createRandomBlobs(t testing.TB, repo restic.Repository, blobs int, pData float32) {
var wg errgroup.Group
repo.StartPackUploader(context.TODO(), &wg)
for i := 0; i < blobs; i++ {
var (
tpe restic.BlobType
@ -46,6 +50,7 @@ func createRandomBlobs(t testing.TB, repo restic.Repository, blobs int, pData fl
if err = repo.Flush(context.Background()); err != nil {
t.Fatalf("repo.Flush() returned error %v", err)
}
repo.StartPackUploader(context.TODO(), &wg)
}
}
@ -62,6 +67,8 @@ func createRandomWrongBlob(t testing.TB, repo restic.Repository) {
// invert first data byte
buf[0] ^= 0xff
var wg errgroup.Group
repo.StartPackUploader(context.TODO(), &wg)
_, _, _, err := repo.SaveBlob(context.TODO(), restic.DataBlob, buf, id, false)
if err != nil {
t.Fatalf("SaveFrom() error %v", err)

View file

@ -41,6 +41,8 @@ type Repository struct {
noAutoIndexUpdate bool
packerWg *errgroup.Group
uploader *packerUploader
treePM *packerManager
dataPM *packerManager
@ -103,8 +105,6 @@ func New(be restic.Backend, opts Options) *Repository {
be: be,
opts: opts,
idx: NewMasterIndex(),
dataPM: newPackerManager(be, nil),
treePM: newPackerManager(be, nil),
}
return repo
@ -416,31 +416,7 @@ func (r *Repository) saveAndEncrypt(ctx context.Context, t restic.BlobType, data
panic(fmt.Sprintf("invalid type: %v", t))
}
packer, err := pm.findPacker()
if err != nil {
return 0, err
}
// save ciphertext
size, err = packer.Add(t, id, ciphertext, uncompressedLength)
if err != nil {
return 0, err
}
// if the pack is not full enough, put back to the list
if packer.Size() < minPackSize {
debug.Log("pack is not full enough (%d bytes)", packer.Size())
pm.insertPacker(packer)
return size, nil
}
// else write the pack to the backend
hdrSize, err := r.savePacker(ctx, t, packer)
if err != nil {
return 0, err
}
return size + hdrSize, nil
return pm.SaveBlob(ctx, t, id, ciphertext, uncompressedLength)
}
// SaveJSONUnpacked serialises item as JSON and encrypts and saves it in the
@ -536,31 +512,45 @@ func (r *Repository) Flush(ctx context.Context) error {
return r.idx.SaveIndex(ctx, r)
}
// flushPacks saves all remaining packs.
func (r *Repository) flushPacks(ctx context.Context) error {
pms := []struct {
t restic.BlobType
pm *packerManager
}{
{restic.DataBlob, r.dataPM},
{restic.TreeBlob, r.treePM},
func (r *Repository) StartPackUploader(ctx context.Context, wg *errgroup.Group) {
if r.packerWg != nil {
panic("uploader already started")
}
for _, p := range pms {
p.pm.pm.Lock()
innerWg, ctx := errgroup.WithContext(ctx)
r.packerWg = innerWg
r.uploader = newPackerUploader(ctx, innerWg, r, r.be.Connections())
r.treePM = newPackerManager(r.key, restic.TreeBlob, r.uploader.QueuePacker)
r.dataPM = newPackerManager(r.key, restic.DataBlob, r.uploader.QueuePacker)
debug.Log("manually flushing %d packs", len(p.pm.packers))
for _, packer := range p.pm.packers {
_, err := r.savePacker(ctx, p.t, packer)
wg.Go(func() error {
return innerWg.Wait()
})
}
// FlushPacks saves all remaining packs.
func (r *Repository) flushPacks(ctx context.Context) error {
if r.packerWg == nil {
return nil
}
err := r.treePM.Flush(ctx)
if err != nil {
p.pm.pm.Unlock()
return err
}
err = r.dataPM.Flush(ctx)
if err != nil {
return err
}
p.pm.packers = p.pm.packers[:0]
p.pm.pm.Unlock()
}
return nil
r.uploader.TriggerShutdown()
err = r.packerWg.Wait()
r.treePM = nil
r.dataPM = nil
r.uploader = nil
r.packerWg = nil
return err
}
// Backend returns the backend for the repository.
@ -715,8 +705,6 @@ func (r *Repository) SearchKey(ctx context.Context, password string, maxKeys int
}
r.key = key.master
r.dataPM.key = key.master
r.treePM.key = key.master
r.keyName = key.Name()
cfg, err := restic.LoadConfig(ctx, r)
if err == crypto.ErrUnauthenticated {
@ -768,8 +756,6 @@ func (r *Repository) init(ctx context.Context, password string, cfg restic.Confi
}
r.key = key.master
r.dataPM.key = key.master
r.treePM.key = key.master
r.keyName = key.Name()
r.setConfig(cfg)
_, err = r.SaveJSONUnpacked(ctx, restic.ConfigFile, cfg)

View file

@ -22,6 +22,7 @@ import (
"github.com/restic/restic/internal/restic"
"github.com/restic/restic/internal/test"
rtest "github.com/restic/restic/internal/test"
"golang.org/x/sync/errgroup"
)
var testSizes = []int{5, 23, 2<<18 + 23, 1 << 20}
@ -43,6 +44,9 @@ func testSave(t *testing.T, version uint) {
id := restic.Hash(data)
var wg errgroup.Group
repo.StartPackUploader(context.TODO(), &wg)
// save
sid, _, _, err := repo.SaveBlob(context.TODO(), restic.DataBlob, data, restic.ID{}, false)
rtest.OK(t, err)
@ -82,6 +86,9 @@ func testSaveFrom(t *testing.T, version uint) {
id := restic.Hash(data)
var wg errgroup.Group
repo.StartPackUploader(context.TODO(), &wg)
// save
id2, _, _, err := repo.SaveBlob(context.TODO(), restic.DataBlob, data, id, false)
rtest.OK(t, err)
@ -187,6 +194,9 @@ func testLoadBlob(t *testing.T, version uint) {
_, err := io.ReadFull(rnd, buf)
rtest.OK(t, err)
var wg errgroup.Group
repo.StartPackUploader(context.TODO(), &wg)
id, _, _, err := repo.SaveBlob(context.TODO(), restic.DataBlob, buf, restic.ID{}, false)
rtest.OK(t, err)
rtest.OK(t, repo.Flush(context.Background()))
@ -220,6 +230,9 @@ func benchmarkLoadBlob(b *testing.B, version uint) {
_, err := io.ReadFull(rnd, buf)
rtest.OK(b, err)
var wg errgroup.Group
repo.StartPackUploader(context.TODO(), &wg)
id, _, _, err := repo.SaveBlob(context.TODO(), restic.DataBlob, buf, restic.ID{}, false)
rtest.OK(b, err)
rtest.OK(b, repo.Flush(context.Background()))
@ -389,6 +402,9 @@ func benchmarkLoadIndex(b *testing.B, version uint) {
// saveRandomDataBlobs generates random data blobs and saves them to the repository.
func saveRandomDataBlobs(t testing.TB, repo restic.Repository, num int, sizeMax int) {
var wg errgroup.Group
repo.StartPackUploader(context.TODO(), &wg)
for i := 0; i < num; i++ {
size := rand.Int() % sizeMax

View file

@ -5,6 +5,7 @@ import (
"github.com/restic/restic/internal/crypto"
"github.com/restic/restic/internal/ui/progress"
"golang.org/x/sync/errgroup"
)
// Repository stores data in a backend. It provides high-level functions and
@ -34,6 +35,10 @@ type Repository interface {
// the the pack header.
ListPack(context.Context, ID, int64) ([]Blob, uint32, error)
// StartPackUploader start goroutines to upload new pack files. The errgroup
// is used to immediately notify about an upload error. Flush() will also return
// that error.
StartPackUploader(ctx context.Context, wg *errgroup.Group)
Flush(context.Context) error
SaveUnpacked(context.Context, FileType, []byte) (ID, error)

View file

@ -10,6 +10,7 @@ import (
"time"
"github.com/restic/chunker"
"golang.org/x/sync/errgroup"
)
// fakeFile returns a reader which yields deterministic pseudo-random data.
@ -169,9 +170,17 @@ func TestCreateSnapshot(t testing.TB, repo Repository, at time.Time, depth int,
rand: rand.New(rand.NewSource(seed)),
}
var wg errgroup.Group
repo.StartPackUploader(context.TODO(), &wg)
treeID := fs.saveTree(context.TODO(), seed, depth)
snapshot.Tree = &treeID
err = repo.Flush(context.Background())
if err != nil {
t.Fatal(err)
}
id, err := repo.SaveJSONUnpacked(context.TODO(), SnapshotFile, snapshot)
if err != nil {
t.Fatal(err)
@ -181,11 +190,6 @@ func TestCreateSnapshot(t testing.TB, repo Repository, at time.Time, depth int,
t.Logf("saved snapshot %v", id.Str())
err = repo.Flush(context.Background())
if err != nil {
t.Fatal(err)
}
return snapshot
}

View file

@ -12,6 +12,7 @@ import (
"github.com/restic/restic/internal/repository"
"github.com/restic/restic/internal/restic"
rtest "github.com/restic/restic/internal/test"
"golang.org/x/sync/errgroup"
)
var testFiles = []struct {
@ -98,6 +99,8 @@ func TestLoadTree(t *testing.T) {
repo, cleanup := repository.TestRepository(t)
defer cleanup()
var wg errgroup.Group
repo.StartPackUploader(context.TODO(), &wg)
// save tree
tree := restic.NewTree(0)
id, err := repo.SaveTree(context.TODO(), tree)

View file

@ -15,6 +15,7 @@ import (
"github.com/restic/restic/internal/repository"
"github.com/restic/restic/internal/restic"
rtest "github.com/restic/restic/internal/test"
"golang.org/x/sync/errgroup"
)
type Node interface{}
@ -122,8 +123,9 @@ func saveSnapshot(t testing.TB, repo restic.Repository, snapshot Snapshot) (*res
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
wg, wgCtx := errgroup.WithContext(ctx)
repo.StartPackUploader(wgCtx, wg)
treeID := saveDir(t, repo, snapshot.Nodes, 1000)
err := repo.Flush(ctx)
if err != nil {
t.Fatal(err)