From d52a06a82d351d179ee22d694935cb945010cae4 Mon Sep 17 00:00:00 2001 From: Roman Khimov Date: Tue, 18 Jan 2022 00:01:26 +0300 Subject: [PATCH 1/3] network: move index-position relation into helper Just to make things more clear, no functional changes. --- pkg/network/blockqueue.go | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/pkg/network/blockqueue.go b/pkg/network/blockqueue.go index e63947d9e..ef85c878f 100644 --- a/pkg/network/blockqueue.go +++ b/pkg/network/blockqueue.go @@ -32,6 +32,10 @@ const ( blockCacheSize = 2000 ) +func indexToPosition(i uint32) int { + return int(i) % blockCacheSize +} + func newBlockQueue(capacity int, bc Blockqueuer, log *zap.Logger, relayer func(*block.Block)) *blockQueue { if log == nil { return nil @@ -56,12 +60,12 @@ func (bq *blockQueue) run() { } for { h := bq.chain.BlockHeight() - pos := int(h+1) % blockCacheSize + pos := indexToPosition(h + 1) bq.queueLock.Lock() b := bq.queue[pos] // The chain moved forward using blocks from other sources (consensus). for i := lastHeight; i < h; i++ { - old := int(i+1) % blockCacheSize + old := indexToPosition(i + 1) if bq.queue[old] != nil && bq.queue[old].Index == i { bq.len-- bq.queue[old] = nil @@ -106,7 +110,7 @@ func (bq *blockQueue) putBlock(block *block.Block) error { bq.queueLock.Unlock() return nil } - pos := block.Index % blockCacheSize + pos := indexToPosition(block.Index) // If we already have it, keep the old block, throw away new one. if bq.queue[pos] == nil || bq.queue[pos].Index < block.Index { bq.len++ From 03fd91e85796dc349a9520e3b9ee6922c58298e0 Mon Sep 17 00:00:00 2001 From: Roman Khimov Date: Tue, 18 Jan 2022 00:04:29 +0300 Subject: [PATCH 2/3] network: use assert.Eventually in bq test Simpler and more efficient (polls more often and completes the test sooner). --- pkg/network/blockqueue_test.go | 12 ++---------- 1 file changed, 2 insertions(+), 10 deletions(-) diff --git a/pkg/network/blockqueue_test.go b/pkg/network/blockqueue_test.go index cffdb1873..7237b8c99 100644 --- a/pkg/network/blockqueue_test.go +++ b/pkg/network/blockqueue_test.go @@ -37,11 +37,7 @@ func TestBlockQueue(t *testing.T) { assert.Equal(t, 4, bq.length()) go bq.run() // run() is asynchronous, so we need some kind of timeout anyway and this is the simplest one - for i := 0; i < 5; i++ { - if chain.BlockHeight() != 4 { - time.Sleep(time.Second) - } - } + assert.Eventually(t, func() bool { return chain.BlockHeight() == 4 }, 4*time.Second, 100*time.Millisecond) assert.Equal(t, 0, bq.length()) assert.Equal(t, uint32(4), chain.BlockHeight()) // put some old blocks @@ -64,11 +60,7 @@ func TestBlockQueue(t *testing.T) { assert.NoError(t, bq.putBlock(blocks[6])) assert.NoError(t, bq.putBlock(blocks[5])) // run() is asynchronous, so we need some kind of timeout anyway and this is the simplest one - for i := 0; i < 5; i++ { - if chain.BlockHeight() != 8 { - time.Sleep(time.Second) - } - } + assert.Eventually(t, func() bool { return chain.BlockHeight() == 8 }, 4*time.Second, 100*time.Millisecond) assert.Equal(t, 1, bq.length()) assert.Equal(t, uint32(8), chain.BlockHeight()) bq.discard() From 89d754da6f77116b16114f5a843d0894df1a5415 Mon Sep 17 00:00:00 2001 From: Roman Khimov Date: Tue, 18 Jan 2022 00:04:41 +0300 Subject: [PATCH 3/3] network: don't request blocks we already have in the queue Fixes #2258. --- pkg/network/blockqueue.go | 13 ++++++++++++- pkg/network/blockqueue_test.go | 5 +++++ pkg/network/server.go | 11 ++++++++++- 3 files changed, 27 insertions(+), 2 deletions(-) diff --git a/pkg/network/blockqueue.go b/pkg/network/blockqueue.go index ef85c878f..f3437d912 100644 --- a/pkg/network/blockqueue.go +++ b/pkg/network/blockqueue.go @@ -17,8 +17,9 @@ type Blockqueuer interface { type blockQueue struct { log *zap.Logger - queueLock sync.Mutex + queueLock sync.RWMutex queue []*block.Block + lastQ uint32 checkBlocks chan struct{} chain Blockqueuer relayF func(*block.Block) @@ -115,6 +116,10 @@ func (bq *blockQueue) putBlock(block *block.Block) error { if bq.queue[pos] == nil || bq.queue[pos].Index < block.Index { bq.len++ bq.queue[pos] = block + for pos < blockCacheSize && bq.queue[pos] != nil && bq.lastQ+1 == bq.queue[pos].Index { + bq.lastQ = bq.queue[pos].Index + pos++ + } } l := bq.len bq.queueLock.Unlock() @@ -129,6 +134,12 @@ func (bq *blockQueue) putBlock(block *block.Block) error { return nil } +func (bq *blockQueue) lastQueued() uint32 { + bq.queueLock.RLock() + defer bq.queueLock.RUnlock() + return bq.lastQ +} + func (bq *blockQueue) discard() { if bq.discarded.CAS(false, true) { close(bq.checkBlocks) diff --git a/pkg/network/blockqueue_test.go b/pkg/network/blockqueue_test.go index 7237b8c99..137928c72 100644 --- a/pkg/network/blockqueue_test.go +++ b/pkg/network/blockqueue_test.go @@ -22,6 +22,7 @@ func TestBlockQueue(t *testing.T) { for i := 3; i < 5; i++ { assert.NoError(t, bq.putBlock(blocks[i])) } + assert.Equal(t, uint32(0), bq.lastQueued()) // nothing should be put into the blockchain assert.Equal(t, uint32(0), chain.BlockHeight()) assert.Equal(t, 2, bq.length()) @@ -30,6 +31,7 @@ func TestBlockQueue(t *testing.T) { assert.NoError(t, bq.putBlock(blocks[i])) } // but they're still not put into the blockchain, because bq isn't running + assert.Equal(t, uint32(4), bq.lastQueued()) assert.Equal(t, uint32(0), chain.BlockHeight()) assert.Equal(t, 4, bq.length()) // block with too big index is dropped @@ -38,12 +40,14 @@ func TestBlockQueue(t *testing.T) { go bq.run() // run() is asynchronous, so we need some kind of timeout anyway and this is the simplest one assert.Eventually(t, func() bool { return chain.BlockHeight() == 4 }, 4*time.Second, 100*time.Millisecond) + assert.Equal(t, uint32(4), bq.lastQueued()) assert.Equal(t, 0, bq.length()) assert.Equal(t, uint32(4), chain.BlockHeight()) // put some old blocks for i := 1; i < 5; i++ { assert.NoError(t, bq.putBlock(blocks[i])) } + assert.Equal(t, uint32(4), bq.lastQueued()) assert.Equal(t, 0, bq.length()) assert.Equal(t, uint32(4), chain.BlockHeight()) // unexpected blocks with run() active @@ -61,6 +65,7 @@ func TestBlockQueue(t *testing.T) { assert.NoError(t, bq.putBlock(blocks[5])) // run() is asynchronous, so we need some kind of timeout anyway and this is the simplest one assert.Eventually(t, func() bool { return chain.BlockHeight() == 8 }, 4*time.Second, 100*time.Millisecond) + assert.Equal(t, uint32(8), bq.lastQueued()) assert.Equal(t, 1, bq.length()) assert.Equal(t, uint32(8), chain.BlockHeight()) bq.discard() diff --git a/pkg/network/server.go b/pkg/network/server.go index 98815b8af..371842f80 100644 --- a/pkg/network/server.go +++ b/pkg/network/server.go @@ -1070,7 +1070,16 @@ func (s *Server) handleGetAddrCmd(p Peer) error { // 2. Send requests for chunk in increasing order. // 3. After all requests were sent, request random height. func (s *Server) requestBlocks(bq Blockqueuer, p Peer) error { - pl := getRequestBlocksPayload(p, bq.BlockHeight(), &s.lastRequestedBlock) + h := bq.BlockHeight() + pl := getRequestBlocksPayload(p, h, &s.lastRequestedBlock) + lq := s.bQueue.lastQueued() + if lq > pl.IndexStart { + c := int16(h + blockCacheSize - lq) + if c < payload.MaxHashesCount { + pl.Count = c + } + pl.IndexStart = lq + 1 + } return p.EnqueueP2PMessage(NewMessage(CMDGetBlockByIndex, pl)) }