diff --git a/pkg/consensus/consensus.go b/pkg/consensus/consensus.go index 0f59b733a..e12b9d37e 100644 --- a/pkg/consensus/consensus.go +++ b/pkg/consensus/consensus.go @@ -70,9 +70,6 @@ type Config struct { // Broadcast is a callback which is called to notify server // about new consensus payload to sent. Broadcast func(p *Payload) - // RelayBlock is a callback that is called to notify server - // about the new block that needs to be broadcasted. - RelayBlock func(b *coreb.Block) // Chain is a core.Blockchainer instance. Chain core.Blockchainer // RequestTx is a callback to which will be called @@ -352,8 +349,6 @@ func (s *service) processBlock(b block.Block) { if _, errget := s.Chain.GetBlock(bb.Hash()); errget != nil { s.log.Warn("error on add block", zap.Error(err)) } - } else { - s.Config.RelayBlock(bb) } } diff --git a/pkg/network/server.go b/pkg/network/server.go index b6c785b9c..1836cdf92 100644 --- a/pkg/network/server.go +++ b/pkg/network/server.go @@ -105,16 +105,14 @@ func NewServer(config ServerConfig, chain core.Blockchainer, log *zap.Logger) (* if !s.consensusStarted.Load() { s.tryStartConsensus() } - s.relayBlock(b) }) srv, err := consensus.NewService(consensus.Config{ - Logger: log, - Broadcast: s.handleNewPayload, - RelayBlock: s.relayBlock, - Chain: chain, - RequestTx: s.requestTx, - Wallet: config.Wallet, + Logger: log, + Broadcast: s.handleNewPayload, + Chain: chain, + RequestTx: s.requestTx, + Wallet: config.Wallet, TimePerBlock: config.TimePerBlock, }) @@ -176,6 +174,7 @@ func (s *Server) Start(errChan chan error) { s.discovery.BackFill(s.Seeds...) go s.broadcastTxLoop() + go s.relayBlocksLoop() go s.bQueue.run() go s.transport.Accept() setServerAndNodeVersions(s.UserAgent, strconv.FormatUint(uint64(s.id), 10)) @@ -788,14 +787,25 @@ func (s *Server) broadcastHPMessage(msg *Message) { s.iteratePeersWithSendMsg(msg, Peer.EnqueueHPPacket, nil) } -// relayBlock tells all the other connected nodes about the given block. -func (s *Server) relayBlock(b *block.Block) { - msg := s.MkMsg(CMDInv, payload.NewInventory(payload.BlockType, []util.Uint256{b.Hash()})) - // Filter out nodes that are more current (avoid spamming the network - // during initial sync). - s.iteratePeersWithSendMsg(msg, Peer.EnqueuePacket, func(p Peer) bool { - return p.Handshaked() && p.LastBlockIndex() < b.Index - }) +// relayBlocksLoop subscribes to new blocks in the ledger and broadcasts them +// to the network. Intended to be run as a separate goroutine. +func (s *Server) relayBlocksLoop() { + ch := make(chan *block.Block, 2) // Some buffering to smooth out possible egressing delays. + s.chain.SubscribeForBlocks(ch) + for { + select { + case <-s.quit: + s.chain.UnsubscribeFromBlocks(ch) + return + case b := <-ch: + msg := s.MkMsg(CMDInv, payload.NewInventory(payload.BlockType, []util.Uint256{b.Hash()})) + // Filter out nodes that are more current (avoid spamming the network + // during initial sync). + s.iteratePeersWithSendMsg(msg, Peer.EnqueuePacket, func(p Peer) bool { + return p.Handshaked() && p.LastBlockIndex() < b.Index + }) + } + } } // verifyAndPoolTX verifies the TX and adds it to the local mempool.