network: rework ping sends, fix overpinging

Our node was too pingy because of wrong timer setups (that divided timeout
Duration by time.Second), it also was wrong in its time calculations (using
UTC time to calculate intervals). At the same time missing block is a
server-wide problem, so it's better solved with server-wide protocol loop.
This commit is contained in:
Roman Khimov 2020-01-27 12:44:05 +03:00
parent 39800aecb3
commit 06c3fbe455
4 changed files with 31 additions and 19 deletions

View file

@ -225,8 +225,9 @@ func (p *localPeer) HandleVersionAck() error {
p.handshaked = true p.handshaked = true
return nil return nil
} }
func (p *localPeer) SendPing() error { func (p *localPeer) SendPing(m *Message) error {
p.pingSent++ p.pingSent++
_ = p.EnqueueMessage(m)
return nil return nil
} }
func (p *localPeer) HandlePong(pong *payload.Ping) error { func (p *localPeer) HandlePong(pong *payload.Ping) error {

View file

@ -41,7 +41,7 @@ type Peer interface {
// SendPing enqueues a ping message to be sent to the peer and does // SendPing enqueues a ping message to be sent to the peer and does
// appropriate protocol handling like timeouts and outstanding pings // appropriate protocol handling like timeouts and outstanding pings
// management. // management.
SendPing() error SendPing(*Message) error
// SendVersion checks handshake status and sends a version message to // SendVersion checks handshake status and sends a version message to
// the peer. // the peer.
SendVersion() error SendVersion() error

View file

@ -66,8 +66,6 @@ type (
quit chan struct{} quit chan struct{}
connected *atomic.Bool connected *atomic.Bool
// Time of the last block receival.
lastBlockTS *atomic.Int64
log *zap.Logger log *zap.Logger
} }
@ -100,7 +98,6 @@ func NewServer(config ServerConfig, chain core.Blockchainer, log *zap.Logger) (*
unregister: make(chan peerDrop), unregister: make(chan peerDrop),
peers: make(map[Peer]bool), peers: make(map[Peer]bool),
connected: atomic.NewBool(false), connected: atomic.NewBool(false),
lastBlockTS: atomic.NewInt64(0),
log: log, log: log,
} }
@ -195,7 +192,10 @@ func (s *Server) BadPeers() []string {
return []string{} return []string{}
} }
// run is a goroutine that starts another goroutine to manage protocol specifics
// while itself dealing with peers management (handling connects/disconnects).
func (s *Server) run() { func (s *Server) run() {
go s.runProto()
for { for {
if s.PeerCount() < s.MinPeers { if s.PeerCount() < s.MinPeers {
s.discovery.RequestRemote(s.AttemptConnPeers) s.discovery.RequestRemote(s.AttemptConnPeers)
@ -254,6 +254,26 @@ func (s *Server) run() {
} }
} }
// runProto is a goroutine that manages server-wide protocol events.
func (s *Server) runProto() {
pingTimer := time.NewTimer(s.PingInterval)
for {
prevHeight := s.chain.BlockHeight()
select {
case <-s.quit:
return
case <-pingTimer.C:
if s.chain.BlockHeight() == prevHeight {
// Get a copy of s.peers to avoid holding a lock while sending.
for peer := range s.Peers() {
_ = peer.SendPing(s.MkMsg(CMDPing, payload.NewPing(s.id, s.chain.HeaderHeight())))
}
}
pingTimer.Reset(s.PingInterval)
}
}
}
func (s *Server) tryStartConsensus() { func (s *Server) tryStartConsensus() {
if s.Wallet == nil || s.connected.Load() { if s.Wallet == nil || s.connected.Load() {
return return
@ -359,7 +379,6 @@ func (s *Server) handleHeadersCmd(p Peer, headers *payload.Headers) {
// handleBlockCmd processes the received block received from its peer. // handleBlockCmd processes the received block received from its peer.
func (s *Server) handleBlockCmd(p Peer, block *block.Block) error { func (s *Server) handleBlockCmd(p Peer, block *block.Block) error {
s.lastBlockTS.Store(time.Now().UTC().Unix())
return s.bQueue.putBlock(block) return s.bQueue.putBlock(block)
} }
@ -663,12 +682,6 @@ func (s *Server) handleNewPayload(p *consensus.Payload) {
s.broadcastHPMessage(msg) s.broadcastHPMessage(msg)
} }
// getLastBlockTime returns unix timestamp for the moment when the last block
// was received.
func (s *Server) getLastBlockTime() int64 {
return s.lastBlockTS.Load()
}
func (s *Server) requestTx(hashes ...util.Uint256) { func (s *Server) requestTx(hashes ...util.Uint256) {
if len(hashes) == 0 { if len(hashes) == 0 {
return return

View file

@ -202,11 +202,6 @@ func (p *TCPPeer) StartProtocol() {
// Try to sync in headers and block with the peer if his block height is higher then ours. // Try to sync in headers and block with the peer if his block height is higher then ours.
if p.LastBlockIndex() > p.server.chain.BlockHeight() { if p.LastBlockIndex() > p.server.chain.BlockHeight() {
err = p.server.requestBlocks(p) err = p.server.requestBlocks(p)
} else {
diff := time.Now().UTC().Unix() - p.server.getLastBlockTime()
if diff > int64(p.server.PingInterval/time.Second) {
err = p.SendPing()
}
} }
if err == nil { if err == nil {
timer.Reset(p.server.ProtoTickInterval) timer.Reset(p.server.ProtoTickInterval)
@ -340,7 +335,10 @@ func (p *TCPPeer) LastBlockIndex() uint32 {
// SendPing sends a ping message to the peer and does appropriate accounting of // SendPing sends a ping message to the peer and does appropriate accounting of
// outstanding pings and timeouts. // outstanding pings and timeouts.
func (p *TCPPeer) SendPing() error { func (p *TCPPeer) SendPing(msg *Message) error {
if !p.Handshaked() {
return errStateMismatch
}
p.lock.Lock() p.lock.Lock()
p.pingSent++ p.pingSent++
if p.pingTimer == nil { if p.pingTimer == nil {
@ -349,7 +347,7 @@ func (p *TCPPeer) SendPing() error {
}) })
} }
p.lock.Unlock() p.lock.Unlock()
return p.EnqueueMessage(p.server.MkMsg(CMDPing, payload.NewPing(p.server.id, p.server.chain.HeaderHeight()))) return p.EnqueueMessage(msg)
} }
// HandlePong handles a pong message received from the peer and does appropriate // HandlePong handles a pong message received from the peer and does appropriate