From f40abd92fa97e5e54fc0b24666cc0498d889a41d Mon Sep 17 00:00:00 2001 From: Michael Eischer Date: Fri, 20 Aug 2021 12:12:38 +0200 Subject: [PATCH] restorer: convert to use StreamPack --- internal/repository/repository.go | 2 +- internal/restorer/filerestorer.go | 144 +++++++------------------ internal/restorer/filerestorer_test.go | 5 +- 3 files changed, 40 insertions(+), 111 deletions(-) diff --git a/internal/repository/repository.go b/internal/repository/repository.go index df62f3ad7..291e00da6 100644 --- a/internal/repository/repository.go +++ b/internal/repository/repository.go @@ -798,7 +798,7 @@ func StreamPack(ctx context.Context, beLoad BackendLoadFn, key *crypto.Key, pack sort.Slice(blobs, func(i, j int) bool { return blobs[i].Offset < blobs[j].Offset }) - h := restic.Handle{Type: restic.PackFile, Name: packID.String()} + h := restic.Handle{Type: restic.PackFile, Name: packID.String(), ContainedBlobType: restic.DataBlob} dataStart := blobs[0].Offset dataEnd := blobs[len(blobs)-1].Offset + blobs[len(blobs)-1].Length diff --git a/internal/restorer/filerestorer.go b/internal/restorer/filerestorer.go index d3d52f13a..323b69cf8 100644 --- a/internal/restorer/filerestorer.go +++ b/internal/restorer/filerestorer.go @@ -1,12 +1,9 @@ package restorer import ( - "bufio" "context" - "io" "math" "path/filepath" - "sort" "sync" "golang.org/x/sync/errgroup" @@ -14,6 +11,7 @@ import ( "github.com/restic/restic/internal/crypto" "github.com/restic/restic/internal/debug" "github.com/restic/restic/internal/errors" + "github.com/restic/restic/internal/repository" "github.com/restic/restic/internal/restic" ) @@ -52,7 +50,7 @@ type packInfo struct { type fileRestorer struct { key *crypto.Key idx func(restic.BlobHandle) []restic.PackedBlob - packLoader func(ctx context.Context, h restic.Handle, length int, offset int64, fn func(rd io.Reader) error) error + packLoader repository.BackendLoadFn filesWriter *filesWriter @@ -62,7 +60,7 @@ type fileRestorer struct { } func newFileRestorer(dst string, - packLoader func(ctx context.Context, h restic.Handle, length int, offset int64, fn func(rd io.Reader) error) error, + packLoader repository.BackendLoadFn, key *crypto.Key, idx func(restic.BlobHandle) []restic.PackedBlob) *fileRestorer { @@ -175,17 +173,14 @@ func (r *fileRestorer) restoreFiles(ctx context.Context) error { return wg.Wait() } -const maxBufferSize = 4 * 1024 * 1024 - func (r *fileRestorer) downloadPack(ctx context.Context, pack *packInfo) error { // calculate pack byte range and blob->[]files->[]offsets mappings start, end := int64(math.MaxInt64), int64(0) blobs := make(map[restic.ID]struct { - offset int64 // offset of the blob in the pack - length int // length of the blob - files map[*fileInfo][]int64 // file -> offsets (plural!) of the blob in the file + files map[*fileInfo][]int64 // file -> offsets (plural!) of the blob in the file }) + var blobList []restic.Blob for file := range pack.files { addBlob := func(blob restic.Blob, fileOffset int64) { if start > int64(blob.Offset) { @@ -196,9 +191,8 @@ func (r *fileRestorer) downloadPack(ctx context.Context, pack *packInfo) error { } blobInfo, ok := blobs[blob.ID] if !ok { - blobInfo.offset = int64(blob.Offset) - blobInfo.length = int(blob.Length) blobInfo.files = make(map[*fileInfo][]int64) + blobList = append(blobList, blob) blobs[blob.ID] = blobInfo } blobInfo.files[file] = append(blobInfo.files[file], fileOffset) @@ -228,14 +222,6 @@ func (r *fileRestorer) downloadPack(ctx context.Context, pack *packInfo) error { } } - sortedBlobs := make([]restic.ID, 0, len(blobs)) - for blobID := range blobs { - sortedBlobs = append(sortedBlobs, blobID) - } - sort.Slice(sortedBlobs, func(i, j int) bool { - return blobs[sortedBlobs[i]].offset < blobs[sortedBlobs[j]].offset - }) - sanitizeError := func(file *fileInfo, err error) error { if err != nil { err = r.Error(file.location, err) @@ -243,59 +229,39 @@ func (r *fileRestorer) downloadPack(ctx context.Context, pack *packInfo) error { return err } - h := restic.Handle{Type: restic.PackFile, Name: pack.id.String(), ContainedBlobType: restic.DataBlob} - err := r.packLoader(ctx, h, int(end-start), start, func(rd io.Reader) error { - bufferSize := int(end - start) - if bufferSize > maxBufferSize { - bufferSize = maxBufferSize - } - bufRd := bufio.NewReaderSize(rd, bufferSize) - currentBlobEnd := start - var blobData, buf []byte - for _, blobID := range sortedBlobs { - blob := blobs[blobID] - _, err := bufRd.Discard(int(blob.offset - currentBlobEnd)) - if err != nil { - return err - } - buf, err = r.downloadBlob(bufRd, blobID, blob.length, buf) - if err != nil { - return err - } - blobData, err = r.decryptBlob(blobID, buf) - if err != nil { - for file := range blob.files { - if errFile := sanitizeError(file, err); errFile != nil { - return errFile - } + err := repository.StreamPack(ctx, r.packLoader, r.key, pack.id, blobList, func(h restic.BlobHandle, blobData []byte, err error) error { + blob := blobs[h.ID] + if err != nil { + for file := range blob.files { + if errFile := sanitizeError(file, err); errFile != nil { + return errFile } - continue } - currentBlobEnd = blob.offset + int64(blob.length) - for file, offsets := range blob.files { - for _, offset := range offsets { - writeToFile := func() error { - // this looks overly complicated and needs explanation - // two competing requirements: - // - must create the file once and only once - // - should allow concurrent writes to the file - // so write the first blob while holding file lock - // write other blobs after releasing the lock - createSize := int64(-1) - file.lock.Lock() - if file.inProgress { - file.lock.Unlock() - } else { - defer file.lock.Unlock() - file.inProgress = true - createSize = file.size - } - return r.filesWriter.writeToFile(r.targetPath(file.location), blobData, offset, createSize) - } - err := sanitizeError(file, writeToFile()) - if err != nil { - return err + return nil + } + for file, offsets := range blob.files { + for _, offset := range offsets { + writeToFile := func() error { + // this looks overly complicated and needs explanation + // two competing requirements: + // - must create the file once and only once + // - should allow concurrent writes to the file + // so write the first blob while holding file lock + // write other blobs after releasing the lock + createSize := int64(-1) + file.lock.Lock() + if file.inProgress { + file.lock.Unlock() + } else { + defer file.lock.Unlock() + file.inProgress = true + createSize = file.size } + return r.filesWriter.writeToFile(r.targetPath(file.location), blobData, offset, createSize) + } + err := sanitizeError(file, writeToFile()) + if err != nil { + return err } } } @@ -312,41 +278,3 @@ func (r *fileRestorer) downloadPack(ctx context.Context, pack *packInfo) error { return nil } - -func (r *fileRestorer) downloadBlob(rd io.Reader, blobID restic.ID, length int, buf []byte) ([]byte, error) { - // TODO reconcile with Repository#loadBlob implementation - - if cap(buf) < length { - buf = make([]byte, length) - } else { - buf = buf[:length] - } - - n, err := io.ReadFull(rd, buf) - if err != nil { - return nil, err - } - - if n != length { - return nil, errors.Errorf("error loading blob %v: wrong length returned, want %d, got %d", blobID.Str(), length, n) - } - return buf, nil -} - -func (r *fileRestorer) decryptBlob(blobID restic.ID, buf []byte) ([]byte, error) { - // TODO reconcile with Repository#loadBlob implementation - - // decrypt - nonce, ciphertext := buf[:r.key.NonceSize()], buf[r.key.NonceSize():] - plaintext, err := r.key.Open(ciphertext[:0], nonce, ciphertext, nil) - if err != nil { - return nil, errors.Errorf("decrypting blob %v failed: %v", blobID, err) - } - - // check hash - if !restic.Hash(plaintext).Equal(blobID) { - return nil, errors.Errorf("blob %v returned invalid hash", blobID) - } - - return plaintext, nil -} diff --git a/internal/restorer/filerestorer_test.go b/internal/restorer/filerestorer_test.go index 333420b70..f5760f54a 100644 --- a/internal/restorer/filerestorer_test.go +++ b/internal/restorer/filerestorer_test.go @@ -10,6 +10,7 @@ import ( "github.com/restic/restic/internal/crypto" "github.com/restic/restic/internal/errors" + "github.com/restic/restic/internal/repository" "github.com/restic/restic/internal/restic" rtest "github.com/restic/restic/internal/test" ) @@ -38,7 +39,7 @@ type TestRepo struct { filesPathToContent map[string]string // - loader func(ctx context.Context, h restic.Handle, length int, offset int64, fn func(rd io.Reader) error) error + loader repository.BackendLoadFn } func (i *TestRepo) Lookup(bh restic.BlobHandle) []restic.PackedBlob { @@ -267,7 +268,7 @@ func TestErrorRestoreFiles(t *testing.T) { r.files = repo.files err := r.restoreFiles(context.TODO()) - rtest.Equals(t, loadError, err) + rtest.Assert(t, errors.Is(err, loadError), "got %v, expected contained error %v", err, loadError) } func TestDownloadError(t *testing.T) {