mempool: use one slice for both priorities

It doesn't harm as we have transactions naturally ordered by fee anyway and it
makes managing them a little easier. This also makes slices store item itself
instead of pointers to it which reduces the pressure on the memory subsystem.
This commit is contained in:
Roman Khimov 2020-02-05 17:13:35 +03:00
parent 325bea3fa9
commit 794027a90b
2 changed files with 74 additions and 87 deletions

View file

@ -38,12 +38,10 @@ type items []*item
// Pool stores the unconfirms transactions. // Pool stores the unconfirms transactions.
type Pool struct { type Pool struct {
lock sync.RWMutex lock sync.RWMutex
unsortedTxn map[util.Uint256]*item verifiedMap map[util.Uint256]*item
unverifiedTxn map[util.Uint256]*item unverifiedMap map[util.Uint256]*item
sortedHighPrioTxn items verifiedTxes items
sortedLowPrioTxn items unverifiedTxes items
unverifiedSortedHighPrioTxn items
unverifiedSortedLowPrioTxn items
capacity int capacity int
} }
@ -56,11 +54,19 @@ func (p items) Less(i, j int) bool { return p[i].CompareTo(p[j]) < 0 }
// difference < 0 implies p < otherP. // difference < 0 implies p < otherP.
// difference = 0 implies p = otherP. // difference = 0 implies p = otherP.
// difference > 0 implies p > otherP. // difference > 0 implies p > otherP.
func (p item) CompareTo(otherP *item) int { func (p *item) CompareTo(otherP *item) int {
if otherP == nil { if otherP == nil {
return 1 return 1
} }
if !p.isLowPrio && otherP.isLowPrio {
return 1
}
if p.isLowPrio && !otherP.isLowPrio {
return -1
}
if p.isLowPrio && otherP.isLowPrio { if p.isLowPrio && otherP.isLowPrio {
thisIsClaimTx := p.txn.Type == transaction.ClaimType thisIsClaimTx := p.txn.Type == transaction.ClaimType
otherIsClaimTx := otherP.txn.Type == transaction.ClaimType otherIsClaimTx := otherP.txn.Type == transaction.ClaimType
@ -93,7 +99,7 @@ func (mp *Pool) Count() int {
mp.lock.RLock() mp.lock.RLock()
defer mp.lock.RUnlock() defer mp.lock.RUnlock()
return len(mp.unsortedTxn) + len(mp.unverifiedTxn) return len(mp.verifiedTxes) + len(mp.unverifiedTxes)
} }
// ContainsKey checks if a transactions hash is in the Pool. // ContainsKey checks if a transactions hash is in the Pool.
@ -101,11 +107,16 @@ func (mp *Pool) ContainsKey(hash util.Uint256) bool {
mp.lock.RLock() mp.lock.RLock()
defer mp.lock.RUnlock() defer mp.lock.RUnlock()
if _, ok := mp.unsortedTxn[hash]; ok { return mp.containsKey(hash)
}
// containsKey is an internal unlocked version of ContainsKey.
func (mp *Pool) containsKey(hash util.Uint256) bool {
if _, ok := mp.verifiedMap[hash]; ok {
return true return true
} }
if _, ok := mp.unverifiedTxn[hash]; ok { if _, ok := mp.unverifiedMap[hash]; ok {
return true return true
} }
@ -114,7 +125,6 @@ func (mp *Pool) ContainsKey(hash util.Uint256) bool {
// Add tries to add given transaction to the Pool. // Add tries to add given transaction to the Pool.
func (mp *Pool) Add(t *transaction.Transaction, fee Feer) error { func (mp *Pool) Add(t *transaction.Transaction, fee Feer) error {
var pool *items
var pItem = &item{ var pItem = &item{
txn: t, txn: t,
timeStamp: time.Now().UTC(), timeStamp: time.Now().UTC(),
@ -122,34 +132,27 @@ func (mp *Pool) Add(t *transaction.Transaction, fee Feer) error {
netFee: fee.NetworkFee(t), netFee: fee.NetworkFee(t),
isLowPrio: fee.IsLowPriority(t), isLowPrio: fee.IsLowPriority(t),
} }
if pItem.isLowPrio {
pool = &mp.sortedLowPrioTxn
} else {
pool = &mp.sortedHighPrioTxn
}
mp.lock.Lock() mp.lock.Lock()
if !mp.verifyInputs(pItem.txn) { if !mp.verifyInputs(t) {
mp.lock.Unlock() mp.lock.Unlock()
return ErrConflict return ErrConflict
} }
if _, ok := mp.unsortedTxn[t.Hash()]; ok { if mp.containsKey(t.Hash()) {
mp.lock.Unlock() mp.lock.Unlock()
return ErrDup return ErrDup
} }
mp.unsortedTxn[t.Hash()] = pItem
*pool = append(*pool, pItem) mp.verifiedMap[t.Hash()] = pItem
sort.Sort(pool) mp.verifiedTxes = append(mp.verifiedTxes, pItem)
sort.Sort(mp.verifiedTxes)
mp.lock.Unlock() mp.lock.Unlock()
if mp.Count() > mp.capacity { if mp.Count() > mp.capacity {
mp.RemoveOverCapacity() mp.RemoveOverCapacity()
} }
mp.lock.RLock() mp.lock.RLock()
_, ok := mp.unsortedTxn[t.Hash()] _, ok := mp.verifiedMap[t.Hash()]
updateMempoolMetrics(len(mp.unsortedTxn), len(mp.unverifiedTxn)) updateMempoolMetrics(len(mp.verifiedTxes), len(mp.unverifiedTxes))
mp.lock.RUnlock() mp.lock.RUnlock()
if !ok { if !ok {
return ErrOOM return ErrOOM
@ -161,33 +164,30 @@ func (mp *Pool) Add(t *transaction.Transaction, fee Feer) error {
// nothing if it doesn't). // nothing if it doesn't).
func (mp *Pool) Remove(hash util.Uint256) { func (mp *Pool) Remove(hash util.Uint256) {
var mapAndPools = []struct { var mapAndPools = []struct {
unsortedMap map[util.Uint256]*item txMap map[util.Uint256]*item
sortedPools []*items txPool *items
}{ }{
{unsortedMap: mp.unsortedTxn, sortedPools: []*items{&mp.sortedHighPrioTxn, &mp.sortedLowPrioTxn}}, {txMap: mp.verifiedMap, txPool: &mp.verifiedTxes},
{unsortedMap: mp.unverifiedTxn, sortedPools: []*items{&mp.unverifiedSortedHighPrioTxn, &mp.unverifiedSortedLowPrioTxn}}, {txMap: mp.unverifiedMap, txPool: &mp.unverifiedTxes},
} }
mp.lock.Lock() mp.lock.Lock()
for _, mapAndPool := range mapAndPools { for _, mapAndPool := range mapAndPools {
if _, ok := mapAndPool.unsortedMap[hash]; ok { if _, ok := mapAndPool.txMap[hash]; ok {
delete(mapAndPool.unsortedMap, hash)
for _, pool := range mapAndPool.sortedPools {
var num int var num int
var item *item delete(mapAndPool.txMap, hash)
for num, item = range *pool { for num := range *mapAndPool.txPool {
if hash.Equals(item.txn.Hash()) { if hash.Equals((*mapAndPool.txPool)[num].txn.Hash()) {
break break
} }
} }
if num < len(*pool)-1 { if num < len(*mapAndPool.txPool)-1 {
*pool = append((*pool)[:num], (*pool)[num+1:]...) *mapAndPool.txPool = append((*mapAndPool.txPool)[:num], (*mapAndPool.txPool)[num+1:]...)
} else if num == len(*pool)-1 { } else if num == len(*mapAndPool.txPool)-1 {
*pool = (*pool)[:num] *mapAndPool.txPool = (*mapAndPool.txPool)[:num]
} }
} }
} }
} updateMempoolMetrics(len(mp.verifiedTxes), len(mp.unverifiedTxes))
updateMempoolMetrics(len(mp.unsortedTxn), len(mp.unverifiedTxn))
mp.lock.Unlock() mp.lock.Unlock()
} }
@ -196,34 +196,20 @@ func (mp *Pool) Remove(hash util.Uint256) {
func (mp *Pool) RemoveOverCapacity() { func (mp *Pool) RemoveOverCapacity() {
for mp.Count()-mp.capacity > 0 { for mp.Count()-mp.capacity > 0 {
mp.lock.Lock() mp.lock.Lock()
if minItem, argPosition := getLowestFeeTransaction(mp.sortedLowPrioTxn, mp.unverifiedSortedLowPrioTxn); minItem != nil { minItem, argPosition := getLowestFeeTransaction(mp.verifiedTxes, mp.unverifiedTxes)
if argPosition == 1 { if argPosition == 1 {
// minItem belongs to the mp.sortedLowPrioTxn slice. // minItem belongs to the mp.sortedLowPrioTxn slice.
// The corresponding unsorted pool is is mp.unsortedTxn. // The corresponding unsorted pool is is mp.unsortedTxn.
delete(mp.unsortedTxn, minItem.txn.Hash()) delete(mp.verifiedMap, minItem.txn.Hash())
mp.sortedLowPrioTxn = append(mp.sortedLowPrioTxn[:0], mp.sortedLowPrioTxn[1:]...) mp.verifiedTxes = append(mp.verifiedTxes[:0], mp.verifiedTxes[1:]...)
} else { } else {
// minItem belongs to the mp.unverifiedSortedLowPrioTxn slice. // minItem belongs to the mp.unverifiedSortedLowPrioTxn slice.
// The corresponding unsorted pool is is mp.unverifiedTxn. // The corresponding unsorted pool is is mp.unverifiedTxn.
delete(mp.unverifiedTxn, minItem.txn.Hash()) delete(mp.unverifiedMap, minItem.txn.Hash())
mp.unverifiedSortedLowPrioTxn = append(mp.unverifiedSortedLowPrioTxn[:0], mp.unverifiedSortedLowPrioTxn[1:]...) mp.unverifiedTxes = append(mp.unverifiedTxes[:0], mp.unverifiedTxes[1:]...)
} }
} else if minItem, argPosition := getLowestFeeTransaction(mp.sortedHighPrioTxn, mp.unverifiedSortedHighPrioTxn); minItem != nil { updateMempoolMetrics(len(mp.verifiedTxes), len(mp.unverifiedTxes))
if argPosition == 1 {
// minItem belongs to the mp.sortedHighPrioTxn slice.
// The corresponding unsorted pool is is mp.unsortedTxn.
delete(mp.unsortedTxn, minItem.txn.Hash())
mp.sortedHighPrioTxn = append(mp.sortedHighPrioTxn[:0], mp.sortedHighPrioTxn[1:]...)
} else {
// minItem belongs to the mp.unverifiedSortedHighPrioTxn slice.
// The corresponding unsorted pool is is mp.unverifiedTxn.
delete(mp.unverifiedTxn, minItem.txn.Hash())
mp.unverifiedSortedHighPrioTxn = append(mp.unverifiedSortedHighPrioTxn[:0], mp.unverifiedSortedHighPrioTxn[1:]...)
}
}
updateMempoolMetrics(len(mp.unsortedTxn), len(mp.unverifiedTxn))
mp.lock.Unlock() mp.lock.Unlock()
} }
@ -232,8 +218,10 @@ func (mp *Pool) RemoveOverCapacity() {
// NewMemPool returns a new Pool struct. // NewMemPool returns a new Pool struct.
func NewMemPool(capacity int) Pool { func NewMemPool(capacity int) Pool {
return Pool{ return Pool{
unsortedTxn: make(map[util.Uint256]*item), verifiedMap: make(map[util.Uint256]*item),
unverifiedTxn: make(map[util.Uint256]*item), unverifiedMap: make(map[util.Uint256]*item),
verifiedTxes: make([]*item, 0, capacity),
unverifiedTxes: make([]*item, 0, capacity),
capacity: capacity, capacity: capacity,
} }
} }
@ -242,11 +230,11 @@ func NewMemPool(capacity int) Pool {
func (mp *Pool) TryGetValue(hash util.Uint256) (*transaction.Transaction, bool) { func (mp *Pool) TryGetValue(hash util.Uint256) (*transaction.Transaction, bool) {
mp.lock.RLock() mp.lock.RLock()
defer mp.lock.RUnlock() defer mp.lock.RUnlock()
if pItem, ok := mp.unsortedTxn[hash]; ok { if pItem, ok := mp.verifiedMap[hash]; ok {
return pItem.txn, ok return pItem.txn, ok
} }
if pItem, ok := mp.unverifiedTxn[hash]; ok { if pItem, ok := mp.unverifiedMap[hash]; ok {
return pItem.txn, ok return pItem.txn, ok
} }
@ -286,10 +274,10 @@ func (mp *Pool) GetVerifiedTransactions() []*transaction.Transaction {
mp.lock.RLock() mp.lock.RLock()
defer mp.lock.RUnlock() defer mp.lock.RUnlock()
var t = make([]*transaction.Transaction, len(mp.unsortedTxn)) var t = make([]*transaction.Transaction, len(mp.verifiedMap))
var i int var i int
for _, p := range mp.unsortedTxn { for _, p := range mp.verifiedMap {
t[i] = p.txn t[i] = p.txn
i++ i++
} }
@ -302,10 +290,11 @@ func (mp *Pool) verifyInputs(tx *transaction.Transaction) bool {
if len(tx.Inputs) == 0 { if len(tx.Inputs) == 0 {
return true return true
} }
for _, item := range mp.unsortedTxn { for num := range mp.verifiedTxes {
for i := range item.txn.Inputs { txn := mp.verifiedTxes[num].txn
for i := range txn.Inputs {
for j := 0; j < len(tx.Inputs); j++ { for j := 0; j < len(tx.Inputs); j++ {
if item.txn.Inputs[i] == tx.Inputs[j] { if txn.Inputs[i] == tx.Inputs[j] {
return false return false
} }
} }

View file

@ -48,12 +48,10 @@ func testMemPoolAddRemoveWithFeer(t *testing.T, fs Feer) {
_, ok = mp.TryGetValue(tx.Hash()) _, ok = mp.TryGetValue(tx.Hash())
require.Equal(t, false, ok) require.Equal(t, false, ok)
// Make sure nothing left in the mempool after removal. // Make sure nothing left in the mempool after removal.
assert.Equal(t, 0, len(mp.unsortedTxn)) assert.Equal(t, 0, len(mp.verifiedMap))
assert.Equal(t, 0, len(mp.unverifiedTxn)) assert.Equal(t, 0, len(mp.unverifiedMap))
assert.Equal(t, 0, len(mp.sortedHighPrioTxn)) assert.Equal(t, 0, len(mp.verifiedTxes))
assert.Equal(t, 0, len(mp.sortedLowPrioTxn)) assert.Equal(t, 0, len(mp.unverifiedTxes))
assert.Equal(t, 0, len(mp.unverifiedSortedHighPrioTxn))
assert.Equal(t, 0, len(mp.unverifiedSortedLowPrioTxn))
} }
func TestMemPoolAddRemove(t *testing.T) { func TestMemPoolAddRemove(t *testing.T) {