Merge pull request #2366 from nspcc-dev/tiny-dao-deduplication
Some DAO deduplication
This commit is contained in:
commit
4cc1413805
8 changed files with 120 additions and 175 deletions
|
@ -46,7 +46,7 @@ import (
|
||||||
// Tuning parameters.
|
// Tuning parameters.
|
||||||
const (
|
const (
|
||||||
headerBatchCount = 2000
|
headerBatchCount = 2000
|
||||||
version = "0.2.3"
|
version = "0.2.5"
|
||||||
|
|
||||||
defaultInitialGAS = 52000000_00000000
|
defaultInitialGAS = 52000000_00000000
|
||||||
defaultGCPeriod = 10000
|
defaultGCPeriod = 10000
|
||||||
|
@ -325,7 +325,7 @@ func (bc *Blockchain) init() error {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
bc.headerHashes = []util.Uint256{genesisBlock.Hash()}
|
bc.headerHashes = []util.Uint256{genesisBlock.Hash()}
|
||||||
bc.dao.PutCurrentHeader(hashAndIndexToBytes(genesisBlock.Hash(), genesisBlock.Index))
|
bc.dao.PutCurrentHeader(genesisBlock.Hash(), genesisBlock.Index)
|
||||||
if err := bc.stateRoot.Init(0); err != nil {
|
if err := bc.stateRoot.Init(0); err != nil {
|
||||||
return fmt.Errorf("can't init MPT: %w", err)
|
return fmt.Errorf("can't init MPT: %w", err)
|
||||||
}
|
}
|
||||||
|
@ -406,7 +406,7 @@ func (bc *Blockchain) init() error {
|
||||||
}
|
}
|
||||||
|
|
||||||
// Check whether StateJump stage is in the storage and continue interrupted state jump if so.
|
// Check whether StateJump stage is in the storage and continue interrupted state jump if so.
|
||||||
jumpStage, err := bc.dao.Store.Get(storage.SYSStateJumpStage.Bytes())
|
jumpStage, err := bc.dao.Store.Get([]byte{byte(storage.SYSStateJumpStage)})
|
||||||
if err == nil {
|
if err == nil {
|
||||||
if !(bc.GetConfig().P2PStateExchangeExtensions && bc.GetConfig().RemoveUntraceableBlocks) {
|
if !(bc.GetConfig().P2PStateExchangeExtensions && bc.GetConfig().RemoveUntraceableBlocks) {
|
||||||
return errors.New("state jump was not completed, but P2PStateExchangeExtensions are disabled or archival node capability is on. " +
|
return errors.New("state jump was not completed, but P2PStateExchangeExtensions are disabled or archival node capability is on. " +
|
||||||
|
@ -500,7 +500,7 @@ func (bc *Blockchain) jumpToStateInternal(p uint32, stage stateJumpStage) error
|
||||||
|
|
||||||
bc.log.Info("jumping to state sync point", zap.Uint32("state sync point", p))
|
bc.log.Info("jumping to state sync point", zap.Uint32("state sync point", p))
|
||||||
|
|
||||||
jumpStageKey := storage.SYSStateJumpStage.Bytes()
|
jumpStageKey := []byte{byte(storage.SYSStateJumpStage)}
|
||||||
switch stage {
|
switch stage {
|
||||||
case none:
|
case none:
|
||||||
bc.dao.Store.Put(jumpStageKey, []byte{byte(stateJumpStarted)})
|
bc.dao.Store.Put(jumpStageKey, []byte{byte(stateJumpStarted)})
|
||||||
|
@ -932,7 +932,6 @@ func (bc *Blockchain) addHeaders(verify bool, headers ...*block.Header) error {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
buf := io.NewBufBinWriter()
|
|
||||||
bc.headerHashesLock.Lock()
|
bc.headerHashesLock.Lock()
|
||||||
defer bc.headerHashesLock.Unlock()
|
defer bc.headerHashesLock.Unlock()
|
||||||
oldlen := len(bc.headerHashes)
|
oldlen := len(bc.headerHashes)
|
||||||
|
@ -941,33 +940,25 @@ func (bc *Blockchain) addHeaders(verify bool, headers ...*block.Header) error {
|
||||||
if int(h.Index) != len(bc.headerHashes) {
|
if int(h.Index) != len(bc.headerHashes) {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
bc.headerHashes = append(bc.headerHashes, h.Hash())
|
err = batch.StoreHeader(h)
|
||||||
buf.WriteB(storage.ExecBlock)
|
if err != nil {
|
||||||
h.EncodeBinary(buf.BinWriter)
|
return err
|
||||||
buf.BinWriter.WriteB(0)
|
|
||||||
if buf.Err != nil {
|
|
||||||
return buf.Err
|
|
||||||
}
|
}
|
||||||
|
bc.headerHashes = append(bc.headerHashes, h.Hash())
|
||||||
key := storage.AppendPrefix(storage.DataExecutable, h.Hash().BytesBE())
|
|
||||||
batch.Store.Put(key, buf.Bytes())
|
|
||||||
buf.Reset()
|
|
||||||
lastHeader = h
|
lastHeader = h
|
||||||
}
|
}
|
||||||
|
|
||||||
if oldlen != len(bc.headerHashes) {
|
if oldlen != len(bc.headerHashes) {
|
||||||
for int(lastHeader.Index)-headerBatchCount >= int(bc.storedHeaderCount) {
|
for int(lastHeader.Index)-headerBatchCount >= int(bc.storedHeaderCount) {
|
||||||
buf.WriteArray(bc.headerHashes[bc.storedHeaderCount : bc.storedHeaderCount+headerBatchCount])
|
err = batch.StoreHeaderHashes(bc.headerHashes[bc.storedHeaderCount:bc.storedHeaderCount+headerBatchCount],
|
||||||
if buf.Err != nil {
|
bc.storedHeaderCount)
|
||||||
return buf.Err
|
if err != nil {
|
||||||
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
key := storage.AppendPrefixInt(storage.IXHeaderHashList, int(bc.storedHeaderCount))
|
|
||||||
batch.Store.Put(key, buf.Bytes())
|
|
||||||
bc.storedHeaderCount += headerBatchCount
|
bc.storedHeaderCount += headerBatchCount
|
||||||
}
|
}
|
||||||
|
|
||||||
batch.Store.Put(storage.SYSCurrentHeader.Bytes(), hashAndIndexToBytes(lastHeader.Hash(), lastHeader.Index))
|
batch.PutCurrentHeader(lastHeader.Hash(), lastHeader.Index)
|
||||||
updateHeaderHeightMetric(len(bc.headerHashes) - 1)
|
updateHeaderHeightMetric(len(bc.headerHashes) - 1)
|
||||||
if _, err = batch.Persist(); err != nil {
|
if _, err = batch.Persist(); err != nil {
|
||||||
return err
|
return err
|
||||||
|
@ -2146,8 +2137,7 @@ func (bc *Blockchain) GetEnrollments() ([]state.Validator, error) {
|
||||||
|
|
||||||
// GetTestVM returns an interop context with VM set up for a test run.
|
// GetTestVM returns an interop context with VM set up for a test run.
|
||||||
func (bc *Blockchain) GetTestVM(t trigger.Type, tx *transaction.Transaction, b *block.Block) *interop.Context {
|
func (bc *Blockchain) GetTestVM(t trigger.Type, tx *transaction.Transaction, b *block.Block) *interop.Context {
|
||||||
d := bc.dao.GetPrivate()
|
systemInterop := bc.newInteropContext(t, bc.dao, b, tx)
|
||||||
systemInterop := bc.newInteropContext(t, d, b, tx)
|
|
||||||
vm := systemInterop.SpawnVM()
|
vm := systemInterop.SpawnVM()
|
||||||
vm.SetPriceGetter(systemInterop.GetPrice)
|
vm.SetPriceGetter(systemInterop.GetPrice)
|
||||||
vm.LoadToken = contract.LoadToken(systemInterop)
|
vm.LoadToken = contract.LoadToken(systemInterop)
|
||||||
|
@ -2309,13 +2299,6 @@ func (bc *Blockchain) ManagementContractHash() util.Uint160 {
|
||||||
return bc.contracts.Management.Hash
|
return bc.contracts.Management.Hash
|
||||||
}
|
}
|
||||||
|
|
||||||
func hashAndIndexToBytes(h util.Uint256, index uint32) []byte {
|
|
||||||
buf := io.NewBufBinWriter()
|
|
||||||
buf.WriteBytes(h.BytesLE())
|
|
||||||
buf.WriteU32LE(index)
|
|
||||||
return buf.Bytes()
|
|
||||||
}
|
|
||||||
|
|
||||||
func (bc *Blockchain) newInteropContext(trigger trigger.Type, d *dao.Simple, block *block.Block, tx *transaction.Transaction) *interop.Context {
|
func (bc *Blockchain) newInteropContext(trigger trigger.Type, d *dao.Simple, block *block.Block, tx *transaction.Transaction) *interop.Context {
|
||||||
ic := interop.NewContext(trigger, bc, d, bc.contracts.Management.GetContract, bc.contracts.Contracts, block, tx, bc.log)
|
ic := interop.NewContext(trigger, bc, d, bc.contracts.Management.GetContract, bc.contracts.Contracts, block, tx, bc.log)
|
||||||
ic.Functions = systemInterops
|
ic.Functions = systemInterops
|
||||||
|
|
|
@ -124,8 +124,10 @@ func TestAddBlock(t *testing.T) {
|
||||||
_, err = bc.persist(false)
|
_, err = bc.persist(false)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
key := make([]byte, 1+util.Uint256Size)
|
||||||
|
key[0] = byte(storage.DataExecutable)
|
||||||
for _, block := range blocks {
|
for _, block := range blocks {
|
||||||
key := storage.AppendPrefix(storage.DataExecutable, block.Hash().BytesBE())
|
copy(key[1:], block.Hash().BytesBE())
|
||||||
_, err := bc.dao.Store.Get(key)
|
_, err := bc.dao.Store.Get(key)
|
||||||
require.NoErrorf(t, err, "block %s not persisted", block.Hash())
|
require.NoErrorf(t, err, "block %s not persisted", block.Hash())
|
||||||
}
|
}
|
||||||
|
@ -1851,7 +1853,9 @@ func TestBlockchain_InitWithIncompleteStateJump(t *testing.T) {
|
||||||
if bcSpout.dao.Version.StoragePrefix == tempPrefix {
|
if bcSpout.dao.Version.StoragePrefix == tempPrefix {
|
||||||
tempPrefix = storage.STStorage
|
tempPrefix = storage.STStorage
|
||||||
}
|
}
|
||||||
bcSpout.dao.Store.Seek(storage.SeekRange{Prefix: bcSpout.dao.Version.StoragePrefix.Bytes()}, func(k, v []byte) bool {
|
bPrefix := make([]byte, 1)
|
||||||
|
bPrefix[0] = byte(bcSpout.dao.Version.StoragePrefix)
|
||||||
|
bcSpout.dao.Store.Seek(storage.SeekRange{Prefix: bPrefix}, func(k, v []byte) bool {
|
||||||
key := slice.Copy(k)
|
key := slice.Copy(k)
|
||||||
key[0] = byte(tempPrefix)
|
key[0] = byte(tempPrefix)
|
||||||
value := slice.Copy(v)
|
value := slice.Copy(v)
|
||||||
|
@ -1878,34 +1882,35 @@ func TestBlockchain_InitWithIncompleteStateJump(t *testing.T) {
|
||||||
c.ProtocolConfiguration.KeepOnlyLatestState = true
|
c.ProtocolConfiguration.KeepOnlyLatestState = true
|
||||||
}
|
}
|
||||||
// manually store statejump stage to check statejump recover process
|
// manually store statejump stage to check statejump recover process
|
||||||
|
bPrefix[0] = byte(storage.SYSStateJumpStage)
|
||||||
t.Run("invalid RemoveUntraceableBlocks setting", func(t *testing.T) {
|
t.Run("invalid RemoveUntraceableBlocks setting", func(t *testing.T) {
|
||||||
bcSpout.dao.Store.Put(storage.SYSStateJumpStage.Bytes(), []byte{byte(stateJumpStarted)})
|
bcSpout.dao.Store.Put(bPrefix, []byte{byte(stateJumpStarted)})
|
||||||
checkNewBlockchainErr(t, func(c *config.Config) {
|
checkNewBlockchainErr(t, func(c *config.Config) {
|
||||||
boltCfg(c)
|
boltCfg(c)
|
||||||
c.ProtocolConfiguration.RemoveUntraceableBlocks = false
|
c.ProtocolConfiguration.RemoveUntraceableBlocks = false
|
||||||
}, bcSpout.dao.Store, true)
|
}, bcSpout.dao.Store, true)
|
||||||
})
|
})
|
||||||
t.Run("invalid state jump stage format", func(t *testing.T) {
|
t.Run("invalid state jump stage format", func(t *testing.T) {
|
||||||
bcSpout.dao.Store.Put(storage.SYSStateJumpStage.Bytes(), []byte{0x01, 0x02})
|
bcSpout.dao.Store.Put(bPrefix, []byte{0x01, 0x02})
|
||||||
checkNewBlockchainErr(t, boltCfg, bcSpout.dao.Store, true)
|
checkNewBlockchainErr(t, boltCfg, bcSpout.dao.Store, true)
|
||||||
})
|
})
|
||||||
t.Run("missing state sync point", func(t *testing.T) {
|
t.Run("missing state sync point", func(t *testing.T) {
|
||||||
bcSpout.dao.Store.Put(storage.SYSStateJumpStage.Bytes(), []byte{byte(stateJumpStarted)})
|
bcSpout.dao.Store.Put(bPrefix, []byte{byte(stateJumpStarted)})
|
||||||
checkNewBlockchainErr(t, boltCfg, bcSpout.dao.Store, true)
|
checkNewBlockchainErr(t, boltCfg, bcSpout.dao.Store, true)
|
||||||
})
|
})
|
||||||
t.Run("invalid state sync point", func(t *testing.T) {
|
t.Run("invalid state sync point", func(t *testing.T) {
|
||||||
bcSpout.dao.Store.Put(storage.SYSStateJumpStage.Bytes(), []byte{byte(stateJumpStarted)})
|
bcSpout.dao.Store.Put(bPrefix, []byte{byte(stateJumpStarted)})
|
||||||
point := make([]byte, 4)
|
point := make([]byte, 4)
|
||||||
binary.LittleEndian.PutUint32(point, uint32(len(bcSpout.headerHashes)))
|
binary.LittleEndian.PutUint32(point, uint32(len(bcSpout.headerHashes)))
|
||||||
bcSpout.dao.Store.Put(storage.SYSStateSyncPoint.Bytes(), point)
|
bcSpout.dao.Store.Put([]byte{byte(storage.SYSStateSyncPoint)}, point)
|
||||||
checkNewBlockchainErr(t, boltCfg, bcSpout.dao.Store, true)
|
checkNewBlockchainErr(t, boltCfg, bcSpout.dao.Store, true)
|
||||||
})
|
})
|
||||||
for _, stage := range []stateJumpStage{stateJumpStarted, newStorageItemsAdded, genesisStateRemoved, 0x03} {
|
for _, stage := range []stateJumpStage{stateJumpStarted, newStorageItemsAdded, genesisStateRemoved, 0x03} {
|
||||||
t.Run(fmt.Sprintf("state jump stage %d", stage), func(t *testing.T) {
|
t.Run(fmt.Sprintf("state jump stage %d", stage), func(t *testing.T) {
|
||||||
bcSpout.dao.Store.Put(storage.SYSStateJumpStage.Bytes(), []byte{byte(stage)})
|
bcSpout.dao.Store.Put(bPrefix, []byte{byte(stage)})
|
||||||
point := make([]byte, 4)
|
point := make([]byte, 4)
|
||||||
binary.LittleEndian.PutUint32(point, uint32(stateSyncPoint))
|
binary.LittleEndian.PutUint32(point, uint32(stateSyncPoint))
|
||||||
bcSpout.dao.Store.Put(storage.SYSStateSyncPoint.Bytes(), point)
|
bcSpout.dao.Store.Put([]byte{byte(storage.SYSStateSyncPoint)}, point)
|
||||||
shouldFail := stage == 0x03 // unknown stage
|
shouldFail := stage == 0x03 // unknown stage
|
||||||
checkNewBlockchainErr(t, spountCfg, bcSpout.dao.Store, shouldFail)
|
checkNewBlockchainErr(t, spountCfg, bcSpout.dao.Store, shouldFail)
|
||||||
})
|
})
|
||||||
|
|
|
@ -6,7 +6,6 @@ import (
|
||||||
"encoding/binary"
|
"encoding/binary"
|
||||||
"errors"
|
"errors"
|
||||||
iocore "io"
|
iocore "io"
|
||||||
"sort"
|
|
||||||
|
|
||||||
"github.com/nspcc-dev/neo-go/pkg/core/block"
|
"github.com/nspcc-dev/neo-go/pkg/core/block"
|
||||||
"github.com/nspcc-dev/neo-go/pkg/core/state"
|
"github.com/nspcc-dev/neo-go/pkg/core/state"
|
||||||
|
@ -31,6 +30,7 @@ var (
|
||||||
type Simple struct {
|
type Simple struct {
|
||||||
Version Version
|
Version Version
|
||||||
Store *storage.MemCachedStore
|
Store *storage.MemCachedStore
|
||||||
|
private bool
|
||||||
keyBuf []byte
|
keyBuf []byte
|
||||||
dataBuf *io.BufBinWriter
|
dataBuf *io.BufBinWriter
|
||||||
}
|
}
|
||||||
|
@ -68,19 +68,10 @@ func (dao *Simple) GetWrapped() *Simple {
|
||||||
// GetPrivate returns new DAO instance with another layer of private
|
// GetPrivate returns new DAO instance with another layer of private
|
||||||
// MemCachedStore around the current DAO Store.
|
// MemCachedStore around the current DAO Store.
|
||||||
func (dao *Simple) GetPrivate() *Simple {
|
func (dao *Simple) GetPrivate() *Simple {
|
||||||
st := storage.NewPrivateMemCachedStore(dao.Store)
|
d := &Simple{}
|
||||||
d := newSimple(st, dao.Version.StateRootInHeader, dao.Version.P2PSigExtensions)
|
*d = *dao // Inherit everything...
|
||||||
d.Version = dao.Version
|
d.Store = storage.NewPrivateMemCachedStore(dao.Store) // except storage, wrap another layer.
|
||||||
if dao.keyBuf != nil { // This one is private.
|
d.private = true
|
||||||
d.keyBuf = dao.keyBuf // Thus we can reuse its buffer.
|
|
||||||
} else {
|
|
||||||
d.keyBuf = make([]byte, 0, 1+4+storage.MaxStorageKeyLen) // Prefix, uint32, key.
|
|
||||||
}
|
|
||||||
if dao.dataBuf != nil { // This one is private.
|
|
||||||
d.dataBuf = dao.dataBuf // Thus we can reuse its buffer.
|
|
||||||
} else {
|
|
||||||
d.dataBuf = io.NewBufBinWriter()
|
|
||||||
}
|
|
||||||
return d
|
return d
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -108,7 +99,7 @@ func (dao *Simple) putWithBuffer(entity io.Serializable, key []byte, buf *io.Buf
|
||||||
func (dao *Simple) makeContractIDKey(id int32) []byte {
|
func (dao *Simple) makeContractIDKey(id int32) []byte {
|
||||||
key := dao.getKeyBuf(5)
|
key := dao.getKeyBuf(5)
|
||||||
key[0] = byte(storage.STContractID)
|
key[0] = byte(storage.STContractID)
|
||||||
binary.LittleEndian.PutUint32(key[1:], uint32(id))
|
binary.BigEndian.PutUint32(key[1:], uint32(id))
|
||||||
return key
|
return key
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -375,7 +366,10 @@ func (dao *Simple) makeStorageItemKey(id int32, key []byte) []byte {
|
||||||
|
|
||||||
// GetBlock returns Block by the given hash if it exists in the store.
|
// GetBlock returns Block by the given hash if it exists in the store.
|
||||||
func (dao *Simple) GetBlock(hash util.Uint256) (*block.Block, error) {
|
func (dao *Simple) GetBlock(hash util.Uint256) (*block.Block, error) {
|
||||||
key := dao.makeExecutableKey(hash)
|
return dao.getBlock(dao.makeExecutableKey(hash))
|
||||||
|
}
|
||||||
|
|
||||||
|
func (dao *Simple) getBlock(key []byte) (*block.Block, error) {
|
||||||
b, err := dao.Store.Get(key)
|
b, err := dao.Store.Get(key)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
|
@ -454,12 +448,18 @@ func (v *Version) Bytes() []byte {
|
||||||
return append([]byte(v.Value), '\x00', byte(v.StoragePrefix), mask)
|
return append([]byte(v.Value), '\x00', byte(v.StoragePrefix), mask)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (dao *Simple) mkKeyPrefix(k storage.KeyPrefix) []byte {
|
||||||
|
b := dao.getKeyBuf(1)
|
||||||
|
b[0] = byte(k)
|
||||||
|
return b
|
||||||
|
}
|
||||||
|
|
||||||
// GetVersion attempts to get the current version stored in the
|
// GetVersion attempts to get the current version stored in the
|
||||||
// underlying store.
|
// underlying store.
|
||||||
func (dao *Simple) GetVersion() (Version, error) {
|
func (dao *Simple) GetVersion() (Version, error) {
|
||||||
var version Version
|
var version Version
|
||||||
|
|
||||||
data, err := dao.Store.Get(storage.SYSVersion.Bytes())
|
data, err := dao.Store.Get(dao.mkKeyPrefix(storage.SYSVersion))
|
||||||
if err == nil {
|
if err == nil {
|
||||||
err = version.FromBytes(data)
|
err = version.FromBytes(data)
|
||||||
}
|
}
|
||||||
|
@ -469,7 +469,7 @@ func (dao *Simple) GetVersion() (Version, error) {
|
||||||
// GetCurrentBlockHeight returns the current block height found in the
|
// GetCurrentBlockHeight returns the current block height found in the
|
||||||
// underlying store.
|
// underlying store.
|
||||||
func (dao *Simple) GetCurrentBlockHeight() (uint32, error) {
|
func (dao *Simple) GetCurrentBlockHeight() (uint32, error) {
|
||||||
b, err := dao.Store.Get(storage.SYSCurrentBlock.Bytes())
|
b, err := dao.Store.Get(dao.mkKeyPrefix(storage.SYSCurrentBlock))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return 0, err
|
return 0, err
|
||||||
}
|
}
|
||||||
|
@ -480,7 +480,7 @@ func (dao *Simple) GetCurrentBlockHeight() (uint32, error) {
|
||||||
// the underlying store.
|
// the underlying store.
|
||||||
func (dao *Simple) GetCurrentHeaderHeight() (i uint32, h util.Uint256, err error) {
|
func (dao *Simple) GetCurrentHeaderHeight() (i uint32, h util.Uint256, err error) {
|
||||||
var b []byte
|
var b []byte
|
||||||
b, err = dao.Store.Get(storage.SYSCurrentHeader.Bytes())
|
b, err = dao.Store.Get(dao.mkKeyPrefix(storage.SYSCurrentHeader))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
@ -491,7 +491,7 @@ func (dao *Simple) GetCurrentHeaderHeight() (i uint32, h util.Uint256, err error
|
||||||
|
|
||||||
// GetStateSyncPoint returns current state synchronisation point P.
|
// GetStateSyncPoint returns current state synchronisation point P.
|
||||||
func (dao *Simple) GetStateSyncPoint() (uint32, error) {
|
func (dao *Simple) GetStateSyncPoint() (uint32, error) {
|
||||||
b, err := dao.Store.Get(storage.SYSStateSyncPoint.Bytes())
|
b, err := dao.Store.Get(dao.mkKeyPrefix(storage.SYSStateSyncPoint))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return 0, err
|
return 0, err
|
||||||
}
|
}
|
||||||
|
@ -501,7 +501,7 @@ func (dao *Simple) GetStateSyncPoint() (uint32, error) {
|
||||||
// GetStateSyncCurrentBlockHeight returns current block height stored during state
|
// GetStateSyncCurrentBlockHeight returns current block height stored during state
|
||||||
// synchronisation process.
|
// synchronisation process.
|
||||||
func (dao *Simple) GetStateSyncCurrentBlockHeight() (uint32, error) {
|
func (dao *Simple) GetStateSyncCurrentBlockHeight() (uint32, error) {
|
||||||
b, err := dao.Store.Get(storage.SYSStateSyncCurrentBlockHeight.Bytes())
|
b, err := dao.Store.Get(dao.mkKeyPrefix(storage.SYSStateSyncCurrentBlockHeight))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return 0, err
|
return 0, err
|
||||||
}
|
}
|
||||||
|
@ -511,33 +511,19 @@ func (dao *Simple) GetStateSyncCurrentBlockHeight() (uint32, error) {
|
||||||
// GetHeaderHashes returns a sorted list of header hashes retrieved from
|
// GetHeaderHashes returns a sorted list of header hashes retrieved from
|
||||||
// the given underlying store.
|
// the given underlying store.
|
||||||
func (dao *Simple) GetHeaderHashes() ([]util.Uint256, error) {
|
func (dao *Simple) GetHeaderHashes() ([]util.Uint256, error) {
|
||||||
hashMap := make(map[uint32][]util.Uint256)
|
var hashes = make([]util.Uint256, 0)
|
||||||
|
|
||||||
dao.Store.Seek(storage.SeekRange{
|
dao.Store.Seek(storage.SeekRange{
|
||||||
Prefix: storage.IXHeaderHashList.Bytes(),
|
Prefix: dao.mkKeyPrefix(storage.IXHeaderHashList),
|
||||||
}, func(k, v []byte) bool {
|
}, func(k, v []byte) bool {
|
||||||
storedCount := binary.LittleEndian.Uint32(k[1:])
|
newHashes, err := read2000Uint256Hashes(v)
|
||||||
hashes, err := read2000Uint256Hashes(v)
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
panic(err)
|
panic(err)
|
||||||
}
|
}
|
||||||
hashMap[storedCount] = hashes
|
hashes = append(hashes, newHashes...)
|
||||||
return true
|
return true
|
||||||
})
|
})
|
||||||
|
|
||||||
var (
|
|
||||||
hashes = make([]util.Uint256, 0, len(hashMap))
|
|
||||||
sortedKeys = make([]uint32, 0, len(hashMap))
|
|
||||||
)
|
|
||||||
|
|
||||||
for k := range hashMap {
|
|
||||||
sortedKeys = append(sortedKeys, k)
|
|
||||||
}
|
|
||||||
sort.Slice(sortedKeys, func(i, j int) bool { return sortedKeys[i] < sortedKeys[j] })
|
|
||||||
|
|
||||||
for _, key := range sortedKeys {
|
|
||||||
hashes = append(hashes[:key], hashMap[key]...)
|
|
||||||
}
|
|
||||||
|
|
||||||
return hashes, nil
|
return hashes, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -575,26 +561,29 @@ func (dao *Simple) GetTransaction(hash util.Uint256) (*transaction.Transaction,
|
||||||
// PutVersion stores the given version in the underlying store.
|
// PutVersion stores the given version in the underlying store.
|
||||||
func (dao *Simple) PutVersion(v Version) {
|
func (dao *Simple) PutVersion(v Version) {
|
||||||
dao.Version = v
|
dao.Version = v
|
||||||
dao.Store.Put(storage.SYSVersion.Bytes(), v.Bytes())
|
dao.Store.Put(dao.mkKeyPrefix(storage.SYSVersion), v.Bytes())
|
||||||
}
|
}
|
||||||
|
|
||||||
// PutCurrentHeader stores current header.
|
// PutCurrentHeader stores current header.
|
||||||
func (dao *Simple) PutCurrentHeader(hashAndIndex []byte) {
|
func (dao *Simple) PutCurrentHeader(h util.Uint256, index uint32) {
|
||||||
dao.Store.Put(storage.SYSCurrentHeader.Bytes(), hashAndIndex)
|
buf := dao.getDataBuf()
|
||||||
|
buf.WriteBytes(h.BytesLE())
|
||||||
|
buf.WriteU32LE(index)
|
||||||
|
dao.Store.Put(dao.mkKeyPrefix(storage.SYSCurrentHeader), buf.Bytes())
|
||||||
}
|
}
|
||||||
|
|
||||||
// PutStateSyncPoint stores current state synchronisation point P.
|
// PutStateSyncPoint stores current state synchronisation point P.
|
||||||
func (dao *Simple) PutStateSyncPoint(p uint32) {
|
func (dao *Simple) PutStateSyncPoint(p uint32) {
|
||||||
buf := dao.getKeyBuf(4) // It's very small, no point in using BufBinWriter.
|
buf := dao.getDataBuf()
|
||||||
binary.LittleEndian.PutUint32(buf, p)
|
buf.WriteU32LE(p)
|
||||||
dao.Store.Put(storage.SYSStateSyncPoint.Bytes(), buf)
|
dao.Store.Put(dao.mkKeyPrefix(storage.SYSStateSyncPoint), buf.Bytes())
|
||||||
}
|
}
|
||||||
|
|
||||||
// PutStateSyncCurrentBlockHeight stores current block height during state synchronisation process.
|
// PutStateSyncCurrentBlockHeight stores current block height during state synchronisation process.
|
||||||
func (dao *Simple) PutStateSyncCurrentBlockHeight(h uint32) {
|
func (dao *Simple) PutStateSyncCurrentBlockHeight(h uint32) {
|
||||||
buf := dao.getKeyBuf(4) // It's very small, no point in using BufBinWriter.
|
buf := dao.getDataBuf()
|
||||||
binary.LittleEndian.PutUint32(buf, h)
|
buf.WriteU32LE(h)
|
||||||
dao.Store.Put(storage.SYSStateSyncCurrentBlockHeight.Bytes(), buf)
|
dao.Store.Put(dao.mkKeyPrefix(storage.SYSStateSyncCurrentBlockHeight), buf.Bytes())
|
||||||
}
|
}
|
||||||
|
|
||||||
// read2000Uint256Hashes attempts to read 2000 Uint256 hashes from
|
// read2000Uint256Hashes attempts to read 2000 Uint256 hashes from
|
||||||
|
@ -610,6 +599,25 @@ func read2000Uint256Hashes(b []byte) ([]util.Uint256, error) {
|
||||||
return hashes, nil
|
return hashes, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (dao *Simple) mkHeaderHashKey(h uint32) []byte {
|
||||||
|
b := dao.getKeyBuf(1 + 4)
|
||||||
|
b[0] = byte(storage.IXHeaderHashList)
|
||||||
|
binary.BigEndian.PutUint32(b[1:], h)
|
||||||
|
return b
|
||||||
|
}
|
||||||
|
|
||||||
|
// StoreHeaderHashes pushes a batch of header hashes into the store.
|
||||||
|
func (dao *Simple) StoreHeaderHashes(hashes []util.Uint256, height uint32) error {
|
||||||
|
key := dao.mkHeaderHashKey(height)
|
||||||
|
buf := dao.getDataBuf()
|
||||||
|
buf.WriteArray(hashes)
|
||||||
|
if buf.Err != nil {
|
||||||
|
return buf.Err
|
||||||
|
}
|
||||||
|
dao.Store.Put(key, buf.Bytes())
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
// HasTransaction returns nil if the given store does not contain the given
|
// HasTransaction returns nil if the given store does not contain the given
|
||||||
// Transaction hash. It returns an error in case if transaction is in chain
|
// Transaction hash. It returns an error in case if transaction is in chain
|
||||||
// or in the list of conflicting transactions.
|
// or in the list of conflicting transactions.
|
||||||
|
@ -659,29 +667,17 @@ func (dao *Simple) StoreAsBlock(block *block.Block, aer1 *state.AppExecResult, a
|
||||||
// using private MemCached instance here.
|
// using private MemCached instance here.
|
||||||
func (dao *Simple) DeleteBlock(h util.Uint256) error {
|
func (dao *Simple) DeleteBlock(h util.Uint256) error {
|
||||||
key := dao.makeExecutableKey(h)
|
key := dao.makeExecutableKey(h)
|
||||||
bs, err := dao.Store.Get(key)
|
|
||||||
|
b, err := dao.getBlock(key)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
r := io.NewBinReaderFromBuf(bs)
|
err = dao.storeHeader(key, &b.Header)
|
||||||
if r.ReadB() != storage.ExecBlock {
|
|
||||||
return errors.New("internal DB inconsistency")
|
|
||||||
}
|
|
||||||
b, err := block.NewTrimmedFromReader(dao.Version.StateRootInHeader, r)
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
w := dao.getDataBuf()
|
|
||||||
w.WriteB(storage.ExecBlock)
|
|
||||||
b.Header.EncodeBinary(w.BinWriter)
|
|
||||||
w.BinWriter.WriteB(0)
|
|
||||||
if w.Err != nil {
|
|
||||||
return w.Err
|
|
||||||
}
|
|
||||||
dao.Store.Put(key, w.Bytes())
|
|
||||||
|
|
||||||
for _, tx := range b.Transactions {
|
for _, tx := range b.Transactions {
|
||||||
copy(key[1:], tx.Hash().BytesBE())
|
copy(key[1:], tx.Hash().BytesBE())
|
||||||
dao.Store.Delete(key)
|
dao.Store.Delete(key)
|
||||||
|
@ -697,6 +693,23 @@ func (dao *Simple) DeleteBlock(h util.Uint256) error {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// StoreHeader saves block header into the store.
|
||||||
|
func (dao *Simple) StoreHeader(h *block.Header) error {
|
||||||
|
return dao.storeHeader(dao.makeExecutableKey(h.Hash()), h)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (dao *Simple) storeHeader(key []byte, h *block.Header) error {
|
||||||
|
buf := dao.getDataBuf()
|
||||||
|
buf.WriteB(storage.ExecBlock)
|
||||||
|
h.EncodeBinary(buf.BinWriter)
|
||||||
|
buf.BinWriter.WriteB(0)
|
||||||
|
if buf.Err != nil {
|
||||||
|
return buf.Err
|
||||||
|
}
|
||||||
|
dao.Store.Put(key, buf.Bytes())
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
// StoreAsCurrentBlock stores a hash of the given block with prefix
|
// StoreAsCurrentBlock stores a hash of the given block with prefix
|
||||||
// SYSCurrentBlock. It can reuse given buffer for the purpose of value
|
// SYSCurrentBlock. It can reuse given buffer for the purpose of value
|
||||||
// serialization.
|
// serialization.
|
||||||
|
@ -705,7 +718,7 @@ func (dao *Simple) StoreAsCurrentBlock(block *block.Block) {
|
||||||
h := block.Hash()
|
h := block.Hash()
|
||||||
h.EncodeBinary(buf.BinWriter)
|
h.EncodeBinary(buf.BinWriter)
|
||||||
buf.WriteU32LE(block.Index)
|
buf.WriteU32LE(block.Index)
|
||||||
dao.Store.Put(storage.SYSCurrentBlock.Bytes(), buf.Bytes())
|
dao.Store.Put(dao.mkKeyPrefix(storage.SYSCurrentBlock), buf.Bytes())
|
||||||
}
|
}
|
||||||
|
|
||||||
// StoreAsTransaction stores given TX as DataTransaction. It also stores transactions
|
// StoreAsTransaction stores given TX as DataTransaction. It also stores transactions
|
||||||
|
@ -744,14 +757,20 @@ func (dao *Simple) StoreAsTransaction(tx *transaction.Transaction, index uint32,
|
||||||
}
|
}
|
||||||
|
|
||||||
func (dao *Simple) getKeyBuf(len int) []byte {
|
func (dao *Simple) getKeyBuf(len int) []byte {
|
||||||
if dao.keyBuf != nil { // Private DAO.
|
if dao.private {
|
||||||
|
if dao.keyBuf == nil {
|
||||||
|
dao.keyBuf = make([]byte, 0, 1+4+storage.MaxStorageKeyLen) // Prefix, uint32, key.
|
||||||
|
}
|
||||||
return dao.keyBuf[:len] // Should have enough capacity.
|
return dao.keyBuf[:len] // Should have enough capacity.
|
||||||
}
|
}
|
||||||
return make([]byte, len)
|
return make([]byte, len)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (dao *Simple) getDataBuf() *io.BufBinWriter {
|
func (dao *Simple) getDataBuf() *io.BufBinWriter {
|
||||||
if dao.dataBuf != nil {
|
if dao.private {
|
||||||
|
if dao.dataBuf == nil {
|
||||||
|
dao.dataBuf = io.NewBufBinWriter()
|
||||||
|
}
|
||||||
dao.dataBuf.Reset()
|
dao.dataBuf.Reset()
|
||||||
return dao.dataBuf
|
return dao.dataBuf
|
||||||
}
|
}
|
||||||
|
|
|
@ -132,14 +132,14 @@ func TestGetVersion(t *testing.T) {
|
||||||
|
|
||||||
t.Run("invalid", func(t *testing.T) {
|
t.Run("invalid", func(t *testing.T) {
|
||||||
dao := NewSimple(storage.NewMemoryStore(), false, false)
|
dao := NewSimple(storage.NewMemoryStore(), false, false)
|
||||||
dao.Store.Put(storage.SYSVersion.Bytes(), []byte("0.1.2\x00x"))
|
dao.Store.Put([]byte{byte(storage.SYSVersion)}, []byte("0.1.2\x00x"))
|
||||||
|
|
||||||
_, err := dao.GetVersion()
|
_, err := dao.GetVersion()
|
||||||
require.Error(t, err)
|
require.Error(t, err)
|
||||||
})
|
})
|
||||||
t.Run("old format", func(t *testing.T) {
|
t.Run("old format", func(t *testing.T) {
|
||||||
dao := NewSimple(storage.NewMemoryStore(), false, false)
|
dao := NewSimple(storage.NewMemoryStore(), false, false)
|
||||||
dao.Store.Put(storage.SYSVersion.Bytes(), []byte("0.1.2"))
|
dao.Store.Put([]byte{byte(storage.SYSVersion)}, []byte("0.1.2"))
|
||||||
|
|
||||||
version, err := dao.GetVersion()
|
version, err := dao.GetVersion()
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
|
@ -32,7 +32,7 @@ func TestModule_PR2019_discussion_r689629704(t *testing.T) {
|
||||||
nodes = make(map[util.Uint256][]byte)
|
nodes = make(map[util.Uint256][]byte)
|
||||||
expectedItems []storage.KeyValue
|
expectedItems []storage.KeyValue
|
||||||
)
|
)
|
||||||
expectedStorage.Seek(storage.SeekRange{Prefix: storage.DataMPT.Bytes()}, func(k, v []byte) bool {
|
expectedStorage.Seek(storage.SeekRange{Prefix: []byte{byte(storage.DataMPT)}}, func(k, v []byte) bool {
|
||||||
key := slice.Copy(k)
|
key := slice.Copy(k)
|
||||||
value := slice.Copy(v)
|
value := slice.Copy(v)
|
||||||
expectedItems = append(expectedItems, storage.KeyValue{
|
expectedItems = append(expectedItems, storage.KeyValue{
|
||||||
|
@ -96,7 +96,7 @@ func TestModule_PR2019_discussion_r689629704(t *testing.T) {
|
||||||
|
|
||||||
// Compare resulting storage items and refcounts.
|
// Compare resulting storage items and refcounts.
|
||||||
var actualItems []storage.KeyValue
|
var actualItems []storage.KeyValue
|
||||||
expectedStorage.Seek(storage.SeekRange{Prefix: storage.DataMPT.Bytes()}, func(k, v []byte) bool {
|
expectedStorage.Seek(storage.SeekRange{Prefix: []byte{byte(storage.DataMPT)}}, func(k, v []byte) bool {
|
||||||
key := slice.Copy(k)
|
key := slice.Copy(k)
|
||||||
value := slice.Copy(v)
|
value := slice.Copy(v)
|
||||||
actualItems = append(actualItems, storage.KeyValue{
|
actualItems = append(actualItems, storage.KeyValue{
|
||||||
|
|
|
@ -423,7 +423,7 @@ func TestStateSyncModule_RestoreBasicChain(t *testing.T) {
|
||||||
// compare storage states
|
// compare storage states
|
||||||
fetchStorage := func(bc *Blockchain) []storage.KeyValue {
|
fetchStorage := func(bc *Blockchain) []storage.KeyValue {
|
||||||
var kv []storage.KeyValue
|
var kv []storage.KeyValue
|
||||||
bc.dao.Store.Seek(storage.SeekRange{Prefix: bc.dao.Version.StoragePrefix.Bytes()}, func(k, v []byte) bool {
|
bc.dao.Store.Seek(storage.SeekRange{Prefix: []byte{byte(bc.dao.Version.StoragePrefix)}}, func(k, v []byte) bool {
|
||||||
key := slice.Copy(k)
|
key := slice.Copy(k)
|
||||||
value := slice.Copy(v)
|
value := slice.Copy(v)
|
||||||
if key[0] == byte(storage.STTempStorage) {
|
if key[0] == byte(storage.STTempStorage) {
|
||||||
|
@ -444,7 +444,7 @@ func TestStateSyncModule_RestoreBasicChain(t *testing.T) {
|
||||||
// no temp items should be left
|
// no temp items should be left
|
||||||
require.Eventually(t, func() bool {
|
require.Eventually(t, func() bool {
|
||||||
var haveItems bool
|
var haveItems bool
|
||||||
bcBolt.dao.Store.Seek(storage.SeekRange{Prefix: storage.STStorage.Bytes()}, func(_, _ []byte) bool {
|
bcBolt.dao.Store.Seek(storage.SeekRange{Prefix: []byte{byte(storage.STStorage)}}, func(_, _ []byte) bool {
|
||||||
haveItems = true
|
haveItems = true
|
||||||
return false
|
return false
|
||||||
})
|
})
|
||||||
|
|
|
@ -1,7 +1,6 @@
|
||||||
package storage
|
package storage
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"encoding/binary"
|
|
||||||
"errors"
|
"errors"
|
||||||
"fmt"
|
"fmt"
|
||||||
|
|
||||||
|
@ -107,28 +106,6 @@ type (
|
||||||
KeyPrefix uint8
|
KeyPrefix uint8
|
||||||
)
|
)
|
||||||
|
|
||||||
// Bytes returns the bytes representation of KeyPrefix.
|
|
||||||
func (k KeyPrefix) Bytes() []byte {
|
|
||||||
return []byte{byte(k)}
|
|
||||||
}
|
|
||||||
|
|
||||||
// AppendPrefix appends byteslice b to the given KeyPrefix.
|
|
||||||
// AppendKeyPrefix(SYSVersion, []byte{0x00, 0x01}).
|
|
||||||
func AppendPrefix(k KeyPrefix, b []byte) []byte {
|
|
||||||
dest := make([]byte, len(b)+1)
|
|
||||||
dest[0] = byte(k)
|
|
||||||
copy(dest[1:], b)
|
|
||||||
return dest
|
|
||||||
}
|
|
||||||
|
|
||||||
// AppendPrefixInt append int n to the given KeyPrefix.
|
|
||||||
// AppendPrefixInt(SYSCurrentHeader, 10001)
|
|
||||||
func AppendPrefixInt(k KeyPrefix, n int) []byte {
|
|
||||||
b := make([]byte, 4)
|
|
||||||
binary.LittleEndian.PutUint32(b, uint32(n))
|
|
||||||
return AppendPrefix(k, b)
|
|
||||||
}
|
|
||||||
|
|
||||||
func seekRangeToPrefixes(sr SeekRange) *util.Range {
|
func seekRangeToPrefixes(sr SeekRange) *util.Range {
|
||||||
var (
|
var (
|
||||||
rang *util.Range
|
rang *util.Range
|
||||||
|
|
|
@ -3,48 +3,9 @@ package storage
|
||||||
import (
|
import (
|
||||||
"testing"
|
"testing"
|
||||||
|
|
||||||
"github.com/stretchr/testify/assert"
|
|
||||||
"github.com/stretchr/testify/require"
|
"github.com/stretchr/testify/require"
|
||||||
)
|
)
|
||||||
|
|
||||||
var (
|
|
||||||
prefixes = []KeyPrefix{
|
|
||||||
DataExecutable,
|
|
||||||
DataMPT,
|
|
||||||
STStorage,
|
|
||||||
IXHeaderHashList,
|
|
||||||
SYSCurrentBlock,
|
|
||||||
SYSCurrentHeader,
|
|
||||||
SYSVersion,
|
|
||||||
}
|
|
||||||
|
|
||||||
expected = []uint8{
|
|
||||||
0x01,
|
|
||||||
0x03,
|
|
||||||
0x70,
|
|
||||||
0x80,
|
|
||||||
0xc0,
|
|
||||||
0xc1,
|
|
||||||
0xf0,
|
|
||||||
}
|
|
||||||
)
|
|
||||||
|
|
||||||
func TestAppendPrefix(t *testing.T) {
|
|
||||||
for i := 0; i < len(expected); i++ {
|
|
||||||
value := []byte{0x01, 0x02}
|
|
||||||
prefix := AppendPrefix(prefixes[i], value)
|
|
||||||
assert.Equal(t, KeyPrefix(expected[i]), KeyPrefix(prefix[0]))
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestAppendPrefixInt(t *testing.T) {
|
|
||||||
for i := 0; i < len(expected); i++ {
|
|
||||||
value := 2000
|
|
||||||
prefix := AppendPrefixInt(prefixes[i], value)
|
|
||||||
assert.Equal(t, KeyPrefix(expected[i]), KeyPrefix(prefix[0]))
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestBatchToOperations(t *testing.T) {
|
func TestBatchToOperations(t *testing.T) {
|
||||||
b := &MemBatch{
|
b := &MemBatch{
|
||||||
Put: []KeyValueExists{
|
Put: []KeyValueExists{
|
||||||
|
|
Loading…
Reference in a new issue