diff --git a/pkg/consensus/consensus_test.go b/pkg/consensus/consensus_test.go index 2d642203a..654f86ade 100644 --- a/pkg/consensus/consensus_test.go +++ b/pkg/consensus/consensus_test.go @@ -43,7 +43,7 @@ func TestService_GetVerified(t *testing.T) { pool := srv.Chain.GetMemPool() item := mempool.NewPoolItem(txs[3], new(feer)) - require.True(t, pool.TryAdd(txs[3].Hash(), item)) + require.NoError(t, pool.TryAdd(txs[3].Hash(), item)) hashes := []util.Uint256{txs[0].Hash(), txs[1].Hash(), txs[2].Hash()} @@ -69,7 +69,7 @@ func TestService_GetVerified(t *testing.T) { t.Run("more than half of the last proposal will be reused", func(t *testing.T) { for _, tx := range txs[:2] { item := mempool.NewPoolItem(tx, new(feer)) - require.True(t, pool.TryAdd(tx.Hash(), item)) + require.NoError(t, pool.TryAdd(tx.Hash(), item)) } txx := srv.getVerifiedTx(10) diff --git a/pkg/core/blockchain.go b/pkg/core/blockchain.go index 6c9b1dffa..48491edd6 100644 --- a/pkg/core/blockchain.go +++ b/pkg/core/blockchain.go @@ -7,6 +7,7 @@ import ( "math/big" "sort" "strconv" + "sync" "sync/atomic" "time" @@ -41,6 +42,15 @@ const ( defaultMemPoolSize = 50000 ) +var ( + // ErrAlreadyExists is returned when trying to add some already existing + // transaction into the pool (not specifying whether it exists in the + // chain or mempool). + ErrAlreadyExists = errors.New("already exists") + // ErrOOM is returned when adding transaction to the memory pool because + // it reached its full capacity. + ErrOOM = errors.New("no space left in the memory pool") +) var ( genAmount = []int{8, 7, 6, 5, 4, 3, 2, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1} decrementInterval = 2000000 @@ -51,6 +61,19 @@ var ( type Blockchain struct { config config.ProtocolConfiguration + // The only way chain state changes is by adding blocks, so we can't + // allow concurrent block additions. It differs from the next lock in + // that it's only for AddBlock method itself, the chain state is + // protected by the lock below, but holding it during all of AddBlock + // is too expensive (because the state only changes when persisting + // change cache). + addLock sync.Mutex + + // This lock ensures blockchain immutability for operations that need + // that while performing their tasks. It's mostly used as a read lock + // with the only writer being the block addition logic. + lock sync.RWMutex + // Data access object for CRUD operations around storage. dao *dao @@ -251,6 +274,9 @@ func (bc *Blockchain) Close() { // AddBlock accepts successive block for the Blockchain, verifies it and // stores internally. Eventually it will be persisted to the backing storage. func (bc *Blockchain) AddBlock(block *block.Block) error { + bc.addLock.Lock() + defer bc.addLock.Unlock() + expectedHeight := bc.BlockHeight() + 1 if expectedHeight != block.Index { return fmt.Errorf("expected block %d, but passed block %d", expectedHeight, block.Index) @@ -575,6 +601,9 @@ func (bc *Blockchain) storeBlock(block *block.Block) error { } } } + bc.lock.Lock() + defer bc.lock.Unlock() + _, err := cache.Persist() if err != nil { return err @@ -982,11 +1011,8 @@ func (bc *Blockchain) VerifyBlock(block *block.Block) error { return bc.verifyBlockWitnesses(block, prevHeader) } -// VerifyTx verifies whether a transaction is bonafide or not. Block parameter -// is used for easy interop access and can be omitted for transactions that are -// not yet added into any block. -// Golang implementation of Verify method in C# (https://github.com/neo-project/neo/blob/master/neo/Network/P2P/Payloads/Transaction.cs#L270). -func (bc *Blockchain) VerifyTx(t *transaction.Transaction, block *block.Block) error { +// verifyTx verifies whether a transaction is bonafide or not. +func (bc *Blockchain) verifyTx(t *transaction.Transaction, block *block.Block) error { if io.GetVarSize(t) > transaction.MaxTransactionSize { return errors.Errorf("invalid transaction size = %d. It shoud be less then MaxTransactionSize = %d", io.GetVarSize(t), transaction.MaxTransactionSize) } @@ -1017,6 +1043,40 @@ func (bc *Blockchain) VerifyTx(t *transaction.Transaction, block *block.Block) e return bc.verifyTxWitnesses(t, block) } +// VerifyTx verifies whether a transaction is bonafide or not. Block parameter +// is used for easy interop access and can be omitted for transactions that are +// not yet added into any block. +// Golang implementation of Verify method in C# (https://github.com/neo-project/neo/blob/master/neo/Network/P2P/Payloads/Transaction.cs#L270). +func (bc *Blockchain) VerifyTx(t *transaction.Transaction, block *block.Block) error { + bc.lock.RLock() + defer bc.lock.RUnlock() + return bc.verifyTx(t, block) +} + +// PoolTx verifies and tries to add given transaction into the mempool. +func (bc *Blockchain) PoolTx(t *transaction.Transaction) error { + bc.lock.RLock() + defer bc.lock.RUnlock() + + if bc.HasTransaction(t.Hash()) { + return ErrAlreadyExists + } + if err := bc.verifyTx(t, nil); err != nil { + return err + } + if err := bc.memPool.TryAdd(t.Hash(), mempool.NewPoolItem(t, bc)); err != nil { + switch err { + case mempool.ErrOOM: + return ErrOOM + case mempool.ErrConflict: + return ErrAlreadyExists + default: + return err + } + } + return nil +} + func (bc *Blockchain) verifyInputs(t *transaction.Transaction) bool { for i := 1; i < len(t.Inputs); i++ { for j := 0; j < i; j++ { diff --git a/pkg/core/blockchainer.go b/pkg/core/blockchainer.go index e3ef02698..79e6bc08f 100644 --- a/pkg/core/blockchainer.go +++ b/pkg/core/blockchainer.go @@ -40,6 +40,7 @@ type Blockchainer interface { GetUnspentCoinState(util.Uint256) *UnspentCoinState References(t *transaction.Transaction) map[transaction.Input]*transaction.Output mempool.Feer // fee interface + PoolTx(*transaction.Transaction) error VerifyTx(*transaction.Transaction, *block.Block) error GetMemPool() *mempool.Pool } diff --git a/pkg/core/mempool/mem_pool.go b/pkg/core/mempool/mem_pool.go index d5fc5a9d7..07ef64c76 100644 --- a/pkg/core/mempool/mem_pool.go +++ b/pkg/core/mempool/mem_pool.go @@ -1,6 +1,7 @@ package mempool import ( + "errors" "sort" "sync" "time" @@ -9,6 +10,19 @@ import ( "github.com/CityOfZion/neo-go/pkg/util" ) +var ( + // ErrConflict is returned when transaction being added is incompatible + // with the contents of the memory pool (using the same inputs as some + // other transaction in the pool) + ErrConflict = errors.New("conflicts with the memory pool") + // ErrDup is returned when transaction being added is already present + // in the memory pool. + ErrDup = errors.New("already in the memory pool") + // ErrOOM is returned when transaction just doesn't fit in the memory + // pool because of its capacity constraints. + ErrOOM = errors.New("out of memory") +) + // Item represents a transaction in the the Memory pool. type Item struct { txn *transaction.Transaction @@ -101,13 +115,17 @@ func (mp *Pool) ContainsKey(hash util.Uint256) bool { } // TryAdd try to add the Item to the Pool. -func (mp *Pool) TryAdd(hash util.Uint256, pItem *Item) bool { +func (mp *Pool) TryAdd(hash util.Uint256, pItem *Item) error { var pool Items mp.lock.Lock() + if !mp.verifyInputs(pItem.txn) { + mp.lock.Unlock() + return ErrConflict + } if _, ok := mp.unsortedTxn[hash]; ok { mp.lock.Unlock() - return false + return ErrDup } mp.unsortedTxn[hash] = pItem mp.lock.Unlock() @@ -130,7 +148,10 @@ func (mp *Pool) TryAdd(hash util.Uint256, pItem *Item) bool { _, ok := mp.unsortedTxn[hash] updateMempoolMetrics(len(mp.unsortedTxn), len(mp.unverifiedTxn)) mp.lock.RUnlock() - return ok + if !ok { + return ErrOOM + } + return nil } // Remove removes an item from the mempool, if it exists there (and does @@ -283,12 +304,8 @@ func (mp *Pool) GetVerifiedTransactions() []*transaction.Transaction { return t } -// Verify verifies if the inputs of a transaction tx are already used in any other transaction in the memory pool. -// If yes, the transaction tx is not a valid transaction and the function return false. -// If no, the transaction tx is a valid transaction and the function return true. -func (mp *Pool) Verify(tx *transaction.Transaction) bool { - mp.lock.RLock() - defer mp.lock.RUnlock() +// verifyInputs is an internal unprotected version of Verify. +func (mp *Pool) verifyInputs(tx *transaction.Transaction) bool { for _, item := range mp.unsortedTxn { for i := range item.txn.Inputs { for j := 0; j < len(tx.Inputs); j++ { @@ -301,3 +318,12 @@ func (mp *Pool) Verify(tx *transaction.Transaction) bool { return true } + +// Verify verifies if the inputs of a transaction tx are already used in any other transaction in the memory pool. +// If yes, the transaction tx is not a valid transaction and the function return false. +// If no, the transaction tx is a valid transaction and the function return true. +func (mp *Pool) Verify(tx *transaction.Transaction) bool { + mp.lock.RLock() + defer mp.lock.RUnlock() + return mp.verifyInputs(tx) +} diff --git a/pkg/core/mempool/mem_pool_test.go b/pkg/core/mempool/mem_pool_test.go index 6da40043b..db6a5b25c 100644 --- a/pkg/core/mempool/mem_pool_test.go +++ b/pkg/core/mempool/mem_pool_test.go @@ -39,9 +39,9 @@ func testMemPoolAddRemoveWithFeer(t *testing.T, fs Feer) { item := NewPoolItem(tx, fs) _, ok := mp.TryGetValue(tx.Hash()) require.Equal(t, false, ok) - require.Equal(t, true, mp.TryAdd(tx.Hash(), item)) + require.NoError(t, mp.TryAdd(tx.Hash(), item)) // Re-adding should fail. - require.Equal(t, false, mp.TryAdd(tx.Hash(), item)) + require.Error(t, mp.TryAdd(tx.Hash(), item)) tx2, ok := mp.TryGetValue(tx.Hash()) require.Equal(t, true, ok) require.Equal(t, tx, tx2) @@ -71,14 +71,14 @@ func TestMemPoolVerify(t *testing.T) { tx.Inputs = append(tx.Inputs, transaction.Input{PrevHash: inhash1, PrevIndex: 0}) require.Equal(t, true, mp.Verify(tx)) item := NewPoolItem(tx, &FeerStub{}) - require.Equal(t, true, mp.TryAdd(tx.Hash(), item)) + require.NoError(t, mp.TryAdd(tx.Hash(), item)) tx2 := newMinerTX() inhash2 := random.Uint256() tx2.Inputs = append(tx2.Inputs, transaction.Input{PrevHash: inhash2, PrevIndex: 0}) require.Equal(t, true, mp.Verify(tx2)) item = NewPoolItem(tx2, &FeerStub{}) - require.Equal(t, true, mp.TryAdd(tx2.Hash(), item)) + require.NoError(t, mp.TryAdd(tx2.Hash(), item)) tx3 := newMinerTX() // Different index number, but the same PrevHash as in tx1. diff --git a/pkg/network/helper_test.go b/pkg/network/helper_test.go index 00ea04ac6..1ab191d87 100644 --- a/pkg/network/helper_test.go +++ b/pkg/network/helper_test.go @@ -126,6 +126,10 @@ func (chain testChain) IsLowPriority(*transaction.Transaction) bool { panic("TODO") } +func (chain testChain) PoolTx(*transaction.Transaction) error { + panic("TODO") +} + func (chain testChain) VerifyTx(*transaction.Transaction, *block.Block) error { panic("TODO") } diff --git a/pkg/network/server.go b/pkg/network/server.go index c62d7f34c..681932e61 100644 --- a/pkg/network/server.go +++ b/pkg/network/server.go @@ -13,7 +13,6 @@ import ( "github.com/CityOfZion/neo-go/pkg/consensus" "github.com/CityOfZion/neo-go/pkg/core" "github.com/CityOfZion/neo-go/pkg/core/block" - "github.com/CityOfZion/neo-go/pkg/core/mempool" "github.com/CityOfZion/neo-go/pkg/core/transaction" "github.com/CityOfZion/neo-go/pkg/network/payload" "github.com/CityOfZion/neo-go/pkg/util" @@ -743,17 +742,18 @@ func (s *Server) verifyAndPoolTX(t *transaction.Transaction) RelayReason { if t.Type == transaction.MinerType { return RelayInvalid } - if s.chain.HasTransaction(t.Hash()) { - return RelayAlreadyExists - } - if err := s.chain.VerifyTx(t, nil); err != nil { - return RelayInvalid - } // TODO: Implement Plugin.CheckPolicy? //if (!Plugin.CheckPolicy(transaction)) // return RelayResultReason.PolicyFail; - if ok := s.chain.GetMemPool().TryAdd(t.Hash(), mempool.NewPoolItem(t, s.chain)); !ok { - return RelayOutOfMemory + if err := s.chain.PoolTx(t); err != nil { + switch err { + case core.ErrAlreadyExists: + return RelayAlreadyExists + case core.ErrOOM: + return RelayOutOfMemory + default: + return RelayInvalid + } } return RelaySucceed }