From bba4c69a2a59a0190bb8f4e6b9914b7a656bba59 Mon Sep 17 00:00:00 2001 From: Michael Eischer Date: Sat, 7 Aug 2021 22:26:09 +0200 Subject: [PATCH 1/7] tag: Remove unnecessary flush call --- cmd/restic/cmd_tag.go | 4 ---- 1 file changed, 4 deletions(-) diff --git a/cmd/restic/cmd_tag.go b/cmd/restic/cmd_tag.go index e6688b2fc..b05cd6e55 100644 --- a/cmd/restic/cmd_tag.go +++ b/cmd/restic/cmd_tag.go @@ -89,10 +89,6 @@ func changeTags(ctx context.Context, repo *repository.Repository, sn *restic.Sna debug.Log("new snapshot saved as %v", id) - if err = repo.Flush(ctx); err != nil { - return false, err - } - // Remove the old snapshot. h := restic.Handle{Type: restic.SnapshotFile, Name: sn.ID().String()} if err = repo.Backend().Remove(ctx, h); err != nil { From 120ccc87546722112a0758a68dab2cfe934b3cfb Mon Sep 17 00:00:00 2001 From: Michael Eischer Date: Sat, 7 Aug 2021 22:52:05 +0200 Subject: [PATCH 2/7] repository: Rework blob saving to use an async pack uploader Previously, SaveAndEncrypt would assemble blobs into packs and either return immediately if the pack is not yet full or upload the pack file otherwise. The upload will block the current goroutine until it finishes. Now, the upload is done using separate goroutines. This requires changes to the error handling. As uploads are no longer tied to a SaveAndEncrypt call, failed uploads are signaled using an errgroup. To count the uploaded amount of data, the pack header overhead is no longer returned by `packer.Finalize` but rather by `packer.HeaderOverhead`. This helper method is necessary to continue returning the pack header overhead directly to the responsible call to `repository.SaveBlob`. Without the method this would not be possible, as packs are finalized asynchronously. --- cmd/restic/cmd_recover.go | 25 ++++-- internal/archiver/archiver.go | 59 ++++++------ internal/archiver/archiver_test.go | 7 ++ internal/checker/checker_test.go | 5 ++ internal/pack/pack.go | 18 ++-- internal/pack/pack_test.go | 2 +- internal/repository/fuzz_test.go | 4 + internal/repository/packer_manager.go | 84 ++++++++++++----- internal/repository/packer_manager_test.go | 17 ++-- internal/repository/packer_uploader.go | 63 +++++++++++++ internal/repository/repack.go | 18 +++- internal/repository/repack_test.go | 7 ++ internal/repository/repository.go | 100 +++++++++------------ internal/repository/repository_test.go | 16 ++++ internal/restic/repository.go | 5 ++ internal/restic/testing.go | 14 +-- internal/restic/tree_test.go | 3 + internal/restorer/restorer_test.go | 4 +- 18 files changed, 321 insertions(+), 130 deletions(-) create mode 100644 internal/repository/packer_uploader.go diff --git a/cmd/restic/cmd_recover.go b/cmd/restic/cmd_recover.go index 93810ce0f..42cabaa68 100644 --- a/cmd/restic/cmd_recover.go +++ b/cmd/restic/cmd_recover.go @@ -9,6 +9,7 @@ import ( "github.com/restic/restic/internal/errors" "github.com/restic/restic/internal/restic" "github.com/spf13/cobra" + "golang.org/x/sync/errgroup" ) var cmdRecover = &cobra.Command{ @@ -131,14 +132,26 @@ func runRecover(gopts GlobalOptions) error { } } - treeID, err := repo.SaveTree(gopts.ctx, tree) - if err != nil { - return errors.Fatalf("unable to save new tree to the repo: %v", err) - } + wg, ctx := errgroup.WithContext(gopts.ctx) + repo.StartPackUploader(ctx, wg) - err = repo.Flush(gopts.ctx) + var treeID restic.ID + wg.Go(func() error { + var err error + treeID, err = repo.SaveTree(ctx, tree) + if err != nil { + return errors.Fatalf("unable to save new tree to the repo: %v", err) + } + + err = 
repo.Flush(ctx) + if err != nil { + return errors.Fatalf("unable to save blobs to the repo: %v", err) + } + return nil + }) + err = wg.Wait() if err != nil { - return errors.Fatalf("unable to save blobs to the repo: %v", err) + return err } return createSnapshot(gopts.ctx, "/recover", hostname, []string{"recovered"}, repo, &treeID) diff --git a/internal/archiver/archiver.go b/internal/archiver/archiver.go index 43153030a..90b9e6904 100644 --- a/internal/archiver/archiver.go +++ b/internal/archiver/archiver.go @@ -801,40 +801,47 @@ func (arch *Archiver) Snapshot(ctx context.Context, targets []string, opts Snaps return nil, restic.ID{}, err } - wg, wgCtx := errgroup.WithContext(ctx) - start := time.Now() - var rootTreeID restic.ID - var stats ItemStats - wg.Go(func() error { - arch.runWorkers(wgCtx, wg) - debug.Log("starting snapshot") - tree, err := arch.SaveTree(wgCtx, "/", atree, arch.loadParentTree(wgCtx, opts.ParentSnapshot)) + wgUp, wgUpCtx := errgroup.WithContext(ctx) + arch.Repo.StartPackUploader(wgUpCtx, wgUp) + + wgUp.Go(func() error { + wg, wgCtx := errgroup.WithContext(wgUpCtx) + start := time.Now() + + var stats ItemStats + wg.Go(func() error { + arch.runWorkers(wgCtx, wg) + + debug.Log("starting snapshot") + tree, err := arch.SaveTree(wgCtx, "/", atree, arch.loadParentTree(wgCtx, opts.ParentSnapshot)) + if err != nil { + return err + } + + if len(tree.Nodes) == 0 { + return errors.New("snapshot is empty") + } + + rootTreeID, stats, err = arch.saveTree(wgCtx, tree) + arch.stopWorkers() + return err + }) + + err = wg.Wait() + debug.Log("err is %v", err) + if err != nil { + debug.Log("error while saving tree: %v", err) return err } - if len(tree.Nodes) == 0 { - return errors.New("snapshot is empty") - } + arch.CompleteItem("/", nil, nil, stats, time.Since(start)) - rootTreeID, stats, err = arch.saveTree(wgCtx, tree) - arch.stopWorkers() - return err + return arch.Repo.Flush(ctx) }) - - err = wg.Wait() - debug.Log("err is %v", err) - - if err != nil { - debug.Log("error while saving tree: %v", err) - return nil, restic.ID{}, err - } - - arch.CompleteItem("/", nil, nil, stats, time.Since(start)) - - err = arch.Repo.Flush(ctx) + err = wgUp.Wait() if err != nil { return nil, restic.ID{}, err } diff --git a/internal/archiver/archiver_test.go b/internal/archiver/archiver_test.go index dfc24b27b..1ae126014 100644 --- a/internal/archiver/archiver_test.go +++ b/internal/archiver/archiver_test.go @@ -42,6 +42,7 @@ func prepareTempdirRepoSrc(t testing.TB, src TestDir) (tempdir string, repo rest func saveFile(t testing.TB, repo restic.Repository, filename string, filesystem fs.FS) (*restic.Node, ItemStats) { wg, ctx := errgroup.WithContext(context.TODO()) + repo.StartPackUploader(ctx, wg) arch := New(repo, filesystem, Options{}) arch.runWorkers(ctx, wg) @@ -213,6 +214,7 @@ func TestArchiverSave(t *testing.T) { defer cleanup() wg, ctx := errgroup.WithContext(ctx) + repo.StartPackUploader(ctx, wg) arch := New(repo, fs.Track{FS: fs.Local{}}, Options{}) arch.Error = func(item string, fi os.FileInfo, err error) error { @@ -281,6 +283,7 @@ func TestArchiverSaveReaderFS(t *testing.T) { defer cleanup() wg, ctx := errgroup.WithContext(ctx) + repo.StartPackUploader(ctx, wg) ts := time.Now() filename := "xx" @@ -830,6 +833,7 @@ func TestArchiverSaveDir(t *testing.T) { defer cleanup() wg, ctx := errgroup.WithContext(context.Background()) + repo.StartPackUploader(ctx, wg) arch := New(repo, fs.Track{FS: fs.Local{}}, Options{}) arch.runWorkers(ctx, wg) @@ -914,6 +918,7 @@ func 
TestArchiverSaveDirIncremental(t *testing.T) { // archiver did save the same tree several times for i := 0; i < 5; i++ { wg, ctx := errgroup.WithContext(context.TODO()) + repo.StartPackUploader(ctx, wg) arch := New(repo, fs.Track{FS: fs.Local{}}, Options{}) arch.runWorkers(ctx, wg) @@ -1097,6 +1102,7 @@ func TestArchiverSaveTree(t *testing.T) { } wg, ctx := errgroup.WithContext(context.TODO()) + repo.StartPackUploader(ctx, wg) arch.runWorkers(ctx, wg) @@ -2239,6 +2245,7 @@ func TestRacyFileSwap(t *testing.T) { defer cancel() wg, ctx := errgroup.WithContext(ctx) + repo.StartPackUploader(ctx, wg) arch := New(repo, fs.Track{FS: statfs}, Options{}) arch.Error = func(item string, fi os.FileInfo, err error) error { diff --git a/internal/checker/checker_test.go b/internal/checker/checker_test.go index f658613c3..a4245d574 100644 --- a/internal/checker/checker_test.go +++ b/internal/checker/checker_test.go @@ -20,6 +20,7 @@ import ( "github.com/restic/restic/internal/repository" "github.com/restic/restic/internal/restic" "github.com/restic/restic/internal/test" + "golang.org/x/sync/errgroup" ) var checkerTestData = filepath.Join("testdata", "checker-test-repo.tar.gz") @@ -476,6 +477,8 @@ func TestCheckerBlobTypeConfusion(t *testing.T) { Nodes: []*restic.Node{damagedNode}, } + wg, wgCtx := errgroup.WithContext(ctx) + repo.StartPackUploader(wgCtx, wg) id, err := repo.SaveTree(ctx, damagedTree) test.OK(t, repo.Flush(ctx)) test.OK(t, err) @@ -483,6 +486,8 @@ func TestCheckerBlobTypeConfusion(t *testing.T) { buf, err := repo.LoadBlob(ctx, restic.TreeBlob, id, nil) test.OK(t, err) + wg, wgCtx = errgroup.WithContext(ctx) + repo.StartPackUploader(wgCtx, wg) _, _, _, err = repo.SaveBlob(ctx, restic.DataBlob, buf, id, false) test.OK(t, err) diff --git a/internal/pack/pack.go b/internal/pack/pack.go index cf0093e70..1d991ccb5 100644 --- a/internal/pack/pack.go +++ b/internal/pack/pack.go @@ -70,14 +70,13 @@ type compressedHeaderEntry struct { } // Finalize writes the header for all added blobs and finalizes the pack. -// Returned are the number of bytes written, not yet reported by Add. -func (p *Packer) Finalize() (int, error) { +func (p *Packer) Finalize() error { p.m.Lock() defer p.m.Unlock() header, err := p.makeHeader() if err != nil { - return 0, err + return err } encryptedHeader := make([]byte, 0, restic.CiphertextLength(len(header))) @@ -88,22 +87,27 @@ func (p *Packer) Finalize() (int, error) { // append the header n, err := p.wr.Write(encryptedHeader) if err != nil { - return 0, errors.Wrap(err, "Write") + return errors.Wrap(err, "Write") } hdrBytes := len(encryptedHeader) if n != hdrBytes { - return 0, errors.New("wrong number of bytes written") + return errors.New("wrong number of bytes written") } // write length err = binary.Write(p.wr, binary.LittleEndian, uint32(hdrBytes)) if err != nil { - return 0, errors.Wrap(err, "binary.Write") + return errors.Wrap(err, "binary.Write") } p.bytes += uint(hdrBytes + binary.Size(uint32(0))) - return restic.CiphertextLength(0) + binary.Size(uint32(0)), nil + return nil +} + +// HeaderOverhead returns an estimate of the number of bytes written by a call to Finalize. +func (p *Packer) HeaderOverhead() int { + return restic.CiphertextLength(0) + binary.Size(uint32(0)) } // makeHeader constructs the header for p. 
diff --git a/internal/pack/pack_test.go b/internal/pack/pack_test.go index 6170e807c..7c8250613 100644 --- a/internal/pack/pack_test.go +++ b/internal/pack/pack_test.go @@ -42,7 +42,7 @@ func newPack(t testing.TB, k *crypto.Key, lengths []int) ([]Buf, []byte, uint) { rtest.OK(t, err) } - _, err := p.Finalize() + err := p.Finalize() rtest.OK(t, err) return bufs, buf.Bytes(), p.Size() diff --git a/internal/repository/fuzz_test.go b/internal/repository/fuzz_test.go index 3847f37f7..7a98477b6 100644 --- a/internal/repository/fuzz_test.go +++ b/internal/repository/fuzz_test.go @@ -9,6 +9,7 @@ import ( "github.com/restic/restic/internal/backend/mem" "github.com/restic/restic/internal/restic" + "golang.org/x/sync/errgroup" ) // Test saving a blob and loading it again, with varying buffer sizes. @@ -23,6 +24,9 @@ func FuzzSaveLoadBlob(f *testing.F) { id := restic.Hash(blob) repo, _ := TestRepositoryWithBackend(t, mem.New(), 2) + var wg errgroup.Group + repo.StartPackUploader(context.TODO(), &wg) + _, _, _, err := repo.SaveBlob(context.TODO(), restic.DataBlob, blob, id, false) if err != nil { t.Fatal(err) diff --git a/internal/repository/packer_manager.go b/internal/repository/packer_manager.go index c7acec7f1..e3b43b24e 100644 --- a/internal/repository/packer_manager.go +++ b/internal/repository/packer_manager.go @@ -20,12 +20,6 @@ import ( "github.com/minio/sha256-simd" ) -// Saver implements saving data in a backend. -type Saver interface { - Save(context.Context, restic.Handle, restic.RewindReader) error - Hasher() hash.Hash -} - // Packer holds a pack.Packer together with a hash writer. type Packer struct { *pack.Packer @@ -36,8 +30,11 @@ type Packer struct { // packerManager keeps a list of open packs and creates new on demand. type packerManager struct { - be Saver - key *crypto.Key + tpe restic.BlobType + key *crypto.Key + hasherFn func() hash.Hash + queueFn func(ctx context.Context, t restic.BlobType, p *Packer) error + pm sync.Mutex packers []*Packer } @@ -46,13 +43,58 @@ const minPackSize = 4 * 1024 * 1024 // newPackerManager returns an new packer manager which writes temporary files // to a temporary directory -func newPackerManager(be Saver, key *crypto.Key) *packerManager { +func newPackerManager(key *crypto.Key, hasherFn func() hash.Hash, tpe restic.BlobType, queueFn func(ctx context.Context, t restic.BlobType, p *Packer) error) *packerManager { return &packerManager{ - be: be, - key: key, + tpe: tpe, + key: key, + hasherFn: hasherFn, + queueFn: queueFn, } } +func (r *packerManager) Flush(ctx context.Context) error { + r.pm.Lock() + defer r.pm.Unlock() + + debug.Log("manually flushing %d packs", len(r.packers)) + for _, packer := range r.packers { + err := r.queueFn(ctx, r.tpe, packer) + if err != nil { + return err + } + } + r.packers = r.packers[:0] + return nil +} + +func (r *packerManager) SaveBlob(ctx context.Context, t restic.BlobType, id restic.ID, ciphertext []byte, uncompressedLength int) (int, error) { + packer, err := r.findPacker() + if err != nil { + return 0, err + } + + // save ciphertext + size, err := packer.Add(t, id, ciphertext, uncompressedLength) + if err != nil { + return 0, err + } + + // if the pack is not full enough, put back to the list + if packer.Size() < minPackSize { + debug.Log("pack is not full enough (%d bytes)", packer.Size()) + r.insertPacker(packer) + return size, nil + } + + // else write the pack to the backend + err = r.queueFn(ctx, t, packer) + if err != nil { + return 0, err + } + + return size + packer.HeaderOverhead(), nil +} + // 
findPacker returns a packer for a new blob of size bytes. Either a new one is // created or one is returned that already has some blobs. func (r *packerManager) findPacker() (packer *Packer, err error) { @@ -77,7 +119,7 @@ func (r *packerManager) findPacker() (packer *Packer, err error) { } w := io.Writer(tmpfile) - beHasher := r.be.Hasher() + beHasher := r.hasherFn() var beHw *hashing.Writer if beHasher != nil { beHw = hashing.NewWriter(w, beHasher) @@ -106,11 +148,11 @@ func (r *packerManager) insertPacker(p *Packer) { } // savePacker stores p in the backend. -func (r *Repository) savePacker(ctx context.Context, t restic.BlobType, p *Packer) (int, error) { +func (r *Repository) savePacker(ctx context.Context, t restic.BlobType, p *Packer) error { debug.Log("save packer for %v with %d blobs (%d bytes)\n", t, p.Packer.Count(), p.Packer.Size()) - hdrOverhead, err := p.Packer.Finalize() + err := p.Packer.Finalize() if err != nil { - return 0, err + return err } id := restic.IDFromHash(p.hw.Sum(nil)) @@ -122,27 +164,27 @@ func (r *Repository) savePacker(ctx context.Context, t restic.BlobType, p *Packe } rd, err := restic.NewFileReader(p.tmpfile, beHash) if err != nil { - return 0, err + return err } err = r.be.Save(ctx, h, rd) if err != nil { debug.Log("Save(%v) error: %v", h, err) - return 0, err + return err } debug.Log("saved as %v", h) err = p.tmpfile.Close() if err != nil { - return 0, errors.Wrap(err, "close tempfile") + return errors.Wrap(err, "close tempfile") } // on windows the tempfile is automatically deleted on close if runtime.GOOS != "windows" { err = fs.RemoveIfExists(p.tmpfile.Name()) if err != nil { - return 0, errors.Wrap(err, "Remove") + return errors.Wrap(err, "Remove") } } @@ -152,9 +194,9 @@ func (r *Repository) savePacker(ctx context.Context, t restic.BlobType, p *Packe // Save index if full if r.noAutoIndexUpdate { - return hdrOverhead, nil + return nil } - return hdrOverhead, r.idx.SaveFullIndex(ctx, r) + return r.idx.SaveFullIndex(ctx, r) } // countPacker returns the number of open (unfinished) packers. diff --git a/internal/repository/packer_manager_test.go b/internal/repository/packer_manager_test.go index 633e1be35..8713aad4e 100644 --- a/internal/repository/packer_manager_test.go +++ b/internal/repository/packer_manager_test.go @@ -2,6 +2,7 @@ package repository import ( "context" + "hash" "io" "math/rand" "os" @@ -15,6 +16,12 @@ import ( "github.com/restic/restic/internal/restic" ) +// Saver implements saving data in a backend. 
+type Saver interface { + Save(context.Context, restic.Handle, restic.RewindReader) error + Hasher() hash.Hash +} + func randomID(rd io.Reader) restic.ID { id := restic.ID{} _, err := io.ReadFull(rd, id[:]) @@ -84,7 +91,7 @@ func fillPacks(t testing.TB, rnd *rand.Rand, be Saver, pm *packerManager, buf [] continue } - _, err = packer.Finalize() + err = packer.Finalize() if err != nil { t.Fatal(err) } @@ -103,11 +110,11 @@ func fillPacks(t testing.TB, rnd *rand.Rand, be Saver, pm *packerManager, buf [] func flushRemainingPacks(t testing.TB, be Saver, pm *packerManager) (bytes int) { if pm.countPacker() > 0 { for _, packer := range pm.packers { - n, err := packer.Finalize() + err := packer.Finalize() if err != nil { t.Fatal(err) } - bytes += n + bytes += packer.HeaderOverhead() packID := restic.IDFromHash(packer.hw.Sum(nil)) var beHash []byte @@ -137,7 +144,7 @@ func testPackerManager(t testing.TB) int64 { rnd := rand.New(rand.NewSource(randomSeed)) be := mem.New() - pm := newPackerManager(be, crypto.NewRandomKey()) + pm := newPackerManager(crypto.NewRandomKey(), be.Hasher, restic.DataBlob, nil) blobBuf := make([]byte, maxBlobSize) @@ -167,7 +174,7 @@ func BenchmarkPackerManager(t *testing.B) { for i := 0; i < t.N; i++ { rnd.Seed(randomSeed) - pm := newPackerManager(be, crypto.NewRandomKey()) + pm := newPackerManager(crypto.NewRandomKey(), be.Hasher, restic.DataBlob, nil) fillPacks(t, rnd, be, pm, blobBuf) flushRemainingPacks(t, be, pm) } diff --git a/internal/repository/packer_uploader.go b/internal/repository/packer_uploader.go new file mode 100644 index 000000000..30c8f77af --- /dev/null +++ b/internal/repository/packer_uploader.go @@ -0,0 +1,63 @@ +package repository + +import ( + "context" + + "github.com/restic/restic/internal/restic" + "golang.org/x/sync/errgroup" +) + +// SavePacker implements saving a pack in the repository. 
+type SavePacker interface { + savePacker(ctx context.Context, t restic.BlobType, p *Packer) error +} + +type uploadTask struct { + packer *Packer + tpe restic.BlobType +} + +type packerUploader struct { + uploadQueue chan uploadTask +} + +func newPackerUploader(ctx context.Context, wg *errgroup.Group, repo SavePacker, connections uint) *packerUploader { + pu := &packerUploader{ + uploadQueue: make(chan uploadTask), + } + + for i := 0; i < int(connections); i++ { + wg.Go(func() error { + for { + select { + case t, ok := <-pu.uploadQueue: + if !ok { + return nil + } + err := repo.savePacker(ctx, t.tpe, t.packer) + if err != nil { + return err + } + case <-ctx.Done(): + return ctx.Err() + } + } + }) + } + + return pu +} + +func (pu *packerUploader) QueuePacker(ctx context.Context, t restic.BlobType, p *Packer) (err error) { + select { + case <-ctx.Done(): + return ctx.Err() + case pu.uploadQueue <- uploadTask{tpe: t, packer: p}: + } + + return nil +} + +func (pu *packerUploader) TriggerShutdown() { + close(pu.uploadQueue) +} diff --git a/internal/repository/repack.go b/internal/repository/repack.go index 80902c11c..7840e714a 100644 --- a/internal/repository/repack.go +++ b/internal/repository/repack.go @@ -28,9 +28,25 @@ func Repack(ctx context.Context, repo restic.Repository, dstRepo restic.Reposito return nil, errors.Fatal("repack step requires a backend connection limit of at least two") } - var keepMutex sync.Mutex wg, wgCtx := errgroup.WithContext(ctx) + dstRepo.StartPackUploader(wgCtx, wg) + wg.Go(func() error { + var err error + obsoletePacks, err = repack(wgCtx, repo, dstRepo, packs, keepBlobs, p) + return err + }) + + if err := wg.Wait(); err != nil { + return nil, err + } + return obsoletePacks, nil +} + +func repack(ctx context.Context, repo restic.Repository, dstRepo restic.Repository, packs restic.IDSet, keepBlobs restic.BlobSet, p *progress.Counter) (obsoletePacks restic.IDSet, err error) { + wg, wgCtx := errgroup.WithContext(ctx) + + var keepMutex sync.Mutex downloadQueue := make(chan restic.PackBlobs) wg.Go(func() error { defer close(downloadQueue) diff --git a/internal/repository/repack_test.go b/internal/repository/repack_test.go index 248477292..9389d643e 100644 --- a/internal/repository/repack_test.go +++ b/internal/repository/repack_test.go @@ -8,6 +8,7 @@ import ( "github.com/restic/restic/internal/repository" "github.com/restic/restic/internal/restic" + "golang.org/x/sync/errgroup" ) func randomSize(min, max int) int { @@ -15,6 +16,9 @@ func randomSize(min, max int) int { } func createRandomBlobs(t testing.TB, repo restic.Repository, blobs int, pData float32) { + var wg errgroup.Group + repo.StartPackUploader(context.TODO(), &wg) + for i := 0; i < blobs; i++ { var ( tpe restic.BlobType @@ -46,6 +50,7 @@ func createRandomBlobs(t testing.TB, repo restic.Repository, blobs int, pData fl if err = repo.Flush(context.Background()); err != nil { t.Fatalf("repo.Flush() returned error %v", err) } + repo.StartPackUploader(context.TODO(), &wg) } } @@ -62,6 +67,8 @@ func createRandomWrongBlob(t testing.TB, repo restic.Repository) { // invert first data byte buf[0] ^= 0xff + var wg errgroup.Group + repo.StartPackUploader(context.TODO(), &wg) _, _, _, err := repo.SaveBlob(context.TODO(), restic.DataBlob, buf, id, false) if err != nil { t.Fatalf("SaveFrom() error %v", err) diff --git a/internal/repository/repository.go b/internal/repository/repository.go index 3ecf853d2..7f42dbdd5 100644 --- a/internal/repository/repository.go +++ b/internal/repository/repository.go @@ -41,8 +41,10 @@ 
type Repository struct { noAutoIndexUpdate bool - treePM *packerManager - dataPM *packerManager + packerWg *errgroup.Group + uploader *packerUploader + treePM *packerManager + dataPM *packerManager allocEnc sync.Once allocDec sync.Once @@ -100,11 +102,9 @@ func (c *CompressionMode) Type() string { // New returns a new repository with backend be. func New(be restic.Backend, opts Options) *Repository { repo := &Repository{ - be: be, - opts: opts, - idx: NewMasterIndex(), - dataPM: newPackerManager(be, nil), - treePM: newPackerManager(be, nil), + be: be, + opts: opts, + idx: NewMasterIndex(), } return repo @@ -416,31 +416,7 @@ func (r *Repository) saveAndEncrypt(ctx context.Context, t restic.BlobType, data panic(fmt.Sprintf("invalid type: %v", t)) } - packer, err := pm.findPacker() - if err != nil { - return 0, err - } - - // save ciphertext - size, err = packer.Add(t, id, ciphertext, uncompressedLength) - if err != nil { - return 0, err - } - - // if the pack is not full enough, put back to the list - if packer.Size() < minPackSize { - debug.Log("pack is not full enough (%d bytes)", packer.Size()) - pm.insertPacker(packer) - return size, nil - } - - // else write the pack to the backend - hdrSize, err := r.savePacker(ctx, t, packer) - if err != nil { - return 0, err - } - - return size + hdrSize, nil + return pm.SaveBlob(ctx, t, id, ciphertext, uncompressedLength) } // SaveJSONUnpacked serialises item as JSON and encrypts and saves it in the @@ -536,31 +512,45 @@ func (r *Repository) Flush(ctx context.Context) error { return r.idx.SaveIndex(ctx, r) } -// flushPacks saves all remaining packs. +func (r *Repository) StartPackUploader(ctx context.Context, wg *errgroup.Group) { + if r.packerWg != nil { + panic("uploader already started") + } + + innerWg, ctx := errgroup.WithContext(ctx) + r.packerWg = innerWg + r.uploader = newPackerUploader(ctx, innerWg, r, r.be.Connections()) + r.treePM = newPackerManager(r.key, r.be.Hasher, restic.TreeBlob, r.uploader.QueuePacker) + r.dataPM = newPackerManager(r.key, r.be.Hasher, restic.DataBlob, r.uploader.QueuePacker) + + wg.Go(func() error { + return innerWg.Wait() + }) +} + +// FlushPacks saves all remaining packs. func (r *Repository) flushPacks(ctx context.Context) error { - pms := []struct { - t restic.BlobType - pm *packerManager - }{ - {restic.DataBlob, r.dataPM}, - {restic.TreeBlob, r.treePM}, + if r.packerWg == nil { + return nil } - for _, p := range pms { - p.pm.pm.Lock() - - debug.Log("manually flushing %d packs", len(p.pm.packers)) - for _, packer := range p.pm.packers { - _, err := r.savePacker(ctx, p.t, packer) - if err != nil { - p.pm.pm.Unlock() - return err - } - } - p.pm.packers = p.pm.packers[:0] - p.pm.pm.Unlock() + err := r.treePM.Flush(ctx) + if err != nil { + return err } - return nil + err = r.dataPM.Flush(ctx) + if err != nil { + return err + } + r.uploader.TriggerShutdown() + err = r.packerWg.Wait() + + r.treePM = nil + r.dataPM = nil + r.uploader = nil + r.packerWg = nil + + return err } // Backend returns the backend for the repository. 
@@ -715,8 +705,6 @@ func (r *Repository) SearchKey(ctx context.Context, password string, maxKeys int } r.key = key.master - r.dataPM.key = key.master - r.treePM.key = key.master r.keyName = key.Name() cfg, err := restic.LoadConfig(ctx, r) if err == crypto.ErrUnauthenticated { @@ -768,8 +756,6 @@ func (r *Repository) init(ctx context.Context, password string, cfg restic.Confi } r.key = key.master - r.dataPM.key = key.master - r.treePM.key = key.master r.keyName = key.Name() r.setConfig(cfg) _, err = r.SaveJSONUnpacked(ctx, restic.ConfigFile, cfg) diff --git a/internal/repository/repository_test.go b/internal/repository/repository_test.go index f0e25f520..38d3117a5 100644 --- a/internal/repository/repository_test.go +++ b/internal/repository/repository_test.go @@ -22,6 +22,7 @@ import ( "github.com/restic/restic/internal/restic" "github.com/restic/restic/internal/test" rtest "github.com/restic/restic/internal/test" + "golang.org/x/sync/errgroup" ) var testSizes = []int{5, 23, 2<<18 + 23, 1 << 20} @@ -43,6 +44,9 @@ func testSave(t *testing.T, version uint) { id := restic.Hash(data) + var wg errgroup.Group + repo.StartPackUploader(context.TODO(), &wg) + // save sid, _, _, err := repo.SaveBlob(context.TODO(), restic.DataBlob, data, restic.ID{}, false) rtest.OK(t, err) @@ -82,6 +86,9 @@ func testSaveFrom(t *testing.T, version uint) { id := restic.Hash(data) + var wg errgroup.Group + repo.StartPackUploader(context.TODO(), &wg) + // save id2, _, _, err := repo.SaveBlob(context.TODO(), restic.DataBlob, data, id, false) rtest.OK(t, err) @@ -187,6 +194,9 @@ func testLoadBlob(t *testing.T, version uint) { _, err := io.ReadFull(rnd, buf) rtest.OK(t, err) + var wg errgroup.Group + repo.StartPackUploader(context.TODO(), &wg) + id, _, _, err := repo.SaveBlob(context.TODO(), restic.DataBlob, buf, restic.ID{}, false) rtest.OK(t, err) rtest.OK(t, repo.Flush(context.Background())) @@ -220,6 +230,9 @@ func benchmarkLoadBlob(b *testing.B, version uint) { _, err := io.ReadFull(rnd, buf) rtest.OK(b, err) + var wg errgroup.Group + repo.StartPackUploader(context.TODO(), &wg) + id, _, _, err := repo.SaveBlob(context.TODO(), restic.DataBlob, buf, restic.ID{}, false) rtest.OK(b, err) rtest.OK(b, repo.Flush(context.Background())) @@ -389,6 +402,9 @@ func benchmarkLoadIndex(b *testing.B, version uint) { // saveRandomDataBlobs generates random data blobs and saves them to the repository. func saveRandomDataBlobs(t testing.TB, repo restic.Repository, num int, sizeMax int) { + var wg errgroup.Group + repo.StartPackUploader(context.TODO(), &wg) + for i := 0; i < num; i++ { size := rand.Int() % sizeMax diff --git a/internal/restic/repository.go b/internal/restic/repository.go index 35fdbabcb..73f53adb1 100644 --- a/internal/restic/repository.go +++ b/internal/restic/repository.go @@ -5,6 +5,7 @@ import ( "github.com/restic/restic/internal/crypto" "github.com/restic/restic/internal/ui/progress" + "golang.org/x/sync/errgroup" ) // Repository stores data in a backend. It provides high-level functions and @@ -34,6 +35,10 @@ type Repository interface { // the the pack header. ListPack(context.Context, ID, int64) ([]Blob, uint32, error) + // StartPackUploader start goroutines to upload new pack files. The errgroup + // is used to immediately notify about an upload error. Flush() will also return + // that error. 
+ StartPackUploader(ctx context.Context, wg *errgroup.Group) Flush(context.Context) error SaveUnpacked(context.Context, FileType, []byte) (ID, error) diff --git a/internal/restic/testing.go b/internal/restic/testing.go index 54621c183..cc5e7d890 100644 --- a/internal/restic/testing.go +++ b/internal/restic/testing.go @@ -10,6 +10,7 @@ import ( "time" "github.com/restic/chunker" + "golang.org/x/sync/errgroup" ) // fakeFile returns a reader which yields deterministic pseudo-random data. @@ -169,9 +170,17 @@ func TestCreateSnapshot(t testing.TB, repo Repository, at time.Time, depth int, rand: rand.New(rand.NewSource(seed)), } + var wg errgroup.Group + repo.StartPackUploader(context.TODO(), &wg) + treeID := fs.saveTree(context.TODO(), seed, depth) snapshot.Tree = &treeID + err = repo.Flush(context.Background()) + if err != nil { + t.Fatal(err) + } + id, err := repo.SaveJSONUnpacked(context.TODO(), SnapshotFile, snapshot) if err != nil { t.Fatal(err) @@ -181,11 +190,6 @@ func TestCreateSnapshot(t testing.TB, repo Repository, at time.Time, depth int, t.Logf("saved snapshot %v", id.Str()) - err = repo.Flush(context.Background()) - if err != nil { - t.Fatal(err) - } - return snapshot } diff --git a/internal/restic/tree_test.go b/internal/restic/tree_test.go index 598a0ad4b..0f9b9a9d3 100644 --- a/internal/restic/tree_test.go +++ b/internal/restic/tree_test.go @@ -12,6 +12,7 @@ import ( "github.com/restic/restic/internal/repository" "github.com/restic/restic/internal/restic" rtest "github.com/restic/restic/internal/test" + "golang.org/x/sync/errgroup" ) var testFiles = []struct { @@ -98,6 +99,8 @@ func TestLoadTree(t *testing.T) { repo, cleanup := repository.TestRepository(t) defer cleanup() + var wg errgroup.Group + repo.StartPackUploader(context.TODO(), &wg) // save tree tree := restic.NewTree(0) id, err := repo.SaveTree(context.TODO(), tree) diff --git a/internal/restorer/restorer_test.go b/internal/restorer/restorer_test.go index 7e7e0c4c6..512c144ab 100644 --- a/internal/restorer/restorer_test.go +++ b/internal/restorer/restorer_test.go @@ -15,6 +15,7 @@ import ( "github.com/restic/restic/internal/repository" "github.com/restic/restic/internal/restic" rtest "github.com/restic/restic/internal/test" + "golang.org/x/sync/errgroup" ) type Node interface{} @@ -122,8 +123,9 @@ func saveSnapshot(t testing.TB, repo restic.Repository, snapshot Snapshot) (*res ctx, cancel := context.WithCancel(context.Background()) defer cancel() + wg, wgCtx := errgroup.WithContext(ctx) + repo.StartPackUploader(wgCtx, wg) treeID := saveDir(t, repo, snapshot.Nodes, 1000) - err := repo.Flush(ctx) if err != nil { t.Fatal(err) From bba1e81719d1e8b5033f0380c18b9588a5448532 Mon Sep 17 00:00:00 2001 From: Michael Eischer Date: Sat, 7 Aug 2021 23:18:37 +0200 Subject: [PATCH 3/7] archiver: Limit blob saver count to GOMAXPROCS Now with the asynchronous uploaders there's no more benefit from using more blob savers than we have CPUs. Thus use just one blob saver for each CPU we are allowed to use. 
--- internal/archiver/archiver.go | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/internal/archiver/archiver.go b/internal/archiver/archiver.go index 90b9e6904..5aa509449 100644 --- a/internal/archiver/archiver.go +++ b/internal/archiver/archiver.go @@ -122,7 +122,9 @@ func (o Options) ApplyDefaults() Options { } if o.SaveBlobConcurrency == 0 { - o.SaveBlobConcurrency = uint(runtime.NumCPU()) + // blob saving is CPU bound due to hash checking and encryption + // the actual upload is handled by the repository itself + o.SaveBlobConcurrency = uint(runtime.GOMAXPROCS(0)) } if o.SaveTreeConcurrency == 0 { From fa25d6118ed8f514a6915aff50c3bd037b4bb0ac Mon Sep 17 00:00:00 2001 From: Michael Eischer Date: Sat, 7 Aug 2021 23:33:43 +0200 Subject: [PATCH 4/7] archiver: Reduce tree saver concurrency Large amount of tree savers have no obvious benefit, however they can increase the amount of (potentially large) trees kept in memory. --- internal/archiver/archiver.go | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/internal/archiver/archiver.go b/internal/archiver/archiver.go index 5aa509449..7d28fcb30 100644 --- a/internal/archiver/archiver.go +++ b/internal/archiver/archiver.go @@ -128,9 +128,12 @@ func (o Options) ApplyDefaults() Options { } if o.SaveTreeConcurrency == 0 { - // use a relatively high concurrency here, having multiple SaveTree - // workers is cheap - o.SaveTreeConcurrency = o.SaveBlobConcurrency * 20 + // can either wait for a file, wait for a tree, serialize a tree or wait for saveblob + // the last two are cpu-bound and thus mutually exclusive. + // Also allow waiting for FileReadConcurrency files, this is the maximum of FutureFiles + // which currently can be in progress. The main backup loop blocks when trying to queue + // more files to read. + o.SaveTreeConcurrency = uint(runtime.GOMAXPROCS(0)) + o.FileReadConcurrency } return o From 753e56ee29ece17c237e3a40252f5cdec9825ba1 Mon Sep 17 00:00:00 2001 From: Michael Eischer Date: Sun, 22 Aug 2021 15:10:00 +0200 Subject: [PATCH 5/7] repository: Limit to a single pending pack file Use only a single not completed pack file to keep the number of open and active pack files low. The main change here is to defer hashing the pack file to the upload step. This prevents the pack assembly step to become a bottleneck as the only task is now to write data to the temporary pack file. The tests are cleaned up to no longer reimplement packer manager functions. --- internal/repository/packer_manager.go | 145 ++++++++++----------- internal/repository/packer_manager_test.go | 117 ++++------------- internal/repository/repository.go | 4 +- 3 files changed, 95 insertions(+), 171 deletions(-) diff --git a/internal/repository/packer_manager.go b/internal/repository/packer_manager.go index e3b43b24e..32b2c9b7a 100644 --- a/internal/repository/packer_manager.go +++ b/internal/repository/packer_manager.go @@ -1,9 +1,10 @@ package repository import ( + "bufio" "context" - "hash" "io" + "io/ioutil" "os" "runtime" "sync" @@ -23,32 +24,29 @@ import ( // Packer holds a pack.Packer together with a hash writer. type Packer struct { *pack.Packer - hw *hashing.Writer - beHw *hashing.Writer tmpfile *os.File + bufWr *bufio.Writer } // packerManager keeps a list of open packs and creates new on demand. 
type packerManager struct { - tpe restic.BlobType - key *crypto.Key - hasherFn func() hash.Hash - queueFn func(ctx context.Context, t restic.BlobType, p *Packer) error + tpe restic.BlobType + key *crypto.Key + queueFn func(ctx context.Context, t restic.BlobType, p *Packer) error - pm sync.Mutex - packers []*Packer + pm sync.Mutex + packer *Packer } const minPackSize = 4 * 1024 * 1024 // newPackerManager returns an new packer manager which writes temporary files // to a temporary directory -func newPackerManager(key *crypto.Key, hasherFn func() hash.Hash, tpe restic.BlobType, queueFn func(ctx context.Context, t restic.BlobType, p *Packer) error) *packerManager { +func newPackerManager(key *crypto.Key, tpe restic.BlobType, queueFn func(ctx context.Context, t restic.BlobType, p *Packer) error) *packerManager { return &packerManager{ - tpe: tpe, - key: key, - hasherFn: hasherFn, - queueFn: queueFn, + tpe: tpe, + key: key, + queueFn: queueFn, } } @@ -56,24 +54,34 @@ func (r *packerManager) Flush(ctx context.Context) error { r.pm.Lock() defer r.pm.Unlock() - debug.Log("manually flushing %d packs", len(r.packers)) - for _, packer := range r.packers { - err := r.queueFn(ctx, r.tpe, packer) + if r.packer != nil { + debug.Log("manually flushing pending pack") + err := r.queueFn(ctx, r.tpe, r.packer) if err != nil { return err } + r.packer = nil } - r.packers = r.packers[:0] return nil } func (r *packerManager) SaveBlob(ctx context.Context, t restic.BlobType, id restic.ID, ciphertext []byte, uncompressedLength int) (int, error) { - packer, err := r.findPacker() - if err != nil { - return 0, err + r.pm.Lock() + defer r.pm.Unlock() + + var err error + packer := r.packer + if r.packer == nil { + packer, err = r.newPacker() + if err != nil { + return 0, err + } } + // remember packer + r.packer = packer // save ciphertext + // Add only appends bytes in memory to avoid being a scaling bottleneck size, err := packer.Add(t, id, ciphertext, uncompressedLength) if err != nil { return 0, err @@ -82,10 +90,12 @@ func (r *packerManager) SaveBlob(ctx context.Context, t restic.BlobType, id rest // if the pack is not full enough, put back to the list if packer.Size() < minPackSize { debug.Log("pack is not full enough (%d bytes)", packer.Size()) - r.insertPacker(packer) return size, nil } + // forget full packer + r.packer = nil + // call while holding lock to prevent findPacker from creating new packers if the uploaders are busy // else write the pack to the backend err = r.queueFn(ctx, t, packer) if err != nil { @@ -97,56 +107,24 @@ func (r *packerManager) SaveBlob(ctx context.Context, t restic.BlobType, id rest // findPacker returns a packer for a new blob of size bytes. Either a new one is // created or one is returned that already has some blobs. -func (r *packerManager) findPacker() (packer *Packer, err error) { - r.pm.Lock() - defer r.pm.Unlock() - - // search for a suitable packer - if len(r.packers) > 0 { - p := r.packers[0] - last := len(r.packers) - 1 - r.packers[0] = r.packers[last] - r.packers[last] = nil // Allow GC of stale reference. 
- r.packers = r.packers[:last] - return p, nil - } - - // no suitable packer found, return new +func (r *packerManager) newPacker() (packer *Packer, err error) { debug.Log("create new pack") tmpfile, err := fs.TempFile("", "restic-temp-pack-") if err != nil { return nil, errors.Wrap(err, "fs.TempFile") } - w := io.Writer(tmpfile) - beHasher := r.hasherFn() - var beHw *hashing.Writer - if beHasher != nil { - beHw = hashing.NewWriter(w, beHasher) - w = beHw - } - - hw := hashing.NewWriter(w, sha256.New()) - p := pack.NewPacker(r.key, hw) + bufWr := bufio.NewWriter(tmpfile) + p := pack.NewPacker(r.key, bufWr) packer = &Packer{ Packer: p, - beHw: beHw, - hw: hw, tmpfile: tmpfile, + bufWr: bufWr, } return packer, nil } -// insertPacker appends p to s.packs. -func (r *packerManager) insertPacker(p *Packer) { - r.pm.Lock() - defer r.pm.Unlock() - - r.packers = append(r.packers, p) - debug.Log("%d packers\n", len(r.packers)) -} - // savePacker stores p in the backend. func (r *Repository) savePacker(ctx context.Context, t restic.BlobType, p *Packer) error { debug.Log("save packer for %v with %d blobs (%d bytes)\n", t, p.Packer.Count(), p.Packer.Size()) @@ -154,20 +132,43 @@ func (r *Repository) savePacker(ctx context.Context, t restic.BlobType, p *Packe if err != nil { return err } - - id := restic.IDFromHash(p.hw.Sum(nil)) - h := restic.Handle{Type: restic.PackFile, Name: id.String(), - ContainedBlobType: t} - var beHash []byte - if p.beHw != nil { - beHash = p.beHw.Sum(nil) - } - rd, err := restic.NewFileReader(p.tmpfile, beHash) + err = p.bufWr.Flush() if err != nil { return err } - err = r.be.Save(ctx, h, rd) + // calculate sha256 hash in a second pass + var rd io.Reader + rd, err = restic.NewFileReader(p.tmpfile, nil) + if err != nil { + return err + } + beHasher := r.be.Hasher() + var beHr *hashing.Reader + if beHasher != nil { + beHr = hashing.NewReader(rd, beHasher) + rd = beHr + } + + hr := hashing.NewReader(rd, sha256.New()) + _, err = io.Copy(ioutil.Discard, hr) + if err != nil { + return err + } + + id := restic.IDFromHash(hr.Sum(nil)) + h := restic.Handle{Type: restic.PackFile, Name: id.String(), + ContainedBlobType: t} + var beHash []byte + if beHr != nil { + beHash = beHr.Sum(nil) + } + rrd, err := restic.NewFileReader(p.tmpfile, beHash) + if err != nil { + return err + } + + err = r.be.Save(ctx, h, rrd) if err != nil { debug.Log("Save(%v) error: %v", h, err) return err @@ -198,11 +199,3 @@ func (r *Repository) savePacker(ctx context.Context, t restic.BlobType, p *Packe } return r.idx.SaveFullIndex(ctx, r) } - -// countPacker returns the number of open (unfinished) packers. -func (r *packerManager) countPacker() int { - r.pm.Lock() - defer r.pm.Unlock() - - return len(r.packers) -} diff --git a/internal/repository/packer_manager_test.go b/internal/repository/packer_manager_test.go index 8713aad4e..67a33c757 100644 --- a/internal/repository/packer_manager_test.go +++ b/internal/repository/packer_manager_test.go @@ -2,26 +2,16 @@ package repository import ( "context" - "hash" "io" "math/rand" - "os" "sync" "testing" - "github.com/restic/restic/internal/backend/mem" "github.com/restic/restic/internal/crypto" - "github.com/restic/restic/internal/fs" - "github.com/restic/restic/internal/mock" "github.com/restic/restic/internal/restic" + "github.com/restic/restic/internal/test" ) -// Saver implements saving data in a backend. 
-type Saver interface { - Save(context.Context, restic.Handle, restic.RewindReader) error - Hasher() hash.Hash -} - func randomID(rd io.Reader) restic.ID { id := restic.ID{} _, err := io.ReadFull(rd, id[:]) @@ -40,91 +30,27 @@ func min(a, b int) int { return b } -func saveFile(t testing.TB, be Saver, length int, f *os.File, id restic.ID, hash []byte) { - h := restic.Handle{Type: restic.PackFile, Name: id.String()} - t.Logf("save file %v", h) - - rd, err := restic.NewFileReader(f, hash) - if err != nil { - t.Fatal(err) - } - - err = be.Save(context.TODO(), h, rd) - if err != nil { - t.Fatal(err) - } - - if err := f.Close(); err != nil { - t.Fatal(err) - } - - if err := fs.RemoveIfExists(f.Name()); err != nil { - t.Fatal(err) - } -} - -func fillPacks(t testing.TB, rnd *rand.Rand, be Saver, pm *packerManager, buf []byte) (bytes int) { +func fillPacks(t testing.TB, rnd *rand.Rand, pm *packerManager, buf []byte) (bytes int) { for i := 0; i < 100; i++ { l := rnd.Intn(maxBlobSize) - - packer, err := pm.findPacker() - if err != nil { - t.Fatal(err) - } - id := randomID(rnd) buf = buf[:l] // Only change a few bytes so we know we're not benchmarking the RNG. rnd.Read(buf[:min(l, 4)]) - n, err := packer.Add(restic.DataBlob, id, buf, 0) + n, err := pm.SaveBlob(context.TODO(), restic.DataBlob, id, buf, 0) if err != nil { t.Fatal(err) } - if n != l+37 { + if n != l+37 && n != l+37+36 { t.Errorf("Add() returned invalid number of bytes: want %v, got %v", l, n) } - bytes += l - - if packer.Size() < minPackSize { - pm.insertPacker(packer) - continue - } - - err = packer.Finalize() - if err != nil { - t.Fatal(err) - } - - packID := restic.IDFromHash(packer.hw.Sum(nil)) - var beHash []byte - if packer.beHw != nil { - beHash = packer.beHw.Sum(nil) - } - saveFile(t, be, int(packer.Size()), packer.tmpfile, packID, beHash) + bytes += n } - - return bytes -} - -func flushRemainingPacks(t testing.TB, be Saver, pm *packerManager) (bytes int) { - if pm.countPacker() > 0 { - for _, packer := range pm.packers { - err := packer.Finalize() - if err != nil { - t.Fatal(err) - } - bytes += packer.HeaderOverhead() - - packID := restic.IDFromHash(packer.hw.Sum(nil)) - var beHash []byte - if packer.beHw != nil { - beHash = packer.beHw.Sum(nil) - } - saveFile(t, be, int(packer.Size()), packer.tmpfile, packID, beHash) - } + err := pm.Flush(context.TODO()) + if err != nil { + t.Fatal(err) } - return bytes } @@ -143,13 +69,21 @@ func TestPackerManager(t *testing.T) { func testPackerManager(t testing.TB) int64 { rnd := rand.New(rand.NewSource(randomSeed)) - be := mem.New() - pm := newPackerManager(crypto.NewRandomKey(), be.Hasher, restic.DataBlob, nil) + savedBytes := int(0) + pm := newPackerManager(crypto.NewRandomKey(), restic.DataBlob, func(ctx context.Context, tp restic.BlobType, p *Packer) error { + err := p.Finalize() + if err != nil { + return err + } + savedBytes += int(p.Size()) + return nil + }) blobBuf := make([]byte, maxBlobSize) - bytes := fillPacks(t, rnd, be, pm, blobBuf) - bytes += flushRemainingPacks(t, be, pm) + bytes := fillPacks(t, rnd, pm, blobBuf) + // bytes does not include the last packs header + test.Equals(t, savedBytes, bytes+36) t.Logf("saved %d bytes", bytes) return int64(bytes) @@ -162,10 +96,6 @@ func BenchmarkPackerManager(t *testing.B) { }) rnd := rand.New(rand.NewSource(randomSeed)) - - be := &mock.Backend{ - SaveFn: func(context.Context, restic.Handle, restic.RewindReader) error { return nil }, - } blobBuf := make([]byte, maxBlobSize) t.ReportAllocs() @@ -174,8 +104,9 @@ func 
BenchmarkPackerManager(t *testing.B) {
 	for i := 0; i < t.N; i++ {
 		rnd.Seed(randomSeed)
-		pm := newPackerManager(crypto.NewRandomKey(), be.Hasher, restic.DataBlob, nil)
-		fillPacks(t, rnd, be, pm, blobBuf)
-		flushRemainingPacks(t, be, pm)
+		pm := newPackerManager(crypto.NewRandomKey(), restic.DataBlob, func(ctx context.Context, t restic.BlobType, p *Packer) error {
+			return nil
+		})
+		fillPacks(t, rnd, pm, blobBuf)
 	}
 }
diff --git a/internal/repository/repository.go b/internal/repository/repository.go
index 7f42dbdd5..a185032b5 100644
--- a/internal/repository/repository.go
+++ b/internal/repository/repository.go
@@ -520,8 +520,8 @@ func (r *Repository) StartPackUploader(ctx context.Context, wg *errgroup.Group)
 	innerWg, ctx := errgroup.WithContext(ctx)
 	r.packerWg = innerWg
 	r.uploader = newPackerUploader(ctx, innerWg, r, r.be.Connections())
-	r.treePM = newPackerManager(r.key, r.be.Hasher, restic.TreeBlob, r.uploader.QueuePacker)
-	r.dataPM = newPackerManager(r.key, r.be.Hasher, restic.DataBlob, r.uploader.QueuePacker)
+	r.treePM = newPackerManager(r.key, restic.TreeBlob, r.uploader.QueuePacker)
+	r.dataPM = newPackerManager(r.key, restic.DataBlob, r.uploader.QueuePacker)
 
 	wg.Go(func() error {
 		return innerWg.Wait()

From dbb5860dc91e1f44b5333e3c7d5352875fa6b4a8 Mon Sep 17 00:00:00 2001
From: Michael Eischer
Date: Sun, 3 Jul 2022 11:19:24 +0200
Subject: [PATCH 6/7] Document connections and compression option

---
 doc/047_tuning_backup_parameters.rst | 40 ++++++++++++++++++++++++++++
 doc/index.rst                        |  1 +
 2 files changed, 41 insertions(+)
 create mode 100644 doc/047_tuning_backup_parameters.rst

diff --git a/doc/047_tuning_backup_parameters.rst b/doc/047_tuning_backup_parameters.rst
new file mode 100644
index 000000000..e63dc5f87
--- /dev/null
+++ b/doc/047_tuning_backup_parameters.rst
@@ -0,0 +1,40 @@
+..
+  Normally, there are no heading levels assigned to certain characters as the structure is
+  determined from the succession of headings. However, this convention is used in Python’s
+  Style Guide for documenting which you may follow:
+  # with overline, for parts
+  * for chapters
+  = for sections
+  - for subsections
+  ^ for subsubsections
+  " for paragraphs
+
+########################
+Tuning Backup Parameters
+########################
+
+Restic offers a few parameters that allow tuning the backup. The default values should
+work well in general, although specific use cases can benefit from different non-default
+values. As the restic commands evolve over time, the optimal value for each parameter
+can also change across restic versions.
+
+
+Backend Connections
+===================
+
+Restic uses a global limit for the number of concurrent connections to a backend.
+This limit can be configured using ``-o <backend>.connections=5``, for example for
+the REST backend the parameter would be ``-o rest.connections=5``. By default restic uses
+``5`` connections for each backend, except for the local backend which uses a limit of ``2``.
+The defaults should work well in most cases. For high-latency backends it can be beneficial
+to increase the number of connections. Please be aware that this increases the resource
+consumption of restic and that a too high connection count *will degrade performance*.
+
+
+Compression
+===========
+
+For a repository using at least repository format version 2, you can configure how data
+is compressed with the option ``--compression``.
+It can be set to ``auto`` (the default, which will compress very fast), ``max`` (which
+will trade backup speed and CPU usage for slightly better compression), or ``off``
+(which disables compression). Each setting is only applied for the single run of restic.
diff --git a/doc/index.rst b/doc/index.rst
index 69bbb8483..034dbda23 100644
--- a/doc/index.rst
+++ b/doc/index.rst
@@ -9,6 +9,7 @@ Restic Documentation
    030_preparing_a_new_repo
    040_backup
    045_working_with_repos
+   047_tuning_backup_parameters
    050_restore
    060_forget
    070_encryption

From 74df9d5998a843110c0cb6f8cc7e69ebea6eb038 Mon Sep 17 00:00:00 2001
From: Michael Eischer
Date: Sun, 3 Jul 2022 11:34:01 +0200
Subject: [PATCH 7/7] Add changelog for async pack uploads

---
 changelog/unreleased/issue-2696 | 13 +++++++++++++
 1 file changed, 13 insertions(+)
 create mode 100644 changelog/unreleased/issue-2696

diff --git a/changelog/unreleased/issue-2696 b/changelog/unreleased/issue-2696
new file mode 100644
index 000000000..1732363f4
--- /dev/null
+++ b/changelog/unreleased/issue-2696
@@ -0,0 +1,13 @@
+Enhancement: Improve backup speed with many small files
+
+We have restructured the backup pipeline to continue reading files while all
+upload connections are busy. This allows the backup to already prepare the next
+data file such that the upload can continue right away once the last one has
+completed. This can especially improve the backup performance for high latency
+backends.
+
+The upload concurrency is now controlled using the `-o <backend>.connections=5`
+option.
+
+https://github.com/restic/restic/issues/2696
+https://github.com/restic/restic/pull/3489
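
The caller-side pattern introduced by this series can be sketched roughly as follows. This is only an illustrative sketch, not part of the patches: the wrapper function saveOneBlob, the buf argument and the package name are made up for the example, while StartPackUploader, SaveBlob, Flush and the errgroup wiring correspond to the changes to cmd_recover.go and repository.go above.

package example

import (
	"context"

	"github.com/restic/restic/internal/restic"
	"golang.org/x/sync/errgroup"
)

// saveOneBlob shows the caller-side pattern: start the async pack uploader on
// an errgroup, save blobs, then Flush and wait so that upload errors surface.
func saveOneBlob(ctx context.Context, repo restic.Repository, buf []byte) error {
	wg, wgCtx := errgroup.WithContext(ctx)
	repo.StartPackUploader(wgCtx, wg)

	wg.Go(func() error {
		// SaveBlob only queues the blob into a pack file; full packs are
		// uploaded in the background by the uploader goroutines.
		_, _, _, err := repo.SaveBlob(wgCtx, restic.DataBlob, buf, restic.ID{}, false)
		if err != nil {
			return err
		}
		// Flush queues the remaining, partially filled packs, saves the index
		// and waits for all pending uploads to finish.
		return repo.Flush(wgCtx)
	})

	// The first error from saving, uploading or flushing is returned here.
	return wg.Wait()
}

The important property is that SaveBlob no longer blocks on uploads; errors from the background uploaders surface through the errgroup and through Flush.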