Merge pull request #1246 from nspcc-dev/protocol/getblockbyindex

protocol: request blocks by index
This commit is contained in:
Roman Khimov 2020-08-05 23:21:07 +03:00 committed by GitHub
commit 57ee8b80e5
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
9 changed files with 102 additions and 145 deletions

View file

@ -203,6 +203,8 @@ func (p *Policy) OnPersistEnd(dao dao.DAO) {
maxBlockSystemFee := p.getInt64WithKey(dao, maxBlockSystemFeeKey) maxBlockSystemFee := p.getInt64WithKey(dao, maxBlockSystemFeeKey)
p.maxBlockSystemFee = maxBlockSystemFee p.maxBlockSystemFee = maxBlockSystemFee
p.maxVerificationGas = defaultMaxVerificationGas
p.isValid = true p.isValid = true
} }
@ -256,7 +258,10 @@ func (p *Policy) GetFeePerByteInternal(dao dao.DAO) int64 {
// GetMaxVerificationGas returns maximum gas allowed to be burned during verificaion. // GetMaxVerificationGas returns maximum gas allowed to be burned during verificaion.
func (p *Policy) GetMaxVerificationGas(_ dao.DAO) int64 { func (p *Policy) GetMaxVerificationGas(_ dao.DAO) int64 {
return p.maxVerificationGas if p.isValid {
return p.maxVerificationGas
}
return defaultMaxVerificationGas
} }
// getMaxBlockSystemFee is Policy contract method and returns the maximum overall // getMaxBlockSystemFee is Policy contract method and returns the maximum overall

View file

@ -61,18 +61,18 @@ const (
CMDPong CommandType = 0x19 CMDPong CommandType = 0x19
// synchronization // synchronization
CMDGetHeaders CommandType = 0x20 CMDGetHeaders CommandType = 0x20
CMDHeaders CommandType = 0x21 CMDHeaders CommandType = 0x21
CMDGetBlocks CommandType = 0x24 CMDGetBlocks CommandType = 0x24
CMDMempool CommandType = 0x25 CMDMempool CommandType = 0x25
CMDInv CommandType = 0x27 CMDInv CommandType = 0x27
CMDGetData CommandType = 0x28 CMDGetData CommandType = 0x28
CMDGetBlockData CommandType = 0x29 CMDGetBlockByIndex CommandType = 0x29
CMDNotFound CommandType = 0x2a CMDNotFound CommandType = 0x2a
CMDTX = CommandType(payload.TXType) CMDTX = CommandType(payload.TXType)
CMDBlock = CommandType(payload.BlockType) CMDBlock = CommandType(payload.BlockType)
CMDConsensus = CommandType(payload.ConsensusType) CMDConsensus = CommandType(payload.ConsensusType)
CMDReject CommandType = 0x2f CMDReject CommandType = 0x2f
// SPV protocol // SPV protocol
CMDFilterLoad CommandType = 0x30 CMDFilterLoad CommandType = 0x30
@ -146,11 +146,11 @@ func (m *Message) decodePayload() error {
case CMDConsensus: case CMDConsensus:
p = consensus.NewPayload(m.Network) p = consensus.NewPayload(m.Network)
case CMDGetBlocks: case CMDGetBlocks:
fallthrough
case CMDGetHeaders:
p = &payload.GetBlocks{} p = &payload.GetBlocks{}
case CMDGetBlockData: case CMDGetHeaders:
p = &payload.GetBlockData{} fallthrough
case CMDGetBlockByIndex:
p = &payload.GetBlockByIndex{}
case CMDHeaders: case CMDHeaders:
p = &payload.Headers{Network: m.Network} p = &payload.Headers{Network: m.Network}
case CMDTX: case CMDTX:

View file

@ -20,7 +20,7 @@ func _() {
_ = x[CMDMempool-37] _ = x[CMDMempool-37]
_ = x[CMDInv-39] _ = x[CMDInv-39]
_ = x[CMDGetData-40] _ = x[CMDGetData-40]
_ = x[CMDGetBlockData-41] _ = x[CMDGetBlockByIndex-41]
_ = x[CMDNotFound-42] _ = x[CMDNotFound-42]
_ = x[CMDTX-43] _ = x[CMDTX-43]
_ = x[CMDBlock-44] _ = x[CMDBlock-44]
@ -39,7 +39,7 @@ const (
_CommandType_name_2 = "CMDPingCMDPong" _CommandType_name_2 = "CMDPingCMDPong"
_CommandType_name_3 = "CMDGetHeadersCMDHeaders" _CommandType_name_3 = "CMDGetHeadersCMDHeaders"
_CommandType_name_4 = "CMDGetBlocksCMDMempool" _CommandType_name_4 = "CMDGetBlocksCMDMempool"
_CommandType_name_5 = "CMDInvCMDGetDataCMDGetBlockDataCMDNotFoundCMDTXCMDBlockCMDConsensus" _CommandType_name_5 = "CMDInvCMDGetDataCMDGetBlockByIndexCMDNotFoundCMDTXCMDBlockCMDConsensus"
_CommandType_name_6 = "CMDRejectCMDFilterLoadCMDFilterAddCMDFilterClear" _CommandType_name_6 = "CMDRejectCMDFilterLoadCMDFilterAddCMDFilterClear"
_CommandType_name_7 = "CMDMerkleBlock" _CommandType_name_7 = "CMDMerkleBlock"
_CommandType_name_8 = "CMDAlert" _CommandType_name_8 = "CMDAlert"
@ -51,7 +51,7 @@ var (
_CommandType_index_2 = [...]uint8{0, 7, 14} _CommandType_index_2 = [...]uint8{0, 7, 14}
_CommandType_index_3 = [...]uint8{0, 13, 23} _CommandType_index_3 = [...]uint8{0, 13, 23}
_CommandType_index_4 = [...]uint8{0, 12, 22} _CommandType_index_4 = [...]uint8{0, 12, 22}
_CommandType_index_5 = [...]uint8{0, 6, 16, 31, 42, 47, 55, 67} _CommandType_index_5 = [...]uint8{0, 6, 16, 34, 45, 50, 58, 70}
_CommandType_index_6 = [...]uint8{0, 9, 22, 34, 48} _CommandType_index_6 = [...]uint8{0, 9, 22, 34, 48}
) )

View file

@ -0,0 +1,36 @@
package payload
import (
"errors"
"github.com/nspcc-dev/neo-go/pkg/io"
)
// GetBlockByIndex payload
type GetBlockByIndex struct {
IndexStart uint32
Count int16
}
// NewGetBlockByIndex returns GetBlockByIndex payload with specified start index and count
func NewGetBlockByIndex(indexStart uint32, count int16) *GetBlockByIndex {
return &GetBlockByIndex{
IndexStart: indexStart,
Count: count,
}
}
// DecodeBinary implements Serializable interface.
func (d *GetBlockByIndex) DecodeBinary(br *io.BinReader) {
d.IndexStart = br.ReadU32LE()
d.Count = int16(br.ReadU16LE())
if d.Count < -1 || d.Count == 0 || d.Count > MaxHeadersAllowed {
br.Err = errors.New("invalid block count")
}
}
// EncodeBinary implements Serializable interface.
func (d *GetBlockByIndex) EncodeBinary(bw *io.BinWriter) {
bw.WriteU32LE(d.IndexStart)
bw.WriteU16LE(uint16(d.Count))
}

View file

@ -8,18 +8,18 @@ import (
) )
func TestGetBlockDataEncodeDecode(t *testing.T) { func TestGetBlockDataEncodeDecode(t *testing.T) {
d := NewGetBlockData(123, 100) d := NewGetBlockByIndex(123, 100)
testserdes.EncodeDecodeBinary(t, d, new(GetBlockData)) testserdes.EncodeDecodeBinary(t, d, new(GetBlockByIndex))
// invalid block count // invalid block count
d = NewGetBlockData(5, 0) d = NewGetBlockByIndex(5, 0)
data, err := testserdes.EncodeBinary(d) data, err := testserdes.EncodeBinary(d)
require.NoError(t, err) require.NoError(t, err)
require.Error(t, testserdes.DecodeBinary(data, new(GetBlockData))) require.Error(t, testserdes.DecodeBinary(data, new(GetBlockByIndex)))
// invalid block count // invalid block count
d = NewGetBlockData(5, maxBlockCount+1) d = NewGetBlockByIndex(5, MaxHeadersAllowed+1)
data, err = testserdes.EncodeBinary(d) data, err = testserdes.EncodeBinary(d)
require.NoError(t, err) require.NoError(t, err)
require.Error(t, testserdes.DecodeBinary(data, new(GetBlockData))) require.Error(t, testserdes.DecodeBinary(data, new(GetBlockByIndex)))
} }

View file

@ -1,39 +0,0 @@
package payload
import (
"errors"
"github.com/nspcc-dev/neo-go/pkg/io"
)
// maximum number of blocks to query about
const maxBlockCount = 500
// GetBlockData payload
type GetBlockData struct {
IndexStart uint32
Count uint16
}
// NewGetBlockData returns GetBlockData payload with specified start index and count
func NewGetBlockData(indexStart uint32, count uint16) *GetBlockData {
return &GetBlockData{
IndexStart: indexStart,
Count: count,
}
}
// DecodeBinary implements Serializable interface.
func (d *GetBlockData) DecodeBinary(br *io.BinReader) {
d.IndexStart = br.ReadU32LE()
d.Count = br.ReadU16LE()
if d.Count == 0 || d.Count > maxBlockCount {
br.Err = errors.New("invalid block count")
}
}
// EncodeBinary implements Serializable interface.
func (d *GetBlockData) EncodeBinary(bw *io.BinWriter) {
bw.WriteU32LE(d.IndexStart)
bw.WriteU16LE(d.Count)
}

View file

@ -446,23 +446,6 @@ func (s *Server) handleVersionCmd(p Peer, version *payload.Version) error {
return p.SendVersionAck(NewMessage(CMDVerack, nil)) return p.SendVersionAck(NewMessage(CMDVerack, nil))
} }
// handleHeadersCmd processes the headers received from its peer.
// If the headerHeight of the blockchain still smaller then the peer
// the server will request more headers.
// This method could best be called in a separate routine.
func (s *Server) handleHeadersCmd(p Peer, headers *payload.Headers) {
if err := s.chain.AddHeaders(headers.Hdrs...); err != nil {
s.log.Warn("failed processing headers", zap.Error(err))
return
}
// The peer will respond with a maximum of 2000 headers in one batch.
// We will ask one more batch here if needed. Eventually we will get synced
// due to the startProtocol routine that will ask headers every protoTick.
if s.chain.HeaderHeight() < p.LastBlockIndex() {
s.requestHeaders(p)
}
}
// handleBlockCmd processes the received block received from its peer. // handleBlockCmd processes the received block received from its peer.
func (s *Server) handleBlockCmd(p Peer, block *block.Block) error { func (s *Server) handleBlockCmd(p Peer, block *block.Block) error {
return s.bQueue.putBlock(block) return s.bQueue.putBlock(block)
@ -479,8 +462,8 @@ func (s *Server) handlePong(p Peer, pong *payload.Ping) error {
if err != nil { if err != nil {
return err return err
} }
if s.chain.HeaderHeight() < pong.LastBlockIndex { if s.chain.BlockHeight() < pong.LastBlockIndex {
return s.requestHeaders(p) return s.requestBlocks(p)
} }
return nil return nil
} }
@ -609,32 +592,41 @@ func (s *Server) handleGetBlocksCmd(p Peer, gb *payload.GetBlocks) error {
return p.EnqueueP2PMessage(msg) return p.EnqueueP2PMessage(msg)
} }
// handleGetBlockDataCmd processes the getblockdata request. // handleGetBlockByIndexCmd processes the getblockbyindex request.
func (s *Server) handleGetBlockDataCmd(p Peer, gbd *payload.GetBlockData) error { func (s *Server) handleGetBlockByIndexCmd(p Peer, gbd *payload.GetBlockByIndex) error {
for i := gbd.IndexStart; i < gbd.IndexStart+uint32(gbd.Count); i++ { count := gbd.Count
b, err := s.chain.GetBlock(s.chain.GetHeaderHash(int(i))) if gbd.Count < 0 || gbd.Count > payload.MaxHashesCount {
count = payload.MaxHashesCount
}
for i := gbd.IndexStart; i < gbd.IndexStart+uint32(count); i++ {
hash := s.chain.GetHeaderHash(int(i))
if hash.Equals(util.Uint256{}) {
break
}
b, err := s.chain.GetBlock(hash)
if err != nil { if err != nil {
return err break
} }
msg := NewMessage(CMDBlock, b) msg := NewMessage(CMDBlock, b)
return p.EnqueueP2PMessage(msg) if err = p.EnqueueP2PMessage(msg); err != nil {
return err
}
} }
return nil return nil
} }
// handleGetHeadersCmd processes the getheaders request. // handleGetHeadersCmd processes the getheaders request.
func (s *Server) handleGetHeadersCmd(p Peer, gh *payload.GetBlocks) error { func (s *Server) handleGetHeadersCmd(p Peer, gh *payload.GetBlockByIndex) error {
count := gh.Count if gh.IndexStart > s.chain.HeaderHeight() {
if gh.Count < 0 || gh.Count > payload.MaxHashesCount { return nil
count = payload.MaxHashesCount
} }
start, err := s.chain.GetHeader(gh.HashStart) count := gh.Count
if err != nil { if gh.Count < 0 || gh.Count > payload.MaxHeadersAllowed {
return err count = payload.MaxHeadersAllowed
} }
resp := payload.Headers{} resp := payload.Headers{}
resp.Hdrs = make([]*block.Header, 0, payload.MaxHeadersAllowed) resp.Hdrs = make([]*block.Header, 0, count)
for i := start.Index + 1; i < start.Index+uint32(count); i++ { for i := gh.IndexStart; i < gh.IndexStart+uint32(count); i++ {
hash := s.chain.GetHeaderHash(int(i)) hash := s.chain.GetHeaderHash(int(i))
if hash.Equals(util.Uint256{}) { if hash.Equals(util.Uint256{}) {
break break
@ -698,34 +690,12 @@ func (s *Server) handleGetAddrCmd(p Peer) error {
return p.EnqueueP2PMessage(NewMessage(CMDAddr, alist)) return p.EnqueueP2PMessage(NewMessage(CMDAddr, alist))
} }
// requestHeaders sends a getheaders message to the peer. // requestBlocks sends a CMDGetBlockByIndex message to the peer
// The peer will respond with headers op to a count of 500.
func (s *Server) requestHeaders(p Peer) error {
payload := payload.NewGetBlocks(s.chain.CurrentHeaderHash(), -1)
return p.EnqueueP2PMessage(NewMessage(CMDGetHeaders, payload))
}
// requestBlocks sends a getdata message to the peer
// to sync up in blocks. A maximum of maxBlockBatch will // to sync up in blocks. A maximum of maxBlockBatch will
// send at once. // send at once.
func (s *Server) requestBlocks(p Peer) error { func (s *Server) requestBlocks(p Peer) error {
var ( payload := payload.NewGetBlockByIndex(s.chain.BlockHeight(), -1)
hashes []util.Uint256 return p.EnqueueP2PMessage(NewMessage(CMDGetBlockByIndex, payload))
hashStart = s.chain.BlockHeight() + 1
headerHeight = s.chain.HeaderHeight()
)
for hashStart <= headerHeight && len(hashes) < maxBlockBatch {
hash := s.chain.GetHeaderHash(int(hashStart))
hashes = append(hashes, hash)
hashStart++
}
if len(hashes) > 0 {
payload := payload.NewInventory(payload.BlockType, hashes)
return p.EnqueueP2PMessage(NewMessage(CMDGetData, payload))
} else if s.chain.HeaderHeight() < p.LastBlockIndex() {
return s.requestHeaders(p)
}
return nil
} }
// handleMessage processes the given message. // handleMessage processes the given message.
@ -750,18 +720,15 @@ func (s *Server) handleMessage(peer Peer, msg *Message) error {
case CMDGetBlocks: case CMDGetBlocks:
gb := msg.Payload.(*payload.GetBlocks) gb := msg.Payload.(*payload.GetBlocks)
return s.handleGetBlocksCmd(peer, gb) return s.handleGetBlocksCmd(peer, gb)
case CMDGetBlockData: case CMDGetBlockByIndex:
gbd := msg.Payload.(*payload.GetBlockData) gbd := msg.Payload.(*payload.GetBlockByIndex)
return s.handleGetBlockDataCmd(peer, gbd) return s.handleGetBlockByIndexCmd(peer, gbd)
case CMDGetData: case CMDGetData:
inv := msg.Payload.(*payload.Inventory) inv := msg.Payload.(*payload.Inventory)
return s.handleGetDataCmd(peer, inv) return s.handleGetDataCmd(peer, inv)
case CMDGetHeaders: case CMDGetHeaders:
gh := msg.Payload.(*payload.GetBlocks) gh := msg.Payload.(*payload.GetBlockByIndex)
return s.handleGetHeadersCmd(peer, gh) return s.handleGetHeadersCmd(peer, gh)
case CMDHeaders:
headers := msg.Payload.(*payload.Headers)
go s.handleHeadersCmd(peer, headers)
case CMDInv: case CMDInv:
inventory := msg.Payload.(*payload.Inventory) inventory := msg.Payload.(*payload.Inventory)
return s.handleInvCmd(peer, inventory) return s.handleInvCmd(peer, inventory)

View file

@ -142,15 +142,3 @@ func TestServerNotSendsVerack(t *testing.T) {
assert.NotNil(t, err) assert.NotNil(t, err)
require.Equal(t, errAlreadyConnected, err) require.Equal(t, errAlreadyConnected, err)
} }
func TestRequestHeaders(t *testing.T) {
var (
s = newTestServer(t, ServerConfig{})
p = newLocalPeer(t, s)
)
p.messageHandler = func(t *testing.T, msg *Message) {
assert.IsType(t, &payload.GetBlocks{}, msg.Payload)
assert.Equal(t, CMDGetHeaders, msg.Command)
}
s.requestHeaders(p)
}

View file

@ -235,8 +235,8 @@ func (p *TCPPeer) StartProtocol() {
zap.Uint32("id", p.Version().Nonce)) zap.Uint32("id", p.Version().Nonce))
p.server.discovery.RegisterGoodAddr(p.PeerAddr().String(), p.version.Capabilities) p.server.discovery.RegisterGoodAddr(p.PeerAddr().String(), p.version.Capabilities)
if p.server.chain.HeaderHeight() < p.LastBlockIndex() { if p.server.chain.BlockHeight() < p.LastBlockIndex() {
err = p.server.requestHeaders(p) err = p.server.requestBlocks(p)
if err != nil { if err != nil {
p.Disconnect(err) p.Disconnect(err)
return return