diff --git a/pkg/consensus/consensus.go b/pkg/consensus/consensus.go index 96847d904..8b9d686f1 100644 --- a/pkg/consensus/consensus.go +++ b/pkg/consensus/consensus.go @@ -335,7 +335,11 @@ func (s *service) processBlock(b block.Block) { bb.Script = *(s.getBlockWitness(bb)) if err := s.Chain.AddBlock(bb); err != nil { - s.log.Warn("error on add block", zap.Error(err)) + // The block might already be added via the regular network + // interaction. + if _, errget := s.Chain.GetBlock(bb.Hash()); errget != nil { + s.log.Warn("error on add block", zap.Error(err)) + } } else { s.Config.RelayBlock(bb) } diff --git a/pkg/consensus/consensus_test.go b/pkg/consensus/consensus_test.go index 2d642203a..8fc19be22 100644 --- a/pkg/consensus/consensus_test.go +++ b/pkg/consensus/consensus_test.go @@ -5,7 +5,6 @@ import ( "github.com/CityOfZion/neo-go/config" "github.com/CityOfZion/neo-go/pkg/core" - "github.com/CityOfZion/neo-go/pkg/core/mempool" "github.com/CityOfZion/neo-go/pkg/core/storage" "github.com/CityOfZion/neo-go/pkg/core/transaction" "github.com/CityOfZion/neo-go/pkg/crypto/keys" @@ -22,8 +21,7 @@ func TestNewService(t *testing.T) { Type: transaction.MinerType, Data: &transaction.MinerTX{Nonce: 12345}, } - item := mempool.NewPoolItem(tx, new(feer)) - srv.Chain.GetMemPool().TryAdd(tx.Hash(), item) + require.NoError(t, srv.Chain.PoolTx(tx)) var txx []block.Transaction require.NotPanics(t, func() { txx = srv.getVerifiedTx(1) }) @@ -40,10 +38,7 @@ func TestService_GetVerified(t *testing.T) { newMinerTx(3), newMinerTx(4), } - pool := srv.Chain.GetMemPool() - item := mempool.NewPoolItem(txs[3], new(feer)) - - require.True(t, pool.TryAdd(txs[3].Hash(), item)) + require.NoError(t, srv.Chain.PoolTx(txs[3])) hashes := []util.Uint256{txs[0].Hash(), txs[1].Hash(), txs[2].Hash()} @@ -68,8 +63,7 @@ func TestService_GetVerified(t *testing.T) { t.Run("more than half of the last proposal will be reused", func(t *testing.T) { for _, tx := range txs[:2] { - item := mempool.NewPoolItem(tx, new(feer)) - require.True(t, pool.TryAdd(tx.Hash(), item)) + require.NoError(t, srv.Chain.PoolTx(tx)) } txx := srv.getVerifiedTx(10) @@ -119,8 +113,7 @@ func TestService_getTx(t *testing.T) { require.Equal(t, nil, srv.getTx(h)) - item := mempool.NewPoolItem(tx, new(feer)) - srv.Chain.GetMemPool().TryAdd(h, item) + require.NoError(t, srv.Chain.PoolTx(tx)) got := srv.getTx(h) require.NotNil(t, got) diff --git a/pkg/core/blockchain.go b/pkg/core/blockchain.go index 285a0eb98..0a22cf76b 100644 --- a/pkg/core/blockchain.go +++ b/pkg/core/blockchain.go @@ -7,6 +7,7 @@ import ( "math/big" "sort" "strconv" + "sync" "sync/atomic" "time" @@ -41,6 +42,15 @@ const ( defaultMemPoolSize = 50000 ) +var ( + // ErrAlreadyExists is returned when trying to add some already existing + // transaction into the pool (not specifying whether it exists in the + // chain or mempool). + ErrAlreadyExists = errors.New("already exists") + // ErrOOM is returned when adding transaction to the memory pool because + // it reached its full capacity. + ErrOOM = errors.New("no space left in the memory pool") +) var ( genAmount = []int{8, 7, 6, 5, 4, 3, 2, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1} decrementInterval = 2000000 @@ -51,6 +61,19 @@ var ( type Blockchain struct { config config.ProtocolConfiguration + // The only way chain state changes is by adding blocks, so we can't + // allow concurrent block additions. It differs from the next lock in + // that it's only for AddBlock method itself, the chain state is + // protected by the lock below, but holding it during all of AddBlock + // is too expensive (because the state only changes when persisting + // change cache). + addLock sync.Mutex + + // This lock ensures blockchain immutability for operations that need + // that while performing their tasks. It's mostly used as a read lock + // with the only writer being the block addition logic. + lock sync.RWMutex + // Data access object for CRUD operations around storage. dao *dao @@ -251,6 +274,9 @@ func (bc *Blockchain) Close() { // AddBlock accepts successive block for the Blockchain, verifies it and // stores internally. Eventually it will be persisted to the backing storage. func (bc *Blockchain) AddBlock(block *block.Block) error { + bc.addLock.Lock() + defer bc.addLock.Unlock() + expectedHeight := bc.BlockHeight() + 1 if expectedHeight != block.Index { return fmt.Errorf("expected block %d, but passed block %d", expectedHeight, block.Index) @@ -575,6 +601,9 @@ func (bc *Blockchain) storeBlock(block *block.Block) error { } } } + bc.lock.Lock() + defer bc.lock.Unlock() + _, err := cache.Persist() if err != nil { return err @@ -582,9 +611,7 @@ func (bc *Blockchain) storeBlock(block *block.Block) error { bc.topBlock.Store(block) atomic.StoreUint32(&bc.blockHeight, block.Index) updateBlockHeightMetric(block.Index) - for _, tx := range block.Transactions { - bc.memPool.Remove(tx.Hash()) - } + bc.memPool.RemoveStale(bc.isTxStillRelevant) return nil } @@ -963,8 +990,8 @@ func (bc *Blockchain) IsLowPriority(t *transaction.Transaction) bool { } // GetMemPool returns the memory pool of the blockchain. -func (bc *Blockchain) GetMemPool() mempool.Pool { - return bc.memPool +func (bc *Blockchain) GetMemPool() *mempool.Pool { + return &bc.memPool } // VerifyBlock verifies block against its current state. @@ -982,11 +1009,8 @@ func (bc *Blockchain) VerifyBlock(block *block.Block) error { return bc.verifyBlockWitnesses(block, prevHeader) } -// VerifyTx verifies whether a transaction is bonafide or not. Block parameter -// is used for easy interop access and can be omitted for transactions that are -// not yet added into any block. -// Golang implementation of Verify method in C# (https://github.com/neo-project/neo/blob/master/neo/Network/P2P/Payloads/Transaction.cs#L270). -func (bc *Blockchain) VerifyTx(t *transaction.Transaction, block *block.Block) error { +// verifyTx verifies whether a transaction is bonafide or not. +func (bc *Blockchain) verifyTx(t *transaction.Transaction, block *block.Block) error { if io.GetVarSize(t) > transaction.MaxTransactionSize { return errors.Errorf("invalid transaction size = %d. It shoud be less then MaxTransactionSize = %d", io.GetVarSize(t), transaction.MaxTransactionSize) } @@ -1017,6 +1041,69 @@ func (bc *Blockchain) VerifyTx(t *transaction.Transaction, block *block.Block) e return bc.verifyTxWitnesses(t, block) } +// isTxStillRelevant is a callback for mempool transaction filtering after the +// new block addition. It returns false for transactions already present in the +// chain (added by the new block), transactions using some inputs that are +// already used (double spends) and does witness reverification for non-standard +// contracts. It operates under the assumption that full transaction verification +// was already done so we don't need to check basic things like size, input/output +// correctness, etc. +func (bc *Blockchain) isTxStillRelevant(t *transaction.Transaction) bool { + var recheckWitness bool + + if bc.dao.HasTransaction(t.Hash()) { + return false + } + if bc.dao.IsDoubleSpend(t) { + return false + } + for i := range t.Scripts { + if !vm.IsStandardContract(t.Scripts[i].VerificationScript) { + recheckWitness = true + break + } + } + if recheckWitness { + return bc.verifyTxWitnesses(t, nil) == nil + } + return true + +} + +// VerifyTx verifies whether a transaction is bonafide or not. Block parameter +// is used for easy interop access and can be omitted for transactions that are +// not yet added into any block. +// Golang implementation of Verify method in C# (https://github.com/neo-project/neo/blob/master/neo/Network/P2P/Payloads/Transaction.cs#L270). +func (bc *Blockchain) VerifyTx(t *transaction.Transaction, block *block.Block) error { + bc.lock.RLock() + defer bc.lock.RUnlock() + return bc.verifyTx(t, block) +} + +// PoolTx verifies and tries to add given transaction into the mempool. +func (bc *Blockchain) PoolTx(t *transaction.Transaction) error { + bc.lock.RLock() + defer bc.lock.RUnlock() + + if bc.HasTransaction(t.Hash()) { + return ErrAlreadyExists + } + if err := bc.verifyTx(t, nil); err != nil { + return err + } + if err := bc.memPool.Add(t, bc); err != nil { + switch err { + case mempool.ErrOOM: + return ErrOOM + case mempool.ErrConflict: + return ErrAlreadyExists + default: + return err + } + } + return nil +} + func (bc *Blockchain) verifyInputs(t *transaction.Transaction) bool { for i := 1; i < len(t.Inputs); i++ { for j := 0; j < i; j++ { diff --git a/pkg/core/blockchainer.go b/pkg/core/blockchainer.go index 34b6d70cf..79e6bc08f 100644 --- a/pkg/core/blockchainer.go +++ b/pkg/core/blockchainer.go @@ -40,6 +40,7 @@ type Blockchainer interface { GetUnspentCoinState(util.Uint256) *UnspentCoinState References(t *transaction.Transaction) map[transaction.Input]*transaction.Output mempool.Feer // fee interface + PoolTx(*transaction.Transaction) error VerifyTx(*transaction.Transaction, *block.Block) error - GetMemPool() mempool.Pool + GetMemPool() *mempool.Pool } diff --git a/pkg/core/mempool/mem_pool.go b/pkg/core/mempool/mem_pool.go index 444f2e36e..a2f3ef9af 100644 --- a/pkg/core/mempool/mem_pool.go +++ b/pkg/core/mempool/mem_pool.go @@ -1,6 +1,7 @@ package mempool import ( + "errors" "sort" "sync" "time" @@ -9,43 +10,62 @@ import ( "github.com/CityOfZion/neo-go/pkg/util" ) -// Item represents a transaction in the the Memory pool. -type Item struct { - txn *transaction.Transaction - timeStamp time.Time - fee Feer +var ( + // ErrConflict is returned when transaction being added is incompatible + // with the contents of the memory pool (using the same inputs as some + // other transaction in the pool) + ErrConflict = errors.New("conflicts with the memory pool") + // ErrDup is returned when transaction being added is already present + // in the memory pool. + ErrDup = errors.New("already in the memory pool") + // ErrOOM is returned when transaction just doesn't fit in the memory + // pool because of its capacity constraints. + ErrOOM = errors.New("out of memory") +) + +// item represents a transaction in the the Memory pool. +type item struct { + txn *transaction.Transaction + timeStamp time.Time + perByteFee util.Fixed8 + netFee util.Fixed8 + isLowPrio bool } -// Items is a slice of Item. -type Items []*Item +// items is a slice of item. +type items []*item // Pool stores the unconfirms transactions. type Pool struct { - lock *sync.RWMutex - unsortedTxn map[util.Uint256]*Item - unverifiedTxn map[util.Uint256]*Item - sortedHighPrioTxn Items - sortedLowPrioTxn Items - unverifiedSortedHighPrioTxn Items - unverifiedSortedLowPrioTxn Items + lock sync.RWMutex + verifiedMap map[util.Uint256]*item + verifiedTxes items capacity int } -func (p Items) Len() int { return len(p) } -func (p Items) Swap(i, j int) { p[i], p[j] = p[j], p[i] } -func (p Items) Less(i, j int) bool { return p[i].CompareTo(p[j]) < 0 } +func (p items) Len() int { return len(p) } +func (p items) Swap(i, j int) { p[i], p[j] = p[j], p[i] } +func (p items) Less(i, j int) bool { return p[i].CompareTo(p[j]) < 0 } -// CompareTo returns the difference between two Items. +// CompareTo returns the difference between two items. // difference < 0 implies p < otherP. // difference = 0 implies p = otherP. // difference > 0 implies p > otherP. -func (p Item) CompareTo(otherP *Item) int { +func (p *item) CompareTo(otherP *item) int { if otherP == nil { return 1 } - if p.fee.IsLowPriority(p.txn) && p.fee.IsLowPriority(otherP.txn) { + if !p.isLowPrio && otherP.isLowPrio { + return 1 + } + + if p.isLowPrio && !otherP.isLowPrio { + return -1 + } + + if p.isLowPrio && otherP.isLowPrio { thisIsClaimTx := p.txn.Type == transaction.ClaimType otherIsClaimTx := otherP.txn.Type == transaction.ClaimType @@ -60,15 +80,11 @@ func (p Item) CompareTo(otherP *Item) int { } // Fees sorted ascending. - pFPB := p.fee.FeePerByte(p.txn) - otherFPB := p.fee.FeePerByte(otherP.txn) - if ret := pFPB.CompareTo(otherFPB); ret != 0 { + if ret := p.perByteFee.CompareTo(otherP.perByteFee); ret != 0 { return ret } - pNF := p.fee.NetworkFee(p.txn) - otherNF := p.fee.NetworkFee(otherP.txn) - if ret := pNF.CompareTo(otherNF); ret != 0 { + if ret := p.netFee.CompareTo(otherP.netFee); ret != 0 { return ret } @@ -77,205 +93,158 @@ func (p Item) CompareTo(otherP *Item) int { } // Count returns the total number of uncofirm transactions. -func (mp Pool) Count() int { +func (mp *Pool) Count() int { mp.lock.RLock() defer mp.lock.RUnlock() + return mp.count() +} - return len(mp.unsortedTxn) + len(mp.unverifiedTxn) +// count is an internal unlocked version of Count. +func (mp *Pool) count() int { + return len(mp.verifiedTxes) } // ContainsKey checks if a transactions hash is in the Pool. -func (mp Pool) ContainsKey(hash util.Uint256) bool { +func (mp *Pool) ContainsKey(hash util.Uint256) bool { mp.lock.RLock() defer mp.lock.RUnlock() - if _, ok := mp.unsortedTxn[hash]; ok { - return true - } + return mp.containsKey(hash) +} - if _, ok := mp.unverifiedTxn[hash]; ok { +// containsKey is an internal unlocked version of ContainsKey. +func (mp *Pool) containsKey(hash util.Uint256) bool { + if _, ok := mp.verifiedMap[hash]; ok { return true } return false } -// TryAdd try to add the Item to the Pool. -func (mp Pool) TryAdd(hash util.Uint256, pItem *Item) bool { - var pool Items - +// Add tries to add given transaction to the Pool. +func (mp *Pool) Add(t *transaction.Transaction, fee Feer) error { + var pItem = &item{ + txn: t, + timeStamp: time.Now().UTC(), + perByteFee: fee.FeePerByte(t), + netFee: fee.NetworkFee(t), + isLowPrio: fee.IsLowPriority(t), + } mp.lock.Lock() - if _, ok := mp.unsortedTxn[hash]; ok { + if !mp.verifyInputs(t) { mp.lock.Unlock() - return false + return ErrConflict + } + if mp.containsKey(t.Hash()) { + mp.lock.Unlock() + return ErrDup } - mp.unsortedTxn[hash] = pItem - mp.lock.Unlock() - if pItem.fee.IsLowPriority(pItem.txn) { - pool = mp.sortedLowPrioTxn + mp.verifiedMap[t.Hash()] = pItem + // Insert into sorted array (from max to min, that could also be done + // using sort.Sort(sort.Reverse()), but it incurs more overhead. Notice + // also that we're searching for position that is strictly more + // prioritized than our new item because we do expect a lot of + // transactions with the same priority and appending to the end of the + // slice is always more efficient. + n := sort.Search(len(mp.verifiedTxes), func(n int) bool { + return pItem.CompareTo(mp.verifiedTxes[n]) > 0 + }) + + // We've reached our capacity already. + if len(mp.verifiedTxes) == mp.capacity { + // Less prioritized than the least prioritized we already have, won't fit. + if n == len(mp.verifiedTxes) { + mp.lock.Unlock() + return ErrOOM + } + // Ditch the last one. + unlucky := mp.verifiedTxes[len(mp.verifiedTxes)-1] + delete(mp.verifiedMap, unlucky.txn.Hash()) + mp.verifiedTxes[len(mp.verifiedTxes)-1] = pItem } else { - pool = mp.sortedHighPrioTxn + mp.verifiedTxes = append(mp.verifiedTxes, pItem) } - - mp.lock.Lock() - pool = append(pool, pItem) - sort.Sort(pool) + if n != len(mp.verifiedTxes)-1 { + copy(mp.verifiedTxes[n+1:], mp.verifiedTxes[n:]) + mp.verifiedTxes[n] = pItem + } + updateMempoolMetrics(len(mp.verifiedTxes)) mp.lock.Unlock() - - if mp.Count() > mp.capacity { - (&mp).RemoveOverCapacity() - } - mp.lock.RLock() - _, ok := mp.unsortedTxn[hash] - updateMempoolMetrics(len(mp.unsortedTxn), len(mp.unverifiedTxn)) - mp.lock.RUnlock() - return ok + return nil } // Remove removes an item from the mempool, if it exists there (and does // nothing if it doesn't). func (mp *Pool) Remove(hash util.Uint256) { - var mapAndPools = []struct { - unsortedMap map[util.Uint256]*Item - sortedPools []*Items - }{ - {unsortedMap: mp.unsortedTxn, sortedPools: []*Items{&mp.sortedHighPrioTxn, &mp.sortedLowPrioTxn}}, - {unsortedMap: mp.unverifiedTxn, sortedPools: []*Items{&mp.unverifiedSortedHighPrioTxn, &mp.unverifiedSortedLowPrioTxn}}, - } mp.lock.Lock() - for _, mapAndPool := range mapAndPools { - if _, ok := mapAndPool.unsortedMap[hash]; ok { - delete(mapAndPool.unsortedMap, hash) - for _, pool := range mapAndPool.sortedPools { - var num int - var item *Item - for num, item = range *pool { - if hash.Equals(item.txn.Hash()) { - break - } - } - if num < len(*pool)-1 { - *pool = append((*pool)[:num], (*pool)[num+1:]...) - } else if num == len(*pool)-1 { - *pool = (*pool)[:num] - } + if _, ok := mp.verifiedMap[hash]; ok { + var num int + delete(mp.verifiedMap, hash) + for num := range mp.verifiedTxes { + if hash.Equals(mp.verifiedTxes[num].txn.Hash()) { + break } } + if num < len(mp.verifiedTxes)-1 { + mp.verifiedTxes = append(mp.verifiedTxes[:num], mp.verifiedTxes[num+1:]...) + } else if num == len(mp.verifiedTxes)-1 { + mp.verifiedTxes = mp.verifiedTxes[:num] + } } - updateMempoolMetrics(len(mp.unsortedTxn), len(mp.unverifiedTxn)) + updateMempoolMetrics(len(mp.verifiedTxes)) mp.lock.Unlock() } -// RemoveOverCapacity removes transactions with lowest fees until the total number of transactions -// in the Pool is within the capacity of the Pool. -func (mp *Pool) RemoveOverCapacity() { - for mp.Count()-mp.capacity > 0 { - mp.lock.Lock() - if minItem, argPosition := getLowestFeeTransaction(mp.sortedLowPrioTxn, mp.unverifiedSortedLowPrioTxn); minItem != nil { - if argPosition == 1 { - // minItem belongs to the mp.sortedLowPrioTxn slice. - // The corresponding unsorted pool is is mp.unsortedTxn. - delete(mp.unsortedTxn, minItem.txn.Hash()) - mp.sortedLowPrioTxn = append(mp.sortedLowPrioTxn[:0], mp.sortedLowPrioTxn[1:]...) - } else { - // minItem belongs to the mp.unverifiedSortedLowPrioTxn slice. - // The corresponding unsorted pool is is mp.unverifiedTxn. - delete(mp.unverifiedTxn, minItem.txn.Hash()) - mp.unverifiedSortedLowPrioTxn = append(mp.unverifiedSortedLowPrioTxn[:0], mp.unverifiedSortedLowPrioTxn[1:]...) - - } - } else if minItem, argPosition := getLowestFeeTransaction(mp.sortedHighPrioTxn, mp.unverifiedSortedHighPrioTxn); minItem != nil { - if argPosition == 1 { - // minItem belongs to the mp.sortedHighPrioTxn slice. - // The corresponding unsorted pool is is mp.unsortedTxn. - delete(mp.unsortedTxn, minItem.txn.Hash()) - mp.sortedHighPrioTxn = append(mp.sortedHighPrioTxn[:0], mp.sortedHighPrioTxn[1:]...) - } else { - // minItem belongs to the mp.unverifiedSortedHighPrioTxn slice. - // The corresponding unsorted pool is is mp.unverifiedTxn. - delete(mp.unverifiedTxn, minItem.txn.Hash()) - mp.unverifiedSortedHighPrioTxn = append(mp.unverifiedSortedHighPrioTxn[:0], mp.unverifiedSortedHighPrioTxn[1:]...) - - } +// RemoveStale filters verified transactions through the given function keeping +// only the transactions for which it returns a true result. It's used to quickly +// drop part of the mempool that is now invalid after the block acceptance. +func (mp *Pool) RemoveStale(isOK func(*transaction.Transaction) bool) { + mp.lock.Lock() + // We expect a lot of changes, so it's easier to allocate a new slice + // rather than move things in an old one. + newVerifiedTxes := make([]*item, 0, mp.capacity) + for _, itm := range mp.verifiedTxes { + if isOK(itm.txn) { + newVerifiedTxes = append(newVerifiedTxes, itm) + } else { + delete(mp.verifiedMap, itm.txn.Hash()) } - updateMempoolMetrics(len(mp.unsortedTxn), len(mp.unverifiedTxn)) - mp.lock.Unlock() - } - -} - -// NewPoolItem returns a new Item. -func NewPoolItem(t *transaction.Transaction, fee Feer) *Item { - return &Item{ - txn: t, - timeStamp: time.Now().UTC(), - fee: fee, } + mp.verifiedTxes = newVerifiedTxes + mp.lock.Unlock() } // NewMemPool returns a new Pool struct. func NewMemPool(capacity int) Pool { return Pool{ - lock: new(sync.RWMutex), - unsortedTxn: make(map[util.Uint256]*Item), - unverifiedTxn: make(map[util.Uint256]*Item), - capacity: capacity, + verifiedMap: make(map[util.Uint256]*item), + verifiedTxes: make([]*item, 0, capacity), + capacity: capacity, } } // TryGetValue returns a transaction if it exists in the memory pool. -func (mp Pool) TryGetValue(hash util.Uint256) (*transaction.Transaction, bool) { +func (mp *Pool) TryGetValue(hash util.Uint256) (*transaction.Transaction, bool) { mp.lock.RLock() defer mp.lock.RUnlock() - if pItem, ok := mp.unsortedTxn[hash]; ok { - return pItem.txn, ok - } - - if pItem, ok := mp.unverifiedTxn[hash]; ok { + if pItem, ok := mp.verifiedMap[hash]; ok { return pItem.txn, ok } return nil, false } -// getLowestFeeTransaction returns the Item with the lowest fee amongst the "verifiedTxnSorted" -// and "unverifiedTxnSorted" Items along with a integer. The integer can assume two values, 1 and 2 which indicate -// that the Item with the lowest fee was found in "verifiedTxnSorted" respectively in "unverifiedTxnSorted". -// "verifiedTxnSorted" and "unverifiedTxnSorted" are sorted slice order by transaction fee ascending. This means that -// the transaction with lowest fee start at index 0. -// Reference: GetLowestFeeTransaction method in C# (https://github.com/neo-project/neo/blob/master/neo/Ledger/MemoryPool.cs) -func getLowestFeeTransaction(verifiedTxnSorted Items, unverifiedTxnSorted Items) (*Item, int) { - minItem := min(unverifiedTxnSorted) - verifiedMin := min(verifiedTxnSorted) - if verifiedMin == nil || (minItem != nil && verifiedMin.CompareTo(minItem) >= 0) { - return minItem, 2 - } - - minItem = verifiedMin - return minItem, 1 - -} - -// min returns the minimum item in a ascending sorted slice of pool items. -// The function can't be applied to unsorted slice! -func min(sortedPool Items) *Item { - if len(sortedPool) == 0 { - return nil - } - return sortedPool[0] -} - // GetVerifiedTransactions returns a slice of Input from all the transactions in the memory pool // whose hash is not included in excludedHashes. func (mp *Pool) GetVerifiedTransactions() []*transaction.Transaction { mp.lock.RLock() defer mp.lock.RUnlock() - var t = make([]*transaction.Transaction, len(mp.unsortedTxn)) + var t = make([]*transaction.Transaction, len(mp.verifiedTxes)) var i int - for _, p := range mp.unsortedTxn { + for _, p := range mp.verifiedTxes { t[i] = p.txn i++ } @@ -283,16 +252,16 @@ func (mp *Pool) GetVerifiedTransactions() []*transaction.Transaction { return t } -// Verify verifies if the inputs of a transaction tx are already used in any other transaction in the memory pool. -// If yes, the transaction tx is not a valid transaction and the function return false. -// If no, the transaction tx is a valid transaction and the function return true. -func (mp Pool) Verify(tx *transaction.Transaction) bool { - mp.lock.RLock() - defer mp.lock.RUnlock() - for _, item := range mp.unsortedTxn { - for i := range item.txn.Inputs { +// verifyInputs is an internal unprotected version of Verify. +func (mp *Pool) verifyInputs(tx *transaction.Transaction) bool { + if len(tx.Inputs) == 0 { + return true + } + for num := range mp.verifiedTxes { + txn := mp.verifiedTxes[num].txn + for i := range txn.Inputs { for j := 0; j < len(tx.Inputs); j++ { - if item.txn.Inputs[i] == tx.Inputs[j] { + if txn.Inputs[i] == tx.Inputs[j] { return false } } @@ -301,3 +270,12 @@ func (mp Pool) Verify(tx *transaction.Transaction) bool { return true } + +// Verify verifies if the inputs of a transaction tx are already used in any other transaction in the memory pool. +// If yes, the transaction tx is not a valid transaction and the function return false. +// If no, the transaction tx is a valid transaction and the function return true. +func (mp *Pool) Verify(tx *transaction.Transaction) bool { + mp.lock.RLock() + defer mp.lock.RUnlock() + return mp.verifyInputs(tx) +} diff --git a/pkg/core/mempool/mem_pool_test.go b/pkg/core/mempool/mem_pool_test.go index 6da40043b..4cd529a06 100644 --- a/pkg/core/mempool/mem_pool_test.go +++ b/pkg/core/mempool/mem_pool_test.go @@ -1,6 +1,7 @@ package mempool import ( + "sort" "testing" "github.com/CityOfZion/neo-go/pkg/core/transaction" @@ -35,13 +36,12 @@ func (fs *FeerStub) SystemFee(*transaction.Transaction) util.Fixed8 { func testMemPoolAddRemoveWithFeer(t *testing.T, fs Feer) { mp := NewMemPool(10) - tx := newMinerTX() - item := NewPoolItem(tx, fs) + tx := newMinerTX(0) _, ok := mp.TryGetValue(tx.Hash()) require.Equal(t, false, ok) - require.Equal(t, true, mp.TryAdd(tx.Hash(), item)) + require.NoError(t, mp.Add(tx, fs)) // Re-adding should fail. - require.Equal(t, false, mp.TryAdd(tx.Hash(), item)) + require.Error(t, mp.Add(tx, fs)) tx2, ok := mp.TryGetValue(tx.Hash()) require.Equal(t, true, ok) require.Equal(t, tx, tx2) @@ -49,12 +49,8 @@ func testMemPoolAddRemoveWithFeer(t *testing.T, fs Feer) { _, ok = mp.TryGetValue(tx.Hash()) require.Equal(t, false, ok) // Make sure nothing left in the mempool after removal. - assert.Equal(t, 0, len(mp.unsortedTxn)) - assert.Equal(t, 0, len(mp.unverifiedTxn)) - assert.Equal(t, 0, len(mp.sortedHighPrioTxn)) - assert.Equal(t, 0, len(mp.sortedLowPrioTxn)) - assert.Equal(t, 0, len(mp.unverifiedSortedHighPrioTxn)) - assert.Equal(t, 0, len(mp.unverifiedSortedLowPrioTxn)) + assert.Equal(t, 0, len(mp.verifiedMap)) + assert.Equal(t, 0, len(mp.verifiedTxes)) } func TestMemPoolAddRemove(t *testing.T) { @@ -66,32 +62,156 @@ func TestMemPoolAddRemove(t *testing.T) { func TestMemPoolVerify(t *testing.T) { mp := NewMemPool(10) - tx := newMinerTX() + tx := newMinerTX(1) inhash1 := random.Uint256() tx.Inputs = append(tx.Inputs, transaction.Input{PrevHash: inhash1, PrevIndex: 0}) require.Equal(t, true, mp.Verify(tx)) - item := NewPoolItem(tx, &FeerStub{}) - require.Equal(t, true, mp.TryAdd(tx.Hash(), item)) + require.NoError(t, mp.Add(tx, &FeerStub{})) - tx2 := newMinerTX() + tx2 := newMinerTX(2) inhash2 := random.Uint256() tx2.Inputs = append(tx2.Inputs, transaction.Input{PrevHash: inhash2, PrevIndex: 0}) require.Equal(t, true, mp.Verify(tx2)) - item = NewPoolItem(tx2, &FeerStub{}) - require.Equal(t, true, mp.TryAdd(tx2.Hash(), item)) + require.NoError(t, mp.Add(tx2, &FeerStub{})) - tx3 := newMinerTX() + tx3 := newMinerTX(3) // Different index number, but the same PrevHash as in tx1. tx3.Inputs = append(tx3.Inputs, transaction.Input{PrevHash: inhash1, PrevIndex: 1}) require.Equal(t, true, mp.Verify(tx3)) // The same input as in tx2. tx3.Inputs = append(tx3.Inputs, transaction.Input{PrevHash: inhash2, PrevIndex: 0}) require.Equal(t, false, mp.Verify(tx3)) + require.Error(t, mp.Add(tx3, &FeerStub{})) } -func newMinerTX() *transaction.Transaction { +func newMinerTX(i uint32) *transaction.Transaction { return &transaction.Transaction{ Type: transaction.MinerType, - Data: &transaction.MinerTX{}, + Data: &transaction.MinerTX{Nonce: i}, + } +} + +func TestOverCapacity(t *testing.T) { + var fs = &FeerStub{lowPriority: true} + const mempoolSize = 10 + mp := NewMemPool(mempoolSize) + + for i := 0; i < mempoolSize; i++ { + tx := newMinerTX(uint32(i)) + require.NoError(t, mp.Add(tx, fs)) + } + txcnt := uint32(mempoolSize) + require.Equal(t, mempoolSize, mp.Count()) + require.Equal(t, true, sort.IsSorted(sort.Reverse(mp.verifiedTxes))) + + // Claim TX has more priority than ordinary lowprio, so it should easily + // fit into the pool. + claim := &transaction.Transaction{ + Type: transaction.ClaimType, + Data: &transaction.ClaimTX{}, + } + require.NoError(t, mp.Add(claim, fs)) + require.Equal(t, mempoolSize, mp.Count()) + require.Equal(t, true, sort.IsSorted(sort.Reverse(mp.verifiedTxes))) + + // Fees are also prioritized. + fs.netFee = util.Fixed8FromFloat(0.0001) + for i := 0; i < mempoolSize-1; i++ { + tx := newMinerTX(txcnt) + txcnt++ + require.NoError(t, mp.Add(tx, fs)) + require.Equal(t, mempoolSize, mp.Count()) + require.Equal(t, true, sort.IsSorted(sort.Reverse(mp.verifiedTxes))) + } + // Less prioritized txes are not allowed anymore. + fs.netFee = util.Fixed8FromFloat(0.00001) + tx := newMinerTX(txcnt) + txcnt++ + require.Error(t, mp.Add(tx, fs)) + require.Equal(t, mempoolSize, mp.Count()) + require.Equal(t, true, sort.IsSorted(sort.Reverse(mp.verifiedTxes))) + + // But claim tx should still be there. + require.True(t, mp.ContainsKey(claim.Hash())) + + // Low net fee, but higher per-byte fee is still a better combination. + fs.perByteFee = util.Fixed8FromFloat(0.001) + tx = newMinerTX(txcnt) + txcnt++ + require.NoError(t, mp.Add(tx, fs)) + require.Equal(t, mempoolSize, mp.Count()) + require.Equal(t, true, sort.IsSorted(sort.Reverse(mp.verifiedTxes))) + + // High priority always wins over low priority. + fs.lowPriority = false + for i := 0; i < mempoolSize; i++ { + tx := newMinerTX(txcnt) + txcnt++ + require.NoError(t, mp.Add(tx, fs)) + require.Equal(t, mempoolSize, mp.Count()) + require.Equal(t, true, sort.IsSorted(sort.Reverse(mp.verifiedTxes))) + } + // Good luck with low priority now. + fs.lowPriority = true + tx = newMinerTX(txcnt) + require.Error(t, mp.Add(tx, fs)) + require.Equal(t, mempoolSize, mp.Count()) + require.Equal(t, true, sort.IsSorted(sort.Reverse(mp.verifiedTxes))) +} + +func TestGetVerified(t *testing.T) { + var fs = &FeerStub{lowPriority: true} + const mempoolSize = 10 + mp := NewMemPool(mempoolSize) + + txes := make([]*transaction.Transaction, 0, mempoolSize) + for i := 0; i < mempoolSize; i++ { + tx := newMinerTX(uint32(i)) + txes = append(txes, tx) + require.NoError(t, mp.Add(tx, fs)) + } + require.Equal(t, mempoolSize, mp.Count()) + verTxes := mp.GetVerifiedTransactions() + require.Equal(t, mempoolSize, len(verTxes)) + for _, tx := range verTxes { + require.Contains(t, txes, tx) + } + for _, tx := range txes { + mp.Remove(tx.Hash()) + } + verTxes = mp.GetVerifiedTransactions() + require.Equal(t, 0, len(verTxes)) +} + +func TestRemoveStale(t *testing.T) { + var fs = &FeerStub{lowPriority: true} + const mempoolSize = 10 + mp := NewMemPool(mempoolSize) + + txes1 := make([]*transaction.Transaction, 0, mempoolSize/2) + txes2 := make([]*transaction.Transaction, 0, mempoolSize/2) + for i := 0; i < mempoolSize; i++ { + tx := newMinerTX(uint32(i)) + if i%2 == 0 { + txes1 = append(txes1, tx) + } else { + txes2 = append(txes2, tx) + } + require.NoError(t, mp.Add(tx, fs)) + } + require.Equal(t, mempoolSize, mp.Count()) + mp.RemoveStale(func(t *transaction.Transaction) bool { + for _, tx := range txes2 { + if tx == t { + return true + } + } + return false + }) + require.Equal(t, mempoolSize/2, mp.Count()) + verTxes := mp.GetVerifiedTransactions() + for _, tx := range verTxes { + require.NotContains(t, txes1, tx) + require.Contains(t, txes2, tx) } } diff --git a/pkg/core/mempool/prometheus.go b/pkg/core/mempool/prometheus.go index 3d8a6a585..dbe073a0d 100644 --- a/pkg/core/mempool/prometheus.go +++ b/pkg/core/mempool/prometheus.go @@ -11,24 +11,14 @@ var ( Namespace: "neogo", }, ) - //mempoolUnverifiedTx prometheus metric. - mempoolUnverifiedTx = prometheus.NewGauge( - prometheus.GaugeOpts{ - Help: "Mempool Unverified TXs", - Name: "mempool_unverified_tx", - Namespace: "neogo", - }, - ) ) func init() { prometheus.MustRegister( mempoolUnsortedTx, - mempoolUnverifiedTx, ) } -func updateMempoolMetrics(unsortedTxnLen int, unverifiedTxnLen int) { +func updateMempoolMetrics(unsortedTxnLen int) { mempoolUnsortedTx.Set(float64(unsortedTxnLen)) - mempoolUnverifiedTx.Set(float64(unverifiedTxnLen)) } diff --git a/pkg/network/blockqueue.go b/pkg/network/blockqueue.go index e26d96629..2911ffced 100644 --- a/pkg/network/blockqueue.go +++ b/pkg/network/blockqueue.go @@ -12,9 +12,10 @@ type blockQueue struct { queue *queue.PriorityQueue checkBlocks chan struct{} chain core.Blockchainer + relayF func(*block.Block) } -func newBlockQueue(capacity int, bc core.Blockchainer, log *zap.Logger) *blockQueue { +func newBlockQueue(capacity int, bc core.Blockchainer, log *zap.Logger, relayer func(*block.Block)) *blockQueue { if log == nil { return nil } @@ -24,6 +25,7 @@ func newBlockQueue(capacity int, bc core.Blockchainer, log *zap.Logger) *blockQu queue: queue.NewPriorityQueue(capacity, false), checkBlocks: make(chan struct{}, 1), chain: bc, + relayF: relayer, } } @@ -45,10 +47,15 @@ func (bq *blockQueue) run() { if minblock.Index == bq.chain.BlockHeight()+1 { err := bq.chain.AddBlock(minblock) if err != nil { - bq.log.Warn("blockQueue: failed adding block into the blockchain", - zap.String("error", err.Error()), - zap.Uint32("blockHeight", bq.chain.BlockHeight()), - zap.Uint32("nextIndex", minblock.Index)) + // The block might already be added by consensus. + if _, errget := bq.chain.GetBlock(minblock.Hash()); errget != nil { + bq.log.Warn("blockQueue: failed adding block into the blockchain", + zap.String("error", err.Error()), + zap.Uint32("blockHeight", bq.chain.BlockHeight()), + zap.Uint32("nextIndex", minblock.Index)) + } + } else if bq.relayF != nil { + bq.relayF(minblock) } } } else { diff --git a/pkg/network/blockqueue_test.go b/pkg/network/blockqueue_test.go index 62a43595f..af3540096 100644 --- a/pkg/network/blockqueue_test.go +++ b/pkg/network/blockqueue_test.go @@ -12,7 +12,7 @@ import ( func TestBlockQueue(t *testing.T) { chain := &testChain{} // notice, it's not yet running - bq := newBlockQueue(0, chain, zaptest.NewLogger(t)) + bq := newBlockQueue(0, chain, zaptest.NewLogger(t), nil) blocks := make([]*block.Block, 11) for i := 1; i < 11; i++ { blocks[i] = &block.Block{Base: block.Base{Index: uint32(i)}} diff --git a/pkg/network/helper_test.go b/pkg/network/helper_test.go index 823d78525..1ab191d87 100644 --- a/pkg/network/helper_test.go +++ b/pkg/network/helper_test.go @@ -118,7 +118,7 @@ func (chain testChain) GetUnspentCoinState(util.Uint256) *core.UnspentCoinState panic("TODO") } -func (chain testChain) GetMemPool() mempool.Pool { +func (chain testChain) GetMemPool() *mempool.Pool { panic("TODO") } @@ -126,6 +126,10 @@ func (chain testChain) IsLowPriority(*transaction.Transaction) bool { panic("TODO") } +func (chain testChain) PoolTx(*transaction.Transaction) error { + panic("TODO") +} + func (chain testChain) VerifyTx(*transaction.Transaction, *block.Block) error { panic("TODO") } diff --git a/pkg/network/server.go b/pkg/network/server.go index c62d7f34c..5a45ed393 100644 --- a/pkg/network/server.go +++ b/pkg/network/server.go @@ -13,7 +13,6 @@ import ( "github.com/CityOfZion/neo-go/pkg/consensus" "github.com/CityOfZion/neo-go/pkg/core" "github.com/CityOfZion/neo-go/pkg/core/block" - "github.com/CityOfZion/neo-go/pkg/core/mempool" "github.com/CityOfZion/neo-go/pkg/core/transaction" "github.com/CityOfZion/neo-go/pkg/network/payload" "github.com/CityOfZion/neo-go/pkg/util" @@ -91,7 +90,6 @@ func NewServer(config ServerConfig, chain core.Blockchainer, log *zap.Logger) (* s := &Server{ ServerConfig: config, chain: chain, - bQueue: newBlockQueue(maxBlockBatch, chain, log), id: randomID(), quit: make(chan struct{}), register: make(chan Peer), @@ -100,6 +98,7 @@ func NewServer(config ServerConfig, chain core.Blockchainer, log *zap.Logger) (* connected: atomic.NewBool(false), log: log, } + s.bQueue = newBlockQueue(maxBlockBatch, chain, log, s.relayBlock) srv, err := consensus.NewService(consensus.Config{ Logger: log, @@ -735,7 +734,11 @@ func (s *Server) broadcastHPMessage(msg *Message) { // relayBlock tells all the other connected nodes about the given block. func (s *Server) relayBlock(b *block.Block) { msg := s.MkMsg(CMDInv, payload.NewInventory(payload.BlockType, []util.Uint256{b.Hash()})) - s.broadcastMessage(msg) + // Filter out nodes that are more current (avoid spamming the network + // during initial sync). + s.iteratePeersWithSendMsg(msg, Peer.EnqueuePacket, func(p Peer) bool { + return p.Handshaked() && p.LastBlockIndex() < b.Index + }) } // verifyAndPoolTX verifies the TX and adds it to the local mempool. @@ -743,17 +746,18 @@ func (s *Server) verifyAndPoolTX(t *transaction.Transaction) RelayReason { if t.Type == transaction.MinerType { return RelayInvalid } - if s.chain.HasTransaction(t.Hash()) { - return RelayAlreadyExists - } - if err := s.chain.VerifyTx(t, nil); err != nil { - return RelayInvalid - } // TODO: Implement Plugin.CheckPolicy? //if (!Plugin.CheckPolicy(transaction)) // return RelayResultReason.PolicyFail; - if ok := s.chain.GetMemPool().TryAdd(t.Hash(), mempool.NewPoolItem(t, s.chain)); !ok { - return RelayOutOfMemory + if err := s.chain.PoolTx(t); err != nil { + switch err { + case core.ErrAlreadyExists: + return RelayAlreadyExists + case core.ErrOOM: + return RelayOutOfMemory + default: + return RelayInvalid + } } return RelaySucceed }