From 621012dac08f88b844349d5a976a3f336a883be1 Mon Sep 17 00:00:00 2001 From: Michael Eischer Date: Sun, 31 Dec 2023 15:27:36 +0100 Subject: [PATCH] repository: Add blob loading fallback to LoadBlobsFromPack Try to retrieve individual blobs via LoadBlob if streaming did not work. --- internal/repository/repack.go | 9 +- internal/repository/repository.go | 35 ++++++-- .../repository/repository_internal_test.go | 86 +++++++++++++++++-- 3 files changed, 109 insertions(+), 21 deletions(-) diff --git a/internal/repository/repack.go b/internal/repository/repack.go index 5588984f6..e839a9c0f 100644 --- a/internal/repository/repack.go +++ b/internal/repository/repack.go @@ -79,13 +79,8 @@ func repack(ctx context.Context, repo restic.Repository, dstRepo restic.Reposito for t := range downloadQueue { err := repo.LoadBlobsFromPack(wgCtx, t.PackID, t.Blobs, func(blob restic.BlobHandle, buf []byte, err error) error { if err != nil { - var ierr error - // check whether we can get a valid copy somewhere else - buf, ierr = repo.LoadBlob(wgCtx, blob.Type, blob.ID, nil) - if ierr != nil { - // no luck, return the original error - return err - } + // a required blob couldn't be retrieved + return err } keepMutex.Lock() diff --git a/internal/repository/repository.go b/internal/repository/repository.go index 8e34c7125..f2cde014a 100644 --- a/internal/repository/repository.go +++ b/internal/repository/repository.go @@ -943,6 +943,7 @@ func (r *Repository) SaveBlob(ctx context.Context, t restic.BlobType, buf []byte } type backendLoadFn func(ctx context.Context, h backend.Handle, length int, offset int64, fn func(rd io.Reader) error) error +type loadBlobFn func(ctx context.Context, t restic.BlobType, id restic.ID, buf []byte) ([]byte, error) // Skip sections with more than 4MB unused blobs const maxUnusedRange = 4 * 1024 * 1024 @@ -952,10 +953,10 @@ const maxUnusedRange = 4 * 1024 * 1024 // handleBlobFn is called at most once for each blob. If the callback returns an error, // then LoadBlobsFromPack will abort and not retry it. func (r *Repository) LoadBlobsFromPack(ctx context.Context, packID restic.ID, blobs []restic.Blob, handleBlobFn func(blob restic.BlobHandle, buf []byte, err error) error) error { - return streamPack(ctx, r.Backend().Load, r.key, packID, blobs, handleBlobFn) + return streamPack(ctx, r.Backend().Load, r.LoadBlob, r.key, packID, blobs, handleBlobFn) } -func streamPack(ctx context.Context, beLoad backendLoadFn, key *crypto.Key, packID restic.ID, blobs []restic.Blob, handleBlobFn func(blob restic.BlobHandle, buf []byte, err error) error) error { +func streamPack(ctx context.Context, beLoad backendLoadFn, loadBlobFn loadBlobFn, key *crypto.Key, packID restic.ID, blobs []restic.Blob, handleBlobFn func(blob restic.BlobHandle, buf []byte, err error) error) error { if len(blobs) == 0 { // nothing to do return nil @@ -974,7 +975,7 @@ func streamPack(ctx context.Context, beLoad backendLoadFn, key *crypto.Key, pack } if blobs[i].Offset-lastPos > maxUnusedRange { // load everything up to the skipped file section - err := streamPackPart(ctx, beLoad, key, packID, blobs[lowerIdx:i], handleBlobFn) + err := streamPackPart(ctx, beLoad, loadBlobFn, key, packID, blobs[lowerIdx:i], handleBlobFn) if err != nil { return err } @@ -983,10 +984,10 @@ func streamPack(ctx context.Context, beLoad backendLoadFn, key *crypto.Key, pack lastPos = blobs[i].Offset + blobs[i].Length } // load remainder - return streamPackPart(ctx, beLoad, key, packID, blobs[lowerIdx:], handleBlobFn) + return streamPackPart(ctx, beLoad, loadBlobFn, key, packID, blobs[lowerIdx:], handleBlobFn) } -func streamPackPart(ctx context.Context, beLoad backendLoadFn, key *crypto.Key, packID restic.ID, blobs []restic.Blob, handleBlobFn func(blob restic.BlobHandle, buf []byte, err error) error) error { +func streamPackPart(ctx context.Context, beLoad backendLoadFn, loadBlobFn loadBlobFn, key *crypto.Key, packID restic.ID, blobs []restic.Blob, handleBlobFn func(blob restic.BlobHandle, buf []byte, err error) error) error { h := backend.Handle{Type: restic.PackFile, Name: packID.String(), IsMetadata: false} dataStart := blobs[0].Offset @@ -1022,6 +1023,17 @@ func streamPackPart(ctx context.Context, beLoad backendLoadFn, key *crypto.Key, return err } + if val.Err != nil && loadBlobFn != nil { + var ierr error + // check whether we can get a valid copy somewhere else + buf, ierr := loadBlobFn(ctx, val.Handle.Type, val.Handle.ID, nil) + if ierr == nil { + // success + val.Plaintext = buf + val.Err = nil + } + } + err = handleBlobFn(val.Handle, val.Plaintext, val.Err) if err != nil { cancel() @@ -1032,6 +1044,19 @@ func streamPackPart(ctx context.Context, beLoad backendLoadFn, key *crypto.Key, } return nil }) + + // the context is only still valid if handleBlobFn never returned an error + if ctx.Err() == nil && loadBlobFn != nil { + // check whether we can get the remaining blobs somewhere else + for _, entry := range blobs { + buf, ierr := loadBlobFn(ctx, entry.Type, entry.ID, nil) + err = handleBlobFn(entry.BlobHandle, buf, ierr) + if err != nil { + break + } + } + } + return errors.Wrap(err, "StreamPack") } diff --git a/internal/repository/repository_internal_test.go b/internal/repository/repository_internal_test.go index 0c7115bc9..1f71b17de 100644 --- a/internal/repository/repository_internal_test.go +++ b/internal/repository/repository_internal_test.go @@ -147,13 +147,7 @@ func TestStreamPack(t *testing.T) { func testStreamPack(t *testing.T, version uint) { // always use the same key for deterministic output - const jsonKey = `{"mac":{"k":"eQenuI8adktfzZMuC8rwdA==","r":"k8cfAly2qQSky48CQK7SBA=="},"encrypt":"MKO9gZnRiQFl8mDUurSDa9NMjiu9MUifUrODTHS05wo="}` - - var key crypto.Key - err := json.Unmarshal([]byte(jsonKey), &key) - if err != nil { - t.Fatal(err) - } + key := testKey(t) blobSizes := []int{ 5522811, @@ -276,7 +270,7 @@ func testStreamPack(t *testing.T, version uint) { loadCalls = 0 shortFirstLoad = test.shortFirstLoad - err = streamPack(ctx, load, &key, restic.ID{}, test.blobs, handleBlob) + err := streamPack(ctx, load, nil, &key, restic.ID{}, test.blobs, handleBlob) if err != nil { t.Fatal(err) } @@ -339,7 +333,7 @@ func testStreamPack(t *testing.T, version uint) { return err } - err = streamPack(ctx, load, &key, restic.ID{}, test.blobs, handleBlob) + err := streamPack(ctx, load, nil, &key, restic.ID{}, test.blobs, handleBlob) if err == nil { t.Fatalf("wanted error %v, got nil", test.err) } @@ -449,3 +443,77 @@ func TestUnpackedVerification(t *testing.T) { } } } + +func testKey(t *testing.T) crypto.Key { + const jsonKey = `{"mac":{"k":"eQenuI8adktfzZMuC8rwdA==","r":"k8cfAly2qQSky48CQK7SBA=="},"encrypt":"MKO9gZnRiQFl8mDUurSDa9NMjiu9MUifUrODTHS05wo="}` + + var key crypto.Key + err := json.Unmarshal([]byte(jsonKey), &key) + if err != nil { + t.Fatal(err) + } + return key +} + +func TestStreamPackFallback(t *testing.T) { + test := func(t *testing.T, failLoad bool) { + key := testKey(t) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + plaintext := rtest.Random(800, 42) + blobID := restic.Hash(plaintext) + blobs := []restic.Blob{ + { + Length: uint(crypto.CiphertextLength(len(plaintext))), + Offset: 0, + BlobHandle: restic.BlobHandle{ + ID: blobID, + Type: restic.DataBlob, + }, + }, + } + + var loadPack backendLoadFn + if failLoad { + loadPack = func(ctx context.Context, h backend.Handle, length int, offset int64, fn func(rd io.Reader) error) error { + return errors.New("load error") + } + } else { + loadPack = func(ctx context.Context, h backend.Handle, length int, offset int64, fn func(rd io.Reader) error) error { + // just return an empty array to provoke an error + data := make([]byte, length) + return fn(bytes.NewReader(data)) + } + } + + loadBlob := func(ctx context.Context, t restic.BlobType, id restic.ID, buf []byte) ([]byte, error) { + if id == blobID { + return plaintext, nil + } + return nil, errors.New("unknown blob") + } + + blobOK := false + handleBlob := func(blob restic.BlobHandle, buf []byte, err error) error { + rtest.OK(t, err) + rtest.Equals(t, blobID, blob.ID) + rtest.Equals(t, plaintext, buf) + blobOK = true + return err + } + + err := streamPack(ctx, loadPack, loadBlob, &key, restic.ID{}, blobs, handleBlob) + rtest.OK(t, err) + rtest.Assert(t, blobOK, "blob failed to load") + } + + t.Run("corrupted blob", func(t *testing.T) { + test(t, false) + }) + + // test fallback for failed pack loading + t.Run("failed load", func(t *testing.T) { + test(t, true) + }) +}