From 21df0e50e5320b7c6c3d4bcc1f8f4f4aad77687c Mon Sep 17 00:00:00 2001 From: Alexander Neumann Date: Sun, 23 Nov 2014 09:22:18 +0100 Subject: [PATCH] Refactor stats * channel instead of callback func * cumulate Stats for slow receivers --- archiver.go | 71 ++++++++++++++++++++++++++++++---------- cmd/khepri/cmd_backup.go | 46 ++++++++++++++++++-------- 2 files changed, 85 insertions(+), 32 deletions(-) diff --git a/archiver.go b/archiver.go index 1ce7180bc..0bd35b8e2 100644 --- a/archiver.go +++ b/archiver.go @@ -6,6 +6,7 @@ import ( "os" "path/filepath" "sync" + "time" "github.com/fd0/khepri/backend" "github.com/fd0/khepri/chunker" @@ -15,6 +16,8 @@ import ( const ( maxConcurrentFiles = 32 maxConcurrentBlobs = 32 + + statTimeout = 20 * time.Millisecond ) type Archiver struct { @@ -32,10 +35,11 @@ type Archiver struct { Error func(dir string, fi os.FileInfo, err error) error Filter func(item string, fi os.FileInfo) bool - ScannerUpdate func(stats Stats) - SaveUpdate func(stats Stats) + ScannerStats chan Stats + SaveStats chan Stats - sum sync.Mutex // for SaveUpdate + statsMutex sync.Mutex + updateStats Stats } type Stats struct { @@ -45,6 +49,13 @@ type Stats struct { Bytes uint64 } +func (s *Stats) Add(other Stats) { + s.Bytes += other.Bytes + s.Directories += other.Directories + s.Files += other.Files + s.Other += other.Other +} + func NewArchiver(be backend.Server, key *Key) (*Archiver, error) { var err error arch := &Archiver{ @@ -67,8 +78,6 @@ func NewArchiver(be backend.Server, key *Key) (*Archiver, error) { arch.Error = func(string, os.FileInfo, error) error { return err } // allow all files arch.Filter = func(string, os.FileInfo) bool { return true } - // do nothing - arch.ScannerUpdate = func(Stats) {} arch.bl = NewBlobList() arch.ch, err = NewContentHandler(be, key) @@ -85,11 +94,31 @@ func NewArchiver(be backend.Server, key *Key) (*Archiver, error) { return arch, nil } -func (arch *Archiver) saveUpdate(stats Stats) { - if arch.SaveUpdate != nil { - arch.sum.Lock() - defer arch.sum.Unlock() - arch.SaveUpdate(stats) +func (arch *Archiver) update(ch chan Stats, stats Stats) { + if ch == nil { + return + } + + // load old stats from global state + arch.statsMutex.Lock() + stats.Add(arch.updateStats) + arch.updateStats = Stats{} + arch.statsMutex.Unlock() + + // try to send stats through the channel, with a timeout + timeout := time.After(statTimeout) + + select { + case ch <- stats: + break + case _ = <-timeout: + + // save cumulated stats to global state + arch.statsMutex.Lock() + arch.updateStats.Add(stats) + arch.statsMutex.Unlock() + + break } } @@ -140,7 +169,7 @@ func (arch *Archiver) SaveFile(node *Node) error { return err } - arch.saveUpdate(Stats{Bytes: blob.Size}) + arch.update(arch.SaveStats, Stats{Bytes: blob.Size}) blobs = Blobs{blob} } else { @@ -169,7 +198,7 @@ func (arch *Archiver) SaveFile(node *Node) error { panic(err) } - arch.saveUpdate(Stats{Bytes: blob.Size}) + arch.update(arch.SaveStats, Stats{Bytes: blob.Size}) arch.blobToken <- token ch <- blob }(resCh) @@ -240,12 +269,15 @@ func (arch *Archiver) loadTree(dir string) (*Tree, error) { } } - arch.ScannerUpdate(arch.Stats) + arch.update(arch.ScannerStats, arch.Stats) return &tree, nil } func (arch *Archiver) LoadTree(path string) (*Tree, error) { + // reset global stats + arch.updateStats = Stats{} + fi, err := os.Lstat(path) if err != nil { return nil, arrar.Annotatef(err, "Lstat(%q)", path) @@ -259,7 +291,7 @@ func (arch *Archiver) LoadTree(path string) (*Tree, error) { if node.Type != "dir" { 
arch.Stats.Files = 1 arch.Stats.Bytes = node.Size - arch.ScannerUpdate(arch.Stats) + arch.update(arch.ScannerStats, arch.Stats) return &Tree{node}, nil } @@ -269,7 +301,7 @@ func (arch *Archiver) LoadTree(path string) (*Tree, error) { return nil, arrar.Annotate(err, "loadTree()") } - arch.ScannerUpdate(arch.Stats) + arch.update(arch.ScannerStats, arch.Stats) return &Tree{node}, nil } @@ -284,7 +316,7 @@ func (arch *Archiver) saveTree(t *Tree) (Blob, error) { return Blob{}, err } node.Subtree = b.ID - arch.saveUpdate(Stats{Directories: 1}) + arch.update(arch.SaveStats, Stats{Directories: 1}) } else if node.Type == "file" && len(node.Content) == 0 { // start goroutine wg.Add(1) @@ -299,10 +331,10 @@ func (arch *Archiver) saveTree(t *Tree) (Blob, error) { // TODO: handle error arch.SaveFile(n) - arch.saveUpdate(Stats{Files: 1}) + arch.update(arch.SaveStats, Stats{Files: 1}) }(node) } else { - arch.saveUpdate(Stats{Other: 1}) + arch.update(arch.SaveStats, Stats{Other: 1}) } } @@ -317,6 +349,9 @@ func (arch *Archiver) saveTree(t *Tree) (Blob, error) { } func (arch *Archiver) Snapshot(dir string, t *Tree) (*Snapshot, backend.ID, error) { + // reset global stats + arch.updateStats = Stats{} + sn := NewSnapshot(dir) blob, err := arch.saveTree(t) diff --git a/cmd/khepri/cmd_backup.go b/cmd/khepri/cmd_backup.go index 8ede5da32..82cab39aa 100644 --- a/cmd/khepri/cmd_backup.go +++ b/cmd/khepri/cmd_backup.go @@ -60,11 +60,18 @@ func commandBackup(be backend.Server, key *khepri.Key, args []string) error { fmt.Printf("scanning %s\n", target) if terminal.IsTerminal(int(os.Stdout.Fd())) { - arch.ScannerUpdate = func(stats khepri.Stats) { - fmt.Printf("\r%6d directories, %6d files, %14s", stats.Directories, stats.Files, format_bytes(stats.Bytes)) - } + ch := make(chan khepri.Stats, 5) + arch.ScannerStats = ch + + go func(ch <-chan khepri.Stats) { + for stats := range ch { + fmt.Printf("\r%6d directories, %6d files, %14s", stats.Directories, stats.Files, format_bytes(stats.Bytes)) + } + }(ch) } + fmt.Printf("done\n") + // TODO: add filter // arch.Filter = func(dir string, fi os.FileInfo) bool { // return true @@ -80,18 +87,23 @@ func commandBackup(be backend.Server, key *khepri.Key, args []string) error { stats := khepri.Stats{} if terminal.IsTerminal(int(os.Stdout.Fd())) { - arch.SaveUpdate = func(s khepri.Stats) { - stats.Files += s.Files - stats.Directories += s.Directories - stats.Other += s.Other - stats.Bytes += s.Bytes + ch := make(chan khepri.Stats, 5) + arch.SaveStats = ch - fmt.Printf("\r%3.2f%% %d/%d directories, %d/%d files, %s/%s", - float64(stats.Bytes)/float64(arch.Stats.Bytes)*100, - stats.Directories, arch.Stats.Directories, - stats.Files, arch.Stats.Files, - format_bytes(stats.Bytes), format_bytes(arch.Stats.Bytes)) - } + go func(ch <-chan khepri.Stats) { + for s := range ch { + stats.Files += s.Files + stats.Directories += s.Directories + stats.Other += s.Other + stats.Bytes += s.Bytes + + fmt.Printf("\r%3.2f%% %d/%d directories, %d/%d files, %s/%s", + float64(stats.Bytes)/float64(arch.Stats.Bytes)*100, + stats.Directories, arch.Stats.Directories, + stats.Files, arch.Stats.Files, + format_bytes(stats.Bytes), format_bytes(arch.Stats.Bytes)) + } + }(ch) } start := time.Now() @@ -100,6 +112,12 @@ func commandBackup(be backend.Server, key *khepri.Key, args []string) error { fmt.Fprintf(os.Stderr, "error: %v\n", err) } + if terminal.IsTerminal(int(os.Stdout.Fd())) { + // close channels so that the goroutines terminate + close(arch.SaveStats) + close(arch.ScannerStats) + } + 
fmt.Printf("\nsnapshot %s saved: %v\n", id, sn) duration := time.Now().Sub(start) fmt.Printf("duration: %s, %.2fMiB/s\n", duration, float64(arch.Stats.Bytes)/float64(duration/time.Second)/(1<<20))