Restructure, adapt for max 16MB chunk size

This commit is contained in:
Alexander Neumann 2014-09-21 16:33:20 +02:00
parent f1f96c4206
commit f0287b2c9a
10 changed files with 42 additions and 271 deletions

View file

@ -1,8 +1,8 @@
package main package main
import ( import (
"crypto/sha256"
"encoding/json" "encoding/json"
"io/ioutil"
"log" "log"
"github.com/fd0/khepri" "github.com/fd0/khepri"
@ -16,17 +16,15 @@ func fsck_tree(repo *khepri.Repository, id khepri.ID) (bool, error) {
return false, err return false, err
} }
hr := khepri.NewHashingReader(rd, sha256.New) buf, err := ioutil.ReadAll(rd)
dec := json.NewDecoder(hr)
tree := &khepri.Tree{} tree := &khepri.Tree{}
err = dec.Decode(tree) err = json.Unmarshal(buf, tree)
if err != nil { if err != nil {
return false, err return false, err
} }
if !id.Equal(hr.Hash()) { if !id.Equal(khepri.IDFromData(buf)) {
return false, nil return false, nil
} }

View file

@ -1,84 +0,0 @@
package khepri
import (
"hash"
"io"
)
// HashingReader is the interface that wraps a normal io.Reader. When Hash()
// is called, it returns the hash over all data that has been read so far.
type HashingReader interface {
	io.Reader
	Hash() []byte
}
// HashingWriter is the interface that wraps a normal io.Writer. When Hash()
// is called, it returns the hash over all data that has been written so far.
type HashingWriter interface {
	io.Writer
	Hash() []byte
}
// reader mirrors every byte read from the wrapped io.Reader into a hash.
type reader struct {
	src io.Reader
	h   hash.Hash
}

// NewHashingReader wraps r so that all data read through the returned
// reader is additionally fed into a hash created by calling h.
func NewHashingReader(r io.Reader, h func() hash.Hash) *reader {
	return &reader{src: r, h: h()}
}

// Read forwards the call to the wrapped reader and updates the hash with
// the bytes that were actually read before returning the original result.
func (rd *reader) Read(p []byte) (int, error) {
	n, err := rd.src.Read(p)
	if n > 0 {
		rd.h.Write(p[:n])
	}
	return n, err
}

// Hash returns the digest over all data read so far.
func (rd *reader) Hash() []byte {
	return rd.h.Sum(nil)
}
// writer mirrors every byte written to the wrapped io.Writer into a hash.
type writer struct {
	dst io.Writer
	h   hash.Hash
}

// NewHashingWriter wraps w so that all data written through the returned
// writer is additionally fed into a hash created by calling h.
func NewHashingWriter(w io.Writer, h func() hash.Hash) *writer {
	return &writer{dst: w, h: h()}
}

// Write forwards the call to the wrapped writer and updates the hash with
// the bytes that were actually written before returning the original result.
func (wr *writer) Write(p []byte) (int, error) {
	n, err := wr.dst.Write(p)
	if n > 0 {
		wr.h.Write(p[:n])
	}
	return n, err
}

// Hash returns the digest over all data written so far.
func (wr *writer) Hash() []byte {
	return wr.h.Sum(nil)
}

View file

@ -1,33 +0,0 @@
package khepri_test
import (
"bytes"
"crypto/md5"
"crypto/sha1"
"fmt"
"strings"
"github.com/fd0/khepri"
)
// ExampleReader shows that data pulled through a hashing reader is hashed
// on the fly.
func ExampleReader() {
	input := "foobar"
	rd := khepri.NewHashingReader(strings.NewReader(input), md5.New)

	data := make([]byte, len(input))
	rd.Read(data)

	fmt.Printf("hash for %q is %02x", input, rd.Hash())
	// Output: hash for "foobar" is 3858f62230ac3c915f300c664312c63f
}
// ExampleWriter shows that data pushed through a hashing writer is hashed
// on the fly.
func ExampleWriter() {
	input := "foobar"

	var sink bytes.Buffer
	wr := khepri.NewHashingWriter(&sink, sha1.New)
	wr.Write([]byte(input))

	fmt.Printf("hash for %q is %02x", input, wr.Hash())
	// Output: hash for "foobar" is 8843d7f92416211de9ebb963ff4ce28125932878
}

View file

@ -1,47 +0,0 @@
package khepri_test
import (
"bytes"
"crypto/md5"
"crypto/sha1"
"encoding/hex"
"hash"
"io/ioutil"
"testing"
"github.com/fd0/khepri"
)
// static_tests pairs a hash constructor with an input string and the
// hex-encoded digest expected for it; used by TestReader and TestWriter.
var static_tests = []struct {
	hash   func() hash.Hash
	text   string
	digest string
}{
	{md5.New, "foobar\n", "14758f1afd44c09b7992073ccf00b43d"},

	// test data from http://www.nsrl.nist.gov/testdata/
	{sha1.New, "abc", "a9993e364706816aba3e25717850c26c9cd0d89d"},
	{sha1.New, "abcdbcdecdefdefgefghfghighijhijkijkljklmklmnlmnomnopnopq", "84983e441c3bd26ebaae4aa1f95129e5e54670f1"},
}
// TestReader checks that a hashing reader passes data through unchanged and
// produces the expected digest for every static test case.
func TestReader(t *testing.T) {
	for _, tc := range static_tests {
		rd := khepri.NewHashingReader(bytes.NewBuffer([]byte(tc.text)), tc.hash)

		content, err := ioutil.ReadAll(rd)
		ok(t, err)

		equals(t, tc.text, string(content))
		equals(t, hex.EncodeToString(rd.Hash()), tc.digest)
	}
}
// TestWriter checks that a hashing writer produces the expected digest for
// every static test case.
func TestWriter(t *testing.T) {
	for _, tc := range static_tests {
		var sink bytes.Buffer
		wr := khepri.NewHashingWriter(&sink, tc.hash)

		_, err := wr.Write([]byte(tc.text))
		ok(t, err)

		equals(t, hex.EncodeToString(wr.Hash()), tc.digest)
	}
}

8
id.go
View file

@ -2,6 +2,7 @@ package khepri
import ( import (
"bytes" "bytes"
"crypto/sha256"
"encoding/hex" "encoding/hex"
"encoding/json" "encoding/json"
) )
@ -58,3 +59,10 @@ func (id *ID) UnmarshalJSON(b []byte) error {
return nil return nil
} }
// IDFromData returns the ID (the SHA-256 digest) of the byte slice d.
func IDFromData(d []byte) ID {
	// Sum256 already returns a fresh [32]byte value, so slicing it is
	// sufficient; the explicit make+copy into a second buffer was redundant.
	hash := sha256.Sum256(d)
	return ID(hash[:])
}

View file

@ -1,66 +1,27 @@
package khepri package khepri
import ( func (repo *Repository) Create(t Type, data []byte) (ID, error) {
"io" // TODO: make sure that tempfile is removed upon error
"os"
)
type createObject struct { // create tempfile in repository
repo *Repository
tpe Type
hw HashingWriter
file *os.File
ch chan ID
}
func (repo *Repository) Create(t Type) (io.WriteCloser, <-chan ID, error) {
obj := &createObject{
repo: repo,
tpe: t,
ch: make(chan ID, 1),
}
// save contents to tempfile in repository, hash while writing
var err error var err error
obj.file, err = obj.repo.tempFile() file, err := repo.tempFile()
if err != nil { if err != nil {
return nil, nil, err return nil, err
} }
// create hashing writer // write data to tempfile
obj.hw = NewHashingWriter(obj.file, obj.repo.hash) _, err = file.Write(data)
return obj, obj.ch, nil
}
func (obj *createObject) Write(data []byte) (int, error) {
if obj.hw == nil {
panic("createObject: already closed!")
}
return obj.hw.Write(data)
}
func (obj *createObject) Close() error {
if obj.hw == nil {
panic("createObject: already closed!")
}
obj.file.Close()
id := ID(obj.hw.Hash())
obj.ch <- id
// move file to final name using hash of contents
err := obj.repo.renameFile(obj.file, obj.tpe, id)
if err != nil { if err != nil {
return err return nil, err
} }
obj.hw = nil // close tempfile, return id
obj.file = nil id := IDFromData(data)
return nil err = repo.renameFile(file, t, id)
if err != nil {
return nil, err
}
return id, nil
} }

View file

@ -16,18 +16,12 @@ func TestObjects(t *testing.T) {
}() }()
for _, test := range TestStrings { for _, test := range TestStrings {
obj, ch, err := repo.Create(khepri.TYPE_BLOB) id, err := repo.Create(khepri.TYPE_BLOB, []byte(test.data))
ok(t, err) ok(t, err)
_, err = obj.Write([]byte(test.data)) id2, err := khepri.ParseID(test.id)
ok(t, err) ok(t, err)
err = obj.Close() equals(t, id2, id)
ok(t, err)
id, err := khepri.ParseID(test.id)
ok(t, err)
equals(t, id, <-ch)
} }
} }

View file

@ -75,16 +75,9 @@ func TestRepository(t *testing.T) {
// add files // add files
for _, test := range TestStrings { for _, test := range TestStrings {
// store string in repository // store string in repository
obj, id_ch, err := repo.Create(test.t) id, err := repo.Create(test.t, []byte(test.data))
ok(t, err) ok(t, err)
_, err = obj.Write([]byte(test.data))
ok(t, err)
err = obj.Close()
ok(t, err)
id := <-id_ch
equals(t, test.id, id.String()) equals(t, test.id, id.String())
// try to get it out again // try to get it out again

View file

@ -47,25 +47,17 @@ func (sn *Snapshot) Save(repo *Repository) (ID, error) {
panic("Snapshot.Save() called with nil tree id") panic("Snapshot.Save() called with nil tree id")
} }
obj, id_ch, err := repo.Create(TYPE_REF) data, err := json.Marshal(sn)
if err != nil { if err != nil {
return nil, err return nil, err
} }
enc := json.NewEncoder(obj) id, err := repo.Create(TYPE_REF, data)
err = enc.Encode(sn)
if err != nil { if err != nil {
return nil, err return nil, err
} }
err = obj.Close() return id, nil
if err != nil {
return nil, err
}
sn.id = <-id_ch
return sn.id, nil
} }
func LoadSnapshot(repo *Repository, id ID) (*Snapshot, error) { func LoadSnapshot(repo *Repository, id ID) (*Snapshot, error) {

25
tree.go
View file

@ -5,6 +5,7 @@ import (
"encoding/json" "encoding/json"
"fmt" "fmt"
"io" "io"
"io/ioutil"
"os" "os"
"os/user" "os/user"
"path/filepath" "path/filepath"
@ -48,18 +49,17 @@ func NewTree() *Tree {
} }
func store_chunk(repo *Repository, rd io.Reader) (ID, error) { func store_chunk(repo *Repository, rd io.Reader) (ID, error) {
wr, idch, err := repo.Create(TYPE_BLOB) data, err := ioutil.ReadAll(rd)
if err != nil { if err != nil {
return nil, err return nil, err
} }
io.Copy(wr, rd) id, err := repo.Create(TYPE_BLOB, data)
err = wr.Close()
if err != nil { if err != nil {
return nil, err return nil, err
} }
return <-idch, nil return id, nil
} }
func NewTreeFromPath(repo *Repository, dir string) (*Tree, error) { func NewTreeFromPath(repo *Repository, dir string) (*Tree, error) {
@ -152,27 +152,17 @@ func (tree *Tree) Save(repo *Repository) (ID, error) {
} }
} }
buf, err := json.Marshal(tree) data, err := json.Marshal(tree)
if err != nil { if err != nil {
return nil, err return nil, err
} }
wr, idch, err := repo.Create(TYPE_BLOB) id, err := repo.Create(TYPE_BLOB, data)
if err != nil { if err != nil {
return nil, err return nil, err
} }
_, err = wr.Write(buf) return id, nil
if err != nil {
return nil, err
}
err = wr.Close()
if err != nil {
return nil, err
}
return <-idch, nil
} }
func NewTreeFromRepo(repo *Repository, id ID) (*Tree, error) { func NewTreeFromRepo(repo *Repository, id ID) (*Tree, error) {
@ -208,7 +198,6 @@ func NewTreeFromRepo(repo *Repository, id ID) (*Tree, error) {
func (tree *Tree) CreateAt(path string) error { func (tree *Tree) CreateAt(path string) error {
for _, node := range tree.Nodes { for _, node := range tree.Nodes {
nodepath := filepath.Join(path, node.Name) nodepath := filepath.Join(path, node.Name)
// fmt.Printf("%s:%s\n", node.Type, nodepath)
if node.Type == "dir" { if node.Type == "dir" {
err := os.Mkdir(nodepath, 0700) err := os.Mkdir(nodepath, 0700)