Finished version + verack.

This commit is contained in:
Anthony De Meulemeester 2018-02-01 15:16:59 +01:00 committed by GitHub
commit 9f7e2083e1
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
6 changed files with 195 additions and 38 deletions

View file

@ -0,0 +1,54 @@
package payload
import (
"encoding/binary"
"io"
. "github.com/anthdm/neo-go/pkg/util"
)
// HashStartStop contains fields and methods to be shared with the
// "GetBlocks" and "GetHeaders" payload.
type HashStartStop struct {
// hash of latest block that node requests
HashStart []Uint256
// hash of last block that node requests
HashStop Uint256
}
// DecodeBinary implements the payload interface.
func (p *HashStartStop) DecodeBinary(r io.Reader) error {
var lenStart uint8
err := binary.Read(r, binary.LittleEndian, &lenStart)
p.HashStart = make([]Uint256, lenStart)
err = binary.Read(r, binary.LittleEndian, &p.HashStart)
err = binary.Read(r, binary.LittleEndian, &p.HashStop)
return err
}
// EncodeBinary implements the payload interface.
func (p *HashStartStop) EncodeBinary(w io.Writer) error {
err := binary.Write(w, binary.LittleEndian, uint8(len(p.HashStart)))
err = binary.Write(w, binary.LittleEndian, p.HashStart)
err = binary.Write(w, binary.LittleEndian, p.HashStop)
return err
}
// Size implements the payload interface.
func (p *HashStartStop) Size() uint32 { return 0 }
// GetBlocks payload
type GetBlocks struct {
HashStartStop
}
// NewGetBlocks return a pointer to a GetBlocks object.
func NewGetBlocks(start []Uint256, stop Uint256) *GetBlocks {
p := &GetBlocks{}
p.HashStart = start
p.HashStop = stop
return p
}

View file

@ -0,0 +1,37 @@
package payload
import (
"bytes"
"crypto/sha256"
"reflect"
"testing"
. "github.com/anthdm/neo-go/pkg/util"
)
func TestGetBlocksEncodeDecode(t *testing.T) {
start := []Uint256{
sha256.Sum256([]byte("a")),
sha256.Sum256([]byte("b")),
}
stop := sha256.Sum256([]byte("c"))
p := NewGetBlocks(start, stop)
buf := new(bytes.Buffer)
if err := p.EncodeBinary(buf); err != nil {
t.Fatal(err)
}
if have, want := buf.Len(), 1+64+32; have != want {
t.Fatalf("expecting a length of %d got %d", want, have)
}
pDecode := &GetBlocks{}
if err := pDecode.DecodeBinary(buf); err != nil {
t.Fatal(err)
}
if !reflect.DeepEqual(p, pDecode) {
t.Fatalf("expecting both getblocks payloads to be equal %v and %v", p, pDecode)
}
}

View file

@ -0,0 +1,17 @@
package payload
import "github.com/anthdm/neo-go/pkg/util"
// GetHeaders payload is the same as the "GetBlocks" payload.
type GetHeaders struct {
HashStartStop
}
// NewGetHeaders return a pointer to a GetHeaders object.
func NewGetHeaders(start []util.Uint256, stop util.Uint256) *GetHeaders {
p := &GetHeaders{}
p.HashStart = start
p.HashStop = stop
return p
}

View file

@ -0,0 +1,37 @@
package payload
import (
"bytes"
"crypto/sha256"
"reflect"
"testing"
"github.com/anthdm/neo-go/pkg/util"
)
func TestGetHeadersEncodeDecode(t *testing.T) {
start := []util.Uint256{
sha256.Sum256([]byte("a")),
sha256.Sum256([]byte("b")),
}
stop := sha256.Sum256([]byte("c"))
p := NewGetHeaders(start, stop)
buf := new(bytes.Buffer)
if err := p.EncodeBinary(buf); err != nil {
t.Fatal(err)
}
if have, want := buf.Len(), 1+64+32; have != want {
t.Fatalf("expecting a length of %d got %d", want, have)
}
pDecode := &GetHeaders{}
if err := pDecode.DecodeBinary(buf); err != nil {
t.Fatal(err)
}
if !reflect.DeepEqual(p, pDecode) {
t.Fatalf("expecting both getheaders payloads to be equal %v and %v", p, pDecode)
}
}

View file

@ -9,7 +9,6 @@ import (
type Peer interface { type Peer interface {
id() uint32 id() uint32
addr() util.Endpoint addr() util.Endpoint
verack() bool
disconnect() disconnect()
callVersion(*Message) callVersion(*Message)
callGetaddr(*Message) callGetaddr(*Message)
@ -20,7 +19,6 @@ type Peer interface {
type LocalPeer struct { type LocalPeer struct {
s *Server s *Server
nonce uint32 nonce uint32
isVerack bool
endpoint util.Endpoint endpoint util.Endpoint
} }
@ -39,6 +37,5 @@ func (p *LocalPeer) callGetaddr(msg *Message) {
} }
func (p *LocalPeer) id() uint32 { return p.nonce } func (p *LocalPeer) id() uint32 { return p.nonce }
func (p *LocalPeer) verack() bool { return p.isVerack }
func (p *LocalPeer) addr() util.Endpoint { return p.endpoint } func (p *LocalPeer) addr() util.Endpoint { return p.endpoint }
func (p *LocalPeer) disconnect() {} func (p *LocalPeer) disconnect() {}

View file

@ -22,6 +22,7 @@ func listenTCP(s *Server, port int) error {
if err != nil { if err != nil {
return err return err
} }
go handleConnection(s, conn) go handleConnection(s, conn)
} }
} }
@ -54,8 +55,10 @@ func handleConnection(s *Server, conn net.Conn) {
s.unregister <- peer s.unregister <- peer
}() }()
// Start a goroutine that will handle all writes to the registered peer. // Start a goroutine that will handle all outgoing messages.
go peer.writeLoop() go peer.writeLoop()
// Start a goroutine that will handle all incomming messages.
go handleMessage(s, peer)
// Read from the connection and decode it into a Message ready for processing. // Read from the connection and decode it into a Message ready for processing.
buf := make([]byte, 1024) buf := make([]byte, 1024)
@ -71,12 +74,20 @@ func handleConnection(s *Server, conn net.Conn) {
s.logger.Printf("decode error %s", err) s.logger.Printf("decode error %s", err)
break break
} }
handleMessage(msg, s, peer)
peer.receive <- msg
} }
} }
// handleMessage hands the message received from a TCP connection over to the server. // handleMessage hands the message received from a TCP connection over to the server.
func handleMessage(msg *Message, s *Server, p *TCPPeer) { func handleMessage(s *Server, p *TCPPeer) {
// Disconnect the peer when we break out of the loop.
defer func() {
p.disconnect()
}()
for {
msg := <-p.receive
command := msg.commandType() command := msg.commandType()
s.logger.Printf("IN :: %d :: %s :: %v", p.id(), command, msg) s.logger.Printf("IN :: %d :: %s :: %v", p.id(), command, msg)
@ -84,9 +95,16 @@ func handleMessage(msg *Message, s *Server, p *TCPPeer) {
switch command { switch command {
case cmdVersion: case cmdVersion:
resp := s.handleVersionCmd(msg, p) resp := s.handleVersionCmd(msg, p)
p.isVerack = true
p.nonce = msg.Payload.(*payload.Version).Nonce p.nonce = msg.Payload.(*payload.Version).Nonce
p.send <- resp p.send <- resp
// after sending our version we want a "verack" and nothing else.
msg := <-p.receive
if msg.commandType() != cmdVerack {
break
}
// we can start the protocol now.
go s.sendLoop(p)
case cmdAddr: case cmdAddr:
s.handleAddrCmd(msg, p) s.handleAddrCmd(msg, p)
case cmdGetAddr: case cmdGetAddr:
@ -98,12 +116,13 @@ func handleMessage(msg *Message, s *Server, p *TCPPeer) {
case cmdConsensus: case cmdConsensus:
case cmdTX: case cmdTX:
case cmdVerack: case cmdVerack:
go s.sendLoop(p) // disconnect the peer, verack should already be handled.
break
case cmdGetHeaders: case cmdGetHeaders:
case cmdGetBlocks: case cmdGetBlocks:
case cmdGetData: case cmdGetData:
case cmdHeaders: case cmdHeaders:
default: }
} }
} }
@ -118,8 +137,8 @@ type TCPPeer struct {
endpoint util.Endpoint endpoint util.Endpoint
// channel to coordinate messages writen back to the connection. // channel to coordinate messages writen back to the connection.
send chan *Message send chan *Message
// whether this peers version was acknowledged. // channel to receive from underlying connection.
isVerack bool receive chan *Message
} }
// NewTCPPeer returns a pointer to a TCP Peer. // NewTCPPeer returns a pointer to a TCP Peer.
@ -129,6 +148,7 @@ func NewTCPPeer(conn net.Conn, s *Server) *TCPPeer {
return &TCPPeer{ return &TCPPeer{
conn: conn, conn: conn,
send: make(chan *Message), send: make(chan *Message),
receive: make(chan *Message),
endpoint: e, endpoint: e,
s: s, s: s,
} }
@ -148,11 +168,6 @@ func (p *TCPPeer) addr() util.Endpoint {
return p.endpoint return p.endpoint
} }
// verack implements the peer interface
func (p *TCPPeer) verack() bool {
return p.isVerack
}
// callGetaddr will send the "getaddr" command to the remote. // callGetaddr will send the "getaddr" command to the remote.
func (p *TCPPeer) callGetaddr(msg *Message) { func (p *TCPPeer) callGetaddr(msg *Message) {
p.send <- msg p.send <- msg