mempool: make item an internal thing of mempool package

Nobody outside should care about these details, mempool operates on
transactions and that's it.
This commit is contained in:
Roman Khimov 2020-02-05 14:24:36 +03:00
parent f0e3a31bc8
commit a928ad9cfa
4 changed files with 47 additions and 59 deletions

View file

@ -5,7 +5,6 @@ import (
"github.com/CityOfZion/neo-go/config" "github.com/CityOfZion/neo-go/config"
"github.com/CityOfZion/neo-go/pkg/core" "github.com/CityOfZion/neo-go/pkg/core"
"github.com/CityOfZion/neo-go/pkg/core/mempool"
"github.com/CityOfZion/neo-go/pkg/core/storage" "github.com/CityOfZion/neo-go/pkg/core/storage"
"github.com/CityOfZion/neo-go/pkg/core/transaction" "github.com/CityOfZion/neo-go/pkg/core/transaction"
"github.com/CityOfZion/neo-go/pkg/crypto/keys" "github.com/CityOfZion/neo-go/pkg/crypto/keys"
@ -22,8 +21,7 @@ func TestNewService(t *testing.T) {
Type: transaction.MinerType, Type: transaction.MinerType,
Data: &transaction.MinerTX{Nonce: 12345}, Data: &transaction.MinerTX{Nonce: 12345},
} }
item := mempool.NewPoolItem(tx, new(feer)) srv.Chain.GetMemPool().Add(tx, new(feer))
srv.Chain.GetMemPool().TryAdd(tx.Hash(), item)
var txx []block.Transaction var txx []block.Transaction
require.NotPanics(t, func() { txx = srv.getVerifiedTx(1) }) require.NotPanics(t, func() { txx = srv.getVerifiedTx(1) })
@ -41,9 +39,8 @@ func TestService_GetVerified(t *testing.T) {
newMinerTx(4), newMinerTx(4),
} }
pool := srv.Chain.GetMemPool() pool := srv.Chain.GetMemPool()
item := mempool.NewPoolItem(txs[3], new(feer))
require.NoError(t, pool.TryAdd(txs[3].Hash(), item)) require.NoError(t, pool.Add(txs[3], new(feer)))
hashes := []util.Uint256{txs[0].Hash(), txs[1].Hash(), txs[2].Hash()} hashes := []util.Uint256{txs[0].Hash(), txs[1].Hash(), txs[2].Hash()}
@ -68,8 +65,7 @@ func TestService_GetVerified(t *testing.T) {
t.Run("more than half of the last proposal will be reused", func(t *testing.T) { t.Run("more than half of the last proposal will be reused", func(t *testing.T) {
for _, tx := range txs[:2] { for _, tx := range txs[:2] {
item := mempool.NewPoolItem(tx, new(feer)) require.NoError(t, pool.Add(tx, new(feer)))
require.NoError(t, pool.TryAdd(tx.Hash(), item))
} }
txx := srv.getVerifiedTx(10) txx := srv.getVerifiedTx(10)
@ -119,8 +115,7 @@ func TestService_getTx(t *testing.T) {
require.Equal(t, nil, srv.getTx(h)) require.Equal(t, nil, srv.getTx(h))
item := mempool.NewPoolItem(tx, new(feer)) srv.Chain.GetMemPool().Add(tx, new(feer))
srv.Chain.GetMemPool().TryAdd(h, item)
got := srv.getTx(h) got := srv.getTx(h)
require.NotNil(t, got) require.NotNil(t, got)

View file

@ -1064,7 +1064,7 @@ func (bc *Blockchain) PoolTx(t *transaction.Transaction) error {
if err := bc.verifyTx(t, nil); err != nil { if err := bc.verifyTx(t, nil); err != nil {
return err return err
} }
if err := bc.memPool.TryAdd(t.Hash(), mempool.NewPoolItem(t, bc)); err != nil { if err := bc.memPool.Add(t, bc); err != nil {
switch err { switch err {
case mempool.ErrOOM: case mempool.ErrOOM:
return ErrOOM return ErrOOM

View file

@ -23,38 +23,38 @@ var (
ErrOOM = errors.New("out of memory") ErrOOM = errors.New("out of memory")
) )
// Item represents a transaction in the the Memory pool. // item represents a transaction in the the Memory pool.
type Item struct { type item struct {
txn *transaction.Transaction txn *transaction.Transaction
timeStamp time.Time timeStamp time.Time
fee Feer fee Feer
} }
// Items is a slice of Item. // items is a slice of item.
type Items []*Item type items []*item
// Pool stores the unconfirms transactions. // Pool stores the unconfirms transactions.
type Pool struct { type Pool struct {
lock *sync.RWMutex lock *sync.RWMutex
unsortedTxn map[util.Uint256]*Item unsortedTxn map[util.Uint256]*item
unverifiedTxn map[util.Uint256]*Item unverifiedTxn map[util.Uint256]*item
sortedHighPrioTxn Items sortedHighPrioTxn items
sortedLowPrioTxn Items sortedLowPrioTxn items
unverifiedSortedHighPrioTxn Items unverifiedSortedHighPrioTxn items
unverifiedSortedLowPrioTxn Items unverifiedSortedLowPrioTxn items
capacity int capacity int
} }
func (p Items) Len() int { return len(p) } func (p items) Len() int { return len(p) }
func (p Items) Swap(i, j int) { p[i], p[j] = p[j], p[i] } func (p items) Swap(i, j int) { p[i], p[j] = p[j], p[i] }
func (p Items) Less(i, j int) bool { return p[i].CompareTo(p[j]) < 0 } func (p items) Less(i, j int) bool { return p[i].CompareTo(p[j]) < 0 }
// CompareTo returns the difference between two Items. // CompareTo returns the difference between two items.
// difference < 0 implies p < otherP. // difference < 0 implies p < otherP.
// difference = 0 implies p = otherP. // difference = 0 implies p = otherP.
// difference > 0 implies p > otherP. // difference > 0 implies p > otherP.
func (p Item) CompareTo(otherP *Item) int { func (p item) CompareTo(otherP *item) int {
if otherP == nil { if otherP == nil {
return 1 return 1
} }
@ -114,9 +114,14 @@ func (mp *Pool) ContainsKey(hash util.Uint256) bool {
return false return false
} }
// TryAdd try to add the Item to the Pool. // Add tries to add given transaction to the Pool.
func (mp *Pool) TryAdd(hash util.Uint256, pItem *Item) error { func (mp *Pool) Add(t *transaction.Transaction, fee Feer) error {
var pool *Items var pool *items
var pItem = &item{
txn: t,
timeStamp: time.Now().UTC(),
fee: fee,
}
if pItem.fee.IsLowPriority(pItem.txn) { if pItem.fee.IsLowPriority(pItem.txn) {
pool = &mp.sortedLowPrioTxn pool = &mp.sortedLowPrioTxn
@ -129,11 +134,11 @@ func (mp *Pool) TryAdd(hash util.Uint256, pItem *Item) error {
mp.lock.Unlock() mp.lock.Unlock()
return ErrConflict return ErrConflict
} }
if _, ok := mp.unsortedTxn[hash]; ok { if _, ok := mp.unsortedTxn[t.Hash()]; ok {
mp.lock.Unlock() mp.lock.Unlock()
return ErrDup return ErrDup
} }
mp.unsortedTxn[hash] = pItem mp.unsortedTxn[t.Hash()] = pItem
*pool = append(*pool, pItem) *pool = append(*pool, pItem)
sort.Sort(pool) sort.Sort(pool)
@ -143,7 +148,7 @@ func (mp *Pool) TryAdd(hash util.Uint256, pItem *Item) error {
mp.RemoveOverCapacity() mp.RemoveOverCapacity()
} }
mp.lock.RLock() mp.lock.RLock()
_, ok := mp.unsortedTxn[hash] _, ok := mp.unsortedTxn[t.Hash()]
updateMempoolMetrics(len(mp.unsortedTxn), len(mp.unverifiedTxn)) updateMempoolMetrics(len(mp.unsortedTxn), len(mp.unverifiedTxn))
mp.lock.RUnlock() mp.lock.RUnlock()
if !ok { if !ok {
@ -156,11 +161,11 @@ func (mp *Pool) TryAdd(hash util.Uint256, pItem *Item) error {
// nothing if it doesn't). // nothing if it doesn't).
func (mp *Pool) Remove(hash util.Uint256) { func (mp *Pool) Remove(hash util.Uint256) {
var mapAndPools = []struct { var mapAndPools = []struct {
unsortedMap map[util.Uint256]*Item unsortedMap map[util.Uint256]*item
sortedPools []*Items sortedPools []*items
}{ }{
{unsortedMap: mp.unsortedTxn, sortedPools: []*Items{&mp.sortedHighPrioTxn, &mp.sortedLowPrioTxn}}, {unsortedMap: mp.unsortedTxn, sortedPools: []*items{&mp.sortedHighPrioTxn, &mp.sortedLowPrioTxn}},
{unsortedMap: mp.unverifiedTxn, sortedPools: []*Items{&mp.unverifiedSortedHighPrioTxn, &mp.unverifiedSortedLowPrioTxn}}, {unsortedMap: mp.unverifiedTxn, sortedPools: []*items{&mp.unverifiedSortedHighPrioTxn, &mp.unverifiedSortedLowPrioTxn}},
} }
mp.lock.Lock() mp.lock.Lock()
for _, mapAndPool := range mapAndPools { for _, mapAndPool := range mapAndPools {
@ -168,7 +173,7 @@ func (mp *Pool) Remove(hash util.Uint256) {
delete(mapAndPool.unsortedMap, hash) delete(mapAndPool.unsortedMap, hash)
for _, pool := range mapAndPool.sortedPools { for _, pool := range mapAndPool.sortedPools {
var num int var num int
var item *Item var item *item
for num, item = range *pool { for num, item = range *pool {
if hash.Equals(item.txn.Hash()) { if hash.Equals(item.txn.Hash()) {
break break
@ -224,21 +229,12 @@ func (mp *Pool) RemoveOverCapacity() {
} }
// NewPoolItem returns a new Item.
func NewPoolItem(t *transaction.Transaction, fee Feer) *Item {
return &Item{
txn: t,
timeStamp: time.Now().UTC(),
fee: fee,
}
}
// NewMemPool returns a new Pool struct. // NewMemPool returns a new Pool struct.
func NewMemPool(capacity int) Pool { func NewMemPool(capacity int) Pool {
return Pool{ return Pool{
lock: new(sync.RWMutex), lock: new(sync.RWMutex),
unsortedTxn: make(map[util.Uint256]*Item), unsortedTxn: make(map[util.Uint256]*item),
unverifiedTxn: make(map[util.Uint256]*Item), unverifiedTxn: make(map[util.Uint256]*item),
capacity: capacity, capacity: capacity,
} }
} }
@ -258,13 +254,13 @@ func (mp *Pool) TryGetValue(hash util.Uint256) (*transaction.Transaction, bool)
return nil, false return nil, false
} }
// getLowestFeeTransaction returns the Item with the lowest fee amongst the "verifiedTxnSorted" // getLowestFeeTransaction returns the item with the lowest fee amongst the "verifiedTxnSorted"
// and "unverifiedTxnSorted" Items along with a integer. The integer can assume two values, 1 and 2 which indicate // and "unverifiedTxnSorted" items along with a integer. The integer can assume two values, 1 and 2 which indicate
// that the Item with the lowest fee was found in "verifiedTxnSorted" respectively in "unverifiedTxnSorted". // that the item with the lowest fee was found in "verifiedTxnSorted" respectively in "unverifiedTxnSorted".
// "verifiedTxnSorted" and "unverifiedTxnSorted" are sorted slice order by transaction fee ascending. This means that // "verifiedTxnSorted" and "unverifiedTxnSorted" are sorted slice order by transaction fee ascending. This means that
// the transaction with lowest fee start at index 0. // the transaction with lowest fee start at index 0.
// Reference: GetLowestFeeTransaction method in C# (https://github.com/neo-project/neo/blob/master/neo/Ledger/MemoryPool.cs) // Reference: GetLowestFeeTransaction method in C# (https://github.com/neo-project/neo/blob/master/neo/Ledger/MemoryPool.cs)
func getLowestFeeTransaction(verifiedTxnSorted Items, unverifiedTxnSorted Items) (*Item, int) { func getLowestFeeTransaction(verifiedTxnSorted items, unverifiedTxnSorted items) (*item, int) {
minItem := min(unverifiedTxnSorted) minItem := min(unverifiedTxnSorted)
verifiedMin := min(verifiedTxnSorted) verifiedMin := min(verifiedTxnSorted)
if verifiedMin == nil || (minItem != nil && verifiedMin.CompareTo(minItem) >= 0) { if verifiedMin == nil || (minItem != nil && verifiedMin.CompareTo(minItem) >= 0) {
@ -278,7 +274,7 @@ func getLowestFeeTransaction(verifiedTxnSorted Items, unverifiedTxnSorted Items)
// min returns the minimum item in a ascending sorted slice of pool items. // min returns the minimum item in a ascending sorted slice of pool items.
// The function can't be applied to unsorted slice! // The function can't be applied to unsorted slice!
func min(sortedPool Items) *Item { func min(sortedPool items) *item {
if len(sortedPool) == 0 { if len(sortedPool) == 0 {
return nil return nil
} }

View file

@ -36,12 +36,11 @@ func (fs *FeerStub) SystemFee(*transaction.Transaction) util.Fixed8 {
func testMemPoolAddRemoveWithFeer(t *testing.T, fs Feer) { func testMemPoolAddRemoveWithFeer(t *testing.T, fs Feer) {
mp := NewMemPool(10) mp := NewMemPool(10)
tx := newMinerTX() tx := newMinerTX()
item := NewPoolItem(tx, fs)
_, ok := mp.TryGetValue(tx.Hash()) _, ok := mp.TryGetValue(tx.Hash())
require.Equal(t, false, ok) require.Equal(t, false, ok)
require.NoError(t, mp.TryAdd(tx.Hash(), item)) require.NoError(t, mp.Add(tx, fs))
// Re-adding should fail. // Re-adding should fail.
require.Error(t, mp.TryAdd(tx.Hash(), item)) require.Error(t, mp.Add(tx, fs))
tx2, ok := mp.TryGetValue(tx.Hash()) tx2, ok := mp.TryGetValue(tx.Hash())
require.Equal(t, true, ok) require.Equal(t, true, ok)
require.Equal(t, tx, tx2) require.Equal(t, tx, tx2)
@ -70,15 +69,13 @@ func TestMemPoolVerify(t *testing.T) {
inhash1 := random.Uint256() inhash1 := random.Uint256()
tx.Inputs = append(tx.Inputs, transaction.Input{PrevHash: inhash1, PrevIndex: 0}) tx.Inputs = append(tx.Inputs, transaction.Input{PrevHash: inhash1, PrevIndex: 0})
require.Equal(t, true, mp.Verify(tx)) require.Equal(t, true, mp.Verify(tx))
item := NewPoolItem(tx, &FeerStub{}) require.NoError(t, mp.Add(tx, &FeerStub{}))
require.NoError(t, mp.TryAdd(tx.Hash(), item))
tx2 := newMinerTX() tx2 := newMinerTX()
inhash2 := random.Uint256() inhash2 := random.Uint256()
tx2.Inputs = append(tx2.Inputs, transaction.Input{PrevHash: inhash2, PrevIndex: 0}) tx2.Inputs = append(tx2.Inputs, transaction.Input{PrevHash: inhash2, PrevIndex: 0})
require.Equal(t, true, mp.Verify(tx2)) require.Equal(t, true, mp.Verify(tx2))
item = NewPoolItem(tx2, &FeerStub{}) require.NoError(t, mp.Add(tx2, &FeerStub{}))
require.NoError(t, mp.TryAdd(tx2.Hash(), item))
tx3 := newMinerTX() tx3 := newMinerTX()
// Different index number, but the same PrevHash as in tx1. // Different index number, but the same PrevHash as in tx1.