core: refactor and improve verification and pooling

Now we have VerifyTx() and PoolTx() APIs that either verify transaction in
isolation or verify it against the mempool (either the primary one or the one
given) and then add it there. There is no possibility to check against the
mempool, but not add a transaction to it, but I doubt we really need it.

It allows to remove some duplication between old PoolTx and verifyTx where
they both tried to check transaction against mempool (verifying first and then
adding it). It also saves us utility token balance check because it's done by
the mempool anyway and we no longer need to do that explicitly in verifyTx.

It makes AddBlock() and verifyBlock() transaction's checks more correct,
because previously they could miss that even though sender S has enough
balance to pay for A, B or C, he can't pay for all of them.

Caveats:
 * consensus is running concurrently to other processes, so things could
   change while verifyBlock() is iterating over transactions, this will be
   mitigated in subsequent commits

Improves TPS value for single node by at least 11%.

Fixes #667, fixes #668.
This commit is contained in:
Roman Khimov 2020-08-19 19:27:15 +03:00
parent 0d8cc437fe
commit 55b2cbb74d
6 changed files with 127 additions and 59 deletions

View file

@ -13,6 +13,7 @@ import (
"github.com/nspcc-dev/neo-go/pkg/config/netmode" "github.com/nspcc-dev/neo-go/pkg/config/netmode"
coreb "github.com/nspcc-dev/neo-go/pkg/core/block" coreb "github.com/nspcc-dev/neo-go/pkg/core/block"
"github.com/nspcc-dev/neo-go/pkg/core/blockchainer" "github.com/nspcc-dev/neo-go/pkg/core/blockchainer"
"github.com/nspcc-dev/neo-go/pkg/core/mempool"
"github.com/nspcc-dev/neo-go/pkg/core/transaction" "github.com/nspcc-dev/neo-go/pkg/core/transaction"
"github.com/nspcc-dev/neo-go/pkg/crypto/hash" "github.com/nspcc-dev/neo-go/pkg/crypto/hash"
"github.com/nspcc-dev/neo-go/pkg/crypto/keys" "github.com/nspcc-dev/neo-go/pkg/crypto/keys"
@ -358,9 +359,21 @@ func (s *service) verifyBlock(b block.Block) bool {
} }
var fee int64 var fee int64
var pool = mempool.New(len(coreb.Transactions))
var mainPool = s.Chain.GetMemPool()
for _, tx := range coreb.Transactions { for _, tx := range coreb.Transactions {
var err error
fee += tx.SystemFee fee += tx.SystemFee
if err := s.Chain.VerifyTx(tx, coreb); err != nil { if mainPool.ContainsKey(tx.Hash()) {
err = pool.Add(tx, s.Chain)
if err == nil {
continue
}
} else {
err = s.Chain.PoolTx(tx, pool)
}
if err != nil {
s.log.Warn("invalid transaction in proposed block", s.log.Warn("invalid transaction in proposed block",
zap.Stringer("hash", tx.Hash()), zap.Stringer("hash", tx.Hash()),
zap.Error(err)) zap.Error(err))

View file

@ -113,7 +113,7 @@ type Blockchain struct {
stopCh chan struct{} stopCh chan struct{}
runToExitCh chan struct{} runToExitCh chan struct{}
memPool mempool.Pool memPool *mempool.Pool
// This lock protects concurrent access to keyCache. // This lock protects concurrent access to keyCache.
keyCacheLock sync.RWMutex keyCacheLock sync.RWMutex
@ -437,14 +437,20 @@ func (bc *Blockchain) AddBlock(block *block.Block) error {
return fmt.Errorf("block %s is invalid: %w", block.Hash().StringLE(), err) return fmt.Errorf("block %s is invalid: %w", block.Hash().StringLE(), err)
} }
if bc.config.VerifyTransactions { if bc.config.VerifyTransactions {
var mp = mempool.New(len(block.Transactions))
for _, tx := range block.Transactions { for _, tx := range block.Transactions {
var err error
// Transactions are verified before adding them // Transactions are verified before adding them
// into the pool, so there is no point in doing // into the pool, so there is no point in doing
// it again even if we're verifying in-block transactions. // it again even if we're verifying in-block transactions.
if bc.memPool.ContainsKey(tx.Hash()) { if bc.memPool.ContainsKey(tx.Hash()) {
err = mp.Add(tx, bc)
if err == nil {
continue continue
} }
err := bc.VerifyTx(tx, block) } else {
err = bc.verifyAndPoolTx(tx, mp)
}
if err != nil { if err != nil {
return fmt.Errorf("transaction %s failed to verify: %w", tx.Hash().StringLE(), err) return fmt.Errorf("transaction %s failed to verify: %w", tx.Hash().StringLE(), err)
} }
@ -1164,7 +1170,7 @@ func (bc *Blockchain) GetMaxBlockSystemFee() int64 {
// GetMemPool returns the memory pool of the blockchain. // GetMemPool returns the memory pool of the blockchain.
func (bc *Blockchain) GetMemPool() *mempool.Pool { func (bc *Blockchain) GetMemPool() *mempool.Pool {
return &bc.memPool return bc.memPool
} }
// ApplyPolicyToTxSet applies configured policies to given transaction set. It // ApplyPolicyToTxSet applies configured policies to given transaction set. It
@ -1222,8 +1228,9 @@ var (
ErrTxInvalidWitnessNum = errors.New("number of signers doesn't match witnesses") ErrTxInvalidWitnessNum = errors.New("number of signers doesn't match witnesses")
) )
// verifyTx verifies whether a transaction is bonafide or not. // verifyAndPoolTx verifies whether a transaction is bonafide or not and tries
func (bc *Blockchain) verifyTx(t *transaction.Transaction, block *block.Block) error { // to add it to the mempool given.
func (bc *Blockchain) verifyAndPoolTx(t *transaction.Transaction, pool *mempool.Pool) error {
height := bc.BlockHeight() height := bc.BlockHeight()
if t.ValidUntilBlock <= height || t.ValidUntilBlock > height+transaction.MaxValidUntilBlockIncrement { if t.ValidUntilBlock <= height || t.ValidUntilBlock > height+transaction.MaxValidUntilBlockIncrement {
return fmt.Errorf("%w: ValidUntilBlock = %d, current height = %d", ErrTxExpired, t.ValidUntilBlock, height) return fmt.Errorf("%w: ValidUntilBlock = %d, current height = %d", ErrTxExpired, t.ValidUntilBlock, height)
@ -1233,11 +1240,6 @@ func (bc *Blockchain) verifyTx(t *transaction.Transaction, block *block.Block) e
// Only one %w can be used. // Only one %w can be used.
return fmt.Errorf("%w: %v", ErrPolicy, err) return fmt.Errorf("%w: %v", ErrPolicy, err)
} }
balance := bc.GetUtilityTokenBalance(t.Sender())
need := t.SystemFee + t.NetworkFee
if balance.Cmp(big.NewInt(need)) < 0 {
return fmt.Errorf("%w: balance is %v, need: %v", ErrInsufficientFunds, balance, need)
}
size := io.GetVarSize(t) size := io.GetVarSize(t)
if size > transaction.MaxTransactionSize { if size > transaction.MaxTransactionSize {
return fmt.Errorf("%w: (%d > MaxTransactionSize %d)", ErrTxTooBig, size, transaction.MaxTransactionSize) return fmt.Errorf("%w: (%d > MaxTransactionSize %d)", ErrTxTooBig, size, transaction.MaxTransactionSize)
@ -1247,13 +1249,30 @@ func (bc *Blockchain) verifyTx(t *transaction.Transaction, block *block.Block) e
if netFee < 0 { if netFee < 0 {
return fmt.Errorf("%w: net fee is %v, need %v", ErrTxSmallNetworkFee, t.NetworkFee, needNetworkFee) return fmt.Errorf("%w: net fee is %v, need %v", ErrTxSmallNetworkFee, t.NetworkFee, needNetworkFee)
} }
if block == nil { if bc.dao.HasTransaction(t.Hash()) {
if ok := bc.memPool.Verify(t, bc); !ok { return fmt.Errorf("blockchain: %w", ErrAlreadyExists)
}
err := bc.verifyTxWitnesses(t, nil)
if err != nil {
return err
}
err = pool.Add(t, bc)
if err != nil {
switch {
case errors.Is(err, mempool.ErrConflict):
return ErrMemPoolConflict return ErrMemPoolConflict
case errors.Is(err, mempool.ErrDup):
return fmt.Errorf("mempool: %w", ErrAlreadyExists)
case errors.Is(err, mempool.ErrInsufficientFunds):
return ErrInsufficientFunds
case errors.Is(err, mempool.ErrOOM):
return ErrOOM
default:
return err
} }
} }
return bc.verifyTxWitnesses(t, block) return nil
} }
// isTxStillRelevant is a callback for mempool transaction filtering after the // isTxStillRelevant is a callback for mempool transaction filtering after the
@ -1360,38 +1379,31 @@ func (bc *Blockchain) verifyStateRootWitness(r *state.MPTRoot) error {
bc.contracts.Policy.GetMaxVerificationGas(interopCtx.DAO)) bc.contracts.Policy.GetMaxVerificationGas(interopCtx.DAO))
} }
// VerifyTx verifies whether a transaction is bonafide or not. Block parameter // VerifyTx verifies whether transaction is bonafide or not relative to the
// is used for easy interop access and can be omitted for transactions that are // current blockchain state. Note that this verification is completely isolated
// not yet added into any block. // from the main node's mempool.
// Golang implementation of Verify method in C# (https://github.com/neo-project/neo/blob/master/neo/Network/P2P/Payloads/Transaction.cs#L270). func (bc *Blockchain) VerifyTx(t *transaction.Transaction) error {
func (bc *Blockchain) VerifyTx(t *transaction.Transaction, block *block.Block) error { var mp = mempool.New(1)
bc.lock.RLock() bc.lock.RLock()
defer bc.lock.RUnlock() defer bc.lock.RUnlock()
return bc.verifyTx(t, block) return bc.verifyAndPoolTx(t, mp)
} }
// PoolTx verifies and tries to add given transaction into the mempool. // PoolTx verifies and tries to add given transaction into the mempool. If not
func (bc *Blockchain) PoolTx(t *transaction.Transaction) error { // given, the default mempool is used. Passing multiple pools is not supported.
func (bc *Blockchain) PoolTx(t *transaction.Transaction, pools ...*mempool.Pool) error {
var pool = bc.memPool
bc.lock.RLock() bc.lock.RLock()
defer bc.lock.RUnlock() defer bc.lock.RUnlock()
// Programmer error.
if bc.HasTransaction(t.Hash()) { if len(pools) > 1 {
return fmt.Errorf("blockchain: %w", ErrAlreadyExists) panic("too many pools given")
} }
if err := bc.verifyTx(t, nil); err != nil { if len(pools) == 1 {
return err pool = pools[0]
} }
if err := bc.memPool.Add(t, bc); err != nil { return bc.verifyAndPoolTx(t, pool)
switch {
case errors.Is(err, mempool.ErrOOM):
return ErrOOM
case errors.Is(err, mempool.ErrDup):
return fmt.Errorf("mempool: %w", ErrAlreadyExists)
default:
return err
}
}
return nil
} }
//GetStandByValidators returns validators from the configuration. //GetStandByValidators returns validators from the configuration.

View file

@ -11,6 +11,7 @@ import (
"github.com/nspcc-dev/neo-go/pkg/config/netmode" "github.com/nspcc-dev/neo-go/pkg/config/netmode"
"github.com/nspcc-dev/neo-go/pkg/core/block" "github.com/nspcc-dev/neo-go/pkg/core/block"
"github.com/nspcc-dev/neo-go/pkg/core/interop/interopnames" "github.com/nspcc-dev/neo-go/pkg/core/interop/interopnames"
"github.com/nspcc-dev/neo-go/pkg/core/mempool"
"github.com/nspcc-dev/neo-go/pkg/core/state" "github.com/nspcc-dev/neo-go/pkg/core/state"
"github.com/nspcc-dev/neo-go/pkg/core/storage" "github.com/nspcc-dev/neo-go/pkg/core/storage"
"github.com/nspcc-dev/neo-go/pkg/core/transaction" "github.com/nspcc-dev/neo-go/pkg/core/transaction"
@ -271,7 +272,7 @@ func TestVerifyTx(t *testing.T) {
checkResult(t, res, stackitem.NewBool(true)) checkResult(t, res, stackitem.NewBool(true))
checkErr := func(t *testing.T, expectedErr error, tx *transaction.Transaction) { checkErr := func(t *testing.T, expectedErr error, tx *transaction.Transaction) {
err := bc.verifyTx(tx, nil) err := bc.VerifyTx(tx)
fmt.Println(err) fmt.Println(err)
require.True(t, errors.Is(err, expectedErr)) require.True(t, errors.Is(err, expectedErr))
} }
@ -287,7 +288,7 @@ func TestVerifyTx(t *testing.T) {
t.Run("BlockedAccount", func(t *testing.T) { t.Run("BlockedAccount", func(t *testing.T) {
tx := bc.newTestTx(accs[1].PrivateKey().GetScriptHash(), testScript) tx := bc.newTestTx(accs[1].PrivateKey().GetScriptHash(), testScript)
require.NoError(t, accs[1].SignTx(tx)) require.NoError(t, accs[1].SignTx(tx))
err := bc.verifyTx(tx, nil) err := bc.VerifyTx(tx)
require.True(t, errors.Is(err, ErrPolicy)) require.True(t, errors.Is(err, ErrPolicy))
}) })
t.Run("InsufficientGas", func(t *testing.T) { t.Run("InsufficientGas", func(t *testing.T) {
@ -314,12 +315,13 @@ func TestVerifyTx(t *testing.T) {
tx := bc.newTestTx(h, testScript) tx := bc.newTestTx(h, testScript)
tx.NetworkFee = balance / 2 tx.NetworkFee = balance / 2
require.NoError(t, accs[0].SignTx(tx)) require.NoError(t, accs[0].SignTx(tx))
checkErr(t, nil, tx) require.NoError(t, bc.PoolTx(tx))
tx2 := bc.newTestTx(h, testScript) tx2 := bc.newTestTx(h, testScript)
tx2.NetworkFee = balance / 2 tx2.NetworkFee = balance / 2
require.NoError(t, bc.memPool.Add(tx2, bc)) require.NoError(t, accs[0].SignTx(tx2))
checkErr(t, ErrMemPoolConflict, tx) err := bc.PoolTx(tx2)
require.True(t, errors.Is(err, ErrMemPoolConflict))
}) })
t.Run("NotEnoughWitnesses", func(t *testing.T) { t.Run("NotEnoughWitnesses", func(t *testing.T) {
tx := bc.newTestTx(h, testScript) tx := bc.newTestTx(h, testScript)
@ -337,6 +339,35 @@ func TestVerifyTx(t *testing.T) {
tx.Scripts[0].InvocationScript[10] = ^tx.Scripts[0].InvocationScript[10] tx.Scripts[0].InvocationScript[10] = ^tx.Scripts[0].InvocationScript[10]
checkErr(t, ErrVerificationFailed, tx) checkErr(t, ErrVerificationFailed, tx)
}) })
t.Run("OldTX", func(t *testing.T) {
tx := bc.newTestTx(h, testScript)
require.NoError(t, accs[0].SignTx(tx))
b := bc.newBlock(tx)
require.NoError(t, bc.AddBlock(b))
err := bc.VerifyTx(tx)
require.True(t, errors.Is(err, ErrAlreadyExists))
})
t.Run("MemPooledTX", func(t *testing.T) {
tx := bc.newTestTx(h, testScript)
require.NoError(t, accs[0].SignTx(tx))
require.NoError(t, bc.PoolTx(tx))
err := bc.PoolTx(tx)
require.True(t, errors.Is(err, ErrAlreadyExists))
})
t.Run("MemPoolOOM", func(t *testing.T) {
bc.memPool = mempool.New(1)
tx1 := bc.newTestTx(h, testScript)
tx1.NetworkFee += 10000 // Give it more priority.
require.NoError(t, accs[0].SignTx(tx1))
require.NoError(t, bc.PoolTx(tx1))
tx2 := bc.newTestTx(h, testScript)
require.NoError(t, accs[0].SignTx(tx2))
err := bc.PoolTx(tx2)
require.True(t, errors.Is(err, ErrOOM))
})
} }
func TestVerifyHashAgainstScript(t *testing.T) { func TestVerifyHashAgainstScript(t *testing.T) {

View file

@ -51,12 +51,12 @@ type Blockchainer interface {
mempool.Feer // fee interface mempool.Feer // fee interface
GetMaxBlockSize() uint32 GetMaxBlockSize() uint32
GetMaxBlockSystemFee() int64 GetMaxBlockSystemFee() int64
PoolTx(*transaction.Transaction) error PoolTx(t *transaction.Transaction, pools ...*mempool.Pool) error
SubscribeForBlocks(ch chan<- *block.Block) SubscribeForBlocks(ch chan<- *block.Block)
SubscribeForExecutions(ch chan<- *state.AppExecResult) SubscribeForExecutions(ch chan<- *state.AppExecResult)
SubscribeForNotifications(ch chan<- *state.NotificationEvent) SubscribeForNotifications(ch chan<- *state.NotificationEvent)
SubscribeForTransactions(ch chan<- *transaction.Transaction) SubscribeForTransactions(ch chan<- *transaction.Transaction)
VerifyTx(*transaction.Transaction, *block.Block) error VerifyTx(*transaction.Transaction) error
GetMemPool() *mempool.Pool GetMemPool() *mempool.Pool
UnsubscribeFromBlocks(ch chan<- *block.Block) UnsubscribeFromBlocks(ch chan<- *block.Block)
UnsubscribeFromExecutions(ch chan<- *state.AppExecResult) UnsubscribeFromExecutions(ch chan<- *state.AppExecResult)

View file

@ -12,6 +12,10 @@ import (
) )
var ( var (
// ErrInsufficientFunds is returned when Sender is not able to pay for
// transaction being added irrespective of the other contents of the
// pool.
ErrInsufficientFunds = errors.New("insufficient funds")
// ErrConflict is returned when transaction being added is incompatible // ErrConflict is returned when transaction being added is incompatible
// with the contents of the memory pool (Sender doesn't have enough GAS // with the contents of the memory pool (Sender doesn't have enough GAS
// to pay for all transactions in the pool). // to pay for all transactions in the pool).
@ -114,7 +118,7 @@ func (mp *Pool) tryAddSendersFee(tx *transaction.Transaction, feer Feer, needChe
senderFee.balance = feer.GetUtilityTokenBalance(tx.Sender()) senderFee.balance = feer.GetUtilityTokenBalance(tx.Sender())
mp.fees[tx.Sender()] = senderFee mp.fees[tx.Sender()] = senderFee
} }
if needCheck && !checkBalance(tx, senderFee) { if needCheck && checkBalance(tx, senderFee) != nil {
return false return false
} }
senderFee.feeSum += tx.SystemFee + tx.NetworkFee senderFee.feeSum += tx.SystemFee + tx.NetworkFee
@ -122,11 +126,18 @@ func (mp *Pool) tryAddSendersFee(tx *transaction.Transaction, feer Feer, needChe
return true return true
} }
// checkBalance returns true in case when sender has enough GAS to pay for the // checkBalance returns nil in case when sender has enough GAS to pay for the
// transaction // transaction
func checkBalance(tx *transaction.Transaction, balance utilityBalanceAndFees) bool { func checkBalance(tx *transaction.Transaction, balance utilityBalanceAndFees) error {
needFee := balance.feeSum + tx.SystemFee + tx.NetworkFee txFee := tx.SystemFee + tx.NetworkFee
return balance.balance.Cmp(big.NewInt(needFee)) >= 0 if balance.balance.Cmp(big.NewInt(txFee)) < 0 {
return ErrInsufficientFunds
}
needFee := balance.feeSum + txFee
if balance.balance.Cmp(big.NewInt(needFee)) < 0 {
return ErrConflict
}
return nil
} }
// Add tries to add given transaction to the Pool. // Add tries to add given transaction to the Pool.
@ -140,9 +151,10 @@ func (mp *Pool) Add(t *transaction.Transaction, fee Feer) error {
mp.lock.Unlock() mp.lock.Unlock()
return ErrDup return ErrDup
} }
if !mp.checkTxConflicts(t, fee) { err := mp.checkTxConflicts(t, fee)
if err != nil {
mp.lock.Unlock() mp.lock.Unlock()
return ErrConflict return err
} }
mp.verifiedMap[t.Hash()] = pItem mp.verifiedMap[t.Hash()] = pItem
@ -248,8 +260,8 @@ func (mp *Pool) checkPolicy(tx *transaction.Transaction, policyChanged bool) boo
} }
// New returns a new Pool struct. // New returns a new Pool struct.
func New(capacity int) Pool { func New(capacity int) *Pool {
return Pool{ return &Pool{
verifiedMap: make(map[util.Uint256]*item), verifiedMap: make(map[util.Uint256]*item),
verifiedTxes: make([]*item, 0, capacity), verifiedTxes: make([]*item, 0, capacity),
capacity: capacity, capacity: capacity,
@ -283,7 +295,7 @@ func (mp *Pool) GetVerifiedTransactions() []*transaction.Transaction {
} }
// checkTxConflicts is an internal unprotected version of Verify. // checkTxConflicts is an internal unprotected version of Verify.
func (mp *Pool) checkTxConflicts(tx *transaction.Transaction, fee Feer) bool { func (mp *Pool) checkTxConflicts(tx *transaction.Transaction, fee Feer) error {
senderFee, ok := mp.fees[tx.Sender()] senderFee, ok := mp.fees[tx.Sender()]
if !ok { if !ok {
senderFee.balance = fee.GetUtilityTokenBalance(tx.Sender()) senderFee.balance = fee.GetUtilityTokenBalance(tx.Sender())
@ -298,5 +310,5 @@ func (mp *Pool) checkTxConflicts(tx *transaction.Transaction, fee Feer) bool {
func (mp *Pool) Verify(tx *transaction.Transaction, feer Feer) bool { func (mp *Pool) Verify(tx *transaction.Transaction, feer Feer) bool {
mp.lock.RLock() mp.lock.RLock()
defer mp.lock.RUnlock() defer mp.lock.RUnlock()
return mp.checkTxConflicts(tx, feer) return mp.checkTxConflicts(tx, feer) == nil
} }

View file

@ -149,7 +149,7 @@ func (chain testChain) GetUtilityTokenBalance(uint160 util.Uint160) *big.Int {
panic("TODO") panic("TODO")
} }
func (chain testChain) PoolTx(*transaction.Transaction) error { func (chain testChain) PoolTx(*transaction.Transaction, ...*mempool.Pool) error {
panic("TODO") panic("TODO")
} }
@ -166,7 +166,7 @@ func (chain testChain) SubscribeForTransactions(ch chan<- *transaction.Transacti
panic("TODO") panic("TODO")
} }
func (chain testChain) VerifyTx(*transaction.Transaction, *block.Block) error { func (chain testChain) VerifyTx(*transaction.Transaction) error {
panic("TODO") panic("TODO")
} }