Merge pull request 1944 from mholt/jsonprogress
backup: Support --json flag by streaming JSON to report progress
This commit is contained in:
commit
6a5c9f57c2
4 changed files with 490 additions and 18 deletions
|
@ -5,6 +5,7 @@ import (
|
||||||
"bytes"
|
"bytes"
|
||||||
"context"
|
"context"
|
||||||
"fmt"
|
"fmt"
|
||||||
|
"io"
|
||||||
"io/ioutil"
|
"io/ioutil"
|
||||||
"os"
|
"os"
|
||||||
"path/filepath"
|
"path/filepath"
|
||||||
|
@ -23,6 +24,7 @@ import (
|
||||||
"github.com/restic/restic/internal/restic"
|
"github.com/restic/restic/internal/restic"
|
||||||
"github.com/restic/restic/internal/textfile"
|
"github.com/restic/restic/internal/textfile"
|
||||||
"github.com/restic/restic/internal/ui"
|
"github.com/restic/restic/internal/ui"
|
||||||
|
"github.com/restic/restic/internal/ui/jsonstatus"
|
||||||
"github.com/restic/restic/internal/ui/termstatus"
|
"github.com/restic/restic/internal/ui/termstatus"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -395,15 +397,43 @@ func runBackup(opts BackupOptions, gopts GlobalOptions, term *termstatus.Termina
|
||||||
|
|
||||||
var t tomb.Tomb
|
var t tomb.Tomb
|
||||||
|
|
||||||
if gopts.verbosity >= 2 {
|
if gopts.verbosity >= 2 && !gopts.JSON {
|
||||||
term.Print("open repository\n")
|
term.Print("open repository\n")
|
||||||
}
|
}
|
||||||
|
|
||||||
repo, err := OpenRepository(gopts)
|
repo, err := OpenRepository(gopts)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
p := ui.NewBackup(term, gopts.verbosity)
|
type ArchiveProgressReporter interface {
|
||||||
|
CompleteItem(item string, previous, current *restic.Node, s archiver.ItemStats, d time.Duration)
|
||||||
|
StartFile(filename string)
|
||||||
|
CompleteBlob(filename string, bytes uint64)
|
||||||
|
ScannerError(item string, fi os.FileInfo, err error) error
|
||||||
|
ReportTotal(item string, s archiver.ScanStats)
|
||||||
|
SetMinUpdatePause(d time.Duration)
|
||||||
|
Run(ctx context.Context) error
|
||||||
|
Error(item string, fi os.FileInfo, err error) error
|
||||||
|
Finish(snapshotID restic.ID)
|
||||||
|
|
||||||
|
// ui.StdioWrapper
|
||||||
|
Stdout() io.WriteCloser
|
||||||
|
Stderr() io.WriteCloser
|
||||||
|
|
||||||
|
// ui.Message
|
||||||
|
E(msg string, args ...interface{})
|
||||||
|
P(msg string, args ...interface{})
|
||||||
|
V(msg string, args ...interface{})
|
||||||
|
VV(msg string, args ...interface{})
|
||||||
|
}
|
||||||
|
|
||||||
|
var p ArchiveProgressReporter
|
||||||
|
if gopts.JSON {
|
||||||
|
p = jsonstatus.NewBackup(term, gopts.verbosity)
|
||||||
|
} else {
|
||||||
|
p = ui.NewBackup(term, gopts.verbosity)
|
||||||
|
}
|
||||||
|
|
||||||
// use the terminal for stdout/stderr
|
// use the terminal for stdout/stderr
|
||||||
prevStdout, prevStderr := gopts.stdout, gopts.stderr
|
prevStdout, prevStderr := gopts.stdout, gopts.stderr
|
||||||
|
@ -418,13 +448,15 @@ func runBackup(opts BackupOptions, gopts GlobalOptions, term *termstatus.Termina
|
||||||
if fps > 60 {
|
if fps > 60 {
|
||||||
fps = 60
|
fps = 60
|
||||||
}
|
}
|
||||||
p.MinUpdatePause = time.Second / time.Duration(fps)
|
p.SetMinUpdatePause(time.Second / time.Duration(fps))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
t.Go(func() error { return p.Run(t.Context(gopts.ctx)) })
|
t.Go(func() error { return p.Run(t.Context(gopts.ctx)) })
|
||||||
|
|
||||||
p.V("lock repository")
|
if !gopts.JSON {
|
||||||
|
p.V("lock repository")
|
||||||
|
}
|
||||||
lock, err := lockRepo(repo)
|
lock, err := lockRepo(repo)
|
||||||
defer unlockRepo(lock)
|
defer unlockRepo(lock)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -443,7 +475,9 @@ func runBackup(opts BackupOptions, gopts GlobalOptions, term *termstatus.Termina
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
p.V("load index files")
|
if !gopts.JSON {
|
||||||
|
p.V("load index files")
|
||||||
|
}
|
||||||
err = repo.LoadIndex(gopts.ctx)
|
err = repo.LoadIndex(gopts.ctx)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
|
@ -454,7 +488,7 @@ func runBackup(opts BackupOptions, gopts GlobalOptions, term *termstatus.Termina
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
if parentSnapshotID != nil {
|
if !gopts.JSON && parentSnapshotID != nil {
|
||||||
p.V("using parent snapshot %v\n", parentSnapshotID.Str())
|
p.V("using parent snapshot %v\n", parentSnapshotID.Str())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -478,7 +512,9 @@ func runBackup(opts BackupOptions, gopts GlobalOptions, term *termstatus.Termina
|
||||||
|
|
||||||
var targetFS fs.FS = fs.Local{}
|
var targetFS fs.FS = fs.Local{}
|
||||||
if opts.Stdin {
|
if opts.Stdin {
|
||||||
p.V("read data from stdin")
|
if !gopts.JSON {
|
||||||
|
p.V("read data from stdin")
|
||||||
|
}
|
||||||
targetFS = &fs.Reader{
|
targetFS = &fs.Reader{
|
||||||
ModTime: timeStamp,
|
ModTime: timeStamp,
|
||||||
Name: opts.StdinFilename,
|
Name: opts.StdinFilename,
|
||||||
|
@ -494,7 +530,9 @@ func runBackup(opts BackupOptions, gopts GlobalOptions, term *termstatus.Termina
|
||||||
sc.Error = p.ScannerError
|
sc.Error = p.ScannerError
|
||||||
sc.Result = p.ReportTotal
|
sc.Result = p.ReportTotal
|
||||||
|
|
||||||
p.V("start scan on %v", targets)
|
if !gopts.JSON {
|
||||||
|
p.V("start scan on %v", targets)
|
||||||
|
}
|
||||||
t.Go(func() error { return sc.Scan(t.Context(gopts.ctx), targets) })
|
t.Go(func() error { return sc.Scan(t.Context(gopts.ctx), targets) })
|
||||||
|
|
||||||
arch := archiver.New(repo, targetFS, archiver.Options{})
|
arch := archiver.New(repo, targetFS, archiver.Options{})
|
||||||
|
@ -502,7 +540,7 @@ func runBackup(opts BackupOptions, gopts GlobalOptions, term *termstatus.Termina
|
||||||
arch.Select = selectFilter
|
arch.Select = selectFilter
|
||||||
arch.WithAtime = opts.WithAtime
|
arch.WithAtime = opts.WithAtime
|
||||||
arch.Error = p.Error
|
arch.Error = p.Error
|
||||||
arch.CompleteItem = p.CompleteItemFn
|
arch.CompleteItem = p.CompleteItem
|
||||||
arch.StartFile = p.StartFile
|
arch.StartFile = p.StartFile
|
||||||
arch.CompleteBlob = p.CompleteBlob
|
arch.CompleteBlob = p.CompleteBlob
|
||||||
|
|
||||||
|
@ -521,10 +559,14 @@ func runBackup(opts BackupOptions, gopts GlobalOptions, term *termstatus.Termina
|
||||||
uploader := archiver.IndexUploader{
|
uploader := archiver.IndexUploader{
|
||||||
Repository: repo,
|
Repository: repo,
|
||||||
Start: func() {
|
Start: func() {
|
||||||
p.VV("uploading intermediate index")
|
if !gopts.JSON {
|
||||||
|
p.VV("uploading intermediate index")
|
||||||
|
}
|
||||||
},
|
},
|
||||||
Complete: func(id restic.ID) {
|
Complete: func(id restic.ID) {
|
||||||
p.V("uploaded intermediate index %v", id.Str())
|
if !gopts.JSON {
|
||||||
|
p.V("uploaded intermediate index %v", id.Str())
|
||||||
|
}
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -532,14 +574,18 @@ func runBackup(opts BackupOptions, gopts GlobalOptions, term *termstatus.Termina
|
||||||
return uploader.Upload(gopts.ctx, t.Context(gopts.ctx), 30*time.Second)
|
return uploader.Upload(gopts.ctx, t.Context(gopts.ctx), 30*time.Second)
|
||||||
})
|
})
|
||||||
|
|
||||||
p.V("start backup on %v", targets)
|
if !gopts.JSON {
|
||||||
|
p.V("start backup on %v", targets)
|
||||||
|
}
|
||||||
_, id, err := arch.Snapshot(gopts.ctx, targets, snapshotOpts)
|
_, id, err := arch.Snapshot(gopts.ctx, targets, snapshotOpts)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return errors.Fatalf("unable to save snapshot: %v", err)
|
return errors.Fatalf("unable to save snapshot: %v", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
p.Finish()
|
p.Finish(id)
|
||||||
p.P("snapshot %s saved\n", id.Str())
|
if !gopts.JSON {
|
||||||
|
p.P("snapshot %s saved\n", id.Str())
|
||||||
|
}
|
||||||
|
|
||||||
// cleanly shutdown all running goroutines
|
// cleanly shutdown all running goroutines
|
||||||
t.Kill(nil)
|
t.Kill(nil)
|
||||||
|
|
|
@ -377,7 +377,7 @@ func OpenRepository(opts GlobalOptions) (*repository.Repository, error) {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
if stdoutIsTerminal() {
|
if stdoutIsTerminal() && !opts.JSON {
|
||||||
id := s.Config().ID
|
id := s.Config().ID
|
||||||
if len(id) > 8 {
|
if len(id) > 8 {
|
||||||
id = id[:8]
|
id = id[:8]
|
||||||
|
|
|
@ -254,9 +254,9 @@ func formatBytes(c uint64) string {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// CompleteItemFn is the status callback function for the archiver when a
|
// CompleteItem is the status callback function for the archiver when a
|
||||||
// file/dir has been saved successfully.
|
// file/dir has been saved successfully.
|
||||||
func (b *Backup) CompleteItemFn(item string, previous, current *restic.Node, s archiver.ItemStats, d time.Duration) {
|
func (b *Backup) CompleteItem(item string, previous, current *restic.Node, s archiver.ItemStats, d time.Duration) {
|
||||||
b.summary.Lock()
|
b.summary.Lock()
|
||||||
b.summary.ItemStats.Add(s)
|
b.summary.ItemStats.Add(s)
|
||||||
b.summary.Unlock()
|
b.summary.Unlock()
|
||||||
|
@ -349,7 +349,7 @@ func (b *Backup) ReportTotal(item string, s archiver.ScanStats) {
|
||||||
}
|
}
|
||||||
|
|
||||||
// Finish prints the finishing messages.
|
// Finish prints the finishing messages.
|
||||||
func (b *Backup) Finish() {
|
func (b *Backup) Finish(snapshotID restic.ID) {
|
||||||
close(b.finished)
|
close(b.finished)
|
||||||
|
|
||||||
b.P("\n")
|
b.P("\n")
|
||||||
|
@ -365,3 +365,9 @@ func (b *Backup) Finish() {
|
||||||
formatDuration(time.Since(b.start)),
|
formatDuration(time.Since(b.start)),
|
||||||
)
|
)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// SetMinUpdatePause sets b.MinUpdatePause. It satisfies the
|
||||||
|
// ArchiveProgressReporter interface.
|
||||||
|
func (b *Backup) SetMinUpdatePause(d time.Duration) {
|
||||||
|
b.MinUpdatePause = d
|
||||||
|
}
|
||||||
|
|
420
internal/ui/jsonstatus/status.go
Normal file
420
internal/ui/jsonstatus/status.go
Normal file
|
@ -0,0 +1,420 @@
|
||||||
|
package jsonstatus
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"encoding/json"
|
||||||
|
"os"
|
||||||
|
"sort"
|
||||||
|
"sync"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/restic/restic/internal/archiver"
|
||||||
|
"github.com/restic/restic/internal/restic"
|
||||||
|
"github.com/restic/restic/internal/ui"
|
||||||
|
"github.com/restic/restic/internal/ui/termstatus"
|
||||||
|
)
|
||||||
|
|
||||||
|
type counter struct {
|
||||||
|
Files, Dirs, Bytes uint64
|
||||||
|
}
|
||||||
|
|
||||||
|
type fileWorkerMessage struct {
|
||||||
|
filename string
|
||||||
|
done bool
|
||||||
|
}
|
||||||
|
|
||||||
|
// Backup reports progress for the `backup` command in JSON.
|
||||||
|
type Backup struct {
|
||||||
|
*ui.Message
|
||||||
|
*ui.StdioWrapper
|
||||||
|
|
||||||
|
MinUpdatePause time.Duration
|
||||||
|
|
||||||
|
term *termstatus.Terminal
|
||||||
|
v uint
|
||||||
|
start time.Time
|
||||||
|
|
||||||
|
totalBytes uint64
|
||||||
|
|
||||||
|
totalCh chan counter
|
||||||
|
processedCh chan counter
|
||||||
|
errCh chan struct{}
|
||||||
|
workerCh chan fileWorkerMessage
|
||||||
|
finished chan struct{}
|
||||||
|
|
||||||
|
summary struct {
|
||||||
|
sync.Mutex
|
||||||
|
Files, Dirs struct {
|
||||||
|
New uint
|
||||||
|
Changed uint
|
||||||
|
Unchanged uint
|
||||||
|
}
|
||||||
|
archiver.ItemStats
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// NewBackup returns a new backup progress reporter.
|
||||||
|
func NewBackup(term *termstatus.Terminal, verbosity uint) *Backup {
|
||||||
|
return &Backup{
|
||||||
|
Message: ui.NewMessage(term, verbosity),
|
||||||
|
StdioWrapper: ui.NewStdioWrapper(term),
|
||||||
|
term: term,
|
||||||
|
v: verbosity,
|
||||||
|
start: time.Now(),
|
||||||
|
|
||||||
|
// limit to 60fps by default
|
||||||
|
MinUpdatePause: time.Second / 60,
|
||||||
|
|
||||||
|
totalCh: make(chan counter),
|
||||||
|
processedCh: make(chan counter),
|
||||||
|
errCh: make(chan struct{}),
|
||||||
|
workerCh: make(chan fileWorkerMessage),
|
||||||
|
finished: make(chan struct{}),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Run regularly updates the status lines. It should be called in a separate
|
||||||
|
// goroutine.
|
||||||
|
func (b *Backup) Run(ctx context.Context) error {
|
||||||
|
var (
|
||||||
|
lastUpdate time.Time
|
||||||
|
total, processed counter
|
||||||
|
errors uint
|
||||||
|
started bool
|
||||||
|
currentFiles = make(map[string]struct{})
|
||||||
|
secondsRemaining uint64
|
||||||
|
)
|
||||||
|
|
||||||
|
t := time.NewTicker(time.Second)
|
||||||
|
defer t.Stop()
|
||||||
|
|
||||||
|
for {
|
||||||
|
select {
|
||||||
|
case <-ctx.Done():
|
||||||
|
return nil
|
||||||
|
case <-b.finished:
|
||||||
|
started = false
|
||||||
|
case t, ok := <-b.totalCh:
|
||||||
|
if ok {
|
||||||
|
total = t
|
||||||
|
started = true
|
||||||
|
} else {
|
||||||
|
// scan has finished
|
||||||
|
b.totalCh = nil
|
||||||
|
b.totalBytes = total.Bytes
|
||||||
|
}
|
||||||
|
case s := <-b.processedCh:
|
||||||
|
processed.Files += s.Files
|
||||||
|
processed.Dirs += s.Dirs
|
||||||
|
processed.Bytes += s.Bytes
|
||||||
|
started = true
|
||||||
|
case <-b.errCh:
|
||||||
|
errors++
|
||||||
|
started = true
|
||||||
|
case m := <-b.workerCh:
|
||||||
|
if m.done {
|
||||||
|
delete(currentFiles, m.filename)
|
||||||
|
} else {
|
||||||
|
currentFiles[m.filename] = struct{}{}
|
||||||
|
}
|
||||||
|
case <-t.C:
|
||||||
|
if !started {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
if b.totalCh == nil {
|
||||||
|
secs := float64(time.Since(b.start) / time.Second)
|
||||||
|
todo := float64(total.Bytes - processed.Bytes)
|
||||||
|
secondsRemaining = uint64(secs / float64(processed.Bytes) * todo)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// limit update frequency
|
||||||
|
if time.Since(lastUpdate) < b.MinUpdatePause {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
lastUpdate = time.Now()
|
||||||
|
|
||||||
|
b.update(total, processed, errors, currentFiles, secondsRemaining)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// update updates the status lines.
|
||||||
|
func (b *Backup) update(total, processed counter, errors uint, currentFiles map[string]struct{}, secs uint64) {
|
||||||
|
status := statusUpdate{
|
||||||
|
MessageType: "status",
|
||||||
|
SecondsElapsed: uint64(time.Since(b.start) / time.Second),
|
||||||
|
SecondsRemaining: secs,
|
||||||
|
TotalFiles: total.Files,
|
||||||
|
FilesDone: processed.Files,
|
||||||
|
TotalBytes: total.Bytes,
|
||||||
|
BytesDone: processed.Bytes,
|
||||||
|
ErrorCount: errors,
|
||||||
|
}
|
||||||
|
|
||||||
|
if total.Bytes > 0 {
|
||||||
|
status.PercentDone = float64(processed.Bytes) / float64(total.Bytes)
|
||||||
|
}
|
||||||
|
|
||||||
|
for filename := range currentFiles {
|
||||||
|
status.CurrentFiles = append(status.CurrentFiles, filename)
|
||||||
|
}
|
||||||
|
sort.Sort(sort.StringSlice(status.CurrentFiles))
|
||||||
|
|
||||||
|
json.NewEncoder(b.StdioWrapper.Stdout()).Encode(status)
|
||||||
|
}
|
||||||
|
|
||||||
|
// ScannerError is the error callback function for the scanner, it prints the
|
||||||
|
// error in verbose mode and returns nil.
|
||||||
|
func (b *Backup) ScannerError(item string, fi os.FileInfo, err error) error {
|
||||||
|
json.NewEncoder(b.StdioWrapper.Stderr()).Encode(errorUpdate{
|
||||||
|
MessageType: "error",
|
||||||
|
Error: err,
|
||||||
|
During: "scan",
|
||||||
|
Item: item,
|
||||||
|
})
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// Error is the error callback function for the archiver, it prints the error and returns nil.
|
||||||
|
func (b *Backup) Error(item string, fi os.FileInfo, err error) error {
|
||||||
|
json.NewEncoder(b.StdioWrapper.Stderr()).Encode(errorUpdate{
|
||||||
|
MessageType: "error",
|
||||||
|
Error: err,
|
||||||
|
During: "archival",
|
||||||
|
Item: item,
|
||||||
|
})
|
||||||
|
b.errCh <- struct{}{}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// StartFile is called when a file is being processed by a worker.
|
||||||
|
func (b *Backup) StartFile(filename string) {
|
||||||
|
b.workerCh <- fileWorkerMessage{
|
||||||
|
filename: filename,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// CompleteBlob is called for all saved blobs for files.
|
||||||
|
func (b *Backup) CompleteBlob(filename string, bytes uint64) {
|
||||||
|
b.processedCh <- counter{Bytes: bytes}
|
||||||
|
}
|
||||||
|
|
||||||
|
// CompleteItem is the status callback function for the archiver when a
|
||||||
|
// file/dir has been saved successfully.
|
||||||
|
func (b *Backup) CompleteItem(item string, previous, current *restic.Node, s archiver.ItemStats, d time.Duration) {
|
||||||
|
b.summary.Lock()
|
||||||
|
b.summary.ItemStats.Add(s)
|
||||||
|
b.summary.Unlock()
|
||||||
|
|
||||||
|
if current == nil {
|
||||||
|
// error occurred, tell the status display to remove the line
|
||||||
|
b.workerCh <- fileWorkerMessage{
|
||||||
|
filename: item,
|
||||||
|
done: true,
|
||||||
|
}
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
switch current.Type {
|
||||||
|
case "file":
|
||||||
|
b.processedCh <- counter{Files: 1}
|
||||||
|
b.workerCh <- fileWorkerMessage{
|
||||||
|
filename: item,
|
||||||
|
done: true,
|
||||||
|
}
|
||||||
|
case "dir":
|
||||||
|
b.processedCh <- counter{Dirs: 1}
|
||||||
|
}
|
||||||
|
|
||||||
|
if current.Type == "dir" {
|
||||||
|
if previous == nil {
|
||||||
|
if b.v >= 3 {
|
||||||
|
json.NewEncoder(b.StdioWrapper.Stdout()).Encode(verboseUpdate{
|
||||||
|
MessageType: "verbose_status",
|
||||||
|
Action: "new",
|
||||||
|
Item: item,
|
||||||
|
Duration: d.Seconds(),
|
||||||
|
DataSize: s.DataSize,
|
||||||
|
MetadataSize: s.TreeSize,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
b.summary.Lock()
|
||||||
|
b.summary.Dirs.New++
|
||||||
|
b.summary.Unlock()
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
if previous.Equals(*current) {
|
||||||
|
if b.v >= 3 {
|
||||||
|
json.NewEncoder(b.StdioWrapper.Stdout()).Encode(verboseUpdate{
|
||||||
|
MessageType: "verbose_status",
|
||||||
|
Action: "unchanged",
|
||||||
|
Item: item,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
b.summary.Lock()
|
||||||
|
b.summary.Dirs.Unchanged++
|
||||||
|
b.summary.Unlock()
|
||||||
|
} else {
|
||||||
|
if b.v >= 3 {
|
||||||
|
json.NewEncoder(b.StdioWrapper.Stdout()).Encode(verboseUpdate{
|
||||||
|
MessageType: "verbose_status",
|
||||||
|
Action: "modified",
|
||||||
|
Item: item,
|
||||||
|
Duration: d.Seconds(),
|
||||||
|
DataSize: s.DataSize,
|
||||||
|
MetadataSize: s.TreeSize,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
b.summary.Lock()
|
||||||
|
b.summary.Dirs.Changed++
|
||||||
|
b.summary.Unlock()
|
||||||
|
}
|
||||||
|
|
||||||
|
} else if current.Type == "file" {
|
||||||
|
|
||||||
|
b.workerCh <- fileWorkerMessage{
|
||||||
|
done: true,
|
||||||
|
filename: item,
|
||||||
|
}
|
||||||
|
|
||||||
|
if previous == nil {
|
||||||
|
if b.v >= 3 {
|
||||||
|
json.NewEncoder(b.StdioWrapper.Stdout()).Encode(verboseUpdate{
|
||||||
|
MessageType: "verbose_status",
|
||||||
|
Action: "new",
|
||||||
|
Item: item,
|
||||||
|
Duration: d.Seconds(),
|
||||||
|
DataSize: s.DataSize,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
b.summary.Lock()
|
||||||
|
b.summary.Files.New++
|
||||||
|
b.summary.Unlock()
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
if previous.Equals(*current) {
|
||||||
|
if b.v >= 3 {
|
||||||
|
json.NewEncoder(b.StdioWrapper.Stdout()).Encode(verboseUpdate{
|
||||||
|
MessageType: "verbose_status",
|
||||||
|
Action: "unchanged",
|
||||||
|
Item: item,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
b.summary.Lock()
|
||||||
|
b.summary.Files.Unchanged++
|
||||||
|
b.summary.Unlock()
|
||||||
|
} else {
|
||||||
|
if b.v >= 3 {
|
||||||
|
json.NewEncoder(b.StdioWrapper.Stdout()).Encode(verboseUpdate{
|
||||||
|
MessageType: "verbose_status",
|
||||||
|
Action: "modified",
|
||||||
|
Item: item,
|
||||||
|
Duration: d.Seconds(),
|
||||||
|
DataSize: s.DataSize,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
b.summary.Lock()
|
||||||
|
b.summary.Files.Changed++
|
||||||
|
b.summary.Unlock()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// ReportTotal sets the total stats up to now
|
||||||
|
func (b *Backup) ReportTotal(item string, s archiver.ScanStats) {
|
||||||
|
select {
|
||||||
|
case b.totalCh <- counter{Files: uint64(s.Files), Dirs: uint64(s.Dirs), Bytes: s.Bytes}:
|
||||||
|
case <-b.finished:
|
||||||
|
}
|
||||||
|
|
||||||
|
if item == "" {
|
||||||
|
if b.v >= 2 {
|
||||||
|
json.NewEncoder(b.StdioWrapper.Stdout()).Encode(verboseUpdate{
|
||||||
|
MessageType: "status",
|
||||||
|
Action: "scan_finished",
|
||||||
|
Duration: time.Since(b.start).Seconds(),
|
||||||
|
DataSize: s.Bytes,
|
||||||
|
TotalFiles: s.Files,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
close(b.totalCh)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Finish prints the finishing messages.
|
||||||
|
func (b *Backup) Finish(snapshotID restic.ID) {
|
||||||
|
close(b.finished)
|
||||||
|
json.NewEncoder(b.StdioWrapper.Stdout()).Encode(summaryOutput{
|
||||||
|
MessageType: "summary",
|
||||||
|
FilesNew: b.summary.Files.New,
|
||||||
|
FilesChanged: b.summary.Files.Changed,
|
||||||
|
FilesUnmodified: b.summary.Files.Unchanged,
|
||||||
|
DirsNew: b.summary.Dirs.New,
|
||||||
|
DirsChanged: b.summary.Dirs.Changed,
|
||||||
|
DirsUnmodified: b.summary.Dirs.Unchanged,
|
||||||
|
DataBlobs: b.summary.ItemStats.DataBlobs,
|
||||||
|
TreeBlobs: b.summary.ItemStats.TreeBlobs,
|
||||||
|
DataAdded: b.summary.ItemStats.DataSize + b.summary.ItemStats.TreeSize,
|
||||||
|
TotalFilesProcessed: b.summary.Files.New + b.summary.Files.Changed + b.summary.Files.Unchanged,
|
||||||
|
TotalBytesProcessed: b.totalBytes,
|
||||||
|
TotalDuration: time.Since(b.start).Seconds(),
|
||||||
|
SnapshotID: snapshotID.Str(),
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
// SetMinUpdatePause sets b.MinUpdatePause. It satisfies the
|
||||||
|
// ArchiveProgressReporter interface.
|
||||||
|
func (b *Backup) SetMinUpdatePause(d time.Duration) {
|
||||||
|
b.MinUpdatePause = d
|
||||||
|
}
|
||||||
|
|
||||||
|
type statusUpdate struct {
|
||||||
|
MessageType string `json:"message_type"` // "status"
|
||||||
|
SecondsElapsed uint64 `json:"seconds_elapsed,omitempty"`
|
||||||
|
SecondsRemaining uint64 `json:"seconds_remaining,omitempty"`
|
||||||
|
PercentDone float64 `json:"percent_done"`
|
||||||
|
TotalFiles uint64 `json:"total_files,omitempty"`
|
||||||
|
FilesDone uint64 `json:"files_done,omitempty"`
|
||||||
|
TotalBytes uint64 `json:"total_bytes,omitempty"`
|
||||||
|
BytesDone uint64 `json:"bytes_done,omitempty"`
|
||||||
|
ErrorCount uint `json:"error_count,omitempty"`
|
||||||
|
CurrentFiles []string `json:"current_files,omitempty"`
|
||||||
|
}
|
||||||
|
|
||||||
|
type errorUpdate struct {
|
||||||
|
MessageType string `json:"message_type"` // "error"
|
||||||
|
Error error `json:"error"`
|
||||||
|
During string `json:"during"`
|
||||||
|
Item string `json:"item"`
|
||||||
|
}
|
||||||
|
|
||||||
|
type verboseUpdate struct {
|
||||||
|
MessageType string `json:"message_type"` // "verbose_status"
|
||||||
|
Action string `json:"action"`
|
||||||
|
Item string `json:"item"`
|
||||||
|
Duration float64 `json:"duration"` // in seconds
|
||||||
|
DataSize uint64 `json:"data_size"`
|
||||||
|
MetadataSize uint64 `json:"metadata_size"`
|
||||||
|
TotalFiles uint `json:"total_files"`
|
||||||
|
}
|
||||||
|
|
||||||
|
type summaryOutput struct {
|
||||||
|
MessageType string `json:"message_type"` // "summary"
|
||||||
|
FilesNew uint `json:"files_new"`
|
||||||
|
FilesChanged uint `json:"files_changed"`
|
||||||
|
FilesUnmodified uint `json:"files_unmodified"`
|
||||||
|
DirsNew uint `json:"dirs_new"`
|
||||||
|
DirsChanged uint `json:"dirs_changed"`
|
||||||
|
DirsUnmodified uint `json:"dirs_unmodified"`
|
||||||
|
DataBlobs int `json:"data_blobs"`
|
||||||
|
TreeBlobs int `json:"tree_blobs"`
|
||||||
|
DataAdded uint64 `json:"data_added"`
|
||||||
|
TotalFilesProcessed uint `json:"total_files_processed"`
|
||||||
|
TotalBytesProcessed uint64 `json:"total_bytes_processed"`
|
||||||
|
TotalDuration float64 `json:"total_duration"` // in seconds
|
||||||
|
SnapshotID string `json:"snapshot_id"`
|
||||||
|
}
|
Loading…
Reference in a new issue