diff --git a/archiver.go b/archiver.go index 983d6f168..c850b11fe 100644 --- a/archiver.go +++ b/archiver.go @@ -14,6 +14,7 @@ import ( "github.com/restic/restic/backend" "github.com/restic/restic/chunker" "github.com/restic/restic/debug" + "github.com/restic/restic/pack" "github.com/restic/restic/pipe" "github.com/restic/restic/server" ) @@ -29,8 +30,6 @@ const ( type Archiver struct { s *server.Server - m *Map - c *Cache blobToken chan struct{} @@ -50,15 +49,6 @@ func NewArchiver(s *server.Server) (*Archiver, error) { arch.blobToken <- struct{}{} } - // create new map to store all blobs in - arch.m = NewMap() - - // init cache - arch.c, err = NewCache(s) - if err != nil { - return nil, err - } - // abort on all errors arch.Error = func(string, os.FileInfo, error) error { return err } // allow all files @@ -67,119 +57,59 @@ func NewArchiver(s *server.Server) (*Archiver, error) { return arch, nil } -// Cache returns the current cache for the Archiver. -func (arch *Archiver) Cache() *Cache { - return arch.c -} - -// Preload loads all blobs for all cached snapshots. -func (arch *Archiver) Preload() error { - done := make(chan struct{}) - defer close(done) - - // list snapshots - // TODO: track seen tree ids, load trees that aren't in the set - snapshots := 0 - for name := range arch.s.List(backend.Snapshot, done) { - id, err := backend.ParseID(name) - if err != nil { - debug.Log("Archiver.Preload", "unable to parse name %v as id: %v", name, err) - continue - } - - m, err := arch.c.LoadMap(arch.s, id) - if err != nil { - debug.Log("Archiver.Preload", "blobs for snapshot %v not cached: %v", id.Str(), err) - continue - } - - arch.m.Merge(m) - debug.Log("Archiver.Preload", "done loading cached blobs for snapshot %v", id.Str()) - snapshots++ - } - - debug.Log("Archiver.Preload", "Loaded %v blobs from %v snapshots", arch.m.Len(), snapshots) - return nil -} - -func (arch *Archiver) Save(t backend.Type, id backend.ID, length uint, rd io.Reader) (server.Blob, error) { +func (arch *Archiver) Save(t pack.BlobType, id backend.ID, length uint, rd io.Reader) error { debug.Log("Archiver.Save", "Save(%v, %v)\n", t, id.Str()) // test if this blob is already known - blob, err := arch.m.FindID(id) - if err == nil { - debug.Log("Archiver.Save", "Save(%v, %v): reusing %v\n", t, id.Str(), blob.Storage.Str()) - return blob, nil + if arch.s.Index().Has(id) { + debug.Log("Archiver.Save", "(%v, %v) already saved\n", t, id.Str()) + return nil } - // else encrypt and save data - blob, err = arch.s.SaveFrom(t, id, length, rd) - - // store blob in storage map - smapblob := arch.m.Insert(blob) - - // if the map has a different storage id for this plaintext blob, use that - // one and remove the other. This happens if the same plaintext blob was - // stored concurrently and finished earlier than this blob. 
- if blob.Storage.Compare(smapblob.Storage) != 0 { - debug.Log("Archiver.Save", "using other block, removing %v\n", blob.Storage.Str()) - - // remove the blob again - // TODO: implement a list of blobs in transport, so this doesn't happen so often - err = arch.s.Remove(t, blob.Storage.String()) - if err != nil { - return server.Blob{}, err - } + // otherwise save blob + err := arch.s.SaveFrom(t, id, length, rd) + if err != nil { + debug.Log("Archiver.Save", "Save(%v, %v): error %v\n", t, id.Str(), err) + return err } - debug.Log("Archiver.Save", "Save(%v, %v): new blob %v\n", t, id.Str(), blob) - - return smapblob, nil + debug.Log("Archiver.Save", "Save(%v, %v): new blob\n", t, id.Str()) + return nil } -func (arch *Archiver) SaveTreeJSON(item interface{}) (server.Blob, error) { +func (arch *Archiver) SaveTreeJSON(item interface{}) (backend.ID, error) { // convert to json data, err := json.Marshal(item) // append newline data = append(data, '\n') if err != nil { - return server.Blob{}, err + return nil, err } // check if tree has been saved before id := backend.Hash(data) - blob, err := arch.m.FindID(id) - // return the blob if we found it - if err == nil { - return blob, nil + if arch.s.Index().Has(id) { + return id, nil } // otherwise save the data - blob, err = arch.s.SaveJSON(backend.Tree, item) - if err != nil { - return server.Blob{}, err - } - - // store blob in storage map - arch.m.Insert(blob) - - return blob, nil + return arch.s.SaveJSON(pack.Tree, item) } // SaveFile stores the content of the file on the backend as a Blob by calling // Save for each chunk. -func (arch *Archiver) SaveFile(p *Progress, node *Node) (server.Blobs, error) { +func (arch *Archiver) SaveFile(p *Progress, node *Node) error { file, err := node.OpenForReading() defer file.Close() if err != nil { - return nil, err + return err } // check file again fi, err := file.Stat() if err != nil { - return nil, err + return err } if fi.ModTime() != node.ModTime { @@ -190,7 +120,7 @@ func (arch *Archiver) SaveFile(p *Progress, node *Node) (server.Blobs, error) { n, err := NodeFromFileInfo(node.path, fi) if err != nil { debug.Log("Archiver.SaveFile", "NodeFromFileInfo returned error for %v: %v", node.path, err) - return nil, err + return err } // copy node @@ -198,12 +128,15 @@ func (arch *Archiver) SaveFile(p *Progress, node *Node) (server.Blobs, error) { } } - var blobs server.Blobs + type result struct { + id backend.ID + bytes uint64 + } // store all chunks chnker := GetChunker("archiver.SaveFile") chnker.Reset(file, arch.s.ChunkerPolynomial()) - chans := [](<-chan server.Blob){} + chans := [](<-chan result){} defer FreeChunker("archiver.SaveFile", chnker) chunks := 0 @@ -215,75 +148,71 @@ func (arch *Archiver) SaveFile(p *Progress, node *Node) (server.Blobs, error) { } if err != nil { - return nil, arrar.Annotate(err, "SaveFile() chunker.Next()") + return arrar.Annotate(err, "SaveFile() chunker.Next()") } chunks++ // acquire token, start goroutine to save chunk token := <-arch.blobToken - resCh := make(chan server.Blob, 1) + resCh := make(chan result, 1) - go func(ch chan<- server.Blob) { - blob, err := arch.Save(backend.Data, chunk.Digest, chunk.Length, chunk.Reader(file)) + go func(ch chan<- result) { + err := arch.Save(pack.Data, chunk.Digest, chunk.Length, chunk.Reader(file)) // TODO handle error if err != nil { panic(err) } - p.Report(Stat{Bytes: blob.Size}) + p.Report(Stat{Bytes: uint64(chunk.Length)}) arch.blobToken <- token - ch <- blob + ch <- result{id: backend.ID(chunk.Digest), bytes: 
uint64(chunk.Length)} }(resCh) chans = append(chans, resCh) } - blobs = []server.Blob{} + results := []result{} for _, ch := range chans { - blobs = append(blobs, <-ch) + results = append(results, <-ch) } - if len(blobs) != chunks { - return nil, fmt.Errorf("chunker returned %v chunks, but only %v blobs saved", chunks, len(blobs)) + if len(results) != chunks { + return fmt.Errorf("chunker returned %v chunks, but only %v blobs saved", chunks, len(results)) } var bytes uint64 - node.Content = make([]backend.ID, len(blobs)) + node.Content = make([]backend.ID, len(results)) debug.Log("Archiver.Save", "checking size for file %s", node.path) - for i, blob := range blobs { - node.Content[i] = blob.ID - bytes += blob.Size + for i, b := range results { + node.Content[i] = b.id + bytes += b.bytes - debug.Log("Archiver.Save", " adding blob %s", blob) + debug.Log("Archiver.Save", " adding blob %s, %d bytes", b.id.Str(), b.bytes) } if bytes != node.Size { - return nil, fmt.Errorf("errors saving node %q: saved %d bytes, wanted %d bytes", node.path, bytes, node.Size) + return fmt.Errorf("errors saving node %q: saved %d bytes, wanted %d bytes", node.path, bytes, node.Size) } - debug.Log("Archiver.SaveFile", "SaveFile(%q): %v\n", node.path, blobs) + debug.Log("Archiver.SaveFile", "SaveFile(%q): %v blobs\n", node.path, len(results)) - return blobs, nil + return nil } -func (arch *Archiver) saveTree(p *Progress, t *Tree) (server.Blob, error) { +func (arch *Archiver) saveTree(p *Progress, t *Tree) (backend.ID, error) { debug.Log("Archiver.saveTree", "saveTree(%v)\n", t) var wg sync.WaitGroup - // add all blobs to global map - arch.m.Merge(t.Map) - // TODO: do all this in parallel for _, node := range t.Nodes { if node.tree != nil { - b, err := arch.saveTree(p, node.tree) + id, err := arch.saveTree(p, node.tree) if err != nil { - return server.Blob{}, err + return nil, err } - node.Subtree = b.ID - t.Map.Insert(b) + node.Subtree = id p.Report(Stat{Dirs: 1}) } else if node.Type == "file" { if len(node.Content) > 0 { @@ -291,22 +220,18 @@ func (arch *Archiver) saveTree(p *Progress, t *Tree) (server.Blob, error) { // check content for _, id := range node.Content { - blob, err := t.Map.FindID(id) + packID, _, _, _, err := arch.s.Index().Lookup(id) if err != nil { - debug.Log("Archiver.saveTree", "unable to find storage id for data blob %v", id.Str()) - arch.Error(node.path, nil, fmt.Errorf("unable to find storage id for data blob %v", id.Str())) + debug.Log("Archiver.saveTree", "unable to find storage id for data blob %v: %v", id.Str(), err) + arch.Error(node.path, nil, fmt.Errorf("unable to find storage id for data blob %v: %v", id.Str(), err)) removeContent = true - t.Map.DeleteID(id) - arch.m.DeleteID(id) continue } - if ok, err := arch.s.Test(backend.Data, blob.Storage.String()); !ok || err != nil { - debug.Log("Archiver.saveTree", "blob %v not in repository (error is %v)", blob, err) - arch.Error(node.path, nil, fmt.Errorf("blob %v not in repository (error is %v)", blob.Storage.Str(), err)) + if ok, err := arch.s.Test(backend.Data, packID.String()); !ok || err != nil { + debug.Log("Archiver.saveTree", "pack %v of blob %v not in repository (error is %v)", packID, id, err) + arch.Error(node.path, nil, fmt.Errorf("pack %v of blob %v not in repository (error is %v)", packID, id, err)) removeContent = true - t.Map.DeleteID(id) - arch.m.DeleteID(id) } } @@ -322,12 +247,7 @@ func (arch *Archiver) saveTree(p *Progress, t *Tree) (server.Blob, error) { go func(n *Node) { defer wg.Done() - var blobs server.Blobs - 
blobs, n.err = arch.SaveFile(p, n) - for _, b := range blobs { - t.Map.Insert(b) - } - + n.err = arch.SaveFile(p, n) p.Report(Stat{Files: 1}) }(node) } @@ -341,7 +261,7 @@ func (arch *Archiver) saveTree(p *Progress, t *Tree) (server.Blob, error) { // check for invalid file nodes for _, node := range t.Nodes { if node.Type == "file" && node.Content == nil && node.err == nil { - return server.Blob{}, fmt.Errorf("node %v has empty content", node.Name) + return nil, fmt.Errorf("node %v has empty content", node.Name) } // remember used hashes @@ -358,7 +278,7 @@ func (arch *Archiver) saveTree(p *Progress, t *Tree) (server.Blob, error) { if node.err != nil { err := arch.Error(node.path, nil, node.err) if err != nil { - return server.Blob{}, err + return nil, err } // save error message in node @@ -366,20 +286,12 @@ func (arch *Archiver) saveTree(p *Progress, t *Tree) (server.Blob, error) { } } - before := len(t.Map.IDs()) - t.Map.Prune(usedIDs) - after := len(t.Map.IDs()) - - if before != after { - debug.Log("Archiver.saveTree", "pruned %d ids from map for tree %v\n", before-after, t) - } - - blob, err := arch.SaveTreeJSON(t) + id, err := arch.SaveTreeJSON(t) if err != nil { - return server.Blob{}, err + return nil, err } - return blob, nil + return id, nil } func (arch *Archiver) fileWorker(wg *sync.WaitGroup, p *Progress, done <-chan struct{}, entCh <-chan pipe.Entry) { @@ -444,7 +356,7 @@ func (arch *Archiver) fileWorker(wg *sync.WaitGroup, p *Progress, done <-chan st // otherwise read file normally if node.Type == "file" && len(node.Content) == 0 { debug.Log("Archiver.fileWorker", " read and save %v, content: %v", e.Path(), node.Content) - node.blobs, err = arch.SaveFile(p, node) + err = arch.SaveFile(p, node) if err != nil { // TODO: integrate error reporting fmt.Fprintf(os.Stderr, "error for %v: %v\n", node.path, err) @@ -501,11 +413,6 @@ func (arch *Archiver) dirWorker(wg *sync.WaitGroup, p *Progress, done <-chan str if node.Type == "dir" { debug.Log("Archiver.dirWorker", "got tree node for %s: %v", node.path, node.blobs) } - - // also store blob in tree map - for _, blob := range node.blobs { - tree.Map.Insert(blob) - } } var ( @@ -525,14 +432,13 @@ func (arch *Archiver) dirWorker(wg *sync.WaitGroup, p *Progress, done <-chan str } } - blob, err := arch.SaveTreeJSON(tree) + id, err := arch.SaveTreeJSON(tree) if err != nil { panic(err) } - debug.Log("Archiver.dirWorker", "save tree for %s: %v", dir.Path(), blob) + debug.Log("Archiver.dirWorker", "save tree for %s: %v", dir.Path(), id.Str()) - node.Subtree = blob.ID - node.blobs = server.Blobs{blob} + node.Subtree = id dir.Result() <- node if dir.Path() != "" { @@ -792,30 +698,35 @@ func (arch *Archiver) Snapshot(p *Progress, paths []string, pid backend.ID) (*Sn // receive the top-level tree root := (<-resCh).(*Node) - blob := root.blobs[0] - debug.Log("Archiver.Snapshot", "root node received: %v", blob) - sn.Tree = blob + debug.Log("Archiver.Snapshot", "root node received: %v", root.Subtree.Str()) + sn.Tree = root.Subtree // save snapshot - blob, err = arch.s.SaveJSON(backend.Snapshot, sn) + id, err := arch.s.SaveJSONUnpacked(backend.Snapshot, sn) if err != nil { return nil, nil, err } // store ID in snapshot struct - sn.id = blob.Storage + sn.id = id + debug.Log("Archiver.Snapshot", "saved snapshot %v", id.Str()) - debug.Log("Archiver.Snapshot", "saved snapshot %v", blob.Storage.Str()) - - // cache blobs - err = arch.c.StoreMap(sn.id, arch.m) + // flush server + err = arch.s.Flush() if err != nil { - debug.Log("Archiver.Snapshot", "unable 
to cache blobs for snapshot %v: %v", blob.Storage.Str(), err) - fmt.Fprintf(os.Stderr, "unable to cache blobs for snapshot %v: %v\n", blob.Storage.Str(), err) - return sn, blob.Storage, nil + return nil, nil, err } - return sn, blob.Storage, nil + // save index + indexID, err := arch.s.SaveIndex() + if err != nil { + debug.Log("Archiver.Snapshot", "error saving index: %v", err) + return nil, nil, err + } + + debug.Log("Archiver.Snapshot", "saved index %v", indexID.Str()) + + return sn, id, nil } func isFile(fi os.FileInfo) bool { diff --git a/archiver_test.go b/archiver_test.go index 77e10f17f..c64f0d145 100644 --- a/archiver_test.go +++ b/archiver_test.go @@ -9,6 +9,7 @@ import ( "github.com/restic/restic" "github.com/restic/restic/backend" "github.com/restic/restic/chunker" + "github.com/restic/restic/pack" "github.com/restic/restic/server" . "github.com/restic/restic/test" ) @@ -114,11 +115,7 @@ func BenchmarkChunkEncryptParallel(b *testing.B) { restic.FreeChunkBuf("BenchmarkChunkEncryptParallel", buf) } -func BenchmarkArchiveDirectory(b *testing.B) { - if *benchArchiveDirectory == "" { - b.Skip("benchdir not set, skipping BenchmarkArchiveDirectory") - } - +func archiveDirectory(b testing.TB) { server := SetupBackend(b) defer TeardownBackend(b, server) key := SetupKey(b, server, "geheim") @@ -132,13 +129,25 @@ func BenchmarkArchiveDirectory(b *testing.B) { b.Logf("snapshot archived as %v", id) } -func countBlobs(t testing.TB, server *server.Server) (trees int, data int) { - return server.Count(backend.Tree), server.Count(backend.Data) +func TestArchiveDirectory(t *testing.T) { + if *benchArchiveDirectory == "" { + t.Skip("benchdir not set, skipping TestArchiveDirectory") + } + + archiveDirectory(t) } -func archiveWithPreload(t testing.TB) { +func BenchmarkArchiveDirectory(b *testing.B) { if *benchArchiveDirectory == "" { - t.Skip("benchdir not set, skipping TestArchiverPreload") + b.Skip("benchdir not set, skipping BenchmarkArchiveDirectory") + } + + archiveDirectory(b) +} + +func archiveWithDedup(t testing.TB) { + if *benchArchiveDirectory == "" { + t.Skip("benchdir not set, skipping TestArchiverDedup") } server := SetupBackend(t) @@ -146,78 +155,81 @@ func archiveWithPreload(t testing.TB) { key := SetupKey(t, server, "geheim") server.SetKey(key) + var cnt struct { + before, after, after2 struct { + packs, dataBlobs, treeBlobs uint + } + } + // archive a few files sn := SnapshotDir(t, server, *benchArchiveDirectory, nil) t.Logf("archived snapshot %v", sn.ID().Str()) // get archive stats - beforeTrees, beforeData := countBlobs(t, server) - t.Logf("found %v trees, %v data blobs", beforeTrees, beforeData) + cnt.before.packs = server.Count(backend.Data) + cnt.before.dataBlobs = server.Index().Count(pack.Data) + cnt.before.treeBlobs = server.Index().Count(pack.Tree) + t.Logf("packs %v, data blobs %v, tree blobs %v", + cnt.before.packs, cnt.before.dataBlobs, cnt.before.treeBlobs) // archive the same files again, without parent snapshot sn2 := SnapshotDir(t, server, *benchArchiveDirectory, nil) t.Logf("archived snapshot %v", sn2.ID().Str()) - // get archive stats - afterTrees2, afterData2 := countBlobs(t, server) - t.Logf("found %v trees, %v data blobs", afterTrees2, afterData2) + // get archive stats again + cnt.after.packs = server.Count(backend.Data) + cnt.after.dataBlobs = server.Index().Count(pack.Data) + cnt.after.treeBlobs = server.Index().Count(pack.Tree) + t.Logf("packs %v, data blobs %v, tree blobs %v", + cnt.after.packs, cnt.after.dataBlobs, cnt.after.treeBlobs) - // if there are 
more blobs, something is wrong - if afterData2 > beforeData { - t.Fatalf("TestArchiverPreload: too many data blobs in repository: before %d, after %d", - beforeData, afterData2) + // if there are more packs or blobs, something is wrong + if cnt.after.packs > cnt.before.packs { + t.Fatalf("TestArchiverDedup: too many packs in repository: before %d, after %d", + cnt.before.packs, cnt.after.packs) + } + if cnt.after.dataBlobs > cnt.before.dataBlobs { + t.Fatalf("TestArchiverDedup: too many data blobs in repository: before %d, after %d", + cnt.before.dataBlobs, cnt.after.dataBlobs) + } + if cnt.after.treeBlobs > cnt.before.treeBlobs { + t.Fatalf("TestArchiverDedup: too many tree blobs in repository: before %d, after %d", + cnt.before.treeBlobs, cnt.after.treeBlobs) } // archive the same files again, with a parent snapshot sn3 := SnapshotDir(t, server, *benchArchiveDirectory, sn2.ID()) t.Logf("archived snapshot %v, parent %v", sn3.ID().Str(), sn2.ID().Str()) - // get archive stats - afterTrees3, afterData3 := countBlobs(t, server) - t.Logf("found %v trees, %v data blobs", afterTrees3, afterData3) + // get archive stats again + cnt.after2.packs = server.Count(backend.Data) + cnt.after2.dataBlobs = server.Index().Count(pack.Data) + cnt.after2.treeBlobs = server.Index().Count(pack.Tree) + t.Logf("packs %v, data blobs %v, tree blobs %v", + cnt.after2.packs, cnt.after2.dataBlobs, cnt.after2.treeBlobs) - // if there are more blobs, something is wrong - if afterData3 > beforeData { - t.Fatalf("TestArchiverPreload: too many data blobs in repository: before %d, after %d", - beforeData, afterData3) + // if there are more packs or blobs, something is wrong + if cnt.after2.packs > cnt.before.packs { + t.Fatalf("TestArchiverDedup: too many packs in repository: before %d, after %d", + cnt.before.packs, cnt.after2.packs) + } + if cnt.after2.dataBlobs > cnt.before.dataBlobs { + t.Fatalf("TestArchiverDedup: too many data blobs in repository: before %d, after %d", + cnt.before.dataBlobs, cnt.after2.dataBlobs) + } + if cnt.after2.treeBlobs > cnt.before.treeBlobs { + t.Fatalf("TestArchiverDedup: too many tree blobs in repository: before %d, after %d", + cnt.before.treeBlobs, cnt.after2.treeBlobs) } } -func TestArchivePreload(t *testing.T) { - archiveWithPreload(t) -} - -func BenchmarkPreload(t *testing.B) { - if *benchArchiveDirectory == "" { - t.Skip("benchdir not set, skipping TestArchiverPreload") - } - - server := SetupBackend(t) - defer TeardownBackend(t, server) - key := SetupKey(t, server, "geheim") - server.SetKey(key) - - // archive a few files - arch, err := restic.NewArchiver(server) - OK(t, err) - sn, _, err := arch.Snapshot(nil, []string{*benchArchiveDirectory}, nil) - OK(t, err) - t.Logf("archived snapshot %v", sn.ID()) - - // start benchmark - t.ResetTimer() - - for i := 0; i < t.N; i++ { - // create new archiver and preload - arch2, err := restic.NewArchiver(server) - OK(t, err) - OK(t, arch2.Preload()) - } +func TestArchiveDedup(t *testing.T) { + archiveWithDedup(t) } func BenchmarkLoadTree(t *testing.B) { if *benchArchiveDirectory == "" { - t.Skip("benchdir not set, skipping TestArchiverPreload") + t.Skip("benchdir not set, skipping TestArchiverDedup") } s := SetupBackend(t) @@ -235,14 +247,12 @@ func BenchmarkLoadTree(t *testing.B) { list := make([]backend.ID, 0, 10) done := make(chan struct{}) - for name := range s.List(backend.Tree, done) { - id, err := backend.ParseID(name) - if err != nil { - t.Logf("invalid id for tree %v", name) + for blob := range s.Index().Each(done) { + if blob.Type != 
pack.Tree { continue } - list = append(list, id) + list = append(list, blob.ID) if len(list) == cap(list) { close(done) break @@ -254,7 +264,7 @@ func BenchmarkLoadTree(t *testing.B) { for i := 0; i < t.N; i++ { for _, id := range list { - _, err := restic.LoadTree(s, server.Blob{Storage: id}) + _, err := restic.LoadTree(s, id) OK(t, err) } } diff --git a/backend/backend_test.go b/backend/backend_test.go index 36a91cec6..da5606551 100644 --- a/backend/backend_test.go +++ b/backend/backend_test.go @@ -13,7 +13,10 @@ import ( ) func testBackend(b backend.Backend, t *testing.T) { - for _, tpe := range []backend.Type{backend.Data, backend.Key, backend.Lock, backend.Snapshot, backend.Tree} { + for _, tpe := range []backend.Type{ + backend.Data, backend.Key, backend.Lock, + backend.Snapshot, backend.Index, + } { // detect non-existing files for _, test := range TestStrings { id, err := backend.ParseID(test.id) diff --git a/backend/generic.go b/backend/generic.go index 173f4e1b1..0720809bd 100644 --- a/backend/generic.go +++ b/backend/generic.go @@ -3,6 +3,7 @@ package backend import ( "crypto/sha256" "errors" + "io" ) const ( @@ -96,3 +97,33 @@ outer: return IDSize, nil } + +// wrap around io.LimitedReader that implements io.ReadCloser +type blobReader struct { + f io.Closer + rd io.Reader + closed bool +} + +func (l *blobReader) Read(p []byte) (int, error) { + n, err := l.rd.Read(p) + if err == io.EOF { + l.Close() + } + + return n, err +} + +func (l *blobReader) Close() error { + if !l.closed { + err := l.f.Close() + l.closed = true + return err + } + + return nil +} + +func LimitReader(f io.ReadCloser, n int64) *blobReader { + return &blobReader{f: f, rd: io.LimitReader(f, n)} +} diff --git a/backend/interface.go b/backend/interface.go index e96c6ab7c..a20b0e751 100644 --- a/backend/interface.go +++ b/backend/interface.go @@ -10,14 +10,14 @@ const ( Key = "key" Lock = "lock" Snapshot = "snapshot" - Tree = "tree" + Index = "index" ) const ( Version = 1 ) -// A Backend manages blobs of data. +// A Backend manages data stored somewhere. type Backend interface { // Location returns a string that specifies the location of the repository, // like a URL. @@ -30,6 +30,10 @@ type Backend interface { // Get returns an io.ReadCloser for the Blob with the given name of type t. Get(t Type, name string) (io.ReadCloser, error) + // GetReader returns an io.ReadCloser for the Blob with the given name of + // type t at offset and length. + GetReader(t Type, name string, offset, length uint) (io.ReadCloser, error) + // Test a boolean value whether a Blob with the name and type exists. 
Test(t Type, name string) (bool, error) diff --git a/backend/local/local.go b/backend/local/local.go index bc9cf3b7f..0fbd5fcf7 100644 --- a/backend/local/local.go +++ b/backend/local/local.go @@ -30,7 +30,7 @@ func Open(dir string) (*Local, error) { dir, filepath.Join(dir, backend.Paths.Data), filepath.Join(dir, backend.Paths.Snapshots), - filepath.Join(dir, backend.Paths.Trees), + filepath.Join(dir, backend.Paths.Index), filepath.Join(dir, backend.Paths.Locks), filepath.Join(dir, backend.Paths.Keys), filepath.Join(dir, backend.Paths.Temp), @@ -102,7 +102,7 @@ func Create(dir string) (*Local, error) { dir, filepath.Join(dir, backend.Paths.Data), filepath.Join(dir, backend.Paths.Snapshots), - filepath.Join(dir, backend.Paths.Trees), + filepath.Join(dir, backend.Paths.Index), filepath.Join(dir, backend.Paths.Locks), filepath.Join(dir, backend.Paths.Keys), filepath.Join(dir, backend.Paths.Temp), @@ -222,7 +222,7 @@ func (lb *localBlob) Finalize(t backend.Type, name string) error { f := filename(lb.basedir, t, name) // create directories if necessary, ignore errors - if t == backend.Data || t == backend.Tree { + if t == backend.Data { os.MkdirAll(filepath.Dir(f), backend.Modes.Dir) } @@ -279,11 +279,8 @@ func dirname(base string, t backend.Type, name string) string { } case backend.Snapshot: n = backend.Paths.Snapshots - case backend.Tree: - n = backend.Paths.Trees - if len(name) > 2 { - n = filepath.Join(n, name[:2]) - } + case backend.Index: + n = backend.Paths.Index case backend.Lock: n = backend.Paths.Locks case backend.Key: @@ -298,6 +295,26 @@ func (b *Local) Get(t backend.Type, name string) (io.ReadCloser, error) { return os.Open(filename(b.p, t, name)) } +// GetReader returns an io.ReadCloser for the Blob with the given name of +// type t at offset and length. If length is 0, the reader reads until EOF. +func (b *Local) GetReader(t backend.Type, name string, offset, length uint) (io.ReadCloser, error) { + f, err := os.Open(filename(b.p, t, name)) + if err != nil { + return nil, err + } + + _, err = f.Seek(int64(offset), 0) + if err != nil { + return nil, err + } + + if length == 0 { + return f, nil + } + + return backend.LimitReader(f, int64(length)), nil +} + // Test returns true if a blob of the given type and name exists in the backend. 
func (b *Local) Test(t backend.Type, name string) (bool, error) { _, err := os.Stat(filename(b.p, t, name)) @@ -322,7 +339,7 @@ func (b *Local) Remove(t backend.Type, name string) error { func (b *Local) List(t backend.Type, done <-chan struct{}) <-chan string { // TODO: use os.Open() and d.Readdirnames() instead of Glob() var pattern string - if t == backend.Data || t == backend.Tree { + if t == backend.Data { pattern = filepath.Join(dirname(b.p, t, ""), "*", "*") } else { pattern = filepath.Join(dirname(b.p, t, ""), "*") diff --git a/backend/paths.go b/backend/paths.go index aa73bf6eb..d697a6cc6 100644 --- a/backend/paths.go +++ b/backend/paths.go @@ -6,7 +6,7 @@ import "os" var Paths = struct { Data string Snapshots string - Trees string + Index string Locks string Keys string Temp string @@ -15,7 +15,7 @@ var Paths = struct { }{ "data", "snapshots", - "trees", + "index", "locks", "keys", "tmp", diff --git a/backend/sftp/sftp.go b/backend/sftp/sftp.go index 4237efae4..aa0fdd2af 100644 --- a/backend/sftp/sftp.go +++ b/backend/sftp/sftp.go @@ -78,7 +78,7 @@ func Open(dir string, program string, args ...string) (*SFTP, error) { dir, filepath.Join(dir, backend.Paths.Data), filepath.Join(dir, backend.Paths.Snapshots), - filepath.Join(dir, backend.Paths.Trees), + filepath.Join(dir, backend.Paths.Index), filepath.Join(dir, backend.Paths.Locks), filepath.Join(dir, backend.Paths.Keys), filepath.Join(dir, backend.Paths.Version), @@ -152,7 +152,7 @@ func Create(dir string, program string, args ...string) (*SFTP, error) { dir, filepath.Join(dir, backend.Paths.Data), filepath.Join(dir, backend.Paths.Snapshots), - filepath.Join(dir, backend.Paths.Trees), + filepath.Join(dir, backend.Paths.Index), filepath.Join(dir, backend.Paths.Locks), filepath.Join(dir, backend.Paths.Keys), filepath.Join(dir, backend.Paths.Temp), @@ -303,7 +303,7 @@ func (r *SFTP) renameFile(oldname string, t backend.Type, name string) error { filename := r.filename(t, name) // create directories if necessary - if t == backend.Data || t == backend.Tree { + if t == backend.Data { err := r.mkdirAll(filepath.Dir(filename), backend.Modes.Dir) if err != nil { return err @@ -403,11 +403,8 @@ func (r *SFTP) dirname(t backend.Type, name string) string { } case backend.Snapshot: n = backend.Paths.Snapshots - case backend.Tree: - n = backend.Paths.Trees - if len(name) > 2 { - n = filepath.Join(n, name[:2]) - } + case backend.Index: + n = backend.Paths.Index case backend.Lock: n = backend.Paths.Locks case backend.Key: @@ -432,6 +429,26 @@ func (r *SFTP) Get(t backend.Type, name string) (io.ReadCloser, error) { return file, nil } +// GetReader returns an io.ReadCloser for the Blob with the given name of +// type t at offset and length. If length is 0, the reader reads until EOF. +func (r *SFTP) GetReader(t backend.Type, name string, offset, length uint) (io.ReadCloser, error) { + f, err := r.c.Open(r.filename(t, name)) + if err != nil { + return nil, err + } + + _, err = f.Seek(int64(offset), 0) + if err != nil { + return nil, err + } + + if length == 0 { + return f, nil + } + + return backend.LimitReader(f, int64(length)), nil +} + // Test returns true if a blob of the given type and name exists in the backend. 
func (r *SFTP) Test(t backend.Type, name string) (bool, error) { _, err := r.c.Lstat(r.filename(t, name)) @@ -460,7 +477,7 @@ func (r *SFTP) List(t backend.Type, done <-chan struct{}) <-chan string { go func() { defer close(ch) - if t == backend.Data || t == backend.Tree { + if t == backend.Data { // read first level basedir := r.dirname(t, "") diff --git a/cache.go b/cache.go index 19b32cb75..eb3f9bf4f 100644 --- a/cache.go +++ b/cache.go @@ -1,14 +1,12 @@ package restic import ( - "encoding/json" "errors" "fmt" "io" "os" "path/filepath" "strings" - "sync" "github.com/restic/restic/backend" "github.com/restic/restic/debug" @@ -148,8 +146,6 @@ func (c *Cache) List(t backend.Type) ([]CacheEntry, error) { switch t { case backend.Snapshot: dir = filepath.Join(c.base, "snapshots") - case backend.Tree: - dir = filepath.Join(c.base, "trees") default: return nil, fmt.Errorf("cache not supported for type %v", t) } @@ -202,8 +198,6 @@ func (c *Cache) filename(t backend.Type, subtype string, id backend.ID) (string, switch t { case backend.Snapshot: return filepath.Join(c.base, "snapshots", filename), nil - case backend.Tree: - return filepath.Join(c.base, "trees", filename), nil } return "", fmt.Errorf("cache not supported for type %v", t) @@ -211,146 +205,6 @@ func (c *Cache) filename(t backend.Type, subtype string, id backend.ID) (string, // high-level functions -// RefreshSnapshots loads the maps for all snapshots and saves them to the -// local cache. Old cache entries are purged. -func (c *Cache) RefreshSnapshots(s *server.Server, p *Progress) error { - defer p.Done() - - // list cache entries - entries, err := c.List(backend.Snapshot) - if err != nil { - return err - } - - // list snapshots first - done := make(chan struct{}) - defer close(done) - - // check that snapshot blobs are cached - for name := range s.List(backend.Snapshot, done) { - id, err := backend.ParseID(name) - if err != nil { - continue - } - - // remove snapshot from list of entries - for i, e := range entries { - if e.ID.Equal(id) { - entries = append(entries[:i], entries[i+1:]...) - break - } - } - - has, err := c.Has(backend.Snapshot, "blobs", id) - if err != nil { - return err - } - - if has { - continue - } - - // else start progress reporting - p.Start() - - // build new cache - _, err = cacheSnapshotBlobs(p, s, c, id) - if err != nil { - debug.Log("Cache.RefreshSnapshots", "unable to cache snapshot blobs for %v: %v", id.Str(), err) - return err - } - } - - // remove other entries - for _, e := range entries { - debug.Log("Cache.RefreshSnapshots", "remove entry %v", e) - err = c.Purge(backend.Snapshot, e.Subtype, e.ID) - if err != nil { - return err - } - } - - return nil -} - -// cacheSnapshotBlobs creates a cache of all the blobs used within the -// snapshot. It collects all blobs from all trees and saves the resulting map -// to the cache and returns the map. 
-func cacheSnapshotBlobs(p *Progress, s *server.Server, c *Cache, id backend.ID) (*Map, error) { - debug.Log("CacheSnapshotBlobs", "create cache for snapshot %v", id.Str()) - - sn, err := LoadSnapshot(s, id) - if err != nil { - debug.Log("CacheSnapshotBlobs", "unable to load snapshot %v: %v", id.Str(), err) - return nil, err - } - - m := NewMap() - - // add top-level node - m.Insert(sn.Tree) - - p.Report(Stat{Trees: 1}) - - // start walker - var wg sync.WaitGroup - ch := make(chan WalkTreeJob) - - wg.Add(1) - go func() { - WalkTree(s, sn.Tree, nil, ch) - wg.Done() - }() - - for i := 0; i < maxConcurrencyPreload; i++ { - wg.Add(1) - go func() { - for job := range ch { - if job.Tree == nil { - continue - } - p.Report(Stat{Trees: 1}) - debug.Log("CacheSnapshotBlobs", "got job %v", job) - m.Merge(job.Tree.Map) - } - - wg.Done() - }() - } - - wg.Wait() - - // save blob list for snapshot - return m, c.StoreMap(id, m) -} - -func (c *Cache) StoreMap(snid backend.ID, m *Map) error { - wr, err := c.Store(backend.Snapshot, "blobs", snid) - if err != nil { - return nil - } - defer wr.Close() - - enc := json.NewEncoder(wr) - err = enc.Encode(m) - if err != nil { - return err - } - - return nil -} - -func (c *Cache) LoadMap(s *server.Server, snid backend.ID) (*Map, error) { - rd, err := c.Load(backend.Snapshot, "blobs", snid) - if err != nil { - return nil, err - } - - m := &Map{} - err = json.NewDecoder(rd).Decode(m) - return m, err -} - // GetCacheDir returns the cache directory according to XDG basedir spec, see // http://standards.freedesktop.org/basedir-spec/basedir-spec-latest.html func GetCacheDir() (string, error) { @@ -389,3 +243,5 @@ func GetCacheDir() (string, error) { return cachedir, nil } + +// TODO: implement RefreshIndex() diff --git a/cache_test.go b/cache_test.go index 98c8ff564..aed61de9c 100644 --- a/cache_test.go +++ b/cache_test.go @@ -1,11 +1,9 @@ package restic_test import ( - "encoding/json" "testing" "github.com/restic/restic" - "github.com/restic/restic/backend" . 
"github.com/restic/restic/test" ) @@ -15,46 +13,14 @@ func TestCache(t *testing.T) { key := SetupKey(t, server, "geheim") server.SetKey(key) - cache, err := restic.NewCache(server) + _, err := restic.NewCache(server) OK(t, err) arch, err := restic.NewArchiver(server) OK(t, err) // archive some files, this should automatically cache all blobs from the snapshot - _, id, err := arch.Snapshot(nil, []string{*benchArchiveDirectory}, nil) + _, _, err = arch.Snapshot(nil, []string{*benchArchiveDirectory}, nil) - // try to load map from cache - rd, err := cache.Load(backend.Snapshot, "blobs", id) - OK(t, err) - - dec := json.NewDecoder(rd) - - m := &restic.Map{} - err = dec.Decode(m) - OK(t, err) - - // remove cached blob list - OK(t, cache.Purge(backend.Snapshot, "blobs", id)) - - // load map from cache again, this should fail - rd, err = cache.Load(backend.Snapshot, "blobs", id) - Assert(t, err != nil, "Expected failure did not occur") - - // recreate cached blob list - err = cache.RefreshSnapshots(server, nil) - OK(t, err) - - // load map from cache again - rd, err = cache.Load(backend.Snapshot, "blobs", id) - OK(t, err) - - dec = json.NewDecoder(rd) - - m2 := &restic.Map{} - err = dec.Decode(m2) - OK(t, err) - - // compare maps - Assert(t, m.Equals(m2), "Maps are not equal") + // TODO: test caching index } diff --git a/cmd/restic/cmd_backup.go b/cmd/restic/cmd_backup.go index ee53e4c60..8ebb5cb26 100644 --- a/cmd/restic/cmd_backup.go +++ b/cmd/restic/cmd_backup.go @@ -96,26 +96,6 @@ func (cmd CmdBackup) Usage() string { return "DIR/FILE [snapshot-ID]" } -func newCacheRefreshProgress() *restic.Progress { - p := restic.NewProgress(time.Second) - p.OnStart = func() { - fmt.Printf("refreshing cache\n") - } - - if !terminal.IsTerminal(int(os.Stdout.Fd())) { - return p - } - - p.OnUpdate = func(s restic.Stat, d time.Duration, ticker bool) { - fmt.Printf("\x1b[2K[%s] %d trees loaded\r", formatDuration(d), s.Trees) - } - p.OnDone = func(s restic.Stat, d time.Duration, ticker bool) { - fmt.Printf("\x1b[2Krefreshed cache in %s\n", formatDuration(d)) - } - - return p -} - func newScanProgress() *restic.Progress { if !terminal.IsTerminal(int(os.Stdout.Fd())) { return nil @@ -200,6 +180,11 @@ func (cmd CmdBackup) Execute(args []string) error { return err } + err = s.LoadIndex() + if err != nil { + return err + } + var ( parentSnapshot string parentSnapshotID backend.ID @@ -278,17 +263,6 @@ func (cmd CmdBackup) Execute(args []string) error { return nil } - err = arch.Cache().RefreshSnapshots(s, newCacheRefreshProgress()) - if err != nil { - return err - } - - fmt.Printf("loading blobs\n") - err = arch.Preload() - if err != nil { - return err - } - _, id, err := arch.Snapshot(newArchiveProgress(stat), target, parentSnapshotID) if err != nil { return err diff --git a/cmd/restic/cmd_cat.go b/cmd/restic/cmd_cat.go index 850ac4325..c49b7d0e2 100644 --- a/cmd/restic/cmd_cat.go +++ b/cmd/restic/cmd_cat.go @@ -4,10 +4,13 @@ import ( "encoding/json" "errors" "fmt" + "io" "os" "github.com/restic/restic" "github.com/restic/restic/backend" + "github.com/restic/restic/debug" + "github.com/restic/restic/pack" "github.com/restic/restic/server" ) @@ -24,7 +27,7 @@ func init() { } func (cmd CmdCat) Usage() string { - return "[data|tree|snapshot|key|masterkey|lock] ID" + return "[pack|blob|tree|snapshot|key|masterkey|lock] ID" } func (cmd CmdCat) Execute(args []string) error { @@ -62,37 +65,20 @@ func (cmd CmdCat) Execute(args []string) error { } } + // handle all types that don't need an index switch tpe { - case "data": - 
// try storage id - data, err := s.LoadID(backend.Data, id) - if err == nil { - _, err = os.Stdout.Write(data) + case "index": + buf, err := s.Load(backend.Index, id) + if err != nil { return err } - _, err = os.Stdout.Write(data) + _, err = os.Stdout.Write(append(buf, '\n')) return err - case "tree": - // try storage id - tree := &restic.Tree{} - err := s.LoadJSONID(backend.Tree, id, tree) - if err != nil { - return err - } - - buf, err := json.MarshalIndent(&tree, "", " ") - if err != nil { - return err - } - - fmt.Println(string(buf)) - - return nil case "snapshot": sn := &restic.Snapshot{} - err = s.LoadJSONID(backend.Snapshot, id, sn) + err = s.LoadJSONEncrypted(backend.Snapshot, id, sn) if err != nil { return err } @@ -136,6 +122,52 @@ func (cmd CmdCat) Execute(args []string) error { return nil case "lock": return errors.New("not yet implemented") + } + + // load index, handle all the other types + err = s.LoadIndex() + if err != nil { + return err + } + + switch tpe { + case "pack": + rd, err := s.Backend().Get(backend.Data, id.String()) + if err != nil { + return err + } + + _, err = io.Copy(os.Stdout, rd) + return err + + case "blob": + data, err := s.LoadBlob(pack.Data, id) + if err != nil { + return err + } + + _, err = os.Stdout.Write(data) + return err + + case "tree": + debug.Log("cat", "cat tree %v", id.Str()) + tree := restic.NewTree() + err = s.LoadJSONPack(pack.Tree, id, tree) + if err != nil { + debug.Log("cat", "unable to load tree %v: %v", id.Str(), err) + return err + } + + buf, err := json.MarshalIndent(&tree, "", " ") + if err != nil { + debug.Log("cat", "error json.MarshalIndent(): %v", err) + return err + } + + _, err = os.Stdout.Write(append(buf, '\n')) + return err + default: return errors.New("invalid type") } diff --git a/cmd/restic/cmd_find.go b/cmd/restic/cmd_find.go index e80bda99f..077752001 100644 --- a/cmd/restic/cmd_find.go +++ b/cmd/restic/cmd_find.go @@ -59,9 +59,9 @@ func parseTime(str string) (time.Time, error) { return time.Time{}, fmt.Errorf("unable to parse time: %q", str) } -func (c CmdFind) findInTree(s *server.Server, blob server.Blob, path string) ([]findResult, error) { - debug.Log("restic.find", "checking tree %v\n", blob) - tree, err := restic.LoadTree(s, blob) +func (c CmdFind) findInTree(s *server.Server, id backend.ID, path string) ([]findResult, error) { + debug.Log("restic.find", "checking tree %v\n", id) + tree, err := restic.LoadTree(s, id) if err != nil { return nil, err } @@ -93,12 +93,7 @@ func (c CmdFind) findInTree(s *server.Server, blob server.Blob, path string) ([] } if node.Type == "dir" { - b, err := tree.Map.FindID(node.Subtree) - if err != nil { - return nil, err - } - - subdirResults, err := c.findInTree(s, b, filepath.Join(path, node.Name)) + subdirResults, err := c.findInTree(s, node.Subtree, filepath.Join(path, node.Name)) if err != nil { return nil, err } diff --git a/cmd/restic/cmd_fsck.go b/cmd/restic/cmd_fsck.go index 2c360652e..eac4c4030 100644 --- a/cmd/restic/cmd_fsck.go +++ b/cmd/restic/cmd_fsck.go @@ -7,7 +7,9 @@ import ( "github.com/restic/restic" "github.com/restic/restic/backend" + "github.com/restic/restic/crypto" "github.com/restic/restic/debug" + "github.com/restic/restic/pack" "github.com/restic/restic/server" ) @@ -32,31 +34,31 @@ func init() { } } -func fsckFile(opts CmdFsck, s *server.Server, m *restic.Map, IDs []backend.ID) (uint64, error) { +func fsckFile(opts CmdFsck, s *server.Server, IDs []backend.ID) (uint64, error) { debug.Log("restic.fsckFile", "checking file %v", IDs) 
var bytes uint64 for _, id := range IDs { debug.Log("restic.fsck", " checking data blob %v\n", id) - // test if blob is in map - blob, err := m.FindID(id) + // test if blob is in the index + packID, tpe, _, length, err := s.Index().Lookup(id) if err != nil { - return 0, fmt.Errorf("storage ID for data blob %v not found", id) + return 0, fmt.Errorf("storage for blob %v (%v) not found", id, tpe) } - bytes += blob.Size - debug.Log("restic.fsck", " data blob found: %v\n", blob) + bytes += uint64(length - crypto.Extension) + debug.Log("restic.fsck", " blob found in pack %v\n", packID) if opts.CheckData { // load content - _, err := s.Load(backend.Data, blob) + _, err := s.LoadBlob(pack.Data, id) if err != nil { return 0, err } } else { // test if data blob is there - ok, err := s.Test(backend.Data, blob.Storage.String()) + ok, err := s.Test(backend.Data, packID.String()) if err != nil { return 0, err } @@ -68,17 +70,17 @@ func fsckFile(opts CmdFsck, s *server.Server, m *restic.Map, IDs []backend.ID) ( // if orphan check is active, record storage id if opts.o_data != nil { - opts.o_data.Insert(blob.Storage) + opts.o_data.Insert(id) } } return bytes, nil } -func fsckTree(opts CmdFsck, s *server.Server, blob server.Blob) error { - debug.Log("restic.fsckTree", "checking tree %v", blob) +func fsckTree(opts CmdFsck, s *server.Server, id backend.ID) error { + debug.Log("restic.fsckTree", "checking tree %v", id.Str()) - tree, err := restic.LoadTree(s, blob) + tree, err := restic.LoadTree(s, id) if err != nil { return err } @@ -86,7 +88,7 @@ func fsckTree(opts CmdFsck, s *server.Server, blob server.Blob) error { // if orphan check is active, record storage id if opts.o_trees != nil { // add ID to list - opts.o_trees.Insert(blob.Storage) + opts.o_trees.Insert(id) } var firstErr error @@ -95,23 +97,23 @@ func fsckTree(opts CmdFsck, s *server.Server, blob server.Blob) error { for i, node := range tree.Nodes { if node.Name == "" { - return fmt.Errorf("node %v of tree %v has no name", i, blob.ID) + return fmt.Errorf("node %v of tree %v has no name", i, id.Str()) } if node.Type == "" { - return fmt.Errorf("node %q of tree %v has no type", node.Name, blob.ID) + return fmt.Errorf("node %q of tree %v has no type", node.Name, id.Str()) } switch node.Type { case "file": if node.Content == nil { - debug.Log("restic.fsckTree", "file node %q of tree %v has no content: %v", node.Name, blob.ID, node) - return fmt.Errorf("file node %q of tree %v has no content: %v", node.Name, blob.ID, node) + debug.Log("restic.fsckTree", "file node %q of tree %v has no content: %v", node.Name, id, node) + return fmt.Errorf("file node %q of tree %v has no content: %v", node.Name, id, node) } if node.Content == nil && node.Error == "" { - debug.Log("restic.fsckTree", "file node %q of tree %v has no content", node.Name, blob.ID) - return fmt.Errorf("file node %q of tree %v has no content", node.Name, blob.ID) + debug.Log("restic.fsckTree", "file node %q of tree %v has no content", node.Name, id) + return fmt.Errorf("file node %q of tree %v has no content", node.Name, id) } // record ids @@ -119,32 +121,25 @@ func fsckTree(opts CmdFsck, s *server.Server, blob server.Blob) error { seenIDs.Insert(id) } - debug.Log("restic.fsckTree", "check file %v (%v)", node.Name, blob.ID.Str()) - bytes, err := fsckFile(opts, s, tree.Map, node.Content) + debug.Log("restic.fsckTree", "check file %v (%v)", node.Name, id.Str()) + bytes, err := fsckFile(opts, s, node.Content) if err != nil { return err } if bytes != node.Size { - debug.Log("restic.fsckTree", 
"file node %q of tree %v has size %d, but only %d bytes could be found", node.Name, blob, node.Size, bytes) - return fmt.Errorf("file node %q of tree %v has size %d, but only %d bytes could be found", node.Name, blob, node.Size, bytes) + debug.Log("restic.fsckTree", "file node %q of tree %v has size %d, but only %d bytes could be found", node.Name, id, node.Size, bytes) + return fmt.Errorf("file node %q of tree %v has size %d, but only %d bytes could be found", node.Name, id, node.Size, bytes) } case "dir": if node.Subtree == nil { - return fmt.Errorf("dir node %q of tree %v (storage id %v) has no subtree", node.Name, blob.ID, blob.Storage) - } - - // lookup blob - subtreeBlob, err := tree.Map.FindID(node.Subtree) - if err != nil { - firstErr = err - fmt.Fprintf(os.Stderr, "%v\n", err) + return fmt.Errorf("dir node %q of tree %v has no subtree", node.Name, id) } // record id seenIDs.Insert(node.Subtree) - err = fsckTree(opts, s, subtreeBlob) + err = fsckTree(opts, s, node.Subtree) if err != nil { firstErr = err fmt.Fprintf(os.Stderr, "%v\n", err) @@ -153,11 +148,11 @@ func fsckTree(opts CmdFsck, s *server.Server, blob server.Blob) error { } // check map for unused ids - for _, id := range tree.Map.IDs() { - if seenIDs.Find(id) != nil { - return fmt.Errorf("tree %v: map contains unused ID %v", blob.ID, id) - } - } + // for _, id := range tree.Map.IDs() { + // if seenIDs.Find(id) != nil { + // return fmt.Errorf("tree %v: map contains unused ID %v", id, id) + // } + // } return firstErr } @@ -170,10 +165,6 @@ func fsckSnapshot(opts CmdFsck, s *server.Server, id backend.ID) error { return fmt.Errorf("loading snapshot %v failed: %v", id, err) } - if !sn.Tree.Valid() { - return fmt.Errorf("snapshot %s has invalid tree %v", sn.ID(), sn.Tree) - } - err = fsckTree(opts, s, sn.Tree) if err != nil { debug.Log("restic.fsck", " checking tree %v for snapshot %v\n", sn.Tree, id) @@ -201,6 +192,11 @@ func (cmd CmdFsck) Execute(args []string) error { return err } + err = s.LoadIndex() + if err != nil { + return err + } + if cmd.Snapshot != "" { name, err := s.FindSnapshot(cmd.Snapshot) if err != nil { @@ -249,40 +245,26 @@ func (cmd CmdFsck) Execute(args []string) error { debug.Log("restic.fsck", "starting orphaned check\n") - l := []struct { - desc string - tpe backend.Type - set *backend.IDSet - }{ - {"data blob", backend.Data, cmd.o_data}, - {"tree", backend.Tree, cmd.o_trees}, - } + cnt := make(map[pack.BlobType]*backend.IDSet) + cnt[pack.Data] = backend.NewIDSet() + cnt[pack.Tree] = backend.NewIDSet() - for _, d := range l { - debug.Log("restic.fsck", "checking for orphaned %v\n", d.desc) + for blob := range s.Index().Each(done) { + fmt.Println(blob.ID) - done := make(chan struct{}) - - for name := range s.List(d.tpe, done) { - id, err := backend.ParseID(name) - if err != nil { - fmt.Fprintf(os.Stderr, "invalid id for %v: %v\n", d.tpe, name) + err = cnt[blob.Type].Find(blob.ID) + if err != nil { + if !cmd.RemoveOrphaned { + fmt.Printf("orphaned %v blob %v\n", blob.Type, blob.ID) continue } - err = d.set.Find(id) - if err != nil { - if !cmd.RemoveOrphaned { - fmt.Printf("orphaned %v %v\n", d.desc, id) - continue - } - - fmt.Printf("removing orphaned %v %v\n", d.desc, id) - err := s.Remove(d.tpe, name) - if err != nil { - return err - } - } + fmt.Printf("removing orphaned %v blob %v\n", blob.Type, blob.ID) + // err := s.Remove(d.tpe, name) + // if err != nil { + // return err + // } + return errors.New("not implemented") } } diff --git a/cmd/restic/cmd_list.go b/cmd/restic/cmd_list.go index 
c7ee12f54..01201cef0 100644 --- a/cmd/restic/cmd_list.go +++ b/cmd/restic/cmd_list.go @@ -20,7 +20,7 @@ func init() { } func (cmd CmdList) Usage() string { - return "[data|trees|snapshots|keys|locks]" + return "[blobs|packs|index|snapshots|keys|locks]" } func (cmd CmdList) Execute(args []string) error { @@ -35,10 +35,21 @@ func (cmd CmdList) Execute(args []string) error { var t backend.Type switch args[0] { - case "data": + case "blobs": + err = s.LoadIndex() + if err != nil { + return err + } + + for blob := range s.Index().Each(nil) { + fmt.Println(blob.ID) + } + + return nil + case "packs": t = backend.Data - case "trees": - t = backend.Tree + case "index": + t = backend.Index case "snapshots": t = backend.Snapshot case "keys": diff --git a/cmd/restic/cmd_ls.go b/cmd/restic/cmd_ls.go index ddbe460a3..ff3bbc015 100644 --- a/cmd/restic/cmd_ls.go +++ b/cmd/restic/cmd_ls.go @@ -38,8 +38,8 @@ func printNode(prefix string, n *restic.Node) string { } } -func printTree(prefix string, s *server.Server, blob server.Blob) error { - tree, err := restic.LoadTree(s, blob) +func printTree(prefix string, s *server.Server, id backend.ID) error { + tree, err := restic.LoadTree(s, id) if err != nil { return err } @@ -48,12 +48,7 @@ func printTree(prefix string, s *server.Server, blob server.Blob) error { fmt.Println(printNode(prefix, entry)) if entry.Type == "dir" && entry.Subtree != nil { - b, err := tree.Map.FindID(entry.Subtree) - if err != nil { - return err - } - - err = printTree(filepath.Join(prefix, entry.Name), s, b) + err = printTree(filepath.Join(prefix, entry.Name), s, entry.Subtree) if err != nil { return err } diff --git a/cmd/restic/cmd_restore.go b/cmd/restic/cmd_restore.go index 732b489a6..5d6589a82 100644 --- a/cmd/restic/cmd_restore.go +++ b/cmd/restic/cmd_restore.go @@ -35,6 +35,11 @@ func (cmd CmdRestore) Execute(args []string) error { return err } + err = s.LoadIndex() + if err != nil { + return err + } + name, err := backend.FindSnapshot(s, args[0]) if err != nil { errx(1, "invalid id %q: %v", args[0], err) diff --git a/doc/Design.md b/doc/Design.md index ff28b207b..083cba818 100644 --- a/doc/Design.md +++ b/doc/Design.md @@ -64,6 +64,10 @@ The basic layout of a sample restic repository is shown below: /tmp/restic-repo ├── data + │ ├── 21 + │ │ └── 2159dd48f8a24f33c307b750592773f8b71ff8d11452132a7b2e2a6a01611be1 + │ ├── 32 + │ │ └── 32ea976bc30771cebad8285cd99120ac8786f9ffd42141d452458089985043a5 │ ├── 59 │ │ └── 59fe4bcde59bd6222eba87795e35a90d82cd2f138a27b6835032b7b58173a426 │ ├── 73 @@ -71,25 +75,14 @@ │ [...] ├── id ├── index - │ └── c38f5fb68307c6a3e3aa945d556e325dc38f5fb68307c6a3e3aa945d556e325d + │ ├── c38f5fb68307c6a3e3aa945d556e325dc38f5fb68307c6a3e3aa945d556e325d + │ └── ca171b1b7394d90d330b265d90f506f9984043b342525f019788f97e745c71fd ├── keys │ └── b02de829beeb3c01a63e6b25cbd421a98fef144f03b9a02e46eff9e2ca3f0bd7 ├── locks ├── snapshots │ └── 22a5af1bdc6e616f8a29579458c49627e01b32210d09adb288d1ecda7c5711ec ├── tmp - ├── trees - │ ├── 21 - │ │ └── 2159dd48f8a24f33c307b750592773f8b71ff8d11452132a7b2e2a6a01611be1 - │ ├── 32 - │ │ └── 32ea976bc30771cebad8285cd99120ac8786f9ffd42141d452458089985043a5 - │ ├── 95 - │ │ └── 95f75feb05a7cc73e328b2efa668b1ea68f65fece55a93bc65aff6cd0bcfeefc - │ ├── b8 - │ │ └── b8138ab08a4722596ac89c917827358da4672eac68e3c03a8115b88dbf4bfb59 - │ ├── e0 - │ │ └── e01150928f7ad24befd6ec15b087de1b9e0f92edabd8e5cabb3317f8b20ad044 - │ [...] 
└── version A repository can be initialized with the `restic init` command, e.g.: @@ -99,39 +92,51 @@ A repository can be initialized with the `restic init` command, e.g.: Pack Format ----------- -All files in the repository except Key, Tree and Data files just contain raw -data, stored as `IV || Ciphertext || MAC`. Tree and Data files may contain -several Blobs of data. The format is described in the following. +All files in the repository except Key and Data files just contain raw data, +stored as `IV || Ciphertext || MAC`. Data files may contain one or more Blobs +of data. The format is described in the following. -A Pack starts with a nonce and a header, the header describes the content and -is encrypted and signed. The Pack's structure is as follows: +The Pack's structure is as follows: - NONCE || Header_Length || - IV_Header || Ciphertext_Header || MAC_Header || - IV_Blob_1 || Ciphertext_Blob_1 || MAC_Blob_1 || - [...] - IV_Blob_n || Ciphertext_Blob_n || MAC_Blob_n || - MAC + EncryptedBlob1 || ... || EncryptedBlobN || EncryptedHeader || Header_Length -`NONCE` consists of 16 bytes and `Header_Length` is a four byte integer in -little-endian encoding. +At the end of the Pack is a header, which describes the content and is +encrypted and signed. `Header_Length` is the length of the encrypted header +encoded as a four byte integer in little-endian encoding. Placing the header at +the end of a file allows writing the blobs in a continuous stream as soon as +they are read during the backup phase. This reduces code complexity and avoids +having to re-write a file once the pack is complete and the content and length +of the header is known. -All the parts (`Ciphertext_Header`, `Ciphertext_Blob1` etc.) are signed and -encrypted independently. In addition, the complete pack is signed using -`NONCE`. This enables repository reorganisation without having to touch the -encrypted Blobs. In addition it also allows efficient indexing, for only the -header needs to be read in order to find out which Blobs are contained in the -Pack. Since the header is signed, authenticity of the header can be checked -without having to read the complete Pack. +All the blobs (`EncryptedBlob1`, `EncryptedBlobN` etc.) are signed and +encrypted independently. This enables repository reorganisation without having +to touch the encrypted Blobs. In addition it also allows efficient indexing, +for only the header needs to be read in order to find out which Blobs are +contained in the Pack. Since the header is signed, authenticity of the header +can be checked without having to read the complete Pack. After decryption, a Pack's header consists of the following elements: - Length(IV_Blob_1+Ciphertext_Blob1+MAC_Blob_1) || Hash(Plaintext_Blob_1) || + Type_Blob1 || Length(EncryptedBlob1) || Hash(Plaintext_Blob1) || [...] - Length(IV_Blob_n+Ciphertext_Blob_n+MAC_Blob_n) || Hash(Plaintext_Blob_n) || + Type_BlobN || Length(EncryptedBlobN) || Hash(Plaintext_BlobN) || This is enough to calculate the offsets for all the Blobs in the Pack. Length -is the length of a Blob as a four byte integer in little-endian format. +is the length of a Blob as a four byte integer in little-endian format. The +type field is a one byte field and labels the content of a blob according to +the following table: + + Type | Meaning + -----|--------- + 0 | data + 1 | tree + +All other types are invalid; more types may be added in the future. 
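To make the header layout concrete, here is a minimal Go sketch of parsing the decrypted header into entries and deriving blob offsets. It assumes the record layout described above (one type byte, a four byte little-endian length, the repository's 32-byte SHA-256 plaintext hash); the names `packformat`, `headerEntry` and `parseHeader` are invented for this illustration, and decryption and MAC verification are deliberately omitted:

    package packformat

    import (
        "encoding/binary"
        "io"
    )

    // headerEntry mirrors one decrypted header record:
    // Type_BlobN || Length(EncryptedBlobN) || Hash(Plaintext_BlobN)
    type headerEntry struct {
        Type   uint8    // 0 = data, 1 = tree
        Length uint32   // length of the encrypted blob, little-endian
        Hash   [32]byte // SHA-256 hash of the plaintext blob (assumed)
    }

    // parseHeader reads records until EOF. Offsets are not stored in the
    // header; they follow from summing the lengths of the preceding blobs.
    func parseHeader(rd io.Reader) (entries []headerEntry, offsets []uint, err error) {
        var offset uint
        for {
            var e headerEntry
            err = binary.Read(rd, binary.LittleEndian, &e)
            if err == io.EOF {
                return entries, offsets, nil
            }
            if err != nil {
                return nil, nil, err
            }
            entries = append(entries, e)
            offsets = append(offsets, offset)
            offset += uint(e.Length)
        }
    }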
+ +For reconstructing the index or parsing a pack without an index, first the last +four bytes must be read in order to find the length of the header. Afterwards, +the header can be read and parsed, which yields all plaintext hashes, types, +offsets and lengths of all included blobs. Indexing -------- @@ -139,23 +144,40 @@ Index files contain information about Data and Tree Blobs and the Packs they are contained in and store this information in the repository. When the local cached index is not accessible any more, the index files can be downloaded and -used to reconstruct the index. The index Blobs are encrypted and signed like -Data and Tree Blobs, so the outer structure is `IV || Ciphertext || MAC` again. -The plaintext consists of a JSON document like the following: +used to reconstruct the index. The files are encrypted and signed like Data and +Tree Blobs, so the outer structure is `IV || Ciphertext || MAC` again. The +plaintext consists of a JSON document like the following: - [ - { - "id": "73d04e6125cf3c28a299cc2f3cca3b78ceac396e4fcf9575e34536b26782413c", - "blobs": [ - "3ec79977ef0cf5de7b08cd12b874cd0f62bbaf7f07f3497a5b1bbcc8cb39b1ce", - "9ccb846e60d90d4eb915848add7aa7ea1e4bbabfc60e573db9f7bfb2789afbae", - "d3dc577b4ffd38cc4b32122cabf8655a0223ed22edfd93b353dc0c3f2b0fdf66" - ] - } - ] + [ { + "id": "73d04e6125cf3c28a299cc2f3cca3b78ceac396e4fcf9575e34536b26782413c", + "blobs": [ + { + "id": "3ec79977ef0cf5de7b08cd12b874cd0f62bbaf7f07f3497a5b1bbcc8cb39b1ce", + "type": "data", + "offset": 0, + "length": 25 + },{ + "id": "9ccb846e60d90d4eb915848add7aa7ea1e4bbabfc60e573db9f7bfb2789afbae", + "type": "tree", + "offset": 38, + "length": 100 + }, + { + "id": "d3dc577b4ffd38cc4b32122cabf8655a0223ed22edfd93b353dc0c3f2b0fdf66", + "type": "data", + "offset": 150, + "length": 123 + } + ] + } ] -This JSON document lists all the Blobs with contents. In this example, the Pack -`73d04e61` contains three Blobs, the plaintext hashes are listed afterwards. +This JSON document lists Blobs with contents. In this example, the Pack +`73d04e61` contains two data Blobs and one tree Blob; the plaintext hashes are +listed afterwards. + +There may be an arbitrary number of index files, containing information on +non-disjoint sets of Packs. The number of packs described in a single file is +chosen so that the file size is kept below 8 MiB. 
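As an illustration of the document above, the decrypted plaintext of an index file could be decoded with Go types like the following. The struct names and the package name are invented for this sketch; only the JSON field names and the `data`/`tree` type strings come from the format description:

    package index

    import "encoding/json"

    // blobEntry locates one Blob inside a Pack: its plaintext hash (id),
    // whether it holds data or a tree, and its position in the Pack.
    type blobEntry struct {
        ID     string `json:"id"`
        Type   string `json:"type"` // "data" or "tree"
        Offset uint   `json:"offset"`
        Length uint   `json:"length"`
    }

    // packEntry describes one Pack and the Blobs it contains.
    type packEntry struct {
        ID    string      `json:"id"`
        Blobs []blobEntry `json:"blobs"`
    }

    // decodeIndex parses the decrypted plaintext of an index file.
    func decodeIndex(buf []byte) ([]packEntry, error) {
        var idx []packEntry
        err := json.Unmarshal(buf, &idx)
        return idx, err
    }

Because several index files may describe overlapping sets of Packs, a reader merging them should tolerate duplicate entries for the same blob.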
Keys, Encryption and MAC ------------------------ diff --git a/map.go b/map.go deleted file mode 100644 index eec11d567..000000000 --- a/map.go +++ /dev/null @@ -1,219 +0,0 @@ -package restic - -import ( - "encoding/json" - "errors" - "sort" - "sync" - - "github.com/restic/restic/backend" - "github.com/restic/restic/debug" - "github.com/restic/restic/server" -) - -type Map struct { - list []server.Blob - m sync.Mutex -} - -var ErrBlobNotFound = errors.New("Blob not found") - -func NewMap() *Map { - return &Map{ - list: []server.Blob{}, - } -} - -func (bl *Map) find(blob server.Blob, checkSize bool) (int, server.Blob, error) { - pos := sort.Search(len(bl.list), func(i int) bool { - return blob.ID.Compare(bl.list[i].ID) >= 0 - }) - - if pos < len(bl.list) { - b := bl.list[pos] - if blob.ID.Compare(b.ID) == 0 && (!checkSize || blob.Size == b.Size) { - return pos, b, nil - } - } - - return pos, server.Blob{}, ErrBlobNotFound -} - -func (bl *Map) Find(blob server.Blob) (server.Blob, error) { - bl.m.Lock() - defer bl.m.Unlock() - - _, blob, err := bl.find(blob, true) - return blob, err -} - -func (bl *Map) FindID(id backend.ID) (server.Blob, error) { - bl.m.Lock() - defer bl.m.Unlock() - - _, blob, err := bl.find(server.Blob{ID: id}, false) - return blob, err -} - -func (bl *Map) Merge(other *Map) { - bl.m.Lock() - defer bl.m.Unlock() - other.m.Lock() - defer other.m.Unlock() - - for _, blob := range other.list { - bl.insert(blob) - } -} - -func (bl *Map) insert(blob server.Blob) server.Blob { - pos, b, err := bl.find(blob, true) - if err == nil { - // already present - return b - } - - // insert blob - // https://code.google.com/p/go-wiki/wiki/SliceTricks - bl.list = append(bl.list, server.Blob{}) - copy(bl.list[pos+1:], bl.list[pos:]) - bl.list[pos] = blob - - return blob -} - -func (bl *Map) Insert(blob server.Blob) server.Blob { - bl.m.Lock() - defer bl.m.Unlock() - - debug.Log("Map.Insert", " Map<%p> insert %v", bl, blob) - - return bl.insert(blob) -} - -func (bl *Map) MarshalJSON() ([]byte, error) { - return json.Marshal(bl.list) -} - -func (bl *Map) UnmarshalJSON(data []byte) error { - return json.Unmarshal(data, &bl.list) -} - -func (bl *Map) IDs() []backend.ID { - bl.m.Lock() - defer bl.m.Unlock() - - ids := make([]backend.ID, 0, len(bl.list)) - for _, b := range bl.list { - ids = append(ids, b.ID) - } - - return ids -} - -func (bl *Map) StorageIDs() []backend.ID { - bl.m.Lock() - defer bl.m.Unlock() - - ids := make([]backend.ID, 0, len(bl.list)) - for _, b := range bl.list { - ids = append(ids, b.Storage) - } - - return ids -} - -func (bl *Map) Equals(other *Map) bool { - if bl == nil && other == nil { - return true - } - - if bl == nil || other == nil { - return false - } - - bl.m.Lock() - defer bl.m.Unlock() - - if len(bl.list) != len(other.list) { - debug.Log("Map.Equals", "length does not match: %d != %d", len(bl.list), len(other.list)) - return false - } - - for i := 0; i < len(bl.list); i++ { - if bl.list[i].Compare(other.list[i]) != 0 { - debug.Log("Map.Equals", "entry %d does not match: %v != %v", i, bl.list[i], other.list[i]) - return false - } - } - - return true -} - -// Each calls f for each blob in the Map. While Each is running, no other -// operation is possible, since a mutex is held for the whole time. -func (bl *Map) Each(f func(blob server.Blob)) { - bl.m.Lock() - defer bl.m.Unlock() - - for _, blob := range bl.list { - f(blob) - } -} - -// Select returns a list of of blobs from the plaintext IDs given in list. 
-func (bl *Map) Select(list backend.IDs) (server.Blobs, error) { - bl.m.Lock() - defer bl.m.Unlock() - - blobs := make(server.Blobs, 0, len(list)) - for _, id := range list { - _, blob, err := bl.find(server.Blob{ID: id}, false) - if err != nil { - return nil, err - } - - blobs = append(blobs, blob) - } - - return blobs, nil -} - -// Len returns the number of blobs in the map. -func (bl *Map) Len() int { - bl.m.Lock() - defer bl.m.Unlock() - - return len(bl.list) -} - -// Prune deletes all IDs from the map except the ones listed in ids. -func (bl *Map) Prune(ids *backend.IDSet) { - bl.m.Lock() - defer bl.m.Unlock() - - pos := 0 - for pos < len(bl.list) { - blob := bl.list[pos] - if ids.Find(blob.ID) != nil { - // remove element - bl.list = append(bl.list[:pos], bl.list[pos+1:]...) - continue - } - - pos++ - } -} - -// DeleteID removes the plaintext ID id from the map. -func (bl *Map) DeleteID(id backend.ID) { - bl.m.Lock() - defer bl.m.Unlock() - - pos, _, err := bl.find(server.Blob{ID: id}, false) - if err != nil { - return - } - - bl.list = append(bl.list[:pos], bl.list[pos+1:]...) -} diff --git a/map_test.go b/map_test.go deleted file mode 100644 index f83fa6828..000000000 --- a/map_test.go +++ /dev/null @@ -1,147 +0,0 @@ -package restic_test - -import ( - "crypto/rand" - "encoding/json" - "flag" - "io" - mrand "math/rand" - "sync" - "testing" - "time" - - "github.com/restic/restic" - "github.com/restic/restic/backend" - "github.com/restic/restic/server" - . "github.com/restic/restic/test" -) - -var maxWorkers = flag.Uint("workers", 20, "number of workers to test Map concurrent access against") - -func randomID() []byte { - buf := make([]byte, backend.IDSize) - _, err := io.ReadFull(rand.Reader, buf) - if err != nil { - panic(err) - } - return buf -} - -func newBlob() server.Blob { - return server.Blob{ - ID: randomID(), - Size: uint64(mrand.Uint32()), - Storage: randomID(), - StorageSize: uint64(mrand.Uint32()), - } -} - -// Test basic functionality -func TestMap(t *testing.T) { - bl := restic.NewMap() - - b := newBlob() - bl.Insert(b) - - for i := 0; i < 1000; i++ { - bl.Insert(newBlob()) - } - - b2, err := bl.Find(server.Blob{ID: b.ID, Size: b.Size}) - OK(t, err) - Assert(t, b2.Compare(b) == 0, "items are not equal: want %v, got %v", b, b2) - - b2, err = bl.FindID(b.ID) - OK(t, err) - Assert(t, b2.Compare(b) == 0, "items are not equal: want %v, got %v", b, b2) - - bl2 := restic.NewMap() - for i := 0; i < 1000; i++ { - bl.Insert(newBlob()) - } - - b2, err = bl2.Find(b) - Assert(t, err != nil, "found ID in restic that was never inserted: %v", b2) - - bl2.Merge(bl) - - b2, err = bl2.Find(b) - - if err != nil { - t.Fatal(err) - } - - if b.Compare(b2) != 0 { - t.Fatalf("items are not equal: want %v, got %v", b, b2) - } -} - -// Test JSON encode/decode -func TestMapJSON(t *testing.T) { - bl := restic.NewMap() - b := server.Blob{ID: randomID()} - bl.Insert(b) - - b2, err := bl.Find(b) - OK(t, err) - Assert(t, b2.Compare(b) == 0, "items are not equal: want %v, got %v", b, b2) - - buf, err := json.Marshal(bl) - OK(t, err) - - bl2 := restic.Map{} - json.Unmarshal(buf, &bl2) - - b2, err = bl2.Find(b) - OK(t, err) - Assert(t, b2.Compare(b) == 0, "items are not equal: want %v, got %v", b, b2) - - buf, err = json.Marshal(bl2) - OK(t, err) -} - -// random insert/find access by several goroutines -func TestMapRandom(t *testing.T) { - var wg sync.WaitGroup - - worker := func(bl *restic.Map) { - defer wg.Done() - - b := newBlob() - bl.Insert(b) - - for i := 0; i < 200; i++ { - bl.Insert(newBlob()) - } 
-
-		d := time.Duration(mrand.Intn(10)*100) * time.Millisecond
-		time.Sleep(d)
-
-		for i := 0; i < 100; i++ {
-			b2, err := bl.Find(b)
-			if err != nil {
-				t.Fatal(err)
-			}
-
-			if b.Compare(b2) != 0 {
-				t.Fatalf("items are not equal: want %v, got %v", b, b2)
-			}
-		}
-
-		bl2 := restic.NewMap()
-		for i := 0; i < 200; i++ {
-			bl2.Insert(newBlob())
-		}
-
-		bl2.Merge(bl)
-	}
-
-	bl := restic.NewMap()
-
-	for i := 0; uint(i) < *maxWorkers; i++ {
-		wg.Add(1)
-		go worker(bl)
-	}
-
-	wg.Wait()
-} diff --git a/node.go b/node.go index 45a6a9be0..b29f2fe45 100644 --- a/node.go +++ b/node.go @@ -12,6 +12,7 @@ import ( "github.com/juju/arrar" "github.com/restic/restic/backend" "github.com/restic/restic/debug"
+	"github.com/restic/restic/pack"
 	"github.com/restic/restic/server" ) @@ -98,14 +99,14 @@ func nodeTypeFromFileInfo(fi os.FileInfo) string { return "" }
-func (node *Node) CreateAt(path string, m *Map, s *server.Server) error {
+func (node *Node) CreateAt(path string, s *server.Server) error {
 	switch node.Type {
 	case "dir":
 		if err := node.createDirAt(path); err != nil {
 			return err
 		}
 	case "file":
-		if err := node.createFileAt(path, m, s); err != nil {
+		if err := node.createFileAt(path, s); err != nil {
 			return err
 		}
 	case "symlink": @@ -171,7 +172,7 @@ func (node Node) createDirAt(path string) error { return nil }
-func (node Node) createFileAt(path string, m *Map, s *server.Server) error {
+func (node Node) createFileAt(path string, s *server.Server) error {
 	f, err := os.OpenFile(path, os.O_CREATE|os.O_WRONLY, 0600)
 	defer f.Close() @@ -179,13 +180,8 @@ func (node Node) createFileAt(path string, m *Map, s *server.Server) error { return arrar.Annotate(err, "OpenFile") }
-	for _, blobid := range node.Content {
-		blob, err := m.FindID(blobid)
-		if err != nil {
-			return arrar.Annotate(err, "Find Blob")
-		}
-
-		buf, err := s.Load(backend.Data, blob)
+	for _, id := range node.Content {
+		buf, err := s.LoadBlob(pack.Data, id)
 		if err != nil {
 			return arrar.Annotate(err, "Load")
 		} diff --git a/pack/pack.go b/pack/pack.go new file mode 100644 index 000000000..1724003b1 --- /dev/null +++ b/pack/pack.go @@ -0,0 +1,291 @@
+package pack
+
+import (
+	"crypto/sha256"
+	"encoding/binary"
+	"errors"
+	"fmt"
+	"io"
+	"sync"
+
+	"github.com/restic/restic/backend"
+	"github.com/restic/restic/crypto"
+)
+
+type BlobType uint8
+
+const (
+	Data BlobType = 0
+	Tree          = 1
+)
+
+func (t BlobType) String() string {
+	switch t {
+	case Data:
+		return "data"
+	case Tree:
+		return "tree"
+	}
+
+	return fmt.Sprintf("<BlobType %d>", t)
+}
+
+func (t BlobType) MarshalJSON() ([]byte, error) {
+	switch t {
+	case Data:
+		return []byte(`"data"`), nil
+	case Tree:
+		return []byte(`"tree"`), nil
+	}
+
+	return nil, errors.New("unknown blob type")
+}
+
+func (t *BlobType) UnmarshalJSON(buf []byte) error {
+	switch string(buf) {
+	case `"data"`:
+		*t = Data
+	case `"tree"`:
+		*t = Tree
+	default:
+		return errors.New("unknown blob type")
+	}
+
+	return nil
+}
+
+// Blob is a blob within a pack.
+type Blob struct {
+	Type   BlobType
+	Length uint32
+	ID     backend.ID
+	Offset uint
+}
+
+// GetReader returns an io.Reader for the blob entry e.
+func (e Blob) GetReader(rd io.ReadSeeker) (io.Reader, error) {
+	// seek to the correct location
+	_, err := rd.Seek(int64(e.Offset), 0)
+	if err != nil {
+		return nil, err
+	}
+
+	return io.LimitReader(rd, int64(e.Length)), nil
+}
+
+// Packer is used to create a new Pack.
+type Packer struct {
+	blobs []Blob
+
+	bytes uint
+	k     *crypto.Key
+	wr    io.Writer
+	hw    *backend.HashingWriter
+
+	m sync.Mutex
+}
+
+// NewPacker returns a new Packer that can be used to pack blobs
+// together.
+func NewPacker(k *crypto.Key, w io.Writer) *Packer {
+	return &Packer{k: k, wr: w, hw: backend.NewHashingWriter(w, sha256.New())}
+}
+
+// Add saves the data read from rd as a new blob to the packer. Returned is the
+// number of bytes written to the pack.
+func (p *Packer) Add(t BlobType, id backend.ID, rd io.Reader) (int64, error) {
+	p.m.Lock()
+	defer p.m.Unlock()
+
+	c := Blob{Type: t, ID: id}
+
+	n, err := io.Copy(p.hw, rd)
+	c.Length = uint32(n)
+	c.Offset = p.bytes
+	p.bytes += uint(n)
+	p.blobs = append(p.blobs, c)
+
+	return n, err
+}
+
+var entrySize = uint(binary.Size(BlobType(0)) + binary.Size(uint32(0)) + backend.IDSize)
+
+// headerEntry is used with encoding/binary to read and write header entries
+type headerEntry struct {
+	Type   BlobType
+	Length uint32
+	ID     [backend.IDSize]byte
+}
+
+// Finalize writes the header for all added blobs and finalizes the pack.
+// Returned is the complete number of bytes written, including the header.
+// After Finalize() has finished, the ID of this pack can be obtained by
+// calling ID().
+func (p *Packer) Finalize() (bytesWritten uint, err error) {
+	p.m.Lock()
+	defer p.m.Unlock()
+
+	bytesWritten = p.bytes
+
+	// create writer to encrypt header
+	wr := crypto.EncryptTo(p.k, p.hw)
+
+	bytesHeader, err := p.writeHeader(wr)
+	if err != nil {
+		wr.Close()
+		return bytesWritten + bytesHeader, err
+	}
+
+	bytesWritten += bytesHeader
+
+	// finalize encrypted header
+	err = wr.Close()
+	if err != nil {
+		return bytesWritten, err
+	}
+
+	// account for crypto overhead
+	bytesWritten += crypto.Extension
+
+	// write length
+	err = binary.Write(p.hw, binary.LittleEndian, uint32(uint(len(p.blobs))*entrySize+crypto.Extension))
+	if err != nil {
+		return bytesWritten, err
+	}
+	bytesWritten += uint(binary.Size(uint32(0)))
+
+	p.bytes = uint(bytesWritten)
+
+	return bytesWritten, nil
+}
+
+// writeHeader constructs and writes the header to wr.
+func (p *Packer) writeHeader(wr io.Writer) (bytesWritten uint, err error) {
+	for _, b := range p.blobs {
+		entry := headerEntry{
+			Type:   b.Type,
+			Length: b.Length,
+		}
+		copy(entry.ID[:], b.ID)
+
+		err := binary.Write(wr, binary.LittleEndian, entry)
+		if err != nil {
+			return bytesWritten, err
+		}
+
+		bytesWritten += entrySize
+	}
+
+	return
+}
+
+// ID returns the ID of all data written so far.
+func (p *Packer) ID() backend.ID {
+	p.m.Lock()
+	defer p.m.Unlock()
+
+	return p.hw.Sum(nil)
+}
+
+// Size returns the number of bytes written so far.
+func (p *Packer) Size() uint {
+	p.m.Lock()
+	defer p.m.Unlock()
+
+	return p.bytes
+}
+
+// Count returns the number of blobs in this packer.
+func (p *Packer) Count() int {
+	p.m.Lock()
+	defer p.m.Unlock()
+
+	return len(p.blobs)
+}
+
+// Blobs returns the slice of blobs that have been written.
+func (p *Packer) Blobs() []Blob {
+	p.m.Lock()
+	defer p.m.Unlock()
+
+	return p.blobs
+}
+
+// Writer returns the underlying writer.
+func (p *Packer) Writer() io.Writer {
+	return p.wr
+}
+
+func (p *Packer) String() string {
+	return fmt.Sprintf("<Packer %d blobs, %d bytes>", len(p.blobs), p.bytes)
+}
+
+// Unpacker is used to read individual blobs from a pack.
+type Unpacker struct {
+	rd      io.ReadSeeker
+	Entries []Blob
+	k       *crypto.Key
+}
+
+// NewUnpacker returns a pointer to Unpacker which can be used to read
+// individual Blobs from a pack.
+func NewUnpacker(k *crypto.Key, entries []Blob, rd io.ReadSeeker) (*Unpacker, error) { + var err error + ls := binary.Size(uint32(0)) + + // reset to the end to read header length + _, err = rd.Seek(-int64(ls), 2) + if err != nil { + return nil, fmt.Errorf("seeking to read header length failed: %v", err) + } + + var length uint32 + err = binary.Read(rd, binary.LittleEndian, &length) + if err != nil { + return nil, fmt.Errorf("reading header length failed: %v", err) + } + + // reset to the beginning of the header + _, err = rd.Seek(-int64(ls)-int64(length), 2) + if err != nil { + return nil, fmt.Errorf("seeking to read header length failed: %v", err) + } + + // read header + hrd, err := crypto.DecryptFrom(k, io.LimitReader(rd, int64(length))) + if err != nil { + return nil, err + } + + if entries == nil { + pos := uint(0) + for { + e := headerEntry{} + err = binary.Read(hrd, binary.LittleEndian, &e) + if err == io.EOF { + break + } + + if err != nil { + return nil, err + } + + entries = append(entries, Blob{ + Type: e.Type, + Length: e.Length, + ID: e.ID[:], + Offset: pos, + }) + + pos += uint(e.Length) + } + } + + p := &Unpacker{ + rd: rd, + k: k, + Entries: entries, + } + + return p, nil +} diff --git a/pack/pack_test.go b/pack/pack_test.go new file mode 100644 index 000000000..001161090 --- /dev/null +++ b/pack/pack_test.go @@ -0,0 +1,109 @@ +package pack_test + +import ( + "bytes" + "crypto/rand" + "crypto/sha256" + "encoding/binary" + "encoding/json" + "io" + "io/ioutil" + "testing" + + "github.com/restic/restic/backend" + "github.com/restic/restic/crypto" + "github.com/restic/restic/pack" + . "github.com/restic/restic/test" +) + +var lengths = []int{23, 31650, 25860, 10928, 13769, 19862, 5211, 127, 13690, 30231} + +func TestCreatePack(t *testing.T) { + type Buf struct { + data []byte + id backend.ID + } + + bufs := []Buf{} + + for _, l := range lengths { + b := make([]byte, l) + _, err := io.ReadFull(rand.Reader, b) + OK(t, err) + h := sha256.Sum256(b) + bufs = append(bufs, Buf{data: b, id: h[:]}) + } + + file := bytes.NewBuffer(nil) + + // create random keys + k := crypto.NewKey() + + // pack blobs + p := pack.NewPacker(k, file) + for _, b := range bufs { + p.Add(pack.Tree, b.id, bytes.NewReader(b.data)) + } + + // write file + n, err := p.Finalize() + OK(t, err) + + written := 0 + // data + for _, l := range lengths { + written += l + } + // header length + written += binary.Size(uint32(0)) + // header + written += len(lengths) * (binary.Size(pack.BlobType(0)) + binary.Size(uint32(0)) + backend.IDSize) + // header crypto + written += crypto.Extension + + // check length + Equals(t, uint(written), n) + Equals(t, uint(written), p.Size()) + + // read and parse it again + rd := bytes.NewReader(file.Bytes()) + np, err := pack.NewUnpacker(k, nil, rd) + OK(t, err) + Equals(t, len(np.Entries), len(bufs)) + + for i, b := range bufs { + e := np.Entries[i] + Equals(t, b.id, e.ID) + + brd, err := e.GetReader(rd) + OK(t, err) + data, err := ioutil.ReadAll(brd) + OK(t, err) + + Assert(t, bytes.Equal(b.data, data), + "data for blob %v doesn't match", i) + } +} + +var blobTypeJson = []struct { + t pack.BlobType + res string +}{ + {pack.Data, `"data"`}, + {pack.Tree, `"tree"`}, +} + +func TestBlobTypeJSON(t *testing.T) { + for _, test := range blobTypeJson { + // test serialize + buf, err := json.Marshal(test.t) + OK(t, err) + Equals(t, test.res, string(buf)) + + // test unserialize + var v pack.BlobType + err = json.Unmarshal([]byte(test.res), &v) + OK(t, err) + Equals(t, test.t, v) + } +} diff 
--git a/restorer.go b/restorer.go index e7a8d18ca..54e797abc 100644 --- a/restorer.go +++ b/restorer.go @@ -36,8 +36,8 @@ func NewRestorer(s *server.Server, snid backend.ID) (*Restorer, error) { return r, nil } -func (res *Restorer) to(dst string, dir string, treeBlob server.Blob) error { - tree, err := LoadTree(res.s, treeBlob) +func (res *Restorer) to(dst string, dir string, treeID backend.ID) error { + tree, err := LoadTree(res.s, treeID) if err != nil { return res.Error(dir, nil, arrar.Annotate(err, "LoadTree")) } @@ -47,7 +47,7 @@ func (res *Restorer) to(dst string, dir string, treeBlob server.Blob) error { if res.Filter == nil || res.Filter(filepath.Join(dir, node.Name), dstpath, node) { - err := node.CreateAt(dstpath, tree.Map, res.s) + err := node.CreateAt(dstpath, res.s) // Did it fail because of ENOENT? if arrar.Check(err, func(err error) bool { @@ -60,7 +60,7 @@ func (res *Restorer) to(dst string, dir string, treeBlob server.Blob) error { // Create parent directories and retry err = os.MkdirAll(filepath.Dir(dstpath), 0700) if err == nil || err == os.ErrExist { - err = node.CreateAt(dstpath, tree.Map, res.s) + err = node.CreateAt(dstpath, res.s) } } @@ -74,20 +74,11 @@ func (res *Restorer) to(dst string, dir string, treeBlob server.Blob) error { if node.Type == "dir" { if node.Subtree == nil { - return fmt.Errorf("Dir without subtree in tree %s", treeBlob) + return fmt.Errorf("Dir without subtree in tree %v", treeID.Str()) } subp := filepath.Join(dir, node.Name) - - subtreeBlob, err := tree.Map.FindID(node.Subtree) - if err != nil { - err = res.Error(subp, node, arrar.Annotate(err, "lookup subtree")) - if err != nil { - return err - } - } - - err = res.to(dst, subp, subtreeBlob) + err = res.to(dst, subp, node.Subtree) if err != nil { err = res.Error(subp, node, arrar.Annotate(err, "restore subtree")) if err != nil { diff --git a/server/index.go b/server/index.go new file mode 100644 index 000000000..a6271ff69 --- /dev/null +++ b/server/index.go @@ -0,0 +1,258 @@ +package server + +import ( + "encoding/json" + "errors" + "fmt" + "io" + "sync" + + "github.com/restic/restic/backend" + "github.com/restic/restic/debug" + "github.com/restic/restic/pack" +) + +// Index holds a lookup table for id -> pack. +type Index struct { + m sync.Mutex + pack map[string]indexEntry +} + +type indexEntry struct { + tpe pack.BlobType + packID backend.ID + offset uint + length uint + old bool +} + +// NewIndex returns a new index. +func NewIndex() *Index { + return &Index{ + pack: make(map[string]indexEntry), + } +} + +func (idx *Index) store(t pack.BlobType, id, pack backend.ID, offset, length uint, old bool) { + idx.pack[id.String()] = indexEntry{ + tpe: t, + packID: pack, + offset: offset, + length: length, + old: old, + } +} + +// Store remembers the id and pack in the index. +func (idx *Index) Store(t pack.BlobType, id, pack backend.ID, offset, length uint) { + idx.m.Lock() + defer idx.m.Unlock() + + debug.Log("Index.Store", "pack %v contains id %v (%v), offset %v, length %v", + pack.Str(), id.Str(), t, offset, length) + + idx.store(t, id, pack, offset, length, false) +} + +// Remove removes the pack ID from the index. +func (idx *Index) Remove(packID backend.ID) { + idx.m.Lock() + defer idx.m.Unlock() + + debug.Log("Index.Remove", "id %v removed", packID.Str()) + + s := packID.String() + if _, ok := idx.pack[s]; ok { + delete(idx.pack, s) + } +} + +// Lookup returns the pack for the id. 
+func (idx *Index) Lookup(id backend.ID) (packID backend.ID, tpe pack.BlobType, offset, length uint, err error) { + idx.m.Lock() + defer idx.m.Unlock() + + if p, ok := idx.pack[id.String()]; ok { + debug.Log("Index.Lookup", "id %v found in pack %v at %d, length %d", + id.Str(), p.packID.Str(), p.offset, p.length) + return p.packID, p.tpe, p.offset, p.length, nil + } + + debug.Log("Index.Lookup", "id %v not found", id.Str()) + return nil, pack.Data, 0, 0, errors.New("id not found") +} + +// Has returns true iff the id is listed in the index. +func (idx *Index) Has(id backend.ID) bool { + _, _, _, _, err := idx.Lookup(id) + if err == nil { + return true + } + + return false +} + +// Merge loads all items from other into idx. +func (idx *Index) Merge(other *Index) { + debug.Log("Index.Merge", "Merge index with %p", other) + idx.m.Lock() + defer idx.m.Unlock() + + for k, v := range other.pack { + if _, ok := idx.pack[k]; ok { + debug.Log("Index.Merge", "index already has key %v, updating", k[:8]) + } + + idx.pack[k] = v + } + debug.Log("Index.Merge", "done merging index") +} + +// Each returns a channel that yields all blobs known to the index. If done is +// closed, the background goroutine terminates. This blocks any modification of +// the index. +func (idx *Index) Each(done chan struct{}) <-chan pack.Blob { + idx.m.Lock() + + ch := make(chan pack.Blob) + + go func() { + defer idx.m.Unlock() + defer func() { + close(ch) + }() + + for ids, blob := range idx.pack { + id, err := backend.ParseID(ids) + if err != nil { + // ignore invalid IDs + continue + } + + select { + case <-done: + return + case ch <- pack.Blob{ + ID: id, + Offset: blob.offset, + Type: blob.tpe, + Length: uint32(blob.length), + }: + } + } + }() + + return ch +} + +// Count returns the number of blobs of type t in the index. +func (idx *Index) Count(t pack.BlobType) (n uint) { + debug.Log("Index.Count", "counting blobs of type %v", t) + idx.m.Lock() + defer idx.m.Unlock() + + for id, blob := range idx.pack { + if blob.tpe == t { + n++ + debug.Log("Index.Count", " blob %v counted: %v", id[:8], blob) + } + } + + return +} + +type packJSON struct { + ID string `json:"id"` + Blobs []blobJSON `json:"blobs"` +} + +type blobJSON struct { + ID string `json:"id"` + Type pack.BlobType `json:"type"` + Offset uint `json:"offset"` + Length uint `json:"length"` +} + +// Encode writes the JSON serialization of the index to the writer w. This +// serialization only contains new blobs added via idx.Store(), not old ones +// introduced via DecodeIndex(). +func (idx *Index) Encode(w io.Writer) error { + debug.Log("Index.Encode", "encoding index") + idx.m.Lock() + defer idx.m.Unlock() + + list := []*packJSON{} + packs := make(map[string]*packJSON) + + for id, blob := range idx.pack { + if blob.old { + continue + } + + debug.Log("Index.Encode", "handle blob %q", id[:8]) + + if blob.packID == nil { + debug.Log("Index.Encode", "blob %q has no packID! 
(type %v, offset %v, length %v)", + id[:8], blob.tpe, blob.offset, blob.length) + return fmt.Errorf("unable to serialize index: pack for blob %v hasn't been written yet", id) + } + + // see if pack is already in map + p, ok := packs[blob.packID.String()] + if !ok { + // else create new pack + p = &packJSON{ID: blob.packID.String()} + + // and append it to the list and map + list = append(list, p) + packs[p.ID] = p + } + + // add blob + p.Blobs = append(p.Blobs, blobJSON{ + ID: id, + Type: blob.tpe, + Offset: blob.offset, + Length: blob.length, + }) + } + + debug.Log("Index.Encode", "done") + + enc := json.NewEncoder(w) + return enc.Encode(list) +} + +// DecodeIndex loads and unserializes an index from rd. +func DecodeIndex(rd io.Reader) (*Index, error) { + debug.Log("Index.DecodeIndex", "Start decoding index") + list := []*packJSON{} + + dec := json.NewDecoder(rd) + err := dec.Decode(&list) + if err != nil { + return nil, err + } + + idx := NewIndex() + for _, pack := range list { + packID, err := backend.ParseID(pack.ID) + if err != nil { + debug.Log("Index.DecodeIndex", "error parsing pack ID %q: %v", pack.ID, err) + return nil, err + } + + for _, blob := range pack.Blobs { + blobID, err := backend.ParseID(blob.ID) + if err != nil { + debug.Log("Index.DecodeIndex", "error parsing blob ID %q: %v", blob.ID, err) + return nil, err + } + + idx.store(blob.Type, blobID, packID, blob.Offset, blob.Length, true) + } + } + + debug.Log("Index.DecodeIndex", "done") + return idx, err +} diff --git a/server/index_test.go b/server/index_test.go new file mode 100644 index 000000000..149fdc468 --- /dev/null +++ b/server/index_test.go @@ -0,0 +1,225 @@ +package server_test + +import ( + "bytes" + "crypto/rand" + "io" + "testing" + + "github.com/restic/restic/backend" + "github.com/restic/restic/pack" + "github.com/restic/restic/server" + . 
"github.com/restic/restic/test" +) + +func randomID() backend.ID { + buf := make([]byte, backend.IDSize) + _, err := io.ReadFull(rand.Reader, buf) + if err != nil { + panic(err) + } + return buf +} + +func TestIndexSerialize(t *testing.T) { + type testEntry struct { + id backend.ID + pack backend.ID + tpe pack.BlobType + offset, length uint + } + tests := []testEntry{} + + idx := server.NewIndex() + + // create 50 packs with 20 blobs each + for i := 0; i < 50; i++ { + packID := randomID() + + pos := uint(0) + for j := 0; j < 20; j++ { + id := randomID() + length := uint(i*100 + j) + idx.Store(pack.Data, id, packID, pos, length) + + tests = append(tests, testEntry{ + id: id, + pack: packID, + tpe: pack.Data, + offset: pos, + length: length, + }) + + pos += length + } + } + + wr := bytes.NewBuffer(nil) + err := idx.Encode(wr) + OK(t, err) + + idx2, err := server.DecodeIndex(wr) + OK(t, err) + Assert(t, idx2 != nil, + "nil returned for decoded index") + + wr2 := bytes.NewBuffer(nil) + err = idx2.Encode(wr2) + OK(t, err) + + for _, testBlob := range tests { + packID, tpe, offset, length, err := idx.Lookup(testBlob.id) + OK(t, err) + + Equals(t, testBlob.pack, packID) + Equals(t, testBlob.tpe, tpe) + Equals(t, testBlob.offset, offset) + Equals(t, testBlob.length, length) + + packID, tpe, offset, length, err = idx2.Lookup(testBlob.id) + OK(t, err) + + Equals(t, testBlob.pack, packID) + Equals(t, testBlob.tpe, tpe) + Equals(t, testBlob.offset, offset) + Equals(t, testBlob.length, length) + } + + // add more blobs to idx2 + newtests := []testEntry{} + for i := 0; i < 10; i++ { + packID := randomID() + + pos := uint(0) + for j := 0; j < 10; j++ { + id := randomID() + length := uint(i*100 + j) + idx2.Store(pack.Data, id, packID, pos, length) + + newtests = append(newtests, testEntry{ + id: id, + pack: packID, + tpe: pack.Data, + offset: pos, + length: length, + }) + + pos += length + } + } + + // serialize idx2, unserialize to idx3 + wr3 := bytes.NewBuffer(nil) + err = idx2.Encode(wr3) + OK(t, err) + + idx3, err := server.DecodeIndex(wr3) + OK(t, err) + Assert(t, idx3 != nil, + "nil returned for decoded index") + + // all old blobs must not be present in the index + for _, testBlob := range tests { + _, _, _, _, err := idx3.Lookup(testBlob.id) + Assert(t, err != nil, + "found old id %v in serialized index", testBlob.id.Str()) + } + + // all new blobs must be in the index + for _, testBlob := range newtests { + packID, tpe, offset, length, err := idx3.Lookup(testBlob.id) + OK(t, err) + + Equals(t, testBlob.pack, packID) + Equals(t, testBlob.tpe, tpe) + Equals(t, testBlob.offset, offset) + Equals(t, testBlob.length, length) + } +} + +func TestIndexSize(t *testing.T) { + idx := server.NewIndex() + + packs := 200 + blobs := 100 + for i := 0; i < packs; i++ { + packID := randomID() + + pos := uint(0) + for j := 0; j < blobs; j++ { + id := randomID() + length := uint(i*100 + j) + idx.Store(pack.Data, id, packID, pos, length) + + pos += length + } + } + + wr := bytes.NewBuffer(nil) + + err := idx.Encode(wr) + OK(t, err) + + t.Logf("Index file size for %d blobs in %d packs is %d", blobs*packs, packs, wr.Len()) +} + +// example index serialization from doc/Design.md +var docExample = []byte(` +[ { + "id": "73d04e6125cf3c28a299cc2f3cca3b78ceac396e4fcf9575e34536b26782413c", + "blobs": [ + { + "id": "3ec79977ef0cf5de7b08cd12b874cd0f62bbaf7f07f3497a5b1bbcc8cb39b1ce", + "type": "data", + "offset": 0, + "length": 25 + },{ + "id": "9ccb846e60d90d4eb915848add7aa7ea1e4bbabfc60e573db9f7bfb2789afbae", + "type": "tree", 
+ "offset": 38, + "length": 100 + }, + { + "id": "d3dc577b4ffd38cc4b32122cabf8655a0223ed22edfd93b353dc0c3f2b0fdf66", + "type": "data", + "offset": 150, + "length": 123 + } + ] +} ] +`) + +var exampleTests = []struct { + id, packID backend.ID + tpe pack.BlobType + offset, length uint +}{ + { + ParseID("3ec79977ef0cf5de7b08cd12b874cd0f62bbaf7f07f3497a5b1bbcc8cb39b1ce"), + ParseID("73d04e6125cf3c28a299cc2f3cca3b78ceac396e4fcf9575e34536b26782413c"), + pack.Data, 0, 25, + }, { + ParseID("9ccb846e60d90d4eb915848add7aa7ea1e4bbabfc60e573db9f7bfb2789afbae"), + ParseID("73d04e6125cf3c28a299cc2f3cca3b78ceac396e4fcf9575e34536b26782413c"), + pack.Tree, 38, 100, + }, { + ParseID("d3dc577b4ffd38cc4b32122cabf8655a0223ed22edfd93b353dc0c3f2b0fdf66"), + ParseID("73d04e6125cf3c28a299cc2f3cca3b78ceac396e4fcf9575e34536b26782413c"), + pack.Data, 150, 123, + }, +} + +func TestIndexUnserialize(t *testing.T) { + idx, err := server.DecodeIndex(bytes.NewReader(docExample)) + OK(t, err) + + for _, test := range exampleTests { + packID, tpe, offset, length, err := idx.Lookup(test.id) + OK(t, err) + + Equals(t, test.packID, packID) + Equals(t, test.tpe, tpe) + Equals(t, test.offset, offset) + Equals(t, test.length, length) + } +} diff --git a/server/server.go b/server/server.go index d00d47f4f..da1974a99 100644 --- a/server/server.go +++ b/server/server.go @@ -1,24 +1,35 @@ package server import ( + "bytes" "crypto/sha256" "encoding/json" "errors" "fmt" "io" "io/ioutil" + "sync" "github.com/restic/restic/backend" "github.com/restic/restic/chunker" + "github.com/restic/restic/debug" + "github.com/restic/restic/pack" ) type Server struct { be backend.Backend key *Key + idx *Index + + pm sync.Mutex + packs []*pack.Packer } func NewServer(be backend.Backend) *Server { - return &Server{be: be} + return &Server{ + be: be, + idx: NewIndex(), + } } func (s *Server) SetKey(k *Key) { @@ -49,14 +60,15 @@ func (s *Server) PrefixLength(t backend.Type) (int, error) { return backend.PrefixLength(s.be, t) } -// Load tries to load and decrypt content identified by t and blob from the -// backend. If the blob specifies an ID, the decrypted plaintext is checked -// against this ID. The same goes for blob.Size and blob.StorageSize: If they -// are set to a value > 0, this value is checked. -func (s *Server) Load(t backend.Type, blob Blob) ([]byte, error) { - // load data - rd, err := s.be.Get(t, blob.Storage.String()) +// Load tries to load and decrypt content identified by t and id from the +// backend. 
+func (s *Server) Load(t backend.Type, id backend.ID) ([]byte, error) { + debug.Log("Server.Load", "load %v with id %v", t, id.Str()) + + // load blob from pack + rd, err := s.be.Get(t, id.String()) if err != nil { + debug.Log("Server.Load", "error loading %v: %v", id.Str(), err) return nil, err } @@ -65,58 +77,78 @@ func (s *Server) Load(t backend.Type, blob Blob) ([]byte, error) { return nil, err } - // check hash - if !backend.Hash(buf).Equal(blob.Storage) { - return nil, errors.New("invalid data returned") - } - - // check length - if blob.StorageSize > 0 && len(buf) != int(blob.StorageSize) { - return nil, errors.New("Invalid storage length") - } - - // decrypt - buf, err = s.Decrypt(buf) + err = rd.Close() if err != nil { return nil, err } - // check length - if blob.Size > 0 && len(buf) != int(blob.Size) { - return nil, errors.New("Invalid length") + // check hash + if !backend.Hash(buf).Equal(id) { + return nil, errors.New("invalid data returned") } - // check SHA256 sum - if blob.ID != nil { - id := backend.Hash(buf) - if !blob.ID.Equal(id) { - return nil, fmt.Errorf("load %v: expected plaintext hash %v, got %v", blob.Storage, blob.ID, id) - } - } - - return buf, nil -} - -// Load tries to load and decrypt content identified by t and id from the backend. -func (s *Server) LoadID(t backend.Type, storageID backend.ID) ([]byte, error) { - return s.Load(t, Blob{Storage: storageID}) -} - -// LoadJSON calls Load() to get content from the backend and afterwards calls -// json.Unmarshal on the item. -func (s *Server) LoadJSON(t backend.Type, blob Blob, item interface{}) error { - buf, err := s.Load(t, blob) + // decrypt + plain, err := s.Decrypt(buf) if err != nil { - return err + return nil, err } - return json.Unmarshal(buf, item) + return plain, nil } -// LoadJSONID calls Load() to get content from the backend and afterwards calls -// json.Unmarshal on the item. -func (s *Server) LoadJSONID(t backend.Type, id backend.ID, item interface{}) error { - // read +// LoadBlob tries to load and decrypt content identified by t and id from a +// pack from the backend. +func (s *Server) LoadBlob(t pack.BlobType, id backend.ID) ([]byte, error) { + debug.Log("Server.LoadBlob", "load %v with id %v", t, id.Str()) + // lookup pack + packID, tpe, offset, length, err := s.idx.Lookup(id) + if err != nil { + debug.Log("Server.LoadBlob", "id %v not found in index: %v", id.Str(), err) + return nil, err + } + + if tpe != t { + debug.Log("Server.LoadBlob", "wrong type returned for %v: wanted %v, got %v", id.Str(), t, tpe) + return nil, fmt.Errorf("blob has wrong type %v (wanted: %v)", tpe, t) + } + + debug.Log("Server.LoadBlob", "id %v found in pack %v at offset %v (length %d)", id.Str(), packID.Str(), offset, length) + + // load blob from pack + rd, err := s.be.GetReader(backend.Data, packID.String(), offset, length) + if err != nil { + debug.Log("Server.LoadBlob", "error loading pack %v for %v: %v", packID.Str(), id.Str(), err) + return nil, err + } + + buf, err := ioutil.ReadAll(rd) + if err != nil { + return nil, err + } + + err = rd.Close() + if err != nil { + return nil, err + } + + // decrypt + plain, err := s.Decrypt(buf) + if err != nil { + return nil, err + } + + // check hash + if !backend.Hash(plain).Equal(id) { + return nil, errors.New("invalid data returned") + } + + return plain, nil +} + +// LoadJSONEncrypted decrypts the data and afterwards calls json.Unmarshal on +// the item. 
+func (s *Server) LoadJSONEncrypted(t backend.Type, id backend.ID, item interface{}) error { + // load blob from backend rd, err := s.be.Get(t, id.String()) if err != nil { return err @@ -140,132 +172,254 @@ func (s *Server) LoadJSONID(t backend.Type, id backend.ID, item interface{}) err return nil } -// Save encrypts data and stores it to the backend as type t. -func (s *Server) Save(t backend.Type, data []byte, id backend.ID) (Blob, error) { +// LoadJSONPack calls LoadBlob() to load a blob from the backend, decrypt the +// data and afterwards call json.Unmarshal on the item. +func (s *Server) LoadJSONPack(t pack.BlobType, id backend.ID, item interface{}) error { + // lookup pack + packID, _, offset, length, err := s.idx.Lookup(id) + if err != nil { + return err + } + + // load blob from pack + rd, err := s.be.GetReader(backend.Data, packID.String(), offset, length) + if err != nil { + return err + } + defer rd.Close() + + // decrypt + decryptRd, err := s.key.DecryptFrom(rd) + defer decryptRd.Close() + if err != nil { + return err + } + + // decode + decoder := json.NewDecoder(decryptRd) + err = decoder.Decode(item) + if err != nil { + return err + } + + return nil +} + +const minPackSize = 4 * chunker.MiB +const maxPackSize = 16 * chunker.MiB +const maxPackers = 200 + +// findPacker returns a packer for a new blob of size bytes. Either a new one is +// created or one is returned that already has some blobs. +func (s *Server) findPacker(size uint) (*pack.Packer, error) { + s.pm.Lock() + defer s.pm.Unlock() + + // search for a suitable packer + if len(s.packs) > 0 { + debug.Log("Server.findPacker", "searching packer for %d bytes\n", size) + for i, p := range s.packs { + if p.Size()+size < maxPackSize { + debug.Log("Server.findPacker", "found packer %v", p) + // remove from list + s.packs = append(s.packs[:i], s.packs[i+1:]...) + return p, nil + } + } + } + + // no suitable packer found, return new + blob, err := s.be.Create() + if err != nil { + return nil, err + } + debug.Log("Server.findPacker", "create new pack %p", blob) + return pack.NewPacker(s.key.Master(), blob), nil +} + +// insertPacker appends p to s.packs. +func (s *Server) insertPacker(p *pack.Packer) { + s.pm.Lock() + defer s.pm.Unlock() + + s.packs = append(s.packs, p) + debug.Log("Server.insertPacker", "%d packers\n", len(s.packs)) +} + +// savePacker stores p in the backend. +func (s *Server) savePacker(p *pack.Packer) error { + debug.Log("Server.savePacker", "save packer with %d blobs\n", p.Count()) + _, err := p.Finalize() + if err != nil { + return err + } + + // move file to the final location + sid := p.ID() + err = p.Writer().(backend.Blob).Finalize(backend.Data, sid.String()) + if err != nil { + debug.Log("Server.savePacker", "blob Finalize() error: %v", err) + return err + } + + debug.Log("Server.savePacker", "saved as %v", sid.Str()) + + // update blobs in the index + for _, b := range p.Blobs() { + debug.Log("Server.savePacker", " updating blob %v to pack %v", b.ID.Str(), sid.Str()) + s.idx.Store(b.Type, b.ID, sid, b.Offset, uint(b.Length)) + } + + return nil +} + +// countPacker returns the number of open (unfinished) packers. +func (s *Server) countPacker() int { + s.pm.Lock() + defer s.pm.Unlock() + + return len(s.packs) +} + +// Save encrypts data and stores it to the backend as type t. If data is small +// enough, it will be packed together with other small blobs. 
+func (s *Server) Save(t pack.BlobType, data []byte, id backend.ID) (backend.ID, error) { if id == nil { // compute plaintext hash id = backend.Hash(data) } - // create a new blob - blob := Blob{ - ID: id, - Size: uint64(len(data)), - } + debug.Log("Server.Save", "save id %v (%v, %d bytes)", id.Str(), t, len(data)) + // get buf from the pool ciphertext := getBuf() defer freeBuf(ciphertext) // encrypt blob ciphertext, err := s.Encrypt(ciphertext, data) if err != nil { - return Blob{}, err + return nil, err } - // compute ciphertext hash - sid := backend.Hash(ciphertext) - - // save blob - backendBlob, err := s.be.Create() + // find suitable packer and add blob + packer, err := s.findPacker(uint(len(ciphertext))) if err != nil { - return Blob{}, err + return nil, err } - _, err = backendBlob.Write(ciphertext) - if err != nil { - return Blob{}, err + // save ciphertext + packer.Add(t, id, bytes.NewReader(ciphertext)) + + // add this id to the index, although we don't know yet in which pack it + // will be saved, the entry will be updated when the pack is written. + s.idx.Store(t, id, nil, 0, 0) + debug.Log("Server.Save", "saving stub for %v (%v) in index", id.Str, t) + + // if the pack is not full enough and there are less than maxPackers + // packers, put back to the list + if packer.Size() < minPackSize && s.countPacker() < maxPackers { + debug.Log("Server.Save", "pack is not full enough (%d bytes)", packer.Size()) + s.insertPacker(packer) + return id, nil } - err = backendBlob.Finalize(t, sid.String()) - if err != nil { - return Blob{}, err - } - - blob.Storage = sid - blob.StorageSize = uint64(len(ciphertext)) - - return blob, nil + // else write the pack to the backend + return id, s.savePacker(packer) } -// SaveFrom encrypts data read from rd and stores it to the backend as type t. -func (s *Server) SaveFrom(t backend.Type, id backend.ID, length uint, rd io.Reader) (Blob, error) { +// SaveFrom encrypts data read from rd and stores it in a pack in the backend as type t. +func (s *Server) SaveFrom(t pack.BlobType, id backend.ID, length uint, rd io.Reader) error { + debug.Log("Server.SaveFrom", "save id %v (%v, %d bytes)", id.Str(), t, length) if id == nil { - return Blob{}, errors.New("id is nil") + return errors.New("id is nil") } - backendBlob, err := s.be.Create() + buf, err := ioutil.ReadAll(rd) if err != nil { - return Blob{}, err + return err } - hw := backend.NewHashingWriter(backendBlob, sha256.New()) - encWr := s.key.EncryptTo(hw) - - _, err = io.Copy(encWr, rd) + _, err = s.Save(t, buf, id) if err != nil { - return Blob{}, err + return err } - // finish encryption - err = encWr.Close() - if err != nil { - return Blob{}, fmt.Errorf("EncryptedWriter.Close(): %v", err) - } - - // finish backend blob - sid := backend.ID(hw.Sum(nil)) - err = backendBlob.Finalize(t, sid.String()) - if err != nil { - return Blob{}, fmt.Errorf("backend.Blob.Close(): %v", err) - } - - return Blob{ - ID: id, - Size: uint64(length), - Storage: sid, - StorageSize: uint64(backendBlob.Size()), - }, nil + return nil } -// SaveJSON serialises item as JSON and encrypts and saves it in the backend as -// type t. -func (s *Server) SaveJSON(t backend.Type, item interface{}) (Blob, error) { - backendBlob, err := s.be.Create() +// SaveJSON serialises item as JSON and encrypts and saves it in a pack in the +// backend as type t. 
+func (s *Server) SaveJSON(t pack.BlobType, item interface{}) (backend.ID, error) { + debug.Log("Server.SaveJSON", "save %v blob", t) + buf := getBuf()[:0] + defer freeBuf(buf) + + wr := bytes.NewBuffer(buf) + + enc := json.NewEncoder(wr) + err := enc.Encode(item) if err != nil { - return Blob{}, fmt.Errorf("Create: %v", err) + return nil, fmt.Errorf("json.Encode: %v", err) } - storagehw := backend.NewHashingWriter(backendBlob, sha256.New()) - encWr := s.key.EncryptTo(storagehw) - plainhw := backend.NewHashingWriter(encWr, sha256.New()) + buf = wr.Bytes() + return s.Save(t, buf, nil) +} - enc := json.NewEncoder(plainhw) +// SaveJSONUnpacked serialises item as JSON and encrypts and saves it in the +// backend as type t, without a pack. It returns the storage hash. +func (s *Server) SaveJSONUnpacked(t backend.Type, item interface{}) (backend.ID, error) { + // create blob + blob, err := s.be.Create() + if err != nil { + return nil, err + } + debug.Log("Server.SaveJSONUnpacked", "create new pack %p", blob) + + // hash + hw := backend.NewHashingWriter(blob, sha256.New()) + + // encrypt blob + ewr := s.key.EncryptTo(hw) + + enc := json.NewEncoder(ewr) err = enc.Encode(item) if err != nil { - return Blob{}, fmt.Errorf("json.NewEncoder: %v", err) + return nil, fmt.Errorf("json.Encode: %v", err) } - // finish encryption - err = encWr.Close() + err = ewr.Close() if err != nil { - return Blob{}, fmt.Errorf("EncryptedWriter.Close(): %v", err) + return nil, err } - // finish backend blob - sid := backend.ID(storagehw.Sum(nil)) - err = backendBlob.Finalize(t, sid.String()) + // finalize blob in the backend + sid := backend.ID(hw.Sum(nil)) + + err = blob.Finalize(t, sid.String()) if err != nil { - return Blob{}, fmt.Errorf("backend.Blob.Close(): %v", err) + return nil, err } - id := backend.ID(plainhw.Sum(nil)) + return sid, nil +} - return Blob{ - ID: id, - Size: uint64(plainhw.Size()), - Storage: sid, - StorageSize: uint64(backendBlob.Size()), - }, nil +// Flush saves all remaining packs. +func (s *Server) Flush() error { + s.pm.Lock() + defer s.pm.Unlock() + + debug.Log("Server.Flush", "manually flushing %d packs", len(s.packs)) + + for _, p := range s.packs { + err := s.savePacker(p) + if err != nil { + return err + } + } + s.packs = s.packs[:0] + + return nil } // Returns the backend used for this server. @@ -273,6 +427,106 @@ func (s *Server) Backend() backend.Backend { return s.be } +// Returns the index of this server. +func (s *Server) Index() *Index { + return s.idx +} + +// SetIndex instructs the server to use the given index. +func (s *Server) SetIndex(i *Index) { + s.idx = i +} + +// SaveIndex saves all new packs in the index in the backend, returned is the +// storage ID. 
+func (s *Server) SaveIndex() (backend.ID, error) { + debug.Log("Server.SaveIndex", "Saving index") + + // create blob + blob, err := s.be.Create() + if err != nil { + return nil, err + } + + debug.Log("Server.SaveIndex", "create new pack %p", blob) + + // hash + hw := backend.NewHashingWriter(blob, sha256.New()) + + // encrypt blob + ewr := s.key.EncryptTo(hw) + + err = s.idx.Encode(ewr) + if err != nil { + return nil, err + } + + err = ewr.Close() + if err != nil { + return nil, err + } + + // finalize blob in the backend + sid := backend.ID(hw.Sum(nil)) + + err = blob.Finalize(backend.Index, sid.String()) + if err != nil { + return nil, err + } + + debug.Log("Server.SaveIndex", "Saved index as %v", sid.Str()) + + return sid, nil +} + +// LoadIndex loads all index files from the backend and merges them with the +// current index. +func (s *Server) LoadIndex() error { + debug.Log("Server.LoadIndex", "Loading index") + done := make(chan struct{}) + defer close(done) + + for id := range s.be.List(backend.Index, done) { + err := s.loadIndex(id) + if err != nil { + return err + } + } + return nil +} + +// loadIndex loads the index id and merges it with the currently used index. +func (s *Server) loadIndex(id string) error { + debug.Log("Server.loadIndex", "Loading index %v", id[:8]) + before := len(s.idx.pack) + + rd, err := s.be.Get(backend.Index, id) + defer rd.Close() + if err != nil { + return err + } + + // decrypt + decryptRd, err := s.key.DecryptFrom(rd) + defer decryptRd.Close() + if err != nil { + return err + } + + idx, err := DecodeIndex(decryptRd) + if err != nil { + debug.Log("Server.loadIndex", "error while decoding index %v: %v", id, err) + return err + } + + s.idx.Merge(idx) + + after := len(s.idx.pack) + debug.Log("Server.loadIndex", "Loaded index %v, added %v blobs", id[:8], after-before) + + return nil +} + func (s *Server) SearchKey(password string) error { key, err := SearchKey(s, password) if err != nil { @@ -289,7 +543,7 @@ func (s *Server) Decrypt(ciphertext []byte) ([]byte, error) { return nil, errors.New("key for server not set") } - return s.key.Decrypt([]byte{}, ciphertext) + return s.key.Decrypt(nil, ciphertext) } func (s *Server) Encrypt(ciphertext, plaintext []byte) ([]byte, error) { @@ -305,8 +559,8 @@ func (s *Server) Key() *Key { } // Count returns the number of blobs of a given type in the backend. -func (s *Server) Count(t backend.Type) (n int) { - for _ = range s.List(t, nil) { +func (s *Server) Count(t backend.Type) (n uint) { + for _ = range s.be.List(t, nil) { n++ } diff --git a/server/server_test.go b/server/server_test.go index 414b4f0cc..51556ee94 100644 --- a/server/server_test.go +++ b/server/server_test.go @@ -11,6 +11,7 @@ import ( "github.com/restic/restic" "github.com/restic/restic/backend" + "github.com/restic/restic/pack" . 
"github.com/restic/restic/test" ) @@ -38,12 +39,12 @@ func TestSaveJSON(t *testing.T) { data = append(data, '\n') h := sha256.Sum256(data) - blob, err := server.SaveJSON(backend.Tree, obj) + id, err := server.SaveJSON(pack.Tree, obj) OK(t, err) - Assert(t, bytes.Equal(h[:], blob.ID), + Assert(t, bytes.Equal(h[:], id), "TestSaveJSON: wrong plaintext ID: expected %02x, got %02x", - h, blob.ID) + h, id) } } @@ -63,17 +64,51 @@ func BenchmarkSaveJSON(t *testing.B) { t.ResetTimer() for i := 0; i < t.N; i++ { - blob, err := server.SaveJSON(backend.Tree, obj) + id, err := server.SaveJSON(pack.Tree, obj) OK(t, err) - Assert(t, bytes.Equal(h[:], blob.ID), + Assert(t, bytes.Equal(h[:], id), "TestSaveJSON: wrong plaintext ID: expected %02x, got %02x", - h, blob.ID) + h, id) } } var testSizes = []int{5, 23, 2<<18 + 23, 1 << 20} +func TestSave(t *testing.T) { + server := SetupBackend(t) + defer TeardownBackend(t, server) + key := SetupKey(t, server, "geheim") + server.SetKey(key) + + for _, size := range testSizes { + data := make([]byte, size) + _, err := io.ReadFull(rand.Reader, data) + OK(t, err) + + id := backend.Hash(data) + + // save + sid, err := server.Save(pack.Data, data, nil) + OK(t, err) + + Equals(t, id, sid) + + OK(t, server.Flush()) + + // read back + buf, err := server.LoadBlob(pack.Data, id) + + Assert(t, len(buf) == len(data), + "number of bytes read back does not match: expected %d, got %d", + len(data), len(buf)) + + Assert(t, bytes.Equal(buf, data), + "data does not match: expected %02x, got %02x", + data, buf) + } +} + func TestSaveFrom(t *testing.T) { server := SetupBackend(t) defer TeardownBackend(t, server) @@ -85,14 +120,16 @@ func TestSaveFrom(t *testing.T) { _, err := io.ReadFull(rand.Reader, data) OK(t, err) - id := sha256.Sum256(data) + id := backend.Hash(data) // save - blob, err := server.SaveFrom(backend.Data, id[:], uint(size), bytes.NewReader(data)) + err = server.SaveFrom(pack.Data, id[:], uint(size), bytes.NewReader(data)) OK(t, err) + OK(t, server.Flush()) + // read back - buf, err := server.Load(backend.Data, blob) + buf, err := server.LoadBlob(pack.Data, id[:]) Assert(t, len(buf) == len(data), "number of bytes read back does not match: expected %d, got %d", @@ -123,12 +160,12 @@ func BenchmarkSaveFrom(t *testing.B) { for i := 0; i < t.N; i++ { // save - _, err := server.SaveFrom(backend.Data, id[:], uint(size), bytes.NewReader(data)) + err = server.SaveFrom(pack.Data, id[:], uint(size), bytes.NewReader(data)) OK(t, err) } } -func TestLoadJSONID(t *testing.T) { +func TestLoadJSONPack(t *testing.T) { if *benchTestDir == "" { t.Skip("benchdir not set, skipping TestServerStats") } @@ -140,23 +177,14 @@ func TestLoadJSONID(t *testing.T) { // archive a few files sn := SnapshotDir(t, server, *benchTestDir, nil) - t.Logf("archived snapshot %v", sn.ID()) - - // benchmark loading first tree - done := make(chan struct{}) - first, found := <-server.List(backend.Tree, done) - Assert(t, found, "no Trees in repository found") - close(done) - - id, err := backend.ParseID(first) - OK(t, err) + OK(t, server.Flush()) tree := restic.NewTree() - err = server.LoadJSONID(backend.Tree, id, &tree) + err := server.LoadJSONPack(pack.Tree, sn.Tree, &tree) OK(t, err) } -func BenchmarkLoadJSONID(t *testing.B) { +func TestLoadJSONEncrypted(t *testing.T) { if *benchTestDir == "" { t.Skip("benchdir not set, skipping TestServerStats") } @@ -166,18 +194,20 @@ func BenchmarkLoadJSONID(t *testing.B) { key := SetupKey(t, server, "geheim") server.SetKey(key) - // archive a few files - sn := 
SnapshotDir(t, server, *benchTestDir, nil) - t.Logf("archived snapshot %v", sn.ID()) + // archive a snapshot + sn := restic.Snapshot{} + sn.Hostname = "foobar" + sn.Username = "test!" - t.ResetTimer() + id, err := server.SaveJSONUnpacked(backend.Snapshot, &sn) + OK(t, err) - tree := restic.NewTree() - for i := 0; i < t.N; i++ { - for name := range server.List(backend.Tree, nil) { - id, err := backend.ParseID(name) - OK(t, err) - OK(t, server.LoadJSONID(backend.Tree, id, &tree)) - } - } + var sn2 restic.Snapshot + + // restore + err = server.LoadJSONEncrypted(backend.Snapshot, id, &sn2) + OK(t, err) + + Equals(t, sn.Hostname, sn2.Hostname) + Equals(t, sn.Username, sn2.Username) } diff --git a/snapshot.go b/snapshot.go index eaaf94ec4..e8ac96bba 100644 --- a/snapshot.go +++ b/snapshot.go @@ -11,14 +11,14 @@ import ( ) type Snapshot struct { - Time time.Time `json:"time"` - Parent backend.ID `json:"parent,omitempty"` - Tree server.Blob `json:"tree"` - Paths []string `json:"paths"` - Hostname string `json:"hostname,omitempty"` - Username string `json:"username,omitempty"` - UID uint32 `json:"uid,omitempty"` - GID uint32 `json:"gid,omitempty"` + Time time.Time `json:"time"` + Parent backend.ID `json:"parent,omitempty"` + Tree backend.ID `json:"tree"` + Paths []string `json:"paths"` + Hostname string `json:"hostname,omitempty"` + Username string `json:"username,omitempty"` + UID uint32 `json:"uid,omitempty"` + GID uint32 `json:"gid,omitempty"` id backend.ID // plaintext ID, used during restore } @@ -50,7 +50,7 @@ func NewSnapshot(paths []string) (*Snapshot, error) { func LoadSnapshot(s *server.Server, id backend.ID) (*Snapshot, error) { sn := &Snapshot{id: id} - err := s.LoadJSONID(backend.Snapshot, id, sn) + err := s.LoadJSONEncrypted(backend.Snapshot, id, sn) if err != nil { return nil, err } diff --git a/test/backend.go b/test/backend.go index f2cc8dae6..7dc517ae2 100644 --- a/test/backend.go +++ b/test/backend.go @@ -52,7 +52,6 @@ func SetupKey(t testing.TB, s *server.Server, password string) *server.Key { func SnapshotDir(t testing.TB, server *server.Server, path string, parent backend.ID) *restic.Snapshot { arch, err := restic.NewArchiver(server) OK(t, err) - OK(t, arch.Preload()) sn, _, err := arch.Snapshot(nil, []string{path}, parent) OK(t, err) return sn diff --git a/test/helpers.go b/test/helpers.go index d7fe0eb16..a3e5a743c 100644 --- a/test/helpers.go +++ b/test/helpers.go @@ -39,7 +39,7 @@ func Equals(tb testing.TB, exp, act interface{}) { } } -func Str2ID(s string) backend.ID { +func ParseID(s string) backend.ID { id, err := backend.ParseID(s) if err != nil { panic(err) diff --git a/tree.go b/tree.go index b835e2118..d621ced6f 100644 --- a/tree.go +++ b/tree.go @@ -7,12 +7,12 @@ import ( "github.com/restic/restic/backend" "github.com/restic/restic/debug" + "github.com/restic/restic/pack" "github.com/restic/restic/server" ) type Tree struct { Nodes []*Node `json:"nodes"` - Map *Map `json:"map"` } var ( @@ -23,17 +23,16 @@ var ( func NewTree() *Tree { return &Tree{ Nodes: []*Node{}, - Map: NewMap(), } } func (t Tree) String() string { - return fmt.Sprintf("Tree<%d nodes, %d blobs>", len(t.Nodes), len(t.Map.list)) + return fmt.Sprintf("Tree<%d nodes>", len(t.Nodes)) } -func LoadTree(s *server.Server, blob server.Blob) (*Tree, error) { +func LoadTree(s *server.Server, id backend.ID) (*Tree, error) { tree := &Tree{} - err := s.LoadJSON(backend.Tree, blob, tree) + err := s.LoadJSONPack(pack.Tree, id, tree) if err != nil { return nil, err } @@ -41,18 +40,13 @@ func LoadTree(s 
*server.Server, blob server.Blob) (*Tree, error) { return tree, nil } -// Equals returns true if t and other have exactly the same nodes and map. -func (t Tree) Equals(other Tree) bool { +// Equals returns true if t and other have exactly the same nodes. +func (t Tree) Equals(other *Tree) bool { if len(t.Nodes) != len(other.Nodes) { debug.Log("Tree.Equals", "tree.Equals(): trees have different number of nodes") return false } - if !t.Map.Equals(other.Map) { - debug.Log("Tree.Equals", "tree.Equals(): maps aren't equal") - return false - } - for i := 0; i < len(t.Nodes); i++ { if !t.Nodes[i].Equals(*other.Nodes[i]) { debug.Log("Tree.Equals", "tree.Equals(): node %d is different:", i) diff --git a/tree_test.go b/tree_test.go index 89a28b698..db9653d19 100644 --- a/tree_test.go +++ b/tree_test.go @@ -8,6 +8,7 @@ import ( "testing" "github.com/restic/restic" + "github.com/restic/restic/pack" . "github.com/restic/restic/test" ) @@ -91,3 +92,26 @@ func TestNodeComparison(t *testing.T) { n2.Size -= 1 Assert(t, !node.Equals(n2), "nodes are equal") } + +func TestLoadTree(t *testing.T) { + server := SetupBackend(t) + defer TeardownBackend(t, server) + key := SetupKey(t, server, "geheim") + server.SetKey(key) + + // save tree + tree := restic.NewTree() + id, err := server.SaveJSON(pack.Tree, tree) + OK(t, err) + + // save packs + OK(t, server.Flush()) + + // load tree again + tree2, err := restic.LoadTree(server, id) + OK(t, err) + + Assert(t, tree.Equals(tree2), + "trees are not equal: want %v, got %v", + tree, tree2) +} diff --git a/walk.go b/walk.go index 2bd3373de..569e5820c 100644 --- a/walk.go +++ b/walk.go @@ -3,6 +3,7 @@ package restic import ( "path/filepath" + "github.com/restic/restic/backend" "github.com/restic/restic/debug" "github.com/restic/restic/server" ) @@ -15,10 +16,10 @@ type WalkTreeJob struct { Tree *Tree } -func walkTree(s *server.Server, path string, treeBlob server.Blob, done chan struct{}, jobCh chan<- WalkTreeJob) { - debug.Log("walkTree", "start on %q (%v)", path, treeBlob) +func walkTree(s *server.Server, path string, treeID backend.ID, done chan struct{}, jobCh chan<- WalkTreeJob) { + debug.Log("walkTree", "start on %q (%v)", path, treeID.Str()) // load tree - t, err := LoadTree(s, treeBlob) + t, err := LoadTree(s, treeID) if err != nil { jobCh <- WalkTreeJob{Path: path, Error: err} return @@ -27,32 +28,22 @@ func walkTree(s *server.Server, path string, treeBlob server.Blob, done chan str for _, node := range t.Nodes { p := filepath.Join(path, node.Name) if node.Type == "dir" { - blob, err := t.Map.FindID(node.Subtree) - if err != nil { - jobCh <- WalkTreeJob{Path: p, Error: err} - continue - } - walkTree(s, p, blob, done, jobCh) + walkTree(s, p, node.Subtree, done, jobCh) } else { - // load old blobs - node.blobs, err = t.Map.Select(node.Content) - if err != nil { - debug.Log("walkTree", "unable to load bobs for %q (%v): %v", path, treeBlob, err) - } jobCh <- WalkTreeJob{Path: p, Node: node, Error: err} } } jobCh <- WalkTreeJob{Path: filepath.Join(path), Tree: t} - debug.Log("walkTree", "done for %q (%v)", path, treeBlob) + debug.Log("walkTree", "done for %q (%v)", path, treeID.Str()) } // WalkTree walks the tree specified by ID recursively and sends a job for each // file and directory it finds. When the channel done is closed, processing // stops. 
-func WalkTree(server *server.Server, blob server.Blob, done chan struct{}, jobCh chan<- WalkTreeJob) { - debug.Log("WalkTree", "start on %v", blob) - walkTree(server, "", blob, done, jobCh) +func WalkTree(server *server.Server, id backend.ID, done chan struct{}, jobCh chan<- WalkTreeJob) { + debug.Log("WalkTree", "start on %v", id.Str()) + walkTree(server, "", id, done, jobCh) close(jobCh) debug.Log("WalkTree", "done") } diff --git a/walk_test.go b/walk_test.go index bce1da765..170742d48 100644 --- a/walk_test.go +++ b/walk_test.go @@ -27,6 +27,9 @@ func TestWalkTree(t *testing.T) { sn, _, err := arch.Snapshot(nil, dirs, nil) OK(t, err) + // flush server, write all packs + OK(t, server.Flush()) + // start benchmark // t.ResetTimer() @@ -48,6 +51,9 @@ func TestWalkTree(t *testing.T) { fsJob, fsChOpen := <-fsJobs Assert(t, !fsChOpen || fsJob != nil, "received nil job from filesystem: %v %v", fsJob, fsChOpen) + if fsJob != nil { + OK(t, fsJob.Error()) + } var path string fsEntries := 1 @@ -63,6 +69,8 @@ func TestWalkTree(t *testing.T) { treeJob, treeChOpen := <-treeJobs treeEntries := 1 + OK(t, treeJob.Error) + if treeJob.Tree != nil { treeEntries = len(treeJob.Tree.Nodes) }
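
A condensed sketch of the save/flush contract that the tests above exercise:
blobs are staged in open packers and only reach the backend once Flush() is
called, after which the index can resolve them again. Server and key setup is
elided; only functions added in this change are used, and the helper name
roundTrip is hypothetical.

    // roundTrip stores a blob, flushes all open packs, then reads the
    // blob back via the index.
    func roundTrip(s *server.Server, data []byte) ([]byte, error) {
        id := backend.Hash(data)

        // stage the blob in an open packer; small blobs are buffered
        // until a pack is full enough
        if _, err := s.Save(pack.Data, data, id); err != nil {
            return nil, err
        }

        // write all pending packs to the backend
        if err := s.Flush(); err != nil {
            return nil, err
        }

        // the index now knows the pack, offset and length for id
        return s.LoadBlob(pack.Data, id)
    }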