archiver: Improve error handling

This commit changes how the worker goroutines for saving e.g. blobs
interact. Before, it was possible to get stuck sending an instruction to
archive a file or dir when no worker goroutines were available any more.
This commit introduces a `done` channel for each of the worker pools,
which is set to the channel returned by `tomb.Dying()`, so it is closed
when the first worker returned an error.
This commit is contained in:
Alexander Neumann 2018-05-12 21:40:31 +02:00
parent fcfa6f0355
commit 581c62ee72
9 changed files with 408 additions and 45 deletions

View file

@ -467,7 +467,7 @@ func runBackup(opts BackupOptions, gopts GlobalOptions, term *termstatus.Termina
p.V("start backup on %v", targets)
_, id, err := arch.Snapshot(gopts.ctx, targets, snapshotOpts)
if err != nil {
return err
return errors.Fatalf("unable to save snapshot: %v", err)
}
p.Finish()

View file

@ -271,6 +271,7 @@ func (fn *FutureNode) wait(ctx context.Context) {
switch {
case fn.isFile:
// wait for and collect the data for the file
fn.file.Wait(ctx)
fn.node = fn.file.Node()
fn.err = fn.file.Err()
fn.stats = fn.file.Stats()
@ -281,6 +282,7 @@ func (fn *FutureNode) wait(ctx context.Context) {
case fn.isDir:
// wait for and collect the data for the dir
fn.dir.Wait(ctx)
fn.node = fn.dir.Node()
fn.stats = fn.dir.Stats()
@ -713,13 +715,13 @@ func (arch *Archiver) runWorkers(ctx context.Context, t *tomb.Tomb) {
arch.fileSaver = NewFileSaver(ctx, t,
arch.FS,
arch.blobSaver,
arch.blobSaver.Save,
arch.Repo.Config().ChunkerPolynomial,
arch.Options.FileReadConcurrency, arch.Options.SaveBlobConcurrency)
arch.fileSaver.CompleteBlob = arch.CompleteBlob
arch.fileSaver.NodeFromFileInfo = arch.nodeFromFileInfo
arch.treeSaver = NewTreeSaver(ctx, t, arch.Options.SaveTreeConcurrency, arch.saveTree, arch.error)
arch.treeSaver = NewTreeSaver(ctx, t, arch.Options.SaveTreeConcurrency, arch.saveTree, arch.Error)
}
// Snapshot saves several targets and returns a snapshot.

View file

@ -70,6 +70,8 @@ func saveFile(t testing.TB, repo restic.Repository, filename string, filesystem
}
res := arch.fileSaver.Save(ctx, "/", file, fi, start, complete)
res.Wait(ctx)
if res.Err() != nil {
t.Fatal(res.Err())
}
@ -620,6 +622,7 @@ func TestArchiverSaveDir(t *testing.T) {
t.Fatal(err)
}
ft.Wait(ctx)
node, stats := ft.Node(), ft.Stats()
tmb.Kill(nil)
@ -701,6 +704,7 @@ func TestArchiverSaveDirIncremental(t *testing.T) {
t.Fatal(err)
}
ft.Wait(ctx)
node, stats := ft.Node(), ft.Stats()
tmb.Kill(nil)

View file

@ -5,8 +5,8 @@ import (
"sync"
"github.com/restic/restic/internal/debug"
"github.com/restic/restic/internal/errors"
"github.com/restic/restic/internal/restic"
tomb "gopkg.in/tomb.v2"
)
// Saver allows saving a blob.
@ -23,21 +23,23 @@ type BlobSaver struct {
knownBlobs restic.BlobSet
ch chan<- saveBlobJob
done <-chan struct{}
}
// NewBlobSaver returns a new blob. A worker pool is started, it is stopped
// when ctx is cancelled.
func NewBlobSaver(ctx context.Context, g Goer, repo Saver, workers uint) *BlobSaver {
func NewBlobSaver(ctx context.Context, t *tomb.Tomb, repo Saver, workers uint) *BlobSaver {
ch := make(chan saveBlobJob)
s := &BlobSaver{
repo: repo,
knownBlobs: restic.NewBlobSet(),
ch: ch,
done: t.Dying(),
}
for i := uint(0); i < workers; i++ {
g.Go(func() error {
return s.worker(ctx, ch)
t.Go(func() error {
return s.worker(t.Context(ctx), ch)
})
}
@ -51,6 +53,10 @@ func (s *BlobSaver) Save(ctx context.Context, t restic.BlobType, buf *Buffer) Fu
ch := make(chan saveBlobResponse, 1)
select {
case s.ch <- saveBlobJob{BlobType: t, buf: buf, ch: ch}:
case <-s.done:
debug.Log("not sending job, BlobSaver is done")
close(ch)
return FutureBlob{ch: ch}
case <-ctx.Done():
debug.Log("not sending job, context is cancelled")
close(ch)
@ -139,7 +145,7 @@ func (s *BlobSaver) saveBlob(ctx context.Context, t restic.BlobType, buf []byte)
// otherwise we're responsible for saving it
_, err := s.repo.SaveBlob(ctx, t, buf, id)
if err != nil {
return saveBlobResponse{}, errors.Fatalf("unable to save data: %v", err)
return saveBlobResponse{}, err
}
return saveBlobResponse{
@ -153,14 +159,13 @@ func (s *BlobSaver) worker(ctx context.Context, jobs <-chan saveBlobJob) error {
var job saveBlobJob
select {
case <-ctx.Done():
debug.Log("context is cancelled, exiting: %v", ctx.Err())
return nil
case job = <-jobs:
}
res, err := s.saveBlob(ctx, job.BlobType, job.buf.Data)
if err != nil {
debug.Log("saveBlob returned error: %v", err)
debug.Log("saveBlob returned error, exiting: %v", err)
close(job.ch)
return err
}

View file

@ -0,0 +1,115 @@
package archiver
import (
"context"
"fmt"
"runtime"
"sync/atomic"
"testing"
"github.com/restic/restic/internal/errors"
"github.com/restic/restic/internal/repository"
"github.com/restic/restic/internal/restic"
tomb "gopkg.in/tomb.v2"
)
var errTest = errors.New("test error")
type saveFail struct {
idx restic.Index
cnt int32
failAt int32
}
func (b *saveFail) SaveBlob(ctx context.Context, t restic.BlobType, buf []byte, id restic.ID) (restic.ID, error) {
val := atomic.AddInt32(&b.cnt, 1)
if val == b.failAt {
return restic.ID{}, errTest
}
return id, nil
}
func (b *saveFail) Index() restic.Index {
return b.idx
}
func TestBlobSaver(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
var tmb tomb.Tomb
saver := &saveFail{
idx: repository.NewIndex(),
}
b := NewBlobSaver(ctx, &tmb, saver, uint(runtime.NumCPU()))
var results []FutureBlob
for i := 0; i < 20; i++ {
buf := &Buffer{Data: []byte(fmt.Sprintf("foo%d", i))}
fb := b.Save(ctx, restic.DataBlob, buf)
results = append(results, fb)
}
for i, blob := range results {
blob.Wait(ctx)
if blob.Known() {
t.Errorf("blob %v is known, that should not be the case", i)
}
}
tmb.Kill(nil)
err := tmb.Wait()
if err != nil {
t.Fatal(err)
}
}
func TestBlobSaverError(t *testing.T) {
var tests = []struct {
blobs int
failAt int
}{
{20, 2},
{20, 5},
{20, 15},
{200, 150},
}
for _, test := range tests {
t.Run("", func(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
var tmb tomb.Tomb
saver := &saveFail{
idx: repository.NewIndex(),
failAt: int32(test.failAt),
}
b := NewBlobSaver(ctx, &tmb, saver, uint(runtime.NumCPU()))
var results []FutureBlob
for i := 0; i < test.blobs; i++ {
buf := &Buffer{Data: []byte(fmt.Sprintf("foo%d", i))}
fb := b.Save(ctx, restic.DataBlob, buf)
results = append(results, fb)
}
tmb.Kill(nil)
err := tmb.Wait()
if err == nil {
t.Errorf("expected error not found")
}
if err != errTest {
t.Fatalf("unexpected error found: %v", err)
}
})
}
}

View file

@ -10,13 +10,9 @@ import (
"github.com/restic/restic/internal/errors"
"github.com/restic/restic/internal/fs"
"github.com/restic/restic/internal/restic"
tomb "gopkg.in/tomb.v2"
)
// Goer starts a function in a goroutine.
type Goer interface {
Go(func() error)
}
// FutureFile is returned by Save and will return the data once it
// has been processed.
type FutureFile struct {
@ -24,40 +20,47 @@ type FutureFile struct {
res saveFileResponse
}
func (s *FutureFile) wait() {
res, ok := <-s.ch
// Wait blocks until the result of the save operation is received or ctx is
// cancelled.
func (s *FutureFile) Wait(ctx context.Context) {
select {
case res, ok := <-s.ch:
if ok {
s.res = res
}
case <-ctx.Done():
return
}
}
// Node returns the node once it is available.
func (s *FutureFile) Node() *restic.Node {
s.wait()
return s.res.node
}
// Stats returns the stats for the file once they are available.
func (s *FutureFile) Stats() ItemStats {
s.wait()
return s.res.stats
}
// Err returns the error in case an error occurred.
func (s *FutureFile) Err() error {
s.wait()
return s.res.err
}
// SaveBlobFn saves a blob to a repo.
type SaveBlobFn func(context.Context, restic.BlobType, *Buffer) FutureBlob
// FileSaver concurrently saves incoming files to the repo.
type FileSaver struct {
fs fs.FS
blobSaver *BlobSaver
saveFilePool *BufferPool
saveBlob SaveBlobFn
pol chunker.Pol
ch chan<- saveFileJob
done <-chan struct{}
CompleteBlob func(filename string, bytes uint64)
@ -66,7 +69,7 @@ type FileSaver struct {
// NewFileSaver returns a new file saver. A worker pool with fileWorkers is
// started, it is stopped when ctx is cancelled.
func NewFileSaver(ctx context.Context, g Goer, fs fs.FS, blobSaver *BlobSaver, pol chunker.Pol, fileWorkers, blobWorkers uint) *FileSaver {
func NewFileSaver(ctx context.Context, t *tomb.Tomb, fs fs.FS, save SaveBlobFn, pol chunker.Pol, fileWorkers, blobWorkers uint) *FileSaver {
ch := make(chan saveFileJob)
debug.Log("new file saver with %v file workers and %v blob workers", fileWorkers, blobWorkers)
@ -75,17 +78,18 @@ func NewFileSaver(ctx context.Context, g Goer, fs fs.FS, blobSaver *BlobSaver, p
s := &FileSaver{
fs: fs,
blobSaver: blobSaver,
saveBlob: save,
saveFilePool: NewBufferPool(ctx, int(poolSize), chunker.MaxSize),
pol: pol,
ch: ch,
done: t.Dying(),
CompleteBlob: func(string, uint64) {},
}
for i := uint(0); i < fileWorkers; i++ {
g.Go(func() error {
s.worker(ctx, ch)
t.Go(func() error {
s.worker(t.Context(ctx), ch)
return nil
})
}
@ -111,8 +115,14 @@ func (s *FileSaver) Save(ctx context.Context, snPath string, file fs.File, fi os
select {
case s.ch <- job:
case <-s.done:
debug.Log("not sending job, FileSaver is done")
close(ch)
return FutureFile{ch: ch}
case <-ctx.Done():
debug.Log("not sending job, context is cancelled: %v", ctx.Err())
close(ch)
return FutureFile{ch: ch}
}
return FutureFile{ch: ch}
@ -182,7 +192,7 @@ func (s *FileSaver) saveFile(ctx context.Context, chnker *chunker.Chunker, snPat
return saveFileResponse{err: ctx.Err()}
}
res := s.blobSaver.Save(ctx, restic.DataBlob, buf)
res := s.saveBlob(ctx, restic.DataBlob, buf)
results = append(results, res)
// test if the context has been cancelled, return the error

View file

@ -0,0 +1,97 @@
package archiver
import (
"context"
"fmt"
"io/ioutil"
"path/filepath"
"runtime"
"testing"
"github.com/restic/chunker"
"github.com/restic/restic/internal/fs"
"github.com/restic/restic/internal/restic"
"github.com/restic/restic/internal/test"
tomb "gopkg.in/tomb.v2"
)
func createTestFiles(t testing.TB, num int) (files []string, cleanup func()) {
tempdir, cleanup := test.TempDir(t)
for i := 0; i < 15; i++ {
filename := fmt.Sprintf("testfile-%d", i)
err := ioutil.WriteFile(filepath.Join(tempdir, filename), []byte(filename), 0600)
if err != nil {
t.Fatal(err)
}
files = append(files, filepath.Join(tempdir, filename))
}
return files, cleanup
}
func startFileSaver(ctx context.Context, t testing.TB, fs fs.FS) (*FileSaver, *tomb.Tomb) {
var tmb tomb.Tomb
saveBlob := func(ctx context.Context, tpe restic.BlobType, buf *Buffer) FutureBlob {
ch := make(chan saveBlobResponse)
close(ch)
return FutureBlob{ch: ch}
}
workers := uint(runtime.NumCPU())
pol, err := chunker.RandomPolynomial()
if err != nil {
t.Fatal(err)
}
s := NewFileSaver(ctx, &tmb, fs, saveBlob, pol, workers, workers)
s.NodeFromFileInfo = restic.NodeFromFileInfo
return s, &tmb
}
func TestFileSaver(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
files, cleanup := createTestFiles(t, 15)
defer cleanup()
startFn := func() {}
completeFn := func(*restic.Node, ItemStats) {}
testFs := fs.Local{}
s, tmb := startFileSaver(ctx, t, testFs)
var results []FutureFile
for _, filename := range files {
f, err := testFs.Open(filename)
if err != nil {
t.Fatal(err)
}
fi, err := f.Stat()
if err != nil {
t.Fatal(err)
}
ff := s.Save(ctx, filename, f, fi, startFn, completeFn)
results = append(results, ff)
}
for _, file := range results {
file.Wait(ctx)
if file.Err() != nil {
t.Errorf("unable to save file: %v", file.Err())
}
}
tmb.Kill(nil)
err := tmb.Wait()
if err != nil {
t.Fatal(err)
}
}

View file

@ -4,8 +4,8 @@ import (
"context"
"github.com/restic/restic/internal/debug"
"github.com/restic/restic/internal/errors"
"github.com/restic/restic/internal/restic"
tomb "gopkg.in/tomb.v2"
)
// FutureTree is returned by Save and will return the data once it
@ -15,22 +15,25 @@ type FutureTree struct {
res saveTreeResponse
}
func (s *FutureTree) wait() {
res, ok := <-s.ch
// Wait blocks until the data has been received or ctx is cancelled.
func (s *FutureTree) Wait(ctx context.Context) {
select {
case <-ctx.Done():
return
case res, ok := <-s.ch:
if ok {
s.res = res
}
}
}
// Node returns the node once it is available.
// Node returns the node.
func (s *FutureTree) Node() *restic.Node {
s.wait()
return s.res.node
}
// Stats returns the stats for the file once they are available.
// Stats returns the stats for the file.
func (s *FutureTree) Stats() ItemStats {
s.wait()
return s.res.stats
}
@ -40,22 +43,24 @@ type TreeSaver struct {
errFn ErrorFunc
ch chan<- saveTreeJob
done <-chan struct{}
}
// NewTreeSaver returns a new tree saver. A worker pool with treeWorkers is
// started, it is stopped when ctx is cancelled.
func NewTreeSaver(ctx context.Context, g Goer, treeWorkers uint, saveTree func(context.Context, *restic.Tree) (restic.ID, ItemStats, error), errFn ErrorFunc) *TreeSaver {
func NewTreeSaver(ctx context.Context, t *tomb.Tomb, treeWorkers uint, saveTree func(context.Context, *restic.Tree) (restic.ID, ItemStats, error), errFn ErrorFunc) *TreeSaver {
ch := make(chan saveTreeJob)
s := &TreeSaver{
ch: ch,
done: t.Dying(),
saveTree: saveTree,
errFn: errFn,
}
for i := uint(0); i < treeWorkers; i++ {
g.Go(func() error {
return s.worker(ctx, ch)
t.Go(func() error {
return s.worker(t.Context(ctx), ch)
})
}
@ -73,8 +78,12 @@ func (s *TreeSaver) Save(ctx context.Context, snPath string, node *restic.Node,
}
select {
case s.ch <- job:
case <-s.done:
debug.Log("not saving tree, TreeSaver is done")
close(ch)
return FutureTree{ch: ch}
case <-ctx.Done():
debug.Log("refusing to save job, context is cancelled: %v", ctx.Err())
debug.Log("not saving tree, context is cancelled")
close(ch)
return FutureTree{ch: ch}
}
@ -149,7 +158,8 @@ func (s *TreeSaver) worker(ctx context.Context, jobs <-chan saveTreeJob) error {
node, stats, err := s.save(ctx, job.snPath, job.node, job.nodes)
if err != nil {
debug.Log("error saving tree blob: %v", err)
return errors.Fatalf("unable to save data: %v", err)
close(job.ch)
return err
}
job.ch <- saveTreeResponse{

View file

@ -0,0 +1,120 @@
package archiver
import (
"context"
"fmt"
"os"
"runtime"
"sync/atomic"
"testing"
"github.com/restic/restic/internal/errors"
"github.com/restic/restic/internal/restic"
tomb "gopkg.in/tomb.v2"
)
func TestTreeSaver(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
var tmb tomb.Tomb
saveFn := func(context.Context, *restic.Tree) (restic.ID, ItemStats, error) {
return restic.NewRandomID(), ItemStats{TreeBlobs: 1, TreeSize: 123}, nil
}
errFn := func(snPath string, fi os.FileInfo, err error) error {
return nil
}
b := NewTreeSaver(ctx, &tmb, uint(runtime.NumCPU()), saveFn, errFn)
var results []FutureTree
for i := 0; i < 20; i++ {
node := &restic.Node{
Name: fmt.Sprintf("file-%d", i),
}
fb := b.Save(ctx, "/", node, nil)
results = append(results, fb)
}
for _, tree := range results {
tree.Wait(ctx)
}
tmb.Kill(nil)
err := tmb.Wait()
if err != nil {
t.Fatal(err)
}
}
func TestTreeSaverError(t *testing.T) {
var tests = []struct {
trees int
failAt int32
}{
{1, 1},
{20, 2},
{20, 5},
{20, 15},
{200, 150},
}
errTest := errors.New("test error")
for _, test := range tests {
t.Run("", func(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
var tmb tomb.Tomb
var num int32
saveFn := func(context.Context, *restic.Tree) (restic.ID, ItemStats, error) {
val := atomic.AddInt32(&num, 1)
if val == test.failAt {
t.Logf("sending error for request %v\n", test.failAt)
return restic.ID{}, ItemStats{}, errTest
}
return restic.NewRandomID(), ItemStats{TreeBlobs: 1, TreeSize: 123}, nil
}
errFn := func(snPath string, fi os.FileInfo, err error) error {
t.Logf("ignoring error %v\n", err)
return nil
}
b := NewTreeSaver(ctx, &tmb, uint(runtime.NumCPU()), saveFn, errFn)
var results []FutureTree
for i := 0; i < test.trees; i++ {
node := &restic.Node{
Name: fmt.Sprintf("file-%d", i),
}
fb := b.Save(ctx, "/", node, nil)
results = append(results, fb)
}
for _, tree := range results {
tree.Wait(ctx)
}
tmb.Kill(nil)
err := tmb.Wait()
if err == nil {
t.Errorf("expected error not found")
}
if err != errTest {
t.Fatalf("unexpected error found: %v", err)
}
})
}
}