From a2d9b8996432205d1e5c8ec11d9783a51abde16f Mon Sep 17 00:00:00 2001 From: Roman Khimov Date: Mon, 31 Aug 2020 22:11:49 +0300 Subject: [PATCH] dao: reuse buffers when storing blocks, txes and aers Reduce memory allocation pressure. --- pkg/core/blockchain.go | 16 ++++++++---- pkg/core/dao/dao.go | 45 +++++++++++++++++++++------------ pkg/core/dao/dao_test.go | 8 +++--- pkg/core/interop_system_test.go | 14 +++++----- 4 files changed, 51 insertions(+), 32 deletions(-) diff --git a/pkg/core/blockchain.go b/pkg/core/blockchain.go index 3e8c1e855..6c577c0de 100644 --- a/pkg/core/blockchain.go +++ b/pkg/core/blockchain.go @@ -577,14 +577,17 @@ func (bc *Blockchain) GetStateRoot(height uint32) (*state.MPTRootState, error) { // This is the only way to change Blockchain state. func (bc *Blockchain) storeBlock(block *block.Block) error { cache := dao.NewCached(bc.dao) + writeBuf := io.NewBufBinWriter() appExecResults := make([]*state.AppExecResult, 0, 1+len(block.Transactions)) - if err := cache.StoreAsBlock(block); err != nil { + if err := cache.StoreAsBlock(block, writeBuf); err != nil { return err } + writeBuf.Reset() - if err := cache.StoreAsCurrentBlock(block); err != nil { + if err := cache.StoreAsCurrentBlock(block, writeBuf); err != nil { return err } + writeBuf.Reset() if block.Index > 0 { systemInterop := bc.newInteropContext(trigger.System, cache, block, nil) @@ -608,17 +611,19 @@ func (bc *Blockchain) storeBlock(block *block.Block) error { Events: systemInterop.Notifications, } appExecResults = append(appExecResults, aer) - err := cache.PutAppExecResult(aer) + err := cache.PutAppExecResult(aer, writeBuf) if err != nil { return fmt.Errorf("failed to store onPersist exec result: %w", err) } + writeBuf.Reset() } var txHashes = make([]util.Uint256, len(block.Transactions)) for i, tx := range block.Transactions { - if err := cache.StoreAsTransaction(tx, block.Index); err != nil { + if err := cache.StoreAsTransaction(tx, block.Index, writeBuf); err != nil { return err } + writeBuf.Reset() systemInterop := bc.newInteropContext(trigger.Application, cache, block, tx) v := systemInterop.SpawnVM() @@ -650,10 +655,11 @@ func (bc *Blockchain) storeBlock(block *block.Block) error { Events: systemInterop.Notifications, } appExecResults = append(appExecResults, aer) - err = cache.PutAppExecResult(aer) + err = cache.PutAppExecResult(aer, writeBuf) if err != nil { return fmt.Errorf("failed to store tx exec result: %w", err) } + writeBuf.Reset() txHashes[i] = tx.Hash() } sort.Slice(txHashes, func(i, j int) bool { diff --git a/pkg/core/dao/dao.go b/pkg/core/dao/dao.go index efcc9f0cc..22030a974 100644 --- a/pkg/core/dao/dao.go +++ b/pkg/core/dao/dao.go @@ -44,16 +44,16 @@ type DAO interface { GetWrapped() DAO HasTransaction(hash util.Uint256) bool Persist() (int, error) - PutAppExecResult(aer *state.AppExecResult) error + PutAppExecResult(aer *state.AppExecResult, buf *io.BufBinWriter) error PutContractState(cs *state.Contract) error PutCurrentHeader(hashAndIndex []byte) error PutNEP5Balances(acc util.Uint160, bs *state.NEP5Balances) error PutNEP5TransferLog(acc util.Uint160, index uint32, lg *state.NEP5TransferLog) error PutStorageItem(id int32, key []byte, si *state.StorageItem) error PutVersion(v string) error - StoreAsBlock(block *block.Block) error - StoreAsCurrentBlock(block *block.Block) error - StoreAsTransaction(tx *transaction.Transaction, index uint32) error + StoreAsBlock(block *block.Block, buf *io.BufBinWriter) error + StoreAsCurrentBlock(block *block.Block, buf *io.BufBinWriter) error + StoreAsTransaction(tx *transaction.Transaction, index uint32, buf *io.BufBinWriter) error putNEP5Balances(acc util.Uint160, bs *state.NEP5Balances, buf *io.BufBinWriter) error } @@ -270,10 +270,13 @@ func (dao *Simple) GetAppExecResult(hash util.Uint256) (*state.AppExecResult, er } // PutAppExecResult puts given application execution result into the -// given store. -func (dao *Simple) PutAppExecResult(aer *state.AppExecResult) error { +// given store. It can reuse given buffer for the purpose of value serialization. +func (dao *Simple) PutAppExecResult(aer *state.AppExecResult, buf *io.BufBinWriter) error { key := storage.AppendPrefix(storage.STNotification, aer.TxHash.BytesBE()) - return dao.Put(aer, key) + if buf == nil { + return dao.Put(aer, key) + } + return dao.putWithBuffer(aer, key, buf) } // -- end notification event. @@ -560,12 +563,15 @@ func (dao *Simple) HasTransaction(hash util.Uint256) bool { return false } -// StoreAsBlock stores the given block as DataBlock. -func (dao *Simple) StoreAsBlock(block *block.Block) error { +// StoreAsBlock stores given block as DataBlock. It can reuse given buffer for +// the purpose of value serialization. +func (dao *Simple) StoreAsBlock(block *block.Block, buf *io.BufBinWriter) error { var ( key = storage.AppendPrefix(storage.DataBlock, block.Hash().BytesLE()) - buf = io.NewBufBinWriter() ) + if buf == nil { + buf = io.NewBufBinWriter() + } b, err := block.Trim() if err != nil { return err @@ -577,19 +583,26 @@ func (dao *Simple) StoreAsBlock(block *block.Block) error { return dao.Store.Put(key, buf.Bytes()) } -// StoreAsCurrentBlock stores the given block witch prefix SYSCurrentBlock. -func (dao *Simple) StoreAsCurrentBlock(block *block.Block) error { - buf := io.NewBufBinWriter() +// StoreAsCurrentBlock stores a hash of the given block with prefix +// SYSCurrentBlock. It can reuse given buffer for the purpose of value +// serialization. +func (dao *Simple) StoreAsCurrentBlock(block *block.Block, buf *io.BufBinWriter) error { + if buf == nil { + buf = io.NewBufBinWriter() + } h := block.Hash() h.EncodeBinary(buf.BinWriter) buf.WriteU32LE(block.Index) return dao.Store.Put(storage.SYSCurrentBlock.Bytes(), buf.Bytes()) } -// StoreAsTransaction stores the given TX as DataTransaction. -func (dao *Simple) StoreAsTransaction(tx *transaction.Transaction, index uint32) error { +// StoreAsTransaction stores given TX as DataTransaction. It can reuse given +// buffer for the purpose of value serialization. +func (dao *Simple) StoreAsTransaction(tx *transaction.Transaction, index uint32, buf *io.BufBinWriter) error { key := storage.AppendPrefix(storage.DataTransaction, tx.Hash().BytesLE()) - buf := io.NewBufBinWriter() + if buf == nil { + buf = io.NewBufBinWriter() + } buf.WriteU32LE(index) tx.EncodeBinary(buf.BinWriter) if buf.Err != nil { diff --git a/pkg/core/dao/dao_test.go b/pkg/core/dao/dao_test.go index 64a85accd..9fcbd0676 100644 --- a/pkg/core/dao/dao_test.go +++ b/pkg/core/dao/dao_test.go @@ -86,7 +86,7 @@ func TestPutGetAppExecResult(t *testing.T) { Events: []state.NotificationEvent{}, Stack: []stackitem.Item{}, } - err := dao.PutAppExecResult(appExecResult) + err := dao.PutAppExecResult(appExecResult, nil) require.NoError(t, err) gotAppExecResult, err := dao.GetAppExecResult(hash) require.NoError(t, err) @@ -136,7 +136,7 @@ func TestPutGetBlock(t *testing.T) { }, } hash := b.Hash() - err := dao.StoreAsBlock(b) + err := dao.StoreAsBlock(b, nil) require.NoError(t, err) gotBlock, err := dao.GetBlock(hash) require.NoError(t, err) @@ -176,7 +176,7 @@ func TestGetCurrentHeaderHeight_Store(t *testing.T) { }, }, } - err := dao.StoreAsCurrentBlock(b) + err := dao.StoreAsCurrentBlock(b, nil) require.NoError(t, err) height, err := dao.GetCurrentBlockHeight() require.NoError(t, err) @@ -187,7 +187,7 @@ func TestStoreAsTransaction(t *testing.T) { dao := NewSimple(storage.NewMemoryStore(), netmode.UnitTestNet) tx := transaction.New(netmode.UnitTestNet, []byte{byte(opcode.PUSH1)}, 1) hash := tx.Hash() - err := dao.StoreAsTransaction(tx, 0) + err := dao.StoreAsTransaction(tx, 0, nil) require.NoError(t, err) hasTransaction := dao.HasTransaction(hash) require.True(t, hasTransaction) diff --git a/pkg/core/interop_system_test.go b/pkg/core/interop_system_test.go index a7187cbb0..56eb18ade 100644 --- a/pkg/core/interop_system_test.go +++ b/pkg/core/interop_system_test.go @@ -27,7 +27,7 @@ func TestBCGetTransaction(t *testing.T) { defer chain.Close() t.Run("success", func(t *testing.T) { - require.NoError(t, context.DAO.StoreAsTransaction(tx, 0)) + require.NoError(t, context.DAO.StoreAsTransaction(tx, 0, nil)) v.Estack().PushVal(tx.Hash().BytesBE()) err := bcGetTransaction(context) require.NoError(t, err) @@ -47,7 +47,7 @@ func TestBCGetTransaction(t *testing.T) { }) t.Run("isn't traceable", func(t *testing.T) { - require.NoError(t, context.DAO.StoreAsTransaction(tx, 1)) + require.NoError(t, context.DAO.StoreAsTransaction(tx, 1, nil)) v.Estack().PushVal(tx.Hash().BytesBE()) err := bcGetTransaction(context) require.NoError(t, err) @@ -57,7 +57,7 @@ func TestBCGetTransaction(t *testing.T) { }) t.Run("bad hash", func(t *testing.T) { - require.NoError(t, context.DAO.StoreAsTransaction(tx, 1)) + require.NoError(t, context.DAO.StoreAsTransaction(tx, 1, nil)) v.Estack().PushVal(tx.Hash().BytesLE()) err := bcGetTransaction(context) require.NoError(t, err) @@ -71,7 +71,7 @@ func TestBCGetTransactionFromBlock(t *testing.T) { v, block, context, chain := createVMAndBlock(t) defer chain.Close() require.NoError(t, chain.AddBlock(chain.newBlock())) - require.NoError(t, context.DAO.StoreAsBlock(block)) + require.NoError(t, context.DAO.StoreAsBlock(block, nil)) t.Run("success", func(t *testing.T) { v.Estack().PushVal(0) @@ -94,7 +94,7 @@ func TestBCGetTransactionFromBlock(t *testing.T) { t.Run("isn't traceable", func(t *testing.T) { block.Index = 2 - require.NoError(t, context.DAO.StoreAsBlock(block)) + require.NoError(t, context.DAO.StoreAsBlock(block, nil)) v.Estack().PushVal(0) v.Estack().PushVal(block.Hash().BytesBE()) err := bcGetTransactionFromBlock(context) @@ -106,7 +106,7 @@ func TestBCGetTransactionFromBlock(t *testing.T) { t.Run("bad block hash", func(t *testing.T) { block.Index = 1 - require.NoError(t, context.DAO.StoreAsBlock(block)) + require.NoError(t, context.DAO.StoreAsBlock(block, nil)) v.Estack().PushVal(0) v.Estack().PushVal(block.Hash().BytesLE()) err := bcGetTransactionFromBlock(context) @@ -117,7 +117,7 @@ func TestBCGetTransactionFromBlock(t *testing.T) { }) t.Run("bad transaction index", func(t *testing.T) { - require.NoError(t, context.DAO.StoreAsBlock(block)) + require.NoError(t, context.DAO.StoreAsBlock(block, nil)) v.Estack().PushVal(1) v.Estack().PushVal(block.Hash().BytesBE()) err := bcGetTransactionFromBlock(context)