repository: Replace StreamPack function with LoadBlobsFromPack method

LoadBlobsFromPack is now part of the repository struct. This ensures
that users of that method don't have to deal will internals of the
repository implementation.

The filerestorer tests now also contain far fewer pack file
implementation details.
This commit is contained in:
Michael Eischer 2023-12-31 12:07:19 +01:00
parent 6b7b5c89e9
commit 2c310a526e
8 changed files with 66 additions and 107 deletions

View file

@ -116,7 +116,7 @@ func repairPacks(ctx context.Context, gopts GlobalOptions, repo *repository.Repo
continue
}
err = repository.StreamPack(wgCtx, repo.Backend().Load, repo.Key(), b.PackID, blobs, func(blob restic.BlobHandle, buf []byte, err error) error {
err = repo.LoadBlobsFromPack(wgCtx, b.PackID, blobs, func(blob restic.BlobHandle, buf []byte, err error) error {
if err != nil {
// Fallback path
buf, err = repo.LoadBlob(wgCtx, blob.Type, blob.ID, nil)

View file

@ -77,7 +77,7 @@ func repack(ctx context.Context, repo restic.Repository, dstRepo restic.Reposito
worker := func() error {
for t := range downloadQueue {
err := StreamPack(wgCtx, repo.Backend().Load, repo.Key(), t.PackID, t.Blobs, func(blob restic.BlobHandle, buf []byte, err error) error {
err := repo.LoadBlobsFromPack(wgCtx, t.PackID, t.Blobs, func(blob restic.BlobHandle, buf []byte, err error) error {
if err != nil {
var ierr error
// check whether we can get a valid copy somewhere else

View file

@ -875,16 +875,20 @@ func (r *Repository) SaveBlob(ctx context.Context, t restic.BlobType, buf []byte
return newID, known, size, err
}
type BackendLoadFn func(ctx context.Context, h backend.Handle, length int, offset int64, fn func(rd io.Reader) error) error
type backendLoadFn func(ctx context.Context, h backend.Handle, length int, offset int64, fn func(rd io.Reader) error) error
// Skip sections with more than 4MB unused blobs
const maxUnusedRange = 4 * 1024 * 1024
// StreamPack loads the listed blobs from the specified pack file. The plaintext blob is passed to
// LoadBlobsFromPack loads the listed blobs from the specified pack file. The plaintext blob is passed to
// the handleBlobFn callback or an error if decryption failed or the blob hash does not match.
// handleBlobFn is called at most once for each blob. If the callback returns an error,
// then StreamPack will abort and not retry it.
func StreamPack(ctx context.Context, beLoad BackendLoadFn, key *crypto.Key, packID restic.ID, blobs []restic.Blob, handleBlobFn func(blob restic.BlobHandle, buf []byte, err error) error) error {
// then LoadBlobsFromPack will abort and not retry it.
func (r *Repository) LoadBlobsFromPack(ctx context.Context, packID restic.ID, blobs []restic.Blob, handleBlobFn func(blob restic.BlobHandle, buf []byte, err error) error) error {
return streamPack(ctx, r.Backend().Load, r.key, packID, blobs, handleBlobFn)
}
func streamPack(ctx context.Context, beLoad backendLoadFn, key *crypto.Key, packID restic.ID, blobs []restic.Blob, handleBlobFn func(blob restic.BlobHandle, buf []byte, err error) error) error {
if len(blobs) == 0 {
// nothing to do
return nil
@ -915,7 +919,7 @@ func StreamPack(ctx context.Context, beLoad BackendLoadFn, key *crypto.Key, pack
return streamPackPart(ctx, beLoad, key, packID, blobs[lowerIdx:], handleBlobFn)
}
func streamPackPart(ctx context.Context, beLoad BackendLoadFn, key *crypto.Key, packID restic.ID, blobs []restic.Blob, handleBlobFn func(blob restic.BlobHandle, buf []byte, err error) error) error {
func streamPackPart(ctx context.Context, beLoad backendLoadFn, key *crypto.Key, packID restic.ID, blobs []restic.Blob, handleBlobFn func(blob restic.BlobHandle, buf []byte, err error) error) error {
h := backend.Handle{Type: restic.PackFile, Name: packID.String(), IsMetadata: false}
dataStart := blobs[0].Offset

View file

@ -276,7 +276,7 @@ func testStreamPack(t *testing.T, version uint) {
loadCalls = 0
shortFirstLoad = test.shortFirstLoad
err = StreamPack(ctx, load, &key, restic.ID{}, test.blobs, handleBlob)
err = streamPack(ctx, load, &key, restic.ID{}, test.blobs, handleBlob)
if err != nil {
t.Fatal(err)
}
@ -339,7 +339,7 @@ func testStreamPack(t *testing.T, version uint) {
return err
}
err = StreamPack(ctx, load, &key, restic.ID{}, test.blobs, handleBlob)
err = streamPack(ctx, load, &key, restic.ID{}, test.blobs, handleBlob)
if err == nil {
t.Fatalf("wanted error %v, got nil", test.err)
}

View file

@ -44,6 +44,7 @@ type Repository interface {
ListPack(context.Context, ID, int64) ([]Blob, uint32, error)
LoadBlob(context.Context, BlobType, ID, []byte) ([]byte, error)
LoadBlobsFromPack(ctx context.Context, packID ID, blobs []Blob, handleBlobFn func(blob BlobHandle, buf []byte, err error) error) error
SaveBlob(context.Context, BlobType, []byte, ID, bool) (ID, bool, int, error)
// StartPackUploader start goroutines to upload new pack files. The errgroup

View file

@ -7,7 +7,6 @@ import (
"golang.org/x/sync/errgroup"
"github.com/restic/restic/internal/crypto"
"github.com/restic/restic/internal/debug"
"github.com/restic/restic/internal/errors"
"github.com/restic/restic/internal/repository"
@ -45,11 +44,12 @@ type packInfo struct {
files map[*fileInfo]struct{} // set of files that use blobs from this pack
}
type blobsLoaderFn func(ctx context.Context, packID restic.ID, blobs []restic.Blob, handleBlobFn func(blob restic.BlobHandle, buf []byte, err error) error) error
// fileRestorer restores set of files
type fileRestorer struct {
key *crypto.Key
idx func(restic.BlobHandle) []restic.PackedBlob
packLoader repository.BackendLoadFn
idx func(restic.BlobHandle) []restic.PackedBlob
blobsLoader blobsLoaderFn
workerCount int
filesWriter *filesWriter
@ -63,8 +63,7 @@ type fileRestorer struct {
}
func newFileRestorer(dst string,
packLoader repository.BackendLoadFn,
key *crypto.Key,
blobsLoader blobsLoaderFn,
idx func(restic.BlobHandle) []restic.PackedBlob,
connections uint,
sparse bool,
@ -74,9 +73,8 @@ func newFileRestorer(dst string,
workerCount := int(connections)
return &fileRestorer{
key: key,
idx: idx,
packLoader: packLoader,
blobsLoader: blobsLoader,
filesWriter: newFilesWriter(workerCount),
zeroChunk: repository.ZeroChunk(),
sparse: sparse,
@ -310,7 +308,7 @@ func (r *fileRestorer) downloadBlobs(ctx context.Context, packID restic.ID,
for _, entry := range blobs {
blobList = append(blobList, entry.blob)
}
return repository.StreamPack(ctx, r.packLoader, r.key, packID, blobList,
return r.blobsLoader(ctx, packID, blobList,
func(h restic.BlobHandle, blobData []byte, err error) error {
processedBlobs.Insert(h)
blob := blobs[h.ID]

View file

@ -4,14 +4,11 @@ import (
"bytes"
"context"
"fmt"
"io"
"os"
"sort"
"testing"
"github.com/restic/restic/internal/backend"
"github.com/restic/restic/internal/crypto"
"github.com/restic/restic/internal/errors"
"github.com/restic/restic/internal/repository"
"github.com/restic/restic/internal/restic"
rtest "github.com/restic/restic/internal/test"
)
@ -27,11 +24,6 @@ type TestFile struct {
}
type TestRepo struct {
key *crypto.Key
// pack names and ids
packsNameToID map[string]restic.ID
packsIDToName map[restic.ID]string
packsIDToData map[restic.ID][]byte
// blobs and files
@ -40,7 +32,7 @@ type TestRepo struct {
filesPathToContent map[string]string
//
loader repository.BackendLoadFn
loader blobsLoaderFn
}
func (i *TestRepo) Lookup(bh restic.BlobHandle) []restic.PackedBlob {
@ -59,16 +51,6 @@ func newTestRepo(content []TestFile) *TestRepo {
blobs map[restic.ID]restic.Blob
}
packs := make(map[string]Pack)
key := crypto.NewRandomKey()
seal := func(data []byte) []byte {
ciphertext := crypto.NewBlobBuffer(len(data))
ciphertext = ciphertext[:0] // truncate the slice
nonce := crypto.NewRandomNonce()
ciphertext = append(ciphertext, nonce...)
return key.Seal(ciphertext, nonce, data, nil)
}
filesPathToContent := make(map[string]string)
for _, file := range content {
@ -86,14 +68,15 @@ func newTestRepo(content []TestFile) *TestRepo {
// calculate blob id and add to the pack as necessary
blobID := restic.Hash([]byte(blob.data))
if _, found := pack.blobs[blobID]; !found {
blobData := seal([]byte(blob.data))
blobData := []byte(blob.data)
pack.blobs[blobID] = restic.Blob{
BlobHandle: restic.BlobHandle{
Type: restic.DataBlob,
ID: blobID,
},
Length: uint(len(blobData)),
Offset: uint(len(pack.data)),
Length: uint(len(blobData)),
UncompressedLength: uint(len(blobData)),
Offset: uint(len(pack.data)),
}
pack.data = append(pack.data, blobData...)
}
@ -104,15 +87,11 @@ func newTestRepo(content []TestFile) *TestRepo {
}
blobs := make(map[restic.ID][]restic.PackedBlob)
packsIDToName := make(map[restic.ID]string)
packsIDToData := make(map[restic.ID][]byte)
packsNameToID := make(map[string]restic.ID)
for _, pack := range packs {
packID := restic.Hash(pack.data)
packsIDToName[packID] = pack.name
packsIDToData[packID] = pack.data
packsNameToID[pack.name] = packID
for blobID, blob := range pack.blobs {
blobs[blobID] = append(blobs[blobID], restic.PackedBlob{Blob: blob, PackID: packID})
}
@ -128,30 +107,44 @@ func newTestRepo(content []TestFile) *TestRepo {
}
repo := &TestRepo{
key: key,
packsIDToName: packsIDToName,
packsIDToData: packsIDToData,
packsNameToID: packsNameToID,
blobs: blobs,
files: files,
filesPathToContent: filesPathToContent,
}
repo.loader = func(ctx context.Context, h backend.Handle, length int, offset int64, fn func(rd io.Reader) error) error {
packID, err := restic.ParseID(h.Name)
if err != nil {
return err
repo.loader = func(ctx context.Context, packID restic.ID, blobs []restic.Blob, handleBlobFn func(blob restic.BlobHandle, buf []byte, err error) error) error {
blobs = append([]restic.Blob{}, blobs...)
sort.Slice(blobs, func(i, j int) bool {
return blobs[i].Offset < blobs[j].Offset
})
for _, blob := range blobs {
found := false
for _, e := range repo.blobs[blob.ID] {
if packID == e.PackID {
found = true
buf := repo.packsIDToData[packID][e.Offset : e.Offset+e.Length]
err := handleBlobFn(e.BlobHandle, buf, nil)
if err != nil {
return err
}
}
}
if !found {
return fmt.Errorf("missing blob: %v", blob)
}
}
rd := bytes.NewReader(repo.packsIDToData[packID][int(offset) : int(offset)+length])
return fn(rd)
return nil
}
return repo
}
func restoreAndVerify(t *testing.T, tempdir string, content []TestFile, files map[string]bool, sparse bool) {
t.Helper()
repo := newTestRepo(content)
r := newFileRestorer(tempdir, repo.loader, repo.key, repo.Lookup, 2, sparse, nil)
r := newFileRestorer(tempdir, repo.loader, repo.Lookup, 2, sparse, nil)
if files == nil {
r.files = repo.files
@ -170,6 +163,7 @@ func restoreAndVerify(t *testing.T, tempdir string, content []TestFile, files ma
}
func verifyRestore(t *testing.T, r *fileRestorer, repo *TestRepo) {
t.Helper()
for _, file := range r.files {
target := r.targetPath(file.location)
data, err := os.ReadFile(target)
@ -283,62 +277,17 @@ func TestErrorRestoreFiles(t *testing.T) {
loadError := errors.New("load error")
// loader always returns an error
repo.loader = func(ctx context.Context, h backend.Handle, length int, offset int64, fn func(rd io.Reader) error) error {
repo.loader = func(ctx context.Context, packID restic.ID, blobs []restic.Blob, handleBlobFn func(blob restic.BlobHandle, buf []byte, err error) error) error {
return loadError
}
r := newFileRestorer(tempdir, repo.loader, repo.key, repo.Lookup, 2, false, nil)
r := newFileRestorer(tempdir, repo.loader, repo.Lookup, 2, false, nil)
r.files = repo.files
err := r.restoreFiles(context.TODO())
rtest.Assert(t, errors.Is(err, loadError), "got %v, expected contained error %v", err, loadError)
}
func TestDownloadError(t *testing.T) {
for i := 0; i < 100; i += 10 {
testPartialDownloadError(t, i)
}
}
func testPartialDownloadError(t *testing.T, part int) {
tempdir := rtest.TempDir(t)
content := []TestFile{
{
name: "file1",
blobs: []TestBlob{
{"data1-1", "pack1"},
{"data1-2", "pack1"},
{"data1-3", "pack1"},
},
}}
repo := newTestRepo(content)
// loader always returns an error
loader := repo.loader
repo.loader = func(ctx context.Context, h backend.Handle, length int, offset int64, fn func(rd io.Reader) error) error {
// only load partial data to exercise fault handling in different places
err := loader(ctx, h, length*part/100, offset, fn)
if err == nil {
return nil
}
fmt.Println("Retry after error", err)
return loader(ctx, h, length, offset, fn)
}
r := newFileRestorer(tempdir, repo.loader, repo.key, repo.Lookup, 2, false, nil)
r.files = repo.files
r.Error = func(s string, e error) error {
// ignore errors as in the `restore` command
fmt.Println("error during restore", s, e)
return nil
}
err := r.restoreFiles(context.TODO())
rtest.OK(t, err)
verifyRestore(t, r, repo)
}
func TestFatalDownloadError(t *testing.T) {
tempdir := rtest.TempDir(t)
content := []TestFile{
@ -361,12 +310,19 @@ func TestFatalDownloadError(t *testing.T) {
repo := newTestRepo(content)
loader := repo.loader
repo.loader = func(ctx context.Context, h backend.Handle, length int, offset int64, fn func(rd io.Reader) error) error {
// only return half the data to break file2
return loader(ctx, h, length/2, offset, fn)
repo.loader = func(ctx context.Context, packID restic.ID, blobs []restic.Blob, handleBlobFn func(blob restic.BlobHandle, buf []byte, err error) error) error {
ctr := 0
return loader(ctx, packID, blobs, func(blob restic.BlobHandle, buf []byte, err error) error {
if ctr < 2 {
ctr++
return handleBlobFn(blob, buf, err)
}
// break file2
return errors.New("failed to load blob")
})
}
r := newFileRestorer(tempdir, repo.loader, repo.key, repo.Lookup, 2, false, nil)
r := newFileRestorer(tempdir, repo.loader, repo.Lookup, 2, false, nil)
r.files = repo.files
var errors []string

View file

@ -231,7 +231,7 @@ func (res *Restorer) RestoreTo(ctx context.Context, dst string) error {
}
idx := NewHardlinkIndex[string]()
filerestorer := newFileRestorer(dst, res.repo.Backend().Load, res.repo.Key(), res.repo.Index().Lookup,
filerestorer := newFileRestorer(dst, res.repo.LoadBlobsFromPack, res.repo.Index().Lookup,
res.repo.Connections(), res.sparse, res.progress)
filerestorer.Error = res.Error