Block message + handle the length of the user agent better.

This commit is contained in:
anthdm 2018-01-31 09:27:08 +01:00
parent e9f9354b86
commit 0c9d2dd04e
12 changed files with 243 additions and 89 deletions

View file

View file

@ -1,6 +1,9 @@
package core
import (
"encoding/binary"
"io"
. "github.com/anthdm/neo-go/pkg/util"
)
@ -26,3 +29,33 @@ type Block struct {
// transaction list
Transactions []*Transaction
}
// EncodeBinary encodes the block to the given writer.
func (b *Block) EncodeBinary(w io.Writer) error {
return nil
}
// DecodeBinary decods the block from the given reader.
func (b *Block) DecodeBinary(r io.Reader) error {
err := binary.Read(r, binary.LittleEndian, &b.Version)
err = binary.Read(r, binary.LittleEndian, &b.PrevBlock)
err = binary.Read(r, binary.LittleEndian, &b.MerkleRoot)
err = binary.Read(r, binary.LittleEndian, &b.Timestamp)
err = binary.Read(r, binary.LittleEndian, &b.Height)
err = binary.Read(r, binary.LittleEndian, &b.Nonce)
err = binary.Read(r, binary.LittleEndian, &b.NextMiner)
err = binary.Read(r, binary.LittleEndian, &b._sep)
var n uint8
err = binary.Read(r, binary.LittleEndian, &n)
err = binary.Read(r, binary.LittleEndian, &n)
// txs := make([]byte, n)
// err = binary.Read(r, binary.LittleEndian, &txs)
// err = binary.Read(r, binary.LittleEndian, &n)
// fmt.Println(n)
return err
}
// Size implements the payload interface.
func (b *Block) Size() uint32 { return 0 }

19
pkg/core/witness.go Normal file
View file

@ -0,0 +1,19 @@
package core
import "io"
// Witness ...
type Witness struct {
InvocationScript []byte
VerificationScript []byte
}
// DecodeBinary implements the payload interface.
func (wit *Witness) DecodeBinary(r io.Reader) error {
return nil
}
// EncodeBinary implements the payload interface.
func (wit *Witness) EncodeBinary(w io.Writer) error {
return nil
}

View file

@ -8,6 +8,7 @@ import (
"fmt"
"io"
"github.com/anthdm/neo-go/pkg/core"
"github.com/anthdm/neo-go/pkg/network/payload"
)
@ -46,14 +47,6 @@ const (
)
// Message is the complete message send between nodes.
//
// Size Field DataType Description
// ------------------------------------------------------
// 4 Magic uint32 Protocol ID
// 12 Command char[12] Command
// 4 length uint32 Length of payload
// 4 Checksum uint32 Checksum
// length Payload uint8[length] Content of message
type Message struct {
Magic NetMode
// Command is utf8 code, of which the length is 12 bytes,
@ -93,11 +86,11 @@ func newMessage(magic NetMode, cmd commandType, p payload.Payload) *Message {
)
if p != nil {
size = p.Size()
buf := new(bytes.Buffer)
if err := p.EncodeBinary(buf); err != nil {
panic(err)
}
size = uint32(buf.Len())
checksum = sumSHA256(sumSHA256(buf.Bytes()))
} else {
checksum = sumSHA256(sumSHA256([]byte{}))
@ -152,8 +145,6 @@ func (m *Message) decode(r io.Reader) error {
binary.Read(r, binary.LittleEndian, &m.Length)
binary.Read(r, binary.LittleEndian, &m.Checksum)
fmt.Println(cmdByteArrayToString(m.Command))
// return if their is no payload.
if m.Length == 0 {
return nil
@ -163,24 +154,22 @@ func (m *Message) decode(r io.Reader) error {
}
func (m *Message) decodePayload(r io.Reader) error {
pbuf := make([]byte, m.Length)
n, err := r.Read(pbuf)
buf := make([]byte, m.Length)
n, err := r.Read(buf)
if err != nil {
return err
}
fmt.Printf("The length of the payload is %d\n", n)
if uint32(n) != m.Length {
return fmt.Errorf("expected to have read exactly %d bytes got %d", m.Length, n)
}
// Compare the checksum of the payload.
if !compareChecksum(m.Checksum, pbuf) {
if !compareChecksum(m.Checksum, buf) {
return errChecksumMismatch
}
r = bytes.NewReader(pbuf)
r = bytes.NewReader(buf)
var p payload.Payload
switch m.commandType() {
case cmdVersion:
@ -198,6 +187,11 @@ func (m *Message) decodePayload(r io.Reader) error {
if err := p.DecodeBinary(r); err != nil {
return err
}
case cmdBlock:
p = &core.Block{}
if err := p.DecodeBinary(r); err != nil {
return err
}
}
m.Payload = p

View file

@ -33,7 +33,7 @@ func TestMessageEncodeDecode(t *testing.T) {
}
func TestMessageEncodeDecodeWithVersion(t *testing.T) {
p := payload.NewVersion(12227, 2000, "./neo:2.6.0/", 0, true)
p := payload.NewVersion(12227, 2000, "/neo:2.6.0/", 0, true)
m := newMessage(ModeTestNet, cmdVersion, p)
buf := new(bytes.Buffer)
@ -52,7 +52,7 @@ func TestMessageEncodeDecodeWithVersion(t *testing.T) {
}
func TestMessageInvalidChecksum(t *testing.T) {
p := payload.NewVersion(1111, 3000, "./NEO:2.6.0/", 0, true)
p := payload.NewVersion(1111, 3000, "/NEO:2.6.0/", 0, true)
m := newMessage(ModeTestNet, cmdVersion, p)
m.Checksum = 1337

View file

@ -5,10 +5,7 @@ import (
"io"
)
const (
lenUA = 12
minVersionSize = 27
)
const minVersionSize = 27
// Version payload.
type Version struct {
@ -23,7 +20,7 @@ type Version struct {
// it's used to distinguish the node from public IP
Nonce uint32
// client id
UserAgent [lenUA]byte
UserAgent []byte
// Height of the block chain
StartHeight uint32
// Whether to receive and forward
@ -38,7 +35,7 @@ func NewVersion(id uint32, p uint16, ua string, h uint32, r bool) *Version {
Timestamp: 12345,
Port: p,
Nonce: id,
UserAgent: uaToByteArray(ua),
UserAgent: []byte(ua),
StartHeight: 0,
Relay: r,
}
@ -46,16 +43,17 @@ func NewVersion(id uint32, p uint16, ua string, h uint32, r bool) *Version {
// DecodeBinary implements the Payload interface.
func (p *Version) DecodeBinary(r io.Reader) error {
// TODO: Length of the user agent should be calculated dynamicaly.
// There is no information about the size or format of this.
// the only thing we know is by looking at the #c source code.
// /NEO:{0}/ => /NEO:2.6.0/
err := binary.Read(r, binary.LittleEndian, &p.Version)
err = binary.Read(r, binary.LittleEndian, &p.Services)
err = binary.Read(r, binary.LittleEndian, &p.Timestamp)
err = binary.Read(r, binary.LittleEndian, &p.Port)
err = binary.Read(r, binary.LittleEndian, &p.Nonce)
var lenUA uint8
err = binary.Read(r, binary.LittleEndian, &lenUA)
p.UserAgent = make([]byte, lenUA)
err = binary.Read(r, binary.LittleEndian, &p.UserAgent)
err = binary.Read(r, binary.LittleEndian, &p.StartHeight)
err = binary.Read(r, binary.LittleEndian, &p.Relay)
@ -69,6 +67,7 @@ func (p *Version) EncodeBinary(w io.Writer) error {
err = binary.Write(w, binary.LittleEndian, p.Timestamp)
err = binary.Write(w, binary.LittleEndian, p.Port)
err = binary.Write(w, binary.LittleEndian, p.Nonce)
err = binary.Write(w, binary.LittleEndian, uint8(len(p.UserAgent)))
err = binary.Write(w, binary.LittleEndian, p.UserAgent)
err = binary.Write(w, binary.LittleEndian, p.StartHeight)
err = binary.Write(w, binary.LittleEndian, p.Relay)
@ -80,11 +79,3 @@ func (p *Version) EncodeBinary(w io.Writer) error {
func (p *Version) Size() uint32 {
return uint32(minVersionSize + len(p.UserAgent))
}
func uaToByteArray(ua string) [lenUA]byte {
buf := [lenUA]byte{}
for i := 0; i < lenUA; i++ {
buf[i] = ua[i]
}
return buf
}

View file

@ -7,7 +7,7 @@ import (
)
func TestVersionEncodeDecode(t *testing.T) {
version := NewVersion(13337, 3000, "./NEO:0.0.1/", 0, true)
version := NewVersion(13337, 3000, "/NEO:0.0.1/", 0, true)
buf := new(bytes.Buffer)
if err := version.EncodeBinary(buf); err != nil {
@ -24,6 +24,6 @@ func TestVersionEncodeDecode(t *testing.T) {
}
if version.Size() != uint32(minVersionSize+len(version.UserAgent)) {
t.Fatalf("Expected version size of %d", minVersionSize+lenUA)
t.Fatalf("Expected version size of %d", minVersionSize+len(version.UserAgent))
}
}

View file

@ -1,49 +1,123 @@
package network
import (
"fmt"
"log"
"net"
"github.com/anthdm/neo-go/pkg/util"
)
// Peer represents a remote node, backed by TCP transport.
type Peer struct {
id uint32
// Peer is the local representation of a remote node. It's an interface that may
// be backed by any concrete transport: local, HTTP, tcp.
type Peer interface {
id() uint32
endpoint() util.Endpoint
send(*Message)
verack() bool
verify(uint32)
disconnect()
}
// LocalPeer is a peer without any transport, mainly used for testing.
type LocalPeer struct {
_id uint32
_verack bool
_endpoint util.Endpoint
_send chan *Message
}
// NewLocalPeer return a LocalPeer.
func NewLocalPeer() *LocalPeer {
e, _ := util.EndpointFromString("1.1.1.1:1111")
return &LocalPeer{_endpoint: e}
}
func (p *LocalPeer) id() uint32 { return p._id }
func (p *LocalPeer) verack() bool { return p._verack }
func (p *LocalPeer) endpoint() util.Endpoint { return p._endpoint }
func (p *LocalPeer) disconnect() {}
func (p *LocalPeer) send(msg *Message) {
p._send <- msg
}
func (p *LocalPeer) verify(id uint32) {
fmt.Println(id)
p._verack = true
p._id = id
}
// TCPPeer represents a remote node, backed by TCP transport.
type TCPPeer struct {
_id uint32
// underlying TCP connection
conn net.Conn
// host and port information about this peer.
endpoint util.Endpoint
_endpoint util.Endpoint
// channel to coordinate messages writen back to the connection.
send chan *Message
_send chan *Message
// whether this peers version was acknowledged.
verack bool
_verack bool
}
// NewPeer returns a (TCP) Peer.
func NewPeer(conn net.Conn) *Peer {
// NewTCPPeer returns a pointer to a TCP Peer.
func NewTCPPeer(conn net.Conn) *TCPPeer {
e, _ := util.EndpointFromString(conn.RemoteAddr().String())
return &Peer{
return &TCPPeer{
conn: conn,
send: make(chan *Message),
endpoint: e,
_send: make(chan *Message),
_endpoint: e,
}
}
// id implements the peer interface
func (p *TCPPeer) id() uint32 {
return p._id
}
// endpoint implements the peer interface
func (p *TCPPeer) endpoint() util.Endpoint {
return p._endpoint
}
// verack implements the peer interface
func (p *TCPPeer) verack() bool {
return p._verack
}
// verify implements the peer interface
func (p *TCPPeer) verify(id uint32) {
p._id = id
p._verack = true
}
// send implements the peer interface
func (p *TCPPeer) send(msg *Message) {
p._send <- msg
}
func (p *TCPPeer) disconnect() {
close(p._send)
p.conn.Close()
}
// writeLoop writes messages to the underlying TCP connection.
// A goroutine writeLoop is started for each connection.
// There should be at most one writer to a connection executing
// all writes from this goroutine.
func (p *Peer) writeLoop() {
func (p *TCPPeer) writeLoop() {
// clean up the connection.
defer func() {
p.conn.Close()
}()
for {
msg := <-p.send
rpcLogger.Printf("OUT :: %s", msg.commandType())
msg := <-p._send
rpcLogger.Printf("[SERVER] :: OUT :: %s :: %+v", msg.commandType(), msg.Payload)
if err := msg.encode(p.conn); err != nil {
log.Printf("encode error: %s", err)
}

View file

@ -7,9 +7,9 @@ import (
"net"
"os"
"strconv"
"sync"
"time"
"github.com/anthdm/neo-go/pkg/core"
"github.com/anthdm/neo-go/pkg/network/payload"
"github.com/anthdm/neo-go/pkg/util"
)
@ -28,7 +28,7 @@ var (
)
type messageTuple struct {
peer *Peer
peer Peer
msg *Message
}
@ -36,8 +36,6 @@ type messageTuple struct {
type Server struct {
logger *log.Logger
mtx sync.RWMutex
// id of the server
id uint32
@ -51,10 +49,10 @@ type Server struct {
// Or 56753 to work with the docker privnet.
net NetMode
// map that holds all connected peers to this server.
peers map[*Peer]bool
peers map[Peer]bool
register chan *Peer
unregister chan *Peer
register chan Peer
unregister chan Peer
// channel for coordinating messages.
message chan messageTuple
@ -79,11 +77,11 @@ func NewServer(net NetMode) *Server {
s := &Server{
id: util.RandUint32(1111111, 9999999),
userAgent: fmt.Sprintf("\v/NEO:%s/", version),
userAgent: fmt.Sprintf("/NEO:%s/", version),
logger: logger,
peers: make(map[*Peer]bool),
register: make(chan *Peer),
unregister: make(chan *Peer),
peers: make(map[Peer]bool),
register: make(chan Peer),
unregister: make(chan Peer),
message: make(chan messageTuple),
relay: true,
net: net,
@ -137,17 +135,16 @@ func (s *Server) loop() {
// When a new connection is been established, (by this server or remote node)
// its peer will be received on this channel.
// Any peer registration must happen via this channel.
s.logger.Printf("peer registered from address %s", peer.conn.RemoteAddr())
s.logger.Printf("peer registered from address %s", peer.endpoint())
s.peers[peer] = true
s.handlePeerConnected(peer)
case peer := <-s.unregister:
// unregister should take care of all the cleanup that has to be made.
if _, ok := s.peers[peer]; ok {
peer.conn.Close()
close(peer.send)
peer.disconnect()
delete(s.peers, peer)
s.logger.Printf("peer %s disconnected", peer.conn.RemoteAddr())
s.logger.Printf("peer %s disconnected", peer.endpoint())
}
case tuple := <-s.message:
@ -166,14 +163,14 @@ func (s *Server) loop() {
}
// processMessage processes the message received from the peer.
func (s *Server) processMessage(msg *Message, peer *Peer) error {
func (s *Server) processMessage(msg *Message, peer Peer) error {
command := msg.commandType()
rpcLogger.Printf("[NODE %d] :: IN :: %s :: %+v", peer.id, command, msg.Payload)
rpcLogger.Printf("[NODE %d] :: IN :: %s :: %+v", peer.id(), command, msg.Payload)
// Disconnect if the remote is sending messages other then version
// if we didn't verack this peer.
if !peer.verack && command != cmdVersion {
if !peer.verack() && command != cmdVersion {
return errors.New("version noack")
}
@ -192,6 +189,7 @@ func (s *Server) processMessage(msg *Message, peer *Peer) error {
return s.handleInvCmd(msg.Payload.(*payload.Inventory), peer)
case cmdGetData:
case cmdBlock:
return s.handleBlockCmd(msg.Payload.(*core.Block), peer)
case cmdTX:
case cmdConsensus:
default:
@ -204,29 +202,29 @@ func (s *Server) processMessage(msg *Message, peer *Peer) error {
// When a new peer is connected we send our version.
// No further communication should be made before both sides has received
// the versions of eachother.
func (s *Server) handlePeerConnected(peer *Peer) {
func (s *Server) handlePeerConnected(peer Peer) {
// TODO get heigth of block when thats implemented.
payload := payload.NewVersion(s.id, s.port, s.userAgent, 0, s.relay)
msg := newMessage(s.net, cmdVersion, payload)
peer.send <- msg
peer.send(msg)
}
// Version declares the server's version.
func (s *Server) handleVersionCmd(v *payload.Version, peer *Peer) error {
func (s *Server) handleVersionCmd(v *payload.Version, peer Peer) error {
if s.id == v.Nonce {
return errors.New("remote nonce equal to server id")
}
if peer.endpoint.Port != v.Port {
if peer.endpoint().Port != v.Port {
return errors.New("port mismatch")
}
// we respond with a verack, we successfully received peer's version
// at this point.
peer.verack = true
peer.id = v.Nonce
peer.verify(v.Nonce)
verackMsg := newMessage(s.net, cmdVerack, nil)
peer.send <- verackMsg
peer.send(verackMsg)
go s.sendLoop(peer)
@ -234,7 +232,7 @@ func (s *Server) handleVersionCmd(v *payload.Version, peer *Peer) error {
}
// When the remote node reveals its known peers we try to connect to all of them.
func (s *Server) handleAddrCmd(addrList *payload.AddressList, peer *Peer) error {
func (s *Server) handleAddrCmd(addrList *payload.AddressList, peer Peer) error {
for _, addr := range addrList.Addrs {
if !s.peerAlreadyConnected(addr.Addr) {
go connectToRemoteNode(s, addr.Addr.String())
@ -243,7 +241,7 @@ func (s *Server) handleAddrCmd(addrList *payload.AddressList, peer *Peer) error
return nil
}
func (s *Server) handleInvCmd(inv *payload.Inventory, peer *Peer) error {
func (s *Server) handleInvCmd(inv *payload.Inventory, peer Peer) error {
if !inv.Type.Valid() {
return fmt.Errorf("invalid inventory type: %s", inv.Type)
}
@ -254,11 +252,16 @@ func (s *Server) handleInvCmd(inv *payload.Inventory, peer *Peer) error {
payload := payload.NewInventory(inv.Type, inv.Hashes)
msg := newMessage(s.net, cmdGetData, payload)
peer.send <- msg
peer.send(msg)
return nil
}
func (s *Server) handleBlockCmd(block *core.Block, peer Peer) error {
fmt.Println("received a block yyyyyyeeeeeehhhhh!")
return nil
}
func (s *Server) peerAlreadyConnected(addr net.Addr) bool {
// TODO: check for race conditions
//s.mtx.RLock()
@ -267,7 +270,7 @@ func (s *Server) peerAlreadyConnected(addr net.Addr) bool {
// What about ourself ^^
for peer := range s.peers {
if peer.conn.RemoteAddr().String() == addr.String() {
if peer.endpoint().String() == addr.String() {
return true
}
}
@ -282,13 +285,13 @@ func (s *Server) handleGetAddrCmd(msg *Message, peer *Peer) error {
return nil
}
func (s *Server) sendLoop(peer *Peer) {
func (s *Server) sendLoop(peer Peer) {
// TODO: check if this peer is still connected.
for {
getaddrMsg := newMessage(s.net, cmdGetAddr, nil)
peer.send <- getaddrMsg
peer.send(getaddrMsg)
time.Sleep(10 * time.Second)
time.Sleep(120 * time.Second)
}
}

View file

@ -0,0 +1,26 @@
package network
import (
"testing"
)
func TestHandleVersion(t *testing.T) {
// s := NewServer(ModeDevNet)
// go s.Start(":3000", nil)
// p := NewLocalPeer()
// s.register <- p
// version := payload.NewVersion(1337, p.endpoint().Port, "/NEO:0.0.0/.", 0, true)
// s.handleVersionCmd(version, p)
// if len(s.peers) != 1 {
// t.Fatalf("expecting the server to have %d peers got %d", 1, len(s.peers))
// }
// if p.id() != 1337 {
// t.Fatalf("expecting peer's id to be %d got %d", 1337, p._id)
// }
// if !p.verack() {
// t.Fatal("expecting peer to be verified")
// }
}

View file

@ -40,7 +40,7 @@ func connectToSeeds(s *Server, addrs []string) {
}
func handleConnection(s *Server, conn net.Conn) {
peer := NewPeer(conn)
peer := NewTCPPeer(conn)
s.register <- peer
// remove the peer from connected peers and cleanup the connection.

View file

@ -33,10 +33,10 @@ func (u *Uint256) UnmarshalBinary(b []byte) error {
return nil
}
// ToSlice return a byte slice of u.
// ToSlice returns a byte slice of u.
func (u Uint256) ToSlice() []byte {
b := make([]byte, 32)
for i := 0; i < 32; i++ {
for i := 0; i < len(b); i++ {
b[i] = byte(u[i])
}
return b
@ -48,3 +48,17 @@ func (u Uint256) String() string {
// Uint160 is a 20 byte long unsigned integer
type Uint160 [20]uint8
// ToSlice returns a byte slice of u.
func (u Uint160) ToSlice() []byte {
b := make([]byte, 20)
for i := 0; i < len(b); i++ {
b[i] = byte(u[i])
}
return b
}
// String implements the stringer interface.
func (u Uint160) String() string {
return hex.EncodeToString(u.ToSlice())
}