From 794027a90b0d8aa7afcf495a4bd27e44e599be4d Mon Sep 17 00:00:00 2001 From: Roman Khimov Date: Wed, 5 Feb 2020 17:13:35 +0300 Subject: [PATCH] mempool: use one slice for both priorities It doesn't harm as we have transactions naturally ordered by fee anyway and it makes managing them a little easier. This also makes slices store item itself instead of pointers to it which reduces the pressure on the memory subsystem. --- pkg/core/mempool/mem_pool.go | 151 ++++++++++++++---------------- pkg/core/mempool/mem_pool_test.go | 10 +- 2 files changed, 74 insertions(+), 87 deletions(-) diff --git a/pkg/core/mempool/mem_pool.go b/pkg/core/mempool/mem_pool.go index 3be8e47f9..326ce495f 100644 --- a/pkg/core/mempool/mem_pool.go +++ b/pkg/core/mempool/mem_pool.go @@ -37,13 +37,11 @@ type items []*item // Pool stores the unconfirms transactions. type Pool struct { - lock sync.RWMutex - unsortedTxn map[util.Uint256]*item - unverifiedTxn map[util.Uint256]*item - sortedHighPrioTxn items - sortedLowPrioTxn items - unverifiedSortedHighPrioTxn items - unverifiedSortedLowPrioTxn items + lock sync.RWMutex + verifiedMap map[util.Uint256]*item + unverifiedMap map[util.Uint256]*item + verifiedTxes items + unverifiedTxes items capacity int } @@ -56,11 +54,19 @@ func (p items) Less(i, j int) bool { return p[i].CompareTo(p[j]) < 0 } // difference < 0 implies p < otherP. // difference = 0 implies p = otherP. // difference > 0 implies p > otherP. -func (p item) CompareTo(otherP *item) int { +func (p *item) CompareTo(otherP *item) int { if otherP == nil { return 1 } + if !p.isLowPrio && otherP.isLowPrio { + return 1 + } + + if p.isLowPrio && !otherP.isLowPrio { + return -1 + } + if p.isLowPrio && otherP.isLowPrio { thisIsClaimTx := p.txn.Type == transaction.ClaimType otherIsClaimTx := otherP.txn.Type == transaction.ClaimType @@ -93,7 +99,7 @@ func (mp *Pool) Count() int { mp.lock.RLock() defer mp.lock.RUnlock() - return len(mp.unsortedTxn) + len(mp.unverifiedTxn) + return len(mp.verifiedTxes) + len(mp.unverifiedTxes) } // ContainsKey checks if a transactions hash is in the Pool. @@ -101,11 +107,16 @@ func (mp *Pool) ContainsKey(hash util.Uint256) bool { mp.lock.RLock() defer mp.lock.RUnlock() - if _, ok := mp.unsortedTxn[hash]; ok { + return mp.containsKey(hash) +} + +// containsKey is an internal unlocked version of ContainsKey. +func (mp *Pool) containsKey(hash util.Uint256) bool { + if _, ok := mp.verifiedMap[hash]; ok { return true } - if _, ok := mp.unverifiedTxn[hash]; ok { + if _, ok := mp.unverifiedMap[hash]; ok { return true } @@ -114,7 +125,6 @@ func (mp *Pool) ContainsKey(hash util.Uint256) bool { // Add tries to add given transaction to the Pool. func (mp *Pool) Add(t *transaction.Transaction, fee Feer) error { - var pool *items var pItem = &item{ txn: t, timeStamp: time.Now().UTC(), @@ -122,34 +132,27 @@ func (mp *Pool) Add(t *transaction.Transaction, fee Feer) error { netFee: fee.NetworkFee(t), isLowPrio: fee.IsLowPriority(t), } - - if pItem.isLowPrio { - pool = &mp.sortedLowPrioTxn - } else { - pool = &mp.sortedHighPrioTxn - } - mp.lock.Lock() - if !mp.verifyInputs(pItem.txn) { + if !mp.verifyInputs(t) { mp.lock.Unlock() return ErrConflict } - if _, ok := mp.unsortedTxn[t.Hash()]; ok { + if mp.containsKey(t.Hash()) { mp.lock.Unlock() return ErrDup } - mp.unsortedTxn[t.Hash()] = pItem - *pool = append(*pool, pItem) - sort.Sort(pool) + mp.verifiedMap[t.Hash()] = pItem + mp.verifiedTxes = append(mp.verifiedTxes, pItem) + sort.Sort(mp.verifiedTxes) mp.lock.Unlock() if mp.Count() > mp.capacity { mp.RemoveOverCapacity() } mp.lock.RLock() - _, ok := mp.unsortedTxn[t.Hash()] - updateMempoolMetrics(len(mp.unsortedTxn), len(mp.unverifiedTxn)) + _, ok := mp.verifiedMap[t.Hash()] + updateMempoolMetrics(len(mp.verifiedTxes), len(mp.unverifiedTxes)) mp.lock.RUnlock() if !ok { return ErrOOM @@ -161,33 +164,30 @@ func (mp *Pool) Add(t *transaction.Transaction, fee Feer) error { // nothing if it doesn't). func (mp *Pool) Remove(hash util.Uint256) { var mapAndPools = []struct { - unsortedMap map[util.Uint256]*item - sortedPools []*items + txMap map[util.Uint256]*item + txPool *items }{ - {unsortedMap: mp.unsortedTxn, sortedPools: []*items{&mp.sortedHighPrioTxn, &mp.sortedLowPrioTxn}}, - {unsortedMap: mp.unverifiedTxn, sortedPools: []*items{&mp.unverifiedSortedHighPrioTxn, &mp.unverifiedSortedLowPrioTxn}}, + {txMap: mp.verifiedMap, txPool: &mp.verifiedTxes}, + {txMap: mp.unverifiedMap, txPool: &mp.unverifiedTxes}, } mp.lock.Lock() for _, mapAndPool := range mapAndPools { - if _, ok := mapAndPool.unsortedMap[hash]; ok { - delete(mapAndPool.unsortedMap, hash) - for _, pool := range mapAndPool.sortedPools { - var num int - var item *item - for num, item = range *pool { - if hash.Equals(item.txn.Hash()) { - break - } - } - if num < len(*pool)-1 { - *pool = append((*pool)[:num], (*pool)[num+1:]...) - } else if num == len(*pool)-1 { - *pool = (*pool)[:num] + if _, ok := mapAndPool.txMap[hash]; ok { + var num int + delete(mapAndPool.txMap, hash) + for num := range *mapAndPool.txPool { + if hash.Equals((*mapAndPool.txPool)[num].txn.Hash()) { + break } } + if num < len(*mapAndPool.txPool)-1 { + *mapAndPool.txPool = append((*mapAndPool.txPool)[:num], (*mapAndPool.txPool)[num+1:]...) + } else if num == len(*mapAndPool.txPool)-1 { + *mapAndPool.txPool = (*mapAndPool.txPool)[:num] + } } } - updateMempoolMetrics(len(mp.unsortedTxn), len(mp.unverifiedTxn)) + updateMempoolMetrics(len(mp.verifiedTxes), len(mp.unverifiedTxes)) mp.lock.Unlock() } @@ -196,34 +196,20 @@ func (mp *Pool) Remove(hash util.Uint256) { func (mp *Pool) RemoveOverCapacity() { for mp.Count()-mp.capacity > 0 { mp.lock.Lock() - if minItem, argPosition := getLowestFeeTransaction(mp.sortedLowPrioTxn, mp.unverifiedSortedLowPrioTxn); minItem != nil { - if argPosition == 1 { - // minItem belongs to the mp.sortedLowPrioTxn slice. - // The corresponding unsorted pool is is mp.unsortedTxn. - delete(mp.unsortedTxn, minItem.txn.Hash()) - mp.sortedLowPrioTxn = append(mp.sortedLowPrioTxn[:0], mp.sortedLowPrioTxn[1:]...) - } else { - // minItem belongs to the mp.unverifiedSortedLowPrioTxn slice. - // The corresponding unsorted pool is is mp.unverifiedTxn. - delete(mp.unverifiedTxn, minItem.txn.Hash()) - mp.unverifiedSortedLowPrioTxn = append(mp.unverifiedSortedLowPrioTxn[:0], mp.unverifiedSortedLowPrioTxn[1:]...) + minItem, argPosition := getLowestFeeTransaction(mp.verifiedTxes, mp.unverifiedTxes) + if argPosition == 1 { + // minItem belongs to the mp.sortedLowPrioTxn slice. + // The corresponding unsorted pool is is mp.unsortedTxn. + delete(mp.verifiedMap, minItem.txn.Hash()) + mp.verifiedTxes = append(mp.verifiedTxes[:0], mp.verifiedTxes[1:]...) + } else { + // minItem belongs to the mp.unverifiedSortedLowPrioTxn slice. + // The corresponding unsorted pool is is mp.unverifiedTxn. + delete(mp.unverifiedMap, minItem.txn.Hash()) + mp.unverifiedTxes = append(mp.unverifiedTxes[:0], mp.unverifiedTxes[1:]...) - } - } else if minItem, argPosition := getLowestFeeTransaction(mp.sortedHighPrioTxn, mp.unverifiedSortedHighPrioTxn); minItem != nil { - if argPosition == 1 { - // minItem belongs to the mp.sortedHighPrioTxn slice. - // The corresponding unsorted pool is is mp.unsortedTxn. - delete(mp.unsortedTxn, minItem.txn.Hash()) - mp.sortedHighPrioTxn = append(mp.sortedHighPrioTxn[:0], mp.sortedHighPrioTxn[1:]...) - } else { - // minItem belongs to the mp.unverifiedSortedHighPrioTxn slice. - // The corresponding unsorted pool is is mp.unverifiedTxn. - delete(mp.unverifiedTxn, minItem.txn.Hash()) - mp.unverifiedSortedHighPrioTxn = append(mp.unverifiedSortedHighPrioTxn[:0], mp.unverifiedSortedHighPrioTxn[1:]...) - - } } - updateMempoolMetrics(len(mp.unsortedTxn), len(mp.unverifiedTxn)) + updateMempoolMetrics(len(mp.verifiedTxes), len(mp.unverifiedTxes)) mp.lock.Unlock() } @@ -232,9 +218,11 @@ func (mp *Pool) RemoveOverCapacity() { // NewMemPool returns a new Pool struct. func NewMemPool(capacity int) Pool { return Pool{ - unsortedTxn: make(map[util.Uint256]*item), - unverifiedTxn: make(map[util.Uint256]*item), - capacity: capacity, + verifiedMap: make(map[util.Uint256]*item), + unverifiedMap: make(map[util.Uint256]*item), + verifiedTxes: make([]*item, 0, capacity), + unverifiedTxes: make([]*item, 0, capacity), + capacity: capacity, } } @@ -242,11 +230,11 @@ func NewMemPool(capacity int) Pool { func (mp *Pool) TryGetValue(hash util.Uint256) (*transaction.Transaction, bool) { mp.lock.RLock() defer mp.lock.RUnlock() - if pItem, ok := mp.unsortedTxn[hash]; ok { + if pItem, ok := mp.verifiedMap[hash]; ok { return pItem.txn, ok } - if pItem, ok := mp.unverifiedTxn[hash]; ok { + if pItem, ok := mp.unverifiedMap[hash]; ok { return pItem.txn, ok } @@ -286,10 +274,10 @@ func (mp *Pool) GetVerifiedTransactions() []*transaction.Transaction { mp.lock.RLock() defer mp.lock.RUnlock() - var t = make([]*transaction.Transaction, len(mp.unsortedTxn)) + var t = make([]*transaction.Transaction, len(mp.verifiedMap)) var i int - for _, p := range mp.unsortedTxn { + for _, p := range mp.verifiedMap { t[i] = p.txn i++ } @@ -302,10 +290,11 @@ func (mp *Pool) verifyInputs(tx *transaction.Transaction) bool { if len(tx.Inputs) == 0 { return true } - for _, item := range mp.unsortedTxn { - for i := range item.txn.Inputs { + for num := range mp.verifiedTxes { + txn := mp.verifiedTxes[num].txn + for i := range txn.Inputs { for j := 0; j < len(tx.Inputs); j++ { - if item.txn.Inputs[i] == tx.Inputs[j] { + if txn.Inputs[i] == tx.Inputs[j] { return false } } diff --git a/pkg/core/mempool/mem_pool_test.go b/pkg/core/mempool/mem_pool_test.go index 546febe75..d8bbb416a 100644 --- a/pkg/core/mempool/mem_pool_test.go +++ b/pkg/core/mempool/mem_pool_test.go @@ -48,12 +48,10 @@ func testMemPoolAddRemoveWithFeer(t *testing.T, fs Feer) { _, ok = mp.TryGetValue(tx.Hash()) require.Equal(t, false, ok) // Make sure nothing left in the mempool after removal. - assert.Equal(t, 0, len(mp.unsortedTxn)) - assert.Equal(t, 0, len(mp.unverifiedTxn)) - assert.Equal(t, 0, len(mp.sortedHighPrioTxn)) - assert.Equal(t, 0, len(mp.sortedLowPrioTxn)) - assert.Equal(t, 0, len(mp.unverifiedSortedHighPrioTxn)) - assert.Equal(t, 0, len(mp.unverifiedSortedLowPrioTxn)) + assert.Equal(t, 0, len(mp.verifiedMap)) + assert.Equal(t, 0, len(mp.unverifiedMap)) + assert.Equal(t, 0, len(mp.verifiedTxes)) + assert.Equal(t, 0, len(mp.unverifiedTxes)) } func TestMemPoolAddRemove(t *testing.T) {