Merge pull request #2692 from MichaelEischer/fix-archiveraborttest
archiver: Fix race condition triggered by TestArchiverAbortEarlyOnError
This commit is contained in:
commit
f033850aa0
6 changed files with 16 additions and 35 deletions
|
@ -22,8 +22,7 @@ type BlobSaver struct {
|
|||
m sync.Mutex
|
||||
knownBlobs restic.BlobSet
|
||||
|
||||
ch chan<- saveBlobJob
|
||||
done <-chan struct{}
|
||||
ch chan<- saveBlobJob
|
||||
}
|
||||
|
||||
// NewBlobSaver returns a new blob. A worker pool is started, it is stopped
|
||||
|
@ -34,7 +33,6 @@ func NewBlobSaver(ctx context.Context, t *tomb.Tomb, repo Saver, workers uint) *
|
|||
repo: repo,
|
||||
knownBlobs: restic.NewBlobSet(),
|
||||
ch: ch,
|
||||
done: t.Dying(),
|
||||
}
|
||||
|
||||
for i := uint(0); i < workers; i++ {
|
||||
|
@ -53,10 +51,6 @@ func (s *BlobSaver) Save(ctx context.Context, t restic.BlobType, buf *Buffer) Fu
|
|||
ch := make(chan saveBlobResponse, 1)
|
||||
select {
|
||||
case s.ch <- saveBlobJob{BlobType: t, buf: buf, ch: ch}:
|
||||
case <-s.done:
|
||||
debug.Log("not sending job, BlobSaver is done")
|
||||
close(ch)
|
||||
return FutureBlob{ch: ch}
|
||||
case <-ctx.Done():
|
||||
debug.Log("not sending job, context is cancelled")
|
||||
close(ch)
|
||||
|
|
|
@ -38,12 +38,12 @@ func TestBlobSaver(t *testing.T) {
|
|||
ctx, cancel := context.WithCancel(context.Background())
|
||||
defer cancel()
|
||||
|
||||
var tmb tomb.Tomb
|
||||
tmb, ctx := tomb.WithContext(ctx)
|
||||
saver := &saveFail{
|
||||
idx: repository.NewIndex(),
|
||||
}
|
||||
|
||||
b := NewBlobSaver(ctx, &tmb, saver, uint(runtime.NumCPU()))
|
||||
b := NewBlobSaver(ctx, tmb, saver, uint(runtime.NumCPU()))
|
||||
|
||||
var results []FutureBlob
|
||||
|
||||
|
@ -84,13 +84,13 @@ func TestBlobSaverError(t *testing.T) {
|
|||
ctx, cancel := context.WithCancel(context.Background())
|
||||
defer cancel()
|
||||
|
||||
var tmb tomb.Tomb
|
||||
tmb, ctx := tomb.WithContext(ctx)
|
||||
saver := &saveFail{
|
||||
idx: repository.NewIndex(),
|
||||
failAt: int32(test.failAt),
|
||||
}
|
||||
|
||||
b := NewBlobSaver(ctx, &tmb, saver, uint(runtime.NumCPU()))
|
||||
b := NewBlobSaver(ctx, tmb, saver, uint(runtime.NumCPU()))
|
||||
|
||||
var results []FutureBlob
|
||||
|
||||
|
|
|
@ -58,8 +58,7 @@ type FileSaver struct {
|
|||
|
||||
pol chunker.Pol
|
||||
|
||||
ch chan<- saveFileJob
|
||||
done <-chan struct{}
|
||||
ch chan<- saveFileJob
|
||||
|
||||
CompleteBlob func(filename string, bytes uint64)
|
||||
|
||||
|
@ -80,7 +79,6 @@ func NewFileSaver(ctx context.Context, t *tomb.Tomb, save SaveBlobFn, pol chunke
|
|||
saveFilePool: NewBufferPool(ctx, int(poolSize), chunker.MaxSize),
|
||||
pol: pol,
|
||||
ch: ch,
|
||||
done: t.Dying(),
|
||||
|
||||
CompleteBlob: func(string, uint64) {},
|
||||
}
|
||||
|
@ -113,11 +111,6 @@ func (s *FileSaver) Save(ctx context.Context, snPath string, file fs.File, fi os
|
|||
|
||||
select {
|
||||
case s.ch <- job:
|
||||
case <-s.done:
|
||||
debug.Log("not sending job, FileSaver is done")
|
||||
_ = file.Close()
|
||||
close(ch)
|
||||
return FutureFile{ch: ch}
|
||||
case <-ctx.Done():
|
||||
debug.Log("not sending job, context is cancelled: %v", ctx.Err())
|
||||
_ = file.Close()
|
||||
|
|
|
@ -30,8 +30,8 @@ func createTestFiles(t testing.TB, num int) (files []string, cleanup func()) {
|
|||
return files, cleanup
|
||||
}
|
||||
|
||||
func startFileSaver(ctx context.Context, t testing.TB) (*FileSaver, *tomb.Tomb) {
|
||||
var tmb tomb.Tomb
|
||||
func startFileSaver(ctx context.Context, t testing.TB) (*FileSaver, context.Context, *tomb.Tomb) {
|
||||
tmb, ctx := tomb.WithContext(ctx)
|
||||
|
||||
saveBlob := func(ctx context.Context, tpe restic.BlobType, buf *Buffer) FutureBlob {
|
||||
ch := make(chan saveBlobResponse)
|
||||
|
@ -45,10 +45,10 @@ func startFileSaver(ctx context.Context, t testing.TB) (*FileSaver, *tomb.Tomb)
|
|||
t.Fatal(err)
|
||||
}
|
||||
|
||||
s := NewFileSaver(ctx, &tmb, saveBlob, pol, workers, workers)
|
||||
s := NewFileSaver(ctx, tmb, saveBlob, pol, workers, workers)
|
||||
s.NodeFromFileInfo = restic.NodeFromFileInfo
|
||||
|
||||
return s, &tmb
|
||||
return s, ctx, tmb
|
||||
}
|
||||
|
||||
func TestFileSaver(t *testing.T) {
|
||||
|
@ -62,7 +62,7 @@ func TestFileSaver(t *testing.T) {
|
|||
completeFn := func(*restic.Node, ItemStats) {}
|
||||
|
||||
testFs := fs.Local{}
|
||||
s, tmb := startFileSaver(ctx, t)
|
||||
s, ctx, tmb := startFileSaver(ctx, t)
|
||||
|
||||
var results []FutureFile
|
||||
|
||||
|
|
|
@ -42,8 +42,7 @@ type TreeSaver struct {
|
|||
saveTree func(context.Context, *restic.Tree) (restic.ID, ItemStats, error)
|
||||
errFn ErrorFunc
|
||||
|
||||
ch chan<- saveTreeJob
|
||||
done <-chan struct{}
|
||||
ch chan<- saveTreeJob
|
||||
}
|
||||
|
||||
// NewTreeSaver returns a new tree saver. A worker pool with treeWorkers is
|
||||
|
@ -53,7 +52,6 @@ func NewTreeSaver(ctx context.Context, t *tomb.Tomb, treeWorkers uint, saveTree
|
|||
|
||||
s := &TreeSaver{
|
||||
ch: ch,
|
||||
done: t.Dying(),
|
||||
saveTree: saveTree,
|
||||
errFn: errFn,
|
||||
}
|
||||
|
@ -78,10 +76,6 @@ func (s *TreeSaver) Save(ctx context.Context, snPath string, node *restic.Node,
|
|||
}
|
||||
select {
|
||||
case s.ch <- job:
|
||||
case <-s.done:
|
||||
debug.Log("not saving tree, TreeSaver is done")
|
||||
close(ch)
|
||||
return FutureTree{ch: ch}
|
||||
case <-ctx.Done():
|
||||
debug.Log("not saving tree, context is cancelled")
|
||||
close(ch)
|
||||
|
|
|
@ -17,7 +17,7 @@ func TestTreeSaver(t *testing.T) {
|
|||
ctx, cancel := context.WithCancel(context.Background())
|
||||
defer cancel()
|
||||
|
||||
var tmb tomb.Tomb
|
||||
tmb, ctx := tomb.WithContext(ctx)
|
||||
|
||||
saveFn := func(context.Context, *restic.Tree) (restic.ID, ItemStats, error) {
|
||||
return restic.NewRandomID(), ItemStats{TreeBlobs: 1, TreeSize: 123}, nil
|
||||
|
@ -27,7 +27,7 @@ func TestTreeSaver(t *testing.T) {
|
|||
return nil
|
||||
}
|
||||
|
||||
b := NewTreeSaver(ctx, &tmb, uint(runtime.NumCPU()), saveFn, errFn)
|
||||
b := NewTreeSaver(ctx, tmb, uint(runtime.NumCPU()), saveFn, errFn)
|
||||
|
||||
var results []FutureTree
|
||||
|
||||
|
@ -71,7 +71,7 @@ func TestTreeSaverError(t *testing.T) {
|
|||
ctx, cancel := context.WithCancel(context.Background())
|
||||
defer cancel()
|
||||
|
||||
var tmb tomb.Tomb
|
||||
tmb, ctx := tomb.WithContext(ctx)
|
||||
|
||||
var num int32
|
||||
saveFn := func(context.Context, *restic.Tree) (restic.ID, ItemStats, error) {
|
||||
|
@ -88,7 +88,7 @@ func TestTreeSaverError(t *testing.T) {
|
|||
return nil
|
||||
}
|
||||
|
||||
b := NewTreeSaver(ctx, &tmb, uint(runtime.NumCPU()), saveFn, errFn)
|
||||
b := NewTreeSaver(ctx, tmb, uint(runtime.NumCPU()), saveFn, errFn)
|
||||
|
||||
var results []FutureTree
|
||||
|
||||
|
|
Loading…
Reference in a new issue