From 80dcfca191386c6d6bba4dc9698edc5d6cfd3e51 Mon Sep 17 00:00:00 2001 From: Alexander Weiss Date: Tue, 10 Nov 2020 08:16:47 +0100 Subject: [PATCH] check: Check sizes computed from index and pack header --- cmd/restic/cmd_check.go | 6 +- internal/checker/checker.go | 110 ++++++++++++++++++++++++------------ 2 files changed, 78 insertions(+), 38 deletions(-) diff --git a/cmd/restic/cmd_check.go b/cmd/restic/cmd_check.go index edcd3fe46..618b98fe2 100644 --- a/cmd/restic/cmd_check.go +++ b/cmd/restic/cmd_check.go @@ -234,12 +234,12 @@ func runCheck(opts CheckOptions, gopts GlobalOptions, args []string) error { } doReadData := func(bucket, totalBuckets uint) { - packs := restic.IDSet{} - for pack := range chkr.GetPacks() { + packs := make(map[restic.ID]int64) + for pack, size := range chkr.GetPacks() { // If we ever check more than the first byte // of pack, update totalBucketsMax. if (uint(pack[0]) % totalBuckets) == (bucket - 1) { - packs.Insert(pack) + packs[pack] = size } } packCount := uint64(len(packs)) diff --git a/internal/checker/checker.go b/internal/checker/checker.go index 6bef98520..271f4c46c 100644 --- a/internal/checker/checker.go +++ b/internal/checker/checker.go @@ -22,7 +22,7 @@ import ( // A Checker only tests for internal errors within the data structures of the // repository (e.g. missing blobs), and needs a valid Repository to work on. type Checker struct { - packs restic.IDSet + packs map[restic.ID]int64 blobRefs struct { sync.Mutex // see flags below @@ -44,7 +44,7 @@ const ( // New returns a new checker which runs on repo. func New(repo restic.Repository) *Checker { c := &Checker{ - packs: restic.NewIDSet(), + packs: make(map[restic.ID]int64), masterIndex: repository.NewMasterIndex(), repo: repo, } @@ -82,7 +82,7 @@ func (c *Checker) LoadIndex(ctx context.Context) (hints []error, errs []error) { // track spawned goroutines using wg, create a new context which is // cancelled as soon as an error occurs. - wg, ctx := errgroup.WithContext(ctx) + wg, wgCtx := errgroup.WithContext(ctx) type FileInfo struct { restic.ID @@ -101,9 +101,9 @@ func (c *Checker) LoadIndex(ctx context.Context) (hints []error, errs []error) { // send list of index files through ch, which is closed afterwards wg.Go(func() error { defer close(ch) - return c.repo.List(ctx, restic.IndexFile, func(id restic.ID, size int64) error { + return c.repo.List(wgCtx, restic.IndexFile, func(id restic.ID, size int64) error { select { - case <-ctx.Done(): + case <-wgCtx.Done(): return nil case ch <- FileInfo{id, size}: } @@ -120,7 +120,7 @@ func (c *Checker) LoadIndex(ctx context.Context) (hints []error, errs []error) { var idx *repository.Index oldFormat := false - buf, err = c.repo.LoadAndDecrypt(ctx, buf[:0], restic.IndexFile, fi.ID) + buf, err = c.repo.LoadAndDecrypt(wgCtx, buf[:0], restic.IndexFile, fi.ID) if err == nil { idx, oldFormat, err = repository.DecodeIndex(buf, fi.ID) } @@ -134,7 +134,7 @@ func (c *Checker) LoadIndex(ctx context.Context) (hints []error, errs []error) { select { case resultCh <- Result{idx, fi.ID, err}: - case <-ctx.Done(): + case <-wgCtx.Done(): } } return nil @@ -161,8 +161,7 @@ func (c *Checker) LoadIndex(ctx context.Context) (hints []error, errs []error) { debug.Log("process blobs") cnt := 0 - for blob := range res.Index.Each(ctx) { - c.packs.Insert(blob.PackID) + for blob := range res.Index.Each(wgCtx) { h := restic.BlobHandle{ID: blob.ID, Type: blob.Type} c.blobRefs.M[h] = blobStatusExists cnt++ @@ -183,6 +182,18 @@ func (c *Checker) LoadIndex(ctx context.Context) (hints []error, errs []error) { errs = append(errs, err) } + // Merge index before computing pack sizes, as this needs removed duplicates + c.masterIndex.MergeFinalIndexes() + + // compute pack size using index entries + for blob := range c.masterIndex.Each(ctx) { + size, ok := c.packs[blob.PackID] + if !ok { + size = pack.HeaderSize + } + c.packs[blob.PackID] = size + int64(pack.PackedSizeOfBlob(blob.Length)) + } + debug.Log("checking for duplicate packs") for packID := range c.packs { debug.Log(" check pack %v: contained in %d indexes", packID, len(packToIndex[packID])) @@ -194,8 +205,6 @@ func (c *Checker) LoadIndex(ctx context.Context) (hints []error, errs []error) { } } - c.masterIndex.MergeFinalIndexes() - err = c.repo.SetIndex(c.masterIndex) if err != nil { debug.Log("SetIndex returned error: %v", err) @@ -235,10 +244,10 @@ func (c *Checker) Packs(ctx context.Context, errChan chan<- error) { debug.Log("checking for %d packs", len(c.packs)) debug.Log("listing repository packs") - repoPacks := restic.NewIDSet() + repoPacks := make(map[restic.ID]int64) err := c.repo.List(ctx, restic.PackFile, func(id restic.ID, size int64) error { - repoPacks.Insert(id) + repoPacks[id] = size return nil }) @@ -246,23 +255,39 @@ func (c *Checker) Packs(ctx context.Context, errChan chan<- error) { errChan <- err } + for id, size := range c.packs { + reposize, ok := repoPacks[id] + // remove from repoPacks so we can find orphaned packs + delete(repoPacks, id) + + // missing: present in c.packs but not in the repo + if !ok { + select { + case <-ctx.Done(): + return + case errChan <- PackError{ID: id, Err: errors.New("does not exist")}: + } + continue + } + + // size not matching: present in c.packs and in the repo, but sizes do not match + if size != reposize { + select { + case <-ctx.Done(): + return + case errChan <- PackError{ID: id, Err: errors.Errorf("unexpected file size: got %d, expected %d", reposize, size)}: + } + } + } + // orphaned: present in the repo but not in c.packs - for orphanID := range repoPacks.Sub(c.packs) { + for orphanID := range repoPacks { select { case <-ctx.Done(): return case errChan <- PackError{ID: orphanID, Orphaned: true, Err: errors.New("not referenced in any index")}: } } - - // missing: present in c.packs but not in the repo - for missingID := range c.packs.Sub(repoPacks) { - select { - case <-ctx.Done(): - return - case errChan <- PackError{ID: missingID, Err: errors.New("does not exist")}: - } - } } // Error is an error that occurred while checking a repository. @@ -695,16 +720,16 @@ func (c *Checker) CountPacks() uint64 { } // GetPacks returns IDSet of packs in the repository -func (c *Checker) GetPacks() restic.IDSet { +func (c *Checker) GetPacks() map[restic.ID]int64 { return c.packs } // checkPack reads a pack and checks the integrity of all blobs. -func checkPack(ctx context.Context, r restic.Repository, id restic.ID) error { +func checkPack(ctx context.Context, r restic.Repository, id restic.ID, size int64) error { debug.Log("checking pack %v", id) h := restic.Handle{Type: restic.PackFile, Name: id.String()} - packfile, hash, size, err := repository.DownloadAndHash(ctx, r.Backend(), h) + packfile, hash, realSize, err := repository.DownloadAndHash(ctx, r.Backend(), h) if err != nil { return errors.Wrap(err, "checkPack") } @@ -721,6 +746,11 @@ func checkPack(ctx context.Context, r restic.Repository, id restic.ID) error { return errors.Errorf("Pack ID does not match, want %v, got %v", id.Str(), hash.Str()) } + if realSize != size { + debug.Log("Pack size does not match, want %v, got %v", size, realSize) + return errors.Errorf("Pack size does not match, want %v, got %v", size, realSize) + } + blobs, err := pack.List(r.Key(), packfile, size) if err != nil { return err @@ -728,8 +758,10 @@ func checkPack(ctx context.Context, r restic.Repository, id restic.ID) error { var errs []error var buf []byte + sizeFromBlobs := int64(pack.HeaderSize) // pack size computed only from blob information idx := r.Index() for i, blob := range blobs { + sizeFromBlobs += int64(pack.PackedSizeOfBlob(blob.Length)) debug.Log(" check blob %d: %v", i, blob) buf = buf[:cap(buf)] @@ -765,7 +797,7 @@ func checkPack(ctx context.Context, r restic.Repository, id restic.ID) error { continue } - // Check if blob is contained in index + // Check if blob is contained in index and position is correct idxHas := false for _, pb := range idx.Lookup(blob.ID, blob.Type) { if pb.PackID == id { @@ -779,6 +811,11 @@ func checkPack(ctx context.Context, r restic.Repository, id restic.ID) error { } } + if sizeFromBlobs != size { + debug.Log("Pack size does not match, want %v, got %v", size, sizeFromBlobs) + errs = append(errs, errors.Errorf("Pack size does not match, want %v, got %v", size, sizeFromBlobs)) + } + if len(errs) > 0 { return errors.Errorf("pack %v contains %v errors: %v", id.Str(), len(errs), errs) } @@ -792,29 +829,32 @@ func (c *Checker) ReadData(ctx context.Context, errChan chan<- error) { } // ReadPacks loads data from specified packs and checks the integrity. -func (c *Checker) ReadPacks(ctx context.Context, packs restic.IDSet, p *progress.Counter, errChan chan<- error) { +func (c *Checker) ReadPacks(ctx context.Context, packs map[restic.ID]int64, p *progress.Counter, errChan chan<- error) { defer close(errChan) g, ctx := errgroup.WithContext(ctx) - ch := make(chan restic.ID) + type packsize struct { + id restic.ID + size int64 + } + ch := make(chan packsize) // run workers for i := 0; i < defaultParallelism; i++ { g.Go(func() error { for { - var id restic.ID + var ps packsize var ok bool select { case <-ctx.Done(): return nil - case id, ok = <-ch: + case ps, ok = <-ch: if !ok { return nil } } - - err := checkPack(ctx, c.repo, id) + err := checkPack(ctx, c.repo, ps.id, ps.size) p.Add(1) if err == nil { continue @@ -830,9 +870,9 @@ func (c *Checker) ReadPacks(ctx context.Context, packs restic.IDSet, p *progress } // push packs to ch - for pack := range packs { + for pack, size := range packs { select { - case ch <- pack: + case ch <- packsize{id: pack, size: size}: case <-ctx.Done(): } }