Merge pull request #2788 from nspcc-dev/dbft-tx-list

network: pre-filter transactions going into dbft
This commit is contained in:
Roman Khimov 2022-11-14 15:10:10 +07:00 committed by GitHub
commit ecf1f3195c
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
2 changed files with 24 additions and 6 deletions

View file

@ -13,6 +13,7 @@ import (
"sort" "sort"
"strconv" "strconv"
"sync" "sync"
satomic "sync/atomic"
"time" "time"
"github.com/nspcc-dev/neo-go/pkg/config" "github.com/nspcc-dev/neo-go/pkg/config"
@ -109,7 +110,7 @@ type (
services map[string]Service services map[string]Service
extensHandlers map[string]func(*payload.Extensible) error extensHandlers map[string]func(*payload.Extensible) error
txCallback func(*transaction.Transaction) txCallback func(*transaction.Transaction)
txCbEnabled atomic.Bool txCbList satomic.Value
txInLock sync.RWMutex txInLock sync.RWMutex
txin chan *transaction.Transaction txin chan *transaction.Transaction
@ -1106,9 +1107,18 @@ txloop:
s.serviceLock.RLock() s.serviceLock.RLock()
txCallback := s.txCallback txCallback := s.txCallback
s.serviceLock.RUnlock() s.serviceLock.RUnlock()
if txCallback != nil && s.txCbEnabled.Load() { if txCallback != nil {
var cbList = s.txCbList.Load()
if cbList != nil {
var list = cbList.([]util.Uint256)
var i = sort.Search(len(list), func(i int) bool {
return list[i].CompareTo(tx.Hash()) >= 0
})
if i < len(list) && list[i].Equals(tx.Hash()) {
txCallback(tx) txCallback(tx)
} }
}
}
if s.verifyAndPoolTX(tx) == nil { if s.verifyAndPoolTX(tx) == nil {
s.broadcastTX(tx, nil) s.broadcastTX(tx, nil)
} }
@ -1420,7 +1430,13 @@ func (s *Server) RequestTx(hashes ...util.Uint256) {
return return
} }
s.txCbEnabled.Store(true) var sorted = make([]util.Uint256, len(hashes))
copy(sorted, hashes)
sort.Slice(sorted, func(i, j int) bool {
return sorted[i].CompareTo(sorted[j]) < 0
})
s.txCbList.Store(sorted)
for i := 0; i <= len(hashes)/payload.MaxHashesCount; i++ { for i := 0; i <= len(hashes)/payload.MaxHashesCount; i++ {
start := i * payload.MaxHashesCount start := i * payload.MaxHashesCount
@ -1440,7 +1456,8 @@ func (s *Server) RequestTx(hashes ...util.Uint256) {
// StopTxFlow makes the server not call previously specified consensus transaction callback. // StopTxFlow makes the server not call previously specified consensus transaction callback.
func (s *Server) StopTxFlow() { func (s *Server) StopTxFlow() {
s.txCbEnabled.Store(false) var hashes []util.Uint256
s.txCbList.Store(hashes)
} }
// iteratePeersWithSendMsg sends the given message to all peers using two functions // iteratePeersWithSendMsg sends the given message to all peers using two functions

View file

@ -467,10 +467,10 @@ func TestTransaction(t *testing.T) {
cons := new(fakeConsensus) cons := new(fakeConsensus)
s.AddConsensusService(cons, cons.OnPayload, cons.OnTransaction) s.AddConsensusService(cons, cons.OnPayload, cons.OnTransaction)
startWithCleanup(t, s) startWithCleanup(t, s)
s.RequestTx(util.Uint256{1})
t.Run("good", func(t *testing.T) { t.Run("good", func(t *testing.T) {
tx := newDummyTx() tx := newDummyTx()
s.RequestTx(tx.Hash())
p := newLocalPeer(t, s) p := newLocalPeer(t, s)
p.isFullNode = true p.isFullNode = true
p.messageHandler = func(t *testing.T, msg *Message) { p.messageHandler = func(t *testing.T, msg *Message) {
@ -497,6 +497,7 @@ func TestTransaction(t *testing.T) {
}) })
t.Run("bad", func(t *testing.T) { t.Run("bad", func(t *testing.T) {
tx := newDummyTx() tx := newDummyTx()
s.RequestTx(tx.Hash())
s.chain.(*fakechain.FakeChain).PoolTxF = func(*transaction.Transaction) error { return core.ErrInsufficientFunds } s.chain.(*fakechain.FakeChain).PoolTxF = func(*transaction.Transaction) error { return core.ErrInsufficientFunds }
s.testHandleMessage(t, nil, CMDTX, tx) s.testHandleMessage(t, nil, CMDTX, tx)
require.Eventually(t, func() bool { require.Eventually(t, func() bool {