Merge pull request #214 from restic/fix-fsck

Add checker and command 'check' to replace 'fsck'
Commit 2cc5318c46 by Alexander Neumann, 2015-07-12 18:43:34 +02:00
15 changed files with 930 additions and 435 deletions


@@ -1,4 +1,4 @@
-// +build debug
+// +build debug_sftp

 package sftp


@@ -1,4 +1,4 @@
-// +build !debug
+// +build !debug_sftp

 package sftp


@@ -62,16 +62,18 @@ afterwards you'll find the binary in the current directory:
     backup     save file/directory
     cache      manage cache
     cat        dump something
+    check      check the repository
     find       find a file/directory
-    fsck       check the repository
     init       create repository
     key        manage keys
     list       lists data
     ls         list files
     restore    restore a snapshot
     snapshots  show snapshots
+    unlock     remove locks
     version    display version

 Contribute and Documentation
 ============================


@@ -1,75 +0,0 @@
package backend

import (
	"errors"
	"sort"
	"sync"
)

type IDSet struct {
	list IDs
	m    sync.Mutex
}

func NewIDSet() *IDSet {
	return &IDSet{
		list: make(IDs, 0),
	}
}

func (s *IDSet) find(id ID) (int, error) {
	pos := sort.Search(len(s.list), func(i int) bool {
		return id.Compare(s.list[i]) >= 0
	})

	if pos < len(s.list) {
		candID := s.list[pos]
		if id.Compare(candID) == 0 {
			return pos, nil
		}
	}

	return pos, errors.New("ID not found")
}

func (s *IDSet) insert(id ID) {
	pos, err := s.find(id)
	if err == nil {
		// already present
		return
	}

	// insert blob
	// https://code.google.com/p/go-wiki/wiki/SliceTricks
	s.list = append(s.list, ID{})
	copy(s.list[pos+1:], s.list[pos:])
	s.list[pos] = id

	return
}

func (s *IDSet) Insert(id ID) {
	s.m.Lock()
	defer s.m.Unlock()

	s.insert(id)
}

func (s *IDSet) Find(id ID) error {
	s.m.Lock()
	defer s.m.Unlock()

	_, err := s.find(id)
	if err != nil {
		return err
	}

	return nil
}

func (s *IDSet) Len() int {
	s.m.Lock()
	defer s.m.Unlock()

	return len(s.list)
}


@@ -1,41 +0,0 @@
package backend_test

import (
	"crypto/rand"
	"io"
	"testing"

	"github.com/restic/restic/backend"
	. "github.com/restic/restic/test"
)

func randomID() []byte {
	buf := make([]byte, backend.IDSize)
	_, err := io.ReadFull(rand.Reader, buf)
	if err != nil {
		panic(err)
	}
	return buf
}

func TestSet(t *testing.T) {
	s := backend.NewIDSet()

	testID := randomID()
	err := s.Find(testID)
	Assert(t, err != nil, "found test ID in IDSet before insertion")

	for i := 0; i < 238; i++ {
		s.Insert(randomID())
	}

	s.Insert(testID)
	OK(t, s.Find(testID))

	for i := 0; i < 80; i++ {
		s.Insert(randomID())
	}

	s.Insert(testID)
	OK(t, s.Find(testID))
}

checker/checker.go (new file, 601 lines)

@@ -0,0 +1,601 @@
package checker

import (
	"encoding/hex"
	"errors"
	"fmt"
	"sync"

	"github.com/restic/restic"
	"github.com/restic/restic/backend"
	"github.com/restic/restic/debug"
	"github.com/restic/restic/repository"
)
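
// mapID is a fixed-size copy of a backend.ID. Unlike backend.ID (a byte
// slice), an array can be used as a map key, which is what the Checker needs
// for its pack and blob lookup tables.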
type mapID [backend.IDSize]byte

func id2map(id backend.ID) (mid mapID) {
	copy(mid[:], id)
	return
}

func str2map(s string) (mid mapID, err error) {
	data, err := hex.DecodeString(s)
	if err != nil {
		return mid, err
	}

	return id2map(data), nil
}

func map2str(id mapID) string {
	return hex.EncodeToString(id[:])
}

func map2id(id mapID) backend.ID {
	return backend.ID(id[:])
}

// Checker runs various checks on a repository. It is advisable to create an
// exclusive Lock in the repository before running any checks.
//
// A Checker only tests for internal errors within the data structures of the
// repository (e.g. missing blobs), and needs a valid Repository to work on.
type Checker struct {
	packs    map[mapID]struct{}
	blobs    map[mapID]struct{}
	blobRefs struct {
		sync.Mutex
		M map[mapID]uint
	}
	indexes       map[mapID]*repository.Index
	orphanedPacks backend.IDs

	masterIndex *repository.Index

	repo *repository.Repository
}

// New returns a new checker which runs on repo.
func New(repo *repository.Repository) *Checker {
	c := &Checker{
		packs:       make(map[mapID]struct{}),
		blobs:       make(map[mapID]struct{}),
		masterIndex: repository.NewIndex(),
		indexes:     make(map[mapID]*repository.Index),
		repo:        repo,
	}

	c.blobRefs.M = make(map[mapID]uint)

	return c
}
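
// A minimal usage sketch (error handling elided; the check command further
// down in this diff shows the real call sequence):
//
//	chkr := checker.New(repo)
//	_ = chkr.LoadIndex()
//	errChan := make(chan error)
//	done := make(chan struct{})
//	go chkr.Packs(errChan, done)
//	for err := range errChan {
//		fmt.Println(err)
//	}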
const defaultParallelism = 40

// LoadIndex loads all index files.
func (c *Checker) LoadIndex() error {
	debug.Log("LoadIndex", "Start")

	type indexRes struct {
		Index *repository.Index
		ID    string
	}

	indexCh := make(chan indexRes)

	worker := func(id string, done <-chan struct{}) error {
		debug.Log("LoadIndex", "worker got index %v", id)
		idx, err := repository.LoadIndex(c.repo, id)
		if err != nil {
			return err
		}

		select {
		case indexCh <- indexRes{Index: idx, ID: id}:
		case <-done:
		}

		return nil
	}

	var perr error
	go func() {
		defer close(indexCh)
		debug.Log("LoadIndex", "start loading indexes in parallel")
		perr = repository.FilesInParallel(c.repo.Backend(), backend.Index, defaultParallelism, worker)
		debug.Log("LoadIndex", "loading indexes finished, error: %v", perr)
	}()
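
	// done is closed when this function returns, so a worker blocked on
	// sending to indexCh can always exit.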
	done := make(chan struct{})
	defer close(done)

	for res := range indexCh {
		debug.Log("LoadIndex", "process index %v", res.ID)
		id, err := str2map(res.ID)
		if err != nil {
			return err
		}

		c.indexes[id] = res.Index
		c.masterIndex.Merge(res.Index)

		debug.Log("LoadIndex", "process blobs")
		cnt := 0
		for blob := range res.Index.Each(done) {
			c.packs[id2map(blob.PackID)] = struct{}{}
			c.blobs[id2map(blob.ID)] = struct{}{}
			c.blobRefs.M[id2map(blob.ID)] = 0
			cnt++
		}

		debug.Log("LoadIndex", "%d blobs processed", cnt)
	}

	debug.Log("LoadIndex", "done, error %v", perr)

	c.repo.SetIndex(c.masterIndex)

	return perr
}

// PackError describes an error with a specific pack.
type PackError struct {
	ID       backend.ID
	Orphaned bool
	Err      error
}

func (e PackError) Error() string {
	return "pack " + e.ID.String() + ": " + e.Err.Error()
}

func packIDTester(repo *repository.Repository, inChan <-chan mapID, errChan chan<- error, wg *sync.WaitGroup, done <-chan struct{}) {
	debug.Log("Checker.testPackID", "worker start")
	defer debug.Log("Checker.testPackID", "worker done")

	defer wg.Done()

	for id := range inChan {
		ok, err := repo.Backend().Test(backend.Data, map2str(id))
		if err != nil {
			err = PackError{ID: map2id(id), Err: err}
		} else {
			if !ok {
				err = PackError{ID: map2id(id), Err: errors.New("does not exist")}
			}
		}

		if err != nil {
			debug.Log("Checker.testPackID", "error checking for pack %s: %v", map2id(id).Str(), err)
			select {
			case <-done:
				return
			case errChan <- err:
			}

			continue
		}

		debug.Log("Checker.testPackID", "pack %s exists", map2id(id).Str())
	}
}

func collectErrors(in <-chan error, out chan<- []error, done <-chan struct{}) {
	var errs []error

outer:
	for {
		select {
		case err, ok := <-in:
			if !ok {
				break outer
			}
			errs = append(errs, err)
		case <-done:
			break outer
		}
	}

	out <- errs
}

// Packs checks that all packs referenced in the index are still available and
// there are no packs that aren't in an index. errChan is closed after all
// packs have been checked.
func (c *Checker) Packs(errChan chan<- error, done <-chan struct{}) {
	defer close(errChan)

	debug.Log("Checker.Packs", "checking for %d packs", len(c.packs))
	seenPacks := make(map[mapID]struct{})

	var workerWG sync.WaitGroup
	IDChan := make(chan mapID)
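
	// fan out: a fixed pool of workers tests each pack's presence in the
	// backend in parallel.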
	for i := 0; i < defaultParallelism; i++ {
		workerWG.Add(1)
		go packIDTester(c.repo, IDChan, errChan, &workerWG, done)
	}

	for id := range c.packs {
		seenPacks[id] = struct{}{}
		IDChan <- id
	}
	close(IDChan)

	debug.Log("Checker.Packs", "waiting for %d workers to terminate", defaultParallelism)
	workerWG.Wait()
	debug.Log("Checker.Packs", "workers terminated")

	for id := range c.repo.List(backend.Data, done) {
		debug.Log("Checker.Packs", "check data blob %v", id.Str())
		if _, ok := seenPacks[id2map(id)]; !ok {
			c.orphanedPacks = append(c.orphanedPacks, id)
			select {
			case <-done:
				return
			case errChan <- PackError{ID: id, Orphaned: true, Err: errors.New("not referenced in any index")}:
			}
		}
	}
}

// Error is an error that occurred while checking a repository.
type Error struct {
	TreeID backend.ID
	BlobID backend.ID
	Err    error
}

func (e Error) Error() string {
	if e.BlobID != nil && e.TreeID != nil {
		msg := "tree " + e.TreeID.String()
		msg += ", blob " + e.BlobID.String()
		msg += ": " + e.Err.Error()
		return msg
	}

	if e.TreeID != nil {
		return "tree " + e.TreeID.String() + ": " + e.Err.Error()
	}

	return e.Err.Error()
}

func loadTreeFromSnapshot(repo *repository.Repository, id backend.ID) (backend.ID, error) {
	sn, err := restic.LoadSnapshot(repo, id)
	if err != nil {
		debug.Log("Checker.loadTreeFromSnapshot", "error loading snapshot %v: %v", id.Str(), err)
		return nil, err
	}

	if sn.Tree == nil {
		debug.Log("Checker.loadTreeFromSnapshot", "snapshot %v has no tree", id.Str())
		return nil, fmt.Errorf("snapshot %v has no tree", id)
	}

	return sn.Tree, nil
}

// loadSnapshotTreeIDs loads all snapshots from the backend and returns the tree IDs.
func loadSnapshotTreeIDs(repo *repository.Repository) (backend.IDs, []error) {
	var trees struct {
		IDs backend.IDs
		sync.Mutex
	}

	var errs struct {
		errs []error
		sync.Mutex
	}

	snapshotWorker := func(strID string, done <-chan struct{}) error {
		id, err := backend.ParseID(strID)
		if err != nil {
			return err
		}

		debug.Log("Checker.Snapshots", "load snapshot %v", id.Str())

		treeID, err := loadTreeFromSnapshot(repo, id)
		if err != nil {
			errs.Lock()
			errs.errs = append(errs.errs, err)
			errs.Unlock()
			return nil
		}

		debug.Log("Checker.Snapshots", "snapshot %v has tree %v", id.Str(), treeID.Str())
		trees.Lock()
		trees.IDs = append(trees.IDs, treeID)
		trees.Unlock()

		return nil
	}

	err := repository.FilesInParallel(repo.Backend(), backend.Snapshot, defaultParallelism, snapshotWorker)
	if err != nil {
		errs.errs = append(errs.errs, err)
	}

	return trees.IDs, errs.errs
}

// TreeError is returned when loading a tree from the repository failed.
type TreeError struct {
	ID     backend.ID
	Errors []error
}

func (e TreeError) Error() string {
	return fmt.Sprintf("%v: %d errors", e.ID.String(), len(e.Errors))
}

type treeJob struct {
	backend.ID
	error
	*restic.Tree
}

// loadTreeWorker loads trees from repo and sends them to out.
func loadTreeWorker(repo *repository.Repository,
	in <-chan backend.ID, out chan<- treeJob,
	done <-chan struct{}, wg *sync.WaitGroup) {

	defer func() {
		debug.Log("checker.loadTreeWorker", "exiting")
		wg.Done()
	}()

	var (
		inCh  = in
		outCh = out
		job   treeJob
	)
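
	// a nil channel disables its case in the select below: sending starts
	// disabled, is enabled once a tree has been loaded, and receiving is
	// disabled again until that tree has been handed off.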
	outCh = nil

	for {
		select {
		case <-done:
			return

		case treeID, ok := <-inCh:
			if !ok {
				return
			}
			debug.Log("checker.loadTreeWorker", "load tree %v", treeID.Str())

			tree, err := restic.LoadTree(repo, treeID)
			debug.Log("checker.loadTreeWorker", "load tree %v (%v) returned err %v", tree, treeID.Str(), err)
			job = treeJob{ID: treeID, error: err, Tree: tree}
			outCh = out
			inCh = nil

		case outCh <- job:
			debug.Log("checker.loadTreeWorker", "sent tree %v", job.ID.Str())
			outCh = nil
			inCh = in
		}
	}
}

// checkTreeWorker checks the trees received and sends out errors to errChan.
func (c *Checker) checkTreeWorker(in <-chan treeJob, out chan<- TreeError, done <-chan struct{}, wg *sync.WaitGroup) {
	defer func() {
		debug.Log("checker.checkTreeWorker", "exiting")
		wg.Done()
	}()

	var (
		inCh      = in
		outCh     = out
		treeError TreeError
	)
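
	// same nil-channel trick as in loadTreeWorker: sending only becomes
	// possible once a tree with errors is pending.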
	outCh = nil

	for {
		select {
		case <-done:
			return

		case job, ok := <-inCh:
			if !ok {
				return
			}

			id := id2map(job.ID)
			alreadyChecked := false
			c.blobRefs.Lock()
			if c.blobRefs.M[id] > 0 {
				alreadyChecked = true
			}
			c.blobRefs.M[id]++
			debug.Log("checker.checkTreeWorker", "tree %v refcount %d", job.ID.Str(), c.blobRefs.M[id])
			c.blobRefs.Unlock()

			if alreadyChecked {
				continue
			}

			debug.Log("checker.checkTreeWorker", "load tree %v", job.ID.Str())

			errs := c.checkTree(job.ID, job.Tree)
			if len(errs) > 0 {
				debug.Log("checker.checkTreeWorker", "checked tree %v: %v errors", job.ID.Str(), len(errs))
				treeError = TreeError{ID: job.ID, Errors: errs}
				outCh = out
				inCh = nil
			}

		case outCh <- treeError:
			debug.Log("checker.checkTreeWorker", "tree %v: sent %d errors", treeError.ID, len(treeError.Errors))
			outCh = nil
			inCh = in
		}
	}
}

func filterTrees(backlog backend.IDs, loaderChan chan<- backend.ID, in <-chan treeJob, out chan<- treeJob, done <-chan struct{}) {
	defer func() {
		debug.Log("checker.filterTrees", "closing output channels")
		close(loaderChan)
		close(out)
	}()

	var (
		inCh                    = in
		outCh                   = out
		loadCh                  = loaderChan
		job                     treeJob
		nextTreeID              backend.ID
		outstandingLoadTreeJobs = 0
	)
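
	// filterTrees drives the traversal: it feeds tree IDs from the backlog
	// to the loaders, collects loaded trees, appends their subtrees to the
	// backlog, and forwards each tree downstream. Send channels start
	// disabled (nil) until there is something to send;
	// outstandingLoadTreeJobs tracks in-flight loads so the function knows
	// when the traversal is complete.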
	outCh = nil
	loadCh = nil

	for {
		if loadCh == nil && len(backlog) > 0 {
			loadCh = loaderChan
			nextTreeID, backlog = backlog[0], backlog[1:]
		}

		if loadCh == nil && outCh == nil && outstandingLoadTreeJobs == 0 {
			debug.Log("checker.filterTrees", "backlog is empty, all channels nil, exiting")
			return
		}

		select {
		case <-done:
			return

		case loadCh <- nextTreeID:
			outstandingLoadTreeJobs++
			loadCh = nil

		case j, ok := <-inCh:
			if !ok {
				debug.Log("checker.filterTrees", "input channel closed")
				inCh = nil
				in = nil
				continue
			}

			outstandingLoadTreeJobs--

			debug.Log("checker.filterTrees", "input job tree %v", j.ID.Str())
			backlog = append(backlog, j.Tree.Subtrees()...)

			job = j
			outCh = out
			inCh = nil

		case outCh <- job:
			outCh = nil
			inCh = in
		}
	}
}

// Structure checks that for all snapshots all referenced data blobs and
// subtrees are available in the index. errChan is closed after all trees have
// been traversed.
func (c *Checker) Structure(errChan chan<- error, done <-chan struct{}) {
	defer close(errChan)

	trees, errs := loadSnapshotTreeIDs(c.repo)
	debug.Log("checker.Structure", "need to check %d trees from snapshots, %d errs returned", len(trees), len(errs))

	for _, err := range errs {
		select {
		case <-done:
			return
		case errChan <- err:
		}
	}
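
	// The traversal is a pipeline: filterTrees hands tree IDs to the
	// loadTreeWorkers via treeIDChan, receives the loaded trees back on
	// treeJobChan1, queues their subtrees, and forwards each tree on
	// treeJobChan2 to the checkTreeWorkers.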
	treeIDChan := make(chan backend.ID)
	treeJobChan1 := make(chan treeJob)
	treeJobChan2 := make(chan treeJob)
	treeErrChan := make(chan TreeError)

	var wg sync.WaitGroup
	for i := 0; i < defaultParallelism; i++ {
		wg.Add(2)
		go loadTreeWorker(c.repo, treeIDChan, treeJobChan1, done, &wg)
		go c.checkTreeWorker(treeJobChan2, treeErrChan, done, &wg)
	}

	filterTrees(trees, treeIDChan, treeJobChan1, treeJobChan2, done)

	wg.Wait()
}

func (c *Checker) checkTree(id backend.ID, tree *restic.Tree) (errs []error) {
	debug.Log("Checker.checkTree", "checking tree %v", id.Str())

	// if _, ok := c.blobs[id2map(id)]; !ok {
	// 	errs = append(errs, Error{TreeID: id, Err: errors.New("not found in index")})
	// }

	// blobs, subtrees, treeErrors := c.tree(id)
	// if treeErrors != nil {
	// 	debug.Log("Checker.trees", "error checking tree %v: %v", id.Str(), treeErrors)
	// 	errs = append(errs, treeErrors...)
	// 	continue
	// }

	// treeIDs = append(treeIDs, subtrees...)
	// treesChecked[id2map(id)] = struct{}{}

	var blobs []backend.ID

	for i, node := range tree.Nodes {
		switch node.Type {
		case "file":
			blobs = append(blobs, node.Content...)
		case "dir":
			if node.Subtree == nil {
				errs = append(errs, Error{TreeID: id, Err: fmt.Errorf("node %d is dir but has no subtree", i)})
				continue
			}
		}
	}

	for _, blobID := range blobs {
		mid := id2map(blobID)
		c.blobRefs.Lock()
		c.blobRefs.M[mid]++
		debug.Log("Checker.checkTree", "blob %v refcount %d", blobID.Str(), c.blobRefs.M[mid])
		c.blobRefs.Unlock()

		if _, ok := c.blobs[id2map(blobID)]; !ok {
			debug.Log("Checker.trees", "tree %v references blob %v which isn't contained in index", id.Str(), blobID.Str())

			errs = append(errs, Error{TreeID: id, BlobID: blobID, Err: errors.New("not found in index")})
		}
	}

	return errs
}

// UnusedBlobs returns all blobs that have never been referenced.
func (c *Checker) UnusedBlobs() (blobs backend.IDs) {
	c.blobRefs.Lock()
	defer c.blobRefs.Unlock()

	debug.Log("Checker.UnusedBlobs", "checking %d blobs", len(c.blobs))

	for id := range c.blobs {
		if c.blobRefs.M[id] == 0 {
			debug.Log("Checker.UnusedBlobs", "blob %v not referenced", map2id(id).Str())
			blobs = append(blobs, map2id(id))
		}
	}

	return blobs
}

// OrphanedPacks returns a slice of unused packs (only available after Packs() was run).
func (c *Checker) OrphanedPacks() backend.IDs {
	return c.orphanedPacks
}

checker/checker_test.go (new file, 142 lines)

@@ -0,0 +1,142 @@
package checker_test

import (
	"path/filepath"
	"sort"
	"testing"

	"github.com/restic/restic/backend"
	"github.com/restic/restic/checker"
	"github.com/restic/restic/repository"
	. "github.com/restic/restic/test"
)

var checkerTestData = filepath.Join("testdata", "checker-test-repo.tar.gz")

func list(repo *repository.Repository, t backend.Type) (IDs []string) {
	done := make(chan struct{})
	defer close(done)

	for id := range repo.List(t, done) {
		IDs = append(IDs, id.String())
	}

	return IDs
}
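
// checkPacks and checkStruct run the respective check in a goroutine and
// drain the error channel; Packs and Structure close their error channel
// when finished, so the range loops terminate.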
func checkPacks(chkr *checker.Checker) (errs []error) {
	done := make(chan struct{})
	defer close(done)

	errChan := make(chan error)
	go chkr.Packs(errChan, done)

	for err := range errChan {
		errs = append(errs, err)
	}

	return errs
}

func checkStruct(chkr *checker.Checker) (errs []error) {
	done := make(chan struct{})
	defer close(done)

	errChan := make(chan error)
	go chkr.Structure(errChan, done)

	for err := range errChan {
		errs = append(errs, err)
	}

	return errs
}

func TestCheckRepo(t *testing.T) {
	WithTestEnvironment(t, checkerTestData, func(repodir string) {
		repo := OpenLocalRepo(t, repodir)

		chkr := checker.New(repo)
		OK(t, chkr.LoadIndex())

		OKs(t, checkPacks(chkr))
		OKs(t, checkStruct(chkr))
	})
}

func TestMissingPack(t *testing.T) {
	WithTestEnvironment(t, checkerTestData, func(repodir string) {
		repo := OpenLocalRepo(t, repodir)

		packID := "657f7fb64f6a854fff6fe9279998ee09034901eded4e6db9bcee0e59745bbce6"
		OK(t, repo.Backend().Remove(backend.Data, packID))

		chkr := checker.New(repo)
		OK(t, chkr.LoadIndex())

		errs := checkPacks(chkr)

		Assert(t, len(errs) == 1,
			"expected exactly one error, got %v", len(errs))

		if err, ok := errs[0].(checker.PackError); ok {
			Equals(t, packID, err.ID.String())
		} else {
			t.Errorf("expected error returned by checker.Packs() to be PackError, got %v", err)
		}
	})
}

func TestUnreferencedPack(t *testing.T) {
	WithTestEnvironment(t, checkerTestData, func(repodir string) {
		repo := OpenLocalRepo(t, repodir)

		// index 8eb5 only references pack 60e0
		indexID := "8eb5b61062bf8e959f244fba0c971108bc8d4d2a4b236f71a704998e28cc5cf6"
		packID := "60e0438dcb978ec6860cc1f8c43da648170ee9129af8f650f876bad19f8f788e"
		OK(t, repo.Backend().Remove(backend.Index, indexID))

		chkr := checker.New(repo)
		OK(t, chkr.LoadIndex())

		errs := checkPacks(chkr)

		Assert(t, len(errs) == 1,
			"expected exactly one error, got %v", len(errs))

		if err, ok := errs[0].(checker.PackError); ok {
			Equals(t, packID, err.ID.String())
		} else {
			t.Errorf("expected error returned by checker.Packs() to be PackError, got %v", err)
		}
	})
}

func TestUnreferencedBlobs(t *testing.T) {
	WithTestEnvironment(t, checkerTestData, func(repodir string) {
		repo := OpenLocalRepo(t, repodir)

		snID := "51d249d28815200d59e4be7b3f21a157b864dc343353df9d8e498220c2499b02"
		OK(t, repo.Backend().Remove(backend.Snapshot, snID))

		unusedBlobsBySnapshot := backend.IDs{
			ParseID("58c748bbe2929fdf30c73262bd8313fe828f8925b05d1d4a87fe109082acb849"),
			ParseID("988a272ab9768182abfd1fe7d7a7b68967825f0b861d3b36156795832c772235"),
			ParseID("c01952de4d91da1b1b80bc6e06eaa4ec21523f4853b69dc8231708b9b7ec62d8"),
			ParseID("bec3a53d7dc737f9a9bee68b107ec9e8ad722019f649b34d474b9982c3a3fec7"),
			ParseID("2a6f01e5e92d8343c4c6b78b51c5a4dc9c39d42c04e26088c7614b13d8d0559d"),
			ParseID("18b51b327df9391732ba7aaf841a4885f350d8a557b2da8352c9acf8898e3f10"),
		}

		sort.Sort(unusedBlobsBySnapshot)

		chkr := checker.New(repo)
		OK(t, chkr.LoadIndex())

		OKs(t, checkPacks(chkr))
		OKs(t, checkStruct(chkr))

		blobs := chkr.UnusedBlobs()
		sort.Sort(blobs)

		Equals(t, unusedBlobsBySnapshot, blobs)
	})
}

Binary file not shown.

cmd/restic/cmd_check.go (new file, 106 lines)

@@ -0,0 +1,106 @@
package main

import (
	"errors"
	"fmt"
	"os"

	"github.com/restic/restic/backend"
	"github.com/restic/restic/checker"
)

type CmdCheck struct {
	ReadData       bool `long:"read-data" description:"Read data blobs" default:"false"`
	RemoveOrphaned bool `long:"remove" description:"Remove data that isn't used" default:"false"`

	global *GlobalOptions
}

func init() {
	_, err := parser.AddCommand("check",
		"check the repository",
		"The check command checks the integrity and consistency of the repository",
		&CmdCheck{global: &globalOpts})
	if err != nil {
		panic(err)
	}
}

func (cmd CmdCheck) Usage() string {
	return "[check-options]"
}

func (cmd CmdCheck) Execute(args []string) error {
	if len(args) != 0 {
		return errors.New("check has no arguments")
	}

	repo, err := cmd.global.OpenRepository()
	if err != nil {
		return err
	}

	cmd.global.Verbosef("Create exclusive lock for repository\n")
	lock, err := lockRepoExclusive(repo)
	defer unlockRepo(lock)
	if err != nil {
		return err
	}

	chkr := checker.New(repo)

	cmd.global.Verbosef("Load indexes\n")
	if err = chkr.LoadIndex(); err != nil {
		return err
	}

	done := make(chan struct{})
	defer close(done)

	errorsFound := false
	errChan := make(chan error)

	cmd.global.Verbosef("Check all packs\n")
	go chkr.Packs(errChan, done)

	foundOrphanedPacks := false
	for err := range errChan {
		errorsFound = true
		fmt.Fprintf(os.Stderr, "%v\n", err)

		if e, ok := err.(checker.PackError); ok && e.Orphaned {
			foundOrphanedPacks = true
		}
	}

	cmd.global.Verbosef("Check snapshots, trees and blobs\n")
	errChan = make(chan error)
	go chkr.Structure(errChan, done)

	for err := range errChan {
		errorsFound = true
		fmt.Fprintf(os.Stderr, "error: %v\n", err)
	}

	for _, id := range chkr.UnusedBlobs() {
		cmd.global.Verbosef("unused blob %v\n", id.Str())
	}

	if foundOrphanedPacks && cmd.RemoveOrphaned {
		IDs := chkr.OrphanedPacks()
		cmd.global.Verbosef("Remove %d orphaned packs... ", len(IDs))

		for _, id := range IDs {
			if err := repo.Backend().Remove(backend.Data, id.String()); err != nil {
				fmt.Fprintf(os.Stderr, "%v\n", err)
			}
		}

		cmd.global.Verbosef("done\n")
	}

	if errorsFound {
		return errors.New("repository contains errors")
	}

	return nil
}


@@ -1,272 +0,0 @@
package main

import (
	"errors"
	"fmt"
	"os"

	"github.com/restic/restic"
	"github.com/restic/restic/backend"
	"github.com/restic/restic/crypto"
	"github.com/restic/restic/debug"
	"github.com/restic/restic/pack"
	"github.com/restic/restic/repository"
)

type CmdFsck struct {
	CheckData      bool   `long:"check-data" description:"Read data blobs" default:"false"`
	Snapshot       string `short:"s" long:"snapshot" description:"Only check this snapshot"`
	Orphaned       bool   `short:"o" long:"orphaned" description:"Check for orphaned blobs"`
	RemoveOrphaned bool   `short:"r" long:"remove-orphaned" description:"Remove orphaned blobs (implies -o)"`

	global *GlobalOptions

	// lists checking for orphaned blobs
	o_data  *backend.IDSet
	o_trees *backend.IDSet
}

func init() {
	_, err := parser.AddCommand("fsck",
		"check the repository",
		"The fsck command checks the integrity and consistency of the repository",
		&CmdFsck{global: &globalOpts})
	if err != nil {
		panic(err)
	}
}

func fsckFile(global CmdFsck, repo *repository.Repository, IDs []backend.ID) (uint64, error) {
	debug.Log("restic.fsckFile", "checking file %v", IDs)
	var bytes uint64

	for _, id := range IDs {
		debug.Log("restic.fsck", " checking data blob %v\n", id)

		// test if blob is in the index
		packID, tpe, _, length, err := repo.Index().Lookup(id)
		if err != nil {
			return 0, fmt.Errorf("storage for blob %v (%v) not found", id, tpe)
		}

		bytes += uint64(length - crypto.Extension)
		debug.Log("restic.fsck", " blob found in pack %v\n", packID)

		if global.CheckData {
			// load content
			_, err := repo.LoadBlob(pack.Data, id)
			if err != nil {
				return 0, err
			}
		} else {
			// test if pack for data blob is there
			ok, err := repo.Backend().Test(backend.Data, packID.String())
			if err != nil {
				return 0, err
			}

			if !ok {
				return 0, fmt.Errorf("data blob %v not found", id)
			}
		}

		// if orphan check is active, record storage id
		if global.o_data != nil {
			debug.Log("restic.fsck", " recording blob %v as used\n", id)
			global.o_data.Insert(id)
		}
	}

	return bytes, nil
}

func fsckTree(global CmdFsck, repo *repository.Repository, id backend.ID) error {
	debug.Log("restic.fsckTree", "checking tree %v", id.Str())

	tree, err := restic.LoadTree(repo, id)
	if err != nil {
		return err
	}

	// if orphan check is active, record storage id
	if global.o_trees != nil {
		// add ID to list
		global.o_trees.Insert(id)
	}

	var firstErr error

	seenIDs := backend.NewIDSet()

	for i, node := range tree.Nodes {
		if node.Name == "" {
			return fmt.Errorf("node %v of tree %v has no name", i, id.Str())
		}

		if node.Type == "" {
			return fmt.Errorf("node %q of tree %v has no type", node.Name, id.Str())
		}

		switch node.Type {
		case "file":
			if node.Content == nil {
				debug.Log("restic.fsckTree", "file node %q of tree %v has no content: %v", node.Name, id, node)
				return fmt.Errorf("file node %q of tree %v has no content: %v", node.Name, id, node)
			}

			if node.Content == nil && node.Error == "" {
				debug.Log("restic.fsckTree", "file node %q of tree %v has no content", node.Name, id)
				return fmt.Errorf("file node %q of tree %v has no content", node.Name, id)
			}

			// record ids
			for _, id := range node.Content {
				seenIDs.Insert(id)
			}

			debug.Log("restic.fsckTree", "check file %v (%v)", node.Name, id.Str())
			bytes, err := fsckFile(global, repo, node.Content)
			if err != nil {
				return err
			}

			if bytes != node.Size {
				debug.Log("restic.fsckTree", "file node %q of tree %v has size %d, but only %d bytes could be found", node.Name, id, node.Size, bytes)
				return fmt.Errorf("file node %q of tree %v has size %d, but only %d bytes could be found", node.Name, id, node.Size, bytes)
			}
		case "dir":
			if node.Subtree == nil {
				return fmt.Errorf("dir node %q of tree %v has no subtree", node.Name, id)
			}

			// record id
			seenIDs.Insert(node.Subtree)

			err = fsckTree(global, repo, node.Subtree)
			if err != nil {
				firstErr = err
				fmt.Fprintf(os.Stderr, "%v\n", err)
			}
		}
	}

	// check map for unused ids
	// for _, id := range tree.Map.IDs() {
	// 	if seenIDs.Find(id) != nil {
	// 		return fmt.Errorf("tree %v: map contains unused ID %v", id, id)
	// 	}
	// }

	return firstErr
}

func fsckSnapshot(global CmdFsck, repo *repository.Repository, id backend.ID) error {
	debug.Log("restic.fsck", "checking snapshot %v\n", id)

	sn, err := restic.LoadSnapshot(repo, id)
	if err != nil {
		return fmt.Errorf("loading snapshot %v failed: %v", id, err)
	}

	err = fsckTree(global, repo, sn.Tree)
	if err != nil {
		debug.Log("restic.fsck", " checking tree %v for snapshot %v\n", sn.Tree, id)
		fmt.Fprintf(os.Stderr, "snapshot %v:\n error for tree %v:\n %v\n", id, sn.Tree, err)
	}

	return err
}

func (cmd CmdFsck) Usage() string {
	return "[fsck-options]"
}

func (cmd CmdFsck) Execute(args []string) error {
	if len(args) != 0 {
		return errors.New("fsck has no arguments")
	}

	if cmd.RemoveOrphaned && !cmd.Orphaned {
		cmd.Orphaned = true
	}

	repo, err := cmd.global.OpenRepository()
	if err != nil {
		return err
	}

	lock, err := lockRepoExclusive(repo)
	defer unlockRepo(lock)
	if err != nil {
		return err
	}

	err = repo.LoadIndex()
	if err != nil {
		return err
	}

	if cmd.Snapshot != "" {
		id, err := restic.FindSnapshot(repo, cmd.Snapshot)
		if err != nil {
			return fmt.Errorf("invalid id %q: %v", cmd.Snapshot, err)
		}

		err = fsckSnapshot(cmd, repo, id)
		if err != nil {
			fmt.Fprintf(os.Stderr, "check for snapshot %v failed\n", id)
		}

		return err
	}

	if cmd.Orphaned {
		cmd.o_data = backend.NewIDSet()
		cmd.o_trees = backend.NewIDSet()
	}

	done := make(chan struct{})
	defer close(done)

	var firstErr error
	for id := range repo.List(backend.Snapshot, done) {
		err = fsckSnapshot(cmd, repo, id)
		if err != nil {
			fmt.Fprintf(os.Stderr, "check for snapshot %v failed\n", id)
			firstErr = err
		}
	}

	if !cmd.Orphaned {
		return firstErr
	}

	debug.Log("restic.fsck", "starting orphaned check\n")

	cnt := make(map[pack.BlobType]*backend.IDSet)
	cnt[pack.Data] = cmd.o_data
	cnt[pack.Tree] = cmd.o_trees

	for blob := range repo.Index().Each(done) {
		debug.Log("restic.fsck", "checking %v blob %v\n", blob.Type, blob.ID)

		err = cnt[blob.Type].Find(blob.ID)
		if err != nil {
			debug.Log("restic.fsck", " blob %v is orphaned\n", blob.ID)

			if !cmd.RemoveOrphaned {
				fmt.Printf("orphaned %v blob %v\n", blob.Type, blob.ID)
				continue
			}

			fmt.Printf("removing orphaned %v blob %v\n", blob.Type, blob.ID)
			// err := s.Remove(d.tpe, name)
			// if err != nil {
			// 	return err
			// }
			return errors.New("not implemented")
		}
	}

	return firstErr
}


@@ -8,7 +8,6 @@ import (
 	"io"
 	"io/ioutil"
 	"os"
-	"os/exec"
 	"path/filepath"
 	"regexp"
 	"syscall"
@@ -20,20 +19,6 @@ import (
 	. "github.com/restic/restic/test"
 )

-func setupTarTestFixture(t testing.TB, outputDir, tarFile string) {
-	err := system("sh", "-c", `(cd "$1" && tar xz) < "$2"`,
-		"sh", outputDir, tarFile)
-	OK(t, err)
-}
-
-func system(command string, args ...string) error {
-	cmd := exec.Command(command, args...)
-	cmd.Stdout = os.Stdout
-	cmd.Stderr = os.Stderr
-	return cmd.Run()
-}
-
@@ -83,8 +68,8 @@ func cmdRestore(t testing.TB, global GlobalOptions, dir string, snapshotID backe
 	cmd.Execute(append([]string{snapshotID.String(), dir}, args...))
 }

-func cmdFsck(t testing.TB, global GlobalOptions) {
-	cmd := &CmdFsck{global: &global, CheckData: true, Orphaned: true}
+func cmdCheck(t testing.TB, global GlobalOptions) {
+	cmd := &CmdCheck{global: &global, ReadData: true}
 	OK(t, cmd.Execute(nil))
 }
@@ -101,7 +86,7 @@ func TestBackup(t *testing.T) {
 	cmdInit(t, global)

-	setupTarTestFixture(t, env.testdata, datafile)
+	SetupTarTestFixture(t, env.testdata, datafile)

 	// first backup
 	cmdBackup(t, global, []string{env.testdata}, nil)
@@ -109,7 +94,7 @@ func TestBackup(t *testing.T) {
 	Assert(t, len(snapshotIDs) == 1,
 		"expected one snapshot, got %v", snapshotIDs)

-	cmdFsck(t, global)
+	cmdCheck(t, global)
 	stat1 := dirStats(env.repo)

 	// second backup, implicit incremental
@@ -124,7 +109,7 @@ func TestBackup(t *testing.T) {
 	}
 	t.Logf("repository grown by %d bytes", stat2.size-stat1.size)

-	cmdFsck(t, global)
+	cmdCheck(t, global)

 	// third backup, explicit incremental
 	cmdBackup(t, global, []string{env.testdata}, snapshotIDs[0])
 	snapshotIDs = cmdList(t, global, "snapshots")
@@ -146,7 +131,7 @@ func TestBackup(t *testing.T) {
 			"directories are not equal")
 	}

-	cmdFsck(t, global)
+	cmdCheck(t, global)
 	})
 }
@@ -161,7 +146,7 @@ func TestBackupNonExistingFile(t *testing.T) {
 	OK(t, err)
 	OK(t, fd.Close())

-	setupTarTestFixture(t, env.testdata, datafile)
+	SetupTarTestFixture(t, env.testdata, datafile)

 	cmdInit(t, global)
@@ -189,7 +174,7 @@ func TestBackupMissingFile1(t *testing.T) {
 	OK(t, err)
 	OK(t, fd.Close())

-	setupTarTestFixture(t, env.testdata, datafile)
+	SetupTarTestFixture(t, env.testdata, datafile)

 	cmdInit(t, global)
@@ -208,7 +193,7 @@ func TestBackupMissingFile1(t *testing.T) {
 	})

 	cmdBackup(t, global, []string{env.testdata}, nil)
-	cmdFsck(t, global)
+	cmdCheck(t, global)

 	Assert(t, ranHook, "hook did not run")
 	debug.RemoveHook("pipe.walk1")
@@ -226,7 +211,7 @@ func TestBackupMissingFile2(t *testing.T) {
 	OK(t, err)
 	OK(t, fd.Close())

-	setupTarTestFixture(t, env.testdata, datafile)
+	SetupTarTestFixture(t, env.testdata, datafile)

 	cmdInit(t, global)
@@ -245,7 +230,7 @@ func TestBackupMissingFile2(t *testing.T) {
 	})

 	cmdBackup(t, global, []string{env.testdata}, nil)
-	cmdFsck(t, global)
+	cmdCheck(t, global)

 	Assert(t, ranHook, "hook did not run")
 	debug.RemoveHook("pipe.walk2")
@@ -290,13 +275,13 @@ func TestIncrementalBackup(t *testing.T) {
 	OK(t, appendRandomData(testfile, incrementalFirstWrite))

 	cmdBackup(t, global, []string{datadir}, nil)
-	cmdFsck(t, global)
+	cmdCheck(t, global)
 	stat1 := dirStats(env.repo)

 	OK(t, appendRandomData(testfile, incrementalSecondWrite))

 	cmdBackup(t, global, []string{datadir}, nil)
-	cmdFsck(t, global)
+	cmdCheck(t, global)
 	stat2 := dirStats(env.repo)
 	if stat2.size-stat1.size > incrementalFirstWrite {
 		t.Errorf("repository size has grown by more than %d bytes", incrementalFirstWrite)
@@ -306,7 +291,7 @@ func TestIncrementalBackup(t *testing.T) {
 	OK(t, appendRandomData(testfile, incrementalThirdWrite))

 	cmdBackup(t, global, []string{datadir}, nil)
-	cmdFsck(t, global)
+	cmdCheck(t, global)
 	stat3 := dirStats(env.repo)
 	if stat3.size-stat2.size > incrementalFirstWrite {
 		t.Errorf("repository size has grown by more than %d bytes", incrementalFirstWrite)
@@ -387,7 +372,7 @@ func TestKeyAddRemove(t *testing.T) {
 	t.Logf("testing access with last password %q\n", global.password)
 	cmdKey(t, global, "list")

-	cmdFsck(t, global)
+	cmdCheck(t, global)
 	})
 }
@@ -425,7 +410,7 @@ func TestRestoreFilter(t *testing.T) {
 	}

 	cmdBackup(t, global, []string{env.testdata}, nil)
-	cmdFsck(t, global)
+	cmdCheck(t, global)

 	snapshotID := cmdList(t, global, "snapshots")[0]
@@ -471,7 +456,7 @@ func TestRestoreNoMetadataOnIgnoredIntermediateDirs(t *testing.T) {
 	OK(t, setZeroModTime(filepath.Join(env.testdata, "subdir1", "subdir2")))

 	cmdBackup(t, global, []string{env.testdata}, nil)
-	cmdFsck(t, global)
+	cmdCheck(t, global)

 	snapshotID := cmdList(t, global, "snapshots")[0]


@@ -2,7 +2,6 @@ package repository

 import (
 	"encoding/json"
-	"errors"
 	"fmt"
 	"io"
 	"sync"
@@ -79,7 +78,7 @@ func (idx *Index) Lookup(id backend.ID) (packID backend.ID, tpe pack.BlobType, o
 	}

 	debug.Log("Index.Lookup", "id %v not found", id.Str())
-	return nil, pack.Data, 0, 0, errors.New("id not found")
+	return nil, pack.Data, 0, 0, fmt.Errorf("id %v not found in index", id)
 }
@@ -108,13 +107,19 @@ func (idx *Index) Merge(other *Index) {
 	debug.Log("Index.Merge", "done merging index")
 }

+// PackedBlob is a blob already saved within a pack.
+type PackedBlob struct {
+	pack.Blob
+	PackID backend.ID
+}
+
 // Each returns a channel that yields all blobs known to the index. If done is
 // closed, the background goroutine terminates. This blocks any modification of
 // the index.
-func (idx *Index) Each(done chan struct{}) <-chan pack.Blob {
+func (idx *Index) Each(done chan struct{}) <-chan PackedBlob {
 	idx.m.Lock()

-	ch := make(chan pack.Blob)
+	ch := make(chan PackedBlob)

 	go func() {
 		defer idx.m.Unlock()
@@ -132,11 +137,14 @@ func (idx *Index) Each(done chan struct{}) <-chan pack.Blob {
 			select {
 			case <-done:
 				return
-			case ch <- pack.Blob{
-				ID:     id,
-				Offset: blob.offset,
-				Type:   blob.tpe,
-				Length: uint32(blob.length),
+			case ch <- PackedBlob{
+				Blob: pack.Blob{
+					ID:     id,
+					Offset: blob.offset,
+					Type:   blob.tpe,
+					Length: uint32(blob.length),
+				},
+				PackID: blob.packID,
 			}:
 			}
 		}


@@ -35,6 +35,21 @@ func OK(tb testing.TB, err error) {
 	}
 }

+// OKs fails the test if any error from errs is not nil.
+func OKs(tb testing.TB, errs []error) {
+	errFound := false
+	for _, err := range errs {
+		if err != nil {
+			errFound = true
+			_, file, line, _ := runtime.Caller(1)
+			fmt.Printf("\033[31m%s:%d: unexpected error: %s\033[39m\n\n", filepath.Base(file), line, err.Error())
+		}
+	}
+	if errFound {
+		tb.FailNow()
+	}
+}
+
 // Equals fails the test if exp is not equal to act.
 func Equals(tb testing.TB, exp, act interface{}) {
 	if !reflect.DeepEqual(exp, act) {

tree.go (11 changed lines)

@@ -89,3 +89,14 @@ func (t Tree) Find(name string) (*Node, error) {
 	_, node, err := t.binarySearch(name)
 	return node, err
 }
+
+// Subtrees returns a slice of all subtree IDs of the tree.
+func (t Tree) Subtrees() (trees backend.IDs) {
+	for _, node := range t.Nodes {
+		if node.Type == "dir" && node.Subtree != nil {
+			trees = append(trees, node.Subtree)
+		}
+	}
+
+	return trees
+}

walk.go (19 changed lines)

@@ -21,7 +21,11 @@ func walkTree(repo *repository.Repository, path string, treeID backend.ID, done
 	t, err := LoadTree(repo, treeID)
 	if err != nil {
-		jobCh <- WalkTreeJob{Path: path, Error: err}
+		select {
+		case jobCh <- WalkTreeJob{Path: path, Error: err}:
+		case <-done:
+			return
+		}
 		return
 	}
@@ -30,11 +34,20 @@ func walkTree(repo *repository.Repository, path string, treeID backend.ID, done
 		if node.Type == "dir" {
 			walkTree(repo, p, node.Subtree, done, jobCh)
 		} else {
-			jobCh <- WalkTreeJob{Path: p, Node: node}
+			select {
+			case jobCh <- WalkTreeJob{Path: p, Node: node}:
+			case <-done:
+				return
+			}
 		}
 	}

-	jobCh <- WalkTreeJob{Path: path, Tree: t}
+	select {
+	case jobCh <- WalkTreeJob{Path: path, Tree: t}:
+	case <-done:
+		return
+	}

 	debug.Log("walkTree", "done for %q (%v)", path, treeID.Str())
 }