From 83306a5c9624c2e6dbad8b6e8c0d9c3cab803ab5 Mon Sep 17 00:00:00 2001 From: Anthony De Meulemeester Date: Thu, 1 Feb 2018 09:00:42 +0100 Subject: [PATCH] Added the start of RPC + removed proxy functions. * Set the listener of the server when opened. * refactor server RPC. * deleted proxy functions + moved TCPPeer to tcp file * implemented the start of JSON-RPC * changed string port args to int * added peerCount. * Start a server with startOpts * Updated README --- README.md | 16 ++- cmd/neoserver/main.go | 33 ++++-- pkg/network/peer.go | 80 --------------- pkg/network/rpc.go | 129 ++++++++++++++++++++++++ pkg/network/server.go | 201 +++++++++++++++---------------------- pkg/network/server_test.go | 14 +++ pkg/network/tcp.go | 88 +++++++++++++++- 7 files changed, 341 insertions(+), 220 deletions(-) create mode 100644 pkg/network/rpc.go diff --git a/README.md b/README.md index 1150445c7..87e863e74 100644 --- a/README.md +++ b/README.md @@ -34,9 +34,9 @@ The project will exist out of the following topics/packages: 1. network (started) 2. core (started) -3. vm (open) -4. smartcontract (open) -5. api (RPC server) (open) +3. api (JSON-RPC server) (started) +4. vm (open) +5. smartcontract (open) # Getting started ### Server @@ -53,8 +53,16 @@ You can add multiple seeds if you want: `neoserver -seed 127.0.0.1:20333,127.0.01:20334` +By default the server will currently run on port 3000, for testing purposes. You can change that by setting the tcp flag: + +`neoserver -seed 127.0.0.1:20333 -tcp 1337` + ### RPC -To be implemented.. +If you want your node to also serve JSON-RPC, you can do that by setting the following flag: + +`neoserver -rpc 4000` + +In this case server will accept and respond JSON-RPC on port 4000. Keep in mind that currently there is only a small subset of the JSON-RPC implemented. Feel free to make a PR with more functionality. ### vm To be implemented.. diff --git a/cmd/neoserver/main.go b/cmd/neoserver/main.go index f06c73c3d..fff926922 100644 --- a/cmd/neoserver/main.go +++ b/cmd/neoserver/main.go @@ -8,25 +8,36 @@ import ( ) var ( - port = flag.String("port", ":3000", "port the TCP listener will listen on.") + tcp = flag.Int("tcp", 3000, "port TCP listener will listen on.") seed = flag.String("seed", "", "initial seed servers.") net = flag.Int("net", 56753, "the mode the server will operate in.") + rpc = flag.Int("rpc", 0, "let this server also respond to rpc calls on this port") ) // Simple dirty and quick bootstrapping for the sake of development. // e.g run 2 nodes: -// neoserver -port :4000 -// neoserver -port :3000 -seed 127.0.0.1:4000 +// neoserver -tcp :4000 +// neoserver -tcp :3000 -seed 127.0.0.1:4000 func main() { flag.Parse() + opts := network.StartOpts{ + Seeds: parseSeeds(*seed), + TCP: *tcp, + RPC: *rpc, + } + s := network.NewServer(network.NetMode(*net)) - seeds := strings.Split(*seed, ",") - if len(seeds) == 0 { - seeds = []string{*seed} - } - if *seed == "" { - seeds = []string{} - } - s.Start(*port, seeds) + s.Start(opts) +} + +func parseSeeds(s string) []string { + if len(s) == 0 { + return nil + } + seeds := strings.Split(s, ",") + if len(seeds) == 0 { + return nil + } + return seeds } diff --git a/pkg/network/peer.go b/pkg/network/peer.go index af38fe142..025f4b7d8 100644 --- a/pkg/network/peer.go +++ b/pkg/network/peer.go @@ -1,8 +1,6 @@ package network import ( - "net" - "github.com/anthdm/neo-go/pkg/util" ) @@ -44,81 +42,3 @@ func (p *LocalPeer) id() uint32 { return p.nonce } func (p *LocalPeer) verack() bool { return p.isVerack } func (p *LocalPeer) addr() util.Endpoint { return p.endpoint } func (p *LocalPeer) disconnect() {} - -// TCPPeer represents a remote node, backed by TCP transport. -type TCPPeer struct { - s *Server - // nonce (id) of the peer. - nonce uint32 - // underlying TCP connection - conn net.Conn - // host and port information about this peer. - endpoint util.Endpoint - // channel to coordinate messages writen back to the connection. - send chan *Message - // whether this peers version was acknowledged. - isVerack bool -} - -// NewTCPPeer returns a pointer to a TCP Peer. -func NewTCPPeer(conn net.Conn, s *Server) *TCPPeer { - e, _ := util.EndpointFromString(conn.RemoteAddr().String()) - - return &TCPPeer{ - conn: conn, - send: make(chan *Message), - endpoint: e, - s: s, - } -} - -func (p *TCPPeer) callVersion(msg *Message) { - p.send <- msg -} - -// id implements the peer interface -func (p *TCPPeer) id() uint32 { - return p.nonce -} - -// endpoint implements the peer interface -func (p *TCPPeer) addr() util.Endpoint { - return p.endpoint -} - -// verack implements the peer interface -func (p *TCPPeer) verack() bool { - return p.isVerack -} - -// callGetaddr will send the "getaddr" command to the remote. -func (p *TCPPeer) callGetaddr(msg *Message) { - p.send <- msg -} - -func (p *TCPPeer) disconnect() { - close(p.send) - p.conn.Close() -} - -// writeLoop writes messages to the underlying TCP connection. -// A goroutine writeLoop is started for each connection. -// There should be at most one writer to a connection executing -// all writes from this goroutine. -func (p *TCPPeer) writeLoop() { - // clean up the connection. - defer func() { - p.conn.Close() - }() - - for { - msg := <-p.send - - p.s.logger.Printf("OUT :: %s :: %+v", msg.commandType(), msg.Payload) - - // should we disconnect here? - if err := msg.encode(p.conn); err != nil { - p.s.logger.Printf("encode error: %s", err) - } - } -} diff --git a/pkg/network/rpc.go b/pkg/network/rpc.go new file mode 100644 index 000000000..c1156fa65 --- /dev/null +++ b/pkg/network/rpc.go @@ -0,0 +1,129 @@ +package network + +import ( + "encoding/json" + "fmt" + "net/http" +) + +const ( + rpcPortMainNet = 20332 + rpcPortTestNet = 10332 + rpcVersion = "2.0" + + // error response messages + methodNotFound = "Method not found" + parseError = "Parse error" +) + +// Each NEO node has a set of optional APIs for accessing blockchain +// data and making things easier for development of blockchain apps. +// APIs are provided via JSON-RPC , comm at bottom layer is with http/https protocol. + +// listenHTTP creates an ingress bridge from the outside world to the passed +// server, by installing handlers for all the necessary RPCs to the passed mux. +func listenHTTP(s *Server, port int) { + api := &API{s} + p := fmt.Sprintf(":%d", port) + s.logger.Printf("serving RPC on %d", port) + s.logger.Printf("%s", http.ListenAndServe(p, api)) +} + +// API serves JSON-RPC. +type API struct { + s *Server +} + +func (s *API) ServeHTTP(w http.ResponseWriter, r *http.Request) { + // Official nodes respond a parse error if the method is not POST. + // Instead of returning a decent response for this, let's do the same. + if r.Method != "POST" { + writeError(w, 0, 0, parseError) + } + + var req Request + if err := json.NewDecoder(r.Body).Decode(&req); err != nil { + writeError(w, 0, 0, parseError) + return + } + defer r.Body.Close() + + if req.Version != rpcVersion { + writeJSON(w, http.StatusBadRequest, nil) + return + } + + switch req.Method { + case "getconnectioncount": + if err := s.getConnectionCount(w, &req); err != nil { + writeError(w, 0, 0, parseError) + return + } + case "getblockcount": + case "getbestblockhash": + default: + writeError(w, 0, 0, methodNotFound) + } +} + +// This is an Example on how we could handle incomming RPC requests. +func (s *API) getConnectionCount(w http.ResponseWriter, req *Request) error { + count := s.s.peerCount() + + resp := ConnectionCountResponse{ + Version: rpcVersion, + Result: count, + ID: 1, + } + + return writeJSON(w, http.StatusOK, resp) +} + +// writeError returns a JSON error with given parameters. All error HTTP +// status codes are 200. According to the official API. +func writeError(w http.ResponseWriter, id, code int, msg string) error { + resp := RequestError{ + Version: rpcVersion, + ID: id, + Error: Error{ + Code: code, + Message: msg, + }, + } + + return writeJSON(w, http.StatusOK, resp) +} + +func writeJSON(w http.ResponseWriter, status int, v interface{}) error { + w.Header().Set("Content-Type", "application/json") + w.WriteHeader(status) + return json.NewEncoder(w).Encode(v) +} + +// Request is an object received through JSON-RPC from the client. +type Request struct { + Version string `json:"jsonrpc"` + Method string `json:"method"` + Params []string `json:"params"` + ID int `json:"id"` +} + +// ConnectionCountResponse .. +type ConnectionCountResponse struct { + Version string `json:"jsonrpc"` + Result int `json:"result"` + ID int `json:"id"` +} + +// RequestError .. +type RequestError struct { + Version string `json:"jsonrpc"` + ID int `json:"id"` + Error Error `json:"error"` +} + +// Error holds information about an RCP error. +type Error struct { + Code int `json:"code"` + Message string `json:"message"` +} diff --git a/pkg/network/server.go b/pkg/network/server.go index 36dd70fa9..c18c11695 100644 --- a/pkg/network/server.go +++ b/pkg/network/server.go @@ -5,7 +5,6 @@ import ( "log" "net" "os" - "strconv" "time" "github.com/anthdm/neo-go/pkg/network/payload" @@ -53,12 +52,8 @@ type Server struct { relay bool // TCP listener of the server listener net.Listener - - // RPC channels - versionCh chan versionTuple - getaddrCh chan getaddrTuple - invCh chan invTuple - addrCh chan addrTuple + // channel for safely responding the number of current connected peers. + peerCountCh chan peerCount } // NewServer returns a pointer to a new server. @@ -70,32 +65,26 @@ func NewServer(net NetMode) *Server { } s := &Server{ - id: util.RandUint32(1111111, 9999999), - userAgent: fmt.Sprintf("/NEO:%s/", version), - logger: logger, - peers: make(map[Peer]bool), - register: make(chan Peer), - unregister: make(chan Peer), - message: make(chan messageTuple), - relay: true, // currently relay is not handled. - net: net, - quit: make(chan struct{}), - versionCh: make(chan versionTuple), - getaddrCh: make(chan getaddrTuple), - invCh: make(chan invTuple), - addrCh: make(chan addrTuple), + id: util.RandUint32(1111111, 9999999), + userAgent: fmt.Sprintf("/NEO:%s/", version), + logger: logger, + peers: make(map[Peer]bool), + register: make(chan Peer), + unregister: make(chan Peer), + message: make(chan messageTuple), + relay: true, // currently relay is not handled. + net: net, + quit: make(chan struct{}), + peerCountCh: make(chan peerCount), } return s } // Start run's the server. -func (s *Server) Start(port string, seeds []string) { - p, err := strconv.Atoi(port[1:len(port)]) - if err != nil { - s.logger.Fatalf("could not convert port to integer: %s", err) - } - s.port = uint16(p) +// TODO: server should be initialized with a config. +func (s *Server) Start(opts StartOpts) { + s.port = uint16(opts.TCP) fmt.Println(logo()) fmt.Println(string(s.userAgent)) @@ -103,10 +92,14 @@ func (s *Server) Start(port string, seeds []string) { s.logger.Printf("NET: %s - TCP: %d - RELAY: %v - ID: %d", s.net, int(s.port), s.relay, s.id) - go listenTCP(s, port) + go listenTCP(s, opts.TCP) - if len(seeds) > 0 { - connectToSeeds(s, seeds) + if opts.RPC > 0 { + go listenHTTP(s, opts.RPC) + } + + if len(opts.Seeds) > 0 { + connectToSeeds(s, opts.Seeds) } s.loop() @@ -147,44 +140,8 @@ func (s *Server) loop() { s.logger.Printf("peer %s disconnected", peer.addr()) } - // Process the received version and respond with a verack. - case t := <-s.versionCh: - if s.id == t.request.Nonce { - t.peer.disconnect() - } - if t.peer.addr().Port != t.request.Port { - t.peer.disconnect() - } - t.response <- newMessage(ModeDevNet, cmdVerack, nil) - - // Process the getaddr cmd. - case t := <-s.getaddrCh: - t.response <- &Message{} // just for now. - - // Process the addr cmd. Register peer will handle the maxPeers connected. - case t := <-s.addrCh: - for _, addr := range t.request.Addrs { - if !s.peerAlreadyConnected(addr.Addr) { - // TODO: this is not transport abstracted. - go connectToRemoteNode(s, addr.Addr.String()) - } - } - t.response <- true - - // Process inventories cmd. - case t := <-s.invCh: - if !t.request.Type.Valid() { - t.peer.disconnect() - break - } - if len(t.request.Hashes) == 0 { - t.peer.disconnect() - break - } - - payload := payload.NewInventory(t.request.Type, t.request.Hashes) - msg := newMessage(s.net, cmdGetData, payload) - t.response <- msg + case t := <-s.peerCountCh: + t.count <- len(s.peers) case <-s.quit: s.shutdown() @@ -202,73 +159,47 @@ func (s *Server) handlePeerConnected(p Peer) { p.callVersion(msg) } -type versionTuple struct { - peer Peer - request *payload.Version - response chan *Message -} - func (s *Server) handleVersionCmd(msg *Message, p Peer) *Message { - t := versionTuple{ - peer: p, - request: msg.Payload.(*payload.Version), - response: make(chan *Message), + version := msg.Payload.(*payload.Version) + if s.id == version.Nonce { + p.disconnect() + return nil } - - s.versionCh <- t - - return <-t.response -} - -type getaddrTuple struct { - peer Peer - request *Message - response chan *Message + if p.addr().Port != version.Port { + p.disconnect() + return nil + } + return newMessage(ModeDevNet, cmdVerack, nil) } func (s *Server) handleGetaddrCmd(msg *Message, p Peer) *Message { - t := getaddrTuple{ - peer: p, - request: msg, - response: make(chan *Message), - } - - s.getaddrCh <- t - - return <-t.response -} - -type invTuple struct { - peer Peer - request *payload.Inventory - response chan *Message + return nil } func (s *Server) handleInvCmd(msg *Message, p Peer) *Message { - t := invTuple{ - request: msg.Payload.(*payload.Inventory), - response: make(chan *Message), + inv := msg.Payload.(*payload.Inventory) + if !inv.Type.Valid() { + p.disconnect() + return nil + } + if len(inv.Hashes) == 0 { + p.disconnect() + return nil } - s.invCh <- t - - return <-t.response + payload := payload.NewInventory(inv.Type, inv.Hashes) + resp := newMessage(s.net, cmdGetData, payload) + return resp } -type addrTuple struct { - request *payload.AddressList - response chan bool -} - -func (s *Server) handleAddrCmd(msg *Message, p Peer) bool { - t := addrTuple{ - request: msg.Payload.(*payload.AddressList), - response: make(chan bool), +func (s *Server) handleAddrCmd(msg *Message, p Peer) { + addrList := msg.Payload.(*payload.AddressList) + for _, addr := range addrList.Addrs { + if !s.peerAlreadyConnected(addr.Addr) { + // TODO: this is not transport abstracted. + go connectToRemoteNode(s, addr.Addr.String()) + } } - - s.addrCh <- t - - return <-t.response } // check if the addr is already connected to the server. @@ -283,6 +214,7 @@ func (s *Server) peerAlreadyConnected(addr net.Addr) bool { func (s *Server) sendLoop(peer Peer) { // TODO: check if this peer is still connected. + // dont keep asking (maxPeers and no new nodes) for { getaddrMsg := newMessage(s.net, cmdGetAddr, nil) peer.callGetaddr(getaddrMsg) @@ -291,6 +223,31 @@ func (s *Server) sendLoop(peer Peer) { } } +type peerCount struct { + count chan int +} + +// peerCount returns the number of connected peers to this server. +func (s *Server) peerCount() int { + ch := peerCount{ + count: make(chan int), + } + + s.peerCountCh <- ch + + return <-ch.count +} + +// StartOpts holds the server configuration. +type StartOpts struct { + // tcp port + TCP int + // slice of peer addresses the server will connect to + Seeds []string + // JSON-RPC port. If 0 no RPC handler will be attached. + RPC int +} + func logo() string { return ` _ ____________ __________ diff --git a/pkg/network/server_test.go b/pkg/network/server_test.go index ad07d93e1..f0ba721f7 100644 --- a/pkg/network/server_test.go +++ b/pkg/network/server_test.go @@ -24,6 +24,20 @@ func TestHandleVersion(t *testing.T) { } } +func TestPeerCount(t *testing.T) { + s := NewServer(ModeDevNet) + go s.loop() + + lenPeers := 10 + for i := 0; i < lenPeers; i++ { + s.register <- NewLocalPeer(s) + } + + if have, want := s.peerCount(), lenPeers; want != have { + t.Fatalf("expected %d connected peers got %d", want, have) + } +} + func TestHandleAddrCmd(t *testing.T) { // todo } diff --git a/pkg/network/tcp.go b/pkg/network/tcp.go index f9ffc39e0..a1280a6b0 100644 --- a/pkg/network/tcp.go +++ b/pkg/network/tcp.go @@ -2,13 +2,15 @@ package network import ( "bytes" + "fmt" "net" "github.com/anthdm/neo-go/pkg/network/payload" + "github.com/anthdm/neo-go/pkg/util" ) -func listenTCP(s *Server, port string) error { - ln, err := net.Listen("tcp", port) +func listenTCP(s *Server, port int) error { + ln, err := net.Listen("tcp", fmt.Sprintf(":%d", port)) if err != nil { return err } @@ -73,10 +75,11 @@ func handleConnection(s *Server, conn net.Conn) { } } +// handleMessage hands the message received from a TCP connection over to the server. func handleMessage(msg *Message, s *Server, p *TCPPeer) { command := msg.commandType() - s.logger.Printf("%d :: IN :: %s :: %v", p.id(), command, msg) + s.logger.Printf("IN :: %d :: %s :: %v", p.id(), command, msg) switch command { case cmdVersion: @@ -103,3 +106,82 @@ func handleMessage(msg *Message, s *Server, p *TCPPeer) { default: } } + +// TCPPeer represents a remote node, backed by TCP transport. +type TCPPeer struct { + s *Server + // nonce (id) of the peer. + nonce uint32 + // underlying TCP connection + conn net.Conn + // host and port information about this peer. + endpoint util.Endpoint + // channel to coordinate messages writen back to the connection. + send chan *Message + // whether this peers version was acknowledged. + isVerack bool +} + +// NewTCPPeer returns a pointer to a TCP Peer. +func NewTCPPeer(conn net.Conn, s *Server) *TCPPeer { + e, _ := util.EndpointFromString(conn.RemoteAddr().String()) + + return &TCPPeer{ + conn: conn, + send: make(chan *Message), + endpoint: e, + s: s, + } +} + +func (p *TCPPeer) callVersion(msg *Message) { + p.send <- msg +} + +// id implements the peer interface +func (p *TCPPeer) id() uint32 { + return p.nonce +} + +// endpoint implements the peer interface +func (p *TCPPeer) addr() util.Endpoint { + return p.endpoint +} + +// verack implements the peer interface +func (p *TCPPeer) verack() bool { + return p.isVerack +} + +// callGetaddr will send the "getaddr" command to the remote. +func (p *TCPPeer) callGetaddr(msg *Message) { + p.send <- msg +} + +// disconnect closes the send channel and the underlying connection. +func (p *TCPPeer) disconnect() { + close(p.send) + p.conn.Close() +} + +// writeLoop writes messages to the underlying TCP connection. +// A goroutine writeLoop is started for each connection. +// There should be at most one writer to a connection executing +// all writes from this goroutine. +func (p *TCPPeer) writeLoop() { + // clean up the connection. + defer func() { + p.conn.Close() + }() + + for { + msg := <-p.send + + p.s.logger.Printf("OUT :: %s :: %+v", msg.commandType(), msg.Payload) + + // should we disconnect here? + if err := msg.encode(p.conn); err != nil { + p.s.logger.Printf("encode error: %s", err) + } + } +}