From 537b4c310a0c37a4dd308e229aaeb9ed312265d5 Mon Sep 17 00:00:00 2001 From: Michael Eischer Date: Sun, 12 Sep 2021 00:03:41 +0200 Subject: [PATCH] copy: Implement by reusing repack The repack operation copies all selected blobs from a set of pack files into new pack files. For prune the source and destination repositories are identical. To implement copy, just use a different source and destination repository. --- changelog/unreleased/pull-3513 | 10 ++++ cmd/restic/cmd_copy.go | 86 +++++++++++------------------- cmd/restic/cmd_prune.go | 2 +- internal/repository/repack.go | 8 +-- internal/repository/repack_test.go | 43 ++++++++++++++- 5 files changed, 86 insertions(+), 63 deletions(-) create mode 100644 changelog/unreleased/pull-3513 diff --git a/changelog/unreleased/pull-3513 b/changelog/unreleased/pull-3513 new file mode 100644 index 000000000..fe7717bd7 --- /dev/null +++ b/changelog/unreleased/pull-3513 @@ -0,0 +1,10 @@ +Enhancement: Improve speed of copy command + +The copy command could require a long time to copy snapshots for non-local +backends. This has been improved to provide a throughput comparable to the +restore command. + +In addition, the command now displays a progress bar. + +https://github.com/restic/restic/issues/2923 +https://github.com/restic/restic/pull/3513 diff --git a/cmd/restic/cmd_copy.go b/cmd/restic/cmd_copy.go index 4d7a192e0..048210ba8 100644 --- a/cmd/restic/cmd_copy.go +++ b/cmd/restic/cmd_copy.go @@ -5,6 +5,7 @@ import ( "fmt" "github.com/restic/restic/internal/debug" + "github.com/restic/restic/internal/repository" "github.com/restic/restic/internal/restic" "golang.org/x/sync/errgroup" @@ -131,17 +132,11 @@ func runCopy(opts CopyOptions, gopts GlobalOptions, args []string) error { } } Verbosef(" copy started, this may take a while...\n") - - if err := copyTree(ctx, srcRepo, dstRepo, visitedTrees, *sn.Tree); err != nil { + if err := copyTree(ctx, srcRepo, dstRepo, visitedTrees, *sn.Tree, gopts.Quiet); err != nil { return err } debug.Log("tree copied") - if err = dstRepo.Flush(ctx); err != nil { - return err - } - debug.Log("flushed packs and saved index") - // save snapshot sn.Parent = nil // Parent does not have relevance in the new repo. // Use Original as a persistent snapshot ID @@ -176,82 +171,61 @@ func similarSnapshots(sna *restic.Snapshot, snb *restic.Snapshot) bool { return true } -const numCopyWorkers = 8 - func copyTree(ctx context.Context, srcRepo restic.Repository, dstRepo restic.Repository, - visitedTrees restic.IDSet, rootTreeID restic.ID) error { + visitedTrees restic.IDSet, rootTreeID restic.ID, quiet bool) error { - idChan := make(chan restic.ID) - wg, ctx := errgroup.WithContext(ctx) + wg, wgCtx := errgroup.WithContext(ctx) - treeStream := restic.StreamTrees(ctx, wg, srcRepo, restic.IDs{rootTreeID}, func(treeID restic.ID) bool { + treeStream := restic.StreamTrees(wgCtx, wg, srcRepo, restic.IDs{rootTreeID}, func(treeID restic.ID) bool { visited := visitedTrees.Has(treeID) visitedTrees.Insert(treeID) return visited }, nil) + copyBlobs := restic.NewBlobSet() + packList := restic.NewIDSet() + + enqueue := func(h restic.BlobHandle) { + pb := srcRepo.Index().Lookup(h) + copyBlobs.Insert(h) + for _, p := range pb { + packList.Insert(p.PackID) + } + } + wg.Go(func() error { - defer close(idChan) - // reused buffer - var buf []byte for tree := range treeStream { if tree.Error != nil { return fmt.Errorf("LoadTree(%v) returned error %v", tree.ID.Str(), tree.Error) } // Do we already have this tree blob? - if !dstRepo.Index().Has(restic.BlobHandle{ID: tree.ID, Type: restic.TreeBlob}) { + treeHandle := restic.BlobHandle{ID: tree.ID, Type: restic.TreeBlob} + if !dstRepo.Index().Has(treeHandle) { // copy raw tree bytes to avoid problems if the serialization changes - var err error - buf, err = srcRepo.LoadBlob(ctx, restic.TreeBlob, tree.ID, buf) - if err != nil { - return fmt.Errorf("LoadBlob(%v) for tree returned error %v", tree.ID, err) - } - - _, _, err = dstRepo.SaveBlob(ctx, restic.TreeBlob, buf, tree.ID, false) - if err != nil { - return fmt.Errorf("SaveBlob(%v) for tree returned error %v", tree.ID.Str(), err) - } + enqueue(treeHandle) } for _, entry := range tree.Nodes { // Recursion into directories is handled by StreamTrees // Copy the blobs for this file. for _, blobID := range entry.Content { - select { - case idChan <- blobID: - case <-ctx.Done(): - return ctx.Err() + h := restic.BlobHandle{Type: restic.DataBlob, ID: blobID} + if !dstRepo.Index().Has(h) { + enqueue(h) } } } } return nil }) - - for i := 0; i < numCopyWorkers; i++ { - wg.Go(func() error { - // reused buffer - var buf []byte - for blobID := range idChan { - // Do we already have this data blob? - if dstRepo.Index().Has(restic.BlobHandle{ID: blobID, Type: restic.DataBlob}) { - continue - } - debug.Log("Copying blob %s\n", blobID.Str()) - var err error - buf, err = srcRepo.LoadBlob(ctx, restic.DataBlob, blobID, buf) - if err != nil { - return fmt.Errorf("LoadBlob(%v) returned error %v", blobID, err) - } - - _, _, err = dstRepo.SaveBlob(ctx, restic.DataBlob, buf, blobID, false) - if err != nil { - return fmt.Errorf("SaveBlob(%v) returned error %v", blobID, err) - } - } - return nil - }) + err := wg.Wait() + if err != nil { + return err } - return wg.Wait() + + bar := newProgressMax(!quiet, uint64(len(packList)), "packs copied") + _, err = repository.Repack(ctx, srcRepo, dstRepo, packList, copyBlobs, bar) + bar.Done() + return err } diff --git a/cmd/restic/cmd_prune.go b/cmd/restic/cmd_prune.go index 253e3704a..a03ae7474 100644 --- a/cmd/restic/cmd_prune.go +++ b/cmd/restic/cmd_prune.go @@ -500,7 +500,7 @@ func prune(opts PruneOptions, gopts GlobalOptions, repo restic.Repository, usedB if len(repackPacks) != 0 { Verbosef("repacking packs\n") bar := newProgressMax(!gopts.Quiet, uint64(len(repackPacks)), "packs repacked") - _, err := repository.Repack(ctx, repo, repackPacks, keepBlobs, bar) + _, err := repository.Repack(ctx, repo, repo, repackPacks, keepBlobs, bar) bar.Done() if err != nil { return errors.Fatalf("%s", err) diff --git a/internal/repository/repack.go b/internal/repository/repack.go index 0734c8206..14fef3f20 100644 --- a/internal/repository/repack.go +++ b/internal/repository/repack.go @@ -20,7 +20,7 @@ const numRepackWorkers = 8 // // The map keepBlobs is modified by Repack, it is used to keep track of which // blobs have been processed. -func Repack(ctx context.Context, repo restic.Repository, packs restic.IDSet, keepBlobs restic.BlobSet, p *progress.Counter) (obsoletePacks restic.IDSet, err error) { +func Repack(ctx context.Context, repo restic.Repository, dstRepo restic.Repository, packs restic.IDSet, keepBlobs restic.BlobSet, p *progress.Counter) (obsoletePacks restic.IDSet, err error) { debug.Log("repacking %d packs while keeping %d blobs", len(packs), len(keepBlobs)) var keepMutex sync.Mutex @@ -29,7 +29,7 @@ func Repack(ctx context.Context, repo restic.Repository, packs restic.IDSet, kee downloadQueue := make(chan restic.PackBlobs) wg.Go(func() error { defer close(downloadQueue) - for pbs := range repo.Index().ListPacks(ctx, packs) { + for pbs := range repo.Index().ListPacks(wgCtx, packs) { var packBlobs []restic.Blob keepMutex.Lock() // filter out unnecessary blobs @@ -70,7 +70,7 @@ func Repack(ctx context.Context, repo restic.Repository, packs restic.IDSet, kee } // We do want to save already saved blobs! - _, _, err = repo.SaveBlob(wgCtx, blob.Type, buf, blob.ID, true) + _, _, err = dstRepo.SaveBlob(wgCtx, blob.Type, buf, blob.ID, true) if err != nil { return err } @@ -94,7 +94,7 @@ func Repack(ctx context.Context, repo restic.Repository, packs restic.IDSet, kee return nil, err } - if err := repo.Flush(ctx); err != nil { + if err := dstRepo.Flush(ctx); err != nil { return nil, err } diff --git a/internal/repository/repack_test.go b/internal/repository/repack_test.go index 108c167d9..e40f5f6af 100644 --- a/internal/repository/repack_test.go +++ b/internal/repository/repack_test.go @@ -142,7 +142,7 @@ func findPacksForBlobs(t *testing.T, repo restic.Repository, blobs restic.BlobSe } func repack(t *testing.T, repo restic.Repository, packs restic.IDSet, blobs restic.BlobSet) { - repackedBlobs, err := repository.Repack(context.TODO(), repo, packs, blobs, nil) + repackedBlobs, err := repository.Repack(context.TODO(), repo, repo, packs, blobs, nil) if err != nil { t.Fatal(err) } @@ -278,6 +278,45 @@ func TestRepack(t *testing.T) { } } +func TestRepackCopy(t *testing.T) { + repo, cleanup := repository.TestRepository(t) + defer cleanup() + dstRepo, dstCleanup := repository.TestRepository(t) + defer dstCleanup() + + seed := time.Now().UnixNano() + rand.Seed(seed) + t.Logf("rand seed is %v", seed) + + createRandomBlobs(t, repo, 100, 0.7) + saveIndex(t, repo) + + _, keepBlobs := selectBlobs(t, repo, 0.2) + copyPacks := findPacksForBlobs(t, repo, keepBlobs) + + _, err := repository.Repack(context.TODO(), repo, dstRepo, copyPacks, keepBlobs, nil) + if err != nil { + t.Fatal(err) + } + rebuildIndex(t, dstRepo) + reloadIndex(t, dstRepo) + + idx := dstRepo.Index() + + for h := range keepBlobs { + list := idx.Lookup(h) + if len(list) == 0 { + t.Errorf("unable to find blob %v in repo", h.ID.Str()) + continue + } + + if len(list) != 1 { + t.Errorf("expected one pack in the list, got: %v", list) + continue + } + } +} + func TestRepackWrongBlob(t *testing.T) { repo, cleanup := repository.TestRepository(t) defer cleanup() @@ -293,7 +332,7 @@ func TestRepackWrongBlob(t *testing.T) { _, keepBlobs := selectBlobs(t, repo, 0) rewritePacks := findPacksForBlobs(t, repo, keepBlobs) - _, err := repository.Repack(context.TODO(), repo, rewritePacks, keepBlobs, nil) + _, err := repository.Repack(context.TODO(), repo, repo, rewritePacks, keepBlobs, nil) if err == nil { t.Fatal("expected repack to fail but got no error") }