From 536a499236b9c94dfe04db87c41521253b2507cf Mon Sep 17 00:00:00 2001 From: anthdm Date: Fri, 26 Jan 2018 19:04:13 +0100 Subject: [PATCH] initial commit. --- LICENCE.md | 7 + README.md | 1 + cmd/neoserver/main.go | 31 ++++ pkg/core/.keep | 0 pkg/network/message.go | 274 ++++++++++++++++++++++++++++++++++++ pkg/network/message_test.go | 73 ++++++++++ pkg/network/peer.go | 42 ++++++ pkg/network/server.go | 206 +++++++++++++++++++++++++++ pkg/network/tcp.go | 71 ++++++++++ pkg/vm/.keep | 0 10 files changed, 705 insertions(+) create mode 100644 LICENCE.md create mode 100644 README.md create mode 100644 cmd/neoserver/main.go create mode 100644 pkg/core/.keep create mode 100644 pkg/network/message.go create mode 100644 pkg/network/message_test.go create mode 100644 pkg/network/peer.go create mode 100644 pkg/network/server.go create mode 100644 pkg/network/tcp.go create mode 100644 pkg/vm/.keep diff --git a/LICENCE.md b/LICENCE.md new file mode 100644 index 000000000..2d25c82b4 --- /dev/null +++ b/LICENCE.md @@ -0,0 +1,7 @@ +Copyright (c) 2018 Anthony De Meulemeester + +Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. \ No newline at end of file diff --git a/README.md b/README.md new file mode 100644 index 000000000..aa1da153e --- /dev/null +++ b/README.md @@ -0,0 +1 @@ +# GO-NEO \ No newline at end of file diff --git a/cmd/neoserver/main.go b/cmd/neoserver/main.go new file mode 100644 index 000000000..bcf5e9a87 --- /dev/null +++ b/cmd/neoserver/main.go @@ -0,0 +1,31 @@ +package main + +import ( + "flag" + "strings" + + "github.com/anthdm/neo-go/pkg/network" +) + +var ( + port = flag.String("port", ":3000", "port the TCP listener will listen on.") + seed = flag.String("seed", "", "initial seed servers.") +) + +// Simple dirty and quick bootstrapping for the sake of development. +// e.g run 2 nodes: +// neoserver -port :4000 +// neoserver -port :3000 -seed 127.0.0.1:4000 +func main() { + flag.Parse() + + s := network.NewServer(network.ModeTestNet) + seeds := strings.Split(*seed, ",") + if len(seeds) == 0 { + seeds = []string{*seed} + } + if *seed == "" { + seeds = []string{} + } + s.Start(*port, seeds) +} diff --git a/pkg/core/.keep b/pkg/core/.keep new file mode 100644 index 000000000..e69de29bb diff --git a/pkg/network/message.go b/pkg/network/message.go new file mode 100644 index 000000000..ac1e39367 --- /dev/null +++ b/pkg/network/message.go @@ -0,0 +1,274 @@ +package network + +import ( + "bytes" + "crypto/sha256" + "encoding/binary" + "errors" + "io" +) + +// Values used for the magic field, according to the docs. +const ( + ModeMainNet = 0x00746e41 // 7630401 + ModeTestNet = 0x74746e41 // 1953787457 + // ModeDevNet = 0xDEADBEAF +) + +// Message is the complete message send between nodes. +// +// Size Field DataType Description +// ------------------------------------------------------ +// 4 Magic uint32 Protocol ID +// 12 Command char[12] Command +// 4 length uint32 Length of payload +// 4 Checksum uint32 Checksum +// length Payload uint8[length] Content of message +type Message struct { + Magic uint32 + // Command is utf8 code, of which the length is 12 bytes, + // the extra part is filled with 0. + Command []byte + // Length of the payload + Length uint32 + // Checksum is the first 4 bytes of the value that two times SHA256 + // hash of the payload + Checksum uint32 + // Payload send with the message. + Payload []byte +} + +type commandType string + +// valid commands used to send between nodes. +const ( + cmdVersion commandType = "version" + cmdVerack = "verack" + cmdGetAddr = "getaddr" + cmdAddr = "addr" + cmdGetHeaders = "getheaders" + cmdHeaders = "headers" + cmdGetBlocks = "getblocks" + cmdInv = "inv" + cmdGetData = "getdata" + cmdBlock = "block" + cmdTX = "tx" +) + +func newMessage(magic uint32, cmd commandType, payload []byte) *Message { + sum := sumSHA256(sumSHA256(payload))[:4] + sumuint32 := binary.LittleEndian.Uint32(sum) + + return &Message{ + Magic: magic, + Command: cmdToByteSlice(cmd), + Length: uint32(len(payload)), + Checksum: sumuint32, + Payload: payload, + } +} + +// Converts the 12 byte command slice to a commandType. +func (m *Message) commandType() commandType { + cmd := string(bytes.TrimRight(m.Command, "\x00")) + switch cmd { + case "version": + return cmdVersion + case "verack": + return cmdVerack + case "addr": + return cmdAddr + case "getheaders": + return cmdGetHeaders + case "header": + return cmdHeaders + case "getblocks": + return cmdGetBlocks + case "inv": + return cmdInv + case "getdata": + return cmdGetData + case "block": + return cmdBlock + case "tx": + return cmdTX + default: + return "" + } +} + +// decode a Message from the given reader. +func (m *Message) decode(r io.Reader) error { + // 24 bytes for the fixed sized fields. + buf := make([]byte, 24) + if _, err := r.Read(buf); err != nil { + return err + } + + m.Magic = binary.LittleEndian.Uint32(buf[0:4]) + m.Command = buf[4:16] + m.Length = binary.LittleEndian.Uint32(buf[16:20]) + m.Checksum = binary.LittleEndian.Uint32(buf[20:24]) + + payload := make([]byte, m.Length) + if _, err := r.Read(payload); err != nil { + return err + } + + // Compare the checksum of the payload. + if !compareChecksum(m.Checksum, payload) { + return errors.New("checksum mismatch error") + } + + m.Payload = payload + + return nil +} + +// encode a Message to any given io.Writer. +func (m *Message) encode(w io.Writer) error { + // 24 bytes for the fixed sized fields + the length of the payload. + buf := make([]byte, 24+m.Length) + + binary.LittleEndian.PutUint32(buf[0:4], m.Magic) + copy(buf[4:16], m.Command) + binary.LittleEndian.PutUint32(buf[16:20], m.Length) + binary.LittleEndian.PutUint32(buf[20:24], m.Checksum) + copy(buf[24:len(buf)], m.Payload) + + _, err := w.Write(buf) + return err +} + +func (m *Message) decodePayload() (interface{}, error) { + switch m.commandType() { + case cmdVersion: + v := &Version{} + if err := v.decode(m.Payload); err != nil { + return nil, err + } + return v, nil + } + return nil, nil +} + +// Version payload description. +// +// Size Field DataType Description +// --------------------------------------------------------------------------------------------- +// 4 Version uint32 Version of protocol, 0 for now +// 8 Services uint64 The service provided by the node is currently 1 +// 4 Timestamp uint32 Current time +// 2 Port uint16 Port that the server is listening on, it's 0 if not used. +// 4 Nonce uint32 It's used to distinguish the node from public IP +// ? UserAgent varstr Client ID +// 4 StartHeight uint32 Height of block chain +// 1 Relay bool Whether to receive and forward +type Version struct { + // currently the version of the protocol is 0 + Version uint32 + // currently 1 + Services uint64 + // timestamp + Timestamp uint32 + // port this server is listening on + Port uint16 + // it's used to distinguish the node from public IP + Nonce uint32 + // client id + UserAgent []byte // ? + // Height of the block chain + StartHeight uint32 + // Whether to receive and forward + Relay bool +} + +func newVersionPayload(p uint16, ua string, h uint32, r bool) *Version { + return &Version{ + Version: 0, + Services: 1, + Timestamp: 12345, + Port: p, + Nonce: 1911099534, + UserAgent: []byte(ua), + StartHeight: 0, + Relay: r, + } +} + +func (p *Version) decode(b []byte) error { + // Fixed fields have a total of 27 bytes. We substract this size + // with the total buffer length to know the length of the user agent. + lenUA := len(b) - 27 + + p.Version = binary.LittleEndian.Uint32(b[0:4]) + p.Services = binary.LittleEndian.Uint64(b[4:12]) + p.Timestamp = binary.LittleEndian.Uint32(b[12:16]) + // FIXME: port's byteorder should be big endian according to the docs. + // but when connecting to the privnet docker image it's little endian. + p.Port = binary.LittleEndian.Uint16(b[16:18]) + p.Nonce = binary.LittleEndian.Uint32(b[18:22]) + p.UserAgent = b[22 : 22+lenUA] + curlen := 22 + lenUA + p.StartHeight = binary.LittleEndian.Uint32(b[curlen : curlen+4]) + p.Relay = b[len(b)-1 : len(b)][0] == 1 + + return nil +} + +func (p *Version) encode() ([]byte, error) { + // 27 bytes for the fixed size fields + the length of the user agent + // which is kinda variable, according to the docs. + buf := make([]byte, 27+len(p.UserAgent)) + + binary.LittleEndian.PutUint32(buf[0:4], p.Version) + binary.LittleEndian.PutUint64(buf[4:12], p.Services) + binary.LittleEndian.PutUint32(buf[12:16], p.Timestamp) + // FIXME: byte order (little / big)? + binary.LittleEndian.PutUint16(buf[16:18], p.Port) + binary.LittleEndian.PutUint32(buf[18:22], p.Nonce) + copy(buf[22:22+len(p.UserAgent)], p.UserAgent) // + curLen := 22 + len(p.UserAgent) + binary.LittleEndian.PutUint32(buf[curLen:curLen+4], p.StartHeight) + + // yikes + var b []byte + if p.Relay { + b = []byte{1} + } else { + b = []byte{0} + } + + copy(buf[curLen+4:len(buf)], b) + + return buf, nil +} + +// convert a command (string) to a byte slice filled with 0 bytes till +// size 12. +func cmdToByteSlice(cmd commandType) []byte { + cmdLen := len(cmd) + if cmdLen > 12 { + panic("exceeded command max length of size 12") + } + + // The command can have max 12 bytes, rest is filled with 0. + b := []byte(cmd) + for i := 0; i < 12-cmdLen; i++ { + b = append(b, byte('\x00')) + } + + return b +} + +func sumSHA256(b []byte) []byte { + h := sha256.New() + h.Write(b) + return h.Sum(nil) +} + +func compareChecksum(have uint32, b []byte) bool { + sum := sumSHA256(sumSHA256(b))[:4] + want := binary.LittleEndian.Uint32(sum) + return have == want +} diff --git a/pkg/network/message_test.go b/pkg/network/message_test.go new file mode 100644 index 000000000..88a5aa228 --- /dev/null +++ b/pkg/network/message_test.go @@ -0,0 +1,73 @@ +package network + +import ( + "bytes" + "encoding/binary" + "reflect" + "testing" +) + +func TestNewMessage(t *testing.T) { + payload := []byte{} + m := newMessage(ModeTestNet, "version", payload) + + if have, want := m.Length, uint32(0); want != have { + t.Errorf("want %d have %d", want, have) + } + if have, want := len(m.Command), 12; want != have { + t.Errorf("want %d have %d", want, have) + } + + sum := sumSHA256(sumSHA256(payload))[:4] + sumuint32 := binary.LittleEndian.Uint32(sum) + if have, want := m.Checksum, sumuint32; want != have { + t.Errorf("want %d have %d", want, have) + } +} +func TestMessageEncodeDecode(t *testing.T) { + m := newMessage(ModeTestNet, "version", []byte{}) + + buf := &bytes.Buffer{} + if err := m.encode(buf); err != nil { + t.Error(err) + } + + md := &Message{} + if err := md.decode(buf); err != nil { + t.Error(err) + } + if !reflect.DeepEqual(m, md) { + t.Errorf("both messages should be equal: %v != %v", m, md) + } +} + +func TestMessageInvalidChecksum(t *testing.T) { + m := newMessage(ModeTestNet, "version", []byte{}) + m.Checksum = 1337 + + buf := &bytes.Buffer{} + if err := m.encode(buf); err != nil { + t.Error(err) + } + + md := &Message{} + if err := md.decode(buf); err == nil { + t.Error("decode should failed with checkum mismatch error") + } +} + +func TestNewVersionPayload(t *testing.T) { + p := newVersionPayload(3000, "/neo/", 0, true) + b, err := p.encode() + if err != nil { + t.Fatal(err) + } + + pd := &Version{} + if err := pd.decode(b); err != nil { + t.Fatal(err) + } + if !reflect.DeepEqual(p, pd) { + t.Errorf("both payloads should be equal: %v != %v", p, pd) + } +} diff --git a/pkg/network/peer.go b/pkg/network/peer.go new file mode 100644 index 000000000..2dced35a6 --- /dev/null +++ b/pkg/network/peer.go @@ -0,0 +1,42 @@ +package network + +import ( + "log" + "net" +) + +// Peer represents a remote node, backed by TCP transport. +type Peer struct { + // underlaying TCP connection + conn net.Conn + // channel to coordinate message writes back to the connection. + send chan *Message + // verack is true if this node has sended it's version. + verack bool +} + +// NewPeer returns a (TCP) Peer. +func NewPeer(conn net.Conn) *Peer { + return &Peer{ + conn: conn, + send: make(chan *Message), + } +} + +// writeLoop writes messages to the underlying TCP connection. +// A goroutine writeLoop is started for each connection. +// There should be at most one writer to a connection executing +// all writes from this goroutine. +func (p *Peer) writeLoop() { + // clean up the connection. + defer func() { + p.conn.Close() + }() + + for { + msg := <-p.send + if err := msg.encode(p.conn); err != nil { + log.Printf("encode error: %s", err) + } + } +} diff --git a/pkg/network/server.go b/pkg/network/server.go new file mode 100644 index 000000000..b83d4a518 --- /dev/null +++ b/pkg/network/server.go @@ -0,0 +1,206 @@ +package network + +import ( + "errors" + "fmt" + "log" + "net" + "os" +) + +const ( + version = "0.0.1" + portMainNet = 10333 + portTestNet = 20333 + // make sure we can run a server without consuming + // docker privnet ports. + portDevNet = 3000 +) + +type messageTuple struct { + peer *Peer + msg *Message +} + +// Server is the representation of a full working NEO TCP node. +type Server struct { + logger *log.Logger + + // userAgent of the server. + userAgent string + // The "magic" mode the server is currently running on. + // This can either be 0x00746e41 or 0x74746e41 for main or test net. + netMode uint32 + // map that holds all connected peers to this server. + peers map[*Peer]bool + + register chan *Peer + unregister chan *Peer + + // channel for coordinating messages. + message chan messageTuple + + // channel used to gracefull shutdown the server. + quit chan struct{} + + // Whether this server will receive and forward messages. + relay bool + + // TCP listener of the server + listener net.Listener + + dev bool +} + +// NewServer returns a pointer to a new server. +func NewServer(mode uint32) *Server { + logger := log.New(os.Stdout, "NEO SERVER :: ", 0) + + if mode != ModeTestNet && mode != ModeMainNet { + logger.Fatalf("invalid network mode %d", mode) + } + + s := &Server{ + userAgent: fmt.Sprintf("/NEO:%s/", version), + logger: logger, + peers: make(map[*Peer]bool), + register: make(chan *Peer), + unregister: make(chan *Peer), + message: make(chan messageTuple), + relay: true, + netMode: mode, + quit: make(chan struct{}), + } + + return s +} + +// Start run's the server. +func (s *Server) Start(port string, seeds []string) { + fmt.Println(logo()) + s.logger.Printf("running %s on %s - relay: %v", s.userAgent, "testnet", s.relay) + + go listenTCP(s, port) + + if len(seeds) > 0 { + connectToSeeds(s, seeds) + } + + s.loop() +} + +// Stop the server, attemping a gracefull shutdown. +func (s *Server) Stop() { s.quit <- struct{}{} } + +// shutdown the server, disconnecting all peers. +func (s *Server) shutdown() { + s.logger.Println("attemping a quitefull shutdown.") + s.listener.Close() + + // disconnect and remove all connected peers. + for peer := range s.peers { + peer.conn.Close() + s.unregister <- peer + } +} + +func (s *Server) loop() { + for { + select { + case peer := <-s.register: + s.logger.Printf("peer registered from address %s", peer.conn.RemoteAddr()) + resp, err := s.handlePeerConnected() + if err != nil { + s.logger.Fatalf("handling initial peer connection failed: %s", err) + } + peer.send <- resp + case peer := <-s.unregister: + s.logger.Printf("peer %s disconnected", peer.conn.RemoteAddr()) + case tuple := <-s.message: + s.logger.Printf("new incomming message %s", string(tuple.msg.Command)) + if err := s.processMessage(tuple.msg, tuple.peer); err != nil { + s.logger.Fatalf("failed to process message: %s", err) + } + case <-s.quit: + s.shutdown() + } + } +} + +// TODO: unregister peers on error. +// processMessage processes the received message from a remote node. +func (s *Server) processMessage(msg *Message, peer *Peer) error { + switch msg.commandType() { + case cmdVersion: + v, _ := msg.decodePayload() + resp, err := s.handleVersionCmd(v.(*Version)) + if err != nil { + return err + } + peer.send <- resp + case cmdVerack: + case cmdGetAddr: + case cmdAddr: + case cmdGetHeaders: + case cmdHeaders: + case cmdGetBlocks: + case cmdInv: + case cmdGetData: + case cmdBlock: + case cmdTX: + default: + return errors.New("invalid RPC command received: " + string(msg.commandType())) + } + + return nil +} + +// When a new peer is connected we respond with the version command. +// No further communication should been made before both sides has received +// the version of eachother. +func (s *Server) handlePeerConnected() (*Message, error) { + payload := newVersionPayload(s.port(), s.userAgent, 0, s.relay) + b, err := payload.encode() + if err != nil { + return nil, err + } + msg := newMessage(ModeTestNet, cmdVersion, b) + return msg, nil +} + +// Version declares the server's version when a new connection is been made. +// We respond with a instant "verack" message. +func (s *Server) handleVersionCmd(v *Version) (*Message, error) { + // TODO: check version and verify to trust that node. + + // Empty payload for the verack message. + fmt.Printf("%+v\n", v) + + msg := newMessage(s.netMode, cmdVerack, nil) + return msg, nil +} + +func (s *Server) port() uint16 { + if s.dev { + return portDevNet + } + if s.netMode == ModeMainNet { + return portMainNet + } + if s.netMode == ModeTestNet { + return portTestNet + } + + s.logger.Fatalf("the server dont know what ports it running, yikes.") + return 0 +} + +func logo() string { + return ` + _ ____________ __________ + / | / / ____/ __ \ / ____/ __ \ + / |/ / __/ / / / /_____/ / __/ / / / + / /| / /___/ /_/ /_____/ /_/ / /_/ / +/_/ |_/_____/\____/ \____/\____/ +` +} diff --git a/pkg/network/tcp.go b/pkg/network/tcp.go new file mode 100644 index 000000000..e417cfe14 --- /dev/null +++ b/pkg/network/tcp.go @@ -0,0 +1,71 @@ +package network + +import ( + "fmt" + "io" + "log" + "net" +) + +func listenTCP(s *Server, port string) error { + ln, err := net.Listen("tcp", port) + if err != nil { + return err + } + + for { + conn, err := ln.Accept() + if err != nil { + return err + } + go handleConnection(s, conn) + } +} + +func connectToSeeds(s *Server, addrs []string) { + fmt.Println(len(addrs)) + for _, addr := range addrs { + go func(addr string) { + conn, err := net.Dial("tcp", addr) + if err != nil { + log.Printf("failed to connect to remote node %s: %s", addr, err) + if conn != nil { + conn.Close() + } + return + } + go handleConnection(s, conn) + }(addr) + } +} + +func handleConnection(s *Server, conn net.Conn) { + peer := NewPeer(conn) + s.register <- peer + + // remove the peer from connected peers and cleanup the connection. + defer func() { + s.unregister <- peer + conn.Close() + }() + + // Start a goroutine that will handle all writes to the registered peer. + go peer.writeLoop() + + // Read from the connection and decode it into an RPCMessage and + // tell the server there is message available for proccesing. + for { + msg := &Message{} + if err := msg.decode(conn); err != nil { + // remote connection probably closed. + if err == io.EOF { + s.logger.Printf("conn read error: %s", err) + break + } + // remove this node on any decode errors. + s.logger.Printf("RPC :: decode error %s", err) + break + } + s.message <- messageTuple{peer, msg} + } +} diff --git a/pkg/vm/.keep b/pkg/vm/.keep new file mode 100644 index 000000000..e69de29bb