Limit memory usage, add several sync.Pool

This commit is contained in:
Alexander Neumann 2014-11-23 16:48:00 +01:00
parent 575635753e
commit f95788ed90
14 changed files with 277 additions and 97 deletions

View file

@ -2,7 +2,6 @@ package khepri
import ( import (
"io" "io"
"io/ioutil"
"os" "os"
"path/filepath" "path/filepath"
"sync" "sync"
@ -14,8 +13,8 @@ import (
) )
const ( const (
maxConcurrentFiles = 32 maxConcurrentFiles = 8
maxConcurrentBlobs = 32 maxConcurrentBlobs = 8
statTimeout = 20 * time.Millisecond statTimeout = 20 * time.Millisecond
) )
@ -159,12 +158,20 @@ func (arch *Archiver) SaveFile(node *Node) error {
// if the file is small enough, store it directly // if the file is small enough, store it directly
if node.Size < chunker.MinSize { if node.Size < chunker.MinSize {
buf, err := ioutil.ReadAll(file) // acquire token
token := <-arch.blobToken
defer func() {
arch.blobToken <- token
}()
buf := GetChunkBuf("blob single file")
defer FreeChunkBuf("blob single file", buf)
n, err := io.ReadFull(file, buf)
if err != nil { if err != nil {
return err return err
} }
blob, err := arch.ch.Save(backend.Data, buf) blob, err := arch.ch.Save(backend.Data, buf[:n])
if err != nil { if err != nil {
return err return err
} }
@ -176,14 +183,18 @@ func (arch *Archiver) SaveFile(node *Node) error {
// else store all chunks // else store all chunks
chnker := chunker.New(file) chnker := chunker.New(file)
chans := [](<-chan Blob){} chans := [](<-chan Blob){}
defer chnker.Free()
for { for {
chunk, err := chnker.Next() buf := GetChunkBuf("blob chunker")
chunk, err := chnker.Next(buf)
if err == io.EOF { if err == io.EOF {
FreeChunkBuf("blob chunker", buf)
break break
} }
if err != nil { if err != nil {
FreeChunkBuf("blob chunker", buf)
return err return err
} }
@ -198,6 +209,8 @@ func (arch *Archiver) SaveFile(node *Node) error {
panic(err) panic(err)
} }
FreeChunkBuf("blob chunker", buf)
arch.update(arch.SaveStats, Stats{Bytes: blob.Size}) arch.update(arch.SaveStats, Stats{Bytes: blob.Size})
arch.blobToken <- token arch.blobToken <- token
ch <- blob ch <- blob
@ -318,13 +331,13 @@ func (arch *Archiver) saveTree(t *Tree) (Blob, error) {
node.Subtree = b.ID node.Subtree = b.ID
arch.update(arch.SaveStats, Stats{Directories: 1}) arch.update(arch.SaveStats, Stats{Directories: 1})
} else if node.Type == "file" && len(node.Content) == 0 { } else if node.Type == "file" && len(node.Content) == 0 {
// get token
token := <-arch.fileToken
// start goroutine // start goroutine
wg.Add(1) wg.Add(1)
go func(n *Node) { go func(n *Node) {
defer wg.Done() defer wg.Done()
// get token
token := <-arch.fileToken
defer func() { defer func() {
arch.fileToken <- token arch.fileToken <- token
}() }()

View file

@ -6,6 +6,7 @@ import (
"math/rand" "math/rand"
"testing" "testing"
"github.com/fd0/khepri"
"github.com/fd0/khepri/chunker" "github.com/fd0/khepri/chunker"
) )
@ -30,6 +31,7 @@ func BenchmarkChunkEncrypt(b *testing.B) {
be := setupBackend(b) be := setupBackend(b)
defer teardownBackend(b, be) defer teardownBackend(b, be)
key := setupKey(b, be, "geheim") key := setupKey(b, be, "geheim")
chunkBuf := make([]byte, chunker.MaxSize)
b.ResetTimer() b.ResetTimer()
b.SetBytes(int64(len(data))) b.SetBytes(int64(len(data)))
@ -38,7 +40,7 @@ func BenchmarkChunkEncrypt(b *testing.B) {
ch := chunker.New(bytes.NewReader(data)) ch := chunker.New(bytes.NewReader(data))
for { for {
chunk_data, err := ch.Next() chunk_data, err := ch.Next(chunkBuf)
if err == io.EOF { if err == io.EOF {
break break
@ -46,7 +48,8 @@ func BenchmarkChunkEncrypt(b *testing.B) {
ok(b, err) ok(b, err)
_, err = key.Encrypt(chunk_data.Data) buf := make([]byte, khepri.MaxCiphertextSize)
_, err = key.Encrypt(buf, chunk_data.Data)
ok(b, err) ok(b, err)
} }
} }

View file

@ -5,8 +5,11 @@ import (
"compress/zlib" "compress/zlib"
"crypto/sha256" "crypto/sha256"
"io/ioutil" "io/ioutil"
"sync"
) )
// idPool recycles ID byte slices. When the pool has nothing to hand out, a
// new slice of IDSize bytes is created.
var idPool = sync.Pool{
	New: func() interface{} {
		return ID(make([]byte, IDSize))
	},
}
// Each lists all entries of type t in the backend and calls function f() with // Each lists all entries of type t in the backend and calls function f() with
// the id and data. // the id and data.
func Each(be Server, t Type, f func(id ID, data []byte, err error)) error { func Each(be Server, t Type, f func(id ID, data []byte, err error)) error {
@ -78,7 +81,7 @@ func Uncompress(data []byte) []byte {
// Hash returns the ID for data. // Hash returns the ID for data.
func Hash(data []byte) ID { func Hash(data []byte) ID {
h := sha256.Sum256(data) h := sha256.Sum256(data)
id := make(ID, 32) id := idPool.Get().(ID)
copy(id, h[:]) copy(id, h[:])
return id return id
} }

View file

@ -8,7 +8,8 @@ import (
"errors" "errors"
) )
const sha256_length = 32 // in bytes // IDSize contains the size of an ID, in bytes.
const IDSize = sha256.Size
// References content within a repository. // References content within a repository.
type ID []byte type ID []byte
@ -21,7 +22,7 @@ func ParseID(s string) (ID, error) {
return nil, err return nil, err
} }
if len(b) != sha256_length { if len(b) != IDSize {
return nil, errors.New("invalid length for sha256 hash") return nil, errors.New("invalid length for sha256 hash")
} }
@ -63,7 +64,7 @@ func (id *ID) UnmarshalJSON(b []byte) error {
return err return err
} }
*id = make([]byte, len(s)/2) *id = idPool.Get().(ID)
_, err = hex.Decode(*id, []byte(s)) _, err = hex.Decode(*id, []byte(s))
if err != nil { if err != nil {
return err return err
@ -74,11 +75,16 @@ func (id *ID) UnmarshalJSON(b []byte) error {
func IDFromData(d []byte) ID { func IDFromData(d []byte) ID {
hash := sha256.Sum256(d) hash := sha256.Sum256(d)
id := make([]byte, sha256_length) id := idPool.Get().(ID)
copy(id, hash[:]) copy(id, hash[:])
return id return id
} }
// Free returns the ID byte slice back to the allocation pool. The ID must not
// be used after Free has been called, since the pool may hand the same
// backing array to another caller.
//
// Slices that cannot hold a full ID are dropped instead of pooled: users of
// the pool copy or decode a complete ID into whatever Get returns, so a
// shorter slice would silently truncate a hash. Slices that are large enough
// are put back with their length restored to the full ID size.
func (id ID) Free() {
	// sha256.Size is the value IDSize is defined as.
	if cap(id) < sha256.Size {
		return
	}

	idPool.Put(id[:sha256.Size])
}
type IDs []ID type IDs []ID
func (ids IDs) Len() int { func (ids IDs) Len() int {

View file

@ -11,14 +11,13 @@ import (
"time" "time"
"github.com/fd0/khepri" "github.com/fd0/khepri"
"github.com/fd0/khepri/backend"
) )
const backendIDSize = 8
var maxWorkers = flag.Uint("workers", 100, "number of workers to test BlobList concurrent access against") var maxWorkers = flag.Uint("workers", 100, "number of workers to test BlobList concurrent access against")
func randomID() []byte { func randomID() []byte {
buf := make([]byte, backendIDSize) buf := make([]byte, backend.IDSize)
_, err := io.ReadFull(rand.Reader, buf) _, err := io.ReadFull(rand.Reader, buf)
if err != nil { if err != nil {
panic(err) panic(err)
@ -69,7 +68,7 @@ func TestBlobList(t *testing.T) {
// Test JSON encode/decode // Test JSON encode/decode
func TestBlobListJSON(t *testing.T) { func TestBlobListJSON(t *testing.T) {
bl := khepri.NewBlobList() bl := khepri.NewBlobList()
b := khepri.Blob{ID: []byte{1, 2, 3, 4}} b := khepri.Blob{ID: randomID()}
bl.Insert(b) bl.Insert(b)
b2, err := bl.Find(b) b2, err := bl.Find(b)

View file

@ -30,6 +30,15 @@ var (
once sync.Once once sync.Once
mod_table [256]uint64 mod_table [256]uint64
out_table [256]uint64 out_table [256]uint64
chunkerPool = sync.Pool{
New: func() interface{} {
return &Chunker{
window: make([]byte, WindowSize),
buf: make([]byte, MaxSize),
}
},
}
) )
// A chunk is one content-dependent chunk of bytes whose end was cut when the // A chunk is one content-dependent chunk of bytes whose end was cut when the
@ -41,17 +50,8 @@ type Chunk struct {
Data []byte Data []byte
} }
// A chunker takes a stream of bytes and emits average size chunks.
type Chunker interface {
// Next returns the next chunk of data. If an error occurs while reading,
// the error is returned with a nil chunk. The state of the current chunk
// is undefined. When the last chunk has been returned, all subsequent
// calls yield a nil chunk and an io.EOF error.
Next() (*Chunk, error)
}
// A chunker internally holds everything needed to split content. // A chunker internally holds everything needed to split content.
type chunker struct { type Chunker struct {
rd io.Reader rd io.Reader
closed bool closed bool
@ -62,7 +62,6 @@ type chunker struct {
bpos int bpos int
bmax int bmax int
data []byte
start int start int
count int count int
pos int pos int
@ -71,16 +70,9 @@ type chunker struct {
} }
// New returns a new Chunker that reads data from rd. // New returns a new Chunker that reads data from rd.
func New(rd io.Reader) Chunker { func New(rd io.Reader) *Chunker {
c := &chunker{ c := chunkerPool.Get().(*Chunker)
rd: rd, c.rd = rd
window: make([]byte, WindowSize),
buf: make([]byte, MaxSize),
data: make([]byte, 0, MaxSize),
}
once.Do(c.fill_tables) once.Do(c.fill_tables)
c.reset() c.reset()
@ -88,7 +80,13 @@ func New(rd io.Reader) Chunker {
return c return c
} }
func (c *chunker) reset() { // Free returns this chunker to the allocation pool
func (c *Chunker) Free() {
// Clear the reader reference before pooling, so that a chunker sitting in
// the pool does not keep the caller's reader reachable. New assigns a
// fresh reader when the chunker is handed out again.
c.rd = nil
chunkerPool.Put(c)
}
func (c *Chunker) reset() {
for i := 0; i < WindowSize; i++ { for i := 0; i < WindowSize; i++ {
c.window[i] = 0 c.window[i] = 0
} }
@ -97,11 +95,10 @@ func (c *chunker) reset() {
c.pos = 0 c.pos = 0
c.count = 0 c.count = 0
c.slide(1) c.slide(1)
c.data = make([]byte, 0, MaxSize)
} }
// Calculate out_table and mod_table for optimization. Must be called only once. // Calculate out_table and mod_table for optimization. Must be called only once.
func (c *chunker) fill_tables() { func (c *Chunker) fill_tables() {
// calculate table for sliding out bytes. The byte to slide out is used as // calculate table for sliding out bytes. The byte to slide out is used as
// the index for the table, the value contains the following: // the index for the table, the value contains the following:
// out_table[b] = Hash(b || 0 || ... || 0) // out_table[b] = Hash(b || 0 || ... || 0)
@ -137,7 +134,12 @@ func (c *chunker) fill_tables() {
} }
} }
func (c *chunker) Next() (*Chunk, error) { // Next returns the next chunk of data. If an error occurs while reading,
// the error is returned with a nil chunk. The state of the current chunk
// is undefined. When the last chunk has been returned, all subsequent
// calls yield a nil chunk and an io.EOF error.
func (c *Chunker) Next(dst []byte) (*Chunk, error) {
dst = dst[:0]
for { for {
if c.bpos >= c.bmax { if c.bpos >= c.bmax {
n, err := io.ReadFull(c.rd, c.buf) n, err := io.ReadFull(c.rd, c.buf)
@ -160,7 +162,7 @@ func (c *chunker) Next() (*Chunk, error) {
Start: c.start, Start: c.start,
Length: c.count, Length: c.count,
Cut: c.digest, Cut: c.digest,
Data: c.data, Data: dst,
}, nil }, nil
} }
} }
@ -188,7 +190,7 @@ func (c *chunker) Next() (*Chunk, error) {
c.digest ^= mod_table[index] c.digest ^= mod_table[index]
if (c.count+i+1 >= MinSize && (c.digest&splitmask) == 0) || c.count+i+1 >= MaxSize { if (c.count+i+1 >= MinSize && (c.digest&splitmask) == 0) || c.count+i+1 >= MaxSize {
c.data = append(c.data, c.buf[c.bpos:c.bpos+i+1]...) dst = append(dst, c.buf[c.bpos:c.bpos+i+1]...)
c.count += i + 1 c.count += i + 1
c.pos += i + 1 c.pos += i + 1
c.bpos += i + 1 c.bpos += i + 1
@ -197,7 +199,7 @@ func (c *chunker) Next() (*Chunk, error) {
Start: c.start, Start: c.start,
Length: c.count, Length: c.count,
Cut: c.digest, Cut: c.digest,
Data: c.data, Data: dst,
} }
// keep position // keep position
@ -212,7 +214,7 @@ func (c *chunker) Next() (*Chunk, error) {
steps := c.bmax - c.bpos steps := c.bmax - c.bpos
if steps > 0 { if steps > 0 {
c.data = append(c.data, c.buf[c.bpos:c.bpos+steps]...) dst = append(dst, c.buf[c.bpos:c.bpos+steps]...)
} }
c.count += steps c.count += steps
c.pos += steps c.pos += steps
@ -220,7 +222,7 @@ func (c *chunker) Next() (*Chunk, error) {
} }
} }
func (c *chunker) append(b byte) { func (c *Chunker) append(b byte) {
index := c.digest >> uint(pol_shift) index := c.digest >> uint(pol_shift)
c.digest <<= 8 c.digest <<= 8
c.digest |= uint64(b) c.digest |= uint64(b)
@ -228,7 +230,7 @@ func (c *chunker) append(b byte) {
c.digest ^= mod_table[index] c.digest ^= mod_table[index]
} }
func (c *chunker) slide(b byte) { func (c *Chunker) slide(b byte) {
out := c.window[c.wpos] out := c.window[c.wpos]
c.window[c.wpos] = b c.window[c.wpos] = b
c.digest ^= out_table[out] c.digest ^= out_table[out]

View file

@ -54,9 +54,10 @@ var chunks2 = []chunk{
chunk{chunker.MinSize, 0}, chunk{chunker.MinSize, 0},
} }
func test_with_data(t *testing.T, chunker chunker.Chunker, chunks []chunk) { func test_with_data(t *testing.T, chnker *chunker.Chunker, chunks []chunk) {
buf := make([]byte, chunker.MaxSize)
for i, chunk := range chunks { for i, chunk := range chunks {
c, err := chunker.Next() c, err := chnker.Next(buf)
if err != nil { if err != nil {
t.Fatalf("Error returned with chunk %d: %v", i, err) t.Fatalf("Error returned with chunk %d: %v", i, err)
@ -84,7 +85,7 @@ func test_with_data(t *testing.T, chunker chunker.Chunker, chunks []chunk) {
} }
} }
c, err := chunker.Next() c, err := chnker.Next(buf)
if c != nil { if c != nil {
t.Fatal("additional non-nil chunk returned") t.Fatal("additional non-nil chunk returned")
@ -115,20 +116,24 @@ func TestChunker(t *testing.T) {
buf := get_random(23, 32*1024*1024) buf := get_random(23, 32*1024*1024)
ch := chunker.New(bytes.NewReader(buf)) ch := chunker.New(bytes.NewReader(buf))
test_with_data(t, ch, chunks1) test_with_data(t, ch, chunks1)
ch.Free()
// setup nullbyte data source // setup nullbyte data source
buf = bytes.Repeat([]byte{0}, len(chunks2)*chunker.MinSize) buf = bytes.Repeat([]byte{0}, len(chunks2)*chunker.MinSize)
ch = chunker.New(bytes.NewReader(buf)) ch = chunker.New(bytes.NewReader(buf))
test_with_data(t, ch, chunks2) test_with_data(t, ch, chunks2)
ch.Free()
} }
func BenchmarkChunker(b *testing.B) { func BenchmarkChunker(b *testing.B) {
size := 10 * 1024 * 1024 size := 10 * 1024 * 1024
buf := get_random(23, size) buf := get_random(23, size)
dst := make([]byte, chunker.MaxSize)
b.ResetTimer() b.ResetTimer()
b.SetBytes(int64(size)) b.SetBytes(int64(size))
var chunks int var chunks int
for i := 0; i < b.N; i++ { for i := 0; i < b.N; i++ {
chunks = 0 chunks = 0
@ -136,7 +141,7 @@ func BenchmarkChunker(b *testing.B) {
ch := chunker.New(bytes.NewReader(buf)) ch := chunker.New(bytes.NewReader(buf))
for { for {
_, err := ch.Next() _, err := ch.Next(dst)
if err == io.EOF { if err == io.EOF {
break break

View file

@ -135,6 +135,8 @@ func init() {
} }
func main() { func main() {
// defer profile.Start(profile.MemProfileRate(100000), profile.ProfilePath(".")).Stop()
log.SetOutput(os.Stdout) log.SetOutput(os.Stdout)
opts.Repo = os.Getenv("KHEPRI_REPOSITORY") opts.Repo = os.Getenv("KHEPRI_REPOSITORY")
@ -192,4 +194,6 @@ func main() {
if err != nil { if err != nil {
errx(1, "error executing command %q: %v", cmd, err) errx(1, "error executing command %q: %v", cmd, err)
} }
khepri.PoolAlloc()
} }

View file

@ -66,6 +66,7 @@ func (ch *ContentHandler) Save(t backend.Type, data []byte) (Blob, error) {
// test if the hash is already in the backend // test if the hash is already in the backend
blob, err := ch.bl.Find(Blob{ID: id}) blob, err := ch.bl.Find(Blob{ID: id})
if err == nil { if err == nil {
id.Free()
return blob, nil return blob, nil
} }
@ -76,10 +77,13 @@ func (ch *ContentHandler) Save(t backend.Type, data []byte) (Blob, error) {
} }
// encrypt blob // encrypt blob
ciphertext, err := ch.key.Encrypt(data) ciphertext := GetChunkBuf("ch.Save()")
defer FreeChunkBuf("ch.Save()", ciphertext)
n, err := ch.key.Encrypt(ciphertext, data)
if err != nil { if err != nil {
return Blob{}, err return Blob{}, err
} }
ciphertext = ciphertext[:n]
// save blob // save blob
sid, err := ch.be.Create(t, ciphertext) sid, err := ch.be.Create(t, ciphertext)

72
key.go
View file

@ -15,10 +15,17 @@ import (
"time" "time"
"github.com/fd0/khepri/backend" "github.com/fd0/khepri/backend"
"github.com/fd0/khepri/chunker"
"golang.org/x/crypto/scrypt" "golang.org/x/crypto/scrypt"
) )
// Size limits for encrypted data.
const (
	// maxDataSize is the largest plaintext accepted for encryption. It is the
	// chunker's maximum chunk size (8MiB, defined in chunker).
	maxDataSize = chunker.MaxSize

	// ivSize is the length of the IV stored in front of the ciphertext.
	ivSize = aes.BlockSize

	// hmacSize is the length of the SHA256 HMAC appended to the ciphertext.
	hmacSize = sha256.Size

	// MaxCiphertextSize is the size of IV || Ciphertext || HMAC for a
	// plaintext of maxDataSize bytes, i.e. the largest encrypted blob.
	MaxCiphertextSize = ivSize + maxDataSize + hmacSize
)
var ( var (
// ErrUnauthenticated is returned when ciphertext verification has failed. // ErrUnauthenticated is returned when ciphertext verification has failed.
ErrUnauthenticated = errors.New("ciphertext verification failed") ErrUnauthenticated = errors.New("ciphertext verification failed")
@ -108,7 +115,9 @@ func CreateKey(be backend.Server, password string) (*Key, error) {
return nil, err return nil, err
} }
k.Data, err = k.EncryptUser(buf) k.Data = GetChunkBuf("key")
n, err = k.EncryptUser(k.Data, buf)
k.Data = k.Data[:n]
// dump as json // dump as json
buf, err = json.Marshal(k) buf, err = json.Marshal(k)
@ -122,6 +131,8 @@ func CreateKey(be backend.Server, password string) (*Key, error) {
return nil, err return nil, err
} }
FreeChunkBuf("key", k.Data)
return k, nil return k, nil
} }
@ -229,20 +240,29 @@ func (k *Key) newKeys() (*keys, error) {
return ks, nil return ks, nil
} }
func (k *Key) newIV() ([]byte, error) { func (k *Key) newIV(buf []byte) error {
buf := make([]byte, aes.BlockSize) _, err := io.ReadFull(rand.Reader, buf[:ivSize])
_, err := io.ReadFull(rand.Reader, buf) buf = buf[:ivSize]
if err != nil { if err != nil {
return nil, err return err
} }
return buf, nil return nil
} }
// Encrypt encrypts and signs data. Returned is IV || Ciphertext || HMAC. For // Encrypt encrypts and signs data. Stored in ciphertext is IV || Ciphertext ||
// the hash function, SHA256 is used, so the overhead is 16+32=48 byte. // HMAC. Encrypt returns the ciphertext's length. For the hash function, SHA256
func (k *Key) encrypt(ks *keys, plaintext []byte) ([]byte, error) { // is used, so the overhead is 16+32=48 byte.
iv, err := k.newIV() func (k *Key) encrypt(ks *keys, ciphertext, plaintext []byte) (int, error) {
if cap(ciphertext) < MaxCiphertextSize {
panic("encryption buffer is too small")
}
if len(plaintext) > maxDataSize {
panic("plaintext is too large")
}
_, err := io.ReadFull(rand.Reader, ciphertext[:ivSize])
if err != nil { if err != nil {
panic(fmt.Sprintf("unable to generate new random iv: %v", err)) panic(fmt.Sprintf("unable to generate new random iv: %v", err))
} }
@ -252,11 +272,9 @@ func (k *Key) encrypt(ks *keys, plaintext []byte) ([]byte, error) {
panic(fmt.Sprintf("unable to create cipher: %v", err)) panic(fmt.Sprintf("unable to create cipher: %v", err))
} }
e := cipher.NewCTR(c, iv) e := cipher.NewCTR(c, ciphertext[:ivSize])
l := len(iv) e.XORKeyStream(ciphertext[ivSize:cap(ciphertext)], plaintext)
ciphertext := make([]byte, l+len(plaintext)) ciphertext = ciphertext[:ivSize+len(plaintext)]
copy(ciphertext[:l], iv)
e.XORKeyStream(ciphertext[l:], plaintext)
hm := hmac.New(sha256.New, ks.Sign) hm := hmac.New(sha256.New, ks.Sign)
@ -265,21 +283,23 @@ func (k *Key) encrypt(ks *keys, plaintext []byte) ([]byte, error) {
panic(fmt.Sprintf("unable to calculate hmac of ciphertext: %v", err)) panic(fmt.Sprintf("unable to calculate hmac of ciphertext: %v", err))
} }
return hm.Sum(ciphertext), nil ciphertext = hm.Sum(ciphertext)
return len(ciphertext), nil
} }
// EncryptUser encrypts and signs data with the user key. Returned is IV || // EncryptUser encrypts and signs data with the user key. Stored in ciphertext
// Ciphertext || HMAC. For the hash function, SHA256 is used, so the overhead // is IV || Ciphertext || HMAC. Returns the ciphertext length. For the hash
// is 16+32=48 byte. // function, SHA256 is used, so the overhead is 16+32=48 byte.
func (k *Key) EncryptUser(plaintext []byte) ([]byte, error) { func (k *Key) EncryptUser(ciphertext, plaintext []byte) (int, error) {
return k.encrypt(k.user, plaintext) return k.encrypt(k.user, ciphertext, plaintext)
} }
// Encrypt encrypts and signs data with the master key. Returned is IV || // Encrypt encrypts and signs data with the master key. Stored in ciphertext is
// Ciphertext || HMAC. For the hash function, SHA256 is used, so the overhead // IV || Ciphertext || HMAC. Returns the ciphertext length. For the hash
// is 16+32=48 byte. // function, SHA256 is used, so the overhead is 16+32=48 byte.
func (k *Key) Encrypt(plaintext []byte) ([]byte, error) { func (k *Key) Encrypt(ciphertext, plaintext []byte) (int, error) {
return k.encrypt(k.master, plaintext) return k.encrypt(k.master, ciphertext, plaintext)
} }
// Decrypt verifies and decrypts the ciphertext. Ciphertext must be in the form // Decrypt verifies and decrypts the ciphertext. Ciphertext must be in the form

View file

@ -48,10 +48,12 @@ func TestCrypto(t *testing.T) {
Sign: tv.skey, Sign: tv.skey,
} }
msg, err := r.encrypt(r.master, tv.plaintext) msg := make([]byte, MaxCiphertextSize)
n, err := r.encrypt(r.master, msg, tv.plaintext)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
msg = msg[:n]
// decrypt message // decrypt message
_, err = r.decrypt(r.master, msg) _, err = r.decrypt(r.master, msg)

View file

@ -51,7 +51,7 @@ func TestEncryptDecrypt(t *testing.T) {
defer teardownBackend(t, be) defer teardownBackend(t, be)
k := setupKey(t, be, testPassword) k := setupKey(t, be, testPassword)
for _, size := range []int{5, 23, 1 << 20, 10<<20 + 123} { for _, size := range []int{5, 23, 1 << 20, 7<<20 + 123} {
data := make([]byte, size) data := make([]byte, size)
f, err := os.Open("/dev/urandom") f, err := os.Open("/dev/urandom")
ok(t, err) ok(t, err)
@ -59,18 +59,21 @@ func TestEncryptDecrypt(t *testing.T) {
_, err = io.ReadFull(f, data) _, err = io.ReadFull(f, data)
ok(t, err) ok(t, err)
ciphertext, err := k.Encrypt(data) ciphertext := khepri.GetChunkBuf("TestEncryptDecrypt")
n, err := k.Encrypt(ciphertext, data)
ok(t, err) ok(t, err)
plaintext, err := k.Decrypt(ciphertext) plaintext, err := k.Decrypt(ciphertext[:n])
ok(t, err) ok(t, err)
khepri.FreeChunkBuf("TestEncryptDecrypt", ciphertext)
equals(t, plaintext, data) equals(t, plaintext, data)
} }
} }
func BenchmarkEncrypt(b *testing.B) { func BenchmarkEncrypt(b *testing.B) {
size := 16 << 20 // 16MiB size := 8 << 20 // 8MiB
data := make([]byte, size) data := make([]byte, size)
be := setupBackend(b) be := setupBackend(b)
@ -80,28 +83,32 @@ func BenchmarkEncrypt(b *testing.B) {
b.ResetTimer() b.ResetTimer()
b.SetBytes(int64(size)) b.SetBytes(int64(size))
buf := khepri.GetChunkBuf("BenchmarkEncrypt")
for i := 0; i < b.N; i++ { for i := 0; i < b.N; i++ {
_, err := k.Encrypt(data) _, err := k.Encrypt(buf, data)
ok(b, err) ok(b, err)
} }
khepri.FreeChunkBuf("BenchmarkEncrypt", buf)
} }
func BenchmarkDecrypt(b *testing.B) { func BenchmarkDecrypt(b *testing.B) {
size := 16 << 20 // 16MiB size := 8 << 20 // 8MiB
data := make([]byte, size) data := make([]byte, size)
be := setupBackend(b) be := setupBackend(b)
defer teardownBackend(b, be) defer teardownBackend(b, be)
k := setupKey(b, be, testPassword) k := setupKey(b, be, testPassword)
ciphertext, err := k.Encrypt(data) ciphertext := khepri.GetChunkBuf("BenchmarkDecrypt")
n, err := k.Encrypt(ciphertext, data)
ok(b, err) ok(b, err)
b.ResetTimer() b.ResetTimer()
b.SetBytes(int64(size)) b.SetBytes(int64(size))
for i := 0; i < b.N; i++ { for i := 0; i < b.N; i++ {
_, err := k.Decrypt(ciphertext) _, err := k.Decrypt(ciphertext[:n])
ok(b, err) ok(b, err)
} }
khepri.FreeChunkBuf("BenchmarkDecrypt", ciphertext)
} }

103
pools.go Normal file
View file

@ -0,0 +1,103 @@
package khepri
import "sync"
// chunkPool hands out byte buffers; newChunkBuf creates a fresh one whenever
// the pool has nothing to reuse.
var chunkPool = sync.Pool{New: newChunkBuf}

// nodePool hands out *Node values; newNode creates a fresh one whenever the
// pool has nothing to reuse.
var nodePool = sync.Pool{New: newNode}
// alloc_stats collects bookkeeping counters for one allocation pool. The
// helpers in this file access the fields only while holding m.
type alloc_stats struct {
	m sync.Mutex

	// Per-label counters, keyed by the string passed to the Get/Free helpers.
	alloc_map, free_map map[string]int

	// alloc and free count objects handed out and returned, new counts
	// objects freshly created by the pool's New function, all is the number
	// of objects currently handed out and max is the highest value all has
	// reached so far.
	alloc, free, new, all, max int
}
// chunk_stats tracks buffers of the chunk pool, node_stats tracks nodes of
// the node pool.
var chunk_stats, node_stats alloc_stats
// init allocates the per-label maps of chunk_stats so that GetChunkBuf and
// FreeChunkBuf can write to them. node_stats needs no setup: the node helpers
// only use its integer counters.
func init() {
	chunk_stats.alloc_map, chunk_stats.free_map = map[string]int{}, map[string]int{}
}
// newChunkBuf is the New function of chunkPool. It counts the allocation in
// chunk_stats and returns a buffer of MaxCiphertextSize bytes, which is large
// enough for iv, data and hmac.
func newChunkBuf() interface{} {
	chunk_stats.m.Lock()
	chunk_stats.new++
	chunk_stats.m.Unlock()

	buf := make([]byte, MaxCiphertextSize)
	return buf
}
// newNode is the New function of nodePool. It counts the allocation in
// node_stats and returns a pointer to a zero-valued Node.
func newNode() interface{} {
	node_stats.m.Lock()
	node_stats.new++
	node_stats.m.Unlock()

	return new(Node)
}
// GetChunkBuf returns a buffer from the chunk pool. The label s identifies
// the call site and is only used to count allocations per caller in
// chunk_stats. Hand the buffer back with FreeChunkBuf when done.
func GetChunkBuf(s string) []byte {
	chunk_stats.m.Lock()
	// Indexing a missing key yields zero, so the entry is created on first use.
	chunk_stats.alloc_map[s]++
	chunk_stats.all++
	if chunk_stats.max < chunk_stats.all {
		chunk_stats.max = chunk_stats.all
	}
	// Release the lock before touching the pool: Get may call newChunkBuf,
	// which locks the same mutex.
	chunk_stats.m.Unlock()

	return chunkPool.Get().([]byte)
}
// FreeChunkBuf returns buf to the chunk pool and records the release under
// the label s in chunk_stats. The caller must not use buf afterwards.
//
// Callers may have shrunk the buffer to the number of bytes actually used
// (buf = buf[:n]) before freeing it. The slice is therefore stretched back to
// its full capacity before it is pooled, so that the next GetChunkBuf caller
// sees the whole buffer and not the previous user's length.
func FreeChunkBuf(s string, buf []byte) {
	chunk_stats.m.Lock()
	// Indexing a missing key yields zero, so the entry is created on first use.
	chunk_stats.free_map[s]++
	chunk_stats.all--
	chunk_stats.m.Unlock()

	chunkPool.Put(buf[:cap(buf)])
}
// GetNode returns a zero-valued Node from the node pool and updates
// node_stats. Hand the node back with FreeNode when it is no longer needed.
//
// A recycled node still carries the field values of its previous user, so it
// is reset here; callers that only assign some of the fields would otherwise
// inherit stale data.
func GetNode() *Node {
	node_stats.m.Lock()
	node_stats.alloc++
	node_stats.all++
	if node_stats.all > node_stats.max {
		node_stats.max = node_stats.all
	}
	// Release the lock before touching the pool: Get may call newNode, which
	// locks the same mutex.
	node_stats.m.Unlock()

	n := nodePool.Get().(*Node)
	*n = Node{}
	return n
}
// FreeNode hands n back to the node pool and records the release in
// node_stats. The caller must not use n afterwards.
func FreeNode(n *Node) {
	node_stats.m.Lock()
	node_stats.free++
	node_stats.all--
	node_stats.m.Unlock()

	nodePool.Put(n)
}
// PoolAlloc is a debugging hook for the pool statistics. It currently does
// nothing: the statements below are commented out. When enabled, they print
// the counters collected in chunk_stats and node_stats to stderr.
func PoolAlloc() {
// fmt.Fprintf(os.Stderr, "alloc max: %d, new: %d\n", chunk_stats.max, chunk_stats.new)
// for k, v := range chunk_stats.alloc_map {
// fmt.Fprintf(os.Stderr, "alloc[%s] %d, free %d diff: %d\n", k, v, chunk_stats.free_map[k], v-chunk_stats.free_map[k])
// }
// fmt.Fprintf(os.Stderr, "nodes alloc max: %d, new: %d\n", node_stats.max, node_stats.new)
// fmt.Fprintf(os.Stderr, "alloc %d, free %d diff: %d\n", node_stats.alloc, node_stats.free, node_stats.alloc-node_stats.free)
}

21
tree.go
View file

@ -115,12 +115,11 @@ func (node *Node) fill_extra(path string, fi os.FileInfo) (err error) {
} }
func NodeFromFileInfo(path string, fi os.FileInfo) (*Node, error) { func NodeFromFileInfo(path string, fi os.FileInfo) (*Node, error) {
node := &Node{ node := GetNode()
path: path, node.path = path
Name: fi.Name(), node.Name = fi.Name()
Mode: fi.Mode() & os.ModePerm, node.Mode = fi.Mode() & os.ModePerm
ModTime: fi.ModTime(), node.ModTime = fi.ModTime()
}
switch fi.Mode() & (os.ModeType | os.ModeCharDevice) { switch fi.Mode() & (os.ModeType | os.ModeCharDevice) {
case 0: case 0:
@ -265,3 +264,13 @@ func (node *Node) CreateAt(ch *ContentHandler, path string) error {
return nil return nil
} }
// Free releases the ID and the Storage of b by calling their Free methods,
// skipping whichever of the two is nil. Neither may be used afterwards.
func (b Blob) Free() {
	if id := b.ID; id != nil {
		id.Free()
	}

	if storage := b.Storage; storage != nil {
		storage.Free()
	}
}