From 06c3fbe45554e361dbb386d9985c327bbe491811 Mon Sep 17 00:00:00 2001 From: Roman Khimov Date: Mon, 27 Jan 2020 12:44:05 +0300 Subject: [PATCH] network: rework ping sends, fix overpinging Our node was too pingy because of wrong timer setups (that divided timeout Duration by time.Second), it also was wrong in its time calculations (using UTC time to calculate intervals). At the same time missing block is a server-wide problem, so it's better solved with server-wide protocol loop. --- pkg/network/helper_test.go | 3 ++- pkg/network/peer.go | 2 +- pkg/network/server.go | 33 +++++++++++++++++++++++---------- pkg/network/tcp_peer.go | 12 +++++------- 4 files changed, 31 insertions(+), 19 deletions(-) diff --git a/pkg/network/helper_test.go b/pkg/network/helper_test.go index 0c6bd9281..3f76a5b30 100644 --- a/pkg/network/helper_test.go +++ b/pkg/network/helper_test.go @@ -225,8 +225,9 @@ func (p *localPeer) HandleVersionAck() error { p.handshaked = true return nil } -func (p *localPeer) SendPing() error { +func (p *localPeer) SendPing(m *Message) error { p.pingSent++ + _ = p.EnqueueMessage(m) return nil } func (p *localPeer) HandlePong(pong *payload.Ping) error { diff --git a/pkg/network/peer.go b/pkg/network/peer.go index 9f2443d0c..332a4f99a 100644 --- a/pkg/network/peer.go +++ b/pkg/network/peer.go @@ -41,7 +41,7 @@ type Peer interface { // SendPing enqueues a ping message to be sent to the peer and does // appropriate protocol handling like timeouts and outstanding pings // management. - SendPing() error + SendPing(*Message) error // SendVersion checks handshake status and sends a version message to // the peer. SendVersion() error diff --git a/pkg/network/server.go b/pkg/network/server.go index c1407742f..89357d632 100644 --- a/pkg/network/server.go +++ b/pkg/network/server.go @@ -66,8 +66,6 @@ type ( quit chan struct{} connected *atomic.Bool - // Time of the last block receival. - lastBlockTS *atomic.Int64 log *zap.Logger } @@ -100,7 +98,6 @@ func NewServer(config ServerConfig, chain core.Blockchainer, log *zap.Logger) (* unregister: make(chan peerDrop), peers: make(map[Peer]bool), connected: atomic.NewBool(false), - lastBlockTS: atomic.NewInt64(0), log: log, } @@ -195,7 +192,10 @@ func (s *Server) BadPeers() []string { return []string{} } +// run is a goroutine that starts another goroutine to manage protocol specifics +// while itself dealing with peers management (handling connects/disconnects). func (s *Server) run() { + go s.runProto() for { if s.PeerCount() < s.MinPeers { s.discovery.RequestRemote(s.AttemptConnPeers) @@ -254,6 +254,26 @@ func (s *Server) run() { } } +// runProto is a goroutine that manages server-wide protocol events. +func (s *Server) runProto() { + pingTimer := time.NewTimer(s.PingInterval) + for { + prevHeight := s.chain.BlockHeight() + select { + case <-s.quit: + return + case <-pingTimer.C: + if s.chain.BlockHeight() == prevHeight { + // Get a copy of s.peers to avoid holding a lock while sending. + for peer := range s.Peers() { + _ = peer.SendPing(s.MkMsg(CMDPing, payload.NewPing(s.id, s.chain.HeaderHeight()))) + } + } + pingTimer.Reset(s.PingInterval) + } + } +} + func (s *Server) tryStartConsensus() { if s.Wallet == nil || s.connected.Load() { return @@ -359,7 +379,6 @@ func (s *Server) handleHeadersCmd(p Peer, headers *payload.Headers) { // handleBlockCmd processes the received block received from its peer. func (s *Server) handleBlockCmd(p Peer, block *block.Block) error { - s.lastBlockTS.Store(time.Now().UTC().Unix()) return s.bQueue.putBlock(block) } @@ -663,12 +682,6 @@ func (s *Server) handleNewPayload(p *consensus.Payload) { s.broadcastHPMessage(msg) } -// getLastBlockTime returns unix timestamp for the moment when the last block -// was received. -func (s *Server) getLastBlockTime() int64 { - return s.lastBlockTS.Load() -} - func (s *Server) requestTx(hashes ...util.Uint256) { if len(hashes) == 0 { return diff --git a/pkg/network/tcp_peer.go b/pkg/network/tcp_peer.go index 096966359..1dfbbcc44 100644 --- a/pkg/network/tcp_peer.go +++ b/pkg/network/tcp_peer.go @@ -202,11 +202,6 @@ func (p *TCPPeer) StartProtocol() { // Try to sync in headers and block with the peer if his block height is higher then ours. if p.LastBlockIndex() > p.server.chain.BlockHeight() { err = p.server.requestBlocks(p) - } else { - diff := time.Now().UTC().Unix() - p.server.getLastBlockTime() - if diff > int64(p.server.PingInterval/time.Second) { - err = p.SendPing() - } } if err == nil { timer.Reset(p.server.ProtoTickInterval) @@ -340,7 +335,10 @@ func (p *TCPPeer) LastBlockIndex() uint32 { // SendPing sends a ping message to the peer and does appropriate accounting of // outstanding pings and timeouts. -func (p *TCPPeer) SendPing() error { +func (p *TCPPeer) SendPing(msg *Message) error { + if !p.Handshaked() { + return errStateMismatch + } p.lock.Lock() p.pingSent++ if p.pingTimer == nil { @@ -349,7 +347,7 @@ func (p *TCPPeer) SendPing() error { }) } p.lock.Unlock() - return p.EnqueueMessage(p.server.MkMsg(CMDPing, payload.NewPing(p.server.id, p.server.chain.HeaderHeight()))) + return p.EnqueueMessage(msg) } // HandlePong handles a pong message received from the peer and does appropriate