forked from TrueCloudLab/restic
copy: Implement by reusing repack
The repack operation copies all selected blobs from a set of pack files into new pack files. For prune the source and destination repositories are identical. To implement copy, just use a different source and destination repository.
This commit is contained in:
parent
4d5db61bd0
commit
537b4c310a
5 changed files with 86 additions and 63 deletions
10
changelog/unreleased/pull-3513
Normal file
10
changelog/unreleased/pull-3513
Normal file
|
@ -0,0 +1,10 @@
|
||||||
|
Enhancement: Improve speed of copy command
|
||||||
|
|
||||||
|
The copy command could require a long time to copy snapshots for non-local
|
||||||
|
backends. This has been improved to provide a throughput comparable to the
|
||||||
|
restore command.
|
||||||
|
|
||||||
|
In addition, the command now displays a progress bar.
|
||||||
|
|
||||||
|
https://github.com/restic/restic/issues/2923
|
||||||
|
https://github.com/restic/restic/pull/3513
|
|
@ -5,6 +5,7 @@ import (
|
||||||
"fmt"
|
"fmt"
|
||||||
|
|
||||||
"github.com/restic/restic/internal/debug"
|
"github.com/restic/restic/internal/debug"
|
||||||
|
"github.com/restic/restic/internal/repository"
|
||||||
"github.com/restic/restic/internal/restic"
|
"github.com/restic/restic/internal/restic"
|
||||||
"golang.org/x/sync/errgroup"
|
"golang.org/x/sync/errgroup"
|
||||||
|
|
||||||
|
@ -131,17 +132,11 @@ func runCopy(opts CopyOptions, gopts GlobalOptions, args []string) error {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
Verbosef(" copy started, this may take a while...\n")
|
Verbosef(" copy started, this may take a while...\n")
|
||||||
|
if err := copyTree(ctx, srcRepo, dstRepo, visitedTrees, *sn.Tree, gopts.Quiet); err != nil {
|
||||||
if err := copyTree(ctx, srcRepo, dstRepo, visitedTrees, *sn.Tree); err != nil {
|
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
debug.Log("tree copied")
|
debug.Log("tree copied")
|
||||||
|
|
||||||
if err = dstRepo.Flush(ctx); err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
debug.Log("flushed packs and saved index")
|
|
||||||
|
|
||||||
// save snapshot
|
// save snapshot
|
||||||
sn.Parent = nil // Parent does not have relevance in the new repo.
|
sn.Parent = nil // Parent does not have relevance in the new repo.
|
||||||
// Use Original as a persistent snapshot ID
|
// Use Original as a persistent snapshot ID
|
||||||
|
@ -176,82 +171,61 @@ func similarSnapshots(sna *restic.Snapshot, snb *restic.Snapshot) bool {
|
||||||
return true
|
return true
|
||||||
}
|
}
|
||||||
|
|
||||||
const numCopyWorkers = 8
|
|
||||||
|
|
||||||
func copyTree(ctx context.Context, srcRepo restic.Repository, dstRepo restic.Repository,
|
func copyTree(ctx context.Context, srcRepo restic.Repository, dstRepo restic.Repository,
|
||||||
visitedTrees restic.IDSet, rootTreeID restic.ID) error {
|
visitedTrees restic.IDSet, rootTreeID restic.ID, quiet bool) error {
|
||||||
|
|
||||||
idChan := make(chan restic.ID)
|
wg, wgCtx := errgroup.WithContext(ctx)
|
||||||
wg, ctx := errgroup.WithContext(ctx)
|
|
||||||
|
|
||||||
treeStream := restic.StreamTrees(ctx, wg, srcRepo, restic.IDs{rootTreeID}, func(treeID restic.ID) bool {
|
treeStream := restic.StreamTrees(wgCtx, wg, srcRepo, restic.IDs{rootTreeID}, func(treeID restic.ID) bool {
|
||||||
visited := visitedTrees.Has(treeID)
|
visited := visitedTrees.Has(treeID)
|
||||||
visitedTrees.Insert(treeID)
|
visitedTrees.Insert(treeID)
|
||||||
return visited
|
return visited
|
||||||
}, nil)
|
}, nil)
|
||||||
|
|
||||||
|
copyBlobs := restic.NewBlobSet()
|
||||||
|
packList := restic.NewIDSet()
|
||||||
|
|
||||||
|
enqueue := func(h restic.BlobHandle) {
|
||||||
|
pb := srcRepo.Index().Lookup(h)
|
||||||
|
copyBlobs.Insert(h)
|
||||||
|
for _, p := range pb {
|
||||||
|
packList.Insert(p.PackID)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
wg.Go(func() error {
|
wg.Go(func() error {
|
||||||
defer close(idChan)
|
|
||||||
// reused buffer
|
|
||||||
var buf []byte
|
|
||||||
for tree := range treeStream {
|
for tree := range treeStream {
|
||||||
if tree.Error != nil {
|
if tree.Error != nil {
|
||||||
return fmt.Errorf("LoadTree(%v) returned error %v", tree.ID.Str(), tree.Error)
|
return fmt.Errorf("LoadTree(%v) returned error %v", tree.ID.Str(), tree.Error)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Do we already have this tree blob?
|
// Do we already have this tree blob?
|
||||||
if !dstRepo.Index().Has(restic.BlobHandle{ID: tree.ID, Type: restic.TreeBlob}) {
|
treeHandle := restic.BlobHandle{ID: tree.ID, Type: restic.TreeBlob}
|
||||||
|
if !dstRepo.Index().Has(treeHandle) {
|
||||||
// copy raw tree bytes to avoid problems if the serialization changes
|
// copy raw tree bytes to avoid problems if the serialization changes
|
||||||
var err error
|
enqueue(treeHandle)
|
||||||
buf, err = srcRepo.LoadBlob(ctx, restic.TreeBlob, tree.ID, buf)
|
|
||||||
if err != nil {
|
|
||||||
return fmt.Errorf("LoadBlob(%v) for tree returned error %v", tree.ID, err)
|
|
||||||
}
|
|
||||||
|
|
||||||
_, _, err = dstRepo.SaveBlob(ctx, restic.TreeBlob, buf, tree.ID, false)
|
|
||||||
if err != nil {
|
|
||||||
return fmt.Errorf("SaveBlob(%v) for tree returned error %v", tree.ID.Str(), err)
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
for _, entry := range tree.Nodes {
|
for _, entry := range tree.Nodes {
|
||||||
// Recursion into directories is handled by StreamTrees
|
// Recursion into directories is handled by StreamTrees
|
||||||
// Copy the blobs for this file.
|
// Copy the blobs for this file.
|
||||||
for _, blobID := range entry.Content {
|
for _, blobID := range entry.Content {
|
||||||
select {
|
h := restic.BlobHandle{Type: restic.DataBlob, ID: blobID}
|
||||||
case idChan <- blobID:
|
if !dstRepo.Index().Has(h) {
|
||||||
case <-ctx.Done():
|
enqueue(h)
|
||||||
return ctx.Err()
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
})
|
})
|
||||||
|
err := wg.Wait()
|
||||||
for i := 0; i < numCopyWorkers; i++ {
|
|
||||||
wg.Go(func() error {
|
|
||||||
// reused buffer
|
|
||||||
var buf []byte
|
|
||||||
for blobID := range idChan {
|
|
||||||
// Do we already have this data blob?
|
|
||||||
if dstRepo.Index().Has(restic.BlobHandle{ID: blobID, Type: restic.DataBlob}) {
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
debug.Log("Copying blob %s\n", blobID.Str())
|
|
||||||
var err error
|
|
||||||
buf, err = srcRepo.LoadBlob(ctx, restic.DataBlob, blobID, buf)
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("LoadBlob(%v) returned error %v", blobID, err)
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
_, _, err = dstRepo.SaveBlob(ctx, restic.DataBlob, buf, blobID, false)
|
bar := newProgressMax(!quiet, uint64(len(packList)), "packs copied")
|
||||||
if err != nil {
|
_, err = repository.Repack(ctx, srcRepo, dstRepo, packList, copyBlobs, bar)
|
||||||
return fmt.Errorf("SaveBlob(%v) returned error %v", blobID, err)
|
bar.Done()
|
||||||
}
|
return err
|
||||||
}
|
|
||||||
return nil
|
|
||||||
})
|
|
||||||
}
|
|
||||||
return wg.Wait()
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -500,7 +500,7 @@ func prune(opts PruneOptions, gopts GlobalOptions, repo restic.Repository, usedB
|
||||||
if len(repackPacks) != 0 {
|
if len(repackPacks) != 0 {
|
||||||
Verbosef("repacking packs\n")
|
Verbosef("repacking packs\n")
|
||||||
bar := newProgressMax(!gopts.Quiet, uint64(len(repackPacks)), "packs repacked")
|
bar := newProgressMax(!gopts.Quiet, uint64(len(repackPacks)), "packs repacked")
|
||||||
_, err := repository.Repack(ctx, repo, repackPacks, keepBlobs, bar)
|
_, err := repository.Repack(ctx, repo, repo, repackPacks, keepBlobs, bar)
|
||||||
bar.Done()
|
bar.Done()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return errors.Fatalf("%s", err)
|
return errors.Fatalf("%s", err)
|
||||||
|
|
|
@ -20,7 +20,7 @@ const numRepackWorkers = 8
|
||||||
//
|
//
|
||||||
// The map keepBlobs is modified by Repack, it is used to keep track of which
|
// The map keepBlobs is modified by Repack, it is used to keep track of which
|
||||||
// blobs have been processed.
|
// blobs have been processed.
|
||||||
func Repack(ctx context.Context, repo restic.Repository, packs restic.IDSet, keepBlobs restic.BlobSet, p *progress.Counter) (obsoletePacks restic.IDSet, err error) {
|
func Repack(ctx context.Context, repo restic.Repository, dstRepo restic.Repository, packs restic.IDSet, keepBlobs restic.BlobSet, p *progress.Counter) (obsoletePacks restic.IDSet, err error) {
|
||||||
debug.Log("repacking %d packs while keeping %d blobs", len(packs), len(keepBlobs))
|
debug.Log("repacking %d packs while keeping %d blobs", len(packs), len(keepBlobs))
|
||||||
|
|
||||||
var keepMutex sync.Mutex
|
var keepMutex sync.Mutex
|
||||||
|
@ -29,7 +29,7 @@ func Repack(ctx context.Context, repo restic.Repository, packs restic.IDSet, kee
|
||||||
downloadQueue := make(chan restic.PackBlobs)
|
downloadQueue := make(chan restic.PackBlobs)
|
||||||
wg.Go(func() error {
|
wg.Go(func() error {
|
||||||
defer close(downloadQueue)
|
defer close(downloadQueue)
|
||||||
for pbs := range repo.Index().ListPacks(ctx, packs) {
|
for pbs := range repo.Index().ListPacks(wgCtx, packs) {
|
||||||
var packBlobs []restic.Blob
|
var packBlobs []restic.Blob
|
||||||
keepMutex.Lock()
|
keepMutex.Lock()
|
||||||
// filter out unnecessary blobs
|
// filter out unnecessary blobs
|
||||||
|
@ -70,7 +70,7 @@ func Repack(ctx context.Context, repo restic.Repository, packs restic.IDSet, kee
|
||||||
}
|
}
|
||||||
|
|
||||||
// We do want to save already saved blobs!
|
// We do want to save already saved blobs!
|
||||||
_, _, err = repo.SaveBlob(wgCtx, blob.Type, buf, blob.ID, true)
|
_, _, err = dstRepo.SaveBlob(wgCtx, blob.Type, buf, blob.ID, true)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
@ -94,7 +94,7 @@ func Repack(ctx context.Context, repo restic.Repository, packs restic.IDSet, kee
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
if err := repo.Flush(ctx); err != nil {
|
if err := dstRepo.Flush(ctx); err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -142,7 +142,7 @@ func findPacksForBlobs(t *testing.T, repo restic.Repository, blobs restic.BlobSe
|
||||||
}
|
}
|
||||||
|
|
||||||
func repack(t *testing.T, repo restic.Repository, packs restic.IDSet, blobs restic.BlobSet) {
|
func repack(t *testing.T, repo restic.Repository, packs restic.IDSet, blobs restic.BlobSet) {
|
||||||
repackedBlobs, err := repository.Repack(context.TODO(), repo, packs, blobs, nil)
|
repackedBlobs, err := repository.Repack(context.TODO(), repo, repo, packs, blobs, nil)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
|
@ -278,6 +278,45 @@ func TestRepack(t *testing.T) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestRepackCopy(t *testing.T) {
|
||||||
|
repo, cleanup := repository.TestRepository(t)
|
||||||
|
defer cleanup()
|
||||||
|
dstRepo, dstCleanup := repository.TestRepository(t)
|
||||||
|
defer dstCleanup()
|
||||||
|
|
||||||
|
seed := time.Now().UnixNano()
|
||||||
|
rand.Seed(seed)
|
||||||
|
t.Logf("rand seed is %v", seed)
|
||||||
|
|
||||||
|
createRandomBlobs(t, repo, 100, 0.7)
|
||||||
|
saveIndex(t, repo)
|
||||||
|
|
||||||
|
_, keepBlobs := selectBlobs(t, repo, 0.2)
|
||||||
|
copyPacks := findPacksForBlobs(t, repo, keepBlobs)
|
||||||
|
|
||||||
|
_, err := repository.Repack(context.TODO(), repo, dstRepo, copyPacks, keepBlobs, nil)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
rebuildIndex(t, dstRepo)
|
||||||
|
reloadIndex(t, dstRepo)
|
||||||
|
|
||||||
|
idx := dstRepo.Index()
|
||||||
|
|
||||||
|
for h := range keepBlobs {
|
||||||
|
list := idx.Lookup(h)
|
||||||
|
if len(list) == 0 {
|
||||||
|
t.Errorf("unable to find blob %v in repo", h.ID.Str())
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
if len(list) != 1 {
|
||||||
|
t.Errorf("expected one pack in the list, got: %v", list)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func TestRepackWrongBlob(t *testing.T) {
|
func TestRepackWrongBlob(t *testing.T) {
|
||||||
repo, cleanup := repository.TestRepository(t)
|
repo, cleanup := repository.TestRepository(t)
|
||||||
defer cleanup()
|
defer cleanup()
|
||||||
|
@ -293,7 +332,7 @@ func TestRepackWrongBlob(t *testing.T) {
|
||||||
_, keepBlobs := selectBlobs(t, repo, 0)
|
_, keepBlobs := selectBlobs(t, repo, 0)
|
||||||
rewritePacks := findPacksForBlobs(t, repo, keepBlobs)
|
rewritePacks := findPacksForBlobs(t, repo, keepBlobs)
|
||||||
|
|
||||||
_, err := repository.Repack(context.TODO(), repo, rewritePacks, keepBlobs, nil)
|
_, err := repository.Repack(context.TODO(), repo, repo, rewritePacks, keepBlobs, nil)
|
||||||
if err == nil {
|
if err == nil {
|
||||||
t.Fatal("expected repack to fail but got no error")
|
t.Fatal("expected repack to fail but got no error")
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in a new issue