diff --git a/pkg/peer/peer.go b/pkg/peer/peer.go index 94f9c685c..d1594d6de 100644 --- a/pkg/peer/peer.go +++ b/pkg/peer/peer.go @@ -175,11 +175,10 @@ func (p *Peer) IsVerackReceived() bool { //NotifyDisconnect returns once the peer has disconnected // Blocking -func (p *Peer) NotifyDisconnect() bool { +func (p *Peer) NotifyDisconnect() { fmt.Println("Peer has not disconnected yet") <-p.quitch fmt.Println("Peer has just disconnected") - return true } //End of Exposed API functions// diff --git a/pkg/peermgr/peermgr.go b/pkg/peermgr/peermgr.go new file mode 100644 index 000000000..047a109c3 --- /dev/null +++ b/pkg/peermgr/peermgr.go @@ -0,0 +1,122 @@ +package peermgr + +import ( + "errors" + "sync" + + "github.com/CityOfZion/neo-go/pkg/wire/command" + + "github.com/CityOfZion/neo-go/pkg/wire/util" +) + +var ( + //ErrNoAvailablePeers is returned when a request for data from a peer is invoked + // but there are no available peers to request data from + ErrNoAvailablePeers = errors.New("there are no available peers to interact with") + + // ErrUnknownPeer is returned when a peer that the peer manager does not know about + // sends a message to this node + ErrUnknownPeer = errors.New("this peer has not been registered with the peer manager") +) + +//mPeer represents a peer that is managed by the peer manager +type mPeer interface { + Disconnect() + RequestBlocks([]util.Uint256) error + RequestHeaders(util.Uint256) error + NotifyDisconnect() +} + +type peerstats struct { + requests map[command.Type]bool +} + +//PeerMgr manages all peers that the node is connected to +type PeerMgr struct { + pLock sync.RWMutex + peers map[mPeer]peerstats +} + +//New returns a new peermgr object +func New() *PeerMgr { + return &PeerMgr{ + peers: make(map[mPeer]peerstats), + } +} + +// AddPeer adds a peer to the list of managed peers +func (pmgr *PeerMgr) AddPeer(peer mPeer) { + + pmgr.pLock.Lock() + defer pmgr.pLock.Unlock() + if _, exists := pmgr.peers[peer]; exists { + return + } + pmgr.peers[peer] = peerstats{requests: make(map[command.Type]bool)} + go pmgr.onDisconnect(peer) +} + +//MsgReceived notifies the peer manager that we have received a +// message from a peer +func (pmgr *PeerMgr) MsgReceived(peer mPeer, cmd command.Type) error { + pmgr.pLock.Lock() + defer pmgr.pLock.Unlock() + val, ok := pmgr.peers[peer] + if !ok { + + go func() { + peer.NotifyDisconnect() + }() + + peer.Disconnect() + return ErrUnknownPeer + } + val.requests[cmd] = false + + return nil +} + +// Len returns the amount of peers that the peer manager +//currently knows about +func (pmgr *PeerMgr) Len() int { + pmgr.pLock.Lock() + defer pmgr.pLock.Unlock() + return len(pmgr.peers) +} + +// RequestBlock will request a block from the most +// available peer. Then update it's stats, so we know that +// this peer is busy +func (pmgr *PeerMgr) RequestBlock(hash util.Uint256) error { + return pmgr.callPeerForCmd(command.Block, func(p mPeer) error { + return p.RequestBlocks([]util.Uint256{hash}) + }) +} + +// RequestHeaders will request a headers from the most available peer. +func (pmgr *PeerMgr) RequestHeaders(hash util.Uint256) error { + return pmgr.callPeerForCmd(command.Headers, func(p mPeer) error { + return p.RequestHeaders(hash) + }) +} + +func (pmgr *PeerMgr) callPeerForCmd(cmd command.Type, f func(p mPeer) error) error { + pmgr.pLock.Lock() + defer pmgr.pLock.Unlock() + for peer, stats := range pmgr.peers { + if !stats.requests[cmd] { + stats.requests[cmd] = true + return f(peer) + } + } + return ErrNoAvailablePeers +} +func (pmgr *PeerMgr) onDisconnect(p mPeer) { + + // Blocking until peer is disconnected + p.NotifyDisconnect() + + pmgr.pLock.Lock() + delete(pmgr.peers, p) + pmgr.pLock.Unlock() +} diff --git a/pkg/peermgr/peermgr_test.go b/pkg/peermgr/peermgr_test.go new file mode 100644 index 000000000..86d5be41c --- /dev/null +++ b/pkg/peermgr/peermgr_test.go @@ -0,0 +1,179 @@ +package peermgr + +import ( + "testing" + + "github.com/CityOfZion/neo-go/pkg/wire/command" + + "github.com/CityOfZion/neo-go/pkg/wire/util" + "github.com/stretchr/testify/assert" +) + +type peer struct { + quit chan bool + nonce int + disconnected bool + blockRequested int + headersRequested int +} + +func (p *peer) Disconnect() { + p.disconnected = true + p.quit <- true +} +func (p *peer) RequestBlocks([]util.Uint256) error { + p.blockRequested++ + return nil +} +func (p *peer) RequestHeaders(util.Uint256) error { + p.headersRequested++ + return nil +} +func (p *peer) NotifyDisconnect() { + <-p.quit +} + +func TestAddPeer(t *testing.T) { + pmgr := New() + + peerA := &peer{nonce: 1} + peerB := &peer{nonce: 2} + peerC := &peer{nonce: 3} + + pmgr.AddPeer(peerA) + pmgr.AddPeer(peerB) + pmgr.AddPeer(peerC) + pmgr.AddPeer(peerC) + + assert.Equal(t, 3, pmgr.Len()) +} + +func TestRequestBlocks(t *testing.T) { + pmgr := New() + + peerA := &peer{nonce: 1} + peerB := &peer{nonce: 2} + peerC := &peer{nonce: 3} + + pmgr.AddPeer(peerA) + pmgr.AddPeer(peerB) + pmgr.AddPeer(peerC) + + err := pmgr.RequestBlock(util.Uint256{}) + assert.Nil(t, err) + + err = pmgr.RequestBlock(util.Uint256{}) + assert.Nil(t, err) + + err = pmgr.RequestBlock(util.Uint256{}) + assert.Nil(t, err) + + // Since the peer manager did not get a MsgReceived + // in between the block requests + // a request should be sent to all peers + + assert.Equal(t, 1, peerA.blockRequested) + assert.Equal(t, 1, peerB.blockRequested) + assert.Equal(t, 1, peerC.blockRequested) + + // Since the peer manager still has not received a MsgReceived + // another call to request blocks, will return a NoAvailablePeerError + + err = pmgr.RequestBlock(util.Uint256{}) + assert.Equal(t, ErrNoAvailablePeers, err) + + // If we tell the peer manager that peerA has given us a block + // then send another BlockRequest. It will go to peerA + // since the other two peers are still busy with their + // block requests + + pmgr.MsgReceived(peerA, command.Block) + err = pmgr.RequestBlock(util.Uint256{}) + assert.Nil(t, err) + + assert.Equal(t, 2, peerA.blockRequested) + assert.Equal(t, 1, peerB.blockRequested) + assert.Equal(t, 1, peerC.blockRequested) +} +func TestRequestHeaders(t *testing.T) { + pmgr := New() + + peerA := &peer{nonce: 1} + peerB := &peer{nonce: 2} + peerC := &peer{nonce: 3} + + pmgr.AddPeer(peerA) + pmgr.AddPeer(peerB) + pmgr.AddPeer(peerC) + + err := pmgr.RequestHeaders(util.Uint256{}) + assert.Nil(t, err) + + err = pmgr.RequestHeaders(util.Uint256{}) + assert.Nil(t, err) + + err = pmgr.RequestHeaders(util.Uint256{}) + assert.Nil(t, err) + + // Since the peer manager did not get a MsgReceived + // in between the header requests + // a request should be sent to all peers + + assert.Equal(t, 1, peerA.headersRequested) + assert.Equal(t, 1, peerB.headersRequested) + assert.Equal(t, 1, peerC.headersRequested) + + // Since the peer manager still has not received a MsgReceived + // another call to request header, will return a NoAvailablePeerError + + err = pmgr.RequestHeaders(util.Uint256{}) + assert.Equal(t, ErrNoAvailablePeers, err) + + // If we tell the peer manager that peerA has given us a block + // then send another BlockRequest. It will go to peerA + // since the other two peers are still busy with their + // block requests + + err = pmgr.MsgReceived(peerA, command.Headers) + assert.Nil(t, err) + err = pmgr.RequestHeaders(util.Uint256{}) + assert.Nil(t, err) + + assert.Equal(t, 2, peerA.headersRequested) + assert.Equal(t, 1, peerB.headersRequested) + assert.Equal(t, 1, peerC.headersRequested) +} + +func TestUnknownPeer(t *testing.T) { + pmgr := New() + + unknownPeer := &peer{ + disconnected: false, + quit: make(chan bool), + } + + err := pmgr.MsgReceived(unknownPeer, command.Block) + assert.Equal(t, true, unknownPeer.disconnected) + assert.Equal(t, ErrUnknownPeer, err) +} + +func TestNotifyDisconnect(t *testing.T) { + pmgr := New() + + peerA := &peer{ + nonce: 1, + quit: make(chan bool), + } + + pmgr.AddPeer(peerA) + + if pmgr.Len() != 1 { + t.Fail() + } + + peerA.Disconnect() + + if pmgr.Len() != 0 { + t.Fail() + } +}