dao: simplify buffer management for private DAO
Private DAO is only used in a single thread which means we can safely reuse key/data buffers most of the time and handle it all in DAO. Doesn't affect any benchmarks.
This commit is contained in:
parent
9bfb3357f2
commit
7dc8fc443f
5 changed files with 102 additions and 78 deletions
|
@ -499,7 +499,6 @@ func (bc *Blockchain) jumpToStateInternal(p uint32, stage stateJumpStage) error
|
||||||
|
|
||||||
bc.log.Info("jumping to state sync point", zap.Uint32("state sync point", p))
|
bc.log.Info("jumping to state sync point", zap.Uint32("state sync point", p))
|
||||||
|
|
||||||
writeBuf := io.NewBufBinWriter()
|
|
||||||
jumpStageKey := storage.SYSStateJumpStage.Bytes()
|
jumpStageKey := storage.SYSStateJumpStage.Bytes()
|
||||||
switch stage {
|
switch stage {
|
||||||
case none:
|
case none:
|
||||||
|
@ -530,8 +529,7 @@ func (bc *Blockchain) jumpToStateInternal(p uint32, stage stateJumpStage) error
|
||||||
// After current state is updated, we need to remove outdated state-related data if so.
|
// After current state is updated, we need to remove outdated state-related data if so.
|
||||||
// The only outdated data we might have is genesis-related data, so check it.
|
// The only outdated data we might have is genesis-related data, so check it.
|
||||||
if p-bc.config.MaxTraceableBlocks > 0 {
|
if p-bc.config.MaxTraceableBlocks > 0 {
|
||||||
writeBuf.Reset()
|
err := cache.DeleteBlock(bc.headerHashes[0])
|
||||||
err := cache.DeleteBlock(bc.headerHashes[0], writeBuf)
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("failed to remove outdated state data for the genesis block: %w", err)
|
return fmt.Errorf("failed to remove outdated state data for the genesis block: %w", err)
|
||||||
}
|
}
|
||||||
|
@ -559,8 +557,7 @@ func (bc *Blockchain) jumpToStateInternal(p uint32, stage stateJumpStage) error
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("failed to get current block: %w", err)
|
return fmt.Errorf("failed to get current block: %w", err)
|
||||||
}
|
}
|
||||||
writeBuf.Reset()
|
bc.dao.StoreAsCurrentBlock(block)
|
||||||
bc.dao.StoreAsCurrentBlock(block, writeBuf)
|
|
||||||
bc.topBlock.Store(block)
|
bc.topBlock.Store(block)
|
||||||
atomic.StoreUint32(&bc.blockHeight, p)
|
atomic.StoreUint32(&bc.blockHeight, p)
|
||||||
atomic.StoreUint32(&bc.persistedHeight, p)
|
atomic.StoreUint32(&bc.persistedHeight, p)
|
||||||
|
@ -1006,14 +1003,12 @@ func (bc *Blockchain) storeBlock(block *block.Block, txpool *mempool.Pool) error
|
||||||
go func() {
|
go func() {
|
||||||
var (
|
var (
|
||||||
kvcache = aerCache
|
kvcache = aerCache
|
||||||
writeBuf = io.NewBufBinWriter()
|
|
||||||
err error
|
err error
|
||||||
txCnt int
|
txCnt int
|
||||||
baer1, baer2 *state.AppExecResult
|
baer1, baer2 *state.AppExecResult
|
||||||
transCache = make(map[util.Uint160]transferData)
|
transCache = make(map[util.Uint160]transferData)
|
||||||
)
|
)
|
||||||
kvcache.StoreAsCurrentBlock(block, writeBuf)
|
kvcache.StoreAsCurrentBlock(block)
|
||||||
writeBuf.Reset()
|
|
||||||
if bc.config.RemoveUntraceableBlocks {
|
if bc.config.RemoveUntraceableBlocks {
|
||||||
var start, stop uint32
|
var start, stop uint32
|
||||||
if bc.config.P2PStateExchangeExtensions {
|
if bc.config.P2PStateExchangeExtensions {
|
||||||
|
@ -1031,13 +1026,12 @@ func (bc *Blockchain) storeBlock(block *block.Block, txpool *mempool.Pool) error
|
||||||
stop = start + 1
|
stop = start + 1
|
||||||
}
|
}
|
||||||
for index := start; index < stop; index++ {
|
for index := start; index < stop; index++ {
|
||||||
err := kvcache.DeleteBlock(bc.headerHashes[index], writeBuf)
|
err := kvcache.DeleteBlock(bc.headerHashes[index])
|
||||||
if err != nil {
|
if err != nil {
|
||||||
bc.log.Warn("error while removing old block",
|
bc.log.Warn("error while removing old block",
|
||||||
zap.Uint32("index", index),
|
zap.Uint32("index", index),
|
||||||
zap.Error(err))
|
zap.Error(err))
|
||||||
}
|
}
|
||||||
writeBuf.Reset()
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
for aer := range aerchan {
|
for aer := range aerchan {
|
||||||
|
@ -1048,7 +1042,7 @@ func (bc *Blockchain) storeBlock(block *block.Block, txpool *mempool.Pool) error
|
||||||
baer2 = aer
|
baer2 = aer
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
err = kvcache.StoreAsTransaction(block.Transactions[txCnt], block.Index, aer, writeBuf)
|
err = kvcache.StoreAsTransaction(block.Transactions[txCnt], block.Index, aer)
|
||||||
txCnt++
|
txCnt++
|
||||||
}
|
}
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -1060,17 +1054,15 @@ func (bc *Blockchain) storeBlock(block *block.Block, txpool *mempool.Pool) error
|
||||||
bc.handleNotification(&aer.Execution.Events[j], kvcache, transCache, block, aer.Container)
|
bc.handleNotification(&aer.Execution.Events[j], kvcache, transCache, block, aer.Container)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
writeBuf.Reset()
|
|
||||||
}
|
}
|
||||||
if err != nil {
|
if err != nil {
|
||||||
aerdone <- err
|
aerdone <- err
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
if err := kvcache.StoreAsBlock(block, baer1, baer2, writeBuf); err != nil {
|
if err := kvcache.StoreAsBlock(block, baer1, baer2); err != nil {
|
||||||
aerdone <- err
|
aerdone <- err
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
writeBuf.Reset()
|
|
||||||
for acc, trData := range transCache {
|
for acc, trData := range transCache {
|
||||||
err = kvcache.PutTokenTransferInfo(acc, &trData.Info)
|
err = kvcache.PutTokenTransferInfo(acc, &trData.Info)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
|
|
@ -804,7 +804,7 @@ func TestVerifyTx(t *testing.T) {
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
require.NoError(t, bc.dao.StoreAsTransaction(conflicting, bc.blockHeight, nil, nil))
|
require.NoError(t, bc.dao.StoreAsTransaction(conflicting, bc.blockHeight, nil))
|
||||||
require.True(t, errors.Is(bc.VerifyTx(tx), ErrHasConflicts))
|
require.True(t, errors.Is(bc.VerifyTx(tx), ErrHasConflicts))
|
||||||
})
|
})
|
||||||
t.Run("attribute on-chain conflict", func(t *testing.T) {
|
t.Run("attribute on-chain conflict", func(t *testing.T) {
|
||||||
|
|
|
@ -16,6 +16,7 @@ import (
|
||||||
"github.com/nspcc-dev/neo-go/pkg/io"
|
"github.com/nspcc-dev/neo-go/pkg/io"
|
||||||
"github.com/nspcc-dev/neo-go/pkg/smartcontract/trigger"
|
"github.com/nspcc-dev/neo-go/pkg/smartcontract/trigger"
|
||||||
"github.com/nspcc-dev/neo-go/pkg/util"
|
"github.com/nspcc-dev/neo-go/pkg/util"
|
||||||
|
"github.com/nspcc-dev/neo-go/pkg/util/slice"
|
||||||
)
|
)
|
||||||
|
|
||||||
// HasTransaction errors.
|
// HasTransaction errors.
|
||||||
|
@ -31,6 +32,8 @@ var (
|
||||||
type Simple struct {
|
type Simple struct {
|
||||||
Version Version
|
Version Version
|
||||||
Store *storage.MemCachedStore
|
Store *storage.MemCachedStore
|
||||||
|
keyBuf []byte
|
||||||
|
dataBuf *io.BufBinWriter
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewSimple creates new simple dao using provided backend store.
|
// NewSimple creates new simple dao using provided backend store.
|
||||||
|
@ -69,6 +72,16 @@ func (dao *Simple) GetPrivate() *Simple {
|
||||||
st := storage.NewPrivateMemCachedStore(dao.Store)
|
st := storage.NewPrivateMemCachedStore(dao.Store)
|
||||||
d := newSimple(st, dao.Version.StateRootInHeader, dao.Version.P2PSigExtensions)
|
d := newSimple(st, dao.Version.StateRootInHeader, dao.Version.P2PSigExtensions)
|
||||||
d.Version = dao.Version
|
d.Version = dao.Version
|
||||||
|
if dao.keyBuf != nil { // This one is private.
|
||||||
|
d.keyBuf = dao.keyBuf // Thus we can reuse its buffer.
|
||||||
|
} else {
|
||||||
|
d.keyBuf = make([]byte, 0, 1+4+storage.MaxStorageKeyLen) // Prefix, uint32, key.
|
||||||
|
}
|
||||||
|
if dao.dataBuf != nil { // This one is private.
|
||||||
|
d.dataBuf = dao.dataBuf // Thus we can reuse its buffer.
|
||||||
|
} else {
|
||||||
|
d.dataBuf = io.NewBufBinWriter()
|
||||||
|
}
|
||||||
return d
|
return d
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -93,8 +106,8 @@ func (dao *Simple) putWithBuffer(entity io.Serializable, key []byte, buf *io.Buf
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func makeContractIDKey(id int32) []byte {
|
func (dao *Simple) makeContractIDKey(id int32) []byte {
|
||||||
key := make([]byte, 5)
|
key := dao.getKeyBuf(5)
|
||||||
key[0] = byte(storage.STContractID)
|
key[0] = byte(storage.STContractID)
|
||||||
binary.LittleEndian.PutUint32(key[1:], uint32(id))
|
binary.LittleEndian.PutUint32(key[1:], uint32(id))
|
||||||
return key
|
return key
|
||||||
|
@ -102,18 +115,18 @@ func makeContractIDKey(id int32) []byte {
|
||||||
|
|
||||||
// DeleteContractID deletes contract's id to hash mapping.
|
// DeleteContractID deletes contract's id to hash mapping.
|
||||||
func (dao *Simple) DeleteContractID(id int32) {
|
func (dao *Simple) DeleteContractID(id int32) {
|
||||||
dao.Store.Delete(makeContractIDKey(id))
|
dao.Store.Delete(dao.makeContractIDKey(id))
|
||||||
}
|
}
|
||||||
|
|
||||||
// PutContractID adds a mapping from contract's ID to its hash.
|
// PutContractID adds a mapping from contract's ID to its hash.
|
||||||
func (dao *Simple) PutContractID(id int32, hash util.Uint160) {
|
func (dao *Simple) PutContractID(id int32, hash util.Uint160) {
|
||||||
dao.Store.Put(makeContractIDKey(id), hash.BytesBE())
|
dao.Store.Put(dao.makeContractIDKey(id), hash.BytesBE())
|
||||||
}
|
}
|
||||||
|
|
||||||
// GetContractScriptHash retrieves contract's hash given its ID.
|
// GetContractScriptHash retrieves contract's hash given its ID.
|
||||||
func (dao *Simple) GetContractScriptHash(id int32) (util.Uint160, error) {
|
func (dao *Simple) GetContractScriptHash(id int32) (util.Uint160, error) {
|
||||||
var data = new(util.Uint160)
|
var data = new(util.Uint160)
|
||||||
if err := dao.GetAndDecode(data, makeContractIDKey(id)); err != nil {
|
if err := dao.GetAndDecode(data, dao.makeContractIDKey(id)); err != nil {
|
||||||
return *data, err
|
return *data, err
|
||||||
}
|
}
|
||||||
return *data, nil
|
return *data, nil
|
||||||
|
@ -121,9 +134,16 @@ func (dao *Simple) GetContractScriptHash(id int32) (util.Uint160, error) {
|
||||||
|
|
||||||
// -- start NEP-17 transfer info.
|
// -- start NEP-17 transfer info.
|
||||||
|
|
||||||
|
func (dao *Simple) makeTTIKey(acc util.Uint160) []byte {
|
||||||
|
key := dao.getKeyBuf(1 + util.Uint160Size)
|
||||||
|
key[0] = byte(storage.STTokenTransferInfo)
|
||||||
|
copy(key[1:], acc.BytesBE())
|
||||||
|
return key
|
||||||
|
}
|
||||||
|
|
||||||
// GetTokenTransferInfo retrieves NEP-17 transfer info from the cache.
|
// GetTokenTransferInfo retrieves NEP-17 transfer info from the cache.
|
||||||
func (dao *Simple) GetTokenTransferInfo(acc util.Uint160) (*state.TokenTransferInfo, error) {
|
func (dao *Simple) GetTokenTransferInfo(acc util.Uint160) (*state.TokenTransferInfo, error) {
|
||||||
key := storage.AppendPrefix(storage.STTokenTransferInfo, acc.BytesBE())
|
key := dao.makeTTIKey(acc)
|
||||||
bs := state.NewTokenTransferInfo()
|
bs := state.NewTokenTransferInfo()
|
||||||
err := dao.GetAndDecode(bs, key)
|
err := dao.GetAndDecode(bs, key)
|
||||||
if err != nil && err != storage.ErrKeyNotFound {
|
if err != nil && err != storage.ErrKeyNotFound {
|
||||||
|
@ -134,20 +154,19 @@ func (dao *Simple) GetTokenTransferInfo(acc util.Uint160) (*state.TokenTransferI
|
||||||
|
|
||||||
// PutTokenTransferInfo saves NEP-17 transfer info in the cache.
|
// PutTokenTransferInfo saves NEP-17 transfer info in the cache.
|
||||||
func (dao *Simple) PutTokenTransferInfo(acc util.Uint160, bs *state.TokenTransferInfo) error {
|
func (dao *Simple) PutTokenTransferInfo(acc util.Uint160, bs *state.TokenTransferInfo) error {
|
||||||
return dao.putTokenTransferInfo(acc, bs, io.NewBufBinWriter())
|
return dao.putTokenTransferInfo(acc, bs, dao.getDataBuf())
|
||||||
}
|
}
|
||||||
|
|
||||||
func (dao *Simple) putTokenTransferInfo(acc util.Uint160, bs *state.TokenTransferInfo, buf *io.BufBinWriter) error {
|
func (dao *Simple) putTokenTransferInfo(acc util.Uint160, bs *state.TokenTransferInfo, buf *io.BufBinWriter) error {
|
||||||
key := storage.AppendPrefix(storage.STTokenTransferInfo, acc.BytesBE())
|
return dao.putWithBuffer(bs, dao.makeTTIKey(acc), buf)
|
||||||
return dao.putWithBuffer(bs, key, buf)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// -- end NEP-17 transfer info.
|
// -- end NEP-17 transfer info.
|
||||||
|
|
||||||
// -- start transfer log.
|
// -- start transfer log.
|
||||||
|
|
||||||
func getTokenTransferLogKey(acc util.Uint160, newestTimestamp uint64, index uint32, isNEP11 bool) []byte {
|
func (dao *Simple) getTokenTransferLogKey(acc util.Uint160, newestTimestamp uint64, index uint32, isNEP11 bool) []byte {
|
||||||
key := make([]byte, 1+util.Uint160Size+8+4)
|
key := dao.getKeyBuf(1 + util.Uint160Size + 8 + 4)
|
||||||
if isNEP11 {
|
if isNEP11 {
|
||||||
key[0] = byte(storage.STNEP11Transfers)
|
key[0] = byte(storage.STNEP11Transfers)
|
||||||
} else {
|
} else {
|
||||||
|
@ -163,7 +182,7 @@ func getTokenTransferLogKey(acc util.Uint160, newestTimestamp uint64, index uint
|
||||||
// the transfer with the newest timestamp up to the oldest transfer. It continues
|
// the transfer with the newest timestamp up to the oldest transfer. It continues
|
||||||
// iteration until false is returned from f. The last non-nil error is returned.
|
// iteration until false is returned from f. The last non-nil error is returned.
|
||||||
func (dao *Simple) SeekNEP17TransferLog(acc util.Uint160, newestTimestamp uint64, f func(*state.NEP17Transfer) (bool, error)) error {
|
func (dao *Simple) SeekNEP17TransferLog(acc util.Uint160, newestTimestamp uint64, f func(*state.NEP17Transfer) (bool, error)) error {
|
||||||
key := getTokenTransferLogKey(acc, newestTimestamp, 0, false)
|
key := dao.getTokenTransferLogKey(acc, newestTimestamp, 0, false)
|
||||||
prefixLen := 1 + util.Uint160Size
|
prefixLen := 1 + util.Uint160Size
|
||||||
var seekErr error
|
var seekErr error
|
||||||
dao.Store.Seek(storage.SeekRange{
|
dao.Store.Seek(storage.SeekRange{
|
||||||
|
@ -185,7 +204,7 @@ func (dao *Simple) SeekNEP17TransferLog(acc util.Uint160, newestTimestamp uint64
|
||||||
// the transfer with the newest timestamp up to the oldest transfer. It continues
|
// the transfer with the newest timestamp up to the oldest transfer. It continues
|
||||||
// iteration until false is returned from f. The last non-nil error is returned.
|
// iteration until false is returned from f. The last non-nil error is returned.
|
||||||
func (dao *Simple) SeekNEP11TransferLog(acc util.Uint160, newestTimestamp uint64, f func(*state.NEP11Transfer) (bool, error)) error {
|
func (dao *Simple) SeekNEP11TransferLog(acc util.Uint160, newestTimestamp uint64, f func(*state.NEP11Transfer) (bool, error)) error {
|
||||||
key := getTokenTransferLogKey(acc, newestTimestamp, 0, true)
|
key := dao.getTokenTransferLogKey(acc, newestTimestamp, 0, true)
|
||||||
prefixLen := 1 + util.Uint160Size
|
prefixLen := 1 + util.Uint160Size
|
||||||
var seekErr error
|
var seekErr error
|
||||||
dao.Store.Seek(storage.SeekRange{
|
dao.Store.Seek(storage.SeekRange{
|
||||||
|
@ -205,7 +224,7 @@ func (dao *Simple) SeekNEP11TransferLog(acc util.Uint160, newestTimestamp uint64
|
||||||
|
|
||||||
// GetTokenTransferLog retrieves transfer log from the cache.
|
// GetTokenTransferLog retrieves transfer log from the cache.
|
||||||
func (dao *Simple) GetTokenTransferLog(acc util.Uint160, newestTimestamp uint64, index uint32, isNEP11 bool) (*state.TokenTransferLog, error) {
|
func (dao *Simple) GetTokenTransferLog(acc util.Uint160, newestTimestamp uint64, index uint32, isNEP11 bool) (*state.TokenTransferLog, error) {
|
||||||
key := getTokenTransferLogKey(acc, newestTimestamp, index, isNEP11)
|
key := dao.getTokenTransferLogKey(acc, newestTimestamp, index, isNEP11)
|
||||||
value, err := dao.Store.Get(key)
|
value, err := dao.Store.Get(key)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
if err == storage.ErrKeyNotFound {
|
if err == storage.ErrKeyNotFound {
|
||||||
|
@ -218,7 +237,7 @@ func (dao *Simple) GetTokenTransferLog(acc util.Uint160, newestTimestamp uint64,
|
||||||
|
|
||||||
// PutTokenTransferLog saves given transfer log in the cache.
|
// PutTokenTransferLog saves given transfer log in the cache.
|
||||||
func (dao *Simple) PutTokenTransferLog(acc util.Uint160, start uint64, index uint32, isNEP11 bool, lg *state.TokenTransferLog) {
|
func (dao *Simple) PutTokenTransferLog(acc util.Uint160, start uint64, index uint32, isNEP11 bool, lg *state.TokenTransferLog) {
|
||||||
key := getTokenTransferLogKey(acc, start, index, isNEP11)
|
key := dao.getTokenTransferLogKey(acc, start, index, isNEP11)
|
||||||
dao.Store.Put(key, lg.Raw)
|
dao.Store.Put(key, lg.Raw)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -226,10 +245,17 @@ func (dao *Simple) PutTokenTransferLog(acc util.Uint160, start uint64, index uin
|
||||||
|
|
||||||
// -- start notification event.
|
// -- start notification event.
|
||||||
|
|
||||||
|
func (dao *Simple) makeExecutableKey(hash util.Uint256) []byte {
|
||||||
|
key := dao.getKeyBuf(1 + util.Uint256Size)
|
||||||
|
key[0] = byte(storage.DataExecutable)
|
||||||
|
copy(key[1:], hash.BytesBE())
|
||||||
|
return key
|
||||||
|
}
|
||||||
|
|
||||||
// GetAppExecResults gets application execution results with the specified trigger from the
|
// GetAppExecResults gets application execution results with the specified trigger from the
|
||||||
// given store.
|
// given store.
|
||||||
func (dao *Simple) GetAppExecResults(hash util.Uint256, trig trigger.Type) ([]state.AppExecResult, error) {
|
func (dao *Simple) GetAppExecResults(hash util.Uint256, trig trigger.Type) ([]state.AppExecResult, error) {
|
||||||
key := storage.AppendPrefix(storage.DataExecutable, hash.BytesBE())
|
key := dao.makeExecutableKey(hash)
|
||||||
bs, err := dao.Store.Get(key)
|
bs, err := dao.Store.Get(key)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
|
@ -272,7 +298,7 @@ func (dao *Simple) GetAppExecResults(hash util.Uint256, trig trigger.Type) ([]st
|
||||||
|
|
||||||
// GetStorageItem returns StorageItem if it exists in the given store.
|
// GetStorageItem returns StorageItem if it exists in the given store.
|
||||||
func (dao *Simple) GetStorageItem(id int32, key []byte) state.StorageItem {
|
func (dao *Simple) GetStorageItem(id int32, key []byte) state.StorageItem {
|
||||||
b, err := dao.Store.Get(makeStorageItemKey(dao.Version.StoragePrefix, id, key))
|
b, err := dao.Store.Get(dao.makeStorageItemKey(id, key))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
@ -282,14 +308,14 @@ func (dao *Simple) GetStorageItem(id int32, key []byte) state.StorageItem {
|
||||||
// PutStorageItem puts given StorageItem for given id with given
|
// PutStorageItem puts given StorageItem for given id with given
|
||||||
// key into the given store.
|
// key into the given store.
|
||||||
func (dao *Simple) PutStorageItem(id int32, key []byte, si state.StorageItem) {
|
func (dao *Simple) PutStorageItem(id int32, key []byte, si state.StorageItem) {
|
||||||
stKey := makeStorageItemKey(dao.Version.StoragePrefix, id, key)
|
stKey := dao.makeStorageItemKey(id, key)
|
||||||
dao.Store.Put(stKey, si)
|
dao.Store.Put(stKey, si)
|
||||||
}
|
}
|
||||||
|
|
||||||
// DeleteStorageItem drops storage item for the given id with the
|
// DeleteStorageItem drops storage item for the given id with the
|
||||||
// given key from the store.
|
// given key from the store.
|
||||||
func (dao *Simple) DeleteStorageItem(id int32, key []byte) {
|
func (dao *Simple) DeleteStorageItem(id int32, key []byte) {
|
||||||
stKey := makeStorageItemKey(dao.Version.StoragePrefix, id, key)
|
stKey := dao.makeStorageItemKey(id, key)
|
||||||
dao.Store.Delete(stKey)
|
dao.Store.Delete(stKey)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -320,7 +346,7 @@ func (dao *Simple) GetStorageItemsWithPrefix(id int32, prefix []byte) ([]state.S
|
||||||
// starting from the point specified). If key or value is to be used outside of f, they
|
// starting from the point specified). If key or value is to be used outside of f, they
|
||||||
// may not be copied. Seek continues iterating until false is returned from f.
|
// may not be copied. Seek continues iterating until false is returned from f.
|
||||||
func (dao *Simple) Seek(id int32, rng storage.SeekRange, f func(k, v []byte) bool) {
|
func (dao *Simple) Seek(id int32, rng storage.SeekRange, f func(k, v []byte) bool) {
|
||||||
rng.Prefix = makeStorageItemKey(dao.Version.StoragePrefix, id, rng.Prefix)
|
rng.Prefix = slice.Copy(dao.makeStorageItemKey(id, rng.Prefix)) // f() can use dao too.
|
||||||
dao.Store.Seek(rng, func(k, v []byte) bool {
|
dao.Store.Seek(rng, func(k, v []byte) bool {
|
||||||
return f(k[len(rng.Prefix):], v)
|
return f(k[len(rng.Prefix):], v)
|
||||||
})
|
})
|
||||||
|
@ -330,15 +356,15 @@ func (dao *Simple) Seek(id int32, rng storage.SeekRange, f func(k, v []byte) boo
|
||||||
// starting from the point specified) to a channel and returns the channel.
|
// starting from the point specified) to a channel and returns the channel.
|
||||||
// Resulting keys and values may not be copied.
|
// Resulting keys and values may not be copied.
|
||||||
func (dao *Simple) SeekAsync(ctx context.Context, id int32, rng storage.SeekRange) chan storage.KeyValue {
|
func (dao *Simple) SeekAsync(ctx context.Context, id int32, rng storage.SeekRange) chan storage.KeyValue {
|
||||||
rng.Prefix = makeStorageItemKey(dao.Version.StoragePrefix, id, rng.Prefix)
|
rng.Prefix = slice.Copy(dao.makeStorageItemKey(id, rng.Prefix))
|
||||||
return dao.Store.SeekAsync(ctx, rng, true)
|
return dao.Store.SeekAsync(ctx, rng, true)
|
||||||
}
|
}
|
||||||
|
|
||||||
// makeStorageItemKey returns a key used to store StorageItem in the DB.
|
// makeStorageItemKey returns a key used to store StorageItem in the DB.
|
||||||
func makeStorageItemKey(prefix storage.KeyPrefix, id int32, key []byte) []byte {
|
func (dao *Simple) makeStorageItemKey(id int32, key []byte) []byte {
|
||||||
// 1 for prefix + 4 for Uint32 + len(key) for key
|
// 1 for prefix + 4 for Uint32 + len(key) for key
|
||||||
buf := make([]byte, 5+len(key))
|
buf := dao.getKeyBuf(5 + len(key))
|
||||||
buf[0] = byte(prefix)
|
buf[0] = byte(dao.Version.StoragePrefix)
|
||||||
binary.LittleEndian.PutUint32(buf[1:], uint32(id))
|
binary.LittleEndian.PutUint32(buf[1:], uint32(id))
|
||||||
copy(buf[5:], key)
|
copy(buf[5:], key)
|
||||||
return buf
|
return buf
|
||||||
|
@ -350,7 +376,7 @@ func makeStorageItemKey(prefix storage.KeyPrefix, id int32, key []byte) []byte {
|
||||||
|
|
||||||
// GetBlock returns Block by the given hash if it exists in the store.
|
// GetBlock returns Block by the given hash if it exists in the store.
|
||||||
func (dao *Simple) GetBlock(hash util.Uint256) (*block.Block, error) {
|
func (dao *Simple) GetBlock(hash util.Uint256) (*block.Block, error) {
|
||||||
key := storage.AppendPrefix(storage.DataExecutable, hash.BytesBE())
|
key := dao.makeExecutableKey(hash)
|
||||||
b, err := dao.Store.Get(key)
|
b, err := dao.Store.Get(key)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
|
@ -519,7 +545,7 @@ func (dao *Simple) GetHeaderHashes() ([]util.Uint256, error) {
|
||||||
// GetTransaction returns Transaction and its height by the given hash
|
// GetTransaction returns Transaction and its height by the given hash
|
||||||
// if it exists in the store. It does not return dummy transactions.
|
// if it exists in the store. It does not return dummy transactions.
|
||||||
func (dao *Simple) GetTransaction(hash util.Uint256) (*transaction.Transaction, uint32, error) {
|
func (dao *Simple) GetTransaction(hash util.Uint256) (*transaction.Transaction, uint32, error) {
|
||||||
key := storage.AppendPrefix(storage.DataExecutable, hash.BytesBE())
|
key := dao.makeExecutableKey(hash)
|
||||||
b, err := dao.Store.Get(key)
|
b, err := dao.Store.Get(key)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, 0, err
|
return nil, 0, err
|
||||||
|
@ -560,14 +586,14 @@ func (dao *Simple) PutCurrentHeader(hashAndIndex []byte) {
|
||||||
|
|
||||||
// PutStateSyncPoint stores current state synchronisation point P.
|
// PutStateSyncPoint stores current state synchronisation point P.
|
||||||
func (dao *Simple) PutStateSyncPoint(p uint32) {
|
func (dao *Simple) PutStateSyncPoint(p uint32) {
|
||||||
buf := make([]byte, 4)
|
buf := dao.getKeyBuf(4) // It's very small, no point in using BufBinWriter.
|
||||||
binary.LittleEndian.PutUint32(buf, p)
|
binary.LittleEndian.PutUint32(buf, p)
|
||||||
dao.Store.Put(storage.SYSStateSyncPoint.Bytes(), buf)
|
dao.Store.Put(storage.SYSStateSyncPoint.Bytes(), buf)
|
||||||
}
|
}
|
||||||
|
|
||||||
// PutStateSyncCurrentBlockHeight stores current block height during state synchronisation process.
|
// PutStateSyncCurrentBlockHeight stores current block height during state synchronisation process.
|
||||||
func (dao *Simple) PutStateSyncCurrentBlockHeight(h uint32) {
|
func (dao *Simple) PutStateSyncCurrentBlockHeight(h uint32) {
|
||||||
buf := make([]byte, 4)
|
buf := dao.getKeyBuf(4) // It's very small, no point in using BufBinWriter.
|
||||||
binary.LittleEndian.PutUint32(buf, h)
|
binary.LittleEndian.PutUint32(buf, h)
|
||||||
dao.Store.Put(storage.SYSStateSyncCurrentBlockHeight.Bytes(), buf)
|
dao.Store.Put(storage.SYSStateSyncCurrentBlockHeight.Bytes(), buf)
|
||||||
}
|
}
|
||||||
|
@ -589,7 +615,7 @@ func read2000Uint256Hashes(b []byte) ([]util.Uint256, error) {
|
||||||
// Transaction hash. It returns an error in case if transaction is in chain
|
// Transaction hash. It returns an error in case if transaction is in chain
|
||||||
// or in the list of conflicting transactions.
|
// or in the list of conflicting transactions.
|
||||||
func (dao *Simple) HasTransaction(hash util.Uint256) error {
|
func (dao *Simple) HasTransaction(hash util.Uint256) error {
|
||||||
key := storage.AppendPrefix(storage.DataExecutable, hash.BytesBE())
|
key := dao.makeExecutableKey(hash)
|
||||||
bytes, err := dao.Store.Get(key)
|
bytes, err := dao.Store.Get(key)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil
|
return nil
|
||||||
|
@ -606,13 +632,11 @@ func (dao *Simple) HasTransaction(hash util.Uint256) error {
|
||||||
|
|
||||||
// StoreAsBlock stores given block as DataBlock. It can reuse given buffer for
|
// StoreAsBlock stores given block as DataBlock. It can reuse given buffer for
|
||||||
// the purpose of value serialization.
|
// the purpose of value serialization.
|
||||||
func (dao *Simple) StoreAsBlock(block *block.Block, aer1 *state.AppExecResult, aer2 *state.AppExecResult, buf *io.BufBinWriter) error {
|
func (dao *Simple) StoreAsBlock(block *block.Block, aer1 *state.AppExecResult, aer2 *state.AppExecResult) error {
|
||||||
var (
|
var (
|
||||||
key = storage.AppendPrefix(storage.DataExecutable, block.Hash().BytesBE())
|
key = dao.makeExecutableKey(block.Hash())
|
||||||
|
buf = dao.getDataBuf()
|
||||||
)
|
)
|
||||||
if buf == nil {
|
|
||||||
buf = io.NewBufBinWriter()
|
|
||||||
}
|
|
||||||
buf.WriteB(storage.ExecBlock)
|
buf.WriteB(storage.ExecBlock)
|
||||||
b, err := block.Trim()
|
b, err := block.Trim()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -634,10 +658,8 @@ func (dao *Simple) StoreAsBlock(block *block.Block, aer1 *state.AppExecResult, a
|
||||||
|
|
||||||
// DeleteBlock removes block from dao. It's not atomic, so make sure you're
|
// DeleteBlock removes block from dao. It's not atomic, so make sure you're
|
||||||
// using private MemCached instance here.
|
// using private MemCached instance here.
|
||||||
func (dao *Simple) DeleteBlock(h util.Uint256, w *io.BufBinWriter) error {
|
func (dao *Simple) DeleteBlock(h util.Uint256) error {
|
||||||
key := make([]byte, util.Uint256Size+1)
|
key := dao.makeExecutableKey(h)
|
||||||
key[0] = byte(storage.DataExecutable)
|
|
||||||
copy(key[1:], h.BytesBE())
|
|
||||||
bs, err := dao.Store.Get(key)
|
bs, err := dao.Store.Get(key)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
|
@ -652,9 +674,7 @@ func (dao *Simple) DeleteBlock(h util.Uint256, w *io.BufBinWriter) error {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
if w == nil {
|
w := dao.getDataBuf()
|
||||||
w = io.NewBufBinWriter()
|
|
||||||
}
|
|
||||||
w.WriteB(storage.ExecBlock)
|
w.WriteB(storage.ExecBlock)
|
||||||
b.Header.EncodeBinary(w.BinWriter)
|
b.Header.EncodeBinary(w.BinWriter)
|
||||||
w.BinWriter.WriteB(0)
|
w.BinWriter.WriteB(0)
|
||||||
|
@ -681,10 +701,8 @@ func (dao *Simple) DeleteBlock(h util.Uint256, w *io.BufBinWriter) error {
|
||||||
// StoreAsCurrentBlock stores a hash of the given block with prefix
|
// StoreAsCurrentBlock stores a hash of the given block with prefix
|
||||||
// SYSCurrentBlock. It can reuse given buffer for the purpose of value
|
// SYSCurrentBlock. It can reuse given buffer for the purpose of value
|
||||||
// serialization.
|
// serialization.
|
||||||
func (dao *Simple) StoreAsCurrentBlock(block *block.Block, buf *io.BufBinWriter) {
|
func (dao *Simple) StoreAsCurrentBlock(block *block.Block) {
|
||||||
if buf == nil {
|
buf := dao.getDataBuf()
|
||||||
buf = io.NewBufBinWriter()
|
|
||||||
}
|
|
||||||
h := block.Hash()
|
h := block.Hash()
|
||||||
h.EncodeBinary(buf.BinWriter)
|
h.EncodeBinary(buf.BinWriter)
|
||||||
buf.WriteU32LE(block.Index)
|
buf.WriteU32LE(block.Index)
|
||||||
|
@ -694,11 +712,10 @@ func (dao *Simple) StoreAsCurrentBlock(block *block.Block, buf *io.BufBinWriter)
|
||||||
// StoreAsTransaction stores given TX as DataTransaction. It also stores transactions
|
// StoreAsTransaction stores given TX as DataTransaction. It also stores transactions
|
||||||
// given tx has conflicts with as DataTransaction with dummy version. It can reuse given
|
// given tx has conflicts with as DataTransaction with dummy version. It can reuse given
|
||||||
// buffer for the purpose of value serialization.
|
// buffer for the purpose of value serialization.
|
||||||
func (dao *Simple) StoreAsTransaction(tx *transaction.Transaction, index uint32, aer *state.AppExecResult, buf *io.BufBinWriter) error {
|
func (dao *Simple) StoreAsTransaction(tx *transaction.Transaction, index uint32, aer *state.AppExecResult) error {
|
||||||
key := storage.AppendPrefix(storage.DataExecutable, tx.Hash().BytesBE())
|
key := dao.makeExecutableKey(tx.Hash())
|
||||||
if buf == nil {
|
buf := dao.getDataBuf()
|
||||||
buf = io.NewBufBinWriter()
|
|
||||||
}
|
|
||||||
buf.WriteB(storage.ExecTransaction)
|
buf.WriteB(storage.ExecTransaction)
|
||||||
buf.WriteU32LE(index)
|
buf.WriteU32LE(index)
|
||||||
tx.EncodeBinary(buf.BinWriter)
|
tx.EncodeBinary(buf.BinWriter)
|
||||||
|
@ -727,6 +744,21 @@ func (dao *Simple) StoreAsTransaction(tx *transaction.Transaction, index uint32,
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (dao *Simple) getKeyBuf(len int) []byte {
|
||||||
|
if dao.keyBuf != nil { // Private DAO.
|
||||||
|
return dao.keyBuf[:len] // Should have enough capacity.
|
||||||
|
}
|
||||||
|
return make([]byte, len)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (dao *Simple) getDataBuf() *io.BufBinWriter {
|
||||||
|
if dao.dataBuf != nil {
|
||||||
|
dao.dataBuf.Reset()
|
||||||
|
return dao.dataBuf
|
||||||
|
}
|
||||||
|
return io.NewBufBinWriter()
|
||||||
|
}
|
||||||
|
|
||||||
// Persist flushes all the changes made into the (supposedly) persistent
|
// Persist flushes all the changes made into the (supposedly) persistent
|
||||||
// underlying store. It doesn't block accesses to DAO from other threads.
|
// underlying store. It doesn't block accesses to DAO from other threads.
|
||||||
func (dao *Simple) Persist() (int, error) {
|
func (dao *Simple) Persist() (int, error) {
|
||||||
|
|
|
@ -98,7 +98,7 @@ func TestPutGetBlock(t *testing.T) {
|
||||||
Stack: []stackitem.Item{},
|
Stack: []stackitem.Item{},
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
err := dao.StoreAsBlock(b, appExecResult1, appExecResult2, nil)
|
err := dao.StoreAsBlock(b, appExecResult1, appExecResult2)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
gotBlock, err := dao.GetBlock(hash)
|
gotBlock, err := dao.GetBlock(hash)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
@ -164,7 +164,7 @@ func TestGetCurrentHeaderHeight_Store(t *testing.T) {
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
dao.StoreAsCurrentBlock(b, nil)
|
dao.StoreAsCurrentBlock(b)
|
||||||
height, err := dao.GetCurrentBlockHeight()
|
height, err := dao.GetCurrentBlockHeight()
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
require.Equal(t, uint32(0), height)
|
require.Equal(t, uint32(0), height)
|
||||||
|
@ -185,7 +185,7 @@ func TestStoreAsTransaction(t *testing.T) {
|
||||||
Stack: []stackitem.Item{},
|
Stack: []stackitem.Item{},
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
err := dao.StoreAsTransaction(tx, 0, aer, nil)
|
err := dao.StoreAsTransaction(tx, 0, aer)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
err = dao.HasTransaction(hash)
|
err = dao.HasTransaction(hash)
|
||||||
require.NotNil(t, err)
|
require.NotNil(t, err)
|
||||||
|
@ -216,7 +216,7 @@ func TestStoreAsTransaction(t *testing.T) {
|
||||||
Stack: []stackitem.Item{},
|
Stack: []stackitem.Item{},
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
err := dao.StoreAsTransaction(tx, 0, aer, nil)
|
err := dao.StoreAsTransaction(tx, 0, aer)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
err = dao.HasTransaction(hash)
|
err = dao.HasTransaction(hash)
|
||||||
require.True(t, errors.Is(err, ErrAlreadyExists))
|
require.True(t, errors.Is(err, ErrAlreadyExists))
|
||||||
|
@ -265,7 +265,7 @@ func BenchmarkStoreAsTransaction(b *testing.B) {
|
||||||
b.ResetTimer()
|
b.ResetTimer()
|
||||||
b.ReportAllocs()
|
b.ReportAllocs()
|
||||||
for n := 0; n < b.N; n++ {
|
for n := 0; n < b.N; n++ {
|
||||||
err := dao.StoreAsTransaction(tx, 1, aer, nil)
|
err := dao.StoreAsTransaction(tx, 1, aer)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
b.FailNow()
|
b.FailNow()
|
||||||
}
|
}
|
||||||
|
@ -275,18 +275,21 @@ func BenchmarkStoreAsTransaction(b *testing.B) {
|
||||||
func TestMakeStorageItemKey(t *testing.T) {
|
func TestMakeStorageItemKey(t *testing.T) {
|
||||||
var id int32 = 5
|
var id int32 = 5
|
||||||
|
|
||||||
|
dao := NewSimple(storage.NewMemoryStore(), true, false)
|
||||||
|
|
||||||
expected := []byte{byte(storage.STStorage), 0, 0, 0, 0, 1, 2, 3}
|
expected := []byte{byte(storage.STStorage), 0, 0, 0, 0, 1, 2, 3}
|
||||||
binary.LittleEndian.PutUint32(expected[1:5], uint32(id))
|
binary.LittleEndian.PutUint32(expected[1:5], uint32(id))
|
||||||
actual := makeStorageItemKey(storage.STStorage, id, []byte{1, 2, 3})
|
actual := dao.makeStorageItemKey(id, []byte{1, 2, 3})
|
||||||
require.Equal(t, expected, actual)
|
require.Equal(t, expected, actual)
|
||||||
|
|
||||||
expected = expected[0:5]
|
expected = expected[0:5]
|
||||||
actual = makeStorageItemKey(storage.STStorage, id, nil)
|
actual = dao.makeStorageItemKey(id, nil)
|
||||||
require.Equal(t, expected, actual)
|
require.Equal(t, expected, actual)
|
||||||
|
|
||||||
expected = []byte{byte(storage.STTempStorage), 0, 0, 0, 0, 1, 2, 3}
|
expected = []byte{byte(storage.STTempStorage), 0, 0, 0, 0, 1, 2, 3}
|
||||||
binary.LittleEndian.PutUint32(expected[1:5], uint32(id))
|
binary.LittleEndian.PutUint32(expected[1:5], uint32(id))
|
||||||
actual = makeStorageItemKey(storage.STTempStorage, id, []byte{1, 2, 3})
|
dao.Version.StoragePrefix = storage.STTempStorage
|
||||||
|
actual = dao.makeStorageItemKey(id, []byte{1, 2, 3})
|
||||||
require.Equal(t, expected, actual)
|
require.Equal(t, expected, actual)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -330,19 +330,16 @@ func (s *Module) AddBlock(block *block.Block) error {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
cache := s.dao.GetPrivate()
|
cache := s.dao.GetPrivate()
|
||||||
writeBuf := io.NewBufBinWriter()
|
if err := cache.StoreAsBlock(block, nil, nil); err != nil {
|
||||||
if err := cache.StoreAsBlock(block, nil, nil, writeBuf); err != nil {
|
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
writeBuf.Reset()
|
|
||||||
|
|
||||||
cache.PutStateSyncCurrentBlockHeight(block.Index)
|
cache.PutStateSyncCurrentBlockHeight(block.Index)
|
||||||
|
|
||||||
for _, tx := range block.Transactions {
|
for _, tx := range block.Transactions {
|
||||||
if err := cache.StoreAsTransaction(tx, block.Index, nil, writeBuf); err != nil {
|
if err := cache.StoreAsTransaction(tx, block.Index, nil); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
writeBuf.Reset()
|
|
||||||
}
|
}
|
||||||
|
|
||||||
_, err := cache.Persist()
|
_, err := cache.Persist()
|
||||||
|
|
Loading…
Reference in a new issue