dao: reuse buffers when storing blocks, txes and aers

Reduce memory allocation pressure.
This commit is contained in:
Roman Khimov 2020-08-31 22:11:49 +03:00
parent 097b2b8e78
commit a2d9b89964
4 changed files with 51 additions and 32 deletions

View file

@ -577,14 +577,17 @@ func (bc *Blockchain) GetStateRoot(height uint32) (*state.MPTRootState, error) {
// This is the only way to change Blockchain state. // This is the only way to change Blockchain state.
func (bc *Blockchain) storeBlock(block *block.Block) error { func (bc *Blockchain) storeBlock(block *block.Block) error {
cache := dao.NewCached(bc.dao) cache := dao.NewCached(bc.dao)
writeBuf := io.NewBufBinWriter()
appExecResults := make([]*state.AppExecResult, 0, 1+len(block.Transactions)) appExecResults := make([]*state.AppExecResult, 0, 1+len(block.Transactions))
if err := cache.StoreAsBlock(block); err != nil { if err := cache.StoreAsBlock(block, writeBuf); err != nil {
return err return err
} }
writeBuf.Reset()
if err := cache.StoreAsCurrentBlock(block); err != nil { if err := cache.StoreAsCurrentBlock(block, writeBuf); err != nil {
return err return err
} }
writeBuf.Reset()
if block.Index > 0 { if block.Index > 0 {
systemInterop := bc.newInteropContext(trigger.System, cache, block, nil) systemInterop := bc.newInteropContext(trigger.System, cache, block, nil)
@ -608,17 +611,19 @@ func (bc *Blockchain) storeBlock(block *block.Block) error {
Events: systemInterop.Notifications, Events: systemInterop.Notifications,
} }
appExecResults = append(appExecResults, aer) appExecResults = append(appExecResults, aer)
err := cache.PutAppExecResult(aer) err := cache.PutAppExecResult(aer, writeBuf)
if err != nil { if err != nil {
return fmt.Errorf("failed to store onPersist exec result: %w", err) return fmt.Errorf("failed to store onPersist exec result: %w", err)
} }
writeBuf.Reset()
} }
var txHashes = make([]util.Uint256, len(block.Transactions)) var txHashes = make([]util.Uint256, len(block.Transactions))
for i, tx := range block.Transactions { for i, tx := range block.Transactions {
if err := cache.StoreAsTransaction(tx, block.Index); err != nil { if err := cache.StoreAsTransaction(tx, block.Index, writeBuf); err != nil {
return err return err
} }
writeBuf.Reset()
systemInterop := bc.newInteropContext(trigger.Application, cache, block, tx) systemInterop := bc.newInteropContext(trigger.Application, cache, block, tx)
v := systemInterop.SpawnVM() v := systemInterop.SpawnVM()
@ -650,10 +655,11 @@ func (bc *Blockchain) storeBlock(block *block.Block) error {
Events: systemInterop.Notifications, Events: systemInterop.Notifications,
} }
appExecResults = append(appExecResults, aer) appExecResults = append(appExecResults, aer)
err = cache.PutAppExecResult(aer) err = cache.PutAppExecResult(aer, writeBuf)
if err != nil { if err != nil {
return fmt.Errorf("failed to store tx exec result: %w", err) return fmt.Errorf("failed to store tx exec result: %w", err)
} }
writeBuf.Reset()
txHashes[i] = tx.Hash() txHashes[i] = tx.Hash()
} }
sort.Slice(txHashes, func(i, j int) bool { sort.Slice(txHashes, func(i, j int) bool {

View file

@ -44,16 +44,16 @@ type DAO interface {
GetWrapped() DAO GetWrapped() DAO
HasTransaction(hash util.Uint256) bool HasTransaction(hash util.Uint256) bool
Persist() (int, error) Persist() (int, error)
PutAppExecResult(aer *state.AppExecResult) error PutAppExecResult(aer *state.AppExecResult, buf *io.BufBinWriter) error
PutContractState(cs *state.Contract) error PutContractState(cs *state.Contract) error
PutCurrentHeader(hashAndIndex []byte) error PutCurrentHeader(hashAndIndex []byte) error
PutNEP5Balances(acc util.Uint160, bs *state.NEP5Balances) error PutNEP5Balances(acc util.Uint160, bs *state.NEP5Balances) error
PutNEP5TransferLog(acc util.Uint160, index uint32, lg *state.NEP5TransferLog) error PutNEP5TransferLog(acc util.Uint160, index uint32, lg *state.NEP5TransferLog) error
PutStorageItem(id int32, key []byte, si *state.StorageItem) error PutStorageItem(id int32, key []byte, si *state.StorageItem) error
PutVersion(v string) error PutVersion(v string) error
StoreAsBlock(block *block.Block) error StoreAsBlock(block *block.Block, buf *io.BufBinWriter) error
StoreAsCurrentBlock(block *block.Block) error StoreAsCurrentBlock(block *block.Block, buf *io.BufBinWriter) error
StoreAsTransaction(tx *transaction.Transaction, index uint32) error StoreAsTransaction(tx *transaction.Transaction, index uint32, buf *io.BufBinWriter) error
putNEP5Balances(acc util.Uint160, bs *state.NEP5Balances, buf *io.BufBinWriter) error putNEP5Balances(acc util.Uint160, bs *state.NEP5Balances, buf *io.BufBinWriter) error
} }
@ -270,11 +270,14 @@ func (dao *Simple) GetAppExecResult(hash util.Uint256) (*state.AppExecResult, er
} }
// PutAppExecResult puts given application execution result into the // PutAppExecResult puts given application execution result into the
// given store. // given store. It can reuse given buffer for the purpose of value serialization.
func (dao *Simple) PutAppExecResult(aer *state.AppExecResult) error { func (dao *Simple) PutAppExecResult(aer *state.AppExecResult, buf *io.BufBinWriter) error {
key := storage.AppendPrefix(storage.STNotification, aer.TxHash.BytesBE()) key := storage.AppendPrefix(storage.STNotification, aer.TxHash.BytesBE())
if buf == nil {
return dao.Put(aer, key) return dao.Put(aer, key)
} }
return dao.putWithBuffer(aer, key, buf)
}
// -- end notification event. // -- end notification event.
@ -560,12 +563,15 @@ func (dao *Simple) HasTransaction(hash util.Uint256) bool {
return false return false
} }
// StoreAsBlock stores the given block as DataBlock. // StoreAsBlock stores given block as DataBlock. It can reuse given buffer for
func (dao *Simple) StoreAsBlock(block *block.Block) error { // the purpose of value serialization.
func (dao *Simple) StoreAsBlock(block *block.Block, buf *io.BufBinWriter) error {
var ( var (
key = storage.AppendPrefix(storage.DataBlock, block.Hash().BytesLE()) key = storage.AppendPrefix(storage.DataBlock, block.Hash().BytesLE())
buf = io.NewBufBinWriter()
) )
if buf == nil {
buf = io.NewBufBinWriter()
}
b, err := block.Trim() b, err := block.Trim()
if err != nil { if err != nil {
return err return err
@ -577,19 +583,26 @@ func (dao *Simple) StoreAsBlock(block *block.Block) error {
return dao.Store.Put(key, buf.Bytes()) return dao.Store.Put(key, buf.Bytes())
} }
// StoreAsCurrentBlock stores the given block witch prefix SYSCurrentBlock. // StoreAsCurrentBlock stores a hash of the given block with prefix
func (dao *Simple) StoreAsCurrentBlock(block *block.Block) error { // SYSCurrentBlock. It can reuse given buffer for the purpose of value
buf := io.NewBufBinWriter() // serialization.
func (dao *Simple) StoreAsCurrentBlock(block *block.Block, buf *io.BufBinWriter) error {
if buf == nil {
buf = io.NewBufBinWriter()
}
h := block.Hash() h := block.Hash()
h.EncodeBinary(buf.BinWriter) h.EncodeBinary(buf.BinWriter)
buf.WriteU32LE(block.Index) buf.WriteU32LE(block.Index)
return dao.Store.Put(storage.SYSCurrentBlock.Bytes(), buf.Bytes()) return dao.Store.Put(storage.SYSCurrentBlock.Bytes(), buf.Bytes())
} }
// StoreAsTransaction stores the given TX as DataTransaction. // StoreAsTransaction stores given TX as DataTransaction. It can reuse given
func (dao *Simple) StoreAsTransaction(tx *transaction.Transaction, index uint32) error { // buffer for the purpose of value serialization.
func (dao *Simple) StoreAsTransaction(tx *transaction.Transaction, index uint32, buf *io.BufBinWriter) error {
key := storage.AppendPrefix(storage.DataTransaction, tx.Hash().BytesLE()) key := storage.AppendPrefix(storage.DataTransaction, tx.Hash().BytesLE())
buf := io.NewBufBinWriter() if buf == nil {
buf = io.NewBufBinWriter()
}
buf.WriteU32LE(index) buf.WriteU32LE(index)
tx.EncodeBinary(buf.BinWriter) tx.EncodeBinary(buf.BinWriter)
if buf.Err != nil { if buf.Err != nil {

View file

@ -86,7 +86,7 @@ func TestPutGetAppExecResult(t *testing.T) {
Events: []state.NotificationEvent{}, Events: []state.NotificationEvent{},
Stack: []stackitem.Item{}, Stack: []stackitem.Item{},
} }
err := dao.PutAppExecResult(appExecResult) err := dao.PutAppExecResult(appExecResult, nil)
require.NoError(t, err) require.NoError(t, err)
gotAppExecResult, err := dao.GetAppExecResult(hash) gotAppExecResult, err := dao.GetAppExecResult(hash)
require.NoError(t, err) require.NoError(t, err)
@ -136,7 +136,7 @@ func TestPutGetBlock(t *testing.T) {
}, },
} }
hash := b.Hash() hash := b.Hash()
err := dao.StoreAsBlock(b) err := dao.StoreAsBlock(b, nil)
require.NoError(t, err) require.NoError(t, err)
gotBlock, err := dao.GetBlock(hash) gotBlock, err := dao.GetBlock(hash)
require.NoError(t, err) require.NoError(t, err)
@ -176,7 +176,7 @@ func TestGetCurrentHeaderHeight_Store(t *testing.T) {
}, },
}, },
} }
err := dao.StoreAsCurrentBlock(b) err := dao.StoreAsCurrentBlock(b, nil)
require.NoError(t, err) require.NoError(t, err)
height, err := dao.GetCurrentBlockHeight() height, err := dao.GetCurrentBlockHeight()
require.NoError(t, err) require.NoError(t, err)
@ -187,7 +187,7 @@ func TestStoreAsTransaction(t *testing.T) {
dao := NewSimple(storage.NewMemoryStore(), netmode.UnitTestNet) dao := NewSimple(storage.NewMemoryStore(), netmode.UnitTestNet)
tx := transaction.New(netmode.UnitTestNet, []byte{byte(opcode.PUSH1)}, 1) tx := transaction.New(netmode.UnitTestNet, []byte{byte(opcode.PUSH1)}, 1)
hash := tx.Hash() hash := tx.Hash()
err := dao.StoreAsTransaction(tx, 0) err := dao.StoreAsTransaction(tx, 0, nil)
require.NoError(t, err) require.NoError(t, err)
hasTransaction := dao.HasTransaction(hash) hasTransaction := dao.HasTransaction(hash)
require.True(t, hasTransaction) require.True(t, hasTransaction)

View file

@ -27,7 +27,7 @@ func TestBCGetTransaction(t *testing.T) {
defer chain.Close() defer chain.Close()
t.Run("success", func(t *testing.T) { t.Run("success", func(t *testing.T) {
require.NoError(t, context.DAO.StoreAsTransaction(tx, 0)) require.NoError(t, context.DAO.StoreAsTransaction(tx, 0, nil))
v.Estack().PushVal(tx.Hash().BytesBE()) v.Estack().PushVal(tx.Hash().BytesBE())
err := bcGetTransaction(context) err := bcGetTransaction(context)
require.NoError(t, err) require.NoError(t, err)
@ -47,7 +47,7 @@ func TestBCGetTransaction(t *testing.T) {
}) })
t.Run("isn't traceable", func(t *testing.T) { t.Run("isn't traceable", func(t *testing.T) {
require.NoError(t, context.DAO.StoreAsTransaction(tx, 1)) require.NoError(t, context.DAO.StoreAsTransaction(tx, 1, nil))
v.Estack().PushVal(tx.Hash().BytesBE()) v.Estack().PushVal(tx.Hash().BytesBE())
err := bcGetTransaction(context) err := bcGetTransaction(context)
require.NoError(t, err) require.NoError(t, err)
@ -57,7 +57,7 @@ func TestBCGetTransaction(t *testing.T) {
}) })
t.Run("bad hash", func(t *testing.T) { t.Run("bad hash", func(t *testing.T) {
require.NoError(t, context.DAO.StoreAsTransaction(tx, 1)) require.NoError(t, context.DAO.StoreAsTransaction(tx, 1, nil))
v.Estack().PushVal(tx.Hash().BytesLE()) v.Estack().PushVal(tx.Hash().BytesLE())
err := bcGetTransaction(context) err := bcGetTransaction(context)
require.NoError(t, err) require.NoError(t, err)
@ -71,7 +71,7 @@ func TestBCGetTransactionFromBlock(t *testing.T) {
v, block, context, chain := createVMAndBlock(t) v, block, context, chain := createVMAndBlock(t)
defer chain.Close() defer chain.Close()
require.NoError(t, chain.AddBlock(chain.newBlock())) require.NoError(t, chain.AddBlock(chain.newBlock()))
require.NoError(t, context.DAO.StoreAsBlock(block)) require.NoError(t, context.DAO.StoreAsBlock(block, nil))
t.Run("success", func(t *testing.T) { t.Run("success", func(t *testing.T) {
v.Estack().PushVal(0) v.Estack().PushVal(0)
@ -94,7 +94,7 @@ func TestBCGetTransactionFromBlock(t *testing.T) {
t.Run("isn't traceable", func(t *testing.T) { t.Run("isn't traceable", func(t *testing.T) {
block.Index = 2 block.Index = 2
require.NoError(t, context.DAO.StoreAsBlock(block)) require.NoError(t, context.DAO.StoreAsBlock(block, nil))
v.Estack().PushVal(0) v.Estack().PushVal(0)
v.Estack().PushVal(block.Hash().BytesBE()) v.Estack().PushVal(block.Hash().BytesBE())
err := bcGetTransactionFromBlock(context) err := bcGetTransactionFromBlock(context)
@ -106,7 +106,7 @@ func TestBCGetTransactionFromBlock(t *testing.T) {
t.Run("bad block hash", func(t *testing.T) { t.Run("bad block hash", func(t *testing.T) {
block.Index = 1 block.Index = 1
require.NoError(t, context.DAO.StoreAsBlock(block)) require.NoError(t, context.DAO.StoreAsBlock(block, nil))
v.Estack().PushVal(0) v.Estack().PushVal(0)
v.Estack().PushVal(block.Hash().BytesLE()) v.Estack().PushVal(block.Hash().BytesLE())
err := bcGetTransactionFromBlock(context) err := bcGetTransactionFromBlock(context)
@ -117,7 +117,7 @@ func TestBCGetTransactionFromBlock(t *testing.T) {
}) })
t.Run("bad transaction index", func(t *testing.T) { t.Run("bad transaction index", func(t *testing.T) {
require.NoError(t, context.DAO.StoreAsBlock(block)) require.NoError(t, context.DAO.StoreAsBlock(block, nil))
v.Estack().PushVal(1) v.Estack().PushVal(1)
v.Estack().PushVal(block.Hash().BytesBE()) v.Estack().PushVal(block.Hash().BytesBE())
err := bcGetTransactionFromBlock(context) err := bcGetTransactionFromBlock(context)