Merge pull request #5074 from greatroar/dump

dump: Simplify writeNode and use fewer goroutines
Michael Eischer 2024-10-16 18:33:35 +00:00 committed by GitHub
commit e21496f217


@@ -6,7 +6,6 @@ import (
 	"path"
 
 	"github.com/restic/restic/internal/bloblru"
-	"github.com/restic/restic/internal/errors"
 	"github.com/restic/restic/internal/restic"
 	"github.com/restic/restic/internal/walker"
 	"golang.org/x/sync/errgroup"
@@ -104,75 +103,50 @@ func (d *Dumper) WriteNode(ctx context.Context, node *restic.Node) error {
 }
 
 func (d *Dumper) writeNode(ctx context.Context, w io.Writer, node *restic.Node) error {
-	type loadTask struct {
-		id  restic.ID
-		out chan<- []byte
-	}
-	type writeTask struct {
-		data <-chan []byte
-	}
-
-	loaderCh := make(chan loadTask)
-	// per worker: allows for one blob that gets download + one blob thats queue for writing
-	writerCh := make(chan writeTask, d.repo.Connections()*2)
-
 	wg, ctx := errgroup.WithContext(ctx)
+	limit := d.repo.Connections() - 1 // See below for the -1.
+	blobs := make(chan (<-chan []byte), limit)
 
 	wg.Go(func() error {
-		defer close(loaderCh)
-		defer close(writerCh)
-		for _, id := range node.Content {
-			// non-blocking blob handover to allow the loader to load the next blob
-			// while the old one is still written
-			ch := make(chan []byte, 1)
-			select {
-			case loaderCh <- loadTask{id: id, out: ch}:
-			case <-ctx.Done():
-				return ctx.Err()
-			}
-
-			select {
-			case writerCh <- writeTask{data: ch}:
-			case <-ctx.Done():
-				return ctx.Err()
-			}
-		}
-		return nil
-	})
-
-	for i := uint(0); i < d.repo.Connections(); i++ {
-		wg.Go(func() error {
-			for task := range loaderCh {
-				blob, err := d.cache.GetOrCompute(task.id, func() ([]byte, error) {
-					return d.repo.LoadBlob(ctx, restic.DataBlob, task.id, nil)
-				})
-				if err != nil {
-					return err
-				}
-
-				select {
-				case task.out <- blob:
-				case <-ctx.Done():
-					return ctx.Err()
-				}
-			}
-			return nil
-		})
-	}
-
-	wg.Go(func() error {
-		for result := range writerCh {
-			select {
-			case data := <-result.data:
-				if _, err := w.Write(data); err != nil {
-					return errors.Wrap(err, "Write")
-				}
-			case <-ctx.Done():
-				return ctx.Err()
-			}
-		}
-		return nil
-	})
+		for ch := range blobs {
+			select {
+			case <-ctx.Done():
+				return ctx.Err()
+			case blob := <-ch:
+				if _, err := w.Write(blob); err != nil {
+					return err
+				}
+			}
+		}
+		return nil
+	})
+
+	// Start short-lived goroutines to load blobs.
+	// There will be at most 1+cap(blobs) calling LoadBlob at any moment.
+loop:
+	for _, id := range node.Content {
+		// This needs to be buffered, so that loaders can quit
+		// without waiting for the writer.
+		ch := make(chan []byte, 1)
+
+		wg.Go(func() error {
+			blob, err := d.cache.GetOrCompute(id, func() ([]byte, error) {
+				return d.repo.LoadBlob(ctx, restic.DataBlob, id, nil)
+			})
+			if err == nil {
+				ch <- blob
+			}
+			return err
+		})
+
+		select {
+		case blobs <- ch:
+		case <-ctx.Done():
+			break loop
+		}
+	}
+	close(blobs)
 
 	return wg.Wait()
 }
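
The new code replaces the fixed pool of loader workers and the separate writer queue with a single "channel of result channels": the writer drains per-blob channels in the order they were submitted, and one short-lived goroutine per blob performs the load, with concurrency bounded by the buffer size. Below is a minimal, self-contained sketch of that pattern outside restic, assuming made-up stand-ins (fakeLoad for repo.LoadBlob, writeAll, and a hard-coded connection count); it is an illustration of the technique, not the project's code.

package main

import (
	"context"
	"fmt"
	"os"
	"time"

	"golang.org/x/sync/errgroup"
)

// fakeLoad stands in for a repository blob load: it returns data after some latency.
func fakeLoad(ctx context.Context, id int) ([]byte, error) {
	select {
	case <-time.After(10 * time.Millisecond):
		return []byte(fmt.Sprintf("blob-%d\n", id)), nil
	case <-ctx.Done():
		return nil, ctx.Err()
	}
}

func writeAll(ctx context.Context, ids []int, connections int) error {
	wg, ctx := errgroup.WithContext(ctx)

	// The writer drains result channels in submission order, so output order
	// is preserved even though individual loads may finish out of order.
	results := make(chan (<-chan []byte), connections-1)
	wg.Go(func() error {
		for ch := range results {
			select {
			case <-ctx.Done():
				return ctx.Err()
			case blob := <-ch:
				if _, err := os.Stdout.Write(blob); err != nil {
					return err
				}
			}
		}
		return nil
	})

loop:
	for _, id := range ids {
		id := id // capture per-iteration value (needed before Go 1.22)
		// Buffered so a loader can exit without waiting for the writer.
		ch := make(chan []byte, 1)
		wg.Go(func() error {
			blob, err := fakeLoad(ctx, id)
			if err == nil {
				ch <- blob
			}
			return err
		})
		// At most cap(results) loads are queued ahead of the writer, plus the
		// one whose channel is being enqueued here, bounding concurrency.
		select {
		case results <- ch:
		case <-ctx.Done():
			break loop
		}
	}
	close(results)

	return wg.Wait()
}

func main() {
	if err := writeAll(context.Background(), []int{1, 2, 3, 4, 5}, 4); err != nil {
		fmt.Fprintln(os.Stderr, err)
	}
}

Compared with the previous worker-pool version, this shape needs no dedicated dispatcher or writer queue: the buffered channel of channels is both the ordering mechanism and the concurrency limit, which is why the errors.Wrap import could be dropped along with the extra goroutines.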