From 168ba7960c3f36af72a2e4dc7a2588b5785401b3 Mon Sep 17 00:00:00 2001 From: Evgenii Stratonikov Date: Thu, 24 Dec 2020 14:19:10 +0300 Subject: [PATCH 1/2] mpt: do not allocate new buffer when updating dirty node Running time becomes faster under high load while staying the same in the average case. Memory allocation done in `Trie` goes down by about ~10% (even more, actually). --- pkg/core/mpt/base.go | 2 +- pkg/core/mpt/trie.go | 17 ++++++++--------- pkg/io/binaryBufWriter.go | 6 ++++++ 3 files changed, 15 insertions(+), 10 deletions(-) diff --git a/pkg/core/mpt/base.go b/pkg/core/mpt/base.go index 1c4ebd8a6..c81bfe6be 100644 --- a/pkg/core/mpt/base.go +++ b/pkg/core/mpt/base.go @@ -63,7 +63,7 @@ func (b *BaseNode) updateHash(n Node) { // updateCache updates hash and bytes fields for this BaseNode. func (b *BaseNode) updateBytes(n Node) { - buf := io.NewBufBinWriter() + buf := io.NewBufBinWriterPreAlloc(b.bytes) encodeNodeWithType(n, buf.BinWriter) b.bytes = buf.Bytes() b.bytesValid = true diff --git a/pkg/core/mpt/trie.go b/pkg/core/mpt/trie.go index 751b1b2f3..acc11639c 100644 --- a/pkg/core/mpt/trie.go +++ b/pkg/core/mpt/trie.go @@ -418,31 +418,29 @@ func (t *Trie) updateRefCount(h util.Uint256) int32 { func (t *Trie) addRef(h util.Uint256, bs []byte) { node := t.refcount[h] if node == nil { + data := make([]byte, len(bs)) + copy(data, bs) t.refcount[h] = &cachedNode{ refcount: 1, - bytes: bs, + bytes: data, } return } node.refcount++ - if node.bytes == nil { - node.bytes = bs - } } func (t *Trie) removeRef(h util.Uint256, bs []byte) { node := t.refcount[h] if node == nil { + data := make([]byte, len(bs)) + copy(data, bs) t.refcount[h] = &cachedNode{ refcount: -1, - bytes: bs, + bytes: data, } return } node.refcount-- - if node.bytes == nil { - node.bytes = bs - } } func (t *Trie) getFromStore(h util.Uint256) (Node, error) { @@ -462,7 +460,8 @@ func (t *Trie) getFromStore(h util.Uint256) (Node, error) { data = data[:len(data)-4] node := t.refcount[h] if node != nil { - node.bytes = data + node.bytes = make([]byte, len(data)) + copy(node.bytes, data) node.initial = int32(r.ReadU32LE()) } } diff --git a/pkg/io/binaryBufWriter.go b/pkg/io/binaryBufWriter.go index 7015082e1..3e1245d3a 100644 --- a/pkg/io/binaryBufWriter.go +++ b/pkg/io/binaryBufWriter.go @@ -19,6 +19,12 @@ func NewBufBinWriter() *BufBinWriter { return &BufBinWriter{BinWriter: NewBinWriterFromIO(b), buf: b} } +// NewBufBinWriterPreAlloc makes a BufBinWriter using preallocated buffer. +func NewBufBinWriterPreAlloc(buf []byte) *BufBinWriter { + b := bytes.NewBuffer(buf[:0]) + return &BufBinWriter{BinWriter: NewBinWriterFromIO(b), buf: b} +} + // Len returns the number of bytes of the unread portion of the buffer. func (bw *BufBinWriter) Len() int { return bw.buf.Len() From 30423f3306a016b130227aa6f5845e0c63bc48c3 Mon Sep 17 00:00:00 2001 From: Evgenii Stratonikov Date: Thu, 24 Dec 2020 19:32:27 +0300 Subject: [PATCH 2/2] mpt: update MPT after the block processing --- pkg/core/blockchain.go | 7 ++++++- pkg/core/dao/dao.go | 25 +++++++++++++++---------- pkg/core/native_management_test.go | 1 + pkg/core/storage/memory_store.go | 17 +++++++++++++++++ 4 files changed, 39 insertions(+), 11 deletions(-) diff --git a/pkg/core/blockchain.go b/pkg/core/blockchain.go index c15ecb7cf..8ee28a6f4 100644 --- a/pkg/core/blockchain.go +++ b/pkg/core/blockchain.go @@ -687,7 +687,12 @@ func (bc *Blockchain) storeBlock(block *block.Block, txpool *mempool.Pool) error } writeBuf.Reset() - root := bc.dao.MPT.StateRoot() + d := cache.DAO.(*dao.Simple) + if err := d.UpdateMPT(); err != nil { + return fmt.Errorf("error while trying to apply MPT changes: %w", err) + } + + root := d.MPT.StateRoot() var prevHash util.Uint256 if block.Index > 0 { prev, err := bc.dao.GetStateRoot(block.Index - 1) diff --git a/pkg/core/dao/dao.go b/pkg/core/dao/dao.go index 40911e227..524204ed8 100644 --- a/pkg/core/dao/dao.go +++ b/pkg/core/dao/dao.go @@ -382,11 +382,6 @@ func (dao *Simple) PutStorageItem(id int32, key []byte, si *state.StorageItem) e return buf.Err } v := buf.Bytes() - if dao.MPT != nil { - if err := dao.MPT.Put(stKey[1:], v); err != nil && err != mpt.ErrNotFound { - return err - } - } return dao.Store.Put(stKey, v) } @@ -394,11 +389,6 @@ func (dao *Simple) PutStorageItem(id int32, key []byte, si *state.StorageItem) e // given key from the store. func (dao *Simple) DeleteStorageItem(id int32, key []byte) error { stKey := makeStorageItemKey(id, key) - if dao.MPT != nil { - if err := dao.MPT.Delete(stKey[1:]); err != nil && err != mpt.ErrNotFound { - return err - } - } return dao.Store.Delete(stKey) } @@ -701,3 +691,18 @@ func (dao *Simple) StoreAsTransaction(tx *transaction.Transaction, index uint32, func (dao *Simple) Persist() (int, error) { return dao.Store.Persist() } + +// UpdateMPT updates MPT using storage items from the underlying memcached store. +func (dao *Simple) UpdateMPT() error { + var err error + dao.Store.MemoryStore.SeekAll([]byte{byte(storage.STStorage)}, func(k, v []byte) { + if err != nil { + return + } else if v != nil { + err = dao.MPT.Put(k[1:], v) + } else { + err = dao.MPT.Delete(k[1:]) + } + }) + return err +} diff --git a/pkg/core/native_management_test.go b/pkg/core/native_management_test.go index 43a0553f0..3c389d061 100644 --- a/pkg/core/native_management_test.go +++ b/pkg/core/native_management_test.go @@ -440,6 +440,7 @@ func TestContractDestroy(t *testing.T) { require.NoError(t, err) err = bc.dao.PutStorageItem(cs1.ID, []byte{1, 2, 3}, &state.StorageItem{Value: []byte{3, 2, 1}}) require.NoError(t, err) + require.NoError(t, bc.dao.UpdateMPT()) t.Run("no contract", func(t *testing.T) { res, err := invokeContractMethod(bc, 1_00000000, mgmtHash, "destroy") diff --git a/pkg/core/storage/memory_store.go b/pkg/core/storage/memory_store.go index 7238e0a42..3d542dd82 100644 --- a/pkg/core/storage/memory_store.go +++ b/pkg/core/storage/memory_store.go @@ -102,6 +102,23 @@ func (s *MemoryStore) Seek(key []byte, f func(k, v []byte)) { s.mut.RUnlock() } +// SeekAll is like seek but also iterates over deleted items. +func (s *MemoryStore) SeekAll(key []byte, f func(k, v []byte)) { + s.mut.RLock() + defer s.mut.RUnlock() + sk := string(key) + for k, v := range s.mem { + if strings.HasPrefix(k, sk) { + f([]byte(k), v) + } + } + for k := range s.del { + if strings.HasPrefix(k, sk) { + f([]byte(k), nil) + } + } +} + // seek is an internal unlocked implementation of Seek. func (s *MemoryStore) seek(key []byte, f func(k, v []byte)) { for k, v := range s.mem {