From 120ccc87546722112a0758a68dab2cfe934b3cfb Mon Sep 17 00:00:00 2001 From: Michael Eischer Date: Sat, 7 Aug 2021 22:52:05 +0200 Subject: [PATCH] repository: Rework blob saving to use an async pack uploader Previously, SaveAndEncrypt would assemble blobs into packs and either return immediately if the pack is not yet full or upload the pack file otherwise. The upload will block the current goroutine until it finishes. Now, the upload is done using separate goroutines. This requires changes to the error handling. As uploads are no longer tied to a SaveAndEncrypt call, failed uploads are signaled using an errgroup. To count the uploaded amount of data, the pack header overhead is no longer returned by `packer.Finalize` but rather by `packer.HeaderOverhead`. This helper method is necessary to continue returning the pack header overhead directly to the responsible call to `repository.SaveBlob`. Without the method this would not be possible, as packs are finalized asynchronously. --- cmd/restic/cmd_recover.go | 25 ++++-- internal/archiver/archiver.go | 59 ++++++------ internal/archiver/archiver_test.go | 7 ++ internal/checker/checker_test.go | 5 ++ internal/pack/pack.go | 18 ++-- internal/pack/pack_test.go | 2 +- internal/repository/fuzz_test.go | 4 + internal/repository/packer_manager.go | 84 ++++++++++++----- internal/repository/packer_manager_test.go | 17 ++-- internal/repository/packer_uploader.go | 63 +++++++++++++ internal/repository/repack.go | 18 +++- internal/repository/repack_test.go | 7 ++ internal/repository/repository.go | 100 +++++++++------------ internal/repository/repository_test.go | 16 ++++ internal/restic/repository.go | 5 ++ internal/restic/testing.go | 14 +-- internal/restic/tree_test.go | 3 + internal/restorer/restorer_test.go | 4 +- 18 files changed, 321 insertions(+), 130 deletions(-) create mode 100644 internal/repository/packer_uploader.go diff --git a/cmd/restic/cmd_recover.go b/cmd/restic/cmd_recover.go index 93810ce0f..42cabaa68 100644 --- a/cmd/restic/cmd_recover.go +++ b/cmd/restic/cmd_recover.go @@ -9,6 +9,7 @@ import ( "github.com/restic/restic/internal/errors" "github.com/restic/restic/internal/restic" "github.com/spf13/cobra" + "golang.org/x/sync/errgroup" ) var cmdRecover = &cobra.Command{ @@ -131,14 +132,26 @@ func runRecover(gopts GlobalOptions) error { } } - treeID, err := repo.SaveTree(gopts.ctx, tree) - if err != nil { - return errors.Fatalf("unable to save new tree to the repo: %v", err) - } + wg, ctx := errgroup.WithContext(gopts.ctx) + repo.StartPackUploader(ctx, wg) - err = repo.Flush(gopts.ctx) + var treeID restic.ID + wg.Go(func() error { + var err error + treeID, err = repo.SaveTree(ctx, tree) + if err != nil { + return errors.Fatalf("unable to save new tree to the repo: %v", err) + } + + err = repo.Flush(ctx) + if err != nil { + return errors.Fatalf("unable to save blobs to the repo: %v", err) + } + return nil + }) + err = wg.Wait() if err != nil { - return errors.Fatalf("unable to save blobs to the repo: %v", err) + return err } return createSnapshot(gopts.ctx, "/recover", hostname, []string{"recovered"}, repo, &treeID) diff --git a/internal/archiver/archiver.go b/internal/archiver/archiver.go index 43153030a..90b9e6904 100644 --- a/internal/archiver/archiver.go +++ b/internal/archiver/archiver.go @@ -801,40 +801,47 @@ func (arch *Archiver) Snapshot(ctx context.Context, targets []string, opts Snaps return nil, restic.ID{}, err } - wg, wgCtx := errgroup.WithContext(ctx) - start := time.Now() - var rootTreeID restic.ID - var stats ItemStats - wg.Go(func() error { - arch.runWorkers(wgCtx, wg) - debug.Log("starting snapshot") - tree, err := arch.SaveTree(wgCtx, "/", atree, arch.loadParentTree(wgCtx, opts.ParentSnapshot)) + wgUp, wgUpCtx := errgroup.WithContext(ctx) + arch.Repo.StartPackUploader(wgUpCtx, wgUp) + + wgUp.Go(func() error { + wg, wgCtx := errgroup.WithContext(wgUpCtx) + start := time.Now() + + var stats ItemStats + wg.Go(func() error { + arch.runWorkers(wgCtx, wg) + + debug.Log("starting snapshot") + tree, err := arch.SaveTree(wgCtx, "/", atree, arch.loadParentTree(wgCtx, opts.ParentSnapshot)) + if err != nil { + return err + } + + if len(tree.Nodes) == 0 { + return errors.New("snapshot is empty") + } + + rootTreeID, stats, err = arch.saveTree(wgCtx, tree) + arch.stopWorkers() + return err + }) + + err = wg.Wait() + debug.Log("err is %v", err) + if err != nil { + debug.Log("error while saving tree: %v", err) return err } - if len(tree.Nodes) == 0 { - return errors.New("snapshot is empty") - } + arch.CompleteItem("/", nil, nil, stats, time.Since(start)) - rootTreeID, stats, err = arch.saveTree(wgCtx, tree) - arch.stopWorkers() - return err + return arch.Repo.Flush(ctx) }) - - err = wg.Wait() - debug.Log("err is %v", err) - - if err != nil { - debug.Log("error while saving tree: %v", err) - return nil, restic.ID{}, err - } - - arch.CompleteItem("/", nil, nil, stats, time.Since(start)) - - err = arch.Repo.Flush(ctx) + err = wgUp.Wait() if err != nil { return nil, restic.ID{}, err } diff --git a/internal/archiver/archiver_test.go b/internal/archiver/archiver_test.go index dfc24b27b..1ae126014 100644 --- a/internal/archiver/archiver_test.go +++ b/internal/archiver/archiver_test.go @@ -42,6 +42,7 @@ func prepareTempdirRepoSrc(t testing.TB, src TestDir) (tempdir string, repo rest func saveFile(t testing.TB, repo restic.Repository, filename string, filesystem fs.FS) (*restic.Node, ItemStats) { wg, ctx := errgroup.WithContext(context.TODO()) + repo.StartPackUploader(ctx, wg) arch := New(repo, filesystem, Options{}) arch.runWorkers(ctx, wg) @@ -213,6 +214,7 @@ func TestArchiverSave(t *testing.T) { defer cleanup() wg, ctx := errgroup.WithContext(ctx) + repo.StartPackUploader(ctx, wg) arch := New(repo, fs.Track{FS: fs.Local{}}, Options{}) arch.Error = func(item string, fi os.FileInfo, err error) error { @@ -281,6 +283,7 @@ func TestArchiverSaveReaderFS(t *testing.T) { defer cleanup() wg, ctx := errgroup.WithContext(ctx) + repo.StartPackUploader(ctx, wg) ts := time.Now() filename := "xx" @@ -830,6 +833,7 @@ func TestArchiverSaveDir(t *testing.T) { defer cleanup() wg, ctx := errgroup.WithContext(context.Background()) + repo.StartPackUploader(ctx, wg) arch := New(repo, fs.Track{FS: fs.Local{}}, Options{}) arch.runWorkers(ctx, wg) @@ -914,6 +918,7 @@ func TestArchiverSaveDirIncremental(t *testing.T) { // archiver did save the same tree several times for i := 0; i < 5; i++ { wg, ctx := errgroup.WithContext(context.TODO()) + repo.StartPackUploader(ctx, wg) arch := New(repo, fs.Track{FS: fs.Local{}}, Options{}) arch.runWorkers(ctx, wg) @@ -1097,6 +1102,7 @@ func TestArchiverSaveTree(t *testing.T) { } wg, ctx := errgroup.WithContext(context.TODO()) + repo.StartPackUploader(ctx, wg) arch.runWorkers(ctx, wg) @@ -2239,6 +2245,7 @@ func TestRacyFileSwap(t *testing.T) { defer cancel() wg, ctx := errgroup.WithContext(ctx) + repo.StartPackUploader(ctx, wg) arch := New(repo, fs.Track{FS: statfs}, Options{}) arch.Error = func(item string, fi os.FileInfo, err error) error { diff --git a/internal/checker/checker_test.go b/internal/checker/checker_test.go index f658613c3..a4245d574 100644 --- a/internal/checker/checker_test.go +++ b/internal/checker/checker_test.go @@ -20,6 +20,7 @@ import ( "github.com/restic/restic/internal/repository" "github.com/restic/restic/internal/restic" "github.com/restic/restic/internal/test" + "golang.org/x/sync/errgroup" ) var checkerTestData = filepath.Join("testdata", "checker-test-repo.tar.gz") @@ -476,6 +477,8 @@ func TestCheckerBlobTypeConfusion(t *testing.T) { Nodes: []*restic.Node{damagedNode}, } + wg, wgCtx := errgroup.WithContext(ctx) + repo.StartPackUploader(wgCtx, wg) id, err := repo.SaveTree(ctx, damagedTree) test.OK(t, repo.Flush(ctx)) test.OK(t, err) @@ -483,6 +486,8 @@ func TestCheckerBlobTypeConfusion(t *testing.T) { buf, err := repo.LoadBlob(ctx, restic.TreeBlob, id, nil) test.OK(t, err) + wg, wgCtx = errgroup.WithContext(ctx) + repo.StartPackUploader(wgCtx, wg) _, _, _, err = repo.SaveBlob(ctx, restic.DataBlob, buf, id, false) test.OK(t, err) diff --git a/internal/pack/pack.go b/internal/pack/pack.go index cf0093e70..1d991ccb5 100644 --- a/internal/pack/pack.go +++ b/internal/pack/pack.go @@ -70,14 +70,13 @@ type compressedHeaderEntry struct { } // Finalize writes the header for all added blobs and finalizes the pack. -// Returned are the number of bytes written, not yet reported by Add. -func (p *Packer) Finalize() (int, error) { +func (p *Packer) Finalize() error { p.m.Lock() defer p.m.Unlock() header, err := p.makeHeader() if err != nil { - return 0, err + return err } encryptedHeader := make([]byte, 0, restic.CiphertextLength(len(header))) @@ -88,22 +87,27 @@ func (p *Packer) Finalize() (int, error) { // append the header n, err := p.wr.Write(encryptedHeader) if err != nil { - return 0, errors.Wrap(err, "Write") + return errors.Wrap(err, "Write") } hdrBytes := len(encryptedHeader) if n != hdrBytes { - return 0, errors.New("wrong number of bytes written") + return errors.New("wrong number of bytes written") } // write length err = binary.Write(p.wr, binary.LittleEndian, uint32(hdrBytes)) if err != nil { - return 0, errors.Wrap(err, "binary.Write") + return errors.Wrap(err, "binary.Write") } p.bytes += uint(hdrBytes + binary.Size(uint32(0))) - return restic.CiphertextLength(0) + binary.Size(uint32(0)), nil + return nil +} + +// HeaderOverhead returns an estimate of the number of bytes written by a call to Finalize. +func (p *Packer) HeaderOverhead() int { + return restic.CiphertextLength(0) + binary.Size(uint32(0)) } // makeHeader constructs the header for p. diff --git a/internal/pack/pack_test.go b/internal/pack/pack_test.go index 6170e807c..7c8250613 100644 --- a/internal/pack/pack_test.go +++ b/internal/pack/pack_test.go @@ -42,7 +42,7 @@ func newPack(t testing.TB, k *crypto.Key, lengths []int) ([]Buf, []byte, uint) { rtest.OK(t, err) } - _, err := p.Finalize() + err := p.Finalize() rtest.OK(t, err) return bufs, buf.Bytes(), p.Size() diff --git a/internal/repository/fuzz_test.go b/internal/repository/fuzz_test.go index 3847f37f7..7a98477b6 100644 --- a/internal/repository/fuzz_test.go +++ b/internal/repository/fuzz_test.go @@ -9,6 +9,7 @@ import ( "github.com/restic/restic/internal/backend/mem" "github.com/restic/restic/internal/restic" + "golang.org/x/sync/errgroup" ) // Test saving a blob and loading it again, with varying buffer sizes. @@ -23,6 +24,9 @@ func FuzzSaveLoadBlob(f *testing.F) { id := restic.Hash(blob) repo, _ := TestRepositoryWithBackend(t, mem.New(), 2) + var wg errgroup.Group + repo.StartPackUploader(context.TODO(), &wg) + _, _, _, err := repo.SaveBlob(context.TODO(), restic.DataBlob, blob, id, false) if err != nil { t.Fatal(err) diff --git a/internal/repository/packer_manager.go b/internal/repository/packer_manager.go index c7acec7f1..e3b43b24e 100644 --- a/internal/repository/packer_manager.go +++ b/internal/repository/packer_manager.go @@ -20,12 +20,6 @@ import ( "github.com/minio/sha256-simd" ) -// Saver implements saving data in a backend. -type Saver interface { - Save(context.Context, restic.Handle, restic.RewindReader) error - Hasher() hash.Hash -} - // Packer holds a pack.Packer together with a hash writer. type Packer struct { *pack.Packer @@ -36,8 +30,11 @@ type Packer struct { // packerManager keeps a list of open packs and creates new on demand. type packerManager struct { - be Saver - key *crypto.Key + tpe restic.BlobType + key *crypto.Key + hasherFn func() hash.Hash + queueFn func(ctx context.Context, t restic.BlobType, p *Packer) error + pm sync.Mutex packers []*Packer } @@ -46,13 +43,58 @@ const minPackSize = 4 * 1024 * 1024 // newPackerManager returns an new packer manager which writes temporary files // to a temporary directory -func newPackerManager(be Saver, key *crypto.Key) *packerManager { +func newPackerManager(key *crypto.Key, hasherFn func() hash.Hash, tpe restic.BlobType, queueFn func(ctx context.Context, t restic.BlobType, p *Packer) error) *packerManager { return &packerManager{ - be: be, - key: key, + tpe: tpe, + key: key, + hasherFn: hasherFn, + queueFn: queueFn, } } +func (r *packerManager) Flush(ctx context.Context) error { + r.pm.Lock() + defer r.pm.Unlock() + + debug.Log("manually flushing %d packs", len(r.packers)) + for _, packer := range r.packers { + err := r.queueFn(ctx, r.tpe, packer) + if err != nil { + return err + } + } + r.packers = r.packers[:0] + return nil +} + +func (r *packerManager) SaveBlob(ctx context.Context, t restic.BlobType, id restic.ID, ciphertext []byte, uncompressedLength int) (int, error) { + packer, err := r.findPacker() + if err != nil { + return 0, err + } + + // save ciphertext + size, err := packer.Add(t, id, ciphertext, uncompressedLength) + if err != nil { + return 0, err + } + + // if the pack is not full enough, put back to the list + if packer.Size() < minPackSize { + debug.Log("pack is not full enough (%d bytes)", packer.Size()) + r.insertPacker(packer) + return size, nil + } + + // else write the pack to the backend + err = r.queueFn(ctx, t, packer) + if err != nil { + return 0, err + } + + return size + packer.HeaderOverhead(), nil +} + // findPacker returns a packer for a new blob of size bytes. Either a new one is // created or one is returned that already has some blobs. func (r *packerManager) findPacker() (packer *Packer, err error) { @@ -77,7 +119,7 @@ func (r *packerManager) findPacker() (packer *Packer, err error) { } w := io.Writer(tmpfile) - beHasher := r.be.Hasher() + beHasher := r.hasherFn() var beHw *hashing.Writer if beHasher != nil { beHw = hashing.NewWriter(w, beHasher) @@ -106,11 +148,11 @@ func (r *packerManager) insertPacker(p *Packer) { } // savePacker stores p in the backend. -func (r *Repository) savePacker(ctx context.Context, t restic.BlobType, p *Packer) (int, error) { +func (r *Repository) savePacker(ctx context.Context, t restic.BlobType, p *Packer) error { debug.Log("save packer for %v with %d blobs (%d bytes)\n", t, p.Packer.Count(), p.Packer.Size()) - hdrOverhead, err := p.Packer.Finalize() + err := p.Packer.Finalize() if err != nil { - return 0, err + return err } id := restic.IDFromHash(p.hw.Sum(nil)) @@ -122,27 +164,27 @@ func (r *Repository) savePacker(ctx context.Context, t restic.BlobType, p *Packe } rd, err := restic.NewFileReader(p.tmpfile, beHash) if err != nil { - return 0, err + return err } err = r.be.Save(ctx, h, rd) if err != nil { debug.Log("Save(%v) error: %v", h, err) - return 0, err + return err } debug.Log("saved as %v", h) err = p.tmpfile.Close() if err != nil { - return 0, errors.Wrap(err, "close tempfile") + return errors.Wrap(err, "close tempfile") } // on windows the tempfile is automatically deleted on close if runtime.GOOS != "windows" { err = fs.RemoveIfExists(p.tmpfile.Name()) if err != nil { - return 0, errors.Wrap(err, "Remove") + return errors.Wrap(err, "Remove") } } @@ -152,9 +194,9 @@ func (r *Repository) savePacker(ctx context.Context, t restic.BlobType, p *Packe // Save index if full if r.noAutoIndexUpdate { - return hdrOverhead, nil + return nil } - return hdrOverhead, r.idx.SaveFullIndex(ctx, r) + return r.idx.SaveFullIndex(ctx, r) } // countPacker returns the number of open (unfinished) packers. diff --git a/internal/repository/packer_manager_test.go b/internal/repository/packer_manager_test.go index 633e1be35..8713aad4e 100644 --- a/internal/repository/packer_manager_test.go +++ b/internal/repository/packer_manager_test.go @@ -2,6 +2,7 @@ package repository import ( "context" + "hash" "io" "math/rand" "os" @@ -15,6 +16,12 @@ import ( "github.com/restic/restic/internal/restic" ) +// Saver implements saving data in a backend. +type Saver interface { + Save(context.Context, restic.Handle, restic.RewindReader) error + Hasher() hash.Hash +} + func randomID(rd io.Reader) restic.ID { id := restic.ID{} _, err := io.ReadFull(rd, id[:]) @@ -84,7 +91,7 @@ func fillPacks(t testing.TB, rnd *rand.Rand, be Saver, pm *packerManager, buf [] continue } - _, err = packer.Finalize() + err = packer.Finalize() if err != nil { t.Fatal(err) } @@ -103,11 +110,11 @@ func fillPacks(t testing.TB, rnd *rand.Rand, be Saver, pm *packerManager, buf [] func flushRemainingPacks(t testing.TB, be Saver, pm *packerManager) (bytes int) { if pm.countPacker() > 0 { for _, packer := range pm.packers { - n, err := packer.Finalize() + err := packer.Finalize() if err != nil { t.Fatal(err) } - bytes += n + bytes += packer.HeaderOverhead() packID := restic.IDFromHash(packer.hw.Sum(nil)) var beHash []byte @@ -137,7 +144,7 @@ func testPackerManager(t testing.TB) int64 { rnd := rand.New(rand.NewSource(randomSeed)) be := mem.New() - pm := newPackerManager(be, crypto.NewRandomKey()) + pm := newPackerManager(crypto.NewRandomKey(), be.Hasher, restic.DataBlob, nil) blobBuf := make([]byte, maxBlobSize) @@ -167,7 +174,7 @@ func BenchmarkPackerManager(t *testing.B) { for i := 0; i < t.N; i++ { rnd.Seed(randomSeed) - pm := newPackerManager(be, crypto.NewRandomKey()) + pm := newPackerManager(crypto.NewRandomKey(), be.Hasher, restic.DataBlob, nil) fillPacks(t, rnd, be, pm, blobBuf) flushRemainingPacks(t, be, pm) } diff --git a/internal/repository/packer_uploader.go b/internal/repository/packer_uploader.go new file mode 100644 index 000000000..30c8f77af --- /dev/null +++ b/internal/repository/packer_uploader.go @@ -0,0 +1,63 @@ +package repository + +import ( + "context" + + "github.com/restic/restic/internal/restic" + "golang.org/x/sync/errgroup" +) + +// SavePacker implements saving a pack in the repository. +type SavePacker interface { + savePacker(ctx context.Context, t restic.BlobType, p *Packer) error +} + +type uploadTask struct { + packer *Packer + tpe restic.BlobType +} + +type packerUploader struct { + uploadQueue chan uploadTask +} + +func newPackerUploader(ctx context.Context, wg *errgroup.Group, repo SavePacker, connections uint) *packerUploader { + pu := &packerUploader{ + uploadQueue: make(chan uploadTask), + } + + for i := 0; i < int(connections); i++ { + wg.Go(func() error { + for { + select { + case t, ok := <-pu.uploadQueue: + if !ok { + return nil + } + err := repo.savePacker(ctx, t.tpe, t.packer) + if err != nil { + return err + } + case <-ctx.Done(): + return ctx.Err() + } + } + }) + } + + return pu +} + +func (pu *packerUploader) QueuePacker(ctx context.Context, t restic.BlobType, p *Packer) (err error) { + select { + case <-ctx.Done(): + return ctx.Err() + case pu.uploadQueue <- uploadTask{tpe: t, packer: p}: + } + + return nil +} + +func (pu *packerUploader) TriggerShutdown() { + close(pu.uploadQueue) +} diff --git a/internal/repository/repack.go b/internal/repository/repack.go index 80902c11c..7840e714a 100644 --- a/internal/repository/repack.go +++ b/internal/repository/repack.go @@ -28,9 +28,25 @@ func Repack(ctx context.Context, repo restic.Repository, dstRepo restic.Reposito return nil, errors.Fatal("repack step requires a backend connection limit of at least two") } - var keepMutex sync.Mutex wg, wgCtx := errgroup.WithContext(ctx) + dstRepo.StartPackUploader(wgCtx, wg) + wg.Go(func() error { + var err error + obsoletePacks, err = repack(wgCtx, repo, dstRepo, packs, keepBlobs, p) + return err + }) + + if err := wg.Wait(); err != nil { + return nil, err + } + return obsoletePacks, nil +} + +func repack(ctx context.Context, repo restic.Repository, dstRepo restic.Repository, packs restic.IDSet, keepBlobs restic.BlobSet, p *progress.Counter) (obsoletePacks restic.IDSet, err error) { + wg, wgCtx := errgroup.WithContext(ctx) + + var keepMutex sync.Mutex downloadQueue := make(chan restic.PackBlobs) wg.Go(func() error { defer close(downloadQueue) diff --git a/internal/repository/repack_test.go b/internal/repository/repack_test.go index 248477292..9389d643e 100644 --- a/internal/repository/repack_test.go +++ b/internal/repository/repack_test.go @@ -8,6 +8,7 @@ import ( "github.com/restic/restic/internal/repository" "github.com/restic/restic/internal/restic" + "golang.org/x/sync/errgroup" ) func randomSize(min, max int) int { @@ -15,6 +16,9 @@ func randomSize(min, max int) int { } func createRandomBlobs(t testing.TB, repo restic.Repository, blobs int, pData float32) { + var wg errgroup.Group + repo.StartPackUploader(context.TODO(), &wg) + for i := 0; i < blobs; i++ { var ( tpe restic.BlobType @@ -46,6 +50,7 @@ func createRandomBlobs(t testing.TB, repo restic.Repository, blobs int, pData fl if err = repo.Flush(context.Background()); err != nil { t.Fatalf("repo.Flush() returned error %v", err) } + repo.StartPackUploader(context.TODO(), &wg) } } @@ -62,6 +67,8 @@ func createRandomWrongBlob(t testing.TB, repo restic.Repository) { // invert first data byte buf[0] ^= 0xff + var wg errgroup.Group + repo.StartPackUploader(context.TODO(), &wg) _, _, _, err := repo.SaveBlob(context.TODO(), restic.DataBlob, buf, id, false) if err != nil { t.Fatalf("SaveFrom() error %v", err) diff --git a/internal/repository/repository.go b/internal/repository/repository.go index 3ecf853d2..7f42dbdd5 100644 --- a/internal/repository/repository.go +++ b/internal/repository/repository.go @@ -41,8 +41,10 @@ type Repository struct { noAutoIndexUpdate bool - treePM *packerManager - dataPM *packerManager + packerWg *errgroup.Group + uploader *packerUploader + treePM *packerManager + dataPM *packerManager allocEnc sync.Once allocDec sync.Once @@ -100,11 +102,9 @@ func (c *CompressionMode) Type() string { // New returns a new repository with backend be. func New(be restic.Backend, opts Options) *Repository { repo := &Repository{ - be: be, - opts: opts, - idx: NewMasterIndex(), - dataPM: newPackerManager(be, nil), - treePM: newPackerManager(be, nil), + be: be, + opts: opts, + idx: NewMasterIndex(), } return repo @@ -416,31 +416,7 @@ func (r *Repository) saveAndEncrypt(ctx context.Context, t restic.BlobType, data panic(fmt.Sprintf("invalid type: %v", t)) } - packer, err := pm.findPacker() - if err != nil { - return 0, err - } - - // save ciphertext - size, err = packer.Add(t, id, ciphertext, uncompressedLength) - if err != nil { - return 0, err - } - - // if the pack is not full enough, put back to the list - if packer.Size() < minPackSize { - debug.Log("pack is not full enough (%d bytes)", packer.Size()) - pm.insertPacker(packer) - return size, nil - } - - // else write the pack to the backend - hdrSize, err := r.savePacker(ctx, t, packer) - if err != nil { - return 0, err - } - - return size + hdrSize, nil + return pm.SaveBlob(ctx, t, id, ciphertext, uncompressedLength) } // SaveJSONUnpacked serialises item as JSON and encrypts and saves it in the @@ -536,31 +512,45 @@ func (r *Repository) Flush(ctx context.Context) error { return r.idx.SaveIndex(ctx, r) } -// flushPacks saves all remaining packs. +func (r *Repository) StartPackUploader(ctx context.Context, wg *errgroup.Group) { + if r.packerWg != nil { + panic("uploader already started") + } + + innerWg, ctx := errgroup.WithContext(ctx) + r.packerWg = innerWg + r.uploader = newPackerUploader(ctx, innerWg, r, r.be.Connections()) + r.treePM = newPackerManager(r.key, r.be.Hasher, restic.TreeBlob, r.uploader.QueuePacker) + r.dataPM = newPackerManager(r.key, r.be.Hasher, restic.DataBlob, r.uploader.QueuePacker) + + wg.Go(func() error { + return innerWg.Wait() + }) +} + +// FlushPacks saves all remaining packs. func (r *Repository) flushPacks(ctx context.Context) error { - pms := []struct { - t restic.BlobType - pm *packerManager - }{ - {restic.DataBlob, r.dataPM}, - {restic.TreeBlob, r.treePM}, + if r.packerWg == nil { + return nil } - for _, p := range pms { - p.pm.pm.Lock() - - debug.Log("manually flushing %d packs", len(p.pm.packers)) - for _, packer := range p.pm.packers { - _, err := r.savePacker(ctx, p.t, packer) - if err != nil { - p.pm.pm.Unlock() - return err - } - } - p.pm.packers = p.pm.packers[:0] - p.pm.pm.Unlock() + err := r.treePM.Flush(ctx) + if err != nil { + return err } - return nil + err = r.dataPM.Flush(ctx) + if err != nil { + return err + } + r.uploader.TriggerShutdown() + err = r.packerWg.Wait() + + r.treePM = nil + r.dataPM = nil + r.uploader = nil + r.packerWg = nil + + return err } // Backend returns the backend for the repository. @@ -715,8 +705,6 @@ func (r *Repository) SearchKey(ctx context.Context, password string, maxKeys int } r.key = key.master - r.dataPM.key = key.master - r.treePM.key = key.master r.keyName = key.Name() cfg, err := restic.LoadConfig(ctx, r) if err == crypto.ErrUnauthenticated { @@ -768,8 +756,6 @@ func (r *Repository) init(ctx context.Context, password string, cfg restic.Confi } r.key = key.master - r.dataPM.key = key.master - r.treePM.key = key.master r.keyName = key.Name() r.setConfig(cfg) _, err = r.SaveJSONUnpacked(ctx, restic.ConfigFile, cfg) diff --git a/internal/repository/repository_test.go b/internal/repository/repository_test.go index f0e25f520..38d3117a5 100644 --- a/internal/repository/repository_test.go +++ b/internal/repository/repository_test.go @@ -22,6 +22,7 @@ import ( "github.com/restic/restic/internal/restic" "github.com/restic/restic/internal/test" rtest "github.com/restic/restic/internal/test" + "golang.org/x/sync/errgroup" ) var testSizes = []int{5, 23, 2<<18 + 23, 1 << 20} @@ -43,6 +44,9 @@ func testSave(t *testing.T, version uint) { id := restic.Hash(data) + var wg errgroup.Group + repo.StartPackUploader(context.TODO(), &wg) + // save sid, _, _, err := repo.SaveBlob(context.TODO(), restic.DataBlob, data, restic.ID{}, false) rtest.OK(t, err) @@ -82,6 +86,9 @@ func testSaveFrom(t *testing.T, version uint) { id := restic.Hash(data) + var wg errgroup.Group + repo.StartPackUploader(context.TODO(), &wg) + // save id2, _, _, err := repo.SaveBlob(context.TODO(), restic.DataBlob, data, id, false) rtest.OK(t, err) @@ -187,6 +194,9 @@ func testLoadBlob(t *testing.T, version uint) { _, err := io.ReadFull(rnd, buf) rtest.OK(t, err) + var wg errgroup.Group + repo.StartPackUploader(context.TODO(), &wg) + id, _, _, err := repo.SaveBlob(context.TODO(), restic.DataBlob, buf, restic.ID{}, false) rtest.OK(t, err) rtest.OK(t, repo.Flush(context.Background())) @@ -220,6 +230,9 @@ func benchmarkLoadBlob(b *testing.B, version uint) { _, err := io.ReadFull(rnd, buf) rtest.OK(b, err) + var wg errgroup.Group + repo.StartPackUploader(context.TODO(), &wg) + id, _, _, err := repo.SaveBlob(context.TODO(), restic.DataBlob, buf, restic.ID{}, false) rtest.OK(b, err) rtest.OK(b, repo.Flush(context.Background())) @@ -389,6 +402,9 @@ func benchmarkLoadIndex(b *testing.B, version uint) { // saveRandomDataBlobs generates random data blobs and saves them to the repository. func saveRandomDataBlobs(t testing.TB, repo restic.Repository, num int, sizeMax int) { + var wg errgroup.Group + repo.StartPackUploader(context.TODO(), &wg) + for i := 0; i < num; i++ { size := rand.Int() % sizeMax diff --git a/internal/restic/repository.go b/internal/restic/repository.go index 35fdbabcb..73f53adb1 100644 --- a/internal/restic/repository.go +++ b/internal/restic/repository.go @@ -5,6 +5,7 @@ import ( "github.com/restic/restic/internal/crypto" "github.com/restic/restic/internal/ui/progress" + "golang.org/x/sync/errgroup" ) // Repository stores data in a backend. It provides high-level functions and @@ -34,6 +35,10 @@ type Repository interface { // the the pack header. ListPack(context.Context, ID, int64) ([]Blob, uint32, error) + // StartPackUploader start goroutines to upload new pack files. The errgroup + // is used to immediately notify about an upload error. Flush() will also return + // that error. + StartPackUploader(ctx context.Context, wg *errgroup.Group) Flush(context.Context) error SaveUnpacked(context.Context, FileType, []byte) (ID, error) diff --git a/internal/restic/testing.go b/internal/restic/testing.go index 54621c183..cc5e7d890 100644 --- a/internal/restic/testing.go +++ b/internal/restic/testing.go @@ -10,6 +10,7 @@ import ( "time" "github.com/restic/chunker" + "golang.org/x/sync/errgroup" ) // fakeFile returns a reader which yields deterministic pseudo-random data. @@ -169,9 +170,17 @@ func TestCreateSnapshot(t testing.TB, repo Repository, at time.Time, depth int, rand: rand.New(rand.NewSource(seed)), } + var wg errgroup.Group + repo.StartPackUploader(context.TODO(), &wg) + treeID := fs.saveTree(context.TODO(), seed, depth) snapshot.Tree = &treeID + err = repo.Flush(context.Background()) + if err != nil { + t.Fatal(err) + } + id, err := repo.SaveJSONUnpacked(context.TODO(), SnapshotFile, snapshot) if err != nil { t.Fatal(err) @@ -181,11 +190,6 @@ func TestCreateSnapshot(t testing.TB, repo Repository, at time.Time, depth int, t.Logf("saved snapshot %v", id.Str()) - err = repo.Flush(context.Background()) - if err != nil { - t.Fatal(err) - } - return snapshot } diff --git a/internal/restic/tree_test.go b/internal/restic/tree_test.go index 598a0ad4b..0f9b9a9d3 100644 --- a/internal/restic/tree_test.go +++ b/internal/restic/tree_test.go @@ -12,6 +12,7 @@ import ( "github.com/restic/restic/internal/repository" "github.com/restic/restic/internal/restic" rtest "github.com/restic/restic/internal/test" + "golang.org/x/sync/errgroup" ) var testFiles = []struct { @@ -98,6 +99,8 @@ func TestLoadTree(t *testing.T) { repo, cleanup := repository.TestRepository(t) defer cleanup() + var wg errgroup.Group + repo.StartPackUploader(context.TODO(), &wg) // save tree tree := restic.NewTree(0) id, err := repo.SaveTree(context.TODO(), tree) diff --git a/internal/restorer/restorer_test.go b/internal/restorer/restorer_test.go index 7e7e0c4c6..512c144ab 100644 --- a/internal/restorer/restorer_test.go +++ b/internal/restorer/restorer_test.go @@ -15,6 +15,7 @@ import ( "github.com/restic/restic/internal/repository" "github.com/restic/restic/internal/restic" rtest "github.com/restic/restic/internal/test" + "golang.org/x/sync/errgroup" ) type Node interface{} @@ -122,8 +123,9 @@ func saveSnapshot(t testing.TB, repo restic.Repository, snapshot Snapshot) (*res ctx, cancel := context.WithCancel(context.Background()) defer cancel() + wg, wgCtx := errgroup.WithContext(ctx) + repo.StartPackUploader(wgCtx, wg) treeID := saveDir(t, repo, snapshot.Nodes, 1000) - err := repo.Flush(ctx) if err != nil { t.Fatal(err)