From 94e863885ce8eb89c1ba0f5735c00f14f7f30927 Mon Sep 17 00:00:00 2001 From: Michael Eischer Date: Fri, 10 May 2024 16:28:23 +0200 Subject: [PATCH] check: move verification of individual pack file to repository --- cmd/restic/cmd_check.go | 3 +- internal/checker/checker.go | 194 +-------------------------------- internal/repository/check.go | 205 +++++++++++++++++++++++++++++++++++ 3 files changed, 208 insertions(+), 194 deletions(-) create mode 100644 internal/repository/check.go diff --git a/cmd/restic/cmd_check.go b/cmd/restic/cmd_check.go index c44edae7e..671cab0e6 100644 --- a/cmd/restic/cmd_check.go +++ b/cmd/restic/cmd_check.go @@ -15,6 +15,7 @@ import ( "github.com/restic/restic/internal/checker" "github.com/restic/restic/internal/errors" "github.com/restic/restic/internal/fs" + "github.com/restic/restic/internal/repository" "github.com/restic/restic/internal/restic" "github.com/restic/restic/internal/ui" ) @@ -347,7 +348,7 @@ func runCheck(ctx context.Context, opts CheckOptions, gopts GlobalOptions, args for err := range errChan { errorsFound = true Warnf("%v\n", err) - if err, ok := err.(*checker.ErrPackData); ok { + if err, ok := err.(*repository.ErrPackData); ok { salvagePacks = append(salvagePacks, err.PackID) } } diff --git a/internal/checker/checker.go b/internal/checker/checker.go index f19439622..dc83aef5b 100644 --- a/internal/checker/checker.go +++ b/internal/checker/checker.go @@ -2,21 +2,16 @@ package checker import ( "bufio" - "bytes" "context" "fmt" - "io" "runtime" - "sort" "sync" "github.com/klauspost/compress/zstd" - "github.com/minio/sha256-simd" "github.com/restic/restic/internal/backend" "github.com/restic/restic/internal/backend/s3" "github.com/restic/restic/internal/debug" "github.com/restic/restic/internal/errors" - "github.com/restic/restic/internal/hashing" "github.com/restic/restic/internal/index" "github.com/restic/restic/internal/pack" "github.com/restic/restic/internal/repository" @@ -90,16 +85,6 @@ func (err 
*ErrOldIndexFormat) Error() string { return fmt.Sprintf("index %v has old format", err.ID) } -// ErrPackData is returned if errors are discovered while verifying a packfile -type ErrPackData struct { - PackID restic.ID - errs []error -} - -func (e *ErrPackData) Error() string { - return fmt.Sprintf("pack %v contains %v errors: %v", e.PackID, len(e.errs), e.errs) -} - func (c *Checker) LoadSnapshots(ctx context.Context) error { var err error c.snapshots, err = restic.MemorizeList(ctx, c.repo, restic.SnapshotFile) @@ -524,182 +509,6 @@ func (c *Checker) GetPacks() map[restic.ID]int64 { return c.packs } -type partialReadError struct { - err error -} - -func (e *partialReadError) Error() string { - return e.err.Error() -} - -// checkPack reads a pack and checks the integrity of all blobs. -func checkPack(ctx context.Context, r restic.Repository, id restic.ID, blobs []restic.Blob, size int64, bufRd *bufio.Reader, dec *zstd.Decoder) error { - err := checkPackInner(ctx, r, id, blobs, size, bufRd, dec) - if err != nil { - // retry pack verification to detect transient errors - err2 := checkPackInner(ctx, r, id, blobs, size, bufRd, dec) - if err2 != nil { - err = err2 - } else { - err = fmt.Errorf("check successful on second attempt, original error %w", err) - } - } - return err -} - -func checkPackInner(ctx context.Context, r restic.Repository, id restic.ID, blobs []restic.Blob, size int64, bufRd *bufio.Reader, dec *zstd.Decoder) error { - - debug.Log("checking pack %v", id.String()) - - if len(blobs) == 0 { - return &ErrPackData{PackID: id, errs: []error{errors.New("pack is empty or not indexed")}} - } - - // sanity check blobs in index - sort.Slice(blobs, func(i, j int) bool { - return blobs[i].Offset < blobs[j].Offset - }) - idxHdrSize := pack.CalculateHeaderSize(blobs) - lastBlobEnd := 0 - nonContinuousPack := false - for _, blob := range blobs { - if lastBlobEnd != int(blob.Offset) { - nonContinuousPack = true - } - lastBlobEnd = int(blob.Offset + blob.Length) - } - 
// size was calculated by masterindex.PackSize, thus there's no need to recalculate it here - - var errs []error - if nonContinuousPack { - debug.Log("Index for pack contains gaps / overlaps, blobs: %v", blobs) - errs = append(errs, errors.New("index for pack contains gaps / overlapping blobs")) - } - - // calculate hash on-the-fly while reading the pack and capture pack header - var hash restic.ID - var hdrBuf []byte - h := backend.Handle{Type: backend.PackFile, Name: id.String()} - err := r.Backend().Load(ctx, h, int(size), 0, func(rd io.Reader) error { - hrd := hashing.NewReader(rd, sha256.New()) - bufRd.Reset(hrd) - - it := repository.NewPackBlobIterator(id, newBufReader(bufRd), 0, blobs, r.Key(), dec) - for { - val, err := it.Next() - if err == repository.ErrPackEOF { - break - } else if err != nil { - return &partialReadError{err} - } - debug.Log(" check blob %v: %v", val.Handle.ID, val.Handle) - if val.Err != nil { - debug.Log(" error verifying blob %v: %v", val.Handle.ID, val.Err) - errs = append(errs, errors.Errorf("blob %v: %v", val.Handle.ID, val.Err)) - } - } - - // skip enough bytes until we reach the possible header start - curPos := lastBlobEnd - minHdrStart := int(size) - pack.MaxHeaderSize - if minHdrStart > curPos { - _, err := bufRd.Discard(minHdrStart - curPos) - if err != nil { - return &partialReadError{err} - } - curPos += minHdrStart - curPos - } - - // read remainder, which should be the pack header - var err error - hdrBuf = make([]byte, int(size-int64(curPos))) - _, err = io.ReadFull(bufRd, hdrBuf) - if err != nil { - return &partialReadError{err} - } - - hash = restic.IDFromHash(hrd.Sum(nil)) - return nil - }) - if err != nil { - var e *partialReadError - isPartialReadError := errors.As(err, &e) - // failed to load the pack file, return as further checks cannot succeed anyways - debug.Log(" error streaming pack (partial %v): %v", isPartialReadError, err) - if isPartialReadError { - return &ErrPackData{PackID: id, errs: append(errs, 
fmt.Errorf("partial download error: %w", err))} - } - - // The check command suggests to repair files for which a `ErrPackData` is returned. However, this file - // completely failed to download such that there's no point in repairing anything. - return fmt.Errorf("download error: %w", err) - } - if !hash.Equal(id) { - debug.Log("pack ID does not match, want %v, got %v", id, hash) - return &ErrPackData{PackID: id, errs: append(errs, errors.Errorf("unexpected pack id %v", hash))} - } - - blobs, hdrSize, err := pack.List(r.Key(), bytes.NewReader(hdrBuf), int64(len(hdrBuf))) - if err != nil { - return &ErrPackData{PackID: id, errs: append(errs, err)} - } - - if uint32(idxHdrSize) != hdrSize { - debug.Log("Pack header size does not match, want %v, got %v", idxHdrSize, hdrSize) - errs = append(errs, errors.Errorf("pack header size does not match, want %v, got %v", idxHdrSize, hdrSize)) - } - - idx := r.Index() - for _, blob := range blobs { - // Check if blob is contained in index and position is correct - idxHas := false - for _, pb := range idx.Lookup(blob.BlobHandle) { - if pb.PackID == id && pb.Blob == blob { - idxHas = true - break - } - } - if !idxHas { - errs = append(errs, errors.Errorf("blob %v is not contained in index or position is incorrect", blob.ID)) - continue - } - } - - if len(errs) > 0 { - return &ErrPackData{PackID: id, errs: errs} - } - - return nil -} - -type bufReader struct { - rd *bufio.Reader - buf []byte -} - -func newBufReader(rd *bufio.Reader) *bufReader { - return &bufReader{ - rd: rd, - } -} - -func (b *bufReader) Discard(n int) (discarded int, err error) { - return b.rd.Discard(n) -} - -func (b *bufReader) ReadFull(n int) (buf []byte, err error) { - if cap(b.buf) < n { - b.buf = make([]byte, n) - } - b.buf = b.buf[:n] - - _, err = io.ReadFull(b.rd, b.buf) - if err != nil { - return nil, err - } - return b.buf, nil -} - // ReadData loads all data from the repository and checks the integrity. 
func (c *Checker) ReadData(ctx context.Context, errChan chan<- error) { c.ReadPacks(ctx, c.packs, nil, errChan) @@ -743,8 +552,7 @@ func (c *Checker) ReadPacks(ctx context.Context, packs map[restic.ID]int64, p *p } } - err := checkPack(ctx, c.repo, ps.id, ps.blobs, ps.size, bufRd, dec) - + err := repository.CheckPack(ctx, c.repo.(*repository.Repository), ps.id, ps.blobs, ps.size, bufRd, dec) p.Add(1) if err == nil { continue
diff --git a/internal/repository/check.go b/internal/repository/check.go
new file mode 100644
index 000000000..17d344451
--- /dev/null
+++ b/internal/repository/check.go
@@ -0,0 +1,205 @@
+package repository
+
+import (
+	"bufio"
+	"bytes"
+	"context"
+	"fmt"
+	"io"
+	"sort"
+
+	"github.com/klauspost/compress/zstd"
+	"github.com/minio/sha256-simd"
+	"github.com/restic/restic/internal/backend"
+	"github.com/restic/restic/internal/debug"
+	"github.com/restic/restic/internal/errors"
+	"github.com/restic/restic/internal/hashing"
+	"github.com/restic/restic/internal/pack"
+	"github.com/restic/restic/internal/restic"
+)
+
+// ErrPackData is returned if errors are discovered while verifying a packfile; PackID names the pack, errs holds the individual findings.
+type ErrPackData struct {
+	PackID restic.ID
+	errs   []error
+}
+
+func (e *ErrPackData) Error() string { // reports the pack ID, the number of collected errors and the errors themselves
+	return fmt.Sprintf("pack %v contains %v errors: %v", e.PackID, len(e.errs), e.errs)
+}
+
+type partialReadError struct { // wraps an error raised inside the Load callback of checkPackInner so it can be recognised via errors.As
+	err error
+}
+
+func (e *partialReadError) Error() string { // same message as the wrapped error
+	return e.err.Error()
+}
+
+// CheckPack reads a pack and checks the integrity of all blobs; a failed check is retried once to detect transient errors.
+func CheckPack(ctx context.Context, r *Repository, id restic.ID, blobs []restic.Blob, size int64, bufRd *bufio.Reader, dec *zstd.Decoder) error {
+	err := checkPackInner(ctx, r, id, blobs, size, bufRd, dec)
+	if err != nil {
+		// retry pack verification once to detect transient errors; if the retry fails too, its error replaces the first one
+		err2 := checkPackInner(ctx, r, id, blobs, size, bufRd, dec)
+		if err2 != nil {
+			err = err2
+		} else {
+			err = fmt.Errorf("check successful on second attempt, original error %w", err) // the first failure is still reported, wrapped
+		}
+	}
+	return err
+}
+
+func checkPackInner(ctx context.Context, r restic.Repository, id restic.ID, blobs []restic.Blob, size int64, bufRd *bufio.Reader, dec *zstd.Decoder) error { // one verification pass; CheckPack runs it again if it fails
+
+	debug.Log("checking pack %v", id.String())
+
+	if len(blobs) == 0 {
+		return &ErrPackData{PackID: id, errs: []error{errors.New("pack is empty or not indexed")}}
+	}
+
+	// sanity check blobs in index: sort them by offset (in place), then flag gaps / overlaps between consecutive blobs
+	sort.Slice(blobs, func(i, j int) bool {
+		return blobs[i].Offset < blobs[j].Offset
+	})
+	idxHdrSize := pack.CalculateHeaderSize(blobs) // header size derived from the index entries; compared with the header read from the pack below
+	lastBlobEnd := 0
+	nonContinuousPack := false
+	for _, blob := range blobs {
+		if lastBlobEnd != int(blob.Offset) { // blob does not start where the previous one ended
+			nonContinuousPack = true
+		}
+		lastBlobEnd = int(blob.Offset + blob.Length)
+	}
+	// size was calculated by masterindex.PackSize, thus there's no need to recalculate it here
+
+	var errs []error
+	if nonContinuousPack {
+		debug.Log("Index for pack contains gaps / overlaps, blobs: %v", blobs)
+		errs = append(errs, errors.New("index for pack contains gaps / overlapping blobs"))
+	}
+
+	// calculate hash on-the-fly while reading the pack and capture pack header
+	var hash restic.ID
+	var hdrBuf []byte
+	h := backend.Handle{Type: backend.PackFile, Name: id.String()}
+	err := r.Backend().Load(ctx, h, int(size), 0, func(rd io.Reader) error {
+		hrd := hashing.NewReader(rd, sha256.New())
+		bufRd.Reset(hrd) // the caller-supplied buffered reader now reads through the hashing reader
+
+		it := NewPackBlobIterator(id, newBufReader(bufRd), 0, blobs, r.Key(), dec)
+		for {
+			val, err := it.Next()
+			if err == ErrPackEOF {
+				break
+			} else if err != nil {
+				return &partialReadError{err} // typed so the code after Load can tell this apart from other Load errors
+			}
+			debug.Log(" check blob %v: %v", val.Handle.ID, val.Handle)
+			if val.Err != nil { // a failing blob is recorded in errs; the loop goes on with the next blob
+				debug.Log(" error verifying blob %v: %v", val.Handle.ID, val.Err)
+				errs = append(errs, errors.Errorf("blob %v: %v", val.Handle.ID, val.Err))
+			}
+		}
+
+		// skip enough bytes until we reach the possible header start
+		curPos := lastBlobEnd
+		minHdrStart := int(size) - pack.MaxHeaderSize
+		if minHdrStart > curPos {
+			_, err := bufRd.Discard(minHdrStart - curPos)
+			if err != nil {
+				return &partialReadError{err}
+			}
+			curPos += minHdrStart - curPos // i.e. curPos is now minHdrStart
+		}
+
+		// read remainder, which should be the pack header
+		var err error
+		hdrBuf = make([]byte, int(size-int64(curPos))) // sized for everything between curPos and size
+		_, err = io.ReadFull(bufRd, hdrBuf)
+		if err != nil {
+			return &partialReadError{err}
+		}
+
+		hash = restic.IDFromHash(hrd.Sum(nil)) // taken last, after the remainder has been read
+		return nil
+	})
+	if err != nil {
+		var e *partialReadError
+		isPartialReadError := errors.As(err, &e)
+		// failed to load the pack file, return as further checks cannot succeed anyways
+		debug.Log(" error streaming pack (partial %v): %v", isPartialReadError, err)
+		if isPartialReadError {
+			return &ErrPackData{PackID: id, errs: append(errs, fmt.Errorf("partial download error: %w", err))}
+		}
+
+		// The check command suggests to repair files for which a `ErrPackData` is returned. However, this file
+		// completely failed to download such that there's no point in repairing anything.
+		return fmt.Errorf("download error: %w", err)
+	}
+	if !hash.Equal(id) { // the hash computed while reading differs from the pack ID
+		debug.Log("pack ID does not match, want %v, got %v", id, hash)
+		return &ErrPackData{PackID: id, errs: append(errs, errors.Errorf("unexpected pack id %v", hash))}
+	}
+
+	blobs, hdrSize, err := pack.List(r.Key(), bytes.NewReader(hdrBuf), int64(len(hdrBuf))) // NOTE: blobs is reassigned to the entries listed from hdrBuf
+	if err != nil {
+		return &ErrPackData{PackID: id, errs: append(errs, err)}
+	}
+
+	if uint32(idxHdrSize) != hdrSize {
+		debug.Log("Pack header size does not match, want %v, got %v", idxHdrSize, hdrSize)
+		errs = append(errs, errors.Errorf("pack header size does not match, want %v, got %v", idxHdrSize, hdrSize))
+	}
+
+	idx := r.Index()
+	for _, blob := range blobs {
+		// Check if blob is contained in index and position is correct
+		idxHas := false
+		for _, pb := range idx.Lookup(blob.BlobHandle) {
+			if pb.PackID == id && pb.Blob == blob { // an index entry must name this pack and equal the listed blob
+				idxHas = true
+				break
+			}
+		}
+		if !idxHas {
+			errs = append(errs, errors.Errorf("blob %v is not contained in index or position is incorrect", blob.ID))
+			continue // no effect: already the last statement of the loop body
+		}
+	}
+
+	if len(errs) > 0 {
+		return &ErrPackData{PackID: id, errs: errs}
+	}
+
+	return nil
+}
+
+type bufReader struct { // wraps a *bufio.Reader; passed to NewPackBlobIterator by checkPackInner
+	rd  *bufio.Reader
+	buf []byte // reused by ReadFull, reallocated only when too small
+}
+
+func newBufReader(rd *bufio.Reader) *bufReader { // buf stays nil until the first ReadFull
+	return &bufReader{
+		rd: rd,
+	}
+}
+
+func (b *bufReader) Discard(n int) (discarded int, err error) { // delegates to the wrapped reader's Discard
+	return b.rd.Discard(n)
+}
+
+func (b *bufReader) ReadFull(n int) (buf []byte, err error) { // reads exactly n bytes; the returned slice is b.buf, which the next call overwrites
+	if cap(b.buf) < n {
+		b.buf = make([]byte, n)
+	}
+	b.buf = b.buf[:n]
+
+	_, err = io.ReadFull(b.rd, b.buf)
+	if err != nil {
+		return nil, err
+	}
+	return b.buf, nil
+}