network: move peer filtering to getPeers()

It doesn't change much, we can't magically get more valid peers and if some
die while we're iterating we'd detect that by an error returned from send().
This commit is contained in:
Roman Khimov 2021-08-06 15:04:13 +03:00
parent de6f4987f6
commit 80f3ec2312

View file

@ -288,7 +288,7 @@ func (s *Server) Shutdown() {
s.transport.Close() s.transport.Close()
s.discovery.Close() s.discovery.Close()
s.consensus.Shutdown() s.consensus.Shutdown()
for _, p := range s.getPeers() { for _, p := range s.getPeers(nil) {
p.Disconnect(errServerShutdown) p.Disconnect(errServerShutdown)
} }
s.bQueue.discard() s.bQueue.discard()
@ -431,7 +431,7 @@ func (s *Server) runProto() {
case <-pingTimer.C: case <-pingTimer.C:
if s.chain.BlockHeight() == prevHeight { if s.chain.BlockHeight() == prevHeight {
// Get a copy of s.peers to avoid holding a lock while sending. // Get a copy of s.peers to avoid holding a lock while sending.
for _, peer := range s.getPeers() { for _, peer := range s.getPeers(nil) {
_ = peer.SendPing(NewMessage(CMDPing, payload.NewPing(s.chain.BlockHeight(), s.id))) _ = peer.SendPing(NewMessage(CMDPing, payload.NewPing(s.chain.BlockHeight(), s.id)))
} }
} }
@ -489,14 +489,17 @@ func (s *Server) UnsubscribeFromNotaryRequests(ch chan<- mempoolevent.Event) {
s.notaryRequestPool.UnsubscribeFromTransactions(ch) s.notaryRequestPool.UnsubscribeFromTransactions(ch)
} }
// getPeers returns current list of peers connected to // getPeers returns current list of peers connected to the server filtered by
// the server. // isOK function if it's given.
func (s *Server) getPeers() []Peer { func (s *Server) getPeers(isOK func(Peer) bool) []Peer {
s.lock.RLock() s.lock.RLock()
defer s.lock.RUnlock() defer s.lock.RUnlock()
peers := make([]Peer, 0, len(s.peers)) peers := make([]Peer, 0, len(s.peers))
for k := range s.peers { for k := range s.peers {
if isOK != nil && !isOK(k) {
continue
}
peers = append(peers, k) peers = append(peers, k)
} }
@ -1142,7 +1145,7 @@ func (s *Server) requestTx(hashes ...util.Uint256) {
// peer is considered invalid if it returns false). // peer is considered invalid if it returns false).
func (s *Server) iteratePeersWithSendMsg(msg *Message, send func(Peer, bool, []byte) error, peerOK func(Peer) bool) { func (s *Server) iteratePeersWithSendMsg(msg *Message, send func(Peer, bool, []byte) error, peerOK func(Peer) bool) {
// Get a copy of s.peers to avoid holding a lock while sending. // Get a copy of s.peers to avoid holding a lock while sending.
peers := s.getPeers() peers := s.getPeers(peerOK)
if len(peers) == 0 { if len(peers) == 0 {
return return
} }
@ -1158,9 +1161,6 @@ func (s *Server) iteratePeersWithSendMsg(msg *Message, send func(Peer, bool, []b
okCount := 0 okCount := 0
sentCount := 0 sentCount := 0
for i, peer := range peers { for i, peer := range peers {
if peerOK != nil && !peerOK(peer) {
continue
}
okCount++ okCount++
if err := send(peer, false, pkt); err != nil { if err := send(peer, false, pkt); err != nil {
continue continue
@ -1179,7 +1179,7 @@ func (s *Server) iteratePeersWithSendMsg(msg *Message, send func(Peer, bool, []b
// Perform blocking send now. // Perform blocking send now.
for i, peer := range peers { for i, peer := range peers {
if success[i] || (peerOK != nil && !peerOK(peer)) { if success[i] {
continue continue
} }
if err := send(peer, true, pkt); err != nil { if err := send(peer, true, pkt); err != nil {