Refactor progress reporting

This commit is contained in:
Alexander Neumann 2015-02-21 14:23:49 +01:00
parent 97b963ef1d
commit 58cded6b75
4 changed files with 113 additions and 99 deletions

View file

@ -6,6 +6,7 @@ import (
"fmt" "fmt"
"io" "io"
"os" "os"
"path/filepath"
"sync" "sync"
"github.com/juju/arrar" "github.com/juju/arrar"
@ -32,15 +33,12 @@ type Archiver struct {
Error func(dir string, fi os.FileInfo, err error) error Error func(dir string, fi os.FileInfo, err error) error
Filter func(item string, fi os.FileInfo) bool Filter func(item string, fi os.FileInfo) bool
p *Progress
} }
func NewArchiver(s Server, p *Progress) (*Archiver, error) { func NewArchiver(s Server) (*Archiver, error) {
var err error var err error
arch := &Archiver{ arch := &Archiver{
s: s, s: s,
p: p,
blobToken: make(chan struct{}, maxConcurrentBlobs), blobToken: make(chan struct{}, maxConcurrentBlobs),
} }
@ -179,7 +177,7 @@ func (arch *Archiver) SaveTreeJSON(item interface{}) (Blob, error) {
// SaveFile stores the content of the file on the backend as a Blob by calling // SaveFile stores the content of the file on the backend as a Blob by calling
// Save for each chunk. // Save for each chunk.
func (arch *Archiver) SaveFile(node *Node) (Blobs, error) { func (arch *Archiver) SaveFile(p *Progress, node *Node) (Blobs, error) {
file, err := os.Open(node.path) file, err := os.Open(node.path)
defer file.Close() defer file.Close()
if err != nil { if err != nil {
@ -240,7 +238,7 @@ func (arch *Archiver) SaveFile(node *Node) (Blobs, error) {
panic(err) panic(err)
} }
arch.p.Report(Stat{Bytes: blob.Size}) p.Report(Stat{Bytes: blob.Size})
arch.blobToken <- token arch.blobToken <- token
ch <- blob ch <- blob
}(resCh) }(resCh)
@ -277,7 +275,7 @@ func (arch *Archiver) SaveFile(node *Node) (Blobs, error) {
return blobs, nil return blobs, nil
} }
func (arch *Archiver) saveTree(t *Tree) (Blob, error) { func (arch *Archiver) saveTree(p *Progress, t *Tree) (Blob, error) {
debug.Log("Archiver.saveTree", "saveTree(%v)\n", t) debug.Log("Archiver.saveTree", "saveTree(%v)\n", t)
var wg sync.WaitGroup var wg sync.WaitGroup
@ -287,13 +285,13 @@ func (arch *Archiver) saveTree(t *Tree) (Blob, error) {
// TODO: do all this in parallel // TODO: do all this in parallel
for _, node := range t.Nodes { for _, node := range t.Nodes {
if node.tree != nil { if node.tree != nil {
b, err := arch.saveTree(node.tree) b, err := arch.saveTree(p, node.tree)
if err != nil { if err != nil {
return Blob{}, err return Blob{}, err
} }
node.Subtree = b.ID node.Subtree = b.ID
t.Map.Insert(b) t.Map.Insert(b)
arch.p.Report(Stat{Dirs: 1}) p.Report(Stat{Dirs: 1})
} else if node.Type == "file" { } else if node.Type == "file" {
if len(node.Content) > 0 { if len(node.Content) > 0 {
removeContent := false removeContent := false
@ -332,12 +330,12 @@ func (arch *Archiver) saveTree(t *Tree) (Blob, error) {
defer wg.Done() defer wg.Done()
var blobs Blobs var blobs Blobs
blobs, n.err = arch.SaveFile(n) blobs, n.err = arch.SaveFile(p, n)
for _, b := range blobs { for _, b := range blobs {
t.Map.Insert(b) t.Map.Insert(b)
} }
arch.p.Report(Stat{Files: 1}) p.Report(Stat{Files: 1})
}(node) }(node)
} }
} }
@ -391,11 +389,11 @@ func (arch *Archiver) saveTree(t *Tree) (Blob, error) {
return blob, nil return blob, nil
} }
func (arch *Archiver) Snapshot(path string, parentSnapshot backend.ID) (*Snapshot, backend.ID, error) { func (arch *Archiver) Snapshot(p *Progress, path string, parentSnapshot backend.ID) (*Snapshot, backend.ID, error) {
debug.Break("Archiver.Snapshot") debug.Break("Archiver.Snapshot")
arch.p.Start() p.Start()
defer arch.p.Done() defer p.Done()
sn, err := NewSnapshot(path) sn, err := NewSnapshot(path)
if err != nil { if err != nil {
@ -424,14 +422,14 @@ func (arch *Archiver) Snapshot(path string, parentSnapshot backend.ID) (*Snapsho
} }
if node.Type == "file" { if node.Type == "file" {
node.blobs, err = arch.SaveFile(node) node.blobs, err = arch.SaveFile(p, node)
if err != nil { if err != nil {
panic(err) panic(err)
} }
} }
e.Result <- node e.Result <- node
arch.p.Report(Stat{Files: 1}) p.Report(Stat{Files: 1})
case <-done: case <-done:
// pipeline was cancelled // pipeline was cancelled
return return
@ -483,7 +481,7 @@ func (arch *Archiver) Snapshot(path string, parentSnapshot backend.ID) (*Snapsho
node.blobs = Blobs{blob} node.blobs = Blobs{blob}
dir.Result <- node dir.Result <- node
arch.p.Report(Stat{Dirs: 1}) p.Report(Stat{Dirs: 1})
case <-done: case <-done:
// pipeline was cancelled // pipeline was cancelled
return return
@ -536,3 +534,32 @@ func (arch *Archiver) Snapshot(path string, parentSnapshot backend.ID) (*Snapsho
return sn, blob.Storage, nil return sn, blob.Storage, nil
} }
// isFile reports whether fi describes a regular file, i.e. none of the
// type bits (dir, symlink, pipe, socket, device, ...) and not the
// char-device bit are set in its mode.
func isFile(fi os.FileInfo) bool {
	mode := fi.Mode()
	return mode&os.ModeType == 0 && mode&os.ModeCharDevice == 0
}
// Scan walks the directory tree below dir and returns statistics about
// it: the number of regular files and directories and the cumulative
// size of all files. Progress is reported to p, which is started before
// the walk and finished afterwards.
func Scan(dir string, p *Progress) (Stat, error) {
	p.Start()
	defer p.Done()

	var stat Stat

	err := filepath.Walk(dir, func(str string, fi os.FileInfo, err error) error {
		// On a walk error fi is nil; propagate the error instead of
		// dereferencing fi below (the original ignored err entirely).
		if err != nil {
			return err
		}

		s := Stat{}
		if isFile(fi) {
			s.Files++
			s.Bytes += uint64(fi.Size())
		} else if fi.IsDir() {
			s.Dirs++
		}

		p.Report(s)
		stat.Add(s)

		return nil
	})

	return stat, err
}

View file

@ -135,19 +135,19 @@ func BenchmarkArchiveDirectory(b *testing.B) {
key := setupKey(b, be, "geheim") key := setupKey(b, be, "geheim")
server := restic.NewServerWithKey(be, key) server := restic.NewServerWithKey(be, key)
arch, err := restic.NewArchiver(server, nil) arch, err := restic.NewArchiver(server)
ok(b, err) ok(b, err)
_, id, err := arch.Snapshot(*benchArchiveDirectory, nil) _, id, err := arch.Snapshot(nil, *benchArchiveDirectory, nil)
b.Logf("snapshot archived as %v", id) b.Logf("snapshot archived as %v", id)
} }
func snapshot(t testing.TB, server restic.Server, path string) *restic.Snapshot { func snapshot(t testing.TB, server restic.Server, path string) *restic.Snapshot {
arch, err := restic.NewArchiver(server, nil) arch, err := restic.NewArchiver(server)
ok(t, err) ok(t, err)
ok(t, arch.Preload()) ok(t, arch.Preload())
sn, _, err := arch.Snapshot(path, nil) sn, _, err := arch.Snapshot(nil, path, nil)
ok(t, err) ok(t, err)
return sn return sn
} }
@ -217,9 +217,9 @@ func BenchmarkPreload(t *testing.B) {
server := restic.NewServerWithKey(be, key) server := restic.NewServerWithKey(be, key)
// archive a few files // archive a few files
arch, err := restic.NewArchiver(server, nil) arch, err := restic.NewArchiver(server)
ok(t, err) ok(t, err)
sn, _, err := arch.Snapshot(*benchArchiveDirectory, nil) sn, _, err := arch.Snapshot(nil, *benchArchiveDirectory, nil)
ok(t, err) ok(t, err)
t.Logf("archived snapshot %v", sn.ID()) t.Logf("archived snapshot %v", sn.ID())
@ -228,7 +228,7 @@ func BenchmarkPreload(t *testing.B) {
for i := 0; i < t.N; i++ { for i := 0; i < t.N; i++ {
// create new archiver and preload // create new archiver and preload
arch2, err := restic.NewArchiver(server, nil) arch2, err := restic.NewArchiver(server)
ok(t, err) ok(t, err)
ok(t, arch2.Preload()) ok(t, arch2.Preload())
} }

View file

@ -4,7 +4,6 @@ import (
"errors" "errors"
"fmt" "fmt"
"os" "os"
"path/filepath"
"strings" "strings"
"time" "time"
@ -74,8 +73,61 @@ func (cmd CmdBackup) Usage() string {
return "DIR/FILE [snapshot-ID]" return "DIR/FILE [snapshot-ID]"
} }
func isFile(fi os.FileInfo) bool { func newScanProgress() *restic.Progress {
return fi.Mode()&(os.ModeType|os.ModeCharDevice) == 0 if !terminal.IsTerminal(int(os.Stdout.Fd())) {
return nil
}
scanProgress := restic.NewProgress(time.Second)
if terminal.IsTerminal(int(os.Stdout.Fd())) {
scanProgress.OnUpdate = func(s restic.Stat, d time.Duration, ticker bool) {
fmt.Printf("\x1b[2K\r[%s] %d directories, %d files, %s", format_duration(d), s.Dirs, s.Files, format_bytes(s.Bytes))
}
scanProgress.OnDone = func(s restic.Stat, d time.Duration, ticker bool) {
fmt.Printf("\nDone in %s\n", format_duration(d))
}
}
return scanProgress
}
func newArchiveProgress(todo restic.Stat) *restic.Progress {
if !terminal.IsTerminal(int(os.Stdout.Fd())) {
return nil
}
archiveProgress := restic.NewProgress(time.Second)
var bps, eta uint64
itemsTodo := todo.Files + todo.Dirs
archiveProgress.OnUpdate = func(s restic.Stat, d time.Duration, ticker bool) {
sec := uint64(d / time.Second)
if todo.Bytes > 0 && sec > 0 && ticker {
bps = s.Bytes / sec
if bps > 0 {
eta = (todo.Bytes - s.Bytes) / bps
}
}
itemsDone := s.Files + s.Dirs
fmt.Printf("\x1b[2K\r[%s] %3.2f%% %s/s %s / %s %d / %d items ETA %s",
format_duration(d),
float64(s.Bytes)/float64(todo.Bytes)*100,
format_bytes(bps),
format_bytes(s.Bytes), format_bytes(todo.Bytes),
itemsDone, itemsTodo,
format_seconds(eta))
}
archiveProgress.OnDone = func(s restic.Stat, d time.Duration, ticker bool) {
sec := uint64(d / time.Second)
fmt.Printf("\nduration: %s, %.2fMiB/s\n",
format_duration(d),
float64(todo.Bytes)/float64(sec)/(1<<20))
}
return archiveProgress
} }
func (cmd CmdBackup) Execute(args []string) error { func (cmd CmdBackup) Execute(args []string) error {
@ -102,87 +154,19 @@ func (cmd CmdBackup) Execute(args []string) error {
fmt.Printf("scan %s\n", target) fmt.Printf("scan %s\n", target)
scanProgress := restic.NewProgress(time.Second) sp := newScanProgress()
if terminal.IsTerminal(int(os.Stdout.Fd())) { stat, err := restic.Scan(target, sp)
scanProgress.OnUpdate = func(s restic.Stat, d time.Duration, ticker bool) {
fmt.Printf("\x1b[2K\r[%s] %d directories, %d files, %s", format_duration(d), s.Dirs, s.Files, format_bytes(s.Bytes))
}
scanProgress.OnDone = func(s restic.Stat, d time.Duration, ticker bool) {
fmt.Printf("\nDone in %s\n", format_duration(d))
}
}
// TODO: add filter // TODO: add filter
// arch.Filter = func(dir string, fi os.FileInfo) bool { // arch.Filter = func(dir string, fi os.FileInfo) bool {
// return true // return true
// } // }
stat := restic.Stat{}
term := terminal.IsTerminal(int(os.Stdout.Fd()))
start := time.Now()
err = filepath.Walk(target, func(p string, fi os.FileInfo, err error) error {
if isFile(fi) {
stat.Files++
stat.Bytes += uint64(fi.Size())
} else if fi.IsDir() {
stat.Dirs++
}
if term {
fmt.Printf("\x1b[2K\r[%s] %d directories, %d files, %s",
format_duration(time.Since(start)), stat.Dirs, stat.Files, format_bytes(stat.Bytes))
}
// TODO: handle error?
return nil
})
if err != nil {
return err
}
fmt.Printf("\nDone in %s\n", format_duration(time.Since(start)))
if parentSnapshotID != nil { if parentSnapshotID != nil {
return errors.New("not implemented") return errors.New("not implemented")
} }
archiveProgress := restic.NewProgress(time.Second) arch, err := restic.NewArchiver(s)
if terminal.IsTerminal(int(os.Stdout.Fd())) {
var bps, eta uint64
itemsTodo := stat.Files + stat.Dirs
archiveProgress.OnUpdate = func(s restic.Stat, d time.Duration, ticker bool) {
sec := uint64(d / time.Second)
if stat.Bytes > 0 && sec > 0 && ticker {
bps = s.Bytes / sec
if bps > 0 {
eta = (stat.Bytes - s.Bytes) / bps
}
}
itemsDone := s.Files + s.Dirs
fmt.Printf("\x1b[2K\r[%s] %3.2f%% %s/s %s / %s %d / %d items ETA %s",
format_duration(d),
float64(s.Bytes)/float64(stat.Bytes)*100,
format_bytes(bps),
format_bytes(s.Bytes), format_bytes(stat.Bytes),
itemsDone, itemsTodo,
format_seconds(eta))
}
archiveProgress.OnDone = func(s restic.Stat, d time.Duration, ticker bool) {
sec := uint64(d / time.Second)
fmt.Printf("\nduration: %s, %.2fMiB/s\n",
format_duration(d),
float64(stat.Bytes)/float64(sec)/(1<<20))
}
}
arch, err := restic.NewArchiver(s, archiveProgress)
if err != nil { if err != nil {
fmt.Fprintf(os.Stderr, "err: %v\n", err) fmt.Fprintf(os.Stderr, "err: %v\n", err)
} }
@ -199,7 +183,8 @@ func (cmd CmdBackup) Execute(args []string) error {
return err return err
} }
_, id, err := arch.Snapshot(target, parentSnapshotID) ap := newArchiveProgress(stat)
_, id, err := arch.Snapshot(ap, target, parentSnapshotID)
if err != nil { if err != nil {
return err return err
} }

View file

@ -26,6 +26,8 @@ type Stat struct {
Files uint64 Files uint64
Dirs uint64 Dirs uint64
Bytes uint64 Bytes uint64
Trees uint64
Blobs uint64
} }
type ProgressFunc func(s Stat, runtime time.Duration, ticker bool) type ProgressFunc func(s Stat, runtime time.Duration, ticker bool)