Split Scanner from Archiver, refactor Progress

This commit is contained in:
Alexander Neumann 2015-01-04 18:23:00 +01:00
parent a93bc3c991
commit 4b70bba588
5 changed files with 344 additions and 237 deletions

View file

@ -4,9 +4,7 @@ import (
"fmt" "fmt"
"io" "io"
"os" "os"
"path/filepath"
"sync" "sync"
"time"
"github.com/juju/arrar" "github.com/juju/arrar"
"github.com/restic/restic/backend" "github.com/restic/restic/backend"
@ -16,8 +14,6 @@ import (
const ( const (
maxConcurrentFiles = 8 maxConcurrentFiles = 8
maxConcurrentBlobs = 8 maxConcurrentBlobs = 8
statTimeout = 20 * time.Millisecond
) )
type Archiver struct { type Archiver struct {
@ -29,36 +25,17 @@ type Archiver struct {
fileToken chan struct{} fileToken chan struct{}
blobToken chan struct{} blobToken chan struct{}
Stats Stats
Error func(dir string, fi os.FileInfo, err error) error Error func(dir string, fi os.FileInfo, err error) error
Filter func(item string, fi os.FileInfo) bool Filter func(item string, fi os.FileInfo) bool
ScannerStats chan Stats p *Progress
SaveStats chan Stats
statsMutex sync.Mutex
updateStats Stats
} }
type Stats struct { func NewArchiver(s Server, p *Progress) (*Archiver, error) {
Files int
Directories int
Other int
Bytes uint64
}
func (s *Stats) Add(other Stats) {
s.Bytes += other.Bytes
s.Directories += other.Directories
s.Files += other.Files
s.Other += other.Other
}
func NewArchiver(s Server) (*Archiver, error) {
var err error var err error
arch := &Archiver{ arch := &Archiver{
s: s, s: s,
p: p,
fileToken: make(chan struct{}, maxConcurrentFiles), fileToken: make(chan struct{}, maxConcurrentFiles),
blobToken: make(chan struct{}, maxConcurrentBlobs), blobToken: make(chan struct{}, maxConcurrentBlobs),
} }
@ -92,34 +69,6 @@ func NewArchiver(s Server) (*Archiver, error) {
return arch, nil return arch, nil
} }
func (arch *Archiver) update(ch chan Stats, stats Stats) {
if ch == nil {
return
}
// load old stats from global state
arch.statsMutex.Lock()
stats.Add(arch.updateStats)
arch.updateStats = Stats{}
arch.statsMutex.Unlock()
// try to send stats through the channel, with a timeout
timeout := time.After(statTimeout)
select {
case ch <- stats:
break
case _ = <-timeout:
// save cumulated stats to global state
arch.statsMutex.Lock()
arch.updateStats.Add(stats)
arch.statsMutex.Unlock()
break
}
}
func (arch *Archiver) Save(t backend.Type, data []byte) (Blob, error) { func (arch *Archiver) Save(t backend.Type, data []byte) (Blob, error) {
blob, err := arch.ch.Save(t, data) blob, err := arch.ch.Save(t, data)
if err != nil { if err != nil {
@ -179,7 +128,7 @@ func (arch *Archiver) SaveFile(node *Node) error {
return arrar.Annotate(err, "SaveFile() save chunk") return arrar.Annotate(err, "SaveFile() save chunk")
} }
arch.update(arch.SaveStats, Stats{Bytes: blob.Size}) arch.p.Report(Stat{Bytes: blob.Size})
blobs = Blobs{blob} blobs = Blobs{blob}
} }
@ -219,7 +168,7 @@ func (arch *Archiver) SaveFile(node *Node) error {
FreeChunkBuf("blob chunker", buf) FreeChunkBuf("blob chunker", buf)
arch.update(arch.SaveStats, Stats{Bytes: blob.Size}) arch.p.Report(Stat{Bytes: blob.Size})
arch.blobToken <- token arch.blobToken <- token
ch <- blob ch <- blob
}(resCh) }(resCh)
@ -253,110 +202,6 @@ func (arch *Archiver) SaveFile(node *Node) error {
return nil return nil
} }
func (arch *Archiver) scan(dir string) (*Tree, error) {
var err error
// open and list path
fd, err := os.Open(dir)
defer fd.Close()
if err != nil {
return nil, arch.Error(dir, nil, err)
}
entries, err := fd.Readdir(-1)
if err != nil {
return nil, err
}
// build new tree
tree := Tree{}
for _, entry := range entries {
path := filepath.Join(dir, entry.Name())
if !arch.Filter(path, entry) {
continue
}
node, err := NodeFromFileInfo(path, entry)
if err != nil {
// TODO: error processing
return nil, err
}
err = tree.Insert(node)
if err != nil {
return nil, err
}
if entry.IsDir() {
node.Tree, err = arch.scan(path)
if err != nil {
return nil, err
}
}
}
for _, node := range tree {
if node.Type == "file" && node.Content != nil {
continue
}
switch node.Type {
case "file":
arch.Stats.Files++
arch.Stats.Bytes += node.Size
case "dir":
arch.Stats.Directories++
default:
arch.Stats.Other++
}
}
arch.update(arch.ScannerStats, arch.Stats)
return &tree, nil
}
func (arch *Archiver) Scan(path string) (*Tree, error) {
// reset global stats
arch.updateStats = Stats{}
fi, err := os.Lstat(path)
if err != nil {
return nil, arrar.Annotatef(err, "Lstat(%q)", path)
}
node, err := NodeFromFileInfo(path, fi)
if err != nil {
return nil, arrar.Annotate(err, "NodeFromFileInfo()")
}
if node.Type != "dir" {
t := &Tree{node}
// update stats
if node.Content == nil && node.Subtree == nil {
arch.Stats.Files = 1
arch.Stats.Bytes = node.Size
}
arch.update(arch.ScannerStats, arch.Stats)
return t, nil
}
arch.Stats.Directories = 1
node.Tree, err = arch.scan(path)
if err != nil {
return nil, arrar.Annotate(err, "loadTree()")
}
arch.update(arch.ScannerStats, arch.Stats)
return &Tree{node}, nil
}
func (arch *Archiver) saveTree(t *Tree) (Blob, error) { func (arch *Archiver) saveTree(t *Tree) (Blob, error) {
var wg sync.WaitGroup var wg sync.WaitGroup
@ -367,7 +212,7 @@ func (arch *Archiver) saveTree(t *Tree) (Blob, error) {
return Blob{}, err return Blob{}, err
} }
node.Subtree = b.ID node.Subtree = b.ID
arch.update(arch.SaveStats, Stats{Directories: 1}) arch.p.Report(Stat{Dirs: 1})
} else if node.Type == "file" && len(node.Content) == 0 { } else if node.Type == "file" && len(node.Content) == 0 {
// get token // get token
token := <-arch.fileToken token := <-arch.fileToken
@ -385,10 +230,10 @@ func (arch *Archiver) saveTree(t *Tree) (Blob, error) {
if err != nil { if err != nil {
panic(err) panic(err)
} }
arch.update(arch.SaveStats, Stats{Files: 1}) arch.p.Report(Stat{Files: 1})
}(node) }(node)
} else { } else {
arch.update(arch.SaveStats, Stats{Other: 1}) arch.p.Report(Stat{Other: 1})
} }
} }
@ -410,8 +255,8 @@ func (arch *Archiver) saveTree(t *Tree) (Blob, error) {
} }
func (arch *Archiver) Snapshot(dir string, t *Tree, parentSnapshot backend.ID) (*Snapshot, backend.ID, error) { func (arch *Archiver) Snapshot(dir string, t *Tree, parentSnapshot backend.ID) (*Snapshot, backend.ID, error) {
// reset global stats arch.p.Start()
arch.updateStats = Stats{} defer arch.p.Done()
sn, err := NewSnapshot(dir) sn, err := NewSnapshot(dir)
if err != nil { if err != nil {

View file

@ -40,7 +40,7 @@ func format_bytes(c uint64) string {
} }
} }
func format_duration(sec uint64) string { func format_seconds(sec uint64) string {
hours := sec / 3600 hours := sec / 3600
sec -= hours * 3600 sec -= hours * 3600
min := sec / 60 min := sec / 60
@ -52,6 +52,11 @@ func format_duration(sec uint64) string {
return fmt.Sprintf("%d:%02d", min, sec) return fmt.Sprintf("%d:%02d", min, sec)
} }
func format_duration(d time.Duration) string {
sec := uint64(d / time.Second)
return format_seconds(sec)
}
func print_tree2(indent int, t *restic.Tree) { func print_tree2(indent int, t *restic.Tree) {
for _, node := range *t { for _, node := range *t {
if node.Tree != nil { if node.Tree != nil {
@ -89,27 +94,16 @@ func (cmd CmdBackup) Execute(args []string) error {
fmt.Printf("found parent snapshot %v\n", parentSnapshotID) fmt.Printf("found parent snapshot %v\n", parentSnapshotID)
} }
arch, err := restic.NewArchiver(s)
if err != nil {
fmt.Fprintf(os.Stderr, "err: %v\n", err)
}
arch.Error = func(dir string, fi os.FileInfo, err error) error {
// TODO: make ignoring errors configurable
fmt.Fprintf(os.Stderr, "\nerror for %s: %v\n%v\n", dir, err, fi)
return nil
}
fmt.Printf("scanning %s\n", target) fmt.Printf("scanning %s\n", target)
scanProgress := restic.NewProgress(time.Second)
if terminal.IsTerminal(int(os.Stdout.Fd())) { if terminal.IsTerminal(int(os.Stdout.Fd())) {
ch := make(chan restic.Stats, 20) scanProgress.F = func(s restic.Stat, d time.Duration, ticker bool) {
arch.ScannerStats = ch fmt.Printf("\x1b[2K\r[%s] %d directories, %d files, %s", format_duration(d), s.Dirs, s.Files, format_bytes(s.Bytes))
}
go func(ch <-chan restic.Stats) { scanProgress.D = func(s restic.Stat, d time.Duration, ticker bool) {
for stats := range ch { fmt.Printf("\nDone in %s\n", format_duration(d))
fmt.Printf("\r%6d directories, %6d files, %14s", stats.Directories, stats.Files, format_bytes(stats.Bytes))
} }
}(ch)
} }
// TODO: add filter // TODO: add filter
@ -117,59 +111,51 @@ func (cmd CmdBackup) Execute(args []string) error {
// return true // return true
// } // }
t, err := arch.Scan(target) sc := restic.NewScanner(scanProgress)
t, err := sc.Scan(target)
if err != nil { if err != nil {
fmt.Fprintf(os.Stderr, "error: %v\n", err) fmt.Fprintf(os.Stderr, "error: %v\n", err)
return err return err
} }
fmt.Printf("\r%6d directories, %6d files, %14s\n", arch.Stats.Directories, arch.Stats.Files, format_bytes(arch.Stats.Bytes)) archiveProgress := restic.NewProgress(time.Second)
targetStat := scanProgress.Current()
stats := restic.Stats{}
start := time.Now()
if terminal.IsTerminal(int(os.Stdout.Fd())) { if terminal.IsTerminal(int(os.Stdout.Fd())) {
ch := make(chan restic.Stats, 20) var bps, eta uint64
arch.SaveStats = ch archiveProgress.F = func(s restic.Stat, d time.Duration, ticker bool) {
sec := uint64(d / time.Second)
if sec > 0 && ticker {
bps = s.Bytes / sec
eta = (targetStat.Bytes - s.Bytes) / bps
}
ticker := time.NewTicker(time.Second)
var eta, bps uint64
go func(ch <-chan restic.Stats) {
status := func(sec uint64) {
fmt.Printf("\x1b[2K\r[%s] %3.2f%% %s/s %s / %s ETA %s", fmt.Printf("\x1b[2K\r[%s] %3.2f%% %s/s %s / %s ETA %s",
format_duration(sec), format_duration(d),
float64(stats.Bytes)/float64(arch.Stats.Bytes)*100, float64(s.Bytes)/float64(targetStat.Bytes)*100,
format_bytes(bps), format_bytes(bps),
format_bytes(stats.Bytes), format_bytes(arch.Stats.Bytes), format_bytes(s.Bytes), format_bytes(targetStat.Bytes),
format_duration(eta)) format_seconds(eta))
} }
defer ticker.Stop() archiveProgress.D = func(s restic.Stat, d time.Duration, ticker bool) {
for { sec := uint64(d / time.Second)
select { fmt.Printf("\nduration: %s, %.2fMiB/s\n",
case s, ok := <-ch: format_duration(d),
if !ok { float64(targetStat.Bytes)/float64(sec)/(1<<20))
return
} }
stats.Files += s.Files
stats.Directories += s.Directories
stats.Other += s.Other
stats.Bytes += s.Bytes
status(uint64(time.Since(start) / time.Second))
case <-ticker.C:
sec := uint64(time.Since(start) / time.Second)
bps = stats.Bytes / sec
if bps > 0 {
eta = (arch.Stats.Bytes - stats.Bytes) / bps
} }
status(sec) arch, err := restic.NewArchiver(s, archiveProgress)
if err != nil {
fmt.Fprintf(os.Stderr, "err: %v\n", err)
} }
}
}(ch) arch.Error = func(dir string, fi os.FileInfo, err error) error {
// TODO: make ignoring errors configurable
fmt.Fprintf(os.Stderr, "\nerror for %s: %v\n%v\n", dir, err, fi)
return nil
} }
_, id, err := arch.Snapshot(target, t, parentSnapshotID) _, id, err := arch.Snapshot(target, t, parentSnapshotID)
@ -177,23 +163,12 @@ func (cmd CmdBackup) Execute(args []string) error {
fmt.Fprintf(os.Stderr, "error: %v\n", err) fmt.Fprintf(os.Stderr, "error: %v\n", err)
} }
if terminal.IsTerminal(int(os.Stdout.Fd())) {
// close channels so that the goroutines terminate
close(arch.SaveStats)
close(arch.ScannerStats)
}
plen, err := s.PrefixLength(backend.Snapshot) plen, err := s.PrefixLength(backend.Snapshot)
if err != nil { if err != nil {
return err return err
} }
fmt.Printf("\nsnapshot %s saved\n", id[:plen]) fmt.Printf("snapshot %s saved\n", id[:plen])
sec := uint64(time.Since(start) / time.Second)
fmt.Printf("duration: %s, %.2fMiB/s\n",
format_duration(sec),
float64(arch.Stats.Bytes)/float64(sec)/(1<<20))
return nil return nil
} }

165
progress.go Normal file
View file

@ -0,0 +1,165 @@
package restic
import (
"sync"
"time"
)
// Progress accumulates Stat counters and reports them to user-supplied
// callbacks: F is invoked whenever new data is reported via Report() and on
// every tick of the internal ticker, D is invoked once when Done() is called.
// Calls to F and D are serialized through fnM, so they may use shared state.
type Progress struct {
	F ProgressFunc // called on Report() and on every ticker interval
	D ProgressFunc // called once by Done()

	fnM sync.Mutex // serializes all calls to F and D

	cur    Stat
	curM   sync.Mutex // guards cur
	start  time.Time
	c      *time.Ticker
	cancel chan struct{}
	o      sync.Once // ensures cancel is closed at most once
	d      time.Duration

	running bool
}

// Stat holds a set of counters accumulated by a Progress.
type Stat struct {
	Files uint64
	Dirs  uint64
	Other uint64
	Bytes uint64
}

// ProgressFunc receives the current counters, the elapsed runtime since
// Start(), and a flag telling whether the call was triggered by the periodic
// ticker (true) or by new data arriving (false).
type ProgressFunc func(s Stat, runtime time.Duration, ticker bool)

// NewProgress returns a new progress reporter. After Start() has been called,
// the function F is called when new data arrives or at least every d
// interval. The function D is called when Done() is called. Both
// functions F and D are called synchronously and can use shared state.
func NewProgress(d time.Duration) *Progress {
	return &Progress{d: d}
}

// Start resets and runs the progress reporter. Calling Start on an
// already-running Progress panics. A nil Progress is a no-op.
func (p *Progress) Start() {
	if p == nil {
		return
	}

	if p.running {
		panic("trying to reset a running Progress")
	}

	p.o = sync.Once{}
	p.cancel = make(chan struct{})
	p.running = true
	p.Reset()
	p.start = time.Now()
	p.c = time.NewTicker(p.d)

	go p.reporter()
}

// Report adds the statistics from s to the current state and reports the
// accumulated statistics via F (if set). A nil Progress is a no-op; calling
// Report on a non-running Progress panics.
func (p *Progress) Report(s Stat) {
	if p == nil {
		return
	}

	if !p.running {
		panic("reporting in a non-running Progress")
	}

	p.curM.Lock()
	p.cur.Add(s)
	cur := p.cur
	p.curM.Unlock()

	// invoke the callback outside curM so it may take arbitrary time
	if p.F != nil {
		p.fnM.Lock()
		p.F(cur, time.Since(p.start), false)
		p.fnM.Unlock()
	}
}

// reporter periodically invokes F with the current counters until the cancel
// channel is closed. It runs in its own goroutine, started by Start().
func (p *Progress) reporter() {
	for {
		select {
		case <-p.c.C:
			p.curM.Lock()
			cur := p.cur
			p.curM.Unlock()

			if p.F != nil {
				p.fnM.Lock()
				p.F(cur, time.Since(p.start), true)
				p.fnM.Unlock()
			}
		case <-p.cancel:
			p.c.Stop()
			return
		}
	}
}

// Reset resets all statistic counters to zero. Calling Reset on a non-running
// Progress panics; a nil Progress is a no-op.
func (p *Progress) Reset() {
	if p == nil {
		return
	}

	if !p.running {
		panic("resetting a non-running Progress")
	}

	p.curM.Lock()
	p.cur = Stat{}
	p.curM.Unlock()
}

// Done closes the progress report: the reporter goroutine is stopped and D
// (if set) is invoked one final time. Calling Done on a non-running Progress
// panics; a nil Progress is a no-op.
func (p *Progress) Done() {
	if p == nil {
		return
	}

	if !p.running {
		panic("Done() called on non-running Progress")
	}

	p.running = false
	p.o.Do(func() {
		close(p.cancel)
	})

	// read cur under the lock via Current(): the reporter goroutine may
	// still be delivering a final tick concurrently
	cur := p.Current()

	if p.D != nil {
		p.fnM.Lock()
		p.D(cur, time.Since(p.start), false)
		p.fnM.Unlock()
	}
}

// Current returns the current stat value. A nil Progress returns the zero
// Stat.
func (p *Progress) Current() Stat {
	if p == nil {
		return Stat{}
	}

	p.curM.Lock()
	s := p.cur
	p.curM.Unlock()

	return s
}

// Add accumulates other into s.
func (s *Stat) Add(other Stat) {
	s.Bytes += other.Bytes
	s.Dirs += other.Dirs
	s.Files += other.Files
	s.Other += other.Other
}

121
scanner.go Normal file
View file

@ -0,0 +1,121 @@
package restic
import (
"os"
"path/filepath"
"github.com/juju/arrar"
)
// FilterFunc decides whether the item with the given os.FileInfo is included
// in a scan; returning false skips the entry.
type FilterFunc func(item string, fi os.FileInfo) bool

// ErrorFunc handles errors encountered while scanning dir. The default
// installed by NewScanner aborts by returning the error unchanged.
type ErrorFunc func(dir string, fi os.FileInfo, err error) error

// Scanner traverses a directory tree, building a Tree of nodes and reporting
// statistics through p.
type Scanner struct {
	Error  ErrorFunc
	Filter FilterFunc

	p *Progress
}

// NewScanner returns a Scanner reporting progress to p. The returned Scanner
// aborts on all errors and includes all files; override Error and Filter to
// change that.
func NewScanner(p *Progress) *Scanner {
	return &Scanner{
		p: p,
		// abort on all errors
		Error: func(dir string, fi os.FileInfo, err error) error { return err },
		// allow all files
		Filter: func(item string, fi os.FileInfo) bool { return true },
	}
}
// scan recursively reads the directory dir and returns a Tree describing its
// contents. Entries rejected by filterFn are skipped. One Stat per newly
// scanned node is reported through progress. Errors encountered while
// scanning a subdirectory are stored in that node's err field and sorted out
// later by the caller.
func scan(filterFn FilterFunc, progress *Progress, dir string) (*Tree, error) {
	// open and list path; close the handle only after Open succeeded
	// (previously fd.Close() was deferred before the error check)
	fd, err := os.Open(dir)
	if err != nil {
		return nil, err
	}
	defer fd.Close()

	entries, err := fd.Readdir(-1)
	if err != nil {
		return nil, err
	}

	// build new tree
	tree := Tree{}
	for _, entry := range entries {
		path := filepath.Join(dir, entry.Name())

		if !filterFn(path, entry) {
			continue
		}

		node, err := NodeFromFileInfo(path, entry)
		if err != nil {
			// TODO: error processing
			return nil, err
		}

		err = tree.Insert(node)
		if err != nil {
			return nil, err
		}

		if entry.IsDir() {
			// save all errors in node.err, sort out later
			node.Tree, node.err = scan(filterFn, progress, path)
		}
	}

	// report statistics; files that already carry content are skipped
	for _, node := range tree {
		if node.Type == "file" && node.Content != nil {
			continue
		}

		switch node.Type {
		case "file":
			progress.Report(Stat{Files: 1, Bytes: node.Size})
		case "dir":
			progress.Report(Stat{Dirs: 1})
		default:
			progress.Report(Stat{Other: 1})
		}
	}

	return &tree, nil
}
// Scan traverses the file or directory at path and returns a Tree describing
// it. The Scanner's Progress is started before the walk begins and finished
// when Scan returns.
func (sc *Scanner) Scan(path string) (*Tree, error) {
	sc.p.Start()
	defer sc.p.Done()

	fi, err := os.Lstat(path)
	if err != nil {
		return nil, arrar.Annotatef(err, "Lstat(%q)", path)
	}

	node, err := NodeFromFileInfo(path, fi)
	if err != nil {
		return nil, arrar.Annotate(err, "NodeFromFileInfo()")
	}

	// anything that is not a directory becomes a single-node tree
	if node.Type != "dir" {
		sc.p.Report(Stat{Files: 1, Bytes: node.Size})
		return &Tree{node}, nil
	}

	sc.p.Report(Stat{Dirs: 1})

	node.Tree, err = scan(sc.Filter, sc.p, path)
	if err != nil {
		return nil, arrar.Annotate(err, "loadTree()")
	}

	return &Tree{node}, nil
}

View file

@ -40,6 +40,7 @@ type Node struct {
Tree *Tree `json:"-"` Tree *Tree `json:"-"`
path string path string
err error
} }
var ( var (