forked from TrueCloudLab/restic
Merge pull request #2217 from restic/improve-memory-usage
WIP: improve memory usage
This commit is contained in:
commit
6b700d02f5
17 changed files with 417 additions and 526 deletions
|
@ -74,7 +74,7 @@ func runCat(gopts GlobalOptions, args []string) error {
|
||||||
fmt.Println(string(buf))
|
fmt.Println(string(buf))
|
||||||
return nil
|
return nil
|
||||||
case "index":
|
case "index":
|
||||||
buf, err := repo.LoadAndDecrypt(gopts.ctx, restic.IndexFile, id)
|
buf, err := repo.LoadAndDecrypt(gopts.ctx, nil, restic.IndexFile, id)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
@ -99,7 +99,7 @@ func runCat(gopts GlobalOptions, args []string) error {
|
||||||
return nil
|
return nil
|
||||||
case "key":
|
case "key":
|
||||||
h := restic.Handle{Type: restic.KeyFile, Name: id.String()}
|
h := restic.Handle{Type: restic.KeyFile, Name: id.String()}
|
||||||
buf, err := backend.LoadAll(gopts.ctx, repo.Backend(), h)
|
buf, err := backend.LoadAll(gopts.ctx, nil, repo.Backend(), h)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
@ -150,7 +150,7 @@ func runCat(gopts GlobalOptions, args []string) error {
|
||||||
switch tpe {
|
switch tpe {
|
||||||
case "pack":
|
case "pack":
|
||||||
h := restic.Handle{Type: restic.DataFile, Name: id.String()}
|
h := restic.Handle{Type: restic.DataFile, Name: id.String()}
|
||||||
buf, err := backend.LoadAll(gopts.ctx, repo.Backend(), h)
|
buf, err := backend.LoadAll(gopts.ctx, nil, repo.Backend(), h)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
|
@ -79,7 +79,7 @@ func (s *Suite) TestConfig(t *testing.T) {
|
||||||
var testString = "Config"
|
var testString = "Config"
|
||||||
|
|
||||||
// create config and read it back
|
// create config and read it back
|
||||||
_, err := backend.LoadAll(context.TODO(), b, restic.Handle{Type: restic.ConfigFile})
|
_, err := backend.LoadAll(context.TODO(), nil, b, restic.Handle{Type: restic.ConfigFile})
|
||||||
if err == nil {
|
if err == nil {
|
||||||
t.Fatalf("did not get expected error for non-existing config")
|
t.Fatalf("did not get expected error for non-existing config")
|
||||||
}
|
}
|
||||||
|
@ -93,7 +93,7 @@ func (s *Suite) TestConfig(t *testing.T) {
|
||||||
// same config
|
// same config
|
||||||
for _, name := range []string{"", "foo", "bar", "0000000000000000000000000000000000000000000000000000000000000000"} {
|
for _, name := range []string{"", "foo", "bar", "0000000000000000000000000000000000000000000000000000000000000000"} {
|
||||||
h := restic.Handle{Type: restic.ConfigFile, Name: name}
|
h := restic.Handle{Type: restic.ConfigFile, Name: name}
|
||||||
buf, err := backend.LoadAll(context.TODO(), b, h)
|
buf, err := backend.LoadAll(context.TODO(), nil, b, h)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("unable to read config with name %q: %+v", name, err)
|
t.Fatalf("unable to read config with name %q: %+v", name, err)
|
||||||
}
|
}
|
||||||
|
@ -491,7 +491,7 @@ func (s *Suite) TestSave(t *testing.T) {
|
||||||
err := b.Save(context.TODO(), h, restic.NewByteReader(data))
|
err := b.Save(context.TODO(), h, restic.NewByteReader(data))
|
||||||
test.OK(t, err)
|
test.OK(t, err)
|
||||||
|
|
||||||
buf, err := backend.LoadAll(context.TODO(), b, h)
|
buf, err := backend.LoadAll(context.TODO(), nil, b, h)
|
||||||
test.OK(t, err)
|
test.OK(t, err)
|
||||||
if len(buf) != len(data) {
|
if len(buf) != len(data) {
|
||||||
t.Fatalf("number of bytes does not match, want %v, got %v", len(data), len(buf))
|
t.Fatalf("number of bytes does not match, want %v, got %v", len(data), len(buf))
|
||||||
|
@ -584,7 +584,7 @@ func (s *Suite) TestSaveFilenames(t *testing.T) {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
buf, err := backend.LoadAll(context.TODO(), b, h)
|
buf, err := backend.LoadAll(context.TODO(), nil, b, h)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Errorf("test %d failed: Load() returned %+v", i, err)
|
t.Errorf("test %d failed: Load() returned %+v", i, err)
|
||||||
continue
|
continue
|
||||||
|
@ -734,7 +734,7 @@ func (s *Suite) TestBackend(t *testing.T) {
|
||||||
|
|
||||||
// test Load()
|
// test Load()
|
||||||
h := restic.Handle{Type: tpe, Name: ts.id}
|
h := restic.Handle{Type: tpe, Name: ts.id}
|
||||||
buf, err := backend.LoadAll(context.TODO(), b, h)
|
buf, err := backend.LoadAll(context.TODO(), nil, b, h)
|
||||||
test.OK(t, err)
|
test.OK(t, err)
|
||||||
test.Equals(t, ts.data, string(buf))
|
test.Equals(t, ts.data, string(buf))
|
||||||
|
|
||||||
|
|
|
@ -1,20 +1,33 @@
|
||||||
package backend
|
package backend
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"bytes"
|
||||||
"context"
|
"context"
|
||||||
"io"
|
"io"
|
||||||
"io/ioutil"
|
|
||||||
|
|
||||||
"github.com/restic/restic/internal/restic"
|
"github.com/restic/restic/internal/restic"
|
||||||
)
|
)
|
||||||
|
|
||||||
// LoadAll reads all data stored in the backend for the handle.
|
// LoadAll reads all data stored in the backend for the handle into the given
|
||||||
func LoadAll(ctx context.Context, be restic.Backend, h restic.Handle) (buf []byte, err error) {
|
// buffer, which is truncated. If the buffer is not large enough or nil, a new
|
||||||
err = be.Load(ctx, h, 0, 0, func(rd io.Reader) (ierr error) {
|
// one is allocated.
|
||||||
buf, ierr = ioutil.ReadAll(rd)
|
func LoadAll(ctx context.Context, buf []byte, be restic.Backend, h restic.Handle) ([]byte, error) {
|
||||||
return ierr
|
err := be.Load(ctx, h, 0, 0, func(rd io.Reader) error {
|
||||||
|
// make sure this is idempotent, in case an error occurs this function may be called multiple times!
|
||||||
|
wr := bytes.NewBuffer(buf[:0])
|
||||||
|
_, cerr := io.Copy(wr, rd)
|
||||||
|
if cerr != nil {
|
||||||
|
return cerr
|
||||||
|
}
|
||||||
|
buf = wr.Bytes()
|
||||||
|
return nil
|
||||||
})
|
})
|
||||||
return buf, err
|
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
return buf, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// LimitedReadCloser wraps io.LimitedReader and exposes the Close() method.
|
// LimitedReadCloser wraps io.LimitedReader and exposes the Close() method.
|
||||||
|
|
|
@ -19,6 +19,7 @@ const MiB = 1 << 20
|
||||||
|
|
||||||
func TestLoadAll(t *testing.T) {
|
func TestLoadAll(t *testing.T) {
|
||||||
b := mem.New()
|
b := mem.New()
|
||||||
|
var buf []byte
|
||||||
|
|
||||||
for i := 0; i < 20; i++ {
|
for i := 0; i < 20; i++ {
|
||||||
data := rtest.Random(23+i, rand.Intn(MiB)+500*KiB)
|
data := rtest.Random(23+i, rand.Intn(MiB)+500*KiB)
|
||||||
|
@ -28,7 +29,7 @@ func TestLoadAll(t *testing.T) {
|
||||||
err := b.Save(context.TODO(), h, restic.NewByteReader(data))
|
err := b.Save(context.TODO(), h, restic.NewByteReader(data))
|
||||||
rtest.OK(t, err)
|
rtest.OK(t, err)
|
||||||
|
|
||||||
buf, err := backend.LoadAll(context.TODO(), b, restic.Handle{Type: restic.DataFile, Name: id.String()})
|
buf, err := backend.LoadAll(context.TODO(), buf, b, restic.Handle{Type: restic.DataFile, Name: id.String()})
|
||||||
rtest.OK(t, err)
|
rtest.OK(t, err)
|
||||||
|
|
||||||
if len(buf) != len(data) {
|
if len(buf) != len(data) {
|
||||||
|
@ -43,55 +44,66 @@ func TestLoadAll(t *testing.T) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestLoadSmallBuffer(t *testing.T) {
|
func save(t testing.TB, be restic.Backend, buf []byte) restic.Handle {
|
||||||
b := mem.New()
|
id := restic.Hash(buf)
|
||||||
|
h := restic.Handle{Name: id.String(), Type: restic.DataFile}
|
||||||
for i := 0; i < 20; i++ {
|
err := be.Save(context.TODO(), h, restic.NewByteReader(buf))
|
||||||
data := rtest.Random(23+i, rand.Intn(MiB)+500*KiB)
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
id := restic.Hash(data)
|
|
||||||
h := restic.Handle{Name: id.String(), Type: restic.DataFile}
|
|
||||||
err := b.Save(context.TODO(), h, restic.NewByteReader(data))
|
|
||||||
rtest.OK(t, err)
|
|
||||||
|
|
||||||
buf, err := backend.LoadAll(context.TODO(), b, restic.Handle{Type: restic.DataFile, Name: id.String()})
|
|
||||||
rtest.OK(t, err)
|
|
||||||
|
|
||||||
if len(buf) != len(data) {
|
|
||||||
t.Errorf("length of returned buffer does not match, want %d, got %d", len(data), len(buf))
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
|
|
||||||
if !bytes.Equal(buf, data) {
|
|
||||||
t.Errorf("wrong data returned")
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
return h
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestLoadLargeBuffer(t *testing.T) {
|
func TestLoadAllAppend(t *testing.T) {
|
||||||
b := mem.New()
|
b := mem.New()
|
||||||
|
|
||||||
for i := 0; i < 20; i++ {
|
h1 := save(t, b, []byte("foobar test string"))
|
||||||
data := rtest.Random(23+i, rand.Intn(MiB)+500*KiB)
|
randomData := rtest.Random(23, rand.Intn(MiB)+500*KiB)
|
||||||
|
h2 := save(t, b, randomData)
|
||||||
|
|
||||||
id := restic.Hash(data)
|
var tests = []struct {
|
||||||
h := restic.Handle{Name: id.String(), Type: restic.DataFile}
|
handle restic.Handle
|
||||||
err := b.Save(context.TODO(), h, restic.NewByteReader(data))
|
buf []byte
|
||||||
rtest.OK(t, err)
|
want []byte
|
||||||
|
}{
|
||||||
|
{
|
||||||
|
handle: h1,
|
||||||
|
buf: nil,
|
||||||
|
want: []byte("foobar test string"),
|
||||||
|
},
|
||||||
|
{
|
||||||
|
handle: h1,
|
||||||
|
buf: []byte("xxx"),
|
||||||
|
want: []byte("foobar test string"),
|
||||||
|
},
|
||||||
|
{
|
||||||
|
handle: h2,
|
||||||
|
buf: nil,
|
||||||
|
want: randomData,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
handle: h2,
|
||||||
|
buf: make([]byte, 0, 200),
|
||||||
|
want: randomData,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
handle: h2,
|
||||||
|
buf: []byte("foobarbaz"),
|
||||||
|
want: randomData,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
buf, err := backend.LoadAll(context.TODO(), b, restic.Handle{Type: restic.DataFile, Name: id.String()})
|
for _, test := range tests {
|
||||||
rtest.OK(t, err)
|
t.Run("", func(t *testing.T) {
|
||||||
|
buf, err := backend.LoadAll(context.TODO(), test.buf, b, test.handle)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
|
||||||
if len(buf) != len(data) {
|
if !bytes.Equal(buf, test.want) {
|
||||||
t.Errorf("length of returned buffer does not match, want %d, got %d", len(data), len(buf))
|
t.Errorf("wrong data returned, want %q, got %q", test.want, buf)
|
||||||
continue
|
}
|
||||||
}
|
})
|
||||||
|
|
||||||
if !bytes.Equal(buf, data) {
|
|
||||||
t.Errorf("wrong data returned")
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
4
internal/cache/backend_test.go
vendored
4
internal/cache/backend_test.go
vendored
|
@ -17,7 +17,7 @@ import (
|
||||||
)
|
)
|
||||||
|
|
||||||
func loadAndCompare(t testing.TB, be restic.Backend, h restic.Handle, data []byte) {
|
func loadAndCompare(t testing.TB, be restic.Backend, h restic.Handle, data []byte) {
|
||||||
buf, err := backend.LoadAll(context.TODO(), be, h)
|
buf, err := backend.LoadAll(context.TODO(), nil, be, h)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
|
@ -147,7 +147,7 @@ func TestErrorBackend(t *testing.T) {
|
||||||
loadTest := func(wg *sync.WaitGroup, be restic.Backend) {
|
loadTest := func(wg *sync.WaitGroup, be restic.Backend) {
|
||||||
defer wg.Done()
|
defer wg.Done()
|
||||||
|
|
||||||
buf, err := backend.LoadAll(context.TODO(), be, h)
|
buf, err := backend.LoadAll(context.TODO(), nil, be, h)
|
||||||
if err == testErr {
|
if err == testErr {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
|
@ -49,7 +49,7 @@ func New(repo restic.Repository) *Checker {
|
||||||
return c
|
return c
|
||||||
}
|
}
|
||||||
|
|
||||||
const defaultParallelism = 40
|
const defaultParallelism = 5
|
||||||
|
|
||||||
// ErrDuplicatePacks is returned when a pack is found in more than one index.
|
// ErrDuplicatePacks is returned when a pack is found in more than one index.
|
||||||
type ErrDuplicatePacks struct {
|
type ErrDuplicatePacks struct {
|
||||||
|
@ -74,82 +74,110 @@ func (err ErrOldIndexFormat) Error() string {
|
||||||
// LoadIndex loads all index files.
|
// LoadIndex loads all index files.
|
||||||
func (c *Checker) LoadIndex(ctx context.Context) (hints []error, errs []error) {
|
func (c *Checker) LoadIndex(ctx context.Context) (hints []error, errs []error) {
|
||||||
debug.Log("Start")
|
debug.Log("Start")
|
||||||
type indexRes struct {
|
|
||||||
Index *repository.Index
|
// track spawned goroutines using wg, create a new context which is
|
||||||
err error
|
// cancelled as soon as an error occurs.
|
||||||
ID string
|
wg, ctx := errgroup.WithContext(ctx)
|
||||||
|
|
||||||
|
type FileInfo struct {
|
||||||
|
restic.ID
|
||||||
|
Size int64
|
||||||
}
|
}
|
||||||
|
|
||||||
indexCh := make(chan indexRes)
|
type Result struct {
|
||||||
|
*repository.Index
|
||||||
|
restic.ID
|
||||||
|
Err error
|
||||||
|
}
|
||||||
|
|
||||||
worker := func(ctx context.Context, id restic.ID) error {
|
ch := make(chan FileInfo)
|
||||||
debug.Log("worker got index %v", id)
|
resultCh := make(chan Result)
|
||||||
idx, err := repository.LoadIndexWithDecoder(ctx, c.repo, id, repository.DecodeIndex)
|
|
||||||
if errors.Cause(err) == repository.ErrOldIndexFormat {
|
|
||||||
debug.Log("index %v has old format", id)
|
|
||||||
hints = append(hints, ErrOldIndexFormat{id})
|
|
||||||
|
|
||||||
idx, err = repository.LoadIndexWithDecoder(ctx, c.repo, id, repository.DecodeOldIndex)
|
// send list of index files through ch, which is closed afterwards
|
||||||
|
wg.Go(func() error {
|
||||||
|
defer close(ch)
|
||||||
|
return c.repo.List(ctx, restic.IndexFile, func(id restic.ID, size int64) error {
|
||||||
|
select {
|
||||||
|
case <-ctx.Done():
|
||||||
|
return nil
|
||||||
|
case ch <- FileInfo{id, size}:
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
})
|
||||||
|
})
|
||||||
|
|
||||||
|
// a worker receives an index ID from ch, loads the index, and sends it to indexCh
|
||||||
|
worker := func() error {
|
||||||
|
var buf []byte
|
||||||
|
for fi := range ch {
|
||||||
|
debug.Log("worker got file %v", fi.ID.Str())
|
||||||
|
var err error
|
||||||
|
var idx *repository.Index
|
||||||
|
idx, buf, err = repository.LoadIndexWithDecoder(ctx, c.repo, buf[:0], fi.ID, repository.DecodeIndex)
|
||||||
|
if errors.Cause(err) == repository.ErrOldIndexFormat {
|
||||||
|
debug.Log("index %v has old format", fi.ID.Str())
|
||||||
|
hints = append(hints, ErrOldIndexFormat{fi.ID})
|
||||||
|
|
||||||
|
idx, buf, err = repository.LoadIndexWithDecoder(ctx, c.repo, buf[:0], fi.ID, repository.DecodeOldIndex)
|
||||||
|
}
|
||||||
|
|
||||||
|
err = errors.Wrapf(err, "error loading index %v", fi.ID.Str())
|
||||||
|
|
||||||
|
select {
|
||||||
|
case resultCh <- Result{idx, fi.ID, err}:
|
||||||
|
case <-ctx.Done():
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
err = errors.Wrapf(err, "error loading index %v", id.Str())
|
|
||||||
|
|
||||||
select {
|
|
||||||
case indexCh <- indexRes{Index: idx, ID: id.String(), err: err}:
|
|
||||||
case <-ctx.Done():
|
|
||||||
}
|
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
go func() {
|
// final closes indexCh after all workers have terminated
|
||||||
defer close(indexCh)
|
final := func() error {
|
||||||
debug.Log("start loading indexes in parallel")
|
close(resultCh)
|
||||||
err := repository.FilesInParallel(ctx, c.repo.Backend(), restic.IndexFile, defaultParallelism,
|
return nil
|
||||||
repository.ParallelWorkFuncParseID(worker))
|
}
|
||||||
debug.Log("loading indexes finished, error: %v", err)
|
|
||||||
if err != nil {
|
|
||||||
panic(err)
|
|
||||||
}
|
|
||||||
}()
|
|
||||||
|
|
||||||
done := make(chan struct{})
|
// run workers on ch
|
||||||
defer close(done)
|
wg.Go(func() error {
|
||||||
|
return repository.RunWorkers(ctx, defaultParallelism, worker, final)
|
||||||
|
})
|
||||||
|
|
||||||
|
// receive decoded indexes
|
||||||
packToIndex := make(map[restic.ID]restic.IDSet)
|
packToIndex := make(map[restic.ID]restic.IDSet)
|
||||||
|
wg.Go(func() error {
|
||||||
|
for res := range resultCh {
|
||||||
|
debug.Log("process index %v, err %v", res.ID, res.Err)
|
||||||
|
|
||||||
for res := range indexCh {
|
if res.Err != nil {
|
||||||
debug.Log("process index %v, err %v", res.ID, res.err)
|
errs = append(errs, res.Err)
|
||||||
|
continue
|
||||||
if res.err != nil {
|
|
||||||
errs = append(errs, res.err)
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
|
|
||||||
idxID, err := restic.ParseID(res.ID)
|
|
||||||
if err != nil {
|
|
||||||
errs = append(errs, errors.Errorf("unable to parse as index ID: %v", res.ID))
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
|
|
||||||
c.indexes[idxID] = res.Index
|
|
||||||
c.masterIndex.Insert(res.Index)
|
|
||||||
|
|
||||||
debug.Log("process blobs")
|
|
||||||
cnt := 0
|
|
||||||
for blob := range res.Index.Each(ctx) {
|
|
||||||
c.packs.Insert(blob.PackID)
|
|
||||||
c.blobs.Insert(blob.ID)
|
|
||||||
c.blobRefs.M[blob.ID] = 0
|
|
||||||
cnt++
|
|
||||||
|
|
||||||
if _, ok := packToIndex[blob.PackID]; !ok {
|
|
||||||
packToIndex[blob.PackID] = restic.NewIDSet()
|
|
||||||
}
|
}
|
||||||
packToIndex[blob.PackID].Insert(idxID)
|
|
||||||
}
|
|
||||||
|
|
||||||
debug.Log("%d blobs processed", cnt)
|
c.indexes[res.ID] = res.Index
|
||||||
|
c.masterIndex.Insert(res.Index)
|
||||||
|
|
||||||
|
debug.Log("process blobs")
|
||||||
|
cnt := 0
|
||||||
|
for blob := range res.Index.Each(ctx) {
|
||||||
|
c.packs.Insert(blob.PackID)
|
||||||
|
c.blobs.Insert(blob.ID)
|
||||||
|
c.blobRefs.M[blob.ID] = 0
|
||||||
|
cnt++
|
||||||
|
|
||||||
|
if _, ok := packToIndex[blob.PackID]; !ok {
|
||||||
|
packToIndex[blob.PackID] = restic.NewIDSet()
|
||||||
|
}
|
||||||
|
packToIndex[blob.PackID].Insert(res.ID)
|
||||||
|
}
|
||||||
|
|
||||||
|
debug.Log("%d blobs processed", cnt)
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
})
|
||||||
|
|
||||||
|
err := wg.Wait()
|
||||||
|
if err != nil {
|
||||||
|
errs = append(errs, err)
|
||||||
}
|
}
|
||||||
|
|
||||||
debug.Log("checking for duplicate packs")
|
debug.Log("checking for duplicate packs")
|
||||||
|
@ -163,7 +191,7 @@ func (c *Checker) LoadIndex(ctx context.Context) (hints []error, errs []error) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
err := c.repo.SetIndex(c.masterIndex)
|
err = c.repo.SetIndex(c.masterIndex)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
debug.Log("SetIndex returned error: %v", err)
|
debug.Log("SetIndex returned error: %v", err)
|
||||||
errs = append(errs, err)
|
errs = append(errs, err)
|
||||||
|
@ -281,31 +309,52 @@ func loadSnapshotTreeIDs(ctx context.Context, repo restic.Repository) (restic.ID
|
||||||
sync.Mutex
|
sync.Mutex
|
||||||
}
|
}
|
||||||
|
|
||||||
snapshotWorker := func(ctx context.Context, strID string) error {
|
// track spawned goroutines using wg, create a new context which is
|
||||||
id, err := restic.ParseID(strID)
|
// cancelled as soon as an error occurs.
|
||||||
if err != nil {
|
wg, ctx := errgroup.WithContext(ctx)
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
debug.Log("load snapshot %v", id)
|
ch := make(chan restic.ID)
|
||||||
|
|
||||||
treeID, err := loadTreeFromSnapshot(ctx, repo, id)
|
// send list of index files through ch, which is closed afterwards
|
||||||
if err != nil {
|
wg.Go(func() error {
|
||||||
errs.Lock()
|
defer close(ch)
|
||||||
errs.errs = append(errs.errs, err)
|
return repo.List(ctx, restic.SnapshotFile, func(id restic.ID, size int64) error {
|
||||||
errs.Unlock()
|
select {
|
||||||
|
case <-ctx.Done():
|
||||||
|
return nil
|
||||||
|
case ch <- id:
|
||||||
|
}
|
||||||
return nil
|
return nil
|
||||||
|
})
|
||||||
|
})
|
||||||
|
|
||||||
|
// a worker receives an index ID from ch, loads the snapshot and the tree,
|
||||||
|
// and adds the result to errs and trees.
|
||||||
|
worker := func() error {
|
||||||
|
for id := range ch {
|
||||||
|
debug.Log("load snapshot %v", id)
|
||||||
|
|
||||||
|
treeID, err := loadTreeFromSnapshot(ctx, repo, id)
|
||||||
|
if err != nil {
|
||||||
|
errs.Lock()
|
||||||
|
errs.errs = append(errs.errs, err)
|
||||||
|
errs.Unlock()
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
debug.Log("snapshot %v has tree %v", id, treeID)
|
||||||
|
trees.Lock()
|
||||||
|
trees.IDs = append(trees.IDs, treeID)
|
||||||
|
trees.Unlock()
|
||||||
}
|
}
|
||||||
|
|
||||||
debug.Log("snapshot %v has tree %v", id, treeID)
|
|
||||||
trees.Lock()
|
|
||||||
trees.IDs = append(trees.IDs, treeID)
|
|
||||||
trees.Unlock()
|
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
err := repository.FilesInParallel(ctx, repo.Backend(), restic.SnapshotFile, defaultParallelism, snapshotWorker)
|
for i := 0; i < defaultParallelism; i++ {
|
||||||
|
wg.Go(worker)
|
||||||
|
}
|
||||||
|
|
||||||
|
err := wg.Wait()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
errs.errs = append(errs.errs, err)
|
errs.errs = append(errs.errs, err)
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,141 +0,0 @@
|
||||||
package mock
|
|
||||||
|
|
||||||
import (
|
|
||||||
"github.com/restic/restic/internal/crypto"
|
|
||||||
"github.com/restic/restic/internal/restic"
|
|
||||||
)
|
|
||||||
|
|
||||||
// Repository implements a mock Repository.
|
|
||||||
type Repository struct {
|
|
||||||
BackendFn func() restic.Backend
|
|
||||||
|
|
||||||
KeyFn func() *crypto.Key
|
|
||||||
|
|
||||||
SetIndexFn func(restic.Index) error
|
|
||||||
|
|
||||||
IndexFn func() restic.Index
|
|
||||||
SaveFullIndexFn func() error
|
|
||||||
SaveIndexFn func() error
|
|
||||||
LoadIndexFn func() error
|
|
||||||
|
|
||||||
ConfigFn func() restic.Config
|
|
||||||
|
|
||||||
LookupBlobSizeFn func(restic.ID, restic.BlobType) (uint, error)
|
|
||||||
|
|
||||||
ListFn func(restic.FileType, <-chan struct{}) <-chan restic.ID
|
|
||||||
ListPackFn func(restic.ID) ([]restic.Blob, int64, error)
|
|
||||||
|
|
||||||
FlushFn func() error
|
|
||||||
|
|
||||||
SaveUnpackedFn func(restic.FileType, []byte) (restic.ID, error)
|
|
||||||
SaveJSONUnpackedFn func(restic.FileType, interface{}) (restic.ID, error)
|
|
||||||
|
|
||||||
LoadJSONUnpackedFn func(restic.FileType, restic.ID, interface{}) error
|
|
||||||
LoadAndDecryptFn func(restic.FileType, restic.ID) ([]byte, error)
|
|
||||||
|
|
||||||
LoadBlobFn func(restic.BlobType, restic.ID, []byte) (int, error)
|
|
||||||
SaveBlobFn func(restic.BlobType, []byte, restic.ID) (restic.ID, error)
|
|
||||||
|
|
||||||
LoadTreeFn func(restic.ID) (*restic.Tree, error)
|
|
||||||
SaveTreeFn func(t *restic.Tree) (restic.ID, error)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Backend is a stub method.
|
|
||||||
func (repo Repository) Backend() restic.Backend {
|
|
||||||
return repo.BackendFn()
|
|
||||||
}
|
|
||||||
|
|
||||||
// Key is a stub method.
|
|
||||||
func (repo Repository) Key() *crypto.Key {
|
|
||||||
return repo.KeyFn()
|
|
||||||
}
|
|
||||||
|
|
||||||
// SetIndex is a stub method.
|
|
||||||
func (repo Repository) SetIndex(idx restic.Index) error {
|
|
||||||
return repo.SetIndexFn(idx)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Index is a stub method.
|
|
||||||
func (repo Repository) Index() restic.Index {
|
|
||||||
return repo.IndexFn()
|
|
||||||
}
|
|
||||||
|
|
||||||
// SaveFullIndex is a stub method.
|
|
||||||
func (repo Repository) SaveFullIndex() error {
|
|
||||||
return repo.SaveFullIndexFn()
|
|
||||||
}
|
|
||||||
|
|
||||||
// SaveIndex is a stub method.
|
|
||||||
func (repo Repository) SaveIndex() error {
|
|
||||||
return repo.SaveIndexFn()
|
|
||||||
}
|
|
||||||
|
|
||||||
// LoadIndex is a stub method.
|
|
||||||
func (repo Repository) LoadIndex() error {
|
|
||||||
return repo.LoadIndexFn()
|
|
||||||
}
|
|
||||||
|
|
||||||
// Config is a stub method.
|
|
||||||
func (repo Repository) Config() restic.Config {
|
|
||||||
return repo.ConfigFn()
|
|
||||||
}
|
|
||||||
|
|
||||||
// LookupBlobSize is a stub method.
|
|
||||||
func (repo Repository) LookupBlobSize(id restic.ID, t restic.BlobType) (uint, error) {
|
|
||||||
return repo.LookupBlobSizeFn(id, t)
|
|
||||||
}
|
|
||||||
|
|
||||||
// List is a stub method.
|
|
||||||
func (repo Repository) List(t restic.FileType, done <-chan struct{}) <-chan restic.ID {
|
|
||||||
return repo.ListFn(t, done)
|
|
||||||
}
|
|
||||||
|
|
||||||
// ListPack is a stub method.
|
|
||||||
func (repo Repository) ListPack(id restic.ID) ([]restic.Blob, int64, error) {
|
|
||||||
return repo.ListPackFn(id)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Flush is a stub method.
|
|
||||||
func (repo Repository) Flush() error {
|
|
||||||
return repo.FlushFn()
|
|
||||||
}
|
|
||||||
|
|
||||||
// SaveUnpacked is a stub method.
|
|
||||||
func (repo Repository) SaveUnpacked(t restic.FileType, buf []byte) (restic.ID, error) {
|
|
||||||
return repo.SaveUnpackedFn(t, buf)
|
|
||||||
}
|
|
||||||
|
|
||||||
// SaveJSONUnpacked is a stub method.
|
|
||||||
func (repo Repository) SaveJSONUnpacked(t restic.FileType, item interface{}) (restic.ID, error) {
|
|
||||||
return repo.SaveJSONUnpackedFn(t, item)
|
|
||||||
}
|
|
||||||
|
|
||||||
// LoadJSONUnpacked is a stub method.
|
|
||||||
func (repo Repository) LoadJSONUnpacked(t restic.FileType, id restic.ID, item interface{}) error {
|
|
||||||
return repo.LoadJSONUnpackedFn(t, id, item)
|
|
||||||
}
|
|
||||||
|
|
||||||
// LoadAndDecrypt is a stub method.
|
|
||||||
func (repo Repository) LoadAndDecrypt(t restic.FileType, id restic.ID) ([]byte, error) {
|
|
||||||
return repo.LoadAndDecryptFn(t, id)
|
|
||||||
}
|
|
||||||
|
|
||||||
// LoadBlob is a stub method.
|
|
||||||
func (repo Repository) LoadBlob(t restic.BlobType, id restic.ID, buf []byte) (int, error) {
|
|
||||||
return repo.LoadBlobFn(t, id, buf)
|
|
||||||
}
|
|
||||||
|
|
||||||
// SaveBlob is a stub method.
|
|
||||||
func (repo Repository) SaveBlob(t restic.BlobType, buf []byte, id restic.ID) (restic.ID, error) {
|
|
||||||
return repo.SaveBlobFn(t, buf, id)
|
|
||||||
}
|
|
||||||
|
|
||||||
// LoadTree is a stub method.
|
|
||||||
func (repo Repository) LoadTree(id restic.ID) (*restic.Tree, error) {
|
|
||||||
return repo.LoadTreeFn(id)
|
|
||||||
}
|
|
||||||
|
|
||||||
// SaveTree is a stub method.
|
|
||||||
func (repo Repository) SaveTree(t *restic.Tree) (restic.ID, error) {
|
|
||||||
return repo.SaveTreeFn(t)
|
|
||||||
}
|
|
|
@ -549,21 +549,21 @@ func DecodeOldIndex(buf []byte) (idx *Index, err error) {
|
||||||
}
|
}
|
||||||
|
|
||||||
// LoadIndexWithDecoder loads the index and decodes it with fn.
|
// LoadIndexWithDecoder loads the index and decodes it with fn.
|
||||||
func LoadIndexWithDecoder(ctx context.Context, repo restic.Repository, id restic.ID, fn func([]byte) (*Index, error)) (idx *Index, err error) {
|
func LoadIndexWithDecoder(ctx context.Context, repo restic.Repository, buf []byte, id restic.ID, fn func([]byte) (*Index, error)) (*Index, []byte, error) {
|
||||||
debug.Log("Loading index %v", id)
|
debug.Log("Loading index %v", id)
|
||||||
|
|
||||||
buf, err := repo.LoadAndDecrypt(ctx, restic.IndexFile, id)
|
buf, err := repo.LoadAndDecrypt(ctx, buf[:0], restic.IndexFile, id)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, buf[:0], err
|
||||||
}
|
}
|
||||||
|
|
||||||
idx, err = fn(buf)
|
idx, err := fn(buf)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
debug.Log("error while decoding index %v: %v", id, err)
|
debug.Log("error while decoding index %v: %v", id, err)
|
||||||
return nil, err
|
return nil, buf[:0], err
|
||||||
}
|
}
|
||||||
|
|
||||||
idx.id = id
|
idx.id = id
|
||||||
|
|
||||||
return idx, nil
|
return idx, buf, nil
|
||||||
}
|
}
|
||||||
|
|
|
@ -184,7 +184,7 @@ func SearchKey(ctx context.Context, s *Repository, password string, maxKeys int,
|
||||||
// LoadKey loads a key from the backend.
|
// LoadKey loads a key from the backend.
|
||||||
func LoadKey(ctx context.Context, s *Repository, name string) (k *Key, err error) {
|
func LoadKey(ctx context.Context, s *Repository, name string) (k *Key, err error) {
|
||||||
h := restic.Handle{Type: restic.KeyFile, Name: name}
|
h := restic.Handle{Type: restic.KeyFile, Name: name}
|
||||||
data, err := backend.LoadAll(ctx, s.be, h)
|
data, err := backend.LoadAll(ctx, nil, s.be, h)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,65 +0,0 @@
|
||||||
package repository
|
|
||||||
|
|
||||||
import (
|
|
||||||
"context"
|
|
||||||
|
|
||||||
"github.com/restic/restic/internal/debug"
|
|
||||||
"github.com/restic/restic/internal/restic"
|
|
||||||
"golang.org/x/sync/errgroup"
|
|
||||||
)
|
|
||||||
|
|
||||||
// ParallelWorkFunc gets one file ID to work on. If an error is returned,
|
|
||||||
// processing stops. When the contect is cancelled the function should return.
|
|
||||||
type ParallelWorkFunc func(ctx context.Context, id string) error
|
|
||||||
|
|
||||||
// ParallelIDWorkFunc gets one restic.ID to work on. If an error is returned,
|
|
||||||
// processing stops. When the context is cancelled the function should return.
|
|
||||||
type ParallelIDWorkFunc func(ctx context.Context, id restic.ID) error
|
|
||||||
|
|
||||||
// FilesInParallel runs n workers of f in parallel, on the IDs that
|
|
||||||
// repo.List(t) yields. If f returns an error, the process is aborted and the
|
|
||||||
// first error is returned.
|
|
||||||
func FilesInParallel(ctx context.Context, repo restic.Lister, t restic.FileType, n int, f ParallelWorkFunc) error {
|
|
||||||
g, ctx := errgroup.WithContext(ctx)
|
|
||||||
|
|
||||||
ch := make(chan string, n)
|
|
||||||
g.Go(func() error {
|
|
||||||
defer close(ch)
|
|
||||||
return repo.List(ctx, t, func(fi restic.FileInfo) error {
|
|
||||||
select {
|
|
||||||
case <-ctx.Done():
|
|
||||||
case ch <- fi.Name:
|
|
||||||
}
|
|
||||||
return nil
|
|
||||||
})
|
|
||||||
})
|
|
||||||
|
|
||||||
for i := 0; i < n; i++ {
|
|
||||||
g.Go(func() error {
|
|
||||||
for name := range ch {
|
|
||||||
err := f(ctx, name)
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return nil
|
|
||||||
})
|
|
||||||
}
|
|
||||||
|
|
||||||
return g.Wait()
|
|
||||||
}
|
|
||||||
|
|
||||||
// ParallelWorkFuncParseID converts a function that takes a restic.ID to a
|
|
||||||
// function that takes a string. Filenames that do not parse as a restic.ID
|
|
||||||
// are ignored.
|
|
||||||
func ParallelWorkFuncParseID(f ParallelIDWorkFunc) ParallelWorkFunc {
|
|
||||||
return func(ctx context.Context, s string) error {
|
|
||||||
id, err := restic.ParseID(s)
|
|
||||||
if err != nil {
|
|
||||||
debug.Log("invalid ID %q: %v", id, err)
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
return f(ctx, id)
|
|
||||||
}
|
|
||||||
}
|
|
|
@ -1,129 +0,0 @@
|
||||||
package repository_test
|
|
||||||
|
|
||||||
import (
|
|
||||||
"context"
|
|
||||||
"math/rand"
|
|
||||||
"testing"
|
|
||||||
"time"
|
|
||||||
|
|
||||||
"github.com/restic/restic/internal/errors"
|
|
||||||
"github.com/restic/restic/internal/restic"
|
|
||||||
|
|
||||||
"github.com/restic/restic/internal/repository"
|
|
||||||
rtest "github.com/restic/restic/internal/test"
|
|
||||||
)
|
|
||||||
|
|
||||||
type testIDs []string
|
|
||||||
|
|
||||||
var lister = testIDs{
|
|
||||||
"40bb581cd36de952985c97a3ff6b21df41ee897d4db2040354caa36a17ff5268",
|
|
||||||
"2e15811a4d14ffac66d36a9ff456019d8de4c10c949d45b643f8477d17e92ff3",
|
|
||||||
"70c11b3ed521ad6b76d905c002ca98b361fca06aca060a063432c7311155a4da",
|
|
||||||
"8056a33e75dccdda701b6c989c7ed0cb71bbb6da13c6427fe5986f0896cc91c0",
|
|
||||||
"79d8776200596aa0237b10d470f7b850b86f8a1a80988ef5c8bee2874ce992e2",
|
|
||||||
"f9f1f29791c6b79b90b35efd083f17a3b163bbbafb1a2fdf43d46d56cffda289",
|
|
||||||
"3834178d05d0f6dd07f872ee0262ff1ace0f0f375768227d3c902b0b66591369",
|
|
||||||
"66d5cc68c9186414806f366ae5493ce7f229212993750a4992be4030f6af28c5",
|
|
||||||
"ebca5af4f397944f68cd215e3dfa2b197a7ba0f7c17d65d9f7390d0a15cde296",
|
|
||||||
"d4511ce6ff732d106275a57e40745c599e987c0da44c42cddbef592aac102437",
|
|
||||||
"f366202f0bfeefaedd7b49e2f21a90d3cbddb97d257a74d788dd34e19a684dae",
|
|
||||||
"a5c17728ab2433cd50636dd5c6c7068c7a44f2999d09c46e8f528466da8a059d",
|
|
||||||
"bae0f9492b9b208233029b87692a1a55cbd7fbe1cf3f6d7bc693ac266a6d6f0e",
|
|
||||||
"9d500187913c7510d71d1902703d312c7aaa56f1e98351385b9535fdabae595e",
|
|
||||||
"ffbddd8a4c1e54d258bb3e16d3929b546b61af63cb560b3e3061a8bef5b24552",
|
|
||||||
"201bb3abf655e7ef71e79ed4fb1079b0502b5acb4d9fad5e72a0de690c50a386",
|
|
||||||
"08eb57bbd559758ea96e99f9b7688c30e7b3bcf0c4562ff4535e2d8edeffaeed",
|
|
||||||
"e50b7223b04985ff38d9e11d1cba333896ef4264f82bd5d0653a028bce70e542",
|
|
||||||
"65a9421cd59cc7b7a71dcd9076136621af607fb4701d2e5c2af23b6396cf2f37",
|
|
||||||
"995a655b3521c19b4d0c266222266d89c8fc62889597d61f45f336091e646d57",
|
|
||||||
"51ec6f0bce77ed97df2dd7ae849338c3a8155a057da927eedd66e3d61be769ad",
|
|
||||||
"7b3923a0c0666431efecdbf6cb171295ec1710b6595eebcba3b576b49d13e214",
|
|
||||||
"2cedcc3d14698bea7e4b0546f7d5d48951dd90add59e6f2d44b693fd8913717d",
|
|
||||||
"fd6770cbd54858fdbd3d7b4239b985e5599180064d93ca873f27e86e8407d011",
|
|
||||||
"9edc51d8e6e04d05c9757848c1bfbfdc8e86b6330982294632488922e59fdb1b",
|
|
||||||
"1a6c4fbb24ad724c968b2020417c3d057e6c89e49bdfb11d91006def65eab6a0",
|
|
||||||
"cb3b29808cd0adfa2dca1f3a04f98114fbccf4eb487cdd4022f49bd70eeb049b",
|
|
||||||
"f55edcb40c619e29a20e432f8aaddc83a649be2c2d1941ccdc474cd2af03d490",
|
|
||||||
"e8ccc1763a92de23566b95c3ad1414a098016ece69a885fc8a72782a7517d17c",
|
|
||||||
"0fe2e3db8c5a12ad7101a63a0fffee901be54319cfe146bead7aec851722f82d",
|
|
||||||
"36be45a6ae7c95ad97cee1b33023be324bce7a7b4b7036e24125679dd9ff5b44",
|
|
||||||
"1685ed1a57c37859fbef1f7efb7509f20b84ec17a765605de43104d2fa37884b",
|
|
||||||
"9d83629a6a004c505b100a0b5d0b246833b63aa067aa9b59e3abd6b74bc4d3a8",
|
|
||||||
"be49a66b60175c5e2ee273b42165f86ef11bb6518c1c79950bcd3f4c196c98bd",
|
|
||||||
"0fd89885d821761b4a890782908e75793028747d15ace3c6cbf0ad56582b4fa5",
|
|
||||||
"94a767519a4e352a88796604943841fea21429f3358b4d5d55596dbda7d15dce",
|
|
||||||
"8dd07994afe6e572ddc9698fb0d13a0d4c26a38b7992818a71a99d1e0ac2b034",
|
|
||||||
"f7380a6f795ed31fbeb2945c72c5fd1d45044e5ab152311e75e007fa530f5847",
|
|
||||||
"5ca1ce01458e484393d7e9c8af42b0ff37a73a2fee0f18e14cff0fb180e33014",
|
|
||||||
"8f44178be3fe0a2bd41f922576fb7a9b19d589754504be746f56c759df328fda",
|
|
||||||
"12d33847c2be711c989f37360dd7aa8537fd14972262a4530634a08fdf32a767",
|
|
||||||
"31e077f5080f78846a00093caff2b6b839519cc47516142eeba9c41d4072a605",
|
|
||||||
"14f01db8a0054e70222b76d2555d70114b4bf8a0f02084324af2df226f14a795",
|
|
||||||
"7f5dbbaf31b4551828e8e76cef408375db9fbcdcdb6b5949f2d1b0c4b8632132",
|
|
||||||
"42a5d9b9bb7e4a16f23ba916bcf87f38c1aa1f2de2ab79736f725850a8ff6a1b",
|
|
||||||
"e06f8f901ea708beba8712a11b6e2d0be7c4b018d0254204ef269bcdf5e8c6cc",
|
|
||||||
"d9ba75785bf45b0c4fd3b2365c968099242483f2f0d0c7c20306dac11fae96e9",
|
|
||||||
"428debbb280873907cef2ec099efe1566e42a59775d6ec74ded0c4048d5a6515",
|
|
||||||
"3b51049d4dae701098e55a69536fa31ad2be1adc17b631a695a40e8a294fe9c0",
|
|
||||||
"168f88aa4b105e9811f5f79439cc1a689be4eec77f3361d42f22fe8f7ddc74a9",
|
|
||||||
"0baa0ab2249b33d64449a899cb7bd8eae5231f0d4ff70f09830dc1faa2e4abee",
|
|
||||||
"0c3896d346b580306a49de29f3a78913a41e14b8461b124628c33a64636241f2",
|
|
||||||
"b18313f1651c15e100e7179aa3eb8ffa62c3581159eaf7f83156468d19781e42",
|
|
||||||
"996361f7d988e48267ccc7e930fed4637be35fe7562b8601dceb7a32313a14c8",
|
|
||||||
"dfb4e6268437d53048d22b811048cd045df15693fc6789affd002a0fc80a6e60",
|
|
||||||
"34dd044c228727f2226a0c9c06a3e5ceb5e30e31cb7854f8fa1cde846b395a58",
|
|
||||||
}
|
|
||||||
|
|
||||||
func (tests testIDs) List(ctx context.Context, t restic.FileType, fn func(restic.FileInfo) error) error {
|
|
||||||
for i := 0; i < 500; i++ {
|
|
||||||
for _, id := range tests {
|
|
||||||
if ctx.Err() != nil {
|
|
||||||
return ctx.Err()
|
|
||||||
}
|
|
||||||
|
|
||||||
fi := restic.FileInfo{
|
|
||||||
Name: id,
|
|
||||||
}
|
|
||||||
|
|
||||||
err := fn(fi)
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestFilesInParallel(t *testing.T) {
|
|
||||||
f := func(ctx context.Context, id string) error {
|
|
||||||
time.Sleep(1 * time.Millisecond)
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
for n := 1; n < 5; n++ {
|
|
||||||
err := repository.FilesInParallel(context.TODO(), lister, restic.DataFile, n*100, f)
|
|
||||||
rtest.OK(t, err)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
var errTest = errors.New("test error")
|
|
||||||
|
|
||||||
func TestFilesInParallelWithError(t *testing.T) {
|
|
||||||
f := func(ctx context.Context, id string) error {
|
|
||||||
time.Sleep(1 * time.Millisecond)
|
|
||||||
|
|
||||||
if rand.Float32() < 0.01 {
|
|
||||||
return errTest
|
|
||||||
}
|
|
||||||
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
for n := 1; n < 5; n++ {
|
|
||||||
err := repository.FilesInParallel(context.TODO(), lister, restic.DataFile, n*100, f)
|
|
||||||
if err != errTest {
|
|
||||||
t.Fatalf("wrong error returned, want %q, got %v", errTest, err)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
|
@ -9,7 +9,6 @@ import (
|
||||||
"io"
|
"io"
|
||||||
"os"
|
"os"
|
||||||
|
|
||||||
"github.com/restic/restic/internal/backend"
|
|
||||||
"github.com/restic/restic/internal/cache"
|
"github.com/restic/restic/internal/cache"
|
||||||
"github.com/restic/restic/internal/crypto"
|
"github.com/restic/restic/internal/crypto"
|
||||||
"github.com/restic/restic/internal/debug"
|
"github.com/restic/restic/internal/debug"
|
||||||
|
@ -18,6 +17,7 @@ import (
|
||||||
"github.com/restic/restic/internal/hashing"
|
"github.com/restic/restic/internal/hashing"
|
||||||
"github.com/restic/restic/internal/pack"
|
"github.com/restic/restic/internal/pack"
|
||||||
"github.com/restic/restic/internal/restic"
|
"github.com/restic/restic/internal/restic"
|
||||||
|
"golang.org/x/sync/errgroup"
|
||||||
)
|
)
|
||||||
|
|
||||||
// Repository is used to access a repository in a backend.
|
// Repository is used to access a repository in a backend.
|
||||||
|
@ -66,15 +66,29 @@ func (r *Repository) PrefixLength(t restic.FileType) (int, error) {
|
||||||
return restic.PrefixLength(r.be, t)
|
return restic.PrefixLength(r.be, t)
|
||||||
}
|
}
|
||||||
|
|
||||||
// LoadAndDecrypt loads and decrypts data identified by t and id from the
|
// LoadAndDecrypt loads and decrypts the file with the given type and ID, using
|
||||||
// backend.
|
// the supplied buffer (which must be empty). If the buffer is nil, a new
|
||||||
func (r *Repository) LoadAndDecrypt(ctx context.Context, t restic.FileType, id restic.ID) (buf []byte, err error) {
|
// buffer will be allocated and returned.
|
||||||
|
func (r *Repository) LoadAndDecrypt(ctx context.Context, buf []byte, t restic.FileType, id restic.ID) ([]byte, error) {
|
||||||
|
if len(buf) != 0 {
|
||||||
|
panic("buf is not empty")
|
||||||
|
}
|
||||||
|
|
||||||
debug.Log("load %v with id %v", t, id)
|
debug.Log("load %v with id %v", t, id)
|
||||||
|
|
||||||
h := restic.Handle{Type: t, Name: id.String()}
|
h := restic.Handle{Type: t, Name: id.String()}
|
||||||
buf, err = backend.LoadAll(ctx, r.be, h)
|
err := r.be.Load(ctx, h, 0, 0, func(rd io.Reader) error {
|
||||||
|
// make sure this call is idempotent, in case an error occurs
|
||||||
|
wr := bytes.NewBuffer(buf[:0])
|
||||||
|
_, cerr := io.Copy(wr, rd)
|
||||||
|
if cerr != nil {
|
||||||
|
return cerr
|
||||||
|
}
|
||||||
|
buf = wr.Bytes()
|
||||||
|
return nil
|
||||||
|
})
|
||||||
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
debug.Log("error loading %v: %v", h, err)
|
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -187,7 +201,7 @@ func (r *Repository) loadBlob(ctx context.Context, id restic.ID, t restic.BlobTy
|
||||||
// LoadJSONUnpacked decrypts the data and afterwards calls json.Unmarshal on
|
// LoadJSONUnpacked decrypts the data and afterwards calls json.Unmarshal on
|
||||||
// the item.
|
// the item.
|
||||||
func (r *Repository) LoadJSONUnpacked(ctx context.Context, t restic.FileType, id restic.ID, item interface{}) (err error) {
|
func (r *Repository) LoadJSONUnpacked(ctx context.Context, t restic.FileType, id restic.ID, item interface{}) (err error) {
|
||||||
buf, err := r.LoadAndDecrypt(ctx, t, id)
|
buf, err := r.LoadAndDecrypt(ctx, nil, t, id)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
@ -391,45 +405,86 @@ const loadIndexParallelism = 4
|
||||||
func (r *Repository) LoadIndex(ctx context.Context) error {
|
func (r *Repository) LoadIndex(ctx context.Context) error {
|
||||||
debug.Log("Loading index")
|
debug.Log("Loading index")
|
||||||
|
|
||||||
errCh := make(chan error, 1)
|
// track spawned goroutines using wg, create a new context which is
|
||||||
indexes := make(chan *Index)
|
// cancelled as soon as an error occurs.
|
||||||
|
wg, ctx := errgroup.WithContext(ctx)
|
||||||
|
|
||||||
worker := func(ctx context.Context, id restic.ID) error {
|
type FileInfo struct {
|
||||||
idx, err := LoadIndex(ctx, r, id)
|
restic.ID
|
||||||
if err != nil {
|
Size int64
|
||||||
fmt.Fprintf(os.Stderr, "%v, ignoring\n", err)
|
}
|
||||||
|
ch := make(chan FileInfo)
|
||||||
|
indexCh := make(chan *Index)
|
||||||
|
|
||||||
|
// send list of index files through ch, which is closed afterwards
|
||||||
|
wg.Go(func() error {
|
||||||
|
defer close(ch)
|
||||||
|
return r.List(ctx, restic.IndexFile, func(id restic.ID, size int64) error {
|
||||||
|
select {
|
||||||
|
case <-ctx.Done():
|
||||||
|
return nil
|
||||||
|
case ch <- FileInfo{id, size}:
|
||||||
|
}
|
||||||
return nil
|
return nil
|
||||||
}
|
})
|
||||||
|
})
|
||||||
|
|
||||||
select {
|
// a worker receives an index ID from ch, loads the index, and sends it to indexCh
|
||||||
case indexes <- idx:
|
worker := func() error {
|
||||||
case <-ctx.Done():
|
var buf []byte
|
||||||
|
for fi := range ch {
|
||||||
|
var err error
|
||||||
|
var idx *Index
|
||||||
|
idx, buf, err = LoadIndexWithDecoder(ctx, r, buf[:0], fi.ID, DecodeIndex)
|
||||||
|
if err != nil && errors.Cause(err) == ErrOldIndexFormat {
|
||||||
|
idx, buf, err = LoadIndexWithDecoder(ctx, r, buf[:0], fi.ID, DecodeOldIndex)
|
||||||
|
}
|
||||||
|
|
||||||
|
select {
|
||||||
|
case indexCh <- idx:
|
||||||
|
case <-ctx.Done():
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
go func() {
|
// final closes indexCh after all workers have terminated
|
||||||
defer close(indexes)
|
final := func() error {
|
||||||
errCh <- FilesInParallel(ctx, r.be, restic.IndexFile, loadIndexParallelism,
|
close(indexCh)
|
||||||
ParallelWorkFuncParseID(worker))
|
return nil
|
||||||
}()
|
|
||||||
|
|
||||||
validIndex := restic.NewIDSet()
|
|
||||||
for idx := range indexes {
|
|
||||||
id, err := idx.ID()
|
|
||||||
if err == nil {
|
|
||||||
validIndex.Insert(id)
|
|
||||||
}
|
|
||||||
r.idx.Insert(idx)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
err := r.PrepareCache(validIndex)
|
// run workers on ch
|
||||||
|
wg.Go(func() error {
|
||||||
|
return RunWorkers(ctx, loadIndexParallelism, worker, final)
|
||||||
|
})
|
||||||
|
|
||||||
|
// receive decoded indexes
|
||||||
|
validIndex := restic.NewIDSet()
|
||||||
|
wg.Go(func() error {
|
||||||
|
for idx := range indexCh {
|
||||||
|
id, err := idx.ID()
|
||||||
|
if err == nil {
|
||||||
|
validIndex.Insert(id)
|
||||||
|
}
|
||||||
|
r.idx.Insert(idx)
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
})
|
||||||
|
|
||||||
|
err := wg.Wait()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
return <-errCh
|
// remove index files from the cache which have been removed in the repo
|
||||||
|
err = r.PrepareCache(validIndex)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// PrepareCache initializes the local cache. indexIDs is the list of IDs of
|
// PrepareCache initializes the local cache. indexIDs is the list of IDs of
|
||||||
|
@ -495,14 +550,15 @@ func (r *Repository) PrepareCache(indexIDs restic.IDSet) error {
|
||||||
|
|
||||||
// LoadIndex loads the index id from backend and returns it.
|
// LoadIndex loads the index id from backend and returns it.
|
||||||
func LoadIndex(ctx context.Context, repo restic.Repository, id restic.ID) (*Index, error) {
|
func LoadIndex(ctx context.Context, repo restic.Repository, id restic.ID) (*Index, error) {
|
||||||
idx, err := LoadIndexWithDecoder(ctx, repo, id, DecodeIndex)
|
idx, _, err := LoadIndexWithDecoder(ctx, repo, nil, id, DecodeIndex)
|
||||||
if err == nil {
|
if err == nil {
|
||||||
return idx, nil
|
return idx, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
if errors.Cause(err) == ErrOldIndexFormat {
|
if errors.Cause(err) == ErrOldIndexFormat {
|
||||||
fmt.Fprintf(os.Stderr, "index %v has old format\n", id.Str())
|
fmt.Fprintf(os.Stderr, "index %v has old format\n", id.Str())
|
||||||
return LoadIndexWithDecoder(ctx, repo, id, DecodeOldIndex)
|
idx, _, err := LoadIndexWithDecoder(ctx, repo, nil, id, DecodeOldIndex)
|
||||||
|
return idx, err
|
||||||
}
|
}
|
||||||
|
|
||||||
return nil, err
|
return nil, err
|
||||||
|
|
|
@ -244,7 +244,7 @@ func BenchmarkLoadAndDecrypt(b *testing.B) {
|
||||||
b.SetBytes(int64(length))
|
b.SetBytes(int64(length))
|
||||||
|
|
||||||
for i := 0; i < b.N; i++ {
|
for i := 0; i < b.N; i++ {
|
||||||
data, err := repo.LoadAndDecrypt(context.TODO(), restic.DataFile, storageID)
|
data, err := repo.LoadAndDecrypt(context.TODO(), nil, restic.DataFile, storageID)
|
||||||
rtest.OK(b, err)
|
rtest.OK(b, err)
|
||||||
if len(data) != length {
|
if len(data) != length {
|
||||||
b.Errorf("wanted %d bytes, got %d", length, len(data))
|
b.Errorf("wanted %d bytes, got %d", length, len(data))
|
||||||
|
|
35
internal/repository/worker_group.go
Normal file
35
internal/repository/worker_group.go
Normal file
|
@ -0,0 +1,35 @@
|
||||||
|
package repository
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
|
||||||
|
"golang.org/x/sync/errgroup"
|
||||||
|
)
|
||||||
|
|
||||||
|
// RunWorkers runs count instances of workerFunc using an errgroup.Group.
|
||||||
|
// After all workers have terminated, finalFunc is run. If an error occurs in
|
||||||
|
// one of the workers, it is returned. FinalFunc is always run, regardless of
|
||||||
|
// any other previous errors.
|
||||||
|
func RunWorkers(ctx context.Context, count int, workerFunc, finalFunc func() error) error {
|
||||||
|
wg, ctx := errgroup.WithContext(ctx)
|
||||||
|
|
||||||
|
// run workers
|
||||||
|
for i := 0; i < count; i++ {
|
||||||
|
wg.Go(workerFunc)
|
||||||
|
}
|
||||||
|
|
||||||
|
// wait for termination
|
||||||
|
err := wg.Wait()
|
||||||
|
|
||||||
|
// make sure finalFunc is run
|
||||||
|
finalErr := finalFunc()
|
||||||
|
|
||||||
|
// if the workers returned an error, return it to the caller (disregarding
|
||||||
|
// any error from finalFunc)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
// if not, return the value finalFunc returned
|
||||||
|
return finalErr
|
||||||
|
}
|
|
@ -5,6 +5,7 @@ import (
|
||||||
"crypto/sha256"
|
"crypto/sha256"
|
||||||
"encoding/hex"
|
"encoding/hex"
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
|
"fmt"
|
||||||
"io"
|
"io"
|
||||||
|
|
||||||
"github.com/restic/restic/internal/errors"
|
"github.com/restic/restic/internal/errors"
|
||||||
|
@ -101,13 +102,33 @@ func (id ID) MarshalJSON() ([]byte, error) {
|
||||||
|
|
||||||
// UnmarshalJSON parses the JSON-encoded data and stores the result in id.
|
// UnmarshalJSON parses the JSON-encoded data and stores the result in id.
|
||||||
func (id *ID) UnmarshalJSON(b []byte) error {
|
func (id *ID) UnmarshalJSON(b []byte) error {
|
||||||
var s string
|
// check string length
|
||||||
err := json.Unmarshal(b, &s)
|
if len(b) < 2 {
|
||||||
if err != nil {
|
return fmt.Errorf("invalid ID: %q", b)
|
||||||
return errors.Wrap(err, "Unmarshal")
|
|
||||||
}
|
}
|
||||||
|
|
||||||
_, err = hex.Decode(id[:], []byte(s))
|
if len(b)%2 != 0 {
|
||||||
|
return fmt.Errorf("invalid ID length: %q", b)
|
||||||
|
}
|
||||||
|
|
||||||
|
// check string delimiters
|
||||||
|
if b[0] != '"' && b[0] != '\'' {
|
||||||
|
return fmt.Errorf("invalid start of string: %q", b[0])
|
||||||
|
}
|
||||||
|
|
||||||
|
last := len(b) - 1
|
||||||
|
if b[0] != b[last] {
|
||||||
|
return fmt.Errorf("starting string delimiter (%q) does not match end (%q)", b[0], b[last])
|
||||||
|
}
|
||||||
|
|
||||||
|
// strip JSON string delimiters
|
||||||
|
b = b[1:last]
|
||||||
|
|
||||||
|
if len(b) != 2*len(id) {
|
||||||
|
return fmt.Errorf("invalid length for ID")
|
||||||
|
}
|
||||||
|
|
||||||
|
_, err := hex.Decode(id[:], b)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return errors.Wrap(err, "hex.Decode")
|
return errors.Wrap(err, "hex.Decode")
|
||||||
}
|
}
|
||||||
|
|
|
@ -51,10 +51,47 @@ func TestID(t *testing.T) {
|
||||||
var id3 ID
|
var id3 ID
|
||||||
err = id3.UnmarshalJSON(buf)
|
err = id3.UnmarshalJSON(buf)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatal(err)
|
t.Fatalf("error for %q: %v", buf, err)
|
||||||
}
|
}
|
||||||
if !reflect.DeepEqual(id, id3) {
|
if !reflect.DeepEqual(id, id3) {
|
||||||
t.Error("ids are not equal")
|
t.Error("ids are not equal")
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestIDUnmarshal(t *testing.T) {
|
||||||
|
var tests = []struct {
|
||||||
|
s string
|
||||||
|
valid bool
|
||||||
|
}{
|
||||||
|
{`"`, false},
|
||||||
|
{`""`, false},
|
||||||
|
{`'`, false},
|
||||||
|
{`"`, false},
|
||||||
|
{`"c3ab8ff13720e8ad9047dd39466b3c8974e592c2fa383d4a3960714caef0c4"`, false},
|
||||||
|
{`"c3ab8ff13720e8ad9047dd39466b3c8974e592c2fa383d4a3960714caef0c4f"`, false},
|
||||||
|
{`"c3ab8ff13720e8ad9047dd39466b3c8974e592c2fa383d4a3960714caef0c4f2"`, true},
|
||||||
|
}
|
||||||
|
|
||||||
|
wantID, err := ParseID("c3ab8ff13720e8ad9047dd39466b3c8974e592c2fa383d4a3960714caef0c4f2")
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, test := range tests {
|
||||||
|
t.Run("", func(t *testing.T) {
|
||||||
|
id := &ID{}
|
||||||
|
err := id.UnmarshalJSON([]byte(test.s))
|
||||||
|
if test.valid && err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
if !test.valid && err == nil {
|
||||||
|
t.Fatalf("want error for invalid value, got nil")
|
||||||
|
}
|
||||||
|
|
||||||
|
if test.valid && !id.Equal(wantID) {
|
||||||
|
t.Fatalf("wrong ID returned, want %s, got %s", wantID, id)
|
||||||
|
}
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
|
@ -39,8 +39,11 @@ type Repository interface {
|
||||||
SaveUnpacked(context.Context, FileType, []byte) (ID, error)
|
SaveUnpacked(context.Context, FileType, []byte) (ID, error)
|
||||||
SaveJSONUnpacked(context.Context, FileType, interface{}) (ID, error)
|
SaveJSONUnpacked(context.Context, FileType, interface{}) (ID, error)
|
||||||
|
|
||||||
LoadJSONUnpacked(context.Context, FileType, ID, interface{}) error
|
LoadJSONUnpacked(ctx context.Context, t FileType, id ID, dest interface{}) error
|
||||||
LoadAndDecrypt(context.Context, FileType, ID) ([]byte, error)
|
// LoadAndDecrypt loads and decrypts the file with the given type and ID,
|
||||||
|
// using the supplied buffer (which must be empty). If the buffer is nil, a
|
||||||
|
// new buffer will be allocated and returned.
|
||||||
|
LoadAndDecrypt(ctx context.Context, buf []byte, t FileType, id ID) (data []byte, err error)
|
||||||
|
|
||||||
LoadBlob(context.Context, BlobType, ID, []byte) (int, error)
|
LoadBlob(context.Context, BlobType, ID, []byte) (int, error)
|
||||||
SaveBlob(context.Context, BlobType, []byte, ID) (ID, error)
|
SaveBlob(context.Context, BlobType, []byte, ID) (ID, error)
|
||||||
|
|
Loading…
Reference in a new issue