network: fetch blocks in parallel

Blockcache size is 2000, while max request size is 500.
Try to fetch blocks in chunks starting from current height.
Lower height has priority.
This commit is contained in:
Evgenii Stratonikov 2020-11-26 18:53:10 +03:00
parent 9a4183abb9
commit 4aa1a37f3f
2 changed files with 92 additions and 2 deletions

View file

@ -5,6 +5,7 @@ import (
"encoding/binary"
"errors"
"fmt"
mrand "math/rand"
"net"
"strconv"
"sync"
@ -67,6 +68,9 @@ type (
lock sync.RWMutex
peers map[Peer]bool
// lastRequestedHeight contains last requested height.
lastRequestedHeight atomic.Uint32
register chan Peer
unregister chan peerDrop
quit chan struct{}
@ -710,9 +714,41 @@ func (s *Server) handleGetAddrCmd(p Peer) error {
// requestBlocks sends a CMDGetBlockByIndex message to the peer
// to sync up in blocks. A maximum of maxBlockBatch will
// send at once.
// send at once. Two things we need to take care of:
// 1. If possible, blocks should be fetched in parallel.
// height..+500 to one peer, height+500..+1000 to another etc.
// 2. Every block must eventually be fetched even if peer sends no answer.
// Thus the following algorithm is used:
// 1. Block range is divided into chunks of payload.MaxHashesCount.
// 2. Send requests for chunk in increasing order.
// 3. After all requests were sent, request random height.
func (s *Server) requestBlocks(p Peer) error {
payload := payload.NewGetBlockByIndex(s.chain.BlockHeight()+1, -1)
var currHeight = s.chain.BlockHeight()
var peerHeight = p.LastBlockIndex()
var needHeight uint32
// lastRequestedHeight can only be increased.
for {
old := s.lastRequestedHeight.Load()
if old <= currHeight {
needHeight = currHeight + 1
if !s.lastRequestedHeight.CAS(old, needHeight) {
continue
}
} else if old < currHeight+(blockCacheSize-payload.MaxHashesCount) {
needHeight = currHeight + 1
if peerHeight > old+payload.MaxHashesCount {
needHeight = old + payload.MaxHashesCount
if !s.lastRequestedHeight.CAS(old, needHeight) {
continue
}
}
} else {
index := mrand.Intn(blockCacheSize / payload.MaxHashesCount)
needHeight = currHeight + 1 + uint32(index*payload.MaxHashesCount)
}
break
}
payload := payload.NewGetBlockByIndex(needHeight, -1)
return p.EnqueueP2PMessage(NewMessage(CMDGetBlockByIndex, payload))
}

View file

@ -12,6 +12,60 @@ import (
"github.com/stretchr/testify/require"
)
func TestGetBlocksByIndex(t *testing.T) {
s := newTestServer(t, ServerConfig{Port: 0, UserAgent: "/test/"})
ps := make([]*localPeer, 10)
expectsCmd := make([]CommandType, 10)
expectedHeight := make([][]uint32, 10)
start := s.chain.BlockHeight()
for i := range ps {
i := i
ps[i] = newLocalPeer(t, s)
ps[i].messageHandler = func(t *testing.T, msg *Message) {
require.Equal(t, expectsCmd[i], msg.Command)
if expectsCmd[i] == CMDGetBlockByIndex {
p, ok := msg.Payload.(*payload.GetBlockByIndex)
require.True(t, ok)
require.Contains(t, expectedHeight[i], p.IndexStart)
expectsCmd[i] = CMDPong
} else if expectsCmd[i] == CMDPong {
expectsCmd[i] = CMDGetBlockByIndex
}
}
expectsCmd[i] = CMDGetBlockByIndex
expectedHeight[i] = []uint32{start + 1}
}
go s.transport.Accept()
nonce := uint32(0)
checkPingRespond := func(t *testing.T, peerIndex int, peerHeight uint32, hs ...uint32) {
nonce++
expectedHeight[peerIndex] = hs
require.NoError(t, s.handlePing(ps[peerIndex], payload.NewPing(peerHeight, nonce)))
}
// Send all requests for all chunks.
checkPingRespond(t, 0, 5000, 1)
checkPingRespond(t, 1, 5000, 1+payload.MaxHashesCount)
checkPingRespond(t, 2, 5000, 1+2*payload.MaxHashesCount)
checkPingRespond(t, 3, 5000, 1+3*payload.MaxHashesCount)
// Receive some blocks.
s.chain.(*testChain).blockheight = 2123
// Minimum chunk has priority.
checkPingRespond(t, 5, 5000, 2124)
checkPingRespond(t, 6, 5000, 2624)
// Request minimal height for peers behind.
checkPingRespond(t, 7, 3100, 2124)
checkPingRespond(t, 8, 5000, 3124)
checkPingRespond(t, 9, 5000, 3624)
// Request random height after that.
checkPingRespond(t, 1, 5000, 2124, 2624, 3124, 3624)
checkPingRespond(t, 2, 5000, 2124, 2624, 3124, 3624)
checkPingRespond(t, 3, 5000, 2124, 2624, 3124, 3624)
}
func TestSendVersion(t *testing.T) {
var (
s = newTestServer(t, ServerConfig{Port: 0, UserAgent: "/test/"})