checker: rewrite ReadData to stream packs

Michael Eischer 2021-08-20 16:15:40 +02:00
parent f40abd92fa
commit f1e58e7c7f
4 changed files with 119 additions and 79 deletions
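
Instead of downloading each pack file to a temporary file via repository.DownloadAndHash and then re-reading it blob by blob, the checker now streams every pack exactly once through repository.StreamPack: blobs are verified as they arrive, the pack hash is computed on-the-fly by a hashing reader, and the trailing pack header is captured from the remainder of the stream and checked against the size predicted by the index.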

cmd/restic/integration_test.go

@@ -2135,7 +2135,4 @@ func TestBackendLoadWriteTo(t *testing.T) {
 	firstSnapshot := testRunList(t, "snapshots", env.gopts)
 	rtest.Assert(t, len(firstSnapshot) == 1,
 		"expected one snapshot, got %v", firstSnapshot)
-
-	// test readData using the hashing.Reader
-	testRunCheck(t, env.gopts)
 }

internal/checker/checker.go

@@ -1,14 +1,18 @@
 package checker
 
 import (
+	"bufio"
+	"bytes"
 	"context"
 	"fmt"
 	"io"
-	"os"
+	"sort"
 	"sync"
 
+	"github.com/minio/sha256-simd"
 	"github.com/restic/restic/internal/debug"
 	"github.com/restic/restic/internal/errors"
+	"github.com/restic/restic/internal/hashing"
 	"github.com/restic/restic/internal/pack"
 	"github.com/restic/restic/internal/repository"
 	"github.com/restic/restic/internal/restic"
@@ -436,78 +440,112 @@ func (c *Checker) GetPacks() map[restic.ID]int64 {
 }
 
 // checkPack reads a pack and checks the integrity of all blobs.
-func checkPack(ctx context.Context, r restic.Repository, id restic.ID, size int64) error {
-	debug.Log("checking pack %v", id)
-	h := restic.Handle{Type: restic.PackFile, Name: id.String()}
+func checkPack(ctx context.Context, r restic.Repository, id restic.ID, blobs []restic.Blob, size int64) error {
+	debug.Log("checking pack %v", id.String())
 
-	packfile, hash, realSize, err := repository.DownloadAndHash(ctx, r.Backend(), h)
-	if err != nil {
-		return errors.Wrap(err, "checkPack")
+	if len(blobs) == 0 {
+		return errors.Errorf("pack %v is empty or not indexed", id)
 	}
 
-	defer func() {
-		_ = packfile.Close()
-		_ = os.Remove(packfile.Name())
-	}()
+	// sanity check blobs in index
+	sort.Slice(blobs, func(i, j int) bool {
+		return blobs[i].Offset < blobs[j].Offset
+	})
+	idxHdrSize := pack.HeaderSize + len(blobs)*int(pack.EntrySize)
+	lastBlobEnd := 0
+	nonContinuousPack := false
+	for _, blob := range blobs {
+		if lastBlobEnd != int(blob.Offset) {
+			nonContinuousPack = true
+		}
+		lastBlobEnd = int(blob.Offset + blob.Length)
+	}
+	// size was calculated by masterindex.PackSize, thus there's no need to recalculate it here
 
-	debug.Log("hash for pack %v is %v", id, hash)
+	var errs []error
+	if nonContinuousPack {
+		debug.Log("Index for pack contains gaps / overlaps, blobs: %v", blobs)
+		errs = append(errs, errors.New("Index for pack contains gaps / overlapping blobs"))
+	}
+
+	// calculate hash on-the-fly while reading the pack and capture pack header
+	var hash restic.ID
+	var hdrBuf *bytes.Buffer
+	hashingLoader := func(ctx context.Context, h restic.Handle, length int, offset int64, fn func(rd io.Reader) error) error {
+		return r.Backend().Load(ctx, h, int(size), 0, func(rd io.Reader) error {
+			hrd := hashing.NewReader(rd, sha256.New())
+
+			// create a buffer that is large enough to be reused by repository.StreamPack
+			// this ensures that we can read the pack header later on
+			bufferSize := int(size)
+			if bufferSize > repository.MaxStreamBufferSize {
+				bufferSize = repository.MaxStreamBufferSize
+			}
+			bufRd := bufio.NewReaderSize(hrd, bufferSize)
+
+			// skip to start of first blob, offset == 0 for correct pack files
+			_, err := bufRd.Discard(int(offset))
+			if err != nil {
+				return err
+			}
+
+			err = fn(bufRd)
+			if err != nil {
+				return err
+			}
+
+			// skip enough bytes until we reach the possible header start
+			curPos := length + int(offset)
+			minHdrStart := int(size) - pack.MaxHeaderSize
+			if minHdrStart > curPos {
+				_, err := bufRd.Discard(minHdrStart - curPos)
+				if err != nil {
+					return err
+				}
+			}
+
+			// read remainder, which should be the pack header
+			hdrBuf = new(bytes.Buffer)
+			_, err = io.Copy(hdrBuf, bufRd)
+			if err != nil {
+				return err
+			}
+
+			hash = restic.IDFromHash(hrd.Sum(nil))
+			return nil
+		})
+	}
+
+	err := repository.StreamPack(ctx, hashingLoader, r.Key(), id, blobs, func(blob restic.BlobHandle, buf []byte, err error) error {
+		debug.Log("  check blob %v: %v", blob.ID, blob)
+		if err != nil {
+			debug.Log("  error verifying blob %v: %v", blob.ID, err)
+			errs = append(errs, errors.Errorf("blob %v: %v", blob.ID, err))
+		}
+		return nil
+	})
+	if err != nil {
+		// failed to load the pack file, return as further checks cannot succeed anyways
+		debug.Log("  error streaming pack: %v", err)
+		return errors.Errorf("pack %v failed to download: %v", id, err)
+	}
 
 	if !hash.Equal(id) {
 		debug.Log("Pack ID does not match, want %v, got %v", id, hash)
 		return errors.Errorf("Pack ID does not match, want %v, got %v", id.Str(), hash.Str())
 	}
 
-	if realSize != size {
-		debug.Log("Pack size does not match, want %v, got %v", size, realSize)
-		return errors.Errorf("Pack size does not match, want %v, got %v", size, realSize)
-	}
-
-	blobs, hdrSize, err := pack.List(r.Key(), packfile, size)
+	blobs, hdrSize, err := pack.List(r.Key(), bytes.NewReader(hdrBuf.Bytes()), int64(hdrBuf.Len()))
 	if err != nil {
 		return err
 	}
 
-	var errs []error
-	var buf []byte
-	sizeFromBlobs := uint(hdrSize)
+	if uint32(idxHdrSize) != hdrSize {
+		debug.Log("Pack header size does not match, want %v, got %v", idxHdrSize, hdrSize)
+		errs = append(errs, errors.Errorf("Pack header size does not match, want %v, got %v", idxHdrSize, hdrSize))
+	}
+
 	idx := r.Index()
-	for i, blob := range blobs {
-		sizeFromBlobs += blob.Length
-		debug.Log("  check blob %d: %v", i, blob)
-
-		buf = buf[:cap(buf)]
-		if uint(len(buf)) < blob.Length {
-			buf = make([]byte, blob.Length)
-		}
-		buf = buf[:blob.Length]
-
-		_, err := packfile.Seek(int64(blob.Offset), 0)
-		if err != nil {
-			return errors.Errorf("Seek(%v): %v", blob.Offset, err)
-		}
-
-		_, err = io.ReadFull(packfile, buf)
-		if err != nil {
-			debug.Log("  error loading blob %v: %v", blob.ID, err)
-			errs = append(errs, errors.Errorf("blob %v: %v", i, err))
-			continue
-		}
-
-		nonce, ciphertext := buf[:r.Key().NonceSize()], buf[r.Key().NonceSize():]
-		plaintext, err := r.Key().Open(ciphertext[:0], nonce, ciphertext, nil)
-		if err != nil {
-			debug.Log("  error decrypting blob %v: %v", blob.ID, err)
-			errs = append(errs, errors.Errorf("blob %v: %v", i, err))
-			continue
-		}
-
-		hash := restic.Hash(plaintext)
-		if !hash.Equal(blob.ID) {
-			debug.Log("  Blob ID does not match, want %v, got %v", blob.ID, hash)
-			errs = append(errs, errors.Errorf("Blob ID does not match, want %v, got %v", blob.ID.Str(), hash.Str()))
-			continue
-		}
-
+	for _, blob := range blobs {
 		// Check if blob is contained in index and position is correct
 		idxHas := false
 		for _, pb := range idx.Lookup(blob.BlobHandle) {
@@ -522,11 +560,6 @@ func checkPack(ctx context.Context, r restic.Repository, id restic.ID, blobs []restic.Blob, size int64) error {
 		}
 	}
 
-	if int64(sizeFromBlobs) != size {
-		debug.Log("Pack size does not match, want %v, got %v", size, sizeFromBlobs)
-		errs = append(errs, errors.Errorf("Pack size does not match, want %v, got %v", size, sizeFromBlobs))
-	}
-
 	if len(errs) > 0 {
 		return errors.Errorf("pack %v contains %v errors: %v", id.Str(), len(errs), errs)
 	}
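
The heart of the new checkPack is hashing the pack while it streams past, so the pack ID can be verified without a second read of the file. Below is the same pattern reduced to a self-contained sketch using only the standard library: io.TeeReader stands in for restic's hashing.NewReader, plain crypto/sha256 replaces minio's sha256-simd, and the sizes and the blob/header split are illustrative only.

    package main

    import (
    	"bufio"
    	"bytes"
    	"crypto/sha256"
    	"fmt"
    	"io"
    )

    func main() {
    	pack := bytes.Repeat([]byte("blob data "), 100) // stand-in for a pack file

    	// every byte read from rd is also fed into the hash state,
    	// playing the role of hashing.NewReader in checkPack
    	h := sha256.New()
    	rd := bufio.NewReaderSize(io.TeeReader(bytes.NewReader(pack), h), 64)

    	// consume the blob section of the stream (StreamPack's job)
    	if _, err := io.CopyN(io.Discard, rd, 900); err != nil {
    		panic(err)
    	}

    	// read the remainder, which in a real pack is the header
    	var hdr bytes.Buffer
    	if _, err := io.Copy(&hdr, rd); err != nil {
    		panic(err)
    	}

    	// the stream is fully drained, so the digest now covers the
    	// whole file and can be compared against the expected pack ID
    	fmt.Printf("header: %d bytes, hash: %x\n", hdr.Len(), h.Sum(nil))
    }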
@@ -544,17 +577,18 @@ func (c *Checker) ReadPacks(ctx context.Context, packs map[restic.ID]int64, p *progress.Counter, errChan chan<- error) {
 	defer close(errChan)
 
 	g, ctx := errgroup.WithContext(ctx)
-	type packsize struct {
+	type checkTask struct {
 		id    restic.ID
 		size  int64
+		blobs []restic.Blob
 	}
-	ch := make(chan packsize)
+	ch := make(chan checkTask)
 
 	// run workers
 	for i := 0; i < defaultParallelism; i++ {
 		g.Go(func() error {
 			for {
-				var ps packsize
+				var ps checkTask
 				var ok bool
 
 				select {
@@ -565,7 +599,8 @@ func (c *Checker) ReadPacks(ctx context.Context, packs map[restic.ID]int64, p *progress.Counter, errChan chan<- error) {
 						return nil
 					}
 				}
 
-				err := checkPack(ctx, c.repo, ps.id, ps.size)
+				err := checkPack(ctx, c.repo, ps.id, ps.blobs, ps.size)
 				p.Add(1)
 				if err == nil {
 					continue
@@ -580,10 +615,17 @@ func (c *Checker) ReadPacks(ctx context.Context, packs map[restic.ID]int64, p *progress.Counter, errChan chan<- error) {
 		})
 	}
 
+	packSet := restic.NewIDSet()
+	for pack := range packs {
+		packSet.Insert(pack)
+	}
+
 	// push packs to ch
-	for pack, size := range packs {
+	for pbs := range c.repo.Index().ListPacks(ctx, packSet) {
+		size := packs[pbs.PackID]
+		debug.Log("listed %v", pbs.PackID)
 		select {
-		case ch <- packsize{id: pack, size: size}:
+		case ch <- checkTask{id: pbs.PackID, size: size, blobs: pbs.Blobs}:
 		case <-ctx.Done():
 		}
 	}
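
ReadPacks keeps its overall shape: a fixed pool of goroutines under an errgroup drains a task channel that the producer fills and then closes. A stripped-down sketch of that pattern, where the task fields, worker count, and print statement are placeholders for checkTask, defaultParallelism, and checkPack:

    package main

    import (
    	"context"
    	"fmt"

    	"golang.org/x/sync/errgroup"
    )

    type task struct{ id string }

    func main() {
    	g, ctx := errgroup.WithContext(context.Background())
    	ch := make(chan task)

    	// fixed number of workers, mirroring defaultParallelism
    	for i := 0; i < 4; i++ {
    		g.Go(func() error {
    			for {
    				select {
    				case t, ok := <-ch:
    					if !ok {
    						return nil // channel closed, worker is done
    					}
    					fmt.Println("checking", t.id)
    				case <-ctx.Done():
    					return ctx.Err()
    				}
    			}
    		})
    	}

    	// producer: push tasks, then close the channel
    	for _, id := range []string{"pack1", "pack2", "pack3"} {
    		select {
    		case ch <- task{id: id}:
    		case <-ctx.Done():
    		}
    	}
    	close(ch)

    	if err := g.Wait(); err != nil {
    		fmt.Println("error:", err)
    	}
    }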

internal/pack/pack.go

@@ -160,7 +160,8 @@ const (
 	// HeaderSize is the header's constant overhead (independent of #entries)
 	HeaderSize = headerLengthSize + crypto.Extension
 
-	maxHeaderSize = 16 * 1024 * 1024
+	// MaxHeaderSize is the max size of header including header-length field
+	MaxHeaderSize = 16*1024*1024 + headerLengthSize
 
 	// number of header entries to download as part of header-length request
 	eagerEntries = 15
 )
@@ -199,7 +200,7 @@ func readRecords(rd io.ReaderAt, size int64, max int) ([]byte, int, error) {
 		err = InvalidFileError{Message: "header length is invalid"}
 	case int64(hlen) > size-int64(headerLengthSize):
 		err = InvalidFileError{Message: "header is larger than file"}
-	case int64(hlen) > maxHeaderSize:
+	case int64(hlen) > MaxHeaderSize-int64(headerLengthSize):
 		err = InvalidFileError{Message: "header is larger than maxHeaderSize"}
 	}
 
 	if err != nil {
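
With the constant exported, a pack header is now bounded from both sides: the index predicts HeaderSize plus one fixed-size entry per blob (the idxHdrSize computed in checkPack above), and readRecords rejects anything above MaxHeaderSize. A quick numeric sanity check of that formula, assuming the values restic used at the time (headerLengthSize = 4, crypto.Extension = 32, and 37 bytes per header entry):

    package main

    import "fmt"

    // assumed values from restic's pack format at this commit
    const (
    	headerLengthSize = 4
    	cryptoExtension  = 32 // crypto.Extension: IV + MAC overhead
    	entrySize        = 37 // 1 type byte + 4 length bytes + 32 ID bytes

    	headerSize    = headerLengthSize + cryptoExtension
    	maxHeaderSize = 16*1024*1024 + headerLengthSize
    )

    func main() {
    	for _, blobs := range []int{1, 100, 10000} {
    		// header size predicted from the index, as in checkPack
    		idxHdrSize := headerSize + blobs*entrySize
    		fmt.Printf("%5d blobs -> %7d byte header (max %d)\n",
    			blobs, idxHdrSize, maxHeaderSize)
    	}
    }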

internal/repository/repository.go

@@ -27,7 +27,7 @@ import (
 	"golang.org/x/sync/errgroup"
 )
 
-const maxStreamBufferSize = 4 * 1024 * 1024
+const MaxStreamBufferSize = 4 * 1024 * 1024
 
 // Repository is used to access a repository in a backend.
 type Repository struct {
@@ -808,8 +808,8 @@ func StreamPack(ctx context.Context, beLoad BackendLoadFn, key *crypto.Key, packID restic.ID, blobs []restic.Blob, handleBlobFn func(blob restic.BlobHandle, buf []byte, err error) error) error {
 	// stream blobs in pack
 	err := beLoad(ctx, h, int(dataEnd-dataStart), int64(dataStart), func(rd io.Reader) error {
 		bufferSize := int(dataEnd - dataStart)
-		if bufferSize > maxStreamBufferSize {
-			bufferSize = maxStreamBufferSize
+		if bufferSize > MaxStreamBufferSize {
+			bufferSize = MaxStreamBufferSize
 		}
 		bufRd := bufio.NewReaderSize(rd, bufferSize)
 		currentBlobEnd := dataStart
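
Exporting MaxStreamBufferSize is what makes the checker's reuse trick work: bufio.NewReaderSize returns an existing *bufio.Reader unchanged when it is already at least the requested size, so the reader that checkPack creates with the same cap is picked up here rather than re-wrapped, and it keeps its position for the header read that follows. That appears to be the point of the "large enough to be reused by repository.StreamPack" comment in checkPack.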