Recreate blob cache if missing (closes #104)
This commit is contained in:
parent
05f3e98ed9
commit
3c92c7e689
4 changed files with 147 additions and 27 deletions
40
archiver.go
40
archiver.go
|
@ -76,21 +76,16 @@ func (arch *Archiver) Preload() error {
|
|||
|
||||
// TODO: track seen tree ids, load trees that aren't in the set
|
||||
for _, id := range snapshots {
|
||||
// try to load snapshot blobs from cache
|
||||
rd, err := arch.c.Load(backend.Snapshot, "blobs", id)
|
||||
m, err := arch.c.LoadMap(arch.s, id)
|
||||
if err != nil {
|
||||
debug.Log("Archiver.Preload", "blobs for snapshot %v not cached: %v", id.Str(), err)
|
||||
return err
|
||||
}
|
||||
|
||||
debug.Log("Archiver.Preload", "load cached blobs for snapshot %v", id.Str())
|
||||
dec := json.NewDecoder(rd)
|
||||
|
||||
m := &Map{}
|
||||
err = dec.Decode(m)
|
||||
if err != nil {
|
||||
debug.Log("Archiver.Preload", "error loading cached blobs for snapshot %v: %v", id.Str(), err)
|
||||
continue
|
||||
// build new cache
|
||||
m, err = CacheSnapshotBlobs(arch.s, arch.c, id)
|
||||
if err != nil {
|
||||
debug.Log("Archiver.Preload", "unable to cache snapshot blobs for %v: %v", id.Str(), err)
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
arch.m.Merge(m)
|
||||
|
@ -473,9 +468,9 @@ func (arch *Archiver) dirWorker(wg *sync.WaitGroup, p *Progress, done <-chan str
|
|||
debug.Log("Archiver.dirWorker", "got tree node for %s: %v", node.path, node.blobs)
|
||||
}
|
||||
|
||||
// also store blob in tree map
|
||||
for _, blob := range node.blobs {
|
||||
tree.Map.Insert(blob)
|
||||
arch.m.Insert(blob)
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -761,11 +756,12 @@ func (arch *Archiver) Snapshot(p *Progress, paths []string, pid backend.ID) (*Sn
|
|||
|
||||
// receive the top-level tree
|
||||
root := (<-resCh).(*Node)
|
||||
debug.Log("Archiver.Snapshot", "root node received: %v", root.blobs[0])
|
||||
sn.Tree = root.blobs[0]
|
||||
blob := root.blobs[0]
|
||||
debug.Log("Archiver.Snapshot", "root node received: %v", blob)
|
||||
sn.Tree = blob
|
||||
|
||||
// save snapshot
|
||||
blob, err := arch.s.SaveJSON(backend.Snapshot, sn)
|
||||
blob, err = arch.s.SaveJSON(backend.Snapshot, sn)
|
||||
if err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
|
@ -776,23 +772,13 @@ func (arch *Archiver) Snapshot(p *Progress, paths []string, pid backend.ID) (*Sn
|
|||
debug.Log("Archiver.Snapshot", "saved snapshot %v", blob.Storage.Str())
|
||||
|
||||
// cache blobs
|
||||
wr, err := arch.c.Store(backend.Snapshot, "blobs", blob.Storage)
|
||||
err = arch.c.StoreMap(sn.id, arch.m)
|
||||
if err != nil {
|
||||
debug.Log("Archiver.Snapshot", "unable to cache blobs for snapshot %v: %v", blob.Storage.Str(), err)
|
||||
fmt.Fprintf(os.Stderr, "unable to cache blobs for snapshot %v: %v\n", blob.Storage.Str(), err)
|
||||
return sn, blob.Storage, nil
|
||||
}
|
||||
|
||||
enc := json.NewEncoder(wr)
|
||||
err = enc.Encode(arch.m)
|
||||
if err != nil {
|
||||
debug.Log("Archiver.Snapshot", "error encoding map for snapshot %v: %v", blob.Storage.Str(), err)
|
||||
} else {
|
||||
debug.Log("Archiver.Snapshot", "cached %d blobs for snapshot %v", arch.m.Len(), blob.Storage.Str())
|
||||
}
|
||||
|
||||
wr.Close()
|
||||
|
||||
return sn, blob.Storage, nil
|
||||
}
|
||||
|
||||
|
|
79
cache.go
79
cache.go
|
@ -1,11 +1,13 @@
|
|||
package restic
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"io"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"strings"
|
||||
"sync"
|
||||
|
||||
"github.com/restic/restic/backend"
|
||||
"github.com/restic/restic/debug"
|
||||
|
@ -203,3 +205,80 @@ func (c *Cache) filename(t backend.Type, subtype string, id backend.ID) (string,
|
|||
|
||||
return "", fmt.Errorf("cache not supported for type %v", t)
|
||||
}
|
||||
|
||||
// high-level functions
|
||||
|
||||
// CacheSnapshotBlobs creates a cache of all the blobs used within the
|
||||
// snapshot. It collects all blobs from all trees and saves the resulting map
|
||||
// to the cache and returns the map.
|
||||
func CacheSnapshotBlobs(s Server, c *Cache, id backend.ID) (*Map, error) {
|
||||
debug.Log("CacheSnapshotBlobs", "create cache for snapshot %v", id.Str())
|
||||
|
||||
sn, err := LoadSnapshot(s, id)
|
||||
if err != nil {
|
||||
debug.Log("CacheSnapshotBlobs", "unable to load snapshot %v: %v", id.Str(), err)
|
||||
return nil, err
|
||||
}
|
||||
|
||||
m := NewMap()
|
||||
|
||||
// add top-level node
|
||||
m.Insert(sn.Tree)
|
||||
|
||||
// start walker
|
||||
var wg sync.WaitGroup
|
||||
ch := make(chan WalkTreeJob)
|
||||
|
||||
wg.Add(1)
|
||||
go func() {
|
||||
WalkTree(s, sn.Tree.Storage, nil, ch)
|
||||
wg.Done()
|
||||
}()
|
||||
|
||||
for i := 0; i < maxConcurrencyPreload; i++ {
|
||||
wg.Add(1)
|
||||
go func() {
|
||||
for job := range ch {
|
||||
if job.Tree == nil {
|
||||
continue
|
||||
}
|
||||
debug.Log("CacheSnapshotBlobs", "got job %v", job)
|
||||
m.Merge(job.Tree.Map)
|
||||
}
|
||||
|
||||
wg.Done()
|
||||
}()
|
||||
}
|
||||
|
||||
wg.Wait()
|
||||
|
||||
// save blob list for snapshot
|
||||
return m, c.StoreMap(id, m)
|
||||
}
|
||||
|
||||
func (c *Cache) StoreMap(snid backend.ID, m *Map) error {
|
||||
wr, err := c.Store(backend.Snapshot, "blobs", snid)
|
||||
if err != nil {
|
||||
return nil
|
||||
}
|
||||
defer wr.Close()
|
||||
|
||||
enc := json.NewEncoder(wr)
|
||||
err = enc.Encode(m)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (c *Cache) LoadMap(s Server, snid backend.ID) (*Map, error) {
|
||||
rd, err := c.Load(backend.Snapshot, "blobs", snid)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
m := &Map{}
|
||||
err = json.NewDecoder(rd).Decode(m)
|
||||
return m, err
|
||||
}
|
||||
|
|
45
cache_test.go
Normal file
45
cache_test.go
Normal file
|
@ -0,0 +1,45 @@
|
|||
package restic_test
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"testing"
|
||||
|
||||
"github.com/restic/restic"
|
||||
"github.com/restic/restic/backend"
|
||||
)
|
||||
|
||||
func TestCache(t *testing.T) {
|
||||
be := setupBackend(t)
|
||||
defer teardownBackend(t, be)
|
||||
key := setupKey(t, be, "geheim")
|
||||
server := restic.NewServerWithKey(be, key)
|
||||
|
||||
cache, err := restic.NewCache(server)
|
||||
ok(t, err)
|
||||
|
||||
arch, err := restic.NewArchiver(server)
|
||||
ok(t, err)
|
||||
|
||||
// archive some files, this should automatically cache all blobs from the snapshot
|
||||
_, id, err := arch.Snapshot(nil, []string{*benchArchiveDirectory}, nil)
|
||||
|
||||
// try to load map from cache
|
||||
rd, err := cache.Load(backend.Snapshot, "blobs", id)
|
||||
ok(t, err)
|
||||
|
||||
dec := json.NewDecoder(rd)
|
||||
|
||||
m := &restic.Map{}
|
||||
err = dec.Decode(m)
|
||||
ok(t, err)
|
||||
|
||||
// remove cached blob list
|
||||
ok(t, cache.Purge(backend.Snapshot, "blobs", id))
|
||||
|
||||
// recreate cached blob list
|
||||
m2, err := restic.CacheSnapshotBlobs(server, cache, id)
|
||||
ok(t, err)
|
||||
|
||||
// compare maps
|
||||
assert(t, m.Equals(m2), "Maps are not equal")
|
||||
}
|
10
map.go
10
map.go
|
@ -124,15 +124,25 @@ func (bl *Map) StorageIDs() []backend.ID {
|
|||
}
|
||||
|
||||
func (bl *Map) Equals(other *Map) bool {
|
||||
if bl == nil && other == nil {
|
||||
return true
|
||||
}
|
||||
|
||||
if bl == nil || other == nil {
|
||||
return false
|
||||
}
|
||||
|
||||
bl.m.Lock()
|
||||
defer bl.m.Unlock()
|
||||
|
||||
if len(bl.list) != len(other.list) {
|
||||
debug.Log("Map.Equals", "length does not match: %d != %d", len(bl.list), len(other.list))
|
||||
return false
|
||||
}
|
||||
|
||||
for i := 0; i < len(bl.list); i++ {
|
||||
if bl.list[i].Compare(other.list[i]) != 0 {
|
||||
debug.Log("Map.Equals", "entry %d does not match: %v != %v", i, bl.list[i], other.list[i])
|
||||
return false
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in a new issue