diff --git a/pkg/network/helper_test.go b/pkg/network/helper_test.go index 0c6bd9281..3f76a5b30 100644 --- a/pkg/network/helper_test.go +++ b/pkg/network/helper_test.go @@ -225,8 +225,9 @@ func (p *localPeer) HandleVersionAck() error { p.handshaked = true return nil } -func (p *localPeer) SendPing() error { +func (p *localPeer) SendPing(m *Message) error { p.pingSent++ + _ = p.EnqueueMessage(m) return nil } func (p *localPeer) HandlePong(pong *payload.Ping) error { diff --git a/pkg/network/peer.go b/pkg/network/peer.go index 9f2443d0c..332a4f99a 100644 --- a/pkg/network/peer.go +++ b/pkg/network/peer.go @@ -41,7 +41,7 @@ type Peer interface { // SendPing enqueues a ping message to be sent to the peer and does // appropriate protocol handling like timeouts and outstanding pings // management. - SendPing() error + SendPing(*Message) error // SendVersion checks handshake status and sends a version message to // the peer. SendVersion() error diff --git a/pkg/network/server.go b/pkg/network/server.go index f08e0e54c..89357d632 100644 --- a/pkg/network/server.go +++ b/pkg/network/server.go @@ -66,8 +66,6 @@ type ( quit chan struct{} connected *atomic.Bool - // Time of the last block receival. - lastBlockTS *atomic.Int64 log *zap.Logger } @@ -100,7 +98,6 @@ func NewServer(config ServerConfig, chain core.Blockchainer, log *zap.Logger) (* unregister: make(chan peerDrop), peers: make(map[Peer]bool), connected: atomic.NewBool(false), - lastBlockTS: atomic.NewInt64(0), log: log, } @@ -195,7 +192,10 @@ func (s *Server) BadPeers() []string { return []string{} } +// run is a goroutine that starts another goroutine to manage protocol specifics +// while itself dealing with peers management (handling connects/disconnects). func (s *Server) run() { + go s.runProto() for { if s.PeerCount() < s.MinPeers { s.discovery.RequestRemote(s.AttemptConnPeers) @@ -254,6 +254,26 @@ func (s *Server) run() { } } +// runProto is a goroutine that manages server-wide protocol events. +func (s *Server) runProto() { + pingTimer := time.NewTimer(s.PingInterval) + for { + prevHeight := s.chain.BlockHeight() + select { + case <-s.quit: + return + case <-pingTimer.C: + if s.chain.BlockHeight() == prevHeight { + // Get a copy of s.peers to avoid holding a lock while sending. + for peer := range s.Peers() { + _ = peer.SendPing(s.MkMsg(CMDPing, payload.NewPing(s.id, s.chain.HeaderHeight()))) + } + } + pingTimer.Reset(s.PingInterval) + } + } +} + func (s *Server) tryStartConsensus() { if s.Wallet == nil || s.connected.Load() { return @@ -359,7 +379,6 @@ func (s *Server) handleHeadersCmd(p Peer, headers *payload.Headers) { // handleBlockCmd processes the received block received from its peer. func (s *Server) handleBlockCmd(p Peer, block *block.Block) error { - s.lastBlockTS.Store(time.Now().UTC().Unix()) return s.bQueue.putBlock(block) } @@ -435,13 +454,16 @@ func (s *Server) handleGetDataCmd(p Peer, inv *payload.Inventory) error { } if msg != nil { pkt, err := msg.Bytes() + if err == nil { + if inv.Type == payload.ConsensusType { + err = p.EnqueueHPPacket(pkt) + } else { + err = p.EnqueuePacket(pkt) + } + } if err != nil { return err } - if inv.Type == payload.ConsensusType { - return p.EnqueueHPPacket(pkt) - } - return p.EnqueuePacket(pkt) } } return nil @@ -660,12 +682,6 @@ func (s *Server) handleNewPayload(p *consensus.Payload) { s.broadcastHPMessage(msg) } -// getLastBlockTime returns unix timestamp for the moment when the last block -// was received. -func (s *Server) getLastBlockTime() int64 { - return s.lastBlockTS.Load() -} - func (s *Server) requestTx(hashes ...util.Uint256) { if len(hashes) == 0 { return diff --git a/pkg/network/tcp_peer.go b/pkg/network/tcp_peer.go index 096966359..1dfbbcc44 100644 --- a/pkg/network/tcp_peer.go +++ b/pkg/network/tcp_peer.go @@ -202,11 +202,6 @@ func (p *TCPPeer) StartProtocol() { // Try to sync in headers and block with the peer if his block height is higher then ours. if p.LastBlockIndex() > p.server.chain.BlockHeight() { err = p.server.requestBlocks(p) - } else { - diff := time.Now().UTC().Unix() - p.server.getLastBlockTime() - if diff > int64(p.server.PingInterval/time.Second) { - err = p.SendPing() - } } if err == nil { timer.Reset(p.server.ProtoTickInterval) @@ -340,7 +335,10 @@ func (p *TCPPeer) LastBlockIndex() uint32 { // SendPing sends a ping message to the peer and does appropriate accounting of // outstanding pings and timeouts. -func (p *TCPPeer) SendPing() error { +func (p *TCPPeer) SendPing(msg *Message) error { + if !p.Handshaked() { + return errStateMismatch + } p.lock.Lock() p.pingSent++ if p.pingTimer == nil { @@ -349,7 +347,7 @@ func (p *TCPPeer) SendPing() error { }) } p.lock.Unlock() - return p.EnqueueMessage(p.server.MkMsg(CMDPing, payload.NewPing(p.server.id, p.server.chain.HeaderHeight()))) + return p.EnqueueMessage(msg) } // HandlePong handles a pong message received from the peer and does appropriate