From 89d754da6f77116b16114f5a843d0894df1a5415 Mon Sep 17 00:00:00 2001 From: Roman Khimov Date: Tue, 18 Jan 2022 00:04:41 +0300 Subject: [PATCH] network: don't request blocks we already have in the queue Fixes #2258. --- pkg/network/blockqueue.go | 13 ++++++++++++- pkg/network/blockqueue_test.go | 5 +++++ pkg/network/server.go | 11 ++++++++++- 3 files changed, 27 insertions(+), 2 deletions(-) diff --git a/pkg/network/blockqueue.go b/pkg/network/blockqueue.go index ef85c878f..f3437d912 100644 --- a/pkg/network/blockqueue.go +++ b/pkg/network/blockqueue.go @@ -17,8 +17,9 @@ type Blockqueuer interface { type blockQueue struct { log *zap.Logger - queueLock sync.Mutex + queueLock sync.RWMutex queue []*block.Block + lastQ uint32 checkBlocks chan struct{} chain Blockqueuer relayF func(*block.Block) @@ -115,6 +116,10 @@ func (bq *blockQueue) putBlock(block *block.Block) error { if bq.queue[pos] == nil || bq.queue[pos].Index < block.Index { bq.len++ bq.queue[pos] = block + for pos < blockCacheSize && bq.queue[pos] != nil && bq.lastQ+1 == bq.queue[pos].Index { + bq.lastQ = bq.queue[pos].Index + pos++ + } } l := bq.len bq.queueLock.Unlock() @@ -129,6 +134,12 @@ func (bq *blockQueue) putBlock(block *block.Block) error { return nil } +func (bq *blockQueue) lastQueued() uint32 { + bq.queueLock.RLock() + defer bq.queueLock.RUnlock() + return bq.lastQ +} + func (bq *blockQueue) discard() { if bq.discarded.CAS(false, true) { close(bq.checkBlocks) diff --git a/pkg/network/blockqueue_test.go b/pkg/network/blockqueue_test.go index 7237b8c99..137928c72 100644 --- a/pkg/network/blockqueue_test.go +++ b/pkg/network/blockqueue_test.go @@ -22,6 +22,7 @@ func TestBlockQueue(t *testing.T) { for i := 3; i < 5; i++ { assert.NoError(t, bq.putBlock(blocks[i])) } + assert.Equal(t, uint32(0), bq.lastQueued()) // nothing should be put into the blockchain assert.Equal(t, uint32(0), chain.BlockHeight()) assert.Equal(t, 2, bq.length()) @@ -30,6 +31,7 @@ func TestBlockQueue(t *testing.T) { assert.NoError(t, bq.putBlock(blocks[i])) } // but they're still not put into the blockchain, because bq isn't running + assert.Equal(t, uint32(4), bq.lastQueued()) assert.Equal(t, uint32(0), chain.BlockHeight()) assert.Equal(t, 4, bq.length()) // block with too big index is dropped @@ -38,12 +40,14 @@ func TestBlockQueue(t *testing.T) { go bq.run() // run() is asynchronous, so we need some kind of timeout anyway and this is the simplest one assert.Eventually(t, func() bool { return chain.BlockHeight() == 4 }, 4*time.Second, 100*time.Millisecond) + assert.Equal(t, uint32(4), bq.lastQueued()) assert.Equal(t, 0, bq.length()) assert.Equal(t, uint32(4), chain.BlockHeight()) // put some old blocks for i := 1; i < 5; i++ { assert.NoError(t, bq.putBlock(blocks[i])) } + assert.Equal(t, uint32(4), bq.lastQueued()) assert.Equal(t, 0, bq.length()) assert.Equal(t, uint32(4), chain.BlockHeight()) // unexpected blocks with run() active @@ -61,6 +65,7 @@ func TestBlockQueue(t *testing.T) { assert.NoError(t, bq.putBlock(blocks[5])) // run() is asynchronous, so we need some kind of timeout anyway and this is the simplest one assert.Eventually(t, func() bool { return chain.BlockHeight() == 8 }, 4*time.Second, 100*time.Millisecond) + assert.Equal(t, uint32(8), bq.lastQueued()) assert.Equal(t, 1, bq.length()) assert.Equal(t, uint32(8), chain.BlockHeight()) bq.discard() diff --git a/pkg/network/server.go b/pkg/network/server.go index 98815b8af..371842f80 100644 --- a/pkg/network/server.go +++ b/pkg/network/server.go @@ -1070,7 +1070,16 @@ func (s *Server) handleGetAddrCmd(p Peer) error { // 2. Send requests for chunk in increasing order. // 3. After all requests were sent, request random height. func (s *Server) requestBlocks(bq Blockqueuer, p Peer) error { - pl := getRequestBlocksPayload(p, bq.BlockHeight(), &s.lastRequestedBlock) + h := bq.BlockHeight() + pl := getRequestBlocksPayload(p, h, &s.lastRequestedBlock) + lq := s.bQueue.lastQueued() + if lq > pl.IndexStart { + c := int16(h + blockCacheSize - lq) + if c < payload.MaxHashesCount { + pl.Count = c + } + pl.IndexStart = lq + 1 + } return p.EnqueueP2PMessage(NewMessage(CMDGetBlockByIndex, pl)) }