diff --git a/cmd/restic/cmd_recover.go b/cmd/restic/cmd_recover.go
index 93810ce0f..42cabaa68 100644
--- a/cmd/restic/cmd_recover.go
+++ b/cmd/restic/cmd_recover.go
@@ -9,6 +9,7 @@ import (
 	"github.com/restic/restic/internal/errors"
 	"github.com/restic/restic/internal/restic"
 	"github.com/spf13/cobra"
+	"golang.org/x/sync/errgroup"
 )
 
 var cmdRecover = &cobra.Command{
@@ -131,14 +132,26 @@ func runRecover(gopts GlobalOptions) error {
 		}
 	}
 
-	treeID, err := repo.SaveTree(gopts.ctx, tree)
-	if err != nil {
-		return errors.Fatalf("unable to save new tree to the repo: %v", err)
-	}
+	wg, ctx := errgroup.WithContext(gopts.ctx)
+	repo.StartPackUploader(ctx, wg)
 
-	err = repo.Flush(gopts.ctx)
+	var treeID restic.ID
+	wg.Go(func() error {
+		var err error
+		treeID, err = repo.SaveTree(ctx, tree)
+		if err != nil {
+			return errors.Fatalf("unable to save new tree to the repo: %v", err)
+		}
+
+		err = repo.Flush(ctx)
+		if err != nil {
+			return errors.Fatalf("unable to save blobs to the repo: %v", err)
+		}
+		return nil
+	})
+	err = wg.Wait()
 	if err != nil {
-		return errors.Fatalf("unable to save blobs to the repo: %v", err)
+		return err
 	}
 
 	return createSnapshot(gopts.ctx, "/recover", hostname, []string{"recovered"}, repo, &treeID)
diff --git a/internal/archiver/archiver.go b/internal/archiver/archiver.go
index 43153030a..90b9e6904 100644
--- a/internal/archiver/archiver.go
+++ b/internal/archiver/archiver.go
@@ -801,40 +801,47 @@ func (arch *Archiver) Snapshot(ctx context.Context, targets []string, opts Snaps
 		return nil, restic.ID{}, err
 	}
 
-	wg, wgCtx := errgroup.WithContext(ctx)
-	start := time.Now()
-
 	var rootTreeID restic.ID
-	var stats ItemStats
-	wg.Go(func() error {
-		arch.runWorkers(wgCtx, wg)
-		debug.Log("starting snapshot")
-		tree, err := arch.SaveTree(wgCtx, "/", atree, arch.loadParentTree(wgCtx, opts.ParentSnapshot))
+	wgUp, wgUpCtx := errgroup.WithContext(ctx)
+	arch.Repo.StartPackUploader(wgUpCtx, wgUp)
+
+	wgUp.Go(func() error {
+		wg, wgCtx := errgroup.WithContext(wgUpCtx)
+		start := time.Now()
+
+		var stats ItemStats
+		wg.Go(func() error {
+			arch.runWorkers(wgCtx, wg)
+
+			debug.Log("starting snapshot")
+			tree, err := arch.SaveTree(wgCtx, "/", atree, arch.loadParentTree(wgCtx, opts.ParentSnapshot))
+			if err != nil {
+				return err
+			}
+
+			if len(tree.Nodes) == 0 {
+				return errors.New("snapshot is empty")
+			}
+
+			rootTreeID, stats, err = arch.saveTree(wgCtx, tree)
+			arch.stopWorkers()
+			return err
+		})
+
+		err = wg.Wait()
+		debug.Log("err is %v", err)
+		if err != nil {
+			debug.Log("error while saving tree: %v", err)
 			return err
 		}
 
-		if len(tree.Nodes) == 0 {
-			return errors.New("snapshot is empty")
-		}
+		arch.CompleteItem("/", nil, nil, stats, time.Since(start))
 
-		rootTreeID, stats, err = arch.saveTree(wgCtx, tree)
-		arch.stopWorkers()
-		return err
+		return arch.Repo.Flush(ctx)
 	})
-
-	err = wg.Wait()
-	debug.Log("err is %v", err)
-
-	if err != nil {
-		debug.Log("error while saving tree: %v", err)
-		return nil, restic.ID{}, err
-	}
-
-	arch.CompleteItem("/", nil, nil, stats, time.Since(start))
-
-	err = arch.Repo.Flush(ctx)
+	err = wgUp.Wait()
 	if err != nil {
 		return nil, restic.ID{}, err
 	}
diff --git a/internal/archiver/archiver_test.go b/internal/archiver/archiver_test.go
index dfc24b27b..1ae126014 100644
--- a/internal/archiver/archiver_test.go
+++ b/internal/archiver/archiver_test.go
@@ -42,6 +42,7 @@ func prepareTempdirRepoSrc(t testing.TB, src TestDir) (tempdir string, repo rest
 
 func saveFile(t testing.TB, repo restic.Repository, filename string, filesystem fs.FS) (*restic.Node, ItemStats) {
 	wg, ctx := errgroup.WithContext(context.TODO())
+	repo.StartPackUploader(ctx, wg)
 
 	arch := New(repo, filesystem, Options{})
 	arch.runWorkers(ctx, wg)
@@ -213,6 +214,7 @@ func TestArchiverSave(t *testing.T) {
 			defer cleanup()
 
 			wg, ctx := errgroup.WithContext(ctx)
+			repo.StartPackUploader(ctx, wg)
 
 			arch := New(repo, fs.Track{FS: fs.Local{}}, Options{})
 			arch.Error = func(item string, fi os.FileInfo, err error) error {
@@ -281,6 +283,7 @@ func TestArchiverSaveReaderFS(t *testing.T) {
 			defer cleanup()
 
 			wg, ctx := errgroup.WithContext(ctx)
+			repo.StartPackUploader(ctx, wg)
 
 			ts := time.Now()
 			filename := "xx"
@@ -830,6 +833,7 @@ func TestArchiverSaveDir(t *testing.T) {
 			defer cleanup()
 
 			wg, ctx := errgroup.WithContext(context.Background())
+			repo.StartPackUploader(ctx, wg)
 
 			arch := New(repo, fs.Track{FS: fs.Local{}}, Options{})
 			arch.runWorkers(ctx, wg)
@@ -914,6 +918,7 @@ func TestArchiverSaveDirIncremental(t *testing.T) {
 	// archiver did save the same tree several times
 	for i := 0; i < 5; i++ {
 		wg, ctx := errgroup.WithContext(context.TODO())
+		repo.StartPackUploader(ctx, wg)
 
 		arch := New(repo, fs.Track{FS: fs.Local{}}, Options{})
 		arch.runWorkers(ctx, wg)
@@ -1097,6 +1102,7 @@ func TestArchiverSaveTree(t *testing.T) {
 			}
 
 			wg, ctx := errgroup.WithContext(context.TODO())
+			repo.StartPackUploader(ctx, wg)
 
 			arch.runWorkers(ctx, wg)
 
@@ -2239,6 +2245,7 @@ func TestRacyFileSwap(t *testing.T) {
 	defer cancel()
 
 	wg, ctx := errgroup.WithContext(ctx)
+	repo.StartPackUploader(ctx, wg)
 
 	arch := New(repo, fs.Track{FS: statfs}, Options{})
 	arch.Error = func(item string, fi os.FileInfo, err error) error {
diff --git a/internal/checker/checker_test.go b/internal/checker/checker_test.go
index f658613c3..a4245d574 100644
--- a/internal/checker/checker_test.go
+++ b/internal/checker/checker_test.go
@@ -20,6 +20,7 @@ import (
 	"github.com/restic/restic/internal/repository"
 	"github.com/restic/restic/internal/restic"
 	"github.com/restic/restic/internal/test"
+	"golang.org/x/sync/errgroup"
 )
 
 var checkerTestData = filepath.Join("testdata", "checker-test-repo.tar.gz")
@@ -476,6 +477,8 @@ func TestCheckerBlobTypeConfusion(t *testing.T) {
 		Nodes: []*restic.Node{damagedNode},
 	}
 
+	wg, wgCtx := errgroup.WithContext(ctx)
+	repo.StartPackUploader(wgCtx, wg)
 	id, err := repo.SaveTree(ctx, damagedTree)
 	test.OK(t, repo.Flush(ctx))
 	test.OK(t, err)
@@ -483,6 +486,8 @@
 	buf, err := repo.LoadBlob(ctx, restic.TreeBlob, id, nil)
 	test.OK(t, err)
 
+	wg, wgCtx = errgroup.WithContext(ctx)
+	repo.StartPackUploader(wgCtx, wg)
 	_, _, _, err = repo.SaveBlob(ctx, restic.DataBlob, buf, id, false)
 	test.OK(t, err)
diff --git a/internal/pack/pack.go b/internal/pack/pack.go
index cf0093e70..1d991ccb5 100644
--- a/internal/pack/pack.go
+++ b/internal/pack/pack.go
@@ -70,14 +70,13 @@ type compressedHeaderEntry struct {
 }
 
 // Finalize writes the header for all added blobs and finalizes the pack.
-// Returned are the number of bytes written, not yet reported by Add.
-func (p *Packer) Finalize() (int, error) {
+func (p *Packer) Finalize() error {
 	p.m.Lock()
 	defer p.m.Unlock()
 
 	header, err := p.makeHeader()
 	if err != nil {
-		return 0, err
+		return err
 	}
 
 	encryptedHeader := make([]byte, 0, restic.CiphertextLength(len(header)))
@@ -88,22 +87,27 @@
 	// append the header
 	n, err := p.wr.Write(encryptedHeader)
 	if err != nil {
-		return 0, errors.Wrap(err, "Write")
+		return errors.Wrap(err, "Write")
 	}
 
 	hdrBytes := len(encryptedHeader)
 	if n != hdrBytes {
-		return 0, errors.New("wrong number of bytes written")
+		return errors.New("wrong number of bytes written")
 	}
 
 	// write length
 	err = binary.Write(p.wr, binary.LittleEndian, uint32(hdrBytes))
 	if err != nil {
-		return 0, errors.Wrap(err, "binary.Write")
+		return errors.Wrap(err, "binary.Write")
 	}
 	p.bytes += uint(hdrBytes + binary.Size(uint32(0)))
 
-	return restic.CiphertextLength(0) + binary.Size(uint32(0)), nil
+	return nil
+}
+
+// HeaderOverhead returns an estimate of the number of bytes written by a call to Finalize.
+func (p *Packer) HeaderOverhead() int {
+	return restic.CiphertextLength(0) + binary.Size(uint32(0))
 }
 
 // makeHeader constructs the header for p.
diff --git a/internal/pack/pack_test.go b/internal/pack/pack_test.go
index 6170e807c..7c8250613 100644
--- a/internal/pack/pack_test.go
+++ b/internal/pack/pack_test.go
@@ -42,7 +42,7 @@ func newPack(t testing.TB, k *crypto.Key, lengths []int) ([]Buf, []byte, uint) {
 		rtest.OK(t, err)
 	}
 
-	_, err := p.Finalize()
+	err := p.Finalize()
 	rtest.OK(t, err)
 
 	return bufs, buf.Bytes(), p.Size()
diff --git a/internal/repository/fuzz_test.go b/internal/repository/fuzz_test.go
index 3847f37f7..7a98477b6 100644
--- a/internal/repository/fuzz_test.go
+++ b/internal/repository/fuzz_test.go
@@ -9,6 +9,7 @@ import (
 
 	"github.com/restic/restic/internal/backend/mem"
 	"github.com/restic/restic/internal/restic"
+	"golang.org/x/sync/errgroup"
 )
 
 // Test saving a blob and loading it again, with varying buffer sizes.
@@ -23,6 +24,9 @@ func FuzzSaveLoadBlob(f *testing.F) {
 		id := restic.Hash(blob)
 		repo, _ := TestRepositoryWithBackend(t, mem.New(), 2)
 
+		var wg errgroup.Group
+		repo.StartPackUploader(context.TODO(), &wg)
+
 		_, _, _, err := repo.SaveBlob(context.TODO(), restic.DataBlob, blob, id, false)
 		if err != nil {
 			t.Fatal(err)
diff --git a/internal/repository/packer_manager.go b/internal/repository/packer_manager.go
index c7acec7f1..e3b43b24e 100644
--- a/internal/repository/packer_manager.go
+++ b/internal/repository/packer_manager.go
@@ -20,12 +20,6 @@ import (
 	"github.com/minio/sha256-simd"
 )
 
-// Saver implements saving data in a backend.
-type Saver interface {
-	Save(context.Context, restic.Handle, restic.RewindReader) error
-	Hasher() hash.Hash
-}
-
 // Packer holds a pack.Packer together with a hash writer.
 type Packer struct {
 	*pack.Packer
@@ -36,8 +30,11 @@ type Packer struct {
 
 // packerManager keeps a list of open packs and creates new on demand.
 type packerManager struct {
-	be  Saver
-	key *crypto.Key
+	tpe      restic.BlobType
+	key      *crypto.Key
+	hasherFn func() hash.Hash
+	queueFn  func(ctx context.Context, t restic.BlobType, p *Packer) error
+
 	pm      sync.Mutex
 	packers []*Packer
 }
@@ -46,13 +43,58 @@ const minPackSize = 4 * 1024 * 1024
 
 // newPackerManager returns an new packer manager which writes temporary files
 // to a temporary directory
-func newPackerManager(be Saver, key *crypto.Key) *packerManager {
+func newPackerManager(key *crypto.Key, hasherFn func() hash.Hash, tpe restic.BlobType, queueFn func(ctx context.Context, t restic.BlobType, p *Packer) error) *packerManager {
 	return &packerManager{
-		be:  be,
-		key: key,
+		tpe:      tpe,
+		key:      key,
+		hasherFn: hasherFn,
+		queueFn:  queueFn,
 	}
 }
 
+func (r *packerManager) Flush(ctx context.Context) error {
+	r.pm.Lock()
+	defer r.pm.Unlock()
+
+	debug.Log("manually flushing %d packs", len(r.packers))
+	for _, packer := range r.packers {
+		err := r.queueFn(ctx, r.tpe, packer)
+		if err != nil {
+			return err
+		}
+	}
+	r.packers = r.packers[:0]
+	return nil
+}
+
+func (r *packerManager) SaveBlob(ctx context.Context, t restic.BlobType, id restic.ID, ciphertext []byte, uncompressedLength int) (int, error) {
+	packer, err := r.findPacker()
+	if err != nil {
+		return 0, err
+	}
+
+	// save ciphertext
+	size, err := packer.Add(t, id, ciphertext, uncompressedLength)
+	if err != nil {
+		return 0, err
+	}
+
+	// if the pack is not full enough, put back to the list
+	if packer.Size() < minPackSize {
+		debug.Log("pack is not full enough (%d bytes)", packer.Size())
+		r.insertPacker(packer)
+		return size, nil
+	}
+
+	// else write the pack to the backend
+	err = r.queueFn(ctx, t, packer)
+	if err != nil {
+		return 0, err
+	}
+
+	return size + packer.HeaderOverhead(), nil
+}
+
 // findPacker returns a packer for a new blob of size bytes. Either a new one is
 // created or one is returned that already has some blobs.
 func (r *packerManager) findPacker() (packer *Packer, err error) {
@@ -77,7 +119,7 @@
 	}
 
 	w := io.Writer(tmpfile)
-	beHasher := r.be.Hasher()
+	beHasher := r.hasherFn()
 	var beHw *hashing.Writer
 	if beHasher != nil {
 		beHw = hashing.NewWriter(w, beHasher)
@@ -106,11 +148,11 @@ func (r *packerManager) insertPacker(p *Packer) {
 }
 
 // savePacker stores p in the backend.
-func (r *Repository) savePacker(ctx context.Context, t restic.BlobType, p *Packer) (int, error) {
+func (r *Repository) savePacker(ctx context.Context, t restic.BlobType, p *Packer) error {
 	debug.Log("save packer for %v with %d blobs (%d bytes)\n", t, p.Packer.Count(), p.Packer.Size())
-	hdrOverhead, err := p.Packer.Finalize()
+	err := p.Packer.Finalize()
 	if err != nil {
-		return 0, err
+		return err
 	}
 
 	id := restic.IDFromHash(p.hw.Sum(nil))
@@ -122,27 +164,27 @@ func (r *Repository) savePacker(ctx context.Context, t restic.BlobType, p *Packe
 	}
 	rd, err := restic.NewFileReader(p.tmpfile, beHash)
 	if err != nil {
-		return 0, err
+		return err
 	}
 
 	err = r.be.Save(ctx, h, rd)
 	if err != nil {
 		debug.Log("Save(%v) error: %v", h, err)
-		return 0, err
+		return err
 	}
 
 	debug.Log("saved as %v", h)
 
 	err = p.tmpfile.Close()
 	if err != nil {
-		return 0, errors.Wrap(err, "close tempfile")
+		return errors.Wrap(err, "close tempfile")
 	}
 
 	// on windows the tempfile is automatically deleted on close
 	if runtime.GOOS != "windows" {
 		err = fs.RemoveIfExists(p.tmpfile.Name())
 		if err != nil {
-			return 0, errors.Wrap(err, "Remove")
+			return errors.Wrap(err, "Remove")
 		}
 	}
 
@@ -152,9 +194,9 @@ func (r *Repository) savePacker(ctx context.Context, t restic.BlobType, p *Packe
 
 	// Save index if full
 	if r.noAutoIndexUpdate {
-		return hdrOverhead, nil
+		return nil
 	}
-	return hdrOverhead, r.idx.SaveFullIndex(ctx, r)
+	return r.idx.SaveFullIndex(ctx, r)
 }
 
 // countPacker returns the number of open (unfinished) packers.
diff --git a/internal/repository/packer_manager_test.go b/internal/repository/packer_manager_test.go
index 633e1be35..8713aad4e 100644
--- a/internal/repository/packer_manager_test.go
+++ b/internal/repository/packer_manager_test.go
@@ -2,6 +2,7 @@ package repository
 
 import (
 	"context"
+	"hash"
 	"io"
 	"math/rand"
 	"os"
@@ -15,6 +16,12 @@ import (
 	"github.com/restic/restic/internal/restic"
 )
 
+// Saver implements saving data in a backend.
+type Saver interface {
+	Save(context.Context, restic.Handle, restic.RewindReader) error
+	Hasher() hash.Hash
+}
+
 func randomID(rd io.Reader) restic.ID {
 	id := restic.ID{}
 	_, err := io.ReadFull(rd, id[:])
@@ -84,7 +91,7 @@ func fillPacks(t testing.TB, rnd *rand.Rand, be Saver, pm *packerManager, buf []
 			continue
 		}
 
-		_, err = packer.Finalize()
+		err = packer.Finalize()
 		if err != nil {
 			t.Fatal(err)
 		}
@@ -103,11 +110,11 @@ func flushRemainingPacks(t testing.TB, be Saver, pm *packerManager) (bytes int) {
 	if pm.countPacker() > 0 {
 		for _, packer := range pm.packers {
-			n, err := packer.Finalize()
+			err := packer.Finalize()
 			if err != nil {
 				t.Fatal(err)
 			}
-			bytes += n
+			bytes += packer.HeaderOverhead()
 
 			packID := restic.IDFromHash(packer.hw.Sum(nil))
 			var beHash []byte
@@ -137,7 +144,7 @@ func testPackerManager(t testing.TB) int64 {
 	rnd := rand.New(rand.NewSource(randomSeed))
 
 	be := mem.New()
-	pm := newPackerManager(be, crypto.NewRandomKey())
+	pm := newPackerManager(crypto.NewRandomKey(), be.Hasher, restic.DataBlob, nil)
 
 	blobBuf := make([]byte, maxBlobSize)
 
@@ -167,7 +174,7 @@ func BenchmarkPackerManager(t *testing.B) {
 
 	for i := 0; i < t.N; i++ {
 		rnd.Seed(randomSeed)
-		pm := newPackerManager(be, crypto.NewRandomKey())
+		pm := newPackerManager(crypto.NewRandomKey(), be.Hasher, restic.DataBlob, nil)
 		fillPacks(t, rnd, be, pm, blobBuf)
 		flushRemainingPacks(t, be, pm)
 	}
diff --git a/internal/repository/packer_uploader.go b/internal/repository/packer_uploader.go
new file mode 100644
index 000000000..30c8f77af
--- /dev/null
+++ b/internal/repository/packer_uploader.go
@@ -0,0 +1,63 @@
+package repository
+
+import (
+	"context"
+
+	"github.com/restic/restic/internal/restic"
+	"golang.org/x/sync/errgroup"
+)
+
+// SavePacker implements saving a pack in the repository.
+type SavePacker interface {
+	savePacker(ctx context.Context, t restic.BlobType, p *Packer) error
+}
+
+type uploadTask struct {
+	packer *Packer
+	tpe    restic.BlobType
+}
+
+type packerUploader struct {
+	uploadQueue chan uploadTask
+}
+
+func newPackerUploader(ctx context.Context, wg *errgroup.Group, repo SavePacker, connections uint) *packerUploader {
+	pu := &packerUploader{
+		uploadQueue: make(chan uploadTask),
+	}
+
+	for i := 0; i < int(connections); i++ {
+		wg.Go(func() error {
+			for {
+				select {
+				case t, ok := <-pu.uploadQueue:
+					if !ok {
+						return nil
+					}
+					err := repo.savePacker(ctx, t.tpe, t.packer)
+					if err != nil {
+						return err
+					}
+				case <-ctx.Done():
+					return ctx.Err()
+				}
+			}
+		})
+	}
+
+	return pu
+}
+
+func (pu *packerUploader) QueuePacker(ctx context.Context, t restic.BlobType, p *Packer) (err error) {
+	select {
+	case <-ctx.Done():
+		return ctx.Err()
+	case pu.uploadQueue <- uploadTask{tpe: t, packer: p}:
+	}
+
+	return nil
+}
+
+func (pu *packerUploader) TriggerShutdown() {
+	close(pu.uploadQueue)
+}
diff --git a/internal/repository/repack.go b/internal/repository/repack.go
index 80902c11c..7840e714a 100644
--- a/internal/repository/repack.go
+++ b/internal/repository/repack.go
@@ -28,9 +28,25 @@ func Repack(ctx context.Context, repo restic.Repository, dstRepo restic.Reposito
 		return nil, errors.Fatal("repack step requires a backend connection limit of at least two")
 	}
 
-	var keepMutex sync.Mutex
 	wg, wgCtx := errgroup.WithContext(ctx)
 
+	dstRepo.StartPackUploader(wgCtx, wg)
+	wg.Go(func() error {
+		var err error
+		obsoletePacks, err = repack(wgCtx, repo, dstRepo, packs, keepBlobs, p)
+		return err
+	})
+
+	if err := wg.Wait(); err != nil {
+		return nil, err
+	}
+	return obsoletePacks, nil
+}
+
+func repack(ctx context.Context, repo restic.Repository, dstRepo restic.Repository, packs restic.IDSet, keepBlobs restic.BlobSet, p *progress.Counter) (obsoletePacks restic.IDSet, err error) {
+	wg, wgCtx := errgroup.WithContext(ctx)
+
+	var keepMutex sync.Mutex
 	downloadQueue := make(chan restic.PackBlobs)
 	wg.Go(func() error {
 		defer close(downloadQueue)
diff --git a/internal/repository/repack_test.go b/internal/repository/repack_test.go
index 248477292..9389d643e 100644
--- a/internal/repository/repack_test.go
+++ b/internal/repository/repack_test.go
@@ -8,6 +8,7 @@ import (
 
 	"github.com/restic/restic/internal/repository"
 	"github.com/restic/restic/internal/restic"
+	"golang.org/x/sync/errgroup"
 )
 
 func randomSize(min, max int) int {
@@ -15,6 +16,9 @@
 }
 
 func createRandomBlobs(t testing.TB, repo restic.Repository, blobs int, pData float32) {
+	var wg errgroup.Group
+	repo.StartPackUploader(context.TODO(), &wg)
+
 	for i := 0; i < blobs; i++ {
 		var (
 			tpe restic.BlobType
@@ -46,6 +50,7 @@ func createRandomBlobs(t testing.TB, repo restic.Repository, blobs int, pData fl
 			if err = repo.Flush(context.Background()); err != nil {
 				t.Fatalf("repo.Flush() returned error %v", err)
 			}
+			repo.StartPackUploader(context.TODO(), &wg)
 		}
 	}
 
@@ -62,6 +67,8 @@ func createRandomWrongBlob(t testing.TB, repo restic.Repository) {
 	// invert first data byte
 	buf[0] ^= 0xff
 
+	var wg errgroup.Group
+	repo.StartPackUploader(context.TODO(), &wg)
 	_, _, _, err := repo.SaveBlob(context.TODO(), restic.DataBlob, buf, id, false)
 	if err != nil {
 		t.Fatalf("SaveFrom() error %v", err)
diff --git a/internal/repository/repository.go b/internal/repository/repository.go
index 3ecf853d2..7f42dbdd5 100644
--- a/internal/repository/repository.go
+++ b/internal/repository/repository.go
@@ -41,8 +41,10 @@
 type Repository struct {
 	noAutoIndexUpdate bool
 
-	treePM *packerManager
-	dataPM *packerManager
+	packerWg *errgroup.Group
+	uploader *packerUploader
+	treePM   *packerManager
+	dataPM   *packerManager
 
 	allocEnc sync.Once
 	allocDec sync.Once
@@ -100,11 +102,9 @@ func (c *CompressionMode) Type() string {
 // New returns a new repository with backend be.
 func New(be restic.Backend, opts Options) *Repository {
 	repo := &Repository{
-		be:     be,
-		opts:   opts,
-		idx:    NewMasterIndex(),
-		dataPM: newPackerManager(be, nil),
-		treePM: newPackerManager(be, nil),
+		be:   be,
+		opts: opts,
+		idx:  NewMasterIndex(),
 	}
 
 	return repo
@@ -416,31 +416,7 @@ func (r *Repository) saveAndEncrypt(ctx context.Context, t restic.BlobType, data
 		panic(fmt.Sprintf("invalid type: %v", t))
 	}
 
-	packer, err := pm.findPacker()
-	if err != nil {
-		return 0, err
-	}
-
-	// save ciphertext
-	size, err = packer.Add(t, id, ciphertext, uncompressedLength)
-	if err != nil {
-		return 0, err
-	}
-
-	// if the pack is not full enough, put back to the list
-	if packer.Size() < minPackSize {
-		debug.Log("pack is not full enough (%d bytes)", packer.Size())
-		pm.insertPacker(packer)
-		return size, nil
-	}
-
-	// else write the pack to the backend
-	hdrSize, err := r.savePacker(ctx, t, packer)
-	if err != nil {
-		return 0, err
-	}
-
-	return size + hdrSize, nil
+	return pm.SaveBlob(ctx, t, id, ciphertext, uncompressedLength)
 }
 
 // SaveJSONUnpacked serialises item as JSON and encrypts and saves it in the
@@ -536,31 +512,45 @@ func (r *Repository) Flush(ctx context.Context) error {
 	return r.idx.SaveIndex(ctx, r)
 }
 
-// flushPacks saves all remaining packs.
+func (r *Repository) StartPackUploader(ctx context.Context, wg *errgroup.Group) {
+	if r.packerWg != nil {
+		panic("uploader already started")
+	}
+
+	innerWg, ctx := errgroup.WithContext(ctx)
+	r.packerWg = innerWg
+	r.uploader = newPackerUploader(ctx, innerWg, r, r.be.Connections())
+	r.treePM = newPackerManager(r.key, r.be.Hasher, restic.TreeBlob, r.uploader.QueuePacker)
+	r.dataPM = newPackerManager(r.key, r.be.Hasher, restic.DataBlob, r.uploader.QueuePacker)
+
+	wg.Go(func() error {
+		return innerWg.Wait()
+	})
+}
+
+// FlushPacks saves all remaining packs.
 func (r *Repository) flushPacks(ctx context.Context) error {
-	pms := []struct {
-		t  restic.BlobType
-		pm *packerManager
-	}{
-		{restic.DataBlob, r.dataPM},
-		{restic.TreeBlob, r.treePM},
+	if r.packerWg == nil {
+		return nil
 	}
 
-	for _, p := range pms {
-		p.pm.pm.Lock()
-
-		debug.Log("manually flushing %d packs", len(p.pm.packers))
-		for _, packer := range p.pm.packers {
-			_, err := r.savePacker(ctx, p.t, packer)
-			if err != nil {
-				p.pm.pm.Unlock()
-				return err
-			}
-		}
-		p.pm.packers = p.pm.packers[:0]
-		p.pm.pm.Unlock()
+	err := r.treePM.Flush(ctx)
+	if err != nil {
+		return err
 	}
-	return nil
+	err = r.dataPM.Flush(ctx)
+	if err != nil {
+		return err
+	}
+	r.uploader.TriggerShutdown()
+	err = r.packerWg.Wait()
+
+	r.treePM = nil
+	r.dataPM = nil
+	r.uploader = nil
+	r.packerWg = nil
+
+	return err
 }
 
 // Backend returns the backend for the repository.
@@ -715,8 +705,6 @@ func (r *Repository) SearchKey(ctx context.Context, password string, maxKeys int
 	}
 
 	r.key = key.master
-	r.dataPM.key = key.master
-	r.treePM.key = key.master
 	r.keyName = key.Name()
 	cfg, err := restic.LoadConfig(ctx, r)
 	if err == crypto.ErrUnauthenticated {
@@ -768,8 +756,6 @@ func (r *Repository) init(ctx context.Context, password string, cfg restic.Confi
 	}
 
 	r.key = key.master
-	r.dataPM.key = key.master
-	r.treePM.key = key.master
 	r.keyName = key.Name()
 	r.setConfig(cfg)
 	_, err = r.SaveJSONUnpacked(ctx, restic.ConfigFile, cfg)
diff --git a/internal/repository/repository_test.go b/internal/repository/repository_test.go
index f0e25f520..38d3117a5 100644
--- a/internal/repository/repository_test.go
+++ b/internal/repository/repository_test.go
@@ -22,6 +22,7 @@ import (
 	"github.com/restic/restic/internal/restic"
 	"github.com/restic/restic/internal/test"
 	rtest "github.com/restic/restic/internal/test"
+	"golang.org/x/sync/errgroup"
 )
 
 var testSizes = []int{5, 23, 2<<18 + 23, 1 << 20}
@@ -43,6 +44,9 @@ func testSave(t *testing.T, version uint) {
 
 	id := restic.Hash(data)
 
+	var wg errgroup.Group
+	repo.StartPackUploader(context.TODO(), &wg)
+
 	// save
 	sid, _, _, err := repo.SaveBlob(context.TODO(), restic.DataBlob, data, restic.ID{}, false)
 	rtest.OK(t, err)
@@ -82,6 +86,9 @@ func testSaveFrom(t *testing.T, version uint) {
 
 	id := restic.Hash(data)
 
+	var wg errgroup.Group
+	repo.StartPackUploader(context.TODO(), &wg)
+
 	// save
 	id2, _, _, err := repo.SaveBlob(context.TODO(), restic.DataBlob, data, id, false)
 	rtest.OK(t, err)
@@ -187,6 +194,9 @@ func testLoadBlob(t *testing.T, version uint) {
 	_, err := io.ReadFull(rnd, buf)
 	rtest.OK(t, err)
 
+	var wg errgroup.Group
+	repo.StartPackUploader(context.TODO(), &wg)
+
 	id, _, _, err := repo.SaveBlob(context.TODO(), restic.DataBlob, buf, restic.ID{}, false)
 	rtest.OK(t, err)
 	rtest.OK(t, repo.Flush(context.Background()))
@@ -220,6 +230,9 @@ func benchmarkLoadBlob(b *testing.B, version uint) {
 	_, err := io.ReadFull(rnd, buf)
 	rtest.OK(b, err)
 
+	var wg errgroup.Group
+	repo.StartPackUploader(context.TODO(), &wg)
+
 	id, _, _, err := repo.SaveBlob(context.TODO(), restic.DataBlob, buf, restic.ID{}, false)
 	rtest.OK(b, err)
 	rtest.OK(b, repo.Flush(context.Background()))
@@ -389,6 +402,9 @@ func benchmarkLoadIndex(b *testing.B, version uint) {
 
 // saveRandomDataBlobs generates random data blobs and saves them to the repository.
 func saveRandomDataBlobs(t testing.TB, repo restic.Repository, num int, sizeMax int) {
+	var wg errgroup.Group
+	repo.StartPackUploader(context.TODO(), &wg)
+
 	for i := 0; i < num; i++ {
 		size := rand.Int() % sizeMax
 
diff --git a/internal/restic/repository.go b/internal/restic/repository.go
index 35fdbabcb..73f53adb1 100644
--- a/internal/restic/repository.go
+++ b/internal/restic/repository.go
@@ -5,6 +5,7 @@ import (
 
 	"github.com/restic/restic/internal/crypto"
 	"github.com/restic/restic/internal/ui/progress"
+	"golang.org/x/sync/errgroup"
 )
 
 // Repository stores data in a backend. It provides high-level functions and
@@ -34,6 +35,10 @@ type Repository interface {
 	// the the pack header.
 	ListPack(context.Context, ID, int64) ([]Blob, uint32, error)
 
+	// StartPackUploader start goroutines to upload new pack files. The errgroup
+	// is used to immediately notify about an upload error. Flush() will also return
+	// that error.
+	StartPackUploader(ctx context.Context, wg *errgroup.Group)
 	Flush(context.Context) error
 
 	SaveUnpacked(context.Context, FileType, []byte) (ID, error)
diff --git a/internal/restic/testing.go b/internal/restic/testing.go
index 54621c183..cc5e7d890 100644
--- a/internal/restic/testing.go
+++ b/internal/restic/testing.go
@@ -10,6 +10,7 @@ import (
 	"time"
 
 	"github.com/restic/chunker"
+	"golang.org/x/sync/errgroup"
 )
 
 // fakeFile returns a reader which yields deterministic pseudo-random data.
@@ -169,9 +170,17 @@ func TestCreateSnapshot(t testing.TB, repo Repository, at time.Time, depth int,
 		rand: rand.New(rand.NewSource(seed)),
 	}
 
+	var wg errgroup.Group
+	repo.StartPackUploader(context.TODO(), &wg)
+
 	treeID := fs.saveTree(context.TODO(), seed, depth)
 	snapshot.Tree = &treeID
 
+	err = repo.Flush(context.Background())
+	if err != nil {
+		t.Fatal(err)
+	}
+
 	id, err := repo.SaveJSONUnpacked(context.TODO(), SnapshotFile, snapshot)
 	if err != nil {
 		t.Fatal(err)
@@ -181,11 +190,6 @@ func TestCreateSnapshot(t testing.TB, repo Repository, at time.Time, depth int,
 
 	t.Logf("saved snapshot %v", id.Str())
 
-	err = repo.Flush(context.Background())
-	if err != nil {
-		t.Fatal(err)
-	}
-
 	return snapshot
 }
diff --git a/internal/restic/tree_test.go b/internal/restic/tree_test.go
index 598a0ad4b..0f9b9a9d3 100644
--- a/internal/restic/tree_test.go
+++ b/internal/restic/tree_test.go
@@ -12,6 +12,7 @@ import (
 	"github.com/restic/restic/internal/repository"
 	"github.com/restic/restic/internal/restic"
 	rtest "github.com/restic/restic/internal/test"
+	"golang.org/x/sync/errgroup"
 )
 
 var testFiles = []struct {
@@ -98,6 +99,8 @@ func TestLoadTree(t *testing.T) {
 	repo, cleanup := repository.TestRepository(t)
 	defer cleanup()
 
+	var wg errgroup.Group
+	repo.StartPackUploader(context.TODO(), &wg)
 	// save tree
 	tree := restic.NewTree(0)
 	id, err := repo.SaveTree(context.TODO(), tree)
diff --git a/internal/restorer/restorer_test.go b/internal/restorer/restorer_test.go
index 7e7e0c4c6..512c144ab 100644
--- a/internal/restorer/restorer_test.go
+++ b/internal/restorer/restorer_test.go
@@ -15,6 +15,7 @@ import (
 	"github.com/restic/restic/internal/repository"
 	"github.com/restic/restic/internal/restic"
 	rtest "github.com/restic/restic/internal/test"
+	"golang.org/x/sync/errgroup"
 )
 
 type Node interface{}
@@ -122,8 +123,9 @@ func saveSnapshot(t testing.TB, repo restic.Repository, snapshot Snapshot) (*res
 	ctx, cancel := context.WithCancel(context.Background())
 	defer cancel()
 
+	wg, wgCtx := errgroup.WithContext(ctx)
+	repo.StartPackUploader(wgCtx, wg)
 	treeID := saveDir(t, repo, snapshot.Nodes, 1000)
-
 	err := repo.Flush(ctx)
 	if err != nil {
 		t.Fatal(err)
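
A minimal usage sketch of the calling pattern this patch introduces, mirroring the test changes above: start the pack uploader goroutines, save blobs, flush, then wait on the errgroup so upload errors surface. It is not part of the patch; the helper name saveOneBlob and the surrounding test plumbing are assumptions for illustration only.

package example_test

import (
	"context"
	"testing"

	"github.com/restic/restic/internal/restic"
	"golang.org/x/sync/errgroup"
)

// saveOneBlob is a hypothetical helper showing the workflow established by
// the patch: StartPackUploader must be called before SaveBlob, and Flush
// queues the remaining partially filled packs for upload.
func saveOneBlob(t *testing.T, repo restic.Repository, buf []byte) restic.ID {
	var wg errgroup.Group
	repo.StartPackUploader(context.TODO(), &wg)

	id, _, _, err := repo.SaveBlob(context.TODO(), restic.DataBlob, buf, restic.ID{}, false)
	if err != nil {
		t.Fatal(err)
	}
	if err := repo.Flush(context.Background()); err != nil {
		t.Fatal(err)
	}
	// Any error from the background upload goroutines is also reported here.
	if err := wg.Wait(); err != nil {
		t.Fatal(err)
	}
	return id
}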