repository: Add blob loading fallback to LoadBlobsFromPack
Try to retrieve individual blobs via LoadBlob if streaming did not work.
This commit is contained in:
parent
228b35f074
commit
621012dac0
3 changed files with 109 additions and 21 deletions
|
@ -79,13 +79,8 @@ func repack(ctx context.Context, repo restic.Repository, dstRepo restic.Reposito
|
|||
for t := range downloadQueue {
|
||||
err := repo.LoadBlobsFromPack(wgCtx, t.PackID, t.Blobs, func(blob restic.BlobHandle, buf []byte, err error) error {
|
||||
if err != nil {
|
||||
var ierr error
|
||||
// check whether we can get a valid copy somewhere else
|
||||
buf, ierr = repo.LoadBlob(wgCtx, blob.Type, blob.ID, nil)
|
||||
if ierr != nil {
|
||||
// no luck, return the original error
|
||||
return err
|
||||
}
|
||||
// a required blob couldn't be retrieved
|
||||
return err
|
||||
}
|
||||
|
||||
keepMutex.Lock()
|
||||
|
|
|
@ -943,6 +943,7 @@ func (r *Repository) SaveBlob(ctx context.Context, t restic.BlobType, buf []byte
|
|||
}
|
||||
|
||||
type backendLoadFn func(ctx context.Context, h backend.Handle, length int, offset int64, fn func(rd io.Reader) error) error
|
||||
type loadBlobFn func(ctx context.Context, t restic.BlobType, id restic.ID, buf []byte) ([]byte, error)
|
||||
|
||||
// Skip sections with more than 4MB unused blobs
|
||||
const maxUnusedRange = 4 * 1024 * 1024
|
||||
|
@ -952,10 +953,10 @@ const maxUnusedRange = 4 * 1024 * 1024
|
|||
// handleBlobFn is called at most once for each blob. If the callback returns an error,
|
||||
// then LoadBlobsFromPack will abort and not retry it.
|
||||
func (r *Repository) LoadBlobsFromPack(ctx context.Context, packID restic.ID, blobs []restic.Blob, handleBlobFn func(blob restic.BlobHandle, buf []byte, err error) error) error {
|
||||
return streamPack(ctx, r.Backend().Load, r.key, packID, blobs, handleBlobFn)
|
||||
return streamPack(ctx, r.Backend().Load, r.LoadBlob, r.key, packID, blobs, handleBlobFn)
|
||||
}
|
||||
|
||||
func streamPack(ctx context.Context, beLoad backendLoadFn, key *crypto.Key, packID restic.ID, blobs []restic.Blob, handleBlobFn func(blob restic.BlobHandle, buf []byte, err error) error) error {
|
||||
func streamPack(ctx context.Context, beLoad backendLoadFn, loadBlobFn loadBlobFn, key *crypto.Key, packID restic.ID, blobs []restic.Blob, handleBlobFn func(blob restic.BlobHandle, buf []byte, err error) error) error {
|
||||
if len(blobs) == 0 {
|
||||
// nothing to do
|
||||
return nil
|
||||
|
@ -974,7 +975,7 @@ func streamPack(ctx context.Context, beLoad backendLoadFn, key *crypto.Key, pack
|
|||
}
|
||||
if blobs[i].Offset-lastPos > maxUnusedRange {
|
||||
// load everything up to the skipped file section
|
||||
err := streamPackPart(ctx, beLoad, key, packID, blobs[lowerIdx:i], handleBlobFn)
|
||||
err := streamPackPart(ctx, beLoad, loadBlobFn, key, packID, blobs[lowerIdx:i], handleBlobFn)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
@ -983,10 +984,10 @@ func streamPack(ctx context.Context, beLoad backendLoadFn, key *crypto.Key, pack
|
|||
lastPos = blobs[i].Offset + blobs[i].Length
|
||||
}
|
||||
// load remainder
|
||||
return streamPackPart(ctx, beLoad, key, packID, blobs[lowerIdx:], handleBlobFn)
|
||||
return streamPackPart(ctx, beLoad, loadBlobFn, key, packID, blobs[lowerIdx:], handleBlobFn)
|
||||
}
|
||||
|
||||
func streamPackPart(ctx context.Context, beLoad backendLoadFn, key *crypto.Key, packID restic.ID, blobs []restic.Blob, handleBlobFn func(blob restic.BlobHandle, buf []byte, err error) error) error {
|
||||
func streamPackPart(ctx context.Context, beLoad backendLoadFn, loadBlobFn loadBlobFn, key *crypto.Key, packID restic.ID, blobs []restic.Blob, handleBlobFn func(blob restic.BlobHandle, buf []byte, err error) error) error {
|
||||
h := backend.Handle{Type: restic.PackFile, Name: packID.String(), IsMetadata: false}
|
||||
|
||||
dataStart := blobs[0].Offset
|
||||
|
@ -1022,6 +1023,17 @@ func streamPackPart(ctx context.Context, beLoad backendLoadFn, key *crypto.Key,
|
|||
return err
|
||||
}
|
||||
|
||||
if val.Err != nil && loadBlobFn != nil {
|
||||
var ierr error
|
||||
// check whether we can get a valid copy somewhere else
|
||||
buf, ierr := loadBlobFn(ctx, val.Handle.Type, val.Handle.ID, nil)
|
||||
if ierr == nil {
|
||||
// success
|
||||
val.Plaintext = buf
|
||||
val.Err = nil
|
||||
}
|
||||
}
|
||||
|
||||
err = handleBlobFn(val.Handle, val.Plaintext, val.Err)
|
||||
if err != nil {
|
||||
cancel()
|
||||
|
@ -1032,6 +1044,19 @@ func streamPackPart(ctx context.Context, beLoad backendLoadFn, key *crypto.Key,
|
|||
}
|
||||
return nil
|
||||
})
|
||||
|
||||
// the context is only still valid if handleBlobFn never returned an error
|
||||
if ctx.Err() == nil && loadBlobFn != nil {
|
||||
// check whether we can get the remaining blobs somewhere else
|
||||
for _, entry := range blobs {
|
||||
buf, ierr := loadBlobFn(ctx, entry.Type, entry.ID, nil)
|
||||
err = handleBlobFn(entry.BlobHandle, buf, ierr)
|
||||
if err != nil {
|
||||
break
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return errors.Wrap(err, "StreamPack")
|
||||
}
|
||||
|
||||
|
|
|
@ -147,13 +147,7 @@ func TestStreamPack(t *testing.T) {
|
|||
|
||||
func testStreamPack(t *testing.T, version uint) {
|
||||
// always use the same key for deterministic output
|
||||
const jsonKey = `{"mac":{"k":"eQenuI8adktfzZMuC8rwdA==","r":"k8cfAly2qQSky48CQK7SBA=="},"encrypt":"MKO9gZnRiQFl8mDUurSDa9NMjiu9MUifUrODTHS05wo="}`
|
||||
|
||||
var key crypto.Key
|
||||
err := json.Unmarshal([]byte(jsonKey), &key)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
key := testKey(t)
|
||||
|
||||
blobSizes := []int{
|
||||
5522811,
|
||||
|
@ -276,7 +270,7 @@ func testStreamPack(t *testing.T, version uint) {
|
|||
|
||||
loadCalls = 0
|
||||
shortFirstLoad = test.shortFirstLoad
|
||||
err = streamPack(ctx, load, &key, restic.ID{}, test.blobs, handleBlob)
|
||||
err := streamPack(ctx, load, nil, &key, restic.ID{}, test.blobs, handleBlob)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
@ -339,7 +333,7 @@ func testStreamPack(t *testing.T, version uint) {
|
|||
return err
|
||||
}
|
||||
|
||||
err = streamPack(ctx, load, &key, restic.ID{}, test.blobs, handleBlob)
|
||||
err := streamPack(ctx, load, nil, &key, restic.ID{}, test.blobs, handleBlob)
|
||||
if err == nil {
|
||||
t.Fatalf("wanted error %v, got nil", test.err)
|
||||
}
|
||||
|
@ -449,3 +443,77 @@ func TestUnpackedVerification(t *testing.T) {
|
|||
}
|
||||
}
|
||||
}
|
||||
|
||||
func testKey(t *testing.T) crypto.Key {
|
||||
const jsonKey = `{"mac":{"k":"eQenuI8adktfzZMuC8rwdA==","r":"k8cfAly2qQSky48CQK7SBA=="},"encrypt":"MKO9gZnRiQFl8mDUurSDa9NMjiu9MUifUrODTHS05wo="}`
|
||||
|
||||
var key crypto.Key
|
||||
err := json.Unmarshal([]byte(jsonKey), &key)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
return key
|
||||
}
|
||||
|
||||
func TestStreamPackFallback(t *testing.T) {
|
||||
test := func(t *testing.T, failLoad bool) {
|
||||
key := testKey(t)
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
defer cancel()
|
||||
|
||||
plaintext := rtest.Random(800, 42)
|
||||
blobID := restic.Hash(plaintext)
|
||||
blobs := []restic.Blob{
|
||||
{
|
||||
Length: uint(crypto.CiphertextLength(len(plaintext))),
|
||||
Offset: 0,
|
||||
BlobHandle: restic.BlobHandle{
|
||||
ID: blobID,
|
||||
Type: restic.DataBlob,
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
var loadPack backendLoadFn
|
||||
if failLoad {
|
||||
loadPack = func(ctx context.Context, h backend.Handle, length int, offset int64, fn func(rd io.Reader) error) error {
|
||||
return errors.New("load error")
|
||||
}
|
||||
} else {
|
||||
loadPack = func(ctx context.Context, h backend.Handle, length int, offset int64, fn func(rd io.Reader) error) error {
|
||||
// just return an empty array to provoke an error
|
||||
data := make([]byte, length)
|
||||
return fn(bytes.NewReader(data))
|
||||
}
|
||||
}
|
||||
|
||||
loadBlob := func(ctx context.Context, t restic.BlobType, id restic.ID, buf []byte) ([]byte, error) {
|
||||
if id == blobID {
|
||||
return plaintext, nil
|
||||
}
|
||||
return nil, errors.New("unknown blob")
|
||||
}
|
||||
|
||||
blobOK := false
|
||||
handleBlob := func(blob restic.BlobHandle, buf []byte, err error) error {
|
||||
rtest.OK(t, err)
|
||||
rtest.Equals(t, blobID, blob.ID)
|
||||
rtest.Equals(t, plaintext, buf)
|
||||
blobOK = true
|
||||
return err
|
||||
}
|
||||
|
||||
err := streamPack(ctx, loadPack, loadBlob, &key, restic.ID{}, blobs, handleBlob)
|
||||
rtest.OK(t, err)
|
||||
rtest.Assert(t, blobOK, "blob failed to load")
|
||||
}
|
||||
|
||||
t.Run("corrupted blob", func(t *testing.T) {
|
||||
test(t, false)
|
||||
})
|
||||
|
||||
// test fallback for failed pack loading
|
||||
t.Run("failed load", func(t *testing.T) {
|
||||
test(t, true)
|
||||
})
|
||||
}
|
||||
|
|
Loading…
Reference in a new issue