diff --git a/pkg/network/payload/address.go b/pkg/network/payload/address.go index c891753e5..23eb50408 100644 --- a/pkg/network/payload/address.go +++ b/pkg/network/payload/address.go @@ -3,6 +3,7 @@ package payload import ( "io" "net" + "strconv" "time" "github.com/CityOfZion/neo-go/pkg/util" @@ -47,6 +48,15 @@ func (p *AddressAndTime) EncodeBinary(w io.Writer) error { return bw.Err } +// IPPortString makes a string from IP and port specified. +func (p *AddressAndTime) IPPortString() string { + var netip net.IP = make(net.IP, 16) + + copy(netip, p.IP[:]) + port := strconv.Itoa(int(p.Port)) + return netip.String() + ":" + port +} + // AddressList is a list with AddrAndTime. type AddressList struct { Addrs []*AddressAndTime diff --git a/pkg/network/payload/payload.go b/pkg/network/payload/payload.go index 90d5b0d74..e23f9955a 100644 --- a/pkg/network/payload/payload.go +++ b/pkg/network/payload/payload.go @@ -7,3 +7,22 @@ type Payload interface { EncodeBinary(io.Writer) error DecodeBinary(io.Reader) error } + +// NullPayload is a dummy payload with no fields. +type NullPayload struct { +} + +// NewNullPayload returns zero-sized stub payload. +func NewNullPayload() *NullPayload { + return &NullPayload{} +} + +// DecodeBinary implements the Payload interface. +func (p *NullPayload) DecodeBinary(r io.Reader) error { + return nil +} + +// EncodeBinary implements the Payload interface. +func (p *NullPayload) EncodeBinary(r io.Writer) error { + return nil +} diff --git a/pkg/network/server.go b/pkg/network/server.go index a91c331cb..007f9ff3e 100644 --- a/pkg/network/server.go +++ b/pkg/network/server.go @@ -48,6 +48,7 @@ type ( lock sync.RWMutex peers map[Peer]bool + addrReq chan *Message register chan Peer unregister chan peerDrop quit chan struct{} @@ -66,6 +67,7 @@ func NewServer(config ServerConfig, chain core.Blockchainer) *Server { chain: chain, id: rand.Uint32(), quit: make(chan struct{}), + addrReq: make(chan *Message, 1), register: make(chan Peer), unregister: make(chan peerDrop), peers: make(map[Peer]bool), @@ -123,6 +125,15 @@ func (s *Server) run() { if c < minPeers { s.discovery.RequestRemote(maxPeers - c) } + if s.discovery.PoolCount() < minPoolCount { + select { + case s.addrReq <- NewMessage(s.Net, CMDGetAddr, payload.NewNullPayload()): + // sent request + default: + // we have one in the queue already that is + // gonna be served by some worker when it's ready + } + } select { case <-s.quit: s.transport.Close() @@ -184,6 +195,8 @@ func (s *Server) startProtocol(p Peer) { case err := <-p.Done(): s.unregister <- peerDrop{p, err} return + case m := <-s.addrReq: + p.WriteMsg(m) case <-timer.C: // Try to sync in headers and block with the peer if his block height is higher then ours. if p.Version().StartHeight > s.chain.BlockHeight() { @@ -253,6 +266,14 @@ func (s *Server) handleInvCmd(p Peer, inv *payload.Inventory) error { return p.WriteMsg(NewMessage(s.Net, CMDGetData, payload)) } +// handleAddrCmd will process received addresses. +func (s *Server) handleAddrCmd(p Peer, addrs *payload.AddressList) error { + for _, a := range addrs.Addrs { + s.discovery.BackFill(a.IPPortString()) + } + return nil +} + // requestHeaders will send a getheaders message to the peer. // The peer will respond with headers op to a count of 2000. func (s *Server) requestHeaders(p Peer) { @@ -292,6 +313,9 @@ func (s *Server) handleMessage(peer Peer, msg *Message) error { } switch msg.CommandType() { + case CMDAddr: + addrs := msg.Payload.(*payload.AddressList) + return s.handleAddrCmd(peer, addrs) case CMDVersion: version := msg.Payload.(*payload.Version) return s.handleVersionCmd(peer, version)