network: get blocks directly from the chain for rebroadcasting

Simplify network<->consensus relations, also broadcast blocks received by
other means like RPC.
This commit is contained in:
Roman Khimov 2020-05-07 23:00:38 +03:00
parent dd8bcfae47
commit d686fe4e5d
2 changed files with 25 additions and 20 deletions

View file

@ -70,9 +70,6 @@ type Config struct {
// Broadcast is a callback which is called to notify server // Broadcast is a callback which is called to notify server
// about new consensus payload to sent. // about new consensus payload to sent.
Broadcast func(p *Payload) Broadcast func(p *Payload)
// RelayBlock is a callback that is called to notify server
// about the new block that needs to be broadcasted.
RelayBlock func(b *coreb.Block)
// Chain is a core.Blockchainer instance. // Chain is a core.Blockchainer instance.
Chain core.Blockchainer Chain core.Blockchainer
// RequestTx is a callback to which will be called // RequestTx is a callback to which will be called
@ -352,8 +349,6 @@ func (s *service) processBlock(b block.Block) {
if _, errget := s.Chain.GetBlock(bb.Hash()); errget != nil { if _, errget := s.Chain.GetBlock(bb.Hash()); errget != nil {
s.log.Warn("error on add block", zap.Error(err)) s.log.Warn("error on add block", zap.Error(err))
} }
} else {
s.Config.RelayBlock(bb)
} }
} }

View file

@ -105,13 +105,11 @@ func NewServer(config ServerConfig, chain core.Blockchainer, log *zap.Logger) (*
if !s.consensusStarted.Load() { if !s.consensusStarted.Load() {
s.tryStartConsensus() s.tryStartConsensus()
} }
s.relayBlock(b)
}) })
srv, err := consensus.NewService(consensus.Config{ srv, err := consensus.NewService(consensus.Config{
Logger: log, Logger: log,
Broadcast: s.handleNewPayload, Broadcast: s.handleNewPayload,
RelayBlock: s.relayBlock,
Chain: chain, Chain: chain,
RequestTx: s.requestTx, RequestTx: s.requestTx,
Wallet: config.Wallet, Wallet: config.Wallet,
@ -176,6 +174,7 @@ func (s *Server) Start(errChan chan error) {
s.discovery.BackFill(s.Seeds...) s.discovery.BackFill(s.Seeds...)
go s.broadcastTxLoop() go s.broadcastTxLoop()
go s.relayBlocksLoop()
go s.bQueue.run() go s.bQueue.run()
go s.transport.Accept() go s.transport.Accept()
setServerAndNodeVersions(s.UserAgent, strconv.FormatUint(uint64(s.id), 10)) setServerAndNodeVersions(s.UserAgent, strconv.FormatUint(uint64(s.id), 10))
@ -788,8 +787,17 @@ func (s *Server) broadcastHPMessage(msg *Message) {
s.iteratePeersWithSendMsg(msg, Peer.EnqueueHPPacket, nil) s.iteratePeersWithSendMsg(msg, Peer.EnqueueHPPacket, nil)
} }
// relayBlock tells all the other connected nodes about the given block. // relayBlocksLoop subscribes to new blocks in the ledger and broadcasts them
func (s *Server) relayBlock(b *block.Block) { // to the network. Intended to be run as a separate goroutine.
func (s *Server) relayBlocksLoop() {
ch := make(chan *block.Block, 2) // Some buffering to smooth out possible egressing delays.
s.chain.SubscribeForBlocks(ch)
for {
select {
case <-s.quit:
s.chain.UnsubscribeFromBlocks(ch)
return
case b := <-ch:
msg := s.MkMsg(CMDInv, payload.NewInventory(payload.BlockType, []util.Uint256{b.Hash()})) msg := s.MkMsg(CMDInv, payload.NewInventory(payload.BlockType, []util.Uint256{b.Hash()}))
// Filter out nodes that are more current (avoid spamming the network // Filter out nodes that are more current (avoid spamming the network
// during initial sync). // during initial sync).
@ -797,6 +805,8 @@ func (s *Server) relayBlock(b *block.Block) {
return p.Handshaked() && p.LastBlockIndex() < b.Index return p.Handshaked() && p.LastBlockIndex() < b.Index
}) })
} }
}
}
// verifyAndPoolTX verifies the TX and adds it to the local mempool. // verifyAndPoolTX verifies the TX and adds it to the local mempool.
func (s *Server) verifyAndPoolTX(t *transaction.Transaction) RelayReason { func (s *Server) verifyAndPoolTX(t *transaction.Transaction) RelayReason {