neoneo-go/pkg/peer/peer.go
decentralisedkev 8afec1ea45
[Peer] Add peer manager (#241)
* [PeerMgr]

- Add basic peer manager
2019-03-28 19:46:31 +00:00

341 lines
8.5 KiB
Go

// This implementation uses channels to simulate a queue handler in the style of the actor model.
// A suitable size k should be chosen for each channel: once the number of queued messages
// exceeds k, senders block and we lose determinism. k should be large enough that a full channel
// indicates that the peer has stopped responding. Since there is no ping message, another
// mechanism is needed to shut down unresponsive peers.
package peer
import (
"errors"
"fmt"
"net"
"sync"
"sync/atomic"
"time"
"github.com/CityOfZion/neo-go/pkg/wire/command"
"github.com/CityOfZion/neo-go/pkg/peer/stall"
"github.com/CityOfZion/neo-go/pkg/wire"
"github.com/CityOfZion/neo-go/pkg/wire/payload"
"github.com/CityOfZion/neo-go/pkg/wire/protocol"
"github.com/CityOfZion/neo-go/pkg/wire/util"
)
// Limits, timeouts and queue sizes used by the peer implementation.
const (
// maxOutboundConnections is not referenced in the visible part of this file;
// presumably the cap on outbound connections — TODO confirm against callers.
maxOutboundConnections = 100
// protocolVer is the protocol version constant, taken from protocol.DefaultVersion.
protocolVer = protocol.DefaultVersion
// handshakeTimeout is the duration quoted in errHandShakeTimeout as the time
// peers have to complete the handshake.
handshakeTimeout = 30 * time.Second
idleTimeout = 5 * time.Minute // If no message is received within idleTimeout, the peer is disconnected
// Nodes have `responseTime` to reply with a response. It is passed to the
// stall detector in NewPeer.
responseTime = 120 * time.Second
// The stall detector checks every `tickerInterval` to see if messages
// are overdue. Should be less than `responseTime`.
tickerInterval = 30 * time.Second
// The input buffer size is the number of messages that
// can be buffered in the input channel before senders
// block, and before determinism is broken.
inputBufferSize = 100
// The output buffer size is the number of messages that
// can be buffered in the output channel before senders
// block, and before determinism is broken.
outputBufferSize = 100
// pingInterval = 20 * time.Second //Not implemented in neo clients
)
var (
	// errHandShakeTimeout describes a handshake that exceeded handshakeTimeout.
	//
	// The timeout is formatted as a whole number of seconds. Converting the
	// duration with string(handshakeTimeout) would interpret the integer as a
	// Unicode code point and produce a replacement character instead of "30".
	errHandShakeTimeout = errors.New(fmt.Sprintf(
		"Handshake timed out, peers have %d Seconds to Complete the handshake",
		int64(handshakeTimeout/time.Second),
	))
)
// Peer represents a peer on the neo network.
// It wraps a net.Conn together with the queues and stall detector used by
// StartProtocol, ReadLoop and WriteLoop.
type Peer struct {
// config is the local configuration; its Net value is passed to the wire
// read/write helpers and its Services value is returned by Services.
config LocalConfig
// conn is the underlying connection to the remote peer.
conn net.Conn
// startHeight is the value returned by Height; NewPeer sets it to 0.
startHeight uint32
// atomic vals
// disconnected is accessed through sync/atomic; it is non-zero once
// Disconnect has run.
disconnected int32
// State that is not meant to change after setup, so it can be read concurrently.
// addr is the remote address string captured in NewPeer.
addr string
protoVer protocol.Version
port uint16
inbound bool
userAgent string
services protocol.ServiceFlag
createdAt time.Time
relay bool
// statemutex is held by ReadLoop while it sets verackReceived.
statemutex sync.Mutex
// verackReceived reports whether a verack has been received from this peer.
verackReceived bool
// versionKnown is not used in the visible part of this file;
// presumably set during the handshake — TODO confirm.
versionKnown bool
// Embedded stall detector, created in NewPeer with responseTime and tickerInterval.
*stall.Detector
inch chan func() // queue of closures executed one at a time by StartProtocol
outch chan func() // queue of closures executed one at a time by WriteLoop (outgoing requests)
// quitch is closed by Disconnect to signal that the peer has disconnected.
quitch chan struct{}
}
// NewPeer wraps an established connection in a Peer.
//
// con is the connection to the remote node, inbound records whether the
// remote side initiated it, and cfg is the local node configuration. The
// returned peer has its input/output queues, quit channel and stall detector
// allocated, but no goroutines are started for it here; call Run for that.
func NewPeer(con net.Conn, inbound bool, cfg LocalConfig) *Peer {
	p := new(Peer)

	// Queues and the quit signal.
	p.inch = make(chan func(), inputBufferSize)
	p.outch = make(chan func(), outputBufferSize)
	p.quitch = make(chan struct{}, 1)

	// Connection details.
	p.inbound = inbound
	p.config = cfg
	p.conn = con
	p.createdAt = time.Now()
	p.startHeight = 0
	p.addr = con.RemoteAddr().String()

	// Stall detection for outstanding requests.
	p.Detector = stall.NewDetector(responseTime, tickerInterval)

	return p
}
// Write encodes msg for the configured network and writes it to the peer's
// connection. It returns whatever error wire.WriteMessage reports.
func (p *Peer) Write(msg wire.Messager) error {
	network := p.config.Net
	return wire.WriteMessage(p.conn, network, msg)
}
// Read reads the next message from the peer's connection, using the
// configured network. It returns the result of wire.ReadMessage unchanged.
func (p *Peer) Read() (wire.Messager, error) {
	network := p.config.Net
	return wire.ReadMessage(p.conn, network)
}
// Disconnect disconnects a peer and closes the connection.
//
// It may be called from several goroutines (StartProtocol, ReadLoop,
// WriteLoop and the idle timer all call it); only the first call performs the
// teardown, later calls return immediately.
func (p *Peer) Disconnect() {
	// Flip the flag from 0 to 1 in a single atomic step. A separate load
	// followed by an add would let two concurrent callers both pass the check
	// and close quitch twice, which panics.
	if !atomic.CompareAndSwapInt32(&p.disconnected, 0, 1) {
		return
	}

	p.Detector.Quit()
	close(p.quitch)
	p.conn.Close()
	fmt.Println("Disconnected Peer with address", p.RemoteAddr().String())
}
// Port reports the port number stored for this peer.
func (p *Peer) Port() uint16 {
	port := p.port
	return port
}
// CreatedAt reports when this Peer value was created; NewPeer records
// time.Now() at construction.
func (p *Peer) CreatedAt() time.Time {
	created := p.createdAt
	return created
}
// Height reports the height recorded for this peer, i.e. its startHeight
// field. NewPeer initialises that field to 0.
func (p *Peer) Height() uint32 {
	height := p.startHeight
	return height
}
// CanRelay reports whether the peer can relay information, as recorded in
// its relay flag.
func (p *Peer) CanRelay() bool {
	canRelay := p.relay
	return canRelay
}
// LocalAddr reports the local end of the connection to this peer, i.e. this
// node's address as seen by the underlying net.Conn.
func (p *Peer) LocalAddr() net.Addr {
	conn := p.conn
	return conn.LocalAddr()
}
// RemoteAddr reports the remote end of the connection, i.e. the address of
// the connected peer as seen by the underlying net.Conn.
func (p *Peer) RemoteAddr() net.Addr {
	conn := p.conn
	return conn.RemoteAddr()
}
// Services reports the service flags taken from the peer's LocalConfig.
//
// NOTE(review): this reads config.Services rather than the peer's own
// services field — confirm which of the two callers expect.
func (p *Peer) Services() protocol.ServiceFlag {
	flags := p.config.Services
	return flags
}
// Inbound reports whether this peer is an inbound peer, as passed to NewPeer.
func (p *Peer) Inbound() bool {
	isInbound := p.inbound
	return isInbound
}
// IsVerackReceived returns true if this node has
// received a verack from this peer.
//
// The flag is read under statemutex, the same mutex ReadLoop holds when it
// sets the flag, so the call is safe while ReadLoop is running on another
// goroutine. Callers must not hold statemutex when calling this method.
func (p *Peer) IsVerackReceived() bool {
	p.statemutex.Lock()
	defer p.statemutex.Unlock()
	return p.verackReceived
}
// NotifyDisconnect blocks the caller until the peer's quit channel becomes
// readable (Disconnect closes it), logging a line before and after the wait.
func (p *Peer) NotifyDisconnect() {
	fmt.Println("Peer has not disconnected yet")

	// Single-case select: waits exactly like a bare receive on quitch.
	select {
	case <-p.quitch:
	}

	fmt.Println("Peer has just disconnected")
}
//End of Exposed API functions//
// PingLoop is intentionally a no-op. Ping is not implemented in neo yet, and
// adding it now would cause this client to disconnect from all other
// implementations.
func (p *Peer) PingLoop() {
	// not implemented in other neo clients
}
// Run starts communication with the peer. It performs the handshake
// synchronously and, if that succeeds, launches the protocol queue, the read
// loop and the write loop on their own goroutines.
//
// It returns the handshake error, if any; otherwise nil.
func (p *Peer) Run() error {
	if err := p.Handshake(); err != nil {
		return err
	}

	// PingLoop is deliberately left out: it is not implemented, and running
	// it would disconnect all other implementations.
	workers := []func(){p.StartProtocol, p.ReadLoop, p.WriteLoop}
	for _, worker := range workers {
		go worker()
	}
	return nil
}
// StartProtocol is the peer's incoming work queue. Run it on its own
// goroutine once the handshake has completed.
//
// It executes the closures queued on inch one at a time until the peer is
// marked disconnected, the quit channel becomes readable, or the stall
// detector's Quitch becomes readable. On exit it calls Disconnect.
func (p *Peer) StartProtocol() {
	for running := true; running && atomic.LoadInt32(&p.disconnected) == 0; {
		select {
		case <-p.Detector.Quitch:
			fmt.Println("Peer stalled, disconnecting")
			running = false
		case <-p.quitch:
			running = false
		case job := <-p.inch:
			job()
		}
	}
	p.Disconnect()
}
// ReadLoop reads messages from the peer and dispatches each one to the
// matching On* handler. It blocks on every read, so it should run on a
// separate goroutine, and only after the handshake is complete.
//
// An idle timer is armed before each read and stopped once the read returns;
// if it fires, the peer is disconnected. The loop ends on a read error, on a
// repeated version or verack message, or once the peer is marked
// disconnected. On exit the timer is stopped and Disconnect is called.
func (p *Peer) ReadLoop() {
	onIdle := func() {
		fmt.Println("Timing out peer")
		p.Disconnect()
	}
	watchdog := time.AfterFunc(idleTimeout, onIdle)

reading:
	for {
		if atomic.LoadInt32(&p.disconnected) != 0 {
			break
		}

		// Re-arm the idle timer for this read and disarm it as soon as the
		// read returns.
		watchdog.Reset(idleTimeout)
		incoming, readErr := p.Read()
		watchdog.Stop()

		if readErr != nil {
			// Also reached when the peer has been disconnected.
			fmt.Println("Err on read", readErr)
			break
		}

		// The message is no longer pending as far as the stall detector is
		// concerned.
		p.Detector.RemoveMessage(incoming.Command())

		switch m := incoming.(type) {
		case *payload.VersionMessage:
			// The handshake is already done; a second version ends the session.
			fmt.Println("Already received a Version, disconnecting. " + p.RemoteAddr().String())
			break reading

		case *payload.VerackMessage:
			if p.verackReceived {
				fmt.Println("Already received a Verack, disconnecting. " + p.RemoteAddr().String())
				break reading
			}
			// Not expected after the handshake, but record it if it happens.
			p.statemutex.Lock()
			p.verackReceived = true
			p.statemutex.Unlock()

		case *payload.AddrMessage:
			p.OnAddr(m)
		case *payload.GetAddrMessage:
			p.OnGetAddr(m)
		case *payload.GetBlocksMessage:
			p.OnGetBlocks(m)
		case *payload.BlockMessage:
			p.OnBlocks(m)
		case *payload.HeadersMessage:
			p.OnHeaders(m)
		case *payload.GetHeadersMessage:
			p.OnGetHeaders(m)
		case *payload.InvMessage:
			p.OnInv(m)
		case *payload.GetDataMessage:
			p.OnGetData(m)
		case *payload.TXMessage:
			p.OnTX(m)

		default:
			// Unknown messages are logged; the peer stays connected.
			fmt.Println("Cannot recognise message", m.Command())
		}
	}

	watchdog.Stop()
	p.Disconnect()
}
// WriteLoop executes the closures queued on outch one at a time, so all
// writes to the peer are serialised through this goroutine.
//
// The loop ends once the peer is marked disconnected. It also watches quitch,
// as StartProtocol does, so it returns as soon as Disconnect has closed that
// channel and does not depend on how the stall detector signals shutdown.
func (p *Peer) WriteLoop() {
	for atomic.LoadInt32(&p.disconnected) == 0 {
		select {
		case f := <-p.outch:
			f()
		case <-p.quitch:
			// Disconnect has already run; nothing is left to tear down.
			return
		case <-p.Detector.Quitch: // if the detector quits, disconnect peer
			p.Disconnect()
		}
	}
}
// Outgoing Requests
// RequestHeaders will write a getheaders to this peer.
//
// The request is queued on outch and executed by WriteLoop; the call blocks
// until that has happened and returns the resulting error. The message is
// built with hash as the only element of the first argument and a zero
// Uint256 as the second.
//
// The request is registered with the stall detector only after a successful
// write, because a response can only be expected for a request that was
// actually sent. A failure to build the message is returned without writing.
func (p *Peer) RequestHeaders(hash util.Uint256) error {
	c := make(chan error)
	p.outch <- func() {
		getHeaders, err := payload.NewGetHeadersMessage([]util.Uint256{hash}, util.Uint256{})
		if err != nil {
			c <- err
			return
		}
		err = p.Write(getHeaders)
		if err == nil {
			p.Detector.AddMessage(command.GetHeaders)
		}
		c <- err
	}
	return <-c
}
// RequestBlocks will ask this peer for a set of blocks.
//
// A getdata message of type InvTypeBlock carrying hashes is queued on outch
// and executed by WriteLoop; the call blocks until that has happened and
// returns the resulting error.
//
// Errors from building the message or adding the hashes are returned without
// writing anything. The request is registered with the stall detector only
// after a successful write, because a response can only be expected for a
// request that was actually sent.
func (p *Peer) RequestBlocks(hashes []util.Uint256) error {
	c := make(chan error)
	p.outch <- func() {
		getdata, err := payload.NewGetDataMessage(payload.InvTypeBlock)
		if err != nil {
			// Do not touch getdata after a failed construction.
			c <- err
			return
		}
		if err = getdata.AddHashes(hashes); err != nil {
			c <- err
			return
		}
		err = p.Write(getdata)
		if err == nil {
			p.Detector.AddMessage(command.GetData)
		}
		c <- err
	}
	return <-c
}