Merge pull request #2755 from nspcc-dev/improve-networking

Improve P2P transaction handling
This commit is contained in:
Roman Khimov 2022-10-21 16:22:02 +07:00 committed by GitHub
commit bf4636f70a
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
2 changed files with 84 additions and 16 deletions

View file

@ -9,6 +9,7 @@ import (
"math/big" "math/big"
mrand "math/rand" mrand "math/rand"
"net" "net"
"runtime"
"sort" "sort"
"strconv" "strconv"
"sync" "sync"
@ -111,6 +112,7 @@ type (
txCbEnabled atomic.Bool txCbEnabled atomic.Bool
txInLock sync.Mutex txInLock sync.Mutex
txin chan *transaction.Transaction
txInMap map[util.Uint256]struct{} txInMap map[util.Uint256]struct{}
lock sync.RWMutex lock sync.RWMutex
@ -183,6 +185,7 @@ func newServerFromConstructors(config ServerConfig, chain Ledger, stSync StateSy
mempool: chain.GetMemPool(), mempool: chain.GetMemPool(),
extensiblePool: extpool.New(chain, config.ExtensiblePoolSize), extensiblePool: extpool.New(chain, config.ExtensiblePoolSize),
log: log, log: log,
txin: make(chan *transaction.Transaction, 64),
transactions: make(chan *transaction.Transaction, 64), transactions: make(chan *transaction.Transaction, 64),
services: make(map[string]Service), services: make(map[string]Service),
extensHandlers: make(map[string]func(*payload.Extensible) error), extensHandlers: make(map[string]func(*payload.Extensible) error),
@ -256,6 +259,10 @@ func (s *Server) Start(errChan chan error) {
s.tryStartServices() s.tryStartServices()
s.initStaleMemPools() s.initStaleMemPools()
var txThreads = optimalNumOfThreads()
for i := 0; i < txThreads; i++ {
go s.txHandlerLoop()
}
go s.broadcastTxLoop() go s.broadcastTxLoop()
go s.relayBlocksLoop() go s.relayBlocksLoop()
go s.bQueue.run() go s.bQueue.run()
@ -751,7 +758,7 @@ func (s *Server) handlePong(p Peer, pong *payload.Ping) error {
// handleInvCmd processes the received inventory. // handleInvCmd processes the received inventory.
func (s *Server) handleInvCmd(p Peer, inv *payload.Inventory) error { func (s *Server) handleInvCmd(p Peer, inv *payload.Inventory) error {
reqHashes := make([]util.Uint256, 0) var reqHashes = inv.Hashes[:0]
var typExists = map[payload.InventoryType]func(util.Uint256) bool{ var typExists = map[payload.InventoryType]func(util.Uint256) bool{
payload.TXType: s.mempool.ContainsKey, payload.TXType: s.mempool.ContainsKey,
payload.BlockType: s.chain.HasBlock, payload.BlockType: s.chain.HasBlock,
@ -1042,21 +1049,41 @@ func (s *Server) handleTxCmd(tx *transaction.Transaction) error {
} }
s.txInMap[tx.Hash()] = struct{}{} s.txInMap[tx.Hash()] = struct{}{}
s.txInLock.Unlock() s.txInLock.Unlock()
s.serviceLock.RLock() s.txin <- tx
txCallback := s.txCallback
s.serviceLock.RUnlock()
if txCallback != nil && s.txCbEnabled.Load() {
txCallback(tx)
}
if s.verifyAndPoolTX(tx) == nil {
s.broadcastTX(tx, nil)
}
s.txInLock.Lock()
delete(s.txInMap, tx.Hash())
s.txInLock.Unlock()
return nil return nil
} }
func (s *Server) txHandlerLoop() {
txloop:
for {
select {
case tx := <-s.txin:
s.serviceLock.RLock()
txCallback := s.txCallback
s.serviceLock.RUnlock()
if txCallback != nil && s.txCbEnabled.Load() {
txCallback(tx)
}
if s.verifyAndPoolTX(tx) == nil {
s.broadcastTX(tx, nil)
}
s.txInLock.Lock()
delete(s.txInMap, tx.Hash())
s.txInLock.Unlock()
case <-s.quit:
break txloop
}
}
drainloop:
for {
select {
case <-s.txin:
default:
break drainloop
}
}
}
// handleP2PNotaryRequestCmd process the received P2PNotaryRequest payload. // handleP2PNotaryRequestCmd process the received P2PNotaryRequest payload.
func (s *Server) handleP2PNotaryRequestCmd(r *payload.P2PNotaryRequest) error { func (s *Server) handleP2PNotaryRequestCmd(r *payload.P2PNotaryRequest) error {
if !s.chain.P2PSigExtensionsEnabled() { if !s.chain.P2PSigExtensionsEnabled() {
@ -1589,3 +1616,18 @@ func (s *Server) Port() (uint16, error) {
} }
return port, nil return port, nil
} }
// optimalNumOfThreads returns the optimal number of processing threads to create
// for transaction processing.
func optimalNumOfThreads() int {
// Doing more won't help, mempool is still a contention point.
const maxThreads = 16
var threads = runtime.GOMAXPROCS(0)
if threads > runtime.NumCPU() {
threads = runtime.NumCPU()
}
if threads > maxThreads {
threads = maxThreads
}
return threads
}

View file

@ -6,6 +6,7 @@ import (
"math/big" "math/big"
"net" "net"
"strconv" "strconv"
"sync"
atomic2 "sync/atomic" atomic2 "sync/atomic"
"testing" "testing"
"time" "time"
@ -34,6 +35,7 @@ type fakeConsensus struct {
started atomic.Bool started atomic.Bool
stopped atomic.Bool stopped atomic.Bool
payloads []*payload.Extensible payloads []*payload.Extensible
txlock sync.Mutex
txs []*transaction.Transaction txs []*transaction.Transaction
} }
@ -46,7 +48,11 @@ func (f *fakeConsensus) OnPayload(p *payload.Extensible) error {
f.payloads = append(f.payloads, p) f.payloads = append(f.payloads, p)
return nil return nil
} }
func (f *fakeConsensus) OnTransaction(tx *transaction.Transaction) { f.txs = append(f.txs, tx) } func (f *fakeConsensus) OnTransaction(tx *transaction.Transaction) {
f.txlock.Lock()
defer f.txlock.Unlock()
f.txs = append(f.txs, tx)
}
func (f *fakeConsensus) GetPayload(h util.Uint256) *payload.Extensible { panic("implement me") } func (f *fakeConsensus) GetPayload(h util.Uint256) *payload.Extensible { panic("implement me") }
func TestNewServer(t *testing.T) { func TestNewServer(t *testing.T) {
@ -477,13 +483,33 @@ func TestTransaction(t *testing.T) {
s.register <- p s.register <- p
s.testHandleMessage(t, nil, CMDTX, tx) s.testHandleMessage(t, nil, CMDTX, tx)
require.Contains(t, s.services["fake"].(*fakeConsensus).txs, tx) require.Eventually(t, func() bool {
var fake = s.services["fake"].(*fakeConsensus)
fake.txlock.Lock()
defer fake.txlock.Unlock()
for _, t := range fake.txs {
if t == tx {
return true
}
}
return false
}, 2*time.Second, time.Millisecond*500)
}) })
t.Run("bad", func(t *testing.T) { t.Run("bad", func(t *testing.T) {
tx := newDummyTx() tx := newDummyTx()
s.chain.(*fakechain.FakeChain).PoolTxF = func(*transaction.Transaction) error { return core.ErrInsufficientFunds } s.chain.(*fakechain.FakeChain).PoolTxF = func(*transaction.Transaction) error { return core.ErrInsufficientFunds }
s.testHandleMessage(t, nil, CMDTX, tx) s.testHandleMessage(t, nil, CMDTX, tx)
require.Contains(t, s.services["fake"].(*fakeConsensus).txs, tx) // Consensus receives everything. require.Eventually(t, func() bool {
var fake = s.services["fake"].(*fakeConsensus)
fake.txlock.Lock()
defer fake.txlock.Unlock()
for _, t := range fake.txs {
if t == tx {
return true
}
}
return false
}, 2*time.Second, time.Millisecond*500)
}) })
} }