archiver: Asynchronously complete FutureFile

After reading and chunking all data in a file, the FutureFile still has
to wait until the FutureBlobs are completed. This was done synchronously,
which blocks the file saver and prevents the next file from being read.

By replacing the FutureBlob with a callback, it becomes possible to
complete the FutureFile asynchronously.
Michael Eischer 2022-10-07 20:23:38 +02:00
parent 47e05080a9
commit b4de902596
6 changed files with 125 additions and 131 deletions
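
The core of the change is the new shape of BlobSaver.Save: instead of returning a FutureBlob that the caller later polls or blocks on, Save now takes a callback that the worker invokes once the blob has been stored. Below is a minimal, self-contained Go sketch of that callback-plus-WaitGroup pattern (illustrative only; save, saveResponse and the goroutine body are made-up stand-ins, not code from this commit):

package main

import (
	"fmt"
	"sync"
)

// saveResponse is a stand-in for restic's SaveBlobResponse.
type saveResponse struct{ id int }

// save hands the work to a goroutine and invokes cb when it is done,
// mirroring the new Save(ctx, t, buf, cb) shape instead of returning a future.
func save(data string, cb func(saveResponse)) {
	go func() {
		cb(saveResponse{id: len(data)}) // pretend the blob has been stored
	}()
}

func main() {
	var wg sync.WaitGroup
	inputs := []string{"a", "bb", "ccc"}
	results := make([]saveResponse, len(inputs))

	for i, d := range inputs {
		wg.Add(1)
		idx := i
		save(d, func(res saveResponse) {
			results[idx] = res // each callback writes only its own slot
			wg.Done()
		})
	}

	wg.Wait() // block once here instead of taking each future individually
	fmt.Println(results)
}

The same shape appears in the updated TestBlobSaver below: each callback writes into a pre-allocated slot and signals a sync.WaitGroup, so no FutureBlob bookkeeping is needed.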

internal/archiver/blob_saver.go

@@ -43,51 +43,18 @@ func (s *BlobSaver) TriggerShutdown() {
 
 // Save stores a blob in the repo. It checks the index and the known blobs
 // before saving anything. It takes ownership of the buffer passed in.
-func (s *BlobSaver) Save(ctx context.Context, t restic.BlobType, buf *Buffer) FutureBlob {
-	ch := make(chan SaveBlobResponse, 1)
+func (s *BlobSaver) Save(ctx context.Context, t restic.BlobType, buf *Buffer, cb func(res SaveBlobResponse)) {
 	select {
-	case s.ch <- saveBlobJob{BlobType: t, buf: buf, ch: ch}:
+	case s.ch <- saveBlobJob{BlobType: t, buf: buf, cb: cb}:
 	case <-ctx.Done():
 		debug.Log("not sending job, context is cancelled")
-		close(ch)
-		return FutureBlob{ch: ch}
 	}
-
-	return FutureBlob{ch: ch}
 }
-
-// FutureBlob is returned by SaveBlob and will return the data once it has been processed.
-type FutureBlob struct {
-	ch <-chan SaveBlobResponse
-}
-
-func (s *FutureBlob) Poll() *SaveBlobResponse {
-	select {
-	case res, ok := <-s.ch:
-		if ok {
-			return &res
-		}
-	default:
-	}
-	return nil
-}
-
-// Take blocks until the result is available or the context is cancelled.
-func (s *FutureBlob) Take(ctx context.Context) SaveBlobResponse {
-	select {
-	case res, ok := <-s.ch:
-		if ok {
-			return res
-		}
-	case <-ctx.Done():
-	}
-	return SaveBlobResponse{}
-}
 
 type saveBlobJob struct {
 	restic.BlobType
 	buf *Buffer
-	ch  chan<- SaveBlobResponse
+	cb  func(res SaveBlobResponse)
 }
 
 type SaveBlobResponse struct {
@@ -128,11 +95,9 @@ func (s *BlobSaver) worker(ctx context.Context, jobs <-chan saveBlobJob) error {
 		res, err := s.saveBlob(ctx, job.BlobType, job.buf.Data)
 		if err != nil {
 			debug.Log("saveBlob returned error, exiting: %v", err)
-			close(job.ch)
 			return err
 		}
-		job.ch <- res
-		close(job.ch)
+		job.cb(res)
 		job.buf.Release()
 	}
 }

internal/archiver/blob_saver_test.go

@@ -4,6 +4,7 @@ import (
 	"context"
 	"fmt"
 	"runtime"
+	"sync"
 	"sync/atomic"
 	"testing"
@@ -45,16 +46,22 @@ func TestBlobSaver(t *testing.T) {
 	b := NewBlobSaver(ctx, wg, saver, uint(runtime.NumCPU()))
 
-	var results []FutureBlob
+	var wait sync.WaitGroup
+	var results []SaveBlobResponse
+	wait.Add(20)
 
 	for i := 0; i < 20; i++ {
 		buf := &Buffer{Data: []byte(fmt.Sprintf("foo%d", i))}
-		fb := b.Save(ctx, restic.DataBlob, buf)
-		results = append(results, fb)
+		idx := i
+		results = append(results, SaveBlobResponse{})
+		b.Save(ctx, restic.DataBlob, buf, func(res SaveBlobResponse) {
+			results[idx] = res
+			wait.Done()
+		})
 	}
 
-	for i, blob := range results {
-		sbr := blob.Take(ctx)
+	wait.Wait()
+	for i, sbr := range results {
 		if sbr.known {
 			t.Errorf("blob %v is known, that should not be the case", i)
 		}
@@ -94,7 +101,7 @@ func TestBlobSaverError(t *testing.T) {
 	for i := 0; i < test.blobs; i++ {
 		buf := &Buffer{Data: []byte(fmt.Sprintf("foo%d", i))}
-		b.Save(ctx, restic.DataBlob, buf)
+		b.Save(ctx, restic.DataBlob, buf, func(res SaveBlobResponse) {})
 	}
 
 	b.TriggerShutdown()

internal/archiver/file_saver.go

@@ -4,6 +4,7 @@ import (
 	"context"
 	"io"
 	"os"
+	"sync"
 
 	"github.com/restic/chunker"
 	"github.com/restic/restic/internal/debug"
@@ -14,7 +15,7 @@ import (
 )
 
 // SaveBlobFn saves a blob to a repo.
-type SaveBlobFn func(context.Context, restic.BlobType, *Buffer) FutureBlob
+type SaveBlobFn func(context.Context, restic.BlobType, *Buffer, func(res SaveBlobResponse))
 
 // FileSaver concurrently saves incoming files to the repo.
 type FileSaver struct {
@@ -101,46 +102,67 @@ type saveFileJob struct {
 }
 
 // saveFile stores the file f in the repo, then closes it.
-func (s *FileSaver) saveFile(ctx context.Context, chnker *chunker.Chunker, snPath string, target string, f fs.File, fi os.FileInfo, start func()) futureNodeResult {
+func (s *FileSaver) saveFile(ctx context.Context, chnker *chunker.Chunker, snPath string, target string, f fs.File, fi os.FileInfo, start func(), finish func(res futureNodeResult)) {
 	start()
 
-	stats := ItemStats{}
 	fnr := futureNodeResult{
 		snPath: snPath,
 		target: target,
 	}
+	var lock sync.Mutex
+	remaining := 0
+	isCompleted := false
+	completeBlob := func() {
+		lock.Lock()
+		defer lock.Unlock()
+
+		remaining--
+		if remaining == 0 && fnr.err == nil {
+			if isCompleted {
+				panic("completed twice")
+			}
+			isCompleted = true
+			finish(fnr)
+		}
+	}
+	completeError := func(err error) {
+		lock.Lock()
+		defer lock.Unlock()
+
+		if fnr.err == nil {
+			if isCompleted {
+				panic("completed twice")
+			}
+			isCompleted = true
+			fnr.err = err
+			fnr.node = nil
+			fnr.stats = ItemStats{}
+			finish(fnr)
+		}
+	}
 
 	debug.Log("%v", snPath)
 	node, err := s.NodeFromFileInfo(snPath, f.Name(), fi)
 	if err != nil {
 		_ = f.Close()
-		fnr.err = err
-		return fnr
+		completeError(err)
+		return
 	}
 
 	if node.Type != "file" {
 		_ = f.Close()
-		fnr.err = errors.Errorf("node type %q is wrong", node.Type)
-		return fnr
+		completeError(errors.Errorf("node type %q is wrong", node.Type))
+		return
 	}
 
 	// reuse the chunker
 	chnker.Reset(f, s.pol)
 
-	var results []FutureBlob
-	complete := func(sbr SaveBlobResponse) {
-		if !sbr.known {
-			stats.DataBlobs++
-			stats.DataSize += uint64(sbr.length)
-			stats.DataSizeInRepo += uint64(sbr.sizeInRepo)
-		}
-
-		node.Content = append(node.Content, sbr.id)
-	}
-
 	node.Content = []restic.ID{}
-	var size uint64
+	node.Size = 0
+	var idx int
 
 	for {
 		buf := s.saveFilePool.Get()
 		chunk, err := chnker.Next(buf.Data)
@@ -150,62 +172,62 @@ func (s *FileSaver) saveFile(ctx context.Context, chnker *chunker.Chunker, snPat
 		}
 		buf.Data = chunk.Data
 
-		size += uint64(chunk.Length)
+		node.Size += uint64(chunk.Length)
 
 		if err != nil {
 			_ = f.Close()
-			fnr.err = err
-			return fnr
+			completeError(err)
+			return
 		}
 
-		// test if the context has been cancelled, return the error
-		if ctx.Err() != nil {
-			_ = f.Close()
-			fnr.err = ctx.Err()
-			return fnr
-		}
-
-		res := s.saveBlob(ctx, restic.DataBlob, buf)
-		results = append(results, res)
+		// test if the context has been cancelled, return the error
+		if ctx.Err() != nil {
+			_ = f.Close()
+			completeError(ctx.Err())
+			return
+		}
+
+		// add a place to store the saveBlob result
+		pos := idx
+		node.Content = append(node.Content, restic.ID{})
+		s.saveBlob(ctx, restic.DataBlob, buf, func(sbr SaveBlobResponse) {
+			lock.Lock()
+			if !sbr.known {
+				fnr.stats.DataBlobs++
+				fnr.stats.DataSize += uint64(sbr.length)
+				fnr.stats.DataSizeInRepo += uint64(sbr.sizeInRepo)
+			}
+			node.Content[pos] = sbr.id
+			lock.Unlock()
+			completeBlob()
+		})
+		idx++
 
 		// test if the context has been cancelled, return the error
 		if ctx.Err() != nil {
 			_ = f.Close()
-			fnr.err = ctx.Err()
-			return fnr
+			completeError(ctx.Err())
+			return
 		}
 
 		s.CompleteBlob(uint64(len(chunk.Data)))
-
-		// collect already completed blobs
-		for len(results) > 0 {
-			sbr := results[0].Poll()
-			if sbr == nil {
-				break
-			}
-			results[0] = FutureBlob{}
-			results = results[1:]
-			complete(*sbr)
-		}
 	}
 
 	err = f.Close()
 	if err != nil {
-		fnr.err = err
-		return fnr
+		completeError(err)
+		return
	}
 
-	for i, res := range results {
-		results[i] = FutureBlob{}
-		sbr := res.Take(ctx)
-		complete(sbr)
-	}
-
-	node.Size = size
 	fnr.node = node
-	fnr.stats = stats
-	return fnr
+	lock.Lock()
+	// require one additional completeFuture() call to ensure that the future only completes
+	// after reaching the end of this method
+	remaining += idx + 1
+	lock.Unlock()
+	completeBlob()
 }
@@ -224,11 +246,12 @@ func (s *FileSaver) worker(ctx context.Context, jobs <-chan saveFileJob) {
 			}
 		}
 
-		res := s.saveFile(ctx, chnker, job.snPath, job.target, job.file, job.fi, job.start)
+		s.saveFile(ctx, chnker, job.snPath, job.target, job.file, job.fi, job.start, func(res futureNodeResult) {
 			if job.complete != nil {
 				job.complete(res.node, res.stats)
 			}
 			job.ch <- res
 			close(job.ch)
+		})
 	}
 }
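
With no futures left to wait on, the new saveFile has to know when the last chunk callback has fired. It does this with a lock-protected remaining counter: each saveBlob callback calls completeBlob() to decrement it, and the end of saveFile adds idx + 1 and performs one final completeBlob(), so the file result can only be delivered after the method has finished handing out chunks. A minimal, self-contained sketch of that counting idea (illustrative only; finish, completeOne and the goroutines are stand-ins, not restic code):

package main

import (
	"fmt"
	"sync"
)

func main() {
	var (
		lock      sync.Mutex
		remaining int
		wg        sync.WaitGroup
	)
	finish := func() { fmt.Println("all chunks stored, file complete") }

	// completeOne mirrors completeBlob: the decrement that reaches zero
	// fires the finish callback.
	completeOne := func() {
		lock.Lock()
		defer lock.Unlock()
		remaining--
		if remaining == 0 {
			finish()
		}
	}

	// Producer side: kick off per-chunk work without waiting for it.
	chunks := 5
	for i := 0; i < chunks; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			completeOne() // callback for one stored chunk
		}()
	}

	// One extra count guarantees finish() cannot run before this point:
	// remaining only dips below zero until the producer adds chunks+1.
	lock.Lock()
	remaining += chunks + 1
	lock.Unlock()
	completeOne()

	wg.Wait()
}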

internal/archiver/file_saver_test.go

@@ -34,10 +34,8 @@ func createTestFiles(t testing.TB, num int) (files []string, cleanup func()) {
 func startFileSaver(ctx context.Context, t testing.TB) (*FileSaver, context.Context, *errgroup.Group) {
 	wg, ctx := errgroup.WithContext(ctx)
 
-	saveBlob := func(ctx context.Context, tpe restic.BlobType, buf *Buffer) FutureBlob {
-		ch := make(chan SaveBlobResponse)
-		close(ch)
-		return FutureBlob{ch: ch}
+	saveBlob := func(ctx context.Context, tpe restic.BlobType, buf *Buffer, cb func(SaveBlobResponse)) {
+		cb(SaveBlobResponse{})
 	}
 
 	workers := uint(runtime.NumCPU())

internal/archiver/tree_saver.go

@@ -11,7 +11,7 @@ import (
 
 // TreeSaver concurrently saves incoming trees to the repo.
 type TreeSaver struct {
-	saveBlob func(ctx context.Context, t restic.BlobType, buf *Buffer) FutureBlob
+	saveBlob func(ctx context.Context, t restic.BlobType, buf *Buffer, cb func(res SaveBlobResponse))
 	errFn    ErrorFunc
 
 	ch chan<- saveTreeJob
@@ -19,7 +19,7 @@ type TreeSaver struct {
 
 // NewTreeSaver returns a new tree saver. A worker pool with treeWorkers is
 // started, it is stopped when ctx is cancelled.
-func NewTreeSaver(ctx context.Context, wg *errgroup.Group, treeWorkers uint, saveBlob func(ctx context.Context, t restic.BlobType, buf *Buffer) FutureBlob, errFn ErrorFunc) *TreeSaver {
+func NewTreeSaver(ctx context.Context, wg *errgroup.Group, treeWorkers uint, saveBlob func(ctx context.Context, t restic.BlobType, buf *Buffer, cb func(res SaveBlobResponse)), errFn ErrorFunc) *TreeSaver {
 	ch := make(chan saveTreeJob)
 
 	s := &TreeSaver{
@@ -124,21 +124,24 @@ func (s *TreeSaver) save(ctx context.Context, job *saveTreeJob) (*restic.Node, I
 	}
 
 	b := &Buffer{Data: buf}
-	res := s.saveBlob(ctx, restic.TreeBlob, b)
+	ch := make(chan SaveBlobResponse, 1)
+	s.saveBlob(ctx, restic.TreeBlob, b, func(res SaveBlobResponse) {
+		ch <- res
+	})
 
-	sbr := res.Take(ctx)
+	select {
+	case sbr := <-ch:
 		if !sbr.known {
 			stats.TreeBlobs++
 			stats.TreeSize += uint64(sbr.length)
 			stats.TreeSizeInRepo += uint64(sbr.sizeInRepo)
 		}
-	// The context was canceled in the meantime, id might be invalid
-	if ctx.Err() != nil {
-		return nil, stats, ctx.Err()
-	}
 
 		node.Subtree = &sbr.id
 		return node, stats, nil
+	case <-ctx.Done():
+		return nil, stats, ctx.Err()
+	}
 }
 
 func (s *TreeSaver) worker(ctx context.Context, jobs <-chan saveTreeJob) error {

internal/archiver/tree_saver_test.go

@@ -12,15 +12,13 @@ import (
 	"golang.org/x/sync/errgroup"
 )
 
-func treeSaveHelper(ctx context.Context, t restic.BlobType, buf *Buffer) FutureBlob {
-	ch := make(chan SaveBlobResponse, 1)
-	ch <- SaveBlobResponse{
+func treeSaveHelper(ctx context.Context, t restic.BlobType, buf *Buffer, cb func(res SaveBlobResponse)) {
+	cb(SaveBlobResponse{
 		id:         restic.NewRandomID(),
 		known:      false,
 		length:     len(buf.Data),
 		sizeInRepo: len(buf.Data),
-	}
-	return FutureBlob{ch: ch}
+	})
 }
 
 func setupTreeSaver() (context.Context, context.CancelFunc, *TreeSaver, func() error) {
func setupTreeSaver() (context.Context, context.CancelFunc, *TreeSaver, func() error) {