network/consensus: use new dbft StopTxFlow callback

It makes sense in general (further narrowing down the time window when
transactions are processed by consensus thread) and it improves block times a
little too, especially in the 7+2 scenario.

Related to #2744.
This commit is contained in:
Roman Khimov 2022-10-14 21:00:26 +03:00
parent 73079745ab
commit 73ce898e27
8 changed files with 19 additions and 6 deletions

View file

@ -332,6 +332,7 @@ func mkConsensus(config config.Wallet, tpb time.Duration, chain *core.Blockchain
Chain: chain, Chain: chain,
ProtocolConfiguration: chain.GetConfig(), ProtocolConfiguration: chain.GetConfig(),
RequestTx: serv.RequestTx, RequestTx: serv.RequestTx,
StopTxFlow: serv.StopTxFlow,
Wallet: &config, Wallet: &config,
TimePerBlock: tpb, TimePerBlock: tpb,
}) })

2
go.mod
View file

@ -11,7 +11,7 @@ require (
github.com/holiman/uint256 v1.2.0 github.com/holiman/uint256 v1.2.0
github.com/kballard/go-shellquote v0.0.0-20180428030007-95032a82bc51 github.com/kballard/go-shellquote v0.0.0-20180428030007-95032a82bc51
github.com/mr-tron/base58 v1.2.0 github.com/mr-tron/base58 v1.2.0
github.com/nspcc-dev/dbft v0.0.0-20220902113116-58a5e763e647 github.com/nspcc-dev/dbft v0.0.0-20221018080254-c7e1bf49ccd7
github.com/nspcc-dev/go-ordered-json v0.0.0-20220111165707-25110be27d22 github.com/nspcc-dev/go-ordered-json v0.0.0-20220111165707-25110be27d22
github.com/nspcc-dev/neo-go/pkg/interop v0.0.0-20220927123257-24c107e3a262 github.com/nspcc-dev/neo-go/pkg/interop v0.0.0-20220927123257-24c107e3a262
github.com/nspcc-dev/neofs-sdk-go v0.0.0-20220113123743-7f3162110659 github.com/nspcc-dev/neofs-sdk-go v0.0.0-20220113123743-7f3162110659

4
go.sum
View file

@ -251,8 +251,8 @@ github.com/nspcc-dev/dbft v0.0.0-20191209120240-0d6b7568d9ae/go.mod h1:3FjXOoHmA
github.com/nspcc-dev/dbft v0.0.0-20200117124306-478e5cfbf03a/go.mod h1:/YFK+XOxxg0Bfm6P92lY5eDSLYfp06XOdL8KAVgXjVk= github.com/nspcc-dev/dbft v0.0.0-20200117124306-478e5cfbf03a/go.mod h1:/YFK+XOxxg0Bfm6P92lY5eDSLYfp06XOdL8KAVgXjVk=
github.com/nspcc-dev/dbft v0.0.0-20200219114139-199d286ed6c1/go.mod h1:O0qtn62prQSqizzoagHmuuKoz8QMkU3SzBoKdEvm3aQ= github.com/nspcc-dev/dbft v0.0.0-20200219114139-199d286ed6c1/go.mod h1:O0qtn62prQSqizzoagHmuuKoz8QMkU3SzBoKdEvm3aQ=
github.com/nspcc-dev/dbft v0.0.0-20210721160347-1b03241391ac/go.mod h1:U8MSnEShH+o5hexfWJdze6uMFJteP0ko7J2frO7Yu1Y= github.com/nspcc-dev/dbft v0.0.0-20210721160347-1b03241391ac/go.mod h1:U8MSnEShH+o5hexfWJdze6uMFJteP0ko7J2frO7Yu1Y=
github.com/nspcc-dev/dbft v0.0.0-20220902113116-58a5e763e647 h1:handGBjqVzRx7HD6152zsP8ZRxw083zCMbN0IlUaPQk= github.com/nspcc-dev/dbft v0.0.0-20221018080254-c7e1bf49ccd7 h1:RxVI9RFiHmpUvbuYIM5siLMiOvVt8P651BdmTstGi3Q=
github.com/nspcc-dev/dbft v0.0.0-20220902113116-58a5e763e647/go.mod h1:g9xisXmX9NP9MjioaTe862n9SlZTrP+6PVUWLBYOr98= github.com/nspcc-dev/dbft v0.0.0-20221018080254-c7e1bf49ccd7/go.mod h1:g9xisXmX9NP9MjioaTe862n9SlZTrP+6PVUWLBYOr98=
github.com/nspcc-dev/go-ordered-json v0.0.0-20210915112629-e1b6cce73d02/go.mod h1:79bEUDEviBHJMFV6Iq6in57FEOCMcRhfQnfaf0ETA5U= github.com/nspcc-dev/go-ordered-json v0.0.0-20210915112629-e1b6cce73d02/go.mod h1:79bEUDEviBHJMFV6Iq6in57FEOCMcRhfQnfaf0ETA5U=
github.com/nspcc-dev/go-ordered-json v0.0.0-20220111165707-25110be27d22 h1:n4ZaFCKt1pQJd7PXoMJabZWK9ejjbLOVrkl/lOUmshg= github.com/nspcc-dev/go-ordered-json v0.0.0-20220111165707-25110be27d22 h1:n4ZaFCKt1pQJd7PXoMJabZWK9ejjbLOVrkl/lOUmshg=
github.com/nspcc-dev/go-ordered-json v0.0.0-20220111165707-25110be27d22/go.mod h1:79bEUDEviBHJMFV6Iq6in57FEOCMcRhfQnfaf0ETA5U= github.com/nspcc-dev/go-ordered-json v0.0.0-20220111165707-25110be27d22/go.mod h1:79bEUDEviBHJMFV6Iq6in57FEOCMcRhfQnfaf0ETA5U=

View file

@ -156,6 +156,7 @@ func NewTestChain(t *testing.T, f func(*config.Config), run bool) (*core.Blockch
Chain: chain, Chain: chain,
ProtocolConfiguration: chain.GetConfig(), ProtocolConfiguration: chain.GetConfig(),
RequestTx: netSrv.RequestTx, RequestTx: netSrv.RequestTx,
StopTxFlow: netSrv.StopTxFlow,
Wallet: &cfg.ApplicationConfiguration.UnlockWallet, Wallet: &cfg.ApplicationConfiguration.UnlockWallet,
TimePerBlock: serverConfig.TimePerBlock, TimePerBlock: serverConfig.TimePerBlock,
}) })

View file

@ -119,6 +119,9 @@ type Config struct {
// RequestTx is a callback to which will be called // RequestTx is a callback to which will be called
// when a node lacks transactions present in the block. // when a node lacks transactions present in the block.
RequestTx func(h ...util.Uint256) RequestTx func(h ...util.Uint256)
// StopTxFlow is a callback that is called after the consensus
// process stops accepting incoming transactions.
StopTxFlow func()
// TimePerBlock is minimal time that should pass before the next block is accepted. // TimePerBlock is minimal time that should pass before the next block is accepted.
TimePerBlock time.Duration TimePerBlock time.Duration
// Wallet is a local-node wallet configuration. // Wallet is a local-node wallet configuration.
@ -173,6 +176,7 @@ func NewService(cfg Config) (Service, error) {
dbft.WithSecondsPerBlock(cfg.TimePerBlock), dbft.WithSecondsPerBlock(cfg.TimePerBlock),
dbft.WithGetKeyPair(srv.getKeyPair), dbft.WithGetKeyPair(srv.getKeyPair),
dbft.WithRequestTx(cfg.RequestTx), dbft.WithRequestTx(cfg.RequestTx),
dbft.WithStopTxFlow(cfg.StopTxFlow),
dbft.WithGetTx(srv.getTx), dbft.WithGetTx(srv.getTx),
dbft.WithGetVerified(srv.getVerifiedTx), dbft.WithGetVerified(srv.getVerifiedTx),
dbft.WithBroadcast(srv.broadcast), dbft.WithBroadcast(srv.broadcast),

View file

@ -479,6 +479,7 @@ func newTestServiceWithChain(t *testing.T, bc *core.Blockchain) *service {
Chain: bc, Chain: bc,
ProtocolConfiguration: bc.GetConfig(), ProtocolConfiguration: bc.GetConfig(),
RequestTx: func(...util.Uint256) {}, RequestTx: func(...util.Uint256) {},
StopTxFlow: func() {},
TimePerBlock: time.Duration(bc.GetConfig().SecondsPerBlock) * time.Second, TimePerBlock: time.Duration(bc.GetConfig().SecondsPerBlock) * time.Second,
Wallet: &config.Wallet{ Wallet: &config.Wallet{
Path: "./testdata/wallet1.json", Path: "./testdata/wallet1.json",

View file

@ -108,7 +108,7 @@ type (
services map[string]Service services map[string]Service
extensHandlers map[string]func(*payload.Extensible) error extensHandlers map[string]func(*payload.Extensible) error
txCallback func(*transaction.Transaction) txCallback func(*transaction.Transaction)
txCbHeight atomic.Uint32 txCbEnabled atomic.Bool
txInLock sync.Mutex txInLock sync.Mutex
txInMap map[util.Uint256]struct{} txInMap map[util.Uint256]struct{}
@ -1045,7 +1045,7 @@ func (s *Server) handleTxCmd(tx *transaction.Transaction) error {
s.serviceLock.RLock() s.serviceLock.RLock()
txCallback := s.txCallback txCallback := s.txCallback
s.serviceLock.RUnlock() s.serviceLock.RUnlock()
if txCallback != nil && s.chain.BlockHeight() <= s.txCbHeight.Load() { if txCallback != nil && s.txCbEnabled.Load() {
txCallback(tx) txCallback(tx)
} }
if s.verifyAndPoolTX(tx) == nil { if s.verifyAndPoolTX(tx) == nil {
@ -1345,7 +1345,7 @@ func (s *Server) RequestTx(hashes ...util.Uint256) {
return return
} }
s.txCbHeight.Store(s.chain.BlockHeight()) s.txCbEnabled.Store(true)
for i := 0; i <= len(hashes)/payload.MaxHashesCount; i++ { for i := 0; i <= len(hashes)/payload.MaxHashesCount; i++ {
start := i * payload.MaxHashesCount start := i * payload.MaxHashesCount
@ -1363,6 +1363,11 @@ func (s *Server) RequestTx(hashes ...util.Uint256) {
} }
} }
// StopTxFlow makes the server not call previously specified consensus transaction callback.
func (s *Server) StopTxFlow() {
s.txCbEnabled.Store(false)
}
// iteratePeersWithSendMsg sends the given message to all peers using two functions // iteratePeersWithSendMsg sends the given message to all peers using two functions
// passed, one is to send the message and the other is to filtrate peers (the // passed, one is to send the message and the other is to filtrate peers (the
// peer is considered invalid if it returns false). // peer is considered invalid if it returns false).

View file

@ -461,6 +461,7 @@ func TestTransaction(t *testing.T) {
cons := new(fakeConsensus) cons := new(fakeConsensus)
s.AddConsensusService(cons, cons.OnPayload, cons.OnTransaction) s.AddConsensusService(cons, cons.OnPayload, cons.OnTransaction)
startWithCleanup(t, s) startWithCleanup(t, s)
s.RequestTx(util.Uint256{1})
t.Run("good", func(t *testing.T) { t.Run("good", func(t *testing.T) {
tx := newDummyTx() tx := newDummyTx()