forked from TrueCloudLab/restic
8d0ba55ecd
When the backup is interrupted for some reason while the scanner is still active, this could lead to a deadlock. Interruptions are triggered by canceling the context object used by both the backup progress UI and the scanner. It is possible for the context to be canceled between the scanner's own cancellation check and its call to the `ReportTotal` method of the UI. That method sends a message to the UI goroutine. However, a canceled context also stops that goroutine, which can cause the channel send operation to block indefinitely. This is resolved by adding a `closed` channel, which is closed once the UI goroutine has stopped and serves as an escape hatch for reported UI updates. This change covers not just the `ReportTotal` method but all potentially affected methods of the progress UI implementation.
398 lines
9.3 KiB
Go
398 lines
9.3 KiB
Go
package ui
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"os"
|
|
"sort"
|
|
"sync"
|
|
"time"
|
|
|
|
"github.com/restic/restic/internal/archiver"
|
|
"github.com/restic/restic/internal/restic"
|
|
"github.com/restic/restic/internal/ui/termstatus"
|
|
)
|
|
|
|
// counter holds a number of files and directories together with a number of
// bytes. It is used both for the scanner's totals and for the increments of
// processed items sent to the status display.
type counter struct {
	Files uint
	Dirs  uint
	Bytes uint64
}
|
|
|
|
// fileWorkerMessage is sent to the status display when work on a file starts
// or ends.
type fileWorkerMessage struct {
	// filename identifies the file in the list of files currently in progress.
	filename string
	// done is false when the file is added to that list and true when it is
	// to be removed from it.
	done bool
}
|
|
|
|
// Backup reports progress for the `backup` command.
|
|
type Backup struct {
|
|
*Message
|
|
*StdioWrapper
|
|
|
|
MinUpdatePause time.Duration
|
|
|
|
term *termstatus.Terminal
|
|
v uint
|
|
start time.Time
|
|
|
|
totalBytes uint64
|
|
|
|
totalCh chan counter
|
|
processedCh chan counter
|
|
errCh chan struct{}
|
|
workerCh chan fileWorkerMessage
|
|
finished chan struct{}
|
|
closed chan struct{}
|
|
|
|
summary struct {
|
|
sync.Mutex
|
|
Files, Dirs struct {
|
|
New uint
|
|
Changed uint
|
|
Unchanged uint
|
|
}
|
|
ProcessedBytes uint64
|
|
archiver.ItemStats
|
|
}
|
|
}
|
|
|
|
// NewBackup returns a new backup progress reporter.
|
|
func NewBackup(term *termstatus.Terminal, verbosity uint) *Backup {
|
|
return &Backup{
|
|
Message: NewMessage(term, verbosity),
|
|
StdioWrapper: NewStdioWrapper(term),
|
|
term: term,
|
|
v: verbosity,
|
|
start: time.Now(),
|
|
|
|
// limit to 60fps by default
|
|
MinUpdatePause: time.Second / 60,
|
|
|
|
totalCh: make(chan counter),
|
|
processedCh: make(chan counter),
|
|
errCh: make(chan struct{}),
|
|
workerCh: make(chan fileWorkerMessage),
|
|
finished: make(chan struct{}),
|
|
closed: make(chan struct{}),
|
|
}
|
|
}
|
|
|
|
// Run regularly updates the status lines. It should be called in a separate
|
|
// goroutine.
|
|
func (b *Backup) Run(ctx context.Context) error {
|
|
var (
|
|
lastUpdate time.Time
|
|
total, processed counter
|
|
errors uint
|
|
started bool
|
|
currentFiles = make(map[string]struct{})
|
|
secondsRemaining uint64
|
|
)
|
|
|
|
t := time.NewTicker(time.Second)
|
|
defer t.Stop()
|
|
defer close(b.closed)
|
|
|
|
for {
|
|
select {
|
|
case <-ctx.Done():
|
|
return nil
|
|
case <-b.finished:
|
|
started = false
|
|
b.term.SetStatus([]string{""})
|
|
case t, ok := <-b.totalCh:
|
|
if ok {
|
|
total = t
|
|
started = true
|
|
} else {
|
|
// scan has finished
|
|
b.totalCh = nil
|
|
b.totalBytes = total.Bytes
|
|
}
|
|
case s := <-b.processedCh:
|
|
processed.Files += s.Files
|
|
processed.Dirs += s.Dirs
|
|
processed.Bytes += s.Bytes
|
|
started = true
|
|
case <-b.errCh:
|
|
errors++
|
|
started = true
|
|
case m := <-b.workerCh:
|
|
if m.done {
|
|
delete(currentFiles, m.filename)
|
|
} else {
|
|
currentFiles[m.filename] = struct{}{}
|
|
}
|
|
case <-t.C:
|
|
if !started {
|
|
continue
|
|
}
|
|
|
|
if b.totalCh == nil {
|
|
secs := float64(time.Since(b.start) / time.Second)
|
|
todo := float64(total.Bytes - processed.Bytes)
|
|
secondsRemaining = uint64(secs / float64(processed.Bytes) * todo)
|
|
}
|
|
}
|
|
|
|
// limit update frequency
|
|
if time.Since(lastUpdate) < b.MinUpdatePause {
|
|
continue
|
|
}
|
|
lastUpdate = time.Now()
|
|
|
|
b.update(total, processed, errors, currentFiles, secondsRemaining)
|
|
}
|
|
}
|
|
|
|
// update updates the status lines.
|
|
func (b *Backup) update(total, processed counter, errors uint, currentFiles map[string]struct{}, secs uint64) {
|
|
var status string
|
|
if total.Files == 0 && total.Dirs == 0 {
|
|
// no total count available yet
|
|
status = fmt.Sprintf("[%s] %v files, %s, %d errors",
|
|
formatDuration(time.Since(b.start)),
|
|
processed.Files, formatBytes(processed.Bytes), errors,
|
|
)
|
|
} else {
|
|
var eta, percent string
|
|
|
|
if secs > 0 && processed.Bytes < total.Bytes {
|
|
eta = fmt.Sprintf(" ETA %s", formatSeconds(secs))
|
|
percent = formatPercent(processed.Bytes, total.Bytes)
|
|
percent += " "
|
|
}
|
|
|
|
// include totals
|
|
status = fmt.Sprintf("[%s] %s%v files %s, total %v files %v, %d errors%s",
|
|
formatDuration(time.Since(b.start)),
|
|
percent,
|
|
processed.Files,
|
|
formatBytes(processed.Bytes),
|
|
total.Files,
|
|
formatBytes(total.Bytes),
|
|
errors,
|
|
eta,
|
|
)
|
|
}
|
|
|
|
lines := make([]string, 0, len(currentFiles)+1)
|
|
for filename := range currentFiles {
|
|
lines = append(lines, filename)
|
|
}
|
|
sort.Strings(lines)
|
|
lines = append([]string{status}, lines...)
|
|
|
|
b.term.SetStatus(lines)
|
|
}
|
|
|
|
// ScannerError is the error callback function for the scanner, it prints the
|
|
// error in verbose mode and returns nil.
|
|
func (b *Backup) ScannerError(item string, fi os.FileInfo, err error) error {
|
|
b.V("scan: %v\n", err)
|
|
return nil
|
|
}
|
|
|
|
// Error is the error callback function for the archiver, it prints the error and returns nil.
|
|
func (b *Backup) Error(item string, fi os.FileInfo, err error) error {
|
|
b.E("error: %v\n", err)
|
|
select {
|
|
case b.errCh <- struct{}{}:
|
|
case <-b.closed:
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// StartFile is called when a file is being processed by a worker.
|
|
func (b *Backup) StartFile(filename string) {
|
|
select {
|
|
case b.workerCh <- fileWorkerMessage{filename: filename}:
|
|
case <-b.closed:
|
|
}
|
|
}
|
|
|
|
// CompleteBlob is called for all saved blobs for files.
|
|
func (b *Backup) CompleteBlob(filename string, bytes uint64) {
|
|
select {
|
|
case b.processedCh <- counter{Bytes: bytes}:
|
|
case <-b.closed:
|
|
}
|
|
}
|
|
|
|
// formatPercent returns numerator/denominator as a percentage with two
// decimal places, e.g. "50.00%". Values above 100 are reported as 100, and
// the empty string is returned when denominator is zero.
func formatPercent(numerator uint64, denominator uint64) string {
	if denominator == 0 {
		return ""
	}

	pct := 100.0 * float64(numerator) / float64(denominator)
	if pct > 100 {
		// more was processed than expected, cap the display
		pct = 100
	}

	return fmt.Sprintf("%3.2f%%", pct)
}
|
|
|
|
// formatSeconds formats a number of seconds as "m:ss" or, from one hour on,
// as "h:mm:ss".
func formatSeconds(sec uint64) string {
	hours := sec / 3600
	minutes := sec % 3600 / 60
	seconds := sec % 60

	if hours == 0 {
		return fmt.Sprintf("%d:%02d", minutes, seconds)
	}

	return fmt.Sprintf("%d:%02d:%02d", hours, minutes, seconds)
}
|
|
|
|
func formatDuration(d time.Duration) string {
|
|
sec := uint64(d / time.Second)
|
|
return formatSeconds(sec)
|
|
}
|
|
|
|
// formatBytes returns c as a human readable size using binary units. Values
// below 1 KiB are printed as whole bytes ("512 B"), larger values with three
// decimal places in the largest unit that yields a value of at least one
// ("1.500 KiB", "1.000 MiB", ...), up to TiB.
func formatBytes(c uint64) string {
	const (
		kib = 1 << 10
		mib = 1 << 20
		gib = 1 << 30
		tib = 1 << 40
	)

	b := float64(c)

	// The comparisons are inclusive so that an exact power of 1024 is shown
	// in its own unit, e.g. "1.000 KiB" instead of "1024 B".
	switch {
	case c >= tib:
		return fmt.Sprintf("%.3f TiB", b/tib)
	case c >= gib:
		return fmt.Sprintf("%.3f GiB", b/gib)
	case c >= mib:
		return fmt.Sprintf("%.3f MiB", b/mib)
	case c >= kib:
		return fmt.Sprintf("%.3f KiB", b/kib)
	default:
		return fmt.Sprintf("%d B", c)
	}
}
|
|
|
|
// CompleteItem is the status callback function for the archiver when a
|
|
// file/dir has been saved successfully.
|
|
func (b *Backup) CompleteItem(item string, previous, current *restic.Node, s archiver.ItemStats, d time.Duration) {
|
|
b.summary.Lock()
|
|
b.summary.ItemStats.Add(s)
|
|
|
|
// for the last item "/", current is nil
|
|
if current != nil {
|
|
b.summary.ProcessedBytes += current.Size
|
|
}
|
|
|
|
b.summary.Unlock()
|
|
|
|
if current == nil {
|
|
// error occurred, tell the status display to remove the line
|
|
select {
|
|
case b.workerCh <- fileWorkerMessage{filename: item, done: true}:
|
|
case <-b.closed:
|
|
}
|
|
return
|
|
}
|
|
|
|
switch current.Type {
|
|
case "file":
|
|
select {
|
|
case b.processedCh <- counter{Files: 1}:
|
|
case <-b.closed:
|
|
}
|
|
select {
|
|
case b.workerCh <- fileWorkerMessage{filename: item, done: true}:
|
|
case <-b.closed:
|
|
}
|
|
case "dir":
|
|
select {
|
|
case b.processedCh <- counter{Dirs: 1}:
|
|
case <-b.closed:
|
|
}
|
|
}
|
|
|
|
if current.Type == "dir" {
|
|
if previous == nil {
|
|
b.VV("new %v, saved in %.3fs (%v added, %v metadata)", item, d.Seconds(), formatBytes(s.DataSize), formatBytes(s.TreeSize))
|
|
b.summary.Lock()
|
|
b.summary.Dirs.New++
|
|
b.summary.Unlock()
|
|
return
|
|
}
|
|
|
|
if previous.Equals(*current) {
|
|
b.VV("unchanged %v", item)
|
|
b.summary.Lock()
|
|
b.summary.Dirs.Unchanged++
|
|
b.summary.Unlock()
|
|
} else {
|
|
b.VV("modified %v, saved in %.3fs (%v added, %v metadata)", item, d.Seconds(), formatBytes(s.DataSize), formatBytes(s.TreeSize))
|
|
b.summary.Lock()
|
|
b.summary.Dirs.Changed++
|
|
b.summary.Unlock()
|
|
}
|
|
|
|
} else if current.Type == "file" {
|
|
select {
|
|
case b.workerCh <- fileWorkerMessage{done: true, filename: item}:
|
|
case <-b.closed:
|
|
}
|
|
|
|
if previous == nil {
|
|
b.VV("new %v, saved in %.3fs (%v added)", item, d.Seconds(), formatBytes(s.DataSize))
|
|
b.summary.Lock()
|
|
b.summary.Files.New++
|
|
b.summary.Unlock()
|
|
return
|
|
}
|
|
|
|
if previous.Equals(*current) {
|
|
b.VV("unchanged %v", item)
|
|
b.summary.Lock()
|
|
b.summary.Files.Unchanged++
|
|
b.summary.Unlock()
|
|
} else {
|
|
b.VV("modified %v, saved in %.3fs (%v added)", item, d.Seconds(), formatBytes(s.DataSize))
|
|
b.summary.Lock()
|
|
b.summary.Files.Changed++
|
|
b.summary.Unlock()
|
|
}
|
|
}
|
|
}
|
|
|
|
// ReportTotal sets the total stats up to now
|
|
func (b *Backup) ReportTotal(item string, s archiver.ScanStats) {
|
|
select {
|
|
case b.totalCh <- counter{Files: s.Files, Dirs: s.Dirs, Bytes: s.Bytes}:
|
|
case <-b.closed:
|
|
}
|
|
|
|
if item == "" {
|
|
b.V("scan finished in %.3fs: %v files, %s",
|
|
time.Since(b.start).Seconds(),
|
|
s.Files, formatBytes(s.Bytes),
|
|
)
|
|
close(b.totalCh)
|
|
return
|
|
}
|
|
}
|
|
|
|
// Finish prints the finishing messages.
|
|
func (b *Backup) Finish(snapshotID restic.ID) {
|
|
select {
|
|
case b.finished <- struct{}{}:
|
|
case <-b.closed:
|
|
}
|
|
|
|
b.P("\n")
|
|
b.P("Files: %5d new, %5d changed, %5d unmodified\n", b.summary.Files.New, b.summary.Files.Changed, b.summary.Files.Unchanged)
|
|
b.P("Dirs: %5d new, %5d changed, %5d unmodified\n", b.summary.Dirs.New, b.summary.Dirs.Changed, b.summary.Dirs.Unchanged)
|
|
b.V("Data Blobs: %5d new\n", b.summary.ItemStats.DataBlobs)
|
|
b.V("Tree Blobs: %5d new\n", b.summary.ItemStats.TreeBlobs)
|
|
b.P("Added to the repo: %-5s\n", formatBytes(b.summary.ItemStats.DataSize+b.summary.ItemStats.TreeSize))
|
|
b.P("\n")
|
|
b.P("processed %v files, %v in %s",
|
|
b.summary.Files.New+b.summary.Files.Changed+b.summary.Files.Unchanged,
|
|
formatBytes(b.summary.ProcessedBytes),
|
|
formatDuration(time.Since(b.start)),
|
|
)
|
|
}
|
|
|
|
// SetMinUpdatePause sets b.MinUpdatePause. It satisfies the
|
|
// ArchiveProgressReporter interface.
|
|
func (b *Backup) SetMinUpdatePause(d time.Duration) {
|
|
b.MinUpdatePause = d
|
|
}
|