Merge pull request #2963 from MichaelEischer/fix-status-deadlock
backup: Fix possible deadlock of scanner goroutine
This commit is contained in:
commit
5fd3dbccb7
3 changed files with 82 additions and 36 deletions
7
changelog/unreleased/issue-2834
Normal file
7
changelog/unreleased/issue-2834
Normal file
|
@ -0,0 +1,7 @@
|
|||
Bugfix: Fix rare cases of backup command hanging forever
|
||||
|
||||
We've fixed an issue with the backup progress reporting which could cause
|
||||
restic to hang forever right before finishing a backup.
|
||||
|
||||
https://github.com/restic/restic/issues/2834
|
||||
https://github.com/restic/restic/pull/2963
|
|
@ -41,6 +41,7 @@ type Backup struct {
|
|||
errCh chan struct{}
|
||||
workerCh chan fileWorkerMessage
|
||||
finished chan struct{}
|
||||
closed chan struct{}
|
||||
|
||||
summary struct {
|
||||
sync.Mutex
|
||||
|
@ -71,6 +72,7 @@ func NewBackup(term *termstatus.Terminal, verbosity uint) *Backup {
|
|||
errCh: make(chan struct{}),
|
||||
workerCh: make(chan fileWorkerMessage),
|
||||
finished: make(chan struct{}),
|
||||
closed: make(chan struct{}),
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -88,6 +90,9 @@ func (b *Backup) Run(ctx context.Context) error {
|
|||
|
||||
t := time.NewTicker(time.Second)
|
||||
defer t.Stop()
|
||||
defer close(b.closed)
|
||||
// Reset status when finished
|
||||
defer b.term.SetStatus([]string{""})
|
||||
|
||||
for {
|
||||
select {
|
||||
|
@ -192,20 +197,27 @@ func (b *Backup) ScannerError(item string, fi os.FileInfo, err error) error {
|
|||
// Error is the error callback function for the archiver, it prints the error and returns nil.
|
||||
func (b *Backup) Error(item string, fi os.FileInfo, err error) error {
|
||||
b.E("error: %v\n", err)
|
||||
b.errCh <- struct{}{}
|
||||
select {
|
||||
case b.errCh <- struct{}{}:
|
||||
case <-b.closed:
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// StartFile is called when a file is being processed by a worker.
|
||||
func (b *Backup) StartFile(filename string) {
|
||||
b.workerCh <- fileWorkerMessage{
|
||||
filename: filename,
|
||||
select {
|
||||
case b.workerCh <- fileWorkerMessage{filename: filename}:
|
||||
case <-b.closed:
|
||||
}
|
||||
}
|
||||
|
||||
// CompleteBlob is called for all saved blobs for files.
|
||||
func (b *Backup) CompleteBlob(filename string, bytes uint64) {
|
||||
b.processedCh <- counter{Bytes: bytes}
|
||||
select {
|
||||
case b.processedCh <- counter{Bytes: bytes}:
|
||||
case <-b.closed:
|
||||
}
|
||||
}
|
||||
|
||||
func formatPercent(numerator uint64, denominator uint64) string {
|
||||
|
@ -270,22 +282,28 @@ func (b *Backup) CompleteItem(item string, previous, current *restic.Node, s arc
|
|||
|
||||
if current == nil {
|
||||
// error occurred, tell the status display to remove the line
|
||||
b.workerCh <- fileWorkerMessage{
|
||||
filename: item,
|
||||
done: true,
|
||||
select {
|
||||
case b.workerCh <- fileWorkerMessage{filename: item, done: true}:
|
||||
case <-b.closed:
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
switch current.Type {
|
||||
case "file":
|
||||
b.processedCh <- counter{Files: 1}
|
||||
b.workerCh <- fileWorkerMessage{
|
||||
filename: item,
|
||||
done: true,
|
||||
select {
|
||||
case b.processedCh <- counter{Files: 1}:
|
||||
case <-b.closed:
|
||||
}
|
||||
select {
|
||||
case b.workerCh <- fileWorkerMessage{filename: item, done: true}:
|
||||
case <-b.closed:
|
||||
}
|
||||
case "dir":
|
||||
b.processedCh <- counter{Dirs: 1}
|
||||
select {
|
||||
case b.processedCh <- counter{Dirs: 1}:
|
||||
case <-b.closed:
|
||||
}
|
||||
}
|
||||
|
||||
if current.Type == "dir" {
|
||||
|
@ -310,10 +328,9 @@ func (b *Backup) CompleteItem(item string, previous, current *restic.Node, s arc
|
|||
}
|
||||
|
||||
} else if current.Type == "file" {
|
||||
|
||||
b.workerCh <- fileWorkerMessage{
|
||||
done: true,
|
||||
filename: item,
|
||||
select {
|
||||
case b.workerCh <- fileWorkerMessage{done: true, filename: item}:
|
||||
case <-b.closed:
|
||||
}
|
||||
|
||||
if previous == nil {
|
||||
|
@ -342,7 +359,7 @@ func (b *Backup) CompleteItem(item string, previous, current *restic.Node, s arc
|
|||
func (b *Backup) ReportTotal(item string, s archiver.ScanStats) {
|
||||
select {
|
||||
case b.totalCh <- counter{Files: s.Files, Dirs: s.Dirs, Bytes: s.Bytes}:
|
||||
case <-b.finished:
|
||||
case <-b.closed:
|
||||
}
|
||||
|
||||
if item == "" {
|
||||
|
@ -357,7 +374,10 @@ func (b *Backup) ReportTotal(item string, s archiver.ScanStats) {
|
|||
|
||||
// Finish prints the finishing messages.
|
||||
func (b *Backup) Finish(snapshotID restic.ID) {
|
||||
close(b.finished)
|
||||
select {
|
||||
case b.finished <- struct{}{}:
|
||||
case <-b.closed:
|
||||
}
|
||||
|
||||
b.P("\n")
|
||||
b.P("Files: %5d new, %5d changed, %5d unmodified\n", b.summary.Files.New, b.summary.Files.Changed, b.summary.Files.Unchanged)
|
||||
|
|
|
@ -42,6 +42,7 @@ type Backup struct {
|
|||
errCh chan struct{}
|
||||
workerCh chan fileWorkerMessage
|
||||
finished chan struct{}
|
||||
closed chan struct{}
|
||||
|
||||
summary struct {
|
||||
sync.Mutex
|
||||
|
@ -72,6 +73,7 @@ func NewBackup(term *termstatus.Terminal, verbosity uint) *Backup {
|
|||
errCh: make(chan struct{}),
|
||||
workerCh: make(chan fileWorkerMessage),
|
||||
finished: make(chan struct{}),
|
||||
closed: make(chan struct{}),
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -103,6 +105,7 @@ func (b *Backup) Run(ctx context.Context) error {
|
|||
|
||||
t := time.NewTicker(time.Second)
|
||||
defer t.Stop()
|
||||
defer close(b.closed)
|
||||
|
||||
for {
|
||||
select {
|
||||
|
@ -200,20 +203,27 @@ func (b *Backup) Error(item string, fi os.FileInfo, err error) error {
|
|||
During: "archival",
|
||||
Item: item,
|
||||
})
|
||||
b.errCh <- struct{}{}
|
||||
select {
|
||||
case b.errCh <- struct{}{}:
|
||||
case <-b.closed:
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// StartFile is called when a file is being processed by a worker.
|
||||
func (b *Backup) StartFile(filename string) {
|
||||
b.workerCh <- fileWorkerMessage{
|
||||
filename: filename,
|
||||
select {
|
||||
case b.workerCh <- fileWorkerMessage{filename: filename}:
|
||||
case <-b.closed:
|
||||
}
|
||||
}
|
||||
|
||||
// CompleteBlob is called for all saved blobs for files.
|
||||
func (b *Backup) CompleteBlob(filename string, bytes uint64) {
|
||||
b.processedCh <- counter{Bytes: bytes}
|
||||
select {
|
||||
case b.processedCh <- counter{Bytes: bytes}:
|
||||
case <-b.closed:
|
||||
}
|
||||
}
|
||||
|
||||
// CompleteItem is the status callback function for the archiver when a
|
||||
|
@ -225,9 +235,9 @@ func (b *Backup) CompleteItem(item string, previous, current *restic.Node, s arc
|
|||
|
||||
if current == nil {
|
||||
// error occurred, tell the status display to remove the line
|
||||
b.workerCh <- fileWorkerMessage{
|
||||
filename: item,
|
||||
done: true,
|
||||
select {
|
||||
case b.workerCh <- fileWorkerMessage{filename: item, done: true}:
|
||||
case <-b.closed:
|
||||
}
|
||||
return
|
||||
}
|
||||
|
@ -236,13 +246,19 @@ func (b *Backup) CompleteItem(item string, previous, current *restic.Node, s arc
|
|||
|
||||
switch current.Type {
|
||||
case "file":
|
||||
b.processedCh <- counter{Files: 1}
|
||||
b.workerCh <- fileWorkerMessage{
|
||||
filename: item,
|
||||
done: true,
|
||||
select {
|
||||
case b.processedCh <- counter{Files: 1}:
|
||||
case <-b.closed:
|
||||
}
|
||||
select {
|
||||
case b.workerCh <- fileWorkerMessage{filename: item, done: true}:
|
||||
case <-b.closed:
|
||||
}
|
||||
case "dir":
|
||||
b.processedCh <- counter{Dirs: 1}
|
||||
select {
|
||||
case b.processedCh <- counter{Dirs: 1}:
|
||||
case <-b.closed:
|
||||
}
|
||||
}
|
||||
|
||||
if current.Type == "dir" {
|
||||
|
@ -291,10 +307,9 @@ func (b *Backup) CompleteItem(item string, previous, current *restic.Node, s arc
|
|||
}
|
||||
|
||||
} else if current.Type == "file" {
|
||||
|
||||
b.workerCh <- fileWorkerMessage{
|
||||
done: true,
|
||||
filename: item,
|
||||
select {
|
||||
case b.workerCh <- fileWorkerMessage{done: true, filename: item}:
|
||||
case <-b.closed:
|
||||
}
|
||||
|
||||
if previous == nil {
|
||||
|
@ -345,7 +360,7 @@ func (b *Backup) CompleteItem(item string, previous, current *restic.Node, s arc
|
|||
func (b *Backup) ReportTotal(item string, s archiver.ScanStats) {
|
||||
select {
|
||||
case b.totalCh <- counter{Files: uint64(s.Files), Dirs: uint64(s.Dirs), Bytes: s.Bytes}:
|
||||
case <-b.finished:
|
||||
case <-b.closed:
|
||||
}
|
||||
|
||||
if item == "" {
|
||||
|
@ -365,7 +380,11 @@ func (b *Backup) ReportTotal(item string, s archiver.ScanStats) {
|
|||
|
||||
// Finish prints the finishing messages.
|
||||
func (b *Backup) Finish(snapshotID restic.ID) {
|
||||
close(b.finished)
|
||||
select {
|
||||
case b.finished <- struct{}{}:
|
||||
case <-b.closed:
|
||||
}
|
||||
|
||||
b.print(summaryOutput{
|
||||
MessageType: "summary",
|
||||
FilesNew: b.summary.Files.New,
|
||||
|
|
Loading…
Reference in a new issue