restic: Actually parallelize FindUsedBlobs
This commit is contained in:
parent
6e03f80ca2
commit
f2a1b125cb
1 changed files with 35 additions and 22 deletions
|
@ -1,6 +1,11 @@
|
|||
package restic
|
||||
|
||||
import "context"
|
||||
import (
|
||||
"context"
|
||||
"sync"
|
||||
|
||||
"golang.org/x/sync/errgroup"
|
||||
)
|
||||
|
||||
// TreeLoader loads a tree from a repository.
|
||||
type TreeLoader interface {
|
||||
|
@ -10,30 +15,38 @@ type TreeLoader interface {
|
|||
// FindUsedBlobs traverses the tree ID and adds all seen blobs (trees and data
|
||||
// blobs) to the set blobs. Already seen tree blobs will not be visited again.
|
||||
func FindUsedBlobs(ctx context.Context, repo TreeLoader, treeID ID, blobs BlobSet) error {
|
||||
h := BlobHandle{ID: treeID, Type: TreeBlob}
|
||||
if blobs.Has(h) {
|
||||
return nil
|
||||
}
|
||||
blobs.Insert(h)
|
||||
var lock sync.Mutex
|
||||
|
||||
tree, err := repo.LoadTree(ctx, treeID)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
wg, ctx := errgroup.WithContext(ctx)
|
||||
treeStream := StreamTrees(ctx, wg, repo, IDs{treeID}, func(treeID ID) bool {
|
||||
// locking is necessary the goroutine below concurrently adds data blobs
|
||||
lock.Lock()
|
||||
h := BlobHandle{ID: treeID, Type: TreeBlob}
|
||||
blobReferenced := blobs.Has(h)
|
||||
// noop if already referenced
|
||||
blobs.Insert(h)
|
||||
lock.Unlock()
|
||||
return blobReferenced
|
||||
})
|
||||
|
||||
for _, node := range tree.Nodes {
|
||||
switch node.Type {
|
||||
case "file":
|
||||
for _, blob := range node.Content {
|
||||
blobs.Insert(BlobHandle{ID: blob, Type: DataBlob})
|
||||
wg.Go(func() error {
|
||||
for tree := range treeStream {
|
||||
if tree.Error != nil {
|
||||
return tree.Error
|
||||
}
|
||||
case "dir":
|
||||
err := FindUsedBlobs(ctx, repo, *node.Subtree, blobs)
|
||||
if err != nil {
|
||||
return err
|
||||
|
||||
lock.Lock()
|
||||
for _, node := range tree.Nodes {
|
||||
switch node.Type {
|
||||
case "file":
|
||||
for _, blob := range node.Content {
|
||||
blobs.Insert(BlobHandle{ID: blob, Type: DataBlob})
|
||||
}
|
||||
}
|
||||
}
|
||||
lock.Unlock()
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
return nil
|
||||
})
|
||||
return wg.Wait()
|
||||
}
|
||||
|
|
Loading…
Reference in a new issue