Merge pull request #4812 from MichaelEischer/streaming-index-rewrite

Resumable prune & memory-efficient index rewrite
This commit is contained in:
Michael Eischer 2024-05-24 21:41:30 +02:00 committed by GitHub
commit ff4775a15f
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
49 changed files with 616 additions and 492 deletions

View file

@ -0,0 +1,11 @@
Enhancement: Make `prune` command resumable
When `prune` was interrupted, it a latter `prune` run previously started repacking
the pack files from the start as `prune` did not update the index while repacking.
The `prune` command now supports resuming interrupted prune runs. The update
of the repository index also has been optimized to use less memory and only
rewrite parts of the index that have changed.
https://github.com/restic/restic/issues/3806
https://github.com/restic/restic/pull/4812

View file

@ -167,8 +167,7 @@ func runCat(ctx context.Context, gopts GlobalOptions, args []string) error {
}
for _, t := range []restic.BlobType{restic.DataBlob, restic.TreeBlob} {
bh := restic.BlobHandle{ID: id, Type: t}
if !repo.Index().Has(bh) {
if _, ok := repo.LookupBlobSize(t, id); !ok {
continue
}

View file

@ -187,7 +187,7 @@ func copyTree(ctx context.Context, srcRepo restic.Repository, dstRepo restic.Rep
packList := restic.NewIDSet()
enqueue := func(h restic.BlobHandle) {
pb := srcRepo.Index().Lookup(h)
pb := srcRepo.LookupBlob(h.Type, h.ID)
copyBlobs.Insert(h)
for _, p := range pb {
packList.Insert(p.PackID)
@ -202,7 +202,7 @@ func copyTree(ctx context.Context, srcRepo restic.Repository, dstRepo restic.Rep
// Do we already have this tree blob?
treeHandle := restic.BlobHandle{ID: tree.ID, Type: restic.TreeBlob}
if !dstRepo.Index().Has(treeHandle) {
if _, ok := dstRepo.LookupBlobSize(treeHandle.Type, treeHandle.ID); !ok {
// copy raw tree bytes to avoid problems if the serialization changes
enqueue(treeHandle)
}
@ -212,7 +212,7 @@ func copyTree(ctx context.Context, srcRepo restic.Repository, dstRepo restic.Rep
// Copy the blobs for this file.
for _, blobID := range entry.Content {
h := restic.BlobHandle{Type: restic.DataBlob, ID: blobID}
if !dstRepo.Index().Has(h) {
if _, ok := dstRepo.LookupBlobSize(h.Type, h.ID); !ok {
enqueue(h)
}
}

View file

@ -492,7 +492,7 @@ func examinePack(ctx context.Context, opts DebugExamineOptions, repo restic.Repo
blobsLoaded := false
// examine all data the indexes have for the pack file
for b := range repo.Index().ListPacks(ctx, restic.NewIDSet(id)) {
for b := range repo.ListPacksFromIndex(ctx, restic.NewIDSet(id)) {
blobs := b.Blobs
if len(blobs) == 0 {
continue

View file

@ -156,7 +156,7 @@ func updateBlobs(repo restic.Loader, blobs restic.BlobSet, stats *DiffStat) {
stats.TreeBlobs++
}
size, found := repo.LookupBlobSize(h.ID, h.Type)
size, found := repo.LookupBlobSize(h.Type, h.ID)
if !found {
Warnf("unable to find blob size for %v\n", h)
continue

View file

@ -465,7 +465,7 @@ func (f *Finder) indexPacksToBlobs(ctx context.Context, packIDs map[string]struc
// remember which packs were found in the index
indexPackIDs := make(map[string]struct{})
err := f.repo.Index().Each(wctx, func(pb restic.PackedBlob) {
err := f.repo.ListBlobs(wctx, func(pb restic.PackedBlob) {
idStr := pb.PackID.String()
// keep entry in packIDs as Each() returns individual index entries
matchingID := false
@ -503,15 +503,13 @@ func (f *Finder) indexPacksToBlobs(ctx context.Context, packIDs map[string]struc
}
func (f *Finder) findObjectPack(id string, t restic.BlobType) {
idx := f.repo.Index()
rid, err := restic.ParseID(id)
if err != nil {
Printf("Note: cannot find pack for object '%s', unable to parse ID: %v\n", id, err)
return
}
blobs := idx.Lookup(restic.BlobHandle{ID: rid, Type: t})
blobs := f.repo.LookupBlob(t, rid)
if len(blobs) == 0 {
Printf("Object %s not found in the index\n", rid.Str())
return

View file

@ -162,9 +162,6 @@ func runPrune(ctx context.Context, opts PruneOptions, gopts GlobalOptions, term
}
func runPruneWithRepo(ctx context.Context, opts PruneOptions, gopts GlobalOptions, repo *repository.Repository, ignoreSnapshots restic.IDSet, term *termstatus.Terminal) error {
// we do not need index updates while pruning!
repo.DisableAutoIndexUpdate()
if repo.Cache == nil {
Print("warning: running prune without a cache, this may be very slow!\n")
}

View file

@ -61,7 +61,7 @@ func runRecover(ctx context.Context, gopts GlobalOptions) error {
// tree. If it is not referenced, we have a root tree.
trees := make(map[restic.ID]bool)
err = repo.Index().Each(ctx, func(blob restic.PackedBlob) {
err = repo.ListBlobs(ctx, func(blob restic.PackedBlob) {
if blob.Type == restic.TreeBlob {
trees[blob.Blob.ID] = false
}

View file

@ -97,7 +97,7 @@ func runRepairSnapshots(ctx context.Context, gopts GlobalOptions, opts RepairOpt
var newSize uint64
// check all contents and remove if not available
for _, id := range node.Content {
if size, found := repo.LookupBlobSize(id, restic.DataBlob); !found {
if size, found := repo.LookupBlobSize(restic.DataBlob, id); !found {
ok = false
} else {
newContent = append(newContent, id)

View file

@ -124,7 +124,7 @@ func runStats(ctx context.Context, opts StatsOptions, gopts GlobalOptions, args
if opts.countMode == countModeRawData {
// the blob handles have been collected, but not yet counted
for blobHandle := range stats.blobs {
pbs := repo.Index().Lookup(blobHandle)
pbs := repo.LookupBlob(blobHandle.Type, blobHandle.ID)
if len(pbs) == 0 {
return fmt.Errorf("blob %v not found", blobHandle)
}
@ -238,7 +238,7 @@ func statsWalkTree(repo restic.Loader, opts StatsOptions, stats *statsContainer,
}
if _, ok := stats.fileBlobs[nodePath][blobID]; !ok {
// is always a data blob since we're accessing it via a file's Content array
blobSize, found := repo.LookupBlobSize(blobID, restic.DataBlob)
blobSize, found := repo.LookupBlobSize(restic.DataBlob, blobID)
if !found {
return fmt.Errorf("blob %s not found for tree %s", blobID, parentTreeID)
}
@ -378,7 +378,7 @@ func statsDebugBlobs(ctx context.Context, repo restic.Repository) ([restic.NumBl
hist[i] = newSizeHistogram(2 * chunker.MaxSize)
}
err := repo.Index().Each(ctx, func(pb restic.PackedBlob) {
err := repo.ListBlobs(ctx, func(pb restic.PackedBlob) {
hist[pb.Type].Add(uint64(pb.Length))
})

View file

@ -252,7 +252,7 @@ func listTreePacks(gopts GlobalOptions, t *testing.T) restic.IDSet {
rtest.OK(t, r.LoadIndex(ctx, nil))
treePacks := restic.NewIDSet()
rtest.OK(t, r.Index().Each(ctx, func(pb restic.PackedBlob) {
rtest.OK(t, r.ListBlobs(ctx, func(pb restic.PackedBlob) {
if pb.Type == restic.TreeBlob {
treePacks.Insert(pb.PackID)
}
@ -280,7 +280,7 @@ func removePacksExcept(gopts GlobalOptions, t testing.TB, keep restic.IDSet, rem
rtest.OK(t, r.LoadIndex(ctx, nil))
treePacks := restic.NewIDSet()
rtest.OK(t, r.Index().Each(ctx, func(pb restic.PackedBlob) {
rtest.OK(t, r.ListBlobs(ctx, func(pb restic.PackedBlob) {
if pb.Type == restic.TreeBlob {
treePacks.Insert(pb.PackID)
}

View file

@ -64,9 +64,19 @@ func (s *ItemStats) Add(other ItemStats) {
s.TreeSizeInRepo += other.TreeSizeInRepo
}
type archiverRepo interface {
restic.Loader
restic.BlobSaver
restic.SaverUnpacked
Config() restic.Config
StartPackUploader(ctx context.Context, wg *errgroup.Group)
Flush(ctx context.Context) error
}
// Archiver saves a directory structure to the repo.
type Archiver struct {
Repo restic.Repository
Repo archiverRepo
SelectByName SelectByNameFunc
Select SelectFunc
FS fs.FS
@ -160,7 +170,7 @@ func (o Options) ApplyDefaults() Options {
}
// New initializes a new archiver.
func New(repo restic.Repository, fs fs.FS, opts Options) *Archiver {
func New(repo archiverRepo, fs fs.FS, opts Options) *Archiver {
arch := &Archiver{
Repo: repo,
SelectByName: func(_ string) bool { return true },
@ -276,7 +286,7 @@ func (arch *Archiver) loadSubtree(ctx context.Context, node *restic.Node) (*rest
}
func (arch *Archiver) wrapLoadTreeError(id restic.ID, err error) error {
if arch.Repo.Index().Has(restic.BlobHandle{ID: id, Type: restic.TreeBlob}) {
if _, ok := arch.Repo.LookupBlobSize(restic.TreeBlob, id); ok {
err = errors.Errorf("tree %v could not be loaded; the repository could be damaged: %v", id, err)
} else {
err = errors.Errorf("tree %v is not known; the repository could be damaged, run `repair index` to try to repair it", id)
@ -390,7 +400,7 @@ func (fn *FutureNode) take(ctx context.Context) futureNodeResult {
func (arch *Archiver) allBlobsPresent(previous *restic.Node) bool {
// check if all blobs are contained in index
for _, id := range previous.Content {
if !arch.Repo.Index().Has(restic.BlobHandle{ID: id, Type: restic.DataBlob}) {
if _, ok := arch.Repo.LookupBlobSize(restic.DataBlob, id); !ok {
return false
}
}

View file

@ -36,7 +36,7 @@ func prepareTempdirRepoSrc(t testing.TB, src TestDir) (string, restic.Repository
return tempdir, repo
}
func saveFile(t testing.TB, repo restic.Repository, filename string, filesystem fs.FS) (*restic.Node, ItemStats) {
func saveFile(t testing.TB, repo archiverRepo, filename string, filesystem fs.FS) (*restic.Node, ItemStats) {
wg, ctx := errgroup.WithContext(context.TODO())
repo.StartPackUploader(ctx, wg)
@ -416,14 +416,14 @@ func BenchmarkArchiverSaveFileLarge(b *testing.B) {
}
type blobCountingRepo struct {
restic.Repository
archiverRepo
m sync.Mutex
saved map[restic.BlobHandle]uint
}
func (repo *blobCountingRepo) SaveBlob(ctx context.Context, t restic.BlobType, buf []byte, id restic.ID, storeDuplicate bool) (restic.ID, bool, int, error) {
id, exists, size, err := repo.Repository.SaveBlob(ctx, t, buf, id, storeDuplicate)
id, exists, size, err := repo.archiverRepo.SaveBlob(ctx, t, buf, id, storeDuplicate)
if exists {
return id, exists, size, err
}
@ -435,7 +435,7 @@ func (repo *blobCountingRepo) SaveBlob(ctx context.Context, t restic.BlobType, b
}
func (repo *blobCountingRepo) SaveTree(ctx context.Context, t *restic.Tree) (restic.ID, error) {
id, err := restic.SaveTree(ctx, repo.Repository, t)
id, err := restic.SaveTree(ctx, repo.archiverRepo, t)
h := restic.BlobHandle{ID: id, Type: restic.TreeBlob}
repo.m.Lock()
repo.saved[h]++
@ -465,7 +465,7 @@ func TestArchiverSaveFileIncremental(t *testing.T) {
tempdir := rtest.TempDir(t)
repo := &blobCountingRepo{
Repository: repository.TestRepository(t),
archiverRepo: repository.TestRepository(t),
saved: make(map[restic.BlobHandle]uint),
}
@ -902,7 +902,7 @@ func TestArchiverSaveDirIncremental(t *testing.T) {
tempdir := rtest.TempDir(t)
repo := &blobCountingRepo{
Repository: repository.TestRepository(t),
archiverRepo: repository.TestRepository(t),
saved: make(map[restic.BlobHandle]uint),
}
@ -2017,7 +2017,7 @@ func (m *TrackFS) OpenFile(name string, flag int, perm os.FileMode) (fs.File, er
}
type failSaveRepo struct {
restic.Repository
archiverRepo
failAfter int32
cnt int32
err error
@ -2029,7 +2029,7 @@ func (f *failSaveRepo) SaveBlob(ctx context.Context, t restic.BlobType, buf []by
return restic.Hash(buf), false, 0, f.err
}
return f.Repository.SaveBlob(ctx, t, buf, id, storeDuplicate)
return f.archiverRepo.SaveBlob(ctx, t, buf, id, storeDuplicate)
}
func TestArchiverAbortEarlyOnError(t *testing.T) {
@ -2105,7 +2105,7 @@ func TestArchiverAbortEarlyOnError(t *testing.T) {
}
testRepo := &failSaveRepo{
Repository: repo,
archiverRepo: repo,
failAfter: int32(test.failAfter),
err: test.err,
}
@ -2134,7 +2134,7 @@ func TestArchiverAbortEarlyOnError(t *testing.T) {
}
}
func snapshot(t testing.TB, repo restic.Repository, fs fs.FS, parent *restic.Snapshot, filename string) (*restic.Snapshot, *restic.Node) {
func snapshot(t testing.TB, repo archiverRepo, fs fs.FS, parent *restic.Snapshot, filename string) (*restic.Snapshot, *restic.Node) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

View file

@ -46,7 +46,7 @@ func wrapFileInfo(fi os.FileInfo) os.FileInfo {
return res
}
func statAndSnapshot(t *testing.T, repo restic.Repository, name string) (*restic.Node, *restic.Node) {
func statAndSnapshot(t *testing.T, repo archiverRepo, name string) (*restic.Node, *restic.Node) {
fi := lstat(t, name)
want, err := restic.NodeFromFileInfo(name, fi, false)
rtest.OK(t, err)

View file

@ -10,7 +10,6 @@ import (
"testing"
"github.com/restic/restic/internal/errors"
"github.com/restic/restic/internal/index"
"github.com/restic/restic/internal/restic"
rtest "github.com/restic/restic/internal/test"
"golang.org/x/sync/errgroup"
@ -19,7 +18,6 @@ import (
var errTest = errors.New("test error")
type saveFail struct {
idx restic.MasterIndex
cnt int32
failAt int32
}
@ -33,18 +31,12 @@ func (b *saveFail) SaveBlob(_ context.Context, _ restic.BlobType, _ []byte, id r
return id, false, 0, nil
}
func (b *saveFail) Index() restic.MasterIndex {
return b.idx
}
func TestBlobSaver(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
wg, ctx := errgroup.WithContext(ctx)
saver := &saveFail{
idx: index.NewMasterIndex(),
}
saver := &saveFail{}
b := NewBlobSaver(ctx, wg, saver, uint(runtime.NumCPU()))
@ -100,7 +92,6 @@ func TestBlobSaverError(t *testing.T) {
wg, ctx := errgroup.WithContext(ctx)
saver := &saveFail{
idx: index.NewMasterIndex(),
failAt: int32(test.failAt),
}

View file

@ -11,7 +11,6 @@ import (
"testing"
"time"
"github.com/restic/restic/internal/crypto"
"github.com/restic/restic/internal/debug"
"github.com/restic/restic/internal/fs"
"github.com/restic/restic/internal/restic"
@ -26,7 +25,7 @@ func TestSnapshot(t testing.TB, repo restic.Repository, path string, parent *res
Tags: []string{"test"},
}
if parent != nil {
sn, err := restic.LoadSnapshot(context.TODO(), arch.Repo, *parent)
sn, err := restic.LoadSnapshot(context.TODO(), repo, *parent)
if err != nil {
t.Fatal(err)
}
@ -239,7 +238,7 @@ func TestEnsureFileContent(ctx context.Context, t testing.TB, repo restic.BlobLo
return
}
content := make([]byte, crypto.CiphertextLength(len(file.Content)))
content := make([]byte, len(file.Content))
pos := 0
for _, id := range node.Content {
part, err := repo.LoadBlob(ctx, restic.DataBlob, id, content[pos:])

View file

@ -64,7 +64,7 @@ func TestRoundtrip(t *testing.T) {
}))
defer srv.Close()
rt := newWatchdogRoundtripper(http.DefaultTransport, 50*time.Millisecond, 2)
rt := newWatchdogRoundtripper(http.DefaultTransport, 100*time.Millisecond, 2)
req, err := http.NewRequestWithContext(context.TODO(), "GET", srv.URL, io.NopCloser(newSlowReader(bytes.NewReader(msg), time.Duration(delay)*time.Millisecond)))
rtest.OK(t, err)

View file

@ -91,9 +91,9 @@ func (c *Checker) LoadSnapshots(ctx context.Context) error {
return err
}
func computePackTypes(ctx context.Context, idx restic.MasterIndex) (map[restic.ID]restic.BlobType, error) {
func computePackTypes(ctx context.Context, idx restic.ListBlobser) (map[restic.ID]restic.BlobType, error) {
packs := make(map[restic.ID]restic.BlobType)
err := idx.Each(ctx, func(pb restic.PackedBlob) {
err := idx.ListBlobs(ctx, func(pb restic.PackedBlob) {
tpe, exists := packs[pb.PackID]
if exists {
if pb.Type != tpe {
@ -111,33 +111,10 @@ func computePackTypes(ctx context.Context, idx restic.MasterIndex) (map[restic.I
func (c *Checker) LoadIndex(ctx context.Context, p *progress.Counter) (hints []error, errs []error) {
debug.Log("Start")
indexList, err := restic.MemorizeList(ctx, c.repo, restic.IndexFile)
if err != nil {
// abort if an error occurs while listing the indexes
return hints, append(errs, err)
}
if p != nil {
var numIndexFiles uint64
err := indexList.List(ctx, restic.IndexFile, func(_ restic.ID, _ int64) error {
numIndexFiles++
return nil
})
if err != nil {
return hints, append(errs, err)
}
p.SetMax(numIndexFiles)
defer p.Done()
}
packToIndex := make(map[restic.ID]restic.IDSet)
err = index.ForAllIndexes(ctx, indexList, c.repo, func(id restic.ID, index *index.Index, oldFormat bool, err error) error {
err := c.masterIndex.Load(ctx, c.repo, p, func(id restic.ID, idx *index.Index, oldFormat bool, err error) error {
debug.Log("process index %v, err %v", id, err)
if p != nil {
p.Add(1)
}
if oldFormat {
debug.Log("index %v has old format", id)
hints = append(hints, &ErrOldIndexFormat{id})
@ -150,11 +127,9 @@ func (c *Checker) LoadIndex(ctx context.Context, p *progress.Counter) (hints []e
return nil
}
c.masterIndex.Insert(index)
debug.Log("process blobs")
cnt := 0
err = index.Each(ctx, func(blob restic.PackedBlob) {
err = idx.Each(ctx, func(blob restic.PackedBlob) {
cnt++
if _, ok := packToIndex[blob.PackID]; !ok {
@ -167,22 +142,22 @@ func (c *Checker) LoadIndex(ctx context.Context, p *progress.Counter) (hints []e
return err
})
if err != nil {
// failed to load the index
return hints, append(errs, err)
}
err = c.repo.SetIndex(c.masterIndex)
if err != nil {
debug.Log("SetIndex returned error: %v", err)
errs = append(errs, err)
}
// Merge index before computing pack sizes, as this needs removed duplicates
err = c.masterIndex.MergeFinalIndexes()
if err != nil {
// abort if an error occurs merging the indexes
return hints, append(errs, err)
}
// compute pack size using index entries
c.packs, err = pack.Size(ctx, c.masterIndex, false)
c.packs, err = pack.Size(ctx, c.repo, false)
if err != nil {
return hints, append(errs, err)
}
packTypes, err := computePackTypes(ctx, c.masterIndex)
packTypes, err := computePackTypes(ctx, c.repo)
if err != nil {
return hints, append(errs, err)
}
@ -203,12 +178,6 @@ func (c *Checker) LoadIndex(ctx context.Context, p *progress.Counter) (hints []e
}
}
err = c.repo.SetIndex(c.masterIndex)
if err != nil {
debug.Log("SetIndex returned error: %v", err)
errs = append(errs, err)
}
return hints, errs
}
@ -429,7 +398,7 @@ func (c *Checker) checkTree(id restic.ID, tree *restic.Tree) (errs []error) {
// unfortunately fails in some cases that are not resolvable
// by users, so we omit this check, see #1887
_, found := c.repo.LookupBlobSize(blobID, restic.DataBlob)
_, found := c.repo.LookupBlobSize(restic.DataBlob, blobID)
if !found {
debug.Log("tree %v references blob %v which isn't contained in index", id, blobID)
errs = append(errs, &Error{TreeID: id, Err: errors.Errorf("file %q blob %v not found in index", node.Name, blobID)})
@ -488,7 +457,7 @@ func (c *Checker) UnusedBlobs(ctx context.Context) (blobs restic.BlobHandles, er
ctx, cancel := context.WithCancel(ctx)
defer cancel()
err = c.repo.Index().Each(ctx, func(blob restic.PackedBlob) {
err = c.repo.ListBlobs(ctx, func(blob restic.PackedBlob) {
h := restic.BlobHandle{ID: blob.ID, Type: blob.Type}
if !c.blobRefs.M.Has(h) {
debug.Log("blob %v not referenced", h)
@ -573,7 +542,7 @@ func (c *Checker) ReadPacks(ctx context.Context, packs map[restic.ID]int64, p *p
}
// push packs to ch
for pbs := range c.repo.Index().ListPacks(ctx, packSet) {
for pbs := range c.repo.ListPacksFromIndex(ctx, packSet) {
size := packs[pbs.PackID]
debug.Log("listed %v", pbs.PackID)
select {

View file

@ -461,11 +461,11 @@ func (r *delayRepository) LoadTree(ctx context.Context, id restic.ID) (*restic.T
return restic.LoadTree(ctx, r.Repository, id)
}
func (r *delayRepository) LookupBlobSize(id restic.ID, t restic.BlobType) (uint, bool) {
func (r *delayRepository) LookupBlobSize(t restic.BlobType, id restic.ID) (uint, bool) {
if id == r.DelayTree && t == restic.DataBlob {
r.Unblock()
}
return r.Repository.LookupBlobSize(id, t)
return r.Repository.LookupBlobSize(t, id)
}
func (r *delayRepository) Unblock() {

View file

@ -72,7 +72,7 @@ func (f *file) Open(_ context.Context, _ *fuse.OpenRequest, _ *fuse.OpenResponse
var bytes uint64
cumsize := make([]uint64, 1+len(f.node.Content))
for i, id := range f.node.Content {
size, found := f.root.repo.LookupBlobSize(id, restic.DataBlob)
size, found := f.root.repo.LookupBlobSize(restic.DataBlob, id)
if !found {
return nil, errors.Errorf("id %v not found in repository", id)
}

View file

@ -89,7 +89,7 @@ func TestFuseFile(t *testing.T) {
memfile []byte
)
for _, id := range content {
size, found := repo.LookupBlobSize(id, restic.DataBlob)
size, found := repo.LookupBlobSize(restic.DataBlob, id)
rtest.Assert(t, found, "Expected to find blob id %v", id)
filesize += uint64(size)

View file

@ -1,6 +1,7 @@
package index
import (
"bytes"
"context"
"encoding/json"
"fmt"
@ -52,7 +53,6 @@ type Index struct {
final bool // set to true for all indexes read from the backend ("finalized")
ids restic.IDs // set to the IDs of the contained finalized indexes
supersedes restic.IDs
created time.Time
}
@ -197,25 +197,6 @@ func (idx *Index) LookupSize(bh restic.BlobHandle) (plaintextLength uint, found
return uint(crypto.PlaintextLength(int(e.length))), true
}
// Supersedes returns the list of indexes this index supersedes, if any.
func (idx *Index) Supersedes() restic.IDs {
return idx.supersedes
}
// AddToSupersedes adds the ids to the list of indexes superseded by this
// index. If the index has already been finalized, an error is returned.
func (idx *Index) AddToSupersedes(ids ...restic.ID) error {
idx.m.Lock()
defer idx.m.Unlock()
if idx.final {
return errors.New("index already finalized")
}
idx.supersedes = append(idx.supersedes, ids...)
return nil
}
// Each passes all blobs known to the index to the callback fn. This blocks any
// modification of the index.
func (idx *Index) Each(ctx context.Context, fn func(restic.PackedBlob)) error {
@ -356,7 +337,7 @@ func (idx *Index) generatePackList() ([]packJSON, error) {
}
type jsonIndex struct {
Supersedes restic.IDs `json:"supersedes,omitempty"`
// removed: Supersedes restic.IDs `json:"supersedes,omitempty"`
Packs []packJSON `json:"packs"`
}
@ -373,12 +354,29 @@ func (idx *Index) Encode(w io.Writer) error {
enc := json.NewEncoder(w)
idxJSON := jsonIndex{
Supersedes: idx.supersedes,
Packs: list,
}
return enc.Encode(idxJSON)
}
// SaveIndex saves an index in the repository.
func (idx *Index) SaveIndex(ctx context.Context, repo restic.SaverUnpacked) (restic.ID, error) {
buf := bytes.NewBuffer(nil)
err := idx.Encode(buf)
if err != nil {
return restic.ID{}, err
}
id, err := repo.SaveUnpacked(ctx, restic.IndexFile, buf.Bytes())
ierr := idx.SetID(id)
if ierr != nil {
// logic bug
panic(ierr)
}
return id, err
}
// Finalize sets the index to final.
func (idx *Index) Finalize() {
debug.Log("finalizing index")
@ -433,7 +431,6 @@ func (idx *Index) Dump(w io.Writer) error {
}
outer := jsonIndex{
Supersedes: idx.Supersedes(),
Packs: list,
}
@ -495,7 +492,6 @@ func (idx *Index) merge(idx2 *Index) error {
}
idx.ids = append(idx.ids, idx2.ids...)
idx.supersedes = append(idx.supersedes, idx2.supersedes...)
return nil
}
@ -545,7 +541,6 @@ func DecodeIndex(buf []byte, id restic.ID) (idx *Index, oldFormat bool, err erro
})
}
}
idx.supersedes = idxJSON.Supersedes
idx.ids = append(idx.ids, id)
idx.final = true

View file

@ -11,7 +11,7 @@ import (
// ForAllIndexes loads all index files in parallel and calls the given callback.
// It is guaranteed that the function is not run concurrently. If the callback
// returns an error, this function is cancelled and also returns that error.
func ForAllIndexes(ctx context.Context, lister restic.Lister, repo restic.ListerLoaderUnpacked,
func ForAllIndexes(ctx context.Context, lister restic.Lister, repo restic.LoaderUnpacked,
fn func(id restic.ID, index *Index, oldFormat bool, err error) error) error {
// decoding an index can take quite some time such that this can be both CPU- or IO-bound

View file

@ -309,8 +309,6 @@ func TestIndexUnserialize(t *testing.T) {
{docExampleV1, 1},
{docExampleV2, 2},
} {
oldIdx := restic.IDs{restic.TestParseID("ed54ae36197f4745ebc4b54d10e0f623eaaaedd03013eb7ae90df881b7781452")}
idx, oldFormat, err := index.DecodeIndex(task.idxBytes, restic.NewRandomID())
rtest.OK(t, err)
rtest.Assert(t, !oldFormat, "new index format recognized as old format")
@ -337,8 +335,6 @@ func TestIndexUnserialize(t *testing.T) {
}
}
rtest.Equals(t, oldIdx, idx.Supersedes())
blobs := listPack(t, idx, exampleLookupTest.packID)
if len(blobs) != len(exampleLookupTest.blobs) {
t.Fatalf("expected %d blobs in pack, got %d", len(exampleLookupTest.blobs), len(blobs))
@ -446,8 +442,6 @@ func TestIndexUnserializeOld(t *testing.T) {
rtest.Equals(t, test.offset, blob.Offset)
rtest.Equals(t, test.length, blob.Length)
}
rtest.Equals(t, 0, len(idx.Supersedes()))
}
func TestIndexPacks(t *testing.T) {

View file

@ -1,7 +1,6 @@
package index
import (
"bytes"
"context"
"fmt"
"runtime"
@ -9,6 +8,7 @@ import (
"github.com/restic/restic/internal/debug"
"github.com/restic/restic/internal/restic"
"github.com/restic/restic/internal/ui/progress"
"golang.org/x/sync/errgroup"
)
@ -22,12 +22,15 @@ type MasterIndex struct {
// NewMasterIndex creates a new master index.
func NewMasterIndex() *MasterIndex {
mi := &MasterIndex{pendingBlobs: restic.NewBlobSet()}
mi.clear()
return mi
}
func (mi *MasterIndex) clear() {
// Always add an empty final index, such that MergeFinalIndexes can merge into this.
// Note that removing this index could lead to a race condition in the rare
// situation that only two indexes exist which are saved and merged concurrently.
idx := []*Index{NewIndex()}
idx[0].Finalize()
return &MasterIndex{idx: idx, pendingBlobs: restic.NewBlobSet()}
mi.idx = []*Index{NewIndex()}
mi.idx[0].Finalize()
}
func (mi *MasterIndex) MarkCompressed() {
@ -267,11 +270,236 @@ func (mi *MasterIndex) MergeFinalIndexes() error {
return nil
}
// Save saves all known indexes to index files, leaving out any
// packs whose ID is contained in packBlacklist from finalized indexes.
// It also removes the old index files and those listed in extraObsolete.
func (mi *MasterIndex) Save(ctx context.Context, repo restic.SaverRemoverUnpacked, excludePacks restic.IDSet, extraObsolete restic.IDs, opts restic.MasterIndexSaveOpts) error {
func (mi *MasterIndex) Load(ctx context.Context, r restic.ListerLoaderUnpacked, p *progress.Counter, cb func(id restic.ID, idx *Index, oldFormat bool, err error) error) error {
indexList, err := restic.MemorizeList(ctx, r, restic.IndexFile)
if err != nil {
return err
}
if p != nil {
var numIndexFiles uint64
err := indexList.List(ctx, restic.IndexFile, func(_ restic.ID, _ int64) error {
numIndexFiles++
return nil
})
if err != nil {
return err
}
p.SetMax(numIndexFiles)
defer p.Done()
}
err = ForAllIndexes(ctx, indexList, r, func(id restic.ID, idx *Index, oldFormat bool, err error) error {
if p != nil {
p.Add(1)
}
if cb != nil {
err = cb(id, idx, oldFormat, err)
}
if err != nil {
return err
}
// special case to allow check to ignore index loading errors
if idx == nil {
return nil
}
mi.Insert(idx)
return nil
})
if err != nil {
return err
}
return mi.MergeFinalIndexes()
}
type MasterIndexRewriteOpts struct {
SaveProgress *progress.Counter
DeleteProgress func() *progress.Counter
DeleteReport func(id restic.ID, err error)
}
// Rewrite removes packs whose ID is in excludePacks from all known indexes.
// It also removes the rewritten index files and those listed in extraObsolete.
// If oldIndexes is not nil, then only the indexes in this set are processed.
// This is used by repair index to only rewrite and delete the old indexes.
//
// Must not be called concurrently to any other MasterIndex operation.
func (mi *MasterIndex) Rewrite(ctx context.Context, repo restic.Unpacked, excludePacks restic.IDSet, oldIndexes restic.IDSet, extraObsolete restic.IDs, opts MasterIndexRewriteOpts) error {
for _, idx := range mi.idx {
if !idx.Final() {
panic("internal error - index must be saved before calling MasterIndex.Rewrite")
}
}
var indexes restic.IDSet
if oldIndexes != nil {
// repair index adds new index entries for already existing pack files
// only remove the old (possibly broken) entries by only processing old indexes
indexes = oldIndexes
} else {
indexes = mi.IDs()
}
p := opts.SaveProgress
p.SetMax(uint64(len(indexes)))
// reset state which is not necessary for Rewrite and just consumes a lot of memory
// the index state would be invalid after Rewrite completes anyways
mi.clear()
runtime.GC()
// copy excludePacks to prevent unintended sideeffects
excludePacks = excludePacks.Clone()
debug.Log("start rebuilding index of %d indexes, excludePacks: %v", len(indexes), excludePacks)
wg, wgCtx := errgroup.WithContext(ctx)
idxCh := make(chan restic.ID)
wg.Go(func() error {
defer close(idxCh)
for id := range indexes {
select {
case idxCh <- id:
case <-wgCtx.Done():
return wgCtx.Err()
}
}
return nil
})
var rewriteWg sync.WaitGroup
type rewriteTask struct {
idx *Index
oldFormat bool
}
rewriteCh := make(chan rewriteTask)
loader := func() error {
defer rewriteWg.Done()
for id := range idxCh {
buf, err := repo.LoadUnpacked(wgCtx, restic.IndexFile, id)
if err != nil {
return fmt.Errorf("LoadUnpacked(%v): %w", id.Str(), err)
}
idx, oldFormat, err := DecodeIndex(buf, id)
if err != nil {
return err
}
select {
case rewriteCh <- rewriteTask{idx, oldFormat}:
case <-wgCtx.Done():
return wgCtx.Err()
}
}
return nil
}
// loading an index can take quite some time such that this is probably CPU-bound
// the index files are probably already cached at this point
loaderCount := runtime.GOMAXPROCS(0)
// run workers on ch
for i := 0; i < loaderCount; i++ {
rewriteWg.Add(1)
wg.Go(loader)
}
wg.Go(func() error {
rewriteWg.Wait()
close(rewriteCh)
return nil
})
obsolete := restic.NewIDSet(extraObsolete...)
saveCh := make(chan *Index)
wg.Go(func() error {
defer close(saveCh)
newIndex := NewIndex()
for task := range rewriteCh {
// always rewrite indexes using the old format, that include a pack that must be removed or that are not full
if !task.oldFormat && len(task.idx.Packs().Intersect(excludePacks)) == 0 && IndexFull(task.idx, mi.compress) {
// make sure that each pack is only stored exactly once in the index
excludePacks.Merge(task.idx.Packs())
// index is already up to date
p.Add(1)
continue
}
ids, err := task.idx.IDs()
if err != nil || len(ids) != 1 {
panic("internal error, index has no ID")
}
obsolete.Merge(restic.NewIDSet(ids...))
for pbs := range task.idx.EachByPack(wgCtx, excludePacks) {
newIndex.StorePack(pbs.PackID, pbs.Blobs)
if IndexFull(newIndex, mi.compress) {
select {
case saveCh <- newIndex:
case <-wgCtx.Done():
return wgCtx.Err()
}
newIndex = NewIndex()
}
}
if wgCtx.Err() != nil {
return wgCtx.Err()
}
// make sure that each pack is only stored exactly once in the index
excludePacks.Merge(task.idx.Packs())
p.Add(1)
}
select {
case saveCh <- newIndex:
case <-wgCtx.Done():
}
return nil
})
// a worker receives an index from ch, and saves the index
worker := func() error {
for idx := range saveCh {
idx.Finalize()
if _, err := idx.SaveIndex(wgCtx, repo); err != nil {
return err
}
}
return nil
}
// encoding an index can take quite some time such that this can be CPU- or IO-bound
// do not add repo.Connections() here as there are already the loader goroutines.
workerCount := runtime.GOMAXPROCS(0)
// run workers on ch
for i := 0; i < workerCount; i++ {
wg.Go(worker)
}
err := wg.Wait()
p.Done()
if err != nil {
return fmt.Errorf("failed to rewrite indexes: %w", err)
}
p = nil
if opts.DeleteProgress != nil {
p = opts.DeleteProgress()
}
defer p.Done()
return restic.ParallelRemove(ctx, repo, obsolete, restic.IndexFile, func(id restic.ID, err error) error {
if opts.DeleteReport != nil {
opts.DeleteReport(id, err)
}
return err
}, p)
}
// SaveFallback saves all known indexes to index files, leaving out any
// packs whose ID is contained in packBlacklist from finalized indexes.
// It is only intended for use by prune with the UnsafeRecovery option.
//
// Must not be called concurrently to any other MasterIndex operation.
func (mi *MasterIndex) SaveFallback(ctx context.Context, repo restic.SaverRemoverUnpacked, excludePacks restic.IDSet, p *progress.Counter) error {
p.SetMax(uint64(len(mi.Packs(excludePacks))))
mi.idxMutex.Lock()
@ -279,38 +507,23 @@ func (mi *MasterIndex) Save(ctx context.Context, repo restic.SaverRemoverUnpacke
debug.Log("start rebuilding index of %d indexes, excludePacks: %v", len(mi.idx), excludePacks)
newIndex := NewIndex()
obsolete := restic.NewIDSet()
// track spawned goroutines using wg, create a new context which is
// cancelled as soon as an error occurs.
wg, wgCtx := errgroup.WithContext(ctx)
ch := make(chan *Index)
wg.Go(func() error {
defer close(ch)
for i, idx := range mi.idx {
newIndex := NewIndex()
for _, idx := range mi.idx {
if idx.Final() {
ids, err := idx.IDs()
if err != nil {
debug.Log("index %d does not have an ID: %v", err)
return err
panic("internal error - finalized index without ID")
}
debug.Log("adding index ids %v to supersedes field", ids)
err = newIndex.AddToSupersedes(ids...)
if err != nil {
return err
}
obsolete.Merge(restic.NewIDSet(ids...))
} else {
debug.Log("index %d isn't final, don't add to supersedes field", i)
}
debug.Log("adding index %d", i)
for pbs := range idx.EachByPack(wgCtx, excludePacks) {
newIndex.StorePack(pbs.PackID, pbs.Blobs)
p.Add(1)
@ -328,12 +541,6 @@ func (mi *MasterIndex) Save(ctx context.Context, repo restic.SaverRemoverUnpacke
}
}
err := newIndex.AddToSupersedes(extraObsolete...)
if err != nil {
return err
}
obsolete.Merge(restic.NewIDSet(extraObsolete...))
select {
case ch <- newIndex:
case <-wgCtx.Done():
@ -345,66 +552,33 @@ func (mi *MasterIndex) Save(ctx context.Context, repo restic.SaverRemoverUnpacke
worker := func() error {
for idx := range ch {
idx.Finalize()
if _, err := SaveIndex(wgCtx, repo, idx); err != nil {
if _, err := idx.SaveIndex(wgCtx, repo); err != nil {
return err
}
}
return nil
}
// encoding an index can take quite some time such that this can be both CPU- or IO-bound
workerCount := int(repo.Connections()) + runtime.GOMAXPROCS(0)
// keep concurrency bounded as we're on a fallback path
workerCount := int(repo.Connections())
// run workers on ch
for i := 0; i < workerCount; i++ {
wg.Go(worker)
}
err := wg.Wait()
p.Done()
if err != nil {
// the index no longer matches to stored state
mi.clear()
return err
}
if opts.SkipDeletion {
return nil
}
p = nil
if opts.DeleteProgress != nil {
p = opts.DeleteProgress()
}
defer p.Done()
return restic.ParallelRemove(ctx, repo, obsolete, restic.IndexFile, func(id restic.ID, err error) error {
if opts.DeleteReport != nil {
opts.DeleteReport(id, err)
}
return err
}, p)
}
// SaveIndex saves an index in the repository.
func SaveIndex(ctx context.Context, repo restic.SaverUnpacked, index *Index) (restic.ID, error) {
buf := bytes.NewBuffer(nil)
err := index.Encode(buf)
if err != nil {
return restic.ID{}, err
}
id, err := repo.SaveUnpacked(ctx, restic.IndexFile, buf.Bytes())
ierr := index.SetID(id)
if ierr != nil {
// logic bug
panic(ierr)
}
return id, err
}
// saveIndex saves all indexes in the backend.
func (mi *MasterIndex) saveIndex(ctx context.Context, r restic.SaverUnpacked, indexes ...*Index) error {
for i, idx := range indexes {
debug.Log("Saving index %d", i)
sid, err := SaveIndex(ctx, r, idx)
sid, err := idx.SaveIndex(ctx, r)
if err != nil {
return err
}

View file

@ -355,46 +355,102 @@ func TestIndexSave(t *testing.T) {
}
func testIndexSave(t *testing.T, version uint) {
for _, test := range []struct {
name string
saver func(idx *index.MasterIndex, repo restic.Repository) error
}{
{"rewrite no-op", func(idx *index.MasterIndex, repo restic.Repository) error {
return idx.Rewrite(context.TODO(), repo, nil, nil, nil, index.MasterIndexRewriteOpts{})
}},
{"rewrite skip-all", func(idx *index.MasterIndex, repo restic.Repository) error {
return idx.Rewrite(context.TODO(), repo, nil, restic.NewIDSet(), nil, index.MasterIndexRewriteOpts{})
}},
{"SaveFallback", func(idx *index.MasterIndex, repo restic.Repository) error {
err := restic.ParallelRemove(context.TODO(), repo, idx.IDs(), restic.IndexFile, nil, nil)
if err != nil {
return nil
}
return idx.SaveFallback(context.TODO(), repo, restic.NewIDSet(), nil)
}},
} {
t.Run(test.name, func(t *testing.T) {
repo := createFilledRepo(t, 3, version)
err := repo.LoadIndex(context.TODO(), nil)
if err != nil {
t.Fatal(err)
idx := index.NewMasterIndex()
rtest.OK(t, idx.Load(context.TODO(), repo, nil, nil))
blobs := make(map[restic.PackedBlob]struct{})
rtest.OK(t, idx.Each(context.TODO(), func(pb restic.PackedBlob) {
blobs[pb] = struct{}{}
}))
rtest.OK(t, test.saver(idx, repo))
idx = index.NewMasterIndex()
rtest.OK(t, idx.Load(context.TODO(), repo, nil, nil))
rtest.OK(t, idx.Each(context.TODO(), func(pb restic.PackedBlob) {
if _, ok := blobs[pb]; ok {
delete(blobs, pb)
} else {
t.Fatalf("unexpected blobs %v", pb)
}
}))
rtest.Equals(t, 0, len(blobs), "saved index is missing blobs")
checker.TestCheckRepo(t, repo, false)
})
}
}
err = repo.Index().Save(context.TODO(), repo, nil, nil, restic.MasterIndexSaveOpts{})
if err != nil {
t.Fatalf("unable to save new index: %v", err)
func TestIndexSavePartial(t *testing.T) {
repository.TestAllVersions(t, testIndexSavePartial)
}
checker := checker.New(repo, false)
err = checker.LoadSnapshots(context.TODO())
if err != nil {
t.Error(err)
func testIndexSavePartial(t *testing.T, version uint) {
repo := createFilledRepo(t, 3, version)
// capture blob list before adding fourth snapshot
idx := index.NewMasterIndex()
rtest.OK(t, idx.Load(context.TODO(), repo, nil, nil))
blobs := make(map[restic.PackedBlob]struct{})
rtest.OK(t, idx.Each(context.TODO(), func(pb restic.PackedBlob) {
blobs[pb] = struct{}{}
}))
// add+remove new snapshot and track its pack files
packsBefore := listPacks(t, repo)
sn := restic.TestCreateSnapshot(t, repo, snapshotTime.Add(time.Duration(4)*time.Second), depth)
rtest.OK(t, repo.RemoveUnpacked(context.TODO(), restic.SnapshotFile, *sn.ID()))
packsAfter := listPacks(t, repo)
newPacks := packsAfter.Sub(packsBefore)
// rewrite index and remove pack files of new snapshot
idx = index.NewMasterIndex()
rtest.OK(t, idx.Load(context.TODO(), repo, nil, nil))
rtest.OK(t, idx.Rewrite(context.TODO(), repo, newPacks, nil, nil, index.MasterIndexRewriteOpts{}))
// check blobs
idx = index.NewMasterIndex()
rtest.OK(t, idx.Load(context.TODO(), repo, nil, nil))
rtest.OK(t, idx.Each(context.TODO(), func(pb restic.PackedBlob) {
if _, ok := blobs[pb]; ok {
delete(blobs, pb)
} else {
t.Fatalf("unexpected blobs %v", pb)
}
}))
rtest.Equals(t, 0, len(blobs), "saved index is missing blobs")
// remove pack files to make check happy
rtest.OK(t, restic.ParallelRemove(context.TODO(), repo, newPacks, restic.PackFile, nil, nil))
checker.TestCheckRepo(t, repo, false)
}
hints, errs := checker.LoadIndex(context.TODO(), nil)
for _, h := range hints {
t.Logf("hint: %v\n", h)
}
for _, err := range errs {
t.Errorf("checker found error: %v", err)
}
ctx, cancel := context.WithCancel(context.TODO())
defer cancel()
errCh := make(chan error)
go checker.Structure(ctx, nil, errCh)
i := 0
for err := range errCh {
t.Errorf("checker returned error: %v", err)
i++
if i == 10 {
t.Errorf("more than 10 errors returned, skipping the rest")
cancel()
break
}
}
func listPacks(t testing.TB, repo restic.Lister) restic.IDSet {
s := restic.NewIDSet()
rtest.OK(t, repo.List(context.TODO(), restic.PackFile, func(id restic.ID, _ int64) error {
s.Insert(id)
return nil
}))
return s
}

View file

@ -389,10 +389,10 @@ func CalculateHeaderSize(blobs []restic.Blob) int {
// If onlyHdr is set to true, only the size of the header is returned
// Note that this function only gives correct sizes, if there are no
// duplicates in the index.
func Size(ctx context.Context, mi restic.MasterIndex, onlyHdr bool) (map[restic.ID]int64, error) {
func Size(ctx context.Context, mi restic.ListBlobser, onlyHdr bool) (map[restic.ID]int64, error) {
packSize := make(map[restic.ID]int64)
err := mi.Each(ctx, func(blob restic.PackedBlob) {
err := mi.ListBlobs(ctx, func(blob restic.PackedBlob) {
size, ok := packSize[blob.PackID]
if !ok {
size = headerSize

View file

@ -158,11 +158,10 @@ func checkPackInner(ctx context.Context, r *Repository, id restic.ID, blobs []re
errs = append(errs, errors.Errorf("pack header size does not match, want %v, got %v", idxHdrSize, hdrSize))
}
idx := r.Index()
for _, blob := range blobs {
// Check if blob is contained in index and position is correct
idxHas := false
for _, pb := range idx.Lookup(blob.BlobHandle) {
for _, pb := range r.LookupBlob(blob.BlobHandle.Type, blob.BlobHandle.ID) {
if pb.PackID == id && pb.Blob == blob {
idxHas = true
break

View file

@ -21,8 +21,8 @@ import (
"github.com/minio/sha256-simd"
)
// Packer holds a pack.Packer together with a hash writer.
type Packer struct {
// packer holds a pack.packer together with a hash writer.
type packer struct {
*pack.Packer
tmpfile *os.File
bufWr *bufio.Writer
@ -32,16 +32,16 @@ type Packer struct {
type packerManager struct {
tpe restic.BlobType
key *crypto.Key
queueFn func(ctx context.Context, t restic.BlobType, p *Packer) error
queueFn func(ctx context.Context, t restic.BlobType, p *packer) error
pm sync.Mutex
packer *Packer
packer *packer
packSize uint
}
// newPackerManager returns a new packer manager which writes temporary files
// to a temporary directory
func newPackerManager(key *crypto.Key, tpe restic.BlobType, packSize uint, queueFn func(ctx context.Context, t restic.BlobType, p *Packer) error) *packerManager {
func newPackerManager(key *crypto.Key, tpe restic.BlobType, packSize uint, queueFn func(ctx context.Context, t restic.BlobType, p *packer) error) *packerManager {
return &packerManager{
tpe: tpe,
key: key,
@ -114,7 +114,7 @@ func (r *packerManager) SaveBlob(ctx context.Context, t restic.BlobType, id rest
// findPacker returns a packer for a new blob of size bytes. Either a new one is
// created or one is returned that already has some blobs.
func (r *packerManager) newPacker() (packer *Packer, err error) {
func (r *packerManager) newPacker() (pck *packer, err error) {
debug.Log("create new pack")
tmpfile, err := fs.TempFile("", "restic-temp-pack-")
if err != nil {
@ -123,17 +123,17 @@ func (r *packerManager) newPacker() (packer *Packer, err error) {
bufWr := bufio.NewWriter(tmpfile)
p := pack.NewPacker(r.key, bufWr)
packer = &Packer{
pck = &packer{
Packer: p,
tmpfile: tmpfile,
bufWr: bufWr,
}
return packer, nil
return pck, nil
}
// savePacker stores p in the backend.
func (r *Repository) savePacker(ctx context.Context, t restic.BlobType, p *Packer) error {
func (r *Repository) savePacker(ctx context.Context, t restic.BlobType, p *packer) error {
debug.Log("save packer for %v with %d blobs (%d bytes)\n", t, p.Packer.Count(), p.Packer.Size())
err := p.Packer.Finalize()
if err != nil {
@ -200,8 +200,5 @@ func (r *Repository) savePacker(ctx context.Context, t restic.BlobType, p *Packe
r.idx.StorePack(id, p.Packer.Blobs())
// Save index if full
if r.noAutoIndexUpdate {
return nil
}
return r.idx.SaveFullIndex(ctx, r)
}

View file

@ -70,7 +70,7 @@ func testPackerManager(t testing.TB) int64 {
rnd := rand.New(rand.NewSource(randomSeed))
savedBytes := int(0)
pm := newPackerManager(crypto.NewRandomKey(), restic.DataBlob, DefaultPackSize, func(ctx context.Context, tp restic.BlobType, p *Packer) error {
pm := newPackerManager(crypto.NewRandomKey(), restic.DataBlob, DefaultPackSize, func(ctx context.Context, tp restic.BlobType, p *packer) error {
err := p.Finalize()
if err != nil {
return err
@ -92,7 +92,7 @@ func testPackerManager(t testing.TB) int64 {
func TestPackerManagerWithOversizeBlob(t *testing.T) {
packFiles := int(0)
sizeLimit := uint(512 * 1024)
pm := newPackerManager(crypto.NewRandomKey(), restic.DataBlob, sizeLimit, func(ctx context.Context, tp restic.BlobType, p *Packer) error {
pm := newPackerManager(crypto.NewRandomKey(), restic.DataBlob, sizeLimit, func(ctx context.Context, tp restic.BlobType, p *packer) error {
packFiles++
return nil
})
@ -122,7 +122,7 @@ func BenchmarkPackerManager(t *testing.B) {
for i := 0; i < t.N; i++ {
rnd.Seed(randomSeed)
pm := newPackerManager(crypto.NewRandomKey(), restic.DataBlob, DefaultPackSize, func(ctx context.Context, t restic.BlobType, p *Packer) error {
pm := newPackerManager(crypto.NewRandomKey(), restic.DataBlob, DefaultPackSize, func(ctx context.Context, t restic.BlobType, p *packer) error {
return nil
})
fillPacks(t, rnd, pm, blobBuf)

View file

@ -7,13 +7,13 @@ import (
"golang.org/x/sync/errgroup"
)
// SavePacker implements saving a pack in the repository.
type SavePacker interface {
savePacker(ctx context.Context, t restic.BlobType, p *Packer) error
// savePacker implements saving a pack in the repository.
type savePacker interface {
savePacker(ctx context.Context, t restic.BlobType, p *packer) error
}
type uploadTask struct {
packer *Packer
packer *packer
tpe restic.BlobType
}
@ -21,7 +21,7 @@ type packerUploader struct {
uploadQueue chan uploadTask
}
func newPackerUploader(ctx context.Context, wg *errgroup.Group, repo SavePacker, connections uint) *packerUploader {
func newPackerUploader(ctx context.Context, wg *errgroup.Group, repo savePacker, connections uint) *packerUploader {
pu := &packerUploader{
uploadQueue: make(chan uploadTask),
}
@ -48,7 +48,7 @@ func newPackerUploader(ctx context.Context, wg *errgroup.Group, repo SavePacker,
return pu
}
func (pu *packerUploader) QueuePacker(ctx context.Context, t restic.BlobType, p *Packer) (err error) {
func (pu *packerUploader) QueuePacker(ctx context.Context, t restic.BlobType, p *packer) (err error) {
select {
case <-ctx.Done():
return ctx.Err()

View file

@ -7,7 +7,6 @@ import (
"sort"
"github.com/restic/restic/internal/errors"
"github.com/restic/restic/internal/index"
"github.com/restic/restic/internal/pack"
"github.com/restic/restic/internal/restic"
"github.com/restic/restic/internal/ui/progress"
@ -67,7 +66,7 @@ type PrunePlan struct {
removePacks restic.IDSet // packs to remove
ignorePacks restic.IDSet // packs to ignore when rebuilding the index
repo restic.Repository
repo *Repository
stats PruneStats
opts PruneOptions
}
@ -75,8 +74,10 @@ type PrunePlan struct {
type packInfo struct {
usedBlobs uint
unusedBlobs uint
duplicateBlobs uint
usedSize uint64
unusedSize uint64
tpe restic.BlobType
uncompressed bool
}
@ -89,7 +90,7 @@ type packInfoWithID struct {
// PlanPrune selects which files to rewrite and which to delete and which blobs to keep.
// Also some summary statistics are returned.
func PlanPrune(ctx context.Context, opts PruneOptions, repo restic.Repository, getUsedBlobs func(ctx context.Context, repo restic.Repository) (usedBlobs restic.CountedBlobSet, err error), printer progress.Printer) (*PrunePlan, error) {
func PlanPrune(ctx context.Context, opts PruneOptions, repo *Repository, getUsedBlobs func(ctx context.Context, repo restic.Repository) (usedBlobs restic.CountedBlobSet, err error), printer progress.Printer) (*PrunePlan, error) {
var stats PruneStats
if opts.UnsafeRecovery {
@ -109,7 +110,7 @@ func PlanPrune(ctx context.Context, opts PruneOptions, repo restic.Repository, g
}
printer.P("searching used packs...\n")
keepBlobs, indexPack, err := packInfoFromIndex(ctx, repo.Index(), usedBlobs, &stats, printer)
keepBlobs, indexPack, err := packInfoFromIndex(ctx, repo, usedBlobs, &stats, printer)
if err != nil {
return nil, err
}
@ -124,7 +125,7 @@ func PlanPrune(ctx context.Context, opts PruneOptions, repo restic.Repository, g
blobCount := keepBlobs.Len()
// when repacking, we do not want to keep blobs which are
// already contained in kept packs, so delete them from keepBlobs
err := repo.Index().Each(ctx, func(blob restic.PackedBlob) {
err := repo.ListBlobs(ctx, func(blob restic.PackedBlob) {
if plan.removePacks.Has(blob.PackID) || plan.repackPacks.Has(blob.PackID) {
return
}
@ -151,11 +152,11 @@ func PlanPrune(ctx context.Context, opts PruneOptions, repo restic.Repository, g
return &plan, nil
}
func packInfoFromIndex(ctx context.Context, idx restic.MasterIndex, usedBlobs restic.CountedBlobSet, stats *PruneStats, printer progress.Printer) (restic.CountedBlobSet, map[restic.ID]packInfo, error) {
func packInfoFromIndex(ctx context.Context, idx restic.ListBlobser, usedBlobs restic.CountedBlobSet, stats *PruneStats, printer progress.Printer) (restic.CountedBlobSet, map[restic.ID]packInfo, error) {
// iterate over all blobs in index to find out which blobs are duplicates
// The counter in usedBlobs describes how many instances of the blob exist in the repository index
// Thus 0 == blob is missing, 1 == blob exists once, >= 2 == duplicates exist
err := idx.Each(ctx, func(blob restic.PackedBlob) {
err := idx.ListBlobs(ctx, func(blob restic.PackedBlob) {
bh := blob.BlobHandle
count, ok := usedBlobs[bh]
if ok {
@ -205,7 +206,7 @@ func packInfoFromIndex(ctx context.Context, idx restic.MasterIndex, usedBlobs re
hasDuplicates := false
// iterate over all blobs in index to generate packInfo
err = idx.Each(ctx, func(blob restic.PackedBlob) {
err = idx.ListBlobs(ctx, func(blob restic.PackedBlob) {
ip := indexPack[blob.PackID]
// Set blob type if not yet set
@ -227,6 +228,7 @@ func packInfoFromIndex(ctx context.Context, idx restic.MasterIndex, usedBlobs re
// mark as unused for now, we will later on select one copy
ip.unusedSize += size
ip.unusedBlobs++
ip.duplicateBlobs++
// count as duplicate, will later on change one copy to be counted as used
stats.Size.Duplicate += size
@ -257,10 +259,12 @@ func packInfoFromIndex(ctx context.Context, idx restic.MasterIndex, usedBlobs re
// if duplicate blobs exist, those will be set to either "used" or "unused":
// - mark only one occurrence of duplicate blobs as used
// - if there are already some used blobs in a pack, possibly mark duplicates in this pack as "used"
// - if a pack only consists of duplicates (which by definition are used blobs), mark it as "used". This
// ensures that already rewritten packs are kept.
// - if there are no used blobs in a pack, possibly mark duplicates as "unused"
if hasDuplicates {
// iterate again over all blobs in index (this is pretty cheap, all in-mem)
err = idx.Each(ctx, func(blob restic.PackedBlob) {
err = idx.ListBlobs(ctx, func(blob restic.PackedBlob) {
bh := blob.BlobHandle
count, ok := usedBlobs[bh]
// skip non-duplicate, aka. normal blobs
@ -272,8 +276,10 @@ func packInfoFromIndex(ctx context.Context, idx restic.MasterIndex, usedBlobs re
ip := indexPack[blob.PackID]
size := uint64(blob.Length)
switch {
case ip.usedBlobs > 0, count == 0:
// other used blobs in pack or "last" occurrence -> transition to used
case ip.usedBlobs > 0, (ip.duplicateBlobs == ip.unusedBlobs), count == 0:
// other used blobs in pack, only duplicate blobs or "last" occurrence -> transition to used
// a pack file created by an interrupted prune run will consist of only duplicate blobs
// thus select such already repacked pack files
ip.usedSize += size
ip.usedBlobs++
ip.unusedSize -= size
@ -314,7 +320,7 @@ func packInfoFromIndex(ctx context.Context, idx restic.MasterIndex, usedBlobs re
return usedBlobs, indexPack, nil
}
func decidePackAction(ctx context.Context, opts PruneOptions, repo restic.Repository, indexPack map[restic.ID]packInfo, stats *PruneStats, printer progress.Printer) (PrunePlan, error) {
func decidePackAction(ctx context.Context, opts PruneOptions, repo *Repository, indexPack map[restic.ID]packInfo, stats *PruneStats, printer progress.Printer) (PrunePlan, error) {
removePacksFirst := restic.NewIDSet()
removePacks := restic.NewIDSet()
repackPacks := restic.NewIDSet()
@ -323,10 +329,10 @@ func decidePackAction(ctx context.Context, opts PruneOptions, repo restic.Reposi
var repackSmallCandidates []packInfoWithID
repoVersion := repo.Config().Version
// only repack very small files by default
targetPackSize := repo.PackSize() / 25
targetPackSize := repo.packSize() / 25
if opts.RepackSmall {
// consider files with at least 80% of the target size as large enough
targetPackSize = repo.PackSize() / 5 * 4
targetPackSize = repo.packSize() / 5 * 4
}
// loop over all packs and decide what to do
@ -523,7 +529,7 @@ func (plan *PrunePlan) Stats() PruneStats {
// - rebuild the index while ignoring all files that will be deleted
// - delete the files
// plan.removePacks and plan.ignorePacks are modified in this function.
func (plan *PrunePlan) Execute(ctx context.Context, printer progress.Printer) (err error) {
func (plan *PrunePlan) Execute(ctx context.Context, printer progress.Printer) error {
if plan.opts.DryRun {
printer.V("Repeated prune dry-runs can report slightly different amounts of data to keep or repack. This is expected behavior.\n\n")
if len(plan.removePacksFirst) > 0 {
@ -581,13 +587,13 @@ func (plan *PrunePlan) Execute(ctx context.Context, printer progress.Printer) (e
if plan.opts.UnsafeRecovery {
printer.P("deleting index files\n")
indexFiles := repo.Index().(*index.MasterIndex).IDs()
err = deleteFiles(ctx, false, repo, indexFiles, restic.IndexFile, printer)
indexFiles := repo.idx.IDs()
err := deleteFiles(ctx, false, repo, indexFiles, restic.IndexFile, printer)
if err != nil {
return errors.Fatalf("%s", err)
}
} else if len(plan.ignorePacks) != 0 {
err = rebuildIndexFiles(ctx, repo, plan.ignorePacks, nil, false, printer)
err := rewriteIndexFiles(ctx, repo, plan.ignorePacks, nil, nil, printer)
if err != nil {
return errors.Fatalf("%s", err)
}
@ -602,18 +608,14 @@ func (plan *PrunePlan) Execute(ctx context.Context, printer progress.Printer) (e
}
if plan.opts.UnsafeRecovery {
err = rebuildIndexFiles(ctx, repo, plan.ignorePacks, nil, true, printer)
err := repo.idx.SaveFallback(ctx, repo, plan.ignorePacks, printer.NewCounter("packs processed"))
if err != nil {
return errors.Fatalf("%s", err)
}
}
if err != nil {
return err
}
// drop outdated in-memory index
repo.ClearIndex()
repo.clearIndex()
printer.P("done\n")
return nil

View file

@ -54,7 +54,7 @@ func repack(ctx context.Context, repo restic.Repository, dstRepo restic.Reposito
downloadQueue := make(chan restic.PackBlobs)
wg.Go(func() error {
defer close(downloadQueue)
for pbs := range repo.Index().ListPacks(wgCtx, packs) {
for pbs := range repo.ListPacksFromIndex(wgCtx, packs) {
var packBlobs []restic.Blob
keepMutex.Lock()
// filter out unnecessary blobs

View file

@ -7,10 +7,10 @@ import (
"time"
"github.com/restic/restic/internal/backend"
"github.com/restic/restic/internal/index"
"github.com/restic/restic/internal/repository"
"github.com/restic/restic/internal/restic"
rtest "github.com/restic/restic/internal/test"
"github.com/restic/restic/internal/ui/progress"
"golang.org/x/sync/errgroup"
)
@ -145,9 +145,8 @@ func listFiles(t *testing.T, repo restic.Lister, tpe backend.FileType) restic.ID
func findPacksForBlobs(t *testing.T, repo restic.Repository, blobs restic.BlobSet) restic.IDSet {
packs := restic.NewIDSet()
idx := repo.Index()
for h := range blobs {
list := idx.Lookup(h)
list := repo.LookupBlob(h.Type, h.ID)
if len(list) == 0 {
t.Fatal("Failed to find blob", h.ID.Str(), "with type", h.Type)
}
@ -174,40 +173,12 @@ func repack(t *testing.T, repo restic.Repository, packs restic.IDSet, blobs rest
}
}
func rebuildIndex(t *testing.T, repo restic.Repository) {
err := repo.SetIndex(index.NewMasterIndex())
rtest.OK(t, err)
func rebuildAndReloadIndex(t *testing.T, repo *repository.Repository) {
rtest.OK(t, repository.RepairIndex(context.TODO(), repo, repository.RepairIndexOptions{
ReadAllPacks: true,
}, &progress.NoopPrinter{}))
packs := make(map[restic.ID]int64)
err = repo.List(context.TODO(), restic.PackFile, func(id restic.ID, size int64) error {
packs[id] = size
return nil
})
rtest.OK(t, err)
_, err = repo.(*repository.Repository).CreateIndexFromPacks(context.TODO(), packs, nil)
rtest.OK(t, err)
var obsoleteIndexes restic.IDs
err = repo.List(context.TODO(), restic.IndexFile, func(id restic.ID, size int64) error {
obsoleteIndexes = append(obsoleteIndexes, id)
return nil
})
rtest.OK(t, err)
err = repo.Index().Save(context.TODO(), repo, restic.NewIDSet(), obsoleteIndexes, restic.MasterIndexSaveOpts{})
rtest.OK(t, err)
}
func reloadIndex(t *testing.T, repo restic.Repository) {
err := repo.SetIndex(index.NewMasterIndex())
if err != nil {
t.Fatal(err)
}
if err := repo.LoadIndex(context.TODO(), nil); err != nil {
t.Fatalf("error loading new index: %v", err)
}
rtest.OK(t, repo.LoadIndex(context.TODO(), nil))
}
func TestRepack(t *testing.T) {
@ -242,8 +213,7 @@ func testRepack(t *testing.T, version uint) {
removePacks := findPacksForBlobs(t, repo, removeBlobs)
repack(t, repo, removePacks, keepBlobs)
rebuildIndex(t, repo)
reloadIndex(t, repo)
rebuildAndReloadIndex(t, repo)
packsAfter = listPacks(t, repo)
for id := range removePacks {
@ -252,10 +222,8 @@ func testRepack(t *testing.T, version uint) {
}
}
idx := repo.Index()
for h := range keepBlobs {
list := idx.Lookup(h)
list := repo.LookupBlob(h.Type, h.ID)
if len(list) == 0 {
t.Errorf("unable to find blob %v in repo", h.ID.Str())
continue
@ -274,7 +242,7 @@ func testRepack(t *testing.T, version uint) {
}
for h := range removeBlobs {
if _, found := repo.LookupBlobSize(h.ID, h.Type); found {
if _, found := repo.LookupBlobSize(h.Type, h.ID); found {
t.Errorf("blob %v still contained in the repo", h)
}
}
@ -315,13 +283,10 @@ func testRepackCopy(t *testing.T, version uint) {
if err != nil {
t.Fatal(err)
}
rebuildIndex(t, dstRepo)
reloadIndex(t, dstRepo)
idx := dstRepo.Index()
rebuildAndReloadIndex(t, dstRepo)
for h := range keepBlobs {
list := idx.Lookup(h)
list := dstRepo.LookupBlob(h.Type, h.ID)
if len(list) == 0 {
t.Errorf("unable to find blob %v in repo", h.ID.Str())
continue

View file

@ -28,6 +28,8 @@ func RepairIndex(ctx context.Context, repo *Repository, opts RepairIndexOptions,
if err != nil {
return err
}
repo.clearIndex()
} else {
printer.P("loading indexes...\n")
mi := index.NewMasterIndex()
@ -54,12 +56,14 @@ func RepairIndex(ctx context.Context, repo *Repository, opts RepairIndexOptions,
if err != nil {
return err
}
packSizeFromIndex, err = pack.Size(ctx, repo.Index(), false)
packSizeFromIndex, err = pack.Size(ctx, repo, false)
if err != nil {
return err
}
}
oldIndexes := repo.idx.IDs()
printer.P("getting pack files to read...\n")
err := repo.List(ctx, restic.PackFile, func(id restic.ID, packSize int64) error {
size, ok := packSizeFromIndex[id]
@ -90,7 +94,7 @@ func RepairIndex(ctx context.Context, repo *Repository, opts RepairIndexOptions,
printer.P("reading pack files\n")
bar := printer.NewCounter("packs")
bar.SetMax(uint64(len(packSizeFromList)))
invalidFiles, err := repo.CreateIndexFromPacks(ctx, packSizeFromList, bar)
invalidFiles, err := repo.createIndexFromPacks(ctx, packSizeFromList, bar)
bar.Done()
if err != nil {
return err
@ -101,21 +105,25 @@ func RepairIndex(ctx context.Context, repo *Repository, opts RepairIndexOptions,
}
}
err = rebuildIndexFiles(ctx, repo, removePacks, obsoleteIndexes, false, printer)
if err := repo.Flush(ctx); err != nil {
return err
}
err = rewriteIndexFiles(ctx, repo, removePacks, oldIndexes, obsoleteIndexes, printer)
if err != nil {
return err
}
// drop outdated in-memory index
repo.ClearIndex()
repo.clearIndex()
return nil
}
func rebuildIndexFiles(ctx context.Context, repo restic.Repository, removePacks restic.IDSet, extraObsolete restic.IDs, skipDeletion bool, printer progress.Printer) error {
func rewriteIndexFiles(ctx context.Context, repo *Repository, removePacks restic.IDSet, oldIndexes restic.IDSet, extraObsolete restic.IDs, printer progress.Printer) error {
printer.P("rebuilding index\n")
bar := printer.NewCounter("packs processed")
return repo.Index().Save(ctx, repo, removePacks, extraObsolete, restic.MasterIndexSaveOpts{
bar := printer.NewCounter("indexes processed")
return repo.idx.Rewrite(ctx, repo, removePacks, oldIndexes, extraObsolete, index.MasterIndexRewriteOpts{
SaveProgress: bar,
DeleteProgress: func() *progress.Counter {
return printer.NewCounter("old indexes deleted")
@ -127,6 +135,5 @@ func rebuildIndexFiles(ctx context.Context, repo restic.Repository, removePacks
printer.VV("removed index %v\n", id.String())
}
},
SkipDeletion: skipDeletion,
})
}

View file

@ -30,10 +30,6 @@ func testRebuildIndex(t *testing.T, readAllPacks bool, damage func(t *testing.T,
ReadAllPacks: readAllPacks,
}, &progress.NoopPrinter{}))
newIndexes := listIndex(t, repo)
old := indexes.Intersect(newIndexes)
rtest.Assert(t, len(old) == 0, "expected old indexes to be removed, found %v", old)
checker.TestCheckRepo(t, repo, true)
}

View file

@ -10,7 +10,7 @@ import (
"golang.org/x/sync/errgroup"
)
func RepairPacks(ctx context.Context, repo restic.Repository, ids restic.IDSet, printer progress.Printer) error {
func RepairPacks(ctx context.Context, repo *Repository, ids restic.IDSet, printer progress.Printer) error {
wg, wgCtx := errgroup.WithContext(ctx)
repo.StartPackUploader(wgCtx, wg)
@ -21,7 +21,7 @@ func RepairPacks(ctx context.Context, repo restic.Repository, ids restic.IDSet,
wg.Go(func() error {
// examine all data the indexes have for the pack file
for b := range repo.Index().ListPacks(wgCtx, ids) {
for b := range repo.ListPacksFromIndex(wgCtx, ids) {
blobs := b.Blobs
if len(blobs) == 0 {
printer.E("no blobs found for pack %v", b.PackID)
@ -56,7 +56,7 @@ func RepairPacks(ctx context.Context, repo restic.Repository, ids restic.IDSet,
}
// remove salvaged packs from index
err = rebuildIndexFiles(ctx, repo, ids, nil, false, printer)
err = rewriteIndexFiles(ctx, repo, ids, nil, nil, printer)
if err != nil {
return err
}

View file

@ -8,7 +8,6 @@ import (
"github.com/restic/restic/internal/backend"
backendtest "github.com/restic/restic/internal/backend/test"
"github.com/restic/restic/internal/index"
"github.com/restic/restic/internal/repository"
"github.com/restic/restic/internal/restic"
"github.com/restic/restic/internal/test"
@ -18,7 +17,7 @@ import (
func listBlobs(repo restic.Repository) restic.BlobSet {
blobs := restic.NewBlobSet()
_ = repo.Index().Each(context.TODO(), func(pb restic.PackedBlob) {
_ = repo.ListBlobs(context.TODO(), func(pb restic.PackedBlob) {
blobs.Insert(pb.BlobHandle)
})
return blobs
@ -68,7 +67,7 @@ func testRepairBrokenPack(t *testing.T, version uint) {
// find blob that starts at offset 0
var damagedBlob restic.BlobHandle
for blobs := range repo.Index().ListPacks(context.TODO(), restic.NewIDSet(damagedID)) {
for blobs := range repo.ListPacksFromIndex(context.TODO(), restic.NewIDSet(damagedID)) {
for _, blob := range blobs.Blobs {
if blob.Offset == 0 {
damagedBlob = blob.BlobHandle
@ -91,7 +90,7 @@ func testRepairBrokenPack(t *testing.T, version uint) {
// all blobs in the file are broken
damagedBlobs := restic.NewBlobSet()
for blobs := range repo.Index().ListPacks(context.TODO(), restic.NewIDSet(damagedID)) {
for blobs := range repo.ListPacksFromIndex(context.TODO(), restic.NewIDSet(damagedID)) {
for _, blob := range blobs.Blobs {
damagedBlobs.Insert(blob.BlobHandle)
}
@ -118,7 +117,6 @@ func testRepairBrokenPack(t *testing.T, version uint) {
rtest.OK(t, repository.RepairPacks(context.TODO(), repo, toRepair, &progress.NoopPrinter{}))
// reload index
rtest.OK(t, repo.SetIndex(index.NewMasterIndex()))
rtest.OK(t, repo.LoadIndex(context.TODO(), nil))
packsAfter := listPacks(t, repo)

View file

@ -42,8 +42,6 @@ type Repository struct {
opts Options
noAutoIndexUpdate bool
packerWg *errgroup.Group
uploader *packerUploader
treePM *packerManager
@ -130,12 +128,6 @@ func New(be backend.Backend, opts Options) (*Repository, error) {
return repo, nil
}
// DisableAutoIndexUpdate deactives the automatic finalization and upload of new
// indexes once these are full
func (r *Repository) DisableAutoIndexUpdate() {
r.noAutoIndexUpdate = true
}
// setConfig assigns the given config and updates the repository parameters accordingly
func (r *Repository) setConfig(cfg restic.Config) {
r.cfg = cfg
@ -146,8 +138,8 @@ func (r *Repository) Config() restic.Config {
return r.cfg
}
// PackSize return the target size of a pack file when uploading
func (r *Repository) PackSize() uint {
// packSize return the target size of a pack file when uploading
func (r *Repository) packSize() uint {
return r.opts.PackSize
}
@ -300,11 +292,6 @@ func (r *Repository) loadBlob(ctx context.Context, blobs []restic.PackedBlob, bu
return nil, errors.Errorf("loading %v from %v packs failed", blobs[0].BlobHandle, len(blobs))
}
// LookupBlobSize returns the size of blob id.
func (r *Repository) LookupBlobSize(id restic.ID, tpe restic.BlobType) (uint, bool) {
return r.idx.LookupSize(restic.BlobHandle{ID: id, Type: tpe})
}
func (r *Repository) getZstdEncoder() *zstd.Encoder {
r.allocEnc.Do(func() {
level := zstd.SpeedDefault
@ -531,10 +518,6 @@ func (r *Repository) Flush(ctx context.Context) error {
return err
}
// Save index after flushing only if noAutoIndexUpdate is not set
if r.noAutoIndexUpdate {
return nil
}
return r.idx.SaveIndex(ctx, r)
}
@ -546,8 +529,8 @@ func (r *Repository) StartPackUploader(ctx context.Context, wg *errgroup.Group)
innerWg, ctx := errgroup.WithContext(ctx)
r.packerWg = innerWg
r.uploader = newPackerUploader(ctx, innerWg, r, r.be.Connections())
r.treePM = newPackerManager(r.key, restic.TreeBlob, r.PackSize(), r.uploader.QueuePacker)
r.dataPM = newPackerManager(r.key, restic.DataBlob, r.PackSize(), r.uploader.QueuePacker)
r.treePM = newPackerManager(r.key, restic.TreeBlob, r.packSize(), r.uploader.QueuePacker)
r.dataPM = newPackerManager(r.key, restic.DataBlob, r.packSize(), r.uploader.QueuePacker)
wg.Go(func() error {
return innerWg.Wait()
@ -583,9 +566,23 @@ func (r *Repository) Connections() uint {
return r.be.Connections()
}
// Index returns the currently used MasterIndex.
func (r *Repository) Index() restic.MasterIndex {
return r.idx
func (r *Repository) LookupBlob(tpe restic.BlobType, id restic.ID) []restic.PackedBlob {
return r.idx.Lookup(restic.BlobHandle{Type: tpe, ID: id})
}
// LookupBlobSize returns the size of blob id.
func (r *Repository) LookupBlobSize(tpe restic.BlobType, id restic.ID) (uint, bool) {
return r.idx.LookupSize(restic.BlobHandle{Type: tpe, ID: id})
}
// ListBlobs runs fn on all blobs known to the index. When the context is cancelled,
// the index iteration returns immediately with ctx.Err(). This blocks any modification of the index.
func (r *Repository) ListBlobs(ctx context.Context, fn func(restic.PackedBlob)) error {
return r.idx.Each(ctx, fn)
}
func (r *Repository) ListPacksFromIndex(ctx context.Context, packs restic.IDSet) <-chan restic.PackBlobs {
return r.idx.ListPacks(ctx, packs)
}
// SetIndex instructs the repository to use the given index.
@ -595,7 +592,7 @@ func (r *Repository) SetIndex(i restic.MasterIndex) error {
return r.prepareCache()
}
func (r *Repository) ClearIndex() {
func (r *Repository) clearIndex() {
r.idx = index.NewMasterIndex()
r.configureIndex()
}
@ -610,43 +607,10 @@ func (r *Repository) configureIndex() {
func (r *Repository) LoadIndex(ctx context.Context, p *progress.Counter) error {
debug.Log("Loading index")
indexList, err := restic.MemorizeList(ctx, r, restic.IndexFile)
if err != nil {
return err
}
if p != nil {
var numIndexFiles uint64
err := indexList.List(ctx, restic.IndexFile, func(_ restic.ID, _ int64) error {
numIndexFiles++
return nil
})
if err != nil {
return err
}
p.SetMax(numIndexFiles)
defer p.Done()
}
// reset in-memory index before loading it from the repository
r.ClearIndex()
r.clearIndex()
err = index.ForAllIndexes(ctx, indexList, r, func(_ restic.ID, idx *index.Index, _ bool, err error) error {
if err != nil {
return err
}
r.idx.Insert(idx)
if p != nil {
p.Add(1)
}
return nil
})
if err != nil {
return err
}
err = r.idx.MergeFinalIndexes()
err := r.idx.Load(ctx, r, p, nil)
if err != nil {
return err
}
@ -680,10 +644,10 @@ func (r *Repository) LoadIndex(ctx context.Context, p *progress.Counter) error {
return r.prepareCache()
}
// CreateIndexFromPacks creates a new index by reading all given pack files (with sizes).
// createIndexFromPacks creates a new index by reading all given pack files (with sizes).
// The index is added to the MasterIndex but not marked as finalized.
// Returned is the list of pack files which could not be read.
func (r *Repository) CreateIndexFromPacks(ctx context.Context, packsize map[restic.ID]int64, p *progress.Counter) (invalid restic.IDs, err error) {
func (r *Repository) createIndexFromPacks(ctx context.Context, packsize map[restic.ID]int64, p *progress.Counter) (invalid restic.IDs, err error) {
var m sync.Mutex
debug.Log("Loading index from pack files")

View file

@ -161,7 +161,7 @@ func TestLoadBlobBroken(t *testing.T) {
data, err := repo.LoadBlob(context.TODO(), restic.TreeBlob, id, nil)
rtest.OK(t, err)
rtest.Assert(t, bytes.Equal(buf, data), "data mismatch")
pack := repo.Index().Lookup(restic.BlobHandle{Type: restic.TreeBlob, ID: id})[0].PackID
pack := repo.LookupBlob(restic.TreeBlob, id)[0].PackID
rtest.Assert(t, c.Has(backend.Handle{Type: restic.PackFile, Name: pack.String()}), "expected tree pack to be cached")
}
@ -336,7 +336,7 @@ func benchmarkLoadIndex(b *testing.B, version uint) {
}
idx.Finalize()
id, err := index.SaveIndex(context.TODO(), repo, idx)
id, err := idx.SaveIndex(context.TODO(), repo)
rtest.OK(b, err)
b.Logf("index saved as %v", id.Str())
@ -439,7 +439,7 @@ func TestListPack(t *testing.T) {
repo.UseCache(c)
// Forcibly cache pack file
packID := repo.Index().Lookup(restic.BlobHandle{Type: restic.TreeBlob, ID: id})[0].PackID
packID := repo.LookupBlob(restic.TreeBlob, id)[0].PackID
rtest.OK(t, be.Load(context.TODO(), backend.Handle{Type: restic.PackFile, IsMetadata: true, Name: packID.String()}, 0, 0, func(rd io.Reader) error { return nil }))
// Get size to list pack

View file

@ -11,7 +11,7 @@ import (
// Loader loads a blob from a repository.
type Loader interface {
LoadBlob(context.Context, BlobType, ID, []byte) ([]byte, error)
LookupBlobSize(id ID, tpe BlobType) (uint, bool)
LookupBlobSize(tpe BlobType, id ID) (uint, bool)
Connections() uint
}

View file

@ -166,7 +166,7 @@ func (r ForbiddenRepo) LoadBlob(context.Context, restic.BlobType, restic.ID, []b
return nil, errors.New("should not be called")
}
func (r ForbiddenRepo) LookupBlobSize(_ restic.ID, _ restic.BlobType) (uint, bool) {
func (r ForbiddenRepo) LookupBlobSize(_ restic.BlobType, _ restic.ID) (uint, bool) {
return 0, false
}

View file

@ -105,3 +105,9 @@ func (s IDSet) String() string {
str := s.List().String()
return "{" + str[1:len(str)-1] + "}"
}
func (s IDSet) Clone() IDSet {
c := NewIDSet()
c.Merge(s)
return c
}

View file

@ -35,4 +35,7 @@ func TestIDSet(t *testing.T) {
}
rtest.Equals(t, "{1285b303 7bb086db f658198b}", set.String())
copied := set.Clone()
rtest.Equals(t, "{1285b303 7bb086db f658198b}", copied.String())
}

View file

@ -18,17 +18,32 @@ var ErrInvalidData = errors.New("invalid data returned")
type Repository interface {
// Connections returns the maximum number of concurrent backend operations
Connections() uint
Config() Config
Key() *crypto.Key
Index() MasterIndex
LoadIndex(context.Context, *progress.Counter) error
ClearIndex()
SetIndex(MasterIndex) error
LookupBlobSize(ID, BlobType) (uint, bool)
LoadIndex(ctx context.Context, p *progress.Counter) error
SetIndex(mi MasterIndex) error
Config() Config
PackSize() uint
LookupBlob(t BlobType, id ID) []PackedBlob
LookupBlobSize(t BlobType, id ID) (size uint, exists bool)
// ListBlobs runs fn on all blobs known to the index. When the context is cancelled,
// the index iteration returns immediately with ctx.Err(). This blocks any modification of the index.
ListBlobs(ctx context.Context, fn func(PackedBlob)) error
ListPacksFromIndex(ctx context.Context, packs IDSet) <-chan PackBlobs
// ListPack returns the list of blobs saved in the pack id and the length of
// the pack header.
ListPack(ctx context.Context, id ID, packSize int64) (entries []Blob, hdrSize uint32, err error)
LoadBlob(ctx context.Context, t BlobType, id ID, buf []byte) ([]byte, error)
LoadBlobsFromPack(ctx context.Context, packID ID, blobs []Blob, handleBlobFn func(blob BlobHandle, buf []byte, err error) error) error
// StartPackUploader start goroutines to upload new pack files. The errgroup
// is used to immediately notify about an upload error. Flush() will also return
// that error.
StartPackUploader(ctx context.Context, wg *errgroup.Group)
SaveBlob(ctx context.Context, t BlobType, buf []byte, id ID, storeDuplicate bool) (newID ID, known bool, size int, err error)
Flush(ctx context.Context) error
// List calls the function fn for each file of type t in the repository.
// When an error is returned by fn, processing stops and List() returns the
@ -36,31 +51,15 @@ type Repository interface {
//
// The function fn is called in the same Goroutine List() was called from.
List(ctx context.Context, t FileType, fn func(ID, int64) error) error
// ListPack returns the list of blobs saved in the pack id and the length of
// the pack header.
ListPack(context.Context, ID, int64) ([]Blob, uint32, error)
LoadBlob(context.Context, BlobType, ID, []byte) ([]byte, error)
LoadBlobsFromPack(ctx context.Context, packID ID, blobs []Blob, handleBlobFn func(blob BlobHandle, buf []byte, err error) error) error
SaveBlob(context.Context, BlobType, []byte, ID, bool) (ID, bool, int, error)
// StartPackUploader start goroutines to upload new pack files. The errgroup
// is used to immediately notify about an upload error. Flush() will also return
// that error.
StartPackUploader(ctx context.Context, wg *errgroup.Group)
Flush(context.Context) error
// LoadUnpacked loads and decrypts the file with the given type and ID.
LoadUnpacked(ctx context.Context, t FileType, id ID) (data []byte, err error)
SaveUnpacked(context.Context, FileType, []byte) (ID, error)
// RemoveUnpacked removes a file from the repository. This will eventually be restricted to deleting only snapshots.
RemoveUnpacked(ctx context.Context, t FileType, id ID) error
// LoadRaw reads all data stored in the backend for the file with id and filetype t.
// If the backend returns data that does not match the id, then the buffer is returned
// along with an error that is a restic.ErrInvalidData error.
LoadRaw(ctx context.Context, t FileType, id ID) (data []byte, err error)
// LoadUnpacked loads and decrypts the file with the given type and ID.
LoadUnpacked(ctx context.Context, t FileType, id ID) (data []byte, err error)
SaveUnpacked(ctx context.Context, t FileType, buf []byte) (ID, error)
// RemoveUnpacked removes a file from the repository. This will eventually be restricted to deleting only snapshots.
RemoveUnpacked(ctx context.Context, t FileType, id ID) error
}
type FileType = backend.FileType
@ -86,7 +85,7 @@ type LoaderUnpacked interface {
type SaverUnpacked interface {
// Connections returns the maximum number of concurrent backend operations
Connections() uint
SaveUnpacked(context.Context, FileType, []byte) (ID, error)
SaveUnpacked(ctx context.Context, t FileType, buf []byte) (ID, error)
}
// RemoverUnpacked allows removing an unpacked blob
@ -106,24 +105,15 @@ type PackBlobs struct {
Blobs []Blob
}
type MasterIndexSaveOpts struct {
SaveProgress *progress.Counter
DeleteProgress func() *progress.Counter
DeleteReport func(id ID, err error)
SkipDeletion bool
}
// MasterIndex keeps track of the blobs are stored within files.
type MasterIndex interface {
Has(BlobHandle) bool
Lookup(BlobHandle) []PackedBlob
Has(bh BlobHandle) bool
Lookup(bh BlobHandle) []PackedBlob
// Each runs fn on all blobs known to the index. When the context is cancelled,
// the index iteration returns immediately with ctx.Err(). This blocks any modification of the index.
Each(ctx context.Context, fn func(PackedBlob)) error
ListPacks(ctx context.Context, packs IDSet) <-chan PackBlobs
Save(ctx context.Context, repo SaverRemoverUnpacked, excludePacks IDSet, extraObsolete IDs, opts MasterIndexSaveOpts) error
}
// Lister allows listing files in a backend.
@ -141,3 +131,7 @@ type Unpacked interface {
SaverUnpacked
RemoverUnpacked
}
type ListBlobser interface {
ListBlobs(ctx context.Context, fn func(PackedBlob)) error
}

View file

@ -77,7 +77,7 @@ func filterTrees(ctx context.Context, repo Loader, trees IDs, loaderChan chan<-
continue
}
treeSize, found := repo.LookupBlobSize(nextTreeID.ID, TreeBlob)
treeSize, found := repo.LookupBlobSize(TreeBlob, nextTreeID.ID)
if found && treeSize > 50*1024*1024 {
loadCh = hugeTreeLoaderChan
} else {

View file

@ -48,7 +48,7 @@ type blobsLoaderFn func(ctx context.Context, packID restic.ID, blobs []restic.Bl
// fileRestorer restores set of files
type fileRestorer struct {
idx func(restic.BlobHandle) []restic.PackedBlob
idx func(restic.BlobType, restic.ID) []restic.PackedBlob
blobsLoader blobsLoaderFn
workerCount int
@ -64,7 +64,7 @@ type fileRestorer struct {
func newFileRestorer(dst string,
blobsLoader blobsLoaderFn,
idx func(restic.BlobHandle) []restic.PackedBlob,
idx func(restic.BlobType, restic.ID) []restic.PackedBlob,
connections uint,
sparse bool,
progress *restore.Progress) *fileRestorer {
@ -99,7 +99,7 @@ func (r *fileRestorer) forEachBlob(blobIDs []restic.ID, fn func(packID restic.ID
}
for _, blobID := range blobIDs {
packs := r.idx(restic.BlobHandle{ID: blobID, Type: restic.DataBlob})
packs := r.idx(restic.DataBlob, blobID)
if len(packs) == 0 {
return errors.Errorf("Unknown blob %s", blobID.String())
}
@ -227,7 +227,7 @@ func (r *fileRestorer) downloadPack(ctx context.Context, pack *packInfo) error {
}
} else if packsMap, ok := file.blobs.(map[restic.ID][]fileBlobInfo); ok {
for _, blob := range packsMap[pack.id] {
idxPacks := r.idx(restic.BlobHandle{ID: blob.id, Type: restic.DataBlob})
idxPacks := r.idx(restic.DataBlob, blob.id)
for _, idxPack := range idxPacks {
if idxPack.PackID.Equal(pack.id) {
addBlob(idxPack.Blob, blob.offset)

View file

@ -35,8 +35,8 @@ type TestRepo struct {
loader blobsLoaderFn
}
func (i *TestRepo) Lookup(bh restic.BlobHandle) []restic.PackedBlob {
packs := i.blobs[bh.ID]
func (i *TestRepo) Lookup(tpe restic.BlobType, id restic.ID) []restic.PackedBlob {
packs := i.blobs[id]
return packs
}

View file

@ -240,7 +240,7 @@ func (res *Restorer) RestoreTo(ctx context.Context, dst string) error {
}
idx := NewHardlinkIndex[string]()
filerestorer := newFileRestorer(dst, res.repo.LoadBlobsFromPack, res.repo.Index().Lookup,
filerestorer := newFileRestorer(dst, res.repo.LoadBlobsFromPack, res.repo.LookupBlob,
res.repo.Connections(), res.sparse, res.progress)
filerestorer.Error = res.Error
@ -435,7 +435,7 @@ func (res *Restorer) verifyFile(target string, node *restic.Node, buf []byte) ([
var offset int64
for _, blobID := range node.Content {
length, found := res.repo.LookupBlobSize(blobID, restic.DataBlob)
length, found := res.repo.LookupBlobSize(restic.DataBlob, blobID)
if !found {
return buf, errors.Errorf("Unable to fetch blob %s", blobID)
}