Added the start of RPC + removed proxy functions.

* Set the listener of the server when opened.

* refactor server RPC.

* deleted proxy functions + moved TCPPeer to tcp file

* implemented the start of JSON-RPC

* changed string port args to int

* added peerCount.

* Start a server with startOpts

* Updated README
This commit is contained in:
Anthony De Meulemeester 2018-02-01 09:00:42 +01:00 committed by GitHub
parent 0eeb15f62d
commit 83306a5c96
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
7 changed files with 341 additions and 220 deletions

View file

@ -34,9 +34,9 @@ The project will exist out of the following topics/packages:
1. network (started) 1. network (started)
2. core (started) 2. core (started)
3. vm (open) 3. api (JSON-RPC server) (started)
4. smartcontract (open) 4. vm (open)
5. api (RPC server) (open) 5. smartcontract (open)
# Getting started # Getting started
### Server ### Server
@ -53,8 +53,16 @@ You can add multiple seeds if you want:
`neoserver -seed 127.0.0.1:20333,127.0.01:20334` `neoserver -seed 127.0.0.1:20333,127.0.01:20334`
By default the server will currently run on port 3000, for testing purposes. You can change that by setting the tcp flag:
`neoserver -seed 127.0.0.1:20333 -tcp 1337`
### RPC ### RPC
To be implemented.. If you want your node to also serve JSON-RPC, you can do that by setting the following flag:
`neoserver -rpc 4000`
In this case server will accept and respond JSON-RPC on port 4000. Keep in mind that currently there is only a small subset of the JSON-RPC implemented. Feel free to make a PR with more functionality.
### vm ### vm
To be implemented.. To be implemented..

View file

@ -8,25 +8,36 @@ import (
) )
var ( var (
port = flag.String("port", ":3000", "port the TCP listener will listen on.") tcp = flag.Int("tcp", 3000, "port TCP listener will listen on.")
seed = flag.String("seed", "", "initial seed servers.") seed = flag.String("seed", "", "initial seed servers.")
net = flag.Int("net", 56753, "the mode the server will operate in.") net = flag.Int("net", 56753, "the mode the server will operate in.")
rpc = flag.Int("rpc", 0, "let this server also respond to rpc calls on this port")
) )
// Simple dirty and quick bootstrapping for the sake of development. // Simple dirty and quick bootstrapping for the sake of development.
// e.g run 2 nodes: // e.g run 2 nodes:
// neoserver -port :4000 // neoserver -tcp :4000
// neoserver -port :3000 -seed 127.0.0.1:4000 // neoserver -tcp :3000 -seed 127.0.0.1:4000
func main() { func main() {
flag.Parse() flag.Parse()
opts := network.StartOpts{
Seeds: parseSeeds(*seed),
TCP: *tcp,
RPC: *rpc,
}
s := network.NewServer(network.NetMode(*net)) s := network.NewServer(network.NetMode(*net))
seeds := strings.Split(*seed, ",") s.Start(opts)
if len(seeds) == 0 { }
seeds = []string{*seed}
} func parseSeeds(s string) []string {
if *seed == "" { if len(s) == 0 {
seeds = []string{} return nil
} }
s.Start(*port, seeds) seeds := strings.Split(s, ",")
if len(seeds) == 0 {
return nil
}
return seeds
} }

View file

@ -1,8 +1,6 @@
package network package network
import ( import (
"net"
"github.com/anthdm/neo-go/pkg/util" "github.com/anthdm/neo-go/pkg/util"
) )
@ -44,81 +42,3 @@ func (p *LocalPeer) id() uint32 { return p.nonce }
func (p *LocalPeer) verack() bool { return p.isVerack } func (p *LocalPeer) verack() bool { return p.isVerack }
func (p *LocalPeer) addr() util.Endpoint { return p.endpoint } func (p *LocalPeer) addr() util.Endpoint { return p.endpoint }
func (p *LocalPeer) disconnect() {} func (p *LocalPeer) disconnect() {}
// TCPPeer represents a remote node, backed by TCP transport.
type TCPPeer struct {
s *Server
// nonce (id) of the peer.
nonce uint32
// underlying TCP connection
conn net.Conn
// host and port information about this peer.
endpoint util.Endpoint
// channel to coordinate messages writen back to the connection.
send chan *Message
// whether this peers version was acknowledged.
isVerack bool
}
// NewTCPPeer returns a pointer to a TCP Peer.
func NewTCPPeer(conn net.Conn, s *Server) *TCPPeer {
e, _ := util.EndpointFromString(conn.RemoteAddr().String())
return &TCPPeer{
conn: conn,
send: make(chan *Message),
endpoint: e,
s: s,
}
}
func (p *TCPPeer) callVersion(msg *Message) {
p.send <- msg
}
// id implements the peer interface
func (p *TCPPeer) id() uint32 {
return p.nonce
}
// endpoint implements the peer interface
func (p *TCPPeer) addr() util.Endpoint {
return p.endpoint
}
// verack implements the peer interface
func (p *TCPPeer) verack() bool {
return p.isVerack
}
// callGetaddr will send the "getaddr" command to the remote.
func (p *TCPPeer) callGetaddr(msg *Message) {
p.send <- msg
}
func (p *TCPPeer) disconnect() {
close(p.send)
p.conn.Close()
}
// writeLoop writes messages to the underlying TCP connection.
// A goroutine writeLoop is started for each connection.
// There should be at most one writer to a connection executing
// all writes from this goroutine.
func (p *TCPPeer) writeLoop() {
// clean up the connection.
defer func() {
p.conn.Close()
}()
for {
msg := <-p.send
p.s.logger.Printf("OUT :: %s :: %+v", msg.commandType(), msg.Payload)
// should we disconnect here?
if err := msg.encode(p.conn); err != nil {
p.s.logger.Printf("encode error: %s", err)
}
}
}

129
pkg/network/rpc.go Normal file
View file

@ -0,0 +1,129 @@
package network
import (
"encoding/json"
"fmt"
"net/http"
)
const (
rpcPortMainNet = 20332
rpcPortTestNet = 10332
rpcVersion = "2.0"
// error response messages
methodNotFound = "Method not found"
parseError = "Parse error"
)
// Each NEO node has a set of optional APIs for accessing blockchain
// data and making things easier for development of blockchain apps.
// APIs are provided via JSON-RPC , comm at bottom layer is with http/https protocol.
// listenHTTP creates an ingress bridge from the outside world to the passed
// server, by installing handlers for all the necessary RPCs to the passed mux.
func listenHTTP(s *Server, port int) {
api := &API{s}
p := fmt.Sprintf(":%d", port)
s.logger.Printf("serving RPC on %d", port)
s.logger.Printf("%s", http.ListenAndServe(p, api))
}
// API serves JSON-RPC.
type API struct {
s *Server
}
func (s *API) ServeHTTP(w http.ResponseWriter, r *http.Request) {
// Official nodes respond a parse error if the method is not POST.
// Instead of returning a decent response for this, let's do the same.
if r.Method != "POST" {
writeError(w, 0, 0, parseError)
}
var req Request
if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
writeError(w, 0, 0, parseError)
return
}
defer r.Body.Close()
if req.Version != rpcVersion {
writeJSON(w, http.StatusBadRequest, nil)
return
}
switch req.Method {
case "getconnectioncount":
if err := s.getConnectionCount(w, &req); err != nil {
writeError(w, 0, 0, parseError)
return
}
case "getblockcount":
case "getbestblockhash":
default:
writeError(w, 0, 0, methodNotFound)
}
}
// This is an Example on how we could handle incomming RPC requests.
func (s *API) getConnectionCount(w http.ResponseWriter, req *Request) error {
count := s.s.peerCount()
resp := ConnectionCountResponse{
Version: rpcVersion,
Result: count,
ID: 1,
}
return writeJSON(w, http.StatusOK, resp)
}
// writeError returns a JSON error with given parameters. All error HTTP
// status codes are 200. According to the official API.
func writeError(w http.ResponseWriter, id, code int, msg string) error {
resp := RequestError{
Version: rpcVersion,
ID: id,
Error: Error{
Code: code,
Message: msg,
},
}
return writeJSON(w, http.StatusOK, resp)
}
func writeJSON(w http.ResponseWriter, status int, v interface{}) error {
w.Header().Set("Content-Type", "application/json")
w.WriteHeader(status)
return json.NewEncoder(w).Encode(v)
}
// Request is an object received through JSON-RPC from the client.
type Request struct {
Version string `json:"jsonrpc"`
Method string `json:"method"`
Params []string `json:"params"`
ID int `json:"id"`
}
// ConnectionCountResponse ..
type ConnectionCountResponse struct {
Version string `json:"jsonrpc"`
Result int `json:"result"`
ID int `json:"id"`
}
// RequestError ..
type RequestError struct {
Version string `json:"jsonrpc"`
ID int `json:"id"`
Error Error `json:"error"`
}
// Error holds information about an RCP error.
type Error struct {
Code int `json:"code"`
Message string `json:"message"`
}

View file

@ -5,7 +5,6 @@ import (
"log" "log"
"net" "net"
"os" "os"
"strconv"
"time" "time"
"github.com/anthdm/neo-go/pkg/network/payload" "github.com/anthdm/neo-go/pkg/network/payload"
@ -53,12 +52,8 @@ type Server struct {
relay bool relay bool
// TCP listener of the server // TCP listener of the server
listener net.Listener listener net.Listener
// channel for safely responding the number of current connected peers.
// RPC channels peerCountCh chan peerCount
versionCh chan versionTuple
getaddrCh chan getaddrTuple
invCh chan invTuple
addrCh chan addrTuple
} }
// NewServer returns a pointer to a new server. // NewServer returns a pointer to a new server.
@ -70,32 +65,26 @@ func NewServer(net NetMode) *Server {
} }
s := &Server{ s := &Server{
id: util.RandUint32(1111111, 9999999), id: util.RandUint32(1111111, 9999999),
userAgent: fmt.Sprintf("/NEO:%s/", version), userAgent: fmt.Sprintf("/NEO:%s/", version),
logger: logger, logger: logger,
peers: make(map[Peer]bool), peers: make(map[Peer]bool),
register: make(chan Peer), register: make(chan Peer),
unregister: make(chan Peer), unregister: make(chan Peer),
message: make(chan messageTuple), message: make(chan messageTuple),
relay: true, // currently relay is not handled. relay: true, // currently relay is not handled.
net: net, net: net,
quit: make(chan struct{}), quit: make(chan struct{}),
versionCh: make(chan versionTuple), peerCountCh: make(chan peerCount),
getaddrCh: make(chan getaddrTuple),
invCh: make(chan invTuple),
addrCh: make(chan addrTuple),
} }
return s return s
} }
// Start run's the server. // Start run's the server.
func (s *Server) Start(port string, seeds []string) { // TODO: server should be initialized with a config.
p, err := strconv.Atoi(port[1:len(port)]) func (s *Server) Start(opts StartOpts) {
if err != nil { s.port = uint16(opts.TCP)
s.logger.Fatalf("could not convert port to integer: %s", err)
}
s.port = uint16(p)
fmt.Println(logo()) fmt.Println(logo())
fmt.Println(string(s.userAgent)) fmt.Println(string(s.userAgent))
@ -103,10 +92,14 @@ func (s *Server) Start(port string, seeds []string) {
s.logger.Printf("NET: %s - TCP: %d - RELAY: %v - ID: %d", s.logger.Printf("NET: %s - TCP: %d - RELAY: %v - ID: %d",
s.net, int(s.port), s.relay, s.id) s.net, int(s.port), s.relay, s.id)
go listenTCP(s, port) go listenTCP(s, opts.TCP)
if len(seeds) > 0 { if opts.RPC > 0 {
connectToSeeds(s, seeds) go listenHTTP(s, opts.RPC)
}
if len(opts.Seeds) > 0 {
connectToSeeds(s, opts.Seeds)
} }
s.loop() s.loop()
@ -147,44 +140,8 @@ func (s *Server) loop() {
s.logger.Printf("peer %s disconnected", peer.addr()) s.logger.Printf("peer %s disconnected", peer.addr())
} }
// Process the received version and respond with a verack. case t := <-s.peerCountCh:
case t := <-s.versionCh: t.count <- len(s.peers)
if s.id == t.request.Nonce {
t.peer.disconnect()
}
if t.peer.addr().Port != t.request.Port {
t.peer.disconnect()
}
t.response <- newMessage(ModeDevNet, cmdVerack, nil)
// Process the getaddr cmd.
case t := <-s.getaddrCh:
t.response <- &Message{} // just for now.
// Process the addr cmd. Register peer will handle the maxPeers connected.
case t := <-s.addrCh:
for _, addr := range t.request.Addrs {
if !s.peerAlreadyConnected(addr.Addr) {
// TODO: this is not transport abstracted.
go connectToRemoteNode(s, addr.Addr.String())
}
}
t.response <- true
// Process inventories cmd.
case t := <-s.invCh:
if !t.request.Type.Valid() {
t.peer.disconnect()
break
}
if len(t.request.Hashes) == 0 {
t.peer.disconnect()
break
}
payload := payload.NewInventory(t.request.Type, t.request.Hashes)
msg := newMessage(s.net, cmdGetData, payload)
t.response <- msg
case <-s.quit: case <-s.quit:
s.shutdown() s.shutdown()
@ -202,73 +159,47 @@ func (s *Server) handlePeerConnected(p Peer) {
p.callVersion(msg) p.callVersion(msg)
} }
type versionTuple struct {
peer Peer
request *payload.Version
response chan *Message
}
func (s *Server) handleVersionCmd(msg *Message, p Peer) *Message { func (s *Server) handleVersionCmd(msg *Message, p Peer) *Message {
t := versionTuple{ version := msg.Payload.(*payload.Version)
peer: p, if s.id == version.Nonce {
request: msg.Payload.(*payload.Version), p.disconnect()
response: make(chan *Message), return nil
} }
if p.addr().Port != version.Port {
s.versionCh <- t p.disconnect()
return nil
return <-t.response }
} return newMessage(ModeDevNet, cmdVerack, nil)
type getaddrTuple struct {
peer Peer
request *Message
response chan *Message
} }
func (s *Server) handleGetaddrCmd(msg *Message, p Peer) *Message { func (s *Server) handleGetaddrCmd(msg *Message, p Peer) *Message {
t := getaddrTuple{ return nil
peer: p,
request: msg,
response: make(chan *Message),
}
s.getaddrCh <- t
return <-t.response
}
type invTuple struct {
peer Peer
request *payload.Inventory
response chan *Message
} }
func (s *Server) handleInvCmd(msg *Message, p Peer) *Message { func (s *Server) handleInvCmd(msg *Message, p Peer) *Message {
t := invTuple{ inv := msg.Payload.(*payload.Inventory)
request: msg.Payload.(*payload.Inventory), if !inv.Type.Valid() {
response: make(chan *Message), p.disconnect()
return nil
}
if len(inv.Hashes) == 0 {
p.disconnect()
return nil
} }
s.invCh <- t payload := payload.NewInventory(inv.Type, inv.Hashes)
resp := newMessage(s.net, cmdGetData, payload)
return <-t.response return resp
} }
type addrTuple struct { func (s *Server) handleAddrCmd(msg *Message, p Peer) {
request *payload.AddressList addrList := msg.Payload.(*payload.AddressList)
response chan bool for _, addr := range addrList.Addrs {
} if !s.peerAlreadyConnected(addr.Addr) {
// TODO: this is not transport abstracted.
func (s *Server) handleAddrCmd(msg *Message, p Peer) bool { go connectToRemoteNode(s, addr.Addr.String())
t := addrTuple{ }
request: msg.Payload.(*payload.AddressList),
response: make(chan bool),
} }
s.addrCh <- t
return <-t.response
} }
// check if the addr is already connected to the server. // check if the addr is already connected to the server.
@ -283,6 +214,7 @@ func (s *Server) peerAlreadyConnected(addr net.Addr) bool {
func (s *Server) sendLoop(peer Peer) { func (s *Server) sendLoop(peer Peer) {
// TODO: check if this peer is still connected. // TODO: check if this peer is still connected.
// dont keep asking (maxPeers and no new nodes)
for { for {
getaddrMsg := newMessage(s.net, cmdGetAddr, nil) getaddrMsg := newMessage(s.net, cmdGetAddr, nil)
peer.callGetaddr(getaddrMsg) peer.callGetaddr(getaddrMsg)
@ -291,6 +223,31 @@ func (s *Server) sendLoop(peer Peer) {
} }
} }
type peerCount struct {
count chan int
}
// peerCount returns the number of connected peers to this server.
func (s *Server) peerCount() int {
ch := peerCount{
count: make(chan int),
}
s.peerCountCh <- ch
return <-ch.count
}
// StartOpts holds the server configuration.
type StartOpts struct {
// tcp port
TCP int
// slice of peer addresses the server will connect to
Seeds []string
// JSON-RPC port. If 0 no RPC handler will be attached.
RPC int
}
func logo() string { func logo() string {
return ` return `
_ ____________ __________ _ ____________ __________

View file

@ -24,6 +24,20 @@ func TestHandleVersion(t *testing.T) {
} }
} }
func TestPeerCount(t *testing.T) {
s := NewServer(ModeDevNet)
go s.loop()
lenPeers := 10
for i := 0; i < lenPeers; i++ {
s.register <- NewLocalPeer(s)
}
if have, want := s.peerCount(), lenPeers; want != have {
t.Fatalf("expected %d connected peers got %d", want, have)
}
}
func TestHandleAddrCmd(t *testing.T) { func TestHandleAddrCmd(t *testing.T) {
// todo // todo
} }

View file

@ -2,13 +2,15 @@ package network
import ( import (
"bytes" "bytes"
"fmt"
"net" "net"
"github.com/anthdm/neo-go/pkg/network/payload" "github.com/anthdm/neo-go/pkg/network/payload"
"github.com/anthdm/neo-go/pkg/util"
) )
func listenTCP(s *Server, port string) error { func listenTCP(s *Server, port int) error {
ln, err := net.Listen("tcp", port) ln, err := net.Listen("tcp", fmt.Sprintf(":%d", port))
if err != nil { if err != nil {
return err return err
} }
@ -73,10 +75,11 @@ func handleConnection(s *Server, conn net.Conn) {
} }
} }
// handleMessage hands the message received from a TCP connection over to the server.
func handleMessage(msg *Message, s *Server, p *TCPPeer) { func handleMessage(msg *Message, s *Server, p *TCPPeer) {
command := msg.commandType() command := msg.commandType()
s.logger.Printf("%d :: IN :: %s :: %v", p.id(), command, msg) s.logger.Printf("IN :: %d :: %s :: %v", p.id(), command, msg)
switch command { switch command {
case cmdVersion: case cmdVersion:
@ -103,3 +106,82 @@ func handleMessage(msg *Message, s *Server, p *TCPPeer) {
default: default:
} }
} }
// TCPPeer represents a remote node, backed by TCP transport.
type TCPPeer struct {
s *Server
// nonce (id) of the peer.
nonce uint32
// underlying TCP connection
conn net.Conn
// host and port information about this peer.
endpoint util.Endpoint
// channel to coordinate messages writen back to the connection.
send chan *Message
// whether this peers version was acknowledged.
isVerack bool
}
// NewTCPPeer returns a pointer to a TCP Peer.
func NewTCPPeer(conn net.Conn, s *Server) *TCPPeer {
e, _ := util.EndpointFromString(conn.RemoteAddr().String())
return &TCPPeer{
conn: conn,
send: make(chan *Message),
endpoint: e,
s: s,
}
}
func (p *TCPPeer) callVersion(msg *Message) {
p.send <- msg
}
// id implements the peer interface
func (p *TCPPeer) id() uint32 {
return p.nonce
}
// endpoint implements the peer interface
func (p *TCPPeer) addr() util.Endpoint {
return p.endpoint
}
// verack implements the peer interface
func (p *TCPPeer) verack() bool {
return p.isVerack
}
// callGetaddr will send the "getaddr" command to the remote.
func (p *TCPPeer) callGetaddr(msg *Message) {
p.send <- msg
}
// disconnect closes the send channel and the underlying connection.
func (p *TCPPeer) disconnect() {
close(p.send)
p.conn.Close()
}
// writeLoop writes messages to the underlying TCP connection.
// A goroutine writeLoop is started for each connection.
// There should be at most one writer to a connection executing
// all writes from this goroutine.
func (p *TCPPeer) writeLoop() {
// clean up the connection.
defer func() {
p.conn.Close()
}()
for {
msg := <-p.send
p.s.logger.Printf("OUT :: %s :: %+v", msg.commandType(), msg.Payload)
// should we disconnect here?
if err := msg.encode(p.conn); err != nil {
p.s.logger.Printf("encode error: %s", err)
}
}
}