repository: streamPack: replace streaming with chunked download

Due to the interface of streamPack, we cannot guarantee that operations
progress fast enough that the underlying connections remains open. This
introduces partial failures which massively complicate the error
handling.

Switch to a simpler approach that retrieves the pack in chunks of 32MB.
If a blob is larger than this limit, then it is downloaded separately.

To avoid multiple copies in memory, an auxiliary interface
`discardReader` is introduced that allows directly accessing the
downloaded byte slices, while still supporting the streaming used by the
`check` command.
This commit is contained in:
Michael Eischer 2024-04-22 20:53:31 +02:00
parent 621012dac0
commit 666a0b0bdb
2 changed files with 134 additions and 70 deletions

View file

@ -561,7 +561,7 @@ func checkPack(ctx context.Context, r restic.Repository, id restic.ID, blobs []r
hrd := hashing.NewReader(rd, sha256.New()) hrd := hashing.NewReader(rd, sha256.New())
bufRd.Reset(hrd) bufRd.Reset(hrd)
it := repository.NewPackBlobIterator(id, bufRd, 0, blobs, r.Key(), dec) it := repository.NewPackBlobIterator(id, newBufReader(bufRd), 0, blobs, r.Key(), dec)
for { for {
val, err := it.Next() val, err := it.Next()
if err == repository.ErrPackEOF { if err == repository.ErrPackEOF {
@ -647,11 +647,41 @@ func checkPack(ctx context.Context, r restic.Repository, id restic.ID, blobs []r
return nil return nil
} }
type bufReader struct {
rd *bufio.Reader
buf []byte
}
func newBufReader(rd *bufio.Reader) *bufReader {
return &bufReader{
rd: rd,
}
}
func (b *bufReader) Discard(n int) (discarded int, err error) {
return b.rd.Discard(n)
}
func (b *bufReader) ReadFull(n int) (buf []byte, err error) {
if cap(b.buf) < n {
b.buf = make([]byte, n)
}
b.buf = b.buf[:n]
_, err = io.ReadFull(b.rd, b.buf)
if err != nil {
return nil, err
}
return b.buf, nil
}
// ReadData loads all data from the repository and checks the integrity. // ReadData loads all data from the repository and checks the integrity.
func (c *Checker) ReadData(ctx context.Context, errChan chan<- error) { func (c *Checker) ReadData(ctx context.Context, errChan chan<- error) {
c.ReadPacks(ctx, c.packs, nil, errChan) c.ReadPacks(ctx, c.packs, nil, errChan)
} }
const maxStreamBufferSize = 4 * 1024 * 1024
// ReadPacks loads data from specified packs and checks the integrity. // ReadPacks loads data from specified packs and checks the integrity.
func (c *Checker) ReadPacks(ctx context.Context, packs map[restic.ID]int64, p *progress.Counter, errChan chan<- error) { func (c *Checker) ReadPacks(ctx context.Context, packs map[restic.ID]int64, p *progress.Counter, errChan chan<- error) {
defer close(errChan) defer close(errChan)
@ -669,9 +699,7 @@ func (c *Checker) ReadPacks(ctx context.Context, packs map[restic.ID]int64, p *p
// run workers // run workers
for i := 0; i < workerCount; i++ { for i := 0; i < workerCount; i++ {
g.Go(func() error { g.Go(func() error {
// create a buffer that is large enough to be reused by repository.StreamPack bufRd := bufio.NewReaderSize(nil, maxStreamBufferSize)
// this ensures that we can read the pack header later on
bufRd := bufio.NewReaderSize(nil, repository.MaxStreamBufferSize)
dec, err := zstd.NewReader(nil) dec, err := zstd.NewReader(nil)
if err != nil { if err != nil {
panic(dec) panic(dec)

View file

@ -1,7 +1,6 @@
package repository package repository
import ( import (
"bufio"
"bytes" "bytes"
"context" "context"
"fmt" "fmt"
@ -11,7 +10,6 @@ import (
"sort" "sort"
"sync" "sync"
"github.com/cenkalti/backoff/v4"
"github.com/klauspost/compress/zstd" "github.com/klauspost/compress/zstd"
"github.com/restic/chunker" "github.com/restic/chunker"
"github.com/restic/restic/internal/backend" "github.com/restic/restic/internal/backend"
@ -28,8 +26,6 @@ import (
"golang.org/x/sync/errgroup" "golang.org/x/sync/errgroup"
) )
const MaxStreamBufferSize = 4 * 1024 * 1024
const MinPackSize = 4 * 1024 * 1024 const MinPackSize = 4 * 1024 * 1024
const DefaultPackSize = 16 * 1024 * 1024 const DefaultPackSize = 16 * 1024 * 1024
const MaxPackSize = 128 * 1024 * 1024 const MaxPackSize = 128 * 1024 * 1024
@ -951,7 +947,8 @@ const maxUnusedRange = 4 * 1024 * 1024
// LoadBlobsFromPack loads the listed blobs from the specified pack file. The plaintext blob is passed to // LoadBlobsFromPack loads the listed blobs from the specified pack file. The plaintext blob is passed to
// the handleBlobFn callback or an error if decryption failed or the blob hash does not match. // the handleBlobFn callback or an error if decryption failed or the blob hash does not match.
// handleBlobFn is called at most once for each blob. If the callback returns an error, // handleBlobFn is called at most once for each blob. If the callback returns an error,
// then LoadBlobsFromPack will abort and not retry it. // then LoadBlobsFromPack will abort and not retry it. The buf passed to the callback is only valid within
// this specific call. The callback must not keep a reference to buf.
func (r *Repository) LoadBlobsFromPack(ctx context.Context, packID restic.ID, blobs []restic.Blob, handleBlobFn func(blob restic.BlobHandle, buf []byte, err error) error) error { func (r *Repository) LoadBlobsFromPack(ctx context.Context, packID restic.ID, blobs []restic.Blob, handleBlobFn func(blob restic.BlobHandle, buf []byte, err error) error) error {
return streamPack(ctx, r.Backend().Load, r.LoadBlob, r.key, packID, blobs, handleBlobFn) return streamPack(ctx, r.Backend().Load, r.LoadBlob, r.key, packID, blobs, handleBlobFn)
} }
@ -968,12 +965,27 @@ func streamPack(ctx context.Context, beLoad backendLoadFn, loadBlobFn loadBlobFn
lowerIdx := 0 lowerIdx := 0
lastPos := blobs[0].Offset lastPos := blobs[0].Offset
const maxChunkSize = 2 * DefaultPackSize
for i := 0; i < len(blobs); i++ { for i := 0; i < len(blobs); i++ {
if blobs[i].Offset < lastPos { if blobs[i].Offset < lastPos {
// don't wait for streamPackPart to fail // don't wait for streamPackPart to fail
return errors.Errorf("overlapping blobs in pack %v", packID) return errors.Errorf("overlapping blobs in pack %v", packID)
} }
chunkSizeAfter := (blobs[i].Offset + blobs[i].Length) - blobs[lowerIdx].Offset
split := false
// split if the chunk would become larger than maxChunkSize. Oversized chunks are
// handled by the requirement that the chunk contains at least one blob (i > lowerIdx)
if i > lowerIdx && chunkSizeAfter >= maxChunkSize {
split = true
}
// skip too large gaps as a new request is typically much cheaper than data transfers
if blobs[i].Offset-lastPos > maxUnusedRange { if blobs[i].Offset-lastPos > maxUnusedRange {
split = true
}
if split {
// load everything up to the skipped file section // load everything up to the skipped file section
err := streamPackPart(ctx, beLoad, loadBlobFn, key, packID, blobs[lowerIdx:i], handleBlobFn) err := streamPackPart(ctx, beLoad, loadBlobFn, key, packID, blobs[lowerIdx:i], handleBlobFn)
if err != nil { if err != nil {
@ -1001,75 +1013,108 @@ func streamPackPart(ctx context.Context, beLoad backendLoadFn, loadBlobFn loadBl
} }
defer dec.Close() defer dec.Close()
ctx, cancel := context.WithCancel(ctx) data := make([]byte, int(dataEnd-dataStart))
// stream blobs in pack
err = beLoad(ctx, h, int(dataEnd-dataStart), int64(dataStart), func(rd io.Reader) error { err = beLoad(ctx, h, int(dataEnd-dataStart), int64(dataStart), func(rd io.Reader) error {
// prevent callbacks after cancellation _, cerr := io.ReadFull(rd, data)
if ctx.Err() != nil { return cerr
return ctx.Err() })
} // prevent callbacks after cancellation
bufferSize := int(dataEnd - dataStart) if ctx.Err() != nil {
if bufferSize > MaxStreamBufferSize { return ctx.Err()
bufferSize = MaxStreamBufferSize }
} if err != nil {
bufRd := bufio.NewReaderSize(rd, bufferSize) // the context is only still valid if handleBlobFn never returned an error
it := NewPackBlobIterator(packID, bufRd, dataStart, blobs, key, dec) if loadBlobFn != nil {
// check whether we can get the remaining blobs somewhere else
for { for _, entry := range blobs {
val, err := it.Next() buf, ierr := loadBlobFn(ctx, entry.Type, entry.ID, nil)
if err == ErrPackEOF { err = handleBlobFn(entry.BlobHandle, buf, ierr)
break if err != nil {
} else if err != nil { break
return err
}
if val.Err != nil && loadBlobFn != nil {
var ierr error
// check whether we can get a valid copy somewhere else
buf, ierr := loadBlobFn(ctx, val.Handle.Type, val.Handle.ID, nil)
if ierr == nil {
// success
val.Plaintext = buf
val.Err = nil
} }
} }
err = handleBlobFn(val.Handle, val.Plaintext, val.Err)
if err != nil {
cancel()
return backoff.Permanent(err)
}
// ensure that each blob is only passed once to handleBlobFn
blobs = blobs[1:]
} }
return nil return errors.Wrap(err, "StreamPack")
}) }
// the context is only still valid if handleBlobFn never returned an error it := NewPackBlobIterator(packID, newByteReader(data), dataStart, blobs, key, dec)
if ctx.Err() == nil && loadBlobFn != nil {
// check whether we can get the remaining blobs somewhere else for {
for _, entry := range blobs { val, err := it.Next()
buf, ierr := loadBlobFn(ctx, entry.Type, entry.ID, nil) if err == ErrPackEOF {
err = handleBlobFn(entry.BlobHandle, buf, ierr) break
if err != nil { } else if err != nil {
break return err
}
if val.Err != nil && loadBlobFn != nil {
var ierr error
// check whether we can get a valid copy somewhere else
buf, ierr := loadBlobFn(ctx, val.Handle.Type, val.Handle.ID, nil)
if ierr == nil {
// success
val.Plaintext = buf
val.Err = nil
} }
} }
err = handleBlobFn(val.Handle, val.Plaintext, val.Err)
if err != nil {
return err
}
// ensure that each blob is only passed once to handleBlobFn
blobs = blobs[1:]
} }
return errors.Wrap(err, "StreamPack") return errors.Wrap(err, "StreamPack")
} }
// discardReader allows the PackBlobIterator to perform zero copy
// reads if the underlying data source is a byte slice.
type discardReader interface {
Discard(n int) (discarded int, err error)
// ReadFull reads the next n bytes into a byte slice. The caller must not
// retain a reference to the byte. Modifications are only allowed within
// the boundaries of the returned slice.
ReadFull(n int) (buf []byte, err error)
}
type byteReader struct {
buf []byte
}
func newByteReader(buf []byte) *byteReader {
return &byteReader{
buf: buf,
}
}
func (b *byteReader) Discard(n int) (discarded int, err error) {
if len(b.buf) < n {
return 0, io.ErrUnexpectedEOF
}
b.buf = b.buf[n:]
return n, nil
}
func (b *byteReader) ReadFull(n int) (buf []byte, err error) {
if len(b.buf) < n {
return nil, io.ErrUnexpectedEOF
}
buf = b.buf[:n]
b.buf = b.buf[n:]
return buf, nil
}
type PackBlobIterator struct { type PackBlobIterator struct {
packID restic.ID packID restic.ID
rd *bufio.Reader rd discardReader
currentOffset uint currentOffset uint
blobs []restic.Blob blobs []restic.Blob
key *crypto.Key key *crypto.Key
dec *zstd.Decoder dec *zstd.Decoder
buf []byte
decode []byte decode []byte
} }
@ -1081,7 +1126,7 @@ type PackBlobValue struct {
var ErrPackEOF = errors.New("reached EOF of pack file") var ErrPackEOF = errors.New("reached EOF of pack file")
func NewPackBlobIterator(packID restic.ID, rd *bufio.Reader, currentOffset uint, func NewPackBlobIterator(packID restic.ID, rd discardReader, currentOffset uint,
blobs []restic.Blob, key *crypto.Key, dec *zstd.Decoder) *PackBlobIterator { blobs []restic.Blob, key *crypto.Key, dec *zstd.Decoder) *PackBlobIterator {
return &PackBlobIterator{ return &PackBlobIterator{
packID: packID, packID: packID,
@ -1116,21 +1161,12 @@ func (b *PackBlobIterator) Next() (PackBlobValue, error) {
h := restic.BlobHandle{ID: entry.ID, Type: entry.Type} h := restic.BlobHandle{ID: entry.ID, Type: entry.Type}
debug.Log(" process blob %v, skipped %d, %v", h, skipBytes, entry) debug.Log(" process blob %v, skipped %d, %v", h, skipBytes, entry)
if uint(cap(b.buf)) < entry.Length { buf, err := b.rd.ReadFull(int(entry.Length))
b.buf = make([]byte, entry.Length)
}
b.buf = b.buf[:entry.Length]
n, err := io.ReadFull(b.rd, b.buf)
if err != nil { if err != nil {
debug.Log(" read error %v", err) debug.Log(" read error %v", err)
return PackBlobValue{}, fmt.Errorf("readFull: %w", err) return PackBlobValue{}, fmt.Errorf("readFull: %w", err)
} }
if n != len(b.buf) {
return PackBlobValue{}, fmt.Errorf("read blob %v from %v: not enough bytes read, want %v, got %v",
h, b.packID.Str(), len(b.buf), n)
}
b.currentOffset = entry.Offset + entry.Length b.currentOffset = entry.Offset + entry.Length
if int(entry.Length) <= b.key.NonceSize() { if int(entry.Length) <= b.key.NonceSize() {
@ -1139,7 +1175,7 @@ func (b *PackBlobIterator) Next() (PackBlobValue, error) {
} }
// decryption errors are likely permanent, give the caller a chance to skip them // decryption errors are likely permanent, give the caller a chance to skip them
nonce, ciphertext := b.buf[:b.key.NonceSize()], b.buf[b.key.NonceSize():] nonce, ciphertext := buf[:b.key.NonceSize()], buf[b.key.NonceSize():]
plaintext, err := b.key.Open(ciphertext[:0], nonce, ciphertext, nil) plaintext, err := b.key.Open(ciphertext[:0], nonce, ciphertext, nil)
if err != nil { if err != nil {
err = fmt.Errorf("decrypting blob %v from %v failed: %w", h, b.packID.Str(), err) err = fmt.Errorf("decrypting blob %v from %v failed: %w", h, b.packID.Str(), err)