Merge pull request #647 from nspcc-dev/fix-mempool-and-chain-locking

Fix mempool and chain locking

This allows us to easily make 1000 Tx/s in a 4-node privnet, fixes potential
double spends and improves mempool testing coverage.
Commit ab14a4619d by Roman Khimov, 2020-02-06 18:43:29 +03:00, committed by GitHub.
11 changed files with 431 additions and 243 deletions


@ -335,7 +335,11 @@ func (s *service) processBlock(b block.Block) {
bb.Script = *(s.getBlockWitness(bb)) bb.Script = *(s.getBlockWitness(bb))
if err := s.Chain.AddBlock(bb); err != nil { if err := s.Chain.AddBlock(bb); err != nil {
s.log.Warn("error on add block", zap.Error(err)) // The block might already be added via the regular network
// interaction.
if _, errget := s.Chain.GetBlock(bb.Hash()); errget != nil {
s.log.Warn("error on add block", zap.Error(err))
}
} else { } else {
s.Config.RelayBlock(bb) s.Config.RelayBlock(bb)
} }


@ -5,7 +5,6 @@ import (
"github.com/CityOfZion/neo-go/config" "github.com/CityOfZion/neo-go/config"
"github.com/CityOfZion/neo-go/pkg/core" "github.com/CityOfZion/neo-go/pkg/core"
"github.com/CityOfZion/neo-go/pkg/core/mempool"
"github.com/CityOfZion/neo-go/pkg/core/storage" "github.com/CityOfZion/neo-go/pkg/core/storage"
"github.com/CityOfZion/neo-go/pkg/core/transaction" "github.com/CityOfZion/neo-go/pkg/core/transaction"
"github.com/CityOfZion/neo-go/pkg/crypto/keys" "github.com/CityOfZion/neo-go/pkg/crypto/keys"
@ -22,8 +21,7 @@ func TestNewService(t *testing.T) {
Type: transaction.MinerType, Type: transaction.MinerType,
Data: &transaction.MinerTX{Nonce: 12345}, Data: &transaction.MinerTX{Nonce: 12345},
} }
item := mempool.NewPoolItem(tx, new(feer)) require.NoError(t, srv.Chain.PoolTx(tx))
srv.Chain.GetMemPool().TryAdd(tx.Hash(), item)
var txx []block.Transaction var txx []block.Transaction
require.NotPanics(t, func() { txx = srv.getVerifiedTx(1) }) require.NotPanics(t, func() { txx = srv.getVerifiedTx(1) })
@ -40,10 +38,7 @@ func TestService_GetVerified(t *testing.T) {
newMinerTx(3), newMinerTx(3),
newMinerTx(4), newMinerTx(4),
} }
pool := srv.Chain.GetMemPool() require.NoError(t, srv.Chain.PoolTx(txs[3]))
item := mempool.NewPoolItem(txs[3], new(feer))
require.True(t, pool.TryAdd(txs[3].Hash(), item))
hashes := []util.Uint256{txs[0].Hash(), txs[1].Hash(), txs[2].Hash()} hashes := []util.Uint256{txs[0].Hash(), txs[1].Hash(), txs[2].Hash()}
@ -68,8 +63,7 @@ func TestService_GetVerified(t *testing.T) {
t.Run("more than half of the last proposal will be reused", func(t *testing.T) { t.Run("more than half of the last proposal will be reused", func(t *testing.T) {
for _, tx := range txs[:2] { for _, tx := range txs[:2] {
item := mempool.NewPoolItem(tx, new(feer)) require.NoError(t, srv.Chain.PoolTx(tx))
require.True(t, pool.TryAdd(tx.Hash(), item))
} }
txx := srv.getVerifiedTx(10) txx := srv.getVerifiedTx(10)
@ -119,8 +113,7 @@ func TestService_getTx(t *testing.T) {
require.Equal(t, nil, srv.getTx(h)) require.Equal(t, nil, srv.getTx(h))
item := mempool.NewPoolItem(tx, new(feer)) require.NoError(t, srv.Chain.PoolTx(tx))
srv.Chain.GetMemPool().TryAdd(h, item)
got := srv.getTx(h) got := srv.getTx(h)
require.NotNil(t, got) require.NotNil(t, got)


@ -7,6 +7,7 @@ import (
"math/big" "math/big"
"sort" "sort"
"strconv" "strconv"
"sync"
"sync/atomic" "sync/atomic"
"time" "time"
@ -41,6 +42,15 @@ const (
defaultMemPoolSize = 50000 defaultMemPoolSize = 50000
) )
var (
// ErrAlreadyExists is returned when trying to add some already existing
// transaction into the pool (not specifying whether it exists in the
// chain or mempool).
ErrAlreadyExists = errors.New("already exists")
// ErrOOM is returned when adding transaction to the memory pool because
// it reached its full capacity.
ErrOOM = errors.New("no space left in the memory pool")
)
var ( var (
genAmount = []int{8, 7, 6, 5, 4, 3, 2, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1} genAmount = []int{8, 7, 6, 5, 4, 3, 2, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1}
decrementInterval = 2000000 decrementInterval = 2000000
@ -51,6 +61,19 @@ var (
type Blockchain struct { type Blockchain struct {
config config.ProtocolConfiguration config config.ProtocolConfiguration
// The only way chain state changes is by adding blocks, so we can't
// allow concurrent block additions. It differs from the next lock in
// that it's only for AddBlock method itself, the chain state is
// protected by the lock below, but holding it during all of AddBlock
// is too expensive (because the state only changes when persisting
// change cache).
addLock sync.Mutex
// This lock ensures blockchain immutability for operations that need
// that while performing their tasks. It's mostly used as a read lock
// with the only writer being the block addition logic.
lock sync.RWMutex
// Data access object for CRUD operations around storage. // Data access object for CRUD operations around storage.
dao *dao dao *dao
@ -251,6 +274,9 @@ func (bc *Blockchain) Close() {
// AddBlock accepts successive block for the Blockchain, verifies it and // AddBlock accepts successive block for the Blockchain, verifies it and
// stores internally. Eventually it will be persisted to the backing storage. // stores internally. Eventually it will be persisted to the backing storage.
func (bc *Blockchain) AddBlock(block *block.Block) error { func (bc *Blockchain) AddBlock(block *block.Block) error {
bc.addLock.Lock()
defer bc.addLock.Unlock()
expectedHeight := bc.BlockHeight() + 1 expectedHeight := bc.BlockHeight() + 1
if expectedHeight != block.Index { if expectedHeight != block.Index {
return fmt.Errorf("expected block %d, but passed block %d", expectedHeight, block.Index) return fmt.Errorf("expected block %d, but passed block %d", expectedHeight, block.Index)
@ -575,6 +601,9 @@ func (bc *Blockchain) storeBlock(block *block.Block) error {
} }
} }
} }
bc.lock.Lock()
defer bc.lock.Unlock()
_, err := cache.Persist() _, err := cache.Persist()
if err != nil { if err != nil {
return err return err
@ -582,9 +611,7 @@ func (bc *Blockchain) storeBlock(block *block.Block) error {
bc.topBlock.Store(block) bc.topBlock.Store(block)
atomic.StoreUint32(&bc.blockHeight, block.Index) atomic.StoreUint32(&bc.blockHeight, block.Index)
updateBlockHeightMetric(block.Index) updateBlockHeightMetric(block.Index)
for _, tx := range block.Transactions { bc.memPool.RemoveStale(bc.isTxStillRelevant)
bc.memPool.Remove(tx.Hash())
}
return nil return nil
} }
@ -963,8 +990,8 @@ func (bc *Blockchain) IsLowPriority(t *transaction.Transaction) bool {
} }
// GetMemPool returns the memory pool of the blockchain. // GetMemPool returns the memory pool of the blockchain.
func (bc *Blockchain) GetMemPool() mempool.Pool { func (bc *Blockchain) GetMemPool() *mempool.Pool {
return bc.memPool return &bc.memPool
} }
// VerifyBlock verifies block against its current state. // VerifyBlock verifies block against its current state.
@ -982,11 +1009,8 @@ func (bc *Blockchain) VerifyBlock(block *block.Block) error {
return bc.verifyBlockWitnesses(block, prevHeader) return bc.verifyBlockWitnesses(block, prevHeader)
} }
// VerifyTx verifies whether a transaction is bonafide or not. Block parameter // verifyTx verifies whether a transaction is bonafide or not.
// is used for easy interop access and can be omitted for transactions that are func (bc *Blockchain) verifyTx(t *transaction.Transaction, block *block.Block) error {
// not yet added into any block.
// Golang implementation of Verify method in C# (https://github.com/neo-project/neo/blob/master/neo/Network/P2P/Payloads/Transaction.cs#L270).
func (bc *Blockchain) VerifyTx(t *transaction.Transaction, block *block.Block) error {
if io.GetVarSize(t) > transaction.MaxTransactionSize { if io.GetVarSize(t) > transaction.MaxTransactionSize {
return errors.Errorf("invalid transaction size = %d. It shoud be less then MaxTransactionSize = %d", io.GetVarSize(t), transaction.MaxTransactionSize) return errors.Errorf("invalid transaction size = %d. It shoud be less then MaxTransactionSize = %d", io.GetVarSize(t), transaction.MaxTransactionSize)
} }
@ -1017,6 +1041,69 @@ func (bc *Blockchain) VerifyTx(t *transaction.Transaction, block *block.Block) e
return bc.verifyTxWitnesses(t, block) return bc.verifyTxWitnesses(t, block)
} }
// isTxStillRelevant is a callback for mempool transaction filtering after the
// new block addition. It returns false for transactions already present in the
// chain (added by the new block), transactions using some inputs that are
// already used (double spends) and does witness reverification for non-standard
// contracts. It operates under the assumption that full transaction verification
// was already done so we don't need to check basic things like size, input/output
// correctness, etc.
func (bc *Blockchain) isTxStillRelevant(t *transaction.Transaction) bool {
var recheckWitness bool
if bc.dao.HasTransaction(t.Hash()) {
return false
}
if bc.dao.IsDoubleSpend(t) {
return false
}
for i := range t.Scripts {
if !vm.IsStandardContract(t.Scripts[i].VerificationScript) {
recheckWitness = true
break
}
}
if recheckWitness {
return bc.verifyTxWitnesses(t, nil) == nil
}
return true
}
// VerifyTx verifies whether a transaction is bonafide or not. Block parameter
// is used for easy interop access and can be omitted for transactions that are
// not yet added into any block.
// Golang implementation of Verify method in C# (https://github.com/neo-project/neo/blob/master/neo/Network/P2P/Payloads/Transaction.cs#L270).
func (bc *Blockchain) VerifyTx(t *transaction.Transaction, block *block.Block) error {
bc.lock.RLock()
defer bc.lock.RUnlock()
return bc.verifyTx(t, block)
}
// PoolTx verifies and tries to add given transaction into the mempool.
func (bc *Blockchain) PoolTx(t *transaction.Transaction) error {
bc.lock.RLock()
defer bc.lock.RUnlock()
if bc.HasTransaction(t.Hash()) {
return ErrAlreadyExists
}
if err := bc.verifyTx(t, nil); err != nil {
return err
}
if err := bc.memPool.Add(t, bc); err != nil {
switch err {
case mempool.ErrOOM:
return ErrOOM
case mempool.ErrConflict:
return ErrAlreadyExists
default:
return err
}
}
return nil
}
func (bc *Blockchain) verifyInputs(t *transaction.Transaction) bool { func (bc *Blockchain) verifyInputs(t *transaction.Transaction) bool {
for i := 1; i < len(t.Inputs); i++ { for i := 1; i < len(t.Inputs); i++ {
for j := 0; j < i; j++ { for j := 0; j < i; j++ {
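To make the new locking scheme easier to follow, here is a minimal self-contained sketch (simplified types and names, not the actual neo-go code) of how the two mutexes above are meant to interact: addLock serializes block additions for their whole duration, while the chain-state lock is held exclusively only for the short persist step, so readers such as PoolTx and VerifyTx can take it as a read lock during verification.

package main

import (
	"fmt"
	"sync"
)

// Chain is a toy stand-in for core.Blockchain illustrating the two-lock scheme.
type Chain struct {
	addLock sync.Mutex   // serializes AddBlock callers (writers)
	lock    sync.RWMutex // protects chain state; held exclusively only while persisting
	height  uint32
}

// AddBlock does the long verification work without blocking readers and takes
// the state lock only for the short persist step.
func (c *Chain) AddBlock(index uint32) error {
	c.addLock.Lock()
	defer c.addLock.Unlock()

	// Long part: verify the block against the current state. Readers keep
	// working concurrently because they only need the read lock.
	if index != c.height+1 {
		return fmt.Errorf("expected block %d, got %d", c.height+1, index)
	}

	// Short part: persist the change cache and bump the height exclusively.
	c.lock.Lock()
	c.height = index
	c.lock.Unlock()
	return nil
}

// PoolTx-style reader: the chain state can't change while it holds the read lock.
func (c *Chain) PoolTx(tx string) error {
	c.lock.RLock()
	defer c.lock.RUnlock()
	// Verify tx against the current state, then add it to the mempool.
	return nil
}

func main() {
	c := &Chain{}
	fmt.Println(c.AddBlock(1), c.PoolTx("tx"), c.height)
}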


@ -40,6 +40,7 @@ type Blockchainer interface {
GetUnspentCoinState(util.Uint256) *UnspentCoinState GetUnspentCoinState(util.Uint256) *UnspentCoinState
References(t *transaction.Transaction) map[transaction.Input]*transaction.Output References(t *transaction.Transaction) map[transaction.Input]*transaction.Output
mempool.Feer // fee interface mempool.Feer // fee interface
PoolTx(*transaction.Transaction) error
VerifyTx(*transaction.Transaction, *block.Block) error VerifyTx(*transaction.Transaction, *block.Block) error
GetMemPool() mempool.Pool GetMemPool() *mempool.Pool
} }


@ -1,6 +1,7 @@
package mempool package mempool
import ( import (
"errors"
"sort" "sort"
"sync" "sync"
"time" "time"
@ -9,43 +10,62 @@ import (
"github.com/CityOfZion/neo-go/pkg/util" "github.com/CityOfZion/neo-go/pkg/util"
) )
// Item represents a transaction in the the Memory pool. var (
type Item struct { // ErrConflict is returned when transaction being added is incompatible
txn *transaction.Transaction // with the contents of the memory pool (using the same inputs as some
timeStamp time.Time // other transaction in the pool)
fee Feer ErrConflict = errors.New("conflicts with the memory pool")
// ErrDup is returned when transaction being added is already present
// in the memory pool.
ErrDup = errors.New("already in the memory pool")
// ErrOOM is returned when transaction just doesn't fit in the memory
// pool because of its capacity constraints.
ErrOOM = errors.New("out of memory")
)
// item represents a transaction in the the Memory pool.
type item struct {
txn *transaction.Transaction
timeStamp time.Time
perByteFee util.Fixed8
netFee util.Fixed8
isLowPrio bool
} }
// Items is a slice of Item. // items is a slice of item.
type Items []*Item type items []*item
// Pool stores the unconfirms transactions. // Pool stores the unconfirms transactions.
type Pool struct { type Pool struct {
lock *sync.RWMutex lock sync.RWMutex
unsortedTxn map[util.Uint256]*Item verifiedMap map[util.Uint256]*item
unverifiedTxn map[util.Uint256]*Item verifiedTxes items
sortedHighPrioTxn Items
sortedLowPrioTxn Items
unverifiedSortedHighPrioTxn Items
unverifiedSortedLowPrioTxn Items
capacity int capacity int
} }
func (p Items) Len() int { return len(p) } func (p items) Len() int { return len(p) }
func (p Items) Swap(i, j int) { p[i], p[j] = p[j], p[i] } func (p items) Swap(i, j int) { p[i], p[j] = p[j], p[i] }
func (p Items) Less(i, j int) bool { return p[i].CompareTo(p[j]) < 0 } func (p items) Less(i, j int) bool { return p[i].CompareTo(p[j]) < 0 }
// CompareTo returns the difference between two Items. // CompareTo returns the difference between two items.
// difference < 0 implies p < otherP. // difference < 0 implies p < otherP.
// difference = 0 implies p = otherP. // difference = 0 implies p = otherP.
// difference > 0 implies p > otherP. // difference > 0 implies p > otherP.
func (p Item) CompareTo(otherP *Item) int { func (p *item) CompareTo(otherP *item) int {
if otherP == nil { if otherP == nil {
return 1 return 1
} }
if p.fee.IsLowPriority(p.txn) && p.fee.IsLowPriority(otherP.txn) { if !p.isLowPrio && otherP.isLowPrio {
return 1
}
if p.isLowPrio && !otherP.isLowPrio {
return -1
}
if p.isLowPrio && otherP.isLowPrio {
thisIsClaimTx := p.txn.Type == transaction.ClaimType thisIsClaimTx := p.txn.Type == transaction.ClaimType
otherIsClaimTx := otherP.txn.Type == transaction.ClaimType otherIsClaimTx := otherP.txn.Type == transaction.ClaimType
@ -60,15 +80,11 @@ func (p Item) CompareTo(otherP *Item) int {
} }
// Fees sorted ascending. // Fees sorted ascending.
pFPB := p.fee.FeePerByte(p.txn) if ret := p.perByteFee.CompareTo(otherP.perByteFee); ret != 0 {
otherFPB := p.fee.FeePerByte(otherP.txn)
if ret := pFPB.CompareTo(otherFPB); ret != 0 {
return ret return ret
} }
pNF := p.fee.NetworkFee(p.txn) if ret := p.netFee.CompareTo(otherP.netFee); ret != 0 {
otherNF := p.fee.NetworkFee(otherP.txn)
if ret := pNF.CompareTo(otherNF); ret != 0 {
return ret return ret
} }
@ -77,205 +93,158 @@ func (p Item) CompareTo(otherP *Item) int {
} }
// Count returns the total number of uncofirm transactions. // Count returns the total number of uncofirm transactions.
func (mp Pool) Count() int { func (mp *Pool) Count() int {
mp.lock.RLock() mp.lock.RLock()
defer mp.lock.RUnlock() defer mp.lock.RUnlock()
return mp.count()
}
return len(mp.unsortedTxn) + len(mp.unverifiedTxn) // count is an internal unlocked version of Count.
func (mp *Pool) count() int {
return len(mp.verifiedTxes)
} }
// ContainsKey checks if a transactions hash is in the Pool. // ContainsKey checks if a transactions hash is in the Pool.
func (mp Pool) ContainsKey(hash util.Uint256) bool { func (mp *Pool) ContainsKey(hash util.Uint256) bool {
mp.lock.RLock() mp.lock.RLock()
defer mp.lock.RUnlock() defer mp.lock.RUnlock()
if _, ok := mp.unsortedTxn[hash]; ok { return mp.containsKey(hash)
return true }
}
if _, ok := mp.unverifiedTxn[hash]; ok { // containsKey is an internal unlocked version of ContainsKey.
func (mp *Pool) containsKey(hash util.Uint256) bool {
if _, ok := mp.verifiedMap[hash]; ok {
return true return true
} }
return false return false
} }
// TryAdd try to add the Item to the Pool. // Add tries to add given transaction to the Pool.
func (mp Pool) TryAdd(hash util.Uint256, pItem *Item) bool { func (mp *Pool) Add(t *transaction.Transaction, fee Feer) error {
var pool Items var pItem = &item{
txn: t,
timeStamp: time.Now().UTC(),
perByteFee: fee.FeePerByte(t),
netFee: fee.NetworkFee(t),
isLowPrio: fee.IsLowPriority(t),
}
mp.lock.Lock() mp.lock.Lock()
if _, ok := mp.unsortedTxn[hash]; ok { if !mp.verifyInputs(t) {
mp.lock.Unlock() mp.lock.Unlock()
return false return ErrConflict
}
if mp.containsKey(t.Hash()) {
mp.lock.Unlock()
return ErrDup
} }
mp.unsortedTxn[hash] = pItem
mp.lock.Unlock()
if pItem.fee.IsLowPriority(pItem.txn) { mp.verifiedMap[t.Hash()] = pItem
pool = mp.sortedLowPrioTxn // Insert into sorted array (from max to min, that could also be done
// using sort.Sort(sort.Reverse()), but it incurs more overhead. Notice
// also that we're searching for position that is strictly more
// prioritized than our new item because we do expect a lot of
// transactions with the same priority and appending to the end of the
// slice is always more efficient.
n := sort.Search(len(mp.verifiedTxes), func(n int) bool {
return pItem.CompareTo(mp.verifiedTxes[n]) > 0
})
// We've reached our capacity already.
if len(mp.verifiedTxes) == mp.capacity {
// Less prioritized than the least prioritized we already have, won't fit.
if n == len(mp.verifiedTxes) {
mp.lock.Unlock()
return ErrOOM
}
// Ditch the last one.
unlucky := mp.verifiedTxes[len(mp.verifiedTxes)-1]
delete(mp.verifiedMap, unlucky.txn.Hash())
mp.verifiedTxes[len(mp.verifiedTxes)-1] = pItem
} else { } else {
pool = mp.sortedHighPrioTxn mp.verifiedTxes = append(mp.verifiedTxes, pItem)
} }
if n != len(mp.verifiedTxes)-1 {
mp.lock.Lock() copy(mp.verifiedTxes[n+1:], mp.verifiedTxes[n:])
pool = append(pool, pItem) mp.verifiedTxes[n] = pItem
sort.Sort(pool) }
updateMempoolMetrics(len(mp.verifiedTxes))
mp.lock.Unlock() mp.lock.Unlock()
return nil
if mp.Count() > mp.capacity {
(&mp).RemoveOverCapacity()
}
mp.lock.RLock()
_, ok := mp.unsortedTxn[hash]
updateMempoolMetrics(len(mp.unsortedTxn), len(mp.unverifiedTxn))
mp.lock.RUnlock()
return ok
} }
// Remove removes an item from the mempool, if it exists there (and does // Remove removes an item from the mempool, if it exists there (and does
// nothing if it doesn't). // nothing if it doesn't).
func (mp *Pool) Remove(hash util.Uint256) { func (mp *Pool) Remove(hash util.Uint256) {
var mapAndPools = []struct {
unsortedMap map[util.Uint256]*Item
sortedPools []*Items
}{
{unsortedMap: mp.unsortedTxn, sortedPools: []*Items{&mp.sortedHighPrioTxn, &mp.sortedLowPrioTxn}},
{unsortedMap: mp.unverifiedTxn, sortedPools: []*Items{&mp.unverifiedSortedHighPrioTxn, &mp.unverifiedSortedLowPrioTxn}},
}
mp.lock.Lock() mp.lock.Lock()
for _, mapAndPool := range mapAndPools { if _, ok := mp.verifiedMap[hash]; ok {
if _, ok := mapAndPool.unsortedMap[hash]; ok { var num int
delete(mapAndPool.unsortedMap, hash) delete(mp.verifiedMap, hash)
for _, pool := range mapAndPool.sortedPools { for num := range mp.verifiedTxes {
var num int if hash.Equals(mp.verifiedTxes[num].txn.Hash()) {
var item *Item break
for num, item = range *pool {
if hash.Equals(item.txn.Hash()) {
break
}
}
if num < len(*pool)-1 {
*pool = append((*pool)[:num], (*pool)[num+1:]...)
} else if num == len(*pool)-1 {
*pool = (*pool)[:num]
}
} }
} }
if num < len(mp.verifiedTxes)-1 {
mp.verifiedTxes = append(mp.verifiedTxes[:num], mp.verifiedTxes[num+1:]...)
} else if num == len(mp.verifiedTxes)-1 {
mp.verifiedTxes = mp.verifiedTxes[:num]
}
} }
updateMempoolMetrics(len(mp.unsortedTxn), len(mp.unverifiedTxn)) updateMempoolMetrics(len(mp.verifiedTxes))
mp.lock.Unlock() mp.lock.Unlock()
} }
// RemoveOverCapacity removes transactions with lowest fees until the total number of transactions // RemoveStale filters verified transactions through the given function keeping
// in the Pool is within the capacity of the Pool. // only the transactions for which it returns a true result. It's used to quickly
func (mp *Pool) RemoveOverCapacity() { // drop part of the mempool that is now invalid after the block acceptance.
for mp.Count()-mp.capacity > 0 { func (mp *Pool) RemoveStale(isOK func(*transaction.Transaction) bool) {
mp.lock.Lock() mp.lock.Lock()
if minItem, argPosition := getLowestFeeTransaction(mp.sortedLowPrioTxn, mp.unverifiedSortedLowPrioTxn); minItem != nil { // We expect a lot of changes, so it's easier to allocate a new slice
if argPosition == 1 { // rather than move things in an old one.
// minItem belongs to the mp.sortedLowPrioTxn slice. newVerifiedTxes := make([]*item, 0, mp.capacity)
// The corresponding unsorted pool is is mp.unsortedTxn. for _, itm := range mp.verifiedTxes {
delete(mp.unsortedTxn, minItem.txn.Hash()) if isOK(itm.txn) {
mp.sortedLowPrioTxn = append(mp.sortedLowPrioTxn[:0], mp.sortedLowPrioTxn[1:]...) newVerifiedTxes = append(newVerifiedTxes, itm)
} else { } else {
// minItem belongs to the mp.unverifiedSortedLowPrioTxn slice. delete(mp.verifiedMap, itm.txn.Hash())
// The corresponding unsorted pool is is mp.unverifiedTxn.
delete(mp.unverifiedTxn, minItem.txn.Hash())
mp.unverifiedSortedLowPrioTxn = append(mp.unverifiedSortedLowPrioTxn[:0], mp.unverifiedSortedLowPrioTxn[1:]...)
}
} else if minItem, argPosition := getLowestFeeTransaction(mp.sortedHighPrioTxn, mp.unverifiedSortedHighPrioTxn); minItem != nil {
if argPosition == 1 {
// minItem belongs to the mp.sortedHighPrioTxn slice.
// The corresponding unsorted pool is is mp.unsortedTxn.
delete(mp.unsortedTxn, minItem.txn.Hash())
mp.sortedHighPrioTxn = append(mp.sortedHighPrioTxn[:0], mp.sortedHighPrioTxn[1:]...)
} else {
// minItem belongs to the mp.unverifiedSortedHighPrioTxn slice.
// The corresponding unsorted pool is is mp.unverifiedTxn.
delete(mp.unverifiedTxn, minItem.txn.Hash())
mp.unverifiedSortedHighPrioTxn = append(mp.unverifiedSortedHighPrioTxn[:0], mp.unverifiedSortedHighPrioTxn[1:]...)
}
} }
updateMempoolMetrics(len(mp.unsortedTxn), len(mp.unverifiedTxn))
mp.lock.Unlock()
}
}
// NewPoolItem returns a new Item.
func NewPoolItem(t *transaction.Transaction, fee Feer) *Item {
return &Item{
txn: t,
timeStamp: time.Now().UTC(),
fee: fee,
} }
mp.verifiedTxes = newVerifiedTxes
mp.lock.Unlock()
} }
// NewMemPool returns a new Pool struct. // NewMemPool returns a new Pool struct.
func NewMemPool(capacity int) Pool { func NewMemPool(capacity int) Pool {
return Pool{ return Pool{
lock: new(sync.RWMutex), verifiedMap: make(map[util.Uint256]*item),
unsortedTxn: make(map[util.Uint256]*Item), verifiedTxes: make([]*item, 0, capacity),
unverifiedTxn: make(map[util.Uint256]*Item), capacity: capacity,
capacity: capacity,
} }
} }
// TryGetValue returns a transaction if it exists in the memory pool. // TryGetValue returns a transaction if it exists in the memory pool.
func (mp Pool) TryGetValue(hash util.Uint256) (*transaction.Transaction, bool) { func (mp *Pool) TryGetValue(hash util.Uint256) (*transaction.Transaction, bool) {
mp.lock.RLock() mp.lock.RLock()
defer mp.lock.RUnlock() defer mp.lock.RUnlock()
if pItem, ok := mp.unsortedTxn[hash]; ok { if pItem, ok := mp.verifiedMap[hash]; ok {
return pItem.txn, ok
}
if pItem, ok := mp.unverifiedTxn[hash]; ok {
return pItem.txn, ok return pItem.txn, ok
} }
return nil, false return nil, false
} }
// getLowestFeeTransaction returns the Item with the lowest fee amongst the "verifiedTxnSorted"
// and "unverifiedTxnSorted" Items along with a integer. The integer can assume two values, 1 and 2 which indicate
// that the Item with the lowest fee was found in "verifiedTxnSorted" respectively in "unverifiedTxnSorted".
// "verifiedTxnSorted" and "unverifiedTxnSorted" are sorted slice order by transaction fee ascending. This means that
// the transaction with lowest fee start at index 0.
// Reference: GetLowestFeeTransaction method in C# (https://github.com/neo-project/neo/blob/master/neo/Ledger/MemoryPool.cs)
func getLowestFeeTransaction(verifiedTxnSorted Items, unverifiedTxnSorted Items) (*Item, int) {
minItem := min(unverifiedTxnSorted)
verifiedMin := min(verifiedTxnSorted)
if verifiedMin == nil || (minItem != nil && verifiedMin.CompareTo(minItem) >= 0) {
return minItem, 2
}
minItem = verifiedMin
return minItem, 1
}
// min returns the minimum item in a ascending sorted slice of pool items.
// The function can't be applied to unsorted slice!
func min(sortedPool Items) *Item {
if len(sortedPool) == 0 {
return nil
}
return sortedPool[0]
}
// GetVerifiedTransactions returns a slice of Input from all the transactions in the memory pool // GetVerifiedTransactions returns a slice of Input from all the transactions in the memory pool
// whose hash is not included in excludedHashes. // whose hash is not included in excludedHashes.
func (mp *Pool) GetVerifiedTransactions() []*transaction.Transaction { func (mp *Pool) GetVerifiedTransactions() []*transaction.Transaction {
mp.lock.RLock() mp.lock.RLock()
defer mp.lock.RUnlock() defer mp.lock.RUnlock()
var t = make([]*transaction.Transaction, len(mp.unsortedTxn)) var t = make([]*transaction.Transaction, len(mp.verifiedTxes))
var i int var i int
for _, p := range mp.unsortedTxn { for _, p := range mp.verifiedTxes {
t[i] = p.txn t[i] = p.txn
i++ i++
} }
@ -283,16 +252,16 @@ func (mp *Pool) GetVerifiedTransactions() []*transaction.Transaction {
return t return t
} }
// Verify verifies if the inputs of a transaction tx are already used in any other transaction in the memory pool. // verifyInputs is an internal unprotected version of Verify.
// If yes, the transaction tx is not a valid transaction and the function return false. func (mp *Pool) verifyInputs(tx *transaction.Transaction) bool {
// If no, the transaction tx is a valid transaction and the function return true. if len(tx.Inputs) == 0 {
func (mp Pool) Verify(tx *transaction.Transaction) bool { return true
mp.lock.RLock() }
defer mp.lock.RUnlock() for num := range mp.verifiedTxes {
for _, item := range mp.unsortedTxn { txn := mp.verifiedTxes[num].txn
for i := range item.txn.Inputs { for i := range txn.Inputs {
for j := 0; j < len(tx.Inputs); j++ { for j := 0; j < len(tx.Inputs); j++ {
if item.txn.Inputs[i] == tx.Inputs[j] { if txn.Inputs[i] == tx.Inputs[j] {
return false return false
} }
} }
@ -301,3 +270,12 @@ func (mp Pool) Verify(tx *transaction.Transaction) bool {
return true return true
} }
// Verify verifies if the inputs of a transaction tx are already used in any other transaction in the memory pool.
// If yes, the transaction tx is not a valid transaction and the function return false.
// If no, the transaction tx is a valid transaction and the function return true.
func (mp *Pool) Verify(tx *transaction.Transaction) bool {
mp.lock.RLock()
defer mp.lock.RUnlock()
return mp.verifyInputs(tx)
}
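The Add method above keeps verifiedTxes sorted from the most to the least prioritized transaction and finds the insertion point with sort.Search using a "strictly more prioritized" predicate, so items of equal priority land after the ones already present and a full pool evicts its least prioritized element (or rejects the newcomer). A self-contained sketch of that insertion pattern, with plain ints standing in for pool items (not the actual mempool code):

package main

import (
	"fmt"
	"sort"
)

// insertDesc inserts v into s, keeping s sorted in descending order and capped
// at capacity. The predicate looks for the first element strictly smaller than
// v, so equal values end up after their peers (a cheap append when many items
// share the same priority).
func insertDesc(s []int, v, capacity int) []int {
	n := sort.Search(len(s), func(i int) bool { return v > s[i] })
	if len(s) == capacity {
		if n == len(s) {
			return s // less prioritized than everything present, doesn't fit
		}
		s[len(s)-1] = v // ditch the last (least prioritized) element
	} else {
		s = append(s, v)
	}
	if n != len(s)-1 {
		copy(s[n+1:], s[n:])
		s[n] = v
	}
	return s
}

func main() {
	var pool []int
	for _, v := range []int{5, 9, 1, 7, 7, 3} {
		pool = insertDesc(pool, v, 4)
	}
	fmt.Println(pool) // [9 7 7 5]
}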


@ -1,6 +1,7 @@
package mempool package mempool
import ( import (
"sort"
"testing" "testing"
"github.com/CityOfZion/neo-go/pkg/core/transaction" "github.com/CityOfZion/neo-go/pkg/core/transaction"
@ -35,13 +36,12 @@ func (fs *FeerStub) SystemFee(*transaction.Transaction) util.Fixed8 {
func testMemPoolAddRemoveWithFeer(t *testing.T, fs Feer) { func testMemPoolAddRemoveWithFeer(t *testing.T, fs Feer) {
mp := NewMemPool(10) mp := NewMemPool(10)
tx := newMinerTX() tx := newMinerTX(0)
item := NewPoolItem(tx, fs)
_, ok := mp.TryGetValue(tx.Hash()) _, ok := mp.TryGetValue(tx.Hash())
require.Equal(t, false, ok) require.Equal(t, false, ok)
require.Equal(t, true, mp.TryAdd(tx.Hash(), item)) require.NoError(t, mp.Add(tx, fs))
// Re-adding should fail. // Re-adding should fail.
require.Equal(t, false, mp.TryAdd(tx.Hash(), item)) require.Error(t, mp.Add(tx, fs))
tx2, ok := mp.TryGetValue(tx.Hash()) tx2, ok := mp.TryGetValue(tx.Hash())
require.Equal(t, true, ok) require.Equal(t, true, ok)
require.Equal(t, tx, tx2) require.Equal(t, tx, tx2)
@ -49,12 +49,8 @@ func testMemPoolAddRemoveWithFeer(t *testing.T, fs Feer) {
_, ok = mp.TryGetValue(tx.Hash()) _, ok = mp.TryGetValue(tx.Hash())
require.Equal(t, false, ok) require.Equal(t, false, ok)
// Make sure nothing left in the mempool after removal. // Make sure nothing left in the mempool after removal.
assert.Equal(t, 0, len(mp.unsortedTxn)) assert.Equal(t, 0, len(mp.verifiedMap))
assert.Equal(t, 0, len(mp.unverifiedTxn)) assert.Equal(t, 0, len(mp.verifiedTxes))
assert.Equal(t, 0, len(mp.sortedHighPrioTxn))
assert.Equal(t, 0, len(mp.sortedLowPrioTxn))
assert.Equal(t, 0, len(mp.unverifiedSortedHighPrioTxn))
assert.Equal(t, 0, len(mp.unverifiedSortedLowPrioTxn))
} }
func TestMemPoolAddRemove(t *testing.T) { func TestMemPoolAddRemove(t *testing.T) {
@ -66,32 +62,156 @@ func TestMemPoolAddRemove(t *testing.T) {
func TestMemPoolVerify(t *testing.T) { func TestMemPoolVerify(t *testing.T) {
mp := NewMemPool(10) mp := NewMemPool(10)
tx := newMinerTX() tx := newMinerTX(1)
inhash1 := random.Uint256() inhash1 := random.Uint256()
tx.Inputs = append(tx.Inputs, transaction.Input{PrevHash: inhash1, PrevIndex: 0}) tx.Inputs = append(tx.Inputs, transaction.Input{PrevHash: inhash1, PrevIndex: 0})
require.Equal(t, true, mp.Verify(tx)) require.Equal(t, true, mp.Verify(tx))
item := NewPoolItem(tx, &FeerStub{}) require.NoError(t, mp.Add(tx, &FeerStub{}))
require.Equal(t, true, mp.TryAdd(tx.Hash(), item))
tx2 := newMinerTX() tx2 := newMinerTX(2)
inhash2 := random.Uint256() inhash2 := random.Uint256()
tx2.Inputs = append(tx2.Inputs, transaction.Input{PrevHash: inhash2, PrevIndex: 0}) tx2.Inputs = append(tx2.Inputs, transaction.Input{PrevHash: inhash2, PrevIndex: 0})
require.Equal(t, true, mp.Verify(tx2)) require.Equal(t, true, mp.Verify(tx2))
item = NewPoolItem(tx2, &FeerStub{}) require.NoError(t, mp.Add(tx2, &FeerStub{}))
require.Equal(t, true, mp.TryAdd(tx2.Hash(), item))
tx3 := newMinerTX() tx3 := newMinerTX(3)
// Different index number, but the same PrevHash as in tx1. // Different index number, but the same PrevHash as in tx1.
tx3.Inputs = append(tx3.Inputs, transaction.Input{PrevHash: inhash1, PrevIndex: 1}) tx3.Inputs = append(tx3.Inputs, transaction.Input{PrevHash: inhash1, PrevIndex: 1})
require.Equal(t, true, mp.Verify(tx3)) require.Equal(t, true, mp.Verify(tx3))
// The same input as in tx2. // The same input as in tx2.
tx3.Inputs = append(tx3.Inputs, transaction.Input{PrevHash: inhash2, PrevIndex: 0}) tx3.Inputs = append(tx3.Inputs, transaction.Input{PrevHash: inhash2, PrevIndex: 0})
require.Equal(t, false, mp.Verify(tx3)) require.Equal(t, false, mp.Verify(tx3))
require.Error(t, mp.Add(tx3, &FeerStub{}))
} }
func newMinerTX() *transaction.Transaction { func newMinerTX(i uint32) *transaction.Transaction {
return &transaction.Transaction{ return &transaction.Transaction{
Type: transaction.MinerType, Type: transaction.MinerType,
Data: &transaction.MinerTX{}, Data: &transaction.MinerTX{Nonce: i},
}
}
func TestOverCapacity(t *testing.T) {
var fs = &FeerStub{lowPriority: true}
const mempoolSize = 10
mp := NewMemPool(mempoolSize)
for i := 0; i < mempoolSize; i++ {
tx := newMinerTX(uint32(i))
require.NoError(t, mp.Add(tx, fs))
}
txcnt := uint32(mempoolSize)
require.Equal(t, mempoolSize, mp.Count())
require.Equal(t, true, sort.IsSorted(sort.Reverse(mp.verifiedTxes)))
// Claim TX has more priority than ordinary lowprio, so it should easily
// fit into the pool.
claim := &transaction.Transaction{
Type: transaction.ClaimType,
Data: &transaction.ClaimTX{},
}
require.NoError(t, mp.Add(claim, fs))
require.Equal(t, mempoolSize, mp.Count())
require.Equal(t, true, sort.IsSorted(sort.Reverse(mp.verifiedTxes)))
// Fees are also prioritized.
fs.netFee = util.Fixed8FromFloat(0.0001)
for i := 0; i < mempoolSize-1; i++ {
tx := newMinerTX(txcnt)
txcnt++
require.NoError(t, mp.Add(tx, fs))
require.Equal(t, mempoolSize, mp.Count())
require.Equal(t, true, sort.IsSorted(sort.Reverse(mp.verifiedTxes)))
}
// Less prioritized txes are not allowed anymore.
fs.netFee = util.Fixed8FromFloat(0.00001)
tx := newMinerTX(txcnt)
txcnt++
require.Error(t, mp.Add(tx, fs))
require.Equal(t, mempoolSize, mp.Count())
require.Equal(t, true, sort.IsSorted(sort.Reverse(mp.verifiedTxes)))
// But claim tx should still be there.
require.True(t, mp.ContainsKey(claim.Hash()))
// Low net fee, but higher per-byte fee is still a better combination.
fs.perByteFee = util.Fixed8FromFloat(0.001)
tx = newMinerTX(txcnt)
txcnt++
require.NoError(t, mp.Add(tx, fs))
require.Equal(t, mempoolSize, mp.Count())
require.Equal(t, true, sort.IsSorted(sort.Reverse(mp.verifiedTxes)))
// High priority always wins over low priority.
fs.lowPriority = false
for i := 0; i < mempoolSize; i++ {
tx := newMinerTX(txcnt)
txcnt++
require.NoError(t, mp.Add(tx, fs))
require.Equal(t, mempoolSize, mp.Count())
require.Equal(t, true, sort.IsSorted(sort.Reverse(mp.verifiedTxes)))
}
// Good luck with low priority now.
fs.lowPriority = true
tx = newMinerTX(txcnt)
require.Error(t, mp.Add(tx, fs))
require.Equal(t, mempoolSize, mp.Count())
require.Equal(t, true, sort.IsSorted(sort.Reverse(mp.verifiedTxes)))
}
func TestGetVerified(t *testing.T) {
var fs = &FeerStub{lowPriority: true}
const mempoolSize = 10
mp := NewMemPool(mempoolSize)
txes := make([]*transaction.Transaction, 0, mempoolSize)
for i := 0; i < mempoolSize; i++ {
tx := newMinerTX(uint32(i))
txes = append(txes, tx)
require.NoError(t, mp.Add(tx, fs))
}
require.Equal(t, mempoolSize, mp.Count())
verTxes := mp.GetVerifiedTransactions()
require.Equal(t, mempoolSize, len(verTxes))
for _, tx := range verTxes {
require.Contains(t, txes, tx)
}
for _, tx := range txes {
mp.Remove(tx.Hash())
}
verTxes = mp.GetVerifiedTransactions()
require.Equal(t, 0, len(verTxes))
}
func TestRemoveStale(t *testing.T) {
var fs = &FeerStub{lowPriority: true}
const mempoolSize = 10
mp := NewMemPool(mempoolSize)
txes1 := make([]*transaction.Transaction, 0, mempoolSize/2)
txes2 := make([]*transaction.Transaction, 0, mempoolSize/2)
for i := 0; i < mempoolSize; i++ {
tx := newMinerTX(uint32(i))
if i%2 == 0 {
txes1 = append(txes1, tx)
} else {
txes2 = append(txes2, tx)
}
require.NoError(t, mp.Add(tx, fs))
}
require.Equal(t, mempoolSize, mp.Count())
mp.RemoveStale(func(t *transaction.Transaction) bool {
for _, tx := range txes2 {
if tx == t {
return true
}
}
return false
})
require.Equal(t, mempoolSize/2, mp.Count())
verTxes := mp.GetVerifiedTransactions()
for _, tx := range verTxes {
require.NotContains(t, txes1, tx)
require.Contains(t, txes2, tx)
} }
} }


@ -11,24 +11,14 @@ var (
Namespace: "neogo", Namespace: "neogo",
}, },
) )
//mempoolUnverifiedTx prometheus metric.
mempoolUnverifiedTx = prometheus.NewGauge(
prometheus.GaugeOpts{
Help: "Mempool Unverified TXs",
Name: "mempool_unverified_tx",
Namespace: "neogo",
},
)
) )
func init() { func init() {
prometheus.MustRegister( prometheus.MustRegister(
mempoolUnsortedTx, mempoolUnsortedTx,
mempoolUnverifiedTx,
) )
} }
func updateMempoolMetrics(unsortedTxnLen int, unverifiedTxnLen int) { func updateMempoolMetrics(unsortedTxnLen int) {
mempoolUnsortedTx.Set(float64(unsortedTxnLen)) mempoolUnsortedTx.Set(float64(unsortedTxnLen))
mempoolUnverifiedTx.Set(float64(unverifiedTxnLen))
} }


@ -12,9 +12,10 @@ type blockQueue struct {
queue *queue.PriorityQueue queue *queue.PriorityQueue
checkBlocks chan struct{} checkBlocks chan struct{}
chain core.Blockchainer chain core.Blockchainer
relayF func(*block.Block)
} }
func newBlockQueue(capacity int, bc core.Blockchainer, log *zap.Logger) *blockQueue { func newBlockQueue(capacity int, bc core.Blockchainer, log *zap.Logger, relayer func(*block.Block)) *blockQueue {
if log == nil { if log == nil {
return nil return nil
} }
@ -24,6 +25,7 @@ func newBlockQueue(capacity int, bc core.Blockchainer, log *zap.Logger) *blockQu
queue: queue.NewPriorityQueue(capacity, false), queue: queue.NewPriorityQueue(capacity, false),
checkBlocks: make(chan struct{}, 1), checkBlocks: make(chan struct{}, 1),
chain: bc, chain: bc,
relayF: relayer,
} }
} }
@ -45,10 +47,15 @@ func (bq *blockQueue) run() {
if minblock.Index == bq.chain.BlockHeight()+1 { if minblock.Index == bq.chain.BlockHeight()+1 {
err := bq.chain.AddBlock(minblock) err := bq.chain.AddBlock(minblock)
if err != nil { if err != nil {
bq.log.Warn("blockQueue: failed adding block into the blockchain", // The block might already be added by consensus.
zap.String("error", err.Error()), if _, errget := bq.chain.GetBlock(minblock.Hash()); errget != nil {
zap.Uint32("blockHeight", bq.chain.BlockHeight()), bq.log.Warn("blockQueue: failed adding block into the blockchain",
zap.Uint32("nextIndex", minblock.Index)) zap.String("error", err.Error()),
zap.Uint32("blockHeight", bq.chain.BlockHeight()),
zap.Uint32("nextIndex", minblock.Index))
}
} else if bq.relayF != nil {
bq.relayF(minblock)
} }
} }
} else { } else {


@ -12,7 +12,7 @@ import (
func TestBlockQueue(t *testing.T) { func TestBlockQueue(t *testing.T) {
chain := &testChain{} chain := &testChain{}
// notice, it's not yet running // notice, it's not yet running
bq := newBlockQueue(0, chain, zaptest.NewLogger(t)) bq := newBlockQueue(0, chain, zaptest.NewLogger(t), nil)
blocks := make([]*block.Block, 11) blocks := make([]*block.Block, 11)
for i := 1; i < 11; i++ { for i := 1; i < 11; i++ {
blocks[i] = &block.Block{Base: block.Base{Index: uint32(i)}} blocks[i] = &block.Block{Base: block.Base{Index: uint32(i)}}


@ -118,7 +118,7 @@ func (chain testChain) GetUnspentCoinState(util.Uint256) *core.UnspentCoinState
panic("TODO") panic("TODO")
} }
func (chain testChain) GetMemPool() mempool.Pool { func (chain testChain) GetMemPool() *mempool.Pool {
panic("TODO") panic("TODO")
} }
@ -126,6 +126,10 @@ func (chain testChain) IsLowPriority(*transaction.Transaction) bool {
panic("TODO") panic("TODO")
} }
func (chain testChain) PoolTx(*transaction.Transaction) error {
panic("TODO")
}
func (chain testChain) VerifyTx(*transaction.Transaction, *block.Block) error { func (chain testChain) VerifyTx(*transaction.Transaction, *block.Block) error {
panic("TODO") panic("TODO")
} }


@ -13,7 +13,6 @@ import (
"github.com/CityOfZion/neo-go/pkg/consensus" "github.com/CityOfZion/neo-go/pkg/consensus"
"github.com/CityOfZion/neo-go/pkg/core" "github.com/CityOfZion/neo-go/pkg/core"
"github.com/CityOfZion/neo-go/pkg/core/block" "github.com/CityOfZion/neo-go/pkg/core/block"
"github.com/CityOfZion/neo-go/pkg/core/mempool"
"github.com/CityOfZion/neo-go/pkg/core/transaction" "github.com/CityOfZion/neo-go/pkg/core/transaction"
"github.com/CityOfZion/neo-go/pkg/network/payload" "github.com/CityOfZion/neo-go/pkg/network/payload"
"github.com/CityOfZion/neo-go/pkg/util" "github.com/CityOfZion/neo-go/pkg/util"
@ -91,7 +90,6 @@ func NewServer(config ServerConfig, chain core.Blockchainer, log *zap.Logger) (*
s := &Server{ s := &Server{
ServerConfig: config, ServerConfig: config,
chain: chain, chain: chain,
bQueue: newBlockQueue(maxBlockBatch, chain, log),
id: randomID(), id: randomID(),
quit: make(chan struct{}), quit: make(chan struct{}),
register: make(chan Peer), register: make(chan Peer),
@ -100,6 +98,7 @@ func NewServer(config ServerConfig, chain core.Blockchainer, log *zap.Logger) (*
connected: atomic.NewBool(false), connected: atomic.NewBool(false),
log: log, log: log,
} }
s.bQueue = newBlockQueue(maxBlockBatch, chain, log, s.relayBlock)
srv, err := consensus.NewService(consensus.Config{ srv, err := consensus.NewService(consensus.Config{
Logger: log, Logger: log,
@ -735,7 +734,11 @@ func (s *Server) broadcastHPMessage(msg *Message) {
// relayBlock tells all the other connected nodes about the given block. // relayBlock tells all the other connected nodes about the given block.
func (s *Server) relayBlock(b *block.Block) { func (s *Server) relayBlock(b *block.Block) {
msg := s.MkMsg(CMDInv, payload.NewInventory(payload.BlockType, []util.Uint256{b.Hash()})) msg := s.MkMsg(CMDInv, payload.NewInventory(payload.BlockType, []util.Uint256{b.Hash()}))
s.broadcastMessage(msg) // Filter out nodes that are more current (avoid spamming the network
// during initial sync).
s.iteratePeersWithSendMsg(msg, Peer.EnqueuePacket, func(p Peer) bool {
return p.Handshaked() && p.LastBlockIndex() < b.Index
})
} }
// verifyAndPoolTX verifies the TX and adds it to the local mempool. // verifyAndPoolTX verifies the TX and adds it to the local mempool.
@ -743,17 +746,18 @@ func (s *Server) verifyAndPoolTX(t *transaction.Transaction) RelayReason {
if t.Type == transaction.MinerType { if t.Type == transaction.MinerType {
return RelayInvalid return RelayInvalid
} }
if s.chain.HasTransaction(t.Hash()) {
return RelayAlreadyExists
}
if err := s.chain.VerifyTx(t, nil); err != nil {
return RelayInvalid
}
// TODO: Implement Plugin.CheckPolicy? // TODO: Implement Plugin.CheckPolicy?
//if (!Plugin.CheckPolicy(transaction)) //if (!Plugin.CheckPolicy(transaction))
// return RelayResultReason.PolicyFail; // return RelayResultReason.PolicyFail;
if ok := s.chain.GetMemPool().TryAdd(t.Hash(), mempool.NewPoolItem(t, s.chain)); !ok { if err := s.chain.PoolTx(t); err != nil {
return RelayOutOfMemory switch err {
case core.ErrAlreadyExists:
return RelayAlreadyExists
case core.ErrOOM:
return RelayOutOfMemory
default:
return RelayInvalid
}
} }
return RelaySucceed return RelaySucceed
} }
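With this change errors travel through three layers: the mempool returns ErrOOM, ErrConflict or ErrDup, Blockchain.PoolTx translates those into core.ErrOOM and core.ErrAlreadyExists, and Server.verifyAndPoolTX turns the chain-level errors into relay reasons. A compact sketch of that mapping as it appears in the diff, using stub error values and a plain string in place of the RelayReason type:

package main

import (
	"errors"
	"fmt"
)

// Stand-ins for the errors defined in the mempool and core packages.
var (
	mempoolErrOOM      = errors.New("out of memory")
	mempoolErrConflict = errors.New("conflicts with the memory pool")
	mempoolErrDup      = errors.New("already in the memory pool")

	coreErrOOM           = errors.New("no space left in the memory pool")
	coreErrAlreadyExists = errors.New("already exists")
)

// poolTx mirrors Blockchain.PoolTx: mempool errors become chain-level errors.
func poolTx(mempoolErr error) error {
	switch mempoolErr {
	case nil:
		return nil
	case mempoolErrOOM:
		return coreErrOOM
	case mempoolErrConflict:
		return coreErrAlreadyExists
	default:
		// mempool.ErrDup is normally prevented earlier by the HasTransaction
		// check in PoolTx; anything unexpected is passed through as is.
		return mempoolErr
	}
}

// relayReason mirrors Server.verifyAndPoolTX: chain-level errors become relay
// reasons reported back to the network layer.
func relayReason(chainErr error) string {
	switch chainErr {
	case nil:
		return "RelaySucceed"
	case coreErrAlreadyExists:
		return "RelayAlreadyExists"
	case coreErrOOM:
		return "RelayOutOfMemory"
	default:
		return "RelayInvalid"
	}
}

func main() {
	fmt.Println(relayReason(poolTx(nil)))                // RelaySucceed
	fmt.Println(relayReason(poolTx(mempoolErrConflict))) // RelayAlreadyExists
	fmt.Println(relayReason(poolTx(mempoolErrDup)))      // RelayInvalid
}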