forked from TrueCloudLab/neoneo-go
Merge pull request #2329 from nspcc-dev/dont-request-queued-blocks
Don't request queued blocks again
This commit is contained in:
commit
01e9af6a30
3 changed files with 36 additions and 15 deletions
|
@ -17,8 +17,9 @@ type Blockqueuer interface {
|
||||||
|
|
||||||
type blockQueue struct {
|
type blockQueue struct {
|
||||||
log *zap.Logger
|
log *zap.Logger
|
||||||
queueLock sync.Mutex
|
queueLock sync.RWMutex
|
||||||
queue []*block.Block
|
queue []*block.Block
|
||||||
|
lastQ uint32
|
||||||
checkBlocks chan struct{}
|
checkBlocks chan struct{}
|
||||||
chain Blockqueuer
|
chain Blockqueuer
|
||||||
relayF func(*block.Block)
|
relayF func(*block.Block)
|
||||||
|
@ -32,6 +33,10 @@ const (
|
||||||
blockCacheSize = 2000
|
blockCacheSize = 2000
|
||||||
)
|
)
|
||||||
|
|
||||||
|
func indexToPosition(i uint32) int {
|
||||||
|
return int(i) % blockCacheSize
|
||||||
|
}
|
||||||
|
|
||||||
func newBlockQueue(capacity int, bc Blockqueuer, log *zap.Logger, relayer func(*block.Block)) *blockQueue {
|
func newBlockQueue(capacity int, bc Blockqueuer, log *zap.Logger, relayer func(*block.Block)) *blockQueue {
|
||||||
if log == nil {
|
if log == nil {
|
||||||
return nil
|
return nil
|
||||||
|
@ -56,12 +61,12 @@ func (bq *blockQueue) run() {
|
||||||
}
|
}
|
||||||
for {
|
for {
|
||||||
h := bq.chain.BlockHeight()
|
h := bq.chain.BlockHeight()
|
||||||
pos := int(h+1) % blockCacheSize
|
pos := indexToPosition(h + 1)
|
||||||
bq.queueLock.Lock()
|
bq.queueLock.Lock()
|
||||||
b := bq.queue[pos]
|
b := bq.queue[pos]
|
||||||
// The chain moved forward using blocks from other sources (consensus).
|
// The chain moved forward using blocks from other sources (consensus).
|
||||||
for i := lastHeight; i < h; i++ {
|
for i := lastHeight; i < h; i++ {
|
||||||
old := int(i+1) % blockCacheSize
|
old := indexToPosition(i + 1)
|
||||||
if bq.queue[old] != nil && bq.queue[old].Index == i {
|
if bq.queue[old] != nil && bq.queue[old].Index == i {
|
||||||
bq.len--
|
bq.len--
|
||||||
bq.queue[old] = nil
|
bq.queue[old] = nil
|
||||||
|
@ -106,11 +111,15 @@ func (bq *blockQueue) putBlock(block *block.Block) error {
|
||||||
bq.queueLock.Unlock()
|
bq.queueLock.Unlock()
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
pos := block.Index % blockCacheSize
|
pos := indexToPosition(block.Index)
|
||||||
// If we already have it, keep the old block, throw away new one.
|
// If we already have it, keep the old block, throw away new one.
|
||||||
if bq.queue[pos] == nil || bq.queue[pos].Index < block.Index {
|
if bq.queue[pos] == nil || bq.queue[pos].Index < block.Index {
|
||||||
bq.len++
|
bq.len++
|
||||||
bq.queue[pos] = block
|
bq.queue[pos] = block
|
||||||
|
for pos < blockCacheSize && bq.queue[pos] != nil && bq.lastQ+1 == bq.queue[pos].Index {
|
||||||
|
bq.lastQ = bq.queue[pos].Index
|
||||||
|
pos++
|
||||||
|
}
|
||||||
}
|
}
|
||||||
l := bq.len
|
l := bq.len
|
||||||
bq.queueLock.Unlock()
|
bq.queueLock.Unlock()
|
||||||
|
@ -125,6 +134,12 @@ func (bq *blockQueue) putBlock(block *block.Block) error {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (bq *blockQueue) lastQueued() uint32 {
|
||||||
|
bq.queueLock.RLock()
|
||||||
|
defer bq.queueLock.RUnlock()
|
||||||
|
return bq.lastQ
|
||||||
|
}
|
||||||
|
|
||||||
func (bq *blockQueue) discard() {
|
func (bq *blockQueue) discard() {
|
||||||
if bq.discarded.CAS(false, true) {
|
if bq.discarded.CAS(false, true) {
|
||||||
close(bq.checkBlocks)
|
close(bq.checkBlocks)
|
||||||
|
|
|
@ -22,6 +22,7 @@ func TestBlockQueue(t *testing.T) {
|
||||||
for i := 3; i < 5; i++ {
|
for i := 3; i < 5; i++ {
|
||||||
assert.NoError(t, bq.putBlock(blocks[i]))
|
assert.NoError(t, bq.putBlock(blocks[i]))
|
||||||
}
|
}
|
||||||
|
assert.Equal(t, uint32(0), bq.lastQueued())
|
||||||
// nothing should be put into the blockchain
|
// nothing should be put into the blockchain
|
||||||
assert.Equal(t, uint32(0), chain.BlockHeight())
|
assert.Equal(t, uint32(0), chain.BlockHeight())
|
||||||
assert.Equal(t, 2, bq.length())
|
assert.Equal(t, 2, bq.length())
|
||||||
|
@ -30,6 +31,7 @@ func TestBlockQueue(t *testing.T) {
|
||||||
assert.NoError(t, bq.putBlock(blocks[i]))
|
assert.NoError(t, bq.putBlock(blocks[i]))
|
||||||
}
|
}
|
||||||
// but they're still not put into the blockchain, because bq isn't running
|
// but they're still not put into the blockchain, because bq isn't running
|
||||||
|
assert.Equal(t, uint32(4), bq.lastQueued())
|
||||||
assert.Equal(t, uint32(0), chain.BlockHeight())
|
assert.Equal(t, uint32(0), chain.BlockHeight())
|
||||||
assert.Equal(t, 4, bq.length())
|
assert.Equal(t, 4, bq.length())
|
||||||
// block with too big index is dropped
|
// block with too big index is dropped
|
||||||
|
@ -37,17 +39,15 @@ func TestBlockQueue(t *testing.T) {
|
||||||
assert.Equal(t, 4, bq.length())
|
assert.Equal(t, 4, bq.length())
|
||||||
go bq.run()
|
go bq.run()
|
||||||
// run() is asynchronous, so we need some kind of timeout anyway and this is the simplest one
|
// run() is asynchronous, so we need some kind of timeout anyway and this is the simplest one
|
||||||
for i := 0; i < 5; i++ {
|
assert.Eventually(t, func() bool { return chain.BlockHeight() == 4 }, 4*time.Second, 100*time.Millisecond)
|
||||||
if chain.BlockHeight() != 4 {
|
assert.Equal(t, uint32(4), bq.lastQueued())
|
||||||
time.Sleep(time.Second)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
assert.Equal(t, 0, bq.length())
|
assert.Equal(t, 0, bq.length())
|
||||||
assert.Equal(t, uint32(4), chain.BlockHeight())
|
assert.Equal(t, uint32(4), chain.BlockHeight())
|
||||||
// put some old blocks
|
// put some old blocks
|
||||||
for i := 1; i < 5; i++ {
|
for i := 1; i < 5; i++ {
|
||||||
assert.NoError(t, bq.putBlock(blocks[i]))
|
assert.NoError(t, bq.putBlock(blocks[i]))
|
||||||
}
|
}
|
||||||
|
assert.Equal(t, uint32(4), bq.lastQueued())
|
||||||
assert.Equal(t, 0, bq.length())
|
assert.Equal(t, 0, bq.length())
|
||||||
assert.Equal(t, uint32(4), chain.BlockHeight())
|
assert.Equal(t, uint32(4), chain.BlockHeight())
|
||||||
// unexpected blocks with run() active
|
// unexpected blocks with run() active
|
||||||
|
@ -64,11 +64,8 @@ func TestBlockQueue(t *testing.T) {
|
||||||
assert.NoError(t, bq.putBlock(blocks[6]))
|
assert.NoError(t, bq.putBlock(blocks[6]))
|
||||||
assert.NoError(t, bq.putBlock(blocks[5]))
|
assert.NoError(t, bq.putBlock(blocks[5]))
|
||||||
// run() is asynchronous, so we need some kind of timeout anyway and this is the simplest one
|
// run() is asynchronous, so we need some kind of timeout anyway and this is the simplest one
|
||||||
for i := 0; i < 5; i++ {
|
assert.Eventually(t, func() bool { return chain.BlockHeight() == 8 }, 4*time.Second, 100*time.Millisecond)
|
||||||
if chain.BlockHeight() != 8 {
|
assert.Equal(t, uint32(8), bq.lastQueued())
|
||||||
time.Sleep(time.Second)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
assert.Equal(t, 1, bq.length())
|
assert.Equal(t, 1, bq.length())
|
||||||
assert.Equal(t, uint32(8), chain.BlockHeight())
|
assert.Equal(t, uint32(8), chain.BlockHeight())
|
||||||
bq.discard()
|
bq.discard()
|
||||||
|
|
|
@ -1070,7 +1070,16 @@ func (s *Server) handleGetAddrCmd(p Peer) error {
|
||||||
// 2. Send requests for chunk in increasing order.
|
// 2. Send requests for chunk in increasing order.
|
||||||
// 3. After all requests were sent, request random height.
|
// 3. After all requests were sent, request random height.
|
||||||
func (s *Server) requestBlocks(bq Blockqueuer, p Peer) error {
|
func (s *Server) requestBlocks(bq Blockqueuer, p Peer) error {
|
||||||
pl := getRequestBlocksPayload(p, bq.BlockHeight(), &s.lastRequestedBlock)
|
h := bq.BlockHeight()
|
||||||
|
pl := getRequestBlocksPayload(p, h, &s.lastRequestedBlock)
|
||||||
|
lq := s.bQueue.lastQueued()
|
||||||
|
if lq > pl.IndexStart {
|
||||||
|
c := int16(h + blockCacheSize - lq)
|
||||||
|
if c < payload.MaxHashesCount {
|
||||||
|
pl.Count = c
|
||||||
|
}
|
||||||
|
pl.IndexStart = lq + 1
|
||||||
|
}
|
||||||
return p.EnqueueP2PMessage(NewMessage(CMDGetBlockByIndex, pl))
|
return p.EnqueueP2PMessage(NewMessage(CMDGetBlockByIndex, pl))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Add table
Reference in a new issue