archiver: Process dirs concurrently
This commit is contained in:
parent
78bd591c7c
commit
4e34325035
4 changed files with 223 additions and 53 deletions
|
@ -50,6 +50,7 @@ type Archiver struct {
|
|||
|
||||
blobSaver *BlobSaver
|
||||
fileSaver *FileSaver
|
||||
treeSaver *TreeSaver
|
||||
|
||||
// Error is called for all errors that occur during backup.
|
||||
Error ErrorFunc
|
||||
|
@ -86,6 +87,10 @@ type Options struct {
|
|||
// concurrently. If it's set to zero, the default is the number of CPUs
|
||||
// available in the system.
|
||||
SaveBlobConcurrency uint
|
||||
|
||||
// SaveTreeConcurrency sets how many trees are marshalled and saved to the
|
||||
// repo concurrently.
|
||||
SaveTreeConcurrency uint
|
||||
}
|
||||
|
||||
// ApplyDefaults returns a copy of o with the default options set for all unset
|
||||
|
@ -102,6 +107,12 @@ func (o Options) ApplyDefaults() Options {
|
|||
o.SaveBlobConcurrency = uint(runtime.NumCPU())
|
||||
}
|
||||
|
||||
if o.SaveTreeConcurrency == 0 {
|
||||
// use a relatively high concurrency here, having multiple SaveTree
|
||||
// workers is cheap
|
||||
o.SaveTreeConcurrency = o.SaveBlobConcurrency * 20
|
||||
}
|
||||
|
||||
return o
|
||||
}
|
||||
|
||||
|
@ -212,24 +223,20 @@ func (arch *Archiver) loadSubtree(ctx context.Context, node *restic.Node) *resti
|
|||
|
||||
// SaveDir stores a directory in the repo and returns the node. snPath is the
|
||||
// path within the current snapshot.
|
||||
func (arch *Archiver) SaveDir(ctx context.Context, snPath string, fi os.FileInfo, dir string, previous *restic.Tree) (*restic.Node, ItemStats, error) {
|
||||
func (arch *Archiver) SaveDir(ctx context.Context, snPath string, fi os.FileInfo, dir string, previous *restic.Tree) (d FutureTree, err error) {
|
||||
debug.Log("%v %v", snPath, dir)
|
||||
|
||||
var s ItemStats
|
||||
|
||||
treeNode, err := arch.nodeFromFileInfo(dir, fi)
|
||||
if err != nil {
|
||||
return nil, s, err
|
||||
return FutureTree{}, err
|
||||
}
|
||||
|
||||
names, err := readdirnames(arch.FS, dir)
|
||||
if err != nil {
|
||||
return nil, s, err
|
||||
return FutureTree{}, err
|
||||
}
|
||||
|
||||
var futures []FutureNode
|
||||
|
||||
tree := restic.NewTree()
|
||||
nodes := make([]FutureNode, 0, len(names))
|
||||
|
||||
for _, name := range names {
|
||||
pathname := arch.FS.Join(dir, name)
|
||||
|
@ -245,54 +252,22 @@ func (arch *Archiver) SaveDir(ctx context.Context, snPath string, fi os.FileInfo
|
|||
continue
|
||||
}
|
||||
|
||||
return nil, s, err
|
||||
return FutureTree{}, err
|
||||
}
|
||||
|
||||
if excluded {
|
||||
continue
|
||||
}
|
||||
|
||||
futures = append(futures, fn)
|
||||
nodes = append(nodes, fn)
|
||||
}
|
||||
|
||||
for _, fn := range futures {
|
||||
fn.wait()
|
||||
ft := arch.treeSaver.Save(ctx, snPath, treeNode, nodes)
|
||||
|
||||
// return the error if it wasn't ignored
|
||||
if fn.err != nil {
|
||||
fn.err = arch.error(fn.target, fn.fi, fn.err)
|
||||
if fn.err == nil {
|
||||
// ignore error
|
||||
continue
|
||||
}
|
||||
|
||||
return nil, s, fn.err
|
||||
}
|
||||
|
||||
// when the error is ignored, the node could not be saved, so ignore it
|
||||
if fn.node == nil {
|
||||
debug.Log("%v excluded: %v", fn.snPath, fn.target)
|
||||
continue
|
||||
}
|
||||
|
||||
err := tree.Insert(fn.node)
|
||||
if err != nil {
|
||||
return nil, s, err
|
||||
}
|
||||
}
|
||||
|
||||
id, treeStats, err := arch.saveTree(ctx, tree)
|
||||
if err != nil {
|
||||
return nil, ItemStats{}, err
|
||||
}
|
||||
|
||||
s.Add(treeStats)
|
||||
|
||||
treeNode.Subtree = &id
|
||||
return treeNode, s, nil
|
||||
return ft, nil
|
||||
}
|
||||
|
||||
// FutureNode holds a reference to a node or a FutureFile.
|
||||
// FutureNode holds a reference to a node, FutureFile, or FutureTree.
|
||||
type FutureNode struct {
|
||||
snPath, target string
|
||||
|
||||
|
@ -306,14 +281,31 @@ type FutureNode struct {
|
|||
|
||||
isFile bool
|
||||
file FutureFile
|
||||
isDir bool
|
||||
dir FutureTree
|
||||
}
|
||||
|
||||
func (fn *FutureNode) wait() {
|
||||
if fn.isFile {
|
||||
func (fn *FutureNode) wait(ctx context.Context) {
|
||||
switch {
|
||||
case fn.isFile:
|
||||
// wait for and collect the data for the file
|
||||
fn.node = fn.file.Node()
|
||||
fn.err = fn.file.Err()
|
||||
fn.stats = fn.file.Stats()
|
||||
|
||||
// ensure the other stuff can be garbage-collected
|
||||
fn.file = FutureFile{}
|
||||
fn.isFile = false
|
||||
|
||||
case fn.isDir:
|
||||
// wait for and collect the data for the dir
|
||||
fn.node = fn.dir.Node()
|
||||
fn.err = fn.dir.Err()
|
||||
fn.stats = fn.dir.Stats()
|
||||
|
||||
// ensure the other stuff can be garbage-collected
|
||||
fn.dir = FutureTree{}
|
||||
fn.isDir = false
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -324,6 +316,8 @@ func (fn *FutureNode) wait() {
|
|||
//
|
||||
// snPath is the path within the current snapshot.
|
||||
func (arch *Archiver) Save(ctx context.Context, snPath, target string, previous *restic.Node) (fn FutureNode, excluded bool, err error) {
|
||||
start := time.Now()
|
||||
|
||||
fn = FutureNode{
|
||||
snPath: snPath,
|
||||
target: target,
|
||||
|
@ -400,7 +394,9 @@ func (arch *Archiver) Save(ctx context.Context, snPath, target string, previous
|
|||
snItem := snPath + "/"
|
||||
start := time.Now()
|
||||
oldSubtree := arch.loadSubtree(ctx, previous)
|
||||
fn.node, fn.stats, err = arch.SaveDir(ctx, snPath, fi, target, oldSubtree)
|
||||
|
||||
fn.isDir = true
|
||||
fn.dir, err = arch.SaveDir(ctx, snPath, fi, target, oldSubtree)
|
||||
if err == nil {
|
||||
arch.CompleteItem(snItem, previous, fn.node, fn.stats, time.Since(start))
|
||||
} else {
|
||||
|
@ -429,6 +425,8 @@ func (arch *Archiver) Save(ctx context.Context, snPath, target string, previous
|
|||
}
|
||||
}
|
||||
|
||||
debug.Log("return after %.3f", time.Since(start).Seconds())
|
||||
|
||||
return fn, false, nil
|
||||
}
|
||||
|
||||
|
@ -564,9 +562,11 @@ func (arch *Archiver) SaveTree(ctx context.Context, snPath string, atree *Tree,
|
|||
arch.CompleteItem(snItem, oldNode, node, nodeStats, time.Since(start))
|
||||
}
|
||||
|
||||
debug.Log("waiting on %d nodes", len(futureNodes))
|
||||
|
||||
// process all futures
|
||||
for name, fn := range futureNodes {
|
||||
fn.wait()
|
||||
fn.wait(ctx)
|
||||
|
||||
// return the error, or ignore it
|
||||
if fn.err != nil {
|
||||
|
@ -720,14 +720,16 @@ func (arch *Archiver) loadParentTree(ctx context.Context, snapshotID restic.ID)
|
|||
// runWorkers starts the worker pools, which are stopped when the context is cancelled.
|
||||
func (arch *Archiver) runWorkers(ctx context.Context) {
|
||||
arch.blobSaver = NewBlobSaver(ctx, arch.Repo, arch.Options.SaveBlobConcurrency)
|
||||
|
||||
arch.fileSaver = NewFileSaver(ctx,
|
||||
arch.FS,
|
||||
arch.blobSaver,
|
||||
arch.Repo.Config().ChunkerPolynomial,
|
||||
arch.Options.FileReadConcurrency, arch.Options.SaveBlobConcurrency)
|
||||
arch.fileSaver.CompleteBlob = arch.CompleteBlob
|
||||
|
||||
arch.fileSaver.NodeFromFileInfo = arch.nodeFromFileInfo
|
||||
|
||||
arch.treeSaver = NewTreeSaver(ctx, arch.Options.SaveTreeConcurrency, arch.saveTree, arch.error)
|
||||
}
|
||||
|
||||
// Snapshot saves several targets and returns a snapshot.
|
||||
|
|
|
@ -608,7 +608,12 @@ func TestArchiverSaveDir(t *testing.T) {
|
|||
t.Fatal(err)
|
||||
}
|
||||
|
||||
node, stats, err := arch.SaveDir(ctx, "/", fi, test.target, nil)
|
||||
ft, err := arch.SaveDir(ctx, "/", fi, test.target, nil)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
node, stats, err := ft.Node(), ft.Stats(), ft.Err()
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
@ -681,7 +686,12 @@ func TestArchiverSaveDirIncremental(t *testing.T) {
|
|||
t.Fatal(err)
|
||||
}
|
||||
|
||||
node, stats, err := arch.SaveDir(ctx, "/", fi, tempdir, nil)
|
||||
ft, err := arch.SaveDir(ctx, "/", fi, tempdir, nil)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
node, stats, err := ft.Node(), ft.Stats(), ft.Err()
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
|
|
@ -27,7 +27,7 @@ type BlobSaver struct {
|
|||
// NewBlobSaver returns a new blob saver. A worker pool is started, it is stopped
|
||||
// when ctx is cancelled.
|
||||
func NewBlobSaver(ctx context.Context, repo Saver, workers uint) *BlobSaver {
|
||||
ch := make(chan saveBlobJob, 2*int(workers))
|
||||
ch := make(chan saveBlobJob)
|
||||
s := &BlobSaver{
|
||||
repo: repo,
|
||||
knownBlobs: restic.NewBlobSet(),
|
||||
|
|
158
internal/archiver/tree_saver.go
Normal file
158
internal/archiver/tree_saver.go
Normal file
|
@ -0,0 +1,158 @@
|
|||
package archiver
|
||||
|
||||
import (
|
||||
"context"
|
||||
"sync"
|
||||
|
||||
"github.com/restic/restic/internal/debug"
|
||||
"github.com/restic/restic/internal/restic"
|
||||
)
|
||||
|
||||
// FutureTree is returned by Save and will return the data once it
|
||||
// has been processed.
|
||||
type FutureTree struct {
|
||||
ch <-chan saveTreeResponse
|
||||
res saveTreeResponse
|
||||
}
|
||||
|
||||
func (s *FutureTree) wait() {
|
||||
res, ok := <-s.ch
|
||||
if ok {
|
||||
s.res = res
|
||||
}
|
||||
}
|
||||
|
||||
// Node returns the node once it is available.
|
||||
func (s *FutureTree) Node() *restic.Node {
|
||||
s.wait()
|
||||
return s.res.node
|
||||
}
|
||||
|
||||
// Stats returns the stats for the file once they are available.
|
||||
func (s *FutureTree) Stats() ItemStats {
|
||||
s.wait()
|
||||
return s.res.stats
|
||||
}
|
||||
|
||||
// Err returns the error in case an error occurred.
|
||||
func (s *FutureTree) Err() error {
|
||||
s.wait()
|
||||
return s.res.err
|
||||
}
|
||||
|
||||
// TreeSaver concurrently saves incoming trees to the repo.
|
||||
type TreeSaver struct {
|
||||
saveTree func(context.Context, *restic.Tree) (restic.ID, ItemStats, error)
|
||||
errFn ErrorFunc
|
||||
|
||||
ch chan<- saveTreeJob
|
||||
wg sync.WaitGroup
|
||||
}
|
||||
|
||||
// NewTreeSaver returns a new tree saver. A worker pool with treeWorkers is
|
||||
// started, it is stopped when ctx is cancelled.
|
||||
func NewTreeSaver(ctx context.Context, treeWorkers uint, saveTree func(context.Context, *restic.Tree) (restic.ID, ItemStats, error), errFn ErrorFunc) *TreeSaver {
|
||||
ch := make(chan saveTreeJob)
|
||||
|
||||
s := &TreeSaver{
|
||||
ch: ch,
|
||||
saveTree: saveTree,
|
||||
errFn: errFn,
|
||||
}
|
||||
|
||||
for i := uint(0); i < treeWorkers; i++ {
|
||||
s.wg.Add(1)
|
||||
go s.worker(ctx, &s.wg, ch)
|
||||
}
|
||||
|
||||
return s
|
||||
}
|
||||
|
||||
// Save stores the dir d and returns the data once it has been completed.
|
||||
func (s *TreeSaver) Save(ctx context.Context, snPath string, node *restic.Node, nodes []FutureNode) FutureTree {
|
||||
ch := make(chan saveTreeResponse, 1)
|
||||
s.ch <- saveTreeJob{
|
||||
snPath: snPath,
|
||||
node: node,
|
||||
nodes: nodes,
|
||||
ch: ch,
|
||||
}
|
||||
|
||||
return FutureTree{ch: ch}
|
||||
}
|
||||
|
||||
type saveTreeJob struct {
|
||||
snPath string
|
||||
nodes []FutureNode
|
||||
node *restic.Node
|
||||
ch chan<- saveTreeResponse
|
||||
}
|
||||
|
||||
type saveTreeResponse struct {
|
||||
node *restic.Node
|
||||
stats ItemStats
|
||||
err error
|
||||
}
|
||||
|
||||
// save stores the nodes as a tree in the repo.
|
||||
func (s *TreeSaver) save(ctx context.Context, snPath string, node *restic.Node, nodes []FutureNode) (*restic.Node, ItemStats, error) {
|
||||
var stats ItemStats
|
||||
|
||||
tree := restic.NewTree()
|
||||
for _, fn := range nodes {
|
||||
fn.wait(ctx)
|
||||
|
||||
// return the error if it wasn't ignored
|
||||
if fn.err != nil {
|
||||
debug.Log("err for %v: %v", fn.node.Name, fn.err)
|
||||
fn.err = s.errFn(fn.target, fn.fi, fn.err)
|
||||
if fn.err == nil {
|
||||
// ignore error
|
||||
continue
|
||||
}
|
||||
|
||||
return nil, stats, fn.err
|
||||
}
|
||||
|
||||
// when the error is ignored, the node could not be saved, so ignore it
|
||||
if fn.node == nil {
|
||||
debug.Log("%v excluded: %v", fn.snPath, fn.target)
|
||||
continue
|
||||
}
|
||||
|
||||
debug.Log("insert %v", fn.node.Name)
|
||||
err := tree.Insert(fn.node)
|
||||
if err != nil {
|
||||
return nil, stats, err
|
||||
}
|
||||
}
|
||||
|
||||
id, treeStats, err := s.saveTree(ctx, tree)
|
||||
stats.Add(treeStats)
|
||||
if err != nil {
|
||||
return nil, stats, err
|
||||
}
|
||||
|
||||
node.Subtree = &id
|
||||
return node, stats, nil
|
||||
}
|
||||
|
||||
func (s *TreeSaver) worker(ctx context.Context, wg *sync.WaitGroup, jobs <-chan saveTreeJob) {
|
||||
defer wg.Done()
|
||||
for {
|
||||
var job saveTreeJob
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return
|
||||
case job = <-jobs:
|
||||
}
|
||||
|
||||
node, stats, err := s.save(ctx, job.snPath, job.node, job.nodes)
|
||||
job.ch <- saveTreeResponse{
|
||||
node: node,
|
||||
stats: stats,
|
||||
err: err,
|
||||
}
|
||||
close(job.ch)
|
||||
}
|
||||
}
|
Loading…
Reference in a new issue