backup: report files whose chunks failed to upload
This commit is contained in:
parent
667a2f5369
commit
5b5d506472
6 changed files with 18 additions and 15 deletions
|
@ -2,6 +2,7 @@ package archiver
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
|
"fmt"
|
||||||
|
|
||||||
"github.com/restic/restic/internal/debug"
|
"github.com/restic/restic/internal/debug"
|
||||||
"github.com/restic/restic/internal/restic"
|
"github.com/restic/restic/internal/restic"
|
||||||
|
@ -43,9 +44,9 @@ func (s *BlobSaver) TriggerShutdown() {
|
||||||
|
|
||||||
// Save stores a blob in the repo. It checks the index and the known blobs
|
// Save stores a blob in the repo. It checks the index and the known blobs
|
||||||
// before saving anything. It takes ownership of the buffer passed in.
|
// before saving anything. It takes ownership of the buffer passed in.
|
||||||
func (s *BlobSaver) Save(ctx context.Context, t restic.BlobType, buf *Buffer, cb func(res SaveBlobResponse)) {
|
func (s *BlobSaver) Save(ctx context.Context, t restic.BlobType, buf *Buffer, filename string, cb func(res SaveBlobResponse)) {
|
||||||
select {
|
select {
|
||||||
case s.ch <- saveBlobJob{BlobType: t, buf: buf, cb: cb}:
|
case s.ch <- saveBlobJob{BlobType: t, buf: buf, fn: filename, cb: cb}:
|
||||||
case <-ctx.Done():
|
case <-ctx.Done():
|
||||||
debug.Log("not sending job, context is cancelled")
|
debug.Log("not sending job, context is cancelled")
|
||||||
}
|
}
|
||||||
|
@ -54,6 +55,7 @@ func (s *BlobSaver) Save(ctx context.Context, t restic.BlobType, buf *Buffer, cb
|
||||||
type saveBlobJob struct {
|
type saveBlobJob struct {
|
||||||
restic.BlobType
|
restic.BlobType
|
||||||
buf *Buffer
|
buf *Buffer
|
||||||
|
fn string
|
||||||
cb func(res SaveBlobResponse)
|
cb func(res SaveBlobResponse)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -95,7 +97,7 @@ func (s *BlobSaver) worker(ctx context.Context, jobs <-chan saveBlobJob) error {
|
||||||
res, err := s.saveBlob(ctx, job.BlobType, job.buf.Data)
|
res, err := s.saveBlob(ctx, job.BlobType, job.buf.Data)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
debug.Log("saveBlob returned error, exiting: %v", err)
|
debug.Log("saveBlob returned error, exiting: %v", err)
|
||||||
return err
|
return fmt.Errorf("failed to save blob from file %q: %w", job.fn, err)
|
||||||
}
|
}
|
||||||
job.cb(res)
|
job.cb(res)
|
||||||
job.buf.Release()
|
job.buf.Release()
|
||||||
|
|
|
@ -4,6 +4,7 @@ import (
|
||||||
"context"
|
"context"
|
||||||
"fmt"
|
"fmt"
|
||||||
"runtime"
|
"runtime"
|
||||||
|
"strings"
|
||||||
"sync"
|
"sync"
|
||||||
"sync/atomic"
|
"sync/atomic"
|
||||||
"testing"
|
"testing"
|
||||||
|
@ -11,6 +12,7 @@ import (
|
||||||
"github.com/restic/restic/internal/errors"
|
"github.com/restic/restic/internal/errors"
|
||||||
"github.com/restic/restic/internal/index"
|
"github.com/restic/restic/internal/index"
|
||||||
"github.com/restic/restic/internal/restic"
|
"github.com/restic/restic/internal/restic"
|
||||||
|
rtest "github.com/restic/restic/internal/test"
|
||||||
"golang.org/x/sync/errgroup"
|
"golang.org/x/sync/errgroup"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -57,7 +59,7 @@ func TestBlobSaver(t *testing.T) {
|
||||||
lock.Lock()
|
lock.Lock()
|
||||||
results = append(results, SaveBlobResponse{})
|
results = append(results, SaveBlobResponse{})
|
||||||
lock.Unlock()
|
lock.Unlock()
|
||||||
b.Save(ctx, restic.DataBlob, buf, func(res SaveBlobResponse) {
|
b.Save(ctx, restic.DataBlob, buf, "file", func(res SaveBlobResponse) {
|
||||||
lock.Lock()
|
lock.Lock()
|
||||||
results[idx] = res
|
results[idx] = res
|
||||||
lock.Unlock()
|
lock.Unlock()
|
||||||
|
@ -106,7 +108,7 @@ func TestBlobSaverError(t *testing.T) {
|
||||||
|
|
||||||
for i := 0; i < test.blobs; i++ {
|
for i := 0; i < test.blobs; i++ {
|
||||||
buf := &Buffer{Data: []byte(fmt.Sprintf("foo%d", i))}
|
buf := &Buffer{Data: []byte(fmt.Sprintf("foo%d", i))}
|
||||||
b.Save(ctx, restic.DataBlob, buf, func(res SaveBlobResponse) {})
|
b.Save(ctx, restic.DataBlob, buf, "errfile", func(res SaveBlobResponse) {})
|
||||||
}
|
}
|
||||||
|
|
||||||
b.TriggerShutdown()
|
b.TriggerShutdown()
|
||||||
|
@ -116,9 +118,8 @@ func TestBlobSaverError(t *testing.T) {
|
||||||
t.Errorf("expected error not found")
|
t.Errorf("expected error not found")
|
||||||
}
|
}
|
||||||
|
|
||||||
if err != errTest {
|
rtest.Assert(t, errors.Is(err, errTest), "unexpected error %v", err)
|
||||||
t.Fatalf("unexpected error found: %v", err)
|
rtest.Assert(t, strings.Contains(err.Error(), "errfile"), "expected error to contain 'errfile' got: %v", err)
|
||||||
}
|
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -16,7 +16,7 @@ import (
|
||||||
)
|
)
|
||||||
|
|
||||||
// SaveBlobFn saves a blob to a repo.
|
// SaveBlobFn saves a blob to a repo.
|
||||||
type SaveBlobFn func(context.Context, restic.BlobType, *Buffer, func(res SaveBlobResponse))
|
type SaveBlobFn func(context.Context, restic.BlobType, *Buffer, string, func(res SaveBlobResponse))
|
||||||
|
|
||||||
// FileSaver concurrently saves incoming files to the repo.
|
// FileSaver concurrently saves incoming files to the repo.
|
||||||
type FileSaver struct {
|
type FileSaver struct {
|
||||||
|
@ -205,7 +205,7 @@ func (s *FileSaver) saveFile(ctx context.Context, chnker *chunker.Chunker, snPat
|
||||||
node.Content = append(node.Content, restic.ID{})
|
node.Content = append(node.Content, restic.ID{})
|
||||||
lock.Unlock()
|
lock.Unlock()
|
||||||
|
|
||||||
s.saveBlob(ctx, restic.DataBlob, buf, func(sbr SaveBlobResponse) {
|
s.saveBlob(ctx, restic.DataBlob, buf, target, func(sbr SaveBlobResponse) {
|
||||||
lock.Lock()
|
lock.Lock()
|
||||||
if !sbr.known {
|
if !sbr.known {
|
||||||
fnr.stats.DataBlobs++
|
fnr.stats.DataBlobs++
|
||||||
|
|
|
@ -33,7 +33,7 @@ func createTestFiles(t testing.TB, num int) (files []string) {
|
||||||
func startFileSaver(ctx context.Context, t testing.TB) (*FileSaver, context.Context, *errgroup.Group) {
|
func startFileSaver(ctx context.Context, t testing.TB) (*FileSaver, context.Context, *errgroup.Group) {
|
||||||
wg, ctx := errgroup.WithContext(ctx)
|
wg, ctx := errgroup.WithContext(ctx)
|
||||||
|
|
||||||
saveBlob := func(ctx context.Context, tpe restic.BlobType, buf *Buffer, cb func(SaveBlobResponse)) {
|
saveBlob := func(ctx context.Context, tpe restic.BlobType, buf *Buffer, _ string, cb func(SaveBlobResponse)) {
|
||||||
cb(SaveBlobResponse{
|
cb(SaveBlobResponse{
|
||||||
id: restic.Hash(buf.Data),
|
id: restic.Hash(buf.Data),
|
||||||
length: len(buf.Data),
|
length: len(buf.Data),
|
||||||
|
|
|
@ -11,7 +11,7 @@ import (
|
||||||
|
|
||||||
// TreeSaver concurrently saves incoming trees to the repo.
|
// TreeSaver concurrently saves incoming trees to the repo.
|
||||||
type TreeSaver struct {
|
type TreeSaver struct {
|
||||||
saveBlob func(ctx context.Context, t restic.BlobType, buf *Buffer, cb func(res SaveBlobResponse))
|
saveBlob SaveBlobFn
|
||||||
errFn ErrorFunc
|
errFn ErrorFunc
|
||||||
|
|
||||||
ch chan<- saveTreeJob
|
ch chan<- saveTreeJob
|
||||||
|
@ -19,7 +19,7 @@ type TreeSaver struct {
|
||||||
|
|
||||||
// NewTreeSaver returns a new tree saver. A worker pool with treeWorkers is
|
// NewTreeSaver returns a new tree saver. A worker pool with treeWorkers is
|
||||||
// started, it is stopped when ctx is cancelled.
|
// started, it is stopped when ctx is cancelled.
|
||||||
func NewTreeSaver(ctx context.Context, wg *errgroup.Group, treeWorkers uint, saveBlob func(ctx context.Context, t restic.BlobType, buf *Buffer, cb func(res SaveBlobResponse)), errFn ErrorFunc) *TreeSaver {
|
func NewTreeSaver(ctx context.Context, wg *errgroup.Group, treeWorkers uint, saveBlob SaveBlobFn, errFn ErrorFunc) *TreeSaver {
|
||||||
ch := make(chan saveTreeJob)
|
ch := make(chan saveTreeJob)
|
||||||
|
|
||||||
s := &TreeSaver{
|
s := &TreeSaver{
|
||||||
|
@ -126,7 +126,7 @@ func (s *TreeSaver) save(ctx context.Context, job *saveTreeJob) (*restic.Node, I
|
||||||
|
|
||||||
b := &Buffer{Data: buf}
|
b := &Buffer{Data: buf}
|
||||||
ch := make(chan SaveBlobResponse, 1)
|
ch := make(chan SaveBlobResponse, 1)
|
||||||
s.saveBlob(ctx, restic.TreeBlob, b, func(res SaveBlobResponse) {
|
s.saveBlob(ctx, restic.TreeBlob, b, job.target, func(res SaveBlobResponse) {
|
||||||
ch <- res
|
ch <- res
|
||||||
})
|
})
|
||||||
|
|
||||||
|
|
|
@ -12,7 +12,7 @@ import (
|
||||||
"golang.org/x/sync/errgroup"
|
"golang.org/x/sync/errgroup"
|
||||||
)
|
)
|
||||||
|
|
||||||
func treeSaveHelper(_ context.Context, _ restic.BlobType, buf *Buffer, cb func(res SaveBlobResponse)) {
|
func treeSaveHelper(_ context.Context, _ restic.BlobType, buf *Buffer, _ string, cb func(res SaveBlobResponse)) {
|
||||||
cb(SaveBlobResponse{
|
cb(SaveBlobResponse{
|
||||||
id: restic.NewRandomID(),
|
id: restic.NewRandomID(),
|
||||||
known: false,
|
known: false,
|
||||||
|
|
Loading…
Reference in a new issue