*: store application long along with tx/block

Two times less keys inserted into the DB per tx leads to ~13% TPS
improvement. We also drop one goroutine with it which isn't bad as well.
This commit is contained in:
Roman Khimov 2021-12-07 23:05:28 +03:00
parent f87c595d94
commit 4c39b6600d
10 changed files with 168 additions and 160 deletions

View file

@ -62,11 +62,11 @@ func (b *Block) RebuildMerkleRoot() {
b.MerkleRoot = b.ComputeMerkleRoot() b.MerkleRoot = b.ComputeMerkleRoot()
} }
// NewBlockFromTrimmedBytes returns a new block from trimmed data. // NewTrimmedFromReader returns a new block from trimmed data.
// This is commonly used to create a block from stored data. // This is commonly used to create a block from stored data.
// Blocks created from trimmed data will have their Trimmed field // Blocks created from trimmed data will have their Trimmed field
// set to true. // set to true.
func NewBlockFromTrimmedBytes(stateRootEnabled bool, b []byte) (*Block, error) { func NewTrimmedFromReader(stateRootEnabled bool, br *io.BinReader) (*Block, error) {
block := &Block{ block := &Block{
Header: Header{ Header: Header{
StateRootEnabled: stateRootEnabled, StateRootEnabled: stateRootEnabled,
@ -74,7 +74,6 @@ func NewBlockFromTrimmedBytes(stateRootEnabled bool, b []byte) (*Block, error) {
Trimmed: true, Trimmed: true,
} }
br := io.NewBinReaderFromBuf(b)
block.Header.DecodeBinary(br) block.Header.DecodeBinary(br)
lenHashes := br.ReadVarUint() lenHashes := br.ReadVarUint()
if lenHashes > MaxTransactionsPerBlock { if lenHashes > MaxTransactionsPerBlock {

View file

@ -58,7 +58,8 @@ func TestTrimmedBlock(t *testing.T) {
b, err := block.Trim() b, err := block.Trim()
require.NoError(t, err) require.NoError(t, err)
trimmedBlock, err := NewBlockFromTrimmedBytes(false, b) r := io.NewBinReaderFromBuf(b)
trimmedBlock, err := NewTrimmedFromReader(false, r)
require.NoError(t, err) require.NoError(t, err)
assert.True(t, trimmedBlock.Trimmed) assert.True(t, trimmedBlock.Trimmed)

View file

@ -44,7 +44,7 @@ import (
// Tuning parameters. // Tuning parameters.
const ( const (
headerBatchCount = 2000 headerBatchCount = 2000
version = "0.2.0" version = "0.2.1"
defaultInitialGAS = 52000000_00000000 defaultInitialGAS = 52000000_00000000
defaultMemPoolSize = 50000 defaultMemPoolSize = 50000
@ -883,13 +883,14 @@ func (bc *Blockchain) addHeaders(verify bool, headers ...*block.Header) error {
continue continue
} }
bc.headerHashes = append(bc.headerHashes, h.Hash()) bc.headerHashes = append(bc.headerHashes, h.Hash())
buf.WriteB(storage.ExecBlock)
h.EncodeBinary(buf.BinWriter) h.EncodeBinary(buf.BinWriter)
buf.BinWriter.WriteB(0) buf.BinWriter.WriteB(0)
if buf.Err != nil { if buf.Err != nil {
return buf.Err return buf.Err
} }
key := storage.AppendPrefix(storage.DataBlock, h.Hash().BytesBE()) key := storage.AppendPrefix(storage.DataExecutable, h.Hash().BytesBE())
batch.Put(key, buf.Bytes()) batch.Put(key, buf.Bytes())
buf.Reset() buf.Reset()
lastHeader = h lastHeader = h
@ -936,38 +937,25 @@ func (bc *Blockchain) GetStateSyncModule() blockchainer.StateSync {
func (bc *Blockchain) storeBlock(block *block.Block, txpool *mempool.Pool) error { func (bc *Blockchain) storeBlock(block *block.Block, txpool *mempool.Pool) error {
var ( var (
cache = bc.dao.GetWrapped() cache = bc.dao.GetWrapped()
blockCache = bc.dao.GetWrapped()
aerCache = bc.dao.GetWrapped() aerCache = bc.dao.GetWrapped()
appExecResults = make([]*state.AppExecResult, 0, 2+len(block.Transactions)) appExecResults = make([]*state.AppExecResult, 0, 2+len(block.Transactions))
aerchan = make(chan *state.AppExecResult, len(block.Transactions)/8) // Tested 8 and 4 with no practical difference, but feel free to test more and tune. aerchan = make(chan *state.AppExecResult, len(block.Transactions)/8) // Tested 8 and 4 with no practical difference, but feel free to test more and tune.
aerdone = make(chan error) aerdone = make(chan error)
blockdone = make(chan error)
) )
go func() { go func() {
var ( var (
kvcache = blockCache kvcache = aerCache
writeBuf = io.NewBufBinWriter() writeBuf = io.NewBufBinWriter()
err error
txCnt int
baer1, baer2 *state.AppExecResult
transCache = make(map[util.Uint160]transferData)
) )
if err := kvcache.StoreAsBlock(block, writeBuf); err != nil {
blockdone <- err
return
}
writeBuf.Reset()
if err := kvcache.StoreAsCurrentBlock(block, writeBuf); err != nil { if err := kvcache.StoreAsCurrentBlock(block, writeBuf); err != nil {
blockdone <- err aerdone <- err
return return
} }
writeBuf.Reset() writeBuf.Reset()
for _, tx := range block.Transactions {
if err := kvcache.StoreAsTransaction(tx, block.Index, writeBuf); err != nil {
blockdone <- err
return
}
writeBuf.Reset()
}
if bc.config.RemoveUntraceableBlocks { if bc.config.RemoveUntraceableBlocks {
var start, stop uint32 var start, stop uint32
if bc.config.P2PStateExchangeExtensions { if bc.config.P2PStateExchangeExtensions {
@ -994,24 +982,16 @@ func (bc *Blockchain) storeBlock(block *block.Block, txpool *mempool.Pool) error
writeBuf.Reset() writeBuf.Reset()
} }
} }
close(blockdone)
}()
go func() {
var (
kvcache = aerCache
writeBuf = io.NewBufBinWriter()
err error
appendBlock bool
transCache = make(map[util.Uint160]transferData)
)
for aer := range aerchan { for aer := range aerchan {
if aer.Container == block.Hash() && appendBlock {
err = kvcache.AppendAppExecResult(aer, writeBuf)
} else {
err = kvcache.PutAppExecResult(aer, writeBuf)
if aer.Container == block.Hash() { if aer.Container == block.Hash() {
appendBlock = true if baer1 == nil {
baer1 = aer
} else {
baer2 = aer
} }
} else {
err = kvcache.StoreAsTransaction(block.Transactions[txCnt], block.Index, aer, writeBuf)
txCnt++
} }
if err != nil { if err != nil {
err = fmt.Errorf("failed to store exec result: %w", err) err = fmt.Errorf("failed to store exec result: %w", err)
@ -1028,6 +1008,11 @@ func (bc *Blockchain) storeBlock(block *block.Block, txpool *mempool.Pool) error
aerdone <- err aerdone <- err
return return
} }
if err := kvcache.StoreAsBlock(block, baer1, baer2, writeBuf); err != nil {
aerdone <- err
return
}
writeBuf.Reset()
for acc, trData := range transCache { for acc, trData := range transCache {
err = kvcache.PutTokenTransferInfo(acc, &trData.Info) err = kvcache.PutTokenTransferInfo(acc, &trData.Info)
if err != nil { if err != nil {
@ -1055,7 +1040,6 @@ func (bc *Blockchain) storeBlock(block *block.Block, txpool *mempool.Pool) error
if err != nil { if err != nil {
// Release goroutines, don't care about errors, we already have one. // Release goroutines, don't care about errors, we already have one.
close(aerchan) close(aerchan)
<-blockdone
<-aerdone <-aerdone
return fmt.Errorf("onPersist failed: %w", err) return fmt.Errorf("onPersist failed: %w", err)
} }
@ -1077,7 +1061,6 @@ func (bc *Blockchain) storeBlock(block *block.Block, txpool *mempool.Pool) error
if err != nil { if err != nil {
// Release goroutines, don't care about errors, we already have one. // Release goroutines, don't care about errors, we already have one.
close(aerchan) close(aerchan)
<-blockdone
<-aerdone <-aerdone
return fmt.Errorf("failed to persist invocation results: %w", err) return fmt.Errorf("failed to persist invocation results: %w", err)
} }
@ -1107,7 +1090,6 @@ func (bc *Blockchain) storeBlock(block *block.Block, txpool *mempool.Pool) error
if err != nil { if err != nil {
// Release goroutines, don't care about errors, we already have one. // Release goroutines, don't care about errors, we already have one.
close(aerchan) close(aerchan)
<-blockdone
<-aerdone <-aerdone
return fmt.Errorf("postPersist failed: %w", err) return fmt.Errorf("postPersist failed: %w", err)
} }
@ -1119,7 +1101,6 @@ func (bc *Blockchain) storeBlock(block *block.Block, txpool *mempool.Pool) error
mpt, sr, err := bc.stateRoot.AddMPTBatch(block.Index, b, d.Store) mpt, sr, err := bc.stateRoot.AddMPTBatch(block.Index, b, d.Store)
if err != nil { if err != nil {
// Release goroutines, don't care about errors, we already have one. // Release goroutines, don't care about errors, we already have one.
<-blockdone
<-aerdone <-aerdone
// Here MPT can be left in a half-applied state. // Here MPT can be left in a half-applied state.
// However if this error occurs, this is a bug somewhere in code // However if this error occurs, this is a bug somewhere in code
@ -1135,7 +1116,6 @@ func (bc *Blockchain) storeBlock(block *block.Block, txpool *mempool.Pool) error
} }
if err != nil { if err != nil {
// Release goroutines, don't care about errors, we already have one. // Release goroutines, don't care about errors, we already have one.
<-blockdone
<-aerdone <-aerdone
return err return err
} }
@ -1152,22 +1132,12 @@ func (bc *Blockchain) storeBlock(block *block.Block, txpool *mempool.Pool) error
mpt.Collapse(10) mpt.Collapse(10)
} }
// Wait for _both_ goroutines to finish.
blockerr := <-blockdone
aererr := <-aerdone aererr := <-aerdone
if blockerr != nil {
return blockerr
}
if aererr != nil { if aererr != nil {
return aererr return aererr
} }
bc.lock.Lock() bc.lock.Lock()
_, err = blockCache.Persist()
if err != nil {
bc.lock.Unlock()
return err
}
_, err = aerCache.Persist() _, err = aerCache.Persist()
if err != nil { if err != nil {
bc.lock.Unlock() bc.lock.Unlock()

View file

@ -126,7 +126,7 @@ func TestAddBlock(t *testing.T) {
require.NoError(t, err) require.NoError(t, err)
for _, block := range blocks { for _, block := range blocks {
key := storage.AppendPrefix(storage.DataBlock, block.Hash().BytesBE()) key := storage.AppendPrefix(storage.DataExecutable, block.Hash().BytesBE())
_, err := bc.dao.Store.Get(key) _, err := bc.dao.Store.Get(key)
require.NoErrorf(t, err, "block %s not persisted", block.Hash()) require.NoErrorf(t, err, "block %s not persisted", block.Hash())
} }
@ -805,7 +805,7 @@ func TestVerifyTx(t *testing.T) {
}, },
}, },
} }
require.NoError(t, bc.dao.StoreAsTransaction(conflicting, bc.blockHeight, nil)) require.NoError(t, bc.dao.StoreAsTransaction(conflicting, bc.blockHeight, nil, nil))
require.True(t, errors.Is(bc.VerifyTx(tx), ErrHasConflicts)) require.True(t, errors.Is(bc.VerifyTx(tx), ErrHasConflicts))
}) })
t.Run("attribute on-chain conflict", func(t *testing.T) { t.Run("attribute on-chain conflict", func(t *testing.T) {

View file

@ -30,7 +30,6 @@ var (
// DAO is a data access object. // DAO is a data access object.
type DAO interface { type DAO interface {
AppendAppExecResult(aer *state.AppExecResult, buf *io.BufBinWriter) error
DeleteBlock(h util.Uint256, buf *io.BufBinWriter) error DeleteBlock(h util.Uint256, buf *io.BufBinWriter) error
DeleteContractID(id int32) error DeleteContractID(id int32) error
DeleteStorageItem(id int32, key []byte) error DeleteStorageItem(id int32, key []byte) error
@ -55,7 +54,6 @@ type DAO interface {
HasTransaction(hash util.Uint256) error HasTransaction(hash util.Uint256) error
Persist() (int, error) Persist() (int, error)
PersistSync() (int, error) PersistSync() (int, error)
PutAppExecResult(aer *state.AppExecResult, buf *io.BufBinWriter) error
PutContractID(id int32, hash util.Uint160) error PutContractID(id int32, hash util.Uint160) error
PutCurrentHeader(hashAndIndex []byte) error PutCurrentHeader(hashAndIndex []byte) error
PutTokenTransferInfo(acc util.Uint160, bs *state.TokenTransferInfo) error PutTokenTransferInfo(acc util.Uint160, bs *state.TokenTransferInfo) error
@ -66,9 +64,9 @@ type DAO interface {
PutVersion(v Version) error PutVersion(v Version) error
Seek(id int32, prefix []byte, f func(k, v []byte)) Seek(id int32, prefix []byte, f func(k, v []byte))
SeekAsync(ctx context.Context, id int32, prefix []byte) chan storage.KeyValue SeekAsync(ctx context.Context, id int32, prefix []byte) chan storage.KeyValue
StoreAsBlock(block *block.Block, buf *io.BufBinWriter) error StoreAsBlock(block *block.Block, aer1 *state.AppExecResult, aer2 *state.AppExecResult, buf *io.BufBinWriter) error
StoreAsCurrentBlock(block *block.Block, buf *io.BufBinWriter) error StoreAsCurrentBlock(block *block.Block, buf *io.BufBinWriter) error
StoreAsTransaction(tx *transaction.Transaction, index uint32, buf *io.BufBinWriter) error StoreAsTransaction(tx *transaction.Transaction, index uint32, aer *state.AppExecResult, buf *io.BufBinWriter) error
putTokenTransferInfo(acc util.Uint160, bs *state.TokenTransferInfo, buf *io.BufBinWriter) error putTokenTransferInfo(acc util.Uint160, bs *state.TokenTransferInfo, buf *io.BufBinWriter) error
} }
@ -220,12 +218,26 @@ func (dao *Simple) PutTokenTransferLog(acc util.Uint160, index uint32, isNEP11 b
// GetAppExecResults gets application execution results with the specified trigger from the // GetAppExecResults gets application execution results with the specified trigger from the
// given store. // given store.
func (dao *Simple) GetAppExecResults(hash util.Uint256, trig trigger.Type) ([]state.AppExecResult, error) { func (dao *Simple) GetAppExecResults(hash util.Uint256, trig trigger.Type) ([]state.AppExecResult, error) {
key := storage.AppendPrefix(storage.STNotification, hash.BytesBE()) key := storage.AppendPrefix(storage.DataExecutable, hash.BytesBE())
aers, err := dao.Store.Get(key) bs, err := dao.Store.Get(key)
if err != nil { if err != nil {
return nil, err return nil, err
} }
r := io.NewBinReaderFromBuf(aers) r := io.NewBinReaderFromBuf(bs)
switch r.ReadB() {
case storage.ExecBlock:
_, err = block.NewTrimmedFromReader(dao.Version.StateRootInHeader, r)
if err != nil {
return nil, err
}
case storage.ExecTransaction:
_ = r.ReadU32LE()
tx := &transaction.Transaction{}
tx.DecodeBinary(r)
}
if r.Err != nil {
return nil, r.Err
}
result := make([]state.AppExecResult, 0, 2) result := make([]state.AppExecResult, 0, 2)
for { for {
aer := new(state.AppExecResult) aer := new(state.AppExecResult)
@ -243,39 +255,6 @@ func (dao *Simple) GetAppExecResults(hash util.Uint256, trig trigger.Type) ([]st
return result, nil return result, nil
} }
// AppendAppExecResult appends given application execution result to the existing
// set of execution results for the corresponding hash. It can reuse given buffer
// for the purpose of value serialization.
func (dao *Simple) AppendAppExecResult(aer *state.AppExecResult, buf *io.BufBinWriter) error {
key := storage.AppendPrefix(storage.STNotification, aer.Container.BytesBE())
aers, err := dao.Store.Get(key)
if err != nil && err != storage.ErrKeyNotFound {
return err
}
if len(aers) == 0 {
return dao.PutAppExecResult(aer, buf)
}
if buf == nil {
buf = io.NewBufBinWriter()
}
aer.EncodeBinary(buf.BinWriter)
if buf.Err != nil {
return buf.Err
}
aers = append(aers, buf.Bytes()...)
return dao.Store.Put(key, aers)
}
// PutAppExecResult puts given application execution result into the
// given store. It can reuse given buffer for the purpose of value serialization.
func (dao *Simple) PutAppExecResult(aer *state.AppExecResult, buf *io.BufBinWriter) error {
key := storage.AppendPrefix(storage.STNotification, aer.Container.BytesBE())
if buf == nil {
return dao.Put(aer, key)
}
return dao.putWithBuffer(aer, key, buf)
}
// -- end notification event. // -- end notification event.
// -- start storage item. // -- start storage item.
@ -363,13 +342,17 @@ func makeStorageItemKey(prefix storage.KeyPrefix, id int32, key []byte) []byte {
// GetBlock returns Block by the given hash if it exists in the store. // GetBlock returns Block by the given hash if it exists in the store.
func (dao *Simple) GetBlock(hash util.Uint256) (*block.Block, error) { func (dao *Simple) GetBlock(hash util.Uint256) (*block.Block, error) {
key := storage.AppendPrefix(storage.DataBlock, hash.BytesBE()) key := storage.AppendPrefix(storage.DataExecutable, hash.BytesBE())
b, err := dao.Store.Get(key) b, err := dao.Store.Get(key)
if err != nil { if err != nil {
return nil, err return nil, err
} }
block, err := block.NewBlockFromTrimmedBytes(dao.Version.StateRootInHeader, b) r := io.NewBinReaderFromBuf(b)
if r.ReadB() != storage.ExecBlock {
return nil, errors.New("internal DB inconsistency")
}
block, err := block.NewTrimmedFromReader(dao.Version.StateRootInHeader, r)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -525,18 +508,22 @@ func (dao *Simple) GetHeaderHashes() ([]util.Uint256, error) {
// GetTransaction returns Transaction and its height by the given hash // GetTransaction returns Transaction and its height by the given hash
// if it exists in the store. It does not return dummy transactions. // if it exists in the store. It does not return dummy transactions.
func (dao *Simple) GetTransaction(hash util.Uint256) (*transaction.Transaction, uint32, error) { func (dao *Simple) GetTransaction(hash util.Uint256) (*transaction.Transaction, uint32, error) {
key := storage.AppendPrefix(storage.DataTransaction, hash.BytesBE()) key := storage.AppendPrefix(storage.DataExecutable, hash.BytesBE())
b, err := dao.Store.Get(key) b, err := dao.Store.Get(key)
if err != nil { if err != nil {
return nil, 0, err return nil, 0, err
} }
if len(b) < 5 { if len(b) < 6 {
return nil, 0, errors.New("bad transaction bytes") return nil, 0, errors.New("bad transaction bytes")
} }
if b[4] == transaction.DummyVersion { if b[0] != storage.ExecTransaction {
return nil, 0, errors.New("internal DB inconsistency")
}
if b[5] == transaction.DummyVersion {
return nil, 0, storage.ErrKeyNotFound return nil, 0, storage.ErrKeyNotFound
} }
r := io.NewBinReaderFromBuf(b) r := io.NewBinReaderFromBuf(b)
_ = r.ReadB()
var height = r.ReadU32LE() var height = r.ReadU32LE()
@ -591,16 +578,16 @@ func read2000Uint256Hashes(b []byte) ([]util.Uint256, error) {
// Transaction hash. It returns an error in case if transaction is in chain // Transaction hash. It returns an error in case if transaction is in chain
// or in the list of conflicting transactions. // or in the list of conflicting transactions.
func (dao *Simple) HasTransaction(hash util.Uint256) error { func (dao *Simple) HasTransaction(hash util.Uint256) error {
key := storage.AppendPrefix(storage.DataTransaction, hash.BytesBE()) key := storage.AppendPrefix(storage.DataExecutable, hash.BytesBE())
bytes, err := dao.Store.Get(key) bytes, err := dao.Store.Get(key)
if err != nil { if err != nil {
return nil return nil
} }
if len(bytes) < 5 { if len(bytes) < 6 {
return nil return nil
} }
if bytes[4] == transaction.DummyVersion { if bytes[5] == transaction.DummyVersion {
return ErrHasConflicts return ErrHasConflicts
} }
return ErrAlreadyExists return ErrAlreadyExists
@ -608,18 +595,25 @@ func (dao *Simple) HasTransaction(hash util.Uint256) error {
// StoreAsBlock stores given block as DataBlock. It can reuse given buffer for // StoreAsBlock stores given block as DataBlock. It can reuse given buffer for
// the purpose of value serialization. // the purpose of value serialization.
func (dao *Simple) StoreAsBlock(block *block.Block, buf *io.BufBinWriter) error { func (dao *Simple) StoreAsBlock(block *block.Block, aer1 *state.AppExecResult, aer2 *state.AppExecResult, buf *io.BufBinWriter) error {
var ( var (
key = storage.AppendPrefix(storage.DataBlock, block.Hash().BytesBE()) key = storage.AppendPrefix(storage.DataExecutable, block.Hash().BytesBE())
) )
if buf == nil { if buf == nil {
buf = io.NewBufBinWriter() buf = io.NewBufBinWriter()
} }
buf.WriteB(storage.ExecBlock)
b, err := block.Trim() b, err := block.Trim()
if err != nil { if err != nil {
return err return err
} }
buf.WriteBytes(b) buf.WriteBytes(b)
if aer1 != nil {
aer1.EncodeBinary(buf.BinWriter)
}
if aer2 != nil {
aer2.EncodeBinary(buf.BinWriter)
}
if buf.Err != nil { if buf.Err != nil {
return buf.Err return buf.Err
} }
@ -630,14 +624,18 @@ func (dao *Simple) StoreAsBlock(block *block.Block, buf *io.BufBinWriter) error
func (dao *Simple) DeleteBlock(h util.Uint256, w *io.BufBinWriter) error { func (dao *Simple) DeleteBlock(h util.Uint256, w *io.BufBinWriter) error {
batch := dao.Store.Batch() batch := dao.Store.Batch()
key := make([]byte, util.Uint256Size+1) key := make([]byte, util.Uint256Size+1)
key[0] = byte(storage.DataBlock) key[0] = byte(storage.DataExecutable)
copy(key[1:], h.BytesBE()) copy(key[1:], h.BytesBE())
bs, err := dao.Store.Get(key) bs, err := dao.Store.Get(key)
if err != nil { if err != nil {
return err return err
} }
b, err := block.NewBlockFromTrimmedBytes(dao.Version.StateRootInHeader, bs) r := io.NewBinReaderFromBuf(bs)
if r.ReadB() != storage.ExecBlock {
return errors.New("internal DB inconsistency")
}
b, err := block.NewTrimmedFromReader(dao.Version.StateRootInHeader, r)
if err != nil { if err != nil {
return err return err
} }
@ -645,6 +643,7 @@ func (dao *Simple) DeleteBlock(h util.Uint256, w *io.BufBinWriter) error {
if w == nil { if w == nil {
w = io.NewBufBinWriter() w = io.NewBufBinWriter()
} }
w.WriteB(storage.ExecBlock)
b.Header.EncodeBinary(w.BinWriter) b.Header.EncodeBinary(w.BinWriter)
w.BinWriter.WriteB(0) w.BinWriter.WriteB(0)
if w.Err != nil { if w.Err != nil {
@ -652,7 +651,6 @@ func (dao *Simple) DeleteBlock(h util.Uint256, w *io.BufBinWriter) error {
} }
batch.Put(key, w.Bytes()) batch.Put(key, w.Bytes())
key[0] = byte(storage.DataTransaction)
for _, tx := range b.Transactions { for _, tx := range b.Transactions {
copy(key[1:], tx.Hash().BytesBE()) copy(key[1:], tx.Hash().BytesBE())
batch.Delete(key) batch.Delete(key)
@ -663,14 +661,8 @@ func (dao *Simple) DeleteBlock(h util.Uint256, w *io.BufBinWriter) error {
batch.Delete(key) batch.Delete(key)
} }
} }
key[0] = byte(storage.STNotification)
batch.Delete(key)
} }
key[0] = byte(storage.STNotification)
copy(key[1:], h.BytesBE())
batch.Delete(key)
return dao.Store.PutBatch(batch) return dao.Store.PutBatch(batch)
} }
@ -690,13 +682,17 @@ func (dao *Simple) StoreAsCurrentBlock(block *block.Block, buf *io.BufBinWriter)
// StoreAsTransaction stores given TX as DataTransaction. It also stores transactions // StoreAsTransaction stores given TX as DataTransaction. It also stores transactions
// given tx has conflicts with as DataTransaction with dummy version. It can reuse given // given tx has conflicts with as DataTransaction with dummy version. It can reuse given
// buffer for the purpose of value serialization. // buffer for the purpose of value serialization.
func (dao *Simple) StoreAsTransaction(tx *transaction.Transaction, index uint32, buf *io.BufBinWriter) error { func (dao *Simple) StoreAsTransaction(tx *transaction.Transaction, index uint32, aer *state.AppExecResult, buf *io.BufBinWriter) error {
key := storage.AppendPrefix(storage.DataTransaction, tx.Hash().BytesBE()) key := storage.AppendPrefix(storage.DataExecutable, tx.Hash().BytesBE())
if buf == nil { if buf == nil {
buf = io.NewBufBinWriter() buf = io.NewBufBinWriter()
} }
buf.WriteB(storage.ExecTransaction)
buf.WriteU32LE(index) buf.WriteU32LE(index)
tx.EncodeBinary(buf.BinWriter) tx.EncodeBinary(buf.BinWriter)
if aer != nil {
aer.EncodeBinary(buf.BinWriter)
}
if buf.Err != nil { if buf.Err != nil {
return buf.Err return buf.Err
} }
@ -711,6 +707,7 @@ func (dao *Simple) StoreAsTransaction(tx *transaction.Transaction, index uint32,
copy(key[1:], hash.BytesBE()) copy(key[1:], hash.BytesBE())
if value == nil { if value == nil {
buf.Reset() buf.Reset()
buf.WriteB(storage.ExecTransaction)
buf.WriteU32LE(index) buf.WriteU32LE(index)
buf.BinWriter.WriteB(transaction.DummyVersion) buf.BinWriter.WriteB(transaction.DummyVersion)
value = buf.Bytes() value = buf.Bytes()

View file

@ -43,24 +43,6 @@ func (t *TestSerializable) DecodeBinary(reader *io.BinReader) {
t.field = reader.ReadString() t.field = reader.ReadString()
} }
func TestPutGetAppExecResult(t *testing.T) {
dao := NewSimple(storage.NewMemoryStore(), false, false)
hash := random.Uint256()
appExecResult := &state.AppExecResult{
Container: hash,
Execution: state.Execution{
Trigger: trigger.Application,
Events: []state.NotificationEvent{},
Stack: []stackitem.Item{},
},
}
err := dao.AppendAppExecResult(appExecResult, nil)
require.NoError(t, err)
gotAppExecResult, err := dao.GetAppExecResults(hash, trigger.All)
require.NoError(t, err)
require.Equal(t, []state.AppExecResult{*appExecResult}, gotAppExecResult)
}
func TestPutGetStorageItem(t *testing.T) { func TestPutGetStorageItem(t *testing.T) {
dao := NewSimple(storage.NewMemoryStore(), false, false) dao := NewSimple(storage.NewMemoryStore(), false, false)
id := int32(random.Int(0, 1024)) id := int32(random.Int(0, 1024))
@ -104,11 +86,32 @@ func TestPutGetBlock(t *testing.T) {
}, },
} }
hash := b.Hash() hash := b.Hash()
err := dao.StoreAsBlock(b, nil) appExecResult1 := &state.AppExecResult{
Container: hash,
Execution: state.Execution{
Trigger: trigger.OnPersist,
Events: []state.NotificationEvent{},
Stack: []stackitem.Item{},
},
}
appExecResult2 := &state.AppExecResult{
Container: hash,
Execution: state.Execution{
Trigger: trigger.PostPersist,
Events: []state.NotificationEvent{},
Stack: []stackitem.Item{},
},
}
err := dao.StoreAsBlock(b, appExecResult1, appExecResult2, nil)
require.NoError(t, err) require.NoError(t, err)
gotBlock, err := dao.GetBlock(hash) gotBlock, err := dao.GetBlock(hash)
require.NoError(t, err) require.NoError(t, err)
require.NotNil(t, gotBlock) require.NotNil(t, gotBlock)
gotAppExecResult, err := dao.GetAppExecResults(hash, trigger.All)
require.NoError(t, err)
require.Equal(t, 2, len(gotAppExecResult))
require.Equal(t, *appExecResult1, gotAppExecResult[0])
require.Equal(t, *appExecResult2, gotAppExecResult[1])
} }
func TestGetVersion_NoVersion(t *testing.T) { func TestGetVersion_NoVersion(t *testing.T) {
@ -177,17 +180,33 @@ func TestStoreAsTransaction(t *testing.T) {
t.Run("P2PSigExtensions off", func(t *testing.T) { t.Run("P2PSigExtensions off", func(t *testing.T) {
dao := NewSimple(storage.NewMemoryStore(), false, false) dao := NewSimple(storage.NewMemoryStore(), false, false)
tx := transaction.New([]byte{byte(opcode.PUSH1)}, 1) tx := transaction.New([]byte{byte(opcode.PUSH1)}, 1)
tx.Signers = append(tx.Signers, transaction.Signer{})
tx.Scripts = append(tx.Scripts, transaction.Witness{})
hash := tx.Hash() hash := tx.Hash()
err := dao.StoreAsTransaction(tx, 0, nil) aer := &state.AppExecResult{
Container: hash,
Execution: state.Execution{
Trigger: trigger.Application,
Events: []state.NotificationEvent{},
Stack: []stackitem.Item{},
},
}
err := dao.StoreAsTransaction(tx, 0, aer, nil)
require.NoError(t, err) require.NoError(t, err)
err = dao.HasTransaction(hash) err = dao.HasTransaction(hash)
require.NotNil(t, err) require.NotNil(t, err)
gotAppExecResult, err := dao.GetAppExecResults(hash, trigger.All)
require.NoError(t, err)
require.Equal(t, 1, len(gotAppExecResult))
require.Equal(t, *aer, gotAppExecResult[0])
}) })
t.Run("P2PSigExtensions on", func(t *testing.T) { t.Run("P2PSigExtensions on", func(t *testing.T) {
dao := NewSimple(storage.NewMemoryStore(), false, true) dao := NewSimple(storage.NewMemoryStore(), false, true)
conflictsH := util.Uint256{1, 2, 3} conflictsH := util.Uint256{1, 2, 3}
tx := transaction.New([]byte{byte(opcode.PUSH1)}, 1) tx := transaction.New([]byte{byte(opcode.PUSH1)}, 1)
tx.Signers = append(tx.Signers, transaction.Signer{})
tx.Scripts = append(tx.Scripts, transaction.Witness{})
tx.Attributes = []transaction.Attribute{ tx.Attributes = []transaction.Attribute{
{ {
Type: transaction.ConflictsT, Type: transaction.ConflictsT,
@ -195,12 +214,24 @@ func TestStoreAsTransaction(t *testing.T) {
}, },
} }
hash := tx.Hash() hash := tx.Hash()
err := dao.StoreAsTransaction(tx, 0, nil) aer := &state.AppExecResult{
Container: hash,
Execution: state.Execution{
Trigger: trigger.Application,
Events: []state.NotificationEvent{},
Stack: []stackitem.Item{},
},
}
err := dao.StoreAsTransaction(tx, 0, aer, nil)
require.NoError(t, err) require.NoError(t, err)
err = dao.HasTransaction(hash) err = dao.HasTransaction(hash)
require.True(t, errors.Is(err, ErrAlreadyExists)) require.True(t, errors.Is(err, ErrAlreadyExists))
err = dao.HasTransaction(conflictsH) err = dao.HasTransaction(conflictsH)
require.True(t, errors.Is(err, ErrHasConflicts)) require.True(t, errors.Is(err, ErrHasConflicts))
gotAppExecResult, err := dao.GetAppExecResults(hash, trigger.All)
require.NoError(t, err)
require.Equal(t, 1, len(gotAppExecResult))
require.Equal(t, *aer, gotAppExecResult[0])
}) })
} }
@ -228,11 +259,19 @@ func BenchmarkStoreAsTransaction(b *testing.B) {
}, },
} }
_ = tx.Hash() _ = tx.Hash()
aer := &state.AppExecResult{
Container: tx.Hash(),
Execution: state.Execution{
Trigger: trigger.Application,
Events: []state.NotificationEvent{},
Stack: []stackitem.Item{},
},
}
b.ResetTimer() b.ResetTimer()
b.ReportAllocs() b.ReportAllocs()
for n := 0; n < b.N; n++ { for n := 0; n < b.N; n++ {
err := dao.StoreAsTransaction(tx, 1, nil) err := dao.StoreAsTransaction(tx, 1, aer, nil)
if err != nil { if err != nil {
b.FailNow() b.FailNow()
} }

View file

@ -18,7 +18,7 @@ func TestLedgerGetTransactionHeight(t *testing.T) {
for i := 0; i < 13; i++ { for i := 0; i < 13; i++ {
require.NoError(t, chain.AddBlock(chain.newBlock())) require.NoError(t, chain.AddBlock(chain.newBlock()))
} }
require.NoError(t, chain.dao.StoreAsTransaction(tx, 13, nil)) require.NoError(t, chain.dao.StoreAsTransaction(tx, 13, nil, nil))
t.Run("good", func(t *testing.T) { t.Run("good", func(t *testing.T) {
res, err := invokeContractMethod(chain, 100000000, ledger, "getTransactionHeight", tx.Hash().BytesBE()) res, err := invokeContractMethod(chain, 100000000, ledger, "getTransactionHeight", tx.Hash().BytesBE())
require.NoError(t, err) require.NoError(t, err)
@ -41,7 +41,7 @@ func TestLedgerGetTransaction(t *testing.T) {
ledger := chain.contracts.ByName(nativenames.Ledger).Metadata().Hash ledger := chain.contracts.ByName(nativenames.Ledger).Metadata().Hash
t.Run("success", func(t *testing.T) { t.Run("success", func(t *testing.T) {
require.NoError(t, chain.dao.StoreAsTransaction(tx, 0, nil)) require.NoError(t, chain.dao.StoreAsTransaction(tx, 0, nil, nil))
res, err := invokeContractMethod(chain, 100000000, ledger, "getTransaction", tx.Hash().BytesBE()) res, err := invokeContractMethod(chain, 100000000, ledger, "getTransaction", tx.Hash().BytesBE())
require.NoError(t, err) require.NoError(t, err)
@ -63,13 +63,13 @@ func TestLedgerGetTransaction(t *testing.T) {
}) })
t.Run("isn't traceable", func(t *testing.T) { t.Run("isn't traceable", func(t *testing.T) {
require.NoError(t, chain.dao.StoreAsTransaction(tx, 2, nil)) // block 1 is added above require.NoError(t, chain.dao.StoreAsTransaction(tx, 2, nil, nil)) // block 1 is added above
res, err := invokeContractMethod(chain, 100000000, ledger, "getTransaction", tx.Hash().BytesBE()) res, err := invokeContractMethod(chain, 100000000, ledger, "getTransaction", tx.Hash().BytesBE())
require.NoError(t, err) require.NoError(t, err)
checkResult(t, res, stackitem.Null{}) checkResult(t, res, stackitem.Null{})
}) })
t.Run("bad hash", func(t *testing.T) { t.Run("bad hash", func(t *testing.T) {
require.NoError(t, chain.dao.StoreAsTransaction(tx, 0, nil)) require.NoError(t, chain.dao.StoreAsTransaction(tx, 0, nil, nil))
res, err := invokeContractMethod(chain, 100000000, ledger, "getTransaction", tx.Hash().BytesLE()) res, err := invokeContractMethod(chain, 100000000, ledger, "getTransaction", tx.Hash().BytesLE())
require.NoError(t, err) require.NoError(t, err)
checkResult(t, res, stackitem.Null{}) checkResult(t, res, stackitem.Null{})
@ -120,7 +120,7 @@ func TestLedgerGetTransactionFromBlock(t *testing.T) {
}) })
t.Run("isn't traceable", func(t *testing.T) { t.Run("isn't traceable", func(t *testing.T) {
b.Index = chain.BlockHeight() + 1 b.Index = chain.BlockHeight() + 1
require.NoError(t, chain.dao.StoreAsBlock(b, nil)) require.NoError(t, chain.dao.StoreAsBlock(b, nil, nil, nil))
res, err := invokeContractMethod(chain, 100000000, ledger, "getTransactionFromBlock", bhash.BytesBE(), int64(0)) res, err := invokeContractMethod(chain, 100000000, ledger, "getTransactionFromBlock", bhash.BytesBE(), int64(0))
require.NoError(t, err) require.NoError(t, err)
checkResult(t, res, stackitem.Null{}) checkResult(t, res, stackitem.Null{})
@ -166,7 +166,7 @@ func TestLedgerGetBlock(t *testing.T) {
}) })
t.Run("isn't traceable", func(t *testing.T) { t.Run("isn't traceable", func(t *testing.T) {
b.Index = chain.BlockHeight() + 1 b.Index = chain.BlockHeight() + 1
require.NoError(t, chain.dao.StoreAsBlock(b, nil)) require.NoError(t, chain.dao.StoreAsBlock(b, nil, nil, nil))
res, err := invokeContractMethod(chain, 100000000, ledger, "getBlock", bhash.BytesBE()) res, err := invokeContractMethod(chain, 100000000, ledger, "getBlock", bhash.BytesBE())
require.NoError(t, err) require.NoError(t, err)
checkResult(t, res, stackitem.Null{}) checkResult(t, res, stackitem.Null{})

View file

@ -315,7 +315,7 @@ func (s *Module) AddBlock(block *block.Block) error {
} }
cache := s.dao.GetWrapped() cache := s.dao.GetWrapped()
writeBuf := io.NewBufBinWriter() writeBuf := io.NewBufBinWriter()
if err := cache.StoreAsBlock(block, writeBuf); err != nil { if err := cache.StoreAsBlock(block, nil, nil, writeBuf); err != nil {
return err return err
} }
writeBuf.Reset() writeBuf.Reset()
@ -326,7 +326,7 @@ func (s *Module) AddBlock(block *block.Block) error {
} }
for _, tx := range block.Transactions { for _, tx := range block.Transactions {
if err := cache.StoreAsTransaction(tx, block.Index, writeBuf); err != nil { if err := cache.StoreAsTransaction(tx, block.Index, nil, writeBuf); err != nil {
return err return err
} }
writeBuf.Reset() writeBuf.Reset()

View file

@ -8,11 +8,9 @@ import (
// KeyPrefix constants. // KeyPrefix constants.
const ( const (
DataBlock KeyPrefix = 0x01 DataExecutable KeyPrefix = 0x01
DataTransaction KeyPrefix = 0x02
DataMPT KeyPrefix = 0x03 DataMPT KeyPrefix = 0x03
STAccount KeyPrefix = 0x40 STAccount KeyPrefix = 0x40
STNotification KeyPrefix = 0x4d
STContractID KeyPrefix = 0x51 STContractID KeyPrefix = 0x51
STStorage KeyPrefix = 0x70 STStorage KeyPrefix = 0x70
// STTempStorage is used to store contract storage items during state sync process // STTempStorage is used to store contract storage items during state sync process
@ -33,6 +31,12 @@ const (
SYSVersion KeyPrefix = 0xf0 SYSVersion KeyPrefix = 0xf0
) )
// Executable subtypes.
const (
ExecBlock byte = 1
ExecTransaction byte = 2
)
const ( const (
// MaxStorageKeyLen is the maximum length of a key for storage items. // MaxStorageKeyLen is the maximum length of a key for storage items.
MaxStorageKeyLen = 64 MaxStorageKeyLen = 64

View file

@ -8,8 +8,7 @@ import (
var ( var (
prefixes = []KeyPrefix{ prefixes = []KeyPrefix{
DataBlock, DataExecutable,
DataTransaction,
STAccount, STAccount,
STStorage, STStorage,
IXHeaderHashList, IXHeaderHashList,
@ -20,7 +19,6 @@ var (
expected = []uint8{ expected = []uint8{
0x01, 0x01,
0x02,
0x40, 0x40,
0x70, 0x70,
0x80, 0x80,