30e5aa8f48
* [database] - Add Prefix method to interface - Convert leveldb error to `database error` - Be explicit with prefixedKey in `Table` as slices can be pointers * [protocol] - Add stringer method to protocol * [Chaindb] - Added saveBlock() which will allow us to save a block into the database. The block is broken up into transactions and Headers. The headers are saved as is. The transactions are saved as is, then the utxos in the transactions are collected to make the utxo db. - Verification for blocks and transactions will reside in the same package. Note that the save methods are all unexported, while the Get methods are exported. Making it so that any can call a get method, but only code in this package may save to the database. The other code which will reside in this package will be code verification logic. * [chaindb] - Added saveHeader function which saveHeaders uses - Update the latest header, each time we save a header instead of after a batch. This is so that we can call saveHeader without saveHeaders. This functionality can be rolled back if the performance of updating the header after a batch is significant - small refactor in test code
453 lines
12 KiB
Go
453 lines
12 KiB
Go
// This impl uses channels to simulate the queue handler with the actor model.
|
|
// A suitable number k ,should be set for channel size, because if #numOfMsg > k,
|
|
// we lose determinism. k chosen should be large enough that when filled, it shall indicate that
|
|
// the peer has stopped responding, since we do not have a pingMSG, we will need another way to shut down
|
|
// peers
|
|
|
|
package peer
|
|
|
|
import (
|
|
"errors"
|
|
"fmt"
|
|
"net"
|
|
"sync"
|
|
"sync/atomic"
|
|
"time"
|
|
|
|
"github.com/CityOfZion/neo-go/pkg/wire/command"
|
|
|
|
"github.com/CityOfZion/neo-go/pkg/peer/stall"
|
|
"github.com/CityOfZion/neo-go/pkg/wire"
|
|
"github.com/CityOfZion/neo-go/pkg/wire/payload"
|
|
"github.com/CityOfZion/neo-go/pkg/wire/protocol"
|
|
"github.com/CityOfZion/neo-go/pkg/wire/util"
|
|
)
|
|
|
|
const (
|
|
maxOutboundConnections = 100
|
|
protocolVer = protocol.DefaultVersion
|
|
handshakeTimeout = 30 * time.Second
|
|
idleTimeout = 5 * time.Minute // If no message received after idleTimeout, then peer disconnects
|
|
|
|
// nodes will have `responseTime` seconds to reply with a response
|
|
responseTime = 120 * time.Second
|
|
|
|
// the stall detector will check every `tickerInterval` to see if messages
|
|
// are overdue. Should be less than `responseTime`
|
|
tickerInterval = 30 * time.Second
|
|
|
|
// The input buffer size is the amount of mesages that
|
|
// can be buffered into the channel to receive at once before
|
|
// blocking, and before determinism is broken
|
|
inputBufferSize = 100
|
|
|
|
// The output buffer size is the amount of messages that
|
|
// can be buffered into the channel to send at once before
|
|
// blocking, and before determinism is broken.
|
|
outputBufferSize = 100
|
|
|
|
// pingInterval = 20 * time.Second //Not implemented in neo clients
|
|
)
|
|
|
|
var (
|
|
errHandShakeTimeout = errors.New("Handshake timed out, peers have " + string(handshakeTimeout) + " Seconds to Complete the handshake")
|
|
)
|
|
|
|
// Peer represents a peer on the neo network
|
|
type Peer struct {
|
|
config LocalConfig
|
|
conn net.Conn
|
|
|
|
// atomic vals
|
|
disconnected int32
|
|
|
|
//unchangeable state: concurrent safe
|
|
addr string
|
|
protoVer protocol.Version
|
|
port uint16
|
|
inbound bool
|
|
userAgent string
|
|
services protocol.ServiceFlag
|
|
createdAt time.Time
|
|
relay bool
|
|
|
|
statemutex sync.Mutex
|
|
verackReceived bool
|
|
versionKnown bool
|
|
|
|
*stall.Detector
|
|
|
|
inch chan func() // will handle all incoming connections from peer
|
|
outch chan func() // will handle all outcoming connections from peer
|
|
quitch chan struct{}
|
|
}
|
|
|
|
// NewPeer returns a new NEO peer
|
|
func NewPeer(con net.Conn, inbound bool, cfg LocalConfig) *Peer {
|
|
p := Peer{}
|
|
p.inch = make(chan func(), inputBufferSize)
|
|
p.outch = make(chan func(), outputBufferSize)
|
|
p.quitch = make(chan struct{}, 1)
|
|
p.inbound = inbound
|
|
p.config = cfg
|
|
p.conn = con
|
|
p.createdAt = time.Now()
|
|
p.addr = p.conn.RemoteAddr().String()
|
|
|
|
p.Detector = stall.NewDetector(responseTime, tickerInterval)
|
|
|
|
// TODO: set the unchangeable states
|
|
return &p
|
|
}
|
|
|
|
// Write to a peer
|
|
func (p *Peer) Write(msg wire.Messager) error {
|
|
return wire.WriteMessage(p.conn, p.config.Net, msg)
|
|
}
|
|
|
|
// Read to a peer
|
|
func (p *Peer) Read() (wire.Messager, error) {
|
|
return wire.ReadMessage(p.conn, p.config.Net)
|
|
}
|
|
|
|
// Disconnect disconnects a peer and closes the connection
|
|
func (p *Peer) Disconnect() {
|
|
|
|
// return if already disconnected
|
|
if atomic.LoadInt32(&p.disconnected) != 0 {
|
|
return
|
|
}
|
|
|
|
atomic.AddInt32(&p.disconnected, 1)
|
|
|
|
p.Detector.Quit()
|
|
close(p.quitch)
|
|
p.conn.Close()
|
|
|
|
fmt.Println("Disconnected Peer with address", p.RemoteAddr().String())
|
|
|
|
}
|
|
|
|
// Port returns the peers port
|
|
func (p *Peer) Port() uint16 {
|
|
return p.port
|
|
}
|
|
|
|
// CreatedAt returns the time at which the connection was made
|
|
func (p *Peer) CreatedAt() time.Time {
|
|
return p.createdAt
|
|
}
|
|
|
|
// CanRelay returns true, if the peer can relay information
|
|
func (p *Peer) CanRelay() bool {
|
|
return p.relay
|
|
}
|
|
|
|
// LocalAddr returns this node's local address
|
|
func (p *Peer) LocalAddr() net.Addr {
|
|
return p.conn.LocalAddr()
|
|
}
|
|
|
|
// RemoteAddr returns the remote address of the connected peer
|
|
func (p *Peer) RemoteAddr() net.Addr {
|
|
return p.conn.RemoteAddr()
|
|
}
|
|
|
|
// Services returns the services offered by the peer
|
|
func (p *Peer) Services() protocol.ServiceFlag {
|
|
return p.config.Services
|
|
}
|
|
|
|
//Inbound returns true whether this peer is an inbound peer
|
|
func (p *Peer) Inbound() bool {
|
|
return p.inbound
|
|
}
|
|
|
|
// UserAgent returns this nodes, useragent
|
|
func (p *Peer) UserAgent() string {
|
|
return p.config.UserAgent
|
|
}
|
|
|
|
// IsVerackReceived returns true, if this node has
|
|
// received a verack from this peer
|
|
func (p *Peer) IsVerackReceived() bool {
|
|
return p.verackReceived
|
|
}
|
|
|
|
//NotifyDisconnect returns once the peer has disconnected
|
|
// Blocking
|
|
func (p *Peer) NotifyDisconnect() bool {
|
|
fmt.Println("Peer has not disconnected yet")
|
|
<-p.quitch
|
|
fmt.Println("Peer has just disconnected")
|
|
return true
|
|
}
|
|
|
|
//End of Exposed API functions//
|
|
|
|
// PingLoop not impl. in neo yet, adding it now
|
|
// will cause this client to disconnect from all other implementations
|
|
func (p *Peer) PingLoop() { /*not implemented in other neo clients*/ }
|
|
|
|
// Run is used to start communicating with the peer
|
|
// completes the handshake and starts observing
|
|
// for messages coming in
|
|
func (p *Peer) Run() error {
|
|
|
|
err := p.Handshake()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
go p.StartProtocol()
|
|
go p.ReadLoop()
|
|
go p.WriteLoop()
|
|
|
|
//go p.PingLoop() // since it is not implemented. It will disconnect all other impls.
|
|
return nil
|
|
|
|
}
|
|
|
|
// StartProtocol run as a go-routine, will act as our queue for messages
|
|
// should be ran after handshake
|
|
func (p *Peer) StartProtocol() {
|
|
loop:
|
|
for atomic.LoadInt32(&p.disconnected) == 0 {
|
|
select {
|
|
case f := <-p.inch:
|
|
f()
|
|
case <-p.quitch:
|
|
break loop
|
|
case <-p.Detector.Quitch:
|
|
fmt.Println("Peer stalled, disconnecting")
|
|
break loop
|
|
}
|
|
}
|
|
p.Disconnect()
|
|
}
|
|
|
|
// ReadLoop Will block on the read until a message is read
|
|
// Should only be called after handshake is complete
|
|
// on a seperate go-routine.
|
|
func (p *Peer) ReadLoop() {
|
|
|
|
idleTimer := time.AfterFunc(idleTimeout, func() {
|
|
fmt.Println("Timing out peer")
|
|
p.Disconnect()
|
|
})
|
|
|
|
loop:
|
|
for atomic.LoadInt32(&p.disconnected) == 0 {
|
|
|
|
idleTimer.Reset(idleTimeout) // reset timer on each loop
|
|
|
|
readmsg, err := p.Read()
|
|
|
|
// Message read; stop Timer
|
|
idleTimer.Stop()
|
|
|
|
if err != nil {
|
|
fmt.Println("Err on read", err) // This will also happen if Peer is disconnected
|
|
break loop
|
|
}
|
|
|
|
// Remove message as pending from the stall detector
|
|
p.Detector.RemoveMessage(readmsg.Command())
|
|
|
|
switch msg := readmsg.(type) {
|
|
|
|
case *payload.VersionMessage:
|
|
fmt.Println("Already received a Version, disconnecting. " + p.RemoteAddr().String())
|
|
break loop // We have already done the handshake, break loop and disconnect
|
|
case *payload.VerackMessage:
|
|
if p.verackReceived {
|
|
fmt.Println("Already received a Verack, disconnecting. " + p.RemoteAddr().String())
|
|
break loop
|
|
}
|
|
p.statemutex.Lock() // This should not happen, however if it does, then we should set it.
|
|
p.verackReceived = true
|
|
p.statemutex.Unlock()
|
|
case *payload.AddrMessage:
|
|
p.OnAddr(msg)
|
|
case *payload.GetAddrMessage:
|
|
p.OnGetAddr(msg)
|
|
case *payload.GetBlocksMessage:
|
|
p.OnGetBlocks(msg)
|
|
case *payload.BlockMessage:
|
|
p.OnBlocks(msg)
|
|
case *payload.HeadersMessage:
|
|
p.OnHeaders(msg)
|
|
case *payload.GetHeadersMessage:
|
|
p.OnGetHeaders(msg)
|
|
case *payload.InvMessage:
|
|
p.OnInv(msg)
|
|
case *payload.GetDataMessage:
|
|
p.OnGetData(msg)
|
|
case *payload.TXMessage:
|
|
p.OnTX(msg)
|
|
default:
|
|
fmt.Println("Cannot recognise message", msg.Command()) //Do not disconnect peer, just Log Message
|
|
}
|
|
}
|
|
|
|
idleTimer.Stop()
|
|
p.Disconnect()
|
|
}
|
|
|
|
// WriteLoop will Queue all messages to be written to the peer.
|
|
func (p *Peer) WriteLoop() {
|
|
for atomic.LoadInt32(&p.disconnected) == 0 {
|
|
select {
|
|
case f := <-p.outch:
|
|
f()
|
|
case <-p.Detector.Quitch: // if the detector quits, disconnect peer
|
|
p.Disconnect()
|
|
}
|
|
}
|
|
}
|
|
|
|
// OnGetData is called when a GetData message is received
|
|
func (p *Peer) OnGetData(msg *payload.GetDataMessage) {
|
|
|
|
p.inch <- func() {
|
|
if p.config.OnInv != nil {
|
|
p.config.OnGetData(msg)
|
|
}
|
|
fmt.Println("That was an getdata Message please pass func down through config", msg.Command())
|
|
}
|
|
}
|
|
|
|
//OnTX is callwed when a TX message is received
|
|
func (p *Peer) OnTX(msg *payload.TXMessage) {
|
|
|
|
p.inch <- func() {
|
|
getdata, err := payload.NewGetDataMessage(payload.InvTypeTx)
|
|
if err != nil {
|
|
fmt.Println("Eor", err)
|
|
}
|
|
id, err := msg.Tx.ID()
|
|
getdata.AddHash(id)
|
|
p.Write(getdata)
|
|
}
|
|
}
|
|
|
|
// OnInv is called when a Inv message is received
|
|
func (p *Peer) OnInv(msg *payload.InvMessage) {
|
|
|
|
p.inch <- func() {
|
|
if p.config.OnInv != nil {
|
|
p.config.OnInv(p, msg)
|
|
}
|
|
fmt.Println("That was an inv Message please pass func down through config", msg.Command())
|
|
}
|
|
}
|
|
|
|
// OnGetHeaders is called when a GetHeaders message is received
|
|
func (p *Peer) OnGetHeaders(msg *payload.GetHeadersMessage) {
|
|
p.inch <- func() {
|
|
if p.config.OnGetHeaders != nil {
|
|
p.config.OnGetHeaders(msg)
|
|
}
|
|
fmt.Println("That was a getheaders message, please pass func down through config", msg.Command())
|
|
|
|
}
|
|
}
|
|
|
|
// OnAddr is called when a Addr message is received
|
|
func (p *Peer) OnAddr(msg *payload.AddrMessage) {
|
|
p.inch <- func() {
|
|
if p.config.OnAddr != nil {
|
|
p.config.OnAddr(p, msg)
|
|
}
|
|
fmt.Println("That was a addr message, please pass func down through config", msg.Command())
|
|
|
|
}
|
|
}
|
|
|
|
// OnGetAddr is called when a GetAddr message is received
|
|
func (p *Peer) OnGetAddr(msg *payload.GetAddrMessage) {
|
|
p.inch <- func() {
|
|
if p.config.OnGetAddr != nil {
|
|
p.config.OnGetAddr(p, msg)
|
|
}
|
|
fmt.Println("That was a getaddr message, please pass func down through config", msg.Command())
|
|
|
|
}
|
|
}
|
|
|
|
// OnGetBlocks is called when a GetBlocks message is received
|
|
func (p *Peer) OnGetBlocks(msg *payload.GetBlocksMessage) {
|
|
p.inch <- func() {
|
|
if p.config.OnGetBlocks != nil {
|
|
p.config.OnGetBlocks(msg)
|
|
}
|
|
fmt.Println("That was a getblocks message, please pass func down through config", msg.Command())
|
|
}
|
|
}
|
|
|
|
// OnBlocks is called when a Blocks message is received
|
|
func (p *Peer) OnBlocks(msg *payload.BlockMessage) {
|
|
p.inch <- func() {
|
|
if p.config.OnBlock != nil {
|
|
p.config.OnBlock(p, msg)
|
|
}
|
|
}
|
|
}
|
|
|
|
// OnVersion Listener will be called
|
|
// during the handshake, any error checking should be done here for the versionMessage.
|
|
// This should only ever be called during the handshake. Any other place and the peer will disconnect.
|
|
func (p *Peer) OnVersion(msg *payload.VersionMessage) error {
|
|
if msg.Nonce == p.config.Nonce {
|
|
p.conn.Close()
|
|
return errors.New("Self connection, disconnecting Peer")
|
|
}
|
|
p.versionKnown = true
|
|
p.port = msg.Port
|
|
p.services = msg.Services
|
|
p.userAgent = string(msg.UserAgent)
|
|
p.createdAt = time.Now()
|
|
p.relay = msg.Relay
|
|
return nil
|
|
}
|
|
|
|
// OnHeaders is called when a Headers message is received
|
|
func (p *Peer) OnHeaders(msg *payload.HeadersMessage) {
|
|
fmt.Println("We have received the headers")
|
|
p.inch <- func() {
|
|
if p.config.OnHeader != nil {
|
|
p.config.OnHeader(p, msg)
|
|
}
|
|
}
|
|
}
|
|
|
|
// RequestHeaders will write a getheaders to this peer
|
|
func (p *Peer) RequestHeaders(hash util.Uint256) error {
|
|
c := make(chan error, 0)
|
|
p.outch <- func() {
|
|
p.Detector.AddMessage(command.GetHeaders)
|
|
getHeaders, err := payload.NewGetHeadersMessage([]util.Uint256{hash}, util.Uint256{})
|
|
err = p.Write(getHeaders)
|
|
c <- err
|
|
}
|
|
return <-c
|
|
}
|
|
|
|
// RequestBlocks will ask this peer for a set of blocks
|
|
func (p *Peer) RequestBlocks(hashes []util.Uint256) error {
|
|
c := make(chan error, 0)
|
|
|
|
p.outch <- func() {
|
|
p.Detector.AddMessage(command.GetData)
|
|
getdata, err := payload.NewGetDataMessage(payload.InvTypeBlock)
|
|
err = getdata.AddHashes(hashes)
|
|
if err != nil {
|
|
c <- err
|
|
return
|
|
}
|
|
err = p.Write(getdata)
|
|
c <- err
|
|
}
|
|
|
|
return <-c
|
|
|
|
}
|