forked from TrueCloudLab/restic
archiver: Incrementally serialize tree nodes
That way it is not necessary to keep both the Nodes forming a Tree and the serialized JSON version in memory.
parent c206a101a3
commit b817681a11
5 changed files with 101 additions and 17 deletions
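
For orientation, here is a minimal standalone sketch of the pattern this commit introduces (simplified stand-in types `node` and `treeBuilder`, not restic's actual Node/TreeJSONBuilder API): each node is marshalled the moment it is added, so only the growing JSON buffer has to stay in memory instead of the full node list plus its complete serialized form.

package main

import (
	"bytes"
	"encoding/json"
	"fmt"
)

// node is a stand-in for a tree entry, just enough to serialize.
type node struct {
	Name string `json:"name"`
	Size uint64 `json:"size"`
}

// treeBuilder mirrors the TreeJSONBuilder idea: every node is marshalled
// as soon as it is added, so only the growing JSON buffer is retained.
type treeBuilder struct {
	buf      bytes.Buffer
	lastName string
}

func newTreeBuilder() *treeBuilder {
	tb := &treeBuilder{}
	tb.buf.WriteString(`{"nodes":[`)
	return tb
}

func (tb *treeBuilder) AddNode(n *node) error {
	// nodes must arrive in sorted name order, as they are stored in a tree
	if n.Name <= tb.lastName {
		return fmt.Errorf("nodes are not ordered: got %q, last %q", n.Name, tb.lastName)
	}
	if tb.lastName != "" {
		tb.buf.WriteByte(',')
	}
	tb.lastName = n.Name

	val, err := json.Marshal(n)
	if err != nil {
		return err
	}
	tb.buf.Write(val)
	return nil
}

func (tb *treeBuilder) Finalize() []byte {
	// trailing newline keeps the output identical to json.Encoder's
	tb.buf.WriteString("]}\n")
	return tb.buf.Bytes()
}

func main() {
	tb := newTreeBuilder()
	for _, n := range []node{{Name: "a.txt", Size: 1}, {Name: "b.txt", Size: 2}} {
		n := n
		if err := tb.AddNode(&n); err != nil {
			panic(err)
		}
	}
	fmt.Print(string(tb.Finalize()))
	// prints: {"nodes":[{"name":"a.txt","size":1},{"name":"b.txt","size":2}]}
}

Because AddNode requires nodes in sorted order (the order a Tree keeps them in), the streamed output stays byte-for-byte identical to json.Marshal of a full Tree plus a trailing newline; the new TestTreeEqualSerialization in the diff below verifies exactly that.
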
internal/archiver/archiver.go
@@ -2,7 +2,6 @@ package archiver
 
 import (
 	"context"
-	"encoding/json"
 	"os"
 	"path"
 	"runtime"
@@ -175,17 +174,13 @@ func (arch *Archiver) error(item string, err error) error {
 
 // saveTree stores a tree in the repo. It checks the index and the known blobs
 // before saving anything.
-func (arch *Archiver) saveTree(ctx context.Context, t *restic.Tree) (restic.ID, ItemStats, error) {
+func (arch *Archiver) saveTree(ctx context.Context, t *restic.TreeJSONBuilder) (restic.ID, ItemStats, error) {
 	var s ItemStats
-	buf, err := json.Marshal(t)
+	buf, err := t.Finalize()
 	if err != nil {
-		return restic.ID{}, s, errors.Wrap(err, "MarshalJSON")
+		return restic.ID{}, s, err
 	}
 
-	// append a newline so that the data is always consistent (json.Encoder
-	// adds a newline after each object)
-	buf = append(buf, '\n')
-
 	b := &Buffer{Data: buf}
 	res := arch.blobSaver.Save(ctx, restic.TreeBlob, b)
 
@@ -620,7 +615,11 @@ func (arch *Archiver) SaveTree(ctx context.Context, snPath string, atree *Tree,
 			return nil, err
 		}
 
-		id, nodeStats, err := arch.saveTree(ctx, subtree)
+		tb, err := restic.TreeToBuilder(subtree)
+		if err != nil {
+			return nil, err
+		}
+		id, nodeStats, err := arch.saveTree(ctx, tb)
 		if err != nil {
 			return nil, err
 		}
@@ -834,7 +833,11 @@ func (arch *Archiver) Snapshot(ctx context.Context, targets []string, opts Snaps
 			return errors.New("snapshot is empty")
 		}
 
-		rootTreeID, stats, err = arch.saveTree(wgCtx, tree)
+		tb, err := restic.TreeToBuilder(tree)
+		if err != nil {
+			return err
+		}
+		rootTreeID, stats, err = arch.saveTree(wgCtx, tb)
 		arch.stopWorkers()
 		return err
 	})

internal/archiver/tree_saver.go
@@ -10,7 +10,7 @@ import (
 
 // TreeSaver concurrently saves incoming trees to the repo.
 type TreeSaver struct {
-	saveTree func(context.Context, *restic.Tree) (restic.ID, ItemStats, error)
+	saveTree func(context.Context, *restic.TreeJSONBuilder) (restic.ID, ItemStats, error)
 	errFn    ErrorFunc
 
 	ch chan<- saveTreeJob
@@ -18,7 +18,7 @@ type TreeSaver struct {
 
 // NewTreeSaver returns a new tree saver. A worker pool with treeWorkers is
 // started, it is stopped when ctx is cancelled.
-func NewTreeSaver(ctx context.Context, wg *errgroup.Group, treeWorkers uint, saveTree func(context.Context, *restic.Tree) (restic.ID, ItemStats, error), errFn ErrorFunc) *TreeSaver {
+func NewTreeSaver(ctx context.Context, wg *errgroup.Group, treeWorkers uint, saveTree func(context.Context, *restic.TreeJSONBuilder) (restic.ID, ItemStats, error), errFn ErrorFunc) *TreeSaver {
 	ch := make(chan saveTreeJob)
 
 	s := &TreeSaver{
@@ -78,7 +78,7 @@ func (s *TreeSaver) save(ctx context.Context, job *saveTreeJob) (*restic.Node, I
 	// allow GC of nodes array once the loop is finished
 	job.nodes = nil
 
-	tree := restic.NewTree(len(nodes))
+	builder := restic.NewTreeJSONBuilder()
 
 	for i, fn := range nodes {
 		// fn is a copy, so clear the original value explicitly
@@ -104,13 +104,13 @@ func (s *TreeSaver) save(ctx context.Context, job *saveTreeJob) (*restic.Node, I
 		}
 
 		debug.Log("insert %v", fnr.node.Name)
-		err := tree.Insert(fnr.node)
+		err := builder.AddNode(fnr.node)
 		if err != nil {
 			return nil, stats, err
 		}
 	}
 
-	id, treeStats, err := s.saveTree(ctx, tree)
+	id, treeStats, err := s.saveTree(ctx, builder)
 	stats.Add(treeStats)
 	if err != nil {
 		return nil, stats, err

internal/archiver/tree_saver_test.go
@@ -18,7 +18,7 @@ func TestTreeSaver(t *testing.T) {
 
 	wg, ctx := errgroup.WithContext(ctx)
 
-	saveFn := func(context.Context, *restic.Tree) (restic.ID, ItemStats, error) {
+	saveFn := func(context.Context, *restic.TreeJSONBuilder) (restic.ID, ItemStats, error) {
 		return restic.NewRandomID(), ItemStats{TreeBlobs: 1, TreeSize: 123}, nil
 	}
 
@@ -73,7 +73,7 @@ func TestTreeSaverError(t *testing.T) {
 			wg, ctx := errgroup.WithContext(ctx)
 
 			var num int32
-			saveFn := func(context.Context, *restic.Tree) (restic.ID, ItemStats, error) {
+			saveFn := func(context.Context, *restic.TreeJSONBuilder) (restic.ID, ItemStats, error) {
 				val := atomic.AddInt32(&num, 1)
 				if val == test.failAt {
 					t.Logf("sending error for request %v\n", test.failAt)

internal/restic/tree.go
@@ -1,6 +1,7 @@
 package restic
 
 import (
+	"bytes"
 	"context"
 	"encoding/json"
 	"fmt"
@@ -143,3 +144,52 @@ func SaveTree(ctx context.Context, r BlobSaver, t *Tree) (ID, error) {
 	id, _, _, err := r.SaveBlob(ctx, TreeBlob, buf, ID{}, false)
 	return id, err
 }
+
+type TreeJSONBuilder struct {
+	buf      bytes.Buffer
+	lastName string
+}
+
+func NewTreeJSONBuilder() *TreeJSONBuilder {
+	tb := &TreeJSONBuilder{}
+	_, _ = tb.buf.WriteString(`{"nodes":[`)
+	return tb
+}
+
+func (builder *TreeJSONBuilder) AddNode(node *Node) error {
+	if node.Name <= builder.lastName {
+		return errors.Errorf("nodes are not ordered got %q, last %q", node.Name, builder.lastName)
+	}
+	if builder.lastName != "" {
+		_ = builder.buf.WriteByte(',')
+	}
+	builder.lastName = node.Name
+
+	val, err := json.Marshal(node)
+	if err != nil {
+		return err
+	}
+	_, _ = builder.buf.Write(val)
+	return nil
+}
+
+func (builder *TreeJSONBuilder) Finalize() ([]byte, error) {
+	// append a newline so that the data is always consistent (json.Encoder
+	// adds a newline after each object)
+	_, _ = builder.buf.WriteString("]}\n")
+	buf := builder.buf.Bytes()
+	// drop reference to buffer
+	builder.buf = bytes.Buffer{}
+	return buf, nil
+}
+
+func TreeToBuilder(t *Tree) (*TreeJSONBuilder, error) {
+	builder := NewTreeJSONBuilder()
+	for _, node := range t.Nodes {
+		err := builder.AddNode(node)
+		if err != nil {
+			return nil, err
+		}
+	}
+	return builder, nil
+}

internal/restic/tree_test.go
@@ -119,6 +119,37 @@ func TestEmptyLoadTree(t *testing.T) {
 		tree, tree2)
 }
 
+func TestTreeEqualSerialization(t *testing.T) {
+	files := []string{"node.go", "tree.go", "tree_test.go"}
+	for i := 1; i <= len(files); i++ {
+		tree := restic.NewTree(i)
+		builder := restic.NewTreeJSONBuilder()
+
+		for _, fn := range files[:i] {
+			fi, err := os.Lstat(fn)
+			rtest.OK(t, err)
+			node, err := restic.NodeFromFileInfo(fn, fi)
+			rtest.OK(t, err)
+
+			rtest.OK(t, tree.Insert(node))
+			rtest.OK(t, builder.AddNode(node))
+
+			rtest.Assert(t, tree.Insert(node) != nil, "no error on duplicate node")
+			rtest.Assert(t, builder.AddNode(node) != nil, "no error on duplicate node")
+		}
+
+		treeBytes, err := json.Marshal(tree)
+		treeBytes = append(treeBytes, '\n')
+		rtest.OK(t, err)
+
+		stiBytes, err := builder.Finalize()
+		rtest.OK(t, err)
+
+		// compare serialization of an individual node and the SaveTreeIterator
+		rtest.Equals(t, treeBytes, stiBytes)
+	}
+}
+
 func BenchmarkBuildTree(b *testing.B) {
 	const size = 100 // Directories of this size are not uncommon.
 