forked from TrueCloudLab/restic
commit
55d9c5f80c
4 changed files with 277 additions and 14 deletions
|
@ -125,6 +125,11 @@ func memCreate(be *MemoryBackend) (Blob, error) {
|
|||
return blob, nil
|
||||
}
|
||||
|
||||
// ReadCloser wraps a reader and adds a noop Close method.
|
||||
func ReadCloser(rd io.Reader) io.ReadCloser {
|
||||
return readCloser{rd}
|
||||
}
|
||||
|
||||
// readCloser wraps a reader and adds a noop Close method.
|
||||
type readCloser struct {
|
||||
io.Reader
|
||||
|
|
|
@ -1,13 +1,17 @@
|
|||
package checker
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"errors"
|
||||
"fmt"
|
||||
"io/ioutil"
|
||||
"sync"
|
||||
|
||||
"github.com/restic/restic"
|
||||
"github.com/restic/restic/backend"
|
||||
"github.com/restic/restic/crypto"
|
||||
"github.com/restic/restic/debug"
|
||||
"github.com/restic/restic/pack"
|
||||
"github.com/restic/restic/repository"
|
||||
)
|
||||
|
||||
|
@ -335,7 +339,7 @@ type TreeError struct {
|
|||
}
|
||||
|
||||
func (e TreeError) Error() string {
|
||||
return fmt.Sprintf("%v: %d errors", e.ID.String(), len(e.Errors))
|
||||
return fmt.Sprintf("tree %v: %v", e.ID.Str(), e.Errors)
|
||||
}
|
||||
|
||||
type treeJob struct {
|
||||
|
@ -634,3 +638,105 @@ func (c *Checker) UnusedBlobs() (blobs backend.IDs) {
|
|||
func (c *Checker) OrphanedPacks() backend.IDs {
|
||||
return c.orphanedPacks
|
||||
}
|
||||
|
||||
// CountPacks returns the number of packs in the repository.
|
||||
func (c *Checker) CountPacks() uint64 {
|
||||
return uint64(len(c.packs))
|
||||
}
|
||||
|
||||
// checkPack reads a pack and checks the integrity of all blobs.
|
||||
func checkPack(r *repository.Repository, id backend.ID) error {
|
||||
debug.Log("Checker.checkPack", "checking pack %v", id.Str())
|
||||
rd, err := r.Backend().Get(backend.Data, id.String())
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
buf, err := ioutil.ReadAll(rd)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
err = rd.Close()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
unpacker, err := pack.NewUnpacker(r.Key(), bytes.NewReader(buf))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
var errs []error
|
||||
for i, blob := range unpacker.Entries {
|
||||
debug.Log("Checker.checkPack", " check blob %d: %v", i, blob.ID.Str())
|
||||
|
||||
plainBuf := make([]byte, blob.Length)
|
||||
plainBuf, err = crypto.Decrypt(r.Key(), plainBuf, buf[blob.Offset:blob.Offset+blob.Length])
|
||||
if err != nil {
|
||||
debug.Log("Checker.checkPack", " error decrypting blob %v: %v", blob.ID.Str(), err)
|
||||
errs = append(errs, fmt.Errorf("blob %v: %v", i, err))
|
||||
continue
|
||||
}
|
||||
|
||||
hash := backend.Hash(plainBuf)
|
||||
if !hash.Equal(blob.ID) {
|
||||
debug.Log("Checker.checkPack", " ID does not match, want %v, got %v", blob.ID.Str(), hash.Str())
|
||||
errs = append(errs, fmt.Errorf("ID does not match, want %v, got %v", blob.ID.Str(), hash.Str()))
|
||||
continue
|
||||
}
|
||||
}
|
||||
|
||||
if len(errs) > 0 {
|
||||
return fmt.Errorf("pack %v contains %v errors: %v", id.Str(), len(errs), errs)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// ReadData loads all data from the repository and checks the integrity.
|
||||
func (c *Checker) ReadData(p *restic.Progress, errChan chan<- error, done <-chan struct{}) {
|
||||
defer close(errChan)
|
||||
|
||||
p.Start()
|
||||
defer p.Done()
|
||||
|
||||
worker := func(wg *sync.WaitGroup, in <-chan backend.ID) {
|
||||
defer wg.Done()
|
||||
for {
|
||||
var id backend.ID
|
||||
var ok bool
|
||||
|
||||
select {
|
||||
case <-done:
|
||||
return
|
||||
case id, ok = <-in:
|
||||
if !ok {
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
err := checkPack(c.repo, id)
|
||||
p.Report(restic.Stat{Blobs: 1})
|
||||
if err == nil {
|
||||
continue
|
||||
}
|
||||
|
||||
select {
|
||||
case <-done:
|
||||
return
|
||||
case errChan <- err:
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
ch := c.repo.List(backend.Data, done)
|
||||
|
||||
var wg sync.WaitGroup
|
||||
for i := 0; i < defaultParallelism; i++ {
|
||||
wg.Add(1)
|
||||
go worker(&wg, ch)
|
||||
}
|
||||
|
||||
wg.Wait()
|
||||
}
|
||||
|
|
|
@ -1,10 +1,13 @@
|
|||
package checker_test
|
||||
|
||||
import (
|
||||
"io"
|
||||
"math/rand"
|
||||
"path/filepath"
|
||||
"sort"
|
||||
"testing"
|
||||
|
||||
"github.com/restic/restic"
|
||||
"github.com/restic/restic/backend"
|
||||
"github.com/restic/restic/checker"
|
||||
"github.com/restic/restic/repository"
|
||||
|
@ -24,13 +27,13 @@ func list(repo *repository.Repository, t backend.Type) (IDs []string) {
|
|||
return IDs
|
||||
}
|
||||
|
||||
func checkPacks(chkr *checker.Checker) (errs []error) {
|
||||
func collectErrors(f func(chan<- error, <-chan struct{})) (errs []error) {
|
||||
done := make(chan struct{})
|
||||
defer close(done)
|
||||
|
||||
errChan := make(chan error)
|
||||
|
||||
go chkr.Packs(errChan, done)
|
||||
go f(errChan, done)
|
||||
|
||||
for err := range errChan {
|
||||
errs = append(errs, err)
|
||||
|
@ -39,19 +42,20 @@ func checkPacks(chkr *checker.Checker) (errs []error) {
|
|||
return errs
|
||||
}
|
||||
|
||||
func checkStruct(chkr *checker.Checker) (errs []error) {
|
||||
done := make(chan struct{})
|
||||
defer close(done)
|
||||
|
||||
errChan := make(chan error)
|
||||
|
||||
go chkr.Structure(errChan, done)
|
||||
|
||||
for err := range errChan {
|
||||
errs = append(errs, err)
|
||||
func checkPacks(chkr *checker.Checker) []error {
|
||||
return collectErrors(chkr.Packs)
|
||||
}
|
||||
|
||||
return errs
|
||||
func checkStruct(chkr *checker.Checker) []error {
|
||||
return collectErrors(chkr.Structure)
|
||||
}
|
||||
|
||||
func checkData(chkr *checker.Checker) []error {
|
||||
return collectErrors(
|
||||
func(errCh chan<- error, done <-chan struct{}) {
|
||||
chkr.ReadData(nil, errCh, done)
|
||||
},
|
||||
)
|
||||
}
|
||||
|
||||
func TestCheckRepo(t *testing.T) {
|
||||
|
@ -204,3 +208,102 @@ func TestDuplicatePacksInIndex(t *testing.T) {
|
|||
|
||||
})
|
||||
}
|
||||
|
||||
// errorBackend randomly modifies data after reading.
|
||||
type errorBackend struct {
|
||||
backend.Backend
|
||||
}
|
||||
|
||||
func (b errorBackend) Get(t backend.Type, name string) (io.ReadCloser, error) {
|
||||
rd, err := b.Backend.Get(t, name)
|
||||
if err != nil {
|
||||
return rd, err
|
||||
}
|
||||
|
||||
if t != backend.Data {
|
||||
return rd, err
|
||||
}
|
||||
|
||||
return backend.ReadCloser(faultReader{rd}), nil
|
||||
}
|
||||
|
||||
func (b errorBackend) GetReader(t backend.Type, name string, offset, length uint) (io.ReadCloser, error) {
|
||||
rd, err := b.Backend.GetReader(t, name, offset, length)
|
||||
if err != nil {
|
||||
return rd, err
|
||||
}
|
||||
|
||||
if t != backend.Data {
|
||||
return rd, err
|
||||
}
|
||||
|
||||
return backend.ReadCloser(faultReader{rd}), nil
|
||||
}
|
||||
|
||||
// induceError flips a bit in the slice.
|
||||
func induceError(data []byte) {
|
||||
if rand.Float32() < 0.8 {
|
||||
return
|
||||
}
|
||||
|
||||
pos := rand.Intn(len(data))
|
||||
data[pos] ^= 1
|
||||
}
|
||||
|
||||
// faultReader wraps a reader and randomly modifies data on read.
|
||||
type faultReader struct {
|
||||
rd io.Reader
|
||||
}
|
||||
|
||||
func (f faultReader) Read(p []byte) (int, error) {
|
||||
n, err := f.rd.Read(p)
|
||||
if n > 0 {
|
||||
induceError(p)
|
||||
}
|
||||
|
||||
return n, err
|
||||
}
|
||||
|
||||
func TestCheckerModifiedData(t *testing.T) {
|
||||
be := backend.NewMemoryBackend()
|
||||
|
||||
repo := repository.New(be)
|
||||
OK(t, repo.Init(TestPassword))
|
||||
|
||||
arch := restic.NewArchiver(repo)
|
||||
_, id, err := arch.Snapshot(nil, []string{"."}, nil)
|
||||
OK(t, err)
|
||||
t.Logf("archived as %v", id.Str())
|
||||
|
||||
checkRepo := repository.New(errorBackend{be})
|
||||
OK(t, checkRepo.SearchKey(TestPassword))
|
||||
|
||||
chkr := checker.New(checkRepo)
|
||||
|
||||
hints, errs := chkr.LoadIndex()
|
||||
if len(errs) > 0 {
|
||||
t.Fatalf("expected no errors, got %v: %v", len(errs), errs)
|
||||
}
|
||||
|
||||
if len(hints) > 0 {
|
||||
t.Errorf("expected no hints, got %v: %v", len(hints), hints)
|
||||
}
|
||||
|
||||
errFound := false
|
||||
for _, err := range checkPacks(chkr) {
|
||||
t.Logf("pack error: %v", err)
|
||||
}
|
||||
|
||||
for _, err := range checkStruct(chkr) {
|
||||
t.Logf("struct error: %v", err)
|
||||
}
|
||||
|
||||
for _, err := range checkData(chkr) {
|
||||
t.Logf("struct error: %v", err)
|
||||
errFound = true
|
||||
}
|
||||
|
||||
if !errFound {
|
||||
t.Fatal("no error found, checker is broken")
|
||||
}
|
||||
}
|
||||
|
|
|
@ -4,7 +4,11 @@ import (
|
|||
"errors"
|
||||
"fmt"
|
||||
"os"
|
||||
"time"
|
||||
|
||||
"golang.org/x/crypto/ssh/terminal"
|
||||
|
||||
"github.com/restic/restic"
|
||||
"github.com/restic/restic/checker"
|
||||
)
|
||||
|
||||
|
@ -29,6 +33,37 @@ func (cmd CmdCheck) Usage() string {
|
|||
return "[check-options]"
|
||||
}
|
||||
|
||||
func (cmd CmdCheck) newReadProgress(todo restic.Stat) *restic.Progress {
|
||||
if !cmd.global.ShowProgress() {
|
||||
return nil
|
||||
}
|
||||
|
||||
readProgress := restic.NewProgress(time.Second)
|
||||
|
||||
readProgress.OnUpdate = func(s restic.Stat, d time.Duration, ticker bool) {
|
||||
status := fmt.Sprintf("[%s] %s %d / %d items",
|
||||
formatDuration(d),
|
||||
formatPercent(s.Blobs, todo.Blobs),
|
||||
s.Blobs, todo.Blobs)
|
||||
|
||||
w, _, err := terminal.GetSize(int(os.Stdout.Fd()))
|
||||
if err == nil {
|
||||
if len(status) > w {
|
||||
max := w - len(status) - 4
|
||||
status = status[:max] + "... "
|
||||
}
|
||||
}
|
||||
|
||||
fmt.Printf("\x1b[2K%s\r", status)
|
||||
}
|
||||
|
||||
readProgress.OnDone = func(s restic.Stat, d time.Duration, ticker bool) {
|
||||
fmt.Printf("\nduration: %s\n", formatDuration(d))
|
||||
}
|
||||
|
||||
return readProgress
|
||||
}
|
||||
|
||||
func (cmd CmdCheck) Execute(args []string) error {
|
||||
if len(args) != 0 {
|
||||
return errors.New("check has no arguments")
|
||||
|
@ -109,6 +144,20 @@ func (cmd CmdCheck) Execute(args []string) error {
|
|||
}
|
||||
}
|
||||
|
||||
if cmd.ReadData {
|
||||
cmd.global.Verbosef("Read all data\n")
|
||||
|
||||
p := cmd.newReadProgress(restic.Stat{Blobs: chkr.CountPacks()})
|
||||
errChan := make(chan error)
|
||||
|
||||
go chkr.ReadData(p, errChan, done)
|
||||
|
||||
for err := range errChan {
|
||||
errorsFound = true
|
||||
fmt.Fprintf(os.Stderr, "%v\n", err)
|
||||
}
|
||||
}
|
||||
|
||||
if errorsFound {
|
||||
return errors.New("repository contains errors")
|
||||
}
|
||||
|
|
Loading…
Reference in a new issue