From cea983acc6b419e7acafcf363e6920e5e78a31bd Mon Sep 17 00:00:00 2001 From: Roman Khimov Date: Wed, 4 Sep 2019 15:30:04 +0300 Subject: [PATCH] _pkg.dev: drop peer/peermgr/syncmgr It was a nice attempt, but there is not a lot that we can reuse for the master branch. Refs. #307. --- _pkg.dev/peer/config.go | 31 -- _pkg.dev/peer/peer.go | 340 --------------------- _pkg.dev/peer/peer_test.go | 196 ------------ _pkg.dev/peer/peerhandshake.go | 132 -------- _pkg.dev/peer/readme.md | 67 ---- _pkg.dev/peer/responsehandlers.go | 111 ------- _pkg.dev/peer/stall/stall.go | 175 ----------- _pkg.dev/peer/stall/stall_test.go | 84 ----- _pkg.dev/peermgr/blockcache.go | 155 ---------- _pkg.dev/peermgr/blockcache_test.go | 80 ----- _pkg.dev/peermgr/peermgr.go | 227 -------------- _pkg.dev/peermgr/peermgr_test.go | 201 ------------ _pkg.dev/syncmgr/blockmode.go | 61 ---- _pkg.dev/syncmgr/blockpool.go | 57 ---- _pkg.dev/syncmgr/blockpool_test.go | 42 --- _pkg.dev/syncmgr/config.go | 44 --- _pkg.dev/syncmgr/headermode.go | 42 --- _pkg.dev/syncmgr/mockhelpers_test.go | 113 ------- _pkg.dev/syncmgr/normalmode.go | 60 ---- _pkg.dev/syncmgr/syncmgr.go | 152 --------- _pkg.dev/syncmgr/syncmgr_onblock_test.go | 97 ------ _pkg.dev/syncmgr/syncmgr_onheaders_test.go | 117 ------- 22 files changed, 2584 deletions(-) delete mode 100644 _pkg.dev/peer/config.go delete mode 100644 _pkg.dev/peer/peer.go delete mode 100644 _pkg.dev/peer/peer_test.go delete mode 100644 _pkg.dev/peer/peerhandshake.go delete mode 100644 _pkg.dev/peer/readme.md delete mode 100644 _pkg.dev/peer/responsehandlers.go delete mode 100644 _pkg.dev/peer/stall/stall.go delete mode 100644 _pkg.dev/peer/stall/stall_test.go delete mode 100644 _pkg.dev/peermgr/blockcache.go delete mode 100644 _pkg.dev/peermgr/blockcache_test.go delete mode 100644 _pkg.dev/peermgr/peermgr.go delete mode 100644 _pkg.dev/peermgr/peermgr_test.go delete mode 100644 _pkg.dev/syncmgr/blockmode.go delete mode 100644 _pkg.dev/syncmgr/blockpool.go delete mode 100644 _pkg.dev/syncmgr/blockpool_test.go delete mode 100644 _pkg.dev/syncmgr/config.go delete mode 100644 _pkg.dev/syncmgr/headermode.go delete mode 100644 _pkg.dev/syncmgr/mockhelpers_test.go delete mode 100644 _pkg.dev/syncmgr/normalmode.go delete mode 100644 _pkg.dev/syncmgr/syncmgr.go delete mode 100644 _pkg.dev/syncmgr/syncmgr_onblock_test.go delete mode 100644 _pkg.dev/syncmgr/syncmgr_onheaders_test.go diff --git a/_pkg.dev/peer/config.go b/_pkg.dev/peer/config.go deleted file mode 100644 index 47c688741..000000000 --- a/_pkg.dev/peer/config.go +++ /dev/null @@ -1,31 +0,0 @@ -package peer - -import ( - "github.com/CityOfZion/neo-go/pkg/wire/payload" - "github.com/CityOfZion/neo-go/pkg/wire/protocol" -) - -// LocalConfig specifies the properties that should be available for each remote peer -type LocalConfig struct { - Net protocol.Magic - UserAgent string - Services protocol.ServiceFlag - Nonce uint32 - ProtocolVer protocol.Version - Relay bool - Port uint16 - - // pointer to config will keep the startheight updated - StartHeight func() uint32 - - // Response Handlers - OnHeader func(*Peer, *payload.HeadersMessage) - OnGetHeaders func(*Peer, *payload.GetHeadersMessage) - OnAddr func(*Peer, *payload.AddrMessage) - OnGetAddr func(*Peer, *payload.GetAddrMessage) - OnInv func(*Peer, *payload.InvMessage) - OnGetData func(*Peer, *payload.GetDataMessage) - OnBlock func(*Peer, *payload.BlockMessage) - OnGetBlocks func(*Peer, *payload.GetBlocksMessage) - OnTx func(*Peer, *payload.TXMessage) -} diff --git a/_pkg.dev/peer/peer.go b/_pkg.dev/peer/peer.go deleted file mode 100644 index 8fe2f28d6..000000000 --- a/_pkg.dev/peer/peer.go +++ /dev/null @@ -1,340 +0,0 @@ -// This impl uses channels to simulate the queue handler with the actor model. -// A suitable number k ,should be set for channel size, because if #numOfMsg > k, -// we lose determinism. k chosen should be large enough that when filled, it shall indicate that -// the peer has stopped responding, since we do not have a pingMSG, we will need another way to shut down -// peers - -package peer - -import ( - "errors" - "fmt" - "net" - "sync" - "sync/atomic" - "time" - - "github.com/CityOfZion/neo-go/pkg/wire/command" - - "github.com/CityOfZion/neo-go/pkg/peer/stall" - "github.com/CityOfZion/neo-go/pkg/wire" - "github.com/CityOfZion/neo-go/pkg/wire/payload" - "github.com/CityOfZion/neo-go/pkg/wire/protocol" - "github.com/CityOfZion/neo-go/pkg/wire/util" -) - -const ( - maxOutboundConnections = 100 - protocolVer = protocol.DefaultVersion - handshakeTimeout = 30 * time.Second - idleTimeout = 5 * time.Minute // If no message received after idleTimeout, then peer disconnects - - // nodes will have `responseTime` seconds to reply with a response - responseTime = 120 * time.Second - - // the stall detector will check every `tickerInterval` to see if messages - // are overdue. Should be less than `responseTime` - tickerInterval = 30 * time.Second - - // The input buffer size is the amount of mesages that - // can be buffered into the channel to receive at once before - // blocking, and before determinism is broken - inputBufferSize = 100 - - // The output buffer size is the amount of messages that - // can be buffered into the channel to send at once before - // blocking, and before determinism is broken. - outputBufferSize = 100 - - // pingInterval = 20 * time.Second //Not implemented in neo clients -) - -var ( - errHandShakeTimeout = errors.New("Handshake timed out, peers have " + string(handshakeTimeout) + " Seconds to Complete the handshake") -) - -// Peer represents a peer on the neo network -type Peer struct { - config LocalConfig - conn net.Conn - - startHeight uint32 - - // atomic vals - disconnected int32 - - //unchangeable state: concurrent safe - addr string - protoVer protocol.Version - port uint16 - inbound bool - userAgent string - services protocol.ServiceFlag - createdAt time.Time - relay bool - - statemutex sync.Mutex - verackReceived bool - versionKnown bool - - *stall.Detector - - inch chan func() // will handle all incoming connections from peer - outch chan func() // will handle all outcoming connections from peer - quitch chan struct{} -} - -// NewPeer returns a new NEO peer -func NewPeer(con net.Conn, inbound bool, cfg LocalConfig) *Peer { - return &Peer{ - inch: make(chan func(), inputBufferSize), - outch: make(chan func(), outputBufferSize), - quitch: make(chan struct{}, 1), - inbound: inbound, - config: cfg, - conn: con, - createdAt: time.Now(), - startHeight: 0, - addr: con.RemoteAddr().String(), - Detector: stall.NewDetector(responseTime, tickerInterval), - } -} - -// Write to a peer -func (p *Peer) Write(msg wire.Messager) error { - return wire.WriteMessage(p.conn, p.config.Net, msg) -} - -// Read to a peer -func (p *Peer) Read() (wire.Messager, error) { - return wire.ReadMessage(p.conn, p.config.Net) -} - -// Disconnect disconnects a peer and closes the connection -func (p *Peer) Disconnect() { - - // return if already disconnected - if atomic.LoadInt32(&p.disconnected) != 0 { - return - } - - atomic.AddInt32(&p.disconnected, 1) - - p.Detector.Quit() - close(p.quitch) - p.conn.Close() - - fmt.Println("Disconnected Peer with address", p.RemoteAddr().String()) -} - -// Port returns the peers port -func (p *Peer) Port() uint16 { - return p.port -} - -// CreatedAt returns the time at which the connection was made -func (p *Peer) CreatedAt() time.Time { - return p.createdAt -} - -// Height returns the latest recorded height of this peer -func (p *Peer) Height() uint32 { - return p.startHeight -} - -// CanRelay returns true, if the peer can relay information -func (p *Peer) CanRelay() bool { - return p.relay -} - -// LocalAddr returns this node's local address -func (p *Peer) LocalAddr() net.Addr { - return p.conn.LocalAddr() -} - -// RemoteAddr returns the remote address of the connected peer -func (p *Peer) RemoteAddr() net.Addr { - return p.conn.RemoteAddr() -} - -// Services returns the services offered by the peer -func (p *Peer) Services() protocol.ServiceFlag { - return p.config.Services -} - -//Inbound returns true whether this peer is an inbound peer -func (p *Peer) Inbound() bool { - return p.inbound -} - -// IsVerackReceived returns true, if this node has -// received a verack from this peer -func (p *Peer) IsVerackReceived() bool { - return p.verackReceived -} - -//NotifyDisconnect returns once the peer has disconnected -// Blocking -func (p *Peer) NotifyDisconnect() { - <-p.quitch - fmt.Println("Peer has just disconnected") -} - -//End of Exposed API functions// - -// PingLoop not impl. in neo yet, adding it now -// will cause this client to disconnect from all other implementations -func (p *Peer) PingLoop() { /*not implemented in other neo clients*/ } - -// Run is used to start communicating with the peer -// completes the handshake and starts observing -// for messages coming in -func (p *Peer) Run() error { - - err := p.Handshake() - if err != nil { - return err - } - go p.StartProtocol() - go p.ReadLoop() - go p.WriteLoop() - - //go p.PingLoop() // since it is not implemented. It will disconnect all other impls. - return nil -} - -// StartProtocol run as a go-routine, will act as our queue for messages -// should be ran after handshake -func (p *Peer) StartProtocol() { -loop: - for atomic.LoadInt32(&p.disconnected) == 0 { - select { - case f := <-p.inch: - f() - case <-p.quitch: - break loop - case <-p.Detector.Quitch: - fmt.Println("Peer stalled, disconnecting") - break loop - } - } - p.Disconnect() -} - -// ReadLoop Will block on the read until a message is read -// Should only be called after handshake is complete -// on a seperate go-routine. -func (p *Peer) ReadLoop() { - - idleTimer := time.AfterFunc(idleTimeout, func() { - fmt.Println("Timing out peer") - p.Disconnect() - }) - -loop: - for atomic.LoadInt32(&p.disconnected) == 0 { - - idleTimer.Reset(idleTimeout) // reset timer on each loop - - readmsg, err := p.Read() - - // Message read; stop Timer - idleTimer.Stop() - - if err != nil { - fmt.Println("Err on read", err) // This will also happen if Peer is disconnected - break loop - } - - // Remove message as pending from the stall detector - p.Detector.RemoveMessage(readmsg.Command()) - - switch msg := readmsg.(type) { - - case *payload.VersionMessage: - fmt.Println("Already received a Version, disconnecting. " + p.RemoteAddr().String()) - break loop // We have already done the handshake, break loop and disconnect - case *payload.VerackMessage: - if p.verackReceived { - fmt.Println("Already received a Verack, disconnecting. " + p.RemoteAddr().String()) - break loop - } - p.statemutex.Lock() // This should not happen, however if it does, then we should set it. - p.verackReceived = true - p.statemutex.Unlock() - case *payload.AddrMessage: - p.OnAddr(msg) - case *payload.GetAddrMessage: - p.OnGetAddr(msg) - case *payload.GetBlocksMessage: - p.OnGetBlocks(msg) - case *payload.BlockMessage: - p.OnBlocks(msg) - case *payload.HeadersMessage: - p.OnHeaders(msg) - case *payload.GetHeadersMessage: - p.OnGetHeaders(msg) - case *payload.InvMessage: - p.OnInv(msg) - case *payload.GetDataMessage: - p.OnGetData(msg) - case *payload.TXMessage: - p.OnTX(msg) - default: - fmt.Println("Cannot recognise message", msg.Command()) //Do not disconnect peer, just Log Message - } - } - - idleTimer.Stop() - p.Disconnect() -} - -// WriteLoop will Queue all messages to be written to the peer. -func (p *Peer) WriteLoop() { - for atomic.LoadInt32(&p.disconnected) == 0 { - select { - case f := <-p.outch: - f() - case <-p.Detector.Quitch: // if the detector quits, disconnect peer - p.Disconnect() - } - } -} - -// Outgoing Requests - -// RequestHeaders will write a getheaders to this peer -func (p *Peer) RequestHeaders(hash util.Uint256) error { - c := make(chan error, 0) - p.outch <- func() { - getHeaders, err := payload.NewGetHeadersMessage([]util.Uint256{hash}, util.Uint256{}) - err = p.Write(getHeaders) - if err != nil { - p.Detector.AddMessage(command.GetHeaders) - } - c <- err - } - return <-c -} - -// RequestBlocks will ask this peer for a set of blocks -func (p *Peer) RequestBlocks(hashes []util.Uint256) error { - c := make(chan error, 0) - - p.outch <- func() { - getdata, err := payload.NewGetDataMessage(payload.InvTypeBlock) - err = getdata.AddHashes(hashes) - if err != nil { - c <- err - return - } - - err = p.Write(getdata) - if err != nil { - p.Detector.AddMessage(command.GetData) - } - - c <- err - } - return <-c -} diff --git a/_pkg.dev/peer/peer_test.go b/_pkg.dev/peer/peer_test.go deleted file mode 100644 index fd27aeb55..000000000 --- a/_pkg.dev/peer/peer_test.go +++ /dev/null @@ -1,196 +0,0 @@ -package peer_test - -import ( - "net" - "testing" - "time" - - "github.com/CityOfZion/neo-go/pkg/peer" - "github.com/CityOfZion/neo-go/pkg/wire" - "github.com/CityOfZion/neo-go/pkg/wire/payload" - "github.com/CityOfZion/neo-go/pkg/wire/protocol" - "github.com/stretchr/testify/assert" -) - -func returnConfig() peer.LocalConfig { - - DefaultHeight := func() uint32 { - return 10 - } - - OnAddr := func(p *peer.Peer, msg *payload.AddrMessage) {} - OnHeader := func(p *peer.Peer, msg *payload.HeadersMessage) {} - OnGetHeaders := func(p *peer.Peer, msg *payload.GetHeadersMessage) {} - OnInv := func(p *peer.Peer, msg *payload.InvMessage) {} - OnGetData := func(p *peer.Peer, msg *payload.GetDataMessage) {} - OnBlock := func(p *peer.Peer, msg *payload.BlockMessage) {} - OnGetBlocks := func(p *peer.Peer, msg *payload.GetBlocksMessage) {} - - return peer.LocalConfig{ - Net: protocol.MainNet, - UserAgent: "NEO-GO-Default", - Services: protocol.NodePeerService, - Nonce: 1200, - ProtocolVer: 0, - Relay: false, - Port: 10332, - // pointer to config will keep the startheight updated for each version - //Message we plan to send - StartHeight: DefaultHeight, - OnHeader: OnHeader, - OnAddr: OnAddr, - OnGetHeaders: OnGetHeaders, - OnInv: OnInv, - OnGetData: OnGetData, - OnBlock: OnBlock, - OnGetBlocks: OnGetBlocks, - } -} - -func TestHandshake(t *testing.T) { - address := ":20338" - go func() { - - conn, err := net.DialTimeout("tcp", address, 2*time.Second) - if err != nil { - t.Fatal(err) - } - p := peer.NewPeer(conn, true, returnConfig()) - err = p.Run() - verack, err := payload.NewVerackMessage() - if err != nil { - t.Fail() - } - if err := p.Write(verack); err != nil { - t.Fatal(err) - } - - assert.Equal(t, true, p.IsVerackReceived()) - - }() - - listener, err := net.Listen("tcp", address) - if err != nil { - t.Fatal(err) - return - } - - defer func() { - listener.Close() - }() - - for { - - conn, err := listener.Accept() - if err != nil { - t.Fatal(err) - } - - tcpAddrMe := &net.TCPAddr{IP: net.ParseIP("82.2.97.142"), Port: 20338} - nonce := uint32(100) - messageVer, err := payload.NewVersionMessage(tcpAddrMe, 2595770, false, protocol.DefaultVersion, protocol.UserAgent, nonce, protocol.NodePeerService) - - if err != nil { - t.Fatal(err) - } - - if err := wire.WriteMessage(conn, protocol.MainNet, messageVer); err != nil { - t.Fatal(err) - return - } - - readmsg, err := wire.ReadMessage(conn, protocol.MainNet) - if err != nil { - t.Fatal(err) - } - version, ok := readmsg.(*payload.VersionMessage) - if !ok { - t.Fatal(err) - } - - assert.NotEqual(t, nil, version) - - messageVrck, err := payload.NewVerackMessage() - if err != nil { - t.Fatal(err) - } - - assert.NotEqual(t, nil, messageVrck) - - if err := wire.WriteMessage(conn, protocol.MainNet, messageVrck); err != nil { - t.Fatal(err) - } - - readmsg, err = wire.ReadMessage(conn, protocol.MainNet) - if err != nil { - t.Fatal(err) - } - - assert.NotEqual(t, nil, readmsg) - - verk, ok := readmsg.(*payload.VerackMessage) - if !ok { - t.Fatal(err) - } - assert.NotEqual(t, nil, verk) - - return - } - -} - -func TestConfigurations(t *testing.T) { - _, conn := net.Pipe() - - inbound := true - - config := returnConfig() - - p := peer.NewPeer(conn, inbound, config) - - // test inbound - assert.Equal(t, inbound, p.Inbound()) - - // handshake not done, should be false - assert.Equal(t, false, p.IsVerackReceived()) - - assert.Equal(t, config.Services, p.Services()) - - assert.Equal(t, config.Relay, p.CanRelay()) - - assert.WithinDuration(t, time.Now(), p.CreatedAt(), 1*time.Second) -} - -func TestPeerDisconnect(t *testing.T) { - // Make sure everything is shutdown - // Make sure timer is shutdown in stall detector too. Should maybe put this part of test into stall detector. - - _, conn := net.Pipe() - inbound := true - config := returnConfig() - p := peer.NewPeer(conn, inbound, config) - - p.Disconnect() - verack, err := payload.NewVerackMessage() - assert.Nil(t, err) - - err = p.Write(verack) - assert.NotNil(t, err) - - // Check if stall detector is still running - _, ok := <-p.Detector.Quitch - assert.Equal(t, ok, false) -} - -func TestNotifyDisconnect(t *testing.T) { - - _, conn := net.Pipe() - inbound := true - config := returnConfig() - p := peer.NewPeer(conn, inbound, config) - - p.Disconnect() - p.NotifyDisconnect() - // TestNotify uses default test timeout as the passing condition - // Failure condition can be seen when you comment out p.Disconnect() -} diff --git a/_pkg.dev/peer/peerhandshake.go b/_pkg.dev/peer/peerhandshake.go deleted file mode 100644 index 90e4ecc81..000000000 --- a/_pkg.dev/peer/peerhandshake.go +++ /dev/null @@ -1,132 +0,0 @@ -package peer - -import ( - "fmt" - "net" - "time" - - "github.com/CityOfZion/neo-go/pkg/wire" - "github.com/CityOfZion/neo-go/pkg/wire/payload" - iputils "github.com/CityOfZion/neo-go/pkg/wire/util/ip" -) - -// Handshake will initiate a handshake with this peer -func (p *Peer) Handshake() error { - - handshakeErr := make(chan error, 1) - go func() { - if p.inbound { - handshakeErr <- p.inboundHandShake() - } else { - handshakeErr <- p.outboundHandShake() - } - }() - - select { - case err := <-handshakeErr: - if err != nil { - return err - } - case <-time.After(handshakeTimeout): - return errHandShakeTimeout - } - - // This is purely here for Logs - if p.inbound { - fmt.Println("inbound handshake with", p.RemoteAddr().String(), "successful") - } else { - - fmt.Println("outbound handshake with", p.RemoteAddr().String(), "successful") - } - return nil -} - -// If this peer has an inbound conn (conn that is going into another peer) -// then he has dialed and so, we must read the version message -func (p *Peer) inboundHandShake() error { - var err error - if err := p.writeLocalVersionMSG(); err != nil { - return err - } - if err := p.readRemoteVersionMSG(); err != nil { - return err - } - verack, err := payload.NewVerackMessage() - if err != nil { - return err - } - err = p.Write(verack) - return p.readVerack() -} -func (p *Peer) outboundHandShake() error { - var err error - err = p.readRemoteVersionMSG() - if err != nil { - return err - } - - err = p.writeLocalVersionMSG() - if err != nil { - return err - } - - err = p.readVerack() - if err != nil { - return err - } - verack, err := payload.NewVerackMessage() - if err != nil { - return err - } - return p.Write(verack) -} -func (p *Peer) writeLocalVersionMSG() error { - - nonce := p.config.Nonce - relay := p.config.Relay - port := int(p.config.Port) - ua := p.config.UserAgent - sh := p.config.StartHeight() - services := p.config.Services - proto := p.config.ProtocolVer - ip := iputils.GetLocalIP() - tcpAddrMe := &net.TCPAddr{IP: ip, Port: port} - - messageVer, err := payload.NewVersionMessage(tcpAddrMe, sh, relay, proto, ua, nonce, services) - - if err != nil { - return err - } - return p.Write(messageVer) -} - -func (p *Peer) readRemoteVersionMSG() error { - readmsg, err := wire.ReadMessage(p.conn, p.config.Net) - if err != nil { - return err - } - - version, ok := readmsg.(*payload.VersionMessage) - if !ok { - return err - } - return p.OnVersion(version) -} - -func (p *Peer) readVerack() error { - readmsg, err := wire.ReadMessage(p.conn, p.config.Net) - - if err != nil { - return err - } - - _, ok := readmsg.(*payload.VerackMessage) - - if !ok { - return err - } - // should only be accessed on one go-routine - p.verackReceived = true - - return nil -} diff --git a/_pkg.dev/peer/readme.md b/_pkg.dev/peer/readme.md deleted file mode 100644 index ea13a62a5..000000000 --- a/_pkg.dev/peer/readme.md +++ /dev/null @@ -1,67 +0,0 @@ -# Package - Peer - - - -## Responsibility - -Once a connection has been made. The connection will represent a established peer to the localNode. Since a connection and the `Wire` is a golang primitive, that we cannot do much with. The peer package will encapsulate both, while adding extra functionality. - - -## Features - -- The handshake protocol is automatically executed and handled by the peer package. If a Version/Verack is received twice, the peer will be disconnected. - -- IdleTimeouts: If a Message is not received from the peer within a set period of time, the peer will be disconnected. - -- StallTimeouts: For Example, If a GetHeaders, is sent to the Peer and a Headers Response is not received within a certain period of time, then the peer is disconnected. - -- Concurrency Model: The concurrency model used is similar to Actor model, with a few changes. Messages can be sent to a peer asynchronously or synchronously. An example of an synchornous message send is the `RequestHeaders` method, where the channel blocks until an error value is received. The `OnHeaders` message is however asynchronously called. Furthermore, all methods passed through the config, are wrapped inside of an additional `Peers` method, this is to lay the ground work to capturing statistics regarding a specific command. These are also used so that we can pass behaviour to be executed down the channel. - -- Configuration: Each Peer will have a config struct passed to it, with information about the Local Peer and functions that will encapsulate the behaviour of what the peer should do, given a request. This way, the peer is not dependent on any other package. - -## Usage - - conn, err := net.Dial("tcp", "seed2.neo.org:10333") - if err != nil { - fmt.Println("Error dialing connection", err.Error()) - return - } - - config := peer.LocalConfig{ - Net: protocol.MainNet, - UserAgent: "NEO-G", - Services: protocol.NodePeerService, - Nonce: 1200, - ProtocolVer: 0, - Relay: false, - Port: 10332, - StartHeight: LocalHeight, - OnHeader: OnHeader, - } - - p := peer.NewPeer(conn, false, config) - err = p.Run() - - hash, err := util.Uint256DecodeString(chainparams.GenesisHash) - // hash2, err := util.Uint256DecodeString("ff8fe95efc5d1cc3a22b17503aecaf289cef68f94b79ddad6f613569ca2342d8") - err = p.RequestHeaders(hash) - - func OnHeader(peer *peer.Peer, msg *payload.HeadersMessage) { - // This function is passed to peer - // and the peer will execute it on receiving a header - } - - func LocalHeight() uint32 { - // This will be a function from the object that handles the block heights - return 10 - } - - -### Notes - - -Should we follow the actor model for Peers? Each Peer will have a ID, which we can take as the PID or if -we launch a go-routine for each peer, then we can use that as an implicit PID. - -Peer information should be stored into a database, if no db exists, we should get it from an initial peers file. -We can use this to periodically store information about a peer. \ No newline at end of file diff --git a/_pkg.dev/peer/responsehandlers.go b/_pkg.dev/peer/responsehandlers.go deleted file mode 100644 index 303ee0759..000000000 --- a/_pkg.dev/peer/responsehandlers.go +++ /dev/null @@ -1,111 +0,0 @@ -package peer - -import ( - "errors" - "time" - - "github.com/CityOfZion/neo-go/pkg/wire/payload" -) - -// OnGetData is called when a GetData message is received -func (p *Peer) OnGetData(msg *payload.GetDataMessage) { - p.inch <- func() { - if p.config.OnInv != nil { - p.config.OnGetData(p, msg) - } - } -} - -//OnTX is called when a TX message is received -func (p *Peer) OnTX(msg *payload.TXMessage) { - p.inch <- func() { - p.inch <- func() { - if p.config.OnTx != nil { - p.config.OnTx(p, msg) - } - } - } -} - -// OnInv is called when a Inv message is received -func (p *Peer) OnInv(msg *payload.InvMessage) { - p.inch <- func() { - if p.config.OnInv != nil { - p.config.OnInv(p, msg) - } - } -} - -// OnGetHeaders is called when a GetHeaders message is received -func (p *Peer) OnGetHeaders(msg *payload.GetHeadersMessage) { - p.inch <- func() { - if p.config.OnGetHeaders != nil { - p.config.OnGetHeaders(p, msg) - } - } -} - -// OnAddr is called when a Addr message is received -func (p *Peer) OnAddr(msg *payload.AddrMessage) { - p.inch <- func() { - if p.config.OnAddr != nil { - p.config.OnAddr(p, msg) - } - } -} - -// OnGetAddr is called when a GetAddr message is received -func (p *Peer) OnGetAddr(msg *payload.GetAddrMessage) { - p.inch <- func() { - if p.config.OnGetAddr != nil { - p.config.OnGetAddr(p, msg) - } - } -} - -// OnGetBlocks is called when a GetBlocks message is received -func (p *Peer) OnGetBlocks(msg *payload.GetBlocksMessage) { - p.inch <- func() { - if p.config.OnGetBlocks != nil { - p.config.OnGetBlocks(p, msg) - } - } -} - -// OnBlocks is called when a Blocks message is received -func (p *Peer) OnBlocks(msg *payload.BlockMessage) { - p.Detector.RemoveMessage(msg.Command()) - p.inch <- func() { - if p.config.OnBlock != nil { - p.config.OnBlock(p, msg) - } - } -} - -// OnHeaders is called when a Headers message is received -func (p *Peer) OnHeaders(msg *payload.HeadersMessage) { - p.Detector.RemoveMessage(msg.Command()) - p.inch <- func() { - if p.config.OnHeader != nil { - p.config.OnHeader(p, msg) - } - } -} - -// OnVersion Listener will be called -// during the handshake, any error checking should be done here for the versionMessage. -// This should only ever be called during the handshake. Any other place and the peer will disconnect. -func (p *Peer) OnVersion(msg *payload.VersionMessage) error { - if msg.Nonce == p.config.Nonce { - p.conn.Close() - return errors.New("self connection, disconnecting Peer") - } - p.versionKnown = true - p.port = msg.Port - p.services = msg.Services - p.userAgent = string(msg.UserAgent) - p.createdAt = time.Now() - p.relay = msg.Relay - p.startHeight = msg.StartHeight - return nil -} diff --git a/_pkg.dev/peer/stall/stall.go b/_pkg.dev/peer/stall/stall.go deleted file mode 100644 index e69bcb442..000000000 --- a/_pkg.dev/peer/stall/stall.go +++ /dev/null @@ -1,175 +0,0 @@ -package stall - -import ( - "fmt" - "sync" - "sync/atomic" - "time" - - "github.com/CityOfZion/neo-go/pkg/wire/command" -) - -// Detector (stall detector) will keep track of all pendingMessages -// If any message takes too long to reply -// the detector will disconnect the peer -type Detector struct { - responseTime time.Duration - tickInterval time.Duration - - lock *sync.RWMutex - responses map[command.Type]time.Time - - // The detector is embedded into a peer and the peer watches this quit chan - // If this chan is closed, the peer disconnects - Quitch chan struct{} - - // atomic vals - disconnected int32 -} - -// NewDetector will create a new stall detector -// rT is the responseTime and signals how long -// a peer has to reply back to a sent message -// tickerInterval is how often the detector wil check for stalled messages -func NewDetector(rTime time.Duration, tickerInterval time.Duration) *Detector { - d := &Detector{ - responseTime: rTime, - tickInterval: tickerInterval, - lock: new(sync.RWMutex), - responses: map[command.Type]time.Time{}, - Quitch: make(chan struct{}), - } - go d.loop() - return d -} - -func (d *Detector) loop() { - ticker := time.NewTicker(d.tickInterval) - - defer func() { - d.Quit() - d.DeleteAll() - ticker.Stop() - }() - - for { - select { - case <-ticker.C: - now := time.Now() - d.lock.RLock() - resp := d.responses - d.lock.RUnlock() - for _, deadline := range resp { - if now.After(deadline) { - fmt.Println(resp) - fmt.Println("Deadline passed") - return - } - } - } - } -} - -// Quit is a concurrent safe way to call the Quit channel -// Without blocking -func (d *Detector) Quit() { - // return if already disconnected - if atomic.LoadInt32(&d.disconnected) != 0 { - return - } - - atomic.AddInt32(&d.disconnected, 1) - close(d.Quitch) -} - -//AddMessage will add a message to the responses map -// Call this function when we send a message to a peer -// The command passed through is the command that we sent -// we will then set a timer for the expected message(s) -func (d *Detector) AddMessage(cmd command.Type) { - cmds := d.addMessage(cmd) - d.lock.Lock() - for _, cmd := range cmds { - d.responses[cmd] = time.Now().Add(d.responseTime) - } - d.lock.Unlock() -} - -// RemoveMessage remove messages from the responses map -// Call this function when we receive a message from -// peer. This will remove the pendingresponse message from the map. -// The command passed through is the command we received -func (d *Detector) RemoveMessage(cmd command.Type) { - cmds := d.removeMessage(cmd) - d.lock.Lock() - for _, cmd := range cmds { - delete(d.responses, cmd) - } - d.lock.Unlock() -} - -// DeleteAll empties the map of all contents and -// is called when the detector is being shut down -func (d *Detector) DeleteAll() { - d.lock.Lock() - d.responses = make(map[command.Type]time.Time) - d.lock.Unlock() -} - -// GetMessages Will return a map of all of the pendingResponses -// and their deadlines -func (d *Detector) GetMessages() map[command.Type]time.Time { - var resp map[command.Type]time.Time - d.lock.RLock() - resp = d.responses - d.lock.RUnlock() - return resp -} - -// when a message is added, we will add a deadline for -// expected response -func (d *Detector) addMessage(cmd command.Type) []command.Type { - var cmds []command.Type - - switch cmd { - case command.GetHeaders: - // We now will expect a Headers Message - cmds = append(cmds, command.Headers) - case command.GetAddr: - // We now will expect a Headers Message - cmds = append(cmds, command.Addr) - case command.GetData: - // We will now expect a block/tx message - cmds = append(cmds, command.Block) - cmds = append(cmds, command.TX) - case command.GetBlocks: - // we will now expect a inv message - cmds = append(cmds, command.Inv) - case command.Version: - // We will now expect a verack - cmds = append(cmds, command.Verack) - } - return cmds -} - -// if receive a message, we will delete it from pending -func (d *Detector) removeMessage(cmd command.Type) []command.Type { - var cmds []command.Type - - switch cmd { - case command.Block: - // We will now remove a block and tx message - cmds = append(cmds, command.Block) - cmds = append(cmds, command.TX) - case command.TX: - // We will now remove a block and tx message - cmds = append(cmds, command.Block) - cmds = append(cmds, command.TX) - case command.Verack: - // We will now expect a verack - cmds = append(cmds, cmd) - default: - cmds = append(cmds, cmd) - } - return cmds -} diff --git a/_pkg.dev/peer/stall/stall_test.go b/_pkg.dev/peer/stall/stall_test.go deleted file mode 100644 index 83de4e3a0..000000000 --- a/_pkg.dev/peer/stall/stall_test.go +++ /dev/null @@ -1,84 +0,0 @@ -package stall - -import ( - "sync" - "testing" - "time" - - "github.com/stretchr/testify/assert" - - "github.com/CityOfZion/neo-go/pkg/wire/command" -) - -func TestAddRemoveMessage(t *testing.T) { - - responseTime := 2 * time.Millisecond - tickerInterval := 1 * time.Millisecond - - d := NewDetector(responseTime, tickerInterval) - d.AddMessage(command.GetAddr) - mp := d.GetMessages() - - assert.Equal(t, 1, len(mp)) - assert.IsType(t, time.Time{}, mp[command.GetAddr]) - - d.RemoveMessage(command.Addr) - mp = d.GetMessages() - - assert.Equal(t, 0, len(mp)) - assert.Empty(t, mp[command.GetAddr]) -} - -type mockPeer struct { - lock *sync.RWMutex - online bool - detector *Detector -} - -func (mp *mockPeer) loop() { -loop: - for { - select { - case <-mp.detector.Quitch: - - break loop - } - } - // cleanup - mp.lock.Lock() - mp.online = false - mp.lock.Unlock() -} -func TestDeadlineWorks(t *testing.T) { - - responseTime := 2 * time.Millisecond - tickerInterval := 1 * time.Millisecond - - d := NewDetector(responseTime, tickerInterval) - mp := mockPeer{online: true, detector: d, lock: new(sync.RWMutex)} - go mp.loop() - - d.AddMessage(command.GetAddr) - time.Sleep(responseTime + 1*time.Millisecond) - - k := make(map[command.Type]time.Time) - d.lock.RLock() - assert.Equal(t, k, d.responses) - d.lock.RUnlock() - mp.lock.RLock() - assert.Equal(t, false, mp.online) - mp.lock.RUnlock() -} -func TestDeadlineShouldNotBeEmpty(t *testing.T) { - responseTime := 10 * time.Millisecond - tickerInterval := 1 * time.Millisecond - - d := NewDetector(responseTime, tickerInterval) - d.AddMessage(command.GetAddr) - time.Sleep(1 * time.Millisecond) - - k := make(map[command.Type]time.Time) - d.lock.RLock() - assert.NotEqual(t, k, d.responses) - d.lock.RUnlock() -} diff --git a/_pkg.dev/peermgr/blockcache.go b/_pkg.dev/peermgr/blockcache.go deleted file mode 100644 index 8e06b8251..000000000 --- a/_pkg.dev/peermgr/blockcache.go +++ /dev/null @@ -1,155 +0,0 @@ -package peermgr - -import ( - "errors" - "sort" - "sync" - - "github.com/CityOfZion/neo-go/pkg/wire/util" -) - -var ( - //ErrCacheLimit is returned when the cache limit is reached - ErrCacheLimit = errors.New("nomore items can be added to the cache") - - //ErrNoItems is returned when pickItem is called and there are no items in the cache - ErrNoItems = errors.New("there are no items in the cache") - - //ErrDuplicateItem is returned when you try to add the same item, more than once to the cache - ErrDuplicateItem = errors.New("this item is already in the cache") -) - -//BlockInfo holds the necessary information that the cache needs -// to sort and store block requests -type BlockInfo struct { - BlockHash util.Uint256 - BlockIndex uint32 -} - -// Equals returns true if two blockInfo objects -// have the same hash and the same index -func (bi *BlockInfo) Equals(other BlockInfo) bool { - return bi.BlockHash.Equals(other.BlockHash) && bi.BlockIndex == other.BlockIndex -} - -// indexSorter sorts the blockInfos by blockIndex. -type indexSorter []BlockInfo - -func (is indexSorter) Len() int { return len(is) } -func (is indexSorter) Swap(i, j int) { is[i], is[j] = is[j], is[i] } -func (is indexSorter) Less(i, j int) bool { return is[i].BlockIndex < is[j].BlockIndex } - -//blockCache will cache any pending block requests -// for the node when there are no available nodes -type blockCache struct { - cacheLimit int - cacheLock sync.Mutex - cache []BlockInfo -} - -func newBlockCache(cacheLimit int) *blockCache { - return &blockCache{ - cache: make([]BlockInfo, 0, cacheLimit), - cacheLimit: cacheLimit, - } -} - -func (bc *blockCache) addBlockInfo(bi BlockInfo) error { - if bc.cacheLen() == bc.cacheLimit { - return ErrCacheLimit - } - - bc.cacheLock.Lock() - defer bc.cacheLock.Unlock() - - // Check for duplicates. slice will always be small so a simple for loop will work - for _, bInfo := range bc.cache { - if bInfo.Equals(bi) { - return ErrDuplicateItem - } - } - bc.cache = append(bc.cache, bi) - - sort.Sort(indexSorter(bc.cache)) - - return nil -} - -func (bc *blockCache) addBlockInfos(bis []BlockInfo) error { - - if len(bis)+bc.cacheLen() > bc.cacheLimit { - return errors.New("too many items to add, this will exceed the cache limit") - } - - for _, bi := range bis { - err := bc.addBlockInfo(bi) - if err != nil { - return err - } - } - return nil -} - -func (bc *blockCache) cacheLen() int { - bc.cacheLock.Lock() - defer bc.cacheLock.Unlock() - return len(bc.cache) -} - -func (bc *blockCache) pickFirstItem() (BlockInfo, error) { - return bc.pickItem(0) -} - -func (bc *blockCache) pickAllItems() ([]BlockInfo, error) { - - numOfItems := bc.cacheLen() - - items := make([]BlockInfo, 0, numOfItems) - - for i := 0; i < numOfItems; i++ { - bi, err := bc.pickFirstItem() - if err != nil { - return nil, err - } - items = append(items, bi) - } - return items, nil -} - -func (bc *blockCache) pickItem(i uint) (BlockInfo, error) { - if bc.cacheLen() < 1 { - return BlockInfo{}, ErrNoItems - } - - if i >= uint(bc.cacheLen()) { - return BlockInfo{}, errors.New("index out of range") - } - - bc.cacheLock.Lock() - defer bc.cacheLock.Unlock() - - item := bc.cache[i] - bc.cache = append(bc.cache[:i], bc.cache[i+1:]...) - return item, nil -} - -func (bc *blockCache) removeHash(hashToRemove util.Uint256) error { - index, err := bc.findHash(hashToRemove) - if err != nil { - return err - } - - _, err = bc.pickItem(uint(index)) - return err -} - -func (bc *blockCache) findHash(hashToFind util.Uint256) (int, error) { - bc.cacheLock.Lock() - defer bc.cacheLock.Unlock() - for i, bInfo := range bc.cache { - if bInfo.BlockHash.Equals(hashToFind) { - return i, nil - } - } - return -1, errors.New("hash cannot be found in the cache") -} diff --git a/_pkg.dev/peermgr/blockcache_test.go b/_pkg.dev/peermgr/blockcache_test.go deleted file mode 100644 index 3cb928e9e..000000000 --- a/_pkg.dev/peermgr/blockcache_test.go +++ /dev/null @@ -1,80 +0,0 @@ -package peermgr - -import ( - "math/rand" - "testing" - - "github.com/CityOfZion/neo-go/pkg/wire/util" - "github.com/stretchr/testify/assert" -) - -func TestAddBlock(t *testing.T) { - - bc := &blockCache{ - cacheLimit: 20, - } - bi := randomBlockInfo(t) - - err := bc.addBlockInfo(bi) - assert.Equal(t, nil, err) - - assert.Equal(t, 1, bc.cacheLen()) - - err = bc.addBlockInfo(bi) - assert.Equal(t, ErrDuplicateItem, err) - - assert.Equal(t, 1, bc.cacheLen()) -} - -func TestCacheLimit(t *testing.T) { - - bc := &blockCache{ - cacheLimit: 20, - } - - for i := 0; i < bc.cacheLimit; i++ { - err := bc.addBlockInfo(randomBlockInfo(t)) - assert.Equal(t, nil, err) - } - - err := bc.addBlockInfo(randomBlockInfo(t)) - assert.Equal(t, ErrCacheLimit, err) - - assert.Equal(t, bc.cacheLimit, bc.cacheLen()) -} -func TestPickItem(t *testing.T) { - - bc := &blockCache{ - cacheLimit: 20, - } - - for i := 0; i < bc.cacheLimit; i++ { - err := bc.addBlockInfo(randomBlockInfo(t)) - assert.Equal(t, nil, err) - } - - for i := 0; i < bc.cacheLimit; i++ { - _, err := bc.pickFirstItem() - assert.Equal(t, nil, err) - } - - assert.Equal(t, 0, bc.cacheLen()) -} - -func randomUint256(t *testing.T) util.Uint256 { - rand32 := make([]byte, 32) - rand.Read(rand32) - - u, err := util.Uint256DecodeBytes(rand32) - assert.Equal(t, nil, err) - - return u -} - -func randomBlockInfo(t *testing.T) BlockInfo { - - return BlockInfo{ - randomUint256(t), - rand.Uint32(), - } -} diff --git a/_pkg.dev/peermgr/peermgr.go b/_pkg.dev/peermgr/peermgr.go deleted file mode 100644 index e83e37768..000000000 --- a/_pkg.dev/peermgr/peermgr.go +++ /dev/null @@ -1,227 +0,0 @@ -package peermgr - -import ( - "errors" - "fmt" - "sync" - - "github.com/CityOfZion/neo-go/pkg/wire/command" - - "github.com/CityOfZion/neo-go/pkg/wire/util" -) - -const ( - // blockCacheLimit is the maximum amount of pending requests that the cache can hold - pendingBlockCacheLimit = 20 - - //peerBlockCacheLimit is the maximum amount of inflight blocks that a peer can - // have, before they are flagged as busy - peerBlockCacheLimit = 1 -) - -var ( - //ErrNoAvailablePeers is returned when a request for data from a peer is invoked - // but there are no available peers to request data from - ErrNoAvailablePeers = errors.New("there are no available peers to interact with") - - // ErrUnknownPeer is returned when a peer that the peer manager does not know about - // sends a message to this node - ErrUnknownPeer = errors.New("this peer has not been registered with the peer manager") -) - -//mPeer represents a peer that is managed by the peer manager -type mPeer interface { - Disconnect() - RequestBlocks([]util.Uint256) error - RequestHeaders(util.Uint256) error - NotifyDisconnect() -} - -type peerstats struct { - // when a peer is sent a blockRequest - // the peermanager will track this using this blockCache - blockCache *blockCache - // all other requests will be tracked using the requests map - requests map[command.Type]bool -} - -//PeerMgr manages all peers that the node is connected to -type PeerMgr struct { - pLock sync.RWMutex - peers map[mPeer]peerstats - - requestCache *blockCache -} - -//New returns a new peermgr object -func New() *PeerMgr { - return &PeerMgr{ - peers: make(map[mPeer]peerstats), - requestCache: newBlockCache(pendingBlockCacheLimit), - } -} - -// AddPeer adds a peer to the list of managed peers -func (pmgr *PeerMgr) AddPeer(peer mPeer) { - - pmgr.pLock.Lock() - defer pmgr.pLock.Unlock() - if _, exists := pmgr.peers[peer]; exists { - return - } - pmgr.peers[peer] = peerstats{ - requests: make(map[command.Type]bool), - blockCache: newBlockCache(peerBlockCacheLimit), - } - go pmgr.onDisconnect(peer) -} - -//MsgReceived notifies the peer manager that we have received a -// message from a peer -func (pmgr *PeerMgr) MsgReceived(peer mPeer, cmd command.Type) error { - pmgr.pLock.Lock() - defer pmgr.pLock.Unlock() - - // if peer was unknown then disconnect - val, ok := pmgr.peers[peer] - if !ok { - - go func() { - peer.NotifyDisconnect() - }() - - peer.Disconnect() - return ErrUnknownPeer - } - val.requests[cmd] = false - - return nil -} - -//BlockMsgReceived notifies the peer manager that we have received a -// block message from a peer -func (pmgr *PeerMgr) BlockMsgReceived(peer mPeer, bi BlockInfo) error { - - // if peer was unknown then disconnect - val, ok := pmgr.peers[peer] - if !ok { - - go func() { - peer.NotifyDisconnect() - }() - - peer.Disconnect() - return ErrUnknownPeer - } - - // // remove item from the peersBlock cache - err := val.blockCache.removeHash(bi.BlockHash) - if err != nil { - return err - } - - // check if cache empty, if so then return - if pmgr.requestCache.cacheLen() == 0 { - return nil - } - - // Try to clean an item from the pendingBlockCache, a peer has just finished serving a block request - cachedBInfo, err := pmgr.requestCache.pickFirstItem() - if err != nil { - return err - } - - return pmgr.blockCallPeer(cachedBInfo, func(p mPeer) error { - return p.RequestBlocks([]util.Uint256{cachedBInfo.BlockHash}) - }) -} - -// Len returns the amount of peers that the peer manager -//currently knows about -func (pmgr *PeerMgr) Len() int { - pmgr.pLock.Lock() - defer pmgr.pLock.Unlock() - return len(pmgr.peers) -} - -// RequestBlock will request a block from the most -// available peer. Then update it's stats, so we know that -// this peer is busy -func (pmgr *PeerMgr) RequestBlock(bi BlockInfo) error { - pmgr.pLock.Lock() - defer pmgr.pLock.Unlock() - - err := pmgr.blockCallPeer(bi, func(p mPeer) error { - return p.RequestBlocks([]util.Uint256{bi.BlockHash}) - }) - - if err == ErrNoAvailablePeers { - return pmgr.requestCache.addBlockInfo(bi) - } - - return err -} - -// RequestHeaders will request a headers from the most available peer. -func (pmgr *PeerMgr) RequestHeaders(hash util.Uint256) error { - pmgr.pLock.Lock() - defer pmgr.pLock.Unlock() - return pmgr.callPeerForCmd(command.Headers, func(p mPeer) error { - return p.RequestHeaders(hash) - }) -} - -func (pmgr *PeerMgr) callPeerForCmd(cmd command.Type, f func(p mPeer) error) error { - for peer, stats := range pmgr.peers { - if !stats.requests[cmd] { - stats.requests[cmd] = true - return f(peer) - } - } - return ErrNoAvailablePeers -} - -func (pmgr *PeerMgr) blockCallPeer(bi BlockInfo, f func(p mPeer) error) error { - for peer, stats := range pmgr.peers { - if stats.blockCache.cacheLen() < peerBlockCacheLimit { - err := stats.blockCache.addBlockInfo(bi) - if err != nil { - return err - } - return f(peer) - } - } - return ErrNoAvailablePeers -} - -func (pmgr *PeerMgr) onDisconnect(p mPeer) { - - // Blocking until peer is disconnected - p.NotifyDisconnect() - - pmgr.pLock.Lock() - defer func() { - delete(pmgr.peers, p) - pmgr.pLock.Unlock() - }() - - // Add all of peers outstanding block requests into - // the peer managers pendingBlockRequestCache - - val, ok := pmgr.peers[p] - if !ok { - return - } - - pendingRequests, err := val.blockCache.pickAllItems() - if err != nil { - fmt.Println(err.Error()) - return - } - - err = pmgr.requestCache.addBlockInfos(pendingRequests) - if err != nil { - fmt.Println(err.Error()) - return - } -} diff --git a/_pkg.dev/peermgr/peermgr_test.go b/_pkg.dev/peermgr/peermgr_test.go deleted file mode 100644 index 9a725af43..000000000 --- a/_pkg.dev/peermgr/peermgr_test.go +++ /dev/null @@ -1,201 +0,0 @@ -package peermgr - -import ( - "testing" - - "github.com/CityOfZion/neo-go/pkg/wire/command" - "github.com/CityOfZion/neo-go/pkg/wire/util" - "github.com/stretchr/testify/assert" -) - -type peer struct { - quit chan bool - nonce int - disconnected bool - blockRequested int - headersRequested int -} - -func (p *peer) Disconnect() { - p.disconnected = true - p.quit <- true -} -func (p *peer) RequestBlocks([]util.Uint256) error { - p.blockRequested++ - return nil -} -func (p *peer) RequestHeaders(util.Uint256) error { - p.headersRequested++ - return nil -} -func (p *peer) NotifyDisconnect() { - <-p.quit -} - -func TestAddPeer(t *testing.T) { - pmgr := New() - - peerA := &peer{nonce: 1} - peerB := &peer{nonce: 2} - peerC := &peer{nonce: 3} - - pmgr.AddPeer(peerA) - pmgr.AddPeer(peerB) - pmgr.AddPeer(peerC) - pmgr.AddPeer(peerC) - - assert.Equal(t, 3, pmgr.Len()) -} - -func TestRequestBlocks(t *testing.T) { - pmgr := New() - - peerA := &peer{nonce: 1} - peerB := &peer{nonce: 2} - peerC := &peer{nonce: 3} - - pmgr.AddPeer(peerA) - pmgr.AddPeer(peerB) - pmgr.AddPeer(peerC) - - firstBlock := randomBlockInfo(t) - err := pmgr.RequestBlock(firstBlock) - assert.Nil(t, err) - - secondBlock := randomBlockInfo(t) - err = pmgr.RequestBlock(secondBlock) - assert.Nil(t, err) - - thirdBlock := randomBlockInfo(t) - err = pmgr.RequestBlock(thirdBlock) - assert.Nil(t, err) - - // Since the peer manager did not get a MsgReceived - // in between the block requests - // a request should be sent to all peers - // This is only true, if peerBlockCacheLimit == 1 - - assert.Equal(t, 1, peerA.blockRequested) - assert.Equal(t, 1, peerB.blockRequested) - assert.Equal(t, 1, peerC.blockRequested) - - // Since the peer manager still has not received a MsgReceived - // another call to request blocks, will add the request to the cache - // and return a nil err - - fourthBlock := randomBlockInfo(t) - err = pmgr.RequestBlock(fourthBlock) - assert.Equal(t, nil, err) - assert.Equal(t, 1, pmgr.requestCache.cacheLen()) - - // If we tell the peer manager that we have received a block - // it will check the cache for any pending requests and send a block request if there are any. - // The request will go to the peer who sent back the block corresponding to the first hash - // since the other two peers are still busy with their block requests - - peer := findPeerwithHash(t, pmgr, firstBlock.BlockHash) - err = pmgr.BlockMsgReceived(peer, firstBlock) - assert.Nil(t, err) - - totalRequests := peerA.blockRequested + peerB.blockRequested + peerC.blockRequested - assert.Equal(t, 4, totalRequests) - - // // cache should be empty now - assert.Equal(t, 0, pmgr.requestCache.cacheLen()) -} - -// The peer manager does not tell you what peer was sent a particular block request -// For testing purposes, the following function will find that peer -func findPeerwithHash(t *testing.T, pmgr *PeerMgr, blockHash util.Uint256) mPeer { - for peer, stats := range pmgr.peers { - _, err := stats.blockCache.findHash(blockHash) - if err == nil { - return peer - } - } - assert.Fail(t, "cannot find a peer with that hash") - return nil -} - -func TestRequestHeaders(t *testing.T) { - pmgr := New() - - peerA := &peer{nonce: 1} - peerB := &peer{nonce: 2} - peerC := &peer{nonce: 3} - - pmgr.AddPeer(peerA) - pmgr.AddPeer(peerB) - pmgr.AddPeer(peerC) - - err := pmgr.RequestHeaders(util.Uint256{}) - assert.Nil(t, err) - - err = pmgr.RequestHeaders(util.Uint256{}) - assert.Nil(t, err) - - err = pmgr.RequestHeaders(util.Uint256{}) - assert.Nil(t, err) - - // Since the peer manager did not get a MsgReceived - // in between the header requests - // a request should be sent to all peers - - assert.Equal(t, 1, peerA.headersRequested) - assert.Equal(t, 1, peerB.headersRequested) - assert.Equal(t, 1, peerC.headersRequested) - - // Since the peer manager still has not received a MsgReceived - // another call to request header, will return a NoAvailablePeerError - - err = pmgr.RequestHeaders(util.Uint256{}) - assert.Equal(t, ErrNoAvailablePeers, err) - - // If we tell the peer manager that peerA has given us a block - // then send another BlockRequest. It will go to peerA - // since the other two peers are still busy with their - // block requests - - err = pmgr.MsgReceived(peerA, command.Headers) - assert.Nil(t, err) - err = pmgr.RequestHeaders(util.Uint256{}) - assert.Nil(t, err) - - assert.Equal(t, 2, peerA.headersRequested) - assert.Equal(t, 1, peerB.headersRequested) - assert.Equal(t, 1, peerC.headersRequested) -} - -func TestUnknownPeer(t *testing.T) { - pmgr := New() - - unknownPeer := &peer{ - disconnected: false, - quit: make(chan bool), - } - - err := pmgr.MsgReceived(unknownPeer, command.Headers) - assert.Equal(t, true, unknownPeer.disconnected) - assert.Equal(t, ErrUnknownPeer, err) -} - -func TestNotifyDisconnect(t *testing.T) { - pmgr := New() - - peerA := &peer{ - nonce: 1, - quit: make(chan bool), - } - - pmgr.AddPeer(peerA) - - if pmgr.Len() != 1 { - t.Fail() - } - - peerA.Disconnect() - - if pmgr.Len() != 0 { - t.Fail() - } -} diff --git a/_pkg.dev/syncmgr/blockmode.go b/_pkg.dev/syncmgr/blockmode.go deleted file mode 100644 index f357ca642..000000000 --- a/_pkg.dev/syncmgr/blockmode.go +++ /dev/null @@ -1,61 +0,0 @@ -package syncmgr - -import ( - "github.com/CityOfZion/neo-go/pkg/chain" - "github.com/CityOfZion/neo-go/pkg/wire/payload" -) - -// blockModeOnBlock is called when the sync manager is block mode -// and receives a block. -func (s *Syncmgr) blockModeOnBlock(peer SyncPeer, block payload.Block) error { - - // Check if it is a future block - // XXX: since we are storing blocks in memory, we do not want to store blocks - // from the tip - if block.Index > s.nextBlockIndex+2000 { - return nil - } - if block.Index > s.nextBlockIndex { - s.addToBlockPool(block) - return nil - } - - // Process Block - err := s.processBlock(block) - if err != nil && err != chain.ErrBlockAlreadyExists { - return s.cfg.FetchBlockAgain(block.Hash) - } - - // Check the block pool - err = s.checkPool() - if err != nil { - return err - } - - // Check if blockhashReceived == the header hash from last get headers this node performed - // if not then increment and request next block - if s.headerHash != block.Hash { - nextHash, err := s.cfg.GetNextBlockHash() - if err != nil { - return err - } - return s.cfg.RequestBlock(nextHash, block.Index) - } - - // If we are caught up then go into normal mode - diff := peer.Height() - block.Index - if diff <= cruiseHeight { - s.syncmode = normalMode - s.timer.Reset(blockTimer) - return nil - } - - // If not then we go back into headersMode and request more headers. - s.syncmode = headersMode - return s.cfg.RequestHeaders(block.Hash) -} - -func (s *Syncmgr) blockModeOnHeaders(peer SyncPeer, hdrs []*payload.BlockBase) error { - // We ignore headers when in this mode - return nil -} diff --git a/_pkg.dev/syncmgr/blockpool.go b/_pkg.dev/syncmgr/blockpool.go deleted file mode 100644 index 2b1f37761..000000000 --- a/_pkg.dev/syncmgr/blockpool.go +++ /dev/null @@ -1,57 +0,0 @@ -package syncmgr - -import ( - "sort" - - "github.com/CityOfZion/neo-go/pkg/wire/payload" -) - -func (s *Syncmgr) addToBlockPool(newBlock payload.Block) { - s.poolLock.Lock() - defer s.poolLock.Unlock() - - for _, block := range s.blockPool { - if block.Index == newBlock.Index { - return - } - } - - s.blockPool = append(s.blockPool, newBlock) - - // sort slice using block index - sort.Slice(s.blockPool, func(i, j int) bool { - return s.blockPool[i].Index < s.blockPool[j].Index - }) - -} - -func (s *Syncmgr) checkPool() error { - // Assuming that the blocks are sorted in order - - var indexesToRemove = -1 - - s.poolLock.Lock() - defer func() { - // removes all elements before this index, including the element at this index - s.blockPool = s.blockPool[indexesToRemove+1:] - s.poolLock.Unlock() - }() - - // loop iterates through the cache, processing any - // blocks that can be added to the chain - for i, block := range s.blockPool { - if s.nextBlockIndex != block.Index { - break - } - - // Save this block and save the indice location so we can remove it, when we defer - err := s.processBlock(block) - if err != nil { - return err - } - - indexesToRemove = i - } - - return nil -} diff --git a/_pkg.dev/syncmgr/blockpool_test.go b/_pkg.dev/syncmgr/blockpool_test.go deleted file mode 100644 index c236afc2c..000000000 --- a/_pkg.dev/syncmgr/blockpool_test.go +++ /dev/null @@ -1,42 +0,0 @@ -package syncmgr - -import ( - "testing" - - "github.com/stretchr/testify/assert" -) - -func TestAddBlockPoolFlush(t *testing.T) { - syncmgr, _ := setupSyncMgr(blockMode, 10) - - blockMessage := randomBlockMessage(t, 11) - - peer := &mockPeer{ - height: 100, - } - - // Since the block has Index 11 and the sync manager needs the block with index 10 - // This block will be added to the blockPool - err := syncmgr.OnBlock(peer, blockMessage) - assert.Nil(t, err) - assert.Equal(t, 1, len(syncmgr.blockPool)) - - // The sync manager is still looking for the block at height 10 - // Since this block is at height 12, it will be added to the block pool - blockMessage = randomBlockMessage(t, 12) - err = syncmgr.OnBlock(peer, blockMessage) - assert.Nil(t, err) - assert.Equal(t, 2, len(syncmgr.blockPool)) - - // This is the block that the sync manager was waiting for - // It should process this block, the check the pool for the next set of blocks - blockMessage = randomBlockMessage(t, 10) - err = syncmgr.OnBlock(peer, blockMessage) - assert.Nil(t, err) - assert.Equal(t, 0, len(syncmgr.blockPool)) - - // Since we processed 3 blocks and the sync manager started - //looking for block with index 10. The syncmananger should be looking for - // the block with index 13 - assert.Equal(t, uint32(13), syncmgr.nextBlockIndex) -} diff --git a/_pkg.dev/syncmgr/config.go b/_pkg.dev/syncmgr/config.go deleted file mode 100644 index 43387d414..000000000 --- a/_pkg.dev/syncmgr/config.go +++ /dev/null @@ -1,44 +0,0 @@ -package syncmgr - -import ( - "github.com/CityOfZion/neo-go/pkg/wire/payload" - "github.com/CityOfZion/neo-go/pkg/wire/util" -) - -// Config is the configuration file for the sync manager -type Config struct { - - // Chain functions - ProcessBlock func(block payload.Block) error - ProcessHeaders func(hdrs []*payload.BlockBase) error - - // RequestHeaders will send a getHeaders request - // with the hash passed in as a parameter - RequestHeaders func(hash util.Uint256) error - - //RequestBlock will send a getdata request for the block - // with the hash passed as a parameter - RequestBlock func(hash util.Uint256, index uint32) error - - // GetNextBlockHash returns the block hash of the header infront of thr block - // at the tip of this nodes chain. This assumes that the node is not in sync - GetNextBlockHash func() (util.Uint256, error) - - // AskForNewBlocks will send out a message to the network - // asking for new blocks - AskForNewBlocks func() - - // FetchHeadersAgain is called when a peer has provided headers that have not - // validated properly. We pass in the hash of the first header - FetchHeadersAgain func(util.Uint256) error - - // FetchHeadersAgain is called when a peer has provided a block that has not - // validated properly. We pass in the hash of the block - FetchBlockAgain func(util.Uint256) error -} - -// SyncPeer represents a peer on the network -// that this node can sync with -type SyncPeer interface { - Height() uint32 -} diff --git a/_pkg.dev/syncmgr/headermode.go b/_pkg.dev/syncmgr/headermode.go deleted file mode 100644 index 3a8e4d681..000000000 --- a/_pkg.dev/syncmgr/headermode.go +++ /dev/null @@ -1,42 +0,0 @@ -package syncmgr - -import ( - "github.com/CityOfZion/neo-go/pkg/chain" - "github.com/CityOfZion/neo-go/pkg/wire/payload" -) - -// headersModeOnHeaders is called when the sync manager is headers mode -// and receives a header. -func (s *Syncmgr) headersModeOnHeaders(peer SyncPeer, hdrs []*payload.BlockBase) error { - // If we are in Headers mode, then we just need to process the headers - // Note: For the un-optimised version, we move straight to blocksOnly mode - - firstHash := hdrs[0].Hash - firstHdrIndex := hdrs[0].Index - - err := s.cfg.ProcessHeaders(hdrs) - if err == nil { - // Update syncmgr last header - s.headerHash = hdrs[len(hdrs)-1].Hash - - s.syncmode = blockMode - return s.cfg.RequestBlock(firstHash, firstHdrIndex) - } - - // Check whether it is a validation error, or a database error - if _, ok := err.(*chain.ValidationError); ok { - // If we get a validation error we re-request the headers - // the method will automatically fetch from a different peer - // XXX: Add increment banScore for this peer - return s.cfg.FetchHeadersAgain(firstHash) - } - // This means it is a database error. We have no way to recover from this. - panic(err.Error()) -} - -// headersModeOnBlock is called when the sync manager is headers mode -// and receives a block. -func (s *Syncmgr) headersModeOnBlock(peer SyncPeer, block payload.Block) error { - // While in headers mode, ignore any blocks received - return nil -} diff --git a/_pkg.dev/syncmgr/mockhelpers_test.go b/_pkg.dev/syncmgr/mockhelpers_test.go deleted file mode 100644 index d95e95e6a..000000000 --- a/_pkg.dev/syncmgr/mockhelpers_test.go +++ /dev/null @@ -1,113 +0,0 @@ -package syncmgr - -import ( - "crypto/rand" - "testing" - - "github.com/stretchr/testify/assert" - - "github.com/CityOfZion/neo-go/pkg/wire/payload" - "github.com/CityOfZion/neo-go/pkg/wire/util" -) - -type syncTestHelper struct { - blocksProcessed int - headersProcessed int - newBlockRequest int - headersFetchRequest int - blockFetchRequest int - err error -} - -func (s *syncTestHelper) ProcessBlock(msg payload.Block) error { - s.blocksProcessed++ - return s.err -} -func (s *syncTestHelper) ProcessHeaders(hdrs []*payload.BlockBase) error { - s.headersProcessed = s.headersProcessed + len(hdrs) - return s.err -} - -func (s *syncTestHelper) GetNextBlockHash() (util.Uint256, error) { - return util.Uint256{}, s.err -} - -func (s *syncTestHelper) AskForNewBlocks() { - s.newBlockRequest++ -} - -func (s *syncTestHelper) FetchHeadersAgain(util.Uint256) error { - s.headersFetchRequest++ - return s.err -} - -func (s *syncTestHelper) FetchBlockAgain(util.Uint256) error { - s.blockFetchRequest++ - return s.err -} - -func (s *syncTestHelper) RequestBlock(util.Uint256, uint32) error { - s.blockFetchRequest++ - return s.err -} - -func (s *syncTestHelper) RequestHeaders(util.Uint256) error { - s.headersFetchRequest++ - return s.err -} - -type mockPeer struct { - height uint32 -} - -func (p *mockPeer) Height() uint32 { return p.height } - -func randomHeadersMessage(t *testing.T, num int) *payload.HeadersMessage { - var hdrs []*payload.BlockBase - - for i := 0; i < num; i++ { - hash := randomUint256(t) - hdr := &payload.BlockBase{Hash: hash} - hdrs = append(hdrs, hdr) - } - - hdrsMsg, err := payload.NewHeadersMessage() - assert.Nil(t, err) - - hdrsMsg.Headers = hdrs - - return hdrsMsg -} - -func randomUint256(t *testing.T) util.Uint256 { - hash := make([]byte, 32) - _, err := rand.Read(hash) - assert.Nil(t, err) - - u, err := util.Uint256DecodeBytes(hash) - assert.Nil(t, err) - return u -} - -func setupSyncMgr(mode mode, nextBlockIndex uint32) (*Syncmgr, *syncTestHelper) { - helper := &syncTestHelper{} - - cfg := &Config{ - ProcessBlock: helper.ProcessBlock, - ProcessHeaders: helper.ProcessHeaders, - - GetNextBlockHash: helper.GetNextBlockHash, - AskForNewBlocks: helper.AskForNewBlocks, - - FetchHeadersAgain: helper.FetchHeadersAgain, - FetchBlockAgain: helper.FetchBlockAgain, - - RequestBlock: helper.RequestBlock, - RequestHeaders: helper.RequestHeaders, - } - - syncmgr := New(cfg, nextBlockIndex) - syncmgr.syncmode = mode - - return syncmgr, helper -} diff --git a/_pkg.dev/syncmgr/normalmode.go b/_pkg.dev/syncmgr/normalmode.go deleted file mode 100644 index ad22e52f2..000000000 --- a/_pkg.dev/syncmgr/normalmode.go +++ /dev/null @@ -1,60 +0,0 @@ -package syncmgr - -import ( - "github.com/CityOfZion/neo-go/pkg/wire/payload" -) - -func (s *Syncmgr) normalModeOnHeaders(peer SyncPeer, hdrs []*payload.BlockBase) error { - // If in normal mode, first process the headers - err := s.cfg.ProcessHeaders(hdrs) - if err != nil { - // If something went wrong with processing the headers - // Ask another peer for the headers. - //XXX: Increment banscore for this peer - return s.cfg.FetchHeadersAgain(hdrs[0].Hash) - } - - lenHeaders := len(hdrs) - firstHash := hdrs[0].Hash - firstHdrIndex := hdrs[0].Index - lastHash := hdrs[lenHeaders-1].Hash - - // Update syncmgr latest header - s.headerHash = lastHash - - // If there are 2k headers, then ask for more headers and switch back to headers mode. - if lenHeaders == 2000 { - s.syncmode = headersMode - return s.cfg.RequestHeaders(lastHash) - } - - // Ask for the corresponding block iff there is < 2k headers - // then switch to blocksMode - // Bounds state that len > 1 && len!= 2000 & maxHeadersInMessage == 2000 - // This means that we have less than 2k headers - s.syncmode = blockMode - return s.cfg.RequestBlock(firstHash, firstHdrIndex) -} - -// normalModeOnBlock is called when the sync manager is normal mode -// and receives a block. -func (s *Syncmgr) normalModeOnBlock(peer SyncPeer, block payload.Block) error { - // stop the timer that periodically asks for blocks - s.timer.Stop() - - // process block - err := s.processBlock(block) - if err != nil { - s.timer.Reset(blockTimer) - return s.cfg.FetchBlockAgain(block.Hash) - } - - diff := peer.Height() - block.Index - if diff > trailingHeight { - s.syncmode = headersMode - return s.cfg.RequestHeaders(block.Hash) - } - - s.timer.Reset(blockTimer) - return nil -} diff --git a/_pkg.dev/syncmgr/syncmgr.go b/_pkg.dev/syncmgr/syncmgr.go deleted file mode 100644 index dc639a8d3..000000000 --- a/_pkg.dev/syncmgr/syncmgr.go +++ /dev/null @@ -1,152 +0,0 @@ -package syncmgr - -import ( - "fmt" - "sync" - "time" - - "github.com/CityOfZion/neo-go/pkg/wire/payload" - "github.com/CityOfZion/neo-go/pkg/wire/util" -) - -type mode uint8 - -// Note: this is the unoptimised version without parallel sync -// The algorithm for the unoptimsied version is simple: -// Download 2000 headers, then download the blocks for those headers -// Once those blocks are downloaded, we repeat the process again -// Until we are nomore than one block behind the tip. -// Once this happens, we switch into normal mode. -//In normal mode, we have a timer on for X seconds and ask nodes for blocks and also to doublecheck -// if we are behind once the timer runs out. -// The timer restarts whenever we receive a block. -// The parameter X should be approximately the time it takes the network to reach consensus - -//blockTimer approximates to how long it takes to reach consensus and propagate -// a block in the network. Once a node has synchronised with the network, he will -// ask the network for a newblock every blockTimer -const blockTimer = 20 * time.Second - -// trailingHeight indicates how many blocks the node has to be behind by -// before he switches to headersMode. -const trailingHeight = 100 - -// indicates how many blocks the node has to be behind by -// before he switches to normalMode and fetches blocks every X seconds. -const cruiseHeight = 0 - -const ( - headersMode mode = 1 - blockMode mode = 2 - normalMode mode = 3 -) - -//Syncmgr keeps the node in sync with the rest of the network -type Syncmgr struct { - syncmode mode - cfg *Config - timer *time.Timer - - // headerHash is the hash of the last header in the last OnHeaders message that we received. - // When receiving blocks, we can use this to determine whether the node has downloaded - // all of the blocks for the last headers messages - headerHash util.Uint256 - - poolLock sync.Mutex - blockPool []payload.Block - nextBlockIndex uint32 -} - -// New creates a new sync manager -func New(cfg *Config, nextBlockIndex uint32) *Syncmgr { - - newBlockTimer := time.AfterFunc(blockTimer, func() { - cfg.AskForNewBlocks() - }) - newBlockTimer.Stop() - - return &Syncmgr{ - syncmode: headersMode, - cfg: cfg, - timer: newBlockTimer, - nextBlockIndex: nextBlockIndex, - } -} - -// OnHeader is called when the node receives a headers message -func (s *Syncmgr) OnHeader(peer SyncPeer, msg *payload.HeadersMessage) error { - - // XXX(Optimisation): First check if we actually need these headers - // Check the last header in msg and then check what our latest header that was saved is - // If our latest header is above the lastHeader, then we do not save it - // We could also have that our latest header is above only some of the headers. - // In this case, we should remove the headers that we already have - - if len(msg.Headers) == 0 { - // XXX: Increment banScore for this peer, for sending empty headers message - return nil - } - - var err error - - switch s.syncmode { - case headersMode: - err = s.headersModeOnHeaders(peer, msg.Headers) - case blockMode: - err = s.blockModeOnHeaders(peer, msg.Headers) - case normalMode: - err = s.normalModeOnHeaders(peer, msg.Headers) - default: - err = s.headersModeOnHeaders(peer, msg.Headers) - } - - // XXX(Kev):The only meaningful error here would be if the peer - // we re-requested blocks from failed. In the next iteration, this will be handled - // by the peer manager, who will only return an error, if we are connected to no peers. - // Upon re-alising this, the node will then send out GetAddresses to the network and - // syncing will be resumed, once we find peers to connect to. - - hdr := msg.Headers[len(msg.Headers)-1] - fmt.Printf("Finished processing headers. LastHash in set was: %s\n ", hdr.Hash.ReverseString()) - - return err -} - -// OnBlock is called when the node receives a block -func (s *Syncmgr) OnBlock(peer SyncPeer, msg *payload.BlockMessage) error { - fmt.Printf("Block received with height %d\n", msg.Block.Index) - - var err error - - switch s.syncmode { - case headersMode: - err = s.headersModeOnBlock(peer, msg.Block) - case blockMode: - err = s.blockModeOnBlock(peer, msg.Block) - case normalMode: - err = s.normalModeOnBlock(peer, msg.Block) - default: - err = s.headersModeOnBlock(peer, msg.Block) - } - - fmt.Printf("Processed Block with height %d\n", msg.Block.Index) - - return err -} - -//IsCurrent returns true if the node is currently -// synced up with the network -func (s *Syncmgr) IsCurrent() bool { - return s.syncmode == normalMode -} - -func (s *Syncmgr) processBlock(block payload.Block) error { - err := s.cfg.ProcessBlock(block) - if err != nil { - return err - } - - s.nextBlockIndex++ - - return nil -} diff --git a/_pkg.dev/syncmgr/syncmgr_onblock_test.go b/_pkg.dev/syncmgr/syncmgr_onblock_test.go deleted file mode 100644 index d08aa8a74..000000000 --- a/_pkg.dev/syncmgr/syncmgr_onblock_test.go +++ /dev/null @@ -1,97 +0,0 @@ -package syncmgr - -import ( - "testing" - - "github.com/CityOfZion/neo-go/pkg/chain" - - "github.com/CityOfZion/neo-go/pkg/wire/payload" - "github.com/stretchr/testify/assert" -) - -func TestHeadersModeOnBlock(t *testing.T) { - - syncmgr, helper := setupSyncMgr(headersMode, 0) - - syncmgr.OnBlock(&mockPeer{}, randomBlockMessage(t, 0)) - - // In headerMode, we do nothing - assert.Equal(t, 0, helper.blocksProcessed) -} - -func TestBlockModeOnBlock(t *testing.T) { - - syncmgr, helper := setupSyncMgr(blockMode, 0) - - syncmgr.OnBlock(&mockPeer{}, randomBlockMessage(t, 0)) - - // When a block is received in blockMode, it is processed - assert.Equal(t, 1, helper.blocksProcessed) -} -func TestNormalModeOnBlock(t *testing.T) { - - syncmgr, helper := setupSyncMgr(normalMode, 0) - - syncmgr.OnBlock(&mockPeer{}, randomBlockMessage(t, 0)) - - // When a block is received in normal, it is processed - assert.Equal(t, 1, helper.blocksProcessed) -} - -func TestBlockModeToNormalMode(t *testing.T) { - - syncmgr, _ := setupSyncMgr(blockMode, 100) - - peer := &mockPeer{ - height: 100, - } - - blkMessage := randomBlockMessage(t, 100) - - syncmgr.OnBlock(peer, blkMessage) - - // We should switch to normal mode, since the block - //we received is close to the height of the peer. See cruiseHeight - assert.Equal(t, normalMode, syncmgr.syncmode) - -} -func TestBlockModeStayInBlockMode(t *testing.T) { - - syncmgr, _ := setupSyncMgr(blockMode, 0) - - // We need our latest know hash to not be equal to the hash - // of the block we received, to stay in blockmode - syncmgr.headerHash = randomUint256(t) - - peer := &mockPeer{ - height: 2000, - } - - blkMessage := randomBlockMessage(t, 100) - - syncmgr.OnBlock(peer, blkMessage) - - // We should stay in block mode, since the block we received is - // still quite far behind the peers height - assert.Equal(t, blockMode, syncmgr.syncmode) -} -func TestBlockModeAlreadyExistsErr(t *testing.T) { - - syncmgr, helper := setupSyncMgr(blockMode, 100) - helper.err = chain.ErrBlockAlreadyExists - - syncmgr.OnBlock(&mockPeer{}, randomBlockMessage(t, 100)) - - assert.Equal(t, 0, helper.blockFetchRequest) - - // If we have a block already exists in blockmode, then we - // switch back to headers mode. - assert.Equal(t, headersMode, syncmgr.syncmode) -} - -func randomBlockMessage(t *testing.T, height uint32) *payload.BlockMessage { - blockMessage, err := payload.NewBlockMessage() - blockMessage.BlockBase.Index = height - assert.Nil(t, err) - return blockMessage -} diff --git a/_pkg.dev/syncmgr/syncmgr_onheaders_test.go b/_pkg.dev/syncmgr/syncmgr_onheaders_test.go deleted file mode 100644 index f2bcef3f3..000000000 --- a/_pkg.dev/syncmgr/syncmgr_onheaders_test.go +++ /dev/null @@ -1,117 +0,0 @@ -package syncmgr - -import ( - "testing" - - "github.com/CityOfZion/neo-go/pkg/chain" - - "github.com/stretchr/testify/assert" - - "github.com/CityOfZion/neo-go/pkg/wire/util" -) - -func TestHeadersModeOnHeaders(t *testing.T) { - - syncmgr, helper := setupSyncMgr(headersMode, 0) - - syncmgr.OnHeader(&mockPeer{}, randomHeadersMessage(t, 0)) - - // Since there were no headers, we should have exited early and processed nothing - assert.Equal(t, 0, helper.headersProcessed) - - // ProcessHeaders should have been called once to process all 100 headers - syncmgr.OnHeader(&mockPeer{}, randomHeadersMessage(t, 100)) - assert.Equal(t, 100, helper.headersProcessed) - - // Mode should now be blockMode - assert.Equal(t, blockMode, syncmgr.syncmode) - -} - -func TestBlockModeOnHeaders(t *testing.T) { - syncmgr, helper := setupSyncMgr(blockMode, 0) - - // If we receive a header in blockmode, no headers will be processed - syncmgr.OnHeader(&mockPeer{}, randomHeadersMessage(t, 100)) - assert.Equal(t, 0, helper.headersProcessed) -} -func TestNormalModeOnHeadersMaxHeaders(t *testing.T) { - syncmgr, helper := setupSyncMgr(normalMode, 0) - - // If we receive a header in normalmode, headers will be processed - syncmgr.OnHeader(&mockPeer{}, randomHeadersMessage(t, 2000)) - assert.Equal(t, 2000, helper.headersProcessed) - - // Mode should now be headersMode since we received 2000 headers - assert.Equal(t, headersMode, syncmgr.syncmode) -} - -// This differs from the previous function in that -//we did not receive the max amount of headers -func TestNormalModeOnHeaders(t *testing.T) { - syncmgr, helper := setupSyncMgr(normalMode, 0) - - // If we receive a header in normalmode, headers will be processed - syncmgr.OnHeader(&mockPeer{}, randomHeadersMessage(t, 200)) - assert.Equal(t, 200, helper.headersProcessed) - - // Because we did not receive 2000 headers, we switch to blockMode - assert.Equal(t, blockMode, syncmgr.syncmode) -} - -func TestLastHeaderUpdates(t *testing.T) { - syncmgr, _ := setupSyncMgr(headersMode, 0) - - hdrsMessage := randomHeadersMessage(t, 200) - hdrs := hdrsMessage.Headers - lastHeader := hdrs[len(hdrs)-1] - - syncmgr.OnHeader(&mockPeer{}, hdrsMessage) - - // Headers are processed in headersMode - // Last header should be updated - assert.True(t, syncmgr.headerHash.Equals(lastHeader.Hash)) - - // Change mode to blockMode and reset lastHeader - syncmgr.syncmode = blockMode - syncmgr.headerHash = util.Uint256{} - - syncmgr.OnHeader(&mockPeer{}, hdrsMessage) - - // header should not be changed - assert.False(t, syncmgr.headerHash.Equals(lastHeader.Hash)) - - // Change mode to normalMode and reset lastHeader - syncmgr.syncmode = normalMode - syncmgr.headerHash = util.Uint256{} - - syncmgr.OnHeader(&mockPeer{}, hdrsMessage) - - // headers are processed in normalMode - // hash should be updated - assert.True(t, syncmgr.headerHash.Equals(lastHeader.Hash)) - -} - -func TestHeadersModeOnHeadersErr(t *testing.T) { - - syncmgr, helper := setupSyncMgr(headersMode, 0) - helper.err = &chain.ValidationError{} - - syncmgr.OnHeader(&mockPeer{}, randomHeadersMessage(t, 200)) - - // On a validation error, we should request for another peer - // to send us these headers - assert.Equal(t, 1, helper.headersFetchRequest) -} - -func TestNormalModeOnHeadersErr(t *testing.T) { - syncmgr, helper := setupSyncMgr(normalMode, 0) - helper.err = &chain.ValidationError{} - - syncmgr.OnHeader(&mockPeer{}, randomHeadersMessage(t, 200)) - - // On a validation error, we should request for another peer - // to send us these headers - assert.Equal(t, 1, helper.headersFetchRequest) -}