From ca4af43c03e1719ec7ac785ce8e3197d093cbdc4 Mon Sep 17 00:00:00 2001
From: Alexander Neumann
Date: Tue, 8 May 2018 22:28:37 +0200
Subject: [PATCH] archiver: Return low-level errors

This commit changes the archiver so that low-level errors encountered while
saving data to the repo are returned to the caller instead of being handled
by the error callback function. Errors such as a full temporary filesystem
now bubble up correctly, so restic aborts early and all other worker
goroutines exit.
---
 internal/archiver/archiver.go      | 90 ++++++++++++++----------------
 internal/archiver/archiver_test.go | 47 +++++++++++-----
 internal/archiver/blob_saver.go    | 67 +++++++++++++---------
 internal/archiver/file_saver.go    | 31 ++++++----
 internal/archiver/tree_saver.go    | 37 ++++++------
 5 files changed, 154 insertions(+), 118 deletions(-)

diff --git a/internal/archiver/archiver.go b/internal/archiver/archiver.go
index 03e916e7e..3e1a6d0b8 100644
--- a/internal/archiver/archiver.go
+++ b/internal/archiver/archiver.go
@@ -13,6 +13,7 @@ import (
 	"github.com/restic/restic/internal/errors"
 	"github.com/restic/restic/internal/fs"
 	"github.com/restic/restic/internal/restic"
+	tomb "gopkg.in/tomb.v2"
 )
 
 // SelectFunc returns true for all items that should be included (files and
@@ -131,37 +132,16 @@ func New(repo restic.Repository, fs fs.FS, opts Options) *Archiver {
 	return arch
 }
 
-// Valid returns an error if anything is missing.
-func (arch *Archiver) Valid() error {
-	if arch.blobSaver == nil {
-		return errors.New("blobSaver is nil")
-	}
-
-	if arch.fileSaver == nil {
-		return errors.New("fileSaver is nil")
-	}
-
-	if arch.Repo == nil {
-		return errors.New("repo is not set")
-	}
-
-	if arch.Select == nil {
-		return errors.New("Select is not set")
-	}
-
-	if arch.FS == nil {
-		return errors.New("FS is not set")
-	}
-
-	return nil
-}
-
-// error calls arch.Error if it is set.
+// error calls arch.Error if it is set and the error is different from context.Canceled.
func (arch *Archiver) error(item string, fi os.FileInfo, err error) error { if arch.Error == nil || err == nil { return err } + if err == context.Canceled { + return err + } + errf := arch.Error(item, fi, err) if err != errf { debug.Log("item %v: error was filtered by handler, before: %q, after: %v", item, err, errf) @@ -184,10 +164,8 @@ func (arch *Archiver) saveTree(ctx context.Context, t *restic.Tree) (restic.ID, b := &Buffer{Data: buf} res := arch.blobSaver.Save(ctx, restic.TreeBlob, b) - if res.Err() != nil { - return restic.ID{}, s, res.Err() - } + res.Wait(ctx) if !res.Known() { s.TreeBlobs++ s.TreeSize += uint64(len(buf)) @@ -238,6 +216,11 @@ func (arch *Archiver) SaveDir(ctx context.Context, snPath string, fi os.FileInfo nodes := make([]FutureNode, 0, len(names)) for _, name := range names { + // test if context has been cancelled + if ctx.Err() != nil { + return FutureTree{}, ctx.Err() + } + pathname := arch.FS.Join(dir, name) oldNode := previous.Find(name) snItem := join(snPath, name) @@ -299,7 +282,6 @@ func (fn *FutureNode) wait(ctx context.Context) { case fn.isDir: // wait for and collect the data for the dir fn.node = fn.dir.Node() - fn.err = fn.dir.Err() fn.stats = fn.dir.Stats() // ensure the other stuff can be garbage-collected @@ -496,6 +478,10 @@ func (arch *Archiver) SaveTree(ctx context.Context, snPath string, atree *Tree, futureNodes := make(map[string]FutureNode) for name, subatree := range atree.Nodes { + // test if context has been cancelled + if ctx.Err() != nil { + return nil, ctx.Err() + } // this is a leaf node if subatree.Path != "" { @@ -722,10 +708,10 @@ func (arch *Archiver) loadParentTree(ctx context.Context, snapshotID restic.ID) } // runWorkers starts the worker pools, which are stopped when the context is cancelled. -func (arch *Archiver) runWorkers(ctx context.Context) { - arch.blobSaver = NewBlobSaver(ctx, arch.Repo, arch.Options.SaveBlobConcurrency) +func (arch *Archiver) runWorkers(ctx context.Context, t *tomb.Tomb) { + arch.blobSaver = NewBlobSaver(ctx, t, arch.Repo, arch.Options.SaveBlobConcurrency) - arch.fileSaver = NewFileSaver(ctx, + arch.fileSaver = NewFileSaver(ctx, t, arch.FS, arch.blobSaver, arch.Repo.Config().ChunkerPolynomial, @@ -733,21 +719,11 @@ func (arch *Archiver) runWorkers(ctx context.Context) { arch.fileSaver.CompleteBlob = arch.CompleteBlob arch.fileSaver.NodeFromFileInfo = arch.nodeFromFileInfo - arch.treeSaver = NewTreeSaver(ctx, arch.Options.SaveTreeConcurrency, arch.saveTree, arch.error) + arch.treeSaver = NewTreeSaver(ctx, t, arch.Options.SaveTreeConcurrency, arch.saveTree, arch.error) } // Snapshot saves several targets and returns a snapshot. 
func (arch *Archiver) Snapshot(ctx context.Context, targets []string, opts SnapshotOptions) (*restic.Snapshot, restic.ID, error) { - workerCtx, cancel := context.WithCancel(ctx) - defer cancel() - - arch.runWorkers(workerCtx) - - err := arch.Valid() - if err != nil { - return nil, restic.ID{}, err - } - cleanTargets, err := resolveRelativeTargets(arch.FS, targets) if err != nil { return nil, restic.ID{}, err @@ -758,14 +734,32 @@ func (arch *Archiver) Snapshot(ctx context.Context, targets []string, opts Snaps return nil, restic.ID{}, err } + var t tomb.Tomb + wctx := t.Context(ctx) + + arch.runWorkers(wctx, &t) + start := time.Now() - tree, err := arch.SaveTree(ctx, "/", atree, arch.loadParentTree(ctx, opts.ParentSnapshot)) - if err != nil { - return nil, restic.ID{}, err + + debug.Log("starting snapshot") + rootTreeID, stats, err := func() (restic.ID, ItemStats, error) { + tree, err := arch.SaveTree(wctx, "/", atree, arch.loadParentTree(wctx, opts.ParentSnapshot)) + if err != nil { + return restic.ID{}, ItemStats{}, err + } + + return arch.saveTree(wctx, tree) + }() + debug.Log("saved tree, error: %v", err) + + t.Kill(nil) + werr := t.Wait() + if err != nil && errors.Cause(err) == context.Canceled { + err = werr } - rootTreeID, stats, err := arch.saveTree(ctx, tree) if err != nil { + debug.Log("error while saving tree: %v", err) return nil, restic.ID{}, err } diff --git a/internal/archiver/archiver_test.go b/internal/archiver/archiver_test.go index 8916f58a3..c24232b50 100644 --- a/internal/archiver/archiver_test.go +++ b/internal/archiver/archiver_test.go @@ -17,6 +17,7 @@ import ( "github.com/restic/restic/internal/repository" "github.com/restic/restic/internal/restic" restictest "github.com/restic/restic/internal/test" + tomb "gopkg.in/tomb.v2" ) func prepareTempdirRepoSrc(t testing.TB, src TestDir) (tempdir string, repo restic.Repository, cleanup func()) { @@ -34,11 +35,11 @@ func prepareTempdirRepoSrc(t testing.TB, src TestDir) (tempdir string, repo rest } func saveFile(t testing.TB, repo restic.Repository, filename string, filesystem fs.FS) (*restic.Node, ItemStats) { - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() + var tmb tomb.Tomb + ctx := tmb.Context(context.Background()) arch := New(repo, filesystem, Options{}) - arch.runWorkers(ctx) + arch.runWorkers(ctx, &tmb) var ( completeCallbackNode *restic.Node @@ -73,6 +74,12 @@ func saveFile(t testing.TB, repo restic.Repository, filename string, filesystem t.Fatal(res.Err()) } + tmb.Kill(nil) + err = tmb.Wait() + if err != nil { + t.Fatal(err) + } + err = repo.Flush(ctx) if err != nil { t.Fatal(err) @@ -586,14 +593,14 @@ func TestArchiverSaveDir(t *testing.T) { for _, test := range tests { t.Run("", func(t *testing.T) { - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() + var tmb tomb.Tomb + ctx := tmb.Context(context.Background()) tempdir, repo, cleanup := prepareTempdirRepoSrc(t, test.src) defer cleanup() arch := New(repo, fs.Track{fs.Local{}}, Options{}) - arch.runWorkers(ctx) + arch.runWorkers(ctx, &tmb) chdir := tempdir if test.chdir != "" { @@ -613,7 +620,10 @@ func TestArchiverSaveDir(t *testing.T) { t.Fatal(err) } - node, stats, err := ft.Node(), ft.Stats(), ft.Err() + node, stats := ft.Node(), ft.Stats() + + tmb.Kill(nil) + err = tmb.Wait() if err != nil { t.Fatal(err) } @@ -675,11 +685,11 @@ func TestArchiverSaveDirIncremental(t *testing.T) { // save the empty directory several times in a row, then have a look if the // archiver did save the same tree several times for i := 
0; i < 5; i++ { - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() + var tmb tomb.Tomb + ctx := tmb.Context(context.Background()) arch := New(repo, fs.Track{fs.Local{}}, Options{}) - arch.runWorkers(ctx) + arch.runWorkers(ctx, &tmb) fi, err := fs.Lstat(tempdir) if err != nil { @@ -691,7 +701,10 @@ func TestArchiverSaveDirIncremental(t *testing.T) { t.Fatal(err) } - node, stats, err := ft.Node(), ft.Stats(), ft.Err() + node, stats := ft.Node(), ft.Stats() + + tmb.Kill(nil) + err = tmb.Wait() if err != nil { t.Fatal(err) } @@ -828,8 +841,8 @@ func TestArchiverSaveTree(t *testing.T) { for _, test := range tests { t.Run("", func(t *testing.T) { - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() + var tmb tomb.Tomb + ctx := tmb.Context(context.Background()) tempdir, repo, cleanup := prepareTempdirRepoSrc(t, test.src) defer cleanup() @@ -837,7 +850,7 @@ func TestArchiverSaveTree(t *testing.T) { testFS := fs.Track{fs.Local{}} arch := New(repo, testFS, Options{}) - arch.runWorkers(ctx) + arch.runWorkers(ctx, &tmb) back := fs.TestChdir(t, tempdir) defer back() @@ -861,6 +874,12 @@ func TestArchiverSaveTree(t *testing.T) { t.Fatal(err) } + tmb.Kill(nil) + err = tmb.Wait() + if err != nil { + t.Fatal(err) + } + err = repo.Flush(ctx) if err != nil { t.Fatal(err) diff --git a/internal/archiver/blob_saver.go b/internal/archiver/blob_saver.go index 4d0f39c48..7ef274058 100644 --- a/internal/archiver/blob_saver.go +++ b/internal/archiver/blob_saver.go @@ -4,6 +4,8 @@ import ( "context" "sync" + "github.com/restic/restic/internal/debug" + "github.com/restic/restic/internal/errors" "github.com/restic/restic/internal/restic" ) @@ -21,12 +23,11 @@ type BlobSaver struct { knownBlobs restic.BlobSet ch chan<- saveBlobJob - wg sync.WaitGroup } // NewBlobSaver returns a new blob. A worker pool is started, it is stopped // when ctx is cancelled. -func NewBlobSaver(ctx context.Context, repo Saver, workers uint) *BlobSaver { +func NewBlobSaver(ctx context.Context, g Goer, repo Saver, workers uint) *BlobSaver { ch := make(chan saveBlobJob) s := &BlobSaver{ repo: repo, @@ -35,8 +36,9 @@ func NewBlobSaver(ctx context.Context, repo Saver, workers uint) *BlobSaver { } for i := uint(0); i < workers; i++ { - s.wg.Add(1) - go s.worker(ctx, &s.wg, ch) + g.Go(func() error { + return s.worker(ctx, ch) + }) } return s @@ -47,7 +49,13 @@ func NewBlobSaver(ctx context.Context, repo Saver, workers uint) *BlobSaver { // previously unknown. func (s *BlobSaver) Save(ctx context.Context, t restic.BlobType, buf *Buffer) FutureBlob { ch := make(chan saveBlobResponse, 1) - s.ch <- saveBlobJob{BlobType: t, buf: buf, ch: ch} + select { + case s.ch <- saveBlobJob{BlobType: t, buf: buf, ch: ch}: + case <-ctx.Done(): + debug.Log("not sending job, context is cancelled") + close(ch) + return FutureBlob{ch: ch} + } return FutureBlob{ch: ch, length: len(buf.Data)} } @@ -59,31 +67,28 @@ type FutureBlob struct { res saveBlobResponse } -func (s *FutureBlob) wait() { - res, ok := <-s.ch - if ok { - s.res = res +// Wait blocks until the result is available or the context is cancelled. +func (s *FutureBlob) Wait(ctx context.Context) { + select { + case <-ctx.Done(): + return + case res, ok := <-s.ch: + if ok { + s.res = res + } } } // ID returns the ID of the blob after it has been saved. func (s *FutureBlob) ID() restic.ID { - s.wait() return s.res.id } // Known returns whether or not the blob was already known. 
func (s *FutureBlob) Known() bool { - s.wait() return s.res.known } -// Err returns the error which may have occurred during save. -func (s *FutureBlob) Err() error { - s.wait() - return s.res.err -} - // Length returns the length of the blob. func (s *FutureBlob) Length() int { return s.length @@ -98,10 +103,9 @@ type saveBlobJob struct { type saveBlobResponse struct { id restic.ID known bool - err error } -func (s *BlobSaver) saveBlob(ctx context.Context, t restic.BlobType, buf []byte) saveBlobResponse { +func (s *BlobSaver) saveBlob(ctx context.Context, t restic.BlobType, buf []byte) (saveBlobResponse, error) { id := restic.Hash(buf) h := restic.BlobHandle{ID: id, Type: t} @@ -121,7 +125,7 @@ func (s *BlobSaver) saveBlob(ctx context.Context, t restic.BlobType, buf []byte) return saveBlobResponse{ id: id, known: true, - } + }, nil } // check if the repo knows this blob @@ -129,29 +133,38 @@ func (s *BlobSaver) saveBlob(ctx context.Context, t restic.BlobType, buf []byte) return saveBlobResponse{ id: id, known: true, - } + }, nil } // otherwise we're responsible for saving it _, err := s.repo.SaveBlob(ctx, t, buf, id) + if err != nil { + return saveBlobResponse{}, errors.Fatalf("unable to save data: %v", err) + } + return saveBlobResponse{ id: id, known: false, - err: err, - } + }, nil } -func (s *BlobSaver) worker(ctx context.Context, wg *sync.WaitGroup, jobs <-chan saveBlobJob) { - defer wg.Done() +func (s *BlobSaver) worker(ctx context.Context, jobs <-chan saveBlobJob) error { for { var job saveBlobJob select { case <-ctx.Done(): - return + debug.Log("context is cancelled, exiting: %v", ctx.Err()) + return nil case job = <-jobs: } - job.ch <- s.saveBlob(ctx, job.BlobType, job.buf.Data) + res, err := s.saveBlob(ctx, job.BlobType, job.buf.Data) + if err != nil { + debug.Log("saveBlob returned error: %v", err) + close(job.ch) + return err + } + job.ch <- res close(job.ch) job.buf.Release() } diff --git a/internal/archiver/file_saver.go b/internal/archiver/file_saver.go index 1a10c39d3..7a2c24adf 100644 --- a/internal/archiver/file_saver.go +++ b/internal/archiver/file_saver.go @@ -4,7 +4,6 @@ import ( "context" "io" "os" - "sync" "github.com/restic/chunker" "github.com/restic/restic/internal/debug" @@ -13,6 +12,11 @@ import ( "github.com/restic/restic/internal/restic" ) +// Goer starts a function in a goroutine. +type Goer interface { + Go(func() error) +} + // FutureFile is returned by Save and will return the data once it // has been processed. type FutureFile struct { @@ -54,7 +58,6 @@ type FileSaver struct { pol chunker.Pol ch chan<- saveFileJob - wg sync.WaitGroup CompleteBlob func(filename string, bytes uint64) @@ -63,7 +66,7 @@ type FileSaver struct { // NewFileSaver returns a new file saver. A worker pool with fileWorkers is // started, it is stopped when ctx is cancelled. 
-func NewFileSaver(ctx context.Context, fs fs.FS, blobSaver *BlobSaver, pol chunker.Pol, fileWorkers, blobWorkers uint) *FileSaver { +func NewFileSaver(ctx context.Context, g Goer, fs fs.FS, blobSaver *BlobSaver, pol chunker.Pol, fileWorkers, blobWorkers uint) *FileSaver { ch := make(chan saveFileJob) debug.Log("new file saver with %v file workers and %v blob workers", fileWorkers, blobWorkers) @@ -81,8 +84,10 @@ func NewFileSaver(ctx context.Context, fs fs.FS, blobSaver *BlobSaver, pol chunk } for i := uint(0); i < fileWorkers; i++ { - s.wg.Add(1) - go s.worker(ctx, &s.wg, ch) + g.Go(func() error { + s.worker(ctx, ch) + return nil + }) } return s @@ -95,7 +100,7 @@ type CompleteFunc func(*restic.Node, ItemStats) // file is closed by Save. func (s *FileSaver) Save(ctx context.Context, snPath string, file fs.File, fi os.FileInfo, start func(), complete CompleteFunc) FutureFile { ch := make(chan saveFileResponse, 1) - s.ch <- saveFileJob{ + job := saveFileJob{ snPath: snPath, file: file, fi: fi, @@ -104,6 +109,12 @@ func (s *FileSaver) Save(ctx context.Context, snPath string, file fs.File, fi os ch: ch, } + select { + case s.ch <- job: + case <-ctx.Done(): + debug.Log("not sending job, context is cancelled: %v", ctx.Err()) + } + return FutureFile{ch: ch} } @@ -189,10 +200,7 @@ func (s *FileSaver) saveFile(ctx context.Context, chnker *chunker.Chunker, snPat } for _, res := range results { - if res.Err() != nil { - return saveFileResponse{err: res.Err()} - } - + res.Wait(ctx) if !res.Known() { stats.DataBlobs++ stats.DataSize += uint64(res.Length()) @@ -209,11 +217,10 @@ func (s *FileSaver) saveFile(ctx context.Context, chnker *chunker.Chunker, snPat } } -func (s *FileSaver) worker(ctx context.Context, wg *sync.WaitGroup, jobs <-chan saveFileJob) { +func (s *FileSaver) worker(ctx context.Context, jobs <-chan saveFileJob) { // a worker has one chunker which is reused for each file (because it contains a rather large buffer) chnker := chunker.New(nil, s.pol) - defer wg.Done() for { var job saveFileJob select { diff --git a/internal/archiver/tree_saver.go b/internal/archiver/tree_saver.go index db04dfe80..743ef3b55 100644 --- a/internal/archiver/tree_saver.go +++ b/internal/archiver/tree_saver.go @@ -2,9 +2,9 @@ package archiver import ( "context" - "sync" "github.com/restic/restic/internal/debug" + "github.com/restic/restic/internal/errors" "github.com/restic/restic/internal/restic" ) @@ -34,24 +34,17 @@ func (s *FutureTree) Stats() ItemStats { return s.res.stats } -// Err returns the error in case an error occurred. -func (s *FutureTree) Err() error { - s.wait() - return s.res.err -} - // TreeSaver concurrently saves incoming trees to the repo. type TreeSaver struct { saveTree func(context.Context, *restic.Tree) (restic.ID, ItemStats, error) errFn ErrorFunc ch chan<- saveTreeJob - wg sync.WaitGroup } // NewTreeSaver returns a new tree saver. A worker pool with treeWorkers is // started, it is stopped when ctx is cancelled. 
-func NewTreeSaver(ctx context.Context, treeWorkers uint, saveTree func(context.Context, *restic.Tree) (restic.ID, ItemStats, error), errFn ErrorFunc) *TreeSaver { +func NewTreeSaver(ctx context.Context, g Goer, treeWorkers uint, saveTree func(context.Context, *restic.Tree) (restic.ID, ItemStats, error), errFn ErrorFunc) *TreeSaver { ch := make(chan saveTreeJob) s := &TreeSaver{ @@ -61,8 +54,9 @@ func NewTreeSaver(ctx context.Context, treeWorkers uint, saveTree func(context.C } for i := uint(0); i < treeWorkers; i++ { - s.wg.Add(1) - go s.worker(ctx, &s.wg, ch) + g.Go(func() error { + return s.worker(ctx, ch) + }) } return s @@ -71,12 +65,19 @@ func NewTreeSaver(ctx context.Context, treeWorkers uint, saveTree func(context.C // Save stores the dir d and returns the data once it has been completed. func (s *TreeSaver) Save(ctx context.Context, snPath string, node *restic.Node, nodes []FutureNode) FutureTree { ch := make(chan saveTreeResponse, 1) - s.ch <- saveTreeJob{ + job := saveTreeJob{ snPath: snPath, node: node, nodes: nodes, ch: ch, } + select { + case s.ch <- job: + case <-ctx.Done(): + debug.Log("refusing to save job, context is cancelled: %v", ctx.Err()) + close(ch) + return FutureTree{ch: ch} + } return FutureTree{ch: ch} } @@ -91,7 +92,6 @@ type saveTreeJob struct { type saveTreeResponse struct { node *restic.Node stats ItemStats - err error } // save stores the nodes as a tree in the repo. @@ -137,21 +137,24 @@ func (s *TreeSaver) save(ctx context.Context, snPath string, node *restic.Node, return node, stats, nil } -func (s *TreeSaver) worker(ctx context.Context, wg *sync.WaitGroup, jobs <-chan saveTreeJob) { - defer wg.Done() +func (s *TreeSaver) worker(ctx context.Context, jobs <-chan saveTreeJob) error { for { var job saveTreeJob select { case <-ctx.Done(): - return + return nil case job = <-jobs: } node, stats, err := s.save(ctx, job.snPath, job.node, job.nodes) + if err != nil { + debug.Log("error saving tree blob: %v", err) + return errors.Fatalf("unable to save data: %v", err) + } + job.ch <- saveTreeResponse{ node: node, stats: stats, - err: err, } close(job.ch) }
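
For reference, below is a minimal standalone sketch of the tomb-based shutdown pattern this patch relies on in Snapshot and runWorkers: workers run under a tomb.Tomb, the first worker error kills the tomb, the context derived via t.Context() is cancelled so the remaining workers exit, and t.Wait() reports the error to the caller. This is not restic code; the worker function and the "no space left" error are illustrative placeholders.

package main

import (
	"context"
	"errors"
	"fmt"
	"time"

	tomb "gopkg.in/tomb.v2"
)

// worker simulates a saver goroutine: worker 2 fails with a low-level error,
// the others just wait for jobs until the shared context is cancelled.
func worker(ctx context.Context, id int) error {
	if id == 2 {
		time.Sleep(10 * time.Millisecond)
		return errors.New("unable to save data: no space left on device")
	}
	<-ctx.Done()
	return nil
}

func main() {
	var t tomb.Tomb
	// wctx is cancelled as soon as the tomb starts dying, either because a
	// worker returned an error or because Kill was called.
	wctx := t.Context(context.Background())

	for i := 0; i < 4; i++ {
		id := i
		t.Go(func() error { return worker(wctx, id) })
	}

	// In the archiver, SaveTree and saveTree would run here using wctx, so
	// they are aborted early once any worker fails.

	t.Kill(nil) // request a clean shutdown; does not mask a worker's error
	if err := t.Wait(); err != nil {
		fmt.Println("snapshot aborted:", err)
	}
}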
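
A second standalone sketch, mirroring the cancellation-aware submit/wait pattern the patch adds to BlobSaver.Save and FutureBlob.Wait: both the job submission and the wait on the result select on ctx.Done(), so neither side blocks forever once the workers have shut down. The names job, future, submit, and runWorker are illustrative only.

package main

import (
	"context"
	"fmt"
	"time"
)

type job struct {
	payload string
	ch      chan string // receives the result, closed by the worker
}

type future struct {
	ch  chan string
	res string
}

// wait blocks until a result arrives or the context is cancelled.
func (f *future) wait(ctx context.Context) {
	select {
	case <-ctx.Done():
	case res, ok := <-f.ch:
		if ok {
			f.res = res
		}
	}
}

// submit hands a job to a worker, or gives up if the context is cancelled.
func submit(ctx context.Context, jobs chan<- job, payload string) future {
	ch := make(chan string, 1)
	select {
	case jobs <- job{payload: payload, ch: ch}:
	case <-ctx.Done():
		close(ch) // wait() then sees a closed channel and returns immediately
	}
	return future{ch: ch}
}

func runWorker(ctx context.Context, jobs <-chan job) {
	for {
		select {
		case <-ctx.Done():
			return
		case j := <-jobs:
			j.ch <- "saved:" + j.payload
			close(j.ch)
		}
	}
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	jobs := make(chan job)
	go runWorker(ctx, jobs)

	f := submit(ctx, jobs, "blob-1")
	f.wait(ctx)
	fmt.Println(f.res)

	// After cancellation, submit and wait still return promptly instead of
	// blocking on a worker that is no longer running.
	cancel()
	time.Sleep(10 * time.Millisecond)
	g := submit(ctx, jobs, "blob-2")
	g.wait(ctx)
	fmt.Printf("result after cancel: %q\n", g.res)
}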