mpt: modify refcounted storage scheme to make GC possible

Add "active" flag into the node data and make the remainder modal, for active
nodes it's a reference counter, for inactive ones the deactivation height is
stored.

Technically, refcounted chains storing just one trie don't need a flag, but
it's a bit simpler this way.
This commit is contained in:
Roman Khimov 2022-01-28 15:05:13 +03:00
parent 86cb4ed80f
commit c4ee310e85
9 changed files with 84 additions and 62 deletions

View file

@ -44,7 +44,7 @@ import (
// Tuning parameters.
const (
headerBatchCount = 2000
version = "0.2.1"
version = "0.2.2"
defaultInitialGAS = 52000000_00000000
defaultMemPoolSize = 50000

View file

@ -56,7 +56,7 @@ func testIncompletePut(t *testing.T, ps pairs, n int, tr1, tr2 *Trie) {
require.Equal(t, tr1.StateRoot(), tr2.StateRoot())
t.Run("test restore", func(t *testing.T) {
tr2.Flush()
tr2.Flush(0)
tr3 := NewTrie(NewHashNode(tr2.StateRoot()), ModeAll, storage.NewMemCachedStore(tr2.Store))
for _, p := range ps[:n] {
val, err := tr3.Get(p[0])
@ -191,9 +191,9 @@ func TestTrie_PutBatchBranch(t *testing.T) {
t.Run("non-empty child is hash node", func(t *testing.T) {
tr1, tr2 := prepareBranch(t)
tr1.Flush()
tr1.Flush(0)
tr1.Collapse(1)
tr2.Flush()
tr2.Flush(0)
tr2.Collapse(1)
var ps = pairs{{[]byte{0x00, 2}, nil}}
@ -208,9 +208,9 @@ func TestTrie_PutBatchBranch(t *testing.T) {
require.NoError(t, tr1.Put([]byte{0x00}, []byte("value2")))
require.NoError(t, tr2.Put([]byte{0x00}, []byte("value2")))
tr1.Flush()
tr1.Flush(0)
tr1.Collapse(1)
tr2.Flush()
tr2.Flush(0)
tr2.Collapse(1)
var ps = pairs{{[]byte{0x00, 2}, nil}}
@ -254,8 +254,8 @@ func TestTrie_PutBatchHash(t *testing.T) {
require.NoError(t, tr2.Put([]byte{0x10}, []byte("value1")))
require.NoError(t, tr1.Put([]byte{0x20}, []byte("value2")))
require.NoError(t, tr2.Put([]byte{0x20}, []byte("value2")))
tr1.Flush()
tr2.Flush()
tr1.Flush(0)
tr2.Flush(0)
return tr1, tr2
}

View file

@ -191,7 +191,7 @@ func (b *Billet) incrementRefAndStore(h util.Uint256, bs []byte) {
}
cnt++
if len(data) == 0 {
data = append(bs, 0, 0, 0, 0)
data = append(bs, 1, 0, 0, 0, 0)
}
binary.LittleEndian.PutUint32(data[len(data)-4:], uint32(cnt))
_ = b.Store.Put(key, data)
@ -338,7 +338,7 @@ func (b *Billet) GetFromStore(h util.Uint256) (Node, error) {
}
if b.mode.RC() {
data = data[:len(data)-4]
data = data[:len(data)-5]
}
n.Node.(flushedNode).setCache(data, h)
return n.Node, nil

View file

@ -1,7 +1,6 @@
package mpt
import (
"bytes"
"testing"
"github.com/stretchr/testify/require"
@ -113,12 +112,12 @@ func TestCompatibility(t *testing.T) {
tr := newFilledTrie(t,
[]byte{0xac, 0x00}, []byte{0xab, 0xcd},
[]byte{0xac, 0x10}, []byte{0xab, 0xcd})
tr.Flush()
tr.Flush(0)
tr2 := copyTrie(tr)
require.NoError(t, tr2.Delete([]byte{0xac, 0x00}))
tr2.Flush()
tr2.Flush(0)
require.NoError(t, tr2.Delete([]byte{0xac, 0x10}))
})
@ -150,7 +149,7 @@ func TestCompatibility(t *testing.T) {
require.NoError(t, tr.Delete([]byte{0xac, 0x01}))
tr.testHas(t, []byte{0xac, 0x02}, []byte{0xab, 0xcd})
tr.Flush()
tr.Flush(0)
tr2 := NewTrie(NewHashNode(tr.root.Hash()), ModeAll, tr.Store)
tr2.testHas(t, []byte{0xac, 0x02}, []byte{0xab, 0xcd})
@ -161,15 +160,15 @@ func TestCompatibility(t *testing.T) {
[]byte{0xac, 0x11}, []byte{0xac, 0x11},
[]byte{0xac, 0x22}, []byte{0xac, 0x22},
[]byte{0xac}, []byte{0xac})
tr.Flush()
tr.Flush(0)
checkBatchSize(t, tr, 7)
require.NoError(t, tr.Delete([]byte{0xac, 0x11}))
tr.Flush()
tr.Flush(0)
checkBatchSize(t, tr, 5)
require.NoError(t, tr.Delete([]byte{0xac, 0x22}))
tr.Flush()
tr.Flush(0)
checkBatchSize(t, tr, 2)
})
@ -192,12 +191,11 @@ func TestCompatibility(t *testing.T) {
tr := NewTrie(NewHashNode(r.Hash()), ModeLatest, mainTrie.Store)
require.Equal(t, r.Hash(), tr.root.Hash())
// Tail bytes contain reference counter thus check for prefix.
proof := testGetProof(t, tr, []byte{0xac, 0x01}, 4)
require.True(t, bytes.HasPrefix(r.Bytes(), proof[0]))
require.True(t, bytes.HasPrefix(b.Bytes(), proof[1]))
require.True(t, bytes.HasPrefix(e1.Bytes(), proof[2]))
require.True(t, bytes.HasPrefix(v1.Bytes(), proof[3]))
require.Equal(t, r.Bytes(), proof[0])
require.Equal(t, b.Bytes(), proof[1])
require.Equal(t, e1.Bytes(), proof[2])
require.Equal(t, v1.Bytes(), proof[3])
testGetProof(t, tr, []byte{0xac}, 3)
testGetProof(t, tr, []byte{0xac, 0x10}, 0)
@ -242,11 +240,11 @@ func TestCompatibility(t *testing.T) {
[]byte{0xa1, 0x01}, []byte{0x01},
[]byte{0xa2, 0x01}, []byte{0x01},
[]byte{0xa3, 0x01}, []byte{0x01})
tr.Flush()
tr.Flush(0)
tr2 := copyTrie(tr)
require.NoError(t, tr2.Delete([]byte{0xa3, 0x01}))
tr2.Flush()
tr2.Flush(0)
tr3 := copyTrie(tr2)
require.NoError(t, tr3.Delete([]byte{0xa2, 0x01}))
@ -258,15 +256,15 @@ func TestCompatibility(t *testing.T) {
[]byte{0xa1, 0x01}, []byte{0x01},
[]byte{0xa2, 0x01}, []byte{0x01},
[]byte{0xa3, 0x01}, []byte{0x01})
tr.Flush()
tr.Flush(0)
checkBatchSize(t, tr, 4)
require.NoError(t, tr.Delete([]byte{0xa3, 0x01}))
tr.Flush()
tr.Flush(0)
checkBatchSize(t, tr, 4)
require.NoError(t, tr.Delete([]byte{0xa2, 0x01}))
tr.Flush()
tr.Flush(0)
checkBatchSize(t, tr, 2)
tr.testHas(t, []byte{0xa1, 0x01}, []byte{0x01})
})
@ -275,17 +273,17 @@ func TestCompatibility(t *testing.T) {
tr := newFilledTrie(t,
[]byte{0xa1}, []byte{0x01},
[]byte{0xa2}, []byte{0x02})
tr.Flush()
tr.Flush(0)
checkBatchSize(t, tr, 4)
tr1 := copyTrie(tr)
require.NoError(t, tr1.Delete([]byte{0xa1}))
tr1.Flush()
tr1.Flush(0)
require.Equal(t, 2, len(tr1.Store.GetBatch().Put))
tr2 := copyTrie(tr1)
require.NoError(t, tr2.Delete([]byte{0xa2}))
tr2.Flush()
tr2.Flush(0)
require.Equal(t, 0, len(tr2.Store.GetBatch().Put))
})
@ -294,21 +292,21 @@ func TestCompatibility(t *testing.T) {
[]byte{0x10}, []byte{0x01},
[]byte{0x20}, []byte{0x02},
[]byte{0x30}, []byte{0x03})
tr.Flush()
tr.Flush(0)
checkBatchSize(t, tr, 7)
tr1 := copyTrie(tr)
require.NoError(t, tr1.Delete([]byte{0x10}))
tr1.Flush()
tr1.Flush(0)
tr2 := copyTrie(tr1)
require.NoError(t, tr2.Delete([]byte{0x20}))
tr2.Flush()
tr2.Flush(0)
require.Equal(t, 2, len(tr2.Store.GetBatch().Put))
tr3 := copyTrie(tr2)
require.NoError(t, tr3.Delete([]byte{0x30}))
tr3.Flush()
tr3.Flush(0)
require.Equal(t, 0, len(tr3.Store.GetBatch().Put))
})
@ -316,12 +314,12 @@ func TestCompatibility(t *testing.T) {
tr := newFilledTrie(t,
[]byte{0xa1}, []byte{0x01},
[]byte{0xa2}, []byte{0x02})
tr.Flush()
tr.Flush(0)
checkBatchSize(t, tr, 4)
tr1 := copyTrie(tr)
require.NoError(t, tr1.Put([]byte{0xa3}, []byte{0x03}))
tr1.Flush()
tr1.Flush(0)
require.Equal(t, 5, len(tr1.Store.GetBatch().Put))
})
@ -329,19 +327,19 @@ func TestCompatibility(t *testing.T) {
tr := newFilledTrie(t,
[]byte{0x10}, []byte{0x01},
[]byte{0x20}, []byte{0x02})
tr.Flush()
tr.Flush(0)
checkBatchSize(t, tr, 5)
tr1 := copyTrie(tr)
require.NoError(t, tr1.Put([]byte{0x30}, []byte{0x03}))
tr1.Flush()
tr1.Flush(0)
checkBatchSize(t, tr1, 7)
})
t.Run("EmptyValueIssue633", func(t *testing.T) {
tr := newFilledTrie(t,
[]byte{0x01}, []byte{})
tr.Flush()
tr.Flush(0)
checkBatchSize(t, tr, 2)
proof := testGetProof(t, tr, []byte{0x01}, 2)
@ -411,7 +409,7 @@ func TestCompatibility_Find(t *testing.T) {
// root is an extension node with key=abc; next=branch
require.NoError(t, tr.Put([]byte("abc1"), []byte("01")))
require.NoError(t, tr.Put([]byte("abc3"), []byte("02")))
tr.Flush()
tr.Flush(0)
// find items with extension's key prefix
t.Run("from > start", func(t *testing.T) {
res, err := tr.Find([]byte("ab"), []byte("d2"), 100)

View file

@ -406,14 +406,14 @@ func makeStorageKey(mptKey util.Uint256) []byte {
// Because we care only about block-level changes, there is no need to put every
// new node to storage. Normally, flush should be called with every StateRoot persist, i.e.
// after every block.
func (t *Trie) Flush() {
func (t *Trie) Flush(index uint32) {
for h, node := range t.refcount {
if node.refcount != 0 {
if node.bytes == nil {
panic("item not in trie")
}
if t.mode.RC() {
node.initial = t.updateRefCount(h)
node.initial = t.updateRefCount(h, index)
if node.initial == 0 {
delete(t.refcount, h)
}
@ -427,8 +427,20 @@ func (t *Trie) Flush() {
}
}
func IsActiveValue(v []byte) bool {
return len(v) > 4 && v[len(v)-5] == 1
}
func getFromStore(key []byte, mode TrieMode, store *storage.MemCachedStore) ([]byte, error) {
data, err := store.Get(key)
if err == nil && mode.GC() && !IsActiveValue(data) {
return nil, storage.ErrKeyNotFound
}
return data, err
}
// updateRefCount should be called only when refcounting is enabled.
func (t *Trie) updateRefCount(h util.Uint256) int32 {
func (t *Trie) updateRefCount(h util.Uint256, index uint32) int32 {
if !t.mode.RC() {
panic("`updateRefCount` is called, but GC is disabled")
}
@ -439,13 +451,13 @@ func (t *Trie) updateRefCount(h util.Uint256) int32 {
if cnt == 0 {
// A newly created item which may be in store.
var err error
data, err = t.Store.Get(key)
data, err = getFromStore(key, t.mode, t.Store)
if err == nil {
cnt = int32(binary.LittleEndian.Uint32(data[len(data)-4:]))
}
}
if len(data) == 0 {
data = append(node.bytes, 0, 0, 0, 0)
data = append(node.bytes, 1, 0, 0, 0, 0)
}
cnt += node.refcount
switch {
@ -453,7 +465,13 @@ func (t *Trie) updateRefCount(h util.Uint256) int32 {
// BUG: negative reference count
panic(fmt.Sprintf("negative reference count: %s new %d, upd %d", h.StringBE(), cnt, t.refcount[h]))
case cnt == 0:
if !t.mode.GC() {
_ = t.Store.Delete(key)
} else {
data[len(data)-5] = 0
binary.LittleEndian.PutUint32(data[len(data)-4:], index)
_ = t.Store.Put(key, data)
}
default:
binary.LittleEndian.PutUint32(data[len(data)-4:], uint32(cnt))
_ = t.Store.Put(key, data)
@ -492,7 +510,7 @@ func (t *Trie) removeRef(h util.Uint256, bs []byte) {
}
func (t *Trie) getFromStore(h util.Uint256) (Node, error) {
data, err := t.Store.Get(makeStorageKey(h))
data, err := getFromStore(makeStorageKey(h), t.mode, t.Store)
if err != nil {
return nil, err
}
@ -505,10 +523,11 @@ func (t *Trie) getFromStore(h util.Uint256) (Node, error) {
}
if t.mode.RC() {
data = data[:len(data)-4]
data = data[:len(data)-5]
node := t.refcount[h]
if node != nil {
node.bytes = data
_ = r.ReadB()
node.initial = int32(r.ReadU32LE())
}
}

View file

@ -48,28 +48,28 @@ func newTestTrie(t *testing.T) *Trie {
func testTrieRefcount(t *testing.T, key1, key2 []byte) {
tr := NewTrie(nil, ModeLatest, storage.NewMemCachedStore(storage.NewMemoryStore()))
require.NoError(t, tr.Put(key1, []byte{1}))
tr.Flush()
tr.Flush(0)
require.NoError(t, tr.Put(key2, []byte{1}))
tr.Flush()
tr.Flush(0)
tr.testHas(t, key1, []byte{1})
tr.testHas(t, key2, []byte{1})
// remove first, keep second
require.NoError(t, tr.Delete(key1))
tr.Flush()
tr.Flush(0)
tr.testHas(t, key1, nil)
tr.testHas(t, key2, []byte{1})
// no-op
require.NoError(t, tr.Put(key1, []byte{1}))
require.NoError(t, tr.Delete(key1))
tr.Flush()
tr.Flush(0)
tr.testHas(t, key1, nil)
tr.testHas(t, key2, []byte{1})
// delete non-existent, refcount should not be updated
require.NoError(t, tr.Delete(key1))
tr.Flush()
tr.Flush(0)
tr.testHas(t, key1, nil)
tr.testHas(t, key2, []byte{1})
}
@ -249,7 +249,7 @@ func (tr *Trie) putToStore(n Node) {
bytes: n.Bytes(),
refcount: 1,
}
tr.updateRefCount(n.Hash())
tr.updateRefCount(n.Hash(), 0)
} else {
_ = tr.Store.Put(makeStorageKey(n.Hash()), n.Bytes())
}
@ -318,7 +318,7 @@ func TestTrie_Flush(t *testing.T) {
require.NoError(t, tr.Put([]byte(k), v))
}
tr.Flush()
tr.Flush(0)
tr = NewTrie(NewHashNode(tr.StateRoot()), ModeAll, tr.Store)
for k, v := range pairs {
actual, err := tr.Get([]byte(k))

View file

@ -69,7 +69,8 @@ func NewModule(cfg config.ProtocolConfiguration, verif VerifierFunc, log *zap.Lo
// GetState returns value at the specified key fom the MPT with the specified root.
func (s *Module) GetState(root util.Uint256, key []byte) ([]byte, error) {
tr := mpt.NewTrie(mpt.NewHashNode(root), s.mode, storage.NewMemCachedStore(s.Store))
// Allow accessing old values, it's RO thing.
tr := mpt.NewTrie(mpt.NewHashNode(root), s.mode&^mpt.ModeGCFlag, storage.NewMemCachedStore(s.Store))
return tr.Get(key)
}
@ -79,13 +80,15 @@ func (s *Module) GetState(root util.Uint256, key []byte) ([]byte, error) {
// item with key equals to prefix is included into result; if empty `start` specified,
// then item with key equals to prefix is not included into result.
func (s *Module) FindStates(root util.Uint256, prefix, start []byte, max int) ([]storage.KeyValue, error) {
tr := mpt.NewTrie(mpt.NewHashNode(root), s.mode, storage.NewMemCachedStore(s.Store))
// Allow accessing old values, it's RO thing.
tr := mpt.NewTrie(mpt.NewHashNode(root), s.mode&^mpt.ModeGCFlag, storage.NewMemCachedStore(s.Store))
return tr.Find(prefix, start, max)
}
// GetStateProof returns proof of having key in the MPT with the specified root.
func (s *Module) GetStateProof(root util.Uint256, key []byte) ([][]byte, error) {
tr := mpt.NewTrie(mpt.NewHashNode(root), s.mode, storage.NewMemCachedStore(s.Store))
// Allow accessing old values, it's RO thing.
tr := mpt.NewTrie(mpt.NewHashNode(root), s.mode&^mpt.ModeGCFlag, storage.NewMemCachedStore(s.Store))
return tr.GetProof(key)
}
@ -188,7 +191,7 @@ func (s *Module) AddMPTBatch(index uint32, b mpt.Batch, cache *storage.MemCached
if _, err := mpt.PutBatch(b); err != nil {
return nil, nil, err
}
mpt.Flush()
mpt.Flush(index)
sr := &state.MPTRoot{
Index: index,
Root: mpt.StateRoot(),

View file

@ -222,7 +222,8 @@ func (s *Module) defineSyncStage() error {
return fmt.Errorf("failed to get header to initialize MPT billet: %w", err)
}
var mode mpt.TrieMode
if s.bc.GetConfig().KeepOnlyLatestState {
// No need to enable GC here, it only has latest things.
if s.bc.GetConfig().KeepOnlyLatestState || s.bc.GetConfig().RemoveUntraceableBlocks {
mode |= mpt.ModeLatest
}
s.billet = mpt.NewBillet(header.PrevStateRoot, mode,
@ -499,7 +500,8 @@ func (s *Module) Traverse(root util.Uint256, process func(node mpt.Node, nodeByt
defer s.lock.RUnlock()
var mode mpt.TrieMode
if s.bc.GetConfig().KeepOnlyLatestState {
// GC must be turned off here to allow access to the archived nodes.
if s.bc.GetConfig().KeepOnlyLatestState || s.bc.GetConfig().RemoveUntraceableBlocks {
mode |= mpt.ModeLatest
}
b := mpt.NewBillet(root, mode, 0, storage.NewMemCachedStore(s.dao.Store))

View file

@ -24,7 +24,7 @@ func TestModule_PR2019_discussion_r689629704(t *testing.T) {
require.NoError(t, tr.Put([]byte{0x06, 0x03}, []byte("leaf4")))
sr := tr.StateRoot()
tr.Flush()
tr.Flush(0)
// Keep MPT nodes in a map in order not to repeat them. We'll use `nodes` map to ask
// state sync module to restore the nodes.