From 9b0ea2c21bc47e65cd1f6cc17d6679fe161279ef Mon Sep 17 00:00:00 2001 From: Roman Khimov Date: Thu, 28 Jul 2022 18:30:14 +0300 Subject: [PATCH] network/consensus: always process dBFT messages as high priority Move category definition from consensus to payload, consensus service is the one of its kind (HP), so network.Server can be adjusted accordingly. --- cli/executor_test.go | 2 +- cli/server/server.go | 4 ++-- pkg/consensus/consensus.go | 5 +---- pkg/consensus/recovery_message.go | 2 +- pkg/network/helper_test.go | 1 - pkg/network/payload/extensible.go | 4 ++++ pkg/network/server.go | 20 +++++++------------- pkg/network/server_test.go | 8 ++++---- 8 files changed, 20 insertions(+), 26 deletions(-) diff --git a/cli/executor_test.go b/cli/executor_test.go index b3564baa4..e3b8e81b9 100644 --- a/cli/executor_test.go +++ b/cli/executor_test.go @@ -151,7 +151,7 @@ func newTestChain(t *testing.T, f func(*config.Config), run bool) (*core.Blockch TimePerBlock: serverConfig.TimePerBlock, }) require.NoError(t, err) - netSrv.AddExtensibleHPService(cons, consensus.Category, cons.OnPayload, cons.OnTransaction) + netSrv.AddConsensusService(cons, cons.OnPayload, cons.OnTransaction) go netSrv.Start(make(chan error, 1)) errCh := make(chan error, 2) rpcServer := rpcsrv.New(chain, cfg.ApplicationConfiguration.RPC, netSrv, nil, logger, errCh) diff --git a/cli/server/server.go b/cli/server/server.go index f40d1405d..0d5f4e02e 100644 --- a/cli/server/server.go +++ b/cli/server/server.go @@ -423,7 +423,7 @@ func mkConsensus(config config.Wallet, tpb time.Duration, chain *core.Blockchain return nil, fmt.Errorf("can't initialize Consensus module: %w", err) } - serv.AddExtensibleHPService(srv, consensus.Category, srv.OnPayload, srv.OnTransaction) + serv.AddConsensusService(srv, srv.OnPayload, srv.OnTransaction) return srv, nil } @@ -606,7 +606,7 @@ Main: } case sigusr2: if dbftSrv != nil { - serv.DelExtensibleHPService(dbftSrv, consensus.Category) + serv.DelConsensusService(dbftSrv) dbftSrv.Shutdown() } dbftSrv, err = mkConsensus(cfgnew.ApplicationConfiguration.UnlockWallet, serverConfig.TimePerBlock, chain, serv, log) diff --git a/pkg/consensus/consensus.go b/pkg/consensus/consensus.go index 977f2cd90..dca2b80e6 100644 --- a/pkg/consensus/consensus.go +++ b/pkg/consensus/consensus.go @@ -40,9 +40,6 @@ const defaultTimePerBlock = 15 * time.Second // Number of nanoseconds in millisecond. const nsInMs = 1000000 -// Category is a message category for extensible payloads. -const Category = "dBFT" - // Ledger is the interface to Blockchain sufficient for Service. type Ledger interface { AddBlock(block *coreb.Block) error @@ -218,7 +215,7 @@ var ( func NewPayload(m netmode.Magic, stateRootEnabled bool) *Payload { return &Payload{ Extensible: npayload.Extensible{ - Category: Category, + Category: npayload.ConsensusCategory, }, message: message{ stateRootEnabled: stateRootEnabled, diff --git a/pkg/consensus/recovery_message.go b/pkg/consensus/recovery_message.go index 6ab826d7e..a7fa9c435 100644 --- a/pkg/consensus/recovery_message.go +++ b/pkg/consensus/recovery_message.go @@ -297,7 +297,7 @@ func getVerificationScript(i uint8, validators []crypto.PublicKey) []byte { func fromPayload(t messageType, recovery *Payload, p io.Serializable) *Payload { return &Payload{ Extensible: npayload.Extensible{ - Category: Category, + Category: npayload.ConsensusCategory, ValidBlockEnd: recovery.BlockIndex, }, message: message{ diff --git a/pkg/network/helper_test.go b/pkg/network/helper_test.go index c8847ed00..04b54959a 100644 --- a/pkg/network/helper_test.go +++ b/pkg/network/helper_test.go @@ -10,7 +10,6 @@ import ( "github.com/nspcc-dev/neo-go/internal/fakechain" "github.com/nspcc-dev/neo-go/pkg/config" - "github.com/nspcc-dev/neo-go/pkg/consensus" "github.com/nspcc-dev/neo-go/pkg/io" "github.com/nspcc-dev/neo-go/pkg/network/capability" "github.com/nspcc-dev/neo-go/pkg/network/payload" diff --git a/pkg/network/payload/extensible.go b/pkg/network/payload/extensible.go index 1ca3c3c6d..31a909ee9 100644 --- a/pkg/network/payload/extensible.go +++ b/pkg/network/payload/extensible.go @@ -11,6 +11,10 @@ import ( const maxExtensibleCategorySize = 32 +// ConsensusCategory is a message category for consensus-related extensible +// payloads. +const ConsensusCategory = "dBFT" + // Extensible represents a payload containing arbitrary data. type Extensible struct { // Category is the payload type. diff --git a/pkg/network/server.go b/pkg/network/server.go index b69be9ebc..5eceba49b 100644 --- a/pkg/network/server.go +++ b/pkg/network/server.go @@ -105,7 +105,6 @@ type ( serviceLock sync.RWMutex services map[string]Service extensHandlers map[string]func(*payload.Extensible) error - extensHighPrio string txCallback func(*transaction.Transaction) txInLock sync.Mutex @@ -301,13 +300,12 @@ func (s *Server) addExtensibleService(svc Service, category string, handler func s.addService(svc) } -// AddExtensibleHPService registers a high-priority service that handles an extensible payload of some kind. -func (s *Server) AddExtensibleHPService(svc Service, category string, handler func(*payload.Extensible) error, txCallback func(*transaction.Transaction)) { +// AddConsensusService registers consensus service that handles transactions and dBFT extensible payloads. +func (s *Server) AddConsensusService(svc Service, handler func(*payload.Extensible) error, txCallback func(*transaction.Transaction)) { s.serviceLock.Lock() defer s.serviceLock.Unlock() s.txCallback = txCallback - s.extensHighPrio = category - s.addExtensibleService(svc, category, handler) + s.addExtensibleService(svc, payload.ConsensusCategory, handler) } // DelService drops a service from the list, use it when the service is stopped @@ -337,13 +335,12 @@ func (s *Server) delExtensibleService(svc Service, category string) { s.delService(svc) } -// DelExtensibleHPService unregisters a high-priority service that handles an extensible payload of some kind. -func (s *Server) DelExtensibleHPService(svc Service, category string) { +// DelConsensusService unregisters consensus service that handles transactions and dBFT extensible payloads. +func (s *Server) DelConsensusService(svc Service) { s.serviceLock.Lock() defer s.serviceLock.Unlock() s.txCallback = nil - s.extensHighPrio = "" - s.delExtensibleService(svc, category) + s.delExtensibleService(svc, payload.ConsensusCategory) } // GetNotaryPool allows to retrieve notary pool, if it's configured. @@ -1004,10 +1001,7 @@ func (s *Server) handleExtensibleCmd(e *payload.Extensible) error { func (s *Server) advertiseExtensible(e *payload.Extensible) { msg := NewMessage(CMDInv, payload.NewInventory(payload.ExtensibleType, []util.Uint256{e.Hash()})) - s.serviceLock.RLock() - hp := s.extensHighPrio - s.serviceLock.RUnlock() - if e.Category == hp { + if e.Category == payload.ConsensusCategory { // It's high priority because it directly affects consensus process, // even though it's just an inv. s.broadcastHPMessage(msg) diff --git a/pkg/network/server_test.go b/pkg/network/server_test.go index c80f4eb4b..586ca48cd 100644 --- a/pkg/network/server_test.go +++ b/pkg/network/server_test.go @@ -111,7 +111,7 @@ func TestServerStartAndShutdown(t *testing.T) { t.Run("with consensus", func(t *testing.T) { s := newTestServer(t, ServerConfig{}) cons := new(fakeConsensus) - s.AddExtensibleHPService(cons, consensus.Category, cons.OnPayload, cons.OnTransaction) + s.AddConsensusService(cons, cons.OnPayload, cons.OnTransaction) ch := startWithChannel(s) p := newLocalPeer(t, s) @@ -413,7 +413,7 @@ func TestBlock(t *testing.T) { func TestConsensus(t *testing.T) { s := newTestServer(t, ServerConfig{}) cons := new(fakeConsensus) - s.AddExtensibleHPService(cons, consensus.Category, cons.OnPayload, cons.OnTransaction) + s.AddConsensusService(cons, cons.OnPayload, cons.OnTransaction) startWithCleanup(t, s) atomic2.StoreUint32(&s.chain.(*fakechain.FakeChain).Blockheight, 4) @@ -424,7 +424,7 @@ func TestConsensus(t *testing.T) { newConsensusMessage := func(start, end uint32) *Message { pl := payload.NewExtensible() - pl.Category = consensus.Category + pl.Category = payload.ConsensusCategory pl.ValidBlockStart = start pl.ValidBlockEnd = end return NewMessage(CMDExtensible, pl) @@ -458,7 +458,7 @@ func TestConsensus(t *testing.T) { func TestTransaction(t *testing.T) { s := newTestServer(t, ServerConfig{}) cons := new(fakeConsensus) - s.AddExtensibleHPService(cons, consensus.Category, cons.OnPayload, cons.OnTransaction) + s.AddConsensusService(cons, cons.OnPayload, cons.OnTransaction) startWithCleanup(t, s) t.Run("good", func(t *testing.T) {