2b88cd6eab
SaveTree did not use the TreeSaver but rather managed the tree collection and upload itself. This prevents using the parallelism offered by the TreeSaver and duplicates all related code. Using the TreeSaver can provide some speed-ups as all steps within the backup tree now rely on FutureNodes. This can be especially relevant for backups with large amounts of explicitly specified files. The main difference between SaveTree and SaveDir is that only the former can save tree blobs in which nodes have a different name than the actual file on disk. This is the result of resolving name conflicts between multiple files with the same name. The filename that must be used within the snapshot is now passed directly to restic.NodeFromFileInfo. This ensures that a FutureNode already contains the correct filename.
234 lines
4.9 KiB
Go
234 lines
4.9 KiB
Go
package archiver
|
|
|
|
import (
|
|
"context"
|
|
"io"
|
|
"os"
|
|
|
|
"github.com/restic/chunker"
|
|
"github.com/restic/restic/internal/debug"
|
|
"github.com/restic/restic/internal/errors"
|
|
"github.com/restic/restic/internal/fs"
|
|
"github.com/restic/restic/internal/restic"
|
|
"golang.org/x/sync/errgroup"
|
|
)
|
|
|
|
// SaveBlobFn saves a blob to a repo.
|
|
type SaveBlobFn func(context.Context, restic.BlobType, *Buffer) FutureBlob
|
|
|
|
// FileSaver concurrently saves incoming files to the repo.
|
|
type FileSaver struct {
|
|
saveFilePool *BufferPool
|
|
saveBlob SaveBlobFn
|
|
|
|
pol chunker.Pol
|
|
|
|
ch chan<- saveFileJob
|
|
|
|
CompleteBlob func(filename string, bytes uint64)
|
|
|
|
NodeFromFileInfo func(snPath, filename string, fi os.FileInfo) (*restic.Node, error)
|
|
}
|
|
|
|
// NewFileSaver returns a new file saver. A worker pool with fileWorkers is
|
|
// started, it is stopped when ctx is cancelled.
|
|
func NewFileSaver(ctx context.Context, wg *errgroup.Group, save SaveBlobFn, pol chunker.Pol, fileWorkers, blobWorkers uint) *FileSaver {
|
|
ch := make(chan saveFileJob)
|
|
|
|
debug.Log("new file saver with %v file workers and %v blob workers", fileWorkers, blobWorkers)
|
|
|
|
poolSize := fileWorkers + blobWorkers
|
|
|
|
s := &FileSaver{
|
|
saveBlob: save,
|
|
saveFilePool: NewBufferPool(int(poolSize), chunker.MaxSize),
|
|
pol: pol,
|
|
ch: ch,
|
|
|
|
CompleteBlob: func(string, uint64) {},
|
|
}
|
|
|
|
for i := uint(0); i < fileWorkers; i++ {
|
|
wg.Go(func() error {
|
|
s.worker(ctx, ch)
|
|
return nil
|
|
})
|
|
}
|
|
|
|
return s
|
|
}
|
|
|
|
func (s *FileSaver) TriggerShutdown() {
|
|
close(s.ch)
|
|
}
|
|
|
|
// CompleteFunc is called when the file has been saved.
|
|
type CompleteFunc func(*restic.Node, ItemStats)
|
|
|
|
// Save stores the file f and returns the data once it has been completed. The
|
|
// file is closed by Save.
|
|
func (s *FileSaver) Save(ctx context.Context, snPath string, target string, file fs.File, fi os.FileInfo, start func(), complete CompleteFunc) FutureNode {
|
|
fn, ch := newFutureNode()
|
|
job := saveFileJob{
|
|
snPath: snPath,
|
|
target: target,
|
|
file: file,
|
|
fi: fi,
|
|
start: start,
|
|
complete: complete,
|
|
ch: ch,
|
|
}
|
|
|
|
select {
|
|
case s.ch <- job:
|
|
case <-ctx.Done():
|
|
debug.Log("not sending job, context is cancelled: %v", ctx.Err())
|
|
_ = file.Close()
|
|
close(ch)
|
|
}
|
|
|
|
return fn
|
|
}
|
|
|
|
type saveFileJob struct {
|
|
snPath string
|
|
target string
|
|
file fs.File
|
|
fi os.FileInfo
|
|
ch chan<- futureNodeResult
|
|
complete CompleteFunc
|
|
start func()
|
|
}
|
|
|
|
// saveFile stores the file f in the repo, then closes it.
|
|
func (s *FileSaver) saveFile(ctx context.Context, chnker *chunker.Chunker, snPath string, target string, f fs.File, fi os.FileInfo, start func()) futureNodeResult {
|
|
start()
|
|
|
|
stats := ItemStats{}
|
|
fnr := futureNodeResult{
|
|
snPath: snPath,
|
|
target: target,
|
|
}
|
|
|
|
debug.Log("%v", snPath)
|
|
|
|
node, err := s.NodeFromFileInfo(snPath, f.Name(), fi)
|
|
if err != nil {
|
|
_ = f.Close()
|
|
fnr.err = err
|
|
return fnr
|
|
}
|
|
|
|
if node.Type != "file" {
|
|
_ = f.Close()
|
|
fnr.err = errors.Errorf("node type %q is wrong", node.Type)
|
|
return fnr
|
|
}
|
|
|
|
// reuse the chunker
|
|
chnker.Reset(f, s.pol)
|
|
|
|
var results []FutureBlob
|
|
complete := func(sbr SaveBlobResponse) {
|
|
if !sbr.known {
|
|
stats.DataBlobs++
|
|
stats.DataSize += uint64(sbr.length)
|
|
stats.DataSizeInRepo += uint64(sbr.sizeInRepo)
|
|
}
|
|
|
|
node.Content = append(node.Content, sbr.id)
|
|
}
|
|
|
|
node.Content = []restic.ID{}
|
|
var size uint64
|
|
for {
|
|
buf := s.saveFilePool.Get()
|
|
chunk, err := chnker.Next(buf.Data)
|
|
if err == io.EOF {
|
|
buf.Release()
|
|
break
|
|
}
|
|
|
|
buf.Data = chunk.Data
|
|
|
|
size += uint64(chunk.Length)
|
|
|
|
if err != nil {
|
|
_ = f.Close()
|
|
fnr.err = err
|
|
return fnr
|
|
}
|
|
|
|
// test if the context has been cancelled, return the error
|
|
if ctx.Err() != nil {
|
|
_ = f.Close()
|
|
fnr.err = ctx.Err()
|
|
return fnr
|
|
}
|
|
|
|
res := s.saveBlob(ctx, restic.DataBlob, buf)
|
|
results = append(results, res)
|
|
|
|
// test if the context has been cancelled, return the error
|
|
if ctx.Err() != nil {
|
|
_ = f.Close()
|
|
fnr.err = ctx.Err()
|
|
return fnr
|
|
}
|
|
|
|
s.CompleteBlob(f.Name(), uint64(len(chunk.Data)))
|
|
|
|
// collect already completed blobs
|
|
for len(results) > 0 {
|
|
sbr := results[0].Poll()
|
|
if sbr == nil {
|
|
break
|
|
}
|
|
results[0] = FutureBlob{}
|
|
results = results[1:]
|
|
complete(*sbr)
|
|
}
|
|
}
|
|
|
|
err = f.Close()
|
|
if err != nil {
|
|
fnr.err = err
|
|
return fnr
|
|
}
|
|
|
|
for i, res := range results {
|
|
results[i] = FutureBlob{}
|
|
sbr := res.Take(ctx)
|
|
complete(sbr)
|
|
}
|
|
|
|
node.Size = size
|
|
fnr.node = node
|
|
fnr.stats = stats
|
|
return fnr
|
|
}
|
|
|
|
func (s *FileSaver) worker(ctx context.Context, jobs <-chan saveFileJob) {
|
|
// a worker has one chunker which is reused for each file (because it contains a rather large buffer)
|
|
chnker := chunker.New(nil, s.pol)
|
|
|
|
for {
|
|
var job saveFileJob
|
|
var ok bool
|
|
select {
|
|
case <-ctx.Done():
|
|
return
|
|
case job, ok = <-jobs:
|
|
if !ok {
|
|
return
|
|
}
|
|
}
|
|
|
|
res := s.saveFile(ctx, chnker, job.snPath, job.target, job.file, job.fi, job.start)
|
|
if job.complete != nil {
|
|
job.complete(res.node, res.stats)
|
|
}
|
|
job.ch <- res
|
|
close(job.ch)
|
|
}
|
|
}
|