Save multiple data blobs in parallel

This commit is contained in:
Alexander Neumann 2014-11-22 22:05:39 +01:00
parent d1e4431514
commit d11688f242

View file

@ -13,6 +13,7 @@ import (
const ( const (
maxConcurrentFiles = 32 maxConcurrentFiles = 32
maxConcurrentBlobs = 32
) )
type Archiver struct { type Archiver struct {
@ -23,6 +24,7 @@ type Archiver struct {
bl *BlobList // blobs used for the current snapshot bl *BlobList // blobs used for the current snapshot
fileToken chan struct{} fileToken chan struct{}
blobToken chan struct{}
Stats Stats Stats Stats
@ -48,13 +50,18 @@ func NewArchiver(be backend.Server, key *Key) (*Archiver, error) {
be: be, be: be,
key: key, key: key,
fileToken: make(chan struct{}, maxConcurrentFiles), fileToken: make(chan struct{}, maxConcurrentFiles),
blobToken: make(chan struct{}, maxConcurrentBlobs),
} }
// fill file token // fill file and blob token
for i := 0; i < maxConcurrentFiles; i++ { for i := 0; i < maxConcurrentFiles; i++ {
arch.fileToken <- struct{}{} arch.fileToken <- struct{}{}
} }
for i := 0; i < maxConcurrentBlobs; i++ {
arch.blobToken <- struct{}{}
}
// abort on all errors // abort on all errors
arch.Error = func(string, os.FileInfo, error) error { return err } arch.Error = func(string, os.FileInfo, error) error { return err }
// allow all files // allow all files
@ -138,6 +145,7 @@ func (arch *Archiver) SaveFile(node *Node) error {
} else { } else {
// else store all chunks // else store all chunks
chnker := chunker.New(file) chnker := chunker.New(file)
chans := [](<-chan Blob){}
for { for {
chunk, err := chnker.Next() chunk, err := chnker.Next()
@ -149,14 +157,28 @@ func (arch *Archiver) SaveFile(node *Node) error {
return err return err
} }
blob, err := arch.ch.Save(backend.Data, chunk.Data) // acquire token, start goroutine to save chunk
if err != nil { token := <-arch.blobToken
return err resCh := make(chan Blob, 1)
}
arch.saveUpdate(Stats{Bytes: blob.Size}) go func(ch chan<- Blob) {
blob, err := arch.ch.Save(backend.Data, chunk.Data)
// TODO handle error
if err != nil {
panic(err)
}
blobs = append(blobs, blob) arch.saveUpdate(Stats{Bytes: blob.Size})
arch.blobToken <- token
ch <- blob
}(resCh)
chans = append(chans, resCh)
}
blobs = []Blob{}
for _, ch := range chans {
blobs = append(blobs, <-ch)
} }
} }