Move tree walker to restic/walk
This commit is contained in:
parent
bc42dbdf87
commit
714a5d1dc4
4 changed files with 38 additions and 35 deletions
|
@ -12,6 +12,7 @@ import (
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"restic/errors"
|
"restic/errors"
|
||||||
|
"restic/walk"
|
||||||
|
|
||||||
"restic/debug"
|
"restic/debug"
|
||||||
"restic/fs"
|
"restic/fs"
|
||||||
|
@ -421,7 +422,7 @@ func (arch *Archiver) dirWorker(wg *sync.WaitGroup, p *restic.Progress, done <-c
|
||||||
}
|
}
|
||||||
|
|
||||||
type archivePipe struct {
|
type archivePipe struct {
|
||||||
Old <-chan restic.WalkTreeJob
|
Old <-chan walk.TreeJob
|
||||||
New <-chan pipe.Job
|
New <-chan pipe.Job
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -456,7 +457,7 @@ func copyJobs(done <-chan struct{}, in <-chan pipe.Job, out chan<- pipe.Job) {
|
||||||
|
|
||||||
type archiveJob struct {
|
type archiveJob struct {
|
||||||
hasOld bool
|
hasOld bool
|
||||||
old restic.WalkTreeJob
|
old walk.TreeJob
|
||||||
new pipe.Job
|
new pipe.Job
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -470,7 +471,7 @@ func (a *archivePipe) compare(done <-chan struct{}, out chan<- pipe.Job) {
|
||||||
var (
|
var (
|
||||||
loadOld, loadNew bool = true, true
|
loadOld, loadNew bool = true, true
|
||||||
ok bool
|
ok bool
|
||||||
oldJob restic.WalkTreeJob
|
oldJob walk.TreeJob
|
||||||
newJob pipe.Job
|
newJob pipe.Job
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -667,12 +668,12 @@ func (arch *Archiver) Snapshot(p *restic.Progress, paths []string, parentID *res
|
||||||
}
|
}
|
||||||
|
|
||||||
// start walker on old tree
|
// start walker on old tree
|
||||||
ch := make(chan restic.WalkTreeJob)
|
ch := make(chan walk.TreeJob)
|
||||||
go restic.WalkTree(arch.repo, *parent.Tree, done, ch)
|
go walk.Tree(arch.repo, *parent.Tree, done, ch)
|
||||||
jobs.Old = ch
|
jobs.Old = ch
|
||||||
} else {
|
} else {
|
||||||
// use closed channel
|
// use closed channel
|
||||||
ch := make(chan restic.WalkTreeJob)
|
ch := make(chan walk.TreeJob)
|
||||||
close(ch)
|
close(ch)
|
||||||
jobs.Old = ch
|
jobs.Old = ch
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,39 +1,40 @@
|
||||||
package restic
|
package walk
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"fmt"
|
"fmt"
|
||||||
"os"
|
"os"
|
||||||
"path/filepath"
|
"path/filepath"
|
||||||
|
"restic"
|
||||||
"sync"
|
"sync"
|
||||||
|
|
||||||
"restic/debug"
|
"restic/debug"
|
||||||
)
|
)
|
||||||
|
|
||||||
// WalkTreeJob is a job sent from the tree walker.
|
// TreeJob is a job sent from the tree walker.
|
||||||
type WalkTreeJob struct {
|
type TreeJob struct {
|
||||||
Path string
|
Path string
|
||||||
Error error
|
Error error
|
||||||
|
|
||||||
Node *Node
|
Node *restic.Node
|
||||||
Tree *Tree
|
Tree *restic.Tree
|
||||||
}
|
}
|
||||||
|
|
||||||
// TreeWalker traverses a tree in the repository depth-first and sends a job
|
// TreeWalker traverses a tree in the repository depth-first and sends a job
|
||||||
// for each item (file or dir) that it encounters.
|
// for each item (file or dir) that it encounters.
|
||||||
type TreeWalker struct {
|
type TreeWalker struct {
|
||||||
ch chan<- loadTreeJob
|
ch chan<- loadTreeJob
|
||||||
out chan<- WalkTreeJob
|
out chan<- TreeJob
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewTreeWalker uses ch to load trees from the repository and sends jobs to
|
// NewTreeWalker uses ch to load trees from the repository and sends jobs to
|
||||||
// out.
|
// out.
|
||||||
func NewTreeWalker(ch chan<- loadTreeJob, out chan<- WalkTreeJob) *TreeWalker {
|
func NewTreeWalker(ch chan<- loadTreeJob, out chan<- TreeJob) *TreeWalker {
|
||||||
return &TreeWalker{ch: ch, out: out}
|
return &TreeWalker{ch: ch, out: out}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Walk starts walking the tree given by id. When the channel done is closed,
|
// Walk starts walking the tree given by id. When the channel done is closed,
|
||||||
// processing stops.
|
// processing stops.
|
||||||
func (tw *TreeWalker) Walk(path string, id ID, done chan struct{}) {
|
func (tw *TreeWalker) Walk(path string, id restic.ID, done chan struct{}) {
|
||||||
debug.Log("TreeWalker.Walk", "starting on tree %v for %v", id.Str(), path)
|
debug.Log("TreeWalker.Walk", "starting on tree %v for %v", id.Str(), path)
|
||||||
defer debug.Log("TreeWalker.Walk", "done walking tree %v for %v", id.Str(), path)
|
defer debug.Log("TreeWalker.Walk", "done walking tree %v for %v", id.Str(), path)
|
||||||
|
|
||||||
|
@ -46,7 +47,7 @@ func (tw *TreeWalker) Walk(path string, id ID, done chan struct{}) {
|
||||||
res := <-resCh
|
res := <-resCh
|
||||||
if res.err != nil {
|
if res.err != nil {
|
||||||
select {
|
select {
|
||||||
case tw.out <- WalkTreeJob{Path: path, Error: res.err}:
|
case tw.out <- TreeJob{Path: path, Error: res.err}:
|
||||||
case <-done:
|
case <-done:
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
@ -56,13 +57,13 @@ func (tw *TreeWalker) Walk(path string, id ID, done chan struct{}) {
|
||||||
tw.walk(path, res.tree, done)
|
tw.walk(path, res.tree, done)
|
||||||
|
|
||||||
select {
|
select {
|
||||||
case tw.out <- WalkTreeJob{Path: path, Tree: res.tree}:
|
case tw.out <- TreeJob{Path: path, Tree: res.tree}:
|
||||||
case <-done:
|
case <-done:
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (tw *TreeWalker) walk(path string, tree *Tree, done chan struct{}) {
|
func (tw *TreeWalker) walk(path string, tree *restic.Tree, done chan struct{}) {
|
||||||
debug.Log("TreeWalker.walk", "start on %q", path)
|
debug.Log("TreeWalker.walk", "start on %q", path)
|
||||||
defer debug.Log("TreeWalker.walk", "done for %q", path)
|
defer debug.Log("TreeWalker.walk", "done for %q", path)
|
||||||
|
|
||||||
|
@ -84,7 +85,7 @@ func (tw *TreeWalker) walk(path string, tree *Tree, done chan struct{}) {
|
||||||
|
|
||||||
for i, node := range tree.Nodes {
|
for i, node := range tree.Nodes {
|
||||||
p := filepath.Join(path, node.Name)
|
p := filepath.Join(path, node.Name)
|
||||||
var job WalkTreeJob
|
var job TreeJob
|
||||||
|
|
||||||
if node.Type == "dir" {
|
if node.Type == "dir" {
|
||||||
if results[i] == nil {
|
if results[i] == nil {
|
||||||
|
@ -98,9 +99,9 @@ func (tw *TreeWalker) walk(path string, tree *Tree, done chan struct{}) {
|
||||||
fmt.Fprintf(os.Stderr, "error loading tree: %v\n", res.err)
|
fmt.Fprintf(os.Stderr, "error loading tree: %v\n", res.err)
|
||||||
}
|
}
|
||||||
|
|
||||||
job = WalkTreeJob{Path: p, Tree: res.tree, Error: res.err}
|
job = TreeJob{Path: p, Tree: res.tree, Error: res.err}
|
||||||
} else {
|
} else {
|
||||||
job = WalkTreeJob{Path: p, Node: node}
|
job = TreeJob{Path: p, Node: node}
|
||||||
}
|
}
|
||||||
|
|
||||||
select {
|
select {
|
||||||
|
@ -112,16 +113,16 @@ func (tw *TreeWalker) walk(path string, tree *Tree, done chan struct{}) {
|
||||||
}
|
}
|
||||||
|
|
||||||
type loadTreeResult struct {
|
type loadTreeResult struct {
|
||||||
tree *Tree
|
tree *restic.Tree
|
||||||
err error
|
err error
|
||||||
}
|
}
|
||||||
|
|
||||||
type loadTreeJob struct {
|
type loadTreeJob struct {
|
||||||
id ID
|
id restic.ID
|
||||||
res chan<- loadTreeResult
|
res chan<- loadTreeResult
|
||||||
}
|
}
|
||||||
|
|
||||||
type treeLoader func(ID) (*Tree, error)
|
type treeLoader func(restic.ID) (*restic.Tree, error)
|
||||||
|
|
||||||
func loadTreeWorker(wg *sync.WaitGroup, in <-chan loadTreeJob, load treeLoader, done <-chan struct{}) {
|
func loadTreeWorker(wg *sync.WaitGroup, in <-chan loadTreeJob, load treeLoader, done <-chan struct{}) {
|
||||||
debug.Log("loadTreeWorker", "start")
|
debug.Log("loadTreeWorker", "start")
|
||||||
|
@ -157,15 +158,15 @@ func loadTreeWorker(wg *sync.WaitGroup, in <-chan loadTreeJob, load treeLoader,
|
||||||
|
|
||||||
const loadTreeWorkers = 10
|
const loadTreeWorkers = 10
|
||||||
|
|
||||||
// WalkTree walks the tree specified by id recursively and sends a job for each
|
// Tree walks the tree specified by id recursively and sends a job for each
|
||||||
// file and directory it finds. When the channel done is closed, processing
|
// file and directory it finds. When the channel done is closed, processing
|
||||||
// stops.
|
// stops.
|
||||||
func WalkTree(repo TreeLoader, id ID, done chan struct{}, jobCh chan<- WalkTreeJob) {
|
func Tree(repo restic.TreeLoader, id restic.ID, done chan struct{}, jobCh chan<- TreeJob) {
|
||||||
debug.Log("WalkTree", "start on %v, start workers", id.Str())
|
debug.Log("WalkTree", "start on %v, start workers", id.Str())
|
||||||
|
|
||||||
load := func(id ID) (*Tree, error) {
|
load := func(id restic.ID) (*restic.Tree, error) {
|
||||||
tree := &Tree{}
|
tree := &restic.Tree{}
|
||||||
err := repo.LoadJSONPack(TreeBlob, id, tree)
|
err := repo.LoadJSONPack(restic.TreeBlob, id, tree)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
|
@ -1,4 +1,4 @@
|
||||||
package restic_test
|
package walk_test
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"os"
|
"os"
|
||||||
|
@ -12,6 +12,7 @@ import (
|
||||||
"restic/pipe"
|
"restic/pipe"
|
||||||
"restic/repository"
|
"restic/repository"
|
||||||
. "restic/test"
|
. "restic/test"
|
||||||
|
"restic/walk"
|
||||||
)
|
)
|
||||||
|
|
||||||
func TestWalkTree(t *testing.T) {
|
func TestWalkTree(t *testing.T) {
|
||||||
|
@ -32,8 +33,8 @@ func TestWalkTree(t *testing.T) {
|
||||||
done := make(chan struct{})
|
done := make(chan struct{})
|
||||||
|
|
||||||
// start tree walker
|
// start tree walker
|
||||||
treeJobs := make(chan restic.WalkTreeJob)
|
treeJobs := make(chan walk.TreeJob)
|
||||||
go restic.WalkTree(repo, *sn.Tree, done, treeJobs)
|
go walk.Tree(repo, *sn.Tree, done, treeJobs)
|
||||||
|
|
||||||
// start filesystem walker
|
// start filesystem walker
|
||||||
fsJobs := make(chan pipe.Job)
|
fsJobs := make(chan pipe.Job)
|
||||||
|
@ -1350,8 +1351,8 @@ func TestDelayedWalkTree(t *testing.T) {
|
||||||
dr := delayRepo{repo, 100 * time.Millisecond}
|
dr := delayRepo{repo, 100 * time.Millisecond}
|
||||||
|
|
||||||
// start tree walker
|
// start tree walker
|
||||||
treeJobs := make(chan restic.WalkTreeJob)
|
treeJobs := make(chan walk.TreeJob)
|
||||||
go restic.WalkTree(dr, root, nil, treeJobs)
|
go walk.Tree(dr, root, nil, treeJobs)
|
||||||
|
|
||||||
i := 0
|
i := 0
|
||||||
for job := range treeJobs {
|
for job := range treeJobs {
|
||||||
|
@ -1382,8 +1383,8 @@ func BenchmarkDelayedWalkTree(t *testing.B) {
|
||||||
|
|
||||||
for i := 0; i < t.N; i++ {
|
for i := 0; i < t.N; i++ {
|
||||||
// start tree walker
|
// start tree walker
|
||||||
treeJobs := make(chan restic.WalkTreeJob)
|
treeJobs := make(chan walk.TreeJob)
|
||||||
go restic.WalkTree(dr, root, nil, treeJobs)
|
go walk.Tree(dr, root, nil, treeJobs)
|
||||||
|
|
||||||
for _ = range treeJobs {
|
for _ = range treeJobs {
|
||||||
}
|
}
|
Loading…
Reference in a new issue