handle inventory message.

This commit is contained in:
anthdm 2018-01-28 18:42:22 +01:00
parent 5799cdb3ea
commit 3cc2a6381b
4 changed files with 36 additions and 41 deletions

View file

@ -165,27 +165,26 @@ func (m *Message) decodePayload(r io.Reader) error {
} }
// Compare the checksum of the payload. // Compare the checksum of the payload.
fmt.Println(len(pbuf))
if !compareChecksum(m.Checksum, pbuf) { if !compareChecksum(m.Checksum, pbuf) {
return errors.New("checksum mismatch error") return errors.New("checksum mismatch error")
} }
rr := bytes.NewReader(pbuf) r = bytes.NewReader(pbuf)
var p payload.Payload var p payload.Payload
switch m.commandType() { switch m.commandType() {
case cmdVersion: case cmdVersion:
p = &payload.Version{} p = &payload.Version{}
if err := p.DecodeBinary(rr); err != nil { if err := p.DecodeBinary(r); err != nil {
return err
}
case cmdInv:
p = &payload.Inventory{}
if err := p.DecodeBinary(r); err != nil {
return err return err
} }
// case cmdInv:
// p = &payload.Inventory{}
// if err := p.UnmarshalBinary(pbuf); err != nil {
// return err
// }
case cmdAddr: case cmdAddr:
p = &payload.AddressList{} p = &payload.AddressList{}
if err := p.DecodeBinary(rr); err != nil { if err := p.DecodeBinary(r); err != nil {
return err return err
} }
} }

View file

@ -1,8 +1,8 @@
package payload package payload
import ( import (
"bytes"
"encoding/binary" "encoding/binary"
"io"
. "github.com/anthdm/neo-go/pkg/util" . "github.com/anthdm/neo-go/pkg/util"
) )
@ -42,19 +42,23 @@ type Inventory struct {
Hash Uint256 Hash Uint256
} }
// UnmarshalBinary implements the Payloader interface. // DecodeBinary implements the Payload interface.
func (p *Inventory) UnmarshalBinary(b []byte) error { func (p *Inventory) DecodeBinary(r io.Reader) error {
// TODO: what byte is [1:2] ? // TODO: is there a list len?
// We have 1 byte for the type which is uint8 and 32 for the hash. // The first byte is the type the second byte seems to be
// There is 1 byte left over. // always one on docker privnet.
binary.Read(bytes.NewReader(b), binary.LittleEndian, &p.Type) var listLen uint8
p.Hash.UnmarshalBinary(b[2:len(b)]) err := binary.Read(r, binary.LittleEndian, &p.Type)
return nil err = binary.Read(r, binary.LittleEndian, &listLen)
err = binary.Read(r, binary.LittleEndian, &p.Hash)
return err
} }
// MarshalBinary implements the Payloader interface. // EncodeBinary implements the Payload interface.
func (p *Inventory) MarshalBinary() ([]byte, error) { func (p *Inventory) EncodeBinary(w io.Writer) error {
return nil, nil // TODO
return nil
} }
// Size implements the Payloader interface. // Size implements the Payloader interface.

View file

@ -128,14 +128,11 @@ func (s *Server) shutdown() {
// disconnect and remove all connected peers. // disconnect and remove all connected peers.
for peer := range s.peers { for peer := range s.peers {
peer.conn.Close()
s.unregister <- peer s.unregister <- peer
} }
} }
func (s *Server) disconnect(p *Peer) { func (s *Server) disconnect(p *Peer) {
p.conn.Close()
close(p.send)
s.unregister <- p s.unregister <- p
} }
@ -156,7 +153,10 @@ func (s *Server) loop() {
peer.send <- resp peer.send <- resp
} }
case peer := <-s.unregister: case peer := <-s.unregister:
// unregister should take care of all the cleanup that has to be made.
if _, ok := s.peers[peer]; ok { if _, ok := s.peers[peer]; ok {
peer.conn.Close()
close(peer.send)
delete(s.peers, peer) delete(s.peers, peer)
s.logger.Printf("peer %s disconnected", peer.conn.RemoteAddr()) s.logger.Printf("peer %s disconnected", peer.conn.RemoteAddr())
} }
@ -231,19 +231,20 @@ func (s *Server) handleVersionCmd(v *payload.Version, peer *Peer) error {
// When the remote node reveals its known peers we try to connect to all of them. // When the remote node reveals its known peers we try to connect to all of them.
func (s *Server) handleAddrCmd(addrList *payload.AddressList, peer *Peer) error { func (s *Server) handleAddrCmd(addrList *payload.AddressList, peer *Peer) error {
for _, addr := range addrList.Addrs { for _, addr := range addrList.Addrs {
fmt.Println(addr) if !s.peerAlreadyConnected(addr.Addr) {
// if !s.addrAlreadyConnected(addr.Addr) { go connectToRemoteNode(s, addr.Addr.String())
// go connectToRemoteNode(s, addr.Addr.String()) }
// }
} }
return nil return nil
} }
func (s *Server) addrAlreadyConnected(addr net.Addr) bool { func (s *Server) peerAlreadyConnected(addr net.Addr) bool {
// TODO: check for race conditions // TODO: check for race conditions
//s.mtx.RLock() //s.mtx.RLock()
//defer s.mtx.RUnlock() //defer s.mtx.RUnlock()
// What about ourself ^^
for peer := range s.peers { for peer := range s.peers {
if peer.conn.RemoteAddr().String() == addr.String() { if peer.conn.RemoteAddr().String() == addr.String() {
return true return true
@ -256,16 +257,7 @@ func (s *Server) addrAlreadyConnected(addr net.Addr) bool {
// providing information about the other nodes in the network. // providing information about the other nodes in the network.
// e.g. this server's connected peers. // e.g. this server's connected peers.
func (s *Server) handleGetAddrCmd(msg *Message, peer *Peer) error { func (s *Server) handleGetAddrCmd(msg *Message, peer *Peer) error {
// payload := NewAddrPayload() // TODO
// b, err := payload.encode()
// if err != nil {
// return err
// }
// var addrList []AddrWithTimestamp
// for peer := range s.peers {
// addrList = append(addrList, newAddrWithTimestampFromPeer(peer))
// }
return nil return nil
} }

View file

@ -23,7 +23,7 @@ func listenTCP(s *Server, port string) error {
func connectToRemoteNode(s *Server, address string) { func connectToRemoteNode(s *Server, address string) {
conn, err := net.Dial("tcp", address) conn, err := net.Dial("tcp", address)
if err != nil { if err != nil {
s.logger.Printf("failed to connects to remote node %s", address) s.logger.Printf("failed to connect to remote node %s", address)
if conn != nil { if conn != nil {
conn.Close() conn.Close()
} }
@ -45,8 +45,8 @@ func handleConnection(s *Server, conn net.Conn, initiated bool) {
// remove the peer from connected peers and cleanup the connection. // remove the peer from connected peers and cleanup the connection.
defer func() { defer func() {
// all cleanup will happen in the server's loop when unregister is received.
s.unregister <- peer s.unregister <- peer
conn.Close()
}() }()
// Start a goroutine that will handle all writes to the registered peer. // Start a goroutine that will handle all writes to the registered peer.