network/consensus: always process dBFT messages as high priority

Move category definition from consensus to payload, consensus service is the
one of its kind (HP), so network.Server can be adjusted accordingly.
This commit is contained in:
Roman Khimov 2022-07-28 18:30:14 +03:00
parent ff93a680eb
commit 9b0ea2c21b
8 changed files with 20 additions and 26 deletions

View file

@ -151,7 +151,7 @@ func newTestChain(t *testing.T, f func(*config.Config), run bool) (*core.Blockch
TimePerBlock: serverConfig.TimePerBlock,
})
require.NoError(t, err)
netSrv.AddExtensibleHPService(cons, consensus.Category, cons.OnPayload, cons.OnTransaction)
netSrv.AddConsensusService(cons, cons.OnPayload, cons.OnTransaction)
go netSrv.Start(make(chan error, 1))
errCh := make(chan error, 2)
rpcServer := rpcsrv.New(chain, cfg.ApplicationConfiguration.RPC, netSrv, nil, logger, errCh)

View file

@ -423,7 +423,7 @@ func mkConsensus(config config.Wallet, tpb time.Duration, chain *core.Blockchain
return nil, fmt.Errorf("can't initialize Consensus module: %w", err)
}
serv.AddExtensibleHPService(srv, consensus.Category, srv.OnPayload, srv.OnTransaction)
serv.AddConsensusService(srv, srv.OnPayload, srv.OnTransaction)
return srv, nil
}
@ -606,7 +606,7 @@ Main:
}
case sigusr2:
if dbftSrv != nil {
serv.DelExtensibleHPService(dbftSrv, consensus.Category)
serv.DelConsensusService(dbftSrv)
dbftSrv.Shutdown()
}
dbftSrv, err = mkConsensus(cfgnew.ApplicationConfiguration.UnlockWallet, serverConfig.TimePerBlock, chain, serv, log)

View file

@ -40,9 +40,6 @@ const defaultTimePerBlock = 15 * time.Second
// Number of nanoseconds in millisecond.
const nsInMs = 1000000
// Category is a message category for extensible payloads.
const Category = "dBFT"
// Ledger is the interface to Blockchain sufficient for Service.
type Ledger interface {
AddBlock(block *coreb.Block) error
@ -218,7 +215,7 @@ var (
func NewPayload(m netmode.Magic, stateRootEnabled bool) *Payload {
return &Payload{
Extensible: npayload.Extensible{
Category: Category,
Category: npayload.ConsensusCategory,
},
message: message{
stateRootEnabled: stateRootEnabled,

View file

@ -297,7 +297,7 @@ func getVerificationScript(i uint8, validators []crypto.PublicKey) []byte {
func fromPayload(t messageType, recovery *Payload, p io.Serializable) *Payload {
return &Payload{
Extensible: npayload.Extensible{
Category: Category,
Category: npayload.ConsensusCategory,
ValidBlockEnd: recovery.BlockIndex,
},
message: message{

View file

@ -10,7 +10,6 @@ import (
"github.com/nspcc-dev/neo-go/internal/fakechain"
"github.com/nspcc-dev/neo-go/pkg/config"
"github.com/nspcc-dev/neo-go/pkg/consensus"
"github.com/nspcc-dev/neo-go/pkg/io"
"github.com/nspcc-dev/neo-go/pkg/network/capability"
"github.com/nspcc-dev/neo-go/pkg/network/payload"

View file

@ -11,6 +11,10 @@ import (
const maxExtensibleCategorySize = 32
// ConsensusCategory is a message category for consensus-related extensible
// payloads.
const ConsensusCategory = "dBFT"
// Extensible represents a payload containing arbitrary data.
type Extensible struct {
// Category is the payload type.

View file

@ -105,7 +105,6 @@ type (
serviceLock sync.RWMutex
services map[string]Service
extensHandlers map[string]func(*payload.Extensible) error
extensHighPrio string
txCallback func(*transaction.Transaction)
txInLock sync.Mutex
@ -301,13 +300,12 @@ func (s *Server) addExtensibleService(svc Service, category string, handler func
s.addService(svc)
}
// AddExtensibleHPService registers a high-priority service that handles an extensible payload of some kind.
func (s *Server) AddExtensibleHPService(svc Service, category string, handler func(*payload.Extensible) error, txCallback func(*transaction.Transaction)) {
// AddConsensusService registers consensus service that handles transactions and dBFT extensible payloads.
func (s *Server) AddConsensusService(svc Service, handler func(*payload.Extensible) error, txCallback func(*transaction.Transaction)) {
s.serviceLock.Lock()
defer s.serviceLock.Unlock()
s.txCallback = txCallback
s.extensHighPrio = category
s.addExtensibleService(svc, category, handler)
s.addExtensibleService(svc, payload.ConsensusCategory, handler)
}
// DelService drops a service from the list, use it when the service is stopped
@ -337,13 +335,12 @@ func (s *Server) delExtensibleService(svc Service, category string) {
s.delService(svc)
}
// DelExtensibleHPService unregisters a high-priority service that handles an extensible payload of some kind.
func (s *Server) DelExtensibleHPService(svc Service, category string) {
// DelConsensusService unregisters consensus service that handles transactions and dBFT extensible payloads.
func (s *Server) DelConsensusService(svc Service) {
s.serviceLock.Lock()
defer s.serviceLock.Unlock()
s.txCallback = nil
s.extensHighPrio = ""
s.delExtensibleService(svc, category)
s.delExtensibleService(svc, payload.ConsensusCategory)
}
// GetNotaryPool allows to retrieve notary pool, if it's configured.
@ -1004,10 +1001,7 @@ func (s *Server) handleExtensibleCmd(e *payload.Extensible) error {
func (s *Server) advertiseExtensible(e *payload.Extensible) {
msg := NewMessage(CMDInv, payload.NewInventory(payload.ExtensibleType, []util.Uint256{e.Hash()}))
s.serviceLock.RLock()
hp := s.extensHighPrio
s.serviceLock.RUnlock()
if e.Category == hp {
if e.Category == payload.ConsensusCategory {
// It's high priority because it directly affects consensus process,
// even though it's just an inv.
s.broadcastHPMessage(msg)

View file

@ -111,7 +111,7 @@ func TestServerStartAndShutdown(t *testing.T) {
t.Run("with consensus", func(t *testing.T) {
s := newTestServer(t, ServerConfig{})
cons := new(fakeConsensus)
s.AddExtensibleHPService(cons, consensus.Category, cons.OnPayload, cons.OnTransaction)
s.AddConsensusService(cons, cons.OnPayload, cons.OnTransaction)
ch := startWithChannel(s)
p := newLocalPeer(t, s)
@ -413,7 +413,7 @@ func TestBlock(t *testing.T) {
func TestConsensus(t *testing.T) {
s := newTestServer(t, ServerConfig{})
cons := new(fakeConsensus)
s.AddExtensibleHPService(cons, consensus.Category, cons.OnPayload, cons.OnTransaction)
s.AddConsensusService(cons, cons.OnPayload, cons.OnTransaction)
startWithCleanup(t, s)
atomic2.StoreUint32(&s.chain.(*fakechain.FakeChain).Blockheight, 4)
@ -424,7 +424,7 @@ func TestConsensus(t *testing.T) {
newConsensusMessage := func(start, end uint32) *Message {
pl := payload.NewExtensible()
pl.Category = consensus.Category
pl.Category = payload.ConsensusCategory
pl.ValidBlockStart = start
pl.ValidBlockEnd = end
return NewMessage(CMDExtensible, pl)
@ -458,7 +458,7 @@ func TestConsensus(t *testing.T) {
func TestTransaction(t *testing.T) {
s := newTestServer(t, ServerConfig{})
cons := new(fakeConsensus)
s.AddExtensibleHPService(cons, consensus.Category, cons.OnPayload, cons.OnTransaction)
s.AddConsensusService(cons, cons.OnPayload, cons.OnTransaction)
startWithCleanup(t, s)
t.Run("good", func(t *testing.T) {