Merge pull request #3971 from MichaelEischer/parallel-list

Unify ForAllIndex/Snapshot/Lock functions

Commit aa77702e49
6 changed files with 108 additions and 166 deletions
cmd/restic/cmd_debug.go
@@ -13,6 +13,7 @@ import (
 	"os"
 	"runtime"
 	"sort"
+	"sync"
 	"time"
 
 	"github.com/klauspost/compress/zstd"
@@ -105,7 +106,8 @@ type Blob struct {
 
 func printPacks(ctx context.Context, repo *repository.Repository, wr io.Writer) error {
 
-	return repo.List(ctx, restic.PackFile, func(id restic.ID, size int64) error {
+	var m sync.Mutex
+	return restic.ParallelList(ctx, repo.Backend(), restic.PackFile, repo.Connections(), func(ctx context.Context, id restic.ID, size int64) error {
 		blobs, _, err := repo.ListPack(ctx, id, size)
 		if err != nil {
 			Warnf("error for pack %v: %v\n", id.Str(), err)
@@ -125,6 +127,8 @@ func printPacks(ctx context.Context, repo *repository.Repository, wr io.Writer)
 			}
 		}
 
+		m.Lock()
+		defer m.Unlock()
 		return prettyPrintJSON(wr, p)
 	})
 }

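Because ParallelList runs the callback on several goroutines at once, printPacks now takes a mutex before writing JSON to wr. A minimal standalone sketch of that serialization pattern (plain Go plus golang.org/x/sync; none of the restic types are used):

package main

import (
	"fmt"
	"sync"

	"golang.org/x/sync/errgroup"
)

func main() {
	var m sync.Mutex
	var g errgroup.Group

	for i := 0; i < 4; i++ {
		i := i // capture loop variable (needed before Go 1.22)
		g.Go(func() error {
			// Work happens concurrently; only the access to the
			// shared output stream is serialized by the mutex.
			m.Lock()
			defer m.Unlock()
			fmt.Println("result from worker", i)
			return nil
		})
	}
	if err := g.Wait(); err != nil {
		fmt.Println("error:", err)
	}
}
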
cmd/restic/cmd_key.go
@@ -6,6 +6,7 @@ import (
 	"io/ioutil"
 	"os"
 	"strings"
+	"sync"
 
 	"github.com/restic/restic/internal/errors"
 	"github.com/restic/restic/internal/repository"
@@ -56,9 +57,10 @@ func listKeys(ctx context.Context, s *repository.Repository, gopts GlobalOptions
 		Created string `json:"created"`
 	}
 
+	var m sync.Mutex
 	var keys []keyInfo
 
-	err := s.List(ctx, restic.KeyFile, func(id restic.ID, size int64) error {
+	err := restic.ParallelList(ctx, s.Backend(), restic.KeyFile, s.Connections(), func(ctx context.Context, id restic.ID, size int64) error {
 		k, err := repository.LoadKey(ctx, s, id)
 		if err != nil {
 			Warnf("LoadKey() failed: %v\n", err)
@@ -73,6 +75,8 @@ func listKeys(ctx context.Context, s *repository.Repository, gopts GlobalOptions
 			Created: k.Created.Local().Format(TimeFormat),
 		}
 
+		m.Lock()
+		defer m.Unlock()
 		keys = append(keys, key)
 		return nil
 	})

internal/repository/index.go
@@ -5,9 +5,7 @@ import (
 	"runtime"
 	"sync"
 
-	"github.com/restic/restic/internal/debug"
 	"github.com/restic/restic/internal/restic"
-	"golang.org/x/sync/errgroup"
 )
 
 // ForAllIndexes loads all index files in parallel and calls the given callback.
@@ -16,68 +14,23 @@
 func ForAllIndexes(ctx context.Context, repo restic.Repository,
 	fn func(id restic.ID, index *Index, oldFormat bool, err error) error) error {
 
-	debug.Log("Start")
-
-	type FileInfo struct {
-		restic.ID
-		Size int64
-	}
-
-	var m sync.Mutex
-
-	// track spawned goroutines using wg, create a new context which is
-	// cancelled as soon as an error occurs.
-	wg, ctx := errgroup.WithContext(ctx)
-
-	ch := make(chan FileInfo)
-	// send list of index files through ch, which is closed afterwards
-	wg.Go(func() error {
-		defer close(ch)
-		return repo.List(ctx, restic.IndexFile, func(id restic.ID, size int64) error {
-			select {
-			case <-ctx.Done():
-				return ctx.Err()
-			case ch <- FileInfo{id, size}:
-			}
-			return nil
-		})
-	})
-
-	// a worker receives an index ID from ch, loads the index, and sends it to indexCh
-	worker := func() error {
-		var buf []byte
-		for fi := range ch {
-			debug.Log("worker got file %v", fi.ID.Str())
-			var err error
-			var idx *Index
-			oldFormat := false
-
-			if cap(buf) < int(fi.Size) {
-				// overallocate a bit
-				buf = make([]byte, fi.Size+128*1024)
-			}
-			buf, err = repo.LoadUnpacked(ctx, restic.IndexFile, fi.ID, buf[:0])
-			if err == nil {
-				idx, oldFormat, err = DecodeIndex(buf, fi.ID)
-			}
-
-			m.Lock()
-			err = fn(fi.ID, idx, oldFormat, err)
-			m.Unlock()
-			if err != nil {
-				return err
-			}
-		}
-		return nil
-	}
-
 	// decoding an index can take quite some time such that this can be both CPU- or IO-bound
 	// as the whole index is kept in memory anyways, a few workers too much don't matter
-	workerCount := int(repo.Connections()) + runtime.GOMAXPROCS(0)
-	// run workers on ch
-	for i := 0; i < workerCount; i++ {
-		wg.Go(worker)
-	}
+	workerCount := repo.Connections() + uint(runtime.GOMAXPROCS(0))
 
-	return wg.Wait()
+	var m sync.Mutex
+	return restic.ParallelList(ctx, repo.Backend(), restic.IndexFile, workerCount, func(ctx context.Context, id restic.ID, size int64) error {
+		var err error
+		var idx *Index
+		oldFormat := false
+
+		buf, err := repo.LoadUnpacked(ctx, restic.IndexFile, id, nil)
+		if err == nil {
+			idx, oldFormat, err = DecodeIndex(buf, id)
+		}
+
+		m.Lock()
+		defer m.Unlock()
+		return fn(id, idx, oldFormat, err)
+	})
 }

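The new worker count adds the backend connection limit (the IO budget) to GOMAXPROCS (the CPU budget), matching the comment that index decoding is both IO- and CPU-bound. A toy illustration of the sizing rule; the value 5 merely stands in for repo.Connections() and is an assumed default, not taken from the diff:

package main

import (
	"fmt"
	"runtime"
)

func main() {
	// connections stands in for repo.Connections(); a backend
	// connection count of 5 is assumed here for illustration.
	connections := uint(5)

	// Enough workers to keep every connection busy while decoding
	// can still saturate the available CPUs.
	workerCount := connections + uint(runtime.GOMAXPROCS(0))
	fmt.Println("index workers:", workerCount)
}
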
internal/restic/lock.go
@@ -7,12 +7,12 @@ import (
 	"os/signal"
 	"os/user"
 	"sync"
+	"sync/atomic"
 	"syscall"
 	"testing"
 	"time"
 
 	"github.com/restic/restic/internal/errors"
-	"golang.org/x/sync/errgroup"
 
 	"github.com/restic/restic/internal/debug"
 )
@@ -302,15 +302,15 @@ func RemoveStaleLocks(ctx context.Context, repo Repository) (uint, error) {
 
 // RemoveAllLocks removes all locks forcefully.
 func RemoveAllLocks(ctx context.Context, repo Repository) (uint, error) {
-	var processed uint
-	err := repo.List(ctx, LockFile, func(id ID, size int64) error {
+	var processed uint32
+	err := ParallelList(ctx, repo.Backend(), LockFile, repo.Connections(), func(ctx context.Context, id ID, size int64) error {
 		err := repo.Backend().Remove(ctx, Handle{Type: LockFile, Name: id.String()})
 		if err == nil {
-			processed++
+			atomic.AddUint32(&processed, 1)
 		}
 		return err
 	})
-	return processed, err
+	return uint(processed), err
 }
 
 // ForAllLocks reads all locks in parallel and calls the given callback.
@@ -320,50 +320,15 @@ func RemoveAllLocks(ctx context.Context, repo Repository) (uint, error) {
 func ForAllLocks(ctx context.Context, repo Repository, excludeID *ID, fn func(ID, *Lock, error) error) error {
 	var m sync.Mutex
 
-	// track spawned goroutines using wg, create a new context which is
-	// cancelled as soon as an error occurs.
-	wg, ctx := errgroup.WithContext(ctx)
-
-	ch := make(chan ID)
-
-	// send list of lock files through ch, which is closed afterwards
-	wg.Go(func() error {
-		defer close(ch)
-		return repo.List(ctx, LockFile, func(id ID, size int64) error {
-			if excludeID != nil && id.Equal(*excludeID) {
-				return nil
-			}
-
-			select {
-			case <-ctx.Done():
-				return nil
-			case ch <- id:
-			}
-			return nil
-		})
-	})
-
-	// a worker receives an snapshot ID from ch, loads the snapshot
-	// and runs fn with id, the snapshot and the error
-	worker := func() error {
-		for id := range ch {
-			debug.Log("load lock %v", id)
-			lock, err := LoadLock(ctx, repo, id)
-
-			m.Lock()
-			err = fn(id, lock, err)
-			m.Unlock()
-			if err != nil {
-				return err
-			}
-		}
-		return nil
-	}
-
 	// For locks decoding is nearly for free, thus just assume were only limited by IO
-	for i := 0; i < int(repo.Connections()); i++ {
-		wg.Go(worker)
-	}
+	return ParallelList(ctx, repo.Backend(), LockFile, repo.Connections(), func(ctx context.Context, id ID, size int64) error {
+		if excludeID != nil && id.Equal(*excludeID) {
+			return nil
+		}
+		lock, err := LoadLock(ctx, repo, id)
 
-	return wg.Wait()
+		m.Lock()
+		defer m.Unlock()
+		return fn(id, lock, err)
+	})
 }

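Since the ParallelList callback executes concurrently, the previous processed++ would be a data race; RemoveAllLocks therefore switches to a uint32 updated through sync/atomic and converts back to uint on return. A standalone sketch of that pattern:

package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

func main() {
	var processed uint32
	var wg sync.WaitGroup

	for i := 0; i < 8; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			// Safe to call from many goroutines; a plain
			// processed++ here would race.
			atomic.AddUint32(&processed, 1)
		}()
	}
	wg.Wait()
	fmt.Println(uint(processed), "items processed")
}
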
internal/restic/parallel.go (new file, 59 lines)
@@ -0,0 +1,59 @@
+package restic
+
+import (
+	"context"
+
+	"github.com/restic/restic/internal/debug"
+	"golang.org/x/sync/errgroup"
+)
+
+func ParallelList(ctx context.Context, r Lister, t FileType, parallelism uint, fn func(context.Context, ID, int64) error) error {
+
+	type FileIDInfo struct {
+		ID
+		Size int64
+	}
+
+	// track spawned goroutines using wg, create a new context which is
+	// cancelled as soon as an error occurs.
+	wg, ctx := errgroup.WithContext(ctx)
+
+	ch := make(chan FileIDInfo)
+	// send list of index files through ch, which is closed afterwards
+	wg.Go(func() error {
+		defer close(ch)
+		return r.List(ctx, t, func(fi FileInfo) error {
+			id, err := ParseID(fi.Name)
+			if err != nil {
+				debug.Log("unable to parse %v as an ID", fi.Name)
+				return nil
+			}
+
+			select {
+			case <-ctx.Done():
+				return nil
+			case ch <- FileIDInfo{id, fi.Size}:
+			}
+			return nil
+		})
+	})
+
+	// a worker receives an index ID from ch, loads the index, and sends it to indexCh
+	worker := func() error {
+		for fi := range ch {
+			debug.Log("worker got file %v", fi.ID.Str())
+			err := fn(ctx, fi.ID, fi.Size)
+			if err != nil {
+				return err
+			}
+		}
+		return nil
+	}
+
+	// run workers on ch
+	for i := uint(0); i < parallelism; i++ {
+		wg.Go(worker)
+	}
+
+	return wg.Wait()
+}

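For reference, a sketch of how callers are expected to use the new helper; listSnapshotIDs is a hypothetical name, and since ParallelList lives in a restic-internal package, this only builds inside the restic source tree:

package example

import (
	"context"
	"fmt"
	"sync"

	"github.com/restic/restic/internal/restic"
)

// listSnapshotIDs prints every snapshot file in the repository, mirroring
// the pattern used by the refactored callers above.
func listSnapshotIDs(ctx context.Context, repo restic.Repository) error {
	var m sync.Mutex
	return restic.ParallelList(ctx, repo.Backend(), restic.SnapshotFile, repo.Connections(),
		func(ctx context.Context, id restic.ID, size int64) error {
			// The callback runs on up to repo.Connections() goroutines
			// at once, so shared state (stdout here) is guarded by the
			// caller, not by ParallelList itself.
			m.Lock()
			defer m.Unlock()
			fmt.Printf("%v: %d bytes\n", id.Str(), size)
			return nil
		})
}
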
internal/restic/snapshot.go
@@ -8,8 +8,6 @@ import (
 	"sync"
 	"time"
 
-	"golang.org/x/sync/errgroup"
-
 	"github.com/restic/restic/internal/debug"
 )
 
@@ -82,58 +80,17 @@ func SaveSnapshot(ctx context.Context, repo SaverUnpacked, sn *Snapshot) (ID, er
 func ForAllSnapshots(ctx context.Context, be Lister, loader LoaderUnpacked, excludeIDs IDSet, fn func(ID, *Snapshot, error) error) error {
 	var m sync.Mutex
 
-	// track spawned goroutines using wg, create a new context which is
-	// cancelled as soon as an error occurs.
-	wg, ctx := errgroup.WithContext(ctx)
-
-	ch := make(chan ID)
-
-	// send list of snapshot files through ch, which is closed afterwards
-	wg.Go(func() error {
-		defer close(ch)
-		return be.List(ctx, SnapshotFile, func(fi FileInfo) error {
-			id, err := ParseID(fi.Name)
-			if err != nil {
-				debug.Log("unable to parse %v as an ID", fi.Name)
-				return nil
-			}
-
-			if excludeIDs.Has(id) {
-				return nil
-			}
-
-			select {
-			case <-ctx.Done():
-				return nil
-			case ch <- id:
-			}
-			return nil
-		})
-	})
-
-	// a worker receives an snapshot ID from ch, loads the snapshot
-	// and runs fn with id, the snapshot and the error
-	worker := func() error {
-		for id := range ch {
-			debug.Log("load snapshot %v", id)
-			sn, err := LoadSnapshot(ctx, loader, id)
-
-			m.Lock()
-			err = fn(id, sn, err)
-			m.Unlock()
-			if err != nil {
-				return err
-			}
-		}
-		return nil
-	}
-
 	// For most snapshots decoding is nearly for free, thus just assume were only limited by IO
-	for i := 0; i < int(loader.Connections()); i++ {
-		wg.Go(worker)
-	}
+	return ParallelList(ctx, be, SnapshotFile, loader.Connections(), func(ctx context.Context, id ID, size int64) error {
+		if excludeIDs.Has(id) {
+			return nil
+		}
 
-	return wg.Wait()
+		sn, err := LoadSnapshot(ctx, loader, id)
+		m.Lock()
+		defer m.Unlock()
+		return fn(id, sn, err)
+	})
 }
 
 func (sn Snapshot) String() string {