Merge pull request #252 from restic/repack-blobs

WIP: Repack blobs

Commit acba82c8f7 by Alexander Neumann, 2015-11-09 20:57:57 +01:00
15 changed files with 703 additions and 152 deletions

backend/idset.go

@@ -1,5 +1,7 @@
 package backend
 
+import "sort"
+
 // IDSet is a set of IDs.
 type IDSet map[ID]struct{}
@@ -36,6 +38,8 @@ func (s IDSet) List() IDs {
 		list = append(list, id)
 	}
 
+	sort.Sort(list)
+
 	return list
 }
@@ -66,5 +70,5 @@ func (s IDSet) String() string {
 		return "{}"
 	}
 
-	return "{" + str[1:len(str)-2] + "}"
+	return "{" + str[1:len(str)-1] + "}"
 }

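Aside: the new sort import makes IDSet.List() return IDs in a deterministic order, which keeps set output stable in tests and log messages. A minimal sketch of the same pattern, using plain strings instead of restic's backend.ID (the StringSet type is hypothetical, not restic API):

    package main

    import (
    	"fmt"
    	"sort"
    )

    // StringSet mirrors backend.IDSet's map-based set pattern.
    type StringSet map[string]struct{}

    // List returns the members in sorted, reproducible order.
    func (s StringSet) List() []string {
    	list := make([]string, 0, len(s))
    	for k := range s {
    		list = append(list, k)
    	}
    	sort.Strings(list)
    	return list
    }

    func main() {
    	s := StringSet{"c": {}, "a": {}, "b": {}}
    	fmt.Println(s.List()) // [a b c] on every run
    }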
checker/checker.go

@@ -3,7 +3,6 @@ package checker
 import (
 	"errors"
 	"fmt"
-	"os"
 	"sync"
 
 	"github.com/restic/restic"
@@ -59,6 +58,16 @@ func (e ErrDuplicatePacks) Error() string {
 	return fmt.Sprintf("pack %v contained in several indexes: %v", e.PackID.Str(), e.Indexes)
 }
 
+// ErrOldIndexFormat is returned when an index with the old format is
+// found.
+type ErrOldIndexFormat struct {
+	backend.ID
+}
+
+func (err ErrOldIndexFormat) Error() string {
+	return fmt.Sprintf("index %v has old format", err.ID.Str())
+}
+
 // LoadIndex loads all index files.
 func (c *Checker) LoadIndex() (hints []error, errs []error) {
 	debug.Log("LoadIndex", "Start")
@@ -73,14 +82,10 @@ func (c *Checker) LoadIndex() (hints []error, errs []error) {
 		debug.Log("LoadIndex", "worker got index %v", id)
 		idx, err := repository.LoadIndexWithDecoder(c.repo, id.String(), repository.DecodeIndex)
 		if err == repository.ErrOldIndexFormat {
-			debug.Log("LoadIndex", "old index format found, converting")
-			fmt.Fprintf(os.Stderr, "convert index %v to new format\n", id.Str())
-			id, err = repository.ConvertIndex(c.repo, id)
-			if err != nil {
-				return err
-			}
-
-			idx, err = repository.LoadIndexWithDecoder(c.repo, id.String(), repository.DecodeIndex)
+			debug.Log("LoadIndex", "index %v has old format", id.Str())
+			hints = append(hints, ErrOldIndexFormat{id})
+
+			idx, err = repository.LoadIndexWithDecoder(c.repo, id.String(), repository.DecodeOldIndex)
 		}
 
 		if err != nil {
@@ -617,7 +622,7 @@ func (c *Checker) UnusedBlobs() (blobs backend.IDs) {
 	debug.Log("Checker.UnusedBlobs", "checking %d blobs", len(c.blobs))
 
 	for id := range c.blobs {
 		if c.blobRefs.M[id] == 0 {
-			debug.Log("Checker.UnusedBlobs", "blob %v not not referenced", id.Str())
+			debug.Log("Checker.UnusedBlobs", "blob %v not referenced", id.Str())
 			blobs = append(blobs, id)
 		}
 	}

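Aside: instead of converting old-format indexes on the fly (and writing to the repository during a check), LoadIndex now records each old index as a hint and falls back to the old decoder. A hedged sketch of how a caller might surface those hints, assuming only the checker API shown above:

    package example

    import (
    	"fmt"

    	"github.com/restic/restic/checker"
    )

    // reportOldIndexes prints a note for every old-format index that
    // LoadIndex reported as a hint rather than a hard error.
    func reportOldIndexes(chkr *checker.Checker) {
    	hints, errs := chkr.LoadIndex()
    	for _, hint := range hints {
    		if e, ok := hint.(checker.ErrOldIndexFormat); ok {
    			fmt.Printf("index %v has the old format\n", e.ID.Str())
    		}
    	}
    	for _, err := range errs {
    		fmt.Printf("error: %v\n", err)
    	}
    }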
checker/repacker.go (new file, 163 lines)

package checker

import (
	"errors"

	"github.com/restic/restic/backend"
	"github.com/restic/restic/debug"
	"github.com/restic/restic/repository"
)

// Repacker extracts still used blobs from packs with unused blobs and creates
// new packs.
type Repacker struct {
	unusedBlobs backend.IDSet
	repo        *repository.Repository
}

// NewRepacker returns a new repacker that (when Repack() is run) cleans up the
// repository and creates new packs and indexes so that none of the blobs in
// unusedBlobs are used any more.
func NewRepacker(repo *repository.Repository, unusedBlobs backend.IDSet) *Repacker {
	return &Repacker{
		repo:        repo,
		unusedBlobs: unusedBlobs,
	}
}

// Repack runs the process of finding still used blobs in packs with unused
// blobs, extracts them and creates new packs with just the still-in-use blobs.
func (r *Repacker) Repack() error {
	debug.Log("Repacker.Repack", "searching packs for %v", r.unusedBlobs)

	unneededPacks, err := FindPacksForBlobs(r.repo, r.unusedBlobs)
	if err != nil {
		return err
	}

	debug.Log("Repacker.Repack", "found packs: %v", unneededPacks)

	blobs, err := FindBlobsForPacks(r.repo, unneededPacks)
	if err != nil {
		return err
	}

	debug.Log("Repacker.Repack", "found blobs: %v", blobs)

	for id := range r.unusedBlobs {
		debug.Log("Repacker.Repack", "remove unused blob %v", id.Str())
		blobs.Delete(id)
	}

	debug.Log("Repacker.Repack", "need to repack blobs: %v", blobs)

	err = RepackBlobs(r.repo, r.repo, blobs)
	if err != nil {
		return err
	}

	debug.Log("Repacker.Repack", "remove unneeded packs: %v", unneededPacks)
	for packID := range unneededPacks {
		err = r.repo.Backend().Remove(backend.Data, packID.String())
		if err != nil {
			return err
		}
	}

	debug.Log("Repacker.Repack", "rebuild index, unneeded packs: %v", unneededPacks)
	idx, err := r.repo.Index().RebuildIndex(unneededPacks)
	if err != nil {
		return err
	}

	newIndexID, err := repository.SaveIndex(r.repo, idx)
	debug.Log("Repacker.Repack", "saved new index at %v, err %v", newIndexID.Str(), err)
	if err != nil {
		return err
	}

	debug.Log("Repacker.Repack", "remove old indexes: %v", idx.Supersedes())
	for _, id := range idx.Supersedes() {
		err = r.repo.Backend().Remove(backend.Index, id.String())
		if err != nil {
			debug.Log("Repacker.Repack", "error removing index %v: %v", id.Str(), err)
			return err
		}

		debug.Log("Repacker.Repack", "removed index %v", id.Str())
	}

	return nil
}

// FindPacksForBlobs returns the set of packs that contain the blobs.
func FindPacksForBlobs(repo *repository.Repository, blobs backend.IDSet) (backend.IDSet, error) {
	packs := backend.NewIDSet()
	idx := repo.Index()

	for id := range blobs {
		blob, err := idx.Lookup(id)
		if err != nil {
			return nil, err
		}

		packs.Insert(blob.PackID)
	}

	return packs, nil
}

// FindBlobsForPacks returns the set of blobs contained in the given set of
// packs.
func FindBlobsForPacks(repo *repository.Repository, packs backend.IDSet) (backend.IDSet, error) {
	blobs := backend.NewIDSet()

	for packID := range packs {
		for _, packedBlob := range repo.Index().ListPack(packID) {
			blobs.Insert(packedBlob.ID)
		}
	}

	return blobs, nil
}

// repackBlob loads a single blob from src and saves it in dst.
func repackBlob(src, dst *repository.Repository, id backend.ID) error {
	blob, err := src.Index().Lookup(id)
	if err != nil {
		return err
	}

	debug.Log("RepackBlobs", "repacking blob %v, len %v", id.Str(), blob.PlaintextLength())

	buf := make([]byte, 0, blob.PlaintextLength())
	buf, err = src.LoadBlob(blob.Type, id, buf)
	if err != nil {
		return err
	}

	if uint(len(buf)) != blob.PlaintextLength() {
		debug.Log("RepackBlobs", "repack blob %v: len(buf) isn't equal to length: %v = %v", id.Str(), len(buf), blob.PlaintextLength())
		return errors.New("LoadBlob returned wrong data, len() doesn't match")
	}

	_, err = dst.SaveAndEncrypt(blob.Type, buf, &id)
	if err != nil {
		return err
	}

	return nil
}

// RepackBlobs reads all blobs in blobIDs from src and saves them into new pack
// files in dst. Source and destination repo may be the same.
func RepackBlobs(src, dst *repository.Repository, blobIDs backend.IDSet) (err error) {
	for id := range blobIDs {
		err = repackBlob(src, dst, id)
		if err != nil {
			return err
		}
	}

	err = dst.Flush()
	if err != nil {
		return err
	}

	return nil
}

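Aside: the repacking flow composes the three exported helpers: find the packs holding unused blobs, list every blob in those packs, subtract the unused ones, and rewrite the survivors. A hedged usage sketch mirroring what Repacker.Repack does internally (the dropBlobs function is illustrative only; in practice NewRepacker/Repack also rebuilds the index and deletes the old packs):

    package example

    import (
    	"github.com/restic/restic/backend"
    	"github.com/restic/restic/checker"
    	"github.com/restic/restic/repository"
    )

    // dropBlobs repacks every pack that contains one of the unused blobs,
    // keeping only the blobs that are still referenced.
    func dropBlobs(repo *repository.Repository, unused backend.IDSet) error {
    	// Packs that hold at least one unused blob...
    	packs, err := checker.FindPacksForBlobs(repo, unused)
    	if err != nil {
    		return err
    	}

    	// ...and every blob stored in those packs.
    	blobs, err := checker.FindBlobsForPacks(repo, packs)
    	if err != nil {
    		return err
    	}

    	// Keep only the blobs that are still in use.
    	for id := range unused {
    		blobs.Delete(id)
    	}

    	// Rewrite the survivors into fresh pack files.
    	return checker.RepackBlobs(repo, repo, blobs)
    }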
checker/repacker_test.go (new file, 127 lines)

package checker_test

import (
	"testing"

	"github.com/restic/restic/backend"
	"github.com/restic/restic/checker"
	. "github.com/restic/restic/test"
)

var findPackTests = []struct {
	blobIDs backend.IDSet
	packIDs backend.IDSet
}{
	{
		backend.IDSet{
			ParseID("534f211b4fc2cf5b362a24e8eba22db5372a75b7e974603ff9263f5a471760f4"): struct{}{},
			ParseID("51aa04744b518c6a85b4e7643cfa99d58789c2a6ca2a3fda831fa3032f28535c"): struct{}{},
			ParseID("454515bca5f4f60349a527bd814cc2681bc3625716460cc6310771c966d8a3bf"): struct{}{},
			ParseID("c01952de4d91da1b1b80bc6e06eaa4ec21523f4853b69dc8231708b9b7ec62d8"): struct{}{},
		},
		backend.IDSet{
			ParseID("19a731a515618ec8b75fc0ff3b887d8feb83aef1001c9899f6702761142ed068"): struct{}{},
			ParseID("657f7fb64f6a854fff6fe9279998ee09034901eded4e6db9bcee0e59745bbce6"): struct{}{},
		},
	},
}

var findBlobTests = []struct {
	packIDs backend.IDSet
	blobIDs backend.IDSet
}{
	{
		backend.IDSet{
			ParseID("60e0438dcb978ec6860cc1f8c43da648170ee9129af8f650f876bad19f8f788e"): struct{}{},
		},
		backend.IDSet{
			ParseID("356493f0b00a614d36c698591bbb2b1d801932d85328c1f508019550034549fc"): struct{}{},
			ParseID("b8a6bcdddef5c0f542b4648b2ef79bc0ed4377d4109755d2fb78aff11e042663"): struct{}{},
			ParseID("5714f7274a8aa69b1692916739dc3835d09aac5395946b8ec4f58e563947199a"): struct{}{},
			ParseID("b5bb9d8014a0f9b1d61e21e796d78dccdf1352f23cd32812f4850b878ae4944c"): struct{}{},
			ParseID("08d0444e9987fa6e35ce4232b2b71473e1a8f66b2f9664cc44dc57aad3c5a63a"): struct{}{},
		},
	},
	{
		backend.IDSet{
			ParseID("60e0438dcb978ec6860cc1f8c43da648170ee9129af8f650f876bad19f8f788e"): struct{}{},
			ParseID("ff7e12cd66d896b08490e787d1915c641e678d7e6b4a00e60db5d13054f4def4"): struct{}{},
		},
		backend.IDSet{
			ParseID("356493f0b00a614d36c698591bbb2b1d801932d85328c1f508019550034549fc"): struct{}{},
			ParseID("b8a6bcdddef5c0f542b4648b2ef79bc0ed4377d4109755d2fb78aff11e042663"): struct{}{},
			ParseID("5714f7274a8aa69b1692916739dc3835d09aac5395946b8ec4f58e563947199a"): struct{}{},
			ParseID("b5bb9d8014a0f9b1d61e21e796d78dccdf1352f23cd32812f4850b878ae4944c"): struct{}{},
			ParseID("08d0444e9987fa6e35ce4232b2b71473e1a8f66b2f9664cc44dc57aad3c5a63a"): struct{}{},
			ParseID("aa79d596dbd4c863e5400deaca869830888fe1ce9f51b4a983f532c77f16a596"): struct{}{},
			ParseID("b2396c92781307111accf2ebb1cd62b58134b744d90cb6f153ca456a98dc3e76"): struct{}{},
			ParseID("5249af22d3b2acd6da8048ac37b2a87fa346fabde55ed23bb866f7618843c9fe"): struct{}{},
			ParseID("f41c2089a9d58a4b0bf39369fa37588e6578c928aea8e90a4490a6315b9905c1"): struct{}{},
		},
	},
}

func TestRepackerFindPacks(t *testing.T) {
	WithTestEnvironment(t, checkerTestData, func(repodir string) {
		repo := OpenLocalRepo(t, repodir)

		OK(t, repo.LoadIndex())

		for _, test := range findPackTests {
			packIDs, err := checker.FindPacksForBlobs(repo, test.blobIDs)
			OK(t, err)
			Equals(t, test.packIDs, packIDs)
		}

		for _, test := range findBlobTests {
			blobs, err := checker.FindBlobsForPacks(repo, test.packIDs)
			OK(t, err)

			Assert(t, test.blobIDs.Equals(blobs),
				"list of blobs for packs %v does not match, expected:\n  %v\ngot:\n  %v",
				test.packIDs, test.blobIDs, blobs)
		}
	})
}

func TestRepacker(t *testing.T) {
	WithTestEnvironment(t, checkerTestData, func(repodir string) {
		repo := OpenLocalRepo(t, repodir)
		OK(t, repo.LoadIndex())

		repo.Backend().Remove(backend.Snapshot, "c2b53c5e6a16db92fbb9aa08bd2794c58b379d8724d661ee30d20898bdfdff22")

		unusedBlobs := backend.IDSet{
			ParseID("5714f7274a8aa69b1692916739dc3835d09aac5395946b8ec4f58e563947199a"): struct{}{},
			ParseID("08d0444e9987fa6e35ce4232b2b71473e1a8f66b2f9664cc44dc57aad3c5a63a"): struct{}{},
			ParseID("356493f0b00a614d36c698591bbb2b1d801932d85328c1f508019550034549fc"): struct{}{},
			ParseID("b8a6bcdddef5c0f542b4648b2ef79bc0ed4377d4109755d2fb78aff11e042663"): struct{}{},
		}

		chkr := checker.New(repo)
		_, errs := chkr.LoadIndex()
		OKs(t, errs)

		errs = checkStruct(chkr)
		OKs(t, errs)

		list := backend.NewIDSet(chkr.UnusedBlobs()...)
		if !unusedBlobs.Equals(list) {
			t.Fatalf("expected unused blobs:\n  %v\ngot:\n  %v", unusedBlobs, list)
		}

		repacker := checker.NewRepacker(repo, unusedBlobs)
		OK(t, repacker.Repack())

		chkr = checker.New(repo)
		_, errs = chkr.LoadIndex()
		OKs(t, errs)
		OKs(t, checkPacks(chkr))
		OKs(t, checkStruct(chkr))

		blobs := chkr.UnusedBlobs()
		Assert(t, len(blobs) == 0,
			"expected zero unused blobs, got %v", blobs)
	})
}

cmd/restic/cmd_cat.go

@@ -167,12 +167,8 @@ func (cmd CmdCat) Execute(args []string) error {
 			return err
 		}
 
-		if blob.Type != pack.Data {
-			return errors.New("wrong type for blob")
-		}
-
 		buf := make([]byte, blob.Length)
-		data, err := repo.LoadBlob(pack.Data, id, buf)
+		data, err := repo.LoadBlob(blob.Type, id, buf)
 		if err != nil {
 			return err
 		}

cmd/restic/cmd_check.go

@@ -5,13 +5,12 @@ import (
 	"fmt"
 	"os"
 
-	"github.com/restic/restic/backend"
 	"github.com/restic/restic/checker"
 )
 
 type CmdCheck struct {
-	ReadData       bool `long:"read-data"    description:"Read data blobs" default:"false"`
-	RemoveOrphaned bool `long:"remove"       description:"Remove data that isn't used" default:"false"`
+	ReadData    bool `long:"read-data"    description:"Read data blobs" default:"false"`
+	CheckUnused bool `long:"check-unused" description:"Check for unused blobs" default:"false"`
 
 	global *GlobalOptions
 }
@@ -80,14 +79,9 @@ func (cmd CmdCheck) Execute(args []string) error {
 	cmd.global.Verbosef("Check all packs\n")
 	go chkr.Packs(errChan, done)
 
-	foundOrphanedPacks := false
 	for err := range errChan {
 		errorsFound = true
 		fmt.Fprintf(os.Stderr, "%v\n", err)
-
-		if e, ok := err.(checker.PackError); ok && e.Orphaned {
-			foundOrphanedPacks = true
-		}
 	}
 
 	cmd.global.Verbosef("Check snapshots, trees and blobs\n")
@@ -106,21 +100,11 @@ func (cmd CmdCheck) Execute(args []string) error {
 		}
 	}
 
+	if cmd.CheckUnused {
 		for _, id := range chkr.UnusedBlobs() {
 			cmd.global.Verbosef("unused blob %v\n", id.Str())
+			errorsFound = true
 		}
-
-	if foundOrphanedPacks && cmd.RemoveOrphaned {
-		IDs := chkr.OrphanedPacks()
-		cmd.global.Verbosef("Remove %d orphaned packs... ", len(IDs))
-
-		for _, id := range IDs {
-			if err := repo.Backend().Remove(backend.Data, id.String()); err != nil {
-				fmt.Fprintf(os.Stderr, "%v\n", err)
-			}
-		}
-		cmd.global.Verbosef("done\n")
 	}
 
 	if errorsFound {

cmd/restic/cmd_optimize.go (new file, 84 lines)

package main

import (
	"errors"
	"fmt"

	"github.com/restic/restic/backend"
	"github.com/restic/restic/checker"
)

type CmdOptimize struct {
	global *GlobalOptions
}

func init() {
	_, err := parser.AddCommand("optimize",
		"optimize the repository",
		"The optimize command reorganizes the repository and removes unneeded data",
		&CmdOptimize{global: &globalOpts})
	if err != nil {
		panic(err)
	}
}

func (cmd CmdOptimize) Usage() string {
	return "[optimize-options]"
}

func (cmd CmdOptimize) Execute(args []string) error {
	if len(args) != 0 {
		return errors.New("optimize has no arguments")
	}

	repo, err := cmd.global.OpenRepository()
	if err != nil {
		return err
	}

	cmd.global.Verbosef("Create exclusive lock for repository\n")
	lock, err := lockRepoExclusive(repo)
	defer unlockRepo(lock)
	if err != nil {
		return err
	}

	chkr := checker.New(repo)

	cmd.global.Verbosef("Load indexes\n")
	_, errs := chkr.LoadIndex()

	if len(errs) > 0 {
		for _, err := range errs {
			cmd.global.Warnf("error: %v\n", err)
		}
		return fmt.Errorf("LoadIndex returned errors")
	}

	done := make(chan struct{})
	errChan := make(chan error)
	go chkr.Structure(errChan, done)

	for err := range errChan {
		if e, ok := err.(checker.TreeError); ok {
			cmd.global.Warnf("error for tree %v:\n", e.ID.Str())
			for _, treeErr := range e.Errors {
				cmd.global.Warnf("  %v\n", treeErr)
			}
		} else {
			cmd.global.Warnf("error: %v\n", err)
		}
	}

	unusedBlobs := backend.NewIDSet(chkr.UnusedBlobs()...)
	cmd.global.Verbosef("%d unused blobs found, repacking...\n", len(unusedBlobs))

	repacker := checker.NewRepacker(repo, unusedBlobs)
	err = repacker.Repack()
	if err != nil {
		return err
	}

	cmd.global.Verbosef("repacking done\n")
	return nil
}

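Aside: the Structure check above uses a common Go shape — a producer goroutine streams errors into a channel and closes it when done, so the consumer's range loop doubles as the synchronization point. A simplified, self-contained sketch of that pattern (hypothetical names; the real checker.Structure additionally takes a done channel for cancellation):

    package example

    import "fmt"

    // checkAll streams problems to errs and closes the channel when
    // finished, the same shape as checker.Structure(errChan, done).
    func checkAll(errs chan<- error) {
    	defer close(errs) // lets the consumer's range loop terminate
    	for i := 0; i < 3; i++ {
    		errs <- fmt.Errorf("problem %d", i)
    	}
    }

    // drain collects everything the producer reports.
    func drain() []error {
    	errs := make(chan error)
    	go checkAll(errs)

    	var found []error
    	for err := range errs { // blocks until checkAll closes errs
    		found = append(found, err)
    	}
    	return found
    }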
cmd/restic/cmd_rebuild_index.go

@@ -90,7 +90,7 @@ func (cmd CmdRebuildIndex) RebuildIndex() error {
 			}
 
 			blobsDone[b] = struct{}{}
-			combinedIndex.Store(packedBlob.Type, packedBlob.ID, packedBlob.PackID, packedBlob.Offset, packedBlob.Length)
+			combinedIndex.Store(packedBlob)
 		}
 
 		combinedIndex.AddToSupersedes(indexID)
@@ -162,7 +162,13 @@ func (cmd CmdRebuildIndex) RebuildIndex() error {
 		for _, blob := range up.Entries {
 			debug.Log("RebuildIndex.RebuildIndex", "pack %v: blob %v", packID.Str(), blob)
-			combinedIndex.Store(blob.Type, blob.ID, packID, blob.Offset, blob.Length)
+			combinedIndex.Store(repository.PackedBlob{
+				Type:   blob.Type,
+				ID:     blob.ID,
+				PackID: packID,
+				Offset: blob.Offset,
+				Length: blob.Length,
+			})
 		}
 
 		err = rd.Close()

cmd/restic/integration_helpers_test.go

@@ -216,3 +216,13 @@ func withTestEnvironment(t testing.TB, f func(*testEnvironment, GlobalOptions))
 
 	RemoveAll(t, tempdir)
 }
+
+// removeFile resets the read-only flag and then deletes the file.
+func removeFile(fn string) error {
+	err := os.Chmod(fn, 0666)
+	if err != nil {
+		return err
+	}
+
+	return os.Remove(fn)
+}

cmd/restic/integration_test.go

@@ -61,7 +61,7 @@ func cmdBackupExcludes(t testing.TB, global GlobalOptions, target []string, pare
 	OK(t, cmd.Execute(target))
 }
 
-func cmdList(t testing.TB, global GlobalOptions, tpe string) []backend.ID {
+func cmdList(t testing.TB, global GlobalOptions, tpe string) backend.IDs {
 	var buf bytes.Buffer
 	global.stdout = &buf
 	cmd := &CmdList{global: &global}
@@ -87,7 +87,11 @@ func cmdRestoreIncludes(t testing.TB, global GlobalOptions, dir string, snapshot
 }
 
 func cmdCheck(t testing.TB, global GlobalOptions) {
-	cmd := &CmdCheck{global: &global, ReadData: true}
+	cmd := &CmdCheck{
+		global:      &global,
+		ReadData:    true,
+		CheckUnused: true,
+	}
 	OK(t, cmd.Execute(nil))
 }
@@ -105,6 +109,11 @@ func cmdRebuildIndex(t testing.TB, global GlobalOptions) {
 	OK(t, cmd.Execute(nil))
 }
 
+func cmdOptimize(t testing.TB, global GlobalOptions) {
+	cmd := &CmdOptimize{global: &global}
+	OK(t, cmd.Execute(nil))
+}
+
 func cmdLs(t testing.TB, global GlobalOptions, snapshotID string) []string {
 	var buf bytes.Buffer
 	global.stdout = &buf
@@ -739,3 +748,43 @@ func TestRebuildIndexAlwaysFull(t *testing.T) {
 	repository.IndexFull = func(*repository.Index) bool { return true }
 	TestRebuildIndex(t)
 }
+
+var optimizeTests = []struct {
+	testFilename string
+	snapshots    backend.IDSet
+}{
+	{
+		filepath.Join("..", "..", "checker", "testdata", "checker-test-repo.tar.gz"),
+		backend.NewIDSet(ParseID("a13c11e582b77a693dd75ab4e3a3ba96538a056594a4b9076e4cacebe6e06d43")),
+	},
+	{
+		filepath.Join("testdata", "old-index-repo.tar.gz"),
+		nil,
+	},
+	{
+		filepath.Join("testdata", "old-index-repo.tar.gz"),
+		backend.NewIDSet(
+			ParseID("f7d83db709977178c9d1a09e4009355e534cde1a135b8186b8b118a3fc4fcd41"),
+			ParseID("51d249d28815200d59e4be7b3f21a157b864dc343353df9d8e498220c2499b02"),
+		),
+	},
+}
+
+func TestOptimizeRemoveUnusedBlobs(t *testing.T) {
+	for i, test := range optimizeTests {
+		withTestEnvironment(t, func(env *testEnvironment, global GlobalOptions) {
+			SetupTarTestFixture(t, env.base, test.testFilename)
+
+			for id := range test.snapshots {
+				OK(t, removeFile(filepath.Join(env.repo, "snapshots", id.String())))
+			}
+
+			cmdOptimize(t, global)
+
+			output := cmdCheckOutput(t, global)
+			if len(output) > 0 {
+				t.Errorf("expected no output for check in test %d, got:\n%v", i, output)
+			}
+		})
+	}
+}

repository/index.go

@@ -20,6 +20,7 @@ type Index struct {
 	pack map[backend.ID]indexEntry
 
 	final      bool       // set to true for all indexes read from the backend ("finalized")
+	id         backend.ID // set to the ID of the index when it's finalized
 	supersedes backend.IDs
 	created    time.Time
 }
@@ -39,12 +40,12 @@ func NewIndex() *Index {
 	}
 }
 
-func (idx *Index) store(t pack.BlobType, id backend.ID, pack backend.ID, offset, length uint) {
-	idx.pack[id] = indexEntry{
-		tpe:    t,
-		packID: pack,
-		offset: offset,
-		length: length,
+func (idx *Index) store(blob PackedBlob) {
+	idx.pack[blob.ID] = indexEntry{
+		tpe:    blob.Type,
+		packID: blob.PackID,
+		offset: blob.Offset,
+		length: blob.Length,
 	}
 }
@@ -95,7 +96,7 @@ var IndexFull = func(idx *Index) bool {
 // Store remembers the id and pack in the index. An existing entry will be
 // silently overwritten.
-func (idx *Index) Store(t pack.BlobType, id backend.ID, pack backend.ID, offset, length uint) {
+func (idx *Index) Store(blob PackedBlob) {
 	idx.m.Lock()
 	defer idx.m.Unlock()
@@ -103,10 +104,9 @@ func (idx *Index) Store(t pack.BlobType, id backend.ID, pack backend.ID, offset,
 		panic("store new item in finalized index")
 	}
 
-	debug.Log("Index.Store", "pack %v contains id %v (%v), offset %v, length %v",
-		pack.Str(), id.Str(), t, offset, length)
+	debug.Log("Index.Store", "%v", blob)
 
-	idx.store(t, id, pack, offset, length)
+	idx.store(blob)
 }
 
 // Lookup queries the index for the blob ID and returns a PackedBlob.
@@ -132,6 +132,26 @@ func (idx *Index) Lookup(id backend.ID) (pb PackedBlob, err error) {
 	return PackedBlob{}, fmt.Errorf("id %v not found in index", id)
 }
 
+// ListPack returns a list of blobs contained in a pack.
+func (idx *Index) ListPack(id backend.ID) (list []PackedBlob) {
+	idx.m.Lock()
+	defer idx.m.Unlock()
+
+	for blobID, entry := range idx.pack {
+		if entry.packID == id {
+			list = append(list, PackedBlob{
+				ID:     blobID,
+				Type:   entry.tpe,
+				Length: entry.length,
+				Offset: entry.offset,
+				PackID: entry.packID,
+			})
+		}
+	}
+
+	return list
+}
+
 // Has returns true iff the id is listed in the index.
 func (idx *Index) Has(id backend.ID) bool {
 	_, err := idx.Lookup(id)
@@ -375,6 +395,39 @@ func (idx *Index) Finalize(w io.Writer) error {
 	return idx.encode(w)
 }
 
+// ID returns the ID of the index, if available. If the index is not yet
+// finalized, an error is returned.
+func (idx *Index) ID() (backend.ID, error) {
+	idx.m.Lock()
+	defer idx.m.Unlock()
+
+	if !idx.final {
+		return backend.ID{}, errors.New("index not finalized")
+	}
+
+	return idx.id, nil
+}
+
+// SetID sets the ID the index has been written to. This requires that
+// Finalize() has been called before, otherwise an error is returned.
+func (idx *Index) SetID(id backend.ID) error {
+	idx.m.Lock()
+	defer idx.m.Unlock()
+
+	if !idx.final {
+		return errors.New("index is not final")
+	}
+
+	if !idx.id.IsNull() {
+		return errors.New("ID already set")
+	}
+
+	debug.Log("Index.SetID", "ID set to %v", id.Str())
+	idx.id = id
+
+	return nil
+}
+
 // Dump writes the pretty-printed JSON representation of the index to w.
 func (idx *Index) Dump(w io.Writer) error {
 	debug.Log("Index.Dump", "dumping index")
@@ -386,7 +439,12 @@ func (idx *Index) Dump(w io.Writer) error {
 		return err
 	}
 
-	buf, err := json.MarshalIndent(list, "", " ")
+	outer := jsonIndex{
+		Supersedes: idx.Supersedes(),
+		Packs:      list,
+	}
+
+	buf, err := json.MarshalIndent(outer, "", " ")
 	if err != nil {
 		return err
 	}
@@ -435,7 +493,13 @@ func DecodeIndex(rd io.Reader) (idx *Index, err error) {
 	idx = NewIndex()
 	for _, pack := range idxJSON.Packs {
 		for _, blob := range pack.Blobs {
-			idx.store(blob.Type, blob.ID, pack.ID, blob.Offset, blob.Length)
+			idx.store(PackedBlob{
+				Type:   blob.Type,
+				ID:     blob.ID,
+				Offset: blob.Offset,
+				Length: blob.Length,
+				PackID: pack.ID,
+			})
 		}
 	}
 	idx.supersedes = idxJSON.Supersedes
@@ -460,38 +524,45 @@ func DecodeOldIndex(rd io.Reader) (idx *Index, err error) {
 	idx = NewIndex()
 	for _, pack := range list {
 		for _, blob := range pack.Blobs {
-			idx.store(blob.Type, blob.ID, pack.ID, blob.Offset, blob.Length)
+			idx.store(PackedBlob{
+				Type:   blob.Type,
+				ID:     blob.ID,
+				PackID: pack.ID,
+				Offset: blob.Offset,
+				Length: blob.Length,
+			})
 		}
 	}
+	idx.final = true
 
 	debug.Log("Index.DecodeOldIndex", "done")
 	return idx, err
 }
 
-// ConvertIndexes loads all indexes from the repo and converts them to the new
-// format (if necessary). When the conversion is successful, the old indexes
-// are removed.
-func ConvertIndexes(repo *Repository) error {
-	debug.Log("ConvertIndexes", "start")
-	done := make(chan struct{})
-	defer close(done)
-
-	for id := range repo.List(backend.Index, done) {
-		debug.Log("ConvertIndexes", "checking index %v", id.Str())
-
-		newID, err := ConvertIndex(repo, id)
-		if err != nil {
-			debug.Log("ConvertIndexes", "Converting index %v returns error: %v", id.Str(), err)
-			return err
-		}
-
-		if id != newID {
-			debug.Log("ConvertIndexes", "index %v converted to new format as %v", id.Str(), newID.Str())
-		}
-	}
-
-	debug.Log("ConvertIndexes", "done")
-	return nil
-}
+// LoadIndexWithDecoder loads the index and decodes it with fn.
+func LoadIndexWithDecoder(repo *Repository, id string, fn func(io.Reader) (*Index, error)) (*Index, error) {
+	debug.Log("LoadIndexWithDecoder", "Loading index %v", id[:8])
+
+	idxID, err := backend.ParseID(id)
+	if err != nil {
+		return nil, err
+	}
+
+	rd, err := repo.GetDecryptReader(backend.Index, idxID.String())
+	if err != nil {
+		return nil, err
+	}
+	defer rd.Close()
+
+	idx, err := fn(rd)
+	if err != nil {
+		debug.Log("LoadIndexWithDecoder", "error while decoding index %v: %v", id, err)
+		return nil, err
+	}
+
+	idx.id = idxID
+
+	return idx, nil
+}
 
 // ConvertIndex loads the given index from the repo and converts them to the new

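Aside: after this change the in-memory index is driven entirely by the PackedBlob struct — Store takes one instead of five loose arguments, and ListPack is the inverse of Lookup. A hedged round-trip sketch using only the API shown in the diff above:

    package example

    import (
    	"github.com/restic/restic/backend"
    	"github.com/restic/restic/pack"
    	"github.com/restic/restic/repository"
    )

    // indexRoundTrip stores one blob and reads it back two ways.
    func indexRoundTrip(blobID, packID backend.ID) error {
    	idx := repository.NewIndex()

    	// Store now takes a single PackedBlob.
    	idx.Store(repository.PackedBlob{
    		Type:   pack.Data,
    		ID:     blobID,
    		PackID: packID,
    		Offset: 0,
    		Length: 23,
    	})

    	// Look up by blob ID...
    	if _, err := idx.Lookup(blobID); err != nil {
    		return err
    	}

    	// ...or list everything the index knows about one pack.
    	for _, pb := range idx.ListPack(packID) {
    		_ = pb // pb.ID, pb.Offset, pb.Length, ...
    	}
    	return nil
    }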
repository/index_test.go

@@ -4,7 +4,6 @@ import (
 	"bytes"
 	"crypto/rand"
 	"io"
-	"path/filepath"
 	"testing"
 
 	"github.com/restic/restic/backend"
@@ -41,7 +40,13 @@ func TestIndexSerialize(t *testing.T) {
 		for j := 0; j < 20; j++ {
 			id := randomID()
 			length := uint(i*100 + j)
-			idx.Store(pack.Data, id, packID, pos, length)
+			idx.Store(repository.PackedBlob{
+				Type:   pack.Data,
+				ID:     id,
+				PackID: packID,
+				Offset: pos,
+				Length: length,
+			})
 
 			tests = append(tests, testEntry{
 				id: id,
@@ -95,7 +100,13 @@ func TestIndexSerialize(t *testing.T) {
 		for j := 0; j < 10; j++ {
 			id := randomID()
 			length := uint(i*100 + j)
-			idx.Store(pack.Data, id, packID, pos, length)
+			idx.Store(repository.PackedBlob{
+				Type:   pack.Data,
+				ID:     id,
+				PackID: packID,
+				Offset: pos,
+				Length: length,
+			})
 
 			newtests = append(newtests, testEntry{
 				id: id,
@@ -117,6 +128,12 @@ func TestIndexSerialize(t *testing.T) {
 	Assert(t, idx.Final(),
 		"index not final after encoding")
 
+	id := randomID()
+	OK(t, idx.SetID(id))
+
+	id2, err := idx.ID()
+	OK(t, err)
+	Assert(t, id2.Equal(id),
+		"wrong ID returned: want %v, got %v", id, id2)
+
 	idx3, err := repository.DecodeIndex(wr3)
 	OK(t, err)
 	Assert(t, idx3 != nil,
@@ -148,7 +165,13 @@ func TestIndexSize(t *testing.T) {
 		for j := 0; j < blobs; j++ {
 			id := randomID()
 			length := uint(i*100 + j)
-			idx.Store(pack.Data, id, packID, pos, length)
+			idx.Store(repository.PackedBlob{
+				Type:   pack.Data,
+				ID:     id,
+				PackID: packID,
+				Offset: pos,
+				Length: length,
+			})
 
 			pos += length
 		}
@@ -240,6 +263,18 @@ var exampleTests = []struct {
 	},
 }
 
+var exampleLookupTest = struct {
+	packID backend.ID
+	blobs  backend.IDSet
+}{
+	ParseID("73d04e6125cf3c28a299cc2f3cca3b78ceac396e4fcf9575e34536b26782413c"),
+	backend.IDSet{
+		ParseID("3ec79977ef0cf5de7b08cd12b874cd0f62bbaf7f07f3497a5b1bbcc8cb39b1ce"): struct{}{},
+		ParseID("9ccb846e60d90d4eb915848add7aa7ea1e4bbabfc60e573db9f7bfb2789afbae"): struct{}{},
+		ParseID("d3dc577b4ffd38cc4b32122cabf8655a0223ed22edfd93b353dc0c3f2b0fdf66"): struct{}{},
+	},
+}
+
 func TestIndexUnserialize(t *testing.T) {
 	oldIdx := backend.IDs{ParseID("ed54ae36197f4745ebc4b54d10e0f623eaaaedd03013eb7ae90df881b7781452")}
@@ -257,6 +292,17 @@ func TestIndexUnserialize(t *testing.T) {
 	}
 
 	Equals(t, oldIdx, idx.Supersedes())
+
+	blobs := idx.ListPack(exampleLookupTest.packID)
+	if len(blobs) != len(exampleLookupTest.blobs) {
+		t.Fatalf("expected %d blobs in pack, got %d", len(exampleLookupTest.blobs), len(blobs))
+	}
+
+	for _, blob := range blobs {
+		if !exampleLookupTest.blobs.Has(blob.ID) {
+			t.Errorf("unexpected blob %v found", blob.ID.Str())
+		}
+	}
 }
 
 func TestIndexUnserializeOld(t *testing.T) {
@@ -276,63 +322,19 @@ func TestIndexUnserializeOld(t *testing.T) {
 	Equals(t, 0, len(idx.Supersedes()))
 }
 
-var oldIndexTestRepo = filepath.Join("testdata", "old-index-repo.tar.gz")
-
-func TestConvertIndex(t *testing.T) {
-	WithTestEnvironment(t, oldIndexTestRepo, func(repodir string) {
-		repo := OpenLocalRepo(t, repodir)
-
-		old := make(map[backend.ID]*repository.Index)
-		for id := range repo.List(backend.Index, nil) {
-			idx, err := repository.LoadIndex(repo, id.String())
-			OK(t, err)
-			old[id] = idx
-		}
-
-		OK(t, repository.ConvertIndexes(repo))
-
-		for id := range repo.List(backend.Index, nil) {
-			idx, err := repository.LoadIndexWithDecoder(repo, id.String(), repository.DecodeIndex)
-			OK(t, err)
-
-			Assert(t, len(idx.Supersedes()) == 1,
-				"Expected index %v to supersede exactly one index, got %v", id, idx.Supersedes())
-
-			oldIndexID := idx.Supersedes()[0]
-
-			oldIndex, ok := old[oldIndexID]
-			Assert(t, ok,
-				"Index %v supersedes %v, but that wasn't found in the old index map", id.Str(), oldIndexID.Str())
-
-			Assert(t, idx.Count(pack.Data) == oldIndex.Count(pack.Data),
-				"Index %v count blobs %v: %v != %v", id.Str(), pack.Data, idx.Count(pack.Data), oldIndex.Count(pack.Data))
-			Assert(t, idx.Count(pack.Tree) == oldIndex.Count(pack.Tree),
-				"Index %v count blobs %v: %v != %v", id.Str(), pack.Tree, idx.Count(pack.Tree), oldIndex.Count(pack.Tree))
-
-			for packedBlob := range idx.Each(nil) {
-				blob, err := oldIndex.Lookup(packedBlob.ID)
-				OK(t, err)
-
-				Assert(t, blob.PackID == packedBlob.PackID,
-					"Check blob %v: pack ID %v != %v", packedBlob.ID, blob.PackID, packedBlob.PackID)
-				Assert(t, blob.Type == packedBlob.Type,
-					"Check blob %v: Type %v != %v", packedBlob.ID, blob.Type, packedBlob.Type)
-				Assert(t, blob.Offset == packedBlob.Offset,
-					"Check blob %v: Offset %v != %v", packedBlob.ID, blob.Offset, packedBlob.Offset)
-				Assert(t, blob.Length == packedBlob.Length,
-					"Check blob %v: Length %v != %v", packedBlob.ID, blob.Length, packedBlob.Length)
-			}
-		}
-	})
-}
-
 func TestIndexPacks(t *testing.T) {
 	idx := repository.NewIndex()
 	packs := backend.NewIDSet()
 
 	for i := 0; i < 20; i++ {
 		packID := randomID()
-		idx.Store(pack.Data, randomID(), packID, 0, 23)
+		idx.Store(repository.PackedBlob{
+			Type:   pack.Data,
+			ID:     randomID(),
+			PackID: packID,
+			Offset: 0,
+			Length: 23,
+		})
 
 		packs.Insert(packID)
 	}

repository/master_index.go

@@ -67,6 +67,22 @@ func (mi *MasterIndex) LookupSize(id backend.ID) (uint, error) {
 	return 0, fmt.Errorf("id %v not found in any index", id)
 }
 
+// ListPack returns the list of blobs in a pack. The blobs of the first
+// matching index are returned, or nil if no index contains information
+// about the pack id.
+func (mi *MasterIndex) ListPack(id backend.ID) (list []PackedBlob) {
+	mi.idxMutex.RLock()
+	defer mi.idxMutex.RUnlock()
+
+	for _, idx := range mi.idx {
+		list := idx.ListPack(id)
+		if len(list) > 0 {
+			return list
+		}
+	}
+
+	return nil
+}
+
 // Has queries all known Indexes for the ID and returns the first match.
 func (mi *MasterIndex) Has(id backend.ID) bool {
 	mi.idxMutex.RLock()
@@ -224,3 +240,49 @@ func (mi *MasterIndex) All() []*Index {
 	return mi.idx
 }
 
+// RebuildIndex combines all known indexes to a new index, leaving out any
+// packs whose ID is contained in packBlacklist. The new index contains the IDs
+// of all known indexes in the "supersedes" field.
+func (mi *MasterIndex) RebuildIndex(packBlacklist backend.IDSet) (*Index, error) {
+	mi.idxMutex.Lock()
+	defer mi.idxMutex.Unlock()
+
+	debug.Log("MasterIndex.RebuildIndex", "start rebuilding index of %d indexes, pack blacklist: %v", len(mi.idx), packBlacklist)
+
+	newIndex := NewIndex()
+	done := make(chan struct{})
+	defer close(done)
+
+	for i, idx := range mi.idx {
+		debug.Log("MasterIndex.RebuildIndex", "adding index %d", i)
+
+		for pb := range idx.Each(done) {
+			if packBlacklist.Has(pb.PackID) {
+				continue
+			}
+
+			newIndex.Store(pb)
+		}
+
+		if !idx.Final() {
+			debug.Log("MasterIndex.RebuildIndex", "index %d isn't final, don't add to supersedes field", i)
+			continue
+		}
+
+		id, err := idx.ID()
+		if err != nil {
+			debug.Log("MasterIndex.RebuildIndex", "index %d does not have an ID: %v", i, err)
+			return nil, err
+		}
+
+		debug.Log("MasterIndex.RebuildIndex", "adding index id %v to supersedes field", id.Str())
+
+		err = newIndex.AddToSupersedes(id)
+		if err != nil {
+			return nil, err
+		}
+	}
+
+	return newIndex, nil
+}

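Aside: RebuildIndex plus SaveIndex is the core of the index cleanup that repacking performs. A hedged sketch mirroring the sequence in Repacker.Repack, using only API from the diffs above (the rewriteIndex function itself is illustrative):

    package example

    import (
    	"github.com/restic/restic/backend"
    	"github.com/restic/restic/repository"
    )

    // rewriteIndex merges all in-memory indexes into one new index that
    // skips the blacklisted packs, saves it, and removes what it replaces.
    func rewriteIndex(repo *repository.Repository, obsoletePacks backend.IDSet) error {
    	idx, err := repo.Index().RebuildIndex(obsoletePacks)
    	if err != nil {
    		return err
    	}

    	// SaveIndex finalizes the index and records its new storage ID.
    	if _, err := repository.SaveIndex(repo, idx); err != nil {
    		return err
    	}

    	// The IDs of the merged indexes end up in Supersedes, so the old
    	// index files can now be deleted from the backend.
    	for _, id := range idx.Supersedes() {
    		if err := repo.Backend().Remove(backend.Index, id.String()); err != nil {
    			return err
    		}
    	}
    	return nil
    }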
repository/repository.go

@@ -270,7 +270,13 @@ func (r *Repository) savePacker(p *pack.Packer) error {
 	// update blobs in the index
 	for _, b := range p.Blobs() {
 		debug.Log("Repo.savePacker", "  updating blob %v to pack %v", b.ID.Str(), sid.Str())
-		r.idx.Current().Store(b.Type, b.ID, sid, b.Offset, uint(b.Length))
+		r.idx.Current().Store(PackedBlob{
+			Type:   b.Type,
+			ID:     b.ID,
+			PackID: sid,
+			Offset: b.Offset,
+			Length: uint(b.Length),
+		})
 		r.idx.RemoveFromInFlight(b.ID)
 	}
@@ -526,7 +532,8 @@ func SaveIndex(repo *Repository, index *Index) (backend.ID, error) {
 	}
 
 	sid := blob.ID()
-	return sid, nil
+	err = index.SetID(sid)
+	return sid, err
 }
 
 // saveIndex saves all indexes in the backend.
@@ -650,25 +657,6 @@ func (r *Repository) GetDecryptReader(t backend.Type, id string) (io.ReadCloser,
 	return newDecryptReadCloser(r.key, rd)
 }
 
-// LoadIndexWithDecoder loads the index and decodes it with fn.
-func LoadIndexWithDecoder(repo *Repository, id string, fn func(io.Reader) (*Index, error)) (*Index, error) {
-	debug.Log("LoadIndexWithDecoder", "Loading index %v", id[:8])
-
-	rd, err := repo.GetDecryptReader(backend.Index, id)
-	if err != nil {
-		return nil, err
-	}
-	defer rd.Close()
-
-	idx, err := fn(rd)
-	if err != nil {
-		debug.Log("LoadIndexWithDecoder", "error while decoding index %v: %v", id, err)
-		return nil, err
-	}
-
-	return idx, nil
-}
-
 // SearchKey finds a key with the supplied password, afterwards the config is
 // read and parsed.
 func (r *Repository) SearchKey(password string) error {
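
Aside: SaveIndex now calls SetID, so a freshly written index knows its own storage ID, which is what lets RebuildIndex fill the supersedes field later. A hedged sketch of that round trip, assuming the repository API as modified above (saveAndIdentify is illustrative):

    package example

    import (
    	"fmt"

    	"github.com/restic/restic/repository"
    )

    // saveAndIdentify writes idx to the repo and checks that the index
    // carries the ID it was stored under; ID() only succeeds once the
    // index has been finalized.
    func saveAndIdentify(repo *repository.Repository, idx *repository.Index) error {
    	sid, err := repository.SaveIndex(repo, idx) // finalizes and calls idx.SetID(sid)
    	if err != nil {
    		return err
    	}

    	id, err := idx.ID()
    	if err != nil {
    		return err
    	}
    	if !id.Equal(sid) {
    		return fmt.Errorf("unexpected index ID: want %v, got %v", sid, id)
    	}
    	return nil
    }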