diff --git a/pkg/network/server.go b/pkg/network/server.go index 6afd7c861..b2817a34d 100644 --- a/pkg/network/server.go +++ b/pkg/network/server.go @@ -153,13 +153,17 @@ func (s *Server) run() { "addr": p.NetAddr(), }).Info("new peer connected") case drop := <-s.unregister: - delete(s.peers, drop.peer) - log.WithFields(log.Fields{ - "addr": drop.peer.NetAddr(), - "reason": drop.reason, - "peerCount": s.PeerCount(), - }).Warn("peer disconnected") - s.discovery.BackFill(drop.peer.NetAddr().String()) + if s.peers[drop.peer] { + delete(s.peers, drop.peer) + log.WithFields(log.Fields{ + "addr": drop.peer.NetAddr(), + "reason": drop.reason, + "peerCount": s.PeerCount(), + }).Warn("peer disconnected") + s.discovery.BackFill(drop.peer.NetAddr().String()) + } + // else the peer is already gone, which can happen + // because we have two goroutines sending signals here } } } @@ -187,22 +191,33 @@ func (s *Server) startProtocol(p Peer) { "id": p.Version().Nonce, }).Info("started protocol") - s.requestHeaders(p) + err := s.requestHeaders(p) + if err != nil { + p.Disconnect(err) + return + } timer := time.NewTimer(s.ProtoTickInterval) for { select { - case err := <-p.Done(): - s.unregister <- peerDrop{p, err} - return + case err = <-p.Done(): + // time to stop case m := <-s.addrReq: - p.WriteMsg(m) + err = p.WriteMsg(m) case <-timer.C: // Try to sync in headers and block with the peer if his block height is higher then ours. if p.Version().StartHeight > s.chain.BlockHeight() { - s.requestBlocks(p) + err = s.requestBlocks(p) } - timer.Reset(s.ProtoTickInterval) + if err == nil { + timer.Reset(s.ProtoTickInterval) + } + } + if err != nil { + s.unregister <- peerDrop{p, err} + timer.Stop() + p.Disconnect(err) + return } } } @@ -279,16 +294,16 @@ func (s *Server) handleAddrCmd(p Peer, addrs *payload.AddressList) error { // requestHeaders will send a getheaders message to the peer. // The peer will respond with headers op to a count of 2000. -func (s *Server) requestHeaders(p Peer) { +func (s *Server) requestHeaders(p Peer) error { start := []util.Uint256{s.chain.CurrentHeaderHash()} payload := payload.NewGetBlocks(start, util.Uint256{}) - p.WriteMsg(NewMessage(s.Net, CMDGetHeaders, payload)) + return p.WriteMsg(NewMessage(s.Net, CMDGetHeaders, payload)) } // requestBlocks will send a getdata message to the peer // to sync up in blocks. A maximum of maxBlockBatch will // send at once. -func (s *Server) requestBlocks(p Peer) { +func (s *Server) requestBlocks(p Peer) error { var ( hashes []util.Uint256 hashStart = s.chain.BlockHeight() + 1 @@ -301,10 +316,11 @@ func (s *Server) requestBlocks(p Peer) { } if len(hashes) > 0 { payload := payload.NewInventory(payload.BlockType, hashes) - p.WriteMsg(NewMessage(s.Net, CMDGetData, payload)) + return p.WriteMsg(NewMessage(s.Net, CMDGetData, payload)) } else if s.chain.HeaderHeight() < p.Version().StartHeight { - s.requestHeaders(p) + return s.requestHeaders(p) } + return nil } // handleMessage will process the given message. diff --git a/pkg/network/tcp_peer.go b/pkg/network/tcp_peer.go index 2ebc12078..10dac0a82 100644 --- a/pkg/network/tcp_peer.go +++ b/pkg/network/tcp_peer.go @@ -136,7 +136,12 @@ func (p *TCPPeer) Done() chan error { // Disconnect will fill the peer's done channel with the given error. func (p *TCPPeer) Disconnect(err error) { p.conn.Close() - p.done <- err + select { + case p.done <- err: + // one message to the queue + default: + // the other side may already be gone, it's OK + } } // Version implements the Peer interface. diff --git a/pkg/network/tcp_transport.go b/pkg/network/tcp_transport.go index 82ea5f351..8f61b7607 100644 --- a/pkg/network/tcp_transport.go +++ b/pkg/network/tcp_transport.go @@ -75,21 +75,19 @@ func (t *TCPTransport) handleConn(conn net.Conn) { err error ) - defer func() { - p.Disconnect(err) - }() - t.server.register <- p for { msg := &Message{} if err = msg.Decode(p.conn); err != nil { - return + break } if err = t.server.handleMessage(p, msg); err != nil { - return + break } } + t.server.unregister <- peerDrop{p, err} + p.Disconnect(err) } // Close implements the Transporter interface.