From 2bc493a839c0e04272bdcb2df254432db7dda2c0 Mon Sep 17 00:00:00 2001 From: Roman Khimov Date: Thu, 27 Jan 2022 14:25:11 +0300 Subject: [PATCH 1/8] mpt: simplify makeStorageKey() interface --- pkg/core/mpt/batch_test.go | 2 +- pkg/core/mpt/billet.go | 4 ++-- pkg/core/mpt/billet_test.go | 2 +- pkg/core/mpt/proof.go | 2 +- pkg/core/mpt/trie.go | 10 +++++----- pkg/core/mpt/trie_test.go | 2 +- 6 files changed, 11 insertions(+), 11 deletions(-) diff --git a/pkg/core/mpt/batch_test.go b/pkg/core/mpt/batch_test.go index 6d763b61e..3df6f7bc0 100644 --- a/pkg/core/mpt/batch_test.go +++ b/pkg/core/mpt/batch_test.go @@ -274,7 +274,7 @@ func TestTrie_PutBatchHash(t *testing.T) { } tr1.Collapse(1) tr2.Collapse(1) - key := makeStorageKey(tr1.root.(*BranchNode).Children[2].Hash().BytesBE()) + key := makeStorageKey(tr1.root.(*BranchNode).Children[2].Hash()) require.NoError(t, tr1.Store.Delete(key)) require.NoError(t, tr2.Store.Delete(key)) testIncompletePut(t, ps, 1, tr1, tr2) diff --git a/pkg/core/mpt/billet.go b/pkg/core/mpt/billet.go index 39c4770fe..1840e04a5 100644 --- a/pkg/core/mpt/billet.go +++ b/pkg/core/mpt/billet.go @@ -177,7 +177,7 @@ func (b *Billet) putIntoHash(curr *HashNode, path []byte, val Node) (Node, error } func (b *Billet) incrementRefAndStore(h util.Uint256, bs []byte) { - key := makeStorageKey(h.BytesBE()) + key := makeStorageKey(h) if b.refcountEnabled { var ( err error @@ -325,7 +325,7 @@ func (b *Billet) tryCollapseBranch(curr *BranchNode) Node { // GetFromStore returns MPT node from the storage. func (b *Billet) GetFromStore(h util.Uint256) (Node, error) { - data, err := b.Store.Get(makeStorageKey(h.BytesBE())) + data, err := b.Store.Get(makeStorageKey(h)) if err != nil { return nil, err } diff --git a/pkg/core/mpt/billet_test.go b/pkg/core/mpt/billet_test.go index 4893d0d41..be24f9a83 100644 --- a/pkg/core/mpt/billet_test.go +++ b/pkg/core/mpt/billet_test.go @@ -16,7 +16,7 @@ func TestBillet_RestoreHashNode(t *testing.T) { _ = expectedRoot.Hash() _ = tr.root.Hash() require.Equal(t, expectedRoot, tr.root) - expectedBytes, err := tr.Store.Get(makeStorageKey(expectedNode.Hash().BytesBE())) + expectedBytes, err := tr.Store.Get(makeStorageKey(expectedNode.Hash())) if expectedRefCount != 0 { require.NoError(t, err) require.Equal(t, expectedRefCount, binary.LittleEndian.Uint32(expectedBytes[len(expectedBytes)-4:])) diff --git a/pkg/core/mpt/proof.go b/pkg/core/mpt/proof.go index cc8b5876d..a769550ca 100644 --- a/pkg/core/mpt/proof.go +++ b/pkg/core/mpt/proof.go @@ -70,7 +70,7 @@ func VerifyProof(rh util.Uint256, key []byte, proofs [][]byte) ([]byte, bool) { for i := range proofs { h := hash.DoubleSha256(proofs[i]) // no errors in Put to memory store - _ = tr.Store.Put(makeStorageKey(h[:]), proofs[i]) + _ = tr.Store.Put(makeStorageKey(h), proofs[i]) } _, leaf, _, err := tr.getWithPath(tr.root, path, true) if err != nil { diff --git a/pkg/core/mpt/trie.go b/pkg/core/mpt/trie.go index da466096d..7bc1c603e 100644 --- a/pkg/core/mpt/trie.go +++ b/pkg/core/mpt/trie.go @@ -372,8 +372,8 @@ func (t *Trie) StateRoot() util.Uint256 { return t.root.Hash() } -func makeStorageKey(mptKey []byte) []byte { - return append([]byte{byte(storage.DataMPT)}, mptKey...) +func makeStorageKey(mptKey util.Uint256) []byte { + return append([]byte{byte(storage.DataMPT)}, mptKey[:]...) } // Flush puts every node in the trie except Hash ones to the storage. 
@@ -392,7 +392,7 @@ func (t *Trie) Flush() { delete(t.refcount, h) } } else if node.refcount > 0 { - _ = t.Store.Put(makeStorageKey(h.BytesBE()), node.bytes) + _ = t.Store.Put(makeStorageKey(h), node.bytes) } node.refcount = 0 } else { @@ -407,7 +407,7 @@ func (t *Trie) updateRefCount(h util.Uint256) int32 { panic("`updateRefCount` is called, but GC is disabled") } var data []byte - key := makeStorageKey(h.BytesBE()) + key := makeStorageKey(h) node := t.refcount[h] cnt := node.initial if cnt == 0 { @@ -466,7 +466,7 @@ func (t *Trie) removeRef(h util.Uint256, bs []byte) { } func (t *Trie) getFromStore(h util.Uint256) (Node, error) { - data, err := t.Store.Get(makeStorageKey(h.BytesBE())) + data, err := t.Store.Get(makeStorageKey(h)) if err != nil { return nil, err } diff --git a/pkg/core/mpt/trie_test.go b/pkg/core/mpt/trie_test.go index 81d7c9fe3..465d7964b 100644 --- a/pkg/core/mpt/trie_test.go +++ b/pkg/core/mpt/trie_test.go @@ -251,7 +251,7 @@ func (tr *Trie) putToStore(n Node) { } tr.updateRefCount(n.Hash()) } else { - _ = tr.Store.Put(makeStorageKey(n.Hash().BytesBE()), n.Bytes()) + _ = tr.Store.Put(makeStorageKey(n.Hash()), n.Bytes()) } } From 33072925972cd8ed6edabc55d21b77c78d3af758 Mon Sep 17 00:00:00 2001 From: Roman Khimov Date: Sat, 29 Jan 2022 11:54:25 +0300 Subject: [PATCH 2/8] storage: rework MemoryStore with a single map Doesn't affect any benchmarks or tests, but makes things a bit simpler. --- pkg/core/storage/boltdb_store.go | 16 ++++---- pkg/core/storage/leveldb_store.go | 13 +++---- pkg/core/storage/memcached_store.go | 49 ++++++++---------------- pkg/core/storage/memcached_store_test.go | 4 +- pkg/core/storage/memory_store.go | 24 +++--------- pkg/core/storage/store.go | 2 +- 6 files changed, 36 insertions(+), 72 deletions(-) diff --git a/pkg/core/storage/boltdb_store.go b/pkg/core/storage/boltdb_store.go index 83f2f3144..8f60aa5ee 100644 --- a/pkg/core/storage/boltdb_store.go +++ b/pkg/core/storage/boltdb_store.go @@ -85,21 +85,21 @@ func (s *BoltDBStore) Delete(key []byte) error { // PutBatch implements the Store interface. func (s *BoltDBStore) PutBatch(batch Batch) error { memBatch := batch.(*MemoryBatch) - return s.PutChangeSet(memBatch.mem, memBatch.del) + return s.PutChangeSet(memBatch.mem) } // PutChangeSet implements the Store interface. -func (s *BoltDBStore) PutChangeSet(puts map[string][]byte, dels map[string]bool) error { +func (s *BoltDBStore) PutChangeSet(puts map[string][]byte) error { + var err error + return s.db.Batch(func(tx *bbolt.Tx) error { b := tx.Bucket(Bucket) for k, v := range puts { - err := b.Put([]byte(k), v) - if err != nil { - return err + if v != nil { + err = b.Put([]byte(k), v) + } else { + err = b.Delete([]byte(k)) } - } - for k := range dels { - err := b.Delete([]byte(k)) if err != nil { return err } diff --git a/pkg/core/storage/leveldb_store.go b/pkg/core/storage/leveldb_store.go index 410b93e82..6d5439711 100644 --- a/pkg/core/storage/leveldb_store.go +++ b/pkg/core/storage/leveldb_store.go @@ -62,20 +62,17 @@ func (s *LevelDBStore) PutBatch(batch Batch) error { } // PutChangeSet implements the Store interface. 
-func (s *LevelDBStore) PutChangeSet(puts map[string][]byte, dels map[string]bool) error { +func (s *LevelDBStore) PutChangeSet(puts map[string][]byte) error { tx, err := s.db.OpenTransaction() if err != nil { return err } for k := range puts { - err = tx.Put([]byte(k), puts[k], nil) - if err != nil { - tx.Discard() - return err + if puts[k] != nil { + err = tx.Put([]byte(k), puts[k], nil) + } else { + err = tx.Delete([]byte(k), nil) } - } - for k := range dels { - err = tx.Delete([]byte(k), nil) if err != nil { tx.Discard() return err diff --git a/pkg/core/storage/memcached_store.go b/pkg/core/storage/memcached_store.go index a6e5278a1..aed7b9c83 100644 --- a/pkg/core/storage/memcached_store.go +++ b/pkg/core/storage/memcached_store.go @@ -55,13 +55,12 @@ func NewMemCachedStore(lower Store) *MemCachedStore { func (s *MemCachedStore) Get(key []byte) ([]byte, error) { s.mut.RLock() defer s.mut.RUnlock() - k := string(key) - if val, ok := s.mem[k]; ok { + if val, ok := s.mem[string(key)]; ok { + if val == nil { + return nil, ErrKeyNotFound + } return val, nil } - if _, ok := s.del[k]; ok { - return nil, ErrKeyNotFound - } return s.ps.Get(key) } @@ -73,19 +72,16 @@ func (s *MemCachedStore) GetBatch() *MemBatch { var b MemBatch b.Put = make([]KeyValueExists, 0, len(s.mem)) + b.Deleted = make([]KeyValueExists, 0) for k, v := range s.mem { key := []byte(k) _, err := s.ps.Get(key) - b.Put = append(b.Put, KeyValueExists{KeyValue: KeyValue{Key: key, Value: v}, Exists: err == nil}) + if v == nil { + b.Deleted = append(b.Deleted, KeyValueExists{KeyValue: KeyValue{Key: key}, Exists: err == nil}) + } else { + b.Put = append(b.Put, KeyValueExists{KeyValue: KeyValue{Key: key, Value: v}, Exists: err == nil}) + } } - - b.Deleted = make([]KeyValueExists, 0, len(s.del)) - for k := range s.del { - key := []byte(k) - _, err := s.ps.Get(key) - b.Deleted = append(b.Deleted, KeyValueExists{KeyValue: KeyValue{Key: key}, Exists: err == nil}) - } - return &b } @@ -141,16 +137,7 @@ func (s *MemCachedStore) seek(ctx context.Context, rng SeekRange, cutPrefix bool Key: []byte(k), Value: v, }, - Exists: true, - }) - } - } - for k := range s.MemoryStore.del { - if isKeyOK(k) { - memRes = append(memRes, KeyValueExists{ - KeyValue: KeyValue{ - Key: []byte(k), - }, + Exists: v != nil, }) } } @@ -265,15 +252,14 @@ func (s *MemCachedStore) PersistSync() (int, error) { func (s *MemCachedStore) persist(isSync bool) (int, error) { var err error - var keys, dkeys int + var keys int s.plock.Lock() defer s.plock.Unlock() s.mut.Lock() keys = len(s.mem) - dkeys = len(s.del) - if keys == 0 && dkeys == 0 { + if keys == 0 { s.mut.Unlock() return 0, nil } @@ -282,15 +268,14 @@ func (s *MemCachedStore) persist(isSync bool) (int, error) { // starts using fresh new maps. This tempstore is only known here and // nothing ever changes it, therefore accesses to it (reads) can go // unprotected while writes are handled by s proper. 
- var tempstore = &MemCachedStore{MemoryStore: MemoryStore{mem: s.mem, del: s.del}, ps: s.ps} + var tempstore = &MemCachedStore{MemoryStore: MemoryStore{mem: s.mem}, ps: s.ps} s.ps = tempstore s.mem = make(map[string][]byte, len(s.mem)) - s.del = make(map[string]bool, len(s.del)) if !isSync { s.mut.Unlock() } - err = tempstore.ps.PutChangeSet(tempstore.mem, tempstore.del) + err = tempstore.ps.PutChangeSet(tempstore.mem) if !isSync { s.mut.Lock() @@ -306,12 +291,8 @@ func (s *MemCachedStore) persist(isSync bool) (int, error) { for k := range s.mem { tempstore.put(k, s.mem[k]) } - for k := range s.del { - tempstore.drop(k) - } s.ps = tempstore.ps s.mem = tempstore.mem - s.del = tempstore.del } s.mut.Unlock() return keys, err diff --git a/pkg/core/storage/memcached_store_test.go b/pkg/core/storage/memcached_store_test.go index d46af8370..100942d7e 100644 --- a/pkg/core/storage/memcached_store_test.go +++ b/pkg/core/storage/memcached_store_test.go @@ -74,7 +74,7 @@ func testMemCachedStorePersist(t *testing.T, ps Store) { c, err = ts.Persist() checkBatch(t, ts, nil, nil) assert.Equal(t, nil, err) - assert.Equal(t, 0, c) + assert.Equal(t, 1, c) v, err = ps.Get([]byte("key")) assert.Equal(t, ErrKeyNotFound, err) assert.Equal(t, []byte(nil), v) @@ -287,7 +287,7 @@ func (b *BadStore) Put(k, v []byte) error { func (b *BadStore) PutBatch(Batch) error { return nil } -func (b *BadStore) PutChangeSet(_ map[string][]byte, _ map[string]bool) error { +func (b *BadStore) PutChangeSet(_ map[string][]byte) error { b.onPutBatch() return ErrKeyNotFound } diff --git a/pkg/core/storage/memory_store.go b/pkg/core/storage/memory_store.go index 506f76224..1a5589aa6 100644 --- a/pkg/core/storage/memory_store.go +++ b/pkg/core/storage/memory_store.go @@ -14,8 +14,6 @@ import ( type MemoryStore struct { mut sync.RWMutex mem map[string][]byte - // A map, not a slice, to avoid duplicates. - del map[string]bool } // MemoryBatch is an in-memory batch compatible with MemoryStore. @@ -37,7 +35,6 @@ func (b *MemoryBatch) Delete(k []byte) { func NewMemoryStore() *MemoryStore { return &MemoryStore{ mem: make(map[string][]byte), - del: make(map[string]bool), } } @@ -45,7 +42,7 @@ func NewMemoryStore() *MemoryStore { func (s *MemoryStore) Get(key []byte) ([]byte, error) { s.mut.RLock() defer s.mut.RUnlock() - if val, ok := s.mem[string(key)]; ok { + if val, ok := s.mem[string(key)]; ok && val != nil { return val, nil } return nil, ErrKeyNotFound @@ -55,7 +52,6 @@ func (s *MemoryStore) Get(key []byte) ([]byte, error) { // with mutex locked. func (s *MemoryStore) put(key string, value []byte) { s.mem[key] = value - delete(s.del, key) } // Put implements the Store interface. Never returns an error. @@ -71,8 +67,7 @@ func (s *MemoryStore) Put(key, value []byte) error { // drop deletes a key-value pair from the store, it's supposed to be called // with mutex locked. func (s *MemoryStore) drop(key string) { - s.del[key] = true - delete(s.mem, key) + s.mem[key] = nil } // Delete implements Store interface. Never returns an error. @@ -87,18 +82,15 @@ func (s *MemoryStore) Delete(key []byte) error { // PutBatch implements the Store interface. Never returns an error. func (s *MemoryStore) PutBatch(batch Batch) error { b := batch.(*MemoryBatch) - return s.PutChangeSet(b.mem, b.del) + return s.PutChangeSet(b.mem) } // PutChangeSet implements the Store interface. Never returns an error. 
-func (s *MemoryStore) PutChangeSet(puts map[string][]byte, dels map[string]bool) error { +func (s *MemoryStore) PutChangeSet(puts map[string][]byte) error { s.mut.Lock() for k := range puts { s.put(k, puts[k]) } - for k := range dels { - s.drop(k) - } s.mut.Unlock() return nil } @@ -120,11 +112,6 @@ func (s *MemoryStore) SeekAll(key []byte, f func(k, v []byte)) { f([]byte(k), v) } } - for k := range s.del { - if strings.HasPrefix(k, sk) { - f([]byte(k), nil) - } - } } // seek is an internal unlocked implementation of Seek. `start` denotes whether @@ -151,7 +138,7 @@ func (s *MemoryStore) seek(rng SeekRange, f func(k, v []byte) bool) { } for k, v := range s.mem { - if isKeyOK(k) { + if v != nil && isKeyOK(k) { memList = append(memList, KeyValue{ Key: []byte(k), Value: v, @@ -182,7 +169,6 @@ func newMemoryBatch() *MemoryBatch { // error. func (s *MemoryStore) Close() error { s.mut.Lock() - s.del = nil s.mem = nil s.mut.Unlock() return nil diff --git a/pkg/core/storage/store.go b/pkg/core/storage/store.go index 8409e7a1f..ff1997234 100644 --- a/pkg/core/storage/store.go +++ b/pkg/core/storage/store.go @@ -88,7 +88,7 @@ type ( Put(k, v []byte) error PutBatch(Batch) error // PutChangeSet allows to push prepared changeset to the Store. - PutChangeSet(puts map[string][]byte, dels map[string]bool) error + PutChangeSet(puts map[string][]byte) error // Seek can guarantee that provided key (k) and value (v) are the only valid until the next call to f. // Seek continues iteration until false is returned from f. // Key and value slices should not be modified. From de99c3acdbb11b8decd5a0c5c55379ed17b9b3b5 Mon Sep 17 00:00:00 2001 From: Roman Khimov Date: Sat, 5 Feb 2022 10:53:45 +0300 Subject: [PATCH 3/8] storage: provide a way to escape from SeekAsync goroutine A routine blocked on channel send here can't really exit, so avoid goroutine leak: goroutine 2813725 [chan send, 6 minutes]: github.com/nspcc-dev/neo-go/pkg/core/storage.(*MemCachedStore).SeekAsync.func1.1(0xc01a7118f7, 0x2, 0x25, 0xc01a7118f9, 0x23, 0x23, 0xc0366c7c01) github.com/nspcc-dev/neo-go/pkg/core/storage/memcached_store.go:120 +0x86 github.com/nspcc-dev/neo-go/pkg/core/storage.(*MemCachedStore).seek.func2(0xc0079e7920, 0xa, 0x30, 0xc0079e792a, 0x26, 0x26, 0x1) github.com/nspcc-dev/neo-go/pkg/core/storage/memcached_store.go:183 +0x347 github.com/nspcc-dev/neo-go/pkg/core/storage.(*MemCachedStore).seek(0xc000458480, 0x135c028, 0xc0000445d0, 0xc00f1721d0, 0x7, 0x7, 0x0, 0x0, 0x0, 0x0, ...) github.com/nspcc-dev/neo-go/pkg/core/storage/memcached_store.go:224 +0x4f4 github.com/nspcc-dev/neo-go/pkg/core/storage.(*MemCachedStore).Seek(0xc000458480, 0xc00f1721d0, 0x7, 0x7, 0x0, 0x0, 0x0, 0x0, 0xc0357c6620) github.com/nspcc-dev/neo-go/pkg/core/storage/memcached_store.go:110 +0x8a github.com/nspcc-dev/neo-go/pkg/core/storage.(*MemCachedStore).seek(0xc0331a4f00, 0x135bff0, 0xc00ae26ec0, 0xc00f1721d0, 0x7, 0x7, 0x0, 0x0, 0x0, 0x0, ...) github.com/nspcc-dev/neo-go/pkg/core/storage/memcached_store.go:210 +0x379 github.com/nspcc-dev/neo-go/pkg/core/storage.(*MemCachedStore).SeekAsync.func1(0xc0331a4f00, 0x135bff0, 0xc00ae26ec0, 0xc00f1721d0, 0x7, 0x7, 0x0, 0x0, 0x0, 0x0, ...) 
github.com/nspcc-dev/neo-go/pkg/core/storage/memcached_store.go:119 +0xc5 created by github.com/nspcc-dev/neo-go/pkg/core/storage.(*MemCachedStore).SeekAsync github.com/nspcc-dev/neo-go/pkg/core/storage/memcached_store.go:118 +0xc8 goroutine 2822823 [chan send, 6 minutes]: github.com/nspcc-dev/neo-go/pkg/core/storage.(*MemCachedStore).SeekAsync.func1.1(0xc011859b77, 0x3, 0x3, 0xc017bea8d0, 0x26, 0x26, 0xc00f1afc00) github.com/nspcc-dev/neo-go/pkg/core/storage/memcached_store.go:120 +0x86 github.com/nspcc-dev/neo-go/pkg/core/storage.(*MemCachedStore).seek.func2(0xc011859b60, 0xa, 0xa, 0xc017bea8a0, 0x26, 0x26, 0xc00ad9fb00) github.com/nspcc-dev/neo-go/pkg/core/storage/memcached_store.go:200 +0x47e github.com/nspcc-dev/neo-go/pkg/core/storage.(*MemCachedStore).seek.func2(0xc01d5d8c90, 0xa, 0x30, 0xc01d5d8c9a, 0x26, 0x26, 0x1) github.com/nspcc-dev/neo-go/pkg/core/storage/memcached_store.go:200 +0x47e github.com/nspcc-dev/neo-go/pkg/core/storage.(*MemCachedStore).seek(0xc035e12900, 0x135c028, 0xc0000445d0, 0xc01773bf60, 0x7, 0x7, 0x0, 0x0, 0x0, 0x0, ...) github.com/nspcc-dev/neo-go/pkg/core/storage/memcached_store.go:224 +0x4f4 github.com/nspcc-dev/neo-go/pkg/core/storage.(*MemCachedStore).Seek(0xc035e12900, 0xc01773bf60, 0x7, 0x7, 0x0, 0x0, 0x0, 0x0, 0xc030c9e0e0) github.com/nspcc-dev/neo-go/pkg/core/storage/memcached_store.go:110 +0x8a github.com/nspcc-dev/neo-go/pkg/core/storage.(*MemCachedStore).seek(0xc000458480, 0x135c028, 0xc0000445d0, 0xc01773bf60, 0x7, 0x7, 0x0, 0x0, 0x0, 0x0, ...) github.com/nspcc-dev/neo-go/pkg/core/storage/memcached_store.go:210 +0x379 github.com/nspcc-dev/neo-go/pkg/core/storage.(*MemCachedStore).Seek(0xc000458480, 0xc01773bf60, 0x7, 0x7, 0x0, 0x0, 0x0, 0x0, 0xc030c9e070) github.com/nspcc-dev/neo-go/pkg/core/storage/memcached_store.go:110 +0x8a github.com/nspcc-dev/neo-go/pkg/core/storage.(*MemCachedStore).seek(0xc00b340c60, 0x135bff0, 0xc00f1afbc0, 0xc01773bf60, 0x7, 0x7, 0x0, 0x0, 0x0, 0x0, ...) github.com/nspcc-dev/neo-go/pkg/core/storage/memcached_store.go:210 +0x379 github.com/nspcc-dev/neo-go/pkg/core/storage.(*MemCachedStore).SeekAsync.func1(0xc00b340c60, 0x135bff0, 0xc00f1afbc0, 0xc01773bf60, 0x7, 0x7, 0x0, 0x0, 0x0, 0x0, ...) github.com/nspcc-dev/neo-go/pkg/core/storage/memcached_store.go:119 +0xc5 created by github.com/nspcc-dev/neo-go/pkg/core/storage.(*MemCachedStore).SeekAsync github.com/nspcc-dev/neo-go/pkg/core/storage/memcached_store.go:118 +0xc8 ... --- pkg/core/storage/memcached_store.go | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/pkg/core/storage/memcached_store.go b/pkg/core/storage/memcached_store.go index aed7b9c83..019f25133 100644 --- a/pkg/core/storage/memcached_store.go +++ b/pkg/core/storage/memcached_store.go @@ -97,11 +97,12 @@ func (s *MemCachedStore) SeekAsync(ctx context.Context, rng SeekRange, cutPrefix res := make(chan KeyValue) go func() { s.seek(ctx, rng, cutPrefix, func(k, v []byte) bool { - res <- KeyValue{ - Key: k, - Value: v, + select { + case <-ctx.Done(): + return false + case res <- KeyValue{Key: k, Value: v}: + return true } - return true // always continue, we have context for early stop. }) close(res) }() From 86cb4ed80f61bfac57b57cdd1e029c85baf4d3da Mon Sep 17 00:00:00 2001 From: Roman Khimov Date: Fri, 28 Jan 2022 11:56:33 +0300 Subject: [PATCH 4/8] mpt: add the notion of MPT mode It directly affects the storage format, so it's important. ModeGC is not used at the moment, but defined for future extensibility. 
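The mode introduced here is a plain bit mask: ModeLatest enables reference counting (only the latest root is kept), ModeGCFlag marks garbage-collection support, and ModeGC combines the two. A self-contained sketch of how the RC()/GC() helpers added to pkg/core/mpt/trie.go behave, mirroring the constants from this patch (illustration only, not part of the diff):

package main

import "fmt"

// TrieMode mirrors the bit mask added in pkg/core/mpt/trie.go: bit 0 enables
// reference counting, bit 1 marks garbage-collection support.
type TrieMode byte

const (
	ModeAll    TrieMode = 0    // store everything, no reference counting
	ModeLatest TrieMode = 0x01 // refcounted, only the latest root is kept
	ModeGCFlag TrieMode = 0x02 // GC flag, not meant to be used on its own
	ModeGC     TrieMode = 0x03 // refcounted set of roots with GC enabled
)

// RC reports whether reference counting is enabled for this mode.
func (m TrieMode) RC() bool { return m&ModeLatest != 0 }

// GC reports whether garbage collection is enabled for this mode.
func (m TrieMode) GC() bool { return m&ModeGCFlag != 0 }

func main() {
	for _, m := range []TrieMode{ModeAll, ModeLatest, ModeGCFlag, ModeGC} {
		fmt.Printf("mode %d: RC=%v GC=%v\n", m, m.RC(), m.GC())
	}
}

Callers in this patch only set ModeLatest when KeepOnlyLatestState is configured; ModeGC stays reserved for the GC scheme prepared by the later patches in this series.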
--- pkg/core/blockchain.go | 6 ++-- pkg/core/mpt/batch_test.go | 36 +++++++++---------- pkg/core/mpt/billet.go | 12 +++---- pkg/core/mpt/billet_test.go | 16 ++++----- pkg/core/mpt/compat_test.go | 16 ++++----- pkg/core/mpt/node_test.go | 2 +- pkg/core/mpt/proof.go | 2 +- pkg/core/mpt/proof_test.go | 2 +- pkg/core/mpt/trie.go | 46 ++++++++++++++++++------ pkg/core/mpt/trie_test.go | 60 ++++++++++++++++--------------- pkg/core/stateroot/module.go | 22 +++++++----- pkg/core/statesync/module.go | 12 +++++-- pkg/core/statesync/module_test.go | 4 +-- 13 files changed, 140 insertions(+), 96 deletions(-) diff --git a/pkg/core/blockchain.go b/pkg/core/blockchain.go index 7518b4c96..616bf4fdf 100644 --- a/pkg/core/blockchain.go +++ b/pkg/core/blockchain.go @@ -320,7 +320,7 @@ func (bc *Blockchain) init() error { if err != nil { return err } - if err := bc.stateRoot.Init(0, bc.config.KeepOnlyLatestState); err != nil { + if err := bc.stateRoot.Init(0); err != nil { return fmt.Errorf("can't init MPT: %w", err) } return bc.storeBlock(genesisBlock, nil) @@ -426,7 +426,7 @@ func (bc *Blockchain) init() error { } bc.blockHeight = bHeight bc.persistedHeight = bHeight - if err = bc.stateRoot.Init(bHeight, bc.config.KeepOnlyLatestState); err != nil { + if err = bc.stateRoot.Init(bHeight); err != nil { return fmt.Errorf("can't init MPT at height %d: %w", bHeight, err) } @@ -599,7 +599,7 @@ func (bc *Blockchain) jumpToStateInternal(p uint32, stage stateJumpStage) error if err = bc.stateRoot.JumpToState(&state.MPTRoot{ Index: p, Root: block.PrevStateRoot, - }, bc.config.KeepOnlyLatestState); err != nil { + }); err != nil { return fmt.Errorf("can't perform MPT jump to height %d: %w", p, err) } diff --git a/pkg/core/mpt/batch_test.go b/pkg/core/mpt/batch_test.go index 3df6f7bc0..8d1923823 100644 --- a/pkg/core/mpt/batch_test.go +++ b/pkg/core/mpt/batch_test.go @@ -57,7 +57,7 @@ func testIncompletePut(t *testing.T, ps pairs, n int, tr1, tr2 *Trie) { t.Run("test restore", func(t *testing.T) { tr2.Flush() - tr3 := NewTrie(NewHashNode(tr2.StateRoot()), false, storage.NewMemCachedStore(tr2.Store)) + tr3 := NewTrie(NewHashNode(tr2.StateRoot()), ModeAll, storage.NewMemCachedStore(tr2.Store)) for _, p := range ps[:n] { val, err := tr3.Get(p[0]) if p[1] == nil { @@ -76,8 +76,8 @@ func testPut(t *testing.T, ps pairs, tr1, tr2 *Trie) { func TestTrie_PutBatchLeaf(t *testing.T) { prepareLeaf := func(t *testing.T) (*Trie, *Trie) { - tr1 := NewTrie(EmptyNode{}, false, newTestStore()) - tr2 := NewTrie(EmptyNode{}, false, newTestStore()) + tr1 := NewTrie(EmptyNode{}, ModeAll, newTestStore()) + tr2 := NewTrie(EmptyNode{}, ModeAll, newTestStore()) require.NoError(t, tr1.Put([]byte{0}, []byte("value"))) require.NoError(t, tr2.Put([]byte{0}, []byte("value"))) return tr1, tr2 @@ -118,8 +118,8 @@ func TestTrie_PutBatchLeaf(t *testing.T) { func TestTrie_PutBatchExtension(t *testing.T) { prepareExtension := func(t *testing.T) (*Trie, *Trie) { - tr1 := NewTrie(EmptyNode{}, false, newTestStore()) - tr2 := NewTrie(EmptyNode{}, false, newTestStore()) + tr1 := NewTrie(EmptyNode{}, ModeAll, newTestStore()) + tr2 := NewTrie(EmptyNode{}, ModeAll, newTestStore()) require.NoError(t, tr1.Put([]byte{1, 2}, []byte("value1"))) require.NoError(t, tr2.Put([]byte{1, 2}, []byte("value1"))) return tr1, tr2 @@ -170,8 +170,8 @@ func TestTrie_PutBatchExtension(t *testing.T) { func TestTrie_PutBatchBranch(t *testing.T) { prepareBranch := func(t *testing.T) (*Trie, *Trie) { - tr1 := NewTrie(EmptyNode{}, false, newTestStore()) - tr2 := NewTrie(EmptyNode{}, false, 
newTestStore()) + tr1 := NewTrie(EmptyNode{}, ModeAll, newTestStore()) + tr2 := NewTrie(EmptyNode{}, ModeAll, newTestStore()) require.NoError(t, tr1.Put([]byte{0x00, 2}, []byte("value1"))) require.NoError(t, tr2.Put([]byte{0x00, 2}, []byte("value1"))) require.NoError(t, tr1.Put([]byte{0x10, 3}, []byte("value2"))) @@ -201,8 +201,8 @@ func TestTrie_PutBatchBranch(t *testing.T) { require.IsType(t, (*ExtensionNode)(nil), tr1.root) }) t.Run("non-empty child is last node", func(t *testing.T) { - tr1 := NewTrie(EmptyNode{}, false, newTestStore()) - tr2 := NewTrie(EmptyNode{}, false, newTestStore()) + tr1 := NewTrie(EmptyNode{}, ModeAll, newTestStore()) + tr2 := NewTrie(EmptyNode{}, ModeAll, newTestStore()) require.NoError(t, tr1.Put([]byte{0x00, 2}, []byte("value1"))) require.NoError(t, tr2.Put([]byte{0x00, 2}, []byte("value1"))) require.NoError(t, tr1.Put([]byte{0x00}, []byte("value2"))) @@ -248,8 +248,8 @@ func TestTrie_PutBatchBranch(t *testing.T) { func TestTrie_PutBatchHash(t *testing.T) { prepareHash := func(t *testing.T) (*Trie, *Trie) { - tr1 := NewTrie(EmptyNode{}, false, newTestStore()) - tr2 := NewTrie(EmptyNode{}, false, newTestStore()) + tr1 := NewTrie(EmptyNode{}, ModeAll, newTestStore()) + tr2 := NewTrie(EmptyNode{}, ModeAll, newTestStore()) require.NoError(t, tr1.Put([]byte{0x10}, []byte("value1"))) require.NoError(t, tr2.Put([]byte{0x10}, []byte("value1"))) require.NoError(t, tr1.Put([]byte{0x20}, []byte("value2"))) @@ -283,8 +283,8 @@ func TestTrie_PutBatchHash(t *testing.T) { func TestTrie_PutBatchEmpty(t *testing.T) { t.Run("good", func(t *testing.T) { - tr1 := NewTrie(EmptyNode{}, false, newTestStore()) - tr2 := NewTrie(EmptyNode{}, false, newTestStore()) + tr1 := NewTrie(EmptyNode{}, ModeAll, newTestStore()) + tr2 := NewTrie(EmptyNode{}, ModeAll, newTestStore()) var ps = pairs{ {[]byte{0}, []byte("value0")}, {[]byte{1}, []byte("value1")}, @@ -299,15 +299,15 @@ func TestTrie_PutBatchEmpty(t *testing.T) { {[]byte{2}, nil}, {[]byte{3}, []byte("replace3")}, } - tr1 := NewTrie(EmptyNode{}, false, newTestStore()) - tr2 := NewTrie(EmptyNode{}, false, newTestStore()) + tr1 := NewTrie(EmptyNode{}, ModeAll, newTestStore()) + tr2 := NewTrie(EmptyNode{}, ModeAll, newTestStore()) testIncompletePut(t, ps, 4, tr1, tr2) }) } // For the sake of coverage. func TestTrie_InvalidNodeType(t *testing.T) { - tr := NewTrie(EmptyNode{}, false, newTestStore()) + tr := NewTrie(EmptyNode{}, ModeAll, newTestStore()) var b Batch b.Add([]byte{1}, []byte("value")) tr.root = Node(nil) @@ -315,8 +315,8 @@ func TestTrie_InvalidNodeType(t *testing.T) { } func TestTrie_PutBatch(t *testing.T) { - tr1 := NewTrie(EmptyNode{}, false, newTestStore()) - tr2 := NewTrie(EmptyNode{}, false, newTestStore()) + tr1 := NewTrie(EmptyNode{}, ModeAll, newTestStore()) + tr2 := NewTrie(EmptyNode{}, ModeAll, newTestStore()) var ps = pairs{ {[]byte{1}, []byte{1}}, {[]byte{2}, []byte{3}}, diff --git a/pkg/core/mpt/billet.go b/pkg/core/mpt/billet.go index 1840e04a5..aa43a4734 100644 --- a/pkg/core/mpt/billet.go +++ b/pkg/core/mpt/billet.go @@ -31,20 +31,20 @@ type Billet struct { TempStoragePrefix storage.KeyPrefix Store *storage.MemCachedStore - root Node - refcountEnabled bool + root Node + mode TrieMode } // NewBillet returns new billet for MPT trie restoring. It accepts a MemCachedStore // to decouple storage errors from logic errors so that all storage errors are // processed during `store.Persist()` at the caller. This also has the benefit, // that every `Put` can be considered an atomic operation. 
-func NewBillet(rootHash util.Uint256, enableRefCount bool, prefix storage.KeyPrefix, store *storage.MemCachedStore) *Billet { +func NewBillet(rootHash util.Uint256, mode TrieMode, prefix storage.KeyPrefix, store *storage.MemCachedStore) *Billet { return &Billet{ TempStoragePrefix: prefix, Store: store, root: NewHashNode(rootHash), - refcountEnabled: enableRefCount, + mode: mode, } } @@ -178,7 +178,7 @@ func (b *Billet) putIntoHash(curr *HashNode, path []byte, val Node) (Node, error func (b *Billet) incrementRefAndStore(h util.Uint256, bs []byte) { key := makeStorageKey(h) - if b.refcountEnabled { + if b.mode.RC() { var ( err error data []byte @@ -337,7 +337,7 @@ func (b *Billet) GetFromStore(h util.Uint256) (Node, error) { return nil, r.Err } - if b.refcountEnabled { + if b.mode.RC() { data = data[:len(data)-4] } n.Node.(flushedNode).setCache(data, h) diff --git a/pkg/core/mpt/billet_test.go b/pkg/core/mpt/billet_test.go index be24f9a83..ff77b6436 100644 --- a/pkg/core/mpt/billet_test.go +++ b/pkg/core/mpt/billet_test.go @@ -32,7 +32,7 @@ func TestBillet_RestoreHashNode(t *testing.T) { b.Children[5] = NewExtensionNode([]byte{0x01}, NewLeafNode([]byte{0xAB, 0xDE})) path := toNibbles([]byte{0xAC}) e := NewExtensionNode(path, NewHashNode(b.Hash())) - tr := NewBillet(e.Hash(), true, storage.STTempStorage, newTestStore()) + tr := NewBillet(e.Hash(), ModeLatest, storage.STTempStorage, newTestStore()) tr.root = e // OK @@ -61,7 +61,7 @@ func TestBillet_RestoreHashNode(t *testing.T) { l := NewLeafNode([]byte{0xAB, 0xCD}) path := toNibbles([]byte{0xAC}) e := NewExtensionNode(path, NewHashNode(l.Hash())) - tr := NewBillet(e.Hash(), true, storage.STTempStorage, newTestStore()) + tr := NewBillet(e.Hash(), ModeLatest, storage.STTempStorage, newTestStore()) tr.root = e // OK @@ -87,7 +87,7 @@ func TestBillet_RestoreHashNode(t *testing.T) { h := NewHashNode(util.Uint256{1, 2, 3}) path := toNibbles([]byte{0xAC}) e := NewExtensionNode(path, h) - tr := NewBillet(e.Hash(), true, storage.STTempStorage, newTestStore()) + tr := NewBillet(e.Hash(), ModeLatest, storage.STTempStorage, newTestStore()) tr.root = e // no-op @@ -99,7 +99,7 @@ func TestBillet_RestoreHashNode(t *testing.T) { t.Run("parent is Leaf", func(t *testing.T) { l := NewLeafNode([]byte{0xAB, 0xCD}) path := []byte{} - tr := NewBillet(l.Hash(), true, storage.STTempStorage, newTestStore()) + tr := NewBillet(l.Hash(), ModeLatest, storage.STTempStorage, newTestStore()) tr.root = l // Already restored => panic expected @@ -121,7 +121,7 @@ func TestBillet_RestoreHashNode(t *testing.T) { b := NewBranchNode() b.Children[5] = NewHashNode(l1.Hash()) b.Children[lastChild] = NewHashNode(l2.Hash()) - tr := NewBillet(b.Hash(), true, storage.STTempStorage, newTestStore()) + tr := NewBillet(b.Hash(), ModeLatest, storage.STTempStorage, newTestStore()) tr.root = b // OK @@ -152,7 +152,7 @@ func TestBillet_RestoreHashNode(t *testing.T) { b := NewBranchNode() b.Children[5] = NewHashNode(l1.Hash()) b.Children[lastChild] = NewHashNode(l2.Hash()) - tr := NewBillet(b.Hash(), true, storage.STTempStorage, newTestStore()) + tr := NewBillet(b.Hash(), ModeLatest, storage.STTempStorage, newTestStore()) tr.root = b // OK @@ -179,7 +179,7 @@ func TestBillet_RestoreHashNode(t *testing.T) { // two same hashnodes => leaf's refcount expected to be 2 in the end. 
b.Children[3] = NewHashNode(l.Hash()) b.Children[4] = NewHashNode(l.Hash()) - tr := NewBillet(b.Hash(), true, storage.STTempStorage, newTestStore()) + tr := NewBillet(b.Hash(), ModeLatest, storage.STTempStorage, newTestStore()) tr.root = b // OK @@ -202,7 +202,7 @@ func TestBillet_RestoreHashNode(t *testing.T) { b := NewBranchNode() b.Children[3] = NewHashNode(l.Hash()) b.Children[4] = NewHashNode(l.Hash()) - tr := NewBillet(b.Hash(), true, storage.STTempStorage, newTestStore()) + tr := NewBillet(b.Hash(), ModeLatest, storage.STTempStorage, newTestStore()) // Should fail, because if it's a hash node with non-empty path, then the node // has already been collapsed. diff --git a/pkg/core/mpt/compat_test.go b/pkg/core/mpt/compat_test.go index 742ad306e..2890b6fbc 100644 --- a/pkg/core/mpt/compat_test.go +++ b/pkg/core/mpt/compat_test.go @@ -23,7 +23,7 @@ func prepareMPTCompat() *Trie { b.Children[16] = v2 b.Children[15] = NewHashNode(e4.Hash()) - tr := NewTrie(r, true, newTestStore()) + tr := NewTrie(r, ModeLatest, newTestStore()) tr.putToStore(r) tr.putToStore(b) tr.putToStore(e1) @@ -132,7 +132,7 @@ func TestCompatibility(t *testing.T) { b.Children[0] = e1 b.Children[15] = NewHashNode(e4.Hash()) - tr := NewTrie(NewHashNode(r.Hash()), false, newTestStore()) + tr := NewTrie(NewHashNode(r.Hash()), ModeAll, newTestStore()) tr.putToStore(r) tr.putToStore(b) tr.putToStore(e1) @@ -152,7 +152,7 @@ func TestCompatibility(t *testing.T) { tr.testHas(t, []byte{0xac, 0x02}, []byte{0xab, 0xcd}) tr.Flush() - tr2 := NewTrie(NewHashNode(tr.root.Hash()), false, tr.Store) + tr2 := NewTrie(NewHashNode(tr.root.Hash()), ModeAll, tr.Store) tr2.testHas(t, []byte{0xac, 0x02}, []byte{0xab, 0xcd}) }) @@ -189,7 +189,7 @@ func TestCompatibility(t *testing.T) { b.Children[16] = v2 b.Children[15] = NewHashNode(e4.Hash()) - tr := NewTrie(NewHashNode(r.Hash()), true, mainTrie.Store) + tr := NewTrie(NewHashNode(r.Hash()), ModeLatest, mainTrie.Store) require.Equal(t, r.Hash(), tr.root.Hash()) // Tail bytes contain reference counter thus check for prefix. 
@@ -352,7 +352,7 @@ func TestCompatibility(t *testing.T) { } func copyTrie(t *Trie) *Trie { - return NewTrie(NewHashNode(t.root.Hash()), t.refcountEnabled, t.Store) + return NewTrie(NewHashNode(t.root.Hash()), t.mode, t.Store) } func checkBatchSize(t *testing.T, tr *Trie, n int) { @@ -372,7 +372,7 @@ func testGetProof(t *testing.T, tr *Trie, key []byte, size int) [][]byte { } func newFilledTrie(t *testing.T, args ...[]byte) *Trie { - tr := NewTrie(nil, true, newTestStore()) + tr := NewTrie(nil, ModeLatest, newTestStore()) for i := 0; i < len(args); i += 2 { require.NoError(t, tr.Put(args[i], args[i+1])) } @@ -381,7 +381,7 @@ func newFilledTrie(t *testing.T, args ...[]byte) *Trie { func TestCompatibility_Find(t *testing.T) { check := func(t *testing.T, from []byte, expectedResLen int) { - tr := NewTrie(nil, false, newTestStore()) + tr := NewTrie(nil, ModeAll, newTestStore()) require.NoError(t, tr.Put([]byte("aa"), []byte("02"))) require.NoError(t, tr.Put([]byte("aa10"), []byte("03"))) require.NoError(t, tr.Put([]byte("aa50"), []byte("04"))) @@ -407,7 +407,7 @@ func TestCompatibility_Find(t *testing.T) { check(t, []byte{}, 2) // without `from` key }) t.Run("TestFindStatesIssue652", func(t *testing.T) { - tr := NewTrie(nil, false, newTestStore()) + tr := NewTrie(nil, ModeAll, newTestStore()) // root is an extension node with key=abc; next=branch require.NoError(t, tr.Put([]byte("abc1"), []byte("01"))) require.NoError(t, tr.Put([]byte("abc3"), []byte("02"))) diff --git a/pkg/core/mpt/node_test.go b/pkg/core/mpt/node_test.go index a4425d71c..4a7143e40 100644 --- a/pkg/core/mpt/node_test.go +++ b/pkg/core/mpt/node_test.go @@ -93,7 +93,7 @@ func TestNode_Serializable(t *testing.T) { // https://github.com/neo-project/neo/blob/neox-2.x/neo.UnitTests/UT_MPTTrie.cs#L198 func TestJSONSharp(t *testing.T) { - tr := NewTrie(nil, false, newTestStore()) + tr := NewTrie(nil, ModeAll, newTestStore()) require.NoError(t, tr.Put([]byte{0xac, 0x11}, []byte{0xac, 0x11})) require.NoError(t, tr.Put([]byte{0xac, 0x22}, []byte{0xac, 0x22})) require.NoError(t, tr.Put([]byte{0xac}, []byte{0xac})) diff --git a/pkg/core/mpt/proof.go b/pkg/core/mpt/proof.go index a769550ca..c3ed7d75c 100644 --- a/pkg/core/mpt/proof.go +++ b/pkg/core/mpt/proof.go @@ -66,7 +66,7 @@ func (t *Trie) getProof(curr Node, path []byte, proofs *[][]byte) (Node, error) // It also returns value for the key. 
func VerifyProof(rh util.Uint256, key []byte, proofs [][]byte) ([]byte, bool) { path := toNibbles(key) - tr := NewTrie(NewHashNode(rh), false, storage.NewMemCachedStore(storage.NewMemoryStore())) + tr := NewTrie(NewHashNode(rh), ModeAll, storage.NewMemCachedStore(storage.NewMemoryStore())) for i := range proofs { h := hash.DoubleSha256(proofs[i]) // no errors in Put to memory store diff --git a/pkg/core/mpt/proof_test.go b/pkg/core/mpt/proof_test.go index d733fb6d0..3b8fd7d15 100644 --- a/pkg/core/mpt/proof_test.go +++ b/pkg/core/mpt/proof_test.go @@ -15,7 +15,7 @@ func newProofTrie(t *testing.T, missingHashNode bool) *Trie { b.Children[4] = NewHashNode(e.Hash()) b.Children[5] = e2 - tr := NewTrie(b, false, newTestStore()) + tr := NewTrie(b, ModeAll, newTestStore()) require.NoError(t, tr.Put([]byte{0x12, 0x31}, []byte("value1"))) require.NoError(t, tr.Put([]byte{0x12, 0x32}, []byte("value2"))) tr.putToStore(l) diff --git a/pkg/core/mpt/trie.go b/pkg/core/mpt/trie.go index 7bc1c603e..fc440d1a7 100644 --- a/pkg/core/mpt/trie.go +++ b/pkg/core/mpt/trie.go @@ -12,13 +12,29 @@ import ( "github.com/nspcc-dev/neo-go/pkg/util/slice" ) +// TrieMode is the storage mode of trie, it affects the DB scheme. +type TrieMode byte + +// TrieMode is the storage mode of trie. +const ( + // ModeAll is used to store everything. + ModeAll TrieMode = 0 + // ModeLatest is used to only store the latest root. + ModeLatest TrieMode = 0x01 + // ModeGCFlag is a flag for GC. + ModeGCFlag TrieMode = 0x02 + // ModeGC is used to store a set of roots with GC possible, it combines + // GCFlag and Latest (because it needs RC, but it has GC enabled). + ModeGC TrieMode = 0x03 +) + // Trie is an MPT trie storing all key-value pairs. type Trie struct { Store *storage.MemCachedStore - root Node - refcountEnabled bool - refcount map[util.Uint256]*cachedNode + root Node + mode TrieMode + refcount map[util.Uint256]*cachedNode } type cachedNode struct { @@ -30,10 +46,20 @@ type cachedNode struct { // ErrNotFound is returned when requested trie item is missing. var ErrNotFound = errors.New("item not found") +// RC returns true when reference counting is enabled. +func (m TrieMode) RC() bool { + return m&ModeLatest != 0 +} + +// GC returns true when garbage collection is enabled. +func (m TrieMode) GC() bool { + return m&ModeGCFlag != 0 +} + // NewTrie returns new MPT trie. It accepts a MemCachedStore to decouple storage errors from logic errors // so that all storage errors are processed during `store.Persist()` at the caller. // This also has the benefit, that every `Put` can be considered an atomic operation. -func NewTrie(root Node, enableRefCount bool, store *storage.MemCachedStore) *Trie { +func NewTrie(root Node, mode TrieMode, store *storage.MemCachedStore) *Trie { if root == nil { root = EmptyNode{} } @@ -42,8 +68,8 @@ func NewTrie(root Node, enableRefCount bool, store *storage.MemCachedStore) *Tri Store: store, root: root, - refcountEnabled: enableRefCount, - refcount: make(map[util.Uint256]*cachedNode), + mode: mode, + refcount: make(map[util.Uint256]*cachedNode), } } @@ -386,7 +412,7 @@ func (t *Trie) Flush() { if node.bytes == nil { panic("item not in trie") } - if t.refcountEnabled { + if t.mode.RC() { node.initial = t.updateRefCount(h) if node.initial == 0 { delete(t.refcount, h) @@ -403,7 +429,7 @@ func (t *Trie) Flush() { // updateRefCount should be called only when refcounting is enabled. 
func (t *Trie) updateRefCount(h util.Uint256) int32 { - if !t.refcountEnabled { + if !t.mode.RC() { panic("`updateRefCount` is called, but GC is disabled") } var data []byte @@ -478,7 +504,7 @@ func (t *Trie) getFromStore(h util.Uint256) (Node, error) { return nil, r.Err } - if t.refcountEnabled { + if t.mode.RC() { data = data[:len(data)-4] node := t.refcount[h] if node != nil { @@ -566,7 +592,7 @@ func (t *Trie) Find(prefix, from []byte, max int) ([]storage.KeyValue, error) { res []storage.KeyValue count int ) - b := NewBillet(t.root.Hash(), false, 0, t.Store) + b := NewBillet(t.root.Hash(), t.mode, 0, t.Store) process := func(pathToNode []byte, node Node, _ []byte) bool { if leaf, ok := node.(*LeafNode); ok { if from == nil || !bytes.Equal(pathToNode, from) { // (*Billet).traverse includes `from` path into result if so. Need to filter out manually. diff --git a/pkg/core/mpt/trie_test.go b/pkg/core/mpt/trie_test.go index 465d7964b..2e2f7adcf 100644 --- a/pkg/core/mpt/trie_test.go +++ b/pkg/core/mpt/trie_test.go @@ -29,7 +29,7 @@ func newTestTrie(t *testing.T) *Trie { b.Children[10] = NewExtensionNode([]byte{0x0e}, h) e := NewExtensionNode(toNibbles([]byte{0xAC}), b) - tr := NewTrie(e, false, newTestStore()) + tr := NewTrie(e, ModeAll, newTestStore()) tr.putToStore(e) tr.putToStore(b) @@ -46,7 +46,7 @@ func newTestTrie(t *testing.T) *Trie { } func testTrieRefcount(t *testing.T, key1, key2 []byte) { - tr := NewTrie(nil, true, storage.NewMemCachedStore(storage.NewMemoryStore())) + tr := NewTrie(nil, ModeLatest, storage.NewMemCachedStore(storage.NewMemoryStore())) require.NoError(t, tr.Put(key1, []byte{1})) tr.Flush() require.NoError(t, tr.Put(key2, []byte{1})) @@ -89,7 +89,7 @@ func TestTrie_PutIntoBranchNode(t *testing.T) { l := NewLeafNode([]byte{0x8}) b.Children[0x7] = NewHashNode(l.Hash()) b.Children[0x8] = NewHashNode(random.Uint256()) - tr := NewTrie(b, false, newTestStore()) + tr := NewTrie(b, ModeAll, newTestStore()) // empty hash node child require.NoError(t, tr.Put([]byte{0x66}, value)) @@ -119,7 +119,7 @@ func TestTrie_PutIntoExtensionNode(t *testing.T) { l := NewLeafNode([]byte{0x11}) key := []byte{0x12} e := NewExtensionNode(toNibbles(key), NewHashNode(l.Hash())) - tr := NewTrie(e, false, newTestStore()) + tr := NewTrie(e, ModeAll, newTestStore()) // missing hash require.Error(t, tr.Put(key, value)) @@ -145,7 +145,7 @@ func TestTrie_PutIntoHashNode(t *testing.T) { e := NewExtensionNode([]byte{0x02}, l) b.Children[1] = NewHashNode(e.Hash()) b.Children[9] = NewHashNode(random.Uint256()) - tr := NewTrie(b, false, newTestStore()) + tr := NewTrie(b, ModeAll, newTestStore()) tr.putToStore(e) @@ -174,7 +174,7 @@ func TestTrie_PutIntoHashNode(t *testing.T) { func TestTrie_Put(t *testing.T) { trExp := newTestTrie(t) - trAct := NewTrie(nil, false, newTestStore()) + trAct := NewTrie(nil, ModeAll, newTestStore()) require.NoError(t, trAct.Put([]byte{0xAC, 0x01}, []byte{0xAB, 0xCD})) require.NoError(t, trAct.Put([]byte{0xAC, 0x13}, []byte{})) require.NoError(t, trAct.Put([]byte{0xAC, 0x99}, []byte{0x22, 0x22})) @@ -186,7 +186,7 @@ func TestTrie_Put(t *testing.T) { } func TestTrie_PutInvalid(t *testing.T) { - tr := NewTrie(nil, false, newTestStore()) + tr := NewTrie(nil, ModeAll, newTestStore()) key, value := []byte("key"), []byte("value") // empty key @@ -204,7 +204,7 @@ func TestTrie_PutInvalid(t *testing.T) { } func TestTrie_BigPut(t *testing.T) { - tr := NewTrie(nil, false, newTestStore()) + tr := NewTrie(nil, ModeAll, newTestStore()) items := []struct{ k, v string }{ {"item with long 
key", "value1"}, {"item with matching prefix", "value2"}, @@ -244,7 +244,7 @@ func (tr *Trie) putToStore(n Node) { if n.Type() == HashT { panic("can't put hash node in trie") } - if tr.refcountEnabled { + if tr.mode.RC() { tr.refcount[n.Hash()] = &cachedNode{ bytes: n.Bytes(), refcount: 1, @@ -298,7 +298,7 @@ func TestTrie_Get(t *testing.T) { }) t.Run("UnfoldRoot", func(t *testing.T) { tr := newTestTrie(t) - single := NewTrie(NewHashNode(tr.root.Hash()), false, tr.Store) + single := NewTrie(NewHashNode(tr.root.Hash()), ModeAll, tr.Store) single.testHas(t, []byte{0xAC}, nil) single.testHas(t, []byte{0xAC, 0x01}, []byte{0xAB, 0xCD}) single.testHas(t, []byte{0xAC, 0x99}, []byte{0x22, 0x22}) @@ -313,13 +313,13 @@ func TestTrie_Flush(t *testing.T) { "key2": []byte("value2"), } - tr := NewTrie(nil, false, newTestStore()) + tr := NewTrie(nil, ModeAll, newTestStore()) for k, v := range pairs { require.NoError(t, tr.Put([]byte(k), v)) } tr.Flush() - tr = NewTrie(NewHashNode(tr.StateRoot()), false, tr.Store) + tr = NewTrie(NewHashNode(tr.StateRoot()), ModeAll, tr.Store) for k, v := range pairs { actual, err := tr.Get([]byte(k)) require.NoError(t, err) @@ -337,10 +337,14 @@ func TestTrie_Delete(t *testing.T) { } func testTrieDelete(t *testing.T, enableGC bool) { + var mode TrieMode + if enableGC { + mode = ModeLatest + } t.Run("Hash", func(t *testing.T) { t.Run("FromStore", func(t *testing.T) { l := NewLeafNode([]byte{0x12}) - tr := NewTrie(NewHashNode(l.Hash()), enableGC, newTestStore()) + tr := NewTrie(NewHashNode(l.Hash()), mode, newTestStore()) t.Run("NotInStore", func(t *testing.T) { require.Error(t, tr.Delete([]byte{})) }) @@ -352,7 +356,7 @@ func testTrieDelete(t *testing.T, enableGC bool) { }) t.Run("Empty", func(t *testing.T) { - tr := NewTrie(nil, enableGC, newTestStore()) + tr := NewTrie(nil, mode, newTestStore()) require.NoError(t, tr.Delete([]byte{})) }) }) @@ -360,7 +364,7 @@ func testTrieDelete(t *testing.T, enableGC bool) { t.Run("Leaf", func(t *testing.T) { check := func(t *testing.T, value []byte) { l := NewLeafNode(value) - tr := NewTrie(l, enableGC, newTestStore()) + tr := NewTrie(l, mode, newTestStore()) t.Run("NonExistentKey", func(t *testing.T) { require.NoError(t, tr.Delete([]byte{0x12})) tr.testHas(t, []byte{}, value) @@ -381,7 +385,7 @@ func testTrieDelete(t *testing.T, enableGC bool) { check := func(t *testing.T, value []byte) { l := NewLeafNode(value) e := NewExtensionNode([]byte{0x0A, 0x0B}, l) - tr := NewTrie(e, enableGC, newTestStore()) + tr := NewTrie(e, mode, newTestStore()) t.Run("NonExistentKey", func(t *testing.T) { require.NoError(t, tr.Delete([]byte{})) @@ -405,7 +409,7 @@ func testTrieDelete(t *testing.T, enableGC bool) { b.Children[0] = NewExtensionNode([]byte{0x01}, NewLeafNode(value)) b.Children[6] = NewExtensionNode([]byte{0x07}, NewLeafNode([]byte{0x56, 0x78})) e := NewExtensionNode([]byte{0x01, 0x02}, b) - tr := NewTrie(e, enableGC, newTestStore()) + tr := NewTrie(e, mode, newTestStore()) h := e.Hash() require.NoError(t, tr.Delete([]byte{0x12, 0x01})) @@ -432,7 +436,7 @@ func testTrieDelete(t *testing.T, enableGC bool) { b.Children[lastChild] = NewLeafNode([]byte{0x12}) b.Children[0] = NewExtensionNode([]byte{0x01}, NewLeafNode([]byte{0x34})) b.Children[1] = NewExtensionNode([]byte{0x06}, NewLeafNode(value)) - tr := NewTrie(b, enableGC, newTestStore()) + tr := NewTrie(b, mode, newTestStore()) require.NoError(t, tr.Delete([]byte{0x16})) tr.testHas(t, []byte{}, []byte{0x12}) tr.testHas(t, []byte{0x01}, []byte{0x34}) @@ -454,7 +458,7 @@ func testTrieDelete(t 
*testing.T, enableGC bool) { l := NewLeafNode([]byte{0x34}) e := NewExtensionNode([]byte{0x06}, l) b.Children[5] = NewHashNode(e.Hash()) - tr := NewTrie(b, enableGC, newTestStore()) + tr := NewTrie(b, mode, newTestStore()) tr.putToStore(l) tr.putToStore(e) require.NoError(t, tr.Delete([]byte{})) @@ -478,7 +482,7 @@ func testTrieDelete(t *testing.T, enableGC bool) { b.Children[3] = NewExtensionNode([]byte{4}, NewLeafNode(value)) b.Children[lastChild] = NewHashNode(h) - tr := NewTrie(NewExtensionNode([]byte{1, 2}, b), enableGC, newTestStore()) + tr := NewTrie(NewExtensionNode([]byte{1, 2}, b), mode, newTestStore()) tr.putToStore(ch) require.NoError(t, tr.Delete([]byte{0x12, 0x34})) @@ -505,7 +509,7 @@ func testTrieDelete(t *testing.T, enableGC bool) { b := NewBranchNode() b.Children[lastChild] = NewLeafNode(value) b.Children[5] = c - tr := NewTrie(b, enableGC, newTestStore()) + tr := NewTrie(b, mode, newTestStore()) require.NoError(t, tr.Delete([]byte{})) tr.testHas(t, []byte{}, nil) @@ -529,7 +533,7 @@ func testTrieDelete(t *testing.T, enableGC bool) { l := NewLeafNode(value) e := NewExtensionNode([]byte{0x06}, l) b.Children[5] = NewHashNode(e.Hash()) - tr := NewTrie(b, enableGC, newTestStore()) + tr := NewTrie(b, mode, newTestStore()) tr.putToStore(l) tr.putToStore(e) require.NoError(t, tr.Delete([]byte{0x56})) @@ -579,7 +583,7 @@ func TestTrie_Collapse(t *testing.T) { b.Children[0] = e hb := b.Hash() - tr := NewTrie(b, false, newTestStore()) + tr := NewTrie(b, ModeAll, newTestStore()) tr.Collapse(1) newb, ok := tr.root.(*BranchNode) @@ -593,7 +597,7 @@ func TestTrie_Collapse(t *testing.T) { hl := l.Hash() e := NewExtensionNode([]byte{0x01}, l) h := e.Hash() - tr := NewTrie(e, false, newTestStore()) + tr := NewTrie(e, ModeAll, newTestStore()) tr.Collapse(1) newe, ok := tr.root.(*ExtensionNode) @@ -604,19 +608,19 @@ func TestTrie_Collapse(t *testing.T) { }) t.Run("Leaf", func(t *testing.T) { l := NewLeafNode([]byte("value")) - tr := NewTrie(l, false, newTestStore()) + tr := NewTrie(l, ModeAll, newTestStore()) tr.Collapse(10) require.Equal(t, NewLeafNode([]byte("value")), tr.root) }) t.Run("Empty Leaf", func(t *testing.T) { l := NewLeafNode([]byte{}) - tr := NewTrie(l, false, newTestStore()) + tr := NewTrie(l, ModeAll, newTestStore()) tr.Collapse(10) require.Equal(t, NewLeafNode([]byte{}), tr.root) }) t.Run("Hash", func(t *testing.T) { t.Run("EmptyNode", func(t *testing.T) { - tr := NewTrie(EmptyNode{}, false, newTestStore()) + tr := NewTrie(EmptyNode{}, ModeAll, newTestStore()) require.NotPanics(t, func() { tr.Collapse(1) }) _, ok := tr.root.(EmptyNode) require.True(t, ok) @@ -624,7 +628,7 @@ func TestTrie_Collapse(t *testing.T) { h := random.Uint256() hn := NewHashNode(h) - tr := NewTrie(hn, false, newTestStore()) + tr := NewTrie(hn, ModeAll, newTestStore()) tr.Collapse(10) newRoot, ok := tr.root.(*HashNode) diff --git a/pkg/core/stateroot/module.go b/pkg/core/stateroot/module.go index 2af4665fb..53cdda5b0 100644 --- a/pkg/core/stateroot/module.go +++ b/pkg/core/stateroot/module.go @@ -28,6 +28,7 @@ type ( Store *storage.MemCachedStore network netmode.Magic srInHead bool + mode mpt.TrieMode mpt *mpt.Trie verifier VerifierFunc log *zap.Logger @@ -52,9 +53,14 @@ type ( // NewModule returns new instance of stateroot module. 
func NewModule(cfg config.ProtocolConfiguration, verif VerifierFunc, log *zap.Logger, s *storage.MemCachedStore) *Module { + var mode mpt.TrieMode + if cfg.KeepOnlyLatestState { + mode |= mpt.ModeLatest + } return &Module{ network: cfg.Magic, srInHead: cfg.StateRootInHeader, + mode: mode, verifier: verif, log: log, Store: s, @@ -63,7 +69,7 @@ func NewModule(cfg config.ProtocolConfiguration, verif VerifierFunc, log *zap.Lo // GetState returns value at the specified key fom the MPT with the specified root. func (s *Module) GetState(root util.Uint256, key []byte) ([]byte, error) { - tr := mpt.NewTrie(mpt.NewHashNode(root), false, storage.NewMemCachedStore(s.Store)) + tr := mpt.NewTrie(mpt.NewHashNode(root), s.mode, storage.NewMemCachedStore(s.Store)) return tr.Get(key) } @@ -73,13 +79,13 @@ func (s *Module) GetState(root util.Uint256, key []byte) ([]byte, error) { // item with key equals to prefix is included into result; if empty `start` specified, // then item with key equals to prefix is not included into result. func (s *Module) FindStates(root util.Uint256, prefix, start []byte, max int) ([]storage.KeyValue, error) { - tr := mpt.NewTrie(mpt.NewHashNode(root), false, storage.NewMemCachedStore(s.Store)) + tr := mpt.NewTrie(mpt.NewHashNode(root), s.mode, storage.NewMemCachedStore(s.Store)) return tr.Find(prefix, start, max) } // GetStateProof returns proof of having key in the MPT with the specified root. func (s *Module) GetStateProof(root util.Uint256, key []byte) ([][]byte, error) { - tr := mpt.NewTrie(mpt.NewHashNode(root), false, storage.NewMemCachedStore(s.Store)) + tr := mpt.NewTrie(mpt.NewHashNode(root), s.mode, storage.NewMemCachedStore(s.Store)) return tr.GetProof(key) } @@ -104,14 +110,14 @@ func (s *Module) CurrentValidatedHeight() uint32 { } // Init initializes state root module at the given height. -func (s *Module) Init(height uint32, enableRefCount bool) error { +func (s *Module) Init(height uint32) error { data, err := s.Store.Get([]byte{byte(storage.DataMPT), prefixValidated}) if err == nil { s.validatedHeight.Store(binary.LittleEndian.Uint32(data)) } if height == 0 { - s.mpt = mpt.NewTrie(nil, enableRefCount, s.Store) + s.mpt = mpt.NewTrie(nil, s.mode, s.Store) s.currentLocal.Store(util.Uint256{}) return nil } @@ -121,7 +127,7 @@ func (s *Module) Init(height uint32, enableRefCount bool) error { } s.currentLocal.Store(r.Root) s.localHeight.Store(r.Index) - s.mpt = mpt.NewTrie(mpt.NewHashNode(r.Root), enableRefCount, s.Store) + s.mpt = mpt.NewTrie(mpt.NewHashNode(r.Root), s.mode, s.Store) return nil } @@ -157,7 +163,7 @@ func (s *Module) CleanStorage() error { } // JumpToState performs jump to the state specified by given stateroot index. 
-func (s *Module) JumpToState(sr *state.MPTRoot, enableRefCount bool) error { +func (s *Module) JumpToState(sr *state.MPTRoot) error { if err := s.addLocalStateRoot(s.Store, sr); err != nil { return fmt.Errorf("failed to store local state root: %w", err) } @@ -171,7 +177,7 @@ func (s *Module) JumpToState(sr *state.MPTRoot, enableRefCount bool) error { s.currentLocal.Store(sr.Root) s.localHeight.Store(sr.Index) - s.mpt = mpt.NewTrie(mpt.NewHashNode(sr.Root), enableRefCount, s.Store) + s.mpt = mpt.NewTrie(mpt.NewHashNode(sr.Root), s.mode, s.Store) return nil } diff --git a/pkg/core/statesync/module.go b/pkg/core/statesync/module.go index e664fe9e2..76ecc56e1 100644 --- a/pkg/core/statesync/module.go +++ b/pkg/core/statesync/module.go @@ -221,7 +221,11 @@ func (s *Module) defineSyncStage() error { if err != nil { return fmt.Errorf("failed to get header to initialize MPT billet: %w", err) } - s.billet = mpt.NewBillet(header.PrevStateRoot, s.bc.GetConfig().KeepOnlyLatestState, + var mode mpt.TrieMode + if s.bc.GetConfig().KeepOnlyLatestState { + mode |= mpt.ModeLatest + } + s.billet = mpt.NewBillet(header.PrevStateRoot, mode, TemporaryPrefix(s.dao.Version.StoragePrefix), s.dao.Store) s.log.Info("MPT billet initialized", zap.Uint32("height", s.syncPoint), @@ -494,7 +498,11 @@ func (s *Module) Traverse(root util.Uint256, process func(node mpt.Node, nodeByt s.lock.RLock() defer s.lock.RUnlock() - b := mpt.NewBillet(root, s.bc.GetConfig().KeepOnlyLatestState, 0, storage.NewMemCachedStore(s.dao.Store)) + var mode mpt.TrieMode + if s.bc.GetConfig().KeepOnlyLatestState { + mode |= mpt.ModeLatest + } + b := mpt.NewBillet(root, mode, 0, storage.NewMemCachedStore(s.dao.Store)) return b.Traverse(func(pathToNode []byte, node mpt.Node, nodeBytes []byte) bool { return process(node, nodeBytes) }, false) diff --git a/pkg/core/statesync/module_test.go b/pkg/core/statesync/module_test.go index a4a6a253a..69fc6614c 100644 --- a/pkg/core/statesync/module_test.go +++ b/pkg/core/statesync/module_test.go @@ -15,7 +15,7 @@ import ( func TestModule_PR2019_discussion_r689629704(t *testing.T) { expectedStorage := storage.NewMemCachedStore(storage.NewMemoryStore()) - tr := mpt.NewTrie(nil, true, expectedStorage) + tr := mpt.NewTrie(nil, mpt.ModeLatest, expectedStorage) require.NoError(t, tr.Put([]byte{0x03}, []byte("leaf1"))) require.NoError(t, tr.Put([]byte{0x01, 0xab, 0x02}, []byte("leaf2"))) require.NoError(t, tr.Put([]byte{0x01, 0xab, 0x04}, []byte("leaf3"))) @@ -57,7 +57,7 @@ func TestModule_PR2019_discussion_r689629704(t *testing.T) { dao: dao.NewSimple(actualStorage, true, false), mptpool: NewPool(), } - stateSync.billet = mpt.NewBillet(sr, true, + stateSync.billet = mpt.NewBillet(sr, mpt.ModeLatest, TemporaryPrefix(stateSync.dao.Version.StoragePrefix), actualStorage) stateSync.mptpool.Add(sr, []byte{}) From c4ee310e85536aef0f099b1cede7c610e5a2b4e9 Mon Sep 17 00:00:00 2001 From: Roman Khimov Date: Fri, 28 Jan 2022 15:05:13 +0300 Subject: [PATCH 5/8] mpt: modify refcounted storage scheme to make GC possible Add "active" flag into the node data and make the remainder modal, for active nodes it's a reference counter, for inactive ones the deactivation height is stored. Technically, refcounted chains storing just one trie don't need a flag, but it's a bit simpler this way. 
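With this change the stored value for a refcounted node becomes: the serialized node bytes, a one-byte "active" flag, and a 4-byte little-endian field that holds the reference counter while the flag is set and the deactivation height once the node goes out of use. A minimal sketch of that layout (only isActive corresponds to a helper from the patch, IsActiveValue; encodeNode and deactivate are illustrative of the scheme described above, not code from the diff):

package main

import (
	"encoding/binary"
	"fmt"
)

// encodeNode appends the modal suffix from this patch: a one-byte "active"
// flag followed by a little-endian uint32 holding the reference counter.
func encodeNode(nodeBytes []byte, refcount uint32) []byte {
	data := append(append([]byte{}, nodeBytes...), 1, 0, 0, 0, 0)
	binary.LittleEndian.PutUint32(data[len(data)-4:], refcount)
	return data
}

// isActive mirrors IsActiveValue from pkg/core/mpt/trie.go: the flag byte
// sits five bytes from the end of the stored value.
func isActive(v []byte) bool {
	return len(v) > 4 && v[len(v)-5] == 1
}

// deactivate clears the flag and reuses the trailing uint32 to remember the
// height at which the node was removed, which is what makes GC possible.
func deactivate(v []byte, height uint32) {
	v[len(v)-5] = 0
	binary.LittleEndian.PutUint32(v[len(v)-4:], height)
}

func main() {
	node := encodeNode([]byte{0xde, 0xad, 0xbe, 0xef}, 3)
	fmt.Println(isActive(node), binary.LittleEndian.Uint32(node[len(node)-4:])) // true 3
	deactivate(node, 12345)
	fmt.Println(isActive(node), binary.LittleEndian.Uint32(node[len(node)-4:])) // false 12345
}

The same layout is visible in the billet.go hunk below, where new nodes are stored via append(bs, 1, 0, 0, 0, 0) and trimmed with data[:len(data)-5] on load.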
--- pkg/core/blockchain.go | 2 +- pkg/core/mpt/batch_test.go | 14 ++++---- pkg/core/mpt/billet.go | 4 +-- pkg/core/mpt/compat_test.go | 58 +++++++++++++++---------------- pkg/core/mpt/trie.go | 35 ++++++++++++++----- pkg/core/mpt/trie_test.go | 14 ++++---- pkg/core/stateroot/module.go | 11 +++--- pkg/core/statesync/module.go | 6 ++-- pkg/core/statesync/module_test.go | 2 +- 9 files changed, 84 insertions(+), 62 deletions(-) diff --git a/pkg/core/blockchain.go b/pkg/core/blockchain.go index 616bf4fdf..4e10d42d2 100644 --- a/pkg/core/blockchain.go +++ b/pkg/core/blockchain.go @@ -44,7 +44,7 @@ import ( // Tuning parameters. const ( headerBatchCount = 2000 - version = "0.2.1" + version = "0.2.2" defaultInitialGAS = 52000000_00000000 defaultMemPoolSize = 50000 diff --git a/pkg/core/mpt/batch_test.go b/pkg/core/mpt/batch_test.go index 8d1923823..ced30c061 100644 --- a/pkg/core/mpt/batch_test.go +++ b/pkg/core/mpt/batch_test.go @@ -56,7 +56,7 @@ func testIncompletePut(t *testing.T, ps pairs, n int, tr1, tr2 *Trie) { require.Equal(t, tr1.StateRoot(), tr2.StateRoot()) t.Run("test restore", func(t *testing.T) { - tr2.Flush() + tr2.Flush(0) tr3 := NewTrie(NewHashNode(tr2.StateRoot()), ModeAll, storage.NewMemCachedStore(tr2.Store)) for _, p := range ps[:n] { val, err := tr3.Get(p[0]) @@ -191,9 +191,9 @@ func TestTrie_PutBatchBranch(t *testing.T) { t.Run("non-empty child is hash node", func(t *testing.T) { tr1, tr2 := prepareBranch(t) - tr1.Flush() + tr1.Flush(0) tr1.Collapse(1) - tr2.Flush() + tr2.Flush(0) tr2.Collapse(1) var ps = pairs{{[]byte{0x00, 2}, nil}} @@ -208,9 +208,9 @@ func TestTrie_PutBatchBranch(t *testing.T) { require.NoError(t, tr1.Put([]byte{0x00}, []byte("value2"))) require.NoError(t, tr2.Put([]byte{0x00}, []byte("value2"))) - tr1.Flush() + tr1.Flush(0) tr1.Collapse(1) - tr2.Flush() + tr2.Flush(0) tr2.Collapse(1) var ps = pairs{{[]byte{0x00, 2}, nil}} @@ -254,8 +254,8 @@ func TestTrie_PutBatchHash(t *testing.T) { require.NoError(t, tr2.Put([]byte{0x10}, []byte("value1"))) require.NoError(t, tr1.Put([]byte{0x20}, []byte("value2"))) require.NoError(t, tr2.Put([]byte{0x20}, []byte("value2"))) - tr1.Flush() - tr2.Flush() + tr1.Flush(0) + tr2.Flush(0) return tr1, tr2 } diff --git a/pkg/core/mpt/billet.go b/pkg/core/mpt/billet.go index aa43a4734..d50645fa1 100644 --- a/pkg/core/mpt/billet.go +++ b/pkg/core/mpt/billet.go @@ -191,7 +191,7 @@ func (b *Billet) incrementRefAndStore(h util.Uint256, bs []byte) { } cnt++ if len(data) == 0 { - data = append(bs, 0, 0, 0, 0) + data = append(bs, 1, 0, 0, 0, 0) } binary.LittleEndian.PutUint32(data[len(data)-4:], uint32(cnt)) _ = b.Store.Put(key, data) @@ -338,7 +338,7 @@ func (b *Billet) GetFromStore(h util.Uint256) (Node, error) { } if b.mode.RC() { - data = data[:len(data)-4] + data = data[:len(data)-5] } n.Node.(flushedNode).setCache(data, h) return n.Node, nil diff --git a/pkg/core/mpt/compat_test.go b/pkg/core/mpt/compat_test.go index 2890b6fbc..7ac16c83a 100644 --- a/pkg/core/mpt/compat_test.go +++ b/pkg/core/mpt/compat_test.go @@ -1,7 +1,6 @@ package mpt import ( - "bytes" "testing" "github.com/stretchr/testify/require" @@ -113,12 +112,12 @@ func TestCompatibility(t *testing.T) { tr := newFilledTrie(t, []byte{0xac, 0x00}, []byte{0xab, 0xcd}, []byte{0xac, 0x10}, []byte{0xab, 0xcd}) - tr.Flush() + tr.Flush(0) tr2 := copyTrie(tr) require.NoError(t, tr2.Delete([]byte{0xac, 0x00})) - tr2.Flush() + tr2.Flush(0) require.NoError(t, tr2.Delete([]byte{0xac, 0x10})) }) @@ -150,7 +149,7 @@ func TestCompatibility(t *testing.T) { require.NoError(t, 
tr.Delete([]byte{0xac, 0x01})) tr.testHas(t, []byte{0xac, 0x02}, []byte{0xab, 0xcd}) - tr.Flush() + tr.Flush(0) tr2 := NewTrie(NewHashNode(tr.root.Hash()), ModeAll, tr.Store) tr2.testHas(t, []byte{0xac, 0x02}, []byte{0xab, 0xcd}) @@ -161,15 +160,15 @@ func TestCompatibility(t *testing.T) { []byte{0xac, 0x11}, []byte{0xac, 0x11}, []byte{0xac, 0x22}, []byte{0xac, 0x22}, []byte{0xac}, []byte{0xac}) - tr.Flush() + tr.Flush(0) checkBatchSize(t, tr, 7) require.NoError(t, tr.Delete([]byte{0xac, 0x11})) - tr.Flush() + tr.Flush(0) checkBatchSize(t, tr, 5) require.NoError(t, tr.Delete([]byte{0xac, 0x22})) - tr.Flush() + tr.Flush(0) checkBatchSize(t, tr, 2) }) @@ -192,12 +191,11 @@ func TestCompatibility(t *testing.T) { tr := NewTrie(NewHashNode(r.Hash()), ModeLatest, mainTrie.Store) require.Equal(t, r.Hash(), tr.root.Hash()) - // Tail bytes contain reference counter thus check for prefix. proof := testGetProof(t, tr, []byte{0xac, 0x01}, 4) - require.True(t, bytes.HasPrefix(r.Bytes(), proof[0])) - require.True(t, bytes.HasPrefix(b.Bytes(), proof[1])) - require.True(t, bytes.HasPrefix(e1.Bytes(), proof[2])) - require.True(t, bytes.HasPrefix(v1.Bytes(), proof[3])) + require.Equal(t, r.Bytes(), proof[0]) + require.Equal(t, b.Bytes(), proof[1]) + require.Equal(t, e1.Bytes(), proof[2]) + require.Equal(t, v1.Bytes(), proof[3]) testGetProof(t, tr, []byte{0xac}, 3) testGetProof(t, tr, []byte{0xac, 0x10}, 0) @@ -242,11 +240,11 @@ func TestCompatibility(t *testing.T) { []byte{0xa1, 0x01}, []byte{0x01}, []byte{0xa2, 0x01}, []byte{0x01}, []byte{0xa3, 0x01}, []byte{0x01}) - tr.Flush() + tr.Flush(0) tr2 := copyTrie(tr) require.NoError(t, tr2.Delete([]byte{0xa3, 0x01})) - tr2.Flush() + tr2.Flush(0) tr3 := copyTrie(tr2) require.NoError(t, tr3.Delete([]byte{0xa2, 0x01})) @@ -258,15 +256,15 @@ func TestCompatibility(t *testing.T) { []byte{0xa1, 0x01}, []byte{0x01}, []byte{0xa2, 0x01}, []byte{0x01}, []byte{0xa3, 0x01}, []byte{0x01}) - tr.Flush() + tr.Flush(0) checkBatchSize(t, tr, 4) require.NoError(t, tr.Delete([]byte{0xa3, 0x01})) - tr.Flush() + tr.Flush(0) checkBatchSize(t, tr, 4) require.NoError(t, tr.Delete([]byte{0xa2, 0x01})) - tr.Flush() + tr.Flush(0) checkBatchSize(t, tr, 2) tr.testHas(t, []byte{0xa1, 0x01}, []byte{0x01}) }) @@ -275,17 +273,17 @@ func TestCompatibility(t *testing.T) { tr := newFilledTrie(t, []byte{0xa1}, []byte{0x01}, []byte{0xa2}, []byte{0x02}) - tr.Flush() + tr.Flush(0) checkBatchSize(t, tr, 4) tr1 := copyTrie(tr) require.NoError(t, tr1.Delete([]byte{0xa1})) - tr1.Flush() + tr1.Flush(0) require.Equal(t, 2, len(tr1.Store.GetBatch().Put)) tr2 := copyTrie(tr1) require.NoError(t, tr2.Delete([]byte{0xa2})) - tr2.Flush() + tr2.Flush(0) require.Equal(t, 0, len(tr2.Store.GetBatch().Put)) }) @@ -294,21 +292,21 @@ func TestCompatibility(t *testing.T) { []byte{0x10}, []byte{0x01}, []byte{0x20}, []byte{0x02}, []byte{0x30}, []byte{0x03}) - tr.Flush() + tr.Flush(0) checkBatchSize(t, tr, 7) tr1 := copyTrie(tr) require.NoError(t, tr1.Delete([]byte{0x10})) - tr1.Flush() + tr1.Flush(0) tr2 := copyTrie(tr1) require.NoError(t, tr2.Delete([]byte{0x20})) - tr2.Flush() + tr2.Flush(0) require.Equal(t, 2, len(tr2.Store.GetBatch().Put)) tr3 := copyTrie(tr2) require.NoError(t, tr3.Delete([]byte{0x30})) - tr3.Flush() + tr3.Flush(0) require.Equal(t, 0, len(tr3.Store.GetBatch().Put)) }) @@ -316,12 +314,12 @@ func TestCompatibility(t *testing.T) { tr := newFilledTrie(t, []byte{0xa1}, []byte{0x01}, []byte{0xa2}, []byte{0x02}) - tr.Flush() + tr.Flush(0) checkBatchSize(t, tr, 4) tr1 := copyTrie(tr) require.NoError(t, 
tr1.Put([]byte{0xa3}, []byte{0x03})) - tr1.Flush() + tr1.Flush(0) require.Equal(t, 5, len(tr1.Store.GetBatch().Put)) }) @@ -329,19 +327,19 @@ func TestCompatibility(t *testing.T) { tr := newFilledTrie(t, []byte{0x10}, []byte{0x01}, []byte{0x20}, []byte{0x02}) - tr.Flush() + tr.Flush(0) checkBatchSize(t, tr, 5) tr1 := copyTrie(tr) require.NoError(t, tr1.Put([]byte{0x30}, []byte{0x03})) - tr1.Flush() + tr1.Flush(0) checkBatchSize(t, tr1, 7) }) t.Run("EmptyValueIssue633", func(t *testing.T) { tr := newFilledTrie(t, []byte{0x01}, []byte{}) - tr.Flush() + tr.Flush(0) checkBatchSize(t, tr, 2) proof := testGetProof(t, tr, []byte{0x01}, 2) @@ -411,7 +409,7 @@ func TestCompatibility_Find(t *testing.T) { // root is an extension node with key=abc; next=branch require.NoError(t, tr.Put([]byte("abc1"), []byte("01"))) require.NoError(t, tr.Put([]byte("abc3"), []byte("02"))) - tr.Flush() + tr.Flush(0) // find items with extension's key prefix t.Run("from > start", func(t *testing.T) { res, err := tr.Find([]byte("ab"), []byte("d2"), 100) diff --git a/pkg/core/mpt/trie.go b/pkg/core/mpt/trie.go index fc440d1a7..fef9e682f 100644 --- a/pkg/core/mpt/trie.go +++ b/pkg/core/mpt/trie.go @@ -406,14 +406,14 @@ func makeStorageKey(mptKey util.Uint256) []byte { // Because we care only about block-level changes, there is no need to put every // new node to storage. Normally, flush should be called with every StateRoot persist, i.e. // after every block. -func (t *Trie) Flush() { +func (t *Trie) Flush(index uint32) { for h, node := range t.refcount { if node.refcount != 0 { if node.bytes == nil { panic("item not in trie") } if t.mode.RC() { - node.initial = t.updateRefCount(h) + node.initial = t.updateRefCount(h, index) if node.initial == 0 { delete(t.refcount, h) } @@ -427,8 +427,20 @@ func (t *Trie) Flush() { } } +func IsActiveValue(v []byte) bool { + return len(v) > 4 && v[len(v)-5] == 1 +} + +func getFromStore(key []byte, mode TrieMode, store *storage.MemCachedStore) ([]byte, error) { + data, err := store.Get(key) + if err == nil && mode.GC() && !IsActiveValue(data) { + return nil, storage.ErrKeyNotFound + } + return data, err +} + // updateRefCount should be called only when refcounting is enabled. -func (t *Trie) updateRefCount(h util.Uint256) int32 { +func (t *Trie) updateRefCount(h util.Uint256, index uint32) int32 { if !t.mode.RC() { panic("`updateRefCount` is called, but GC is disabled") } @@ -439,13 +451,13 @@ func (t *Trie) updateRefCount(h util.Uint256) int32 { if cnt == 0 { // A newly created item which may be in store. 
var err error - data, err = t.Store.Get(key) + data, err = getFromStore(key, t.mode, t.Store) if err == nil { cnt = int32(binary.LittleEndian.Uint32(data[len(data)-4:])) } } if len(data) == 0 { - data = append(node.bytes, 0, 0, 0, 0) + data = append(node.bytes, 1, 0, 0, 0, 0) } cnt += node.refcount switch { @@ -453,7 +465,13 @@ func (t *Trie) updateRefCount(h util.Uint256) int32 { // BUG: negative reference count panic(fmt.Sprintf("negative reference count: %s new %d, upd %d", h.StringBE(), cnt, t.refcount[h])) case cnt == 0: - _ = t.Store.Delete(key) + if !t.mode.GC() { + _ = t.Store.Delete(key) + } else { + data[len(data)-5] = 0 + binary.LittleEndian.PutUint32(data[len(data)-4:], index) + _ = t.Store.Put(key, data) + } default: binary.LittleEndian.PutUint32(data[len(data)-4:], uint32(cnt)) _ = t.Store.Put(key, data) @@ -492,7 +510,7 @@ func (t *Trie) removeRef(h util.Uint256, bs []byte) { } func (t *Trie) getFromStore(h util.Uint256) (Node, error) { - data, err := t.Store.Get(makeStorageKey(h)) + data, err := getFromStore(makeStorageKey(h), t.mode, t.Store) if err != nil { return nil, err } @@ -505,10 +523,11 @@ func (t *Trie) getFromStore(h util.Uint256) (Node, error) { } if t.mode.RC() { - data = data[:len(data)-4] + data = data[:len(data)-5] node := t.refcount[h] if node != nil { node.bytes = data + _ = r.ReadB() node.initial = int32(r.ReadU32LE()) } } diff --git a/pkg/core/mpt/trie_test.go b/pkg/core/mpt/trie_test.go index 2e2f7adcf..e7eda2d0d 100644 --- a/pkg/core/mpt/trie_test.go +++ b/pkg/core/mpt/trie_test.go @@ -48,28 +48,28 @@ func newTestTrie(t *testing.T) *Trie { func testTrieRefcount(t *testing.T, key1, key2 []byte) { tr := NewTrie(nil, ModeLatest, storage.NewMemCachedStore(storage.NewMemoryStore())) require.NoError(t, tr.Put(key1, []byte{1})) - tr.Flush() + tr.Flush(0) require.NoError(t, tr.Put(key2, []byte{1})) - tr.Flush() + tr.Flush(0) tr.testHas(t, key1, []byte{1}) tr.testHas(t, key2, []byte{1}) // remove first, keep second require.NoError(t, tr.Delete(key1)) - tr.Flush() + tr.Flush(0) tr.testHas(t, key1, nil) tr.testHas(t, key2, []byte{1}) // no-op require.NoError(t, tr.Put(key1, []byte{1})) require.NoError(t, tr.Delete(key1)) - tr.Flush() + tr.Flush(0) tr.testHas(t, key1, nil) tr.testHas(t, key2, []byte{1}) // delete non-existent, refcount should not be updated require.NoError(t, tr.Delete(key1)) - tr.Flush() + tr.Flush(0) tr.testHas(t, key1, nil) tr.testHas(t, key2, []byte{1}) } @@ -249,7 +249,7 @@ func (tr *Trie) putToStore(n Node) { bytes: n.Bytes(), refcount: 1, } - tr.updateRefCount(n.Hash()) + tr.updateRefCount(n.Hash(), 0) } else { _ = tr.Store.Put(makeStorageKey(n.Hash()), n.Bytes()) } @@ -318,7 +318,7 @@ func TestTrie_Flush(t *testing.T) { require.NoError(t, tr.Put([]byte(k), v)) } - tr.Flush() + tr.Flush(0) tr = NewTrie(NewHashNode(tr.StateRoot()), ModeAll, tr.Store) for k, v := range pairs { actual, err := tr.Get([]byte(k)) diff --git a/pkg/core/stateroot/module.go b/pkg/core/stateroot/module.go index 53cdda5b0..b3d6b1a63 100644 --- a/pkg/core/stateroot/module.go +++ b/pkg/core/stateroot/module.go @@ -69,7 +69,8 @@ func NewModule(cfg config.ProtocolConfiguration, verif VerifierFunc, log *zap.Lo // GetState returns value at the specified key fom the MPT with the specified root. func (s *Module) GetState(root util.Uint256, key []byte) ([]byte, error) { - tr := mpt.NewTrie(mpt.NewHashNode(root), s.mode, storage.NewMemCachedStore(s.Store)) + // Allow accessing old values, it's RO thing. 
+ tr := mpt.NewTrie(mpt.NewHashNode(root), s.mode&^mpt.ModeGCFlag, storage.NewMemCachedStore(s.Store)) return tr.Get(key) } @@ -79,13 +80,15 @@ func (s *Module) GetState(root util.Uint256, key []byte) ([]byte, error) { // item with key equals to prefix is included into result; if empty `start` specified, // then item with key equals to prefix is not included into result. func (s *Module) FindStates(root util.Uint256, prefix, start []byte, max int) ([]storage.KeyValue, error) { - tr := mpt.NewTrie(mpt.NewHashNode(root), s.mode, storage.NewMemCachedStore(s.Store)) + // Allow accessing old values, it's RO thing. + tr := mpt.NewTrie(mpt.NewHashNode(root), s.mode&^mpt.ModeGCFlag, storage.NewMemCachedStore(s.Store)) return tr.Find(prefix, start, max) } // GetStateProof returns proof of having key in the MPT with the specified root. func (s *Module) GetStateProof(root util.Uint256, key []byte) ([][]byte, error) { - tr := mpt.NewTrie(mpt.NewHashNode(root), s.mode, storage.NewMemCachedStore(s.Store)) + // Allow accessing old values, it's RO thing. + tr := mpt.NewTrie(mpt.NewHashNode(root), s.mode&^mpt.ModeGCFlag, storage.NewMemCachedStore(s.Store)) return tr.GetProof(key) } @@ -188,7 +191,7 @@ func (s *Module) AddMPTBatch(index uint32, b mpt.Batch, cache *storage.MemCached if _, err := mpt.PutBatch(b); err != nil { return nil, nil, err } - mpt.Flush() + mpt.Flush(index) sr := &state.MPTRoot{ Index: index, Root: mpt.StateRoot(), diff --git a/pkg/core/statesync/module.go b/pkg/core/statesync/module.go index 76ecc56e1..4157ae983 100644 --- a/pkg/core/statesync/module.go +++ b/pkg/core/statesync/module.go @@ -222,7 +222,8 @@ func (s *Module) defineSyncStage() error { return fmt.Errorf("failed to get header to initialize MPT billet: %w", err) } var mode mpt.TrieMode - if s.bc.GetConfig().KeepOnlyLatestState { + // No need to enable GC here, it only has latest things. + if s.bc.GetConfig().KeepOnlyLatestState || s.bc.GetConfig().RemoveUntraceableBlocks { mode |= mpt.ModeLatest } s.billet = mpt.NewBillet(header.PrevStateRoot, mode, @@ -499,7 +500,8 @@ func (s *Module) Traverse(root util.Uint256, process func(node mpt.Node, nodeByt defer s.lock.RUnlock() var mode mpt.TrieMode - if s.bc.GetConfig().KeepOnlyLatestState { + // GC must be turned off here to allow access to the archived nodes. + if s.bc.GetConfig().KeepOnlyLatestState || s.bc.GetConfig().RemoveUntraceableBlocks { mode |= mpt.ModeLatest } b := mpt.NewBillet(root, mode, 0, storage.NewMemCachedStore(s.dao.Store)) diff --git a/pkg/core/statesync/module_test.go b/pkg/core/statesync/module_test.go index 69fc6614c..79e24a8ba 100644 --- a/pkg/core/statesync/module_test.go +++ b/pkg/core/statesync/module_test.go @@ -24,7 +24,7 @@ func TestModule_PR2019_discussion_r689629704(t *testing.T) { require.NoError(t, tr.Put([]byte{0x06, 0x03}, []byte("leaf4"))) sr := tr.StateRoot() - tr.Flush() + tr.Flush(0) // Keep MPT nodes in a map in order not to repeat them. We'll use `nodes` map to ask // state sync module to restore the nodes. From 423c7883b81ce14cc37a91d1d3588bcbd1ccb34c Mon Sep 17 00:00:00 2001 From: Roman Khimov Date: Sat, 29 Jan 2022 11:28:29 +0300 Subject: [PATCH 6/8] core: implement basic GC for value-based storage scheme The key idea here is that even though we can't ensure MPT code won't make the node active again we can order the changes made to the persistent store in such a way that it practically doesn't matter. 
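As a rough illustration of how far back such a pass may reach, here is a minimal sketch with assumed constant values (these names are not the node's actual API, and the extra cap applied together with P2PStateExchangeExtensions is left out); the exact persist/GC interplay is spelled out in the list below.

package main

import "fmt"

// Illustrative settings; the real values come from the protocol configuration.
const (
	maxTraceableBlocks      = 2102400
	garbageCollectionPeriod = 10000
)

// gcTarget returns the height at or below which deactivated MPT nodes may be
// dropped: everything older than maxTraceableBlocks, rounded down to a whole
// garbage collection period so consecutive runs don't redo the same range.
func gcTarget(persistedHeight uint32) uint32 {
	if persistedHeight <= maxTraceableBlocks {
		return 0
	}
	tgt := persistedHeight - maxTraceableBlocks
	return tgt - tgt%garbageCollectionPeriod
}

func main() {
	// Nodes deactivated at heights <= 40000 are eligible for removal here;
	// anything deactivated later (or still active) stays in the store.
	fmt.Println(gcTarget(2150000)) // 40000
}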
What happens is: * after persist if it's time to collect our garbage we do it synchronously right in the same thread working the underlying persistent store directly * all the other node code doesn't see much of it, it works with bc.dao or layers above it * if MPT doesn't find some stale deactivated node in the storage it's OK, it'll recreate it in bc.dao * if MPT finds it and activates it, it's OK too, bc.dao will store it * while GC is being performed nothing else changes the persistent store * all subsequent bc.dao persists only happen after the GC is completed which means that any changes to the (potentially) deleted nodes have a priority, it's OK for GC to delete something that'll be recreated with the next persist cycle Otherwise it's a simple scheme with node status/last active height stored in the value. Preliminary tests show that it works ~18% worse than the simple KeepOnlyLatest scheme, but this seems to be the best result so far. Fixes #2095. --- docs/node-configuration.md | 6 +++-- pkg/config/protocol_config.go | 11 +++++--- pkg/core/blockchain.go | 49 ++++++++++++++++++++++++++++++++++- pkg/core/blockchain_test.go | 36 ++++++++++++++++++++----- pkg/core/stateroot/module.go | 42 ++++++++++++++++++++++++++++++ 5 files changed, 131 insertions(+), 13 deletions(-) diff --git a/docs/node-configuration.md b/docs/node-configuration.md index 3a88c919c..b4e04a2c2 100644 --- a/docs/node-configuration.md +++ b/docs/node-configuration.md @@ -198,7 +198,9 @@ protocol-related settings described in the table below. | Section | Type | Default value | Description | Notes | | --- | --- | --- | --- | --- | | CommitteeHistory | map[uint32]int | none | Number of committee members after given height, for example `{0: 1, 20: 4}` sets up a chain with one committee member since the genesis and then changes the setting to 4 committee members at the height of 20. `StandbyCommittee` committee setting must have the number of keys equal or exceeding the highest value in this option. Blocks numbers where the change happens must be divisble by the old and by the new values simultaneously. If not set, committee size is derived from the `StandbyCommittee` setting and never changes. | -| KeepOnlyLatestState | `bool` | `false` | Specifies if MPT should only store latest state. If true, DB size will be smaller, but older roots won't be accessible. This value should remain the same for the same database. | +| GarbageCollectionPeriod | `uint32` | 10000 | Controls MPT garbage collection interval (in blocks) for configurations with `RemoveUntraceableBlocks` enabled and `KeepOnlyLatestState` disabled. In this mode the node stores a number of MPT trees (corresponding to `MaxTraceableBlocks` and `StateSyncInterval`), but the DB needs to be clean from old entries from time to time. Doing it too often will cause too much processing overhead, doing it too rarely will leave more useless data in the DB. | +| KeepOnlyLatestState | `bool` | `false` | Specifies if MPT should only store latest state. If true, DB size will be smaller, but older roots won't be accessible. This value should remain th +e same for the same database. | | Magic | `uint32` | `0` | Magic number which uniquely identifies NEO network. | | MaxBlockSize | `uint32` | `262144` | Maximum block size in bytes. | | MaxBlockSystemFee | `int64` | `900000000000` | Maximum overall transactions system fee per block. | @@ -209,7 +211,7 @@ protocol-related settings described in the table below. 
| P2PNotaryRequestPayloadPoolSize | `int` | `1000` | Size of the node's P2P Notary request payloads memory pool where P2P Notary requests are stored before main or fallback transaction is completed and added to the chain.
This option is valid only if `P2PSigExtensions` are enabled. | Not supported by the C# node, thus may affect heterogeneous networks functionality. | | P2PSigExtensions | `bool` | `false` | Enables following additional Notary service related logic:
• Transaction attributes `NotValidBefore`, `Conflicts` and `NotaryAssisted`
• Network payload of the `P2PNotaryRequest` type
• Native `Notary` contract
• Notary node module | Not supported by the C# node, thus may affect heterogeneous networks functionality. | | P2PStateExchangeExtensions | `bool` | `false` | Enables following P2P MPT state data exchange logic:
• `StateSyncInterval` protocol setting
• P2P commands `GetMPTDataCMD` and `MPTDataCMD` | Not supported by the C# node, thus may affect heterogeneous networks functionality. | -| RemoveUntraceableBlocks | `bool`| `false` | Denotes whether old blocks should be removed from cache and database. If enabled, then only last `MaxTraceableBlocks` are stored and accessible to smart contracts. | +| RemoveUntraceableBlocks | `bool`| `false` | Denotes whether old blocks should be removed from cache and database. If enabled, then only last `MaxTraceableBlocks` are stored and accessible to smart contracts. Old MPT data is also deleted in accordance with `GarbageCollectionPeriod` setting. | | ReservedAttributes | `bool` | `false` | Allows to have reserved attributes range for experimental or private purposes. | | SaveStorageBatch | `bool` | `false` | Enables storage batch saving before every persist. It is similar to StorageDump plugin for C# node. | | SecondsPerBlock | `int` | `15` | Minimal time that should pass before next block is accepted. | diff --git a/pkg/config/protocol_config.go b/pkg/config/protocol_config.go index 3e9f5a4fe..007602d0c 100644 --- a/pkg/config/protocol_config.go +++ b/pkg/config/protocol_config.go @@ -15,8 +15,13 @@ type ( ProtocolConfiguration struct { // CommitteeHistory stores committee size change history (height: size). CommitteeHistory map[uint32]int `yaml:"CommitteeHistory"` - Magic netmode.Magic `yaml:"Magic"` - MemPoolSize int `yaml:"MemPoolSize"` + // GarbageCollectionPeriod sets the number of blocks to wait before + // starting the next MPT garbage collection cycle when RemoveUntraceableBlocks + // option is used. + GarbageCollectionPeriod uint32 `yaml:"GarbageCollectionPeriod"` + + Magic netmode.Magic `yaml:"Magic"` + MemPoolSize int `yaml:"MemPoolSize"` // InitialGASSupply is the amount of GAS generated in the genesis block. InitialGASSupply fixedn.Fixed8 `yaml:"InitialGASSupply"` @@ -27,7 +32,7 @@ type ( // If true, DB size will be smaller, but older roots won't be accessible. // This value should remain the same for the same database. KeepOnlyLatestState bool `yaml:"KeepOnlyLatestState"` - // RemoveUntraceableBlocks specifies if old blocks should be removed. + // RemoveUntraceableBlocks specifies if old data should be removed. RemoveUntraceableBlocks bool `yaml:"RemoveUntraceableBlocks"` // MaxBlockSize is the maximum block size in bytes. MaxBlockSize uint32 `yaml:"MaxBlockSize"` diff --git a/pkg/core/blockchain.go b/pkg/core/blockchain.go index 4e10d42d2..69863d652 100644 --- a/pkg/core/blockchain.go +++ b/pkg/core/blockchain.go @@ -47,6 +47,7 @@ const ( version = "0.2.2" defaultInitialGAS = 52000000_00000000 + defaultGCPeriod = 10000 defaultMemPoolSize = 50000 defaultP2PNotaryRequestPayloadPoolSize = 1000 defaultMaxBlockSize = 262144 @@ -124,6 +125,9 @@ type Blockchain struct { // are directly from underlying persistent store. persistent *dao.Simple + // Underlying persistent store. + store storage.Store + // Current index/height of the highest block. // Read access should always be called by BlockHeight(). // Write access should only happen in storeBlock(). 
@@ -245,6 +249,10 @@ func NewBlockchain(s storage.Store, cfg config.ProtocolConfiguration, log *zap.L zap.Int("StateSyncInterval", cfg.StateSyncInterval)) } } + if cfg.RemoveUntraceableBlocks && cfg.GarbageCollectionPeriod == 0 { + cfg.GarbageCollectionPeriod = defaultGCPeriod + log.Info("GarbageCollectionPeriod is not set or wrong, using default value", zap.Uint32("GarbageCollectionPeriod", cfg.GarbageCollectionPeriod)) + } if len(cfg.NativeUpdateHistories) == 0 { cfg.NativeUpdateHistories = map[string][]uint32{} log.Info("NativeActivations are not set, using default values") @@ -253,6 +261,7 @@ func NewBlockchain(s storage.Store, cfg config.ProtocolConfiguration, log *zap.L config: cfg, dao: dao.NewSimple(s, cfg.StateRootInHeader, cfg.P2PSigExtensions), persistent: dao.NewSimple(s, cfg.StateRootInHeader, cfg.P2PSigExtensions), + store: s, stopCh: make(chan struct{}), runToExitCh: make(chan struct{}), memPool: mempool.New(cfg.MemPoolSize, 0, false), @@ -647,12 +656,21 @@ func (bc *Blockchain) Run() { case <-bc.stopCh: return case <-persistTimer.C: + var oldPersisted uint32 + var gcDur time.Duration + + if bc.config.RemoveUntraceableBlocks { + oldPersisted = atomic.LoadUint32(&bc.persistedHeight) + } dur, err := bc.persist(nextSync) if err != nil { bc.log.Warn("failed to persist blockchain", zap.Error(err)) } + if bc.config.RemoveUntraceableBlocks { + gcDur = bc.tryRunGC(oldPersisted) + } nextSync = dur > persistInterval*2 - interval := persistInterval - dur + interval := persistInterval - dur - gcDur if interval <= 0 { interval = time.Microsecond // Reset doesn't work with zero value } @@ -661,6 +679,35 @@ func (bc *Blockchain) Run() { } } +func (bc *Blockchain) tryRunGC(old uint32) time.Duration { + var dur time.Duration + + new := atomic.LoadUint32(&bc.persistedHeight) + var tgtBlock = int64(new) + + tgtBlock -= int64(bc.config.MaxTraceableBlocks) + if bc.config.P2PStateExchangeExtensions { + syncP := new / uint32(bc.config.StateSyncInterval) + syncP-- + syncP *= uint32(bc.config.StateSyncInterval) + if tgtBlock > int64(syncP) { + tgtBlock = int64(syncP) + } + } + // Always round to the GCP. + tgtBlock /= int64(bc.config.GarbageCollectionPeriod) + tgtBlock *= int64(bc.config.GarbageCollectionPeriod) + // Count periods. + old /= bc.config.GarbageCollectionPeriod + new /= bc.config.GarbageCollectionPeriod + if tgtBlock > int64(bc.config.GarbageCollectionPeriod) && new != old { + tgtBlock /= int64(bc.config.GarbageCollectionPeriod) + tgtBlock *= int64(bc.config.GarbageCollectionPeriod) + dur = bc.stateRoot.GC(uint32(tgtBlock), bc.store) + } + return dur +} + // notificationDispatcher manages subscription to events and broadcasts new events. 
func (bc *Blockchain) notificationDispatcher() { var ( diff --git a/pkg/core/blockchain_test.go b/pkg/core/blockchain_test.go index 42c81bd38..4b9f1da2f 100644 --- a/pkg/core/blockchain_test.go +++ b/pkg/core/blockchain_test.go @@ -1589,7 +1589,7 @@ func TestDumpAndRestore(t *testing.T) { } func TestRemoveUntraceable(t *testing.T) { - check := func(t *testing.T, bc *Blockchain, tHash, bHash util.Uint256, errorExpected bool) { + check := func(t *testing.T, bc *Blockchain, tHash, bHash, sHash util.Uint256, errorExpected bool) { _, _, err := bc.GetTransaction(tHash) if errorExpected { require.Error(t, err) @@ -1610,10 +1610,20 @@ func TestRemoveUntraceable(t *testing.T) { } _, err = bc.GetHeader(bHash) require.NoError(t, err) + if !sHash.Equals(util.Uint256{}) { + sm := bc.GetStateModule() + _, err = sm.GetState(sHash, []byte{0xfb, 0xff, 0xff, 0xff, 0x0e}) // NEO committee key. + if errorExpected { + require.Error(t, err) + } else { + require.NoError(t, err) + } + } } t.Run("P2PStateExchangeExtensions off", func(t *testing.T) { bc := newTestChainWithCustomCfg(t, func(c *config.Config) { c.ProtocolConfiguration.MaxTraceableBlocks = 2 + c.ProtocolConfiguration.GarbageCollectionPeriod = 2 c.ProtocolConfiguration.RemoveUntraceableBlocks = true }) @@ -1622,6 +1632,8 @@ func TestRemoveUntraceable(t *testing.T) { b1 := bc.newBlock(tx1) require.NoError(t, bc.AddBlock(b1)) tx1Height := bc.BlockHeight() + sRoot, err := bc.GetStateModule().GetStateRoot(tx1Height) + require.NoError(t, err) tx2, err := testchain.NewTransferFromOwner(bc, bc.contracts.NEO.Hash, util.Uint160{}, 1, 0, bc.BlockHeight()+1) require.NoError(t, err) @@ -1631,13 +1643,21 @@ func TestRemoveUntraceable(t *testing.T) { require.NoError(t, err) require.Equal(t, tx1Height, h1) + check(t, bc, tx1.Hash(), b1.Hash(), sRoot.Root, false) require.NoError(t, bc.AddBlock(bc.newBlock())) - - check(t, bc, tx1.Hash(), b1.Hash(), true) + require.NoError(t, bc.AddBlock(bc.newBlock())) + require.NoError(t, bc.AddBlock(bc.newBlock())) + require.NoError(t, bc.AddBlock(bc.newBlock())) + // Don't wait for Run(). 
+ _, err = bc.persist(true) + require.NoError(t, err) + bc.tryRunGC(0) + check(t, bc, tx1.Hash(), b1.Hash(), sRoot.Root, true) }) t.Run("P2PStateExchangeExtensions on", func(t *testing.T) { bc := newTestChainWithCustomCfg(t, func(c *config.Config) { c.ProtocolConfiguration.MaxTraceableBlocks = 2 + c.ProtocolConfiguration.GarbageCollectionPeriod = 2 c.ProtocolConfiguration.RemoveUntraceableBlocks = true c.ProtocolConfiguration.P2PStateExchangeExtensions = true c.ProtocolConfiguration.StateSyncInterval = 2 @@ -1649,6 +1669,8 @@ func TestRemoveUntraceable(t *testing.T) { b1 := bc.newBlock(tx1) require.NoError(t, bc.AddBlock(b1)) tx1Height := bc.BlockHeight() + sRoot, err := bc.GetStateModule().GetStateRoot(tx1Height) + require.NoError(t, err) tx2, err := testchain.NewTransferFromOwner(bc, bc.contracts.NEO.Hash, util.Uint160{}, 1, 0, bc.BlockHeight()+1) require.NoError(t, err) @@ -1664,13 +1686,13 @@ func TestRemoveUntraceable(t *testing.T) { require.NoError(t, bc.AddBlock(bc.newBlock())) require.NoError(t, bc.AddBlock(bc.newBlock())) - check(t, bc, tx1.Hash(), b1.Hash(), false) - check(t, bc, tx2.Hash(), b2.Hash(), false) + check(t, bc, tx1.Hash(), b1.Hash(), sRoot.Root, false) + check(t, bc, tx2.Hash(), b2.Hash(), sRoot.Root, false) require.NoError(t, bc.AddBlock(bc.newBlock())) - check(t, bc, tx1.Hash(), b1.Hash(), true) - check(t, bc, tx2.Hash(), b2.Hash(), false) + check(t, bc, tx1.Hash(), b1.Hash(), util.Uint256{}, true) + check(t, bc, tx2.Hash(), b2.Hash(), util.Uint256{}, false) _, h2, err := bc.GetTransaction(tx2.Hash()) require.NoError(t, err) require.Equal(t, tx2Height, h2) diff --git a/pkg/core/stateroot/module.go b/pkg/core/stateroot/module.go index b3d6b1a63..b01828ed8 100644 --- a/pkg/core/stateroot/module.go +++ b/pkg/core/stateroot/module.go @@ -5,6 +5,7 @@ import ( "errors" "fmt" "sync" + "time" "github.com/nspcc-dev/neo-go/pkg/config" "github.com/nspcc-dev/neo-go/pkg/config/netmode" @@ -57,6 +58,9 @@ func NewModule(cfg config.ProtocolConfiguration, verif VerifierFunc, log *zap.Lo if cfg.KeepOnlyLatestState { mode |= mpt.ModeLatest } + if cfg.RemoveUntraceableBlocks { + mode |= mpt.ModeGC + } return &Module{ network: cfg.Magic, srInHead: cfg.StateRootInHeader, @@ -184,6 +188,44 @@ func (s *Module) JumpToState(sr *state.MPTRoot) error { return nil } +// GC performs garbage collection. +func (s *Module) GC(index uint32, store storage.Store) time.Duration { + if !s.mode.GC() { + panic("stateroot: GC invoked, but not enabled") + } + var removed int + var stored int64 + s.log.Info("starting MPT garbage collection", zap.Uint32("index", index)) + start := time.Now() + b := store.Batch() + store.Seek(storage.SeekRange{ + Prefix: []byte{byte(storage.DataMPT)}, + }, func(k, v []byte) bool { + stored++ + if !mpt.IsActiveValue(v) { + h := binary.LittleEndian.Uint32(v[len(v)-4:]) + if h > index { + return true + } + b.Delete(k) + removed++ + stored-- + } + return true + }) + err := store.PutBatch(b) + dur := time.Since(start) + if err != nil { + s.log.Error("failed to flush MPT GC changeset", zap.Duration("time", dur), zap.Error(err)) + } else { + s.log.Info("finished MPT garbage collection", + zap.Int("removed", removed), + zap.Int64("stored", stored), + zap.Duration("time", dur)) + } + return dur +} + // AddMPTBatch updates using provided batch. 
func (s *Module) AddMPTBatch(index uint32, b mpt.Batch, cache *storage.MemCachedStore) (*mpt.Trie, *state.MPTRoot, error) { mpt := *s.mpt From c5f1e2fbcdc4f740956bcd5a4e97104ef81a6f74 Mon Sep 17 00:00:00 2001 From: Roman Khimov Date: Tue, 8 Feb 2022 18:15:05 +0300 Subject: [PATCH 7/8] mpt: don't allocate for every key Microoptimization. --- pkg/core/mpt/trie.go | 9 +++++---- pkg/core/mpt/trie_test.go | 2 +- 2 files changed, 6 insertions(+), 5 deletions(-) diff --git a/pkg/core/mpt/trie.go b/pkg/core/mpt/trie.go index fef9e682f..f1d2deebc 100644 --- a/pkg/core/mpt/trie.go +++ b/pkg/core/mpt/trie.go @@ -407,18 +407,20 @@ func makeStorageKey(mptKey util.Uint256) []byte { // new node to storage. Normally, flush should be called with every StateRoot persist, i.e. // after every block. func (t *Trie) Flush(index uint32) { + key := makeStorageKey(util.Uint256{}) for h, node := range t.refcount { if node.refcount != 0 { + copy(key[1:], h[:]) if node.bytes == nil { panic("item not in trie") } if t.mode.RC() { - node.initial = t.updateRefCount(h, index) + node.initial = t.updateRefCount(h, key, index) if node.initial == 0 { delete(t.refcount, h) } } else if node.refcount > 0 { - _ = t.Store.Put(makeStorageKey(h), node.bytes) + _ = t.Store.Put(key, node.bytes) } node.refcount = 0 } else { @@ -440,12 +442,11 @@ func getFromStore(key []byte, mode TrieMode, store *storage.MemCachedStore) ([]b } // updateRefCount should be called only when refcounting is enabled. -func (t *Trie) updateRefCount(h util.Uint256, index uint32) int32 { +func (t *Trie) updateRefCount(h util.Uint256, key []byte, index uint32) int32 { if !t.mode.RC() { panic("`updateRefCount` is called, but GC is disabled") } var data []byte - key := makeStorageKey(h) node := t.refcount[h] cnt := node.initial if cnt == 0 { diff --git a/pkg/core/mpt/trie_test.go b/pkg/core/mpt/trie_test.go index e7eda2d0d..97c5ef4e4 100644 --- a/pkg/core/mpt/trie_test.go +++ b/pkg/core/mpt/trie_test.go @@ -249,7 +249,7 @@ func (tr *Trie) putToStore(n Node) { bytes: n.Bytes(), refcount: 1, } - tr.updateRefCount(n.Hash(), 0) + tr.updateRefCount(n.Hash(), makeStorageKey(n.Hash()), 0) } else { _ = tr.Store.Put(makeStorageKey(n.Hash()), n.Bytes()) } From 373fce54e6986881fc7d0acf2abd4cede3f450f7 Mon Sep 17 00:00:00 2001 From: Roman Khimov Date: Tue, 8 Feb 2022 21:47:17 +0300 Subject: [PATCH 8/8] config: conflict P2PStateExchangeExtensions/KeepOnlyLatestState They don't make sense together, for P2P state exchange to be possible we need a set of MPTs. --- docs/node-configuration.md | 4 ++-- pkg/config/protocol_config.go | 3 +++ pkg/config/protocol_config_test.go | 9 +++++++++ 3 files changed, 14 insertions(+), 2 deletions(-) diff --git a/docs/node-configuration.md b/docs/node-configuration.md index b4e04a2c2..9591981ba 100644 --- a/docs/node-configuration.md +++ b/docs/node-configuration.md @@ -200,7 +200,7 @@ protocol-related settings described in the table below. | CommitteeHistory | map[uint32]int | none | Number of committee members after given height, for example `{0: 1, 20: 4}` sets up a chain with one committee member since the genesis and then changes the setting to 4 committee members at the height of 20. `StandbyCommittee` committee setting must have the number of keys equal or exceeding the highest value in this option. Blocks numbers where the change happens must be divisble by the old and by the new values simultaneously. If not set, committee size is derived from the `StandbyCommittee` setting and never changes. 
| | GarbageCollectionPeriod | `uint32` | 10000 | Controls MPT garbage collection interval (in blocks) for configurations with `RemoveUntraceableBlocks` enabled and `KeepOnlyLatestState` disabled. In this mode the node stores a number of MPT trees (corresponding to `MaxTraceableBlocks` and `StateSyncInterval`), but the DB needs to be clean from old entries from time to time. Doing it too often will cause too much processing overhead, doing it too rarely will leave more useless data in the DB. | | KeepOnlyLatestState | `bool` | `false` | Specifies if MPT should only store latest state. If true, DB size will be smaller, but older roots won't be accessible. This value should remain th -e same for the same database. | +e same for the same database. | Conflicts with `P2PStateExchangeExtensions`. | | Magic | `uint32` | `0` | Magic number which uniquely identifies NEO network. | | MaxBlockSize | `uint32` | `262144` | Maximum block size in bytes. | | MaxBlockSystemFee | `int64` | `900000000000` | Maximum overall transactions system fee per block. | @@ -210,7 +210,7 @@ e same for the same database. | | NativeActivations | `map[string][]uint32` | ContractManagement: [0]
StdLib: [0]
CryptoLib: [0]
LedgerContract: [0]
NeoToken: [0]
GasToken: [0]
PolicyContract: [0]
RoleManagement: [0]
OracleContract: [0] | The list of histories of native contracts updates. Each list item should be presented as a known native contract name with the corresponding list of chain's heights. The contract is not active until chain reaches the first height value specified in the list. | `Notary` is supported. | | P2PNotaryRequestPayloadPoolSize | `int` | `1000` | Size of the node's P2P Notary request payloads memory pool where P2P Notary requests are stored before main or fallback transaction is completed and added to the chain.
This option is valid only if `P2PSigExtensions` are enabled. | Not supported by the C# node, thus may affect heterogeneous networks functionality. | | P2PSigExtensions | `bool` | `false` | Enables following additional Notary service related logic:
• Transaction attributes `NotValidBefore`, `Conflicts` and `NotaryAssisted`
• Network payload of the `P2PNotaryRequest` type
• Native `Notary` contract
• Notary node module | Not supported by the C# node, thus may affect heterogeneous networks functionality. | -| P2PStateExchangeExtensions | `bool` | `false` | Enables following P2P MPT state data exchange logic:
• `StateSyncInterval` protocol setting
• P2P commands `GetMPTDataCMD` and `MPTDataCMD` | Not supported by the C# node, thus may affect heterogeneous networks functionality. | +| P2PStateExchangeExtensions | `bool` | `false` | Enables following P2P MPT state data exchange logic:
• `StateSyncInterval` protocol setting
• P2P commands `GetMPTDataCMD` and `MPTDataCMD` | Not supported by the C# node, thus may affect heterogeneous networks functionality. Conflicts with `KeepOnlyLatestState`. | | RemoveUntraceableBlocks | `bool`| `false` | Denotes whether old blocks should be removed from cache and database. If enabled, then only last `MaxTraceableBlocks` are stored and accessible to smart contracts. Old MPT data is also deleted in accordance with `GarbageCollectionPeriod` setting. | | ReservedAttributes | `bool` | `false` | Allows to have reserved attributes range for experimental or private purposes. | | SaveStorageBatch | `bool` | `false` | Enables storage batch saving before every persist. It is similar to StorageDump plugin for C# node. | diff --git a/pkg/config/protocol_config.go b/pkg/config/protocol_config.go index 007602d0c..c2f5dff01 100644 --- a/pkg/config/protocol_config.go +++ b/pkg/config/protocol_config.go @@ -86,6 +86,9 @@ type heightNumber struct { func (p *ProtocolConfiguration) Validate() error { var err error + if p.KeepOnlyLatestState && p.P2PStateExchangeExtensions { + return errors.New("can't have both KeepOnlyLatestState and P2PStateExchangeExtensions") + } for name := range p.NativeUpdateHistories { if !nativenames.IsValid(name) { return fmt.Errorf("NativeActivations configuration section contains unexpected native contract name: %s", name) diff --git a/pkg/config/protocol_config_test.go b/pkg/config/protocol_config_test.go index b2c978279..2d3c2d811 100644 --- a/pkg/config/protocol_config_test.go +++ b/pkg/config/protocol_config_test.go @@ -8,6 +8,15 @@ import ( func TestProtocolConfigurationValidation(t *testing.T) { p := &ProtocolConfiguration{ + StandbyCommittee: []string{ + "02b3622bf4017bdfe317c58aed5f4c753f206b7db896046fa7d774bbc4bf7f8dc2", + }, + ValidatorsCount: 1, + KeepOnlyLatestState: true, + P2PStateExchangeExtensions: true, + } + require.Error(t, p.Validate()) + p = &ProtocolConfiguration{ ValidatorsCount: 1, } require.Error(t, p.Validate())