network: separate tx handling from msg handling
This allows transaction processing to scale naturally if we have some peer that is sending a lot of them while others are mostly silent. It can also help somewhat in the event we have 50 peers that all send transactions. The 4+1 scenario benefits a lot from it, while 7+2 slows down a little. Delayed scenarios don't care. Surprisingly, this also makes disconnects (#2744) much more rare; the 4-node scenario almost never sees them now. Most probably this is a case where peers affect each other a lot: a single-threaded transaction receiver can be slow enough to trigger some timeout in the getdata handler of its peer (because it tries to push a number of replies).
This commit is contained in:
parent
e003b67418
commit
e1b5ac9b81
2 changed files with 83 additions and 15 deletions
|
@ -6,6 +6,7 @@ import (
|
|||
"math/big"
|
||||
"net"
|
||||
"strconv"
|
||||
"sync"
|
||||
atomic2 "sync/atomic"
|
||||
"testing"
|
||||
"time"
|
||||
|
@ -34,6 +35,7 @@ type fakeConsensus struct {
|
|||
started atomic.Bool
|
||||
stopped atomic.Bool
|
||||
payloads []*payload.Extensible
|
||||
txlock sync.Mutex
|
||||
txs []*transaction.Transaction
|
||||
}
|
||||
|
||||
|
@ -46,7 +48,11 @@ func (f *fakeConsensus) OnPayload(p *payload.Extensible) error {
|
|||
f.payloads = append(f.payloads, p)
|
||||
return nil
|
||||
}
|
||||
func (f *fakeConsensus) OnTransaction(tx *transaction.Transaction) { f.txs = append(f.txs, tx) }
|
||||
func (f *fakeConsensus) OnTransaction(tx *transaction.Transaction) {
|
||||
f.txlock.Lock()
|
||||
defer f.txlock.Unlock()
|
||||
f.txs = append(f.txs, tx)
|
||||
}
|
||||
func (f *fakeConsensus) GetPayload(h util.Uint256) *payload.Extensible { panic("implement me") }
|
||||
|
||||
func TestNewServer(t *testing.T) {
|
||||
|
@ -477,13 +483,33 @@ func TestTransaction(t *testing.T) {
|
|||
s.register <- p
|
||||
|
||||
s.testHandleMessage(t, nil, CMDTX, tx)
|
||||
require.Contains(t, s.services["fake"].(*fakeConsensus).txs, tx)
|
||||
require.Eventually(t, func() bool {
|
||||
var fake = s.services["fake"].(*fakeConsensus)
|
||||
fake.txlock.Lock()
|
||||
defer fake.txlock.Unlock()
|
||||
for _, t := range fake.txs {
|
||||
if t == tx {
|
||||
return true
|
||||
}
|
||||
}
|
||||
return false
|
||||
}, 2*time.Second, time.Millisecond*500)
|
||||
})
|
||||
t.Run("bad", func(t *testing.T) {
|
||||
tx := newDummyTx()
|
||||
s.chain.(*fakechain.FakeChain).PoolTxF = func(*transaction.Transaction) error { return core.ErrInsufficientFunds }
|
||||
s.testHandleMessage(t, nil, CMDTX, tx)
|
||||
require.Contains(t, s.services["fake"].(*fakeConsensus).txs, tx) // Consensus receives everything.
|
||||
require.Eventually(t, func() bool {
|
||||
var fake = s.services["fake"].(*fakeConsensus)
|
||||
fake.txlock.Lock()
|
||||
defer fake.txlock.Unlock()
|
||||
for _, t := range fake.txs {
|
||||
if t == tx {
|
||||
return true
|
||||
}
|
||||
}
|
||||
return false
|
||||
}, 2*time.Second, time.Millisecond*500)
|
||||
})
|
||||
}
|
||||
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue