check: Check sizes computed from index and pack header

This commit is contained in:
Alexander Weiss 2020-11-10 08:16:47 +01:00
parent 145830005b
commit 80dcfca191
2 changed files with 78 additions and 38 deletions

View file

@ -234,12 +234,12 @@ func runCheck(opts CheckOptions, gopts GlobalOptions, args []string) error {
}
doReadData := func(bucket, totalBuckets uint) {
packs := restic.IDSet{}
for pack := range chkr.GetPacks() {
packs := make(map[restic.ID]int64)
for pack, size := range chkr.GetPacks() {
// If we ever check more than the first byte
// of pack, update totalBucketsMax.
if (uint(pack[0]) % totalBuckets) == (bucket - 1) {
packs.Insert(pack)
packs[pack] = size
}
}
packCount := uint64(len(packs))

View file

@ -22,7 +22,7 @@ import (
// A Checker only tests for internal errors within the data structures of the
// repository (e.g. missing blobs), and needs a valid Repository to work on.
type Checker struct {
packs restic.IDSet
packs map[restic.ID]int64
blobRefs struct {
sync.Mutex
// see flags below
@ -44,7 +44,7 @@ const (
// New returns a new checker which runs on repo.
func New(repo restic.Repository) *Checker {
c := &Checker{
packs: restic.NewIDSet(),
packs: make(map[restic.ID]int64),
masterIndex: repository.NewMasterIndex(),
repo: repo,
}
@ -82,7 +82,7 @@ func (c *Checker) LoadIndex(ctx context.Context) (hints []error, errs []error) {
// track spawned goroutines using wg, create a new context which is
// cancelled as soon as an error occurs.
wg, ctx := errgroup.WithContext(ctx)
wg, wgCtx := errgroup.WithContext(ctx)
type FileInfo struct {
restic.ID
@ -101,9 +101,9 @@ func (c *Checker) LoadIndex(ctx context.Context) (hints []error, errs []error) {
// send list of index files through ch, which is closed afterwards
wg.Go(func() error {
defer close(ch)
return c.repo.List(ctx, restic.IndexFile, func(id restic.ID, size int64) error {
return c.repo.List(wgCtx, restic.IndexFile, func(id restic.ID, size int64) error {
select {
case <-ctx.Done():
case <-wgCtx.Done():
return nil
case ch <- FileInfo{id, size}:
}
@ -120,7 +120,7 @@ func (c *Checker) LoadIndex(ctx context.Context) (hints []error, errs []error) {
var idx *repository.Index
oldFormat := false
buf, err = c.repo.LoadAndDecrypt(ctx, buf[:0], restic.IndexFile, fi.ID)
buf, err = c.repo.LoadAndDecrypt(wgCtx, buf[:0], restic.IndexFile, fi.ID)
if err == nil {
idx, oldFormat, err = repository.DecodeIndex(buf, fi.ID)
}
@ -134,7 +134,7 @@ func (c *Checker) LoadIndex(ctx context.Context) (hints []error, errs []error) {
select {
case resultCh <- Result{idx, fi.ID, err}:
case <-ctx.Done():
case <-wgCtx.Done():
}
}
return nil
@ -161,8 +161,7 @@ func (c *Checker) LoadIndex(ctx context.Context) (hints []error, errs []error) {
debug.Log("process blobs")
cnt := 0
for blob := range res.Index.Each(ctx) {
c.packs.Insert(blob.PackID)
for blob := range res.Index.Each(wgCtx) {
h := restic.BlobHandle{ID: blob.ID, Type: blob.Type}
c.blobRefs.M[h] = blobStatusExists
cnt++
@ -183,6 +182,18 @@ func (c *Checker) LoadIndex(ctx context.Context) (hints []error, errs []error) {
errs = append(errs, err)
}
// Merge index before computing pack sizes, as this needs removed duplicates
c.masterIndex.MergeFinalIndexes()
// compute pack size using index entries
for blob := range c.masterIndex.Each(ctx) {
size, ok := c.packs[blob.PackID]
if !ok {
size = pack.HeaderSize
}
c.packs[blob.PackID] = size + int64(pack.PackedSizeOfBlob(blob.Length))
}
debug.Log("checking for duplicate packs")
for packID := range c.packs {
debug.Log(" check pack %v: contained in %d indexes", packID, len(packToIndex[packID]))
@ -194,8 +205,6 @@ func (c *Checker) LoadIndex(ctx context.Context) (hints []error, errs []error) {
}
}
c.masterIndex.MergeFinalIndexes()
err = c.repo.SetIndex(c.masterIndex)
if err != nil {
debug.Log("SetIndex returned error: %v", err)
@ -235,10 +244,10 @@ func (c *Checker) Packs(ctx context.Context, errChan chan<- error) {
debug.Log("checking for %d packs", len(c.packs))
debug.Log("listing repository packs")
repoPacks := restic.NewIDSet()
repoPacks := make(map[restic.ID]int64)
err := c.repo.List(ctx, restic.PackFile, func(id restic.ID, size int64) error {
repoPacks.Insert(id)
repoPacks[id] = size
return nil
})
@ -246,23 +255,39 @@ func (c *Checker) Packs(ctx context.Context, errChan chan<- error) {
errChan <- err
}
for id, size := range c.packs {
reposize, ok := repoPacks[id]
// remove from repoPacks so we can find orphaned packs
delete(repoPacks, id)
// missing: present in c.packs but not in the repo
if !ok {
select {
case <-ctx.Done():
return
case errChan <- PackError{ID: id, Err: errors.New("does not exist")}:
}
continue
}
// size not matching: present in c.packs and in the repo, but sizes do not match
if size != reposize {
select {
case <-ctx.Done():
return
case errChan <- PackError{ID: id, Err: errors.Errorf("unexpected file size: got %d, expected %d", reposize, size)}:
}
}
}
// orphaned: present in the repo but not in c.packs
for orphanID := range repoPacks.Sub(c.packs) {
for orphanID := range repoPacks {
select {
case <-ctx.Done():
return
case errChan <- PackError{ID: orphanID, Orphaned: true, Err: errors.New("not referenced in any index")}:
}
}
// missing: present in c.packs but not in the repo
for missingID := range c.packs.Sub(repoPacks) {
select {
case <-ctx.Done():
return
case errChan <- PackError{ID: missingID, Err: errors.New("does not exist")}:
}
}
}
// Error is an error that occurred while checking a repository.
@ -695,16 +720,16 @@ func (c *Checker) CountPacks() uint64 {
}
// GetPacks returns IDSet of packs in the repository
func (c *Checker) GetPacks() restic.IDSet {
func (c *Checker) GetPacks() map[restic.ID]int64 {
return c.packs
}
// checkPack reads a pack and checks the integrity of all blobs.
func checkPack(ctx context.Context, r restic.Repository, id restic.ID) error {
func checkPack(ctx context.Context, r restic.Repository, id restic.ID, size int64) error {
debug.Log("checking pack %v", id)
h := restic.Handle{Type: restic.PackFile, Name: id.String()}
packfile, hash, size, err := repository.DownloadAndHash(ctx, r.Backend(), h)
packfile, hash, realSize, err := repository.DownloadAndHash(ctx, r.Backend(), h)
if err != nil {
return errors.Wrap(err, "checkPack")
}
@ -721,6 +746,11 @@ func checkPack(ctx context.Context, r restic.Repository, id restic.ID) error {
return errors.Errorf("Pack ID does not match, want %v, got %v", id.Str(), hash.Str())
}
if realSize != size {
debug.Log("Pack size does not match, want %v, got %v", size, realSize)
return errors.Errorf("Pack size does not match, want %v, got %v", size, realSize)
}
blobs, err := pack.List(r.Key(), packfile, size)
if err != nil {
return err
@ -728,8 +758,10 @@ func checkPack(ctx context.Context, r restic.Repository, id restic.ID) error {
var errs []error
var buf []byte
sizeFromBlobs := int64(pack.HeaderSize) // pack size computed only from blob information
idx := r.Index()
for i, blob := range blobs {
sizeFromBlobs += int64(pack.PackedSizeOfBlob(blob.Length))
debug.Log(" check blob %d: %v", i, blob)
buf = buf[:cap(buf)]
@ -765,7 +797,7 @@ func checkPack(ctx context.Context, r restic.Repository, id restic.ID) error {
continue
}
// Check if blob is contained in index
// Check if blob is contained in index and position is correct
idxHas := false
for _, pb := range idx.Lookup(blob.ID, blob.Type) {
if pb.PackID == id {
@ -779,6 +811,11 @@ func checkPack(ctx context.Context, r restic.Repository, id restic.ID) error {
}
}
if sizeFromBlobs != size {
debug.Log("Pack size does not match, want %v, got %v", size, sizeFromBlobs)
errs = append(errs, errors.Errorf("Pack size does not match, want %v, got %v", size, sizeFromBlobs))
}
if len(errs) > 0 {
return errors.Errorf("pack %v contains %v errors: %v", id.Str(), len(errs), errs)
}
@ -792,29 +829,32 @@ func (c *Checker) ReadData(ctx context.Context, errChan chan<- error) {
}
// ReadPacks loads data from specified packs and checks the integrity.
func (c *Checker) ReadPacks(ctx context.Context, packs restic.IDSet, p *progress.Counter, errChan chan<- error) {
func (c *Checker) ReadPacks(ctx context.Context, packs map[restic.ID]int64, p *progress.Counter, errChan chan<- error) {
defer close(errChan)
g, ctx := errgroup.WithContext(ctx)
ch := make(chan restic.ID)
type packsize struct {
id restic.ID
size int64
}
ch := make(chan packsize)
// run workers
for i := 0; i < defaultParallelism; i++ {
g.Go(func() error {
for {
var id restic.ID
var ps packsize
var ok bool
select {
case <-ctx.Done():
return nil
case id, ok = <-ch:
case ps, ok = <-ch:
if !ok {
return nil
}
}
err := checkPack(ctx, c.repo, id)
err := checkPack(ctx, c.repo, ps.id, ps.size)
p.Add(1)
if err == nil {
continue
@ -830,9 +870,9 @@ func (c *Checker) ReadPacks(ctx context.Context, packs restic.IDSet, p *progress
}
// push packs to ch
for pack := range packs {
for pack, size := range packs {
select {
case ch <- pack:
case ch <- packsize{id: pack, size: size}:
case <-ctx.Done():
}
}