From 4c39b6600db7ce1dd880a935e55cbebd4e710066 Mon Sep 17 00:00:00 2001 From: Roman Khimov Date: Tue, 7 Dec 2021 23:05:28 +0300 Subject: [PATCH] *: store application long along with tx/block Two times less keys inserted into the DB per tx leads to ~13% TPS improvement. We also drop one goroutine with it which isn't bad as well. --- pkg/core/block/block.go | 5 +- pkg/core/block/block_test.go | 3 +- pkg/core/blockchain.go | 76 +++++++-------------- pkg/core/blockchain_test.go | 4 +- pkg/core/dao/dao.go | 119 ++++++++++++++++----------------- pkg/core/dao/dao_test.go | 83 +++++++++++++++++------ pkg/core/native_ledger_test.go | 12 ++-- pkg/core/statesync/module.go | 4 +- pkg/core/storage/store.go | 18 +++-- pkg/core/storage/store_test.go | 4 +- 10 files changed, 168 insertions(+), 160 deletions(-) diff --git a/pkg/core/block/block.go b/pkg/core/block/block.go index a7bf0770c..028b278d2 100644 --- a/pkg/core/block/block.go +++ b/pkg/core/block/block.go @@ -62,11 +62,11 @@ func (b *Block) RebuildMerkleRoot() { b.MerkleRoot = b.ComputeMerkleRoot() } -// NewBlockFromTrimmedBytes returns a new block from trimmed data. +// NewTrimmedFromReader returns a new block from trimmed data. // This is commonly used to create a block from stored data. // Blocks created from trimmed data will have their Trimmed field // set to true. -func NewBlockFromTrimmedBytes(stateRootEnabled bool, b []byte) (*Block, error) { +func NewTrimmedFromReader(stateRootEnabled bool, br *io.BinReader) (*Block, error) { block := &Block{ Header: Header{ StateRootEnabled: stateRootEnabled, @@ -74,7 +74,6 @@ func NewBlockFromTrimmedBytes(stateRootEnabled bool, b []byte) (*Block, error) { Trimmed: true, } - br := io.NewBinReaderFromBuf(b) block.Header.DecodeBinary(br) lenHashes := br.ReadVarUint() if lenHashes > MaxTransactionsPerBlock { diff --git a/pkg/core/block/block_test.go b/pkg/core/block/block_test.go index a5b178db9..e4995dfe2 100644 --- a/pkg/core/block/block_test.go +++ b/pkg/core/block/block_test.go @@ -58,7 +58,8 @@ func TestTrimmedBlock(t *testing.T) { b, err := block.Trim() require.NoError(t, err) - trimmedBlock, err := NewBlockFromTrimmedBytes(false, b) + r := io.NewBinReaderFromBuf(b) + trimmedBlock, err := NewTrimmedFromReader(false, r) require.NoError(t, err) assert.True(t, trimmedBlock.Trimmed) diff --git a/pkg/core/blockchain.go b/pkg/core/blockchain.go index fd96656b7..d30ead62c 100644 --- a/pkg/core/blockchain.go +++ b/pkg/core/blockchain.go @@ -44,7 +44,7 @@ import ( // Tuning parameters. const ( headerBatchCount = 2000 - version = "0.2.0" + version = "0.2.1" defaultInitialGAS = 52000000_00000000 defaultMemPoolSize = 50000 @@ -883,13 +883,14 @@ func (bc *Blockchain) addHeaders(verify bool, headers ...*block.Header) error { continue } bc.headerHashes = append(bc.headerHashes, h.Hash()) + buf.WriteB(storage.ExecBlock) h.EncodeBinary(buf.BinWriter) buf.BinWriter.WriteB(0) if buf.Err != nil { return buf.Err } - key := storage.AppendPrefix(storage.DataBlock, h.Hash().BytesBE()) + key := storage.AppendPrefix(storage.DataExecutable, h.Hash().BytesBE()) batch.Put(key, buf.Bytes()) buf.Reset() lastHeader = h @@ -936,38 +937,25 @@ func (bc *Blockchain) GetStateSyncModule() blockchainer.StateSync { func (bc *Blockchain) storeBlock(block *block.Block, txpool *mempool.Pool) error { var ( cache = bc.dao.GetWrapped() - blockCache = bc.dao.GetWrapped() aerCache = bc.dao.GetWrapped() appExecResults = make([]*state.AppExecResult, 0, 2+len(block.Transactions)) aerchan = make(chan *state.AppExecResult, len(block.Transactions)/8) // Tested 8 and 4 with no practical difference, but feel free to test more and tune. aerdone = make(chan error) - blockdone = make(chan error) ) go func() { var ( - kvcache = blockCache - writeBuf = io.NewBufBinWriter() + kvcache = aerCache + writeBuf = io.NewBufBinWriter() + err error + txCnt int + baer1, baer2 *state.AppExecResult + transCache = make(map[util.Uint160]transferData) ) - if err := kvcache.StoreAsBlock(block, writeBuf); err != nil { - blockdone <- err - return - } - writeBuf.Reset() - if err := kvcache.StoreAsCurrentBlock(block, writeBuf); err != nil { - blockdone <- err + aerdone <- err return } writeBuf.Reset() - - for _, tx := range block.Transactions { - if err := kvcache.StoreAsTransaction(tx, block.Index, writeBuf); err != nil { - blockdone <- err - return - } - - writeBuf.Reset() - } if bc.config.RemoveUntraceableBlocks { var start, stop uint32 if bc.config.P2PStateExchangeExtensions { @@ -994,24 +982,16 @@ func (bc *Blockchain) storeBlock(block *block.Block, txpool *mempool.Pool) error writeBuf.Reset() } } - close(blockdone) - }() - go func() { - var ( - kvcache = aerCache - writeBuf = io.NewBufBinWriter() - err error - appendBlock bool - transCache = make(map[util.Uint160]transferData) - ) for aer := range aerchan { - if aer.Container == block.Hash() && appendBlock { - err = kvcache.AppendAppExecResult(aer, writeBuf) - } else { - err = kvcache.PutAppExecResult(aer, writeBuf) - if aer.Container == block.Hash() { - appendBlock = true + if aer.Container == block.Hash() { + if baer1 == nil { + baer1 = aer + } else { + baer2 = aer } + } else { + err = kvcache.StoreAsTransaction(block.Transactions[txCnt], block.Index, aer, writeBuf) + txCnt++ } if err != nil { err = fmt.Errorf("failed to store exec result: %w", err) @@ -1028,6 +1008,11 @@ func (bc *Blockchain) storeBlock(block *block.Block, txpool *mempool.Pool) error aerdone <- err return } + if err := kvcache.StoreAsBlock(block, baer1, baer2, writeBuf); err != nil { + aerdone <- err + return + } + writeBuf.Reset() for acc, trData := range transCache { err = kvcache.PutTokenTransferInfo(acc, &trData.Info) if err != nil { @@ -1055,7 +1040,6 @@ func (bc *Blockchain) storeBlock(block *block.Block, txpool *mempool.Pool) error if err != nil { // Release goroutines, don't care about errors, we already have one. close(aerchan) - <-blockdone <-aerdone return fmt.Errorf("onPersist failed: %w", err) } @@ -1077,7 +1061,6 @@ func (bc *Blockchain) storeBlock(block *block.Block, txpool *mempool.Pool) error if err != nil { // Release goroutines, don't care about errors, we already have one. close(aerchan) - <-blockdone <-aerdone return fmt.Errorf("failed to persist invocation results: %w", err) } @@ -1107,7 +1090,6 @@ func (bc *Blockchain) storeBlock(block *block.Block, txpool *mempool.Pool) error if err != nil { // Release goroutines, don't care about errors, we already have one. close(aerchan) - <-blockdone <-aerdone return fmt.Errorf("postPersist failed: %w", err) } @@ -1119,7 +1101,6 @@ func (bc *Blockchain) storeBlock(block *block.Block, txpool *mempool.Pool) error mpt, sr, err := bc.stateRoot.AddMPTBatch(block.Index, b, d.Store) if err != nil { // Release goroutines, don't care about errors, we already have one. - <-blockdone <-aerdone // Here MPT can be left in a half-applied state. // However if this error occurs, this is a bug somewhere in code @@ -1135,7 +1116,6 @@ func (bc *Blockchain) storeBlock(block *block.Block, txpool *mempool.Pool) error } if err != nil { // Release goroutines, don't care about errors, we already have one. - <-blockdone <-aerdone return err } @@ -1152,22 +1132,12 @@ func (bc *Blockchain) storeBlock(block *block.Block, txpool *mempool.Pool) error mpt.Collapse(10) } - // Wait for _both_ goroutines to finish. - blockerr := <-blockdone aererr := <-aerdone - if blockerr != nil { - return blockerr - } if aererr != nil { return aererr } bc.lock.Lock() - _, err = blockCache.Persist() - if err != nil { - bc.lock.Unlock() - return err - } _, err = aerCache.Persist() if err != nil { bc.lock.Unlock() diff --git a/pkg/core/blockchain_test.go b/pkg/core/blockchain_test.go index 1276f37e3..2e309189a 100644 --- a/pkg/core/blockchain_test.go +++ b/pkg/core/blockchain_test.go @@ -126,7 +126,7 @@ func TestAddBlock(t *testing.T) { require.NoError(t, err) for _, block := range blocks { - key := storage.AppendPrefix(storage.DataBlock, block.Hash().BytesBE()) + key := storage.AppendPrefix(storage.DataExecutable, block.Hash().BytesBE()) _, err := bc.dao.Store.Get(key) require.NoErrorf(t, err, "block %s not persisted", block.Hash()) } @@ -805,7 +805,7 @@ func TestVerifyTx(t *testing.T) { }, }, } - require.NoError(t, bc.dao.StoreAsTransaction(conflicting, bc.blockHeight, nil)) + require.NoError(t, bc.dao.StoreAsTransaction(conflicting, bc.blockHeight, nil, nil)) require.True(t, errors.Is(bc.VerifyTx(tx), ErrHasConflicts)) }) t.Run("attribute on-chain conflict", func(t *testing.T) { diff --git a/pkg/core/dao/dao.go b/pkg/core/dao/dao.go index e0e9bad9b..4220f6e0e 100644 --- a/pkg/core/dao/dao.go +++ b/pkg/core/dao/dao.go @@ -30,7 +30,6 @@ var ( // DAO is a data access object. type DAO interface { - AppendAppExecResult(aer *state.AppExecResult, buf *io.BufBinWriter) error DeleteBlock(h util.Uint256, buf *io.BufBinWriter) error DeleteContractID(id int32) error DeleteStorageItem(id int32, key []byte) error @@ -55,7 +54,6 @@ type DAO interface { HasTransaction(hash util.Uint256) error Persist() (int, error) PersistSync() (int, error) - PutAppExecResult(aer *state.AppExecResult, buf *io.BufBinWriter) error PutContractID(id int32, hash util.Uint160) error PutCurrentHeader(hashAndIndex []byte) error PutTokenTransferInfo(acc util.Uint160, bs *state.TokenTransferInfo) error @@ -66,9 +64,9 @@ type DAO interface { PutVersion(v Version) error Seek(id int32, prefix []byte, f func(k, v []byte)) SeekAsync(ctx context.Context, id int32, prefix []byte) chan storage.KeyValue - StoreAsBlock(block *block.Block, buf *io.BufBinWriter) error + StoreAsBlock(block *block.Block, aer1 *state.AppExecResult, aer2 *state.AppExecResult, buf *io.BufBinWriter) error StoreAsCurrentBlock(block *block.Block, buf *io.BufBinWriter) error - StoreAsTransaction(tx *transaction.Transaction, index uint32, buf *io.BufBinWriter) error + StoreAsTransaction(tx *transaction.Transaction, index uint32, aer *state.AppExecResult, buf *io.BufBinWriter) error putTokenTransferInfo(acc util.Uint160, bs *state.TokenTransferInfo, buf *io.BufBinWriter) error } @@ -220,12 +218,26 @@ func (dao *Simple) PutTokenTransferLog(acc util.Uint160, index uint32, isNEP11 b // GetAppExecResults gets application execution results with the specified trigger from the // given store. func (dao *Simple) GetAppExecResults(hash util.Uint256, trig trigger.Type) ([]state.AppExecResult, error) { - key := storage.AppendPrefix(storage.STNotification, hash.BytesBE()) - aers, err := dao.Store.Get(key) + key := storage.AppendPrefix(storage.DataExecutable, hash.BytesBE()) + bs, err := dao.Store.Get(key) if err != nil { return nil, err } - r := io.NewBinReaderFromBuf(aers) + r := io.NewBinReaderFromBuf(bs) + switch r.ReadB() { + case storage.ExecBlock: + _, err = block.NewTrimmedFromReader(dao.Version.StateRootInHeader, r) + if err != nil { + return nil, err + } + case storage.ExecTransaction: + _ = r.ReadU32LE() + tx := &transaction.Transaction{} + tx.DecodeBinary(r) + } + if r.Err != nil { + return nil, r.Err + } result := make([]state.AppExecResult, 0, 2) for { aer := new(state.AppExecResult) @@ -243,39 +255,6 @@ func (dao *Simple) GetAppExecResults(hash util.Uint256, trig trigger.Type) ([]st return result, nil } -// AppendAppExecResult appends given application execution result to the existing -// set of execution results for the corresponding hash. It can reuse given buffer -// for the purpose of value serialization. -func (dao *Simple) AppendAppExecResult(aer *state.AppExecResult, buf *io.BufBinWriter) error { - key := storage.AppendPrefix(storage.STNotification, aer.Container.BytesBE()) - aers, err := dao.Store.Get(key) - if err != nil && err != storage.ErrKeyNotFound { - return err - } - if len(aers) == 0 { - return dao.PutAppExecResult(aer, buf) - } - if buf == nil { - buf = io.NewBufBinWriter() - } - aer.EncodeBinary(buf.BinWriter) - if buf.Err != nil { - return buf.Err - } - aers = append(aers, buf.Bytes()...) - return dao.Store.Put(key, aers) -} - -// PutAppExecResult puts given application execution result into the -// given store. It can reuse given buffer for the purpose of value serialization. -func (dao *Simple) PutAppExecResult(aer *state.AppExecResult, buf *io.BufBinWriter) error { - key := storage.AppendPrefix(storage.STNotification, aer.Container.BytesBE()) - if buf == nil { - return dao.Put(aer, key) - } - return dao.putWithBuffer(aer, key, buf) -} - // -- end notification event. // -- start storage item. @@ -363,13 +342,17 @@ func makeStorageItemKey(prefix storage.KeyPrefix, id int32, key []byte) []byte { // GetBlock returns Block by the given hash if it exists in the store. func (dao *Simple) GetBlock(hash util.Uint256) (*block.Block, error) { - key := storage.AppendPrefix(storage.DataBlock, hash.BytesBE()) + key := storage.AppendPrefix(storage.DataExecutable, hash.BytesBE()) b, err := dao.Store.Get(key) if err != nil { return nil, err } - block, err := block.NewBlockFromTrimmedBytes(dao.Version.StateRootInHeader, b) + r := io.NewBinReaderFromBuf(b) + if r.ReadB() != storage.ExecBlock { + return nil, errors.New("internal DB inconsistency") + } + block, err := block.NewTrimmedFromReader(dao.Version.StateRootInHeader, r) if err != nil { return nil, err } @@ -525,18 +508,22 @@ func (dao *Simple) GetHeaderHashes() ([]util.Uint256, error) { // GetTransaction returns Transaction and its height by the given hash // if it exists in the store. It does not return dummy transactions. func (dao *Simple) GetTransaction(hash util.Uint256) (*transaction.Transaction, uint32, error) { - key := storage.AppendPrefix(storage.DataTransaction, hash.BytesBE()) + key := storage.AppendPrefix(storage.DataExecutable, hash.BytesBE()) b, err := dao.Store.Get(key) if err != nil { return nil, 0, err } - if len(b) < 5 { + if len(b) < 6 { return nil, 0, errors.New("bad transaction bytes") } - if b[4] == transaction.DummyVersion { + if b[0] != storage.ExecTransaction { + return nil, 0, errors.New("internal DB inconsistency") + } + if b[5] == transaction.DummyVersion { return nil, 0, storage.ErrKeyNotFound } r := io.NewBinReaderFromBuf(b) + _ = r.ReadB() var height = r.ReadU32LE() @@ -591,16 +578,16 @@ func read2000Uint256Hashes(b []byte) ([]util.Uint256, error) { // Transaction hash. It returns an error in case if transaction is in chain // or in the list of conflicting transactions. func (dao *Simple) HasTransaction(hash util.Uint256) error { - key := storage.AppendPrefix(storage.DataTransaction, hash.BytesBE()) + key := storage.AppendPrefix(storage.DataExecutable, hash.BytesBE()) bytes, err := dao.Store.Get(key) if err != nil { return nil } - if len(bytes) < 5 { + if len(bytes) < 6 { return nil } - if bytes[4] == transaction.DummyVersion { + if bytes[5] == transaction.DummyVersion { return ErrHasConflicts } return ErrAlreadyExists @@ -608,18 +595,25 @@ func (dao *Simple) HasTransaction(hash util.Uint256) error { // StoreAsBlock stores given block as DataBlock. It can reuse given buffer for // the purpose of value serialization. -func (dao *Simple) StoreAsBlock(block *block.Block, buf *io.BufBinWriter) error { +func (dao *Simple) StoreAsBlock(block *block.Block, aer1 *state.AppExecResult, aer2 *state.AppExecResult, buf *io.BufBinWriter) error { var ( - key = storage.AppendPrefix(storage.DataBlock, block.Hash().BytesBE()) + key = storage.AppendPrefix(storage.DataExecutable, block.Hash().BytesBE()) ) if buf == nil { buf = io.NewBufBinWriter() } + buf.WriteB(storage.ExecBlock) b, err := block.Trim() if err != nil { return err } buf.WriteBytes(b) + if aer1 != nil { + aer1.EncodeBinary(buf.BinWriter) + } + if aer2 != nil { + aer2.EncodeBinary(buf.BinWriter) + } if buf.Err != nil { return buf.Err } @@ -630,14 +624,18 @@ func (dao *Simple) StoreAsBlock(block *block.Block, buf *io.BufBinWriter) error func (dao *Simple) DeleteBlock(h util.Uint256, w *io.BufBinWriter) error { batch := dao.Store.Batch() key := make([]byte, util.Uint256Size+1) - key[0] = byte(storage.DataBlock) + key[0] = byte(storage.DataExecutable) copy(key[1:], h.BytesBE()) bs, err := dao.Store.Get(key) if err != nil { return err } - b, err := block.NewBlockFromTrimmedBytes(dao.Version.StateRootInHeader, bs) + r := io.NewBinReaderFromBuf(bs) + if r.ReadB() != storage.ExecBlock { + return errors.New("internal DB inconsistency") + } + b, err := block.NewTrimmedFromReader(dao.Version.StateRootInHeader, r) if err != nil { return err } @@ -645,6 +643,7 @@ func (dao *Simple) DeleteBlock(h util.Uint256, w *io.BufBinWriter) error { if w == nil { w = io.NewBufBinWriter() } + w.WriteB(storage.ExecBlock) b.Header.EncodeBinary(w.BinWriter) w.BinWriter.WriteB(0) if w.Err != nil { @@ -652,7 +651,6 @@ func (dao *Simple) DeleteBlock(h util.Uint256, w *io.BufBinWriter) error { } batch.Put(key, w.Bytes()) - key[0] = byte(storage.DataTransaction) for _, tx := range b.Transactions { copy(key[1:], tx.Hash().BytesBE()) batch.Delete(key) @@ -663,14 +661,8 @@ func (dao *Simple) DeleteBlock(h util.Uint256, w *io.BufBinWriter) error { batch.Delete(key) } } - key[0] = byte(storage.STNotification) - batch.Delete(key) } - key[0] = byte(storage.STNotification) - copy(key[1:], h.BytesBE()) - batch.Delete(key) - return dao.Store.PutBatch(batch) } @@ -690,13 +682,17 @@ func (dao *Simple) StoreAsCurrentBlock(block *block.Block, buf *io.BufBinWriter) // StoreAsTransaction stores given TX as DataTransaction. It also stores transactions // given tx has conflicts with as DataTransaction with dummy version. It can reuse given // buffer for the purpose of value serialization. -func (dao *Simple) StoreAsTransaction(tx *transaction.Transaction, index uint32, buf *io.BufBinWriter) error { - key := storage.AppendPrefix(storage.DataTransaction, tx.Hash().BytesBE()) +func (dao *Simple) StoreAsTransaction(tx *transaction.Transaction, index uint32, aer *state.AppExecResult, buf *io.BufBinWriter) error { + key := storage.AppendPrefix(storage.DataExecutable, tx.Hash().BytesBE()) if buf == nil { buf = io.NewBufBinWriter() } + buf.WriteB(storage.ExecTransaction) buf.WriteU32LE(index) tx.EncodeBinary(buf.BinWriter) + if aer != nil { + aer.EncodeBinary(buf.BinWriter) + } if buf.Err != nil { return buf.Err } @@ -711,6 +707,7 @@ func (dao *Simple) StoreAsTransaction(tx *transaction.Transaction, index uint32, copy(key[1:], hash.BytesBE()) if value == nil { buf.Reset() + buf.WriteB(storage.ExecTransaction) buf.WriteU32LE(index) buf.BinWriter.WriteB(transaction.DummyVersion) value = buf.Bytes() diff --git a/pkg/core/dao/dao_test.go b/pkg/core/dao/dao_test.go index 29c6de4b8..fa3fb9d41 100644 --- a/pkg/core/dao/dao_test.go +++ b/pkg/core/dao/dao_test.go @@ -43,24 +43,6 @@ func (t *TestSerializable) DecodeBinary(reader *io.BinReader) { t.field = reader.ReadString() } -func TestPutGetAppExecResult(t *testing.T) { - dao := NewSimple(storage.NewMemoryStore(), false, false) - hash := random.Uint256() - appExecResult := &state.AppExecResult{ - Container: hash, - Execution: state.Execution{ - Trigger: trigger.Application, - Events: []state.NotificationEvent{}, - Stack: []stackitem.Item{}, - }, - } - err := dao.AppendAppExecResult(appExecResult, nil) - require.NoError(t, err) - gotAppExecResult, err := dao.GetAppExecResults(hash, trigger.All) - require.NoError(t, err) - require.Equal(t, []state.AppExecResult{*appExecResult}, gotAppExecResult) -} - func TestPutGetStorageItem(t *testing.T) { dao := NewSimple(storage.NewMemoryStore(), false, false) id := int32(random.Int(0, 1024)) @@ -104,11 +86,32 @@ func TestPutGetBlock(t *testing.T) { }, } hash := b.Hash() - err := dao.StoreAsBlock(b, nil) + appExecResult1 := &state.AppExecResult{ + Container: hash, + Execution: state.Execution{ + Trigger: trigger.OnPersist, + Events: []state.NotificationEvent{}, + Stack: []stackitem.Item{}, + }, + } + appExecResult2 := &state.AppExecResult{ + Container: hash, + Execution: state.Execution{ + Trigger: trigger.PostPersist, + Events: []state.NotificationEvent{}, + Stack: []stackitem.Item{}, + }, + } + err := dao.StoreAsBlock(b, appExecResult1, appExecResult2, nil) require.NoError(t, err) gotBlock, err := dao.GetBlock(hash) require.NoError(t, err) require.NotNil(t, gotBlock) + gotAppExecResult, err := dao.GetAppExecResults(hash, trigger.All) + require.NoError(t, err) + require.Equal(t, 2, len(gotAppExecResult)) + require.Equal(t, *appExecResult1, gotAppExecResult[0]) + require.Equal(t, *appExecResult2, gotAppExecResult[1]) } func TestGetVersion_NoVersion(t *testing.T) { @@ -177,17 +180,33 @@ func TestStoreAsTransaction(t *testing.T) { t.Run("P2PSigExtensions off", func(t *testing.T) { dao := NewSimple(storage.NewMemoryStore(), false, false) tx := transaction.New([]byte{byte(opcode.PUSH1)}, 1) + tx.Signers = append(tx.Signers, transaction.Signer{}) + tx.Scripts = append(tx.Scripts, transaction.Witness{}) hash := tx.Hash() - err := dao.StoreAsTransaction(tx, 0, nil) + aer := &state.AppExecResult{ + Container: hash, + Execution: state.Execution{ + Trigger: trigger.Application, + Events: []state.NotificationEvent{}, + Stack: []stackitem.Item{}, + }, + } + err := dao.StoreAsTransaction(tx, 0, aer, nil) require.NoError(t, err) err = dao.HasTransaction(hash) require.NotNil(t, err) + gotAppExecResult, err := dao.GetAppExecResults(hash, trigger.All) + require.NoError(t, err) + require.Equal(t, 1, len(gotAppExecResult)) + require.Equal(t, *aer, gotAppExecResult[0]) }) t.Run("P2PSigExtensions on", func(t *testing.T) { dao := NewSimple(storage.NewMemoryStore(), false, true) conflictsH := util.Uint256{1, 2, 3} tx := transaction.New([]byte{byte(opcode.PUSH1)}, 1) + tx.Signers = append(tx.Signers, transaction.Signer{}) + tx.Scripts = append(tx.Scripts, transaction.Witness{}) tx.Attributes = []transaction.Attribute{ { Type: transaction.ConflictsT, @@ -195,12 +214,24 @@ func TestStoreAsTransaction(t *testing.T) { }, } hash := tx.Hash() - err := dao.StoreAsTransaction(tx, 0, nil) + aer := &state.AppExecResult{ + Container: hash, + Execution: state.Execution{ + Trigger: trigger.Application, + Events: []state.NotificationEvent{}, + Stack: []stackitem.Item{}, + }, + } + err := dao.StoreAsTransaction(tx, 0, aer, nil) require.NoError(t, err) err = dao.HasTransaction(hash) require.True(t, errors.Is(err, ErrAlreadyExists)) err = dao.HasTransaction(conflictsH) require.True(t, errors.Is(err, ErrHasConflicts)) + gotAppExecResult, err := dao.GetAppExecResults(hash, trigger.All) + require.NoError(t, err) + require.Equal(t, 1, len(gotAppExecResult)) + require.Equal(t, *aer, gotAppExecResult[0]) }) } @@ -228,11 +259,19 @@ func BenchmarkStoreAsTransaction(b *testing.B) { }, } _ = tx.Hash() + aer := &state.AppExecResult{ + Container: tx.Hash(), + Execution: state.Execution{ + Trigger: trigger.Application, + Events: []state.NotificationEvent{}, + Stack: []stackitem.Item{}, + }, + } b.ResetTimer() b.ReportAllocs() for n := 0; n < b.N; n++ { - err := dao.StoreAsTransaction(tx, 1, nil) + err := dao.StoreAsTransaction(tx, 1, aer, nil) if err != nil { b.FailNow() } diff --git a/pkg/core/native_ledger_test.go b/pkg/core/native_ledger_test.go index 859ccef23..e8166dff4 100644 --- a/pkg/core/native_ledger_test.go +++ b/pkg/core/native_ledger_test.go @@ -18,7 +18,7 @@ func TestLedgerGetTransactionHeight(t *testing.T) { for i := 0; i < 13; i++ { require.NoError(t, chain.AddBlock(chain.newBlock())) } - require.NoError(t, chain.dao.StoreAsTransaction(tx, 13, nil)) + require.NoError(t, chain.dao.StoreAsTransaction(tx, 13, nil, nil)) t.Run("good", func(t *testing.T) { res, err := invokeContractMethod(chain, 100000000, ledger, "getTransactionHeight", tx.Hash().BytesBE()) require.NoError(t, err) @@ -41,7 +41,7 @@ func TestLedgerGetTransaction(t *testing.T) { ledger := chain.contracts.ByName(nativenames.Ledger).Metadata().Hash t.Run("success", func(t *testing.T) { - require.NoError(t, chain.dao.StoreAsTransaction(tx, 0, nil)) + require.NoError(t, chain.dao.StoreAsTransaction(tx, 0, nil, nil)) res, err := invokeContractMethod(chain, 100000000, ledger, "getTransaction", tx.Hash().BytesBE()) require.NoError(t, err) @@ -63,13 +63,13 @@ func TestLedgerGetTransaction(t *testing.T) { }) t.Run("isn't traceable", func(t *testing.T) { - require.NoError(t, chain.dao.StoreAsTransaction(tx, 2, nil)) // block 1 is added above + require.NoError(t, chain.dao.StoreAsTransaction(tx, 2, nil, nil)) // block 1 is added above res, err := invokeContractMethod(chain, 100000000, ledger, "getTransaction", tx.Hash().BytesBE()) require.NoError(t, err) checkResult(t, res, stackitem.Null{}) }) t.Run("bad hash", func(t *testing.T) { - require.NoError(t, chain.dao.StoreAsTransaction(tx, 0, nil)) + require.NoError(t, chain.dao.StoreAsTransaction(tx, 0, nil, nil)) res, err := invokeContractMethod(chain, 100000000, ledger, "getTransaction", tx.Hash().BytesLE()) require.NoError(t, err) checkResult(t, res, stackitem.Null{}) @@ -120,7 +120,7 @@ func TestLedgerGetTransactionFromBlock(t *testing.T) { }) t.Run("isn't traceable", func(t *testing.T) { b.Index = chain.BlockHeight() + 1 - require.NoError(t, chain.dao.StoreAsBlock(b, nil)) + require.NoError(t, chain.dao.StoreAsBlock(b, nil, nil, nil)) res, err := invokeContractMethod(chain, 100000000, ledger, "getTransactionFromBlock", bhash.BytesBE(), int64(0)) require.NoError(t, err) checkResult(t, res, stackitem.Null{}) @@ -166,7 +166,7 @@ func TestLedgerGetBlock(t *testing.T) { }) t.Run("isn't traceable", func(t *testing.T) { b.Index = chain.BlockHeight() + 1 - require.NoError(t, chain.dao.StoreAsBlock(b, nil)) + require.NoError(t, chain.dao.StoreAsBlock(b, nil, nil, nil)) res, err := invokeContractMethod(chain, 100000000, ledger, "getBlock", bhash.BytesBE()) require.NoError(t, err) checkResult(t, res, stackitem.Null{}) diff --git a/pkg/core/statesync/module.go b/pkg/core/statesync/module.go index 9bdd1a775..ebf888e2a 100644 --- a/pkg/core/statesync/module.go +++ b/pkg/core/statesync/module.go @@ -315,7 +315,7 @@ func (s *Module) AddBlock(block *block.Block) error { } cache := s.dao.GetWrapped() writeBuf := io.NewBufBinWriter() - if err := cache.StoreAsBlock(block, writeBuf); err != nil { + if err := cache.StoreAsBlock(block, nil, nil, writeBuf); err != nil { return err } writeBuf.Reset() @@ -326,7 +326,7 @@ func (s *Module) AddBlock(block *block.Block) error { } for _, tx := range block.Transactions { - if err := cache.StoreAsTransaction(tx, block.Index, writeBuf); err != nil { + if err := cache.StoreAsTransaction(tx, block.Index, nil, writeBuf); err != nil { return err } writeBuf.Reset() diff --git a/pkg/core/storage/store.go b/pkg/core/storage/store.go index 20c9be321..40014e0c0 100644 --- a/pkg/core/storage/store.go +++ b/pkg/core/storage/store.go @@ -8,13 +8,11 @@ import ( // KeyPrefix constants. const ( - DataBlock KeyPrefix = 0x01 - DataTransaction KeyPrefix = 0x02 - DataMPT KeyPrefix = 0x03 - STAccount KeyPrefix = 0x40 - STNotification KeyPrefix = 0x4d - STContractID KeyPrefix = 0x51 - STStorage KeyPrefix = 0x70 + DataExecutable KeyPrefix = 0x01 + DataMPT KeyPrefix = 0x03 + STAccount KeyPrefix = 0x40 + STContractID KeyPrefix = 0x51 + STStorage KeyPrefix = 0x70 // STTempStorage is used to store contract storage items during state sync process // in order not to mess up the previous state which has its own items stored by // STStorage prefix. Once state exchange process is completed, all items with @@ -33,6 +31,12 @@ const ( SYSVersion KeyPrefix = 0xf0 ) +// Executable subtypes. +const ( + ExecBlock byte = 1 + ExecTransaction byte = 2 +) + const ( // MaxStorageKeyLen is the maximum length of a key for storage items. MaxStorageKeyLen = 64 diff --git a/pkg/core/storage/store_test.go b/pkg/core/storage/store_test.go index c20904bbd..97794f6fb 100644 --- a/pkg/core/storage/store_test.go +++ b/pkg/core/storage/store_test.go @@ -8,8 +8,7 @@ import ( var ( prefixes = []KeyPrefix{ - DataBlock, - DataTransaction, + DataExecutable, STAccount, STStorage, IXHeaderHashList, @@ -20,7 +19,6 @@ var ( expected = []uint8{ 0x01, - 0x02, 0x40, 0x70, 0x80,