Merge pull request #2200 from nspcc-dev/optimize-headers

network: request headers in parallel
This commit is contained in:
Roman Khimov 2021-10-07 14:51:38 +03:00 committed by GitHub
commit 5ff475383b
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
3 changed files with 33 additions and 19 deletions

View file

@ -49,6 +49,7 @@ type FakeChain struct {
type FakeStateSync struct { type FakeStateSync struct {
IsActiveFlag uatomic.Bool IsActiveFlag uatomic.Bool
IsInitializedFlag uatomic.Bool IsInitializedFlag uatomic.Bool
RequestHeaders uatomic.Bool
InitFunc func(h uint32) error InitFunc func(h uint32) error
TraverseFunc func(root util.Uint256, process func(node mpt.Node, nodeBytes []byte) bool) error TraverseFunc func(root util.Uint256, process func(node mpt.Node, nodeBytes []byte) bool) error
AddMPTNodesFunc func(nodes [][]byte) error AddMPTNodesFunc func(nodes [][]byte) error
@ -503,7 +504,7 @@ func (s *FakeStateSync) Init(currChainHeight uint32) error {
} }
// NeedHeaders implements StateSync interface. // NeedHeaders implements StateSync interface.
func (s *FakeStateSync) NeedHeaders() bool { return false } func (s *FakeStateSync) NeedHeaders() bool { return s.RequestHeaders.Load() }
// NeedMPTNodes implements StateSync interface. // NeedMPTNodes implements StateSync interface.
func (s *FakeStateSync) NeedMPTNodes() bool { func (s *FakeStateSync) NeedMPTNodes() bool {

View file

@ -84,8 +84,10 @@ type (
lock sync.RWMutex lock sync.RWMutex
peers map[Peer]bool peers map[Peer]bool
// lastRequestedHeight contains last requested height. // lastRequestedBlock contains a height of the last requested block.
lastRequestedHeight atomic.Uint32 lastRequestedBlock atomic.Uint32
// lastRequestedHeader contains a height of the last requested header.
lastRequestedHeader atomic.Uint32
register chan Peer register chan Peer
unregister chan peerDrop unregister chan peerDrop
@ -694,11 +696,8 @@ func (s *Server) requestBlocksOrHeaders(p Peer) error {
// requestHeaders sends a CMDGetHeaders message to the peer to sync up in headers. // requestHeaders sends a CMDGetHeaders message to the peer to sync up in headers.
func (s *Server) requestHeaders(p Peer) error { func (s *Server) requestHeaders(p Peer) error {
// TODO: optimize pl := getRequestBlocksPayload(p, s.chain.HeaderHeight(), &s.lastRequestedHeader)
currHeight := s.chain.HeaderHeight() return p.EnqueueP2PMessage(NewMessage(CMDGetHeaders, pl))
needHeight := currHeight + 1
payload := payload.NewGetBlockByIndex(needHeight, -1)
return p.EnqueueP2PMessage(NewMessage(CMDGetHeaders, payload))
} }
// handlePing processes pong request. // handlePing processes pong request.
@ -1112,22 +1111,26 @@ func (s *Server) handleGetAddrCmd(p Peer) error {
// 2. Send requests for chunk in increasing order. // 2. Send requests for chunk in increasing order.
// 3. After all requests were sent, request random height. // 3. After all requests were sent, request random height.
func (s *Server) requestBlocks(bq blockchainer.Blockqueuer, p Peer) error { func (s *Server) requestBlocks(bq blockchainer.Blockqueuer, p Peer) error {
var currHeight = bq.BlockHeight() pl := getRequestBlocksPayload(p, bq.BlockHeight(), &s.lastRequestedBlock)
return p.EnqueueP2PMessage(NewMessage(CMDGetBlockByIndex, pl))
}
func getRequestBlocksPayload(p Peer, currHeight uint32, lastRequestedHeight *atomic.Uint32) *payload.GetBlockByIndex {
var peerHeight = p.LastBlockIndex() var peerHeight = p.LastBlockIndex()
var needHeight uint32 var needHeight uint32
// lastRequestedHeight can only be increased. // lastRequestedBlock can only be increased.
for { for {
old := s.lastRequestedHeight.Load() old := lastRequestedHeight.Load()
if old <= currHeight { if old <= currHeight {
needHeight = currHeight + 1 needHeight = currHeight + 1
if !s.lastRequestedHeight.CAS(old, needHeight) { if !lastRequestedHeight.CAS(old, needHeight) {
continue continue
} }
} else if old < currHeight+(blockCacheSize-payload.MaxHashesCount) { } else if old < currHeight+(blockCacheSize-payload.MaxHashesCount) {
needHeight = currHeight + 1 needHeight = currHeight + 1
if peerHeight > old+payload.MaxHashesCount { if peerHeight > old+payload.MaxHashesCount {
needHeight = old + payload.MaxHashesCount needHeight = old + payload.MaxHashesCount
if !s.lastRequestedHeight.CAS(old, needHeight) { if !lastRequestedHeight.CAS(old, needHeight) {
continue continue
} }
} }
@ -1137,8 +1140,7 @@ func (s *Server) requestBlocks(bq blockchainer.Blockqueuer, p Peer) error {
} }
break break
} }
payload := payload.NewGetBlockByIndex(needHeight, -1) return payload.NewGetBlockByIndex(needHeight, -1)
return p.EnqueueP2PMessage(NewMessage(CMDGetBlockByIndex, payload))
} }
// handleMessage processes the given message. // handleMessage processes the given message.

View file

@ -186,26 +186,34 @@ func TestServerRegisterPeer(t *testing.T) {
} }
func TestGetBlocksByIndex(t *testing.T) { func TestGetBlocksByIndex(t *testing.T) {
testGetBlocksByIndex(t, CMDGetBlockByIndex)
}
func testGetBlocksByIndex(t *testing.T, cmd CommandType) {
s := newTestServer(t, ServerConfig{Port: 0, UserAgent: "/test/"}) s := newTestServer(t, ServerConfig{Port: 0, UserAgent: "/test/"})
start := s.chain.BlockHeight()
if cmd == CMDGetHeaders {
start = s.chain.HeaderHeight()
s.stateSync.(*fakechain.FakeStateSync).RequestHeaders.Store(true)
}
ps := make([]*localPeer, 10) ps := make([]*localPeer, 10)
expectsCmd := make([]CommandType, 10) expectsCmd := make([]CommandType, 10)
expectedHeight := make([][]uint32, 10) expectedHeight := make([][]uint32, 10)
start := s.chain.BlockHeight()
for i := range ps { for i := range ps {
i := i i := i
ps[i] = newLocalPeer(t, s) ps[i] = newLocalPeer(t, s)
ps[i].messageHandler = func(t *testing.T, msg *Message) { ps[i].messageHandler = func(t *testing.T, msg *Message) {
require.Equal(t, expectsCmd[i], msg.Command) require.Equal(t, expectsCmd[i], msg.Command)
if expectsCmd[i] == CMDGetBlockByIndex { if expectsCmd[i] == cmd {
p, ok := msg.Payload.(*payload.GetBlockByIndex) p, ok := msg.Payload.(*payload.GetBlockByIndex)
require.True(t, ok) require.True(t, ok)
require.Contains(t, expectedHeight[i], p.IndexStart) require.Contains(t, expectedHeight[i], p.IndexStart)
expectsCmd[i] = CMDPong expectsCmd[i] = CMDPong
} else if expectsCmd[i] == CMDPong { } else if expectsCmd[i] == CMDPong {
expectsCmd[i] = CMDGetBlockByIndex expectsCmd[i] = cmd
} }
} }
expectsCmd[i] = CMDGetBlockByIndex expectsCmd[i] = cmd
expectedHeight[i] = []uint32{start + 1} expectedHeight[i] = []uint32{start + 1}
} }
go s.transport.Accept() go s.transport.Accept()
@ -678,6 +686,9 @@ func TestGetHeaders(t *testing.T) {
s.testHandleMessage(t, p, CMDGetHeaders, &payload.GetBlockByIndex{IndexStart: 123, Count: -1}) s.testHandleMessage(t, p, CMDGetHeaders, &payload.GetBlockByIndex{IndexStart: 123, Count: -1})
require.Nil(t, actual) require.Nil(t, actual)
}) })
t.Run("distribute requests between peers", func(t *testing.T) {
testGetBlocksByIndex(t, CMDGetHeaders)
})
} }
func TestInv(t *testing.T) { func TestInv(t *testing.T) {