Merge pull request #1741 from restic/improve-archiver

Improve archiver, fix hang on fifos
This commit is contained in:
Alexander Neumann 2018-04-30 16:00:10 +02:00
commit 438719f269
11 changed files with 304 additions and 107 deletions

View file

@ -94,7 +94,7 @@ func init() {
f.BoolVarP(&globalOptions.JSON, "json", "", false, "set output mode to JSON for commands that support it") f.BoolVarP(&globalOptions.JSON, "json", "", false, "set output mode to JSON for commands that support it")
f.StringVar(&globalOptions.CacheDir, "cache-dir", "", "set the cache directory") f.StringVar(&globalOptions.CacheDir, "cache-dir", "", "set the cache directory")
f.BoolVar(&globalOptions.NoCache, "no-cache", false, "do not use a local cache") f.BoolVar(&globalOptions.NoCache, "no-cache", false, "do not use a local cache")
f.StringSliceVar(&globalOptions.CACerts, "cacert", nil, "path to load root certificates from (default: use system certificates)") f.StringSliceVar(&globalOptions.CACerts, "cacert", nil, "`file` to load root certificates from (default: use system certificates)")
f.StringVar(&globalOptions.TLSClientCert, "tls-client-cert", "", "path to a file containing PEM encoded TLS client certificate and private key") f.StringVar(&globalOptions.TLSClientCert, "tls-client-cert", "", "path to a file containing PEM encoded TLS client certificate and private key")
f.BoolVar(&globalOptions.CleanupCache, "cleanup-cache", false, "auto remove old cache directories") f.BoolVar(&globalOptions.CleanupCache, "cleanup-cache", false, "auto remove old cache directories")
f.IntVar(&globalOptions.LimitUploadKb, "limit-upload", 0, "limits uploads to a maximum rate in KiB/s. (default: unlimited)") f.IntVar(&globalOptions.LimitUploadKb, "limit-upload", 0, "limits uploads to a maximum rate in KiB/s. (default: unlimited)")
@ -355,7 +355,11 @@ func OpenRepository(opts GlobalOptions) (*repository.Repository, error) {
} }
if stdoutIsTerminal() { if stdoutIsTerminal() {
Verbosef("password is correct\n") id := s.Config().ID
if len(id) > 8 {
id = id[:8]
}
Verbosef("repository %v opened successfully, password is correct\n", id)
} }
if opts.NoCache { if opts.NoCache {

View file

@ -1,4 +1,4 @@
// +build debug // +build debug profile
package main package main
@ -15,19 +15,21 @@ import (
) )
var ( var (
listenMemoryProfile string listenProfile string
memProfilePath string memProfilePath string
cpuProfilePath string cpuProfilePath string
traceProfilePath string traceProfilePath string
blockProfilePath string
insecure bool insecure bool
) )
func init() { func init() {
f := cmdRoot.PersistentFlags() f := cmdRoot.PersistentFlags()
f.StringVar(&listenMemoryProfile, "listen-profile", "", "listen on this `address:port` for memory profiling") f.StringVar(&listenProfile, "listen-profile", "", "listen on this `address:port` for memory profiling")
f.StringVar(&memProfilePath, "mem-profile", "", "write memory profile to `dir`") f.StringVar(&memProfilePath, "mem-profile", "", "write memory profile to `dir`")
f.StringVar(&cpuProfilePath, "cpu-profile", "", "write cpu profile to `dir`") f.StringVar(&cpuProfilePath, "cpu-profile", "", "write cpu profile to `dir`")
f.StringVar(&traceProfilePath, "trace-profile", "", "write trace to `dir`") f.StringVar(&traceProfilePath, "trace-profile", "", "write trace to `dir`")
f.StringVar(&blockProfilePath, "block-profile", "", "write block profile to `dir`")
f.BoolVar(&insecure, "insecure-kdf", false, "use insecure KDF settings") f.BoolVar(&insecure, "insecure-kdf", false, "use insecure KDF settings")
} }
@ -38,12 +40,12 @@ func (fakeTestingTB) Logf(msg string, args ...interface{}) {
} }
func runDebug() error { func runDebug() error {
if listenMemoryProfile != "" { if listenProfile != "" {
fmt.Fprintf(os.Stderr, "running memory profile HTTP server on %v\n", listenMemoryProfile) fmt.Fprintf(os.Stderr, "running profile HTTP server on %v\n", listenProfile)
go func() { go func() {
err := http.ListenAndServe(listenMemoryProfile, nil) err := http.ListenAndServe(listenProfile, nil)
if err != nil { if err != nil {
fmt.Fprintf(os.Stderr, "memory profile listen failed: %v\n", err) fmt.Fprintf(os.Stderr, "profile HTTP server listen failed: %v\n", err)
} }
}() }()
} }
@ -58,9 +60,12 @@ func runDebug() error {
if traceProfilePath != "" { if traceProfilePath != "" {
profilesEnabled++ profilesEnabled++
} }
if blockProfilePath != "" {
profilesEnabled++
}
if profilesEnabled > 1 { if profilesEnabled > 1 {
return errors.Fatal("only one profile (memory or CPU) may be activated at the same time") return errors.Fatal("only one profile (memory, CPU, trace, or block) may be activated at the same time")
} }
var prof interface { var prof interface {
@ -73,6 +78,8 @@ func runDebug() error {
prof = profile.Start(profile.Quiet, profile.NoShutdownHook, profile.CPUProfile, profile.ProfilePath(cpuProfilePath)) prof = profile.Start(profile.Quiet, profile.NoShutdownHook, profile.CPUProfile, profile.ProfilePath(cpuProfilePath))
} else if traceProfilePath != "" { } else if traceProfilePath != "" {
prof = profile.Start(profile.Quiet, profile.NoShutdownHook, profile.TraceProfile, profile.ProfilePath(traceProfilePath)) prof = profile.Start(profile.Quiet, profile.NoShutdownHook, profile.TraceProfile, profile.ProfilePath(traceProfilePath))
} else if blockProfilePath != "" {
prof = profile.Start(profile.Quiet, profile.NoShutdownHook, profile.BlockProfile, profile.ProfilePath(blockProfilePath))
} }
if prof != nil { if prof != nil {

View file

@ -1,4 +1,4 @@
// +build !debug // +build !debug,!profile
package main package main

View file

@ -50,6 +50,7 @@ type Archiver struct {
blobSaver *BlobSaver blobSaver *BlobSaver
fileSaver *FileSaver fileSaver *FileSaver
treeSaver *TreeSaver
// Error is called for all errors that occur during backup. // Error is called for all errors that occur during backup.
Error ErrorFunc Error ErrorFunc
@ -86,6 +87,10 @@ type Options struct {
// concurrently. If it's set to zero, the default is the number of CPUs // concurrently. If it's set to zero, the default is the number of CPUs
// available in the system. // available in the system.
SaveBlobConcurrency uint SaveBlobConcurrency uint
// SaveTreeConcurrency sets how many trees are marshalled and saved to the
// repo concurrently.
SaveTreeConcurrency uint
} }
// ApplyDefaults returns a copy of o with the default options set for all unset // ApplyDefaults returns a copy of o with the default options set for all unset
@ -102,6 +107,12 @@ func (o Options) ApplyDefaults() Options {
o.SaveBlobConcurrency = uint(runtime.NumCPU()) o.SaveBlobConcurrency = uint(runtime.NumCPU())
} }
if o.SaveTreeConcurrency == 0 {
// use a relatively high concurrency here, having multiple SaveTree
// workers is cheap
o.SaveTreeConcurrency = o.SaveBlobConcurrency * 20
}
return o return o
} }
@ -172,7 +183,7 @@ func (arch *Archiver) saveTree(ctx context.Context, t *restic.Tree) (restic.ID,
// adds a newline after each object) // adds a newline after each object)
buf = append(buf, '\n') buf = append(buf, '\n')
b := Buffer{Data: buf} b := &Buffer{Data: buf}
res := arch.blobSaver.Save(ctx, restic.TreeBlob, b) res := arch.blobSaver.Save(ctx, restic.TreeBlob, b)
if res.Err() != nil { if res.Err() != nil {
return restic.ID{}, s, res.Err() return restic.ID{}, s, res.Err()
@ -212,24 +223,20 @@ func (arch *Archiver) loadSubtree(ctx context.Context, node *restic.Node) *resti
// SaveDir stores a directory in the repo and returns the node. snPath is the // SaveDir stores a directory in the repo and returns the node. snPath is the
// path within the current snapshot. // path within the current snapshot.
func (arch *Archiver) SaveDir(ctx context.Context, snPath string, fi os.FileInfo, dir string, previous *restic.Tree) (*restic.Node, ItemStats, error) { func (arch *Archiver) SaveDir(ctx context.Context, snPath string, fi os.FileInfo, dir string, previous *restic.Tree) (d FutureTree, err error) {
debug.Log("%v %v", snPath, dir) debug.Log("%v %v", snPath, dir)
var s ItemStats
treeNode, err := arch.nodeFromFileInfo(dir, fi) treeNode, err := arch.nodeFromFileInfo(dir, fi)
if err != nil { if err != nil {
return nil, s, err return FutureTree{}, err
} }
names, err := readdirnames(arch.FS, dir) names, err := readdirnames(arch.FS, dir)
if err != nil { if err != nil {
return nil, s, err return FutureTree{}, err
} }
var futures []FutureNode nodes := make([]FutureNode, 0, len(names))
tree := restic.NewTree()
for _, name := range names { for _, name := range names {
pathname := arch.FS.Join(dir, name) pathname := arch.FS.Join(dir, name)
@ -245,54 +252,22 @@ func (arch *Archiver) SaveDir(ctx context.Context, snPath string, fi os.FileInfo
continue continue
} }
return nil, s, err return FutureTree{}, err
} }
if excluded { if excluded {
continue continue
} }
futures = append(futures, fn) nodes = append(nodes, fn)
} }
for _, fn := range futures { ft := arch.treeSaver.Save(ctx, snPath, treeNode, nodes)
fn.wait()
// return the error if it wasn't ignored return ft, nil
if fn.err != nil {
fn.err = arch.error(fn.target, fn.fi, fn.err)
if fn.err == nil {
// ignore error
continue
}
return nil, s, fn.err
}
// when the error is ignored, the node could not be saved, so ignore it
if fn.node == nil {
debug.Log("%v excluded: %v", fn.snPath, fn.target)
continue
}
err := tree.Insert(fn.node)
if err != nil {
return nil, s, err
}
}
id, treeStats, err := arch.saveTree(ctx, tree)
if err != nil {
return nil, ItemStats{}, err
}
s.Add(treeStats)
treeNode.Subtree = &id
return treeNode, s, nil
} }
// FutureNode holds a reference to a node or a FutureFile. // FutureNode holds a reference to a node, FutureFile, or FutureTree.
type FutureNode struct { type FutureNode struct {
snPath, target string snPath, target string
@ -306,14 +281,31 @@ type FutureNode struct {
isFile bool isFile bool
file FutureFile file FutureFile
isDir bool
dir FutureTree
} }
func (fn *FutureNode) wait() { func (fn *FutureNode) wait(ctx context.Context) {
if fn.isFile { switch {
case fn.isFile:
// wait for and collect the data for the file // wait for and collect the data for the file
fn.node = fn.file.Node() fn.node = fn.file.Node()
fn.err = fn.file.Err() fn.err = fn.file.Err()
fn.stats = fn.file.Stats() fn.stats = fn.file.Stats()
// ensure the other stuff can be garbage-collected
fn.file = FutureFile{}
fn.isFile = false
case fn.isDir:
// wait for and collect the data for the dir
fn.node = fn.dir.Node()
fn.err = fn.dir.Err()
fn.stats = fn.dir.Stats()
// ensure the other stuff can be garbage-collected
fn.dir = FutureTree{}
fn.isDir = false
} }
} }
@ -324,6 +316,8 @@ func (fn *FutureNode) wait() {
// //
// snPath is the path within the current snapshot. // snPath is the path within the current snapshot.
func (arch *Archiver) Save(ctx context.Context, snPath, target string, previous *restic.Node) (fn FutureNode, excluded bool, err error) { func (arch *Archiver) Save(ctx context.Context, snPath, target string, previous *restic.Node) (fn FutureNode, excluded bool, err error) {
start := time.Now()
fn = FutureNode{ fn = FutureNode{
snPath: snPath, snPath: snPath,
target: target, target: target,
@ -340,7 +334,7 @@ func (arch *Archiver) Save(ctx context.Context, snPath, target string, previous
var fi os.FileInfo var fi os.FileInfo
var errFI error var errFI error
file, errOpen := arch.FS.OpenFile(target, fs.O_RDONLY|fs.O_NOFOLLOW, 0) file, errOpen := arch.FS.OpenFile(target, fs.O_RDONLY|fs.O_NOFOLLOW|fs.O_NONBLOCK, 0)
if errOpen == nil { if errOpen == nil {
fi, errFI = file.Stat() fi, errFI = file.Stat()
} }
@ -400,7 +394,9 @@ func (arch *Archiver) Save(ctx context.Context, snPath, target string, previous
snItem := snPath + "/" snItem := snPath + "/"
start := time.Now() start := time.Now()
oldSubtree := arch.loadSubtree(ctx, previous) oldSubtree := arch.loadSubtree(ctx, previous)
fn.node, fn.stats, err = arch.SaveDir(ctx, snPath, fi, target, oldSubtree)
fn.isDir = true
fn.dir, err = arch.SaveDir(ctx, snPath, fi, target, oldSubtree)
if err == nil { if err == nil {
arch.CompleteItem(snItem, previous, fn.node, fn.stats, time.Since(start)) arch.CompleteItem(snItem, previous, fn.node, fn.stats, time.Since(start))
} else { } else {
@ -429,6 +425,8 @@ func (arch *Archiver) Save(ctx context.Context, snPath, target string, previous
} }
} }
debug.Log("return after %.3f", time.Since(start).Seconds())
return fn, false, nil return fn, false, nil
} }
@ -564,9 +562,11 @@ func (arch *Archiver) SaveTree(ctx context.Context, snPath string, atree *Tree,
arch.CompleteItem(snItem, oldNode, node, nodeStats, time.Since(start)) arch.CompleteItem(snItem, oldNode, node, nodeStats, time.Since(start))
} }
debug.Log("waiting on %d nodes", len(futureNodes))
// process all futures // process all futures
for name, fn := range futureNodes { for name, fn := range futureNodes {
fn.wait() fn.wait(ctx)
// return the error, or ignore it // return the error, or ignore it
if fn.err != nil { if fn.err != nil {
@ -720,10 +720,16 @@ func (arch *Archiver) loadParentTree(ctx context.Context, snapshotID restic.ID)
// runWorkers starts the worker pools, which are stopped when the context is cancelled. // runWorkers starts the worker pools, which are stopped when the context is cancelled.
func (arch *Archiver) runWorkers(ctx context.Context) { func (arch *Archiver) runWorkers(ctx context.Context) {
arch.blobSaver = NewBlobSaver(ctx, arch.Repo, arch.Options.SaveBlobConcurrency) arch.blobSaver = NewBlobSaver(ctx, arch.Repo, arch.Options.SaveBlobConcurrency)
arch.fileSaver = NewFileSaver(ctx, arch.FS, arch.blobSaver, arch.Repo.Config().ChunkerPolynomial, arch.Options.FileReadConcurrency)
arch.fileSaver.CompleteBlob = arch.CompleteBlob
arch.fileSaver = NewFileSaver(ctx,
arch.FS,
arch.blobSaver,
arch.Repo.Config().ChunkerPolynomial,
arch.Options.FileReadConcurrency, arch.Options.SaveBlobConcurrency)
arch.fileSaver.CompleteBlob = arch.CompleteBlob
arch.fileSaver.NodeFromFileInfo = arch.nodeFromFileInfo arch.fileSaver.NodeFromFileInfo = arch.nodeFromFileInfo
arch.treeSaver = NewTreeSaver(ctx, arch.Options.SaveTreeConcurrency, arch.saveTree, arch.error)
} }
// Snapshot saves several targets and returns a snapshot. // Snapshot saves several targets and returns a snapshot.

View file

@ -608,7 +608,12 @@ func TestArchiverSaveDir(t *testing.T) {
t.Fatal(err) t.Fatal(err)
} }
node, stats, err := arch.SaveDir(ctx, "/", fi, test.target, nil) ft, err := arch.SaveDir(ctx, "/", fi, test.target, nil)
if err != nil {
t.Fatal(err)
}
node, stats, err := ft.Node(), ft.Stats(), ft.Err()
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
@ -681,7 +686,12 @@ func TestArchiverSaveDirIncremental(t *testing.T) {
t.Fatal(err) t.Fatal(err)
} }
node, stats, err := arch.SaveDir(ctx, "/", fi, tempdir, nil) ft, err := arch.SaveDir(ctx, "/", fi, tempdir, nil)
if err != nil {
t.Fatal(err)
}
node, stats, err := ft.Node(), ft.Stats(), ft.Err()
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }

View file

@ -27,7 +27,7 @@ type BlobSaver struct {
// NewBlobSaver returns a new blob. A worker pool is started, it is stopped // NewBlobSaver returns a new blob. A worker pool is started, it is stopped
// when ctx is cancelled. // when ctx is cancelled.
func NewBlobSaver(ctx context.Context, repo Saver, workers uint) *BlobSaver { func NewBlobSaver(ctx context.Context, repo Saver, workers uint) *BlobSaver {
ch := make(chan saveBlobJob, 2*int(workers)) ch := make(chan saveBlobJob)
s := &BlobSaver{ s := &BlobSaver{
repo: repo, repo: repo,
knownBlobs: restic.NewBlobSet(), knownBlobs: restic.NewBlobSet(),
@ -45,7 +45,7 @@ func NewBlobSaver(ctx context.Context, repo Saver, workers uint) *BlobSaver {
// Save stores a blob in the repo. It checks the index and the known blobs // Save stores a blob in the repo. It checks the index and the known blobs
// before saving anything. The second return parameter is true if the blob was // before saving anything. The second return parameter is true if the blob was
// previously unknown. // previously unknown.
func (s *BlobSaver) Save(ctx context.Context, t restic.BlobType, buf Buffer) FutureBlob { func (s *BlobSaver) Save(ctx context.Context, t restic.BlobType, buf *Buffer) FutureBlob {
ch := make(chan saveBlobResponse, 1) ch := make(chan saveBlobResponse, 1)
s.ch <- saveBlobJob{BlobType: t, buf: buf, ch: ch} s.ch <- saveBlobJob{BlobType: t, buf: buf, ch: ch}
@ -91,7 +91,7 @@ func (s *FutureBlob) Length() int {
type saveBlobJob struct { type saveBlobJob struct {
restic.BlobType restic.BlobType
buf Buffer buf *Buffer
ch chan<- saveBlobResponse ch chan<- saveBlobResponse
} }

View file

@ -9,19 +9,19 @@ import (
// be called so the underlying slice is put back into the pool. // be called so the underlying slice is put back into the pool.
type Buffer struct { type Buffer struct {
Data []byte Data []byte
Put func([]byte) Put func(*Buffer)
} }
// Release puts the buffer back into the pool it came from. // Release puts the buffer back into the pool it came from.
func (b Buffer) Release() { func (b *Buffer) Release() {
if b.Put != nil { if b.Put != nil {
b.Put(b.Data) b.Put(b)
} }
} }
// BufferPool implements a limited set of reusable buffers. // BufferPool implements a limited set of reusable buffers.
type BufferPool struct { type BufferPool struct {
ch chan []byte ch chan *Buffer
chM sync.Mutex chM sync.Mutex
defaultSize int defaultSize int
clearOnce sync.Once clearOnce sync.Once
@ -33,7 +33,7 @@ type BufferPool struct {
// back. // back.
func NewBufferPool(ctx context.Context, max int, defaultSize int) *BufferPool { func NewBufferPool(ctx context.Context, max int, defaultSize int) *BufferPool {
b := &BufferPool{ b := &BufferPool{
ch: make(chan []byte, max), ch: make(chan *Buffer, max),
defaultSize: defaultSize, defaultSize: defaultSize,
} }
go func() { go func() {
@ -44,22 +44,29 @@ func NewBufferPool(ctx context.Context, max int, defaultSize int) *BufferPool {
} }
// Get returns a new buffer, either from the pool or newly allocated. // Get returns a new buffer, either from the pool or newly allocated.
func (pool *BufferPool) Get() Buffer { func (pool *BufferPool) Get() *Buffer {
b := Buffer{Put: pool.put}
pool.chM.Lock() pool.chM.Lock()
defer pool.chM.Unlock() defer pool.chM.Unlock()
select { select {
case buf := <-pool.ch: case buf := <-pool.ch:
b.Data = buf return buf
default: default:
b.Data = make([]byte, pool.defaultSize) }
b := &Buffer{
Put: pool.Put,
Data: make([]byte, pool.defaultSize),
} }
return b return b
} }
func (pool *BufferPool) put(b []byte) { // Put returns a buffer to the pool for reuse.
func (pool *BufferPool) Put(b *Buffer) {
if cap(b.Data) > pool.defaultSize {
return
}
pool.chM.Lock() pool.chM.Lock()
defer pool.chM.Unlock() defer pool.chM.Unlock()
select { select {
@ -68,14 +75,6 @@ func (pool *BufferPool) put(b []byte) {
} }
} }
// Put returns a buffer to the pool for reuse.
func (pool *BufferPool) Put(b Buffer) {
if cap(b.Data) > pool.defaultSize {
return
}
pool.put(b.Data)
}
// clear empties the buffer so that all items can be garbage collected. // clear empties the buffer so that all items can be garbage collected.
func (pool *BufferPool) clear() { func (pool *BufferPool) clear() {
pool.clearOnce.Do(func() { pool.clearOnce.Do(func() {

View file

@ -13,7 +13,7 @@ import (
"github.com/restic/restic/internal/restic" "github.com/restic/restic/internal/restic"
) )
// FutureFile is returned by SaveFile and will return the data once it // FutureFile is returned by Save and will return the data once it
// has been processed. // has been processed.
type FutureFile struct { type FutureFile struct {
ch <-chan saveFileResponse ch <-chan saveFileResponse
@ -61,22 +61,26 @@ type FileSaver struct {
NodeFromFileInfo func(filename string, fi os.FileInfo) (*restic.Node, error) NodeFromFileInfo func(filename string, fi os.FileInfo) (*restic.Node, error)
} }
// NewFileSaver returns a new file saver. A worker pool with workers is // NewFileSaver returns a new file saver. A worker pool with fileWorkers is
// started, it is stopped when ctx is cancelled. // started, it is stopped when ctx is cancelled.
func NewFileSaver(ctx context.Context, fs fs.FS, blobSaver *BlobSaver, pol chunker.Pol, workers uint) *FileSaver { func NewFileSaver(ctx context.Context, fs fs.FS, blobSaver *BlobSaver, pol chunker.Pol, fileWorkers, blobWorkers uint) *FileSaver {
ch := make(chan saveFileJob, workers) ch := make(chan saveFileJob)
debug.Log("new file saver with %v file workers and %v blob workers", fileWorkers, blobWorkers)
poolSize := fileWorkers + blobWorkers
s := &FileSaver{ s := &FileSaver{
fs: fs, fs: fs,
blobSaver: blobSaver, blobSaver: blobSaver,
saveFilePool: NewBufferPool(ctx, 3*int(workers), chunker.MaxSize/4), saveFilePool: NewBufferPool(ctx, int(poolSize), chunker.MaxSize),
pol: pol, pol: pol,
ch: ch, ch: ch,
CompleteBlob: func(string, uint64) {}, CompleteBlob: func(string, uint64) {},
} }
for i := uint(0); i < workers; i++ { for i := uint(0); i < fileWorkers; i++ {
s.wg.Add(1) s.wg.Add(1)
go s.worker(ctx, &s.wg, ch) go s.worker(ctx, &s.wg, ch)
} }
@ -151,6 +155,7 @@ func (s *FileSaver) saveFile(ctx context.Context, chnker *chunker.Chunker, snPat
buf.Release() buf.Release()
break break
} }
buf.Data = chunk.Data buf.Data = chunk.Data
size += uint64(chunk.Length) size += uint64(chunk.Length)

View file

@ -0,0 +1,158 @@
package archiver
import (
"context"
"sync"
"github.com/restic/restic/internal/debug"
"github.com/restic/restic/internal/restic"
)
// FutureTree is returned by Save and will return the data once it
// has been processed.
type FutureTree struct {
ch <-chan saveTreeResponse
res saveTreeResponse
}
func (s *FutureTree) wait() {
res, ok := <-s.ch
if ok {
s.res = res
}
}
// Node returns the node once it is available.
func (s *FutureTree) Node() *restic.Node {
s.wait()
return s.res.node
}
// Stats returns the stats for the file once they are available.
func (s *FutureTree) Stats() ItemStats {
s.wait()
return s.res.stats
}
// Err returns the error in case an error occurred.
func (s *FutureTree) Err() error {
s.wait()
return s.res.err
}
// TreeSaver concurrently saves incoming trees to the repo.
type TreeSaver struct {
saveTree func(context.Context, *restic.Tree) (restic.ID, ItemStats, error)
errFn ErrorFunc
ch chan<- saveTreeJob
wg sync.WaitGroup
}
// NewTreeSaver returns a new tree saver. A worker pool with treeWorkers is
// started, it is stopped when ctx is cancelled.
func NewTreeSaver(ctx context.Context, treeWorkers uint, saveTree func(context.Context, *restic.Tree) (restic.ID, ItemStats, error), errFn ErrorFunc) *TreeSaver {
ch := make(chan saveTreeJob)
s := &TreeSaver{
ch: ch,
saveTree: saveTree,
errFn: errFn,
}
for i := uint(0); i < treeWorkers; i++ {
s.wg.Add(1)
go s.worker(ctx, &s.wg, ch)
}
return s
}
// Save stores the dir d and returns the data once it has been completed.
func (s *TreeSaver) Save(ctx context.Context, snPath string, node *restic.Node, nodes []FutureNode) FutureTree {
ch := make(chan saveTreeResponse, 1)
s.ch <- saveTreeJob{
snPath: snPath,
node: node,
nodes: nodes,
ch: ch,
}
return FutureTree{ch: ch}
}
type saveTreeJob struct {
snPath string
nodes []FutureNode
node *restic.Node
ch chan<- saveTreeResponse
}
type saveTreeResponse struct {
node *restic.Node
stats ItemStats
err error
}
// save stores the nodes as a tree in the repo.
func (s *TreeSaver) save(ctx context.Context, snPath string, node *restic.Node, nodes []FutureNode) (*restic.Node, ItemStats, error) {
var stats ItemStats
tree := restic.NewTree()
for _, fn := range nodes {
fn.wait(ctx)
// return the error if it wasn't ignored
if fn.err != nil {
debug.Log("err for %v: %v", fn.node.Name, fn.err)
fn.err = s.errFn(fn.target, fn.fi, fn.err)
if fn.err == nil {
// ignore error
continue
}
return nil, stats, fn.err
}
// when the error is ignored, the node could not be saved, so ignore it
if fn.node == nil {
debug.Log("%v excluded: %v", fn.snPath, fn.target)
continue
}
debug.Log("insert %v", fn.node.Name)
err := tree.Insert(fn.node)
if err != nil {
return nil, stats, err
}
}
id, treeStats, err := s.saveTree(ctx, tree)
stats.Add(treeStats)
if err != nil {
return nil, stats, err
}
node.Subtree = &id
return node, stats, nil
}
func (s *TreeSaver) worker(ctx context.Context, wg *sync.WaitGroup, jobs <-chan saveTreeJob) {
defer wg.Done()
for {
var job saveTreeJob
select {
case <-ctx.Done():
return
case job = <-jobs:
}
node, stats, err := s.save(ctx, job.snPath, job.node, job.nodes)
job.ch <- saveTreeResponse{
node: node,
stats: stats,
err: err,
}
close(job.ch)
}
}

View file

@ -13,4 +13,5 @@ const (
O_EXCL int = syscall.O_EXCL // used with O_CREATE, file must not exist O_EXCL int = syscall.O_EXCL // used with O_CREATE, file must not exist
O_SYNC int = syscall.O_SYNC // open for synchronous I/O. O_SYNC int = syscall.O_SYNC // open for synchronous I/O.
O_TRUNC int = syscall.O_TRUNC // if possible, truncate file when opened. O_TRUNC int = syscall.O_TRUNC // if possible, truncate file when opened.
O_NONBLOCK int = syscall.O_NONBLOCK // don't block open on fifos etc.
) )

View file

@ -40,6 +40,7 @@ type Backup struct {
processedCh chan counter processedCh chan counter
errCh chan struct{} errCh chan struct{}
workerCh chan fileWorkerMessage workerCh chan fileWorkerMessage
clearStatus chan struct{}
summary struct { summary struct {
sync.Mutex sync.Mutex
@ -68,6 +69,7 @@ func NewBackup(term *termstatus.Terminal, verbosity uint) *Backup {
processedCh: make(chan counter), processedCh: make(chan counter),
errCh: make(chan struct{}), errCh: make(chan struct{}),
workerCh: make(chan fileWorkerMessage), workerCh: make(chan fileWorkerMessage),
clearStatus: make(chan struct{}),
} }
} }
@ -90,6 +92,9 @@ func (b *Backup) Run(ctx context.Context) error {
select { select {
case <-ctx.Done(): case <-ctx.Done():
return nil return nil
case <-b.clearStatus:
started = false
b.term.SetStatus([]string{""})
case t, ok := <-b.totalCh: case t, ok := <-b.totalCh:
if ok { if ok {
total = t total = t
@ -147,7 +152,7 @@ func (b *Backup) update(total, processed counter, errors uint, currentFiles map[
} else { } else {
var eta string var eta string
if secs > 0 { if secs > 0 && processed.Bytes < total.Bytes {
eta = fmt.Sprintf(" ETA %s", formatSeconds(secs)) eta = fmt.Sprintf(" ETA %s", formatSeconds(secs))
} }
@ -332,6 +337,8 @@ func (b *Backup) ReportTotal(item string, s archiver.ScanStats) {
// Finish prints the finishing messages. // Finish prints the finishing messages.
func (b *Backup) Finish() { func (b *Backup) Finish() {
b.clearStatus <- struct{}{}
b.V("processed %s in %s", formatBytes(b.totalBytes), formatDuration(time.Since(b.start))) b.V("processed %s in %s", formatBytes(b.totalBytes), formatDuration(time.Since(b.start)))
b.V("\n") b.V("\n")
b.V("Files: %5d new, %5d changed, %5d unmodified\n", b.summary.Files.New, b.summary.Files.Changed, b.summary.Files.Unchanged) b.V("Files: %5d new, %5d changed, %5d unmodified\n", b.summary.Files.New, b.summary.Files.Changed, b.summary.Files.Unchanged)