diff --git a/pkg/network/blockqueue.go b/pkg/network/blockqueue.go index e63947d9e..f3437d912 100644 --- a/pkg/network/blockqueue.go +++ b/pkg/network/blockqueue.go @@ -17,8 +17,9 @@ type Blockqueuer interface { type blockQueue struct { log *zap.Logger - queueLock sync.Mutex + queueLock sync.RWMutex queue []*block.Block + lastQ uint32 checkBlocks chan struct{} chain Blockqueuer relayF func(*block.Block) @@ -32,6 +33,10 @@ const ( blockCacheSize = 2000 ) +func indexToPosition(i uint32) int { + return int(i) % blockCacheSize +} + func newBlockQueue(capacity int, bc Blockqueuer, log *zap.Logger, relayer func(*block.Block)) *blockQueue { if log == nil { return nil @@ -56,12 +61,12 @@ func (bq *blockQueue) run() { } for { h := bq.chain.BlockHeight() - pos := int(h+1) % blockCacheSize + pos := indexToPosition(h + 1) bq.queueLock.Lock() b := bq.queue[pos] // The chain moved forward using blocks from other sources (consensus). for i := lastHeight; i < h; i++ { - old := int(i+1) % blockCacheSize + old := indexToPosition(i + 1) if bq.queue[old] != nil && bq.queue[old].Index == i { bq.len-- bq.queue[old] = nil @@ -106,11 +111,15 @@ func (bq *blockQueue) putBlock(block *block.Block) error { bq.queueLock.Unlock() return nil } - pos := block.Index % blockCacheSize + pos := indexToPosition(block.Index) // If we already have it, keep the old block, throw away new one. if bq.queue[pos] == nil || bq.queue[pos].Index < block.Index { bq.len++ bq.queue[pos] = block + for pos < blockCacheSize && bq.queue[pos] != nil && bq.lastQ+1 == bq.queue[pos].Index { + bq.lastQ = bq.queue[pos].Index + pos++ + } } l := bq.len bq.queueLock.Unlock() @@ -125,6 +134,12 @@ func (bq *blockQueue) putBlock(block *block.Block) error { return nil } +func (bq *blockQueue) lastQueued() uint32 { + bq.queueLock.RLock() + defer bq.queueLock.RUnlock() + return bq.lastQ +} + func (bq *blockQueue) discard() { if bq.discarded.CAS(false, true) { close(bq.checkBlocks) diff --git a/pkg/network/blockqueue_test.go b/pkg/network/blockqueue_test.go index cffdb1873..137928c72 100644 --- a/pkg/network/blockqueue_test.go +++ b/pkg/network/blockqueue_test.go @@ -22,6 +22,7 @@ func TestBlockQueue(t *testing.T) { for i := 3; i < 5; i++ { assert.NoError(t, bq.putBlock(blocks[i])) } + assert.Equal(t, uint32(0), bq.lastQueued()) // nothing should be put into the blockchain assert.Equal(t, uint32(0), chain.BlockHeight()) assert.Equal(t, 2, bq.length()) @@ -30,6 +31,7 @@ func TestBlockQueue(t *testing.T) { assert.NoError(t, bq.putBlock(blocks[i])) } // but they're still not put into the blockchain, because bq isn't running + assert.Equal(t, uint32(4), bq.lastQueued()) assert.Equal(t, uint32(0), chain.BlockHeight()) assert.Equal(t, 4, bq.length()) // block with too big index is dropped @@ -37,17 +39,15 @@ func TestBlockQueue(t *testing.T) { assert.Equal(t, 4, bq.length()) go bq.run() // run() is asynchronous, so we need some kind of timeout anyway and this is the simplest one - for i := 0; i < 5; i++ { - if chain.BlockHeight() != 4 { - time.Sleep(time.Second) - } - } + assert.Eventually(t, func() bool { return chain.BlockHeight() == 4 }, 4*time.Second, 100*time.Millisecond) + assert.Equal(t, uint32(4), bq.lastQueued()) assert.Equal(t, 0, bq.length()) assert.Equal(t, uint32(4), chain.BlockHeight()) // put some old blocks for i := 1; i < 5; i++ { assert.NoError(t, bq.putBlock(blocks[i])) } + assert.Equal(t, uint32(4), bq.lastQueued()) assert.Equal(t, 0, bq.length()) assert.Equal(t, uint32(4), chain.BlockHeight()) // unexpected blocks with run() active @@ -64,11 +64,8 @@ func TestBlockQueue(t *testing.T) { assert.NoError(t, bq.putBlock(blocks[6])) assert.NoError(t, bq.putBlock(blocks[5])) // run() is asynchronous, so we need some kind of timeout anyway and this is the simplest one - for i := 0; i < 5; i++ { - if chain.BlockHeight() != 8 { - time.Sleep(time.Second) - } - } + assert.Eventually(t, func() bool { return chain.BlockHeight() == 8 }, 4*time.Second, 100*time.Millisecond) + assert.Equal(t, uint32(8), bq.lastQueued()) assert.Equal(t, 1, bq.length()) assert.Equal(t, uint32(8), chain.BlockHeight()) bq.discard() diff --git a/pkg/network/server.go b/pkg/network/server.go index 98815b8af..371842f80 100644 --- a/pkg/network/server.go +++ b/pkg/network/server.go @@ -1070,7 +1070,16 @@ func (s *Server) handleGetAddrCmd(p Peer) error { // 2. Send requests for chunk in increasing order. // 3. After all requests were sent, request random height. func (s *Server) requestBlocks(bq Blockqueuer, p Peer) error { - pl := getRequestBlocksPayload(p, bq.BlockHeight(), &s.lastRequestedBlock) + h := bq.BlockHeight() + pl := getRequestBlocksPayload(p, h, &s.lastRequestedBlock) + lq := s.bQueue.lastQueued() + if lq > pl.IndexStart { + c := int16(h + blockCacheSize - lq) + if c < payload.MaxHashesCount { + pl.Count = c + } + pl.IndexStart = lq + 1 + } return p.EnqueueP2PMessage(NewMessage(CMDGetBlockByIndex, pl)) }