commit f8f8107d55 (parent 2f1137bac4)
Author: Alexander Neumann
Date:   2015-02-15 14:44:54 +01:00

8 changed files with 171 additions and 59 deletions

View file

```diff
@@ -12,11 +12,12 @@ import (
 	"github.com/restic/restic/backend"
 	"github.com/restic/restic/chunker"
 	"github.com/restic/restic/debug"
+	"github.com/restic/restic/pipe"
 )
 
 const (
-	maxConcurrentFiles = 16
-	maxConcurrentBlobs = 16
+	maxConcurrentBlobs = 32
+	maxConcurrency     = 10
 
 	// chunkerBufSize is used in pool.go
 	chunkerBufSize = 512 * chunker.KiB
@@ -26,7 +27,6 @@ type Archiver struct {
 	s Server
 	m *Map
 
-	fileToken chan struct{}
 	blobToken chan struct{}
 
 	Error func(dir string, fi os.FileInfo, err error) error
@@ -40,15 +40,10 @@ func NewArchiver(s Server, p *Progress) (*Archiver, error) {
 	arch := &Archiver{
 		s: s,
 		p: p,
-		fileToken: make(chan struct{}, maxConcurrentFiles),
 		blobToken: make(chan struct{}, maxConcurrentBlobs),
 	}
 
-	// fill file and blob token
-	for i := 0; i < maxConcurrentFiles; i++ {
-		arch.fileToken <- struct{}{}
-	}
-
+	// fill blob token
 	for i := 0; i < maxConcurrentBlobs; i++ {
 		arch.blobToken <- struct{}{}
 	}
@@ -283,16 +278,10 @@ func (arch *Archiver) saveTree(t *Tree) (Blob, error) {
 		}
 
 		if len(node.Content) == 0 {
-			// get token
-			token := <-arch.fileToken
-
 			// start goroutine
 			wg.Add(1)
 			go func(n *Node) {
 				defer wg.Done()
-				defer func() {
-					arch.fileToken <- token
-				}()
 
 				var blobs Blobs
 				blobs, n.err = arch.SaveFile(n)
@@ -354,27 +343,143 @@ func (arch *Archiver) saveTree(t *Tree) (Blob, error) {
 	return blob, nil
 }
 
-func (arch *Archiver) Snapshot(dir string, t *Tree, parentSnapshot backend.ID) (*Snapshot, backend.ID, error) {
+func (arch *Archiver) Snapshot(path string, parentSnapshot backend.ID) (*Snapshot, backend.ID, error) {
 	debug.Break("Archiver.Snapshot")
 
 	arch.p.Start()
 	defer arch.p.Done()
 
-	sn, err := NewSnapshot(dir)
+	sn, err := NewSnapshot(path)
 	if err != nil {
 		return nil, nil, err
 	}
 
 	sn.Parent = parentSnapshot
 
-	blob, err := arch.saveTree(t)
+	done := make(chan struct{})
+	entCh := make(chan pipe.Entry)
+	dirCh := make(chan pipe.Dir)
+
+	fileWorker := func(wg *sync.WaitGroup, done <-chan struct{}, entCh <-chan pipe.Entry) {
+		defer wg.Done()
+		for {
+			select {
+			case e, ok := <-entCh:
+				if !ok {
+					// channel is closed
+					return
+				}
+
+				node, err := NodeFromFileInfo(e.Path, e.Info)
+				if err != nil {
+					panic(err)
+				}
+
+				if node.Type == "file" {
+					node.blobs, err = arch.SaveFile(node)
+					if err != nil {
+						panic(err)
+					}
+				}
+
+				e.Result <- node
+			case <-done:
+				// pipeline was cancelled
+				return
+			}
+		}
+	}
+
+	dirWorker := func(wg *sync.WaitGroup, done <-chan struct{}, dirCh <-chan pipe.Dir) {
+		defer wg.Done()
+		for {
+			select {
+			case dir, ok := <-dirCh:
+				if !ok {
+					// channel is closed
+					return
+				}
+
+				tree := NewTree()
+
+				// wait for all content
+				for _, ch := range dir.Entries {
+					node := (<-ch).(*Node)
+					tree.Insert(node)
+
+					if node.Type == "dir" {
+						debug.Log("Archiver.DirWorker", "got tree node for %s: %v", node.path, node.blobs)
+					}
+
+					for _, blob := range node.blobs {
+						tree.Map.Insert(blob)
+						arch.m.Insert(blob)
+					}
+				}
+
+				node, err := NodeFromFileInfo(dir.Path, dir.Info)
+				if err != nil {
+					node.Error = err.Error()
+					dir.Result <- node
+					continue
+				}
+
+				blob, err := arch.SaveTreeJSON(tree)
+				if err != nil {
+					panic(err)
+				}
+
+				debug.Log("Archiver.DirWorker", "save tree for %s: %v", dir.Path, blob)
+
+				node.Subtree = blob.ID
+				node.blobs = Blobs{blob}
+
+				dir.Result <- node
+			case <-done:
+				// pipeline was cancelled
+				return
+			}
+		}
+	}
+
+	var wg sync.WaitGroup
+	for i := 0; i < maxConcurrency; i++ {
+		wg.Add(2)
+		go fileWorker(&wg, done, entCh)
+		go dirWorker(&wg, done, dirCh)
+	}
+
+	resCh, err := pipe.Walk(path, done, entCh, dirCh)
+	if err != nil {
+		close(done)
+	}
+
+	// wait for all workers to terminate
+	wg.Wait()
+
 	if err != nil {
 		return nil, nil, err
 	}
 
-	sn.Tree = blob
+	// wait for top-level node
+	node := (<-resCh).(*Node)
+
+	// add tree for top-level directory
+	tree := NewTree()
+	tree.Insert(node)
+	for _, blob := range node.blobs {
+		blob = arch.m.Insert(blob)
+		tree.Map.Insert(blob)
+	}
+
+	tb, err := arch.SaveTreeJSON(tree)
+	if err != nil {
+		return nil, nil, err
+	}
+
+	sn.Tree = tb
 
 	// save snapshot
-	blob, err = arch.s.SaveJSON(backend.Snapshot, sn)
+	blob, err := arch.s.SaveJSON(backend.Snapshot, sn)
 	if err != nil {
 		return nil, nil, err
 	}
```
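
The rewritten Snapshot replaces the per-file token handout with a fixed pool of workers: maxConcurrency fileWorker/dirWorker pairs drain the walker's channels, a done channel cancels them, and the retained blobToken buffered channel still caps concurrent blob uploads. Below is a minimal standalone sketch of that worker/done pattern; the job and result names are illustrative, not restic's API.

```go
// Sketch of the worker-pool-with-cancellation pattern used by the new
// Snapshot: workers select on a jobs channel and a done channel, and a
// WaitGroup (passed by pointer) lets the caller wait for all of them.
package main

import (
	"fmt"
	"sync"
)

func worker(wg *sync.WaitGroup, done <-chan struct{}, jobs <-chan int, results chan<- int) {
	defer wg.Done()
	for {
		select {
		case j, ok := <-jobs:
			if !ok {
				// channel is closed, no more work
				return
			}
			results <- j * j // stand-in for SaveFile/SaveTreeJSON
		case <-done:
			// pipeline was cancelled
			return
		}
	}
}

func main() {
	done := make(chan struct{})
	jobs := make(chan int)
	results := make(chan int, 10) // buffered so workers never block here

	var wg sync.WaitGroup
	for i := 0; i < 3; i++ {
		wg.Add(1)
		go worker(&wg, done, jobs, results)
	}

	for i := 1; i <= 10; i++ {
		jobs <- i
	}
	close(jobs) // workers drain the channel, then return
	wg.Wait()
	close(results)

	for r := range results {
		fmt.Println(r)
	}
}
```

Closing done (as Snapshot does when pipe.Walk fails) stops workers even with jobs still queued; closing the jobs channel instead lets them finish cleanly.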

View file

```diff
@@ -140,13 +140,10 @@ func BenchmarkArchiveDirectory(b *testing.B) {
 	key := setupKey(b, be, "geheim")
 	server := restic.NewServerWithKey(be, key)
 
-	tree, err := restic.NewScanner(nil).Scan(*benchArchiveDirectory)
-	ok(b, err)
-
 	arch, err := restic.NewArchiver(server, nil)
 	ok(b, err)
 
-	_, id, err := arch.Snapshot(*benchArchiveDirectory, tree, nil)
+	_, id, err := arch.Snapshot(*benchArchiveDirectory, nil)
 
 	b.Logf("snapshot archived as %v", id)
 }
```

View file

```diff
@@ -183,7 +183,7 @@ func (cmd CmdBackup) Execute(args []string) error {
 		return nil
 	}
 
-	_, id, err := arch.Snapshot(target, newTree, parentSnapshotID)
+	_, id, err := arch.Snapshot(target, parentSnapshotID)
 	if err != nil {
 		return err
 	}
```

View file

```diff
@@ -35,8 +35,9 @@ type Node struct {
 	tree *Tree
 
 	path string
 	err  error
+	blobs Blobs
 }
 
 func (n Node) String() string {
```

View file

```diff
@@ -17,6 +17,7 @@ type Entry struct {
 type Dir struct {
 	Path  string
 	Error error
+	Info  os.FileInfo
 
 	Entries [](<-chan interface{})
 	Result  chan<- interface{}
@@ -59,7 +60,6 @@ func walk(path string, done chan struct{}, entCh chan<- Entry, dirCh chan<- Dir,
 	names, err := readDirNames(path)
 	if err != nil {
-		dirCh <- Dir{Path: path, Error: err}
 		return err
 	}
 
@@ -67,26 +67,28 @@ func walk(path string, done chan struct{}, entCh chan<- Entry, dirCh chan<- Dir,
 	for _, name := range names {
 		subpath := filepath.Join(path, name)
 
 		ch := make(chan interface{}, 1)
+		entries = append(entries, ch)
 
 		fi, err := os.Lstat(subpath)
 		if err != nil {
-			entries = append(entries, ch)
-			entCh <- Entry{Info: fi, Error: err, Result: ch}
-			continue
+			// entCh <- Entry{Info: fi, Error: err, Result: ch}
+			return err
 		}
 
-		if isFile(fi) {
-			ch := make(chan interface{}, 1)
+		if isDir(fi) {
+			err = walk(subpath, done, entCh, dirCh, ch)
+			if err != nil {
+				return err
+			}
+		} else {
 			entCh <- Entry{Info: fi, Path: subpath, Result: ch}
-		} else if isDir(fi) {
-			ch := make(chan interface{}, 1)
-			entries = append(entries, ch)
-			walk(subpath, done, entCh, dirCh, ch)
 		}
 	}
 
-	dirCh <- Dir{Path: path, Entries: entries, Result: res}
+	dirCh <- Dir{Path: path, Info: info, Entries: entries, Result: res}
 	return nil
 }
```
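
The ordering trick in walk is the per-entry result channel: every file or subdirectory gets its own buffered channel, appended to entries before any worker runs, so the Dir sent on dirCh can later collect child results in directory order no matter which worker finishes first. A small self-contained sketch of that idea, with illustrative names:

```go
// Per-item result channels: results stay in submission order even
// though the goroutines may finish in any order.
package main

import "fmt"

func main() {
	words := []string{"a", "bb", "ccc"}

	var order []<-chan interface{}
	for _, w := range words {
		ch := make(chan interface{}, 1) // buffer of 1: the sender never blocks
		order = append(order, ch)
		go func(s string, res chan<- interface{}) {
			res <- len(s) // stand-in for hashing/saving an entry
		}(w, ch)
	}

	// collect in the original order, like dirWorker reading dir.Entries
	for i, ch := range order {
		fmt.Printf("%s -> %d\n", words[i], (<-ch).(int))
	}
}
```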

View file

```diff
@@ -33,7 +33,7 @@ func statPath(path string) (stats, error) {
 		if fi.IsDir() {
 			s.dirs++
-		} else if isFile(fi) {
+		} else {
 			s.files++
 		}
@@ -57,8 +57,7 @@ func TestPipelineWalker(t *testing.T) {
 	after := stats{}
 	m := sync.Mutex{}
 
-	var wg sync.WaitGroup
-	worker := func(done <-chan struct{}, entCh <-chan pipe.Entry, dirCh <-chan pipe.Dir) {
+	worker := func(wg *sync.WaitGroup, done <-chan struct{}, entCh <-chan pipe.Entry, dirCh <-chan pipe.Dir) {
 		defer wg.Done()
 
 		for {
 			select {
@@ -97,13 +96,14 @@ func TestPipelineWalker(t *testing.T) {
 		}
 	}
 
+	var wg sync.WaitGroup
 	done := make(chan struct{})
 	entCh := make(chan pipe.Entry)
 	dirCh := make(chan pipe.Dir)
 
 	for i := 0; i < *maxWorkers; i++ {
 		wg.Add(1)
-		go worker(done, entCh, dirCh)
+		go worker(&wg, done, entCh, dirCh)
 	}
 
 	resCh, err := pipe.Walk(*testWalkerPath, done, entCh, dirCh)
@@ -129,24 +129,32 @@ func BenchmarkPipelineWalker(b *testing.B) {
 	var max time.Duration
 	m := sync.Mutex{}
 
-	worker := func(wg *sync.WaitGroup, done <-chan struct{}, entCh <-chan pipe.Entry, dirCh <-chan pipe.Dir) {
+	fileWorker := func(wg *sync.WaitGroup, done <-chan struct{}, ch <-chan pipe.Entry) {
 		defer wg.Done()
 
 		for {
 			select {
-			case e, ok := <-entCh:
+			case e, ok := <-ch:
 				if !ok {
 					// channel is closed
 					return
 				}
 
-				// fmt.Printf("file: %v\n", j.Path)
 				// simulate backup
-				time.Sleep(10 * time.Millisecond)
+				//time.Sleep(10 * time.Millisecond)
 				e.Result <- true
-			case dir, ok := <-dirCh:
+			case <-done:
+				// pipeline was cancelled
+				return
+			}
+		}
+	}
+
+	dirWorker := func(wg *sync.WaitGroup, done <-chan struct{}, ch <-chan pipe.Dir) {
+		defer wg.Done()
+
+		for {
+			select {
+			case dir, ok := <-ch:
 				if !ok {
 					// channel is closed
 					return
@@ -164,8 +172,6 @@ func BenchmarkPipelineWalker(b *testing.B) {
 				if d > max {
 					max = d
 				}
-
-				// fmt.Printf("dir %v: %v\n", d, j.Path)
 				m.Unlock()
 
 				dir.Result <- true
@@ -177,15 +183,17 @@ func BenchmarkPipelineWalker(b *testing.B) {
 	}
 
 	for i := 0; i < b.N; i++ {
+		max = 0
 		done := make(chan struct{})
-		entCh := make(chan pipe.Entry, 100)
-		dirCh := make(chan pipe.Dir, 100)
+		entCh := make(chan pipe.Entry, 200)
+		dirCh := make(chan pipe.Dir, 200)
 
 		var wg sync.WaitGroup
 		b.Logf("starting %d workers", *maxWorkers)
 		for i := 0; i < *maxWorkers; i++ {
-			wg.Add(1)
-			go worker(&wg, done, entCh, dirCh)
+			wg.Add(2)
+			go dirWorker(&wg, done, dirCh)
+			go fileWorker(&wg, done, entCh)
 		}
 
 		resCh, err := pipe.Walk(*testWalkerPath, done, entCh, dirCh)
@@ -196,7 +204,7 @@ func BenchmarkPipelineWalker(b *testing.B) {
 
 		// wait for final result
 		<-resCh
-	}
 
-	b.Logf("max duration for a dir: %v", max)
+		b.Logf("max duration for a dir: %v", max)
+	}
 }
```
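
The test changes also show the sync.WaitGroup discipline this commit adopts throughout: declare the group where the goroutines are launched, call Add before each go statement, and hand the group to workers as a pointer, since copying a WaitGroup breaks its counter. A compressed sketch:

```go
// WaitGroup discipline: Add before go, pass by pointer, Wait once.
package main

import (
	"fmt"
	"sync"
)

func run(wg *sync.WaitGroup, id int) {
	defer wg.Done()
	fmt.Println("worker", id, "finished")
}

func main() {
	var wg sync.WaitGroup
	for i := 0; i < 4; i++ {
		wg.Add(2) // two goroutines per iteration, as in the benchmark
		go run(&wg, 2*i)
		go run(&wg, 2*i+1)
	}
	wg.Wait()
}
```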

View file

```diff
@@ -14,4 +14,4 @@ go build -a -tags debug -o "${BINDIR}/restic.debug" ./cmd/restic
 go build -a -o "${BINDIR}/dirdiff" ./cmd/dirdiff
 
 # run tests
-testsuite/run.sh
+testsuite/run.sh "$@"
```

View file

```diff
@@ -183,8 +183,7 @@ func (t *Tree) Insert(node *Node) error {
 		return ErrNodeAlreadyInTree
 	}
 
-	// insert blob
-	// https://code.google.com/p/go-wiki/wiki/bliceTricks
+	// https://code.google.com/p/go-wiki/wiki/SliceTricks
 	t.Nodes = append(t.Nodes, &Node{})
 	copy(t.Nodes[pos+1:], t.Nodes[pos:])
 	t.Nodes[pos] = node
```
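
The corrected link points at the SliceTricks insertion idiom Tree.Insert uses: append one zero element to grow the slice, shift the tail right with copy, then overwrite the gap. A standalone sketch with ints:

```go
// SliceTricks insert at index pos: grow, shift, assign.
package main

import "fmt"

func insert(s []int, pos, v int) []int {
	s = append(s, 0)         // grow the slice by one element
	copy(s[pos+1:], s[pos:]) // shift s[pos:] one slot to the right
	s[pos] = v               // write the new value into the gap
	return s
}

func main() {
	s := []int{1, 2, 4, 5}
	fmt.Println(insert(s, 2, 3)) // [1 2 3 4 5]
}
```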