backup: Clean up progress reporting code

This commit is contained in:
greatroar 2022-10-16 12:01:27 +02:00
parent 964977677f
commit 201e5c7e74
2 changed files with 22 additions and 54 deletions

View file

@ -555,11 +555,11 @@ func runBackup(ctx context.Context, opts BackupOptions, gopts GlobalOptions, ter
} else { } else {
progressPrinter = backup.NewTextProgress(term, gopts.verbosity) progressPrinter = backup.NewTextProgress(term, gopts.verbosity)
} }
progressReporter := backup.NewProgress(progressPrinter) progressReporter := backup.NewProgress(progressPrinter,
calculateProgressInterval(!gopts.Quiet, gopts.JSON))
if opts.DryRun { if opts.DryRun {
repo.SetDryRun() repo.SetDryRun()
progressReporter.SetDryRun()
} }
// use the terminal for stdout/stderr // use the terminal for stdout/stderr
@ -569,12 +569,10 @@ func runBackup(ctx context.Context, opts BackupOptions, gopts GlobalOptions, ter
}() }()
gopts.stdout, gopts.stderr = progressPrinter.Stdout(), progressPrinter.Stderr() gopts.stdout, gopts.stderr = progressPrinter.Stdout(), progressPrinter.Stderr()
progressReporter.SetMinUpdatePause(calculateProgressInterval(!gopts.Quiet, gopts.JSON))
wg, wgCtx := errgroup.WithContext(ctx) wg, wgCtx := errgroup.WithContext(ctx)
cancelCtx, cancel := context.WithCancel(wgCtx) cancelCtx, cancel := context.WithCancel(wgCtx)
defer cancel() defer cancel()
wg.Go(func() error { return progressReporter.Run(cancelCtx) }) wg.Go(func() error { progressReporter.Run(cancelCtx); return nil })
if !gopts.JSON { if !gopts.JSON {
progressPrinter.V("lock repository") progressPrinter.V("lock repository")
@ -676,7 +674,7 @@ func runBackup(ctx context.Context, opts BackupOptions, gopts GlobalOptions, ter
sc := archiver.NewScanner(targetFS) sc := archiver.NewScanner(targetFS)
sc.SelectByName = selectByNameFilter sc.SelectByName = selectByNameFilter
sc.Select = selectFilter sc.Select = selectFilter
sc.Error = progressReporter.ScannerError sc.Error = progressPrinter.ScannerError
sc.Result = progressReporter.ReportTotal sc.Result = progressReporter.ReportTotal
if !gopts.JSON { if !gopts.JSON {
@ -731,7 +729,7 @@ func runBackup(ctx context.Context, opts BackupOptions, gopts GlobalOptions, ter
} }
// Report finished execution // Report finished execution
progressReporter.Finish(id) progressReporter.Finish(id, opts.DryRun)
if !gopts.JSON && !opts.DryRun { if !gopts.JSON && !opts.DryRun {
progressPrinter.P("snapshot %s saved\n", id.Str()) progressPrinter.P("snapshot %s saved\n", id.Str())
} }

View file

@ -24,10 +24,8 @@ type ProgressPrinter interface {
Stdout() io.WriteCloser Stdout() io.WriteCloser
Stderr() io.WriteCloser Stderr() io.WriteCloser
E(msg string, args ...interface{})
P(msg string, args ...interface{}) P(msg string, args ...interface{})
V(msg string, args ...interface{}) V(msg string, args ...interface{})
VV(msg string, args ...interface{})
} }
type Counter struct { type Counter struct {
@ -52,10 +50,8 @@ type Summary struct {
// Progress reports progress for the `backup` command. // Progress reports progress for the `backup` command.
type Progress struct { type Progress struct {
MinUpdatePause time.Duration interval time.Duration
start time.Time
start time.Time
dry bool
totalCh chan Counter totalCh chan Counter
processedCh chan Counter processedCh chan Counter
@ -63,15 +59,14 @@ type Progress struct {
workerCh chan fileWorkerMessage workerCh chan fileWorkerMessage
closed chan struct{} closed chan struct{}
summary *Summary summary Summary
printer ProgressPrinter printer ProgressPrinter
} }
func NewProgress(printer ProgressPrinter) *Progress { func NewProgress(printer ProgressPrinter, interval time.Duration) *Progress {
return &Progress{ return &Progress{
// limit to 60fps by default interval: interval,
MinUpdatePause: time.Second / 60, start: time.Now(),
start: time.Now(),
// use buffered channels for the information used to update the status // use buffered channels for the information used to update the status
// the shutdown of the `Run()` method is somewhat racy, but won't affect // the shutdown of the `Run()` method is somewhat racy, but won't affect
@ -82,15 +77,13 @@ func NewProgress(printer ProgressPrinter) *Progress {
workerCh: make(chan fileWorkerMessage, 100), workerCh: make(chan fileWorkerMessage, 100),
closed: make(chan struct{}), closed: make(chan struct{}),
summary: &Summary{},
printer: printer, printer: printer,
} }
} }
// Run regularly updates the status lines. It should be called in a separate // Run regularly updates the status lines. It should be called in a separate
// goroutine. // goroutine.
func (p *Progress) Run(ctx context.Context) error { func (p *Progress) Run(ctx context.Context) {
var ( var (
lastUpdate time.Time lastUpdate time.Time
total, processed Counter total, processed Counter
@ -111,7 +104,7 @@ func (p *Progress) Run(ctx context.Context) error {
forceUpdate := false forceUpdate := false
select { select {
case <-ctx.Done(): case <-ctx.Done():
return nil return
case t, ok := <-p.totalCh: case t, ok := <-p.totalCh:
if ok { if ok {
total = t total = t
@ -149,7 +142,7 @@ func (p *Progress) Run(ctx context.Context) error {
} }
// limit update frequency // limit update frequency
if !forceUpdate && (time.Since(lastUpdate) < p.MinUpdatePause || p.MinUpdatePause == 0) { if !forceUpdate && (p.interval == 0 || time.Since(lastUpdate) < p.interval) {
continue continue
} }
lastUpdate = time.Now() lastUpdate = time.Now()
@ -158,12 +151,6 @@ func (p *Progress) Run(ctx context.Context) error {
} }
} }
// ScannerError is the error callback function for the scanner, it prints the
// error in verbose mode and returns nil.
func (p *Progress) ScannerError(item string, err error) error {
return p.printer.ScannerError(item, err)
}
// Error is the error callback function for the archiver, it prints the error and returns nil. // Error is the error callback function for the archiver, it prints the error and returns nil.
func (p *Progress) Error(item string, err error) error { func (p *Progress) Error(item string, err error) error {
cbErr := p.printer.Error(item, err) cbErr := p.printer.Error(item, err)
@ -214,23 +201,12 @@ func (p *Progress) CompleteItem(item string, previous, current *restic.Node, s a
} }
switch current.Type { switch current.Type {
case "file":
select {
case p.processedCh <- Counter{Files: 1}:
case <-p.closed:
}
select {
case p.workerCh <- fileWorkerMessage{filename: item, done: true}:
case <-p.closed:
}
case "dir": case "dir":
select { select {
case p.processedCh <- Counter{Dirs: 1}: case p.processedCh <- Counter{Dirs: 1}:
case <-p.closed: case <-p.closed:
} }
}
if current.Type == "dir" {
if previous == nil { if previous == nil {
p.printer.CompleteItem("dir new", item, previous, current, s, d) p.printer.CompleteItem("dir new", item, previous, current, s, d)
p.summary.Lock() p.summary.Lock()
@ -251,9 +227,13 @@ func (p *Progress) CompleteItem(item string, previous, current *restic.Node, s a
p.summary.Unlock() p.summary.Unlock()
} }
} else if current.Type == "file" { case "file":
select { select {
case p.workerCh <- fileWorkerMessage{done: true, filename: item}: case p.processedCh <- Counter{Files: 1}:
case <-p.closed:
}
select {
case p.workerCh <- fileWorkerMessage{filename: item, done: true}:
case <-p.closed: case <-p.closed:
} }
@ -294,18 +274,8 @@ func (p *Progress) ReportTotal(item string, s archiver.ScanStats) {
} }
// Finish prints the finishing messages. // Finish prints the finishing messages.
func (p *Progress) Finish(snapshotID restic.ID) { func (p *Progress) Finish(snapshotID restic.ID, dryrun bool) {
// wait for the status update goroutine to shut down // wait for the status update goroutine to shut down
<-p.closed <-p.closed
p.printer.Finish(snapshotID, p.start, p.summary, p.dry) p.printer.Finish(snapshotID, p.start, &p.summary, dryrun)
}
// SetMinUpdatePause sets b.MinUpdatePause.
func (p *Progress) SetMinUpdatePause(d time.Duration) {
p.MinUpdatePause = d
}
// SetDryRun marks the backup as a "dry run".
func (p *Progress) SetDryRun() {
p.dry = true
} }