forked from TrueCloudLab/restic
Remove code duplication
The top-level tree is now handled by the DirWorkers
This commit is contained in:
parent
6007452fd2
commit
ab8b97c4ba
4 changed files with 114 additions and 32 deletions
46
archiver.go
46
archiver.go
|
@ -513,11 +513,21 @@ func (arch *Archiver) dirWorker(wg *sync.WaitGroup, p *Progress, done <-chan str
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
node, err := NodeFromFileInfo(dir.Path(), dir.Info())
|
var (
|
||||||
if err != nil {
|
node *Node
|
||||||
node.Error = err.Error()
|
err error
|
||||||
dir.Result() <- node
|
)
|
||||||
continue
|
if dir.Path() == "" {
|
||||||
|
// if this is the top-level dir, only create a stub node
|
||||||
|
node = &Node{}
|
||||||
|
} else {
|
||||||
|
// else create note from path and fi
|
||||||
|
node, err = NodeFromFileInfo(dir.Path(), dir.Info())
|
||||||
|
if err != nil {
|
||||||
|
node.Error = err.Error()
|
||||||
|
dir.Result() <- node
|
||||||
|
continue
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
blob, err := arch.SaveTreeJSON(tree)
|
blob, err := arch.SaveTreeJSON(tree)
|
||||||
|
@ -775,28 +785,10 @@ func (arch *Archiver) Snapshot(p *Progress, paths []string, pid backend.ID) (*Sn
|
||||||
|
|
||||||
debug.Log("Archiver.Snapshot", "workers terminated")
|
debug.Log("Archiver.Snapshot", "workers terminated")
|
||||||
|
|
||||||
// add the top-level tree
|
// receive the top-level tree
|
||||||
tree := NewTree()
|
root := (<-resCh).(*Node)
|
||||||
root := (<-resCh).(pipe.Dir)
|
debug.Log("Archiver.Snapshot", "root node received: %#v", root.blobs[0])
|
||||||
for i := 0; i < len(paths); i++ {
|
sn.Tree = root.blobs[0]
|
||||||
node := (<-root.Entries[i]).(*Node)
|
|
||||||
|
|
||||||
debug.Log("Archiver.Snapshot", "got toplevel node %v, %d/%d blobs", node, len(node.Content), len(node.blobs))
|
|
||||||
|
|
||||||
tree.Insert(node)
|
|
||||||
for _, blob := range node.blobs {
|
|
||||||
debug.Log("Archiver.Snapshot", " add toplevel blob %v", blob)
|
|
||||||
blob = arch.m.Insert(blob)
|
|
||||||
tree.Map.Insert(blob)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
tb, err := arch.SaveTreeJSON(tree)
|
|
||||||
if err != nil {
|
|
||||||
return nil, nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
sn.Tree = tb
|
|
||||||
|
|
||||||
// save snapshot
|
// save snapshot
|
||||||
blob, err := arch.s.SaveJSON(backend.Snapshot, sn)
|
blob, err := arch.s.SaveJSON(backend.Snapshot, sn)
|
||||||
|
|
12
pipe/pipe.go
12
pipe/pipe.go
|
@ -150,7 +150,6 @@ func walk(basedir, path string, done chan struct{}, jobs chan<- Job, res chan<-
|
||||||
func Walk(paths []string, done chan struct{}, jobs chan<- Job, res chan<- Result) error {
|
func Walk(paths []string, done chan struct{}, jobs chan<- Job, res chan<- Result) error {
|
||||||
defer func() {
|
defer func() {
|
||||||
debug.Log("pipe.Walk", "output channel closed")
|
debug.Log("pipe.Walk", "output channel closed")
|
||||||
close(res)
|
|
||||||
close(jobs)
|
close(jobs)
|
||||||
}()
|
}()
|
||||||
|
|
||||||
|
@ -165,7 +164,16 @@ func Walk(paths []string, done chan struct{}, jobs chan<- Job, res chan<- Result
|
||||||
}
|
}
|
||||||
debug.Log("pipe.Walk", "walker for %v done", path)
|
debug.Log("pipe.Walk", "walker for %v done", path)
|
||||||
}
|
}
|
||||||
res <- Dir{Entries: entries}
|
|
||||||
|
debug.Log("pipe.Walk", "sending root node")
|
||||||
|
select {
|
||||||
|
case <-done:
|
||||||
|
return errCancelled
|
||||||
|
case jobs <- Dir{Entries: entries, result: res}:
|
||||||
|
}
|
||||||
|
|
||||||
|
debug.Log("pipe.Walk", "walker done")
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -54,6 +54,9 @@ func TestPipelineWalkerWithSplit(t *testing.T) {
|
||||||
t.Logf("walking path %s with %d dirs, %d files", *testWalkerPath,
|
t.Logf("walking path %s with %d dirs, %d files", *testWalkerPath,
|
||||||
before.dirs, before.files)
|
before.dirs, before.files)
|
||||||
|
|
||||||
|
// account for top level dir
|
||||||
|
before.dirs++
|
||||||
|
|
||||||
after := stats{}
|
after := stats{}
|
||||||
m := sync.Mutex{}
|
m := sync.Mutex{}
|
||||||
|
|
||||||
|
@ -142,6 +145,9 @@ func TestPipelineWalker(t *testing.T) {
|
||||||
t.Logf("walking path %s with %d dirs, %d files", *testWalkerPath,
|
t.Logf("walking path %s with %d dirs, %d files", *testWalkerPath,
|
||||||
before.dirs, before.files)
|
before.dirs, before.files)
|
||||||
|
|
||||||
|
// account for top level dir
|
||||||
|
before.dirs++
|
||||||
|
|
||||||
after := stats{}
|
after := stats{}
|
||||||
m := sync.Mutex{}
|
m := sync.Mutex{}
|
||||||
|
|
||||||
|
@ -305,3 +311,81 @@ func BenchmarkPipelineWalker(b *testing.B) {
|
||||||
b.Logf("max duration for a dir: %v", max)
|
b.Logf("max duration for a dir: %v", max)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestPipelineWalkerMultiple(t *testing.T) {
|
||||||
|
if *testWalkerPath == "" {
|
||||||
|
t.Skipf("walkerpath not set, skipping TestPipelineWalker")
|
||||||
|
}
|
||||||
|
|
||||||
|
paths, err := filepath.Glob(filepath.Join(*testWalkerPath, "*"))
|
||||||
|
|
||||||
|
before, err := statPath(*testWalkerPath)
|
||||||
|
ok(t, err)
|
||||||
|
|
||||||
|
t.Logf("walking paths %v with %d dirs, %d files", paths,
|
||||||
|
before.dirs, before.files)
|
||||||
|
|
||||||
|
after := stats{}
|
||||||
|
m := sync.Mutex{}
|
||||||
|
|
||||||
|
worker := func(wg *sync.WaitGroup, done <-chan struct{}, jobs <-chan pipe.Job) {
|
||||||
|
defer wg.Done()
|
||||||
|
for {
|
||||||
|
select {
|
||||||
|
case job, ok := <-jobs:
|
||||||
|
if !ok {
|
||||||
|
// channel is closed
|
||||||
|
return
|
||||||
|
}
|
||||||
|
assert(t, job != nil, "job is nil")
|
||||||
|
|
||||||
|
switch j := job.(type) {
|
||||||
|
case pipe.Dir:
|
||||||
|
// wait for all content
|
||||||
|
for _, ch := range j.Entries {
|
||||||
|
<-ch
|
||||||
|
}
|
||||||
|
|
||||||
|
m.Lock()
|
||||||
|
after.dirs++
|
||||||
|
m.Unlock()
|
||||||
|
|
||||||
|
j.Result() <- true
|
||||||
|
case pipe.Entry:
|
||||||
|
m.Lock()
|
||||||
|
after.files++
|
||||||
|
m.Unlock()
|
||||||
|
|
||||||
|
j.Result() <- true
|
||||||
|
}
|
||||||
|
|
||||||
|
case <-done:
|
||||||
|
// pipeline was cancelled
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
var wg sync.WaitGroup
|
||||||
|
done := make(chan struct{})
|
||||||
|
jobs := make(chan pipe.Job)
|
||||||
|
|
||||||
|
for i := 0; i < *maxWorkers; i++ {
|
||||||
|
wg.Add(1)
|
||||||
|
go worker(&wg, done, jobs)
|
||||||
|
}
|
||||||
|
|
||||||
|
resCh := make(chan pipe.Result, 1)
|
||||||
|
err = pipe.Walk(paths, done, jobs, resCh)
|
||||||
|
ok(t, err)
|
||||||
|
|
||||||
|
// wait for all workers to terminate
|
||||||
|
wg.Wait()
|
||||||
|
|
||||||
|
// wait for top-level blob
|
||||||
|
<-resCh
|
||||||
|
|
||||||
|
t.Logf("walked %d paths with %d dirs, %d files", len(paths), after.dirs, after.files)
|
||||||
|
|
||||||
|
assert(t, before == after, "stats do not match, expected %v, got %v", before, after)
|
||||||
|
}
|
||||||
|
|
4
walk.go
4
walk.go
|
@ -43,9 +43,7 @@ func walkTree(s Server, path string, id backend.ID, done chan struct{}, jobCh ch
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if path != "" {
|
jobCh <- WalkTreeJob{Path: filepath.Join(path), Tree: t}
|
||||||
jobCh <- WalkTreeJob{Path: filepath.Join(path), Tree: t}
|
|
||||||
}
|
|
||||||
debug.Log("walkTree", "done for %q (%v)", path, id.Str())
|
debug.Log("walkTree", "done for %q (%v)", path, id.Str())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in a new issue