network: get blocks directly from the chain for rebroadcasting

Simplify network<->consensus relations, also broadcast blocks received by
other means like RPC.
This commit is contained in:
Roman Khimov 2020-05-07 23:00:38 +03:00
parent 462022bbdd
commit b7d2b659b4
2 changed files with 25 additions and 20 deletions

View file

@ -71,9 +71,6 @@ type Config struct {
// Broadcast is a callback which is called to notify server
// about new consensus payload to sent.
Broadcast func(p *Payload)
// RelayBlock is a callback that is called to notify server
// about the new block that needs to be broadcasted.
RelayBlock func(b *coreb.Block)
// Chain is a core.Blockchainer instance.
Chain blockchainer.Blockchainer
// RequestTx is a callback to which will be called
@ -348,8 +345,6 @@ func (s *service) processBlock(b block.Block) {
if _, errget := s.Chain.GetBlock(bb.Hash()); errget != nil {
s.log.Warn("error on add block", zap.Error(err))
}
} else {
s.Config.RelayBlock(bb)
}
}

View file

@ -106,13 +106,11 @@ func NewServer(config ServerConfig, chain blockchainer.Blockchainer, log *zap.Lo
if !s.consensusStarted.Load() {
s.tryStartConsensus()
}
s.relayBlock(b)
})
srv, err := consensus.NewService(consensus.Config{
Logger: log,
Broadcast: s.handleNewPayload,
RelayBlock: s.relayBlock,
Chain: chain,
RequestTx: s.requestTx,
Wallet: config.Wallet,
@ -171,6 +169,7 @@ func (s *Server) Start(errChan chan error) {
s.discovery.BackFill(s.Seeds...)
go s.broadcastTxLoop()
go s.relayBlocksLoop()
go s.bQueue.run()
go s.transport.Accept()
setServerAndNodeVersions(s.UserAgent, strconv.FormatUint(uint64(s.id), 10))
@ -795,8 +794,17 @@ func (s *Server) broadcastHPMessage(msg *Message) {
s.iteratePeersWithSendMsg(msg, Peer.EnqueueHPPacket, nil)
}
// relayBlock tells all the other connected nodes about the given block.
func (s *Server) relayBlock(b *block.Block) {
// relayBlocksLoop subscribes to new blocks in the ledger and broadcasts them
// to the network. Intended to be run as a separate goroutine.
func (s *Server) relayBlocksLoop() {
ch := make(chan *block.Block, 2) // Some buffering to smooth out possible egressing delays.
s.chain.SubscribeForBlocks(ch)
for {
select {
case <-s.quit:
s.chain.UnsubscribeFromBlocks(ch)
return
case b := <-ch:
msg := NewMessage(CMDInv, payload.NewInventory(payload.BlockType, []util.Uint256{b.Hash()}))
// Filter out nodes that are more current (avoid spamming the network
// during initial sync).
@ -804,6 +812,8 @@ func (s *Server) relayBlock(b *block.Block) {
return p.Handshaked() && p.LastBlockIndex() < b.Index
})
}
}
}
// verifyAndPoolTX verifies the TX and adds it to the local mempool.
func (s *Server) verifyAndPoolTX(t *transaction.Transaction) RelayReason {