Merge pull request #3526 from greatroar/dump-refactor
Refactor internal/dump + concurrent load/write
This commit is contained in:
commit
78c7dd53ef
7 changed files with 111 additions and 110 deletions
|
@ -67,42 +67,31 @@ func splitPath(p string) []string {
|
||||||
return append(s, f)
|
return append(s, f)
|
||||||
}
|
}
|
||||||
|
|
||||||
func printFromTree(ctx context.Context, tree *restic.Tree, repo restic.Repository, prefix string, pathComponents []string, writeDump dump.WriteDump) error {
|
func printFromTree(ctx context.Context, tree *restic.Tree, repo restic.Repository, prefix string, pathComponents []string, d *dump.Dumper) error {
|
||||||
if tree == nil {
|
|
||||||
return fmt.Errorf("called with a nil tree")
|
|
||||||
}
|
|
||||||
if repo == nil {
|
|
||||||
return fmt.Errorf("called with a nil repository")
|
|
||||||
}
|
|
||||||
l := len(pathComponents)
|
|
||||||
if l == 0 {
|
|
||||||
return fmt.Errorf("empty path components")
|
|
||||||
}
|
|
||||||
|
|
||||||
// If we print / we need to assume that there are multiple nodes at that
|
// If we print / we need to assume that there are multiple nodes at that
|
||||||
// level in the tree.
|
// level in the tree.
|
||||||
if pathComponents[0] == "" {
|
if pathComponents[0] == "" {
|
||||||
if err := checkStdoutArchive(); err != nil {
|
if err := checkStdoutArchive(); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
return writeDump(ctx, repo, tree, "/", os.Stdout)
|
return d.DumpTree(ctx, tree, "/")
|
||||||
}
|
}
|
||||||
|
|
||||||
item := filepath.Join(prefix, pathComponents[0])
|
item := filepath.Join(prefix, pathComponents[0])
|
||||||
|
l := len(pathComponents)
|
||||||
for _, node := range tree.Nodes {
|
for _, node := range tree.Nodes {
|
||||||
// If dumping something in the highest level it will just take the
|
// If dumping something in the highest level it will just take the
|
||||||
// first item it finds and dump that according to the switch case below.
|
// first item it finds and dump that according to the switch case below.
|
||||||
if node.Name == pathComponents[0] {
|
if node.Name == pathComponents[0] {
|
||||||
switch {
|
switch {
|
||||||
case l == 1 && dump.IsFile(node):
|
case l == 1 && dump.IsFile(node):
|
||||||
cache := dump.NewCache()
|
return d.WriteNode(ctx, node)
|
||||||
return dump.WriteNodeData(ctx, os.Stdout, repo, node, cache)
|
|
||||||
case l > 1 && dump.IsDir(node):
|
case l > 1 && dump.IsDir(node):
|
||||||
subtree, err := repo.LoadTree(ctx, *node.Subtree)
|
subtree, err := repo.LoadTree(ctx, *node.Subtree)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return errors.Wrapf(err, "cannot load subtree for %q", item)
|
return errors.Wrapf(err, "cannot load subtree for %q", item)
|
||||||
}
|
}
|
||||||
return printFromTree(ctx, subtree, repo, item, pathComponents[1:], writeDump)
|
return printFromTree(ctx, subtree, repo, item, pathComponents[1:], d)
|
||||||
case dump.IsDir(node):
|
case dump.IsDir(node):
|
||||||
if err := checkStdoutArchive(); err != nil {
|
if err := checkStdoutArchive(); err != nil {
|
||||||
return err
|
return err
|
||||||
|
@ -111,7 +100,7 @@ func printFromTree(ctx context.Context, tree *restic.Tree, repo restic.Repositor
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
return writeDump(ctx, repo, subtree, item, os.Stdout)
|
return d.DumpTree(ctx, subtree, item)
|
||||||
case l > 1:
|
case l > 1:
|
||||||
return fmt.Errorf("%q should be a dir, but is a %q", item, node.Type)
|
return fmt.Errorf("%q should be a dir, but is a %q", item, node.Type)
|
||||||
case !dump.IsFile(node):
|
case !dump.IsFile(node):
|
||||||
|
@ -129,12 +118,8 @@ func runDump(opts DumpOptions, gopts GlobalOptions, args []string) error {
|
||||||
return errors.Fatal("no file and no snapshot ID specified")
|
return errors.Fatal("no file and no snapshot ID specified")
|
||||||
}
|
}
|
||||||
|
|
||||||
var wd dump.WriteDump
|
|
||||||
switch opts.Archive {
|
switch opts.Archive {
|
||||||
case "tar":
|
case "tar", "zip":
|
||||||
wd = dump.WriteTar
|
|
||||||
case "zip":
|
|
||||||
wd = dump.WriteZip
|
|
||||||
default:
|
default:
|
||||||
return fmt.Errorf("unknown archive format %q", opts.Archive)
|
return fmt.Errorf("unknown archive format %q", opts.Archive)
|
||||||
}
|
}
|
||||||
|
@ -188,7 +173,8 @@ func runDump(opts DumpOptions, gopts GlobalOptions, args []string) error {
|
||||||
Exitf(2, "loading tree for snapshot %q failed: %v", snapshotIDString, err)
|
Exitf(2, "loading tree for snapshot %q failed: %v", snapshotIDString, err)
|
||||||
}
|
}
|
||||||
|
|
||||||
err = printFromTree(ctx, tree, repo, "/", splittedPath, wd)
|
d := dump.New(opts.Archive, repo, os.Stdout)
|
||||||
|
err = printFromTree(ctx, tree, repo, "/", splittedPath, d)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
Exitf(2, "cannot dump file: %v", err)
|
Exitf(2, "cannot dump file: %v", err)
|
||||||
}
|
}
|
||||||
|
|
|
@ -11,49 +11,66 @@ import (
|
||||||
"github.com/restic/restic/internal/walker"
|
"github.com/restic/restic/internal/walker"
|
||||||
)
|
)
|
||||||
|
|
||||||
// dumper implements saving node data.
|
// A Dumper writes trees and files from a repository to a Writer
|
||||||
type dumper interface {
|
// in an archive format.
|
||||||
io.Closer
|
type Dumper struct {
|
||||||
dumpNode(ctx context.Context, node *restic.Node, repo restic.Repository) error
|
cache *bloblru.Cache
|
||||||
|
format string
|
||||||
|
repo restic.Repository
|
||||||
|
w io.Writer
|
||||||
}
|
}
|
||||||
|
|
||||||
// WriteDump will write the contents of the given tree to the given destination.
|
func New(format string, repo restic.Repository, w io.Writer) *Dumper {
|
||||||
// It will loop over all nodes in the tree and dump them recursively.
|
return &Dumper{
|
||||||
type WriteDump func(ctx context.Context, repo restic.Repository, tree *restic.Tree, rootPath string, dst io.Writer) error
|
cache: bloblru.New(64 << 20),
|
||||||
|
format: format,
|
||||||
func NewCache() *bloblru.Cache {
|
repo: repo,
|
||||||
return bloblru.New(64 << 20)
|
w: w,
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func writeDump(ctx context.Context, repo restic.Repository, tree *restic.Tree, rootPath string, dmp dumper) error {
|
func (d *Dumper) DumpTree(ctx context.Context, tree *restic.Tree, rootPath string) error {
|
||||||
for _, rootNode := range tree.Nodes {
|
ctx, cancel := context.WithCancel(ctx)
|
||||||
rootNode.Path = rootPath
|
defer cancel()
|
||||||
err := dumpTree(ctx, repo, rootNode, rootPath, dmp)
|
|
||||||
if err != nil {
|
|
||||||
// ignore subsequent errors
|
|
||||||
_ = dmp.Close()
|
|
||||||
|
|
||||||
return err
|
// ch is buffered to deal with variable download/write speeds.
|
||||||
|
ch := make(chan *restic.Node, 10)
|
||||||
|
go sendTrees(ctx, d.repo, tree, rootPath, ch)
|
||||||
|
|
||||||
|
switch d.format {
|
||||||
|
case "tar":
|
||||||
|
return d.dumpTar(ctx, ch)
|
||||||
|
case "zip":
|
||||||
|
return d.dumpZip(ctx, ch)
|
||||||
|
default:
|
||||||
|
panic("unknown dump format")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func sendTrees(ctx context.Context, repo restic.Repository, tree *restic.Tree, rootPath string, ch chan *restic.Node) {
|
||||||
|
defer close(ch)
|
||||||
|
|
||||||
|
for _, root := range tree.Nodes {
|
||||||
|
root.Path = path.Join(rootPath, root.Name)
|
||||||
|
if sendNodes(ctx, repo, root, ch) != nil {
|
||||||
|
break
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
return dmp.Close()
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func dumpTree(ctx context.Context, repo restic.Repository, rootNode *restic.Node, rootPath string, dmp dumper) error {
|
func sendNodes(ctx context.Context, repo restic.Repository, root *restic.Node, ch chan *restic.Node) error {
|
||||||
rootNode.Path = path.Join(rootNode.Path, rootNode.Name)
|
select {
|
||||||
rootPath = rootNode.Path
|
case ch <- root:
|
||||||
|
case <-ctx.Done():
|
||||||
if err := dmp.dumpNode(ctx, rootNode, repo); err != nil {
|
return ctx.Err()
|
||||||
return err
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// If this is no directory we are finished
|
// If this is no directory we are finished
|
||||||
if !IsDir(rootNode) {
|
if !IsDir(root) {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
err := walker.Walk(ctx, repo, *rootNode.Subtree, nil, func(_ restic.ID, nodepath string, node *restic.Node, err error) (bool, error) {
|
err := walker.Walk(ctx, repo, *root.Subtree, nil, func(_ restic.ID, nodepath string, node *restic.Node, err error) (bool, error) {
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return false, err
|
return false, err
|
||||||
}
|
}
|
||||||
|
@ -61,13 +78,16 @@ func dumpTree(ctx context.Context, repo restic.Repository, rootNode *restic.Node
|
||||||
return false, nil
|
return false, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
node.Path = path.Join(rootPath, nodepath)
|
node.Path = path.Join(root.Path, nodepath)
|
||||||
|
|
||||||
if IsFile(node) || IsLink(node) || IsDir(node) {
|
if !IsFile(node) && !IsDir(node) && !IsLink(node) {
|
||||||
err := dmp.dumpNode(ctx, node, repo)
|
return false, nil
|
||||||
if err != nil {
|
|
||||||
return false, err
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
select {
|
||||||
|
case ch <- node:
|
||||||
|
case <-ctx.Done():
|
||||||
|
return false, ctx.Err()
|
||||||
}
|
}
|
||||||
|
|
||||||
return false, nil
|
return false, nil
|
||||||
|
@ -76,21 +96,26 @@ func dumpTree(ctx context.Context, repo restic.Repository, rootNode *restic.Node
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
// WriteNodeData writes the contents of the node to the given Writer.
|
// WriteNode writes a file node's contents directly to d's Writer,
|
||||||
func WriteNodeData(ctx context.Context, w io.Writer, repo restic.Repository, node *restic.Node, cache *bloblru.Cache) error {
|
// without caring about d's format.
|
||||||
|
func (d *Dumper) WriteNode(ctx context.Context, node *restic.Node) error {
|
||||||
|
return d.writeNode(ctx, d.w, node)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (d *Dumper) writeNode(ctx context.Context, w io.Writer, node *restic.Node) error {
|
||||||
var (
|
var (
|
||||||
buf []byte
|
buf []byte
|
||||||
err error
|
err error
|
||||||
)
|
)
|
||||||
for _, id := range node.Content {
|
for _, id := range node.Content {
|
||||||
blob, ok := cache.Get(id)
|
blob, ok := d.cache.Get(id)
|
||||||
if !ok {
|
if !ok {
|
||||||
blob, err = repo.LoadBlob(ctx, restic.DataBlob, id, buf)
|
blob, err = d.repo.LoadBlob(ctx, restic.DataBlob, id, buf)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
buf = cache.Add(id, blob) // Reuse evicted buffer.
|
buf = d.cache.Add(id, blob) // Reuse evicted buffer.
|
||||||
}
|
}
|
||||||
|
|
||||||
if _, err := w.Write(blob); err != nil {
|
if _, err := w.Write(blob); err != nil {
|
||||||
|
|
|
@ -28,7 +28,7 @@ func prepareTempdirRepoSrc(t testing.TB, src archiver.TestDir) (tempdir string,
|
||||||
|
|
||||||
type CheckDump func(t *testing.T, testDir string, testDump *bytes.Buffer) error
|
type CheckDump func(t *testing.T, testDir string, testDump *bytes.Buffer) error
|
||||||
|
|
||||||
func WriteTest(t *testing.T, wd WriteDump, cd CheckDump) {
|
func WriteTest(t *testing.T, format string, cd CheckDump) {
|
||||||
tests := []struct {
|
tests := []struct {
|
||||||
name string
|
name string
|
||||||
args archiver.TestDir
|
args archiver.TestDir
|
||||||
|
@ -92,8 +92,9 @@ func WriteTest(t *testing.T, wd WriteDump, cd CheckDump) {
|
||||||
rtest.OK(t, err)
|
rtest.OK(t, err)
|
||||||
|
|
||||||
dst := &bytes.Buffer{}
|
dst := &bytes.Buffer{}
|
||||||
if err := wd(ctx, repo, tree, tt.target, dst); err != nil {
|
d := New(format, repo, dst)
|
||||||
t.Fatalf("WriteDump() error = %v", err)
|
if err := d.DumpTree(ctx, tree, tt.target); err != nil {
|
||||||
|
t.Fatalf("Dumper.Run error = %v", err)
|
||||||
}
|
}
|
||||||
if err := cd(t, tmpdir, dst); err != nil {
|
if err := cd(t, tmpdir, dst); err != nil {
|
||||||
t.Errorf("WriteDump() = does not match: %v", err)
|
t.Errorf("WriteDump() = does not match: %v", err)
|
||||||
|
|
|
@ -3,35 +3,30 @@ package dump
|
||||||
import (
|
import (
|
||||||
"archive/tar"
|
"archive/tar"
|
||||||
"context"
|
"context"
|
||||||
"io"
|
|
||||||
"os"
|
"os"
|
||||||
"path/filepath"
|
"path/filepath"
|
||||||
"strings"
|
"strings"
|
||||||
|
|
||||||
"github.com/restic/restic/internal/bloblru"
|
|
||||||
"github.com/restic/restic/internal/errors"
|
"github.com/restic/restic/internal/errors"
|
||||||
"github.com/restic/restic/internal/restic"
|
"github.com/restic/restic/internal/restic"
|
||||||
)
|
)
|
||||||
|
|
||||||
type tarDumper struct {
|
func (d *Dumper) dumpTar(ctx context.Context, ch <-chan *restic.Node) (err error) {
|
||||||
cache *bloblru.Cache
|
w := tar.NewWriter(d.w)
|
||||||
w *tar.Writer
|
|
||||||
}
|
|
||||||
|
|
||||||
// Statically ensure that tarDumper implements dumper.
|
defer func() {
|
||||||
var _ dumper = &tarDumper{}
|
if err == nil {
|
||||||
|
err = w.Close()
|
||||||
// WriteTar will write the contents of the given tree, encoded as a tar to the given destination.
|
err = errors.Wrap(err, "Close")
|
||||||
func WriteTar(ctx context.Context, repo restic.Repository, tree *restic.Tree, rootPath string, dst io.Writer) error {
|
|
||||||
dmp := &tarDumper{
|
|
||||||
cache: NewCache(),
|
|
||||||
w: tar.NewWriter(dst),
|
|
||||||
}
|
}
|
||||||
return writeDump(ctx, repo, tree, rootPath, dmp)
|
}()
|
||||||
}
|
|
||||||
|
|
||||||
func (dmp *tarDumper) Close() error {
|
for node := range ch {
|
||||||
return dmp.w.Close()
|
if err := d.dumpNodeTar(ctx, node, w); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// copied from archive/tar.FileInfoHeader
|
// copied from archive/tar.FileInfoHeader
|
||||||
|
@ -43,7 +38,7 @@ const (
|
||||||
cISVTX = 0o1000 // Save text (sticky bit)
|
cISVTX = 0o1000 // Save text (sticky bit)
|
||||||
)
|
)
|
||||||
|
|
||||||
func (dmp *tarDumper) dumpNode(ctx context.Context, node *restic.Node, repo restic.Repository) error {
|
func (d *Dumper) dumpNodeTar(ctx context.Context, node *restic.Node, w *tar.Writer) error {
|
||||||
relPath, err := filepath.Rel("/", node.Path)
|
relPath, err := filepath.Rel("/", node.Path)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
|
@ -88,13 +83,12 @@ func (dmp *tarDumper) dumpNode(ctx context.Context, node *restic.Node, repo rest
|
||||||
header.Name += "/"
|
header.Name += "/"
|
||||||
}
|
}
|
||||||
|
|
||||||
err = dmp.w.WriteHeader(header)
|
err = w.WriteHeader(header)
|
||||||
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return errors.Wrap(err, "TarHeader")
|
return errors.Wrap(err, "TarHeader")
|
||||||
}
|
}
|
||||||
|
|
||||||
return WriteNodeData(ctx, dmp.w, repo, node, dmp.cache)
|
return d.writeNode(ctx, w, node)
|
||||||
}
|
}
|
||||||
|
|
||||||
func parseXattrs(xattrs []restic.ExtendedAttribute) map[string]string {
|
func parseXattrs(xattrs []restic.ExtendedAttribute) map[string]string {
|
||||||
|
|
|
@ -16,7 +16,7 @@ import (
|
||||||
)
|
)
|
||||||
|
|
||||||
func TestWriteTar(t *testing.T) {
|
func TestWriteTar(t *testing.T) {
|
||||||
WriteTest(t, WriteTar, checkTar)
|
WriteTest(t, "tar", checkTar)
|
||||||
}
|
}
|
||||||
|
|
||||||
func checkTar(t *testing.T, testDir string, srcTar *bytes.Buffer) error {
|
func checkTar(t *testing.T, testDir string, srcTar *bytes.Buffer) error {
|
||||||
|
|
|
@ -3,36 +3,31 @@ package dump
|
||||||
import (
|
import (
|
||||||
"archive/zip"
|
"archive/zip"
|
||||||
"context"
|
"context"
|
||||||
"io"
|
|
||||||
"path/filepath"
|
"path/filepath"
|
||||||
|
|
||||||
"github.com/restic/restic/internal/bloblru"
|
|
||||||
"github.com/restic/restic/internal/errors"
|
"github.com/restic/restic/internal/errors"
|
||||||
"github.com/restic/restic/internal/restic"
|
"github.com/restic/restic/internal/restic"
|
||||||
)
|
)
|
||||||
|
|
||||||
type zipDumper struct {
|
func (d *Dumper) dumpZip(ctx context.Context, ch <-chan *restic.Node) (err error) {
|
||||||
cache *bloblru.Cache
|
w := zip.NewWriter(d.w)
|
||||||
w *zip.Writer
|
|
||||||
}
|
|
||||||
|
|
||||||
// Statically ensure that zipDumper implements dumper.
|
defer func() {
|
||||||
var _ dumper = &zipDumper{}
|
if err == nil {
|
||||||
|
err = w.Close()
|
||||||
// WriteZip will write the contents of the given tree, encoded as a zip to the given destination.
|
err = errors.Wrap(err, "Close")
|
||||||
func WriteZip(ctx context.Context, repo restic.Repository, tree *restic.Tree, rootPath string, dst io.Writer) error {
|
|
||||||
dmp := &zipDumper{
|
|
||||||
cache: NewCache(),
|
|
||||||
w: zip.NewWriter(dst),
|
|
||||||
}
|
}
|
||||||
return writeDump(ctx, repo, tree, rootPath, dmp)
|
}()
|
||||||
|
|
||||||
|
for node := range ch {
|
||||||
|
if err := d.dumpNodeZip(ctx, node, w); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (dmp *zipDumper) Close() error {
|
func (d *Dumper) dumpNodeZip(ctx context.Context, node *restic.Node, zw *zip.Writer) error {
|
||||||
return dmp.w.Close()
|
|
||||||
}
|
|
||||||
|
|
||||||
func (dmp *zipDumper) dumpNode(ctx context.Context, node *restic.Node, repo restic.Repository) error {
|
|
||||||
relPath, err := filepath.Rel("/", node.Path)
|
relPath, err := filepath.Rel("/", node.Path)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
|
@ -49,7 +44,7 @@ func (dmp *zipDumper) dumpNode(ctx context.Context, node *restic.Node, repo rest
|
||||||
header.Name += "/"
|
header.Name += "/"
|
||||||
}
|
}
|
||||||
|
|
||||||
w, err := dmp.w.CreateHeader(header)
|
w, err := zw.CreateHeader(header)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return errors.Wrap(err, "ZipHeader")
|
return errors.Wrap(err, "ZipHeader")
|
||||||
}
|
}
|
||||||
|
@ -62,5 +57,5 @@ func (dmp *zipDumper) dumpNode(ctx context.Context, node *restic.Node, repo rest
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
return WriteNodeData(ctx, w, repo, node, dmp.cache)
|
return d.writeNode(ctx, w, node)
|
||||||
}
|
}
|
||||||
|
|
|
@ -15,7 +15,7 @@ import (
|
||||||
)
|
)
|
||||||
|
|
||||||
func TestWriteZip(t *testing.T) {
|
func TestWriteZip(t *testing.T) {
|
||||||
WriteTest(t, WriteZip, checkZip)
|
WriteTest(t, "zip", checkZip)
|
||||||
}
|
}
|
||||||
|
|
||||||
func readZipFile(f *zip.File) ([]byte, error) {
|
func readZipFile(f *zip.File) ([]byte, error) {
|
||||||
|
|
Loading…
Reference in a new issue