diff --git a/pkg/consensus/consensus.go b/pkg/consensus/consensus.go index 96847d904..8b9d686f1 100644 --- a/pkg/consensus/consensus.go +++ b/pkg/consensus/consensus.go @@ -335,7 +335,11 @@ func (s *service) processBlock(b block.Block) { bb.Script = *(s.getBlockWitness(bb)) if err := s.Chain.AddBlock(bb); err != nil { - s.log.Warn("error on add block", zap.Error(err)) + // The block might already be added via the regular network + // interaction. + if _, errget := s.Chain.GetBlock(bb.Hash()); errget != nil { + s.log.Warn("error on add block", zap.Error(err)) + } } else { s.Config.RelayBlock(bb) } diff --git a/pkg/network/blockqueue.go b/pkg/network/blockqueue.go index e26d96629..2911ffced 100644 --- a/pkg/network/blockqueue.go +++ b/pkg/network/blockqueue.go @@ -12,9 +12,10 @@ type blockQueue struct { queue *queue.PriorityQueue checkBlocks chan struct{} chain core.Blockchainer + relayF func(*block.Block) } -func newBlockQueue(capacity int, bc core.Blockchainer, log *zap.Logger) *blockQueue { +func newBlockQueue(capacity int, bc core.Blockchainer, log *zap.Logger, relayer func(*block.Block)) *blockQueue { if log == nil { return nil } @@ -24,6 +25,7 @@ func newBlockQueue(capacity int, bc core.Blockchainer, log *zap.Logger) *blockQu queue: queue.NewPriorityQueue(capacity, false), checkBlocks: make(chan struct{}, 1), chain: bc, + relayF: relayer, } } @@ -45,10 +47,15 @@ func (bq *blockQueue) run() { if minblock.Index == bq.chain.BlockHeight()+1 { err := bq.chain.AddBlock(minblock) if err != nil { - bq.log.Warn("blockQueue: failed adding block into the blockchain", - zap.String("error", err.Error()), - zap.Uint32("blockHeight", bq.chain.BlockHeight()), - zap.Uint32("nextIndex", minblock.Index)) + // The block might already be added by consensus. + if _, errget := bq.chain.GetBlock(minblock.Hash()); errget != nil { + bq.log.Warn("blockQueue: failed adding block into the blockchain", + zap.String("error", err.Error()), + zap.Uint32("blockHeight", bq.chain.BlockHeight()), + zap.Uint32("nextIndex", minblock.Index)) + } + } else if bq.relayF != nil { + bq.relayF(minblock) } } } else { diff --git a/pkg/network/blockqueue_test.go b/pkg/network/blockqueue_test.go index 62a43595f..af3540096 100644 --- a/pkg/network/blockqueue_test.go +++ b/pkg/network/blockqueue_test.go @@ -12,7 +12,7 @@ import ( func TestBlockQueue(t *testing.T) { chain := &testChain{} // notice, it's not yet running - bq := newBlockQueue(0, chain, zaptest.NewLogger(t)) + bq := newBlockQueue(0, chain, zaptest.NewLogger(t), nil) blocks := make([]*block.Block, 11) for i := 1; i < 11; i++ { blocks[i] = &block.Block{Base: block.Base{Index: uint32(i)}} diff --git a/pkg/network/server.go b/pkg/network/server.go index 681932e61..5a45ed393 100644 --- a/pkg/network/server.go +++ b/pkg/network/server.go @@ -90,7 +90,6 @@ func NewServer(config ServerConfig, chain core.Blockchainer, log *zap.Logger) (* s := &Server{ ServerConfig: config, chain: chain, - bQueue: newBlockQueue(maxBlockBatch, chain, log), id: randomID(), quit: make(chan struct{}), register: make(chan Peer), @@ -99,6 +98,7 @@ func NewServer(config ServerConfig, chain core.Blockchainer, log *zap.Logger) (* connected: atomic.NewBool(false), log: log, } + s.bQueue = newBlockQueue(maxBlockBatch, chain, log, s.relayBlock) srv, err := consensus.NewService(consensus.Config{ Logger: log, @@ -734,7 +734,11 @@ func (s *Server) broadcastHPMessage(msg *Message) { // relayBlock tells all the other connected nodes about the given block. func (s *Server) relayBlock(b *block.Block) { msg := s.MkMsg(CMDInv, payload.NewInventory(payload.BlockType, []util.Uint256{b.Hash()})) - s.broadcastMessage(msg) + // Filter out nodes that are more current (avoid spamming the network + // during initial sync). + s.iteratePeersWithSendMsg(msg, Peer.EnqueuePacket, func(p Peer) bool { + return p.Handshaked() && p.LastBlockIndex() < b.Index + }) } // verifyAndPoolTX verifies the TX and adds it to the local mempool.