Merge pull request #3611 from MichaelEischer/auto-concurrency

Adjust worker goroutines to number of backend connections
This commit is contained in:
MichaelEischer 2022-07-03 12:33:32 +02:00 committed by GitHub
commit b6a38d43b3
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
18 changed files with 93 additions and 40 deletions

View file

@ -0,0 +1,18 @@
Enhancement: Adapt IO concurrency based on backend connections
Many commands used hard-coded limits for the number of concurrent operations,
so increasing the number of connections used by a backend could not yield any
speed improvement.
These limits have been replaced by the configured number of backend
connections, which can be controlled using the `-o <backend-name>.connections=5`
option. Commands will then automatically scale their parallelism accordingly.
To limit the number of CPU cores used by restic, you can set the environment
variable `GOMAXPROCS` accordingly. For example, to use a single CPU core, use
`GOMAXPROCS=1`.
https://github.com/restic/restic/issues/2162
https://github.com/restic/restic/issues/1467
https://github.com/restic/restic/pull/3611
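
The change boils down to one recurring pattern in the diffs below: worker pools are
sized from the repository's configured connection count (plus `runtime.GOMAXPROCS(0)`
for stages that are also CPU-bound) instead of a hard-coded constant. A minimal sketch
of that pattern, assuming a simplified `Repository` interface and a hypothetical
`processItem` callback rather than restic's actual types:

```go
package sketch

import (
	"context"

	"golang.org/x/sync/errgroup"
)

// Repository mirrors the Connections accessor introduced in this PR; the item
// type and processItem callback below are illustrative only.
type Repository interface {
	// Connections returns the maximum number of concurrent backend operations.
	Connections() uint
}

// processAll runs processItem over all items with one worker goroutine per
// backend connection, the shape used for the IO-bound loops in this change.
func processAll(ctx context.Context, repo Repository, items []string,
	processItem func(context.Context, string) error) error {

	wg, ctx := errgroup.WithContext(ctx)
	ch := make(chan string)

	// feeder: queue all items, then close the channel so the workers terminate
	wg.Go(func() error {
		defer close(ch)
		for _, item := range items {
			select {
			case ch <- item:
			case <-ctx.Done():
				return ctx.Err()
			}
		}
		return nil
	})

	// IO-bound work, so the worker count follows the configured backend connections
	for i := 0; i < int(repo.Connections()); i++ {
		wg.Go(func() error {
			for item := range ch {
				if err := processItem(ctx, item); err != nil {
					return err
				}
			}
			return nil
		})
	}

	return wg.Wait()
}
```

Purely IO-bound stages (deleting files, streaming packs) use `Connections()` directly,
while stages that also decode data (indexes, trees) add `GOMAXPROCS(0)` on top, as the
hunks below show.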

View file

@ -18,8 +18,6 @@ func DeleteFilesChecked(gopts GlobalOptions, repo restic.Repository, fileList re
return deleteFiles(gopts, false, repo, fileList, fileType)
}
const numDeleteWorkers = 8
// deleteFiles deletes the given fileList of fileType in parallel
// if ignoreError=true, it will print a warning if there was an error, else it will abort.
func deleteFiles(gopts GlobalOptions, ignoreError bool, repo restic.Repository, fileList restic.IDSet, fileType restic.FileType) error {
@ -40,7 +38,9 @@ func deleteFiles(gopts GlobalOptions, ignoreError bool, repo restic.Repository,
bar := newProgressMax(!gopts.JSON && !gopts.Quiet, uint64(totalCount), "files deleted")
defer bar.Done()
for i := 0; i < numDeleteWorkers; i++ {
// deleting files is IO-bound
workerCount := repo.Connections()
for i := 0; i < int(workerCount); i++ {
wg.Go(func() error {
for id := range fileChan {
h := restic.Handle{Type: fileType, Name: id.String()}

View file

@ -30,6 +30,15 @@ to increase the number of connections. Please be aware that this increases the resource
consumption of restic and that too high a connection count *will degrade performance*.
CPU Usage
=========
By default, restic uses all available CPU cores. You can set the environment variable
`GOMAXPROCS` to limit the number of CPU cores restic uses. For example, to use a single
CPU core, use `GOMAXPROCS=1`. Limiting the number of usable CPU cores can slightly reduce
the memory usage of restic.
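
The CPU-bound worker pools mentioned above are sized from the same limit that
`GOMAXPROCS` controls. As a small standalone sketch (not part of restic), the
effective value can be inspected from Go; `runtime.GOMAXPROCS(0)` only queries
the current limit without changing it:

```go
package main

import (
	"fmt"
	"runtime"
)

func main() {
	// GOMAXPROCS(0) reports the current limit (from the GOMAXPROCS environment
	// variable, if set) without modifying it.
	fmt.Println("usable CPU cores:", runtime.GOMAXPROCS(0))
}
```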
Compression
===========

View file

@ -7,6 +7,7 @@ import (
"fmt"
"io"
"io/ioutil"
"runtime"
"sort"
"sync"
@ -55,8 +56,6 @@ func New(repo restic.Repository, trackUnused bool) *Checker {
return c
}
const defaultParallelism = 5
// ErrDuplicatePacks is returned when a pack is found in more than one index.
type ErrDuplicatePacks struct {
PackID restic.ID
@ -322,7 +321,9 @@ func (c *Checker) Structure(ctx context.Context, p *progress.Counter, errChan ch
}, p)
defer close(errChan)
for i := 0; i < defaultParallelism; i++ {
// The checkTree worker only processes already decoded trees and is thus CPU-bound
workerCount := runtime.GOMAXPROCS(0)
for i := 0; i < workerCount; i++ {
wg.Go(func() error {
c.checkTreeWorker(ctx, treeStream, errChan)
return nil
@ -574,8 +575,10 @@ func (c *Checker) ReadPacks(ctx context.Context, packs map[restic.ID]int64, p *p
}
ch := make(chan checkTask)
// as packs are streamed the concurrency is limited by IO
workerCount := int(c.repo.Connections())
// run workers
for i := 0; i < defaultParallelism; i++ {
for i := 0; i < workerCount; i++ {
g.Go(func() error {
// create a buffer that is large enough to be reused by repository.StreamPack
// this ensures that we can read the pack header later on

View file

@ -2,6 +2,7 @@ package repository
import (
"context"
"runtime"
"sync"
"github.com/restic/restic/internal/debug"
@ -9,8 +10,6 @@ import (
"golang.org/x/sync/errgroup"
)
const loadIndexParallelism = 5
// ForAllIndexes loads all index files in parallel and calls the given callback.
// It is guaranteed that the function is not run concurrently. If the callback
// returns an error, this function is cancelled and also returns that error.
@ -68,8 +67,11 @@ func ForAllIndexes(ctx context.Context, repo restic.Repository,
return nil
}
// decoding an index can take quite some time, so this can be either CPU- or IO-bound
// as the whole index is kept in memory anyway, a few extra workers don't matter much
workerCount := int(repo.Connections()) + runtime.GOMAXPROCS(0)
// run workers on ch
for i := 0; i < loadIndexParallelism; i++ {
for i := 0; i < workerCount; i++ {
wg.Go(worker)
}

View file

@ -4,6 +4,7 @@ import (
"bytes"
"context"
"fmt"
"runtime"
"sync"
"github.com/restic/restic/internal/debug"
@ -291,8 +292,6 @@ func (mi *MasterIndex) MergeFinalIndexes() error {
return nil
}
const saveIndexParallelism = 4
// Save saves all known indexes to index files, leaving out any
// packs whose ID is contained in packBlacklist from finalized indexes.
// The new index contains the IDs of all known indexes in the "supersedes"
@ -376,8 +375,10 @@ func (mi *MasterIndex) Save(ctx context.Context, repo restic.SaverUnpacked, pack
return nil
}
// encoding an index can take quite some time, so this can be either CPU- or IO-bound
workerCount := int(repo.Connections()) + runtime.GOMAXPROCS(0)
// run workers on ch
for i := 0; i < saveIndexParallelism; i++ {
for i := 0; i < workerCount; i++ {
wg.Go(worker)
}
err = wg.Wait()

View file

@ -12,8 +12,6 @@ import (
"golang.org/x/sync/errgroup"
)
const numRepackWorkers = 8
// Repack takes a list of packs together with a list of blobs contained in
// these packs. Each pack is loaded and the blobs listed in keepBlobs is saved
// into a new pack. Returned is the list of obsolete packs which can then
@ -107,11 +105,10 @@ func repack(ctx context.Context, repo restic.Repository, dstRepo restic.Reposito
return nil
}
connectionLimit := dstRepo.Backend().Connections() - 1
if connectionLimit > numRepackWorkers {
connectionLimit = numRepackWorkers
}
for i := 0; i < int(connectionLimit); i++ {
// as packs are streamed the concurrency is limited by IO
// reduce by one to ensure that uploading is always possible
repackWorkerCount := int(repo.Connections() - 1)
for i := 0; i < repackWorkerCount; i++ {
wg.Go(worker)
}

View file

@ -558,6 +558,10 @@ func (r *Repository) Backend() restic.Backend {
return r.be
}
func (r *Repository) Connections() uint {
return r.be.Connections()
}
// Index returns the currently used MasterIndex.
func (r *Repository) Index() restic.MasterIndex {
return r.idx
@ -606,8 +610,6 @@ func (r *Repository) LoadIndex(ctx context.Context) error {
return r.PrepareCache()
}
const listPackParallelism = 10
// CreateIndexFromPacks creates a new index by reading all given pack files (with sizes).
// The index is added to the MasterIndex but not marked as finalized.
// Returned is the list of pack files which could not be read.
@ -656,8 +658,10 @@ func (r *Repository) CreateIndexFromPacks(ctx context.Context, packsize map[rest
return nil
}
// decoding the pack header is usually quite fast, thus we are primarily IO-bound
workerCount := int(r.Connections())
// run workers on ch
for i := 0; i < listPackParallelism; i++ {
for i := 0; i < workerCount; i++ {
wg.Go(worker)
}

View file

@ -12,6 +12,7 @@ import (
type TreeLoader interface {
LoadTree(context.Context, ID) (*Tree, error)
LookupBlobSize(id ID, tpe BlobType) (uint, bool)
Connections() uint
}
// FindUsedBlobs traverses the tree ID and adds all seen blobs (trees and data

View file

@ -170,6 +170,10 @@ func (r ForbiddenRepo) LookupBlobSize(id restic.ID, tpe restic.BlobType) (uint,
return 0, false
}
func (r ForbiddenRepo) Connections() uint {
return 2
}
func TestFindUsedBlobsSkipsSeenBlobs(t *testing.T) {
repo, cleanup := repository.TestRepository(t)
defer cleanup()

View file

@ -287,8 +287,6 @@ func RemoveAllLocks(ctx context.Context, repo Repository) error {
})
}
const loadLockParallelism = 5
// ForAllLocks reads all locks in parallel and calls the given callback.
// It is guaranteed that the function is not run concurrently. If the
// callback returns an error, this function is cancelled and also returns that error.
@ -336,7 +334,8 @@ func ForAllLocks(ctx context.Context, repo Repository, excludeID *ID, fn func(ID
return nil
}
for i := 0; i < loadLockParallelism; i++ {
// For locks, decoding is nearly free, so just assume we're only limited by IO
for i := 0; i < int(repo.Connections()); i++ {
wg.Go(worker)
}

View file

@ -14,6 +14,8 @@ type Repository interface {
// Backend returns the backend used by the repository
Backend() Backend
// Connections returns the maximum number of concurrent backend operations
Connections() uint
Key() *crypto.Key
@ -64,11 +66,15 @@ type Lister interface {
// LoadJSONUnpackeder allows loading a JSON file not stored in a pack file
type LoadJSONUnpackeder interface {
// Connections returns the maximum number of concurrent backend operations
Connections() uint
LoadJSONUnpacked(ctx context.Context, t FileType, id ID, dest interface{}) error
}
// SaverUnpacked allows saving a blob not stored in a pack file
type SaverUnpacked interface {
// Connections returns the maximum number of concurrent backend operations
Connections() uint
SaveUnpacked(context.Context, FileType, []byte) (ID, error)
}

View file

@ -69,8 +69,6 @@ func LoadSnapshot(ctx context.Context, loader LoadJSONUnpackeder, id ID) (*Snaps
return sn, nil
}
const loadSnapshotParallelism = 5
// ForAllSnapshots reads all snapshots in parallel and calls the
// given function. It is guaranteed that the function is not run concurrently.
// If the called function returns an error, this function is cancelled and
@ -125,7 +123,8 @@ func ForAllSnapshots(ctx context.Context, be Lister, loader LoadJSONUnpackeder,
return nil
}
for i := 0; i < loadSnapshotParallelism; i++ {
// For most snapshots, decoding is nearly free, so just assume we're only limited by IO
for i := 0; i < int(loader.Connections()); i++ {
wg.Go(worker)
}

View file

@ -3,6 +3,7 @@ package restic
import (
"context"
"errors"
"runtime"
"sync"
"github.com/restic/restic/internal/debug"
@ -10,8 +11,6 @@ import (
"golang.org/x/sync/errgroup"
)
const streamTreeParallelism = 6
// TreeItem is used to return either an error or the tree for a tree id
type TreeItem struct {
ID
@ -163,7 +162,10 @@ func StreamTrees(ctx context.Context, wg *errgroup.Group, repo TreeLoader, trees
var loadTreeWg sync.WaitGroup
for i := 0; i < streamTreeParallelism; i++ {
// decoding a tree can take quite some time, so this can be either CPU- or IO-bound
// one extra worker to handle huge tree blobs
workerCount := int(repo.Connections()) + runtime.GOMAXPROCS(0) + 1
for i := 0; i < workerCount; i++ {
workerLoaderChan := loaderChan
if i == 0 {
workerLoaderChan = hugeTreeChan

View file

@ -20,8 +20,6 @@ import (
// con: each worker needs to keep one pack in memory
const (
workerCount = 8
largeFileBlobCount = 25
)
@ -51,6 +49,7 @@ type fileRestorer struct {
idx func(restic.BlobHandle) []restic.PackedBlob
packLoader repository.BackendLoadFn
workerCount int
filesWriter *filesWriter
dst string
@ -61,13 +60,18 @@ type fileRestorer struct {
func newFileRestorer(dst string,
packLoader repository.BackendLoadFn,
key *crypto.Key,
idx func(restic.BlobHandle) []restic.PackedBlob) *fileRestorer {
idx func(restic.BlobHandle) []restic.PackedBlob,
connections uint) *fileRestorer {
// as packs are streamed the concurrency is limited by IO
workerCount := int(connections)
return &fileRestorer{
key: key,
idx: idx,
packLoader: packLoader,
filesWriter: newFilesWriter(workerCount),
workerCount: workerCount,
dst: dst,
Error: restorerAbortOnAllErrors,
}
@ -150,7 +154,7 @@ func (r *fileRestorer) restoreFiles(ctx context.Context) error {
}
return nil
}
for i := 0; i < workerCount; i++ {
for i := 0; i < r.workerCount; i++ {
wg.Go(worker)
}

View file

@ -150,7 +150,7 @@ func newTestRepo(content []TestFile) *TestRepo {
func restoreAndVerify(t *testing.T, tempdir string, content []TestFile, files map[string]bool) {
repo := newTestRepo(content)
r := newFileRestorer(tempdir, repo.loader, repo.key, repo.Lookup)
r := newFileRestorer(tempdir, repo.loader, repo.key, repo.Lookup, 2)
if files == nil {
r.files = repo.files
@ -264,7 +264,7 @@ func TestErrorRestoreFiles(t *testing.T) {
return loadError
}
r := newFileRestorer(tempdir, repo.loader, repo.key, repo.Lookup)
r := newFileRestorer(tempdir, repo.loader, repo.key, repo.Lookup, 2)
r.files = repo.files
err := r.restoreFiles(context.TODO())
@ -304,7 +304,7 @@ func testPartialDownloadError(t *testing.T, part int) {
return loader(ctx, h, length, offset, fn)
}
r := newFileRestorer(tempdir, repo.loader, repo.key, repo.Lookup)
r := newFileRestorer(tempdir, repo.loader, repo.key, repo.Lookup, 2)
r.files = repo.files
r.Error = func(s string, e error) error {
// ignore errors as in the `restore` command

View file

@ -219,7 +219,7 @@ func (res *Restorer) RestoreTo(ctx context.Context, dst string) error {
}
idx := restic.NewHardlinkIndex()
filerestorer := newFileRestorer(dst, res.repo.Backend().Load, res.repo.Key(), res.repo.Index().Lookup)
filerestorer := newFileRestorer(dst, res.repo.Backend().Load, res.repo.Key(), res.repo.Index().Lookup, res.repo.Connections())
filerestorer.Error = res.Error
debug.Log("first pass for %q", dst)

View file

@ -76,6 +76,10 @@ func (t TreeMap) LoadTree(ctx context.Context, id restic.ID) (*restic.Tree, erro
return tree, nil
}
func (t TreeMap) Connections() uint {
return 2
}
// checkFunc returns a function suitable for walking the tree to check
// something, and a function which will check the final result.
type checkFunc func(t testing.TB) (walker WalkFunc, final func(testing.TB))