restore: split downloadPack into smaller methods
This commit is contained in:
parent
e4a7eb09ef
commit
9328f34d43
1 changed files with 65 additions and 57 deletions
|
@ -197,12 +197,13 @@ func (r *fileRestorer) restoreFiles(ctx context.Context) error {
|
|||
return wg.Wait()
|
||||
}
|
||||
|
||||
func (r *fileRestorer) downloadPack(ctx context.Context, pack *packInfo) error {
|
||||
type blobToFileOffsetsMapping map[restic.ID]struct {
|
||||
files map[*fileInfo][]int64 // file -> offsets (plural!) of the blob in the file
|
||||
}
|
||||
|
||||
func (r *fileRestorer) downloadPack(ctx context.Context, pack *packInfo) error {
|
||||
// calculate blob->[]files->[]offsets mappings
|
||||
blobs := make(map[restic.ID]struct {
|
||||
files map[*fileInfo][]int64 // file -> offsets (plural!) of the blob in the file
|
||||
})
|
||||
blobs := make(blobToFileOffsetsMapping)
|
||||
var blobList []restic.Blob
|
||||
for file := range pack.files {
|
||||
addBlob := func(blob restic.Blob, fileOffset int64) {
|
||||
|
@ -239,60 +240,9 @@ func (r *fileRestorer) downloadPack(ctx context.Context, pack *packInfo) error {
|
|||
}
|
||||
}
|
||||
|
||||
sanitizeError := func(file *fileInfo, err error) error {
|
||||
if err != nil {
|
||||
err = r.Error(file.location, err)
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
// track already processed blobs for precise error reporting
|
||||
processedBlobs := restic.NewBlobSet()
|
||||
err := repository.StreamPack(ctx, r.packLoader, r.key, pack.id, blobList, func(h restic.BlobHandle, blobData []byte, err error) error {
|
||||
processedBlobs.Insert(h)
|
||||
blob := blobs[h.ID]
|
||||
if err != nil {
|
||||
for file := range blob.files {
|
||||
if errFile := sanitizeError(file, err); errFile != nil {
|
||||
return errFile
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
for file, offsets := range blob.files {
|
||||
for _, offset := range offsets {
|
||||
writeToFile := func() error {
|
||||
// this looks overly complicated and needs explanation
|
||||
// two competing requirements:
|
||||
// - must create the file once and only once
|
||||
// - should allow concurrent writes to the file
|
||||
// so write the first blob while holding file lock
|
||||
// write other blobs after releasing the lock
|
||||
createSize := int64(-1)
|
||||
file.lock.Lock()
|
||||
if file.inProgress {
|
||||
file.lock.Unlock()
|
||||
} else {
|
||||
defer file.lock.Unlock()
|
||||
file.inProgress = true
|
||||
createSize = file.size
|
||||
}
|
||||
writeErr := r.filesWriter.writeToFile(r.targetPath(file.location), blobData, offset, createSize, file.sparse)
|
||||
|
||||
if r.progress != nil {
|
||||
r.progress.AddProgress(file.location, uint64(len(blobData)), uint64(file.size))
|
||||
}
|
||||
|
||||
return writeErr
|
||||
}
|
||||
err := sanitizeError(file, writeToFile())
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
}
|
||||
return nil
|
||||
})
|
||||
err := r.downloadBlobs(ctx, pack.id, blobList, blobs, processedBlobs)
|
||||
|
||||
if err != nil {
|
||||
// only report error for not yet processed blobs
|
||||
|
@ -308,7 +258,7 @@ func (r *fileRestorer) downloadPack(ctx context.Context, pack *packInfo) error {
|
|||
}
|
||||
|
||||
for file := range affectedFiles {
|
||||
if errFile := sanitizeError(file, err); errFile != nil {
|
||||
if errFile := r.sanitizeError(file, err); errFile != nil {
|
||||
return errFile
|
||||
}
|
||||
}
|
||||
|
@ -316,3 +266,61 @@ func (r *fileRestorer) downloadPack(ctx context.Context, pack *packInfo) error {
|
|||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (r *fileRestorer) sanitizeError(file *fileInfo, err error) error {
|
||||
if err != nil {
|
||||
err = r.Error(file.location, err)
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
func (r *fileRestorer) downloadBlobs(ctx context.Context, packID restic.ID, blobList []restic.Blob,
|
||||
blobs blobToFileOffsetsMapping, processedBlobs restic.BlobSet) error {
|
||||
|
||||
return repository.StreamPack(ctx, r.packLoader, r.key, packID, blobList,
|
||||
func(h restic.BlobHandle, blobData []byte, err error) error {
|
||||
processedBlobs.Insert(h)
|
||||
blob := blobs[h.ID]
|
||||
if err != nil {
|
||||
for file := range blob.files {
|
||||
if errFile := r.sanitizeError(file, err); errFile != nil {
|
||||
return errFile
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
for file, offsets := range blob.files {
|
||||
for _, offset := range offsets {
|
||||
writeToFile := func() error {
|
||||
// this looks overly complicated and needs explanation
|
||||
// two competing requirements:
|
||||
// - must create the file once and only once
|
||||
// - should allow concurrent writes to the file
|
||||
// so write the first blob while holding file lock
|
||||
// write other blobs after releasing the lock
|
||||
createSize := int64(-1)
|
||||
file.lock.Lock()
|
||||
if file.inProgress {
|
||||
file.lock.Unlock()
|
||||
} else {
|
||||
defer file.lock.Unlock()
|
||||
file.inProgress = true
|
||||
createSize = file.size
|
||||
}
|
||||
writeErr := r.filesWriter.writeToFile(r.targetPath(file.location), blobData, offset, createSize, file.sparse)
|
||||
|
||||
if r.progress != nil {
|
||||
r.progress.AddProgress(file.location, uint64(len(blobData)), uint64(file.size))
|
||||
}
|
||||
|
||||
return writeErr
|
||||
}
|
||||
err := r.sanitizeError(file, writeToFile())
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
}
|
||||
return nil
|
||||
})
|
||||
}
|
||||
|
|
Loading…
Reference in a new issue