forked from TrueCloudLab/restic
archiver: Return low-level errors
This commit changes the archiver so that low-level errors saving data to the repo are returned to the caller (instead of being handled by the error callback function). This correctly bubbles up errors like a full temp file system and makes restic abort early and makes all other worker goroutines exit.
This commit is contained in:
parent
1f2463f42e
commit
ca4af43c03
5 changed files with 154 additions and 118 deletions
|
@ -13,6 +13,7 @@ import (
|
|||
"github.com/restic/restic/internal/errors"
|
||||
"github.com/restic/restic/internal/fs"
|
||||
"github.com/restic/restic/internal/restic"
|
||||
tomb "gopkg.in/tomb.v2"
|
||||
)
|
||||
|
||||
// SelectFunc returns true for all items that should be included (files and
|
||||
|
@ -131,37 +132,16 @@ func New(repo restic.Repository, fs fs.FS, opts Options) *Archiver {
|
|||
return arch
|
||||
}
|
||||
|
||||
// Valid returns an error if anything is missing.
|
||||
func (arch *Archiver) Valid() error {
|
||||
if arch.blobSaver == nil {
|
||||
return errors.New("blobSaver is nil")
|
||||
}
|
||||
|
||||
if arch.fileSaver == nil {
|
||||
return errors.New("fileSaver is nil")
|
||||
}
|
||||
|
||||
if arch.Repo == nil {
|
||||
return errors.New("repo is not set")
|
||||
}
|
||||
|
||||
if arch.Select == nil {
|
||||
return errors.New("Select is not set")
|
||||
}
|
||||
|
||||
if arch.FS == nil {
|
||||
return errors.New("FS is not set")
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// error calls arch.Error if it is set.
|
||||
// error calls arch.Error if it is set and the error is different from context.Canceled.
|
||||
func (arch *Archiver) error(item string, fi os.FileInfo, err error) error {
|
||||
if arch.Error == nil || err == nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if err == context.Canceled {
|
||||
return err
|
||||
}
|
||||
|
||||
errf := arch.Error(item, fi, err)
|
||||
if err != errf {
|
||||
debug.Log("item %v: error was filtered by handler, before: %q, after: %v", item, err, errf)
|
||||
|
@ -184,10 +164,8 @@ func (arch *Archiver) saveTree(ctx context.Context, t *restic.Tree) (restic.ID,
|
|||
|
||||
b := &Buffer{Data: buf}
|
||||
res := arch.blobSaver.Save(ctx, restic.TreeBlob, b)
|
||||
if res.Err() != nil {
|
||||
return restic.ID{}, s, res.Err()
|
||||
}
|
||||
|
||||
res.Wait(ctx)
|
||||
if !res.Known() {
|
||||
s.TreeBlobs++
|
||||
s.TreeSize += uint64(len(buf))
|
||||
|
@ -238,6 +216,11 @@ func (arch *Archiver) SaveDir(ctx context.Context, snPath string, fi os.FileInfo
|
|||
nodes := make([]FutureNode, 0, len(names))
|
||||
|
||||
for _, name := range names {
|
||||
// test if context has been cancelled
|
||||
if ctx.Err() != nil {
|
||||
return FutureTree{}, ctx.Err()
|
||||
}
|
||||
|
||||
pathname := arch.FS.Join(dir, name)
|
||||
oldNode := previous.Find(name)
|
||||
snItem := join(snPath, name)
|
||||
|
@ -299,7 +282,6 @@ func (fn *FutureNode) wait(ctx context.Context) {
|
|||
case fn.isDir:
|
||||
// wait for and collect the data for the dir
|
||||
fn.node = fn.dir.Node()
|
||||
fn.err = fn.dir.Err()
|
||||
fn.stats = fn.dir.Stats()
|
||||
|
||||
// ensure the other stuff can be garbage-collected
|
||||
|
@ -496,6 +478,10 @@ func (arch *Archiver) SaveTree(ctx context.Context, snPath string, atree *Tree,
|
|||
futureNodes := make(map[string]FutureNode)
|
||||
|
||||
for name, subatree := range atree.Nodes {
|
||||
// test if context has been cancelled
|
||||
if ctx.Err() != nil {
|
||||
return nil, ctx.Err()
|
||||
}
|
||||
|
||||
// this is a leaf node
|
||||
if subatree.Path != "" {
|
||||
|
@ -722,10 +708,10 @@ func (arch *Archiver) loadParentTree(ctx context.Context, snapshotID restic.ID)
|
|||
}
|
||||
|
||||
// runWorkers starts the worker pools, which are stopped when the context is cancelled.
|
||||
func (arch *Archiver) runWorkers(ctx context.Context) {
|
||||
arch.blobSaver = NewBlobSaver(ctx, arch.Repo, arch.Options.SaveBlobConcurrency)
|
||||
func (arch *Archiver) runWorkers(ctx context.Context, t *tomb.Tomb) {
|
||||
arch.blobSaver = NewBlobSaver(ctx, t, arch.Repo, arch.Options.SaveBlobConcurrency)
|
||||
|
||||
arch.fileSaver = NewFileSaver(ctx,
|
||||
arch.fileSaver = NewFileSaver(ctx, t,
|
||||
arch.FS,
|
||||
arch.blobSaver,
|
||||
arch.Repo.Config().ChunkerPolynomial,
|
||||
|
@ -733,21 +719,11 @@ func (arch *Archiver) runWorkers(ctx context.Context) {
|
|||
arch.fileSaver.CompleteBlob = arch.CompleteBlob
|
||||
arch.fileSaver.NodeFromFileInfo = arch.nodeFromFileInfo
|
||||
|
||||
arch.treeSaver = NewTreeSaver(ctx, arch.Options.SaveTreeConcurrency, arch.saveTree, arch.error)
|
||||
arch.treeSaver = NewTreeSaver(ctx, t, arch.Options.SaveTreeConcurrency, arch.saveTree, arch.error)
|
||||
}
|
||||
|
||||
// Snapshot saves several targets and returns a snapshot.
|
||||
func (arch *Archiver) Snapshot(ctx context.Context, targets []string, opts SnapshotOptions) (*restic.Snapshot, restic.ID, error) {
|
||||
workerCtx, cancel := context.WithCancel(ctx)
|
||||
defer cancel()
|
||||
|
||||
arch.runWorkers(workerCtx)
|
||||
|
||||
err := arch.Valid()
|
||||
if err != nil {
|
||||
return nil, restic.ID{}, err
|
||||
}
|
||||
|
||||
cleanTargets, err := resolveRelativeTargets(arch.FS, targets)
|
||||
if err != nil {
|
||||
return nil, restic.ID{}, err
|
||||
|
@ -758,14 +734,32 @@ func (arch *Archiver) Snapshot(ctx context.Context, targets []string, opts Snaps
|
|||
return nil, restic.ID{}, err
|
||||
}
|
||||
|
||||
var t tomb.Tomb
|
||||
wctx := t.Context(ctx)
|
||||
|
||||
arch.runWorkers(wctx, &t)
|
||||
|
||||
start := time.Now()
|
||||
tree, err := arch.SaveTree(ctx, "/", atree, arch.loadParentTree(ctx, opts.ParentSnapshot))
|
||||
|
||||
debug.Log("starting snapshot")
|
||||
rootTreeID, stats, err := func() (restic.ID, ItemStats, error) {
|
||||
tree, err := arch.SaveTree(wctx, "/", atree, arch.loadParentTree(wctx, opts.ParentSnapshot))
|
||||
if err != nil {
|
||||
return nil, restic.ID{}, err
|
||||
return restic.ID{}, ItemStats{}, err
|
||||
}
|
||||
|
||||
return arch.saveTree(wctx, tree)
|
||||
}()
|
||||
debug.Log("saved tree, error: %v", err)
|
||||
|
||||
t.Kill(nil)
|
||||
werr := t.Wait()
|
||||
if err != nil && errors.Cause(err) == context.Canceled {
|
||||
err = werr
|
||||
}
|
||||
|
||||
rootTreeID, stats, err := arch.saveTree(ctx, tree)
|
||||
if err != nil {
|
||||
debug.Log("error while saving tree: %v", err)
|
||||
return nil, restic.ID{}, err
|
||||
}
|
||||
|
||||
|
|
|
@ -17,6 +17,7 @@ import (
|
|||
"github.com/restic/restic/internal/repository"
|
||||
"github.com/restic/restic/internal/restic"
|
||||
restictest "github.com/restic/restic/internal/test"
|
||||
tomb "gopkg.in/tomb.v2"
|
||||
)
|
||||
|
||||
func prepareTempdirRepoSrc(t testing.TB, src TestDir) (tempdir string, repo restic.Repository, cleanup func()) {
|
||||
|
@ -34,11 +35,11 @@ func prepareTempdirRepoSrc(t testing.TB, src TestDir) (tempdir string, repo rest
|
|||
}
|
||||
|
||||
func saveFile(t testing.TB, repo restic.Repository, filename string, filesystem fs.FS) (*restic.Node, ItemStats) {
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
defer cancel()
|
||||
var tmb tomb.Tomb
|
||||
ctx := tmb.Context(context.Background())
|
||||
|
||||
arch := New(repo, filesystem, Options{})
|
||||
arch.runWorkers(ctx)
|
||||
arch.runWorkers(ctx, &tmb)
|
||||
|
||||
var (
|
||||
completeCallbackNode *restic.Node
|
||||
|
@ -73,6 +74,12 @@ func saveFile(t testing.TB, repo restic.Repository, filename string, filesystem
|
|||
t.Fatal(res.Err())
|
||||
}
|
||||
|
||||
tmb.Kill(nil)
|
||||
err = tmb.Wait()
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
err = repo.Flush(ctx)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
|
@ -586,14 +593,14 @@ func TestArchiverSaveDir(t *testing.T) {
|
|||
|
||||
for _, test := range tests {
|
||||
t.Run("", func(t *testing.T) {
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
defer cancel()
|
||||
var tmb tomb.Tomb
|
||||
ctx := tmb.Context(context.Background())
|
||||
|
||||
tempdir, repo, cleanup := prepareTempdirRepoSrc(t, test.src)
|
||||
defer cleanup()
|
||||
|
||||
arch := New(repo, fs.Track{fs.Local{}}, Options{})
|
||||
arch.runWorkers(ctx)
|
||||
arch.runWorkers(ctx, &tmb)
|
||||
|
||||
chdir := tempdir
|
||||
if test.chdir != "" {
|
||||
|
@ -613,7 +620,10 @@ func TestArchiverSaveDir(t *testing.T) {
|
|||
t.Fatal(err)
|
||||
}
|
||||
|
||||
node, stats, err := ft.Node(), ft.Stats(), ft.Err()
|
||||
node, stats := ft.Node(), ft.Stats()
|
||||
|
||||
tmb.Kill(nil)
|
||||
err = tmb.Wait()
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
@ -675,11 +685,11 @@ func TestArchiverSaveDirIncremental(t *testing.T) {
|
|||
// save the empty directory several times in a row, then have a look if the
|
||||
// archiver did save the same tree several times
|
||||
for i := 0; i < 5; i++ {
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
defer cancel()
|
||||
var tmb tomb.Tomb
|
||||
ctx := tmb.Context(context.Background())
|
||||
|
||||
arch := New(repo, fs.Track{fs.Local{}}, Options{})
|
||||
arch.runWorkers(ctx)
|
||||
arch.runWorkers(ctx, &tmb)
|
||||
|
||||
fi, err := fs.Lstat(tempdir)
|
||||
if err != nil {
|
||||
|
@ -691,7 +701,10 @@ func TestArchiverSaveDirIncremental(t *testing.T) {
|
|||
t.Fatal(err)
|
||||
}
|
||||
|
||||
node, stats, err := ft.Node(), ft.Stats(), ft.Err()
|
||||
node, stats := ft.Node(), ft.Stats()
|
||||
|
||||
tmb.Kill(nil)
|
||||
err = tmb.Wait()
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
@ -828,8 +841,8 @@ func TestArchiverSaveTree(t *testing.T) {
|
|||
|
||||
for _, test := range tests {
|
||||
t.Run("", func(t *testing.T) {
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
defer cancel()
|
||||
var tmb tomb.Tomb
|
||||
ctx := tmb.Context(context.Background())
|
||||
|
||||
tempdir, repo, cleanup := prepareTempdirRepoSrc(t, test.src)
|
||||
defer cleanup()
|
||||
|
@ -837,7 +850,7 @@ func TestArchiverSaveTree(t *testing.T) {
|
|||
testFS := fs.Track{fs.Local{}}
|
||||
|
||||
arch := New(repo, testFS, Options{})
|
||||
arch.runWorkers(ctx)
|
||||
arch.runWorkers(ctx, &tmb)
|
||||
|
||||
back := fs.TestChdir(t, tempdir)
|
||||
defer back()
|
||||
|
@ -861,6 +874,12 @@ func TestArchiverSaveTree(t *testing.T) {
|
|||
t.Fatal(err)
|
||||
}
|
||||
|
||||
tmb.Kill(nil)
|
||||
err = tmb.Wait()
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
err = repo.Flush(ctx)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
|
|
|
@ -4,6 +4,8 @@ import (
|
|||
"context"
|
||||
"sync"
|
||||
|
||||
"github.com/restic/restic/internal/debug"
|
||||
"github.com/restic/restic/internal/errors"
|
||||
"github.com/restic/restic/internal/restic"
|
||||
)
|
||||
|
||||
|
@ -21,12 +23,11 @@ type BlobSaver struct {
|
|||
knownBlobs restic.BlobSet
|
||||
|
||||
ch chan<- saveBlobJob
|
||||
wg sync.WaitGroup
|
||||
}
|
||||
|
||||
// NewBlobSaver returns a new blob. A worker pool is started, it is stopped
|
||||
// when ctx is cancelled.
|
||||
func NewBlobSaver(ctx context.Context, repo Saver, workers uint) *BlobSaver {
|
||||
func NewBlobSaver(ctx context.Context, g Goer, repo Saver, workers uint) *BlobSaver {
|
||||
ch := make(chan saveBlobJob)
|
||||
s := &BlobSaver{
|
||||
repo: repo,
|
||||
|
@ -35,8 +36,9 @@ func NewBlobSaver(ctx context.Context, repo Saver, workers uint) *BlobSaver {
|
|||
}
|
||||
|
||||
for i := uint(0); i < workers; i++ {
|
||||
s.wg.Add(1)
|
||||
go s.worker(ctx, &s.wg, ch)
|
||||
g.Go(func() error {
|
||||
return s.worker(ctx, ch)
|
||||
})
|
||||
}
|
||||
|
||||
return s
|
||||
|
@ -47,7 +49,13 @@ func NewBlobSaver(ctx context.Context, repo Saver, workers uint) *BlobSaver {
|
|||
// previously unknown.
|
||||
func (s *BlobSaver) Save(ctx context.Context, t restic.BlobType, buf *Buffer) FutureBlob {
|
||||
ch := make(chan saveBlobResponse, 1)
|
||||
s.ch <- saveBlobJob{BlobType: t, buf: buf, ch: ch}
|
||||
select {
|
||||
case s.ch <- saveBlobJob{BlobType: t, buf: buf, ch: ch}:
|
||||
case <-ctx.Done():
|
||||
debug.Log("not sending job, context is cancelled")
|
||||
close(ch)
|
||||
return FutureBlob{ch: ch}
|
||||
}
|
||||
|
||||
return FutureBlob{ch: ch, length: len(buf.Data)}
|
||||
}
|
||||
|
@ -59,31 +67,28 @@ type FutureBlob struct {
|
|||
res saveBlobResponse
|
||||
}
|
||||
|
||||
func (s *FutureBlob) wait() {
|
||||
res, ok := <-s.ch
|
||||
// Wait blocks until the result is available or the context is cancelled.
|
||||
func (s *FutureBlob) Wait(ctx context.Context) {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return
|
||||
case res, ok := <-s.ch:
|
||||
if ok {
|
||||
s.res = res
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// ID returns the ID of the blob after it has been saved.
|
||||
func (s *FutureBlob) ID() restic.ID {
|
||||
s.wait()
|
||||
return s.res.id
|
||||
}
|
||||
|
||||
// Known returns whether or not the blob was already known.
|
||||
func (s *FutureBlob) Known() bool {
|
||||
s.wait()
|
||||
return s.res.known
|
||||
}
|
||||
|
||||
// Err returns the error which may have occurred during save.
|
||||
func (s *FutureBlob) Err() error {
|
||||
s.wait()
|
||||
return s.res.err
|
||||
}
|
||||
|
||||
// Length returns the length of the blob.
|
||||
func (s *FutureBlob) Length() int {
|
||||
return s.length
|
||||
|
@ -98,10 +103,9 @@ type saveBlobJob struct {
|
|||
type saveBlobResponse struct {
|
||||
id restic.ID
|
||||
known bool
|
||||
err error
|
||||
}
|
||||
|
||||
func (s *BlobSaver) saveBlob(ctx context.Context, t restic.BlobType, buf []byte) saveBlobResponse {
|
||||
func (s *BlobSaver) saveBlob(ctx context.Context, t restic.BlobType, buf []byte) (saveBlobResponse, error) {
|
||||
id := restic.Hash(buf)
|
||||
h := restic.BlobHandle{ID: id, Type: t}
|
||||
|
||||
|
@ -121,7 +125,7 @@ func (s *BlobSaver) saveBlob(ctx context.Context, t restic.BlobType, buf []byte)
|
|||
return saveBlobResponse{
|
||||
id: id,
|
||||
known: true,
|
||||
}
|
||||
}, nil
|
||||
}
|
||||
|
||||
// check if the repo knows this blob
|
||||
|
@ -129,29 +133,38 @@ func (s *BlobSaver) saveBlob(ctx context.Context, t restic.BlobType, buf []byte)
|
|||
return saveBlobResponse{
|
||||
id: id,
|
||||
known: true,
|
||||
}
|
||||
}, nil
|
||||
}
|
||||
|
||||
// otherwise we're responsible for saving it
|
||||
_, err := s.repo.SaveBlob(ctx, t, buf, id)
|
||||
if err != nil {
|
||||
return saveBlobResponse{}, errors.Fatalf("unable to save data: %v", err)
|
||||
}
|
||||
|
||||
return saveBlobResponse{
|
||||
id: id,
|
||||
known: false,
|
||||
err: err,
|
||||
}
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (s *BlobSaver) worker(ctx context.Context, wg *sync.WaitGroup, jobs <-chan saveBlobJob) {
|
||||
defer wg.Done()
|
||||
func (s *BlobSaver) worker(ctx context.Context, jobs <-chan saveBlobJob) error {
|
||||
for {
|
||||
var job saveBlobJob
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return
|
||||
debug.Log("context is cancelled, exiting: %v", ctx.Err())
|
||||
return nil
|
||||
case job = <-jobs:
|
||||
}
|
||||
|
||||
job.ch <- s.saveBlob(ctx, job.BlobType, job.buf.Data)
|
||||
res, err := s.saveBlob(ctx, job.BlobType, job.buf.Data)
|
||||
if err != nil {
|
||||
debug.Log("saveBlob returned error: %v", err)
|
||||
close(job.ch)
|
||||
return err
|
||||
}
|
||||
job.ch <- res
|
||||
close(job.ch)
|
||||
job.buf.Release()
|
||||
}
|
||||
|
|
|
@ -4,7 +4,6 @@ import (
|
|||
"context"
|
||||
"io"
|
||||
"os"
|
||||
"sync"
|
||||
|
||||
"github.com/restic/chunker"
|
||||
"github.com/restic/restic/internal/debug"
|
||||
|
@ -13,6 +12,11 @@ import (
|
|||
"github.com/restic/restic/internal/restic"
|
||||
)
|
||||
|
||||
// Goer starts a function in a goroutine.
|
||||
type Goer interface {
|
||||
Go(func() error)
|
||||
}
|
||||
|
||||
// FutureFile is returned by Save and will return the data once it
|
||||
// has been processed.
|
||||
type FutureFile struct {
|
||||
|
@ -54,7 +58,6 @@ type FileSaver struct {
|
|||
pol chunker.Pol
|
||||
|
||||
ch chan<- saveFileJob
|
||||
wg sync.WaitGroup
|
||||
|
||||
CompleteBlob func(filename string, bytes uint64)
|
||||
|
||||
|
@ -63,7 +66,7 @@ type FileSaver struct {
|
|||
|
||||
// NewFileSaver returns a new file saver. A worker pool with fileWorkers is
|
||||
// started, it is stopped when ctx is cancelled.
|
||||
func NewFileSaver(ctx context.Context, fs fs.FS, blobSaver *BlobSaver, pol chunker.Pol, fileWorkers, blobWorkers uint) *FileSaver {
|
||||
func NewFileSaver(ctx context.Context, g Goer, fs fs.FS, blobSaver *BlobSaver, pol chunker.Pol, fileWorkers, blobWorkers uint) *FileSaver {
|
||||
ch := make(chan saveFileJob)
|
||||
|
||||
debug.Log("new file saver with %v file workers and %v blob workers", fileWorkers, blobWorkers)
|
||||
|
@ -81,8 +84,10 @@ func NewFileSaver(ctx context.Context, fs fs.FS, blobSaver *BlobSaver, pol chunk
|
|||
}
|
||||
|
||||
for i := uint(0); i < fileWorkers; i++ {
|
||||
s.wg.Add(1)
|
||||
go s.worker(ctx, &s.wg, ch)
|
||||
g.Go(func() error {
|
||||
s.worker(ctx, ch)
|
||||
return nil
|
||||
})
|
||||
}
|
||||
|
||||
return s
|
||||
|
@ -95,7 +100,7 @@ type CompleteFunc func(*restic.Node, ItemStats)
|
|||
// file is closed by Save.
|
||||
func (s *FileSaver) Save(ctx context.Context, snPath string, file fs.File, fi os.FileInfo, start func(), complete CompleteFunc) FutureFile {
|
||||
ch := make(chan saveFileResponse, 1)
|
||||
s.ch <- saveFileJob{
|
||||
job := saveFileJob{
|
||||
snPath: snPath,
|
||||
file: file,
|
||||
fi: fi,
|
||||
|
@ -104,6 +109,12 @@ func (s *FileSaver) Save(ctx context.Context, snPath string, file fs.File, fi os
|
|||
ch: ch,
|
||||
}
|
||||
|
||||
select {
|
||||
case s.ch <- job:
|
||||
case <-ctx.Done():
|
||||
debug.Log("not sending job, context is cancelled: %v", ctx.Err())
|
||||
}
|
||||
|
||||
return FutureFile{ch: ch}
|
||||
}
|
||||
|
||||
|
@ -189,10 +200,7 @@ func (s *FileSaver) saveFile(ctx context.Context, chnker *chunker.Chunker, snPat
|
|||
}
|
||||
|
||||
for _, res := range results {
|
||||
if res.Err() != nil {
|
||||
return saveFileResponse{err: res.Err()}
|
||||
}
|
||||
|
||||
res.Wait(ctx)
|
||||
if !res.Known() {
|
||||
stats.DataBlobs++
|
||||
stats.DataSize += uint64(res.Length())
|
||||
|
@ -209,11 +217,10 @@ func (s *FileSaver) saveFile(ctx context.Context, chnker *chunker.Chunker, snPat
|
|||
}
|
||||
}
|
||||
|
||||
func (s *FileSaver) worker(ctx context.Context, wg *sync.WaitGroup, jobs <-chan saveFileJob) {
|
||||
func (s *FileSaver) worker(ctx context.Context, jobs <-chan saveFileJob) {
|
||||
// a worker has one chunker which is reused for each file (because it contains a rather large buffer)
|
||||
chnker := chunker.New(nil, s.pol)
|
||||
|
||||
defer wg.Done()
|
||||
for {
|
||||
var job saveFileJob
|
||||
select {
|
||||
|
|
|
@ -2,9 +2,9 @@ package archiver
|
|||
|
||||
import (
|
||||
"context"
|
||||
"sync"
|
||||
|
||||
"github.com/restic/restic/internal/debug"
|
||||
"github.com/restic/restic/internal/errors"
|
||||
"github.com/restic/restic/internal/restic"
|
||||
)
|
||||
|
||||
|
@ -34,24 +34,17 @@ func (s *FutureTree) Stats() ItemStats {
|
|||
return s.res.stats
|
||||
}
|
||||
|
||||
// Err returns the error in case an error occurred.
|
||||
func (s *FutureTree) Err() error {
|
||||
s.wait()
|
||||
return s.res.err
|
||||
}
|
||||
|
||||
// TreeSaver concurrently saves incoming trees to the repo.
|
||||
type TreeSaver struct {
|
||||
saveTree func(context.Context, *restic.Tree) (restic.ID, ItemStats, error)
|
||||
errFn ErrorFunc
|
||||
|
||||
ch chan<- saveTreeJob
|
||||
wg sync.WaitGroup
|
||||
}
|
||||
|
||||
// NewTreeSaver returns a new tree saver. A worker pool with treeWorkers is
|
||||
// started, it is stopped when ctx is cancelled.
|
||||
func NewTreeSaver(ctx context.Context, treeWorkers uint, saveTree func(context.Context, *restic.Tree) (restic.ID, ItemStats, error), errFn ErrorFunc) *TreeSaver {
|
||||
func NewTreeSaver(ctx context.Context, g Goer, treeWorkers uint, saveTree func(context.Context, *restic.Tree) (restic.ID, ItemStats, error), errFn ErrorFunc) *TreeSaver {
|
||||
ch := make(chan saveTreeJob)
|
||||
|
||||
s := &TreeSaver{
|
||||
|
@ -61,8 +54,9 @@ func NewTreeSaver(ctx context.Context, treeWorkers uint, saveTree func(context.C
|
|||
}
|
||||
|
||||
for i := uint(0); i < treeWorkers; i++ {
|
||||
s.wg.Add(1)
|
||||
go s.worker(ctx, &s.wg, ch)
|
||||
g.Go(func() error {
|
||||
return s.worker(ctx, ch)
|
||||
})
|
||||
}
|
||||
|
||||
return s
|
||||
|
@ -71,12 +65,19 @@ func NewTreeSaver(ctx context.Context, treeWorkers uint, saveTree func(context.C
|
|||
// Save stores the dir d and returns the data once it has been completed.
|
||||
func (s *TreeSaver) Save(ctx context.Context, snPath string, node *restic.Node, nodes []FutureNode) FutureTree {
|
||||
ch := make(chan saveTreeResponse, 1)
|
||||
s.ch <- saveTreeJob{
|
||||
job := saveTreeJob{
|
||||
snPath: snPath,
|
||||
node: node,
|
||||
nodes: nodes,
|
||||
ch: ch,
|
||||
}
|
||||
select {
|
||||
case s.ch <- job:
|
||||
case <-ctx.Done():
|
||||
debug.Log("refusing to save job, context is cancelled: %v", ctx.Err())
|
||||
close(ch)
|
||||
return FutureTree{ch: ch}
|
||||
}
|
||||
|
||||
return FutureTree{ch: ch}
|
||||
}
|
||||
|
@ -91,7 +92,6 @@ type saveTreeJob struct {
|
|||
type saveTreeResponse struct {
|
||||
node *restic.Node
|
||||
stats ItemStats
|
||||
err error
|
||||
}
|
||||
|
||||
// save stores the nodes as a tree in the repo.
|
||||
|
@ -137,21 +137,24 @@ func (s *TreeSaver) save(ctx context.Context, snPath string, node *restic.Node,
|
|||
return node, stats, nil
|
||||
}
|
||||
|
||||
func (s *TreeSaver) worker(ctx context.Context, wg *sync.WaitGroup, jobs <-chan saveTreeJob) {
|
||||
defer wg.Done()
|
||||
func (s *TreeSaver) worker(ctx context.Context, jobs <-chan saveTreeJob) error {
|
||||
for {
|
||||
var job saveTreeJob
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return
|
||||
return nil
|
||||
case job = <-jobs:
|
||||
}
|
||||
|
||||
node, stats, err := s.save(ctx, job.snPath, job.node, job.nodes)
|
||||
if err != nil {
|
||||
debug.Log("error saving tree blob: %v", err)
|
||||
return errors.Fatalf("unable to save data: %v", err)
|
||||
}
|
||||
|
||||
job.ch <- saveTreeResponse{
|
||||
node: node,
|
||||
stats: stats,
|
||||
err: err,
|
||||
}
|
||||
close(job.ch)
|
||||
}
|
||||
|
|
Loading…
Reference in a new issue