forked from TrueCloudLab/restic
Merge pull request #3955 from MichaelEischer/async-futurefile-completion
Improve archiver performance for small files
This commit is contained in:
commit
3499c6354e
10 changed files with 195 additions and 149 deletions
9
changelog/unreleased/pull-3955
Normal file
9
changelog/unreleased/pull-3955
Normal file
|
@ -0,0 +1,9 @@
|
|||
Enhancement: Improve backup performance for small files
|
||||
|
||||
When backing up small files restic was slower than it could be. In particular
|
||||
this affected backups using maximum compression.
|
||||
|
||||
This has been fixed by reworking the internal parallelism of the backup
|
||||
command.
|
||||
|
||||
https://github.com/restic/restic/issues/3955
|
|
@ -68,6 +68,9 @@ type Archiver struct {
|
|||
// be in the snapshot after saving. s contains some statistics about this
|
||||
// particular file/dir.
|
||||
//
|
||||
// Once reading a file has completed successfully (but not saving it yet),
|
||||
// CompleteItem will be called with current == nil.
|
||||
//
|
||||
// CompleteItem may be called asynchronously from several different
|
||||
// goroutines!
|
||||
CompleteItem func(item string, previous, current *restic.Node, s ItemStats, d time.Duration)
|
||||
|
@ -431,6 +434,8 @@ func (arch *Archiver) Save(ctx context.Context, snPath, target string, previous
|
|||
// Save will close the file, we don't need to do that
|
||||
fn = arch.fileSaver.Save(ctx, snPath, target, file, fi, func() {
|
||||
arch.StartFile(snPath)
|
||||
}, func() {
|
||||
arch.CompleteItem(snPath, nil, nil, ItemStats{}, 0)
|
||||
}, func(node *restic.Node, stats ItemStats) {
|
||||
arch.CompleteItem(snPath, previous, node, stats, time.Since(start))
|
||||
})
|
||||
|
|
|
@ -53,6 +53,8 @@ func saveFile(t testing.TB, repo restic.Repository, filename string, filesystem
|
|||
}
|
||||
|
||||
var (
|
||||
completeReadingCallback bool
|
||||
|
||||
completeCallbackNode *restic.Node
|
||||
completeCallbackStats ItemStats
|
||||
completeCallback bool
|
||||
|
@ -60,6 +62,13 @@ func saveFile(t testing.TB, repo restic.Repository, filename string, filesystem
|
|||
startCallback bool
|
||||
)
|
||||
|
||||
completeReading := func() {
|
||||
completeReadingCallback = true
|
||||
if completeCallback {
|
||||
t.Error("callbacks called in wrong order")
|
||||
}
|
||||
}
|
||||
|
||||
complete := func(node *restic.Node, stats ItemStats) {
|
||||
completeCallback = true
|
||||
completeCallbackNode = node
|
||||
|
@ -80,7 +89,7 @@ func saveFile(t testing.TB, repo restic.Repository, filename string, filesystem
|
|||
t.Fatal(err)
|
||||
}
|
||||
|
||||
res := arch.fileSaver.Save(ctx, "/", filename, file, fi, start, complete)
|
||||
res := arch.fileSaver.Save(ctx, "/", filename, file, fi, start, completeReading, complete)
|
||||
|
||||
fnr := res.take(ctx)
|
||||
if fnr.err != nil {
|
||||
|
@ -101,6 +110,10 @@ func saveFile(t testing.TB, repo restic.Repository, filename string, filesystem
|
|||
t.Errorf("start callback did not happen")
|
||||
}
|
||||
|
||||
if !completeReadingCallback {
|
||||
t.Errorf("completeReading callback did not happen")
|
||||
}
|
||||
|
||||
if !completeCallback {
|
||||
t.Errorf("complete callback did not happen")
|
||||
}
|
||||
|
|
|
@ -43,51 +43,18 @@ func (s *BlobSaver) TriggerShutdown() {
|
|||
|
||||
// Save stores a blob in the repo. It checks the index and the known blobs
|
||||
// before saving anything. It takes ownership of the buffer passed in.
|
||||
func (s *BlobSaver) Save(ctx context.Context, t restic.BlobType, buf *Buffer) FutureBlob {
|
||||
ch := make(chan SaveBlobResponse, 1)
|
||||
func (s *BlobSaver) Save(ctx context.Context, t restic.BlobType, buf *Buffer, cb func(res SaveBlobResponse)) {
|
||||
select {
|
||||
case s.ch <- saveBlobJob{BlobType: t, buf: buf, ch: ch}:
|
||||
case s.ch <- saveBlobJob{BlobType: t, buf: buf, cb: cb}:
|
||||
case <-ctx.Done():
|
||||
debug.Log("not sending job, context is cancelled")
|
||||
close(ch)
|
||||
return FutureBlob{ch: ch}
|
||||
}
|
||||
|
||||
return FutureBlob{ch: ch}
|
||||
}
|
||||
|
||||
// FutureBlob is returned by SaveBlob and will return the data once it has been processed.
|
||||
type FutureBlob struct {
|
||||
ch <-chan SaveBlobResponse
|
||||
}
|
||||
|
||||
func (s *FutureBlob) Poll() *SaveBlobResponse {
|
||||
select {
|
||||
case res, ok := <-s.ch:
|
||||
if ok {
|
||||
return &res
|
||||
}
|
||||
default:
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// Take blocks until the result is available or the context is cancelled.
|
||||
func (s *FutureBlob) Take(ctx context.Context) SaveBlobResponse {
|
||||
select {
|
||||
case res, ok := <-s.ch:
|
||||
if ok {
|
||||
return res
|
||||
}
|
||||
case <-ctx.Done():
|
||||
}
|
||||
return SaveBlobResponse{}
|
||||
}
|
||||
|
||||
type saveBlobJob struct {
|
||||
restic.BlobType
|
||||
buf *Buffer
|
||||
ch chan<- SaveBlobResponse
|
||||
cb func(res SaveBlobResponse)
|
||||
}
|
||||
|
||||
type SaveBlobResponse struct {
|
||||
|
@ -128,11 +95,9 @@ func (s *BlobSaver) worker(ctx context.Context, jobs <-chan saveBlobJob) error {
|
|||
res, err := s.saveBlob(ctx, job.BlobType, job.buf.Data)
|
||||
if err != nil {
|
||||
debug.Log("saveBlob returned error, exiting: %v", err)
|
||||
close(job.ch)
|
||||
return err
|
||||
}
|
||||
job.ch <- res
|
||||
close(job.ch)
|
||||
job.cb(res)
|
||||
job.buf.Release()
|
||||
}
|
||||
}
|
||||
|
|
|
@ -4,6 +4,7 @@ import (
|
|||
"context"
|
||||
"fmt"
|
||||
"runtime"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"testing"
|
||||
|
||||
|
@ -45,16 +46,22 @@ func TestBlobSaver(t *testing.T) {
|
|||
|
||||
b := NewBlobSaver(ctx, wg, saver, uint(runtime.NumCPU()))
|
||||
|
||||
var results []FutureBlob
|
||||
var wait sync.WaitGroup
|
||||
var results []SaveBlobResponse
|
||||
|
||||
wait.Add(20)
|
||||
for i := 0; i < 20; i++ {
|
||||
buf := &Buffer{Data: []byte(fmt.Sprintf("foo%d", i))}
|
||||
fb := b.Save(ctx, restic.DataBlob, buf)
|
||||
results = append(results, fb)
|
||||
idx := i
|
||||
results = append(results, SaveBlobResponse{})
|
||||
b.Save(ctx, restic.DataBlob, buf, func(res SaveBlobResponse) {
|
||||
results[idx] = res
|
||||
wait.Done()
|
||||
})
|
||||
}
|
||||
|
||||
for i, blob := range results {
|
||||
sbr := blob.Take(ctx)
|
||||
wait.Wait()
|
||||
for i, sbr := range results {
|
||||
if sbr.known {
|
||||
t.Errorf("blob %v is known, that should not be the case", i)
|
||||
}
|
||||
|
@ -94,7 +101,7 @@ func TestBlobSaverError(t *testing.T) {
|
|||
|
||||
for i := 0; i < test.blobs; i++ {
|
||||
buf := &Buffer{Data: []byte(fmt.Sprintf("foo%d", i))}
|
||||
b.Save(ctx, restic.DataBlob, buf)
|
||||
b.Save(ctx, restic.DataBlob, buf, func(res SaveBlobResponse) {})
|
||||
}
|
||||
|
||||
b.TriggerShutdown()
|
||||
|
|
|
@ -4,6 +4,7 @@ import (
|
|||
"context"
|
||||
"io"
|
||||
"os"
|
||||
"sync"
|
||||
|
||||
"github.com/restic/chunker"
|
||||
"github.com/restic/restic/internal/debug"
|
||||
|
@ -14,7 +15,7 @@ import (
|
|||
)
|
||||
|
||||
// SaveBlobFn saves a blob to a repo.
|
||||
type SaveBlobFn func(context.Context, restic.BlobType, *Buffer) FutureBlob
|
||||
type SaveBlobFn func(context.Context, restic.BlobType, *Buffer, func(res SaveBlobResponse))
|
||||
|
||||
// FileSaver concurrently saves incoming files to the repo.
|
||||
type FileSaver struct {
|
||||
|
@ -66,17 +67,21 @@ func (s *FileSaver) TriggerShutdown() {
|
|||
type CompleteFunc func(*restic.Node, ItemStats)
|
||||
|
||||
// Save stores the file f and returns the data once it has been completed. The
|
||||
// file is closed by Save.
|
||||
func (s *FileSaver) Save(ctx context.Context, snPath string, target string, file fs.File, fi os.FileInfo, start func(), complete CompleteFunc) FutureNode {
|
||||
// file is closed by Save. completeReading is only called if the file was read
|
||||
// successfully. complete is always called. If completeReading is called, then
|
||||
// this will always happen before calling complete.
|
||||
func (s *FileSaver) Save(ctx context.Context, snPath string, target string, file fs.File, fi os.FileInfo, start func(), completeReading func(), complete CompleteFunc) FutureNode {
|
||||
fn, ch := newFutureNode()
|
||||
job := saveFileJob{
|
||||
snPath: snPath,
|
||||
target: target,
|
||||
file: file,
|
||||
fi: fi,
|
||||
start: start,
|
||||
complete: complete,
|
||||
ch: ch,
|
||||
|
||||
start: start,
|
||||
completeReading: completeReading,
|
||||
complete: complete,
|
||||
}
|
||||
|
||||
select {
|
||||
|
@ -96,51 +101,74 @@ type saveFileJob struct {
|
|||
file fs.File
|
||||
fi os.FileInfo
|
||||
ch chan<- futureNodeResult
|
||||
complete CompleteFunc
|
||||
|
||||
start func()
|
||||
completeReading func()
|
||||
complete CompleteFunc
|
||||
}
|
||||
|
||||
// saveFile stores the file f in the repo, then closes it.
|
||||
func (s *FileSaver) saveFile(ctx context.Context, chnker *chunker.Chunker, snPath string, target string, f fs.File, fi os.FileInfo, start func()) futureNodeResult {
|
||||
func (s *FileSaver) saveFile(ctx context.Context, chnker *chunker.Chunker, snPath string, target string, f fs.File, fi os.FileInfo, start func(), finishReading func(), finish func(res futureNodeResult)) {
|
||||
start()
|
||||
|
||||
stats := ItemStats{}
|
||||
fnr := futureNodeResult{
|
||||
snPath: snPath,
|
||||
target: target,
|
||||
}
|
||||
var lock sync.Mutex
|
||||
remaining := 0
|
||||
isCompleted := false
|
||||
|
||||
completeBlob := func() {
|
||||
lock.Lock()
|
||||
defer lock.Unlock()
|
||||
|
||||
remaining--
|
||||
if remaining == 0 && fnr.err == nil {
|
||||
if isCompleted {
|
||||
panic("completed twice")
|
||||
}
|
||||
isCompleted = true
|
||||
finish(fnr)
|
||||
}
|
||||
}
|
||||
completeError := func(err error) {
|
||||
lock.Lock()
|
||||
defer lock.Unlock()
|
||||
|
||||
if fnr.err == nil {
|
||||
if isCompleted {
|
||||
panic("completed twice")
|
||||
}
|
||||
isCompleted = true
|
||||
fnr.err = err
|
||||
fnr.node = nil
|
||||
fnr.stats = ItemStats{}
|
||||
finish(fnr)
|
||||
}
|
||||
}
|
||||
|
||||
debug.Log("%v", snPath)
|
||||
|
||||
node, err := s.NodeFromFileInfo(snPath, f.Name(), fi)
|
||||
if err != nil {
|
||||
_ = f.Close()
|
||||
fnr.err = err
|
||||
return fnr
|
||||
completeError(err)
|
||||
return
|
||||
}
|
||||
|
||||
if node.Type != "file" {
|
||||
_ = f.Close()
|
||||
fnr.err = errors.Errorf("node type %q is wrong", node.Type)
|
||||
return fnr
|
||||
completeError(errors.Errorf("node type %q is wrong", node.Type))
|
||||
return
|
||||
}
|
||||
|
||||
// reuse the chunker
|
||||
chnker.Reset(f, s.pol)
|
||||
|
||||
var results []FutureBlob
|
||||
complete := func(sbr SaveBlobResponse) {
|
||||
if !sbr.known {
|
||||
stats.DataBlobs++
|
||||
stats.DataSize += uint64(sbr.length)
|
||||
stats.DataSizeInRepo += uint64(sbr.sizeInRepo)
|
||||
}
|
||||
|
||||
node.Content = append(node.Content, sbr.id)
|
||||
}
|
||||
|
||||
node.Content = []restic.ID{}
|
||||
var size uint64
|
||||
node.Size = 0
|
||||
var idx int
|
||||
for {
|
||||
buf := s.saveFilePool.Get()
|
||||
chunk, err := chnker.Next(buf.Data)
|
||||
|
@ -150,62 +178,63 @@ func (s *FileSaver) saveFile(ctx context.Context, chnker *chunker.Chunker, snPat
|
|||
}
|
||||
|
||||
buf.Data = chunk.Data
|
||||
|
||||
size += uint64(chunk.Length)
|
||||
node.Size += uint64(chunk.Length)
|
||||
|
||||
if err != nil {
|
||||
_ = f.Close()
|
||||
fnr.err = err
|
||||
return fnr
|
||||
completeError(err)
|
||||
return
|
||||
}
|
||||
// test if the context has been cancelled, return the error
|
||||
if ctx.Err() != nil {
|
||||
_ = f.Close()
|
||||
completeError(ctx.Err())
|
||||
return
|
||||
}
|
||||
|
||||
// add a place to store the saveBlob result
|
||||
pos := idx
|
||||
node.Content = append(node.Content, restic.ID{})
|
||||
|
||||
s.saveBlob(ctx, restic.DataBlob, buf, func(sbr SaveBlobResponse) {
|
||||
lock.Lock()
|
||||
if !sbr.known {
|
||||
fnr.stats.DataBlobs++
|
||||
fnr.stats.DataSize += uint64(sbr.length)
|
||||
fnr.stats.DataSizeInRepo += uint64(sbr.sizeInRepo)
|
||||
}
|
||||
|
||||
node.Content[pos] = sbr.id
|
||||
lock.Unlock()
|
||||
|
||||
completeBlob()
|
||||
})
|
||||
idx++
|
||||
|
||||
// test if the context has been cancelled, return the error
|
||||
if ctx.Err() != nil {
|
||||
_ = f.Close()
|
||||
fnr.err = ctx.Err()
|
||||
return fnr
|
||||
}
|
||||
|
||||
res := s.saveBlob(ctx, restic.DataBlob, buf)
|
||||
results = append(results, res)
|
||||
|
||||
// test if the context has been cancelled, return the error
|
||||
if ctx.Err() != nil {
|
||||
_ = f.Close()
|
||||
fnr.err = ctx.Err()
|
||||
return fnr
|
||||
completeError(ctx.Err())
|
||||
return
|
||||
}
|
||||
|
||||
s.CompleteBlob(uint64(len(chunk.Data)))
|
||||
|
||||
// collect already completed blobs
|
||||
for len(results) > 0 {
|
||||
sbr := results[0].Poll()
|
||||
if sbr == nil {
|
||||
break
|
||||
}
|
||||
results[0] = FutureBlob{}
|
||||
results = results[1:]
|
||||
complete(*sbr)
|
||||
}
|
||||
}
|
||||
|
||||
err = f.Close()
|
||||
if err != nil {
|
||||
fnr.err = err
|
||||
return fnr
|
||||
completeError(err)
|
||||
return
|
||||
}
|
||||
|
||||
for i, res := range results {
|
||||
results[i] = FutureBlob{}
|
||||
sbr := res.Take(ctx)
|
||||
complete(sbr)
|
||||
}
|
||||
|
||||
node.Size = size
|
||||
fnr.node = node
|
||||
fnr.stats = stats
|
||||
return fnr
|
||||
lock.Lock()
|
||||
// require one additional completeFuture() call to ensure that the future only completes
|
||||
// after reaching the end of this method
|
||||
remaining += idx + 1
|
||||
lock.Unlock()
|
||||
finishReading()
|
||||
completeBlob()
|
||||
}
|
||||
|
||||
func (s *FileSaver) worker(ctx context.Context, jobs <-chan saveFileJob) {
|
||||
|
@ -224,11 +253,16 @@ func (s *FileSaver) worker(ctx context.Context, jobs <-chan saveFileJob) {
|
|||
}
|
||||
}
|
||||
|
||||
res := s.saveFile(ctx, chnker, job.snPath, job.target, job.file, job.fi, job.start)
|
||||
s.saveFile(ctx, chnker, job.snPath, job.target, job.file, job.fi, job.start, func() {
|
||||
if job.completeReading != nil {
|
||||
job.completeReading()
|
||||
}
|
||||
}, func(res futureNodeResult) {
|
||||
if job.complete != nil {
|
||||
job.complete(res.node, res.stats)
|
||||
}
|
||||
job.ch <- res
|
||||
close(job.ch)
|
||||
})
|
||||
}
|
||||
}
|
||||
|
|
|
@ -34,10 +34,8 @@ func createTestFiles(t testing.TB, num int) (files []string, cleanup func()) {
|
|||
func startFileSaver(ctx context.Context, t testing.TB) (*FileSaver, context.Context, *errgroup.Group) {
|
||||
wg, ctx := errgroup.WithContext(ctx)
|
||||
|
||||
saveBlob := func(ctx context.Context, tpe restic.BlobType, buf *Buffer) FutureBlob {
|
||||
ch := make(chan SaveBlobResponse)
|
||||
close(ch)
|
||||
return FutureBlob{ch: ch}
|
||||
saveBlob := func(ctx context.Context, tpe restic.BlobType, buf *Buffer, cb func(SaveBlobResponse)) {
|
||||
cb(SaveBlobResponse{})
|
||||
}
|
||||
|
||||
workers := uint(runtime.NumCPU())
|
||||
|
@ -62,6 +60,7 @@ func TestFileSaver(t *testing.T) {
|
|||
defer cleanup()
|
||||
|
||||
startFn := func() {}
|
||||
completeReadingFn := func() {}
|
||||
completeFn := func(*restic.Node, ItemStats) {}
|
||||
|
||||
testFs := fs.Local{}
|
||||
|
@ -80,7 +79,7 @@ func TestFileSaver(t *testing.T) {
|
|||
t.Fatal(err)
|
||||
}
|
||||
|
||||
ff := s.Save(ctx, filename, filename, f, fi, startFn, completeFn)
|
||||
ff := s.Save(ctx, filename, filename, f, fi, startFn, completeReadingFn, completeFn)
|
||||
results = append(results, ff)
|
||||
}
|
||||
|
||||
|
|
|
@ -11,7 +11,7 @@ import (
|
|||
|
||||
// TreeSaver concurrently saves incoming trees to the repo.
|
||||
type TreeSaver struct {
|
||||
saveBlob func(ctx context.Context, t restic.BlobType, buf *Buffer) FutureBlob
|
||||
saveBlob func(ctx context.Context, t restic.BlobType, buf *Buffer, cb func(res SaveBlobResponse))
|
||||
errFn ErrorFunc
|
||||
|
||||
ch chan<- saveTreeJob
|
||||
|
@ -19,7 +19,7 @@ type TreeSaver struct {
|
|||
|
||||
// NewTreeSaver returns a new tree saver. A worker pool with treeWorkers is
|
||||
// started, it is stopped when ctx is cancelled.
|
||||
func NewTreeSaver(ctx context.Context, wg *errgroup.Group, treeWorkers uint, saveBlob func(ctx context.Context, t restic.BlobType, buf *Buffer) FutureBlob, errFn ErrorFunc) *TreeSaver {
|
||||
func NewTreeSaver(ctx context.Context, wg *errgroup.Group, treeWorkers uint, saveBlob func(ctx context.Context, t restic.BlobType, buf *Buffer, cb func(res SaveBlobResponse)), errFn ErrorFunc) *TreeSaver {
|
||||
ch := make(chan saveTreeJob)
|
||||
|
||||
s := &TreeSaver{
|
||||
|
@ -124,21 +124,24 @@ func (s *TreeSaver) save(ctx context.Context, job *saveTreeJob) (*restic.Node, I
|
|||
}
|
||||
|
||||
b := &Buffer{Data: buf}
|
||||
res := s.saveBlob(ctx, restic.TreeBlob, b)
|
||||
ch := make(chan SaveBlobResponse, 1)
|
||||
s.saveBlob(ctx, restic.TreeBlob, b, func(res SaveBlobResponse) {
|
||||
ch <- res
|
||||
})
|
||||
|
||||
sbr := res.Take(ctx)
|
||||
select {
|
||||
case sbr := <-ch:
|
||||
if !sbr.known {
|
||||
stats.TreeBlobs++
|
||||
stats.TreeSize += uint64(sbr.length)
|
||||
stats.TreeSizeInRepo += uint64(sbr.sizeInRepo)
|
||||
}
|
||||
// The context was canceled in the meantime, id might be invalid
|
||||
if ctx.Err() != nil {
|
||||
return nil, stats, ctx.Err()
|
||||
}
|
||||
|
||||
node.Subtree = &sbr.id
|
||||
return node, stats, nil
|
||||
case <-ctx.Done():
|
||||
return nil, stats, ctx.Err()
|
||||
}
|
||||
}
|
||||
|
||||
func (s *TreeSaver) worker(ctx context.Context, jobs <-chan saveTreeJob) error {
|
||||
|
|
|
@ -12,15 +12,13 @@ import (
|
|||
"golang.org/x/sync/errgroup"
|
||||
)
|
||||
|
||||
func treeSaveHelper(ctx context.Context, t restic.BlobType, buf *Buffer) FutureBlob {
|
||||
ch := make(chan SaveBlobResponse, 1)
|
||||
ch <- SaveBlobResponse{
|
||||
func treeSaveHelper(ctx context.Context, t restic.BlobType, buf *Buffer, cb func(res SaveBlobResponse)) {
|
||||
cb(SaveBlobResponse{
|
||||
id: restic.NewRandomID(),
|
||||
known: false,
|
||||
length: len(buf.Data),
|
||||
sizeInRepo: len(buf.Data),
|
||||
}
|
||||
return FutureBlob{ch: ch}
|
||||
})
|
||||
}
|
||||
|
||||
func setupTreeSaver() (context.Context, context.CancelFunc, *TreeSaver, func() error) {
|
||||
|
|
|
@ -25,6 +25,7 @@ type Terminal struct {
|
|||
msg chan message
|
||||
status chan status
|
||||
canUpdateStatus bool
|
||||
lastStatusLen int
|
||||
|
||||
// will be closed when the goroutine which runs Run() terminates, so it'll
|
||||
// yield a default value immediately
|
||||
|
@ -154,6 +155,18 @@ func (t *Terminal) run(ctx context.Context) {
|
|||
}
|
||||
|
||||
func (t *Terminal) writeStatus(status []string) {
|
||||
statusLen := len(status)
|
||||
status = append([]string{}, status...)
|
||||
for i := len(status); i < t.lastStatusLen; i++ {
|
||||
// clear no longer used status lines
|
||||
status = append(status, "")
|
||||
if i > 0 {
|
||||
// all lines except the last one must have a line break
|
||||
status[i-1] = status[i-1] + "\n"
|
||||
}
|
||||
}
|
||||
t.lastStatusLen = statusLen
|
||||
|
||||
for _, line := range status {
|
||||
t.clearCurrentLine(t.wr, t.fd)
|
||||
|
||||
|
|
Loading…
Reference in a new issue