network: add fail-fast route for tx double processing

When transaction spreads through the network many nodes are likely to get it
in roughly the same time. They will rebroadcast it also in roughly the same
time. As we have a number of peers it's quite likely that we'd get an Inv with
the same transaction from multiple peers simultaneously. We will ask them for
this transaction (independently!) and again we're likely to get it in roughly
the same time. So we can easily end up with multiple threads processing the
same transaction. Only one will succeed, but we can actually easily avoid
doing it in the first place saving some CPU cycles for other things.

Notice that we can't do it _before_ receiving a transaction because nothing
guarantees that the peer will respond to our transaction request, so
communication overhead is unavoidable at the moment, but saving on processing
already gives quite interesting results.

Baseline, four nodes with 10 workers:

RPS    7176.784 7014.511 6139.663 7191.280 7080.852 ≈ 6921   ± 5.72%
TPS    6945.409 6562.756 5927.050 6681.187 6821.794 ≈ 6588   ± 5.38%
CPU %    44.400   43.842   40.418   49.211   49.370 ≈   45.4 ± 7.53%
Mem MB 2693.414 2640.602 2472.007 2731.482 2707.879 ≈ 2649   ± 3.53%

Patched:

RPS ≈  7791.675 7996.559 7834.504 7746.705 7891.614 ≈ 7852   ±  1.10% ↑ 13.45%
TPS ≈  7241.497 7711.765 7520.211 7425.890 7334.443 ≈ 7447   ±  2.17% ↑ 13.04%
CPU %    29.853   39.936   39.945   36.371   39.999 ≈   37.2 ± 10.57% ↓ 18.06%
Mem MB 2749.635 2791.609 2828.610 2910.431 2863.344 ≈ 2829   ±  1.97% ↑  6.80%
This commit is contained in:
Roman Khimov 2021-08-03 22:43:31 +03:00
parent 7fc153ed2a
commit 119b4200ac

View file

@ -74,6 +74,9 @@ type (
notaryFeer NotaryFeer notaryFeer NotaryFeer
notaryModule *notary.Notary notaryModule *notary.Notary
txInLock sync.Mutex
txInMap map[util.Uint256]struct{}
lock sync.RWMutex lock sync.RWMutex
peers map[Peer]bool peers map[Peer]bool
@ -137,6 +140,7 @@ func newServerFromConstructors(config ServerConfig, chain blockchainer.Blockchai
quit: make(chan struct{}), quit: make(chan struct{}),
register: make(chan Peer), register: make(chan Peer),
unregister: make(chan peerDrop), unregister: make(chan peerDrop),
txInMap: make(map[util.Uint256]struct{}),
peers: make(map[Peer]bool), peers: make(map[Peer]bool),
syncReached: atomic.NewBool(false), syncReached: atomic.NewBool(false),
mempool: chain.GetMemPool(), mempool: chain.GetMemPool(),
@ -876,10 +880,21 @@ func (s *Server) handleExtensibleCmd(e *payload.Extensible) error {
func (s *Server) handleTxCmd(tx *transaction.Transaction) error { func (s *Server) handleTxCmd(tx *transaction.Transaction) error {
// It's OK for it to fail for various reasons like tx already existing // It's OK for it to fail for various reasons like tx already existing
// in the pool. // in the pool.
s.txInLock.Lock()
_, ok := s.txInMap[tx.Hash()]
if ok || s.mempool.ContainsKey(tx.Hash()) {
s.txInLock.Unlock()
return nil
}
s.txInMap[tx.Hash()] = struct{}{}
s.txInLock.Unlock()
if s.verifyAndPoolTX(tx) == nil { if s.verifyAndPoolTX(tx) == nil {
s.consensus.OnTransaction(tx) s.consensus.OnTransaction(tx)
s.broadcastTX(tx, nil) s.broadcastTX(tx, nil)
} }
s.txInLock.Lock()
delete(s.txInMap, tx.Hash())
s.txInLock.Unlock()
return nil return nil
} }