From 4aa1a37f3f81693f559bb640cd2b6026cf385988 Mon Sep 17 00:00:00 2001 From: Evgenii Stratonikov Date: Thu, 26 Nov 2020 18:53:10 +0300 Subject: [PATCH] network: fetch blocks in parallel Blockcache size is 2000, while max request size is 500. Try to fetch blocks in chunks starting from current height. Lower height has priority. --- pkg/network/server.go | 40 ++++++++++++++++++++++++++-- pkg/network/server_test.go | 54 ++++++++++++++++++++++++++++++++++++++ 2 files changed, 92 insertions(+), 2 deletions(-) diff --git a/pkg/network/server.go b/pkg/network/server.go index ed184e86f..78d6bc559 100644 --- a/pkg/network/server.go +++ b/pkg/network/server.go @@ -5,6 +5,7 @@ import ( "encoding/binary" "errors" "fmt" + mrand "math/rand" "net" "strconv" "sync" @@ -67,6 +68,9 @@ type ( lock sync.RWMutex peers map[Peer]bool + // lastRequestedHeight contains last requested height. + lastRequestedHeight atomic.Uint32 + register chan Peer unregister chan peerDrop quit chan struct{} @@ -710,9 +714,41 @@ func (s *Server) handleGetAddrCmd(p Peer) error { // requestBlocks sends a CMDGetBlockByIndex message to the peer // to sync up in blocks. A maximum of maxBlockBatch will -// send at once. +// send at once. Two things we need to take care of: +// 1. If possible, blocks should be fetched in parallel. +// height..+500 to one peer, height+500..+1000 to another etc. +// 2. Every block must eventually be fetched even if peer sends no answer. +// Thus the following algorithm is used: +// 1. Block range is divided into chunks of payload.MaxHashesCount. +// 2. Send requests for chunk in increasing order. +// 3. After all requests were sent, request random height. func (s *Server) requestBlocks(p Peer) error { - payload := payload.NewGetBlockByIndex(s.chain.BlockHeight()+1, -1) + var currHeight = s.chain.BlockHeight() + var peerHeight = p.LastBlockIndex() + var needHeight uint32 + // lastRequestedHeight can only be increased. + for { + old := s.lastRequestedHeight.Load() + if old <= currHeight { + needHeight = currHeight + 1 + if !s.lastRequestedHeight.CAS(old, needHeight) { + continue + } + } else if old < currHeight+(blockCacheSize-payload.MaxHashesCount) { + needHeight = currHeight + 1 + if peerHeight > old+payload.MaxHashesCount { + needHeight = old + payload.MaxHashesCount + if !s.lastRequestedHeight.CAS(old, needHeight) { + continue + } + } + } else { + index := mrand.Intn(blockCacheSize / payload.MaxHashesCount) + needHeight = currHeight + 1 + uint32(index*payload.MaxHashesCount) + } + break + } + payload := payload.NewGetBlockByIndex(needHeight, -1) return p.EnqueueP2PMessage(NewMessage(CMDGetBlockByIndex, payload)) } diff --git a/pkg/network/server_test.go b/pkg/network/server_test.go index 30f18942d..4f2e56c33 100644 --- a/pkg/network/server_test.go +++ b/pkg/network/server_test.go @@ -12,6 +12,60 @@ import ( "github.com/stretchr/testify/require" ) +func TestGetBlocksByIndex(t *testing.T) { + s := newTestServer(t, ServerConfig{Port: 0, UserAgent: "/test/"}) + ps := make([]*localPeer, 10) + expectsCmd := make([]CommandType, 10) + expectedHeight := make([][]uint32, 10) + start := s.chain.BlockHeight() + for i := range ps { + i := i + ps[i] = newLocalPeer(t, s) + ps[i].messageHandler = func(t *testing.T, msg *Message) { + require.Equal(t, expectsCmd[i], msg.Command) + if expectsCmd[i] == CMDGetBlockByIndex { + p, ok := msg.Payload.(*payload.GetBlockByIndex) + require.True(t, ok) + require.Contains(t, expectedHeight[i], p.IndexStart) + expectsCmd[i] = CMDPong + } else if expectsCmd[i] == CMDPong { + expectsCmd[i] = CMDGetBlockByIndex + } + } + expectsCmd[i] = CMDGetBlockByIndex + expectedHeight[i] = []uint32{start + 1} + } + go s.transport.Accept() + + nonce := uint32(0) + checkPingRespond := func(t *testing.T, peerIndex int, peerHeight uint32, hs ...uint32) { + nonce++ + expectedHeight[peerIndex] = hs + require.NoError(t, s.handlePing(ps[peerIndex], payload.NewPing(peerHeight, nonce))) + } + + // Send all requests for all chunks. + checkPingRespond(t, 0, 5000, 1) + checkPingRespond(t, 1, 5000, 1+payload.MaxHashesCount) + checkPingRespond(t, 2, 5000, 1+2*payload.MaxHashesCount) + checkPingRespond(t, 3, 5000, 1+3*payload.MaxHashesCount) + + // Receive some blocks. + s.chain.(*testChain).blockheight = 2123 + + // Minimum chunk has priority. + checkPingRespond(t, 5, 5000, 2124) + checkPingRespond(t, 6, 5000, 2624) + // Request minimal height for peers behind. + checkPingRespond(t, 7, 3100, 2124) + checkPingRespond(t, 8, 5000, 3124) + checkPingRespond(t, 9, 5000, 3624) + // Request random height after that. + checkPingRespond(t, 1, 5000, 2124, 2624, 3124, 3624) + checkPingRespond(t, 2, 5000, 2124, 2624, 3124, 3624) + checkPingRespond(t, 3, 5000, 2124, 2624, 3124, 3624) +} + func TestSendVersion(t *testing.T) { var ( s = newTestServer(t, ServerConfig{Port: 0, UserAgent: "/test/"})