restore: Chang fileInfo to use snapshot location instead of target path
* uses less memory as common prefix is only stored once * stepping stone for simpler error callback api, which will allow further memory footprint reduction Signed-off-by: Igor Fedorenko <igor@ifedorenko.com>
This commit is contained in:
parent
32d5ceba87
commit
9f3ca97ee8
6 changed files with 63 additions and 53 deletions
|
@ -4,6 +4,7 @@ import (
|
|||
"context"
|
||||
"io"
|
||||
"os"
|
||||
"path/filepath"
|
||||
|
||||
"github.com/restic/restic/internal/crypto"
|
||||
"github.com/restic/restic/internal/debug"
|
||||
|
@ -19,7 +20,6 @@ import (
|
|||
// TODO consider replacing pack file cache with blob cache
|
||||
// TODO avoid decrypting the same blob multiple times
|
||||
// TODO evaluate disabled debug logging overhead for large repositories
|
||||
// TODO consider logging snapshot-relative path to reduce log clutter
|
||||
|
||||
const (
|
||||
workerCount = 8
|
||||
|
@ -37,7 +37,7 @@ const (
|
|||
|
||||
// information about regular file being restored
|
||||
type fileInfo struct {
|
||||
path string // full path to the file on local filesystem
|
||||
location string // file on local filesystem relative to restorer basedir
|
||||
blobs []restic.ID // remaining blobs of the file
|
||||
}
|
||||
|
||||
|
@ -65,21 +65,27 @@ type fileRestorer struct {
|
|||
packCache *packCache // pack cache
|
||||
filesWriter *filesWriter // file write
|
||||
|
||||
dst string
|
||||
files []*fileInfo
|
||||
}
|
||||
|
||||
func newFileRestorer(packLoader func(ctx context.Context, h restic.Handle, length int, offset int64, fn func(rd io.Reader) error) error, key *crypto.Key, idx filePackTraverser) *fileRestorer {
|
||||
func newFileRestorer(dst string, packLoader func(ctx context.Context, h restic.Handle, length int, offset int64, fn func(rd io.Reader) error) error, key *crypto.Key, idx filePackTraverser) *fileRestorer {
|
||||
return &fileRestorer{
|
||||
packLoader: packLoader,
|
||||
key: key,
|
||||
idx: idx,
|
||||
filesWriter: newFilesWriter(filesWriterCount),
|
||||
packCache: newPackCache(packCacheCapacity),
|
||||
dst: dst,
|
||||
}
|
||||
}
|
||||
|
||||
func (r *fileRestorer) addFile(path string, content restic.IDs) {
|
||||
r.files = append(r.files, &fileInfo{path: path, blobs: content})
|
||||
func (r *fileRestorer) addFile(location string, content restic.IDs) {
|
||||
r.files = append(r.files, &fileInfo{location: location, blobs: content})
|
||||
}
|
||||
|
||||
func (r *fileRestorer) targetPath(location string) string {
|
||||
return filepath.Join(r.dst, location)
|
||||
}
|
||||
|
||||
// used to pass information among workers (wish golang channels allowed multivalues)
|
||||
|
@ -90,7 +96,7 @@ type processingInfo struct {
|
|||
|
||||
func (r *fileRestorer) restoreFiles(ctx context.Context, onError func(path string, err error)) error {
|
||||
for _, file := range r.files {
|
||||
dbgmsg := file.path + ": "
|
||||
dbgmsg := file.location + ": "
|
||||
r.idx.forEachFilePack(file, func(packIdx int, packID restic.ID, packBlobs []restic.Blob) bool {
|
||||
if packIdx > 0 {
|
||||
dbgmsg += ", "
|
||||
|
@ -110,12 +116,13 @@ func (r *fileRestorer) restoreFiles(ctx context.Context, onError func(path strin
|
|||
|
||||
// synchronously create empty files (empty files need no packs and are ignored by packQueue)
|
||||
for _, file := range r.files {
|
||||
fullpath := r.targetPath(file.location)
|
||||
if len(file.blobs) == 0 {
|
||||
wr, err := os.OpenFile(file.path, os.O_CREATE|os.O_TRUNC|os.O_WRONLY, 0600)
|
||||
wr, err := os.OpenFile(fullpath, os.O_CREATE|os.O_TRUNC|os.O_WRONLY, 0600)
|
||||
if err == nil {
|
||||
wr.Close()
|
||||
} else {
|
||||
onError(file.path, err)
|
||||
onError(file.location, err)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -172,9 +179,10 @@ func (r *fileRestorer) restoreFiles(ctx context.Context, onError func(path strin
|
|||
var success []*fileInfo
|
||||
var failure []*fileInfo
|
||||
for file, ferr := range ferrors {
|
||||
target := r.targetPath(file.location)
|
||||
if ferr != nil {
|
||||
onError(file.path, ferr)
|
||||
r.filesWriter.close(file)
|
||||
onError(file.location, ferr)
|
||||
r.filesWriter.close(target)
|
||||
delete(inprogress, file)
|
||||
failure = append(failure, file)
|
||||
} else {
|
||||
|
@ -183,7 +191,7 @@ func (r *fileRestorer) restoreFiles(ctx context.Context, onError func(path strin
|
|||
return false // only interesed in the first pack
|
||||
})
|
||||
if len(file.blobs) == 0 {
|
||||
r.filesWriter.close(file)
|
||||
r.filesWriter.close(target)
|
||||
delete(inprogress, file)
|
||||
}
|
||||
success = append(success, file)
|
||||
|
@ -281,12 +289,13 @@ func (r *fileRestorer) processPack(ctx context.Context, request processingInfo,
|
|||
defer rd.Close()
|
||||
|
||||
for file := range request.files {
|
||||
target := r.targetPath(file.location)
|
||||
r.idx.forEachFilePack(file, func(packIdx int, packID restic.ID, packBlobs []restic.Blob) bool {
|
||||
for _, blob := range packBlobs {
|
||||
debug.Log("Writing blob %s (%d bytes) from pack %s to %s", blob.ID.Str(), blob.Length, packID.Str(), file.path)
|
||||
debug.Log("Writing blob %s (%d bytes) from pack %s to %s", blob.ID.Str(), blob.Length, packID.Str(), file.location)
|
||||
buf, err := r.loadBlob(rd, blob)
|
||||
if err == nil {
|
||||
err = r.filesWriter.writeToFile(file, buf)
|
||||
err = r.filesWriter.writeToFile(target, buf)
|
||||
}
|
||||
if err != nil {
|
||||
request.files[file] = err
|
||||
|
|
|
@ -62,7 +62,7 @@ func (i *TestRepo) pack(queue *packQueue, name string) *packInfo {
|
|||
}
|
||||
|
||||
func (i *TestRepo) fileContent(file *fileInfo) string {
|
||||
return i.filesPathToContent[file.path]
|
||||
return i.filesPathToContent[file.location]
|
||||
}
|
||||
|
||||
func newTestRepo(content []TestFile) *TestRepo {
|
||||
|
@ -135,7 +135,7 @@ func newTestRepo(content []TestFile) *TestRepo {
|
|||
for _, blob := range file.blobs {
|
||||
content = append(content, restic.Hash([]byte(blob.data)))
|
||||
}
|
||||
files = append(files, &fileInfo{path: file.name, blobs: content})
|
||||
files = append(files, &fileInfo{location: file.name, blobs: content})
|
||||
}
|
||||
|
||||
repo := &TestRepo{
|
||||
|
@ -160,10 +160,10 @@ func newTestRepo(content []TestFile) *TestRepo {
|
|||
return repo
|
||||
}
|
||||
|
||||
func restoreAndVerify(t *testing.T, content []TestFile) {
|
||||
func restoreAndVerify(t *testing.T, tempdir string, content []TestFile) {
|
||||
repo := newTestRepo(content)
|
||||
|
||||
r := newFileRestorer(repo.loader, repo.key, repo.idx)
|
||||
r := newFileRestorer(tempdir, repo.loader, repo.key, repo.idx)
|
||||
r.files = repo.files
|
||||
|
||||
r.restoreFiles(context.TODO(), func(path string, err error) {
|
||||
|
@ -171,17 +171,18 @@ func restoreAndVerify(t *testing.T, content []TestFile) {
|
|||
})
|
||||
|
||||
for _, file := range repo.files {
|
||||
data, err := ioutil.ReadFile(file.path)
|
||||
target := r.targetPath(file.location)
|
||||
data, err := ioutil.ReadFile(target)
|
||||
if err != nil {
|
||||
t.Errorf("unable to read file %v: %v", file.path, err)
|
||||
t.Errorf("unable to read file %v: %v", file.location, err)
|
||||
continue
|
||||
}
|
||||
|
||||
rtest.Equals(t, false, r.filesWriter.writers.Contains(file.path))
|
||||
rtest.Equals(t, false, r.filesWriter.writers.Contains(target))
|
||||
|
||||
content := repo.fileContent(file)
|
||||
if !bytes.Equal(data, []byte(content)) {
|
||||
t.Errorf("file %v has wrong content: want %q, got %q", file.path, content, data)
|
||||
t.Errorf("file %v has wrong content: want %q, got %q", file.location, content, data)
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -192,16 +193,16 @@ func TestFileRestorerBasic(t *testing.T) {
|
|||
tempdir, cleanup := rtest.TempDir(t)
|
||||
defer cleanup()
|
||||
|
||||
restoreAndVerify(t, []TestFile{
|
||||
restoreAndVerify(t, tempdir, []TestFile{
|
||||
TestFile{
|
||||
name: tempdir + "/file1",
|
||||
name: "file1",
|
||||
blobs: []TestBlob{
|
||||
TestBlob{"data1-1", "pack1-1"},
|
||||
TestBlob{"data1-2", "pack1-2"},
|
||||
},
|
||||
},
|
||||
TestFile{
|
||||
name: tempdir + "/file2",
|
||||
name: "file2",
|
||||
blobs: []TestBlob{
|
||||
TestBlob{"data2-1", "pack2-1"},
|
||||
TestBlob{"data2-2", "pack2-2"},
|
||||
|
@ -214,9 +215,9 @@ func TestFileRestorerEmptyFile(t *testing.T) {
|
|||
tempdir, cleanup := rtest.TempDir(t)
|
||||
defer cleanup()
|
||||
|
||||
restoreAndVerify(t, []TestFile{
|
||||
restoreAndVerify(t, tempdir, []TestFile{
|
||||
TestFile{
|
||||
name: tempdir + "/empty",
|
||||
name: "empty",
|
||||
blobs: []TestBlob{},
|
||||
},
|
||||
})
|
||||
|
|
|
@ -12,8 +12,8 @@ import (
|
|||
|
||||
type filesWriter struct {
|
||||
lock sync.Mutex // guards concurrent access
|
||||
inprogress map[*fileInfo]struct{} // (logically) opened file writers
|
||||
writers simplelru.LRUCache // key: *fileInfo, value: *os.File
|
||||
inprogress map[string]struct{} // (logically) opened file writers
|
||||
writers simplelru.LRUCache // key: string, value: *os.File
|
||||
}
|
||||
|
||||
func newFilesWriter(count int) *filesWriter {
|
||||
|
@ -21,30 +21,30 @@ func newFilesWriter(count int) *filesWriter {
|
|||
value.(*os.File).Close()
|
||||
debug.Log("Closed and purged cached writer for %v", key)
|
||||
})
|
||||
return &filesWriter{inprogress: make(map[*fileInfo]struct{}), writers: writers}
|
||||
return &filesWriter{inprogress: make(map[string]struct{}), writers: writers}
|
||||
}
|
||||
|
||||
func (w *filesWriter) writeToFile(file *fileInfo, buf []byte) error {
|
||||
func (w *filesWriter) writeToFile(path string, buf []byte) error {
|
||||
acquireWriter := func() (io.Writer, error) {
|
||||
w.lock.Lock()
|
||||
defer w.lock.Unlock()
|
||||
if wr, ok := w.writers.Get(file); ok {
|
||||
debug.Log("Used cached writer for %s", file.path)
|
||||
if wr, ok := w.writers.Get(path); ok {
|
||||
debug.Log("Used cached writer for %s", path)
|
||||
return wr.(*os.File), nil
|
||||
}
|
||||
var flags int
|
||||
if _, append := w.inprogress[file]; append {
|
||||
if _, append := w.inprogress[path]; append {
|
||||
flags = os.O_APPEND | os.O_WRONLY
|
||||
} else {
|
||||
w.inprogress[file] = struct{}{}
|
||||
w.inprogress[path] = struct{}{}
|
||||
flags = os.O_CREATE | os.O_TRUNC | os.O_WRONLY
|
||||
}
|
||||
wr, err := os.OpenFile(file.path, flags, 0600)
|
||||
wr, err := os.OpenFile(path, flags, 0600)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
w.writers.Add(file, wr)
|
||||
debug.Log("Opened and cached writer for %s", file.path)
|
||||
w.writers.Add(path, wr)
|
||||
debug.Log("Opened and cached writer for %s", path)
|
||||
return wr, nil
|
||||
}
|
||||
|
||||
|
@ -57,14 +57,14 @@ func (w *filesWriter) writeToFile(file *fileInfo, buf []byte) error {
|
|||
return err
|
||||
}
|
||||
if n != len(buf) {
|
||||
return errors.Errorf("error writing file %v: wrong length written, want %d, got %d", file.path, len(buf), n)
|
||||
return errors.Errorf("error writing file %v: wrong length written, want %d, got %d", path, len(buf), n)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (w *filesWriter) close(file *fileInfo) {
|
||||
func (w *filesWriter) close(path string) {
|
||||
w.lock.Lock()
|
||||
defer w.lock.Unlock()
|
||||
w.writers.Remove(file)
|
||||
delete(w.inprogress, file)
|
||||
w.writers.Remove(path)
|
||||
delete(w.inprogress, path)
|
||||
}
|
||||
|
|
|
@ -13,8 +13,8 @@ func TestFilesWriterBasic(t *testing.T) {
|
|||
|
||||
w := newFilesWriter(1)
|
||||
|
||||
f1 := &fileInfo{path: dir + "/f1"}
|
||||
f2 := &fileInfo{path: dir + "/f2"}
|
||||
f1 := dir + "/f1"
|
||||
f2 := dir + "/f2"
|
||||
|
||||
rtest.OK(t, w.writeToFile(f1, []byte{1}))
|
||||
rtest.Equals(t, 1, w.writers.Len())
|
||||
|
@ -34,11 +34,11 @@ func TestFilesWriterBasic(t *testing.T) {
|
|||
rtest.Equals(t, 0, w.writers.Len())
|
||||
rtest.Equals(t, 0, len(w.inprogress))
|
||||
|
||||
buf, err := ioutil.ReadFile(f1.path)
|
||||
buf, err := ioutil.ReadFile(f1)
|
||||
rtest.OK(t, err)
|
||||
rtest.Equals(t, []byte{1, 1}, buf)
|
||||
|
||||
buf, err = ioutil.ReadFile(f2.path)
|
||||
buf, err = ioutil.ReadFile(f2)
|
||||
rtest.OK(t, err)
|
||||
rtest.Equals(t, []byte{2, 2}, buf)
|
||||
}
|
||||
|
|
|
@ -112,7 +112,7 @@ func (h *packQueue) nextPack() (*packInfo, []*fileInfo) {
|
|||
var files []*fileInfo
|
||||
for file := range pack.files {
|
||||
h.idx.forEachFilePack(file, func(packIdx int, packID restic.ID, packBlobs []restic.Blob) bool {
|
||||
debug.Log("Pack #%d %s (%d blobs) used by %s", packIdx, packID.Str(), len(packBlobs), file.path)
|
||||
debug.Log("Pack #%d %s (%d blobs) used by %s", packIdx, packID.Str(), len(packBlobs), file.location)
|
||||
if pack.id == packID {
|
||||
files = append(files, file)
|
||||
}
|
||||
|
|
|
@ -192,7 +192,7 @@ func (res *Restorer) RestoreTo(ctx context.Context, dst string) error {
|
|||
|
||||
idx := restic.NewHardlinkIndex()
|
||||
|
||||
filerestorer := newFileRestorer(res.repo.Backend().Load, res.repo.Key(), filePackTraverser{lookup: res.repo.Index().Lookup})
|
||||
filerestorer := newFileRestorer(dst, res.repo.Backend().Load, res.repo.Key(), filePackTraverser{lookup: res.repo.Index().Lookup})
|
||||
|
||||
// path->node map, only needed to call res.Error, which uses the node during tests
|
||||
nodes := make(map[string]*restic.Node)
|
||||
|
@ -221,11 +221,11 @@ func (res *Restorer) RestoreTo(ctx context.Context, dst string) error {
|
|||
if idx.Has(node.Inode, node.DeviceID) {
|
||||
return nil
|
||||
}
|
||||
idx.Add(node.Inode, node.DeviceID, target)
|
||||
idx.Add(node.Inode, node.DeviceID, location)
|
||||
}
|
||||
|
||||
nodes[target] = node
|
||||
filerestorer.addFile(target, node.Content)
|
||||
filerestorer.addFile(location, node.Content)
|
||||
|
||||
return nil
|
||||
},
|
||||
|
@ -244,8 +244,8 @@ func (res *Restorer) RestoreTo(ctx context.Context, dst string) error {
|
|||
return res.traverseTree(ctx, dst, string(filepath.Separator), *res.sn.Tree, treeVisitor{
|
||||
enterDir: noop,
|
||||
visitNode: func(node *restic.Node, target, location string) error {
|
||||
if idx.Has(node.Inode, node.DeviceID) && idx.GetFilename(node.Inode, node.DeviceID) != target {
|
||||
return res.restoreHardlinkAt(node, idx.GetFilename(node.Inode, node.DeviceID), target, location)
|
||||
if idx.Has(node.Inode, node.DeviceID) && idx.GetFilename(node.Inode, node.DeviceID) != location {
|
||||
return res.restoreHardlinkAt(node, filerestorer.targetPath(idx.GetFilename(node.Inode, node.DeviceID)), target, location)
|
||||
}
|
||||
|
||||
if node.Type != "file" {
|
||||
|
|
Loading…
Reference in a new issue