Refactor cache refresh for blobs, add progress

This commit is contained in:
Alexander Neumann 2015-03-22 14:41:51 +01:00
parent 702cf3c2ff
commit cfa2229bc0
4 changed files with 89 additions and 14 deletions

View file

@ -66,6 +66,11 @@ func NewArchiver(s Server) (*Archiver, error) {
return arch, nil
}
// Cache returns the current cache for the Archiver.
func (arch *Archiver) Cache() *Cache {
return arch.c
}
// Preload loads all blobs for all cached snapshots.
func (arch *Archiver) Preload() error {
// list snapshots first
@ -79,17 +84,10 @@ func (arch *Archiver) Preload() error {
m, err := arch.c.LoadMap(arch.s, id)
if err != nil {
debug.Log("Archiver.Preload", "blobs for snapshot %v not cached: %v", id.Str(), err)
// build new cache
m, err = CacheSnapshotBlobs(arch.s, arch.c, id)
if err != nil {
debug.Log("Archiver.Preload", "unable to cache snapshot blobs for %v: %v", id.Str(), err)
return err
}
continue
}
arch.m.Merge(m)
debug.Log("Archiver.Preload", "done loading cached blobs for snapshot %v", id.Str())
}

View file

@ -208,10 +208,45 @@ func (c *Cache) filename(t backend.Type, subtype string, id backend.ID) (string,
// high-level functions
// CacheSnapshotBlobs creates a cache of all the blobs used within the
// RefreshSnapshots loads the maps for all snapshots and saves them to the local cache.
func (c *Cache) RefreshSnapshots(s Server, p *Progress) error {
defer p.Done()
// list snapshots first
snapshots, err := s.List(backend.Snapshot)
if err != nil {
return err
}
// check that snapshot blobs are cached
for _, id := range snapshots {
has, err := c.Has(backend.Snapshot, "blobs", id)
if err != nil {
return err
}
if has {
continue
}
// else start progress reporting
p.Start()
// build new cache
_, err = cacheSnapshotBlobs(p, s, c, id)
if err != nil {
debug.Log("Cache.RefreshSnapshots", "unable to cache snapshot blobs for %v: %v", id.Str(), err)
return err
}
}
return nil
}
// cacheSnapshotBlobs creates a cache of all the blobs used within the
// snapshot. It collects all blobs from all trees and saves the resulting map
// to the cache and returns the map.
func CacheSnapshotBlobs(s Server, c *Cache, id backend.ID) (*Map, error) {
func cacheSnapshotBlobs(p *Progress, s Server, c *Cache, id backend.ID) (*Map, error) {
debug.Log("CacheSnapshotBlobs", "create cache for snapshot %v", id.Str())
sn, err := LoadSnapshot(s, id)
@ -225,6 +260,8 @@ func CacheSnapshotBlobs(s Server, c *Cache, id backend.ID) (*Map, error) {
// add top-level node
m.Insert(sn.Tree)
p.Report(Stat{Trees: 1})
// start walker
var wg sync.WaitGroup
ch := make(chan WalkTreeJob)
@ -242,6 +279,7 @@ func CacheSnapshotBlobs(s Server, c *Cache, id backend.ID) (*Map, error) {
if job.Tree == nil {
continue
}
p.Report(Stat{Trees: 1})
debug.Log("CacheSnapshotBlobs", "got job %v", job)
m.Merge(job.Tree.Map)
}

View file

@ -36,8 +36,22 @@ func TestCache(t *testing.T) {
// remove cached blob list
ok(t, cache.Purge(backend.Snapshot, "blobs", id))
// load map from cache again, this should fail
rd, err = cache.Load(backend.Snapshot, "blobs", id)
assert(t, err != nil, "Expected failure did not occur")
// recreate cached blob list
m2, err := restic.CacheSnapshotBlobs(server, cache, id)
err = cache.RefreshSnapshots(server, nil)
ok(t, err)
// load map from cache again
rd, err = cache.Load(backend.Snapshot, "blobs", id)
ok(t, err)
dec = json.NewDecoder(rd)
m2 := &restic.Map{}
err = dec.Decode(m2)
ok(t, err)
// compare maps

View file

@ -75,6 +75,26 @@ func (cmd CmdBackup) Usage() string {
return "DIR/FILE [snapshot-ID]"
}
func newCacheRefreshProgress() *restic.Progress {
p := restic.NewProgress(time.Second)
p.OnStart = func() {
fmt.Printf("refreshing cache\n")
}
if !terminal.IsTerminal(int(os.Stdout.Fd())) {
return p
}
p.OnUpdate = func(s restic.Stat, d time.Duration, ticker bool) {
fmt.Printf("\x1b[2K[%s] %d trees loaded\r", format_duration(d), s.Trees)
}
p.OnDone = func(s restic.Stat, d time.Duration, ticker bool) {
fmt.Printf("\x1b[2Krefreshed cache in %s\n", format_duration(d))
}
return p
}
func newScanProgress() *restic.Progress {
if !terminal.IsTerminal(int(os.Stdout.Fd())) {
return nil
@ -82,10 +102,10 @@ func newScanProgress() *restic.Progress {
p := restic.NewProgress(time.Second)
p.OnUpdate = func(s restic.Stat, d time.Duration, ticker bool) {
fmt.Printf("\x1b[2K\r[%s] %d directories, %d files, %s", format_duration(d), s.Dirs, s.Files, format_bytes(s.Bytes))
fmt.Printf("\x1b[2K[%s] %d directories, %d files, %s\r", format_duration(d), s.Dirs, s.Files, format_bytes(s.Bytes))
}
p.OnDone = func(s restic.Stat, d time.Duration, ticker bool) {
fmt.Printf("\nDone in %s\n", format_duration(d))
fmt.Printf("\x1b[2Kscanned %d directories, %d files in %s\n", s.Dirs, s.Files, format_duration(d))
}
return p
@ -134,7 +154,7 @@ func newArchiveProgress(todo restic.Stat) *restic.Progress {
}
}
fmt.Printf("\x1b[2K\r%s%s", status1, status2)
fmt.Printf("\x1b[2K%s%s\r", status1, status2)
}
archiveProgress.OnDone = func(s restic.Stat, d time.Duration, ticker bool) {
@ -196,6 +216,11 @@ func (cmd CmdBackup) Execute(args []string) error {
return nil
}
err = arch.Cache().RefreshSnapshots(s, newCacheRefreshProgress())
if err != nil {
return err
}
fmt.Printf("loading blobs\n")
err = arch.Preload()
if err != nil {