Merge pull request #3207 from aawsome/filerestorer-fix-errorhandling

Fix error handling in filerestorer and fix #3166
This commit is contained in:
MichaelEischer 2021-01-31 17:47:46 +01:00 committed by GitHub
commit b3dc127af5
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
5 changed files with 104 additions and 60 deletions

View file

@ -0,0 +1,9 @@
Bugfix: Improve error handling in restore
Restic restore used to not print errors while downloading file contents from
the repository. restore also incorrectly exited with a zero error code even
when there were errors during the restore process. Now, a non-zero code is
returned.
https://github.com/restic/restic/issues/3166
https://github.com/restic/restic/pull/3207

View file

@ -191,14 +191,26 @@ func runRestore(opts RestoreOptions, gopts GlobalOptions, args []string) error {
Verbosef("restoring %s to %s\n", res.Snapshot(), opts.Target) Verbosef("restoring %s to %s\n", res.Snapshot(), opts.Target)
err = res.RestoreTo(ctx, opts.Target) err = res.RestoreTo(ctx, opts.Target)
if err == nil && opts.Verify { if err != nil {
return err
}
if totalErrors > 0 {
return errors.Fatalf("There were %d errors\n", totalErrors)
}
if opts.Verify {
Verbosef("verifying files in %s\n", opts.Target) Verbosef("verifying files in %s\n", opts.Target)
var count int var count int
count, err = res.VerifyFiles(ctx, opts.Target) count, err = res.VerifyFiles(ctx, opts.Target)
if err != nil {
return err
}
if totalErrors > 0 {
return errors.Fatalf("There were %d errors\n", totalErrors)
}
Verbosef("finished verifying %d files in %s\n", count, opts.Target) Verbosef("finished verifying %d files in %s\n", count, opts.Target)
} }
if totalErrors > 0 {
Printf("There were %d errors\n", totalErrors) return nil
}
return err
} }

View file

@ -9,6 +9,8 @@ import (
"sort" "sort"
"sync" "sync"
"golang.org/x/sync/errgroup"
"github.com/restic/restic/internal/crypto" "github.com/restic/restic/internal/crypto"
"github.com/restic/restic/internal/debug" "github.com/restic/restic/internal/debug"
"github.com/restic/restic/internal/errors" "github.com/restic/restic/internal/errors"
@ -23,20 +25,16 @@ import (
const ( const (
workerCount = 8 workerCount = 8
// fileInfo flags
fileProgress = 1
fileError = 2
largeFileBlobCount = 25 largeFileBlobCount = 25
) )
// information about regular file being restored // information about regular file being restored
type fileInfo struct { type fileInfo struct {
lock sync.Mutex lock sync.Mutex
flags int inProgress bool
size int64 size int64
location string // file on local filesystem relative to restorer basedir location string // file on local filesystem relative to restorer basedir
blobs interface{} // blobs of the file blobs interface{} // blobs of the file
} }
type fileBlobInfo struct { type fileBlobInfo struct {
@ -60,6 +58,7 @@ type fileRestorer struct {
dst string dst string
files []*fileInfo files []*fileInfo
Error func(string, error) error
} }
func newFileRestorer(dst string, func newFileRestorer(dst string,
@ -73,6 +72,7 @@ func newFileRestorer(dst string,
packLoader: packLoader, packLoader: packLoader,
filesWriter: newFilesWriter(workerCount), filesWriter: newFilesWriter(workerCount),
dst: dst, dst: dst,
Error: restorerAbortOnAllErrors,
} }
} }
@ -142,47 +142,42 @@ func (r *fileRestorer) restoreFiles(ctx context.Context) error {
} }
} }
var wg sync.WaitGroup wg, ctx := errgroup.WithContext(ctx)
downloadCh := make(chan *packInfo) downloadCh := make(chan *packInfo)
worker := func() {
defer wg.Done() worker := func() error {
for { for pack := range downloadCh {
select { if err := r.downloadPack(ctx, pack); err != nil {
case <-ctx.Done(): return err
return // context cancelled
case pack, ok := <-downloadCh:
if !ok {
return // channel closed
}
r.downloadPack(ctx, pack)
} }
} }
return nil
} }
for i := 0; i < workerCount; i++ { for i := 0; i < workerCount; i++ {
wg.Add(1) wg.Go(worker)
go worker()
} }
// the main restore loop // the main restore loop
for _, id := range packOrder { wg.Go(func() error {
pack := packs[id] for _, id := range packOrder {
select { pack := packs[id]
case <-ctx.Done(): select {
return ctx.Err() case <-ctx.Done():
case downloadCh <- pack: return ctx.Err()
debug.Log("Scheduled download pack %s", pack.id.Str()) case downloadCh <- pack:
debug.Log("Scheduled download pack %s", pack.id.Str())
}
} }
} close(downloadCh)
return nil
})
close(downloadCh) return wg.Wait()
wg.Wait()
return nil
} }
const maxBufferSize = 4 * 1024 * 1024 const maxBufferSize = 4 * 1024 * 1024
func (r *fileRestorer) downloadPack(ctx context.Context, pack *packInfo) { func (r *fileRestorer) downloadPack(ctx context.Context, pack *packInfo) error {
// calculate pack byte range and blob->[]files->[]offsets mappings // calculate pack byte range and blob->[]files->[]offsets mappings
start, end := int64(math.MaxInt64), int64(0) start, end := int64(math.MaxInt64), int64(0)
@ -237,12 +232,11 @@ func (r *fileRestorer) downloadPack(ctx context.Context, pack *packInfo) {
return blobs[sortedBlobs[i]].offset < blobs[sortedBlobs[j]].offset return blobs[sortedBlobs[i]].offset < blobs[sortedBlobs[j]].offset
}) })
markFileError := func(file *fileInfo, err error) { sanitizeError := func(file *fileInfo, err error) error {
file.lock.Lock() if err != nil {
defer file.lock.Unlock() err = r.Error(file.location, err)
if file.flags&fileError == 0 {
file.flags |= fileError
} }
return err
} }
h := restic.Handle{Type: restic.PackFile, Name: pack.id.String()} h := restic.Handle{Type: restic.PackFile, Name: pack.id.String()}
@ -263,7 +257,9 @@ func (r *fileRestorer) downloadPack(ctx context.Context, pack *packInfo) {
blobData, buf, err = r.loadBlob(bufRd, blobID, blob.length, buf) blobData, buf, err = r.loadBlob(bufRd, blobID, blob.length, buf)
if err != nil { if err != nil {
for file := range blob.files { for file := range blob.files {
markFileError(file, err) if errFile := sanitizeError(file, err); errFile != nil {
return errFile
}
} }
continue continue
} }
@ -277,37 +273,36 @@ func (r *fileRestorer) downloadPack(ctx context.Context, pack *packInfo) {
// - should allow concurrent writes to the file // - should allow concurrent writes to the file
// so write the first blob while holding file lock // so write the first blob while holding file lock
// write other blobs after releasing the lock // write other blobs after releasing the lock
file.lock.Lock()
create := file.flags&fileProgress == 0
createSize := int64(-1) createSize := int64(-1)
if create { file.lock.Lock()
defer file.lock.Unlock() if file.inProgress {
file.flags |= fileProgress
createSize = file.size
} else {
file.lock.Unlock() file.lock.Unlock()
} else {
defer file.lock.Unlock()
file.inProgress = true
createSize = file.size
} }
return r.filesWriter.writeToFile(r.targetPath(file.location), blobData, offset, createSize) return r.filesWriter.writeToFile(r.targetPath(file.location), blobData, offset, createSize)
} }
err := writeToFile() err := sanitizeError(file, writeToFile())
if err != nil { if err != nil {
markFileError(file, err) return err
break
} }
} }
} }
} }
return nil return nil
}) })
if err != nil { if err != nil {
for file := range pack.files { for file := range pack.files {
markFileError(file, err) if errFile := sanitizeError(file, err); errFile != nil {
return errFile
}
} }
return
} }
return nil
} }
func (r *fileRestorer) loadBlob(rd io.Reader, blobID restic.ID, length int, buf []byte) ([]byte, []byte, error) { func (r *fileRestorer) loadBlob(rd io.Reader, blobID restic.ID, length int, buf []byte) ([]byte, []byte, error) {

View file

@ -8,6 +8,7 @@ import (
"testing" "testing"
"github.com/restic/restic/internal/crypto" "github.com/restic/restic/internal/crypto"
"github.com/restic/restic/internal/errors"
"github.com/restic/restic/internal/restic" "github.com/restic/restic/internal/restic"
rtest "github.com/restic/restic/internal/test" rtest "github.com/restic/restic/internal/test"
) )
@ -237,3 +238,29 @@ func TestFileRestorerPackSkip(t *testing.T) {
}, },
}, files) }, files)
} }
func TestErrorRestoreFiles(t *testing.T) {
tempdir, cleanup := rtest.TempDir(t)
defer cleanup()
content := []TestFile{
{
name: "file1",
blobs: []TestBlob{
{"data1-1", "pack1-1"},
},
}}
repo := newTestRepo(content)
loadError := errors.New("load error")
// loader always returns an error
repo.loader = func(ctx context.Context, h restic.Handle, length int, offset int64, fn func(rd io.Reader) error) error {
return loadError
}
r := newFileRestorer(tempdir, repo.loader, repo.key, repo.Lookup)
r.files = repo.files
err := r.restoreFiles(context.TODO())
rtest.Equals(t, loadError, err)
}

View file

@ -216,6 +216,7 @@ func (res *Restorer) RestoreTo(ctx context.Context, dst string) error {
idx := restic.NewHardlinkIndex() idx := restic.NewHardlinkIndex()
filerestorer := newFileRestorer(dst, res.repo.Backend().Load, res.repo.Key(), res.repo.Index().Lookup) filerestorer := newFileRestorer(dst, res.repo.Backend().Load, res.repo.Key(), res.repo.Index().Lookup)
filerestorer.Error = res.Error
debug.Log("first pass for %q", dst) debug.Log("first pass for %q", dst)