index: let MasterIndex.Save also delete obsolete indexes

This commit is contained in:
Michael Eischer 2024-01-20 15:58:06 +01:00
parent bedff1ed6d
commit cb50832d50
7 changed files with 69 additions and 68 deletions

View file

@ -15,6 +15,7 @@ import (
"github.com/restic/restic/internal/repository" "github.com/restic/restic/internal/repository"
"github.com/restic/restic/internal/restic" "github.com/restic/restic/internal/restic"
"github.com/restic/restic/internal/ui" "github.com/restic/restic/internal/ui"
"github.com/restic/restic/internal/ui/progress"
"github.com/spf13/cobra" "github.com/spf13/cobra"
) )
@ -766,7 +767,7 @@ func doPrune(ctx context.Context, opts PruneOptions, gopts GlobalOptions, repo r
return errors.Fatalf("%s", err) return errors.Fatalf("%s", err)
} }
} else if len(plan.ignorePacks) != 0 { } else if len(plan.ignorePacks) != 0 {
err = rebuildIndexFiles(ctx, gopts, repo, plan.ignorePacks, nil) err = rebuildIndexFiles(ctx, gopts, repo, plan.ignorePacks, nil, false)
if err != nil { if err != nil {
return errors.Fatalf("%s", err) return errors.Fatalf("%s", err)
} }
@ -778,7 +779,7 @@ func doPrune(ctx context.Context, opts PruneOptions, gopts GlobalOptions, repo r
} }
if opts.unsafeRecovery { if opts.unsafeRecovery {
_, err = writeIndexFiles(ctx, gopts, repo, plan.ignorePacks, nil) err = rebuildIndexFiles(ctx, gopts, repo, plan.ignorePacks, nil, true)
if err != nil { if err != nil {
return errors.Fatalf("%s", err) return errors.Fatalf("%s", err)
} }
@ -788,23 +789,22 @@ func doPrune(ctx context.Context, opts PruneOptions, gopts GlobalOptions, repo r
return nil return nil
} }
func writeIndexFiles(ctx context.Context, gopts GlobalOptions, repo restic.Repository, removePacks restic.IDSet, extraObsolete restic.IDs) (restic.IDSet, error) { func rebuildIndexFiles(ctx context.Context, gopts GlobalOptions, repo restic.Repository, removePacks restic.IDSet, extraObsolete restic.IDs, skipDeletion bool) error {
Verbosef("rebuilding index\n") Verbosef("rebuilding index\n")
bar := newProgressMax(!gopts.Quiet, 0, "packs processed") bar := newProgressMax(!gopts.Quiet, 0, "packs processed")
obsoleteIndexes, err := repo.Index().Save(ctx, repo, removePacks, extraObsolete, bar) return repo.Index().Save(ctx, repo, removePacks, extraObsolete, restic.MasterIndexSaveOpts{
bar.Done() SaveProgress: bar,
return obsoleteIndexes, err DeleteProgress: func() *progress.Counter {
return newProgressMax(!gopts.Quiet, 0, "old indexes deleted")
},
DeleteReport: func(id restic.ID, err error) {
if gopts.verbosity > 2 {
Verbosef("removed index %v\n", id.String())
} }
},
func rebuildIndexFiles(ctx context.Context, gopts GlobalOptions, repo restic.Repository, removePacks restic.IDSet, extraObsolete restic.IDs) error { SkipDeletion: skipDeletion,
obsoleteIndexes, err := writeIndexFiles(ctx, gopts, repo, removePacks, extraObsolete) })
if err != nil {
return err
}
Verbosef("deleting obsolete index files\n")
return DeleteFilesChecked(ctx, gopts, repo, obsoleteIndexes, restic.IndexFile)
} }
func getUsedBlobs(ctx context.Context, repo restic.Repository, ignoreSnapshots restic.IDSet, quiet bool) (usedBlobs restic.CountedBlobSet, err error) { func getUsedBlobs(ctx context.Context, repo restic.Repository, ignoreSnapshots restic.IDSet, quiet bool) (usedBlobs restic.CountedBlobSet, err error) {

View file

@ -154,7 +154,7 @@ func rebuildIndex(ctx context.Context, opts RepairIndexOptions, gopts GlobalOpti
} }
} }
err = rebuildIndexFiles(ctx, gopts, repo, removePacks, obsoleteIndexes) err = rebuildIndexFiles(ctx, gopts, repo, removePacks, obsoleteIndexes, false)
if err != nil { if err != nil {
return err return err
} }

View file

@ -145,7 +145,7 @@ func repairPacks(ctx context.Context, gopts GlobalOptions, repo *repository.Repo
bar.Done() bar.Done()
// remove salvaged packs from index // remove salvaged packs from index
err = rebuildIndexFiles(ctx, gopts, repo, ids, nil) err = rebuildIndexFiles(ctx, gopts, repo, ids, nil, false)
if err != nil { if err != nil {
return errors.Fatalf("%s", err) return errors.Fatalf("%s", err)
} }

View file

@ -9,7 +9,6 @@ import (
"github.com/restic/restic/internal/debug" "github.com/restic/restic/internal/debug"
"github.com/restic/restic/internal/restic" "github.com/restic/restic/internal/restic"
"github.com/restic/restic/internal/ui/progress"
"golang.org/x/sync/errgroup" "golang.org/x/sync/errgroup"
) )
@ -267,23 +266,22 @@ func (mi *MasterIndex) MergeFinalIndexes() error {
// Save saves all known indexes to index files, leaving out any // Save saves all known indexes to index files, leaving out any
// packs whose ID is contained in packBlacklist from finalized indexes. // packs whose ID is contained in packBlacklist from finalized indexes.
// The new index contains the IDs of all known indexes in the "supersedes" // It also removes the old index files and those listed in extraObsolete.
// field. The IDs are also returned in the IDSet obsolete. func (mi *MasterIndex) Save(ctx context.Context, repo restic.Repository, excludePacks restic.IDSet, extraObsolete restic.IDs, opts restic.MasterIndexSaveOpts) error {
// After calling this function, you should remove the obsolete index files. p := opts.SaveProgress
func (mi *MasterIndex) Save(ctx context.Context, repo restic.SaverUnpacked, packBlacklist restic.IDSet, extraObsolete restic.IDs, p *progress.Counter) (obsolete restic.IDSet, err error) { p.SetMax(uint64(len(mi.Packs(excludePacks))))
p.SetMax(uint64(len(mi.Packs(packBlacklist))))
mi.idxMutex.Lock() mi.idxMutex.Lock()
defer mi.idxMutex.Unlock() defer mi.idxMutex.Unlock()
debug.Log("start rebuilding index of %d indexes, pack blacklist: %v", len(mi.idx), packBlacklist) debug.Log("start rebuilding index of %d indexes, excludePacks: %v", len(mi.idx), excludePacks)
newIndex := NewIndex() newIndex := NewIndex()
obsolete = restic.NewIDSet() obsolete := restic.NewIDSet()
// track spawned goroutines using wg, create a new context which is // track spawned goroutines using wg, create a new context which is
// cancelled as soon as an error occurs. // cancelled as soon as an error occurs.
wg, ctx := errgroup.WithContext(ctx) wg, wgCtx := errgroup.WithContext(ctx)
ch := make(chan *Index) ch := make(chan *Index)
@ -310,21 +308,21 @@ func (mi *MasterIndex) Save(ctx context.Context, repo restic.SaverUnpacked, pack
debug.Log("adding index %d", i) debug.Log("adding index %d", i)
for pbs := range idx.EachByPack(ctx, packBlacklist) { for pbs := range idx.EachByPack(wgCtx, excludePacks) {
newIndex.StorePack(pbs.PackID, pbs.Blobs) newIndex.StorePack(pbs.PackID, pbs.Blobs)
p.Add(1) p.Add(1)
if IndexFull(newIndex, mi.compress) { if IndexFull(newIndex, mi.compress) {
select { select {
case ch <- newIndex: case ch <- newIndex:
case <-ctx.Done(): case <-wgCtx.Done():
return ctx.Err() return wgCtx.Err()
} }
newIndex = NewIndex() newIndex = NewIndex()
} }
} }
} }
err = newIndex.AddToSupersedes(extraObsolete...) err := newIndex.AddToSupersedes(extraObsolete...)
if err != nil { if err != nil {
return err return err
} }
@ -332,7 +330,7 @@ func (mi *MasterIndex) Save(ctx context.Context, repo restic.SaverUnpacked, pack
select { select {
case ch <- newIndex: case ch <- newIndex:
case <-ctx.Done(): case <-wgCtx.Done():
} }
return nil return nil
}) })
@ -341,7 +339,7 @@ func (mi *MasterIndex) Save(ctx context.Context, repo restic.SaverUnpacked, pack
worker := func() error { worker := func() error {
for idx := range ch { for idx := range ch {
idx.Finalize() idx.Finalize()
if _, err := SaveIndex(ctx, repo, idx); err != nil { if _, err := SaveIndex(wgCtx, repo, idx); err != nil {
return err return err
} }
} }
@ -354,9 +352,27 @@ func (mi *MasterIndex) Save(ctx context.Context, repo restic.SaverUnpacked, pack
for i := 0; i < workerCount; i++ { for i := 0; i < workerCount; i++ {
wg.Go(worker) wg.Go(worker)
} }
err = wg.Wait() err := wg.Wait()
p.Done()
if err != nil {
return err
}
return obsolete, err if opts.SkipDeletion {
return nil
}
p = nil
if opts.DeleteProgress != nil {
p = opts.DeleteProgress()
}
defer p.Done()
return restic.ParallelRemove(ctx, repo, obsolete, restic.IndexFile, func(id restic.ID, err error) error {
if opts.DeleteReport != nil {
opts.DeleteReport(id, err)
}
return err
}, p)
} }
// SaveIndex saves an index in the repository. // SaveIndex saves an index in the repository.

View file

@ -8,7 +8,6 @@ import (
"testing" "testing"
"time" "time"
"github.com/restic/restic/internal/backend"
"github.com/restic/restic/internal/checker" "github.com/restic/restic/internal/checker"
"github.com/restic/restic/internal/crypto" "github.com/restic/restic/internal/crypto"
"github.com/restic/restic/internal/index" "github.com/restic/restic/internal/index"
@ -363,20 +362,11 @@ func testIndexSave(t *testing.T, version uint) {
t.Fatal(err) t.Fatal(err)
} }
obsoletes, err := repo.Index().Save(context.TODO(), repo, nil, nil, nil) err = repo.Index().Save(context.TODO(), repo, nil, nil, restic.MasterIndexSaveOpts{})
if err != nil { if err != nil {
t.Fatalf("unable to save new index: %v", err) t.Fatalf("unable to save new index: %v", err)
} }
for id := range obsoletes {
t.Logf("remove index %v", id.Str())
h := backend.Handle{Type: restic.IndexFile, Name: id.String()}
err = repo.Backend().Remove(context.TODO(), h)
if err != nil {
t.Errorf("error removing index %v: %v", id, err)
}
}
checker := checker.New(repo, false) checker := checker.New(repo, false)
err = checker.LoadSnapshots(context.TODO()) err = checker.LoadSnapshots(context.TODO())
if err != nil { if err != nil {

View file

@ -173,39 +173,27 @@ func flush(t *testing.T, repo restic.Repository) {
func rebuildIndex(t *testing.T, repo restic.Repository) { func rebuildIndex(t *testing.T, repo restic.Repository) {
err := repo.SetIndex(index.NewMasterIndex()) err := repo.SetIndex(index.NewMasterIndex())
if err != nil { rtest.OK(t, err)
t.Fatal(err)
}
packs := make(map[restic.ID]int64) packs := make(map[restic.ID]int64)
err = repo.List(context.TODO(), restic.PackFile, func(id restic.ID, size int64) error { err = repo.List(context.TODO(), restic.PackFile, func(id restic.ID, size int64) error {
packs[id] = size packs[id] = size
return nil return nil
}) })
if err != nil { rtest.OK(t, err)
t.Fatal(err)
}
_, err = repo.(*repository.Repository).CreateIndexFromPacks(context.TODO(), packs, nil) _, err = repo.(*repository.Repository).CreateIndexFromPacks(context.TODO(), packs, nil)
if err != nil { rtest.OK(t, err)
t.Fatal(err)
}
var obsoleteIndexes restic.IDs
err = repo.List(context.TODO(), restic.IndexFile, func(id restic.ID, size int64) error { err = repo.List(context.TODO(), restic.IndexFile, func(id restic.ID, size int64) error {
h := backend.Handle{ obsoleteIndexes = append(obsoleteIndexes, id)
Type: restic.IndexFile, return nil
Name: id.String(),
}
return repo.Backend().Remove(context.TODO(), h)
}) })
if err != nil { rtest.OK(t, err)
t.Fatal(err)
}
_, err = repo.Index().Save(context.TODO(), repo, restic.NewIDSet(), nil, nil) err = repo.Index().Save(context.TODO(), repo, restic.NewIDSet(), obsoleteIndexes, restic.MasterIndexSaveOpts{})
if err != nil { rtest.OK(t, err)
t.Fatal(err)
}
} }
func reloadIndex(t *testing.T, repo restic.Repository) { func reloadIndex(t *testing.T, repo restic.Repository) {

View file

@ -89,6 +89,13 @@ type PackBlobs struct {
Blobs []Blob Blobs []Blob
} }
type MasterIndexSaveOpts struct {
SaveProgress *progress.Counter
DeleteProgress func() *progress.Counter
DeleteReport func(id ID, err error)
SkipDeletion bool
}
// MasterIndex keeps track of the blobs are stored within files. // MasterIndex keeps track of the blobs are stored within files.
type MasterIndex interface { type MasterIndex interface {
Has(BlobHandle) bool Has(BlobHandle) bool
@ -99,7 +106,7 @@ type MasterIndex interface {
Each(ctx context.Context, fn func(PackedBlob)) Each(ctx context.Context, fn func(PackedBlob))
ListPacks(ctx context.Context, packs IDSet) <-chan PackBlobs ListPacks(ctx context.Context, packs IDSet) <-chan PackBlobs
Save(ctx context.Context, repo SaverUnpacked, packBlacklist IDSet, extraObsolete IDs, p *progress.Counter) (obsolete IDSet, err error) Save(ctx context.Context, repo Repository, excludePacks IDSet, extraObsolete IDs, opts MasterIndexSaveOpts) error
} }
// Lister allows listing files in a backend. // Lister allows listing files in a backend.