From c4ee310e85536aef0f099b1cede7c610e5a2b4e9 Mon Sep 17 00:00:00 2001 From: Roman Khimov Date: Fri, 28 Jan 2022 15:05:13 +0300 Subject: [PATCH] mpt: modify refcounted storage scheme to make GC possible Add "active" flag into the node data and make the remainder modal, for active nodes it's a reference counter, for inactive ones the deactivation height is stored. Technically, refcounted chains storing just one trie don't need a flag, but it's a bit simpler this way. --- pkg/core/blockchain.go | 2 +- pkg/core/mpt/batch_test.go | 14 ++++---- pkg/core/mpt/billet.go | 4 +-- pkg/core/mpt/compat_test.go | 58 +++++++++++++++---------------- pkg/core/mpt/trie.go | 35 ++++++++++++++----- pkg/core/mpt/trie_test.go | 14 ++++---- pkg/core/stateroot/module.go | 11 +++--- pkg/core/statesync/module.go | 6 ++-- pkg/core/statesync/module_test.go | 2 +- 9 files changed, 84 insertions(+), 62 deletions(-) diff --git a/pkg/core/blockchain.go b/pkg/core/blockchain.go index 616bf4fdf..4e10d42d2 100644 --- a/pkg/core/blockchain.go +++ b/pkg/core/blockchain.go @@ -44,7 +44,7 @@ import ( // Tuning parameters. const ( headerBatchCount = 2000 - version = "0.2.1" + version = "0.2.2" defaultInitialGAS = 52000000_00000000 defaultMemPoolSize = 50000 diff --git a/pkg/core/mpt/batch_test.go b/pkg/core/mpt/batch_test.go index 8d1923823..ced30c061 100644 --- a/pkg/core/mpt/batch_test.go +++ b/pkg/core/mpt/batch_test.go @@ -56,7 +56,7 @@ func testIncompletePut(t *testing.T, ps pairs, n int, tr1, tr2 *Trie) { require.Equal(t, tr1.StateRoot(), tr2.StateRoot()) t.Run("test restore", func(t *testing.T) { - tr2.Flush() + tr2.Flush(0) tr3 := NewTrie(NewHashNode(tr2.StateRoot()), ModeAll, storage.NewMemCachedStore(tr2.Store)) for _, p := range ps[:n] { val, err := tr3.Get(p[0]) @@ -191,9 +191,9 @@ func TestTrie_PutBatchBranch(t *testing.T) { t.Run("non-empty child is hash node", func(t *testing.T) { tr1, tr2 := prepareBranch(t) - tr1.Flush() + tr1.Flush(0) tr1.Collapse(1) - tr2.Flush() + tr2.Flush(0) tr2.Collapse(1) var ps = pairs{{[]byte{0x00, 2}, nil}} @@ -208,9 +208,9 @@ func TestTrie_PutBatchBranch(t *testing.T) { require.NoError(t, tr1.Put([]byte{0x00}, []byte("value2"))) require.NoError(t, tr2.Put([]byte{0x00}, []byte("value2"))) - tr1.Flush() + tr1.Flush(0) tr1.Collapse(1) - tr2.Flush() + tr2.Flush(0) tr2.Collapse(1) var ps = pairs{{[]byte{0x00, 2}, nil}} @@ -254,8 +254,8 @@ func TestTrie_PutBatchHash(t *testing.T) { require.NoError(t, tr2.Put([]byte{0x10}, []byte("value1"))) require.NoError(t, tr1.Put([]byte{0x20}, []byte("value2"))) require.NoError(t, tr2.Put([]byte{0x20}, []byte("value2"))) - tr1.Flush() - tr2.Flush() + tr1.Flush(0) + tr2.Flush(0) return tr1, tr2 } diff --git a/pkg/core/mpt/billet.go b/pkg/core/mpt/billet.go index aa43a4734..d50645fa1 100644 --- a/pkg/core/mpt/billet.go +++ b/pkg/core/mpt/billet.go @@ -191,7 +191,7 @@ func (b *Billet) incrementRefAndStore(h util.Uint256, bs []byte) { } cnt++ if len(data) == 0 { - data = append(bs, 0, 0, 0, 0) + data = append(bs, 1, 0, 0, 0, 0) } binary.LittleEndian.PutUint32(data[len(data)-4:], uint32(cnt)) _ = b.Store.Put(key, data) @@ -338,7 +338,7 @@ func (b *Billet) GetFromStore(h util.Uint256) (Node, error) { } if b.mode.RC() { - data = data[:len(data)-4] + data = data[:len(data)-5] } n.Node.(flushedNode).setCache(data, h) return n.Node, nil diff --git a/pkg/core/mpt/compat_test.go b/pkg/core/mpt/compat_test.go index 2890b6fbc..7ac16c83a 100644 --- a/pkg/core/mpt/compat_test.go +++ b/pkg/core/mpt/compat_test.go @@ -1,7 +1,6 @@ package mpt import ( - "bytes" "testing" "github.com/stretchr/testify/require" @@ -113,12 +112,12 @@ func TestCompatibility(t *testing.T) { tr := newFilledTrie(t, []byte{0xac, 0x00}, []byte{0xab, 0xcd}, []byte{0xac, 0x10}, []byte{0xab, 0xcd}) - tr.Flush() + tr.Flush(0) tr2 := copyTrie(tr) require.NoError(t, tr2.Delete([]byte{0xac, 0x00})) - tr2.Flush() + tr2.Flush(0) require.NoError(t, tr2.Delete([]byte{0xac, 0x10})) }) @@ -150,7 +149,7 @@ func TestCompatibility(t *testing.T) { require.NoError(t, tr.Delete([]byte{0xac, 0x01})) tr.testHas(t, []byte{0xac, 0x02}, []byte{0xab, 0xcd}) - tr.Flush() + tr.Flush(0) tr2 := NewTrie(NewHashNode(tr.root.Hash()), ModeAll, tr.Store) tr2.testHas(t, []byte{0xac, 0x02}, []byte{0xab, 0xcd}) @@ -161,15 +160,15 @@ func TestCompatibility(t *testing.T) { []byte{0xac, 0x11}, []byte{0xac, 0x11}, []byte{0xac, 0x22}, []byte{0xac, 0x22}, []byte{0xac}, []byte{0xac}) - tr.Flush() + tr.Flush(0) checkBatchSize(t, tr, 7) require.NoError(t, tr.Delete([]byte{0xac, 0x11})) - tr.Flush() + tr.Flush(0) checkBatchSize(t, tr, 5) require.NoError(t, tr.Delete([]byte{0xac, 0x22})) - tr.Flush() + tr.Flush(0) checkBatchSize(t, tr, 2) }) @@ -192,12 +191,11 @@ func TestCompatibility(t *testing.T) { tr := NewTrie(NewHashNode(r.Hash()), ModeLatest, mainTrie.Store) require.Equal(t, r.Hash(), tr.root.Hash()) - // Tail bytes contain reference counter thus check for prefix. proof := testGetProof(t, tr, []byte{0xac, 0x01}, 4) - require.True(t, bytes.HasPrefix(r.Bytes(), proof[0])) - require.True(t, bytes.HasPrefix(b.Bytes(), proof[1])) - require.True(t, bytes.HasPrefix(e1.Bytes(), proof[2])) - require.True(t, bytes.HasPrefix(v1.Bytes(), proof[3])) + require.Equal(t, r.Bytes(), proof[0]) + require.Equal(t, b.Bytes(), proof[1]) + require.Equal(t, e1.Bytes(), proof[2]) + require.Equal(t, v1.Bytes(), proof[3]) testGetProof(t, tr, []byte{0xac}, 3) testGetProof(t, tr, []byte{0xac, 0x10}, 0) @@ -242,11 +240,11 @@ func TestCompatibility(t *testing.T) { []byte{0xa1, 0x01}, []byte{0x01}, []byte{0xa2, 0x01}, []byte{0x01}, []byte{0xa3, 0x01}, []byte{0x01}) - tr.Flush() + tr.Flush(0) tr2 := copyTrie(tr) require.NoError(t, tr2.Delete([]byte{0xa3, 0x01})) - tr2.Flush() + tr2.Flush(0) tr3 := copyTrie(tr2) require.NoError(t, tr3.Delete([]byte{0xa2, 0x01})) @@ -258,15 +256,15 @@ func TestCompatibility(t *testing.T) { []byte{0xa1, 0x01}, []byte{0x01}, []byte{0xa2, 0x01}, []byte{0x01}, []byte{0xa3, 0x01}, []byte{0x01}) - tr.Flush() + tr.Flush(0) checkBatchSize(t, tr, 4) require.NoError(t, tr.Delete([]byte{0xa3, 0x01})) - tr.Flush() + tr.Flush(0) checkBatchSize(t, tr, 4) require.NoError(t, tr.Delete([]byte{0xa2, 0x01})) - tr.Flush() + tr.Flush(0) checkBatchSize(t, tr, 2) tr.testHas(t, []byte{0xa1, 0x01}, []byte{0x01}) }) @@ -275,17 +273,17 @@ func TestCompatibility(t *testing.T) { tr := newFilledTrie(t, []byte{0xa1}, []byte{0x01}, []byte{0xa2}, []byte{0x02}) - tr.Flush() + tr.Flush(0) checkBatchSize(t, tr, 4) tr1 := copyTrie(tr) require.NoError(t, tr1.Delete([]byte{0xa1})) - tr1.Flush() + tr1.Flush(0) require.Equal(t, 2, len(tr1.Store.GetBatch().Put)) tr2 := copyTrie(tr1) require.NoError(t, tr2.Delete([]byte{0xa2})) - tr2.Flush() + tr2.Flush(0) require.Equal(t, 0, len(tr2.Store.GetBatch().Put)) }) @@ -294,21 +292,21 @@ func TestCompatibility(t *testing.T) { []byte{0x10}, []byte{0x01}, []byte{0x20}, []byte{0x02}, []byte{0x30}, []byte{0x03}) - tr.Flush() + tr.Flush(0) checkBatchSize(t, tr, 7) tr1 := copyTrie(tr) require.NoError(t, tr1.Delete([]byte{0x10})) - tr1.Flush() + tr1.Flush(0) tr2 := copyTrie(tr1) require.NoError(t, tr2.Delete([]byte{0x20})) - tr2.Flush() + tr2.Flush(0) require.Equal(t, 2, len(tr2.Store.GetBatch().Put)) tr3 := copyTrie(tr2) require.NoError(t, tr3.Delete([]byte{0x30})) - tr3.Flush() + tr3.Flush(0) require.Equal(t, 0, len(tr3.Store.GetBatch().Put)) }) @@ -316,12 +314,12 @@ func TestCompatibility(t *testing.T) { tr := newFilledTrie(t, []byte{0xa1}, []byte{0x01}, []byte{0xa2}, []byte{0x02}) - tr.Flush() + tr.Flush(0) checkBatchSize(t, tr, 4) tr1 := copyTrie(tr) require.NoError(t, tr1.Put([]byte{0xa3}, []byte{0x03})) - tr1.Flush() + tr1.Flush(0) require.Equal(t, 5, len(tr1.Store.GetBatch().Put)) }) @@ -329,19 +327,19 @@ func TestCompatibility(t *testing.T) { tr := newFilledTrie(t, []byte{0x10}, []byte{0x01}, []byte{0x20}, []byte{0x02}) - tr.Flush() + tr.Flush(0) checkBatchSize(t, tr, 5) tr1 := copyTrie(tr) require.NoError(t, tr1.Put([]byte{0x30}, []byte{0x03})) - tr1.Flush() + tr1.Flush(0) checkBatchSize(t, tr1, 7) }) t.Run("EmptyValueIssue633", func(t *testing.T) { tr := newFilledTrie(t, []byte{0x01}, []byte{}) - tr.Flush() + tr.Flush(0) checkBatchSize(t, tr, 2) proof := testGetProof(t, tr, []byte{0x01}, 2) @@ -411,7 +409,7 @@ func TestCompatibility_Find(t *testing.T) { // root is an extension node with key=abc; next=branch require.NoError(t, tr.Put([]byte("abc1"), []byte("01"))) require.NoError(t, tr.Put([]byte("abc3"), []byte("02"))) - tr.Flush() + tr.Flush(0) // find items with extension's key prefix t.Run("from > start", func(t *testing.T) { res, err := tr.Find([]byte("ab"), []byte("d2"), 100) diff --git a/pkg/core/mpt/trie.go b/pkg/core/mpt/trie.go index fc440d1a7..fef9e682f 100644 --- a/pkg/core/mpt/trie.go +++ b/pkg/core/mpt/trie.go @@ -406,14 +406,14 @@ func makeStorageKey(mptKey util.Uint256) []byte { // Because we care only about block-level changes, there is no need to put every // new node to storage. Normally, flush should be called with every StateRoot persist, i.e. // after every block. -func (t *Trie) Flush() { +func (t *Trie) Flush(index uint32) { for h, node := range t.refcount { if node.refcount != 0 { if node.bytes == nil { panic("item not in trie") } if t.mode.RC() { - node.initial = t.updateRefCount(h) + node.initial = t.updateRefCount(h, index) if node.initial == 0 { delete(t.refcount, h) } @@ -427,8 +427,20 @@ func (t *Trie) Flush() { } } +func IsActiveValue(v []byte) bool { + return len(v) > 4 && v[len(v)-5] == 1 +} + +func getFromStore(key []byte, mode TrieMode, store *storage.MemCachedStore) ([]byte, error) { + data, err := store.Get(key) + if err == nil && mode.GC() && !IsActiveValue(data) { + return nil, storage.ErrKeyNotFound + } + return data, err +} + // updateRefCount should be called only when refcounting is enabled. -func (t *Trie) updateRefCount(h util.Uint256) int32 { +func (t *Trie) updateRefCount(h util.Uint256, index uint32) int32 { if !t.mode.RC() { panic("`updateRefCount` is called, but GC is disabled") } @@ -439,13 +451,13 @@ func (t *Trie) updateRefCount(h util.Uint256) int32 { if cnt == 0 { // A newly created item which may be in store. var err error - data, err = t.Store.Get(key) + data, err = getFromStore(key, t.mode, t.Store) if err == nil { cnt = int32(binary.LittleEndian.Uint32(data[len(data)-4:])) } } if len(data) == 0 { - data = append(node.bytes, 0, 0, 0, 0) + data = append(node.bytes, 1, 0, 0, 0, 0) } cnt += node.refcount switch { @@ -453,7 +465,13 @@ func (t *Trie) updateRefCount(h util.Uint256) int32 { // BUG: negative reference count panic(fmt.Sprintf("negative reference count: %s new %d, upd %d", h.StringBE(), cnt, t.refcount[h])) case cnt == 0: - _ = t.Store.Delete(key) + if !t.mode.GC() { + _ = t.Store.Delete(key) + } else { + data[len(data)-5] = 0 + binary.LittleEndian.PutUint32(data[len(data)-4:], index) + _ = t.Store.Put(key, data) + } default: binary.LittleEndian.PutUint32(data[len(data)-4:], uint32(cnt)) _ = t.Store.Put(key, data) @@ -492,7 +510,7 @@ func (t *Trie) removeRef(h util.Uint256, bs []byte) { } func (t *Trie) getFromStore(h util.Uint256) (Node, error) { - data, err := t.Store.Get(makeStorageKey(h)) + data, err := getFromStore(makeStorageKey(h), t.mode, t.Store) if err != nil { return nil, err } @@ -505,10 +523,11 @@ func (t *Trie) getFromStore(h util.Uint256) (Node, error) { } if t.mode.RC() { - data = data[:len(data)-4] + data = data[:len(data)-5] node := t.refcount[h] if node != nil { node.bytes = data + _ = r.ReadB() node.initial = int32(r.ReadU32LE()) } } diff --git a/pkg/core/mpt/trie_test.go b/pkg/core/mpt/trie_test.go index 2e2f7adcf..e7eda2d0d 100644 --- a/pkg/core/mpt/trie_test.go +++ b/pkg/core/mpt/trie_test.go @@ -48,28 +48,28 @@ func newTestTrie(t *testing.T) *Trie { func testTrieRefcount(t *testing.T, key1, key2 []byte) { tr := NewTrie(nil, ModeLatest, storage.NewMemCachedStore(storage.NewMemoryStore())) require.NoError(t, tr.Put(key1, []byte{1})) - tr.Flush() + tr.Flush(0) require.NoError(t, tr.Put(key2, []byte{1})) - tr.Flush() + tr.Flush(0) tr.testHas(t, key1, []byte{1}) tr.testHas(t, key2, []byte{1}) // remove first, keep second require.NoError(t, tr.Delete(key1)) - tr.Flush() + tr.Flush(0) tr.testHas(t, key1, nil) tr.testHas(t, key2, []byte{1}) // no-op require.NoError(t, tr.Put(key1, []byte{1})) require.NoError(t, tr.Delete(key1)) - tr.Flush() + tr.Flush(0) tr.testHas(t, key1, nil) tr.testHas(t, key2, []byte{1}) // delete non-existent, refcount should not be updated require.NoError(t, tr.Delete(key1)) - tr.Flush() + tr.Flush(0) tr.testHas(t, key1, nil) tr.testHas(t, key2, []byte{1}) } @@ -249,7 +249,7 @@ func (tr *Trie) putToStore(n Node) { bytes: n.Bytes(), refcount: 1, } - tr.updateRefCount(n.Hash()) + tr.updateRefCount(n.Hash(), 0) } else { _ = tr.Store.Put(makeStorageKey(n.Hash()), n.Bytes()) } @@ -318,7 +318,7 @@ func TestTrie_Flush(t *testing.T) { require.NoError(t, tr.Put([]byte(k), v)) } - tr.Flush() + tr.Flush(0) tr = NewTrie(NewHashNode(tr.StateRoot()), ModeAll, tr.Store) for k, v := range pairs { actual, err := tr.Get([]byte(k)) diff --git a/pkg/core/stateroot/module.go b/pkg/core/stateroot/module.go index 53cdda5b0..b3d6b1a63 100644 --- a/pkg/core/stateroot/module.go +++ b/pkg/core/stateroot/module.go @@ -69,7 +69,8 @@ func NewModule(cfg config.ProtocolConfiguration, verif VerifierFunc, log *zap.Lo // GetState returns value at the specified key fom the MPT with the specified root. func (s *Module) GetState(root util.Uint256, key []byte) ([]byte, error) { - tr := mpt.NewTrie(mpt.NewHashNode(root), s.mode, storage.NewMemCachedStore(s.Store)) + // Allow accessing old values, it's RO thing. + tr := mpt.NewTrie(mpt.NewHashNode(root), s.mode&^mpt.ModeGCFlag, storage.NewMemCachedStore(s.Store)) return tr.Get(key) } @@ -79,13 +80,15 @@ func (s *Module) GetState(root util.Uint256, key []byte) ([]byte, error) { // item with key equals to prefix is included into result; if empty `start` specified, // then item with key equals to prefix is not included into result. func (s *Module) FindStates(root util.Uint256, prefix, start []byte, max int) ([]storage.KeyValue, error) { - tr := mpt.NewTrie(mpt.NewHashNode(root), s.mode, storage.NewMemCachedStore(s.Store)) + // Allow accessing old values, it's RO thing. + tr := mpt.NewTrie(mpt.NewHashNode(root), s.mode&^mpt.ModeGCFlag, storage.NewMemCachedStore(s.Store)) return tr.Find(prefix, start, max) } // GetStateProof returns proof of having key in the MPT with the specified root. func (s *Module) GetStateProof(root util.Uint256, key []byte) ([][]byte, error) { - tr := mpt.NewTrie(mpt.NewHashNode(root), s.mode, storage.NewMemCachedStore(s.Store)) + // Allow accessing old values, it's RO thing. + tr := mpt.NewTrie(mpt.NewHashNode(root), s.mode&^mpt.ModeGCFlag, storage.NewMemCachedStore(s.Store)) return tr.GetProof(key) } @@ -188,7 +191,7 @@ func (s *Module) AddMPTBatch(index uint32, b mpt.Batch, cache *storage.MemCached if _, err := mpt.PutBatch(b); err != nil { return nil, nil, err } - mpt.Flush() + mpt.Flush(index) sr := &state.MPTRoot{ Index: index, Root: mpt.StateRoot(), diff --git a/pkg/core/statesync/module.go b/pkg/core/statesync/module.go index 76ecc56e1..4157ae983 100644 --- a/pkg/core/statesync/module.go +++ b/pkg/core/statesync/module.go @@ -222,7 +222,8 @@ func (s *Module) defineSyncStage() error { return fmt.Errorf("failed to get header to initialize MPT billet: %w", err) } var mode mpt.TrieMode - if s.bc.GetConfig().KeepOnlyLatestState { + // No need to enable GC here, it only has latest things. + if s.bc.GetConfig().KeepOnlyLatestState || s.bc.GetConfig().RemoveUntraceableBlocks { mode |= mpt.ModeLatest } s.billet = mpt.NewBillet(header.PrevStateRoot, mode, @@ -499,7 +500,8 @@ func (s *Module) Traverse(root util.Uint256, process func(node mpt.Node, nodeByt defer s.lock.RUnlock() var mode mpt.TrieMode - if s.bc.GetConfig().KeepOnlyLatestState { + // GC must be turned off here to allow access to the archived nodes. + if s.bc.GetConfig().KeepOnlyLatestState || s.bc.GetConfig().RemoveUntraceableBlocks { mode |= mpt.ModeLatest } b := mpt.NewBillet(root, mode, 0, storage.NewMemCachedStore(s.dao.Store)) diff --git a/pkg/core/statesync/module_test.go b/pkg/core/statesync/module_test.go index 69fc6614c..79e24a8ba 100644 --- a/pkg/core/statesync/module_test.go +++ b/pkg/core/statesync/module_test.go @@ -24,7 +24,7 @@ func TestModule_PR2019_discussion_r689629704(t *testing.T) { require.NoError(t, tr.Put([]byte{0x06, 0x03}, []byte("leaf4"))) sr := tr.StateRoot() - tr.Flush() + tr.Flush(0) // Keep MPT nodes in a map in order not to repeat them. We'll use `nodes` map to ask // state sync module to restore the nodes.