forked from TrueCloudLab/restic
restorer: convert to use StreamPack
This commit is contained in:
parent
f00f690658
commit
f40abd92fa
3 changed files with 40 additions and 111 deletions
|
@ -798,7 +798,7 @@ func StreamPack(ctx context.Context, beLoad BackendLoadFn, key *crypto.Key, pack
|
||||||
sort.Slice(blobs, func(i, j int) bool {
|
sort.Slice(blobs, func(i, j int) bool {
|
||||||
return blobs[i].Offset < blobs[j].Offset
|
return blobs[i].Offset < blobs[j].Offset
|
||||||
})
|
})
|
||||||
h := restic.Handle{Type: restic.PackFile, Name: packID.String()}
|
h := restic.Handle{Type: restic.PackFile, Name: packID.String(), ContainedBlobType: restic.DataBlob}
|
||||||
|
|
||||||
dataStart := blobs[0].Offset
|
dataStart := blobs[0].Offset
|
||||||
dataEnd := blobs[len(blobs)-1].Offset + blobs[len(blobs)-1].Length
|
dataEnd := blobs[len(blobs)-1].Offset + blobs[len(blobs)-1].Length
|
||||||
|
|
|
@ -1,12 +1,9 @@
|
||||||
package restorer
|
package restorer
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"bufio"
|
|
||||||
"context"
|
"context"
|
||||||
"io"
|
|
||||||
"math"
|
"math"
|
||||||
"path/filepath"
|
"path/filepath"
|
||||||
"sort"
|
|
||||||
"sync"
|
"sync"
|
||||||
|
|
||||||
"golang.org/x/sync/errgroup"
|
"golang.org/x/sync/errgroup"
|
||||||
|
@ -14,6 +11,7 @@ import (
|
||||||
"github.com/restic/restic/internal/crypto"
|
"github.com/restic/restic/internal/crypto"
|
||||||
"github.com/restic/restic/internal/debug"
|
"github.com/restic/restic/internal/debug"
|
||||||
"github.com/restic/restic/internal/errors"
|
"github.com/restic/restic/internal/errors"
|
||||||
|
"github.com/restic/restic/internal/repository"
|
||||||
"github.com/restic/restic/internal/restic"
|
"github.com/restic/restic/internal/restic"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -52,7 +50,7 @@ type packInfo struct {
|
||||||
type fileRestorer struct {
|
type fileRestorer struct {
|
||||||
key *crypto.Key
|
key *crypto.Key
|
||||||
idx func(restic.BlobHandle) []restic.PackedBlob
|
idx func(restic.BlobHandle) []restic.PackedBlob
|
||||||
packLoader func(ctx context.Context, h restic.Handle, length int, offset int64, fn func(rd io.Reader) error) error
|
packLoader repository.BackendLoadFn
|
||||||
|
|
||||||
filesWriter *filesWriter
|
filesWriter *filesWriter
|
||||||
|
|
||||||
|
@ -62,7 +60,7 @@ type fileRestorer struct {
|
||||||
}
|
}
|
||||||
|
|
||||||
func newFileRestorer(dst string,
|
func newFileRestorer(dst string,
|
||||||
packLoader func(ctx context.Context, h restic.Handle, length int, offset int64, fn func(rd io.Reader) error) error,
|
packLoader repository.BackendLoadFn,
|
||||||
key *crypto.Key,
|
key *crypto.Key,
|
||||||
idx func(restic.BlobHandle) []restic.PackedBlob) *fileRestorer {
|
idx func(restic.BlobHandle) []restic.PackedBlob) *fileRestorer {
|
||||||
|
|
||||||
|
@ -175,17 +173,14 @@ func (r *fileRestorer) restoreFiles(ctx context.Context) error {
|
||||||
return wg.Wait()
|
return wg.Wait()
|
||||||
}
|
}
|
||||||
|
|
||||||
const maxBufferSize = 4 * 1024 * 1024
|
|
||||||
|
|
||||||
func (r *fileRestorer) downloadPack(ctx context.Context, pack *packInfo) error {
|
func (r *fileRestorer) downloadPack(ctx context.Context, pack *packInfo) error {
|
||||||
|
|
||||||
// calculate pack byte range and blob->[]files->[]offsets mappings
|
// calculate pack byte range and blob->[]files->[]offsets mappings
|
||||||
start, end := int64(math.MaxInt64), int64(0)
|
start, end := int64(math.MaxInt64), int64(0)
|
||||||
blobs := make(map[restic.ID]struct {
|
blobs := make(map[restic.ID]struct {
|
||||||
offset int64 // offset of the blob in the pack
|
files map[*fileInfo][]int64 // file -> offsets (plural!) of the blob in the file
|
||||||
length int // length of the blob
|
|
||||||
files map[*fileInfo][]int64 // file -> offsets (plural!) of the blob in the file
|
|
||||||
})
|
})
|
||||||
|
var blobList []restic.Blob
|
||||||
for file := range pack.files {
|
for file := range pack.files {
|
||||||
addBlob := func(blob restic.Blob, fileOffset int64) {
|
addBlob := func(blob restic.Blob, fileOffset int64) {
|
||||||
if start > int64(blob.Offset) {
|
if start > int64(blob.Offset) {
|
||||||
|
@ -196,9 +191,8 @@ func (r *fileRestorer) downloadPack(ctx context.Context, pack *packInfo) error {
|
||||||
}
|
}
|
||||||
blobInfo, ok := blobs[blob.ID]
|
blobInfo, ok := blobs[blob.ID]
|
||||||
if !ok {
|
if !ok {
|
||||||
blobInfo.offset = int64(blob.Offset)
|
|
||||||
blobInfo.length = int(blob.Length)
|
|
||||||
blobInfo.files = make(map[*fileInfo][]int64)
|
blobInfo.files = make(map[*fileInfo][]int64)
|
||||||
|
blobList = append(blobList, blob)
|
||||||
blobs[blob.ID] = blobInfo
|
blobs[blob.ID] = blobInfo
|
||||||
}
|
}
|
||||||
blobInfo.files[file] = append(blobInfo.files[file], fileOffset)
|
blobInfo.files[file] = append(blobInfo.files[file], fileOffset)
|
||||||
|
@ -228,14 +222,6 @@ func (r *fileRestorer) downloadPack(ctx context.Context, pack *packInfo) error {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
sortedBlobs := make([]restic.ID, 0, len(blobs))
|
|
||||||
for blobID := range blobs {
|
|
||||||
sortedBlobs = append(sortedBlobs, blobID)
|
|
||||||
}
|
|
||||||
sort.Slice(sortedBlobs, func(i, j int) bool {
|
|
||||||
return blobs[sortedBlobs[i]].offset < blobs[sortedBlobs[j]].offset
|
|
||||||
})
|
|
||||||
|
|
||||||
sanitizeError := func(file *fileInfo, err error) error {
|
sanitizeError := func(file *fileInfo, err error) error {
|
||||||
if err != nil {
|
if err != nil {
|
||||||
err = r.Error(file.location, err)
|
err = r.Error(file.location, err)
|
||||||
|
@ -243,59 +229,39 @@ func (r *fileRestorer) downloadPack(ctx context.Context, pack *packInfo) error {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
h := restic.Handle{Type: restic.PackFile, Name: pack.id.String(), ContainedBlobType: restic.DataBlob}
|
err := repository.StreamPack(ctx, r.packLoader, r.key, pack.id, blobList, func(h restic.BlobHandle, blobData []byte, err error) error {
|
||||||
err := r.packLoader(ctx, h, int(end-start), start, func(rd io.Reader) error {
|
blob := blobs[h.ID]
|
||||||
bufferSize := int(end - start)
|
if err != nil {
|
||||||
if bufferSize > maxBufferSize {
|
for file := range blob.files {
|
||||||
bufferSize = maxBufferSize
|
if errFile := sanitizeError(file, err); errFile != nil {
|
||||||
}
|
return errFile
|
||||||
bufRd := bufio.NewReaderSize(rd, bufferSize)
|
|
||||||
currentBlobEnd := start
|
|
||||||
var blobData, buf []byte
|
|
||||||
for _, blobID := range sortedBlobs {
|
|
||||||
blob := blobs[blobID]
|
|
||||||
_, err := bufRd.Discard(int(blob.offset - currentBlobEnd))
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
buf, err = r.downloadBlob(bufRd, blobID, blob.length, buf)
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
blobData, err = r.decryptBlob(blobID, buf)
|
|
||||||
if err != nil {
|
|
||||||
for file := range blob.files {
|
|
||||||
if errFile := sanitizeError(file, err); errFile != nil {
|
|
||||||
return errFile
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
continue
|
|
||||||
}
|
}
|
||||||
currentBlobEnd = blob.offset + int64(blob.length)
|
return nil
|
||||||
for file, offsets := range blob.files {
|
}
|
||||||
for _, offset := range offsets {
|
for file, offsets := range blob.files {
|
||||||
writeToFile := func() error {
|
for _, offset := range offsets {
|
||||||
// this looks overly complicated and needs explanation
|
writeToFile := func() error {
|
||||||
// two competing requirements:
|
// this looks overly complicated and needs explanation
|
||||||
// - must create the file once and only once
|
// two competing requirements:
|
||||||
// - should allow concurrent writes to the file
|
// - must create the file once and only once
|
||||||
// so write the first blob while holding file lock
|
// - should allow concurrent writes to the file
|
||||||
// write other blobs after releasing the lock
|
// so write the first blob while holding file lock
|
||||||
createSize := int64(-1)
|
// write other blobs after releasing the lock
|
||||||
file.lock.Lock()
|
createSize := int64(-1)
|
||||||
if file.inProgress {
|
file.lock.Lock()
|
||||||
file.lock.Unlock()
|
if file.inProgress {
|
||||||
} else {
|
file.lock.Unlock()
|
||||||
defer file.lock.Unlock()
|
} else {
|
||||||
file.inProgress = true
|
defer file.lock.Unlock()
|
||||||
createSize = file.size
|
file.inProgress = true
|
||||||
}
|
createSize = file.size
|
||||||
return r.filesWriter.writeToFile(r.targetPath(file.location), blobData, offset, createSize)
|
|
||||||
}
|
|
||||||
err := sanitizeError(file, writeToFile())
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
}
|
||||||
|
return r.filesWriter.writeToFile(r.targetPath(file.location), blobData, offset, createSize)
|
||||||
|
}
|
||||||
|
err := sanitizeError(file, writeToFile())
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -312,41 +278,3 @@ func (r *fileRestorer) downloadPack(ctx context.Context, pack *packInfo) error {
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (r *fileRestorer) downloadBlob(rd io.Reader, blobID restic.ID, length int, buf []byte) ([]byte, error) {
|
|
||||||
// TODO reconcile with Repository#loadBlob implementation
|
|
||||||
|
|
||||||
if cap(buf) < length {
|
|
||||||
buf = make([]byte, length)
|
|
||||||
} else {
|
|
||||||
buf = buf[:length]
|
|
||||||
}
|
|
||||||
|
|
||||||
n, err := io.ReadFull(rd, buf)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
if n != length {
|
|
||||||
return nil, errors.Errorf("error loading blob %v: wrong length returned, want %d, got %d", blobID.Str(), length, n)
|
|
||||||
}
|
|
||||||
return buf, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (r *fileRestorer) decryptBlob(blobID restic.ID, buf []byte) ([]byte, error) {
|
|
||||||
// TODO reconcile with Repository#loadBlob implementation
|
|
||||||
|
|
||||||
// decrypt
|
|
||||||
nonce, ciphertext := buf[:r.key.NonceSize()], buf[r.key.NonceSize():]
|
|
||||||
plaintext, err := r.key.Open(ciphertext[:0], nonce, ciphertext, nil)
|
|
||||||
if err != nil {
|
|
||||||
return nil, errors.Errorf("decrypting blob %v failed: %v", blobID, err)
|
|
||||||
}
|
|
||||||
|
|
||||||
// check hash
|
|
||||||
if !restic.Hash(plaintext).Equal(blobID) {
|
|
||||||
return nil, errors.Errorf("blob %v returned invalid hash", blobID)
|
|
||||||
}
|
|
||||||
|
|
||||||
return plaintext, nil
|
|
||||||
}
|
|
||||||
|
|
|
@ -10,6 +10,7 @@ import (
|
||||||
|
|
||||||
"github.com/restic/restic/internal/crypto"
|
"github.com/restic/restic/internal/crypto"
|
||||||
"github.com/restic/restic/internal/errors"
|
"github.com/restic/restic/internal/errors"
|
||||||
|
"github.com/restic/restic/internal/repository"
|
||||||
"github.com/restic/restic/internal/restic"
|
"github.com/restic/restic/internal/restic"
|
||||||
rtest "github.com/restic/restic/internal/test"
|
rtest "github.com/restic/restic/internal/test"
|
||||||
)
|
)
|
||||||
|
@ -38,7 +39,7 @@ type TestRepo struct {
|
||||||
filesPathToContent map[string]string
|
filesPathToContent map[string]string
|
||||||
|
|
||||||
//
|
//
|
||||||
loader func(ctx context.Context, h restic.Handle, length int, offset int64, fn func(rd io.Reader) error) error
|
loader repository.BackendLoadFn
|
||||||
}
|
}
|
||||||
|
|
||||||
func (i *TestRepo) Lookup(bh restic.BlobHandle) []restic.PackedBlob {
|
func (i *TestRepo) Lookup(bh restic.BlobHandle) []restic.PackedBlob {
|
||||||
|
@ -267,7 +268,7 @@ func TestErrorRestoreFiles(t *testing.T) {
|
||||||
r.files = repo.files
|
r.files = repo.files
|
||||||
|
|
||||||
err := r.restoreFiles(context.TODO())
|
err := r.restoreFiles(context.TODO())
|
||||||
rtest.Equals(t, loadError, err)
|
rtest.Assert(t, errors.Is(err, loadError), "got %v, expected contained error %v", err, loadError)
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestDownloadError(t *testing.T) {
|
func TestDownloadError(t *testing.T) {
|
||||||
|
|
Loading…
Reference in a new issue