tweaked TCP transport + finished version + verack.

This commit is contained in:
anthdm 2018-02-01 14:53:49 +01:00
parent 63072ebe75
commit b416a51db7
2 changed files with 50 additions and 38 deletions

View file

@ -9,7 +9,6 @@ import (
type Peer interface {
id() uint32
addr() util.Endpoint
verack() bool
disconnect()
callVersion(*Message)
callGetaddr(*Message)
@ -20,7 +19,6 @@ type Peer interface {
type LocalPeer struct {
s *Server
nonce uint32
isVerack bool
endpoint util.Endpoint
}
@ -39,6 +37,5 @@ func (p *LocalPeer) callGetaddr(msg *Message) {
}
func (p *LocalPeer) id() uint32 { return p.nonce }
func (p *LocalPeer) verack() bool { return p.isVerack }
func (p *LocalPeer) addr() util.Endpoint { return p.endpoint }
func (p *LocalPeer) disconnect() {}

View file

@ -22,6 +22,7 @@ func listenTCP(s *Server, port int) error {
if err != nil {
return err
}
go handleConnection(s, conn)
}
}
@ -54,8 +55,10 @@ func handleConnection(s *Server, conn net.Conn) {
s.unregister <- peer
}()
// Start a goroutine that will handle all writes to the registered peer.
// Start a goroutine that will handle all outgoing messages.
go peer.writeLoop()
// Start a goroutine that will handle all incomming messages.
go handleMessage(s, peer)
// Read from the connection and decode it into a Message ready for processing.
buf := make([]byte, 1024)
@ -71,12 +74,20 @@ func handleConnection(s *Server, conn net.Conn) {
s.logger.Printf("decode error %s", err)
break
}
handleMessage(msg, s, peer)
peer.receive <- msg
}
}
// handleMessage hands the message received from a TCP connection over to the server.
func handleMessage(msg *Message, s *Server, p *TCPPeer) {
func handleMessage(s *Server, p *TCPPeer) {
// Disconnect the peer when we break out of the loop.
defer func() {
p.disconnect()
}()
for {
msg := <-p.receive
command := msg.commandType()
s.logger.Printf("IN :: %d :: %s :: %v", p.id(), command, msg)
@ -84,9 +95,16 @@ func handleMessage(msg *Message, s *Server, p *TCPPeer) {
switch command {
case cmdVersion:
resp := s.handleVersionCmd(msg, p)
p.isVerack = true
p.nonce = msg.Payload.(*payload.Version).Nonce
p.send <- resp
// after sending our version we want a "verack" and nothing else.
msg := <-p.receive
if msg.commandType() != cmdVerack {
break
}
// we can start the protocol now.
go s.sendLoop(p)
case cmdAddr:
s.handleAddrCmd(msg, p)
case cmdGetAddr:
@ -98,12 +116,13 @@ func handleMessage(msg *Message, s *Server, p *TCPPeer) {
case cmdConsensus:
case cmdTX:
case cmdVerack:
go s.sendLoop(p)
// disconnect the peer, verack should already be handled.
break
case cmdGetHeaders:
case cmdGetBlocks:
case cmdGetData:
case cmdHeaders:
default:
}
}
}
@ -118,8 +137,8 @@ type TCPPeer struct {
endpoint util.Endpoint
// channel to coordinate messages writen back to the connection.
send chan *Message
// whether this peers version was acknowledged.
isVerack bool
// channel to receive from underlying connection.
receive chan *Message
}
// NewTCPPeer returns a pointer to a TCP Peer.
@ -129,6 +148,7 @@ func NewTCPPeer(conn net.Conn, s *Server) *TCPPeer {
return &TCPPeer{
conn: conn,
send: make(chan *Message),
receive: make(chan *Message),
endpoint: e,
s: s,
}
@ -148,11 +168,6 @@ func (p *TCPPeer) addr() util.Endpoint {
return p.endpoint
}
// verack implements the peer interface
func (p *TCPPeer) verack() bool {
return p.isVerack
}
// callGetaddr will send the "getaddr" command to the remote.
func (p *TCPPeer) callGetaddr(msg *Message) {
p.send <- msg