From 119b4200acebd90ca09abf91f9549dce6567ccc9 Mon Sep 17 00:00:00 2001 From: Roman Khimov Date: Tue, 3 Aug 2021 22:43:31 +0300 Subject: [PATCH] network: add fail-fast route for tx double processing MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit When transaction spreads through the network many nodes are likely to get it in roughly the same time. They will rebroadcast it also in roughly the same time. As we have a number of peers it's quite likely that we'd get an Inv with the same transaction from multiple peers simultaneously. We will ask them for this transaction (independently!) and again we're likely to get it in roughly the same time. So we can easily end up with multiple threads processing the same transaction. Only one will succeed, but we can actually easily avoid doing it in the first place saving some CPU cycles for other things. Notice that we can't do it _before_ receiving a transaction because nothing guarantees that the peer will respond to our transaction request, so communication overhead is unavoidable at the moment, but saving on processing already gives quite interesting results. Baseline, four nodes with 10 workers: RPS 7176.784 7014.511 6139.663 7191.280 7080.852 ≈ 6921 ± 5.72% TPS 6945.409 6562.756 5927.050 6681.187 6821.794 ≈ 6588 ± 5.38% CPU % 44.400 43.842 40.418 49.211 49.370 ≈ 45.4 ± 7.53% Mem MB 2693.414 2640.602 2472.007 2731.482 2707.879 ≈ 2649 ± 3.53% Patched: RPS ≈ 7791.675 7996.559 7834.504 7746.705 7891.614 ≈ 7852 ± 1.10% ↑ 13.45% TPS ≈ 7241.497 7711.765 7520.211 7425.890 7334.443 ≈ 7447 ± 2.17% ↑ 13.04% CPU % 29.853 39.936 39.945 36.371 39.999 ≈ 37.2 ± 10.57% ↓ 18.06% Mem MB 2749.635 2791.609 2828.610 2910.431 2863.344 ≈ 2829 ± 1.97% ↑ 6.80% --- pkg/network/server.go | 15 +++++++++++++++ 1 file changed, 15 insertions(+) diff --git a/pkg/network/server.go b/pkg/network/server.go index 5b8032de8..e03c1e6bc 100644 --- a/pkg/network/server.go +++ b/pkg/network/server.go @@ -74,6 +74,9 @@ type ( notaryFeer NotaryFeer notaryModule *notary.Notary + txInLock sync.Mutex + txInMap map[util.Uint256]struct{} + lock sync.RWMutex peers map[Peer]bool @@ -137,6 +140,7 @@ func newServerFromConstructors(config ServerConfig, chain blockchainer.Blockchai quit: make(chan struct{}), register: make(chan Peer), unregister: make(chan peerDrop), + txInMap: make(map[util.Uint256]struct{}), peers: make(map[Peer]bool), syncReached: atomic.NewBool(false), mempool: chain.GetMemPool(), @@ -876,10 +880,21 @@ func (s *Server) handleExtensibleCmd(e *payload.Extensible) error { func (s *Server) handleTxCmd(tx *transaction.Transaction) error { // It's OK for it to fail for various reasons like tx already existing // in the pool. + s.txInLock.Lock() + _, ok := s.txInMap[tx.Hash()] + if ok || s.mempool.ContainsKey(tx.Hash()) { + s.txInLock.Unlock() + return nil + } + s.txInMap[tx.Hash()] = struct{}{} + s.txInLock.Unlock() if s.verifyAndPoolTX(tx) == nil { s.consensus.OnTransaction(tx) s.broadcastTX(tx, nil) } + s.txInLock.Lock() + delete(s.txInMap, tx.Hash()) + s.txInLock.Unlock() return nil }