From 3cc2a6381ba82363b25b7f472e3ffa7a64da254a Mon Sep 17 00:00:00 2001 From: anthdm Date: Sun, 28 Jan 2018 18:42:22 +0100 Subject: [PATCH] handle inventory message. --- pkg/network/message.go | 17 ++++++++--------- pkg/network/payload/inventory.go | 28 ++++++++++++++++------------ pkg/network/server.go | 28 ++++++++++------------------ pkg/network/tcp.go | 4 ++-- 4 files changed, 36 insertions(+), 41 deletions(-) diff --git a/pkg/network/message.go b/pkg/network/message.go index 596ef664f..55d36d927 100644 --- a/pkg/network/message.go +++ b/pkg/network/message.go @@ -165,27 +165,26 @@ func (m *Message) decodePayload(r io.Reader) error { } // Compare the checksum of the payload. - fmt.Println(len(pbuf)) if !compareChecksum(m.Checksum, pbuf) { return errors.New("checksum mismatch error") } - rr := bytes.NewReader(pbuf) + r = bytes.NewReader(pbuf) var p payload.Payload switch m.commandType() { case cmdVersion: p = &payload.Version{} - if err := p.DecodeBinary(rr); err != nil { + if err := p.DecodeBinary(r); err != nil { + return err + } + case cmdInv: + p = &payload.Inventory{} + if err := p.DecodeBinary(r); err != nil { return err } - // case cmdInv: - // p = &payload.Inventory{} - // if err := p.UnmarshalBinary(pbuf); err != nil { - // return err - // } case cmdAddr: p = &payload.AddressList{} - if err := p.DecodeBinary(rr); err != nil { + if err := p.DecodeBinary(r); err != nil { return err } } diff --git a/pkg/network/payload/inventory.go b/pkg/network/payload/inventory.go index ca508b4bb..980433865 100644 --- a/pkg/network/payload/inventory.go +++ b/pkg/network/payload/inventory.go @@ -1,8 +1,8 @@ package payload import ( - "bytes" "encoding/binary" + "io" . "github.com/anthdm/neo-go/pkg/util" ) @@ -42,19 +42,23 @@ type Inventory struct { Hash Uint256 } -// UnmarshalBinary implements the Payloader interface. -func (p *Inventory) UnmarshalBinary(b []byte) error { - // TODO: what byte is [1:2] ? - // We have 1 byte for the type which is uint8 and 32 for the hash. - // There is 1 byte left over. - binary.Read(bytes.NewReader(b), binary.LittleEndian, &p.Type) - p.Hash.UnmarshalBinary(b[2:len(b)]) - return nil +// DecodeBinary implements the Payload interface. +func (p *Inventory) DecodeBinary(r io.Reader) error { + // TODO: is there a list len? + // The first byte is the type the second byte seems to be + // always one on docker privnet. + var listLen uint8 + err := binary.Read(r, binary.LittleEndian, &p.Type) + err = binary.Read(r, binary.LittleEndian, &listLen) + err = binary.Read(r, binary.LittleEndian, &p.Hash) + + return err } -// MarshalBinary implements the Payloader interface. -func (p *Inventory) MarshalBinary() ([]byte, error) { - return nil, nil +// EncodeBinary implements the Payload interface. +func (p *Inventory) EncodeBinary(w io.Writer) error { + // TODO + return nil } // Size implements the Payloader interface. diff --git a/pkg/network/server.go b/pkg/network/server.go index 64a4fb716..6b3494258 100644 --- a/pkg/network/server.go +++ b/pkg/network/server.go @@ -128,14 +128,11 @@ func (s *Server) shutdown() { // disconnect and remove all connected peers. for peer := range s.peers { - peer.conn.Close() s.unregister <- peer } } func (s *Server) disconnect(p *Peer) { - p.conn.Close() - close(p.send) s.unregister <- p } @@ -156,7 +153,10 @@ func (s *Server) loop() { peer.send <- resp } case peer := <-s.unregister: + // unregister should take care of all the cleanup that has to be made. if _, ok := s.peers[peer]; ok { + peer.conn.Close() + close(peer.send) delete(s.peers, peer) s.logger.Printf("peer %s disconnected", peer.conn.RemoteAddr()) } @@ -231,19 +231,20 @@ func (s *Server) handleVersionCmd(v *payload.Version, peer *Peer) error { // When the remote node reveals its known peers we try to connect to all of them. func (s *Server) handleAddrCmd(addrList *payload.AddressList, peer *Peer) error { for _, addr := range addrList.Addrs { - fmt.Println(addr) - // if !s.addrAlreadyConnected(addr.Addr) { - // go connectToRemoteNode(s, addr.Addr.String()) - // } + if !s.peerAlreadyConnected(addr.Addr) { + go connectToRemoteNode(s, addr.Addr.String()) + } } return nil } -func (s *Server) addrAlreadyConnected(addr net.Addr) bool { +func (s *Server) peerAlreadyConnected(addr net.Addr) bool { // TODO: check for race conditions //s.mtx.RLock() //defer s.mtx.RUnlock() + // What about ourself ^^ + for peer := range s.peers { if peer.conn.RemoteAddr().String() == addr.String() { return true @@ -256,16 +257,7 @@ func (s *Server) addrAlreadyConnected(addr net.Addr) bool { // providing information about the other nodes in the network. // e.g. this server's connected peers. func (s *Server) handleGetAddrCmd(msg *Message, peer *Peer) error { - // payload := NewAddrPayload() - // b, err := payload.encode() - // if err != nil { - // return err - // } - // var addrList []AddrWithTimestamp - // for peer := range s.peers { - // addrList = append(addrList, newAddrWithTimestampFromPeer(peer)) - // } - + // TODO return nil } diff --git a/pkg/network/tcp.go b/pkg/network/tcp.go index 06249d132..457830f8b 100644 --- a/pkg/network/tcp.go +++ b/pkg/network/tcp.go @@ -23,7 +23,7 @@ func listenTCP(s *Server, port string) error { func connectToRemoteNode(s *Server, address string) { conn, err := net.Dial("tcp", address) if err != nil { - s.logger.Printf("failed to connects to remote node %s", address) + s.logger.Printf("failed to connect to remote node %s", address) if conn != nil { conn.Close() } @@ -45,8 +45,8 @@ func handleConnection(s *Server, conn net.Conn, initiated bool) { // remove the peer from connected peers and cleanup the connection. defer func() { + // all cleanup will happen in the server's loop when unregister is received. s.unregister <- peer - conn.Close() }() // Start a goroutine that will handle all writes to the registered peer.