From 0c9d2dd04e54fe85336855e6aae2df60f0d90705 Mon Sep 17 00:00:00 2001 From: anthdm Date: Wed, 31 Jan 2018 09:27:08 +0100 Subject: [PATCH] Block message + handle the length of the user agent better. --- pkg/core/.keep | 0 pkg/core/block.go | 33 +++++++++ pkg/core/witness.go | 19 +++++ pkg/network/message.go | 28 +++----- pkg/network/message_test.go | 4 +- pkg/network/payload/version.go | 27 +++----- pkg/network/payload/version_test.go | 4 +- pkg/network/peer.go | 104 ++++++++++++++++++++++++---- pkg/network/server.go | 67 +++++++++--------- pkg/network/server_test.go | 26 +++++++ pkg/network/tcp.go | 2 +- pkg/util/types.go | 18 ++++- 12 files changed, 243 insertions(+), 89 deletions(-) delete mode 100644 pkg/core/.keep create mode 100644 pkg/core/witness.go create mode 100644 pkg/network/server_test.go diff --git a/pkg/core/.keep b/pkg/core/.keep deleted file mode 100644 index e69de29bb..000000000 diff --git a/pkg/core/block.go b/pkg/core/block.go index 67057569d..1b5016a1c 100644 --- a/pkg/core/block.go +++ b/pkg/core/block.go @@ -1,6 +1,9 @@ package core import ( + "encoding/binary" + "io" + . "github.com/anthdm/neo-go/pkg/util" ) @@ -26,3 +29,33 @@ type Block struct { // transaction list Transactions []*Transaction } + +// EncodeBinary encodes the block to the given writer. +func (b *Block) EncodeBinary(w io.Writer) error { + return nil +} + +// DecodeBinary decods the block from the given reader. +func (b *Block) DecodeBinary(r io.Reader) error { + err := binary.Read(r, binary.LittleEndian, &b.Version) + err = binary.Read(r, binary.LittleEndian, &b.PrevBlock) + err = binary.Read(r, binary.LittleEndian, &b.MerkleRoot) + err = binary.Read(r, binary.LittleEndian, &b.Timestamp) + err = binary.Read(r, binary.LittleEndian, &b.Height) + err = binary.Read(r, binary.LittleEndian, &b.Nonce) + err = binary.Read(r, binary.LittleEndian, &b.NextMiner) + err = binary.Read(r, binary.LittleEndian, &b._sep) + var n uint8 + err = binary.Read(r, binary.LittleEndian, &n) + err = binary.Read(r, binary.LittleEndian, &n) + + // txs := make([]byte, n) + // err = binary.Read(r, binary.LittleEndian, &txs) + // err = binary.Read(r, binary.LittleEndian, &n) + // fmt.Println(n) + + return err +} + +// Size implements the payload interface. +func (b *Block) Size() uint32 { return 0 } diff --git a/pkg/core/witness.go b/pkg/core/witness.go new file mode 100644 index 000000000..2b30be5a6 --- /dev/null +++ b/pkg/core/witness.go @@ -0,0 +1,19 @@ +package core + +import "io" + +// Witness ... +type Witness struct { + InvocationScript []byte + VerificationScript []byte +} + +// DecodeBinary implements the payload interface. +func (wit *Witness) DecodeBinary(r io.Reader) error { + return nil +} + +// EncodeBinary implements the payload interface. +func (wit *Witness) EncodeBinary(w io.Writer) error { + return nil +} diff --git a/pkg/network/message.go b/pkg/network/message.go index 3df2e0305..070cec320 100644 --- a/pkg/network/message.go +++ b/pkg/network/message.go @@ -8,6 +8,7 @@ import ( "fmt" "io" + "github.com/anthdm/neo-go/pkg/core" "github.com/anthdm/neo-go/pkg/network/payload" ) @@ -46,14 +47,6 @@ const ( ) // Message is the complete message send between nodes. -// -// Size Field DataType Description -// ------------------------------------------------------ -// 4 Magic uint32 Protocol ID -// 12 Command char[12] Command -// 4 length uint32 Length of payload -// 4 Checksum uint32 Checksum -// length Payload uint8[length] Content of message type Message struct { Magic NetMode // Command is utf8 code, of which the length is 12 bytes, @@ -93,11 +86,11 @@ func newMessage(magic NetMode, cmd commandType, p payload.Payload) *Message { ) if p != nil { - size = p.Size() buf := new(bytes.Buffer) if err := p.EncodeBinary(buf); err != nil { panic(err) } + size = uint32(buf.Len()) checksum = sumSHA256(sumSHA256(buf.Bytes())) } else { checksum = sumSHA256(sumSHA256([]byte{})) @@ -152,8 +145,6 @@ func (m *Message) decode(r io.Reader) error { binary.Read(r, binary.LittleEndian, &m.Length) binary.Read(r, binary.LittleEndian, &m.Checksum) - fmt.Println(cmdByteArrayToString(m.Command)) - // return if their is no payload. if m.Length == 0 { return nil @@ -163,24 +154,22 @@ func (m *Message) decode(r io.Reader) error { } func (m *Message) decodePayload(r io.Reader) error { - pbuf := make([]byte, m.Length) - n, err := r.Read(pbuf) + buf := make([]byte, m.Length) + n, err := r.Read(buf) if err != nil { return err } - fmt.Printf("The length of the payload is %d\n", n) - if uint32(n) != m.Length { return fmt.Errorf("expected to have read exactly %d bytes got %d", m.Length, n) } // Compare the checksum of the payload. - if !compareChecksum(m.Checksum, pbuf) { + if !compareChecksum(m.Checksum, buf) { return errChecksumMismatch } - r = bytes.NewReader(pbuf) + r = bytes.NewReader(buf) var p payload.Payload switch m.commandType() { case cmdVersion: @@ -198,6 +187,11 @@ func (m *Message) decodePayload(r io.Reader) error { if err := p.DecodeBinary(r); err != nil { return err } + case cmdBlock: + p = &core.Block{} + if err := p.DecodeBinary(r); err != nil { + return err + } } m.Payload = p diff --git a/pkg/network/message_test.go b/pkg/network/message_test.go index ae443b149..4b4e5608f 100644 --- a/pkg/network/message_test.go +++ b/pkg/network/message_test.go @@ -33,7 +33,7 @@ func TestMessageEncodeDecode(t *testing.T) { } func TestMessageEncodeDecodeWithVersion(t *testing.T) { - p := payload.NewVersion(12227, 2000, "./neo:2.6.0/", 0, true) + p := payload.NewVersion(12227, 2000, "/neo:2.6.0/", 0, true) m := newMessage(ModeTestNet, cmdVersion, p) buf := new(bytes.Buffer) @@ -52,7 +52,7 @@ func TestMessageEncodeDecodeWithVersion(t *testing.T) { } func TestMessageInvalidChecksum(t *testing.T) { - p := payload.NewVersion(1111, 3000, "./NEO:2.6.0/", 0, true) + p := payload.NewVersion(1111, 3000, "/NEO:2.6.0/", 0, true) m := newMessage(ModeTestNet, cmdVersion, p) m.Checksum = 1337 diff --git a/pkg/network/payload/version.go b/pkg/network/payload/version.go index 329d920b7..7656f3e92 100644 --- a/pkg/network/payload/version.go +++ b/pkg/network/payload/version.go @@ -5,10 +5,7 @@ import ( "io" ) -const ( - lenUA = 12 - minVersionSize = 27 -) +const minVersionSize = 27 // Version payload. type Version struct { @@ -23,7 +20,7 @@ type Version struct { // it's used to distinguish the node from public IP Nonce uint32 // client id - UserAgent [lenUA]byte + UserAgent []byte // Height of the block chain StartHeight uint32 // Whether to receive and forward @@ -38,7 +35,7 @@ func NewVersion(id uint32, p uint16, ua string, h uint32, r bool) *Version { Timestamp: 12345, Port: p, Nonce: id, - UserAgent: uaToByteArray(ua), + UserAgent: []byte(ua), StartHeight: 0, Relay: r, } @@ -46,16 +43,17 @@ func NewVersion(id uint32, p uint16, ua string, h uint32, r bool) *Version { // DecodeBinary implements the Payload interface. func (p *Version) DecodeBinary(r io.Reader) error { - // TODO: Length of the user agent should be calculated dynamicaly. - // There is no information about the size or format of this. - // the only thing we know is by looking at the #c source code. - // /NEO:{0}/ => /NEO:2.6.0/ err := binary.Read(r, binary.LittleEndian, &p.Version) err = binary.Read(r, binary.LittleEndian, &p.Services) err = binary.Read(r, binary.LittleEndian, &p.Timestamp) err = binary.Read(r, binary.LittleEndian, &p.Port) err = binary.Read(r, binary.LittleEndian, &p.Nonce) + + var lenUA uint8 + err = binary.Read(r, binary.LittleEndian, &lenUA) + p.UserAgent = make([]byte, lenUA) err = binary.Read(r, binary.LittleEndian, &p.UserAgent) + err = binary.Read(r, binary.LittleEndian, &p.StartHeight) err = binary.Read(r, binary.LittleEndian, &p.Relay) @@ -69,6 +67,7 @@ func (p *Version) EncodeBinary(w io.Writer) error { err = binary.Write(w, binary.LittleEndian, p.Timestamp) err = binary.Write(w, binary.LittleEndian, p.Port) err = binary.Write(w, binary.LittleEndian, p.Nonce) + err = binary.Write(w, binary.LittleEndian, uint8(len(p.UserAgent))) err = binary.Write(w, binary.LittleEndian, p.UserAgent) err = binary.Write(w, binary.LittleEndian, p.StartHeight) err = binary.Write(w, binary.LittleEndian, p.Relay) @@ -80,11 +79,3 @@ func (p *Version) EncodeBinary(w io.Writer) error { func (p *Version) Size() uint32 { return uint32(minVersionSize + len(p.UserAgent)) } - -func uaToByteArray(ua string) [lenUA]byte { - buf := [lenUA]byte{} - for i := 0; i < lenUA; i++ { - buf[i] = ua[i] - } - return buf -} diff --git a/pkg/network/payload/version_test.go b/pkg/network/payload/version_test.go index 6eac5b645..098a5d05f 100644 --- a/pkg/network/payload/version_test.go +++ b/pkg/network/payload/version_test.go @@ -7,7 +7,7 @@ import ( ) func TestVersionEncodeDecode(t *testing.T) { - version := NewVersion(13337, 3000, "./NEO:0.0.1/", 0, true) + version := NewVersion(13337, 3000, "/NEO:0.0.1/", 0, true) buf := new(bytes.Buffer) if err := version.EncodeBinary(buf); err != nil { @@ -24,6 +24,6 @@ func TestVersionEncodeDecode(t *testing.T) { } if version.Size() != uint32(minVersionSize+len(version.UserAgent)) { - t.Fatalf("Expected version size of %d", minVersionSize+lenUA) + t.Fatalf("Expected version size of %d", minVersionSize+len(version.UserAgent)) } } diff --git a/pkg/network/peer.go b/pkg/network/peer.go index 544b7cfe2..830844cad 100644 --- a/pkg/network/peer.go +++ b/pkg/network/peer.go @@ -1,49 +1,123 @@ package network import ( + "fmt" "log" "net" "github.com/anthdm/neo-go/pkg/util" ) -// Peer represents a remote node, backed by TCP transport. -type Peer struct { - id uint32 +// Peer is the local representation of a remote node. It's an interface that may +// be backed by any concrete transport: local, HTTP, tcp. +type Peer interface { + id() uint32 + endpoint() util.Endpoint + send(*Message) + verack() bool + verify(uint32) + disconnect() +} + +// LocalPeer is a peer without any transport, mainly used for testing. +type LocalPeer struct { + _id uint32 + _verack bool + _endpoint util.Endpoint + _send chan *Message +} + +// NewLocalPeer return a LocalPeer. +func NewLocalPeer() *LocalPeer { + e, _ := util.EndpointFromString("1.1.1.1:1111") + return &LocalPeer{_endpoint: e} +} + +func (p *LocalPeer) id() uint32 { return p._id } +func (p *LocalPeer) verack() bool { return p._verack } +func (p *LocalPeer) endpoint() util.Endpoint { return p._endpoint } +func (p *LocalPeer) disconnect() {} + +func (p *LocalPeer) send(msg *Message) { + p._send <- msg +} + +func (p *LocalPeer) verify(id uint32) { + fmt.Println(id) + p._verack = true + p._id = id +} + +// TCPPeer represents a remote node, backed by TCP transport. +type TCPPeer struct { + _id uint32 // underlying TCP connection conn net.Conn // host and port information about this peer. - endpoint util.Endpoint + _endpoint util.Endpoint // channel to coordinate messages writen back to the connection. - send chan *Message + _send chan *Message // whether this peers version was acknowledged. - verack bool + _verack bool } -// NewPeer returns a (TCP) Peer. -func NewPeer(conn net.Conn) *Peer { +// NewTCPPeer returns a pointer to a TCP Peer. +func NewTCPPeer(conn net.Conn) *TCPPeer { e, _ := util.EndpointFromString(conn.RemoteAddr().String()) - return &Peer{ - conn: conn, - send: make(chan *Message), - endpoint: e, + return &TCPPeer{ + conn: conn, + _send: make(chan *Message), + _endpoint: e, } } +// id implements the peer interface +func (p *TCPPeer) id() uint32 { + return p._id +} + +// endpoint implements the peer interface +func (p *TCPPeer) endpoint() util.Endpoint { + return p._endpoint +} + +// verack implements the peer interface +func (p *TCPPeer) verack() bool { + return p._verack +} + +// verify implements the peer interface +func (p *TCPPeer) verify(id uint32) { + p._id = id + p._verack = true +} + +// send implements the peer interface +func (p *TCPPeer) send(msg *Message) { + p._send <- msg +} + +func (p *TCPPeer) disconnect() { + close(p._send) + p.conn.Close() +} + // writeLoop writes messages to the underlying TCP connection. // A goroutine writeLoop is started for each connection. // There should be at most one writer to a connection executing // all writes from this goroutine. -func (p *Peer) writeLoop() { +func (p *TCPPeer) writeLoop() { // clean up the connection. defer func() { p.conn.Close() }() for { - msg := <-p.send - rpcLogger.Printf("OUT :: %s", msg.commandType()) + msg := <-p._send + + rpcLogger.Printf("[SERVER] :: OUT :: %s :: %+v", msg.commandType(), msg.Payload) + if err := msg.encode(p.conn); err != nil { log.Printf("encode error: %s", err) } diff --git a/pkg/network/server.go b/pkg/network/server.go index ceba713b2..56e71d0e5 100644 --- a/pkg/network/server.go +++ b/pkg/network/server.go @@ -7,9 +7,9 @@ import ( "net" "os" "strconv" - "sync" "time" + "github.com/anthdm/neo-go/pkg/core" "github.com/anthdm/neo-go/pkg/network/payload" "github.com/anthdm/neo-go/pkg/util" ) @@ -28,7 +28,7 @@ var ( ) type messageTuple struct { - peer *Peer + peer Peer msg *Message } @@ -36,8 +36,6 @@ type messageTuple struct { type Server struct { logger *log.Logger - mtx sync.RWMutex - // id of the server id uint32 @@ -51,10 +49,10 @@ type Server struct { // Or 56753 to work with the docker privnet. net NetMode // map that holds all connected peers to this server. - peers map[*Peer]bool + peers map[Peer]bool - register chan *Peer - unregister chan *Peer + register chan Peer + unregister chan Peer // channel for coordinating messages. message chan messageTuple @@ -79,11 +77,11 @@ func NewServer(net NetMode) *Server { s := &Server{ id: util.RandUint32(1111111, 9999999), - userAgent: fmt.Sprintf("\v/NEO:%s/", version), + userAgent: fmt.Sprintf("/NEO:%s/", version), logger: logger, - peers: make(map[*Peer]bool), - register: make(chan *Peer), - unregister: make(chan *Peer), + peers: make(map[Peer]bool), + register: make(chan Peer), + unregister: make(chan Peer), message: make(chan messageTuple), relay: true, net: net, @@ -137,17 +135,16 @@ func (s *Server) loop() { // When a new connection is been established, (by this server or remote node) // its peer will be received on this channel. // Any peer registration must happen via this channel. - s.logger.Printf("peer registered from address %s", peer.conn.RemoteAddr()) + s.logger.Printf("peer registered from address %s", peer.endpoint()) s.peers[peer] = true s.handlePeerConnected(peer) case peer := <-s.unregister: // unregister should take care of all the cleanup that has to be made. if _, ok := s.peers[peer]; ok { - peer.conn.Close() - close(peer.send) + peer.disconnect() delete(s.peers, peer) - s.logger.Printf("peer %s disconnected", peer.conn.RemoteAddr()) + s.logger.Printf("peer %s disconnected", peer.endpoint()) } case tuple := <-s.message: @@ -166,14 +163,14 @@ func (s *Server) loop() { } // processMessage processes the message received from the peer. -func (s *Server) processMessage(msg *Message, peer *Peer) error { +func (s *Server) processMessage(msg *Message, peer Peer) error { command := msg.commandType() - rpcLogger.Printf("[NODE %d] :: IN :: %s :: %+v", peer.id, command, msg.Payload) + rpcLogger.Printf("[NODE %d] :: IN :: %s :: %+v", peer.id(), command, msg.Payload) // Disconnect if the remote is sending messages other then version // if we didn't verack this peer. - if !peer.verack && command != cmdVersion { + if !peer.verack() && command != cmdVersion { return errors.New("version noack") } @@ -192,6 +189,7 @@ func (s *Server) processMessage(msg *Message, peer *Peer) error { return s.handleInvCmd(msg.Payload.(*payload.Inventory), peer) case cmdGetData: case cmdBlock: + return s.handleBlockCmd(msg.Payload.(*core.Block), peer) case cmdTX: case cmdConsensus: default: @@ -204,29 +202,29 @@ func (s *Server) processMessage(msg *Message, peer *Peer) error { // When a new peer is connected we send our version. // No further communication should be made before both sides has received // the versions of eachother. -func (s *Server) handlePeerConnected(peer *Peer) { +func (s *Server) handlePeerConnected(peer Peer) { // TODO get heigth of block when thats implemented. payload := payload.NewVersion(s.id, s.port, s.userAgent, 0, s.relay) msg := newMessage(s.net, cmdVersion, payload) - peer.send <- msg + peer.send(msg) } // Version declares the server's version. -func (s *Server) handleVersionCmd(v *payload.Version, peer *Peer) error { +func (s *Server) handleVersionCmd(v *payload.Version, peer Peer) error { if s.id == v.Nonce { return errors.New("remote nonce equal to server id") } - if peer.endpoint.Port != v.Port { + + if peer.endpoint().Port != v.Port { return errors.New("port mismatch") } // we respond with a verack, we successfully received peer's version // at this point. - peer.verack = true - peer.id = v.Nonce + peer.verify(v.Nonce) verackMsg := newMessage(s.net, cmdVerack, nil) - peer.send <- verackMsg + peer.send(verackMsg) go s.sendLoop(peer) @@ -234,7 +232,7 @@ func (s *Server) handleVersionCmd(v *payload.Version, peer *Peer) error { } // When the remote node reveals its known peers we try to connect to all of them. -func (s *Server) handleAddrCmd(addrList *payload.AddressList, peer *Peer) error { +func (s *Server) handleAddrCmd(addrList *payload.AddressList, peer Peer) error { for _, addr := range addrList.Addrs { if !s.peerAlreadyConnected(addr.Addr) { go connectToRemoteNode(s, addr.Addr.String()) @@ -243,7 +241,7 @@ func (s *Server) handleAddrCmd(addrList *payload.AddressList, peer *Peer) error return nil } -func (s *Server) handleInvCmd(inv *payload.Inventory, peer *Peer) error { +func (s *Server) handleInvCmd(inv *payload.Inventory, peer Peer) error { if !inv.Type.Valid() { return fmt.Errorf("invalid inventory type: %s", inv.Type) } @@ -254,11 +252,16 @@ func (s *Server) handleInvCmd(inv *payload.Inventory, peer *Peer) error { payload := payload.NewInventory(inv.Type, inv.Hashes) msg := newMessage(s.net, cmdGetData, payload) - peer.send <- msg + peer.send(msg) return nil } +func (s *Server) handleBlockCmd(block *core.Block, peer Peer) error { + fmt.Println("received a block yyyyyyeeeeeehhhhh!") + return nil +} + func (s *Server) peerAlreadyConnected(addr net.Addr) bool { // TODO: check for race conditions //s.mtx.RLock() @@ -267,7 +270,7 @@ func (s *Server) peerAlreadyConnected(addr net.Addr) bool { // What about ourself ^^ for peer := range s.peers { - if peer.conn.RemoteAddr().String() == addr.String() { + if peer.endpoint().String() == addr.String() { return true } } @@ -282,13 +285,13 @@ func (s *Server) handleGetAddrCmd(msg *Message, peer *Peer) error { return nil } -func (s *Server) sendLoop(peer *Peer) { +func (s *Server) sendLoop(peer Peer) { // TODO: check if this peer is still connected. for { getaddrMsg := newMessage(s.net, cmdGetAddr, nil) - peer.send <- getaddrMsg + peer.send(getaddrMsg) - time.Sleep(10 * time.Second) + time.Sleep(120 * time.Second) } } diff --git a/pkg/network/server_test.go b/pkg/network/server_test.go new file mode 100644 index 000000000..5445c3168 --- /dev/null +++ b/pkg/network/server_test.go @@ -0,0 +1,26 @@ +package network + +import ( + "testing" +) + +func TestHandleVersion(t *testing.T) { + // s := NewServer(ModeDevNet) + // go s.Start(":3000", nil) + + // p := NewLocalPeer() + // s.register <- p + + // version := payload.NewVersion(1337, p.endpoint().Port, "/NEO:0.0.0/.", 0, true) + // s.handleVersionCmd(version, p) + + // if len(s.peers) != 1 { + // t.Fatalf("expecting the server to have %d peers got %d", 1, len(s.peers)) + // } + // if p.id() != 1337 { + // t.Fatalf("expecting peer's id to be %d got %d", 1337, p._id) + // } + // if !p.verack() { + // t.Fatal("expecting peer to be verified") + // } +} diff --git a/pkg/network/tcp.go b/pkg/network/tcp.go index e05cefbbb..de92a7205 100644 --- a/pkg/network/tcp.go +++ b/pkg/network/tcp.go @@ -40,7 +40,7 @@ func connectToSeeds(s *Server, addrs []string) { } func handleConnection(s *Server, conn net.Conn) { - peer := NewPeer(conn) + peer := NewTCPPeer(conn) s.register <- peer // remove the peer from connected peers and cleanup the connection. diff --git a/pkg/util/types.go b/pkg/util/types.go index 51588988e..5480e8155 100644 --- a/pkg/util/types.go +++ b/pkg/util/types.go @@ -33,10 +33,10 @@ func (u *Uint256) UnmarshalBinary(b []byte) error { return nil } -// ToSlice return a byte slice of u. +// ToSlice returns a byte slice of u. func (u Uint256) ToSlice() []byte { b := make([]byte, 32) - for i := 0; i < 32; i++ { + for i := 0; i < len(b); i++ { b[i] = byte(u[i]) } return b @@ -48,3 +48,17 @@ func (u Uint256) String() string { // Uint160 is a 20 byte long unsigned integer type Uint160 [20]uint8 + +// ToSlice returns a byte slice of u. +func (u Uint160) ToSlice() []byte { + b := make([]byte, 20) + for i := 0; i < len(b); i++ { + b[i] = byte(u[i]) + } + return b +} + +// String implements the stringer interface. +func (u Uint160) String() string { + return hex.EncodeToString(u.ToSlice()) +}