let index.Each() and pack.Size() return error on canceled context

This forces a caller to actually check that the function did complete.
This commit is contained in:
Michael Eischer 2024-04-05 22:20:14 +02:00
parent 31624aeffd
commit 940a3159b5
20 changed files with 107 additions and 72 deletions

View file

@ -324,7 +324,11 @@ func runCheck(ctx context.Context, opts CheckOptions, gopts GlobalOptions, args
}
if opts.CheckUnused {
for _, id := range chkr.UnusedBlobs(ctx) {
unused, err := chkr.UnusedBlobs(ctx)
if err != nil {
return err
}
for _, id := range unused {
Verbosef("unused blob %v\n", id)
errorsFound = true
}

View file

@ -439,7 +439,10 @@ func (f *Finder) packsToBlobs(ctx context.Context, packs []string) error {
if err != errAllPacksFound {
// try to resolve unknown pack ids from the index
packIDs = f.indexPacksToBlobs(ctx, packIDs)
packIDs, err = f.indexPacksToBlobs(ctx, packIDs)
if err != nil {
return err
}
}
if len(packIDs) > 0 {
@ -456,13 +459,13 @@ func (f *Finder) packsToBlobs(ctx context.Context, packs []string) error {
return nil
}
func (f *Finder) indexPacksToBlobs(ctx context.Context, packIDs map[string]struct{}) map[string]struct{} {
func (f *Finder) indexPacksToBlobs(ctx context.Context, packIDs map[string]struct{}) (map[string]struct{}, error) {
wctx, cancel := context.WithCancel(ctx)
defer cancel()
// remember which packs were found in the index
indexPackIDs := make(map[string]struct{})
f.repo.Index().Each(wctx, func(pb restic.PackedBlob) {
err := f.repo.Index().Each(wctx, func(pb restic.PackedBlob) {
idStr := pb.PackID.String()
// keep entry in packIDs as Each() returns individual index entries
matchingID := false
@ -481,6 +484,9 @@ func (f *Finder) indexPacksToBlobs(ctx context.Context, packIDs map[string]struc
indexPackIDs[idStr] = struct{}{}
}
})
if err != nil {
return nil, err
}
for id := range indexPackIDs {
delete(packIDs, id)
@ -493,7 +499,7 @@ func (f *Finder) indexPacksToBlobs(ctx context.Context, packIDs map[string]struc
}
Warnf("some pack files are missing from the repository, getting their blobs from the repository index: %v\n\n", list)
}
return packIDs
return packIDs, nil
}
func (f *Finder) findObjectPack(id string, t restic.BlobType) {

View file

@ -59,10 +59,9 @@ func runList(ctx context.Context, gopts GlobalOptions, args []string) error {
if err != nil {
return err
}
idx.Each(ctx, func(blobs restic.PackedBlob) {
return idx.Each(ctx, func(blobs restic.PackedBlob) {
Printf("%v %v\n", blobs.Type, blobs.ID)
})
return nil
})
default:
return errors.Fatal("invalid type")

View file

@ -61,13 +61,13 @@ func runRecover(ctx context.Context, gopts GlobalOptions) error {
// tree. If it is not referenced, we have a root tree.
trees := make(map[restic.ID]bool)
repo.Index().Each(ctx, func(blob restic.PackedBlob) {
err = repo.Index().Each(ctx, func(blob restic.PackedBlob) {
if blob.Type == restic.TreeBlob {
trees[blob.Blob.ID] = false
}
})
if ctx.Err() != nil {
return ctx.Err()
if err != nil {
return err
}
Verbosef("load %d trees\n", len(trees))

View file

@ -351,7 +351,10 @@ func statsDebug(ctx context.Context, repo restic.Repository) error {
Warnf("File Type: %v\n%v\n", t, hist)
}
hist := statsDebugBlobs(ctx, repo)
hist, err := statsDebugBlobs(ctx, repo)
if err != nil {
return err
}
for _, t := range []restic.BlobType{restic.DataBlob, restic.TreeBlob} {
Warnf("Blob Type: %v\n%v\n\n", t, hist[t])
}
@ -369,17 +372,17 @@ func statsDebugFileType(ctx context.Context, repo restic.Lister, tpe restic.File
return hist, err
}
func statsDebugBlobs(ctx context.Context, repo restic.Repository) [restic.NumBlobTypes]*sizeHistogram {
func statsDebugBlobs(ctx context.Context, repo restic.Repository) ([restic.NumBlobTypes]*sizeHistogram, error) {
var hist [restic.NumBlobTypes]*sizeHistogram
for i := 0; i < len(hist); i++ {
hist[i] = newSizeHistogram(2 * chunker.MaxSize)
}
repo.Index().Each(ctx, func(pb restic.PackedBlob) {
err := repo.Index().Each(ctx, func(pb restic.PackedBlob) {
hist[pb.Type].Add(uint64(pb.Length))
})
return hist
return hist, err
}
type sizeClass struct {

View file

@ -252,11 +252,11 @@ func listTreePacks(gopts GlobalOptions, t *testing.T) restic.IDSet {
rtest.OK(t, r.LoadIndex(ctx, nil))
treePacks := restic.NewIDSet()
r.Index().Each(ctx, func(pb restic.PackedBlob) {
rtest.OK(t, r.Index().Each(ctx, func(pb restic.PackedBlob) {
if pb.Type == restic.TreeBlob {
treePacks.Insert(pb.PackID)
}
})
}))
return treePacks
}
@ -280,11 +280,11 @@ func removePacksExcept(gopts GlobalOptions, t testing.TB, keep restic.IDSet, rem
rtest.OK(t, r.LoadIndex(ctx, nil))
treePacks := restic.NewIDSet()
r.Index().Each(ctx, func(pb restic.PackedBlob) {
rtest.OK(t, r.Index().Each(ctx, func(pb restic.PackedBlob) {
if pb.Type == restic.TreeBlob {
treePacks.Insert(pb.PackID)
}
})
}))
// remove all packs containing data blobs
rtest.OK(t, r.List(ctx, restic.PackFile, func(id restic.ID, size int64) error {

View file

@ -106,9 +106,9 @@ func (c *Checker) LoadSnapshots(ctx context.Context) error {
return err
}
func computePackTypes(ctx context.Context, idx restic.MasterIndex) map[restic.ID]restic.BlobType {
func computePackTypes(ctx context.Context, idx restic.MasterIndex) (map[restic.ID]restic.BlobType, error) {
packs := make(map[restic.ID]restic.BlobType)
idx.Each(ctx, func(pb restic.PackedBlob) {
err := idx.Each(ctx, func(pb restic.PackedBlob) {
tpe, exists := packs[pb.PackID]
if exists {
if pb.Type != tpe {
@ -119,7 +119,7 @@ func computePackTypes(ctx context.Context, idx restic.MasterIndex) map[restic.ID
}
packs[pb.PackID] = tpe
})
return packs
return packs, err
}
// LoadIndex loads all index files.
@ -169,7 +169,7 @@ func (c *Checker) LoadIndex(ctx context.Context, p *progress.Counter) (hints []e
debug.Log("process blobs")
cnt := 0
index.Each(ctx, func(blob restic.PackedBlob) {
err = index.Each(ctx, func(blob restic.PackedBlob) {
cnt++
if _, ok := packToIndex[blob.PackID]; !ok {
@ -179,7 +179,7 @@ func (c *Checker) LoadIndex(ctx context.Context, p *progress.Counter) (hints []e
})
debug.Log("%d blobs processed", cnt)
return nil
return err
})
if err != nil {
errs = append(errs, err)
@ -193,8 +193,14 @@ func (c *Checker) LoadIndex(ctx context.Context, p *progress.Counter) (hints []e
}
// compute pack size using index entries
c.packs = pack.Size(ctx, c.masterIndex, false)
packTypes := computePackTypes(ctx, c.masterIndex)
c.packs, err = pack.Size(ctx, c.masterIndex, false)
if err != nil {
return hints, append(errs, err)
}
packTypes, err := computePackTypes(ctx, c.masterIndex)
if err != nil {
return hints, append(errs, err)
}
debug.Log("checking for duplicate packs")
for packID := range c.packs {
@ -484,7 +490,7 @@ func (c *Checker) checkTree(id restic.ID, tree *restic.Tree) (errs []error) {
}
// UnusedBlobs returns all blobs that have never been referenced.
func (c *Checker) UnusedBlobs(ctx context.Context) (blobs restic.BlobHandles) {
func (c *Checker) UnusedBlobs(ctx context.Context) (blobs restic.BlobHandles, err error) {
if !c.trackUnused {
panic("only works when tracking blob references")
}
@ -495,7 +501,7 @@ func (c *Checker) UnusedBlobs(ctx context.Context) (blobs restic.BlobHandles) {
ctx, cancel := context.WithCancel(ctx)
defer cancel()
c.repo.Index().Each(ctx, func(blob restic.PackedBlob) {
err = c.repo.Index().Each(ctx, func(blob restic.PackedBlob) {
h := restic.BlobHandle{ID: blob.ID, Type: blob.Type}
if !c.blobRefs.M.Has(h) {
debug.Log("blob %v not referenced", h)
@ -503,7 +509,7 @@ func (c *Checker) UnusedBlobs(ctx context.Context) (blobs restic.BlobHandles) {
}
})
return blobs
return blobs, err
}
// CountPacks returns the number of packs in the repository.

View file

@ -180,7 +180,8 @@ func TestUnreferencedBlobs(t *testing.T) {
test.OKs(t, checkPacks(chkr))
test.OKs(t, checkStruct(chkr))
blobs := chkr.UnusedBlobs(context.TODO())
blobs, err := chkr.UnusedBlobs(context.TODO())
test.OK(t, err)
sort.Sort(blobs)
test.Equals(t, unusedBlobsBySnapshot, blobs)

View file

@ -43,7 +43,10 @@ func TestCheckRepo(t testing.TB, repo restic.Repository, skipStructure bool) {
}
// unused blobs
blobs := chkr.UnusedBlobs(context.TODO())
blobs, err := chkr.UnusedBlobs(context.TODO())
if err != nil {
t.Error(err)
}
if len(blobs) > 0 {
t.Errorf("unused blobs found: %v", blobs)
}

View file

@ -218,7 +218,7 @@ func (idx *Index) AddToSupersedes(ids ...restic.ID) error {
// Each passes all blobs known to the index to the callback fn. This blocks any
// modification of the index.
func (idx *Index) Each(ctx context.Context, fn func(restic.PackedBlob)) {
func (idx *Index) Each(ctx context.Context, fn func(restic.PackedBlob)) error {
idx.m.Lock()
defer idx.m.Unlock()
@ -232,6 +232,7 @@ func (idx *Index) Each(ctx context.Context, fn func(restic.PackedBlob)) {
return true
})
}
return ctx.Err()
}
type EachByPackResult struct {

View file

@ -339,7 +339,7 @@ func TestIndexUnserialize(t *testing.T) {
rtest.Equals(t, oldIdx, idx.Supersedes())
blobs := listPack(idx, exampleLookupTest.packID)
blobs := listPack(t, idx, exampleLookupTest.packID)
if len(blobs) != len(exampleLookupTest.blobs) {
t.Fatalf("expected %d blobs in pack, got %d", len(exampleLookupTest.blobs), len(blobs))
}
@ -356,12 +356,12 @@ func TestIndexUnserialize(t *testing.T) {
}
}
func listPack(idx *index.Index, id restic.ID) (pbs []restic.PackedBlob) {
idx.Each(context.TODO(), func(pb restic.PackedBlob) {
func listPack(t testing.TB, idx *index.Index, id restic.ID) (pbs []restic.PackedBlob) {
rtest.OK(t, idx.Each(context.TODO(), func(pb restic.PackedBlob) {
if pb.PackID.Equal(id) {
pbs = append(pbs, pb)
}
})
}))
return pbs
}

View file

@ -223,13 +223,16 @@ func (mi *MasterIndex) finalizeFullIndexes() []*Index {
// Each runs fn on all blobs known to the index. When the context is cancelled,
// the index iteration return immediately. This blocks any modification of the index.
func (mi *MasterIndex) Each(ctx context.Context, fn func(restic.PackedBlob)) {
func (mi *MasterIndex) Each(ctx context.Context, fn func(restic.PackedBlob)) error {
mi.idxMutex.RLock()
defer mi.idxMutex.RUnlock()
for _, idx := range mi.idx {
idx.Each(ctx, fn)
if err := idx.Each(ctx, fn); err != nil {
return err
}
}
return nil
}
// MergeFinalIndexes merges all final indexes together.
@ -429,10 +432,6 @@ func (mi *MasterIndex) ListPacks(ctx context.Context, packs restic.IDSet) <-chan
defer close(out)
// only resort a part of the index to keep the memory overhead bounded
for i := byte(0); i < 16; i++ {
if ctx.Err() != nil {
return
}
packBlob := make(map[restic.ID][]restic.Blob)
for pack := range packs {
if pack[0]&0xf == i {
@ -442,11 +441,14 @@ func (mi *MasterIndex) ListPacks(ctx context.Context, packs restic.IDSet) <-chan
if len(packBlob) == 0 {
continue
}
mi.Each(ctx, func(pb restic.PackedBlob) {
err := mi.Each(ctx, func(pb restic.PackedBlob) {
if packs.Has(pb.PackID) && pb.PackID[0]&0xf == i {
packBlob[pb.PackID] = append(packBlob[pb.PackID], pb.Blob)
}
})
if err != nil {
return
}
// pass on packs
for packID, pbs := range packBlob {

View file

@ -166,9 +166,9 @@ func TestMasterMergeFinalIndexes(t *testing.T) {
rtest.Equals(t, 1, idxCount)
blobCount := 0
mIdx.Each(context.TODO(), func(pb restic.PackedBlob) {
rtest.OK(t, mIdx.Each(context.TODO(), func(pb restic.PackedBlob) {
blobCount++
})
}))
rtest.Equals(t, 2, blobCount)
blobs := mIdx.Lookup(bhInIdx1)
@ -198,9 +198,9 @@ func TestMasterMergeFinalIndexes(t *testing.T) {
rtest.Equals(t, []restic.PackedBlob{blob2}, blobs)
blobCount = 0
mIdx.Each(context.TODO(), func(pb restic.PackedBlob) {
rtest.OK(t, mIdx.Each(context.TODO(), func(pb restic.PackedBlob) {
blobCount++
})
}))
rtest.Equals(t, 2, blobCount)
}
@ -319,9 +319,9 @@ func BenchmarkMasterIndexEach(b *testing.B) {
for i := 0; i < b.N; i++ {
entries := 0
mIdx.Each(context.TODO(), func(pb restic.PackedBlob) {
rtest.OK(b, mIdx.Each(context.TODO(), func(pb restic.PackedBlob) {
entries++
})
}))
}
}

View file

@ -389,10 +389,10 @@ func CalculateHeaderSize(blobs []restic.Blob) int {
// If onlyHdr is set to true, only the size of the header is returned
// Note that this function only gives correct sizes, if there are no
// duplicates in the index.
func Size(ctx context.Context, mi restic.MasterIndex, onlyHdr bool) map[restic.ID]int64 {
func Size(ctx context.Context, mi restic.MasterIndex, onlyHdr bool) (map[restic.ID]int64, error) {
packSize := make(map[restic.ID]int64)
mi.Each(ctx, func(blob restic.PackedBlob) {
err := mi.Each(ctx, func(blob restic.PackedBlob) {
size, ok := packSize[blob.PackID]
if !ok {
size = headerSize
@ -403,5 +403,5 @@ func Size(ctx context.Context, mi restic.MasterIndex, onlyHdr bool) map[restic.I
packSize[blob.PackID] = size + int64(CalculateEntrySize(blob.Blob))
})
return packSize
return packSize, err
}

View file

@ -124,14 +124,14 @@ func PlanPrune(ctx context.Context, opts PruneOptions, repo restic.Repository, g
blobCount := keepBlobs.Len()
// when repacking, we do not want to keep blobs which are
// already contained in kept packs, so delete them from keepBlobs
repo.Index().Each(ctx, func(blob restic.PackedBlob) {
err := repo.Index().Each(ctx, func(blob restic.PackedBlob) {
if plan.removePacks.Has(blob.PackID) || plan.repackPacks.Has(blob.PackID) {
return
}
keepBlobs.Delete(blob.BlobHandle)
})
if ctx.Err() != nil {
return nil, ctx.Err()
if err != nil {
return nil, err
}
if keepBlobs.Len() < blobCount/2 {
@ -155,7 +155,7 @@ func packInfoFromIndex(ctx context.Context, idx restic.MasterIndex, usedBlobs re
// iterate over all blobs in index to find out which blobs are duplicates
// The counter in usedBlobs describes how many instances of the blob exist in the repository index
// Thus 0 == blob is missing, 1 == blob exists once, >= 2 == duplicates exist
idx.Each(ctx, func(blob restic.PackedBlob) {
err := idx.Each(ctx, func(blob restic.PackedBlob) {
bh := blob.BlobHandle
count, ok := usedBlobs[bh]
if ok {
@ -169,8 +169,8 @@ func packInfoFromIndex(ctx context.Context, idx restic.MasterIndex, usedBlobs re
usedBlobs[bh] = count
}
})
if ctx.Err() != nil {
return nil, nil, ctx.Err()
if err != nil {
return nil, nil, err
}
// Check if all used blobs have been found in index
@ -194,14 +194,18 @@ func packInfoFromIndex(ctx context.Context, idx restic.MasterIndex, usedBlobs re
indexPack := make(map[restic.ID]packInfo)
// save computed pack header size
for pid, hdrSize := range pack.Size(ctx, idx, true) {
sz, err := pack.Size(ctx, idx, true)
if err != nil {
return nil, nil, err
}
for pid, hdrSize := range sz {
// initialize tpe with NumBlobTypes to indicate it's not set
indexPack[pid] = packInfo{tpe: restic.NumBlobTypes, usedSize: uint64(hdrSize)}
}
hasDuplicates := false
// iterate over all blobs in index to generate packInfo
idx.Each(ctx, func(blob restic.PackedBlob) {
err = idx.Each(ctx, func(blob restic.PackedBlob) {
ip := indexPack[blob.PackID]
// Set blob type if not yet set
@ -246,8 +250,8 @@ func packInfoFromIndex(ctx context.Context, idx restic.MasterIndex, usedBlobs re
// update indexPack
indexPack[blob.PackID] = ip
})
if ctx.Err() != nil {
return nil, nil, ctx.Err()
if err != nil {
return nil, nil, err
}
// if duplicate blobs exist, those will be set to either "used" or "unused":
@ -256,7 +260,7 @@ func packInfoFromIndex(ctx context.Context, idx restic.MasterIndex, usedBlobs re
// - if there are no used blobs in a pack, possibly mark duplicates as "unused"
if hasDuplicates {
// iterate again over all blobs in index (this is pretty cheap, all in-mem)
idx.Each(ctx, func(blob restic.PackedBlob) {
err = idx.Each(ctx, func(blob restic.PackedBlob) {
bh := blob.BlobHandle
count, ok := usedBlobs[bh]
// skip non-duplicate, aka. normal blobs
@ -294,9 +298,9 @@ func packInfoFromIndex(ctx context.Context, idx restic.MasterIndex, usedBlobs re
// update indexPack
indexPack[blob.PackID] = ip
})
if err != nil {
return nil, nil, err
}
if ctx.Err() != nil {
return nil, nil, ctx.Err()
}
// Sanity check. If no duplicates exist, all blobs have value 1. After handling

View file

@ -54,7 +54,10 @@ func RepairIndex(ctx context.Context, repo *Repository, opts RepairIndexOptions,
if err != nil {
return err
}
packSizeFromIndex = pack.Size(ctx, repo.Index(), false)
packSizeFromIndex, err = pack.Size(ctx, repo.Index(), false)
if err != nil {
return err
}
}
printer.P("getting pack files to read...\n")

View file

@ -17,7 +17,7 @@ import (
func listBlobs(repo restic.Repository) restic.BlobSet {
blobs := restic.NewBlobSet()
repo.Index().Each(context.TODO(), func(pb restic.PackedBlob) {
_ = repo.Index().Each(context.TODO(), func(pb restic.PackedBlob) {
blobs.Insert(pb.BlobHandle)
})
return blobs

View file

@ -704,11 +704,14 @@ func (r *Repository) LoadIndex(ctx context.Context, p *progress.Counter) error {
defer cancel()
invalidIndex := false
r.idx.Each(ctx, func(blob restic.PackedBlob) {
err := r.idx.Each(ctx, func(blob restic.PackedBlob) {
if blob.IsCompressed() {
invalidIndex = true
}
})
if err != nil {
return err
}
if invalidIndex {
return errors.New("index uses feature not supported by repository version 1")
}

View file

@ -370,13 +370,13 @@ func testRepositoryIncrementalIndex(t *testing.T, version uint) {
idx, err := loadIndex(context.TODO(), repo, id)
rtest.OK(t, err)
idx.Each(context.TODO(), func(pb restic.PackedBlob) {
rtest.OK(t, idx.Each(context.TODO(), func(pb restic.PackedBlob) {
if _, ok := packEntries[pb.PackID]; !ok {
packEntries[pb.PackID] = make(map[restic.ID]struct{})
}
packEntries[pb.PackID][id] = struct{}{}
})
}))
return nil
})
if err != nil {

View file

@ -103,8 +103,8 @@ type MasterIndex interface {
Lookup(BlobHandle) []PackedBlob
// Each runs fn on all blobs known to the index. When the context is cancelled,
// the index iteration return immediately. This blocks any modification of the index.
Each(ctx context.Context, fn func(PackedBlob))
// the index iteration returns immediately with ctx.Err(). This blocks any modification of the index.
Each(ctx context.Context, fn func(PackedBlob)) error
ListPacks(ctx context.Context, packs IDSet) <-chan PackBlobs
Save(ctx context.Context, repo Repository, excludePacks IDSet, extraObsolete IDs, opts MasterIndexSaveOpts) error