restore: Don't save (part of) pack in memory

This commit is contained in:
Alexander Weiss 2020-11-18 12:36:06 +01:00
parent 8b84c96d9d
commit 3e0acf1395

View file

@ -1,11 +1,12 @@
package restorer
import (
"bytes"
"bufio"
"context"
"io"
"math"
"path/filepath"
"sort"
"sync"
"github.com/restic/restic/internal/crypto"
@ -179,6 +180,8 @@ func (r *fileRestorer) restoreFiles(ctx context.Context) error {
return nil
}
const maxBufferSize = 4 * 1024 * 1024
func (r *fileRestorer) downloadPack(ctx context.Context, pack *packInfo) {
// calculate pack byte range and blob->[]files->[]offsets mappings
@ -226,18 +229,12 @@ func (r *fileRestorer) downloadPack(ctx context.Context, pack *packInfo) {
}
}
packData := make([]byte, int(end-start))
h := restic.Handle{Type: restic.PackFile, Name: pack.id.String()}
err := r.packLoader(ctx, h, int(end-start), start, func(rd io.Reader) error {
l, err := io.ReadFull(rd, packData)
if err != nil {
return err
}
if l != len(packData) {
return errors.Errorf("unexpected pack size: expected %d but got %d", len(packData), l)
}
return nil
sortedBlobs := make([]restic.ID, 0, len(blobs))
for blobID := range blobs {
sortedBlobs = append(sortedBlobs, blobID)
}
sort.Slice(sortedBlobs, func(i, j int) bool {
return blobs[sortedBlobs[i]].offset < blobs[sortedBlobs[j]].offset
})
markFileError := func(file *fileInfo, err error) {
@ -248,6 +245,61 @@ func (r *fileRestorer) downloadPack(ctx context.Context, pack *packInfo) {
}
}
h := restic.Handle{Type: restic.PackFile, Name: pack.id.String()}
err := r.packLoader(ctx, h, int(end-start), start, func(rd io.Reader) error {
bufferSize := int(end - start)
if bufferSize > maxBufferSize {
bufferSize = maxBufferSize
}
BufRd := bufio.NewReaderSize(rd, bufferSize)
currentBlobEnd := start
for _, blobID := range sortedBlobs {
blob := blobs[blobID]
_, err := BufRd.Discard(int(blob.offset - currentBlobEnd))
if err != nil {
return err
}
blobData, err := r.loadBlob(BufRd, blobID, blob.length)
if err != nil {
for file := range blob.files {
markFileError(file, err)
}
continue
}
currentBlobEnd = blob.offset + int64(blob.length)
for file, offsets := range blob.files {
for _, offset := range offsets {
writeToFile := func() error {
// this looks overly complicated and needs explanation
// two competing requirements:
// - must create the file once and only once
// - should allow concurrent writes to the file
// so write the first blob while holding file lock
// write other blobs after releasing the lock
file.lock.Lock()
create := file.flags&fileProgress == 0
createSize := int64(-1)
if create {
defer file.lock.Unlock()
file.flags |= fileProgress
createSize = file.size
} else {
file.lock.Unlock()
}
return r.filesWriter.writeToFile(r.targetPath(file.location), blobData, offset, createSize)
}
err := writeToFile()
if err != nil {
markFileError(file, err)
break
}
}
}
}
return nil
})
if err != nil {
for file := range pack.files {
markFileError(file, err)
@ -255,53 +307,14 @@ func (r *fileRestorer) downloadPack(ctx context.Context, pack *packInfo) {
return
}
rd := bytes.NewReader(packData)
for blobID, blob := range blobs {
blobData, err := r.loadBlob(rd, blobID, blob.offset-start, blob.length)
if err != nil {
for file := range blob.files {
markFileError(file, err)
}
continue
}
for file, offsets := range blob.files {
for _, offset := range offsets {
writeToFile := func() error {
// this looks overly complicated and needs explanation
// two competing requirements:
// - must create the file once and only once
// - should allow concurrent writes to the file
// so write the first blob while holding file lock
// write other blobs after releasing the lock
file.lock.Lock()
create := file.flags&fileProgress == 0
createSize := int64(-1)
if create {
defer file.lock.Unlock()
file.flags |= fileProgress
createSize = file.size
} else {
file.lock.Unlock()
}
return r.filesWriter.writeToFile(r.targetPath(file.location), blobData, offset, createSize)
}
err := writeToFile()
if err != nil {
markFileError(file, err)
break
}
}
}
}
}
func (r *fileRestorer) loadBlob(rd io.ReaderAt, blobID restic.ID, offset int64, length int) ([]byte, error) {
func (r *fileRestorer) loadBlob(rd io.Reader, blobID restic.ID, length int) ([]byte, error) {
// TODO reconcile with Repository#loadBlob implementation
buf := make([]byte, length)
n, err := rd.ReadAt(buf, offset)
n, err := rd.Read(buf)
if err != nil {
return nil, err
}