Merge pull request #1770 from restic/handle-blob-save-errors

archiver: Correct error handling
This commit is contained in:
Alexander Neumann 2018-05-10 22:36:00 +02:00
commit 580f90d745
6 changed files with 159 additions and 119 deletions

View file

@ -13,6 +13,7 @@ import (
"github.com/restic/restic/internal/errors"
"github.com/restic/restic/internal/fs"
"github.com/restic/restic/internal/restic"
tomb "gopkg.in/tomb.v2"
)
// SelectFunc returns true for all items that should be included (files and
@ -131,37 +132,16 @@ func New(repo restic.Repository, fs fs.FS, opts Options) *Archiver {
return arch
}
// Valid returns an error if anything is missing.
func (arch *Archiver) Valid() error {
if arch.blobSaver == nil {
return errors.New("blobSaver is nil")
}
if arch.fileSaver == nil {
return errors.New("fileSaver is nil")
}
if arch.Repo == nil {
return errors.New("repo is not set")
}
if arch.Select == nil {
return errors.New("Select is not set")
}
if arch.FS == nil {
return errors.New("FS is not set")
}
return nil
}
// error calls arch.Error if it is set.
// error calls arch.Error if it is set and the error is different from context.Canceled.
func (arch *Archiver) error(item string, fi os.FileInfo, err error) error {
if arch.Error == nil || err == nil {
return err
}
if err == context.Canceled {
return err
}
errf := arch.Error(item, fi, err)
if err != errf {
debug.Log("item %v: error was filtered by handler, before: %q, after: %v", item, err, errf)
@ -184,10 +164,8 @@ func (arch *Archiver) saveTree(ctx context.Context, t *restic.Tree) (restic.ID,
b := &Buffer{Data: buf}
res := arch.blobSaver.Save(ctx, restic.TreeBlob, b)
if res.Err() != nil {
return restic.ID{}, s, res.Err()
}
res.Wait(ctx)
if !res.Known() {
s.TreeBlobs++
s.TreeSize += uint64(len(buf))
@ -238,6 +216,11 @@ func (arch *Archiver) SaveDir(ctx context.Context, snPath string, fi os.FileInfo
nodes := make([]FutureNode, 0, len(names))
for _, name := range names {
// test if context has been cancelled
if ctx.Err() != nil {
return FutureTree{}, ctx.Err()
}
pathname := arch.FS.Join(dir, name)
oldNode := previous.Find(name)
snItem := join(snPath, name)
@ -299,7 +282,6 @@ func (fn *FutureNode) wait(ctx context.Context) {
case fn.isDir:
// wait for and collect the data for the dir
fn.node = fn.dir.Node()
fn.err = fn.dir.Err()
fn.stats = fn.dir.Stats()
// ensure the other stuff can be garbage-collected
@ -496,6 +478,10 @@ func (arch *Archiver) SaveTree(ctx context.Context, snPath string, atree *Tree,
futureNodes := make(map[string]FutureNode)
for name, subatree := range atree.Nodes {
// test if context has been cancelled
if ctx.Err() != nil {
return nil, ctx.Err()
}
// this is a leaf node
if subatree.Path != "" {
@ -722,10 +708,10 @@ func (arch *Archiver) loadParentTree(ctx context.Context, snapshotID restic.ID)
}
// runWorkers starts the worker pools, which are stopped when the context is cancelled.
func (arch *Archiver) runWorkers(ctx context.Context) {
arch.blobSaver = NewBlobSaver(ctx, arch.Repo, arch.Options.SaveBlobConcurrency)
func (arch *Archiver) runWorkers(ctx context.Context, t *tomb.Tomb) {
arch.blobSaver = NewBlobSaver(ctx, t, arch.Repo, arch.Options.SaveBlobConcurrency)
arch.fileSaver = NewFileSaver(ctx,
arch.fileSaver = NewFileSaver(ctx, t,
arch.FS,
arch.blobSaver,
arch.Repo.Config().ChunkerPolynomial,
@ -733,21 +719,11 @@ func (arch *Archiver) runWorkers(ctx context.Context) {
arch.fileSaver.CompleteBlob = arch.CompleteBlob
arch.fileSaver.NodeFromFileInfo = arch.nodeFromFileInfo
arch.treeSaver = NewTreeSaver(ctx, arch.Options.SaveTreeConcurrency, arch.saveTree, arch.error)
arch.treeSaver = NewTreeSaver(ctx, t, arch.Options.SaveTreeConcurrency, arch.saveTree, arch.error)
}
// Snapshot saves several targets and returns a snapshot.
func (arch *Archiver) Snapshot(ctx context.Context, targets []string, opts SnapshotOptions) (*restic.Snapshot, restic.ID, error) {
workerCtx, cancel := context.WithCancel(ctx)
defer cancel()
arch.runWorkers(workerCtx)
err := arch.Valid()
if err != nil {
return nil, restic.ID{}, err
}
cleanTargets, err := resolveRelativeTargets(arch.FS, targets)
if err != nil {
return nil, restic.ID{}, err
@ -758,14 +734,32 @@ func (arch *Archiver) Snapshot(ctx context.Context, targets []string, opts Snaps
return nil, restic.ID{}, err
}
var t tomb.Tomb
wctx := t.Context(ctx)
arch.runWorkers(wctx, &t)
start := time.Now()
tree, err := arch.SaveTree(ctx, "/", atree, arch.loadParentTree(ctx, opts.ParentSnapshot))
if err != nil {
return nil, restic.ID{}, err
debug.Log("starting snapshot")
rootTreeID, stats, err := func() (restic.ID, ItemStats, error) {
tree, err := arch.SaveTree(wctx, "/", atree, arch.loadParentTree(wctx, opts.ParentSnapshot))
if err != nil {
return restic.ID{}, ItemStats{}, err
}
return arch.saveTree(wctx, tree)
}()
debug.Log("saved tree, error: %v", err)
t.Kill(nil)
werr := t.Wait()
if err != nil && errors.Cause(err) == context.Canceled {
err = werr
}
rootTreeID, stats, err := arch.saveTree(ctx, tree)
if err != nil {
debug.Log("error while saving tree: %v", err)
return nil, restic.ID{}, err
}

View file

@ -17,6 +17,7 @@ import (
"github.com/restic/restic/internal/repository"
"github.com/restic/restic/internal/restic"
restictest "github.com/restic/restic/internal/test"
tomb "gopkg.in/tomb.v2"
)
func prepareTempdirRepoSrc(t testing.TB, src TestDir) (tempdir string, repo restic.Repository, cleanup func()) {
@ -34,11 +35,11 @@ func prepareTempdirRepoSrc(t testing.TB, src TestDir) (tempdir string, repo rest
}
func saveFile(t testing.TB, repo restic.Repository, filename string, filesystem fs.FS) (*restic.Node, ItemStats) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
var tmb tomb.Tomb
ctx := tmb.Context(context.Background())
arch := New(repo, filesystem, Options{})
arch.runWorkers(ctx)
arch.runWorkers(ctx, &tmb)
var (
completeCallbackNode *restic.Node
@ -73,6 +74,12 @@ func saveFile(t testing.TB, repo restic.Repository, filename string, filesystem
t.Fatal(res.Err())
}
tmb.Kill(nil)
err = tmb.Wait()
if err != nil {
t.Fatal(err)
}
err = repo.Flush(ctx)
if err != nil {
t.Fatal(err)
@ -586,14 +593,14 @@ func TestArchiverSaveDir(t *testing.T) {
for _, test := range tests {
t.Run("", func(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
var tmb tomb.Tomb
ctx := tmb.Context(context.Background())
tempdir, repo, cleanup := prepareTempdirRepoSrc(t, test.src)
defer cleanup()
arch := New(repo, fs.Track{fs.Local{}}, Options{})
arch.runWorkers(ctx)
arch.runWorkers(ctx, &tmb)
chdir := tempdir
if test.chdir != "" {
@ -613,7 +620,10 @@ func TestArchiverSaveDir(t *testing.T) {
t.Fatal(err)
}
node, stats, err := ft.Node(), ft.Stats(), ft.Err()
node, stats := ft.Node(), ft.Stats()
tmb.Kill(nil)
err = tmb.Wait()
if err != nil {
t.Fatal(err)
}
@ -675,11 +685,11 @@ func TestArchiverSaveDirIncremental(t *testing.T) {
// save the empty directory several times in a row, then have a look if the
// archiver did save the same tree several times
for i := 0; i < 5; i++ {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
var tmb tomb.Tomb
ctx := tmb.Context(context.Background())
arch := New(repo, fs.Track{fs.Local{}}, Options{})
arch.runWorkers(ctx)
arch.runWorkers(ctx, &tmb)
fi, err := fs.Lstat(tempdir)
if err != nil {
@ -691,7 +701,10 @@ func TestArchiverSaveDirIncremental(t *testing.T) {
t.Fatal(err)
}
node, stats, err := ft.Node(), ft.Stats(), ft.Err()
node, stats := ft.Node(), ft.Stats()
tmb.Kill(nil)
err = tmb.Wait()
if err != nil {
t.Fatal(err)
}
@ -828,8 +841,8 @@ func TestArchiverSaveTree(t *testing.T) {
for _, test := range tests {
t.Run("", func(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
var tmb tomb.Tomb
ctx := tmb.Context(context.Background())
tempdir, repo, cleanup := prepareTempdirRepoSrc(t, test.src)
defer cleanup()
@ -837,7 +850,7 @@ func TestArchiverSaveTree(t *testing.T) {
testFS := fs.Track{fs.Local{}}
arch := New(repo, testFS, Options{})
arch.runWorkers(ctx)
arch.runWorkers(ctx, &tmb)
back := fs.TestChdir(t, tempdir)
defer back()
@ -861,6 +874,12 @@ func TestArchiverSaveTree(t *testing.T) {
t.Fatal(err)
}
tmb.Kill(nil)
err = tmb.Wait()
if err != nil {
t.Fatal(err)
}
err = repo.Flush(ctx)
if err != nil {
t.Fatal(err)

View file

@ -4,6 +4,8 @@ import (
"context"
"sync"
"github.com/restic/restic/internal/debug"
"github.com/restic/restic/internal/errors"
"github.com/restic/restic/internal/restic"
)
@ -21,12 +23,11 @@ type BlobSaver struct {
knownBlobs restic.BlobSet
ch chan<- saveBlobJob
wg sync.WaitGroup
}
// NewBlobSaver returns a new blob. A worker pool is started, it is stopped
// when ctx is cancelled.
func NewBlobSaver(ctx context.Context, repo Saver, workers uint) *BlobSaver {
func NewBlobSaver(ctx context.Context, g Goer, repo Saver, workers uint) *BlobSaver {
ch := make(chan saveBlobJob)
s := &BlobSaver{
repo: repo,
@ -35,8 +36,9 @@ func NewBlobSaver(ctx context.Context, repo Saver, workers uint) *BlobSaver {
}
for i := uint(0); i < workers; i++ {
s.wg.Add(1)
go s.worker(ctx, &s.wg, ch)
g.Go(func() error {
return s.worker(ctx, ch)
})
}
return s
@ -47,7 +49,13 @@ func NewBlobSaver(ctx context.Context, repo Saver, workers uint) *BlobSaver {
// previously unknown.
func (s *BlobSaver) Save(ctx context.Context, t restic.BlobType, buf *Buffer) FutureBlob {
ch := make(chan saveBlobResponse, 1)
s.ch <- saveBlobJob{BlobType: t, buf: buf, ch: ch}
select {
case s.ch <- saveBlobJob{BlobType: t, buf: buf, ch: ch}:
case <-ctx.Done():
debug.Log("not sending job, context is cancelled")
close(ch)
return FutureBlob{ch: ch}
}
return FutureBlob{ch: ch, length: len(buf.Data)}
}
@ -59,31 +67,28 @@ type FutureBlob struct {
res saveBlobResponse
}
func (s *FutureBlob) wait() {
res, ok := <-s.ch
if ok {
s.res = res
// Wait blocks until the result is available or the context is cancelled.
func (s *FutureBlob) Wait(ctx context.Context) {
select {
case <-ctx.Done():
return
case res, ok := <-s.ch:
if ok {
s.res = res
}
}
}
// ID returns the ID of the blob after it has been saved.
func (s *FutureBlob) ID() restic.ID {
s.wait()
return s.res.id
}
// Known returns whether or not the blob was already known.
func (s *FutureBlob) Known() bool {
s.wait()
return s.res.known
}
// Err returns the error which may have occurred during save.
func (s *FutureBlob) Err() error {
s.wait()
return s.res.err
}
// Length returns the length of the blob.
func (s *FutureBlob) Length() int {
return s.length
@ -98,10 +103,9 @@ type saveBlobJob struct {
type saveBlobResponse struct {
id restic.ID
known bool
err error
}
func (s *BlobSaver) saveBlob(ctx context.Context, t restic.BlobType, buf []byte) saveBlobResponse {
func (s *BlobSaver) saveBlob(ctx context.Context, t restic.BlobType, buf []byte) (saveBlobResponse, error) {
id := restic.Hash(buf)
h := restic.BlobHandle{ID: id, Type: t}
@ -121,7 +125,7 @@ func (s *BlobSaver) saveBlob(ctx context.Context, t restic.BlobType, buf []byte)
return saveBlobResponse{
id: id,
known: true,
}
}, nil
}
// check if the repo knows this blob
@ -129,29 +133,38 @@ func (s *BlobSaver) saveBlob(ctx context.Context, t restic.BlobType, buf []byte)
return saveBlobResponse{
id: id,
known: true,
}
}, nil
}
// otherwise we're responsible for saving it
_, err := s.repo.SaveBlob(ctx, t, buf, id)
if err != nil {
return saveBlobResponse{}, errors.Fatalf("unable to save data: %v", err)
}
return saveBlobResponse{
id: id,
known: false,
err: err,
}
}, nil
}
func (s *BlobSaver) worker(ctx context.Context, wg *sync.WaitGroup, jobs <-chan saveBlobJob) {
defer wg.Done()
func (s *BlobSaver) worker(ctx context.Context, jobs <-chan saveBlobJob) error {
for {
var job saveBlobJob
select {
case <-ctx.Done():
return
debug.Log("context is cancelled, exiting: %v", ctx.Err())
return nil
case job = <-jobs:
}
job.ch <- s.saveBlob(ctx, job.BlobType, job.buf.Data)
res, err := s.saveBlob(ctx, job.BlobType, job.buf.Data)
if err != nil {
debug.Log("saveBlob returned error: %v", err)
close(job.ch)
return err
}
job.ch <- res
close(job.ch)
job.buf.Release()
}

View file

@ -4,7 +4,6 @@ import (
"context"
"io"
"os"
"sync"
"github.com/restic/chunker"
"github.com/restic/restic/internal/debug"
@ -13,6 +12,11 @@ import (
"github.com/restic/restic/internal/restic"
)
// Goer starts a function in a goroutine.
type Goer interface {
Go(func() error)
}
// FutureFile is returned by Save and will return the data once it
// has been processed.
type FutureFile struct {
@ -54,7 +58,6 @@ type FileSaver struct {
pol chunker.Pol
ch chan<- saveFileJob
wg sync.WaitGroup
CompleteBlob func(filename string, bytes uint64)
@ -63,7 +66,7 @@ type FileSaver struct {
// NewFileSaver returns a new file saver. A worker pool with fileWorkers is
// started, it is stopped when ctx is cancelled.
func NewFileSaver(ctx context.Context, fs fs.FS, blobSaver *BlobSaver, pol chunker.Pol, fileWorkers, blobWorkers uint) *FileSaver {
func NewFileSaver(ctx context.Context, g Goer, fs fs.FS, blobSaver *BlobSaver, pol chunker.Pol, fileWorkers, blobWorkers uint) *FileSaver {
ch := make(chan saveFileJob)
debug.Log("new file saver with %v file workers and %v blob workers", fileWorkers, blobWorkers)
@ -81,8 +84,10 @@ func NewFileSaver(ctx context.Context, fs fs.FS, blobSaver *BlobSaver, pol chunk
}
for i := uint(0); i < fileWorkers; i++ {
s.wg.Add(1)
go s.worker(ctx, &s.wg, ch)
g.Go(func() error {
s.worker(ctx, ch)
return nil
})
}
return s
@ -95,7 +100,7 @@ type CompleteFunc func(*restic.Node, ItemStats)
// file is closed by Save.
func (s *FileSaver) Save(ctx context.Context, snPath string, file fs.File, fi os.FileInfo, start func(), complete CompleteFunc) FutureFile {
ch := make(chan saveFileResponse, 1)
s.ch <- saveFileJob{
job := saveFileJob{
snPath: snPath,
file: file,
fi: fi,
@ -104,6 +109,12 @@ func (s *FileSaver) Save(ctx context.Context, snPath string, file fs.File, fi os
ch: ch,
}
select {
case s.ch <- job:
case <-ctx.Done():
debug.Log("not sending job, context is cancelled: %v", ctx.Err())
}
return FutureFile{ch: ch}
}
@ -189,11 +200,7 @@ func (s *FileSaver) saveFile(ctx context.Context, chnker *chunker.Chunker, snPat
}
for _, res := range results {
// test if the context has been cancelled, return the error
if res.Err() != nil {
return saveFileResponse{err: ctx.Err()}
}
res.Wait(ctx)
if !res.Known() {
stats.DataBlobs++
stats.DataSize += uint64(res.Length())
@ -210,11 +217,10 @@ func (s *FileSaver) saveFile(ctx context.Context, chnker *chunker.Chunker, snPat
}
}
func (s *FileSaver) worker(ctx context.Context, wg *sync.WaitGroup, jobs <-chan saveFileJob) {
func (s *FileSaver) worker(ctx context.Context, jobs <-chan saveFileJob) {
// a worker has one chunker which is reused for each file (because it contains a rather large buffer)
chnker := chunker.New(nil, s.pol)
defer wg.Done()
for {
var job saveFileJob
select {

View file

@ -2,9 +2,9 @@ package archiver
import (
"context"
"sync"
"github.com/restic/restic/internal/debug"
"github.com/restic/restic/internal/errors"
"github.com/restic/restic/internal/restic"
)
@ -34,24 +34,17 @@ func (s *FutureTree) Stats() ItemStats {
return s.res.stats
}
// Err returns the error in case an error occurred.
func (s *FutureTree) Err() error {
s.wait()
return s.res.err
}
// TreeSaver concurrently saves incoming trees to the repo.
type TreeSaver struct {
saveTree func(context.Context, *restic.Tree) (restic.ID, ItemStats, error)
errFn ErrorFunc
ch chan<- saveTreeJob
wg sync.WaitGroup
}
// NewTreeSaver returns a new tree saver. A worker pool with treeWorkers is
// started, it is stopped when ctx is cancelled.
func NewTreeSaver(ctx context.Context, treeWorkers uint, saveTree func(context.Context, *restic.Tree) (restic.ID, ItemStats, error), errFn ErrorFunc) *TreeSaver {
func NewTreeSaver(ctx context.Context, g Goer, treeWorkers uint, saveTree func(context.Context, *restic.Tree) (restic.ID, ItemStats, error), errFn ErrorFunc) *TreeSaver {
ch := make(chan saveTreeJob)
s := &TreeSaver{
@ -61,8 +54,9 @@ func NewTreeSaver(ctx context.Context, treeWorkers uint, saveTree func(context.C
}
for i := uint(0); i < treeWorkers; i++ {
s.wg.Add(1)
go s.worker(ctx, &s.wg, ch)
g.Go(func() error {
return s.worker(ctx, ch)
})
}
return s
@ -71,12 +65,19 @@ func NewTreeSaver(ctx context.Context, treeWorkers uint, saveTree func(context.C
// Save stores the dir d and returns the data once it has been completed.
func (s *TreeSaver) Save(ctx context.Context, snPath string, node *restic.Node, nodes []FutureNode) FutureTree {
ch := make(chan saveTreeResponse, 1)
s.ch <- saveTreeJob{
job := saveTreeJob{
snPath: snPath,
node: node,
nodes: nodes,
ch: ch,
}
select {
case s.ch <- job:
case <-ctx.Done():
debug.Log("refusing to save job, context is cancelled: %v", ctx.Err())
close(ch)
return FutureTree{ch: ch}
}
return FutureTree{ch: ch}
}
@ -91,7 +92,6 @@ type saveTreeJob struct {
type saveTreeResponse struct {
node *restic.Node
stats ItemStats
err error
}
// save stores the nodes as a tree in the repo.
@ -137,21 +137,24 @@ func (s *TreeSaver) save(ctx context.Context, snPath string, node *restic.Node,
return node, stats, nil
}
func (s *TreeSaver) worker(ctx context.Context, wg *sync.WaitGroup, jobs <-chan saveTreeJob) {
defer wg.Done()
func (s *TreeSaver) worker(ctx context.Context, jobs <-chan saveTreeJob) error {
for {
var job saveTreeJob
select {
case <-ctx.Done():
return
return nil
case job = <-jobs:
}
node, stats, err := s.save(ctx, job.snPath, job.node, job.nodes)
if err != nil {
debug.Log("error saving tree blob: %v", err)
return errors.Fatalf("unable to save data: %v", err)
}
job.ch <- saveTreeResponse{
node: node,
stats: stats,
err: err,
}
close(job.ch)
}

View file

@ -262,6 +262,11 @@ func (b *Backup) CompleteItemFn(item string, previous, current *restic.Node, s a
b.summary.Unlock()
if current == nil {
// error occurred, tell the status display to remove the line
b.workerCh <- fileWorkerMessage{
filename: item,
done: true,
}
return
}