diff --git a/pkg/network/server.go b/pkg/network/server.go index 364ff30bd..57aa06ffe 100644 --- a/pkg/network/server.go +++ b/pkg/network/server.go @@ -9,6 +9,7 @@ import ( "math/big" mrand "math/rand" "net" + "runtime" "sort" "strconv" "sync" @@ -111,6 +112,7 @@ type ( txCbEnabled atomic.Bool txInLock sync.Mutex + txin chan *transaction.Transaction txInMap map[util.Uint256]struct{} lock sync.RWMutex @@ -183,6 +185,7 @@ func newServerFromConstructors(config ServerConfig, chain Ledger, stSync StateSy mempool: chain.GetMemPool(), extensiblePool: extpool.New(chain, config.ExtensiblePoolSize), log: log, + txin: make(chan *transaction.Transaction, 64), transactions: make(chan *transaction.Transaction, 64), services: make(map[string]Service), extensHandlers: make(map[string]func(*payload.Extensible) error), @@ -256,6 +259,10 @@ func (s *Server) Start(errChan chan error) { s.tryStartServices() s.initStaleMemPools() + var txThreads = optimalNumOfThreads() + for i := 0; i < txThreads; i++ { + go s.txHandlerLoop() + } go s.broadcastTxLoop() go s.relayBlocksLoop() go s.bQueue.run() @@ -1042,21 +1049,41 @@ func (s *Server) handleTxCmd(tx *transaction.Transaction) error { } s.txInMap[tx.Hash()] = struct{}{} s.txInLock.Unlock() - s.serviceLock.RLock() - txCallback := s.txCallback - s.serviceLock.RUnlock() - if txCallback != nil && s.txCbEnabled.Load() { - txCallback(tx) - } - if s.verifyAndPoolTX(tx) == nil { - s.broadcastTX(tx, nil) - } - s.txInLock.Lock() - delete(s.txInMap, tx.Hash()) - s.txInLock.Unlock() + s.txin <- tx return nil } +func (s *Server) txHandlerLoop() { +txloop: + for { + select { + case tx := <-s.txin: + s.serviceLock.RLock() + txCallback := s.txCallback + s.serviceLock.RUnlock() + if txCallback != nil && s.txCbEnabled.Load() { + txCallback(tx) + } + if s.verifyAndPoolTX(tx) == nil { + s.broadcastTX(tx, nil) + } + s.txInLock.Lock() + delete(s.txInMap, tx.Hash()) + s.txInLock.Unlock() + case <-s.quit: + break txloop + } + } +drainloop: + for { + select { + case <-s.txin: + default: + break drainloop + } + } +} + // handleP2PNotaryRequestCmd process the received P2PNotaryRequest payload. func (s *Server) handleP2PNotaryRequestCmd(r *payload.P2PNotaryRequest) error { if !s.chain.P2PSigExtensionsEnabled() { @@ -1589,3 +1616,18 @@ func (s *Server) Port() (uint16, error) { } return port, nil } + +// optimalNumOfThreads returns the optimal number of processing threads to create +// for transaction processing. +func optimalNumOfThreads() int { + // Doing more won't help, mempool is still a contention point. + const maxThreads = 16 + var threads = runtime.GOMAXPROCS(0) + if threads > runtime.NumCPU() { + threads = runtime.NumCPU() + } + if threads > maxThreads { + threads = maxThreads + } + return threads +} diff --git a/pkg/network/server_test.go b/pkg/network/server_test.go index 49fcfe751..b6684b1b5 100644 --- a/pkg/network/server_test.go +++ b/pkg/network/server_test.go @@ -6,6 +6,7 @@ import ( "math/big" "net" "strconv" + "sync" atomic2 "sync/atomic" "testing" "time" @@ -34,6 +35,7 @@ type fakeConsensus struct { started atomic.Bool stopped atomic.Bool payloads []*payload.Extensible + txlock sync.Mutex txs []*transaction.Transaction } @@ -46,7 +48,11 @@ func (f *fakeConsensus) OnPayload(p *payload.Extensible) error { f.payloads = append(f.payloads, p) return nil } -func (f *fakeConsensus) OnTransaction(tx *transaction.Transaction) { f.txs = append(f.txs, tx) } +func (f *fakeConsensus) OnTransaction(tx *transaction.Transaction) { + f.txlock.Lock() + defer f.txlock.Unlock() + f.txs = append(f.txs, tx) +} func (f *fakeConsensus) GetPayload(h util.Uint256) *payload.Extensible { panic("implement me") } func TestNewServer(t *testing.T) { @@ -477,13 +483,33 @@ func TestTransaction(t *testing.T) { s.register <- p s.testHandleMessage(t, nil, CMDTX, tx) - require.Contains(t, s.services["fake"].(*fakeConsensus).txs, tx) + require.Eventually(t, func() bool { + var fake = s.services["fake"].(*fakeConsensus) + fake.txlock.Lock() + defer fake.txlock.Unlock() + for _, t := range fake.txs { + if t == tx { + return true + } + } + return false + }, 2*time.Second, time.Millisecond*500) }) t.Run("bad", func(t *testing.T) { tx := newDummyTx() s.chain.(*fakechain.FakeChain).PoolTxF = func(*transaction.Transaction) error { return core.ErrInsufficientFunds } s.testHandleMessage(t, nil, CMDTX, tx) - require.Contains(t, s.services["fake"].(*fakeConsensus).txs, tx) // Consensus receives everything. + require.Eventually(t, func() bool { + var fake = s.services["fake"].(*fakeConsensus) + fake.txlock.Lock() + defer fake.txlock.Unlock() + for _, t := range fake.txs { + if t == tx { + return true + } + } + return false + }, 2*time.Second, time.Millisecond*500) }) }