neoneo-go/pkg/network/server.go

303 lines
6.9 KiB
Go
Raw Normal View History

2018-01-26 18:04:13 +00:00
package network
import (
"fmt"
"log"
"net"
"os"
2018-01-26 20:39:34 +00:00
"strconv"
2018-01-28 10:12:05 +00:00
"time"
2018-01-27 15:00:28 +00:00
"github.com/anthdm/neo-go/pkg/network/payload"
2018-01-28 07:03:18 +00:00
"github.com/anthdm/neo-go/pkg/util"
2018-01-26 18:04:13 +00:00
)
// Package-level settings of the node.
const (
	// version is the node version; NewServer embeds it in the user agent.
	version = "2.6.0"

	// Official ports according to the protocol.
	portMainNet = 10333
	portTestNet = 20333

	// maxPeers is the upper bound on simultaneously registered peers.
	maxPeers = 50
)
type messageTuple struct {
peer Peer
2018-01-26 18:04:13 +00:00
msg *Message
}
// Server is the representation of a full working NEO TCP node.
type Server struct {
logger *log.Logger
2018-01-28 07:03:18 +00:00
// id of the server
id uint32
2018-01-26 20:39:34 +00:00
// the port the TCP listener is listening on.
port uint16
2018-01-26 18:04:13 +00:00
// userAgent of the server.
userAgent string
// The "magic" mode the server is currently running on.
// This can either be 0x00746e41 or 0x74746e41 for main or test net.
2018-01-26 20:39:34 +00:00
// Or 56753 to work with the docker privnet.
net NetMode
2018-01-26 18:04:13 +00:00
// map that holds all connected peers to this server.
peers map[Peer]bool
2018-01-31 19:11:08 +00:00
// channel for handling new registerd peers.
register chan Peer
// channel for safely removing and disconnecting peers.
unregister chan Peer
2018-01-26 18:04:13 +00:00
// channel for coordinating messages.
message chan messageTuple
// channel used to gracefull shutdown the server.
quit chan struct{}
// Whether this server will receive and forward messages.
relay bool
// TCP listener of the server
listener net.Listener
2018-01-31 19:11:08 +00:00
// RPC channels
versionCh chan versionTuple
getaddrCh chan getaddrTuple
invCh chan invTuple
addrCh chan addrTuple
2018-01-26 18:04:13 +00:00
}
// NewServer returns a pointer to a new server.
2018-01-26 20:39:34 +00:00
func NewServer(net NetMode) *Server {
2018-01-31 19:11:08 +00:00
logger := log.New(os.Stdout, "[NEO SERVER] :: ", 0)
2018-01-26 18:04:13 +00:00
2018-01-26 20:39:34 +00:00
if net != ModeTestNet && net != ModeMainNet && net != ModeDevNet {
logger.Fatalf("invalid network mode %d", net)
2018-01-26 18:04:13 +00:00
}
s := &Server{
2018-01-28 07:03:18 +00:00
id: util.RandUint32(1111111, 9999999),
userAgent: fmt.Sprintf("/NEO:%s/", version),
2018-01-26 18:04:13 +00:00
logger: logger,
peers: make(map[Peer]bool),
register: make(chan Peer),
unregister: make(chan Peer),
2018-01-26 18:04:13 +00:00
message: make(chan messageTuple),
2018-01-31 19:11:08 +00:00
relay: true, // currently relay is not handled.
2018-01-26 20:39:34 +00:00
net: net,
2018-01-26 18:04:13 +00:00
quit: make(chan struct{}),
2018-01-31 19:11:08 +00:00
versionCh: make(chan versionTuple),
getaddrCh: make(chan getaddrTuple),
invCh: make(chan invTuple),
addrCh: make(chan addrTuple),
2018-01-26 18:04:13 +00:00
}
return s
}
// Start run's the server.
func (s *Server) Start(port string, seeds []string) {
2018-01-26 20:39:34 +00:00
p, err := strconv.Atoi(port[1:len(port)])
if err != nil {
s.logger.Fatalf("could not convert port to integer: %s", err)
}
s.port = uint16(p)
2018-01-26 18:04:13 +00:00
fmt.Println(logo())
2018-01-28 07:03:18 +00:00
fmt.Println(string(s.userAgent))
fmt.Println("")
s.logger.Printf("NET: %s - TCP: %d - RELAY: %v - ID: %d",
s.net, int(s.port), s.relay, s.id)
2018-01-26 18:04:13 +00:00
go listenTCP(s, port)
if len(seeds) > 0 {
connectToSeeds(s, seeds)
}
s.loop()
}
// Stop asks the server to shut down gracefully by sending a signal on the
// quit channel. The send blocks until that signal is received.
func (s *Server) Stop() {
	s.quit <- struct{}{}
}
// shutdown closes the listener and disconnects every connected peer.
//
// It is called from the quit case of loop, i.e. on the same goroutine that
// receives from s.unregister. Sending the peers to that unbuffered channel
// from here would block forever because nothing else in this file receives
// from it, so the peers are disconnected and removed directly, mirroring
// what the unregister case of loop does.
func (s *Server) shutdown() {
	s.logger.Println("attemping a quitefull shutdown.")

	// The listener is assigned outside this function; guard against it
	// not having been set yet.
	if s.listener != nil {
		if err := s.listener.Close(); err != nil {
			s.logger.Printf("closing listener: %s", err)
		}
	}

	// Disconnect and remove all connected peers. Deleting map entries
	// while ranging over the map is safe in Go.
	for peer := range s.peers {
		peer.disconnect()
		delete(s.peers, peer)
		s.logger.Printf("peer %s disconnected", peer.addr())
	}
}
func (s *Server) loop() {
for {
select {
2018-01-31 19:11:08 +00:00
// When a new connection is been established, (by this server or remote node)
// its peer will be received on this channel.
// Any peer registration must happen via this channel.
2018-01-26 18:04:13 +00:00
case peer := <-s.register:
2018-01-31 19:11:08 +00:00
if len(s.peers) < maxPeers {
s.logger.Printf("peer registered from address %s", peer.addr())
s.peers[peer] = true
s.handlePeerConnected(peer)
}
2018-01-27 12:39:07 +00:00
2018-01-31 19:11:08 +00:00
// Unregister should take care of all the cleanup that has to be made.
2018-01-26 18:04:13 +00:00
case peer := <-s.unregister:
2018-01-27 07:37:07 +00:00
if _, ok := s.peers[peer]; ok {
peer.disconnect()
2018-01-27 07:37:07 +00:00
delete(s.peers, peer)
2018-01-31 19:11:08 +00:00
s.logger.Printf("peer %s disconnected", peer.addr())
}
// Process the received version and respond with a verack.
case t := <-s.versionCh:
if s.id == t.request.Nonce {
t.peer.disconnect()
2018-01-27 07:37:07 +00:00
}
2018-01-31 19:11:08 +00:00
if t.peer.addr().Port != t.request.Port {
t.peer.disconnect()
}
t.response <- newMessage(ModeDevNet, cmdVerack, nil)
// Process the getaddr cmd.
case t := <-s.getaddrCh:
t.response <- &Message{} // just for now.
// Process the addr cmd. Register peer will handle the maxPeers connected.
case t := <-s.addrCh:
for _, addr := range t.request.Addrs {
if !s.peerAlreadyConnected(addr.Addr) {
// TODO: this is not transport abstracted.
go connectToRemoteNode(s, addr.Addr.String())
}
}
t.response <- true
2018-01-29 18:17:49 +00:00
2018-01-31 19:11:08 +00:00
// Process inventories cmd.
case t := <-s.invCh:
if !t.request.Type.Valid() {
t.peer.disconnect()
break
}
if len(t.request.Hashes) == 0 {
t.peer.disconnect()
break
2018-01-26 18:04:13 +00:00
}
2018-01-29 18:17:49 +00:00
2018-01-31 19:11:08 +00:00
payload := payload.NewInventory(t.request.Type, t.request.Hashes)
msg := newMessage(s.net, cmdGetData, payload)
t.response <- msg
2018-01-26 18:04:13 +00:00
case <-s.quit:
s.shutdown()
}
}
}
2018-01-29 18:17:49 +00:00
// When a new peer is connected we send our version.
// No further communication should be made before both sides has received
// the versions of eachother.
2018-01-31 19:11:08 +00:00
func (s *Server) handlePeerConnected(p Peer) {
// TODO: get the blockheight of this server once core implemented this.
2018-01-28 07:03:18 +00:00
payload := payload.NewVersion(s.id, s.port, s.userAgent, 0, s.relay)
2018-01-27 15:00:28 +00:00
msg := newMessage(s.net, cmdVersion, payload)
2018-01-31 19:11:08 +00:00
p.callVersion(msg)
2018-01-26 18:04:13 +00:00
}
2018-01-31 19:11:08 +00:00
// versionTuple carries a version request to the server loop together with
// the peer that sent it and the channel on which the reply is delivered.
type versionTuple struct {
	peer     Peer
	request  *payload.Version
	response chan *Message
}
2018-01-31 19:11:08 +00:00
func (s *Server) handleVersionCmd(msg *Message, p Peer) *Message {
t := versionTuple{
peer: p,
request: msg.Payload.(*payload.Version),
response: make(chan *Message),
2018-01-29 18:17:49 +00:00
}
2018-01-27 07:37:07 +00:00
2018-01-31 19:11:08 +00:00
s.versionCh <- t
2018-01-27 12:39:07 +00:00
2018-01-31 19:11:08 +00:00
return <-t.response
}
2018-01-28 10:12:05 +00:00
2018-01-31 19:11:08 +00:00
type getaddrTuple struct {
peer Peer
request *Message
response chan *Message
2018-01-28 10:12:05 +00:00
}
2018-01-31 19:11:08 +00:00
func (s *Server) handleGetaddrCmd(msg *Message, p Peer) *Message {
t := getaddrTuple{
peer: p,
request: msg,
response: make(chan *Message),
2018-01-28 10:20:42 +00:00
}
2018-01-31 19:11:08 +00:00
s.getaddrCh <- t
return <-t.response
2018-01-30 10:56:36 +00:00
}
2018-01-31 19:11:08 +00:00
// invTuple carries an inventory request to the server loop together with
// the peer that sent it and the channel on which the reply is delivered.
type invTuple struct {
	peer     Peer
	request  *payload.Inventory
	response chan *Message
}
2018-01-30 10:56:36 +00:00
2018-01-31 19:11:08 +00:00
func (s *Server) handleInvCmd(msg *Message, p Peer) *Message {
t := invTuple{
request: msg.Payload.(*payload.Inventory),
response: make(chan *Message),
}
2018-01-30 10:56:36 +00:00
2018-01-31 19:11:08 +00:00
s.invCh <- t
2018-01-31 19:11:08 +00:00
return <-t.response
}
2018-01-30 10:56:36 +00:00
2018-01-31 19:11:08 +00:00
type addrTuple struct {
request *payload.AddressList
response chan bool
2018-01-27 12:39:07 +00:00
}
2018-01-31 19:11:08 +00:00
func (s *Server) handleAddrCmd(msg *Message, p Peer) bool {
t := addrTuple{
request: msg.Payload.(*payload.AddressList),
response: make(chan bool),
}
2018-01-28 13:59:32 +00:00
2018-01-31 19:11:08 +00:00
s.addrCh <- t
2018-01-28 17:42:22 +00:00
2018-01-31 19:11:08 +00:00
return <-t.response
}
// check if the addr is already connected to the server.
func (s *Server) peerAlreadyConnected(addr net.Addr) bool {
2018-01-28 13:59:32 +00:00
for peer := range s.peers {
2018-01-31 19:11:08 +00:00
if peer.addr().String() == addr.String() {
2018-01-28 13:59:32 +00:00
return true
}
}
return false
}
func (s *Server) sendLoop(peer Peer) {
2018-01-28 15:18:48 +00:00
// TODO: check if this peer is still connected.
2018-01-28 10:12:05 +00:00
for {
getaddrMsg := newMessage(s.net, cmdGetAddr, nil)
2018-01-31 19:11:08 +00:00
peer.callGetaddr(getaddrMsg)
2018-01-28 10:12:05 +00:00
time.Sleep(120 * time.Second)
2018-01-28 10:12:05 +00:00
}
}
2018-01-26 18:04:13 +00:00
// logo returns the multi-line ASCII-art banner that Start prints when the
// server starts. The raw string literal is emitted verbatim, including its
// leading and trailing newline.
func logo() string {
return `
_ ____________ __________
/ | / / ____/ __ \ / ____/ __ \
/ |/ / __/ / / / /_____/ / __/ / / /
/ /| / /___/ /_/ /_____/ /_/ / /_/ /
/_/ |_/_____/\____/ \____/\____/
`
}