forked from TrueCloudLab/restic
Merge pull request #2840 from aawsome/delete-parallel
Make delete parallel
This commit is contained in:
commit
49d3efe547
5 changed files with 98 additions and 60 deletions
7
changelog/unreleased/pull-2840
Normal file
7
changelog/unreleased/pull-2840
Normal file
|
@ -0,0 +1,7 @@
|
||||||
|
Enhancement: Speed-up file deletion in forget, prune and rebuild-index
|
||||||
|
|
||||||
|
We've sped up the file deletion for the commands forget, prune and
|
||||||
|
rebuild-index, especially for remote repositories.
|
||||||
|
Deletion was sequential before and is now run in parallel.
|
||||||
|
|
||||||
|
https://github.com/restic/restic/pull/2840
|
|
@ -94,34 +94,22 @@ func runForget(opts ForgetOptions, gopts GlobalOptions, args []string) error {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
removeSnapshots := 0
|
|
||||||
|
|
||||||
ctx, cancel := context.WithCancel(gopts.ctx)
|
ctx, cancel := context.WithCancel(gopts.ctx)
|
||||||
defer cancel()
|
defer cancel()
|
||||||
|
|
||||||
var snapshots restic.Snapshots
|
var snapshots restic.Snapshots
|
||||||
|
removeSnIDs := restic.NewIDSet()
|
||||||
|
|
||||||
for sn := range FindFilteredSnapshots(ctx, repo, opts.Hosts, opts.Tags, opts.Paths, args) {
|
for sn := range FindFilteredSnapshots(ctx, repo, opts.Hosts, opts.Tags, opts.Paths, args) {
|
||||||
snapshots = append(snapshots, sn)
|
snapshots = append(snapshots, sn)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
var jsonGroups []*ForgetGroup
|
||||||
|
|
||||||
if len(args) > 0 {
|
if len(args) > 0 {
|
||||||
// When explicit snapshots args are given, remove them immediately.
|
// When explicit snapshots args are given, remove them immediately.
|
||||||
for _, sn := range snapshots {
|
for _, sn := range snapshots {
|
||||||
if !opts.DryRun {
|
removeSnIDs.Insert(*sn.ID())
|
||||||
h := restic.Handle{Type: restic.SnapshotFile, Name: sn.ID().String()}
|
|
||||||
if err = repo.Backend().Remove(gopts.ctx, h); err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
if !gopts.JSON {
|
|
||||||
Verbosef("removed snapshot %v\n", sn.ID().Str())
|
|
||||||
}
|
|
||||||
removeSnapshots++
|
|
||||||
} else {
|
|
||||||
if !gopts.JSON {
|
|
||||||
Verbosef("would have removed snapshot %v\n", sn.ID().Str())
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
snapshotGroups, _, err := restic.GroupSnapshots(snapshots, opts.GroupBy)
|
snapshotGroups, _, err := restic.GroupSnapshots(snapshots, opts.GroupBy)
|
||||||
|
@ -151,8 +139,6 @@ func runForget(opts ForgetOptions, gopts GlobalOptions, args []string) error {
|
||||||
Verbosef("Applying Policy: %v\n", policy)
|
Verbosef("Applying Policy: %v\n", policy)
|
||||||
}
|
}
|
||||||
|
|
||||||
var jsonGroups []*ForgetGroup
|
|
||||||
|
|
||||||
for k, snapshotGroup := range snapshotGroups {
|
for k, snapshotGroup := range snapshotGroups {
|
||||||
if gopts.Verbose >= 1 && !gopts.JSON {
|
if gopts.Verbose >= 1 && !gopts.JSON {
|
||||||
err = PrintSnapshotGroupHeader(gopts.stdout, k)
|
err = PrintSnapshotGroupHeader(gopts.stdout, k)
|
||||||
|
@ -191,36 +177,36 @@ func runForget(opts ForgetOptions, gopts GlobalOptions, args []string) error {
|
||||||
|
|
||||||
jsonGroups = append(jsonGroups, &fg)
|
jsonGroups = append(jsonGroups, &fg)
|
||||||
|
|
||||||
removeSnapshots += len(remove)
|
|
||||||
|
|
||||||
if !opts.DryRun {
|
|
||||||
for _, sn := range remove {
|
for _, sn := range remove {
|
||||||
h := restic.Handle{Type: restic.SnapshotFile, Name: sn.ID().String()}
|
removeSnIDs.Insert(*sn.ID())
|
||||||
err = repo.Backend().Remove(gopts.ctx, h)
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if len(removeSnIDs) > 0 {
|
||||||
|
if !opts.DryRun {
|
||||||
|
err := DeleteFilesChecked(gopts, repo, removeSnIDs, restic.SnapshotFile)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
} else {
|
||||||
|
if !gopts.JSON {
|
||||||
|
Printf("Would have removed the following snapshots:\n%v\n\n", removeSnIDs)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if gopts.JSON {
|
if gopts.JSON && len(jsonGroups) > 0 {
|
||||||
err = printJSONForget(gopts.stdout, jsonGroups)
|
err = printJSONForget(gopts.stdout, jsonGroups)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if removeSnapshots > 0 && opts.Prune {
|
if len(removeSnIDs) > 0 && opts.Prune && !opts.DryRun {
|
||||||
if !gopts.JSON {
|
|
||||||
Verbosef("%d snapshots have been removed, running prune\n", removeSnapshots)
|
|
||||||
}
|
|
||||||
if !opts.DryRun {
|
|
||||||
return pruneRepository(gopts, repo)
|
return pruneRepository(gopts, repo)
|
||||||
}
|
}
|
||||||
}
|
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
|
@ -296,17 +296,8 @@ func pruneRepository(gopts GlobalOptions, repo restic.Repository) error {
|
||||||
}
|
}
|
||||||
|
|
||||||
if len(removePacks) != 0 {
|
if len(removePacks) != 0 {
|
||||||
bar = newProgressMax(!gopts.Quiet, uint64(len(removePacks)), "packs deleted")
|
Verbosef("remove %d old packs\n", len(removePacks))
|
||||||
bar.Start()
|
DeleteFiles(gopts, repo, removePacks, restic.DataFile)
|
||||||
for packID := range removePacks {
|
|
||||||
h := restic.Handle{Type: restic.DataFile, Name: packID.String()}
|
|
||||||
err = repo.Backend().Remove(ctx, h)
|
|
||||||
if err != nil {
|
|
||||||
Warnf("unable to remove file %v from the repository\n", packID.Str())
|
|
||||||
}
|
|
||||||
bar.Report(restic.Stat{Blobs: 1})
|
|
||||||
}
|
|
||||||
bar.Done()
|
|
||||||
}
|
}
|
||||||
|
|
||||||
Verbosef("done\n")
|
Verbosef("done\n")
|
||||||
|
|
|
@ -92,15 +92,7 @@ func rebuildIndex(ctx context.Context, repo restic.Repository, ignorePacks resti
|
||||||
Verbosef("saved new indexes as %v\n", ids)
|
Verbosef("saved new indexes as %v\n", ids)
|
||||||
|
|
||||||
Verbosef("remove %d old index files\n", len(supersedes))
|
Verbosef("remove %d old index files\n", len(supersedes))
|
||||||
|
DeleteFiles(globalOptions, repo, restic.NewIDSet(supersedes...), restic.IndexFile)
|
||||||
for _, id := range supersedes {
|
|
||||||
if err := repo.Backend().Remove(ctx, restic.Handle{
|
|
||||||
Type: restic.IndexFile,
|
|
||||||
Name: id.String(),
|
|
||||||
}); err != nil {
|
|
||||||
Warnf("error removing old index %v: %v\n", id.Str(), err)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
62
cmd/restic/delete.go
Normal file
62
cmd/restic/delete.go
Normal file
|
@ -0,0 +1,62 @@
|
||||||
|
package main
|
||||||
|
|
||||||
|
import (
|
||||||
|
"golang.org/x/sync/errgroup"
|
||||||
|
|
||||||
|
"github.com/restic/restic/internal/restic"
|
||||||
|
)
|
||||||
|
|
||||||
|
// DeleteFiles deletes the given fileList of fileType in parallel
|
||||||
|
// it will print a warning if there is an error, but continue deleting the remaining files
|
||||||
|
func DeleteFiles(gopts GlobalOptions, repo restic.Repository, fileList restic.IDSet, fileType restic.FileType) {
|
||||||
|
deleteFiles(gopts, true, repo, fileList, fileType)
|
||||||
|
}
|
||||||
|
|
||||||
|
// DeleteFiles deletes the given fileList of fileType in parallel
|
||||||
|
// if an error occurs, it will cancel and return this error
|
||||||
|
func DeleteFilesChecked(gopts GlobalOptions, repo restic.Repository, fileList restic.IDSet, fileType restic.FileType) error {
|
||||||
|
return deleteFiles(gopts, false, repo, fileList, fileType)
|
||||||
|
}
|
||||||
|
|
||||||
|
const numDeleteWorkers = 8
|
||||||
|
|
||||||
|
// deleteFiles deletes the given fileList of fileType in parallel
|
||||||
|
// if ignoreError=true, it will print a warning if there was an error, else it will abort.
|
||||||
|
func deleteFiles(gopts GlobalOptions, ignoreError bool, repo restic.Repository, fileList restic.IDSet, fileType restic.FileType) error {
|
||||||
|
totalCount := len(fileList)
|
||||||
|
fileChan := make(chan restic.ID)
|
||||||
|
go func() {
|
||||||
|
for id := range fileList {
|
||||||
|
fileChan <- id
|
||||||
|
}
|
||||||
|
close(fileChan)
|
||||||
|
}()
|
||||||
|
|
||||||
|
bar := newProgressMax(!gopts.JSON && !gopts.Quiet, uint64(totalCount), "files deleted")
|
||||||
|
wg, ctx := errgroup.WithContext(gopts.ctx)
|
||||||
|
bar.Start()
|
||||||
|
for i := 0; i < numDeleteWorkers; i++ {
|
||||||
|
wg.Go(func() error {
|
||||||
|
for id := range fileChan {
|
||||||
|
h := restic.Handle{Type: fileType, Name: id.String()}
|
||||||
|
err := repo.Backend().Remove(ctx, h)
|
||||||
|
if err != nil {
|
||||||
|
if !gopts.JSON {
|
||||||
|
Warnf("unable to remove %v from the repository\n", h)
|
||||||
|
}
|
||||||
|
if !ignoreError {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if !gopts.JSON && gopts.verbosity >= 2 {
|
||||||
|
Verbosef("removed %v\n", h)
|
||||||
|
}
|
||||||
|
bar.Report(restic.Stat{Blobs: 1})
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
})
|
||||||
|
}
|
||||||
|
err := wg.Wait()
|
||||||
|
bar.Done()
|
||||||
|
return err
|
||||||
|
}
|
Loading…
Reference in a new issue