Limit number of large tree blobs loaded in parallel by StreamTrees
Load tree blobs with more than 50MB only from a single goroutine. Very large tree blobs with for example 400 MB size can otherwise require roughly 1GB * streamTreeParallelism memory.
This commit is contained in:
parent
ad4f4dbc7a
commit
254c8743fc
4 changed files with 28 additions and 7 deletions
|
@ -11,6 +11,7 @@ import (
|
|||
// TreeLoader loads a tree from a repository.
|
||||
type TreeLoader interface {
|
||||
LoadTree(context.Context, ID) (*Tree, error)
|
||||
LookupBlobSize(id ID, tpe BlobType) (uint, bool)
|
||||
}
|
||||
|
||||
// FindUsedBlobs traverses the tree ID and adds all seen blobs (trees and data
|
||||
|
|
|
@ -166,6 +166,10 @@ func (r ForbiddenRepo) LoadTree(ctx context.Context, id restic.ID) (*restic.Tree
|
|||
return nil, errors.New("should not be called")
|
||||
}
|
||||
|
||||
func (r ForbiddenRepo) LookupBlobSize(id restic.ID, tpe restic.BlobType) (uint, bool) {
|
||||
return 0, false
|
||||
}
|
||||
|
||||
func TestFindUsedBlobsSkipsSeenBlobs(t *testing.T) {
|
||||
repo, cleanup := repository.TestRepository(t)
|
||||
defer cleanup()
|
||||
|
|
|
@ -10,7 +10,7 @@ import (
|
|||
"golang.org/x/sync/errgroup"
|
||||
)
|
||||
|
||||
const streamTreeParallelism = 5
|
||||
const streamTreeParallelism = 6
|
||||
|
||||
// TreeItem is used to return either an error or the tree for a tree id
|
||||
type TreeItem struct {
|
||||
|
@ -46,7 +46,7 @@ func loadTreeWorker(ctx context.Context, repo TreeLoader,
|
|||
}
|
||||
}
|
||||
|
||||
func filterTrees(ctx context.Context, trees IDs, loaderChan chan<- trackedID,
|
||||
func filterTrees(ctx context.Context, repo TreeLoader, trees IDs, loaderChan chan<- trackedID, hugeTreeLoaderChan chan<- trackedID,
|
||||
in <-chan trackedTreeItem, out chan<- TreeItem, skip func(tree ID) bool, p *progress.Counter) {
|
||||
|
||||
var (
|
||||
|
@ -78,7 +78,12 @@ func filterTrees(ctx context.Context, trees IDs, loaderChan chan<- trackedID,
|
|||
continue
|
||||
}
|
||||
|
||||
loadCh = loaderChan
|
||||
treeSize, found := repo.LookupBlobSize(nextTreeID.ID, TreeBlob)
|
||||
if found && treeSize > 50*1024*1024 {
|
||||
loadCh = hugeTreeLoaderChan
|
||||
} else {
|
||||
loadCh = loaderChan
|
||||
}
|
||||
}
|
||||
|
||||
if loadCh == nil && outCh == nil && outstandingLoadTreeJobs == 0 {
|
||||
|
@ -152,16 +157,21 @@ func filterTrees(ctx context.Context, trees IDs, loaderChan chan<- trackedID,
|
|||
// on the errgroup until all goroutines were stopped.
|
||||
func StreamTrees(ctx context.Context, wg *errgroup.Group, repo TreeLoader, trees IDs, skip func(tree ID) bool, p *progress.Counter) <-chan TreeItem {
|
||||
loaderChan := make(chan trackedID)
|
||||
hugeTreeChan := make(chan trackedID, 10)
|
||||
loadedTreeChan := make(chan trackedTreeItem)
|
||||
treeStream := make(chan TreeItem)
|
||||
|
||||
var loadTreeWg sync.WaitGroup
|
||||
|
||||
for i := 0; i < streamTreeParallelism; i++ {
|
||||
workerLoaderChan := loaderChan
|
||||
if i == 0 {
|
||||
workerLoaderChan = hugeTreeChan
|
||||
}
|
||||
loadTreeWg.Add(1)
|
||||
wg.Go(func() error {
|
||||
defer loadTreeWg.Done()
|
||||
loadTreeWorker(ctx, repo, loaderChan, loadedTreeChan)
|
||||
loadTreeWorker(ctx, repo, workerLoaderChan, loadedTreeChan)
|
||||
return nil
|
||||
})
|
||||
}
|
||||
|
@ -175,8 +185,9 @@ func StreamTrees(ctx context.Context, wg *errgroup.Group, repo TreeLoader, trees
|
|||
|
||||
wg.Go(func() error {
|
||||
defer close(loaderChan)
|
||||
defer close(hugeTreeChan)
|
||||
defer close(treeStream)
|
||||
filterTrees(ctx, trees, loaderChan, loadedTreeChan, treeStream, skip, p)
|
||||
filterTrees(ctx, repo, trees, loaderChan, hugeTreeChan, loadedTreeChan, treeStream, skip, p)
|
||||
return nil
|
||||
})
|
||||
return treeStream
|
||||
|
|
|
@ -10,6 +10,11 @@ import (
|
|||
"github.com/restic/restic/internal/restic"
|
||||
)
|
||||
|
||||
// TreeLoader loads a tree from a repository.
|
||||
type TreeLoader interface {
|
||||
LoadTree(context.Context, restic.ID) (*restic.Tree, error)
|
||||
}
|
||||
|
||||
// ErrSkipNode is returned by WalkFunc when a dir node should not be walked.
|
||||
var ErrSkipNode = errors.New("skip this node")
|
||||
|
||||
|
@ -33,7 +38,7 @@ type WalkFunc func(parentTreeID restic.ID, path string, node *restic.Node, nodeE
|
|||
// Walk calls walkFn recursively for each node in root. If walkFn returns an
|
||||
// error, it is passed up the call stack. The trees in ignoreTrees are not
|
||||
// walked. If walkFn ignores trees, these are added to the set.
|
||||
func Walk(ctx context.Context, repo restic.TreeLoader, root restic.ID, ignoreTrees restic.IDSet, walkFn WalkFunc) error {
|
||||
func Walk(ctx context.Context, repo TreeLoader, root restic.ID, ignoreTrees restic.IDSet, walkFn WalkFunc) error {
|
||||
tree, err := repo.LoadTree(ctx, root)
|
||||
_, err = walkFn(root, "/", nil, err)
|
||||
|
||||
|
@ -55,7 +60,7 @@ func Walk(ctx context.Context, repo restic.TreeLoader, root restic.ID, ignoreTre
|
|||
// walk recursively traverses the tree, ignoring subtrees when the ID of the
|
||||
// subtree is in ignoreTrees. If err is nil and ignore is true, the subtree ID
|
||||
// will be added to ignoreTrees by walk.
|
||||
func walk(ctx context.Context, repo restic.TreeLoader, prefix string, parentTreeID restic.ID, tree *restic.Tree, ignoreTrees restic.IDSet, walkFn WalkFunc) (ignore bool, err error) {
|
||||
func walk(ctx context.Context, repo TreeLoader, prefix string, parentTreeID restic.ID, tree *restic.Tree, ignoreTrees restic.IDSet, walkFn WalkFunc) (ignore bool, err error) {
|
||||
var allNodesIgnored = true
|
||||
|
||||
if len(tree.Nodes) == 0 {
|
||||
|
|
Loading…
Reference in a new issue