forked from TrueCloudLab/restic
bdf7ba20cb
The Save methods of the BlobSaver, FileSaver and TreeSaver return early when the archiver is stopped due to an error. For that they select on both the tomb.Dying() and context.Done() channels, which can lead to a race condition when the tomb is killed due to an error: the tomb first closes its Dying channel and only then cancels all child contexts, while Archiver.SaveDir only aborts its execution once the context has been canceled. If the tomb killing is paused between closing the Dying channel and canceling the child contexts, the FileSaver/TreeSaver.Save methods return immediately, yet ScanDir still reads further files, which causes the test case to fail.

As a killed tomb always cancels all child contexts, and as the Savers always use a context bound to the tomb, it is sufficient to use context.Done() as the only escape hatch in the Save functions. This fixes the mismatch between SaveDir and Save.

Adjust the tests to use contexts bound to the tomb for all interactions with the Savers.
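To make the race and the fix concrete, here is a small, self-contained Go sketch. It is illustrative only and not code from this repository; the names sendRacy and sendFixed are hypothetical. It contrasts the old pattern that selected on both tomb.Dying() and context.Done() with the fixed pattern that waits only on the tomb-bound context:

// Illustrative sketch only, not taken from the repository: it contrasts the
// racy select on tomb.Dying() described above with the fixed variant that
// waits solely on the tomb-bound context.
package main

import (
	"context"
	"fmt"

	tomb "gopkg.in/tomb.v2"
)

// sendRacy bails out as soon as the tomb starts dying. Dying() is closed
// before the tomb cancels its child contexts, so code that watches only the
// context (like Archiver.SaveDir) may still be running at that point.
func sendRacy(ctx context.Context, t *tomb.Tomb, jobs chan<- int, job int) bool {
	select {
	case jobs <- job:
		return true
	case <-t.Dying():
		return false
	case <-ctx.Done():
		return false
	}
}

// sendFixed waits only on the context. Because the Savers always run with a
// context bound to the tomb, killing the tomb still cancels it, and the
// escape hatch fires at the same point at which SaveDir observes the
// cancellation.
func sendFixed(ctx context.Context, jobs chan<- int, job int) bool {
	select {
	case jobs <- job:
		return true
	case <-ctx.Done():
		return false
	}
}

func main() {
	var t tomb.Tomb
	ctx := t.Context(context.Background())
	jobs := make(chan int) // unbuffered and never read: both sends must bail out

	t.Kill(fmt.Errorf("simulated archiver error"))
	<-ctx.Done() // wait until the child context has been cancelled, too

	fmt.Println(sendRacy(ctx, &t, jobs, 1), sendFixed(ctx, jobs, 2))
}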
170 lines
3.6 KiB
Go
package archiver

import (
	"context"
	"sync"

	"github.com/restic/restic/internal/debug"
	"github.com/restic/restic/internal/restic"
	tomb "gopkg.in/tomb.v2"
)

// Saver allows saving a blob.
type Saver interface {
	SaveBlob(ctx context.Context, t restic.BlobType, data []byte, id restic.ID) (restic.ID, error)
	Index() restic.Index
}

// BlobSaver concurrently saves incoming blobs to the repo.
type BlobSaver struct {
	repo Saver

	m          sync.Mutex
	knownBlobs restic.BlobSet

	ch chan<- saveBlobJob
}

// NewBlobSaver returns a new BlobSaver. A worker pool is started; it is
// stopped when ctx is cancelled.
func NewBlobSaver(ctx context.Context, t *tomb.Tomb, repo Saver, workers uint) *BlobSaver {
	ch := make(chan saveBlobJob)
	s := &BlobSaver{
		repo:       repo,
		knownBlobs: restic.NewBlobSet(),
		ch:         ch,
	}

	for i := uint(0); i < workers; i++ {
		t.Go(func() error {
			return s.worker(t.Context(ctx), ch)
		})
	}

	return s
}

// Save stores a blob in the repo. It checks the index and the known blobs
// before saving anything. Whether the blob was already known can be queried
// via the Known method of the returned FutureBlob.
func (s *BlobSaver) Save(ctx context.Context, t restic.BlobType, buf *Buffer) FutureBlob {
	ch := make(chan saveBlobResponse, 1)
	select {
	case s.ch <- saveBlobJob{BlobType: t, buf: buf, ch: ch}:
	case <-ctx.Done():
		debug.Log("not sending job, context is cancelled")
		close(ch)
		return FutureBlob{ch: ch}
	}

	return FutureBlob{ch: ch, length: len(buf.Data)}
}

// FutureBlob is returned by Save and will deliver the result once the blob
// has been processed.
type FutureBlob struct {
	ch     <-chan saveBlobResponse
	length int
	res    saveBlobResponse
}

// Wait blocks until the result is available or the context is cancelled.
func (s *FutureBlob) Wait(ctx context.Context) {
	select {
	case <-ctx.Done():
		return
	case res, ok := <-s.ch:
		if ok {
			s.res = res
		}
	}
}

// ID returns the ID of the blob after it has been saved.
func (s *FutureBlob) ID() restic.ID {
	return s.res.id
}

// Known returns whether or not the blob was already known.
func (s *FutureBlob) Known() bool {
	return s.res.known
}

// Length returns the length of the blob.
func (s *FutureBlob) Length() int {
	return s.length
}

// saveBlobJob is a single blob queued for saving, together with the channel
// on which its response is delivered.
type saveBlobJob struct {
	restic.BlobType
	buf *Buffer
	ch  chan<- saveBlobResponse
}

// saveBlobResponse is the result sent back on a job's response channel.
type saveBlobResponse struct {
	id    restic.ID
	known bool
}

func (s *BlobSaver) saveBlob(ctx context.Context, t restic.BlobType, buf []byte) (saveBlobResponse, error) {
	id := restic.Hash(buf)
	h := restic.BlobHandle{ID: id, Type: t}

	// check if another goroutine has already saved this blob
	known := false
	s.m.Lock()
	if s.knownBlobs.Has(h) {
		known = true
	} else {
		s.knownBlobs.Insert(h)
		known = false
	}
	s.m.Unlock()

	// blob is already known, nothing to do
	if known {
		return saveBlobResponse{
			id:    id,
			known: true,
		}, nil
	}

	// check if the repo knows this blob
	if s.repo.Index().Has(id, t) {
		return saveBlobResponse{
			id:    id,
			known: true,
		}, nil
	}

	// otherwise we're responsible for saving it
	_, err := s.repo.SaveBlob(ctx, t, buf, id)
	if err != nil {
		return saveBlobResponse{}, err
	}

	return saveBlobResponse{
		id:    id,
		known: false,
	}, nil
}

// worker saves blobs from the jobs channel until ctx is cancelled; it returns
// the first error encountered while saving.
func (s *BlobSaver) worker(ctx context.Context, jobs <-chan saveBlobJob) error {
	for {
		var job saveBlobJob
		select {
		case <-ctx.Done():
			return nil
		case job = <-jobs:
		}

		res, err := s.saveBlob(ctx, job.BlobType, job.buf.Data)
		if err != nil {
			debug.Log("saveBlob returned error, exiting: %v", err)
			close(job.ch)
			return err
		}
		job.ch <- res
		close(job.ch)
		job.buf.Release()
	}
}
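
For orientation, the following is a hedged usage sketch, not part of blob_saver.go, showing how the pieces above are intended to fit together: a tomb-bound context, NewBlobSaver, Save and FutureBlob.Wait. The helper saveOneBlob is hypothetical and assumes it lives in the archiver package next to this file, with repo being any implementation of the Saver interface and buf a Buffer whose Data field already holds the blob contents:

// saveOneBlob is a hypothetical helper, not part of this file. It assumes the
// archiver package context: repo implements Saver, buf.Data holds the blob.
func saveOneBlob(repo Saver, buf *Buffer) (restic.ID, bool, error) {
	var t tomb.Tomb
	ctx := t.Context(context.Background()) // Savers must use a tomb-bound context

	s := NewBlobSaver(ctx, &t, repo, 2) // starts two worker goroutines via t.Go

	fb := s.Save(ctx, restic.DataBlob, buf) // enqueue the blob for saving
	fb.Wait(ctx)                            // block until saved or ctx is cancelled

	t.Kill(nil) // stop the workers; a killed tomb also cancels ctx
	return fb.ID(), fb.Known(), t.Wait()
}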