Reworked Backend.Load API to retry errors during ongoing download

Signed-off-by: Igor Fedorenko <igor@ifedorenko.com>
This commit is contained in:
Igor Fedorenko 2018-01-16 23:59:16 -05:00
parent b723094739
commit d58ae43317
26 changed files with 388 additions and 257 deletions

View file

@ -0,0 +1,3 @@
Enhancement: retry all repository file download errors
https://github.com/restic/restic/pull/1560

View file

@ -38,13 +38,13 @@ func randomID() restic.ID {
// forgetfulBackend returns a backend that forgets everything. // forgetfulBackend returns a backend that forgets everything.
func forgetfulBackend() restic.Backend { func forgetfulBackend() restic.Backend {
be := &mock.Backend{} be := mock.NewBackend()
be.TestFn = func(ctx context.Context, h restic.Handle) (bool, error) { be.TestFn = func(ctx context.Context, h restic.Handle) (bool, error) {
return false, nil return false, nil
} }
be.LoadFn = func(ctx context.Context, h restic.Handle, length int, offset int64) (io.ReadCloser, error) { be.OpenReaderFn = func(ctx context.Context, h restic.Handle, length int, offset int64) (io.ReadCloser, error) {
return nil, errors.New("not found") return nil, errors.New("not found")
} }

View file

@ -178,10 +178,13 @@ func (wr wrapReader) Close() error {
return err return err
} }
// Load returns a reader that yields the contents of the file at h at the // Load runs fn with a reader that yields the contents of the file at h at the
// given offset. If length is nonzero, only a portion of the file is // given offset.
// returned. rd must be closed after use. func (be *Backend) Load(ctx context.Context, h restic.Handle, length int, offset int64, fn func(rd io.Reader) error) error {
func (be *Backend) Load(ctx context.Context, h restic.Handle, length int, offset int64) (io.ReadCloser, error) { return backend.DefaultLoad(ctx, h, length, offset, be.openReader, fn)
}
func (be *Backend) openReader(ctx context.Context, h restic.Handle, length int, offset int64) (io.ReadCloser, error) {
debug.Log("Load %v, length %v, offset %v from %v", h, length, offset, be.Filename(h)) debug.Log("Load %v, length %v, offset %v from %v", h, length, offset, be.Filename(h))
if err := h.Valid(); err != nil { if err := h.Valid(); err != nil {
return nil, err return nil, err

View file

@ -142,9 +142,13 @@ func (be *b2Backend) IsNotExist(err error) bool {
return b2.IsNotExist(errors.Cause(err)) return b2.IsNotExist(errors.Cause(err))
} }
// Load returns the data stored in the backend for h at the given offset // Load runs fn with a reader that yields the contents of the file at h at the
// and saves it in p. Load has the same semantics as io.ReaderAt. // given offset.
func (be *b2Backend) Load(ctx context.Context, h restic.Handle, length int, offset int64) (io.ReadCloser, error) { func (be *b2Backend) Load(ctx context.Context, h restic.Handle, length int, offset int64, fn func(rd io.Reader) error) error {
return backend.DefaultLoad(ctx, h, length, offset, be.openReader, fn)
}
func (be *b2Backend) openReader(ctx context.Context, h restic.Handle, length int, offset int64) (io.ReadCloser, error) {
debug.Log("Load %v, length %v, offset %v from %v", h, length, offset, be.Filename(h)) debug.Log("Load %v, length %v, offset %v from %v", h, length, offset, be.Filename(h))
if err := h.Valid(); err != nil { if err := h.Valid(); err != nil {
return nil, err return nil, err

View file

@ -66,12 +66,12 @@ func (be *ErrorBackend) Save(ctx context.Context, h restic.Handle, rd io.Reader)
// given offset. If length is larger than zero, only a portion of the file // given offset. If length is larger than zero, only a portion of the file
// is returned. rd must be closed after use. If an error is returned, the // is returned. rd must be closed after use. If an error is returned, the
// ReadCloser must be nil. // ReadCloser must be nil.
func (be *ErrorBackend) Load(ctx context.Context, h restic.Handle, length int, offset int64) (io.ReadCloser, error) { func (be *ErrorBackend) Load(ctx context.Context, h restic.Handle, length int, offset int64, consumer func(rd io.Reader) error) error {
if be.fail(be.FailLoad) { if be.fail(be.FailLoad) {
return nil, errors.Errorf("Load(%v, %v, %v) random error induced", h, length, offset) return errors.Errorf("Load(%v, %v, %v) random error induced", h, length, offset)
} }
return be.Backend.Load(ctx, h, length, offset) return be.Backend.Load(ctx, h, length, offset, consumer)
} }
// Stat returns information about the File identified by h. // Stat returns information about the File identified by h.

View file

@ -88,15 +88,11 @@ func (be *RetryBackend) Save(ctx context.Context, h restic.Handle, rd io.Reader)
// given offset. If length is larger than zero, only a portion of the file // given offset. If length is larger than zero, only a portion of the file
// is returned. rd must be closed after use. If an error is returned, the // is returned. rd must be closed after use. If an error is returned, the
// ReadCloser must be nil. // ReadCloser must be nil.
func (be *RetryBackend) Load(ctx context.Context, h restic.Handle, length int, offset int64) (rd io.ReadCloser, err error) { func (be *RetryBackend) Load(ctx context.Context, h restic.Handle, length int, offset int64, consumer func(rd io.Reader) error) (err error) {
err = be.retry(ctx, fmt.Sprintf("Load(%v, %v, %v)", h, length, offset), return be.retry(ctx, fmt.Sprintf("Load(%v, %v, %v)", h, length, offset),
func() error { func() error {
var innerError error return be.Backend.Load(ctx, h, length, offset, consumer)
rd, innerError = be.Backend.Load(ctx, h, length, offset)
return innerError
}) })
return rd, err
} }
// Stat returns information about the File identified by h. // Stat returns information about the File identified by h.

View file

@ -123,3 +123,66 @@ func TestBackendListRetry(t *testing.T) {
test.Equals(t, 2, retry) // assert retried once test.Equals(t, 2, retry) // assert retried once
test.Equals(t, []string{ID1, ID2}, listed) // assert no duplicate files test.Equals(t, []string{ID1, ID2}, listed) // assert no duplicate files
} }
// failingReader returns an error after reading limit number of bytes
type failingReader struct {
data []byte
pos int
limit int
}
func (r failingReader) Read(p []byte) (n int, err error) {
i := 0
for ; i < len(p) && i+r.pos < r.limit; i++ {
p[i] = r.data[r.pos+i]
}
r.pos += i
if r.pos >= r.limit {
return i, errors.Errorf("reader reached limit of %d", r.limit)
}
return i, nil
}
func (r failingReader) Close() error {
return nil
}
// closingReader adapts io.Reader to io.ReadCloser interface
type closingReader struct {
rd io.Reader
}
func (r closingReader) Read(p []byte) (n int, err error) {
return r.rd.Read(p)
}
func (r closingReader) Close() error {
return nil
}
func TestBackendLoadRetry(t *testing.T) {
data := test.Random(23, 1024)
limit := 100
attempt := 0
be := mock.NewBackend()
be.OpenReaderFn = func(ctx context.Context, h restic.Handle, length int, offset int64) (io.ReadCloser, error) {
// returns failing reader on first invocation, good reader on subsequent invocations
attempt++
if attempt > 1 {
return closingReader{rd: bytes.NewReader(data)}, nil
}
return failingReader{data: data, limit: limit}, nil
}
retryBackend := RetryBackend{
Backend: be,
}
var buf []byte
err := retryBackend.Load(context.TODO(), restic.Handle{}, 0, 0, func(rd io.Reader) (err error) {
buf, err = ioutil.ReadAll(rd)
return err
})
test.OK(t, err)
test.Equals(t, data, buf)
test.Equals(t, 2, attempt)
}

View file

@ -282,10 +282,13 @@ func (wr wrapReader) Close() error {
return err return err
} }
// Load returns a reader that yields the contents of the file at h at the // Load runs fn with a reader that yields the contents of the file at h at the
// given offset. If length is nonzero, only a portion of the file is // given offset.
// returned. rd must be closed after use. func (be *Backend) Load(ctx context.Context, h restic.Handle, length int, offset int64, fn func(rd io.Reader) error) error {
func (be *Backend) Load(ctx context.Context, h restic.Handle, length int, offset int64) (io.ReadCloser, error) { return backend.DefaultLoad(ctx, h, length, offset, be.openReader, fn)
}
func (be *Backend) openReader(ctx context.Context, h restic.Handle, length int, offset int64) (io.ReadCloser, error) {
debug.Log("Load %v, length %v, offset %v from %v", h, length, offset, be.Filename(h)) debug.Log("Load %v, length %v, offset %v from %v", h, length, offset, be.Filename(h))
if err := h.Valid(); err != nil { if err := h.Valid(); err != nil {
return nil, err return nil, err

View file

@ -146,10 +146,13 @@ func (b *Local) Save(ctx context.Context, h restic.Handle, rd io.Reader) error {
return setNewFileMode(filename, backend.Modes.File) return setNewFileMode(filename, backend.Modes.File)
} }
// Load returns a reader that yields the contents of the file at h at the // Load runs fn with a reader that yields the contents of the file at h at the
// given offset. If length is nonzero, only a portion of the file is // given offset.
// returned. rd must be closed after use. func (b *Local) Load(ctx context.Context, h restic.Handle, length int, offset int64, fn func(rd io.Reader) error) error {
func (b *Local) Load(ctx context.Context, h restic.Handle, length int, offset int64) (io.ReadCloser, error) { return backend.DefaultLoad(ctx, h, length, offset, b.openReader, fn)
}
func (b *Local) openReader(ctx context.Context, h restic.Handle, length int, offset int64) (io.ReadCloser, error) {
debug.Log("Load %v, length %v, offset %v", h, length, offset) debug.Log("Load %v, length %v, offset %v", h, length, offset)
if err := h.Valid(); err != nil { if err := h.Valid(); err != nil {
return nil, err return nil, err

View file

@ -7,6 +7,7 @@ import (
"io/ioutil" "io/ioutil"
"sync" "sync"
"github.com/restic/restic/internal/backend"
"github.com/restic/restic/internal/errors" "github.com/restic/restic/internal/errors"
"github.com/restic/restic/internal/restic" "github.com/restic/restic/internal/restic"
@ -85,10 +86,13 @@ func (be *MemoryBackend) Save(ctx context.Context, h restic.Handle, rd io.Reader
return nil return nil
} }
// Load returns a reader that yields the contents of the file at h at the // Load runs fn with a reader that yields the contents of the file at h at the
// given offset. If length is nonzero, only a portion of the file is // given offset.
// returned. rd must be closed after use. func (be *MemoryBackend) Load(ctx context.Context, h restic.Handle, length int, offset int64, fn func(rd io.Reader) error) error {
func (be *MemoryBackend) Load(ctx context.Context, h restic.Handle, length int, offset int64) (io.ReadCloser, error) { return backend.DefaultLoad(ctx, h, length, offset, be.openReader, fn)
}
func (be *MemoryBackend) openReader(ctx context.Context, h restic.Handle, length int, offset int64) (io.ReadCloser, error) {
if err := h.Valid(); err != nil { if err := h.Valid(); err != nil {
return nil, err return nil, err
} }

View file

@ -166,10 +166,13 @@ func (b *restBackend) IsNotExist(err error) bool {
return ok return ok
} }
// Load returns a reader that yields the contents of the file at h at the // Load runs fn with a reader that yields the contents of the file at h at the
// given offset. If length is nonzero, only a portion of the file is // given offset.
// returned. rd must be closed after use. func (b *restBackend) Load(ctx context.Context, h restic.Handle, length int, offset int64, fn func(rd io.Reader) error) error {
func (b *restBackend) Load(ctx context.Context, h restic.Handle, length int, offset int64) (io.ReadCloser, error) { return backend.DefaultLoad(ctx, h, length, offset, b.openReader, fn)
}
func (b *restBackend) openReader(ctx context.Context, h restic.Handle, length int, offset int64) (io.ReadCloser, error) {
debug.Log("Load %v, length %v, offset %v", h, length, offset) debug.Log("Load %v, length %v, offset %v", h, length, offset)
if err := h.Valid(); err != nil { if err := h.Valid(); err != nil {
return nil, err return nil, err

View file

@ -281,10 +281,13 @@ func (wr wrapReader) Close() error {
return err return err
} }
// Load returns a reader that yields the contents of the file at h at the // Load runs fn with a reader that yields the contents of the file at h at the
// given offset. If length is nonzero, only a portion of the file is // given offset.
// returned. rd must be closed after use. func (be *Backend) Load(ctx context.Context, h restic.Handle, length int, offset int64, fn func(rd io.Reader) error) error {
func (be *Backend) Load(ctx context.Context, h restic.Handle, length int, offset int64) (io.ReadCloser, error) { return backend.DefaultLoad(ctx, h, length, offset, be.openReader, fn)
}
func (be *Backend) openReader(ctx context.Context, h restic.Handle, length int, offset int64) (io.ReadCloser, error) {
debug.Log("Load %v, length %v, offset %v from %v", h, length, offset, be.Filename(h)) debug.Log("Load %v, length %v, offset %v from %v", h, length, offset, be.Filename(h))
if err := h.Valid(); err != nil { if err := h.Valid(); err != nil {
return nil, err return nil, err

View file

@ -327,10 +327,13 @@ func (r *SFTP) Save(ctx context.Context, h restic.Handle, rd io.Reader) (err err
return errors.Wrap(r.c.Chmod(filename, backend.Modes.File), "Chmod") return errors.Wrap(r.c.Chmod(filename, backend.Modes.File), "Chmod")
} }
// Load returns a reader that yields the contents of the file at h at the // Load runs fn with a reader that yields the contents of the file at h at the
// given offset. If length is nonzero, only a portion of the file is // given offset.
// returned. rd must be closed after use. func (r *SFTP) Load(ctx context.Context, h restic.Handle, length int, offset int64, fn func(rd io.Reader) error) error {
func (r *SFTP) Load(ctx context.Context, h restic.Handle, length int, offset int64) (io.ReadCloser, error) { return backend.DefaultLoad(ctx, h, length, offset, r.openReader, fn)
}
func (r *SFTP) openReader(ctx context.Context, h restic.Handle, length int, offset int64) (io.ReadCloser, error) {
debug.Log("Load %v, length %v, offset %v", h, length, offset) debug.Log("Load %v, length %v, offset %v", h, length, offset)
if err := h.Valid(); err != nil { if err := h.Valid(); err != nil {
return nil, err return nil, err

View file

@ -109,10 +109,13 @@ func (be *beSwift) Location() string {
return be.container return be.container
} }
// Load returns a reader that yields the contents of the file at h at the // Load runs fn with a reader that yields the contents of the file at h at the
// given offset. If length is nonzero, only a portion of the file is // given offset.
// returned. rd must be closed after use. func (be *beSwift) Load(ctx context.Context, h restic.Handle, length int, offset int64, fn func(rd io.Reader) error) error {
func (be *beSwift) Load(ctx context.Context, h restic.Handle, length int, offset int64) (io.ReadCloser, error) { return backend.DefaultLoad(ctx, h, length, offset, be.openReader, fn)
}
func (be *beSwift) openReader(ctx context.Context, h restic.Handle, length int, offset int64) (io.ReadCloser, error) {
debug.Log("Load %v, length %v, offset %v", h, length, offset) debug.Log("Load %v, length %v, offset %v", h, length, offset)
if err := h.Valid(); err != nil { if err := h.Valid(); err != nil {
return nil, err return nil, err

View file

@ -42,20 +42,15 @@ func (s *Suite) BenchmarkLoadFile(t *testing.B) {
t.ResetTimer() t.ResetTimer()
for i := 0; i < t.N; i++ { for i := 0; i < t.N; i++ {
rd, err := be.Load(context.TODO(), handle, 0, 0) var n int
err := be.Load(context.TODO(), handle, 0, 0, func(rd io.Reader) (ierr error) {
n, ierr = io.ReadFull(rd, buf)
return ierr
})
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
n, err := io.ReadFull(rd, buf)
if err != nil {
t.Fatal(err)
}
if err = rd.Close(); err != nil {
t.Fatalf("Close() returned error: %v", err)
}
if n != length { if n != length {
t.Fatalf("wrong number of bytes read: want %v, got %v", length, n) t.Fatalf("wrong number of bytes read: want %v, got %v", length, n)
} }
@ -84,20 +79,15 @@ func (s *Suite) BenchmarkLoadPartialFile(t *testing.B) {
t.ResetTimer() t.ResetTimer()
for i := 0; i < t.N; i++ { for i := 0; i < t.N; i++ {
rd, err := be.Load(context.TODO(), handle, testLength, 0) var n int
err := be.Load(context.TODO(), handle, testLength, 0, func(rd io.Reader) (ierr error) {
n, ierr = io.ReadFull(rd, buf)
return ierr
})
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
n, err := io.ReadFull(rd, buf)
if err != nil {
t.Fatal(err)
}
if err = rd.Close(); err != nil {
t.Fatalf("Close() returned error: %v", err)
}
if n != testLength { if n != testLength {
t.Fatalf("wrong number of bytes read: want %v, got %v", testLength, n) t.Fatalf("wrong number of bytes read: want %v, got %v", testLength, n)
} }
@ -128,20 +118,15 @@ func (s *Suite) BenchmarkLoadPartialFileOffset(t *testing.B) {
t.ResetTimer() t.ResetTimer()
for i := 0; i < t.N; i++ { for i := 0; i < t.N; i++ {
rd, err := be.Load(context.TODO(), handle, testLength, int64(testOffset)) var n int
err := be.Load(context.TODO(), handle, testLength, int64(testOffset), func(rd io.Reader) (ierr error) {
n, ierr = io.ReadFull(rd, buf)
return ierr
})
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
n, err := io.ReadFull(rd, buf)
if err != nil {
t.Fatal(err)
}
if err = rd.Close(); err != nil {
t.Fatalf("Close() returned error: %v", err)
}
if n != testLength { if n != testLength {
t.Fatalf("wrong number of bytes read: want %v, got %v", testLength, n) t.Fatalf("wrong number of bytes read: want %v, got %v", testLength, n)
} }

View file

@ -115,13 +115,14 @@ func (s *Suite) TestLoad(t *testing.T) {
b := s.open(t) b := s.open(t)
defer s.close(t, b) defer s.close(t, b)
rd, err := b.Load(context.TODO(), restic.Handle{}, 0, 0) noop := func(rd io.Reader) error {
return nil
}
err := b.Load(context.TODO(), restic.Handle{}, 0, 0, noop)
if err == nil { if err == nil {
t.Fatalf("Load() did not return an error for invalid handle") t.Fatalf("Load() did not return an error for invalid handle")
} }
if rd != nil {
_ = rd.Close()
}
err = testLoad(b, restic.Handle{Type: restic.DataFile, Name: "foobar"}, 0, 0) err = testLoad(b, restic.Handle{Type: restic.DataFile, Name: "foobar"}, 0, 0)
if err == nil { if err == nil {
@ -141,13 +142,19 @@ func (s *Suite) TestLoad(t *testing.T) {
t.Logf("saved %d bytes as %v", length, handle) t.Logf("saved %d bytes as %v", length, handle)
rd, err = b.Load(context.TODO(), handle, 100, -1) err = b.Load(context.TODO(), handle, 100, -1, noop)
if err == nil { if err == nil {
t.Fatalf("Load() returned no error for negative offset!") t.Fatalf("Load() returned no error for negative offset!")
} }
if rd != nil { err = b.Load(context.TODO(), handle, 0, 0, func(rd io.Reader) error {
t.Fatalf("Load() returned a non-nil reader for negative offset!") return errors.Errorf("deliberate error")
})
if err == nil {
t.Fatalf("Load() did not propagate consumer error!")
}
if err.Error() != "deliberate error" {
t.Fatalf("Load() did not correctly propagate consumer error!")
} }
loadTests := 50 loadTests := 50
@ -176,63 +183,38 @@ func (s *Suite) TestLoad(t *testing.T) {
d = d[:l] d = d[:l]
} }
rd, err := b.Load(context.TODO(), handle, getlen, int64(o)) var buf []byte
err := b.Load(context.TODO(), handle, getlen, int64(o), func(rd io.Reader) (ierr error) {
buf, ierr = ioutil.ReadAll(rd)
return ierr
})
if err != nil { if err != nil {
t.Logf("Load, l %v, o %v, len(d) %v, getlen %v", l, o, len(d), getlen) t.Logf("Load, l %v, o %v, len(d) %v, getlen %v", l, o, len(d), getlen)
t.Errorf("Load(%d, %d) returned unexpected error: %+v", l, o, err) t.Errorf("Load(%d, %d) returned unexpected error: %+v", l, o, err)
continue continue
} }
buf, err := ioutil.ReadAll(rd)
if err != nil {
t.Logf("Load, l %v, o %v, len(d) %v, getlen %v", l, o, len(d), getlen)
t.Errorf("Load(%d, %d) ReadAll() returned unexpected error: %+v", l, o, err)
if err = rd.Close(); err != nil {
t.Errorf("Load(%d, %d) rd.Close() returned error: %+v", l, o, err)
}
continue
}
if l == 0 && len(buf) != len(d) { if l == 0 && len(buf) != len(d) {
t.Logf("Load, l %v, o %v, len(d) %v, getlen %v", l, o, len(d), getlen) t.Logf("Load, l %v, o %v, len(d) %v, getlen %v", l, o, len(d), getlen)
t.Errorf("Load(%d, %d) wrong number of bytes read: want %d, got %d", l, o, len(d), len(buf)) t.Errorf("Load(%d, %d) wrong number of bytes read: want %d, got %d", l, o, len(d), len(buf))
if err = rd.Close(); err != nil {
t.Errorf("Load(%d, %d) rd.Close() returned error: %+v", l, o, err)
}
continue continue
} }
if l > 0 && l <= len(d) && len(buf) != l { if l > 0 && l <= len(d) && len(buf) != l {
t.Logf("Load, l %v, o %v, len(d) %v, getlen %v", l, o, len(d), getlen) t.Logf("Load, l %v, o %v, len(d) %v, getlen %v", l, o, len(d), getlen)
t.Errorf("Load(%d, %d) wrong number of bytes read: want %d, got %d", l, o, l, len(buf)) t.Errorf("Load(%d, %d) wrong number of bytes read: want %d, got %d", l, o, l, len(buf))
if err = rd.Close(); err != nil {
t.Errorf("Load(%d, %d) rd.Close() returned error: %+v", l, o, err)
}
continue continue
} }
if l > len(d) && len(buf) != len(d) { if l > len(d) && len(buf) != len(d) {
t.Logf("Load, l %v, o %v, len(d) %v, getlen %v", l, o, len(d), getlen) t.Logf("Load, l %v, o %v, len(d) %v, getlen %v", l, o, len(d), getlen)
t.Errorf("Load(%d, %d) wrong number of bytes read for overlong read: want %d, got %d", l, o, l, len(buf)) t.Errorf("Load(%d, %d) wrong number of bytes read for overlong read: want %d, got %d", l, o, l, len(buf))
if err = rd.Close(); err != nil {
t.Errorf("Load(%d, %d) rd.Close() returned error: %+v", l, o, err)
}
continue continue
} }
if !bytes.Equal(buf, d) { if !bytes.Equal(buf, d) {
t.Logf("Load, l %v, o %v, len(d) %v, getlen %v", l, o, len(d), getlen) t.Logf("Load, l %v, o %v, len(d) %v, getlen %v", l, o, len(d), getlen)
t.Errorf("Load(%d, %d) returned wrong bytes", l, o) t.Errorf("Load(%d, %d) returned wrong bytes", l, o)
if err = rd.Close(); err != nil {
t.Errorf("Load(%d, %d) rd.Close() returned error: %+v", l, o, err)
}
continue
}
err = rd.Close()
if err != nil {
t.Logf("Load, l %v, o %v, len(d) %v, getlen %v", l, o, len(d), getlen)
t.Errorf("Load(%d, %d) rd.Close() returned unexpected error: %+v", l, o, err)
continue continue
} }
} }
@ -647,17 +629,10 @@ func store(t testing.TB, b restic.Backend, tpe restic.FileType, data []byte) res
// testLoad loads a blob (but discards its contents). // testLoad loads a blob (but discards its contents).
func testLoad(b restic.Backend, h restic.Handle, length int, offset int64) error { func testLoad(b restic.Backend, h restic.Handle, length int, offset int64) error {
rd, err := b.Load(context.TODO(), h, 0, 0) return b.Load(context.TODO(), h, 0, 0, func(rd io.Reader) (ierr error) {
if err != nil { _, ierr = io.Copy(ioutil.Discard, rd)
return err return ierr
} })
_, err = io.Copy(ioutil.Discard, rd)
cerr := rd.Close()
if err == nil {
err = cerr
}
return err
} }
func (s *Suite) delayedRemove(t testing.TB, be restic.Backend, handles ...restic.Handle) error { func (s *Suite) delayedRemove(t testing.TB, be restic.Backend, handles ...restic.Handle) error {
@ -776,18 +751,14 @@ func (s *Suite) TestBackend(t *testing.T) {
length := end - start length := end - start
buf2 := make([]byte, length) buf2 := make([]byte, length)
rd, err := b.Load(context.TODO(), h, len(buf2), int64(start)) var n int
err = b.Load(context.TODO(), h, len(buf2), int64(start), func(rd io.Reader) (ierr error) {
n, ierr = io.ReadFull(rd, buf2)
return ierr
})
test.OK(t, err) test.OK(t, err)
n, err := io.ReadFull(rd, buf2)
test.OK(t, err) test.OK(t, err)
test.Equals(t, len(buf2), n) test.Equals(t, len(buf2), n)
remaining, err := io.Copy(ioutil.Discard, rd)
test.OK(t, err)
test.Equals(t, int64(0), remaining)
test.OK(t, rd.Close())
test.Equals(t, ts.data[start:end], string(buf2)) test.Equals(t, ts.data[start:end], string(buf2))
} }

View file

@ -10,24 +10,11 @@ import (
// LoadAll reads all data stored in the backend for the handle. // LoadAll reads all data stored in the backend for the handle.
func LoadAll(ctx context.Context, be restic.Backend, h restic.Handle) (buf []byte, err error) { func LoadAll(ctx context.Context, be restic.Backend, h restic.Handle) (buf []byte, err error) {
rd, err := be.Load(ctx, h, 0, 0) err = be.Load(ctx, h, 0, 0, func(rd io.Reader) (ierr error) {
if err != nil { buf, ierr = ioutil.ReadAll(rd)
return nil, err return ierr
} })
return buf, err
defer func() {
_, e := io.Copy(ioutil.Discard, rd)
if err == nil {
err = e
}
e = rd.Close()
if err == nil {
err = e
}
}()
return ioutil.ReadAll(rd)
} }
// LimitedReadCloser wraps io.LimitedReader and exposes the Close() method. // LimitedReadCloser wraps io.LimitedReader and exposes the Close() method.
@ -46,3 +33,19 @@ func (l *LimitedReadCloser) Read(p []byte) (int, error) {
func LimitReadCloser(r io.ReadCloser, n int64) *LimitedReadCloser { func LimitReadCloser(r io.ReadCloser, n int64) *LimitedReadCloser {
return &LimitedReadCloser{ReadCloser: r, Reader: io.LimitReader(r, n)} return &LimitedReadCloser{ReadCloser: r, Reader: io.LimitReader(r, n)}
} }
// DefaultLoad implements Backend.Load using lower-level openReader func
func DefaultLoad(ctx context.Context, h restic.Handle, length int, offset int64,
openReader func(ctx context.Context, h restic.Handle, length int, offset int64) (io.ReadCloser, error),
fn func(rd io.Reader) error) error {
rd, err := openReader(ctx, h, length, offset)
if err != nil {
return err
}
err = fn(rd)
if err != nil {
rd.Close() // ignore secondary errors closing the reader
return err
}
return rd.Close()
}

View file

@ -3,11 +3,13 @@ package backend_test
import ( import (
"bytes" "bytes"
"context" "context"
"io"
"math/rand" "math/rand"
"testing" "testing"
"github.com/restic/restic/internal/backend" "github.com/restic/restic/internal/backend"
"github.com/restic/restic/internal/backend/mem" "github.com/restic/restic/internal/backend/mem"
"github.com/restic/restic/internal/errors"
"github.com/restic/restic/internal/restic" "github.com/restic/restic/internal/restic"
rtest "github.com/restic/restic/internal/test" rtest "github.com/restic/restic/internal/test"
) )
@ -89,3 +91,54 @@ func TestLoadLargeBuffer(t *testing.T) {
} }
} }
} }
type mockReader struct {
closed bool
}
func (rd *mockReader) Read(p []byte) (n int, err error) {
return 0, nil
}
func (rd *mockReader) Close() error {
rd.closed = true
return nil
}
func TestDefaultLoad(t *testing.T) {
h := restic.Handle{Name: "id", Type: restic.DataFile}
rd := &mockReader{}
// happy case, assert correct parameters are passed around and content stream is closed
err := backend.DefaultLoad(context.TODO(), h, 10, 11, func(ctx context.Context, ih restic.Handle, length int, offset int64) (io.ReadCloser, error) {
rtest.Equals(t, h, ih)
rtest.Equals(t, int(10), length)
rtest.Equals(t, int64(11), offset)
return rd, nil
}, func(ird io.Reader) error {
rtest.Equals(t, rd, ird)
return nil
})
rtest.OK(t, err)
rtest.Equals(t, true, rd.closed)
// unhappy case, assert producer errors are handled correctly
err = backend.DefaultLoad(context.TODO(), h, 10, 11, func(ctx context.Context, ih restic.Handle, length int, offset int64) (io.ReadCloser, error) {
return nil, errors.Errorf("producer error")
}, func(ird io.Reader) error {
t.Fatalf("unexpected consumer invocation")
return nil
})
rtest.Equals(t, "producer error", err.Error())
// unhappy case, assert consumer errors are handled correctly
rd = &mockReader{}
err = backend.DefaultLoad(context.TODO(), h, 10, 11, func(ctx context.Context, ih restic.Handle, length int, offset int64) (io.ReadCloser, error) {
return rd, nil
}, func(ird io.Reader) error {
return errors.Errorf("consumer error")
})
rtest.Equals(t, true, rd.closed)
rtest.Equals(t, "consumer error", err.Error())
}

View file

@ -121,17 +121,10 @@ func (b *Backend) cacheFile(ctx context.Context, h restic.Handle) error {
return nil return nil
} }
rd, err := b.Backend.Load(ctx, h, 0, 0) err := b.Backend.Load(ctx, h, 0, 0, func(rd io.Reader) error {
return b.Cache.Save(h, rd)
})
if err != nil { if err != nil {
return err
}
if err = b.Cache.Save(h, rd); err != nil {
_ = rd.Close()
return err
}
if err = rd.Close(); err != nil {
// try to remove from the cache, ignore errors // try to remove from the cache, ignore errors
_ = b.Cache.Remove(h) _ = b.Cache.Remove(h)
return err return err
@ -142,17 +135,22 @@ func (b *Backend) cacheFile(ctx context.Context, h restic.Handle) error {
// loadFromCacheOrDelegate will try to load the file from the cache, and fall // loadFromCacheOrDelegate will try to load the file from the cache, and fall
// back to the backend if that fails. // back to the backend if that fails.
func (b *Backend) loadFromCacheOrDelegate(ctx context.Context, h restic.Handle, length int, offset int64) (io.ReadCloser, error) { func (b *Backend) loadFromCacheOrDelegate(ctx context.Context, h restic.Handle, length int, offset int64, consumer func(rd io.Reader) error) error {
rd, err := b.Cache.Load(h, length, offset) rd, err := b.Cache.Load(h, length, offset)
if err == nil { if err != nil {
return rd, nil return b.Backend.Load(ctx, h, length, offset, consumer)
} }
return b.Backend.Load(ctx, h, length, offset) err = consumer(rd)
if err != nil {
rd.Close() // ignore secondary errors
return err
}
return rd.Close()
} }
// Load loads a file from the cache or the backend. // Load loads a file from the cache or the backend.
func (b *Backend) Load(ctx context.Context, h restic.Handle, length int, offset int64) (io.ReadCloser, error) { func (b *Backend) Load(ctx context.Context, h restic.Handle, length int, offset int64, consumer func(rd io.Reader) error) error {
b.inProgressMutex.Lock() b.inProgressMutex.Lock()
waitForFinish, inProgress := b.inProgress[h] waitForFinish, inProgress := b.inProgress[h]
b.inProgressMutex.Unlock() b.inProgressMutex.Unlock()
@ -167,7 +165,12 @@ func (b *Backend) Load(ctx context.Context, h restic.Handle, length int, offset
debug.Log("Load(%v, %v, %v) from cache", h, length, offset) debug.Log("Load(%v, %v, %v) from cache", h, length, offset)
rd, err := b.Cache.Load(h, length, offset) rd, err := b.Cache.Load(h, length, offset)
if err == nil { if err == nil {
return rd, nil err = consumer(rd)
if err != nil {
rd.Close() // ignore secondary errors
return err
}
return rd.Close()
} }
debug.Log("error loading %v from cache: %v", h, err) debug.Log("error loading %v from cache: %v", h, err)
} }
@ -179,20 +182,20 @@ func (b *Backend) Load(ctx context.Context, h restic.Handle, length int, offset
err := b.cacheFile(ctx, h) err := b.cacheFile(ctx, h)
if err == nil { if err == nil {
return b.loadFromCacheOrDelegate(ctx, h, length, offset) return b.loadFromCacheOrDelegate(ctx, h, length, offset, consumer)
} }
debug.Log("error caching %v: %v", h, err) debug.Log("error caching %v: %v", h, err)
} }
debug.Log("Load(%v, %v, %v): partial file requested, delegating to backend", h, length, offset) debug.Log("Load(%v, %v, %v): partial file requested, delegating to backend", h, length, offset)
return b.Backend.Load(ctx, h, length, offset) return b.Backend.Load(ctx, h, length, offset, consumer)
} }
// if we don't automatically cache this file type, fall back to the backend // if we don't automatically cache this file type, fall back to the backend
if _, ok := autoCacheFiles[h.Type]; !ok { if _, ok := autoCacheFiles[h.Type]; !ok {
debug.Log("Load(%v, %v, %v): delegating to backend", h, length, offset) debug.Log("Load(%v, %v, %v): delegating to backend", h, length, offset)
return b.Backend.Load(ctx, h, length, offset) return b.Backend.Load(ctx, h, length, offset, consumer)
} }
debug.Log("auto-store %v in the cache", h) debug.Log("auto-store %v in the cache", h)
@ -200,11 +203,20 @@ func (b *Backend) Load(ctx context.Context, h restic.Handle, length int, offset
if err == nil { if err == nil {
// load the cached version // load the cached version
return b.Cache.Load(h, 0, 0) rd, err := b.Cache.Load(h, 0, 0)
if err != nil {
return err
}
err = consumer(rd)
if err != nil {
rd.Close() // ignore secondary errors
return err
}
return rd.Close()
} }
debug.Log("error caching %v: %v, falling back to backend", h, err) debug.Log("error caching %v: %v, falling back to backend", h, err)
return b.Backend.Load(ctx, h, length, offset) return b.Backend.Load(ctx, h, length, offset, consumer)
} }
// Stat tests whether the backend has a file. If it does not exist but still // Stat tests whether the backend has a file. If it does not exist but still

View file

@ -630,14 +630,8 @@ func checkPack(ctx context.Context, r restic.Repository, id restic.ID) error {
debug.Log("checking pack %v", id) debug.Log("checking pack %v", id)
h := restic.Handle{Type: restic.DataFile, Name: id.String()} h := restic.Handle{Type: restic.DataFile, Name: id.String()}
rd, err := r.Backend().Load(ctx, h, 0, 0)
if err != nil {
return err
}
packfile, err := fs.TempFile("", "restic-temp-check-") packfile, err := fs.TempFile("", "restic-temp-check-")
if err != nil { if err != nil {
_ = rd.Close()
return errors.Wrap(err, "TempFile") return errors.Wrap(err, "TempFile")
} }
@ -646,18 +640,25 @@ func checkPack(ctx context.Context, r restic.Repository, id restic.ID) error {
_ = os.Remove(packfile.Name()) _ = os.Remove(packfile.Name())
}() }()
hrd := hashing.NewReader(rd, sha256.New()) var hash restic.ID
size, err := io.Copy(packfile, hrd) var size int64
err = r.Backend().Load(ctx, h, 0, 0, func(rd io.Reader) (ierr error) {
_, ierr = packfile.Seek(0, io.SeekStart)
if ierr == nil {
ierr = packfile.Truncate(0)
}
if ierr != nil {
return ierr
}
hrd := hashing.NewReader(rd, sha256.New())
size, ierr = io.Copy(packfile, hrd)
hash = restic.IDFromHash(hrd.Sum(nil))
return ierr
})
if err != nil { if err != nil {
_ = rd.Close() return errors.Wrap(err, "checkPack")
return errors.Wrap(err, "Copy")
} }
if err = rd.Close(); err != nil {
return err
}
hash := restic.IDFromHash(hrd.Sum(nil))
debug.Log("hash for pack %v is %v", id, hash) debug.Log("hash for pack %v is %v", id, hash)
if !hash.Equal(id) { if !hash.Equal(id) {

View file

@ -195,20 +195,17 @@ func TestModifiedIndex(t *testing.T) {
Type: restic.IndexFile, Type: restic.IndexFile,
Name: "90f838b4ac28735fda8644fe6a08dbc742e57aaf81b30977b4fefa357010eafd", Name: "90f838b4ac28735fda8644fe6a08dbc742e57aaf81b30977b4fefa357010eafd",
} }
f, err := repo.Backend().Load(context.TODO(), h, 0, 0) err := repo.Backend().Load(context.TODO(), h, 0, 0, func(rd io.Reader) error {
// save the index again with a modified name so that the hash doesn't match
// the content any more
h2 := restic.Handle{
Type: restic.IndexFile,
Name: "80f838b4ac28735fda8644fe6a08dbc742e57aaf81b30977b4fefa357010eafd",
}
return repo.Backend().Save(context.TODO(), h2, rd)
})
test.OK(t, err) test.OK(t, err)
// save the index again with a modified name so that the hash doesn't match
// the content any more
h2 := restic.Handle{
Type: restic.IndexFile,
Name: "80f838b4ac28735fda8644fe6a08dbc742e57aaf81b30977b4fefa357010eafd",
}
err = repo.Backend().Save(context.TODO(), h2, f)
test.OK(t, err)
test.OK(t, f.Close())
chkr := checker.New(repo) chkr := checker.New(repo)
hints, errs := chkr.LoadIndex(context.TODO()) hints, errs := chkr.LoadIndex(context.TODO())
if len(errs) == 0 { if len(errs) == 0 {
@ -262,35 +259,27 @@ type errorBackend struct {
ProduceErrors bool ProduceErrors bool
} }
func (b errorBackend) Load(ctx context.Context, h restic.Handle, length int, offset int64) (io.ReadCloser, error) { func (b errorBackend) Load(ctx context.Context, h restic.Handle, length int, offset int64, consumer func(rd io.Reader) error) error {
rd, err := b.Backend.Load(ctx, h, length, offset) return b.Backend.Load(ctx, h, length, offset, func(rd io.Reader) error {
if err != nil { if b.ProduceErrors {
return rd, err return consumer(errorReadCloser{rd})
} }
return consumer(rd)
if b.ProduceErrors { })
return errorReadCloser{rd}, err
}
return rd, nil
} }
type errorReadCloser struct { type errorReadCloser struct {
io.ReadCloser io.Reader
} }
func (erd errorReadCloser) Read(p []byte) (int, error) { func (erd errorReadCloser) Read(p []byte) (int, error) {
n, err := erd.ReadCloser.Read(p) n, err := erd.Reader.Read(p)
if n > 0 { if n > 0 {
induceError(p[:n]) induceError(p[:n])
} }
return n, err return n, err
} }
func (erd errorReadCloser) Close() error {
return erd.ReadCloser.Close()
}
// induceError flips a bit in the slice. // induceError flips a bit in the slice.
func induceError(data []byte) { func induceError(data []byte) {
if rand.Float32() < 0.2 { if rand.Float32() < 0.2 {

View file

@ -25,16 +25,13 @@ func (r rateLimitedBackend) Save(ctx context.Context, h restic.Handle, rd io.Rea
return r.Backend.Save(ctx, h, r.limiter.Upstream(rd)) return r.Backend.Save(ctx, h, r.limiter.Upstream(rd))
} }
func (r rateLimitedBackend) Load(ctx context.Context, h restic.Handle, length int, offset int64) (io.ReadCloser, error) { func (r rateLimitedBackend) Load(ctx context.Context, h restic.Handle, length int, offset int64, consumer func(rd io.Reader) error) error {
rc, err := r.Backend.Load(ctx, h, length, offset) return r.Backend.Load(ctx, h, length, offset, func(rd io.Reader) error {
if err != nil { lrd := limitedReadCloser{
return nil, err limited: r.limiter.Downstream(rd),
} }
return consumer(lrd)
return limitedReadCloser{ })
original: rc,
limited: r.limiter.Downstream(rc),
}, nil
} }
type limitedReadCloser struct { type limitedReadCloser struct {
@ -47,6 +44,9 @@ func (l limitedReadCloser) Read(b []byte) (n int, err error) {
} }
func (l limitedReadCloser) Close() error { func (l limitedReadCloser) Close() error {
if l.original == nil {
return nil
}
return l.original.Close() return l.original.Close()
} }

View file

@ -13,7 +13,7 @@ type Backend struct {
CloseFn func() error CloseFn func() error
IsNotExistFn func(err error) bool IsNotExistFn func(err error) bool
SaveFn func(ctx context.Context, h restic.Handle, rd io.Reader) error SaveFn func(ctx context.Context, h restic.Handle, rd io.Reader) error
LoadFn func(ctx context.Context, h restic.Handle, length int, offset int64) (io.ReadCloser, error) OpenReaderFn func(ctx context.Context, h restic.Handle, length int, offset int64) (io.ReadCloser, error)
StatFn func(ctx context.Context, h restic.Handle) (restic.FileInfo, error) StatFn func(ctx context.Context, h restic.Handle) (restic.FileInfo, error)
ListFn func(ctx context.Context, t restic.FileType, fn func(restic.FileInfo) error) error ListFn func(ctx context.Context, t restic.FileType, fn func(restic.FileInfo) error) error
RemoveFn func(ctx context.Context, h restic.Handle) error RemoveFn func(ctx context.Context, h restic.Handle) error
@ -22,6 +22,12 @@ type Backend struct {
LocationFn func() string LocationFn func() string
} }
// NewBackend returns new mock Backend instance
func NewBackend() *Backend {
be := &Backend{}
return be
}
// Close the backend. // Close the backend.
func (m *Backend) Close() error { func (m *Backend) Close() error {
if m.CloseFn == nil { if m.CloseFn == nil {
@ -58,13 +64,27 @@ func (m *Backend) Save(ctx context.Context, h restic.Handle, rd io.Reader) error
return m.SaveFn(ctx, h, rd) return m.SaveFn(ctx, h, rd)
} }
// Load loads data from the backend. // Load runs fn with a reader that yields the contents of the file at h at the
func (m *Backend) Load(ctx context.Context, h restic.Handle, length int, offset int64) (io.ReadCloser, error) { // given offset.
if m.LoadFn == nil { func (m *Backend) Load(ctx context.Context, h restic.Handle, length int, offset int64, fn func(rd io.Reader) error) error {
rd, err := m.openReader(ctx, h, length, offset)
if err != nil {
return err
}
err = fn(rd)
if err != nil {
rd.Close() // ignore secondary errors closing the reader
return err
}
return rd.Close()
}
func (m *Backend) openReader(ctx context.Context, h restic.Handle, length int, offset int64) (io.ReadCloser, error) {
if m.OpenReaderFn == nil {
return nil, errors.New("not implemented") return nil, errors.New("not implemented")
} }
return m.LoadFn(ctx, h, length, offset) return m.OpenReaderFn(ctx, h, length, offset)
} }
// Stat an object in the backend. // Stat an object in the backend.

View file

@ -32,23 +32,26 @@ func Repack(ctx context.Context, repo restic.Repository, packs restic.IDSet, kee
return nil, errors.Wrap(err, "TempFile") return nil, errors.Wrap(err, "TempFile")
} }
beRd, err := repo.Backend().Load(ctx, h, 0, 0) // TODO very similar code in checker, consider moving to utils.go
var hash restic.ID
var packLength int64
err = repo.Backend().Load(ctx, h, 0, 0, func(rd io.Reader) (ierr error) {
_, ierr = tempfile.Seek(0, io.SeekStart)
if ierr == nil {
ierr = tempfile.Truncate(0)
}
if ierr != nil {
return ierr
}
hrd := hashing.NewReader(rd, sha256.New())
packLength, ierr = io.Copy(tempfile, hrd)
hash = restic.IDFromHash(hrd.Sum(nil))
return ierr
})
if err != nil { if err != nil {
return nil, err return nil, errors.Wrap(err, "Repack")
} }
hrd := hashing.NewReader(beRd, sha256.New())
packLength, err := io.Copy(tempfile, hrd)
if err != nil {
_ = beRd.Close()
return nil, errors.Wrap(err, "Copy")
}
if err = beRd.Close(); err != nil {
return nil, errors.Wrap(err, "Close")
}
hash := restic.IDFromHash(hrd.Sum(nil))
debug.Log("pack %v loaded (%d bytes), hash %v", packID, packLength, hash) debug.Log("pack %v loaded (%d bytes), hash %v", packID, packLength, hash)
if !packID.Equal(hash) { if !packID.Equal(hash) {

View file

@ -23,11 +23,15 @@ type Backend interface {
// Save stores the data in the backend under the given handle. // Save stores the data in the backend under the given handle.
Save(ctx context.Context, h Handle, rd io.Reader) error Save(ctx context.Context, h Handle, rd io.Reader) error
// Load returns a reader that yields the contents of the file at h at the // Load runs fn with a reader that yields the contents of the file at h at the
// given offset. If length is larger than zero, only a portion of the file // given offset. If length is larger than zero, only a portion of the file
// is returned. rd must be closed after use. If an error is returned, the // is read.
// ReadCloser must be nil. //
Load(ctx context.Context, h Handle, length int, offset int64) (io.ReadCloser, error) // The function fn may be called multiple times during the same Load invocation
// and therefore must be idempotent.
//
// Implementations are encouraged to use backend.DefaultLoad
Load(ctx context.Context, h Handle, length int, offset int64, fn func(rd io.Reader) error) error
// Stat returns information about the File identified by h. // Stat returns information about the File identified by h.
Stat(ctx context.Context, h Handle) (FileInfo, error) Stat(ctx context.Context, h Handle) (FileInfo, error)

View file

@ -25,17 +25,16 @@ func ReaderAt(be Backend, h Handle) io.ReaderAt {
// ReadAt reads from the backend handle h at the given position. // ReadAt reads from the backend handle h at the given position.
func ReadAt(ctx context.Context, be Backend, h Handle, offset int64, p []byte) (n int, err error) { func ReadAt(ctx context.Context, be Backend, h Handle, offset int64, p []byte) (n int, err error) {
debug.Log("ReadAt(%v) at %v, len %v", h, offset, len(p)) debug.Log("ReadAt(%v) at %v, len %v", h, offset, len(p))
rd, err := be.Load(ctx, h, len(p), offset)
err = be.Load(ctx, h, len(p), offset, func(rd io.Reader) (ierr error) {
n, ierr = io.ReadFull(rd, p)
return ierr
})
if err != nil { if err != nil {
return 0, err return 0, err
} }
n, err = io.ReadFull(rd, p)
e := rd.Close()
if err == nil {
err = e
}
debug.Log("ReadAt(%v) ReadFull returned %v bytes", h, n) debug.Log("ReadAt(%v) ReadFull returned %v bytes", h, n)
return n, errors.Wrapf(err, "ReadFull(%v)", h) return n, errors.Wrapf(err, "ReadFull(%v)", h)