Merge pull request #310 from restic/resume-backups

resume interrupted backups
This commit is contained in:
Alexander Neumann 2015-10-14 21:53:25 +02:00
commit 5de36dfdf0
14 changed files with 840 additions and 244 deletions

View file

@ -9,6 +9,7 @@ import (
"path/filepath"
"sort"
"sync"
"time"
"github.com/restic/chunker"
"github.com/restic/restic/backend"
@ -87,7 +88,7 @@ func (arch *Archiver) SaveTreeJSON(item interface{}) (backend.ID, error) {
// check if tree has been saved before
id := backend.Hash(data)
if arch.repo.Index().Has(id) {
if arch.repo.Index().IsInFlight(id) || arch.repo.Index().Has(id) {
return id, nil
}
@ -540,6 +541,30 @@ func (j archiveJob) Copy() pipe.Job {
return j.new
}
const saveIndexTime = 30 * time.Second
// saveIndexes regularly queries the master index for full indexes and saves them.
func (arch *Archiver) saveIndexes(wg *sync.WaitGroup, done <-chan struct{}) {
defer wg.Done()
ticker := time.NewTicker(saveIndexTime)
defer ticker.Stop()
for {
select {
case <-done:
return
case <-ticker.C:
debug.Log("Archiver.saveIndexes", "saving full indexes")
err := arch.repo.SaveFullIndex()
if err != nil {
debug.Log("Archiver.saveIndexes", "save indexes returned an error: %v", err)
fmt.Fprintf(os.Stderr, "error saving preliminary index: %v\n", err)
}
}
}
}
// Snapshot creates a snapshot of the given paths. If parentID is set, this is
// used to compare the files to the ones archived at the time this snapshot was
// taken.
@ -623,10 +648,20 @@ func (arch *Archiver) Snapshot(p *Progress, paths []string, parentID *backend.ID
go arch.dirWorker(&wg, p, done, dirCh)
}
// run index saver
var wgIndexSaver sync.WaitGroup
stopIndexSaver := make(chan struct{})
wgIndexSaver.Add(1)
go arch.saveIndexes(&wgIndexSaver, stopIndexSaver)
// wait for all workers to terminate
debug.Log("Archiver.Snapshot", "wait for workers")
wg.Wait()
// stop index saver
close(stopIndexSaver)
wgIndexSaver.Wait()
debug.Log("Archiver.Snapshot", "workers terminated")
// receive the top-level tree
@ -651,13 +686,13 @@ func (arch *Archiver) Snapshot(p *Progress, paths []string, parentID *backend.ID
}
// save index
indexID, err := arch.repo.SaveIndex()
err = arch.repo.SaveIndex()
if err != nil {
debug.Log("Archiver.Snapshot", "error saving index: %v", err)
return nil, backend.ID{}, err
}
debug.Log("Archiver.Snapshot", "saved index %v", indexID.Str())
debug.Log("Archiver.Snapshot", "saved indexes")
return sn, id, nil
}
@ -681,7 +716,6 @@ func Scan(dirs []string, filter pipe.SelectFunc, p *Progress) (Stat, error) {
for _, dir := range dirs {
debug.Log("Scan", "Start for %v", dir)
err := filepath.Walk(dir, func(str string, fi os.FileInfo, err error) error {
debug.Log("Scan.Walk", "%v, fi: %v, err: %v", str, fi, err)
// TODO: integrate error reporting
if err != nil {
fmt.Fprintf(os.Stderr, "error for %v: %v\n", str, err)

View file

@ -209,15 +209,17 @@ func BenchmarkLoadTree(t *testing.B) {
list := make([]backend.ID, 0, 10)
done := make(chan struct{})
for blob := range repo.Index().Each(done) {
if blob.Type != pack.Tree {
continue
}
for _, idx := range repo.Index().All() {
for blob := range idx.Each(done) {
if blob.Type != pack.Tree {
continue
}
list = append(list, blob.ID)
if len(list) == cap(list) {
close(done)
break
list = append(list, blob.ID)
if len(list) == cap(list) {
close(done)
break
}
}
}

View file

@ -3,6 +3,7 @@ package checker
import (
"errors"
"fmt"
"os"
"sync"
"github.com/restic/restic"
@ -26,7 +27,7 @@ type Checker struct {
indexes map[backend.ID]*repository.Index
orphanedPacks backend.IDs
masterIndex *repository.Index
masterIndex *repository.MasterIndex
repo *repository.Repository
}
@ -36,7 +37,7 @@ func New(repo *repository.Repository) *Checker {
c := &Checker{
packs: make(map[backend.ID]struct{}),
blobs: make(map[backend.ID]struct{}),
masterIndex: repository.NewIndex(),
masterIndex: repository.NewMasterIndex(),
indexes: make(map[backend.ID]*repository.Index),
repo: repo,
}
@ -58,15 +59,26 @@ func (c *Checker) LoadIndex() error {
indexCh := make(chan indexRes)
worker := func(id string, done <-chan struct{}) error {
worker := func(id backend.ID, done <-chan struct{}) error {
debug.Log("LoadIndex", "worker got index %v", id)
idx, err := repository.LoadIndex(c.repo, id)
idx, err := repository.LoadIndexWithDecoder(c.repo, id.String(), repository.DecodeIndex)
if err == repository.ErrOldIndexFormat {
debug.Log("LoadIndex", "old index format found, converting")
fmt.Fprintf(os.Stderr, "convert index %v to new format\n", id.Str())
id, err = repository.ConvertIndex(c.repo, id)
if err != nil {
return err
}
idx, err = repository.LoadIndexWithDecoder(c.repo, id.String(), repository.DecodeIndex)
}
if err != nil {
return err
}
select {
case indexCh <- indexRes{Index: idx, ID: id}:
case indexCh <- indexRes{Index: idx, ID: id.String()}:
case <-done:
}
@ -77,7 +89,8 @@ func (c *Checker) LoadIndex() error {
go func() {
defer close(indexCh)
debug.Log("LoadIndex", "start loading indexes in parallel")
perr = repository.FilesInParallel(c.repo.Backend(), backend.Index, defaultParallelism, worker)
perr = repository.FilesInParallel(c.repo.Backend(), backend.Index, defaultParallelism,
repository.ParallelWorkFuncParseID(worker))
debug.Log("LoadIndex", "loading indexes finished, error: %v", perr)
}()
@ -92,7 +105,7 @@ func (c *Checker) LoadIndex() error {
}
c.indexes[id] = res.Index
c.masterIndex.Merge(res.Index)
c.masterIndex.Insert(res.Index)
debug.Log("LoadIndex", "process blobs")
cnt := 0

View file

@ -91,8 +91,8 @@ func TestUnreferencedPack(t *testing.T) {
WithTestEnvironment(t, checkerTestData, func(repodir string) {
repo := OpenLocalRepo(t, repodir)
// index 8eb5 only references pack 60e0
indexID := "8eb5b61062bf8e959f244fba0c971108bc8d4d2a4b236f71a704998e28cc5cf6"
// index 3f1a only references pack 60e0
indexID := "3f1abfcb79c6f7d0a3be517d2c83c8562fba64ef2c8e9a3544b4edaf8b5e3b44"
packID := "60e0438dcb978ec6860cc1f8c43da648170ee9129af8f650f876bad19f8f788e"
OK(t, repo.Backend().Remove(backend.Index, indexID))

Binary file not shown.

View file

@ -17,6 +17,8 @@ import (
type CmdDump struct {
global *GlobalOptions
repo *repository.Repository
}
func init() {
@ -71,12 +73,14 @@ func printTrees(repo *repository.Repository, wr io.Writer) error {
trees := []backend.ID{}
for blob := range repo.Index().Each(done) {
if blob.Type != pack.Tree {
continue
}
for _, idx := range repo.Index().All() {
for blob := range idx.Each(nil) {
if blob.Type != pack.Tree {
continue
}
trees = append(trees, blob.ID)
trees = append(trees, blob.ID)
}
}
for _, id := range trees {
@ -94,6 +98,27 @@ func printTrees(repo *repository.Repository, wr io.Writer) error {
return nil
}
func (cmd CmdDump) DumpIndexes() error {
done := make(chan struct{})
defer close(done)
for id := range cmd.repo.List(backend.Index, done) {
fmt.Printf("index_id: %v\n", id)
idx, err := repository.LoadIndex(cmd.repo, id.String())
if err != nil {
return err
}
err = idx.Dump(os.Stdout)
if err != nil {
return err
}
}
return nil
}
func (cmd CmdDump) Execute(args []string) error {
if len(args) != 1 {
return fmt.Errorf("type not specified, Usage: %s", cmd.Usage())
@ -103,6 +128,7 @@ func (cmd CmdDump) Execute(args []string) error {
if err != nil {
return err
}
cmd.repo = repo
lock, err := lockRepo(repo)
defer unlockRepo(lock)
@ -118,8 +144,8 @@ func (cmd CmdDump) Execute(args []string) error {
tpe := args[0]
switch tpe {
case "index":
return repo.Index().Dump(os.Stdout)
case "indexes":
return cmd.DumpIndexes()
case "snapshots":
return printSnapshots(repo, os.Stdout)
case "trees":
@ -138,9 +164,8 @@ func (cmd CmdDump) Execute(args []string) error {
return err
}
fmt.Printf("\nindex:\n")
err = repo.Index().Dump(os.Stdout)
fmt.Printf("\nindexes:\n")
err = cmd.DumpIndexes()
if err != nil {
return err
}

View file

@ -49,8 +49,10 @@ func (cmd CmdList) Execute(args []string) error {
return err
}
for blob := range repo.Index().Each(nil) {
cmd.global.Printf("%s\n", blob.ID)
for _, idx := range repo.Index().All() {
for blob := range idx.Each(nil) {
cmd.global.Printf("%s\n", blob.ID)
}
}
return nil

View file

@ -164,7 +164,7 @@ Data and Tree Blobs, so the outer structure is `IV || Ciphertext || MAC` again.
The plaintext consists of a JSON document like the following:
{
"obsolete": [
"supersedes": [
"ed54ae36197f4745ebc4b54d10e0f623eaaaedd03013eb7ae90df881b7781452"
],
"packs": [
@ -197,10 +197,9 @@ This JSON document lists Packs and the blobs contained therein. In this
example, the Pack `73d04e61` contains two data Blobs and one Tree blob, the
plaintext hashes are listed afterwards.
The field `obsolete` lists the storage IDs of index files that have been
The field `supersedes` lists the storage IDs of index files that have been
replaced with the current index file. This happens when index files are
repacked, this happens for example when old snapshots are removed and Packs are
recombined.
repacked, for example when old snapshots are removed and Packs are recombined.
There may be an arbitrary number of index files, containing information on
non-disjoint sets of Packs. The number of packs described in a single file is

View file

@ -6,6 +6,7 @@ import (
"fmt"
"io"
"sync"
"time"
"github.com/restic/restic/backend"
"github.com/restic/restic/crypto"
@ -17,6 +18,10 @@ import (
type Index struct {
m sync.Mutex
pack map[backend.ID]indexEntry
final bool // set to true for all indexes read from the backend ("finalized")
supersedes backend.IDs
created time.Time
}
type indexEntry struct {
@ -24,68 +29,84 @@ type indexEntry struct {
packID *backend.ID
offset uint
length uint
old bool
}
// NewIndex returns a new index.
func NewIndex() *Index {
return &Index{
pack: make(map[backend.ID]indexEntry),
pack: make(map[backend.ID]indexEntry),
created: time.Now(),
}
}
func (idx *Index) store(t pack.BlobType, id backend.ID, pack *backend.ID, offset, length uint, old bool) {
func (idx *Index) store(t pack.BlobType, id backend.ID, pack *backend.ID, offset, length uint) {
idx.pack[id] = indexEntry{
tpe: t,
packID: pack,
offset: offset,
length: length,
old: old,
}
}
// Final returns true iff the index is already written to the repository, it is
// finalized.
func (idx *Index) Final() bool {
idx.m.Lock()
defer idx.m.Unlock()
return idx.final
}
const (
indexMinBlobs = 20
indexMaxBlobs = 2000
indexMinAge = 2 * time.Minute
indexMaxAge = 15 * time.Minute
)
// Full returns true iff the index is "full enough" to be saved as a preliminary index.
func (idx *Index) Full() bool {
idx.m.Lock()
defer idx.m.Unlock()
debug.Log("Index.Full", "checking whether index %p is full", idx)
packs := len(idx.pack)
age := time.Now().Sub(idx.created)
if age > indexMaxAge {
debug.Log("Index.Full", "index %p is old enough", idx, age)
return true
}
if packs < indexMinBlobs || age < indexMinAge {
debug.Log("Index.Full", "index %p only has %d packs or is too young (%v)", idx, packs, age)
return false
}
if packs > indexMaxBlobs {
debug.Log("Index.Full", "index %p has %d packs", idx, packs)
return true
}
debug.Log("Index.Full", "index %p is not full", idx)
return false
}
// Store remembers the id and pack in the index. An existing entry will be
// silently overwritten.
func (idx *Index) Store(t pack.BlobType, id backend.ID, pack *backend.ID, offset, length uint) {
idx.m.Lock()
defer idx.m.Unlock()
if idx.final {
panic("store new item in finalized index")
}
debug.Log("Index.Store", "pack %v contains id %v (%v), offset %v, length %v",
pack.Str(), id.Str(), t, offset, length)
idx.store(t, id, pack, offset, length, false)
}
// StoreInProgress adds a preliminary index entry for a blob that is about to be
// saved. The entry must be updated using Store once the the blob has been
// written to a pack. Adding an preliminary index fails if there's an existing
// entry associated with the same id.
func (idx *Index) StoreInProgress(t pack.BlobType, id backend.ID) error {
idx.m.Lock()
defer idx.m.Unlock()
if _, hasID := idx.pack[id]; hasID {
errorMsg := fmt.Sprintf("index already contains id %v (%v)", id.Str(), t)
debug.Log("Index.StoreInProgress", errorMsg)
return errors.New(errorMsg)
}
idx.store(t, id, nil, 0, 0, false)
debug.Log("Index.StoreInProgress", "preliminary entry added for id %v (%v)",
id.Str(), t)
return nil
}
// Remove removes the pack ID from the index.
func (idx *Index) Remove(packID backend.ID) {
idx.m.Lock()
defer idx.m.Unlock()
debug.Log("Index.Remove", "id %v removed", packID.Str())
if _, ok := idx.pack[packID]; ok {
delete(idx.pack, packID)
}
idx.store(t, id, pack, offset, length)
}
// Lookup returns the pack for the id.
@ -139,6 +160,11 @@ func (idx *Index) Merge(other *Index) {
debug.Log("Index.Merge", "done merging index")
}
// Supersedes returns the list of indexes this index supersedes, if any.
func (idx *Index) Supersedes() backend.IDs {
return idx.supersedes
}
// PackedBlob is a blob already saved within a pack.
type PackedBlob struct {
pack.Blob
@ -256,30 +282,48 @@ func (idx *Index) generatePackList(selectFn func(indexEntry) bool) ([]*packJSON,
return list, nil
}
// encode writes the JSON serialization of the index filtered by selectFn to enc.
func (idx *Index) encode(w io.Writer, selectFn func(indexEntry) bool) error {
list, err := idx.generatePackList(func(entry indexEntry) bool {
return !entry.old
})
if err != nil {
return err
}
debug.Log("Index.Encode", "done")
enc := json.NewEncoder(w)
return enc.Encode(list)
type jsonIndex struct {
Supersedes backend.IDs `json:"supersedes,omitempty"`
Packs []*packJSON `json:"packs"`
}
// Encode writes the JSON serialization of the index to the writer w. This
// serialization only contains new blobs added via idx.Store(), not old ones
// introduced via DecodeIndex().
type jsonOldIndex []*packJSON
// Encode writes the JSON serialization of the index to the writer w.
func (idx *Index) Encode(w io.Writer) error {
debug.Log("Index.Encode", "encoding index")
idx.m.Lock()
defer idx.m.Unlock()
return idx.encode(w, func(e indexEntry) bool { return !e.old })
return idx.encode(w)
}
// encode writes the JSON serialization of the index to the writer w.
func (idx *Index) encode(w io.Writer) error {
debug.Log("Index.encode", "encoding index")
list, err := idx.generatePackList(nil)
if err != nil {
return err
}
enc := json.NewEncoder(w)
idxJSON := jsonIndex{
Supersedes: idx.supersedes,
Packs: list,
}
return enc.Encode(idxJSON)
}
// Finalize sets the index to final and writes the JSON serialization to w.
func (idx *Index) Finalize(w io.Writer) error {
debug.Log("Index.Encode", "encoding index")
idx.m.Lock()
defer idx.m.Unlock()
idx.final = true
return idx.encode(w)
}
// Dump writes the pretty-printed JSON representation of the index to w.
@ -308,24 +352,139 @@ func (idx *Index) Dump(w io.Writer) error {
return nil
}
// isErrOldIndex returns true if the error may be caused by an old index
// format.
func isErrOldIndex(err error) bool {
if e, ok := err.(*json.UnmarshalTypeError); ok && e.Value == "array" {
return true
}
return false
}
// ErrOldIndexFormat means an index with the old format was detected.
var ErrOldIndexFormat = errors.New("index has old format")
// DecodeIndex loads and unserializes an index from rd.
func DecodeIndex(rd io.Reader) (*Index, error) {
func DecodeIndex(rd io.Reader) (idx *Index, err error) {
debug.Log("Index.DecodeIndex", "Start decoding index")
list := []*packJSON{}
idxJSON := jsonIndex{}
dec := json.NewDecoder(rd)
err := dec.Decode(&list)
err = dec.Decode(&idxJSON)
if err != nil {
debug.Log("Index.DecodeIndex", "Error %v", err)
if isErrOldIndex(err) {
debug.Log("Index.DecodeIndex", "index is probably old format, trying that")
err = ErrOldIndexFormat
}
return nil, err
}
idx := NewIndex()
for _, pack := range list {
idx = NewIndex()
for _, pack := range idxJSON.Packs {
for _, blob := range pack.Blobs {
idx.store(blob.Type, blob.ID, &pack.ID, blob.Offset, blob.Length, true)
idx.store(blob.Type, blob.ID, &pack.ID, blob.Offset, blob.Length)
}
}
idx.supersedes = idxJSON.Supersedes
idx.final = true
debug.Log("Index.DecodeIndex", "done")
return idx, err
}
// DecodeOldIndex loads and unserializes an index in the old format from rd.
func DecodeOldIndex(rd io.Reader) (idx *Index, err error) {
debug.Log("Index.DecodeOldIndex", "Start decoding old index")
list := []*packJSON{}
dec := json.NewDecoder(rd)
err = dec.Decode(&list)
if err != nil {
debug.Log("Index.DecodeOldIndex", "Error %#v", err)
return nil, err
}
idx = NewIndex()
for _, pack := range list {
for _, blob := range pack.Blobs {
idx.store(blob.Type, blob.ID, &pack.ID, blob.Offset, blob.Length)
}
}
debug.Log("Index.DecodeOldIndex", "done")
return idx, err
}
// ConvertIndexes loads all indexes from the repo and converts them to the new
// format (if necessary). When the conversion is succcessful, the old indexes
// are removed.
func ConvertIndexes(repo *Repository) error {
debug.Log("ConvertIndexes", "start")
done := make(chan struct{})
defer close(done)
for id := range repo.List(backend.Index, done) {
debug.Log("ConvertIndexes", "checking index %v", id.Str())
newID, err := ConvertIndex(repo, id)
if err != nil {
debug.Log("ConvertIndexes", "Converting index %v returns error: %v", id.Str(), err)
return err
}
if id != newID {
debug.Log("ConvertIndexes", "index %v converted to new format as %v", id.Str(), newID.Str())
}
}
debug.Log("ConvertIndexes", "done")
return nil
}
// ConvertIndex loads the given index from the repo and converts them to the new
// format (if necessary). When the conversion is succcessful, the old index
// is removed. Returned is either the old id (if no conversion was needed) or
// the new id.
func ConvertIndex(repo *Repository, id backend.ID) (backend.ID, error) {
debug.Log("ConvertIndex", "checking index %v", id.Str())
idx, err := LoadIndexWithDecoder(repo, id.String(), DecodeOldIndex)
if err != nil {
debug.Log("ConvertIndex", "LoadIndexWithDecoder(%v) returned error: %v", id.Str(), err)
return id, err
}
blob, err := repo.CreateEncryptedBlob(backend.Index)
if err != nil {
return id, err
}
idx.supersedes = backend.IDs{id}
err = idx.Encode(blob)
if err != nil {
debug.Log("ConvertIndex", "oldIdx.Encode() returned error: %v", err)
return id, err
}
err = blob.Close()
if err != nil {
debug.Log("ConvertIndex", "blob.Close() returned error: %v", err)
return id, err
}
newID := blob.ID()
debug.Log("ConvertIndex", "index %v converted to new format as %v", id.Str(), newID.Str())
err = repo.be.Remove(backend.Index, id.String())
if err != nil {
debug.Log("ConvertIndex", "backend.Remove(%v) returned error: %v", id.Str(), err)
return id, err
}
return newID, nil
}

View file

@ -4,6 +4,7 @@ import (
"bytes"
"crypto/rand"
"io"
"path/filepath"
"testing"
"github.com/restic/restic/backend"
@ -85,7 +86,7 @@ func TestIndexSerialize(t *testing.T) {
Equals(t, testBlob.length, length)
}
// add more blobs to idx2
// add more blobs to idx
newtests := []testEntry{}
for i := 0; i < 10; i++ {
packID := randomID()
@ -94,7 +95,7 @@ func TestIndexSerialize(t *testing.T) {
for j := 0; j < 10; j++ {
id := randomID()
length := uint(i*100 + j)
idx2.Store(pack.Data, id, &packID, pos, length)
idx.Store(pack.Data, id, &packID, pos, length)
newtests = append(newtests, testEntry{
id: id,
@ -108,22 +109,20 @@ func TestIndexSerialize(t *testing.T) {
}
}
// serialize idx2, unserialize to idx3
// serialize idx, unserialize to idx3
wr3 := bytes.NewBuffer(nil)
err = idx2.Encode(wr3)
err = idx.Finalize(wr3)
OK(t, err)
Assert(t, idx.Final(),
"index not final after encoding")
idx3, err := repository.DecodeIndex(wr3)
OK(t, err)
Assert(t, idx3 != nil,
"nil returned for decoded index")
// all old blobs must not be present in the index
for _, testBlob := range tests {
_, _, _, _, err := idx3.Lookup(testBlob.id)
Assert(t, err != nil,
"found old id %v in serialized index", testBlob.id.Str())
}
Assert(t, idx3.Final(),
"decoded index is not final")
// all new blobs must be in the index
for _, testBlob := range newtests {
@ -165,26 +164,58 @@ func TestIndexSize(t *testing.T) {
// example index serialization from doc/Design.md
var docExample = []byte(`
{
"supersedes": [
"ed54ae36197f4745ebc4b54d10e0f623eaaaedd03013eb7ae90df881b7781452"
],
"packs": [
{
"id": "73d04e6125cf3c28a299cc2f3cca3b78ceac396e4fcf9575e34536b26782413c",
"blobs": [
{
"id": "3ec79977ef0cf5de7b08cd12b874cd0f62bbaf7f07f3497a5b1bbcc8cb39b1ce",
"type": "data",
"offset": 0,
"length": 25
},{
"id": "9ccb846e60d90d4eb915848add7aa7ea1e4bbabfc60e573db9f7bfb2789afbae",
"type": "tree",
"offset": 38,
"length": 100
},
{
"id": "d3dc577b4ffd38cc4b32122cabf8655a0223ed22edfd93b353dc0c3f2b0fdf66",
"type": "data",
"offset": 150,
"length": 123
}
]
}
]
}
`)
var docOldExample = []byte(`
[ {
"id": "73d04e6125cf3c28a299cc2f3cca3b78ceac396e4fcf9575e34536b26782413c",
"blobs": [
{
"id": "3ec79977ef0cf5de7b08cd12b874cd0f62bbaf7f07f3497a5b1bbcc8cb39b1ce",
"type": "data",
"offset": 0,
"length": 25
},{
"id": "9ccb846e60d90d4eb915848add7aa7ea1e4bbabfc60e573db9f7bfb2789afbae",
"type": "tree",
"offset": 38,
"length": 100
},
{
"id": "d3dc577b4ffd38cc4b32122cabf8655a0223ed22edfd93b353dc0c3f2b0fdf66",
"type": "data",
"offset": 150,
"length": 123
}
{
"id": "3ec79977ef0cf5de7b08cd12b874cd0f62bbaf7f07f3497a5b1bbcc8cb39b1ce",
"type": "data",
"offset": 0,
"length": 25
},{
"id": "9ccb846e60d90d4eb915848add7aa7ea1e4bbabfc60e573db9f7bfb2789afbae",
"type": "tree",
"offset": 38,
"length": 100
},
{
"id": "d3dc577b4ffd38cc4b32122cabf8655a0223ed22edfd93b353dc0c3f2b0fdf66",
"type": "data",
"offset": 150,
"length": 123
}
]
} ]
`)
@ -210,6 +241,8 @@ var exampleTests = []struct {
}
func TestIndexUnserialize(t *testing.T) {
oldIdx := backend.IDs{ParseID("ed54ae36197f4745ebc4b54d10e0f623eaaaedd03013eb7ae90df881b7781452")}
idx, err := repository.DecodeIndex(bytes.NewReader(docExample))
OK(t, err)
@ -222,69 +255,73 @@ func TestIndexUnserialize(t *testing.T) {
Equals(t, test.offset, offset)
Equals(t, test.length, length)
}
Equals(t, oldIdx, idx.Supersedes())
}
func TestStoreOverwritesPreliminaryEntry(t *testing.T) {
idx := repository.NewIndex()
blobID := randomID()
dataType := pack.Data
idx.StoreInProgress(dataType, blobID)
packID := randomID()
offset := uint(0)
length := uint(100)
idx.Store(dataType, blobID, &packID, offset, length)
actPackID, actType, actOffset, actLength, err := idx.Lookup(blobID)
OK(t, err)
Equals(t, packID, *actPackID)
Equals(t, dataType, actType)
Equals(t, offset, actOffset)
Equals(t, length, actLength)
}
func TestStoreInProgressAddsPreliminaryEntry(t *testing.T) {
idx := repository.NewIndex()
blobID := randomID()
dataType := pack.Data
err := idx.StoreInProgress(dataType, blobID)
func TestIndexUnserializeOld(t *testing.T) {
idx, err := repository.DecodeOldIndex(bytes.NewReader(docOldExample))
OK(t, err)
actPackID, actType, actOffset, actLength, err := idx.Lookup(blobID)
OK(t, err)
Assert(t, actPackID == nil,
"Preliminary index entry illegaly associated with a pack id.")
Equals(t, uint(0), actOffset)
Equals(t, uint(0), actLength)
Equals(t, dataType, actType)
for _, test := range exampleTests {
packID, tpe, offset, length, err := idx.Lookup(test.id)
OK(t, err)
Equals(t, test.packID, *packID)
Equals(t, test.tpe, tpe)
Equals(t, test.offset, offset)
Equals(t, test.length, length)
}
Equals(t, 0, len(idx.Supersedes()))
}
func TestStoreInProgressRefusesToOverwriteExistingFinalEntry(t *testing.T) {
idx := repository.NewIndex()
var oldIndexTestRepo = filepath.Join("testdata", "old-index-repo.tar.gz")
blobID := randomID()
dataType := pack.Data
packID := randomID()
offset := uint(0)
length := uint(100)
idx.Store(dataType, blobID, &packID, offset, length)
func TestConvertIndex(t *testing.T) {
WithTestEnvironment(t, oldIndexTestRepo, func(repodir string) {
repo := OpenLocalRepo(t, repodir)
err := idx.StoreInProgress(dataType, blobID)
Assert(t, err != nil,
"index.StoreInProgress did not refuse to overwrite existing entry")
}
func TestStoreInProgressRefusesToOverwriteExistingPreliminaryEntry(t *testing.T) {
idx := repository.NewIndex()
blobID := randomID()
dataType := pack.Data
_ = idx.StoreInProgress(dataType, blobID)
err := idx.StoreInProgress(dataType, blobID)
Assert(t, err != nil,
"index.StoreInProgress did not refuse to overwrite existing entry")
old := make(map[backend.ID]*repository.Index)
for id := range repo.List(backend.Index, nil) {
idx, err := repository.LoadIndex(repo, id.String())
OK(t, err)
old[id] = idx
}
OK(t, repository.ConvertIndexes(repo))
for id := range repo.List(backend.Index, nil) {
idx, err := repository.LoadIndexWithDecoder(repo, id.String(), repository.DecodeIndex)
OK(t, err)
Assert(t, len(idx.Supersedes()) == 1,
"Expected index %v to supersed exactly one index, got %v", id, idx.Supersedes())
oldIndexID := idx.Supersedes()[0]
oldIndex, ok := old[oldIndexID]
Assert(t, ok,
"Index %v superseds %v, but that wasn't found in the old index map", id.Str(), oldIndexID.Str())
Assert(t, idx.Count(pack.Data) == oldIndex.Count(pack.Data),
"Index %v count blobs %v: %v != %v", id.Str(), pack.Data, idx.Count(pack.Data), oldIndex.Count(pack.Data))
Assert(t, idx.Count(pack.Tree) == oldIndex.Count(pack.Tree),
"Index %v count blobs %v: %v != %v", id.Str(), pack.Tree, idx.Count(pack.Tree), oldIndex.Count(pack.Tree))
for packedBlob := range idx.Each(nil) {
packID, tpe, offset, length, err := oldIndex.Lookup(packedBlob.ID)
OK(t, err)
Assert(t, *packID == packedBlob.PackID,
"Check blob %v: pack ID %v != %v", packedBlob.ID, packID, packedBlob.PackID)
Assert(t, tpe == packedBlob.Type,
"Check blob %v: Type %v != %v", packedBlob.ID, tpe, packedBlob.Type)
Assert(t, offset == packedBlob.Offset,
"Check blob %v: Type %v != %v", packedBlob.ID, offset, packedBlob.Offset)
Assert(t, length == packedBlob.Length,
"Check blob %v: Type %v != %v", packedBlob.ID, length, packedBlob.Length)
}
}
})
}

227
repository/master_index.go Normal file
View file

@ -0,0 +1,227 @@
package repository
import (
"fmt"
"sync"
"github.com/restic/restic/backend"
"github.com/restic/restic/debug"
"github.com/restic/restic/pack"
)
// MasterIndex is a collection of indexes and IDs of chunks that are in the process of being saved.
type MasterIndex struct {
idx []*Index
idxMutex sync.RWMutex
inFlight struct {
backend.IDSet
sync.RWMutex
}
}
// NewMasterIndex creates a new master index.
func NewMasterIndex() *MasterIndex {
return &MasterIndex{
inFlight: struct {
backend.IDSet
sync.RWMutex
}{
IDSet: backend.NewIDSet(),
},
}
}
// Lookup queries all known Indexes for the ID and returns the first match.
func (mi *MasterIndex) Lookup(id backend.ID) (packID *backend.ID, tpe pack.BlobType, offset, length uint, err error) {
mi.idxMutex.RLock()
defer mi.idxMutex.RUnlock()
debug.Log("MasterIndex.Lookup", "looking up id %v", id.Str())
for _, idx := range mi.idx {
packID, tpe, offset, length, err = idx.Lookup(id)
if err == nil {
debug.Log("MasterIndex.Lookup",
"found id %v in pack %v at offset %d with length %d",
id.Str(), packID.Str(), offset, length)
return
}
}
debug.Log("MasterIndex.Lookup", "id %v not found in any index", id.Str())
return nil, pack.Data, 0, 0, fmt.Errorf("id %v not found in any index", id)
}
// LookupSize queries all known Indexes for the ID and returns the first match.
func (mi *MasterIndex) LookupSize(id backend.ID) (uint, error) {
mi.idxMutex.RLock()
defer mi.idxMutex.RUnlock()
for _, idx := range mi.idx {
length, err := idx.LookupSize(id)
if err == nil {
return length, nil
}
}
return 0, fmt.Errorf("id %v not found in any index", id)
}
// Has queries all known Indexes for the ID and returns the first match.
func (mi *MasterIndex) Has(id backend.ID) bool {
mi.idxMutex.RLock()
defer mi.idxMutex.RUnlock()
for _, idx := range mi.idx {
if idx.Has(id) {
return true
}
}
return false
}
// Count returns the number of blobs of type t in the index.
func (mi *MasterIndex) Count(t pack.BlobType) (n uint) {
mi.idxMutex.RLock()
defer mi.idxMutex.RUnlock()
var sum uint
for _, idx := range mi.idx {
sum += idx.Count(t)
}
return sum
}
// Insert adds a new index to the MasterIndex.
func (mi *MasterIndex) Insert(idx *Index) {
mi.idxMutex.Lock()
defer mi.idxMutex.Unlock()
mi.idx = append(mi.idx, idx)
}
// Remove deletes an index from the MasterIndex.
func (mi *MasterIndex) Remove(index *Index) {
mi.idxMutex.Lock()
defer mi.idxMutex.Unlock()
for i, idx := range mi.idx {
if idx == index {
mi.idx = append(mi.idx[:i], mi.idx[i+1:]...)
return
}
}
}
// Current returns an index that is not yet finalized, so that new entries can
// still be added. If all indexes are finalized, a new index is created and
// returned.
func (mi *MasterIndex) Current() *Index {
mi.idxMutex.RLock()
for _, idx := range mi.idx {
if !idx.Final() {
mi.idxMutex.RUnlock()
return idx
}
}
mi.idxMutex.RUnlock()
mi.idxMutex.Lock()
defer mi.idxMutex.Unlock()
newIdx := NewIndex()
mi.idx = append(mi.idx, newIdx)
return newIdx
}
// AddInFlight add the given ID to the list of in-flight IDs. An error is
// returned when the ID is already in the list.
func (mi *MasterIndex) AddInFlight(id backend.ID) error {
mi.inFlight.Lock()
defer mi.inFlight.Unlock()
debug.Log("MasterIndex.AddInFlight", "adding %v", id)
if mi.inFlight.Has(id) {
return fmt.Errorf("%v is already in flight", id)
}
mi.inFlight.Insert(id)
return nil
}
// IsInFlight returns true iff the id is contained in the list of in-flight IDs.
func (mi *MasterIndex) IsInFlight(id backend.ID) bool {
mi.inFlight.RLock()
defer mi.inFlight.RUnlock()
inFlight := mi.inFlight.Has(id)
debug.Log("MasterIndex.IsInFlight", "testing whether %v is in flight: %v", id.Str(), inFlight)
return inFlight
}
// RemoveFromInFlight deletes the given ID from the liste of in-flight IDs.
func (mi *MasterIndex) RemoveFromInFlight(id backend.ID) {
mi.inFlight.Lock()
defer mi.inFlight.Unlock()
debug.Log("MasterIndex.RemoveFromInFlight", "removing %v from list of in flight blobs", id.Str())
mi.inFlight.Delete(id)
}
// NotFinalIndexes returns all indexes that have not yet been saved.
func (mi *MasterIndex) NotFinalIndexes() []*Index {
mi.idxMutex.Lock()
defer mi.idxMutex.Unlock()
var list []*Index
for _, idx := range mi.idx {
if !idx.Final() {
list = append(list, idx)
}
}
debug.Log("MasterIndex.NotFinalIndexes", "return %d indexes", len(list))
return list
}
// FullIndexes returns all indexes that are full.
func (mi *MasterIndex) FullIndexes() []*Index {
mi.idxMutex.Lock()
defer mi.idxMutex.Unlock()
var list []*Index
debug.Log("MasterIndex.FullIndexes", "checking %d indexes", len(mi.idx))
for _, idx := range mi.idx {
if idx.Final() {
debug.Log("MasterIndex.FullIndexes", "index %p is final", idx)
continue
}
if idx.Full() {
debug.Log("MasterIndex.FullIndexes", "index %p is full", idx)
list = append(list, idx)
} else {
debug.Log("MasterIndex.FullIndexes", "index %p not full", idx)
}
}
debug.Log("MasterIndex.FullIndexes", "return %d indexes", len(list))
return list
}
// All returns all indexes.
func (mi *MasterIndex) All() []*Index {
mi.idxMutex.Lock()
defer mi.idxMutex.Unlock()
return mi.idx
}

View file

@ -8,6 +8,7 @@ import (
"fmt"
"io"
"io/ioutil"
"os"
"sync"
"github.com/restic/chunker"
@ -23,7 +24,7 @@ type Repository struct {
Config Config
key *crypto.Key
keyName string
idx *Index
idx *MasterIndex
pm sync.Mutex
packs []*pack.Packer
@ -33,7 +34,7 @@ type Repository struct {
func New(be backend.Backend) *Repository {
return &Repository{
be: be,
idx: NewIndex(),
idx: NewMasterIndex(),
}
}
@ -203,7 +204,7 @@ func (r *Repository) LoadJSONPack(t pack.BlobType, id backend.ID, item interface
// LookupBlobSize returns the size of blob id.
func (r *Repository) LookupBlobSize(id backend.ID) (uint, error) {
return r.Index().LookupSize(id)
return r.idx.LookupSize(id)
}
const minPackSize = 4 * chunker.MiB
@ -234,7 +235,7 @@ func (r *Repository) findPacker(size uint) (*pack.Packer, error) {
if err != nil {
return nil, err
}
debug.Log("Repo.findPacker", "create new pack %p", blob)
debug.Log("Repo.findPacker", "create new pack %p for %d bytes", blob, size)
return pack.NewPacker(r.key, blob), nil
}
@ -268,7 +269,8 @@ func (r *Repository) savePacker(p *pack.Packer) error {
// update blobs in the index
for _, b := range p.Blobs() {
debug.Log("Repo.savePacker", " updating blob %v to pack %v", b.ID.Str(), sid.Str())
r.idx.Store(b.Type, b.ID, &sid, b.Offset, uint(b.Length))
r.idx.Current().Store(b.Type, b.ID, &sid, b.Offset, uint(b.Length))
r.idx.RemoveFromInFlight(b.ID)
}
return nil
@ -303,16 +305,9 @@ func (r *Repository) SaveAndEncrypt(t pack.BlobType, data []byte, id *backend.ID
return backend.ID{}, err
}
// add this id to the index, although we don't know yet in which pack it
// will be saved; the entry will be updated when the pack is written.
// Note: the current id needs to be added to the index before searching
// for a suitable packer: There's a little chance that more than one
// goroutine handles the same blob concurrently. Due to idx.StoreInProgress
// locking the index and raising an error if a matching index entry
// already exists, updating the index first ensures that only one of
// those goroutines will continue. See issue restic#292.
debug.Log("Repo.Save", "saving stub for %v (%v) in index", id.Str, t)
err = r.idx.StoreInProgress(t, *id)
// add this id to the list of in-flight chunk ids.
debug.Log("Repo.Save", "add %v to list of in-flight IDs", id.Str())
err = r.idx.AddInFlight(*id)
if err != nil {
debug.Log("Repo.Save", "another goroutine is already working on %v (%v) does already exist", id.Str, t)
return *id, nil
@ -321,12 +316,15 @@ func (r *Repository) SaveAndEncrypt(t pack.BlobType, data []byte, id *backend.ID
// find suitable packer and add blob
packer, err := r.findPacker(uint(len(ciphertext)))
if err != nil {
r.idx.Remove(*id)
r.idx.RemoveFromInFlight(*id)
return backend.ID{}, err
}
// save ciphertext
packer.Add(t, *id, bytes.NewReader(ciphertext))
_, err = packer.Add(t, *id, bytes.NewReader(ciphertext))
if err != nil {
return backend.ID{}, err
}
// if the pack is not full enough and there are less than maxPackers
// packers, put back to the list
@ -445,63 +443,112 @@ func (r *Repository) Backend() backend.Backend {
return r.be
}
// Index returns the currently loaded Index.
func (r *Repository) Index() *Index {
// Index returns the currently used MasterIndex.
func (r *Repository) Index() *MasterIndex {
return r.idx
}
// SetIndex instructs the repository to use the given index.
func (r *Repository) SetIndex(i *Index) {
func (r *Repository) SetIndex(i *MasterIndex) {
r.idx = i
}
// SaveIndex saves all new packs in the index in the backend, returned is the
// storage ID.
func (r *Repository) SaveIndex() (backend.ID, error) {
debug.Log("Repo.SaveIndex", "Saving index")
// BlobWriter encrypts and saves the data written to it in a backend. After
// Close() was called, ID() returns the backend.ID.
type BlobWriter struct {
id backend.ID
blob backend.Blob
hw *backend.HashingWriter
ewr io.WriteCloser
t backend.Type
closed bool
}
// create blob
// CreateEncryptedBlob returns a BlobWriter that encrypts and saves the data
// written to it in the backend. After Close() was called, ID() returns the
// backend.ID.
func (r *Repository) CreateEncryptedBlob(t backend.Type) (*BlobWriter, error) {
blob, err := r.be.Create()
if err != nil {
return backend.ID{}, err
return nil, err
}
debug.Log("Repo.SaveIndex", "create new pack %p", blob)
// hash
hw := backend.NewHashingWriter(blob, sha256.New())
// encrypt blob
ewr := crypto.EncryptTo(r.key, hw)
err = r.idx.Encode(ewr)
return &BlobWriter{t: t, blob: blob, hw: hw, ewr: ewr}, nil
}
func (bw *BlobWriter) Write(buf []byte) (int, error) {
return bw.ewr.Write(buf)
}
// Close finalizes the blob in the backend, afterwards ID() can be used to retrieve the ID.
func (bw *BlobWriter) Close() error {
if bw.closed {
return errors.New("BlobWriter already closed")
}
bw.closed = true
err := bw.ewr.Close()
if err != nil {
return backend.ID{}, err
return err
}
err = ewr.Close()
if err != nil {
return backend.ID{}, err
copy(bw.id[:], bw.hw.Sum(nil))
return bw.blob.Finalize(bw.t, bw.id.String())
}
// ID returns the Id the blob has been written to after Close() was called.
func (bw *BlobWriter) ID() backend.ID {
return bw.id
}
// saveIndex saves all indexes in the backend.
func (r *Repository) saveIndex(indexes ...*Index) error {
for i, idx := range indexes {
debug.Log("Repo.SaveIndex", "Saving index %d", i)
blob, err := r.CreateEncryptedBlob(backend.Index)
if err != nil {
return err
}
err = idx.Encode(blob)
if err != nil {
return err
}
err = blob.Close()
if err != nil {
return err
}
sid := blob.ID()
debug.Log("Repo.SaveIndex", "Saved index %d as %v", i, sid.Str())
}
// finalize blob in the backend
sid := backend.ID{}
copy(sid[:], hw.Sum(nil))
return nil
}
err = blob.Finalize(backend.Index, sid.String())
if err != nil {
return backend.ID{}, err
}
// SaveIndex saves all new indexes in the backend.
func (r *Repository) SaveIndex() error {
return r.saveIndex(r.idx.NotFinalIndexes()...)
}
debug.Log("Repo.SaveIndex", "Saved index as %v", sid.Str())
return sid, nil
// SaveFullIndex saves all full indexes in the backend.
func (r *Repository) SaveFullIndex() error {
return r.saveIndex(r.idx.FullIndexes()...)
}
const loadIndexParallelism = 20
// LoadIndex loads all index files from the backend in parallel and merges them
// with the current index. The first error that occurred is returned.
// LoadIndex loads all index files from the backend in parallel and stores them
// in the master index. The first error that occurred is returned.
func (r *Repository) LoadIndex() error {
debug.Log("Repo.LoadIndex", "Loading index")
@ -528,7 +575,7 @@ func (r *Repository) LoadIndex() error {
}()
for idx := range indexes {
r.idx.Merge(idx)
r.idx.Insert(idx)
}
if err := <-errCh; err != nil {
@ -540,24 +587,72 @@ func (r *Repository) LoadIndex() error {
// LoadIndex loads the index id from backend and returns it.
func LoadIndex(repo *Repository, id string) (*Index, error) {
debug.Log("LoadIndex", "Loading index %v", id[:8])
idx, err := LoadIndexWithDecoder(repo, id, DecodeIndex)
if err == nil {
return idx, nil
}
rd, err := repo.be.Get(backend.Index, id)
if err == ErrOldIndexFormat {
fmt.Fprintf(os.Stderr, "index %v has old format\n", id[:10])
return LoadIndexWithDecoder(repo, id, DecodeOldIndex)
}
return nil, err
}
// decryptReadCloser couples an underlying reader with a DecryptReader and
// implements io.ReadCloser. On Close(), both readers are closed.
type decryptReadCloser struct {
r io.ReadCloser
dr io.ReadCloser
}
func newDecryptReadCloser(key *crypto.Key, rd io.ReadCloser) (io.ReadCloser, error) {
dr, err := crypto.DecryptFrom(key, rd)
if err != nil {
return nil, err
}
return &decryptReadCloser{r: rd, dr: dr}, nil
}
func (dr *decryptReadCloser) Read(buf []byte) (int, error) {
return dr.dr.Read(buf)
}
func (dr *decryptReadCloser) Close() error {
err := dr.dr.Close()
if err != nil {
return err
}
return dr.r.Close()
}
// GetDecryptReader opens the file id stored in the backend and returns a
// reader that yields the decrypted content. The reader must be closed.
func (r *Repository) GetDecryptReader(t backend.Type, id string) (io.ReadCloser, error) {
rd, err := r.be.Get(t, id)
if err != nil {
return nil, err
}
return newDecryptReadCloser(r.key, rd)
}
// LoadIndexWithDecoder loads the index and decodes it with fn.
func LoadIndexWithDecoder(repo *Repository, id string, fn func(io.Reader) (*Index, error)) (*Index, error) {
debug.Log("LoadIndexWithDecoder", "Loading index %v", id[:8])
rd, err := repo.GetDecryptReader(backend.Index, id)
if err != nil {
return nil, err
}
defer rd.Close()
if err != nil {
return nil, err
}
// decrypt
decryptRd, err := crypto.DecryptFrom(repo.key, rd)
defer decryptRd.Close()
idx, err := fn(rd)
if err != nil {
return nil, err
}
idx, err := DecodeIndex(decryptRd)
if err != nil {
debug.Log("LoadIndex", "error while decoding index %v: %v", id, err)
debug.Log("LoadIndexWithDecoder", "error while decoding index %v: %v", id, err)
return nil, err
}

View file

@ -88,9 +88,11 @@ func TestSave(t *testing.T) {
Equals(t, id, sid)
OK(t, repo.Flush())
// OK(t, repo.SaveIndex())
// read back
buf, err := repo.LoadBlob(pack.Data, id, make([]byte, size))
OK(t, err)
Assert(t, len(buf) == len(data),
"number of bytes read back does not match: expected %d, got %d",
@ -121,6 +123,7 @@ func TestSaveFrom(t *testing.T) {
// read back
buf, err := repo.LoadBlob(pack.Data, id, make([]byte, size))
OK(t, err)
Assert(t, len(buf) == len(data),
"number of bytes read back does not match: expected %d, got %d",
@ -199,7 +202,7 @@ func TestLoadJSONUnpacked(t *testing.T) {
var repoFixture = filepath.Join("testdata", "test-repo.tar.gz")
func TestLoadIndex(t *testing.T) {
func TestRepositoryLoadIndex(t *testing.T) {
WithTestEnvironment(t, repoFixture, func(repodir string) {
repo := OpenLocalRepo(t, repodir)
OK(t, repo.LoadIndex())
@ -212,7 +215,7 @@ func BenchmarkLoadIndex(b *testing.B) {
b.ResetTimer()
for i := 0; i < b.N; i++ {
repo.SetIndex(repository.NewIndex())
repo.SetIndex(repository.NewMasterIndex())
OK(b, repo.LoadIndex())
}
})

Binary file not shown.