network: pre-filter transactions going into dbft
Drop some load from dbft loop during consensus process.
This commit is contained in:
parent
f78231fd9c
commit
c405092953
2 changed files with 24 additions and 6 deletions
|
@ -13,6 +13,7 @@ import (
|
||||||
"sort"
|
"sort"
|
||||||
"strconv"
|
"strconv"
|
||||||
"sync"
|
"sync"
|
||||||
|
satomic "sync/atomic"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/nspcc-dev/neo-go/pkg/config"
|
"github.com/nspcc-dev/neo-go/pkg/config"
|
||||||
|
@ -109,7 +110,7 @@ type (
|
||||||
services map[string]Service
|
services map[string]Service
|
||||||
extensHandlers map[string]func(*payload.Extensible) error
|
extensHandlers map[string]func(*payload.Extensible) error
|
||||||
txCallback func(*transaction.Transaction)
|
txCallback func(*transaction.Transaction)
|
||||||
txCbEnabled atomic.Bool
|
txCbList satomic.Value
|
||||||
|
|
||||||
txInLock sync.RWMutex
|
txInLock sync.RWMutex
|
||||||
txin chan *transaction.Transaction
|
txin chan *transaction.Transaction
|
||||||
|
@ -1106,9 +1107,18 @@ txloop:
|
||||||
s.serviceLock.RLock()
|
s.serviceLock.RLock()
|
||||||
txCallback := s.txCallback
|
txCallback := s.txCallback
|
||||||
s.serviceLock.RUnlock()
|
s.serviceLock.RUnlock()
|
||||||
if txCallback != nil && s.txCbEnabled.Load() {
|
if txCallback != nil {
|
||||||
|
var cbList = s.txCbList.Load()
|
||||||
|
if cbList != nil {
|
||||||
|
var list = cbList.([]util.Uint256)
|
||||||
|
var i = sort.Search(len(list), func(i int) bool {
|
||||||
|
return list[i].CompareTo(tx.Hash()) >= 0
|
||||||
|
})
|
||||||
|
if i < len(list) && list[i].Equals(tx.Hash()) {
|
||||||
txCallback(tx)
|
txCallback(tx)
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
if s.verifyAndPoolTX(tx) == nil {
|
if s.verifyAndPoolTX(tx) == nil {
|
||||||
s.broadcastTX(tx, nil)
|
s.broadcastTX(tx, nil)
|
||||||
}
|
}
|
||||||
|
@ -1420,7 +1430,13 @@ func (s *Server) RequestTx(hashes ...util.Uint256) {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
s.txCbEnabled.Store(true)
|
var sorted = make([]util.Uint256, len(hashes))
|
||||||
|
copy(sorted, hashes)
|
||||||
|
sort.Slice(sorted, func(i, j int) bool {
|
||||||
|
return sorted[i].CompareTo(sorted[j]) < 0
|
||||||
|
})
|
||||||
|
|
||||||
|
s.txCbList.Store(sorted)
|
||||||
|
|
||||||
for i := 0; i <= len(hashes)/payload.MaxHashesCount; i++ {
|
for i := 0; i <= len(hashes)/payload.MaxHashesCount; i++ {
|
||||||
start := i * payload.MaxHashesCount
|
start := i * payload.MaxHashesCount
|
||||||
|
@ -1440,7 +1456,8 @@ func (s *Server) RequestTx(hashes ...util.Uint256) {
|
||||||
|
|
||||||
// StopTxFlow makes the server not call previously specified consensus transaction callback.
|
// StopTxFlow makes the server not call previously specified consensus transaction callback.
|
||||||
func (s *Server) StopTxFlow() {
|
func (s *Server) StopTxFlow() {
|
||||||
s.txCbEnabled.Store(false)
|
var hashes []util.Uint256
|
||||||
|
s.txCbList.Store(hashes)
|
||||||
}
|
}
|
||||||
|
|
||||||
// iteratePeersWithSendMsg sends the given message to all peers using two functions
|
// iteratePeersWithSendMsg sends the given message to all peers using two functions
|
||||||
|
|
|
@ -467,10 +467,10 @@ func TestTransaction(t *testing.T) {
|
||||||
cons := new(fakeConsensus)
|
cons := new(fakeConsensus)
|
||||||
s.AddConsensusService(cons, cons.OnPayload, cons.OnTransaction)
|
s.AddConsensusService(cons, cons.OnPayload, cons.OnTransaction)
|
||||||
startWithCleanup(t, s)
|
startWithCleanup(t, s)
|
||||||
s.RequestTx(util.Uint256{1})
|
|
||||||
|
|
||||||
t.Run("good", func(t *testing.T) {
|
t.Run("good", func(t *testing.T) {
|
||||||
tx := newDummyTx()
|
tx := newDummyTx()
|
||||||
|
s.RequestTx(tx.Hash())
|
||||||
p := newLocalPeer(t, s)
|
p := newLocalPeer(t, s)
|
||||||
p.isFullNode = true
|
p.isFullNode = true
|
||||||
p.messageHandler = func(t *testing.T, msg *Message) {
|
p.messageHandler = func(t *testing.T, msg *Message) {
|
||||||
|
@ -497,6 +497,7 @@ func TestTransaction(t *testing.T) {
|
||||||
})
|
})
|
||||||
t.Run("bad", func(t *testing.T) {
|
t.Run("bad", func(t *testing.T) {
|
||||||
tx := newDummyTx()
|
tx := newDummyTx()
|
||||||
|
s.RequestTx(tx.Hash())
|
||||||
s.chain.(*fakechain.FakeChain).PoolTxF = func(*transaction.Transaction) error { return core.ErrInsufficientFunds }
|
s.chain.(*fakechain.FakeChain).PoolTxF = func(*transaction.Transaction) error { return core.ErrInsufficientFunds }
|
||||||
s.testHandleMessage(t, nil, CMDTX, tx)
|
s.testHandleMessage(t, nil, CMDTX, tx)
|
||||||
require.Eventually(t, func() bool {
|
require.Eventually(t, func() bool {
|
||||||
|
|
Loading…
Reference in a new issue