From 73ce898e27a0e8ded8620afc182136d8f6d16069 Mon Sep 17 00:00:00 2001 From: Roman Khimov Date: Fri, 14 Oct 2022 21:00:26 +0300 Subject: [PATCH] network/consensus: use new dbft StopTxFlow callback It makes sense in general (further narrowing down the time window when transactions are processed by consensus thread) and it improves block times a little too, especially in the 7+2 scenario. Related to #2744. --- cli/server/server.go | 1 + go.mod | 2 +- go.sum | 4 ++-- internal/testcli/executor.go | 1 + pkg/consensus/consensus.go | 4 ++++ pkg/consensus/consensus_test.go | 1 + pkg/network/server.go | 11 ++++++++--- pkg/network/server_test.go | 1 + 8 files changed, 19 insertions(+), 6 deletions(-) diff --git a/cli/server/server.go b/cli/server/server.go index 827e3ae77..bd76e49fc 100644 --- a/cli/server/server.go +++ b/cli/server/server.go @@ -332,6 +332,7 @@ func mkConsensus(config config.Wallet, tpb time.Duration, chain *core.Blockchain Chain: chain, ProtocolConfiguration: chain.GetConfig(), RequestTx: serv.RequestTx, + StopTxFlow: serv.StopTxFlow, Wallet: &config, TimePerBlock: tpb, }) diff --git a/go.mod b/go.mod index 0a314e51d..2a0bd53a4 100644 --- a/go.mod +++ b/go.mod @@ -11,7 +11,7 @@ require ( github.com/holiman/uint256 v1.2.0 github.com/kballard/go-shellquote v0.0.0-20180428030007-95032a82bc51 github.com/mr-tron/base58 v1.2.0 - github.com/nspcc-dev/dbft v0.0.0-20220902113116-58a5e763e647 + github.com/nspcc-dev/dbft v0.0.0-20221018080254-c7e1bf49ccd7 github.com/nspcc-dev/go-ordered-json v0.0.0-20220111165707-25110be27d22 github.com/nspcc-dev/neo-go/pkg/interop v0.0.0-20220927123257-24c107e3a262 github.com/nspcc-dev/neofs-sdk-go v0.0.0-20220113123743-7f3162110659 diff --git a/go.sum b/go.sum index 189513264..4acbe2ce0 100644 --- a/go.sum +++ b/go.sum @@ -251,8 +251,8 @@ github.com/nspcc-dev/dbft v0.0.0-20191209120240-0d6b7568d9ae/go.mod h1:3FjXOoHmA github.com/nspcc-dev/dbft v0.0.0-20200117124306-478e5cfbf03a/go.mod h1:/YFK+XOxxg0Bfm6P92lY5eDSLYfp06XOdL8KAVgXjVk= github.com/nspcc-dev/dbft v0.0.0-20200219114139-199d286ed6c1/go.mod h1:O0qtn62prQSqizzoagHmuuKoz8QMkU3SzBoKdEvm3aQ= github.com/nspcc-dev/dbft v0.0.0-20210721160347-1b03241391ac/go.mod h1:U8MSnEShH+o5hexfWJdze6uMFJteP0ko7J2frO7Yu1Y= -github.com/nspcc-dev/dbft v0.0.0-20220902113116-58a5e763e647 h1:handGBjqVzRx7HD6152zsP8ZRxw083zCMbN0IlUaPQk= -github.com/nspcc-dev/dbft v0.0.0-20220902113116-58a5e763e647/go.mod h1:g9xisXmX9NP9MjioaTe862n9SlZTrP+6PVUWLBYOr98= +github.com/nspcc-dev/dbft v0.0.0-20221018080254-c7e1bf49ccd7 h1:RxVI9RFiHmpUvbuYIM5siLMiOvVt8P651BdmTstGi3Q= +github.com/nspcc-dev/dbft v0.0.0-20221018080254-c7e1bf49ccd7/go.mod h1:g9xisXmX9NP9MjioaTe862n9SlZTrP+6PVUWLBYOr98= github.com/nspcc-dev/go-ordered-json v0.0.0-20210915112629-e1b6cce73d02/go.mod h1:79bEUDEviBHJMFV6Iq6in57FEOCMcRhfQnfaf0ETA5U= github.com/nspcc-dev/go-ordered-json v0.0.0-20220111165707-25110be27d22 h1:n4ZaFCKt1pQJd7PXoMJabZWK9ejjbLOVrkl/lOUmshg= github.com/nspcc-dev/go-ordered-json v0.0.0-20220111165707-25110be27d22/go.mod h1:79bEUDEviBHJMFV6Iq6in57FEOCMcRhfQnfaf0ETA5U= diff --git a/internal/testcli/executor.go b/internal/testcli/executor.go index 6bcb1adcb..c8f731d18 100644 --- a/internal/testcli/executor.go +++ b/internal/testcli/executor.go @@ -156,6 +156,7 @@ func NewTestChain(t *testing.T, f func(*config.Config), run bool) (*core.Blockch Chain: chain, ProtocolConfiguration: chain.GetConfig(), RequestTx: netSrv.RequestTx, + StopTxFlow: netSrv.StopTxFlow, Wallet: &cfg.ApplicationConfiguration.UnlockWallet, TimePerBlock: serverConfig.TimePerBlock, }) diff --git a/pkg/consensus/consensus.go b/pkg/consensus/consensus.go index 1c0fc2d23..f0db86045 100644 --- a/pkg/consensus/consensus.go +++ b/pkg/consensus/consensus.go @@ -119,6 +119,9 @@ type Config struct { // RequestTx is a callback to which will be called // when a node lacks transactions present in the block. RequestTx func(h ...util.Uint256) + // StopTxFlow is a callback that is called after the consensus + // process stops accepting incoming transactions. + StopTxFlow func() // TimePerBlock is minimal time that should pass before the next block is accepted. TimePerBlock time.Duration // Wallet is a local-node wallet configuration. @@ -173,6 +176,7 @@ func NewService(cfg Config) (Service, error) { dbft.WithSecondsPerBlock(cfg.TimePerBlock), dbft.WithGetKeyPair(srv.getKeyPair), dbft.WithRequestTx(cfg.RequestTx), + dbft.WithStopTxFlow(cfg.StopTxFlow), dbft.WithGetTx(srv.getTx), dbft.WithGetVerified(srv.getVerifiedTx), dbft.WithBroadcast(srv.broadcast), diff --git a/pkg/consensus/consensus_test.go b/pkg/consensus/consensus_test.go index 92aa0a7cc..904fe1693 100644 --- a/pkg/consensus/consensus_test.go +++ b/pkg/consensus/consensus_test.go @@ -479,6 +479,7 @@ func newTestServiceWithChain(t *testing.T, bc *core.Blockchain) *service { Chain: bc, ProtocolConfiguration: bc.GetConfig(), RequestTx: func(...util.Uint256) {}, + StopTxFlow: func() {}, TimePerBlock: time.Duration(bc.GetConfig().SecondsPerBlock) * time.Second, Wallet: &config.Wallet{ Path: "./testdata/wallet1.json", diff --git a/pkg/network/server.go b/pkg/network/server.go index 998120553..655edad0a 100644 --- a/pkg/network/server.go +++ b/pkg/network/server.go @@ -108,7 +108,7 @@ type ( services map[string]Service extensHandlers map[string]func(*payload.Extensible) error txCallback func(*transaction.Transaction) - txCbHeight atomic.Uint32 + txCbEnabled atomic.Bool txInLock sync.Mutex txInMap map[util.Uint256]struct{} @@ -1045,7 +1045,7 @@ func (s *Server) handleTxCmd(tx *transaction.Transaction) error { s.serviceLock.RLock() txCallback := s.txCallback s.serviceLock.RUnlock() - if txCallback != nil && s.chain.BlockHeight() <= s.txCbHeight.Load() { + if txCallback != nil && s.txCbEnabled.Load() { txCallback(tx) } if s.verifyAndPoolTX(tx) == nil { @@ -1345,7 +1345,7 @@ func (s *Server) RequestTx(hashes ...util.Uint256) { return } - s.txCbHeight.Store(s.chain.BlockHeight()) + s.txCbEnabled.Store(true) for i := 0; i <= len(hashes)/payload.MaxHashesCount; i++ { start := i * payload.MaxHashesCount @@ -1363,6 +1363,11 @@ func (s *Server) RequestTx(hashes ...util.Uint256) { } } +// StopTxFlow makes the server not call previously specified consensus transaction callback. +func (s *Server) StopTxFlow() { + s.txCbEnabled.Store(false) +} + // iteratePeersWithSendMsg sends the given message to all peers using two functions // passed, one is to send the message and the other is to filtrate peers (the // peer is considered invalid if it returns false). diff --git a/pkg/network/server_test.go b/pkg/network/server_test.go index 2e4b7cf8e..49fcfe751 100644 --- a/pkg/network/server_test.go +++ b/pkg/network/server_test.go @@ -461,6 +461,7 @@ func TestTransaction(t *testing.T) { cons := new(fakeConsensus) s.AddConsensusService(cons, cons.OnPayload, cons.OnTransaction) startWithCleanup(t, s) + s.RequestTx(util.Uint256{1}) t.Run("good", func(t *testing.T) { tx := newDummyTx()