core/mempool: fix AddBlock and tx pooling concurrency issues

Eliminate races between tx checks and adding them to the mempool, ensure the
chain doesn't change while we're working with the new tx. Ensure only one
block addition attempt could be in progress.
This commit is contained in:
Roman Khimov 2020-02-04 18:43:21 +03:00
parent f0bb886be3
commit 70b3839fd0
7 changed files with 120 additions and 29 deletions

View file

@ -43,7 +43,7 @@ func TestService_GetVerified(t *testing.T) {
pool := srv.Chain.GetMemPool()
item := mempool.NewPoolItem(txs[3], new(feer))
require.True(t, pool.TryAdd(txs[3].Hash(), item))
require.NoError(t, pool.TryAdd(txs[3].Hash(), item))
hashes := []util.Uint256{txs[0].Hash(), txs[1].Hash(), txs[2].Hash()}
@ -69,7 +69,7 @@ func TestService_GetVerified(t *testing.T) {
t.Run("more than half of the last proposal will be reused", func(t *testing.T) {
for _, tx := range txs[:2] {
item := mempool.NewPoolItem(tx, new(feer))
require.True(t, pool.TryAdd(tx.Hash(), item))
require.NoError(t, pool.TryAdd(tx.Hash(), item))
}
txx := srv.getVerifiedTx(10)

View file

@ -7,6 +7,7 @@ import (
"math/big"
"sort"
"strconv"
"sync"
"sync/atomic"
"time"
@ -41,6 +42,15 @@ const (
defaultMemPoolSize = 50000
)
var (
// ErrAlreadyExists is returned when trying to add some already existing
// transaction into the pool (not specifying whether it exists in the
// chain or mempool).
ErrAlreadyExists = errors.New("already exists")
// ErrOOM is returned when adding transaction to the memory pool because
// it reached its full capacity.
ErrOOM = errors.New("no space left in the memory pool")
)
var (
genAmount = []int{8, 7, 6, 5, 4, 3, 2, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1}
decrementInterval = 2000000
@ -51,6 +61,19 @@ var (
type Blockchain struct {
config config.ProtocolConfiguration
// The only way chain state changes is by adding blocks, so we can't
// allow concurrent block additions. It differs from the next lock in
// that it's only for AddBlock method itself, the chain state is
// protected by the lock below, but holding it during all of AddBlock
// is too expensive (because the state only changes when persisting
// change cache).
addLock sync.Mutex
// This lock ensures blockchain immutability for operations that need
// that while performing their tasks. It's mostly used as a read lock
// with the only writer being the block addition logic.
lock sync.RWMutex
// Data access object for CRUD operations around storage.
dao *dao
@ -251,6 +274,9 @@ func (bc *Blockchain) Close() {
// AddBlock accepts successive block for the Blockchain, verifies it and
// stores internally. Eventually it will be persisted to the backing storage.
func (bc *Blockchain) AddBlock(block *block.Block) error {
bc.addLock.Lock()
defer bc.addLock.Unlock()
expectedHeight := bc.BlockHeight() + 1
if expectedHeight != block.Index {
return fmt.Errorf("expected block %d, but passed block %d", expectedHeight, block.Index)
@ -575,6 +601,9 @@ func (bc *Blockchain) storeBlock(block *block.Block) error {
}
}
}
bc.lock.Lock()
defer bc.lock.Unlock()
_, err := cache.Persist()
if err != nil {
return err
@ -982,11 +1011,8 @@ func (bc *Blockchain) VerifyBlock(block *block.Block) error {
return bc.verifyBlockWitnesses(block, prevHeader)
}
// VerifyTx verifies whether a transaction is bonafide or not. Block parameter
// is used for easy interop access and can be omitted for transactions that are
// not yet added into any block.
// Golang implementation of Verify method in C# (https://github.com/neo-project/neo/blob/master/neo/Network/P2P/Payloads/Transaction.cs#L270).
func (bc *Blockchain) VerifyTx(t *transaction.Transaction, block *block.Block) error {
// verifyTx verifies whether a transaction is bonafide or not.
func (bc *Blockchain) verifyTx(t *transaction.Transaction, block *block.Block) error {
if io.GetVarSize(t) > transaction.MaxTransactionSize {
return errors.Errorf("invalid transaction size = %d. It shoud be less then MaxTransactionSize = %d", io.GetVarSize(t), transaction.MaxTransactionSize)
}
@ -1017,6 +1043,40 @@ func (bc *Blockchain) VerifyTx(t *transaction.Transaction, block *block.Block) e
return bc.verifyTxWitnesses(t, block)
}
// VerifyTx verifies whether a transaction is bonafide or not. Block parameter
// is used for easy interop access and can be omitted for transactions that are
// not yet added into any block.
// Golang implementation of Verify method in C# (https://github.com/neo-project/neo/blob/master/neo/Network/P2P/Payloads/Transaction.cs#L270).
func (bc *Blockchain) VerifyTx(t *transaction.Transaction, block *block.Block) error {
bc.lock.RLock()
defer bc.lock.RUnlock()
return bc.verifyTx(t, block)
}
// PoolTx verifies and tries to add given transaction into the mempool.
func (bc *Blockchain) PoolTx(t *transaction.Transaction) error {
bc.lock.RLock()
defer bc.lock.RUnlock()
if bc.HasTransaction(t.Hash()) {
return ErrAlreadyExists
}
if err := bc.verifyTx(t, nil); err != nil {
return err
}
if err := bc.memPool.TryAdd(t.Hash(), mempool.NewPoolItem(t, bc)); err != nil {
switch err {
case mempool.ErrOOM:
return ErrOOM
case mempool.ErrConflict:
return ErrAlreadyExists
default:
return err
}
}
return nil
}
func (bc *Blockchain) verifyInputs(t *transaction.Transaction) bool {
for i := 1; i < len(t.Inputs); i++ {
for j := 0; j < i; j++ {

View file

@ -40,6 +40,7 @@ type Blockchainer interface {
GetUnspentCoinState(util.Uint256) *UnspentCoinState
References(t *transaction.Transaction) map[transaction.Input]*transaction.Output
mempool.Feer // fee interface
PoolTx(*transaction.Transaction) error
VerifyTx(*transaction.Transaction, *block.Block) error
GetMemPool() *mempool.Pool
}

View file

@ -1,6 +1,7 @@
package mempool
import (
"errors"
"sort"
"sync"
"time"
@ -9,6 +10,19 @@ import (
"github.com/CityOfZion/neo-go/pkg/util"
)
var (
// ErrConflict is returned when transaction being added is incompatible
// with the contents of the memory pool (using the same inputs as some
// other transaction in the pool)
ErrConflict = errors.New("conflicts with the memory pool")
// ErrDup is returned when transaction being added is already present
// in the memory pool.
ErrDup = errors.New("already in the memory pool")
// ErrOOM is returned when transaction just doesn't fit in the memory
// pool because of its capacity constraints.
ErrOOM = errors.New("out of memory")
)
// Item represents a transaction in the the Memory pool.
type Item struct {
txn *transaction.Transaction
@ -101,13 +115,17 @@ func (mp *Pool) ContainsKey(hash util.Uint256) bool {
}
// TryAdd try to add the Item to the Pool.
func (mp *Pool) TryAdd(hash util.Uint256, pItem *Item) bool {
func (mp *Pool) TryAdd(hash util.Uint256, pItem *Item) error {
var pool Items
mp.lock.Lock()
if !mp.verifyInputs(pItem.txn) {
mp.lock.Unlock()
return ErrConflict
}
if _, ok := mp.unsortedTxn[hash]; ok {
mp.lock.Unlock()
return false
return ErrDup
}
mp.unsortedTxn[hash] = pItem
mp.lock.Unlock()
@ -130,7 +148,10 @@ func (mp *Pool) TryAdd(hash util.Uint256, pItem *Item) bool {
_, ok := mp.unsortedTxn[hash]
updateMempoolMetrics(len(mp.unsortedTxn), len(mp.unverifiedTxn))
mp.lock.RUnlock()
return ok
if !ok {
return ErrOOM
}
return nil
}
// Remove removes an item from the mempool, if it exists there (and does
@ -283,12 +304,8 @@ func (mp *Pool) GetVerifiedTransactions() []*transaction.Transaction {
return t
}
// Verify verifies if the inputs of a transaction tx are already used in any other transaction in the memory pool.
// If yes, the transaction tx is not a valid transaction and the function return false.
// If no, the transaction tx is a valid transaction and the function return true.
func (mp *Pool) Verify(tx *transaction.Transaction) bool {
mp.lock.RLock()
defer mp.lock.RUnlock()
// verifyInputs is an internal unprotected version of Verify.
func (mp *Pool) verifyInputs(tx *transaction.Transaction) bool {
for _, item := range mp.unsortedTxn {
for i := range item.txn.Inputs {
for j := 0; j < len(tx.Inputs); j++ {
@ -301,3 +318,12 @@ func (mp *Pool) Verify(tx *transaction.Transaction) bool {
return true
}
// Verify verifies if the inputs of a transaction tx are already used in any other transaction in the memory pool.
// If yes, the transaction tx is not a valid transaction and the function return false.
// If no, the transaction tx is a valid transaction and the function return true.
func (mp *Pool) Verify(tx *transaction.Transaction) bool {
mp.lock.RLock()
defer mp.lock.RUnlock()
return mp.verifyInputs(tx)
}

View file

@ -39,9 +39,9 @@ func testMemPoolAddRemoveWithFeer(t *testing.T, fs Feer) {
item := NewPoolItem(tx, fs)
_, ok := mp.TryGetValue(tx.Hash())
require.Equal(t, false, ok)
require.Equal(t, true, mp.TryAdd(tx.Hash(), item))
require.NoError(t, mp.TryAdd(tx.Hash(), item))
// Re-adding should fail.
require.Equal(t, false, mp.TryAdd(tx.Hash(), item))
require.Error(t, mp.TryAdd(tx.Hash(), item))
tx2, ok := mp.TryGetValue(tx.Hash())
require.Equal(t, true, ok)
require.Equal(t, tx, tx2)
@ -71,14 +71,14 @@ func TestMemPoolVerify(t *testing.T) {
tx.Inputs = append(tx.Inputs, transaction.Input{PrevHash: inhash1, PrevIndex: 0})
require.Equal(t, true, mp.Verify(tx))
item := NewPoolItem(tx, &FeerStub{})
require.Equal(t, true, mp.TryAdd(tx.Hash(), item))
require.NoError(t, mp.TryAdd(tx.Hash(), item))
tx2 := newMinerTX()
inhash2 := random.Uint256()
tx2.Inputs = append(tx2.Inputs, transaction.Input{PrevHash: inhash2, PrevIndex: 0})
require.Equal(t, true, mp.Verify(tx2))
item = NewPoolItem(tx2, &FeerStub{})
require.Equal(t, true, mp.TryAdd(tx2.Hash(), item))
require.NoError(t, mp.TryAdd(tx2.Hash(), item))
tx3 := newMinerTX()
// Different index number, but the same PrevHash as in tx1.

View file

@ -126,6 +126,10 @@ func (chain testChain) IsLowPriority(*transaction.Transaction) bool {
panic("TODO")
}
func (chain testChain) PoolTx(*transaction.Transaction) error {
panic("TODO")
}
func (chain testChain) VerifyTx(*transaction.Transaction, *block.Block) error {
panic("TODO")
}

View file

@ -13,7 +13,6 @@ import (
"github.com/CityOfZion/neo-go/pkg/consensus"
"github.com/CityOfZion/neo-go/pkg/core"
"github.com/CityOfZion/neo-go/pkg/core/block"
"github.com/CityOfZion/neo-go/pkg/core/mempool"
"github.com/CityOfZion/neo-go/pkg/core/transaction"
"github.com/CityOfZion/neo-go/pkg/network/payload"
"github.com/CityOfZion/neo-go/pkg/util"
@ -743,17 +742,18 @@ func (s *Server) verifyAndPoolTX(t *transaction.Transaction) RelayReason {
if t.Type == transaction.MinerType {
return RelayInvalid
}
if s.chain.HasTransaction(t.Hash()) {
return RelayAlreadyExists
}
if err := s.chain.VerifyTx(t, nil); err != nil {
return RelayInvalid
}
// TODO: Implement Plugin.CheckPolicy?
//if (!Plugin.CheckPolicy(transaction))
// return RelayResultReason.PolicyFail;
if ok := s.chain.GetMemPool().TryAdd(t.Hash(), mempool.NewPoolItem(t, s.chain)); !ok {
return RelayOutOfMemory
if err := s.chain.PoolTx(t); err != nil {
switch err {
case core.ErrAlreadyExists:
return RelayAlreadyExists
case core.ErrOOM:
return RelayOutOfMemory
default:
return RelayInvalid
}
}
return RelaySucceed
}