Merge pull request #2119 from nspcc-dev/states-exchange/insole

core, network: prepare basis for Insole module
This commit is contained in:
Roman Khimov 2021-08-12 10:35:02 +03:00 committed by GitHub
commit 5aff82aef4
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
11 changed files with 236 additions and 53 deletions

View file

@ -234,6 +234,7 @@ protocol-related settings described in the table below.
| NativeActivations | `map[string][]uint32` | ContractManagement: [0]<br>StdLib: [0]<br>CryptoLib: [0]<br>LedgerContract: [0]<br>NeoToken: [0]<br>GasToken: [0]<br>PolicyContract: [0]<br>RoleManagement: [0]<br>OracleContract: [0] | The list of histories of native contracts updates. Each list item shod be presented as a known native contract name with the corresponding list of chain's heights. The contract is not active until chain reaches the first height value specified in the list. | `Notary` is supported. |
| P2PNotaryRequestPayloadPoolSize | `int` | `1000` | Size of the node's P2P Notary request payloads memory pool where P2P Notary requests are stored before main or fallback transaction is completed and added to the chain.<br>This option is valid only if `P2PSigExtensions` are enabled. | Not supported by the C# node, thus may affect heterogeneous networks functionality. |
| P2PSigExtensions | `bool` | `false` | Enables following additional Notary service related logic:<br>• Transaction attributes `NotValidBefore`, `Conflicts` and `NotaryAssisted`<br>• Network payload of the `P2PNotaryRequest` type<br>• Native `Notary` contract<br>• Notary node module | Not supported by the C# node, thus may affect heterogeneous networks functionality. |
| P2PStateExchangeExtensions | `bool` | `false` | Enables following P2P MPT state data exchange logic: <br>`StateSyncInterval` protocol setting <br>• P2P commands `GetMPTDataCMD` and `MPTDataCMD` | Not supported by the C# node, thus may affect heterogeneous networks functionality. |
| RemoveUntraceableBlocks | `bool`| `false` | Denotes whether old blocks should be removed from cache and database. If enabled, then only last `MaxTraceableBlocks` are stored and accessible to smart contracts. |
| ReservedAttributes | `bool` | `false` | Allows to have reserved attributes range for experimental or private purposes. |
| SaveStorageBatch | `bool` | `false` | Enables storage batch saving before every persist. It is similar to StorageDump plugin for C# node. |
@ -241,6 +242,7 @@ protocol-related settings described in the table below.
| SeedList | `[]string` | [] | List of initial nodes addresses used to establish connectivity. |
| StandbyCommittee | `[]string` | [] | List of public keys of standby committee validators are chosen from. |
| StateRootInHeader | `bool` | `false` | Enables storing state root in block header. | Experimental protocol extension! |
| StateSyncInterval | `int` | `40000` | The number of blocks between state heights available for MPT state data synchronization. | `P2PStateExchangeExtensions` should be enabled to use this setting. |
| ValidatorsCount | `int` | `0` | Number of validators. |
| VerifyBlocks | `bool` | `false` | Denotes whether to verify received blocks. |
| VerifyTransactions | `bool` | `false` | Denotes whether to verify transactions in received blocks. |

View file

@ -38,6 +38,8 @@ type (
NativeUpdateHistories map[string][]uint32 `yaml:"NativeActivations"`
// P2PSigExtensions enables additional signature-related logic.
P2PSigExtensions bool `yaml:"P2PSigExtensions"`
// P2PStateExchangeExtensions enables additional P2P MPT state data exchange logic.
P2PStateExchangeExtensions bool `yaml:"P2PStateExchangeExtensions"`
// ReservedAttributes allows to have reserved attributes range for experimental or private purposes.
ReservedAttributes bool `yaml:"ReservedAttributes"`
// SaveStorageBatch enables storage batch saving before every persist.
@ -47,6 +49,9 @@ type (
StandbyCommittee []string `yaml:"StandbyCommittee"`
// StateRooInHeader enables storing state root in block header.
StateRootInHeader bool `yaml:"StateRootInHeader"`
// StateSyncInterval is the number of blocks between state heights available for MPT state data synchronization.
// It is valid only if P2PStateExchangeExtensions are enabled.
StateSyncInterval int `yaml:"StateSyncInterval"`
ValidatorsCount int `yaml:"ValidatorsCount"`
// Whether to verify received blocks.
VerifyBlocks bool `yaml:"VerifyBlocks"`

View file

@ -43,7 +43,7 @@ import (
// Tuning parameters.
const (
headerBatchCount = 2000
version = "0.1.1"
version = "0.1.2"
defaultInitialGAS = 52000000_00000000
defaultMemPoolSize = 50000
@ -54,6 +54,7 @@ const (
defaultMaxTransactionsPerBlock = 512
// HeaderVerificationGasLimit is the maximum amount of GAS for block header verification.
HeaderVerificationGasLimit = 3_00000000 // 3 GAS
defaultStateSyncInterval = 40000
)
var (
@ -208,6 +209,16 @@ func NewBlockchain(s storage.Store, cfg config.ProtocolConfiguration, log *zap.L
log.Info("MaxValidUntilBlockIncrement is not set or wrong, using default value",
zap.Uint32("MaxValidUntilBlockIncrement", cfg.MaxValidUntilBlockIncrement))
}
if cfg.P2PStateExchangeExtensions {
if !cfg.StateRootInHeader {
return nil, errors.New("P2PStatesExchangeExtensions are enabled, but StateRootInHeader is off")
}
if cfg.StateSyncInterval <= 0 {
cfg.StateSyncInterval = defaultStateSyncInterval
log.Info("StateSyncInterval is not set or wrong, using default value",
zap.Int("StateSyncInterval", cfg.StateSyncInterval))
}
}
committee, err := committeeFromConfig(cfg)
if err != nil {
return nil, err
@ -716,21 +727,30 @@ func (bc *Blockchain) storeBlock(block *block.Block, txpool *mempool.Pool) error
writeBuf.Reset()
if bc.config.P2PSigExtensions {
for _, attr := range tx.GetAttributes(transaction.ConflictsT) {
hash := attr.Value.(*transaction.Conflicts).Hash
dummyTx := transaction.NewTrimmedTX(hash)
dummyTx.Version = transaction.DummyVersion
if err := kvcache.StoreAsTransaction(dummyTx, block.Index, writeBuf); err != nil {
blockdone <- fmt.Errorf("failed to store conflicting transaction %s for transaction %s: %w", hash.StringLE(), tx.Hash().StringLE(), err)
err := kvcache.StoreConflictingTransactions(tx, block.Index, writeBuf)
if err != nil {
blockdone <- err
return
}
writeBuf.Reset()
}
}
}
if bc.config.RemoveUntraceableBlocks {
if block.Index > bc.config.MaxTraceableBlocks {
index := block.Index - bc.config.MaxTraceableBlocks // is at least 1
var start, stop uint32
if bc.config.P2PStateExchangeExtensions {
// remove batch of old blocks starting from P2-MaxTraceableBlocks-StateSyncInterval up to P2-MaxTraceableBlocks
if block.Index >= 2*uint32(bc.config.StateSyncInterval) &&
block.Index >= uint32(bc.config.StateSyncInterval)+bc.config.MaxTraceableBlocks && // check this in case if MaxTraceableBlocks>StateSyncInterval
int(block.Index)%bc.config.StateSyncInterval == 0 {
stop = block.Index - uint32(bc.config.StateSyncInterval) - bc.config.MaxTraceableBlocks
if stop > uint32(bc.config.StateSyncInterval) {
start = stop - uint32(bc.config.StateSyncInterval)
}
}
} else if block.Index > bc.config.MaxTraceableBlocks {
start = block.Index - bc.config.MaxTraceableBlocks // is at least 1
stop = start + 1
}
for index := start; index < stop; index++ {
err := kvcache.DeleteBlock(bc.headerHashes[index], writeBuf)
if err != nil {
bc.log.Warn("error while removing old block",

View file

@ -1579,6 +1579,29 @@ func TestDumpAndRestore(t *testing.T) {
}
func TestRemoveUntraceable(t *testing.T) {
check := func(t *testing.T, bc *Blockchain, tHash, bHash util.Uint256, errorExpected bool) {
_, _, err := bc.GetTransaction(tHash)
if errorExpected {
require.Error(t, err)
} else {
require.NoError(t, err)
}
_, err = bc.GetAppExecResults(tHash, trigger.Application)
if errorExpected {
require.Error(t, err)
} else {
require.NoError(t, err)
}
_, err = bc.GetBlock(bHash)
if errorExpected {
require.Error(t, err)
} else {
require.NoError(t, err)
}
_, err = bc.GetHeader(bHash)
require.NoError(t, err)
}
t.Run("P2PStateExchangeExtensions off", func(t *testing.T) {
bc := newTestChainWithCustomCfg(t, func(c *config.Config) {
c.ProtocolConfiguration.MaxTraceableBlocks = 2
c.ProtocolConfiguration.RemoveUntraceableBlocks = true
@ -1600,14 +1623,48 @@ func TestRemoveUntraceable(t *testing.T) {
require.NoError(t, bc.AddBlock(bc.newBlock()))
_, _, err = bc.GetTransaction(tx1.Hash())
require.Error(t, err)
_, err = bc.GetAppExecResults(tx1.Hash(), trigger.Application)
require.Error(t, err)
_, err = bc.GetBlock(b1.Hash())
require.Error(t, err)
_, err = bc.GetHeader(b1.Hash())
check(t, bc, tx1.Hash(), b1.Hash(), true)
})
t.Run("P2PStateExchangeExtensions on", func(t *testing.T) {
bc := newTestChainWithCustomCfg(t, func(c *config.Config) {
c.ProtocolConfiguration.MaxTraceableBlocks = 2
c.ProtocolConfiguration.RemoveUntraceableBlocks = true
c.ProtocolConfiguration.P2PStateExchangeExtensions = true
c.ProtocolConfiguration.StateSyncInterval = 2
c.ProtocolConfiguration.StateRootInHeader = true
})
tx1, err := testchain.NewTransferFromOwner(bc, bc.contracts.NEO.Hash, util.Uint160{}, 1, 0, bc.BlockHeight()+1)
require.NoError(t, err)
b1 := bc.newBlock(tx1)
require.NoError(t, bc.AddBlock(b1))
tx1Height := bc.BlockHeight()
tx2, err := testchain.NewTransferFromOwner(bc, bc.contracts.NEO.Hash, util.Uint160{}, 1, 0, bc.BlockHeight()+1)
require.NoError(t, err)
b2 := bc.newBlock(tx2)
require.NoError(t, bc.AddBlock(b2))
tx2Height := bc.BlockHeight()
_, h1, err := bc.GetTransaction(tx1.Hash())
require.NoError(t, err)
require.Equal(t, tx1Height, h1)
require.NoError(t, bc.AddBlock(bc.newBlock()))
require.NoError(t, bc.AddBlock(bc.newBlock()))
require.NoError(t, bc.AddBlock(bc.newBlock()))
check(t, bc, tx1.Hash(), b1.Hash(), false)
check(t, bc, tx2.Hash(), b2.Hash(), false)
require.NoError(t, bc.AddBlock(bc.newBlock()))
check(t, bc, tx1.Hash(), b1.Hash(), true)
check(t, bc, tx2.Hash(), b2.Hash(), false)
_, h2, err := bc.GetTransaction(tx2.Hash())
require.NoError(t, err)
require.Equal(t, tx2Height, h2)
})
}
func TestInvalidNotification(t *testing.T) {

View file

@ -22,7 +22,7 @@ type Blockchainer interface {
ApplyPolicyToTxSet([]*transaction.Transaction) []*transaction.Transaction
GetConfig() config.ProtocolConfiguration
AddHeaders(...*block.Header) error
AddBlock(*block.Block) error
Blockqueuer // Blockqueuer interface
CalculateClaimable(h util.Uint160, endHeight uint32) (*big.Int, error)
Close()
InitVerificationVM(v *vm.VM, getContract func(util.Uint160) (*state.Contract, error), hash util.Uint160, witness *transaction.Witness) error

View file

@ -0,0 +1,9 @@
package blockchainer
import "github.com/nspcc-dev/neo-go/pkg/core/block"
// Blockqueuer is an interface for blockqueue.
type Blockqueuer interface {
AddBlock(block *block.Block) error
BlockHeight() uint32
}

View file

@ -4,6 +4,7 @@ import (
"bytes"
"encoding/binary"
"errors"
"fmt"
iocore "io"
"sort"
@ -44,6 +45,8 @@ type DAO interface {
GetHeaderHashes() ([]util.Uint256, error)
GetNEP17TransferInfo(acc util.Uint160) (*state.NEP17TransferInfo, error)
GetNEP17TransferLog(acc util.Uint160, index uint32) (*state.NEP17TransferLog, error)
GetStateSyncPoint() (uint32, error)
GetStateSyncCurrentBlockHeight() (uint32, error)
GetStorageItem(id int32, key []byte) state.StorageItem
GetStorageItems(id int32) (map[string]state.StorageItem, error)
GetStorageItemsWithPrefix(id int32, prefix []byte) (map[string]state.StorageItem, error)
@ -57,12 +60,15 @@ type DAO interface {
PutCurrentHeader(hashAndIndex []byte) error
PutNEP17TransferInfo(acc util.Uint160, bs *state.NEP17TransferInfo) error
PutNEP17TransferLog(acc util.Uint160, index uint32, lg *state.NEP17TransferLog) error
PutStateSyncPoint(p uint32) error
PutStateSyncCurrentBlockHeight(h uint32) error
PutStorageItem(id int32, key []byte, si state.StorageItem) error
PutVersion(v string) error
Seek(id int32, prefix []byte, f func(k, v []byte))
StoreAsBlock(block *block.Block, buf *io.BufBinWriter) error
StoreAsCurrentBlock(block *block.Block, buf *io.BufBinWriter) error
StoreAsTransaction(tx *transaction.Transaction, index uint32, buf *io.BufBinWriter) error
StoreConflictingTransactions(tx *transaction.Transaction, index uint32, buf *io.BufBinWriter) error
putNEP17TransferInfo(acc util.Uint160, bs *state.NEP17TransferInfo, buf *io.BufBinWriter) error
}
@ -397,6 +403,25 @@ func (dao *Simple) GetCurrentHeaderHeight() (i uint32, h util.Uint256, err error
return
}
// GetStateSyncPoint returns current state synchronisation point P.
func (dao *Simple) GetStateSyncPoint() (uint32, error) {
b, err := dao.Store.Get(storage.SYSStateSyncPoint.Bytes())
if err != nil {
return 0, err
}
return binary.LittleEndian.Uint32(b), nil
}
// GetStateSyncCurrentBlockHeight returns current block height stored during state
// synchronisation process.
func (dao *Simple) GetStateSyncCurrentBlockHeight() (uint32, error) {
b, err := dao.Store.Get(storage.SYSStateSyncCurrentBlockHeight.Bytes())
if err != nil {
return 0, err
}
return binary.LittleEndian.Uint32(b), nil
}
// GetHeaderHashes returns a sorted list of header hashes retrieved from
// the given underlying store.
func (dao *Simple) GetHeaderHashes() ([]util.Uint256, error) {
@ -464,6 +489,20 @@ func (dao *Simple) PutCurrentHeader(hashAndIndex []byte) error {
return dao.Store.Put(storage.SYSCurrentHeader.Bytes(), hashAndIndex)
}
// PutStateSyncPoint stores current state synchronisation point P.
func (dao *Simple) PutStateSyncPoint(p uint32) error {
buf := make([]byte, 4)
binary.LittleEndian.PutUint32(buf, p)
return dao.Store.Put(storage.SYSStateSyncPoint.Bytes(), buf)
}
// PutStateSyncCurrentBlockHeight stores current block height during state synchronisation process.
func (dao *Simple) PutStateSyncCurrentBlockHeight(h uint32) error {
buf := make([]byte, 4)
binary.LittleEndian.PutUint32(buf, h)
return dao.Store.Put(storage.SYSStateSyncCurrentBlockHeight.Bytes(), buf)
}
// read2000Uint256Hashes attempts to read 2000 Uint256 hashes from
// the given byte array.
func read2000Uint256Hashes(b []byte) ([]util.Uint256, error) {
@ -589,6 +628,25 @@ func (dao *Simple) StoreAsTransaction(tx *transaction.Transaction, index uint32,
return dao.Store.Put(key, buf.Bytes())
}
// StoreConflictingTransactions stores transactions given tx has conflicts with
// as DataTransaction with dummy version. It can reuse given buffer for the
// purpose of value serialization.
func (dao *Simple) StoreConflictingTransactions(tx *transaction.Transaction, index uint32, buf *io.BufBinWriter) error {
if buf == nil {
buf = io.NewBufBinWriter()
}
for _, attr := range tx.GetAttributes(transaction.ConflictsT) {
hash := attr.Value.(*transaction.Conflicts).Hash
dummyTx := transaction.NewTrimmedTX(hash)
dummyTx.Version = transaction.DummyVersion
if err := dao.StoreAsTransaction(dummyTx, index, buf); err != nil {
return fmt.Errorf("failed to store conflicting transaction %s for transaction %s: %w", hash.StringLE(), tx.Hash().StringLE(), err)
}
buf.Reset()
}
return nil
}
// Persist flushes all the changes made into the (supposedly) persistent
// underlying store.
func (dao *Simple) Persist() (int, error) {

View file

@ -171,3 +171,33 @@ func TestMakeStorageItemKey(t *testing.T) {
actual = makeStorageItemKey(id, nil)
require.Equal(t, expected, actual)
}
func TestPutGetStateSyncPoint(t *testing.T) {
dao := NewSimple(storage.NewMemoryStore(), true)
// empty store
_, err := dao.GetStateSyncPoint()
require.Error(t, err)
// non-empty store
var expected uint32 = 5
require.NoError(t, dao.PutStateSyncPoint(expected))
actual, err := dao.GetStateSyncPoint()
require.NoError(t, err)
require.Equal(t, expected, actual)
}
func TestPutGetStateSyncCurrentBlockHeight(t *testing.T) {
dao := NewSimple(storage.NewMemoryStore(), true)
// empty store
_, err := dao.GetStateSyncCurrentBlockHeight()
require.Error(t, err)
// non-empty store
var expected uint32 = 5
require.NoError(t, dao.PutStateSyncCurrentBlockHeight(expected))
actual, err := dao.GetStateSyncCurrentBlockHeight()
require.NoError(t, err)
require.Equal(t, expected, actual)
}

View file

@ -20,6 +20,8 @@ const (
IXHeaderHashList KeyPrefix = 0x80
SYSCurrentBlock KeyPrefix = 0xc0
SYSCurrentHeader KeyPrefix = 0xc1
SYSStateSyncCurrentBlockHeight KeyPrefix = 0xc2
SYSStateSyncPoint KeyPrefix = 0xc3
SYSVersion KeyPrefix = 0xf0
)

View file

@ -11,7 +11,7 @@ type blockQueue struct {
log *zap.Logger
queue *queue.PriorityQueue
checkBlocks chan struct{}
chain blockchainer.Blockchainer
chain blockchainer.Blockqueuer
relayF func(*block.Block)
}
@ -21,7 +21,7 @@ const (
blockCacheSize = 2000
)
func newBlockQueue(capacity int, bc blockchainer.Blockchainer, log *zap.Logger, relayer func(*block.Block)) *blockQueue {
func newBlockQueue(capacity int, bc blockchainer.Blockqueuer, log *zap.Logger, relayer func(*block.Block)) *blockQueue {
if log == nil {
return nil
}

View file

@ -175,7 +175,7 @@ func newServerFromConstructors(config ServerConfig, chain blockchainer.Blockchai
chain.SetNotary(n)
}
} else if config.P2PNotaryCfg.Enabled {
return nil, errors.New("P2PSigExtensions are disabled, but Notary service is enable")
return nil, errors.New("P2PSigExtensions are disabled, but Notary service is enabled")
}
s.bQueue = newBlockQueue(maxBlockBatch, chain, log, func(b *block.Block) {
s.tryStartServices()