forked from TrueCloudLab/restic
8d0ba55ecd
If the backup is interrupted while the scanner is still active, a deadlock can occur. Interruptions are triggered by canceling the context object shared by the backup progress UI and the scanner. The context may be canceled between the scanner's own cancellation check and its call to the UI's `ReportTotal` method. That method sends a message to the UI goroutine; however, a canceled context also stops that goroutine, so the channel send can block indefinitely. This is resolved by adding a `closed` channel, which is closed once the UI goroutine has stopped and serves as an escape hatch for reported UI updates. The change covers not only the `ReportTotal` method but all potentially affected methods of the progress UI implementation.
457 lines
11 KiB
Go
457 lines
11 KiB
Go
package json
|
|
|
|
import (
|
|
"bytes"
|
|
"context"
|
|
"encoding/json"
|
|
"os"
|
|
"sort"
|
|
"sync"
|
|
"time"
|
|
|
|
"github.com/restic/restic/internal/archiver"
|
|
"github.com/restic/restic/internal/restic"
|
|
"github.com/restic/restic/internal/ui"
|
|
"github.com/restic/restic/internal/ui/termstatus"
|
|
)
|
|
|
|
// counter bundles file, directory and byte counts. Values of this type are
// sent over the total and processed channels of Backup.
type counter struct {
	Files uint64
	Dirs  uint64
	Bytes uint64
}
|
|
|
|
// fileWorkerMessage is sent to the Run loop whenever work on a file starts
// or ends. A message with done set removes filename from the set of files
// currently being processed; one without it adds filename to that set.
type fileWorkerMessage struct {
	filename string
	done     bool
}
|
|
|
|
// Backup reports progress for the `backup` command in JSON.
|
|
type Backup struct {
|
|
*ui.Message
|
|
*ui.StdioWrapper
|
|
|
|
MinUpdatePause time.Duration
|
|
|
|
term *termstatus.Terminal
|
|
v uint
|
|
start time.Time
|
|
|
|
totalBytes uint64
|
|
|
|
totalCh chan counter
|
|
processedCh chan counter
|
|
errCh chan struct{}
|
|
workerCh chan fileWorkerMessage
|
|
finished chan struct{}
|
|
closed chan struct{}
|
|
|
|
summary struct {
|
|
sync.Mutex
|
|
Files, Dirs struct {
|
|
New uint
|
|
Changed uint
|
|
Unchanged uint
|
|
}
|
|
ProcessedBytes uint64
|
|
archiver.ItemStats
|
|
}
|
|
}
|
|
|
|
// NewBackup returns a new backup progress reporter.
|
|
func NewBackup(term *termstatus.Terminal, verbosity uint) *Backup {
|
|
return &Backup{
|
|
Message: ui.NewMessage(term, verbosity),
|
|
StdioWrapper: ui.NewStdioWrapper(term),
|
|
term: term,
|
|
v: verbosity,
|
|
start: time.Now(),
|
|
|
|
// limit to 60fps by default
|
|
MinUpdatePause: time.Second / 60,
|
|
|
|
totalCh: make(chan counter),
|
|
processedCh: make(chan counter),
|
|
errCh: make(chan struct{}),
|
|
workerCh: make(chan fileWorkerMessage),
|
|
finished: make(chan struct{}),
|
|
closed: make(chan struct{}),
|
|
}
|
|
}
|
|
|
|
// toJSONString encodes status as one line of JSON, terminated by a newline,
// and returns it as a string.
//
// If status cannot be encoded (for example because it contains a value of an
// unsupported type), the failure is not dropped silently: a small JSON object
// with message_type "error" and the encoder's error text is returned instead,
// so the consumer of the output stream can see that a message was lost.
func toJSONString(status interface{}) string {
	buf := new(bytes.Buffer)
	err := json.NewEncoder(buf).Encode(status)
	if err == nil {
		return buf.String()
	}

	fallback := struct {
		MessageType string `json:"message_type"`
		Error       string `json:"error"`
	}{
		MessageType: "error",
		Error:       "encoding JSON message: " + err.Error(),
	}

	out, err := json.Marshal(fallback)
	if err != nil {
		// Not expected for a struct made of plain strings; keep the previous
		// behavior of returning nothing rather than crashing the reporter.
		return ""
	}
	return string(out) + "\n"
}
|
|
|
|
func (b *Backup) print(status interface{}) {
|
|
b.term.Print(toJSONString(status))
|
|
}
|
|
|
|
func (b *Backup) error(status interface{}) {
|
|
b.term.Error(toJSONString(status))
|
|
}
|
|
|
|
// Run regularly updates the status lines. It should be called in a separate
|
|
// goroutine.
|
|
func (b *Backup) Run(ctx context.Context) error {
|
|
var (
|
|
lastUpdate time.Time
|
|
total, processed counter
|
|
errors uint
|
|
started bool
|
|
currentFiles = make(map[string]struct{})
|
|
secondsRemaining uint64
|
|
)
|
|
|
|
t := time.NewTicker(time.Second)
|
|
defer t.Stop()
|
|
defer close(b.closed)
|
|
|
|
for {
|
|
select {
|
|
case <-ctx.Done():
|
|
return nil
|
|
case <-b.finished:
|
|
started = false
|
|
case t, ok := <-b.totalCh:
|
|
if ok {
|
|
total = t
|
|
started = true
|
|
} else {
|
|
// scan has finished
|
|
b.totalCh = nil
|
|
b.totalBytes = total.Bytes
|
|
}
|
|
case s := <-b.processedCh:
|
|
processed.Files += s.Files
|
|
processed.Dirs += s.Dirs
|
|
processed.Bytes += s.Bytes
|
|
started = true
|
|
case <-b.errCh:
|
|
errors++
|
|
started = true
|
|
case m := <-b.workerCh:
|
|
if m.done {
|
|
delete(currentFiles, m.filename)
|
|
} else {
|
|
currentFiles[m.filename] = struct{}{}
|
|
}
|
|
case <-t.C:
|
|
if !started {
|
|
continue
|
|
}
|
|
|
|
if b.totalCh == nil {
|
|
secs := float64(time.Since(b.start) / time.Second)
|
|
todo := float64(total.Bytes - processed.Bytes)
|
|
secondsRemaining = uint64(secs / float64(processed.Bytes) * todo)
|
|
}
|
|
}
|
|
|
|
// limit update frequency
|
|
if time.Since(lastUpdate) < b.MinUpdatePause {
|
|
continue
|
|
}
|
|
lastUpdate = time.Now()
|
|
|
|
b.update(total, processed, errors, currentFiles, secondsRemaining)
|
|
}
|
|
}
|
|
|
|
// update updates the status lines.
|
|
func (b *Backup) update(total, processed counter, errors uint, currentFiles map[string]struct{}, secs uint64) {
|
|
status := statusUpdate{
|
|
MessageType: "status",
|
|
SecondsElapsed: uint64(time.Since(b.start) / time.Second),
|
|
SecondsRemaining: secs,
|
|
TotalFiles: total.Files,
|
|
FilesDone: processed.Files,
|
|
TotalBytes: total.Bytes,
|
|
BytesDone: processed.Bytes,
|
|
ErrorCount: errors,
|
|
}
|
|
|
|
if total.Bytes > 0 {
|
|
status.PercentDone = float64(processed.Bytes) / float64(total.Bytes)
|
|
}
|
|
|
|
for filename := range currentFiles {
|
|
status.CurrentFiles = append(status.CurrentFiles, filename)
|
|
}
|
|
sort.Strings(status.CurrentFiles)
|
|
|
|
b.print(status)
|
|
}
|
|
|
|
// ScannerError is the error callback function for the scanner, it prints the
|
|
// error in verbose mode and returns nil.
|
|
func (b *Backup) ScannerError(item string, fi os.FileInfo, err error) error {
|
|
b.error(errorUpdate{
|
|
MessageType: "error",
|
|
Error: err,
|
|
During: "scan",
|
|
Item: item,
|
|
})
|
|
return nil
|
|
}
|
|
|
|
// Error is the error callback function for the archiver, it prints the error and returns nil.
|
|
func (b *Backup) Error(item string, fi os.FileInfo, err error) error {
|
|
b.error(errorUpdate{
|
|
MessageType: "error",
|
|
Error: err,
|
|
During: "archival",
|
|
Item: item,
|
|
})
|
|
select {
|
|
case b.errCh <- struct{}{}:
|
|
case <-b.closed:
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// StartFile is called when a file is being processed by a worker.
|
|
func (b *Backup) StartFile(filename string) {
|
|
select {
|
|
case b.workerCh <- fileWorkerMessage{filename: filename}:
|
|
case <-b.closed:
|
|
}
|
|
}
|
|
|
|
// CompleteBlob is called for all saved blobs for files.
|
|
func (b *Backup) CompleteBlob(filename string, bytes uint64) {
|
|
select {
|
|
case b.processedCh <- counter{Bytes: bytes}:
|
|
case <-b.closed:
|
|
}
|
|
}
|
|
|
|
// CompleteItem is the status callback function for the archiver when a
|
|
// file/dir has been saved successfully.
|
|
func (b *Backup) CompleteItem(item string, previous, current *restic.Node, s archiver.ItemStats, d time.Duration) {
|
|
b.summary.Lock()
|
|
b.summary.ItemStats.Add(s)
|
|
b.summary.Unlock()
|
|
|
|
if current == nil {
|
|
// error occurred, tell the status display to remove the line
|
|
select {
|
|
case b.workerCh <- fileWorkerMessage{filename: item, done: true}:
|
|
case <-b.closed:
|
|
}
|
|
return
|
|
}
|
|
|
|
b.summary.ProcessedBytes += current.Size
|
|
|
|
switch current.Type {
|
|
case "file":
|
|
select {
|
|
case b.processedCh <- counter{Files: 1}:
|
|
case <-b.closed:
|
|
}
|
|
select {
|
|
case b.workerCh <- fileWorkerMessage{filename: item, done: true}:
|
|
case <-b.closed:
|
|
}
|
|
case "dir":
|
|
select {
|
|
case b.processedCh <- counter{Dirs: 1}:
|
|
case <-b.closed:
|
|
}
|
|
}
|
|
|
|
if current.Type == "dir" {
|
|
if previous == nil {
|
|
if b.v >= 3 {
|
|
b.print(verboseUpdate{
|
|
MessageType: "verbose_status",
|
|
Action: "new",
|
|
Item: item,
|
|
Duration: d.Seconds(),
|
|
DataSize: s.DataSize,
|
|
MetadataSize: s.TreeSize,
|
|
})
|
|
}
|
|
b.summary.Lock()
|
|
b.summary.Dirs.New++
|
|
b.summary.Unlock()
|
|
return
|
|
}
|
|
|
|
if previous.Equals(*current) {
|
|
if b.v >= 3 {
|
|
b.print(verboseUpdate{
|
|
MessageType: "verbose_status",
|
|
Action: "unchanged",
|
|
Item: item,
|
|
})
|
|
}
|
|
b.summary.Lock()
|
|
b.summary.Dirs.Unchanged++
|
|
b.summary.Unlock()
|
|
} else {
|
|
if b.v >= 3 {
|
|
b.print(verboseUpdate{
|
|
MessageType: "verbose_status",
|
|
Action: "modified",
|
|
Item: item,
|
|
Duration: d.Seconds(),
|
|
DataSize: s.DataSize,
|
|
MetadataSize: s.TreeSize,
|
|
})
|
|
}
|
|
b.summary.Lock()
|
|
b.summary.Dirs.Changed++
|
|
b.summary.Unlock()
|
|
}
|
|
|
|
} else if current.Type == "file" {
|
|
select {
|
|
case b.workerCh <- fileWorkerMessage{done: true, filename: item}:
|
|
case <-b.closed:
|
|
}
|
|
|
|
if previous == nil {
|
|
if b.v >= 3 {
|
|
b.print(verboseUpdate{
|
|
MessageType: "verbose_status",
|
|
Action: "new",
|
|
Item: item,
|
|
Duration: d.Seconds(),
|
|
DataSize: s.DataSize,
|
|
})
|
|
}
|
|
b.summary.Lock()
|
|
b.summary.Files.New++
|
|
b.summary.Unlock()
|
|
return
|
|
}
|
|
|
|
if previous.Equals(*current) {
|
|
if b.v >= 3 {
|
|
b.print(verboseUpdate{
|
|
MessageType: "verbose_status",
|
|
Action: "unchanged",
|
|
Item: item,
|
|
})
|
|
}
|
|
b.summary.Lock()
|
|
b.summary.Files.Unchanged++
|
|
b.summary.Unlock()
|
|
} else {
|
|
if b.v >= 3 {
|
|
b.print(verboseUpdate{
|
|
MessageType: "verbose_status",
|
|
Action: "modified",
|
|
Item: item,
|
|
Duration: d.Seconds(),
|
|
DataSize: s.DataSize,
|
|
})
|
|
}
|
|
b.summary.Lock()
|
|
b.summary.Files.Changed++
|
|
b.summary.Unlock()
|
|
}
|
|
}
|
|
}
|
|
|
|
// ReportTotal sets the total stats up to now
|
|
func (b *Backup) ReportTotal(item string, s archiver.ScanStats) {
|
|
select {
|
|
case b.totalCh <- counter{Files: uint64(s.Files), Dirs: uint64(s.Dirs), Bytes: s.Bytes}:
|
|
case <-b.closed:
|
|
}
|
|
|
|
if item == "" {
|
|
if b.v >= 2 {
|
|
b.print(verboseUpdate{
|
|
MessageType: "status",
|
|
Action: "scan_finished",
|
|
Duration: time.Since(b.start).Seconds(),
|
|
DataSize: s.Bytes,
|
|
TotalFiles: s.Files,
|
|
})
|
|
}
|
|
close(b.totalCh)
|
|
return
|
|
}
|
|
}
|
|
|
|
// Finish prints the finishing messages.
|
|
func (b *Backup) Finish(snapshotID restic.ID) {
|
|
select {
|
|
case b.finished <- struct{}{}:
|
|
case <-b.closed:
|
|
}
|
|
|
|
b.print(summaryOutput{
|
|
MessageType: "summary",
|
|
FilesNew: b.summary.Files.New,
|
|
FilesChanged: b.summary.Files.Changed,
|
|
FilesUnmodified: b.summary.Files.Unchanged,
|
|
DirsNew: b.summary.Dirs.New,
|
|
DirsChanged: b.summary.Dirs.Changed,
|
|
DirsUnmodified: b.summary.Dirs.Unchanged,
|
|
DataBlobs: b.summary.ItemStats.DataBlobs,
|
|
TreeBlobs: b.summary.ItemStats.TreeBlobs,
|
|
DataAdded: b.summary.ItemStats.DataSize + b.summary.ItemStats.TreeSize,
|
|
TotalFilesProcessed: b.summary.Files.New + b.summary.Files.Changed + b.summary.Files.Unchanged,
|
|
TotalBytesProcessed: b.summary.ProcessedBytes,
|
|
TotalDuration: time.Since(b.start).Seconds(),
|
|
SnapshotID: snapshotID.Str(),
|
|
})
|
|
}
|
|
|
|
// SetMinUpdatePause sets b.MinUpdatePause. It satisfies the
|
|
// ArchiveProgressReporter interface.
|
|
func (b *Backup) SetMinUpdatePause(d time.Duration) {
|
|
b.MinUpdatePause = d
|
|
}
|
|
|
|
// statusUpdate is the JSON message printed periodically while a backup is
// running. Every field except message_type and percent_done is left out of
// the output when it holds its zero value.
type statusUpdate struct {
	MessageType string `json:"message_type"` // "status"

	// timing, in whole seconds
	SecondsElapsed   uint64 `json:"seconds_elapsed,omitempty"`
	SecondsRemaining uint64 `json:"seconds_remaining,omitempty"`

	// progress; PercentDone is the ratio of processed to total bytes
	PercentDone float64 `json:"percent_done"`
	TotalFiles  uint64  `json:"total_files,omitempty"`
	FilesDone   uint64  `json:"files_done,omitempty"`
	TotalBytes  uint64  `json:"total_bytes,omitempty"`
	BytesDone   uint64  `json:"bytes_done,omitempty"`

	ErrorCount   uint     `json:"error_count,omitempty"`
	CurrentFiles []string `json:"current_files,omitempty"`
}
|
|
|
|
// errorUpdate is the JSON message printed for an error. During is "scan"
// for errors reported by the scanner and "archival" for errors reported by
// the archiver.
//
// NOTE(review): Error is encoded by encoding/json from the dynamic error
// value; error types without exported fields presumably end up as "{}".
// Confirm this is what consumers expect.
type errorUpdate struct {
	MessageType string `json:"message_type"` // "error"
	Error       error  `json:"error"`
	During      string `json:"during"`
	Item        string `json:"item"`
}
|
|
|
|
// verboseUpdate is the JSON message printed per completed item at high
// verbosity (message_type "verbose_status"). ReportTotal also uses it, with
// message_type "status" and action "scan_finished", when the scan is done.
// All fields are always present in the output.
type verboseUpdate struct {
	MessageType string `json:"message_type"` // "verbose_status" or "status"
	Action      string `json:"action"`
	Item        string `json:"item"`

	Duration     float64 `json:"duration"` // in seconds
	DataSize     uint64  `json:"data_size"`
	MetadataSize uint64  `json:"metadata_size"`
	TotalFiles   uint    `json:"total_files"`
}
|
|
|
|
// summaryOutput is the JSON message printed by Finish once the backup is
// complete. All fields are always present in the output.
type summaryOutput struct {
	MessageType string `json:"message_type"` // "summary"

	// per-item classification
	FilesNew        uint `json:"files_new"`
	FilesChanged    uint `json:"files_changed"`
	FilesUnmodified uint `json:"files_unmodified"`
	DirsNew         uint `json:"dirs_new"`
	DirsChanged     uint `json:"dirs_changed"`
	DirsUnmodified  uint `json:"dirs_unmodified"`

	// stored blobs; DataAdded is the sum of the data and tree sizes
	DataBlobs int    `json:"data_blobs"`
	TreeBlobs int    `json:"tree_blobs"`
	DataAdded uint64 `json:"data_added"`

	// overall totals
	TotalFilesProcessed uint    `json:"total_files_processed"`
	TotalBytesProcessed uint64  `json:"total_bytes_processed"`
	TotalDuration       float64 `json:"total_duration"` // in seconds
	SnapshotID          string  `json:"snapshot_id"`
}
|