Add checker and command 'check' to replace 'fsck'

This commit is contained in:
Alexander Neumann 2015-06-29 00:22:25 +02:00
parent cbcf58f1c0
commit 54c4c29a89
7 changed files with 256 additions and 311 deletions

136
checker/checker.go Normal file
View file

@ -0,0 +1,136 @@
package checker
import (
"encoding/hex"
"github.com/restic/restic/backend"
"github.com/restic/restic/debug"
"github.com/restic/restic/repository"
)
// Error is an error in the repository detected by the checker.
type Error struct {
Message string
Err error
}
func (e Error) Error() string {
if e.Err != nil {
return e.Message + ": " + e.Err.Error()
}
return e.Message
}
type mapID [backend.IDSize]byte
func id2map(id backend.ID) (mid mapID) {
copy(mid[:], id)
return
}
func str2map(s string) (mid mapID, err error) {
data, err := hex.DecodeString(s)
if err != nil {
return mid, err
}
return id2map(data), nil
}
// Checker runs various checks on a repository. It is advisable to create an
// exclusive Lock in the repository before running any checks.
//
// A Checker only tests for internal errors within the data structures of the
// repository (e.g. missing blobs), and needs a valid Repository to work on.
type Checker struct {
packs map[mapID]struct{}
blobs map[mapID]struct{}
blobRefs map[mapID]uint
indexes map[mapID]*repository.Index
masterIndex *repository.Index
repo *repository.Repository
}
// New returns a new checker which runs on repo.
func New(repo *repository.Repository) *Checker {
return &Checker{
blobRefs: make(map[mapID]uint),
packs: make(map[mapID]struct{}),
blobs: make(map[mapID]struct{}),
masterIndex: repository.NewIndex(),
indexes: make(map[mapID]*repository.Index),
repo: repo,
}
}
const loadIndexParallelism = 20
// LoadIndex loads all index files.
func (c *Checker) LoadIndex() error {
debug.Log("LoadIndex", "Start")
type indexRes struct {
Index *repository.Index
ID string
}
indexCh := make(chan indexRes)
worker := func(id string, done <-chan struct{}) error {
debug.Log("LoadIndex", "worker got index %v", id)
idx, err := repository.LoadIndex(c.repo, id)
if err != nil {
return err
}
select {
case indexCh <- indexRes{Index: idx, ID: id}:
case <-done:
}
return nil
}
var perr error
go func() {
defer close(indexCh)
debug.Log("LoadIndex", "start loading indexes in parallel")
perr = repository.FilesInParallel(c.repo.Backend(), backend.Index, loadIndexParallelism, worker)
debug.Log("LoadIndex", "loading indexes finished, error: %v", perr)
}()
done := make(chan struct{})
defer close(done)
for res := range indexCh {
debug.Log("LoadIndex", "process index %v", res.ID)
id, err := str2map(res.ID)
if err != nil {
return err
}
c.indexes[id] = res.Index
c.masterIndex.Merge(res.Index)
debug.Log("LoadIndex", "process blobs")
cnt := 0
for blob := range res.Index.Each(done) {
c.packs[id2map(blob.PackID)] = struct{}{}
c.blobs[id2map(blob.ID)] = struct{}{}
c.blobRefs[id2map(blob.ID)] = 0
cnt++
}
debug.Log("LoadIndex", "%d blobs processed", cnt)
}
debug.Log("LoadIndex", "done, error %v", perr)
return perr
}
// Packs checks that all packs referenced in the index are still available.
func (c *Checker) Packs() error {
return nil
}

33
checker/checker_test.go Normal file
View file

@ -0,0 +1,33 @@
package checker_test
import (
"path/filepath"
"testing"
"github.com/restic/restic/backend"
"github.com/restic/restic/checker"
"github.com/restic/restic/repository"
. "github.com/restic/restic/test"
)
var checkerTestData = filepath.Join("testdata", "checker-test-repo.tar.gz")
func list(repo *repository.Repository, t backend.Type) (IDs []string) {
done := make(chan struct{})
defer close(done)
for id := range repo.List(t, done) {
IDs = append(IDs, id.String())
}
return IDs
}
func TestCheckRepo(t *testing.T) {
WithTestEnvironment(t, checkerTestData, func(repodir string) {
repo := OpenLocalRepo(t, repodir)
checker := checker.New(repo)
OK(t, checker.LoadIndex())
})
}

Binary file not shown.

54
cmd/restic/cmd_check.go Normal file
View file

@ -0,0 +1,54 @@
package main
import (
"errors"
"github.com/restic/restic/checker"
)
type CmdCheck struct {
ReadData bool ` long:"read-data" description:"Read data blobs" default:"false"`
global *GlobalOptions
}
func init() {
_, err := parser.AddCommand("check",
"check the repository",
"The check command check the integrity and consistency of the repository",
&CmdCheck{global: &globalOpts})
if err != nil {
panic(err)
}
}
func (cmd CmdCheck) Usage() string {
return "[check-options]"
}
func (cmd CmdCheck) Execute(args []string) error {
if len(args) != 0 {
return errors.New("check has no arguments")
}
repo, err := cmd.global.OpenRepository()
if err != nil {
return err
}
cmd.global.Verbosef("Create exclusive lock for repository\n")
lock, err := lockRepoExclusive(repo)
defer unlockRepo(lock)
if err != nil {
return err
}
checker := checker.New(repo)
cmd.global.Verbosef("Load indexes\n")
if err = checker.LoadIndex(); err != nil {
return err
}
return nil
}

View file

@ -1,272 +0,0 @@
package main
import (
"errors"
"fmt"
"os"
"github.com/restic/restic"
"github.com/restic/restic/backend"
"github.com/restic/restic/crypto"
"github.com/restic/restic/debug"
"github.com/restic/restic/pack"
"github.com/restic/restic/repository"
)
type CmdFsck struct {
CheckData bool ` long:"check-data" description:"Read data blobs" default:"false"`
Snapshot string `short:"s" long:"snapshot" description:"Only check this snapshot"`
Orphaned bool `short:"o" long:"orphaned" description:"Check for orphaned blobs"`
RemoveOrphaned bool `short:"r" long:"remove-orphaned" description:"Remove orphaned blobs (implies -o)"`
global *GlobalOptions
// lists checking for orphaned blobs
o_data *backend.IDSet
o_trees *backend.IDSet
}
func init() {
_, err := parser.AddCommand("fsck",
"check the repository",
"The fsck command check the integrity and consistency of the repository",
&CmdFsck{global: &globalOpts})
if err != nil {
panic(err)
}
}
func fsckFile(global CmdFsck, repo *repository.Repository, IDs []backend.ID) (uint64, error) {
debug.Log("restic.fsckFile", "checking file %v", IDs)
var bytes uint64
for _, id := range IDs {
debug.Log("restic.fsck", " checking data blob %v\n", id)
// test if blob is in the index
packID, tpe, _, length, err := repo.Index().Lookup(id)
if err != nil {
return 0, fmt.Errorf("storage for blob %v (%v) not found", id, tpe)
}
bytes += uint64(length - crypto.Extension)
debug.Log("restic.fsck", " blob found in pack %v\n", packID)
if global.CheckData {
// load content
_, err := repo.LoadBlob(pack.Data, id)
if err != nil {
return 0, err
}
} else {
// test if pack for data blob is there
ok, err := repo.Backend().Test(backend.Data, packID.String())
if err != nil {
return 0, err
}
if !ok {
return 0, fmt.Errorf("data blob %v not found", id)
}
}
// if orphan check is active, record storage id
if global.o_data != nil {
debug.Log("restic.fsck", " recording blob %v as used\n", id)
global.o_data.Insert(id)
}
}
return bytes, nil
}
func fsckTree(global CmdFsck, repo *repository.Repository, id backend.ID) error {
debug.Log("restic.fsckTree", "checking tree %v", id.Str())
tree, err := restic.LoadTree(repo, id)
if err != nil {
return err
}
// if orphan check is active, record storage id
if global.o_trees != nil {
// add ID to list
global.o_trees.Insert(id)
}
var firstErr error
seenIDs := backend.NewIDSet()
for i, node := range tree.Nodes {
if node.Name == "" {
return fmt.Errorf("node %v of tree %v has no name", i, id.Str())
}
if node.Type == "" {
return fmt.Errorf("node %q of tree %v has no type", node.Name, id.Str())
}
switch node.Type {
case "file":
if node.Content == nil {
debug.Log("restic.fsckTree", "file node %q of tree %v has no content: %v", node.Name, id, node)
return fmt.Errorf("file node %q of tree %v has no content: %v", node.Name, id, node)
}
if node.Content == nil && node.Error == "" {
debug.Log("restic.fsckTree", "file node %q of tree %v has no content", node.Name, id)
return fmt.Errorf("file node %q of tree %v has no content", node.Name, id)
}
// record ids
for _, id := range node.Content {
seenIDs.Insert(id)
}
debug.Log("restic.fsckTree", "check file %v (%v)", node.Name, id.Str())
bytes, err := fsckFile(global, repo, node.Content)
if err != nil {
return err
}
if bytes != node.Size {
debug.Log("restic.fsckTree", "file node %q of tree %v has size %d, but only %d bytes could be found", node.Name, id, node.Size, bytes)
return fmt.Errorf("file node %q of tree %v has size %d, but only %d bytes could be found", node.Name, id, node.Size, bytes)
}
case "dir":
if node.Subtree == nil {
return fmt.Errorf("dir node %q of tree %v has no subtree", node.Name, id)
}
// record id
seenIDs.Insert(node.Subtree)
err = fsckTree(global, repo, node.Subtree)
if err != nil {
firstErr = err
fmt.Fprintf(os.Stderr, "%v\n", err)
}
}
}
// check map for unused ids
// for _, id := range tree.Map.IDs() {
// if seenIDs.Find(id) != nil {
// return fmt.Errorf("tree %v: map contains unused ID %v", id, id)
// }
// }
return firstErr
}
func fsckSnapshot(global CmdFsck, repo *repository.Repository, id backend.ID) error {
debug.Log("restic.fsck", "checking snapshot %v\n", id)
sn, err := restic.LoadSnapshot(repo, id)
if err != nil {
return fmt.Errorf("loading snapshot %v failed: %v", id, err)
}
err = fsckTree(global, repo, sn.Tree)
if err != nil {
debug.Log("restic.fsck", " checking tree %v for snapshot %v\n", sn.Tree, id)
fmt.Fprintf(os.Stderr, "snapshot %v:\n error for tree %v:\n %v\n", id, sn.Tree, err)
}
return err
}
func (cmd CmdFsck) Usage() string {
return "[fsck-options]"
}
func (cmd CmdFsck) Execute(args []string) error {
if len(args) != 0 {
return errors.New("fsck has no arguments")
}
if cmd.RemoveOrphaned && !cmd.Orphaned {
cmd.Orphaned = true
}
repo, err := cmd.global.OpenRepository()
if err != nil {
return err
}
lock, err := lockRepoExclusive(repo)
defer unlockRepo(lock)
if err != nil {
return err
}
err = repo.LoadIndex()
if err != nil {
return err
}
if cmd.Snapshot != "" {
id, err := restic.FindSnapshot(repo, cmd.Snapshot)
if err != nil {
return fmt.Errorf("invalid id %q: %v", cmd.Snapshot, err)
}
err = fsckSnapshot(cmd, repo, id)
if err != nil {
fmt.Fprintf(os.Stderr, "check for snapshot %v failed\n", id)
}
return err
}
if cmd.Orphaned {
cmd.o_data = backend.NewIDSet()
cmd.o_trees = backend.NewIDSet()
}
done := make(chan struct{})
defer close(done)
var firstErr error
for id := range repo.List(backend.Snapshot, done) {
err = fsckSnapshot(cmd, repo, id)
if err != nil {
fmt.Fprintf(os.Stderr, "check for snapshot %v failed\n", id)
firstErr = err
}
}
if !cmd.Orphaned {
return firstErr
}
debug.Log("restic.fsck", "starting orphaned check\n")
cnt := make(map[pack.BlobType]*backend.IDSet)
cnt[pack.Data] = cmd.o_data
cnt[pack.Tree] = cmd.o_trees
for blob := range repo.Index().Each(done) {
debug.Log("restic.fsck", "checking %v blob %v\n", blob.Type, blob.ID)
err = cnt[blob.Type].Find(blob.ID)
if err != nil {
debug.Log("restic.fsck", " blob %v is orphaned\n", blob.ID)
if !cmd.RemoveOrphaned {
fmt.Printf("orphaned %v blob %v\n", blob.Type, blob.ID)
continue
}
fmt.Printf("removing orphaned %v blob %v\n", blob.Type, blob.ID)
// err := s.Remove(d.tpe, name)
// if err != nil {
// return err
// }
return errors.New("not implemented")
}
}
return firstErr
}

View file

@ -8,7 +8,6 @@ import (
"io"
"io/ioutil"
"os"
"os/exec"
"path/filepath"
"regexp"
"syscall"
@ -20,20 +19,6 @@ import (
. "github.com/restic/restic/test"
)
func setupTarTestFixture(t testing.TB, outputDir, tarFile string) {
err := system("sh", "-c", `(cd "$1" && tar xz) < "$2"`,
"sh", outputDir, tarFile)
OK(t, err)
}
func system(command string, args ...string) error {
cmd := exec.Command(command, args...)
cmd.Stdout = os.Stdout
cmd.Stderr = os.Stderr
return cmd.Run()
}
func parseIDsFromReader(t testing.TB, rd io.Reader) backend.IDs {
IDs := backend.IDs{}
sc := bufio.NewScanner(rd)
@ -83,8 +68,8 @@ func cmdRestore(t testing.TB, global GlobalOptions, dir string, snapshotID backe
cmd.Execute(append([]string{snapshotID.String(), dir}, args...))
}
func cmdFsck(t testing.TB, global GlobalOptions) {
cmd := &CmdFsck{global: &global, CheckData: true, Orphaned: true}
func cmdCheck(t testing.TB, global GlobalOptions) {
cmd := &CmdCheck{global: &global, ReadData: true}
OK(t, cmd.Execute(nil))
}
@ -101,7 +86,7 @@ func TestBackup(t *testing.T) {
cmdInit(t, global)
setupTarTestFixture(t, env.testdata, datafile)
SetupTarTestFixture(t, env.testdata, datafile)
// first backup
cmdBackup(t, global, []string{env.testdata}, nil)
@ -109,7 +94,7 @@ func TestBackup(t *testing.T) {
Assert(t, len(snapshotIDs) == 1,
"expected one snapshot, got %v", snapshotIDs)
cmdFsck(t, global)
cmdCheck(t, global)
stat1 := dirStats(env.repo)
// second backup, implicit incremental
@ -124,7 +109,7 @@ func TestBackup(t *testing.T) {
}
t.Logf("repository grown by %d bytes", stat2.size-stat1.size)
cmdFsck(t, global)
cmdCheck(t, global)
// third backup, explicit incremental
cmdBackup(t, global, []string{env.testdata}, snapshotIDs[0])
snapshotIDs = cmdList(t, global, "snapshots")
@ -146,7 +131,7 @@ func TestBackup(t *testing.T) {
"directories are not equal")
}
cmdFsck(t, global)
cmdCheck(t, global)
})
}
@ -161,7 +146,7 @@ func TestBackupNonExistingFile(t *testing.T) {
OK(t, err)
OK(t, fd.Close())
setupTarTestFixture(t, env.testdata, datafile)
SetupTarTestFixture(t, env.testdata, datafile)
cmdInit(t, global)
@ -189,7 +174,7 @@ func TestBackupMissingFile1(t *testing.T) {
OK(t, err)
OK(t, fd.Close())
setupTarTestFixture(t, env.testdata, datafile)
SetupTarTestFixture(t, env.testdata, datafile)
cmdInit(t, global)
@ -208,7 +193,7 @@ func TestBackupMissingFile1(t *testing.T) {
})
cmdBackup(t, global, []string{env.testdata}, nil)
cmdFsck(t, global)
cmdCheck(t, global)
Assert(t, ranHook, "hook did not run")
debug.RemoveHook("pipe.walk1")
@ -226,7 +211,7 @@ func TestBackupMissingFile2(t *testing.T) {
OK(t, err)
OK(t, fd.Close())
setupTarTestFixture(t, env.testdata, datafile)
SetupTarTestFixture(t, env.testdata, datafile)
cmdInit(t, global)
@ -245,7 +230,7 @@ func TestBackupMissingFile2(t *testing.T) {
})
cmdBackup(t, global, []string{env.testdata}, nil)
cmdFsck(t, global)
cmdCheck(t, global)
Assert(t, ranHook, "hook did not run")
debug.RemoveHook("pipe.walk2")
@ -290,13 +275,13 @@ func TestIncrementalBackup(t *testing.T) {
OK(t, appendRandomData(testfile, incrementalFirstWrite))
cmdBackup(t, global, []string{datadir}, nil)
cmdFsck(t, global)
cmdCheck(t, global)
stat1 := dirStats(env.repo)
OK(t, appendRandomData(testfile, incrementalSecondWrite))
cmdBackup(t, global, []string{datadir}, nil)
cmdFsck(t, global)
cmdCheck(t, global)
stat2 := dirStats(env.repo)
if stat2.size-stat1.size > incrementalFirstWrite {
t.Errorf("repository size has grown by more than %d bytes", incrementalFirstWrite)
@ -306,7 +291,7 @@ func TestIncrementalBackup(t *testing.T) {
OK(t, appendRandomData(testfile, incrementalThirdWrite))
cmdBackup(t, global, []string{datadir}, nil)
cmdFsck(t, global)
cmdCheck(t, global)
stat3 := dirStats(env.repo)
if stat3.size-stat2.size > incrementalFirstWrite {
t.Errorf("repository size has grown by more than %d bytes", incrementalFirstWrite)
@ -387,7 +372,7 @@ func TestKeyAddRemove(t *testing.T) {
t.Logf("testing access with last password %q\n", global.password)
cmdKey(t, global, "list")
cmdFsck(t, global)
cmdCheck(t, global)
})
}
@ -425,7 +410,7 @@ func TestRestoreFilter(t *testing.T) {
}
cmdBackup(t, global, []string{env.testdata}, nil)
cmdFsck(t, global)
cmdCheck(t, global)
snapshotID := cmdList(t, global, "snapshots")[0]
@ -471,7 +456,7 @@ func TestRestoreNoMetadataOnIgnoredIntermediateDirs(t *testing.T) {
OK(t, setZeroModTime(filepath.Join(env.testdata, "subdir1", "subdir2")))
cmdBackup(t, global, []string{env.testdata}, nil)
cmdFsck(t, global)
cmdCheck(t, global)
snapshotID := cmdList(t, global, "snapshots")[0]

View file

@ -107,13 +107,19 @@ func (idx *Index) Merge(other *Index) {
debug.Log("Index.Merge", "done merging index")
}
// PackedBlob is a blob already saved within a pack.
type PackedBlob struct {
pack.Blob
PackID backend.ID
}
// Each returns a channel that yields all blobs known to the index. If done is
// closed, the background goroutine terminates. This blocks any modification of
// the index.
func (idx *Index) Each(done chan struct{}) <-chan pack.Blob {
func (idx *Index) Each(done chan struct{}) <-chan PackedBlob {
idx.m.Lock()
ch := make(chan pack.Blob)
ch := make(chan PackedBlob)
go func() {
defer idx.m.Unlock()
@ -131,11 +137,14 @@ func (idx *Index) Each(done chan struct{}) <-chan pack.Blob {
select {
case <-done:
return
case ch <- pack.Blob{
case ch <- PackedBlob{
Blob: pack.Blob{
ID: id,
Offset: blob.offset,
Type: blob.tpe,
Length: uint32(blob.length),
},
PackID: blob.packID,
}:
}
}