diff --git a/changelog/unreleased/issue-2696 b/changelog/unreleased/issue-2696 new file mode 100644 index 000000000..1732363f4 --- /dev/null +++ b/changelog/unreleased/issue-2696 @@ -0,0 +1,13 @@ +Enhancement: Improve backup speed with many small files + +We have restructured the backup pipeline to continue reading files while all +upload connections are busy. This allows the backup to already prepare the next +data file such that the upload can continue right once the last one has +completed. This can especially improve the backup performance for high latency +backends. + +The upload concurrency is now controlled using the `-o .connections=5` +option. + +https://github.com/restic/restic/issues/2696 +https://github.com/restic/restic/pull/3489 diff --git a/cmd/restic/cmd_recover.go b/cmd/restic/cmd_recover.go index 93810ce0f..42cabaa68 100644 --- a/cmd/restic/cmd_recover.go +++ b/cmd/restic/cmd_recover.go @@ -9,6 +9,7 @@ import ( "github.com/restic/restic/internal/errors" "github.com/restic/restic/internal/restic" "github.com/spf13/cobra" + "golang.org/x/sync/errgroup" ) var cmdRecover = &cobra.Command{ @@ -131,14 +132,26 @@ func runRecover(gopts GlobalOptions) error { } } - treeID, err := repo.SaveTree(gopts.ctx, tree) - if err != nil { - return errors.Fatalf("unable to save new tree to the repo: %v", err) - } + wg, ctx := errgroup.WithContext(gopts.ctx) + repo.StartPackUploader(ctx, wg) - err = repo.Flush(gopts.ctx) + var treeID restic.ID + wg.Go(func() error { + var err error + treeID, err = repo.SaveTree(ctx, tree) + if err != nil { + return errors.Fatalf("unable to save new tree to the repo: %v", err) + } + + err = repo.Flush(ctx) + if err != nil { + return errors.Fatalf("unable to save blobs to the repo: %v", err) + } + return nil + }) + err = wg.Wait() if err != nil { - return errors.Fatalf("unable to save blobs to the repo: %v", err) + return err } return createSnapshot(gopts.ctx, "/recover", hostname, []string{"recovered"}, repo, &treeID) diff --git 
a/cmd/restic/cmd_tag.go b/cmd/restic/cmd_tag.go index e6688b2fc..b05cd6e55 100644 --- a/cmd/restic/cmd_tag.go +++ b/cmd/restic/cmd_tag.go @@ -89,10 +89,6 @@ func changeTags(ctx context.Context, repo *repository.Repository, sn *restic.Sna debug.Log("new snapshot saved as %v", id) - if err = repo.Flush(ctx); err != nil { - return false, err - } - // Remove the old snapshot. h := restic.Handle{Type: restic.SnapshotFile, Name: sn.ID().String()} if err = repo.Backend().Remove(ctx, h); err != nil { diff --git a/doc/047_tuning_backup_parameters.rst b/doc/047_tuning_backup_parameters.rst new file mode 100644 index 000000000..e63dc5f87 --- /dev/null +++ b/doc/047_tuning_backup_parameters.rst @@ -0,0 +1,40 @@ +.. + Normally, there are no heading levels assigned to certain characters as the structure is + determined from the succession of headings. However, this convention is used in Python’s + Style Guide for documenting which you may follow: + # with overline, for parts + * for chapters + = for sections + - for subsections + ^ for subsubsections + " for paragraphs +######################## +Tuning Backup Parameters +######################## + +Restic offers a few parameters that allow tuning the backup. The default values should +work well in general although specific use cases can benefit from different non-default +values. As the restic commands evolve over time, the optimal value for each parameter +can also change across restic versions. + + +Backend Connections +=================== + +Restic uses a global limit for the number of concurrent connections to a backend. +This limit can be configured using ``-o .connections=5``, for example for +the REST backend the parameter would be ``-o rest.connections=5``. By default restic uses +``5`` connections for each backend, except for the local backend which uses a limit of ``2``. +The defaults should work well in most cases. For high-latency backends it can be beneficial +to increase the number of connections. 
Please be aware that this increases the resource +consumption of restic and that a too high connection count *will degrade performance*. + + +Compression +=========== + +For a repository using at least repository format version 2, you can configure how data +is compressed with the option ``--compression``. It can be set to ``auto`` (the default, +which will compress very fast), ``max`` (which will trade backup speed and CPU usage for +slightly better compression), or ``off`` (which disables compression). Each setting is +only applied for the single run of restic. diff --git a/doc/index.rst b/doc/index.rst index 69bbb8483..034dbda23 100644 --- a/doc/index.rst +++ b/doc/index.rst @@ -9,6 +9,7 @@ Restic Documentation 030_preparing_a_new_repo 040_backup 045_working_with_repos + 047_tuning_backup_parameters 050_restore 060_forget 070_encryption diff --git a/internal/archiver/archiver.go b/internal/archiver/archiver.go index 43153030a..7d28fcb30 100644 --- a/internal/archiver/archiver.go +++ b/internal/archiver/archiver.go @@ -122,13 +122,18 @@ func (o Options) ApplyDefaults() Options { } if o.SaveBlobConcurrency == 0 { - o.SaveBlobConcurrency = uint(runtime.NumCPU()) + // blob saving is CPU bound due to hash checking and encryption + // the actual upload is handled by the repository itself + o.SaveBlobConcurrency = uint(runtime.GOMAXPROCS(0)) } if o.SaveTreeConcurrency == 0 { - // use a relatively high concurrency here, having multiple SaveTree - // workers is cheap - o.SaveTreeConcurrency = o.SaveBlobConcurrency * 20 + // can either wait for a file, wait for a tree, serialize a tree or wait for saveblob + // the last two are cpu-bound and thus mutually exclusive. + // Also allow waiting for FileReadConcurrency files, this is the maximum of FutureFiles + // which currently can be in progress. The main backup loop blocks when trying to queue + // more files to read. 
+ o.SaveTreeConcurrency = uint(runtime.GOMAXPROCS(0)) + o.FileReadConcurrency } return o @@ -801,40 +806,47 @@ func (arch *Archiver) Snapshot(ctx context.Context, targets []string, opts Snaps return nil, restic.ID{}, err } - wg, wgCtx := errgroup.WithContext(ctx) - start := time.Now() - var rootTreeID restic.ID - var stats ItemStats - wg.Go(func() error { - arch.runWorkers(wgCtx, wg) - debug.Log("starting snapshot") - tree, err := arch.SaveTree(wgCtx, "/", atree, arch.loadParentTree(wgCtx, opts.ParentSnapshot)) + wgUp, wgUpCtx := errgroup.WithContext(ctx) + arch.Repo.StartPackUploader(wgUpCtx, wgUp) + + wgUp.Go(func() error { + wg, wgCtx := errgroup.WithContext(wgUpCtx) + start := time.Now() + + var stats ItemStats + wg.Go(func() error { + arch.runWorkers(wgCtx, wg) + + debug.Log("starting snapshot") + tree, err := arch.SaveTree(wgCtx, "/", atree, arch.loadParentTree(wgCtx, opts.ParentSnapshot)) + if err != nil { + return err + } + + if len(tree.Nodes) == 0 { + return errors.New("snapshot is empty") + } + + rootTreeID, stats, err = arch.saveTree(wgCtx, tree) + arch.stopWorkers() + return err + }) + + err = wg.Wait() + debug.Log("err is %v", err) + if err != nil { + debug.Log("error while saving tree: %v", err) return err } - if len(tree.Nodes) == 0 { - return errors.New("snapshot is empty") - } + arch.CompleteItem("/", nil, nil, stats, time.Since(start)) - rootTreeID, stats, err = arch.saveTree(wgCtx, tree) - arch.stopWorkers() - return err + return arch.Repo.Flush(ctx) }) - - err = wg.Wait() - debug.Log("err is %v", err) - - if err != nil { - debug.Log("error while saving tree: %v", err) - return nil, restic.ID{}, err - } - - arch.CompleteItem("/", nil, nil, stats, time.Since(start)) - - err = arch.Repo.Flush(ctx) + err = wgUp.Wait() if err != nil { return nil, restic.ID{}, err } diff --git a/internal/archiver/archiver_test.go b/internal/archiver/archiver_test.go index dfc24b27b..1ae126014 100644 --- a/internal/archiver/archiver_test.go +++ 
b/internal/archiver/archiver_test.go @@ -42,6 +42,7 @@ func prepareTempdirRepoSrc(t testing.TB, src TestDir) (tempdir string, repo rest func saveFile(t testing.TB, repo restic.Repository, filename string, filesystem fs.FS) (*restic.Node, ItemStats) { wg, ctx := errgroup.WithContext(context.TODO()) + repo.StartPackUploader(ctx, wg) arch := New(repo, filesystem, Options{}) arch.runWorkers(ctx, wg) @@ -213,6 +214,7 @@ func TestArchiverSave(t *testing.T) { defer cleanup() wg, ctx := errgroup.WithContext(ctx) + repo.StartPackUploader(ctx, wg) arch := New(repo, fs.Track{FS: fs.Local{}}, Options{}) arch.Error = func(item string, fi os.FileInfo, err error) error { @@ -281,6 +283,7 @@ func TestArchiverSaveReaderFS(t *testing.T) { defer cleanup() wg, ctx := errgroup.WithContext(ctx) + repo.StartPackUploader(ctx, wg) ts := time.Now() filename := "xx" @@ -830,6 +833,7 @@ func TestArchiverSaveDir(t *testing.T) { defer cleanup() wg, ctx := errgroup.WithContext(context.Background()) + repo.StartPackUploader(ctx, wg) arch := New(repo, fs.Track{FS: fs.Local{}}, Options{}) arch.runWorkers(ctx, wg) @@ -914,6 +918,7 @@ func TestArchiverSaveDirIncremental(t *testing.T) { // archiver did save the same tree several times for i := 0; i < 5; i++ { wg, ctx := errgroup.WithContext(context.TODO()) + repo.StartPackUploader(ctx, wg) arch := New(repo, fs.Track{FS: fs.Local{}}, Options{}) arch.runWorkers(ctx, wg) @@ -1097,6 +1102,7 @@ func TestArchiverSaveTree(t *testing.T) { } wg, ctx := errgroup.WithContext(context.TODO()) + repo.StartPackUploader(ctx, wg) arch.runWorkers(ctx, wg) @@ -2239,6 +2245,7 @@ func TestRacyFileSwap(t *testing.T) { defer cancel() wg, ctx := errgroup.WithContext(ctx) + repo.StartPackUploader(ctx, wg) arch := New(repo, fs.Track{FS: statfs}, Options{}) arch.Error = func(item string, fi os.FileInfo, err error) error { diff --git a/internal/checker/checker_test.go b/internal/checker/checker_test.go index f658613c3..a4245d574 100644 --- a/internal/checker/checker_test.go +++ 
b/internal/checker/checker_test.go @@ -20,6 +20,7 @@ import ( "github.com/restic/restic/internal/repository" "github.com/restic/restic/internal/restic" "github.com/restic/restic/internal/test" + "golang.org/x/sync/errgroup" ) var checkerTestData = filepath.Join("testdata", "checker-test-repo.tar.gz") @@ -476,6 +477,8 @@ func TestCheckerBlobTypeConfusion(t *testing.T) { Nodes: []*restic.Node{damagedNode}, } + wg, wgCtx := errgroup.WithContext(ctx) + repo.StartPackUploader(wgCtx, wg) id, err := repo.SaveTree(ctx, damagedTree) test.OK(t, repo.Flush(ctx)) test.OK(t, err) @@ -483,6 +486,8 @@ func TestCheckerBlobTypeConfusion(t *testing.T) { buf, err := repo.LoadBlob(ctx, restic.TreeBlob, id, nil) test.OK(t, err) + wg, wgCtx = errgroup.WithContext(ctx) + repo.StartPackUploader(wgCtx, wg) _, _, _, err = repo.SaveBlob(ctx, restic.DataBlob, buf, id, false) test.OK(t, err) diff --git a/internal/pack/pack.go b/internal/pack/pack.go index cf0093e70..1d991ccb5 100644 --- a/internal/pack/pack.go +++ b/internal/pack/pack.go @@ -70,14 +70,13 @@ type compressedHeaderEntry struct { } // Finalize writes the header for all added blobs and finalizes the pack. -// Returned are the number of bytes written, not yet reported by Add. 
-func (p *Packer) Finalize() (int, error) { +func (p *Packer) Finalize() error { p.m.Lock() defer p.m.Unlock() header, err := p.makeHeader() if err != nil { - return 0, err + return err } encryptedHeader := make([]byte, 0, restic.CiphertextLength(len(header))) @@ -88,22 +87,27 @@ func (p *Packer) Finalize() (int, error) { // append the header n, err := p.wr.Write(encryptedHeader) if err != nil { - return 0, errors.Wrap(err, "Write") + return errors.Wrap(err, "Write") } hdrBytes := len(encryptedHeader) if n != hdrBytes { - return 0, errors.New("wrong number of bytes written") + return errors.New("wrong number of bytes written") } // write length err = binary.Write(p.wr, binary.LittleEndian, uint32(hdrBytes)) if err != nil { - return 0, errors.Wrap(err, "binary.Write") + return errors.Wrap(err, "binary.Write") } p.bytes += uint(hdrBytes + binary.Size(uint32(0))) - return restic.CiphertextLength(0) + binary.Size(uint32(0)), nil + return nil +} + +// HeaderOverhead returns an estimate of the number of bytes written by a call to Finalize. +func (p *Packer) HeaderOverhead() int { + return restic.CiphertextLength(0) + binary.Size(uint32(0)) } // makeHeader constructs the header for p. diff --git a/internal/pack/pack_test.go b/internal/pack/pack_test.go index 6170e807c..7c8250613 100644 --- a/internal/pack/pack_test.go +++ b/internal/pack/pack_test.go @@ -42,7 +42,7 @@ func newPack(t testing.TB, k *crypto.Key, lengths []int) ([]Buf, []byte, uint) { rtest.OK(t, err) } - _, err := p.Finalize() + err := p.Finalize() rtest.OK(t, err) return bufs, buf.Bytes(), p.Size() diff --git a/internal/repository/fuzz_test.go b/internal/repository/fuzz_test.go index 3847f37f7..7a98477b6 100644 --- a/internal/repository/fuzz_test.go +++ b/internal/repository/fuzz_test.go @@ -9,6 +9,7 @@ import ( "github.com/restic/restic/internal/backend/mem" "github.com/restic/restic/internal/restic" + "golang.org/x/sync/errgroup" ) // Test saving a blob and loading it again, with varying buffer sizes. 
@@ -23,6 +24,9 @@ func FuzzSaveLoadBlob(f *testing.F) { id := restic.Hash(blob) repo, _ := TestRepositoryWithBackend(t, mem.New(), 2) + var wg errgroup.Group + repo.StartPackUploader(context.TODO(), &wg) + _, _, _, err := repo.SaveBlob(context.TODO(), restic.DataBlob, blob, id, false) if err != nil { t.Fatal(err) diff --git a/internal/repository/packer_manager.go b/internal/repository/packer_manager.go index c7acec7f1..32b2c9b7a 100644 --- a/internal/repository/packer_manager.go +++ b/internal/repository/packer_manager.go @@ -1,9 +1,10 @@ package repository import ( + "bufio" "context" - "hash" "io" + "io/ioutil" "os" "runtime" "sync" @@ -20,129 +21,171 @@ import ( "github.com/minio/sha256-simd" ) -// Saver implements saving data in a backend. -type Saver interface { - Save(context.Context, restic.Handle, restic.RewindReader) error - Hasher() hash.Hash -} - // Packer holds a pack.Packer together with a hash writer. type Packer struct { *pack.Packer - hw *hashing.Writer - beHw *hashing.Writer tmpfile *os.File + bufWr *bufio.Writer } // packerManager keeps a list of open packs and creates new on demand. 
type packerManager struct { - be Saver + tpe restic.BlobType key *crypto.Key - pm sync.Mutex - packers []*Packer + queueFn func(ctx context.Context, t restic.BlobType, p *Packer) error + + pm sync.Mutex + packer *Packer } const minPackSize = 4 * 1024 * 1024 // newPackerManager returns an new packer manager which writes temporary files // to a temporary directory -func newPackerManager(be Saver, key *crypto.Key) *packerManager { +func newPackerManager(key *crypto.Key, tpe restic.BlobType, queueFn func(ctx context.Context, t restic.BlobType, p *Packer) error) *packerManager { return &packerManager{ - be: be, - key: key, + tpe: tpe, + key: key, + queueFn: queueFn, } } +func (r *packerManager) Flush(ctx context.Context) error { + r.pm.Lock() + defer r.pm.Unlock() + + if r.packer != nil { + debug.Log("manually flushing pending pack") + err := r.queueFn(ctx, r.tpe, r.packer) + if err != nil { + return err + } + r.packer = nil + } + return nil +} + +func (r *packerManager) SaveBlob(ctx context.Context, t restic.BlobType, id restic.ID, ciphertext []byte, uncompressedLength int) (int, error) { + r.pm.Lock() + defer r.pm.Unlock() + + var err error + packer := r.packer + if r.packer == nil { + packer, err = r.newPacker() + if err != nil { + return 0, err + } + } + // remember packer + r.packer = packer + + // save ciphertext + // Add only appends bytes in memory to avoid being a scaling bottleneck + size, err := packer.Add(t, id, ciphertext, uncompressedLength) + if err != nil { + return 0, err + } + + // if the pack is not full enough, put back to the list + if packer.Size() < minPackSize { + debug.Log("pack is not full enough (%d bytes)", packer.Size()) + return size, nil + } + // forget full packer + r.packer = nil + + // call while holding lock to prevent findPacker from creating new packers if the uploaders are busy + // else write the pack to the backend + err = r.queueFn(ctx, t, packer) + if err != nil { + return 0, err + } + + return size + packer.HeaderOverhead(), 
nil +} + // findPacker returns a packer for a new blob of size bytes. Either a new one is // created or one is returned that already has some blobs. -func (r *packerManager) findPacker() (packer *Packer, err error) { - r.pm.Lock() - defer r.pm.Unlock() - - // search for a suitable packer - if len(r.packers) > 0 { - p := r.packers[0] - last := len(r.packers) - 1 - r.packers[0] = r.packers[last] - r.packers[last] = nil // Allow GC of stale reference. - r.packers = r.packers[:last] - return p, nil - } - - // no suitable packer found, return new +func (r *packerManager) newPacker() (packer *Packer, err error) { debug.Log("create new pack") tmpfile, err := fs.TempFile("", "restic-temp-pack-") if err != nil { return nil, errors.Wrap(err, "fs.TempFile") } - w := io.Writer(tmpfile) - beHasher := r.be.Hasher() - var beHw *hashing.Writer - if beHasher != nil { - beHw = hashing.NewWriter(w, beHasher) - w = beHw - } - - hw := hashing.NewWriter(w, sha256.New()) - p := pack.NewPacker(r.key, hw) + bufWr := bufio.NewWriter(tmpfile) + p := pack.NewPacker(r.key, bufWr) packer = &Packer{ Packer: p, - beHw: beHw, - hw: hw, tmpfile: tmpfile, + bufWr: bufWr, } return packer, nil } -// insertPacker appends p to s.packs. -func (r *packerManager) insertPacker(p *Packer) { - r.pm.Lock() - defer r.pm.Unlock() - - r.packers = append(r.packers, p) - debug.Log("%d packers\n", len(r.packers)) -} - // savePacker stores p in the backend. 
-func (r *Repository) savePacker(ctx context.Context, t restic.BlobType, p *Packer) (int, error) { +func (r *Repository) savePacker(ctx context.Context, t restic.BlobType, p *Packer) error { debug.Log("save packer for %v with %d blobs (%d bytes)\n", t, p.Packer.Count(), p.Packer.Size()) - hdrOverhead, err := p.Packer.Finalize() + err := p.Packer.Finalize() if err != nil { - return 0, err + return err + } + err = p.bufWr.Flush() + if err != nil { + return err } - id := restic.IDFromHash(p.hw.Sum(nil)) + // calculate sha256 hash in a second pass + var rd io.Reader + rd, err = restic.NewFileReader(p.tmpfile, nil) + if err != nil { + return err + } + beHasher := r.be.Hasher() + var beHr *hashing.Reader + if beHasher != nil { + beHr = hashing.NewReader(rd, beHasher) + rd = beHr + } + + hr := hashing.NewReader(rd, sha256.New()) + _, err = io.Copy(ioutil.Discard, hr) + if err != nil { + return err + } + + id := restic.IDFromHash(hr.Sum(nil)) h := restic.Handle{Type: restic.PackFile, Name: id.String(), ContainedBlobType: t} var beHash []byte - if p.beHw != nil { - beHash = p.beHw.Sum(nil) + if beHr != nil { + beHash = beHr.Sum(nil) } - rd, err := restic.NewFileReader(p.tmpfile, beHash) + rrd, err := restic.NewFileReader(p.tmpfile, beHash) if err != nil { - return 0, err + return err } - err = r.be.Save(ctx, h, rd) + err = r.be.Save(ctx, h, rrd) if err != nil { debug.Log("Save(%v) error: %v", h, err) - return 0, err + return err } debug.Log("saved as %v", h) err = p.tmpfile.Close() if err != nil { - return 0, errors.Wrap(err, "close tempfile") + return errors.Wrap(err, "close tempfile") } // on windows the tempfile is automatically deleted on close if runtime.GOOS != "windows" { err = fs.RemoveIfExists(p.tmpfile.Name()) if err != nil { - return 0, errors.Wrap(err, "Remove") + return errors.Wrap(err, "Remove") } } @@ -152,15 +195,7 @@ func (r *Repository) savePacker(ctx context.Context, t restic.BlobType, p *Packe // Save index if full if r.noAutoIndexUpdate { - return 
hdrOverhead, nil + return nil } - return hdrOverhead, r.idx.SaveFullIndex(ctx, r) -} - -// countPacker returns the number of open (unfinished) packers. -func (r *packerManager) countPacker() int { - r.pm.Lock() - defer r.pm.Unlock() - - return len(r.packers) + return r.idx.SaveFullIndex(ctx, r) } diff --git a/internal/repository/packer_manager_test.go b/internal/repository/packer_manager_test.go index 633e1be35..67a33c757 100644 --- a/internal/repository/packer_manager_test.go +++ b/internal/repository/packer_manager_test.go @@ -4,15 +4,12 @@ import ( "context" "io" "math/rand" - "os" "sync" "testing" - "github.com/restic/restic/internal/backend/mem" "github.com/restic/restic/internal/crypto" - "github.com/restic/restic/internal/fs" - "github.com/restic/restic/internal/mock" "github.com/restic/restic/internal/restic" + "github.com/restic/restic/internal/test" ) func randomID(rd io.Reader) restic.ID { @@ -33,91 +30,27 @@ func min(a, b int) int { return b } -func saveFile(t testing.TB, be Saver, length int, f *os.File, id restic.ID, hash []byte) { - h := restic.Handle{Type: restic.PackFile, Name: id.String()} - t.Logf("save file %v", h) - - rd, err := restic.NewFileReader(f, hash) - if err != nil { - t.Fatal(err) - } - - err = be.Save(context.TODO(), h, rd) - if err != nil { - t.Fatal(err) - } - - if err := f.Close(); err != nil { - t.Fatal(err) - } - - if err := fs.RemoveIfExists(f.Name()); err != nil { - t.Fatal(err) - } -} - -func fillPacks(t testing.TB, rnd *rand.Rand, be Saver, pm *packerManager, buf []byte) (bytes int) { +func fillPacks(t testing.TB, rnd *rand.Rand, pm *packerManager, buf []byte) (bytes int) { for i := 0; i < 100; i++ { l := rnd.Intn(maxBlobSize) - - packer, err := pm.findPacker() - if err != nil { - t.Fatal(err) - } - id := randomID(rnd) buf = buf[:l] // Only change a few bytes so we know we're not benchmarking the RNG. 
rnd.Read(buf[:min(l, 4)]) - n, err := packer.Add(restic.DataBlob, id, buf, 0) + n, err := pm.SaveBlob(context.TODO(), restic.DataBlob, id, buf, 0) if err != nil { t.Fatal(err) } - if n != l+37 { + if n != l+37 && n != l+37+36 { t.Errorf("Add() returned invalid number of bytes: want %v, got %v", l, n) } - bytes += l - - if packer.Size() < minPackSize { - pm.insertPacker(packer) - continue - } - - _, err = packer.Finalize() - if err != nil { - t.Fatal(err) - } - - packID := restic.IDFromHash(packer.hw.Sum(nil)) - var beHash []byte - if packer.beHw != nil { - beHash = packer.beHw.Sum(nil) - } - saveFile(t, be, int(packer.Size()), packer.tmpfile, packID, beHash) + bytes += n } - - return bytes -} - -func flushRemainingPacks(t testing.TB, be Saver, pm *packerManager) (bytes int) { - if pm.countPacker() > 0 { - for _, packer := range pm.packers { - n, err := packer.Finalize() - if err != nil { - t.Fatal(err) - } - bytes += n - - packID := restic.IDFromHash(packer.hw.Sum(nil)) - var beHash []byte - if packer.beHw != nil { - beHash = packer.beHw.Sum(nil) - } - saveFile(t, be, int(packer.Size()), packer.tmpfile, packID, beHash) - } + err := pm.Flush(context.TODO()) + if err != nil { + t.Fatal(err) } - return bytes } @@ -136,13 +69,21 @@ func TestPackerManager(t *testing.T) { func testPackerManager(t testing.TB) int64 { rnd := rand.New(rand.NewSource(randomSeed)) - be := mem.New() - pm := newPackerManager(be, crypto.NewRandomKey()) + savedBytes := int(0) + pm := newPackerManager(crypto.NewRandomKey(), restic.DataBlob, func(ctx context.Context, tp restic.BlobType, p *Packer) error { + err := p.Finalize() + if err != nil { + return err + } + savedBytes += int(p.Size()) + return nil + }) blobBuf := make([]byte, maxBlobSize) - bytes := fillPacks(t, rnd, be, pm, blobBuf) - bytes += flushRemainingPacks(t, be, pm) + bytes := fillPacks(t, rnd, pm, blobBuf) + // bytes does not include the last packs header + test.Equals(t, savedBytes, bytes+36) t.Logf("saved %d bytes", bytes) return 
int64(bytes) @@ -155,10 +96,6 @@ func BenchmarkPackerManager(t *testing.B) { }) rnd := rand.New(rand.NewSource(randomSeed)) - - be := &mock.Backend{ - SaveFn: func(context.Context, restic.Handle, restic.RewindReader) error { return nil }, - } blobBuf := make([]byte, maxBlobSize) t.ReportAllocs() @@ -167,8 +104,9 @@ func BenchmarkPackerManager(t *testing.B) { for i := 0; i < t.N; i++ { rnd.Seed(randomSeed) - pm := newPackerManager(be, crypto.NewRandomKey()) - fillPacks(t, rnd, be, pm, blobBuf) - flushRemainingPacks(t, be, pm) + pm := newPackerManager(crypto.NewRandomKey(), restic.DataBlob, func(ctx context.Context, t restic.BlobType, p *Packer) error { + return nil + }) + fillPacks(t, rnd, pm, blobBuf) } } diff --git a/internal/repository/packer_uploader.go b/internal/repository/packer_uploader.go new file mode 100644 index 000000000..30c8f77af --- /dev/null +++ b/internal/repository/packer_uploader.go @@ -0,0 +1,63 @@ +package repository + +import ( + "context" + + "github.com/restic/restic/internal/restic" + "golang.org/x/sync/errgroup" +) + +// SavePacker implements saving a pack in the repository. 
+type SavePacker interface { + savePacker(ctx context.Context, t restic.BlobType, p *Packer) error +} + +type uploadTask struct { + packer *Packer + tpe restic.BlobType +} + +type packerUploader struct { + uploadQueue chan uploadTask +} + +func newPackerUploader(ctx context.Context, wg *errgroup.Group, repo SavePacker, connections uint) *packerUploader { + pu := &packerUploader{ + uploadQueue: make(chan uploadTask), + } + + for i := 0; i < int(connections); i++ { + wg.Go(func() error { + for { + select { + case t, ok := <-pu.uploadQueue: + if !ok { + return nil + } + err := repo.savePacker(ctx, t.tpe, t.packer) + if err != nil { + return err + } + case <-ctx.Done(): + return ctx.Err() + } + } + }) + } + + return pu +} + +func (pu *packerUploader) QueuePacker(ctx context.Context, t restic.BlobType, p *Packer) (err error) { + select { + case <-ctx.Done(): + return ctx.Err() + case pu.uploadQueue <- uploadTask{tpe: t, packer: p}: + } + + return nil +} + +func (pu *packerUploader) TriggerShutdown() { + close(pu.uploadQueue) +} diff --git a/internal/repository/repack.go b/internal/repository/repack.go index 80902c11c..7840e714a 100644 --- a/internal/repository/repack.go +++ b/internal/repository/repack.go @@ -28,9 +28,25 @@ func Repack(ctx context.Context, repo restic.Repository, dstRepo restic.Reposito return nil, errors.Fatal("repack step requires a backend connection limit of at least two") } - var keepMutex sync.Mutex wg, wgCtx := errgroup.WithContext(ctx) + dstRepo.StartPackUploader(wgCtx, wg) + wg.Go(func() error { + var err error + obsoletePacks, err = repack(wgCtx, repo, dstRepo, packs, keepBlobs, p) + return err + }) + + if err := wg.Wait(); err != nil { + return nil, err + } + return obsoletePacks, nil +} + +func repack(ctx context.Context, repo restic.Repository, dstRepo restic.Repository, packs restic.IDSet, keepBlobs restic.BlobSet, p *progress.Counter) (obsoletePacks restic.IDSet, err error) { + wg, wgCtx := errgroup.WithContext(ctx) + + var keepMutex 
sync.Mutex downloadQueue := make(chan restic.PackBlobs) wg.Go(func() error { defer close(downloadQueue) diff --git a/internal/repository/repack_test.go b/internal/repository/repack_test.go index 248477292..9389d643e 100644 --- a/internal/repository/repack_test.go +++ b/internal/repository/repack_test.go @@ -8,6 +8,7 @@ import ( "github.com/restic/restic/internal/repository" "github.com/restic/restic/internal/restic" + "golang.org/x/sync/errgroup" ) func randomSize(min, max int) int { @@ -15,6 +16,9 @@ func randomSize(min, max int) int { } func createRandomBlobs(t testing.TB, repo restic.Repository, blobs int, pData float32) { + var wg errgroup.Group + repo.StartPackUploader(context.TODO(), &wg) + for i := 0; i < blobs; i++ { var ( tpe restic.BlobType @@ -46,6 +50,7 @@ func createRandomBlobs(t testing.TB, repo restic.Repository, blobs int, pData fl if err = repo.Flush(context.Background()); err != nil { t.Fatalf("repo.Flush() returned error %v", err) } + repo.StartPackUploader(context.TODO(), &wg) } } @@ -62,6 +67,8 @@ func createRandomWrongBlob(t testing.TB, repo restic.Repository) { // invert first data byte buf[0] ^= 0xff + var wg errgroup.Group + repo.StartPackUploader(context.TODO(), &wg) _, _, _, err := repo.SaveBlob(context.TODO(), restic.DataBlob, buf, id, false) if err != nil { t.Fatalf("SaveFrom() error %v", err) diff --git a/internal/repository/repository.go b/internal/repository/repository.go index 3ecf853d2..a185032b5 100644 --- a/internal/repository/repository.go +++ b/internal/repository/repository.go @@ -41,8 +41,10 @@ type Repository struct { noAutoIndexUpdate bool - treePM *packerManager - dataPM *packerManager + packerWg *errgroup.Group + uploader *packerUploader + treePM *packerManager + dataPM *packerManager allocEnc sync.Once allocDec sync.Once @@ -100,11 +102,9 @@ func (c *CompressionMode) Type() string { // New returns a new repository with backend be. 
func New(be restic.Backend, opts Options) *Repository { repo := &Repository{ - be: be, - opts: opts, - idx: NewMasterIndex(), - dataPM: newPackerManager(be, nil), - treePM: newPackerManager(be, nil), + be: be, + opts: opts, + idx: NewMasterIndex(), } return repo @@ -416,31 +416,7 @@ func (r *Repository) saveAndEncrypt(ctx context.Context, t restic.BlobType, data panic(fmt.Sprintf("invalid type: %v", t)) } - packer, err := pm.findPacker() - if err != nil { - return 0, err - } - - // save ciphertext - size, err = packer.Add(t, id, ciphertext, uncompressedLength) - if err != nil { - return 0, err - } - - // if the pack is not full enough, put back to the list - if packer.Size() < minPackSize { - debug.Log("pack is not full enough (%d bytes)", packer.Size()) - pm.insertPacker(packer) - return size, nil - } - - // else write the pack to the backend - hdrSize, err := r.savePacker(ctx, t, packer) - if err != nil { - return 0, err - } - - return size + hdrSize, nil + return pm.SaveBlob(ctx, t, id, ciphertext, uncompressedLength) } // SaveJSONUnpacked serialises item as JSON and encrypts and saves it in the @@ -536,31 +512,45 @@ func (r *Repository) Flush(ctx context.Context) error { return r.idx.SaveIndex(ctx, r) } -// flushPacks saves all remaining packs. +func (r *Repository) StartPackUploader(ctx context.Context, wg *errgroup.Group) { + if r.packerWg != nil { + panic("uploader already started") + } + + innerWg, ctx := errgroup.WithContext(ctx) + r.packerWg = innerWg + r.uploader = newPackerUploader(ctx, innerWg, r, r.be.Connections()) + r.treePM = newPackerManager(r.key, restic.TreeBlob, r.uploader.QueuePacker) + r.dataPM = newPackerManager(r.key, restic.DataBlob, r.uploader.QueuePacker) + + wg.Go(func() error { + return innerWg.Wait() + }) +} + +// FlushPacks saves all remaining packs. 
func (r *Repository) flushPacks(ctx context.Context) error { - pms := []struct { - t restic.BlobType - pm *packerManager - }{ - {restic.DataBlob, r.dataPM}, - {restic.TreeBlob, r.treePM}, + if r.packerWg == nil { + return nil } - for _, p := range pms { - p.pm.pm.Lock() - - debug.Log("manually flushing %d packs", len(p.pm.packers)) - for _, packer := range p.pm.packers { - _, err := r.savePacker(ctx, p.t, packer) - if err != nil { - p.pm.pm.Unlock() - return err - } - } - p.pm.packers = p.pm.packers[:0] - p.pm.pm.Unlock() + err := r.treePM.Flush(ctx) + if err != nil { + return err } - return nil + err = r.dataPM.Flush(ctx) + if err != nil { + return err + } + r.uploader.TriggerShutdown() + err = r.packerWg.Wait() + + r.treePM = nil + r.dataPM = nil + r.uploader = nil + r.packerWg = nil + + return err } // Backend returns the backend for the repository. @@ -715,8 +705,6 @@ func (r *Repository) SearchKey(ctx context.Context, password string, maxKeys int } r.key = key.master - r.dataPM.key = key.master - r.treePM.key = key.master r.keyName = key.Name() cfg, err := restic.LoadConfig(ctx, r) if err == crypto.ErrUnauthenticated { @@ -768,8 +756,6 @@ func (r *Repository) init(ctx context.Context, password string, cfg restic.Confi } r.key = key.master - r.dataPM.key = key.master - r.treePM.key = key.master r.keyName = key.Name() r.setConfig(cfg) _, err = r.SaveJSONUnpacked(ctx, restic.ConfigFile, cfg) diff --git a/internal/repository/repository_test.go b/internal/repository/repository_test.go index f0e25f520..38d3117a5 100644 --- a/internal/repository/repository_test.go +++ b/internal/repository/repository_test.go @@ -22,6 +22,7 @@ import ( "github.com/restic/restic/internal/restic" "github.com/restic/restic/internal/test" rtest "github.com/restic/restic/internal/test" + "golang.org/x/sync/errgroup" ) var testSizes = []int{5, 23, 2<<18 + 23, 1 << 20} @@ -43,6 +44,9 @@ func testSave(t *testing.T, version uint) { id := restic.Hash(data) + var wg errgroup.Group + 
repo.StartPackUploader(context.TODO(), &wg) + // save sid, _, _, err := repo.SaveBlob(context.TODO(), restic.DataBlob, data, restic.ID{}, false) rtest.OK(t, err) @@ -82,6 +86,9 @@ func testSaveFrom(t *testing.T, version uint) { id := restic.Hash(data) + var wg errgroup.Group + repo.StartPackUploader(context.TODO(), &wg) + // save id2, _, _, err := repo.SaveBlob(context.TODO(), restic.DataBlob, data, id, false) rtest.OK(t, err) @@ -187,6 +194,9 @@ func testLoadBlob(t *testing.T, version uint) { _, err := io.ReadFull(rnd, buf) rtest.OK(t, err) + var wg errgroup.Group + repo.StartPackUploader(context.TODO(), &wg) + id, _, _, err := repo.SaveBlob(context.TODO(), restic.DataBlob, buf, restic.ID{}, false) rtest.OK(t, err) rtest.OK(t, repo.Flush(context.Background())) @@ -220,6 +230,9 @@ func benchmarkLoadBlob(b *testing.B, version uint) { _, err := io.ReadFull(rnd, buf) rtest.OK(b, err) + var wg errgroup.Group + repo.StartPackUploader(context.TODO(), &wg) + id, _, _, err := repo.SaveBlob(context.TODO(), restic.DataBlob, buf, restic.ID{}, false) rtest.OK(b, err) rtest.OK(b, repo.Flush(context.Background())) @@ -389,6 +402,9 @@ func benchmarkLoadIndex(b *testing.B, version uint) { // saveRandomDataBlobs generates random data blobs and saves them to the repository. func saveRandomDataBlobs(t testing.TB, repo restic.Repository, num int, sizeMax int) { + var wg errgroup.Group + repo.StartPackUploader(context.TODO(), &wg) + for i := 0; i < num; i++ { size := rand.Int() % sizeMax diff --git a/internal/restic/repository.go b/internal/restic/repository.go index 35fdbabcb..73f53adb1 100644 --- a/internal/restic/repository.go +++ b/internal/restic/repository.go @@ -5,6 +5,7 @@ import ( "github.com/restic/restic/internal/crypto" "github.com/restic/restic/internal/ui/progress" + "golang.org/x/sync/errgroup" ) // Repository stores data in a backend. It provides high-level functions and @@ -34,6 +35,10 @@ type Repository interface { // the the pack header. 
ListPack(context.Context, ID, int64) ([]Blob, uint32, error) + // StartPackUploader starts goroutines to upload new pack files. The errgroup + // is used to immediately notify about an upload error. Flush() will also return + // that error. + StartPackUploader(ctx context.Context, wg *errgroup.Group) Flush(context.Context) error SaveUnpacked(context.Context, FileType, []byte) (ID, error) diff --git a/internal/restic/testing.go b/internal/restic/testing.go index 54621c183..cc5e7d890 100644 --- a/internal/restic/testing.go +++ b/internal/restic/testing.go @@ -10,6 +10,7 @@ import ( "time" "github.com/restic/chunker" + "golang.org/x/sync/errgroup" ) // fakeFile returns a reader which yields deterministic pseudo-random data. @@ -169,9 +170,17 @@ func TestCreateSnapshot(t testing.TB, repo Repository, at time.Time, depth int, rand: rand.New(rand.NewSource(seed)), } + var wg errgroup.Group + repo.StartPackUploader(context.TODO(), &wg) + treeID := fs.saveTree(context.TODO(), seed, depth) snapshot.Tree = &treeID + err = repo.Flush(context.Background()) + if err != nil { + t.Fatal(err) + } + id, err := repo.SaveJSONUnpacked(context.TODO(), SnapshotFile, snapshot) if err != nil { t.Fatal(err) } @@ -181,11 +190,6 @@ func TestCreateSnapshot(t testing.TB, repo Repository, at time.Time, depth int, t.Logf("saved snapshot %v", id.Str()) - err = repo.Flush(context.Background()) - if err != nil { - t.Fatal(err) - } - return snapshot } diff --git a/internal/restic/tree_test.go b/internal/restic/tree_test.go index 598a0ad4b..0f9b9a9d3 100644 --- a/internal/restic/tree_test.go +++ b/internal/restic/tree_test.go @@ -12,6 +12,7 @@ import ( "github.com/restic/restic/internal/repository" "github.com/restic/restic/internal/restic" rtest "github.com/restic/restic/internal/test" + "golang.org/x/sync/errgroup" ) var testFiles = []struct { @@ -98,6 +99,8 @@ func TestLoadTree(t *testing.T) { repo, cleanup := repository.TestRepository(t) defer cleanup() + var wg errgroup.Group + 
repo.StartPackUploader(context.TODO(), &wg) // save tree tree := restic.NewTree(0) id, err := repo.SaveTree(context.TODO(), tree) diff --git a/internal/restorer/restorer_test.go b/internal/restorer/restorer_test.go index 7e7e0c4c6..512c144ab 100644 --- a/internal/restorer/restorer_test.go +++ b/internal/restorer/restorer_test.go @@ -15,6 +15,7 @@ import ( "github.com/restic/restic/internal/repository" "github.com/restic/restic/internal/restic" rtest "github.com/restic/restic/internal/test" + "golang.org/x/sync/errgroup" ) type Node interface{} @@ -122,8 +123,9 @@ func saveSnapshot(t testing.TB, repo restic.Repository, snapshot Snapshot) (*res ctx, cancel := context.WithCancel(context.Background()) defer cancel() + wg, wgCtx := errgroup.WithContext(ctx) + repo.StartPackUploader(wgCtx, wg) treeID := saveDir(t, repo, snapshot.Nodes, 1000) - err := repo.Flush(ctx) if err != nil { t.Fatal(err)