From b7d2b659b4805cbd3a8c42cff4942eae2f5b6d0b Mon Sep 17 00:00:00 2001 From: Roman Khimov Date: Thu, 7 May 2020 23:00:38 +0300 Subject: [PATCH] network: get blocks directly from the chain for rebroadcasting Simplify network<->consensus relations, also broadcast blocks received by other means like RPC. --- pkg/consensus/consensus.go | 5 ----- pkg/network/server.go | 40 ++++++++++++++++++++++++-------------- 2 files changed, 25 insertions(+), 20 deletions(-) diff --git a/pkg/consensus/consensus.go b/pkg/consensus/consensus.go index 8f8bbd7d4..190f3577c 100644 --- a/pkg/consensus/consensus.go +++ b/pkg/consensus/consensus.go @@ -71,9 +71,6 @@ type Config struct { // Broadcast is a callback which is called to notify server // about new consensus payload to sent. Broadcast func(p *Payload) - // RelayBlock is a callback that is called to notify server - // about the new block that needs to be broadcasted. - RelayBlock func(b *coreb.Block) // Chain is a core.Blockchainer instance. Chain blockchainer.Blockchainer // RequestTx is a callback to which will be called @@ -348,8 +345,6 @@ func (s *service) processBlock(b block.Block) { if _, errget := s.Chain.GetBlock(bb.Hash()); errget != nil { s.log.Warn("error on add block", zap.Error(err)) } - } else { - s.Config.RelayBlock(bb) } } diff --git a/pkg/network/server.go b/pkg/network/server.go index 0031ec877..c9a5e1038 100644 --- a/pkg/network/server.go +++ b/pkg/network/server.go @@ -106,16 +106,14 @@ func NewServer(config ServerConfig, chain blockchainer.Blockchainer, log *zap.Lo if !s.consensusStarted.Load() { s.tryStartConsensus() } - s.relayBlock(b) }) srv, err := consensus.NewService(consensus.Config{ - Logger: log, - Broadcast: s.handleNewPayload, - RelayBlock: s.relayBlock, - Chain: chain, - RequestTx: s.requestTx, - Wallet: config.Wallet, + Logger: log, + Broadcast: s.handleNewPayload, + Chain: chain, + RequestTx: s.requestTx, + Wallet: config.Wallet, TimePerBlock: config.TimePerBlock, }) @@ -171,6 +169,7 @@ func (s *Server) Start(errChan chan error) { s.discovery.BackFill(s.Seeds...) go s.broadcastTxLoop() + go s.relayBlocksLoop() go s.bQueue.run() go s.transport.Accept() setServerAndNodeVersions(s.UserAgent, strconv.FormatUint(uint64(s.id), 10)) @@ -795,14 +794,25 @@ func (s *Server) broadcastHPMessage(msg *Message) { s.iteratePeersWithSendMsg(msg, Peer.EnqueueHPPacket, nil) } -// relayBlock tells all the other connected nodes about the given block. -func (s *Server) relayBlock(b *block.Block) { - msg := NewMessage(CMDInv, payload.NewInventory(payload.BlockType, []util.Uint256{b.Hash()})) - // Filter out nodes that are more current (avoid spamming the network - // during initial sync). - s.iteratePeersWithSendMsg(msg, Peer.EnqueuePacket, func(p Peer) bool { - return p.Handshaked() && p.LastBlockIndex() < b.Index - }) +// relayBlocksLoop subscribes to new blocks in the ledger and broadcasts them +// to the network. Intended to be run as a separate goroutine. +func (s *Server) relayBlocksLoop() { + ch := make(chan *block.Block, 2) // Some buffering to smooth out possible egressing delays. + s.chain.SubscribeForBlocks(ch) + for { + select { + case <-s.quit: + s.chain.UnsubscribeFromBlocks(ch) + return + case b := <-ch: + msg := NewMessage(CMDInv, payload.NewInventory(payload.BlockType, []util.Uint256{b.Hash()})) + // Filter out nodes that are more current (avoid spamming the network + // during initial sync). + s.iteratePeersWithSendMsg(msg, Peer.EnqueuePacket, func(p Peer) bool { + return p.Handshaked() && p.LastBlockIndex() < b.Index + }) + } + } } // verifyAndPoolTX verifies the TX and adds it to the local mempool.