network: fetch blocks in parallel
Blockcache size is 2000, while max request size is 500. Try to fetch blocks in chunks starting from current height. Lower height has priority.
This commit is contained in:
parent
9a4183abb9
commit
4aa1a37f3f
2 changed files with 92 additions and 2 deletions
|
@ -5,6 +5,7 @@ import (
|
||||||
"encoding/binary"
|
"encoding/binary"
|
||||||
"errors"
|
"errors"
|
||||||
"fmt"
|
"fmt"
|
||||||
|
mrand "math/rand"
|
||||||
"net"
|
"net"
|
||||||
"strconv"
|
"strconv"
|
||||||
"sync"
|
"sync"
|
||||||
|
@ -67,6 +68,9 @@ type (
|
||||||
lock sync.RWMutex
|
lock sync.RWMutex
|
||||||
peers map[Peer]bool
|
peers map[Peer]bool
|
||||||
|
|
||||||
|
// lastRequestedHeight contains last requested height.
|
||||||
|
lastRequestedHeight atomic.Uint32
|
||||||
|
|
||||||
register chan Peer
|
register chan Peer
|
||||||
unregister chan peerDrop
|
unregister chan peerDrop
|
||||||
quit chan struct{}
|
quit chan struct{}
|
||||||
|
@ -710,9 +714,41 @@ func (s *Server) handleGetAddrCmd(p Peer) error {
|
||||||
|
|
||||||
// requestBlocks sends a CMDGetBlockByIndex message to the peer
|
// requestBlocks sends a CMDGetBlockByIndex message to the peer
|
||||||
// to sync up in blocks. A maximum of maxBlockBatch will
|
// to sync up in blocks. A maximum of maxBlockBatch will
|
||||||
// send at once.
|
// send at once. Two things we need to take care of:
|
||||||
|
// 1. If possible, blocks should be fetched in parallel.
|
||||||
|
// height..+500 to one peer, height+500..+1000 to another etc.
|
||||||
|
// 2. Every block must eventually be fetched even if peer sends no answer.
|
||||||
|
// Thus the following algorithm is used:
|
||||||
|
// 1. Block range is divided into chunks of payload.MaxHashesCount.
|
||||||
|
// 2. Send requests for chunk in increasing order.
|
||||||
|
// 3. After all requests were sent, request random height.
|
||||||
func (s *Server) requestBlocks(p Peer) error {
|
func (s *Server) requestBlocks(p Peer) error {
|
||||||
payload := payload.NewGetBlockByIndex(s.chain.BlockHeight()+1, -1)
|
var currHeight = s.chain.BlockHeight()
|
||||||
|
var peerHeight = p.LastBlockIndex()
|
||||||
|
var needHeight uint32
|
||||||
|
// lastRequestedHeight can only be increased.
|
||||||
|
for {
|
||||||
|
old := s.lastRequestedHeight.Load()
|
||||||
|
if old <= currHeight {
|
||||||
|
needHeight = currHeight + 1
|
||||||
|
if !s.lastRequestedHeight.CAS(old, needHeight) {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
} else if old < currHeight+(blockCacheSize-payload.MaxHashesCount) {
|
||||||
|
needHeight = currHeight + 1
|
||||||
|
if peerHeight > old+payload.MaxHashesCount {
|
||||||
|
needHeight = old + payload.MaxHashesCount
|
||||||
|
if !s.lastRequestedHeight.CAS(old, needHeight) {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
index := mrand.Intn(blockCacheSize / payload.MaxHashesCount)
|
||||||
|
needHeight = currHeight + 1 + uint32(index*payload.MaxHashesCount)
|
||||||
|
}
|
||||||
|
break
|
||||||
|
}
|
||||||
|
payload := payload.NewGetBlockByIndex(needHeight, -1)
|
||||||
return p.EnqueueP2PMessage(NewMessage(CMDGetBlockByIndex, payload))
|
return p.EnqueueP2PMessage(NewMessage(CMDGetBlockByIndex, payload))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -12,6 +12,60 @@ import (
|
||||||
"github.com/stretchr/testify/require"
|
"github.com/stretchr/testify/require"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
func TestGetBlocksByIndex(t *testing.T) {
|
||||||
|
s := newTestServer(t, ServerConfig{Port: 0, UserAgent: "/test/"})
|
||||||
|
ps := make([]*localPeer, 10)
|
||||||
|
expectsCmd := make([]CommandType, 10)
|
||||||
|
expectedHeight := make([][]uint32, 10)
|
||||||
|
start := s.chain.BlockHeight()
|
||||||
|
for i := range ps {
|
||||||
|
i := i
|
||||||
|
ps[i] = newLocalPeer(t, s)
|
||||||
|
ps[i].messageHandler = func(t *testing.T, msg *Message) {
|
||||||
|
require.Equal(t, expectsCmd[i], msg.Command)
|
||||||
|
if expectsCmd[i] == CMDGetBlockByIndex {
|
||||||
|
p, ok := msg.Payload.(*payload.GetBlockByIndex)
|
||||||
|
require.True(t, ok)
|
||||||
|
require.Contains(t, expectedHeight[i], p.IndexStart)
|
||||||
|
expectsCmd[i] = CMDPong
|
||||||
|
} else if expectsCmd[i] == CMDPong {
|
||||||
|
expectsCmd[i] = CMDGetBlockByIndex
|
||||||
|
}
|
||||||
|
}
|
||||||
|
expectsCmd[i] = CMDGetBlockByIndex
|
||||||
|
expectedHeight[i] = []uint32{start + 1}
|
||||||
|
}
|
||||||
|
go s.transport.Accept()
|
||||||
|
|
||||||
|
nonce := uint32(0)
|
||||||
|
checkPingRespond := func(t *testing.T, peerIndex int, peerHeight uint32, hs ...uint32) {
|
||||||
|
nonce++
|
||||||
|
expectedHeight[peerIndex] = hs
|
||||||
|
require.NoError(t, s.handlePing(ps[peerIndex], payload.NewPing(peerHeight, nonce)))
|
||||||
|
}
|
||||||
|
|
||||||
|
// Send all requests for all chunks.
|
||||||
|
checkPingRespond(t, 0, 5000, 1)
|
||||||
|
checkPingRespond(t, 1, 5000, 1+payload.MaxHashesCount)
|
||||||
|
checkPingRespond(t, 2, 5000, 1+2*payload.MaxHashesCount)
|
||||||
|
checkPingRespond(t, 3, 5000, 1+3*payload.MaxHashesCount)
|
||||||
|
|
||||||
|
// Receive some blocks.
|
||||||
|
s.chain.(*testChain).blockheight = 2123
|
||||||
|
|
||||||
|
// Minimum chunk has priority.
|
||||||
|
checkPingRespond(t, 5, 5000, 2124)
|
||||||
|
checkPingRespond(t, 6, 5000, 2624)
|
||||||
|
// Request minimal height for peers behind.
|
||||||
|
checkPingRespond(t, 7, 3100, 2124)
|
||||||
|
checkPingRespond(t, 8, 5000, 3124)
|
||||||
|
checkPingRespond(t, 9, 5000, 3624)
|
||||||
|
// Request random height after that.
|
||||||
|
checkPingRespond(t, 1, 5000, 2124, 2624, 3124, 3624)
|
||||||
|
checkPingRespond(t, 2, 5000, 2124, 2624, 3124, 3624)
|
||||||
|
checkPingRespond(t, 3, 5000, 2124, 2624, 3124, 3624)
|
||||||
|
}
|
||||||
|
|
||||||
func TestSendVersion(t *testing.T) {
|
func TestSendVersion(t *testing.T) {
|
||||||
var (
|
var (
|
||||||
s = newTestServer(t, ServerConfig{Port: 0, UserAgent: "/test/"})
|
s = newTestServer(t, ServerConfig{Port: 0, UserAgent: "/test/"})
|
||||||
|
|
Loading…
Reference in a new issue