neoneo-go/pkg/network/server.go

252 lines
6.1 KiB
Go
Raw Normal View History

2018-01-26 18:04:13 +00:00
package network
import (
"errors"
"fmt"
"log"
"net"
"os"
2018-01-26 20:39:34 +00:00
"strconv"
2018-01-27 15:00:28 +00:00
"github.com/anthdm/neo-go/pkg/network/payload"
2018-01-28 07:03:18 +00:00
"github.com/anthdm/neo-go/pkg/util"
2018-01-26 18:04:13 +00:00
)
const (
	// version is the node version; NewServer embeds it in the user agent.
	version = "2.6.0"

	// Official ports according to the protocol.
	portMainNet = 10333
	portTestNet = 20333
)
var (
	// rpcLogger is used for debugging RPC messages between nodes.
	// It writes to stdout with the "RPC :: " prefix and no timestamp flags.
	rpcLogger = log.New(os.Stdout, "RPC :: ", 0)
)
// messageTuple pairs a received message with the peer it belongs to, so
// both can travel together over the server's message channel.
type messageTuple struct {
	// peer associated with msg.
	peer *Peer
	// msg is the message handed to processMessage.
	msg *Message
}
// Server is the representation of a full working NEO TCP node.
type Server struct {
logger *log.Logger
2018-01-28 07:03:18 +00:00
// id of the server
id uint32
2018-01-26 20:39:34 +00:00
// the port the TCP listener is listening on.
port uint16
2018-01-26 18:04:13 +00:00
// userAgent of the server.
userAgent string
// The "magic" mode the server is currently running on.
// This can either be 0x00746e41 or 0x74746e41 for main or test net.
2018-01-26 20:39:34 +00:00
// Or 56753 to work with the docker privnet.
net NetMode
2018-01-26 18:04:13 +00:00
// map that holds all connected peers to this server.
peers map[*Peer]bool
register chan *Peer
unregister chan *Peer
// channel for coordinating messages.
message chan messageTuple
// channel used to gracefull shutdown the server.
quit chan struct{}
// Whether this server will receive and forward messages.
relay bool
// TCP listener of the server
listener net.Listener
}
// NewServer returns a pointer to a new server.
2018-01-26 20:39:34 +00:00
func NewServer(net NetMode) *Server {
2018-01-26 18:04:13 +00:00
logger := log.New(os.Stdout, "NEO SERVER :: ", 0)
2018-01-26 20:39:34 +00:00
if net != ModeTestNet && net != ModeMainNet && net != ModeDevNet {
logger.Fatalf("invalid network mode %d", net)
2018-01-26 18:04:13 +00:00
}
s := &Server{
2018-01-27 07:37:07 +00:00
// It is important to have this user agent correct. Otherwise we will get
// disconnected.
2018-01-28 07:03:18 +00:00
id: util.RandUint32(1111111, 9999999),
2018-01-27 07:37:07 +00:00
userAgent: fmt.Sprintf("\v/NEO:%s/", version),
2018-01-26 18:04:13 +00:00
logger: logger,
peers: make(map[*Peer]bool),
register: make(chan *Peer),
unregister: make(chan *Peer),
message: make(chan messageTuple),
relay: true,
2018-01-26 20:39:34 +00:00
net: net,
2018-01-26 18:04:13 +00:00
quit: make(chan struct{}),
}
return s
}
// Start run's the server.
func (s *Server) Start(port string, seeds []string) {
2018-01-26 20:39:34 +00:00
p, err := strconv.Atoi(port[1:len(port)])
if err != nil {
s.logger.Fatalf("could not convert port to integer: %s", err)
}
s.port = uint16(p)
2018-01-26 18:04:13 +00:00
fmt.Println(logo())
2018-01-28 07:03:18 +00:00
fmt.Println(string(s.userAgent))
fmt.Println("")
s.logger.Printf("NET: %s - TCP: %d - RELAY: %v - ID: %d",
s.net, int(s.port), s.relay, s.id)
2018-01-26 18:04:13 +00:00
go listenTCP(s, port)
if len(seeds) > 0 {
connectToSeeds(s, seeds)
}
s.loop()
}
// Stop asks the server to shut down gracefully by signalling its quit
// channel. The channel is unbuffered, so Stop blocks until the main loop
// picks the signal up.
func (s *Server) Stop() {
	var signal struct{}
	s.quit <- signal
}
// shutdown closes the TCP listener, then disconnects and forgets every
// connected peer.
//
// It is invoked from loop's quit case, i.e. on the goroutine that is the
// receiver of s.unregister in this file. Sending on that unbuffered channel
// from here would therefore block forever, so peers are removed from s.peers
// directly instead.
func (s *Server) shutdown() {
	s.logger.Println("attemping a quitefull shutdown.")

	// listener is not set by NewServer; guard against a shutdown that
	// arrives before it has been assigned.
	if s.listener != nil {
		s.listener.Close()
	}

	// disconnect and remove all connected peers. Deleting entries while
	// ranging over a map is safe in Go.
	for peer := range s.peers {
		peer.conn.Close()
		delete(s.peers, peer)
		s.logger.Printf("peer %s disconnected", peer.conn.RemoteAddr())
	}
}
2018-01-27 07:37:07 +00:00
// disconnect closes the peer's connection and its send channel, then hands
// the peer to the unregister channel so the main loop can drop it.
//
// The unregister channel is unbuffered: this call blocks until the main loop
// receives the peer, so it must not be called synchronously from that loop.
func (s *Server) disconnect(p *Peer) {
	conn, outbox := p.conn, p.send

	conn.Close()
	close(outbox)

	s.unregister <- p
}
2018-01-26 18:04:13 +00:00
func (s *Server) loop() {
for {
select {
case peer := <-s.register:
s.logger.Printf("peer registered from address %s", peer.conn.RemoteAddr())
2018-01-26 20:39:34 +00:00
2018-01-27 12:39:07 +00:00
s.peers[peer] = true
2018-01-26 20:39:34 +00:00
// only respond with the version mesage if the peer initiated the connection.
if peer.initiator {
2018-01-26 20:39:34 +00:00
resp, err := s.handlePeerConnected()
if err != nil {
s.logger.Fatalf("handling initial peer connection failed: %s", err)
}
peer.send <- resp
2018-01-26 18:04:13 +00:00
}
case peer := <-s.unregister:
2018-01-27 07:37:07 +00:00
if _, ok := s.peers[peer]; ok {
delete(s.peers, peer)
s.logger.Printf("peer %s disconnected", peer.conn.RemoteAddr())
}
2018-01-26 18:04:13 +00:00
case tuple := <-s.message:
if err := s.processMessage(tuple.msg, tuple.peer); err != nil {
s.logger.Fatalf("failed to process message: %s", err)
2018-01-27 12:39:07 +00:00
s.disconnect(tuple.peer)
2018-01-26 18:04:13 +00:00
}
case <-s.quit:
s.shutdown()
}
}
}
// TODO: unregister peers on error.
// processMessage processes the received message from a remote node.
func (s *Server) processMessage(msg *Message, peer *Peer) error {
2018-01-28 07:03:18 +00:00
rpcLogger.Printf("IN :: %s", msg.commandType())
if msg.Length > 0 {
rpcLogger.Printf("IN :: %+v", msg.Payload)
}
2018-01-26 20:39:34 +00:00
2018-01-26 18:04:13 +00:00
switch msg.commandType() {
case cmdVersion:
2018-01-27 15:47:43 +00:00
return s.handleVersionCmd(msg.Payload.(*payload.Version), peer)
2018-01-26 18:04:13 +00:00
case cmdVerack:
case cmdGetAddr:
2018-01-27 12:39:07 +00:00
return s.handleGetAddrCmd(msg, peer)
2018-01-26 18:04:13 +00:00
case cmdAddr:
case cmdGetHeaders:
case cmdHeaders:
case cmdGetBlocks:
case cmdInv:
case cmdGetData:
case cmdBlock:
case cmdTX:
default:
return errors.New("invalid RPC command received: " + string(msg.commandType()))
}
return nil
}
// When a new peer is connected we respond with the version command.
// No further communication should been made before both sides has received
// the version of eachother.
func (s *Server) handlePeerConnected() (*Message, error) {
2018-01-28 07:03:18 +00:00
payload := payload.NewVersion(s.id, s.port, s.userAgent, 0, s.relay)
2018-01-27 15:00:28 +00:00
msg := newMessage(s.net, cmdVersion, payload)
2018-01-26 18:04:13 +00:00
return msg, nil
}
2018-01-27 07:37:07 +00:00
// Version declares the server's version.
2018-01-27 15:00:28 +00:00
func (s *Server) handleVersionCmd(v *payload.Version, peer *Peer) error {
2018-01-26 18:04:13 +00:00
// TODO: check version and verify to trust that node.
2018-01-28 07:03:18 +00:00
payload := payload.NewVersion(s.id, s.port, s.userAgent, 0, s.relay)
2018-01-27 12:39:07 +00:00
// we respond with our version.
2018-01-27 15:00:28 +00:00
versionMsg := newMessage(s.net, cmdVersion, payload)
2018-01-27 07:37:07 +00:00
peer.send <- versionMsg
2018-01-27 12:39:07 +00:00
// we respond with a verack, we successfully received peer's version
// at this point.
2018-01-27 07:37:07 +00:00
peer.verack = true
verackMsg := newMessage(s.net, cmdVerack, nil)
peer.send <- verackMsg
2018-01-27 12:39:07 +00:00
return nil
}
// After receiving the "getaddr" the server needs to respond with an "addr" message.
// providing information about the other nodes in the network.
// e.g. this server's connected peers.
func (s *Server) handleGetAddrCmd(msg *Message, peer *Peer) error {
// payload := NewAddrPayload()
// b, err := payload.encode()
// if err != nil {
// return err
// }
2018-01-27 15:47:43 +00:00
// var addrList []AddrWithTimestamp
// for peer := range s.peers {
// addrList = append(addrList, newAddrWithTimestampFromPeer(peer))
// }
2018-01-27 12:39:07 +00:00
return nil
2018-01-26 18:04:13 +00:00
}
// logo returns the ASCII-art banner that is printed when the server starts.
// The text begins and ends with a newline.
func logo() string {
	const banner = `
_ ____________ __________
/ | / / ____/ __ \ / ____/ __ \
/ |/ / __/ / / / /_____/ / __/ / / /
/ /| / /___/ /_/ /_____/ /_/ / /_/ /
/_/ |_/_____/\____/ \____/\____/
`
	return banner
}