network: don't request blocks we already have in the queue

Fixes #2258.
This commit is contained in:
Roman Khimov 2022-01-18 00:04:41 +03:00
parent 03fd91e857
commit 89d754da6f
3 changed files with 27 additions and 2 deletions

View file

@ -17,8 +17,9 @@ type Blockqueuer interface {
type blockQueue struct { type blockQueue struct {
log *zap.Logger log *zap.Logger
queueLock sync.Mutex queueLock sync.RWMutex
queue []*block.Block queue []*block.Block
lastQ uint32
checkBlocks chan struct{} checkBlocks chan struct{}
chain Blockqueuer chain Blockqueuer
relayF func(*block.Block) relayF func(*block.Block)
@ -115,6 +116,10 @@ func (bq *blockQueue) putBlock(block *block.Block) error {
if bq.queue[pos] == nil || bq.queue[pos].Index < block.Index { if bq.queue[pos] == nil || bq.queue[pos].Index < block.Index {
bq.len++ bq.len++
bq.queue[pos] = block bq.queue[pos] = block
for pos < blockCacheSize && bq.queue[pos] != nil && bq.lastQ+1 == bq.queue[pos].Index {
bq.lastQ = bq.queue[pos].Index
pos++
}
} }
l := bq.len l := bq.len
bq.queueLock.Unlock() bq.queueLock.Unlock()
@ -129,6 +134,12 @@ func (bq *blockQueue) putBlock(block *block.Block) error {
return nil return nil
} }
func (bq *blockQueue) lastQueued() uint32 {
bq.queueLock.RLock()
defer bq.queueLock.RUnlock()
return bq.lastQ
}
func (bq *blockQueue) discard() { func (bq *blockQueue) discard() {
if bq.discarded.CAS(false, true) { if bq.discarded.CAS(false, true) {
close(bq.checkBlocks) close(bq.checkBlocks)

View file

@ -22,6 +22,7 @@ func TestBlockQueue(t *testing.T) {
for i := 3; i < 5; i++ { for i := 3; i < 5; i++ {
assert.NoError(t, bq.putBlock(blocks[i])) assert.NoError(t, bq.putBlock(blocks[i]))
} }
assert.Equal(t, uint32(0), bq.lastQueued())
// nothing should be put into the blockchain // nothing should be put into the blockchain
assert.Equal(t, uint32(0), chain.BlockHeight()) assert.Equal(t, uint32(0), chain.BlockHeight())
assert.Equal(t, 2, bq.length()) assert.Equal(t, 2, bq.length())
@ -30,6 +31,7 @@ func TestBlockQueue(t *testing.T) {
assert.NoError(t, bq.putBlock(blocks[i])) assert.NoError(t, bq.putBlock(blocks[i]))
} }
// but they're still not put into the blockchain, because bq isn't running // but they're still not put into the blockchain, because bq isn't running
assert.Equal(t, uint32(4), bq.lastQueued())
assert.Equal(t, uint32(0), chain.BlockHeight()) assert.Equal(t, uint32(0), chain.BlockHeight())
assert.Equal(t, 4, bq.length()) assert.Equal(t, 4, bq.length())
// block with too big index is dropped // block with too big index is dropped
@ -38,12 +40,14 @@ func TestBlockQueue(t *testing.T) {
go bq.run() go bq.run()
// run() is asynchronous, so we need some kind of timeout anyway and this is the simplest one // run() is asynchronous, so we need some kind of timeout anyway and this is the simplest one
assert.Eventually(t, func() bool { return chain.BlockHeight() == 4 }, 4*time.Second, 100*time.Millisecond) assert.Eventually(t, func() bool { return chain.BlockHeight() == 4 }, 4*time.Second, 100*time.Millisecond)
assert.Equal(t, uint32(4), bq.lastQueued())
assert.Equal(t, 0, bq.length()) assert.Equal(t, 0, bq.length())
assert.Equal(t, uint32(4), chain.BlockHeight()) assert.Equal(t, uint32(4), chain.BlockHeight())
// put some old blocks // put some old blocks
for i := 1; i < 5; i++ { for i := 1; i < 5; i++ {
assert.NoError(t, bq.putBlock(blocks[i])) assert.NoError(t, bq.putBlock(blocks[i]))
} }
assert.Equal(t, uint32(4), bq.lastQueued())
assert.Equal(t, 0, bq.length()) assert.Equal(t, 0, bq.length())
assert.Equal(t, uint32(4), chain.BlockHeight()) assert.Equal(t, uint32(4), chain.BlockHeight())
// unexpected blocks with run() active // unexpected blocks with run() active
@ -61,6 +65,7 @@ func TestBlockQueue(t *testing.T) {
assert.NoError(t, bq.putBlock(blocks[5])) assert.NoError(t, bq.putBlock(blocks[5]))
// run() is asynchronous, so we need some kind of timeout anyway and this is the simplest one // run() is asynchronous, so we need some kind of timeout anyway and this is the simplest one
assert.Eventually(t, func() bool { return chain.BlockHeight() == 8 }, 4*time.Second, 100*time.Millisecond) assert.Eventually(t, func() bool { return chain.BlockHeight() == 8 }, 4*time.Second, 100*time.Millisecond)
assert.Equal(t, uint32(8), bq.lastQueued())
assert.Equal(t, 1, bq.length()) assert.Equal(t, 1, bq.length())
assert.Equal(t, uint32(8), chain.BlockHeight()) assert.Equal(t, uint32(8), chain.BlockHeight())
bq.discard() bq.discard()

View file

@ -1070,7 +1070,16 @@ func (s *Server) handleGetAddrCmd(p Peer) error {
// 2. Send requests for chunk in increasing order. // 2. Send requests for chunk in increasing order.
// 3. After all requests were sent, request random height. // 3. After all requests were sent, request random height.
func (s *Server) requestBlocks(bq Blockqueuer, p Peer) error { func (s *Server) requestBlocks(bq Blockqueuer, p Peer) error {
pl := getRequestBlocksPayload(p, bq.BlockHeight(), &s.lastRequestedBlock) h := bq.BlockHeight()
pl := getRequestBlocksPayload(p, h, &s.lastRequestedBlock)
lq := s.bQueue.lastQueued()
if lq > pl.IndexStart {
c := int16(h + blockCacheSize - lq)
if c < payload.MaxHashesCount {
pl.Count = c
}
pl.IndexStart = lq + 1
}
return p.EnqueueP2PMessage(NewMessage(CMDGetBlockByIndex, pl)) return p.EnqueueP2PMessage(NewMessage(CMDGetBlockByIndex, pl))
} }