forked from TrueCloudLab/restic
Merge pull request #3521 from MichaelEischer/redownload-broken-files
Redownload files with wrong hash
This commit is contained in:
commit
9ec7eee803
7 changed files with 190 additions and 20 deletions
13
changelog/unreleased/issue-2533
Normal file
13
changelog/unreleased/issue-2533
Normal file
|
@ -0,0 +1,13 @@
|
|||
Enhancement: Redownload cached data if invalid
|
||||
|
||||
In rare situations, like for example after a system crash, the data stored
|
||||
in the cache might be corrupted. This could cause restic to fail and
|
||||
required manually deleting the cache.
|
||||
|
||||
Restic now automatically removes broken data from the cache, allowing it
|
||||
to recover from such a situation without user intervention. In addition,
|
||||
restic retries downloads which return corrupt data in order to handle
|
||||
temporary download problems.
|
||||
|
||||
https://github.com/restic/restic/issues/2533
|
||||
https://github.com/restic/restic/pull/3521
|
|
@ -6,6 +6,8 @@ import (
|
|||
"fmt"
|
||||
"io"
|
||||
|
||||
"github.com/restic/restic/internal/debug"
|
||||
"github.com/restic/restic/internal/errors"
|
||||
"github.com/restic/restic/internal/restic"
|
||||
)
|
||||
|
||||
|
@ -13,6 +15,7 @@ import (
|
|||
// buffer, which is truncated. If the buffer is not large enough or nil, a new
|
||||
// one is allocated.
|
||||
func LoadAll(ctx context.Context, buf []byte, be restic.Backend, h restic.Handle) ([]byte, error) {
|
||||
retriedInvalidData := false
|
||||
err := be.Load(ctx, h, 0, 0, func(rd io.Reader) error {
|
||||
// make sure this is idempotent, in case an error occurs this function may be called multiple times!
|
||||
wr := bytes.NewBuffer(buf[:0])
|
||||
|
@ -21,6 +24,18 @@ func LoadAll(ctx context.Context, buf []byte, be restic.Backend, h restic.Handle
|
|||
return cerr
|
||||
}
|
||||
buf = wr.Bytes()
|
||||
|
||||
// retry loading damaged data only once. If a file fails to download correctly
|
||||
// the second time, then it is likely corrupted at the backend. Return the data
|
||||
// to the caller in that case to let it decide what to do with the data.
|
||||
if !retriedInvalidData && h.Type != restic.ConfigFile {
|
||||
id, err := restic.ParseID(h.Name)
|
||||
if err == nil && !restic.Hash(buf).Equal(id) {
|
||||
debug.Log("retry loading broken blob %v", h)
|
||||
retriedInvalidData = true
|
||||
return errors.Errorf("loadAll(%v): invalid data returned", h)
|
||||
}
|
||||
}
|
||||
return nil
|
||||
})
|
||||
|
||||
|
|
|
@ -5,6 +5,7 @@ import (
|
|||
"context"
|
||||
"fmt"
|
||||
"io"
|
||||
"io/ioutil"
|
||||
"math/rand"
|
||||
"testing"
|
||||
|
||||
|
@ -56,6 +57,47 @@ func save(t testing.TB, be restic.Backend, buf []byte) restic.Handle {
|
|||
return h
|
||||
}
|
||||
|
||||
type quickRetryBackend struct {
|
||||
restic.Backend
|
||||
}
|
||||
|
||||
func (be *quickRetryBackend) Load(ctx context.Context, h restic.Handle, length int, offset int64, fn func(rd io.Reader) error) error {
|
||||
err := be.Backend.Load(ctx, h, length, offset, fn)
|
||||
if err != nil {
|
||||
// retry
|
||||
err = be.Backend.Load(ctx, h, length, offset, fn)
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
func TestLoadAllBroken(t *testing.T) {
|
||||
b := mock.NewBackend()
|
||||
|
||||
data := rtest.Random(23, rand.Intn(MiB)+500*KiB)
|
||||
id := restic.Hash(data)
|
||||
// damage buffer
|
||||
data[0] ^= 0xff
|
||||
|
||||
b.OpenReaderFn = func(ctx context.Context, h restic.Handle, length int, offset int64) (io.ReadCloser, error) {
|
||||
return ioutil.NopCloser(bytes.NewReader(data)), nil
|
||||
}
|
||||
|
||||
// must fail on first try
|
||||
_, err := backend.LoadAll(context.TODO(), nil, b, restic.Handle{Type: restic.PackFile, Name: id.String()})
|
||||
if err == nil {
|
||||
t.Fatalf("missing expected error")
|
||||
}
|
||||
|
||||
// must return the broken data after a retry
|
||||
be := &quickRetryBackend{Backend: b}
|
||||
buf, err := backend.LoadAll(context.TODO(), nil, be, restic.Handle{Type: restic.PackFile, Name: id.String()})
|
||||
rtest.OK(t, err)
|
||||
|
||||
if !bytes.Equal(buf, data) {
|
||||
t.Fatalf("wrong data returned")
|
||||
}
|
||||
}
|
||||
|
||||
func TestLoadAllAppend(t *testing.T) {
|
||||
b := mem.New()
|
||||
|
||||
|
|
31
internal/cache/backend.go
vendored
31
internal/cache/backend.go
vendored
|
@ -131,21 +131,19 @@ func (b *Backend) cacheFile(ctx context.Context, h restic.Handle) error {
|
|||
return nil
|
||||
}
|
||||
|
||||
// loadFromCacheOrDelegate will try to load the file from the cache, and fall
|
||||
// back to the backend if that fails.
|
||||
func (b *Backend) loadFromCacheOrDelegate(ctx context.Context, h restic.Handle, length int, offset int64, consumer func(rd io.Reader) error) error {
|
||||
// loadFromCache will try to load the file from the cache.
|
||||
func (b *Backend) loadFromCache(ctx context.Context, h restic.Handle, length int, offset int64, consumer func(rd io.Reader) error) (bool, error) {
|
||||
rd, err := b.Cache.load(h, length, offset)
|
||||
if err != nil {
|
||||
debug.Log("error caching %v: %v, falling back to backend", h, err)
|
||||
return b.Backend.Load(ctx, h, length, offset, consumer)
|
||||
return false, err
|
||||
}
|
||||
|
||||
err = consumer(rd)
|
||||
if err != nil {
|
||||
_ = rd.Close() // ignore secondary errors
|
||||
return err
|
||||
return true, err
|
||||
}
|
||||
return rd.Close()
|
||||
return true, rd.Close()
|
||||
}
|
||||
|
||||
// Load loads a file from the cache or the backend.
|
||||
|
@ -161,14 +159,14 @@ func (b *Backend) Load(ctx context.Context, h restic.Handle, length int, offset
|
|||
}
|
||||
|
||||
// try loading from cache without checking that the handle is actually cached
|
||||
rd, err := b.Cache.load(h, length, offset)
|
||||
if err == nil {
|
||||
err = consumer(rd)
|
||||
if err != nil {
|
||||
_ = rd.Close() // ignore secondary errors
|
||||
return err
|
||||
inCache, err := b.loadFromCache(ctx, h, length, offset, consumer)
|
||||
if inCache {
|
||||
if err == nil {
|
||||
return nil
|
||||
}
|
||||
return rd.Close()
|
||||
|
||||
// drop from cache and retry once
|
||||
_ = b.Cache.remove(h)
|
||||
}
|
||||
debug.Log("error loading %v from cache: %v", h, err)
|
||||
|
||||
|
@ -181,7 +179,10 @@ func (b *Backend) Load(ctx context.Context, h restic.Handle, length int, offset
|
|||
debug.Log("auto-store %v in the cache", h)
|
||||
err = b.cacheFile(ctx, h)
|
||||
if err == nil {
|
||||
return b.loadFromCacheOrDelegate(ctx, h, length, offset, consumer)
|
||||
inCache, err = b.loadFromCache(ctx, h, length, offset, consumer)
|
||||
if inCache {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
debug.Log("error caching %v: %v, falling back to backend", h, err)
|
||||
|
|
36
internal/cache/backend_test.go
vendored
36
internal/cache/backend_test.go
vendored
|
@ -4,6 +4,7 @@ import (
|
|||
"bytes"
|
||||
"context"
|
||||
"io"
|
||||
"io/ioutil"
|
||||
"math/rand"
|
||||
"sync"
|
||||
"testing"
|
||||
|
@ -48,7 +49,6 @@ func remove(t testing.TB, be restic.Backend, h restic.Handle) {
|
|||
func randomData(n int) (restic.Handle, []byte) {
|
||||
data := test.Random(rand.Int(), n)
|
||||
id := restic.Hash(data)
|
||||
copy(id[:], data)
|
||||
h := restic.Handle{
|
||||
Type: restic.IndexFile,
|
||||
Name: id.String(),
|
||||
|
@ -172,3 +172,37 @@ func TestErrorBackend(t *testing.T) {
|
|||
|
||||
wg.Wait()
|
||||
}
|
||||
|
||||
func TestBackendRemoveBroken(t *testing.T) {
|
||||
be := mem.New()
|
||||
|
||||
c, cleanup := TestNewCache(t)
|
||||
defer cleanup()
|
||||
|
||||
h, data := randomData(5234142)
|
||||
// save directly in backend
|
||||
save(t, be, h, data)
|
||||
|
||||
// prime cache with broken copy
|
||||
broken := append([]byte{}, data...)
|
||||
broken[0] ^= 0xff
|
||||
err := c.Save(h, bytes.NewReader(broken))
|
||||
test.OK(t, err)
|
||||
|
||||
// loadall retries if broken data was returned
|
||||
buf, err := backend.LoadAll(context.TODO(), nil, c.Wrap(be), h)
|
||||
test.OK(t, err)
|
||||
|
||||
if !bytes.Equal(buf, data) {
|
||||
t.Fatalf("wrong data returned")
|
||||
}
|
||||
|
||||
// check that the cache now contains the correct data
|
||||
rd, err := c.load(h, 0, 0)
|
||||
test.OK(t, err)
|
||||
cached, err := ioutil.ReadAll(rd)
|
||||
test.OK(t, err)
|
||||
if !bytes.Equal(cached, data) {
|
||||
t.Fatalf("wrong data cache")
|
||||
}
|
||||
}
|
||||
|
|
|
@ -183,7 +183,10 @@ func (r *Repository) LoadUnpacked(ctx context.Context, t restic.FileType, id res
|
|||
id = restic.ID{}
|
||||
}
|
||||
|
||||
ctx, cancel := context.WithCancel(ctx)
|
||||
|
||||
h := restic.Handle{Type: t, Name: id.String()}
|
||||
retriedInvalidData := false
|
||||
err := r.be.Load(ctx, h, 0, 0, func(rd io.Reader) error {
|
||||
// make sure this call is idempotent, in case an error occurs
|
||||
wr := bytes.NewBuffer(buf[:0])
|
||||
|
@ -192,6 +195,16 @@ func (r *Repository) LoadUnpacked(ctx context.Context, t restic.FileType, id res
|
|||
return cerr
|
||||
}
|
||||
buf = wr.Bytes()
|
||||
|
||||
if t != restic.ConfigFile && !restic.Hash(buf).Equal(id) {
|
||||
debug.Log("retry loading broken blob %v", h)
|
||||
if !retriedInvalidData {
|
||||
retriedInvalidData = true
|
||||
} else {
|
||||
cancel()
|
||||
}
|
||||
return errors.Errorf("load(%v): invalid data returned", h)
|
||||
}
|
||||
return nil
|
||||
})
|
||||
|
||||
|
@ -199,10 +212,6 @@ func (r *Repository) LoadUnpacked(ctx context.Context, t restic.FileType, id res
|
|||
return nil, err
|
||||
}
|
||||
|
||||
if t != restic.ConfigFile && !restic.Hash(buf).Equal(id) {
|
||||
return nil, errors.Errorf("load %v: invalid data returned", h)
|
||||
}
|
||||
|
||||
nonce, ciphertext := buf[:r.key.NonceSize()], buf[r.key.NonceSize():]
|
||||
plaintext, err := r.key.Open(ciphertext[:0], nonce, ciphertext, nil)
|
||||
if err != nil {
|
||||
|
|
|
@ -16,6 +16,7 @@ import (
|
|||
|
||||
"github.com/google/go-cmp/cmp"
|
||||
"github.com/klauspost/compress/zstd"
|
||||
"github.com/restic/restic/internal/backend/local"
|
||||
"github.com/restic/restic/internal/crypto"
|
||||
"github.com/restic/restic/internal/repository"
|
||||
"github.com/restic/restic/internal/restic"
|
||||
|
@ -279,6 +280,61 @@ func loadIndex(ctx context.Context, repo restic.Repository, id restic.ID) (*repo
|
|||
return idx, err
|
||||
}
|
||||
|
||||
func TestRepositoryLoadUnpackedBroken(t *testing.T) {
|
||||
repodir, cleanup := rtest.Env(t, repoFixture)
|
||||
defer cleanup()
|
||||
|
||||
data := rtest.Random(23, 12345)
|
||||
id := restic.Hash(data)
|
||||
h := restic.Handle{Type: restic.IndexFile, Name: id.String()}
|
||||
// damage buffer
|
||||
data[0] ^= 0xff
|
||||
|
||||
repo := repository.TestOpenLocal(t, repodir)
|
||||
// store broken file
|
||||
err := repo.Backend().Save(context.TODO(), h, restic.NewByteReader(data, nil))
|
||||
rtest.OK(t, err)
|
||||
|
||||
// without a retry backend this will just return an error that the file is broken
|
||||
_, err = repo.LoadUnpacked(context.TODO(), restic.IndexFile, id, nil)
|
||||
if err == nil {
|
||||
t.Fatal("missing expected error")
|
||||
}
|
||||
rtest.Assert(t, strings.Contains(err.Error(), "invalid data returned"), "unexpected error: %v", err)
|
||||
}
|
||||
|
||||
type damageOnceBackend struct {
|
||||
restic.Backend
|
||||
}
|
||||
|
||||
func (be *damageOnceBackend) Load(ctx context.Context, h restic.Handle, length int, offset int64, fn func(rd io.Reader) error) error {
|
||||
// don't break the config file as we can't retry it
|
||||
if h.Type == restic.ConfigFile {
|
||||
return be.Backend.Load(ctx, h, length, offset, fn)
|
||||
}
|
||||
// return broken data on the first try
|
||||
err := be.Backend.Load(ctx, h, length+1, offset, fn)
|
||||
if err != nil {
|
||||
// retry
|
||||
err = be.Backend.Load(ctx, h, length, offset, fn)
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
func TestRepositoryLoadUnpackedRetryBroken(t *testing.T) {
|
||||
repodir, cleanup := rtest.Env(t, repoFixture)
|
||||
defer cleanup()
|
||||
|
||||
be, err := local.Open(context.TODO(), local.Config{Path: repodir, Connections: 2})
|
||||
rtest.OK(t, err)
|
||||
repo, err := repository.New(&damageOnceBackend{Backend: be}, repository.Options{})
|
||||
rtest.OK(t, err)
|
||||
err = repo.SearchKey(context.TODO(), test.TestPassword, 10, "")
|
||||
rtest.OK(t, err)
|
||||
|
||||
rtest.OK(t, repo.LoadIndex(context.TODO()))
|
||||
}
|
||||
|
||||
func BenchmarkLoadIndex(b *testing.B) {
|
||||
repository.BenchmarkAllVersions(b, benchmarkLoadIndex)
|
||||
}
|
||||
|
|
Loading…
Reference in a new issue