From 38cc4393f6d827c5f3f8b5d58e9e8d4ffd6f9763 Mon Sep 17 00:00:00 2001
From: Alexander Weiss
Date: Sat, 10 Oct 2020 07:42:22 +0200
Subject: [PATCH 1/4] Add MasterIndex.Save(); Add Index.Packs()

---
 internal/repository/index.go             | 49 ++++++++++++
 internal/repository/master_index.go      | 98 ++++++++++++++++--------
 internal/repository/master_index_test.go | 64 ++++++++++++++++
 internal/restic/repository.go            |  1 +
 4 files changed, 178 insertions(+), 34 deletions(-)

diff --git a/internal/repository/index.go b/internal/repository/index.go
index c5dcf4f56..30fe3ddb9 100644
--- a/internal/repository/index.go
+++ b/internal/repository/index.go
@@ -275,6 +275,55 @@ func (idx *Index) Each(ctx context.Context) <-chan restic.PackedBlob {
 	return ch
 }
 
+type EachByPackResult struct {
+	packID restic.ID
+	blobs  []restic.Blob
+}
+
+// EachByPack returns a channel that yields all blobs known to the index,
+// grouped by packID but ignoring blobs with a packID in packBlacklist.
+// When the context is cancelled, the background goroutine
+// terminates. This blocks any modification of the index.
+func (idx *Index) EachByPack(ctx context.Context, packBlacklist restic.IDSet) <-chan EachByPackResult {
+	idx.m.Lock()
+
+	ch := make(chan EachByPackResult)
+
+	go func() {
+		defer idx.m.Unlock()
+		defer func() {
+			close(ch)
+		}()
+
+		for typ := range idx.byType {
+			byPack := make(map[restic.ID][]*indexEntry)
+			m := &idx.byType[typ]
+			m.foreach(func(e *indexEntry) bool {
+				packID := idx.packs[e.packIndex]
+				if !packBlacklist.Has(packID) {
+					byPack[packID] = append(byPack[packID], e)
+				}
+				return true
+			})
+
+			for packID, pack := range byPack {
+				var result EachByPackResult
+				result.packID = packID
+				for _, e := range pack {
+					result.blobs = append(result.blobs, idx.toPackedBlob(e, restic.BlobType(typ)).Blob)
+				}
+				select {
+				case <-ctx.Done():
+					return
+				case ch <- result:
+				}
+			}
+		}
+	}()
+
+	return ch
+}
+
 // Packs returns all packs in this index
 func (idx *Index) Packs() restic.IDSet {
 	idx.m.Lock()
diff --git a/internal/repository/master_index.go b/internal/repository/master_index.go
index 6e84d3b1b..ddc72fd3c 100644
--- a/internal/repository/master_index.go
+++ b/internal/repository/master_index.go
@@ -97,6 +97,19 @@ func (mi *MasterIndex) Has(id restic.ID, tpe restic.BlobType) bool {
 	return false
 }
 
+// Packs returns all packs that are covered by the index.
+func (mi *MasterIndex) Packs() restic.IDSet {
+	mi.idxMutex.RLock()
+	defer mi.idxMutex.RUnlock()
+
+	packs := restic.NewIDSet()
+	for _, idx := range mi.idx {
+		packs.Merge(idx.Packs())
+	}
+
+	return packs
+}
+
 // Count returns the number of blobs of type t in the index.
 func (mi *MasterIndex) Count(t restic.BlobType) (n uint) {
 	mi.idxMutex.RLock()
@@ -248,49 +261,66 @@ func (mi *MasterIndex) MergeFinalIndexes() {
 	mi.idx = newIdx
 }
 
-// RebuildIndex combines all known indexes to a new index, leaving out any
+// Save saves all known indexes to index files, leaving out any
 // packs whose ID is contained in packBlacklist. The new index contains the IDs
-// of all known indexes in the "supersedes" field.
-func (mi *MasterIndex) RebuildIndex(ctx context.Context, packBlacklist restic.IDSet) (*Index, error) {
+// of all known indexes in the "supersedes" field. The IDs are also returned in
+// the IDSet obsolete.
+// After calling this function, you should remove the obsolete index files.
+func (mi *MasterIndex) Save(ctx context.Context, repo restic.Repository, packBlacklist restic.IDSet, p *restic.Progress) (obsolete restic.IDSet, err error) { + p.Start() + defer p.Done() + mi.idxMutex.Lock() defer mi.idxMutex.Unlock() debug.Log("start rebuilding index of %d indexes, pack blacklist: %v", len(mi.idx), packBlacklist) newIndex := NewIndex() + obsolete = restic.NewIDSet() - ctx, cancel := context.WithCancel(ctx) - defer cancel() - - for i, idx := range mi.idx { - debug.Log("adding index %d", i) - - for pb := range idx.Each(ctx) { - if packBlacklist.Has(pb.PackID) { - continue - } - - newIndex.Store(pb) - } - - if !idx.Final() { - debug.Log("index %d isn't final, don't add to supersedes field", i) - continue - } - - ids, err := idx.IDs() - if err != nil { - debug.Log("index %d does not have an ID: %v", err) - return nil, err - } - - debug.Log("adding index ids %v to supersedes field", ids) - - err = newIndex.AddToSupersedes(ids...) - if err != nil { - return nil, err + finalize := func() error { + newIndex.Finalize() + if _, err := SaveIndex(ctx, repo, newIndex); err != nil { + return err } + newIndex = NewIndex() + return nil } - return newIndex, nil + for i, idx := range mi.idx { + if idx.Final() { + ids, err := idx.IDs() + if err != nil { + debug.Log("index %d does not have an ID: %v", err) + return nil, err + } + + debug.Log("adding index ids %v to supersedes field", ids) + + err = newIndex.AddToSupersedes(ids...) + if err != nil { + return nil, err + } + obsolete.Merge(restic.NewIDSet(ids...)) + } else { + debug.Log("index %d isn't final, don't add to supersedes field", i) + } + + debug.Log("adding index %d", i) + + for pbs := range idx.EachByPack(ctx, packBlacklist) { + newIndex.StorePack(pbs.packID, pbs.blobs) + p.Report(restic.Stat{Blobs: 1}) + if IndexFull(newIndex) { + if err := finalize(); err != nil { + return nil, err + } + } + } + } + if err := finalize(); err != nil { + return nil, err + } + + return } diff --git a/internal/repository/master_index_test.go b/internal/repository/master_index_test.go index 28eba03a8..aa9905321 100644 --- a/internal/repository/master_index_test.go +++ b/internal/repository/master_index_test.go @@ -5,7 +5,9 @@ import ( "fmt" "math/rand" "testing" + "time" + "github.com/restic/restic/internal/checker" "github.com/restic/restic/internal/repository" "github.com/restic/restic/internal/restic" rtest "github.com/restic/restic/internal/test" @@ -322,3 +324,65 @@ func BenchmarkMasterIndexLookupBlobSize(b *testing.B) { mIdx.LookupSize(lookupID, restic.DataBlob) } } + +var ( + snapshotTime = time.Unix(1470492820, 207401672) + depth = 3 +) + +func createFilledRepo(t testing.TB, snapshots int, dup float32) (restic.Repository, func()) { + repo, cleanup := repository.TestRepository(t) + + for i := 0; i < 3; i++ { + restic.TestCreateSnapshot(t, repo, snapshotTime.Add(time.Duration(i)*time.Second), depth, dup) + } + + return repo, cleanup +} + +func TestIndexSave(t *testing.T) { + repo, cleanup := createFilledRepo(t, 3, 0) + defer cleanup() + + repo.LoadIndex(context.TODO()) + + obsoletes, err := repo.Index().(*repository.MasterIndex).Save(context.TODO(), repo, nil, nil) + if err != nil { + t.Fatalf("unable to save new index: %v", err) + } + + for id := range obsoletes { + t.Logf("remove index %v", id.Str()) + h := restic.Handle{Type: restic.IndexFile, Name: id.String()} + err = repo.Backend().Remove(context.TODO(), h) + if err != nil { + t.Errorf("error removing index %v: %v", id, err) + } + } + + checker := checker.New(repo) + hints, errs := 
checker.LoadIndex(context.TODO()) + for _, h := range hints { + t.Logf("hint: %v\n", h) + } + + for _, err := range errs { + t.Errorf("checker found error: %v", err) + } + + ctx, cancel := context.WithCancel(context.TODO()) + defer cancel() + + errCh := make(chan error) + go checker.Structure(ctx, errCh) + i := 0 + for err := range errCh { + t.Errorf("checker returned error: %v", err) + i++ + if i == 10 { + t.Errorf("more than 10 errors returned, skipping the rest") + cancel() + break + } + } +} diff --git a/internal/restic/repository.go b/internal/restic/repository.go index dcda37f04..5efdfbc03 100644 --- a/internal/restic/repository.go +++ b/internal/restic/repository.go @@ -62,6 +62,7 @@ type MasterIndex interface { Has(ID, BlobType) bool Lookup(ID, BlobType) []PackedBlob Count(BlobType) uint + Packs() IDSet // Each returns a channel that yields all blobs known to the index. When // the context is cancelled, the background goroutine terminates. This From fd330305563af8502601f0884f829341bc29f9c9 Mon Sep 17 00:00:00 2001 From: Alexander Weiss Date: Sat, 10 Oct 2020 18:54:13 +0200 Subject: [PATCH 2/4] Use in-memory index to rebuild index in prune --- changelog/unreleased/pull-2718 | 4 ++++ cmd/restic/cmd_prune.go | 23 ++++++++++++++++++++++- doc/060_forget.rst | 20 ++++++++------------ internal/repository/repository.go | 5 ++++- 4 files changed, 38 insertions(+), 14 deletions(-) diff --git a/changelog/unreleased/pull-2718 b/changelog/unreleased/pull-2718 index 56af35d71..a9eefab79 100644 --- a/changelog/unreleased/pull-2718 +++ b/changelog/unreleased/pull-2718 @@ -3,6 +3,8 @@ Enhancement: Improve pruning performance and make pruning more customizable The `prune` command is now much faster. This is especially the case for remote repositories or repositories with not much data to remove. Also the memory usage of the `prune` command is now reduced. +Restic used to rebuild the index from scratch after pruning. This is now +changed and the index rebuilding uses the information already known by `prune`. By default, the `prune` command no longer removes all unused data. This behavior can be fine-tuned by new options, like the acceptable amount of unused space or @@ -14,9 +16,11 @@ also shows what `prune` would do. 
Fixes several open issues, e.g.: https://github.com/restic/restic/issues/1140 +https://github.com/restic/restic/issues/1599 https://github.com/restic/restic/issues/1985 https://github.com/restic/restic/issues/2112 https://github.com/restic/restic/issues/2227 https://github.com/restic/restic/issues/2305 https://github.com/restic/restic/pull/2718 +https://github.com/restic/restic/pull/2842 diff --git a/cmd/restic/cmd_prune.go b/cmd/restic/cmd_prune.go index 4fbc5fab7..a8f87a6ff 100644 --- a/cmd/restic/cmd_prune.go +++ b/cmd/restic/cmd_prune.go @@ -471,19 +471,26 @@ func prune(opts PruneOptions, gopts GlobalOptions, repo restic.Repository, usedB DeleteFiles(gopts, repo, removePacksFirst, restic.PackFile) } + packsAddedByRepack := 0 if len(repackPacks) != 0 { + packsAddedByRepack -= len(repo.Index().Packs()) Verbosef("repacking packs\n") bar := newProgressMax(!gopts.Quiet, uint64(len(repackPacks)), "packs repacked") _, err := repository.Repack(ctx, repo, repackPacks, keepBlobs, bar) if err != nil { return err } + packsAddedByRepack += len(repo.Index().Packs()) + // Also remove repacked packs removePacks.Merge(repackPacks) } if len(removePacks) != 0 { - if err = rebuildIndex(ctx, repo, removePacks); err != nil { + totalpacks := int(stats.packs.used+stats.packs.partlyUsed+stats.packs.unused) - + len(removePacks) + packsAddedByRepack + err = rebuildIndexFiles(gopts, repo, removePacks, uint64(totalpacks)) + if err != nil { return err } @@ -495,6 +502,20 @@ func prune(opts PruneOptions, gopts GlobalOptions, repo restic.Repository, usedB return nil } +func rebuildIndexFiles(gopts GlobalOptions, repo restic.Repository, removePacks restic.IDSet, packcount uint64) error { + Verbosef("rebuilding index\n") + + bar := newProgressMax(!gopts.Quiet, packcount, "packs processed") + obsoleteIndexes, err := (repo.Index()).(*repository.MasterIndex). 
+ Save(gopts.ctx, repo, removePacks, bar) + if err != nil { + return err + } + + Verbosef("deleting obsolete index files\n") + return DeleteFilesChecked(gopts, repo, obsoleteIndexes, restic.IndexFile) +} + func getUsedBlobs(gopts GlobalOptions, repo restic.Repository, snapshots []*restic.Snapshot) (usedBlobs restic.BlobSet, err error) { ctx := gopts.ctx diff --git a/doc/060_forget.rst b/doc/060_forget.rst index 08381f180..79e8c9b3d 100644 --- a/doc/060_forget.rst +++ b/doc/060_forget.rst @@ -99,12 +99,10 @@ command must be run: repacking packs [0:00] 100.00% 2 / 2 packs repacked - counting files in repo - [0:00] 100.00% 3 / 3 packs - finding old index files - saved new indexes as [59270b3a] - remove 4 old index files - [0:00] 100.00% 4 / 4 files deleted + rebuilding index + [0:00] 100.00% 3 / 3 packs processed + deleting obsolete index files + [0:00] 100.00% 3 / 3 files deleted removing 3 old packs [0:00] 100.00% 3 / 3 files deleted done @@ -147,12 +145,10 @@ to ``forget``: repacking packs [0:00] 100.00% 2 / 2 packs repacked - counting files in repo - [0:00] 100.00% 3 / 3 packs - finding old index files - saved new indexes as [59270b3a] - remove 4 old index files - [0:00] 100.00% 4 / 4 files deleted + rebuilding index + [0:00] 100.00% 3 / 3 packs processed + deleting obsolete index files + [0:00] 100.00% 3 / 3 files deleted removing 3 old packs [0:00] 100.00% 3 / 3 files deleted done diff --git a/internal/repository/repository.go b/internal/repository/repository.go index 77299c277..051b99252 100644 --- a/internal/repository/repository.go +++ b/internal/repository/repository.go @@ -322,7 +322,10 @@ func (r *Repository) Flush(ctx context.Context) error { return err } - // Save index after flushing + // Save index after flushing only if noAutoIndexUpdate is not set + if r.noAutoIndexUpdate { + return nil + } return r.SaveIndex(ctx) } From d2e53730d609ed5db918c2ce3c5f6866b0ec4108 Mon Sep 17 00:00:00 2001 From: Alexander Weiss Date: Sat, 17 Oct 2020 21:55:46 +0200 Subject: [PATCH 3/4] Add test that repo.List is only called once --- changelog/unreleased/pull-2718 | 7 +++-- cmd/restic/integration_test.go | 56 ++++++++++++++++++++++++++++++++++ 2 files changed, 61 insertions(+), 2 deletions(-) diff --git a/changelog/unreleased/pull-2718 b/changelog/unreleased/pull-2718 index a9eefab79..1a0985b5b 100644 --- a/changelog/unreleased/pull-2718 +++ b/changelog/unreleased/pull-2718 @@ -3,8 +3,11 @@ Enhancement: Improve pruning performance and make pruning more customizable The `prune` command is now much faster. This is especially the case for remote repositories or repositories with not much data to remove. Also the memory usage of the `prune` command is now reduced. -Restic used to rebuild the index from scratch after pruning. This is now -changed and the index rebuilding uses the information already known by `prune`. +Restic used to rebuild the index from scratch after pruning. This could lead +to missing packs in the index in some cases for eventually consistent +backends, like e.g. AWS S3. +This behavior is now changed and the index rebuilding uses the information +already known by `prune`. By default, the `prune` command no longer removes all unused data. 
This
 behavior can be fine-tuned by new options, like the acceptable amount of unused space or
diff --git a/cmd/restic/integration_test.go b/cmd/restic/integration_test.go
index 0faf07cb5..42fe34886 100644
--- a/cmd/restic/integration_test.go
+++ b/cmd/restic/integration_test.go
@@ -1559,6 +1559,62 @@ func testEdgeCaseRepo(t *testing.T, tarfile string, optionsCheck CheckOptions, o
 	}
 }
 
+// a listOnceBackend only allows listing once per file type.
+// Listing file types more than once may cause problems with eventually consistent
+// backends (like AWS S3), as the second listing may be inconsistent with what
+// is expected after the first listing plus some operations.
+type listOnceBackend struct {
+	restic.Backend
+	listedFileType map[restic.FileType]bool
+}
+
+func newListOnceBackend(be restic.Backend) *listOnceBackend {
+	return &listOnceBackend{
+		Backend:        be,
+		listedFileType: make(map[restic.FileType]bool),
+	}
+}
+
+func (be *listOnceBackend) List(ctx context.Context, t restic.FileType, fn func(restic.FileInfo) error) error {
+	if t != restic.LockFile && be.listedFileType[t] {
+		return errors.Errorf("tried listing type %v the second time", t)
+	}
+	be.listedFileType[t] = true
+	return be.Backend.List(ctx, t, fn)
+}
+
+func TestPruneListOnce(t *testing.T) {
+	env, cleanup := withTestEnvironment(t)
+	defer cleanup()
+
+	env.gopts.backendTestHook = func(r restic.Backend) (restic.Backend, error) {
+		return newListOnceBackend(r), nil
+	}
+
+	pruneOpts := PruneOptions{MaxUnused: "0"}
+	checkOpts := CheckOptions{ReadData: true, CheckUnused: true}
+
+	testSetupBackupData(t, env)
+	opts := BackupOptions{}
+
+	testRunBackup(t, "", []string{filepath.Join(env.testdata, "0", "0", "9")}, opts, env.gopts)
+	firstSnapshot := testRunList(t, "snapshots", env.gopts)
+	rtest.Assert(t, len(firstSnapshot) == 1,
+		"expected one snapshot, got %v", firstSnapshot)
+
+	testRunBackup(t, "", []string{filepath.Join(env.testdata, "0", "0", "9", "2")}, opts, env.gopts)
+	testRunBackup(t, "", []string{filepath.Join(env.testdata, "0", "0", "9", "3")}, opts, env.gopts)
+
+	snapshotIDs := testRunList(t, "snapshots", env.gopts)
+	rtest.Assert(t, len(snapshotIDs) == 3,
+		"expected 3 snapshots, got %v", snapshotIDs)
+
+	testRunForgetJSON(t, env.gopts)
+	testRunForget(t, env.gopts, firstSnapshot[0].String())
+	testRunPrune(t, env.gopts, pruneOpts)
+	rtest.OK(t, runCheck(checkOpts, env.gopts, nil))
+}
+
 func TestHardLink(t *testing.T) {
 	// this test assumes a test set with a single directory containing hard linked files
 	env, cleanup := withTestEnvironment(t)
From 47277c4b4c945698f1837869eae14926ebd8f211 Mon Sep 17 00:00:00 2001
From: Alexander Neumann
Date: Fri, 6 Nov 2020 11:26:35 +0100
Subject: [PATCH 4/4] Add comments, clarify computation

---
 cmd/restic/cmd_prune.go | 9 +++++++--
 1 file changed, 7 insertions(+), 2 deletions(-)

diff --git a/cmd/restic/cmd_prune.go b/cmd/restic/cmd_prune.go
index a8f87a6ff..49889f6eb 100644
--- a/cmd/restic/cmd_prune.go
+++ b/cmd/restic/cmd_prune.go
@@ -473,14 +473,19 @@ func prune(opts PruneOptions, gopts GlobalOptions, repo restic.Repository, usedB
 	packsAddedByRepack := 0
 	if len(repackPacks) != 0 {
-		packsAddedByRepack -= len(repo.Index().Packs())
+		// Remember the number of unique packs before repacking
+		packsBeforeRepacking := len(repo.Index().Packs())
+
 		Verbosef("repacking packs\n")
 		bar := newProgressMax(!gopts.Quiet, uint64(len(repackPacks)), "packs repacked")
 		_, err := repository.Repack(ctx, repo, repackPacks, keepBlobs, bar)
 		if err != nil {
 			return err
 		}
-		packsAddedByRepack += 
len(repo.Index().Packs()) + + // Since repacking will only add new packs, we can calculate the number + // of packs like this: + packsAddedByRepack = len(repo.Index().Packs()) - packsBeforeRepacking // Also remove repacked packs removePacks.Merge(repackPacks)
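For illustration only (not part of the patch series): the pack arithmetic that PATCH 4/4 documents can be traced with made-up numbers. The sketch below merely mirrors the formula totalpacks = used + partlyUsed + unused - len(removePacks) + packsAddedByRepack from cmd_prune.go; the variable names and all counts are hypothetical assumptions, not values taken from a real repository.

package main

import "fmt"

func main() {
	// Hypothetical pack statistics as prune would gather them (illustration only).
	usedPacks, partlyUsedPacks, unusedPacks := 90, 8, 2

	// Suppose the 8 partly used packs are repacked into 5 new packs, and the
	// repacked packs plus the 2 unused ones are excluded from the rebuilt index.
	packsBeforeRepacking := usedPacks + partlyUsedPacks + unusedPacks // Packs() before repacking
	packsAfterRepacking := packsBeforeRepacking + 5                   // Packs() after repacking
	packsAddedByRepack := packsAfterRepacking - packsBeforeRepacking  // repacking only ever adds packs
	removePacks := unusedPacks + 8                                    // packs left out of the new index

	// Progress maximum for the index rebuild, as computed in prune:
	totalPacks := usedPacks + partlyUsedPacks + unusedPacks - removePacks + packsAddedByRepack
	fmt.Println(totalPacks) // 100 - 10 + 5 = 95 packs will be processed
}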