Merge pull request #1342 from nspcc-dev/tps

Improve TPS, part 1
This commit is contained in:
Roman Khimov 2020-08-21 14:56:51 +03:00 committed by GitHub
commit 4e7a1f6c87
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
11 changed files with 369 additions and 120 deletions

View file

@ -13,9 +13,11 @@ import (
"github.com/nspcc-dev/neo-go/pkg/config/netmode"
coreb "github.com/nspcc-dev/neo-go/pkg/core/block"
"github.com/nspcc-dev/neo-go/pkg/core/blockchainer"
"github.com/nspcc-dev/neo-go/pkg/core/mempool"
"github.com/nspcc-dev/neo-go/pkg/core/transaction"
"github.com/nspcc-dev/neo-go/pkg/crypto/hash"
"github.com/nspcc-dev/neo-go/pkg/crypto/keys"
"github.com/nspcc-dev/neo-go/pkg/encoding/address"
"github.com/nspcc-dev/neo-go/pkg/io"
"github.com/nspcc-dev/neo-go/pkg/smartcontract"
"github.com/nspcc-dev/neo-go/pkg/util"
@ -255,9 +257,14 @@ func (s *service) getKeyPair(pubs []crypto.PublicKey) (int, crypto.PrivateKey, c
continue
}
key, err := keys.NEP2Decrypt(acc.EncryptedWIF, s.Config.Wallet.Password)
if err != nil {
continue
key := acc.PrivateKey()
if acc.PrivateKey() == nil {
err := acc.Decrypt(s.Config.Wallet.Password)
if err != nil {
s.log.Fatal("can't unlock account", zap.String("address", address.Uint160ToString(sh)))
break
}
key = acc.PrivateKey()
}
return i, &privateKey{PrivateKey: key}, &publicKey{PublicKey: key.PublicKey()}
@ -342,6 +349,10 @@ func (s *service) getTx(h util.Uint256) block.Transaction {
func (s *service) verifyBlock(b block.Block) bool {
coreb := &b.(*neoBlock).Block
if s.Chain.BlockHeight() >= coreb.Index {
s.log.Warn("proposed block has already outdated")
return false
}
maxBlockSize := int(s.Chain.GetMaxBlockSize())
size := io.GetVarSize(coreb)
if size > maxBlockSize {
@ -352,14 +363,30 @@ func (s *service) verifyBlock(b block.Block) bool {
}
var fee int64
var pool = mempool.New(len(coreb.Transactions))
var mainPool = s.Chain.GetMemPool()
for _, tx := range coreb.Transactions {
var err error
fee += tx.SystemFee
if err := s.Chain.VerifyTx(tx, coreb); err != nil {
if mainPool.ContainsKey(tx.Hash()) {
err = pool.Add(tx, s.Chain)
if err == nil {
continue
}
} else {
err = s.Chain.PoolTx(tx, pool)
}
if err != nil {
s.log.Warn("invalid transaction in proposed block",
zap.Stringer("hash", tx.Hash()),
zap.Error(err))
return false
}
if s.Chain.BlockHeight() >= coreb.Index {
s.log.Warn("proposed block has already outdated")
return false
}
}
maxBlockSysFee := s.Chain.GetMaxBlockSystemFee()

View file

@ -9,6 +9,7 @@ import (
"github.com/nspcc-dev/neo-go/pkg/config"
"github.com/nspcc-dev/neo-go/pkg/config/netmode"
"github.com/nspcc-dev/neo-go/pkg/core"
"github.com/nspcc-dev/neo-go/pkg/core/native"
"github.com/nspcc-dev/neo-go/pkg/core/storage"
"github.com/nspcc-dev/neo-go/pkg/core/transaction"
"github.com/nspcc-dev/neo-go/pkg/crypto/keys"
@ -193,6 +194,83 @@ func TestService_OnPayload(t *testing.T) {
srv.Chain.Close()
}
func TestVerifyBlock(t *testing.T) {
srv := newTestService(t)
defer srv.Chain.Close()
t.Run("good empty", func(t *testing.T) {
b := testchain.NewBlock(t, srv.Chain, 1, 0)
require.True(t, srv.verifyBlock(&neoBlock{Block: *b}))
})
t.Run("good pooled tx", func(t *testing.T) {
tx := transaction.New(netmode.UnitTestNet, []byte{byte(opcode.RET)}, 100000)
tx.ValidUntilBlock = 1
addSender(t, tx)
signTx(t, srv.Chain.FeePerByte(), tx)
require.NoError(t, srv.Chain.PoolTx(tx))
b := testchain.NewBlock(t, srv.Chain, 1, 0, tx)
require.True(t, srv.verifyBlock(&neoBlock{Block: *b}))
})
t.Run("good non-pooled tx", func(t *testing.T) {
tx := transaction.New(netmode.UnitTestNet, []byte{byte(opcode.RET)}, 100000)
tx.ValidUntilBlock = 1
addSender(t, tx)
signTx(t, srv.Chain.FeePerByte(), tx)
b := testchain.NewBlock(t, srv.Chain, 1, 0, tx)
require.True(t, srv.verifyBlock(&neoBlock{Block: *b}))
})
t.Run("good conflicting tx", func(t *testing.T) {
tx1 := transaction.New(netmode.UnitTestNet, []byte{byte(opcode.RET)}, 100000)
tx1.NetworkFee = 20_000_000 * native.GASFactor
tx1.ValidUntilBlock = 1
addSender(t, tx1)
signTx(t, srv.Chain.FeePerByte(), tx1)
tx2 := transaction.New(netmode.UnitTestNet, []byte{byte(opcode.RET)}, 100000)
tx2.NetworkFee = 20_000_000 * native.GASFactor
tx2.ValidUntilBlock = 1
addSender(t, tx2)
signTx(t, srv.Chain.FeePerByte(), tx2)
require.NoError(t, srv.Chain.PoolTx(tx1))
require.Error(t, srv.Chain.PoolTx(tx2))
b := testchain.NewBlock(t, srv.Chain, 1, 0, tx2)
require.True(t, srv.verifyBlock(&neoBlock{Block: *b}))
})
t.Run("bad old", func(t *testing.T) {
b := testchain.NewBlock(t, srv.Chain, 1, 0)
b.Index = srv.Chain.BlockHeight()
require.False(t, srv.verifyBlock(&neoBlock{Block: *b}))
})
t.Run("bad big size", func(t *testing.T) {
script := make([]byte, int(srv.Chain.GetMaxBlockSize()))
script[0] = byte(opcode.RET)
tx := transaction.New(netmode.UnitTestNet, script, 100000)
tx.ValidUntilBlock = 1
addSender(t, tx)
signTx(t, srv.Chain.FeePerByte(), tx)
b := testchain.NewBlock(t, srv.Chain, 1, 0, tx)
require.False(t, srv.verifyBlock(&neoBlock{Block: *b}))
})
t.Run("bad tx", func(t *testing.T) {
tx := transaction.New(netmode.UnitTestNet, []byte{byte(opcode.RET)}, 100000)
tx.ValidUntilBlock = 1
addSender(t, tx)
signTx(t, srv.Chain.FeePerByte(), tx)
tx.Scripts[0].InvocationScript[16] = ^tx.Scripts[0].InvocationScript[16]
b := testchain.NewBlock(t, srv.Chain, 1, 0, tx)
require.False(t, srv.verifyBlock(&neoBlock{Block: *b}))
})
t.Run("bad big sys fee", func(t *testing.T) {
txes := make([]*transaction.Transaction, 2)
for i := range txes {
txes[i] = transaction.New(netmode.UnitTestNet, []byte{byte(opcode.RET)}, srv.Chain.GetMaxBlockSystemFee()/2+1)
txes[i].ValidUntilBlock = 1
addSender(t, txes[i])
signTx(t, srv.Chain.FeePerByte(), txes[i])
}
b := testchain.NewBlock(t, srv.Chain, 1, 0, txes...)
require.False(t, srv.verifyBlock(&neoBlock{Block: *b}))
})
}
func shouldReceive(t *testing.T, ch chan Payload) {
select {
case <-ch:

View file

@ -4,6 +4,7 @@ import (
"errors"
"fmt"
"math/big"
"sort"
"sync"
"sync/atomic"
"time"
@ -112,7 +113,7 @@ type Blockchain struct {
stopCh chan struct{}
runToExitCh chan struct{}
memPool mempool.Pool
memPool *mempool.Pool
// This lock protects concurrent access to keyCache.
keyCacheLock sync.RWMutex
@ -167,7 +168,7 @@ func NewBlockchain(s storage.Store, cfg config.ProtocolConfiguration, log *zap.L
headersOpDone: make(chan struct{}),
stopCh: make(chan struct{}),
runToExitCh: make(chan struct{}),
memPool: mempool.NewMemPool(cfg.MemPoolSize),
memPool: mempool.New(cfg.MemPoolSize),
keyCache: make(map[util.Uint160]map[string]*keys.PublicKey),
sbCommittee: committee,
log: log,
@ -436,8 +437,20 @@ func (bc *Blockchain) AddBlock(block *block.Block) error {
return fmt.Errorf("block %s is invalid: %w", block.Hash().StringLE(), err)
}
if bc.config.VerifyTransactions {
var mp = mempool.New(len(block.Transactions))
for _, tx := range block.Transactions {
err := bc.VerifyTx(tx, block)
var err error
// Transactions are verified before adding them
// into the pool, so there is no point in doing
// it again even if we're verifying in-block transactions.
if bc.memPool.ContainsKey(tx.Hash()) {
err = mp.Add(tx, bc)
if err == nil {
continue
}
} else {
err = bc.verifyAndPoolTx(tx, mp)
}
if err != nil {
return fmt.Errorf("transaction %s failed to verify: %w", tx.Hash().StringLE(), err)
}
@ -601,7 +614,8 @@ func (bc *Blockchain) storeBlock(block *block.Block) error {
}
}
for _, tx := range block.Transactions {
var txHashes = make([]util.Uint256, len(block.Transactions))
for i, tx := range block.Transactions {
if err := cache.StoreAsTransaction(tx, block.Index); err != nil {
return err
}
@ -618,8 +632,8 @@ func (bc *Blockchain) storeBlock(block *block.Block) error {
if err != nil {
return fmt.Errorf("failed to persist invocation results: %w", err)
}
for i := range systemInterop.Notifications {
bc.handleNotification(&systemInterop.Notifications[i], cache, block, tx.Hash())
for j := range systemInterop.Notifications {
bc.handleNotification(&systemInterop.Notifications[j], cache, block, tx.Hash())
}
} else {
bc.log.Warn("contract invocation failed",
@ -640,7 +654,11 @@ func (bc *Blockchain) storeBlock(block *block.Block) error {
if err != nil {
return fmt.Errorf("failed to store tx exec result: %w", err)
}
txHashes[i] = tx.Hash()
}
sort.Slice(txHashes, func(i, j int) bool {
return txHashes[i].CompareTo(txHashes[j]) < 0
})
root := bc.dao.MPT.StateRoot()
var prevHash util.Uint256
@ -682,7 +700,7 @@ func (bc *Blockchain) storeBlock(block *block.Block) error {
}
bc.topBlock.Store(block)
atomic.StoreUint32(&bc.blockHeight, block.Index)
bc.memPool.RemoveStale(bc.isTxStillRelevant, bc)
bc.memPool.RemoveStale(func(tx *transaction.Transaction) bool { return bc.isTxStillRelevant(tx, txHashes) }, bc)
bc.lock.Unlock()
updateBlockHeightMetric(block.Index)
@ -1152,7 +1170,7 @@ func (bc *Blockchain) GetMaxBlockSystemFee() int64 {
// GetMemPool returns the memory pool of the blockchain.
func (bc *Blockchain) GetMemPool() *mempool.Pool {
return &bc.memPool
return bc.memPool
}
// ApplyPolicyToTxSet applies configured policies to given transaction set. It
@ -1210,8 +1228,9 @@ var (
ErrTxInvalidWitnessNum = errors.New("number of signers doesn't match witnesses")
)
// verifyTx verifies whether a transaction is bonafide or not.
func (bc *Blockchain) verifyTx(t *transaction.Transaction, block *block.Block) error {
// verifyAndPoolTx verifies whether a transaction is bonafide or not and tries
// to add it to the mempool given.
func (bc *Blockchain) verifyAndPoolTx(t *transaction.Transaction, pool *mempool.Pool) error {
height := bc.BlockHeight()
if t.ValidUntilBlock <= height || t.ValidUntilBlock > height+transaction.MaxValidUntilBlockIncrement {
return fmt.Errorf("%w: ValidUntilBlock = %d, current height = %d", ErrTxExpired, t.ValidUntilBlock, height)
@ -1221,11 +1240,6 @@ func (bc *Blockchain) verifyTx(t *transaction.Transaction, block *block.Block) e
// Only one %w can be used.
return fmt.Errorf("%w: %v", ErrPolicy, err)
}
balance := bc.GetUtilityTokenBalance(t.Sender())
need := t.SystemFee + t.NetworkFee
if balance.Cmp(big.NewInt(need)) < 0 {
return fmt.Errorf("%w: balance is %v, need: %v", ErrInsufficientFunds, balance, need)
}
size := io.GetVarSize(t)
if size > transaction.MaxTransactionSize {
return fmt.Errorf("%w: (%d > MaxTransactionSize %d)", ErrTxTooBig, size, transaction.MaxTransactionSize)
@ -1235,26 +1249,45 @@ func (bc *Blockchain) verifyTx(t *transaction.Transaction, block *block.Block) e
if netFee < 0 {
return fmt.Errorf("%w: net fee is %v, need %v", ErrTxSmallNetworkFee, t.NetworkFee, needNetworkFee)
}
if block == nil {
if ok := bc.memPool.Verify(t, bc); !ok {
if bc.dao.HasTransaction(t.Hash()) {
return fmt.Errorf("blockchain: %w", ErrAlreadyExists)
}
err := bc.verifyTxWitnesses(t, nil)
if err != nil {
return err
}
err = pool.Add(t, bc)
if err != nil {
switch {
case errors.Is(err, mempool.ErrConflict):
return ErrMemPoolConflict
case errors.Is(err, mempool.ErrDup):
return fmt.Errorf("mempool: %w", ErrAlreadyExists)
case errors.Is(err, mempool.ErrInsufficientFunds):
return ErrInsufficientFunds
case errors.Is(err, mempool.ErrOOM):
return ErrOOM
default:
return err
}
}
return bc.verifyTxWitnesses(t, block)
return nil
}
// isTxStillRelevant is a callback for mempool transaction filtering after the
// new block addition. It returns false for transactions already present in the
// chain (added by the new block), transactions using some inputs that are
// already used (double spends) and does witness reverification for non-standard
// new block addition. It returns false for transactions added by the new block
// (passed via txHashes) and does witness reverification for non-standard
// contracts. It operates under the assumption that full transaction verification
// was already done so we don't need to check basic things like size, input/output
// correctness, etc.
func (bc *Blockchain) isTxStillRelevant(t *transaction.Transaction) bool {
// correctness, presence in blocks before the new one, etc.
func (bc *Blockchain) isTxStillRelevant(t *transaction.Transaction, txHashes []util.Uint256) bool {
var recheckWitness bool
if bc.dao.HasTransaction(t.Hash()) {
index := sort.Search(len(txHashes), func(i int) bool {
return txHashes[i].CompareTo(t.Hash()) >= 0
})
if index < len(txHashes) && txHashes[index].Equals(t.Hash()) {
return false
}
for i := range t.Scripts {
@ -1346,38 +1379,31 @@ func (bc *Blockchain) verifyStateRootWitness(r *state.MPTRoot) error {
bc.contracts.Policy.GetMaxVerificationGas(interopCtx.DAO))
}
// VerifyTx verifies whether a transaction is bonafide or not. Block parameter
// is used for easy interop access and can be omitted for transactions that are
// not yet added into any block.
// Golang implementation of Verify method in C# (https://github.com/neo-project/neo/blob/master/neo/Network/P2P/Payloads/Transaction.cs#L270).
func (bc *Blockchain) VerifyTx(t *transaction.Transaction, block *block.Block) error {
// VerifyTx verifies whether transaction is bonafide or not relative to the
// current blockchain state. Note that this verification is completely isolated
// from the main node's mempool.
func (bc *Blockchain) VerifyTx(t *transaction.Transaction) error {
var mp = mempool.New(1)
bc.lock.RLock()
defer bc.lock.RUnlock()
return bc.verifyTx(t, block)
return bc.verifyAndPoolTx(t, mp)
}
// PoolTx verifies and tries to add given transaction into the mempool.
func (bc *Blockchain) PoolTx(t *transaction.Transaction) error {
// PoolTx verifies and tries to add given transaction into the mempool. If not
// given, the default mempool is used. Passing multiple pools is not supported.
func (bc *Blockchain) PoolTx(t *transaction.Transaction, pools ...*mempool.Pool) error {
var pool = bc.memPool
bc.lock.RLock()
defer bc.lock.RUnlock()
if bc.HasTransaction(t.Hash()) {
return fmt.Errorf("blockchain: %w", ErrAlreadyExists)
// Programmer error.
if len(pools) > 1 {
panic("too many pools given")
}
if err := bc.verifyTx(t, nil); err != nil {
return err
if len(pools) == 1 {
pool = pools[0]
}
if err := bc.memPool.Add(t, bc); err != nil {
switch {
case errors.Is(err, mempool.ErrOOM):
return ErrOOM
case errors.Is(err, mempool.ErrDup):
return fmt.Errorf("mempool: %w", ErrAlreadyExists)
default:
return err
}
}
return nil
return bc.verifyAndPoolTx(t, pool)
}
//GetStandByValidators returns validators from the configuration.

View file

@ -11,6 +11,7 @@ import (
"github.com/nspcc-dev/neo-go/pkg/config/netmode"
"github.com/nspcc-dev/neo-go/pkg/core/block"
"github.com/nspcc-dev/neo-go/pkg/core/interop/interopnames"
"github.com/nspcc-dev/neo-go/pkg/core/mempool"
"github.com/nspcc-dev/neo-go/pkg/core/state"
"github.com/nspcc-dev/neo-go/pkg/core/storage"
"github.com/nspcc-dev/neo-go/pkg/core/transaction"
@ -117,6 +118,43 @@ func TestAddBlock(t *testing.T) {
assert.Equal(t, lastBlock.Hash(), bc.CurrentHeaderHash())
}
func TestAddBadBlock(t *testing.T) {
bc := newTestChain(t)
defer bc.Close()
// It has ValidUntilBlock == 0, which is wrong
tx := transaction.New(netmode.UnitTestNet, []byte{byte(opcode.PUSH1)}, 0)
tx.Signers = []transaction.Signer{{
Account: testchain.MultisigScriptHash(),
Scopes: transaction.FeeOnly,
}}
require.NoError(t, signTx(bc, tx))
b1 := bc.newBlock(tx)
require.Error(t, bc.AddBlock(b1))
bc.config.VerifyTransactions = false
require.NoError(t, bc.AddBlock(b1))
b2 := bc.newBlock()
b2.PrevHash = util.Uint256{}
require.Error(t, bc.AddBlock(b2))
bc.config.VerifyBlocks = false
require.NoError(t, bc.AddBlock(b2))
tx = transaction.New(netmode.UnitTestNet, []byte{byte(opcode.PUSH1)}, 0)
tx.ValidUntilBlock = 128
tx.Signers = []transaction.Signer{{
Account: testchain.MultisigScriptHash(),
Scopes: transaction.FeeOnly,
}}
require.NoError(t, signTx(bc, tx))
require.NoError(t, bc.PoolTx(tx))
bc.config.VerifyTransactions = true
bc.config.VerifyBlocks = true
b3 := bc.newBlock(tx)
require.NoError(t, bc.AddBlock(b3))
}
func TestScriptFromWitness(t *testing.T) {
witness := &transaction.Witness{}
h := util.Uint160{1, 2, 3}
@ -234,7 +272,7 @@ func TestVerifyTx(t *testing.T) {
checkResult(t, res, stackitem.NewBool(true))
checkErr := func(t *testing.T, expectedErr error, tx *transaction.Transaction) {
err := bc.verifyTx(tx, nil)
err := bc.VerifyTx(tx)
fmt.Println(err)
require.True(t, errors.Is(err, expectedErr))
}
@ -250,7 +288,7 @@ func TestVerifyTx(t *testing.T) {
t.Run("BlockedAccount", func(t *testing.T) {
tx := bc.newTestTx(accs[1].PrivateKey().GetScriptHash(), testScript)
require.NoError(t, accs[1].SignTx(tx))
err := bc.verifyTx(tx, nil)
err := bc.VerifyTx(tx)
require.True(t, errors.Is(err, ErrPolicy))
})
t.Run("InsufficientGas", func(t *testing.T) {
@ -277,12 +315,13 @@ func TestVerifyTx(t *testing.T) {
tx := bc.newTestTx(h, testScript)
tx.NetworkFee = balance / 2
require.NoError(t, accs[0].SignTx(tx))
checkErr(t, nil, tx)
require.NoError(t, bc.PoolTx(tx))
tx2 := bc.newTestTx(h, testScript)
tx2.NetworkFee = balance / 2
require.NoError(t, bc.memPool.Add(tx2, bc))
checkErr(t, ErrMemPoolConflict, tx)
require.NoError(t, accs[0].SignTx(tx2))
err := bc.PoolTx(tx2)
require.True(t, errors.Is(err, ErrMemPoolConflict))
})
t.Run("NotEnoughWitnesses", func(t *testing.T) {
tx := bc.newTestTx(h, testScript)
@ -300,6 +339,35 @@ func TestVerifyTx(t *testing.T) {
tx.Scripts[0].InvocationScript[10] = ^tx.Scripts[0].InvocationScript[10]
checkErr(t, ErrVerificationFailed, tx)
})
t.Run("OldTX", func(t *testing.T) {
tx := bc.newTestTx(h, testScript)
require.NoError(t, accs[0].SignTx(tx))
b := bc.newBlock(tx)
require.NoError(t, bc.AddBlock(b))
err := bc.VerifyTx(tx)
require.True(t, errors.Is(err, ErrAlreadyExists))
})
t.Run("MemPooledTX", func(t *testing.T) {
tx := bc.newTestTx(h, testScript)
require.NoError(t, accs[0].SignTx(tx))
require.NoError(t, bc.PoolTx(tx))
err := bc.PoolTx(tx)
require.True(t, errors.Is(err, ErrAlreadyExists))
})
t.Run("MemPoolOOM", func(t *testing.T) {
bc.memPool = mempool.New(1)
tx1 := bc.newTestTx(h, testScript)
tx1.NetworkFee += 10000 // Give it more priority.
require.NoError(t, accs[0].SignTx(tx1))
require.NoError(t, bc.PoolTx(tx1))
tx2 := bc.newTestTx(h, testScript)
require.NoError(t, accs[0].SignTx(tx2))
err := bc.PoolTx(tx2)
require.True(t, errors.Is(err, ErrOOM))
})
}
func TestVerifyHashAgainstScript(t *testing.T) {
@ -365,6 +433,34 @@ func TestVerifyHashAgainstScript(t *testing.T) {
})
}
func TestMemPoolRemoval(t *testing.T) {
const added = 16
const notAdded = 32
bc := newTestChain(t)
defer bc.Close()
addedTxes := make([]*transaction.Transaction, added)
notAddedTxes := make([]*transaction.Transaction, notAdded)
for i := range addedTxes {
addedTxes[i] = bc.newTestTx(testchain.MultisigScriptHash(), []byte{byte(opcode.PUSH1)})
require.NoError(t, signTx(bc, addedTxes[i]))
require.NoError(t, bc.PoolTx(addedTxes[i]))
}
for i := range notAddedTxes {
notAddedTxes[i] = bc.newTestTx(testchain.MultisigScriptHash(), []byte{byte(opcode.PUSH1)})
require.NoError(t, signTx(bc, notAddedTxes[i]))
require.NoError(t, bc.PoolTx(notAddedTxes[i]))
}
b := bc.newBlock(addedTxes...)
require.NoError(t, bc.AddBlock(b))
mempool := bc.GetMemPool()
for _, tx := range addedTxes {
require.False(t, mempool.ContainsKey(tx.Hash()))
}
for _, tx := range notAddedTxes {
require.True(t, mempool.ContainsKey(tx.Hash()))
}
}
func TestHasBlock(t *testing.T) {
bc := newTestChain(t)
blocks, err := bc.genBlocks(50)

View file

@ -51,12 +51,12 @@ type Blockchainer interface {
mempool.Feer // fee interface
GetMaxBlockSize() uint32
GetMaxBlockSystemFee() int64
PoolTx(*transaction.Transaction) error
PoolTx(t *transaction.Transaction, pools ...*mempool.Pool) error
SubscribeForBlocks(ch chan<- *block.Block)
SubscribeForExecutions(ch chan<- *state.AppExecResult)
SubscribeForNotifications(ch chan<- *state.NotificationEvent)
SubscribeForTransactions(ch chan<- *transaction.Transaction)
VerifyTx(*transaction.Transaction, *block.Block) error
VerifyTx(*transaction.Transaction) error
GetMemPool() *mempool.Pool
UnsubscribeFromBlocks(ch chan<- *block.Block)
UnsubscribeFromExecutions(ch chan<- *state.AppExecResult)

View file

@ -12,6 +12,10 @@ import (
)
var (
// ErrInsufficientFunds is returned when Sender is not able to pay for
// transaction being added irrespective of the other contents of the
// pool.
ErrInsufficientFunds = errors.New("insufficient funds")
// ErrConflict is returned when transaction being added is incompatible
// with the contents of the memory pool (Sender doesn't have enough GAS
// to pay for all transactions in the pool).
@ -37,7 +41,7 @@ type items []*item
// sender's transactions which are currently in mempool
type utilityBalanceAndFees struct {
balance *big.Int
feeSum int64
feeSum *big.Int
}
// Pool stores the unconfirms transactions.
@ -112,21 +116,29 @@ func (mp *Pool) tryAddSendersFee(tx *transaction.Transaction, feer Feer, needChe
senderFee, ok := mp.fees[tx.Sender()]
if !ok {
senderFee.balance = feer.GetUtilityTokenBalance(tx.Sender())
senderFee.feeSum = big.NewInt(0)
mp.fees[tx.Sender()] = senderFee
}
if needCheck && !checkBalance(tx, senderFee) {
if needCheck && checkBalance(tx, senderFee) != nil {
return false
}
senderFee.feeSum += tx.SystemFee + tx.NetworkFee
senderFee.feeSum.Add(senderFee.feeSum, big.NewInt(tx.SystemFee+tx.NetworkFee))
mp.fees[tx.Sender()] = senderFee
return true
}
// checkBalance returns true in case when sender has enough GAS to pay for the
// checkBalance returns nil in case when sender has enough GAS to pay for the
// transaction
func checkBalance(tx *transaction.Transaction, balance utilityBalanceAndFees) bool {
needFee := balance.feeSum + tx.SystemFee + tx.NetworkFee
return balance.balance.Cmp(big.NewInt(needFee)) >= 0
func checkBalance(tx *transaction.Transaction, balance utilityBalanceAndFees) error {
txFee := big.NewInt(tx.SystemFee + tx.NetworkFee)
if balance.balance.Cmp(txFee) < 0 {
return ErrInsufficientFunds
}
needFee := txFee.Add(txFee, balance.feeSum)
if balance.balance.Cmp(needFee) < 0 {
return ErrConflict
}
return nil
}
// Add tries to add given transaction to the Pool.
@ -136,14 +148,15 @@ func (mp *Pool) Add(t *transaction.Transaction, fee Feer) error {
timeStamp: time.Now().UTC(),
}
mp.lock.Lock()
if !mp.checkTxConflicts(t, fee) {
mp.lock.Unlock()
return ErrConflict
}
if mp.containsKey(t.Hash()) {
mp.lock.Unlock()
return ErrDup
}
err := mp.checkTxConflicts(t, fee)
if err != nil {
mp.lock.Unlock()
return err
}
mp.verifiedMap[t.Hash()] = pItem
// Insert into sorted array (from max to min, that could also be done
@ -200,7 +213,7 @@ func (mp *Pool) Remove(hash util.Uint256) {
mp.verifiedTxes = mp.verifiedTxes[:num]
}
senderFee := mp.fees[it.txn.Sender()]
senderFee.feeSum -= it.txn.SystemFee + it.txn.NetworkFee
senderFee.feeSum.Sub(senderFee.feeSum, big.NewInt(it.txn.SystemFee+it.txn.NetworkFee))
mp.fees[it.txn.Sender()] = senderFee
}
updateMempoolMetrics(len(mp.verifiedTxes))
@ -247,9 +260,9 @@ func (mp *Pool) checkPolicy(tx *transaction.Transaction, policyChanged bool) boo
return false
}
// NewMemPool returns a new Pool struct.
func NewMemPool(capacity int) Pool {
return Pool{
// New returns a new Pool struct.
func New(capacity int) *Pool {
return &Pool{
verifiedMap: make(map[util.Uint256]*item),
verifiedTxes: make([]*item, 0, capacity),
capacity: capacity,
@ -283,10 +296,11 @@ func (mp *Pool) GetVerifiedTransactions() []*transaction.Transaction {
}
// checkTxConflicts is an internal unprotected version of Verify.
func (mp *Pool) checkTxConflicts(tx *transaction.Transaction, fee Feer) bool {
func (mp *Pool) checkTxConflicts(tx *transaction.Transaction, fee Feer) error {
senderFee, ok := mp.fees[tx.Sender()]
if !ok {
senderFee.balance = fee.GetUtilityTokenBalance(tx.Sender())
senderFee.feeSum = big.NewInt(0)
}
return checkBalance(tx, senderFee)
}
@ -298,5 +312,5 @@ func (mp *Pool) checkTxConflicts(tx *transaction.Transaction, fee Feer) bool {
func (mp *Pool) Verify(tx *transaction.Transaction, feer Feer) bool {
mp.lock.RLock()
defer mp.lock.RUnlock()
return mp.checkTxConflicts(tx, feer)
return mp.checkTxConflicts(tx, feer) == nil
}

View file

@ -28,7 +28,7 @@ func (fs *FeerStub) GetUtilityTokenBalance(uint160 util.Uint160) *big.Int {
}
func testMemPoolAddRemoveWithFeer(t *testing.T, fs Feer) {
mp := NewMemPool(10)
mp := New(10)
tx := transaction.New(netmode.UnitTestNet, []byte{byte(opcode.PUSH1)}, 0)
tx.Nonce = 0
tx.Signers = []transaction.Signer{{Account: util.Uint160{1, 2, 3}}}
@ -56,7 +56,7 @@ func TestMemPoolAddRemove(t *testing.T) {
func TestOverCapacity(t *testing.T) {
var fs = &FeerStub{}
const mempoolSize = 10
mp := NewMemPool(mempoolSize)
mp := New(mempoolSize)
for i := 0; i < mempoolSize; i++ {
tx := transaction.New(netmode.UnitTestNet, []byte{byte(opcode.PUSH1)}, 0)
@ -129,7 +129,7 @@ func TestOverCapacity(t *testing.T) {
func TestGetVerified(t *testing.T) {
var fs = &FeerStub{}
const mempoolSize = 10
mp := NewMemPool(mempoolSize)
mp := New(mempoolSize)
txes := make([]*transaction.Transaction, 0, mempoolSize)
for i := 0; i < mempoolSize; i++ {
@ -153,7 +153,7 @@ func TestGetVerified(t *testing.T) {
func TestRemoveStale(t *testing.T) {
var fs = &FeerStub{}
const mempoolSize = 10
mp := NewMemPool(mempoolSize)
mp := New(mempoolSize)
txes1 := make([]*transaction.Transaction, 0, mempoolSize/2)
txes2 := make([]*transaction.Transaction, 0, mempoolSize/2)
@ -186,7 +186,7 @@ func TestRemoveStale(t *testing.T) {
}
func TestMemPoolFees(t *testing.T) {
mp := NewMemPool(10)
mp := New(10)
sender0 := util.Uint160{1, 2, 3}
tx0 := transaction.New(netmode.UnitTestNet, []byte{byte(opcode.PUSH1)}, 0)
tx0.NetworkFee = balance.Int64() + 1
@ -205,7 +205,7 @@ func TestMemPoolFees(t *testing.T) {
require.Equal(t, 1, len(mp.fees))
require.Equal(t, utilityBalanceAndFees{
balance: balance,
feeSum: tx1.NetworkFee,
feeSum: big.NewInt(tx1.NetworkFee),
}, mp.fees[sender0])
// balance shouldn't change after adding one more transaction
@ -217,7 +217,7 @@ func TestMemPoolFees(t *testing.T) {
require.Equal(t, 1, len(mp.fees))
require.Equal(t, utilityBalanceAndFees{
balance: balance,
feeSum: balance.Int64(),
feeSum: balance,
}, mp.fees[sender0])
// can't add more transactions as we don't have enough GAS
@ -229,7 +229,7 @@ func TestMemPoolFees(t *testing.T) {
require.Equal(t, 1, len(mp.fees))
require.Equal(t, utilityBalanceAndFees{
balance: balance,
feeSum: balance.Int64(),
feeSum: balance,
}, mp.fees[sender0])
// check whether sender's fee updates correctly
@ -242,7 +242,7 @@ func TestMemPoolFees(t *testing.T) {
require.Equal(t, 1, len(mp.fees))
require.Equal(t, utilityBalanceAndFees{
balance: balance,
feeSum: tx2.NetworkFee,
feeSum: big.NewInt(tx2.NetworkFee),
}, mp.fees[sender0])
// there should be nothing left

View file

@ -1,6 +1,12 @@
package testchain
import (
"testing"
"time"
"github.com/nspcc-dev/neo-go/pkg/core/block"
"github.com/nspcc-dev/neo-go/pkg/core/blockchainer"
"github.com/nspcc-dev/neo-go/pkg/core/transaction"
"github.com/nspcc-dev/neo-go/pkg/crypto/hash"
"github.com/nspcc-dev/neo-go/pkg/crypto/keys"
"github.com/nspcc-dev/neo-go/pkg/encoding/address"
@ -8,6 +14,7 @@ import (
"github.com/nspcc-dev/neo-go/pkg/smartcontract"
"github.com/nspcc-dev/neo-go/pkg/util"
"github.com/nspcc-dev/neo-go/pkg/vm/emit"
"github.com/stretchr/testify/require"
)
// privNetKeys is a list of unencrypted WIFs sorted by public key.
@ -94,3 +101,32 @@ func Sign(data []byte) []byte {
}
return buf.Bytes()
}
// NewBlock creates new block for the given blockchain with the given offset
// (usually, 1), primary node index and transactions.
func NewBlock(t *testing.T, bc blockchainer.Blockchainer, offset uint32, primary uint32, txs ...*transaction.Transaction) *block.Block {
witness := transaction.Witness{VerificationScript: MultisigVerificationScript()}
height := bc.BlockHeight()
h := bc.GetHeaderHash(int(height))
hdr, err := bc.GetHeader(h)
require.NoError(t, err)
b := &block.Block{
Base: block.Base{
PrevHash: hdr.Hash(),
Timestamp: (uint64(time.Now().UTC().Unix()) + uint64(hdr.Index)) * 1000,
Index: hdr.Index + offset,
NextConsensus: witness.ScriptHash(),
Script: witness,
Network: bc.GetConfig().Magic,
},
ConsensusData: block.ConsensusData{
PrimaryIndex: primary,
Nonce: 1111,
},
Transactions: txs,
}
_ = b.RebuildMerkleRoot()
b.Script.InvocationScript = Sign(b.GetSignedPart())
return b
}

View file

@ -149,7 +149,7 @@ func (chain testChain) GetUtilityTokenBalance(uint160 util.Uint160) *big.Int {
panic("TODO")
}
func (chain testChain) PoolTx(*transaction.Transaction) error {
func (chain testChain) PoolTx(*transaction.Transaction, ...*mempool.Pool) error {
panic("TODO")
}
@ -166,7 +166,7 @@ func (chain testChain) SubscribeForTransactions(ch chan<- *transaction.Transacti
panic("TODO")
}
func (chain testChain) VerifyTx(*transaction.Transaction, *block.Block) error {
func (chain testChain) VerifyTx(*transaction.Transaction) error {
panic("TODO")
}

View file

@ -18,7 +18,6 @@ import (
"github.com/gorilla/websocket"
"github.com/nspcc-dev/neo-go/pkg/core"
"github.com/nspcc-dev/neo-go/pkg/core/block"
"github.com/nspcc-dev/neo-go/pkg/core/blockchainer"
"github.com/nspcc-dev/neo-go/pkg/core/state"
"github.com/nspcc-dev/neo-go/pkg/core/transaction"
"github.com/nspcc-dev/neo-go/pkg/encoding/address"
@ -729,7 +728,7 @@ func testRPCProtocol(t *testing.T, doRPCCall func(string, string, *testing.T) []
t.Run("submit", func(t *testing.T) {
rpc := `{"jsonrpc": "2.0", "id": 1, "method": "submitblock", "params": ["%s"]}`
t.Run("invalid signature", func(t *testing.T) {
s := newBlock(t, chain, 1, 0)
s := testchain.NewBlock(t, chain, 1, 0)
s.Script.VerificationScript[8] ^= 0xff
body := doRPCCall(fmt.Sprintf(rpc, encodeBlock(t, s)), httpSrv.URL, t)
checkErrGetResult(t, body, true)
@ -759,13 +758,13 @@ func testRPCProtocol(t *testing.T, doRPCCall func(string, string, *testing.T) []
}
t.Run("invalid height", func(t *testing.T) {
b := newBlock(t, chain, 2, 0, newTx())
b := testchain.NewBlock(t, chain, 2, 0, newTx())
body := doRPCCall(fmt.Sprintf(rpc, encodeBlock(t, b)), httpSrv.URL, t)
checkErrGetResult(t, body, true)
})
t.Run("positive", func(t *testing.T) {
b := newBlock(t, chain, 1, 0, newTx())
b := testchain.NewBlock(t, chain, 1, 0, newTx())
body := doRPCCall(fmt.Sprintf(rpc, encodeBlock(t, b)), httpSrv.URL, t)
data := checkErrGetResult(t, body, false)
var res = new(result.RelayResult)
@ -934,33 +933,6 @@ func encodeBlock(t *testing.T, b *block.Block) string {
return hex.EncodeToString(w.Bytes())
}
func newBlock(t *testing.T, bc blockchainer.Blockchainer, index uint32, primary uint32, txs ...*transaction.Transaction) *block.Block {
witness := transaction.Witness{VerificationScript: testchain.MultisigVerificationScript()}
height := bc.BlockHeight()
h := bc.GetHeaderHash(int(height))
hdr, err := bc.GetHeader(h)
require.NoError(t, err)
b := &block.Block{
Base: block.Base{
PrevHash: hdr.Hash(),
Timestamp: (uint64(time.Now().UTC().Unix()) + uint64(hdr.Index)) * 1000,
Index: hdr.Index + index,
NextConsensus: witness.ScriptHash(),
Script: witness,
Network: bc.GetConfig().Magic,
},
ConsensusData: block.ConsensusData{
PrimaryIndex: primary,
Nonce: 1111,
},
Transactions: txs,
}
_ = b.RebuildMerkleRoot()
b.Script.InvocationScript = testchain.Sign(b.GetSignedPart())
return b
}
func (tc rpcTestCase) getResultPair(e *executor) (expected interface{}, res interface{}) {
expected = tc.result(e)
resVal := reflect.New(reflect.TypeOf(expected).Elem())

View file

@ -284,7 +284,7 @@ func TestFilteredBlockSubscriptions(t *testing.T) {
if primary == 3 {
expectedCnt++
}
b := newBlock(t, chain, 1, primary)
b := testchain.NewBlock(t, chain, 1, primary)
require.NoError(t, chain.AddBlock(b))
}
@ -437,7 +437,7 @@ func testSubscriptionOverflow(t *testing.T) {
// Push a lot of new blocks, but don't read events for them.
for i := 0; i < blockCnt; i++ {
b := newBlock(t, chain, 1, 0)
b := testchain.NewBlock(t, chain, 1, 0)
require.NoError(t, chain.AddBlock(b))
}
for i := 0; i < blockCnt; i++ {