network: fix block relaying, don't spit out useless errors

We can only add one block of the given height and we have two competing
goroutines to do that --- consensus and block queue. Whomever adds the block
first shouldn't trigger an error in another one.

Fix block relaying for blocks added via the block queue also, previously one
consensus-generated blocks were broadcasted.
This commit is contained in:
Roman Khimov 2020-02-04 19:32:29 +03:00
parent f9963cca37
commit b9b77ac1be
4 changed files with 24 additions and 9 deletions

View file

@ -335,7 +335,11 @@ func (s *service) processBlock(b block.Block) {
bb.Script = *(s.getBlockWitness(bb))
if err := s.Chain.AddBlock(bb); err != nil {
// The block might already be added via the regular network
// interaction.
if _, errget := s.Chain.GetBlock(bb.Hash()); errget != nil {
s.log.Warn("error on add block", zap.Error(err))
}
} else {
s.Config.RelayBlock(bb)
}

View file

@ -12,9 +12,10 @@ type blockQueue struct {
queue *queue.PriorityQueue
checkBlocks chan struct{}
chain core.Blockchainer
relayF func(*block.Block)
}
func newBlockQueue(capacity int, bc core.Blockchainer, log *zap.Logger) *blockQueue {
func newBlockQueue(capacity int, bc core.Blockchainer, log *zap.Logger, relayer func(*block.Block)) *blockQueue {
if log == nil {
return nil
}
@ -24,6 +25,7 @@ func newBlockQueue(capacity int, bc core.Blockchainer, log *zap.Logger) *blockQu
queue: queue.NewPriorityQueue(capacity, false),
checkBlocks: make(chan struct{}, 1),
chain: bc,
relayF: relayer,
}
}
@ -45,11 +47,16 @@ func (bq *blockQueue) run() {
if minblock.Index == bq.chain.BlockHeight()+1 {
err := bq.chain.AddBlock(minblock)
if err != nil {
// The block might already be added by consensus.
if _, errget := bq.chain.GetBlock(minblock.Hash()); errget != nil {
bq.log.Warn("blockQueue: failed adding block into the blockchain",
zap.String("error", err.Error()),
zap.Uint32("blockHeight", bq.chain.BlockHeight()),
zap.Uint32("nextIndex", minblock.Index))
}
} else if bq.relayF != nil {
bq.relayF(minblock)
}
}
} else {
break

View file

@ -12,7 +12,7 @@ import (
func TestBlockQueue(t *testing.T) {
chain := &testChain{}
// notice, it's not yet running
bq := newBlockQueue(0, chain, zaptest.NewLogger(t))
bq := newBlockQueue(0, chain, zaptest.NewLogger(t), nil)
blocks := make([]*block.Block, 11)
for i := 1; i < 11; i++ {
blocks[i] = &block.Block{Base: block.Base{Index: uint32(i)}}

View file

@ -90,7 +90,6 @@ func NewServer(config ServerConfig, chain core.Blockchainer, log *zap.Logger) (*
s := &Server{
ServerConfig: config,
chain: chain,
bQueue: newBlockQueue(maxBlockBatch, chain, log),
id: randomID(),
quit: make(chan struct{}),
register: make(chan Peer),
@ -99,6 +98,7 @@ func NewServer(config ServerConfig, chain core.Blockchainer, log *zap.Logger) (*
connected: atomic.NewBool(false),
log: log,
}
s.bQueue = newBlockQueue(maxBlockBatch, chain, log, s.relayBlock)
srv, err := consensus.NewService(consensus.Config{
Logger: log,
@ -734,7 +734,11 @@ func (s *Server) broadcastHPMessage(msg *Message) {
// relayBlock tells all the other connected nodes about the given block.
func (s *Server) relayBlock(b *block.Block) {
msg := s.MkMsg(CMDInv, payload.NewInventory(payload.BlockType, []util.Uint256{b.Hash()}))
s.broadcastMessage(msg)
// Filter out nodes that are more current (avoid spamming the network
// during initial sync).
s.iteratePeersWithSendMsg(msg, Peer.EnqueuePacket, func(p Peer) bool {
return p.Handshaked() && p.LastBlockIndex() < b.Index
})
}
// verifyAndPoolTX verifies the TX and adds it to the local mempool.