Merge branch 'refactor-archiver'

This commit is contained in:
Alexander Neumann 2015-01-04 23:25:38 +01:00
commit 6645cb04e6
12 changed files with 542 additions and 390 deletions

View file

@ -5,9 +5,7 @@ import (
"fmt" "fmt"
"io" "io"
"os" "os"
"path/filepath"
"sync" "sync"
"time"
"github.com/juju/arrar" "github.com/juju/arrar"
"github.com/restic/restic/backend" "github.com/restic/restic/backend"
@ -17,50 +15,28 @@ import (
const ( const (
maxConcurrentFiles = 8 maxConcurrentFiles = 8
maxConcurrentBlobs = 8 maxConcurrentBlobs = 8
statTimeout = 20 * time.Millisecond
) )
type Archiver struct { type Archiver struct {
s Server s Server
ch *ContentHandler ch *ContentHandler
bl *BlobList // blobs used for the current snapshot bl *BlobList // blobs used for the current snapshot
parentBl *BlobList // blobs from the parent snapshot
fileToken chan struct{} fileToken chan struct{}
blobToken chan struct{} blobToken chan struct{}
Stats Stats
Error func(dir string, fi os.FileInfo, err error) error Error func(dir string, fi os.FileInfo, err error) error
Filter func(item string, fi os.FileInfo) bool Filter func(item string, fi os.FileInfo) bool
ScannerStats chan Stats p *Progress
SaveStats chan Stats
statsMutex sync.Mutex
updateStats Stats
} }
type Stats struct { func NewArchiver(s Server, p *Progress) (*Archiver, error) {
Files int
Directories int
Other int
Bytes uint64
}
// Add accumulates every counter of other into s.
func (s *Stats) Add(other Stats) {
	s.Files = s.Files + other.Files
	s.Directories = s.Directories + other.Directories
	s.Other = s.Other + other.Other
	s.Bytes = s.Bytes + other.Bytes
}
func NewArchiver(s Server) (*Archiver, error) {
var err error var err error
arch := &Archiver{ arch := &Archiver{
s: s, s: s,
p: p,
fileToken: make(chan struct{}, maxConcurrentFiles), fileToken: make(chan struct{}, maxConcurrentFiles),
blobToken: make(chan struct{}, maxConcurrentBlobs), blobToken: make(chan struct{}, maxConcurrentBlobs),
} }
@ -80,12 +56,10 @@ func NewArchiver(s Server) (*Archiver, error) {
arch.Filter = func(string, os.FileInfo) bool { return true } arch.Filter = func(string, os.FileInfo) bool { return true }
arch.bl = NewBlobList() arch.bl = NewBlobList()
arch.ch, err = NewContentHandler(s) arch.ch = NewContentHandler(s)
if err != nil {
return nil, err
}
// load all blobs from all snapshots // load all blobs from all snapshots
// TODO: only use bloblist from old snapshot if available
err = arch.ch.LoadAllMaps() err = arch.ch.LoadAllMaps()
if err != nil { if err != nil {
return nil, err return nil, err
@ -94,34 +68,6 @@ func NewArchiver(s Server) (*Archiver, error) {
return arch, nil return arch, nil
} }
// update tries to deliver stats over ch. A nil channel disables reporting.
// Statistics that an earlier call could not deliver (kept in
// arch.updateStats) are merged into stats first. If the send is not accepted
// within statTimeout, the merged value is stored back into arch.updateStats
// so that a later call can deliver it.
func (arch *Archiver) update(ch chan Stats, stats Stats) {
	if ch == nil {
		return
	}

	// take over whatever earlier, timed-out calls left behind
	arch.statsMutex.Lock()
	stats.Add(arch.updateStats)
	arch.updateStats = Stats{}
	arch.statsMutex.Unlock()

	// bounded wait for the receiver
	select {
	case ch <- stats:
		return
	case <-time.After(statTimeout):
	}

	// nobody accepted the value in time: park the totals for the next call
	arch.statsMutex.Lock()
	arch.updateStats.Add(stats)
	arch.statsMutex.Unlock()
}
func (arch *Archiver) Save(t backend.Type, data []byte) (Blob, error) { func (arch *Archiver) Save(t backend.Type, data []byte) (Blob, error) {
blob, err := arch.ch.Save(t, data) blob, err := arch.ch.Save(t, data)
if err != nil { if err != nil {
@ -152,7 +98,28 @@ func (arch *Archiver) SaveFile(node *Node) error {
file, err := os.Open(node.path) file, err := os.Open(node.path)
defer file.Close() defer file.Close()
if err != nil { if err != nil {
return arrar.Annotate(err, "SaveFile()") return err
}
// check file again
fi, err := file.Stat()
if err != nil {
return err
}
if fi.ModTime() != node.ModTime {
e2 := arch.Error(node.path, fi, errors.New("file changed as we read it\n"))
if e2 == nil {
// create new node
n, err := NodeFromFileInfo(node.path, fi)
if err != nil {
return err
}
// copy node
*node = *n
}
} }
var blobs Blobs var blobs Blobs
@ -181,7 +148,7 @@ func (arch *Archiver) SaveFile(node *Node) error {
return arrar.Annotate(err, "SaveFile() save chunk") return arrar.Annotate(err, "SaveFile() save chunk")
} }
arch.update(arch.SaveStats, Stats{Bytes: blob.Size}) arch.p.Report(Stat{Bytes: blob.Size})
blobs = Blobs{blob} blobs = Blobs{blob}
} }
@ -221,7 +188,7 @@ func (arch *Archiver) SaveFile(node *Node) error {
FreeChunkBuf("blob chunker", buf) FreeChunkBuf("blob chunker", buf)
arch.update(arch.SaveStats, Stats{Bytes: blob.Size}) arch.p.Report(Stat{Bytes: blob.Size})
arch.blobToken <- token arch.blobToken <- token
ch <- blob ch <- blob
}(resCh) }(resCh)
@ -255,217 +222,17 @@ func (arch *Archiver) SaveFile(node *Node) error {
return nil return nil
} }
// populateFromOldTree lets tree take over content from oldTree via
// PopulateFrom and then inserts every blob referenced by a node's Content,
// looked up in arch.parentBl, into arch.bl. The first lookup error aborts
// the walk and is returned.
func (arch *Archiver) populateFromOldTree(tree, oldTree Tree) error {
	if err := tree.PopulateFrom(oldTree); err != nil {
		return err
	}

	for _, n := range tree {
		// ranging over a nil Content slice is a no-op, so no explicit check
		for _, id := range n.Content {
			found, err := arch.parentBl.Find(Blob{ID: id})
			if err != nil {
				return err
			}

			arch.bl.Insert(found)
		}
	}

	return nil
}
// loadTree builds a Tree for the directory dir, descending into
// subdirectories. When oldTreeID is non-nil, the tree stored under that ID is
// loaded and used to populate unchanged nodes through populateFromOldTree.
// Entries rejected by arch.Filter are skipped. Nodes that still need saving
// are counted in arch.Stats, which is then pushed to arch.ScannerStats.
//
// An error from opening dir is passed to arch.Error; whatever that callback
// returns becomes the error result (the returned tree is nil in that case).
func (arch *Archiver) loadTree(dir string, oldTreeID backend.ID) (*Tree, error) {
	var (
		oldTree Tree
		err     error
	)

	if oldTreeID != nil {
		// load old tree
		oldTree, err = LoadTree(arch.ch, oldTreeID)
		if err != nil {
			return nil, arrar.Annotate(err, "load old tree")
		}

		debug("old tree: %v\n", oldTree)
	}

	// open and list path
	fd, err := os.Open(dir)
	if err != nil {
		return nil, arch.Error(dir, nil, err)
	}
	// register the close only once we know the handle is valid
	defer fd.Close()

	entries, err := fd.Readdir(-1)
	if err != nil {
		return nil, err
	}

	// build new tree
	tree := Tree{}
	for _, entry := range entries {
		path := filepath.Join(dir, entry.Name())

		if !arch.Filter(path, entry) {
			continue
		}

		node, err := NodeFromFileInfo(path, entry)
		if err != nil {
			// TODO: error processing
			return nil, err
		}

		err = tree.Insert(node)
		if err != nil {
			return nil, err
		}

		if entry.IsDir() {
			oldSubtree, err := oldTree.Find(node.Name)
			if err != nil && err != ErrNodeNotFound {
				return nil, err
			}

			// a missing old node simply means there is nothing to reuse
			var oldSubtreeID backend.ID
			if err == nil {
				oldSubtreeID = oldSubtree.Subtree
			}

			node.Tree, err = arch.loadTree(path, oldSubtreeID)
			if err != nil {
				return nil, err
			}
		}
	}

	// populate with content from oldTree
	err = arch.populateFromOldTree(tree, oldTree)
	if err != nil {
		return nil, err
	}

	for _, node := range tree {
		// files that already carry content do not count as work to do
		if node.Type == "file" && node.Content != nil {
			continue
		}

		switch node.Type {
		case "file":
			arch.Stats.Files++
			arch.Stats.Bytes += node.Size
		case "dir":
			arch.Stats.Directories++
		default:
			arch.Stats.Other++
		}
	}

	arch.update(arch.ScannerStats, arch.Stats)

	return &tree, nil
}
// LoadTree creates the Tree for path. When parentSnapshot is non-nil, that
// snapshot, its blob list (stored in arch.parentBl) and its tree are loaded
// first so that content can be taken over from the old tree. A path that is
// not a directory yields a single-node tree; a directory is walked with
// loadTree. In both cases arch.Stats is sent to arch.ScannerStats before
// returning successfully.
func (arch *Archiver) LoadTree(path string, parentSnapshot backend.ID) (*Tree, error) {
	var oldTree Tree

	if parentSnapshot != nil {
		// load old tree from snapshot
		sn, err := LoadSnapshot(arch.ch, parentSnapshot)
		if err != nil {
			return nil, arrar.Annotate(err, "load old snapshot")
		}

		if sn.Tree == nil {
			return nil, errors.New("snapshot without tree!")
		}

		// load old bloblist from snapshot
		if arch.parentBl, err = LoadBlobList(arch.ch, sn.Map); err != nil {
			return nil, err
		}

		if oldTree, err = LoadTree(arch.ch, sn.Tree); err != nil {
			return nil, arrar.Annotate(err, "load old tree")
		}

		debug("old tree: %v\n", oldTree)
	}

	// reset global stats
	arch.updateStats = Stats{}

	fi, err := os.Lstat(path)
	if err != nil {
		return nil, arrar.Annotatef(err, "Lstat(%q)", path)
	}

	node, err := NodeFromFileInfo(path, fi)
	if err != nil {
		return nil, arrar.Annotate(err, "NodeFromFileInfo()")
	}

	if node.Type != "dir" {
		single := &Tree{node}

		// populate with content from oldTree
		if err := arch.populateFromOldTree(*single, oldTree); err != nil {
			return nil, err
		}

		// if no old node has been found, update stats
		if node.Content == nil && node.Subtree == nil {
			arch.Stats.Files = 1
			arch.Stats.Bytes = node.Size
		}

		arch.update(arch.ScannerStats, arch.Stats)

		return single, nil
	}

	arch.Stats.Directories = 1

	// a node that is absent from the old tree just leaves the ID nil
	var oldSubtreeID backend.ID
	switch oldSubtree, err := oldTree.Find(node.Name); {
	case err == nil:
		oldSubtreeID = oldSubtree.Subtree
	case err != ErrNodeNotFound:
		return nil, arrar.Annotate(err, "search node in old tree")
	}

	node.Tree, err = arch.loadTree(path, oldSubtreeID)
	if err != nil {
		return nil, arrar.Annotate(err, "loadTree()")
	}

	arch.update(arch.ScannerStats, arch.Stats)

	return &Tree{node}, nil
}
func (arch *Archiver) saveTree(t *Tree) (Blob, error) { func (arch *Archiver) saveTree(t *Tree) (Blob, error) {
var wg sync.WaitGroup var wg sync.WaitGroup
for _, node := range *t { for _, node := range *t {
if node.Tree != nil && node.Subtree == nil { if node.tree != nil && node.Subtree == nil {
b, err := arch.saveTree(node.Tree) b, err := arch.saveTree(node.tree)
if err != nil { if err != nil {
return Blob{}, err return Blob{}, err
} }
node.Subtree = b.ID node.Subtree = b.ID
arch.update(arch.SaveStats, Stats{Directories: 1}) arch.p.Report(Stat{Dirs: 1})
} else if node.Type == "file" && len(node.Content) == 0 { } else if node.Type == "file" && len(node.Content) == 0 {
// get token // get token
token := <-arch.fileToken token := <-arch.fileToken
@ -478,15 +245,9 @@ func (arch *Archiver) saveTree(t *Tree) (Blob, error) {
arch.fileToken <- token arch.fileToken <- token
}() }()
// TODO: handle error node.err = arch.SaveFile(n)
err := arch.SaveFile(n) arch.p.Report(Stat{Files: 1})
if err != nil {
panic(err)
}
arch.update(arch.SaveStats, Stats{Files: 1})
}(node) }(node)
} else {
arch.update(arch.SaveStats, Stats{Other: 1})
} }
} }
@ -494,9 +255,19 @@ func (arch *Archiver) saveTree(t *Tree) (Blob, error) {
// check for invalid file nodes // check for invalid file nodes
for _, node := range *t { for _, node := range *t {
if node.Type == "file" && node.Content == nil { if node.Type == "file" && node.Content == nil && node.err == nil {
return Blob{}, fmt.Errorf("node %v has empty content", node.Name) return Blob{}, fmt.Errorf("node %v has empty content", node.Name)
} }
if node.err != nil {
err := arch.Error(node.path, nil, node.err)
if err != nil {
return Blob{}, err
}
// save error message in node
node.Error = node.err.Error()
}
} }
blob, err := arch.SaveJSON(backend.Tree, t) blob, err := arch.SaveJSON(backend.Tree, t)
@ -508,8 +279,8 @@ func (arch *Archiver) saveTree(t *Tree) (Blob, error) {
} }
func (arch *Archiver) Snapshot(dir string, t *Tree, parentSnapshot backend.ID) (*Snapshot, backend.ID, error) { func (arch *Archiver) Snapshot(dir string, t *Tree, parentSnapshot backend.ID) (*Snapshot, backend.ID, error) {
// reset global stats arch.p.Start()
arch.updateStats = Stats{} defer arch.p.Done()
sn, err := NewSnapshot(dir) sn, err := NewSnapshot(dir)
if err != nil { if err != nil {

View file

@ -3,6 +3,7 @@ package main
import ( import (
"fmt" "fmt"
"os" "os"
"path/filepath"
"strings" "strings"
"time" "time"
@ -40,7 +41,7 @@ func format_bytes(c uint64) string {
} }
} }
func format_duration(sec uint64) string { func format_seconds(sec uint64) string {
hours := sec / 3600 hours := sec / 3600
sec -= hours * 3600 sec -= hours * 3600
min := sec / 60 min := sec / 60
@ -52,11 +53,16 @@ func format_duration(sec uint64) string {
return fmt.Sprintf("%d:%02d", min, sec) return fmt.Sprintf("%d:%02d", min, sec)
} }
// format_duration truncates d to whole seconds and formats the result with
// format_seconds.
func format_duration(d time.Duration) string {
	return format_seconds(uint64(d / time.Second))
}
func print_tree2(indent int, t *restic.Tree) { func print_tree2(indent int, t *restic.Tree) {
for _, node := range *t { for _, node := range *t {
if node.Tree != nil { if node.Tree() != nil {
fmt.Printf("%s%s/\n", strings.Repeat(" ", indent), node.Name) fmt.Printf("%s%s/\n", strings.Repeat(" ", indent), node.Name)
print_tree2(indent+1, node.Tree) print_tree2(indent+1, node.Tree())
} else { } else {
fmt.Printf("%s%s\n", strings.Repeat(" ", indent), node.Name) fmt.Printf("%s%s\n", strings.Repeat(" ", indent), node.Name)
} }
@ -89,27 +95,16 @@ func (cmd CmdBackup) Execute(args []string) error {
fmt.Printf("found parent snapshot %v\n", parentSnapshotID) fmt.Printf("found parent snapshot %v\n", parentSnapshotID)
} }
arch, err := restic.NewArchiver(s) fmt.Printf("scan %s\n", target)
if err != nil {
fmt.Fprintf(os.Stderr, "err: %v\n", err)
}
arch.Error = func(dir string, fi os.FileInfo, err error) error {
// TODO: make ignoring errors configurable
fmt.Fprintf(os.Stderr, "\nerror for %s: %v\n%v\n", dir, err, fi)
return nil
}
fmt.Printf("scanning %s\n", target)
scanProgress := restic.NewProgress(time.Second)
if terminal.IsTerminal(int(os.Stdout.Fd())) { if terminal.IsTerminal(int(os.Stdout.Fd())) {
ch := make(chan restic.Stats, 20) scanProgress.F = func(s restic.Stat, d time.Duration, ticker bool) {
arch.ScannerStats = ch fmt.Printf("\x1b[2K\r[%s] %d directories, %d files, %s", format_duration(d), s.Dirs, s.Files, format_bytes(s.Bytes))
}
go func(ch <-chan restic.Stats) { scanProgress.D = func(s restic.Stat, d time.Duration, ticker bool) {
for stats := range ch { fmt.Printf("\nDone in %s\n", format_duration(d))
fmt.Printf("\r%6d directories, %6d files, %14s", stats.Directories, stats.Files, format_bytes(stats.Bytes)) }
}
}(ch)
} }
// TODO: add filter // TODO: add filter
@ -117,83 +112,86 @@ func (cmd CmdBackup) Execute(args []string) error {
// return true // return true
// } // }
t, err := arch.LoadTree(target, parentSnapshotID) sc := restic.NewScanner(scanProgress)
newTree, err := sc.Scan(target)
if err != nil { if err != nil {
fmt.Fprintf(os.Stderr, "error: %v\n", err) fmt.Fprintf(os.Stderr, "error: %v\n", err)
return err return err
} }
fmt.Printf("\r%6d directories, %6d files, %14s\n", arch.Stats.Directories, arch.Stats.Files, format_bytes(arch.Stats.Bytes)) if parentSnapshotID != nil {
fmt.Printf("load old snapshot\n")
ch := restic.NewContentHandler(s)
sn, err := ch.LoadSnapshot(parentSnapshotID)
if err != nil {
return err
}
oldTree, err := restic.LoadTreeRecursive(filepath.Dir(sn.Dir), ch, sn.Tree)
if err != nil {
return err
}
newTree.CopyFrom(oldTree)
}
archiveProgress := restic.NewProgress(time.Second)
targetStat := newTree.StatTodo()
stats := restic.Stats{}
start := time.Now()
if terminal.IsTerminal(int(os.Stdout.Fd())) { if terminal.IsTerminal(int(os.Stdout.Fd())) {
ch := make(chan restic.Stats, 20) var bps, eta uint64
arch.SaveStats = ch itemsTodo := targetStat.Files + targetStat.Dirs
ticker := time.NewTicker(time.Second) archiveProgress.F = func(s restic.Stat, d time.Duration, ticker bool) {
var eta, bps uint64 sec := uint64(d / time.Second)
if targetStat.Bytes > 0 && sec > 0 && ticker {
go func(ch <-chan restic.Stats) { bps = s.Bytes / sec
if bps > 0 {
status := func(sec uint64) { eta = (targetStat.Bytes - s.Bytes) / bps
fmt.Printf("\x1b[2K\r[%s] %3.2f%% %s/s %s / %s ETA %s",
format_duration(sec),
float64(stats.Bytes)/float64(arch.Stats.Bytes)*100,
format_bytes(bps),
format_bytes(stats.Bytes), format_bytes(arch.Stats.Bytes),
format_duration(eta))
}
defer ticker.Stop()
for {
select {
case s, ok := <-ch:
if !ok {
return
}
stats.Files += s.Files
stats.Directories += s.Directories
stats.Other += s.Other
stats.Bytes += s.Bytes
status(uint64(time.Since(start) / time.Second))
case <-ticker.C:
sec := uint64(time.Since(start) / time.Second)
bps = stats.Bytes / sec
if bps > 0 {
eta = (arch.Stats.Bytes - stats.Bytes) / bps
}
status(sec)
} }
} }
}(ch)
itemsDone := s.Files + s.Dirs
fmt.Printf("\x1b[2K\r[%s] %3.2f%% %s/s %s / %s %d / %d items ETA %s",
format_duration(d),
float64(s.Bytes)/float64(targetStat.Bytes)*100,
format_bytes(bps),
format_bytes(s.Bytes), format_bytes(targetStat.Bytes),
itemsDone, itemsTodo,
format_seconds(eta))
}
archiveProgress.D = func(s restic.Stat, d time.Duration, ticker bool) {
sec := uint64(d / time.Second)
fmt.Printf("\nduration: %s, %.2fMiB/s\n",
format_duration(d),
float64(targetStat.Bytes)/float64(sec)/(1<<20))
}
} }
_, id, err := arch.Snapshot(target, t, parentSnapshotID) arch, err := restic.NewArchiver(s, archiveProgress)
if err != nil {
fmt.Fprintf(os.Stderr, "err: %v\n", err)
}
arch.Error = func(dir string, fi os.FileInfo, err error) error {
// TODO: make ignoring errors configurable
fmt.Fprintf(os.Stderr, "\nerror for %s: %v\n", dir, err)
return nil
}
_, id, err := arch.Snapshot(target, newTree, parentSnapshotID)
if err != nil { if err != nil {
fmt.Fprintf(os.Stderr, "error: %v\n", err) fmt.Fprintf(os.Stderr, "error: %v\n", err)
} }
if terminal.IsTerminal(int(os.Stdout.Fd())) {
// close channels so that the goroutines terminate
close(arch.SaveStats)
close(arch.ScannerStats)
}
plen, err := s.PrefixLength(backend.Snapshot) plen, err := s.PrefixLength(backend.Snapshot)
if err != nil { if err != nil {
return err return err
} }
fmt.Printf("\nsnapshot %s saved\n", id[:plen]) fmt.Printf("snapshot %s saved\n", id[:plen])
sec := uint64(time.Since(start) / time.Second)
fmt.Printf("duration: %s, %.2fMiB/s\n",
format_duration(sec),
float64(arch.Stats.Bytes)/float64(sec)/(1<<20))
return nil return nil
} }

View file

@ -53,10 +53,7 @@ func (cmd CmdCat) Execute(args []string) error {
} }
} }
ch, err := restic.NewContentHandler(s) ch := restic.NewContentHandler(s)
if err != nil {
return err
}
switch tpe { switch tpe {
case "blob": case "blob":

View file

@ -113,11 +113,7 @@ func (c CmdFind) findInTree(ch *restic.ContentHandler, id backend.ID, path strin
func (c CmdFind) findInSnapshot(s restic.Server, id backend.ID) error { func (c CmdFind) findInSnapshot(s restic.Server, id backend.ID) error {
debug("searching in snapshot %s\n for entries within [%s %s]", id, c.oldest, c.newest) debug("searching in snapshot %s\n for entries within [%s %s]", id, c.oldest, c.newest)
ch, err := restic.NewContentHandler(s) ch := restic.NewContentHandler(s)
if err != nil {
return err
}
sn, err := ch.LoadSnapshot(id) sn, err := ch.LoadSnapshot(id)
if err != nil { if err != nil {
return err return err

View file

@ -99,7 +99,7 @@ func fsckTree(opts CmdFsck, ch *restic.ContentHandler, id backend.ID) error {
switch node.Type { switch node.Type {
case "file": case "file":
if node.Content == nil { if node.Content == nil && node.Error == "" {
return fmt.Errorf("file node %q of tree %v has no content", node.Name, id) return fmt.Errorf("file node %q of tree %v has no content", node.Name, id)
} }
@ -125,10 +125,7 @@ func fsckTree(opts CmdFsck, ch *restic.ContentHandler, id backend.ID) error {
func fsck_snapshot(opts CmdFsck, s restic.Server, id backend.ID) error { func fsck_snapshot(opts CmdFsck, s restic.Server, id backend.ID) error {
debug("checking snapshot %v\n", id) debug("checking snapshot %v\n", id)
ch, err := restic.NewContentHandler(s) ch := restic.NewContentHandler(s)
if err != nil {
return err
}
sn, err := ch.LoadSnapshot(id) sn, err := ch.LoadSnapshot(id)
if err != nil { if err != nil {

View file

@ -78,7 +78,7 @@ func (cmd CmdLs) Execute(args []string) error {
return err return err
} }
ch, err := restic.NewContentHandler(s) ch := restic.NewContentHandler(s)
if err != nil { if err != nil {
return err return err
} }

View file

@ -97,10 +97,7 @@ func (cmd CmdSnapshots) Execute(args []string) error {
return err return err
} }
ch, err := restic.NewContentHandler(s) ch := restic.NewContentHandler(s)
if err != nil {
return err
}
tab := NewTable() tab := NewTable()
tab.Header = fmt.Sprintf("%-8s %-19s %-10s %s", "ID", "Date", "Source", "Directory") tab.Header = fmt.Sprintf("%-8s %-19s %-10s %s", "ID", "Date", "Source", "Directory")

View file

@ -17,13 +17,13 @@ type ContentHandler struct {
} }
// NewContentHandler creates a new content handler. // NewContentHandler creates a new content handler.
func NewContentHandler(s Server) (*ContentHandler, error) { func NewContentHandler(s Server) *ContentHandler {
ch := &ContentHandler{ ch := &ContentHandler{
s: s, s: s,
bl: NewBlobList(), bl: NewBlobList(),
} }
return ch, nil return ch
} }
// LoadSnapshot adds all blobs from a snapshot into the content handler and returns the snapshot. // LoadSnapshot adds all blobs from a snapshot into the content handler and returns the snapshot.
@ -243,3 +243,8 @@ func (ch *ContentHandler) Test(t backend.Type, id backend.ID) (bool, error) {
return ch.s.Test(t, id) return ch.s.Test(t, id)
} }
// BlobList returns the current BlobList.
func (ch *ContentHandler) BlobList() *BlobList {
return ch.bl
}

185
progress.go Normal file
View file

@ -0,0 +1,185 @@
package restic
import (
"fmt"
"sync"
"time"
)
// Progress accumulates statistics passed to Report and hands them to
// user-supplied callbacks: F on every Report and on every tick of an internal
// ticker, D once when Done is called. A nil *Progress is accepted by Start,
// Report, Reset and Done, which then do nothing.
type Progress struct {
	// F is called with the accumulated statistics from Report and from the ticker; may be nil.
	F ProgressFunc
	// D is called once from Done with the final statistics; may be nil.
	D ProgressFunc
	// fnM is held while F or D runs, so the callbacks never run concurrently.
	fnM sync.Mutex
	// cur holds the statistics accumulated since the last Reset.
	cur Stat
	// curM is taken around accesses to cur.
	curM sync.Mutex
	// start is the time at which Start was called.
	start time.Time
	// c is the ticker that drives the periodic calls of F.
	c *time.Ticker
	// cancel is closed by Done to terminate the reporter goroutine.
	cancel chan struct{}
	// o makes sure cancel is closed only once per Start.
	o sync.Once
	// d is the ticker interval passed to NewProgress.
	d time.Duration
	// running is set by Start and cleared by Done.
	running bool
}
// Stat is the set of counters tracked by a Progress: files, directories and
// bytes. Values are combined with Add.
type Stat struct {
	Files uint64
	Dirs uint64
	Bytes uint64
}
// ProgressFunc is the callback type used by Progress. s is the accumulated
// statistics, runtime the time elapsed since Start, and ticker is true when
// the call was triggered by the periodic ticker rather than by Report or Done.
type ProgressFunc func(s Stat, runtime time.Duration, ticker bool)
// NewProgress returns a new progress reporter that uses d as its ticker
// interval. After Start() has been called, the function F is called whenever
// Report() delivers new data and additionally every d interval. The function
// D is called when Done() is called. F and D are never run concurrently with
// each other and can therefore use shared state.
func NewProgress(d time.Duration) *Progress {
	p := new(Progress)
	p.d = d
	return p
}
// Start resets the counters, records the start time and launches the reporter
// goroutine, which calls F on every tick until Done is called. Calling Start
// on a nil *Progress is a no-op. Start panics when the Progress is already
// running; time.NewTicker additionally panics if the interval given to
// NewProgress is not positive.
func (p *Progress) Start() {
	if p == nil {
		return
	}

	if p.running {
		panic("trying to reset a running Progress")
	}

	// fresh Once and channel so a Progress can be started again after Done
	p.o = sync.Once{}
	p.cancel = make(chan struct{})
	// Reset panics on a non-running Progress, so mark it running first
	p.running = true
	p.Reset()
	p.start = time.Now()
	p.c = time.NewTicker(p.d)

	go p.reporter()
}
// Report adds the statistics from s to the current state and, if F is set,
// calls it with the accumulated statistics and ticker set to false. Calling
// Report on a nil *Progress is a no-op; it panics when the Progress is not
// running.
func (p *Progress) Report(s Stat) {
	if p == nil {
		return
	}

	if !p.running {
		panic("reporting in a non-running Progress")
	}

	// accumulate and take a private copy while holding the lock
	p.curM.Lock()
	p.cur.Add(s)
	snapshot := p.cur
	p.curM.Unlock()

	if p.F == nil {
		return
	}

	// update progress
	p.fnM.Lock()
	p.F(snapshot, time.Since(p.start), false)
	p.fnM.Unlock()
}
// reporter is the loop run in the goroutine started by Start. On every tick
// of p.c it calls F (if set) with the current statistics and ticker set to
// true. When p.cancel is closed it stops the ticker and returns.
func (p *Progress) reporter() {
	if p == nil {
		return
	}

	for {
		select {
		case <-p.cancel:
			p.c.Stop()
			return
		case <-p.c.C:
		}

		// a tick arrived: copy the counters under the lock, then notify
		p.curM.Lock()
		snapshot := p.cur
		p.curM.Unlock()

		if p.F == nil {
			continue
		}

		p.fnM.Lock()
		p.F(snapshot, time.Since(p.start), true)
		p.fnM.Unlock()
	}
}
// Reset sets all statistic counters back to zero. Calling Reset on a nil
// *Progress is a no-op; it panics when the Progress is not running.
func (p *Progress) Reset() {
	switch {
	case p == nil:
		return
	case !p.running:
		panic("resetting a non-running Progress")
	}

	var zero Stat

	p.curM.Lock()
	p.cur = zero
	p.curM.Unlock()
}
// Done closes the progress report: it marks the Progress as stopped, closes
// the cancel channel (once) so the reporter goroutine terminates, and calls D,
// if set, with the final statistics and ticker set to false. Calling Done on a
// nil *Progress is a no-op; it panics when the Progress is not running.
func (p *Progress) Done() {
	if p == nil {
		return
	}

	if !p.running {
		panic("Done() called on non-running Progress")
	}

	p.running = false
	p.o.Do(func() {
		close(p.cancel)
	})

	// read the final counters under the same lock every other reader uses
	p.curM.Lock()
	cur := p.cur
	p.curM.Unlock()

	if p.D != nil {
		p.fnM.Lock()
		p.D(cur, time.Since(p.start), false)
		p.fnM.Unlock()
	}
}
// Current returns a copy of the statistics accumulated so far. Like the other
// methods of Progress it accepts a nil receiver, for which it returns the zero
// Stat.
func (p *Progress) Current() Stat {
	if p == nil {
		return Stat{}
	}

	p.curM.Lock()
	s := p.cur
	p.curM.Unlock()

	return s
}
// Add adds each counter of other to the matching counter of s.
func (s *Stat) Add(other Stat) {
	s.Files = s.Files + other.Files
	s.Dirs = s.Dirs + other.Dirs
	s.Bytes = s.Bytes + other.Bytes
}
// String renders s as "Stat(<files> files, <dirs> dirs, <size>)". The size is
// printed in the largest binary unit (KiB, MiB, GiB, TiB) whose threshold the
// byte count reaches, with three decimals; values below 1 KiB are printed as
// a plain byte count.
func (s Stat) String() string {
	b := float64(s.Bytes)
	var str string

	// thresholds are inclusive so that an exact power, e.g. 1<<20 bytes, is
	// shown as "1.000 MiB" rather than "1024.000 KiB"
	switch {
	case s.Bytes >= 1<<40:
		str = fmt.Sprintf("%.3f TiB", b/(1<<40))
	case s.Bytes >= 1<<30:
		str = fmt.Sprintf("%.3f GiB", b/(1<<30))
	case s.Bytes >= 1<<20:
		str = fmt.Sprintf("%.3f MiB", b/(1<<20))
	case s.Bytes >= 1<<10:
		str = fmt.Sprintf("%.3f KiB", b/(1<<10))
	default:
		str = fmt.Sprintf("%dB", s.Bytes)
	}

	return fmt.Sprintf("Stat(%d files, %d dirs, %v)",
		s.Files, s.Dirs, str)
}

View file

@ -25,10 +25,7 @@ func NewRestorer(s Server, snid backend.ID) (*Restorer, error) {
r := &Restorer{s: s} r := &Restorer{s: s}
var err error var err error
r.ch, err = NewContentHandler(s) r.ch = NewContentHandler(s)
if err != nil {
return nil, arrar.Annotate(err, "create contenthandler for restorer")
}
r.sn, err = r.ch.LoadSnapshot(snid) r.sn, err = r.ch.LoadSnapshot(snid)
if err != nil { if err != nil {

119
scanner.go Normal file
View file

@ -0,0 +1,119 @@
package restic
import (
"os"
"path/filepath"
"github.com/juju/arrar"
)
// FilterFunc decides whether a directory entry is included in a scan. It
// receives the joined path of the entry and its os.FileInfo; scan skips every
// entry for which it returns false.
type FilterFunc func(item string, fi os.FileInfo) bool
// ErrorFunc is the type of the Scanner.Error callback. The default installed
// by NewScanner returns err unchanged.
// NOTE(review): no call to Scanner.Error is visible in this file — confirm
// where (and whether) the callback is consulted.
type ErrorFunc func(dir string, fi os.FileInfo, err error) error
// Scanner walks a path and builds a Tree from what it finds, reporting the
// directories and files it encounters to a Progress.
type Scanner struct {
	// Error is the error callback; NewScanner installs one that returns the error as is.
	Error ErrorFunc
	// Filter selects the entries to include; NewScanner installs one that accepts everything.
	Filter FilterFunc
	// p receives the statistics gathered during Scan.
	p *Progress
}
// NewScanner returns a Scanner that reports to p. Its Error callback returns
// every error unchanged and its Filter accepts every entry.
func NewScanner(p *Progress) *Scanner {
	return &Scanner{
		p: p,
		// abort on all errors
		Error: func(s string, fi os.FileInfo, err error) error { return err },
		// allow all files
		Filter: func(string, os.FileInfo) bool { return true },
	}
}
// scan lists the directory dir and returns a Tree with one node per entry
// accepted by filterFn. Subdirectories are scanned recursively; the result
// and any error of such a nested scan are stored in the node (node.tree and
// node.err) instead of aborting the walk. Afterwards every file node without
// content and every directory node is reported to progress.
func scan(filterFn FilterFunc, progress *Progress, dir string) (*Tree, error) {
	// open and list path
	fd, err := os.Open(dir)
	if err != nil {
		return nil, err
	}
	// register the close only once we know the handle is valid
	defer fd.Close()

	entries, err := fd.Readdir(-1)
	if err != nil {
		return nil, err
	}

	// build new tree
	tree := Tree{}
	for _, entry := range entries {
		path := filepath.Join(dir, entry.Name())

		if !filterFn(path, entry) {
			continue
		}

		node, err := NodeFromFileInfo(path, entry)
		if err != nil {
			// TODO: error processing
			return nil, err
		}

		err = tree.Insert(node)
		if err != nil {
			return nil, err
		}

		if entry.IsDir() {
			// save all errors in node.err, sort out later
			node.tree, node.err = scan(filterFn, progress, path)
		}
	}

	for _, node := range tree {
		// files that already carry content are not reported
		if node.Type == "file" && node.Content != nil {
			continue
		}

		switch node.Type {
		case "file":
			progress.Report(Stat{Files: 1, Bytes: node.Size})
		case "dir":
			progress.Report(Stat{Dirs: 1})
		}
	}

	return &tree, nil
}
// Scan builds the Tree for path. The Progress given to NewScanner is started
// on entry and finished when Scan returns. A path that is not a directory
// results in a tree with a single node that is reported as one file; a
// directory is reported and then walked with scan using sc.Filter.
func (sc *Scanner) Scan(path string) (*Tree, error) {
	sc.p.Start()
	defer sc.p.Done()

	fi, err := os.Lstat(path)
	if err != nil {
		return nil, arrar.Annotatef(err, "Lstat(%q)", path)
	}

	node, err := NodeFromFileInfo(path, fi)
	if err != nil {
		return nil, arrar.Annotate(err, "NodeFromFileInfo()")
	}

	if node.Type != "dir" {
		t := &Tree{node}

		sc.p.Report(Stat{Files: 1, Bytes: node.Size})

		return t, nil
	}

	sc.p.Report(Stat{Dirs: 1})

	node.tree, err = scan(sc.Filter, sc.p, path)
	if err != nil {
		// name the call that actually failed
		return nil, arrar.Annotate(err, "scan()")
	}

	return &Tree{node}, nil
}

110
tree.go
View file

@ -6,6 +6,8 @@ import (
"fmt" "fmt"
"os" "os"
"os/user" "os/user"
"path/filepath"
"reflect"
"sort" "sort"
"strconv" "strconv"
"strings" "strings"
@ -37,9 +39,12 @@ type Node struct {
Content []backend.ID `json:"content"` Content []backend.ID `json:"content"`
Subtree backend.ID `json:"subtree,omitempty"` Subtree backend.ID `json:"subtree,omitempty"`
Tree *Tree `json:"-"` Error string `json:"error,omitempty"`
tree *Tree
path string path string
err error
} }
var ( var (
@ -91,11 +96,34 @@ func LoadTree(ch *ContentHandler, id backend.ID) (Tree, error) {
return tree, nil return tree, nil
} }
// PopulateFrom copies subtrees and content from other when it hasn't changed. // LoadTreeRecursive loads the tree and all subtrees via ch.
func (t Tree) PopulateFrom(other Tree) error { func LoadTreeRecursive(path string, ch *ContentHandler, id backend.ID) (Tree, error) {
// TODO: load subtrees in parallel
tree, err := LoadTree(ch, id)
if err != nil {
return nil, err
}
for _, n := range tree {
n.path = filepath.Join(path, n.Name)
if n.Type == "dir" && n.Subtree != nil {
t, err := LoadTreeRecursive(n.path, ch, n.Subtree)
if err != nil {
return nil, err
}
n.tree = &t
}
}
return tree, nil
}
// CopyFrom recursively copies all content from other to t.
func (t Tree) CopyFrom(other Tree) {
for _, node := range t { for _, node := range t {
// only copy entries for files // only process files and dirs
if node.Type != "file" { if node.Type != "file" && node.Type != "dir" {
continue continue
} }
@ -107,14 +135,32 @@ func (t Tree) PopulateFrom(other Tree) error {
continue continue
} }
// compare content if node.Type == "file" {
if node.SameContent(oldNode) { // compare content
// copy Content if node.SameContent(oldNode) {
node.Content = oldNode.Content // copy Content
node.Content = oldNode.Content
}
} else {
// fill in all subtrees from old subtree
node.tree.CopyFrom(*oldNode.tree)
// check if tree has changed
if node.tree.Equals(*oldNode.tree) {
// if nothing has changed, copy subtree ID
node.Subtree = oldNode.Subtree
}
} }
} }
}
return nil // Equals returns true if t and other have exactly the same nodes.
func (t Tree) Equals(other Tree) bool {
if len(t) != len(other) {
return false
}
return reflect.DeepEqual(t, other)
} }
func (t *Tree) Insert(node *Node) error { func (t *Tree) Insert(node *Node) error {
@ -150,6 +196,50 @@ func (t Tree) Find(name string) (*Node, error) {
return node, err return node, err
} }
// Stat returns the number of file and directory nodes in t and the summed
// size of the file nodes, including the loaded subtrees of directory nodes.
func (t Tree) Stat() Stat {
	var total Stat

	for _, n := range t {
		if n.Type == "file" {
			total.Files++
			total.Bytes += n.Size
			continue
		}

		if n.Type != "dir" {
			continue
		}

		total.Dirs++
		if sub := n.tree; sub != nil {
			total.Add(sub.Stat())
		}
	}

	return total
}
// StatTodo is like Stat but only counts nodes that have no stored result yet:
// file nodes whose Content is nil and directory nodes whose Subtree is nil.
// Directories that already have a Subtree are skipped together with
// everything below them.
func (t Tree) StatTodo() Stat {
	var todo Stat

	for _, n := range t {
		if n.Type == "file" {
			if n.Content == nil {
				todo.Files++
				todo.Bytes += n.Size
			}
			continue
		}

		if n.Type != "dir" || n.Subtree != nil {
			continue
		}

		todo.Dirs++
		if sub := n.tree; sub != nil {
			todo.Add(sub.StatTodo())
		}
	}

	return todo
}
// Tree returns the in-memory subtree attached to the node, or nil if none
// has been set.
func (n Node) Tree() *Tree {
	return n.tree
}
func (node *Node) fill_extra(path string, fi os.FileInfo) (err error) { func (node *Node) fill_extra(path string, fi os.FileInfo) (err error) {
stat, ok := fi.Sys().(*syscall.Stat_t) stat, ok := fi.Sys().(*syscall.Stat_t)
if !ok { if !ok {