Merge pull request #456 from nspcc-dev/pingpong_430

add ping pong processing
This commit is contained in:
Roman Khimov 2020-01-20 16:10:29 +03:00 committed by GitHub
commit bb80ba9b9e
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
18 changed files with 195 additions and 3 deletions

View file

@ -27,6 +27,7 @@ const (
maxBlockBatch = 200
maxAddrsToSend = 200
minPoolCount = 30
defaultPingLimit = 4
)
var (
@ -314,7 +315,7 @@ func (s *Server) startProtocol(p Peer) {
zap.Uint32("id", p.Version().Nonce))
s.discovery.RegisterGoodAddr(p.PeerAddr().String())
if s.chain.HeaderHeight() < p.Version().StartHeight {
if s.chain.HeaderHeight() < p.LastBlockIndex() {
err = s.requestHeaders(p)
if err != nil {
p.Disconnect(err)
@ -323,6 +324,7 @@ func (s *Server) startProtocol(p Peer) {
}
timer := time.NewTimer(s.ProtoTickInterval)
pingTimer := time.NewTimer(s.PingTimeout)
for {
select {
case err = <-p.Done():
@ -331,12 +333,31 @@ func (s *Server) startProtocol(p Peer) {
err = p.WriteMsg(m)
case <-timer.C:
// Try to sync in headers and block with the peer if his block height is higher then ours.
if p.Version().StartHeight > s.chain.BlockHeight() {
if p.LastBlockIndex() > s.chain.BlockHeight() {
err = s.requestBlocks(p)
}
if err == nil {
timer.Reset(s.ProtoTickInterval)
}
if s.chain.HeaderHeight() >= p.LastBlockIndex() {
block, errGetBlock := s.chain.GetBlock(s.chain.CurrentBlockHash())
if errGetBlock != nil {
err = errGetBlock
} else {
diff := uint32(time.Now().UTC().Unix()) - block.Timestamp
if diff > uint32(s.PingInterval/time.Second) {
p.UpdatePingSent(p.GetPingSent() + 1)
err = p.WriteMsg(NewMessage(s.Net, CMDPing, payload.NewPing(s.id, s.chain.HeaderHeight())))
}
}
}
case <-pingTimer.C:
if p.GetPingSent() > defaultPingLimit {
err = errors.New("ping/pong timeout")
} else {
pingTimer.Reset(s.PingTimeout)
p.UpdatePingSent(0)
}
}
if err != nil {
s.unregister <- peerDrop{p, err}
@ -394,7 +415,7 @@ func (s *Server) handleHeadersCmd(p Peer, headers *payload.Headers) {
// The peer will respond with a maximum of 2000 headers in one batch.
// We will ask one more batch here if needed. Eventually we will get synced
// due to the startProtocol routine that will ask headers every protoTick.
if s.chain.HeaderHeight() < p.Version().StartHeight {
if s.chain.HeaderHeight() < p.LastBlockIndex() {
s.requestHeaders(p)
}
}
@ -404,6 +425,25 @@ func (s *Server) handleBlockCmd(p Peer, block *core.Block) error {
return s.bQueue.putBlock(block)
}
// handlePing processes ping request.
func (s *Server) handlePing(p Peer, ping *payload.Ping) error {
return p.WriteMsg(NewMessage(s.Net, CMDPong, payload.NewPing(s.id, s.chain.BlockHeight())))
}
// handlePing processes pong request.
func (s *Server) handlePong(p Peer, pong *payload.Ping) error {
pingSent := p.GetPingSent()
if pingSent == 0 {
return errors.New("pong message wasn't expected")
}
p.UpdatePingSent(pingSent - 1)
p.UpdateLastBlockIndex(pong.LastBlockIndex)
if s.chain.HeaderHeight() < pong.LastBlockIndex {
return s.requestHeaders(p)
}
return nil
}
// handleInvCmd processes the received inventory.
func (s *Server) handleInvCmd(p Peer, inv *payload.Inventory) error {
reqHashes := make([]util.Uint256, 0)
@ -640,6 +680,12 @@ func (s *Server) handleMessage(peer Peer, msg *Message) error {
case CMDTX:
tx := msg.Payload.(*transaction.Transaction)
return s.handleTxCmd(tx)
case CMDPing:
ping := msg.Payload.(*payload.Ping)
return s.handlePing(peer, ping)
case CMDPong:
pong := msg.Payload.(*payload.Ping)
return s.handlePong(peer, pong)
case CMDVersion, CMDVerack:
return fmt.Errorf("received '%s' after the handshake", msg.CommandType())
}