Merge pull request #4800 from MichaelEischer/cleanup-load
Retry loading of corrupted data from backend / cache
This commit is contained in:
commit
eb6c653f89
25 changed files with 594 additions and 485 deletions
|
@ -7,7 +7,6 @@ import (
|
|||
|
||||
"github.com/spf13/cobra"
|
||||
|
||||
"github.com/restic/restic/internal/backend"
|
||||
"github.com/restic/restic/internal/errors"
|
||||
"github.com/restic/restic/internal/repository"
|
||||
"github.com/restic/restic/internal/restic"
|
||||
|
@ -146,9 +145,9 @@ func runCat(ctx context.Context, gopts GlobalOptions, args []string) error {
|
|||
return nil
|
||||
|
||||
case "pack":
|
||||
h := backend.Handle{Type: restic.PackFile, Name: id.String()}
|
||||
buf, err := backend.LoadAll(ctx, nil, repo.Backend(), h)
|
||||
if err != nil {
|
||||
buf, err := repo.LoadRaw(ctx, restic.PackFile, id)
|
||||
// allow returning broken pack files
|
||||
if buf == nil {
|
||||
return err
|
||||
}
|
||||
|
||||
|
|
|
@ -316,10 +316,11 @@ func loadBlobs(ctx context.Context, opts DebugExamineOptions, repo restic.Reposi
|
|||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
be := repo.Backend()
|
||||
h := backend.Handle{
|
||||
Name: packID.String(),
|
||||
Type: restic.PackFile,
|
||||
|
||||
pack, err := repo.LoadRaw(ctx, restic.PackFile, packID)
|
||||
// allow processing broken pack files
|
||||
if pack == nil {
|
||||
return err
|
||||
}
|
||||
|
||||
wg, ctx := errgroup.WithContext(ctx)
|
||||
|
@ -331,19 +332,11 @@ func loadBlobs(ctx context.Context, opts DebugExamineOptions, repo restic.Reposi
|
|||
wg.Go(func() error {
|
||||
for _, blob := range list {
|
||||
Printf(" loading blob %v at %v (length %v)\n", blob.ID, blob.Offset, blob.Length)
|
||||
buf := make([]byte, blob.Length)
|
||||
err := be.Load(ctx, h, int(blob.Length), int64(blob.Offset), func(rd io.Reader) error {
|
||||
n, err := io.ReadFull(rd, buf)
|
||||
if err != nil {
|
||||
return fmt.Errorf("read error after %d bytes: %v", n, err)
|
||||
}
|
||||
return nil
|
||||
})
|
||||
if err != nil {
|
||||
Warnf("error read: %v\n", err)
|
||||
if int(blob.Offset+blob.Length) > len(pack) {
|
||||
Warnf("skipping truncated blob\n")
|
||||
continue
|
||||
}
|
||||
|
||||
buf := pack[blob.Offset : blob.Offset+blob.Length]
|
||||
key := repo.Key()
|
||||
|
||||
nonce, plaintext := buf[:key.NonceSize()], buf[key.NonceSize():]
|
||||
|
@ -492,8 +485,9 @@ func examinePack(ctx context.Context, opts DebugExamineOptions, repo restic.Repo
|
|||
}
|
||||
Printf(" file size is %v\n", fi.Size)
|
||||
|
||||
buf, err := backend.LoadAll(ctx, nil, repo.Backend(), h)
|
||||
if err != nil {
|
||||
buf, err := repo.LoadRaw(ctx, restic.PackFile, id)
|
||||
// also process damaged pack files
|
||||
if buf == nil {
|
||||
return err
|
||||
}
|
||||
gotID := restic.Hash(buf)
|
||||
|
|
|
@ -1,11 +1,11 @@
|
|||
package main
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"io"
|
||||
"os"
|
||||
|
||||
"github.com/restic/restic/internal/backend"
|
||||
"github.com/restic/restic/internal/errors"
|
||||
"github.com/restic/restic/internal/repository"
|
||||
"github.com/restic/restic/internal/restic"
|
||||
|
@ -17,8 +17,6 @@ var cmdRepairPacks = &cobra.Command{
|
|||
Use: "packs [packIDs...]",
|
||||
Short: "Salvage damaged pack files",
|
||||
Long: `
|
||||
WARNING: The CLI for this command is experimental and will likely change in the future!
|
||||
|
||||
The "repair packs" command extracts intact blobs from the specified pack files, rebuilds
|
||||
the index to remove the damaged pack files and removes the pack files from the repository.
|
||||
|
||||
|
@ -68,20 +66,17 @@ func runRepairPacks(ctx context.Context, gopts GlobalOptions, term *termstatus.T
|
|||
|
||||
printer.P("saving backup copies of pack files to current folder")
|
||||
for id := range ids {
|
||||
buf, err := repo.LoadRaw(ctx, restic.PackFile, id)
|
||||
// corrupted data is fine
|
||||
if buf == nil {
|
||||
return err
|
||||
}
|
||||
|
||||
f, err := os.OpenFile("pack-"+id.String(), os.O_WRONLY|os.O_CREATE|os.O_EXCL, 0o666)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
err = repo.Backend().Load(ctx, backend.Handle{Type: restic.PackFile, Name: id.String()}, 0, 0, func(rd io.Reader) error {
|
||||
_, err := f.Seek(0, 0)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
_, err = io.Copy(f, rd)
|
||||
return err
|
||||
})
|
||||
if err != nil {
|
||||
if _, err := io.Copy(f, bytes.NewReader(buf)); err != nil {
|
||||
_ = f.Close()
|
||||
return err
|
||||
}
|
||||
|
|
|
@ -247,7 +247,7 @@ func (b *Local) openReader(_ context.Context, h backend.Handle, length int, offs
|
|||
}
|
||||
|
||||
if length > 0 {
|
||||
return backend.LimitReadCloser(f, int64(length)), nil
|
||||
return util.LimitReadCloser(f, int64(length)), nil
|
||||
}
|
||||
|
||||
return f, nil
|
||||
|
|
|
@ -437,7 +437,7 @@ func (r *SFTP) Load(ctx context.Context, h backend.Handle, length int, offset in
|
|||
|
||||
// check the underlying reader to be agnostic to however fn() handles the returned error
|
||||
_, rderr := rd.Read([]byte{0})
|
||||
if rderr == io.EOF && rd.(*backend.LimitedReadCloser).N != 0 {
|
||||
if rderr == io.EOF && rd.(*util.LimitedReadCloser).N != 0 {
|
||||
// file is too short
|
||||
return fmt.Errorf("%w: %v", errTooShort, err)
|
||||
}
|
||||
|
@ -463,7 +463,7 @@ func (r *SFTP) openReader(_ context.Context, h backend.Handle, length int, offse
|
|||
if length > 0 {
|
||||
// unlimited reads usually use io.Copy which needs WriteTo support at the underlying reader
|
||||
// limited reads are usually combined with io.ReadFull which reads all required bytes into a buffer in one go
|
||||
return backend.LimitReadCloser(f, int64(length)), nil
|
||||
return util.LimitReadCloser(f, int64(length)), nil
|
||||
}
|
||||
|
||||
return f, nil
|
||||
|
|
|
@ -36,6 +36,19 @@ func beTest(ctx context.Context, be backend.Backend, h backend.Handle) (bool, er
|
|||
return err == nil, err
|
||||
}
|
||||
|
||||
func LoadAll(ctx context.Context, be backend.Backend, h backend.Handle) ([]byte, error) {
|
||||
var buf []byte
|
||||
err := be.Load(ctx, h, 0, 0, func(rd io.Reader) error {
|
||||
var err error
|
||||
buf, err = io.ReadAll(rd)
|
||||
return err
|
||||
})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return buf, nil
|
||||
}
|
||||
|
||||
// TestStripPasswordCall tests that the StripPassword method of a factory can be called without crashing.
|
||||
// It does not verify whether passwords are removed correctly
|
||||
func (s *Suite[C]) TestStripPasswordCall(_ *testing.T) {
|
||||
|
@ -94,7 +107,7 @@ func (s *Suite[C]) TestConfig(t *testing.T) {
|
|||
var testString = "Config"
|
||||
|
||||
// create config and read it back
|
||||
_, err := backend.LoadAll(context.TODO(), nil, b, backend.Handle{Type: backend.ConfigFile})
|
||||
_, err := LoadAll(context.TODO(), b, backend.Handle{Type: backend.ConfigFile})
|
||||
if err == nil {
|
||||
t.Fatalf("did not get expected error for non-existing config")
|
||||
}
|
||||
|
@ -110,7 +123,7 @@ func (s *Suite[C]) TestConfig(t *testing.T) {
|
|||
// same config
|
||||
for _, name := range []string{"", "foo", "bar", "0000000000000000000000000000000000000000000000000000000000000000"} {
|
||||
h := backend.Handle{Type: backend.ConfigFile, Name: name}
|
||||
buf, err := backend.LoadAll(context.TODO(), nil, b, h)
|
||||
buf, err := LoadAll(context.TODO(), b, h)
|
||||
if err != nil {
|
||||
t.Fatalf("unable to read config with name %q: %+v", name, err)
|
||||
}
|
||||
|
@ -519,7 +532,7 @@ func (s *Suite[C]) TestSave(t *testing.T) {
|
|||
err := b.Save(context.TODO(), h, backend.NewByteReader(data, b.Hasher()))
|
||||
test.OK(t, err)
|
||||
|
||||
buf, err := backend.LoadAll(context.TODO(), nil, b, h)
|
||||
buf, err := LoadAll(context.TODO(), b, h)
|
||||
test.OK(t, err)
|
||||
if len(buf) != len(data) {
|
||||
t.Fatalf("number of bytes does not match, want %v, got %v", len(data), len(buf))
|
||||
|
@ -821,7 +834,7 @@ func (s *Suite[C]) TestBackend(t *testing.T) {
|
|||
|
||||
// test Load()
|
||||
h := backend.Handle{Type: tpe, Name: ts.id}
|
||||
buf, err := backend.LoadAll(context.TODO(), nil, b, h)
|
||||
buf, err := LoadAll(context.TODO(), b, h)
|
||||
test.OK(t, err)
|
||||
test.Equals(t, ts.data, string(buf))
|
||||
|
||||
|
|
15
internal/backend/util/limited_reader.go
Normal file
15
internal/backend/util/limited_reader.go
Normal file
|
@ -0,0 +1,15 @@
|
|||
package util
|
||||
|
||||
import "io"
|
||||
|
||||
// LimitedReadCloser wraps io.LimitedReader and exposes the Close() method.
|
||||
type LimitedReadCloser struct {
|
||||
io.Closer
|
||||
io.LimitedReader
|
||||
}
|
||||
|
||||
// LimitReadCloser returns a new reader wraps r in an io.LimitedReader, but also
|
||||
// exposes the Close() method.
|
||||
func LimitReadCloser(r io.ReadCloser, n int64) *LimitedReadCloser {
|
||||
return &LimitedReadCloser{Closer: r, LimitedReader: io.LimitedReader{R: r, N: n}}
|
||||
}
|
|
@ -1,76 +0,0 @@
|
|||
package backend
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"encoding/hex"
|
||||
"fmt"
|
||||
"io"
|
||||
|
||||
"github.com/minio/sha256-simd"
|
||||
|
||||
"github.com/restic/restic/internal/debug"
|
||||
"github.com/restic/restic/internal/errors"
|
||||
)
|
||||
|
||||
func verifyContentMatchesName(s string, data []byte) (bool, error) {
|
||||
if len(s) != hex.EncodedLen(sha256.Size) {
|
||||
return false, fmt.Errorf("invalid length for ID: %q", s)
|
||||
}
|
||||
|
||||
b, err := hex.DecodeString(s)
|
||||
if err != nil {
|
||||
return false, fmt.Errorf("invalid ID: %s", err)
|
||||
}
|
||||
var id [sha256.Size]byte
|
||||
copy(id[:], b)
|
||||
|
||||
hashed := sha256.Sum256(data)
|
||||
return id == hashed, nil
|
||||
}
|
||||
|
||||
// LoadAll reads all data stored in the backend for the handle into the given
|
||||
// buffer, which is truncated. If the buffer is not large enough or nil, a new
|
||||
// one is allocated.
|
||||
func LoadAll(ctx context.Context, buf []byte, be Backend, h Handle) ([]byte, error) {
|
||||
retriedInvalidData := false
|
||||
err := be.Load(ctx, h, 0, 0, func(rd io.Reader) error {
|
||||
// make sure this is idempotent, in case an error occurs this function may be called multiple times!
|
||||
wr := bytes.NewBuffer(buf[:0])
|
||||
_, cerr := io.Copy(wr, rd)
|
||||
if cerr != nil {
|
||||
return cerr
|
||||
}
|
||||
buf = wr.Bytes()
|
||||
|
||||
// retry loading damaged data only once. If a file fails to download correctly
|
||||
// the second time, then it is likely corrupted at the backend. Return the data
|
||||
// to the caller in that case to let it decide what to do with the data.
|
||||
if !retriedInvalidData && h.Type != ConfigFile {
|
||||
if matches, err := verifyContentMatchesName(h.Name, buf); err == nil && !matches {
|
||||
debug.Log("retry loading broken blob %v", h)
|
||||
retriedInvalidData = true
|
||||
return errors.Errorf("loadAll(%v): invalid data returned", h)
|
||||
}
|
||||
}
|
||||
return nil
|
||||
})
|
||||
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return buf, nil
|
||||
}
|
||||
|
||||
// LimitedReadCloser wraps io.LimitedReader and exposes the Close() method.
|
||||
type LimitedReadCloser struct {
|
||||
io.Closer
|
||||
io.LimitedReader
|
||||
}
|
||||
|
||||
// LimitReadCloser returns a new reader wraps r in an io.LimitedReader, but also
|
||||
// exposes the Close() method.
|
||||
func LimitReadCloser(r io.ReadCloser, n int64) *LimitedReadCloser {
|
||||
return &LimitedReadCloser{Closer: r, LimitedReader: io.LimitedReader{R: r, N: n}}
|
||||
}
|
|
@ -1,149 +0,0 @@
|
|||
package backend_test
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"io"
|
||||
"math/rand"
|
||||
"testing"
|
||||
|
||||
"github.com/restic/restic/internal/backend"
|
||||
"github.com/restic/restic/internal/backend/mem"
|
||||
"github.com/restic/restic/internal/backend/mock"
|
||||
"github.com/restic/restic/internal/restic"
|
||||
rtest "github.com/restic/restic/internal/test"
|
||||
)
|
||||
|
||||
const KiB = 1 << 10
|
||||
const MiB = 1 << 20
|
||||
|
||||
func TestLoadAll(t *testing.T) {
|
||||
b := mem.New()
|
||||
var buf []byte
|
||||
|
||||
for i := 0; i < 20; i++ {
|
||||
data := rtest.Random(23+i, rand.Intn(MiB)+500*KiB)
|
||||
|
||||
id := restic.Hash(data)
|
||||
h := backend.Handle{Name: id.String(), Type: backend.PackFile}
|
||||
err := b.Save(context.TODO(), h, backend.NewByteReader(data, b.Hasher()))
|
||||
rtest.OK(t, err)
|
||||
|
||||
buf, err := backend.LoadAll(context.TODO(), buf, b, backend.Handle{Type: backend.PackFile, Name: id.String()})
|
||||
rtest.OK(t, err)
|
||||
|
||||
if len(buf) != len(data) {
|
||||
t.Errorf("length of returned buffer does not match, want %d, got %d", len(data), len(buf))
|
||||
continue
|
||||
}
|
||||
|
||||
if !bytes.Equal(buf, data) {
|
||||
t.Errorf("wrong data returned")
|
||||
continue
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func save(t testing.TB, be backend.Backend, buf []byte) backend.Handle {
|
||||
id := restic.Hash(buf)
|
||||
h := backend.Handle{Name: id.String(), Type: backend.PackFile}
|
||||
err := be.Save(context.TODO(), h, backend.NewByteReader(buf, be.Hasher()))
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
return h
|
||||
}
|
||||
|
||||
type quickRetryBackend struct {
|
||||
backend.Backend
|
||||
}
|
||||
|
||||
func (be *quickRetryBackend) Load(ctx context.Context, h backend.Handle, length int, offset int64, fn func(rd io.Reader) error) error {
|
||||
err := be.Backend.Load(ctx, h, length, offset, fn)
|
||||
if err != nil {
|
||||
// retry
|
||||
err = be.Backend.Load(ctx, h, length, offset, fn)
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
func TestLoadAllBroken(t *testing.T) {
|
||||
b := mock.NewBackend()
|
||||
|
||||
data := rtest.Random(23, rand.Intn(MiB)+500*KiB)
|
||||
id := restic.Hash(data)
|
||||
// damage buffer
|
||||
data[0] ^= 0xff
|
||||
|
||||
b.OpenReaderFn = func(ctx context.Context, h backend.Handle, length int, offset int64) (io.ReadCloser, error) {
|
||||
return io.NopCloser(bytes.NewReader(data)), nil
|
||||
}
|
||||
|
||||
// must fail on first try
|
||||
_, err := backend.LoadAll(context.TODO(), nil, b, backend.Handle{Type: backend.PackFile, Name: id.String()})
|
||||
if err == nil {
|
||||
t.Fatalf("missing expected error")
|
||||
}
|
||||
|
||||
// must return the broken data after a retry
|
||||
be := &quickRetryBackend{Backend: b}
|
||||
buf, err := backend.LoadAll(context.TODO(), nil, be, backend.Handle{Type: backend.PackFile, Name: id.String()})
|
||||
rtest.OK(t, err)
|
||||
|
||||
if !bytes.Equal(buf, data) {
|
||||
t.Fatalf("wrong data returned")
|
||||
}
|
||||
}
|
||||
|
||||
func TestLoadAllAppend(t *testing.T) {
|
||||
b := mem.New()
|
||||
|
||||
h1 := save(t, b, []byte("foobar test string"))
|
||||
randomData := rtest.Random(23, rand.Intn(MiB)+500*KiB)
|
||||
h2 := save(t, b, randomData)
|
||||
|
||||
var tests = []struct {
|
||||
handle backend.Handle
|
||||
buf []byte
|
||||
want []byte
|
||||
}{
|
||||
{
|
||||
handle: h1,
|
||||
buf: nil,
|
||||
want: []byte("foobar test string"),
|
||||
},
|
||||
{
|
||||
handle: h1,
|
||||
buf: []byte("xxx"),
|
||||
want: []byte("foobar test string"),
|
||||
},
|
||||
{
|
||||
handle: h2,
|
||||
buf: nil,
|
||||
want: randomData,
|
||||
},
|
||||
{
|
||||
handle: h2,
|
||||
buf: make([]byte, 0, 200),
|
||||
want: randomData,
|
||||
},
|
||||
{
|
||||
handle: h2,
|
||||
buf: []byte("foobarbaz"),
|
||||
want: randomData,
|
||||
},
|
||||
}
|
||||
|
||||
for _, test := range tests {
|
||||
t.Run("", func(t *testing.T) {
|
||||
buf, err := backend.LoadAll(context.TODO(), test.buf, b, test.handle)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
if !bytes.Equal(buf, test.want) {
|
||||
t.Errorf("wrong data returned, want %q, got %q", test.want, buf)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
35
internal/cache/backend.go
vendored
35
internal/cache/backend.go
vendored
|
@ -40,7 +40,8 @@ func (b *Backend) Remove(ctx context.Context, h backend.Handle) error {
|
|||
return err
|
||||
}
|
||||
|
||||
return b.Cache.remove(h)
|
||||
_, err = b.Cache.remove(h)
|
||||
return err
|
||||
}
|
||||
|
||||
func autoCacheTypes(h backend.Handle) bool {
|
||||
|
@ -79,10 +80,9 @@ func (b *Backend) Save(ctx context.Context, h backend.Handle, rd backend.RewindR
|
|||
return err
|
||||
}
|
||||
|
||||
err = b.Cache.Save(h, rd)
|
||||
err = b.Cache.save(h, rd)
|
||||
if err != nil {
|
||||
debug.Log("unable to save %v to cache: %v", h, err)
|
||||
_ = b.Cache.remove(h)
|
||||
return err
|
||||
}
|
||||
|
||||
|
@ -120,11 +120,11 @@ func (b *Backend) cacheFile(ctx context.Context, h backend.Handle) error {
|
|||
if !b.Cache.Has(h) {
|
||||
// nope, it's still not in the cache, pull it from the repo and save it
|
||||
err := b.Backend.Load(ctx, h, 0, 0, func(rd io.Reader) error {
|
||||
return b.Cache.Save(h, rd)
|
||||
return b.Cache.save(h, rd)
|
||||
})
|
||||
if err != nil {
|
||||
// try to remove from the cache, ignore errors
|
||||
_ = b.Cache.remove(h)
|
||||
_, _ = b.Cache.remove(h)
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
@ -134,9 +134,9 @@ func (b *Backend) cacheFile(ctx context.Context, h backend.Handle) error {
|
|||
|
||||
// loadFromCache will try to load the file from the cache.
|
||||
func (b *Backend) loadFromCache(h backend.Handle, length int, offset int64, consumer func(rd io.Reader) error) (bool, error) {
|
||||
rd, err := b.Cache.load(h, length, offset)
|
||||
rd, inCache, err := b.Cache.load(h, length, offset)
|
||||
if err != nil {
|
||||
return false, err
|
||||
return inCache, err
|
||||
}
|
||||
|
||||
err = consumer(rd)
|
||||
|
@ -162,14 +162,10 @@ func (b *Backend) Load(ctx context.Context, h backend.Handle, length int, offset
|
|||
// try loading from cache without checking that the handle is actually cached
|
||||
inCache, err := b.loadFromCache(h, length, offset, consumer)
|
||||
if inCache {
|
||||
if err == nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
// drop from cache and retry once
|
||||
_ = b.Cache.remove(h)
|
||||
}
|
||||
debug.Log("error loading %v from cache: %v", h, err)
|
||||
// the caller must explicitly use cache.Forget() to remove the cache entry
|
||||
return err
|
||||
}
|
||||
|
||||
// if we don't automatically cache this file type, fall back to the backend
|
||||
if !autoCacheTypes(h) {
|
||||
|
@ -185,6 +181,9 @@ func (b *Backend) Load(ctx context.Context, h backend.Handle, length int, offset
|
|||
|
||||
inCache, err = b.loadFromCache(h, length, offset, consumer)
|
||||
if inCache {
|
||||
if err != nil {
|
||||
debug.Log("error loading %v from cache: %v", h, err)
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
|
@ -198,13 +197,9 @@ func (b *Backend) Stat(ctx context.Context, h backend.Handle) (backend.FileInfo,
|
|||
debug.Log("cache Stat(%v)", h)
|
||||
|
||||
fi, err := b.Backend.Stat(ctx, h)
|
||||
if err != nil {
|
||||
if b.Backend.IsNotExist(err) {
|
||||
if err != nil && b.Backend.IsNotExist(err) {
|
||||
// try to remove from the cache, ignore errors
|
||||
_ = b.Cache.remove(h)
|
||||
}
|
||||
|
||||
return fi, err
|
||||
_, _ = b.Cache.remove(h)
|
||||
}
|
||||
|
||||
return fi, err
|
||||
|
|
114
internal/cache/backend_test.go
vendored
114
internal/cache/backend_test.go
vendored
|
@ -5,6 +5,7 @@ import (
|
|||
"context"
|
||||
"io"
|
||||
"math/rand"
|
||||
"strings"
|
||||
"sync"
|
||||
"testing"
|
||||
"time"
|
||||
|
@ -12,12 +13,13 @@ import (
|
|||
"github.com/pkg/errors"
|
||||
"github.com/restic/restic/internal/backend"
|
||||
"github.com/restic/restic/internal/backend/mem"
|
||||
backendtest "github.com/restic/restic/internal/backend/test"
|
||||
"github.com/restic/restic/internal/restic"
|
||||
"github.com/restic/restic/internal/test"
|
||||
)
|
||||
|
||||
func loadAndCompare(t testing.TB, be backend.Backend, h backend.Handle, data []byte) {
|
||||
buf, err := backend.LoadAll(context.TODO(), nil, be, h)
|
||||
buf, err := backendtest.LoadAll(context.TODO(), be, h)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
@ -90,7 +92,7 @@ func TestBackend(t *testing.T) {
|
|||
loadAndCompare(t, be, h, data)
|
||||
|
||||
// load data via cache
|
||||
loadAndCompare(t, be, h, data)
|
||||
loadAndCompare(t, wbe, h, data)
|
||||
|
||||
// remove directly
|
||||
remove(t, be, h)
|
||||
|
@ -113,6 +115,77 @@ func TestBackend(t *testing.T) {
|
|||
}
|
||||
}
|
||||
|
||||
type loadCountingBackend struct {
|
||||
backend.Backend
|
||||
ctr int
|
||||
}
|
||||
|
||||
func (l *loadCountingBackend) Load(ctx context.Context, h backend.Handle, length int, offset int64, fn func(rd io.Reader) error) error {
|
||||
l.ctr++
|
||||
return l.Backend.Load(ctx, h, length, offset, fn)
|
||||
}
|
||||
|
||||
func TestOutOfBoundsAccess(t *testing.T) {
|
||||
be := &loadCountingBackend{Backend: mem.New()}
|
||||
c := TestNewCache(t)
|
||||
wbe := c.Wrap(be)
|
||||
|
||||
h, data := randomData(50)
|
||||
save(t, be, h, data)
|
||||
|
||||
// load out of bounds
|
||||
err := wbe.Load(context.TODO(), h, 100, 100, func(rd io.Reader) error {
|
||||
t.Error("cache returned non-existant file section")
|
||||
return errors.New("broken")
|
||||
})
|
||||
test.Assert(t, strings.Contains(err.Error(), " is too short"), "expected too short error, got %v", err)
|
||||
test.Equals(t, 1, be.ctr, "expected file to be loaded only once")
|
||||
// file must nevertheless get cached
|
||||
if !c.Has(h) {
|
||||
t.Errorf("cache doesn't have file after load")
|
||||
}
|
||||
|
||||
// start within bounds, but request too large chunk
|
||||
err = wbe.Load(context.TODO(), h, 100, 0, func(rd io.Reader) error {
|
||||
t.Error("cache returned non-existant file section")
|
||||
return errors.New("broken")
|
||||
})
|
||||
test.Assert(t, strings.Contains(err.Error(), " is too short"), "expected too short error, got %v", err)
|
||||
test.Equals(t, 1, be.ctr, "expected file to be loaded only once")
|
||||
}
|
||||
|
||||
func TestForget(t *testing.T) {
|
||||
be := &loadCountingBackend{Backend: mem.New()}
|
||||
c := TestNewCache(t)
|
||||
wbe := c.Wrap(be)
|
||||
|
||||
h, data := randomData(50)
|
||||
save(t, be, h, data)
|
||||
|
||||
loadAndCompare(t, wbe, h, data)
|
||||
test.Equals(t, 1, be.ctr, "expected file to be loaded once")
|
||||
|
||||
// must still exist even if load returns an error
|
||||
exp := errors.New("error")
|
||||
err := wbe.Load(context.TODO(), h, 0, 0, func(rd io.Reader) error {
|
||||
return exp
|
||||
})
|
||||
test.Equals(t, exp, err, "wrong error")
|
||||
test.Assert(t, c.Has(h), "missing cache entry")
|
||||
|
||||
test.OK(t, c.Forget(h))
|
||||
test.Assert(t, !c.Has(h), "cache entry should have been removed")
|
||||
|
||||
// cache it again
|
||||
loadAndCompare(t, wbe, h, data)
|
||||
test.Assert(t, c.Has(h), "missing cache entry")
|
||||
|
||||
// forget must delete file only once
|
||||
err = c.Forget(h)
|
||||
test.Assert(t, strings.Contains(err.Error(), "circuit breaker prevents repeated deletion of cached file"), "wrong error message %q", err)
|
||||
test.Assert(t, c.Has(h), "cache entry should still exist")
|
||||
}
|
||||
|
||||
type loadErrorBackend struct {
|
||||
backend.Backend
|
||||
loadError error
|
||||
|
@ -140,7 +213,7 @@ func TestErrorBackend(t *testing.T) {
|
|||
loadTest := func(wg *sync.WaitGroup, be backend.Backend) {
|
||||
defer wg.Done()
|
||||
|
||||
buf, err := backend.LoadAll(context.TODO(), nil, be, h)
|
||||
buf, err := backendtest.LoadAll(context.TODO(), be, h)
|
||||
if err == testErr {
|
||||
return
|
||||
}
|
||||
|
@ -165,38 +238,3 @@ func TestErrorBackend(t *testing.T) {
|
|||
|
||||
wg.Wait()
|
||||
}
|
||||
|
||||
func TestBackendRemoveBroken(t *testing.T) {
|
||||
be := mem.New()
|
||||
c := TestNewCache(t)
|
||||
|
||||
h, data := randomData(5234142)
|
||||
// save directly in backend
|
||||
save(t, be, h, data)
|
||||
|
||||
// prime cache with broken copy
|
||||
broken := append([]byte{}, data...)
|
||||
broken[0] ^= 0xff
|
||||
err := c.Save(h, bytes.NewReader(broken))
|
||||
test.OK(t, err)
|
||||
|
||||
// loadall retries if broken data was returned
|
||||
buf, err := backend.LoadAll(context.TODO(), nil, c.Wrap(be), h)
|
||||
test.OK(t, err)
|
||||
|
||||
if !bytes.Equal(buf, data) {
|
||||
t.Fatalf("wrong data returned")
|
||||
}
|
||||
|
||||
// check that the cache now contains the correct data
|
||||
rd, err := c.load(h, 0, 0)
|
||||
defer func() {
|
||||
_ = rd.Close()
|
||||
}()
|
||||
test.OK(t, err)
|
||||
cached, err := io.ReadAll(rd)
|
||||
test.OK(t, err)
|
||||
if !bytes.Equal(cached, data) {
|
||||
t.Fatalf("wrong data cache")
|
||||
}
|
||||
}
|
||||
|
|
3
internal/cache/cache.go
vendored
3
internal/cache/cache.go
vendored
|
@ -6,6 +6,7 @@ import (
|
|||
"path/filepath"
|
||||
"regexp"
|
||||
"strconv"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/pkg/errors"
|
||||
|
@ -20,6 +21,8 @@ type Cache struct {
|
|||
path string
|
||||
Base string
|
||||
Created bool
|
||||
|
||||
forgotten sync.Map
|
||||
}
|
||||
|
||||
const dirMode = 0700
|
||||
|
|
63
internal/cache/file.go
vendored
63
internal/cache/file.go
vendored
|
@ -1,6 +1,7 @@
|
|||
package cache
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"io"
|
||||
"os"
|
||||
"path/filepath"
|
||||
|
@ -8,6 +9,7 @@ import (
|
|||
|
||||
"github.com/pkg/errors"
|
||||
"github.com/restic/restic/internal/backend"
|
||||
"github.com/restic/restic/internal/backend/util"
|
||||
"github.com/restic/restic/internal/crypto"
|
||||
"github.com/restic/restic/internal/debug"
|
||||
"github.com/restic/restic/internal/fs"
|
||||
|
@ -31,54 +33,54 @@ func (c *Cache) canBeCached(t backend.FileType) bool {
|
|||
return ok
|
||||
}
|
||||
|
||||
// Load returns a reader that yields the contents of the file with the
|
||||
// load returns a reader that yields the contents of the file with the
|
||||
// given handle. rd must be closed after use. If an error is returned, the
|
||||
// ReadCloser is nil.
|
||||
func (c *Cache) load(h backend.Handle, length int, offset int64) (io.ReadCloser, error) {
|
||||
// ReadCloser is nil. The bool return value indicates whether the requested
|
||||
// file exists in the cache. It can be true even when no reader is returned
|
||||
// because length or offset are out of bounds
|
||||
func (c *Cache) load(h backend.Handle, length int, offset int64) (io.ReadCloser, bool, error) {
|
||||
debug.Log("Load(%v, %v, %v) from cache", h, length, offset)
|
||||
if !c.canBeCached(h.Type) {
|
||||
return nil, errors.New("cannot be cached")
|
||||
return nil, false, errors.New("cannot be cached")
|
||||
}
|
||||
|
||||
f, err := fs.Open(c.filename(h))
|
||||
if err != nil {
|
||||
return nil, errors.WithStack(err)
|
||||
return nil, false, errors.WithStack(err)
|
||||
}
|
||||
|
||||
fi, err := f.Stat()
|
||||
if err != nil {
|
||||
_ = f.Close()
|
||||
return nil, errors.WithStack(err)
|
||||
return nil, true, errors.WithStack(err)
|
||||
}
|
||||
|
||||
size := fi.Size()
|
||||
if size <= int64(crypto.CiphertextLength(0)) {
|
||||
_ = f.Close()
|
||||
_ = c.remove(h)
|
||||
return nil, errors.Errorf("cached file %v is truncated, removing", h)
|
||||
return nil, true, errors.Errorf("cached file %v is truncated", h)
|
||||
}
|
||||
|
||||
if size < offset+int64(length) {
|
||||
_ = f.Close()
|
||||
_ = c.remove(h)
|
||||
return nil, errors.Errorf("cached file %v is too short, removing", h)
|
||||
return nil, true, errors.Errorf("cached file %v is too short", h)
|
||||
}
|
||||
|
||||
if offset > 0 {
|
||||
if _, err = f.Seek(offset, io.SeekStart); err != nil {
|
||||
_ = f.Close()
|
||||
return nil, err
|
||||
return nil, true, err
|
||||
}
|
||||
}
|
||||
|
||||
if length <= 0 {
|
||||
return f, nil
|
||||
return f, true, nil
|
||||
}
|
||||
return backend.LimitReadCloser(f, int64(length)), nil
|
||||
return util.LimitReadCloser(f, int64(length)), true, nil
|
||||
}
|
||||
|
||||
// Save saves a file in the cache.
|
||||
func (c *Cache) Save(h backend.Handle, rd io.Reader) error {
|
||||
// save saves a file in the cache.
|
||||
func (c *Cache) save(h backend.Handle, rd io.Reader) error {
|
||||
debug.Log("Save to cache: %v", h)
|
||||
if rd == nil {
|
||||
return errors.New("Save() called with nil reader")
|
||||
|
@ -138,13 +140,34 @@ func (c *Cache) Save(h backend.Handle, rd io.Reader) error {
|
|||
return errors.WithStack(err)
|
||||
}
|
||||
|
||||
// Remove deletes a file. When the file is not cache, no error is returned.
|
||||
func (c *Cache) remove(h backend.Handle) error {
|
||||
if !c.Has(h) {
|
||||
return nil
|
||||
func (c *Cache) Forget(h backend.Handle) error {
|
||||
h.IsMetadata = false
|
||||
|
||||
if _, ok := c.forgotten.Load(h); ok {
|
||||
// Delete a file at most once while restic runs.
|
||||
// This prevents repeatedly caching and forgetting broken files
|
||||
return fmt.Errorf("circuit breaker prevents repeated deletion of cached file %v", h)
|
||||
}
|
||||
|
||||
return fs.Remove(c.filename(h))
|
||||
removed, err := c.remove(h)
|
||||
if removed {
|
||||
c.forgotten.Store(h, struct{}{})
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
// remove deletes a file. When the file is not cached, no error is returned.
|
||||
func (c *Cache) remove(h backend.Handle) (bool, error) {
|
||||
if !c.canBeCached(h.Type) {
|
||||
return false, nil
|
||||
}
|
||||
|
||||
err := fs.Remove(c.filename(h))
|
||||
removed := err == nil
|
||||
if errors.Is(err, os.ErrNotExist) {
|
||||
err = nil
|
||||
}
|
||||
return removed, err
|
||||
}
|
||||
|
||||
// Clear removes all files of type t from the cache that are not contained in
|
||||
|
|
32
internal/cache/file_test.go
vendored
32
internal/cache/file_test.go
vendored
|
@ -14,7 +14,7 @@ import (
|
|||
"github.com/restic/restic/internal/errors"
|
||||
"github.com/restic/restic/internal/fs"
|
||||
"github.com/restic/restic/internal/restic"
|
||||
"github.com/restic/restic/internal/test"
|
||||
rtest "github.com/restic/restic/internal/test"
|
||||
|
||||
"golang.org/x/sync/errgroup"
|
||||
)
|
||||
|
@ -22,7 +22,7 @@ import (
|
|||
func generateRandomFiles(t testing.TB, tpe backend.FileType, c *Cache) restic.IDSet {
|
||||
ids := restic.NewIDSet()
|
||||
for i := 0; i < rand.Intn(15)+10; i++ {
|
||||
buf := test.Random(rand.Int(), 1<<19)
|
||||
buf := rtest.Random(rand.Int(), 1<<19)
|
||||
id := restic.Hash(buf)
|
||||
h := backend.Handle{Type: tpe, Name: id.String()}
|
||||
|
||||
|
@ -30,7 +30,7 @@ func generateRandomFiles(t testing.TB, tpe backend.FileType, c *Cache) restic.ID
|
|||
t.Errorf("index %v present before save", id)
|
||||
}
|
||||
|
||||
err := c.Save(h, bytes.NewReader(buf))
|
||||
err := c.save(h, bytes.NewReader(buf))
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
@ -48,10 +48,11 @@ func randomID(s restic.IDSet) restic.ID {
|
|||
}
|
||||
|
||||
func load(t testing.TB, c *Cache, h backend.Handle) []byte {
|
||||
rd, err := c.load(h, 0, 0)
|
||||
rd, inCache, err := c.load(h, 0, 0)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
rtest.Equals(t, true, inCache, "expected inCache flag to be true")
|
||||
|
||||
if rd == nil {
|
||||
t.Fatalf("load() returned nil reader")
|
||||
|
@ -144,14 +145,14 @@ func TestFileLoad(t *testing.T) {
|
|||
c := TestNewCache(t)
|
||||
|
||||
// save about 5 MiB of data in the cache
|
||||
data := test.Random(rand.Int(), 5234142)
|
||||
data := rtest.Random(rand.Int(), 5234142)
|
||||
id := restic.ID{}
|
||||
copy(id[:], data)
|
||||
h := backend.Handle{
|
||||
Type: restic.PackFile,
|
||||
Name: id.String(),
|
||||
}
|
||||
if err := c.Save(h, bytes.NewReader(data)); err != nil {
|
||||
if err := c.save(h, bytes.NewReader(data)); err != nil {
|
||||
t.Fatalf("Save() returned error: %v", err)
|
||||
}
|
||||
|
||||
|
@ -169,10 +170,11 @@ func TestFileLoad(t *testing.T) {
|
|||
|
||||
for _, test := range tests {
|
||||
t.Run(fmt.Sprintf("%v/%v", test.length, test.offset), func(t *testing.T) {
|
||||
rd, err := c.load(h, test.length, test.offset)
|
||||
rd, inCache, err := c.load(h, test.length, test.offset)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
rtest.Equals(t, true, inCache, "expected inCache flag to be true")
|
||||
|
||||
buf, err := io.ReadAll(rd)
|
||||
if err != nil {
|
||||
|
@ -225,7 +227,7 @@ func TestFileSaveConcurrent(t *testing.T) {
|
|||
|
||||
var (
|
||||
c = TestNewCache(t)
|
||||
data = test.Random(1, 10000)
|
||||
data = rtest.Random(1, 10000)
|
||||
g errgroup.Group
|
||||
id restic.ID
|
||||
)
|
||||
|
@ -237,7 +239,7 @@ func TestFileSaveConcurrent(t *testing.T) {
|
|||
}
|
||||
|
||||
for i := 0; i < nproc/2; i++ {
|
||||
g.Go(func() error { return c.Save(h, bytes.NewReader(data)) })
|
||||
g.Go(func() error { return c.save(h, bytes.NewReader(data)) })
|
||||
|
||||
// Can't use load because only the main goroutine may call t.Fatal.
|
||||
g.Go(func() error {
|
||||
|
@ -245,7 +247,7 @@ func TestFileSaveConcurrent(t *testing.T) {
|
|||
// ensure is ENOENT or nil error.
|
||||
time.Sleep(time.Duration(100+rand.Intn(200)) * time.Millisecond)
|
||||
|
||||
f, err := c.load(h, 0, 0)
|
||||
f, _, err := c.load(h, 0, 0)
|
||||
t.Logf("Load error: %v", err)
|
||||
switch {
|
||||
case err == nil:
|
||||
|
@ -264,23 +266,23 @@ func TestFileSaveConcurrent(t *testing.T) {
|
|||
})
|
||||
}
|
||||
|
||||
test.OK(t, g.Wait())
|
||||
rtest.OK(t, g.Wait())
|
||||
saved := load(t, c, h)
|
||||
test.Equals(t, data, saved)
|
||||
rtest.Equals(t, data, saved)
|
||||
}
|
||||
|
||||
func TestFileSaveAfterDamage(t *testing.T) {
|
||||
c := TestNewCache(t)
|
||||
test.OK(t, fs.RemoveAll(c.path))
|
||||
rtest.OK(t, fs.RemoveAll(c.path))
|
||||
|
||||
// save a few bytes of data in the cache
|
||||
data := test.Random(123456789, 42)
|
||||
data := rtest.Random(123456789, 42)
|
||||
id := restic.Hash(data)
|
||||
h := backend.Handle{
|
||||
Type: restic.PackFile,
|
||||
Name: id.String(),
|
||||
}
|
||||
if err := c.Save(h, bytes.NewReader(data)); err == nil {
|
||||
if err := c.save(h, bytes.NewReader(data)); err == nil {
|
||||
t.Fatal("Missing error when saving to deleted cache directory")
|
||||
}
|
||||
}
|
||||
|
|
|
@ -532,6 +532,21 @@ func (e *partialReadError) Error() string {
|
|||
|
||||
// checkPack reads a pack and checks the integrity of all blobs.
|
||||
func checkPack(ctx context.Context, r restic.Repository, id restic.ID, blobs []restic.Blob, size int64, bufRd *bufio.Reader, dec *zstd.Decoder) error {
|
||||
err := checkPackInner(ctx, r, id, blobs, size, bufRd, dec)
|
||||
if err != nil {
|
||||
// retry pack verification to detect transient errors
|
||||
err2 := checkPackInner(ctx, r, id, blobs, size, bufRd, dec)
|
||||
if err2 != nil {
|
||||
err = err2
|
||||
} else {
|
||||
err = fmt.Errorf("check successful on second attempt, original error %w", err)
|
||||
}
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
func checkPackInner(ctx context.Context, r restic.Repository, id restic.ID, blobs []restic.Blob, size int64, bufRd *bufio.Reader, dec *zstd.Decoder) error {
|
||||
|
||||
debug.Log("checking pack %v", id.String())
|
||||
|
||||
if len(blobs) == 0 {
|
||||
|
@ -590,11 +605,13 @@ func checkPack(ctx context.Context, r restic.Repository, id restic.ID, blobs []r
|
|||
if err != nil {
|
||||
return &partialReadError{err}
|
||||
}
|
||||
curPos += minHdrStart - curPos
|
||||
}
|
||||
|
||||
// read remainder, which should be the pack header
|
||||
var err error
|
||||
hdrBuf, err = io.ReadAll(bufRd)
|
||||
hdrBuf = make([]byte, int(size-int64(curPos)))
|
||||
_, err = io.ReadFull(bufRd, hdrBuf)
|
||||
if err != nil {
|
||||
return &partialReadError{err}
|
||||
}
|
||||
|
@ -608,12 +625,12 @@ func checkPack(ctx context.Context, r restic.Repository, id restic.ID, blobs []r
|
|||
// failed to load the pack file, return as further checks cannot succeed anyways
|
||||
debug.Log(" error streaming pack (partial %v): %v", isPartialReadError, err)
|
||||
if isPartialReadError {
|
||||
return &ErrPackData{PackID: id, errs: append(errs, errors.Errorf("partial download error: %w", err))}
|
||||
return &ErrPackData{PackID: id, errs: append(errs, fmt.Errorf("partial download error: %w", err))}
|
||||
}
|
||||
|
||||
// The check command suggests to repair files for which a `ErrPackData` is returned. However, this file
|
||||
// completely failed to download such that there's no point in repairing anything.
|
||||
return errors.Errorf("download error: %w", err)
|
||||
return fmt.Errorf("download error: %w", err)
|
||||
}
|
||||
if !hash.Equal(id) {
|
||||
debug.Log("pack ID does not match, want %v, got %v", id, hash)
|
||||
|
@ -725,6 +742,7 @@ func (c *Checker) ReadPacks(ctx context.Context, packs map[restic.ID]int64, p *p
|
|||
}
|
||||
|
||||
err := checkPack(ctx, c.repo, ps.id, ps.blobs, ps.size, bufRd, dec)
|
||||
|
||||
p.Add(1)
|
||||
if err == nil {
|
||||
continue
|
||||
|
|
|
@ -8,6 +8,7 @@ import (
|
|||
"path/filepath"
|
||||
"sort"
|
||||
"strconv"
|
||||
"strings"
|
||||
"sync"
|
||||
"testing"
|
||||
"time"
|
||||
|
@ -325,13 +326,60 @@ func induceError(data []byte) {
|
|||
data[pos] ^= 1
|
||||
}
|
||||
|
||||
// errorOnceBackend randomly modifies data when reading a file for the first time.
|
||||
type errorOnceBackend struct {
|
||||
backend.Backend
|
||||
m sync.Map
|
||||
}
|
||||
|
||||
func (b *errorOnceBackend) Load(ctx context.Context, h backend.Handle, length int, offset int64, consumer func(rd io.Reader) error) error {
|
||||
_, isRetry := b.m.LoadOrStore(h, struct{}{})
|
||||
return b.Backend.Load(ctx, h, length, offset, func(rd io.Reader) error {
|
||||
if !isRetry && h.Type != restic.ConfigFile {
|
||||
return consumer(errorReadCloser{rd})
|
||||
}
|
||||
return consumer(rd)
|
||||
})
|
||||
}
|
||||
|
||||
func TestCheckerModifiedData(t *testing.T) {
|
||||
repo := repository.TestRepository(t)
|
||||
sn := archiver.TestSnapshot(t, repo, ".", nil)
|
||||
t.Logf("archived as %v", sn.ID().Str())
|
||||
|
||||
beError := &errorBackend{Backend: repo.Backend()}
|
||||
checkRepo := repository.TestOpenBackend(t, beError)
|
||||
errBe := &errorBackend{Backend: repo.Backend()}
|
||||
|
||||
for _, test := range []struct {
|
||||
name string
|
||||
be backend.Backend
|
||||
damage func()
|
||||
check func(t *testing.T, err error)
|
||||
}{
|
||||
{
|
||||
"errorBackend",
|
||||
errBe,
|
||||
func() {
|
||||
errBe.ProduceErrors = true
|
||||
},
|
||||
func(t *testing.T, err error) {
|
||||
if err == nil {
|
||||
t.Fatal("no error found, checker is broken")
|
||||
}
|
||||
},
|
||||
},
|
||||
{
|
||||
"errorOnceBackend",
|
||||
&errorOnceBackend{Backend: repo.Backend()},
|
||||
func() {},
|
||||
func(t *testing.T, err error) {
|
||||
if !strings.Contains(err.Error(), "check successful on second attempt, original error pack") {
|
||||
t.Fatalf("wrong error found, got %v", err)
|
||||
}
|
||||
},
|
||||
},
|
||||
} {
|
||||
t.Run(test.name, func(t *testing.T) {
|
||||
checkRepo := repository.TestOpenBackend(t, test.be)
|
||||
|
||||
chkr := checker.New(checkRepo, false)
|
||||
|
||||
|
@ -344,8 +392,8 @@ func TestCheckerModifiedData(t *testing.T) {
|
|||
t.Errorf("expected no hints, got %v: %v", len(hints), hints)
|
||||
}
|
||||
|
||||
beError.ProduceErrors = true
|
||||
errFound := false
|
||||
test.damage()
|
||||
var err error
|
||||
for _, err := range checkPacks(chkr) {
|
||||
t.Logf("pack error: %v", err)
|
||||
}
|
||||
|
@ -354,13 +402,15 @@ func TestCheckerModifiedData(t *testing.T) {
|
|||
t.Logf("struct error: %v", err)
|
||||
}
|
||||
|
||||
for _, err := range checkData(chkr) {
|
||||
t.Logf("data error: %v", err)
|
||||
errFound = true
|
||||
for _, cerr := range checkData(chkr) {
|
||||
t.Logf("data error: %v", cerr)
|
||||
if err == nil {
|
||||
err = cerr
|
||||
}
|
||||
}
|
||||
|
||||
if !errFound {
|
||||
t.Fatal("no error found, checker is broken")
|
||||
test.check(t, err)
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -3,7 +3,6 @@ package migrations
|
|||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"io"
|
||||
"os"
|
||||
"path/filepath"
|
||||
|
||||
|
@ -89,11 +88,7 @@ func (m *UpgradeRepoV2) Apply(ctx context.Context, repo restic.Repository) error
|
|||
h := backend.Handle{Type: restic.ConfigFile}
|
||||
|
||||
// read raw config file and save it to a temp dir, just in case
|
||||
var rawConfigFile []byte
|
||||
err = repo.Backend().Load(ctx, h, 0, 0, func(rd io.Reader) (err error) {
|
||||
rawConfigFile, err = io.ReadAll(rd)
|
||||
return err
|
||||
})
|
||||
rawConfigFile, err := repo.LoadRaw(ctx, restic.ConfigFile, restic.ID{})
|
||||
if err != nil {
|
||||
return fmt.Errorf("load config file failed: %w", err)
|
||||
}
|
||||
|
|
|
@ -178,8 +178,7 @@ func SearchKey(ctx context.Context, s *Repository, password string, maxKeys int,
|
|||
|
||||
// LoadKey loads a key from the backend.
|
||||
func LoadKey(ctx context.Context, s *Repository, id restic.ID) (k *Key, err error) {
|
||||
h := backend.Handle{Type: restic.KeyFile, Name: id.String()}
|
||||
data, err := backend.LoadAll(ctx, nil, s.be, h)
|
||||
data, err := s.LoadRaw(ctx, restic.KeyFile, id)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
|
56
internal/repository/raw.go
Normal file
56
internal/repository/raw.go
Normal file
|
@ -0,0 +1,56 @@
|
|||
package repository
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"fmt"
|
||||
"io"
|
||||
|
||||
"github.com/restic/restic/internal/backend"
|
||||
"github.com/restic/restic/internal/restic"
|
||||
)
|
||||
|
||||
// LoadRaw reads all data stored in the backend for the file with id and filetype t.
|
||||
// If the backend returns data that does not match the id, then the buffer is returned
|
||||
// along with an error that is a restic.ErrInvalidData error.
|
||||
func (r *Repository) LoadRaw(ctx context.Context, t restic.FileType, id restic.ID) (buf []byte, err error) {
|
||||
h := backend.Handle{Type: t, Name: id.String()}
|
||||
|
||||
buf, err = loadRaw(ctx, r.be, h)
|
||||
|
||||
// retry loading damaged data only once. If a file fails to download correctly
|
||||
// the second time, then it is likely corrupted at the backend.
|
||||
if h.Type != backend.ConfigFile && id != restic.Hash(buf) {
|
||||
if r.Cache != nil {
|
||||
// Cleanup cache to make sure it's not the cached copy that is broken.
|
||||
// Ignore error as there's not much we can do in that case.
|
||||
_ = r.Cache.Forget(h)
|
||||
}
|
||||
|
||||
buf, err = loadRaw(ctx, r.be, h)
|
||||
|
||||
if err == nil && id != restic.Hash(buf) {
|
||||
// Return corrupted data to the caller if it is still broken the second time to
|
||||
// let the caller decide what to do with the data.
|
||||
return buf, fmt.Errorf("LoadRaw(%v): %w", h, restic.ErrInvalidData)
|
||||
}
|
||||
}
|
||||
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return buf, nil
|
||||
}
|
||||
|
||||
func loadRaw(ctx context.Context, be backend.Backend, h backend.Handle) (buf []byte, err error) {
|
||||
err = be.Load(ctx, h, 0, 0, func(rd io.Reader) error {
|
||||
wr := new(bytes.Buffer)
|
||||
_, cerr := io.Copy(wr, rd)
|
||||
if cerr != nil {
|
||||
return cerr
|
||||
}
|
||||
buf = wr.Bytes()
|
||||
return cerr
|
||||
})
|
||||
return buf, err
|
||||
}
|
108
internal/repository/raw_test.go
Normal file
108
internal/repository/raw_test.go
Normal file
|
@ -0,0 +1,108 @@
|
|||
package repository_test
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"io"
|
||||
"testing"
|
||||
|
||||
"github.com/restic/restic/internal/backend"
|
||||
"github.com/restic/restic/internal/backend/mem"
|
||||
"github.com/restic/restic/internal/backend/mock"
|
||||
"github.com/restic/restic/internal/cache"
|
||||
"github.com/restic/restic/internal/errors"
|
||||
"github.com/restic/restic/internal/repository"
|
||||
"github.com/restic/restic/internal/restic"
|
||||
rtest "github.com/restic/restic/internal/test"
|
||||
)
|
||||
|
||||
const KiB = 1 << 10
|
||||
const MiB = 1 << 20
|
||||
|
||||
func TestLoadRaw(t *testing.T) {
|
||||
b := mem.New()
|
||||
repo, err := repository.New(b, repository.Options{})
|
||||
rtest.OK(t, err)
|
||||
|
||||
for i := 0; i < 5; i++ {
|
||||
data := rtest.Random(23+i, 500*KiB)
|
||||
|
||||
id := restic.Hash(data)
|
||||
h := backend.Handle{Name: id.String(), Type: backend.PackFile}
|
||||
err := b.Save(context.TODO(), h, backend.NewByteReader(data, b.Hasher()))
|
||||
rtest.OK(t, err)
|
||||
|
||||
buf, err := repo.LoadRaw(context.TODO(), backend.PackFile, id)
|
||||
rtest.OK(t, err)
|
||||
|
||||
if len(buf) != len(data) {
|
||||
t.Errorf("length of returned buffer does not match, want %d, got %d", len(data), len(buf))
|
||||
continue
|
||||
}
|
||||
|
||||
if !bytes.Equal(buf, data) {
|
||||
t.Errorf("wrong data returned")
|
||||
continue
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestLoadRawBroken(t *testing.T) {
|
||||
b := mock.NewBackend()
|
||||
repo, err := repository.New(b, repository.Options{})
|
||||
rtest.OK(t, err)
|
||||
|
||||
data := rtest.Random(23, 10*KiB)
|
||||
id := restic.Hash(data)
|
||||
// damage buffer
|
||||
data[0] ^= 0xff
|
||||
|
||||
b.OpenReaderFn = func(ctx context.Context, h backend.Handle, length int, offset int64) (io.ReadCloser, error) {
|
||||
return io.NopCloser(bytes.NewReader(data)), nil
|
||||
}
|
||||
|
||||
// must detect but still return corrupt data
|
||||
buf, err := repo.LoadRaw(context.TODO(), backend.PackFile, id)
|
||||
rtest.Assert(t, bytes.Equal(buf, data), "wrong data returned")
|
||||
rtest.Assert(t, errors.Is(err, restic.ErrInvalidData), "missing expected ErrInvalidData error, got %v", err)
|
||||
|
||||
// cause the first access to fail, but repair the data for the second access
|
||||
data[0] ^= 0xff
|
||||
loadCtr := 0
|
||||
b.OpenReaderFn = func(ctx context.Context, h backend.Handle, length int, offset int64) (io.ReadCloser, error) {
|
||||
data[0] ^= 0xff
|
||||
loadCtr++
|
||||
return io.NopCloser(bytes.NewReader(data)), nil
|
||||
}
|
||||
|
||||
// must retry load of corrupted data
|
||||
buf, err = repo.LoadRaw(context.TODO(), backend.PackFile, id)
|
||||
rtest.OK(t, err)
|
||||
rtest.Assert(t, bytes.Equal(buf, data), "wrong data returned")
|
||||
rtest.Equals(t, 2, loadCtr, "missing retry on broken data")
|
||||
}
|
||||
|
||||
func TestLoadRawBrokenWithCache(t *testing.T) {
|
||||
b := mock.NewBackend()
|
||||
c := cache.TestNewCache(t)
|
||||
repo, err := repository.New(b, repository.Options{})
|
||||
rtest.OK(t, err)
|
||||
repo.UseCache(c)
|
||||
|
||||
data := rtest.Random(23, 10*KiB)
|
||||
id := restic.Hash(data)
|
||||
|
||||
loadCtr := 0
|
||||
// cause the first access to fail, but repair the data for the second access
|
||||
b.OpenReaderFn = func(ctx context.Context, h backend.Handle, length int, offset int64) (io.ReadCloser, error) {
|
||||
data[0] ^= 0xff
|
||||
loadCtr++
|
||||
return io.NopCloser(bytes.NewReader(data)), nil
|
||||
}
|
||||
|
||||
// must retry load of corrupted data
|
||||
buf, err := repo.LoadRaw(context.TODO(), backend.SnapshotFile, id)
|
||||
rtest.OK(t, err)
|
||||
rtest.Assert(t, bytes.Equal(buf, data), "wrong data returned")
|
||||
rtest.Equals(t, 2, loadCtr, "missing retry on broken data")
|
||||
}
|
|
@ -30,14 +30,10 @@ func RepairPacks(ctx context.Context, repo restic.Repository, ids restic.IDSet,
|
|||
}
|
||||
|
||||
err := repo.LoadBlobsFromPack(wgCtx, b.PackID, blobs, func(blob restic.BlobHandle, buf []byte, err error) error {
|
||||
if err != nil {
|
||||
// Fallback path
|
||||
buf, err = repo.LoadBlob(wgCtx, blob.Type, blob.ID, nil)
|
||||
if err != nil {
|
||||
printer.E("failed to load blob %v: %v", blob.ID, err)
|
||||
return nil
|
||||
}
|
||||
}
|
||||
id, _, _, err := repo.SaveBlob(wgCtx, blob.Type, buf, restic.ID{}, true)
|
||||
if !id.Equal(blob.ID) {
|
||||
panic("pack id mismatch during upload")
|
||||
|
|
|
@ -7,6 +7,7 @@ import (
|
|||
"time"
|
||||
|
||||
"github.com/restic/restic/internal/backend"
|
||||
backendtest "github.com/restic/restic/internal/backend/test"
|
||||
"github.com/restic/restic/internal/index"
|
||||
"github.com/restic/restic/internal/repository"
|
||||
"github.com/restic/restic/internal/restic"
|
||||
|
@ -24,7 +25,7 @@ func listBlobs(repo restic.Repository) restic.BlobSet {
|
|||
}
|
||||
|
||||
func replaceFile(t *testing.T, repo restic.Repository, h backend.Handle, damage func([]byte) []byte) {
|
||||
buf, err := backend.LoadAll(context.TODO(), nil, repo.Backend(), h)
|
||||
buf, err := backendtest.LoadAll(context.TODO(), repo.Backend(), h)
|
||||
test.OK(t, err)
|
||||
buf = damage(buf)
|
||||
test.OK(t, repo.Backend().Remove(context.TODO(), h))
|
||||
|
|
|
@ -174,46 +174,11 @@ func (r *Repository) LoadUnpacked(ctx context.Context, t restic.FileType, id res
|
|||
id = restic.ID{}
|
||||
}
|
||||
|
||||
ctx, cancel := context.WithCancel(ctx)
|
||||
|
||||
h := backend.Handle{Type: t, Name: id.String()}
|
||||
retriedInvalidData := false
|
||||
var dataErr error
|
||||
wr := new(bytes.Buffer)
|
||||
|
||||
err := r.be.Load(ctx, h, 0, 0, func(rd io.Reader) error {
|
||||
// make sure this call is idempotent, in case an error occurs
|
||||
wr.Reset()
|
||||
_, cerr := io.Copy(wr, rd)
|
||||
if cerr != nil {
|
||||
return cerr
|
||||
}
|
||||
|
||||
buf := wr.Bytes()
|
||||
if t != restic.ConfigFile && !restic.Hash(buf).Equal(id) {
|
||||
debug.Log("retry loading broken blob %v", h)
|
||||
if !retriedInvalidData {
|
||||
retriedInvalidData = true
|
||||
} else {
|
||||
// with a canceled context there is not guarantee which error will
|
||||
// be returned by `be.Load`.
|
||||
dataErr = fmt.Errorf("load(%v): %w", h, restic.ErrInvalidData)
|
||||
cancel()
|
||||
}
|
||||
return restic.ErrInvalidData
|
||||
|
||||
}
|
||||
return nil
|
||||
})
|
||||
|
||||
if dataErr != nil {
|
||||
return nil, dataErr
|
||||
}
|
||||
buf, err := r.LoadRaw(ctx, t, id)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
buf := wr.Bytes()
|
||||
nonce, ciphertext := buf[:r.key.NonceSize()], buf[r.key.NonceSize():]
|
||||
plaintext, err := r.key.Open(ciphertext[:0], nonce, ciphertext, nil)
|
||||
if err != nil {
|
||||
|
@ -270,16 +235,27 @@ func (r *Repository) LoadBlob(ctx context.Context, t restic.BlobType, id restic.
|
|||
// try cached pack files first
|
||||
sortCachedPacksFirst(r.Cache, blobs)
|
||||
|
||||
var lastError error
|
||||
buf, err := r.loadBlob(ctx, blobs, buf)
|
||||
if err != nil {
|
||||
if r.Cache != nil {
|
||||
for _, blob := range blobs {
|
||||
debug.Log("blob %v/%v found: %v", t, id, blob)
|
||||
|
||||
if blob.Type != t {
|
||||
debug.Log("blob %v has wrong block type, want %v", blob, t)
|
||||
h := backend.Handle{Type: restic.PackFile, Name: blob.PackID.String(), IsMetadata: blob.Type.IsMetadata()}
|
||||
// ignore errors as there's not much we can do here
|
||||
_ = r.Cache.Forget(h)
|
||||
}
|
||||
}
|
||||
|
||||
buf, err = r.loadBlob(ctx, blobs, buf)
|
||||
}
|
||||
return buf, err
|
||||
}
|
||||
|
||||
func (r *Repository) loadBlob(ctx context.Context, blobs []restic.PackedBlob, buf []byte) ([]byte, error) {
|
||||
var lastError error
|
||||
for _, blob := range blobs {
|
||||
debug.Log("blob %v found: %v", blob.BlobHandle, blob)
|
||||
// load blob from pack
|
||||
h := backend.Handle{Type: restic.PackFile, Name: blob.PackID.String(), IsMetadata: t.IsMetadata()}
|
||||
h := backend.Handle{Type: restic.PackFile, Name: blob.PackID.String(), IsMetadata: blob.Type.IsMetadata()}
|
||||
|
||||
switch {
|
||||
case cap(buf) < int(blob.Length):
|
||||
|
@ -288,42 +264,26 @@ func (r *Repository) LoadBlob(ctx context.Context, t restic.BlobType, id restic.
|
|||
buf = buf[:blob.Length]
|
||||
}
|
||||
|
||||
n, err := backend.ReadAt(ctx, r.be, h, int64(blob.Offset), buf)
|
||||
_, err := backend.ReadAt(ctx, r.be, h, int64(blob.Offset), buf)
|
||||
if err != nil {
|
||||
debug.Log("error loading blob %v: %v", blob, err)
|
||||
lastError = err
|
||||
continue
|
||||
}
|
||||
|
||||
if uint(n) != blob.Length {
|
||||
lastError = errors.Errorf("error loading blob %v: wrong length returned, want %d, got %d",
|
||||
id.Str(), blob.Length, uint(n))
|
||||
debug.Log("lastError: %v", lastError)
|
||||
continue
|
||||
}
|
||||
it := NewPackBlobIterator(blob.PackID, newByteReader(buf), uint(blob.Offset), []restic.Blob{blob.Blob}, r.key, r.getZstdDecoder())
|
||||
pbv, err := it.Next()
|
||||
|
||||
// decrypt
|
||||
nonce, ciphertext := buf[:r.key.NonceSize()], buf[r.key.NonceSize():]
|
||||
plaintext, err := r.key.Open(ciphertext[:0], nonce, ciphertext, nil)
|
||||
if err == nil {
|
||||
err = pbv.Err
|
||||
}
|
||||
if err != nil {
|
||||
lastError = errors.Errorf("decrypting blob %v failed: %v", id, err)
|
||||
continue
|
||||
}
|
||||
|
||||
if blob.IsCompressed() {
|
||||
plaintext, err = r.getZstdDecoder().DecodeAll(plaintext, make([]byte, 0, blob.DataLength()))
|
||||
if err != nil {
|
||||
lastError = errors.Errorf("decompressing blob %v failed: %v", id, err)
|
||||
continue
|
||||
}
|
||||
}
|
||||
|
||||
// check hash
|
||||
if !restic.Hash(plaintext).Equal(id) {
|
||||
lastError = errors.Errorf("blob %v returned invalid hash", id)
|
||||
debug.Log("error decoding blob %v: %v", blob, err)
|
||||
lastError = err
|
||||
continue
|
||||
}
|
||||
|
||||
plaintext := pbv.Plaintext
|
||||
if len(plaintext) > cap(buf) {
|
||||
return plaintext, nil
|
||||
}
|
||||
|
@ -337,7 +297,7 @@ func (r *Repository) LoadBlob(ctx context.Context, t restic.BlobType, id restic.
|
|||
return nil, lastError
|
||||
}
|
||||
|
||||
return nil, errors.Errorf("loading blob %v from %v packs failed", id.Str(), len(blobs))
|
||||
return nil, errors.Errorf("loading %v from %v packs failed", blobs[0].BlobHandle, len(blobs))
|
||||
}
|
||||
|
||||
// LookupBlobSize returns the size of blob id.
|
||||
|
@ -909,7 +869,17 @@ func (r *Repository) List(ctx context.Context, t restic.FileType, fn func(restic
|
|||
func (r *Repository) ListPack(ctx context.Context, id restic.ID, size int64) ([]restic.Blob, uint32, error) {
|
||||
h := backend.Handle{Type: restic.PackFile, Name: id.String()}
|
||||
|
||||
return pack.List(r.Key(), backend.ReaderAt(ctx, r.Backend(), h), size)
|
||||
entries, hdrSize, err := pack.List(r.Key(), backend.ReaderAt(ctx, r.Backend(), h), size)
|
||||
if err != nil {
|
||||
if r.Cache != nil {
|
||||
// ignore error as there is not much we can do here
|
||||
_ = r.Cache.Forget(h)
|
||||
}
|
||||
|
||||
// retry on error
|
||||
entries, hdrSize, err = pack.List(r.Key(), backend.ReaderAt(ctx, r.Backend(), h), size)
|
||||
}
|
||||
return entries, hdrSize, err
|
||||
}
|
||||
|
||||
// Delete calls backend.Delete() if implemented, and returns an error
|
||||
|
@ -1023,7 +993,7 @@ func streamPack(ctx context.Context, beLoad backendLoadFn, loadBlobFn loadBlobFn
|
|||
}
|
||||
|
||||
func streamPackPart(ctx context.Context, beLoad backendLoadFn, loadBlobFn loadBlobFn, dec *zstd.Decoder, key *crypto.Key, packID restic.ID, blobs []restic.Blob, handleBlobFn func(blob restic.BlobHandle, buf []byte, err error) error) error {
|
||||
h := backend.Handle{Type: restic.PackFile, Name: packID.String(), IsMetadata: false}
|
||||
h := backend.Handle{Type: restic.PackFile, Name: packID.String(), IsMetadata: blobs[0].Type.IsMetadata()}
|
||||
|
||||
dataStart := blobs[0].Offset
|
||||
dataEnd := blobs[len(blobs)-1].Offset + blobs[len(blobs)-1].Length
|
||||
|
|
|
@ -9,16 +9,20 @@ import (
|
|||
"math/rand"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"strings"
|
||||
"sync"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/restic/restic/internal/backend"
|
||||
"github.com/restic/restic/internal/backend/local"
|
||||
"github.com/restic/restic/internal/backend/mem"
|
||||
"github.com/restic/restic/internal/cache"
|
||||
"github.com/restic/restic/internal/crypto"
|
||||
"github.com/restic/restic/internal/errors"
|
||||
"github.com/restic/restic/internal/index"
|
||||
"github.com/restic/restic/internal/repository"
|
||||
"github.com/restic/restic/internal/restic"
|
||||
"github.com/restic/restic/internal/test"
|
||||
rtest "github.com/restic/restic/internal/test"
|
||||
"golang.org/x/sync/errgroup"
|
||||
)
|
||||
|
@ -139,6 +143,28 @@ func testLoadBlob(t *testing.T, version uint) {
|
|||
}
|
||||
}
|
||||
|
||||
func TestLoadBlobBroken(t *testing.T) {
|
||||
be := mem.New()
|
||||
repo := repository.TestRepositoryWithBackend(t, &damageOnceBackend{Backend: be}, restic.StableRepoVersion, repository.Options{}).(*repository.Repository)
|
||||
buf := test.Random(42, 1000)
|
||||
|
||||
var wg errgroup.Group
|
||||
repo.StartPackUploader(context.TODO(), &wg)
|
||||
id, _, _, err := repo.SaveBlob(context.TODO(), restic.TreeBlob, buf, restic.ID{}, false)
|
||||
rtest.OK(t, err)
|
||||
rtest.OK(t, repo.Flush(context.Background()))
|
||||
|
||||
// setup cache after saving the blob to make sure that the damageOnceBackend damages the cached data
|
||||
c := cache.TestNewCache(t)
|
||||
repo.UseCache(c)
|
||||
|
||||
data, err := repo.LoadBlob(context.TODO(), restic.TreeBlob, id, nil)
|
||||
rtest.OK(t, err)
|
||||
rtest.Assert(t, bytes.Equal(buf, data), "data mismatch")
|
||||
pack := repo.Index().Lookup(restic.BlobHandle{Type: restic.TreeBlob, ID: id})[0].PackID
|
||||
rtest.Assert(t, c.Has(backend.Handle{Type: restic.PackFile, Name: pack.String()}), "expected tree pack to be cached")
|
||||
}
|
||||
|
||||
func BenchmarkLoadBlob(b *testing.B) {
|
||||
repository.BenchmarkAllVersions(b, benchmarkLoadBlob)
|
||||
}
|
||||
|
@ -254,16 +280,13 @@ func TestRepositoryLoadUnpackedBroken(t *testing.T) {
|
|||
err := repo.Backend().Save(context.TODO(), h, backend.NewByteReader(data, repo.Backend().Hasher()))
|
||||
rtest.OK(t, err)
|
||||
|
||||
// without a retry backend this will just return an error that the file is broken
|
||||
_, err = repo.LoadUnpacked(context.TODO(), restic.IndexFile, id)
|
||||
if err == nil {
|
||||
t.Fatal("missing expected error")
|
||||
}
|
||||
rtest.Assert(t, strings.Contains(err.Error(), "invalid data returned"), "unexpected error: %v", err)
|
||||
rtest.Assert(t, errors.Is(err, restic.ErrInvalidData), "unexpected error: %v", err)
|
||||
}
|
||||
|
||||
type damageOnceBackend struct {
|
||||
backend.Backend
|
||||
m sync.Map
|
||||
}
|
||||
|
||||
func (be *damageOnceBackend) Load(ctx context.Context, h backend.Handle, length int, offset int64, fn func(rd io.Reader) error) error {
|
||||
|
@ -271,13 +294,14 @@ func (be *damageOnceBackend) Load(ctx context.Context, h backend.Handle, length
|
|||
if h.Type == restic.ConfigFile {
|
||||
return be.Backend.Load(ctx, h, length, offset, fn)
|
||||
}
|
||||
|
||||
h.IsMetadata = false
|
||||
_, isRetry := be.m.LoadOrStore(h, true)
|
||||
if !isRetry {
|
||||
// return broken data on the first try
|
||||
err := be.Backend.Load(ctx, h, length+1, offset, fn)
|
||||
if err != nil {
|
||||
// retry
|
||||
err = be.Backend.Load(ctx, h, length, offset, fn)
|
||||
offset++
|
||||
}
|
||||
return err
|
||||
return be.Backend.Load(ctx, h, length, offset, fn)
|
||||
}
|
||||
|
||||
func TestRepositoryLoadUnpackedRetryBroken(t *testing.T) {
|
||||
|
@ -398,3 +422,38 @@ func TestInvalidCompression(t *testing.T) {
|
|||
_, err = repository.New(nil, repository.Options{Compression: comp})
|
||||
rtest.Assert(t, err != nil, "missing error")
|
||||
}
|
||||
|
||||
func TestListPack(t *testing.T) {
|
||||
be := mem.New()
|
||||
repo := repository.TestRepositoryWithBackend(t, &damageOnceBackend{Backend: be}, restic.StableRepoVersion, repository.Options{}).(*repository.Repository)
|
||||
buf := test.Random(42, 1000)
|
||||
|
||||
var wg errgroup.Group
|
||||
repo.StartPackUploader(context.TODO(), &wg)
|
||||
id, _, _, err := repo.SaveBlob(context.TODO(), restic.TreeBlob, buf, restic.ID{}, false)
|
||||
rtest.OK(t, err)
|
||||
rtest.OK(t, repo.Flush(context.Background()))
|
||||
|
||||
// setup cache after saving the blob to make sure that the damageOnceBackend damages the cached data
|
||||
c := cache.TestNewCache(t)
|
||||
repo.UseCache(c)
|
||||
|
||||
// Forcibly cache pack file
|
||||
packID := repo.Index().Lookup(restic.BlobHandle{Type: restic.TreeBlob, ID: id})[0].PackID
|
||||
rtest.OK(t, repo.Backend().Load(context.TODO(), backend.Handle{Type: restic.PackFile, IsMetadata: true, Name: packID.String()}, 0, 0, func(rd io.Reader) error { return nil }))
|
||||
|
||||
// Get size to list pack
|
||||
var size int64
|
||||
rtest.OK(t, repo.List(context.TODO(), restic.PackFile, func(id restic.ID, sz int64) error {
|
||||
if id == packID {
|
||||
size = sz
|
||||
}
|
||||
return nil
|
||||
}))
|
||||
|
||||
blobs, _, err := repo.ListPack(context.TODO(), packID, size)
|
||||
rtest.OK(t, err)
|
||||
rtest.Assert(t, len(blobs) == 1 && blobs[0].ID == id, "unexpected blobs in pack: %v", blobs)
|
||||
|
||||
rtest.Assert(t, !c.Has(backend.Handle{Type: restic.PackFile, Name: packID.String()}), "tree pack should no longer be cached as ListPack does not set IsMetadata in the backend.Handle")
|
||||
}
|
||||
|
|
|
@ -57,6 +57,11 @@ type Repository interface {
|
|||
// LoadUnpacked loads and decrypts the file with the given type and ID.
|
||||
LoadUnpacked(ctx context.Context, t FileType, id ID) (data []byte, err error)
|
||||
SaveUnpacked(context.Context, FileType, []byte) (ID, error)
|
||||
|
||||
// LoadRaw reads all data stored in the backend for the file with id and filetype t.
|
||||
// If the backend returns data that does not match the id, then the buffer is returned
|
||||
// along with an error that is a restic.ErrInvalidData error.
|
||||
LoadRaw(ctx context.Context, t FileType, id ID) (data []byte, err error)
|
||||
}
|
||||
|
||||
type FileType = backend.FileType
|
||||
|
|
Loading…
Reference in a new issue