Merge branch 'pack-blobs'

Alexander Neumann 2015-04-30 00:45:17 +02:00
commit e8041b0411
35 changed files with 1869 additions and 1234 deletions


@@ -14,6 +14,7 @@ import (
"github.com/restic/restic/backend"
"github.com/restic/restic/chunker"
"github.com/restic/restic/debug"
"github.com/restic/restic/pack"
"github.com/restic/restic/pipe"
"github.com/restic/restic/server"
)
@@ -29,8 +30,6 @@ const (
type Archiver struct {
s *server.Server
m *Map
c *Cache
blobToken chan struct{}
@@ -50,15 +49,6 @@ func NewArchiver(s *server.Server) (*Archiver, error) {
arch.blobToken <- struct{}{}
}
// create new map to store all blobs in
arch.m = NewMap()
// init cache
arch.c, err = NewCache(s)
if err != nil {
return nil, err
}
// abort on all errors
arch.Error = func(string, os.FileInfo, error) error { return err }
// allow all files
@@ -67,119 +57,59 @@ func NewArchiver(s *server.Server) (*Archiver, error) {
return arch, nil
}
// Cache returns the current cache for the Archiver.
func (arch *Archiver) Cache() *Cache {
return arch.c
}
// Preload loads all blobs for all cached snapshots.
func (arch *Archiver) Preload() error {
done := make(chan struct{})
defer close(done)
// list snapshots
// TODO: track seen tree ids, load trees that aren't in the set
snapshots := 0
for name := range arch.s.List(backend.Snapshot, done) {
id, err := backend.ParseID(name)
if err != nil {
debug.Log("Archiver.Preload", "unable to parse name %v as id: %v", name, err)
continue
}
m, err := arch.c.LoadMap(arch.s, id)
if err != nil {
debug.Log("Archiver.Preload", "blobs for snapshot %v not cached: %v", id.Str(), err)
continue
}
arch.m.Merge(m)
debug.Log("Archiver.Preload", "done loading cached blobs for snapshot %v", id.Str())
snapshots++
}
debug.Log("Archiver.Preload", "Loaded %v blobs from %v snapshots", arch.m.Len(), snapshots)
return nil
}
func (arch *Archiver) Save(t backend.Type, id backend.ID, length uint, rd io.Reader) (server.Blob, error) {
func (arch *Archiver) Save(t pack.BlobType, id backend.ID, length uint, rd io.Reader) error {
debug.Log("Archiver.Save", "Save(%v, %v)\n", t, id.Str())
// test if this blob is already known
blob, err := arch.m.FindID(id)
if err == nil {
debug.Log("Archiver.Save", "Save(%v, %v): reusing %v\n", t, id.Str(), blob.Storage.Str())
return blob, nil
if arch.s.Index().Has(id) {
debug.Log("Archiver.Save", "(%v, %v) already saved\n", t, id.Str())
return nil
}
// else encrypt and save data
blob, err = arch.s.SaveFrom(t, id, length, rd)
// store blob in storage map
smapblob := arch.m.Insert(blob)
// if the map has a different storage id for this plaintext blob, use that
// one and remove the other. This happens if the same plaintext blob was
// stored concurrently and finished earlier than this blob.
if blob.Storage.Compare(smapblob.Storage) != 0 {
debug.Log("Archiver.Save", "using other block, removing %v\n", blob.Storage.Str())
// remove the blob again
// TODO: implement a list of blobs in transport, so this doesn't happen so often
err = arch.s.Remove(t, blob.Storage.String())
if err != nil {
return server.Blob{}, err
}
// otherwise save blob
err := arch.s.SaveFrom(t, id, length, rd)
if err != nil {
debug.Log("Archiver.Save", "Save(%v, %v): error %v\n", t, id.Str(), err)
return err
}
debug.Log("Archiver.Save", "Save(%v, %v): new blob %v\n", t, id.Str(), blob)
return smapblob, nil
debug.Log("Archiver.Save", "Save(%v, %v): new blob\n", t, id.Str())
return nil
}
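
The new Save replaces the old in-memory Map with the repository index: a blob's plaintext hash is its identity, so a single membership test decides whether the bytes are written at all. A minimal sketch of this deduplication pattern, with illustrative names rather than restic's API:

    import "crypto/sha256"

    type blobID [sha256.Size]byte

    // saveOnce writes data through store only if its content hash is not
    // yet known to idx; idx plays the role of s.Index() above.
    func saveOnce(idx map[blobID]struct{}, store func(blobID, []byte) error, data []byte) error {
        id := blobID(sha256.Sum256(data)) // the plaintext hash identifies the blob
        if _, ok := idx[id]; ok {
            return nil // already stored: deduplicated
        }
        if err := store(id, data); err != nil {
            return err
        }
        idx[id] = struct{}{}
        return nil
    }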
func (arch *Archiver) SaveTreeJSON(item interface{}) (server.Blob, error) {
func (arch *Archiver) SaveTreeJSON(item interface{}) (backend.ID, error) {
// convert to json
data, err := json.Marshal(item)
// append newline
data = append(data, '\n')
if err != nil {
return server.Blob{}, err
return nil, err
}
// check if tree has been saved before
id := backend.Hash(data)
blob, err := arch.m.FindID(id)
// return the blob if we found it
if err == nil {
return blob, nil
if arch.s.Index().Has(id) {
return id, nil
}
// otherwise save the data
blob, err = arch.s.SaveJSON(backend.Tree, item)
if err != nil {
return server.Blob{}, err
}
// store blob in storage map
arch.m.Insert(blob)
return blob, nil
return arch.s.SaveJSON(pack.Tree, item)
}
// SaveFile stores the content of the file on the backend as a Blob by calling
// Save for each chunk.
func (arch *Archiver) SaveFile(p *Progress, node *Node) (server.Blobs, error) {
func (arch *Archiver) SaveFile(p *Progress, node *Node) error {
file, err := node.OpenForReading()
defer file.Close()
if err != nil {
return nil, err
return err
}
// check file again
fi, err := file.Stat()
if err != nil {
return nil, err
return err
}
if fi.ModTime() != node.ModTime {
@@ -190,7 +120,7 @@ func (arch *Archiver) SaveFile(p *Progress, node *Node) (server.Blobs, error) {
n, err := NodeFromFileInfo(node.path, fi)
if err != nil {
debug.Log("Archiver.SaveFile", "NodeFromFileInfo returned error for %v: %v", node.path, err)
return nil, err
return err
}
// copy node
@@ -198,12 +128,15 @@ func (arch *Archiver) SaveFile(p *Progress, node *Node) (server.Blobs, error) {
}
}
var blobs server.Blobs
type result struct {
id backend.ID
bytes uint64
}
// store all chunks
chnker := GetChunker("archiver.SaveFile")
chnker.Reset(file, arch.s.ChunkerPolynomial())
chans := [](<-chan server.Blob){}
chans := [](<-chan result){}
defer FreeChunker("archiver.SaveFile", chnker)
chunks := 0
@@ -215,75 +148,71 @@ func (arch *Archiver) SaveFile(p *Progress, node *Node) (server.Blobs, error) {
}
if err != nil {
return nil, arrar.Annotate(err, "SaveFile() chunker.Next()")
return arrar.Annotate(err, "SaveFile() chunker.Next()")
}
chunks++
// acquire token, start goroutine to save chunk
token := <-arch.blobToken
resCh := make(chan server.Blob, 1)
resCh := make(chan result, 1)
go func(ch chan<- server.Blob) {
blob, err := arch.Save(backend.Data, chunk.Digest, chunk.Length, chunk.Reader(file))
go func(ch chan<- result) {
err := arch.Save(pack.Data, chunk.Digest, chunk.Length, chunk.Reader(file))
// TODO handle error
if err != nil {
panic(err)
}
p.Report(Stat{Bytes: blob.Size})
p.Report(Stat{Bytes: uint64(chunk.Length)})
arch.blobToken <- token
ch <- blob
ch <- result{id: backend.ID(chunk.Digest), bytes: uint64(chunk.Length)}
}(resCh)
chans = append(chans, resCh)
}
blobs = []server.Blob{}
results := []result{}
for _, ch := range chans {
blobs = append(blobs, <-ch)
results = append(results, <-ch)
}
if len(blobs) != chunks {
return nil, fmt.Errorf("chunker returned %v chunks, but only %v blobs saved", chunks, len(blobs))
if len(results) != chunks {
return fmt.Errorf("chunker returned %v chunks, but only %v blobs saved", chunks, len(results))
}
var bytes uint64
node.Content = make([]backend.ID, len(blobs))
node.Content = make([]backend.ID, len(results))
debug.Log("Archiver.Save", "checking size for file %s", node.path)
for i, blob := range blobs {
node.Content[i] = blob.ID
bytes += blob.Size
for i, b := range results {
node.Content[i] = b.id
bytes += b.bytes
debug.Log("Archiver.Save", " adding blob %s", blob)
debug.Log("Archiver.Save", " adding blob %s, %d bytes", b.id.Str(), b.bytes)
}
if bytes != node.Size {
return nil, fmt.Errorf("errors saving node %q: saved %d bytes, wanted %d bytes", node.path, bytes, node.Size)
return fmt.Errorf("errors saving node %q: saved %d bytes, wanted %d bytes", node.path, bytes, node.Size)
}
debug.Log("Archiver.SaveFile", "SaveFile(%q): %v\n", node.path, blobs)
debug.Log("Archiver.SaveFile", "SaveFile(%q): %v blobs\n", node.path, len(results))
return blobs, nil
return nil
}
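
SaveFile bounds its parallelism with the blobToken channel: a token is taken before each chunk's goroutine starts and handed back once the chunk is saved, while one buffered result channel per chunk keeps the results in chunk order. The same pattern, reduced to a self-contained sketch with illustrative names:

    // process runs job(i) for n items with at most workers goroutines in
    // flight; a buffered channel per item returns results in submission
    // order, mirroring blobToken and resCh above.
    func process(n, workers int, job func(int) int) []int {
        token := make(chan struct{}, workers)
        for i := 0; i < workers; i++ {
            token <- struct{}{}
        }
        chans := make([]<-chan int, 0, n)
        for i := 0; i < n; i++ {
            t := <-token // acquire a slot before spawning
            ch := make(chan int, 1)
            go func(i int, ch chan<- int) {
                ch <- job(i)
                token <- t // hand the slot back
            }(i, ch)
            chans = append(chans, ch)
        }
        results := make([]int, 0, n)
        for _, ch := range chans { // collect in submission order
            results = append(results, <-ch)
        }
        return results
    }

Restic's version panics on a save error inside the goroutine (see the TODO above); a production variant would propagate the error through the result struct instead.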
func (arch *Archiver) saveTree(p *Progress, t *Tree) (server.Blob, error) {
func (arch *Archiver) saveTree(p *Progress, t *Tree) (backend.ID, error) {
debug.Log("Archiver.saveTree", "saveTree(%v)\n", t)
var wg sync.WaitGroup
// add all blobs to global map
arch.m.Merge(t.Map)
// TODO: do all this in parallel
for _, node := range t.Nodes {
if node.tree != nil {
b, err := arch.saveTree(p, node.tree)
id, err := arch.saveTree(p, node.tree)
if err != nil {
return server.Blob{}, err
return nil, err
}
node.Subtree = b.ID
t.Map.Insert(b)
node.Subtree = id
p.Report(Stat{Dirs: 1})
} else if node.Type == "file" {
if len(node.Content) > 0 {
@@ -291,22 +220,18 @@ func (arch *Archiver) saveTree(p *Progress, t *Tree) (server.Blob, error) {
// check content
for _, id := range node.Content {
blob, err := t.Map.FindID(id)
packID, _, _, _, err := arch.s.Index().Lookup(id)
if err != nil {
debug.Log("Archiver.saveTree", "unable to find storage id for data blob %v", id.Str())
arch.Error(node.path, nil, fmt.Errorf("unable to find storage id for data blob %v", id.Str()))
debug.Log("Archiver.saveTree", "unable to find storage id for data blob %v: %v", id.Str(), err)
arch.Error(node.path, nil, fmt.Errorf("unable to find storage id for data blob %v: %v", id.Str(), err))
removeContent = true
t.Map.DeleteID(id)
arch.m.DeleteID(id)
continue
}
if ok, err := arch.s.Test(backend.Data, blob.Storage.String()); !ok || err != nil {
debug.Log("Archiver.saveTree", "blob %v not in repository (error is %v)", blob, err)
arch.Error(node.path, nil, fmt.Errorf("blob %v not in repository (error is %v)", blob.Storage.Str(), err))
if ok, err := arch.s.Test(backend.Data, packID.String()); !ok || err != nil {
debug.Log("Archiver.saveTree", "pack %v of blob %v not in repository (error is %v)", packID, id, err)
arch.Error(node.path, nil, fmt.Errorf("pack %v of blob %v not in repository (error is %v)", packID, id, err))
removeContent = true
t.Map.DeleteID(id)
arch.m.DeleteID(id)
}
}
@@ -322,12 +247,7 @@ func (arch *Archiver) saveTree(p *Progress, t *Tree) (server.Blob, error) {
go func(n *Node) {
defer wg.Done()
var blobs server.Blobs
blobs, n.err = arch.SaveFile(p, n)
for _, b := range blobs {
t.Map.Insert(b)
}
n.err = arch.SaveFile(p, n)
p.Report(Stat{Files: 1})
}(node)
}
@@ -341,7 +261,7 @@ func (arch *Archiver) saveTree(p *Progress, t *Tree) (server.Blob, error) {
// check for invalid file nodes
for _, node := range t.Nodes {
if node.Type == "file" && node.Content == nil && node.err == nil {
return server.Blob{}, fmt.Errorf("node %v has empty content", node.Name)
return nil, fmt.Errorf("node %v has empty content", node.Name)
}
// remember used hashes
@@ -358,7 +278,7 @@ func (arch *Archiver) saveTree(p *Progress, t *Tree) (server.Blob, error) {
if node.err != nil {
err := arch.Error(node.path, nil, node.err)
if err != nil {
return server.Blob{}, err
return nil, err
}
// save error message in node
@@ -366,20 +286,12 @@ func (arch *Archiver) saveTree(p *Progress, t *Tree) (server.Blob, error) {
}
}
before := len(t.Map.IDs())
t.Map.Prune(usedIDs)
after := len(t.Map.IDs())
if before != after {
debug.Log("Archiver.saveTree", "pruned %d ids from map for tree %v\n", before-after, t)
}
blob, err := arch.SaveTreeJSON(t)
id, err := arch.SaveTreeJSON(t)
if err != nil {
return server.Blob{}, err
return nil, err
}
return blob, nil
return id, nil
}
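
saveTree works bottom-up: every subtree is saved first so that its ID can be written into the parent's node before the parent itself is serialized. The shape of that recursion, stripped to its core with illustrative types:

    // dirNode is an illustrative stand-in for restic's tree nodes.
    type dirNode struct {
        Name    string
        Subdirs []*dirNode
        Subtree []byte // ID of the saved subtree, filled in on the way up
    }

    // saveDir persists all children first, then the parent that
    // references them, just like saveTree above.
    func saveDir(d *dirNode, save func(*dirNode) ([]byte, error)) ([]byte, error) {
        for _, sub := range d.Subdirs {
            id, err := saveDir(sub, save)
            if err != nil {
                return nil, err
            }
            sub.Subtree = id // parent stores the child's ID
        }
        return save(d) // serialize d now that all child IDs are known
    }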
func (arch *Archiver) fileWorker(wg *sync.WaitGroup, p *Progress, done <-chan struct{}, entCh <-chan pipe.Entry) {
@@ -444,7 +356,7 @@ func (arch *Archiver) fileWorker(wg *sync.WaitGroup, p *Progress, done <-chan st
// otherwise read file normally
if node.Type == "file" && len(node.Content) == 0 {
debug.Log("Archiver.fileWorker", " read and save %v, content: %v", e.Path(), node.Content)
node.blobs, err = arch.SaveFile(p, node)
err = arch.SaveFile(p, node)
if err != nil {
// TODO: integrate error reporting
fmt.Fprintf(os.Stderr, "error for %v: %v\n", node.path, err)
@@ -501,11 +413,6 @@ func (arch *Archiver) dirWorker(wg *sync.WaitGroup, p *Progress, done <-chan str
if node.Type == "dir" {
debug.Log("Archiver.dirWorker", "got tree node for %s: %v", node.path, node.blobs)
}
// also store blob in tree map
for _, blob := range node.blobs {
tree.Map.Insert(blob)
}
}
var (
@@ -525,14 +432,13 @@ func (arch *Archiver) dirWorker(wg *sync.WaitGroup, p *Progress, done <-chan str
}
}
blob, err := arch.SaveTreeJSON(tree)
id, err := arch.SaveTreeJSON(tree)
if err != nil {
panic(err)
}
debug.Log("Archiver.dirWorker", "save tree for %s: %v", dir.Path(), blob)
debug.Log("Archiver.dirWorker", "save tree for %s: %v", dir.Path(), id.Str())
node.Subtree = blob.ID
node.blobs = server.Blobs{blob}
node.Subtree = id
dir.Result() <- node
if dir.Path() != "" {
@@ -792,30 +698,35 @@ func (arch *Archiver) Snapshot(p *Progress, paths []string, pid backend.ID) (*Sn
// receive the top-level tree
root := (<-resCh).(*Node)
blob := root.blobs[0]
debug.Log("Archiver.Snapshot", "root node received: %v", blob)
sn.Tree = blob
debug.Log("Archiver.Snapshot", "root node received: %v", root.Subtree.Str())
sn.Tree = root.Subtree
// save snapshot
blob, err = arch.s.SaveJSON(backend.Snapshot, sn)
id, err := arch.s.SaveJSONUnpacked(backend.Snapshot, sn)
if err != nil {
return nil, nil, err
}
// store ID in snapshot struct
sn.id = blob.Storage
sn.id = id
debug.Log("Archiver.Snapshot", "saved snapshot %v", id.Str())
debug.Log("Archiver.Snapshot", "saved snapshot %v", blob.Storage.Str())
// cache blobs
err = arch.c.StoreMap(sn.id, arch.m)
// flush server
err = arch.s.Flush()
if err != nil {
debug.Log("Archiver.Snapshot", "unable to cache blobs for snapshot %v: %v", blob.Storage.Str(), err)
fmt.Fprintf(os.Stderr, "unable to cache blobs for snapshot %v: %v\n", blob.Storage.Str(), err)
return sn, blob.Storage, nil
return nil, nil, err
}
return sn, blob.Storage, nil
// save index
indexID, err := arch.s.SaveIndex()
if err != nil {
debug.Log("Archiver.Snapshot", "error saving index: %v", err)
return nil, nil, err
}
debug.Log("Archiver.Snapshot", "saved index %v", indexID.Str())
return sn, id, nil
}
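
With the cache and Preload gone, driving the archiver comes down to the calls used in the tests further below; Snapshot itself flushes the open packs and saves the index before returning. A sketch, assuming s is an open *server.Server with the key set and the index loaded (see cmd_backup.go later in this diff), with an illustrative path:

    arch, err := restic.NewArchiver(s)
    if err != nil {
        return err
    }

    // nil progress, no parent snapshot; Snapshot flushes the open packs
    // and saves the index before it returns
    _, id, err := arch.Snapshot(nil, []string{"/home/user/work"}, nil)
    if err != nil {
        return err
    }
    fmt.Printf("snapshot %v saved\n", id.Str())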
func isFile(fi os.FileInfo) bool {


@@ -9,6 +9,7 @@ import (
"github.com/restic/restic"
"github.com/restic/restic/backend"
"github.com/restic/restic/chunker"
"github.com/restic/restic/pack"
"github.com/restic/restic/server"
. "github.com/restic/restic/test"
)
@@ -114,11 +115,7 @@ func BenchmarkChunkEncryptParallel(b *testing.B) {
restic.FreeChunkBuf("BenchmarkChunkEncryptParallel", buf)
}
func BenchmarkArchiveDirectory(b *testing.B) {
if *benchArchiveDirectory == "" {
b.Skip("benchdir not set, skipping BenchmarkArchiveDirectory")
}
func archiveDirectory(b testing.TB) {
server := SetupBackend(b)
defer TeardownBackend(b, server)
key := SetupKey(b, server, "geheim")
@@ -132,13 +129,25 @@ func BenchmarkArchiveDirectory(b *testing.B) {
b.Logf("snapshot archived as %v", id)
}
func countBlobs(t testing.TB, server *server.Server) (trees int, data int) {
return server.Count(backend.Tree), server.Count(backend.Data)
func TestArchiveDirectory(t *testing.T) {
if *benchArchiveDirectory == "" {
t.Skip("benchdir not set, skipping TestArchiveDirectory")
}
archiveDirectory(t)
}
func archiveWithPreload(t testing.TB) {
func BenchmarkArchiveDirectory(b *testing.B) {
if *benchArchiveDirectory == "" {
t.Skip("benchdir not set, skipping TestArchiverPreload")
b.Skip("benchdir not set, skipping BenchmarkArchiveDirectory")
}
archiveDirectory(b)
}
func archiveWithDedup(t testing.TB) {
if *benchArchiveDirectory == "" {
t.Skip("benchdir not set, skipping TestArchiverDedup")
}
server := SetupBackend(t)
@@ -146,78 +155,81 @@ func archiveWithPreload(t testing.TB) {
key := SetupKey(t, server, "geheim")
server.SetKey(key)
var cnt struct {
before, after, after2 struct {
packs, dataBlobs, treeBlobs uint
}
}
// archive a few files
sn := SnapshotDir(t, server, *benchArchiveDirectory, nil)
t.Logf("archived snapshot %v", sn.ID().Str())
// get archive stats
beforeTrees, beforeData := countBlobs(t, server)
t.Logf("found %v trees, %v data blobs", beforeTrees, beforeData)
cnt.before.packs = server.Count(backend.Data)
cnt.before.dataBlobs = server.Index().Count(pack.Data)
cnt.before.treeBlobs = server.Index().Count(pack.Tree)
t.Logf("packs %v, data blobs %v, tree blobs %v",
cnt.before.packs, cnt.before.dataBlobs, cnt.before.treeBlobs)
// archive the same files again, without parent snapshot
sn2 := SnapshotDir(t, server, *benchArchiveDirectory, nil)
t.Logf("archived snapshot %v", sn2.ID().Str())
// get archive stats
afterTrees2, afterData2 := countBlobs(t, server)
t.Logf("found %v trees, %v data blobs", afterTrees2, afterData2)
// get archive stats again
cnt.after.packs = server.Count(backend.Data)
cnt.after.dataBlobs = server.Index().Count(pack.Data)
cnt.after.treeBlobs = server.Index().Count(pack.Tree)
t.Logf("packs %v, data blobs %v, tree blobs %v",
cnt.after.packs, cnt.after.dataBlobs, cnt.after.treeBlobs)
// if there are more blobs, something is wrong
if afterData2 > beforeData {
t.Fatalf("TestArchiverPreload: too many data blobs in repository: before %d, after %d",
beforeData, afterData2)
// if there are more packs or blobs, something is wrong
if cnt.after.packs > cnt.before.packs {
t.Fatalf("TestArchiverDedup: too many packs in repository: before %d, after %d",
cnt.before.packs, cnt.after.packs)
}
if cnt.after.dataBlobs > cnt.before.dataBlobs {
t.Fatalf("TestArchiverDedup: too many data blobs in repository: before %d, after %d",
cnt.before.dataBlobs, cnt.after.dataBlobs)
}
if cnt.after.treeBlobs > cnt.before.treeBlobs {
t.Fatalf("TestArchiverDedup: too many tree blobs in repository: before %d, after %d",
cnt.before.treeBlobs, cnt.after.treeBlobs)
}
// archive the same files again, with a parent snapshot
sn3 := SnapshotDir(t, server, *benchArchiveDirectory, sn2.ID())
t.Logf("archived snapshot %v, parent %v", sn3.ID().Str(), sn2.ID().Str())
// get archive stats
afterTrees3, afterData3 := countBlobs(t, server)
t.Logf("found %v trees, %v data blobs", afterTrees3, afterData3)
// get archive stats again
cnt.after2.packs = server.Count(backend.Data)
cnt.after2.dataBlobs = server.Index().Count(pack.Data)
cnt.after2.treeBlobs = server.Index().Count(pack.Tree)
t.Logf("packs %v, data blobs %v, tree blobs %v",
cnt.after2.packs, cnt.after2.dataBlobs, cnt.after2.treeBlobs)
// if there are more blobs, something is wrong
if afterData3 > beforeData {
t.Fatalf("TestArchiverPreload: too many data blobs in repository: before %d, after %d",
beforeData, afterData3)
// if there are more packs or blobs, something is wrong
if cnt.after2.packs > cnt.before.packs {
t.Fatalf("TestArchiverDedup: too many packs in repository: before %d, after %d",
cnt.before.packs, cnt.after2.packs)
}
if cnt.after2.dataBlobs > cnt.before.dataBlobs {
t.Fatalf("TestArchiverDedup: too many data blobs in repository: before %d, after %d",
cnt.before.dataBlobs, cnt.after2.dataBlobs)
}
if cnt.after2.treeBlobs > cnt.before.treeBlobs {
t.Fatalf("TestArchiverDedup: too many tree blobs in repository: before %d, after %d",
cnt.before.treeBlobs, cnt.after2.treeBlobs)
}
}
func TestArchivePreload(t *testing.T) {
archiveWithPreload(t)
}
func BenchmarkPreload(t *testing.B) {
if *benchArchiveDirectory == "" {
t.Skip("benchdir not set, skipping TestArchiverPreload")
}
server := SetupBackend(t)
defer TeardownBackend(t, server)
key := SetupKey(t, server, "geheim")
server.SetKey(key)
// archive a few files
arch, err := restic.NewArchiver(server)
OK(t, err)
sn, _, err := arch.Snapshot(nil, []string{*benchArchiveDirectory}, nil)
OK(t, err)
t.Logf("archived snapshot %v", sn.ID())
// start benchmark
t.ResetTimer()
for i := 0; i < t.N; i++ {
// create new archiver and preload
arch2, err := restic.NewArchiver(server)
OK(t, err)
OK(t, arch2.Preload())
}
func TestArchiveDedup(t *testing.T) {
archiveWithDedup(t)
}
func BenchmarkLoadTree(t *testing.B) {
if *benchArchiveDirectory == "" {
t.Skip("benchdir not set, skipping TestArchiverPreload")
t.Skip("benchdir not set, skipping TestArchiverDedup")
}
s := SetupBackend(t)
@@ -235,14 +247,12 @@ func BenchmarkLoadTree(t *testing.B) {
list := make([]backend.ID, 0, 10)
done := make(chan struct{})
for name := range s.List(backend.Tree, done) {
id, err := backend.ParseID(name)
if err != nil {
t.Logf("invalid id for tree %v", name)
for blob := range s.Index().Each(done) {
if blob.Type != pack.Tree {
continue
}
list = append(list, id)
list = append(list, blob.ID)
if len(list) == cap(list) {
close(done)
break
@@ -254,7 +264,7 @@ func BenchmarkLoadTree(t *testing.B) {
for i := 0; i < t.N; i++ {
for _, id := range list {
_, err := restic.LoadTree(s, server.Blob{Storage: id})
_, err := restic.LoadTree(s, id)
OK(t, err)
}
}


@@ -13,7 +13,10 @@ import (
)
func testBackend(b backend.Backend, t *testing.T) {
for _, tpe := range []backend.Type{backend.Data, backend.Key, backend.Lock, backend.Snapshot, backend.Tree} {
for _, tpe := range []backend.Type{
backend.Data, backend.Key, backend.Lock,
backend.Snapshot, backend.Index,
} {
// detect non-existing files
for _, test := range TestStrings {
id, err := backend.ParseID(test.id)


@@ -3,6 +3,7 @@ package backend
import (
"crypto/sha256"
"errors"
"io"
)
const (
@@ -96,3 +97,33 @@ outer:
return IDSize, nil
}
// blobReader wraps an io.LimitedReader and implements io.ReadCloser; it closes the underlying file once the limit has been read
type blobReader struct {
f io.Closer
rd io.Reader
closed bool
}
func (l *blobReader) Read(p []byte) (int, error) {
n, err := l.rd.Read(p)
if err == io.EOF {
l.Close()
}
return n, err
}
func (l *blobReader) Close() error {
if !l.closed {
err := l.f.Close()
l.closed = true
return err
}
return nil
}
func LimitReader(f io.ReadCloser, n int64) *blobReader {
return &blobReader{f: f, rd: io.LimitReader(f, n)}
}
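
The point of blobReader is that a caller who drains a blob never has to remember the pack file handle: reaching the limit closes the file. A usage sketch, mirroring what the local and SFTP backends below do in GetReader; the path is illustrative, and offset and length would come from the index:

    f, err := os.Open("/path/to/packfile") // illustrative path
    if err != nil {
        return err
    }
    if _, err := f.Seek(int64(offset), 0); err != nil {
        f.Close()
        return err
    }
    rd := backend.LimitReader(f, int64(length)) // closes f once EOF is reached
    buf, err := ioutil.ReadAll(rd)              // reads exactly length bytes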


@@ -10,14 +10,14 @@ const (
Key = "key"
Lock = "lock"
Snapshot = "snapshot"
Tree = "tree"
Index = "index"
)
const (
Version = 1
)
// A Backend manages blobs of data.
// A Backend manages data stored somewhere.
type Backend interface {
// Location returns a string that specifies the location of the repository,
// like a URL.
@@ -30,6 +30,10 @@ type Backend interface {
// Get returns an io.ReadCloser for the Blob with the given name of type t.
Get(t Type, name string) (io.ReadCloser, error)
// GetReader returns an io.ReadCloser for the Blob with the given name of
// type t at offset and length.
GetReader(t Type, name string, offset, length uint) (io.ReadCloser, error)
// Test returns whether a Blob with the given name and type exists.
Test(t Type, name string) (bool, error)


@@ -30,7 +30,7 @@ func Open(dir string) (*Local, error) {
dir,
filepath.Join(dir, backend.Paths.Data),
filepath.Join(dir, backend.Paths.Snapshots),
filepath.Join(dir, backend.Paths.Trees),
filepath.Join(dir, backend.Paths.Index),
filepath.Join(dir, backend.Paths.Locks),
filepath.Join(dir, backend.Paths.Keys),
filepath.Join(dir, backend.Paths.Temp),
@@ -102,7 +102,7 @@ func Create(dir string) (*Local, error) {
dir,
filepath.Join(dir, backend.Paths.Data),
filepath.Join(dir, backend.Paths.Snapshots),
filepath.Join(dir, backend.Paths.Trees),
filepath.Join(dir, backend.Paths.Index),
filepath.Join(dir, backend.Paths.Locks),
filepath.Join(dir, backend.Paths.Keys),
filepath.Join(dir, backend.Paths.Temp),
@@ -222,7 +222,7 @@ func (lb *localBlob) Finalize(t backend.Type, name string) error {
f := filename(lb.basedir, t, name)
// create directories if necessary, ignore errors
if t == backend.Data || t == backend.Tree {
if t == backend.Data {
os.MkdirAll(filepath.Dir(f), backend.Modes.Dir)
}
@@ -279,11 +279,8 @@ func dirname(base string, t backend.Type, name string) string {
}
case backend.Snapshot:
n = backend.Paths.Snapshots
case backend.Tree:
n = backend.Paths.Trees
if len(name) > 2 {
n = filepath.Join(n, name[:2])
}
case backend.Index:
n = backend.Paths.Index
case backend.Lock:
n = backend.Paths.Locks
case backend.Key:
@@ -298,6 +295,26 @@ func (b *Local) Get(t backend.Type, name string) (io.ReadCloser, error) {
return os.Open(filename(b.p, t, name))
}
// GetReader returns an io.ReadCloser for the Blob with the given name of
// type t at offset and length. If length is 0, the reader reads until EOF.
func (b *Local) GetReader(t backend.Type, name string, offset, length uint) (io.ReadCloser, error) {
f, err := os.Open(filename(b.p, t, name))
if err != nil {
return nil, err
}
_, err = f.Seek(int64(offset), 0)
if err != nil {
return nil, err
}
if length == 0 {
return f, nil
}
return backend.LimitReader(f, int64(length)), nil
}
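
Together with the index, GetReader makes packed blobs individually addressable: the index maps a blob ID to its pack, offset and length, and the backend serves exactly that byte range. A sketch of the read path, assuming the Lookup return order used in cmd_fsck.go later in this diff (pack ID, type, offset, length); decrypting the result is omitted:

    // locate the blob in its pack via the in-memory index
    packID, _, offset, length, err := s.Index().Lookup(id)
    if err != nil {
        return err
    }

    // read only the blob's window out of the pack file
    rd, err := s.Backend().GetReader(backend.Data, packID.String(), offset, length)
    if err != nil {
        return err
    }
    defer rd.Close()
    buf, err := ioutil.ReadAll(rd) // still encrypted: IV || Ciphertext || MAC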
// Test returns true if a blob of the given type and name exists in the backend.
func (b *Local) Test(t backend.Type, name string) (bool, error) {
_, err := os.Stat(filename(b.p, t, name))
@@ -322,7 +339,7 @@ func (b *Local) Remove(t backend.Type, name string) error {
func (b *Local) List(t backend.Type, done <-chan struct{}) <-chan string {
// TODO: use os.Open() and d.Readdirnames() instead of Glob()
var pattern string
if t == backend.Data || t == backend.Tree {
if t == backend.Data {
pattern = filepath.Join(dirname(b.p, t, ""), "*", "*")
} else {
pattern = filepath.Join(dirname(b.p, t, ""), "*")


@@ -6,7 +6,7 @@ import "os"
var Paths = struct {
Data string
Snapshots string
Trees string
Index string
Locks string
Keys string
Temp string
@@ -15,7 +15,7 @@ var Paths = struct {
}{
"data",
"snapshots",
"trees",
"index",
"locks",
"keys",
"tmp",


@@ -78,7 +78,7 @@ func Open(dir string, program string, args ...string) (*SFTP, error) {
dir,
filepath.Join(dir, backend.Paths.Data),
filepath.Join(dir, backend.Paths.Snapshots),
filepath.Join(dir, backend.Paths.Trees),
filepath.Join(dir, backend.Paths.Index),
filepath.Join(dir, backend.Paths.Locks),
filepath.Join(dir, backend.Paths.Keys),
filepath.Join(dir, backend.Paths.Version),
@@ -152,7 +152,7 @@ func Create(dir string, program string, args ...string) (*SFTP, error) {
dir,
filepath.Join(dir, backend.Paths.Data),
filepath.Join(dir, backend.Paths.Snapshots),
filepath.Join(dir, backend.Paths.Trees),
filepath.Join(dir, backend.Paths.Index),
filepath.Join(dir, backend.Paths.Locks),
filepath.Join(dir, backend.Paths.Keys),
filepath.Join(dir, backend.Paths.Temp),
@@ -303,7 +303,7 @@ func (r *SFTP) renameFile(oldname string, t backend.Type, name string) error {
filename := r.filename(t, name)
// create directories if necessary
if t == backend.Data || t == backend.Tree {
if t == backend.Data {
err := r.mkdirAll(filepath.Dir(filename), backend.Modes.Dir)
if err != nil {
return err
@@ -403,11 +403,8 @@ func (r *SFTP) dirname(t backend.Type, name string) string {
}
case backend.Snapshot:
n = backend.Paths.Snapshots
case backend.Tree:
n = backend.Paths.Trees
if len(name) > 2 {
n = filepath.Join(n, name[:2])
}
case backend.Index:
n = backend.Paths.Index
case backend.Lock:
n = backend.Paths.Locks
case backend.Key:
@@ -432,6 +429,26 @@ func (r *SFTP) Get(t backend.Type, name string) (io.ReadCloser, error) {
return file, nil
}
// GetReader returns an io.ReadCloser for the Blob with the given name of
// type t at offset and length. If length is 0, the reader reads until EOF.
func (r *SFTP) GetReader(t backend.Type, name string, offset, length uint) (io.ReadCloser, error) {
f, err := r.c.Open(r.filename(t, name))
if err != nil {
return nil, err
}
_, err = f.Seek(int64(offset), 0)
if err != nil {
return nil, err
}
if length == 0 {
return f, nil
}
return backend.LimitReader(f, int64(length)), nil
}
// Test returns true if a blob of the given type and name exists in the backend.
func (r *SFTP) Test(t backend.Type, name string) (bool, error) {
_, err := r.c.Lstat(r.filename(t, name))
@@ -460,7 +477,7 @@ func (r *SFTP) List(t backend.Type, done <-chan struct{}) <-chan string {
go func() {
defer close(ch)
if t == backend.Data || t == backend.Tree {
if t == backend.Data {
// read first level
basedir := r.dirname(t, "")

cache.go

@@ -1,14 +1,12 @@
package restic
import (
"encoding/json"
"errors"
"fmt"
"io"
"os"
"path/filepath"
"strings"
"sync"
"github.com/restic/restic/backend"
"github.com/restic/restic/debug"
@@ -148,8 +146,6 @@ func (c *Cache) List(t backend.Type) ([]CacheEntry, error) {
switch t {
case backend.Snapshot:
dir = filepath.Join(c.base, "snapshots")
case backend.Tree:
dir = filepath.Join(c.base, "trees")
default:
return nil, fmt.Errorf("cache not supported for type %v", t)
}
@@ -202,8 +198,6 @@ func (c *Cache) filename(t backend.Type, subtype string, id backend.ID) (string,
switch t {
case backend.Snapshot:
return filepath.Join(c.base, "snapshots", filename), nil
case backend.Tree:
return filepath.Join(c.base, "trees", filename), nil
}
return "", fmt.Errorf("cache not supported for type %v", t)
@@ -211,146 +205,6 @@ func (c *Cache) filename(t backend.Type, subtype string, id backend.ID) (string,
// high-level functions
// RefreshSnapshots loads the maps for all snapshots and saves them to the
// local cache. Old cache entries are purged.
func (c *Cache) RefreshSnapshots(s *server.Server, p *Progress) error {
defer p.Done()
// list cache entries
entries, err := c.List(backend.Snapshot)
if err != nil {
return err
}
// list snapshots first
done := make(chan struct{})
defer close(done)
// check that snapshot blobs are cached
for name := range s.List(backend.Snapshot, done) {
id, err := backend.ParseID(name)
if err != nil {
continue
}
// remove snapshot from list of entries
for i, e := range entries {
if e.ID.Equal(id) {
entries = append(entries[:i], entries[i+1:]...)
break
}
}
has, err := c.Has(backend.Snapshot, "blobs", id)
if err != nil {
return err
}
if has {
continue
}
// else start progress reporting
p.Start()
// build new cache
_, err = cacheSnapshotBlobs(p, s, c, id)
if err != nil {
debug.Log("Cache.RefreshSnapshots", "unable to cache snapshot blobs for %v: %v", id.Str(), err)
return err
}
}
// remove other entries
for _, e := range entries {
debug.Log("Cache.RefreshSnapshots", "remove entry %v", e)
err = c.Purge(backend.Snapshot, e.Subtype, e.ID)
if err != nil {
return err
}
}
return nil
}
// cacheSnapshotBlobs creates a cache of all the blobs used within the
// snapshot. It collects all blobs from all trees and saves the resulting map
// to the cache and returns the map.
func cacheSnapshotBlobs(p *Progress, s *server.Server, c *Cache, id backend.ID) (*Map, error) {
debug.Log("CacheSnapshotBlobs", "create cache for snapshot %v", id.Str())
sn, err := LoadSnapshot(s, id)
if err != nil {
debug.Log("CacheSnapshotBlobs", "unable to load snapshot %v: %v", id.Str(), err)
return nil, err
}
m := NewMap()
// add top-level node
m.Insert(sn.Tree)
p.Report(Stat{Trees: 1})
// start walker
var wg sync.WaitGroup
ch := make(chan WalkTreeJob)
wg.Add(1)
go func() {
WalkTree(s, sn.Tree, nil, ch)
wg.Done()
}()
for i := 0; i < maxConcurrencyPreload; i++ {
wg.Add(1)
go func() {
for job := range ch {
if job.Tree == nil {
continue
}
p.Report(Stat{Trees: 1})
debug.Log("CacheSnapshotBlobs", "got job %v", job)
m.Merge(job.Tree.Map)
}
wg.Done()
}()
}
wg.Wait()
// save blob list for snapshot
return m, c.StoreMap(id, m)
}
func (c *Cache) StoreMap(snid backend.ID, m *Map) error {
wr, err := c.Store(backend.Snapshot, "blobs", snid)
if err != nil {
return nil
}
defer wr.Close()
enc := json.NewEncoder(wr)
err = enc.Encode(m)
if err != nil {
return err
}
return nil
}
func (c *Cache) LoadMap(s *server.Server, snid backend.ID) (*Map, error) {
rd, err := c.Load(backend.Snapshot, "blobs", snid)
if err != nil {
return nil, err
}
m := &Map{}
err = json.NewDecoder(rd).Decode(m)
return m, err
}
// GetCacheDir returns the cache directory according to XDG basedir spec, see
// http://standards.freedesktop.org/basedir-spec/basedir-spec-latest.html
func GetCacheDir() (string, error) {
@@ -389,3 +243,5 @@ func GetCacheDir() (string, error) {
return cachedir, nil
}
// TODO: implement RefreshIndex()


@@ -1,11 +1,9 @@
package restic_test
import (
"encoding/json"
"testing"
"github.com/restic/restic"
"github.com/restic/restic/backend"
. "github.com/restic/restic/test"
)
@@ -15,46 +13,14 @@ func TestCache(t *testing.T) {
key := SetupKey(t, server, "geheim")
server.SetKey(key)
cache, err := restic.NewCache(server)
_, err := restic.NewCache(server)
OK(t, err)
arch, err := restic.NewArchiver(server)
OK(t, err)
// archive some files, this should automatically cache all blobs from the snapshot
_, id, err := arch.Snapshot(nil, []string{*benchArchiveDirectory}, nil)
_, _, err = arch.Snapshot(nil, []string{*benchArchiveDirectory}, nil)
// try to load map from cache
rd, err := cache.Load(backend.Snapshot, "blobs", id)
OK(t, err)
dec := json.NewDecoder(rd)
m := &restic.Map{}
err = dec.Decode(m)
OK(t, err)
// remove cached blob list
OK(t, cache.Purge(backend.Snapshot, "blobs", id))
// load map from cache again, this should fail
rd, err = cache.Load(backend.Snapshot, "blobs", id)
Assert(t, err != nil, "Expected failure did not occur")
// recreate cached blob list
err = cache.RefreshSnapshots(server, nil)
OK(t, err)
// load map from cache again
rd, err = cache.Load(backend.Snapshot, "blobs", id)
OK(t, err)
dec = json.NewDecoder(rd)
m2 := &restic.Map{}
err = dec.Decode(m2)
OK(t, err)
// compare maps
Assert(t, m.Equals(m2), "Maps are not equal")
// TODO: test caching index
}


@@ -96,26 +96,6 @@ func (cmd CmdBackup) Usage() string {
return "DIR/FILE [snapshot-ID]"
}
func newCacheRefreshProgress() *restic.Progress {
p := restic.NewProgress(time.Second)
p.OnStart = func() {
fmt.Printf("refreshing cache\n")
}
if !terminal.IsTerminal(int(os.Stdout.Fd())) {
return p
}
p.OnUpdate = func(s restic.Stat, d time.Duration, ticker bool) {
fmt.Printf("\x1b[2K[%s] %d trees loaded\r", formatDuration(d), s.Trees)
}
p.OnDone = func(s restic.Stat, d time.Duration, ticker bool) {
fmt.Printf("\x1b[2Krefreshed cache in %s\n", formatDuration(d))
}
return p
}
func newScanProgress() *restic.Progress {
if !terminal.IsTerminal(int(os.Stdout.Fd())) {
return nil
@@ -200,6 +180,11 @@ func (cmd CmdBackup) Execute(args []string) error {
return err
}
err = s.LoadIndex()
if err != nil {
return err
}
var (
parentSnapshot string
parentSnapshotID backend.ID
@@ -278,17 +263,6 @@ func (cmd CmdBackup) Execute(args []string) error {
return nil
}
err = arch.Cache().RefreshSnapshots(s, newCacheRefreshProgress())
if err != nil {
return err
}
fmt.Printf("loading blobs\n")
err = arch.Preload()
if err != nil {
return err
}
_, id, err := arch.Snapshot(newArchiveProgress(stat), target, parentSnapshotID)
if err != nil {
return err


@@ -4,10 +4,13 @@ import (
"encoding/json"
"errors"
"fmt"
"io"
"os"
"github.com/restic/restic"
"github.com/restic/restic/backend"
"github.com/restic/restic/debug"
"github.com/restic/restic/pack"
"github.com/restic/restic/server"
)
@@ -24,7 +27,7 @@ func init() {
}
func (cmd CmdCat) Usage() string {
return "[data|tree|snapshot|key|masterkey|lock] ID"
return "[pack|blob|tree|snapshot|key|masterkey|lock] ID"
}
func (cmd CmdCat) Execute(args []string) error {
@@ -62,37 +65,20 @@ func (cmd CmdCat) Execute(args []string) error {
}
}
// handle all types that don't need an index
switch tpe {
case "data":
// try storage id
data, err := s.LoadID(backend.Data, id)
if err == nil {
_, err = os.Stdout.Write(data)
case "index":
buf, err := s.Load(backend.Index, id)
if err != nil {
return err
}
_, err = os.Stdout.Write(data)
_, err = os.Stdout.Write(append(buf, '\n'))
return err
case "tree":
// try storage id
tree := &restic.Tree{}
err := s.LoadJSONID(backend.Tree, id, tree)
if err != nil {
return err
}
buf, err := json.MarshalIndent(&tree, "", " ")
if err != nil {
return err
}
fmt.Println(string(buf))
return nil
case "snapshot":
sn := &restic.Snapshot{}
err = s.LoadJSONID(backend.Snapshot, id, sn)
err = s.LoadJSONEncrypted(backend.Snapshot, id, sn)
if err != nil {
return err
}
@@ -136,6 +122,52 @@ func (cmd CmdCat) Execute(args []string) error {
return nil
case "lock":
return errors.New("not yet implemented")
}
// load index, handle all the other types
err = s.LoadIndex()
if err != nil {
return err
}
switch tpe {
case "pack":
rd, err := s.Backend().Get(backend.Data, id.String())
if err != nil {
return err
}
_, err = io.Copy(os.Stdout, rd)
return err
case "blob":
data, err := s.LoadBlob(pack.Data, id)
if err == nil {
_, err = os.Stdout.Write(data)
return err
}
_, err = os.Stdout.Write(data)
return err
case "tree":
debug.Log("cat", "cat tree %v", id.Str())
tree := restic.NewTree()
err = s.LoadJSONPack(pack.Tree, id, tree)
if err != nil {
debug.Log("cat", "unable to load tree %v: %v", id.Str(), err)
return err
}
buf, err := json.MarshalIndent(&tree, "", " ")
if err != nil {
debug.Log("cat", "error json.MarshalIndent(): %v", err)
return err
}
_, err = os.Stdout.Write(append(buf, '\n'))
return nil
default:
return errors.New("invalid type")
}


@@ -59,9 +59,9 @@ func parseTime(str string) (time.Time, error) {
return time.Time{}, fmt.Errorf("unable to parse time: %q", str)
}
func (c CmdFind) findInTree(s *server.Server, blob server.Blob, path string) ([]findResult, error) {
debug.Log("restic.find", "checking tree %v\n", blob)
tree, err := restic.LoadTree(s, blob)
func (c CmdFind) findInTree(s *server.Server, id backend.ID, path string) ([]findResult, error) {
debug.Log("restic.find", "checking tree %v\n", id)
tree, err := restic.LoadTree(s, id)
if err != nil {
return nil, err
}
@@ -93,12 +93,7 @@ func (c CmdFind) findInTree(s *server.Server, blob server.Blob, path string) ([]
}
if node.Type == "dir" {
b, err := tree.Map.FindID(node.Subtree)
if err != nil {
return nil, err
}
subdirResults, err := c.findInTree(s, b, filepath.Join(path, node.Name))
subdirResults, err := c.findInTree(s, node.Subtree, filepath.Join(path, node.Name))
if err != nil {
return nil, err
}


@@ -7,7 +7,9 @@ import (
"github.com/restic/restic"
"github.com/restic/restic/backend"
"github.com/restic/restic/crypto"
"github.com/restic/restic/debug"
"github.com/restic/restic/pack"
"github.com/restic/restic/server"
)
@@ -32,31 +34,31 @@ func init() {
}
}
func fsckFile(opts CmdFsck, s *server.Server, m *restic.Map, IDs []backend.ID) (uint64, error) {
func fsckFile(opts CmdFsck, s *server.Server, IDs []backend.ID) (uint64, error) {
debug.Log("restic.fsckFile", "checking file %v", IDs)
var bytes uint64
for _, id := range IDs {
debug.Log("restic.fsck", " checking data blob %v\n", id)
// test if blob is in map
blob, err := m.FindID(id)
// test if blob is in the index
packID, tpe, _, length, err := s.Index().Lookup(id)
if err != nil {
return 0, fmt.Errorf("storage ID for data blob %v not found", id)
return 0, fmt.Errorf("storage for blob %v (%v) not found", id, tpe)
}
bytes += blob.Size
debug.Log("restic.fsck", " data blob found: %v\n", blob)
bytes += uint64(length - crypto.Extension)
debug.Log("restic.fsck", " blob found in pack %v\n", packID)
if opts.CheckData {
// load content
_, err := s.Load(backend.Data, blob)
_, err := s.LoadBlob(pack.Data, id)
if err != nil {
return 0, err
}
} else {
// test if data blob is there
ok, err := s.Test(backend.Data, blob.Storage.String())
ok, err := s.Test(backend.Data, packID.String())
if err != nil {
return 0, err
}
@@ -68,17 +70,17 @@ func fsckFile(opts CmdFsck, s *server.Server, m *restic.Map, IDs []backend.ID) (
// if orphan check is active, record storage id
if opts.o_data != nil {
opts.o_data.Insert(blob.Storage)
opts.o_data.Insert(id)
}
}
return bytes, nil
}
func fsckTree(opts CmdFsck, s *server.Server, blob server.Blob) error {
debug.Log("restic.fsckTree", "checking tree %v", blob)
func fsckTree(opts CmdFsck, s *server.Server, id backend.ID) error {
debug.Log("restic.fsckTree", "checking tree %v", id.Str())
tree, err := restic.LoadTree(s, blob)
tree, err := restic.LoadTree(s, id)
if err != nil {
return err
}
@@ -86,7 +88,7 @@ func fsckTree(opts CmdFsck, s *server.Server, blob server.Blob) error {
// if orphan check is active, record storage id
if opts.o_trees != nil {
// add ID to list
opts.o_trees.Insert(blob.Storage)
opts.o_trees.Insert(id)
}
var firstErr error
@@ -95,23 +97,23 @@ func fsckTree(opts CmdFsck, s *server.Server, blob server.Blob) error {
for i, node := range tree.Nodes {
if node.Name == "" {
return fmt.Errorf("node %v of tree %v has no name", i, blob.ID)
return fmt.Errorf("node %v of tree %v has no name", i, id.Str())
}
if node.Type == "" {
return fmt.Errorf("node %q of tree %v has no type", node.Name, blob.ID)
return fmt.Errorf("node %q of tree %v has no type", node.Name, id.Str())
}
switch node.Type {
case "file":
if node.Content == nil {
debug.Log("restic.fsckTree", "file node %q of tree %v has no content: %v", node.Name, blob.ID, node)
return fmt.Errorf("file node %q of tree %v has no content: %v", node.Name, blob.ID, node)
debug.Log("restic.fsckTree", "file node %q of tree %v has no content: %v", node.Name, id, node)
return fmt.Errorf("file node %q of tree %v has no content: %v", node.Name, id, node)
}
if node.Content == nil && node.Error == "" {
debug.Log("restic.fsckTree", "file node %q of tree %v has no content", node.Name, blob.ID)
return fmt.Errorf("file node %q of tree %v has no content", node.Name, blob.ID)
debug.Log("restic.fsckTree", "file node %q of tree %v has no content", node.Name, id)
return fmt.Errorf("file node %q of tree %v has no content", node.Name, id)
}
// record ids
@@ -119,32 +121,25 @@ func fsckTree(opts CmdFsck, s *server.Server, blob server.Blob) error {
seenIDs.Insert(id)
}
debug.Log("restic.fsckTree", "check file %v (%v)", node.Name, blob.ID.Str())
bytes, err := fsckFile(opts, s, tree.Map, node.Content)
debug.Log("restic.fsckTree", "check file %v (%v)", node.Name, id.Str())
bytes, err := fsckFile(opts, s, node.Content)
if err != nil {
return err
}
if bytes != node.Size {
debug.Log("restic.fsckTree", "file node %q of tree %v has size %d, but only %d bytes could be found", node.Name, blob, node.Size, bytes)
return fmt.Errorf("file node %q of tree %v has size %d, but only %d bytes could be found", node.Name, blob, node.Size, bytes)
debug.Log("restic.fsckTree", "file node %q of tree %v has size %d, but only %d bytes could be found", node.Name, id, node.Size, bytes)
return fmt.Errorf("file node %q of tree %v has size %d, but only %d bytes could be found", node.Name, id, node.Size, bytes)
}
case "dir":
if node.Subtree == nil {
return fmt.Errorf("dir node %q of tree %v (storage id %v) has no subtree", node.Name, blob.ID, blob.Storage)
}
// lookup blob
subtreeBlob, err := tree.Map.FindID(node.Subtree)
if err != nil {
firstErr = err
fmt.Fprintf(os.Stderr, "%v\n", err)
return fmt.Errorf("dir node %q of tree %v has no subtree", node.Name, id)
}
// record id
seenIDs.Insert(node.Subtree)
err = fsckTree(opts, s, subtreeBlob)
err = fsckTree(opts, s, node.Subtree)
if err != nil {
firstErr = err
fmt.Fprintf(os.Stderr, "%v\n", err)
@@ -153,11 +148,11 @@ func fsckTree(opts CmdFsck, s *server.Server, blob server.Blob) error {
}
// check map for unused ids
for _, id := range tree.Map.IDs() {
if seenIDs.Find(id) != nil {
return fmt.Errorf("tree %v: map contains unused ID %v", blob.ID, id)
}
}
// for _, id := range tree.Map.IDs() {
// if seenIDs.Find(id) != nil {
// return fmt.Errorf("tree %v: map contains unused ID %v", id, id)
// }
// }
return firstErr
}
@@ -170,10 +165,6 @@ func fsckSnapshot(opts CmdFsck, s *server.Server, id backend.ID) error {
return fmt.Errorf("loading snapshot %v failed: %v", id, err)
}
if !sn.Tree.Valid() {
return fmt.Errorf("snapshot %s has invalid tree %v", sn.ID(), sn.Tree)
}
err = fsckTree(opts, s, sn.Tree)
if err != nil {
debug.Log("restic.fsck", " checking tree %v for snapshot %v\n", sn.Tree, id)
@@ -201,6 +192,11 @@ func (cmd CmdFsck) Execute(args []string) error {
return err
}
err = s.LoadIndex()
if err != nil {
return err
}
if cmd.Snapshot != "" {
name, err := s.FindSnapshot(cmd.Snapshot)
if err != nil {
@@ -249,40 +245,26 @@ func (cmd CmdFsck) Execute(args []string) error {
debug.Log("restic.fsck", "starting orphaned check\n")
l := []struct {
desc string
tpe backend.Type
set *backend.IDSet
}{
{"data blob", backend.Data, cmd.o_data},
{"tree", backend.Tree, cmd.o_trees},
}
cnt := make(map[pack.BlobType]*backend.IDSet)
cnt[pack.Data] = backend.NewIDSet()
cnt[pack.Tree] = backend.NewIDSet()
for _, d := range l {
debug.Log("restic.fsck", "checking for orphaned %v\n", d.desc)
for blob := range s.Index().Each(done) {
fmt.Println(blob.ID)
done := make(chan struct{})
for name := range s.List(d.tpe, done) {
id, err := backend.ParseID(name)
if err != nil {
fmt.Fprintf(os.Stderr, "invalid id for %v: %v\n", d.tpe, name)
err = cnt[blob.Type].Find(blob.ID)
if err != nil {
if !cmd.RemoveOrphaned {
fmt.Printf("orphaned %v blob %v\n", blob.Type, blob.ID)
continue
}
err = d.set.Find(id)
if err != nil {
if !cmd.RemoveOrphaned {
fmt.Printf("orphaned %v %v\n", d.desc, id)
continue
}
fmt.Printf("removing orphaned %v %v\n", d.desc, id)
err := s.Remove(d.tpe, name)
if err != nil {
return err
}
}
fmt.Printf("removing orphaned %v blob %v\n", blob.Type, blob.ID)
// err := s.Remove(d.tpe, name)
// if err != nil {
// return err
// }
return errors.New("not implemented")
}
}


@@ -20,7 +20,7 @@ func init() {
}
func (cmd CmdList) Usage() string {
return "[data|trees|snapshots|keys|locks]"
return "[blobs|packs|index|snapshots|keys|locks]"
}
func (cmd CmdList) Execute(args []string) error {
@@ -35,10 +35,21 @@ func (cmd CmdList) Execute(args []string) error {
var t backend.Type
switch args[0] {
case "data":
case "blobs":
err = s.LoadIndex()
if err != nil {
return err
}
for blob := range s.Index().Each(nil) {
fmt.Println(blob.ID)
}
return nil
case "packs":
t = backend.Data
case "trees":
t = backend.Tree
case "index":
t = backend.Index
case "snapshots":
t = backend.Snapshot
case "keys":


@@ -38,8 +38,8 @@ func printNode(prefix string, n *restic.Node) string {
}
}
func printTree(prefix string, s *server.Server, blob server.Blob) error {
tree, err := restic.LoadTree(s, blob)
func printTree(prefix string, s *server.Server, id backend.ID) error {
tree, err := restic.LoadTree(s, id)
if err != nil {
return err
}
@@ -48,12 +48,7 @@ func printTree(prefix string, s *server.Server, blob server.Blob) error {
fmt.Println(printNode(prefix, entry))
if entry.Type == "dir" && entry.Subtree != nil {
b, err := tree.Map.FindID(entry.Subtree)
if err != nil {
return err
}
err = printTree(filepath.Join(prefix, entry.Name), s, b)
err = printTree(filepath.Join(prefix, entry.Name), s, entry.Subtree)
if err != nil {
return err
}


@@ -35,6 +35,11 @@ func (cmd CmdRestore) Execute(args []string) error {
return err
}
err = s.LoadIndex()
if err != nil {
return err
}
name, err := backend.FindSnapshot(s, args[0])
if err != nil {
errx(1, "invalid id %q: %v", args[0], err)


@@ -64,6 +64,10 @@ The basic layout of a sample restic repository is shown below:
/tmp/restic-repo
├── data
│ ├── 21
│ │ └── 2159dd48f8a24f33c307b750592773f8b71ff8d11452132a7b2e2a6a01611be1
│ ├── 32
│ │ └── 32ea976bc30771cebad8285cd99120ac8786f9ffd42141d452458089985043a5
│ ├── 59
│ │ └── 59fe4bcde59bd6222eba87795e35a90d82cd2f138a27b6835032b7b58173a426
│ ├── 73
@@ -71,25 +75,14 @@ The basic layout of a sample restic repository is shown below:
│ [...]
├── id
├── index
│ └── c38f5fb68307c6a3e3aa945d556e325dc38f5fb68307c6a3e3aa945d556e325d
│ ├── c38f5fb68307c6a3e3aa945d556e325dc38f5fb68307c6a3e3aa945d556e325d
│ └── ca171b1b7394d90d330b265d90f506f9984043b342525f019788f97e745c71fd
├── keys
│ └── b02de829beeb3c01a63e6b25cbd421a98fef144f03b9a02e46eff9e2ca3f0bd7
├── locks
├── snapshots
│ └── 22a5af1bdc6e616f8a29579458c49627e01b32210d09adb288d1ecda7c5711ec
├── tmp
├── trees
│ ├── 21
│ │ └── 2159dd48f8a24f33c307b750592773f8b71ff8d11452132a7b2e2a6a01611be1
│ ├── 32
│ │ └── 32ea976bc30771cebad8285cd99120ac8786f9ffd42141d452458089985043a5
│ ├── 95
│ │ └── 95f75feb05a7cc73e328b2efa668b1ea68f65fece55a93bc65aff6cd0bcfeefc
│ ├── b8
│ │ └── b8138ab08a4722596ac89c917827358da4672eac68e3c03a8115b88dbf4bfb59
│ ├── e0
│ │ └── e01150928f7ad24befd6ec15b087de1b9e0f92edabd8e5cabb3317f8b20ad044
│ [...]
└── version
A repository can be initialized with the `restic init` command, e.g.:
@@ -99,39 +92,51 @@ A repository can be initialized with the `restic init` command, e.g.:
Pack Format
-----------
All files in the repository except Key, Tree and Data files just contain raw
data, stored as `IV || Ciphertext || MAC`. Tree and Data files may contain
several Blobs of data. The format is described in the following.
All files in the repository except Key and Data files just contain raw data,
stored as `IV || Ciphertext || MAC`. Data files may contain one or more Blobs
of data. The format is described in the following.
A Pack starts with a nonce and a header; the header describes the content and
is encrypted and signed. The Pack's structure is as follows:
The Pack's structure is as follows:
NONCE || Header_Length ||
IV_Header || Ciphertext_Header || MAC_Header ||
IV_Blob_1 || Ciphertext_Blob_1 || MAC_Blob_1 ||
[...]
IV_Blob_n || Ciphertext_Blob_n || MAC_Blob_n ||
MAC
EncryptedBlob1 || ... || EncryptedBlobN || EncryptedHeader || Header_Length
`NONCE` consists of 16 bytes and `Header_Length` is a four byte integer in
little-endian encoding.
At the end of the Pack is a header, which describes the content and is
encrypted and signed. `Header_Length` is the length of the encrypted header
encoded as a four byte integer in little-endian encoding. Placing the header at
the end of a file allows writing the blobs in a continuous stream as soon as
they are read during the backup phase. This reduces code complexity and avoids
having to re-write a file once the pack is complete and the content and length
of the header are known.
All the parts (`Ciphertext_Header`, `Ciphertext_Blob1` etc.) are signed and
encrypted independently. In addition, the complete pack is signed using
`NONCE`. This enables repository reorganisation without having to touch the
encrypted Blobs. In addition it also allows efficient indexing, for only the
header needs to be read in order to find out which Blobs are contained in the
Pack. Since the header is signed, authenticity of the header can be checked
without having to read the complete Pack.
All the blobs (`EncryptedBlob1`, `EncryptedBlobN` etc.) are signed and
encrypted independently. This enables repository reorganisation without having
to touch the encrypted Blobs. In addition it also allows efficient indexing,
for only the header needs to be read in order to find out which Blobs are
contained in the Pack. Since the header is signed, authenticity of the header
can be checked without having to read the complete Pack.
After decryption, a Pack's header consists of the following elements:
Length(IV_Blob_1+Ciphertext_Blob1+MAC_Blob_1) || Hash(Plaintext_Blob_1) ||
Type_Blob1 || Length(EncryptedBlob1) || Hash(Plaintext_Blob1) ||
[...]
Length(IV_Blob_n+Ciphertext_Blob_n+MAC_Blob_n) || Hash(Plaintext_Blob_n) ||
Type_BlobN || Length(EncryptedBlobN) || Hash(Plaintext_BlobN) ||
This is enough to calculate the offsets for all the Blobs in the Pack. Length
is the length of a Blob as a four byte integer in little-endian format.
is the length of a Blob as a four byte integer in little-endian format. The
Type field is a single byte and labels the content of a Blob according to
the following table:
Type | Meaning
-----|---------
0 | data
1 | tree
All other types are invalid, more types may be added in the future.
To reconstruct the index, or to parse a pack without an index, the last four
bytes must be read first in order to find the length of the header. Afterwards,
the header can be read and parsed, which yields the plaintext hash, type,
offset and length of every included Blob.
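
A Go sketch may make that reconstruction concrete. It assumes the header has already been read and decrypted; the one-byte type and the four-byte little-endian length are given above, and the hash is assumed to be a 32-byte SHA-256 as elsewhere in the repository (uses encoding/binary and fmt):

    // blobEntry is one decoded header entry; Offset is derived, not stored.
    type blobEntry struct {
        Type   byte     // 0 = data, 1 = tree
        Length uint32   // length of the encrypted blob
        ID     [32]byte // SHA-256 of the plaintext
        Offset uint32   // blobs are laid out back to back from offset 0
    }

    // parseHeader decodes a decrypted pack header. The header length
    // itself comes from the last four bytes of the pack file:
    //   hdrLen := binary.LittleEndian.Uint32(pack[len(pack)-4:])
    func parseHeader(hdr []byte) ([]blobEntry, error) {
        const entrySize = 1 + 4 + 32 // Type || Length || Hash
        if len(hdr)%entrySize != 0 {
            return nil, fmt.Errorf("truncated pack header")
        }
        var entries []blobEntry
        var off uint32
        for i := 0; i < len(hdr); i += entrySize {
            e := blobEntry{Type: hdr[i], Offset: off}
            if e.Type > 1 {
                return nil, fmt.Errorf("invalid blob type %d", e.Type)
            }
            e.Length = binary.LittleEndian.Uint32(hdr[i+1 : i+5])
            copy(e.ID[:], hdr[i+5:i+entrySize])
            entries = append(entries, e)
            off += e.Length
        }
        return entries, nil
    }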
Indexing
--------
@@ -139,23 +144,40 @@ Indexing
Index files contain information about Data and Tree Blobs and the Packs they
are contained in and store this information in the repository. When the local
cached index is not accessible any more, the index files can be downloaded and
used to reconstruct the index. The index Blobs are encrypted and signed like
Data and Tree Blobs, so the outer structure is `IV || Ciphertext || MAC` again.
The plaintext consists of a JSON document like the following:
used to reconstruct the index. The files are encrypted and signed like Data and
Tree Blobs, so the outer structure is `IV || Ciphertext || MAC` again. The
plaintext consists of a JSON document like the following:
[
{
"id": "73d04e6125cf3c28a299cc2f3cca3b78ceac396e4fcf9575e34536b26782413c",
"blobs": [
"3ec79977ef0cf5de7b08cd12b874cd0f62bbaf7f07f3497a5b1bbcc8cb39b1ce",
"9ccb846e60d90d4eb915848add7aa7ea1e4bbabfc60e573db9f7bfb2789afbae",
"d3dc577b4ffd38cc4b32122cabf8655a0223ed22edfd93b353dc0c3f2b0fdf66"
]
}
]
[ {
"id": "73d04e6125cf3c28a299cc2f3cca3b78ceac396e4fcf9575e34536b26782413c",
"blobs": [
{
"id": "3ec79977ef0cf5de7b08cd12b874cd0f62bbaf7f07f3497a5b1bbcc8cb39b1ce",
"type": "data",
"offset": 0,
"length": 25
},{
"id": "9ccb846e60d90d4eb915848add7aa7ea1e4bbabfc60e573db9f7bfb2789afbae",
"type": "tree",
"offset": 38,
"length": 100
},
{
"id": "d3dc577b4ffd38cc4b32122cabf8655a0223ed22edfd93b353dc0c3f2b0fdf66",
"type": "data",
"offset": 150,
"length": 123
}
]
} ]
This JSON document lists all the Blobs with contents. In this example, the Pack
`73d04e61` contains three Blobs, the plaintext hashes are listed afterwards.
This JSON document lists the Blobs contained in a Pack. In this example, the
Pack `73d04e61` contains two data Blobs and one tree Blob; the plaintext
hashes are listed afterwards.
There may be an arbitrary number of index files, containing information on
non-disjoint sets of Packs. The number of packs described in a single file is
chosen so that the file size is kept below 8 MiB.
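
After stripping the outer encryption layer, decoding an index file is a plain JSON unmarshal. One possible Go mapping of the document above, with field names taken directly from the example:

    // indexBlob and indexPack mirror the JSON document shown above.
    type indexBlob struct {
        ID     string `json:"id"`     // plaintext hash of the blob
        Type   string `json:"type"`   // "data" or "tree"
        Offset uint   `json:"offset"` // byte offset inside the pack
        Length uint   `json:"length"` // length of the encrypted blob
    }

    type indexPack struct {
        ID    string      `json:"id"` // hash naming the pack file
        Blobs []indexBlob `json:"blobs"`
    }

    func decodeIndex(plaintext []byte) ([]indexPack, error) {
        var idx []indexPack
        err := json.Unmarshal(plaintext, &idx)
        return idx, err
    }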
Keys, Encryption and MAC
------------------------

map.go

@@ -1,219 +0,0 @@
package restic
import (
"encoding/json"
"errors"
"sort"
"sync"
"github.com/restic/restic/backend"
"github.com/restic/restic/debug"
"github.com/restic/restic/server"
)
type Map struct {
list []server.Blob
m sync.Mutex
}
var ErrBlobNotFound = errors.New("Blob not found")
func NewMap() *Map {
return &Map{
list: []server.Blob{},
}
}
func (bl *Map) find(blob server.Blob, checkSize bool) (int, server.Blob, error) {
pos := sort.Search(len(bl.list), func(i int) bool {
return blob.ID.Compare(bl.list[i].ID) >= 0
})
if pos < len(bl.list) {
b := bl.list[pos]
if blob.ID.Compare(b.ID) == 0 && (!checkSize || blob.Size == b.Size) {
return pos, b, nil
}
}
return pos, server.Blob{}, ErrBlobNotFound
}
func (bl *Map) Find(blob server.Blob) (server.Blob, error) {
bl.m.Lock()
defer bl.m.Unlock()
_, blob, err := bl.find(blob, true)
return blob, err
}
func (bl *Map) FindID(id backend.ID) (server.Blob, error) {
bl.m.Lock()
defer bl.m.Unlock()
_, blob, err := bl.find(server.Blob{ID: id}, false)
return blob, err
}
func (bl *Map) Merge(other *Map) {
bl.m.Lock()
defer bl.m.Unlock()
other.m.Lock()
defer other.m.Unlock()
for _, blob := range other.list {
bl.insert(blob)
}
}
func (bl *Map) insert(blob server.Blob) server.Blob {
pos, b, err := bl.find(blob, true)
if err == nil {
// already present
return b
}
// insert blob
// https://code.google.com/p/go-wiki/wiki/SliceTricks
bl.list = append(bl.list, server.Blob{})
copy(bl.list[pos+1:], bl.list[pos:])
bl.list[pos] = blob
return blob
}
func (bl *Map) Insert(blob server.Blob) server.Blob {
bl.m.Lock()
defer bl.m.Unlock()
debug.Log("Map.Insert", " Map<%p> insert %v", bl, blob)
return bl.insert(blob)
}
func (bl *Map) MarshalJSON() ([]byte, error) {
return json.Marshal(bl.list)
}
func (bl *Map) UnmarshalJSON(data []byte) error {
return json.Unmarshal(data, &bl.list)
}
func (bl *Map) IDs() []backend.ID {
bl.m.Lock()
defer bl.m.Unlock()
ids := make([]backend.ID, 0, len(bl.list))
for _, b := range bl.list {
ids = append(ids, b.ID)
}
return ids
}
func (bl *Map) StorageIDs() []backend.ID {
bl.m.Lock()
defer bl.m.Unlock()
ids := make([]backend.ID, 0, len(bl.list))
for _, b := range bl.list {
ids = append(ids, b.Storage)
}
return ids
}
func (bl *Map) Equals(other *Map) bool {
if bl == nil && other == nil {
return true
}
if bl == nil || other == nil {
return false
}
bl.m.Lock()
defer bl.m.Unlock()
if len(bl.list) != len(other.list) {
debug.Log("Map.Equals", "length does not match: %d != %d", len(bl.list), len(other.list))
return false
}
for i := 0; i < len(bl.list); i++ {
if bl.list[i].Compare(other.list[i]) != 0 {
debug.Log("Map.Equals", "entry %d does not match: %v != %v", i, bl.list[i], other.list[i])
return false
}
}
return true
}
// Each calls f for each blob in the Map. While Each is running, no other
// operation is possible, since a mutex is held for the whole time.
func (bl *Map) Each(f func(blob server.Blob)) {
bl.m.Lock()
defer bl.m.Unlock()
for _, blob := range bl.list {
f(blob)
}
}
// Select returns a list of blobs from the plaintext IDs given in list.
func (bl *Map) Select(list backend.IDs) (server.Blobs, error) {
bl.m.Lock()
defer bl.m.Unlock()
blobs := make(server.Blobs, 0, len(list))
for _, id := range list {
_, blob, err := bl.find(server.Blob{ID: id}, false)
if err != nil {
return nil, err
}
blobs = append(blobs, blob)
}
return blobs, nil
}
// Len returns the number of blobs in the map.
func (bl *Map) Len() int {
bl.m.Lock()
defer bl.m.Unlock()
return len(bl.list)
}
// Prune deletes all IDs from the map except the ones listed in ids.
func (bl *Map) Prune(ids *backend.IDSet) {
bl.m.Lock()
defer bl.m.Unlock()
pos := 0
for pos < len(bl.list) {
blob := bl.list[pos]
if ids.Find(blob.ID) != nil {
// remove element
bl.list = append(bl.list[:pos], bl.list[pos+1:]...)
continue
}
pos++
}
}
// DeleteID removes the plaintext ID id from the map.
func (bl *Map) DeleteID(id backend.ID) {
bl.m.Lock()
defer bl.m.Unlock()
pos, _, err := bl.find(server.Blob{ID: id}, false)
if err != nil {
return
}
bl.list = append(bl.list[:pos], bl.list[pos+1:]...)
}

View file

@ -1,147 +0,0 @@
package restic_test
import (
"crypto/rand"
"encoding/json"
"flag"
"io"
mrand "math/rand"
"sync"
"testing"
"time"
"github.com/restic/restic"
"github.com/restic/restic/backend"
"github.com/restic/restic/server"
. "github.com/restic/restic/test"
)
var maxWorkers = flag.Uint("workers", 20, "number of workers to test Map concurrent access against")
func randomID() []byte {
buf := make([]byte, backend.IDSize)
_, err := io.ReadFull(rand.Reader, buf)
if err != nil {
panic(err)
}
return buf
}
func newBlob() server.Blob {
return server.Blob{
ID: randomID(),
Size: uint64(mrand.Uint32()),
Storage: randomID(),
StorageSize: uint64(mrand.Uint32()),
}
}
// Test basic functionality
func TestMap(t *testing.T) {
bl := restic.NewMap()
b := newBlob()
bl.Insert(b)
for i := 0; i < 1000; i++ {
bl.Insert(newBlob())
}
b2, err := bl.Find(server.Blob{ID: b.ID, Size: b.Size})
OK(t, err)
Assert(t, b2.Compare(b) == 0, "items are not equal: want %v, got %v", b, b2)
b2, err = bl.FindID(b.ID)
OK(t, err)
Assert(t, b2.Compare(b) == 0, "items are not equal: want %v, got %v", b, b2)
bl2 := restic.NewMap()
for i := 0; i < 1000; i++ {
bl2.Insert(newBlob())
}
b2, err = bl2.Find(b)
Assert(t, err != nil, "found ID in restic that was never inserted: %v", b2)
bl2.Merge(bl)
b2, err = bl2.Find(b)
if err != nil {
t.Fatal(err)
}
if b.Compare(b2) != 0 {
t.Fatalf("items are not equal: want %v, got %v", b, b2)
}
}
// Test JSON encode/decode
func TestMapJSON(t *testing.T) {
bl := restic.NewMap()
b := server.Blob{ID: randomID()}
bl.Insert(b)
b2, err := bl.Find(b)
OK(t, err)
Assert(t, b2.Compare(b) == 0, "items are not equal: want %v, got %v", b, b2)
buf, err := json.Marshal(bl)
OK(t, err)
bl2 := restic.Map{}
json.Unmarshal(buf, &bl2)
b2, err = bl2.Find(b)
OK(t, err)
Assert(t, b2.Compare(b) == 0, "items are not equal: want %v, got %v", b, b2)
buf, err = json.Marshal(bl2)
OK(t, err)
}
// random insert/find access by several goroutines
func TestMapRandom(t *testing.T) {
var wg sync.WaitGroup
worker := func(bl *restic.Map) {
defer wg.Done()
b := newBlob()
bl.Insert(b)
for i := 0; i < 200; i++ {
bl.Insert(newBlob())
}
d := time.Duration(mrand.Intn(10)*100) * time.Millisecond
time.Sleep(d)
for i := 0; i < 100; i++ {
b2, err := bl.Find(b)
if err != nil {
t.Fatal(err)
}
if b.Compare(b2) != 0 {
t.Fatalf("items are not equal: want %v, got %v", b, b2)
}
}
bl2 := restic.NewMap()
for i := 0; i < 200; i++ {
bl2.Insert(newBlob())
}
bl2.Merge(bl)
}
bl := restic.NewMap()
for i := 0; uint(i) < *maxWorkers; i++ {
wg.Add(1)
go worker(bl)
}
wg.Wait()
}

16
node.go
View file

@ -12,6 +12,7 @@ import (
"github.com/juju/arrar"
"github.com/restic/restic/backend"
"github.com/restic/restic/debug"
"github.com/restic/restic/pack"
"github.com/restic/restic/server"
)
@ -98,14 +99,14 @@ func nodeTypeFromFileInfo(fi os.FileInfo) string {
return ""
}
func (node *Node) CreateAt(path string, m *Map, s *server.Server) error {
func (node *Node) CreateAt(path string, s *server.Server) error {
switch node.Type {
case "dir":
if err := node.createDirAt(path); err != nil {
return err
}
case "file":
if err := node.createFileAt(path, m, s); err != nil {
if err := node.createFileAt(path, s); err != nil {
return err
}
case "symlink":
@ -171,7 +172,7 @@ func (node Node) createDirAt(path string) error {
return nil
}
func (node Node) createFileAt(path string, m *Map, s *server.Server) error {
func (node Node) createFileAt(path string, s *server.Server) error {
f, err := os.OpenFile(path, os.O_CREATE|os.O_WRONLY, 0600)
defer f.Close()
@ -179,13 +180,8 @@ func (node Node) createFileAt(path string, m *Map, s *server.Server) error {
return arrar.Annotate(err, "OpenFile")
}
for _, blobid := range node.Content {
blob, err := m.FindID(blobid)
if err != nil {
return arrar.Annotate(err, "Find Blob")
}
buf, err := s.Load(backend.Data, blob)
for _, id := range node.Content {
buf, err := s.LoadBlob(pack.Data, id)
if err != nil {
return arrar.Annotate(err, "Load")
}

291
pack/pack.go Normal file
View file

@ -0,0 +1,291 @@
package pack
import (
"crypto/sha256"
"encoding/binary"
"errors"
"fmt"
"io"
"sync"
"github.com/restic/restic/backend"
"github.com/restic/restic/crypto"
)
type BlobType uint8
const (
Data BlobType = 0
Tree BlobType = 1
)
func (t BlobType) String() string {
switch t {
case Data:
return "data"
case Tree:
return "tree"
}
return fmt.Sprintf("<BlobType %d>", t)
}
func (t BlobType) MarshalJSON() ([]byte, error) {
switch t {
case Data:
return []byte(`"data"`), nil
case Tree:
return []byte(`"tree"`), nil
}
return nil, errors.New("unknown blob type")
}
func (t *BlobType) UnmarshalJSON(buf []byte) error {
switch string(buf) {
case `"data"`:
*t = Data
case `"tree"`:
*t = Tree
default:
return errors.New("unknown blob type")
}
return nil
}
// Blob is a blob within a pack.
type Blob struct {
Type BlobType
Length uint32
ID backend.ID
Offset uint
}
// GetReader returns an io.Reader for the blob entry e.
func (e Blob) GetReader(rd io.ReadSeeker) (io.Reader, error) {
// seek to the correct location
_, err := rd.Seek(int64(e.Offset), 0)
if err != nil {
return nil, err
}
return io.LimitReader(rd, int64(e.Length)), nil
}
// Packer is used to create a new Pack.
type Packer struct {
blobs []Blob
bytes uint
k *crypto.Key
wr io.Writer
hw *backend.HashingWriter
m sync.Mutex
}
// NewPacker returns a new Packer that can be used to pack blobs
// together.
func NewPacker(k *crypto.Key, w io.Writer) *Packer {
return &Packer{k: k, wr: w, hw: backend.NewHashingWriter(w, sha256.New())}
}
// Add saves the data read from rd as a new blob to the packer. Returned is the
// number of bytes written to the pack.
func (p *Packer) Add(t BlobType, id backend.ID, rd io.Reader) (int64, error) {
p.m.Lock()
defer p.m.Unlock()
c := Blob{Type: t, ID: id}
n, err := io.Copy(p.hw, rd)
c.Length = uint32(n)
c.Offset = p.bytes
p.bytes += uint(n)
p.blobs = append(p.blobs, c)
return n, err
}
var entrySize = uint(binary.Size(BlobType(0)) + binary.Size(uint32(0)) + backend.IDSize)
// headerEntry is used with encoding/binary to read and write header entries
type headerEntry struct {
Type BlobType
Length uint32
ID [backend.IDSize]byte
}
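// For reference, the on-disk pack layout produced below is (a sketch
// derived from writeHeader/Finalize/NewUnpacker in this file, not
// normative documentation):
//
//   blob_1 ... blob_n | encrypted(headerEntry_1 ... headerEntry_n) | uint32 header length (LE)
//
// so a reader seeks to the end, reads the length field, then seeks back to
// decrypt and parse the header.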
// Finalize writes the header for all added blobs and finalizes the pack.
// Returned are the complete number of bytes written, including the header.
// After Finalize() has finished, the ID of this pack can be obtained by
// calling ID().
func (p *Packer) Finalize() (bytesWritten uint, err error) {
p.m.Lock()
defer p.m.Unlock()
bytesWritten = p.bytes
// create writer to encrypt header
wr := crypto.EncryptTo(p.k, p.hw)
bytesHeader, err := p.writeHeader(wr)
if err != nil {
wr.Close()
return bytesWritten + bytesHeader, err
}
bytesWritten += bytesHeader
// finalize encrypted header
err = wr.Close()
if err != nil {
return bytesWritten, err
}
// account for crypto overhead
bytesWritten += crypto.Extension
// write length
err = binary.Write(p.hw, binary.LittleEndian, uint32(uint(len(p.blobs))*entrySize+crypto.Extension))
if err != nil {
return bytesWritten, err
}
bytesWritten += uint(binary.Size(uint32(0)))
p.bytes = uint(bytesWritten)
return bytesWritten, nil
}
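// Sanity check (mirrored in pack_test.go): the value returned by Finalize
// should equal
//
//   sum(blob lengths) + len(blobs)*entrySize + crypto.Extension + binary.Size(uint32(0))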
// writeHeader constructs and writes the header to wr.
func (p *Packer) writeHeader(wr io.Writer) (bytesWritten uint, err error) {
for _, b := range p.blobs {
entry := headerEntry{
Type: b.Type,
Length: b.Length,
}
copy(entry.ID[:], b.ID)
err := binary.Write(wr, binary.LittleEndian, entry)
if err != nil {
return bytesWritten, err
}
bytesWritten += entrySize
}
return
}
// ID returns the ID of all data written so far.
func (p *Packer) ID() backend.ID {
p.m.Lock()
defer p.m.Unlock()
return p.hw.Sum(nil)
}
// Size returns the number of bytes written so far.
func (p *Packer) Size() uint {
p.m.Lock()
defer p.m.Unlock()
return p.bytes
}
// Count returns the number of blobs in this packer.
func (p *Packer) Count() int {
p.m.Lock()
defer p.m.Unlock()
return len(p.blobs)
}
// Blobs returns the slice of blobs that have been written.
func (p *Packer) Blobs() []Blob {
p.m.Lock()
defer p.m.Unlock()
return p.blobs
}
// Writer returns the underlying writer.
func (p *Packer) Writer() io.Writer {
return p.wr
}
func (p *Packer) String() string {
return fmt.Sprintf("<Packer %d blobs, %d bytes>", len(p.blobs), p.bytes)
}
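// Minimal usage sketch (illustrative only; k is assumed to be a
// *crypto.Key and buf holds the blob contents to store):
//
//   var file bytes.Buffer
//   p := pack.NewPacker(k, &file)
//   _, err := p.Add(pack.Data, backend.Hash(buf), bytes.NewReader(buf))
//   ...
//   n, err := p.Finalize() // appends the encrypted header and length trailer
//   id := p.ID()           // SHA-256 over all bytes written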
// Unpacker is used to read individual blobs from a pack.
type Unpacker struct {
rd io.ReadSeeker
Entries []Blob
k *crypto.Key
}
// NewUnpacker returns a pointer to Unpacker which can be used to read
// individual Blobs from a pack.
func NewUnpacker(k *crypto.Key, entries []Blob, rd io.ReadSeeker) (*Unpacker, error) {
var err error
ls := binary.Size(uint32(0))
// reset to the end to read header length
_, err = rd.Seek(-int64(ls), 2)
if err != nil {
return nil, fmt.Errorf("seeking to read header length failed: %v", err)
}
var length uint32
err = binary.Read(rd, binary.LittleEndian, &length)
if err != nil {
return nil, fmt.Errorf("reading header length failed: %v", err)
}
// reset to the beginning of the header
_, err = rd.Seek(-int64(ls)-int64(length), 2)
if err != nil {
return nil, fmt.Errorf("seeking to read header length failed: %v", err)
}
// read header
hrd, err := crypto.DecryptFrom(k, io.LimitReader(rd, int64(length)))
if err != nil {
return nil, err
}
if entries == nil {
pos := uint(0)
for {
e := headerEntry{}
err = binary.Read(hrd, binary.LittleEndian, &e)
if err == io.EOF {
break
}
if err != nil {
return nil, err
}
entries = append(entries, Blob{
Type: e.Type,
Length: e.Length,
ID: e.ID[:],
Offset: pos,
})
pos += uint(e.Length)
}
}
p := &Unpacker{
rd: rd,
k: k,
Entries: entries,
}
return p, nil
}
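// Reading a pack back is the mirror image (sketch; rd is an io.ReadSeeker
// over the pack, k the same key). Passing nil for entries makes NewUnpacker
// parse them from the encrypted header. Note that GetReader returns the
// raw bytes as stored, i.e. whatever was passed to Add():
//
//   u, err := pack.NewUnpacker(k, nil, rd)
//   for _, e := range u.Entries {
//       brd, err := e.GetReader(rd)
//       data, err := ioutil.ReadAll(brd)
//       ...
//   }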

109
pack/pack_test.go Normal file
View file

@ -0,0 +1,109 @@
package pack_test
import (
"bytes"
"crypto/rand"
"crypto/sha256"
"encoding/binary"
"encoding/json"
"io"
"io/ioutil"
"testing"
"github.com/restic/restic/backend"
"github.com/restic/restic/crypto"
"github.com/restic/restic/pack"
. "github.com/restic/restic/test"
)
var lengths = []int{23, 31650, 25860, 10928, 13769, 19862, 5211, 127, 13690, 30231}
func TestCreatePack(t *testing.T) {
type Buf struct {
data []byte
id backend.ID
}
bufs := []Buf{}
for _, l := range lengths {
b := make([]byte, l)
_, err := io.ReadFull(rand.Reader, b)
OK(t, err)
h := sha256.Sum256(b)
bufs = append(bufs, Buf{data: b, id: h[:]})
}
file := bytes.NewBuffer(nil)
// create random keys
k := crypto.NewKey()
// pack blobs
p := pack.NewPacker(k, file)
for _, b := range bufs {
p.Add(pack.Tree, b.id, bytes.NewReader(b.data))
}
// write file
n, err := p.Finalize()
OK(t, err)
written := 0
// data
for _, l := range lengths {
written += l
}
// header length
written += binary.Size(uint32(0))
// header
written += len(lengths) * (binary.Size(pack.BlobType(0)) + binary.Size(uint32(0)) + backend.IDSize)
// header crypto
written += crypto.Extension
// check length
Equals(t, uint(written), n)
Equals(t, uint(written), p.Size())
// read and parse it again
rd := bytes.NewReader(file.Bytes())
np, err := pack.NewUnpacker(k, nil, rd)
OK(t, err)
Equals(t, len(np.Entries), len(bufs))
for i, b := range bufs {
e := np.Entries[i]
Equals(t, b.id, e.ID)
brd, err := e.GetReader(rd)
OK(t, err)
data, err := ioutil.ReadAll(brd)
OK(t, err)
Assert(t, bytes.Equal(b.data, data),
"data for blob %v doesn't match", i)
}
}
var blobTypeJson = []struct {
t pack.BlobType
res string
}{
{pack.Data, `"data"`},
{pack.Tree, `"tree"`},
}
func TestBlobTypeJSON(t *testing.T) {
for _, test := range blobTypeJson {
// test serialize
buf, err := json.Marshal(test.t)
OK(t, err)
Equals(t, test.res, string(buf))
// test unserialize
var v pack.BlobType
err = json.Unmarshal([]byte(test.res), &v)
OK(t, err)
Equals(t, test.t, v)
}
}

View file

@ -36,8 +36,8 @@ func NewRestorer(s *server.Server, snid backend.ID) (*Restorer, error) {
return r, nil
}
func (res *Restorer) to(dst string, dir string, treeBlob server.Blob) error {
tree, err := LoadTree(res.s, treeBlob)
func (res *Restorer) to(dst string, dir string, treeID backend.ID) error {
tree, err := LoadTree(res.s, treeID)
if err != nil {
return res.Error(dir, nil, arrar.Annotate(err, "LoadTree"))
}
@ -47,7 +47,7 @@ func (res *Restorer) to(dst string, dir string, treeBlob server.Blob) error {
if res.Filter == nil ||
res.Filter(filepath.Join(dir, node.Name), dstpath, node) {
err := node.CreateAt(dstpath, tree.Map, res.s)
err := node.CreateAt(dstpath, res.s)
// Did it fail because of ENOENT?
if arrar.Check(err, func(err error) bool {
@ -60,7 +60,7 @@ func (res *Restorer) to(dst string, dir string, treeBlob server.Blob) error {
// Create parent directories and retry
err = os.MkdirAll(filepath.Dir(dstpath), 0700)
if err == nil || err == os.ErrExist {
err = node.CreateAt(dstpath, tree.Map, res.s)
err = node.CreateAt(dstpath, res.s)
}
}
@ -74,20 +74,11 @@ func (res *Restorer) to(dst string, dir string, treeBlob server.Blob) error {
if node.Type == "dir" {
if node.Subtree == nil {
return fmt.Errorf("Dir without subtree in tree %s", treeBlob)
return fmt.Errorf("Dir without subtree in tree %v", treeID.Str())
}
subp := filepath.Join(dir, node.Name)
subtreeBlob, err := tree.Map.FindID(node.Subtree)
if err != nil {
err = res.Error(subp, node, arrar.Annotate(err, "lookup subtree"))
if err != nil {
return err
}
}
err = res.to(dst, subp, subtreeBlob)
err = res.to(dst, subp, node.Subtree)
if err != nil {
err = res.Error(subp, node, arrar.Annotate(err, "restore subtree"))
if err != nil {

258
server/index.go Normal file
View file

@ -0,0 +1,258 @@
package server
import (
"encoding/json"
"errors"
"fmt"
"io"
"sync"
"github.com/restic/restic/backend"
"github.com/restic/restic/debug"
"github.com/restic/restic/pack"
)
// Index holds a lookup table for id -> pack.
type Index struct {
m sync.Mutex
pack map[string]indexEntry
}
type indexEntry struct {
tpe pack.BlobType
packID backend.ID
offset uint
length uint
old bool
}
// NewIndex returns a new index.
func NewIndex() *Index {
return &Index{
pack: make(map[string]indexEntry),
}
}
func (idx *Index) store(t pack.BlobType, id, pack backend.ID, offset, length uint, old bool) {
idx.pack[id.String()] = indexEntry{
tpe: t,
packID: pack,
offset: offset,
length: length,
old: old,
}
}
// Store remembers the id and pack in the index.
func (idx *Index) Store(t pack.BlobType, id, pack backend.ID, offset, length uint) {
idx.m.Lock()
defer idx.m.Unlock()
debug.Log("Index.Store", "pack %v contains id %v (%v), offset %v, length %v",
pack.Str(), id.Str(), t, offset, length)
idx.store(t, id, pack, offset, length, false)
}
// Remove removes the pack ID from the index.
func (idx *Index) Remove(packID backend.ID) {
idx.m.Lock()
defer idx.m.Unlock()
debug.Log("Index.Remove", "id %v removed", packID.Str())
s := packID.String()
if _, ok := idx.pack[s]; ok {
delete(idx.pack, s)
}
}
// Lookup returns the pack for the id.
func (idx *Index) Lookup(id backend.ID) (packID backend.ID, tpe pack.BlobType, offset, length uint, err error) {
idx.m.Lock()
defer idx.m.Unlock()
if p, ok := idx.pack[id.String()]; ok {
debug.Log("Index.Lookup", "id %v found in pack %v at %d, length %d",
id.Str(), p.packID.Str(), p.offset, p.length)
return p.packID, p.tpe, p.offset, p.length, nil
}
debug.Log("Index.Lookup", "id %v not found", id.Str())
return nil, pack.Data, 0, 0, errors.New("id not found")
}
// Has returns true iff the id is listed in the index.
func (idx *Index) Has(id backend.ID) bool {
_, _, _, _, err := idx.Lookup(id)
if err == nil {
return true
}
return false
}
// Merge loads all items from other into idx.
func (idx *Index) Merge(other *Index) {
debug.Log("Index.Merge", "Merge index with %p", other)
idx.m.Lock()
defer idx.m.Unlock()
for k, v := range other.pack {
if _, ok := idx.pack[k]; ok {
debug.Log("Index.Merge", "index already has key %v, updating", k[:8])
}
idx.pack[k] = v
}
debug.Log("Index.Merge", "done merging index")
}
// Each returns a channel that yields all blobs known to the index. If done is
// closed, the background goroutine terminates. This blocks any modification of
// the index.
func (idx *Index) Each(done chan struct{}) <-chan pack.Blob {
idx.m.Lock()
ch := make(chan pack.Blob)
go func() {
defer idx.m.Unlock()
defer func() {
close(ch)
}()
for ids, blob := range idx.pack {
id, err := backend.ParseID(ids)
if err != nil {
// ignore invalid IDs
continue
}
select {
case <-done:
return
case ch <- pack.Blob{
ID: id,
Offset: blob.offset,
Type: blob.tpe,
Length: uint32(blob.length),
}:
}
}
}()
return ch
}
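// Typical consumption pattern (sketch): closing done stops the iteration
// early and lets the background goroutine release the lock.
//
//   done := make(chan struct{})
//   defer close(done)
//   for blob := range idx.Each(done) {
//       ...
//   }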
// Count returns the number of blobs of type t in the index.
func (idx *Index) Count(t pack.BlobType) (n uint) {
debug.Log("Index.Count", "counting blobs of type %v", t)
idx.m.Lock()
defer idx.m.Unlock()
for id, blob := range idx.pack {
if blob.tpe == t {
n++
debug.Log("Index.Count", " blob %v counted: %v", id[:8], blob)
}
}
return
}
type packJSON struct {
ID string `json:"id"`
Blobs []blobJSON `json:"blobs"`
}
type blobJSON struct {
ID string `json:"id"`
Type pack.BlobType `json:"type"`
Offset uint `json:"offset"`
Length uint `json:"length"`
}
// Encode writes the JSON serialization of the index to the writer w. This
// serialization only contains new blobs added via idx.Store(), not old ones
// introduced via DecodeIndex().
func (idx *Index) Encode(w io.Writer) error {
debug.Log("Index.Encode", "encoding index")
idx.m.Lock()
defer idx.m.Unlock()
list := []*packJSON{}
packs := make(map[string]*packJSON)
for id, blob := range idx.pack {
if blob.old {
continue
}
debug.Log("Index.Encode", "handle blob %q", id[:8])
if blob.packID == nil {
debug.Log("Index.Encode", "blob %q has no packID! (type %v, offset %v, length %v)",
id[:8], blob.tpe, blob.offset, blob.length)
return fmt.Errorf("unable to serialize index: pack for blob %v hasn't been written yet", id)
}
// see if pack is already in map
p, ok := packs[blob.packID.String()]
if !ok {
// else create new pack
p = &packJSON{ID: blob.packID.String()}
// and append it to the list and map
list = append(list, p)
packs[p.ID] = p
}
// add blob
p.Blobs = append(p.Blobs, blobJSON{
ID: id,
Type: blob.tpe,
Offset: blob.offset,
Length: blob.length,
})
}
debug.Log("Index.Encode", "done")
enc := json.NewEncoder(w)
return enc.Encode(list)
}
// DecodeIndex loads and unserializes an index from rd.
func DecodeIndex(rd io.Reader) (*Index, error) {
debug.Log("Index.DecodeIndex", "Start decoding index")
list := []*packJSON{}
dec := json.NewDecoder(rd)
err := dec.Decode(&list)
if err != nil {
return nil, err
}
idx := NewIndex()
for _, pack := range list {
packID, err := backend.ParseID(pack.ID)
if err != nil {
debug.Log("Index.DecodeIndex", "error parsing pack ID %q: %v", pack.ID, err)
return nil, err
}
for _, blob := range pack.Blobs {
blobID, err := backend.ParseID(blob.ID)
if err != nil {
debug.Log("Index.DecodeIndex", "error parsing blob ID %q: %v", blob.ID, err)
return nil, err
}
idx.store(blob.Type, blobID, packID, blob.Offset, blob.Length, true)
}
}
debug.Log("Index.DecodeIndex", "done")
return idx, nil
}
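// Round trip (sketch, compare index_test.go): entries loaded via
// DecodeIndex are marked old and therefore skipped by Encode, so a newly
// written index contains only blobs added through Store afterwards.
//
//   idx, err := DecodeIndex(rd)
//   idx.Store(pack.Data, id, packID, offset, length)
//   err = idx.Encode(wr) // serializes only the new entry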

225
server/index_test.go Normal file
View file

@ -0,0 +1,225 @@
package server_test
import (
"bytes"
"crypto/rand"
"io"
"testing"
"github.com/restic/restic/backend"
"github.com/restic/restic/pack"
"github.com/restic/restic/server"
. "github.com/restic/restic/test"
)
func randomID() backend.ID {
buf := make([]byte, backend.IDSize)
_, err := io.ReadFull(rand.Reader, buf)
if err != nil {
panic(err)
}
return buf
}
func TestIndexSerialize(t *testing.T) {
type testEntry struct {
id backend.ID
pack backend.ID
tpe pack.BlobType
offset, length uint
}
tests := []testEntry{}
idx := server.NewIndex()
// create 50 packs with 20 blobs each
for i := 0; i < 50; i++ {
packID := randomID()
pos := uint(0)
for j := 0; j < 20; j++ {
id := randomID()
length := uint(i*100 + j)
idx.Store(pack.Data, id, packID, pos, length)
tests = append(tests, testEntry{
id: id,
pack: packID,
tpe: pack.Data,
offset: pos,
length: length,
})
pos += length
}
}
wr := bytes.NewBuffer(nil)
err := idx.Encode(wr)
OK(t, err)
idx2, err := server.DecodeIndex(wr)
OK(t, err)
Assert(t, idx2 != nil,
"nil returned for decoded index")
wr2 := bytes.NewBuffer(nil)
err = idx2.Encode(wr2)
OK(t, err)
for _, testBlob := range tests {
packID, tpe, offset, length, err := idx.Lookup(testBlob.id)
OK(t, err)
Equals(t, testBlob.pack, packID)
Equals(t, testBlob.tpe, tpe)
Equals(t, testBlob.offset, offset)
Equals(t, testBlob.length, length)
packID, tpe, offset, length, err = idx2.Lookup(testBlob.id)
OK(t, err)
Equals(t, testBlob.pack, packID)
Equals(t, testBlob.tpe, tpe)
Equals(t, testBlob.offset, offset)
Equals(t, testBlob.length, length)
}
// add more blobs to idx2
newtests := []testEntry{}
for i := 0; i < 10; i++ {
packID := randomID()
pos := uint(0)
for j := 0; j < 10; j++ {
id := randomID()
length := uint(i*100 + j)
idx2.Store(pack.Data, id, packID, pos, length)
newtests = append(newtests, testEntry{
id: id,
pack: packID,
tpe: pack.Data,
offset: pos,
length: length,
})
pos += length
}
}
// serialize idx2, unserialize to idx3
wr3 := bytes.NewBuffer(nil)
err = idx2.Encode(wr3)
OK(t, err)
idx3, err := server.DecodeIndex(wr3)
OK(t, err)
Assert(t, idx3 != nil,
"nil returned for decoded index")
// all old blobs must not be present in the index
for _, testBlob := range tests {
_, _, _, _, err := idx3.Lookup(testBlob.id)
Assert(t, err != nil,
"found old id %v in serialized index", testBlob.id.Str())
}
// all new blobs must be in the index
for _, testBlob := range newtests {
packID, tpe, offset, length, err := idx3.Lookup(testBlob.id)
OK(t, err)
Equals(t, testBlob.pack, packID)
Equals(t, testBlob.tpe, tpe)
Equals(t, testBlob.offset, offset)
Equals(t, testBlob.length, length)
}
}
func TestIndexSize(t *testing.T) {
idx := server.NewIndex()
packs := 200
blobs := 100
for i := 0; i < packs; i++ {
packID := randomID()
pos := uint(0)
for j := 0; j < blobs; j++ {
id := randomID()
length := uint(i*100 + j)
idx.Store(pack.Data, id, packID, pos, length)
pos += length
}
}
wr := bytes.NewBuffer(nil)
err := idx.Encode(wr)
OK(t, err)
t.Logf("Index file size for %d blobs in %d packs is %d", blobs*packs, packs, wr.Len())
}
// example index serialization from doc/Design.md
var docExample = []byte(`
[ {
"id": "73d04e6125cf3c28a299cc2f3cca3b78ceac396e4fcf9575e34536b26782413c",
"blobs": [
{
"id": "3ec79977ef0cf5de7b08cd12b874cd0f62bbaf7f07f3497a5b1bbcc8cb39b1ce",
"type": "data",
"offset": 0,
"length": 25
},{
"id": "9ccb846e60d90d4eb915848add7aa7ea1e4bbabfc60e573db9f7bfb2789afbae",
"type": "tree",
"offset": 38,
"length": 100
},
{
"id": "d3dc577b4ffd38cc4b32122cabf8655a0223ed22edfd93b353dc0c3f2b0fdf66",
"type": "data",
"offset": 150,
"length": 123
}
]
} ]
`)
var exampleTests = []struct {
id, packID backend.ID
tpe pack.BlobType
offset, length uint
}{
{
ParseID("3ec79977ef0cf5de7b08cd12b874cd0f62bbaf7f07f3497a5b1bbcc8cb39b1ce"),
ParseID("73d04e6125cf3c28a299cc2f3cca3b78ceac396e4fcf9575e34536b26782413c"),
pack.Data, 0, 25,
}, {
ParseID("9ccb846e60d90d4eb915848add7aa7ea1e4bbabfc60e573db9f7bfb2789afbae"),
ParseID("73d04e6125cf3c28a299cc2f3cca3b78ceac396e4fcf9575e34536b26782413c"),
pack.Tree, 38, 100,
}, {
ParseID("d3dc577b4ffd38cc4b32122cabf8655a0223ed22edfd93b353dc0c3f2b0fdf66"),
ParseID("73d04e6125cf3c28a299cc2f3cca3b78ceac396e4fcf9575e34536b26782413c"),
pack.Data, 150, 123,
},
}
func TestIndexUnserialize(t *testing.T) {
idx, err := server.DecodeIndex(bytes.NewReader(docExample))
OK(t, err)
for _, test := range exampleTests {
packID, tpe, offset, length, err := idx.Lookup(test.id)
OK(t, err)
Equals(t, test.packID, packID)
Equals(t, test.tpe, tpe)
Equals(t, test.offset, offset)
Equals(t, test.length, length)
}
}

View file

@ -1,24 +1,35 @@
package server
import (
"bytes"
"crypto/sha256"
"encoding/json"
"errors"
"fmt"
"io"
"io/ioutil"
"sync"
"github.com/restic/restic/backend"
"github.com/restic/restic/chunker"
"github.com/restic/restic/debug"
"github.com/restic/restic/pack"
)
type Server struct {
be backend.Backend
key *Key
idx *Index
pm sync.Mutex
packs []*pack.Packer
}
func NewServer(be backend.Backend) *Server {
return &Server{be: be}
return &Server{
be: be,
idx: NewIndex(),
}
}
func (s *Server) SetKey(k *Key) {
@ -49,14 +60,15 @@ func (s *Server) PrefixLength(t backend.Type) (int, error) {
return backend.PrefixLength(s.be, t)
}
// Load tries to load and decrypt content identified by t and blob from the
// backend. If the blob specifies an ID, the decrypted plaintext is checked
// against this ID. The same goes for blob.Size and blob.StorageSize: If they
// are set to a value > 0, this value is checked.
func (s *Server) Load(t backend.Type, blob Blob) ([]byte, error) {
// load data
rd, err := s.be.Get(t, blob.Storage.String())
// Load tries to load and decrypt content identified by t and id from the
// backend.
func (s *Server) Load(t backend.Type, id backend.ID) ([]byte, error) {
debug.Log("Server.Load", "load %v with id %v", t, id.Str())
// load data from the backend
rd, err := s.be.Get(t, id.String())
if err != nil {
debug.Log("Server.Load", "error loading %v: %v", id.Str(), err)
return nil, err
}
@ -65,58 +77,78 @@ func (s *Server) Load(t backend.Type, blob Blob) ([]byte, error) {
return nil, err
}
// check hash
if !backend.Hash(buf).Equal(blob.Storage) {
return nil, errors.New("invalid data returned")
}
// check length
if blob.StorageSize > 0 && len(buf) != int(blob.StorageSize) {
return nil, errors.New("Invalid storage length")
}
// decrypt
buf, err = s.Decrypt(buf)
err = rd.Close()
if err != nil {
return nil, err
}
// check length
if blob.Size > 0 && len(buf) != int(blob.Size) {
return nil, errors.New("Invalid length")
// check hash
if !backend.Hash(buf).Equal(id) {
return nil, errors.New("invalid data returned")
}
// check SHA256 sum
if blob.ID != nil {
id := backend.Hash(buf)
if !blob.ID.Equal(id) {
return nil, fmt.Errorf("load %v: expected plaintext hash %v, got %v", blob.Storage, blob.ID, id)
}
}
return buf, nil
}
// Load tries to load and decrypt content identified by t and id from the backend.
func (s *Server) LoadID(t backend.Type, storageID backend.ID) ([]byte, error) {
return s.Load(t, Blob{Storage: storageID})
}
// LoadJSON calls Load() to get content from the backend and afterwards calls
// json.Unmarshal on the item.
func (s *Server) LoadJSON(t backend.Type, blob Blob, item interface{}) error {
buf, err := s.Load(t, blob)
// decrypt
plain, err := s.Decrypt(buf)
if err != nil {
return err
return nil, err
}
return json.Unmarshal(buf, item)
return plain, nil
}
// LoadJSONID calls Load() to get content from the backend and afterwards calls
// json.Unmarshal on the item.
func (s *Server) LoadJSONID(t backend.Type, id backend.ID, item interface{}) error {
// read
// LoadBlob tries to load and decrypt content identified by t and id from a
// pack from the backend.
func (s *Server) LoadBlob(t pack.BlobType, id backend.ID) ([]byte, error) {
debug.Log("Server.LoadBlob", "load %v with id %v", t, id.Str())
// lookup pack
packID, tpe, offset, length, err := s.idx.Lookup(id)
if err != nil {
debug.Log("Server.LoadBlob", "id %v not found in index: %v", id.Str(), err)
return nil, err
}
if tpe != t {
debug.Log("Server.LoadBlob", "wrong type returned for %v: wanted %v, got %v", id.Str(), t, tpe)
return nil, fmt.Errorf("blob has wrong type %v (wanted: %v)", tpe, t)
}
debug.Log("Server.LoadBlob", "id %v found in pack %v at offset %v (length %d)", id.Str(), packID.Str(), offset, length)
// load blob from pack
rd, err := s.be.GetReader(backend.Data, packID.String(), offset, length)
if err != nil {
debug.Log("Server.LoadBlob", "error loading pack %v for %v: %v", packID.Str(), id.Str(), err)
return nil, err
}
buf, err := ioutil.ReadAll(rd)
if err != nil {
return nil, err
}
err = rd.Close()
if err != nil {
return nil, err
}
// decrypt
plain, err := s.Decrypt(buf)
if err != nil {
return nil, err
}
// check hash
if !backend.Hash(plain).Equal(id) {
return nil, errors.New("invalid data returned")
}
return plain, nil
}
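// The read path for a packed blob is thus: index lookup (pack ID, offset,
// length) -> ranged read from the pack file -> decrypt -> verify the
// plaintext SHA-256 against the requested ID.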
// LoadJSONEncrypted loads the blob id from the backend, decrypts the data
// and afterwards calls json.Unmarshal on the item.
func (s *Server) LoadJSONEncrypted(t backend.Type, id backend.ID, item interface{}) error {
// load blob from backend
rd, err := s.be.Get(t, id.String())
if err != nil {
return err
@ -140,132 +172,254 @@ func (s *Server) LoadJSONID(t backend.Type, id backend.ID, item interface{}) err
return nil
}
// Save encrypts data and stores it to the backend as type t.
func (s *Server) Save(t backend.Type, data []byte, id backend.ID) (Blob, error) {
// LoadJSONPack loads the blob id from a pack in the backend, decrypts the
// data and afterwards calls json.Unmarshal on the item.
func (s *Server) LoadJSONPack(t pack.BlobType, id backend.ID, item interface{}) error {
// lookup pack
packID, _, offset, length, err := s.idx.Lookup(id)
if err != nil {
return err
}
// load blob from pack
rd, err := s.be.GetReader(backend.Data, packID.String(), offset, length)
if err != nil {
return err
}
defer rd.Close()
// decrypt
decryptRd, err := s.key.DecryptFrom(rd)
if err != nil {
return err
}
defer decryptRd.Close()
// decode
decoder := json.NewDecoder(decryptRd)
err = decoder.Decode(item)
if err != nil {
return err
}
return nil
}
const minPackSize = 4 * chunker.MiB
const maxPackSize = 16 * chunker.MiB
const maxPackers = 200
// findPacker returns a packer for a new blob of size bytes. Either a new one is
// created or one is returned that already has some blobs.
func (s *Server) findPacker(size uint) (*pack.Packer, error) {
s.pm.Lock()
defer s.pm.Unlock()
// search for a suitable packer
if len(s.packs) > 0 {
debug.Log("Server.findPacker", "searching packer for %d bytes\n", size)
for i, p := range s.packs {
if p.Size()+size < maxPackSize {
debug.Log("Server.findPacker", "found packer %v", p)
// remove from list
s.packs = append(s.packs[:i], s.packs[i+1:]...)
return p, nil
}
}
}
// no suitable packer found, return new
blob, err := s.be.Create()
if err != nil {
return nil, err
}
debug.Log("Server.findPacker", "create new pack %p", blob)
return pack.NewPacker(s.key.Master(), blob), nil
}
// insertPacker appends p to s.packs.
func (s *Server) insertPacker(p *pack.Packer) {
s.pm.Lock()
defer s.pm.Unlock()
s.packs = append(s.packs, p)
debug.Log("Server.insertPacker", "%d packers\n", len(s.packs))
}
// savePacker stores p in the backend.
func (s *Server) savePacker(p *pack.Packer) error {
debug.Log("Server.savePacker", "save packer with %d blobs\n", p.Count())
_, err := p.Finalize()
if err != nil {
return err
}
// move file to the final location
sid := p.ID()
err = p.Writer().(backend.Blob).Finalize(backend.Data, sid.String())
if err != nil {
debug.Log("Server.savePacker", "blob Finalize() error: %v", err)
return err
}
debug.Log("Server.savePacker", "saved as %v", sid.Str())
// update blobs in the index
for _, b := range p.Blobs() {
debug.Log("Server.savePacker", " updating blob %v to pack %v", b.ID.Str(), sid.Str())
s.idx.Store(b.Type, b.ID, sid, b.Offset, uint(b.Length))
}
return nil
}
// countPacker returns the number of open (unfinished) packers.
func (s *Server) countPacker() int {
s.pm.Lock()
defer s.pm.Unlock()
return len(s.packs)
}
// Save encrypts data and stores it to the backend as type t. If data is small
// enough, it will be packed together with other small blobs.
func (s *Server) Save(t pack.BlobType, data []byte, id backend.ID) (backend.ID, error) {
if id == nil {
// compute plaintext hash
id = backend.Hash(data)
}
// create a new blob
blob := Blob{
ID: id,
Size: uint64(len(data)),
}
debug.Log("Server.Save", "save id %v (%v, %d bytes)", id.Str(), t, len(data))
// get buf from the pool
ciphertext := getBuf()
defer freeBuf(ciphertext)
// encrypt blob
ciphertext, err := s.Encrypt(ciphertext, data)
if err != nil {
return Blob{}, err
return nil, err
}
// compute ciphertext hash
sid := backend.Hash(ciphertext)
// save blob
backendBlob, err := s.be.Create()
// find suitable packer and add blob
packer, err := s.findPacker(uint(len(ciphertext)))
if err != nil {
return Blob{}, err
return nil, err
}
_, err = backendBlob.Write(ciphertext)
if err != nil {
return Blob{}, err
// save ciphertext
_, err = packer.Add(t, id, bytes.NewReader(ciphertext))
if err != nil {
return nil, err
}
// add this id to the index; we don't know yet in which pack the blob will
// be stored, so the entry is updated later, when the pack is written.
s.idx.Store(t, id, nil, 0, 0)
debug.Log("Server.Save", "saving stub for %v (%v) in index", id.Str(), t)
// if the pack is not full enough and there are less than maxPackers
// packers, put back to the list
if packer.Size() < minPackSize && s.countPacker() < maxPackers {
debug.Log("Server.Save", "pack is not full enough (%d bytes)", packer.Size())
s.insertPacker(packer)
return id, nil
}
err = backendBlob.Finalize(t, sid.String())
if err != nil {
return Blob{}, err
}
blob.Storage = sid
blob.StorageSize = uint64(len(ciphertext))
return blob, nil
// else write the pack to the backend
return id, s.savePacker(packer)
}
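// An index entry therefore passes through two states: a stub with
// packID == nil written by Save above, and the final entry with pack ID,
// offset and length written by savePacker once the pack is flushed.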
// SaveFrom encrypts data read from rd and stores it to the backend as type t.
func (s *Server) SaveFrom(t backend.Type, id backend.ID, length uint, rd io.Reader) (Blob, error) {
// SaveFrom encrypts data read from rd and stores it in a pack in the backend as type t.
func (s *Server) SaveFrom(t pack.BlobType, id backend.ID, length uint, rd io.Reader) error {
debug.Log("Server.SaveFrom", "save id %v (%v, %d bytes)", id.Str(), t, length)
if id == nil {
return Blob{}, errors.New("id is nil")
return errors.New("id is nil")
}
backendBlob, err := s.be.Create()
buf, err := ioutil.ReadAll(rd)
if err != nil {
return Blob{}, err
return err
}
hw := backend.NewHashingWriter(backendBlob, sha256.New())
encWr := s.key.EncryptTo(hw)
_, err = io.Copy(encWr, rd)
_, err = s.Save(t, buf, id)
if err != nil {
return Blob{}, err
return err
}
// finish encryption
err = encWr.Close()
if err != nil {
return Blob{}, fmt.Errorf("EncryptedWriter.Close(): %v", err)
}
// finish backend blob
sid := backend.ID(hw.Sum(nil))
err = backendBlob.Finalize(t, sid.String())
if err != nil {
return Blob{}, fmt.Errorf("backend.Blob.Close(): %v", err)
}
return Blob{
ID: id,
Size: uint64(length),
Storage: sid,
StorageSize: uint64(backendBlob.Size()),
}, nil
return nil
}
// SaveJSON serialises item as JSON and encrypts and saves it in the backend as
// type t.
func (s *Server) SaveJSON(t backend.Type, item interface{}) (Blob, error) {
backendBlob, err := s.be.Create()
// SaveJSON serialises item as JSON and encrypts and saves it in a pack in the
// backend as type t.
func (s *Server) SaveJSON(t pack.BlobType, item interface{}) (backend.ID, error) {
debug.Log("Server.SaveJSON", "save %v blob", t)
buf := getBuf()[:0]
defer freeBuf(buf)
wr := bytes.NewBuffer(buf)
enc := json.NewEncoder(wr)
err := enc.Encode(item)
if err != nil {
return Blob{}, fmt.Errorf("Create: %v", err)
return nil, fmt.Errorf("json.Encode: %v", err)
}
storagehw := backend.NewHashingWriter(backendBlob, sha256.New())
encWr := s.key.EncryptTo(storagehw)
plainhw := backend.NewHashingWriter(encWr, sha256.New())
buf = wr.Bytes()
return s.Save(t, buf, nil)
}
enc := json.NewEncoder(plainhw)
// SaveJSONUnpacked serialises item as JSON and encrypts and saves it in the
// backend as type t, without a pack. It returns the storage hash.
func (s *Server) SaveJSONUnpacked(t backend.Type, item interface{}) (backend.ID, error) {
// create blob
blob, err := s.be.Create()
if err != nil {
return nil, err
}
debug.Log("Server.SaveJSONUnpacked", "create new pack %p", blob)
// hash
hw := backend.NewHashingWriter(blob, sha256.New())
// encrypt blob
ewr := s.key.EncryptTo(hw)
enc := json.NewEncoder(ewr)
err = enc.Encode(item)
if err != nil {
return Blob{}, fmt.Errorf("json.NewEncoder: %v", err)
return nil, fmt.Errorf("json.Encode: %v", err)
}
// finish encryption
err = encWr.Close()
err = ewr.Close()
if err != nil {
return Blob{}, fmt.Errorf("EncryptedWriter.Close(): %v", err)
return nil, err
}
// finish backend blob
sid := backend.ID(storagehw.Sum(nil))
err = backendBlob.Finalize(t, sid.String())
// finalize blob in the backend
sid := backend.ID(hw.Sum(nil))
err = blob.Finalize(t, sid.String())
if err != nil {
return Blob{}, fmt.Errorf("backend.Blob.Close(): %v", err)
return nil, err
}
id := backend.ID(plainhw.Sum(nil))
return sid, nil
}
return Blob{
ID: id,
Size: uint64(plainhw.Size()),
Storage: sid,
StorageSize: uint64(backendBlob.Size()),
}, nil
// Flush saves all remaining packs.
func (s *Server) Flush() error {
s.pm.Lock()
defer s.pm.Unlock()
debug.Log("Server.Flush", "manually flushing %d packs", len(s.packs))
for _, p := range s.packs {
err := s.savePacker(p)
if err != nil {
return err
}
}
s.packs = s.packs[:0]
return nil
}
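// Since small blobs stay in an open packer until it fills up, callers must
// call Flush before reading blobs back (sketch, compare TestSave in
// server_test.go):
//
//   id, err := s.Save(pack.Data, data, nil)
//   err = s.Flush()                     // write any open packs
//   buf, err := s.LoadBlob(pack.Data, id)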
// Backend returns the backend used for this server.
@ -273,6 +427,106 @@ func (s *Server) Backend() backend.Backend {
return s.be
}
// Index returns the index of this server.
func (s *Server) Index() *Index {
return s.idx
}
// SetIndex instructs the server to use the given index.
func (s *Server) SetIndex(i *Index) {
s.idx = i
}
// SaveIndex encrypts and saves all new entries of the index in the backend,
// returned is the storage ID.
func (s *Server) SaveIndex() (backend.ID, error) {
debug.Log("Server.SaveIndex", "Saving index")
// create blob
blob, err := s.be.Create()
if err != nil {
return nil, err
}
debug.Log("Server.SaveIndex", "create new pack %p", blob)
// hash
hw := backend.NewHashingWriter(blob, sha256.New())
// encrypt blob
ewr := s.key.EncryptTo(hw)
err = s.idx.Encode(ewr)
if err != nil {
return nil, err
}
err = ewr.Close()
if err != nil {
return nil, err
}
// finalize blob in the backend
sid := backend.ID(hw.Sum(nil))
err = blob.Finalize(backend.Index, sid.String())
if err != nil {
return nil, err
}
debug.Log("Server.SaveIndex", "Saved index as %v", sid.Str())
return sid, nil
}
// LoadIndex loads all index files from the backend and merges them with the
// current index.
func (s *Server) LoadIndex() error {
debug.Log("Server.LoadIndex", "Loading index")
done := make(chan struct{})
defer close(done)
for id := range s.be.List(backend.Index, done) {
err := s.loadIndex(id)
if err != nil {
return err
}
}
return nil
}
// loadIndex loads the index id and merges it with the currently used index.
func (s *Server) loadIndex(id string) error {
debug.Log("Server.loadIndex", "Loading index %v", id[:8])
before := len(s.idx.pack)
rd, err := s.be.Get(backend.Index, id)
if err != nil {
return err
}
defer rd.Close()
// decrypt
decryptRd, err := s.key.DecryptFrom(rd)
if err != nil {
return err
}
defer decryptRd.Close()
idx, err := DecodeIndex(decryptRd)
if err != nil {
debug.Log("Server.loadIndex", "error while decoding index %v: %v", id, err)
return err
}
s.idx.Merge(idx)
after := len(s.idx.pack)
debug.Log("Server.loadIndex", "Loaded index %v, added %v blobs", id[:8], after-before)
return nil
}
func (s *Server) SearchKey(password string) error {
key, err := SearchKey(s, password)
if err != nil {
@ -289,7 +543,7 @@ func (s *Server) Decrypt(ciphertext []byte) ([]byte, error) {
return nil, errors.New("key for server not set")
}
return s.key.Decrypt([]byte{}, ciphertext)
return s.key.Decrypt(nil, ciphertext)
}
func (s *Server) Encrypt(ciphertext, plaintext []byte) ([]byte, error) {
@ -305,8 +559,8 @@ func (s *Server) Key() *Key {
}
// Count returns the number of blobs of a given type in the backend.
func (s *Server) Count(t backend.Type) (n int) {
for _ = range s.List(t, nil) {
func (s *Server) Count(t backend.Type) (n uint) {
for _ = range s.be.List(t, nil) {
n++
}

View file

@ -11,6 +11,7 @@ import (
"github.com/restic/restic"
"github.com/restic/restic/backend"
"github.com/restic/restic/pack"
. "github.com/restic/restic/test"
)
@ -38,12 +39,12 @@ func TestSaveJSON(t *testing.T) {
data = append(data, '\n')
h := sha256.Sum256(data)
blob, err := server.SaveJSON(backend.Tree, obj)
id, err := server.SaveJSON(pack.Tree, obj)
OK(t, err)
Assert(t, bytes.Equal(h[:], blob.ID),
Assert(t, bytes.Equal(h[:], id),
"TestSaveJSON: wrong plaintext ID: expected %02x, got %02x",
h, blob.ID)
h, id)
}
}
@ -63,17 +64,51 @@ func BenchmarkSaveJSON(t *testing.B) {
t.ResetTimer()
for i := 0; i < t.N; i++ {
blob, err := server.SaveJSON(backend.Tree, obj)
id, err := server.SaveJSON(pack.Tree, obj)
OK(t, err)
Assert(t, bytes.Equal(h[:], blob.ID),
Assert(t, bytes.Equal(h[:], id),
"TestSaveJSON: wrong plaintext ID: expected %02x, got %02x",
h, blob.ID)
h, id)
}
}
var testSizes = []int{5, 23, 2<<18 + 23, 1 << 20}
func TestSave(t *testing.T) {
server := SetupBackend(t)
defer TeardownBackend(t, server)
key := SetupKey(t, server, "geheim")
server.SetKey(key)
for _, size := range testSizes {
data := make([]byte, size)
_, err := io.ReadFull(rand.Reader, data)
OK(t, err)
id := backend.Hash(data)
// save
sid, err := server.Save(pack.Data, data, nil)
OK(t, err)
Equals(t, id, sid)
OK(t, server.Flush())
// read back
buf, err := server.LoadBlob(pack.Data, id)
Assert(t, len(buf) == len(data),
"number of bytes read back does not match: expected %d, got %d",
len(data), len(buf))
Assert(t, bytes.Equal(buf, data),
"data does not match: expected %02x, got %02x",
data, buf)
}
}
func TestSaveFrom(t *testing.T) {
server := SetupBackend(t)
defer TeardownBackend(t, server)
@ -85,14 +120,16 @@ func TestSaveFrom(t *testing.T) {
_, err := io.ReadFull(rand.Reader, data)
OK(t, err)
id := sha256.Sum256(data)
id := backend.Hash(data)
// save
blob, err := server.SaveFrom(backend.Data, id[:], uint(size), bytes.NewReader(data))
err = server.SaveFrom(pack.Data, id[:], uint(size), bytes.NewReader(data))
OK(t, err)
OK(t, server.Flush())
// read back
buf, err := server.Load(backend.Data, blob)
buf, err := server.LoadBlob(pack.Data, id[:])
Assert(t, len(buf) == len(data),
"number of bytes read back does not match: expected %d, got %d",
@ -123,12 +160,12 @@ func BenchmarkSaveFrom(t *testing.B) {
for i := 0; i < t.N; i++ {
// save
_, err := server.SaveFrom(backend.Data, id[:], uint(size), bytes.NewReader(data))
err = server.SaveFrom(pack.Data, id[:], uint(size), bytes.NewReader(data))
OK(t, err)
}
}
func TestLoadJSONID(t *testing.T) {
func TestLoadJSONPack(t *testing.T) {
if *benchTestDir == "" {
t.Skip("benchdir not set, skipping TestServerStats")
}
@ -140,23 +177,14 @@ func TestLoadJSONID(t *testing.T) {
// archive a few files
sn := SnapshotDir(t, server, *benchTestDir, nil)
t.Logf("archived snapshot %v", sn.ID())
// benchmark loading first tree
done := make(chan struct{})
first, found := <-server.List(backend.Tree, done)
Assert(t, found, "no Trees in repository found")
close(done)
id, err := backend.ParseID(first)
OK(t, err)
OK(t, server.Flush())
tree := restic.NewTree()
err = server.LoadJSONID(backend.Tree, id, &tree)
err := server.LoadJSONPack(pack.Tree, sn.Tree, &tree)
OK(t, err)
}
func BenchmarkLoadJSONID(t *testing.B) {
func TestLoadJSONEncrypted(t *testing.T) {
if *benchTestDir == "" {
t.Skip("benchdir not set, skipping TestServerStats")
}
@ -166,18 +194,20 @@ func BenchmarkLoadJSONID(t *testing.B) {
key := SetupKey(t, server, "geheim")
server.SetKey(key)
// archive a few files
sn := SnapshotDir(t, server, *benchTestDir, nil)
t.Logf("archived snapshot %v", sn.ID())
// archive a snapshot
sn := restic.Snapshot{}
sn.Hostname = "foobar"
sn.Username = "test!"
t.ResetTimer()
id, err := server.SaveJSONUnpacked(backend.Snapshot, &sn)
OK(t, err)
tree := restic.NewTree()
for i := 0; i < t.N; i++ {
for name := range server.List(backend.Tree, nil) {
id, err := backend.ParseID(name)
OK(t, err)
OK(t, server.LoadJSONID(backend.Tree, id, &tree))
}
}
var sn2 restic.Snapshot
// restore
err = server.LoadJSONEncrypted(backend.Snapshot, id, &sn2)
OK(t, err)
Equals(t, sn.Hostname, sn2.Hostname)
Equals(t, sn.Username, sn2.Username)
}

View file

@ -11,14 +11,14 @@ import (
)
type Snapshot struct {
Time time.Time `json:"time"`
Parent backend.ID `json:"parent,omitempty"`
Tree server.Blob `json:"tree"`
Paths []string `json:"paths"`
Hostname string `json:"hostname,omitempty"`
Username string `json:"username,omitempty"`
UID uint32 `json:"uid,omitempty"`
GID uint32 `json:"gid,omitempty"`
Time time.Time `json:"time"`
Parent backend.ID `json:"parent,omitempty"`
Tree backend.ID `json:"tree"`
Paths []string `json:"paths"`
Hostname string `json:"hostname,omitempty"`
Username string `json:"username,omitempty"`
UID uint32 `json:"uid,omitempty"`
GID uint32 `json:"gid,omitempty"`
id backend.ID // plaintext ID, used during restore
}
@ -50,7 +50,7 @@ func NewSnapshot(paths []string) (*Snapshot, error) {
func LoadSnapshot(s *server.Server, id backend.ID) (*Snapshot, error) {
sn := &Snapshot{id: id}
err := s.LoadJSONID(backend.Snapshot, id, sn)
err := s.LoadJSONEncrypted(backend.Snapshot, id, sn)
if err != nil {
return nil, err
}

View file

@ -52,7 +52,6 @@ func SetupKey(t testing.TB, s *server.Server, password string) *server.Key {
func SnapshotDir(t testing.TB, server *server.Server, path string, parent backend.ID) *restic.Snapshot {
arch, err := restic.NewArchiver(server)
OK(t, err)
OK(t, arch.Preload())
sn, _, err := arch.Snapshot(nil, []string{path}, parent)
OK(t, err)
return sn

View file

@ -39,7 +39,7 @@ func Equals(tb testing.TB, exp, act interface{}) {
}
}
func Str2ID(s string) backend.ID {
func ParseID(s string) backend.ID {
id, err := backend.ParseID(s)
if err != nil {
panic(err)

18
tree.go
View file

@ -7,12 +7,12 @@ import (
"github.com/restic/restic/backend"
"github.com/restic/restic/debug"
"github.com/restic/restic/pack"
"github.com/restic/restic/server"
)
type Tree struct {
Nodes []*Node `json:"nodes"`
Map *Map `json:"map"`
}
var (
@ -23,17 +23,16 @@ var (
func NewTree() *Tree {
return &Tree{
Nodes: []*Node{},
Map: NewMap(),
}
}
func (t Tree) String() string {
return fmt.Sprintf("Tree<%d nodes, %d blobs>", len(t.Nodes), len(t.Map.list))
return fmt.Sprintf("Tree<%d nodes>", len(t.Nodes))
}
func LoadTree(s *server.Server, blob server.Blob) (*Tree, error) {
func LoadTree(s *server.Server, id backend.ID) (*Tree, error) {
tree := &Tree{}
err := s.LoadJSON(backend.Tree, blob, tree)
err := s.LoadJSONPack(pack.Tree, id, tree)
if err != nil {
return nil, err
}
@ -41,18 +40,13 @@ func LoadTree(s *server.Server, blob server.Blob) (*Tree, error) {
return tree, nil
}
// Equals returns true if t and other have exactly the same nodes and map.
func (t Tree) Equals(other Tree) bool {
// Equals returns true if t and other have exactly the same nodes.
func (t Tree) Equals(other *Tree) bool {
if len(t.Nodes) != len(other.Nodes) {
debug.Log("Tree.Equals", "tree.Equals(): trees have different number of nodes")
return false
}
if !t.Map.Equals(other.Map) {
debug.Log("Tree.Equals", "tree.Equals(): maps aren't equal")
return false
}
for i := 0; i < len(t.Nodes); i++ {
if !t.Nodes[i].Equals(*other.Nodes[i]) {
debug.Log("Tree.Equals", "tree.Equals(): node %d is different:", i)

View file

@ -8,6 +8,7 @@ import (
"testing"
"github.com/restic/restic"
"github.com/restic/restic/pack"
. "github.com/restic/restic/test"
)
@ -91,3 +92,26 @@ func TestNodeComparison(t *testing.T) {
n2.Size -= 1
Assert(t, !node.Equals(n2), "nodes are equal")
}
func TestLoadTree(t *testing.T) {
server := SetupBackend(t)
defer TeardownBackend(t, server)
key := SetupKey(t, server, "geheim")
server.SetKey(key)
// save tree
tree := restic.NewTree()
id, err := server.SaveJSON(pack.Tree, tree)
OK(t, err)
// save packs
OK(t, server.Flush())
// load tree again
tree2, err := restic.LoadTree(server, id)
OK(t, err)
Assert(t, tree.Equals(tree2),
"trees are not equal: want %v, got %v",
tree, tree2)
}

27
walk.go
View file

@ -3,6 +3,7 @@ package restic
import (
"path/filepath"
"github.com/restic/restic/backend"
"github.com/restic/restic/debug"
"github.com/restic/restic/server"
)
@ -15,10 +16,10 @@ type WalkTreeJob struct {
Tree *Tree
}
func walkTree(s *server.Server, path string, treeBlob server.Blob, done chan struct{}, jobCh chan<- WalkTreeJob) {
debug.Log("walkTree", "start on %q (%v)", path, treeBlob)
func walkTree(s *server.Server, path string, treeID backend.ID, done chan struct{}, jobCh chan<- WalkTreeJob) {
debug.Log("walkTree", "start on %q (%v)", path, treeID.Str())
// load tree
t, err := LoadTree(s, treeBlob)
t, err := LoadTree(s, treeID)
if err != nil {
jobCh <- WalkTreeJob{Path: path, Error: err}
return
@ -27,32 +28,22 @@ func walkTree(s *server.Server, path string, treeBlob server.Blob, done chan str
for _, node := range t.Nodes {
p := filepath.Join(path, node.Name)
if node.Type == "dir" {
blob, err := t.Map.FindID(node.Subtree)
if err != nil {
jobCh <- WalkTreeJob{Path: p, Error: err}
continue
}
walkTree(s, p, blob, done, jobCh)
walkTree(s, p, node.Subtree, done, jobCh)
} else {
// load old blobs
node.blobs, err = t.Map.Select(node.Content)
if err != nil {
debug.Log("walkTree", "unable to load bobs for %q (%v): %v", path, treeBlob, err)
}
jobCh <- WalkTreeJob{Path: p, Node: node, Error: err}
}
}
jobCh <- WalkTreeJob{Path: filepath.Join(path), Tree: t}
debug.Log("walkTree", "done for %q (%v)", path, treeBlob)
debug.Log("walkTree", "done for %q (%v)", path, treeID.Str())
}
// WalkTree walks the tree specified by ID recursively and sends a job for each
// file and directory it finds. When the channel done is closed, processing
// stops.
func WalkTree(server *server.Server, blob server.Blob, done chan struct{}, jobCh chan<- WalkTreeJob) {
debug.Log("WalkTree", "start on %v", blob)
walkTree(server, "", blob, done, jobCh)
func WalkTree(server *server.Server, id backend.ID, done chan struct{}, jobCh chan<- WalkTreeJob) {
debug.Log("WalkTree", "start on %v", id.Str())
walkTree(server, "", id, done, jobCh)
close(jobCh)
debug.Log("WalkTree", "done")
}
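// Call pattern (sketch, compare walk_test.go): WalkTree closes jobCh when
// the walk is finished, so the consumer can simply range over it.
//
//   jobCh := make(chan WalkTreeJob)
//   go WalkTree(s, sn.Tree, done, jobCh)
//   for job := range jobCh {
//       ...
//   }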

View file

@ -27,6 +27,9 @@ func TestWalkTree(t *testing.T) {
sn, _, err := arch.Snapshot(nil, dirs, nil)
OK(t, err)
// flush server, write all packs
OK(t, server.Flush())
// start benchmark
// t.ResetTimer()
@ -48,6 +51,9 @@ func TestWalkTree(t *testing.T) {
fsJob, fsChOpen := <-fsJobs
Assert(t, !fsChOpen || fsJob != nil,
"received nil job from filesystem: %v %v", fsJob, fsChOpen)
if fsJob != nil {
OK(t, fsJob.Error())
}
var path string
fsEntries := 1
@ -63,6 +69,8 @@ func TestWalkTree(t *testing.T) {
treeJob, treeChOpen := <-treeJobs
treeEntries := 1
OK(t, treeJob.Error)
if treeJob.Tree != nil {
treeEntries = len(treeJob.Tree.Nodes)
}