Cache blobs for each snapshot

This commit is contained in:
Alexander Neumann 2015-03-09 22:26:39 +01:00
parent b2cce7f051
commit 14298fe232
6 changed files with 98 additions and 136 deletions

View file

@ -29,6 +29,7 @@ const (
type Archiver struct {
s Server
m *Map
c *Cache
blobToken chan struct{}
@ -51,6 +52,12 @@ func NewArchiver(s Server) (*Archiver, error) {
// create new map to store all blobs in
arch.m = NewMap()
// init cache
arch.c, err = NewCache()
if err != nil {
return nil, err
}
// abort on all errors
arch.Error = func(string, os.FileInfo, error) error { return err }
// allow all files
@ -59,81 +66,40 @@ func NewArchiver(s Server) (*Archiver, error) {
return arch, nil
}
// Preload loads all tree objects from repository and adds all blobs that are
// still available to the map for deduplication.
func (arch *Archiver) Preload(p *Progress) error {
cache, err := NewCache()
// Preload loads all blobs for all cached snapshots.
func (arch *Archiver) Preload() error {
// list snapshots first
snapshots, err := arch.s.List(backend.Snapshot)
if err != nil {
return err
}
p.Start()
defer p.Done()
debug.Log("Archiver.Preload", "Start loading known blobs")
// load all trees, in parallel
worker := func(wg *sync.WaitGroup, c <-chan backend.ID) {
for id := range c {
var tree *Tree
// load from cache
var t Tree
rd, err := cache.Load(backend.Tree, id)
if err == nil {
debug.Log("Archiver.Preload", "tree %v cached", id.Str())
tree = &t
dec := json.NewDecoder(rd)
err = dec.Decode(&t)
if err != nil {
continue
}
} else {
debug.Log("Archiver.Preload", "tree %v not cached: %v", id.Str(), err)
tree, err = LoadTree(arch.s, id)
// ignore error and advance to next tree
if err != nil {
continue
}
}
debug.Log("Archiver.Preload", "load tree %v with %d blobs", id, tree.Map.Len())
arch.m.Merge(tree.Map)
p.Report(Stat{Trees: 1, Blobs: uint64(tree.Map.Len())})
// TODO: track seen tree ids, load trees that aren't in the set
for _, id := range snapshots {
// try to load snapshot blobs from cache
rd, err := arch.c.Load(backend.Snapshot, "blobs", id)
if err != nil {
debug.Log("Archiver.Preload", "blobs for snapshot %v not cached: %v", id.Str(), err)
return err
}
wg.Done()
debug.Log("Archiver.Preload", "load cached blobs for snapshot %v", id.Str())
dec := json.NewDecoder(rd)
m := &Map{}
err = dec.Decode(m)
if err != nil {
debug.Log("Archiver.Preload", "error loading cached blobs for snapshot %v: %v", id.Str(), err)
continue
}
arch.m.Merge(m)
debug.Log("Archiver.Preload", "done loading cached blobs for snapshot %v", id.Str())
}
idCh := make(chan backend.ID)
// start workers
var wg sync.WaitGroup
for i := 0; i < maxConcurrencyPreload; i++ {
wg.Add(1)
go worker(&wg, idCh)
}
// list ids
trees := 0
err = arch.s.EachID(backend.Tree, func(id backend.ID) {
trees++
if trees%1000 == 0 {
debug.Log("Archiver.Preload", "Loaded %v trees", trees)
}
idCh <- id
})
close(idCh)
// wait for workers
wg.Wait()
debug.Log("Archiver.Preload", "Loaded %v blobs from %v trees", arch.m.Len(), trees)
return err
debug.Log("Archiver.Preload", "Loaded %v blobs from %v snapshots", arch.m.Len(), len(snapshots))
return nil
}
func (arch *Archiver) Save(t backend.Type, id backend.ID, length uint, rd io.Reader) (Blob, error) {
@ -787,7 +753,7 @@ func (arch *Archiver) Snapshot(p *Progress, paths []string, pid backend.ID) (*Sn
// receive the top-level tree
root := (<-resCh).(*Node)
debug.Log("Archiver.Snapshot", "root node received: %#v", root.blobs[0])
debug.Log("Archiver.Snapshot", "root node received: %v", root.blobs[0])
sn.Tree = root.blobs[0]
// save snapshot
@ -796,6 +762,26 @@ func (arch *Archiver) Snapshot(p *Progress, paths []string, pid backend.ID) (*Sn
return nil, nil, err
}
debug.Log("Archiver.Snapshot", "saved snapshot %v", blob.Storage.Str())
// cache blobs
wr, err := arch.c.Store(backend.Snapshot, "blobs", blob.Storage)
if err != nil {
debug.Log("Archiver.Snapshot", "unable to cache blobs for snapshot %v: %v", blob.Storage.Str(), err)
fmt.Fprintf(os.Stderr, "unable to cache blobs for snapshot %v: %v\n", blob.Storage.Str(), err)
return sn, blob.Storage, nil
}
enc := json.NewEncoder(wr)
err = enc.Encode(arch.m)
if err != nil {
debug.Log("Archiver.Snapshot", "error encoding map for snapshot %v: %v", blob.Storage.Str(), err)
} else {
debug.Log("Archiver.Snapshot", "cached %d blobs for snapshot %v", arch.m.Len(), blob.Storage.Str())
}
wr.Close()
return sn, blob.Storage, nil
}

View file

@ -146,7 +146,7 @@ func BenchmarkArchiveDirectory(b *testing.B) {
func snapshot(t testing.TB, server restic.Server, path string) *restic.Snapshot {
arch, err := restic.NewArchiver(server)
ok(t, err)
ok(t, arch.Preload(nil))
ok(t, arch.Preload())
sn, _, err := arch.Snapshot(nil, []string{path}, nil)
ok(t, err)
return sn
@ -226,7 +226,7 @@ func BenchmarkPreload(t *testing.B) {
// create new archiver and preload
arch2, err := restic.NewArchiver(server)
ok(t, err)
ok(t, arch2.Preload(nil))
ok(t, arch2.Preload())
}
}

View file

@ -9,12 +9,15 @@ import (
"github.com/restic/restic/backend"
)
// for testing
var getCacheDir = GetCacheDir
type Cache struct {
base string
}
func NewCache() (*Cache, error) {
dir, err := GetCacheDir()
dir, err := getCacheDir()
if err != nil {
return nil, err
}
@ -22,9 +25,9 @@ func NewCache() (*Cache, error) {
return &Cache{base: dir}, nil
}
func (c *Cache) Has(t backend.Type, id backend.ID) (bool, error) {
func (c *Cache) Has(t backend.Type, subtype string, id backend.ID) (bool, error) {
// try to open file
filename, err := c.filename(t, id)
filename, err := c.filename(t, subtype, id)
if err != nil {
return false, err
}
@ -42,31 +45,29 @@ func (c *Cache) Has(t backend.Type, id backend.ID) (bool, error) {
return true, nil
}
func (c *Cache) Store(t backend.Type, id backend.ID, rd io.Reader) error {
filename, err := c.filename(t, id)
func (c *Cache) Store(t backend.Type, subtype string, id backend.ID) (io.WriteCloser, error) {
filename, err := c.filename(t, subtype, id)
if err != nil {
return err
return nil, err
}
dirname := filepath.Dir(filename)
err = os.MkdirAll(dirname, 0700)
if err != nil {
return err
return nil, err
}
file, err := os.Create(filename)
defer file.Close()
if err != nil {
return err
return nil, err
}
_, err = io.Copy(file, rd)
return err
return file, nil
}
func (c *Cache) Load(t backend.Type, id backend.ID) (io.ReadCloser, error) {
func (c *Cache) Load(t backend.Type, subtype string, id backend.ID) (io.ReadCloser, error) {
// try to open file
filename, err := c.filename(t, id)
filename, err := c.filename(t, subtype, id)
if err != nil {
return nil, err
}
@ -75,17 +76,17 @@ func (c *Cache) Load(t backend.Type, id backend.ID) (io.ReadCloser, error) {
}
// Construct file name for given Type.
func (c *Cache) filename(t backend.Type, id backend.ID) (string, error) {
cachedir, err := GetCacheDir()
if err != nil {
return "", err
func (c *Cache) filename(t backend.Type, subtype string, id backend.ID) (string, error) {
filename := id.String()
if subtype != "" {
filename += "." + subtype
}
switch t {
case backend.Snapshot:
return filepath.Join(cachedir, "snapshots", id.String()), nil
return filepath.Join(c.base, "snapshots", filename), nil
case backend.Tree:
return filepath.Join(cachedir, "trees", id.String()), nil
return filepath.Join(c.base, "trees", filename), nil
}
return "", fmt.Errorf("cache not supported for type %v", t)

View file

@ -91,44 +91,6 @@ func newScanProgress() *restic.Progress {
return p
}
func newLoadBlobsProgress(s restic.Server) (*restic.Progress, error) {
if !terminal.IsTerminal(int(os.Stdout.Fd())) {
return nil, nil
}
trees, err := s.Count(backend.Tree)
if err != nil {
return nil, err
}
eta := uint64(0)
tps := uint64(0) // trees per second
p := restic.NewProgress(time.Second)
p.OnUpdate = func(s restic.Stat, d time.Duration, ticker bool) {
sec := uint64(d / time.Second)
if trees > 0 && sec > 0 && ticker {
tps = uint64(s.Trees) / sec
if tps > 0 {
eta = (uint64(trees) - s.Trees) / tps
}
}
fmt.Printf("\x1b[2K\r[%s] %3.2f%% %d trees/s %d / %d trees, %d blobs ETA %s",
format_duration(d),
float64(s.Trees)/float64(trees)*100,
tps,
s.Trees, trees,
s.Blobs,
format_seconds(eta))
}
p.OnDone = func(s restic.Stat, d time.Duration, ticker bool) {
fmt.Printf("\nDone in %s\n", format_duration(d))
}
return p, nil
}
func newArchiveProgress(todo restic.Stat) *restic.Progress {
if !terminal.IsTerminal(int(os.Stdout.Fd())) {
return nil
@ -218,12 +180,7 @@ func (cmd CmdBackup) Execute(args []string) error {
}
fmt.Printf("loading blobs\n")
pb, err := newLoadBlobsProgress(s)
if err != nil {
return err
}
err = arch.Preload(pb)
err = arch.Preload()
if err != nil {
return err
}

View file

@ -2,6 +2,7 @@ package main
import (
"fmt"
"io"
"sync"
"github.com/restic/restic"
@ -49,7 +50,7 @@ func (cmd CmdCache) Execute(args []string) error {
treeCh := make(chan backend.ID)
worker := func(wg *sync.WaitGroup, ch chan backend.ID) {
for treeID := range ch {
cached, err := cache.Has(backend.Tree, treeID)
cached, err := cache.Has(backend.Tree, "", treeID)
if err != nil {
fmt.Printf("tree %v cache error: %v\n", treeID.Str(), err)
continue
@ -72,12 +73,18 @@ func (cmd CmdCache) Execute(args []string) error {
continue
}
err = cache.Store(backend.Tree, treeID, decRd)
wr, err := cache.Store(backend.Tree, "", treeID)
if err != nil {
fmt.Printf(" store error: %v\n", err)
continue
}
_, err = io.Copy(wr, decRd)
if err != nil {
fmt.Printf(" Copy error: %v\n", err)
continue
}
err = decRd.Close()
if err != nil {
fmt.Printf(" close error: %v\n", err)

11
map.go
View file

@ -140,6 +140,17 @@ func (bl *Map) Equals(other *Map) bool {
return true
}
// Each calls f for each blob in the Map. While Each is running, no other
// operation is possible, since a mutex is held for the whole time.
func (bl *Map) Each(f func(blob Blob)) {
bl.m.Lock()
defer bl.m.Unlock()
for _, blob := range bl.list {
f(blob)
}
}
// Select returns a list of of blobs from the plaintext IDs given in list.
func (bl *Map) Select(list backend.IDs) (Blobs, error) {
bl.m.Lock()