Merge pull request #627 from nspcc-dev/fix-getdata-and-pings

Fix getdata and pings
This commit is contained in:
Roman Khimov 2020-01-28 17:43:13 +03:00 committed by GitHub
commit ab03aee2cf
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
4 changed files with 38 additions and 23 deletions

View file

@ -225,8 +225,9 @@ func (p *localPeer) HandleVersionAck() error {
p.handshaked = true p.handshaked = true
return nil return nil
} }
func (p *localPeer) SendPing() error { func (p *localPeer) SendPing(m *Message) error {
p.pingSent++ p.pingSent++
_ = p.EnqueueMessage(m)
return nil return nil
} }
func (p *localPeer) HandlePong(pong *payload.Ping) error { func (p *localPeer) HandlePong(pong *payload.Ping) error {

View file

@ -41,7 +41,7 @@ type Peer interface {
// SendPing enqueues a ping message to be sent to the peer and does // SendPing enqueues a ping message to be sent to the peer and does
// appropriate protocol handling like timeouts and outstanding pings // appropriate protocol handling like timeouts and outstanding pings
// management. // management.
SendPing() error SendPing(*Message) error
// SendVersion checks handshake status and sends a version message to // SendVersion checks handshake status and sends a version message to
// the peer. // the peer.
SendVersion() error SendVersion() error

View file

@ -66,8 +66,6 @@ type (
quit chan struct{} quit chan struct{}
connected *atomic.Bool connected *atomic.Bool
// Time of the last block receival.
lastBlockTS *atomic.Int64
log *zap.Logger log *zap.Logger
} }
@ -100,7 +98,6 @@ func NewServer(config ServerConfig, chain core.Blockchainer, log *zap.Logger) (*
unregister: make(chan peerDrop), unregister: make(chan peerDrop),
peers: make(map[Peer]bool), peers: make(map[Peer]bool),
connected: atomic.NewBool(false), connected: atomic.NewBool(false),
lastBlockTS: atomic.NewInt64(0),
log: log, log: log,
} }
@ -195,7 +192,10 @@ func (s *Server) BadPeers() []string {
return []string{} return []string{}
} }
// run is a goroutine that starts another goroutine to manage protocol specifics
// while itself dealing with peers management (handling connects/disconnects).
func (s *Server) run() { func (s *Server) run() {
go s.runProto()
for { for {
if s.PeerCount() < s.MinPeers { if s.PeerCount() < s.MinPeers {
s.discovery.RequestRemote(s.AttemptConnPeers) s.discovery.RequestRemote(s.AttemptConnPeers)
@ -254,6 +254,26 @@ func (s *Server) run() {
} }
} }
// runProto is a goroutine that manages server-wide protocol events.
func (s *Server) runProto() {
pingTimer := time.NewTimer(s.PingInterval)
for {
prevHeight := s.chain.BlockHeight()
select {
case <-s.quit:
return
case <-pingTimer.C:
if s.chain.BlockHeight() == prevHeight {
// Get a copy of s.peers to avoid holding a lock while sending.
for peer := range s.Peers() {
_ = peer.SendPing(s.MkMsg(CMDPing, payload.NewPing(s.id, s.chain.HeaderHeight())))
}
}
pingTimer.Reset(s.PingInterval)
}
}
}
func (s *Server) tryStartConsensus() { func (s *Server) tryStartConsensus() {
if s.Wallet == nil || s.connected.Load() { if s.Wallet == nil || s.connected.Load() {
return return
@ -359,7 +379,6 @@ func (s *Server) handleHeadersCmd(p Peer, headers *payload.Headers) {
// handleBlockCmd processes the received block received from its peer. // handleBlockCmd processes the received block received from its peer.
func (s *Server) handleBlockCmd(p Peer, block *block.Block) error { func (s *Server) handleBlockCmd(p Peer, block *block.Block) error {
s.lastBlockTS.Store(time.Now().UTC().Unix())
return s.bQueue.putBlock(block) return s.bQueue.putBlock(block)
} }
@ -435,13 +454,16 @@ func (s *Server) handleGetDataCmd(p Peer, inv *payload.Inventory) error {
} }
if msg != nil { if msg != nil {
pkt, err := msg.Bytes() pkt, err := msg.Bytes()
if err == nil {
if inv.Type == payload.ConsensusType {
err = p.EnqueueHPPacket(pkt)
} else {
err = p.EnqueuePacket(pkt)
}
}
if err != nil { if err != nil {
return err return err
} }
if inv.Type == payload.ConsensusType {
return p.EnqueueHPPacket(pkt)
}
return p.EnqueuePacket(pkt)
} }
} }
return nil return nil
@ -660,12 +682,6 @@ func (s *Server) handleNewPayload(p *consensus.Payload) {
s.broadcastHPMessage(msg) s.broadcastHPMessage(msg)
} }
// getLastBlockTime returns unix timestamp for the moment when the last block
// was received.
func (s *Server) getLastBlockTime() int64 {
return s.lastBlockTS.Load()
}
func (s *Server) requestTx(hashes ...util.Uint256) { func (s *Server) requestTx(hashes ...util.Uint256) {
if len(hashes) == 0 { if len(hashes) == 0 {
return return

View file

@ -202,11 +202,6 @@ func (p *TCPPeer) StartProtocol() {
// Try to sync in headers and block with the peer if his block height is higher then ours. // Try to sync in headers and block with the peer if his block height is higher then ours.
if p.LastBlockIndex() > p.server.chain.BlockHeight() { if p.LastBlockIndex() > p.server.chain.BlockHeight() {
err = p.server.requestBlocks(p) err = p.server.requestBlocks(p)
} else {
diff := time.Now().UTC().Unix() - p.server.getLastBlockTime()
if diff > int64(p.server.PingInterval/time.Second) {
err = p.SendPing()
}
} }
if err == nil { if err == nil {
timer.Reset(p.server.ProtoTickInterval) timer.Reset(p.server.ProtoTickInterval)
@ -340,7 +335,10 @@ func (p *TCPPeer) LastBlockIndex() uint32 {
// SendPing sends a ping message to the peer and does appropriate accounting of // SendPing sends a ping message to the peer and does appropriate accounting of
// outstanding pings and timeouts. // outstanding pings and timeouts.
func (p *TCPPeer) SendPing() error { func (p *TCPPeer) SendPing(msg *Message) error {
if !p.Handshaked() {
return errStateMismatch
}
p.lock.Lock() p.lock.Lock()
p.pingSent++ p.pingSent++
if p.pingTimer == nil { if p.pingTimer == nil {
@ -349,7 +347,7 @@ func (p *TCPPeer) SendPing() error {
}) })
} }
p.lock.Unlock() p.lock.Unlock()
return p.EnqueueMessage(p.server.MkMsg(CMDPing, payload.NewPing(p.server.id, p.server.chain.HeaderHeight()))) return p.EnqueueMessage(msg)
} }
// HandlePong handles a pong message received from the peer and does appropriate // HandlePong handles a pong message received from the peer and does appropriate