read packs concurrently

This commit is contained in:
Alexander Neumann 2015-12-06 17:09:06 +01:00
parent 43a23f91a6
commit 0e66a66bce
2 changed files with 46 additions and 10 deletions

View file

@ -693,18 +693,41 @@ func checkPack(r *repository.Repository, id backend.ID) error {
func (c *Checker) ReadData(errChan chan<- error, done <-chan struct{}) {
defer close(errChan)
for packID := range c.repo.List(backend.Data, done) {
debug.Log("Checker.ReadData", "checking pack %v", packID.Str())
worker := func(wg *sync.WaitGroup, in <-chan backend.ID) {
defer wg.Done()
for {
var id backend.ID
var ok bool
err := checkPack(c.repo, packID)
if err == nil {
continue
}
select {
case <-done:
return
case id, ok = <-in:
if !ok {
return
}
}
select {
case <-done:
return
case errChan <- err:
err := checkPack(c.repo, id)
if err == nil {
continue
}
select {
case <-done:
return
case errChan <- err:
}
}
}
ch := c.repo.List(backend.Data, done)
var wg sync.WaitGroup
for i := 0; i < defaultParallelism; i++ {
wg.Add(1)
go worker(&wg, ch)
}
wg.Wait()
}

View file

@ -109,6 +109,19 @@ func (cmd CmdCheck) Execute(args []string) error {
}
}
if cmd.ReadData {
cmd.global.Verbosef("reading all data\n")
errChan := make(chan error)
go chkr.ReadData(errChan, done)
for err := range errChan {
errorsFound = true
fmt.Fprintf(os.Stderr, "%v\n", err)
}
}
if errorsFound {
return errors.New("repository contains errors")
}