Refactor backup progress

Move the shared logic into the progress

Allows logic to be shared with forth coming restore status
This commit is contained in:
Dan Willoughby 2021-01-26 12:52:00 -07:00 committed by Michael Eischer
parent fa5ca8af81
commit 448419990c
4 changed files with 444 additions and 595 deletions

View file

@ -527,39 +527,17 @@ func runBackup(opts BackupOptions, gopts GlobalOptions, term *termstatus.Termina
return err
}
type ArchiveProgressReporter interface {
CompleteItem(item string, previous, current *restic.Node, s archiver.ItemStats, d time.Duration)
StartFile(filename string)
CompleteBlob(filename string, bytes uint64)
ScannerError(item string, fi os.FileInfo, err error) error
ReportTotal(item string, s archiver.ScanStats)
SetMinUpdatePause(d time.Duration)
Run(ctx context.Context) error
Error(item string, fi os.FileInfo, err error) error
Finish(snapshotID restic.ID)
SetDryRun()
// ui.StdioWrapper
Stdout() io.WriteCloser
Stderr() io.WriteCloser
// ui.Message
E(msg string, args ...interface{})
P(msg string, args ...interface{})
V(msg string, args ...interface{})
VV(msg string, args ...interface{})
}
var p ArchiveProgressReporter
var progressPrinter ui.ProgressPrinter
if gopts.JSON {
p = json.NewBackup(term, gopts.verbosity)
progressPrinter = json.NewBackup(term, gopts.verbosity)
} else {
p = ui.NewBackup(term, gopts.verbosity)
progressPrinter = ui.NewBackup(term, gopts.verbosity)
}
progressReporter := ui.NewProgress(progressPrinter)
if opts.DryRun {
repo.SetDryRun()
p.SetDryRun()
progressPrinter.SetDryRun()
}
// use the terminal for stdout/stderr
@ -567,14 +545,14 @@ func runBackup(opts BackupOptions, gopts GlobalOptions, term *termstatus.Termina
defer func() {
gopts.stdout, gopts.stderr = prevStdout, prevStderr
}()
gopts.stdout, gopts.stderr = p.Stdout(), p.Stderr()
gopts.stdout, gopts.stderr = progressPrinter.Stdout(), progressPrinter.Stderr()
p.SetMinUpdatePause(calculateProgressInterval(!gopts.Quiet))
progressReporter.SetMinUpdatePause(calculateProgressInterval(!gopts.Quiet))
t.Go(func() error { return p.Run(t.Context(gopts.ctx)) })
t.Go(func() error { return progressReporter.Run(t.Context(gopts.ctx)) })
if !gopts.JSON {
p.V("lock repository")
progressPrinter.V("lock repository")
}
lock, err := lockRepo(gopts.ctx, repo)
defer unlockRepo(lock)
@ -595,7 +573,7 @@ func runBackup(opts BackupOptions, gopts GlobalOptions, term *termstatus.Termina
}
if !gopts.JSON {
p.V("load index files")
progressPrinter.V("load index files")
}
err = repo.LoadIndex(gopts.ctx)
if err != nil {
@ -609,9 +587,9 @@ func runBackup(opts BackupOptions, gopts GlobalOptions, term *termstatus.Termina
if !gopts.JSON {
if parentSnapshotID != nil {
p.P("using parent snapshot %v\n", parentSnapshotID.Str())
progressPrinter.P("using parent snapshot %v\n", parentSnapshotID.Str())
} else {
p.P("no parent snapshot found, will read all files\n")
progressPrinter.P("no parent snapshot found, will read all files\n")
}
}
@ -640,12 +618,12 @@ func runBackup(opts BackupOptions, gopts GlobalOptions, term *termstatus.Termina
}
errorHandler := func(item string, err error) error {
return p.Error(item, nil, err)
return progressReporter.Error(item, nil, err)
}
messageHandler := func(msg string, args ...interface{}) {
if !gopts.JSON {
p.P(msg, args...)
progressPrinter.P(msg, args...)
}
}
@ -655,7 +633,7 @@ func runBackup(opts BackupOptions, gopts GlobalOptions, term *termstatus.Termina
}
if opts.Stdin {
if !gopts.JSON {
p.V("read data from stdin")
progressPrinter.V("read data from stdin")
}
filename := path.Join("/", opts.StdinFilename)
targetFS = &fs.Reader{
@ -670,11 +648,11 @@ func runBackup(opts BackupOptions, gopts GlobalOptions, term *termstatus.Termina
sc := archiver.NewScanner(targetFS)
sc.SelectByName = selectByNameFilter
sc.Select = selectFilter
sc.Error = p.ScannerError
sc.Result = p.ReportTotal
sc.Error = progressReporter.ScannerError
sc.Result = progressReporter.ReportTotal
if !gopts.JSON {
p.V("start scan on %v", targets)
progressPrinter.V("start scan on %v", targets)
}
t.Go(func() error { return sc.Scan(t.Context(gopts.ctx), targets) })
@ -685,11 +663,11 @@ func runBackup(opts BackupOptions, gopts GlobalOptions, term *termstatus.Termina
success := true
arch.Error = func(item string, fi os.FileInfo, err error) error {
success = false
return p.Error(item, fi, err)
return progressReporter.Error(item, fi, err)
}
arch.CompleteItem = p.CompleteItem
arch.StartFile = p.StartFile
arch.CompleteBlob = p.CompleteBlob
arch.CompleteItem = progressReporter.CompleteItem
arch.StartFile = progressReporter.StartFile
arch.CompleteBlob = progressReporter.CompleteBlob
if opts.IgnoreInode {
// --ignore-inode implies --ignore-ctime: on FUSE, the ctime is not
@ -713,7 +691,7 @@ func runBackup(opts BackupOptions, gopts GlobalOptions, term *termstatus.Termina
}
if !gopts.JSON {
p.V("start backup on %v", targets)
progressPrinter.V("start backup on %v", targets)
}
_, id, err := arch.Snapshot(gopts.ctx, targets, snapshotOpts)
@ -729,9 +707,9 @@ func runBackup(opts BackupOptions, gopts GlobalOptions, term *termstatus.Termina
}
// Report finished execution
p.Finish(id)
progressReporter.Finish(id)
if !gopts.JSON && !opts.DryRun {
p.P("snapshot %s saved\n", id.Str())
progressPrinter.P("snapshot %s saved\n", id.Str())
}
if !success {
return ErrInvalidSourceData

View file

@ -1,58 +1,23 @@
package ui
import (
"context"
"fmt"
"os"
"sort"
"sync"
"time"
"github.com/restic/restic/internal/archiver"
"github.com/restic/restic/internal/restic"
"github.com/restic/restic/internal/ui/signals"
"github.com/restic/restic/internal/ui/termstatus"
)
type counter struct {
Files, Dirs uint
Bytes uint64
}
type fileWorkerMessage struct {
filename string
done bool
}
// Backup reports progress for the `backup` command.
type Backup struct {
*Message
*StdioWrapper
MinUpdatePause time.Duration
term *termstatus.Terminal
start time.Time
totalBytes uint64
dry bool // true if writes are faked
totalCh chan counter
processedCh chan counter
errCh chan struct{}
workerCh chan fileWorkerMessage
closed chan struct{}
summary struct {
sync.Mutex
Files, Dirs struct {
New uint
Changed uint
Unchanged uint
}
ProcessedBytes uint64
archiver.ItemStats
}
}
// NewBackup returns a new backup progress reporter.
@ -61,102 +26,16 @@ func NewBackup(term *termstatus.Terminal, verbosity uint) *Backup {
Message: NewMessage(term, verbosity),
StdioWrapper: NewStdioWrapper(term),
term: term,
start: time.Now(),
// limit to 60fps by default
MinUpdatePause: time.Second / 60,
totalCh: make(chan counter),
processedCh: make(chan counter),
errCh: make(chan struct{}),
workerCh: make(chan fileWorkerMessage),
closed: make(chan struct{}),
}
}
// Run regularly updates the status lines. It should be called in a separate
// goroutine.
func (b *Backup) Run(ctx context.Context) error {
var (
lastUpdate time.Time
total, processed counter
errors uint
started bool
currentFiles = make(map[string]struct{})
secondsRemaining uint64
)
t := time.NewTicker(time.Second)
signalsCh := signals.GetProgressChannel()
defer t.Stop()
defer close(b.closed)
// Reset status when finished
defer func() {
if b.term.CanUpdateStatus() {
b.term.SetStatus([]string{""})
}
}()
for {
forceUpdate := false
select {
case <-ctx.Done():
return nil
case t, ok := <-b.totalCh:
if ok {
total = t
started = true
} else {
// scan has finished
b.totalCh = nil
b.totalBytes = total.Bytes
}
case s := <-b.processedCh:
processed.Files += s.Files
processed.Dirs += s.Dirs
processed.Bytes += s.Bytes
started = true
case <-b.errCh:
errors++
started = true
case m := <-b.workerCh:
if m.done {
delete(currentFiles, m.filename)
} else {
currentFiles[m.filename] = struct{}{}
}
case <-t.C:
if !started {
continue
}
if b.totalCh == nil {
secs := float64(time.Since(b.start) / time.Second)
todo := float64(total.Bytes - processed.Bytes)
secondsRemaining = uint64(secs / float64(processed.Bytes) * todo)
}
case <-signalsCh:
forceUpdate = true
}
// limit update frequency
if !forceUpdate && (time.Since(lastUpdate) < b.MinUpdatePause || b.MinUpdatePause == 0) {
continue
}
lastUpdate = time.Now()
b.update(total, processed, errors, currentFiles, secondsRemaining)
}
}
// update updates the status lines.
func (b *Backup) update(total, processed counter, errors uint, currentFiles map[string]struct{}, secs uint64) {
func (b *Backup) Update(total, processed Counter, errors uint, currentFiles map[string]struct{}, start time.Time, secs uint64) {
var status string
if total.Files == 0 && total.Dirs == 0 {
// no total count available yet
status = fmt.Sprintf("[%s] %v files, %s, %d errors",
formatDuration(time.Since(b.start)),
formatDuration(time.Since(start)),
processed.Files, formatBytes(processed.Bytes), errors,
)
} else {
@ -170,7 +49,7 @@ func (b *Backup) update(total, processed counter, errors uint, currentFiles map[
// include totals
status = fmt.Sprintf("[%s] %s%v files %s, total %v files %v, %d errors%s",
formatDuration(time.Since(b.start)),
formatDuration(time.Since(start)),
percent,
processed.Files,
formatBytes(processed.Bytes),
@ -201,29 +80,9 @@ func (b *Backup) ScannerError(item string, fi os.FileInfo, err error) error {
// Error is the error callback function for the archiver, it prints the error and returns nil.
func (b *Backup) Error(item string, fi os.FileInfo, err error) error {
b.E("error: %v\n", err)
select {
case b.errCh <- struct{}{}:
case <-b.closed:
}
return nil
}
// StartFile is called when a file is being processed by a worker.
func (b *Backup) StartFile(filename string) {
select {
case b.workerCh <- fileWorkerMessage{filename: filename}:
case <-b.closed:
}
}
// CompleteBlob is called for all saved blobs for files.
func (b *Backup) CompleteBlob(filename string, bytes uint64) {
select {
case b.processedCh <- counter{Bytes: bytes}:
case <-b.closed:
}
}
func formatPercent(numerator uint64, denominator uint64) string {
if denominator == 0 {
return ""
@ -273,138 +132,60 @@ func formatBytes(c uint64) string {
// CompleteItem is the status callback function for the archiver when a
// file/dir has been saved successfully.
func (b *Backup) CompleteItem(item string, previous, current *restic.Node, s archiver.ItemStats, d time.Duration) {
b.summary.Lock()
b.summary.ItemStats.Add(s)
// for the last item "/", current is nil
if current != nil {
b.summary.ProcessedBytes += current.Size
}
b.summary.Unlock()
if current == nil {
// error occurred, tell the status display to remove the line
select {
case b.workerCh <- fileWorkerMessage{filename: item, done: true}:
case <-b.closed:
}
return
}
switch current.Type {
case "file":
select {
case b.processedCh <- counter{Files: 1}:
case <-b.closed:
}
select {
case b.workerCh <- fileWorkerMessage{filename: item, done: true}:
case <-b.closed:
}
case "dir":
select {
case b.processedCh <- counter{Dirs: 1}:
case <-b.closed:
}
}
if current.Type == "dir" {
if previous == nil {
func (b *Backup) CompleteItem(messageType, item string, previous, current *restic.Node, s archiver.ItemStats, d time.Duration) {
switch messageType {
case "dir new":
b.VV("new %v, saved in %.3fs (%v added, %v metadata)", item, d.Seconds(), formatBytes(s.DataSize), formatBytes(s.TreeSize))
b.summary.Lock()
b.summary.Dirs.New++
b.summary.Unlock()
return
}
if previous.Equals(*current) {
case "dir unchanged":
b.VV("unchanged %v", item)
b.summary.Lock()
b.summary.Dirs.Unchanged++
b.summary.Unlock()
} else {
case "dir modified":
b.VV("modified %v, saved in %.3fs (%v added, %v metadata)", item, d.Seconds(), formatBytes(s.DataSize), formatBytes(s.TreeSize))
b.summary.Lock()
b.summary.Dirs.Changed++
b.summary.Unlock()
}
} else if current.Type == "file" {
select {
case b.workerCh <- fileWorkerMessage{done: true, filename: item}:
case <-b.closed:
}
if previous == nil {
case "file new":
b.VV("new %v, saved in %.3fs (%v added)", item, d.Seconds(), formatBytes(s.DataSize))
b.summary.Lock()
b.summary.Files.New++
b.summary.Unlock()
return
}
if previous.Equals(*current) {
case "file unchanged":
b.VV("unchanged %v", item)
b.summary.Lock()
b.summary.Files.Unchanged++
b.summary.Unlock()
} else {
case "file modified":
b.VV("modified %v, saved in %.3fs (%v added)", item, d.Seconds(), formatBytes(s.DataSize))
b.summary.Lock()
b.summary.Files.Changed++
b.summary.Unlock()
}
}
}
// ReportTotal sets the total stats up to now
func (b *Backup) ReportTotal(item string, s archiver.ScanStats) {
select {
case b.totalCh <- counter{Files: s.Files, Dirs: s.Dirs, Bytes: s.Bytes}:
case <-b.closed:
}
func (b *Backup) ReportTotal(item string, start time.Time, s archiver.ScanStats) {
if item == "" {
b.V("scan finished in %.3fs: %v files, %s",
time.Since(b.start).Seconds(),
time.Since(start).Seconds(),
s.Files, formatBytes(s.Bytes),
)
close(b.totalCh)
return
}
}
// Reset status
func (b *Backup) Reset() {
if b.term.CanUpdateStatus() {
b.term.SetStatus([]string{""})
}
}
// Finish prints the finishing messages.
func (b *Backup) Finish(snapshotID restic.ID) {
// wait for the status update goroutine to shut down
<-b.closed
func (b *Backup) Finish(snapshotID restic.ID, start time.Time, summary *Summary) {
b.P("\n")
b.P("Files: %5d new, %5d changed, %5d unmodified\n", b.summary.Files.New, b.summary.Files.Changed, b.summary.Files.Unchanged)
b.P("Dirs: %5d new, %5d changed, %5d unmodified\n", b.summary.Dirs.New, b.summary.Dirs.Changed, b.summary.Dirs.Unchanged)
b.V("Data Blobs: %5d new\n", b.summary.ItemStats.DataBlobs)
b.V("Tree Blobs: %5d new\n", b.summary.ItemStats.TreeBlobs)
b.P("Files: %5d new, %5d changed, %5d unmodified\n", summary.Files.New, summary.Files.Changed, summary.Files.Unchanged)
b.P("Dirs: %5d new, %5d changed, %5d unmodified\n", summary.Dirs.New, summary.Dirs.Changed, summary.Dirs.Unchanged)
b.V("Data Blobs: %5d new\n", summary.ItemStats.DataBlobs)
b.V("Tree Blobs: %5d new\n", summary.ItemStats.TreeBlobs)
verb := "Added"
if b.dry {
verb = "Would add"
}
b.P("%s to the repo: %-5s\n", verb, formatBytes(b.summary.ItemStats.DataSize+b.summary.ItemStats.TreeSize))
b.P("%s to the repo: %-5s\n", verb, formatBytes(summary.ItemStats.DataSize+summary.ItemStats.TreeSize))
b.P("\n")
b.P("processed %v files, %v in %s",
b.summary.Files.New+b.summary.Files.Changed+b.summary.Files.Unchanged,
formatBytes(b.summary.ProcessedBytes),
formatDuration(time.Since(b.start)),
summary.Files.New+summary.Files.Changed+summary.Files.Unchanged,
formatBytes(summary.ProcessedBytes),
formatDuration(time.Since(start)),
)
}
// SetMinUpdatePause sets b.MinUpdatePause. It satisfies the
// ArchiveProgressReporter interface.
func (b *Backup) SetMinUpdatePause(d time.Duration) {
b.MinUpdatePause = d
}
func (b *Backup) SetDryRun() {
b.dry = true
}

View file

@ -2,11 +2,9 @@ package json
import (
"bytes"
"context"
"encoding/json"
"os"
"sort"
"sync"
"time"
"github.com/restic/restic/internal/archiver"
@ -15,46 +13,14 @@ import (
"github.com/restic/restic/internal/ui/termstatus"
)
type counter struct {
Files, Dirs, Bytes uint64
}
type fileWorkerMessage struct {
filename string
done bool
}
// Backup reports progress for the `backup` command in JSON.
type Backup struct {
*ui.Message
*ui.StdioWrapper
MinUpdatePause time.Duration
term *termstatus.Terminal
v uint
start time.Time
dry bool
totalBytes uint64
totalCh chan counter
processedCh chan counter
errCh chan struct{}
workerCh chan fileWorkerMessage
finished chan struct{}
closed chan struct{}
summary struct {
sync.Mutex
Files, Dirs struct {
New uint
Changed uint
Unchanged uint
}
ProcessedBytes uint64
archiver.ItemStats
}
}
// NewBackup returns a new backup progress reporter.
@ -64,17 +30,6 @@ func NewBackup(term *termstatus.Terminal, verbosity uint) *Backup {
StdioWrapper: ui.NewStdioWrapper(term),
term: term,
v: verbosity,
start: time.Now(),
// limit to 60fps by default
MinUpdatePause: time.Second / 60,
totalCh: make(chan counter),
processedCh: make(chan counter),
errCh: make(chan struct{}),
workerCh: make(chan fileWorkerMessage),
finished: make(chan struct{}),
closed: make(chan struct{}),
}
}
@ -95,78 +50,11 @@ func (b *Backup) error(status interface{}) {
b.term.Error(toJSONString(status))
}
// Run regularly updates the status lines. It should be called in a separate
// goroutine.
func (b *Backup) Run(ctx context.Context) error {
var (
lastUpdate time.Time
total, processed counter
errors uint
started bool
currentFiles = make(map[string]struct{})
secondsRemaining uint64
)
t := time.NewTicker(time.Second)
defer t.Stop()
defer close(b.closed)
for {
select {
case <-ctx.Done():
return nil
case <-b.finished:
started = false
case t, ok := <-b.totalCh:
if ok {
total = t
started = true
} else {
// scan has finished
b.totalCh = nil
b.totalBytes = total.Bytes
}
case s := <-b.processedCh:
processed.Files += s.Files
processed.Dirs += s.Dirs
processed.Bytes += s.Bytes
started = true
case <-b.errCh:
errors++
started = true
case m := <-b.workerCh:
if m.done {
delete(currentFiles, m.filename)
} else {
currentFiles[m.filename] = struct{}{}
}
case <-t.C:
if !started {
continue
}
if b.totalCh == nil {
secs := float64(time.Since(b.start) / time.Second)
todo := float64(total.Bytes - processed.Bytes)
secondsRemaining = uint64(secs / float64(processed.Bytes) * todo)
}
}
// limit update frequency
if time.Since(lastUpdate) < b.MinUpdatePause {
continue
}
lastUpdate = time.Now()
b.update(total, processed, errors, currentFiles, secondsRemaining)
}
}
// update updates the status lines.
func (b *Backup) update(total, processed counter, errors uint, currentFiles map[string]struct{}, secs uint64) {
func (b *Backup) Update(total, processed ui.Counter, errors uint, currentFiles map[string]struct{}, start time.Time, secs uint64) {
status := statusUpdate{
MessageType: "status",
SecondsElapsed: uint64(time.Since(b.start) / time.Second),
SecondsElapsed: uint64(time.Since(start) / time.Second),
SecondsRemaining: secs,
TotalFiles: total.Files,
FilesDone: processed.Files,
@ -207,67 +95,18 @@ func (b *Backup) Error(item string, fi os.FileInfo, err error) error {
During: "archival",
Item: item,
})
select {
case b.errCh <- struct{}{}:
case <-b.closed:
}
return nil
}
// StartFile is called when a file is being processed by a worker.
func (b *Backup) StartFile(filename string) {
select {
case b.workerCh <- fileWorkerMessage{filename: filename}:
case <-b.closed:
}
}
// CompleteBlob is called for all saved blobs for files.
func (b *Backup) CompleteBlob(filename string, bytes uint64) {
select {
case b.processedCh <- counter{Bytes: bytes}:
case <-b.closed:
}
}
// CompleteItem is the status callback function for the archiver when a
// file/dir has been saved successfully.
func (b *Backup) CompleteItem(item string, previous, current *restic.Node, s archiver.ItemStats, d time.Duration) {
b.summary.Lock()
b.summary.ItemStats.Add(s)
b.summary.Unlock()
if current == nil {
// error occurred, tell the status display to remove the line
select {
case b.workerCh <- fileWorkerMessage{filename: item, done: true}:
case <-b.closed:
}
func (b *Backup) CompleteItem(messageType, item string, previous, current *restic.Node, s archiver.ItemStats, d time.Duration) {
if b.v < 2 {
return
}
b.summary.ProcessedBytes += current.Size
switch current.Type {
case "file":
select {
case b.processedCh <- counter{Files: 1}:
case <-b.closed:
}
select {
case b.workerCh <- fileWorkerMessage{filename: item, done: true}:
case <-b.closed:
}
case "dir":
select {
case b.processedCh <- counter{Dirs: 1}:
case <-b.closed:
}
}
if current.Type == "dir" {
if previous == nil {
if b.v >= 3 {
switch messageType {
case "dir new":
b.print(verboseUpdate{
MessageType: "verbose_status",
Action: "new",
@ -276,26 +115,13 @@ func (b *Backup) CompleteItem(item string, previous, current *restic.Node, s arc
DataSize: s.DataSize,
MetadataSize: s.TreeSize,
})
}
b.summary.Lock()
b.summary.Dirs.New++
b.summary.Unlock()
return
}
if previous.Equals(*current) {
if b.v >= 3 {
case "dir unchanged":
b.print(verboseUpdate{
MessageType: "verbose_status",
Action: "unchanged",
Item: item,
})
}
b.summary.Lock()
b.summary.Dirs.Unchanged++
b.summary.Unlock()
} else {
if b.v >= 3 {
case "dir modified":
b.print(verboseUpdate{
MessageType: "verbose_status",
Action: "modified",
@ -304,20 +130,7 @@ func (b *Backup) CompleteItem(item string, previous, current *restic.Node, s arc
DataSize: s.DataSize,
MetadataSize: s.TreeSize,
})
}
b.summary.Lock()
b.summary.Dirs.Changed++
b.summary.Unlock()
}
} else if current.Type == "file" {
select {
case b.workerCh <- fileWorkerMessage{done: true, filename: item}:
case <-b.closed:
}
if previous == nil {
if b.v >= 3 {
case "file new":
b.print(verboseUpdate{
MessageType: "verbose_status",
Action: "new",
@ -325,26 +138,13 @@ func (b *Backup) CompleteItem(item string, previous, current *restic.Node, s arc
Duration: d.Seconds(),
DataSize: s.DataSize,
})
}
b.summary.Lock()
b.summary.Files.New++
b.summary.Unlock()
return
}
if previous.Equals(*current) {
if b.v >= 3 {
case "file unchanged":
b.print(verboseUpdate{
MessageType: "verbose_status",
Action: "unchanged",
Item: item,
})
}
b.summary.Lock()
b.summary.Files.Unchanged++
b.summary.Unlock()
} else {
if b.v >= 3 {
case "file modified":
b.print(verboseUpdate{
MessageType: "verbose_status",
Action: "modified",
@ -353,65 +153,34 @@ func (b *Backup) CompleteItem(item string, previous, current *restic.Node, s arc
DataSize: s.DataSize,
})
}
b.summary.Lock()
b.summary.Files.Changed++
b.summary.Unlock()
}
}
}
// ReportTotal sets the total stats up to now
func (b *Backup) ReportTotal(item string, s archiver.ScanStats) {
select {
case b.totalCh <- counter{Files: uint64(s.Files), Dirs: uint64(s.Dirs), Bytes: s.Bytes}:
case <-b.closed:
}
func (b *Backup) ReportTotal(item string, start time.Time, s archiver.ScanStats) {
if item == "" {
if b.v >= 2 {
b.print(verboseUpdate{
MessageType: "status",
Action: "scan_finished",
Duration: time.Since(b.start).Seconds(),
Duration: time.Since(start).Seconds(),
DataSize: s.Bytes,
TotalFiles: s.Files,
})
}
close(b.totalCh)
return
}
}
// Finish prints the finishing messages.
func (b *Backup) Finish(snapshotID restic.ID) {
select {
case b.finished <- struct{}{}:
case <-b.closed:
}
func (b *Backup) Finish(snapshotID restic.ID, start time.Time, summary *ui.Summary) {
b.print(summaryOutput{
MessageType: "summary",
FilesNew: b.summary.Files.New,
FilesChanged: b.summary.Files.Changed,
FilesUnmodified: b.summary.Files.Unchanged,
DirsNew: b.summary.Dirs.New,
DirsChanged: b.summary.Dirs.Changed,
DirsUnmodified: b.summary.Dirs.Unchanged,
DataBlobs: b.summary.ItemStats.DataBlobs,
TreeBlobs: b.summary.ItemStats.TreeBlobs,
DataAdded: b.summary.ItemStats.DataSize + b.summary.ItemStats.TreeSize,
TotalFilesProcessed: b.summary.Files.New + b.summary.Files.Changed + b.summary.Files.Unchanged,
TotalBytesProcessed: b.summary.ProcessedBytes,
TotalDuration: time.Since(b.start).Seconds(),
SnapshotID: snapshotID.Str(),
DryRun: b.dry,
})
}
// SetMinUpdatePause sets b.MinUpdatePause. It satisfies the
// ArchiveProgressReporter interface.
func (b *Backup) SetMinUpdatePause(d time.Duration) {
b.MinUpdatePause = d
// Reset no-op
func (b *Backup) Reset() {
}
// SetDryRun marks the backup as a "dry run".

321
internal/ui/progress.go Normal file
View file

@ -0,0 +1,321 @@
package ui
import (
"context"
"io"
"os"
"sync"
"time"
"github.com/restic/restic/internal/archiver"
"github.com/restic/restic/internal/restic"
"github.com/restic/restic/internal/ui/signals"
)
type ProgressPrinter interface {
Update(total, processed Counter, errors uint, currentFiles map[string]struct{}, start time.Time, secs uint64)
Error(item string, fi os.FileInfo, err error) error
ScannerError(item string, fi os.FileInfo, err error) error
CompleteItem(messageType string, item string, previous, current *restic.Node, s archiver.ItemStats, d time.Duration)
ReportTotal(item string, start time.Time, s archiver.ScanStats)
Finish(snapshotID restic.ID, start time.Time, summary *Summary)
Reset()
SetDryRun()
// ui.StdioWrapper
Stdout() io.WriteCloser
Stderr() io.WriteCloser
E(msg string, args ...interface{})
P(msg string, args ...interface{})
V(msg string, args ...interface{})
VV(msg string, args ...interface{})
}
type Counter struct {
Files, Dirs, Bytes uint64
}
type fileWorkerMessage struct {
filename string
done bool
}
type ProgressReporter interface {
CompleteItem(item string, previous, current *restic.Node, s archiver.ItemStats, d time.Duration)
StartFile(filename string)
CompleteBlob(filename string, bytes uint64)
ScannerError(item string, fi os.FileInfo, err error) error
ReportTotal(item string, s archiver.ScanStats)
SetMinUpdatePause(d time.Duration)
Run(ctx context.Context) error
Error(item string, fi os.FileInfo, err error) error
Finish(snapshotID restic.ID)
}
type Summary struct {
sync.Mutex
Files, Dirs struct {
New uint
Changed uint
Unchanged uint
}
ProcessedBytes uint64
TotalErrors uint
archiver.ItemStats
}
type Progress struct {
MinUpdatePause time.Duration
start time.Time
totalBytes uint64
totalCh chan Counter
processedCh chan Counter
errCh chan struct{}
workerCh chan fileWorkerMessage
closed chan struct{}
summary *Summary
printer ProgressPrinter
}
func NewProgress(printer ProgressPrinter) *Progress {
return &Progress{
// limit to 60fps by default
MinUpdatePause: time.Second / 60,
start: time.Now(),
totalCh: make(chan Counter),
processedCh: make(chan Counter),
errCh: make(chan struct{}),
workerCh: make(chan fileWorkerMessage),
closed: make(chan struct{}),
summary: &Summary{},
printer: printer,
}
}
func (p *Progress) Run(ctx context.Context) error {
var (
lastUpdate time.Time
total, processed Counter
errors uint
started bool
currentFiles = make(map[string]struct{})
secondsRemaining uint64
)
t := time.NewTicker(time.Second)
signalsCh := signals.GetProgressChannel()
defer t.Stop()
defer close(p.closed)
// Reset status when finished
defer p.printer.Reset()
for {
forceUpdate := false
select {
case <-ctx.Done():
return nil
case t, ok := <-p.totalCh:
if ok {
total = t
started = true
} else {
// scan has finished
p.totalCh = nil
p.totalBytes = total.Bytes
}
case s := <-p.processedCh:
processed.Files += s.Files
processed.Dirs += s.Dirs
processed.Bytes += s.Bytes
started = true
case <-p.errCh:
errors++
p.summary.Lock()
p.summary.TotalErrors = errors
p.summary.Unlock()
started = true
case m := <-p.workerCh:
if m.done {
delete(currentFiles, m.filename)
} else {
currentFiles[m.filename] = struct{}{}
}
case <-t.C:
if !started {
continue
}
if p.totalCh == nil {
secs := float64(time.Since(p.start) / time.Second)
todo := float64(total.Bytes - processed.Bytes)
secondsRemaining = uint64(secs / float64(processed.Bytes) * todo)
}
case <-signalsCh:
forceUpdate = true
}
// limit update frequency
if !forceUpdate && (time.Since(lastUpdate) < p.MinUpdatePause || p.MinUpdatePause == 0) {
continue
}
lastUpdate = time.Now()
p.printer.Update(total, processed, errors, currentFiles, p.start, secondsRemaining)
}
}
// ScannerError is the error callback function for the scanner, it prints the
// error in verbose mode and returns nil.
func (p *Progress) ScannerError(item string, fi os.FileInfo, err error) error {
return p.printer.ScannerError(item, fi, err)
}
// Error is the error callback function for the archiver, it prints the error and returns nil.
func (p *Progress) Error(item string, fi os.FileInfo, err error) error {
cbErr := p.printer.Error(item, fi, err)
select {
case p.errCh <- struct{}{}:
case <-p.closed:
}
return cbErr
}
// StartFile is called when a file is being processed by a worker.
func (p *Progress) StartFile(filename string) {
select {
case p.workerCh <- fileWorkerMessage{filename: filename}:
case <-p.closed:
}
}
// CompleteBlob is called for all saved blobs for files.
func (p *Progress) CompleteBlob(filename string, bytes uint64) {
select {
case p.processedCh <- Counter{Bytes: bytes}:
case <-p.closed:
}
}
// CompleteItem is the status callback function for the archiver when a
// file/dir has been saved successfully.
func (p *Progress) CompleteItem(item string, previous, current *restic.Node, s archiver.ItemStats, d time.Duration) {
p.summary.Lock()
p.summary.ItemStats.Add(s)
// for the last item "/", current is nil
if current != nil {
p.summary.ProcessedBytes += current.Size
}
p.summary.Unlock()
if current == nil {
// error occurred, tell the status display to remove the line
select {
case p.workerCh <- fileWorkerMessage{filename: item, done: true}:
case <-p.closed:
}
return
}
switch current.Type {
case "file":
select {
case p.processedCh <- Counter{Files: 1}:
case <-p.closed:
}
select {
case p.workerCh <- fileWorkerMessage{filename: item, done: true}:
case <-p.closed:
}
case "dir":
select {
case p.processedCh <- Counter{Dirs: 1}:
case <-p.closed:
}
}
if current.Type == "dir" {
if previous == nil {
p.printer.CompleteItem("dir new", item, previous, current, s, d)
p.summary.Lock()
p.summary.Dirs.New++
p.summary.Unlock()
return
}
if previous.Equals(*current) {
p.printer.CompleteItem("dir unchanged", item, previous, current, s, d)
p.summary.Lock()
p.summary.Dirs.Unchanged++
p.summary.Unlock()
} else {
p.printer.CompleteItem("dir modified", item, previous, current, s, d)
p.summary.Lock()
p.summary.Dirs.Changed++
p.summary.Unlock()
}
} else if current.Type == "file" {
select {
case p.workerCh <- fileWorkerMessage{done: true, filename: item}:
case <-p.closed:
}
if previous == nil {
p.printer.CompleteItem("file new", item, previous, current, s, d)
p.summary.Lock()
p.summary.Files.New++
p.summary.Unlock()
return
}
if previous.Equals(*current) {
p.printer.CompleteItem("file unchanged", item, previous, current, s, d)
p.summary.Lock()
p.summary.Files.Unchanged++
p.summary.Unlock()
} else {
p.printer.CompleteItem("file modified", item, previous, current, s, d)
p.summary.Lock()
p.summary.Files.Changed++
p.summary.Unlock()
}
}
}
// ReportTotal sets the total stats up to now
func (p *Progress) ReportTotal(item string, s archiver.ScanStats) {
select {
case p.totalCh <- Counter{Files: uint64(s.Files), Dirs: uint64(s.Dirs), Bytes: s.Bytes}:
case <-p.closed:
}
if item == "" {
p.printer.ReportTotal(item, p.start, s)
close(p.totalCh)
return
}
}
// Finish prints the finishing messages.
func (p *Progress) Finish(snapshotID restic.ID) {
<-p.closed
summary := p.summary
p.printer.Finish(snapshotID, p.start, summary)
}
// SetMinUpdatePause sets b.MinUpdatePause. It satisfies the
// ArchiveProgressReporter interface.
func (p *Progress) SetMinUpdatePause(d time.Duration) {
p.MinUpdatePause = d
}