Merge pull request #1568 from nspcc-dev/blockfetch

network: fetch blocks in parallel
This commit is contained in:
Roman Khimov 2020-12-02 10:59:43 +03:00 committed by GitHub
commit 0c1d1e061d
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
2 changed files with 92 additions and 2 deletions

View file

@ -5,6 +5,7 @@ import (
"encoding/binary" "encoding/binary"
"errors" "errors"
"fmt" "fmt"
mrand "math/rand"
"net" "net"
"strconv" "strconv"
"sync" "sync"
@ -67,6 +68,9 @@ type (
lock sync.RWMutex lock sync.RWMutex
peers map[Peer]bool peers map[Peer]bool
// lastRequestedHeight contains last requested height.
lastRequestedHeight atomic.Uint32
register chan Peer register chan Peer
unregister chan peerDrop unregister chan peerDrop
quit chan struct{} quit chan struct{}
@ -710,9 +714,41 @@ func (s *Server) handleGetAddrCmd(p Peer) error {
// requestBlocks sends a CMDGetBlockByIndex message to the peer // requestBlocks sends a CMDGetBlockByIndex message to the peer
// to sync up in blocks. A maximum of maxBlockBatch will // to sync up in blocks. A maximum of maxBlockBatch will
// send at once. // send at once. Two things we need to take care of:
// 1. If possible, blocks should be fetched in parallel.
// height..+500 to one peer, height+500..+1000 to another etc.
// 2. Every block must eventually be fetched even if peer sends no answer.
// Thus the following algorithm is used:
// 1. Block range is divided into chunks of payload.MaxHashesCount.
// 2. Send requests for chunk in increasing order.
// 3. After all requests were sent, request random height.
func (s *Server) requestBlocks(p Peer) error { func (s *Server) requestBlocks(p Peer) error {
payload := payload.NewGetBlockByIndex(s.chain.BlockHeight()+1, -1) var currHeight = s.chain.BlockHeight()
var peerHeight = p.LastBlockIndex()
var needHeight uint32
// lastRequestedHeight can only be increased.
for {
old := s.lastRequestedHeight.Load()
if old <= currHeight {
needHeight = currHeight + 1
if !s.lastRequestedHeight.CAS(old, needHeight) {
continue
}
} else if old < currHeight+(blockCacheSize-payload.MaxHashesCount) {
needHeight = currHeight + 1
if peerHeight > old+payload.MaxHashesCount {
needHeight = old + payload.MaxHashesCount
if !s.lastRequestedHeight.CAS(old, needHeight) {
continue
}
}
} else {
index := mrand.Intn(blockCacheSize / payload.MaxHashesCount)
needHeight = currHeight + 1 + uint32(index*payload.MaxHashesCount)
}
break
}
payload := payload.NewGetBlockByIndex(needHeight, -1)
return p.EnqueueP2PMessage(NewMessage(CMDGetBlockByIndex, payload)) return p.EnqueueP2PMessage(NewMessage(CMDGetBlockByIndex, payload))
} }

View file

@ -12,6 +12,60 @@ import (
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
) )
func TestGetBlocksByIndex(t *testing.T) {
s := newTestServer(t, ServerConfig{Port: 0, UserAgent: "/test/"})
ps := make([]*localPeer, 10)
expectsCmd := make([]CommandType, 10)
expectedHeight := make([][]uint32, 10)
start := s.chain.BlockHeight()
for i := range ps {
i := i
ps[i] = newLocalPeer(t, s)
ps[i].messageHandler = func(t *testing.T, msg *Message) {
require.Equal(t, expectsCmd[i], msg.Command)
if expectsCmd[i] == CMDGetBlockByIndex {
p, ok := msg.Payload.(*payload.GetBlockByIndex)
require.True(t, ok)
require.Contains(t, expectedHeight[i], p.IndexStart)
expectsCmd[i] = CMDPong
} else if expectsCmd[i] == CMDPong {
expectsCmd[i] = CMDGetBlockByIndex
}
}
expectsCmd[i] = CMDGetBlockByIndex
expectedHeight[i] = []uint32{start + 1}
}
go s.transport.Accept()
nonce := uint32(0)
checkPingRespond := func(t *testing.T, peerIndex int, peerHeight uint32, hs ...uint32) {
nonce++
expectedHeight[peerIndex] = hs
require.NoError(t, s.handlePing(ps[peerIndex], payload.NewPing(peerHeight, nonce)))
}
// Send all requests for all chunks.
checkPingRespond(t, 0, 5000, 1)
checkPingRespond(t, 1, 5000, 1+payload.MaxHashesCount)
checkPingRespond(t, 2, 5000, 1+2*payload.MaxHashesCount)
checkPingRespond(t, 3, 5000, 1+3*payload.MaxHashesCount)
// Receive some blocks.
s.chain.(*testChain).blockheight = 2123
// Minimum chunk has priority.
checkPingRespond(t, 5, 5000, 2124)
checkPingRespond(t, 6, 5000, 2624)
// Request minimal height for peers behind.
checkPingRespond(t, 7, 3100, 2124)
checkPingRespond(t, 8, 5000, 3124)
checkPingRespond(t, 9, 5000, 3624)
// Request random height after that.
checkPingRespond(t, 1, 5000, 2124, 2624, 3124, 3624)
checkPingRespond(t, 2, 5000, 2124, 2624, 3124, 3624)
checkPingRespond(t, 3, 5000, 2124, 2624, 3124, 3624)
}
func TestSendVersion(t *testing.T) { func TestSendVersion(t *testing.T) {
var ( var (
s = newTestServer(t, ServerConfig{Port: 0, UserAgent: "/test/"}) s = newTestServer(t, ServerConfig{Port: 0, UserAgent: "/test/"})