From 9dc30bb9e80295959fa56f6a8e6a8eb46549b516 Mon Sep 17 00:00:00 2001 From: anthdm Date: Wed, 31 Jan 2018 14:32:57 +0100 Subject: [PATCH 01/11] Set the listener of the server when opened. --- pkg/network/tcp.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/pkg/network/tcp.go b/pkg/network/tcp.go index de92a7205..3a8db8501 100644 --- a/pkg/network/tcp.go +++ b/pkg/network/tcp.go @@ -11,6 +11,8 @@ func listenTCP(s *Server, port string) error { return err } + s.listener = ln + for { conn, err := ln.Accept() if err != nil { From 861882ff83bfd69e60e90d48c5fefce822ad7252 Mon Sep 17 00:00:00 2001 From: anthdm Date: Wed, 31 Jan 2018 20:11:08 +0100 Subject: [PATCH 02/11] refactor server RPC. --- pkg/network/peer.go | 95 +++++++------- pkg/network/server.go | 263 ++++++++++++++++++------------------- pkg/network/server_test.go | 44 ++++--- pkg/network/tcp.go | 62 ++++++--- 4 files changed, 252 insertions(+), 212 deletions(-) diff --git a/pkg/network/peer.go b/pkg/network/peer.go index 830844cad..af38fe142 100644 --- a/pkg/network/peer.go +++ b/pkg/network/peer.go @@ -1,8 +1,6 @@ package network import ( - "fmt" - "log" "net" "github.com/anthdm/neo-go/pkg/util" @@ -12,94 +10,94 @@ import ( // be backed by any concrete transport: local, HTTP, tcp. type Peer interface { id() uint32 - endpoint() util.Endpoint - send(*Message) + addr() util.Endpoint verack() bool - verify(uint32) disconnect() + callVersion(*Message) + callGetaddr(*Message) } -// LocalPeer is a peer without any transport, mainly used for testing. +// LocalPeer is the simplest kind of peer, mapped to a server in the +// same process-space. type LocalPeer struct { - _id uint32 - _verack bool - _endpoint util.Endpoint - _send chan *Message + s *Server + nonce uint32 + isVerack bool + endpoint util.Endpoint } // NewLocalPeer return a LocalPeer. -func NewLocalPeer() *LocalPeer { +func NewLocalPeer(s *Server) *LocalPeer { e, _ := util.EndpointFromString("1.1.1.1:1111") - return &LocalPeer{_endpoint: e} + return &LocalPeer{endpoint: e, s: s} } -func (p *LocalPeer) id() uint32 { return p._id } -func (p *LocalPeer) verack() bool { return p._verack } -func (p *LocalPeer) endpoint() util.Endpoint { return p._endpoint } -func (p *LocalPeer) disconnect() {} - -func (p *LocalPeer) send(msg *Message) { - p._send <- msg +func (p *LocalPeer) callVersion(msg *Message) { + p.s.handleVersionCmd(msg, p) } -func (p *LocalPeer) verify(id uint32) { - fmt.Println(id) - p._verack = true - p._id = id +func (p *LocalPeer) callGetaddr(msg *Message) { + p.s.handleGetaddrCmd(msg, p) } +func (p *LocalPeer) id() uint32 { return p.nonce } +func (p *LocalPeer) verack() bool { return p.isVerack } +func (p *LocalPeer) addr() util.Endpoint { return p.endpoint } +func (p *LocalPeer) disconnect() {} + // TCPPeer represents a remote node, backed by TCP transport. type TCPPeer struct { - _id uint32 + s *Server + // nonce (id) of the peer. + nonce uint32 // underlying TCP connection conn net.Conn // host and port information about this peer. - _endpoint util.Endpoint + endpoint util.Endpoint // channel to coordinate messages writen back to the connection. - _send chan *Message + send chan *Message // whether this peers version was acknowledged. - _verack bool + isVerack bool } // NewTCPPeer returns a pointer to a TCP Peer. -func NewTCPPeer(conn net.Conn) *TCPPeer { +func NewTCPPeer(conn net.Conn, s *Server) *TCPPeer { e, _ := util.EndpointFromString(conn.RemoteAddr().String()) return &TCPPeer{ - conn: conn, - _send: make(chan *Message), - _endpoint: e, + conn: conn, + send: make(chan *Message), + endpoint: e, + s: s, } } +func (p *TCPPeer) callVersion(msg *Message) { + p.send <- msg +} + // id implements the peer interface func (p *TCPPeer) id() uint32 { - return p._id + return p.nonce } // endpoint implements the peer interface -func (p *TCPPeer) endpoint() util.Endpoint { - return p._endpoint +func (p *TCPPeer) addr() util.Endpoint { + return p.endpoint } // verack implements the peer interface func (p *TCPPeer) verack() bool { - return p._verack + return p.isVerack } -// verify implements the peer interface -func (p *TCPPeer) verify(id uint32) { - p._id = id - p._verack = true -} - -// send implements the peer interface -func (p *TCPPeer) send(msg *Message) { - p._send <- msg +// callGetaddr will send the "getaddr" command to the remote. +func (p *TCPPeer) callGetaddr(msg *Message) { + p.send <- msg } func (p *TCPPeer) disconnect() { - close(p._send) + close(p.send) p.conn.Close() } @@ -114,12 +112,13 @@ func (p *TCPPeer) writeLoop() { }() for { - msg := <-p._send + msg := <-p.send - rpcLogger.Printf("[SERVER] :: OUT :: %s :: %+v", msg.commandType(), msg.Payload) + p.s.logger.Printf("OUT :: %s :: %+v", msg.commandType(), msg.Payload) + // should we disconnect here? if err := msg.encode(p.conn); err != nil { - log.Printf("encode error: %s", err) + p.s.logger.Printf("encode error: %s", err) } } } diff --git a/pkg/network/server.go b/pkg/network/server.go index 30a003bfb..36dd70fa9 100644 --- a/pkg/network/server.go +++ b/pkg/network/server.go @@ -1,7 +1,6 @@ package network import ( - "errors" "fmt" "log" "net" @@ -9,7 +8,6 @@ import ( "strconv" "time" - "github.com/anthdm/neo-go/pkg/core" "github.com/anthdm/neo-go/pkg/network/payload" "github.com/anthdm/neo-go/pkg/util" ) @@ -20,11 +18,7 @@ const ( // official ports according to the protocol. portMainNet = 10333 portTestNet = 20333 -) - -var ( - // rpcLogger used for debugging RPC messages between nodes. - rpcLogger = log.New(os.Stdout, "", 0) + maxPeers = 50 ) type messageTuple struct { @@ -35,13 +29,10 @@ type messageTuple struct { // Server is the representation of a full working NEO TCP node. type Server struct { logger *log.Logger - // id of the server id uint32 - // the port the TCP listener is listening on. port uint16 - // userAgent of the server. userAgent string // The "magic" mode the server is currently running on. @@ -50,26 +41,29 @@ type Server struct { net NetMode // map that holds all connected peers to this server. peers map[Peer]bool - - register chan Peer + // channel for handling new registerd peers. + register chan Peer + // channel for safely removing and disconnecting peers. unregister chan Peer - // channel for coordinating messages. message chan messageTuple - // channel used to gracefull shutdown the server. quit chan struct{} - // Whether this server will receive and forward messages. relay bool - // TCP listener of the server listener net.Listener + + // RPC channels + versionCh chan versionTuple + getaddrCh chan getaddrTuple + invCh chan invTuple + addrCh chan addrTuple } // NewServer returns a pointer to a new server. func NewServer(net NetMode) *Server { - logger := log.New(os.Stdout, "NEO SERVER :: ", 0) + logger := log.New(os.Stdout, "[NEO SERVER] :: ", 0) if net != ModeTestNet && net != ModeMainNet && net != ModeDevNet { logger.Fatalf("invalid network mode %d", net) @@ -83,9 +77,13 @@ func NewServer(net NetMode) *Server { register: make(chan Peer), unregister: make(chan Peer), message: make(chan messageTuple), - relay: true, + relay: true, // currently relay is not handled. net: net, quit: make(chan struct{}), + versionCh: make(chan versionTuple), + getaddrCh: make(chan getaddrTuple), + invCh: make(chan invTuple), + addrCh: make(chan addrTuple), } return s @@ -131,30 +129,62 @@ func (s *Server) shutdown() { func (s *Server) loop() { for { select { + // When a new connection is been established, (by this server or remote node) + // its peer will be received on this channel. + // Any peer registration must happen via this channel. case peer := <-s.register: - // When a new connection is been established, (by this server or remote node) - // its peer will be received on this channel. - // Any peer registration must happen via this channel. - s.logger.Printf("peer registered from address %s", peer.endpoint()) - s.peers[peer] = true - s.handlePeerConnected(peer) + if len(s.peers) < maxPeers { + s.logger.Printf("peer registered from address %s", peer.addr()) + s.peers[peer] = true + s.handlePeerConnected(peer) + } + // Unregister should take care of all the cleanup that has to be made. case peer := <-s.unregister: - // unregister should take care of all the cleanup that has to be made. if _, ok := s.peers[peer]; ok { peer.disconnect() delete(s.peers, peer) - s.logger.Printf("peer %s disconnected", peer.endpoint()) + s.logger.Printf("peer %s disconnected", peer.addr()) } - case tuple := <-s.message: - // When a remote node sends data over its connection it will be received - // on this channel. - // All errors encountered should be return and handled here. - if err := s.processMessage(tuple.msg, tuple.peer); err != nil { - s.logger.Fatalf("failed to process message: %s", err) - s.unregister <- tuple.peer + // Process the received version and respond with a verack. + case t := <-s.versionCh: + if s.id == t.request.Nonce { + t.peer.disconnect() } + if t.peer.addr().Port != t.request.Port { + t.peer.disconnect() + } + t.response <- newMessage(ModeDevNet, cmdVerack, nil) + + // Process the getaddr cmd. + case t := <-s.getaddrCh: + t.response <- &Message{} // just for now. + + // Process the addr cmd. Register peer will handle the maxPeers connected. + case t := <-s.addrCh: + for _, addr := range t.request.Addrs { + if !s.peerAlreadyConnected(addr.Addr) { + // TODO: this is not transport abstracted. + go connectToRemoteNode(s, addr.Addr.String()) + } + } + t.response <- true + + // Process inventories cmd. + case t := <-s.invCh: + if !t.request.Type.Valid() { + t.peer.disconnect() + break + } + if len(t.request.Hashes) == 0 { + t.peer.disconnect() + break + } + + payload := payload.NewInventory(t.request.Type, t.request.Hashes) + msg := newMessage(s.net, cmdGetData, payload) + t.response <- msg case <-s.quit: s.shutdown() @@ -162,135 +192,100 @@ func (s *Server) loop() { } } -// processMessage processes the message received from the peer. -func (s *Server) processMessage(msg *Message, peer Peer) error { - command := msg.commandType() - - rpcLogger.Printf("[NODE %d] :: IN :: %s :: %+v", peer.id(), command, msg.Payload) - - // Disconnect if the remote is sending messages other then version - // if we didn't verack this peer. - if !peer.verack() && command != cmdVersion { - return errors.New("version noack") - } - - switch command { - case cmdVersion: - return s.handleVersionCmd(msg.Payload.(*payload.Version), peer) - case cmdVerack: - case cmdGetAddr: - // return s.handleGetAddrCmd(msg, peer) - case cmdAddr: - return s.handleAddrCmd(msg.Payload.(*payload.AddressList), peer) - case cmdGetHeaders: - case cmdHeaders: - case cmdGetBlocks: - case cmdInv: - return s.handleInvCmd(msg.Payload.(*payload.Inventory), peer) - case cmdGetData: - case cmdBlock: - return s.handleBlockCmd(msg.Payload.(*core.Block), peer) - case cmdTX: - case cmdConsensus: - default: - return fmt.Errorf("invalid RPC command received: %s", command) - } - - return nil -} - // When a new peer is connected we send our version. // No further communication should be made before both sides has received // the versions of eachother. -func (s *Server) handlePeerConnected(peer Peer) { - // TODO get heigth of block when thats implemented. +func (s *Server) handlePeerConnected(p Peer) { + // TODO: get the blockheight of this server once core implemented this. payload := payload.NewVersion(s.id, s.port, s.userAgent, 0, s.relay) msg := newMessage(s.net, cmdVersion, payload) - - peer.send(msg) + p.callVersion(msg) } -// Version declares the server's version. -func (s *Server) handleVersionCmd(v *payload.Version, peer Peer) error { - if s.id == v.Nonce { - return errors.New("remote nonce equal to server id") - } - - if peer.endpoint().Port != v.Port { - return errors.New("port mismatch") - } - - // we respond with a verack, we successfully received peer's version - // at this point. - peer.verify(v.Nonce) - verackMsg := newMessage(s.net, cmdVerack, nil) - peer.send(verackMsg) - - go s.sendLoop(peer) - - return nil +type versionTuple struct { + peer Peer + request *payload.Version + response chan *Message } -// When the remote node reveals its known peers we try to connect to all of them. -func (s *Server) handleAddrCmd(addrList *payload.AddressList, peer Peer) error { - for _, addr := range addrList.Addrs { - if !s.peerAlreadyConnected(addr.Addr) { - go connectToRemoteNode(s, addr.Addr.String()) - } - } - return nil -} - -func (s *Server) handleInvCmd(inv *payload.Inventory, peer Peer) error { - if !inv.Type.Valid() { - return fmt.Errorf("invalid inventory type: %s", inv.Type) - } - if len(inv.Hashes) == 0 { - return nil +func (s *Server) handleVersionCmd(msg *Message, p Peer) *Message { + t := versionTuple{ + peer: p, + request: msg.Payload.(*payload.Version), + response: make(chan *Message), } - payload := payload.NewInventory(inv.Type, inv.Hashes) - msg := newMessage(s.net, cmdGetData, payload) + s.versionCh <- t - peer.send(msg) - - return nil + return <-t.response } -func (s *Server) handleBlockCmd(block *core.Block, peer Peer) error { - fmt.Println("Block received") - fmt.Printf("%+v\n", block) - return nil +type getaddrTuple struct { + peer Peer + request *Message + response chan *Message } +func (s *Server) handleGetaddrCmd(msg *Message, p Peer) *Message { + t := getaddrTuple{ + peer: p, + request: msg, + response: make(chan *Message), + } + + s.getaddrCh <- t + + return <-t.response +} + +type invTuple struct { + peer Peer + request *payload.Inventory + response chan *Message +} + +func (s *Server) handleInvCmd(msg *Message, p Peer) *Message { + t := invTuple{ + request: msg.Payload.(*payload.Inventory), + response: make(chan *Message), + } + + s.invCh <- t + + return <-t.response +} + +type addrTuple struct { + request *payload.AddressList + response chan bool +} + +func (s *Server) handleAddrCmd(msg *Message, p Peer) bool { + t := addrTuple{ + request: msg.Payload.(*payload.AddressList), + response: make(chan bool), + } + + s.addrCh <- t + + return <-t.response +} + +// check if the addr is already connected to the server. func (s *Server) peerAlreadyConnected(addr net.Addr) bool { - // TODO: check for race conditions - //s.mtx.RLock() - //defer s.mtx.RUnlock() - - // What about ourself ^^ - for peer := range s.peers { - if peer.endpoint().String() == addr.String() { + if peer.addr().String() == addr.String() { return true } } return false } -// After receiving the "getaddr" the server needs to respond with an "addr" message. -// providing information about the other nodes in the network. -// e.g. this server's connected peers. -func (s *Server) handleGetAddrCmd(msg *Message, peer *Peer) error { - // TODO - return nil -} - func (s *Server) sendLoop(peer Peer) { // TODO: check if this peer is still connected. for { getaddrMsg := newMessage(s.net, cmdGetAddr, nil) - peer.send(getaddrMsg) + peer.callGetaddr(getaddrMsg) time.Sleep(120 * time.Second) } diff --git a/pkg/network/server_test.go b/pkg/network/server_test.go index 5445c3168..ad07d93e1 100644 --- a/pkg/network/server_test.go +++ b/pkg/network/server_test.go @@ -2,25 +2,39 @@ package network import ( "testing" + + "github.com/anthdm/neo-go/pkg/network/payload" ) func TestHandleVersion(t *testing.T) { - // s := NewServer(ModeDevNet) - // go s.Start(":3000", nil) + s := NewServer(ModeDevNet) + go s.loop() - // p := NewLocalPeer() - // s.register <- p + p := NewLocalPeer(s) - // version := payload.NewVersion(1337, p.endpoint().Port, "/NEO:0.0.0/.", 0, true) - // s.handleVersionCmd(version, p) + version := payload.NewVersion(1337, p.addr().Port, "/NEO:0.0.0/", 0, true) + msg := newMessage(ModeDevNet, cmdVersion, version) - // if len(s.peers) != 1 { - // t.Fatalf("expecting the server to have %d peers got %d", 1, len(s.peers)) - // } - // if p.id() != 1337 { - // t.Fatalf("expecting peer's id to be %d got %d", 1337, p._id) - // } - // if !p.verack() { - // t.Fatal("expecting peer to be verified") - // } + resp := s.handleVersionCmd(msg, p) + if resp.commandType() != cmdVerack { + t.Fatalf("expected response message to be verack got %s", resp.commandType()) + } + if resp.Payload != nil { + t.Fatal("verack payload should be nil") + } +} + +func TestHandleAddrCmd(t *testing.T) { + // todo +} + +func TestHandleGetAddrCmd(t *testing.T) { + // todo +} + +func TestHandleInv(t *testing.T) { + // todo +} +func TestHandleBlockCmd(t *testing.T) { + // todo } diff --git a/pkg/network/tcp.go b/pkg/network/tcp.go index 3a8db8501..f9ffc39e0 100644 --- a/pkg/network/tcp.go +++ b/pkg/network/tcp.go @@ -1,8 +1,10 @@ package network import ( - "io" + "bytes" "net" + + "github.com/anthdm/neo-go/pkg/network/payload" ) func listenTCP(s *Server, port string) error { @@ -31,7 +33,6 @@ func connectToRemoteNode(s *Server, address string) { } return } - s.logger.Printf("connected to %s", conn.RemoteAddr()) go handleConnection(s, conn) } @@ -42,7 +43,7 @@ func connectToSeeds(s *Server, addrs []string) { } func handleConnection(s *Server, conn net.Conn) { - peer := NewTCPPeer(conn) + peer := NewTCPPeer(conn, s) s.register <- peer // remove the peer from connected peers and cleanup the connection. @@ -54,20 +55,51 @@ func handleConnection(s *Server, conn net.Conn) { // Start a goroutine that will handle all writes to the registered peer. go peer.writeLoop() - // Read from the connection and decode it into an RPCMessage and - // tell the server there is message available for proccesing. + // Read from the connection and decode it into a Message ready for processing. + buf := make([]byte, 1024) for { - msg := &Message{} - if err := msg.decode(conn); err != nil { - // remote connection probably closed. - if err == io.EOF { - s.logger.Printf("conn read error: %s", err) - break - } - // remove this node on any decode errors. - s.logger.Printf("RPC :: decode error %s", err) + _, err := conn.Read(buf) + if err != nil { + s.logger.Printf("conn read error: %s", err) break } - s.message <- messageTuple{peer, msg} + + msg := &Message{} + if err := msg.decode(bytes.NewReader(buf)); err != nil { + s.logger.Printf("decode error %s", err) + break + } + handleMessage(msg, s, peer) + } +} + +func handleMessage(msg *Message, s *Server, p *TCPPeer) { + command := msg.commandType() + + s.logger.Printf("%d :: IN :: %s :: %v", p.id(), command, msg) + + switch command { + case cmdVersion: + resp := s.handleVersionCmd(msg, p) + p.isVerack = true + p.nonce = msg.Payload.(*payload.Version).Nonce + p.send <- resp + case cmdAddr: + s.handleAddrCmd(msg, p) + case cmdGetAddr: + s.handleGetaddrCmd(msg, p) + case cmdInv: + resp := s.handleInvCmd(msg, p) + p.send <- resp + case cmdBlock: + case cmdConsensus: + case cmdTX: + case cmdVerack: + go s.sendLoop(p) + case cmdGetHeaders: + case cmdGetBlocks: + case cmdGetData: + case cmdHeaders: + default: } } From 626a82b93ec066ed9e679d221fbf79d3a1731bbf Mon Sep 17 00:00:00 2001 From: anthdm Date: Wed, 31 Jan 2018 22:14:13 +0100 Subject: [PATCH 03/11] deleted proxy functions + moved TCPPeer to tcp file --- pkg/network/peer.go | 80 -------------------------- pkg/network/server.go | 131 +++++++++--------------------------------- pkg/network/tcp.go | 82 +++++++++++++++++++++++++- 3 files changed, 109 insertions(+), 184 deletions(-) diff --git a/pkg/network/peer.go b/pkg/network/peer.go index af38fe142..025f4b7d8 100644 --- a/pkg/network/peer.go +++ b/pkg/network/peer.go @@ -1,8 +1,6 @@ package network import ( - "net" - "github.com/anthdm/neo-go/pkg/util" ) @@ -44,81 +42,3 @@ func (p *LocalPeer) id() uint32 { return p.nonce } func (p *LocalPeer) verack() bool { return p.isVerack } func (p *LocalPeer) addr() util.Endpoint { return p.endpoint } func (p *LocalPeer) disconnect() {} - -// TCPPeer represents a remote node, backed by TCP transport. -type TCPPeer struct { - s *Server - // nonce (id) of the peer. - nonce uint32 - // underlying TCP connection - conn net.Conn - // host and port information about this peer. - endpoint util.Endpoint - // channel to coordinate messages writen back to the connection. - send chan *Message - // whether this peers version was acknowledged. - isVerack bool -} - -// NewTCPPeer returns a pointer to a TCP Peer. -func NewTCPPeer(conn net.Conn, s *Server) *TCPPeer { - e, _ := util.EndpointFromString(conn.RemoteAddr().String()) - - return &TCPPeer{ - conn: conn, - send: make(chan *Message), - endpoint: e, - s: s, - } -} - -func (p *TCPPeer) callVersion(msg *Message) { - p.send <- msg -} - -// id implements the peer interface -func (p *TCPPeer) id() uint32 { - return p.nonce -} - -// endpoint implements the peer interface -func (p *TCPPeer) addr() util.Endpoint { - return p.endpoint -} - -// verack implements the peer interface -func (p *TCPPeer) verack() bool { - return p.isVerack -} - -// callGetaddr will send the "getaddr" command to the remote. -func (p *TCPPeer) callGetaddr(msg *Message) { - p.send <- msg -} - -func (p *TCPPeer) disconnect() { - close(p.send) - p.conn.Close() -} - -// writeLoop writes messages to the underlying TCP connection. -// A goroutine writeLoop is started for each connection. -// There should be at most one writer to a connection executing -// all writes from this goroutine. -func (p *TCPPeer) writeLoop() { - // clean up the connection. - defer func() { - p.conn.Close() - }() - - for { - msg := <-p.send - - p.s.logger.Printf("OUT :: %s :: %+v", msg.commandType(), msg.Payload) - - // should we disconnect here? - if err := msg.encode(p.conn); err != nil { - p.s.logger.Printf("encode error: %s", err) - } - } -} diff --git a/pkg/network/server.go b/pkg/network/server.go index 36dd70fa9..7e73c57d7 100644 --- a/pkg/network/server.go +++ b/pkg/network/server.go @@ -53,12 +53,6 @@ type Server struct { relay bool // TCP listener of the server listener net.Listener - - // RPC channels - versionCh chan versionTuple - getaddrCh chan getaddrTuple - invCh chan invTuple - addrCh chan addrTuple } // NewServer returns a pointer to a new server. @@ -80,10 +74,6 @@ func NewServer(net NetMode) *Server { relay: true, // currently relay is not handled. net: net, quit: make(chan struct{}), - versionCh: make(chan versionTuple), - getaddrCh: make(chan getaddrTuple), - invCh: make(chan invTuple), - addrCh: make(chan addrTuple), } return s @@ -147,45 +137,6 @@ func (s *Server) loop() { s.logger.Printf("peer %s disconnected", peer.addr()) } - // Process the received version and respond with a verack. - case t := <-s.versionCh: - if s.id == t.request.Nonce { - t.peer.disconnect() - } - if t.peer.addr().Port != t.request.Port { - t.peer.disconnect() - } - t.response <- newMessage(ModeDevNet, cmdVerack, nil) - - // Process the getaddr cmd. - case t := <-s.getaddrCh: - t.response <- &Message{} // just for now. - - // Process the addr cmd. Register peer will handle the maxPeers connected. - case t := <-s.addrCh: - for _, addr := range t.request.Addrs { - if !s.peerAlreadyConnected(addr.Addr) { - // TODO: this is not transport abstracted. - go connectToRemoteNode(s, addr.Addr.String()) - } - } - t.response <- true - - // Process inventories cmd. - case t := <-s.invCh: - if !t.request.Type.Valid() { - t.peer.disconnect() - break - } - if len(t.request.Hashes) == 0 { - t.peer.disconnect() - break - } - - payload := payload.NewInventory(t.request.Type, t.request.Hashes) - msg := newMessage(s.net, cmdGetData, payload) - t.response <- msg - case <-s.quit: s.shutdown() } @@ -202,73 +153,47 @@ func (s *Server) handlePeerConnected(p Peer) { p.callVersion(msg) } -type versionTuple struct { - peer Peer - request *payload.Version - response chan *Message -} - func (s *Server) handleVersionCmd(msg *Message, p Peer) *Message { - t := versionTuple{ - peer: p, - request: msg.Payload.(*payload.Version), - response: make(chan *Message), + version := msg.Payload.(*payload.Version) + if s.id == version.Nonce { + p.disconnect() + return nil } - - s.versionCh <- t - - return <-t.response -} - -type getaddrTuple struct { - peer Peer - request *Message - response chan *Message + if p.addr().Port != version.Port { + p.disconnect() + return nil + } + return newMessage(ModeDevNet, cmdVerack, nil) } func (s *Server) handleGetaddrCmd(msg *Message, p Peer) *Message { - t := getaddrTuple{ - peer: p, - request: msg, - response: make(chan *Message), - } - - s.getaddrCh <- t - - return <-t.response -} - -type invTuple struct { - peer Peer - request *payload.Inventory - response chan *Message + return nil } func (s *Server) handleInvCmd(msg *Message, p Peer) *Message { - t := invTuple{ - request: msg.Payload.(*payload.Inventory), - response: make(chan *Message), + inv := msg.Payload.(*payload.Inventory) + if !inv.Type.Valid() { + p.disconnect() + return nil + } + if len(inv.Hashes) == 0 { + p.disconnect() + return nil } - s.invCh <- t - - return <-t.response + payload := payload.NewInventory(inv.Type, inv.Hashes) + resp := newMessage(s.net, cmdGetData, payload) + return resp } -type addrTuple struct { - request *payload.AddressList - response chan bool -} - -func (s *Server) handleAddrCmd(msg *Message, p Peer) bool { - t := addrTuple{ - request: msg.Payload.(*payload.AddressList), - response: make(chan bool), +func (s *Server) handleAddrCmd(msg *Message, p Peer) { + addrList := msg.Payload.(*payload.AddressList) + for _, addr := range addrList.Addrs { + if !s.peerAlreadyConnected(addr.Addr) { + // TODO: this is not transport abstracted. + go connectToRemoteNode(s, addr.Addr.String()) + } } - - s.addrCh <- t - - return <-t.response } // check if the addr is already connected to the server. diff --git a/pkg/network/tcp.go b/pkg/network/tcp.go index f9ffc39e0..e8421265e 100644 --- a/pkg/network/tcp.go +++ b/pkg/network/tcp.go @@ -5,6 +5,7 @@ import ( "net" "github.com/anthdm/neo-go/pkg/network/payload" + "github.com/anthdm/neo-go/pkg/util" ) func listenTCP(s *Server, port string) error { @@ -76,7 +77,7 @@ func handleConnection(s *Server, conn net.Conn) { func handleMessage(msg *Message, s *Server, p *TCPPeer) { command := msg.commandType() - s.logger.Printf("%d :: IN :: %s :: %v", p.id(), command, msg) + s.logger.Printf("IN :: %d :: %s :: %v", p.id(), command, msg) switch command { case cmdVersion: @@ -103,3 +104,82 @@ func handleMessage(msg *Message, s *Server, p *TCPPeer) { default: } } + +// TCPPeer represents a remote node, backed by TCP transport. +type TCPPeer struct { + s *Server + // nonce (id) of the peer. + nonce uint32 + // underlying TCP connection + conn net.Conn + // host and port information about this peer. + endpoint util.Endpoint + // channel to coordinate messages writen back to the connection. + send chan *Message + // whether this peers version was acknowledged. + isVerack bool +} + +// NewTCPPeer returns a pointer to a TCP Peer. +func NewTCPPeer(conn net.Conn, s *Server) *TCPPeer { + e, _ := util.EndpointFromString(conn.RemoteAddr().String()) + + return &TCPPeer{ + conn: conn, + send: make(chan *Message), + endpoint: e, + s: s, + } +} + +func (p *TCPPeer) callVersion(msg *Message) { + p.send <- msg +} + +// id implements the peer interface +func (p *TCPPeer) id() uint32 { + return p.nonce +} + +// endpoint implements the peer interface +func (p *TCPPeer) addr() util.Endpoint { + return p.endpoint +} + +// verack implements the peer interface +func (p *TCPPeer) verack() bool { + return p.isVerack +} + +// callGetaddr will send the "getaddr" command to the remote. +func (p *TCPPeer) callGetaddr(msg *Message) { + p.send <- msg +} + +// disconnect closes the send channel and the underlying connection. +func (p *TCPPeer) disconnect() { + close(p.send) + p.conn.Close() +} + +// writeLoop writes messages to the underlying TCP connection. +// A goroutine writeLoop is started for each connection. +// There should be at most one writer to a connection executing +// all writes from this goroutine. +func (p *TCPPeer) writeLoop() { + // clean up the connection. + defer func() { + p.conn.Close() + }() + + for { + msg := <-p.send + + p.s.logger.Printf("OUT :: %s :: %+v", msg.commandType(), msg.Payload) + + // should we disconnect here? + if err := msg.encode(p.conn); err != nil { + p.s.logger.Printf("encode error: %s", err) + } + } +} From 572bd813cd163ef8bc196006cff6c1f72b620d8d Mon Sep 17 00:00:00 2001 From: anthdm Date: Thu, 1 Feb 2018 08:18:38 +0100 Subject: [PATCH 04/11] implemented the start of JSON-RPC --- pkg/network/rpc.go | 129 +++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 129 insertions(+) create mode 100644 pkg/network/rpc.go diff --git a/pkg/network/rpc.go b/pkg/network/rpc.go new file mode 100644 index 000000000..c1156fa65 --- /dev/null +++ b/pkg/network/rpc.go @@ -0,0 +1,129 @@ +package network + +import ( + "encoding/json" + "fmt" + "net/http" +) + +const ( + rpcPortMainNet = 20332 + rpcPortTestNet = 10332 + rpcVersion = "2.0" + + // error response messages + methodNotFound = "Method not found" + parseError = "Parse error" +) + +// Each NEO node has a set of optional APIs for accessing blockchain +// data and making things easier for development of blockchain apps. +// APIs are provided via JSON-RPC , comm at bottom layer is with http/https protocol. + +// listenHTTP creates an ingress bridge from the outside world to the passed +// server, by installing handlers for all the necessary RPCs to the passed mux. +func listenHTTP(s *Server, port int) { + api := &API{s} + p := fmt.Sprintf(":%d", port) + s.logger.Printf("serving RPC on %d", port) + s.logger.Printf("%s", http.ListenAndServe(p, api)) +} + +// API serves JSON-RPC. +type API struct { + s *Server +} + +func (s *API) ServeHTTP(w http.ResponseWriter, r *http.Request) { + // Official nodes respond a parse error if the method is not POST. + // Instead of returning a decent response for this, let's do the same. + if r.Method != "POST" { + writeError(w, 0, 0, parseError) + } + + var req Request + if err := json.NewDecoder(r.Body).Decode(&req); err != nil { + writeError(w, 0, 0, parseError) + return + } + defer r.Body.Close() + + if req.Version != rpcVersion { + writeJSON(w, http.StatusBadRequest, nil) + return + } + + switch req.Method { + case "getconnectioncount": + if err := s.getConnectionCount(w, &req); err != nil { + writeError(w, 0, 0, parseError) + return + } + case "getblockcount": + case "getbestblockhash": + default: + writeError(w, 0, 0, methodNotFound) + } +} + +// This is an Example on how we could handle incomming RPC requests. +func (s *API) getConnectionCount(w http.ResponseWriter, req *Request) error { + count := s.s.peerCount() + + resp := ConnectionCountResponse{ + Version: rpcVersion, + Result: count, + ID: 1, + } + + return writeJSON(w, http.StatusOK, resp) +} + +// writeError returns a JSON error with given parameters. All error HTTP +// status codes are 200. According to the official API. +func writeError(w http.ResponseWriter, id, code int, msg string) error { + resp := RequestError{ + Version: rpcVersion, + ID: id, + Error: Error{ + Code: code, + Message: msg, + }, + } + + return writeJSON(w, http.StatusOK, resp) +} + +func writeJSON(w http.ResponseWriter, status int, v interface{}) error { + w.Header().Set("Content-Type", "application/json") + w.WriteHeader(status) + return json.NewEncoder(w).Encode(v) +} + +// Request is an object received through JSON-RPC from the client. +type Request struct { + Version string `json:"jsonrpc"` + Method string `json:"method"` + Params []string `json:"params"` + ID int `json:"id"` +} + +// ConnectionCountResponse .. +type ConnectionCountResponse struct { + Version string `json:"jsonrpc"` + Result int `json:"result"` + ID int `json:"id"` +} + +// RequestError .. +type RequestError struct { + Version string `json:"jsonrpc"` + ID int `json:"id"` + Error Error `json:"error"` +} + +// Error holds information about an RCP error. +type Error struct { + Code int `json:"code"` + Message string `json:"message"` +} From 45ac0d237a3e69949c66285f7ddabd1d39138474 Mon Sep 17 00:00:00 2001 From: anthdm Date: Thu, 1 Feb 2018 08:19:12 +0100 Subject: [PATCH 05/11] changed string port args to int --- pkg/network/tcp.go | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/pkg/network/tcp.go b/pkg/network/tcp.go index e8421265e..a1280a6b0 100644 --- a/pkg/network/tcp.go +++ b/pkg/network/tcp.go @@ -2,14 +2,15 @@ package network import ( "bytes" + "fmt" "net" "github.com/anthdm/neo-go/pkg/network/payload" "github.com/anthdm/neo-go/pkg/util" ) -func listenTCP(s *Server, port string) error { - ln, err := net.Listen("tcp", port) +func listenTCP(s *Server, port int) error { + ln, err := net.Listen("tcp", fmt.Sprintf(":%d", port)) if err != nil { return err } @@ -74,6 +75,7 @@ func handleConnection(s *Server, conn net.Conn) { } } +// handleMessage hands the message received from a TCP connection over to the server. func handleMessage(msg *Message, s *Server, p *TCPPeer) { command := msg.commandType() From 0e22ae09bd490fd1d22e6d030dcba0d93fce7441 Mon Sep 17 00:00:00 2001 From: anthdm Date: Thu, 1 Feb 2018 08:19:29 +0100 Subject: [PATCH 06/11] added peerCount. --- pkg/network/server.go | 72 +++++++++++++++++++++++++++----------- pkg/network/server_test.go | 13 +++++++ 2 files changed, 65 insertions(+), 20 deletions(-) diff --git a/pkg/network/server.go b/pkg/network/server.go index 7e73c57d7..c18c11695 100644 --- a/pkg/network/server.go +++ b/pkg/network/server.go @@ -5,7 +5,6 @@ import ( "log" "net" "os" - "strconv" "time" "github.com/anthdm/neo-go/pkg/network/payload" @@ -53,6 +52,8 @@ type Server struct { relay bool // TCP listener of the server listener net.Listener + // channel for safely responding the number of current connected peers. + peerCountCh chan peerCount } // NewServer returns a pointer to a new server. @@ -64,28 +65,26 @@ func NewServer(net NetMode) *Server { } s := &Server{ - id: util.RandUint32(1111111, 9999999), - userAgent: fmt.Sprintf("/NEO:%s/", version), - logger: logger, - peers: make(map[Peer]bool), - register: make(chan Peer), - unregister: make(chan Peer), - message: make(chan messageTuple), - relay: true, // currently relay is not handled. - net: net, - quit: make(chan struct{}), + id: util.RandUint32(1111111, 9999999), + userAgent: fmt.Sprintf("/NEO:%s/", version), + logger: logger, + peers: make(map[Peer]bool), + register: make(chan Peer), + unregister: make(chan Peer), + message: make(chan messageTuple), + relay: true, // currently relay is not handled. + net: net, + quit: make(chan struct{}), + peerCountCh: make(chan peerCount), } return s } // Start run's the server. -func (s *Server) Start(port string, seeds []string) { - p, err := strconv.Atoi(port[1:len(port)]) - if err != nil { - s.logger.Fatalf("could not convert port to integer: %s", err) - } - s.port = uint16(p) +// TODO: server should be initialized with a config. +func (s *Server) Start(opts StartOpts) { + s.port = uint16(opts.TCP) fmt.Println(logo()) fmt.Println(string(s.userAgent)) @@ -93,10 +92,14 @@ func (s *Server) Start(port string, seeds []string) { s.logger.Printf("NET: %s - TCP: %d - RELAY: %v - ID: %d", s.net, int(s.port), s.relay, s.id) - go listenTCP(s, port) + go listenTCP(s, opts.TCP) - if len(seeds) > 0 { - connectToSeeds(s, seeds) + if opts.RPC > 0 { + go listenHTTP(s, opts.RPC) + } + + if len(opts.Seeds) > 0 { + connectToSeeds(s, opts.Seeds) } s.loop() @@ -137,6 +140,9 @@ func (s *Server) loop() { s.logger.Printf("peer %s disconnected", peer.addr()) } + case t := <-s.peerCountCh: + t.count <- len(s.peers) + case <-s.quit: s.shutdown() } @@ -208,6 +214,7 @@ func (s *Server) peerAlreadyConnected(addr net.Addr) bool { func (s *Server) sendLoop(peer Peer) { // TODO: check if this peer is still connected. + // dont keep asking (maxPeers and no new nodes) for { getaddrMsg := newMessage(s.net, cmdGetAddr, nil) peer.callGetaddr(getaddrMsg) @@ -216,6 +223,31 @@ func (s *Server) sendLoop(peer Peer) { } } +type peerCount struct { + count chan int +} + +// peerCount returns the number of connected peers to this server. +func (s *Server) peerCount() int { + ch := peerCount{ + count: make(chan int), + } + + s.peerCountCh <- ch + + return <-ch.count +} + +// StartOpts holds the server configuration. +type StartOpts struct { + // tcp port + TCP int + // slice of peer addresses the server will connect to + Seeds []string + // JSON-RPC port. If 0 no RPC handler will be attached. + RPC int +} + func logo() string { return ` _ ____________ __________ diff --git a/pkg/network/server_test.go b/pkg/network/server_test.go index ad07d93e1..3c1771296 100644 --- a/pkg/network/server_test.go +++ b/pkg/network/server_test.go @@ -23,6 +23,19 @@ func TestHandleVersion(t *testing.T) { t.Fatal("verack payload should be nil") } } +func TestPeerCount(t *testing.T) { + s := NewServer(ModeDevNet) + go s.loop() + + lenPeers := 10 + for i := 0; i < lenPeers; i++ { + s.register <- NewLocalPeer(s) + } + + if have, want := s.peerCount(), lenPeers; want != have { + t.Fatalf("expected %d connected peers got %d", want, have) + } +} func TestHandleAddrCmd(t *testing.T) { // todo From 8c33392ff657235303c977945f9a0b340bcb7f0b Mon Sep 17 00:00:00 2001 From: anthdm Date: Thu, 1 Feb 2018 08:19:44 +0100 Subject: [PATCH 07/11] Start a server with startOpts --- cmd/neoserver/main.go | 33 ++++++++++++++++++++++----------- 1 file changed, 22 insertions(+), 11 deletions(-) diff --git a/cmd/neoserver/main.go b/cmd/neoserver/main.go index f06c73c3d..fff926922 100644 --- a/cmd/neoserver/main.go +++ b/cmd/neoserver/main.go @@ -8,25 +8,36 @@ import ( ) var ( - port = flag.String("port", ":3000", "port the TCP listener will listen on.") + tcp = flag.Int("tcp", 3000, "port TCP listener will listen on.") seed = flag.String("seed", "", "initial seed servers.") net = flag.Int("net", 56753, "the mode the server will operate in.") + rpc = flag.Int("rpc", 0, "let this server also respond to rpc calls on this port") ) // Simple dirty and quick bootstrapping for the sake of development. // e.g run 2 nodes: -// neoserver -port :4000 -// neoserver -port :3000 -seed 127.0.0.1:4000 +// neoserver -tcp :4000 +// neoserver -tcp :3000 -seed 127.0.0.1:4000 func main() { flag.Parse() + opts := network.StartOpts{ + Seeds: parseSeeds(*seed), + TCP: *tcp, + RPC: *rpc, + } + s := network.NewServer(network.NetMode(*net)) - seeds := strings.Split(*seed, ",") - if len(seeds) == 0 { - seeds = []string{*seed} - } - if *seed == "" { - seeds = []string{} - } - s.Start(*port, seeds) + s.Start(opts) +} + +func parseSeeds(s string) []string { + if len(s) == 0 { + return nil + } + seeds := strings.Split(s, ",") + if len(seeds) == 0 { + return nil + } + return seeds } From 22d3572e12dfbc10fa4f024a90a3280270f0a077 Mon Sep 17 00:00:00 2001 From: anthdm Date: Thu, 1 Feb 2018 08:19:52 +0100 Subject: [PATCH 08/11] Updated README --- README.md | 16 ++++++++++++---- 1 file changed, 12 insertions(+), 4 deletions(-) diff --git a/README.md b/README.md index 1150445c7..87e863e74 100644 --- a/README.md +++ b/README.md @@ -34,9 +34,9 @@ The project will exist out of the following topics/packages: 1. network (started) 2. core (started) -3. vm (open) -4. smartcontract (open) -5. api (RPC server) (open) +3. api (JSON-RPC server) (started) +4. vm (open) +5. smartcontract (open) # Getting started ### Server @@ -53,8 +53,16 @@ You can add multiple seeds if you want: `neoserver -seed 127.0.0.1:20333,127.0.01:20334` +By default the server will currently run on port 3000, for testing purposes. You can change that by setting the tcp flag: + +`neoserver -seed 127.0.0.1:20333 -tcp 1337` + ### RPC -To be implemented.. +If you want your node to also serve JSON-RPC, you can do that by setting the following flag: + +`neoserver -rpc 4000` + +In this case server will accept and respond JSON-RPC on port 4000. Keep in mind that currently there is only a small subset of the JSON-RPC implemented. Feel free to make a PR with more functionality. ### vm To be implemented.. From 04e9060484d28c96d5937569869ab471b9e6f342 Mon Sep 17 00:00:00 2001 From: anthdm Date: Thu, 1 Feb 2018 10:25:34 +0100 Subject: [PATCH 09/11] Added GetBlocks payload --- pkg/network/payload/getblocks.go | 48 +++++++++++++++++++++++++++ pkg/network/payload/getblocks_test.go | 37 +++++++++++++++++++++ 2 files changed, 85 insertions(+) create mode 100644 pkg/network/payload/getblocks.go create mode 100644 pkg/network/payload/getblocks_test.go diff --git a/pkg/network/payload/getblocks.go b/pkg/network/payload/getblocks.go new file mode 100644 index 000000000..909335b9a --- /dev/null +++ b/pkg/network/payload/getblocks.go @@ -0,0 +1,48 @@ +package payload + +import ( + "encoding/binary" + "io" + + . "github.com/anthdm/neo-go/pkg/util" +) + +// GetBlocks payload +type GetBlocks struct { + // hash of latest block that node requests + HashStart []Uint256 + // hash of last block that node requests + HashStop Uint256 +} + +// NewGetBlocks return a pointer to a GetBlocks object. +func NewGetBlocks(start []Uint256, stop Uint256) *GetBlocks { + return &GetBlocks{ + HashStart: start, + HashStop: stop, + } +} + +// DecodeBinary implements the payload interface. +func (p *GetBlocks) DecodeBinary(r io.Reader) error { + var lenStart uint8 + + err := binary.Read(r, binary.LittleEndian, &lenStart) + p.HashStart = make([]Uint256, lenStart) + err = binary.Read(r, binary.LittleEndian, &p.HashStart) + err = binary.Read(r, binary.LittleEndian, &p.HashStop) + + return err +} + +// EncodeBinary implements the payload interface. +func (p *GetBlocks) EncodeBinary(w io.Writer) error { + err := binary.Write(w, binary.LittleEndian, uint8(len(p.HashStart))) + err = binary.Write(w, binary.LittleEndian, p.HashStart) + err = binary.Write(w, binary.LittleEndian, p.HashStop) + + return err +} + +// Size implements the payload interface. +func (p *GetBlocks) Size() uint32 { return 0 } diff --git a/pkg/network/payload/getblocks_test.go b/pkg/network/payload/getblocks_test.go new file mode 100644 index 000000000..c73772386 --- /dev/null +++ b/pkg/network/payload/getblocks_test.go @@ -0,0 +1,37 @@ +package payload + +import ( + "bytes" + "crypto/sha256" + "reflect" + "testing" + + . "github.com/anthdm/neo-go/pkg/util" +) + +func TestGetBlocksEncodeDecode(t *testing.T) { + start := []Uint256{ + sha256.Sum256([]byte("a")), + sha256.Sum256([]byte("b")), + } + stop := sha256.Sum256([]byte("c")) + + p := NewGetBlocks(start, stop) + buf := new(bytes.Buffer) + if err := p.EncodeBinary(buf); err != nil { + t.Fatal(err) + } + + if have, want := buf.Len(), 1+64+32; have != want { + t.Fatalf("expecting a length of %d got %d", want, have) + } + + pDecode := &GetBlocks{} + if err := pDecode.DecodeBinary(buf); err != nil { + t.Fatal(err) + } + + if !reflect.DeepEqual(p, pDecode) { + t.Fatalf("expecting both getblocks payloads to be equal %v and %v", p, pDecode) + } +} From 63072ebe75cd2ebb1c58d0f23a393bc3cb787b7b Mon Sep 17 00:00:00 2001 From: anthdm Date: Thu, 1 Feb 2018 10:56:33 +0100 Subject: [PATCH 10/11] Added getheaders payload + abstracted the fields. --- pkg/network/payload/getblocks.go | 32 +++++++++++++--------- pkg/network/payload/getheaders.go | 17 ++++++++++++ pkg/network/payload/getheaders_test.go | 37 ++++++++++++++++++++++++++ 3 files changed, 73 insertions(+), 13 deletions(-) create mode 100644 pkg/network/payload/getheaders.go create mode 100644 pkg/network/payload/getheaders_test.go diff --git a/pkg/network/payload/getblocks.go b/pkg/network/payload/getblocks.go index 909335b9a..81371cb25 100644 --- a/pkg/network/payload/getblocks.go +++ b/pkg/network/payload/getblocks.go @@ -7,24 +7,17 @@ import ( . "github.com/anthdm/neo-go/pkg/util" ) -// GetBlocks payload -type GetBlocks struct { +// HashStartStop contains fields and methods to be shared with the +// "GetBlocks" and "GetHeaders" payload. +type HashStartStop struct { // hash of latest block that node requests HashStart []Uint256 // hash of last block that node requests HashStop Uint256 } -// NewGetBlocks return a pointer to a GetBlocks object. -func NewGetBlocks(start []Uint256, stop Uint256) *GetBlocks { - return &GetBlocks{ - HashStart: start, - HashStop: stop, - } -} - // DecodeBinary implements the payload interface. -func (p *GetBlocks) DecodeBinary(r io.Reader) error { +func (p *HashStartStop) DecodeBinary(r io.Reader) error { var lenStart uint8 err := binary.Read(r, binary.LittleEndian, &lenStart) @@ -36,7 +29,7 @@ func (p *GetBlocks) DecodeBinary(r io.Reader) error { } // EncodeBinary implements the payload interface. -func (p *GetBlocks) EncodeBinary(w io.Writer) error { +func (p *HashStartStop) EncodeBinary(w io.Writer) error { err := binary.Write(w, binary.LittleEndian, uint8(len(p.HashStart))) err = binary.Write(w, binary.LittleEndian, p.HashStart) err = binary.Write(w, binary.LittleEndian, p.HashStop) @@ -45,4 +38,17 @@ func (p *GetBlocks) EncodeBinary(w io.Writer) error { } // Size implements the payload interface. -func (p *GetBlocks) Size() uint32 { return 0 } +func (p *HashStartStop) Size() uint32 { return 0 } + +// GetBlocks payload +type GetBlocks struct { + HashStartStop +} + +// NewGetBlocks return a pointer to a GetBlocks object. +func NewGetBlocks(start []Uint256, stop Uint256) *GetBlocks { + p := &GetBlocks{} + p.HashStart = start + p.HashStop = stop + return p +} diff --git a/pkg/network/payload/getheaders.go b/pkg/network/payload/getheaders.go new file mode 100644 index 000000000..9ade274f7 --- /dev/null +++ b/pkg/network/payload/getheaders.go @@ -0,0 +1,17 @@ +package payload + +import "github.com/anthdm/neo-go/pkg/util" + +// GetHeaders payload is the same as the "GetBlocks" payload. +type GetHeaders struct { + HashStartStop +} + +// NewGetHeaders return a pointer to a GetHeaders object. +func NewGetHeaders(start []util.Uint256, stop util.Uint256) *GetHeaders { + p := &GetHeaders{} + p.HashStart = start + p.HashStop = stop + + return p +} diff --git a/pkg/network/payload/getheaders_test.go b/pkg/network/payload/getheaders_test.go new file mode 100644 index 000000000..32cbb1b86 --- /dev/null +++ b/pkg/network/payload/getheaders_test.go @@ -0,0 +1,37 @@ +package payload + +import ( + "bytes" + "crypto/sha256" + "reflect" + "testing" + + "github.com/anthdm/neo-go/pkg/util" +) + +func TestGetHeadersEncodeDecode(t *testing.T) { + start := []util.Uint256{ + sha256.Sum256([]byte("a")), + sha256.Sum256([]byte("b")), + } + stop := sha256.Sum256([]byte("c")) + + p := NewGetHeaders(start, stop) + buf := new(bytes.Buffer) + if err := p.EncodeBinary(buf); err != nil { + t.Fatal(err) + } + + if have, want := buf.Len(), 1+64+32; have != want { + t.Fatalf("expecting a length of %d got %d", want, have) + } + + pDecode := &GetHeaders{} + if err := pDecode.DecodeBinary(buf); err != nil { + t.Fatal(err) + } + + if !reflect.DeepEqual(p, pDecode) { + t.Fatalf("expecting both getheaders payloads to be equal %v and %v", p, pDecode) + } +} From b416a51db7ebb94f4b5540039a5f7bf2140921e2 Mon Sep 17 00:00:00 2001 From: anthdm Date: Thu, 1 Feb 2018 14:53:49 +0100 Subject: [PATCH 11/11] tweaked TCP transport + finished version + verack. --- pkg/network/peer.go | 3 -- pkg/network/tcp.go | 85 ++++++++++++++++++++++++++------------------- 2 files changed, 50 insertions(+), 38 deletions(-) diff --git a/pkg/network/peer.go b/pkg/network/peer.go index 025f4b7d8..36a4f87f2 100644 --- a/pkg/network/peer.go +++ b/pkg/network/peer.go @@ -9,7 +9,6 @@ import ( type Peer interface { id() uint32 addr() util.Endpoint - verack() bool disconnect() callVersion(*Message) callGetaddr(*Message) @@ -20,7 +19,6 @@ type Peer interface { type LocalPeer struct { s *Server nonce uint32 - isVerack bool endpoint util.Endpoint } @@ -39,6 +37,5 @@ func (p *LocalPeer) callGetaddr(msg *Message) { } func (p *LocalPeer) id() uint32 { return p.nonce } -func (p *LocalPeer) verack() bool { return p.isVerack } func (p *LocalPeer) addr() util.Endpoint { return p.endpoint } func (p *LocalPeer) disconnect() {} diff --git a/pkg/network/tcp.go b/pkg/network/tcp.go index a1280a6b0..648fe6274 100644 --- a/pkg/network/tcp.go +++ b/pkg/network/tcp.go @@ -22,6 +22,7 @@ func listenTCP(s *Server, port int) error { if err != nil { return err } + go handleConnection(s, conn) } } @@ -54,8 +55,10 @@ func handleConnection(s *Server, conn net.Conn) { s.unregister <- peer }() - // Start a goroutine that will handle all writes to the registered peer. + // Start a goroutine that will handle all outgoing messages. go peer.writeLoop() + // Start a goroutine that will handle all incomming messages. + go handleMessage(s, peer) // Read from the connection and decode it into a Message ready for processing. buf := make([]byte, 1024) @@ -71,39 +74,55 @@ func handleConnection(s *Server, conn net.Conn) { s.logger.Printf("decode error %s", err) break } - handleMessage(msg, s, peer) + + peer.receive <- msg } } // handleMessage hands the message received from a TCP connection over to the server. -func handleMessage(msg *Message, s *Server, p *TCPPeer) { - command := msg.commandType() +func handleMessage(s *Server, p *TCPPeer) { + // Disconnect the peer when we break out of the loop. + defer func() { + p.disconnect() + }() - s.logger.Printf("IN :: %d :: %s :: %v", p.id(), command, msg) + for { + msg := <-p.receive + command := msg.commandType() - switch command { - case cmdVersion: - resp := s.handleVersionCmd(msg, p) - p.isVerack = true - p.nonce = msg.Payload.(*payload.Version).Nonce - p.send <- resp - case cmdAddr: - s.handleAddrCmd(msg, p) - case cmdGetAddr: - s.handleGetaddrCmd(msg, p) - case cmdInv: - resp := s.handleInvCmd(msg, p) - p.send <- resp - case cmdBlock: - case cmdConsensus: - case cmdTX: - case cmdVerack: - go s.sendLoop(p) - case cmdGetHeaders: - case cmdGetBlocks: - case cmdGetData: - case cmdHeaders: - default: + s.logger.Printf("IN :: %d :: %s :: %v", p.id(), command, msg) + + switch command { + case cmdVersion: + resp := s.handleVersionCmd(msg, p) + p.nonce = msg.Payload.(*payload.Version).Nonce + p.send <- resp + + // after sending our version we want a "verack" and nothing else. + msg := <-p.receive + if msg.commandType() != cmdVerack { + break + } + // we can start the protocol now. + go s.sendLoop(p) + case cmdAddr: + s.handleAddrCmd(msg, p) + case cmdGetAddr: + s.handleGetaddrCmd(msg, p) + case cmdInv: + resp := s.handleInvCmd(msg, p) + p.send <- resp + case cmdBlock: + case cmdConsensus: + case cmdTX: + case cmdVerack: + // disconnect the peer, verack should already be handled. + break + case cmdGetHeaders: + case cmdGetBlocks: + case cmdGetData: + case cmdHeaders: + } } } @@ -118,8 +137,8 @@ type TCPPeer struct { endpoint util.Endpoint // channel to coordinate messages writen back to the connection. send chan *Message - // whether this peers version was acknowledged. - isVerack bool + // channel to receive from underlying connection. + receive chan *Message } // NewTCPPeer returns a pointer to a TCP Peer. @@ -129,6 +148,7 @@ func NewTCPPeer(conn net.Conn, s *Server) *TCPPeer { return &TCPPeer{ conn: conn, send: make(chan *Message), + receive: make(chan *Message), endpoint: e, s: s, } @@ -148,11 +168,6 @@ func (p *TCPPeer) addr() util.Endpoint { return p.endpoint } -// verack implements the peer interface -func (p *TCPPeer) verack() bool { - return p.isVerack -} - // callGetaddr will send the "getaddr" command to the remote. func (p *TCPPeer) callGetaddr(msg *Message) { p.send <- msg