forked from TrueCloudLab/neoneo-go
_pkg.dev: drop peer/peermgr/syncmgr
It was a nice attempt, but there is not a lot that we can reuse for the master branch. Refs. #307.
This commit is contained in:
parent
441f1d3bf5
commit
cea983acc6
22 changed files with 0 additions and 2584 deletions
|
@ -1,31 +0,0 @@
|
||||||
package peer
|
|
||||||
|
|
||||||
import (
|
|
||||||
"github.com/CityOfZion/neo-go/pkg/wire/payload"
|
|
||||||
"github.com/CityOfZion/neo-go/pkg/wire/protocol"
|
|
||||||
)
|
|
||||||
|
|
||||||
// LocalConfig specifies the properties that should be available for each remote peer
|
|
||||||
type LocalConfig struct {
|
|
||||||
Net protocol.Magic
|
|
||||||
UserAgent string
|
|
||||||
Services protocol.ServiceFlag
|
|
||||||
Nonce uint32
|
|
||||||
ProtocolVer protocol.Version
|
|
||||||
Relay bool
|
|
||||||
Port uint16
|
|
||||||
|
|
||||||
// pointer to config will keep the startheight updated
|
|
||||||
StartHeight func() uint32
|
|
||||||
|
|
||||||
// Response Handlers
|
|
||||||
OnHeader func(*Peer, *payload.HeadersMessage)
|
|
||||||
OnGetHeaders func(*Peer, *payload.GetHeadersMessage)
|
|
||||||
OnAddr func(*Peer, *payload.AddrMessage)
|
|
||||||
OnGetAddr func(*Peer, *payload.GetAddrMessage)
|
|
||||||
OnInv func(*Peer, *payload.InvMessage)
|
|
||||||
OnGetData func(*Peer, *payload.GetDataMessage)
|
|
||||||
OnBlock func(*Peer, *payload.BlockMessage)
|
|
||||||
OnGetBlocks func(*Peer, *payload.GetBlocksMessage)
|
|
||||||
OnTx func(*Peer, *payload.TXMessage)
|
|
||||||
}
|
|
|
@ -1,340 +0,0 @@
|
||||||
// This impl uses channels to simulate the queue handler with the actor model.
|
|
||||||
// A suitable number k ,should be set for channel size, because if #numOfMsg > k,
|
|
||||||
// we lose determinism. k chosen should be large enough that when filled, it shall indicate that
|
|
||||||
// the peer has stopped responding, since we do not have a pingMSG, we will need another way to shut down
|
|
||||||
// peers
|
|
||||||
|
|
||||||
package peer
|
|
||||||
|
|
||||||
import (
|
|
||||||
"errors"
|
|
||||||
"fmt"
|
|
||||||
"net"
|
|
||||||
"sync"
|
|
||||||
"sync/atomic"
|
|
||||||
"time"
|
|
||||||
|
|
||||||
"github.com/CityOfZion/neo-go/pkg/wire/command"
|
|
||||||
|
|
||||||
"github.com/CityOfZion/neo-go/pkg/peer/stall"
|
|
||||||
"github.com/CityOfZion/neo-go/pkg/wire"
|
|
||||||
"github.com/CityOfZion/neo-go/pkg/wire/payload"
|
|
||||||
"github.com/CityOfZion/neo-go/pkg/wire/protocol"
|
|
||||||
"github.com/CityOfZion/neo-go/pkg/wire/util"
|
|
||||||
)
|
|
||||||
|
|
||||||
const (
|
|
||||||
maxOutboundConnections = 100
|
|
||||||
protocolVer = protocol.DefaultVersion
|
|
||||||
handshakeTimeout = 30 * time.Second
|
|
||||||
idleTimeout = 5 * time.Minute // If no message received after idleTimeout, then peer disconnects
|
|
||||||
|
|
||||||
// nodes will have `responseTime` seconds to reply with a response
|
|
||||||
responseTime = 120 * time.Second
|
|
||||||
|
|
||||||
// the stall detector will check every `tickerInterval` to see if messages
|
|
||||||
// are overdue. Should be less than `responseTime`
|
|
||||||
tickerInterval = 30 * time.Second
|
|
||||||
|
|
||||||
// The input buffer size is the amount of mesages that
|
|
||||||
// can be buffered into the channel to receive at once before
|
|
||||||
// blocking, and before determinism is broken
|
|
||||||
inputBufferSize = 100
|
|
||||||
|
|
||||||
// The output buffer size is the amount of messages that
|
|
||||||
// can be buffered into the channel to send at once before
|
|
||||||
// blocking, and before determinism is broken.
|
|
||||||
outputBufferSize = 100
|
|
||||||
|
|
||||||
// pingInterval = 20 * time.Second //Not implemented in neo clients
|
|
||||||
)
|
|
||||||
|
|
||||||
var (
|
|
||||||
errHandShakeTimeout = errors.New("Handshake timed out, peers have " + string(handshakeTimeout) + " Seconds to Complete the handshake")
|
|
||||||
)
|
|
||||||
|
|
||||||
// Peer represents a peer on the neo network
|
|
||||||
type Peer struct {
|
|
||||||
config LocalConfig
|
|
||||||
conn net.Conn
|
|
||||||
|
|
||||||
startHeight uint32
|
|
||||||
|
|
||||||
// atomic vals
|
|
||||||
disconnected int32
|
|
||||||
|
|
||||||
//unchangeable state: concurrent safe
|
|
||||||
addr string
|
|
||||||
protoVer protocol.Version
|
|
||||||
port uint16
|
|
||||||
inbound bool
|
|
||||||
userAgent string
|
|
||||||
services protocol.ServiceFlag
|
|
||||||
createdAt time.Time
|
|
||||||
relay bool
|
|
||||||
|
|
||||||
statemutex sync.Mutex
|
|
||||||
verackReceived bool
|
|
||||||
versionKnown bool
|
|
||||||
|
|
||||||
*stall.Detector
|
|
||||||
|
|
||||||
inch chan func() // will handle all incoming connections from peer
|
|
||||||
outch chan func() // will handle all outcoming connections from peer
|
|
||||||
quitch chan struct{}
|
|
||||||
}
|
|
||||||
|
|
||||||
// NewPeer returns a new NEO peer
|
|
||||||
func NewPeer(con net.Conn, inbound bool, cfg LocalConfig) *Peer {
|
|
||||||
return &Peer{
|
|
||||||
inch: make(chan func(), inputBufferSize),
|
|
||||||
outch: make(chan func(), outputBufferSize),
|
|
||||||
quitch: make(chan struct{}, 1),
|
|
||||||
inbound: inbound,
|
|
||||||
config: cfg,
|
|
||||||
conn: con,
|
|
||||||
createdAt: time.Now(),
|
|
||||||
startHeight: 0,
|
|
||||||
addr: con.RemoteAddr().String(),
|
|
||||||
Detector: stall.NewDetector(responseTime, tickerInterval),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Write to a peer
|
|
||||||
func (p *Peer) Write(msg wire.Messager) error {
|
|
||||||
return wire.WriteMessage(p.conn, p.config.Net, msg)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Read to a peer
|
|
||||||
func (p *Peer) Read() (wire.Messager, error) {
|
|
||||||
return wire.ReadMessage(p.conn, p.config.Net)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Disconnect disconnects a peer and closes the connection
|
|
||||||
func (p *Peer) Disconnect() {
|
|
||||||
|
|
||||||
// return if already disconnected
|
|
||||||
if atomic.LoadInt32(&p.disconnected) != 0 {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
atomic.AddInt32(&p.disconnected, 1)
|
|
||||||
|
|
||||||
p.Detector.Quit()
|
|
||||||
close(p.quitch)
|
|
||||||
p.conn.Close()
|
|
||||||
|
|
||||||
fmt.Println("Disconnected Peer with address", p.RemoteAddr().String())
|
|
||||||
}
|
|
||||||
|
|
||||||
// Port returns the peers port
|
|
||||||
func (p *Peer) Port() uint16 {
|
|
||||||
return p.port
|
|
||||||
}
|
|
||||||
|
|
||||||
// CreatedAt returns the time at which the connection was made
|
|
||||||
func (p *Peer) CreatedAt() time.Time {
|
|
||||||
return p.createdAt
|
|
||||||
}
|
|
||||||
|
|
||||||
// Height returns the latest recorded height of this peer
|
|
||||||
func (p *Peer) Height() uint32 {
|
|
||||||
return p.startHeight
|
|
||||||
}
|
|
||||||
|
|
||||||
// CanRelay returns true, if the peer can relay information
|
|
||||||
func (p *Peer) CanRelay() bool {
|
|
||||||
return p.relay
|
|
||||||
}
|
|
||||||
|
|
||||||
// LocalAddr returns this node's local address
|
|
||||||
func (p *Peer) LocalAddr() net.Addr {
|
|
||||||
return p.conn.LocalAddr()
|
|
||||||
}
|
|
||||||
|
|
||||||
// RemoteAddr returns the remote address of the connected peer
|
|
||||||
func (p *Peer) RemoteAddr() net.Addr {
|
|
||||||
return p.conn.RemoteAddr()
|
|
||||||
}
|
|
||||||
|
|
||||||
// Services returns the services offered by the peer
|
|
||||||
func (p *Peer) Services() protocol.ServiceFlag {
|
|
||||||
return p.config.Services
|
|
||||||
}
|
|
||||||
|
|
||||||
//Inbound returns true whether this peer is an inbound peer
|
|
||||||
func (p *Peer) Inbound() bool {
|
|
||||||
return p.inbound
|
|
||||||
}
|
|
||||||
|
|
||||||
// IsVerackReceived returns true, if this node has
|
|
||||||
// received a verack from this peer
|
|
||||||
func (p *Peer) IsVerackReceived() bool {
|
|
||||||
return p.verackReceived
|
|
||||||
}
|
|
||||||
|
|
||||||
//NotifyDisconnect returns once the peer has disconnected
|
|
||||||
// Blocking
|
|
||||||
func (p *Peer) NotifyDisconnect() {
|
|
||||||
<-p.quitch
|
|
||||||
fmt.Println("Peer has just disconnected")
|
|
||||||
}
|
|
||||||
|
|
||||||
//End of Exposed API functions//
|
|
||||||
|
|
||||||
// PingLoop not impl. in neo yet, adding it now
|
|
||||||
// will cause this client to disconnect from all other implementations
|
|
||||||
func (p *Peer) PingLoop() { /*not implemented in other neo clients*/ }
|
|
||||||
|
|
||||||
// Run is used to start communicating with the peer
|
|
||||||
// completes the handshake and starts observing
|
|
||||||
// for messages coming in
|
|
||||||
func (p *Peer) Run() error {
|
|
||||||
|
|
||||||
err := p.Handshake()
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
go p.StartProtocol()
|
|
||||||
go p.ReadLoop()
|
|
||||||
go p.WriteLoop()
|
|
||||||
|
|
||||||
//go p.PingLoop() // since it is not implemented. It will disconnect all other impls.
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// StartProtocol run as a go-routine, will act as our queue for messages
|
|
||||||
// should be ran after handshake
|
|
||||||
func (p *Peer) StartProtocol() {
|
|
||||||
loop:
|
|
||||||
for atomic.LoadInt32(&p.disconnected) == 0 {
|
|
||||||
select {
|
|
||||||
case f := <-p.inch:
|
|
||||||
f()
|
|
||||||
case <-p.quitch:
|
|
||||||
break loop
|
|
||||||
case <-p.Detector.Quitch:
|
|
||||||
fmt.Println("Peer stalled, disconnecting")
|
|
||||||
break loop
|
|
||||||
}
|
|
||||||
}
|
|
||||||
p.Disconnect()
|
|
||||||
}
|
|
||||||
|
|
||||||
// ReadLoop Will block on the read until a message is read
|
|
||||||
// Should only be called after handshake is complete
|
|
||||||
// on a seperate go-routine.
|
|
||||||
func (p *Peer) ReadLoop() {
|
|
||||||
|
|
||||||
idleTimer := time.AfterFunc(idleTimeout, func() {
|
|
||||||
fmt.Println("Timing out peer")
|
|
||||||
p.Disconnect()
|
|
||||||
})
|
|
||||||
|
|
||||||
loop:
|
|
||||||
for atomic.LoadInt32(&p.disconnected) == 0 {
|
|
||||||
|
|
||||||
idleTimer.Reset(idleTimeout) // reset timer on each loop
|
|
||||||
|
|
||||||
readmsg, err := p.Read()
|
|
||||||
|
|
||||||
// Message read; stop Timer
|
|
||||||
idleTimer.Stop()
|
|
||||||
|
|
||||||
if err != nil {
|
|
||||||
fmt.Println("Err on read", err) // This will also happen if Peer is disconnected
|
|
||||||
break loop
|
|
||||||
}
|
|
||||||
|
|
||||||
// Remove message as pending from the stall detector
|
|
||||||
p.Detector.RemoveMessage(readmsg.Command())
|
|
||||||
|
|
||||||
switch msg := readmsg.(type) {
|
|
||||||
|
|
||||||
case *payload.VersionMessage:
|
|
||||||
fmt.Println("Already received a Version, disconnecting. " + p.RemoteAddr().String())
|
|
||||||
break loop // We have already done the handshake, break loop and disconnect
|
|
||||||
case *payload.VerackMessage:
|
|
||||||
if p.verackReceived {
|
|
||||||
fmt.Println("Already received a Verack, disconnecting. " + p.RemoteAddr().String())
|
|
||||||
break loop
|
|
||||||
}
|
|
||||||
p.statemutex.Lock() // This should not happen, however if it does, then we should set it.
|
|
||||||
p.verackReceived = true
|
|
||||||
p.statemutex.Unlock()
|
|
||||||
case *payload.AddrMessage:
|
|
||||||
p.OnAddr(msg)
|
|
||||||
case *payload.GetAddrMessage:
|
|
||||||
p.OnGetAddr(msg)
|
|
||||||
case *payload.GetBlocksMessage:
|
|
||||||
p.OnGetBlocks(msg)
|
|
||||||
case *payload.BlockMessage:
|
|
||||||
p.OnBlocks(msg)
|
|
||||||
case *payload.HeadersMessage:
|
|
||||||
p.OnHeaders(msg)
|
|
||||||
case *payload.GetHeadersMessage:
|
|
||||||
p.OnGetHeaders(msg)
|
|
||||||
case *payload.InvMessage:
|
|
||||||
p.OnInv(msg)
|
|
||||||
case *payload.GetDataMessage:
|
|
||||||
p.OnGetData(msg)
|
|
||||||
case *payload.TXMessage:
|
|
||||||
p.OnTX(msg)
|
|
||||||
default:
|
|
||||||
fmt.Println("Cannot recognise message", msg.Command()) //Do not disconnect peer, just Log Message
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
idleTimer.Stop()
|
|
||||||
p.Disconnect()
|
|
||||||
}
|
|
||||||
|
|
||||||
// WriteLoop will Queue all messages to be written to the peer.
|
|
||||||
func (p *Peer) WriteLoop() {
|
|
||||||
for atomic.LoadInt32(&p.disconnected) == 0 {
|
|
||||||
select {
|
|
||||||
case f := <-p.outch:
|
|
||||||
f()
|
|
||||||
case <-p.Detector.Quitch: // if the detector quits, disconnect peer
|
|
||||||
p.Disconnect()
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Outgoing Requests
|
|
||||||
|
|
||||||
// RequestHeaders will write a getheaders to this peer
|
|
||||||
func (p *Peer) RequestHeaders(hash util.Uint256) error {
|
|
||||||
c := make(chan error, 0)
|
|
||||||
p.outch <- func() {
|
|
||||||
getHeaders, err := payload.NewGetHeadersMessage([]util.Uint256{hash}, util.Uint256{})
|
|
||||||
err = p.Write(getHeaders)
|
|
||||||
if err != nil {
|
|
||||||
p.Detector.AddMessage(command.GetHeaders)
|
|
||||||
}
|
|
||||||
c <- err
|
|
||||||
}
|
|
||||||
return <-c
|
|
||||||
}
|
|
||||||
|
|
||||||
// RequestBlocks will ask this peer for a set of blocks
|
|
||||||
func (p *Peer) RequestBlocks(hashes []util.Uint256) error {
|
|
||||||
c := make(chan error, 0)
|
|
||||||
|
|
||||||
p.outch <- func() {
|
|
||||||
getdata, err := payload.NewGetDataMessage(payload.InvTypeBlock)
|
|
||||||
err = getdata.AddHashes(hashes)
|
|
||||||
if err != nil {
|
|
||||||
c <- err
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
err = p.Write(getdata)
|
|
||||||
if err != nil {
|
|
||||||
p.Detector.AddMessage(command.GetData)
|
|
||||||
}
|
|
||||||
|
|
||||||
c <- err
|
|
||||||
}
|
|
||||||
return <-c
|
|
||||||
}
|
|
|
@ -1,196 +0,0 @@
|
||||||
package peer_test
|
|
||||||
|
|
||||||
import (
|
|
||||||
"net"
|
|
||||||
"testing"
|
|
||||||
"time"
|
|
||||||
|
|
||||||
"github.com/CityOfZion/neo-go/pkg/peer"
|
|
||||||
"github.com/CityOfZion/neo-go/pkg/wire"
|
|
||||||
"github.com/CityOfZion/neo-go/pkg/wire/payload"
|
|
||||||
"github.com/CityOfZion/neo-go/pkg/wire/protocol"
|
|
||||||
"github.com/stretchr/testify/assert"
|
|
||||||
)
|
|
||||||
|
|
||||||
func returnConfig() peer.LocalConfig {
|
|
||||||
|
|
||||||
DefaultHeight := func() uint32 {
|
|
||||||
return 10
|
|
||||||
}
|
|
||||||
|
|
||||||
OnAddr := func(p *peer.Peer, msg *payload.AddrMessage) {}
|
|
||||||
OnHeader := func(p *peer.Peer, msg *payload.HeadersMessage) {}
|
|
||||||
OnGetHeaders := func(p *peer.Peer, msg *payload.GetHeadersMessage) {}
|
|
||||||
OnInv := func(p *peer.Peer, msg *payload.InvMessage) {}
|
|
||||||
OnGetData := func(p *peer.Peer, msg *payload.GetDataMessage) {}
|
|
||||||
OnBlock := func(p *peer.Peer, msg *payload.BlockMessage) {}
|
|
||||||
OnGetBlocks := func(p *peer.Peer, msg *payload.GetBlocksMessage) {}
|
|
||||||
|
|
||||||
return peer.LocalConfig{
|
|
||||||
Net: protocol.MainNet,
|
|
||||||
UserAgent: "NEO-GO-Default",
|
|
||||||
Services: protocol.NodePeerService,
|
|
||||||
Nonce: 1200,
|
|
||||||
ProtocolVer: 0,
|
|
||||||
Relay: false,
|
|
||||||
Port: 10332,
|
|
||||||
// pointer to config will keep the startheight updated for each version
|
|
||||||
//Message we plan to send
|
|
||||||
StartHeight: DefaultHeight,
|
|
||||||
OnHeader: OnHeader,
|
|
||||||
OnAddr: OnAddr,
|
|
||||||
OnGetHeaders: OnGetHeaders,
|
|
||||||
OnInv: OnInv,
|
|
||||||
OnGetData: OnGetData,
|
|
||||||
OnBlock: OnBlock,
|
|
||||||
OnGetBlocks: OnGetBlocks,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestHandshake(t *testing.T) {
|
|
||||||
address := ":20338"
|
|
||||||
go func() {
|
|
||||||
|
|
||||||
conn, err := net.DialTimeout("tcp", address, 2*time.Second)
|
|
||||||
if err != nil {
|
|
||||||
t.Fatal(err)
|
|
||||||
}
|
|
||||||
p := peer.NewPeer(conn, true, returnConfig())
|
|
||||||
err = p.Run()
|
|
||||||
verack, err := payload.NewVerackMessage()
|
|
||||||
if err != nil {
|
|
||||||
t.Fail()
|
|
||||||
}
|
|
||||||
if err := p.Write(verack); err != nil {
|
|
||||||
t.Fatal(err)
|
|
||||||
}
|
|
||||||
|
|
||||||
assert.Equal(t, true, p.IsVerackReceived())
|
|
||||||
|
|
||||||
}()
|
|
||||||
|
|
||||||
listener, err := net.Listen("tcp", address)
|
|
||||||
if err != nil {
|
|
||||||
t.Fatal(err)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
defer func() {
|
|
||||||
listener.Close()
|
|
||||||
}()
|
|
||||||
|
|
||||||
for {
|
|
||||||
|
|
||||||
conn, err := listener.Accept()
|
|
||||||
if err != nil {
|
|
||||||
t.Fatal(err)
|
|
||||||
}
|
|
||||||
|
|
||||||
tcpAddrMe := &net.TCPAddr{IP: net.ParseIP("82.2.97.142"), Port: 20338}
|
|
||||||
nonce := uint32(100)
|
|
||||||
messageVer, err := payload.NewVersionMessage(tcpAddrMe, 2595770, false, protocol.DefaultVersion, protocol.UserAgent, nonce, protocol.NodePeerService)
|
|
||||||
|
|
||||||
if err != nil {
|
|
||||||
t.Fatal(err)
|
|
||||||
}
|
|
||||||
|
|
||||||
if err := wire.WriteMessage(conn, protocol.MainNet, messageVer); err != nil {
|
|
||||||
t.Fatal(err)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
readmsg, err := wire.ReadMessage(conn, protocol.MainNet)
|
|
||||||
if err != nil {
|
|
||||||
t.Fatal(err)
|
|
||||||
}
|
|
||||||
version, ok := readmsg.(*payload.VersionMessage)
|
|
||||||
if !ok {
|
|
||||||
t.Fatal(err)
|
|
||||||
}
|
|
||||||
|
|
||||||
assert.NotEqual(t, nil, version)
|
|
||||||
|
|
||||||
messageVrck, err := payload.NewVerackMessage()
|
|
||||||
if err != nil {
|
|
||||||
t.Fatal(err)
|
|
||||||
}
|
|
||||||
|
|
||||||
assert.NotEqual(t, nil, messageVrck)
|
|
||||||
|
|
||||||
if err := wire.WriteMessage(conn, protocol.MainNet, messageVrck); err != nil {
|
|
||||||
t.Fatal(err)
|
|
||||||
}
|
|
||||||
|
|
||||||
readmsg, err = wire.ReadMessage(conn, protocol.MainNet)
|
|
||||||
if err != nil {
|
|
||||||
t.Fatal(err)
|
|
||||||
}
|
|
||||||
|
|
||||||
assert.NotEqual(t, nil, readmsg)
|
|
||||||
|
|
||||||
verk, ok := readmsg.(*payload.VerackMessage)
|
|
||||||
if !ok {
|
|
||||||
t.Fatal(err)
|
|
||||||
}
|
|
||||||
assert.NotEqual(t, nil, verk)
|
|
||||||
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestConfigurations(t *testing.T) {
|
|
||||||
_, conn := net.Pipe()
|
|
||||||
|
|
||||||
inbound := true
|
|
||||||
|
|
||||||
config := returnConfig()
|
|
||||||
|
|
||||||
p := peer.NewPeer(conn, inbound, config)
|
|
||||||
|
|
||||||
// test inbound
|
|
||||||
assert.Equal(t, inbound, p.Inbound())
|
|
||||||
|
|
||||||
// handshake not done, should be false
|
|
||||||
assert.Equal(t, false, p.IsVerackReceived())
|
|
||||||
|
|
||||||
assert.Equal(t, config.Services, p.Services())
|
|
||||||
|
|
||||||
assert.Equal(t, config.Relay, p.CanRelay())
|
|
||||||
|
|
||||||
assert.WithinDuration(t, time.Now(), p.CreatedAt(), 1*time.Second)
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestPeerDisconnect(t *testing.T) {
|
|
||||||
// Make sure everything is shutdown
|
|
||||||
// Make sure timer is shutdown in stall detector too. Should maybe put this part of test into stall detector.
|
|
||||||
|
|
||||||
_, conn := net.Pipe()
|
|
||||||
inbound := true
|
|
||||||
config := returnConfig()
|
|
||||||
p := peer.NewPeer(conn, inbound, config)
|
|
||||||
|
|
||||||
p.Disconnect()
|
|
||||||
verack, err := payload.NewVerackMessage()
|
|
||||||
assert.Nil(t, err)
|
|
||||||
|
|
||||||
err = p.Write(verack)
|
|
||||||
assert.NotNil(t, err)
|
|
||||||
|
|
||||||
// Check if stall detector is still running
|
|
||||||
_, ok := <-p.Detector.Quitch
|
|
||||||
assert.Equal(t, ok, false)
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestNotifyDisconnect(t *testing.T) {
|
|
||||||
|
|
||||||
_, conn := net.Pipe()
|
|
||||||
inbound := true
|
|
||||||
config := returnConfig()
|
|
||||||
p := peer.NewPeer(conn, inbound, config)
|
|
||||||
|
|
||||||
p.Disconnect()
|
|
||||||
p.NotifyDisconnect()
|
|
||||||
// TestNotify uses default test timeout as the passing condition
|
|
||||||
// Failure condition can be seen when you comment out p.Disconnect()
|
|
||||||
}
|
|
|
@ -1,132 +0,0 @@
|
||||||
package peer
|
|
||||||
|
|
||||||
import (
|
|
||||||
"fmt"
|
|
||||||
"net"
|
|
||||||
"time"
|
|
||||||
|
|
||||||
"github.com/CityOfZion/neo-go/pkg/wire"
|
|
||||||
"github.com/CityOfZion/neo-go/pkg/wire/payload"
|
|
||||||
iputils "github.com/CityOfZion/neo-go/pkg/wire/util/ip"
|
|
||||||
)
|
|
||||||
|
|
||||||
// Handshake will initiate a handshake with this peer
|
|
||||||
func (p *Peer) Handshake() error {
|
|
||||||
|
|
||||||
handshakeErr := make(chan error, 1)
|
|
||||||
go func() {
|
|
||||||
if p.inbound {
|
|
||||||
handshakeErr <- p.inboundHandShake()
|
|
||||||
} else {
|
|
||||||
handshakeErr <- p.outboundHandShake()
|
|
||||||
}
|
|
||||||
}()
|
|
||||||
|
|
||||||
select {
|
|
||||||
case err := <-handshakeErr:
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
case <-time.After(handshakeTimeout):
|
|
||||||
return errHandShakeTimeout
|
|
||||||
}
|
|
||||||
|
|
||||||
// This is purely here for Logs
|
|
||||||
if p.inbound {
|
|
||||||
fmt.Println("inbound handshake with", p.RemoteAddr().String(), "successful")
|
|
||||||
} else {
|
|
||||||
|
|
||||||
fmt.Println("outbound handshake with", p.RemoteAddr().String(), "successful")
|
|
||||||
}
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// If this peer has an inbound conn (conn that is going into another peer)
|
|
||||||
// then he has dialed and so, we must read the version message
|
|
||||||
func (p *Peer) inboundHandShake() error {
|
|
||||||
var err error
|
|
||||||
if err := p.writeLocalVersionMSG(); err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
if err := p.readRemoteVersionMSG(); err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
verack, err := payload.NewVerackMessage()
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
err = p.Write(verack)
|
|
||||||
return p.readVerack()
|
|
||||||
}
|
|
||||||
func (p *Peer) outboundHandShake() error {
|
|
||||||
var err error
|
|
||||||
err = p.readRemoteVersionMSG()
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
err = p.writeLocalVersionMSG()
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
err = p.readVerack()
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
verack, err := payload.NewVerackMessage()
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
return p.Write(verack)
|
|
||||||
}
|
|
||||||
func (p *Peer) writeLocalVersionMSG() error {
|
|
||||||
|
|
||||||
nonce := p.config.Nonce
|
|
||||||
relay := p.config.Relay
|
|
||||||
port := int(p.config.Port)
|
|
||||||
ua := p.config.UserAgent
|
|
||||||
sh := p.config.StartHeight()
|
|
||||||
services := p.config.Services
|
|
||||||
proto := p.config.ProtocolVer
|
|
||||||
ip := iputils.GetLocalIP()
|
|
||||||
tcpAddrMe := &net.TCPAddr{IP: ip, Port: port}
|
|
||||||
|
|
||||||
messageVer, err := payload.NewVersionMessage(tcpAddrMe, sh, relay, proto, ua, nonce, services)
|
|
||||||
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
return p.Write(messageVer)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (p *Peer) readRemoteVersionMSG() error {
|
|
||||||
readmsg, err := wire.ReadMessage(p.conn, p.config.Net)
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
version, ok := readmsg.(*payload.VersionMessage)
|
|
||||||
if !ok {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
return p.OnVersion(version)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (p *Peer) readVerack() error {
|
|
||||||
readmsg, err := wire.ReadMessage(p.conn, p.config.Net)
|
|
||||||
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
_, ok := readmsg.(*payload.VerackMessage)
|
|
||||||
|
|
||||||
if !ok {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
// should only be accessed on one go-routine
|
|
||||||
p.verackReceived = true
|
|
||||||
|
|
||||||
return nil
|
|
||||||
}
|
|
|
@ -1,67 +0,0 @@
|
||||||
# Package - Peer
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
## Responsibility
|
|
||||||
|
|
||||||
Once a connection has been made. The connection will represent a established peer to the localNode. Since a connection and the `Wire` is a golang primitive, that we cannot do much with. The peer package will encapsulate both, while adding extra functionality.
|
|
||||||
|
|
||||||
|
|
||||||
## Features
|
|
||||||
|
|
||||||
- The handshake protocol is automatically executed and handled by the peer package. If a Version/Verack is received twice, the peer will be disconnected.
|
|
||||||
|
|
||||||
- IdleTimeouts: If a Message is not received from the peer within a set period of time, the peer will be disconnected.
|
|
||||||
|
|
||||||
- StallTimeouts: For Example, If a GetHeaders, is sent to the Peer and a Headers Response is not received within a certain period of time, then the peer is disconnected.
|
|
||||||
|
|
||||||
- Concurrency Model: The concurrency model used is similar to Actor model, with a few changes. Messages can be sent to a peer asynchronously or synchronously. An example of an synchornous message send is the `RequestHeaders` method, where the channel blocks until an error value is received. The `OnHeaders` message is however asynchronously called. Furthermore, all methods passed through the config, are wrapped inside of an additional `Peers` method, this is to lay the ground work to capturing statistics regarding a specific command. These are also used so that we can pass behaviour to be executed down the channel.
|
|
||||||
|
|
||||||
- Configuration: Each Peer will have a config struct passed to it, with information about the Local Peer and functions that will encapsulate the behaviour of what the peer should do, given a request. This way, the peer is not dependent on any other package.
|
|
||||||
|
|
||||||
## Usage
|
|
||||||
|
|
||||||
conn, err := net.Dial("tcp", "seed2.neo.org:10333")
|
|
||||||
if err != nil {
|
|
||||||
fmt.Println("Error dialing connection", err.Error())
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
config := peer.LocalConfig{
|
|
||||||
Net: protocol.MainNet,
|
|
||||||
UserAgent: "NEO-G",
|
|
||||||
Services: protocol.NodePeerService,
|
|
||||||
Nonce: 1200,
|
|
||||||
ProtocolVer: 0,
|
|
||||||
Relay: false,
|
|
||||||
Port: 10332,
|
|
||||||
StartHeight: LocalHeight,
|
|
||||||
OnHeader: OnHeader,
|
|
||||||
}
|
|
||||||
|
|
||||||
p := peer.NewPeer(conn, false, config)
|
|
||||||
err = p.Run()
|
|
||||||
|
|
||||||
hash, err := util.Uint256DecodeString(chainparams.GenesisHash)
|
|
||||||
// hash2, err := util.Uint256DecodeString("ff8fe95efc5d1cc3a22b17503aecaf289cef68f94b79ddad6f613569ca2342d8")
|
|
||||||
err = p.RequestHeaders(hash)
|
|
||||||
|
|
||||||
func OnHeader(peer *peer.Peer, msg *payload.HeadersMessage) {
|
|
||||||
// This function is passed to peer
|
|
||||||
// and the peer will execute it on receiving a header
|
|
||||||
}
|
|
||||||
|
|
||||||
func LocalHeight() uint32 {
|
|
||||||
// This will be a function from the object that handles the block heights
|
|
||||||
return 10
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
### Notes
|
|
||||||
|
|
||||||
|
|
||||||
Should we follow the actor model for Peers? Each Peer will have a ID, which we can take as the PID or if
|
|
||||||
we launch a go-routine for each peer, then we can use that as an implicit PID.
|
|
||||||
|
|
||||||
Peer information should be stored into a database, if no db exists, we should get it from an initial peers file.
|
|
||||||
We can use this to periodically store information about a peer.
|
|
|
@ -1,111 +0,0 @@
|
||||||
package peer
|
|
||||||
|
|
||||||
import (
|
|
||||||
"errors"
|
|
||||||
"time"
|
|
||||||
|
|
||||||
"github.com/CityOfZion/neo-go/pkg/wire/payload"
|
|
||||||
)
|
|
||||||
|
|
||||||
// OnGetData is called when a GetData message is received
|
|
||||||
func (p *Peer) OnGetData(msg *payload.GetDataMessage) {
|
|
||||||
p.inch <- func() {
|
|
||||||
if p.config.OnInv != nil {
|
|
||||||
p.config.OnGetData(p, msg)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
//OnTX is called when a TX message is received
|
|
||||||
func (p *Peer) OnTX(msg *payload.TXMessage) {
|
|
||||||
p.inch <- func() {
|
|
||||||
p.inch <- func() {
|
|
||||||
if p.config.OnTx != nil {
|
|
||||||
p.config.OnTx(p, msg)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// OnInv is called when a Inv message is received
|
|
||||||
func (p *Peer) OnInv(msg *payload.InvMessage) {
|
|
||||||
p.inch <- func() {
|
|
||||||
if p.config.OnInv != nil {
|
|
||||||
p.config.OnInv(p, msg)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// OnGetHeaders is called when a GetHeaders message is received
|
|
||||||
func (p *Peer) OnGetHeaders(msg *payload.GetHeadersMessage) {
|
|
||||||
p.inch <- func() {
|
|
||||||
if p.config.OnGetHeaders != nil {
|
|
||||||
p.config.OnGetHeaders(p, msg)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// OnAddr is called when a Addr message is received
|
|
||||||
func (p *Peer) OnAddr(msg *payload.AddrMessage) {
|
|
||||||
p.inch <- func() {
|
|
||||||
if p.config.OnAddr != nil {
|
|
||||||
p.config.OnAddr(p, msg)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// OnGetAddr is called when a GetAddr message is received
|
|
||||||
func (p *Peer) OnGetAddr(msg *payload.GetAddrMessage) {
|
|
||||||
p.inch <- func() {
|
|
||||||
if p.config.OnGetAddr != nil {
|
|
||||||
p.config.OnGetAddr(p, msg)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// OnGetBlocks is called when a GetBlocks message is received
|
|
||||||
func (p *Peer) OnGetBlocks(msg *payload.GetBlocksMessage) {
|
|
||||||
p.inch <- func() {
|
|
||||||
if p.config.OnGetBlocks != nil {
|
|
||||||
p.config.OnGetBlocks(p, msg)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// OnBlocks is called when a Blocks message is received
|
|
||||||
func (p *Peer) OnBlocks(msg *payload.BlockMessage) {
|
|
||||||
p.Detector.RemoveMessage(msg.Command())
|
|
||||||
p.inch <- func() {
|
|
||||||
if p.config.OnBlock != nil {
|
|
||||||
p.config.OnBlock(p, msg)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// OnHeaders is called when a Headers message is received
|
|
||||||
func (p *Peer) OnHeaders(msg *payload.HeadersMessage) {
|
|
||||||
p.Detector.RemoveMessage(msg.Command())
|
|
||||||
p.inch <- func() {
|
|
||||||
if p.config.OnHeader != nil {
|
|
||||||
p.config.OnHeader(p, msg)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// OnVersion Listener will be called
|
|
||||||
// during the handshake, any error checking should be done here for the versionMessage.
|
|
||||||
// This should only ever be called during the handshake. Any other place and the peer will disconnect.
|
|
||||||
func (p *Peer) OnVersion(msg *payload.VersionMessage) error {
|
|
||||||
if msg.Nonce == p.config.Nonce {
|
|
||||||
p.conn.Close()
|
|
||||||
return errors.New("self connection, disconnecting Peer")
|
|
||||||
}
|
|
||||||
p.versionKnown = true
|
|
||||||
p.port = msg.Port
|
|
||||||
p.services = msg.Services
|
|
||||||
p.userAgent = string(msg.UserAgent)
|
|
||||||
p.createdAt = time.Now()
|
|
||||||
p.relay = msg.Relay
|
|
||||||
p.startHeight = msg.StartHeight
|
|
||||||
return nil
|
|
||||||
}
|
|
|
@ -1,175 +0,0 @@
|
||||||
package stall
|
|
||||||
|
|
||||||
import (
|
|
||||||
"fmt"
|
|
||||||
"sync"
|
|
||||||
"sync/atomic"
|
|
||||||
"time"
|
|
||||||
|
|
||||||
"github.com/CityOfZion/neo-go/pkg/wire/command"
|
|
||||||
)
|
|
||||||
|
|
||||||
// Detector (stall detector) will keep track of all pendingMessages
|
|
||||||
// If any message takes too long to reply
|
|
||||||
// the detector will disconnect the peer
|
|
||||||
type Detector struct {
|
|
||||||
responseTime time.Duration
|
|
||||||
tickInterval time.Duration
|
|
||||||
|
|
||||||
lock *sync.RWMutex
|
|
||||||
responses map[command.Type]time.Time
|
|
||||||
|
|
||||||
// The detector is embedded into a peer and the peer watches this quit chan
|
|
||||||
// If this chan is closed, the peer disconnects
|
|
||||||
Quitch chan struct{}
|
|
||||||
|
|
||||||
// atomic vals
|
|
||||||
disconnected int32
|
|
||||||
}
|
|
||||||
|
|
||||||
// NewDetector will create a new stall detector
|
|
||||||
// rT is the responseTime and signals how long
|
|
||||||
// a peer has to reply back to a sent message
|
|
||||||
// tickerInterval is how often the detector wil check for stalled messages
|
|
||||||
func NewDetector(rTime time.Duration, tickerInterval time.Duration) *Detector {
|
|
||||||
d := &Detector{
|
|
||||||
responseTime: rTime,
|
|
||||||
tickInterval: tickerInterval,
|
|
||||||
lock: new(sync.RWMutex),
|
|
||||||
responses: map[command.Type]time.Time{},
|
|
||||||
Quitch: make(chan struct{}),
|
|
||||||
}
|
|
||||||
go d.loop()
|
|
||||||
return d
|
|
||||||
}
|
|
||||||
|
|
||||||
func (d *Detector) loop() {
|
|
||||||
ticker := time.NewTicker(d.tickInterval)
|
|
||||||
|
|
||||||
defer func() {
|
|
||||||
d.Quit()
|
|
||||||
d.DeleteAll()
|
|
||||||
ticker.Stop()
|
|
||||||
}()
|
|
||||||
|
|
||||||
for {
|
|
||||||
select {
|
|
||||||
case <-ticker.C:
|
|
||||||
now := time.Now()
|
|
||||||
d.lock.RLock()
|
|
||||||
resp := d.responses
|
|
||||||
d.lock.RUnlock()
|
|
||||||
for _, deadline := range resp {
|
|
||||||
if now.After(deadline) {
|
|
||||||
fmt.Println(resp)
|
|
||||||
fmt.Println("Deadline passed")
|
|
||||||
return
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Quit is a concurrent safe way to call the Quit channel
|
|
||||||
// Without blocking
|
|
||||||
func (d *Detector) Quit() {
|
|
||||||
// return if already disconnected
|
|
||||||
if atomic.LoadInt32(&d.disconnected) != 0 {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
atomic.AddInt32(&d.disconnected, 1)
|
|
||||||
close(d.Quitch)
|
|
||||||
}
|
|
||||||
|
|
||||||
//AddMessage will add a message to the responses map
|
|
||||||
// Call this function when we send a message to a peer
|
|
||||||
// The command passed through is the command that we sent
|
|
||||||
// we will then set a timer for the expected message(s)
|
|
||||||
func (d *Detector) AddMessage(cmd command.Type) {
|
|
||||||
cmds := d.addMessage(cmd)
|
|
||||||
d.lock.Lock()
|
|
||||||
for _, cmd := range cmds {
|
|
||||||
d.responses[cmd] = time.Now().Add(d.responseTime)
|
|
||||||
}
|
|
||||||
d.lock.Unlock()
|
|
||||||
}
|
|
||||||
|
|
||||||
// RemoveMessage remove messages from the responses map
|
|
||||||
// Call this function when we receive a message from
|
|
||||||
// peer. This will remove the pendingresponse message from the map.
|
|
||||||
// The command passed through is the command we received
|
|
||||||
func (d *Detector) RemoveMessage(cmd command.Type) {
|
|
||||||
cmds := d.removeMessage(cmd)
|
|
||||||
d.lock.Lock()
|
|
||||||
for _, cmd := range cmds {
|
|
||||||
delete(d.responses, cmd)
|
|
||||||
}
|
|
||||||
d.lock.Unlock()
|
|
||||||
}
|
|
||||||
|
|
||||||
// DeleteAll empties the map of all contents and
|
|
||||||
// is called when the detector is being shut down
|
|
||||||
func (d *Detector) DeleteAll() {
|
|
||||||
d.lock.Lock()
|
|
||||||
d.responses = make(map[command.Type]time.Time)
|
|
||||||
d.lock.Unlock()
|
|
||||||
}
|
|
||||||
|
|
||||||
// GetMessages Will return a map of all of the pendingResponses
|
|
||||||
// and their deadlines
|
|
||||||
func (d *Detector) GetMessages() map[command.Type]time.Time {
|
|
||||||
var resp map[command.Type]time.Time
|
|
||||||
d.lock.RLock()
|
|
||||||
resp = d.responses
|
|
||||||
d.lock.RUnlock()
|
|
||||||
return resp
|
|
||||||
}
|
|
||||||
|
|
||||||
// when a message is added, we will add a deadline for
|
|
||||||
// expected response
|
|
||||||
func (d *Detector) addMessage(cmd command.Type) []command.Type {
|
|
||||||
var cmds []command.Type
|
|
||||||
|
|
||||||
switch cmd {
|
|
||||||
case command.GetHeaders:
|
|
||||||
// We now will expect a Headers Message
|
|
||||||
cmds = append(cmds, command.Headers)
|
|
||||||
case command.GetAddr:
|
|
||||||
// We now will expect a Headers Message
|
|
||||||
cmds = append(cmds, command.Addr)
|
|
||||||
case command.GetData:
|
|
||||||
// We will now expect a block/tx message
|
|
||||||
cmds = append(cmds, command.Block)
|
|
||||||
cmds = append(cmds, command.TX)
|
|
||||||
case command.GetBlocks:
|
|
||||||
// we will now expect a inv message
|
|
||||||
cmds = append(cmds, command.Inv)
|
|
||||||
case command.Version:
|
|
||||||
// We will now expect a verack
|
|
||||||
cmds = append(cmds, command.Verack)
|
|
||||||
}
|
|
||||||
return cmds
|
|
||||||
}
|
|
||||||
|
|
||||||
// if receive a message, we will delete it from pending
|
|
||||||
func (d *Detector) removeMessage(cmd command.Type) []command.Type {
|
|
||||||
var cmds []command.Type
|
|
||||||
|
|
||||||
switch cmd {
|
|
||||||
case command.Block:
|
|
||||||
// We will now remove a block and tx message
|
|
||||||
cmds = append(cmds, command.Block)
|
|
||||||
cmds = append(cmds, command.TX)
|
|
||||||
case command.TX:
|
|
||||||
// We will now remove a block and tx message
|
|
||||||
cmds = append(cmds, command.Block)
|
|
||||||
cmds = append(cmds, command.TX)
|
|
||||||
case command.Verack:
|
|
||||||
// We will now expect a verack
|
|
||||||
cmds = append(cmds, cmd)
|
|
||||||
default:
|
|
||||||
cmds = append(cmds, cmd)
|
|
||||||
}
|
|
||||||
return cmds
|
|
||||||
}
|
|
|
@ -1,84 +0,0 @@
|
||||||
package stall
|
|
||||||
|
|
||||||
import (
|
|
||||||
"sync"
|
|
||||||
"testing"
|
|
||||||
"time"
|
|
||||||
|
|
||||||
"github.com/stretchr/testify/assert"
|
|
||||||
|
|
||||||
"github.com/CityOfZion/neo-go/pkg/wire/command"
|
|
||||||
)
|
|
||||||
|
|
||||||
func TestAddRemoveMessage(t *testing.T) {
|
|
||||||
|
|
||||||
responseTime := 2 * time.Millisecond
|
|
||||||
tickerInterval := 1 * time.Millisecond
|
|
||||||
|
|
||||||
d := NewDetector(responseTime, tickerInterval)
|
|
||||||
d.AddMessage(command.GetAddr)
|
|
||||||
mp := d.GetMessages()
|
|
||||||
|
|
||||||
assert.Equal(t, 1, len(mp))
|
|
||||||
assert.IsType(t, time.Time{}, mp[command.GetAddr])
|
|
||||||
|
|
||||||
d.RemoveMessage(command.Addr)
|
|
||||||
mp = d.GetMessages()
|
|
||||||
|
|
||||||
assert.Equal(t, 0, len(mp))
|
|
||||||
assert.Empty(t, mp[command.GetAddr])
|
|
||||||
}
|
|
||||||
|
|
||||||
type mockPeer struct {
|
|
||||||
lock *sync.RWMutex
|
|
||||||
online bool
|
|
||||||
detector *Detector
|
|
||||||
}
|
|
||||||
|
|
||||||
func (mp *mockPeer) loop() {
|
|
||||||
loop:
|
|
||||||
for {
|
|
||||||
select {
|
|
||||||
case <-mp.detector.Quitch:
|
|
||||||
|
|
||||||
break loop
|
|
||||||
}
|
|
||||||
}
|
|
||||||
// cleanup
|
|
||||||
mp.lock.Lock()
|
|
||||||
mp.online = false
|
|
||||||
mp.lock.Unlock()
|
|
||||||
}
|
|
||||||
func TestDeadlineWorks(t *testing.T) {
|
|
||||||
|
|
||||||
responseTime := 2 * time.Millisecond
|
|
||||||
tickerInterval := 1 * time.Millisecond
|
|
||||||
|
|
||||||
d := NewDetector(responseTime, tickerInterval)
|
|
||||||
mp := mockPeer{online: true, detector: d, lock: new(sync.RWMutex)}
|
|
||||||
go mp.loop()
|
|
||||||
|
|
||||||
d.AddMessage(command.GetAddr)
|
|
||||||
time.Sleep(responseTime + 1*time.Millisecond)
|
|
||||||
|
|
||||||
k := make(map[command.Type]time.Time)
|
|
||||||
d.lock.RLock()
|
|
||||||
assert.Equal(t, k, d.responses)
|
|
||||||
d.lock.RUnlock()
|
|
||||||
mp.lock.RLock()
|
|
||||||
assert.Equal(t, false, mp.online)
|
|
||||||
mp.lock.RUnlock()
|
|
||||||
}
|
|
||||||
func TestDeadlineShouldNotBeEmpty(t *testing.T) {
|
|
||||||
responseTime := 10 * time.Millisecond
|
|
||||||
tickerInterval := 1 * time.Millisecond
|
|
||||||
|
|
||||||
d := NewDetector(responseTime, tickerInterval)
|
|
||||||
d.AddMessage(command.GetAddr)
|
|
||||||
time.Sleep(1 * time.Millisecond)
|
|
||||||
|
|
||||||
k := make(map[command.Type]time.Time)
|
|
||||||
d.lock.RLock()
|
|
||||||
assert.NotEqual(t, k, d.responses)
|
|
||||||
d.lock.RUnlock()
|
|
||||||
}
|
|
|
@ -1,155 +0,0 @@
|
||||||
package peermgr
|
|
||||||
|
|
||||||
import (
|
|
||||||
"errors"
|
|
||||||
"sort"
|
|
||||||
"sync"
|
|
||||||
|
|
||||||
"github.com/CityOfZion/neo-go/pkg/wire/util"
|
|
||||||
)
|
|
||||||
|
|
||||||
var (
|
|
||||||
//ErrCacheLimit is returned when the cache limit is reached
|
|
||||||
ErrCacheLimit = errors.New("nomore items can be added to the cache")
|
|
||||||
|
|
||||||
//ErrNoItems is returned when pickItem is called and there are no items in the cache
|
|
||||||
ErrNoItems = errors.New("there are no items in the cache")
|
|
||||||
|
|
||||||
//ErrDuplicateItem is returned when you try to add the same item, more than once to the cache
|
|
||||||
ErrDuplicateItem = errors.New("this item is already in the cache")
|
|
||||||
)
|
|
||||||
|
|
||||||
//BlockInfo holds the necessary information that the cache needs
|
|
||||||
// to sort and store block requests
|
|
||||||
type BlockInfo struct {
|
|
||||||
BlockHash util.Uint256
|
|
||||||
BlockIndex uint32
|
|
||||||
}
|
|
||||||
|
|
||||||
// Equals returns true if two blockInfo objects
|
|
||||||
// have the same hash and the same index
|
|
||||||
func (bi *BlockInfo) Equals(other BlockInfo) bool {
|
|
||||||
return bi.BlockHash.Equals(other.BlockHash) && bi.BlockIndex == other.BlockIndex
|
|
||||||
}
|
|
||||||
|
|
||||||
// indexSorter sorts the blockInfos by blockIndex.
|
|
||||||
type indexSorter []BlockInfo
|
|
||||||
|
|
||||||
func (is indexSorter) Len() int { return len(is) }
|
|
||||||
func (is indexSorter) Swap(i, j int) { is[i], is[j] = is[j], is[i] }
|
|
||||||
func (is indexSorter) Less(i, j int) bool { return is[i].BlockIndex < is[j].BlockIndex }
|
|
||||||
|
|
||||||
//blockCache will cache any pending block requests
|
|
||||||
// for the node when there are no available nodes
|
|
||||||
type blockCache struct {
|
|
||||||
cacheLimit int
|
|
||||||
cacheLock sync.Mutex
|
|
||||||
cache []BlockInfo
|
|
||||||
}
|
|
||||||
|
|
||||||
func newBlockCache(cacheLimit int) *blockCache {
|
|
||||||
return &blockCache{
|
|
||||||
cache: make([]BlockInfo, 0, cacheLimit),
|
|
||||||
cacheLimit: cacheLimit,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func (bc *blockCache) addBlockInfo(bi BlockInfo) error {
|
|
||||||
if bc.cacheLen() == bc.cacheLimit {
|
|
||||||
return ErrCacheLimit
|
|
||||||
}
|
|
||||||
|
|
||||||
bc.cacheLock.Lock()
|
|
||||||
defer bc.cacheLock.Unlock()
|
|
||||||
|
|
||||||
// Check for duplicates. slice will always be small so a simple for loop will work
|
|
||||||
for _, bInfo := range bc.cache {
|
|
||||||
if bInfo.Equals(bi) {
|
|
||||||
return ErrDuplicateItem
|
|
||||||
}
|
|
||||||
}
|
|
||||||
bc.cache = append(bc.cache, bi)
|
|
||||||
|
|
||||||
sort.Sort(indexSorter(bc.cache))
|
|
||||||
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (bc *blockCache) addBlockInfos(bis []BlockInfo) error {
|
|
||||||
|
|
||||||
if len(bis)+bc.cacheLen() > bc.cacheLimit {
|
|
||||||
return errors.New("too many items to add, this will exceed the cache limit")
|
|
||||||
}
|
|
||||||
|
|
||||||
for _, bi := range bis {
|
|
||||||
err := bc.addBlockInfo(bi)
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (bc *blockCache) cacheLen() int {
|
|
||||||
bc.cacheLock.Lock()
|
|
||||||
defer bc.cacheLock.Unlock()
|
|
||||||
return len(bc.cache)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (bc *blockCache) pickFirstItem() (BlockInfo, error) {
|
|
||||||
return bc.pickItem(0)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (bc *blockCache) pickAllItems() ([]BlockInfo, error) {
|
|
||||||
|
|
||||||
numOfItems := bc.cacheLen()
|
|
||||||
|
|
||||||
items := make([]BlockInfo, 0, numOfItems)
|
|
||||||
|
|
||||||
for i := 0; i < numOfItems; i++ {
|
|
||||||
bi, err := bc.pickFirstItem()
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
items = append(items, bi)
|
|
||||||
}
|
|
||||||
return items, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (bc *blockCache) pickItem(i uint) (BlockInfo, error) {
|
|
||||||
if bc.cacheLen() < 1 {
|
|
||||||
return BlockInfo{}, ErrNoItems
|
|
||||||
}
|
|
||||||
|
|
||||||
if i >= uint(bc.cacheLen()) {
|
|
||||||
return BlockInfo{}, errors.New("index out of range")
|
|
||||||
}
|
|
||||||
|
|
||||||
bc.cacheLock.Lock()
|
|
||||||
defer bc.cacheLock.Unlock()
|
|
||||||
|
|
||||||
item := bc.cache[i]
|
|
||||||
bc.cache = append(bc.cache[:i], bc.cache[i+1:]...)
|
|
||||||
return item, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (bc *blockCache) removeHash(hashToRemove util.Uint256) error {
|
|
||||||
index, err := bc.findHash(hashToRemove)
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
_, err = bc.pickItem(uint(index))
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
func (bc *blockCache) findHash(hashToFind util.Uint256) (int, error) {
|
|
||||||
bc.cacheLock.Lock()
|
|
||||||
defer bc.cacheLock.Unlock()
|
|
||||||
for i, bInfo := range bc.cache {
|
|
||||||
if bInfo.BlockHash.Equals(hashToFind) {
|
|
||||||
return i, nil
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return -1, errors.New("hash cannot be found in the cache")
|
|
||||||
}
|
|
|
@ -1,80 +0,0 @@
|
||||||
package peermgr
|
|
||||||
|
|
||||||
import (
|
|
||||||
"math/rand"
|
|
||||||
"testing"
|
|
||||||
|
|
||||||
"github.com/CityOfZion/neo-go/pkg/wire/util"
|
|
||||||
"github.com/stretchr/testify/assert"
|
|
||||||
)
|
|
||||||
|
|
||||||
func TestAddBlock(t *testing.T) {
|
|
||||||
|
|
||||||
bc := &blockCache{
|
|
||||||
cacheLimit: 20,
|
|
||||||
}
|
|
||||||
bi := randomBlockInfo(t)
|
|
||||||
|
|
||||||
err := bc.addBlockInfo(bi)
|
|
||||||
assert.Equal(t, nil, err)
|
|
||||||
|
|
||||||
assert.Equal(t, 1, bc.cacheLen())
|
|
||||||
|
|
||||||
err = bc.addBlockInfo(bi)
|
|
||||||
assert.Equal(t, ErrDuplicateItem, err)
|
|
||||||
|
|
||||||
assert.Equal(t, 1, bc.cacheLen())
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestCacheLimit(t *testing.T) {
|
|
||||||
|
|
||||||
bc := &blockCache{
|
|
||||||
cacheLimit: 20,
|
|
||||||
}
|
|
||||||
|
|
||||||
for i := 0; i < bc.cacheLimit; i++ {
|
|
||||||
err := bc.addBlockInfo(randomBlockInfo(t))
|
|
||||||
assert.Equal(t, nil, err)
|
|
||||||
}
|
|
||||||
|
|
||||||
err := bc.addBlockInfo(randomBlockInfo(t))
|
|
||||||
assert.Equal(t, ErrCacheLimit, err)
|
|
||||||
|
|
||||||
assert.Equal(t, bc.cacheLimit, bc.cacheLen())
|
|
||||||
}
|
|
||||||
func TestPickItem(t *testing.T) {
|
|
||||||
|
|
||||||
bc := &blockCache{
|
|
||||||
cacheLimit: 20,
|
|
||||||
}
|
|
||||||
|
|
||||||
for i := 0; i < bc.cacheLimit; i++ {
|
|
||||||
err := bc.addBlockInfo(randomBlockInfo(t))
|
|
||||||
assert.Equal(t, nil, err)
|
|
||||||
}
|
|
||||||
|
|
||||||
for i := 0; i < bc.cacheLimit; i++ {
|
|
||||||
_, err := bc.pickFirstItem()
|
|
||||||
assert.Equal(t, nil, err)
|
|
||||||
}
|
|
||||||
|
|
||||||
assert.Equal(t, 0, bc.cacheLen())
|
|
||||||
}
|
|
||||||
|
|
||||||
func randomUint256(t *testing.T) util.Uint256 {
|
|
||||||
rand32 := make([]byte, 32)
|
|
||||||
rand.Read(rand32)
|
|
||||||
|
|
||||||
u, err := util.Uint256DecodeBytes(rand32)
|
|
||||||
assert.Equal(t, nil, err)
|
|
||||||
|
|
||||||
return u
|
|
||||||
}
|
|
||||||
|
|
||||||
func randomBlockInfo(t *testing.T) BlockInfo {
|
|
||||||
|
|
||||||
return BlockInfo{
|
|
||||||
randomUint256(t),
|
|
||||||
rand.Uint32(),
|
|
||||||
}
|
|
||||||
}
|
|
|
@ -1,227 +0,0 @@
|
||||||
package peermgr
|
|
||||||
|
|
||||||
import (
|
|
||||||
"errors"
|
|
||||||
"fmt"
|
|
||||||
"sync"
|
|
||||||
|
|
||||||
"github.com/CityOfZion/neo-go/pkg/wire/command"
|
|
||||||
|
|
||||||
"github.com/CityOfZion/neo-go/pkg/wire/util"
|
|
||||||
)
|
|
||||||
|
|
||||||
const (
|
|
||||||
// blockCacheLimit is the maximum amount of pending requests that the cache can hold
|
|
||||||
pendingBlockCacheLimit = 20
|
|
||||||
|
|
||||||
//peerBlockCacheLimit is the maximum amount of inflight blocks that a peer can
|
|
||||||
// have, before they are flagged as busy
|
|
||||||
peerBlockCacheLimit = 1
|
|
||||||
)
|
|
||||||
|
|
||||||
var (
|
|
||||||
//ErrNoAvailablePeers is returned when a request for data from a peer is invoked
|
|
||||||
// but there are no available peers to request data from
|
|
||||||
ErrNoAvailablePeers = errors.New("there are no available peers to interact with")
|
|
||||||
|
|
||||||
// ErrUnknownPeer is returned when a peer that the peer manager does not know about
|
|
||||||
// sends a message to this node
|
|
||||||
ErrUnknownPeer = errors.New("this peer has not been registered with the peer manager")
|
|
||||||
)
|
|
||||||
|
|
||||||
//mPeer represents a peer that is managed by the peer manager
|
|
||||||
type mPeer interface {
|
|
||||||
Disconnect()
|
|
||||||
RequestBlocks([]util.Uint256) error
|
|
||||||
RequestHeaders(util.Uint256) error
|
|
||||||
NotifyDisconnect()
|
|
||||||
}
|
|
||||||
|
|
||||||
type peerstats struct {
|
|
||||||
// when a peer is sent a blockRequest
|
|
||||||
// the peermanager will track this using this blockCache
|
|
||||||
blockCache *blockCache
|
|
||||||
// all other requests will be tracked using the requests map
|
|
||||||
requests map[command.Type]bool
|
|
||||||
}
|
|
||||||
|
|
||||||
//PeerMgr manages all peers that the node is connected to
|
|
||||||
type PeerMgr struct {
|
|
||||||
pLock sync.RWMutex
|
|
||||||
peers map[mPeer]peerstats
|
|
||||||
|
|
||||||
requestCache *blockCache
|
|
||||||
}
|
|
||||||
|
|
||||||
//New returns a new peermgr object
|
|
||||||
func New() *PeerMgr {
|
|
||||||
return &PeerMgr{
|
|
||||||
peers: make(map[mPeer]peerstats),
|
|
||||||
requestCache: newBlockCache(pendingBlockCacheLimit),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// AddPeer adds a peer to the list of managed peers
|
|
||||||
func (pmgr *PeerMgr) AddPeer(peer mPeer) {
|
|
||||||
|
|
||||||
pmgr.pLock.Lock()
|
|
||||||
defer pmgr.pLock.Unlock()
|
|
||||||
if _, exists := pmgr.peers[peer]; exists {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
pmgr.peers[peer] = peerstats{
|
|
||||||
requests: make(map[command.Type]bool),
|
|
||||||
blockCache: newBlockCache(peerBlockCacheLimit),
|
|
||||||
}
|
|
||||||
go pmgr.onDisconnect(peer)
|
|
||||||
}
|
|
||||||
|
|
||||||
//MsgReceived notifies the peer manager that we have received a
|
|
||||||
// message from a peer
|
|
||||||
func (pmgr *PeerMgr) MsgReceived(peer mPeer, cmd command.Type) error {
|
|
||||||
pmgr.pLock.Lock()
|
|
||||||
defer pmgr.pLock.Unlock()
|
|
||||||
|
|
||||||
// if peer was unknown then disconnect
|
|
||||||
val, ok := pmgr.peers[peer]
|
|
||||||
if !ok {
|
|
||||||
|
|
||||||
go func() {
|
|
||||||
peer.NotifyDisconnect()
|
|
||||||
}()
|
|
||||||
|
|
||||||
peer.Disconnect()
|
|
||||||
return ErrUnknownPeer
|
|
||||||
}
|
|
||||||
val.requests[cmd] = false
|
|
||||||
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
//BlockMsgReceived notifies the peer manager that we have received a
|
|
||||||
// block message from a peer
|
|
||||||
func (pmgr *PeerMgr) BlockMsgReceived(peer mPeer, bi BlockInfo) error {
|
|
||||||
|
|
||||||
// if peer was unknown then disconnect
|
|
||||||
val, ok := pmgr.peers[peer]
|
|
||||||
if !ok {
|
|
||||||
|
|
||||||
go func() {
|
|
||||||
peer.NotifyDisconnect()
|
|
||||||
}()
|
|
||||||
|
|
||||||
peer.Disconnect()
|
|
||||||
return ErrUnknownPeer
|
|
||||||
}
|
|
||||||
|
|
||||||
// // remove item from the peersBlock cache
|
|
||||||
err := val.blockCache.removeHash(bi.BlockHash)
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
// check if cache empty, if so then return
|
|
||||||
if pmgr.requestCache.cacheLen() == 0 {
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// Try to clean an item from the pendingBlockCache, a peer has just finished serving a block request
|
|
||||||
cachedBInfo, err := pmgr.requestCache.pickFirstItem()
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
return pmgr.blockCallPeer(cachedBInfo, func(p mPeer) error {
|
|
||||||
return p.RequestBlocks([]util.Uint256{cachedBInfo.BlockHash})
|
|
||||||
})
|
|
||||||
}
|
|
||||||
|
|
||||||
// Len returns the amount of peers that the peer manager
|
|
||||||
//currently knows about
|
|
||||||
func (pmgr *PeerMgr) Len() int {
|
|
||||||
pmgr.pLock.Lock()
|
|
||||||
defer pmgr.pLock.Unlock()
|
|
||||||
return len(pmgr.peers)
|
|
||||||
}
|
|
||||||
|
|
||||||
// RequestBlock will request a block from the most
|
|
||||||
// available peer. Then update it's stats, so we know that
|
|
||||||
// this peer is busy
|
|
||||||
func (pmgr *PeerMgr) RequestBlock(bi BlockInfo) error {
|
|
||||||
pmgr.pLock.Lock()
|
|
||||||
defer pmgr.pLock.Unlock()
|
|
||||||
|
|
||||||
err := pmgr.blockCallPeer(bi, func(p mPeer) error {
|
|
||||||
return p.RequestBlocks([]util.Uint256{bi.BlockHash})
|
|
||||||
})
|
|
||||||
|
|
||||||
if err == ErrNoAvailablePeers {
|
|
||||||
return pmgr.requestCache.addBlockInfo(bi)
|
|
||||||
}
|
|
||||||
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
// RequestHeaders will request a headers from the most available peer.
|
|
||||||
func (pmgr *PeerMgr) RequestHeaders(hash util.Uint256) error {
|
|
||||||
pmgr.pLock.Lock()
|
|
||||||
defer pmgr.pLock.Unlock()
|
|
||||||
return pmgr.callPeerForCmd(command.Headers, func(p mPeer) error {
|
|
||||||
return p.RequestHeaders(hash)
|
|
||||||
})
|
|
||||||
}
|
|
||||||
|
|
||||||
func (pmgr *PeerMgr) callPeerForCmd(cmd command.Type, f func(p mPeer) error) error {
|
|
||||||
for peer, stats := range pmgr.peers {
|
|
||||||
if !stats.requests[cmd] {
|
|
||||||
stats.requests[cmd] = true
|
|
||||||
return f(peer)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return ErrNoAvailablePeers
|
|
||||||
}
|
|
||||||
|
|
||||||
func (pmgr *PeerMgr) blockCallPeer(bi BlockInfo, f func(p mPeer) error) error {
|
|
||||||
for peer, stats := range pmgr.peers {
|
|
||||||
if stats.blockCache.cacheLen() < peerBlockCacheLimit {
|
|
||||||
err := stats.blockCache.addBlockInfo(bi)
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
return f(peer)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return ErrNoAvailablePeers
|
|
||||||
}
|
|
||||||
|
|
||||||
func (pmgr *PeerMgr) onDisconnect(p mPeer) {
|
|
||||||
|
|
||||||
// Blocking until peer is disconnected
|
|
||||||
p.NotifyDisconnect()
|
|
||||||
|
|
||||||
pmgr.pLock.Lock()
|
|
||||||
defer func() {
|
|
||||||
delete(pmgr.peers, p)
|
|
||||||
pmgr.pLock.Unlock()
|
|
||||||
}()
|
|
||||||
|
|
||||||
// Add all of peers outstanding block requests into
|
|
||||||
// the peer managers pendingBlockRequestCache
|
|
||||||
|
|
||||||
val, ok := pmgr.peers[p]
|
|
||||||
if !ok {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
pendingRequests, err := val.blockCache.pickAllItems()
|
|
||||||
if err != nil {
|
|
||||||
fmt.Println(err.Error())
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
err = pmgr.requestCache.addBlockInfos(pendingRequests)
|
|
||||||
if err != nil {
|
|
||||||
fmt.Println(err.Error())
|
|
||||||
return
|
|
||||||
}
|
|
||||||
}
|
|
|
@ -1,201 +0,0 @@
|
||||||
package peermgr
|
|
||||||
|
|
||||||
import (
|
|
||||||
"testing"
|
|
||||||
|
|
||||||
"github.com/CityOfZion/neo-go/pkg/wire/command"
|
|
||||||
"github.com/CityOfZion/neo-go/pkg/wire/util"
|
|
||||||
"github.com/stretchr/testify/assert"
|
|
||||||
)
|
|
||||||
|
|
||||||
type peer struct {
|
|
||||||
quit chan bool
|
|
||||||
nonce int
|
|
||||||
disconnected bool
|
|
||||||
blockRequested int
|
|
||||||
headersRequested int
|
|
||||||
}
|
|
||||||
|
|
||||||
func (p *peer) Disconnect() {
|
|
||||||
p.disconnected = true
|
|
||||||
p.quit <- true
|
|
||||||
}
|
|
||||||
func (p *peer) RequestBlocks([]util.Uint256) error {
|
|
||||||
p.blockRequested++
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
func (p *peer) RequestHeaders(util.Uint256) error {
|
|
||||||
p.headersRequested++
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
func (p *peer) NotifyDisconnect() {
|
|
||||||
<-p.quit
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestAddPeer(t *testing.T) {
|
|
||||||
pmgr := New()
|
|
||||||
|
|
||||||
peerA := &peer{nonce: 1}
|
|
||||||
peerB := &peer{nonce: 2}
|
|
||||||
peerC := &peer{nonce: 3}
|
|
||||||
|
|
||||||
pmgr.AddPeer(peerA)
|
|
||||||
pmgr.AddPeer(peerB)
|
|
||||||
pmgr.AddPeer(peerC)
|
|
||||||
pmgr.AddPeer(peerC)
|
|
||||||
|
|
||||||
assert.Equal(t, 3, pmgr.Len())
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestRequestBlocks(t *testing.T) {
|
|
||||||
pmgr := New()
|
|
||||||
|
|
||||||
peerA := &peer{nonce: 1}
|
|
||||||
peerB := &peer{nonce: 2}
|
|
||||||
peerC := &peer{nonce: 3}
|
|
||||||
|
|
||||||
pmgr.AddPeer(peerA)
|
|
||||||
pmgr.AddPeer(peerB)
|
|
||||||
pmgr.AddPeer(peerC)
|
|
||||||
|
|
||||||
firstBlock := randomBlockInfo(t)
|
|
||||||
err := pmgr.RequestBlock(firstBlock)
|
|
||||||
assert.Nil(t, err)
|
|
||||||
|
|
||||||
secondBlock := randomBlockInfo(t)
|
|
||||||
err = pmgr.RequestBlock(secondBlock)
|
|
||||||
assert.Nil(t, err)
|
|
||||||
|
|
||||||
thirdBlock := randomBlockInfo(t)
|
|
||||||
err = pmgr.RequestBlock(thirdBlock)
|
|
||||||
assert.Nil(t, err)
|
|
||||||
|
|
||||||
// Since the peer manager did not get a MsgReceived
|
|
||||||
// in between the block requests
|
|
||||||
// a request should be sent to all peers
|
|
||||||
// This is only true, if peerBlockCacheLimit == 1
|
|
||||||
|
|
||||||
assert.Equal(t, 1, peerA.blockRequested)
|
|
||||||
assert.Equal(t, 1, peerB.blockRequested)
|
|
||||||
assert.Equal(t, 1, peerC.blockRequested)
|
|
||||||
|
|
||||||
// Since the peer manager still has not received a MsgReceived
|
|
||||||
// another call to request blocks, will add the request to the cache
|
|
||||||
// and return a nil err
|
|
||||||
|
|
||||||
fourthBlock := randomBlockInfo(t)
|
|
||||||
err = pmgr.RequestBlock(fourthBlock)
|
|
||||||
assert.Equal(t, nil, err)
|
|
||||||
assert.Equal(t, 1, pmgr.requestCache.cacheLen())
|
|
||||||
|
|
||||||
// If we tell the peer manager that we have received a block
|
|
||||||
// it will check the cache for any pending requests and send a block request if there are any.
|
|
||||||
// The request will go to the peer who sent back the block corresponding to the first hash
|
|
||||||
// since the other two peers are still busy with their block requests
|
|
||||||
|
|
||||||
peer := findPeerwithHash(t, pmgr, firstBlock.BlockHash)
|
|
||||||
err = pmgr.BlockMsgReceived(peer, firstBlock)
|
|
||||||
assert.Nil(t, err)
|
|
||||||
|
|
||||||
totalRequests := peerA.blockRequested + peerB.blockRequested + peerC.blockRequested
|
|
||||||
assert.Equal(t, 4, totalRequests)
|
|
||||||
|
|
||||||
// // cache should be empty now
|
|
||||||
assert.Equal(t, 0, pmgr.requestCache.cacheLen())
|
|
||||||
}
|
|
||||||
|
|
||||||
// The peer manager does not tell you what peer was sent a particular block request
|
|
||||||
// For testing purposes, the following function will find that peer
|
|
||||||
func findPeerwithHash(t *testing.T, pmgr *PeerMgr, blockHash util.Uint256) mPeer {
|
|
||||||
for peer, stats := range pmgr.peers {
|
|
||||||
_, err := stats.blockCache.findHash(blockHash)
|
|
||||||
if err == nil {
|
|
||||||
return peer
|
|
||||||
}
|
|
||||||
}
|
|
||||||
assert.Fail(t, "cannot find a peer with that hash")
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestRequestHeaders(t *testing.T) {
|
|
||||||
pmgr := New()
|
|
||||||
|
|
||||||
peerA := &peer{nonce: 1}
|
|
||||||
peerB := &peer{nonce: 2}
|
|
||||||
peerC := &peer{nonce: 3}
|
|
||||||
|
|
||||||
pmgr.AddPeer(peerA)
|
|
||||||
pmgr.AddPeer(peerB)
|
|
||||||
pmgr.AddPeer(peerC)
|
|
||||||
|
|
||||||
err := pmgr.RequestHeaders(util.Uint256{})
|
|
||||||
assert.Nil(t, err)
|
|
||||||
|
|
||||||
err = pmgr.RequestHeaders(util.Uint256{})
|
|
||||||
assert.Nil(t, err)
|
|
||||||
|
|
||||||
err = pmgr.RequestHeaders(util.Uint256{})
|
|
||||||
assert.Nil(t, err)
|
|
||||||
|
|
||||||
// Since the peer manager did not get a MsgReceived
|
|
||||||
// in between the header requests
|
|
||||||
// a request should be sent to all peers
|
|
||||||
|
|
||||||
assert.Equal(t, 1, peerA.headersRequested)
|
|
||||||
assert.Equal(t, 1, peerB.headersRequested)
|
|
||||||
assert.Equal(t, 1, peerC.headersRequested)
|
|
||||||
|
|
||||||
// Since the peer manager still has not received a MsgReceived
|
|
||||||
// another call to request header, will return a NoAvailablePeerError
|
|
||||||
|
|
||||||
err = pmgr.RequestHeaders(util.Uint256{})
|
|
||||||
assert.Equal(t, ErrNoAvailablePeers, err)
|
|
||||||
|
|
||||||
// If we tell the peer manager that peerA has given us a block
|
|
||||||
// then send another BlockRequest. It will go to peerA
|
|
||||||
// since the other two peers are still busy with their
|
|
||||||
// block requests
|
|
||||||
|
|
||||||
err = pmgr.MsgReceived(peerA, command.Headers)
|
|
||||||
assert.Nil(t, err)
|
|
||||||
err = pmgr.RequestHeaders(util.Uint256{})
|
|
||||||
assert.Nil(t, err)
|
|
||||||
|
|
||||||
assert.Equal(t, 2, peerA.headersRequested)
|
|
||||||
assert.Equal(t, 1, peerB.headersRequested)
|
|
||||||
assert.Equal(t, 1, peerC.headersRequested)
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestUnknownPeer(t *testing.T) {
|
|
||||||
pmgr := New()
|
|
||||||
|
|
||||||
unknownPeer := &peer{
|
|
||||||
disconnected: false,
|
|
||||||
quit: make(chan bool),
|
|
||||||
}
|
|
||||||
|
|
||||||
err := pmgr.MsgReceived(unknownPeer, command.Headers)
|
|
||||||
assert.Equal(t, true, unknownPeer.disconnected)
|
|
||||||
assert.Equal(t, ErrUnknownPeer, err)
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestNotifyDisconnect(t *testing.T) {
|
|
||||||
pmgr := New()
|
|
||||||
|
|
||||||
peerA := &peer{
|
|
||||||
nonce: 1,
|
|
||||||
quit: make(chan bool),
|
|
||||||
}
|
|
||||||
|
|
||||||
pmgr.AddPeer(peerA)
|
|
||||||
|
|
||||||
if pmgr.Len() != 1 {
|
|
||||||
t.Fail()
|
|
||||||
}
|
|
||||||
|
|
||||||
peerA.Disconnect()
|
|
||||||
|
|
||||||
if pmgr.Len() != 0 {
|
|
||||||
t.Fail()
|
|
||||||
}
|
|
||||||
}
|
|
|
@ -1,61 +0,0 @@
|
||||||
package syncmgr
|
|
||||||
|
|
||||||
import (
|
|
||||||
"github.com/CityOfZion/neo-go/pkg/chain"
|
|
||||||
"github.com/CityOfZion/neo-go/pkg/wire/payload"
|
|
||||||
)
|
|
||||||
|
|
||||||
// blockModeOnBlock is called when the sync manager is block mode
|
|
||||||
// and receives a block.
|
|
||||||
func (s *Syncmgr) blockModeOnBlock(peer SyncPeer, block payload.Block) error {
|
|
||||||
|
|
||||||
// Check if it is a future block
|
|
||||||
// XXX: since we are storing blocks in memory, we do not want to store blocks
|
|
||||||
// from the tip
|
|
||||||
if block.Index > s.nextBlockIndex+2000 {
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
if block.Index > s.nextBlockIndex {
|
|
||||||
s.addToBlockPool(block)
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// Process Block
|
|
||||||
err := s.processBlock(block)
|
|
||||||
if err != nil && err != chain.ErrBlockAlreadyExists {
|
|
||||||
return s.cfg.FetchBlockAgain(block.Hash)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Check the block pool
|
|
||||||
err = s.checkPool()
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
// Check if blockhashReceived == the header hash from last get headers this node performed
|
|
||||||
// if not then increment and request next block
|
|
||||||
if s.headerHash != block.Hash {
|
|
||||||
nextHash, err := s.cfg.GetNextBlockHash()
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
return s.cfg.RequestBlock(nextHash, block.Index)
|
|
||||||
}
|
|
||||||
|
|
||||||
// If we are caught up then go into normal mode
|
|
||||||
diff := peer.Height() - block.Index
|
|
||||||
if diff <= cruiseHeight {
|
|
||||||
s.syncmode = normalMode
|
|
||||||
s.timer.Reset(blockTimer)
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// If not then we go back into headersMode and request more headers.
|
|
||||||
s.syncmode = headersMode
|
|
||||||
return s.cfg.RequestHeaders(block.Hash)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (s *Syncmgr) blockModeOnHeaders(peer SyncPeer, hdrs []*payload.BlockBase) error {
|
|
||||||
// We ignore headers when in this mode
|
|
||||||
return nil
|
|
||||||
}
|
|
|
@ -1,57 +0,0 @@
|
||||||
package syncmgr
|
|
||||||
|
|
||||||
import (
|
|
||||||
"sort"
|
|
||||||
|
|
||||||
"github.com/CityOfZion/neo-go/pkg/wire/payload"
|
|
||||||
)
|
|
||||||
|
|
||||||
func (s *Syncmgr) addToBlockPool(newBlock payload.Block) {
|
|
||||||
s.poolLock.Lock()
|
|
||||||
defer s.poolLock.Unlock()
|
|
||||||
|
|
||||||
for _, block := range s.blockPool {
|
|
||||||
if block.Index == newBlock.Index {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
s.blockPool = append(s.blockPool, newBlock)
|
|
||||||
|
|
||||||
// sort slice using block index
|
|
||||||
sort.Slice(s.blockPool, func(i, j int) bool {
|
|
||||||
return s.blockPool[i].Index < s.blockPool[j].Index
|
|
||||||
})
|
|
||||||
|
|
||||||
}
|
|
||||||
|
|
||||||
func (s *Syncmgr) checkPool() error {
|
|
||||||
// Assuming that the blocks are sorted in order
|
|
||||||
|
|
||||||
var indexesToRemove = -1
|
|
||||||
|
|
||||||
s.poolLock.Lock()
|
|
||||||
defer func() {
|
|
||||||
// removes all elements before this index, including the element at this index
|
|
||||||
s.blockPool = s.blockPool[indexesToRemove+1:]
|
|
||||||
s.poolLock.Unlock()
|
|
||||||
}()
|
|
||||||
|
|
||||||
// loop iterates through the cache, processing any
|
|
||||||
// blocks that can be added to the chain
|
|
||||||
for i, block := range s.blockPool {
|
|
||||||
if s.nextBlockIndex != block.Index {
|
|
||||||
break
|
|
||||||
}
|
|
||||||
|
|
||||||
// Save this block and save the indice location so we can remove it, when we defer
|
|
||||||
err := s.processBlock(block)
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
indexesToRemove = i
|
|
||||||
}
|
|
||||||
|
|
||||||
return nil
|
|
||||||
}
|
|
|
@ -1,42 +0,0 @@
|
||||||
package syncmgr
|
|
||||||
|
|
||||||
import (
|
|
||||||
"testing"
|
|
||||||
|
|
||||||
"github.com/stretchr/testify/assert"
|
|
||||||
)
|
|
||||||
|
|
||||||
func TestAddBlockPoolFlush(t *testing.T) {
|
|
||||||
syncmgr, _ := setupSyncMgr(blockMode, 10)
|
|
||||||
|
|
||||||
blockMessage := randomBlockMessage(t, 11)
|
|
||||||
|
|
||||||
peer := &mockPeer{
|
|
||||||
height: 100,
|
|
||||||
}
|
|
||||||
|
|
||||||
// Since the block has Index 11 and the sync manager needs the block with index 10
|
|
||||||
// This block will be added to the blockPool
|
|
||||||
err := syncmgr.OnBlock(peer, blockMessage)
|
|
||||||
assert.Nil(t, err)
|
|
||||||
assert.Equal(t, 1, len(syncmgr.blockPool))
|
|
||||||
|
|
||||||
// The sync manager is still looking for the block at height 10
|
|
||||||
// Since this block is at height 12, it will be added to the block pool
|
|
||||||
blockMessage = randomBlockMessage(t, 12)
|
|
||||||
err = syncmgr.OnBlock(peer, blockMessage)
|
|
||||||
assert.Nil(t, err)
|
|
||||||
assert.Equal(t, 2, len(syncmgr.blockPool))
|
|
||||||
|
|
||||||
// This is the block that the sync manager was waiting for
|
|
||||||
// It should process this block, the check the pool for the next set of blocks
|
|
||||||
blockMessage = randomBlockMessage(t, 10)
|
|
||||||
err = syncmgr.OnBlock(peer, blockMessage)
|
|
||||||
assert.Nil(t, err)
|
|
||||||
assert.Equal(t, 0, len(syncmgr.blockPool))
|
|
||||||
|
|
||||||
// Since we processed 3 blocks and the sync manager started
|
|
||||||
//looking for block with index 10. The syncmananger should be looking for
|
|
||||||
// the block with index 13
|
|
||||||
assert.Equal(t, uint32(13), syncmgr.nextBlockIndex)
|
|
||||||
}
|
|
|
@ -1,44 +0,0 @@
|
||||||
package syncmgr
|
|
||||||
|
|
||||||
import (
|
|
||||||
"github.com/CityOfZion/neo-go/pkg/wire/payload"
|
|
||||||
"github.com/CityOfZion/neo-go/pkg/wire/util"
|
|
||||||
)
|
|
||||||
|
|
||||||
// Config is the configuration file for the sync manager
|
|
||||||
type Config struct {
|
|
||||||
|
|
||||||
// Chain functions
|
|
||||||
ProcessBlock func(block payload.Block) error
|
|
||||||
ProcessHeaders func(hdrs []*payload.BlockBase) error
|
|
||||||
|
|
||||||
// RequestHeaders will send a getHeaders request
|
|
||||||
// with the hash passed in as a parameter
|
|
||||||
RequestHeaders func(hash util.Uint256) error
|
|
||||||
|
|
||||||
//RequestBlock will send a getdata request for the block
|
|
||||||
// with the hash passed as a parameter
|
|
||||||
RequestBlock func(hash util.Uint256, index uint32) error
|
|
||||||
|
|
||||||
// GetNextBlockHash returns the block hash of the header infront of thr block
|
|
||||||
// at the tip of this nodes chain. This assumes that the node is not in sync
|
|
||||||
GetNextBlockHash func() (util.Uint256, error)
|
|
||||||
|
|
||||||
// AskForNewBlocks will send out a message to the network
|
|
||||||
// asking for new blocks
|
|
||||||
AskForNewBlocks func()
|
|
||||||
|
|
||||||
// FetchHeadersAgain is called when a peer has provided headers that have not
|
|
||||||
// validated properly. We pass in the hash of the first header
|
|
||||||
FetchHeadersAgain func(util.Uint256) error
|
|
||||||
|
|
||||||
// FetchHeadersAgain is called when a peer has provided a block that has not
|
|
||||||
// validated properly. We pass in the hash of the block
|
|
||||||
FetchBlockAgain func(util.Uint256) error
|
|
||||||
}
|
|
||||||
|
|
||||||
// SyncPeer represents a peer on the network
|
|
||||||
// that this node can sync with
|
|
||||||
type SyncPeer interface {
|
|
||||||
Height() uint32
|
|
||||||
}
|
|
|
@ -1,42 +0,0 @@
|
||||||
package syncmgr
|
|
||||||
|
|
||||||
import (
|
|
||||||
"github.com/CityOfZion/neo-go/pkg/chain"
|
|
||||||
"github.com/CityOfZion/neo-go/pkg/wire/payload"
|
|
||||||
)
|
|
||||||
|
|
||||||
// headersModeOnHeaders is called when the sync manager is headers mode
|
|
||||||
// and receives a header.
|
|
||||||
func (s *Syncmgr) headersModeOnHeaders(peer SyncPeer, hdrs []*payload.BlockBase) error {
|
|
||||||
// If we are in Headers mode, then we just need to process the headers
|
|
||||||
// Note: For the un-optimised version, we move straight to blocksOnly mode
|
|
||||||
|
|
||||||
firstHash := hdrs[0].Hash
|
|
||||||
firstHdrIndex := hdrs[0].Index
|
|
||||||
|
|
||||||
err := s.cfg.ProcessHeaders(hdrs)
|
|
||||||
if err == nil {
|
|
||||||
// Update syncmgr last header
|
|
||||||
s.headerHash = hdrs[len(hdrs)-1].Hash
|
|
||||||
|
|
||||||
s.syncmode = blockMode
|
|
||||||
return s.cfg.RequestBlock(firstHash, firstHdrIndex)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Check whether it is a validation error, or a database error
|
|
||||||
if _, ok := err.(*chain.ValidationError); ok {
|
|
||||||
// If we get a validation error we re-request the headers
|
|
||||||
// the method will automatically fetch from a different peer
|
|
||||||
// XXX: Add increment banScore for this peer
|
|
||||||
return s.cfg.FetchHeadersAgain(firstHash)
|
|
||||||
}
|
|
||||||
// This means it is a database error. We have no way to recover from this.
|
|
||||||
panic(err.Error())
|
|
||||||
}
|
|
||||||
|
|
||||||
// headersModeOnBlock is called when the sync manager is headers mode
|
|
||||||
// and receives a block.
|
|
||||||
func (s *Syncmgr) headersModeOnBlock(peer SyncPeer, block payload.Block) error {
|
|
||||||
// While in headers mode, ignore any blocks received
|
|
||||||
return nil
|
|
||||||
}
|
|
|
@ -1,113 +0,0 @@
|
||||||
package syncmgr
|
|
||||||
|
|
||||||
import (
|
|
||||||
"crypto/rand"
|
|
||||||
"testing"
|
|
||||||
|
|
||||||
"github.com/stretchr/testify/assert"
|
|
||||||
|
|
||||||
"github.com/CityOfZion/neo-go/pkg/wire/payload"
|
|
||||||
"github.com/CityOfZion/neo-go/pkg/wire/util"
|
|
||||||
)
|
|
||||||
|
|
||||||
type syncTestHelper struct {
|
|
||||||
blocksProcessed int
|
|
||||||
headersProcessed int
|
|
||||||
newBlockRequest int
|
|
||||||
headersFetchRequest int
|
|
||||||
blockFetchRequest int
|
|
||||||
err error
|
|
||||||
}
|
|
||||||
|
|
||||||
func (s *syncTestHelper) ProcessBlock(msg payload.Block) error {
|
|
||||||
s.blocksProcessed++
|
|
||||||
return s.err
|
|
||||||
}
|
|
||||||
func (s *syncTestHelper) ProcessHeaders(hdrs []*payload.BlockBase) error {
|
|
||||||
s.headersProcessed = s.headersProcessed + len(hdrs)
|
|
||||||
return s.err
|
|
||||||
}
|
|
||||||
|
|
||||||
func (s *syncTestHelper) GetNextBlockHash() (util.Uint256, error) {
|
|
||||||
return util.Uint256{}, s.err
|
|
||||||
}
|
|
||||||
|
|
||||||
func (s *syncTestHelper) AskForNewBlocks() {
|
|
||||||
s.newBlockRequest++
|
|
||||||
}
|
|
||||||
|
|
||||||
func (s *syncTestHelper) FetchHeadersAgain(util.Uint256) error {
|
|
||||||
s.headersFetchRequest++
|
|
||||||
return s.err
|
|
||||||
}
|
|
||||||
|
|
||||||
func (s *syncTestHelper) FetchBlockAgain(util.Uint256) error {
|
|
||||||
s.blockFetchRequest++
|
|
||||||
return s.err
|
|
||||||
}
|
|
||||||
|
|
||||||
func (s *syncTestHelper) RequestBlock(util.Uint256, uint32) error {
|
|
||||||
s.blockFetchRequest++
|
|
||||||
return s.err
|
|
||||||
}
|
|
||||||
|
|
||||||
func (s *syncTestHelper) RequestHeaders(util.Uint256) error {
|
|
||||||
s.headersFetchRequest++
|
|
||||||
return s.err
|
|
||||||
}
|
|
||||||
|
|
||||||
type mockPeer struct {
|
|
||||||
height uint32
|
|
||||||
}
|
|
||||||
|
|
||||||
func (p *mockPeer) Height() uint32 { return p.height }
|
|
||||||
|
|
||||||
func randomHeadersMessage(t *testing.T, num int) *payload.HeadersMessage {
|
|
||||||
var hdrs []*payload.BlockBase
|
|
||||||
|
|
||||||
for i := 0; i < num; i++ {
|
|
||||||
hash := randomUint256(t)
|
|
||||||
hdr := &payload.BlockBase{Hash: hash}
|
|
||||||
hdrs = append(hdrs, hdr)
|
|
||||||
}
|
|
||||||
|
|
||||||
hdrsMsg, err := payload.NewHeadersMessage()
|
|
||||||
assert.Nil(t, err)
|
|
||||||
|
|
||||||
hdrsMsg.Headers = hdrs
|
|
||||||
|
|
||||||
return hdrsMsg
|
|
||||||
}
|
|
||||||
|
|
||||||
func randomUint256(t *testing.T) util.Uint256 {
|
|
||||||
hash := make([]byte, 32)
|
|
||||||
_, err := rand.Read(hash)
|
|
||||||
assert.Nil(t, err)
|
|
||||||
|
|
||||||
u, err := util.Uint256DecodeBytes(hash)
|
|
||||||
assert.Nil(t, err)
|
|
||||||
return u
|
|
||||||
}
|
|
||||||
|
|
||||||
func setupSyncMgr(mode mode, nextBlockIndex uint32) (*Syncmgr, *syncTestHelper) {
|
|
||||||
helper := &syncTestHelper{}
|
|
||||||
|
|
||||||
cfg := &Config{
|
|
||||||
ProcessBlock: helper.ProcessBlock,
|
|
||||||
ProcessHeaders: helper.ProcessHeaders,
|
|
||||||
|
|
||||||
GetNextBlockHash: helper.GetNextBlockHash,
|
|
||||||
AskForNewBlocks: helper.AskForNewBlocks,
|
|
||||||
|
|
||||||
FetchHeadersAgain: helper.FetchHeadersAgain,
|
|
||||||
FetchBlockAgain: helper.FetchBlockAgain,
|
|
||||||
|
|
||||||
RequestBlock: helper.RequestBlock,
|
|
||||||
RequestHeaders: helper.RequestHeaders,
|
|
||||||
}
|
|
||||||
|
|
||||||
syncmgr := New(cfg, nextBlockIndex)
|
|
||||||
syncmgr.syncmode = mode
|
|
||||||
|
|
||||||
return syncmgr, helper
|
|
||||||
}
|
|
|
@ -1,60 +0,0 @@
|
||||||
package syncmgr
|
|
||||||
|
|
||||||
import (
|
|
||||||
"github.com/CityOfZion/neo-go/pkg/wire/payload"
|
|
||||||
)
|
|
||||||
|
|
||||||
func (s *Syncmgr) normalModeOnHeaders(peer SyncPeer, hdrs []*payload.BlockBase) error {
|
|
||||||
// If in normal mode, first process the headers
|
|
||||||
err := s.cfg.ProcessHeaders(hdrs)
|
|
||||||
if err != nil {
|
|
||||||
// If something went wrong with processing the headers
|
|
||||||
// Ask another peer for the headers.
|
|
||||||
//XXX: Increment banscore for this peer
|
|
||||||
return s.cfg.FetchHeadersAgain(hdrs[0].Hash)
|
|
||||||
}
|
|
||||||
|
|
||||||
lenHeaders := len(hdrs)
|
|
||||||
firstHash := hdrs[0].Hash
|
|
||||||
firstHdrIndex := hdrs[0].Index
|
|
||||||
lastHash := hdrs[lenHeaders-1].Hash
|
|
||||||
|
|
||||||
// Update syncmgr latest header
|
|
||||||
s.headerHash = lastHash
|
|
||||||
|
|
||||||
// If there are 2k headers, then ask for more headers and switch back to headers mode.
|
|
||||||
if lenHeaders == 2000 {
|
|
||||||
s.syncmode = headersMode
|
|
||||||
return s.cfg.RequestHeaders(lastHash)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Ask for the corresponding block iff there is < 2k headers
|
|
||||||
// then switch to blocksMode
|
|
||||||
// Bounds state that len > 1 && len!= 2000 & maxHeadersInMessage == 2000
|
|
||||||
// This means that we have less than 2k headers
|
|
||||||
s.syncmode = blockMode
|
|
||||||
return s.cfg.RequestBlock(firstHash, firstHdrIndex)
|
|
||||||
}
|
|
||||||
|
|
||||||
// normalModeOnBlock is called when the sync manager is normal mode
|
|
||||||
// and receives a block.
|
|
||||||
func (s *Syncmgr) normalModeOnBlock(peer SyncPeer, block payload.Block) error {
|
|
||||||
// stop the timer that periodically asks for blocks
|
|
||||||
s.timer.Stop()
|
|
||||||
|
|
||||||
// process block
|
|
||||||
err := s.processBlock(block)
|
|
||||||
if err != nil {
|
|
||||||
s.timer.Reset(blockTimer)
|
|
||||||
return s.cfg.FetchBlockAgain(block.Hash)
|
|
||||||
}
|
|
||||||
|
|
||||||
diff := peer.Height() - block.Index
|
|
||||||
if diff > trailingHeight {
|
|
||||||
s.syncmode = headersMode
|
|
||||||
return s.cfg.RequestHeaders(block.Hash)
|
|
||||||
}
|
|
||||||
|
|
||||||
s.timer.Reset(blockTimer)
|
|
||||||
return nil
|
|
||||||
}
|
|
|
@ -1,152 +0,0 @@
|
||||||
package syncmgr
|
|
||||||
|
|
||||||
import (
|
|
||||||
"fmt"
|
|
||||||
"sync"
|
|
||||||
"time"
|
|
||||||
|
|
||||||
"github.com/CityOfZion/neo-go/pkg/wire/payload"
|
|
||||||
"github.com/CityOfZion/neo-go/pkg/wire/util"
|
|
||||||
)
|
|
||||||
|
|
||||||
type mode uint8
|
|
||||||
|
|
||||||
// Note: this is the unoptimised version without parallel sync
|
|
||||||
// The algorithm for the unoptimsied version is simple:
|
|
||||||
// Download 2000 headers, then download the blocks for those headers
|
|
||||||
// Once those blocks are downloaded, we repeat the process again
|
|
||||||
// Until we are nomore than one block behind the tip.
|
|
||||||
// Once this happens, we switch into normal mode.
|
|
||||||
//In normal mode, we have a timer on for X seconds and ask nodes for blocks and also to doublecheck
|
|
||||||
// if we are behind once the timer runs out.
|
|
||||||
// The timer restarts whenever we receive a block.
|
|
||||||
// The parameter X should be approximately the time it takes the network to reach consensus
|
|
||||||
|
|
||||||
//blockTimer approximates to how long it takes to reach consensus and propagate
|
|
||||||
// a block in the network. Once a node has synchronised with the network, he will
|
|
||||||
// ask the network for a newblock every blockTimer
|
|
||||||
const blockTimer = 20 * time.Second
|
|
||||||
|
|
||||||
// trailingHeight indicates how many blocks the node has to be behind by
|
|
||||||
// before he switches to headersMode.
|
|
||||||
const trailingHeight = 100
|
|
||||||
|
|
||||||
// indicates how many blocks the node has to be behind by
|
|
||||||
// before he switches to normalMode and fetches blocks every X seconds.
|
|
||||||
const cruiseHeight = 0
|
|
||||||
|
|
||||||
const (
|
|
||||||
headersMode mode = 1
|
|
||||||
blockMode mode = 2
|
|
||||||
normalMode mode = 3
|
|
||||||
)
|
|
||||||
|
|
||||||
//Syncmgr keeps the node in sync with the rest of the network
|
|
||||||
type Syncmgr struct {
|
|
||||||
syncmode mode
|
|
||||||
cfg *Config
|
|
||||||
timer *time.Timer
|
|
||||||
|
|
||||||
// headerHash is the hash of the last header in the last OnHeaders message that we received.
|
|
||||||
// When receiving blocks, we can use this to determine whether the node has downloaded
|
|
||||||
// all of the blocks for the last headers messages
|
|
||||||
headerHash util.Uint256
|
|
||||||
|
|
||||||
poolLock sync.Mutex
|
|
||||||
blockPool []payload.Block
|
|
||||||
nextBlockIndex uint32
|
|
||||||
}
|
|
||||||
|
|
||||||
// New creates a new sync manager
|
|
||||||
func New(cfg *Config, nextBlockIndex uint32) *Syncmgr {
|
|
||||||
|
|
||||||
newBlockTimer := time.AfterFunc(blockTimer, func() {
|
|
||||||
cfg.AskForNewBlocks()
|
|
||||||
})
|
|
||||||
newBlockTimer.Stop()
|
|
||||||
|
|
||||||
return &Syncmgr{
|
|
||||||
syncmode: headersMode,
|
|
||||||
cfg: cfg,
|
|
||||||
timer: newBlockTimer,
|
|
||||||
nextBlockIndex: nextBlockIndex,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// OnHeader is called when the node receives a headers message
|
|
||||||
func (s *Syncmgr) OnHeader(peer SyncPeer, msg *payload.HeadersMessage) error {
|
|
||||||
|
|
||||||
// XXX(Optimisation): First check if we actually need these headers
|
|
||||||
// Check the last header in msg and then check what our latest header that was saved is
|
|
||||||
// If our latest header is above the lastHeader, then we do not save it
|
|
||||||
// We could also have that our latest header is above only some of the headers.
|
|
||||||
// In this case, we should remove the headers that we already have
|
|
||||||
|
|
||||||
if len(msg.Headers) == 0 {
|
|
||||||
// XXX: Increment banScore for this peer, for sending empty headers message
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
var err error
|
|
||||||
|
|
||||||
switch s.syncmode {
|
|
||||||
case headersMode:
|
|
||||||
err = s.headersModeOnHeaders(peer, msg.Headers)
|
|
||||||
case blockMode:
|
|
||||||
err = s.blockModeOnHeaders(peer, msg.Headers)
|
|
||||||
case normalMode:
|
|
||||||
err = s.normalModeOnHeaders(peer, msg.Headers)
|
|
||||||
default:
|
|
||||||
err = s.headersModeOnHeaders(peer, msg.Headers)
|
|
||||||
}
|
|
||||||
|
|
||||||
// XXX(Kev):The only meaningful error here would be if the peer
|
|
||||||
// we re-requested blocks from failed. In the next iteration, this will be handled
|
|
||||||
// by the peer manager, who will only return an error, if we are connected to no peers.
|
|
||||||
// Upon re-alising this, the node will then send out GetAddresses to the network and
|
|
||||||
// syncing will be resumed, once we find peers to connect to.
|
|
||||||
|
|
||||||
hdr := msg.Headers[len(msg.Headers)-1]
|
|
||||||
fmt.Printf("Finished processing headers. LastHash in set was: %s\n ", hdr.Hash.ReverseString())
|
|
||||||
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
// OnBlock is called when the node receives a block
|
|
||||||
func (s *Syncmgr) OnBlock(peer SyncPeer, msg *payload.BlockMessage) error {
|
|
||||||
fmt.Printf("Block received with height %d\n", msg.Block.Index)
|
|
||||||
|
|
||||||
var err error
|
|
||||||
|
|
||||||
switch s.syncmode {
|
|
||||||
case headersMode:
|
|
||||||
err = s.headersModeOnBlock(peer, msg.Block)
|
|
||||||
case blockMode:
|
|
||||||
err = s.blockModeOnBlock(peer, msg.Block)
|
|
||||||
case normalMode:
|
|
||||||
err = s.normalModeOnBlock(peer, msg.Block)
|
|
||||||
default:
|
|
||||||
err = s.headersModeOnBlock(peer, msg.Block)
|
|
||||||
}
|
|
||||||
|
|
||||||
fmt.Printf("Processed Block with height %d\n", msg.Block.Index)
|
|
||||||
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
//IsCurrent returns true if the node is currently
|
|
||||||
// synced up with the network
|
|
||||||
func (s *Syncmgr) IsCurrent() bool {
|
|
||||||
return s.syncmode == normalMode
|
|
||||||
}
|
|
||||||
|
|
||||||
func (s *Syncmgr) processBlock(block payload.Block) error {
|
|
||||||
err := s.cfg.ProcessBlock(block)
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
s.nextBlockIndex++
|
|
||||||
|
|
||||||
return nil
|
|
||||||
}
|
|
|
@ -1,97 +0,0 @@
|
||||||
package syncmgr
|
|
||||||
|
|
||||||
import (
|
|
||||||
"testing"
|
|
||||||
|
|
||||||
"github.com/CityOfZion/neo-go/pkg/chain"
|
|
||||||
|
|
||||||
"github.com/CityOfZion/neo-go/pkg/wire/payload"
|
|
||||||
"github.com/stretchr/testify/assert"
|
|
||||||
)
|
|
||||||
|
|
||||||
func TestHeadersModeOnBlock(t *testing.T) {
|
|
||||||
|
|
||||||
syncmgr, helper := setupSyncMgr(headersMode, 0)
|
|
||||||
|
|
||||||
syncmgr.OnBlock(&mockPeer{}, randomBlockMessage(t, 0))
|
|
||||||
|
|
||||||
// In headerMode, we do nothing
|
|
||||||
assert.Equal(t, 0, helper.blocksProcessed)
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestBlockModeOnBlock(t *testing.T) {
|
|
||||||
|
|
||||||
syncmgr, helper := setupSyncMgr(blockMode, 0)
|
|
||||||
|
|
||||||
syncmgr.OnBlock(&mockPeer{}, randomBlockMessage(t, 0))
|
|
||||||
|
|
||||||
// When a block is received in blockMode, it is processed
|
|
||||||
assert.Equal(t, 1, helper.blocksProcessed)
|
|
||||||
}
|
|
||||||
func TestNormalModeOnBlock(t *testing.T) {
|
|
||||||
|
|
||||||
syncmgr, helper := setupSyncMgr(normalMode, 0)
|
|
||||||
|
|
||||||
syncmgr.OnBlock(&mockPeer{}, randomBlockMessage(t, 0))
|
|
||||||
|
|
||||||
// When a block is received in normal, it is processed
|
|
||||||
assert.Equal(t, 1, helper.blocksProcessed)
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestBlockModeToNormalMode(t *testing.T) {
|
|
||||||
|
|
||||||
syncmgr, _ := setupSyncMgr(blockMode, 100)
|
|
||||||
|
|
||||||
peer := &mockPeer{
|
|
||||||
height: 100,
|
|
||||||
}
|
|
||||||
|
|
||||||
blkMessage := randomBlockMessage(t, 100)
|
|
||||||
|
|
||||||
syncmgr.OnBlock(peer, blkMessage)
|
|
||||||
|
|
||||||
// We should switch to normal mode, since the block
|
|
||||||
//we received is close to the height of the peer. See cruiseHeight
|
|
||||||
assert.Equal(t, normalMode, syncmgr.syncmode)
|
|
||||||
|
|
||||||
}
|
|
||||||
func TestBlockModeStayInBlockMode(t *testing.T) {
|
|
||||||
|
|
||||||
syncmgr, _ := setupSyncMgr(blockMode, 0)
|
|
||||||
|
|
||||||
// We need our latest know hash to not be equal to the hash
|
|
||||||
// of the block we received, to stay in blockmode
|
|
||||||
syncmgr.headerHash = randomUint256(t)
|
|
||||||
|
|
||||||
peer := &mockPeer{
|
|
||||||
height: 2000,
|
|
||||||
}
|
|
||||||
|
|
||||||
blkMessage := randomBlockMessage(t, 100)
|
|
||||||
|
|
||||||
syncmgr.OnBlock(peer, blkMessage)
|
|
||||||
|
|
||||||
// We should stay in block mode, since the block we received is
|
|
||||||
// still quite far behind the peers height
|
|
||||||
assert.Equal(t, blockMode, syncmgr.syncmode)
|
|
||||||
}
|
|
||||||
func TestBlockModeAlreadyExistsErr(t *testing.T) {
|
|
||||||
|
|
||||||
syncmgr, helper := setupSyncMgr(blockMode, 100)
|
|
||||||
helper.err = chain.ErrBlockAlreadyExists
|
|
||||||
|
|
||||||
syncmgr.OnBlock(&mockPeer{}, randomBlockMessage(t, 100))
|
|
||||||
|
|
||||||
assert.Equal(t, 0, helper.blockFetchRequest)
|
|
||||||
|
|
||||||
// If we have a block already exists in blockmode, then we
|
|
||||||
// switch back to headers mode.
|
|
||||||
assert.Equal(t, headersMode, syncmgr.syncmode)
|
|
||||||
}
|
|
||||||
|
|
||||||
func randomBlockMessage(t *testing.T, height uint32) *payload.BlockMessage {
|
|
||||||
blockMessage, err := payload.NewBlockMessage()
|
|
||||||
blockMessage.BlockBase.Index = height
|
|
||||||
assert.Nil(t, err)
|
|
||||||
return blockMessage
|
|
||||||
}
|
|
|
@ -1,117 +0,0 @@
|
||||||
package syncmgr
|
|
||||||
|
|
||||||
import (
|
|
||||||
"testing"
|
|
||||||
|
|
||||||
"github.com/CityOfZion/neo-go/pkg/chain"
|
|
||||||
|
|
||||||
"github.com/stretchr/testify/assert"
|
|
||||||
|
|
||||||
"github.com/CityOfZion/neo-go/pkg/wire/util"
|
|
||||||
)
|
|
||||||
|
|
||||||
func TestHeadersModeOnHeaders(t *testing.T) {
|
|
||||||
|
|
||||||
syncmgr, helper := setupSyncMgr(headersMode, 0)
|
|
||||||
|
|
||||||
syncmgr.OnHeader(&mockPeer{}, randomHeadersMessage(t, 0))
|
|
||||||
|
|
||||||
// Since there were no headers, we should have exited early and processed nothing
|
|
||||||
assert.Equal(t, 0, helper.headersProcessed)
|
|
||||||
|
|
||||||
// ProcessHeaders should have been called once to process all 100 headers
|
|
||||||
syncmgr.OnHeader(&mockPeer{}, randomHeadersMessage(t, 100))
|
|
||||||
assert.Equal(t, 100, helper.headersProcessed)
|
|
||||||
|
|
||||||
// Mode should now be blockMode
|
|
||||||
assert.Equal(t, blockMode, syncmgr.syncmode)
|
|
||||||
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestBlockModeOnHeaders(t *testing.T) {
|
|
||||||
syncmgr, helper := setupSyncMgr(blockMode, 0)
|
|
||||||
|
|
||||||
// If we receive a header in blockmode, no headers will be processed
|
|
||||||
syncmgr.OnHeader(&mockPeer{}, randomHeadersMessage(t, 100))
|
|
||||||
assert.Equal(t, 0, helper.headersProcessed)
|
|
||||||
}
|
|
||||||
func TestNormalModeOnHeadersMaxHeaders(t *testing.T) {
|
|
||||||
syncmgr, helper := setupSyncMgr(normalMode, 0)
|
|
||||||
|
|
||||||
// If we receive a header in normalmode, headers will be processed
|
|
||||||
syncmgr.OnHeader(&mockPeer{}, randomHeadersMessage(t, 2000))
|
|
||||||
assert.Equal(t, 2000, helper.headersProcessed)
|
|
||||||
|
|
||||||
// Mode should now be headersMode since we received 2000 headers
|
|
||||||
assert.Equal(t, headersMode, syncmgr.syncmode)
|
|
||||||
}
|
|
||||||
|
|
||||||
// This differs from the previous function in that
|
|
||||||
//we did not receive the max amount of headers
|
|
||||||
func TestNormalModeOnHeaders(t *testing.T) {
|
|
||||||
syncmgr, helper := setupSyncMgr(normalMode, 0)
|
|
||||||
|
|
||||||
// If we receive a header in normalmode, headers will be processed
|
|
||||||
syncmgr.OnHeader(&mockPeer{}, randomHeadersMessage(t, 200))
|
|
||||||
assert.Equal(t, 200, helper.headersProcessed)
|
|
||||||
|
|
||||||
// Because we did not receive 2000 headers, we switch to blockMode
|
|
||||||
assert.Equal(t, blockMode, syncmgr.syncmode)
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestLastHeaderUpdates(t *testing.T) {
|
|
||||||
syncmgr, _ := setupSyncMgr(headersMode, 0)
|
|
||||||
|
|
||||||
hdrsMessage := randomHeadersMessage(t, 200)
|
|
||||||
hdrs := hdrsMessage.Headers
|
|
||||||
lastHeader := hdrs[len(hdrs)-1]
|
|
||||||
|
|
||||||
syncmgr.OnHeader(&mockPeer{}, hdrsMessage)
|
|
||||||
|
|
||||||
// Headers are processed in headersMode
|
|
||||||
// Last header should be updated
|
|
||||||
assert.True(t, syncmgr.headerHash.Equals(lastHeader.Hash))
|
|
||||||
|
|
||||||
// Change mode to blockMode and reset lastHeader
|
|
||||||
syncmgr.syncmode = blockMode
|
|
||||||
syncmgr.headerHash = util.Uint256{}
|
|
||||||
|
|
||||||
syncmgr.OnHeader(&mockPeer{}, hdrsMessage)
|
|
||||||
|
|
||||||
// header should not be changed
|
|
||||||
assert.False(t, syncmgr.headerHash.Equals(lastHeader.Hash))
|
|
||||||
|
|
||||||
// Change mode to normalMode and reset lastHeader
|
|
||||||
syncmgr.syncmode = normalMode
|
|
||||||
syncmgr.headerHash = util.Uint256{}
|
|
||||||
|
|
||||||
syncmgr.OnHeader(&mockPeer{}, hdrsMessage)
|
|
||||||
|
|
||||||
// headers are processed in normalMode
|
|
||||||
// hash should be updated
|
|
||||||
assert.True(t, syncmgr.headerHash.Equals(lastHeader.Hash))
|
|
||||||
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestHeadersModeOnHeadersErr(t *testing.T) {
|
|
||||||
|
|
||||||
syncmgr, helper := setupSyncMgr(headersMode, 0)
|
|
||||||
helper.err = &chain.ValidationError{}
|
|
||||||
|
|
||||||
syncmgr.OnHeader(&mockPeer{}, randomHeadersMessage(t, 200))
|
|
||||||
|
|
||||||
// On a validation error, we should request for another peer
|
|
||||||
// to send us these headers
|
|
||||||
assert.Equal(t, 1, helper.headersFetchRequest)
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestNormalModeOnHeadersErr(t *testing.T) {
|
|
||||||
syncmgr, helper := setupSyncMgr(normalMode, 0)
|
|
||||||
helper.err = &chain.ValidationError{}
|
|
||||||
|
|
||||||
syncmgr.OnHeader(&mockPeer{}, randomHeadersMessage(t, 200))
|
|
||||||
|
|
||||||
// On a validation error, we should request for another peer
|
|
||||||
// to send us these headers
|
|
||||||
assert.Equal(t, 1, helper.headersFetchRequest)
|
|
||||||
}
|
|
Loading…
Reference in a new issue