neo-go/pkg/network/tcp_transport.go
Roman Khimov 734338ad70 network: move Version sending to the tcp transport
We have a race between the reader and writer goroutines for the same connection
that leads to handshake failures when the reader is faster to read the incoming
Version (and tries to reply to it) than the writer is to write our own Version:

WARN[0000] peer disconnected                             addr="172.200.0.4:20334" peerCount=5 reason="invalid handshake: tried to send VersionAck, but didn't send Version yet

Fix it by moving Version sending before the reader loop starts.
2019-11-29 11:05:42 +03:00

109 lines
2.1 KiB
Go

package network
import (
"net"
"regexp"
"time"
"github.com/CityOfZion/neo-go/pkg/io"
log "github.com/sirupsen/logrus"
)
// TCPTransport is the TCP-based transport used for talking to peers.
type TCPTransport struct {
	// server is handed every new peer, every decoded message and every
	// peer drop produced by this transport.
	server *Server

	// listener is the listening socket; it is assigned by Accept and
	// closed by Close.
	listener net.Listener

	// bindAddr is the address Accept passes to net.Listen.
	bindAddr string
}
// reClosedNetwork matches error text that contains
// " use of closed network connection"; isCloseError relies on it to
// recognise errors caused by a closed listener.
var (
	reClosedNetwork = regexp.MustCompile(".* use of closed network connection")
)
// NewTCPTransport builds a TCPTransport attached to the server s that will
// listen on bindAddr for incoming peer connections. The listener itself is
// not opened here; Accept does that.
func NewTCPTransport(s *Server, bindAddr string) *TCPTransport {
	transport := new(TCPTransport)
	transport.server = s
	transport.bindAddr = bindAddr
	return transport
}
// Dial implements the Transporter interface.
//
// It opens a TCP connection to addr, waiting at most timeout. When dialing
// fails, the error is returned; otherwise the connection is served by
// handleConn in a new goroutine and nil is returned.
func (t *TCPTransport) Dial(addr string, timeout time.Duration) error {
	c, dialErr := net.DialTimeout("tcp", addr, timeout)
	if dialErr == nil {
		go t.handleConn(c)
	}
	return dialErr
}
// Accept implements the Transporter interface.
//
// It opens a TCP listener on t.bindAddr, stores it in t.listener and then
// accepts connections in a loop, serving each one with handleConn in its own
// goroutine. A listen failure is reported through log.Fatalf. The loop ends
// when an accept error is recognised by isCloseError (the listener has been
// closed); any other accept error is logged as a warning and accepting
// continues.
func (t *TCPTransport) Accept() {
	l, err := net.Listen("tcp", t.bindAddr)
	if err != nil {
		log.Fatalf("TCP listen error %s", err)
		// Fatalf is expected to terminate the process; the return keeps a
		// nil listener from being used should it ever not do so.
		return
	}

	t.listener = l

	for {
		conn, err := l.Accept()
		if err != nil {
			// A closed listener is the normal way to stop accepting, so
			// leave quietly rather than reporting it as a warning.
			if t.isCloseError(err) {
				break
			}
			log.Warnf("TCP accept error: %s", err)
			continue
		}
		go t.handleConn(conn)
	}
}
// isCloseError reports whether err is a *net.OpError whose message matches
// reClosedNetwork, i.e. it mentions use of a closed network connection.
// Errors of any other type yield false.
func (t *TCPTransport) isCloseError(err error) bool {
	opErr, isOpErr := err.(*net.OpError)
	if !isOpErr {
		return false
	}
	return reClosedNetwork.MatchString(opErr.Error())
}
// handleConn drives a single peer connection from start to finish.
//
// It wraps conn in a TCP peer, registers the peer with the server and sends
// our Version before anything is read from the connection. If the Version
// was sent, messages are decoded from the connection and handed to the
// server until decoding or handling fails. In every case the peer is then
// unregistered and disconnected with the error that ended the session.
func (t *TCPTransport) handleConn(conn net.Conn) {
	var (
		p   = NewTCPPeer(conn)
		err error
	)

	t.server.register <- p

	// When a new peer is connected we send out our version immediately.
	// The result is stored in the function-level err (plain assignment, not
	// :=) so that a failed send becomes the drop reason below, and the read
	// loop is skipped because the handshake cannot complete without it.
	if err = t.server.sendVersion(p); err != nil {
		log.WithFields(log.Fields{
			"addr": p.RemoteAddr(),
		}).Error(err)
	} else {
		r := io.NewBinReaderFromIO(p.conn)
		for {
			msg := &Message{}
			if err = msg.Decode(r); err != nil {
				break
			}
			if err = t.server.handleMessage(p, msg); err != nil {
				break
			}
		}
	}

	t.server.unregister <- peerDrop{p, err}
	p.Disconnect(err)
}
// Close implements the Transporter interface.
//
// It closes the listener stored by Accept. If no listener has been stored
// yet (Accept has not run, or listening failed), Close does nothing instead
// of dereferencing a nil listener.
//
// NOTE(review): t.listener is written by Accept without synchronisation;
// confirm that callers do not invoke Close concurrently with Accept start-up.
func (t *TCPTransport) Close() {
	if t.listener == nil {
		return
	}
	// Close has no return value to carry the error, so it is dropped
	// explicitly.
	_ = t.listener.Close()
}
// Proto implements the Transporter interface; the protocol name reported by
// this transport is always "tcp".
func (t *TCPTransport) Proto() string {
	const proto = "tcp"
	return proto
}