restic: Unify code to load Index/Lock/Snapshot

This commit is contained in:
Michael Eischer 2022-10-15 17:24:47 +02:00
parent 8e2695be0b
commit ae45f3b04f
4 changed files with 93 additions and 160 deletions

View file

@ -5,9 +5,7 @@ import (
"runtime" "runtime"
"sync" "sync"
"github.com/restic/restic/internal/debug"
"github.com/restic/restic/internal/restic" "github.com/restic/restic/internal/restic"
"golang.org/x/sync/errgroup"
) )
// ForAllIndexes loads all index files in parallel and calls the given callback. // ForAllIndexes loads all index files in parallel and calls the given callback.
@ -16,68 +14,23 @@ import (
func ForAllIndexes(ctx context.Context, repo restic.Repository, func ForAllIndexes(ctx context.Context, repo restic.Repository,
fn func(id restic.ID, index *Index, oldFormat bool, err error) error) error { fn func(id restic.ID, index *Index, oldFormat bool, err error) error) error {
debug.Log("Start") // decoding an index can take quite some time such that this can be both CPU- or IO-bound
// as the whole index is kept in memory anyways, a few workers too much don't matter
type FileInfo struct { workerCount := repo.Connections() + uint(runtime.GOMAXPROCS(0))
restic.ID
Size int64
}
var m sync.Mutex var m sync.Mutex
return restic.ParallelList(ctx, repo.Backend(), restic.IndexFile, workerCount, func(ctx context.Context, id restic.ID, size int64) error {
// track spawned goroutines using wg, create a new context which is
// cancelled as soon as an error occurs.
wg, ctx := errgroup.WithContext(ctx)
ch := make(chan FileInfo)
// send list of index files through ch, which is closed afterwards
wg.Go(func() error {
defer close(ch)
return repo.List(ctx, restic.IndexFile, func(id restic.ID, size int64) error {
select {
case <-ctx.Done():
return ctx.Err()
case ch <- FileInfo{id, size}:
}
return nil
})
})
// a worker receives an index ID from ch, loads the index, and sends it to indexCh
worker := func() error {
var buf []byte
for fi := range ch {
debug.Log("worker got file %v", fi.ID.Str())
var err error var err error
var idx *Index var idx *Index
oldFormat := false oldFormat := false
if cap(buf) < int(fi.Size) { buf, err := repo.LoadUnpacked(ctx, restic.IndexFile, id, nil)
// overallocate a bit
buf = make([]byte, fi.Size+128*1024)
}
buf, err = repo.LoadUnpacked(ctx, restic.IndexFile, fi.ID, buf[:0])
if err == nil { if err == nil {
idx, oldFormat, err = DecodeIndex(buf, fi.ID) idx, oldFormat, err = DecodeIndex(buf, id)
} }
m.Lock() m.Lock()
err = fn(fi.ID, idx, oldFormat, err) defer m.Unlock()
m.Unlock() return fn(id, idx, oldFormat, err)
if err != nil { })
return err
}
}
return nil
}
// decoding an index can take quite some time such that this can be both CPU- or IO-bound
// as the whole index is kept in memory anyways, a few workers too much don't matter
workerCount := int(repo.Connections()) + runtime.GOMAXPROCS(0)
// run workers on ch
for i := 0; i < workerCount; i++ {
wg.Go(worker)
}
return wg.Wait()
} }

View file

@ -12,7 +12,6 @@ import (
"time" "time"
"github.com/restic/restic/internal/errors" "github.com/restic/restic/internal/errors"
"golang.org/x/sync/errgroup"
"github.com/restic/restic/internal/debug" "github.com/restic/restic/internal/debug"
) )
@ -320,50 +319,15 @@ func RemoveAllLocks(ctx context.Context, repo Repository) (uint, error) {
func ForAllLocks(ctx context.Context, repo Repository, excludeID *ID, fn func(ID, *Lock, error) error) error { func ForAllLocks(ctx context.Context, repo Repository, excludeID *ID, fn func(ID, *Lock, error) error) error {
var m sync.Mutex var m sync.Mutex
// track spawned goroutines using wg, create a new context which is // For locks decoding is nearly for free, thus just assume were only limited by IO
// cancelled as soon as an error occurs. return ParallelList(ctx, repo.Backend(), LockFile, repo.Connections(), func(ctx context.Context, id ID, size int64) error {
wg, ctx := errgroup.WithContext(ctx)
ch := make(chan ID)
// send list of lock files through ch, which is closed afterwards
wg.Go(func() error {
defer close(ch)
return repo.List(ctx, LockFile, func(id ID, size int64) error {
if excludeID != nil && id.Equal(*excludeID) { if excludeID != nil && id.Equal(*excludeID) {
return nil return nil
} }
select {
case <-ctx.Done():
return nil
case ch <- id:
}
return nil
})
})
// a worker receives an snapshot ID from ch, loads the snapshot
// and runs fn with id, the snapshot and the error
worker := func() error {
for id := range ch {
debug.Log("load lock %v", id)
lock, err := LoadLock(ctx, repo, id) lock, err := LoadLock(ctx, repo, id)
m.Lock() m.Lock()
err = fn(id, lock, err) defer m.Unlock()
m.Unlock() return fn(id, lock, err)
if err != nil { })
return err
}
}
return nil
}
// For locks decoding is nearly for free, thus just assume were only limited by IO
for i := 0; i < int(repo.Connections()); i++ {
wg.Go(worker)
}
return wg.Wait()
} }

View file

@ -0,0 +1,59 @@
package restic
import (
"context"
"github.com/restic/restic/internal/debug"
"golang.org/x/sync/errgroup"
)
func ParallelList(ctx context.Context, r Lister, t FileType, parallelism uint, fn func(context.Context, ID, int64) error) error {
type FileIDInfo struct {
ID
Size int64
}
// track spawned goroutines using wg, create a new context which is
// cancelled as soon as an error occurs.
wg, ctx := errgroup.WithContext(ctx)
ch := make(chan FileIDInfo)
// send list of index files through ch, which is closed afterwards
wg.Go(func() error {
defer close(ch)
return r.List(ctx, t, func(fi FileInfo) error {
id, err := ParseID(fi.Name)
if err != nil {
debug.Log("unable to parse %v as an ID", fi.Name)
return nil
}
select {
case <-ctx.Done():
return nil
case ch <- FileIDInfo{id, fi.Size}:
}
return nil
})
})
// a worker receives an index ID from ch, loads the index, and sends it to indexCh
worker := func() error {
for fi := range ch {
debug.Log("worker got file %v", fi.ID.Str())
err := fn(ctx, fi.ID, fi.Size)
if err != nil {
return err
}
}
return nil
}
// run workers on ch
for i := uint(0); i < parallelism; i++ {
wg.Go(worker)
}
return wg.Wait()
}

View file

@ -8,8 +8,6 @@ import (
"sync" "sync"
"time" "time"
"golang.org/x/sync/errgroup"
"github.com/restic/restic/internal/debug" "github.com/restic/restic/internal/debug"
) )
@ -82,58 +80,17 @@ func SaveSnapshot(ctx context.Context, repo SaverUnpacked, sn *Snapshot) (ID, er
func ForAllSnapshots(ctx context.Context, be Lister, loader LoaderUnpacked, excludeIDs IDSet, fn func(ID, *Snapshot, error) error) error { func ForAllSnapshots(ctx context.Context, be Lister, loader LoaderUnpacked, excludeIDs IDSet, fn func(ID, *Snapshot, error) error) error {
var m sync.Mutex var m sync.Mutex
// track spawned goroutines using wg, create a new context which is // For most snapshots decoding is nearly for free, thus just assume were only limited by IO
// cancelled as soon as an error occurs. return ParallelList(ctx, be, SnapshotFile, loader.Connections(), func(ctx context.Context, id ID, size int64) error {
wg, ctx := errgroup.WithContext(ctx)
ch := make(chan ID)
// send list of snapshot files through ch, which is closed afterwards
wg.Go(func() error {
defer close(ch)
return be.List(ctx, SnapshotFile, func(fi FileInfo) error {
id, err := ParseID(fi.Name)
if err != nil {
debug.Log("unable to parse %v as an ID", fi.Name)
return nil
}
if excludeIDs.Has(id) { if excludeIDs.Has(id) {
return nil return nil
} }
select {
case <-ctx.Done():
return nil
case ch <- id:
}
return nil
})
})
// a worker receives an snapshot ID from ch, loads the snapshot
// and runs fn with id, the snapshot and the error
worker := func() error {
for id := range ch {
debug.Log("load snapshot %v", id)
sn, err := LoadSnapshot(ctx, loader, id) sn, err := LoadSnapshot(ctx, loader, id)
m.Lock() m.Lock()
err = fn(id, sn, err) defer m.Unlock()
m.Unlock() return fn(id, sn, err)
if err != nil { })
return err
}
}
return nil
}
// For most snapshots decoding is nearly for free, thus just assume were only limited by IO
for i := 0; i < int(loader.Connections()); i++ {
wg.Go(worker)
}
return wg.Wait()
} }
func (sn Snapshot) String() string { func (sn Snapshot) String() string {