Add pipe package
This commit is contained in:
parent
937c91e1cf
commit
2f1137bac4
3 changed files with 339 additions and 0 deletions
36
pipe/generic_test.go
Normal file
36
pipe/generic_test.go
Normal file
|
@ -0,0 +1,36 @@
|
||||||
|
package pipe_test
|
||||||
|
|
||||||
|
import (
|
||||||
|
"fmt"
|
||||||
|
"path/filepath"
|
||||||
|
"reflect"
|
||||||
|
"runtime"
|
||||||
|
"testing"
|
||||||
|
)
|
||||||
|
|
||||||
|
// assert fails the test if the condition is false.
|
||||||
|
func assert(tb testing.TB, condition bool, msg string, v ...interface{}) {
|
||||||
|
if !condition {
|
||||||
|
_, file, line, _ := runtime.Caller(1)
|
||||||
|
fmt.Printf("\033[31m%s:%d: "+msg+"\033[39m\n\n", append([]interface{}{filepath.Base(file), line}, v...)...)
|
||||||
|
tb.FailNow()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// ok fails the test if an err is not nil.
|
||||||
|
func ok(tb testing.TB, err error) {
|
||||||
|
if err != nil {
|
||||||
|
_, file, line, _ := runtime.Caller(1)
|
||||||
|
fmt.Printf("\033[31m%s:%d: unexpected error: %s\033[39m\n\n", filepath.Base(file), line, err.Error())
|
||||||
|
tb.FailNow()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// equals fails the test if exp is not equal to act.
|
||||||
|
func equals(tb testing.TB, exp, act interface{}) {
|
||||||
|
if !reflect.DeepEqual(exp, act) {
|
||||||
|
_, file, line, _ := runtime.Caller(1)
|
||||||
|
fmt.Printf("\033[31m%s:%d:\n\n\texp: %#v\n\n\tgot: %#v\033[39m\n\n", filepath.Base(file), line, exp, act)
|
||||||
|
tb.FailNow()
|
||||||
|
}
|
||||||
|
}
|
101
pipe/pipe.go
Normal file
101
pipe/pipe.go
Normal file
|
@ -0,0 +1,101 @@
|
||||||
|
package pipe
|
||||||
|
|
||||||
|
import (
|
||||||
|
"fmt"
|
||||||
|
"os"
|
||||||
|
"path/filepath"
|
||||||
|
"sort"
|
||||||
|
)
|
||||||
|
|
||||||
|
type Entry struct {
|
||||||
|
Path string
|
||||||
|
Info os.FileInfo
|
||||||
|
Error error
|
||||||
|
Result chan<- interface{}
|
||||||
|
}
|
||||||
|
|
||||||
|
type Dir struct {
|
||||||
|
Path string
|
||||||
|
Error error
|
||||||
|
|
||||||
|
Entries [](<-chan interface{})
|
||||||
|
Result chan<- interface{}
|
||||||
|
}
|
||||||
|
|
||||||
|
// readDirNames reads the directory named by dirname and returns
|
||||||
|
// a sorted list of directory entries.
|
||||||
|
// taken from filepath/path.go
|
||||||
|
func readDirNames(dirname string) ([]string, error) {
|
||||||
|
f, err := os.Open(dirname)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
names, err := f.Readdirnames(-1)
|
||||||
|
f.Close()
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
sort.Strings(names)
|
||||||
|
return names, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func isDir(fi os.FileInfo) bool {
|
||||||
|
return fi.IsDir()
|
||||||
|
}
|
||||||
|
|
||||||
|
func isFile(fi os.FileInfo) bool {
|
||||||
|
return fi.Mode()&(os.ModeType|os.ModeCharDevice) == 0
|
||||||
|
}
|
||||||
|
|
||||||
|
func walk(path string, done chan struct{}, entCh chan<- Entry, dirCh chan<- Dir, res chan<- interface{}) error {
|
||||||
|
info, err := os.Lstat(path)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
if !info.IsDir() {
|
||||||
|
return fmt.Errorf("path is not a directory, cannot walk: %s", path)
|
||||||
|
}
|
||||||
|
|
||||||
|
names, err := readDirNames(path)
|
||||||
|
if err != nil {
|
||||||
|
dirCh <- Dir{Path: path, Error: err}
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
entries := make([]<-chan interface{}, 0, len(names))
|
||||||
|
|
||||||
|
for _, name := range names {
|
||||||
|
subpath := filepath.Join(path, name)
|
||||||
|
ch := make(chan interface{}, 1)
|
||||||
|
|
||||||
|
fi, err := os.Lstat(subpath)
|
||||||
|
if err != nil {
|
||||||
|
entries = append(entries, ch)
|
||||||
|
entCh <- Entry{Info: fi, Error: err, Result: ch}
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
if isFile(fi) {
|
||||||
|
ch := make(chan interface{}, 1)
|
||||||
|
entCh <- Entry{Info: fi, Path: subpath, Result: ch}
|
||||||
|
} else if isDir(fi) {
|
||||||
|
ch := make(chan interface{}, 1)
|
||||||
|
entries = append(entries, ch)
|
||||||
|
walk(subpath, done, entCh, dirCh, ch)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
dirCh <- Dir{Path: path, Entries: entries, Result: res}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// Walk takes a path and sends a Job for each file and directory it finds below
|
||||||
|
// the path. When the channel done is closed, processing stops.
|
||||||
|
func Walk(path string, done chan struct{}, entCh chan<- Entry, dirCh chan<- Dir) (<-chan interface{}, error) {
|
||||||
|
resCh := make(chan interface{}, 1)
|
||||||
|
err := walk(path, done, entCh, dirCh, resCh)
|
||||||
|
close(entCh)
|
||||||
|
close(dirCh)
|
||||||
|
return resCh, err
|
||||||
|
}
|
202
pipe/pipe_test.go
Normal file
202
pipe/pipe_test.go
Normal file
|
@ -0,0 +1,202 @@
|
||||||
|
package pipe_test
|
||||||
|
|
||||||
|
import (
|
||||||
|
"flag"
|
||||||
|
"os"
|
||||||
|
"path/filepath"
|
||||||
|
"sync"
|
||||||
|
"testing"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/restic/restic/pipe"
|
||||||
|
)
|
||||||
|
|
||||||
|
var testWalkerPath = flag.String("test.walkerpath", ".", "pipeline walker testpath (default: .)")
|
||||||
|
var maxWorkers = flag.Int("test.workers", 100, "max concurrency (default: 100)")
|
||||||
|
|
||||||
|
func isFile(fi os.FileInfo) bool {
|
||||||
|
return fi.Mode()&(os.ModeType|os.ModeCharDevice) == 0
|
||||||
|
}
|
||||||
|
|
||||||
|
type stats struct {
|
||||||
|
dirs, files int
|
||||||
|
}
|
||||||
|
|
||||||
|
func statPath(path string) (stats, error) {
|
||||||
|
var s stats
|
||||||
|
|
||||||
|
// count files and directories with filepath.Walk()
|
||||||
|
err := filepath.Walk(*testWalkerPath, func(p string, fi os.FileInfo, err error) error {
|
||||||
|
if fi == nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
if fi.IsDir() {
|
||||||
|
s.dirs++
|
||||||
|
} else if isFile(fi) {
|
||||||
|
s.files++
|
||||||
|
}
|
||||||
|
|
||||||
|
return err
|
||||||
|
})
|
||||||
|
|
||||||
|
return s, err
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestPipelineWalker(t *testing.T) {
|
||||||
|
if *testWalkerPath == "" {
|
||||||
|
t.Skipf("walkerpah not set, skipping TestPipelineWalker")
|
||||||
|
}
|
||||||
|
|
||||||
|
before, err := statPath(*testWalkerPath)
|
||||||
|
ok(t, err)
|
||||||
|
|
||||||
|
t.Logf("walking path %s with %d dirs, %d files", *testWalkerPath,
|
||||||
|
before.dirs, before.files)
|
||||||
|
|
||||||
|
after := stats{}
|
||||||
|
m := sync.Mutex{}
|
||||||
|
|
||||||
|
var wg sync.WaitGroup
|
||||||
|
worker := func(done <-chan struct{}, entCh <-chan pipe.Entry, dirCh <-chan pipe.Dir) {
|
||||||
|
defer wg.Done()
|
||||||
|
for {
|
||||||
|
select {
|
||||||
|
case e, ok := <-entCh:
|
||||||
|
if !ok {
|
||||||
|
// channel is closed
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
m.Lock()
|
||||||
|
after.files++
|
||||||
|
m.Unlock()
|
||||||
|
|
||||||
|
e.Result <- true
|
||||||
|
|
||||||
|
case dir, ok := <-dirCh:
|
||||||
|
if !ok {
|
||||||
|
// channel is closed
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
// wait for all content
|
||||||
|
for _, ch := range dir.Entries {
|
||||||
|
<-ch
|
||||||
|
}
|
||||||
|
|
||||||
|
m.Lock()
|
||||||
|
after.dirs++
|
||||||
|
m.Unlock()
|
||||||
|
|
||||||
|
dir.Result <- true
|
||||||
|
case <-done:
|
||||||
|
// pipeline was cancelled
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
done := make(chan struct{})
|
||||||
|
entCh := make(chan pipe.Entry)
|
||||||
|
dirCh := make(chan pipe.Dir)
|
||||||
|
|
||||||
|
for i := 0; i < *maxWorkers; i++ {
|
||||||
|
wg.Add(1)
|
||||||
|
go worker(done, entCh, dirCh)
|
||||||
|
}
|
||||||
|
|
||||||
|
resCh, err := pipe.Walk(*testWalkerPath, done, entCh, dirCh)
|
||||||
|
ok(t, err)
|
||||||
|
|
||||||
|
// wait for all workers to terminate
|
||||||
|
wg.Wait()
|
||||||
|
|
||||||
|
// wait for top-level blob
|
||||||
|
<-resCh
|
||||||
|
|
||||||
|
t.Logf("walked path %s with %d dirs, %d files", *testWalkerPath,
|
||||||
|
after.dirs, after.files)
|
||||||
|
|
||||||
|
assert(t, before == after, "stats do not match, expected %v, got %v", before, after)
|
||||||
|
}
|
||||||
|
|
||||||
|
func BenchmarkPipelineWalker(b *testing.B) {
|
||||||
|
if *testWalkerPath == "" {
|
||||||
|
b.Skipf("walkerpah not set, skipping BenchPipelineWalker")
|
||||||
|
}
|
||||||
|
|
||||||
|
var max time.Duration
|
||||||
|
m := sync.Mutex{}
|
||||||
|
|
||||||
|
worker := func(wg *sync.WaitGroup, done <-chan struct{}, entCh <-chan pipe.Entry, dirCh <-chan pipe.Dir) {
|
||||||
|
defer wg.Done()
|
||||||
|
for {
|
||||||
|
select {
|
||||||
|
case e, ok := <-entCh:
|
||||||
|
if !ok {
|
||||||
|
// channel is closed
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
// fmt.Printf("file: %v\n", j.Path)
|
||||||
|
|
||||||
|
// simulate backup
|
||||||
|
time.Sleep(10 * time.Millisecond)
|
||||||
|
|
||||||
|
e.Result <- true
|
||||||
|
|
||||||
|
case dir, ok := <-dirCh:
|
||||||
|
if !ok {
|
||||||
|
// channel is closed
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
start := time.Now()
|
||||||
|
|
||||||
|
// wait for all content
|
||||||
|
for _, ch := range dir.Entries {
|
||||||
|
<-ch
|
||||||
|
}
|
||||||
|
|
||||||
|
d := time.Since(start)
|
||||||
|
m.Lock()
|
||||||
|
if d > max {
|
||||||
|
max = d
|
||||||
|
}
|
||||||
|
|
||||||
|
// fmt.Printf("dir %v: %v\n", d, j.Path)
|
||||||
|
m.Unlock()
|
||||||
|
|
||||||
|
dir.Result <- true
|
||||||
|
case <-done:
|
||||||
|
// pipeline was cancelled
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
for i := 0; i < b.N; i++ {
|
||||||
|
done := make(chan struct{})
|
||||||
|
entCh := make(chan pipe.Entry, 100)
|
||||||
|
dirCh := make(chan pipe.Dir, 100)
|
||||||
|
|
||||||
|
var wg sync.WaitGroup
|
||||||
|
b.Logf("starting %d workers", *maxWorkers)
|
||||||
|
for i := 0; i < *maxWorkers; i++ {
|
||||||
|
wg.Add(1)
|
||||||
|
go worker(&wg, done, entCh, dirCh)
|
||||||
|
}
|
||||||
|
|
||||||
|
resCh, err := pipe.Walk(*testWalkerPath, done, entCh, dirCh)
|
||||||
|
ok(b, err)
|
||||||
|
|
||||||
|
// wait for all workers to terminate
|
||||||
|
wg.Wait()
|
||||||
|
|
||||||
|
// wait for final result
|
||||||
|
<-resCh
|
||||||
|
}
|
||||||
|
|
||||||
|
b.Logf("max duration for a dir: %v", max)
|
||||||
|
}
|
Loading…
Reference in a new issue