Merge pull request #1778 from restic/fix-1771
archiver: Improve error handling
This commit is contained in:
commit
159badf5ba
10 changed files with 587 additions and 57 deletions
|
@ -467,7 +467,7 @@ func runBackup(opts BackupOptions, gopts GlobalOptions, term *termstatus.Termina
|
||||||
p.V("start backup on %v", targets)
|
p.V("start backup on %v", targets)
|
||||||
_, id, err := arch.Snapshot(gopts.ctx, targets, snapshotOpts)
|
_, id, err := arch.Snapshot(gopts.ctx, targets, snapshotOpts)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return errors.Fatalf("unable to save snapshot: %v", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
p.Finish()
|
p.Finish()
|
||||||
|
|
|
@ -218,6 +218,7 @@ func (arch *Archiver) SaveDir(ctx context.Context, snPath string, fi os.FileInfo
|
||||||
for _, name := range names {
|
for _, name := range names {
|
||||||
// test if context has been cancelled
|
// test if context has been cancelled
|
||||||
if ctx.Err() != nil {
|
if ctx.Err() != nil {
|
||||||
|
debug.Log("context has been cancelled, aborting")
|
||||||
return FutureTree{}, ctx.Err()
|
return FutureTree{}, ctx.Err()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -263,14 +264,15 @@ type FutureNode struct {
|
||||||
|
|
||||||
isFile bool
|
isFile bool
|
||||||
file FutureFile
|
file FutureFile
|
||||||
isDir bool
|
isTree bool
|
||||||
dir FutureTree
|
tree FutureTree
|
||||||
}
|
}
|
||||||
|
|
||||||
func (fn *FutureNode) wait(ctx context.Context) {
|
func (fn *FutureNode) wait(ctx context.Context) {
|
||||||
switch {
|
switch {
|
||||||
case fn.isFile:
|
case fn.isFile:
|
||||||
// wait for and collect the data for the file
|
// wait for and collect the data for the file
|
||||||
|
fn.file.Wait(ctx)
|
||||||
fn.node = fn.file.Node()
|
fn.node = fn.file.Node()
|
||||||
fn.err = fn.file.Err()
|
fn.err = fn.file.Err()
|
||||||
fn.stats = fn.file.Stats()
|
fn.stats = fn.file.Stats()
|
||||||
|
@ -279,19 +281,21 @@ func (fn *FutureNode) wait(ctx context.Context) {
|
||||||
fn.file = FutureFile{}
|
fn.file = FutureFile{}
|
||||||
fn.isFile = false
|
fn.isFile = false
|
||||||
|
|
||||||
case fn.isDir:
|
case fn.isTree:
|
||||||
// wait for and collect the data for the dir
|
// wait for and collect the data for the dir
|
||||||
fn.node = fn.dir.Node()
|
fn.tree.Wait(ctx)
|
||||||
fn.stats = fn.dir.Stats()
|
fn.node = fn.tree.Node()
|
||||||
|
fn.stats = fn.tree.Stats()
|
||||||
|
|
||||||
// ensure the other stuff can be garbage-collected
|
// ensure the other stuff can be garbage-collected
|
||||||
fn.dir = FutureTree{}
|
fn.tree = FutureTree{}
|
||||||
fn.isDir = false
|
fn.isTree = false
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Save saves a target (file or directory) to the repo. If the item is
|
// Save saves a target (file or directory) to the repo. If the item is
|
||||||
// excluded,this function returns a nil node and error.
|
// excluded,this function returns a nil node and error, with excluded set to
|
||||||
|
// true.
|
||||||
//
|
//
|
||||||
// Errors and completion is needs to be handled by the caller.
|
// Errors and completion is needs to be handled by the caller.
|
||||||
//
|
//
|
||||||
|
@ -390,11 +394,12 @@ func (arch *Archiver) Save(ctx context.Context, snPath, target string, previous
|
||||||
start := time.Now()
|
start := time.Now()
|
||||||
oldSubtree := arch.loadSubtree(ctx, previous)
|
oldSubtree := arch.loadSubtree(ctx, previous)
|
||||||
|
|
||||||
fn.isDir = true
|
fn.isTree = true
|
||||||
fn.dir, err = arch.SaveDir(ctx, snPath, fi, target, oldSubtree)
|
fn.tree, err = arch.SaveDir(ctx, snPath, fi, target, oldSubtree)
|
||||||
if err == nil {
|
if err == nil {
|
||||||
arch.CompleteItem(snItem, previous, fn.node, fn.stats, time.Since(start))
|
arch.CompleteItem(snItem, previous, fn.node, fn.stats, time.Since(start))
|
||||||
} else {
|
} else {
|
||||||
|
debug.Log("SaveDir for %v returned error: %v", snPath, err)
|
||||||
return FutureNode{}, false, err
|
return FutureNode{}, false, err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -477,7 +482,16 @@ func (arch *Archiver) SaveTree(ctx context.Context, snPath string, atree *Tree,
|
||||||
|
|
||||||
futureNodes := make(map[string]FutureNode)
|
futureNodes := make(map[string]FutureNode)
|
||||||
|
|
||||||
for name, subatree := range atree.Nodes {
|
// iterate over the nodes of atree in lexicographic (=deterministic) order
|
||||||
|
names := make([]string, 0, len(atree.Nodes))
|
||||||
|
for name := range atree.Nodes {
|
||||||
|
names = append(names, name)
|
||||||
|
}
|
||||||
|
sort.Stable(sort.StringSlice(names))
|
||||||
|
|
||||||
|
for _, name := range names {
|
||||||
|
subatree := atree.Nodes[name]
|
||||||
|
|
||||||
// test if context has been cancelled
|
// test if context has been cancelled
|
||||||
if ctx.Err() != nil {
|
if ctx.Err() != nil {
|
||||||
return nil, ctx.Err()
|
return nil, ctx.Err()
|
||||||
|
@ -713,13 +727,13 @@ func (arch *Archiver) runWorkers(ctx context.Context, t *tomb.Tomb) {
|
||||||
|
|
||||||
arch.fileSaver = NewFileSaver(ctx, t,
|
arch.fileSaver = NewFileSaver(ctx, t,
|
||||||
arch.FS,
|
arch.FS,
|
||||||
arch.blobSaver,
|
arch.blobSaver.Save,
|
||||||
arch.Repo.Config().ChunkerPolynomial,
|
arch.Repo.Config().ChunkerPolynomial,
|
||||||
arch.Options.FileReadConcurrency, arch.Options.SaveBlobConcurrency)
|
arch.Options.FileReadConcurrency, arch.Options.SaveBlobConcurrency)
|
||||||
arch.fileSaver.CompleteBlob = arch.CompleteBlob
|
arch.fileSaver.CompleteBlob = arch.CompleteBlob
|
||||||
arch.fileSaver.NodeFromFileInfo = arch.nodeFromFileInfo
|
arch.fileSaver.NodeFromFileInfo = arch.nodeFromFileInfo
|
||||||
|
|
||||||
arch.treeSaver = NewTreeSaver(ctx, t, arch.Options.SaveTreeConcurrency, arch.saveTree, arch.error)
|
arch.treeSaver = NewTreeSaver(ctx, t, arch.Options.SaveTreeConcurrency, arch.saveTree, arch.Error)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Snapshot saves several targets and returns a snapshot.
|
// Snapshot saves several targets and returns a snapshot.
|
||||||
|
@ -754,7 +768,8 @@ func (arch *Archiver) Snapshot(ctx context.Context, targets []string, opts Snaps
|
||||||
|
|
||||||
t.Kill(nil)
|
t.Kill(nil)
|
||||||
werr := t.Wait()
|
werr := t.Wait()
|
||||||
if err != nil && errors.Cause(err) == context.Canceled {
|
debug.Log("err is %v, werr is %v", err, werr)
|
||||||
|
if err == nil || errors.Cause(err) == context.Canceled {
|
||||||
err = werr
|
err = werr
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -8,11 +8,13 @@ import (
|
||||||
"runtime"
|
"runtime"
|
||||||
"strings"
|
"strings"
|
||||||
"sync"
|
"sync"
|
||||||
|
"sync/atomic"
|
||||||
"syscall"
|
"syscall"
|
||||||
"testing"
|
"testing"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/restic/restic/internal/checker"
|
"github.com/restic/restic/internal/checker"
|
||||||
|
"github.com/restic/restic/internal/errors"
|
||||||
"github.com/restic/restic/internal/fs"
|
"github.com/restic/restic/internal/fs"
|
||||||
"github.com/restic/restic/internal/repository"
|
"github.com/restic/restic/internal/repository"
|
||||||
"github.com/restic/restic/internal/restic"
|
"github.com/restic/restic/internal/restic"
|
||||||
|
@ -70,6 +72,8 @@ func saveFile(t testing.TB, repo restic.Repository, filename string, filesystem
|
||||||
}
|
}
|
||||||
|
|
||||||
res := arch.fileSaver.Save(ctx, "/", file, fi, start, complete)
|
res := arch.fileSaver.Save(ctx, "/", file, fi, start, complete)
|
||||||
|
|
||||||
|
res.Wait(ctx)
|
||||||
if res.Err() != nil {
|
if res.Err() != nil {
|
||||||
t.Fatal(res.Err())
|
t.Fatal(res.Err())
|
||||||
}
|
}
|
||||||
|
@ -620,6 +624,7 @@ func TestArchiverSaveDir(t *testing.T) {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
ft.Wait(ctx)
|
||||||
node, stats := ft.Node(), ft.Stats()
|
node, stats := ft.Node(), ft.Stats()
|
||||||
|
|
||||||
tmb.Kill(nil)
|
tmb.Kill(nil)
|
||||||
|
@ -701,6 +706,7 @@ func TestArchiverSaveDirIncremental(t *testing.T) {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
ft.Wait(ctx)
|
||||||
node, stats := ft.Node(), ft.Stats()
|
node, stats := ft.Node(), ft.Stats()
|
||||||
|
|
||||||
tmb.Kill(nil)
|
tmb.Kill(nil)
|
||||||
|
@ -1594,3 +1600,141 @@ func TestArchiverErrorReporting(t *testing.T) {
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// TrackFS keeps track which files are opened. For some files, an error is injected.
|
||||||
|
type TrackFS struct {
|
||||||
|
fs.FS
|
||||||
|
|
||||||
|
errorOn map[string]error
|
||||||
|
|
||||||
|
opened map[string]uint
|
||||||
|
m sync.Mutex
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *TrackFS) Open(name string) (fs.File, error) {
|
||||||
|
m.m.Lock()
|
||||||
|
m.opened[name]++
|
||||||
|
m.m.Unlock()
|
||||||
|
|
||||||
|
return m.FS.Open(name)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *TrackFS) OpenFile(name string, flag int, perm os.FileMode) (fs.File, error) {
|
||||||
|
m.m.Lock()
|
||||||
|
m.opened[name]++
|
||||||
|
m.m.Unlock()
|
||||||
|
|
||||||
|
return m.FS.OpenFile(name, flag, perm)
|
||||||
|
}
|
||||||
|
|
||||||
|
type failSaveRepo struct {
|
||||||
|
restic.Repository
|
||||||
|
failAfter int32
|
||||||
|
cnt int32
|
||||||
|
err error
|
||||||
|
}
|
||||||
|
|
||||||
|
func (f *failSaveRepo) SaveBlob(ctx context.Context, t restic.BlobType, buf []byte, id restic.ID) (restic.ID, error) {
|
||||||
|
val := atomic.AddInt32(&f.cnt, 1)
|
||||||
|
if val >= f.failAfter {
|
||||||
|
return restic.ID{}, f.err
|
||||||
|
}
|
||||||
|
|
||||||
|
return f.Repository.SaveBlob(ctx, t, buf, id)
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestArchiverAbortEarlyOnError(t *testing.T) {
|
||||||
|
var testErr = errors.New("test error")
|
||||||
|
|
||||||
|
var tests = []struct {
|
||||||
|
src TestDir
|
||||||
|
wantOpen map[string]uint
|
||||||
|
failAfter uint // error after so many files have been saved to the repo
|
||||||
|
err error
|
||||||
|
}{
|
||||||
|
{
|
||||||
|
src: TestDir{
|
||||||
|
"dir": TestDir{
|
||||||
|
"bar": TestFile{Content: "foobar"},
|
||||||
|
"baz": TestFile{Content: "foobar"},
|
||||||
|
"foo": TestFile{Content: "foobar"},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
wantOpen: map[string]uint{
|
||||||
|
filepath.FromSlash("dir/bar"): 1,
|
||||||
|
filepath.FromSlash("dir/baz"): 1,
|
||||||
|
filepath.FromSlash("dir/foo"): 1,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
src: TestDir{
|
||||||
|
"dir": TestDir{
|
||||||
|
"file1": TestFile{Content: string(restictest.Random(3, 4*1024*1024))},
|
||||||
|
"file2": TestFile{Content: string(restictest.Random(3, 4*1024*1024))},
|
||||||
|
"file3": TestFile{Content: string(restictest.Random(3, 4*1024*1024))},
|
||||||
|
"file4": TestFile{Content: string(restictest.Random(3, 4*1024*1024))},
|
||||||
|
"file5": TestFile{Content: string(restictest.Random(3, 4*1024*1024))},
|
||||||
|
"file6": TestFile{Content: string(restictest.Random(3, 4*1024*1024))},
|
||||||
|
"file7": TestFile{Content: string(restictest.Random(3, 4*1024*1024))},
|
||||||
|
"file8": TestFile{Content: string(restictest.Random(3, 4*1024*1024))},
|
||||||
|
"file9": TestFile{Content: string(restictest.Random(3, 4*1024*1024))},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
wantOpen: map[string]uint{
|
||||||
|
filepath.FromSlash("dir/file1"): 1,
|
||||||
|
filepath.FromSlash("dir/file2"): 1,
|
||||||
|
filepath.FromSlash("dir/file3"): 1,
|
||||||
|
filepath.FromSlash("dir/file7"): 0,
|
||||||
|
filepath.FromSlash("dir/file8"): 0,
|
||||||
|
filepath.FromSlash("dir/file9"): 0,
|
||||||
|
},
|
||||||
|
failAfter: 5,
|
||||||
|
err: testErr,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, test := range tests {
|
||||||
|
t.Run("", func(t *testing.T) {
|
||||||
|
ctx, cancel := context.WithCancel(context.Background())
|
||||||
|
defer cancel()
|
||||||
|
|
||||||
|
tempdir, repo, cleanup := prepareTempdirRepoSrc(t, test.src)
|
||||||
|
defer cleanup()
|
||||||
|
|
||||||
|
back := fs.TestChdir(t, tempdir)
|
||||||
|
defer back()
|
||||||
|
|
||||||
|
testFS := &TrackFS{
|
||||||
|
FS: fs.Track{fs.Local{}},
|
||||||
|
opened: make(map[string]uint),
|
||||||
|
}
|
||||||
|
|
||||||
|
if testFS.errorOn == nil {
|
||||||
|
testFS.errorOn = make(map[string]error)
|
||||||
|
}
|
||||||
|
|
||||||
|
testRepo := &failSaveRepo{
|
||||||
|
Repository: repo,
|
||||||
|
failAfter: int32(test.failAfter),
|
||||||
|
err: test.err,
|
||||||
|
}
|
||||||
|
|
||||||
|
arch := New(testRepo, testFS, Options{})
|
||||||
|
|
||||||
|
_, _, err := arch.Snapshot(ctx, []string{"."}, SnapshotOptions{Time: time.Now()})
|
||||||
|
if errors.Cause(err) != test.err {
|
||||||
|
t.Errorf("expected error (%v) not found, got %v", test.err, errors.Cause(err))
|
||||||
|
}
|
||||||
|
|
||||||
|
t.Logf("Snapshot return error: %v", err)
|
||||||
|
|
||||||
|
t.Logf("track fs: %v", testFS.opened)
|
||||||
|
|
||||||
|
for k, v := range test.wantOpen {
|
||||||
|
if testFS.opened[k] != v {
|
||||||
|
t.Errorf("opened %v %d times, want %d", k, testFS.opened[k], v)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
|
@ -5,8 +5,8 @@ import (
|
||||||
"sync"
|
"sync"
|
||||||
|
|
||||||
"github.com/restic/restic/internal/debug"
|
"github.com/restic/restic/internal/debug"
|
||||||
"github.com/restic/restic/internal/errors"
|
|
||||||
"github.com/restic/restic/internal/restic"
|
"github.com/restic/restic/internal/restic"
|
||||||
|
tomb "gopkg.in/tomb.v2"
|
||||||
)
|
)
|
||||||
|
|
||||||
// Saver allows saving a blob.
|
// Saver allows saving a blob.
|
||||||
|
@ -22,22 +22,24 @@ type BlobSaver struct {
|
||||||
m sync.Mutex
|
m sync.Mutex
|
||||||
knownBlobs restic.BlobSet
|
knownBlobs restic.BlobSet
|
||||||
|
|
||||||
ch chan<- saveBlobJob
|
ch chan<- saveBlobJob
|
||||||
|
done <-chan struct{}
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewBlobSaver returns a new blob. A worker pool is started, it is stopped
|
// NewBlobSaver returns a new blob. A worker pool is started, it is stopped
|
||||||
// when ctx is cancelled.
|
// when ctx is cancelled.
|
||||||
func NewBlobSaver(ctx context.Context, g Goer, repo Saver, workers uint) *BlobSaver {
|
func NewBlobSaver(ctx context.Context, t *tomb.Tomb, repo Saver, workers uint) *BlobSaver {
|
||||||
ch := make(chan saveBlobJob)
|
ch := make(chan saveBlobJob)
|
||||||
s := &BlobSaver{
|
s := &BlobSaver{
|
||||||
repo: repo,
|
repo: repo,
|
||||||
knownBlobs: restic.NewBlobSet(),
|
knownBlobs: restic.NewBlobSet(),
|
||||||
ch: ch,
|
ch: ch,
|
||||||
|
done: t.Dying(),
|
||||||
}
|
}
|
||||||
|
|
||||||
for i := uint(0); i < workers; i++ {
|
for i := uint(0); i < workers; i++ {
|
||||||
g.Go(func() error {
|
t.Go(func() error {
|
||||||
return s.worker(ctx, ch)
|
return s.worker(t.Context(ctx), ch)
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -51,6 +53,10 @@ func (s *BlobSaver) Save(ctx context.Context, t restic.BlobType, buf *Buffer) Fu
|
||||||
ch := make(chan saveBlobResponse, 1)
|
ch := make(chan saveBlobResponse, 1)
|
||||||
select {
|
select {
|
||||||
case s.ch <- saveBlobJob{BlobType: t, buf: buf, ch: ch}:
|
case s.ch <- saveBlobJob{BlobType: t, buf: buf, ch: ch}:
|
||||||
|
case <-s.done:
|
||||||
|
debug.Log("not sending job, BlobSaver is done")
|
||||||
|
close(ch)
|
||||||
|
return FutureBlob{ch: ch}
|
||||||
case <-ctx.Done():
|
case <-ctx.Done():
|
||||||
debug.Log("not sending job, context is cancelled")
|
debug.Log("not sending job, context is cancelled")
|
||||||
close(ch)
|
close(ch)
|
||||||
|
@ -139,7 +145,7 @@ func (s *BlobSaver) saveBlob(ctx context.Context, t restic.BlobType, buf []byte)
|
||||||
// otherwise we're responsible for saving it
|
// otherwise we're responsible for saving it
|
||||||
_, err := s.repo.SaveBlob(ctx, t, buf, id)
|
_, err := s.repo.SaveBlob(ctx, t, buf, id)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return saveBlobResponse{}, errors.Fatalf("unable to save data: %v", err)
|
return saveBlobResponse{}, err
|
||||||
}
|
}
|
||||||
|
|
||||||
return saveBlobResponse{
|
return saveBlobResponse{
|
||||||
|
@ -153,14 +159,13 @@ func (s *BlobSaver) worker(ctx context.Context, jobs <-chan saveBlobJob) error {
|
||||||
var job saveBlobJob
|
var job saveBlobJob
|
||||||
select {
|
select {
|
||||||
case <-ctx.Done():
|
case <-ctx.Done():
|
||||||
debug.Log("context is cancelled, exiting: %v", ctx.Err())
|
|
||||||
return nil
|
return nil
|
||||||
case job = <-jobs:
|
case job = <-jobs:
|
||||||
}
|
}
|
||||||
|
|
||||||
res, err := s.saveBlob(ctx, job.BlobType, job.buf.Data)
|
res, err := s.saveBlob(ctx, job.BlobType, job.buf.Data)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
debug.Log("saveBlob returned error: %v", err)
|
debug.Log("saveBlob returned error, exiting: %v", err)
|
||||||
close(job.ch)
|
close(job.ch)
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
115
internal/archiver/blob_saver_test.go
Normal file
115
internal/archiver/blob_saver_test.go
Normal file
|
@ -0,0 +1,115 @@
|
||||||
|
package archiver
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"fmt"
|
||||||
|
"runtime"
|
||||||
|
"sync/atomic"
|
||||||
|
"testing"
|
||||||
|
|
||||||
|
"github.com/restic/restic/internal/errors"
|
||||||
|
"github.com/restic/restic/internal/repository"
|
||||||
|
"github.com/restic/restic/internal/restic"
|
||||||
|
tomb "gopkg.in/tomb.v2"
|
||||||
|
)
|
||||||
|
|
||||||
|
var errTest = errors.New("test error")
|
||||||
|
|
||||||
|
type saveFail struct {
|
||||||
|
idx restic.Index
|
||||||
|
cnt int32
|
||||||
|
failAt int32
|
||||||
|
}
|
||||||
|
|
||||||
|
func (b *saveFail) SaveBlob(ctx context.Context, t restic.BlobType, buf []byte, id restic.ID) (restic.ID, error) {
|
||||||
|
val := atomic.AddInt32(&b.cnt, 1)
|
||||||
|
if val == b.failAt {
|
||||||
|
return restic.ID{}, errTest
|
||||||
|
}
|
||||||
|
|
||||||
|
return id, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (b *saveFail) Index() restic.Index {
|
||||||
|
return b.idx
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestBlobSaver(t *testing.T) {
|
||||||
|
ctx, cancel := context.WithCancel(context.Background())
|
||||||
|
defer cancel()
|
||||||
|
|
||||||
|
var tmb tomb.Tomb
|
||||||
|
saver := &saveFail{
|
||||||
|
idx: repository.NewIndex(),
|
||||||
|
}
|
||||||
|
|
||||||
|
b := NewBlobSaver(ctx, &tmb, saver, uint(runtime.NumCPU()))
|
||||||
|
|
||||||
|
var results []FutureBlob
|
||||||
|
|
||||||
|
for i := 0; i < 20; i++ {
|
||||||
|
buf := &Buffer{Data: []byte(fmt.Sprintf("foo%d", i))}
|
||||||
|
fb := b.Save(ctx, restic.DataBlob, buf)
|
||||||
|
results = append(results, fb)
|
||||||
|
}
|
||||||
|
|
||||||
|
for i, blob := range results {
|
||||||
|
blob.Wait(ctx)
|
||||||
|
if blob.Known() {
|
||||||
|
t.Errorf("blob %v is known, that should not be the case", i)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
tmb.Kill(nil)
|
||||||
|
|
||||||
|
err := tmb.Wait()
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestBlobSaverError(t *testing.T) {
|
||||||
|
var tests = []struct {
|
||||||
|
blobs int
|
||||||
|
failAt int
|
||||||
|
}{
|
||||||
|
{20, 2},
|
||||||
|
{20, 5},
|
||||||
|
{20, 15},
|
||||||
|
{200, 150},
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, test := range tests {
|
||||||
|
t.Run("", func(t *testing.T) {
|
||||||
|
ctx, cancel := context.WithCancel(context.Background())
|
||||||
|
defer cancel()
|
||||||
|
|
||||||
|
var tmb tomb.Tomb
|
||||||
|
saver := &saveFail{
|
||||||
|
idx: repository.NewIndex(),
|
||||||
|
failAt: int32(test.failAt),
|
||||||
|
}
|
||||||
|
|
||||||
|
b := NewBlobSaver(ctx, &tmb, saver, uint(runtime.NumCPU()))
|
||||||
|
|
||||||
|
var results []FutureBlob
|
||||||
|
|
||||||
|
for i := 0; i < test.blobs; i++ {
|
||||||
|
buf := &Buffer{Data: []byte(fmt.Sprintf("foo%d", i))}
|
||||||
|
fb := b.Save(ctx, restic.DataBlob, buf)
|
||||||
|
results = append(results, fb)
|
||||||
|
}
|
||||||
|
|
||||||
|
tmb.Kill(nil)
|
||||||
|
|
||||||
|
err := tmb.Wait()
|
||||||
|
if err == nil {
|
||||||
|
t.Errorf("expected error not found")
|
||||||
|
}
|
||||||
|
|
||||||
|
if err != errTest {
|
||||||
|
t.Fatalf("unexpected error found: %v", err)
|
||||||
|
}
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
12
internal/archiver/doc.go
Normal file
12
internal/archiver/doc.go
Normal file
|
@ -0,0 +1,12 @@
|
||||||
|
// Package archiver contains the code which reads files, splits them into
|
||||||
|
// chunks and saves the data to the repository.
|
||||||
|
//
|
||||||
|
// An Archiver has a number of worker goroutines handling saving the different
|
||||||
|
// data structures to the repository, the details are implemented by the
|
||||||
|
// FileSaver, BlobSaver, and TreeSaver types.
|
||||||
|
//
|
||||||
|
// The main goroutine (the one calling Snapshot()) traverses the directory tree
|
||||||
|
// and delegates all work to these worker pools. They return a type
|
||||||
|
// (FutureFile, FutureBlob, and FutureTree) which can be resolved later, by
|
||||||
|
// calling Wait() on it.
|
||||||
|
package archiver
|
|
@ -10,13 +10,9 @@ import (
|
||||||
"github.com/restic/restic/internal/errors"
|
"github.com/restic/restic/internal/errors"
|
||||||
"github.com/restic/restic/internal/fs"
|
"github.com/restic/restic/internal/fs"
|
||||||
"github.com/restic/restic/internal/restic"
|
"github.com/restic/restic/internal/restic"
|
||||||
|
tomb "gopkg.in/tomb.v2"
|
||||||
)
|
)
|
||||||
|
|
||||||
// Goer starts a function in a goroutine.
|
|
||||||
type Goer interface {
|
|
||||||
Go(func() error)
|
|
||||||
}
|
|
||||||
|
|
||||||
// FutureFile is returned by Save and will return the data once it
|
// FutureFile is returned by Save and will return the data once it
|
||||||
// has been processed.
|
// has been processed.
|
||||||
type FutureFile struct {
|
type FutureFile struct {
|
||||||
|
@ -24,40 +20,47 @@ type FutureFile struct {
|
||||||
res saveFileResponse
|
res saveFileResponse
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *FutureFile) wait() {
|
// Wait blocks until the result of the save operation is received or ctx is
|
||||||
res, ok := <-s.ch
|
// cancelled.
|
||||||
if ok {
|
func (s *FutureFile) Wait(ctx context.Context) {
|
||||||
s.res = res
|
select {
|
||||||
|
case res, ok := <-s.ch:
|
||||||
|
if ok {
|
||||||
|
s.res = res
|
||||||
|
}
|
||||||
|
case <-ctx.Done():
|
||||||
|
return
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Node returns the node once it is available.
|
// Node returns the node once it is available.
|
||||||
func (s *FutureFile) Node() *restic.Node {
|
func (s *FutureFile) Node() *restic.Node {
|
||||||
s.wait()
|
|
||||||
return s.res.node
|
return s.res.node
|
||||||
}
|
}
|
||||||
|
|
||||||
// Stats returns the stats for the file once they are available.
|
// Stats returns the stats for the file once they are available.
|
||||||
func (s *FutureFile) Stats() ItemStats {
|
func (s *FutureFile) Stats() ItemStats {
|
||||||
s.wait()
|
|
||||||
return s.res.stats
|
return s.res.stats
|
||||||
}
|
}
|
||||||
|
|
||||||
// Err returns the error in case an error occurred.
|
// Err returns the error in case an error occurred.
|
||||||
func (s *FutureFile) Err() error {
|
func (s *FutureFile) Err() error {
|
||||||
s.wait()
|
|
||||||
return s.res.err
|
return s.res.err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// SaveBlobFn saves a blob to a repo.
|
||||||
|
type SaveBlobFn func(context.Context, restic.BlobType, *Buffer) FutureBlob
|
||||||
|
|
||||||
// FileSaver concurrently saves incoming files to the repo.
|
// FileSaver concurrently saves incoming files to the repo.
|
||||||
type FileSaver struct {
|
type FileSaver struct {
|
||||||
fs fs.FS
|
fs fs.FS
|
||||||
blobSaver *BlobSaver
|
|
||||||
saveFilePool *BufferPool
|
saveFilePool *BufferPool
|
||||||
|
saveBlob SaveBlobFn
|
||||||
|
|
||||||
pol chunker.Pol
|
pol chunker.Pol
|
||||||
|
|
||||||
ch chan<- saveFileJob
|
ch chan<- saveFileJob
|
||||||
|
done <-chan struct{}
|
||||||
|
|
||||||
CompleteBlob func(filename string, bytes uint64)
|
CompleteBlob func(filename string, bytes uint64)
|
||||||
|
|
||||||
|
@ -66,7 +69,7 @@ type FileSaver struct {
|
||||||
|
|
||||||
// NewFileSaver returns a new file saver. A worker pool with fileWorkers is
|
// NewFileSaver returns a new file saver. A worker pool with fileWorkers is
|
||||||
// started, it is stopped when ctx is cancelled.
|
// started, it is stopped when ctx is cancelled.
|
||||||
func NewFileSaver(ctx context.Context, g Goer, fs fs.FS, blobSaver *BlobSaver, pol chunker.Pol, fileWorkers, blobWorkers uint) *FileSaver {
|
func NewFileSaver(ctx context.Context, t *tomb.Tomb, fs fs.FS, save SaveBlobFn, pol chunker.Pol, fileWorkers, blobWorkers uint) *FileSaver {
|
||||||
ch := make(chan saveFileJob)
|
ch := make(chan saveFileJob)
|
||||||
|
|
||||||
debug.Log("new file saver with %v file workers and %v blob workers", fileWorkers, blobWorkers)
|
debug.Log("new file saver with %v file workers and %v blob workers", fileWorkers, blobWorkers)
|
||||||
|
@ -75,17 +78,18 @@ func NewFileSaver(ctx context.Context, g Goer, fs fs.FS, blobSaver *BlobSaver, p
|
||||||
|
|
||||||
s := &FileSaver{
|
s := &FileSaver{
|
||||||
fs: fs,
|
fs: fs,
|
||||||
blobSaver: blobSaver,
|
saveBlob: save,
|
||||||
saveFilePool: NewBufferPool(ctx, int(poolSize), chunker.MaxSize),
|
saveFilePool: NewBufferPool(ctx, int(poolSize), chunker.MaxSize),
|
||||||
pol: pol,
|
pol: pol,
|
||||||
ch: ch,
|
ch: ch,
|
||||||
|
done: t.Dying(),
|
||||||
|
|
||||||
CompleteBlob: func(string, uint64) {},
|
CompleteBlob: func(string, uint64) {},
|
||||||
}
|
}
|
||||||
|
|
||||||
for i := uint(0); i < fileWorkers; i++ {
|
for i := uint(0); i < fileWorkers; i++ {
|
||||||
g.Go(func() error {
|
t.Go(func() error {
|
||||||
s.worker(ctx, ch)
|
s.worker(t.Context(ctx), ch)
|
||||||
return nil
|
return nil
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
@ -111,8 +115,16 @@ func (s *FileSaver) Save(ctx context.Context, snPath string, file fs.File, fi os
|
||||||
|
|
||||||
select {
|
select {
|
||||||
case s.ch <- job:
|
case s.ch <- job:
|
||||||
|
case <-s.done:
|
||||||
|
debug.Log("not sending job, FileSaver is done")
|
||||||
|
_ = file.Close()
|
||||||
|
close(ch)
|
||||||
|
return FutureFile{ch: ch}
|
||||||
case <-ctx.Done():
|
case <-ctx.Done():
|
||||||
debug.Log("not sending job, context is cancelled: %v", ctx.Err())
|
debug.Log("not sending job, context is cancelled: %v", ctx.Err())
|
||||||
|
_ = file.Close()
|
||||||
|
close(ch)
|
||||||
|
return FutureFile{ch: ch}
|
||||||
}
|
}
|
||||||
|
|
||||||
return FutureFile{ch: ch}
|
return FutureFile{ch: ch}
|
||||||
|
@ -182,7 +194,7 @@ func (s *FileSaver) saveFile(ctx context.Context, chnker *chunker.Chunker, snPat
|
||||||
return saveFileResponse{err: ctx.Err()}
|
return saveFileResponse{err: ctx.Err()}
|
||||||
}
|
}
|
||||||
|
|
||||||
res := s.blobSaver.Save(ctx, restic.DataBlob, buf)
|
res := s.saveBlob(ctx, restic.DataBlob, buf)
|
||||||
results = append(results, res)
|
results = append(results, res)
|
||||||
|
|
||||||
// test if the context has been cancelled, return the error
|
// test if the context has been cancelled, return the error
|
||||||
|
|
97
internal/archiver/file_saver_test.go
Normal file
97
internal/archiver/file_saver_test.go
Normal file
|
@ -0,0 +1,97 @@
|
||||||
|
package archiver
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"fmt"
|
||||||
|
"io/ioutil"
|
||||||
|
"path/filepath"
|
||||||
|
"runtime"
|
||||||
|
"testing"
|
||||||
|
|
||||||
|
"github.com/restic/chunker"
|
||||||
|
"github.com/restic/restic/internal/fs"
|
||||||
|
"github.com/restic/restic/internal/restic"
|
||||||
|
"github.com/restic/restic/internal/test"
|
||||||
|
tomb "gopkg.in/tomb.v2"
|
||||||
|
)
|
||||||
|
|
||||||
|
func createTestFiles(t testing.TB, num int) (files []string, cleanup func()) {
|
||||||
|
tempdir, cleanup := test.TempDir(t)
|
||||||
|
|
||||||
|
for i := 0; i < 15; i++ {
|
||||||
|
filename := fmt.Sprintf("testfile-%d", i)
|
||||||
|
err := ioutil.WriteFile(filepath.Join(tempdir, filename), []byte(filename), 0600)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
files = append(files, filepath.Join(tempdir, filename))
|
||||||
|
}
|
||||||
|
|
||||||
|
return files, cleanup
|
||||||
|
}
|
||||||
|
|
||||||
|
func startFileSaver(ctx context.Context, t testing.TB, fs fs.FS) (*FileSaver, *tomb.Tomb) {
|
||||||
|
var tmb tomb.Tomb
|
||||||
|
|
||||||
|
saveBlob := func(ctx context.Context, tpe restic.BlobType, buf *Buffer) FutureBlob {
|
||||||
|
ch := make(chan saveBlobResponse)
|
||||||
|
close(ch)
|
||||||
|
return FutureBlob{ch: ch}
|
||||||
|
}
|
||||||
|
|
||||||
|
workers := uint(runtime.NumCPU())
|
||||||
|
pol, err := chunker.RandomPolynomial()
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
s := NewFileSaver(ctx, &tmb, fs, saveBlob, pol, workers, workers)
|
||||||
|
s.NodeFromFileInfo = restic.NodeFromFileInfo
|
||||||
|
|
||||||
|
return s, &tmb
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestFileSaver(t *testing.T) {
|
||||||
|
ctx, cancel := context.WithCancel(context.Background())
|
||||||
|
defer cancel()
|
||||||
|
|
||||||
|
files, cleanup := createTestFiles(t, 15)
|
||||||
|
defer cleanup()
|
||||||
|
|
||||||
|
startFn := func() {}
|
||||||
|
completeFn := func(*restic.Node, ItemStats) {}
|
||||||
|
|
||||||
|
testFs := fs.Local{}
|
||||||
|
s, tmb := startFileSaver(ctx, t, testFs)
|
||||||
|
|
||||||
|
var results []FutureFile
|
||||||
|
|
||||||
|
for _, filename := range files {
|
||||||
|
f, err := testFs.Open(filename)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
fi, err := f.Stat()
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
ff := s.Save(ctx, filename, f, fi, startFn, completeFn)
|
||||||
|
results = append(results, ff)
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, file := range results {
|
||||||
|
file.Wait(ctx)
|
||||||
|
if file.Err() != nil {
|
||||||
|
t.Errorf("unable to save file: %v", file.Err())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
tmb.Kill(nil)
|
||||||
|
|
||||||
|
err := tmb.Wait()
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
}
|
|
@ -4,8 +4,8 @@ import (
|
||||||
"context"
|
"context"
|
||||||
|
|
||||||
"github.com/restic/restic/internal/debug"
|
"github.com/restic/restic/internal/debug"
|
||||||
"github.com/restic/restic/internal/errors"
|
|
||||||
"github.com/restic/restic/internal/restic"
|
"github.com/restic/restic/internal/restic"
|
||||||
|
tomb "gopkg.in/tomb.v2"
|
||||||
)
|
)
|
||||||
|
|
||||||
// FutureTree is returned by Save and will return the data once it
|
// FutureTree is returned by Save and will return the data once it
|
||||||
|
@ -15,22 +15,25 @@ type FutureTree struct {
|
||||||
res saveTreeResponse
|
res saveTreeResponse
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *FutureTree) wait() {
|
// Wait blocks until the data has been received or ctx is cancelled.
|
||||||
res, ok := <-s.ch
|
func (s *FutureTree) Wait(ctx context.Context) {
|
||||||
if ok {
|
select {
|
||||||
s.res = res
|
case <-ctx.Done():
|
||||||
|
return
|
||||||
|
case res, ok := <-s.ch:
|
||||||
|
if ok {
|
||||||
|
s.res = res
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Node returns the node once it is available.
|
// Node returns the node.
|
||||||
func (s *FutureTree) Node() *restic.Node {
|
func (s *FutureTree) Node() *restic.Node {
|
||||||
s.wait()
|
|
||||||
return s.res.node
|
return s.res.node
|
||||||
}
|
}
|
||||||
|
|
||||||
// Stats returns the stats for the file once they are available.
|
// Stats returns the stats for the file.
|
||||||
func (s *FutureTree) Stats() ItemStats {
|
func (s *FutureTree) Stats() ItemStats {
|
||||||
s.wait()
|
|
||||||
return s.res.stats
|
return s.res.stats
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -39,23 +42,25 @@ type TreeSaver struct {
|
||||||
saveTree func(context.Context, *restic.Tree) (restic.ID, ItemStats, error)
|
saveTree func(context.Context, *restic.Tree) (restic.ID, ItemStats, error)
|
||||||
errFn ErrorFunc
|
errFn ErrorFunc
|
||||||
|
|
||||||
ch chan<- saveTreeJob
|
ch chan<- saveTreeJob
|
||||||
|
done <-chan struct{}
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewTreeSaver returns a new tree saver. A worker pool with treeWorkers is
|
// NewTreeSaver returns a new tree saver. A worker pool with treeWorkers is
|
||||||
// started, it is stopped when ctx is cancelled.
|
// started, it is stopped when ctx is cancelled.
|
||||||
func NewTreeSaver(ctx context.Context, g Goer, treeWorkers uint, saveTree func(context.Context, *restic.Tree) (restic.ID, ItemStats, error), errFn ErrorFunc) *TreeSaver {
|
func NewTreeSaver(ctx context.Context, t *tomb.Tomb, treeWorkers uint, saveTree func(context.Context, *restic.Tree) (restic.ID, ItemStats, error), errFn ErrorFunc) *TreeSaver {
|
||||||
ch := make(chan saveTreeJob)
|
ch := make(chan saveTreeJob)
|
||||||
|
|
||||||
s := &TreeSaver{
|
s := &TreeSaver{
|
||||||
ch: ch,
|
ch: ch,
|
||||||
|
done: t.Dying(),
|
||||||
saveTree: saveTree,
|
saveTree: saveTree,
|
||||||
errFn: errFn,
|
errFn: errFn,
|
||||||
}
|
}
|
||||||
|
|
||||||
for i := uint(0); i < treeWorkers; i++ {
|
for i := uint(0); i < treeWorkers; i++ {
|
||||||
g.Go(func() error {
|
t.Go(func() error {
|
||||||
return s.worker(ctx, ch)
|
return s.worker(t.Context(ctx), ch)
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -73,8 +78,12 @@ func (s *TreeSaver) Save(ctx context.Context, snPath string, node *restic.Node,
|
||||||
}
|
}
|
||||||
select {
|
select {
|
||||||
case s.ch <- job:
|
case s.ch <- job:
|
||||||
|
case <-s.done:
|
||||||
|
debug.Log("not saving tree, TreeSaver is done")
|
||||||
|
close(ch)
|
||||||
|
return FutureTree{ch: ch}
|
||||||
case <-ctx.Done():
|
case <-ctx.Done():
|
||||||
debug.Log("refusing to save job, context is cancelled: %v", ctx.Err())
|
debug.Log("not saving tree, context is cancelled")
|
||||||
close(ch)
|
close(ch)
|
||||||
return FutureTree{ch: ch}
|
return FutureTree{ch: ch}
|
||||||
}
|
}
|
||||||
|
@ -149,7 +158,8 @@ func (s *TreeSaver) worker(ctx context.Context, jobs <-chan saveTreeJob) error {
|
||||||
node, stats, err := s.save(ctx, job.snPath, job.node, job.nodes)
|
node, stats, err := s.save(ctx, job.snPath, job.node, job.nodes)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
debug.Log("error saving tree blob: %v", err)
|
debug.Log("error saving tree blob: %v", err)
|
||||||
return errors.Fatalf("unable to save data: %v", err)
|
close(job.ch)
|
||||||
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
job.ch <- saveTreeResponse{
|
job.ch <- saveTreeResponse{
|
||||||
|
|
120
internal/archiver/tree_saver_test.go
Normal file
120
internal/archiver/tree_saver_test.go
Normal file
|
@ -0,0 +1,120 @@
|
||||||
|
package archiver
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"fmt"
|
||||||
|
"os"
|
||||||
|
"runtime"
|
||||||
|
"sync/atomic"
|
||||||
|
"testing"
|
||||||
|
|
||||||
|
"github.com/restic/restic/internal/errors"
|
||||||
|
"github.com/restic/restic/internal/restic"
|
||||||
|
tomb "gopkg.in/tomb.v2"
|
||||||
|
)
|
||||||
|
|
||||||
|
func TestTreeSaver(t *testing.T) {
|
||||||
|
ctx, cancel := context.WithCancel(context.Background())
|
||||||
|
defer cancel()
|
||||||
|
|
||||||
|
var tmb tomb.Tomb
|
||||||
|
|
||||||
|
saveFn := func(context.Context, *restic.Tree) (restic.ID, ItemStats, error) {
|
||||||
|
return restic.NewRandomID(), ItemStats{TreeBlobs: 1, TreeSize: 123}, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
errFn := func(snPath string, fi os.FileInfo, err error) error {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
b := NewTreeSaver(ctx, &tmb, uint(runtime.NumCPU()), saveFn, errFn)
|
||||||
|
|
||||||
|
var results []FutureTree
|
||||||
|
|
||||||
|
for i := 0; i < 20; i++ {
|
||||||
|
node := &restic.Node{
|
||||||
|
Name: fmt.Sprintf("file-%d", i),
|
||||||
|
}
|
||||||
|
|
||||||
|
fb := b.Save(ctx, "/", node, nil)
|
||||||
|
results = append(results, fb)
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, tree := range results {
|
||||||
|
tree.Wait(ctx)
|
||||||
|
}
|
||||||
|
|
||||||
|
tmb.Kill(nil)
|
||||||
|
|
||||||
|
err := tmb.Wait()
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestTreeSaverError(t *testing.T) {
|
||||||
|
var tests = []struct {
|
||||||
|
trees int
|
||||||
|
failAt int32
|
||||||
|
}{
|
||||||
|
{1, 1},
|
||||||
|
{20, 2},
|
||||||
|
{20, 5},
|
||||||
|
{20, 15},
|
||||||
|
{200, 150},
|
||||||
|
}
|
||||||
|
|
||||||
|
errTest := errors.New("test error")
|
||||||
|
|
||||||
|
for _, test := range tests {
|
||||||
|
t.Run("", func(t *testing.T) {
|
||||||
|
ctx, cancel := context.WithCancel(context.Background())
|
||||||
|
defer cancel()
|
||||||
|
|
||||||
|
var tmb tomb.Tomb
|
||||||
|
|
||||||
|
var num int32
|
||||||
|
saveFn := func(context.Context, *restic.Tree) (restic.ID, ItemStats, error) {
|
||||||
|
val := atomic.AddInt32(&num, 1)
|
||||||
|
if val == test.failAt {
|
||||||
|
t.Logf("sending error for request %v\n", test.failAt)
|
||||||
|
return restic.ID{}, ItemStats{}, errTest
|
||||||
|
}
|
||||||
|
return restic.NewRandomID(), ItemStats{TreeBlobs: 1, TreeSize: 123}, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
errFn := func(snPath string, fi os.FileInfo, err error) error {
|
||||||
|
t.Logf("ignoring error %v\n", err)
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
b := NewTreeSaver(ctx, &tmb, uint(runtime.NumCPU()), saveFn, errFn)
|
||||||
|
|
||||||
|
var results []FutureTree
|
||||||
|
|
||||||
|
for i := 0; i < test.trees; i++ {
|
||||||
|
node := &restic.Node{
|
||||||
|
Name: fmt.Sprintf("file-%d", i),
|
||||||
|
}
|
||||||
|
|
||||||
|
fb := b.Save(ctx, "/", node, nil)
|
||||||
|
results = append(results, fb)
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, tree := range results {
|
||||||
|
tree.Wait(ctx)
|
||||||
|
}
|
||||||
|
|
||||||
|
tmb.Kill(nil)
|
||||||
|
|
||||||
|
err := tmb.Wait()
|
||||||
|
if err == nil {
|
||||||
|
t.Errorf("expected error not found")
|
||||||
|
}
|
||||||
|
|
||||||
|
if err != errTest {
|
||||||
|
t.Fatalf("unexpected error found: %v", err)
|
||||||
|
}
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
Loading…
Add table
Reference in a new issue