Refactor stats

* use a channel instead of a callback func
* accumulate Stats for slow receivers
This commit is contained in:
Alexander Neumann 2014-11-23 09:22:18 +01:00
parent 7d1ba8ab65
commit 21df0e50e5
2 changed files with 85 additions and 32 deletions

View file

@ -6,6 +6,7 @@ import (
"os" "os"
"path/filepath" "path/filepath"
"sync" "sync"
"time"
"github.com/fd0/khepri/backend" "github.com/fd0/khepri/backend"
"github.com/fd0/khepri/chunker" "github.com/fd0/khepri/chunker"
@ -15,6 +16,8 @@ import (
const ( const (
maxConcurrentFiles = 32 maxConcurrentFiles = 32
maxConcurrentBlobs = 32 maxConcurrentBlobs = 32
statTimeout = 20 * time.Millisecond
) )
type Archiver struct { type Archiver struct {
@ -32,10 +35,11 @@ type Archiver struct {
Error func(dir string, fi os.FileInfo, err error) error Error func(dir string, fi os.FileInfo, err error) error
Filter func(item string, fi os.FileInfo) bool Filter func(item string, fi os.FileInfo) bool
ScannerUpdate func(stats Stats) ScannerStats chan Stats
SaveUpdate func(stats Stats) SaveStats chan Stats
sum sync.Mutex // for SaveUpdate statsMutex sync.Mutex
updateStats Stats
} }
type Stats struct { type Stats struct {
@ -45,6 +49,13 @@ type Stats struct {
Bytes uint64 Bytes uint64
} }
// Add increases each counter of s (Directories, Files, Other and Bytes)
// by the corresponding counter of other. The receiver is modified in
// place; other is passed by value and is left untouched.
func (s *Stats) Add(other Stats) {
	// The four counters are independent of each other, so the order of
	// the additions does not matter.
	s.Directories += other.Directories
	s.Files += other.Files
	s.Other += other.Other
	s.Bytes += other.Bytes
}
func NewArchiver(be backend.Server, key *Key) (*Archiver, error) { func NewArchiver(be backend.Server, key *Key) (*Archiver, error) {
var err error var err error
arch := &Archiver{ arch := &Archiver{
@ -67,8 +78,6 @@ func NewArchiver(be backend.Server, key *Key) (*Archiver, error) {
arch.Error = func(string, os.FileInfo, error) error { return err } arch.Error = func(string, os.FileInfo, error) error { return err }
// allow all files // allow all files
arch.Filter = func(string, os.FileInfo) bool { return true } arch.Filter = func(string, os.FileInfo) bool { return true }
// do nothing
arch.ScannerUpdate = func(Stats) {}
arch.bl = NewBlobList() arch.bl = NewBlobList()
arch.ch, err = NewContentHandler(be, key) arch.ch, err = NewContentHandler(be, key)
@ -85,11 +94,31 @@ func NewArchiver(be backend.Server, key *Key) (*Archiver, error) {
return arch, nil return arch, nil
} }
func (arch *Archiver) saveUpdate(stats Stats) { func (arch *Archiver) update(ch chan Stats, stats Stats) {
if arch.SaveUpdate != nil { if ch == nil {
arch.sum.Lock() return
defer arch.sum.Unlock() }
arch.SaveUpdate(stats)
// load old stats from global state
arch.statsMutex.Lock()
stats.Add(arch.updateStats)
arch.updateStats = Stats{}
arch.statsMutex.Unlock()
// try to send stats through the channel, with a timeout
timeout := time.After(statTimeout)
select {
case ch <- stats:
break
case _ = <-timeout:
// save cumulated stats to global state
arch.statsMutex.Lock()
arch.updateStats.Add(stats)
arch.statsMutex.Unlock()
break
} }
} }
@ -140,7 +169,7 @@ func (arch *Archiver) SaveFile(node *Node) error {
return err return err
} }
arch.saveUpdate(Stats{Bytes: blob.Size}) arch.update(arch.SaveStats, Stats{Bytes: blob.Size})
blobs = Blobs{blob} blobs = Blobs{blob}
} else { } else {
@ -169,7 +198,7 @@ func (arch *Archiver) SaveFile(node *Node) error {
panic(err) panic(err)
} }
arch.saveUpdate(Stats{Bytes: blob.Size}) arch.update(arch.SaveStats, Stats{Bytes: blob.Size})
arch.blobToken <- token arch.blobToken <- token
ch <- blob ch <- blob
}(resCh) }(resCh)
@ -240,12 +269,15 @@ func (arch *Archiver) loadTree(dir string) (*Tree, error) {
} }
} }
arch.ScannerUpdate(arch.Stats) arch.update(arch.ScannerStats, arch.Stats)
return &tree, nil return &tree, nil
} }
func (arch *Archiver) LoadTree(path string) (*Tree, error) { func (arch *Archiver) LoadTree(path string) (*Tree, error) {
// reset global stats
arch.updateStats = Stats{}
fi, err := os.Lstat(path) fi, err := os.Lstat(path)
if err != nil { if err != nil {
return nil, arrar.Annotatef(err, "Lstat(%q)", path) return nil, arrar.Annotatef(err, "Lstat(%q)", path)
@ -259,7 +291,7 @@ func (arch *Archiver) LoadTree(path string) (*Tree, error) {
if node.Type != "dir" { if node.Type != "dir" {
arch.Stats.Files = 1 arch.Stats.Files = 1
arch.Stats.Bytes = node.Size arch.Stats.Bytes = node.Size
arch.ScannerUpdate(arch.Stats) arch.update(arch.ScannerStats, arch.Stats)
return &Tree{node}, nil return &Tree{node}, nil
} }
@ -269,7 +301,7 @@ func (arch *Archiver) LoadTree(path string) (*Tree, error) {
return nil, arrar.Annotate(err, "loadTree()") return nil, arrar.Annotate(err, "loadTree()")
} }
arch.ScannerUpdate(arch.Stats) arch.update(arch.ScannerStats, arch.Stats)
return &Tree{node}, nil return &Tree{node}, nil
} }
@ -284,7 +316,7 @@ func (arch *Archiver) saveTree(t *Tree) (Blob, error) {
return Blob{}, err return Blob{}, err
} }
node.Subtree = b.ID node.Subtree = b.ID
arch.saveUpdate(Stats{Directories: 1}) arch.update(arch.SaveStats, Stats{Directories: 1})
} else if node.Type == "file" && len(node.Content) == 0 { } else if node.Type == "file" && len(node.Content) == 0 {
// start goroutine // start goroutine
wg.Add(1) wg.Add(1)
@ -299,10 +331,10 @@ func (arch *Archiver) saveTree(t *Tree) (Blob, error) {
// TODO: handle error // TODO: handle error
arch.SaveFile(n) arch.SaveFile(n)
arch.saveUpdate(Stats{Files: 1}) arch.update(arch.SaveStats, Stats{Files: 1})
}(node) }(node)
} else { } else {
arch.saveUpdate(Stats{Other: 1}) arch.update(arch.SaveStats, Stats{Other: 1})
} }
} }
@ -317,6 +349,9 @@ func (arch *Archiver) saveTree(t *Tree) (Blob, error) {
} }
func (arch *Archiver) Snapshot(dir string, t *Tree) (*Snapshot, backend.ID, error) { func (arch *Archiver) Snapshot(dir string, t *Tree) (*Snapshot, backend.ID, error) {
// reset global stats
arch.updateStats = Stats{}
sn := NewSnapshot(dir) sn := NewSnapshot(dir)
blob, err := arch.saveTree(t) blob, err := arch.saveTree(t)

View file

@ -60,11 +60,18 @@ func commandBackup(be backend.Server, key *khepri.Key, args []string) error {
fmt.Printf("scanning %s\n", target) fmt.Printf("scanning %s\n", target)
if terminal.IsTerminal(int(os.Stdout.Fd())) { if terminal.IsTerminal(int(os.Stdout.Fd())) {
arch.ScannerUpdate = func(stats khepri.Stats) { ch := make(chan khepri.Stats, 5)
fmt.Printf("\r%6d directories, %6d files, %14s", stats.Directories, stats.Files, format_bytes(stats.Bytes)) arch.ScannerStats = ch
}
go func(ch <-chan khepri.Stats) {
for stats := range ch {
fmt.Printf("\r%6d directories, %6d files, %14s", stats.Directories, stats.Files, format_bytes(stats.Bytes))
}
}(ch)
} }
fmt.Printf("done\n")
// TODO: add filter // TODO: add filter
// arch.Filter = func(dir string, fi os.FileInfo) bool { // arch.Filter = func(dir string, fi os.FileInfo) bool {
// return true // return true
@ -80,18 +87,23 @@ func commandBackup(be backend.Server, key *khepri.Key, args []string) error {
stats := khepri.Stats{} stats := khepri.Stats{}
if terminal.IsTerminal(int(os.Stdout.Fd())) { if terminal.IsTerminal(int(os.Stdout.Fd())) {
arch.SaveUpdate = func(s khepri.Stats) { ch := make(chan khepri.Stats, 5)
stats.Files += s.Files arch.SaveStats = ch
stats.Directories += s.Directories
stats.Other += s.Other
stats.Bytes += s.Bytes
fmt.Printf("\r%3.2f%% %d/%d directories, %d/%d files, %s/%s", go func(ch <-chan khepri.Stats) {
float64(stats.Bytes)/float64(arch.Stats.Bytes)*100, for s := range ch {
stats.Directories, arch.Stats.Directories, stats.Files += s.Files
stats.Files, arch.Stats.Files, stats.Directories += s.Directories
format_bytes(stats.Bytes), format_bytes(arch.Stats.Bytes)) stats.Other += s.Other
} stats.Bytes += s.Bytes
fmt.Printf("\r%3.2f%% %d/%d directories, %d/%d files, %s/%s",
float64(stats.Bytes)/float64(arch.Stats.Bytes)*100,
stats.Directories, arch.Stats.Directories,
stats.Files, arch.Stats.Files,
format_bytes(stats.Bytes), format_bytes(arch.Stats.Bytes))
}
}(ch)
} }
start := time.Now() start := time.Now()
@ -100,6 +112,12 @@ func commandBackup(be backend.Server, key *khepri.Key, args []string) error {
fmt.Fprintf(os.Stderr, "error: %v\n", err) fmt.Fprintf(os.Stderr, "error: %v\n", err)
} }
if terminal.IsTerminal(int(os.Stdout.Fd())) {
// close channels so that the goroutines terminate
close(arch.SaveStats)
close(arch.ScannerStats)
}
fmt.Printf("\nsnapshot %s saved: %v\n", id, sn) fmt.Printf("\nsnapshot %s saved: %v\n", id, sn)
duration := time.Now().Sub(start) duration := time.Now().Sub(start)
fmt.Printf("duration: %s, %.2fMiB/s\n", duration, float64(arch.Stats.Bytes)/float64(duration/time.Second)/(1<<20)) fmt.Printf("duration: %s, %.2fMiB/s\n", duration, float64(arch.Stats.Bytes)/float64(duration/time.Second)/(1<<20))