From f0bb886be333d268ec72faca5c0e1fee50782f51 Mon Sep 17 00:00:00 2001 From: Roman Khimov Date: Tue, 4 Feb 2020 17:36:11 +0300 Subject: [PATCH 01/18] mempool: make all methods pointer methods Makes no sense copying the Pool around. --- pkg/core/blockchain.go | 4 ++-- pkg/core/blockchainer.go | 2 +- pkg/core/mempool/mem_pool.go | 12 ++++++------ pkg/network/helper_test.go | 2 +- 4 files changed, 10 insertions(+), 10 deletions(-) diff --git a/pkg/core/blockchain.go b/pkg/core/blockchain.go index 285a0eb98..6c9b1dffa 100644 --- a/pkg/core/blockchain.go +++ b/pkg/core/blockchain.go @@ -963,8 +963,8 @@ func (bc *Blockchain) IsLowPriority(t *transaction.Transaction) bool { } // GetMemPool returns the memory pool of the blockchain. -func (bc *Blockchain) GetMemPool() mempool.Pool { - return bc.memPool +func (bc *Blockchain) GetMemPool() *mempool.Pool { + return &bc.memPool } // VerifyBlock verifies block against its current state. diff --git a/pkg/core/blockchainer.go b/pkg/core/blockchainer.go index 34b6d70cf..e3ef02698 100644 --- a/pkg/core/blockchainer.go +++ b/pkg/core/blockchainer.go @@ -41,5 +41,5 @@ type Blockchainer interface { References(t *transaction.Transaction) map[transaction.Input]*transaction.Output mempool.Feer // fee interface VerifyTx(*transaction.Transaction, *block.Block) error - GetMemPool() mempool.Pool + GetMemPool() *mempool.Pool } diff --git a/pkg/core/mempool/mem_pool.go b/pkg/core/mempool/mem_pool.go index 444f2e36e..d5fc5a9d7 100644 --- a/pkg/core/mempool/mem_pool.go +++ b/pkg/core/mempool/mem_pool.go @@ -77,7 +77,7 @@ func (p Item) CompareTo(otherP *Item) int { } // Count returns the total number of uncofirm transactions. -func (mp Pool) Count() int { +func (mp *Pool) Count() int { mp.lock.RLock() defer mp.lock.RUnlock() @@ -85,7 +85,7 @@ func (mp Pool) Count() int { } // ContainsKey checks if a transactions hash is in the Pool. -func (mp Pool) ContainsKey(hash util.Uint256) bool { +func (mp *Pool) ContainsKey(hash util.Uint256) bool { mp.lock.RLock() defer mp.lock.RUnlock() @@ -101,7 +101,7 @@ func (mp Pool) ContainsKey(hash util.Uint256) bool { } // TryAdd try to add the Item to the Pool. -func (mp Pool) TryAdd(hash util.Uint256, pItem *Item) bool { +func (mp *Pool) TryAdd(hash util.Uint256, pItem *Item) bool { var pool Items mp.lock.Lock() @@ -124,7 +124,7 @@ func (mp Pool) TryAdd(hash util.Uint256, pItem *Item) bool { mp.lock.Unlock() if mp.Count() > mp.capacity { - (&mp).RemoveOverCapacity() + mp.RemoveOverCapacity() } mp.lock.RLock() _, ok := mp.unsortedTxn[hash] @@ -225,7 +225,7 @@ func NewMemPool(capacity int) Pool { } // TryGetValue returns a transaction if it exists in the memory pool. -func (mp Pool) TryGetValue(hash util.Uint256) (*transaction.Transaction, bool) { +func (mp *Pool) TryGetValue(hash util.Uint256) (*transaction.Transaction, bool) { mp.lock.RLock() defer mp.lock.RUnlock() if pItem, ok := mp.unsortedTxn[hash]; ok { @@ -286,7 +286,7 @@ func (mp *Pool) GetVerifiedTransactions() []*transaction.Transaction { // Verify verifies if the inputs of a transaction tx are already used in any other transaction in the memory pool. // If yes, the transaction tx is not a valid transaction and the function return false. // If no, the transaction tx is a valid transaction and the function return true. -func (mp Pool) Verify(tx *transaction.Transaction) bool { +func (mp *Pool) Verify(tx *transaction.Transaction) bool { mp.lock.RLock() defer mp.lock.RUnlock() for _, item := range mp.unsortedTxn { diff --git a/pkg/network/helper_test.go b/pkg/network/helper_test.go index 823d78525..00ea04ac6 100644 --- a/pkg/network/helper_test.go +++ b/pkg/network/helper_test.go @@ -118,7 +118,7 @@ func (chain testChain) GetUnspentCoinState(util.Uint256) *core.UnspentCoinState panic("TODO") } -func (chain testChain) GetMemPool() mempool.Pool { +func (chain testChain) GetMemPool() *mempool.Pool { panic("TODO") } From 70b3839fd090b68b87918a4712e81c3ec0f2a20d Mon Sep 17 00:00:00 2001 From: Roman Khimov Date: Tue, 4 Feb 2020 18:43:21 +0300 Subject: [PATCH 02/18] core/mempool: fix AddBlock and tx pooling concurrency issues Eliminate races between tx checks and adding them to the mempool, ensure the chain doesn't change while we're working with the new tx. Ensure only one block addition attempt could be in progress. --- pkg/consensus/consensus_test.go | 4 +- pkg/core/blockchain.go | 70 ++++++++++++++++++++++++++++--- pkg/core/blockchainer.go | 1 + pkg/core/mempool/mem_pool.go | 44 +++++++++++++++---- pkg/core/mempool/mem_pool_test.go | 8 ++-- pkg/network/helper_test.go | 4 ++ pkg/network/server.go | 18 ++++---- 7 files changed, 120 insertions(+), 29 deletions(-) diff --git a/pkg/consensus/consensus_test.go b/pkg/consensus/consensus_test.go index 2d642203a..654f86ade 100644 --- a/pkg/consensus/consensus_test.go +++ b/pkg/consensus/consensus_test.go @@ -43,7 +43,7 @@ func TestService_GetVerified(t *testing.T) { pool := srv.Chain.GetMemPool() item := mempool.NewPoolItem(txs[3], new(feer)) - require.True(t, pool.TryAdd(txs[3].Hash(), item)) + require.NoError(t, pool.TryAdd(txs[3].Hash(), item)) hashes := []util.Uint256{txs[0].Hash(), txs[1].Hash(), txs[2].Hash()} @@ -69,7 +69,7 @@ func TestService_GetVerified(t *testing.T) { t.Run("more than half of the last proposal will be reused", func(t *testing.T) { for _, tx := range txs[:2] { item := mempool.NewPoolItem(tx, new(feer)) - require.True(t, pool.TryAdd(tx.Hash(), item)) + require.NoError(t, pool.TryAdd(tx.Hash(), item)) } txx := srv.getVerifiedTx(10) diff --git a/pkg/core/blockchain.go b/pkg/core/blockchain.go index 6c9b1dffa..48491edd6 100644 --- a/pkg/core/blockchain.go +++ b/pkg/core/blockchain.go @@ -7,6 +7,7 @@ import ( "math/big" "sort" "strconv" + "sync" "sync/atomic" "time" @@ -41,6 +42,15 @@ const ( defaultMemPoolSize = 50000 ) +var ( + // ErrAlreadyExists is returned when trying to add some already existing + // transaction into the pool (not specifying whether it exists in the + // chain or mempool). + ErrAlreadyExists = errors.New("already exists") + // ErrOOM is returned when adding transaction to the memory pool because + // it reached its full capacity. + ErrOOM = errors.New("no space left in the memory pool") +) var ( genAmount = []int{8, 7, 6, 5, 4, 3, 2, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1} decrementInterval = 2000000 @@ -51,6 +61,19 @@ var ( type Blockchain struct { config config.ProtocolConfiguration + // The only way chain state changes is by adding blocks, so we can't + // allow concurrent block additions. It differs from the next lock in + // that it's only for AddBlock method itself, the chain state is + // protected by the lock below, but holding it during all of AddBlock + // is too expensive (because the state only changes when persisting + // change cache). + addLock sync.Mutex + + // This lock ensures blockchain immutability for operations that need + // that while performing their tasks. It's mostly used as a read lock + // with the only writer being the block addition logic. + lock sync.RWMutex + // Data access object for CRUD operations around storage. dao *dao @@ -251,6 +274,9 @@ func (bc *Blockchain) Close() { // AddBlock accepts successive block for the Blockchain, verifies it and // stores internally. Eventually it will be persisted to the backing storage. func (bc *Blockchain) AddBlock(block *block.Block) error { + bc.addLock.Lock() + defer bc.addLock.Unlock() + expectedHeight := bc.BlockHeight() + 1 if expectedHeight != block.Index { return fmt.Errorf("expected block %d, but passed block %d", expectedHeight, block.Index) @@ -575,6 +601,9 @@ func (bc *Blockchain) storeBlock(block *block.Block) error { } } } + bc.lock.Lock() + defer bc.lock.Unlock() + _, err := cache.Persist() if err != nil { return err @@ -982,11 +1011,8 @@ func (bc *Blockchain) VerifyBlock(block *block.Block) error { return bc.verifyBlockWitnesses(block, prevHeader) } -// VerifyTx verifies whether a transaction is bonafide or not. Block parameter -// is used for easy interop access and can be omitted for transactions that are -// not yet added into any block. -// Golang implementation of Verify method in C# (https://github.com/neo-project/neo/blob/master/neo/Network/P2P/Payloads/Transaction.cs#L270). -func (bc *Blockchain) VerifyTx(t *transaction.Transaction, block *block.Block) error { +// verifyTx verifies whether a transaction is bonafide or not. +func (bc *Blockchain) verifyTx(t *transaction.Transaction, block *block.Block) error { if io.GetVarSize(t) > transaction.MaxTransactionSize { return errors.Errorf("invalid transaction size = %d. It shoud be less then MaxTransactionSize = %d", io.GetVarSize(t), transaction.MaxTransactionSize) } @@ -1017,6 +1043,40 @@ func (bc *Blockchain) VerifyTx(t *transaction.Transaction, block *block.Block) e return bc.verifyTxWitnesses(t, block) } +// VerifyTx verifies whether a transaction is bonafide or not. Block parameter +// is used for easy interop access and can be omitted for transactions that are +// not yet added into any block. +// Golang implementation of Verify method in C# (https://github.com/neo-project/neo/blob/master/neo/Network/P2P/Payloads/Transaction.cs#L270). +func (bc *Blockchain) VerifyTx(t *transaction.Transaction, block *block.Block) error { + bc.lock.RLock() + defer bc.lock.RUnlock() + return bc.verifyTx(t, block) +} + +// PoolTx verifies and tries to add given transaction into the mempool. +func (bc *Blockchain) PoolTx(t *transaction.Transaction) error { + bc.lock.RLock() + defer bc.lock.RUnlock() + + if bc.HasTransaction(t.Hash()) { + return ErrAlreadyExists + } + if err := bc.verifyTx(t, nil); err != nil { + return err + } + if err := bc.memPool.TryAdd(t.Hash(), mempool.NewPoolItem(t, bc)); err != nil { + switch err { + case mempool.ErrOOM: + return ErrOOM + case mempool.ErrConflict: + return ErrAlreadyExists + default: + return err + } + } + return nil +} + func (bc *Blockchain) verifyInputs(t *transaction.Transaction) bool { for i := 1; i < len(t.Inputs); i++ { for j := 0; j < i; j++ { diff --git a/pkg/core/blockchainer.go b/pkg/core/blockchainer.go index e3ef02698..79e6bc08f 100644 --- a/pkg/core/blockchainer.go +++ b/pkg/core/blockchainer.go @@ -40,6 +40,7 @@ type Blockchainer interface { GetUnspentCoinState(util.Uint256) *UnspentCoinState References(t *transaction.Transaction) map[transaction.Input]*transaction.Output mempool.Feer // fee interface + PoolTx(*transaction.Transaction) error VerifyTx(*transaction.Transaction, *block.Block) error GetMemPool() *mempool.Pool } diff --git a/pkg/core/mempool/mem_pool.go b/pkg/core/mempool/mem_pool.go index d5fc5a9d7..07ef64c76 100644 --- a/pkg/core/mempool/mem_pool.go +++ b/pkg/core/mempool/mem_pool.go @@ -1,6 +1,7 @@ package mempool import ( + "errors" "sort" "sync" "time" @@ -9,6 +10,19 @@ import ( "github.com/CityOfZion/neo-go/pkg/util" ) +var ( + // ErrConflict is returned when transaction being added is incompatible + // with the contents of the memory pool (using the same inputs as some + // other transaction in the pool) + ErrConflict = errors.New("conflicts with the memory pool") + // ErrDup is returned when transaction being added is already present + // in the memory pool. + ErrDup = errors.New("already in the memory pool") + // ErrOOM is returned when transaction just doesn't fit in the memory + // pool because of its capacity constraints. + ErrOOM = errors.New("out of memory") +) + // Item represents a transaction in the the Memory pool. type Item struct { txn *transaction.Transaction @@ -101,13 +115,17 @@ func (mp *Pool) ContainsKey(hash util.Uint256) bool { } // TryAdd try to add the Item to the Pool. -func (mp *Pool) TryAdd(hash util.Uint256, pItem *Item) bool { +func (mp *Pool) TryAdd(hash util.Uint256, pItem *Item) error { var pool Items mp.lock.Lock() + if !mp.verifyInputs(pItem.txn) { + mp.lock.Unlock() + return ErrConflict + } if _, ok := mp.unsortedTxn[hash]; ok { mp.lock.Unlock() - return false + return ErrDup } mp.unsortedTxn[hash] = pItem mp.lock.Unlock() @@ -130,7 +148,10 @@ func (mp *Pool) TryAdd(hash util.Uint256, pItem *Item) bool { _, ok := mp.unsortedTxn[hash] updateMempoolMetrics(len(mp.unsortedTxn), len(mp.unverifiedTxn)) mp.lock.RUnlock() - return ok + if !ok { + return ErrOOM + } + return nil } // Remove removes an item from the mempool, if it exists there (and does @@ -283,12 +304,8 @@ func (mp *Pool) GetVerifiedTransactions() []*transaction.Transaction { return t } -// Verify verifies if the inputs of a transaction tx are already used in any other transaction in the memory pool. -// If yes, the transaction tx is not a valid transaction and the function return false. -// If no, the transaction tx is a valid transaction and the function return true. -func (mp *Pool) Verify(tx *transaction.Transaction) bool { - mp.lock.RLock() - defer mp.lock.RUnlock() +// verifyInputs is an internal unprotected version of Verify. +func (mp *Pool) verifyInputs(tx *transaction.Transaction) bool { for _, item := range mp.unsortedTxn { for i := range item.txn.Inputs { for j := 0; j < len(tx.Inputs); j++ { @@ -301,3 +318,12 @@ func (mp *Pool) Verify(tx *transaction.Transaction) bool { return true } + +// Verify verifies if the inputs of a transaction tx are already used in any other transaction in the memory pool. +// If yes, the transaction tx is not a valid transaction and the function return false. +// If no, the transaction tx is a valid transaction and the function return true. +func (mp *Pool) Verify(tx *transaction.Transaction) bool { + mp.lock.RLock() + defer mp.lock.RUnlock() + return mp.verifyInputs(tx) +} diff --git a/pkg/core/mempool/mem_pool_test.go b/pkg/core/mempool/mem_pool_test.go index 6da40043b..db6a5b25c 100644 --- a/pkg/core/mempool/mem_pool_test.go +++ b/pkg/core/mempool/mem_pool_test.go @@ -39,9 +39,9 @@ func testMemPoolAddRemoveWithFeer(t *testing.T, fs Feer) { item := NewPoolItem(tx, fs) _, ok := mp.TryGetValue(tx.Hash()) require.Equal(t, false, ok) - require.Equal(t, true, mp.TryAdd(tx.Hash(), item)) + require.NoError(t, mp.TryAdd(tx.Hash(), item)) // Re-adding should fail. - require.Equal(t, false, mp.TryAdd(tx.Hash(), item)) + require.Error(t, mp.TryAdd(tx.Hash(), item)) tx2, ok := mp.TryGetValue(tx.Hash()) require.Equal(t, true, ok) require.Equal(t, tx, tx2) @@ -71,14 +71,14 @@ func TestMemPoolVerify(t *testing.T) { tx.Inputs = append(tx.Inputs, transaction.Input{PrevHash: inhash1, PrevIndex: 0}) require.Equal(t, true, mp.Verify(tx)) item := NewPoolItem(tx, &FeerStub{}) - require.Equal(t, true, mp.TryAdd(tx.Hash(), item)) + require.NoError(t, mp.TryAdd(tx.Hash(), item)) tx2 := newMinerTX() inhash2 := random.Uint256() tx2.Inputs = append(tx2.Inputs, transaction.Input{PrevHash: inhash2, PrevIndex: 0}) require.Equal(t, true, mp.Verify(tx2)) item = NewPoolItem(tx2, &FeerStub{}) - require.Equal(t, true, mp.TryAdd(tx2.Hash(), item)) + require.NoError(t, mp.TryAdd(tx2.Hash(), item)) tx3 := newMinerTX() // Different index number, but the same PrevHash as in tx1. diff --git a/pkg/network/helper_test.go b/pkg/network/helper_test.go index 00ea04ac6..1ab191d87 100644 --- a/pkg/network/helper_test.go +++ b/pkg/network/helper_test.go @@ -126,6 +126,10 @@ func (chain testChain) IsLowPriority(*transaction.Transaction) bool { panic("TODO") } +func (chain testChain) PoolTx(*transaction.Transaction) error { + panic("TODO") +} + func (chain testChain) VerifyTx(*transaction.Transaction, *block.Block) error { panic("TODO") } diff --git a/pkg/network/server.go b/pkg/network/server.go index c62d7f34c..681932e61 100644 --- a/pkg/network/server.go +++ b/pkg/network/server.go @@ -13,7 +13,6 @@ import ( "github.com/CityOfZion/neo-go/pkg/consensus" "github.com/CityOfZion/neo-go/pkg/core" "github.com/CityOfZion/neo-go/pkg/core/block" - "github.com/CityOfZion/neo-go/pkg/core/mempool" "github.com/CityOfZion/neo-go/pkg/core/transaction" "github.com/CityOfZion/neo-go/pkg/network/payload" "github.com/CityOfZion/neo-go/pkg/util" @@ -743,17 +742,18 @@ func (s *Server) verifyAndPoolTX(t *transaction.Transaction) RelayReason { if t.Type == transaction.MinerType { return RelayInvalid } - if s.chain.HasTransaction(t.Hash()) { - return RelayAlreadyExists - } - if err := s.chain.VerifyTx(t, nil); err != nil { - return RelayInvalid - } // TODO: Implement Plugin.CheckPolicy? //if (!Plugin.CheckPolicy(transaction)) // return RelayResultReason.PolicyFail; - if ok := s.chain.GetMemPool().TryAdd(t.Hash(), mempool.NewPoolItem(t, s.chain)); !ok { - return RelayOutOfMemory + if err := s.chain.PoolTx(t); err != nil { + switch err { + case core.ErrAlreadyExists: + return RelayAlreadyExists + case core.ErrOOM: + return RelayOutOfMemory + default: + return RelayInvalid + } } return RelaySucceed } From f9963cca377d2789bbb09eef5a58e9a564ab4241 Mon Sep 17 00:00:00 2001 From: Roman Khimov Date: Tue, 4 Feb 2020 18:58:40 +0300 Subject: [PATCH 03/18] core: short-circuit verifyInputs when there are no inputs --- pkg/core/mempool/mem_pool.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/pkg/core/mempool/mem_pool.go b/pkg/core/mempool/mem_pool.go index 07ef64c76..0848b69dc 100644 --- a/pkg/core/mempool/mem_pool.go +++ b/pkg/core/mempool/mem_pool.go @@ -306,6 +306,9 @@ func (mp *Pool) GetVerifiedTransactions() []*transaction.Transaction { // verifyInputs is an internal unprotected version of Verify. func (mp *Pool) verifyInputs(tx *transaction.Transaction) bool { + if len(tx.Inputs) == 0 { + return true + } for _, item := range mp.unsortedTxn { for i := range item.txn.Inputs { for j := 0; j < len(tx.Inputs); j++ { From b9b77ac1be93d7a8577bcfe943576d9303200208 Mon Sep 17 00:00:00 2001 From: Roman Khimov Date: Tue, 4 Feb 2020 19:32:29 +0300 Subject: [PATCH 04/18] network: fix block relaying, don't spit out useless errors We can only add one block of the given height and we have two competing goroutines to do that --- consensus and block queue. Whomever adds the block first shouldn't trigger an error in another one. Fix block relaying for blocks added via the block queue also, previously one consensus-generated blocks were broadcasted. --- pkg/consensus/consensus.go | 6 +++++- pkg/network/blockqueue.go | 17 ++++++++++++----- pkg/network/blockqueue_test.go | 2 +- pkg/network/server.go | 8 ++++++-- 4 files changed, 24 insertions(+), 9 deletions(-) diff --git a/pkg/consensus/consensus.go b/pkg/consensus/consensus.go index 96847d904..8b9d686f1 100644 --- a/pkg/consensus/consensus.go +++ b/pkg/consensus/consensus.go @@ -335,7 +335,11 @@ func (s *service) processBlock(b block.Block) { bb.Script = *(s.getBlockWitness(bb)) if err := s.Chain.AddBlock(bb); err != nil { - s.log.Warn("error on add block", zap.Error(err)) + // The block might already be added via the regular network + // interaction. + if _, errget := s.Chain.GetBlock(bb.Hash()); errget != nil { + s.log.Warn("error on add block", zap.Error(err)) + } } else { s.Config.RelayBlock(bb) } diff --git a/pkg/network/blockqueue.go b/pkg/network/blockqueue.go index e26d96629..2911ffced 100644 --- a/pkg/network/blockqueue.go +++ b/pkg/network/blockqueue.go @@ -12,9 +12,10 @@ type blockQueue struct { queue *queue.PriorityQueue checkBlocks chan struct{} chain core.Blockchainer + relayF func(*block.Block) } -func newBlockQueue(capacity int, bc core.Blockchainer, log *zap.Logger) *blockQueue { +func newBlockQueue(capacity int, bc core.Blockchainer, log *zap.Logger, relayer func(*block.Block)) *blockQueue { if log == nil { return nil } @@ -24,6 +25,7 @@ func newBlockQueue(capacity int, bc core.Blockchainer, log *zap.Logger) *blockQu queue: queue.NewPriorityQueue(capacity, false), checkBlocks: make(chan struct{}, 1), chain: bc, + relayF: relayer, } } @@ -45,10 +47,15 @@ func (bq *blockQueue) run() { if minblock.Index == bq.chain.BlockHeight()+1 { err := bq.chain.AddBlock(minblock) if err != nil { - bq.log.Warn("blockQueue: failed adding block into the blockchain", - zap.String("error", err.Error()), - zap.Uint32("blockHeight", bq.chain.BlockHeight()), - zap.Uint32("nextIndex", minblock.Index)) + // The block might already be added by consensus. + if _, errget := bq.chain.GetBlock(minblock.Hash()); errget != nil { + bq.log.Warn("blockQueue: failed adding block into the blockchain", + zap.String("error", err.Error()), + zap.Uint32("blockHeight", bq.chain.BlockHeight()), + zap.Uint32("nextIndex", minblock.Index)) + } + } else if bq.relayF != nil { + bq.relayF(minblock) } } } else { diff --git a/pkg/network/blockqueue_test.go b/pkg/network/blockqueue_test.go index 62a43595f..af3540096 100644 --- a/pkg/network/blockqueue_test.go +++ b/pkg/network/blockqueue_test.go @@ -12,7 +12,7 @@ import ( func TestBlockQueue(t *testing.T) { chain := &testChain{} // notice, it's not yet running - bq := newBlockQueue(0, chain, zaptest.NewLogger(t)) + bq := newBlockQueue(0, chain, zaptest.NewLogger(t), nil) blocks := make([]*block.Block, 11) for i := 1; i < 11; i++ { blocks[i] = &block.Block{Base: block.Base{Index: uint32(i)}} diff --git a/pkg/network/server.go b/pkg/network/server.go index 681932e61..5a45ed393 100644 --- a/pkg/network/server.go +++ b/pkg/network/server.go @@ -90,7 +90,6 @@ func NewServer(config ServerConfig, chain core.Blockchainer, log *zap.Logger) (* s := &Server{ ServerConfig: config, chain: chain, - bQueue: newBlockQueue(maxBlockBatch, chain, log), id: randomID(), quit: make(chan struct{}), register: make(chan Peer), @@ -99,6 +98,7 @@ func NewServer(config ServerConfig, chain core.Blockchainer, log *zap.Logger) (* connected: atomic.NewBool(false), log: log, } + s.bQueue = newBlockQueue(maxBlockBatch, chain, log, s.relayBlock) srv, err := consensus.NewService(consensus.Config{ Logger: log, @@ -734,7 +734,11 @@ func (s *Server) broadcastHPMessage(msg *Message) { // relayBlock tells all the other connected nodes about the given block. func (s *Server) relayBlock(b *block.Block) { msg := s.MkMsg(CMDInv, payload.NewInventory(payload.BlockType, []util.Uint256{b.Hash()})) - s.broadcastMessage(msg) + // Filter out nodes that are more current (avoid spamming the network + // during initial sync). + s.iteratePeersWithSendMsg(msg, Peer.EnqueuePacket, func(p Peer) bool { + return p.Handshaked() && p.LastBlockIndex() < b.Index + }) } // verifyAndPoolTX verifies the TX and adds it to the local mempool. From f0e3a31bc899e339e0a70f56055ef0c3dde77274 Mon Sep 17 00:00:00 2001 From: Roman Khimov Date: Wed, 5 Feb 2020 00:07:05 +0300 Subject: [PATCH 05/18] mempool: fix appending to sorted pools Appending and not changing the real Items is utterly wrong. --- pkg/core/mempool/mem_pool.go | 18 ++++++++---------- 1 file changed, 8 insertions(+), 10 deletions(-) diff --git a/pkg/core/mempool/mem_pool.go b/pkg/core/mempool/mem_pool.go index 0848b69dc..d6a3b94d7 100644 --- a/pkg/core/mempool/mem_pool.go +++ b/pkg/core/mempool/mem_pool.go @@ -116,7 +116,13 @@ func (mp *Pool) ContainsKey(hash util.Uint256) bool { // TryAdd try to add the Item to the Pool. func (mp *Pool) TryAdd(hash util.Uint256, pItem *Item) error { - var pool Items + var pool *Items + + if pItem.fee.IsLowPriority(pItem.txn) { + pool = &mp.sortedLowPrioTxn + } else { + pool = &mp.sortedHighPrioTxn + } mp.lock.Lock() if !mp.verifyInputs(pItem.txn) { @@ -128,16 +134,8 @@ func (mp *Pool) TryAdd(hash util.Uint256, pItem *Item) error { return ErrDup } mp.unsortedTxn[hash] = pItem - mp.lock.Unlock() - if pItem.fee.IsLowPriority(pItem.txn) { - pool = mp.sortedLowPrioTxn - } else { - pool = mp.sortedHighPrioTxn - } - - mp.lock.Lock() - pool = append(pool, pItem) + *pool = append(*pool, pItem) sort.Sort(pool) mp.lock.Unlock() From a928ad9cfa968c291dae93a0485702340fc2a3ce Mon Sep 17 00:00:00 2001 From: Roman Khimov Date: Wed, 5 Feb 2020 14:24:36 +0300 Subject: [PATCH 06/18] mempool: make item an internal thing of mempool package Nobody outside should care about these details, mempool operates on transactions and that's it. --- pkg/consensus/consensus_test.go | 13 ++--- pkg/core/blockchain.go | 2 +- pkg/core/mempool/mem_pool.go | 80 +++++++++++++++---------------- pkg/core/mempool/mem_pool_test.go | 11 ++--- 4 files changed, 47 insertions(+), 59 deletions(-) diff --git a/pkg/consensus/consensus_test.go b/pkg/consensus/consensus_test.go index 654f86ade..9db01b198 100644 --- a/pkg/consensus/consensus_test.go +++ b/pkg/consensus/consensus_test.go @@ -5,7 +5,6 @@ import ( "github.com/CityOfZion/neo-go/config" "github.com/CityOfZion/neo-go/pkg/core" - "github.com/CityOfZion/neo-go/pkg/core/mempool" "github.com/CityOfZion/neo-go/pkg/core/storage" "github.com/CityOfZion/neo-go/pkg/core/transaction" "github.com/CityOfZion/neo-go/pkg/crypto/keys" @@ -22,8 +21,7 @@ func TestNewService(t *testing.T) { Type: transaction.MinerType, Data: &transaction.MinerTX{Nonce: 12345}, } - item := mempool.NewPoolItem(tx, new(feer)) - srv.Chain.GetMemPool().TryAdd(tx.Hash(), item) + srv.Chain.GetMemPool().Add(tx, new(feer)) var txx []block.Transaction require.NotPanics(t, func() { txx = srv.getVerifiedTx(1) }) @@ -41,9 +39,8 @@ func TestService_GetVerified(t *testing.T) { newMinerTx(4), } pool := srv.Chain.GetMemPool() - item := mempool.NewPoolItem(txs[3], new(feer)) - require.NoError(t, pool.TryAdd(txs[3].Hash(), item)) + require.NoError(t, pool.Add(txs[3], new(feer))) hashes := []util.Uint256{txs[0].Hash(), txs[1].Hash(), txs[2].Hash()} @@ -68,8 +65,7 @@ func TestService_GetVerified(t *testing.T) { t.Run("more than half of the last proposal will be reused", func(t *testing.T) { for _, tx := range txs[:2] { - item := mempool.NewPoolItem(tx, new(feer)) - require.NoError(t, pool.TryAdd(tx.Hash(), item)) + require.NoError(t, pool.Add(tx, new(feer))) } txx := srv.getVerifiedTx(10) @@ -119,8 +115,7 @@ func TestService_getTx(t *testing.T) { require.Equal(t, nil, srv.getTx(h)) - item := mempool.NewPoolItem(tx, new(feer)) - srv.Chain.GetMemPool().TryAdd(h, item) + srv.Chain.GetMemPool().Add(tx, new(feer)) got := srv.getTx(h) require.NotNil(t, got) diff --git a/pkg/core/blockchain.go b/pkg/core/blockchain.go index 48491edd6..b92d307f9 100644 --- a/pkg/core/blockchain.go +++ b/pkg/core/blockchain.go @@ -1064,7 +1064,7 @@ func (bc *Blockchain) PoolTx(t *transaction.Transaction) error { if err := bc.verifyTx(t, nil); err != nil { return err } - if err := bc.memPool.TryAdd(t.Hash(), mempool.NewPoolItem(t, bc)); err != nil { + if err := bc.memPool.Add(t, bc); err != nil { switch err { case mempool.ErrOOM: return ErrOOM diff --git a/pkg/core/mempool/mem_pool.go b/pkg/core/mempool/mem_pool.go index d6a3b94d7..aa28b04a8 100644 --- a/pkg/core/mempool/mem_pool.go +++ b/pkg/core/mempool/mem_pool.go @@ -23,38 +23,38 @@ var ( ErrOOM = errors.New("out of memory") ) -// Item represents a transaction in the the Memory pool. -type Item struct { +// item represents a transaction in the the Memory pool. +type item struct { txn *transaction.Transaction timeStamp time.Time fee Feer } -// Items is a slice of Item. -type Items []*Item +// items is a slice of item. +type items []*item // Pool stores the unconfirms transactions. type Pool struct { lock *sync.RWMutex - unsortedTxn map[util.Uint256]*Item - unverifiedTxn map[util.Uint256]*Item - sortedHighPrioTxn Items - sortedLowPrioTxn Items - unverifiedSortedHighPrioTxn Items - unverifiedSortedLowPrioTxn Items + unsortedTxn map[util.Uint256]*item + unverifiedTxn map[util.Uint256]*item + sortedHighPrioTxn items + sortedLowPrioTxn items + unverifiedSortedHighPrioTxn items + unverifiedSortedLowPrioTxn items capacity int } -func (p Items) Len() int { return len(p) } -func (p Items) Swap(i, j int) { p[i], p[j] = p[j], p[i] } -func (p Items) Less(i, j int) bool { return p[i].CompareTo(p[j]) < 0 } +func (p items) Len() int { return len(p) } +func (p items) Swap(i, j int) { p[i], p[j] = p[j], p[i] } +func (p items) Less(i, j int) bool { return p[i].CompareTo(p[j]) < 0 } -// CompareTo returns the difference between two Items. +// CompareTo returns the difference between two items. // difference < 0 implies p < otherP. // difference = 0 implies p = otherP. // difference > 0 implies p > otherP. -func (p Item) CompareTo(otherP *Item) int { +func (p item) CompareTo(otherP *item) int { if otherP == nil { return 1 } @@ -114,9 +114,14 @@ func (mp *Pool) ContainsKey(hash util.Uint256) bool { return false } -// TryAdd try to add the Item to the Pool. -func (mp *Pool) TryAdd(hash util.Uint256, pItem *Item) error { - var pool *Items +// Add tries to add given transaction to the Pool. +func (mp *Pool) Add(t *transaction.Transaction, fee Feer) error { + var pool *items + var pItem = &item{ + txn: t, + timeStamp: time.Now().UTC(), + fee: fee, + } if pItem.fee.IsLowPriority(pItem.txn) { pool = &mp.sortedLowPrioTxn @@ -129,11 +134,11 @@ func (mp *Pool) TryAdd(hash util.Uint256, pItem *Item) error { mp.lock.Unlock() return ErrConflict } - if _, ok := mp.unsortedTxn[hash]; ok { + if _, ok := mp.unsortedTxn[t.Hash()]; ok { mp.lock.Unlock() return ErrDup } - mp.unsortedTxn[hash] = pItem + mp.unsortedTxn[t.Hash()] = pItem *pool = append(*pool, pItem) sort.Sort(pool) @@ -143,7 +148,7 @@ func (mp *Pool) TryAdd(hash util.Uint256, pItem *Item) error { mp.RemoveOverCapacity() } mp.lock.RLock() - _, ok := mp.unsortedTxn[hash] + _, ok := mp.unsortedTxn[t.Hash()] updateMempoolMetrics(len(mp.unsortedTxn), len(mp.unverifiedTxn)) mp.lock.RUnlock() if !ok { @@ -156,11 +161,11 @@ func (mp *Pool) TryAdd(hash util.Uint256, pItem *Item) error { // nothing if it doesn't). func (mp *Pool) Remove(hash util.Uint256) { var mapAndPools = []struct { - unsortedMap map[util.Uint256]*Item - sortedPools []*Items + unsortedMap map[util.Uint256]*item + sortedPools []*items }{ - {unsortedMap: mp.unsortedTxn, sortedPools: []*Items{&mp.sortedHighPrioTxn, &mp.sortedLowPrioTxn}}, - {unsortedMap: mp.unverifiedTxn, sortedPools: []*Items{&mp.unverifiedSortedHighPrioTxn, &mp.unverifiedSortedLowPrioTxn}}, + {unsortedMap: mp.unsortedTxn, sortedPools: []*items{&mp.sortedHighPrioTxn, &mp.sortedLowPrioTxn}}, + {unsortedMap: mp.unverifiedTxn, sortedPools: []*items{&mp.unverifiedSortedHighPrioTxn, &mp.unverifiedSortedLowPrioTxn}}, } mp.lock.Lock() for _, mapAndPool := range mapAndPools { @@ -168,7 +173,7 @@ func (mp *Pool) Remove(hash util.Uint256) { delete(mapAndPool.unsortedMap, hash) for _, pool := range mapAndPool.sortedPools { var num int - var item *Item + var item *item for num, item = range *pool { if hash.Equals(item.txn.Hash()) { break @@ -224,21 +229,12 @@ func (mp *Pool) RemoveOverCapacity() { } -// NewPoolItem returns a new Item. -func NewPoolItem(t *transaction.Transaction, fee Feer) *Item { - return &Item{ - txn: t, - timeStamp: time.Now().UTC(), - fee: fee, - } -} - // NewMemPool returns a new Pool struct. func NewMemPool(capacity int) Pool { return Pool{ lock: new(sync.RWMutex), - unsortedTxn: make(map[util.Uint256]*Item), - unverifiedTxn: make(map[util.Uint256]*Item), + unsortedTxn: make(map[util.Uint256]*item), + unverifiedTxn: make(map[util.Uint256]*item), capacity: capacity, } } @@ -258,13 +254,13 @@ func (mp *Pool) TryGetValue(hash util.Uint256) (*transaction.Transaction, bool) return nil, false } -// getLowestFeeTransaction returns the Item with the lowest fee amongst the "verifiedTxnSorted" -// and "unverifiedTxnSorted" Items along with a integer. The integer can assume two values, 1 and 2 which indicate -// that the Item with the lowest fee was found in "verifiedTxnSorted" respectively in "unverifiedTxnSorted". +// getLowestFeeTransaction returns the item with the lowest fee amongst the "verifiedTxnSorted" +// and "unverifiedTxnSorted" items along with a integer. The integer can assume two values, 1 and 2 which indicate +// that the item with the lowest fee was found in "verifiedTxnSorted" respectively in "unverifiedTxnSorted". // "verifiedTxnSorted" and "unverifiedTxnSorted" are sorted slice order by transaction fee ascending. This means that // the transaction with lowest fee start at index 0. // Reference: GetLowestFeeTransaction method in C# (https://github.com/neo-project/neo/blob/master/neo/Ledger/MemoryPool.cs) -func getLowestFeeTransaction(verifiedTxnSorted Items, unverifiedTxnSorted Items) (*Item, int) { +func getLowestFeeTransaction(verifiedTxnSorted items, unverifiedTxnSorted items) (*item, int) { minItem := min(unverifiedTxnSorted) verifiedMin := min(verifiedTxnSorted) if verifiedMin == nil || (minItem != nil && verifiedMin.CompareTo(minItem) >= 0) { @@ -278,7 +274,7 @@ func getLowestFeeTransaction(verifiedTxnSorted Items, unverifiedTxnSorted Items) // min returns the minimum item in a ascending sorted slice of pool items. // The function can't be applied to unsorted slice! -func min(sortedPool Items) *Item { +func min(sortedPool items) *item { if len(sortedPool) == 0 { return nil } diff --git a/pkg/core/mempool/mem_pool_test.go b/pkg/core/mempool/mem_pool_test.go index db6a5b25c..546febe75 100644 --- a/pkg/core/mempool/mem_pool_test.go +++ b/pkg/core/mempool/mem_pool_test.go @@ -36,12 +36,11 @@ func (fs *FeerStub) SystemFee(*transaction.Transaction) util.Fixed8 { func testMemPoolAddRemoveWithFeer(t *testing.T, fs Feer) { mp := NewMemPool(10) tx := newMinerTX() - item := NewPoolItem(tx, fs) _, ok := mp.TryGetValue(tx.Hash()) require.Equal(t, false, ok) - require.NoError(t, mp.TryAdd(tx.Hash(), item)) + require.NoError(t, mp.Add(tx, fs)) // Re-adding should fail. - require.Error(t, mp.TryAdd(tx.Hash(), item)) + require.Error(t, mp.Add(tx, fs)) tx2, ok := mp.TryGetValue(tx.Hash()) require.Equal(t, true, ok) require.Equal(t, tx, tx2) @@ -70,15 +69,13 @@ func TestMemPoolVerify(t *testing.T) { inhash1 := random.Uint256() tx.Inputs = append(tx.Inputs, transaction.Input{PrevHash: inhash1, PrevIndex: 0}) require.Equal(t, true, mp.Verify(tx)) - item := NewPoolItem(tx, &FeerStub{}) - require.NoError(t, mp.TryAdd(tx.Hash(), item)) + require.NoError(t, mp.Add(tx, &FeerStub{})) tx2 := newMinerTX() inhash2 := random.Uint256() tx2.Inputs = append(tx2.Inputs, transaction.Input{PrevHash: inhash2, PrevIndex: 0}) require.Equal(t, true, mp.Verify(tx2)) - item = NewPoolItem(tx2, &FeerStub{}) - require.NoError(t, mp.TryAdd(tx2.Hash(), item)) + require.NoError(t, mp.Add(tx2, &FeerStub{})) tx3 := newMinerTX() // Different index number, but the same PrevHash as in tx1. From e01bfeeb4d04eb4c7b6d0e1c20a44ecc53abb32d Mon Sep 17 00:00:00 2001 From: Roman Khimov Date: Wed, 5 Feb 2020 14:27:03 +0300 Subject: [PATCH 07/18] mempool: remove lock indirection from the Pool After the f0bb886be333d268ec72faca5c0e1fee50782f51 with all methods of Pool being pointer-based it makes no sense having this lock as a pointer. --- pkg/core/mempool/mem_pool.go | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/pkg/core/mempool/mem_pool.go b/pkg/core/mempool/mem_pool.go index aa28b04a8..13e6689e1 100644 --- a/pkg/core/mempool/mem_pool.go +++ b/pkg/core/mempool/mem_pool.go @@ -35,7 +35,7 @@ type items []*item // Pool stores the unconfirms transactions. type Pool struct { - lock *sync.RWMutex + lock sync.RWMutex unsortedTxn map[util.Uint256]*item unverifiedTxn map[util.Uint256]*item sortedHighPrioTxn items @@ -232,7 +232,6 @@ func (mp *Pool) RemoveOverCapacity() { // NewMemPool returns a new Pool struct. func NewMemPool(capacity int) Pool { return Pool{ - lock: new(sync.RWMutex), unsortedTxn: make(map[util.Uint256]*item), unverifiedTxn: make(map[util.Uint256]*item), capacity: capacity, From 325bea3fa9d8641711f936bd8b926557dd8932ad Mon Sep 17 00:00:00 2001 From: Roman Khimov Date: Wed, 5 Feb 2020 14:34:58 +0300 Subject: [PATCH 08/18] mempool: cache Feer invocation results in the item They shouldn't depend on the chain state and for the same transaction they should always produce the same result. Thus, it makes no sense recalculating them over and over again. --- pkg/core/mempool/mem_pool.go | 28 ++++++++++++++-------------- 1 file changed, 14 insertions(+), 14 deletions(-) diff --git a/pkg/core/mempool/mem_pool.go b/pkg/core/mempool/mem_pool.go index 13e6689e1..3be8e47f9 100644 --- a/pkg/core/mempool/mem_pool.go +++ b/pkg/core/mempool/mem_pool.go @@ -25,9 +25,11 @@ var ( // item represents a transaction in the the Memory pool. type item struct { - txn *transaction.Transaction - timeStamp time.Time - fee Feer + txn *transaction.Transaction + timeStamp time.Time + perByteFee util.Fixed8 + netFee util.Fixed8 + isLowPrio bool } // items is a slice of item. @@ -59,7 +61,7 @@ func (p item) CompareTo(otherP *item) int { return 1 } - if p.fee.IsLowPriority(p.txn) && p.fee.IsLowPriority(otherP.txn) { + if p.isLowPrio && otherP.isLowPrio { thisIsClaimTx := p.txn.Type == transaction.ClaimType otherIsClaimTx := otherP.txn.Type == transaction.ClaimType @@ -74,15 +76,11 @@ func (p item) CompareTo(otherP *item) int { } // Fees sorted ascending. - pFPB := p.fee.FeePerByte(p.txn) - otherFPB := p.fee.FeePerByte(otherP.txn) - if ret := pFPB.CompareTo(otherFPB); ret != 0 { + if ret := p.perByteFee.CompareTo(otherP.perByteFee); ret != 0 { return ret } - pNF := p.fee.NetworkFee(p.txn) - otherNF := p.fee.NetworkFee(otherP.txn) - if ret := pNF.CompareTo(otherNF); ret != 0 { + if ret := p.netFee.CompareTo(otherP.netFee); ret != 0 { return ret } @@ -118,12 +116,14 @@ func (mp *Pool) ContainsKey(hash util.Uint256) bool { func (mp *Pool) Add(t *transaction.Transaction, fee Feer) error { var pool *items var pItem = &item{ - txn: t, - timeStamp: time.Now().UTC(), - fee: fee, + txn: t, + timeStamp: time.Now().UTC(), + perByteFee: fee.FeePerByte(t), + netFee: fee.NetworkFee(t), + isLowPrio: fee.IsLowPriority(t), } - if pItem.fee.IsLowPriority(pItem.txn) { + if pItem.isLowPrio { pool = &mp.sortedLowPrioTxn } else { pool = &mp.sortedHighPrioTxn From 794027a90b0d8aa7afcf495a4bd27e44e599be4d Mon Sep 17 00:00:00 2001 From: Roman Khimov Date: Wed, 5 Feb 2020 17:13:35 +0300 Subject: [PATCH 09/18] mempool: use one slice for both priorities It doesn't harm as we have transactions naturally ordered by fee anyway and it makes managing them a little easier. This also makes slices store item itself instead of pointers to it which reduces the pressure on the memory subsystem. --- pkg/core/mempool/mem_pool.go | 151 ++++++++++++++---------------- pkg/core/mempool/mem_pool_test.go | 10 +- 2 files changed, 74 insertions(+), 87 deletions(-) diff --git a/pkg/core/mempool/mem_pool.go b/pkg/core/mempool/mem_pool.go index 3be8e47f9..326ce495f 100644 --- a/pkg/core/mempool/mem_pool.go +++ b/pkg/core/mempool/mem_pool.go @@ -37,13 +37,11 @@ type items []*item // Pool stores the unconfirms transactions. type Pool struct { - lock sync.RWMutex - unsortedTxn map[util.Uint256]*item - unverifiedTxn map[util.Uint256]*item - sortedHighPrioTxn items - sortedLowPrioTxn items - unverifiedSortedHighPrioTxn items - unverifiedSortedLowPrioTxn items + lock sync.RWMutex + verifiedMap map[util.Uint256]*item + unverifiedMap map[util.Uint256]*item + verifiedTxes items + unverifiedTxes items capacity int } @@ -56,11 +54,19 @@ func (p items) Less(i, j int) bool { return p[i].CompareTo(p[j]) < 0 } // difference < 0 implies p < otherP. // difference = 0 implies p = otherP. // difference > 0 implies p > otherP. -func (p item) CompareTo(otherP *item) int { +func (p *item) CompareTo(otherP *item) int { if otherP == nil { return 1 } + if !p.isLowPrio && otherP.isLowPrio { + return 1 + } + + if p.isLowPrio && !otherP.isLowPrio { + return -1 + } + if p.isLowPrio && otherP.isLowPrio { thisIsClaimTx := p.txn.Type == transaction.ClaimType otherIsClaimTx := otherP.txn.Type == transaction.ClaimType @@ -93,7 +99,7 @@ func (mp *Pool) Count() int { mp.lock.RLock() defer mp.lock.RUnlock() - return len(mp.unsortedTxn) + len(mp.unverifiedTxn) + return len(mp.verifiedTxes) + len(mp.unverifiedTxes) } // ContainsKey checks if a transactions hash is in the Pool. @@ -101,11 +107,16 @@ func (mp *Pool) ContainsKey(hash util.Uint256) bool { mp.lock.RLock() defer mp.lock.RUnlock() - if _, ok := mp.unsortedTxn[hash]; ok { + return mp.containsKey(hash) +} + +// containsKey is an internal unlocked version of ContainsKey. +func (mp *Pool) containsKey(hash util.Uint256) bool { + if _, ok := mp.verifiedMap[hash]; ok { return true } - if _, ok := mp.unverifiedTxn[hash]; ok { + if _, ok := mp.unverifiedMap[hash]; ok { return true } @@ -114,7 +125,6 @@ func (mp *Pool) ContainsKey(hash util.Uint256) bool { // Add tries to add given transaction to the Pool. func (mp *Pool) Add(t *transaction.Transaction, fee Feer) error { - var pool *items var pItem = &item{ txn: t, timeStamp: time.Now().UTC(), @@ -122,34 +132,27 @@ func (mp *Pool) Add(t *transaction.Transaction, fee Feer) error { netFee: fee.NetworkFee(t), isLowPrio: fee.IsLowPriority(t), } - - if pItem.isLowPrio { - pool = &mp.sortedLowPrioTxn - } else { - pool = &mp.sortedHighPrioTxn - } - mp.lock.Lock() - if !mp.verifyInputs(pItem.txn) { + if !mp.verifyInputs(t) { mp.lock.Unlock() return ErrConflict } - if _, ok := mp.unsortedTxn[t.Hash()]; ok { + if mp.containsKey(t.Hash()) { mp.lock.Unlock() return ErrDup } - mp.unsortedTxn[t.Hash()] = pItem - *pool = append(*pool, pItem) - sort.Sort(pool) + mp.verifiedMap[t.Hash()] = pItem + mp.verifiedTxes = append(mp.verifiedTxes, pItem) + sort.Sort(mp.verifiedTxes) mp.lock.Unlock() if mp.Count() > mp.capacity { mp.RemoveOverCapacity() } mp.lock.RLock() - _, ok := mp.unsortedTxn[t.Hash()] - updateMempoolMetrics(len(mp.unsortedTxn), len(mp.unverifiedTxn)) + _, ok := mp.verifiedMap[t.Hash()] + updateMempoolMetrics(len(mp.verifiedTxes), len(mp.unverifiedTxes)) mp.lock.RUnlock() if !ok { return ErrOOM @@ -161,33 +164,30 @@ func (mp *Pool) Add(t *transaction.Transaction, fee Feer) error { // nothing if it doesn't). func (mp *Pool) Remove(hash util.Uint256) { var mapAndPools = []struct { - unsortedMap map[util.Uint256]*item - sortedPools []*items + txMap map[util.Uint256]*item + txPool *items }{ - {unsortedMap: mp.unsortedTxn, sortedPools: []*items{&mp.sortedHighPrioTxn, &mp.sortedLowPrioTxn}}, - {unsortedMap: mp.unverifiedTxn, sortedPools: []*items{&mp.unverifiedSortedHighPrioTxn, &mp.unverifiedSortedLowPrioTxn}}, + {txMap: mp.verifiedMap, txPool: &mp.verifiedTxes}, + {txMap: mp.unverifiedMap, txPool: &mp.unverifiedTxes}, } mp.lock.Lock() for _, mapAndPool := range mapAndPools { - if _, ok := mapAndPool.unsortedMap[hash]; ok { - delete(mapAndPool.unsortedMap, hash) - for _, pool := range mapAndPool.sortedPools { - var num int - var item *item - for num, item = range *pool { - if hash.Equals(item.txn.Hash()) { - break - } - } - if num < len(*pool)-1 { - *pool = append((*pool)[:num], (*pool)[num+1:]...) - } else if num == len(*pool)-1 { - *pool = (*pool)[:num] + if _, ok := mapAndPool.txMap[hash]; ok { + var num int + delete(mapAndPool.txMap, hash) + for num := range *mapAndPool.txPool { + if hash.Equals((*mapAndPool.txPool)[num].txn.Hash()) { + break } } + if num < len(*mapAndPool.txPool)-1 { + *mapAndPool.txPool = append((*mapAndPool.txPool)[:num], (*mapAndPool.txPool)[num+1:]...) + } else if num == len(*mapAndPool.txPool)-1 { + *mapAndPool.txPool = (*mapAndPool.txPool)[:num] + } } } - updateMempoolMetrics(len(mp.unsortedTxn), len(mp.unverifiedTxn)) + updateMempoolMetrics(len(mp.verifiedTxes), len(mp.unverifiedTxes)) mp.lock.Unlock() } @@ -196,34 +196,20 @@ func (mp *Pool) Remove(hash util.Uint256) { func (mp *Pool) RemoveOverCapacity() { for mp.Count()-mp.capacity > 0 { mp.lock.Lock() - if minItem, argPosition := getLowestFeeTransaction(mp.sortedLowPrioTxn, mp.unverifiedSortedLowPrioTxn); minItem != nil { - if argPosition == 1 { - // minItem belongs to the mp.sortedLowPrioTxn slice. - // The corresponding unsorted pool is is mp.unsortedTxn. - delete(mp.unsortedTxn, minItem.txn.Hash()) - mp.sortedLowPrioTxn = append(mp.sortedLowPrioTxn[:0], mp.sortedLowPrioTxn[1:]...) - } else { - // minItem belongs to the mp.unverifiedSortedLowPrioTxn slice. - // The corresponding unsorted pool is is mp.unverifiedTxn. - delete(mp.unverifiedTxn, minItem.txn.Hash()) - mp.unverifiedSortedLowPrioTxn = append(mp.unverifiedSortedLowPrioTxn[:0], mp.unverifiedSortedLowPrioTxn[1:]...) + minItem, argPosition := getLowestFeeTransaction(mp.verifiedTxes, mp.unverifiedTxes) + if argPosition == 1 { + // minItem belongs to the mp.sortedLowPrioTxn slice. + // The corresponding unsorted pool is is mp.unsortedTxn. + delete(mp.verifiedMap, minItem.txn.Hash()) + mp.verifiedTxes = append(mp.verifiedTxes[:0], mp.verifiedTxes[1:]...) + } else { + // minItem belongs to the mp.unverifiedSortedLowPrioTxn slice. + // The corresponding unsorted pool is is mp.unverifiedTxn. + delete(mp.unverifiedMap, minItem.txn.Hash()) + mp.unverifiedTxes = append(mp.unverifiedTxes[:0], mp.unverifiedTxes[1:]...) - } - } else if minItem, argPosition := getLowestFeeTransaction(mp.sortedHighPrioTxn, mp.unverifiedSortedHighPrioTxn); minItem != nil { - if argPosition == 1 { - // minItem belongs to the mp.sortedHighPrioTxn slice. - // The corresponding unsorted pool is is mp.unsortedTxn. - delete(mp.unsortedTxn, minItem.txn.Hash()) - mp.sortedHighPrioTxn = append(mp.sortedHighPrioTxn[:0], mp.sortedHighPrioTxn[1:]...) - } else { - // minItem belongs to the mp.unverifiedSortedHighPrioTxn slice. - // The corresponding unsorted pool is is mp.unverifiedTxn. - delete(mp.unverifiedTxn, minItem.txn.Hash()) - mp.unverifiedSortedHighPrioTxn = append(mp.unverifiedSortedHighPrioTxn[:0], mp.unverifiedSortedHighPrioTxn[1:]...) - - } } - updateMempoolMetrics(len(mp.unsortedTxn), len(mp.unverifiedTxn)) + updateMempoolMetrics(len(mp.verifiedTxes), len(mp.unverifiedTxes)) mp.lock.Unlock() } @@ -232,9 +218,11 @@ func (mp *Pool) RemoveOverCapacity() { // NewMemPool returns a new Pool struct. func NewMemPool(capacity int) Pool { return Pool{ - unsortedTxn: make(map[util.Uint256]*item), - unverifiedTxn: make(map[util.Uint256]*item), - capacity: capacity, + verifiedMap: make(map[util.Uint256]*item), + unverifiedMap: make(map[util.Uint256]*item), + verifiedTxes: make([]*item, 0, capacity), + unverifiedTxes: make([]*item, 0, capacity), + capacity: capacity, } } @@ -242,11 +230,11 @@ func NewMemPool(capacity int) Pool { func (mp *Pool) TryGetValue(hash util.Uint256) (*transaction.Transaction, bool) { mp.lock.RLock() defer mp.lock.RUnlock() - if pItem, ok := mp.unsortedTxn[hash]; ok { + if pItem, ok := mp.verifiedMap[hash]; ok { return pItem.txn, ok } - if pItem, ok := mp.unverifiedTxn[hash]; ok { + if pItem, ok := mp.unverifiedMap[hash]; ok { return pItem.txn, ok } @@ -286,10 +274,10 @@ func (mp *Pool) GetVerifiedTransactions() []*transaction.Transaction { mp.lock.RLock() defer mp.lock.RUnlock() - var t = make([]*transaction.Transaction, len(mp.unsortedTxn)) + var t = make([]*transaction.Transaction, len(mp.verifiedMap)) var i int - for _, p := range mp.unsortedTxn { + for _, p := range mp.verifiedMap { t[i] = p.txn i++ } @@ -302,10 +290,11 @@ func (mp *Pool) verifyInputs(tx *transaction.Transaction) bool { if len(tx.Inputs) == 0 { return true } - for _, item := range mp.unsortedTxn { - for i := range item.txn.Inputs { + for num := range mp.verifiedTxes { + txn := mp.verifiedTxes[num].txn + for i := range txn.Inputs { for j := 0; j < len(tx.Inputs); j++ { - if item.txn.Inputs[i] == tx.Inputs[j] { + if txn.Inputs[i] == tx.Inputs[j] { return false } } diff --git a/pkg/core/mempool/mem_pool_test.go b/pkg/core/mempool/mem_pool_test.go index 546febe75..d8bbb416a 100644 --- a/pkg/core/mempool/mem_pool_test.go +++ b/pkg/core/mempool/mem_pool_test.go @@ -48,12 +48,10 @@ func testMemPoolAddRemoveWithFeer(t *testing.T, fs Feer) { _, ok = mp.TryGetValue(tx.Hash()) require.Equal(t, false, ok) // Make sure nothing left in the mempool after removal. - assert.Equal(t, 0, len(mp.unsortedTxn)) - assert.Equal(t, 0, len(mp.unverifiedTxn)) - assert.Equal(t, 0, len(mp.sortedHighPrioTxn)) - assert.Equal(t, 0, len(mp.sortedLowPrioTxn)) - assert.Equal(t, 0, len(mp.unverifiedSortedHighPrioTxn)) - assert.Equal(t, 0, len(mp.unverifiedSortedLowPrioTxn)) + assert.Equal(t, 0, len(mp.verifiedMap)) + assert.Equal(t, 0, len(mp.unverifiedMap)) + assert.Equal(t, 0, len(mp.verifiedTxes)) + assert.Equal(t, 0, len(mp.unverifiedTxes)) } func TestMemPoolAddRemove(t *testing.T) { From 35183b6dbae642d42971357d002c71ba5532ef5e Mon Sep 17 00:00:00 2001 From: Roman Khimov Date: Wed, 5 Feb 2020 17:46:19 +0300 Subject: [PATCH 10/18] mempool: reverse the order of sorted slice Chopping off the last element of the slice if way easier than doing it with the first one. --- pkg/core/mempool/mem_pool.go | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/pkg/core/mempool/mem_pool.go b/pkg/core/mempool/mem_pool.go index 326ce495f..24950168f 100644 --- a/pkg/core/mempool/mem_pool.go +++ b/pkg/core/mempool/mem_pool.go @@ -144,7 +144,7 @@ func (mp *Pool) Add(t *transaction.Transaction, fee Feer) error { mp.verifiedMap[t.Hash()] = pItem mp.verifiedTxes = append(mp.verifiedTxes, pItem) - sort.Sort(mp.verifiedTxes) + sort.Sort(sort.Reverse(mp.verifiedTxes)) mp.lock.Unlock() if mp.Count() > mp.capacity { @@ -201,13 +201,12 @@ func (mp *Pool) RemoveOverCapacity() { // minItem belongs to the mp.sortedLowPrioTxn slice. // The corresponding unsorted pool is is mp.unsortedTxn. delete(mp.verifiedMap, minItem.txn.Hash()) - mp.verifiedTxes = append(mp.verifiedTxes[:0], mp.verifiedTxes[1:]...) + mp.verifiedTxes = mp.verifiedTxes[:len(mp.verifiedTxes)-1] } else { // minItem belongs to the mp.unverifiedSortedLowPrioTxn slice. // The corresponding unsorted pool is is mp.unverifiedTxn. delete(mp.unverifiedMap, minItem.txn.Hash()) - mp.unverifiedTxes = append(mp.unverifiedTxes[:0], mp.unverifiedTxes[1:]...) - + mp.unverifiedTxes = mp.unverifiedTxes[:len(mp.unverifiedTxes)-1] } updateMempoolMetrics(len(mp.verifiedTxes), len(mp.unverifiedTxes)) mp.lock.Unlock() @@ -265,7 +264,7 @@ func min(sortedPool items) *item { if len(sortedPool) == 0 { return nil } - return sortedPool[0] + return sortedPool[len(sortedPool)-1] } // GetVerifiedTransactions returns a slice of Input from all the transactions in the memory pool From b567ce86ac2261342ee4c5451c5a209c7590ddf1 Mon Sep 17 00:00:00 2001 From: Roman Khimov Date: Wed, 5 Feb 2020 18:18:34 +0300 Subject: [PATCH 11/18] mempool: implement insertion to sorted slice Which is way faster than sort.Sort'ing things all the time. --- pkg/core/mempool/mem_pool.go | 25 ++++++++++++++++++++----- 1 file changed, 20 insertions(+), 5 deletions(-) diff --git a/pkg/core/mempool/mem_pool.go b/pkg/core/mempool/mem_pool.go index 24950168f..f0e9cdc48 100644 --- a/pkg/core/mempool/mem_pool.go +++ b/pkg/core/mempool/mem_pool.go @@ -98,7 +98,11 @@ func (p *item) CompareTo(otherP *item) int { func (mp *Pool) Count() int { mp.lock.RLock() defer mp.lock.RUnlock() + return mp.count() +} +// count is an internal unlocked version of Count. +func (mp *Pool) count() int { return len(mp.verifiedTxes) + len(mp.unverifiedTxes) } @@ -143,8 +147,20 @@ func (mp *Pool) Add(t *transaction.Transaction, fee Feer) error { } mp.verifiedMap[t.Hash()] = pItem + // Insert into sorted array (from max to min, that could also be done + // using sort.Sort(sort.Reverse()), but it incurs more overhead. Notice + // also that we're searching for position that is strictly more + // prioritized than our new item because we do expect a lot of + // transactions with the same priority and appending to the end of the + // slice is always more efficient. + n := sort.Search(len(mp.verifiedTxes), func(n int) bool { + return pItem.CompareTo(mp.verifiedTxes[n]) > 0 + }) mp.verifiedTxes = append(mp.verifiedTxes, pItem) - sort.Sort(sort.Reverse(mp.verifiedTxes)) + if n != len(mp.verifiedTxes) { + copy(mp.verifiedTxes[n+1:], mp.verifiedTxes[n:]) + mp.verifiedTxes[n] = pItem + } mp.lock.Unlock() if mp.Count() > mp.capacity { @@ -194,8 +210,8 @@ func (mp *Pool) Remove(hash util.Uint256) { // RemoveOverCapacity removes transactions with lowest fees until the total number of transactions // in the Pool is within the capacity of the Pool. func (mp *Pool) RemoveOverCapacity() { - for mp.Count()-mp.capacity > 0 { - mp.lock.Lock() + mp.lock.Lock() + for mp.count()-mp.capacity > 0 { minItem, argPosition := getLowestFeeTransaction(mp.verifiedTxes, mp.unverifiedTxes) if argPosition == 1 { // minItem belongs to the mp.sortedLowPrioTxn slice. @@ -209,9 +225,8 @@ func (mp *Pool) RemoveOverCapacity() { mp.unverifiedTxes = mp.unverifiedTxes[:len(mp.unverifiedTxes)-1] } updateMempoolMetrics(len(mp.verifiedTxes), len(mp.unverifiedTxes)) - mp.lock.Unlock() } - + mp.lock.Unlock() } // NewMemPool returns a new Pool struct. From b675903f5220f853cc9348081789f9fb48a48ec1 Mon Sep 17 00:00:00 2001 From: Roman Khimov Date: Thu, 6 Feb 2020 00:23:49 +0300 Subject: [PATCH 12/18] mempool/core: redesign mempool dances on block acceptance We not only need to remove transactions stored in the block, but also invalidate some potential double spends caused by these transactions. Usually new block contains a substantial number of transactions from the pool, so it's easier to make one pass over it only keeping valid items rather than remove them one by one and make an additional pass to recheck inputs/witnesses. --- pkg/core/blockchain.go | 33 ++++++++++++++++++++++++++++++--- pkg/core/mempool/mem_pool.go | 19 +++++++++++++++++++ 2 files changed, 49 insertions(+), 3 deletions(-) diff --git a/pkg/core/blockchain.go b/pkg/core/blockchain.go index b92d307f9..0a22cf76b 100644 --- a/pkg/core/blockchain.go +++ b/pkg/core/blockchain.go @@ -611,9 +611,7 @@ func (bc *Blockchain) storeBlock(block *block.Block) error { bc.topBlock.Store(block) atomic.StoreUint32(&bc.blockHeight, block.Index) updateBlockHeightMetric(block.Index) - for _, tx := range block.Transactions { - bc.memPool.Remove(tx.Hash()) - } + bc.memPool.RemoveStale(bc.isTxStillRelevant) return nil } @@ -1043,6 +1041,35 @@ func (bc *Blockchain) verifyTx(t *transaction.Transaction, block *block.Block) e return bc.verifyTxWitnesses(t, block) } +// isTxStillRelevant is a callback for mempool transaction filtering after the +// new block addition. It returns false for transactions already present in the +// chain (added by the new block), transactions using some inputs that are +// already used (double spends) and does witness reverification for non-standard +// contracts. It operates under the assumption that full transaction verification +// was already done so we don't need to check basic things like size, input/output +// correctness, etc. +func (bc *Blockchain) isTxStillRelevant(t *transaction.Transaction) bool { + var recheckWitness bool + + if bc.dao.HasTransaction(t.Hash()) { + return false + } + if bc.dao.IsDoubleSpend(t) { + return false + } + for i := range t.Scripts { + if !vm.IsStandardContract(t.Scripts[i].VerificationScript) { + recheckWitness = true + break + } + } + if recheckWitness { + return bc.verifyTxWitnesses(t, nil) == nil + } + return true + +} + // VerifyTx verifies whether a transaction is bonafide or not. Block parameter // is used for easy interop access and can be omitted for transactions that are // not yet added into any block. diff --git a/pkg/core/mempool/mem_pool.go b/pkg/core/mempool/mem_pool.go index f0e9cdc48..7935f9241 100644 --- a/pkg/core/mempool/mem_pool.go +++ b/pkg/core/mempool/mem_pool.go @@ -207,6 +207,25 @@ func (mp *Pool) Remove(hash util.Uint256) { mp.lock.Unlock() } +// RemoveStale filters verified transactions through the given function keeping +// only the transactions for which it returns a true result. It's used to quickly +// drop part of the mempool that is now invalid after the block acceptance. +func (mp *Pool) RemoveStale(isOK func(*transaction.Transaction) bool) { + mp.lock.Lock() + // We expect a lot of changes, so it's easier to allocate a new slice + // rather than move things in an old one. + newVerifiedTxes := make([]*item, 0, mp.capacity) + for _, itm := range mp.verifiedTxes { + if isOK(itm.txn) { + newVerifiedTxes = append(newVerifiedTxes, itm) + } else { + delete(mp.verifiedMap, itm.txn.Hash()) + } + } + mp.verifiedTxes = newVerifiedTxes + mp.lock.Unlock() +} + // RemoveOverCapacity removes transactions with lowest fees until the total number of transactions // in the Pool is within the capacity of the Pool. func (mp *Pool) RemoveOverCapacity() { From 1133bbe584db050c9a20ea94943741d1daa1d848 Mon Sep 17 00:00:00 2001 From: Roman Khimov Date: Thu, 6 Feb 2020 15:33:49 +0300 Subject: [PATCH 13/18] mempool: remove unverified transactions pool Our mempool only contains valid verified transactions all the time, it never has any unverified ones. Unverified pool made some sense for quick unverifying after the new block acceptance (and gradual background reverification), but reverification needs some non-trivial locking between blockchain and mempool and internal mempool state locking (reverifying tx and moving it between unverified and verified pools must be atomic). But our current reverification is fast enough (and has all the appropriate locks), so bothering with unverified pool makes little sense. --- pkg/core/mempool/mem_pool.go | 108 +++++++----------------------- pkg/core/mempool/mem_pool_test.go | 2 - pkg/core/mempool/prometheus.go | 12 +--- 3 files changed, 27 insertions(+), 95 deletions(-) diff --git a/pkg/core/mempool/mem_pool.go b/pkg/core/mempool/mem_pool.go index 7935f9241..023a7fa68 100644 --- a/pkg/core/mempool/mem_pool.go +++ b/pkg/core/mempool/mem_pool.go @@ -37,11 +37,9 @@ type items []*item // Pool stores the unconfirms transactions. type Pool struct { - lock sync.RWMutex - verifiedMap map[util.Uint256]*item - unverifiedMap map[util.Uint256]*item - verifiedTxes items - unverifiedTxes items + lock sync.RWMutex + verifiedMap map[util.Uint256]*item + verifiedTxes items capacity int } @@ -103,7 +101,7 @@ func (mp *Pool) Count() int { // count is an internal unlocked version of Count. func (mp *Pool) count() int { - return len(mp.verifiedTxes) + len(mp.unverifiedTxes) + return len(mp.verifiedTxes) } // ContainsKey checks if a transactions hash is in the Pool. @@ -120,10 +118,6 @@ func (mp *Pool) containsKey(hash util.Uint256) bool { return true } - if _, ok := mp.unverifiedMap[hash]; ok { - return true - } - return false } @@ -168,7 +162,7 @@ func (mp *Pool) Add(t *transaction.Transaction, fee Feer) error { } mp.lock.RLock() _, ok := mp.verifiedMap[t.Hash()] - updateMempoolMetrics(len(mp.verifiedTxes), len(mp.unverifiedTxes)) + updateMempoolMetrics(len(mp.verifiedTxes)) mp.lock.RUnlock() if !ok { return ErrOOM @@ -179,31 +173,22 @@ func (mp *Pool) Add(t *transaction.Transaction, fee Feer) error { // Remove removes an item from the mempool, if it exists there (and does // nothing if it doesn't). func (mp *Pool) Remove(hash util.Uint256) { - var mapAndPools = []struct { - txMap map[util.Uint256]*item - txPool *items - }{ - {txMap: mp.verifiedMap, txPool: &mp.verifiedTxes}, - {txMap: mp.unverifiedMap, txPool: &mp.unverifiedTxes}, - } mp.lock.Lock() - for _, mapAndPool := range mapAndPools { - if _, ok := mapAndPool.txMap[hash]; ok { - var num int - delete(mapAndPool.txMap, hash) - for num := range *mapAndPool.txPool { - if hash.Equals((*mapAndPool.txPool)[num].txn.Hash()) { - break - } - } - if num < len(*mapAndPool.txPool)-1 { - *mapAndPool.txPool = append((*mapAndPool.txPool)[:num], (*mapAndPool.txPool)[num+1:]...) - } else if num == len(*mapAndPool.txPool)-1 { - *mapAndPool.txPool = (*mapAndPool.txPool)[:num] + if _, ok := mp.verifiedMap[hash]; ok { + var num int + delete(mp.verifiedMap, hash) + for num := range mp.verifiedTxes { + if hash.Equals(mp.verifiedTxes[num].txn.Hash()) { + break } } + if num < len(mp.verifiedTxes)-1 { + mp.verifiedTxes = append(mp.verifiedTxes[:num], mp.verifiedTxes[num+1:]...) + } else if num == len(mp.verifiedTxes)-1 { + mp.verifiedTxes = mp.verifiedTxes[:num] + } } - updateMempoolMetrics(len(mp.verifiedTxes), len(mp.unverifiedTxes)) + updateMempoolMetrics(len(mp.verifiedTxes)) mp.lock.Unlock() } @@ -230,32 +215,22 @@ func (mp *Pool) RemoveStale(isOK func(*transaction.Transaction) bool) { // in the Pool is within the capacity of the Pool. func (mp *Pool) RemoveOverCapacity() { mp.lock.Lock() - for mp.count()-mp.capacity > 0 { - minItem, argPosition := getLowestFeeTransaction(mp.verifiedTxes, mp.unverifiedTxes) - if argPosition == 1 { - // minItem belongs to the mp.sortedLowPrioTxn slice. - // The corresponding unsorted pool is is mp.unsortedTxn. - delete(mp.verifiedMap, minItem.txn.Hash()) - mp.verifiedTxes = mp.verifiedTxes[:len(mp.verifiedTxes)-1] - } else { - // minItem belongs to the mp.unverifiedSortedLowPrioTxn slice. - // The corresponding unsorted pool is is mp.unverifiedTxn. - delete(mp.unverifiedMap, minItem.txn.Hash()) - mp.unverifiedTxes = mp.unverifiedTxes[:len(mp.unverifiedTxes)-1] - } - updateMempoolMetrics(len(mp.verifiedTxes), len(mp.unverifiedTxes)) + num := mp.count() - mp.capacity + for i := 1; i <= num; i++ { + txToDrop := mp.verifiedTxes[len(mp.verifiedTxes)-num].txn + delete(mp.verifiedMap, txToDrop.Hash()) } + mp.verifiedTxes = mp.verifiedTxes[:len(mp.verifiedTxes)-num] + updateMempoolMetrics(mp.count()) mp.lock.Unlock() } // NewMemPool returns a new Pool struct. func NewMemPool(capacity int) Pool { return Pool{ - verifiedMap: make(map[util.Uint256]*item), - unverifiedMap: make(map[util.Uint256]*item), - verifiedTxes: make([]*item, 0, capacity), - unverifiedTxes: make([]*item, 0, capacity), - capacity: capacity, + verifiedMap: make(map[util.Uint256]*item), + verifiedTxes: make([]*item, 0, capacity), + capacity: capacity, } } @@ -267,40 +242,9 @@ func (mp *Pool) TryGetValue(hash util.Uint256) (*transaction.Transaction, bool) return pItem.txn, ok } - if pItem, ok := mp.unverifiedMap[hash]; ok { - return pItem.txn, ok - } - return nil, false } -// getLowestFeeTransaction returns the item with the lowest fee amongst the "verifiedTxnSorted" -// and "unverifiedTxnSorted" items along with a integer. The integer can assume two values, 1 and 2 which indicate -// that the item with the lowest fee was found in "verifiedTxnSorted" respectively in "unverifiedTxnSorted". -// "verifiedTxnSorted" and "unverifiedTxnSorted" are sorted slice order by transaction fee ascending. This means that -// the transaction with lowest fee start at index 0. -// Reference: GetLowestFeeTransaction method in C# (https://github.com/neo-project/neo/blob/master/neo/Ledger/MemoryPool.cs) -func getLowestFeeTransaction(verifiedTxnSorted items, unverifiedTxnSorted items) (*item, int) { - minItem := min(unverifiedTxnSorted) - verifiedMin := min(verifiedTxnSorted) - if verifiedMin == nil || (minItem != nil && verifiedMin.CompareTo(minItem) >= 0) { - return minItem, 2 - } - - minItem = verifiedMin - return minItem, 1 - -} - -// min returns the minimum item in a ascending sorted slice of pool items. -// The function can't be applied to unsorted slice! -func min(sortedPool items) *item { - if len(sortedPool) == 0 { - return nil - } - return sortedPool[len(sortedPool)-1] -} - // GetVerifiedTransactions returns a slice of Input from all the transactions in the memory pool // whose hash is not included in excludedHashes. func (mp *Pool) GetVerifiedTransactions() []*transaction.Transaction { diff --git a/pkg/core/mempool/mem_pool_test.go b/pkg/core/mempool/mem_pool_test.go index d8bbb416a..3e846344b 100644 --- a/pkg/core/mempool/mem_pool_test.go +++ b/pkg/core/mempool/mem_pool_test.go @@ -49,9 +49,7 @@ func testMemPoolAddRemoveWithFeer(t *testing.T, fs Feer) { require.Equal(t, false, ok) // Make sure nothing left in the mempool after removal. assert.Equal(t, 0, len(mp.verifiedMap)) - assert.Equal(t, 0, len(mp.unverifiedMap)) assert.Equal(t, 0, len(mp.verifiedTxes)) - assert.Equal(t, 0, len(mp.unverifiedTxes)) } func TestMemPoolAddRemove(t *testing.T) { diff --git a/pkg/core/mempool/prometheus.go b/pkg/core/mempool/prometheus.go index 3d8a6a585..dbe073a0d 100644 --- a/pkg/core/mempool/prometheus.go +++ b/pkg/core/mempool/prometheus.go @@ -11,24 +11,14 @@ var ( Namespace: "neogo", }, ) - //mempoolUnverifiedTx prometheus metric. - mempoolUnverifiedTx = prometheus.NewGauge( - prometheus.GaugeOpts{ - Help: "Mempool Unverified TXs", - Name: "mempool_unverified_tx", - Namespace: "neogo", - }, - ) ) func init() { prometheus.MustRegister( mempoolUnsortedTx, - mempoolUnverifiedTx, ) } -func updateMempoolMetrics(unsortedTxnLen int, unverifiedTxnLen int) { +func updateMempoolMetrics(unsortedTxnLen int) { mempoolUnsortedTx.Set(float64(unsortedTxnLen)) - mempoolUnverifiedTx.Set(float64(unverifiedTxnLen)) } From e97396e56c68da470378ff0876c8fee809acd239 Mon Sep 17 00:00:00 2001 From: Roman Khimov Date: Thu, 6 Feb 2020 15:57:20 +0300 Subject: [PATCH 14/18] mempool: test addition of conflicting tx to the pool --- pkg/core/mempool/mem_pool_test.go | 1 + 1 file changed, 1 insertion(+) diff --git a/pkg/core/mempool/mem_pool_test.go b/pkg/core/mempool/mem_pool_test.go index 3e846344b..6575ccf4e 100644 --- a/pkg/core/mempool/mem_pool_test.go +++ b/pkg/core/mempool/mem_pool_test.go @@ -80,6 +80,7 @@ func TestMemPoolVerify(t *testing.T) { // The same input as in tx2. tx3.Inputs = append(tx3.Inputs, transaction.Input{PrevHash: inhash2, PrevIndex: 0}) require.Equal(t, false, mp.Verify(tx3)) + require.Error(t, mp.Add(tx3, &FeerStub{})) } func newMinerTX() *transaction.Transaction { From 6672f4b26f9ea3b641b55b730a8b4ffbe119ddf3 Mon Sep 17 00:00:00 2001 From: Roman Khimov Date: Thu, 6 Feb 2020 16:59:20 +0300 Subject: [PATCH 15/18] mempool: iterate over slice in GetVerifiedTransactions() It's more efficient and keeps transactions sorted by priority. --- pkg/core/mempool/mem_pool.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pkg/core/mempool/mem_pool.go b/pkg/core/mempool/mem_pool.go index 023a7fa68..684464fce 100644 --- a/pkg/core/mempool/mem_pool.go +++ b/pkg/core/mempool/mem_pool.go @@ -251,10 +251,10 @@ func (mp *Pool) GetVerifiedTransactions() []*transaction.Transaction { mp.lock.RLock() defer mp.lock.RUnlock() - var t = make([]*transaction.Transaction, len(mp.verifiedMap)) + var t = make([]*transaction.Transaction, len(mp.verifiedTxes)) var i int - for _, p := range mp.verifiedMap { + for _, p := range mp.verifiedTxes { t[i] = p.txn i++ } From 684cbf5bac8b0f40538b67fd53c904b554602ea2 Mon Sep 17 00:00:00 2001 From: Roman Khimov Date: Thu, 6 Feb 2020 17:20:38 +0300 Subject: [PATCH 16/18] mempool: make it almost 100% test-covered --- pkg/core/mempool/mem_pool_test.go | 138 ++++++++++++++++++++++++++++-- 1 file changed, 132 insertions(+), 6 deletions(-) diff --git a/pkg/core/mempool/mem_pool_test.go b/pkg/core/mempool/mem_pool_test.go index 6575ccf4e..4cd529a06 100644 --- a/pkg/core/mempool/mem_pool_test.go +++ b/pkg/core/mempool/mem_pool_test.go @@ -1,6 +1,7 @@ package mempool import ( + "sort" "testing" "github.com/CityOfZion/neo-go/pkg/core/transaction" @@ -35,7 +36,7 @@ func (fs *FeerStub) SystemFee(*transaction.Transaction) util.Fixed8 { func testMemPoolAddRemoveWithFeer(t *testing.T, fs Feer) { mp := NewMemPool(10) - tx := newMinerTX() + tx := newMinerTX(0) _, ok := mp.TryGetValue(tx.Hash()) require.Equal(t, false, ok) require.NoError(t, mp.Add(tx, fs)) @@ -61,19 +62,19 @@ func TestMemPoolAddRemove(t *testing.T) { func TestMemPoolVerify(t *testing.T) { mp := NewMemPool(10) - tx := newMinerTX() + tx := newMinerTX(1) inhash1 := random.Uint256() tx.Inputs = append(tx.Inputs, transaction.Input{PrevHash: inhash1, PrevIndex: 0}) require.Equal(t, true, mp.Verify(tx)) require.NoError(t, mp.Add(tx, &FeerStub{})) - tx2 := newMinerTX() + tx2 := newMinerTX(2) inhash2 := random.Uint256() tx2.Inputs = append(tx2.Inputs, transaction.Input{PrevHash: inhash2, PrevIndex: 0}) require.Equal(t, true, mp.Verify(tx2)) require.NoError(t, mp.Add(tx2, &FeerStub{})) - tx3 := newMinerTX() + tx3 := newMinerTX(3) // Different index number, but the same PrevHash as in tx1. tx3.Inputs = append(tx3.Inputs, transaction.Input{PrevHash: inhash1, PrevIndex: 1}) require.Equal(t, true, mp.Verify(tx3)) @@ -83,9 +84,134 @@ func TestMemPoolVerify(t *testing.T) { require.Error(t, mp.Add(tx3, &FeerStub{})) } -func newMinerTX() *transaction.Transaction { +func newMinerTX(i uint32) *transaction.Transaction { return &transaction.Transaction{ Type: transaction.MinerType, - Data: &transaction.MinerTX{}, + Data: &transaction.MinerTX{Nonce: i}, + } +} + +func TestOverCapacity(t *testing.T) { + var fs = &FeerStub{lowPriority: true} + const mempoolSize = 10 + mp := NewMemPool(mempoolSize) + + for i := 0; i < mempoolSize; i++ { + tx := newMinerTX(uint32(i)) + require.NoError(t, mp.Add(tx, fs)) + } + txcnt := uint32(mempoolSize) + require.Equal(t, mempoolSize, mp.Count()) + require.Equal(t, true, sort.IsSorted(sort.Reverse(mp.verifiedTxes))) + + // Claim TX has more priority than ordinary lowprio, so it should easily + // fit into the pool. + claim := &transaction.Transaction{ + Type: transaction.ClaimType, + Data: &transaction.ClaimTX{}, + } + require.NoError(t, mp.Add(claim, fs)) + require.Equal(t, mempoolSize, mp.Count()) + require.Equal(t, true, sort.IsSorted(sort.Reverse(mp.verifiedTxes))) + + // Fees are also prioritized. + fs.netFee = util.Fixed8FromFloat(0.0001) + for i := 0; i < mempoolSize-1; i++ { + tx := newMinerTX(txcnt) + txcnt++ + require.NoError(t, mp.Add(tx, fs)) + require.Equal(t, mempoolSize, mp.Count()) + require.Equal(t, true, sort.IsSorted(sort.Reverse(mp.verifiedTxes))) + } + // Less prioritized txes are not allowed anymore. + fs.netFee = util.Fixed8FromFloat(0.00001) + tx := newMinerTX(txcnt) + txcnt++ + require.Error(t, mp.Add(tx, fs)) + require.Equal(t, mempoolSize, mp.Count()) + require.Equal(t, true, sort.IsSorted(sort.Reverse(mp.verifiedTxes))) + + // But claim tx should still be there. + require.True(t, mp.ContainsKey(claim.Hash())) + + // Low net fee, but higher per-byte fee is still a better combination. + fs.perByteFee = util.Fixed8FromFloat(0.001) + tx = newMinerTX(txcnt) + txcnt++ + require.NoError(t, mp.Add(tx, fs)) + require.Equal(t, mempoolSize, mp.Count()) + require.Equal(t, true, sort.IsSorted(sort.Reverse(mp.verifiedTxes))) + + // High priority always wins over low priority. + fs.lowPriority = false + for i := 0; i < mempoolSize; i++ { + tx := newMinerTX(txcnt) + txcnt++ + require.NoError(t, mp.Add(tx, fs)) + require.Equal(t, mempoolSize, mp.Count()) + require.Equal(t, true, sort.IsSorted(sort.Reverse(mp.verifiedTxes))) + } + // Good luck with low priority now. + fs.lowPriority = true + tx = newMinerTX(txcnt) + require.Error(t, mp.Add(tx, fs)) + require.Equal(t, mempoolSize, mp.Count()) + require.Equal(t, true, sort.IsSorted(sort.Reverse(mp.verifiedTxes))) +} + +func TestGetVerified(t *testing.T) { + var fs = &FeerStub{lowPriority: true} + const mempoolSize = 10 + mp := NewMemPool(mempoolSize) + + txes := make([]*transaction.Transaction, 0, mempoolSize) + for i := 0; i < mempoolSize; i++ { + tx := newMinerTX(uint32(i)) + txes = append(txes, tx) + require.NoError(t, mp.Add(tx, fs)) + } + require.Equal(t, mempoolSize, mp.Count()) + verTxes := mp.GetVerifiedTransactions() + require.Equal(t, mempoolSize, len(verTxes)) + for _, tx := range verTxes { + require.Contains(t, txes, tx) + } + for _, tx := range txes { + mp.Remove(tx.Hash()) + } + verTxes = mp.GetVerifiedTransactions() + require.Equal(t, 0, len(verTxes)) +} + +func TestRemoveStale(t *testing.T) { + var fs = &FeerStub{lowPriority: true} + const mempoolSize = 10 + mp := NewMemPool(mempoolSize) + + txes1 := make([]*transaction.Transaction, 0, mempoolSize/2) + txes2 := make([]*transaction.Transaction, 0, mempoolSize/2) + for i := 0; i < mempoolSize; i++ { + tx := newMinerTX(uint32(i)) + if i%2 == 0 { + txes1 = append(txes1, tx) + } else { + txes2 = append(txes2, tx) + } + require.NoError(t, mp.Add(tx, fs)) + } + require.Equal(t, mempoolSize, mp.Count()) + mp.RemoveStale(func(t *transaction.Transaction) bool { + for _, tx := range txes2 { + if tx == t { + return true + } + } + return false + }) + require.Equal(t, mempoolSize/2, mp.Count()) + verTxes := mp.GetVerifiedTransactions() + for _, tx := range verTxes { + require.NotContains(t, txes1, tx) + require.Contains(t, txes2, tx) } } From 18695e660b84a23aa612cb233d0c893db1398615 Mon Sep 17 00:00:00 2001 From: Roman Khimov Date: Thu, 6 Feb 2020 17:38:59 +0300 Subject: [PATCH 17/18] mempool: drop RemoveOverCapacity(), handle it right in the Add() Simplifies things a lot and removes useless code. --- pkg/core/mempool/mem_pool.go | 44 ++++++++++++++---------------------- 1 file changed, 17 insertions(+), 27 deletions(-) diff --git a/pkg/core/mempool/mem_pool.go b/pkg/core/mempool/mem_pool.go index 684464fce..a2f3ef9af 100644 --- a/pkg/core/mempool/mem_pool.go +++ b/pkg/core/mempool/mem_pool.go @@ -150,23 +150,27 @@ func (mp *Pool) Add(t *transaction.Transaction, fee Feer) error { n := sort.Search(len(mp.verifiedTxes), func(n int) bool { return pItem.CompareTo(mp.verifiedTxes[n]) > 0 }) - mp.verifiedTxes = append(mp.verifiedTxes, pItem) - if n != len(mp.verifiedTxes) { + + // We've reached our capacity already. + if len(mp.verifiedTxes) == mp.capacity { + // Less prioritized than the least prioritized we already have, won't fit. + if n == len(mp.verifiedTxes) { + mp.lock.Unlock() + return ErrOOM + } + // Ditch the last one. + unlucky := mp.verifiedTxes[len(mp.verifiedTxes)-1] + delete(mp.verifiedMap, unlucky.txn.Hash()) + mp.verifiedTxes[len(mp.verifiedTxes)-1] = pItem + } else { + mp.verifiedTxes = append(mp.verifiedTxes, pItem) + } + if n != len(mp.verifiedTxes)-1 { copy(mp.verifiedTxes[n+1:], mp.verifiedTxes[n:]) mp.verifiedTxes[n] = pItem } - mp.lock.Unlock() - - if mp.Count() > mp.capacity { - mp.RemoveOverCapacity() - } - mp.lock.RLock() - _, ok := mp.verifiedMap[t.Hash()] updateMempoolMetrics(len(mp.verifiedTxes)) - mp.lock.RUnlock() - if !ok { - return ErrOOM - } + mp.lock.Unlock() return nil } @@ -211,20 +215,6 @@ func (mp *Pool) RemoveStale(isOK func(*transaction.Transaction) bool) { mp.lock.Unlock() } -// RemoveOverCapacity removes transactions with lowest fees until the total number of transactions -// in the Pool is within the capacity of the Pool. -func (mp *Pool) RemoveOverCapacity() { - mp.lock.Lock() - num := mp.count() - mp.capacity - for i := 1; i <= num; i++ { - txToDrop := mp.verifiedTxes[len(mp.verifiedTxes)-num].txn - delete(mp.verifiedMap, txToDrop.Hash()) - } - mp.verifiedTxes = mp.verifiedTxes[:len(mp.verifiedTxes)-num] - updateMempoolMetrics(mp.count()) - mp.lock.Unlock() -} - // NewMemPool returns a new Pool struct. func NewMemPool(capacity int) Pool { return Pool{ From 7445655437f0749a5818ae48888aadc58363e6ae Mon Sep 17 00:00:00 2001 From: Roman Khimov Date: Thu, 6 Feb 2020 17:49:05 +0300 Subject: [PATCH 18/18] consensus: switch test to using the new PoolTx API Fixes GolangCI: Error return value of (*github.com/CityOfZion/neo-go/pkg/core/mempool.Pool).Add is not checked (from errcheck) and allows us to almost completely forget about mempool here. --- pkg/consensus/consensus_test.go | 10 ++++------ 1 file changed, 4 insertions(+), 6 deletions(-) diff --git a/pkg/consensus/consensus_test.go b/pkg/consensus/consensus_test.go index 9db01b198..8fc19be22 100644 --- a/pkg/consensus/consensus_test.go +++ b/pkg/consensus/consensus_test.go @@ -21,7 +21,7 @@ func TestNewService(t *testing.T) { Type: transaction.MinerType, Data: &transaction.MinerTX{Nonce: 12345}, } - srv.Chain.GetMemPool().Add(tx, new(feer)) + require.NoError(t, srv.Chain.PoolTx(tx)) var txx []block.Transaction require.NotPanics(t, func() { txx = srv.getVerifiedTx(1) }) @@ -38,9 +38,7 @@ func TestService_GetVerified(t *testing.T) { newMinerTx(3), newMinerTx(4), } - pool := srv.Chain.GetMemPool() - - require.NoError(t, pool.Add(txs[3], new(feer))) + require.NoError(t, srv.Chain.PoolTx(txs[3])) hashes := []util.Uint256{txs[0].Hash(), txs[1].Hash(), txs[2].Hash()} @@ -65,7 +63,7 @@ func TestService_GetVerified(t *testing.T) { t.Run("more than half of the last proposal will be reused", func(t *testing.T) { for _, tx := range txs[:2] { - require.NoError(t, pool.Add(tx, new(feer))) + require.NoError(t, srv.Chain.PoolTx(tx)) } txx := srv.getVerifiedTx(10) @@ -115,7 +113,7 @@ func TestService_getTx(t *testing.T) { require.Equal(t, nil, srv.getTx(h)) - srv.Chain.GetMemPool().Add(tx, new(feer)) + require.NoError(t, srv.Chain.PoolTx(tx)) got := srv.getTx(h) require.NotNil(t, got)