restic/internal/archiver/file_saver.go
greatroar 0db1d11b2e archiver: Remove cleanup goroutine from BufferPool
This isn't doing anything. Channels should get cleaned up by the GC when
the last reference to them disappears, just like all other data
structures. Also inlined BufferPool.Put in Buffer.Release, its only
caller.
2022-05-29 17:09:16 +02:00

242 lines
5.2 KiB
Go

package archiver
import (
"context"
"io"
"os"
"github.com/restic/chunker"
"github.com/restic/restic/internal/debug"
"github.com/restic/restic/internal/errors"
"github.com/restic/restic/internal/fs"
"github.com/restic/restic/internal/restic"
tomb "gopkg.in/tomb.v2"
)
// FutureFile is returned by Save and will return the data once it
// has been processed.
type FutureFile struct {
ch <-chan saveFileResponse
res saveFileResponse
}
// Wait blocks until the result of the save operation is received or ctx is
// cancelled.
func (s *FutureFile) Wait(ctx context.Context) {
select {
case res, ok := <-s.ch:
if ok {
s.res = res
}
case <-ctx.Done():
return
}
}
// Node returns the node once it is available.
func (s *FutureFile) Node() *restic.Node {
return s.res.node
}
// Stats returns the stats for the file once they are available.
func (s *FutureFile) Stats() ItemStats {
return s.res.stats
}
// Err returns the error in case an error occurred.
func (s *FutureFile) Err() error {
return s.res.err
}
// SaveBlobFn saves a blob to a repo.
type SaveBlobFn func(context.Context, restic.BlobType, *Buffer) FutureBlob
// FileSaver concurrently saves incoming files to the repo.
type FileSaver struct {
saveFilePool *BufferPool
saveBlob SaveBlobFn
pol chunker.Pol
ch chan<- saveFileJob
CompleteBlob func(filename string, bytes uint64)
NodeFromFileInfo func(filename string, fi os.FileInfo) (*restic.Node, error)
}
// NewFileSaver returns a new file saver. A worker pool with fileWorkers is
// started, it is stopped when ctx is cancelled.
func NewFileSaver(ctx context.Context, t *tomb.Tomb, save SaveBlobFn, pol chunker.Pol, fileWorkers, blobWorkers uint) *FileSaver {
ch := make(chan saveFileJob)
debug.Log("new file saver with %v file workers and %v blob workers", fileWorkers, blobWorkers)
poolSize := fileWorkers + blobWorkers
s := &FileSaver{
saveBlob: save,
saveFilePool: NewBufferPool(int(poolSize), chunker.MaxSize),
pol: pol,
ch: ch,
CompleteBlob: func(string, uint64) {},
}
for i := uint(0); i < fileWorkers; i++ {
t.Go(func() error {
s.worker(t.Context(ctx), ch)
return nil
})
}
return s
}
// CompleteFunc is called when the file has been saved.
type CompleteFunc func(*restic.Node, ItemStats)
// Save stores the file f and returns the data once it has been completed. The
// file is closed by Save.
func (s *FileSaver) Save(ctx context.Context, snPath string, file fs.File, fi os.FileInfo, start func(), complete CompleteFunc) FutureFile {
ch := make(chan saveFileResponse, 1)
job := saveFileJob{
snPath: snPath,
file: file,
fi: fi,
start: start,
complete: complete,
ch: ch,
}
select {
case s.ch <- job:
case <-ctx.Done():
debug.Log("not sending job, context is cancelled: %v", ctx.Err())
_ = file.Close()
close(ch)
return FutureFile{ch: ch}
}
return FutureFile{ch: ch}
}
type saveFileJob struct {
snPath string
file fs.File
fi os.FileInfo
ch chan<- saveFileResponse
complete CompleteFunc
start func()
}
type saveFileResponse struct {
node *restic.Node
stats ItemStats
err error
}
// saveFile stores the file f in the repo, then closes it.
func (s *FileSaver) saveFile(ctx context.Context, chnker *chunker.Chunker, snPath string, f fs.File, fi os.FileInfo, start func()) saveFileResponse {
start()
stats := ItemStats{}
debug.Log("%v", snPath)
node, err := s.NodeFromFileInfo(f.Name(), fi)
if err != nil {
_ = f.Close()
return saveFileResponse{err: err}
}
if node.Type != "file" {
_ = f.Close()
return saveFileResponse{err: errors.Errorf("node type %q is wrong", node.Type)}
}
// reuse the chunker
chnker.Reset(f, s.pol)
var results []FutureBlob
node.Content = []restic.ID{}
var size uint64
for {
buf := s.saveFilePool.Get()
chunk, err := chnker.Next(buf.Data)
if errors.Cause(err) == io.EOF {
buf.Release()
break
}
buf.Data = chunk.Data
size += uint64(chunk.Length)
if err != nil {
_ = f.Close()
return saveFileResponse{err: err}
}
// test if the context has been cancelled, return the error
if ctx.Err() != nil {
_ = f.Close()
return saveFileResponse{err: ctx.Err()}
}
res := s.saveBlob(ctx, restic.DataBlob, buf)
results = append(results, res)
// test if the context has been cancelled, return the error
if ctx.Err() != nil {
_ = f.Close()
return saveFileResponse{err: ctx.Err()}
}
s.CompleteBlob(f.Name(), uint64(len(chunk.Data)))
}
err = f.Close()
if err != nil {
return saveFileResponse{err: err}
}
for _, res := range results {
res.Wait(ctx)
if !res.Known() {
stats.DataBlobs++
stats.DataSize += uint64(res.Length())
}
node.Content = append(node.Content, res.ID())
}
node.Size = size
return saveFileResponse{
node: node,
stats: stats,
}
}
func (s *FileSaver) worker(ctx context.Context, jobs <-chan saveFileJob) {
// a worker has one chunker which is reused for each file (because it contains a rather large buffer)
chnker := chunker.New(nil, s.pol)
for {
var job saveFileJob
select {
case <-ctx.Done():
return
case job = <-jobs:
}
res := s.saveFile(ctx, chnker, job.snPath, job.file, job.fi, job.start)
if job.complete != nil {
job.complete(res.node, res.stats)
}
job.ch <- res
close(job.ch)
}
}