restore: New optimized multithreaded implementation

Signed-off-by: Igor Fedorenko <igor@ifedorenko.com>
This commit is contained in:
Igor Fedorenko 2018-04-08 08:02:30 -04:00 committed by Alexander Neumann
parent 1213d8fef4
commit 1869930d95
14 changed files with 1854 additions and 24 deletions

View file

@ -0,0 +1,30 @@
The main goal of this POC is two fold
1. Reduce number of backend request, specifically
* download all required blobs of a pack with single backend request
* avoid repeated pack downloads when possible
2. Download multiple pack files concurrently
Here is high-level pseudo-code of the how the POC attempts to achieve these goals
```
while there are packs to process
choose a pack to process [1]
get the pack from the backend or cache [2]
write pack blobs to the files that need them [3]
if not all pack blobs were used
cache the pack for future use [4]
```
Pack download and processing (steps [2] - [4]) runs on multiple concurrent goroutings. The POC runs all steps [2]-[4] sequentially on the same gorouting, but it is possible to split the work differently. For example, one pool of workers can handle download (step [2]) while the other pool can handle pack processing (steps [3] and [4]).
Before a pack is downloaded (step [2]), the required space is "reserved" in the pack cache, which may purge some cached packed to make room for the reservation (well, that's the plan but purging isn't implemented in the POC). Actual download uses single backend request to get all required pack blobs. This may download blobs that are not needed, but I assume it'll still be faster than getting individual blobs. We should be able to optimize this further by changing `Backend.Load(...)` to support multiple byte ranges, for example.
Target files are written (step [3]) in the "right" order, first file blob first, then second, then third and so on. Blob write order implies that some pack blobs may not be immediately used, i.e. they are "out of order" for their respective target files. Packs with unused blobs are cached (step [4]). The cache has capacity limit and may purge packs before they are fully used, in which case the purged packs will need to be redownloaded.
Choosing which pack to process next (step [1]) is little convoluted in the POC. The code avoids processing of any given pack and any given target file by multiple workers concurrently. It also tries to reduce likelihook a pack will be purged from the cache by counting how many other packs will need to be processed before the pack is fully used up. Packs that need fewer other packs are processed first, everything else being equal.
----
Other ideas to consider
* Allow out-of-order target file writes. Inprogress restore will be somewhat confusing to observe, but this will eliminate the need to cache packs and should simplify implimentation. On the other hand, crashed/killed restore will be harder to recover.

View file

@ -0,0 +1,15 @@
Enhancement: Concurrent restore
This change significantly improves restore performance, especially
when using high-latency remote repositories like B2.
Implementation uses threads to download and process miltiple remote
files concurrently. To further reduce restore time, each remote
file is downloaded using single repository request.
Old restore implementation can be enabled with `--signethreaded` flag.
Use `--verify` restore flag to read restored files and verify their
content checksum.
https://github.com/restic/restic/issues/1605
https://github.com/restic/restic/pull/1719

View file

@ -28,13 +28,14 @@ repository.
// RestoreOptions collects all options for the restore command. // RestoreOptions collects all options for the restore command.
type RestoreOptions struct { type RestoreOptions struct {
Exclude []string Exclude []string
Include []string Include []string
Target string Target string
Host string Host string
Paths []string Paths []string
Tags restic.TagLists Tags restic.TagLists
Verify bool Verify bool
SingleThreaded bool
} }
var restoreOptions RestoreOptions var restoreOptions RestoreOptions
@ -51,6 +52,7 @@ func init() {
flags.Var(&restoreOptions.Tags, "tag", "only consider snapshots which include this `taglist` for snapshot ID \"latest\"") flags.Var(&restoreOptions.Tags, "tag", "only consider snapshots which include this `taglist` for snapshot ID \"latest\"")
flags.StringArrayVar(&restoreOptions.Paths, "path", nil, "only consider snapshots which include this (absolute) `path` for snapshot ID \"latest\"") flags.StringArrayVar(&restoreOptions.Paths, "path", nil, "only consider snapshots which include this (absolute) `path` for snapshot ID \"latest\"")
flags.BoolVar(&restoreOptions.Verify, "verify", false, "verify restored files content") flags.BoolVar(&restoreOptions.Verify, "verify", false, "verify restored files content")
flags.BoolVar(&restoreOptions.SingleThreaded, "singlethreaded", false, "use single-threaded (legacy) restore implementation")
} }
func runRestore(opts RestoreOptions, gopts GlobalOptions, args []string) error { func runRestore(opts RestoreOptions, gopts GlobalOptions, args []string) error {
@ -155,7 +157,7 @@ func runRestore(opts RestoreOptions, gopts GlobalOptions, args []string) error {
Verbosef("restoring %s to %s\n", res.Snapshot(), opts.Target) Verbosef("restoring %s to %s\n", res.Snapshot(), opts.Target)
err = res.RestoreTo(ctx, opts.Target) err = res.RestoreTo(ctx, opts.Target, opts.SingleThreaded)
if err == nil && opts.Verify { if err == nil && opts.Verify {
Verbosef("verifying files in %s\n", opts.Target) Verbosef("verifying files in %s\n", opts.Target)
var count int var count int

View file

@ -0,0 +1,52 @@
package restorer
import (
"github.com/restic/restic/internal/errors"
"github.com/restic/restic/internal/restic"
)
type filePackTraverser struct {
lookup func(restic.ID, restic.BlobType) ([]restic.PackedBlob, bool)
}
// iterates over all remaining packs of the file
func (t *filePackTraverser) forEachFilePack(file *fileInfo, fn func(packIdx int, packID restic.ID, packBlobs []restic.Blob) bool) error {
if len(file.blobs) == 0 {
return nil
}
getBlobPack := func(blobID restic.ID) (restic.PackedBlob, error) {
packs, found := t.lookup(blobID, restic.DataBlob)
if !found {
return restic.PackedBlob{}, errors.Errorf("Unknown blob %s", blobID.String())
}
// TODO which pack to use if multiple packs have the blob?
// MUST return the same pack for the same blob during the same execution
return packs[0], nil
}
var prevPackID restic.ID
var prevPackBlobs []restic.Blob
packIdx := 0
for _, blobID := range file.blobs {
packedBlob, err := getBlobPack(blobID)
if err != nil {
return err
}
if !prevPackID.IsNull() && prevPackID != packedBlob.PackID {
if !fn(packIdx, prevPackID, prevPackBlobs) {
return nil
}
packIdx++
}
if prevPackID != packedBlob.PackID {
prevPackID = packedBlob.PackID
prevPackBlobs = make([]restic.Blob, 0)
}
prevPackBlobs = append(prevPackBlobs, packedBlob.Blob)
}
if len(prevPackBlobs) > 0 {
fn(packIdx, prevPackID, prevPackBlobs)
}
return nil
}

View file

@ -0,0 +1,330 @@
package restorer
import (
"context"
"io"
"os"
"github.com/restic/restic/internal/crypto"
"github.com/restic/restic/internal/debug"
"github.com/restic/restic/internal/errors"
"github.com/restic/restic/internal/restic"
)
// TODO if a blob is corrupt, there may be good blob copies in other packs
// TODO evaluate if it makes sense to split download and processing workers
// pro: can (slowly) read network and decrypt/write files concurrently
// con: each worker needs to keep one pack in memory
// TODO evaluate memory footprint for larger repositories, say 10M packs/10M files
// TODO consider replacing pack file cache with blob cache
// TODO avoid decrypting the same blob multiple times
// TODO remove `restore --singlethreaded` and review/simplify hardlink support
// (node.CreateAt shouldn't take HardlinkIndex)
// TODO evaluate disabled debug logging overhead for large repositories
// TODO consider logging snapshot-relative path to reduce log clutter
const (
workerCount = 8
// max number of open output file handles
filesWriterCount = 32
// estimated average pack size used to calculate pack cache capacity
averagePackSize = 5 * 1024 * 1024
// pack cache capacity should support at least one cached pack per worker
// allow space for extra 5 packs for actual caching
packCacheCapacity = (workerCount + 5) * averagePackSize
)
// information about regular file being restored
type fileInfo struct {
path string // full path to the file on local filesystem
blobs []restic.ID // remaining blobs of the file
}
// information about a data pack required to restore one or more files
type packInfo struct {
// the pack id
id restic.ID
// set of files that use blobs from this pack
files map[*fileInfo]struct{}
// number of other packs that must be downloaded before all blobs in this pack can be used
cost int
// used by packHeap
index int
}
// fileRestorer restores set of files
type fileRestorer struct {
key *crypto.Key
idx filePackTraverser
packLoader func(ctx context.Context, h restic.Handle, length int, offset int64, fn func(rd io.Reader) error) error
packCache *packCache // pack cache
filesWriter *filesWriter // file write
files []*fileInfo
}
func newFileRestorer(packLoader func(ctx context.Context, h restic.Handle, length int, offset int64, fn func(rd io.Reader) error) error, key *crypto.Key, idx filePackTraverser) *fileRestorer {
return &fileRestorer{
packLoader: packLoader,
key: key,
idx: idx,
filesWriter: newFilesWriter(filesWriterCount),
packCache: newPackCache(packCacheCapacity),
}
}
func (r *fileRestorer) addFile(path string, content restic.IDs) {
r.files = append(r.files, &fileInfo{path: path, blobs: content})
}
// used to pass information among workers (wish golang channels allowed multivalues)
type processingInfo struct {
pack *packInfo
files map[*fileInfo]error
}
func (r *fileRestorer) restoreFiles(ctx context.Context, onError func(path string, err error)) error {
for _, file := range r.files {
dbgmsg := file.path + ": "
r.idx.forEachFilePack(file, func(packIdx int, packID restic.ID, packBlobs []restic.Blob) bool {
if packIdx > 0 {
dbgmsg += ", "
}
dbgmsg += "pack{id=" + packID.Str() + ", blobs: "
for blobIdx, blob := range packBlobs {
if blobIdx > 0 {
dbgmsg += ", "
}
dbgmsg += blob.ID.Str()
}
dbgmsg += "}"
return true // keep going
})
debug.Log(dbgmsg)
}
// synchronously create empty files (empty files need no packs and are ignored by packQueue)
for _, file := range r.files {
if len(file.blobs) == 0 {
wr, err := os.OpenFile(file.path, os.O_CREATE|os.O_TRUNC|os.O_WRONLY, 0600)
if err == nil {
wr.Close()
} else {
onError(file.path, err)
}
}
}
inprogress := make(map[*fileInfo]struct{})
queue, err := newPackQueue(r.idx, r.files, func(files map[*fileInfo]struct{}) bool {
for file := range files {
if _, found := inprogress[file]; found {
return true
}
}
return false
})
if err != nil {
return err
}
// workers
downloadCh := make(chan processingInfo)
feedbackCh := make(chan processingInfo)
defer close(downloadCh)
defer close(feedbackCh)
worker := func() {
for {
select {
case <-ctx.Done():
return
case request, ok := <-downloadCh:
if !ok {
return // channel closed
}
rd, err := r.downloadPack(ctx, request.pack)
if err == nil {
r.processPack(ctx, request, rd)
} else {
// mark all files as failed
for file := range request.files {
request.files[file] = err
}
}
feedbackCh <- request
}
}
}
for i := 0; i < workerCount; i++ {
go worker()
}
processFeedback := func(pack *packInfo, ferrors map[*fileInfo]error) {
// update files blobIdx
// must do it here to avoid race among worker and processing feedback threads
var success []*fileInfo
var failure []*fileInfo
for file, ferr := range ferrors {
if ferr != nil {
onError(file.path, ferr)
r.filesWriter.close(file)
delete(inprogress, file)
failure = append(failure, file)
} else {
r.idx.forEachFilePack(file, func(packIdx int, packID restic.ID, packBlobs []restic.Blob) bool {
file.blobs = file.blobs[len(packBlobs):]
return false // only interesed in the first pack
})
if len(file.blobs) == 0 {
r.filesWriter.close(file)
delete(inprogress, file)
}
success = append(success, file)
}
}
// update the queue and requeueu the pack as necessary
if !queue.requeuePack(pack, success, failure) {
r.packCache.remove(pack.id)
debug.Log("Purged used up pack %s from pack cache", pack.id.Str())
}
}
// the main restore loop
for !queue.isEmpty() {
debug.Log("-----------------------------------")
pack, files := queue.nextPack()
if pack != nil {
ferrors := make(map[*fileInfo]error)
for _, file := range files {
ferrors[file] = nil
inprogress[file] = struct{}{}
}
select {
case <-ctx.Done():
return ctx.Err()
case downloadCh <- processingInfo{pack: pack, files: ferrors}:
debug.Log("Scheduled download pack %s (%d files)", pack.id.Str(), len(files))
case feedback := <-feedbackCh:
queue.requeuePack(pack, []*fileInfo{}, []*fileInfo{}) // didn't use the pack during this iteration
processFeedback(feedback.pack, feedback.files)
}
} else {
select {
case <-ctx.Done():
return ctx.Err()
case feedback := <-feedbackCh:
processFeedback(feedback.pack, feedback.files)
}
}
}
return nil
}
func (r *fileRestorer) downloadPack(ctx context.Context, pack *packInfo) (readerAtCloser, error) {
const MaxInt64 = 1<<63 - 1 // odd Go does not have this predefined somewhere
// calculate pack byte range
start, end := int64(MaxInt64), int64(0)
for file := range pack.files {
r.idx.forEachFilePack(file, func(packIdx int, packID restic.ID, packBlobs []restic.Blob) bool {
if packID.Equal(pack.id) {
for _, blob := range packBlobs {
if start > int64(blob.Offset) {
start = int64(blob.Offset)
}
if end < int64(blob.Offset+blob.Length) {
end = int64(blob.Offset + blob.Length)
}
}
}
return true // keep going
})
}
packReader, err := r.packCache.get(pack.id, start, int(end-start), func(offset int64, length int, wr io.WriteSeeker) error {
h := restic.Handle{Type: restic.DataFile, Name: pack.id.String()}
return r.packLoader(ctx, h, length, offset, func(rd io.Reader) error {
// reset the file in case of a download retry
_, err := wr.Seek(0, io.SeekStart)
if err != nil {
return err
}
len, err := io.Copy(wr, rd)
if err != nil {
return err
}
if len != int64(length) {
return errors.Errorf("unexpected pack size: expected %d but got %d", length, len)
}
return nil
})
})
if err != nil {
return nil, err
}
return packReader, nil
}
func (r *fileRestorer) processPack(ctx context.Context, request processingInfo, rd readerAtCloser) {
defer rd.Close()
for file := range request.files {
r.idx.forEachFilePack(file, func(packIdx int, packID restic.ID, packBlobs []restic.Blob) bool {
for _, blob := range packBlobs {
debug.Log("Writing blob %s (%d bytes) from pack %s to %s", blob.ID.Str(), blob.Length, packID.Str(), file.path)
buf, err := r.loadBlob(rd, blob)
if err == nil {
err = r.filesWriter.writeToFile(file, buf)
}
if err != nil {
request.files[file] = err
break // could not restore the file
}
}
return false
})
}
}
func (r *fileRestorer) loadBlob(rd io.ReaderAt, blob restic.Blob) ([]byte, error) {
// TODO reconcile with Repository#loadBlob implementation
buf := make([]byte, blob.Length)
n, err := rd.ReadAt(buf, int64(blob.Offset))
if err != nil {
return nil, err
}
if n != int(blob.Length) {
return nil, errors.Errorf("error loading blob %v: wrong length returned, want %d, got %d", blob.ID.Str(), blob.Length, n)
}
// decrypt
nonce, ciphertext := buf[:r.key.NonceSize()], buf[r.key.NonceSize():]
plaintext, err := r.key.Open(ciphertext[:0], nonce, ciphertext, nil)
if err != nil {
return nil, errors.Errorf("decrypting blob %v failed: %v", blob.ID, err)
}
// check hash
if !restic.Hash(plaintext).Equal(blob.ID) {
return nil, errors.Errorf("blob %v returned invalid hash", blob.ID)
}
return plaintext, nil
}

View file

@ -0,0 +1,226 @@
package restorer
import (
"bytes"
"context"
"io"
"io/ioutil"
"testing"
"github.com/restic/restic/internal/crypto"
"github.com/restic/restic/internal/errors"
"github.com/restic/restic/internal/restic"
rtest "github.com/restic/restic/internal/test"
)
///////////////////////////////////////////////////////////////////////////////
// test helpers (TODO move to a dedicated file?)
///////////////////////////////////////////////////////////////////////////////
type _Blob struct {
data string
pack string
}
type _File struct {
name string
blobs []_Blob
}
type _TestData struct {
key *crypto.Key
// pack names and ids
packsNameToID map[string]restic.ID
packsIDToName map[restic.ID]string
packsIDToData map[restic.ID][]byte
// blobs and files
blobs map[restic.ID][]restic.PackedBlob
files []*fileInfo
filesPathToContent map[string]string
//
loader func(ctx context.Context, h restic.Handle, length int, offset int64, fn func(rd io.Reader) error) error
//
idx filePackTraverser
}
func (i *_TestData) Lookup(blobID restic.ID, _ restic.BlobType) ([]restic.PackedBlob, bool) {
packs, found := i.blobs[blobID]
return packs, found
}
func (i *_TestData) packName(pack *packInfo) string {
return i.packsIDToName[pack.id]
}
func (i *_TestData) packID(name string) restic.ID {
return i.packsNameToID[name]
}
func (i *_TestData) pack(queue *packQueue, name string) *packInfo {
id := i.packsNameToID[name]
return queue.packs[id]
}
func (i *_TestData) fileContent(file *fileInfo) string {
return i.filesPathToContent[file.path]
}
func _newTestData(_files []_File) *_TestData {
type _Pack struct {
name string
data []byte
blobs map[restic.ID]restic.Blob
}
_packs := make(map[string]_Pack)
key := crypto.NewRandomKey()
seal := func(data []byte) []byte {
ciphertext := restic.NewBlobBuffer(len(data))
ciphertext = ciphertext[:0] // TODO what does this actually do?
nonce := crypto.NewRandomNonce()
ciphertext = append(ciphertext, nonce...)
return key.Seal(ciphertext, nonce, data, nil)
}
filesPathToContent := make(map[string]string)
for _, _file := range _files {
var content string
for _, _blob := range _file.blobs {
content += _blob.data
// get the pack, create as necessary
var _pack _Pack
var found bool // TODO is there more concise way of doing this in go?
if _pack, found = _packs[_blob.pack]; !found {
_pack = _Pack{name: _blob.pack, blobs: make(map[restic.ID]restic.Blob)}
}
// calculate blob id and add to the pack as necessary
_blobID := restic.Hash([]byte(_blob.data))
if _, found := _pack.blobs[_blobID]; !found {
_blobData := seal([]byte(_blob.data))
_pack.blobs[_blobID] = restic.Blob{
Type: restic.DataBlob,
ID: _blobID,
Length: uint(len(_blobData)), // XXX is Length encrypted or plaintext?
Offset: uint(len(_pack.data)),
}
_pack.data = append(_pack.data, _blobData...)
}
_packs[_blob.pack] = _pack
}
filesPathToContent[_file.name] = content
}
blobs := make(map[restic.ID][]restic.PackedBlob)
packsIDToName := make(map[restic.ID]string)
packsIDToData := make(map[restic.ID][]byte)
packsNameToID := make(map[string]restic.ID)
for _, _pack := range _packs {
_packID := restic.Hash(_pack.data)
packsIDToName[_packID] = _pack.name
packsIDToData[_packID] = _pack.data
packsNameToID[_pack.name] = _packID
for blobID, blob := range _pack.blobs {
blobs[blobID] = append(blobs[blobID], restic.PackedBlob{Blob: blob, PackID: _packID})
}
}
var files []*fileInfo
for _, _file := range _files {
content := restic.IDs{}
for _, _blob := range _file.blobs {
content = append(content, restic.Hash([]byte(_blob.data)))
}
files = append(files, &fileInfo{path: _file.name, blobs: content})
}
_data := &_TestData{
key: key,
packsIDToName: packsIDToName,
packsIDToData: packsIDToData,
packsNameToID: packsNameToID,
blobs: blobs,
files: files,
filesPathToContent: filesPathToContent,
}
_data.idx = filePackTraverser{lookup: _data.Lookup}
_data.loader = func(ctx context.Context, h restic.Handle, length int, offset int64, fn func(rd io.Reader) error) error {
packID, err := restic.ParseID(h.Name)
if err != nil {
return err
}
rd := bytes.NewReader(_data.packsIDToData[packID][int(offset) : int(offset)+length])
return fn(rd)
}
return _data
}
func restoreAndVerify(t *testing.T, _files []_File) {
test := _newTestData(_files)
r := newFileRestorer(test.loader, test.key, test.idx)
r.files = test.files
r.restoreFiles(context.TODO(), func(path string, err error) {
rtest.OK(t, errors.Wrapf(err, "unexpected error"))
})
for _, file := range test.files {
data, err := ioutil.ReadFile(file.path)
if err != nil {
t.Errorf("unable to read file %v: %v", file.path, err)
continue
}
rtest.Equals(t, false, r.filesWriter.writers.Contains(file.path))
content := test.fileContent(file)
if !bytes.Equal(data, []byte(content)) {
t.Errorf("file %v has wrong content: want %q, got %q", file.path, content, data)
}
}
rtest.OK(t, nil)
}
func TestFileRestorer_basic(t *testing.T) {
tempdir, cleanup := rtest.TempDir(t)
defer cleanup()
restoreAndVerify(t, []_File{
_File{
name: tempdir + "/file1",
blobs: []_Blob{
_Blob{"data1-1", "pack1-1"},
_Blob{"data1-2", "pack1-2"},
},
},
_File{
name: tempdir + "/file2",
blobs: []_Blob{
_Blob{"data2-1", "pack2-1"},
_Blob{"data2-2", "pack2-2"},
},
},
})
}
func TestFileRestorer_emptyFile(t *testing.T) {
tempdir, cleanup := rtest.TempDir(t)
defer cleanup()
restoreAndVerify(t, []_File{
_File{
name: tempdir + "/empty",
blobs: []_Blob{},
},
})
}

View file

@ -0,0 +1,69 @@
package restorer
import (
"io"
"os"
"sync"
"github.com/hashicorp/golang-lru/simplelru"
"github.com/restic/restic/internal/debug"
"github.com/restic/restic/internal/errors"
)
type filesWriter struct {
lock sync.Mutex // guards concurrent access
inprogress map[*fileInfo]struct{} // (logically) opened file writers
writers simplelru.LRUCache // key: *fileInfo, value: *os.File
}
func newFilesWriter(count int) *filesWriter {
writers, _ := simplelru.NewLRU(count, func(key interface{}, value interface{}) {
value.(*os.File).Close()
debug.Log("Closed and purged cached writer for %v", key)
})
return &filesWriter{inprogress: make(map[*fileInfo]struct{}), writers: writers}
}
func (w *filesWriter) writeToFile(file *fileInfo, buf []byte) error {
acquireWriter := func() (io.Writer, error) {
w.lock.Lock()
defer w.lock.Unlock()
if wr, ok := w.writers.Get(file); ok {
debug.Log("Used cached writer for %s", file.path)
return wr.(*os.File), nil
}
var flags int
if _, append := w.inprogress[file]; append {
flags = os.O_APPEND | os.O_WRONLY
} else {
w.inprogress[file] = struct{}{}
flags = os.O_CREATE | os.O_TRUNC | os.O_WRONLY
}
wr, err := os.OpenFile(file.path, flags, 0600)
if err != nil {
return nil, err
}
w.writers.Add(file, wr)
debug.Log("Opened and cached writer for %s", file.path)
return wr, nil
}
wr, err := acquireWriter()
if err != nil {
return err
}
n, err := wr.Write(buf)
if err != nil {
return err
}
if n != len(buf) {
return errors.Errorf("error writing file %v: wrong length written, want %d, got %d", file.path, len(buf), n)
}
return nil
}
func (w *filesWriter) close(file *fileInfo) {
w.lock.Lock()
defer w.lock.Unlock()
w.writers.Remove(file)
}

View file

@ -0,0 +1,238 @@
package restorer
import (
"io"
"sync"
"github.com/restic/restic/internal/debug"
"github.com/restic/restic/internal/errors"
"github.com/restic/restic/internal/restic"
)
// packCache is thread safe in-memory cache of pack files required to restore
// one or more files. The cache is meant to hold pack files that cannot be
// fully used right away. This happens when pack files contains blobs from
// "head" of some files and "middle" of other files. "Middle" blobs cannot be
// written to their files until after blobs from some other packs are written
// to the files first.
//
// While the cache is thread safe, implementation assumes (and enforces)
// that individual entries are used by one client at a time. Clients must
// #Close() entry's reader to make the entry available for use by other
// clients. This limitation can be relaxed in the future if necessary.
type packCache struct {
// guards access to cache internal data structures
lock sync.Mutex
// cache capacity
capacity int
reservedCapacity int
allocatedCapacity int
// pack records currently being used by active restore worker
reservedPacks map[restic.ID]*packCacheRecord
// unused allocated packs, can be deleted if necessary
cachedPacks map[restic.ID]*packCacheRecord
}
type packCacheRecord struct {
master *packCacheRecord
cache *packCache
id restic.ID // cached pack id
offset int64 // cached pack byte range
data []byte
}
type readerAtCloser interface {
io.Closer
io.ReaderAt
}
type bytesWriteSeeker struct {
pos int
data []byte
}
func (wr *bytesWriteSeeker) Write(p []byte) (n int, err error) {
if wr.pos+len(p) > len(wr.data) {
return -1, errors.Errorf("not enough space")
}
n = copy(wr.data[wr.pos:], p)
wr.pos += n
return n, nil
}
func (wr *bytesWriteSeeker) Seek(offset int64, whence int) (int64, error) {
if offset != 0 || whence != io.SeekStart {
return -1, errors.Errorf("unsupported seek request")
}
wr.pos = 0
return 0, nil
}
func newPackCache(capacity int) *packCache {
return &packCache{
capacity: capacity,
reservedPacks: make(map[restic.ID]*packCacheRecord),
cachedPacks: make(map[restic.ID]*packCacheRecord),
}
}
func (c *packCache) reserve(packID restic.ID, offset int64, length int) (record *packCacheRecord, err error) {
c.lock.Lock()
defer c.lock.Unlock()
if offset < 0 || length <= 0 {
return nil, errors.Errorf("illegal pack cache allocation range %s {offset: %d, length: %d}", packID.Str(), offset, length)
}
if c.reservedCapacity+length > c.capacity {
return nil, errors.Errorf("not enough cache capacity: requested %d, available %d", length, c.capacity-c.reservedCapacity)
}
if _, ok := c.reservedPacks[packID]; ok {
return nil, errors.Errorf("pack is already reserved %s", packID.Str())
}
// the pack is available in the cache and currently unused
if pack, ok := c.cachedPacks[packID]; ok {
// check if cached pack includes requested byte range
// the range can shrink, but it never grows bigger unless there is a bug elsewhere
if pack.offset > offset || (pack.offset+int64(len(pack.data))) < (offset+int64(length)) {
return nil, errors.Errorf("cached range %d-%d is smaller than requested range %d-%d for pack %s", pack.offset, pack.offset+int64(len(pack.data)), length, offset+int64(length), packID.Str())
}
// move the pack to the used map
delete(c.cachedPacks, packID)
c.reservedPacks[packID] = pack
c.reservedCapacity += len(pack.data)
debug.Log("Using cached pack %s (%d bytes)", pack.id.Str(), len(pack.data))
if pack.offset != offset || len(pack.data) != length {
// restrict returned record to requested range
return &packCacheRecord{
cache: c,
master: pack,
offset: offset,
data: pack.data[int(offset-pack.offset) : int(offset-pack.offset)+length],
}, nil
}
return pack, nil
}
for c.allocatedCapacity+length > c.capacity {
// all cached packs will be needed at some point
// so it does not matter which one to purge
for _, cached := range c.cachedPacks {
delete(c.cachedPacks, cached.id)
c.allocatedCapacity -= len(cached.data)
debug.Log("dropped cached pack %s (%d bytes)", cached.id.Str(), len(cached.data))
break
}
}
pack := &packCacheRecord{
cache: c,
id: packID,
offset: offset,
}
c.reservedPacks[pack.id] = pack
c.allocatedCapacity += length
c.reservedCapacity += length
return pack, nil
}
// get returns reader of the specified cached pack. Uses provided load func
// to download pack content if necessary.
// The returned reader is only able to read pack within byte range specified
// by offset and length parameters, attempts to read outside that range will
// result in an error.
// The returned reader must be closed before the same packID can be requested
// from the cache again.
func (c *packCache) get(packID restic.ID, offset int64, length int, load func(offset int64, length int, wr io.WriteSeeker) error) (readerAtCloser, error) {
pack, err := c.reserve(packID, offset, length)
if err != nil {
return nil, err
}
if pack.data == nil {
wr := &bytesWriteSeeker{data: make([]byte, length)}
err = load(offset, length, wr)
if err != nil {
return nil, err
}
if wr.pos != length {
delete(c.reservedPacks, pack.id)
c.reservedCapacity -= len(pack.data)
return nil, errors.Errorf("invalid read size")
}
pack.data = wr.data
debug.Log("Downloaded and cached pack %s (%d bytes)", pack.id.Str(), len(pack.data))
}
return pack, nil
}
// releases the pack record back to the cache
func (c *packCache) release(pack *packCacheRecord) error {
c.lock.Lock()
defer c.lock.Unlock()
if _, ok := c.reservedPacks[pack.id]; !ok {
return errors.Errorf("invalid pack release request")
}
delete(c.reservedPacks, pack.id)
c.cachedPacks[pack.id] = pack
c.reservedCapacity -= len(pack.data)
return nil
}
// remove removes specified pack from the cache and frees
// corresponding cache space. should be called after the pack
// was fully used up by the restorer.
func (c *packCache) remove(packID restic.ID) error {
c.lock.Lock()
defer c.lock.Unlock()
if _, ok := c.reservedPacks[packID]; ok {
return errors.Errorf("invalid pack remove request, pack %s is reserved", packID.Str())
}
pack, ok := c.cachedPacks[packID]
if !ok {
return errors.Errorf("invalid pack remove request, pack %s is not cached", packID.Str())
}
delete(c.cachedPacks, pack.id)
c.allocatedCapacity -= len(pack.data)
return nil
}
// ReadAt reads len(b) bytes from the pack starting at byte offset off.
// It returns the number of bytes read and the error, if any.
func (r *packCacheRecord) ReadAt(b []byte, off int64) (n int, err error) {
if off < r.offset || off+int64(len(b)) > r.offset+int64(len(r.data)) {
return -1, errors.Errorf("read outside available range")
}
return copy(b, r.data[off-r.offset:]), nil
}
// Close closes the pack reader and releases corresponding cache record
// to the cache. Once closed, the record can be reused by subsequent
// requests for the same packID or it can be purged from the cache to make
// room for other packs
func (r *packCacheRecord) Close() (err error) {
if r.master != nil {
return r.cache.release(r.master)
}
return r.cache.release(r)
}

View file

@ -0,0 +1,290 @@
package restorer
import (
"io"
"testing"
"github.com/restic/restic/internal/restic"
rtest "github.com/restic/restic/internal/test"
)
func assertNotOK(t *testing.T, msg string, err error) {
rtest.Assert(t, err != nil, msg+" did not fail")
}
func TestBytesWriterSeeker(t *testing.T) {
wr := &bytesWriteSeeker{data: make([]byte, 10)}
n, err := wr.Write([]byte{1, 2})
rtest.OK(t, err)
rtest.Equals(t, 2, n)
rtest.Equals(t, []byte{1, 2}, wr.data[0:2])
n64, err := wr.Seek(0, io.SeekStart)
rtest.OK(t, err)
rtest.Equals(t, int64(0), n64)
n, err = wr.Write([]byte{0, 1, 2, 3, 4})
rtest.OK(t, err)
rtest.Equals(t, 5, n)
n, err = wr.Write([]byte{5, 6, 7, 8, 9})
rtest.OK(t, err)
rtest.Equals(t, 5, n)
rtest.Equals(t, []byte{0, 1, 2, 3, 4, 5, 6, 7, 8, 9}, wr.data)
// negative tests
_, err = wr.Write([]byte{1})
assertNotOK(t, "write overflow", err)
_, err = wr.Seek(1, io.SeekStart)
assertNotOK(t, "unsupported seek", err)
}
func TestPackCache_basic(t *testing.T) {
assertReader := func(expected []byte, offset int64, rd io.ReaderAt) {
actual := make([]byte, len(expected))
rd.ReadAt(actual, offset)
rtest.Equals(t, expected, actual)
}
c := newPackCache(10)
id := restic.NewRandomID()
// load pack to the cache
rd, err := c.get(id, 10, 5, func(offset int64, length int, wr io.WriteSeeker) error {
rtest.Equals(t, int64(10), offset)
rtest.Equals(t, 5, length)
wr.Write([]byte{1, 2, 3, 4, 5})
return nil
})
rtest.OK(t, err)
assertReader([]byte{1, 2, 3, 4, 5}, 10, rd)
// must close pack reader before can request it again
_, err = c.get(id, 10, 5, func(offset int64, length int, wr io.WriteSeeker) error {
t.Error("unexpected cache load call")
return nil
})
assertNotOK(t, "double-reservation", err)
// close the pack reader and get it from cache
rd.Close()
rd, err = c.get(id, 10, 5, func(offset int64, length int, wr io.WriteSeeker) error {
t.Error("unexpected cache load call")
return nil
})
rtest.OK(t, err)
assertReader([]byte{1, 2, 3, 4, 5}, 10, rd)
// close the pack reader and remove the pack from cache, assert the pack is loaded on request
rd.Close()
c.remove(id)
rd, err = c.get(id, 10, 5, func(offset int64, length int, wr io.WriteSeeker) error {
rtest.Equals(t, int64(10), offset)
rtest.Equals(t, 5, length)
wr.Write([]byte{1, 2, 3, 4, 5})
return nil
})
rtest.OK(t, err)
assertReader([]byte{1, 2, 3, 4, 5}, 10, rd)
}
func TestPackCache_invalid_range(t *testing.T) {
c := newPackCache(10)
id := restic.NewRandomID()
_, err := c.get(id, -1, 1, func(offset int64, length int, wr io.WriteSeeker) error {
t.Error("unexpected cache load call")
return nil
})
assertNotOK(t, "negative offset request", err)
_, err = c.get(id, 0, 0, func(offset int64, length int, wr io.WriteSeeker) error {
t.Error("unexpected cache load call")
return nil
})
assertNotOK(t, "zero length request", err)
_, err = c.get(id, 0, -1, func(offset int64, length int, wr io.WriteSeeker) error {
t.Error("unexpected cache load call")
return nil
})
assertNotOK(t, "negative length", err)
}
func TestPackCache_capacity(t *testing.T) {
c := newPackCache(10)
id1, id2, id3 := restic.NewRandomID(), restic.NewRandomID(), restic.NewRandomID()
// load and reserve pack1
rd1, err := c.get(id1, 0, 5, func(offset int64, length int, wr io.WriteSeeker) error {
wr.Write([]byte{1, 2, 3, 4, 5})
return nil
})
rtest.OK(t, err)
// load and reserve pack2
_, err = c.get(id2, 0, 5, func(offset int64, length int, wr io.WriteSeeker) error {
wr.Write([]byte{1, 2, 3, 4, 5})
return nil
})
rtest.OK(t, err)
// can't load pack3 because not enough space in the cache
_, err = c.get(id3, 0, 5, func(offset int64, length int, wr io.WriteSeeker) error {
t.Error("unexpected cache load call")
return nil
})
assertNotOK(t, "request over capacity", err)
// release pack1 and try again
rd1.Close()
rd3, err := c.get(id3, 0, 5, func(offset int64, length int, wr io.WriteSeeker) error {
wr.Write([]byte{1, 2, 3, 4, 5})
return nil
})
rtest.OK(t, err)
// release pack3 and load pack1 (should not come from cache)
rd3.Close()
loaded := false
rd1, err = c.get(id1, 0, 5, func(offset int64, length int, wr io.WriteSeeker) error {
wr.Write([]byte{1, 2, 3, 4, 5})
loaded = true
return nil
})
rtest.OK(t, err)
rtest.Equals(t, true, loaded)
}
func TestPackCache_downsize_record(t *testing.T) {
c := newPackCache(10)
id := restic.NewRandomID()
// get bigger range first
rd, err := c.get(id, 5, 5, func(offset int64, length int, wr io.WriteSeeker) error {
wr.Write([]byte{1, 2, 3, 4, 5})
return nil
})
rtest.OK(t, err)
rd.Close()
// invalid "resize" requests
_, err = c.get(id, 5, 10, func(offset int64, length int, wr io.WriteSeeker) error {
t.Error("unexpected pack load")
return nil
})
assertNotOK(t, "resize cached record", err)
// invalid before cached range request
_, err = c.get(id, 0, 5, func(offset int64, length int, wr io.WriteSeeker) error {
t.Error("unexpected pack load")
return nil
})
assertNotOK(t, "before cached range request", err)
// invalid after cached range request
_, err = c.get(id, 10, 5, func(offset int64, length int, wr io.WriteSeeker) error {
t.Error("unexpected pack load")
return nil
})
assertNotOK(t, "after cached range request", err)
// now get smaller "nested" range
rd, err = c.get(id, 7, 1, func(offset int64, length int, wr io.WriteSeeker) error {
t.Error("unexpected pack load")
return nil
})
rtest.OK(t, err)
// assert expected data
buf := make([]byte, 1)
rd.ReadAt(buf, 7)
rtest.Equals(t, byte(3), buf[0])
_, err = rd.ReadAt(buf, 0)
assertNotOK(t, "read before downsized pack range", err)
_, err = rd.ReadAt(buf, 9)
assertNotOK(t, "read after downsized pack range", err)
// can't request downsized record again
_, err = c.get(id, 7, 1, func(offset int64, length int, wr io.WriteSeeker) error {
t.Error("unexpected pack load")
return nil
})
assertNotOK(t, "double-allocation of cache record subrange", err)
// can't request another subrange of the original record
_, err = c.get(id, 6, 1, func(offset int64, length int, wr io.WriteSeeker) error {
t.Error("unexpected pack load")
return nil
})
assertNotOK(t, "allocation of another subrange of cache record", err)
// release downsized record and assert the original is back in the cache
rd.Close()
rd, err = c.get(id, 5, 5, func(offset int64, length int, wr io.WriteSeeker) error {
t.Error("unexpected pack load")
return nil
})
rtest.OK(t, err)
rd.Close()
}
func TestPackCache_wrong_load_size(t *testing.T) {
c := newPackCache(10)
_, err := c.get(restic.NewRandomID(), 0, 5, func(offset int64, length int, wr io.WriteSeeker) error {
wr.Write([]byte{1})
return nil
})
assertNotOK(t, "not enough bytes read", err)
_, err = c.get(restic.NewRandomID(), 0, 5, func(offset int64, length int, wr io.WriteSeeker) error {
wr.Write([]byte{1, 2, 3, 4, 5, 6})
return nil
})
assertNotOK(t, "too many bytes read", err)
}
func TestPackCache_invalidRequests(t *testing.T) {
c := newPackCache(10)
id := restic.NewRandomID()
//
rd, _ := c.get(id, 0, 1, func(offset int64, length int, wr io.WriteSeeker) error {
wr.Write([]byte{1})
return nil
})
assertNotOK(t, "remove() reserved pack", c.remove(id))
rtest.OK(t, rd.Close())
assertNotOK(t, "multiple reader Close() calls)", rd.Close())
//
rtest.OK(t, c.remove(id))
assertNotOK(t, "double remove() the same pack", c.remove(id))
}
func TestPackCacheRecord(t *testing.T) {
rd := &packCacheRecord{
offset: 10,
data: []byte{1},
}
buf := make([]byte, 1)
n, err := rd.ReadAt(buf, 10)
rtest.OK(t, err)
rtest.Equals(t, 1, n)
rtest.Equals(t, byte(1), buf[0])
_, err = rd.ReadAt(buf, 0)
assertNotOK(t, "read before loaded range", err)
_, err = rd.ReadAt(buf, 11)
assertNotOK(t, "read after loaded range", err)
_, err = rd.ReadAt(make([]byte, 2), 10)
assertNotOK(t, "read more than available data", err)
}

View file

@ -0,0 +1,51 @@
package restorer
// packHeap is a heap of packInfo references
// @see https://golang.org/pkg/container/heap/
// @see https://en.wikipedia.org/wiki/Heap_(data_structure)
type packHeap struct {
elements []*packInfo
// returns true if download of any of the files is in progress
inprogress func(files map[*fileInfo]struct{}) bool
}
func (pq *packHeap) Len() int { return len(pq.elements) }
func (pq *packHeap) Less(a, b int) bool {
packA, packB := pq.elements[a], pq.elements[b]
ap := pq.inprogress(packA.files)
bp := pq.inprogress(packB.files)
if ap && !bp {
return true
}
if packA.cost < packB.cost {
return true
}
return false
}
func (pq *packHeap) Swap(i, j int) {
pq.elements[i], pq.elements[j] = pq.elements[j], pq.elements[i]
pq.elements[i].index = i
pq.elements[j].index = j
}
func (pq *packHeap) Push(x interface{}) {
n := len(pq.elements)
item := x.(*packInfo)
item.index = n
pq.elements = append(pq.elements, item)
}
func (pq *packHeap) Pop() interface{} {
old := pq.elements
n := len(old)
item := old[n-1]
item.index = -1 // for safety
pq.elements = old[0 : n-1]
return item
}

View file

@ -0,0 +1,224 @@
package restorer
import (
"container/heap"
"github.com/restic/restic/internal/debug"
"github.com/restic/restic/internal/restic"
)
// packQueue tracks remaining file contents restore work and decides what pack
// to download and files to write next.
//
// The packs in the queue can be in one of three states: waiting, ready and
// in-progress.
// Waiting packs are the packs that only have blobs from the "middle" of their
// corresponding files and therefore cannot be used until blobs from some other
// packs are written to the files first.
// In-progress packs are the packs that were removed from the queue by #nextPack
// and must be first returned to the queue before they are considered again.
// Ready packs are the packs can be immediately used to restore at least one
// file. Internally ready packs are kept in a heap and are ordered according
// to these criteria:
// - Packs with "head" blobs of in-progress files are considered first. The
// idea is to complete restore of in-progress files before starting restore
// of other files. This is both more intuitive and also reduces number of
// open file handles needed during restore.
// - Packs with smallest cost are considered next. Pack cost is measured in
// number of other packs required before all blobs in the pack can be used
// and the pack can be removed from the pack cache.
// For example, consisder a file that requires two blobs, blob1 from pack1
// and blob2 from pack2. The cost of pack2 is 1, because blob2 cannot be
// used before blob1 is available. The higher the cost, the longer the pack
// must be cached locally to avoid redownload.
//
// Pack queue implementation is NOT thread safe. All pack queue methods must
// be called from single gorouting AND packInfo and fileInfo instances must
// be updated synchronously from the same gorouting.
type packQueue struct {
idx filePackTraverser
packs map[restic.ID]*packInfo // waiting and ready packs
inprogress map[*packInfo]struct{} // inprogress packs
heap *packHeap // heap of ready packs
}
func newPackQueue(idx filePackTraverser, files []*fileInfo, inprogress func(files map[*fileInfo]struct{}) bool) (*packQueue, error) {
packs := make(map[restic.ID]*packInfo) // all packs
// create packInfo from fileInfo
for _, file := range files {
err := idx.forEachFilePack(file, func(packIdx int, packID restic.ID, _ []restic.Blob) bool {
pack, ok := packs[packID]
if !ok {
pack = &packInfo{
id: packID,
index: -1,
files: make(map[*fileInfo]struct{}),
}
packs[packID] = pack
}
pack.files[file] = struct{}{}
pack.cost += packIdx
return true // keep going
})
if err != nil {
// repository index is messed up, can't do anything
return nil, err
}
}
// create packInfo heap
pheap := &packHeap{inprogress: inprogress}
headPacks := restic.NewIDSet()
for _, file := range files {
idx.forEachFilePack(file, func(packIdx int, packID restic.ID, _ []restic.Blob) bool {
if !headPacks.Has(packID) {
headPacks.Insert(packID)
pack := packs[packID]
pack.index = len(pheap.elements)
pheap.elements = append(pheap.elements, pack)
}
return false // only first pack
})
}
heap.Init(pheap)
return &packQueue{idx: idx, packs: packs, heap: pheap, inprogress: make(map[*packInfo]struct{})}, nil
}
// isEmpty returns true if the queue is empty, i.e. there are no more packs to
// download and files to write to.
func (h *packQueue) isEmpty() bool {
return len(h.packs) == 0 && len(h.inprogress) == 0
}
// nextPack returns next ready pack and corresponding files ready for download
// and processing. The returned pack and the files are marked as "in progress"
// internally and must be first returned to the queue before they are
// considered by #nextPack again.
func (h *packQueue) nextPack() (*packInfo, []*fileInfo) {
debug.Log("Ready packs %d, outstanding packs %d, inprogress packs %d", h.heap.Len(), len(h.packs), len(h.inprogress))
if h.heap.Len() == 0 {
return nil, nil
}
pack := heap.Pop(h.heap).(*packInfo)
h.inprogress[pack] = struct{}{}
debug.Log("Popped pack %s (%d files), heap size=%d", pack.id.Str(), len(pack.files), len(h.heap.elements))
var files []*fileInfo
for file := range pack.files {
h.idx.forEachFilePack(file, func(packIdx int, packID restic.ID, packBlobs []restic.Blob) bool {
debug.Log("Pack #%d %s (%d blobs) used by %s", packIdx, packID.Str(), len(packBlobs), file.path)
if pack.id == packID {
files = append(files, file)
}
return false // only interested in the fist pack here
})
}
return pack, files
}
// requeuePack conditionally adds back to the queue pack previously returned by
// #nextPack.
// If the pack is needed to restore any incomplete files, adds the pack to the
// queue and adjusts order of all affected packs in the queue. Has no effect
// if the pack is not required to restore any files.
// Returns true if the pack was added to the queue, false otherwise.
func (h *packQueue) requeuePack(pack *packInfo, success []*fileInfo, failure []*fileInfo) bool {
debug.Log("Requeue pack %s (%d/%d/%d files/success/failure)", pack.id.Str(), len(pack.files), len(success), len(failure))
// maintain inprogress pack set
delete(h.inprogress, pack)
affectedPacks := make(map[*packInfo]struct{})
affectedPacks[pack] = struct{}{} // this pack is alwats affected
// apply download success/failure to the packs
onFailure := func(file *fileInfo) {
h.idx.forEachFilePack(file, func(packInx int, packID restic.ID, _ []restic.Blob) bool {
pack := h.packs[packID]
delete(pack.files, file)
pack.cost -= packInx
affectedPacks[pack] = struct{}{}
return true // keep going
})
}
for _, file := range failure {
onFailure(file)
}
onSuccess := func(pack *packInfo, file *fileInfo) {
remove := true
h.idx.forEachFilePack(file, func(packIdx int, packID restic.ID, _ []restic.Blob) bool {
if packID.Equal(pack.id) {
// the pack has more blobs required by the file
remove = false
}
otherPack := h.packs[packID]
otherPack.cost--
affectedPacks[otherPack] = struct{}{}
return true // keep going
})
if remove {
delete(pack.files, file)
}
}
for _, file := range success {
onSuccess(pack, file)
}
// drop/update affected packs
isReady := func(affectedPack *packInfo) (ready bool) {
for file := range affectedPack.files {
h.idx.forEachFilePack(file, func(packIdx int, packID restic.ID, _ []restic.Blob) bool {
if packID.Equal(affectedPack.id) {
ready = true
}
return false // only file's first pack matters
})
if ready {
break
}
}
return ready
}
for affectedPack := range affectedPacks {
if _, inprogress := h.inprogress[affectedPack]; !inprogress {
if len(affectedPack.files) == 0 {
// drop the pack if it isn't inprogress and has no files that need it
if affectedPack.index >= 0 {
// This can't happen unless there is a bug elsewhere:
// - "current" pack isn't in the heap, hence its index must be < 0
// - "other" packs can't be ready (i.e. in heap) unless they have other files
// in which case len(affectedPack.files) must be > 0
debug.Log("corrupted ready heap: removed unexpected ready pack %s", affectedPack.id.Str())
heap.Remove(h.heap, affectedPack.index)
}
delete(h.packs, affectedPack.id)
} else {
ready := isReady(affectedPack)
switch {
case ready && affectedPack.index < 0:
heap.Push(h.heap, affectedPack)
case ready && affectedPack.index >= 0:
heap.Fix(h.heap, affectedPack.index)
case !ready && affectedPack.index >= 0:
// This can't happen unless there is a bug elsewhere:
// - "current" pack isn't in the heap, hence its index must be < 0
// - "other" packs can't have same head blobs as the "current" pack,
// hence "other" packs can't change their readiness
debug.Log("corrupted ready heap: removed unexpected waiting pack %s", affectedPack.id.Str())
heap.Remove(h.heap, affectedPack.index)
case !ready && affectedPack.index < 0:
// do nothing
}
}
}
}
return len(pack.files) > 0
}

View file

@ -0,0 +1,236 @@
package restorer
import (
"testing"
"github.com/restic/restic/internal/restic"
rtest "github.com/restic/restic/internal/test"
)
func processPack(t *testing.T, data *_TestData, pack *packInfo, files []*fileInfo) {
for _, file := range files {
data.idx.forEachFilePack(file, func(packIdx int, packID restic.ID, packBlobs []restic.Blob) bool {
// assert file's head pack
rtest.Equals(t, pack.id, packID)
file.blobs = file.blobs[len(packBlobs):]
return false // only interested in the head pack
})
}
}
func TestPackQueue_basic(t *testing.T) {
data := _newTestData([]_File{
_File{
name: "file",
blobs: []_Blob{
_Blob{"data1", "pack1"},
_Blob{"data2", "pack2"},
},
},
})
queue, err := newPackQueue(data.idx, data.files, func(_ map[*fileInfo]struct{}) bool { return false })
rtest.OK(t, err)
// assert initial queue state
rtest.Equals(t, false, queue.isEmpty())
rtest.Equals(t, 0, queue.packs[data.packID("pack1")].cost)
rtest.Equals(t, 1, queue.packs[data.packID("pack2")].cost)
// get first pack
pack, files := queue.nextPack()
rtest.Equals(t, "pack1", data.packName(pack))
rtest.Equals(t, 1, len(files))
rtest.Equals(t, false, queue.isEmpty())
// TODO assert pack is inprogress
// can't process the second pack until the first one is processed
{
pack, files := queue.nextPack()
rtest.Equals(t, true, pack == nil)
rtest.Equals(t, true, files == nil)
rtest.Equals(t, false, queue.isEmpty())
}
// requeue the pack without processing
rtest.Equals(t, true, queue.requeuePack(pack, []*fileInfo{}, []*fileInfo{}))
rtest.Equals(t, false, queue.isEmpty())
rtest.Equals(t, 0, queue.packs[data.packID("pack1")].cost)
rtest.Equals(t, 1, queue.packs[data.packID("pack2")].cost)
// get the first pack again
pack, files = queue.nextPack()
rtest.Equals(t, "pack1", data.packName(pack))
rtest.Equals(t, 1, len(files))
rtest.Equals(t, false, queue.isEmpty())
// process the first pack and return it to the queue
processPack(t, data, pack, files)
rtest.Equals(t, false, queue.requeuePack(pack, files, []*fileInfo{}))
rtest.Equals(t, 0, queue.packs[data.packID("pack2")].cost)
// get the second pack
pack, files = queue.nextPack()
rtest.Equals(t, "pack2", data.packName(pack))
rtest.Equals(t, 1, len(files))
rtest.Equals(t, false, queue.isEmpty())
// process the second pack and return it to the queue
processPack(t, data, pack, files)
rtest.Equals(t, false, queue.requeuePack(pack, files, []*fileInfo{}))
// all packs processed
rtest.Equals(t, true, queue.isEmpty())
}
func TestPackQueue_failedFile(t *testing.T) {
// point of this test is to assert that enqueuePack removes
// all references to files that failed restore
data := _newTestData([]_File{
_File{
name: "file",
blobs: []_Blob{
_Blob{"data1", "pack1"},
_Blob{"data2", "pack2"},
},
},
})
queue, err := newPackQueue(data.idx, data.files, func(_ map[*fileInfo]struct{}) bool { return false })
rtest.OK(t, err)
pack, files := queue.nextPack()
rtest.Equals(t, false, queue.requeuePack(pack, []*fileInfo{}, files /*failed*/))
rtest.Equals(t, true, queue.isEmpty())
}
func TestPackQueue_ordering_cost(t *testing.T) {
// assert pack1 is selected before pack2:
// pack1 is ready to restore file1, pack2 is ready to restore file2
// but pack2 cannot be immediately used to restore file1
data := _newTestData([]_File{
_File{
name: "file1",
blobs: []_Blob{
_Blob{"data1", "pack1"},
_Blob{"data2", "pack2"},
},
},
_File{
name: "file2",
blobs: []_Blob{
_Blob{"data2", "pack2"},
},
},
})
queue, err := newPackQueue(data.idx, data.files, func(_ map[*fileInfo]struct{}) bool { return false })
rtest.OK(t, err)
// assert initial pack costs
rtest.Equals(t, 0, data.pack(queue, "pack1").cost)
rtest.Equals(t, 0, data.pack(queue, "pack1").index) // head of the heap
rtest.Equals(t, 1, data.pack(queue, "pack2").cost)
rtest.Equals(t, 1, data.pack(queue, "pack2").index)
pack, files := queue.nextPack()
// assert selected pack and queue state
rtest.Equals(t, "pack1", data.packName(pack))
// process the pack
processPack(t, data, pack, files)
rtest.Equals(t, false, queue.requeuePack(pack, files, []*fileInfo{}))
}
func TestPackQueue_ordering_inprogress(t *testing.T) {
// finish restoring one file before starting another
data := _newTestData([]_File{
_File{
name: "file1",
blobs: []_Blob{
_Blob{"data1-1", "pack1-1"},
_Blob{"data1-2", "pack1-2"},
},
},
_File{
name: "file2",
blobs: []_Blob{
_Blob{"data2-1", "pack2-1"},
_Blob{"data2-2", "pack2-2"},
},
},
})
var inprogress *fileInfo
queue, err := newPackQueue(data.idx, data.files, func(files map[*fileInfo]struct{}) bool {
_, found := files[inprogress]
return found
})
rtest.OK(t, err)
// first pack of a file
pack, files := queue.nextPack()
rtest.Equals(t, 1, len(files))
file := files[0]
processPack(t, data, pack, files)
inprogress = files[0]
queue.requeuePack(pack, files, []*fileInfo{})
// second pack of the same file
pack, files = queue.nextPack()
rtest.Equals(t, 1, len(files))
rtest.Equals(t, true, file == files[0]) // same file as before
processPack(t, data, pack, files)
inprogress = nil
queue.requeuePack(pack, files, []*fileInfo{})
// first pack of the second file
pack, files = queue.nextPack()
rtest.Equals(t, 1, len(files))
rtest.Equals(t, false, file == files[0]) // different file as before
}
func TestPackQueue_packMultiuse(t *testing.T) {
// the same pack is required multiple times to restore the same file
data := _newTestData([]_File{
_File{
name: "file",
blobs: []_Blob{
_Blob{"data1", "pack1"},
_Blob{"data2", "pack2"},
_Blob{"data3", "pack1"}, // pack1 reuse, new blob
_Blob{"data2", "pack2"}, // pack2 reuse, same blob
},
},
})
queue, err := newPackQueue(data.idx, data.files, func(_ map[*fileInfo]struct{}) bool { return false })
rtest.OK(t, err)
pack, files := queue.nextPack()
rtest.Equals(t, "pack1", data.packName(pack))
rtest.Equals(t, 1, len(pack.files))
processPack(t, data, pack, files)
rtest.Equals(t, true, queue.requeuePack(pack, files, []*fileInfo{}))
pack, files = queue.nextPack()
rtest.Equals(t, "pack2", data.packName(pack))
rtest.Equals(t, 1, len(pack.files))
processPack(t, data, pack, files)
rtest.Equals(t, true, queue.requeuePack(pack, files, []*fileInfo{}))
pack, files = queue.nextPack()
rtest.Equals(t, "pack1", data.packName(pack))
processPack(t, data, pack, files)
rtest.Equals(t, false, queue.requeuePack(pack, files, []*fileInfo{}))
pack, files = queue.nextPack()
rtest.Equals(t, "pack2", data.packName(pack))
processPack(t, data, pack, files)
rtest.Equals(t, false, queue.requeuePack(pack, files, []*fileInfo{}))
rtest.Equals(t, true, queue.isEmpty())
}

View file

@ -27,7 +27,8 @@ var restorerAbortOnAllErrors = func(str string, node *restic.Node, err error) er
// NewRestorer creates a restorer preloaded with the content from the snapshot id. // NewRestorer creates a restorer preloaded with the content from the snapshot id.
func NewRestorer(repo restic.Repository, id restic.ID) (*Restorer, error) { func NewRestorer(repo restic.Repository, id restic.ID) (*Restorer, error) {
r := &Restorer{ r := &Restorer{
repo: repo, Error: restorerAbortOnAllErrors, repo: repo,
Error: restorerAbortOnAllErrors,
SelectFilter: func(string, string, *restic.Node) (bool, bool) { return true, true }, SelectFilter: func(string, string, *restic.Node) (bool, bool) { return true, true },
} }
@ -164,7 +165,7 @@ func (res *Restorer) restoreNodeMetadataTo(node *restic.Node, target, location s
// RestoreTo creates the directories and files in the snapshot below dst. // RestoreTo creates the directories and files in the snapshot below dst.
// Before an item is created, res.Filter is called. // Before an item is created, res.Filter is called.
func (res *Restorer) RestoreTo(ctx context.Context, dst string) error { func (res *Restorer) RestoreTo(ctx context.Context, dst string, singlethreaded bool) error {
var err error var err error
if !filepath.IsAbs(dst) { if !filepath.IsAbs(dst) {
dst, err = filepath.Abs(dst) dst, err = filepath.Abs(dst)
@ -173,35 +174,101 @@ func (res *Restorer) RestoreTo(ctx context.Context, dst string) error {
} }
} }
// make sure the target directory exists restoreNodeMetadata := func(node *restic.Node, target, location string) error {
err = fs.MkdirAll(dst, 0777) // umask takes care of dir permissions return res.restoreNodeMetadataTo(node, target, location)
if err != nil {
return errors.Wrap(err, "MkdirAll")
} }
noop := func(node *restic.Node, target, location string) error { return nil }
idx := restic.NewHardlinkIndex() idx := restic.NewHardlinkIndex()
return res.traverseTree(ctx, dst, string(filepath.Separator), *res.sn.Tree, treeVisitor{ if singlethreaded {
return res.traverseTree(ctx, dst, string(filepath.Separator), *res.sn.Tree, treeVisitor{
enterDir: func(node *restic.Node, target, location string) error {
// create dir with default permissions
// #leaveDir restores dir metadata after visiting all children
return fs.MkdirAll(target, 0700)
},
visitNode: func(node *restic.Node, target, location string) error {
// create parent dir with default permissions
// #leaveDir restores dir metadata after visiting all children
err := fs.MkdirAll(filepath.Dir(target), 0700)
if err != nil {
return err
}
return res.restoreNodeTo(ctx, node, target, location, idx)
},
// Restore directory permissions and timestamp at the end. If we did it earlier
// - children restore could fail because of restictive directory permission
// - children restore could overwrite the timestamp of the directory they are in
leaveDir: restoreNodeMetadata,
})
}
filerestorer := newFileRestorer(res.repo.Backend().Load, res.repo.Key(), filePackTraverser{lookup: res.repo.Index().Lookup})
// path->node map, only needed to call res.Error, which uses the node during tests
nodes := make(map[string]*restic.Node)
// first tree pass: create directories and collect all files to restore
err = res.traverseTree(ctx, dst, string(filepath.Separator), *res.sn.Tree, treeVisitor{
enterDir: func(node *restic.Node, target, location string) error { enterDir: func(node *restic.Node, target, location string) error {
// create dir with default permissions // create dir with default permissions
// #leaveDir restores dir metadata after visiting all children // #leaveDir restores dir metadata after visiting all children
return fs.MkdirAll(target, 0700) return fs.MkdirAll(target, 0700)
}, },
visitNode: func(node *restic.Node, target, location string) error { visitNode: func(node *restic.Node, target, location string) error {
// create parent dir with default permissions // create parent dir with default permissions
// #leaveDir restores dir metadata after visiting all children // second pass #leaveDir restores dir metadata after visiting/restoring all children
err := fs.MkdirAll(filepath.Dir(target), 0700) err := fs.MkdirAll(filepath.Dir(target), 0700)
if err != nil { if err != nil {
return err return err
} }
return res.restoreNodeTo(ctx, node, target, location, idx) if node.Type != "file" {
return nil
}
if node.Links > 1 {
if idx.Has(node.Inode, node.DeviceID) {
return nil
}
idx.Add(node.Inode, node.DeviceID, target)
}
nodes[target] = node
filerestorer.addFile(target, node.Content)
return nil
}, },
leaveDir: func(node *restic.Node, target, location string) error { leaveDir: noop,
// Restore directory permissions and timestamp at the end. If we did it earlier })
// - children restore could fail because of restictive directory permission if err != nil {
// - children restore could overwrite the timestamp of the directory they are in return err
return res.restoreNodeMetadataTo(node, target, location) }
err = filerestorer.restoreFiles(ctx, func(path string, err error) { res.Error(path, nodes[path], err) })
if err != nil {
return err
}
// second tree pass: restore special files and filesystem metadata
return res.traverseTree(ctx, dst, string(filepath.Separator), *res.sn.Tree, treeVisitor{
enterDir: noop,
visitNode: func(node *restic.Node, target, location string) error {
isHardlink := func() bool {
return idx.Has(node.Inode, node.DeviceID) && idx.GetFilename(node.Inode, node.DeviceID) != target
}
if node.Type != "file" || isHardlink() {
return res.restoreNodeTo(ctx, node, target, location, idx)
}
return node.RestoreMetadata(target)
}, },
leaveDir: restoreNodeMetadata,
}) })
} }

View file

@ -340,7 +340,7 @@ func TestRestorer(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background()) ctx, cancel := context.WithCancel(context.Background())
defer cancel() defer cancel()
err = res.RestoreTo(ctx, tempdir) err = res.RestoreTo(ctx, tempdir, false)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
@ -446,7 +446,7 @@ func TestRestorerRelative(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background()) ctx, cancel := context.WithCancel(context.Background())
defer cancel() defer cancel()
err = res.RestoreTo(ctx, "restore") err = res.RestoreTo(ctx, "restore", false)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }