From d3bb8ddf8ff8de9754dbf569d39008e1cc518ff5 Mon Sep 17 00:00:00 2001 From: Roman Khimov Date: Fri, 13 Sep 2019 15:36:53 +0300 Subject: [PATCH] network: handle errors and connection close more correctly This makes the writer side handle errors properly and fixes communication between the reader and writer goroutines so that the peer is always correctly unregistered. This is especially important when an error occurs before the handshake completes, because in that case the startProtocol() goroutine is not even running. --- pkg/network/server.go | 54 +++++++++++++++++++++++------------- pkg/network/tcp_peer.go | 7 ++++- pkg/network/tcp_transport.go | 10 +++---- 3 files changed, 45 insertions(+), 26 deletions(-) diff --git a/pkg/network/server.go b/pkg/network/server.go index 6afd7c861..b2817a34d 100644 --- a/pkg/network/server.go +++ b/pkg/network/server.go @@ -153,13 +153,17 @@ func (s *Server) run() { "addr": p.NetAddr(), }).Info("new peer connected") case drop := <-s.unregister: - delete(s.peers, drop.peer) - log.WithFields(log.Fields{ - "addr": drop.peer.NetAddr(), - "reason": drop.reason, - "peerCount": s.PeerCount(), - }).Warn("peer disconnected") - s.discovery.BackFill(drop.peer.NetAddr().String()) + if s.peers[drop.peer] { + delete(s.peers, drop.peer) + log.WithFields(log.Fields{ + "addr": drop.peer.NetAddr(), + "reason": drop.reason, + "peerCount": s.PeerCount(), + }).Warn("peer disconnected") + s.discovery.BackFill(drop.peer.NetAddr().String()) + } + // else the peer is already gone, which can happen + // because we have two goroutines sending signals here } } } @@ -187,22 +191,33 @@ func (s *Server) startProtocol(p Peer) { "id": p.Version().Nonce, }).Info("started protocol") - s.requestHeaders(p) + err := s.requestHeaders(p) + if err != nil { + p.Disconnect(err) + return + } timer := time.NewTimer(s.ProtoTickInterval) for { select { - case err := <-p.Done(): - s.unregister <- peerDrop{p, err} - return + case err = <-p.Done(): + // time to stop case m := <-s.addrReq: - 
p.WriteMsg(m) + err = p.WriteMsg(m) case <-timer.C: // Try to sync in headers and block with the peer if his block height is higher then ours. if p.Version().StartHeight > s.chain.BlockHeight() { - s.requestBlocks(p) + err = s.requestBlocks(p) } - timer.Reset(s.ProtoTickInterval) + if err == nil { + timer.Reset(s.ProtoTickInterval) + } + } + if err != nil { + s.unregister <- peerDrop{p, err} + timer.Stop() + p.Disconnect(err) + return } } } @@ -279,16 +294,16 @@ func (s *Server) handleAddrCmd(p Peer, addrs *payload.AddressList) error { // requestHeaders will send a getheaders message to the peer. // The peer will respond with headers op to a count of 2000. -func (s *Server) requestHeaders(p Peer) { +func (s *Server) requestHeaders(p Peer) error { start := []util.Uint256{s.chain.CurrentHeaderHash()} payload := payload.NewGetBlocks(start, util.Uint256{}) - p.WriteMsg(NewMessage(s.Net, CMDGetHeaders, payload)) + return p.WriteMsg(NewMessage(s.Net, CMDGetHeaders, payload)) } // requestBlocks will send a getdata message to the peer // to sync up in blocks. A maximum of maxBlockBatch will // send at once. -func (s *Server) requestBlocks(p Peer) { +func (s *Server) requestBlocks(p Peer) error { var ( hashes []util.Uint256 hashStart = s.chain.BlockHeight() + 1 @@ -301,10 +316,11 @@ func (s *Server) requestBlocks(p Peer) { } if len(hashes) > 0 { payload := payload.NewInventory(payload.BlockType, hashes) - p.WriteMsg(NewMessage(s.Net, CMDGetData, payload)) + return p.WriteMsg(NewMessage(s.Net, CMDGetData, payload)) } else if s.chain.HeaderHeight() < p.Version().StartHeight { - s.requestHeaders(p) + return s.requestHeaders(p) } + return nil } // handleMessage will process the given message. diff --git a/pkg/network/tcp_peer.go b/pkg/network/tcp_peer.go index 2ebc12078..10dac0a82 100644 --- a/pkg/network/tcp_peer.go +++ b/pkg/network/tcp_peer.go @@ -136,7 +136,12 @@ func (p *TCPPeer) Done() chan error { // Disconnect will fill the peer's done channel with the given error. 
func (p *TCPPeer) Disconnect(err error) { p.conn.Close() - p.done <- err + select { + case p.done <- err: + // one message to the queue + default: + // the other side may already be gone, it's OK + } } // Version implements the Peer interface. diff --git a/pkg/network/tcp_transport.go b/pkg/network/tcp_transport.go index 82ea5f351..8f61b7607 100644 --- a/pkg/network/tcp_transport.go +++ b/pkg/network/tcp_transport.go @@ -75,21 +75,19 @@ func (t *TCPTransport) handleConn(conn net.Conn) { err error ) - defer func() { - p.Disconnect(err) - }() - t.server.register <- p for { msg := &Message{} if err = msg.Decode(p.conn); err != nil { - return + break } if err = t.server.handleMessage(p, msg); err != nil { - return + break } } + t.server.unregister <- peerDrop{p, err} + p.Disconnect(err) } // Close implements the Transporter interface.